Streaming endpoints#

class dataiku.StreamingEndpoint(id, project_key=None)#

This is a handle to obtain readers and writers on a dataiku streaming endpoint.

get_location_info(sensitive_info=False)#

Get details on the stream underlying the streaming endpoint.

Parameters:

sensitive_info (boolean) – whether to get the details of the connection this endpoint uses.

Returns:

a dict of the information. The top-level has an info field containing

  • projectKey and id : identifiers for the streaming endpoint

  • type : type of streaming endpoint (Kafka, SQS, …)

  • connection : name of the connection that the endpoint uses

  • additional fields depending on the connection type. For example, Kafka endpoints have a topic field

  • connectionParams : if sensitive info was requested and the user has access to the details of the connection, this field is a dict of the connection params and credentials

Return type:

dict
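
For instance, a minimal sketch reading the top-level info field (the endpoint name is hypothetical):

# inspect where the stream lives
endpoint = dataiku.StreamingEndpoint("my_endpoint_name")
info = endpoint.get_location_info()["info"]
print("type=%s connection=%s" % (info["type"], info.get("connection")))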

get_schema(raise_if_empty=True)#

Get the schema of this streaming endpoint.

Returns:

an array of columns. Each column is a dict with fields:

  • name : the column name

  • type : the column type (smallint, int, bigint, float, double, boolean, date, string)

  • length : the string length

  • comment : user comment about the column

Return type:

dataiku.core.dataset.Schema
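
A minimal sketch printing the column names and types (the endpoint name is hypothetical):

# print the schema of an endpoint
schema = dataiku.StreamingEndpoint("my_endpoint_name").get_schema()
for column in schema:
    print("%s (%s)" % (column["name"], column["type"]))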

set_schema(columns)#

Set the schema of this streaming endpoint.

Usage example:

# copy schema from input to output of recipe
input = dataiku.StreamingEndpoint("input_endpoint_name")
output = dataiku.StreamingEndpoint("output_endpoint_name")
output.set_schema(input.get_schema())
Parameters:

columns (list) –

an array of columns. Each column is a dict with fields:

  • name : the column name

  • type : the column type (smallint, int, bigint, float, double, boolean, date, string)

  • length : the string length

  • comment : user comment about the column

get_writer()#

Get a stream writer to append to this streaming endpoint as a sink.

Note

The writes are buffered python-side, so the writer must be regularly flushed with calls to dataiku.core.continuous_write.ContinuousWriterBase.flush()

Important

The schema of the streaming endpoint MUST be set before using this. If you don’t set the schema of the streaming endpoint, your data will generally not be stored by the output writers

Usage example:

# write increasing integers to a stream
import time

i = 0
with endpoint.get_writer() as test_writer:
    while True:
        test_writer.write_row_dict({"counter": i})
        test_writer.flush()
        time.sleep(0.1)
        i += 1
Returns:

a writer to which records can be sent.

Return type:

dataiku.core.continuous_write.StreamingEndpointContinuousWriter

get_message_iterator(previous_state=None, columns=[])#

Get the records of the stream underlying the streaming endpoint.

Attention

The effect of the previous state given to this method depends on the underlying streaming system. At the moment it should only be used with systems which can replay messages (for example Kafka).

Usage example:

# read 5 first messages
messages = endpoint.get_message_iterator()
last_state = None
for i in range(0, 5):
    print("got: %s" % messages.next())
    last_state = messages.get_state()

# then the rest
print("last state was %s" % last_state)
for message in endpoint.get_message_iterator(last_state):
    print("got %s" % message)
Parameters:

previous_state (string) – (optional) state of a previous reading of the stream, as returned by dataiku.core.streaming_endpoint.StreamingEndpointStream.get_state()

Returns:

a generator of records from the stream

Return type:

dataiku.core.streaming_endpoint.StreamingEndpointStream

get_native_kafka_topic(broker_version='1.0.0')#

Get a pykafka topic for the Kafka topic of this streaming endpoint.

Parameters:

broker_version (string) – The protocol version of the cluster being connected to (see pykafka doc)

Return type:

pykafka.topic.Topic

get_native_kafka_consumer(broker_version='1.0.0', **kwargs)#

Get a pykafka consumer for the Kafka topic of this streaming endpoint.

See pykafka doc for the possible parameters.

Parameters:

broker_version (string) – The protocol version of the cluster being connected to

Return type:

pykafka.simpleconsumer.SimpleConsumer

get_native_kafka_producer(broker_version='1.0.0', **kwargs)#

Get a pykafka producer for the Kafka topic of this streaming endpoint.

See pykafka doc for the possible parameters.

Parameters:

broker_version (string) – The protocol version of the cluster being connected to

Return type:

pykafka.producer.Producer
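
A minimal sketch combining the native Kafka helpers above, assuming a Kafka-backed endpoint and pykafka installed; the message content is hypothetical:

# read a few raw Kafka messages, then produce one
consumer = endpoint.get_native_kafka_consumer()
for _ in range(3):
    message = consumer.consume()          # blocks until a message is available (by default)
    if message is not None:
        print(message.offset, message.value)

producer = endpoint.get_native_kafka_producer()
producer.produce(b'{"hello": "world"}')   # pykafka producers expect bytes
producer.stop()                           # flush and release the producer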

get_native_httpsse_consumer()#

Get an SSEClient for the HTTP SSE URL of this streaming endpoint.

Return type:

sseclient.SSEClient
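
A minimal sketch, assuming an HTTP SSE endpoint; the SSEClient is iterable and yields events carrying a data payload:

# print server-sent events as they arrive
client = endpoint.get_native_httpsse_consumer()
for event in client:
    print(event.data)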

get_native_sqs_consumer()#

Get a generator of the messages in an SQS queue, backed by a boto3 client.

Each message is returned as-is, i.e. as a string.

Return type:

generator[string]
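
A minimal sketch; parsing the messages as JSON is an assumption about what the queue contains:

# consume SQS messages as raw strings and decode them
import json

for raw_message in endpoint.get_native_sqs_consumer():
    print(json.loads(raw_message))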

class dataiku.core.streaming_endpoint.StreamingEndpointStream(streaming_endpoint, previous_state, columns)#

Handle to read a streaming endpoint.

This class is a Python generator, returning a sequence of records, each one a dict. The fields in each record are named according to the columns in the schema of the streaming endpoint.

Note

Do not instantiate this class directly, use dataiku.StreamingEndpoint.get_message_iterator() instead

Usage example:

# print stream to standard output
endpoint = dataiku.StreamingEndpoint("my_endpoint_name")
for message in endpoint.get_message_iterator():
    print(message)
next()#

Get next record in stream.

Returns:

a dict with fields according to the streaming endpoint schema

Return type:

dict

get_state()#

Get the state of the stream.

Returns:

an endpoint-type specific string that can be passed as previous_state in dataiku.StreamingEndpoint.get_message_iterator(). For a Kafka endpoint, this would be a string representing the offsets of the consumer in the topic

Return type:

string

class dataiku.core.continuous_write.ContinuousWriterBase#

Handle to write using the continuous write API to a dataset or streaming endpoint.

Note

Do not instantiate directly, use dataiku.Dataset.get_continuous_writer() or dataiku.StreamingEndpoint.get_writer() instead

Caution

You MUST close the handle after usage. Failure to do so will result in resource leaks.

write_tuple(row)#

Write a single row from a tuple or list of column values.

Important

The schema of the dataset or streaming endpoint MUST be set before using this.

Note

Strings MUST be given as Unicode objects. Giving str objects will fail.

Parameters:

row (tuple or list) – column values, given in the order of the dataset schema.
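
A minimal sketch, assuming the endpoint schema has already been set with two columns, a bigint and a string (both hypothetical):

# write one row as a tuple of column values
with endpoint.get_writer() as writer:
    writer.write_tuple((1, u"hello"))
    writer.flush()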

write_row_array(row)#

Write a single row from an array of column values.

Caution

Deprecated. Use write_tuple() instead

Parameters:

row (tuple or list) – column values, given in the order of the dataset schema.

write_row_dict(row)#

Write a single row from a dict of column name -> column value.

Some columns can be omitted; empty values will be inserted instead.

Important

The schema of the dataset or streaming endpoint MUST be set before using this.

Note

Strings MUST be given as Unicode objects. Giving str objects will fail.

Parameters:

row (dict) – a dict of column name to column value (the method does not take kwargs)

write_dataframe(df)#

Append a Pandas dataframe to the dataset being written.

This method can be called multiple times (especially when you have been using iter_dataframes to read from an input dataset)

Note

Strings MUST be given as Unicode objects. Giving str objects will fail.

Parameters:

df (pandas.core.frame.DataFrame) – a Pandas dataframe
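
A minimal sketch, assuming the target schema matches the dataframe columns (the names here are hypothetical):

# append a small dataframe
import pandas as pd

df = pd.DataFrame({"counter": [1, 2, 3], "label": [u"a", u"b", u"c"]})
with endpoint.get_writer() as writer:
    writer.write_dataframe(df)
    writer.flush()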

flush()#

Send pending writes to the dataset or streaming endpoint.

checkpoint(state)#

Checkpoint the dataset and set its current state.

Upon checkpointing, the dataset will attempt to flush writes atomically and to record the state. The state can then be read back later via get_state().

Whether the flush happens atomically depends on the dataset type, and whether the state is effectively recorded depends on the dataset type and settings. At the moment, only file-based datasets can do full checkpoints (atomic flush and recorded state).

Parameters:

state (string) – state associated to the checkpoint

get_state()#

Retrieve the current state of the dataset being written to.

The state is what was passed as parameter to the last checkpoint() call on the dataset.

Return type:

string
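
A minimal sketch of checkpointing, assuming a file-based dataset written through dataiku.Dataset.get_continuous_writer(); the source identifier and column name are hypothetical:

# checkpoint while writing continuously to a dataset
dataset = dataiku.Dataset("my_dataset_name")
with dataset.get_continuous_writer("my_source_id") as writer:
    writer.write_row_dict({"counter": 1})
    writer.checkpoint("offset-1")    # attempt an atomic flush and record the state
    print(writer.get_state())        # state recorded by the last checkpoint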

close(failed)#

Close this dataset or streaming endpoint writer.

Parameters:

failed (boolean) – if true, the last write chunk is not committed

class dataiku.core.continuous_write.StreamingEndpointContinuousWriter(streaming_endpoint)#

Handle to write using the continuous write API to a streaming endpoint.

Note

Do not instantiate directly, use dataiku.StreamingEndpoint.get_writer() instead

send_init_request()#

Internal.

get_schema()#

Get the schema of the streaming endpoint being written to.

Returns:

an array of column definitions with their types and names. See dataiku.core.dataset.Schema for more information.

Return type:

dataiku.core.dataset.Schema

class dataikuapi.dss.streaming_endpoint.DSSStreamingEndpointListItem(client, data)#

An item in a list of streaming endpoints.

Important

Do not instantiate this class, use DSSProject.list_streaming_endpoints() instead

to_streaming_endpoint()#

Get a handle on the corresponding streaming endpoint.

Return type:

DSSStreamingEndpoint

property name#

Get the streaming endpoint name.

Return type:

string

property id#

Get the streaming endpoint identifier.

Note

For streaming endpoints, the identifier is equal to its name

Return type:

string

property type#

Get the streaming endpoint type.

Returns:

the type is the same as the type of the connection that the streaming endpoint uses, i.e. Kafka, SQS, KDBPlus or httpsse

Return type:

string

property schema#

Get the schema of the streaming endpoint.

Usage example:

# list all endpoints containing columns with 'PII' in their comment
for endpoint in p.list_streaming_endpoints():
    column_list = endpoint.schema['columns']
    pii_columns = [c for c in column_list if 'PII' in c.get("comment", "")]
    if len(pii_columns) > 0:
        print("Streaming endpoint %s contains %s PII columns" % (endpoint.id, len(pii_columns)))            
Returns:

a schema, as a dict with a columns array, in which each element is a column, itself as a dict of

  • name : the column name

  • type : the column type (smallint, int, bigint, float, double, boolean, date, string)

  • length : the string length

  • comment : user comment about the column

Return type:

dict

property connection#

Get the connection to which this streaming endpoint is attached.

Returns:

a connection name, or None for HTTP SSE endpoints

Return type:

string

get_column(column)#

Get a column in the schema, by its name.

Parameters:

column (string) – name of column to find

Returns:

a dict of the column settings or None if column does not exist. Fields are:

  • name : the column name

  • type : the column type (smallint, int, bigint, float, double, boolean, date, string)

  • length : the string length

  • comment : user comment about the column

Return type:

dict

class dataikuapi.dss.streaming_endpoint.DSSStreamingEndpoint(client, project_key, streaming_endpoint_name)#

A streaming endpoint on the DSS instance.

property name#

Get the streaming endpoint name.

Return type:

string

property id#

Get the streaming endpoint identifier.

Note

For streaming endpoints, the identifier is equal to its name

Return type:

string

delete()#

Delete the streaming endpoint from the flow, together with the objects using it (recipes or continuous recipes).

Attention

This call doesn’t delete the underlying streaming data. For example, for a Kafka streaming endpoint, the topic isn’t deleted.

get_settings()#

Get the settings of this streaming endpoint.

Known subclasses of DSSStreamingEndpointSettings include KafkaStreamingEndpointSettings and HTTPSSEStreamingEndpointSettings.

You must use save() on the returned object to make your changes effective on the streaming endpoint.

# Example: changing the topic on a kafka streaming endpoint
streaming_endpoint = project.get_streaming_endpoint("my_endpoint")
settings = streaming_endpoint.get_settings()
settings.set_connection_and_topic(None, "country")
settings.save()
Returns:

an object containing the settings

Return type:

DSSStreamingEndpointSettings or a subclass

exists()#

Whether this streaming endpoint exists in DSS.

Return type:

boolean

get_schema()#

Get the schema of the streaming endpoint.

Returns:

a schema, as a dict with a columns array, in which each element is a column, itself as a dict of

  • name : the column name

  • type : the column type (smallint, int, bigint, float, double, boolean, date, string)

  • comment : user comment about the column

Return type:

dict

set_schema(schema)#

Set the schema of the streaming endpoint.

Usage example:

# copy schema of the input of a continuous recipe to its output
recipe = p.get_recipe('my_recipe_name')
recipe_settings = recipe.get_settings()
input_endpoint = p.get_streaming_endpoint(recipe_settings.get_flat_input_refs()[0])
output_endpoint = p.get_streaming_endpoint(recipe_settings.get_flat_output_refs()[0])
output_endpoint.set_schema(input_endpoint.get_schema())
Parameters:

schema (dict) –

a schema, as a dict with a columns array, in which each element is a column, itself as a dict of

  • name : the column name

  • type : the column type (smallint, int, bigint, float, double, boolean, date, string)

  • comment : user comment about the column

get_zone()#

Get the flow zone of this streaming endpoint.

Returns:

a flow zone

Return type:

dataikuapi.dss.flow.DSSFlowZone

move_to_zone(zone)#

Move this object to a flow zone.

Parameters:

zone (object) – a dataikuapi.dss.flow.DSSFlowZone where to move the object, or its identifier

share_to_zone(zone)#

Share this object to a flow zone.

Parameters:

zone (object) – a dataikuapi.dss.flow.DSSFlowZone where to share the object, or its identifier

unshare_from_zone(zone)#

Unshare this object from a flow zone.

Parameters:

zone (object) – a dataikuapi.dss.flow.DSSFlowZone from where to unshare the object, or its identifier
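
A minimal sketch of zone handling, assuming a zone handle obtained through project.get_flow(); the zone identifiers are hypothetical:

# move the endpoint to an existing flow zone
zone = project.get_flow().get_zone("my_zone_id")
streaming_endpoint.move_to_zone(zone)

# the zone identifier can also be passed directly
streaming_endpoint.share_to_zone("my_other_zone_id")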

get_usages()#

Get the recipes referencing this streaming endpoint.

Usage example:

for usage in streaming_endpoint.get_usages():
    if usage["type"] == 'RECIPE_INPUT':
        print("Used as input of %s" % usage["objectId"])
Returns:

a list of usages, each one a dict of:

  • type : the type of usage, either “RECIPE_INPUT” or “RECIPE_OUTPUT”

  • objectId : name of the recipe or continuous recipe

  • objectProjectKey : project of the recipe or continuous recipe

Return type:

list[dict]

get_object_discussions()#

Get a handle to manage discussions on the streaming endpoint.

Returns:

the handle to manage discussions

Return type:

dataikuapi.dss.discussion.DSSObjectDiscussions

test_and_detect(infer_storage_types=False, limit=10, timeout=60)#

Used internally by autodetect_settings(). It is usually not necessary to call this method directly.

Attention

Only Kafka and HTTP-SSE streaming endpoints are handled

Note

Schema inference is done on the rows captured from the underlying stream. If no record is captured, for example because no message is posted to the stream during the capture, then no schema inference can take place.

Returns:

a future object on the result of the detection. The future’s result is a dict with fields:

  • table : the captured rows

  • schemaDetection : the result of the schema inference. Notable sub-fields are

    • detectedSchema : the inferred schema

    • detectedButNotInSchema : list of column names found in the capture, but not yet in the endpoint’s schema

    • inSchemaButNotDetected : list of column names present in the endpoint’s schema, but not found in the capture

Return type:

dataikuapi.dss.future.DSSFuture

autodetect_settings(infer_storage_types=False, limit=10, timeout=60)#

Detect an appropriate schema for this streaming endpoint using the Dataiku detection engine.

The detection is based on a capture of the stream that this streaming endpoint represents. First, messages are captured from the stream, within the bounds passed as parameters; then the columns and their types are detected from the captured messages. If no message is sent on the stream during the capture, no column is inferred.

Attention

Only Kafka and HTTP-SSE streaming endpoints are handled

Note

Format-related settings are not automatically inferred. For example, the method will not detect whether Kafka messages are JSON-encoded or Avro-encoded.

Usage example:

# create a kafka endpoint on a json stream and detect its schema
e = project.create_kafka_streaming_endpoint('test_endpoint', 'kafka_connection', 'kafka-topic')
s = e.autodetect_settings(infer_storage_types=True)
s.save()
Parameters:
  • infer_storage_types (boolean) – if True, DSS will try to guess types of the columns. If False, all columns will be assumed to be strings (default: False)

  • limit (int) – max number of rows to use for the autodetection (default: 10)

  • timeout (int) – max duration in seconds of the stream capture to use for the autodetection (default: 60)

Returns:

streaming endpoint settings with an updated schema, which you can save with DSSStreamingEndpointSettings.save().

Return type:

DSSStreamingEndpointSettings or a subclass

get_as_core_streaming_endpoint()#

Get the streaming endpoint as a handle for use inside DSS.

Returns:

the streaming endpoint

Return type:

dataiku.StreamingEndpoint
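
A minimal sketch bridging from the dataikuapi handle to the dataiku package, for example to read messages:

# read the stream through the core handle
core_endpoint = streaming_endpoint.get_as_core_streaming_endpoint()
for message in core_endpoint.get_message_iterator():
    print(message)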

new_code_recipe(type, code=None, recipe_name=None)#

Start the creation of a new code recipe taking this streaming endpoint as input.

Usage example:

# create a continuous python recipe from an endpoint
recipe_creator = endpoint.new_code_recipe('cpython', 'some code here', 'compute_something')
recipe_creator.with_new_output_streaming_endpoint('something', 'my_kafka_connection', 'avro')
recipe = recipe_creator.create()
Parameters:
  • type (string) – type of the recipe. Can be ‘cpython’, ‘streaming_spark_scala’ or ‘ksql’; the non-continuous type ‘python’ is also possible

  • code (string) – (optional) The script of the recipe

  • recipe_name (string) – (optional) base name for the new recipe.

Returns:

an object to create a new recipe

Return type:

dataikuapi.dss.recipe.CodeRecipeCreator

new_recipe(type, recipe_name=None)#

Start the creation of a new recipe taking this streaming endpoint as input.

This method can create non-code recipes like Sync or continuous Sync recipes. For more details, please see dataikuapi.dss.project.DSSProject.new_recipe().

Usage example:

# create a continuous sync from an endpoint to a dataset
recipe_creator = endpoint.new_recipe('csync', 'compute_my_dataset_name')
recipe_creator.with_new_output('my_dataset_name', 'filesystem_managed')
recipe = recipe_creator.create()
Parameters:
  • type (string) – type of the recipe. Possible values are ‘csync’, ‘cpython’, ‘python’, ‘ksql’, ‘streaming_spark_scala’

  • recipe_name (string) – (optional) base name for the new recipe.

Returns:

A new DSS Recipe Creator handle

Return type:

dataikuapi.dss.recipe.DSSRecipeCreator or a subclass

class dataikuapi.dss.streaming_endpoint.DSSStreamingEndpointSettings(streaming_endpoint, settings)#

Base settings class for a DSS streaming endpoint.

Important

Do not instantiate this class directly, use DSSStreamingEndpoint.get_settings()

Use save() to save your changes

get_raw()#

Get the streaming endpoint settings.

Returns:

the settings, as a dict. The type-specific parameters, which depend on the connection type, are in a params sub-dict.

Return type:

dict

get_raw_params()#

Get the type-specific (Kafka, HTTP-SSE, Kdb+, …) params as a dict.

Returns:

the type-specific params. Each type defines a set of fields; commonly found fields are:

  • connection : name of the connection used by the streaming endpoint

  • topic or queueName : name of the Kafka topic or SQS queue corresponding to this streaming endpoint

Return type:

dict
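
A minimal sketch; which fields are present depends on the endpoint type, so the Kafka topic field here is an assumption:

# inspect the connection and topic of a Kafka endpoint
params = streaming_endpoint.get_settings().get_raw_params()
print(params.get("connection"), params.get("topic"))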

property type#

Get the type of streaming system that the streaming endpoint uses.

Returns:

a type of streaming system. Possible values: ‘kafka’, ‘SQS’, ‘kdbplustick’, ‘httpsse’

Return type:

string

add_raw_schema_column(column)#

Add a column to the schema.

Parameters:

column (dict) – a dict defining the column. It should contain a “name” field and a “type” field (with a DSS type, like bigint, double, string, date, boolean, …), optionally a “length” field for string-typed columns and a “comment” field.
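
A minimal sketch; the column name and comment are hypothetical:

# append a string column to the schema
settings = streaming_endpoint.get_settings()
settings.add_raw_schema_column({"name": "customer_id", "type": "string", "comment": "PII"})
settings.save()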

save()#

Save the changes to the settings on the streaming endpoint.

class dataikuapi.dss.streaming_endpoint.KafkaStreamingEndpointSettings(streaming_endpoint, settings)#

Settings for a Kafka streaming endpoint.

This class inherits from DSSStreamingEndpointSettings.

Important

Do not instantiate this class directly, use DSSStreamingEndpoint.get_settings()

Use save() to save your changes

set_connection_and_topic(connection=None, topic=None)#

Change the connection and topic of an endpoint.

Parameters:
  • connection (string) – (optional) name of a Kafka connection in DSS

  • topic (string) – (optional) name of a Kafka topic. Can contain DSS variables

class dataikuapi.dss.streaming_endpoint.HTTPSSEStreamingEndpointSettings(streaming_endpoint, settings)#

Settings for a HTTP-SSE streaming endpoint.

This class inherits from DSSStreamingEndpointSettings.

Important

Do not instantiate this class directly, use DSSStreamingEndpoint.get_settings()

Use save() to save your changes

set_url(url)#

Change the URL of the endpoint.

Parameters:

url (string) – url to connect to
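
A minimal sketch; the URL is hypothetical:

# point an HTTP-SSE endpoint at another stream
settings = streaming_endpoint.get_settings()
settings.set_url("https://example.com/events")
settings.save()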

class dataikuapi.dss.streaming_endpoint.DSSManagedStreamingEndpointCreationHelper(project, streaming_endpoint_name, streaming_endpoint_type)#

Utility class to help create a new streaming endpoint.

Note

Do not instantiate directly, use DSSProject.new_managed_streaming_endpoint() instead

Important

Only Kafka and SQS endpoints support managed streaming endpoints

get_creation_settings()#

Get the settings for the creation of the new managed streaming endpoint.

Note

Modifying the values in the creation settings directly is discouraged; use with_store_into() instead.

Returns:

the settings as a dict

Return type:

dict

with_store_into(connection, format_option_id=None)#

Set the DSS connection underlying the new streaming endpoint.

Parameters:
  • connection (string) – name of the connection to store into

  • format_option_id (string) –

    (optional) identifier of a serialization format option. For Kafka endpoints, possible values are :

    • json : messages are JSON strings

    • single : messages are handled as a single typed field

    • avro : messages are Kafka-Avro messages (i.e. an Avro message prefixed with a field indicating the Avro schema version in the Kafka schema registry)

    For SQS endpoints, possible values are

    • json : messages are JSON strings

    • string : messages are raw strings

Returns:

self

create(overwrite=False)#

Execute the creation of the streaming endpoint according to the selected options

Parameters:

overwrite (boolean) – If the streaming endpoint being created already exists, delete it first

Returns:

an object corresponding to the newly created streaming endpoint

Return type:

DSSStreamingEndpoint
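
A minimal sketch of the full creation flow; the endpoint and connection names are hypothetical, and the argument order of new_managed_streaming_endpoint() (name, then type) is an assumption:

# create a managed Kafka endpoint storing JSON messages
helper = project.new_managed_streaming_endpoint("my_endpoint_name", "kafka")
helper.with_store_into("my_kafka_connection", format_option_id="json")
endpoint = helper.create(overwrite=False)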

already_exists()#

Whether the desired name for the new streaming endpoint is already used by another streaming endpoint

Return type:

boolean

class dataikuapi.dss.continuousactivity.DSSContinuousActivity(client, project_key, recipe_id)#

A handle to interact with the execution of a continuous recipe on the DSS instance.

Important

Do not create this class directly, instead use dataikuapi.dss.project.DSSProject.get_continuous_activity()

start(loop_params={})#

Start the continuous activity

Parameters:

loop_params (dict) –

controls how the recipe is restarted after a failure, and the delay before restarting. The default is to restart indefinitely without delay. Fields are (see the sketch after the list):

  • abortAfterCrashes : once this number of failures is reached, the recipe isn’t restarted anymore. Use -1 for no limit on the number of failures

  • initialRestartDelayMS : initial delay to wait before restarting after a failure

  • restartDelayIncMS : increase to the delay before restarting upon subsequent failures

  • maxRestartDelayMS : max delay before restarting after failure
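
A minimal sketch, using the fields listed above with illustrative values:

# restart at most 5 times, with a delay growing from 1s up to 10s
activity = project.get_continuous_activity("my_continuous_recipe")
activity.start(loop_params={
    "abortAfterCrashes": 5,
    "initialRestartDelayMS": 1000,
    "restartDelayIncMS": 1000,
    "maxRestartDelayMS": 10000,
})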

stop()#

Stop the continuous activity.

get_status()#

Get the current status of the continuous activity.

Usage example:

# stop a continuous activity via its future
from dataikuapi.dss.future import DSSFuture
activity = project.get_continuous_activity("my_continuous_recipe")
status = activity.get_status()
future = DSSFuture(activity.client, status["mainLoopState"]["futureId"], status["mainLoopState"]["futureInfo"])
future.abort()

# this is equivalent to simply stop()
activity.stop()
Returns:

the state of the continuous activity. The state as requested by the user is stored in a desiredState field (values: ‘STARTED’ or ‘STOPPED’), and the current effective state in a mainLoopState sub-dict.

Return type:

dict

get_recipe()#

Get a handle on the associated recipe.

Return type:

dataikuapi.dss.recipe.DSSRecipe