Scenarios#
The scenario API can control all aspects of managing a scenario.
Note
There is a dedicated API to use within a scenario to run steps and report on progress of the scenario. For that, please see Scenarios (in a scenario).
Example use cases#
In all examples, project is a dataikuapi.dss.project.DSSProject handle, obtained using get_project() or get_default_project().
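As a minimal sketch of how such a handle might be obtained from code running inside DSS (a notebook or a recipe, for instance), assuming the dataiku package is importable there. The helper name get_project_handle and its project_key parameter are hypothetical, introduced only for illustration:

```python
def get_project_handle(project_key=None):
    """Return a DSSProject handle (hypothetical helper, not part of the API)."""
    # Imported lazily: the `dataiku` package is assumed to be available
    # only when this code runs inside DSS.
    import dataiku

    client = dataiku.api_client()
    if project_key is None:
        # Handle on the project in which this code is currently running
        return client.get_default_project()
    # Handle on an explicitly named project
    return client.get_project(project_key)
```

Outside DSS, the client would typically be built with dataikuapi.DSSClient(host, api_key) instead of dataiku.api_client(); the rest of the sketch stays the same.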
Run a scenario#
Variant 1: Run and wait for it to complete#
scenario = project.get_scenario("MYSCENARIO")
scenario.run_and_wait()
Variant 2: Run, then poll while doing other stuff#
import time

scenario = project.get_scenario("MYSCENARIO")
trigger_fire = scenario.run()
# When you call `run` on a scenario, the scenario is not started
# immediately. Instead, a "manual trigger" fires.
#
# This trigger fire can be cancelled if the scenario was already running,
# or if another trigger fires.
# Thus, the scenario run is not available immediately, and we must "wait"
# for it
scenario_run = trigger_fire.wait_for_scenario_run()
# Now the scenario is running. We can wait for it synchronously with
# scenario_run.wait_for_completion(), but if we want to do other stuff
# at the same time, we can use refresh
while True:
    # Do a bit of other stuff
    # ...
    scenario_run.refresh()
    if scenario_run.running:
        print("Scenario is still running ...")
    else:
        print("Scenario is not running anymore")
        break
    time.sleep(5)
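The polling loop above runs until the scenario stops, however long that takes. A minimal sketch of a bounded variant follows, relying only on the refresh() method and the running attribute used above. The helper name wait_with_timeout and its parameters are hypothetical, not part of the API:

```python
import time


def wait_with_timeout(scenario_run, timeout_seconds=600, poll_seconds=5):
    """Poll a scenario run until it stops running or the timeout elapses.

    Returns True if the run finished in time, False otherwise.
    Hypothetical helper, not part of the API.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        # Fetch the latest status from DSS
        scenario_run.refresh()
        if not scenario_run.running:
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_seconds)
```

A False return only means the helper stopped waiting; the scenario run itself is left untouched and may still be running.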
Get information about the last completed run of a scenario#
scenario = project.get_scenario("MYSCENARIO")
last_runs = scenario.get_last_runs(only_finished_runs=True)
if len(last_runs) == 0:
    raise Exception("The scenario never ran")
last_run = last_runs[0]
# outcome can be one of SUCCESS, WARNING, FAILED or ABORTED
print("The last run finished with %s" % last_run.outcome)
# start_time and end_time are datetime.datetime objects
print("Last run started at %s and finished at %s" % (last_run.start_time, last_run.end_time))
Disable/enable scenarios#
Disable and remember#
This snippet disables all scenarios in a project (i.e. prevents them from auto-triggering) and keeps a list of the ones that were active, so that you can selectively re-enable them later.
# List of scenario ids that were active
previously_active = []
for scenario in project.list_scenarios(as_type="objects"):
    settings = scenario.get_settings()
    if settings.active:
        previously_active.append(scenario.id)
        settings.active = False
        # In order for settings change to take effect, you need to save them
        settings.save()
Enable scenarios from a list of ids#
for scenario_id in previously_active:
    scenario = project.get_scenario(scenario_id)
    settings = scenario.get_settings()
    settings.active = True
    settings.save()
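If the re-enabling happens in a later session, the previously_active list has to be stored somewhere in between. One possible approach, sketched here under the assumption that a local JSON file is an acceptable place to keep it; the helper names and the file path are hypothetical:

```python
import json


def save_scenario_ids(scenario_ids, path):
    """Write the list of scenario ids to a JSON file (hypothetical helper)."""
    with open(path, "w") as f:
        json.dump(sorted(scenario_ids), f)


def load_scenario_ids(path):
    """Read back the list of scenario ids written by save_scenario_ids."""
    with open(path) as f:
        return json.load(f)
```

The list returned by load_scenario_ids can then be used in place of previously_active in the loop above.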
List the “run as” user for all scenarios#
This snippet allows you to list the identity under which a scenario runs:
for scenario in project.list_scenarios(as_type="objects"):
    settings = scenario.get_settings()
    # We must use `effective_run_as` and not `run_as` here.
    # run_as contains the "configured" run as, which can be None - in that case, it will run
    # as the last modifier of the scenario
    # effective_run_as is always valued and is the resolved version.
    print("Scenario %s runs as user %s" % (scenario.id, settings.effective_run_as))
Reassign scenarios to another user#
If user “u1” has left the company, you may want to reassign all scenarios that ran under their identity to another user “u2”.
for scenario in project.list_scenarios(as_type="objects"):
    settings = scenario.get_settings()
    if settings.effective_run_as == "u1":
        print("Scenario %s used to run as u1, reassigning it" % scenario.id)
        # To configure a run_as, we must use the run_as property.
        # effective_run_as is read-only
        settings.run_as = "u2"
        settings.save()
Get the “next expected” run for a scenario#
If the scenario has a temporal trigger enabled, this returns a datetime of the approximate next expected run.
scenario = project.get_scenario("MYSCENARIO")
# next_run is None if no next run is scheduled
print("Next run is at %s" % scenario.get_status().next_run)
Get the list of jobs started by a scenario#
“Build/Train” or Python steps in a scenario can start jobs. This snippet will give you the list of job ids that a particular scenario run executed.
These job ids can then be used together with get_job().
scenario = project.get_scenario("MYSCENARIO")
# Focusing only on the last completed run. Else, use get_last_runs() and iterate
last_run = scenario.get_last_finished_run()
last_run_details = last_run.get_details()
all_job_ids = []
for step in last_run_details.steps:
    all_job_ids.extend(step.job_ids)
print("All job ids started by scenario run %s : %s" % (last_run.id, all_job_ids))
Get the first error that happened in a scenario run#
This snippet retrieves the first error that happened during a scenario run.
scenario = project.get_scenario("MYSCENARIO")
last_run = scenario.get_last_finished_run()
if last_run.outcome == "FAILED":
    last_run_details = last_run.get_details()
    print("Error was: %s" % (last_run_details.first_error_details))
Start multiple scenarios and wait for all of them to complete#
This code snippet starts multiple scenarios, waits until all of them have completed, then prints the id and outcome of the updated DSSScenarioRun for each.
import time
scenarios_ids_to_run = ["s1", "s2", "s3"]
scenario_runs = []
for scenario_id in scenarios_ids_to_run:
    scenario = project.get_scenario(scenario_id)
    trigger_fire = scenario.run()
    # Wait for the trigger fire to have actually started a scenario
    scenario_run = trigger_fire.wait_for_scenario_run()
    scenario_runs.append(scenario_run)
# Poll all scenario runs, until all of them have completed
while True:
    any_not_complete = False
    for scenario_run in scenario_runs:
        # Update the status from the DSS API
        scenario_run.refresh()
        if scenario_run.running:
            any_not_complete = True
    if any_not_complete:
        print("At least one scenario is still running...")
    else:
        print("All scenarios are complete")
        break
    # Wait a bit before checking again
    time.sleep(30)
print("Scenario run ids and outcomes: %s" % ([(sr.id, sr.outcome) for sr in scenario_runs]))
Change the “from” email for email reporters#
Note that usually, we would recommend using variables for “from” and “to” email. But you can also modify them with the API.
scenario = project.get_scenario("MYSCENARIO")
settings = scenario.get_settings()
for reporter in settings.raw_reporters:
    # Only look into 'email' kind of reporters
    if reporter["messaging"]["type"] == "mail-scenario":
        messaging_configuration = reporter["messaging"]["configuration"]
        messaging_configuration["sender"] = "new.email.address@company.com"
        print("Updated reporter %s" % reporter["id"])
settings.save()
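The same update can be isolated in a small function that works on the list of reporter dictionaries alone, which makes it easy to try out without a DSS instance. This is a sketch that assumes reporters have the structure used above ("messaging", "type", "configuration", "sender", "id"); the function name set_email_sender is hypothetical:

```python
def set_email_sender(raw_reporters, new_sender):
    """Set the sender on every email reporter and return the ids that changed.

    Hypothetical helper; it assumes the reporter structure shown above.
    """
    updated_ids = []
    for reporter in raw_reporters:
        messaging = reporter.get("messaging") or {}
        # Only look into 'email' kind of reporters
        if messaging.get("type") != "mail-scenario":
            continue
        configuration = messaging.setdefault("configuration", {})
        if configuration.get("sender") != new_sender:
            configuration["sender"] = new_sender
            updated_ids.append(reporter.get("id"))
    return updated_ids
```

Called as set_email_sender(settings.raw_reporters, "new.email.address@company.com"), it modifies the settings in place; settings.save() is still needed afterwards, and can be skipped when the returned list is empty.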
Detailed examples#
This section contains more advanced examples on Scenarios.
Get last run results#
You can programmatically get the outcome of the last finished run for a given Scenario.
# Id of the scenario to inspect
scenario_id = "MYSCENARIO"
scenario = project.get_scenario(scenario_id)
last_run = scenario.get_last_finished_run()
data = {
    "scenario_id": scenario_id,
    "outcome": last_run.outcome,
    "start_time": last_run.start_time.isoformat(),
    "end_time": last_run.end_time.isoformat()
}
print(data)
From there, you can easily extend the same logic to loop across all Scenarios within a Project.
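A minimal sketch of such a loop, built only from calls shown earlier on this page (list_scenarios(as_type="objects") and get_last_runs(only_finished_runs=True)). The function name summarize_last_runs is hypothetical, and reporting an outcome of None for scenarios that never ran is a choice made for this example:

```python
def summarize_last_runs(project):
    """Collect the last finished run of every scenario in a project.

    Hypothetical helper; `project` is a DSSProject handle.
    """
    summaries = []
    for scenario in project.list_scenarios(as_type="objects"):
        last_runs = scenario.get_last_runs(only_finished_runs=True)
        if len(last_runs) == 0:
            # The scenario never completed a run
            summaries.append({"scenario_id": scenario.id, "outcome": None})
            continue
        last_run = last_runs[0]
        summaries.append({
            "scenario_id": scenario.id,
            "outcome": last_run.outcome,
            "start_time": last_run.start_time.isoformat(),
            "end_time": last_run.end_time.isoformat(),
        })
    return summaries
```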
Reference documentation#
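dataikuapi.dss.scenario.DSSScenario | A handle to interact with a scenario on the DSS instance.
dataikuapi.dss.scenario.DSSScenarioSettings | Settings of a scenario.
dataikuapi.dss.scenario.DSSScenarioRun | A handle containing basic info about a past run of a scenario.
dataikuapi.dss.scenario.DSSTriggerFire | A handle representing the firing of a trigger on a scenario.
dataikuapi.dss.scenario.DSSScenarioListItem | An item in a list of scenarios.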