Agents#
Tutorials
You can find tutorials on this subject in the Developer Guide: Agents and Tools for Generative AI.
Note
Once you have a DKUChatModel, obtained with as_langchain_chat_model(),
you can use the LangChain methods (such as invoke and stream).
Tip
In the code provided on this page, you will need to provide the IDs of other elements. Dedicated pages explain how to list your LLMs, list your Knowledge Banks, or list your Agent Tools.
Listing your agents#
The following code lists all the Agents and their IDs, which you can reuse in the code samples listed later on this page.
import dataiku
client = dataiku.api_client()
project = client.get_default_project()
llm_list = project.list_llms()
for llm in llm_list:
    if 'agent:' in llm.id:
        print(f"- {llm.description} (id: {llm.id})")
Using your agent#
Using the native DSSLLM completion (for more information,
refer to Perform completion queries on LLMs):
import dataiku
AGENT_ID = "" # Fill with your agent id
client = dataiku.api_client()
project = client.get_default_project()
llm = project.get_llm(AGENT_ID)
completion = llm.new_completion()
resp = completion.with_message("How to run an agent?").execute()
if resp.success:
    print(resp.text)
With the DKUChatModel (for more information,
refer to LangChain integration):
AGENT_ID = "" # Fill with your agent id
langchain_llm = project.get_llm(AGENT_ID).as_langchain_chat_model()
resp = langchain_llm.invoke("How to run an agent?")
print(resp.content)
Streaming (in a notebook)#
import dataiku
from dataikuapi.dss.llm import DSSLLMStreamedCompletionChunk, DSSLLMStreamedCompletionFooter
from IPython.display import display, clear_output
AGENT_ID = "" # Fill with your agent id
client = dataiku.api_client()
project = client.get_default_project()
llm = project.get_llm(AGENT_ID)
completion = llm.new_completion()
completion.with_message("Who is the customer fdouetteau? Please provide additional information.")
gen = ""
for chunk in completion.execute_streamed():
    if isinstance(chunk, DSSLLMStreamedCompletionChunk):
        gen += chunk.data["text"]
        clear_output()
        display("Received text: %s" % gen)
    elif isinstance(chunk, DSSLLMStreamedCompletionFooter):
        print("Completion is complete: %s" % chunk.data)
from IPython.display import display, clear_output
AGENT_ID = "" # Fill with your agent id
langchain_llm = project.get_llm(AGENT_ID).as_langchain_chat_model()
resp = langchain_llm.stream("Who is the customer fdouetteau? Please provide additional information.")
gen = ""
for r in resp:
    clear_output()
    gen += r.content
    display(gen)
Asynchronous Streaming (in a notebook)#
import asyncio
from IPython.display import display, clear_output
async def func(response):
    gen = ""
    async for r in response:
        clear_output()
        gen += r.content
        display(gen)
AGENT_ID = "" # Fill with your agent id
langchain_llm = project.get_llm(AGENT_ID).as_langchain_chat_model()
resp = langchain_llm.astream("Who is the customer fdouetteau? Please provide additional information.")
# Top-level await works in a notebook cell
await func(resp)
Agent response: sources and artifacts#
When an agent is queried via the Dataiku LLM Mesh, it may return sources to show which documents it has used to formulate its response. It may also return artifacts to enhance its response.
Sources are documents that an agent reads to help it formulate an answer to a user’s query.
Artifacts are similar to sources, but they are produced by an agent as an output for the user, rather than being read as an input.
After a call to your agent, you will get a dataikuapi.dss.llm.DSSLLMCompletionResponse.
If the call was streamed, you will get a series of DSSLLMStreamedCompletionChunk objects and a final DSSLLMStreamedCompletionFooter object.
In non-streamed mode, the artifacts are returned in the artifacts field of the response, and the sources are returned in the additionalInformation.sources field of the response:
{
    "ok": True,
    "text": "...",
    "finishReason": "STOP",
    "artifacts": [ ], /* list of artifact objects */
    "additionalInformation": {
        "sources": [ ], /* list of source objects */
    }
}
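As a minimal sketch of how the fields above can be read, the helper below works on a plain dictionary that mirrors the illustrated structure. The function name `split_response` and the sample values are hypothetical; how you obtain the raw dictionary from the response object is not covered here.

```python
from typing import Any


def split_response(raw: dict[str, Any]) -> tuple[str, list, list]:
    """Return (text, artifacts, sources) from a non-streamed response dict."""
    text = raw.get("text", "")
    # Artifacts sit at the top level of the response
    artifacts = raw.get("artifacts") or []
    # Sources are nested under additionalInformation
    sources = (raw.get("additionalInformation") or {}).get("sources") or []
    return text, artifacts, sources


# Hypothetical sample mirroring the structure shown above
sample_response = {
    "ok": True,
    "text": "Paris hosts the Games.",
    "finishReason": "STOP",
    "artifacts": [{"type": "EXAMPLE_TYPE", "parts": []}],
    "additionalInformation": {"sources": [{"items": []}]},
}

text, artifacts, sources = split_response(sample_response)
print(text, len(artifacts), len(sources))
```

Missing fields fall back to empty values, so the helper can also be called on a response that carries neither artifacts nor sources.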
In streamed mode, artifacts are streamed inline with text chunks, and sources are included in the streaming footer:
{
    {"type": "event", "eventKind": "AGENT_GETTING_READY"}
    {"type": "event", "eventKind": "AGENT_THINKING"}
    {"type": "content", "text": "..." }
    {"type": "content", "artifacts": [ ]}
    {"type": "content", "artifacts": [ ]}
    {"type": "content", "text": "..." }
    /* ... */
    {
        "type": "footer",
        "finishReason": "STOP",
        "additionalInformation": {
            "sources": [ ]
        },
        "trace": " "
    }
}
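The streamed layout above can be consumed with a small accumulator. This is a sketch under the assumption that each streamed element is available as a plain dictionary shaped like the ones illustrated (with the Python API, the payload of a chunk is exposed through its `data` attribute, as in the streaming example earlier on this page). The name `collect_stream` and the sample chunks are hypothetical.

```python
def collect_stream(chunks):
    """Accumulate text, artifacts, and footer sources from streamed chunk dicts."""
    text_parts = []
    artifacts = []
    sources = []
    finish_reason = None
    for chunk in chunks:
        kind = chunk.get("type")
        if kind == "content":
            # A content chunk may carry text, artifacts, or both
            if "text" in chunk:
                text_parts.append(chunk["text"])
            artifacts.extend(chunk.get("artifacts", []))
        elif kind == "footer":
            # Sources only arrive with the footer
            finish_reason = chunk.get("finishReason")
            sources = (chunk.get("additionalInformation") or {}).get("sources", [])
        # "event" chunks carry progress information only and are skipped here
    return {
        "text": "".join(text_parts),
        "artifacts": artifacts,
        "sources": sources,
        "finishReason": finish_reason,
    }


# Hypothetical stream mirroring the layout shown above
sample_chunks = [
    {"type": "event", "eventKind": "AGENT_THINKING"},
    {"type": "content", "text": "Hello, "},
    {"type": "content", "artifacts": [{"type": "EXAMPLE_TYPE", "parts": []}]},
    {"type": "content", "text": "world."},
    {"type": "footer", "finishReason": "STOP",
     "additionalInformation": {"sources": [{"items": []}]}},
]

result = collect_stream(sample_chunks)
print(result["text"])
```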
Artifact structure#
The artifact object has the following structure:
{
    "id": "...",
    "type": "...",
    "name": "...",
    "description": "...",
    "hierarchy": [ ],
    "parts": [ ],
}
id is optional. It is used for aggregating streaming chunks if one artifact is streamed in multiple chunks.
type is mandatory. It instructs Dataiku on how to display this artifact and signals which parts are permitted.
name is optional. It is used in the Dataiku UI to describe this artifact to the user.
description is optional. It is used in the Dataiku UI to describe this artifact to the user.
hierarchy is optional. It identifies the origin of artifacts in complex agent architectures.
parts is mandatory. This is a list of part objects, where the data of the artifact resides. Both sources and artifacts share the same data structure for parts (called items for sources).
Source structure#
The source object has the following format:
{
    "toolCallDescription": "...",
    "hierarchy": [ ],
    "items": [ ],
}
toolCallDescription is optional.
hierarchy is optional. It identifies the origin of sources in complex agent architectures.
items is mandatory. This is a list of part objects, where the data of the source resides. Both sources and artifacts share the same data structure for items (called parts for artifacts).
Source items and Artifact parts structure#
The parts field of artifact and the items field of source have the same format:
{
    "type": "...",
    "index": /* integer value */,
    /* type-specific fields */
}
The type field is mandatory and can contain many different values. The most common values are
REASONING and TEXT. However, when dealing with sources such as with RAG-based tools and agents,
you can also encounter values such as SIMPLE_DOCUMENT, FILE_BASED_DOCUMENT, DATA_INLINE and RECORDS.
Plugins may provide other values as well.
The index field is optional. It is used for aggregating streaming chunks if one part is streamed in multiple chunks.
Each part can contain different additional fields specific to the part type.
When the type is REASONING, the reasoning of the LLM agent is split into
several parts. Each part holds one sequential piece of reasoning text, ordered by the index field.
Each of these parts will be provided with the TEXT field.
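The index-based aggregation described above can be sketched as follows. This example assumes that the text of a part is carried under a `text` key and that fragments sharing the same type and index belong to the same part; both points are assumptions made for illustration, and the name `merge_parts` is hypothetical.

```python
def merge_parts(parts):
    """Merge streamed part fragments that share the same (type, index)."""
    merged = {}      # (type, index) -> merged part
    standalone = []  # parts without an index are kept as-is
    for part in parts:
        index = part.get("index")
        if index is None:
            standalone.append(dict(part))
            continue
        key = (part["type"], index)
        if key in merged:
            # Append this fragment's text to the part already seen
            merged[key]["text"] = merged[key].get("text", "") + part.get("text", "")
        else:
            merged[key] = dict(part)
    # Indexed parts come first, in index order
    ordered = sorted(merged.values(), key=lambda p: p["index"])
    return ordered + standalone


# Hypothetical fragments of two reasoning parts, interleaved
fragments = [
    {"type": "REASONING", "index": 0, "text": "First, "},
    {"type": "REASONING", "index": 1, "text": "Then "},
    {"type": "REASONING", "index": 0, "text": "look up the FAQ."},
    {"type": "REASONING", "index": 1, "text": "summarize."},
]

for part in merge_parts(fragments):
    print(part["index"], part["text"])
```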
Human approval#
Agents queried via the Dataiku LLM Mesh may also interrupt their execution to request human approval before moving forward with calling some tools, when the tools are configured to require it.
In that case, the agent response includes a toolValidationRequests field with an array of toolValidationRequest objects:
{
    "ok": True,
    "text": "...",
    "finishReason": "TOOL_VALIDATION_REQUESTS",
    "toolValidationRequests": [ ], /* list of toolValidationRequest objects */
    "artifacts": [ ], /* list of artifact objects */
    "additionalInformation": {
        "sources": [ ], /* list of source objects */
    }
}
In streamed mode, tool validation requests are streamed with text chunks and artifacts:
{
    {"type": "event", "eventKind": "AGENT_GETTING_READY"}
    {"type": "event", "eventKind": "AGENT_THINKING"}
    {"type": "content", "text": "..." }
    {"type": "content", "artifacts": [ ]}
    {"type": "content", "artifacts": [ ]}
    {"type": "content", "text": "..." }
    {"type": "content", "toolValidationRequests": [ ]}
    /* ... */
    {
        "type": "footer",
        "finishReason": "TOOL_VALIDATION_REQUESTS",
        "additionalInformation": {
            "sources": [ ]
        },
        "trace": " "
    }
}
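To decide whether a streamed answer is waiting for approval, you can gather the requests from the content chunks and check the finish reason in the footer. The sketch below assumes plain dictionaries shaped like the ones above; the name `pending_validations` and the sample request are hypothetical.

```python
def pending_validations(chunks):
    """Return (needs_approval, requests) from streamed chunk dicts."""
    requests = []
    finish_reason = None
    for chunk in chunks:
        if chunk.get("type") == "content":
            # Requests are streamed inline, like text and artifacts
            requests.extend(chunk.get("toolValidationRequests", []))
        elif chunk.get("type") == "footer":
            finish_reason = chunk.get("finishReason")
    needs_approval = finish_reason == "TOOL_VALIDATION_REQUESTS"
    return needs_approval, requests


# Hypothetical stream that stops to ask for approval
approval_stream = [
    {"type": "content", "text": "I need to create a ticket."},
    {"type": "content", "toolValidationRequests": [{"id": "req-1"}]},
    {"type": "footer", "finishReason": "TOOL_VALIDATION_REQUESTS"},
]

needs_approval, requests = pending_validations(approval_stream)
print(needs_approval, [r["id"] for r in requests])
```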
Tool validation request structure#
The toolValidationRequest object has the following format:
{
    "id": "...",
    "hierarchy": [ ],
    "message": "...",
    "allowEditingInputs": /* boolean value */,
    "toolRef": "...",
    "toolName": "...",
    "toolType": "...",
    "toolInputSchema": /* JSON schema */,
    "toolCall": /* tool call object */,
}
id is mandatory. It uniquely identifies the validation request, in order to match the corresponding tool validation response.
hierarchy is optional. It identifies the origin of tool validation requests in complex agent architectures.
message is optional. It provides additional information about the tool call that is pending human approval.
allowEditingInputs is mandatory. This flag indicates whether editing the inputs of the tool call in the tool validation response is permitted.
toolRef is optional. It contains the DSS id of the tool when the tool is a DSS agent tool.
toolName is optional. It is used in the Dataiku UI to describe the tool to the user.
toolType is optional. It contains the DSS tool type when the tool is a DSS agent tool. It is used in the Dataiku UI to display the correct tool icon.
toolInputSchema is optional. It describes the expected schema of the tool call input, e.g. for human editing.
toolCall is mandatory. It contains the tool call pending human approval, with the data structure described in the next section.
Tool call structure#
{
    "type": "function",
    "id": "...",
    "index": /* int value */,
    "function": {
        "name": "...",
        "arguments": "..."
    }
}
id is mandatory. It uniquely identifies the tool call with the underlying LLM.
index is optional. It is used for aggregating streaming chunks if the tool call is streamed in multiple chunks.
function is mandatory. It contains the name of the tool as known by the underlying LLM, and the arguments of the pending call in the form of a serialized JSON.
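Because the arguments are a serialized JSON string, they need to be decoded before they can be shown to a reviewer. The following sketch summarizes a tool validation request from a plain dictionary; the name `describe_request` and all sample values are hypothetical.

```python
import json


def describe_request(request):
    """Summarize a tool validation request for display to a human reviewer."""
    function = request["toolCall"]["function"]
    raw_arguments = function.get("arguments")
    # arguments is a serialized JSON string, so decode it before use
    arguments = json.loads(raw_arguments) if raw_arguments else {}
    return {
        "request_id": request["id"],
        # toolName is optional: fall back to the name known by the LLM
        "tool": request.get("toolName") or function["name"],
        "arguments": arguments,
        "editable": request["allowEditingInputs"],
    }


# Hypothetical request following the structures above
sample_request = {
    "id": "req-1",
    "allowEditingInputs": True,
    "toolCall": {
        "type": "function",
        "id": "call-1",
        "function": {
            "name": "create_ticket",
            "arguments": "{\"issue\": \"No booking confirmation received\"}",
        },
    },
}

summary = describe_request(sample_request)
print(summary["tool"], summary["arguments"])
```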
Code Agents#
In Dataiku, you can implement a custom agent in code that leverages models from the LLM Mesh, LangChain, and its wider ecosystem.
The resulting agent becomes part of the LLM Mesh, seamlessly integrating into your AI workflows.
Dataiku includes basic code examples to help you get started. Below are more advanced samples that showcase full-fledged examples of agents built with LangChain and LangGraph. They both work with the internal code environment for retrieval-augmented generation to avoid any code env issue.
This support agent is designed to handle customer inquiries efficiently. With its tools, it can:
retrieve relevant information from an FAQ database
log issues for follow-up when immediate answers aren’t available
escalate complex requests to a human agent when necessary.
We have tested it on this [Paris Olympics FAQ dataset](https://www.kaggle.com/datasets/sahityasetu/paris-2024-olympics-faq), which we used to create a knowledge bank with the Embed recipe. We have embedded a column containing both the question and the corresponding answer. Use the agent on inquiries like: How will transportation work in Paris during the Olympic Games? or I booked a hotel for the Olympic games in Paris, but never received any confirmation. What’s happening? and see how it reacts!
import dataiku
from dataiku.llm.python import BaseLLM
from dataiku.langchain import LangchainToDKUTracer
from langchain.tools import Tool
from langchain.agents import create_openai_tools_agent, AgentExecutor
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
## 1. Set Up Vector Search for FAQs
# Here, we are using a knowledge bank from the flow, built with our native Embed recipe.
# We make it a Langchain retriever and pass it to our first tool.
KB_ID = "" # fill with the ID of your Knowledge Bank
LLM_ID = "" # fill with your LLM id
faq_retriever = dataiku.KnowledgeBank(id=KB_ID).as_langchain_retriever()
faq_retriever_tool = Tool(
    name="FAQRetriever",
    func=faq_retriever.get_relevant_documents,
    description="Retrieves answers from the FAQ database based on user questions."
)
## 2. Define (fake) Ticketing & Escalation Tools
# Simulated ticket creation function
def create_ticket(issue: str):
    # Here, you would typically use the API to your internal ticketing tool.
    return f"Ticket created: {issue}"

ticketing_tool = Tool(
    name="CreateTicket",
    func=create_ticket,
    description="Creates a support ticket when the issue cannot be resolved automatically."
)

# Simulated escalation function
def escalate_to_human(issue: str):
    # This function could send a notification to the support engineers, for instance.
    # It can be useful to attach info about the customer's request, sentiment, and history.
    return f"Escalation triggered: {issue} has been sent to a human agent."

escalation_tool = Tool(
    name="EscalateToHuman",
    func=escalate_to_human,
    description="Escalates the issue to a human when it's too complex, or the user is upset."
)
## 3. Build the LangChain Agent
# Define LLM for agent reasoning
llm = dataiku.api_client().get_default_project().get_llm(LLM_ID).as_langchain_chat_model()
# Agent tools (FAQ retrieval + ticketing + escalation)
tools = [faq_retriever_tool, ticketing_tool, escalation_tool]
tool_names = [tool.name for tool in tools]
# Define the prompt
prompt = ChatPromptTemplate.from_messages(
    [
        ("system",
         """You are an AI customer support agent. Your job is to assist users by:
- Answering questions using the FAQ retriever tool.
- Creating support tickets for unresolved issues.
- Escalating issues to a human when necessary."""),
        MessagesPlaceholder("chat_history", optional=True),
        ("human", "{input}"),
        MessagesPlaceholder("agent_scratchpad"),
    ]
)
# Initialize an agent with tools.
# Here, we define it as an agent that uses OpenAI tools.
# More options are available at https://python.langchain.com/api_reference/langchain/agents.html
agent = create_openai_tools_agent(llm=llm, tools=tools, prompt=prompt)
agent_executor = AgentExecutor(agent=agent, tools=tools)
agent_executor.invoke({"input": "How will transportation work in Paris during the Olympic Games?"})
class MyLLM(BaseLLM):
    def __init__(self):
        pass

    def process(self, query, settings, trace):
        prompt = query["messages"][0]["content"]
        tracer = LangchainToDKUTracer(dku_trace=trace)
        # Wrap the agent in an executor
        agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True, handle_parsing_errors=True)
        response = agent_executor.invoke({"input": prompt}, config={"callbacks": [tracer]})
        return {"text": response["output"]}
This data analysis agent is designed to automate insights from data.
Given a table (from an SQL database) and its schema (list of columns with information about what they contain), it can:
take a user question
translate it into an SQL query
run the query and fetch the result
interpret the result and convert it back into natural language.
The code below was written for [this dataset about car sales](https://www.kaggle.com/datasets/missionjee/car-sales-report). We used a Prepare recipe to remove some columns and parse the date to a proper format. Once implemented, test your agent with questions like: What were the top 5 best-selling car models in 2023? or What was the year-over-year evolution in the Scottsdale region regarding the number of sales?.
import dataiku
from dataiku.llm.python import BaseLLM
from dataiku.langchain import LangchainToDKUTracer
from langchain.prompts import ChatPromptTemplate
from dataiku import SQLExecutor2
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict
LLM_ID = "" # fill with your LLM id
# Basic configuration
# Initialize LLM
llm = dataiku.api_client().get_default_project().get_llm(LLM_ID).as_langchain_chat_model()
# Connect to the sales database
dataset = dataiku.Dataset("car_data_prepared_sql")
table_name = dataset.get_location_info().get('info', {}).get('table')
table_schema = """
- `Car_id` (TEXT): Unique car ID
- `Date` (DATE): Date of the sale
- `Dealer_Name` (TEXT): Name of the car dealer
- `Company` (TEXT): Company or brand of the car
- `Model` (TEXT): Model of the car
- `Transmission` (TEXT): Type of transmission in the car
- `Color` (TEXT): Color of the car's exterior
- `Price` (INTEGER): Listed price of the car sold
- `Body_Style` (TEXT): Style or design of the car's body
- `Dealer_Region` (TEXT): Geographic region of the car dealer
"""
# Here, we are adding a dispatcher as the first step of our graph. If the user query is not related to car sales,
# the agent will simply answer that it can't talk about anything other than car sales.
def dispatcher(state):
    """
    Decides if the query is related to car sales data or just a general question.

    Args:
        state (dict): The current graph state
    Returns:
        str: Binary decision for the next node to call
    """
    user_query = state["user_query"]
    # Classification prompt
    prompt = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                "You are a classifier that determines whether a user's query is related to car sales data.\n\n"
                "The table contains information about car sales, including:\n"
                "- Sale date & price\n"
                "- Info about the cars (brand, model, transmission, body style & color) \n"
                "- Dealer name and region\n\n"
                "If the query is related to analyzing car sales data, return 'SQL'.\n"
                "Otherwise, return 'GENERIC'."
            ),
            (
                "human", "{query}"
            )
        ]
    )
    # Get the classification result
    classification = llm.invoke(
        prompt.format_messages(query=user_query)
    ).content.strip()
    return classification
# First node, take the user input and translate it into a coherent SQL query.
def sql_translation(state):
    """
    Translates a natural language query into SQL using the database schema.

    Args:
        state (dict): The current graph state that contains the user_query
    Returns:
        state (dict): New key added to state -- sql_query -- that contains the query to execute.
    """
    print("---Translate to SQL---")
    user_query = state["user_query"]
    # We need to pass the model our table name and schema. Adapt instructions according to your needs, of course.
    prompt = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                "You are an AI assistant that converts natural language questions into SQL queries.\n\n"
                "Here are the table name: {name} and schema:\n{schema}\n\n"
                "Here are some important rules:\n"
                "- Use correct table and column names\n"
                "- Do NOT use placeholders (e.g., '?', ':param').\n"
                "- The SQL should be executable in PostgreSQL, which means that table and column names should ALWAYS be double-quoted.\n"
                "- Never return your answer with SQL Markdown decorators. Just the SQL query, nothing else."
            ),
            (
                "human",
                "Convert the following natural language query into an SQL query:\n\n{query}"
            )
        ]
    )
    # Invoke LLM with formatted prompt
    sql_query = llm.invoke(
        prompt.format_messages(name=table_name, schema=table_schema, query=user_query)
    ).content
    return {"sql_query": sql_query}
# Second node, run the SQL query on the table. For this, we are using Dataiku's API.
def database_query(state):
    """
    Executes the SQL query and retrieves results.

    Args:
        state (dict): The current graph state that contains the query to execute.
    Returns:
        state (dict): New key added to state -- query_result -- that contains the result of the query.
                      Returns an error key if not working.
    """
    print("---Run SQL query---")
    sql_query = state["sql_query"]
    try:
        executor = SQLExecutor2(dataset=dataset)
        df = executor.query_to_df(sql_query)
        return {"query_result": df.to_dict(orient="records")}
    except Exception as e:
        return {"error": str(e)}
# Third node, interpret the results and convert them back into natural language.
def result_interpreter(state):
    """
    Takes the raw database output and converts it into a natural language response.

    Args:
        state (dict): The current graph state, that contains the result of the query (or an error if it didn't work)
    Returns:
        state (dict): New key added to state -- response -- that contains the final agent response.
    """
    print("---Interpret results---")
    query_result = state.get("query_result", [])
    if not query_result:
        return {"response": "No results were found, or the query failed."}
    prompt = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                "You are an AI assistant that summarizes findings from database results in a clear, human-readable format.\n"
            ),
            (
                "human", "{query_result}"
            )
        ]
    )
    formatted_prompt = prompt.format_messages(query_result=query_result)
    # formatted_prompt is a list of two messages, so measure the serialized results instead.
    # The threshold is in characters; adjust it to your needs.
    if len(str(query_result)) > 1000:
        return {"response": "The returned results were too long to be analyzed. Rephrase your query."}
    summary = llm.invoke(formatted_prompt).content
    return {"response": summary}
# On the other branch of our graph, if the question is too generic, the agent will just answer with a generic response.
def generic_response(state):
    return {
        "response": "I'm an agent specialized in car sales data analysis. I only have access to info like "
                    "sales date, price, car characteristics, and dealer name or region. "
                    "Ask me anything about car sales!"
    }
class AgentState(TypedDict):
    """State object for the agent workflow."""
    user_query: str
    sql_query: str
    query_result: list
    response: str
    error: str  # written by database_query when the SQL query fails
# Create graph
graph = StateGraph(AgentState)
# Add nodes
graph.add_node("sql_translation", sql_translation)
graph.add_node("database_query", database_query)
graph.add_node("result_interpreter", result_interpreter)
graph.add_node("generic_response", generic_response)
# Define decision edges
graph.add_conditional_edges(
    START,
    dispatcher,
    {
        "SQL": "sql_translation",  # If query is about sales, go to SQL path
        "GENERIC": "generic_response"  # Otherwise, respond with a generic answer
    }
)
# Define SQL query flow
graph.add_edge("sql_translation", "database_query")
graph.add_edge("database_query", "result_interpreter")
graph.add_edge("result_interpreter", END)
class MyLLM(BaseLLM):
    def __init__(self):
        pass

    def process(self, query, settings, trace):
        prompt = query["messages"][0]["content"]
        tracer = LangchainToDKUTracer(dku_trace=trace)
        # Compile the graph
        query_analyzer = graph.compile()
        result = query_analyzer.invoke({"user_query": prompt}, config={"callbacks": [tracer]})
        resp_text = result["response"]
        sql_query = result.get("sql_query", [])
        if not sql_query:
            return {"text": resp_text}
        # If the agent did succeed, then we return the final response, as well as the sql_query, for audit purposes.
        full_resp_text = f"{resp_text}\n\nHere is the SQL query I ran:\n\n{sql_query}"
        return {"text": full_resp_text}
Creating your code agent#
All code agents must implement the BaseLLM class.
The BaseLLM class is somewhat similar to this implementation:
class BaseLLM(BaseModel):
    """The base interface for a Custom LLM"""

    # Implement this for synchronous answer
    def process(self, query: SingleCompletionQuery, settings: CompletionSettings,
                trace: SpanBuilder) -> CompletionResponse:
        raise _NotImplementedError

    # Implement this for asynchronous answer
    async def aprocess(self, query: SingleCompletionQuery, settings: CompletionSettings,
                       trace: SpanBuilder) -> CompletionResponse:
        raise _NotImplementedError

    # Implement this for a streamed answer
    def process_stream(self, query: SingleCompletionQuery, settings: CompletionSettings,
                       trace: SpanBuilder) -> Iterator[StreamCompletionResponse]:
        raise _NotImplementedError
        yield

    # Implement this for an asynchronous streamed answer
    async def aprocess_stream(self, query: SingleCompletionQuery,
                              settings: CompletionSettings, trace: SpanBuilder) -> \
            AsyncIterator[StreamCompletionResponse]:
        raise _NotImplementedError
        yield
Generating an answer from a code agent follows the same steps, whether the agent is synchronous or asynchronous, streamed or not:
Get an LLM (refer to Using your agent, for more details).
Process the input (and the settings).
Potentially manipulate the trace.
Invoke the LLM (refer to Using your agent, for more details).
Return the answer (refer to Using your agent, for more details).
1. Process the input without history#
If you only want to handle the last message, take the content of the last entry in the query's messages,
as highlighted in the following code:
# With the native DSSLLM completion
prompt = query["messages"][-1]["content"]
completion = llm.new_completion().with_message(prompt)
## .../...
llm_resp = completion.execute()

# With a DKUChatModel (LangChain)
from langchain_core.messages import HumanMessage

prompt = query["messages"][-1]["content"]
message = [HumanMessage(prompt)]
## .../...
llm_resp = llm.invoke(message)
2. Process the input with history#
If your intent is to use your agent conversationally, you may need to process the query to provide the whole context to the LLM.
# With the native DSSLLM completion
completion = llm.new_completion()
for m in query.get('messages'):
    completion.with_message(m.get('content'), m.get('role'))
## .../...
llm_resp = completion.execute()

# With a DKUChatModel (LangChain)
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage

messages = []
for m in query.get('messages'):
    match m.get('role'):
        case 'user':
            messages.append(HumanMessage(m.get('content')))
        case 'assistant':
            messages.append(AIMessage(m.get('content')))
        case 'system':
            messages.append(SystemMessage(m.get('content')))
        case 'tool':
            messages.append(ToolMessage(m.get('content')))
        case _:
            logger.info('Unknown role: %s', m.get('role'))
##.../...
llm.invoke(messages)
3. Adding trace information#
with trace.subspan("Invoke the LLM") as subspan:
    ai_msg = llm.invoke(messages)
    subspan.attributes['messages'] = str(messages)
Adding visual dependencies to your Code Agent settings#
You can add visual dependencies to your Code Agent to show interactions in the Flow.
A dependency is represented by a dict object with the following structure:
{'type': str, 'ref': str}
The type value is one of the following:
DATASET, and the ref value is the Dataset name.
SAVED_MODEL, and the ref value is the Model ID.
RETRIEVABLE_KNOWLEDGE, and the ref value is the Knowledge Bank ID.
import dataiku
CODE_AGENT_ID = "" # fill with your Code Agent id
client = dataiku.api_client()
project = client.get_default_project()
code_agent = project.get_agent(CODE_AGENT_ID)
settings = code_agent.get_settings()
version = settings.active_version
vsettings = settings.get_version_settings(version)
raw = vsettings.get_raw()
raw["pythonAgentSettings"]["dependencies"] = [] # fill with the dependency dictionaries
vsettings._version_settings = raw
settings.save()
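The dependency dictionaries described above can be built with a small validating helper before they are assigned to the settings. This is only a sketch: `make_dependency`, `ALLOWED_TYPES`, and the Knowledge Bank ID `my_kb_id` are hypothetical names introduced for illustration.

```python
# The three dependency types listed above
ALLOWED_TYPES = {"DATASET", "SAVED_MODEL", "RETRIEVABLE_KNOWLEDGE"}


def make_dependency(dep_type, ref):
    """Build a {'type': ..., 'ref': ...} dependency dict, rejecting unknown types."""
    if dep_type not in ALLOWED_TYPES:
        raise ValueError(f"Unsupported dependency type: {dep_type}")
    if not ref:
        raise ValueError("A dependency needs a non-empty ref")
    return {"type": dep_type, "ref": ref}


dependencies = [
    make_dependency("DATASET", "car_data_prepared_sql"),
    make_dependency("RETRIEVABLE_KNOWLEDGE", "my_kb_id"),  # hypothetical ID
]
print(dependencies)
```

The resulting list is what you would assign to the dependencies entry of the raw settings.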
Handling the trace#
Once you have a trace, you can access the inputs, outputs, and attributes objects,
or if you prefer, you can use the to_dict() method.
Then you can modify these objects to reflect your needs.
You can add a child to the trace (use the subspan() method)
or append another trace to the current trace (use the append_trace() method).
In the following code example, we consider trace to be a trace,
and trace_dict to be a dictionary built using trace.to_dict().
trace.inputs["additional_information"] = "Useful information"
trace_dict["inputs"].update({"additional_information": "Useful information"})
trace.outputs["additional_information"] = "Useful information"
trace_dict["outputs"].update({"additional_information": "Useful information"})
trace.attributes["additional_information"] = "Useful information"
trace_dict["attributes"].update({"additional_information": "Useful information"})
trace_dict["name"] = "New name"
Note
Updating the name is rarely helpful, as you can provide the name with the subspan() method.
Agent Tools#
Listing your agent tools#
The following code lists all the Agent Tools with their names and IDs, which you can reuse in the code samples listed later on this page.
tools_list = project.list_agent_tools()
for tool in tools_list:
    print(f"{tool.name} - {tool.id}")
Creating your agent tool#
import dataiku
KB_ID = "" # fill with the ID of your Knowledge Bank
client = dataiku.api_client()
project = client.get_default_project()
vector_search_tool_creator = project.new_agent_tool("VectorStoreSearch")
vector_search_tool = vector_search_tool_creator.with_knowledge_bank(KB_ID).create()
Modifying your agent tool settings#
You can read and modify the parameters specific to a given tool like so:
settings = vector_search_tool.get_settings()
print(settings.params['maxDocuments'])
settings.params['maxDocuments'] = 8
settings.save()
You can also modify the generic settings of your DSS tool. For instance, you can enable/disable human approval for this tool in the context of visual agents.
settings = vector_search_tool.get_settings()
settings.get_raw()['requireHumanApproval'] = True
settings.get_raw()['allowEditingInputs'] = True # optionally enable editing inputs when validating tool calls
settings.save()
Running your agent tool#
vector_search_tool.run({"searchQuery": "best review"})
Impersonate end-user identity in a custom agent tool#
Note
If you are looking for impersonation in webapps, see Impersonation with webapps.
When a custom agent tool is called from Agent Hub or from a visual agent,
the Additional Request Context contains a dkuCallerTicket entry.
You can use this ticket with TicketImpersonationContext to create a DSS client
that performs API calls on behalf of the end user rather than the backend technical account.
This is useful when the tool contains custom authentication logic and must respect the user’s permissions when interacting with the agent.
dkuCallerTicket and TicketImpersonationContext in a custom agent tool#
import dataiku
from dataiku.core.intercom import TicketImpersonationContext
from dataiku.llm.agent_tools import BaseAgentTool

class MyAgentTool(BaseAgentTool):
    """Custom agent tool that performs DSS API calls as the end user."""

    def get_descriptor(self, tool):
        return {
            "description": "Returns authentication information for the calling DSS user",
            "inputSchema": {
                "type": "object",
                "properties": {}
            }
        }

    def invoke(self, input, trace):
        caller_ticket = input["context"].get("dkuCallerTicket")
        with TicketImpersonationContext(caller_ticket):
            client = dataiku.api_client()
            # client is now authenticated as the caller of this tool
            # auth_info contains the user information of the caller
            auth_info = client.get_auth_info()
        return {
            "output": auth_info,
            "sources": []
        }
Deleting your agent tool#
vector_search_tool.delete()
Tool response#
The response object contains additional information like sources and artifacts.
The explanations are the same as seen in the Agent response: sources and artifacts section.
{
    "output": "",
    "trace": "<removed for print>",
    "sources": [],
    "artifacts": []
}
Retrieval-augmented LLM agent#
Creating your RAG agent#
import dataiku
KB_ID = "" # fill with your Knowledge Bank's id
LLM_ID = "" # fill with your LLM id
client = dataiku.api_client()
project = client.get_default_project()
rag_agent = project.create_retrieval_augmented_llm("MyRAG", KB_ID, LLM_ID)
Modifying your RAG agent settings#
LLM_ID2 = "" # fill with an alternate LLM id
rag_agent_settings = rag_agent.get_settings()
active_version = rag_agent_settings.active_version
v_rag_agent_settings = rag_agent_settings.get_version_settings(active_version)
v_rag_agent_settings.llm_id = LLM_ID2
rag_agent_settings.save()
Running your RAG agent#
response = rag_agent.as_llm().new_completion().with_message("What will inflation in Europe look like and why?").execute()
print(response.text)
Deleting your RAG agent#
rag_agent.delete()
Visual Agent#
Creating your visual agent#
import dataiku
TOOL_ID = "" # fill with your agent tool's id
client = dataiku.api_client()
project = client.get_default_project()
agent = project.create_agent("visual1", "TOOLS_USING_AGENT")
Modifying your visual agent settings#
TOOL_ID = "" # fill with your agent tool id
LLM_ID = "" # fill with your LLM id
tool = project.get_agent_tool(TOOL_ID)
agent_settings = agent.get_settings()
versionned_agent_settings = agent_settings.get_version_settings("v1")
versionned_agent_settings.llm_id = LLM_ID
versionned_agent_settings.add_tool(tool)
agent_settings.save()
Running your visual agent#
response = agent.as_llm().new_completion().with_message("tell me everything you know about Dataiku").execute()
response.text
Handling tool validation requests#
When you run a visual agent with a tool that has human approval enabled, the agent requests human approval before calling that tool. For DSS agent tools, you can enable or disable human approval in the agent tool settings.
completion = agent.as_llm().new_completion().with_message("tell me everything you know about Dataiku")
response = completion.execute()
response.tool_validation_requests # inspect the tool validation requests
# augment the completion query to include the data required to resume execution of the query
completion.with_memory_fragment(response.memory_fragment)
completion.with_tool_validation_requests(response.tool_validation_requests)
# add validation response for each validation request
for tvreq in response.tool_validation_requests:
    completion.with_tool_validation_response(tvreq["id"], validated=True)
# execute the augmented query to get the final answer
final_response = completion.execute()
final_response.text
Alternatively, you can build a separate follow-up query directly from the first response with response.prepare_followup():
# prepare a followup completion query to include the necessary data to resume
followup = response.prepare_followup()
# add validation responses for each validation request
for tvreq in response.tool_validation_requests:
    followup.with_tool_validation_response(tvreq["id"], validated=True)
# execute the follow-up query to get the final answer
final_response = followup.execute()
final_response.text
Deleting your visual agent#
agent.delete()
Extracting sources#
Suppose you have an agent that is querying a knowledge bank. You can retrieve the sources by using the following code snippet:
#AGENT_ID = "" # Fill with your agent id
langchain_llm = project.get_llm(AGENT_ID).as_langchain_chat_model()
messages = "What is the climate in 2024"
current_agent_response = langchain_llm.invoke(messages)
last_trace = current_agent_response.response_metadata.get('lastTrace', None)
attributes = last_trace.get('attributes')
completionResponse = attributes.get('completionResponse')
additionalInformation = completionResponse.get('additionalInformation')
sources = additionalInformation.get('sources')
for source in sources:
    items = source.get('items')
    for document in items:
        print(document)
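The snippet above assumes that every level of the metadata is present. If the agent did not query the knowledge bank, one of the intermediate values may be missing, and calling .get on None raises an AttributeError. The following is a minimal defensive sketch that follows the same key path and returns an empty list when anything is absent. The function name extract_source_items and the sample metadata (including its "title" field) are made up for illustration.

```python
def extract_source_items(response_metadata):
    """Return the flat list of source items found in a response metadata dict.

    Follows the key path lastTrace > attributes > completionResponse >
    additionalInformation > sources > items, and returns an empty list
    if any level is missing or empty.
    """
    node = response_metadata or {}
    for key in ("lastTrace", "attributes", "completionResponse", "additionalInformation"):
        node = node.get(key) or {}
    items = []
    for source in node.get("sources") or []:
        items.extend(source.get("items") or [])
    return items


# Made-up metadata with the same shape as the key path above (illustration only)
sample_metadata = {
    "lastTrace": {"attributes": {"completionResponse": {"additionalInformation": {
        "sources": [{"items": [{"title": "doc-a"}, {"title": "doc-b"}]}, {"items": None}]
    }}}}
}
found_items = extract_source_items(sample_metadata)
empty_items = extract_source_items({"lastTrace": None})
print(found_items, empty_items)
```

With a real agent response, this would be called as extract_source_items(current_agent_response.response_metadata).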
Running multiple asynchronous agents#
Some tasks require running several agents, and the parts that do not depend on each other can run in parallel.
To run agents in parallel, invoke them asynchronously and make sure that your agents and tools are async-compatible.
The ainvoke() method makes an asynchronous call.
import asyncio
AGENT_ID = "" # Fill in your agent ID (agent:xxxxxxxx)
langchain_llm = project.get_llm(AGENT_ID).as_langchain_chat_model()
async def queryA():
    try:
        print("Calling query A")
        return await langchain_llm.ainvoke("Give all the professional information you can about the customer with ID: fdouetteau. Also include information about the company if you can.")
    except Exception:
        return None
async def queryB():
    try:
        print("Calling query B")
        return await langchain_llm.ainvoke("Give all the professional information you can about the customer with ID: tcook. Also include information about the company if you can.")
    except Exception:
        return None
async def main():
    # Start both queries together and wait until both have finished
    return await asyncio.gather(queryA(), queryB())
## Uncomment this if you are running in a notebook
# import nest_asyncio
# nest_asyncio.apply()
results = asyncio.run(main())
for r in results:
    if r and r.content:
        print(r.content)
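The same fan-out pattern extends to any number of queries. The sketch below is a minimal offline illustration and does not call Dataiku: fake_agent_call is a hypothetical stand-in for awaiting langchain_llm.ainvoke(...), and query_all is an illustrative name. It uses asyncio.gather with return_exceptions=True, which returns results in input order and hands back an exception as a result instead of raising it, so one failed query does not hide the others.

```python
import asyncio


async def fake_agent_call(customer_id):
    """Hypothetical stand-in for `await langchain_llm.ainvoke(...)`."""
    await asyncio.sleep(0.01)  # simulates waiting on the agent
    if customer_id == "unknown":
        raise ValueError(f"no such customer: {customer_id}")
    return f"answer for {customer_id}"


async def query_all(customer_ids):
    # One coroutine per ID; gather runs them concurrently and keeps input order
    calls = [fake_agent_call(customer_id) for customer_id in customer_ids]
    return await asyncio.gather(*calls, return_exceptions=True)


outcomes = asyncio.run(query_all(["fdouetteau", "unknown", "tcook"]))
for outcome in outcomes:
    if isinstance(outcome, Exception):
        print(f"failed: {outcome}")
    else:
        print(outcome)
```

In a notebook, where an event loop is already running, asyncio.run needs nest_asyncio.apply() first, as in the sample above.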
Alternatively, LangChain's RunnableParallel runs several chains in parallel from a single call:
from langchain_core.runnables import RunnableParallel
from langchain_core.prompts import PromptTemplate
AGENT_ID = "" # Fill in your agent ID (agent:xxxxxxxx)
ids = ["id1", "id2"] # Use your data
langchain_llm = project.get_llm(AGENT_ID).as_langchain_chat_model()
local_prompt = PromptTemplate(
    input_variables=["user_id"],
    template="Give all the professional information you can about the customer with ID: {user_id}. Also include information about the company if you can."
)
# One chain per customer ID, each with the ID already bound into the prompt
runnable_map = {
    f"chain_{i}": local_prompt.partial(user_id=ida) | langchain_llm
    for i, ida in enumerate(ids)
}
parallel_chain = RunnableParallel(runnable_map)
results = parallel_chain.invoke({})
for key, output in results.items():
    print(f"{key}: {getattr(output, 'content', output)}")
Reference documentation#
Classes#
Entry point for the DSS API client
Langchain-compatible wrapper around Dataiku-mediated chat LLMs
A handle to interact with a DSS-managed LLM.
A handle to interact with a completion query.
A handle to interact with a project on the DSS instance.
A simple impersonation context that uses an externally-provided ticket
Functions#
Run the completion query and retrieve the LLM response.
Run the completion query and retrieve the LLM response as streamed chunks.
Get a handle to the current default project, if available (i.e.
Get a handle to interact with a specific LLM
List the LLM usable in this project
Create a new completion query.
Add a message to the completion query.
