LLM Mesh#

The LLM Mesh is the common backbone for Enterprise Generative AI Applications. For more details on the LLM Mesh features of Dataiku, please visit Generative AI and LLM Mesh.

The LLM Mesh API allows you to:

  • Send completion and embedding queries to all LLMs supported by the LLM Mesh

  • Stream responses from LLMs that support it

  • Query LLMs using multimodal inputs (image and text)

  • Query the LLM Mesh from LangChain code

  • Interact with knowledge banks, and perform semantic search

  • Create a fine-tuned saved model

Perform completion queries on LLMs#

Your first simple completion query#

This sample sends a single prompt to an LLM and prints the response:

import dataiku

# Fill with your LLM id. For example, if you have an OpenAI connection called "myopenai", LLM_ID can be "openai:myopenai:gpt-4o"
# To get the list of LLM ids, you can use project.list_llms() (see below)
LLM_ID = ""

# Create a handle for the LLM of your choice
client = dataiku.api_client()
project = client.get_default_project()
llm = project.get_llm(LLM_ID)

# Create and run a completion query
completion = llm.new_completion()
completion.with_message("Write a haiku on GPT models")
resp = completion.execute()

# Display the LLM output
if resp.success:
    print(resp.text)

# GPT, a marvel,
# Deep learning's symphony plays,
# Thoughts dance, words unveil.

Multi-turn and system prompts#

A completion query can contain multiple messages, each with a role (system, user, or assistant):

completion = llm.new_completion()

# First, put a system prompt
completion.with_message("You are a poetic assistant who always answers in haikus", role="system")

# Then, give an example, or send the conversation history
completion.with_message("What is a transformer", role="user")
completion.with_message("Transformers, marvels\nOf the deep learning research\nAttention, you need", role="assistant")

# Then, the last query of the user
completion.with_message("What's your name", role="user")

resp = completion.execute()
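
When the conversation history is already stored as a list, it can be replayed onto a completion query in a loop. The following is a minimal sketch: the `add_history` helper and the `(role, text)` history layout are hypothetical, and `RecordingCompletion` is only a stand-in so that the snippet runs outside DSS. It relies solely on the `with_message(text, role=...)` call shown above.

```python
class RecordingCompletion:
    """Stand-in for a completion query, for illustration only (not part of the Dataiku API)."""

    def __init__(self):
        self.messages = []

    def with_message(self, message, role="user"):
        self.messages.append({"role": role, "content": message})
        return self


def add_history(completion, history):
    """Replay (role, text) pairs onto a completion query, in order (hypothetical helper)."""
    allowed_roles = {"system", "user", "assistant"}
    for role, text in history:
        if role not in allowed_roles:
            raise ValueError("Unexpected role: %s" % role)
        completion.with_message(text, role=role)
    return completion


history = [
    ("system", "You are a poetic assistant who always answers in haikus"),
    ("user", "What is a transformer"),
    ("assistant", "Transformers, marvels\nOf the deep learning research\nAttention, you need"),
    ("user", "What's your name"),
]

completion = add_history(RecordingCompletion(), history)
print(len(completion.messages))  # 4
```

Inside DSS, the same helper could be given the object returned by `llm.new_completion()` instead of the stand-in.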

Multimodal input#

Multimodal input is supported on a subset of the LLMs in the LLM Mesh:

  • OpenAI

  • Bedrock Anthropic Claude

  • Azure OpenAI

  • Gemini Pro

completion = llm.new_completion()

with open("myimage.jpg", "rb") as f:
    image = f.read()

mp_message = completion.new_multipart_message()
mp_message.with_text("The image represents an artwork. Describe it as it would be described by art critics")
mp_message.with_inline_image(image)

# Add it to the completion request
mp_message.add()

resp = completion.execute()

Completion settings#

You can adjust generation settings on the completion query:

completion = llm.new_completion()
completion.with_message("Write a haiku on GPT models")

completion.settings["temperature"] = 0.7
completion.settings["topK"] = 10
completion.settings["topP"] = 0.3
completion.settings["maxOutputTokens"] = 2048
completion.settings["stopSequences"] = [".", "\n"]
completion.settings["presencePenalty"] = 0.6
completion.settings["frequencyPenalty"] = 0.9
completion.settings["logitBias"] = {
  1489: 60,  # apply a logit bias of 60 on token value "1489"
}
completion.settings["logProbs"] = True
completion.settings["topLogProbs"] = 3

resp = completion.execute()

Response streaming#

from dataikuapi.dss.llm import DSSLLMStreamedCompletionChunk, DSSLLMStreamedCompletionFooter

completion = llm.new_completion()
completion.with_message("Please explain special relativity")

for chunk in completion.execute_streamed():
    if isinstance(chunk, DSSLLMStreamedCompletionChunk):
        print("Received text: %s" % chunk.data["text"])
    elif isinstance(chunk, DSSLLMStreamedCompletionFooter):
        print("Completion is complete: %s" % chunk.data)
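
To rebuild the full answer from a stream, the text fragments can be concatenated as they arrive. The sketch below is an illustration under assumptions: `join_streamed_text` is a hypothetical helper, and it works on plain dictionaries standing in for the `chunk.data` payloads of the text chunks, so that it runs outside DSS. Payloads without a `"text"` key are skipped.

```python
def join_streamed_text(chunk_payloads):
    """Concatenate the "text" fragments of streamed chunk payloads (hypothetical helper)."""
    parts = []
    for payload in chunk_payloads:
        text = payload.get("text")
        if text:
            parts.append(text)
    return "".join(parts)


# Dictionaries standing in for chunk.data of successive streamed chunks
payloads = [{"text": "Special "}, {"text": "relativity "}, {}, {"text": "explained."}]
full_text = join_streamed_text(payloads)
print(full_text)  # Special relativity explained.
```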

Read LLM Mesh metadata#

List and get LLMs#

import dataiku
client = dataiku.api_client()
project = client.get_default_project()
llm_list = project.list_llms()

By default, list_llms() returns a list of DSSLLMListItem. To get more details on each item:

for llm in llm_list:
    print(f"- {llm.description} (id: {llm.id})")
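
The example id used on this page, "openai:myopenai:gpt-4o", suggests a three-part layout: connection type, connection name, then model. The sketch below splits an id on that assumption. `split_llm_id` and `LLMIdParts` are hypothetical names, and the layout may not hold for every kind of LLM id, so treat it as an illustration rather than a documented contract.

```python
from collections import namedtuple

# Hypothetical container for the three assumed parts of an LLM id
LLMIdParts = namedtuple("LLMIdParts", ["llm_type", "connection", "model"])


def split_llm_id(llm_id):
    """Split an id such as "openai:myopenai:gpt-4o" into its assumed three parts."""
    # maxsplit=2 keeps any further colons inside the model part
    parts = llm_id.split(":", 2)
    if len(parts) != 3:
        raise ValueError("Expected an id of the form type:connection:model, got %r" % llm_id)
    return LLMIdParts(*parts)


parts = split_llm_id("openai:myopenai:gpt-4o")
print(parts.connection)  # myopenai
```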

Text embedding#

import dataiku

EMBEDDING_MODEL_ID = "" # Fill with your embedding model id, for example: openai:myopenai:text-embedding-3-small

# Create a handle for the embedding model of your choice
client = dataiku.api_client()
project = client.get_default_project()
emb_model = project.get_llm(EMBEDDING_MODEL_ID)

# Create and run an embedding query
txt = "The quick brown fox jumps over the lazy dog."
emb_query = emb_model.new_embeddings()
emb_query.add_text(txt)
emb_resp = emb_query.execute()

# Display the embedding output
print(emb_resp.get_embeddings())

# [[0.000237455,
#   -0.103262354,
#   ...
# ]]
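
Embedding vectors are typically compared with cosine similarity. The following self-contained sketch shows the computation on toy three-dimensional vectors, which stand in for rows of the list returned by `get_embeddings()`. The `cosine_similarity` helper is not part of the Dataiku API.

```python
import math


def cosine_similarity(u, v):
    """Cosine of the angle between two vectors of equal length."""
    if len(u) != len(v):
        raise ValueError("Vectors must have the same length")
    dot = sum(a * b for a, b in zip(u, v))
    norm_u = math.sqrt(sum(a * a for a in u))
    norm_v = math.sqrt(sum(b * b for b in v))
    if norm_u == 0 or norm_v == 0:
        raise ValueError("Cosine similarity is undefined for a zero vector")
    return dot / (norm_u * norm_v)


# Toy vectors standing in for real embeddings
embeddings = [
    [1.0, 0.0, 0.0],
    [0.6, 0.8, 0.0],
    [0.0, 0.0, 1.0],
]

print(cosine_similarity(embeddings[0], embeddings[1]))  # close to 0.6
print(cosine_similarity(embeddings[0], embeddings[2]))  # orthogonal vectors give 0.0
```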

Tool calls#

Tool calls (sometimes referred to as “function calling”) allow you to augment an LLM with “tools”: functions that the LLM can ask to call, supplying the arguments. Your client code can then perform those calls and provide the output back to the LLM so that it can generate the next response.

Tool calls are supported on the compatible completion models of some LLM connections:

  • OpenAI

  • Azure OpenAI

  • Azure LLM

  • Anthropic Claude

  • Anthropic Claude models on AWS Bedrock connections

  • MistralAI

Define tools#

You can define tools as settings in the completion query. Tool parameters are defined as JSON Schema objects. See the JSON Schema reference for documentation about the format.

Tools can also be automatically prepared and invoked from Python code, e.g. using Langchain.

completion = llm.new_completion()
completion.settings["tools"] = [
  {
    "type": "function",
    "function": {
      "name": "multiply",
      "description": "Multiply integers",
      "parameters": {
        "type": "object",
        "properties": {
          "a": {
            "type": "integer",
            "description": "The first integer to multiply",
          },
          "b": {
            "type": "integer",
            "description": "The other integer to multiply",
          },
        },
        "required": ["a", "b"],
      }
    }
  }
]

completion.with_message("What is 3 * 6 ?")
resp = completion.execute()

# {
#   "ok": true,
#   "finishReason": "tool_calls",
#   "toolCalls": [
#     {
#       "type": "function",
#       "function": {
#         "name": "multiply",
#         "arguments": "{\"a\":3,\"b\":6}"
#       },
#       "id": "call_da9P8tJ0TcnvdFbsJtpUwqZr"
#     }
#   ],
#   ...
# }
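
Writing the JSON Schema by hand becomes tedious when there are many tools. As a sketch of one possible approach, the hypothetical helper below derives a tool definition with the same layout as the one above from a Python function's signature and docstring. It only handles a few simple annotation types and is not part of the Dataiku API.

```python
import inspect

# Mapping from Python annotations to JSON Schema types (deliberately minimal)
JSON_TYPES = {int: "integer", float: "number", str: "string", bool: "boolean"}


def tool_definition(func, param_descriptions=None):
    """Build a tool definition dictionary from a function signature (hypothetical helper)."""
    param_descriptions = param_descriptions or {}
    properties = {}
    required = []
    for name, param in inspect.signature(func).parameters.items():
        json_type = JSON_TYPES.get(param.annotation)
        if json_type is None:
            raise TypeError("Unsupported or missing annotation for parameter %r" % name)
        properties[name] = {"type": json_type}
        if name in param_descriptions:
            properties[name]["description"] = param_descriptions[name]
        # Parameters without a default value are marked as required
        if param.default is inspect.Parameter.empty:
            required.append(name)
    return {
        "type": "function",
        "function": {
            "name": func.__name__,
            "description": inspect.getdoc(func) or "",
            "parameters": {
                "type": "object",
                "properties": properties,
                "required": required,
            },
        },
    }


def multiply(a: int, b: int) -> int:
    """Multiply integers"""
    return a * b


tool = tool_definition(
    multiply,
    {"a": "The first integer to multiply", "b": "The other integer to multiply"},
)
print(tool["function"]["parameters"]["required"])  # ['a', 'b']
```

The resulting dictionary could then be placed in the `completion.settings["tools"]` list.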

Response streaming with tool calls#

LLM responses that include tool calls can also be streamed. Depending on the LLM, response chunks may include either complete or partial tool calls. When the LLM sends partial tool calls, each streamed chunk contains an extra field, index, which makes it possible to reconstruct the whole LLM response.

for chunk in completion.execute_streamed():
    if isinstance(chunk, DSSLLMStreamedCompletionChunk):
        if "text" in chunk.data:
            print("Received text: %s" % chunk.data["text"])
        if "toolCalls" in chunk.data:
            print("Received tool call: %s" % chunk.data["toolCalls"])

    elif isinstance(chunk, DSSLLMStreamedCompletionFooter):
        print("Completion is complete: %s" % chunk.data)
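
Partial tool calls can be merged back together by grouping the fragments on their index. The sketch below is an illustration under assumptions: the exact layout of a partial tool call is not specified here, so the code assumes that each fragment carries an `index`, that `id` and the function name appear in at least one fragment, and that the `arguments` strings must be concatenated in arrival order. `merge_partial_tool_calls` is a hypothetical helper.

```python
import json


def merge_partial_tool_calls(partial_calls):
    """Merge tool call fragments that share the same index (assumed fragment layout)."""
    merged = {}
    for part in partial_calls:
        call = merged.setdefault(
            part["index"],
            {"type": "function", "id": None, "function": {"name": "", "arguments": ""}},
        )
        if part.get("id"):
            call["id"] = part["id"]
        function = part.get("function", {})
        call["function"]["name"] += function.get("name") or ""
        call["function"]["arguments"] += function.get("arguments") or ""
    # Return the reconstructed calls ordered by index
    return [merged[index] for index in sorted(merged)]


# Hypothetical fragments, as they might be collected from chunk.data["toolCalls"]
fragments = [
    {"index": 0, "id": "call_1", "function": {"name": "multiply", "arguments": ""}},
    {"index": 0, "function": {"arguments": "{\"a\":3,"}},
    {"index": 0, "function": {"arguments": "\"b\":6}"}},
]

calls = merge_partial_tool_calls(fragments)
print(calls[0]["function"]["name"])                   # multiply
print(json.loads(calls[0]["function"]["arguments"]))  # {'a': 3, 'b': 6}
```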

Provide tool outputs#

Tool calls can then be parsed and executed. In order to provide the tool response in the chat messages, use the following methods:

tool_calls = resp.tool_calls
call_id = tool_calls[0]["id"]

completion.with_tool_calls(tool_calls)
completion.with_tool_output("18", tool_call_id=call_id)

resp = completion.execute()

# {
#   "ok": true,
#   "finishReason": "stop",
#   "text": "3 multiplied by 6 is 18.",
#   ...
# }
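
In the sample above, the tool output "18" is hard-coded. A minimal sketch of computing it from the tool call itself is shown below: the `arguments` field is a JSON string, so it is parsed and passed to a local function looked up by name. The `TOOLS` registry and `run_tool_call` helper are hypothetical names, and the tool call dictionary is copied from the response shown earlier.

```python
import json


def multiply(a, b):
    return a * b


# Registry mapping tool names to local Python functions (hypothetical)
TOOLS = {"multiply": multiply}


def run_tool_call(tool_call):
    """Execute one tool call and return its output as a string."""
    function = tool_call["function"]
    name = function["name"]
    if name not in TOOLS:
        raise KeyError("Unknown tool: %s" % name)
    arguments = json.loads(function["arguments"])
    return str(TOOLS[name](**arguments))


tool_call = {
    "type": "function",
    "function": {"name": "multiply", "arguments": "{\"a\":3,\"b\":6}"},
    "id": "call_da9P8tJ0TcnvdFbsJtpUwqZr",
}

output = run_tool_call(tool_call)
print(output)  # 18
```

The returned string can then be passed to `completion.with_tool_output(output, tool_call_id=tool_call["id"])`.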

Control tool usage#

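Tool usage can be constrained in the completion settings. The assignments below are alternatives, each overwriting the previous one, so set only the one you need: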

completion = llm.new_completion()

# Let the LLM decide whether to call a tool
completion.settings["toolChoice"] = {"type": "auto"}

# The LLM must call at least one tool
completion.settings["toolChoice"] = {"type": "required"}

# The LLM must not call any tool
completion.settings["toolChoice"] = {"type": "none"}

# The LLM must call the tool with name 'multiply'
completion.settings["toolChoice"] = {"type": "tool_name", "name": "multiply"}

Knowledge Banks (KB)#

List and get KBs#

To list the KB present in a project:

import dataiku
client = dataiku.api_client()
project = client.get_default_project()
kb_list = project.list_knowledge_banks()

By default, list_knowledge_banks() returns a list of DSSKnowledgeBankListItem. To get more details:

for kb in kb_list:
    print(f"{kb.name} (id: {kb.id})")

To get a “core handle” on the KB (i.e. to retrieve a KnowledgeBank object):

KB_ID = "" # Fill with your KB id
kb_public_api = project.get_knowledge_bank(KB_ID)
kb_core = kb_public_api.as_core_knowledge_bank()

LangChain integration#

Dataiku LLM model objects can be turned into langchain-compatible objects, making it easy to:

  • stream responses

  • run asynchronous queries

  • batch queries

  • chain several models and adapters

  • integrate with the wider langchain ecosystem

Transforming LLM handles to LangChain model#

# In this sample, llm is the result of calling project.get_llm() (see above)

# Turn a regular LLM handle into a langchain-compatible one
langchain_llm = llm.as_langchain_llm()

# Run a single completion query
langchain_llm.invoke("Write a haiku on GPT models")

# Run a batch of completion queries
langchain_llm.batch(["Write a haiku on GPT models", "Write a haiku on GPT models in German"])

# Run a completion query and stream the response
for chunk in langchain_llm.stream("Write a haiku on GPT models"):
    print(chunk, end="", flush=True)

See the langchain documentation for more details.

You can also turn it into a langchain “chat model”, a specific type of LLM geared towards conversation:

# In this sample, llm is the result of calling project.get_llm() (see above)

# Turn a regular LLM handle into a langchain-compatible one
langchain_llm = llm.as_langchain_chat_model()

# Run a simple query
langchain_llm.invoke("Write a haiku on GPT models")

# Run a chat query
from langchain_core.messages import HumanMessage, SystemMessage

messages = [
    SystemMessage(content="You're a helpful assistant"),
    HumanMessage(content="What is the purpose of model regularization?"),
]
langchain_llm.invoke(messages)

# Streaming and chaining
from langchain.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_template("Tell me a joke about {topic}")
chain = prompt | langchain_llm
for chunk in chain.stream({"topic": "parrot"}):
    print(chunk.content, end="", flush=True)

See the langchain documentation for more details.

Creating Langchain models directly#

If running from inside DSS, you can also directly create the Langchain model:

from dataiku.langchain.dku_llm import DKULLM, DKUChatLLM

langchain_llm = DKUChatLLM(llm_id="your llm id") # For example: openai:myopenai:gpt-4o

Response streaming#

LangChain adapters (DKULLM and DKUChatLLM) also support streaming of the answer:

from dataiku.langchain.dku_llm import DKULLM, DKUChatLLM
from langchain_core.messages import HumanMessage, SystemMessage

langchain_llm = DKUChatLLM(llm_id="your llm id") # For example: openai:myopenai:gpt-4o

messages = [
    SystemMessage(content="You're a helpful assistant"),
    HumanMessage(content="What is the purpose of model regularization?"),
]

for gen in langchain_llm.stream(messages):
    print(gen)

Using knowledge banks as LangChain retrievers#

Core handles allow users to leverage the Langchain library and, through it:

  • query the KB for semantic similarity search

  • combine the KB with an LLM to form a chain and perform complex workflows such as retrieval-augmented generation (RAG).

In practice, core handles expose KBs as a Langchain-native vector store through two different methods:

  • as_langchain_retriever() returns a generic VectorStoreRetriever object

  • as_langchain_vectorstore() returns an object whose class corresponds to the KB type. For example, for a FAISS-based KB, you will get a langchain.vectorstores.faiss.FAISS object.

import dataiku
client = dataiku.api_client()
project = client.get_default_project()
KB_ID = "" # Fill with your KB id
kb_core = project.get_knowledge_bank(KB_ID).as_core_knowledge_bank()

# Return a langchain.vectorstores.base.VectorStoreRetriever
lc_generic_vs = kb_core.as_langchain_retriever()

# Return an object whose type depends on the KB type
lc_vs = kb_core.as_langchain_vectorstore()

# [...] Move forward with similarity search or RAG 
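
For retrieval-augmented generation, the passages returned by the retriever are usually placed in the prompt ahead of the question. The following is a minimal sketch of that assembly step in plain Python. The `build_rag_prompt` helper and the prompt wording are assumptions chosen for illustration, not a format prescribed by Dataiku or LangChain.

```python
def build_rag_prompt(question, passages):
    """Assemble a prompt asking the LLM to answer from the given passages (hypothetical helper)."""
    if not passages:
        raise ValueError("At least one passage is required")
    # Number the passages so the answer can refer to them
    context = "\n\n".join(
        "[%d] %s" % (i, passage.strip()) for i, passage in enumerate(passages, start=1)
    )
    return (
        "Answer the question using only the context below.\n\n"
        "Context:\n%s\n\n"
        "Question: %s" % (context, question)
    )


# Toy passages standing in for the text of retrieved documents
passages = [
    "LoRA freezes the base model weights.",
    "Gradient checkpointing trades compute for memory.",
]
prompt = build_rag_prompt("How does LoRA reduce memory usage?", passages)
print(prompt)
```

With a LangChain retriever, the passages would typically come from the `page_content` attribute of the documents returned by the retriever's `invoke()` method, and the resulting prompt can be sent with `completion.with_message(prompt)`.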

Using tool calls#

The LangChain chat model adapter supports tool calling, assuming that the underlying LLM supports it too.

import dataiku

from langchain_core.tools import tool
from langchain_core.messages import HumanMessage

# Define tools

@tool
def add(a: int, b: int) -> int:
    """Adds a and b."""
    return a + b

@tool
def multiply(a: int, b: int) -> int:
    """Multiplies a and b."""
    return a * b

tools_by_name = {"add": add, "multiply": multiply}
tools = [add, multiply]
tool_choice = {"type": "auto"}

# Get the LangChain chat model, bind it to the tools
client = dataiku.api_client()
project = client.get_default_project()
llm_id = "<your llm id>"  # For example: "openai:myopenai:gpt-4o"
llm = project.get_llm(llm_id).as_langchain_chat_model()
llm_with_tools = llm.bind_tools(tools, tool_choice=tool_choice)

# Ask your question
messages = [HumanMessage("What is 3 * 12? and 6 + 4?")]
ai_msg = llm_with_tools.invoke(messages)
messages.append(ai_msg)

# Retrieve tool calls, run them and put the results in the chat messages
for tool_call in ai_msg.tool_calls:
    tool_name = tool_call["name"]
    selected_tool = tools_by_name[tool_name]
    tool_msg = selected_tool.invoke(tool_call)
    messages.append(tool_msg)

# Get the final response
ai_msg = llm_with_tools.invoke(messages)
ai_msg.content
# '3 * 12 is 36, and 6 + 4 is 10.'

Fine-tuning#

Create a Fine-tuned LLM Saved Model version#

Note

This API capability is experimental.

Visual model fine-tuning is also available to customers with the Advanced LLM Mesh add-on.

With a Python recipe or notebook, it is possible to fine-tune an LLM from the HuggingFace Hub and save it as a Fine-tuned LLM Saved Model version. This is done with the create_finetuned_llm_version() method, which takes an LLM Mesh connection name as input. Settings on this connection, such as usage permissions, guardrails, code environment, or container configuration, apply at inference time.

The above method must be called on an existing Saved Model. Create one either programmatically with create_finetuned_llm_saved_model() (for example, if you are in a notebook and don’t have one yet) or visually from the Saved Models list via +New Saved Model > Fine-tuned LLM. In a Python recipe, the output Saved Model must already exist for the recipe to be created.

Here we fine-tune using several open-source frameworks from HuggingFace: transformers, trl & peft.

Attention

Note that fine-tuning a local LLM requires significant computational resources (GPU). The code samples below show state-of-the-art techniques to optimize memory usage and processing time, but this depends on your setup and might not always work. Also, beware that the size of your training (and optionally validation) dataset(s) greatly impacts the memory use and storage during fine-tuning.

A smaller LLM can be fine-tuned even when only a small GPU is available. Phi3 Mini is a good example, with “only” 3.8B parameters.

There are many techniques available to reduce memory usage and speed up computation. One of them is Low-Rank Adaptation (LoRA). It consists of freezing the weights of the base model and adding new, trainable low-rank matrices to the Transformer architecture. It drastically reduces the number of trainable parameters and, hence, the GPU memory requirement.
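
To give a sense of the savings, the arithmetic can be worked through for a single linear layer. With Low-Rank Adaptation, a frozen weight matrix of shape d_out x d_in is complemented by two trainable matrices of shapes d_out x r and r x d_in, which adds r * (d_out + d_in) trainable parameters. The sketch below uses an illustrative layer size of 3072 x 3072 (an assumption chosen for the example) and a rank of 16.

```python
def lora_trainable_params(d_out, d_in, rank):
    """Trainable parameters added by LoRA to one d_out x d_in linear layer."""
    # One (d_out x rank) matrix plus one (rank x d_in) matrix
    return rank * (d_out + d_in)


d_out = d_in = 3072  # illustrative layer size (assumption)
rank = 16            # LoRA rank

full = d_out * d_in
lora = lora_trainable_params(d_out, d_in, rank)

print(full)                          # 9437184 weights in the frozen layer
print(lora)                          # 98304 trainable LoRA weights
print(round(100 * lora / full, 2))   # 1.04 (percent of the layer size)
```

In this example, the trainable weights for the layer amount to roughly 1% of its original size.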

import datasets
from peft import LoraConfig
from transformers import AutoModelForCausalLM, AutoTokenizer
from trl import SFTConfig, SFTTrainer

from dataiku import recipe
from dataiku.llm.finetuning import formatters

base_model_name = "microsoft/Phi-3-mini-4k-instruct"
assert base_model_name, ("please specify a base LLM, it must be available"
                         " on HuggingFace hub")

connection_name = "a_huggingface_connection_name"
assert connection_name, ("please specify a connection name, the fine-tuned "
                         "LLM will be available from this connection")

##################
# Initial setup
##################
# Here, we're assuming that your training dataset is composed of 2 columns:
# the input (user message) and expected output (assistant message).
# If using a validation dataset, format should be the same.
user_message_column = "input"
assistant_message_column = "output"
columns = [user_message_column, assistant_message_column]

system_message_column = ""  # optional
static_system_message = ""  # optional
if system_message_column:
    columns.append(system_message_column)

# Turn Dataiku datasets into SFTTrainer datasets. 
training_dataset = recipe.get_inputs()[0]
df = training_dataset.get_dataframe(columns=columns)
train_dataset = datasets.Dataset.from_pandas(df)

validation_dataset = None
eval_dataset = None
if len(recipe.get_inputs()) > 1:
    validation_dataset = recipe.get_inputs()[1]
    df = validation_dataset.get_dataframe(columns=columns)
    eval_dataset = datasets.Dataset.from_pandas(df)

saved_model = recipe.get_outputs()[0]

##################
# Model loading
##################
model = AutoModelForCausalLM.from_pretrained(base_model_name)
tokenizer = AutoTokenizer.from_pretrained(base_model_name)

# It is mandatory to define a formatting function for fine-tuning,
# because ultimately, the model is fed with only one string:
# the concatenation of your input columns, in a specific format.
# Here, we leverage the apply_chat_template method, which depends on
# the tokenizer. For more information, see
# https://huggingface.co/docs/transformers/v4.43.3/chat_templating
formatting_func = formatters.ConversationalPromptFormatter(tokenizer.apply_chat_template,
                                                           *columns)

##################
# Fine-tune using SFTTrainer
##################
with saved_model.create_finetuned_llm_version(connection_name) as finetuned_llm_version:
    # feel free to customize, the only requirement is for a transformers model
    # to be created in finetuned_llm_version.working_directory

    # TRL package offers many possibilities to configure the training job. 
    # For the full list,
    # see https://huggingface.co/docs/transformers/v4.43.3/en/main_classes/trainer#transformers.TrainingArguments
    train_conf = SFTConfig(
        output_dir=finetuned_llm_version.working_directory,
        save_safetensors=True,
        gradient_checkpointing=True,
        num_train_epochs=1,
        logging_steps=5,
        eval_strategy="steps" if eval_dataset else "no",
    )

    # LoRA is one of the most popular adapter-based methods to reduce memory-usage
    # and speed up fine-tuning
    peft_conf = LoraConfig(
        r=16,
        lora_alpha=32,
        lora_dropout=0.05,
        task_type="CAUSAL_LM",
        target_modules="all-linear",
    )

    trainer = SFTTrainer(
        model=model,
        tokenizer=tokenizer,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        formatting_func=formatting_func,
        args=train_conf,
        peft_config=peft_conf,
    )
    trainer.train()
    trainer.save_model()

    # Finally, we are logging training information to the Saved Model version
    config = finetuned_llm_version.config
    config["trainingDataset"] = training_dataset.short_name
    if validation_dataset:
        config["validationDataset"] = validation_dataset.short_name
    config["userMessageColumn"] = user_message_column
    config["assistantMessageColumn"] = assistant_message_column
    config["systemMessageColumn"] = system_message_column
    config["staticSystemMessage"] = static_system_message
    config["batchSize"] = trainer.state.train_batch_size
    config["eventLog"] = trainer.state.log_history

The sample above uses popular techniques to optimize memory usage and processing time, namely LoRA and gradient checkpointing; quantization is another common option. Note that the research and open source community is constantly coming up with new ways to make fine-tuning more accessible, while trying to avoid too much performance loss. For more information on other techniques you could try, see for instance the Transformers or PEFT documentation.

Reference documentation#

dataikuapi.dss.llm.DSSLLM(client, ...)

A handle to interact with a DSS-managed LLM.

dataikuapi.dss.llm.DSSLLMListItem(client, ...)

An item in a list of LLMs

dataikuapi.dss.llm.DSSLLMCompletionQuery(llm)

A handle to interact with a completion query.

dataikuapi.dss.llm.DSSLLMCompletionsQuery(llm)

A handle to interact with a multi-completion query.

dataikuapi.dss.llm.DSSLLMCompletionsQuerySingleQuery()

dataikuapi.dss.llm.DSSLLMCompletionQueryMultipartMessage(q, role)

dataikuapi.dss.llm.DSSLLMCompletionResponse(...)

A handle to interact with a completion response.

dataikuapi.dss.llm.DSSLLMEmbeddingsQuery(...)

A handle to interact with an embedding query.

dataikuapi.dss.llm.DSSLLMEmbeddingsResponse(...)

A handle to interact with an embedding query result.

dataikuapi.dss.knowledgebank.DSSKnowledgeBankListItem(...)

An item in a list of knowledge banks

dataikuapi.dss.knowledgebank.DSSKnowledgeBank(...)

A handle to interact with a DSS-managed knowledge bank.

dataikuapi.dss.langchain.DKUChatModel(*args, ...)

Langchain-compatible wrapper around Dataiku-mediated chat LLMs

dataikuapi.dss.langchain.DKULLM(*args, **kwargs)

Langchain-compatible wrapper around Dataiku-mediated LLMs

dataikuapi.dss.langchain.DKUEmbeddings(...)

Langchain-compatible wrapper around Dataiku-mediated embedding LLMs