Streaming endpoints#
- class dataiku.StreamingEndpoint(id, project_key=None)#
This is a handle to obtain readers and writers on a dataiku streaming endpoint.
- get_location_info(sensitive_info=False)#
Get details on the stream underlying the streaming endpoint.
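Usage example (a minimal sketch; the endpoint name is a placeholder):
    # print where the stream behind the endpoint lives
    import dataiku
    endpoint = dataiku.StreamingEndpoint("my_endpoint_name")
    info = endpoint.get_location_info()["info"]
    print("endpoint %s (type %s) uses connection %s" % (info["id"], info["type"], info.get("connection")))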
- Parameters:
sensitive_info (boolean) – whether to get the details of the connection this endpoint uses.
- Returns:
a dict of the information. The top-level has an info field containing
projectKey and id : identifiers for the streaming endpoint
type : type of streaming endpoint (Kafka, SQS, …)
connection : name of the connection that the endpoint uses
additional fields depending on the connection type. For example, Kafka endpoints have a topic field
connectionParams : if sensitive info was requested and the user has access to the details of the connection, this field is a dict of the connection params and credentials
- Return type:
dict
- get_schema(raise_if_empty=True)#
Get the schema of this streaming endpoint.
- Returns:
an array of columns. Each column is a dict with fields:
name : the column name
type : the column type (smallint, int, bigint, float, double, boolean, date, string)
length : the string length
comment : user comment about the column
- Return type:
list
- set_schema(columns)#
Set the schema of this streaming endpoint.
Usage example:
    # copy schema from input to output of recipe
    input = dataiku.StreamingEndpoint("input_endpoint_name")
    output = dataiku.StreamingEndpoint("output_endpoint_name")
    output.set_schema(input.get_schema())
- Parameters:
columns (list) –
an array of columns. Each column is a dict with fields:
name : the column name
type : the column type (smallint, int, bigint, float, double, boolean, date, string)
length : the string length
comment : user comment about the column
- get_writer()#
Get a stream writer to append to this streaming endpoint as a sink.
Note
The writes are buffered python-side, so the writer must be regularly flushed with calls to
dataiku.core.continuous_write.ContinuousWriterBase.flush()
Important
The schema of the streaming endpoint MUST be set before using this; otherwise the data will generally not be stored by the output writers.
Usage example:
    # write increasing integers to a stream
    import time
    i = 0
    with endpoint.get_writer() as test_writer:
        while True:
            test_writer.write_row_dict({"counter": i})
            test_writer.flush()
            time.sleep(0.1)
            i += 1
- Returns:
a writer to which records can be sent.
- Return type:
dataiku.core.continuous_write.StreamingEndpointContinuousWriter
- get_message_iterator(previous_state=None, columns=[])#
Get the records of the stream underlying the streaming endpoint.
Attention
The effect of the previous state given to this method depends on the underlying streaming system. At the moment it should only be used with systems which can replay messages (for example Kafka).
Usage example:
    # read the first 5 messages
    messages = endpoint.get_message_iterator()
    last_state = None
    for i in range(0, 5):
        print("got: %s" % messages.next())
        last_state = messages.get_state()

    # then read the rest, starting from the saved state
    print("last state was %s" % last_state)
    for message in endpoint.get_message_iterator(last_state):
        print("got %s" % message)
- Parameters:
previous_state (string) – string representing a state in the stream. Should be obtained via
dataiku.core.streaming_endpoint.StreamingEndpointStream.get_state()
columns (list[string]) – (optional) list of columns to retrieve. Default: retrieve all columns
- Returns:
a generator of records from the stream
- Return type:
dataiku.core.streaming_endpoint.StreamingEndpointStream
- get_native_kafka_topic(broker_version='1.0.0')#
Get a pykafka topic for the Kafka topic of this streaming endpoint.
- Parameters:
broker_version (string) – The protocol version of the cluster being connected to (see pykafka doc)
- Return type:
pykafka.topic.Topic
- get_native_kafka_consumer(broker_version='1.0.0', **kwargs)#
Get a pykafka consumer for the Kafka topic of this streaming endpoint.
See pykafka doc for the possible parameters.
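Usage example (a minimal sketch, assuming a Kafka streaming endpoint; the number of messages read is arbitrary):
    # read a few raw messages with the pykafka consumer
    consumer = endpoint.get_native_kafka_consumer()
    for _ in range(5):
        message = consumer.consume()  # blocks until a message is available
        if message is not None:
            print(message.value)      # raw payload, as bytes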
- Parameters:
broker_version (string) – The protocol version of the cluster being connected to
- Return type:
pykafka.simpleconsumer.SimpleConsumer
- get_native_kafka_producer(broker_version='1.0.0', **kwargs)#
Get a pykafka producer for the Kafka topic of this streaming endpoint.
See pykafka doc for the possible parameters.
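Usage example (a minimal sketch; the payload is illustrative and must be bytes):
    # publish a raw message with the pykafka producer
    producer = endpoint.get_native_kafka_producer()
    producer.produce(b'{"counter": 1}')
    producer.stop()  # flush pending messages and release the producer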
- Parameters:
broker_version (string) – The protocol version of the cluster being connected to
- Return type:
pykafka.producer.Producer
- get_native_httpsse_consumer()#
Get an SSEClient for the HTTP-SSE URL of this streaming endpoint.
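Usage example (a minimal sketch; what you do with each event is up to you):
    # print incoming server-sent events
    client = endpoint.get_native_httpsse_consumer()
    for event in client:
        print(event.data)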
- Return type:
sseclient.SSEClient
- get_native_sqs_consumer()#
Get a generator of the messages in an SQS queue, backed by a boto3 client.
Each message is returned as-is, i.e. as a string.
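Usage example (a minimal sketch, assuming the queue carries JSON messages):
    # parse messages from the SQS queue
    import json
    for raw_message in endpoint.get_native_sqs_consumer():
        record = json.loads(raw_message)  # each message is a plain string
        print(record)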
- Return type:
generator[string]
- class dataiku.core.streaming_endpoint.StreamingEndpointStream(streaming_endpoint, previous_state, columns)#
Handle to read a streaming endpoint.
This class is a Python generator, returning a sequence of records, each one a dict. The fields in each record are named according to the columns in the schema of the streaming endpoint.
Note
Do not instantiate this class directly, use
dataiku.StreamingEndpoint.get_message_iterator()
instead.
Usage example:
    # print the stream to standard output
    endpoint = dataiku.StreamingEndpoint("my_endpoint_name")
    for message in endpoint.get_message_iterator():
        print(message)
- next()#
Get next record in stream.
- Returns:
a dict with fields according to the streaming endpoint schema
- Return type:
dict
- get_state()#
Get the state of the stream.
- Returns:
an endpoint-type specific string that can be passed as previous_state in
dataiku.StreamingEndpoint.get_message_iterator()
. For a Kafka endpoint, this would be a string representing the offsets of the consumer in the topic.
- Return type:
string
- class dataiku.core.continuous_write.ContinuousWriterBase#
Handle to write using the continuous write API to a dataset or streaming endpoint.
Note
Do not instantiate directly, use
dataiku.Dataset.get_continuous_writer()
or dataiku.StreamingEndpoint.get_writer()
instead.
Caution
You MUST close the handle after usage. Failure to do so will result in resource leaks.
- write_tuple(row)#
Write a single row from a tuple or list of column values.
Important
The schema of the dataset or streaming endpoint MUST be set before using this.
Note
Strings MUST be given as Unicode objects. Giving str objects will fail.
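Usage example (a minimal sketch; the two-column schema, name then value, is illustrative):
    # write one row, with values in the order of the schema
    with endpoint.get_writer() as writer:
        writer.write_tuple((u"temperature", 21.5))
        writer.flush()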
- Parameters:
row (tuple or list) – columns values, must be given in the order of the dataset schema.
- write_row_array(row)#
Write a single row from an array of column values.
Caution
Deprecated. Use
write_tuple()
instead.
- Parameters:
row (tuple or list) – columns values, must be given in the order of the dataset schema.
- write_row_dict(row)#
Write a single row from a dict of column name -> column value.
Some columns can be omitted; empty values will be inserted for them.
Important
The schema of the dataset or streaming endpoint MUST be set before using this.
Note
Strings MUST be given as Unicode objects. Giving str objects will fail.
- Parameters:
row (dict) – a dict of column name to column value (the method doesn't accept kwargs)
- write_dataframe(df)#
Append a Pandas dataframe to the dataset being written.
This method can be called multiple times (especially when you have been using iter_dataframes to read from an input dataset)
Note
Strings MUST be given as Unicode objects. Giving str objects will fail.
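Usage example (a minimal sketch; the input dataset name is a placeholder):
    # stream an input dataset to the endpoint, chunk by chunk
    import dataiku
    input_dataset = dataiku.Dataset("my_input_dataset")
    with endpoint.get_writer() as writer:
        for df in input_dataset.iter_dataframes(chunksize=10000):
            writer.write_dataframe(df)
            writer.flush()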
- Parameters:
df (
pandas.core.frame.DataFrame
) – a Pandas dataframe
- flush()#
Send pending writes to the dataset or streaming endpoint.
- checkpoint(state)#
Checkpoint the dataset and set its current state.
Upon checkpointing, the dataset will attempt to flush writes atomically and to record the state. The state would then be accessible for reading later on, via
get_state()
.
Whether the flush happens atomically depends on the dataset type, and whether the state is effectively recorded depends on the dataset type and settings. At the moment, only file-based datasets can do full checkpoints (atomic flush, and state recorded).
- Parameters:
state (string) – state associated to the checkpoint
- get_state()#
Retrieve the current state of the dataset being written to.
The state is what was passed as parameter to the last
checkpoint()
call on the dataset.
- Return type:
string
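Usage example (a minimal sketch; writer is assumed to be a continuous dataset writer obtained via dataiku.Dataset.get_continuous_writer(), and endpoint a dataiku.StreamingEndpoint):
    # resume from the last recorded state and checkpoint every 100 records
    last_state = writer.get_state()
    messages = endpoint.get_message_iterator(previous_state=last_state)
    for i, message in enumerate(messages):
        writer.write_row_dict(message)
        if i % 100 == 99:
            writer.checkpoint(messages.get_state())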
- close(failed)#
Close this dataset or streaming endpoint writer.
- Parameters:
failed (boolean) – if true, the last write chunk is not committed
- class dataiku.core.continuous_write.StreamingEndpointContinuousWriter(streaming_endpoint)#
Handle to write using the continuous write API to a streaming endpoint.
Note
Do not instantiate directly, use
dataiku.StreamingEndpoint.get_writer()
instead.
- send_init_request()#
Internal.
- get_schema()#
Get the schema of the streaming endpoint being written to.
- Returns:
an array of columns definition with their types and names. See
dataiku.core.dataset.Schema
for more information.
- Return type:
- class dataikuapi.dss.streaming_endpoint.DSSStreamingEndpointListItem(client, data)#
An item in a list of streaming endpoints.
Important
Do not instantiate this class, use
DSSProject.list_streaming_endpoints()
instead.
- to_streaming_endpoint()#
Get a handle on the corresponding streaming endpoint.
- Return type:
DSSStreamingEndpoint
- property name#
Get the streaming endpoint name.
- Return type:
string
- property id#
Get the streaming endpoint identifier.
Note
For streaming endpoints, the identifier is equal to its name
- Return type:
string
- property type#
Get the streaming endpoint type.
- Returns:
the type is the same as the type of the connection that the streaming endpoint uses, i.e. Kafka, SQS, KDBPlus or httpsse
- Return type:
string
- property schema#
Get the schema of the streaming endpoint.
Usage example:
    # list all endpoints containing columns with 'PII' in the comment
    for endpoint in p.list_streaming_endpoints():
        column_list = endpoint.schema['columns']
        pii_columns = [c for c in column_list if 'PII' in c.get("comment", "")]
        if len(pii_columns) > 0:
            print("Streaming endpoint %s contains %s PII columns" % (endpoint.id, len(pii_columns)))
- Returns:
a schema, as a dict with a columns array, in which each element is a column, itself as a dict of
name : the column name
type : the column type (smallint, int, bigint, float, double, boolean, date, string)
length : the string length
comment : user comment about the column
- Return type:
dict
- property connection#
Get the connection on which this streaming endpoint is attached.
- Returns:
a connection name, or None for HTTP SSE endpoints
- Return type:
string
- get_column(column)#
Get a column in the schema, by its name.
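Usage example (a minimal sketch, assuming p is a DSSProject handle; the column name is illustrative):
    # check which endpoints carry a 'timestamp' column
    for endpoint in p.list_streaming_endpoints():
        column = endpoint.get_column("timestamp")
        if column is not None:
            print("%s stores timestamp as %s" % (endpoint.id, column["type"]))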
- Parameters:
column (string) – name of column to find
- Returns:
a dict of the column settings, or None if the column does not exist. Fields are:
name : the column name
type : the column type (smallint, int, bigint, float, double, boolean, date, string)
length : the string length
comment : user comment about the column
- Return type:
dict
- class dataikuapi.dss.streaming_endpoint.DSSStreamingEndpoint(client, project_key, streaming_endpoint_name)#
A streaming endpoint on the DSS instance.
- property name#
Get the streaming endpoint name.
- Return type:
string
- property id#
Get the streaming endpoint identifier.
Note
For streaming endpoints, the identifier is equal to its name
- Return type:
string
- delete()#
Delete the streaming endpoint from the flow, and objects using it (recipes or continuous recipes)
Attention
This call doesn't delete the underlying streaming data. For example, for a Kafka streaming endpoint, the topic isn't deleted.
- get_settings()#
Get the settings of this streaming endpoint.
Known subclasses of DSSStreamingEndpointSettings include KafkaStreamingEndpointSettings and HTTPSSEStreamingEndpointSettings.
You must use save() on the returned object to make your changes effective on the streaming endpoint.
Usage example:
    # changing the topic on a Kafka streaming endpoint
    streaming_endpoint = project.get_streaming_endpoint("my_endpoint")
    settings = streaming_endpoint.get_settings()
    settings.set_connection_and_topic(None, "country")
    settings.save()
- Returns:
an object containing the settings
- Return type:
DSSStreamingEndpointSettings
or a subclass
- exists()#
Whether this streaming endpoint exists in DSS.
- Return type:
boolean
- get_schema()#
Get the schema of the streaming endpoint.
- Returns:
a schema, as a dict with a columns array, in which each element is a column, itself as a dict of
name : the column name
type : the column type (smallint, int, bigint, float, double, boolean, date, string)
comment : user comment about the column
- Return type:
dict
- set_schema(schema)#
Set the schema of the streaming endpoint.
Usage example:
    # copy the schema of the input of a continuous recipe to its output
    recipe = p.get_recipe('my_recipe_name')
    recipe_settings = recipe.get_settings()
    input_endpoint = p.get_streaming_endpoint(recipe_settings.get_flat_input_refs()[0])
    output_endpoint = p.get_streaming_endpoint(recipe_settings.get_flat_output_refs()[0])
    output_endpoint.set_schema(input_endpoint.get_schema())
- Parameters:
schema (dict) –
a schema, as a dict with a columns array, in which each element is a column, itself as a dict of
name : the column name
type : the column type (smallint, int, bigint, float, double, boolean, date, string)
comment : user comment about the column
- get_zone()#
Get the flow zone of this streaming endpoint.
- Returns:
a flow zone
- Return type:
dataikuapi.dss.flow.DSSFlowZone
- move_to_zone(zone)#
Move this object to a flow zone.
- Parameters:
zone (object) – a
dataikuapi.dss.flow.DSSFlowZone
where to move the object, or its identifier
- share_to_zone(zone)#
Share this object to a flow zone.
- Parameters:
zone (object) – a
dataikuapi.dss.flow.DSSFlowZone
where to share the object, or its identifier
- unshare_from_zone(zone)#
Unshare this object from a flow zone.
- Parameters:
zone (object) – a
dataikuapi.dss.flow.DSSFlowZone
from where to unshare the object, or its identifier
- get_usages()#
Get the recipes referencing this streaming endpoint.
Usage example:
    for usage in streaming_endpoint.get_usages():
        if usage["type"] == 'RECIPE_INPUT':
            print("Used as input of %s" % usage["objectId"])
- Returns:
a list of usages, each one a dict of:
type : the type of usage, either “RECIPE_INPUT” or “RECIPE_OUTPUT”
objectId : name of the recipe or continuous recipe
objectProjectKey : project of the recipe or continuous recipe
- Return type:
list[dict]
- get_object_discussions()#
Get a handle to manage discussions on the streaming endpoint.
- Returns:
the handle to manage discussions
- Return type:
dataikuapi.dss.discussion.DSSObjectDiscussions
- test_and_detect(infer_storage_types=False, limit=10, timeout=60)#
Used internally by
autodetect_settings()
. It is not usually required to call this method.
Attention
Only Kafka and HTTP-SSE streaming endpoints are handled
Note
Schema inference is done on the rows captured from the underlying stream. If no record is captured, for example because no message is posted to the stream while the capture is running, then no schema inference can take place.
- Returns:
a future object on the result of the detection. The future’s result is a dict with fields:
table : the captured rows
schemaDetection : the result of the schema inference. Notable sub-fields are
detectedSchema : the inferred schema
detectedButNotInSchema : list of column names found in the capture, but not yet in the endpoint’s schema
inSchemaButNotDetected : list of column names present in the endpoint’s schema, but not found in the capture
- Return type:
dataikuapi.dss.future.DSSFuture
- autodetect_settings(infer_storage_types=False, limit=10, timeout=60)#
Detect an appropriate schema for this streaming endpoint using Dataiku detection engine.
The detection is based on a capture of the stream that this streaming endpoint represents: first, a number of messages are captured from the stream, within the bounds passed as parameters; then the columns and their types are detected from the captured messages. If no message is sent on the stream during the capture, no column is inferred.
Attention
Only Kafka and HTTP-SSE streaming endpoints are handled
Note
Format-related settings are not automatically inferred. For example, the method will not detect whether Kafka messages are JSON-encoded or Avro-encoded.
Usage example:
    # create a Kafka endpoint on a JSON stream and detect its schema
    e = project.create_kafka_streaming_endpoint('test_endpoint', 'kafka_connection', 'kafka-topic')
    s = e.autodetect_settings(infer_storage_types=True)
    s.save()
- Parameters:
infer_storage_types (boolean) – if True, DSS will try to guess types of the columns. If False, all columns will be assumed to be strings (default: False)
limit (int) – max number of rows to use for the autodetection (default: 10)
timeout (int) – max duration in seconds of the stream capture to use for the autodetection (default: 60)
- Returns:
streaming endpoint settings with an updated schema that you can
DSSStreamingEndpointSettings.save()
.
- Return type:
DSSStreamingEndpointSettings
or a subclass
- get_as_core_streaming_endpoint()#
Get the streaming endpoint as a handle for use inside DSS.
- Returns:
the streaming endpoint
- Return type:
dataiku.StreamingEndpoint
- new_code_recipe(type, code=None, recipe_name=None)#
Start the creation of a new code recipe taking this streaming endpoint as input.
Usage example:
    # create a continuous python recipe from an endpoint
    recipe_creator = endpoint.new_code_recipe('cpython', 'some code here', 'compute_something')
    recipe_creator.with_new_output_streaming_endpoint('something', 'my_kafka_connection', 'avro')
    recipe = recipe_creator.create()
- Parameters:
type (string) – type of the recipe. Can be ‘cpython’, ‘streaming_spark_scala’ or ‘ksql’; the non-continuous type ‘python’ is also possible
code (string) – (optional) The script of the recipe
recipe_name (string) – (optional) base name for the new recipe.
- Returns:
an object to create a new recipe
- Return type:
- new_recipe(type, recipe_name=None)#
Start the creation of a new recipe taking this streaming endpoint as input.
This method can create non-code recipes like Sync or continuous Sync recipes. For more details, please see
dataikuapi.dss.project.DSSProject.new_recipe()
.
Usage example:
    # create a continuous sync from an endpoint to a dataset
    recipe_creator = endpoint.new_recipe('csync', 'compute_my_dataset_name')
    recipe_creator.with_new_output('my_dataset_name', 'filesystem_managed')
    recipe = recipe_creator.create()
- Parameters:
type (string) – type of the recipe. Possible values are ‘csync’, ‘cpython’, ‘python’, ‘ksql’, ‘streaming_spark_scala’
recipe_name (string) – (optional) base name for the new recipe.
- Returns:
A new DSS Recipe Creator handle
- Return type:
dataikuapi.dss.recipe.DSSRecipeCreator
or a subclass
- class dataikuapi.dss.streaming_endpoint.DSSStreamingEndpointSettings(streaming_endpoint, settings)#
Base settings class for a DSS streaming endpoint.
Important
Do not instantiate this class directly, use
DSSStreamingEndpoint.get_settings()
Use
save()
to save your changes.
- get_raw()#
Get the streaming endpoint settings.
- Returns:
the settings, as a dict. The type-specific parameters, which depend on the connection type, are in a params sub-dict.
- Return type:
dict
- get_raw_params()#
Get the type-specific (Kafka, HTTP-SSE, Kdb+, …) params as a dict.
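Usage example (a minimal sketch, assuming streaming_endpoint is a DSSStreamingEndpoint handle on a Kafka endpoint):
    # print the connection and topic used by the endpoint
    settings = streaming_endpoint.get_settings()
    params = settings.get_raw_params()
    print("connection=%s topic=%s" % (params.get("connection"), params.get("topic")))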
- Returns:
the type-specific params. Each type defines a set of fields; commonly found fields are:
connection : name of the connection used by the streaming endpoint
topic or queueName : name of the Kafka topic or SQS queue corresponding to this streaming endpoint
- Return type:
dict
- property type#
Get the type of streaming system that the streaming endpoint uses.
- Returns:
a type of streaming system. Possible values: ‘kafka’, ‘SQS’, ‘kdbplustick’, ‘httpsse’
- Return type:
string
- add_raw_schema_column(column)#
Add a column to the schema.
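Usage example (a minimal sketch; the column name and comment are placeholders, and streaming_endpoint is assumed to be a DSSStreamingEndpoint handle):
    # add a string column to the schema and save
    settings = streaming_endpoint.get_settings()
    settings.add_raw_schema_column({"name": "country", "type": "string", "comment": "ISO country code"})
    settings.save()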
- Parameters:
column (dict) – a dict defining the column. It should contain a “name” field and a “type” field (with a DSS type, like bigint, double, string, date, boolean, …), optionally a “length” field for string-typed columns and a “comment” field.
- save()#
Save the changes to the settings on the streaming endpoint.
- class dataikuapi.dss.streaming_endpoint.KafkaStreamingEndpointSettings(streaming_endpoint, settings)#
Settings for a Kafka streaming endpoint.
This class inherits from
DSSStreamingEndpointSettings
.Important
Do not instantiate this class directly, use
DSSStreamingEndpoint.get_settings()
Use
save()
to save your changes.
- set_connection_and_topic(connection=None, topic=None)#
Change the connection and topic of an endpoint.
- Parameters:
connection (string) – (optional) name of a Kafka connection in DSS
topic (string) – (optional) name of a Kafka topic. Can contain DSS variables
- class dataikuapi.dss.streaming_endpoint.HTTPSSEStreamingEndpointSettings(streaming_endpoint, settings)#
Settings for a HTTP-SSE streaming endpoint.
This class inherits from
DSSStreamingEndpointSettings
.Important
Do not instantiate this class directly, use
DSSStreamingEndpoint.get_settings()
Use
save()
to save your changes.
- set_url(url)#
Change the URL of the endpoint.
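Usage example (a minimal sketch; the URL is a placeholder, and streaming_endpoint is assumed to be a DSSStreamingEndpoint handle on an HTTP-SSE endpoint):
    # point the HTTP-SSE endpoint at another stream
    settings = streaming_endpoint.get_settings()
    settings.set_url("https://example.com/stream")
    settings.save()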
- Parameters:
url (string) – url to connect to
- class dataikuapi.dss.streaming_endpoint.DSSManagedStreamingEndpointCreationHelper(project, streaming_endpoint_name, streaming_endpoint_type)#
Utility class to help create a new streaming endpoint.
Note
Do not instantiate directly, use
DSSProject.new_managed_streaming_endpoint()
instead.
Important
Only Kafka and SQS endpoints are supported as managed streaming endpoints
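Usage example (a minimal sketch; the connection and endpoint names are placeholders, and the call signature of DSSProject.new_managed_streaming_endpoint() is assumed):
    # create a managed Kafka endpoint receiving JSON messages
    helper = project.new_managed_streaming_endpoint("my_endpoint", "kafka")
    helper.with_store_into("my_kafka_connection", format_option_id="json")
    endpoint = helper.create(overwrite=False)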
- get_creation_settings()#
Get the settings for the creation of the new managed streaming endpoint.
Note
Modifying the values in the creation settings directly is discouraged, use
with_store_into()
instead.
- Returns:
the settings as a dict
- Return type:
dict
- with_store_into(connection, format_option_id=None)#
Set the DSS connection underlying the new streaming endpoint.
- Parameters:
connection (string) – name of the connection to store into
format_option_id (string) –
(optional) identifier of a serialization format option. For Kafka endpoints, possible values are:
json : messages are JSON strings
single : messages are handled as a single typed field
avro : messages are Kafka-Avro messages (i.e. an Avro message padded with a field indicating the Avro schema version in the Kafka schema registry)
For SQS endpoints, possible values are:
json : messages are JSON strings
string : messages are raw strings
- Returns:
self
- create(overwrite=False)#
Execute the creation of the streaming endpoint according to the selected options
- Parameters:
overwrite (boolean) – If the streaming endpoint being created already exists, delete it first
- Returns:
an object corresponding to the newly created streaming endpoint
- Return type:
DSSStreamingEndpoint
- already_exists()#
Whether the desired name for the new streaming endpoint is already used by another streaming endpoint
- Return type:
boolean
- class dataikuapi.dss.continuousactivity.DSSContinuousActivity(client, project_key, recipe_id)#
A handle to interact with the execution of a continuous recipe on the DSS instance.
Important
Do not create this class directly, instead use
dataikuapi.dss.project.DSSProject.get_continuous_activity()
- start(loop_params={})#
Start the continuous activity
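Usage example (a minimal sketch; the recipe name and delay values are illustrative):
    # restart at most 5 times, waiting a bit longer after each failure
    activity = project.get_continuous_activity("my_continuous_recipe")
    activity.start(loop_params={
        "abortAfterCrashes": 5,
        "initialRestartDelayMS": 1000,
        "restartDelayIncMS": 2000,
        "maxRestartDelayMS": 60000
    })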
- Parameters:
loop_params (dict) –
controls how the recipe is restarted after a failure, and the delay before restarting. Default is to restart indefinitely without delay. Fields are:
abortAfterCrashes : when reaching this number of failures, the recipe isn’t restarted anymore. Use -1 as ‘no limit on number of failures’
initialRestartDelayMS : initial delay to wait before restarting after a failure
restartDelayIncMS : increase to the delay before restarting upon subsequent failures
maxRestartDelayMS : max delay before restarting after failure
- stop()#
Stop the continuous activity.
- get_status()#
Get the current status of the continuous activity.
Usage example:
    # stop a continuous activity via its future
    from dataikuapi.dss.future import DSSFuture
    activity = project.get_continuous_activity("my_continuous_recipe")
    status = activity.get_status()
    future = DSSFuture(activity.client, status["mainLoopState"]['futureId'], status["mainLoopState"]['futureInfo'])
    future.abort()

    # this is equivalent to simply calling stop()
    activity.stop()
- Returns:
the state of the continuous activity. The state as requested by the user is stored in a desiredState field (values: ‘STARTED’ or ‘STOPPED’), and the current effective state in a mainLoopState sub-dict.
- Return type:
dict
- get_recipe()#
Get a handle on the associated recipe.
- Return type:
dataikuapi.dss.recipe.DSSRecipe