Using Dask DataFrames in Dataiku#

This article introduces distributed data manipulation in Dataiku using Dask DataFrames.
Specifically, it illustrates how to:

  1. Deploy a Dask Cluster on a Dataiku-managed Elastic AI cluster.

  2. Natively read a Dataiku dataset as a Dask DataFrame (i.e., without an intermediate step as a Pandas DataFrame).

  3. Perform distributed, parallelized computations on the Dask DataFrame.

Pre-requisites#

Note

The Python code environment used in this tutorial only needs to be built locally, i.e., containerized execution support is not required.
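
The example code in this article imports dask.distributed and dask.dataframe and reads Parquet files from S3, which additionally requires a Parquet engine (such as pyarrow) and s3fs; the exact packages and versions you pin in the code environment are up to you. As a quick sanity check, you can run something like the following from a notebook using that code environment:

# Minimal sanity check that the code environment provides the libraries
# the example code in this article relies on.
import dask
import dask.dataframe  # noqa: F401  (Dask DataFrame API)
import distributed     # provides dask.distributed.Client and PipInstall
import pyarrow         # Parquet engine (fastparquet is an alternative)
import s3fs            # lets Dask read s3:// paths

print("dask", dask.__version__, "| distributed", distributed.__version__)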

Deploying a Dataiku-managed Elastic AI cluster#

Before deploying the Dask Cluster, a Dataiku administrator must deploy a Dataiku-managed Elastic AI cluster.

A few things to keep in mind about the Elastic AI Cluster:

  • Dataiku recommends creating a dedicated nodegroup to host the Dask Cluster; the nodegroup can be static or auto-scale.

  • Dataiku recommends that each cluster node have at least 32 GB RAM for typical data processing workloads.

Deploying a Dask Cluster#

The procedure to deploy a Dask Cluster onto a Dataiku-managed Elastic AI cluster is as follows.

Important

Note that this procedure requires Dataiku Administrator privileges.

  1. Install this plugin.

  2. Navigate to the “Macros” section of a Dataiku project, search for the “Dask” macros, and select the “Start Dask Cluster” macro.

    Figure 1.1 – Macros for Dask#

  3. Fill in the macro fields, keeping in mind the following information:

    • Dask image tag: this should be a valid tag from Dask’s Docker Registry; the Python version of the image tag should match the Python version of the Dataiku code environment created in the pre-requisites section.

    • The CPU / memory request should be at least 1 CPU / 2GB RAM per worker.

    • The total worker resources should fit within the Elastic AI Cluster’s default nodegroup.

    Run the macro.

    Figure 1.2 – Start macro#

  4. Once the “Start Dask Cluster” macro is complete, run the “Inspect Dask Cluster” macro; this retrieves the Dask Cluster deployment status and provides the Dask Cluster endpoint once the cluster is ready.

    Figure 1.3 – Inspect Dask Cluster#

  5. Optionally, set up a port forwarding connection to the Dask Dashboard using the “Dask Dashboard Port-Forward” macro.

    Figure 1.4 – Dask Dashboard Port-Forward#

    The Dask Dashboard will then be accessible at http://<Dataiku instance domain>:8787 while the macro is running.
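
    If you prefer checking from code, once a dask.distributed Client is attached to the cluster (as in the example later in this article), the client reports the dashboard address directly; a small sketch, with <Dask Endpoint> standing for the endpoint returned by the "Inspect Dask Cluster" macro:

    from dask.distributed import Client

    # <Dask Endpoint> is the scheduler address returned by the "Inspect Dask Cluster" macro.
    client = Client("<Dask Endpoint>")
    # Dashboard URL as exposed by the scheduler; network access (e.g., the port-forward above) may still be required.
    print(client.dashboard_link)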

If everything goes well, you should end up with something similar to:

Figure 1: Dask cluster is up and running#

For more information on this procedure and how to further customize the deployment of a Dask Cluster on Kubernetes (e.g., writing custom Dask Operator plugins), please refer to the relevant Dask Kubernetes documentation.
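
Outside of the Dataiku macros (which remain the supported path on a Dataiku-managed cluster), the Dask Kubernetes operator also exposes a Python API. The sketch below is illustrative only: it assumes the Dask operator is installed on the Kubernetes cluster, that your environment has credentials to reach it, and that the cluster name, image tag, and resource values are placeholders you would adapt.

from dask_kubernetes.operator import KubeCluster

# Sketch only: requires the Dask Kubernetes operator on the cluster and kubeconfig access to it.
cluster = KubeCluster(
    name="dask-example",                 # placeholder cluster name
    image="ghcr.io/dask/dask:latest",    # pick a tag whose Python version matches your code environment
    n_workers=3,
    resources={"requests": {"cpu": "1", "memory": "2Gi"}, "limits": {"memory": "4Gi"}},
)

cluster.scale(5)   # resize the worker group
cluster.close()    # tear the cluster down when finished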

Read and Manipulate a Dataiku Dataset as a Dask DataFrame#

Now that the Dask cluster is running, users can manipulate Dataiku datasets in a distributed, parallelized fashion using Dask DataFrames. All that is required is the Dask cluster endpoint determined above.

First, you will need to create a Project Library called dku_dask (under the python folder) containing a single file, utils.py, with the following content:

utils.py
import dataiku

def extract_packages_list_from_pyenv(env_name):
    """
    Extracts python package list (requested) from an environment.
    Returns a list of python packages.
    """   
    client = dataiku.api_client()
    pyenv = client.get_code_env(env_lang="PYTHON", env_name=env_name)
    pyenv_settings = pyenv.get_settings()
    
    packages = pyenv_settings.settings["specPackageList"]
    packages_list = packages.split("\n")
    
    return packages_list

def s3_credentials_from_dataset(dataset_name):
    """
    Retireves S3 credentials (access key, secret key, session token) from S3 dataset.
    Assumues dataset is stored on S3 connection, with AssumeRole as auth method.
    
    TODO: actually check this, and don't just fail.
    """
    client = dataiku.api_client()
    
    ds = dataiku.Dataset(dataset_name)  # connection name resolved via the internal dataiku API
    ds_connection_name = ds.get_config()["params"]["connection"]

    ds_connection = client.get_connection(ds_connection_name)
    ds_connection_info = ds_connection.get_info()
    access_key = ds_connection_info["resolvedAWSCredential"]["accessKey"]
    secret_key = ds_connection_info["resolvedAWSCredential"]["secretKey"]
    session_token = ds_connection_info["resolvedAWSCredential"]["sessionToken"]
    
    return access_key, secret_key, session_token

def s3_path_from_dataset(dataset_name):
    """
    Retrieves full S3 path (i.e. s3:// + bucket name + path in bucjet) for dataset.
    Assumues dataset is stored on S3 connection.
    
    TODO: actually check this, and don't just fail.
    """
    ds = dataiku.Dataset(dataset_name)  # resolved S3 path retrieved via the internal dataiku API
    ds_path = ds.get_location_info()["info"]["path"]
    
    return ds_path
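
As a quick check, these helpers can be called directly from a notebook in the same project; the dataset name below is a placeholder for any S3 dataset in your Flow:

# Quick check of the project library helpers (run from a notebook in the same project).
from dku_dask.utils import s3_credentials_from_dataset, s3_path_from_dataset

print(s3_path_from_dataset("<your S3 dataset name>"))  # e.g. s3://<bucket>/<path in bucket>
access_key, secret_key, session_token = s3_credentials_from_dataset("<your S3 dataset name>")
print("Resolved an access key:", bool(access_key))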

Second, create the dataset in the Flow that will be read as a Dask DataFrame. This example (and the code in the dku_dask project library) assumes that this dataset is:

  • on S3

  • in the parquet format

  • on a Dataiku S3 connection that uses STS with AssumeRole as its authentication method, with “Details readable by” access granted to the user running the code

Note

Modifying this example code to work with different connection and/or authentication types should be straightforward. For help, see the Dask DataFrame API documentation.
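
For instance, for a hypothetical S3 connection that uses a plain access key / secret key pair (no STS session token), the storage_options dictionary passed to Dask in the example below would simply omit the token; the placeholders here are illustrative:

import dask.dataframe as dd

# Hypothetical variant: static AWS credentials, no STS session token.
storage_options = {
    "key": "<access key>",
    "secret": "<secret key>",
}
df = dd.read_parquet("s3://<bucket>/<path>", aggregate_files=True, storage_options=storage_options)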

Finally, the following code illustrates how to load a Dataiku dataset as a Dask DataFrame, apply a groupby transformation to the DataFrame (distributed over the cluster), and then collect the results. In this case, the avocado_transactions dataset is a slightly processed version of this Kaggle dataset.

import dataiku
from dku_dask.utils import (
    extract_packages_list_from_pyenv, 
    s3_credentials_from_dataset,
    s3_path_from_dataset
)

from dask.distributed import Client, PipInstall
import dask.dataframe as dd

# Attach to the Dask cluster
# Note: <Dask Endpoint> is the Dask cluster endpoint determined during the Dask cluster setup steps
# (via the "Inspect Dask Cluster" macro).
dask_client = Client("<Dask Endpoint>")

# Install missing packages on the cluster 
# Note: <Dataiku code env name> is the name of the code environment created during the pre-requisites steps.
packages = extract_packages_list_from_pyenv("<Dataiku code env name>")
plugin = PipInstall(packages=packages, restart_workers=True)
dask_client.register_plugin(plugin)

# Retrieve Dataiku dataset as Dask DataFrame
## Get S3 credentials
access_key, secret_key, session_token = s3_credentials_from_dataset("avocado_transactions")
storage_options = {
    "key": access_key,
    "secret": secret_key,
    "token": session_token
}

## Get dataset S3 path
dataset_s3_path = s3_path_from_dataset("avocado_transactions")

## Read dataset as Dask DataFrame
df = dd.read_parquet(dataset_s3_path, aggregate_files=True, storage_options=storage_options)

# Perform a groupby aggregation on the DataFrame (built lazily; executed in parallel on the cluster)
result = df.groupby(["type"]).mean()
result.compute()  # triggers the distributed computation and returns a pandas DataFrame

If all goes well, you should end up with something similar to (assuming you’ve run the code in a notebook):

Figure 2: Groupby result#
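
A natural next step, not shown above, is to persist the collected result back into the Flow. Below is a minimal sketch, assuming a writable managed dataset named avocado_groupby (a hypothetical name) exists as the output of the recipe running this code:

import dataiku

# result.compute() returns a regular pandas DataFrame on the driver;
# reset_index() turns the "type" group key back into a column.
output_df = result.compute().reset_index()

# "avocado_groupby" is a hypothetical output dataset: it must already exist in the Flow
# and be writable from this code context (e.g., as the output of a Python recipe).
dataiku.Dataset("avocado_groupby").write_with_schema(output_df)

# Optionally release the connection to the Dask cluster when done.
dask_client.close()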

Conclusion#

You now know how to set up a Dask cluster to achieve distributed, parallelized data manipulation using Dask DataFrames. Adapting this tutorial to your specific ETL needs should be easy.

Here is the complete code for this tutorial:

code.py
import dataiku
from dku_dask.utils import (
    extract_packages_list_from_pyenv, 
    s3_credentials_from_dataset,
    s3_path_from_dataset
)

from dask.distributed import Client, PipInstall
import dask.dataframe as dd

# Attach to the Dask cluster
# Note: <Dask Endpoint> is the Dask cluster endpoint determined during the Dask cluster setup steps
# (via the "Inspect Dask Cluster" macro).
dask_client = Client("<Dask Endpoint>")

# Install missing packages on the cluster 
# Note: <Dataiku code env name> is the name of the code environment created during the pre-requisites steps.
packages = extract_packages_list_from_pyenv("<Dataiku code env name>")
plugin = PipInstall(packages=packages, restart_workers=True)
dask_client.register_plugin(plugin)

# Retrieve Dataiku dataset as Dask DataFrame
## Get S3 credentials
access_key, secret_key, session_token = s3_credentials_from_dataset("avocado_transactions")
storage_options = {
    "key": access_key,
    "secret": secret_key,
    "token": session_token
}

## Get dataset S3 path
dataset_s3_path = s3_path_from_dataset("avocado_transactions")

## Read dataset as Dask DataFrame
df = dd.read_parquet(dataset_s3_path, aggregate_files=True, storage_options=storage_options)

# Perform a groupby aggregation on the DataFrame (built lazily; executed in parallel on the cluster)
result = df.groupby(["type"]).mean()
result.compute()  # triggers the distributed computation and returns a pandas DataFrame