LangChain Practitioner's Handbook
A production-ready reference covering enterprise LLM integration, RAG pipelines, memory, tools, MCP, guardrails, and multi-agent architectures with LangGraph.
Table of Contents
This handbook is organized into seven production-focused modules. Each module contains full, copy-pasteable code using the latest LangChain Expression Language (LCEL) and langchain-core imports.
- Memory: RunnableWithMessageHistory, long-term persistence via Redis/PostgreSQL stores.
- Tools: @tool and StructuredTool, tool binding to LLMs, and MCP integration.
- Callbacks: BaseCallbackHandler for telemetry, auditing, and metadata injection.
- Guardrails: PydanticOutputParser, Guardrails AI integration, native fallbacks to prevent hallucinations and enforce schema compliance.
- Agents: AgentExecutor, multi-agent workflows using LangGraph state graphs.

All code uses LCEL pipes (prompt | model | parser) and modern langchain-core imports, and avoids the deprecated LLMChain, AgentType.ZERO_SHOT_REACT_DESCRIPTION, and legacy chain constructors. Type hints are included for production readiness.

Installation & Package Map
LangChain 0.3+ uses a modular package architecture. Install only what you need.
```shell
# Core framework — always needed
pip install langchain-core langchain

# Enterprise model providers
pip install langchain-openai           # Azure OpenAI + OpenAI
pip install langchain-google-vertexai  # Google Vertex AI

# RAG essentials
pip install langchain-chroma           # ChromaDB vector store
pip install langchain-community        # Document loaders, misc integrations

# Agents & multi-agent
pip install langgraph                  # State-graph multi-agent orchestration

# Memory persistence
pip install langchain-redis            # Redis-backed chat history
pip install langchain-postgres         # PostgreSQL-backed chat history

# MCP integration
pip install langchain-mcp-adapters     # Bridge MCP servers ↔ LangChain tools
```
| Package | Purpose | Key Imports |
|---|---|---|
| langchain-core | Base abstractions: runnables, prompts, output parsers, callbacks | ChatPromptTemplate, StrOutputParser, RunnablePassthrough |
| langchain-openai | OpenAI + Azure OpenAI chat models and embeddings | AzureChatOpenAI, AzureOpenAIEmbeddings |
| langchain-google-vertexai | Google Vertex AI (Gemini) chat models | ChatVertexAI, VertexAIEmbeddings |
| langgraph | Multi-agent state-graph orchestration | StateGraph, END, MessagesState |
| langchain-mcp-adapters | Model Context Protocol bridge | MultiServerMCPClient |
Module 1 — Core Models & Enterprise Integration
Enterprise LLM deployments require secure credential handling, region-specific endpoints, and grounding against authoritative data sources. This module covers Azure OpenAI, Google Vertex AI, and enterprise grounding patterns.
1.1 — Azure OpenAI Setup
Use AzureChatOpenAI from langchain-openai. Credentials are loaded from environment variables — never hardcode secrets. The azure_endpoint, api_version, and azure_deployment map to your Azure OpenAI resource.
Environment Variables
# .env — load via python-dotenv or set in your shell / CI pipeline
AZURE_OPENAI_API_KEY=your-api-key-here
AZURE_OPENAI_ENDPOINT=https://your-resource.openai.azure.com/
AZURE_OPENAI_API_VERSION=2024-12-01-preview
AZURE_OPENAI_DEPLOYMENT=gpt-4o
AZURE_OPENAI_EMBEDDINGS_DEPLOYMENT=text-embedding-3-large
Basic Chat Call
```python
import os

from dotenv import load_dotenv
from langchain_openai import AzureChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage

# Load credentials from .env — keeps secrets out of source code
load_dotenv()

# Initialize the Azure OpenAI chat model
llm = AzureChatOpenAI(
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    azure_deployment=os.environ["AZURE_OPENAI_DEPLOYMENT"],
    api_version=os.environ["AZURE_OPENAI_API_VERSION"],
    # api_key is auto-read from AZURE_OPENAI_API_KEY env var
    temperature=0.0,  # Deterministic for enterprise use
    max_tokens=1024,
)

# Simple invocation with typed messages
messages: list = [
    SystemMessage(content="You are a helpful enterprise assistant."),
    HumanMessage(content="Summarize our Q4 revenue in three bullet points."),
]
response = llm.invoke(messages)
print(response.content)
```
Tip: For secret-free authentication, use DefaultAzureCredential from azure-identity. Pass azure_ad_token_provider to AzureChatOpenAI to use workload identity, managed identity, or service principal auth without any stored secrets.

Azure AD Token Provider (Managed Identity)
```python
import os

from azure.identity import DefaultAzureCredential, get_bearer_token_provider
from langchain_openai import AzureChatOpenAI

# DefaultAzureCredential auto-detects managed identity, CLI login, etc.
credential = DefaultAzureCredential()
token_provider = get_bearer_token_provider(
    credential,
    "https://cognitiveservices.azure.com/.default",
)

llm = AzureChatOpenAI(
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    azure_deployment=os.environ["AZURE_OPENAI_DEPLOYMENT"],
    api_version=os.environ["AZURE_OPENAI_API_VERSION"],
    azure_ad_token_provider=token_provider,  # No API key needed
    temperature=0.0,
)

response = llm.invoke("What is our current Azure spend?")
print(response.content)
```
LCEL Chain — Prompt → Model → Parser
```python
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# Define a reusable prompt template
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a concise enterprise analyst."),
    ("human", "{question}"),
])

# LCEL pipe: prompt → model → string output parser
chain = prompt | llm | StrOutputParser()

# Invoke with a dict of template variables
result: str = chain.invoke({"question": "Explain Azure OpenAI rate limits."})
print(result)
```
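The `|` composition is plain Python operator overloading: each runnable implements `__or__`, so `a | b` produces a new runnable that feeds `a`'s output into `b`. A minimal sketch of the idea, using a toy `Pipe` class as a stand-in for LangChain's Runnable (which additionally provides batching, streaming, and async):

```python
class Pipe:
    """Toy stand-in for a LangChain Runnable: wraps a function and
    supports `|` composition via __or__."""

    def __init__(self, func):
        self.func = func

    def invoke(self, value):
        return self.func(value)

    def __or__(self, other: "Pipe") -> "Pipe":
        # (self | other).invoke(x) == other.invoke(self.invoke(x))
        return Pipe(lambda value: other.invoke(self.invoke(value)))


prompt = Pipe(lambda d: f"Q: {d['question']}")  # prompt formatting
model = Pipe(lambda text: text.upper())         # stand-in "model"
parser = Pipe(lambda text: text.strip())        # output parsing

chain = prompt | model | parser
print(chain.invoke({"question": "hello"}))  # → Q: HELLO
```

Because composition just builds a bigger runnable, any stage (retriever, formatter, parser) can slot into the same pipe.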
1.2 — Google Vertex AI Setup
Vertex AI uses Google Cloud Application Default Credentials (ADC). Authenticate via gcloud auth application-default login locally, or use a service account in production. The ChatVertexAI wrapper connects to Gemini models.
Authentication
```shell
# Terminal: authenticate locally (one-time)
gcloud auth application-default login

# Or set a service account key (CI/CD environments)
export GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account.json

# Ensure your project is set
gcloud config set project your-gcp-project-id
```
Vertex AI Chat Model
```python
from langchain_google_vertexai import ChatVertexAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# Initialize Gemini 2.0 Flash via Vertex AI
# Credentials auto-loaded from ADC (gcloud login or service account)
llm = ChatVertexAI(
    model_name="gemini-2.0-flash",
    project="your-gcp-project-id",
    location="us-central1",
    temperature=0.0,
    max_output_tokens=1024,
)

# Build an LCEL chain identical to the Azure example
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a GCP infrastructure expert."),
    ("human", "{question}"),
])
chain = prompt | llm | StrOutputParser()

result: str = chain.invoke({"question": "Explain Vertex AI quotas for Gemini 2.0."})
print(result)
```
Tip: Use gemini-2.0-flash for fast, cost-effective tasks or gemini-2.5-pro for complex reasoning. Both support tool calling natively via the LangChain wrapper.

1.3 — Enterprise Grounding Samples
Grounding connects your LLM to authoritative data sources so responses are factual and traceable. Below are patterns for both Azure (via Azure AI Search) and Google (via Vertex AI grounding with Google Search).
Pattern: Use Azure AI Search as a retriever to ground Azure OpenAI responses against your indexed enterprise documents.
Azure AI Search Grounding
```python
import os

from dotenv import load_dotenv
from langchain_openai import AzureChatOpenAI, AzureOpenAIEmbeddings
from langchain_community.vectorstores.azuresearch import AzureSearch
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

load_dotenv()

# 1. Embeddings model for vectorizing queries
embeddings = AzureOpenAIEmbeddings(
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    azure_deployment=os.environ["AZURE_OPENAI_EMBEDDINGS_DEPLOYMENT"],
    api_version=os.environ["AZURE_OPENAI_API_VERSION"],
)

# 2. Azure AI Search as a vector store / retriever
vector_store = AzureSearch(
    azure_search_endpoint=os.environ["AZURE_SEARCH_ENDPOINT"],
    azure_search_key=os.environ["AZURE_SEARCH_KEY"],
    index_name="enterprise-docs",
    embedding_function=embeddings.embed_query,
    search_type="hybrid",  # Combines keyword + vector for best recall
)
retriever = vector_store.as_retriever(search_kwargs={"k": 5})

# 3. LLM for generating grounded answers
llm = AzureChatOpenAI(
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    azure_deployment=os.environ["AZURE_OPENAI_DEPLOYMENT"],
    api_version=os.environ["AZURE_OPENAI_API_VERSION"],
    temperature=0.0,
)

# 4. Grounded prompt — instructs the model to use ONLY retrieved context
prompt = ChatPromptTemplate.from_messages([
    ("system", """Answer the question using ONLY the provided context.
If the context does not contain the answer, say "I don't have enough information."
Always cite the source document.

Context:
{context}"""),
    ("human", "{question}"),
])

# Helper: format retrieved documents into a single string
def format_docs(docs) -> str:
    return "\n\n".join(
        f"[{doc.metadata.get('source', 'unknown')}]: {doc.page_content}"
        for doc in docs
    )

# 5. LCEL RAG chain: retrieve → format → prompt → LLM → parse
grounded_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)

# Invoke — the response is grounded against Azure AI Search documents
answer = grounded_chain.invoke("What is our data retention policy?")
print(answer)
```
Pattern: Use Vertex AI's built-in Google Search grounding to augment Gemini responses with real-time web data.
Vertex AI with Google Search Grounding
```python
from google.cloud.aiplatform_v1beta1.types import (
    GoogleSearchRetrieval,
    Tool as VertexTool,
)
from langchain_core.messages import HumanMessage
from langchain_google_vertexai import ChatVertexAI

# Configure Google Search grounding as a Vertex AI tool.
# NOTE: the grounding API surface changes between SDK releases — verify the
# exact types and fields against the current langchain-google-vertexai docs.
google_search_tool = VertexTool(
    google_search_retrieval=GoogleSearchRetrieval(
        disable_attribution=False,  # Include source citations
    )
)

# Initialize ChatVertexAI as usual — grounding is enabled per-call
llm = ChatVertexAI(
    model_name="gemini-2.0-flash",
    project="your-gcp-project-id",
    location="us-central1",
    temperature=0.0,
)

# Pass the grounding tool at invocation time
response = llm.invoke(
    [HumanMessage(content="What were Google's Q3 2025 earnings?")],
    tools=[google_search_tool],
)
print(response.content)

# Access grounding metadata (sources) from the response
if hasattr(response, "response_metadata"):
    grounding_meta = response.response_metadata.get("grounding_metadata")
    if grounding_meta:
        for chunk in grounding_meta.get("grounding_chunks", []):
            print(f"  Source: {chunk.get('web', {}).get('uri', 'N/A')}")
```
Tip: Use dynamic_retrieval_config with a threshold (0.0–1.0) to control when Google Search is triggered. A lower threshold means the model grounds more aggressively. This is configured at the API level via DynamicRetrievalConfig.

Side-by-Side Comparison
| Aspect | Azure AI Search Grounding | Vertex AI Google Search |
|---|---|---|
| Data source | Your indexed enterprise documents | Live web via Google Search |
| Latency | Low (pre-indexed, hybrid retrieval) | Moderate (real-time web fetch) |
| Privacy | Data stays in your Azure tenant | Queries hit Google Search APIs |
| Best for | Internal knowledge bases, compliance docs, policy lookup | Real-time facts, public information, market data |
| Setup effort | Requires index creation + embedding pipeline | Zero setup — built into Vertex AI |
| LangChain pattern | LCEL retriever chain | Native tool via model_kwargs |
Module 2 — Retrieval-Augmented Generation (RAG)
RAG is the dominant pattern for grounding LLMs against your own data without fine-tuning. This module walks through the full pipeline — from raw documents to a production retrieval chain — using LCEL and modern langchain-core abstractions.
2.1 — Document Loading
LangChain provides loaders for PDFs, web pages, CSVs, Notion, and more. Each loader returns a list of Document objects with page_content and metadata.
```python
from langchain_community.document_loaders import (
    PyPDFLoader,
    TextLoader,
    WebBaseLoader,
    CSVLoader,
)

# Load a PDF — each page becomes one Document
pdf_docs = PyPDFLoader("docs/architecture-guide.pdf").load()
print(f"Loaded {len(pdf_docs)} pages from PDF")

# Load a plain text file
txt_docs = TextLoader("docs/changelog.md", encoding="utf-8").load()

# Load a web page (requires beautifulsoup4)
web_docs = WebBaseLoader("https://docs.langchain.com/overview").load()

# Load a CSV — each row becomes one Document
csv_docs = CSVLoader("data/customers.csv").load()

# Combine all sources into a unified corpus
all_docs = pdf_docs + txt_docs + web_docs + csv_docs
print(f"Total documents loaded: {len(all_docs)}")
```
Tip: For large corpora, use .lazy_load() instead of .load() to get a generator that yields documents one at a time, avoiding memory spikes.

2.2 — Text Splitting
Documents are rarely the right size for embedding. Splitters break them into semantically coherent chunks with configurable overlap to preserve context across boundaries.
```python
from langchain_text_splitters import RecursiveCharacterTextSplitter

# RecursiveCharacterTextSplitter tries to split on paragraphs, then
# sentences, then words — preserving semantic units as much as possible.
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,    # Max characters per chunk
    chunk_overlap=200,  # Overlap to retain context across chunks
    separators=[        # Priority order of split boundaries
        "\n\n",         # Paragraph breaks first
        "\n",           # Line breaks second
        ". ",           # Sentence boundaries third
        " ",            # Word boundaries last resort
    ],
    length_function=len,
)

# Split all loaded documents — metadata is preserved on each chunk
chunks = text_splitter.split_documents(all_docs)
print(f"Split into {len(chunks)} chunks")

# Inspect a chunk
print(f"Chunk 0 length: {len(chunks[0].page_content)} chars")
print(f"Chunk 0 metadata: {chunks[0].metadata}")
```
| Parameter | Guidance | Typical Range |
|---|---|---|
| chunk_size | Larger chunks retain more context but reduce retrieval precision | 500 – 2000 chars |
| chunk_overlap | Overlap prevents information loss at chunk boundaries | 10–20% of chunk_size |
| separators | Tune to your doc format (e.g., add "## " for Markdown headers) | Paragraphs → sentences → words |
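To make the chunk_size/chunk_overlap arithmetic concrete, here is a deliberately naive fixed-window chunker. It is illustrative only: RecursiveCharacterTextSplitter additionally respects the separator hierarchy, but the window and overlap math is the same idea.

```python
def chunk_text(text: str, chunk_size: int, chunk_overlap: int) -> list[str]:
    """Naive fixed-window chunker: each chunk starts
    (chunk_size - chunk_overlap) characters after the previous one.
    Assumes chunk_overlap < chunk_size."""
    step = chunk_size - chunk_overlap
    return [text[i:i + chunk_size] for i in range(0, len(text), step)]


sample = "abcdefghij" * 5  # 50 characters
chunks = chunk_text(sample, chunk_size=20, chunk_overlap=5)

# Windows start at 0, 15, 30, 45 → four chunks; each neighboring pair
# shares a 5-character overlap at the boundary.
print(len(chunks))                      # → 4
print(chunks[0][-5:] == chunks[1][:5])  # → True
```

With chunk_size=1000 and chunk_overlap=200 (the values above), each window advances 800 characters, so a sentence straddling a boundary still appears whole in at least one chunk.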
2.3 — Embeddings
Embeddings convert text chunks into dense vectors for similarity search. Choose the embedding model that matches your vector store and retrieval needs.
```python
import os
from dotenv import load_dotenv

load_dotenv()

# ── Option A: Azure OpenAI Embeddings ─────────────────────────────
from langchain_openai import AzureOpenAIEmbeddings

azure_embeddings = AzureOpenAIEmbeddings(
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    azure_deployment=os.environ["AZURE_OPENAI_EMBEDDINGS_DEPLOYMENT"],
    api_version=os.environ["AZURE_OPENAI_API_VERSION"],
)

# ── Option B: Google Vertex AI Embeddings ─────────────────────────
from langchain_google_vertexai import VertexAIEmbeddings

vertex_embeddings = VertexAIEmbeddings(
    model_name="text-embedding-005",
    project="your-gcp-project-id",
    location="us-central1",
)

# ── Option C: Open-source / local (e.g., sentence-transformers) ───
from langchain_community.embeddings import HuggingFaceEmbeddings

local_embeddings = HuggingFaceEmbeddings(
    model_name="sentence-transformers/all-MiniLM-L6-v2",
    model_kwargs={"device": "cpu"},  # Use "cuda" for GPU
)

# Pick one for the rest of this module
embeddings = azure_embeddings
```
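Whichever provider you pick, "similarity search" over the resulting vectors usually means cosine similarity. A minimal sketch of the math, with toy 3-dimensional vectors (real models emit hundreds to thousands of dimensions):

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine similarity: dot product divided by the product of norms.
    1.0 means identical direction, 0.0 means orthogonal (unrelated)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)


# Toy "embeddings": the query points roughly the same way as doc_close
query = [0.9, 0.1, 0.0]
doc_close = [0.8, 0.2, 0.1]
doc_far = [0.0, 0.1, 0.9]

print(cosine_similarity(query, doc_close) > cosine_similarity(query, doc_far))  # → True
```

The vector store precomputes document vectors at index time, so a query costs one embedding call plus a nearest-neighbor lookup.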
2.4 — Vector Store Initialization
The vector store indexes your embedded chunks for fast similarity retrieval. Chroma is shown here for local development; swap in Pinecone, Weaviate, pgvector, or Azure AI Search for production.
```python
from langchain_chroma import Chroma

# Create a persistent Chroma vector store from the chunks
# This embeds all chunks and stores them on disk at ./chroma_db
vector_store = Chroma.from_documents(
    documents=chunks,
    embedding=embeddings,
    persist_directory="./chroma_db",    # Persistent storage path
    collection_name="enterprise_docs",  # Logical namespace
)
print(f"Indexed {vector_store._collection.count()} chunks in Chroma")

# To reload an existing store (no re-embedding needed)
vector_store = Chroma(
    persist_directory="./chroma_db",
    embedding_function=embeddings,
    collection_name="enterprise_docs",
)
```
Tip: For production, use langchain-pinecone for Pinecone, langchain-postgres with pgvector for PostgreSQL, or langchain-community's AzureSearch for Azure AI Search (shown in Module 1). Chroma is ideal for local prototyping and tests.

2.5 — Standard RAG Retrieval Chain (LCEL)
This is the canonical RAG pattern: retrieve relevant chunks, inject them into a prompt, and generate a grounded answer. The entire pipeline is a single LCEL expression.
```python
from langchain_openai import AzureChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

# 1. Create a retriever from the vector store
retriever = vector_store.as_retriever(
    search_type="similarity",  # or "mmr" for max marginal relevance
    search_kwargs={"k": 5},    # Return top-5 most relevant chunks
)

# 2. LLM — reuse the Azure OpenAI model from Module 1
llm = AzureChatOpenAI(
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    azure_deployment=os.environ["AZURE_OPENAI_DEPLOYMENT"],
    api_version=os.environ["AZURE_OPENAI_API_VERSION"],
    temperature=0.0,
)

# 3. RAG prompt — instructs the model to answer from retrieved context only
rag_prompt = ChatPromptTemplate.from_messages([
    ("system", """You are a helpful assistant. Answer the user's question
using ONLY the provided context. If the answer is not in the context,
say "I don't have enough information to answer that."

Context:
{context}"""),
    ("human", "{question}"),
])

# 4. Helper to format retrieved Document objects into a single string
def format_docs(docs: list) -> str:
    """Join document contents with source metadata for traceability."""
    return "\n\n---\n\n".join(
        f"[Source: {d.metadata.get('source', 'unknown')}]\n{d.page_content}"
        for d in docs
    )

# 5. LCEL RAG chain
#    - "context" branch: query → retriever → format_docs
#    - "question" branch: query passes through unchanged
#    Both feed into the prompt → LLM → string parser
rag_chain = (
    {
        "context": retriever | format_docs,
        "question": RunnablePassthrough(),
    }
    | rag_prompt
    | llm
    | StrOutputParser()
)

# 6. Invoke the chain
answer: str = rag_chain.invoke("What are the key architectural decisions in our system?")
print(answer)
```
Tip: To stream responses, replace .invoke() with .stream() to get token-by-token output: for chunk in rag_chain.stream("question"): print(chunk, end=""). Works out of the box with LCEL — no code changes needed to the chain itself.

Full Pipeline — End-to-End Script
Here's every step combined in a single, copy-pasteable script:
""" Complete RAG pipeline — load, split, embed, store, retrieve, generate. Run: pip install langchain langchain-openai langchain-chroma langchain-community """ import os from dotenv import load_dotenv from langchain_community.document_loaders import PyPDFLoader from langchain_text_splitters import RecursiveCharacterTextSplitter from langchain_openai import AzureChatOpenAI, AzureOpenAIEmbeddings from langchain_chroma import Chroma from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import StrOutputParser from langchain_core.runnables import RunnablePassthrough load_dotenv() # ── Load ─────────────────────────────────────────────────────────── docs = PyPDFLoader("docs/architecture-guide.pdf").load() # ── Split ────────────────────────────────────────────────────────── splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200) chunks = splitter.split_documents(docs) # ── Embed + Store ───────────────────────────────────────────────── embeddings = AzureOpenAIEmbeddings( azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"], azure_deployment=os.environ["AZURE_OPENAI_EMBEDDINGS_DEPLOYMENT"], api_version=os.environ["AZURE_OPENAI_API_VERSION"], ) store = Chroma.from_documents(chunks, embeddings, persist_directory="./chroma_db") # ── Retrieve + Generate ─────────────────────────────────────────── retriever = store.as_retriever(search_kwargs={"k": 5}) llm = AzureChatOpenAI( azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"], azure_deployment=os.environ["AZURE_OPENAI_DEPLOYMENT"], api_version=os.environ["AZURE_OPENAI_API_VERSION"], temperature=0.0, ) prompt = ChatPromptTemplate.from_messages([ ("system", "Answer using ONLY this context:\n{context}"), ("human", "{question}"), ]) def format_docs(docs): return "\n\n".join(d.page_content for d in docs) chain = ( {"context": retriever | format_docs, "question": RunnablePassthrough()} | prompt | llm | StrOutputParser() ) print(chain.invoke("Summarize the key architectural 
decisions."))
2.6 — Advanced: Retrieval with Metadata Filtering
In production, you rarely want to search the entire corpus. Metadata filters let you scope retrieval to specific departments, document types, date ranges, or access levels — critical for multi-tenant and compliance-sensitive systems.
Use when: You need to restrict results to a specific department, document type, date range, or tenant ID — common in enterprise multi-tenant RAG systems.
Adding Rich Metadata During Ingestion
```python
from langchain_core.documents import Document

# Enrich chunks with structured metadata during ingestion.
# This metadata enables precise filtered retrieval later.
enriched_chunks: list[Document] = []
for chunk in chunks:
    # Preserve original metadata and add business context
    chunk.metadata.update({
        "department": "engineering",  # Business unit tag
        "doc_type": "architecture",   # Document classification
        "year": 2025,                 # Temporal filter
        "access_level": "internal",   # Access control tier
    })
    enriched_chunks.append(chunk)

# Re-index with enriched metadata
filtered_store = Chroma.from_documents(
    documents=enriched_chunks,
    embedding=embeddings,
    persist_directory="./chroma_filtered_db",
    collection_name="enterprise_docs_filtered",
)
```
Retrieving with Metadata Filters
```python
# ── Filter 1: Single field — only "engineering" department docs ───
eng_retriever = filtered_store.as_retriever(
    search_type="similarity",
    search_kwargs={
        "k": 5,
        "filter": {"department": "engineering"},
    },
)

# ── Filter 2: Multiple fields (AND logic) ─────────────────────────
# Recent Chroma versions require an explicit $and for multi-field filters
scoped_retriever = filtered_store.as_retriever(
    search_type="similarity",
    search_kwargs={
        "k": 5,
        "filter": {
            "$and": [
                {"department": "engineering"},
                {"year": 2025},
                {"access_level": "internal"},
            ]
        },
    },
)

# ── Filter 3: Chroma advanced operators ($gt, $in, $and, $or) ─────
advanced_retriever = filtered_store.as_retriever(
    search_type="similarity",
    search_kwargs={
        "k": 5,
        "filter": {
            "$and": [
                {"department": {"$in": ["engineering", "product"]}},
                {"year": {"$gte": 2024}},
            ]
        },
    },
)
```
Filtered RAG Chain — Complete Example
```python
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

# Build a RAG chain that only retrieves from engineering docs since 2024
filtered_rag_chain = (
    {
        "context": advanced_retriever | format_docs,
        "question": RunnablePassthrough(),
    }
    | rag_prompt
    | llm
    | StrOutputParser()
)

# This query will only search engineering + product docs from 2024+
answer = filtered_rag_chain.invoke(
    "What microservices patterns did we adopt in the latest architecture review?"
)
print(answer)
```
Dynamic Filters — User-Driven Scoping
In real apps, filters come from the user's session (role, department, tenant). Here's a pattern using RunnableLambda to inject dynamic filters at runtime:
```python
from langchain_core.runnables import RunnableLambda

def get_filtered_retriever(input_dict: dict) -> str:
    """Create a retriever scoped to the user's department and access level."""
    user_filters = {
        # Explicit $and for multi-field filters (required by recent Chroma)
        "$and": [
            {"department": input_dict["user_department"]},
            {"access_level": input_dict["user_access_level"]},
        ]
    }
    retriever = filtered_store.as_retriever(
        search_kwargs={"k": 5, "filter": user_filters}
    )
    # Run the retriever with the user's actual question
    docs = retriever.invoke(input_dict["question"])
    return format_docs(docs)

# Dynamic RAG chain — filters are resolved at invocation time
dynamic_rag_chain = (
    {
        "context": RunnableLambda(get_filtered_retriever),
        "question": lambda x: x["question"],
    }
    | rag_prompt
    | llm
    | StrOutputParser()
)

# Invoke with user context — retrieval is automatically scoped
answer = dynamic_rag_chain.invoke({
    "question": "What's the deployment process for our API gateway?",
    "user_department": "engineering",
    "user_access_level": "internal",
})
print(answer)
```
Note: The filter dict syntax shown above is Chroma-specific. Pinecone uses a similar syntax but with its own operators. pgvector/PostgreSQL uses SQL-style filters. Azure AI Search uses OData filter expressions. Always consult your vector store's docs for the exact filter grammar.

Retrieval Strategy Comparison
| Strategy | When to Use | LangChain Config |
|---|---|---|
| Similarity | Default — nearest neighbors by cosine distance | search_type="similarity" |
| MMR | When you need diverse results (avoid near-duplicates) | search_type="mmr", fetch_k=20, lambda_mult=0.5 |
| Similarity + Score | When you need a relevance threshold (discard low-quality matches) | search_type="similarity_score_threshold", score_threshold=0.7 |
| Filtered | Multi-tenant, access-controlled, or time-bounded retrieval | search_kwargs={"filter": {...}} |
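For intuition on the MMR row: maximal marginal relevance greedily picks the document maximizing lambda_mult * sim(query, doc) - (1 - lambda_mult) * max(sim(doc, s) for already-selected s), trading relevance against redundancy. A minimal sketch with toy 2-D vectors (illustrative only; the vector store runs this over real embeddings from its fetch_k candidates):

```python
import math


def cos(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(x * x for x in b)))


def mmr(query, candidates, k=2, lambda_mult=0.5) -> list[int]:
    """Greedy MMR: lambda_mult=1.0 is pure similarity ranking,
    lower values penalize documents similar to ones already picked."""
    selected: list[int] = []
    while len(selected) < min(k, len(candidates)):
        best_idx, best_score = -1, -float("inf")
        for i, doc in enumerate(candidates):
            if i in selected:
                continue
            relevance = cos(query, doc)
            redundancy = max((cos(doc, candidates[j]) for j in selected), default=0.0)
            score = lambda_mult * relevance - (1 - lambda_mult) * redundancy
            if score > best_score:
                best_idx, best_score = i, score
        selected.append(best_idx)
    return selected


query = [1.0, 0.0]
docs = [[0.99, 0.1], [0.98, 0.12], [0.6, 0.8]]  # first two are near-duplicates

print(mmr(query, docs, k=2, lambda_mult=1.0))  # → [0, 1]  (pure similarity)
print(mmr(query, docs, k=2, lambda_mult=0.3))  # → [0, 2]  (diversity wins)
```

This is why the table pairs `search_type="mmr"` with `lambda_mult`: lowering it swaps near-duplicate hits for more diverse context.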
Module 3 — Memory Management
LLMs are stateless — every call starts from scratch. Memory gives your chains conversational continuity. LangChain provides two tiers: short-term (within a session) and long-term (across sessions, persisted to a database). Both integrate cleanly with LCEL via RunnableWithMessageHistory.
3.1 — Short-Term Memory (In-Session Conversation History)
Short-term memory tracks the current conversation using a session_id. The RunnableWithMessageHistory wrapper automatically loads past messages before each call and saves new ones after. For in-memory (non-persistent) use cases, ChatMessageHistory is the simplest store.
Core Concept: RunnableWithMessageHistory
```python
import os

from dotenv import load_dotenv
from langchain_openai import AzureChatOpenAI
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.output_parsers import StrOutputParser
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_community.chat_message_histories import ChatMessageHistory

load_dotenv()

# 1. LLM setup
llm = AzureChatOpenAI(
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    azure_deployment=os.environ["AZURE_OPENAI_DEPLOYMENT"],
    api_version=os.environ["AZURE_OPENAI_API_VERSION"],
    temperature=0.3,
)

# 2. Prompt with a placeholder for injected chat history
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant. Use the conversation history to maintain context."),
    MessagesPlaceholder(variable_name="history"),  # ← history is injected here
    ("human", "{input}"),
])

# 3. LCEL chain — no memory awareness needed at the chain level
chain = prompt | llm | StrOutputParser()

# 4. In-memory session store — maps session_id → ChatMessageHistory
#    In production, replace this dict with a database-backed store.
session_store: dict[str, ChatMessageHistory] = {}

def get_session_history(session_id: str) -> BaseChatMessageHistory:
    """Return or create the message history for a given session."""
    if session_id not in session_store:
        session_store[session_id] = ChatMessageHistory()
    return session_store[session_id]

# 5. Wrap the chain with message history management
chain_with_history = RunnableWithMessageHistory(
    chain,
    get_session_history,
    input_messages_key="input",      # Key in the input dict for user message
    history_messages_key="history",  # Matches the MessagesPlaceholder name
)

# 6. Invoke with a session_id — history is auto-managed
config = {"configurable": {"session_id": "user-123-session-abc"}}

# Turn 1
r1 = chain_with_history.invoke(
    {"input": "My name is Alice and I work in the data team."}, config=config
)
print(f"Turn 1: {r1}")

# Turn 2 — the model remembers Alice's name and team
r2 = chain_with_history.invoke(
    {"input": "What team did I say I work in?"}, config=config
)
print(f"Turn 2: {r2}")  # → "You said you work in the data team."

# Turn 3 — different session = fresh history
config_new = {"configurable": {"session_id": "user-456-session-xyz"}}
r3 = chain_with_history.invoke({"input": "Do you know my name?"}, config=config_new)
print(f"Turn 3 (new session): {r3}")  # → "I don't know your name yet."
```
Tip: Streaming works out of the box: chain_with_history.stream({"input": "..."}, config=config) loads and saves history while tokens stream to the client.

How It Works Under the Hood
| Step | What Happens |
|---|---|
| 1. Before invoke | get_session_history(session_id) retrieves the history object |
| 2. Inject | Past messages are injected into the MessagesPlaceholder("history") slot |
| 3. LLM call | The chain runs with the full conversation context |
| 4. After invoke | The user input and AI response are appended to the history for next turn |
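The four steps above can be sketched in plain Python. This is a toy stand-in to show the load/inject/call/save cycle; names like ToyHistory and with_history are illustrative, not LangChain APIs, and the real wrapper does this generically for any runnable.

```python
class ToyHistory:
    """Minimal message store: a list of (role, content) tuples."""

    def __init__(self):
        self.messages: list[tuple[str, str]] = []


session_store: dict[str, ToyHistory] = {}


def get_session_history(session_id: str) -> ToyHistory:
    return session_store.setdefault(session_id, ToyHistory())


def with_history(chain, session_id: str, user_input: str) -> str:
    history = get_session_history(session_id)              # 1. Before invoke
    messages = history.messages + [("human", user_input)]  # 2. Inject
    ai_output = chain(messages)                            # 3. LLM call
    history.messages += [("human", user_input),            # 4. After invoke
                         ("ai", ai_output)]
    return ai_output


# Fake "chain" that just reports how much context it received
fake_chain = lambda msgs: f"seen {len(msgs)} message(s)"

print(with_history(fake_chain, "s1", "hello"))  # → seen 1 message(s)
print(with_history(fake_chain, "s1", "again"))  # → seen 3 message(s)
```

The second call sees three messages (two saved from turn one plus the new input), which is exactly why the real wrapper gives the model conversational continuity.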
Trimming History (Token Budget Management)
Unbounded history will eventually exceed the model's context window. Use trim_messages to keep the history within a token budget:
```python
from operator import itemgetter

from langchain_core.messages import trim_messages
from langchain_core.runnables import RunnablePassthrough

# Trim strategy: keep the system message + last N tokens worth of history
trimmer = trim_messages(
    max_tokens=4000,      # Max tokens to keep in history
    strategy="last",      # Keep the most recent messages
    token_counter=llm,    # Use the LLM's tokenizer for accuracy
    include_system=True,  # Always preserve the system message
    allow_partial=False,  # Don't split messages mid-content
)

# The chain's input is a dict, so apply the trimmer to the "history" key
# only — it runs BEFORE the prompt renders the message list.
chain_with_trimming = (
    RunnablePassthrough.assign(history=itemgetter("history") | trimmer)
    | prompt
    | llm
    | StrOutputParser()
)

# Wrap with history as before
trimmed_chain_with_history = RunnableWithMessageHistory(
    chain_with_trimming,
    get_session_history,
    input_messages_key="input",
    history_messages_key="history",
)
```
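For intuition, the "last" strategy with include_system=True and allow_partial=False behaves roughly like this sketch. A crude whitespace word count stands in for the model tokenizer that trim_messages actually uses; trim_last is an illustrative name, not a LangChain API.

```python
def trim_last(messages: list[tuple[str, str]], max_tokens: int,
              include_system: bool = True) -> list[tuple[str, str]]:
    """Keep the system message, then the most recent messages that fit
    the budget, dropping whole messages from the oldest end first."""
    count = lambda m: len(m[1].split())  # crude stand-in for a tokenizer
    system = [m for m in messages if m[0] == "system"][:1] if include_system else []
    budget = max_tokens - sum(count(m) for m in system)
    kept: list[tuple[str, str]] = []
    for msg in reversed([m for m in messages if m not in system]):
        if count(msg) > budget:
            break  # allow_partial=False: never split a message mid-content
        kept.insert(0, msg)
        budget -= count(msg)
    return system + kept


history = [
    ("system", "You are terse."),      # 3 "tokens", always kept
    ("human", "one two three four"),   # 4 "tokens", oldest → dropped first
    ("ai", "five six"),                # 2 "tokens"
    ("human", "seven"),                # 1 "token"
]
# Budget of 6 keeps the system message plus the newest turns that fit
print(trim_last(history, max_tokens=6))
```

Dropping whole messages from the oldest end keeps recent turns intact, which matters far more for answer quality than stale early context.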
3.2 — Long-Term Memory (Persistent Across Sessions)
Long-term memory persists conversation history to external databases so it survives server restarts, session timeouts, and re-deployments. LangChain provides drop-in history backends for Redis, PostgreSQL, and more. Simply swap the get_session_history function — the rest of the chain stays identical.
Best for: Fast read/write, session-oriented apps, microservices with shared state. Redis TTL auto-expires old sessions.
Option A: Redis
```python
# pip install langchain-redis redis
from langchain_redis import RedisChatMessageHistory
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.runnables.history import RunnableWithMessageHistory

# Redis connection URL — use rediss:// for TLS in production
REDIS_URL = "redis://localhost:6379/0"

def get_redis_history(session_id: str) -> BaseChatMessageHistory:
    """Persistent history backed by Redis. Messages survive restarts."""
    return RedisChatMessageHistory(
        session_id=session_id,
        redis_url=REDIS_URL,
        ttl=86400,  # Auto-expire after 24 hours (optional)
    )

# Wrap your existing LCEL chain — zero changes to the chain itself
redis_chain = RunnableWithMessageHistory(
    chain,  # Same chain from 3.1
    get_redis_history,
    input_messages_key="input",
    history_messages_key="history",
)

# Session persists across app restarts — Redis stores the messages
config = {"configurable": {"session_id": "user-123-persistent"}}
response = redis_chain.invoke(
    {"input": "Remember: our API rate limit is 1000 req/min."}, config=config
)
print(response)

# Later (even after a restart) the model still knows the rate limit
response = redis_chain.invoke({"input": "What's our API rate limit?"}, config=config)
print(response)  # → "Your API rate limit is 1000 requests per minute."
```
Best for: Apps already using PostgreSQL, compliance requirements for durable storage, complex queries on history.
Option B: PostgreSQL
```python
# pip install langchain-postgres psycopg[binary]
import psycopg
from langchain_postgres import PostgresChatMessageHistory
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.runnables.history import RunnableWithMessageHistory

# Connection string — use a secrets manager in production, never hardcode
PG_CONN = "postgresql://user:password@localhost:5432/chatdb"

# Create the messages table (run once at startup)
PostgresChatMessageHistory.create_tables(psycopg.connect(PG_CONN), "chat_history")

def get_pg_history(session_id: str) -> BaseChatMessageHistory:
    """Persistent history stored in a PostgreSQL table."""
    return PostgresChatMessageHistory(
        "chat_history",  # table name and session_id are positional in this API
        session_id,
        sync_connection=psycopg.connect(PG_CONN),
    )

# Same pattern — swap the history factory, chain stays unchanged
pg_chain = RunnableWithMessageHistory(
    chain,
    get_pg_history,
    input_messages_key="input",
    history_messages_key="history",
)

config = {"configurable": {"session_id": "user-789-durable"}}
response = pg_chain.invoke({"input": "I prefer Python over JavaScript."}, config=config)
print(response)
```
Production tip: use a connection pool (`psycopg_pool.ConnectionPool`) instead of creating a new connection per request. The snippet above uses `psycopg.connect()` for clarity.

Best for: Recalling relevant past conversations by meaning, not just replaying the last N turns. Ideal for personalization and user preference recall across many sessions.
Option C: Vector Store Memory (Semantic Long-Term Recall)
Instead of replaying raw message history, this pattern embeds past interactions and retrieves the most relevant ones via similarity search. This is powerful for agents that need to recall facts from hundreds of past sessions.
```python
import os
from datetime import datetime

from langchain_openai import AzureChatOpenAI, AzureOpenAIEmbeddings
from langchain_chroma import Chroma
from langchain_core.documents import Document
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableLambda

embeddings = AzureOpenAIEmbeddings(
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    azure_deployment=os.environ["AZURE_OPENAI_EMBEDDINGS_DEPLOYMENT"],
    api_version=os.environ["AZURE_OPENAI_API_VERSION"],
)

# A dedicated vector store for long-term memory (separate from RAG docs)
memory_store = Chroma(
    collection_name="long_term_memory",
    embedding_function=embeddings,
    persist_directory="./memory_db",
)

# ── Save a memory ─────────────────────────────────────────────────
def save_memory(user_id: str, user_msg: str, ai_msg: str) -> None:
    """Embed and store a conversation turn for long-term recall."""
    memory_doc = Document(
        page_content=f"User: {user_msg}\nAssistant: {ai_msg}",
        metadata={
            "user_id": user_id,
            "timestamp": datetime.now().isoformat(),
        },
    )
    memory_store.add_documents([memory_doc])

# ── Recall relevant memories ──────────────────────────────────────
def recall_memories(input_dict: dict) -> str:
    """Retrieve past interactions semantically relevant to the current query."""
    retriever = memory_store.as_retriever(
        search_kwargs={
            "k": 3,
            "filter": {"user_id": input_dict["user_id"]},
        }
    )
    docs = retriever.invoke(input_dict["input"])
    if not docs:
        return "No relevant past interactions found."
    return "\n---\n".join(d.page_content for d in docs)

# ── Chain with semantic memory ────────────────────────────────────
llm = AzureChatOpenAI(
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    azure_deployment=os.environ["AZURE_OPENAI_DEPLOYMENT"],
    api_version=os.environ["AZURE_OPENAI_API_VERSION"],
    temperature=0.3,
)

memory_prompt = ChatPromptTemplate.from_messages([
    ("system", """You are a personalized assistant with long-term memory.
Use these relevant past interactions to personalize your response:

{memories}"""),
    ("human", "{input}"),
])

memory_chain = (
    {
        "memories": RunnableLambda(recall_memories),
        "input": lambda x: x["input"],
    }
    | memory_prompt
    | llm
    | StrOutputParser()
)

# ── Usage: Save some memories, then query ─────────────────────────
save_memory(
    "alice",
    "I prefer dark mode in all my tools.",
    "Noted! I'll remember your preference for dark mode.",
)
save_memory(
    "alice",
    "Our tech stack is FastAPI + React + PostgreSQL.",
    "Got it — FastAPI backend, React frontend, PostgreSQL database.",
)

# Later — even in a new session — the model recalls relevant past context
result = memory_chain.invoke({
    "user_id": "alice",
    "input": "Can you suggest a deployment setup for our stack?",
})
print(result)  # → References FastAPI + React + PostgreSQL from memory
```
Memory Strategy Comparison
| Strategy | Scope | Backend | Best For |
|---|---|---|---|
| ChatMessageHistory (in-memory) | Single process, lost on restart | Python dict | Prototyping, tests |
| Redis | Cross-process, TTL expirable | Redis server | Session-oriented apps, microservices |
| PostgreSQL | Durable, queryable via SQL | PostgreSQL | Compliance, audit trails, existing PG infra |
| Vector Store | Semantic cross-session recall | Chroma, Pinecone, pgvector | Personalization, long-running agents, user preference recall |
Module 4 — Tools & Model Context Protocol
Tools give LLMs the ability to take actions — call APIs, query databases, run calculations, or interact with external systems. LangChain provides a clean abstraction: define a tool, bind it to a model, and let the model decide when and how to invoke it. The Model Context Protocol (MCP) extends this by standardizing how tools and context are exposed across platforms.
4.1 — Adding Tools
LangChain offers two primary ways to create tools: the @tool decorator for simple functions and StructuredTool for more control over schema and validation.
Method 1: @tool Decorator (Simple & Fast)
```python
from langchain_core.tools import tool

@tool
def get_weather(city: str) -> str:
    """Get the current weather for a given city.

    Args:
        city: The name of the city to look up weather for.
    """
    # In production, call a real weather API here
    weather_data = {
        "London": "15°C, cloudy",
        "New York": "22°C, sunny",
        "Tokyo": "18°C, light rain",
    }
    return weather_data.get(city, f"Weather data not available for {city}")

@tool
def calculate(expression: str) -> str:
    """Evaluate a mathematical expression safely.

    Args:
        expression: A mathematical expression to evaluate, e.g. '2 + 2 * 3'.
    """
    # Guard the input — never pass untrusted strings to eval() unfiltered
    allowed_chars = set("0123456789+-*/.() ")
    if not all(c in allowed_chars for c in expression):
        return "Error: expression contains invalid characters."
    try:
        result = round(float(eval(expression, {"__builtins__": {}}, {})), 6)
        return str(result)
    except Exception as e:
        return f"Error: {e}"

# Inspect the tool schema — this is what the LLM sees
print(get_weather.name)         # → "get_weather"
print(get_weather.description)  # → "Get the current weather for a given city."
print(get_weather.args_schema.model_json_schema())  # → JSON schema for args
```
The `@tool` decorator uses the function's docstring as the tool description the LLM sees. Write clear, specific descriptions — they directly affect how reliably the model selects and invokes your tool.

Method 2: StructuredTool (Full Control)
Use StructuredTool when you need a Pydantic model for input validation, custom error handling, or when the tool wraps an existing function you don't want to decorate.
```python
from langchain_core.tools import StructuredTool
from pydantic import BaseModel, Field

# Define a Pydantic schema for rich input validation
class SearchInput(BaseModel):
    """Input schema for the enterprise search tool."""
    query: str = Field(description="The search query string")
    department: str = Field(
        default="all",
        description="Filter results to a specific department",
    )
    max_results: int = Field(
        default=5,
        ge=1,
        le=20,
        description="Maximum number of results to return (1-20)",
    )

# The actual implementation function
def search_enterprise_docs(query: str, department: str = "all", max_results: int = 5) -> str:
    """Search internal documents. In production, call your search API here."""
    return f"Found {max_results} results for '{query}' in {department} department."

# Create the tool with explicit schema, name, and description
search_tool = StructuredTool.from_function(
    func=search_enterprise_docs,
    name="search_enterprise_docs",
    description=(
        "Search the company's internal document repository. Use this when the "
        "user asks about internal policies, procedures, or company information."
    ),
    args_schema=SearchInput,
    return_direct=False,  # False = LLM can process the result before responding
)

# Test it directly
print(search_tool.invoke({"query": "vacation policy", "department": "hr"}))
```
Async Tools
For I/O-bound operations (API calls, database queries), define async tools to avoid blocking:
```python
import httpx
from langchain_core.tools import tool

@tool
async def fetch_stock_price(symbol: str) -> str:
    """Fetch the current stock price for a given ticker symbol.

    Args:
        symbol: Stock ticker symbol, e.g. 'AAPL', 'GOOGL'.
    """
    async with httpx.AsyncClient() as client:
        # Example API call — replace with your actual data provider
        resp = await client.get(
            f"https://api.example.com/stock/{symbol}",
            timeout=10.0,
        )
        resp.raise_for_status()
        data = resp.json()
        return f"{symbol}: ${data['price']:.2f}"
```
4.2 — Tool Binding
Defining tools is only half the story. You must bind them to a chat model so it knows they exist and can decide when to call them. LangChain's .bind_tools() method attaches tool schemas to the model. The model then returns tool_calls in its response when it decides a tool should be invoked.
Binding Tools to a Model
```python
import os
from dotenv import load_dotenv
from langchain_openai import AzureChatOpenAI

load_dotenv()

llm = AzureChatOpenAI(
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    azure_deployment=os.environ["AZURE_OPENAI_DEPLOYMENT"],
    api_version=os.environ["AZURE_OPENAI_API_VERSION"],
    temperature=0.0,
)

# Bind tools — the model now knows about get_weather, calculate, search_tool
tools = [get_weather, calculate, search_tool]
llm_with_tools = llm.bind_tools(tools)

# Ask a question that should trigger a tool call
response = llm_with_tools.invoke("What's the weather in London?")

# The model doesn't call the tool itself — it returns a tool_call request
print(response.content)     # → "" (empty when tool call is made)
print(response.tool_calls)  # → [{'name': 'get_weather', 'args': {'city': 'London'}, 'id': '...'}]
```
`bind_tools()` doesn't execute tools. The model returns a `tool_calls` list describing which tool to call and with what arguments. Your code (or an AgentExecutor) is responsible for actually executing the tool and feeding the result back. This is the standard "tool calling" protocol supported by OpenAI, Azure, Gemini, and Anthropic models.

Complete Tool Calling Loop (Manual)
Here's the full loop: invoke the model → detect tool calls → execute tools → feed results back → get final answer.
```python
from langchain_core.messages import HumanMessage, ToolMessage

# Step 1: User asks a question
messages = [HumanMessage(content="What's 15% of 340, and what's the weather in Tokyo?")]

# Step 2: Model decides which tools to call (may call multiple)
ai_response = llm_with_tools.invoke(messages)
messages.append(ai_response)  # Add AI response to conversation

# Step 3: Execute each tool call and collect results
tool_map = {t.name: t for t in tools}

for tool_call in ai_response.tool_calls:
    tool_fn = tool_map[tool_call["name"]]
    result = tool_fn.invoke(tool_call["args"])
    # ToolMessage carries the result back, linked by tool_call_id
    messages.append(ToolMessage(
        content=str(result),
        tool_call_id=tool_call["id"],
    ))

# Step 4: Model synthesizes tool results into a natural language answer
final_response = llm_with_tools.invoke(messages)
print(final_response.content)
# → "15% of 340 is 51. The weather in Tokyo is 18°C with light rain."
```
Controlling Tool Choice

Use `tool_choice` to force the model to call a specific tool, or leave it free to decide:
```python
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant with access to tools."),
    ("human", "{input}"),
])

# Force the model to always call a specific tool
forced_weather = llm.bind_tools([get_weather], tool_choice="get_weather")

# Or let it decide freely (tool_choice="auto" is the default)
auto_tools = llm.bind_tools(tools, tool_choice="auto")
```
4.3 — Model Context Protocol (MCP)
The Model Context Protocol is an open standard (originally from Anthropic) that defines how LLMs discover and invoke tools, access resources, and receive context — regardless of the model provider or client. Think of it as USB-C for AI tools: one standard interface that works everywhere.
How it fits: The langchain-mcp-adapters package bridges MCP servers (which expose tools via the MCP protocol) with LangChain's tool abstraction. Any MCP server becomes a set of LangChain-compatible tools — usable with bind_tools(), agents, and LangGraph.
MCP Architecture
| Component | Role | Example |
|---|---|---|
| MCP Server | Exposes tools, resources, and prompts via the MCP protocol | A file system server, a database server, a GitHub server |
| MCP Client | Connects to servers, discovers tools, and routes invocations | `MultiServerMCPClient` from `langchain-mcp-adapters` |
| LangChain Bridge | Converts MCP tools → LangChain `BaseTool` instances | Automatic via adapter — tools appear native to LangChain |
| Transport | Communication channel between client and server | stdio (local process), sse (HTTP streaming) |
Connecting an MCP Server to LangChain (Stdio Transport)
```python
# pip install langchain-mcp-adapters
import asyncio
import os

from dotenv import load_dotenv
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_openai import AzureChatOpenAI
from langgraph.prebuilt import create_react_agent

load_dotenv()

async def main():
    # 1. Configure MCP server connections
    #    Each server is a separate process communicating via stdio
    async with MultiServerMCPClient(
        {
            "filesystem": {
                "command": "npx",
                "args": [
                    "-y",
                    "@modelcontextprotocol/server-filesystem",
                    "/path/to/allowed/directory",
                ],
                "transport": "stdio",
            },
            "github": {
                "command": "npx",
                "args": ["-y", "@modelcontextprotocol/server-github"],
                "transport": "stdio",
                "env": {
                    # The official GitHub MCP server reads this variable name
                    "GITHUB_PERSONAL_ACCESS_TOKEN": os.environ["GITHUB_TOKEN"],
                },
            },
        }
    ) as mcp_client:
        # 2. Get LangChain-compatible tools from ALL connected MCP servers
        mcp_tools = mcp_client.get_tools()
        print(f"Discovered {len(mcp_tools)} tools from MCP servers:")
        for t in mcp_tools:
            print(f"  - {t.name}: {t.description[:80]}")

        # 3. Create a LangGraph ReAct agent with MCP tools
        #    The agent can now read files, list directories, create GitHub issues, etc.
        llm = AzureChatOpenAI(
            azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
            azure_deployment=os.environ["AZURE_OPENAI_DEPLOYMENT"],
            api_version=os.environ["AZURE_OPENAI_API_VERSION"],
            temperature=0.0,
        )
        agent = create_react_agent(llm, mcp_tools)

        # 4. Invoke — the agent uses MCP tools as if they were native
        result = await agent.ainvoke({
            "messages": [("human", "List all Python files in the project and summarize README.md")],
        })
        print(result["messages"][-1].content)

asyncio.run(main())
```
Connecting via SSE (Remote MCP Server)
For remote MCP servers exposed over HTTP (e.g., deployed as a microservice), use the sse transport:
```python
async with MultiServerMCPClient(
    {
        "remote_tools": {
            "url": "http://mcp-server.internal:8080/sse",
            "transport": "sse",
        },
    }
) as mcp_client:
    tools = mcp_client.get_tools()
    # Use tools with any LangChain agent or chain...
```
Bridging a Custom MCP Server with LangChain
If you've built your own MCP server (e.g., exposing a proprietary database), here's the conceptual pattern for bridging it into a LangChain agent:
""" Conceptual architecture: Custom MCP Server → LangChain Agent 1. Your MCP server exposes tools via the MCP protocol: - list_customers(region: str) → customer records - get_order_status(order_id: str) → status - create_support_ticket(subject: str, body: str) → ticket ID 2. langchain-mcp-adapters discovers these tools at runtime 3. Tools are automatically converted to LangChain BaseTool instances 4. Any LangChain agent can invoke them — AgentExecutor, LangGraph, etc. """ import asyncio from langchain_mcp_adapters.client import MultiServerMCPClient from langgraph.prebuilt import create_react_agent from langchain_openai import AzureChatOpenAI async def run_custom_mcp_agent(): # Connect to your custom MCP server async with MultiServerMCPClient( { "crm": { "command": "python", "args": ["my_mcp_server.py"], # Your custom MCP server "transport": "stdio", }, } ) as client: crm_tools = client.get_tools() # crm_tools now contains: # - list_customers (LangChain BaseTool) # - get_order_status (LangChain BaseTool) # - create_support_ticket (LangChain BaseTool) # Combine MCP tools with native LangChain tools all_tools = crm_tools + [get_weather, calculate] # Mix and match llm = AzureChatOpenAI( azure_endpoint="...", azure_deployment="gpt-4o", api_version="2024-12-01-preview", ) # Agent has access to both MCP and native tools seamlessly agent = create_react_agent(llm, all_tools) result = await agent.ainvoke({ "messages": [("human", "List all customers in EMEA and check order #12345 status")], }) print(result["messages"][-1].content) asyncio.run(run_custom_mcp_agent())
Security tip: use the `env` dict in the client config to scope secrets per server — never expose credentials globally.

MCP vs. Native LangChain Tools — When to Use What
| Aspect | Native LangChain Tools | MCP Tools |
|---|---|---|
| Definition | `@tool` decorator or `StructuredTool` in Python | MCP server (any language) exposes tools via protocol |
| Discovery | Manually registered in code | Auto-discovered at runtime from connected servers |
| Language | Python only | Any (TypeScript, Python, Go, Rust…) |
| Reusability | Within your LangChain codebase | Across any MCP-compatible client (Claude, Copilot, LangChain, etc.) |
| Best for | App-specific logic tightly coupled to your chain | Shared platform tools, org-wide services, polyglot environments |
| Setup overhead | Minimal — just Python functions | Requires an MCP server process |
Rule of thumb: use `@tool` for chain-specific logic (formatters, calculators, custom RAG helpers). Use MCP for organization-wide shared tools (database access, ticketing systems, file systems) that multiple AI clients need to use. The two approaches compose seamlessly — `all_tools = mcp_tools + native_tools`.

Module 5 — Middleware & Callbacks
Callbacks are LangChain's observability layer — hooks that fire at key lifecycle events (LLM start, tool invocation, chain error, token streamed, etc.). They power logging, tracing, streaming, cost tracking, and custom monitoring without modifying your chain logic. Think of them as middleware for your AI pipeline.
5.1 — Built-In Callbacks & Streaming
Token-Level Streaming
The most common use of callbacks — streaming tokens to the user as they're generated, giving instant feedback instead of waiting for the full response:
```python
import asyncio
import os

from dotenv import load_dotenv
from langchain_openai import AzureChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

load_dotenv()

llm = AzureChatOpenAI(
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    azure_deployment=os.environ["AZURE_OPENAI_DEPLOYMENT"],
    api_version=os.environ["AZURE_OPENAI_API_VERSION"],
    temperature=0.7,
    streaming=True,
)

prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a concise technical writer."),
    ("human", "{question}"),
])

chain = prompt | llm

# Method 1: .stream() — yields chunks as they arrive
for chunk in chain.stream({"question": "Explain LCEL in 3 sentences"}):
    print(chunk.content, end="", flush=True)

# Method 2: .astream() — async streaming (FastAPI, async apps)
async def stream_response():
    async for chunk in chain.astream({"question": "What is RAG?"}):
        print(chunk.content, end="", flush=True)

asyncio.run(stream_response())
```
Streaming Events (Full Lifecycle Visibility)
.astream_events() gives you fine-grained access to every event in the chain — useful for building progress indicators, debugging, or advanced UIs:
```python
from langchain_core.output_parsers import StrOutputParser

chain = prompt | llm | StrOutputParser()

async def observe_events():
    async for event in chain.astream_events(
        {"question": "What are LangChain callbacks?"},
        version="v2",
    ):
        kind = event["event"]
        if kind == "on_chat_model_stream":
            # Token-level streaming from the LLM
            token = event["data"]["chunk"].content
            print(token, end="", flush=True)
        elif kind == "on_chain_start":
            print(f"\n⏳ Chain started: {event['name']}")
        elif kind == "on_chain_end":
            print(f"\n✅ Chain finished: {event['name']}")
        elif kind == "on_tool_start":
            print(f"\n🔧 Tool invoked: {event['name']}")

asyncio.run(observe_events())
```
Key event types: `on_chain_start/end`, `on_chat_model_start/end/stream`, `on_llm_start/end/stream`, `on_tool_start/end`, `on_retriever_start/end`, `on_parser_start/end`. Each event includes `name`, `data`, `run_id`, and `parent_ids` for correlation.

Inline Callbacks with Handler List
You can pass callback handlers directly to any .invoke() or .stream() call via the config parameter:
```python
from langchain_core.callbacks import StdOutCallbackHandler

# StdOutCallbackHandler prints every lifecycle event to stdout
result = chain.invoke(
    {"question": "What is LCEL?"},
    config={"callbacks": [StdOutCallbackHandler()]},
)

# Multiple handlers compose — each receives all events
result = chain.invoke(
    {"question": "What is LCEL?"},
    config={"callbacks": [StdOutCallbackHandler(), my_custom_handler]},
)
```
5.2 — Custom Callback Handlers
For production systems, you'll want custom handlers that log to your observability stack, track costs, enforce guardrails, or measure latency. Extend BaseCallbackHandler (sync) or AsyncCallbackHandler (async) and override only the hooks you need.
Production Logging Handler
```python
import time
import logging
from typing import Any
from uuid import UUID

from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.messages import BaseMessage
from langchain_core.outputs import LLMResult

logger = logging.getLogger("langchain.telemetry")

class ProductionLoggingHandler(BaseCallbackHandler):
    """Logs LLM calls, latency, token usage, and errors to structured logging."""

    def __init__(self):
        self._start_times: dict[UUID, float] = {}

    def on_chat_model_start(
        self,
        serialized: dict[str, Any],
        messages: list[list[BaseMessage]],
        *,
        run_id: UUID,
        **kwargs: Any,
    ) -> None:
        self._start_times[run_id] = time.perf_counter()
        model = serialized.get("kwargs", {}).get("model_name", "unknown")
        logger.info(
            "LLM call started",
            extra={"run_id": str(run_id), "model": model},
        )

    def on_llm_end(
        self,
        response: LLMResult,
        *,
        run_id: UUID,
        **kwargs: Any,
    ) -> None:
        # Default to "now" if the start event was missed, so latency reads as 0
        start = self._start_times.pop(run_id, time.perf_counter())
        elapsed = time.perf_counter() - start
        usage = response.llm_output.get("token_usage", {}) if response.llm_output else {}
        logger.info(
            "LLM call completed",
            extra={
                "run_id": str(run_id),
                "latency_ms": round(elapsed * 1000, 2),
                "prompt_tokens": usage.get("prompt_tokens"),
                "completion_tokens": usage.get("completion_tokens"),
                "total_tokens": usage.get("total_tokens"),
            },
        )

    def on_llm_error(
        self,
        error: BaseException,
        *,
        run_id: UUID,
        **kwargs: Any,
    ) -> None:
        start = self._start_times.pop(run_id, time.perf_counter())
        elapsed = time.perf_counter() - start
        logger.error(
            "LLM call failed",
            extra={
                "run_id": str(run_id),
                "latency_ms": round(elapsed * 1000, 2),
                "error": str(error),
            },
        )
```
Cost Tracking Handler
```python
from uuid import UUID
from typing import Any

from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.outputs import LLMResult

# Pricing per 1K tokens (update with your Azure/OpenAI pricing)
PRICING = {
    "gpt-4o":      {"input": 0.0025,  "output": 0.01},
    "gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
    "gpt-4.1":     {"input": 0.002,   "output": 0.008},
}

class CostTracker(BaseCallbackHandler):
    """Track cumulative token usage and estimated cost across a session."""

    def __init__(self, model_name: str = "gpt-4o"):
        self.model_name = model_name
        self.total_prompt_tokens = 0
        self.total_completion_tokens = 0
        self.total_cost = 0.0
        self.call_count = 0

    def on_llm_end(self, response: LLMResult, *, run_id: UUID, **kwargs: Any) -> None:
        usage = response.llm_output.get("token_usage", {}) if response.llm_output else {}
        prompt_tokens = usage.get("prompt_tokens", 0)
        completion_tokens = usage.get("completion_tokens", 0)

        self.total_prompt_tokens += prompt_tokens
        self.total_completion_tokens += completion_tokens
        self.call_count += 1

        prices = PRICING.get(self.model_name, PRICING["gpt-4o"])
        cost = (prompt_tokens / 1000) * prices["input"] + \
               (completion_tokens / 1000) * prices["output"]
        self.total_cost += cost

    def report(self) -> str:
        return (
            f"📊 Session Report:\n"
            f"  Calls: {self.call_count}\n"
            f"  Prompt tokens: {self.total_prompt_tokens:,}\n"
            f"  Completion tokens: {self.total_completion_tokens:,}\n"
            f"  Estimated cost: ${self.total_cost:.4f}"
        )

# Usage
cost_tracker = CostTracker(model_name="gpt-4o")
log_handler = ProductionLoggingHandler()

result = chain.invoke(
    {"question": "Explain microservices"},
    config={"callbacks": [cost_tracker, log_handler]},
)
print(cost_tracker.report())
# 📊 Session Report:
#   Calls: 1
#   Prompt tokens: 28
#   Completion tokens: 142
#   Estimated cost: $0.0015
```
Async Callback Handler (for I/O-bound telemetry)
When your callback needs to make network calls (send metrics to Datadog, post to a webhook), use AsyncCallbackHandler to avoid blocking the chain:
```python
import httpx
from uuid import UUID
from typing import Any

from langchain_core.callbacks import AsyncCallbackHandler
from langchain_core.outputs import LLMResult

class WebhookNotifier(AsyncCallbackHandler):
    """Send LLM completion events to a webhook endpoint asynchronously."""

    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url

    async def on_llm_end(
        self,
        response: LLMResult,
        *,
        run_id: UUID,
        **kwargs: Any,
    ) -> None:
        usage = response.llm_output.get("token_usage", {}) if response.llm_output else {}
        async with httpx.AsyncClient() as client:
            await client.post(
                self.webhook_url,
                json={
                    "event": "llm_completion",
                    "run_id": str(run_id),
                    "tokens": usage,
                },
                timeout=5.0,
            )

    async def on_llm_error(
        self,
        error: BaseException,
        *,
        run_id: UUID,
        **kwargs: Any,
    ) -> None:
        async with httpx.AsyncClient() as client:
            await client.post(
                self.webhook_url,
                json={
                    "event": "llm_error",
                    "run_id": str(run_id),
                    "error": str(error),
                },
                timeout=5.0,
            )
```
Streaming to FastAPI with Callbacks
A common production pattern — streaming LLM output to a web client via Server-Sent Events:
```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain_openai import AzureChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

app = FastAPI()

llm = AzureChatOpenAI(
    azure_endpoint="...",
    azure_deployment="gpt-4o",
    api_version="2024-12-01-preview",
    streaming=True,
)

prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant."),
    ("human", "{question}"),
])

chain = prompt | llm | StrOutputParser()

@app.get("/stream")
async def stream_answer(question: str):
    async def event_generator():
        async for chunk in chain.astream({"question": question}):
            yield f"data: {chunk}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
    )
```
Sync handlers (`BaseCallbackHandler`) run on the main thread and block execution. For network calls (logging to external services, sending webhooks), always use `AsyncCallbackHandler` to avoid adding latency to your chain.

All Available Callback Hooks
| Hook | Fires When | Key Data |
|---|---|---|
| `on_chat_model_start` | Chat model receives messages | Serialized model, input messages, run_id |
| `on_llm_start` | Any LLM receives prompts | Serialized LLM, prompt strings |
| `on_llm_new_token` | Each streamed token arrives | Token string, chunk index |
| `on_llm_end` | LLM returns full response | LLMResult with generations + token_usage |
| `on_llm_error` | LLM call raises an exception | Exception object, run_id |
| `on_chain_start` / `end` / `error` | Any Runnable chain starts/finishes/fails | Serialized chain, inputs/outputs |
| `on_tool_start` / `end` / `error` | Tool invocation lifecycle | Tool name, input, output |
| `on_retriever_start` / `end` / `error` | Retriever fetches documents | Query, retrieved documents |
| `on_retry` | A retry attempt is made | Retry state, error that triggered it |
Set `LANGCHAIN_TRACING_V2=true` and `LANGCHAIN_API_KEY=...` in your environment, and all chain executions are automatically traced with latency, token usage, inputs/outputs, and error details — zero code changes needed.

Module 6 — Guardrails & Output Validation
Production LLM applications need deterministic, validated outputs. LangChain provides first-class support for structured output parsing, Pydantic validation, and fallback chains — ensuring your pipelines degrade gracefully and always return data your downstream code can trust.
6.1 — Structured Output with Pydantic
The most reliable approach: use .with_structured_output() to force the model to return data matching a Pydantic schema. This uses the model's native tool-calling / JSON mode under the hood — no fragile prompt engineering required.
```python
import os
from dotenv import load_dotenv
from langchain_openai import AzureChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from pydantic import BaseModel, Field

load_dotenv()

# Define the expected output schema
class SentimentAnalysis(BaseModel):
    """Structured sentiment analysis result."""
    sentiment: str = Field(description="One of: positive, negative, neutral")
    confidence: float = Field(ge=0.0, le=1.0, description="Confidence score 0-1")
    reasoning: str = Field(description="Brief explanation of the classification")
    key_phrases: list[str] = Field(description="Key phrases that drove the classification")

llm = AzureChatOpenAI(
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    azure_deployment=os.environ["AZURE_OPENAI_DEPLOYMENT"],
    api_version=os.environ["AZURE_OPENAI_API_VERSION"],
    temperature=0.0,
)

# .with_structured_output() returns a Pydantic instance directly
structured_llm = llm.with_structured_output(SentimentAnalysis)

prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a sentiment analysis engine. Analyze the given text."),
    ("human", "{text}"),
])

chain = prompt | structured_llm

# Result is a validated Pydantic object — not a string!
result: SentimentAnalysis = chain.invoke({
    "text": "The new API is blazing fast but the docs are terrible."
})

print(result.sentiment)    # → "negative" or "neutral"
print(result.confidence)   # → 0.72
print(result.key_phrases)  # → ["blazing fast", "docs are terrible"]
print(result.model_dump()) # → Full dict for serialization
```
Why prefer `.with_structured_output()` over manual parsing? It uses the model's native function-calling / JSON schema mode, which is significantly more reliable than asking the LLM to produce JSON in free text and then parsing it. Pydantic validation is applied automatically — if the model returns a confidence of 1.5, the validation will catch it.

Complex Nested Schemas
```python
from enum import Enum
from pydantic import BaseModel, Field

class Priority(str, Enum):
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"

class ActionItem(BaseModel):
    task: str = Field(description="Specific action to take")
    assignee: str = Field(description="Suggested owner for this task")
    priority: Priority = Field(description="Priority level")

class MeetingNotes(BaseModel):
    """Structured meeting summary with decisions and action items."""
    title: str = Field(description="Meeting title")
    summary: str = Field(description="2-3 sentence summary of the meeting")
    decisions: list[str] = Field(description="Key decisions made")
    action_items: list[ActionItem] = Field(description="Concrete action items")
    follow_up_date: str | None = Field(
        default=None,
        description="Next follow-up date if mentioned, ISO format",
    )

# The model will produce valid nested objects
structured_llm = llm.with_structured_output(MeetingNotes)
notes = structured_llm.invoke("Summarize: We met on Monday to discuss Q3 launch...")

for item in notes.action_items:
    print(f"[{item.priority.value}] {item.task} → {item.assignee}")
```
PydanticOutputParser (Manual Approach)
For models that don't support native structured output, or when you need more control over the prompt, use PydanticOutputParser:
```python
from langchain_core.output_parsers import PydanticOutputParser

parser = PydanticOutputParser(pydantic_object=SentimentAnalysis)

prompt = ChatPromptTemplate.from_messages([
    ("system", "Analyze the sentiment of the text.\n{format_instructions}"),
    ("human", "{text}"),
])

# Inject format_instructions into the prompt (tells model the JSON schema)
chain = prompt.partial(format_instructions=parser.get_format_instructions()) | llm | parser

result = chain.invoke({"text": "Best product I've ever used!"})
# result is a validated SentimentAnalysis instance
```
Output Fixing Parser (Auto-Correct Malformed Output)
```python
from langchain.output_parsers import OutputFixingParser

# Wraps another parser — if parsing fails, asks the LLM to fix the output
fixing_parser = OutputFixingParser.from_llm(parser=parser, llm=llm)

# If the model returns malformed JSON, fixing_parser will:
# 1. Catch the parse error
# 2. Send the error + original output back to the LLM
# 3. Ask the LLM to fix it
# 4. Parse again
chain_with_fix = prompt.partial(
    format_instructions=parser.get_format_instructions()
) | llm | fixing_parser
```
6.2 — Fallbacks & Retry Logic
LLMs are non-deterministic and external APIs fail. LangChain's .with_fallbacks() and .with_retry() let you build resilient chains that gracefully degrade without complex try/except trees.
Model Fallback Chain
If the primary model fails (rate limit, outage, timeout), automatically fall back to an alternative:
```python
from langchain_openai import AzureChatOpenAI

# Primary: GPT-4o (best quality)
primary_llm = AzureChatOpenAI(
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    azure_deployment="gpt-4o",
    api_version="2024-12-01-preview",
    temperature=0.0,
    request_timeout=30,
)

# Fallback: GPT-4o-mini (faster, cheaper, still good)
fallback_llm = AzureChatOpenAI(
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    azure_deployment="gpt-4o-mini",
    api_version="2024-12-01-preview",
    temperature=0.0,
    request_timeout=30,
)

# If primary raises ANY exception, fallback is tried
resilient_llm = primary_llm.with_fallbacks([fallback_llm])

# Use it exactly like a normal LLM — fallback is transparent
result = resilient_llm.invoke("Explain quantum computing in one paragraph.")
```
Retry with Exponential Backoff
```python
# Retry on transient errors (rate limits, network blips)
retrying_llm = primary_llm.with_retry(
    stop_after_attempt=3,          # Max 3 attempts
    wait_exponential_jitter=True,  # Exponential backoff with jitter
)

# Combine retry + fallback for maximum resilience
robust_llm = primary_llm.with_retry(
    stop_after_attempt=2,
    wait_exponential_jitter=True,
).with_fallbacks([
    fallback_llm.with_retry(stop_after_attempt=2, wait_exponential_jitter=True)
])

# Execution order: primary (try 1) → primary (try 2) → fallback (try 1) → fallback (try 2)
```
Chain-Level Fallbacks
Fallbacks work at the chain level too — not just on individual models:
```python
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# Detailed chain with structured output
detailed_prompt = ChatPromptTemplate.from_messages([
    ("system", "Provide a detailed, structured analysis with sections and bullet points."),
    ("human", "{question}"),
])
detailed_chain = detailed_prompt | primary_llm | StrOutputParser()

# Simple chain as fallback — less fancy, but always works
simple_prompt = ChatPromptTemplate.from_messages([
    ("system", "Give a brief, concise answer."),
    ("human", "{question}"),
])
simple_chain = simple_prompt | fallback_llm | StrOutputParser()

# If the detailed chain fails for any reason, fall back to the simple one
resilient_chain = detailed_chain.with_fallbacks([simple_chain])
```
Conditional Validation with RunnableLambda
Add custom validation logic anywhere in the chain using RunnableLambda:
```python
from langchain_core.runnables import RunnableLambda

def validate_sentiment(result: SentimentAnalysis) -> SentimentAnalysis:
    """Post-processing validation — raise if output is suspicious."""
    valid_sentiments = {"positive", "negative", "neutral"}
    if result.sentiment.lower() not in valid_sentiments:
        raise ValueError(f"Invalid sentiment: {result.sentiment}")
    if not result.key_phrases:
        raise ValueError("key_phrases cannot be empty")
    return result

# Chain: prompt → structured LLM → validate → output
validated_chain = (
    prompt
    | llm.with_structured_output(SentimentAnalysis)
    | RunnableLambda(validate_sentiment)
)

# Add a fallback for validation failures
fallback_chain = (
    prompt
    | fallback_llm.with_structured_output(SentimentAnalysis)
    | RunnableLambda(validate_sentiment)
)

safe_chain = validated_chain.with_fallbacks([fallback_chain])
```
Common fallback strategies:
• Model degradation: GPT-4o → GPT-4o-mini (quality trade-off for reliability)
• Provider failover: Azure OpenAI → OpenAI direct → Anthropic Claude
• Approach change: Structured output → free text with manual parsing
• Graceful default: Return cached/default response if all else fails
Putting It All Together — Production Guardrails Pattern
```python
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableLambda
from langchain_openai import AzureChatOpenAI
from pydantic import BaseModel, Field

class ExtractedEntity(BaseModel):
    name: str = Field(description="Entity name")
    entity_type: str = Field(description="Type: person, org, location, date")
    context: str = Field(description="Sentence where entity appears")

class EntityExtractionResult(BaseModel):
    entities: list[ExtractedEntity] = Field(description="All extracted entities")
    source_language: str = Field(description="Detected language of input")

prompt = ChatPromptTemplate.from_messages([
    ("system", "Extract all named entities from the text."),
    ("human", "{text}"),
])

# Build the full production chain:
# 1. Structured output with Pydantic validation
# 2. Retry on transient failures
# 3. Fallback to cheaper model if primary fails
# 4. Custom post-validation

def post_validate(result: EntityExtractionResult) -> EntityExtractionResult:
    # Remove any entities with empty names (hallucination guard)
    result.entities = [e for e in result.entities if e.name.strip()]
    return result

primary_chain = (
    prompt
    | primary_llm.with_structured_output(EntityExtractionResult).with_retry(
        stop_after_attempt=2, wait_exponential_jitter=True
    )
    | RunnableLambda(post_validate)
)

fallback_chain = (
    prompt
    | fallback_llm.with_structured_output(EntityExtractionResult).with_retry(
        stop_after_attempt=2, wait_exponential_jitter=True
    )
    | RunnableLambda(post_validate)
)

production_chain = primary_chain.with_fallbacks([fallback_chain])

# Use it — resilient, validated, type-safe
result = production_chain.invoke({
    "text": "Tim Cook announced that Apple will open a new office in Berlin by March 2026."
})

for e in result.entities:
    print(f"  {e.entity_type}: {e.name}")
# person: Tim Cook
# org: Apple
# location: Berlin
# date: March 2026
```
Module 7 — Agents & Multi-Agent Systems
Agents are LLM-powered programs that can reason, plan, and take actions in a loop. Unlike chains (which follow a fixed path), agents dynamically decide which tools to call, in what order, and when to stop. LangChain supports two approaches: legacy AgentExecutor (simple, good for prototyping) and LangGraph (production-grade, full control over state and flow).
How agents work: The model receives a task + tool descriptions → reasons about what to do → emits a tool call → your code executes the tool → result goes back to the model → repeats until the model decides it has a final answer. This is the Reasoning + Acting (ReAct) pattern.
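The loop described above can be sketched in plain Python with a scripted stand-in for the model. Every name here is illustrative, not a LangChain API; the point is only to make the reason → act → observe cycle concrete:

```python
# Minimal ReAct-style loop with a scripted "model" (illustrative only)
tools = {"lookup_user": lambda user_id: f"User {user_id}: Jane Smith, Engineering"}

def fake_model(messages: list[dict]) -> dict:
    """Stub model: plans one tool call, then answers once it sees the result."""
    if not any(m["role"] == "tool" for m in messages):
        return {"tool": "lookup_user", "args": {"user_id": "u-1234"}}
    return {"answer": "u-1234 is Jane Smith from Engineering."}

messages = [{"role": "human", "content": "Who is user u-1234?"}]
while True:
    step = fake_model(messages)                       # 1. model reasons about what to do
    if "answer" in step:                              # 4. model decides it has a final answer
        print(step["answer"])
        break
    result = tools[step["tool"]](**step["args"])      # 2. your code executes the tool
    messages.append({"role": "tool", "content": result})  # 3. observation goes back to the model
```

create_react_agent, shown next, runs exactly this loop for you, with real tool schemas, message types, and streaming in place of the stubs.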
7.1 — Single Agent with Tool Calling
LangGraph ReAct Agent (Recommended)
create_react_agent from LangGraph is the modern, production-recommended way to build agents. It gives you a stateful graph with built-in tool execution, message history, and streaming — all composable with LangGraph's state machine primitives.
```python
import os
from dotenv import load_dotenv
from langchain_openai import AzureChatOpenAI
from langchain_core.tools import tool
from langgraph.prebuilt import create_react_agent

load_dotenv()

# Define tools
@tool
def search_docs(query: str) -> str:
    """Search the knowledge base for relevant documents.

    Args:
        query: The search query to find relevant information.
    """
    # In production, call your retriever / search API here
    return f"Found 3 documents matching '{query}': [Doc about API design, Doc about testing, Doc about deployment]"

@tool
def create_ticket(title: str, description: str, priority: str = "medium") -> str:
    """Create a support ticket in the ticketing system.

    Args:
        title: Short title for the ticket.
        description: Detailed description of the issue.
        priority: Priority level - low, medium, or high.
    """
    # In production, call your ticketing API here
    return f"✅ Ticket created: '{title}' (priority: {priority}) — ID: TICKET-4521"

@tool
def get_user_info(user_id: str) -> str:
    """Look up information about a user by their ID.

    Args:
        user_id: The unique user identifier.
    """
    return f"User {user_id}: Jane Smith, Engineering, joined 2023-06"

# Create the model
llm = AzureChatOpenAI(
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    azure_deployment=os.environ["AZURE_OPENAI_DEPLOYMENT"],
    api_version=os.environ["AZURE_OPENAI_API_VERSION"],
    temperature=0.0,
)

# Create the agent — this is a full LangGraph StateGraph under the hood
agent = create_react_agent(
    model=llm,
    tools=[search_docs, create_ticket, get_user_info],
    prompt="You are an IT support agent. Help users with their technical issues. Use your tools to look up information and create tickets when needed.",
)

# Invoke — the agent loops: reason → tool → observe → repeat
result = agent.invoke({
    "messages": [("human", "User u-1234 is having API timeout issues. Search the docs for solutions and if there's no quick fix, create a high priority ticket.")],
})

# Print the full conversation (shows reasoning + tool calls + final answer)
for msg in result["messages"]:
    print(f"{msg.__class__.__name__}: {msg.content[:120]}")
```
Streaming Agent Output
```python
# Stream the agent's execution step-by-step
import asyncio

async def stream_agent():
    async for event in agent.astream_events(
        {"messages": [("human", "Look up user u-5678 and create a ticket for their login issue")]},
        version="v2",
    ):
        kind = event["event"]
        if kind == "on_chat_model_stream":
            token = event["data"]["chunk"].content
            if token:
                print(token, end="", flush=True)
        elif kind == "on_tool_start":
            print(f"\n🔧 Calling tool: {event['name']}")
        elif kind == "on_tool_end":
            print(f"\n✅ Tool result: {event['data'].get('output', '')[:100]}")

asyncio.run(stream_agent())
```
Agent with Memory (Stateful Conversations)
LangGraph agents support persistent memory via checkpointers — each thread maintains its full conversation history across invocations:
```python
from langgraph.checkpoint.memory import MemorySaver

# In-memory checkpointer (use PostgresSaver or RedisSaver in production)
memory = MemorySaver()

agent_with_memory = create_react_agent(
    model=llm,
    tools=[search_docs, create_ticket, get_user_info],
    prompt="You are an IT support agent. Remember our conversation history.",
    checkpointer=memory,
)

# Thread ID groups messages into a conversation
config = {"configurable": {"thread_id": "session-001"}}

# First message
result1 = agent_with_memory.invoke(
    {"messages": [("human", "Look up user u-1234")]},
    config=config,
)
print(result1["messages"][-1].content)

# Follow-up — agent remembers the previous context
result2 = agent_with_memory.invoke(
    {"messages": [("human", "Create a ticket for them about slow builds")]},
    config=config,
)
print(result2["messages"][-1].content)
# → Agent knows "them" = user u-1234 (Jane Smith) from previous turn
```
Legacy note: You may still see create_tool_calling_agent() + AgentExecutor in older tutorials. While still functional, LangGraph's create_react_agent is the recommended approach for new projects — it offers better streaming, state management, error recovery, and composability. AgentExecutor will eventually be deprecated.

7.2 — Multi-Agent Architectures with LangGraph
For complex tasks that exceed a single agent's scope, LangGraph lets you compose multiple specialized agents into a coordinated workflow. Each agent has its own tools and expertise, and a supervisor (or a graph topology) routes tasks between them.
Architecture Patterns
| Pattern | Description | Best For |
|---|---|---|
| Supervisor | One coordinator agent routes tasks to worker agents | Heterogeneous tasks (research + code + review) |
| Sequential | Agents form a pipeline — output of one feeds the next | Multi-step workflows (draft → review → publish) |
| Hierarchical | Supervisors delegate to sub-supervisors → workers | Large organizations with specialized teams |
| Collaborative | Agents communicate peer-to-peer on a shared state | Debate, negotiation, multi-perspective analysis |
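The Sequential pattern in the table reduces to ordinary function composition: each stage consumes the previous stage's output. A minimal sketch, with plain functions standing in for real agents (all names are illustrative stubs):

```python
# Each "agent" is a stage that transforms the shared draft (illustrative stubs)
def draft_agent(topic: str) -> str:
    return f"DRAFT: notes on {topic}"

def review_agent(draft: str) -> str:
    return draft.replace("DRAFT", "REVIEWED")

def publish_agent(reviewed: str) -> str:
    return reviewed + " [published]"

# Sequential pipeline: output of one stage feeds the next
pipeline = [draft_agent, review_agent, publish_agent]

state = "LLM efficiency"
for stage in pipeline:
    state = stage(state)

print(state)  # → "REVIEWED: notes on LLM efficiency [published]"
```

In LangGraph the same shape is a linear chain of nodes connected by add_edge calls; the Supervisor pattern below replaces the fixed order with an LLM-driven router.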
Supervisor Multi-Agent Pattern
The most common pattern: a supervisor LLM decides which specialist agent to invoke based on the task. Each worker agent has its own tools and system prompt.
```python
from typing import Annotated, Literal
from typing_extensions import TypedDict
import operator

from langchain_openai import AzureChatOpenAI
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage, BaseMessage
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import create_react_agent

# ─── Shared State ───
class AgentState(TypedDict):
    messages: Annotated[list[BaseMessage], operator.add]
    next_agent: str

# ─── Define Specialist Tools ───
@tool
def search_web(query: str) -> str:
    """Search the web for current information.

    Args:
        query: Search query string.
    """
    return f"Web results for '{query}': [Article about latest trends, Research paper, Blog post]"

@tool
def run_python(code: str) -> str:
    """Execute Python code for data analysis.

    Args:
        code: Python code to execute safely.
    """
    return f"Code executed successfully. Output: [Analysis results with 3 charts]"

@tool
def write_document(title: str, content: str) -> str:
    """Write and save a document.

    Args:
        title: Document title.
        content: Full document content in markdown.
    """
    return f"Document '{title}' saved successfully ({len(content)} chars)"

# ─── Create Specialist Agents ───
llm = AzureChatOpenAI(
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    azure_deployment=os.environ["AZURE_OPENAI_DEPLOYMENT"],
    api_version=os.environ["AZURE_OPENAI_API_VERSION"],
    temperature=0.0,
)

researcher = create_react_agent(
    model=llm,
    tools=[search_web],
    prompt="You are a research specialist. Search the web thoroughly and provide detailed findings with sources.",
)

analyst = create_react_agent(
    model=llm,
    tools=[run_python],
    prompt="You are a data analyst. Analyze data using Python and produce clear summaries with key metrics.",
)

writer = create_react_agent(
    model=llm,
    tools=[write_document],
    prompt="You are a technical writer. Take research and analysis and produce a well-structured report.",
)
```
Building the Supervisor Graph
```python
from pydantic import BaseModel, Field

# ─── Supervisor decides which agent to call next ───
class RouterDecision(BaseModel):
    """The supervisor's routing decision."""
    next: Literal["researcher", "analyst", "writer", "FINISH"] = Field(
        description="Which agent should handle the next step, or FINISH if done."
    )
    reasoning: str = Field(description="Why this agent was chosen.")

supervisor_llm = llm.with_structured_output(RouterDecision)

SUPERVISOR_PROMPT = """You are a project supervisor managing a team of specialists:

- **researcher**: Searches the web for information and current data
- **analyst**: Runs Python code for data analysis and computations
- **writer**: Produces well-structured documents and reports

Given the conversation so far, decide which agent should act next.
If the task is fully complete, respond with FINISH.
Always route research tasks before analysis, and analysis before writing."""

def supervisor_node(state: AgentState) -> AgentState:
    # Ask the supervisor LLM to decide
    decision = supervisor_llm.invoke([
        ("system", SUPERVISOR_PROMPT),
        *state["messages"],
        ("human", "Based on the progress so far, which agent should act next?"),
    ])
    return {"messages": [], "next_agent": decision.next}

# ─── Worker node wrappers ───
def researcher_node(state: AgentState) -> AgentState:
    result = researcher.invoke({"messages": state["messages"]})
    return {"messages": result["messages"][-1:], "next_agent": ""}

def analyst_node(state: AgentState) -> AgentState:
    result = analyst.invoke({"messages": state["messages"]})
    return {"messages": result["messages"][-1:], "next_agent": ""}

def writer_node(state: AgentState) -> AgentState:
    result = writer.invoke({"messages": state["messages"]})
    return {"messages": result["messages"][-1:], "next_agent": ""}

# ─── Build the Graph ───
graph = StateGraph(AgentState)

# Add nodes
graph.add_node("supervisor", supervisor_node)
graph.add_node("researcher", researcher_node)
graph.add_node("analyst", analyst_node)
graph.add_node("writer", writer_node)

# Entry point → supervisor
graph.set_entry_point("supervisor")

# Supervisor routes to workers or END
graph.add_conditional_edges(
    "supervisor",
    lambda state: state["next_agent"],
    {
        "researcher": "researcher",
        "analyst": "analyst",
        "writer": "writer",
        "FINISH": END,
    },
)

# Workers always report back to supervisor
graph.add_edge("researcher", "supervisor")
graph.add_edge("analyst", "supervisor")
graph.add_edge("writer", "supervisor")

# Compile the graph
multi_agent = graph.compile()

# Run the multi-agent system
result = multi_agent.invoke({
    "messages": [
        HumanMessage(content="Research the latest trends in LLM efficiency, analyze the key metrics, and write a summary report.")
    ],
    "next_agent": "",
})

# The supervisor orchestrated: researcher → analyst → writer → FINISH
print(result["messages"][-1].content)
```
Why build multi-agent systems on LangGraph:
• Explicit state: You define exactly what's in your agent state (messages, intermediate results, metadata)
• Conditional routing: Supervisor patterns, branching, loops, and early termination via graph edges
• Persistence: Built-in checkpointers (Memory, Redis, PostgreSQL) for long-running workflows
• Human-in-the-loop: Add interrupt_before / interrupt_after on any node for human review
• Streaming: Stream events from any node in the graph in real time
Human-in-the-Loop Approval Pattern
For high-stakes actions (creating tickets, sending emails, deploying code), add human approval checkpoints:
```python
from langgraph.checkpoint.memory import MemorySaver

# Create agent with human approval before specific tools
agent_with_approval = create_react_agent(
    model=llm,
    tools=[search_docs, create_ticket, get_user_info],
    prompt="You are an IT support agent. Ask to create tickets when appropriate.",
    checkpointer=MemorySaver(),
    interrupt_before=["tools"],  # Pause BEFORE any tool execution
)

config = {"configurable": {"thread_id": "approval-session"}}

# Start the agent — it will plan a tool call, then pause
result = agent_with_approval.invoke(
    {"messages": [("human", "Create a high-priority ticket for the API outage")]},
    config=config,
)

# At this point, the agent has PLANNED the tool call but NOT executed it
# You can inspect what it wants to do:
pending_tool_calls = result["messages"][-1].tool_calls
print(f"Agent wants to call: {pending_tool_calls[0]['name']}")
print(f"With args: {pending_tool_calls[0]['args']}")

# If approved, resume execution (the agent will execute the tool and continue)
final_result = agent_with_approval.invoke(None, config=config)
print(final_result["messages"][-1].content)
```
Custom LangGraph Agent (Full Control)
For maximum flexibility, build your agent graph from scratch instead of using create_react_agent:
```python
from typing import Annotated
from typing_extensions import TypedDict
import operator

from langchain_core.messages import BaseMessage, ToolMessage
from langgraph.graph import StateGraph, END

# ─── Define State ───
class CustomAgentState(TypedDict):
    messages: Annotated[list[BaseMessage], operator.add]
    iteration_count: int

# ─── Agent Node: reason and possibly call tools ───
def agent_node(state: CustomAgentState) -> CustomAgentState:
    llm_with_tools = llm.bind_tools([search_docs, create_ticket])
    response = llm_with_tools.invoke(state["messages"])
    return {
        "messages": [response],
        "iteration_count": state["iteration_count"] + 1,
    }

# ─── Tool Node: execute any pending tool calls ───
tool_map = {"search_docs": search_docs, "create_ticket": create_ticket}

def tool_node(state: CustomAgentState) -> CustomAgentState:
    last_msg = state["messages"][-1]
    tool_messages = []
    for tc in last_msg.tool_calls:
        result = tool_map[tc["name"]].invoke(tc["args"])
        tool_messages.append(ToolMessage(content=str(result), tool_call_id=tc["id"]))
    return {"messages": tool_messages, "iteration_count": state["iteration_count"]}

# ─── Router: decide if we should call tools, continue, or stop ───
def should_continue(state: CustomAgentState) -> str:
    last_msg = state["messages"][-1]
    # Safety: stop after 10 iterations to prevent infinite loops
    if state["iteration_count"] >= 10:
        return "end"
    # If the model made tool calls, execute them
    if hasattr(last_msg, "tool_calls") and last_msg.tool_calls:
        return "tools"
    # Otherwise, the agent is done
    return "end"

# ─── Build Graph ───
graph = StateGraph(CustomAgentState)
graph.add_node("agent", agent_node)
graph.add_node("tools", tool_node)
graph.set_entry_point("agent")
graph.add_conditional_edges(
    "agent",
    should_continue,
    {"tools": "tools", "end": END},
)
graph.add_edge("tools", "agent")  # After tools → back to agent

custom_agent = graph.compile()

# Run
result = custom_agent.invoke({
    "messages": [("human", "Search for deployment best practices and create a ticket to update our runbook")],
    "iteration_count": 0,
})
print(result["messages"][-1].content)
```
Production agent checklist:
• Max iterations: Always set a loop limit to prevent runaway agents (and runaway costs)
• Timeout: Set a request_timeout on the LLM to prevent hung calls
• Observability: Use callbacks or LangSmith to trace every step of the agent's reasoning
• Tool safety: Validate tool inputs, use least-privilege credentials, audit tool invocations
• Fallbacks: Use .with_fallbacks() on the LLM so the agent survives model outages
• Human-in-the-loop: For destructive actions, add interrupt_before checkpoints
When to Use What
| Approach | Complexity | Use When |
|---|---|---|
| prompt \| llm.bind_tools() | Low | Single-turn tool call, no looping needed |
| create_react_agent() | Medium | Standard agent with tools that needs to loop (most use cases) |
| Custom StateGraph | High | Custom state, branching logic, multi-agent, human-in-the-loop |
| Supervisor + Workers | High | Multiple specialist agents coordinating on complex tasks |