Using the LLM Mesh to parse and output JSON objects#
Introduction#
In this tutorial, you will process structured objects and receive JSON output from a model via the LLM Mesh. As autoregressive text generation models, LLMs most often produce free-form text responses. You can ensure consistent results by using JSON for both input and output, especially by specifying an output schema. Output that follows a defined schema is easier to process, less error-prone, and particularly useful when saving results for data analysis or for use in downstream applications. The tutorial showcases this technique by performing sentiment analysis on product reviews; it could be extended to other tasks that process or output text.
Prerequisites#
Dataiku >= 13.3
Project permissions for “Read project content” and “Write project content”
An existing LLM Mesh connection that supports JSON output (OpenAI, Azure OpenAI, Vertex Gemini as of 13.3, with experimental support on Hugging Face models)
Data extraction#
This tutorial uses the Amazon Review Dataset. The Python script below downloads one of the subset datasets, creates a small sample of reviews, and uploads it as a dataset named amznreviews-sample. To use this script, you must create a Python recipe from the Flow with an output dataset named amznreviews-sample and copy the code into the recipe’s editor. Pay attention to how the reviews are stored as JSON with keys for the product category and the review text.
import dataiku
import requests
import gzip
import json
import random
# URL & filenames to download & create
URL = 'http://jmcauley.ucsd.edu/data/amazon_v2/categoryFilesSmall/Luxury_Beauty_5.json.gz'
FILE_NAME = 'Luxury_Beauty_5.json.gz'
FILE_UNZIP = 'Luxury_Beauty_5.json'
PROD_CATEGORY = "Luxury Beauty"
SAMPLE_SIZE = 47
DATASET_NAME = "amznreviews-sample"
response = requests.get(URL)
with open(FILE_NAME, 'wb') as f:
    f.write(response.content)

# Unzip the archive
with gzip.open(FILE_NAME, 'rb') as gz_file:
    with open(FILE_UNZIP, "wb") as f_out:
        f_out.write(gz_file.read())

with open(FILE_UNZIP, "r", encoding="utf-8") as f:
    data = []
    for line in f:
        record = json.loads(line)
        review = {
            "product_category": PROD_CATEGORY,
            "text": record.get("reviewText", "")
        }
        data.append({
            "review": json.dumps(review),
            "sentiment_score": record.get("overall", ""),
            "sentiment": "negative" if record["overall"] in [1, 2]
            else "neutral" if record["overall"] == 3
            else "positive"
        })

# Get a random sample of records
sample_data = random.sample(data, SAMPLE_SIZE)

# Get the dataset object
dataset = dataiku.Dataset(DATASET_NAME)

# Define the schema for the dataset
schema = [{"name": "review", "type": "string"},
          {"name": "sentiment_score", "type": "int"},
          {"name": "sentiment", "type": "string"}]

# Write the schema to the dataset
dataset.write_schema(schema)

# Write the rows to the dataset
with dataset.get_writer() as writer:
    for row in sample_data:
        writer.write_row_dict(row)
Setting up the schema for JSON output#
Note
Similar to the last script, you’ll create another Python recipe in the Flow with amznreviews-sample as the input dataset and amznreviews-sample-llm-scored as the output. Copy the scoring script (score), available at the end of this tutorial, into the recipe’s editor. The sections below will discuss only the relevant snippets of code.
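The scoring script expects the identifier of an LLM Mesh connection (the LLM_ID variable at the top of the script). If you don’t know which IDs are available, one way to look them up is to list the project’s LLMs from a notebook; this is a minimal sketch, and the exact attributes exposed on each entry may vary with your Dataiku version:

import dataiku

# List the LLMs reachable from this project through the LLM Mesh
client = dataiku.api_client()
project = client.get_default_project()

for llm in project.list_llms():
    # Each entry exposes an id (to use as LLM_ID) and a human-readable description
    print(f"- {llm.description} (id: {llm.id})")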
Next, you will use the LLM Mesh to analyze product reviews and generate structured JSON responses. The key to getting consistent, structured output is defining the JSON schema beforehand, when setting up the LLM’s completion task. This ensures that the output follows a predefined structure, making it easier to process and validate. The goal is to steer the LLM’s response into a consistent shape, since the output will be saved as a structured dataset.
# Define the JSON schema
SCHEMA = {
    "type": "object",
    "properties": {
        "llm_sentiment": {
            "type": "string",
            "enum": ["positive", "negative", "neutral"]
        },
        "llm_explanation": {
            "type": "string"
        },
        "llm_confidence": {
            "type": "number"
        }
    },
    "required": ["llm_sentiment", "llm_explanation", "llm_confidence"],
    "additionalProperties": False
}
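Because SCHEMA is a standard JSON Schema document, you can also validate responses on the client side before saving them. Here is an optional sketch, assuming the jsonschema package is installed in the recipe’s code environment:

from jsonschema import ValidationError, validate

def is_valid_llm_output(result: dict) -> bool:
    """Return True if an LLM response matches SCHEMA, False otherwise."""
    try:
        validate(instance=result, schema=SCHEMA)
        return True
    except ValidationError as err:
        print(f"Schema violation: {err.message}")
        return False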
Getting structured output#
Once you define the schema of your output, you’ll need to outline how you want the LLM to process the JSON input and what keys the output should contain. This is done using a system prompt:
# Outline the prompt
PROMPT = """
You are an assistant that classifies reviews in JSON format according to their sentiment.
Respond with a JSON object containing the following fields:
- llm_explanation: a very short explanation for the sentiment
- llm_sentiment: should only be either "positive" or "negative" or "neutral" without punctuation
- llm_confidence: a float between 0-1 showing your confidence in the sentiment score
"""
Now, you can specify that the LLM output needs to match the schema you defined by using the with_json_output() method. Here’s what a test of this setup could look like:
completion = llm.new_completion()
completion.with_json_output(schema=SCHEMA)
completion.with_message(PROMPT, role="system")
review_json = {
    "text": "This is an amazing product! It is exactly what I wanted.",
    "category": "Luxury_Beauty"
}
completion.with_message(json.dumps(review_json), role="user")
response = completion.execute()
result = response.json
print(f"Sentiment: {result['llm_sentiment']}")
print(f"Explanation: {result['llm_explanation']}")
print(f"Confidence: {result['llm_confidence']}")
# Sentiment: positive
# Explanation: The review expresses strong satisfaction with the product.
# Confidence: 0.95
Processing multiple reviews#
The new_completions() method sends multiple queries in a single request, letting you batch-process the reviews from the extracted sample. Sending multiple reviews in one batch to the LLM is more efficient than sending individual requests, as in the example above. It is also helpful when parsing or creating large datasets, since each review is processed consistently according to the schema you defined.
# Use a multi-completion query
completions = llm.new_completions()
completions.with_json_output(schema=SCHEMA)
for row in ds_in.iter_rows():
    # Load review JSON
    review_data = json.loads(row["review"])
    comp = completions.new_completion()
    comp.with_message(PROMPT, role="system")
    comp.with_message(json.dumps(review_data), role="user")
# Execute all completions in batch
responses = completions.execute()
results = [r.json for r in responses.responses]
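In a batch, individual completions can still fail (for example, because of rate limits or provider errors). A more defensive way to collect the results might look like the sketch below; it assumes each item in responses.responses exposes the same success flag as a single completion response:

# Collect results defensively, keeping the row count aligned with the input
results = []
for r in responses.responses:
    if r.success:
        results.append(r.json)
    else:
        # Store an empty result so the downstream columns stay aligned
        results.append({})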
Saving scores and other results#
The results can be saved back to a Dataiku dataset. Writing the dataframe with write_with_schema() sets the output dataset’s schema, ensuring that each review’s scores and analysis are stored in a structured format. The complete JSON output from the LLM is also saved.
# Write the results to the output dataset
df_in = ds_in.get_dataframe()
df_out = df_in.copy()
df_out["llm_json_output"] = [json.dumps(r) for r in results]
df_out["llm_sentiment"] = [r.get("llm_sentiment") for r in results]
df_out["llm_explanation"] = [r.get("llm_explanation") for r in results]
df_out["llm_confidence"] = [r.get("llm_confidence") for r in results]
ds_out.write_with_schema(df_out)
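Because the scores land in a structured dataset, downstream analysis is straightforward. For example, a quick check of how often the LLM agrees with the rating-derived sentiment label could look like this (a sketch to run once the scored dataset has been built):

import dataiku

# Compare the LLM's sentiment with the label derived from the star rating
scored_df = dataiku.Dataset("amznreviews-sample-llm-scored").get_dataframe()
agreement = (scored_df["llm_sentiment"] == scored_df["sentiment"]).mean()
print(f"LLM agrees with the rating-based sentiment on {agreement:.0%} of reviews")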
Wrapping up#
Using Dataiku’s LLM Mesh with structured output provides several benefits, including built-in validation through JSON schema. You could extend this example by trying different schema definitions and including options like strict checking.
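For instance, you could extend the schema with an additional field and mention it in the system prompt. A sketch of a variant that also asks for a few keywords (a hypothetical llm_keywords field) might look like this:

# Extended schema: same fields as before, plus a list of keywords
EXTENDED_SCHEMA = {
    "type": "object",
    "properties": {
        "llm_sentiment": {"type": "string", "enum": ["positive", "negative", "neutral"]},
        "llm_explanation": {"type": "string"},
        "llm_confidence": {"type": "number"},
        "llm_keywords": {"type": "array", "items": {"type": "string"}}
    },
    "required": ["llm_sentiment", "llm_explanation", "llm_confidence", "llm_keywords"],
    "additionalProperties": False
}

The complete extract and score scripts used in this tutorial follow for reference.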
extract
import dataiku
import requests
import gzip
import json
import random
# URL & filenames to download & create
URL = 'http://jmcauley.ucsd.edu/data/amazon_v2/categoryFilesSmall/Luxury_Beauty_5.json.gz'
FILE_NAME = 'Luxury_Beauty_5.json.gz'
FILE_UNZIP = 'Luxury_Beauty_5.json'
PROD_CATEGORY = "Luxury Beauty"
SAMPLE_SIZE = 47
DATASET_NAME = "amznreviews-sample"
response = requests.get(URL)
with open(FILE_NAME, 'wb') as f:
    f.write(response.content)

# Unzip the archive
with gzip.open(FILE_NAME, 'rb') as gz_file:
    with open(FILE_UNZIP, "wb") as f_out:
        f_out.write(gz_file.read())

with open(FILE_UNZIP, "r", encoding="utf-8") as f:
    data = []
    for line in f:
        record = json.loads(line)
        review = {
            "product_category": PROD_CATEGORY,
            "text": record.get("reviewText", "")
        }
        data.append({
            "review": json.dumps(review),
            "sentiment_score": record.get("overall", ""),
            "sentiment": "negative" if record["overall"] in [1, 2]
            else "neutral" if record["overall"] == 3
            else "positive"
        })

# Get a random sample of records
sample_data = random.sample(data, SAMPLE_SIZE)

# Get the dataset object
dataset = dataiku.Dataset(DATASET_NAME)

# Define the schema for the dataset
schema = [{"name": "review", "type": "string"},
          {"name": "sentiment_score", "type": "int"},
          {"name": "sentiment", "type": "string"}]

# Write the schema to the dataset
dataset.write_schema(schema)

# Write the rows to the dataset
with dataset.get_writer() as writer:
    for row in sample_data:
        writer.write_row_dict(row)
score
import dataiku
import json
# Get DSS client and get project
client = dataiku.api_client()
project = client.get_default_project()
# Get the LLM from the project
LLM_ID = "" # Set your LLM ID here
llm = project.get_llm(LLM_ID)
# Set up datasets
ds_in = dataiku.Dataset("amznreviews-sample")
ds_out = dataiku.Dataset("amznreviews-sample-llm-scored")
# Define the JSON schema
SCHEMA = {
    "type": "object",
    "properties": {
        "llm_sentiment": {
            "type": "string",
            "enum": ["positive", "negative", "neutral"]
        },
        "llm_explanation": {
            "type": "string"
        },
        "llm_confidence": {
            "type": "number"
        }
    },
    "required": ["llm_sentiment", "llm_explanation", "llm_confidence"],
    "additionalProperties": False
}
# Outline the prompt
PROMPT = """
You are an assistant that classifies reviews in JSON format according to their sentiment.
Respond with a JSON object containing the following fields:
- llm_explanation: a very short explanation for the sentiment
- llm_sentiment: should only be either "positive" or "negative" or "neutral" without punctuation
- llm_confidence: a float between 0-1 showing your confidence in the sentiment score
"""
# Use a multi-completion query
completions = llm.new_completions()
completions.with_json_output(schema=SCHEMA)
for row in ds_in.iter_rows():
    # Load review JSON
    review_data = json.loads(row["review"])
    comp = completions.new_completion()
    comp.with_message(PROMPT, role="system")
    comp.with_message(json.dumps(review_data), role="user")
# Execute all completions in batch
responses = completions.execute()
results = [r.json for r in responses.responses]
# Write the results to the output dataset
df_in = ds_in.get_dataframe()
df_out = df_in.copy()
df_out["llm_json_output"] = [json.dumps(r) for r in results]
df_out["llm_sentiment"] = [r.get("llm_sentiment") for r in results]
df_out["llm_explanation"] = [r.get("llm_explanation") for r in results]
df_out["llm_confidence"] = [r.get("llm_confidence") for r in results]
ds_out.write_with_schema(df_out)