Agents#

For more details on Agents, please refer to our documentation: AI Agents.
For more details on the LLM Mesh, please refer to our documentation: Introduction.
For more information about the LLM Mesh API, please refer to LLM Mesh.

Tutorials

You can find tutorials on this subject in the Developer Guide: Agents and Tools for Generative AI.

Note

Once you have a DKUChatModel, obtained with as_langchain_chat_model(), you can use the LangChain methods (invoke, stream, etc.).
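
For example, here is a minimal sketch (AGENT_ID is a placeholder for one of your agent IDs):

import dataiku

AGENT_ID = "" # Fill with your agent id
project = dataiku.api_client().get_default_project()
chat_model = project.get_llm(AGENT_ID).as_langchain_chat_model()

# Standard LangChain calls work on the DKUChatModel wrapper
print(chat_model.invoke("What can you do?").content)
for chunk in chat_model.stream("What can you do?"):
    print(chunk.content, end="")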

Tip

In the code provided on this page, you will need to provide the IDs of other elements. You will find how to list your LLMs, list your Knowledge Banks, or list your Agent Tools in dedicated pages.

Listing your agents#

The following code lists all the Agents and their IDs, which you can reuse in the code samples listed later on this page.

How to retrieve an Agent’s ID#
import dataiku

client = dataiku.api_client()
project = client.get_default_project()
llm_list = project.list_llms()
for llm in llm_list:
    if 'agent:' in llm.id:
        print(f"- {llm.description} (id: {llm.id})")

Using your agent#

The following example uses the native DSSLLM completion; for more information, refer to Perform completion queries on LLMs:

Using your agent#
import dataiku

AGENT_ID = "" # Fill with your agent id
client = dataiku.api_client()
project = client.get_default_project()
llm = project.get_llm(AGENT_ID)
completion = llm.new_completion()
resp = completion.with_message("How to run an agent?").execute()
if resp.success:
    print(resp.text)

Streaming (in a notebook)#

Streaming (in a notebook)#
import dataiku
from dataikuapi.dss.llm import DSSLLMStreamedCompletionChunk, DSSLLMStreamedCompletionFooter
from IPython.display import display, clear_output

AGENT_ID = "" # Fill with your agent id
client = dataiku.api_client()
project = client.get_default_project()
llm = project.get_llm(AGENT_ID)
completion = llm.new_completion()
completion.with_message("Who is the customer fdouetteau? Please provide additional information.")

gen = ""
for chunk in completion.execute_streamed():
    if isinstance(chunk, DSSLLMStreamedCompletionChunk):
        gen += chunk.data["text"]
        clear_output()
        display("Received text: %s" % gen)
    elif isinstance(chunk, DSSLLMStreamedCompletionFooter):
        print("Completion is complete: %s" % chunk.data)

Asynchronous Streaming (in a notebook)#

Asynchronous Streaming (in a notebook)#
import asyncio
import dataiku
from IPython.display import display, clear_output

async def func(response):
    gen = ""
    async for r in response:
        clear_output()
        gen += r.content
        display(gen)

AGENT_ID = "" # Fill with your agent id
langchain_llm = project.get_llm(AGENT_ID).as_langchain_chat_model()
resp = langchain_llm.astream("Who is the customer fdouetteau? Please provide additional information.")


await func(resp)

Agent response: sources and artifacts#

When an agent is queried via the Dataiku LLM Mesh, it may return sources to show which documents it has used to formulate its response. It may also return artifacts to enhance its response.

  • Sources are documents that an agent reads to help it formulate an answer to a user’s query.

  • Artifacts are similar to sources, but they are produced by an agent as an output for the user, rather than being read as an input.

After a call to your agent, you will get a dataikuapi.dss.llm.DSSLLMCompletionResponse. If the call was streamed, you will get a series of DSSLLMStreamedCompletionChunk objects and a final DSSLLMStreamedCompletionFooter object.

In non-streamed mode, the artifacts are returned in the artifacts field of the response, and the sources are returned in the additionalInformation.sources field of the response:

Agent response in non-streamed mode#
{
  "ok": true,
  "text": "...",
  "finishReason": "STOP",
  "artifacts": [  ],  /* list of artifact objects */
  "additionalInformation": {
    "sources": [  ]  /* list of source objects */
  }
}
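
Given a non-streamed response payload shaped like the example above (here assumed to be available as a Python dict named response_payload), you could extract the artifacts and sources as in this sketch:

# 'response_payload' is assumed to hold the non-streamed agent response as a dict,
# shaped like the example above.
artifacts = response_payload.get("artifacts", [])
sources = response_payload.get("additionalInformation", {}).get("sources", [])

for artifact in artifacts:
    print(artifact.get("type"), "-", artifact.get("name"), "-", len(artifact.get("parts", [])), "part(s)")
for source in sources:
    for item in source.get("items", []):
        print("Source item of type:", item.get("type"))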

In streamed mode, artifacts are streamed inline with text chunks, and sources are included in the streaming footer:

Agent response in streamed mode#
{
    {"type": "event", "eventKind": "AGENT_GETTING_READY"}
    {"type": "event", "eventKind": "AGENT_THINKING"}
    {"type": "content", "text": "..." }
    {"type": "content", "artifacts": [  ]}
    {"type": "content", "artifacts": [  ]}
    {"type": "content", "text": "..." }
    /* ... */
    {
      "type": "footer",
      "finishReason": "STOP",
      "additionalInformation": {
        "sources": [  ]
      }
      "trace": " "
    }
}
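
As an illustration, the following sketch accumulates the streamed text, the inline artifacts, and the footer sources (it does not merge artifacts that are split across several chunks by id):

import dataiku
from dataikuapi.dss.llm import DSSLLMStreamedCompletionChunk, DSSLLMStreamedCompletionFooter

AGENT_ID = "" # Fill with your agent id
project = dataiku.api_client().get_default_project()
completion = project.get_llm(AGENT_ID).new_completion()
completion.with_message("How will transportation work in Paris during the Olympic Games?")

text, artifacts, sources = "", [], []
for chunk in completion.execute_streamed():
    if isinstance(chunk, DSSLLMStreamedCompletionChunk):
        # Content chunks may carry text and/or inline artifacts
        text += chunk.data.get("text", "")
        artifacts.extend(chunk.data.get("artifacts", []))
    elif isinstance(chunk, DSSLLMStreamedCompletionFooter):
        # Sources arrive in the footer
        sources = chunk.data.get("additionalInformation", {}).get("sources", [])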

Artifact structure#

The artifact object has the following structure:

Artifact structure#
{
  "id": "...",
  "type": "...",
  "name": "...",
  "description": "...",
  "hierarchy": [  ],
  "parts": [  ],
}
  • id is optional. It is used for aggregating streaming chunks if one artifact is streamed in multiple chunks.

  • type is mandatory. It instructs Dataiku on how to display this artifact and signals which parts are permitted.

  • name and description are optional. They are used in the Dataiku UI to describe this artifact to the user.

  • hierarchy is optional. It is used to identify the origin of artifacts in complex agent architectures.

  • parts is mandatory. This is a list of part objects, where the data of the artifact resides. Both sources and artifacts share the same data structure for parts (called items for sources).

Source structure#

The source object has the following format:

Source structure#
{
  "toolCallDescription": "...",
  "hierarchy": [  ],
  "items": [  ],
}
  • toolCallDescription is optional.

  • hierarchy is optional. It is used to identify the origin of artifacts in complex agent architectures.

  • items is mandatory. This is a list of part objects, where the data of the source resides. Both sources and artifacts share the same data structure for items (called parts for artifacts).

Source items and Artifact parts structure#

The parts field of an artifact and the items field of a source have the same format:

Parts and Items structure#
{
  "type": "...",
  "index": /* integer value */,
  /* type-specific fields */
}
  • type is mandatory. It informs DSS of the type of part this item is, and it determines which type-specific fields can be provided.

  • index is optional. It is used for aggregating streaming chunks if one part is streamed in multiple chunks.

  • Each part has different additional fields specific to the part type.

Code Agents#

In Dataiku, you can implement a custom agent in code that leverages models from the LLM Mesh, LangChain, and its wider ecosystem.

The resulting agent becomes part of the LLM Mesh, seamlessly integrating into your AI workflows.

Dataiku includes basic code examples to help you get started. Below are more advanced samples that showcase full-fledged agents built with LangChain and LangGraph. They both rely on the internal code environment for retrieval-augmented generation, to avoid any code environment issues.

This support agent is designed to handle customer inquiries efficiently. With its tools, it can:

  • retrieve relevant information from an FAQ database

  • log issues for follow-up when immediate answers aren’t available

  • escalate complex requests to a human agent when necessary.

We have tested it on this Paris Olympics FAQ dataset (https://www.kaggle.com/datasets/sahityasetu/paris-2024-olympics-faq), which we used to create a knowledge bank with the Embed recipe. We have embedded a column containing both the question and the corresponding answer. Use the agent on inquiries like "How will transportation work in Paris during the Olympic Games?" or "I booked a hotel for the Olympic Games in Paris, but never received any confirmation. What's happening?" and see how it reacts!

import dataiku
from dataiku.llm.python import BaseLLM
from dataiku.langchain import LangchainToDKUTracer
from langchain.tools import Tool
from langchain.agents import create_openai_tools_agent, AgentExecutor
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder

## 1. Set Up Vector Search for FAQs
# Here, we are using a knowledge bank from the flow, built with our native Embed recipe.
# We make it a Langchain retriever and pass it to our first tool.

KB_ID = "" # fill with the ID of your Knowledge Bank
LLM_ID = "" # fill with your LLM id

faq_retriever = dataiku.KnowledgeBank(id=KB_ID).as_langchain_retriever()
faq_retriever_tool = Tool(
    name="FAQRetriever",
    func=faq_retriever.get_relevant_documents,
    description="Retrieves answers from the FAQ database based on user questions."
)


## 2. Define (fake) Ticketing & Escalation Tools
# Simulated ticket creation function

def create_ticket(issue: str):
    # Here, you would typically use the API to your internal ticketing tool.
    return f"Ticket created: {issue}"


ticketing_tool = Tool(
    name="CreateTicket",
    func=create_ticket,
    description="Creates a support ticket when the issue cannot be resolved automatically."
)


# Simulated escalation function
def escalate_to_human(issue: str):
    # This function could send a notification to the support engineers, for instance.
    # It can be useful to attach info about the customer's request, sentiment, and history.
    return f"Escalation triggered: {issue} has been sent to a human agent."


escalation_tool = Tool(
    name="EscalateToHuman",
    func=escalate_to_human,
    description="Escalates the issue to a human when it's too complex, or the user is upset."
)

## 3. Build the LangChain Agent
# Define LLM for agent reasoning
llm = dataiku.api_client().get_default_project().get_llm(LLM_ID).as_langchain_chat_model()

# Agent tools (FAQ retrieval + ticketing + escalation)
tools = [faq_retriever_tool, ticketing_tool, escalation_tool]
tool_names = [tool.name for tool in tools]

# Define the prompt
prompt = ChatPromptTemplate.from_messages(
    [
        ("system",
         """You are an AI customer support agent. Your job is to assist users by:
         - Answering questions using the FAQ retriever tool.
         - Creating support tickets for unresolved issues.
         - Escalating issues to a human when necessary."""),
        MessagesPlaceholder("chat_history", optional=True),
        ("human", "{input}"),
        MessagesPlaceholder("agent_scratchpad"),
    ]
)

# Initialize an agent with tools.
# Here, we define it as an agent that uses OpenAI tools.
# More options are available at https://python.langchain.com/api_reference/langchain/agents.html

agent = create_openai_tools_agent(llm=llm, tools=tools, prompt=prompt)
agent_executor = AgentExecutor(agent=agent, tools=tools)

agent_executor.invoke({"input": "How will transportation work in Paris during the Olympic Games?"})

class MyLLM(BaseLLM):
    def __init__(self):
        pass

    def process(self, query, settings, trace):
        prompt = query["messages"][0]["content"]
        tracer = LangchainToDKUTracer(dku_trace=trace)
        # Wrap the agent in an executor
        agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True, handle_parsing_errors=True)
        response = agent_executor.invoke({"input": prompt}, config={"callbacks": [tracer]})
        return {"text": response["output"]}

Creating your code agent#

All code agents must implement the BaseLLM class. The BaseLLM class is somewhat similar to this implementation:

Creating your code agent#
class BaseLLM(BaseModel):
    """The base interface for a Custom LLM"""

    # Implement this for synchronous answer
    def process(self, query: SingleCompletionQuery, settings: CompletionSettings,
                trace: SpanBuilder) -> CompletionResponse:
        raise NotImplementedError

    # Implement this for asynchronous answer
    async def aprocess(self, query: SingleCompletionQuery, settings: CompletionSettings,
                       trace: SpanBuilder) -> CompletionResponse:
        raise NotImplementedError

    # Implement this for a streamed answer
    def process_stream(self, query: SingleCompletionQuery, settings: CompletionSettings,
                       trace: SpanBuilder) -> Iterator[StreamCompletionResponse]:
        raise NotImplementedError
        yield

    # Implement this for an asynchronous streamed answer
    async def aprocess_stream(self, query: SingleCompletionQuery,
                              settings: CompletionSettings, trace: SpanBuilder) -> \
            AsyncIterator[StreamCompletionResponse]:
        raise NotImplementedError
        yield

Generating an answer from a code agent follows the same rules whether the agent is synchronous or asynchronous, streamed or not (a complete sketch follows the list below):

  1. Get an LLM (refer to Using your agent, for more details).

  2. Process the input (and the settings).

  3. Potentially manipulate the trace.

  4. Invoke the LLM (refer to Using your agent, for more details).

  5. Return the answer (refer to Using your agent, for more details).
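
Putting these steps together, here is a minimal sketch of a synchronous code agent (LLM_ID is a placeholder for one of your LLM Mesh model IDs; error handling is omitted):

import dataiku
from dataiku.llm.python import BaseLLM

LLM_ID = "" # fill with your LLM id

class MyAgent(BaseLLM):
    def process(self, query, settings, trace):
        # 1. Get an LLM from the LLM Mesh
        llm = dataiku.api_client().get_default_project().get_llm(LLM_ID)
        # 2. Process the input (here, only the last message)
        prompt = query["messages"][-1]["content"]
        # 3. Manipulate the trace
        trace.attributes["prompt"] = prompt
        # 4. Invoke the LLM
        resp = llm.new_completion().with_message(prompt).execute()
        # 5. Return the answer
        return {"text": resp.text}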

1. Process the input without history#

If you only want to handle the last message, take the content of the last message from the query, as highlighted in the following code:

Process the input without history#
    prompt = query["messages"][-1]["content"]
    completion = llm.new_completion().with_message(prompt)

    ## .../...

    llm_resp = completion.execute()

2. Process the input with history#

If your intent is to use your agent conversationally, you may need to process the query to provide the whole context to the LLM.

Process the input with history#
completion = llm.new_completion()
for m in query.get('messages'):
    completion.with_message(m.get('content'), m.get('role'))

## .../...

llm_resp = completion.execute()

3. Adding trace information#

Adding trace information#
with trace.subspan("Invoke the LLM") as subspan:
    ai_msg = llm.invoke(messages)
    subspan.attributes['messages'] = str(messages)

Adding visual dependencies to your Code Agent settings#

You can add visual dependencies to your Code Agent to show interactions in the Flow. A dependency is represented by a dict object with the following structure:

{'type': str, 'ref': str}

The type value is one of the following:

  • DATASET and the ref value is the Dataset name.

  • SAVED_MODEL and the ref value is the Model ID.

  • RETRIEVABLE_KNOWLEDGE and the ref value is the Knowledge Bank ID.
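
For example, a dependencies list referencing a dataset, a saved model, and a knowledge bank could look like the following sketch (all ref values below are placeholders):

dependencies = [
    {"type": "DATASET", "ref": "customer_faq"},           # a dataset name from the Flow
    {"type": "SAVED_MODEL", "ref": "my_model_id"},         # a saved model ID
    {"type": "RETRIEVABLE_KNOWLEDGE", "ref": "my_kb_id"},  # a knowledge bank ID
]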

Modifying your Code Agent settings#
import dataiku

CODE_AGENT_ID = "" # fill with your Code Agent id

client = dataiku.api_client()
project = client.get_default_project()
code_agent = project.get_agent(CODE_AGENT_ID)

settings = code_agent.get_settings()
version = settings.active_version
vsettings = settings.get_version_settings(version)
raw = vsettings.get_raw()
raw["pythonAgentSettings"]["dependencies"] = [] # fill with the dependencies dictionnaries
vsettings._version_settings = raw
settings.save()

Handling the trace#

Once you have a trace, you can access the inputs, outputs, and attributes objects, or if you prefer, you can use the to_dict() method. Then you can modify these objects to reflect your needs. You can add a child to the trace (use the subspan() method) or append another trace to the current trace (use the append_trace() method).

In the following code example, we consider trace to be a trace, and dict to be a dictionary built using trace.to_dict().

Adding input information on a trace#
trace.inputs["additional_information"] = "Useful information"
dict["inputs"].update({"additional_information" : "Useful information"})
Adding output information on a trace#
trace.outputs["additional_information"] = "Useful information"
dict["outputs"].update({"additional_information" : "Useful information"})
Adding attributes information on a trace#
trace.attributes["additional_information"] = "Useful information"
dict["attributes"].update({"additional_information" : "Useful information"})
Changing the name of a trace#
dict["name"] = "New name"

Note

Updating the name is rarely helpful, as you can provide the name with the subspan() method.
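
As an illustration, the following sketch adds a child span to a trace and appends another trace to it (other_trace is a hypothetical trace object obtained elsewhere, for instance from a nested LLM call):

# Add a child span to the current trace
with trace.subspan("Post-processing") as subspan:
    subspan.attributes["step"] = "formatting"

# Append a pre-existing trace ('other_trace' is a hypothetical trace object)
trace.append_trace(other_trace)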

Agent Tools#

Listing your agent tools#

The following code lists all the Agent Tools with their names and IDs, which you can reuse in the code samples listed later on this page.

Listing your agent tools#
tools_list = project.list_agent_tools()

for tool in tools_list:
    print(f"{tool.name} - {tool.id}")

Creating your agent tool#

Creating your agent tool#
import dataiku

KB_ID = "" # fill with the ID of your Knowledge Bank

client = dataiku.api_client()
project = client.get_default_project()
vector_search_tool_creator = project.new_agent_tool("VectorStoreSearch")
vector_search_tool = vector_search_tool_creator.with_knowledge_bank(KB_ID).create()

Modifying your agent tool settings#

Modifying your agent tool settings#
settings = vector_search_tool.get_settings()
print(settings.params['maxDocuments'])
settings.params['maxDocuments'] = 8
settings.save()

Running your agent tool#

Running your agent tool#
vector_search_tool.run({"searchQuery": "best review"})

Deleting your agent tool#

Deleting your agent tool#
vector_search_tool.delete()

Tool response#

The response object contains additional information like sources and artifacts. The explanations are the same as seen in the Agent response: sources and artifacts section.

Tool response#
{
  "output": "",
  "trace": "<removed for print>",
  "sources": [],
  "artifacts": []
}
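
Assuming run() returns a dict shaped like the payload above, you could read the output, sources, and artifacts as in this sketch:

result = vector_search_tool.run({"searchQuery": "best review"})

print(result["output"])
for source in result.get("sources", []):
    for item in source.get("items", []):
        print("Source item:", item)
for artifact in result.get("artifacts", []):
    print("Artifact:", artifact.get("name"))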

Retrieval-augmented LLM agent#

Creating your RAG agent#

Creating your RAG agent#
import dataiku

KB_ID = "" # fill with your Knowledge Bank's id
LLM_ID = "" # fill with your LLM id

client = dataiku.api_client()
project = client.get_default_project()
rag_agent = project.create_retrieval_augmented_llm("MyRAG", KB_ID, LLM_ID)

Modifying your RAG agent settings#

Modifying your RAG agent settings#
LLM_ID2 = "" # fill with an alternate LLM id
rag_agent_settings = rag_agent.get_settings()
active_version = rag_agent_settings.active_version
v_rag_agent_settings = rag_agent_settings.get_version_settings(active_version)
v_rag_agent_settings.llm_id = LLM_ID2
rag_agent_settings.save()

Running your RAG agent#

Running your RAG agent#
response = rag_agent.as_llm().new_completion().with_message("What will inflation in Europe look like and why?").execute()
print(response.text)

Deleting your RAG agent#

Deleting your RAG agent#
rag_agent.delete()

Visual Agent#

Creating your visual agent#

Creating your visual agent#
import dataiku

TOOL_ID = "" # fill with your agent tool's id

client = dataiku.api_client()
project = client.get_default_project()
agent = project.create_agent("visual1", "TOOLS_USING_AGENT")

Modifying your visual agent settings#

Modifying your visual agent settings#
TOOL_ID = "" # fill with your agent tool id
tool = project.get_agent_tool(TOOL_ID)
agent_settings = agent.get_settings()
versionned_agent_settings = agent_settings.get_version_settings("v1")
versionned_agent_settings.llm_id = LLM_ID
versionned_agent_settings.add_tool(tool)
agent_settings.save()

Running your visual agent#

Running your visual agent#
response = agent.as_llm().new_completion().with_message("tell me everything you know about Dataiku").execute()
response.text

Deleting your visual agent#

Deleting your visual agent#
agent.delete()

Extracting sources#

Suppose you have an agent that is querying a knowledge bank. You can retrieve the sources by using the following code snippet:

Extracting sources#
AGENT_ID = "" # Fill with your agent id
langchain_llm = project.get_llm(AGENT_ID).as_langchain_chat_model()
messages = "What is the climate in 2024"
current_agent_response = langchain_llm.invoke(messages)
last_trace = current_agent_response.response_metadata.get('lastTrace', None)
attributes = last_trace.get('attributes')
completionResponse = attributes.get('completionResponse')
additionalInformation = completionResponse.get('additionalInformation')
sources = additionalInformation.get('sources')
for source in sources:
    items = source.get('items')
    for document in items:
        print(document)

Running multiple asynchronous agents#

Sometimes, achieving a task requires running multiple agents, and some of the tasks they handle can run in parallel.

To run agents in parallel, you need to invoke them asynchronously and make sure that your agents and tools are async-compatible. The ainvoke() method allows the user to make an asynchronous call.

Running multiple asynchronous agents#
import asyncio
import dataiku

AGENT_ID = "" # Fill with your agent ID (agent:xxxxxxxx)
client = dataiku.api_client()
project = client.get_default_project()
langchain_llm = project.get_llm(AGENT_ID).as_langchain_chat_model()

async def queryA():
    try:
        print("Calling query A")
        resp = langchain_llm.ainvoke("Give all the professional information you can about the customer with ID: fdouetteau. Also include information about the company if you can.")
        response = await resp
        return response
    except Exception:
        return None

async def queryB():
    try:
        print("Calling query B")
        resp = langchain_llm.ainvoke("Give all the professional information you can about the customer with ID: tcook. Also include information about the company if you can.")
        response = await resp
        return response
    except Exception:
        return None

## Uncomment this if you are running in a notebook
# import nest_asyncio
# nest_asyncio.apply()

loop = asyncio.get_event_loop()
results = [asyncio.create_task(query) for query in [queryA(), queryB()]]
loop.run_until_complete(asyncio.wait(results))
for r in results:
    if r.result() and r.result().content:
        print(r.result().content)

Reference documentation#

Classes#

dataikuapi.DSSClient(host[, api_key, ...])

Entry point for the DSS API client

dataikuapi.dss.langchain.DKUChatModel(*args, ...)

Langchain-compatible wrapper around Dataiku-mediated chat LLMs

dataikuapi.dss.llm.DSSLLM(client, ...)

A handle to interact with a DSS-managed LLM.

dataikuapi.dss.llm.DSSLLMCompletionQuery(llm)

A handle to interact with a completion query.

dataikuapi.dss.project.DSSProject(client, ...)

A handle to interact with a project on the DSS instance.

Functions#

append_trace(trace_to_append)

execute()

Run the completions query and retrieve the LLM response.

execute_streamed()

Run the completion query and retrieve the LLM response as streamed chunks.

get_default_project()

Get a handle to the current default project, if available (i.e.

get_llm(llm_id)

Get a handle to interact with a specific LLM

list_llms([purpose, as_type])

List the LLM usable in this project

new_completion()

Create a new completion query.

subspan(name)

to_dict()

with_message(message[, role])

Add a message to the completion query.