Flow creation and management#

Programmatically building a Flow#

The flow, including datasets, recipes, …, can be fully created and managed programmatically.

Datasets can be created and managed using the methods detailed in Datasets (other operations).

Recipes can be created using the new_recipe() method. This follows a builder pattern: new_recipe() returns you a recipe creator object, on which you add settings, and then call the create() method to actually create the recipe object.

The builder objects only reproduce the functionality available in the recipe creation modals of the UI, so for finer control over the recipe's setup, it is often necessary to retrieve its settings after creation, modify them, and save them again.
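
All the snippets below assume a project handle obtained through the Dataiku Python API client. A minimal setup sketch is shown here; the instance URL, API key and project key are placeholders to replace with your own values:

import dataikuapi

# Placeholder URL, API key and project key: replace with your own values
client = dataikuapi.DSSClient("https://dss.example.com:11200", "my_api_key_secret")
project = client.get_project("MYPROJECT")

# From code running inside DSS, the client can be obtained directly:
# import dataiku
# client = dataiku.api_client()
# project = client.get_default_project()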

Creating a Python recipe#

builder = project.new_recipe("python")

# Set the input
builder.with_input("myinputdataset")
# Create a new managed dataset for the output in the filesystem_managed connection
builder.with_new_output_dataset("grouped_dataset", "filesystem_managed")

# Set the code - builder is a PythonRecipeCreator, and has a ``with_script`` method
builder.with_script("""
import dataiku
from dataiku import recipe
input_dataset = recipe.get_inputs_as_datasets()[0]
output_dataset = recipe.get_outputs_as_datasets()[0]

df = input_dataset.get_dataframe()
df = df.groupby("something").count()
output_dataset.write_with_schema(df)
""")

recipe = builder.create()

# recipe is now a ``DSSRecipe`` representing the new recipe, and we can now run it
job = recipe.run()

Creating a Sync recipe#

builder = project.new_recipe("sync")
builder = builder.with_input("input_dataset_name")
builder = builder.with_new_output("output_dataset_name", "filesystem_managed")

recipe = builder.create()
job = recipe.run()

Creating and modifying a grouping recipe#

Recipe creation mostly handles setting up the inputs and outputs of the recipe, so most of its configuration has to be done by retrieving its settings, altering and saving them, and then applying schema changes to the output.

builder = project.new_recipe("grouping")
builder.with_input("dataset_to_group_on")
# Create a new managed dataset for the output in the "filesystem_managed" connection
builder.with_new_output("grouped_dataset", "filesystem_managed")
builder.with_group_key("column")
recipe = builder.create()

# After the recipe is created, you can edit its settings
recipe_settings = recipe.get_settings()
recipe_settings.set_column_aggregations("myvaluecolumn", sum=True)
recipe_settings.save()

# And you may need to apply new schemas to the outputs
# This will add the myvaluecolumn_sum to the "grouped_dataset" dataset
recipe.compute_schema_updates().apply()

# It should be noted that running a recipe is equivalent to building its output(s)
job = recipe.run()
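
Since running the recipe is equivalent to building its output, the same job can also be started from the output dataset handle; a minimal sketch using the "grouped_dataset" output created above:

# Equivalent: start a build of the recipe's output dataset
output_dataset = project.get_dataset("grouped_dataset")
job = output_dataset.build()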

A complete example#

This example shows a complete chain:

  • Creating an external dataset

  • Automatically detecting the settings of the dataset (see Datasets (other operations) for details)

  • Creating a prepare recipe to clean up the dataset

  • Then chaining a grouping recipe, setting an aggregation on it

  • Running the entire chain

dataset = project.create_sql_table_dataset("mydataset", "PostgreSQL", "my_sql_connection", "mytable", "myschema")

dataset_settings = dataset.autodetect_settings()
dataset_settings.save()

# As a shortcut, we can call new_recipe on the DSSDataset object. This way, we don't need to call "with_input"

prepare_builder = dataset.new_recipe("prepare")
prepare_builder.with_new_output("mydataset_cleaned", "filesystem_managed")

prepare_recipe = prepare_builder.create()

# Add a step to clean values in "nb_colis" that are not valid Double
prepare_settings = prepare_recipe.get_settings()
prepare_settings.add_processor_step("FilterOnBadType", {
    "action":"REMOVE_ROW","booleanMode":"AND",
    "appliesTo":"SINGLE_COLUMN",
    "columns":["nb_colis"],"type":"Double"})
prepare_settings.save()

prepare_recipe.compute_schema_updates().apply()
prepare_recipe.run()

# Grouping recipe

grouping_builder = project.new_recipe("grouping")
grouping_builder.with_input("mydataset_cleaned")
grouping_builder.with_new_output("mydataset_cleaned_grouped", "filesystem_managed")
grouping_builder.with_group_key("week")
grouping_recipe = grouping_builder.create()

grouping_recipe_settings = grouping_recipe.get_settings()
grouping_recipe_settings.set_column_aggregations("month", min=True)
grouping_recipe_settings.save()

grouping_recipe.compute_schema_updates().apply()
grouping_recipe.run()

Working with flow zones#

Listing and getting zones#

flow = project.get_flow()

# List zones
for zone in flow.list_zones():
    print("Zone id=%s name=%s" % (zone.id, zone.name))

    print("Zone has the following items:")
    for item in zone.items:
        print("Zone item: %s" % item)

# Get a zone by id - beware, id not name
zone = flow.get_zone("21344ZsQZ")

# Get the "Default" zone
zone = flow.get_default_zone()

Creating a zone and adding items in it#

flow = project.get_flow()
zone = flow.create_zone("zone1")

# First way of adding an item to a zone
dataset = project.get_dataset("mydataset")
zone.add_item(dataset)

# Second way of adding an item to a zone
dataset = project.get_dataset("mydataset")
dataset.move_to_zone(zone)

# Third way of adding an item to a zone: look up the zone id from its name
zones = flow.list_zones()
zone_name = "zone1"
zone_id = [z.id for z in zones if z.name == zone_name][0]

dataset = project.get_dataset("mydataset")
dataset.move_to_zone(zone_id)

Changing the settings of a zone#

flow = project.get_flow()
zone = flow.get_zone("21344ZsQZ")

settings = zone.get_settings()
settings.name = "New name"

settings.save()

Getting the zone of a dataset#

dataset = project.get_dataset("mydataset")

zone = dataset.get_zone()
print("Dataset is in zone %s" % zone.id)

Schema propagation#

When the schema of an input dataset is modified, or when the settings of a recipe are modified, you need to propagate this schema change across the flow.

This can be done from the UI, but it can also be automated through the API.

flow = project.get_flow()

# A propagation always starts from a source dataset and will move from left to right till the end of the Flow

propagation = flow.new_schema_propagation("sourcedataset")

future = propagation.start()
future.wait_for_result()

There are many options for propagation; see dataikuapi.dss.flow.DSSSchemaPropagationRunBuilder for details.
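
For instance, options can be set on the propagation object before starting it. The sketch below assumes two of the option setters, set_auto_rebuild() and mark_recipe_as_ok(); check the DSSSchemaPropagationRunBuilder reference for the exact options and their signatures:

propagation = flow.new_schema_propagation("sourcedataset")

# Assumed option setters, see DSSSchemaPropagationRunBuilder for details
# Automatically rebuild the datasets impacted by the schema changes
propagation.set_auto_rebuild(True)
# Treat this recipe as already consistent so propagation leaves it alone (assumed semantics)
propagation.mark_recipe_as_ok("myrecipe")

future = propagation.start()
future.wait_for_result()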

Exporting a flow documentation#

This sample shows how to generate and download a flow documentation from a template.

See Flow Document Generator for more information.

# project is a DSSProject object

flow = project.get_flow()

# Launch the flow document generation, either with the default template
# (by calling generate_documentation() without arguments), or with a template
# stored in a managed folder (by passing the folder id and the path to the template in that folder)
future = flow.generate_documentation(FOLDER_ID, "path/my_template.docx")

# Alternatively, use a custom uploaded template file
with open("my_template.docx", "rb") as f:
    future = flow.generate_documentation_from_custom_template(f)

# Wait for the generation to finish, retrieve the result and download the generated
# flow documentation to the specified file
result = future.wait_for_result()
export_id = result["exportId"]

flow.download_documentation_to_file(export_id, "path/my_flow_documentation.docx")

Detailed examples#

This section contains more advanced examples on Flow-based operations.

Delete orphaned Datasets#

It can happen that, after some operations on a Flow, one or more Datasets end up not linked to any Recipe and thus disconnected from the Flow branches. To remove those Datasets from the Flow programmatically, you can list the nodes that have neither predecessor nor successor in the graph, using the following function:


def delete_orphaned_datasets(project, drop_data=False, dry_run=True):
    """Delete datasets that are not linked to any recipe.

    :param project: handle on the project (dataikuapi.dss.project.DSSProject)
    :param bool drop_data: if True, also drop the underlying data
    :param bool dry_run: if True, only print what would be deleted
    """
    flow = project.get_flow()
    graph = flow.get_graph()
    cpt = 0
    for name, props in graph.nodes.items():
        # A node with neither predecessors nor successors is not linked to any recipe
        if not props["predecessors"] and not props["successors"]:
            print(f"- Deleting {name}...")
            ds = project.get_dataset(name)
            if not dry_run:
                ds.delete(drop_data=drop_data)
                cpt += 1
            else:
                print("Dry run: nothing was deleted.")
    print(f"{cpt} datasets deleted.")


Attention

Note that the function has additional flags whose default values are set to prevent accidental data deletion. Even so, we recommend remaining extra cautious when clearing or deleting Datasets.
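
With the caution above in mind, a typical usage is to do a dry run first, review the output, and only then re-run the function with deletion enabled:

# First pass: only list the datasets that would be deleted
delete_orphaned_datasets(project)

# Second pass: actually delete them and drop the underlying data
delete_orphaned_datasets(project, drop_data=True, dry_run=False)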

Reference documentation#

Classes#

dataikuapi.dss.recipe.CodeRecipeCreator(...)

Create a recipe running a script.

dataikuapi.dss.dataset.DSSDataset(client, ...)

A dataset on the DSS instance.

dataikuapi.dss.flow.DSSFlowZone(flow, data)

A zone in the Flow.

dataikuapi.dss.flow.DSSFlowZoneSettings(zone)

The settings of a flow zone.

dataikuapi.dss.future.DSSFuture(client, job_id)

A future represents a long-running task on a DSS instance.

dataikuapi.dss.job.DSSJob(client, ...)

A job on the DSS instance.

dataikuapi.dss.project.DSSProject(client, ...)

A handle to interact with a project on the DSS instance.

dataikuapi.dss.flow.DSSProjectFlow(client, ...)

dataikuapi.dss.flow.DSSProjectFlowGraph(...)

dataikuapi.dss.recipe.DSSRecipe(client, ...)

A handle to an existing recipe on the DSS instance.

dataikuapi.dss.recipe.DSSRecipeCreator(type, ...)

Helper to create new recipes.

dataikuapi.dss.recipe.DSSRecipeSettings(...)

Settings of a recipe.

dataikuapi.dss.flow.DSSSchemaPropagationRunBuilder(...)

dataikuapi.dss.recipe.GroupingRecipeCreator(...)

Create a Group recipe.

dataikuapi.dss.recipe.GroupingRecipeSettings(...)

Settings of a grouping recipe.

dataikuapi.dss.recipe.PrepareRecipeSettings(...)

Settings of a Prepare recipe.

dataikuapi.dss.recipe.SingleOutputRecipeCreator(...)

Create a recipe that has a single output.

Functions#

add_processor_step(type, params)

Add a step in the script.

add_item(obj)

Adds an item to this zone.

autodetect_settings([infer_storage_types])

Detect appropriate settings for this dataset using the Dataiku detection engine.

compute_schema_updates()

Computes which updates are required to the outputs of this recipe.

create()

Creates the new recipe in the project, and returns a handle to interact with it.

create_sql_table_dataset(dataset_name, type, ...)

Create a new SQL table dataset in the project, and return a handle to interact with it.

download_documentation_to_file(export_id, path)

Download a flow documentation into the given output file.

generate_documentation([folder_id, path])

Start the flow document generation from a template docx file in a managed folder, or from the default template if no folder id and path are specified.

generate_documentation_from_custom_template(fp)

Start the flow document generation from a docx template (as a file object).

get_default_zone()

Returns the default zone of the Flow.

get_graph()

Get the flow graph.

get_items_in_traversal_order([as_type])

Get the list of nodes in left to right order.

get_settings()

Get the settings of the recipe, as a DSSRecipeSettings or one of its subclasses.

get_source_computables([as_type])

Get the source computables of the Flow. as_type controls how they are returned: "dict" or "object" (defaults to "dict").

get_zone(id)

Gets a single Flow zone by id.

id

items

The list of items explicitly belonging to this zone.

list_zones()

Lists all zones in the Flow.

move_to_zone(zone)

Move this object to a flow zone.

new_recipe(type[, recipe_name])

Start the creation of a new recipe taking this dataset as input.

new_recipe(type[, name])

Initializes the creation of a new recipe.

new_schema_propagation(dataset_name)

Start an automatic schema propagation from a dataset.

replace_input_computable(current_ref, new_ref)

This method replaces all references to a "computable" (Dataset, Managed Folder or Saved Model) as input of recipes in the whole Flow by a reference to another computable.

run([job_type, partitions, wait, no_fail])

Starts a new job to run this recipe and waits for it to complete.

save()

Save back the recipe in DSS.

set_column_aggregations(column[, type, min, ...])

Set the basic aggregations on a column.

start()

Starts the actual propagation.

wait_for_result()

Waits for the completion of the long-running task, and returns its result.

with_group_key(group_key)

Set a column as the first grouping key.

with_input(input_id[, project_key, role])

Add an existing object as input to the recipe-to-be-created.

with_new_output(name, connection[, type, ...])

Create a new dataset or managed folder as output to the recipe-to-be-created.

with_new_output_dataset(name, connection[, ...])

Create a new managed dataset as output to the recipe-to-be-created.

with_script(script)

Set the code of the recipe-to-be-created.