Hybrid RAG Pipeline with Breakpoints
Last Updated: June 17, 2026
This notebook demonstrates how to set up breakpoints in a Haystack pipeline. In this case, we will set up breakpoints in a hybrid retrieval-augmented generation (RAG) pipeline. The pipeline combines BM25 and embedding-based retrieval methods, then uses a transformer-based reranker and an LLM to generate answers.
Install packages
%%bash
# Install the packages used by this notebook.
# The version specifier must be quoted: unquoted, the shell treats `>=2.16.0`
# as an output redirection to a file named "=2.16.0" and pip installs an
# unpinned haystack-ai instead of enforcing the minimum version.
pip install "haystack-ai>=2.16.0"
pip install "transformers[torch,sentencepiece]"
pip install sentence-transformers-haystack
Setup OpenAI API keys
import os
from getpass import getpass
# Turn on saving of pipeline snapshots to disk when a breakpoint is hit.
os.environ["HAYSTACK_PIPELINE_SNAPSHOT_SAVE_ENABLED"] = "true"

# Ask for the OpenAI key interactively only when the environment does not
# already provide one.
if os.environ.get("OPENAI_API_KEY") is None:
    os.environ["OPENAI_API_KEY"] = getpass("Enter OpenAI API key:")
Import Required Libraries
First, let’s import all the necessary components from Haystack.
from haystack import Document, Pipeline
from haystack.components.builders import AnswerBuilder, ChatPromptBuilder
from haystack_integrations.components.embedders.sentence_transformers import SentenceTransformersDocumentEmbedder, SentenceTransformersTextEmbedder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.joiners import DocumentJoiner
from haystack_integrations.components.rankers.sentence_transformers import SentenceTransformersSimilarityRanker
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever, InMemoryEmbeddingRetriever
from haystack.components.writers import DocumentWriter
from haystack.dataclasses import ChatMessage
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.document_stores.types import DuplicatePolicy
Document Store Initializations
Let’s create a simple document store with some sample documents and their embeddings.
def indexing():
    """
    Embed three sample documents and write them into a new in-memory store.

    Returns the populated InMemoryDocumentStore.
    """
    print("Indexing documents...")

    # Sample corpus: one sentence per person, naming a city and a temperature.
    sample_texts = (
        "My name is Jean and I live in Paris. The weather today is 25°C.",
        "My name is Mark and I live in Berlin. The weather today is 15°C.",
        "My name is Giorgio and I live in Rome. The weather today is 30°C.",
    )
    documents = [Document(content=text) for text in sample_texts]

    # The store that the writer fills and that is handed back to the caller.
    document_store = InMemoryDocumentStore()

    # Ingestion pipeline: the embedder's documents output feeds the writer.
    ingestion_pipe = Pipeline()
    ingestion_pipe.add_component(
        name="doc_embedder",
        instance=SentenceTransformersDocumentEmbedder(model="intfloat/e5-base-v2", progress_bar=False),
    )
    ingestion_pipe.add_component(
        name="doc_writer",
        instance=DocumentWriter(document_store=document_store, policy=DuplicatePolicy.SKIP),
    )
    ingestion_pipe.connect("doc_embedder.documents", "doc_writer.documents")
    ingestion_pipe.run({"doc_embedder": {"documents": documents}})

    return document_store
A Hybrid Retrieval Pipeline
Now let’s build a hybrid RAG pipeline.
def hybrid_retrieval(doc_store):
    """
    Assemble the hybrid RAG query pipeline on top of ``doc_store``.

    BM25 and embedding retrieval both feed a document joiner. The joined
    documents go to the ranker and to the answer builder; the ranked documents
    fill the chat prompt, the prompt goes to the LLM, and the LLM replies go to
    the answer builder. Returns the assembled Pipeline without running it.
    """
    query_embedder = SentenceTransformersTextEmbedder(model="intfloat/e5-base-v2", progress_bar=False)

    # Chat prompt: a fixed system instruction plus a user message that renders
    # the documents and the question.
    system_message = ChatMessage.from_system(
        "You are a helpful AI assistant. Answer the following question based on the given context information only. If the context is empty or just a '\n' answer with None, example: 'None'."
    )
    user_message = ChatMessage.from_user(
        "\n"
        "Context:\n"
        "{% for document in documents %}\n"
        "{{ document.content }}\n"
        "{% endfor %}\n"
        "Question: {{question}}\n"
    )
    template = [system_message, user_message]

    # (name, instance) pairs, registered in this order.
    components = [
        ("bm25_retriever", InMemoryBM25Retriever(document_store=doc_store)),
        ("query_embedder", query_embedder),
        ("embedding_retriever", InMemoryEmbeddingRetriever(document_store=doc_store)),
        ("doc_joiner", DocumentJoiner(sort_by_score=False)),
        ("ranker", SentenceTransformersSimilarityRanker(model="intfloat/simlm-msmarco-reranker", top_k=5)),
        ("prompt_builder", ChatPromptBuilder(template=template, required_variables=["question", "documents"])),
        ("llm", OpenAIChatGenerator()),
        ("answer_builder", AnswerBuilder()),
    ]

    # (sender, receiver) pairs, connected in this order.
    connections = [
        ("query_embedder", "embedding_retriever.query_embedding"),
        ("embedding_retriever", "doc_joiner.documents"),
        ("bm25_retriever", "doc_joiner.documents"),
        ("doc_joiner", "ranker.documents"),
        ("ranker", "prompt_builder.documents"),
        ("prompt_builder", "llm"),
        ("llm.replies", "answer_builder.replies"),
        # The answer builder receives the joined documents, not the ranker's output.
        ("doc_joiner", "answer_builder.documents"),
    ]

    rag_pipeline = Pipeline()
    for component_name, component in components:
        rag_pipeline.add_component(instance=component, name=component_name)
    for sender, receiver in connections:
        rag_pipeline.connect(sender, receiver)

    return rag_pipeline
Running the pipeline with breakpoints
Now we demonstrate how to set breakpoints in a Haystack pipeline to inspect and debug the pipeline execution at specific points. Breakpoints allow you to pause execution, save the current state of the pipeline, and later resume from where you left off.
We’ll run the pipeline with a breakpoint set at the query_embedder component. This will save the pipeline state before executing the query_embedder and raise a BreakpointException to stop execution.
from haystack.core.errors import BreakpointException
from haystack.dataclasses.breakpoints import Breakpoint

# Breakpoint on the "query_embedder" component at visit count 0; snapshots
# are written under "snapshots/".
break_point = Breakpoint(component_name="query_embedder", visit_count=0, snapshot_file_path="snapshots/")

# Build the document store and the query pipeline on top of it.
doc_store = indexing()
pipeline = hybrid_retrieval(doc_store)

# The same question is passed to every component that takes it as input.
question = "Where does Mark live?"
data = dict(
    query_embedder={"text": question},
    bm25_retriever={"query": question},
    ranker={"query": question, "top_k": 10},
    prompt_builder={"question": question},
    answer_builder={"query": question},
)

# Run with the breakpoint and report where the BreakpointException was raised.
try:
    pipeline.run(data, break_point=break_point)
except BreakpointException as exc:
    print(f"Breakpoint hit at '{exc.component}' - pipeline snapshot saved to the 'snapshots/' folder.")
Indexing documents...
Loading weights: 0%| | 0/199 [00:00<?, ?it/s]
Loading weights: 100%|██████████| 199/199 [00:00<00:00, 22286.89it/s]
Loading weights: 0%| | 0/201 [00:00<?, ?it/s]
Loading weights: 100%|██████████| 201/201 [00:00<00:00, 12578.41it/s]
Breakpoint hit at 'query_embedder' - pipeline snapshot saved to the 'snapshots/' folder.
This run should be interrupted with a BreakpointException: Breaking at component query_embedder visit count 0 - and this will generate a JSON file in the “snapshots” directory containing a snapshot of the pipeline state before running the component query_embedder.
The snapshot files, named after the component associated with the breakpoint, can be inspected and edited, and later injected into a pipeline to resume execution from the point where the breakpoint was triggered.
!ls snapshots/
prompt_builder_0_2026_06_16_13_32_02.json
query_embedder_0_2026_06_16_13_31_45.json
query_embedder_0_2026_06_16_13_42_26.json
Resuming from a breakpoint
We can then resume a pipeline from its saved pipeline_snapshot by passing it to the Pipeline.run() method. This will run the pipeline to the end.
import glob
import os

from haystack.core.pipeline.breakpoint import load_pipeline_snapshot

# Several query_embedder snapshots may exist; take the one with the highest
# os.path.getctime value.
latest = max(glob.glob("snapshots/query_embedder_*.json"), key=os.path.getctime)
snapshot = load_pipeline_snapshot(latest)

# Resume from the snapshot; no fresh input data is passed.
result = pipeline.run(data={}, pipeline_snapshot=snapshot)

# Show the first answer's text and its metadata.
answer = result["answer_builder"]["answers"][0]
print(answer.data)
print(answer.meta)
Mark lives in Berlin.
{'model': 'gpt-5-mini-2025-08-07', 'index': 0, 'finish_reason': 'stop', 'usage': {'completion_tokens': 78, 'prompt_tokens': 122, 'total_tokens': 200, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 64, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'all_messages': [ChatMessage(_role=<ChatRole.ASSISTANT: 'assistant'>, _content=[TextContent(text='Mark lives in Berlin.')], _name=None, _meta={'model': 'gpt-5-mini-2025-08-07', 'index': 0, 'finish_reason': 'stop', 'usage': {'completion_tokens': 78, 'prompt_tokens': 122, 'total_tokens': 200, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 64, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}})]}
Advanced Use Cases for Pipeline Breakpoints
Here are some advanced scenarios where pipeline breakpoints can be particularly valuable:
-
Set a breakpoint at the LLM to try results of different prompts and iterate in real time.
-
Place a breakpoint after the document retriever to examine and modify retrieved documents.
-
Set a breakpoint before a component to inject gold-standard inputs and isolate whether issues stem from input quality or downstream logic.
To demonstrate the use case stated in point 1, we reuse the same query pipeline with a new question. First, we run the pipeline with the prompt that we originally passed to the prompt_builder. Then, we define a breakpoint at the prompt_builder to try an alternative prompt. This allows us to compare the results generated by different prompts without running the whole pipeline again.
# Rebuild the document store and the query pipeline.
doc_store = indexing()
pipeline = hybrid_retrieval(doc_store)

# New question, passed to every component that takes it as input.
question = "What's the temperature difference between the warmest and coldest city?"
data = dict(
    query_embedder={"text": question},
    bm25_retriever={"query": question},
    ranker={"query": question, "top_k": 10},
    prompt_builder={"question": question},
    answer_builder={"query": question},
)

# This time the breakpoint is on "prompt_builder" at visit count 0.
break_point = Breakpoint(component_name="prompt_builder", visit_count=0, snapshot_file_path="snapshots/")

# Run with the breakpoint and report where the BreakpointException was raised.
try:
    pipeline.run(data, break_point=break_point)
except BreakpointException as exc:
    print(f"Breakpoint hit at '{exc.component}' - pipeline snapshot saved to the 'snapshots/' folder.")
Indexing documents...
Loading weights: 0%| | 0/201 [00:00<?, ?it/s]
Loading weights: 100%|██████████| 201/201 [00:00<00:00, 15077.71it/s]
Breakpoint hit at 'prompt_builder' - pipeline snapshot saved to the 'snapshots/' folder.
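Now we can manually insert a different template into the prompt_builder and inspect the results. To do this, we update the template input within the prompt_builder component in the snapshot file. Note that the cell below only defines the new template; it takes effect only once it has been written into the saved snapshot before resuming.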
# Alternative system prompt that spells out the calculation step by step.
# NOTE(review): the ChatPromptBuilder in hybrid_retrieval is given a list of
# ChatMessage objects, while this is a single message - confirm the expected
# shape before injecting it into a snapshot.
template = ChatMessage.from_system(
    "You are a mathematical analysis assistant. Follow these steps:\n"
    "1. Identify all temperatures mentioned\n"
    "2. Find the maximum and minimum values\n"
    "3. Calculate their difference\n"
    "4. Format response as: 'The temperature difference is X°C (max Y°C in [city] - min Z°C in [city])'\n"
    "Use ONLY the information provided in the context."
)
Now we just load the snapshot file and resume the pipeline with the updated snapshot.
!ls snapshots/prompt_builder*
snapshots/prompt_builder_0_2026_06_16_13_32_02.json
snapshots/prompt_builder_0_2026_06_16_13_42_51.json
import glob
import os

# Several prompt_builder snapshots may exist; take the one with the highest
# os.path.getctime value.
# NOTE(review): nothing here references the `template` variable, so the run
# resumes with whatever the snapshot file on disk contains.
latest = max(glob.glob("snapshots/prompt_builder_*.json"), key=os.path.getctime)
snapshot = load_pipeline_snapshot(latest)

# Resume from the snapshot; no fresh input data is passed.
result = pipeline.run(data={}, pipeline_snapshot=snapshot)

# Show the first answer's text.
first_answer = result["answer_builder"]["answers"][0]
print(first_answer.data)
15°C
