Govern Actions#
Actions are used to manually trigger user-defined scripts in Dataiku Govern. They are written in Python and exist at two levels: artifact and instance.
Here, we provide some use cases that demonstrate how to use actions.
Trigger a scenario on a design node from a govern artifact#
Scenarios defined at the project level in the Design node can be triggered directly from the Govern node using actions attached to project artifacts.
The logic implemented here is the following:
Retrieve the Dataiku projects associated with the Govern project.
Loop over the artifacts retrieving the associated project key and node ID and filtering out automation nodes.
Use DSSClient on each Dataiku artifact to retrieve the requested scenario and run it with the specified custom parameters.
Note
In the sample code below, the scenario is run synchronously, blocking the kernel until its completion. The scenario can also be run asynchronously using scenario.run(params)
from govern.core.artifact_action_handler import get_artifact_action_handler
import dataikuapi
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
handler = get_artifact_action_handler()
SCENARIO_ID = 'TESTPARAM'
DSS_NODES_CONFIG = {
"Staging design": {
"url": "https://design-node-url",
"api_key": "SECRET_KEY"
}
}
def get_dataiku_items_from_govern_item(govern_client, govern_item):
definition = govern_item.get_definition()
dku_items = definition.get_raw().get('fields', {}).get('dataiku_item', [])
return [govern_client.get_artifact(artifact_id) for artifact_id in dku_items]
def trigger_scenario_on_node(node_id, project_key, scenario_id):
if node_id not in DSS_NODES_CONFIG:
logger.info(f"[Skipping] No credentials for Node ID: {node_id}")
return
creds = DSS_NODES_CONFIG[node_id]
try:
client = dataikuapi.DSSClient(creds["url"], creds["api_key"])
project = client.get_project(project_key)
scenario = project.get_scenario(scenario_id)
params = handler.params
logger.info(f"Triggering scenario '{SCENARIO_ID}' on project '{project_key}' and node '{node_id}'")
# we can run the scenario asynchronously and not wait for the result
# outcome = scenario.run(params)
outcome = scenario.run_and_wait(params)
logger.info(f"Scenario finished with outcome: {outcome.outcome}")
except Exception as e:
handler.status = "ERROR"
handler.message = f"An error occurred: {e}"
logger.error(f"An error occurred running the scenario: {e}")
def trigger_scenario_workflow():
govern_client = handler.client
govern_project = govern_client.get_artifact(handler.enrichedArtifact.artifact.id)
linked_dku_artifacts = get_dataiku_items_from_govern_item(govern_client, govern_project)
logger.info(f"Found {len(linked_dku_artifacts)} linked Dataiku projects.")
for dku_artifact in linked_dku_artifacts:
raw_fields = dku_artifact.get_definition().get_raw().get('fields', {})
target_project_key = raw_fields.get('project_key')
target_node_id = raw_fields.get('node_id')
is_automation = raw_fields.get('automation_node')
logger.info(f"Processing '{target_project_key}' on '{target_node_id}'")
# Filter: Skip if it is an automation node
if is_automation is True:
logger.info(f"Skipping automation node project '{target_project_key}' and node '{target_node_id}'")
continue
# Process only Design nodes
if target_project_key and target_node_id:
trigger_scenario_on_node(target_node_id, target_project_key, SCENARIO_ID)
trigger_scenario_workflow()
