Scenarios#

The scenario API can control all aspects of managing a scenario.

Note

There is a dedicated API to use within a scenario to run steps and report on progress of the scenario. For that, please see Scenarios (in a scenario).

Example use cases#

In all examples, project is a dataikuapi.dss.project.DSSProject handle, obtained using get_project() or get_default_project().

Run a scenario#

Variant 1: Run and wait for it to complete#

scenario = project.get_scenario("MYSCENARIO")

scenario.run_and_wait()

Variant 2: Run, then poll while doing other stuff#

import time

scenario = project.get_scenario("MYSCENARIO")

trigger_fire = scenario.run()

# When you call `run` on a scenario, the scenario does not start
# immediately. Instead, a "manual trigger" fires.
#
# This trigger fire can be cancelled if the scenario was already running,
# or if another trigger fires.
# Thus, the scenario run is not available immediately, and we must "wait"
# for it.

scenario_run = trigger_fire.wait_for_scenario_run()

# Now the scenario is running. We can wait for it synchronously with
# scenario_run.wait_for_completion(), but if we want to do other stuff
# at the same time, we can use refresh

while True:
    # Do a bit of other stuff
    # ...

    scenario_run.refresh()
    if scenario_run.running:
        print("Scenario is still running ...")
    else:
        print("Scenario is not running anymore")
        break

    time.sleep(5)
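
The loop above polls until the scenario stops, however long that takes. As a minimal sketch, the same polling can be given an upper bound on waiting time. The helper name wait_with_timeout and its parameters poll_seconds and timeout_seconds are hypothetical, introduced here for illustration only; the helper relies solely on the refresh() method and running attribute used above.

```python
import time


def wait_with_timeout(scenario_run, poll_seconds=5, timeout_seconds=3600):
    """Poll a scenario run until it stops or the timeout elapses.

    Hypothetical helper (not part of the dataikuapi package).
    Returns True if the run stopped, False if the timeout was reached first.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        # Update the status from the DSS API
        scenario_run.refresh()
        if not scenario_run.running:
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_seconds)
```

For instance, wait_with_timeout(scenario_run, timeout_seconds=600) would stop polling after roughly ten minutes. The scenario itself keeps running in that case; the helper only stops waiting for it.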

Get information about the last completed run of a scenario#

scenario = project.get_scenario("MYSCENARIO")

last_runs = scenario.get_last_runs(only_finished_runs=True)

if len(last_runs) == 0:
    raise Exception("The scenario never ran")

last_run = last_runs[0]

# outcome can be one of SUCCESS, WARNING, FAILED or ABORTED
print("The last run finished with %s" % last_run.outcome)

# start_time and end_time are datetime.datetime objects
print("Last run started at %s and finished at %s" % (last_run.start_time, last_run.end_time))
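
Since start_time and end_time are datetime.datetime objects, the duration of a run can be derived by subtracting them. The following is a small sketch; run_duration_seconds is a hypothetical helper name, not part of the API.

```python
def run_duration_seconds(run):
    """Return the elapsed time of a finished scenario run, in seconds.

    Hypothetical helper: assumes `run` exposes `start_time` and `end_time`
    as datetime.datetime objects, as in the snippet above.
    """
    return (run.end_time - run.start_time).total_seconds()
```

It could be used as print("Last run took %s seconds" % run_duration_seconds(last_run)).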

Disable/enable scenarios#

Disable and remember#

This snippet disables all scenarios in a project (i.e. prevents them from auto-triggering) and keeps a list of the ones that were active, so that you can selectively re-enable them later.

# List of scenario ids that were active
previously_active = []

for scenario in project.list_scenarios(as_type="objects"):
    settings = scenario.get_settings()

    if settings.active:
        previously_active.append(scenario.id)
        settings.active = False
        # In order for settings change to take effect, you need to save them
        settings.save()

Enable scenarios from a list of ids#

for scenario_id in previously_active:
    scenario = project.get_scenario(scenario_id)
    settings = scenario.get_settings()

    settings.active = True
    settings.save()
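
When the disable and re-enable steps always go together (for example around a maintenance operation), they can be combined so that re-enabling happens even if the work in between fails. The sketch below is one possible way to do this with a context manager; scenarios_paused is a hypothetical name, and the code uses only the calls shown in the two snippets above.

```python
from contextlib import contextmanager


@contextmanager
def scenarios_paused(project):
    """Disable all active scenarios of a project, then re-enable them on exit.

    Hypothetical helper (not part of the dataikuapi package).
    Yields the list of scenario ids that were active on entry.
    """
    previously_active = []
    try:
        for scenario in project.list_scenarios(as_type="objects"):
            settings = scenario.get_settings()
            if settings.active:
                settings.active = False
                settings.save()
                previously_active.append(scenario.id)
        yield previously_active
    finally:
        # Re-enable only the scenarios that this helper disabled
        for scenario_id in previously_active:
            settings = project.get_scenario(scenario_id).get_settings()
            settings.active = True
            settings.save()
```

A typical use would be a with scenarios_paused(project): block wrapped around the maintenance code. Scenarios that were already inactive are left untouched.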

List the “run as” user for all scenarios#

This snippet lists the identity under which each scenario runs:

for scenario in project.list_scenarios(as_type="objects"):
    settings = scenario.get_settings()
    # We must use `effective_run_as` and not `run_as` here.
    # run_as contains the "configured" run as, which can be None - in that case, it will run
    # as the last modifier of the scenario
    # effective_run_as is always valued and is the resolved version.
    print("Scenario %s runs as user %s" % (scenario.id, settings.effective_run_as))
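
For an overview per user rather than per scenario, the same loop can collect scenario ids grouped by their resolved "run as" user. This is a sketch only; scenarios_by_run_as is a hypothetical helper name.

```python
from collections import defaultdict


def scenarios_by_run_as(project):
    """Group scenario ids by the user they effectively run as.

    Hypothetical helper: relies on `effective_run_as`, the resolved
    identity, as in the snippet above.
    """
    by_user = defaultdict(list)
    for scenario in project.list_scenarios(as_type="objects"):
        settings = scenario.get_settings()
        by_user[settings.effective_run_as].append(scenario.id)
    return dict(by_user)
```

The result maps each user login to a list of scenario ids, which can help to see at a glance which scenarios depend on a given account.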

Reassign scenarios to another user#

If user “u1” has left the company, you may want to reassign all scenarios that ran under their identity to another user “u2”.

for scenario in project.list_scenarios(as_type="objects"):
    settings = scenario.get_settings()

    if settings.effective_run_as == "u1":
        print("Scenario %s used to run as u1, reassigning it" % scenario.id)
        # To configure a run_as, we must use the run_as property.
        # effective_run_as is read-only
        settings.run_as = "u2"
        settings.save()

Get the “next expected” run for a scenario#

If the scenario has a temporal trigger enabled, this returns a datetime of the approximate next expected run.

scenario = project.get_scenario("MYSCENARIO")
# next_run is None if no next run is scheduled
print("Next run is at %s" % scenario.get_status().next_run)

Get the list of jobs started by a scenario#

“Build/Train” or Python steps in a scenario can start jobs. This snippet will give you the list of job ids that a particular scenario run executed.

These job ids can then be used together with get_job().

scenario = project.get_scenario("MYSCENARIO")
# Focusing only on the last completed run. Else, use get_last_runs() and iterate
last_run = scenario.get_last_finished_run()

last_run_details = last_run.get_details()

all_job_ids = []
for step in last_run_details.steps:
    all_job_ids.extend(step.job_ids)

print("All job ids started by scenario run %s : %s" % (last_run.id, all_job_ids))
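
To cover more than the last completed run, the same logic can iterate over the result of get_last_runs(), as the comment above suggests. The following sketch assumes the attributes used in this section (id, get_details(), steps, job_ids); job_ids_for_finished_runs is a hypothetical helper name.

```python
def job_ids_for_finished_runs(scenario):
    """Map each finished run id to the job ids started by its steps.

    Hypothetical helper (not part of the dataikuapi package).
    """
    job_ids_by_run = {}
    for run in scenario.get_last_runs(only_finished_runs=True):
        details = run.get_details()
        job_ids = []
        for step in details.steps:
            job_ids.extend(step.job_ids)
        job_ids_by_run[run.id] = job_ids
    return job_ids_by_run
```

Note that each call to get_details() is a separate request to DSS, so this may be slow for scenarios with many runs.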

Get the first error that happened in a scenario run#

This snippet retrieves the first error that happened during a scenario run.

scenario = project.get_scenario("MYSCENARIO")

last_run = scenario.get_last_finished_run()

if last_run.outcome == "FAILED":
    last_run_details = last_run.get_details()
    print("Error was: %s" % (last_run_details.first_error_details))

Start multiple scenarios and wait for all of them to complete#

This code snippet starts multiple scenarios and returns when all of them have completed, returning the updated DSSScenarioRun for each.

import time

scenarios_ids_to_run = ["s1", "s2", "s3"]

scenario_runs = []

for scenario_id in scenarios_ids_to_run:
    scenario = project.get_scenario(scenario_id)

    trigger_fire = scenario.run()
    # Wait for the trigger fire to have actually started a scenario
    scenario_run = trigger_fire.wait_for_scenario_run()
    scenario_runs.append(scenario_run)

# Poll all scenario runs, until all of them have completed
while True:
    any_not_complete = False
    for scenario_run in scenario_runs:
        # Update the status from the DSS API
        scenario_run.refresh()
        if scenario_run.running:
            any_not_complete = True

    if any_not_complete:
        print("At least one scenario is still running...")
    else:
        print("All scenarios are complete")
        break

    # Wait a bit before checking again
    time.sleep(30)

print("Scenario run ids and outcomes: %s" % ([(sr.id, sr.outcome) for sr in scenario_runs]))
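
Once all runs have completed, it can be useful to summarize the outcomes instead of printing each one. A minimal sketch, using only the outcome attribute shown above (count_outcomes is a hypothetical helper name):

```python
from collections import Counter


def count_outcomes(scenario_runs):
    """Count completed scenario runs per outcome.

    Hypothetical helper: assumes each run exposes `outcome`, which for a
    finished run is one of SUCCESS, WARNING, FAILED or ABORTED.
    """
    return Counter(sr.outcome for sr in scenario_runs)
```

For example, count_outcomes(scenario_runs)["FAILED"] gives the number of failed runs, and is 0 when none failed.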

Change the “from” email for email reporters#

Note that usually, we would recommend using variables for “from” and “to” email. But you can also modify them with the API.

scenario = project.get_scenario("MYSCENARIO")

settings = scenario.get_settings()

for reporter in settings.raw_reporters:
    # Only look into 'email' kind of reporters
    if reporter["messaging"]["type"] == "mail-scenario":
        messaging_configuration = reporter["messaging"]["configuration"]
        messaging_configuration["sender"] = "new.email.address@company.com"
        print("Updated reporter %s" % reporter["id"])

settings.save()

Detailed examples#

This section contains more advanced examples on Scenarios.

Get last run results#

You can programmatically get the outcome of the last finished run for a given Scenario.

# scenario_id is the id of the scenario to inspect, for example "MYSCENARIO"
scenario = project.get_scenario(scenario_id)
last_run = scenario.get_last_finished_run()
data = {
    "scenario_id": scenario_id,
    "outcome": last_run.outcome,
    "start_time": last_run.start_time.isoformat(),
    "end_time": last_run.end_time.isoformat()
}
print(data)

From there, you can easily extend the same logic to loop across all Scenarios within a Project.
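
One possible way to write that loop is sketched below. The function name last_run_results is hypothetical; the code uses list_scenarios(as_type="objects") and get_last_runs(only_finished_runs=True) as shown earlier on this page, and skips scenarios that have never completed a run.

```python
def last_run_results(project):
    """Collect the last finished run of every scenario in a project.

    Hypothetical helper (not part of the dataikuapi package).
    Scenarios without any finished run are skipped.
    """
    results = []
    for scenario in project.list_scenarios(as_type="objects"):
        finished_runs = scenario.get_last_runs(only_finished_runs=True)
        if len(finished_runs) == 0:
            # This scenario never completed a run
            continue
        last_run = finished_runs[0]
        results.append({
            "scenario_id": scenario.id,
            "outcome": last_run.outcome,
            "start_time": last_run.start_time.isoformat(),
            "end_time": last_run.end_time.isoformat(),
        })
    return results
```

The returned list of dicts has the same shape as data above, one entry per scenario, so it can be printed or loaded into a table as needed.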

Reference documentation#

dataikuapi.dss.scenario.DSSScenario(client, ...)

A handle to interact with a scenario on the DSS instance.

dataikuapi.dss.scenario.DSSScenarioSettings(...)

Settings of a scenario.

dataikuapi.dss.scenario.DSSScenarioRun(...)

A handle containing basic info about a past run of a scenario.

dataikuapi.dss.scenario.DSSTriggerFire(...)

A handle representing the firing of a trigger on a scenario.

dataikuapi.dss.scenario.DSSScenarioListItem(...)

An item in a list of scenarios.