Scenarios#
The scenario API can control all aspects of managing a scenario.
Note
There is a dedicated API to use within a scenario to run steps and report on progress of the scenario. For that, please see Scenarios (in a scenario).
Example use cases#
In all examples, project is a dataikuapi.dss.project.DSSProject handle, obtained using client.get_project() or client.get_default_project().
Run a scenario#
Variant 1: Run and wait for it to complete#
scenario = project.get_scenario("myscenario")
scenario.run_and_wait()
Variant 2: Run, then poll while doing other stuff#
import time

scenario = project.get_scenario("myscenario")
trigger_fire = scenario.run()

# When you call `run` on a scenario, the scenario is not started
# immediately. Instead, a "manual trigger" fires.
#
# This trigger fire can be cancelled if the scenario was already running,
# or if another trigger fires.
# Thus, the scenario run is not available immediately, and we must "wait"
# for it
scenario_run = trigger_fire.wait_for_scenario_run()

# Now the scenario is running. We can wait for it synchronously with
# scenario_run.wait_for_completion(), but if we want to do other stuff
# at the same time, we can use refresh
while True:
    # Do a bit of other stuff
    # ...

    scenario_run.refresh()
    if scenario_run.running:
        print("Scenario is still running ...")
    else:
        print("Scenario is not running anymore")
        break

    time.sleep(5)
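The loop above polls until the run stops, however long that takes. A bounded variant could be sketched as follows. The helper name poll_until_finished and the _FakeRun stand-in are hypothetical and exist only so the sketch runs without a DSS instance; the only parts taken from the example above are refresh() and the running attribute.

```python
import time


def poll_until_finished(scenario_run, timeout_seconds, poll_interval=5.0):
    """Return True if the run stopped before the deadline, False otherwise.

    Relies only on `refresh()` and the `running` attribute used above.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        scenario_run.refresh()
        if not scenario_run.running:
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval)


class _FakeRun:
    """Hypothetical stand-in for a scenario run, for demonstration only."""

    def __init__(self, refreshes_before_done):
        self._remaining = refreshes_before_done
        self.running = True

    def refresh(self):
        self._remaining -= 1
        self.running = self._remaining > 0


# A run that stops after three refreshes, and one that never stops
finished = poll_until_finished(_FakeRun(3), timeout_seconds=5, poll_interval=0.01)
timed_out = poll_until_finished(_FakeRun(10**9), timeout_seconds=0.05, poll_interval=0.01)
print(finished, timed_out)
```

With a real handle, you would pass the scenario_run obtained from trigger_fire.wait_for_scenario_run(). Note that this sketch only stops waiting; it does not abort the scenario.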
Get information about the last completed run of a scenario#
scenario = project.get_scenario("myscenario")
last_runs = scenario.get_last_runs(only_finished_runs=True)
if len(last_runs) == 0:
    raise Exception("The scenario never ran")
last_run = last_runs[0]
# outcome can be one of SUCCESS, WARNING, FAILED or ABORTED
print("The last run finished with %s" % last_run.outcome)
# start_time and end_time are datetime.datetime objects
print("Last run started at %s and finished at %s" % (last_run.start_time, last_run.end_time))
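Since outcome, start_time and end_time are available on each run, the same list can be summarized rather than reading only its first entry. The following is a minimal sketch under that assumption; summarize_runs is a hypothetical helper, and the SimpleNamespace objects are made-up stand-ins for the runs returned by get_last_runs(only_finished_runs=True).

```python
from collections import Counter
from datetime import datetime
from types import SimpleNamespace


def summarize_runs(runs):
    """Tally outcomes and compute the mean duration (in seconds) of finished runs."""
    outcomes = Counter(run.outcome for run in runs)
    durations = [(run.end_time - run.start_time).total_seconds() for run in runs]
    mean_duration = sum(durations) / len(durations) if durations else None
    return outcomes, mean_duration


# Made-up stand-ins exposing the three attributes used above
demo_runs = [
    SimpleNamespace(outcome="SUCCESS",
                    start_time=datetime(2024, 1, 1, 8, 0, 0),
                    end_time=datetime(2024, 1, 1, 8, 1, 0)),
    SimpleNamespace(outcome="FAILED",
                    start_time=datetime(2024, 1, 2, 8, 0, 0),
                    end_time=datetime(2024, 1, 2, 8, 2, 0)),
    SimpleNamespace(outcome="SUCCESS",
                    start_time=datetime(2024, 1, 3, 8, 0, 0),
                    end_time=datetime(2024, 1, 3, 8, 0, 30)),
]
outcomes, mean_duration = summarize_runs(demo_runs)
print(dict(outcomes), mean_duration)
```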
Disable/enable scenarios#
Disable and remember#
This snippet disables all scenarios in a project (i.e., prevents them from auto-triggering) and keeps a list of the ones that were active, so that you can selectively re-enable them later.
# List of scenario ids that were active
previously_active = []
for scenario in project.list_scenarios(as_type="objects"):
    settings = scenario.get_settings()

    if settings.active:
        previously_active.append(scenario.id)
        settings.active = False
        # In order for settings change to take effect, you need to save them
        settings.save()
Enable scenarios from a list of ids#
for scenario_id in previously_active:
    scenario = project.get_scenario(scenario_id)
    settings = scenario.get_settings()

    settings.active = True
    settings.save()
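If the disable and re-enable steps run in separate sessions, the list of ids has to be stored somewhere in between. One possible approach, sketched here with hypothetical helper names and a temporary file path chosen purely for illustration, is to write the ids to a JSON file and read them back later.

```python
import json
import os
import tempfile


def save_scenario_ids(path, scenario_ids):
    """Write the scenario ids to a JSON file (sorted, for stable diffs)."""
    with open(path, "w") as f:
        json.dump(sorted(scenario_ids), f)


def load_scenario_ids(path):
    """Read back the list of scenario ids written by save_scenario_ids."""
    with open(path) as f:
        return json.load(f)


# Illustration only: in practice, pick a location that survives between sessions
state_path = os.path.join(tempfile.mkdtemp(), "previously_active.json")
save_scenario_ids(state_path, ["s2", "s1"])
restored = load_scenario_ids(state_path)
print(restored)
```

The restored list can then be used in place of previously_active in the re-enable loop.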
List the “run as” user for all scenarios#
This snippet allows you to list the identity under which a scenario runs:
for scenario in project.list_scenarios(as_type="objects"):
    settings = scenario.get_settings()
    # We must use `effective_run_as` and not `run_as` here.
    # run_as contains the "configured" run as, which can be None - in that case, it will run
    # as the last modifier of the scenario
    # effective_run_as is always valued and is the resolved version.
    print("Scenario %s runs as user %s" % (scenario.id, settings.effective_run_as))
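When a project has many scenarios, it can be easier to read the result grouped by user. A minimal sketch, assuming you have already collected (scenario id, user) pairs; group_by_user is a hypothetical helper and the pairs below are made up.

```python
from collections import defaultdict


def group_by_user(pairs):
    """Group scenario ids by the user they run as."""
    grouped = defaultdict(list)
    for scenario_id, user in pairs:
        grouped[user].append(scenario_id)
    return dict(grouped)


# Made-up (scenario id, effective run-as user) pairs
pairs = [("s1", "u1"), ("s2", "u2"), ("s3", "u1")]
by_user = group_by_user(pairs)
for user, scenario_ids in sorted(by_user.items()):
    print("%s runs %d scenario(s): %s" % (user, len(scenario_ids), scenario_ids))
```

With a real project, the pairs could be built inside the loop above from scenario.id and settings.effective_run_as.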
Reassign scenarios to another user#
If user “u1” has left the company, you may want to reassign all scenarios that ran under their identity to another user “u2”.
for scenario in project.list_scenarios(as_type="objects"):
    settings = scenario.get_settings()

    if settings.effective_run_as == "u1":
        print("Scenario %s used to run as u1, reassigning it" % scenario.id)
        # To configure a run_as, we must use the run_as property.
        # effective_run_as is read-only
        settings.run_as = "u2"
        settings.save()
Get the “next expected” run for a scenario#
If the scenario has a temporal trigger enabled, this returns a datetime for the approximate next expected run.
scenario = project.get_scenario("myscenario")
# next_run is None if no next run is scheduled
print("Next run is at %s" % scenario.get_status().next_run)
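Because next_run can be None, a small formatting helper avoids printing "None" and can express the delay in minutes. This is only a sketch: describe_next_run is a hypothetical helper, and it assumes that the next_run value and the "now" value you pass are both naive or both timezone-aware, which you should verify on your instance.

```python
from datetime import datetime


def describe_next_run(next_run, now):
    """Return a human-readable message about the next expected run."""
    if next_run is None:
        return "No next run is scheduled"
    minutes = int((next_run - now).total_seconds() // 60)
    return "Next run expected in about %d minute(s)" % minutes


# Fixed, made-up timestamps so the output is reproducible
now = datetime(2024, 1, 1, 12, 0, 0)
scheduled_message = describe_next_run(datetime(2024, 1, 1, 12, 45, 0), now)
unscheduled_message = describe_next_run(None, now)
print(scheduled_message)
print(unscheduled_message)
```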
Get the list of jobs started by a scenario#
“Build/Train” or Python steps in a scenario can start jobs. This snippet will give you the list of job ids that a particular scenario run executed.
These job ids can then be used together with dataikuapi.dss.project.DSSProject.get_job().
scenario = project.get_scenario("myscenario")
# Focusing only on the last completed run. Else, use get_last_runs() and iterate
last_run = scenario.get_last_finished_run()
last_run_details = last_run.get_details()
all_job_ids = []
for step in last_run_details.steps:
    all_job_ids.extend(step.job_ids)
print("All job ids started by scenario run %s : %s" % (last_run.id, all_job_ids))
Get the first error that happened in a scenario run#
This snippet retrieves the first error that happened during a scenario run.
scenario = project.get_scenario("myscenario")
last_run = scenario.get_last_finished_run()
if last_run.outcome == "FAILED":
    last_run_details = last_run.get_details()
    print("Error was: %s" % (last_run_details.first_error_details))
Start multiple scenarios and wait for all of them to complete#
This snippet starts multiple scenarios and waits until all of them have completed, leaving you with the updated DSSScenarioRun for each.
import time
scenarios_ids_to_run = ["s1", "s2", "s3"]
scenario_runs = []
for scenario_id in scenarios_ids_to_run:
    scenario = project.get_scenario(scenario_id)

    trigger_fire = scenario.run()
    # Wait for the trigger fire to have actually started a scenario
    scenario_run = trigger_fire.wait_for_scenario_run()
    scenario_runs.append(scenario_run)

# Poll all scenario runs, until all of them have completed
while True:
    any_not_complete = False
    for scenario_run in scenario_runs:
        # Update the status from the DSS API
        scenario_run.refresh()
        if scenario_run.running:
            any_not_complete = True

    if any_not_complete:
        print("At least one scenario is still running...")
    else:
        print("All scenarios are complete")
        break

    # Wait a bit before checking again
    time.sleep(30)
print("Scenario run ids and outcomes: %s" % ([(sr.id, sr.outcome) for sr in scenario_runs]))
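Once every run has finished, you will often want to act on the ones that did not succeed. A minimal sketch, assuming the outcome values listed earlier (SUCCESS, WARNING, FAILED or ABORTED); non_successful is a hypothetical helper and the (id, outcome) pairs are made up.

```python
def non_successful(results):
    """Keep only the (run id, outcome) pairs whose outcome is not SUCCESS."""
    return [(run_id, outcome) for run_id, outcome in results if outcome != "SUCCESS"]


# Made-up (run id, outcome) pairs, shaped like the list printed above
results = [("run-1", "SUCCESS"), ("run-2", "FAILED"), ("run-3", "WARNING")]
problems = non_successful(results)
if problems:
    print("Runs needing attention: %s" % problems)
else:
    print("All runs succeeded")
```

This sketch treats WARNING as needing attention; filter on "FAILED" and "ABORTED" only if warnings are acceptable in your context.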
Change the “from” email for email reporters#
Note that usually, we would recommend using variables for “from” and “to” email. But you can also modify them with the API.
scenario = project.get_scenario("myscenario")
settings = scenario.get_settings()
for reporter in settings.raw_reporters:
    # Only look into 'email' kind of reporters
    if reporter["messaging"]["type"] == "mail-scenario":
        messaging_configuration = reporter["messaging"]["configuration"]
        messaging_configuration["sender"] = "new.email.address@company.com"
        print("Updated reporter %s" % reporter["id"])

settings.save()
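Before saving such a change, it can be useful to preview what would be modified. The sketch below works on plain dictionaries shaped like the fields used above ("id", "messaging", "type", "configuration", "sender"); preview_sender_changes is a hypothetical helper, the sample reporters are made up, and nothing is modified or saved.

```python
def preview_sender_changes(raw_reporters, new_sender):
    """List (reporter id, current sender, new sender) without modifying anything."""
    changes = []
    for reporter in raw_reporters:
        messaging = reporter.get("messaging", {})
        # Only look at email reporters, as in the example above
        if messaging.get("type") != "mail-scenario":
            continue
        current = messaging.get("configuration", {}).get("sender")
        if current != new_sender:
            changes.append((reporter.get("id"), current, new_sender))
    return changes


# Made-up reporter definitions, for illustration only
sample_reporters = [
    {"id": "r1", "messaging": {"type": "mail-scenario",
                               "configuration": {"sender": "old@company.com"}}},
    {"id": "r2", "messaging": {"type": "some-other-type", "configuration": {}}},
    {"id": "r3", "messaging": {"type": "mail-scenario",
                               "configuration": {"sender": "new.email.address@company.com"}}},
]
changes = preview_sender_changes(sample_reporters, "new.email.address@company.com")
print(changes)
```

With a real scenario, you would pass settings.raw_reporters as the first argument.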
Detailed examples#
This section contains more advanced examples on Scenarios.
Get last run results#
You can programmatically get the outcome of the last finished run for a given Scenario.
scenario = project.get_scenario(scenario_id)
last_run = scenario.get_last_finished_run()
data = {
    "scenario_id": scenario_id,
    "outcome": last_run.outcome,
    "start_time": last_run.start_time.isoformat(),
    "end_time": last_run.end_time.isoformat()
}
print(data)
From there, you can easily extend the same logic to loop across all Scenarios within a Project.
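One way to write that loop is sketched below. It only uses calls that appear elsewhere on this page (list_scenarios(as_type="objects"), get_last_runs(only_finished_runs=True), and the id, outcome, start_time and end_time attributes). The function name collect_last_run_results and the _Fake* classes are hypothetical; the stand-ins exist only so the sketch runs without a DSS instance.

```python
from datetime import datetime


def collect_last_run_results(project):
    """Return one summary dict per scenario that has at least one finished run."""
    results = []
    for scenario in project.list_scenarios(as_type="objects"):
        finished_runs = scenario.get_last_runs(only_finished_runs=True)
        if not finished_runs:
            # This scenario never completed a run; skip it
            continue
        last_run = finished_runs[0]
        results.append({
            "scenario_id": scenario.id,
            "outcome": last_run.outcome,
            "start_time": last_run.start_time.isoformat(),
            "end_time": last_run.end_time.isoformat(),
        })
    return results


# --- Hypothetical stand-ins, for demonstration only ---
class _FakeRun:
    def __init__(self, outcome, start_time, end_time):
        self.outcome = outcome
        self.start_time = start_time
        self.end_time = end_time


class _FakeScenario:
    def __init__(self, scenario_id, runs):
        self.id = scenario_id
        self._runs = runs

    def get_last_runs(self, only_finished_runs=False):
        return list(self._runs)


class _FakeProject:
    def __init__(self, scenarios):
        self._scenarios = scenarios

    def list_scenarios(self, as_type="listitems"):
        return list(self._scenarios)


demo_project = _FakeProject([
    _FakeScenario("s1", [_FakeRun("SUCCESS",
                                  datetime(2024, 1, 1, 8, 0),
                                  datetime(2024, 1, 1, 8, 5))]),
    _FakeScenario("s2", []),  # never ran
])
results = collect_last_run_results(demo_project)
print(results)
```

With a real project handle, call collect_last_run_results(project) directly.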
Define Scenario timeout#
There is no explicit timeout functionality for “Build” steps within Dataiku Scenarios. However, in custom Scenarios you can implement one using the public API’s Python client. The following example is a custom Scenario with a build_dataset step that is aborted if it takes more than 1 hour (3600 seconds) to run.
###########################################################################################
# !! CUSTOM SCENARIO EXAMPLE !! #
# See https://doc.dataiku.com/dss/latest/scenarios/custom_scenarios.html for more details #
###########################################################################################
import time
import dataiku
from dataiku.scenario import Scenario, BuildFlowItemsStepDefHelper
from dataikuapi.dss.future import DSSFuture
TIMEOUT_SECONDS = 3600
s = Scenario()
# Replace this commented block by your Scenario steps
# Example: build a Dataset
step_handle = s.build_dataset("your_dataset_name", asynchronous=True)
start = time.time()
while not step_handle.is_done():
    end = time.time()
    print("Duration: {}s".format(end - start))
    if end - start > TIMEOUT_SECONDS:
        f = DSSFuture(dataiku.api_client(), step_handle.future_id)
        f.abort()
        raise Exception("Scenario was aborted because it took too much time.")
    # Pause between checks so the loop does not spin and flood the log
    time.sleep(5)
Reference documentation#
A handle to interact with a scenario on the DSS instance.
Settings of a scenario.
A handle containing basic info about a past run of a scenario.
A handle representing the firing of a trigger on a scenario.
An item in a list of scenarios.