Visual Machine learning#

Through the public API, the Python client lets you automate every aspect of the lifecycle of machine learning models:

  • Creating a visual analysis and ML task

  • Tuning settings

  • Training models

  • Inspecting model details and results

  • Deploying saved models to Flow and retraining them

Concepts#

In DSS, you train models as part of a visual analysis. A visual analysis is made of a preparation script, and one or several ML Tasks.

An ML Task is an individual section in which you train models. An ML Task is either a prediction of a single target variable, or a clustering.

The ML API allows you to manipulate ML Tasks, and use them to train models, inspect their details, and deploy them to the Flow.

Once deployed to the Flow, the Saved model can be retrained by the usual build mechanism of DSS.

An ML Task has settings, which control:

  • Which features are active

  • The preprocessing settings for each feature

  • Which algorithms are active

  • The hyperparameter settings (including grid searched hyperparameters) for each algorithm

  • The settings of the grid search

  • Train/Test splitting settings

  • Feature selection and generation settings

Usage samples#

The whole cycle#

This example creates a prediction task, enables an algorithm, trains it, inspects the resulting models, and deploys one of them to the Flow.

# client is a DSS API client

p = client.get_project("MYPROJECT")

# Create a new ML Task to predict the variable "target" from "trainset"
mltask = p.create_prediction_ml_task(
    input_dataset="trainset",
    target_variable="target",
    ml_backend_type='PY_MEMORY', # ML backend to use
    guess_policy='DEFAULT' # Template to use for setting default parameters
)

# Wait for the ML task to be ready
mltask.wait_guess_complete()

# Obtain settings, enable GBT, save settings
settings = mltask.get_settings()
settings.set_algorithm_enabled("GBT_CLASSIFICATION", True)
settings.save()

# Start train and wait for it to be complete
mltask.start_train()
mltask.wait_train_complete()

# Get the identifiers of the trained models
# There will be 3 of them because Logistic regression and Random forest were default enabled
ids = mltask.get_trained_models_ids()

for model_id in ids:
    details = mltask.get_trained_model_details(model_id)
    algorithm = details.get_modeling_settings()["algorithm"]
    auc = details.get_performance_metrics().get("auc", None)

    print("Algorithm=%s AUC=%s" % (algorithm, auc))

# Let's deploy the first model
model_to_deploy = ids[0]

ret = mltask.deploy_to_flow(model_to_deploy, "my_model", "trainset")

print("Deployed to saved model id = %s train recipe = %s" % (ret["savedModelId"], ret["trainRecipeName"]))

The methods for creating prediction and clustering ML tasks are defined at create_prediction_ml_task() and create_clustering_ml_task().

Obtaining a handle to an existing ML Task#

When you create these ML tasks, the returned dataikuapi.dss.ml.DSSMLTask object contains two fields, analysis_id and mltask_id, that can later be used to retrieve the same DSSMLTask object.

# client is a DSS API client

p = client.get_project("MYPROJECT")
mltask = p.get_ml_task(analysis_id, mltask_id)
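
To retrieve the task in a later session, the two identifiers have to be stored somewhere. The following is a minimal sketch that keeps them in a small JSON file. It relies only on the analysis_id and mltask_id fields and on get_ml_task() described above; the helper names save_mltask_ref and load_mltask and the file layout are hypothetical and not part of the DSS API.

```python
import json


def save_mltask_ref(mltask, path):
    """Write the two identifiers of an ML Task to a JSON file.

    Hypothetical helper: `mltask` only needs to expose the
    analysis_id and mltask_id fields.
    """
    ref = {"analysis_id": mltask.analysis_id, "mltask_id": mltask.mltask_id}
    with open(path, "w") as f:
        json.dump(ref, f)
    return ref


def load_mltask(project, path):
    """Read the identifiers back and ask the project for the ML Task handle."""
    with open(path) as f:
        ref = json.load(f)
    return project.get_ml_task(ref["analysis_id"], ref["mltask_id"])
```

With a real project handle, load_mltask(p, "mltask_ref.json") would return the same kind of object as the p.get_ml_task(...) call above.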

Tuning feature preprocessing#

Enabling and disabling features#

# mltask is a DSSMLTask object

settings = mltask.get_settings()

settings.reject_feature("name_of_not_useful_feature")
settings.use_feature("name_of_useful_feature")

settings.save()

Changing advanced parameters for a feature#

# mltask is a DSSMLTask object

settings = mltask.get_settings()

# Use impact coding rather than dummy-coding
fs = settings.get_feature_preprocessing("myfeature")
fs["category_handling"] = "IMPACT"

# Impute missing with most frequent value
fs["missing_handling"] = "IMPUTE"
fs["missing_impute_with"] = "MODE"

settings.save()
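
When the same preprocessing applies to several features, the per-feature edits can be wrapped in a loop. The following is a minimal sketch, assuming that get_feature_preprocessing() returns a dict that is modified in place, as in the sample above. The helper name impute_missing_with_mode is hypothetical and not part of the DSS API.

```python
def impute_missing_with_mode(settings, feature_names):
    """Set mode imputation for missing values on each listed feature.

    `settings` is expected to behave like the object returned by
    mltask.get_settings(). Hypothetical helper, for illustration only.
    """
    for name in feature_names:
        fs = settings.get_feature_preprocessing(name)
        fs["missing_handling"] = "IMPUTE"
        fs["missing_impute_with"] = "MODE"
    # Save once, after all features have been updated
    settings.save()
```

Saving once at the end keeps all changes in a single settings update.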

Tuning algorithms#

Exporting a model documentation#

This sample shows how to generate and download model documentation from a template.

See Model Document Generator for more information.

# mltask is a DSSMLTask object
# model_id is the identifier of a trained model, e.g. one of mltask.get_trained_models_ids()

details = mltask.get_trained_model_details(model_id)

# Launch the model document generation by either
# using the default template for this model by calling without argument
# or specifying a managed folder id and the path to the template to use in that folder
future = details.generate_documentation(FOLDER_ID, "path/my_template.docx")

# Alternatively, use a custom uploaded template file
with open("my_template.docx", "rb") as f:
    future = details.generate_documentation_from_custom_template(f)

# Wait for the generation to finish, retrieve the result and download the generated
# model documentation to the specified file
result = future.wait_for_result()
export_id = result["exportId"]

details.download_documentation_to_file(export_id, "path/my_model_documentation.docx")

Using a model in a Python recipe or notebook#

Once a Saved Model has been deployed to the Flow, the usual way to use it is through scoring recipes.

However, you can also use the dataiku.Model class in a Python recipe or notebook to directly score records.

This method has a number of limitations:

  • It cannot be used together with containerized execution

  • It is not compatible with Partitioned models

import dataiku

m = dataiku.Model(my_model_id)
my_predictor = m.get_predictor()

predicted_dataframe = my_predictor.predict(input_dataframe)

Detailed examples#

This section contains more advanced examples using ML Tasks and Saved Models.

Deploy best MLTask model to the Flow#

After training several models in an ML Task, you can programmatically deploy the best one by creating a new Saved Model or updating an existing one. In the following example:

  • The deploy_with_best_model() function creates a new Saved Model with the input MLTask’s best model

  • The update_with_best_model() function updates an existing Saved Model with the MLTask’s best model.

Both functions rely on dataikuapi.dss.ml.DSSMLTask and dataikuapi.dss.savedmodel.DSSSavedModel.


def get_best_model(project, analysis_id, ml_task_id, metric):
    analysis = project.get_analysis(analysis_id)
    ml_task = analysis.get_ml_task(ml_task_id)
    trained_models = ml_task.get_trained_models_ids()
    trained_models_snippets = [ml_task.get_trained_model_snippet(m) for m in trained_models]
    # Assumes that for your metric, "higher is better"
    best_model_snippet = max(trained_models_snippets, key=lambda x:x[metric])
    best_model_id = best_model_snippet["fullModelId"]
    return ml_task, best_model_id


def deploy_with_best_model(project,
                           analysis_id,
                           ml_task_id,
                           metric,
                           saved_model_name,
                           training_dataset):
    """Create a new Saved Model in the Flow with the 'best model' of a MLTask.
    """

    ml_task, best_model_id = get_best_model(project,
                                            analysis_id,
                                            ml_task_id,
                                            metric)
    ml_task.deploy_to_flow(best_model_id,
                           saved_model_name,
                           training_dataset)

def update_with_best_model(project,
                           analysis_id,
                           ml_task_id,
                           metric,
                           saved_model_name,
                           activate=True):
    """Update an existing Saved Model in the Flow with the 'best model' 
       of a MLTask.
    """
    ml_task, best_model_id = get_best_model(project,
                                            analysis_id,
                                            ml_task_id,
                                            metric)
    training_recipe_name = f"train_{saved_model_name}"
    ml_task.redeploy_to_flow(model_id=best_model_id,
                             recipe_name=training_recipe_name,
                             activate=activate)
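
As noted in its comment, get_best_model() assumes that a higher metric value is better, and it would fail if a snippet lacked the metric. The following is a minimal sketch of a selection step that handles both cases, assuming snippets are plain dicts as returned by get_trained_model_snippet(). The helper name pick_best_snippet and its higher_is_better parameter are hypothetical.

```python
def pick_best_snippet(snippets, metric, higher_is_better=True):
    """Return the snippet with the best value for `metric`.

    Snippets where the metric is missing or None are ignored.
    Hypothetical helper, not part of the DSS API.
    """
    scored = [s for s in snippets if s.get(metric) is not None]
    if not scored:
        raise ValueError("No trained model reports metric %r" % metric)
    # Use max for metrics such as AUC, min for error-type metrics
    chooser = max if higher_is_better else min
    return chooser(scored, key=lambda s: s[metric])
```

In get_best_model(), the max(...) call could be replaced with pick_best_snippet(trained_models_snippets, metric), passing higher_is_better=False for metrics where lower values are better.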

List details of all Saved Models#

You can retrieve, for each Saved Model in a Project, the algorithm and performance metrics of its active version. In the following example, the explore_saved_models() function returns a list of Python dictionaries, one per Saved Model, with details on the currently active version.

def explore_saved_models(client=None, project_key=None):
    """List saved models of a project and give details on the active versions.
    Args:
        client: A handle on the target DSS instance
        project_key: A string representing the target project key
    Returns:
        smdl_list: A list of dicts, one per saved model, with the saved model id,
                   its version ids, and perf + algorithm for the active version.
    """
    smdl_list = []
    prj = client.get_project(project_key)
    smdl_ids = [x["id"] for x in prj.list_saved_models()]
    for smdl in smdl_ids:
        data = {"id": smdl}
        obj = prj.get_saved_model(smdl)
        data["version_ids"] = [m["id"] for m in obj.list_versions()]
        active_version_id = obj.get_active_version()["id"]
        active_version_details = obj.get_version_details(active_version_id)
        data["active_version"] = {"id": active_version_id,
                                  "algorithm": active_version_details.details["actualParams"]["resolved"]["algorithm"],
                                  "performance_metrics": active_version_details.get_performance_metrics()}
        smdl_list.append(data)
    return smdl_list

List version details of a given Saved Model#

This code snippet allows you to retrieve a summary of all versions of a given Saved Model (algorithm, hyperparameters, performance, features) using dataikuapi.dss.savedmodel.DSSSavedModel.

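import copy

def export_saved_model_metadata(project, saved_model_id):
    """Return a list of dicts, one per version of the Saved Model, summarizing
    algorithm, hyperparameters, test performance, lineage and training time.
    """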

    model = project.get_saved_model(saved_model_id)
    output = []
    for version in model.list_versions():
        version_details = model.get_version_details(version["id"])
        version_dict = {}
    
        # Retrieve algorithm and hyperparameters
        resolved = copy.deepcopy(version_details.get_actual_modeling_params()["resolved"])
        version_dict["algorithm"] = resolved.pop("algorithm")
        # Not a hyperparameter; drop it without failing if it is absent
        resolved.pop("skipExpensiveReports", None)
        for (key, hyperparameters) in resolved.items():
            for (hyperparameter_key, hyperparameter_value) in hyperparameters.items():
                version_dict["hyperparameter_%s" % hyperparameter_key] = hyperparameter_value
            
        # Retrieve test performance
        for (metric_key, metric_value) in version_details.get_performance_metrics().items():
            version_dict["test_perf_%s" % metric_key] = metric_value
        
        # Retrieve lineage
        version_dict["training_target_variable"] = version_details.details["coreParams"]["target_variable"]
        split_desc = version_details.details["splitDesc"]
        version_dict["training_train_rows"] = split_desc["trainRows"]
        version_dict["training_test_rows"] = split_desc["testRows"]
        training_used_features = []
        for (key, item) in version_details.get_preprocessing_settings()["per_feature"].items():
            if item["role"] == "INPUT":
                training_used_features.append(key)
        version_dict["training_used_features"] = ",".join(training_used_features)
        
        # Retrieve training time
        ti = version_details.get_train_info()
        version_dict["training_total_time"] = int((ti["endTime"] - ti["startTime"])/1000)
        version_dict["training_preprocessing_time"] = int(ti["preprocessingTime"]/1000)
        version_dict["training_training_time"] = int(ti["trainingTime"]/1000)
    
        output.append(version_dict)

    return output
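
The dicts returned by export_saved_model_metadata() may not all share the same keys, since the hyperparameter_* entries depend on the algorithm of each version. The following is a minimal sketch of writing such a list to a CSV file with the standard library, using the union of all keys as columns. The helper name write_versions_csv is hypothetical.

```python
import csv


def write_versions_csv(rows, path):
    """Write a list of dicts to CSV, one column per key seen in any row.

    Keys missing from a given row are written as empty cells.
    Hypothetical helper, for illustration only.
    """
    fieldnames = sorted({key for row in rows for key in row})
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames, restval="")
        writer.writeheader()
        writer.writerows(rows)
    return fieldnames
```

For example, write_versions_csv(export_saved_model_metadata(project, saved_model_id), "versions.csv") would produce one line per version of the Saved Model.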

Retrieve linear model coefficients#

You can retrieve the list of coefficient names and values from a Saved Model version for compatible algorithms.

def get_model_coefficients(project, saved_model_id, version_id=None):
    """
    Returns a dictionary with key="coefficient name" and value=coefficient.
    If version_id is None, the active version of the Saved Model is used.
    """

    model = project.get_saved_model(saved_model_id)
    if version_id is None:
        version_id = model.get_active_version().get('id')
    details = model.get_version_details(version_id)
    details_lr = details.details.get('iperf', {}).get('lmCoefficients', {})
    rescaled_coefs = details_lr.get('rescaledCoefs', [])
    variables = details_lr.get('variables',[])
    coef_dict = {var: coef for var, coef in zip(variables, rescaled_coefs)}
    if len(coef_dict)==0:
        print(f"Model {saved_model_id} and version {version_id} does not have coefficients")
    return coef_dict
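
A common next step is to rank the coefficients by magnitude. The following is a minimal sketch that works on the dictionary returned by get_model_coefficients(), assuming its values are numeric. The helper name top_coefficients and the default k=10 are hypothetical.

```python
def top_coefficients(coef_dict, k=10):
    """Return the k (name, coefficient) pairs with the largest absolute value.

    Hypothetical helper, for illustration only.
    """
    ranked = sorted(coef_dict.items(),
                    key=lambda item: abs(item[1]),
                    reverse=True)
    return ranked[:k]
```

Because the function above reads the rescaled coefficients, ranking by absolute value gives a rough ordering of feature influence, to be interpreted with the usual caution for linear models.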

Export model#

You can programmatically export the best version of a Saved Model as either a Python function or an MLflow model. In the following example, the get_best_classifier_version() function returns the best version id of the classifier.

  1. Pass that id to the dataikuapi.dss.savedmodel.DSSSavedModel.get_version_details() method to get a dataikuapi.dss.ml.DSSTrainedPredictionModelDetails handle.

  2. Then use either get_scoring_python() or get_scoring_mlflow() to download the model archive to a given file name, as a Python or MLflow export respectively.

import dataiku

PROJECT_KEY = 'YOUR_PROJECT_KEY'
METRIC = 'auc' # or any other classification metric of interest
SAVED_MODEL_ID = 'YOUR_SAVED_MODEL_ID'
FILENAME = 'path/to/model-archive.zip'


def get_best_classifier_version(project, saved_model_id, metric):
    """
    This function returns the best version id of a
    given DSS classifier model in a project.
    """

    model = project.get_saved_model(saved_model_id)
    outcome = []
    
    for version in model.list_versions():    
        version_id = version.get('id')
        version_details = model.get_version_details(version_id)
        perf = version_details.get_raw_snippet().get(metric)
        outcome.append((version_id, perf))
    
    # Get the best version id. Use reverse=False if
    # a lower metric means better
    best_version_id = sorted(
        outcome, key = lambda x: x[1], reverse=True)[0][0]
    
    return best_version_id
        


client = dataiku.api_client()
project = client.get_project(PROJECT_KEY)
model = project.get_saved_model(SAVED_MODEL_ID)
best_version_id = get_best_classifier_version(project, SAVED_MODEL_ID, METRIC)
version_details = model.get_version_details(best_version_id)

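# Export as a Python scoring archive
version_details.get_scoring_python(FILENAME)

# Export in MLflow format, to a separate file so that
# the Python export above is not overwritten
version_details.get_scoring_mlflow('path/to/model-archive-mlflow.zip')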

Using a Saved Model in a Python recipe or notebook#

Once a model has been trained and deployed as a saved model, you typically use it through scoring recipes or an API node.

You can however also use the saved model directly in a Python recipe or notebook for performing scoring from your own code.

This comes with several limitations:

  • It only supports models trained with the in-memory engine. It does not support MLlib models

  • It does not support running in containers. Only local execution is supported.

Here is an example:

import dataiku

m = dataiku.Model(my_model_id)
my_predictor = m.get_predictor()
my_predictor.predict(my_df_to_score)

Reference documentation#

dataiku.Model(lookup[, project_key, ignore_flow])

Handle to interact with a saved model.

dataiku.core.saved_model.SavedModelVersionMetrics(...)

Handle to the metrics of a version of a saved model

Reference documentation#

Interaction with a ML Task#

dataikuapi.dss.ml.DSSMLTask(client, ...)

A handle to interact with a ML Task for prediction or clustering in a DSS visual analysis.

Manipulation of settings#

dataikuapi.dss.ml.DSSMLTaskSettings(client, ...)

Object to read and modify the settings of an existing ML task.

dataikuapi.dss.ml.DSSPredictionMLTaskSettings(...)

dataikuapi.dss.ml.DSSClusteringMLTaskSettings(...)

dataikuapi.dss.ml.DSSTimeseriesForecastingMLTaskSettings(...)

dataikuapi.dss.ml.PredictionSplitParamsHandler(...)

Object to modify the train/test dataset splitting params.

Exploration of results#

dataikuapi.dss.ml.DSSTrainedPredictionModelDetails(...)

Object to read details of a trained prediction model

dataikuapi.dss.ml.DSSTrainedClusteringModelDetails(...)

Object to read details of a trained clustering model

Saved models#

dataikuapi.dss.savedmodel.DSSSavedModel(...)

Handle to interact with a saved model on the DSS instance.

dataikuapi.dss.savedmodel.DSSSavedModelSettings(...)

Handle on the settings of a saved model.

MLflow models#

dataikuapi.dss.savedmodel.ExternalModelVersionHandler(...)

Handler to interact with an External model version (MLflow import or Proxy model).

dataikuapi.dss.savedmodel.MLFlowVersionSettings(...)

Handle for the settings of an imported MLflow model version.

Saved models (usage in a recipe)#

dataiku.core.saved_model.Predictor(params, ...)

Object allowing to preprocess and make predictions on a dataframe.