Customizing a Text Embedding Model for RAG Applications#

The embedding model used to create and retrieve context from a Knowledge Bank is a crucial building block of an RAG pipeline. Typical embedding models available out-of-the-box today have been pre-trained on generic data, which can limit their effectiveness for company- or domain-specific use cases. Fine-tuning an embedding model can significantly improve the quality of retrieved documents and the coherence of generated responses.

In this tutorial, we walk you through customizing a text embedding model for an RAG application based on highly technical scientific content from fields like Physics, Chemistry, or Biology.

Prerequisites#

  • Dataiku > 13.3

  • The Scientific Question Answering dataset from the Ai2 non-profit AI research institute. It is available on both the Hugging Face Hub and Kaggle and contains more than 13k crowdsourced science multiple-choice questions, each with an additional paragraph that provides supporting evidence for the correct answer.

  • The Sentence Transformers package, now maintained by Hugging Face.

  • A code environment with Python 3.10 and the following packages:

    accelerate>=0.21.0
    sentence-transformers
    datasets
    transformers
    

Preparing the embedding dataset#

The goal is to fine-tune the model to better find (and retrieve) the appropriate context for a given question. In other words, we want the model to learn the semantic similarity of highly technical scientific texts.

For this, we leverage the question and support columns of our input dataset as positive pairs of (query, context).

We used a Prepare recipe to keep only those two columns and renamed them anchor and positive, respectively. We also removed the rows where positive was empty and added an _id column. This is an important step since sentence_transformers expects input datasets and column names to match the exact format used by the target loss function for your use case.

We also used a Split recipe to create train and test datasets (80/20) randomly.
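
If you prefer to script this preparation rather than use visual recipes, here is a rough pandas equivalent of the same steps. The question and support columns come from the source dataset; the input dataset name sci_q_and_a and the random seed are illustrative assumptions:

import dataiku
import pandas as pd

# Hypothetical name for the DSS dataset holding the raw data
raw_df = dataiku.Dataset("sci_q_and_a").get_dataframe()

# Keep only (question, support), renamed to (anchor, positive)
prepared_df = (
    raw_df[["question", "support"]]
    .rename(columns={"question": "anchor", "support": "positive"})
    .dropna(subset=["positive"])
)
prepared_df = prepared_df[prepared_df["positive"].str.strip() != ""]
prepared_df = prepared_df.reset_index(drop=True)
prepared_df["_id"] = prepared_df.index.astype(str)

# Random 80/20 train/test split
train_df = prepared_df.sample(frac=0.8, random_state=42)
test_df = prepared_df.drop(train_df.index)

# Write back to the DSS datasets used below
dataiku.Dataset("sci_q_and_a_train").write_with_schema(train_df)
dataiku.Dataset("sci_q_and_a_test").write_with_schema(test_df)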

import dataiku
import os
import tempfile

from datasets import Dataset, concatenate_datasets

from sentence_transformers import SentenceTransformer
from sentence_transformers.evaluation import InformationRetrievalEvaluator
from sentence_transformers.losses import MultipleNegativesRankingLoss
from sentence_transformers import SentenceTransformerTrainingArguments, SentenceTransformerTrainer
from sentence_transformers.training_args import BatchSamplers


# Load the DSS datasets as Pandas dataframes
sci_qa_train_df = dataiku.Dataset("sci_q_and_a_train").get_dataframe()
sci_qa_test_df = dataiku.Dataset("sci_q_and_a_test").get_dataframe()

# And then from Pandas dataframes to Datasets (to be used by the trainer)
sci_qa_train = Dataset.from_pandas(sci_qa_train_df)
sci_qa_test = Dataset.from_pandas(sci_qa_test_df)

Loading the embedding model#

We use the embedding model all-MiniLM-L6-v2 from the Hugging Face Hub. We chose a small model that can be fine-tuned easily, even on a CPU, but you can try any model with the sentence-transformers tag.

model_id = "sentence-transformers/all-MiniLM-L6-v2"
model = SentenceTransformer(model_id)
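
As a quick sanity check (not part of the pipeline itself), you can embed a couple of sentences and compare them. The sentences below are made up for illustration, and the similarity method assumes sentence-transformers v3+:

# Embed two related sentences and inspect their cosine similarity
sentences = [
    "Mitochondria are the powerhouse of the cell.",
    "Which organelle produces most of the cell's ATP?",
]
embeddings = model.encode(sentences)             # shape (2, 384) for all-MiniLM-L6-v2
print(model.similarity(embeddings, embeddings))  # 2x2 cosine similarity matrix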

Creating an evaluator and evaluating the base model#

We will use the InformationRetrievalEvaluator to evaluate the performance of our model. For a given set of queries, it retrieves the top-k most similar documents from a corpus; in our case, each query has exactly one relevant document. It then computes several metrics, including Mean Reciprocal Rank (MRR), Recall@k, Mean Average Precision (MAP), and Normalized Discounted Cumulative Gain (NDCG). NDCG is a good measure of ranking quality, so we’ll focus on it here.

The queries come from our test set, and we create the corpus for potential retrieval from all “documents” from the train and test split.

sci_qa_corpus = concatenate_datasets([sci_qa_train, sci_qa_test])
# Convert the datasets to dictionaries
corpus = dict(
    zip(sci_qa_corpus["_id"], sci_qa_corpus["positive"])
)  # Our corpus (cid => document)
queries = dict(
    zip(sci_qa_test["_id"], sci_qa_test["anchor"])
)  # Our queries (qid => question)

# Create a mapping of relevant documents for each query
relevant_docs = {}  # Query ID to relevant documents (qid => set([relevant_cids]))
for q_id in queries:
    relevant_docs[q_id] = [q_id]  # The only relevant document, in our case,
                                  # has the same id as the query

# Given queries, a corpus and a mapping with relevant documents,
# the InformationRetrievalEvaluator computes different IR metrics.
ir_evaluator = InformationRetrievalEvaluator(
    queries=queries,
    corpus=corpus,
    relevant_docs=relevant_docs,
)
results = ir_evaluator(model)
# print(f"cosine_ndcg@10: {results['cosine_ndcg@10']}")
#   --> this gave us a baseline of ~0.67

Initializing the loss function#

We use the standard MultipleNegativesRankingLoss, a well-established choice for this kind of retrieval task. It expects positive text pairs made of an anchor and a corresponding positive sample; within a batch, the positives of the other pairs act as implicit negatives for each anchor, which is why we use the NO_DUPLICATES batch sampler below.

training_loss = MultipleNegativesRankingLoss(model)

Creating a trainer and fine-tuning the embedding model#

# Managed folder to store the fine-tuned model
folder = dataiku.Folder("a_valid_managed_folder_id")

with tempfile.TemporaryDirectory() as temp_dir:
    
    # Define training arguments
    args = SentenceTransformerTrainingArguments(
        # Required parameter:
        output_dir=temp_dir,
        
        # Optional training parameters:
        num_train_epochs=2,                        # number of epochs
        per_device_train_batch_size=8,             # train batch size
        gradient_accumulation_steps=8,             # for a global batch size of 64 (= 8 * 8)
        per_device_eval_batch_size=8,              # evaluation batch size
        learning_rate=2e-5,                        # learning rate
        warmup_ratio=0.1,                          # warmup ratio
        fp16=True,                                 # use fp16 precision (set to False if your GPU can't run on FP16)
        bf16=False,                                # use bf16 precision (set to True if your GPU can run on BF16)
        batch_sampler=BatchSamplers.NO_DUPLICATES, # losses that use "in-batch negatives" benefit from no duplicates
        
        # Optional tracking/debugging parameters:
        eval_strategy="epoch",                     # evaluate after each epoch
        save_strategy="no",                        # save after each epoch
        save_total_limit=2,                        # save the last 2 models
        save_only_model=True,                      # for each checkpoints, save only the model (no optimizer.pt/scheduler.pt) 
        logging_steps=100,                         # log every 100 steps
    )
    
    # Create a trainer & train. 
    embedding_trainer = SentenceTransformerTrainer(
        model=model,
        args=args,
        train_dataset=sci_qa_train.select_columns(
            ["anchor", "positive"]
        ),  # training dataset,
        loss=training_loss,
        evaluator=ir_evaluator,
    )
    embedding_trainer.train()
    
    # Save the fine-tuned model in the managed folder
    embedding_trainer.save_model(output_dir=temp_dir)
    for root, dirs, files in os.walk(temp_dir):
        for file in files:
            source_path = os.path.join(root, file)
            target_path = os.path.relpath(source_path, temp_dir)
            folder.upload_file(target_path, source_path)

Here, we don’t provide an eval dataset directly; we only provide the evaluator, which gives us more informative metrics. The total number of training steps is:

\[\text{number of steps} = \text{epochs} \times \frac{\text{training dataset size}}{\text{batch size} \times \text{gradient accumulation steps}}\]
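
For example, with roughly 10,400 training pairs (80% of the ~13k questions), 2 epochs, a per-device batch size of 8, and 8 gradient accumulation steps, this comes to about 2 × 10,400 / 64 ≈ 325 optimizer steps.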

Evaluating the fine-tuned model against the baseline#

results_ft = ir_evaluator(embedding_trainer.model)
# print(f"cosine_ndcg@10: {results['cosine_ndcg@10']}") 
#   --> this gave use a baseline of ~0.77, which represents a 15% performance increase !

Wrapping up#

This tutorial demonstrated how to fine-tune an embedding model on technical, scientific content using the Sentence Transformers package. By following the steps to prepare your dataset, load the model, and fine-tune it, you can enhance document retrieval and response coherence in your Retrieval-Augmented Generation (RAG) applications for various domain-specific use cases.
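
To reuse the fine-tuned model later, for example in the recipe that builds your Knowledge Bank, one option is to download the files from the managed folder and load them with SentenceTransformer. Here is a minimal sketch, assuming the same managed folder id as above and that the model files sit at the folder root:

import os
import tempfile

import dataiku
from sentence_transformers import SentenceTransformer

folder = dataiku.Folder("a_valid_managed_folder_id")

with tempfile.TemporaryDirectory() as local_dir:
    # Copy every file of the managed folder to a local directory
    for path in folder.list_paths_in_partition():
        local_path = os.path.join(local_dir, path.lstrip("/"))
        os.makedirs(os.path.dirname(local_path), exist_ok=True)
        with folder.get_download_stream(path) as stream, open(local_path, "wb") as f:
            f.write(stream.read())

    # Load the fine-tuned model and embed a sample query
    fine_tuned_model = SentenceTransformer(local_dir)
    print(fine_tuned_model.encode("Which gas do plants absorb during photosynthesis?").shape)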

Here is the complete code for this tutorial:

app.py
import dataiku
import os
import tempfile

from datasets import Dataset, concatenate_datasets

from sentence_transformers import SentenceTransformer
from sentence_transformers.evaluation import InformationRetrievalEvaluator
from sentence_transformers.losses import MultipleNegativesRankingLoss
from sentence_transformers import SentenceTransformerTrainingArguments, SentenceTransformerTrainer
from sentence_transformers.training_args import BatchSamplers


# Load the DSS datasets as Pandas dataframes
sci_qa_train_df = dataiku.Dataset("sci_q_and_a_train").get_dataframe()
sci_qa_test_df = dataiku.Dataset("sci_q_and_a_test").get_dataframe()

# And then from Pandas dataframes to Datasets (to be used by the trainer)
sci_qa_train = Dataset.from_pandas(sci_qa_train_df)
sci_qa_test = Dataset.from_pandas(sci_qa_test_df)

model_id = "sentence-transformers/all-MiniLM-L6-v2"
model = SentenceTransformer(model_id)

sci_qa_corpus = concatenate_datasets([sci_qa_train, sci_qa_test])
# Convert the datasets to dictionaries
corpus = dict(
    zip(sci_qa_corpus["_id"], sci_qa_corpus["positive"])
)  # Our corpus (cid => document)
queries = dict(
    zip(sci_qa_test["_id"], sci_qa_test["anchor"])
)  # Our queries (qid => question)

# Create a mapping of relevant documents for each query
relevant_docs = {}  # Query ID to relevant documents (qid => set([relevant_cids]))
for q_id in queries:
    relevant_docs[q_id] = [q_id]  # The only relevant document, in our case,
                                  # has the same id as the query

# Given queries, a corpus and a mapping with relevant documents,
# the InformationRetrievalEvaluator computes different IR metrics.
ir_evaluator = InformationRetrievalEvaluator(
    queries=queries,
    corpus=corpus,
    relevant_docs=relevant_docs,
)
results = ir_evaluator(model)
# print(f"cosine_ndcg@10: {results['cosine_ndcg@10']}")
#   --> this gave us a baseline of ~0.67

training_loss = MultipleNegativesRankingLoss(model)

# Managed folder to store the fine-tuned model
folder = dataiku.Folder("a_valid_managed_folder_id")

with tempfile.TemporaryDirectory() as temp_dir:
    
    # Define training arguments
    args = SentenceTransformerTrainingArguments(
        # Required parameter:
        output_dir=temp_dir,
        
        # Optional training parameters:
        num_train_epochs=2,                        # number of epochs
        per_device_train_batch_size=8,             # train batch size
        gradient_accumulation_steps=8,             # for a global batch size of 64 (= 8 * 8)
        per_device_eval_batch_size=8,              # evaluation batch size
        learning_rate=2e-5,                        # learning rate
        warmup_ratio=0.1,                          # warmup ratio
        fp16=True,                                 # use fp16 precision (set to False if your GPU can't run on FP16)
        bf16=False,                                # use bf16 precision (set to True if your GPU can run on BF16)
        batch_sampler=BatchSamplers.NO_DUPLICATES, # losses that use "in-batch negatives" benefit from no duplicates
        
        # Optional tracking/debugging parameters:
        eval_strategy="epoch",                     # evaluate after each epoch
        save_strategy="no",                        # save after each epoch
        save_total_limit=2,                        # save the last 2 models
        save_only_model=True,                      # for each checkpoints, save only the model (no optimizer.pt/scheduler.pt) 
        logging_steps=100,                         # log every 100 steps
    )
    
    # Create a trainer & train. 
    embedding_trainer = SentenceTransformerTrainer(
        model=model,
        args=args,
        train_dataset=sci_qa_train.select_columns(
            ["anchor", "positive"]
        ),  # training dataset,
        loss=training_loss,
        evaluator=ir_evaluator,
    )
    embedding_trainer.train()
    
    # Save the fine-tuned model in the managed folder
    embedding_trainer.save_model(output_dir=temp_dir)
    for root, dirs, files in os.walk(temp_dir):
        for file in files:
            source_path = os.path.join(root, file)
            target_path = os.path.relpath(source_path, temp_dir)
            folder.upload_file(target_path, source_path)

results_ft = ir_evaluator(embedding_trainer.model)
# print(f"cosine_ndcg@10: {results['cosine_ndcg@10']}") 
#   --> this gave use a baseline of ~0.77, which represents a 15% performance increase !