
LangChain Practitioner's Handbook

A production-ready reference covering enterprise LLM integration, RAG pipelines, memory, tools, MCP, guardrails, and multi-agent architectures with LangGraph.

LangChain 0.3+ · LCEL · langchain-core · langgraph · Azure OpenAI · Vertex AI · March 2026

Table of Contents

This handbook is organized into seven production-focused modules. Each module contains full, copy-pasteable code using the latest LangChain Expression Language (LCEL) and langchain-core imports.

📋
Convention: All code uses LCEL pipe syntax (prompt | model | parser), modern langchain-core imports, and avoids deprecated LLMChain, AgentType.ZERO_SHOT_REACT_DESCRIPTION, or legacy chain constructors. Type hints are included for production readiness.

Installation & Package Map

LangChain 0.3+ uses a modular package architecture. Install only what you need.

# Core framework — always needed
pip install langchain-core langchain

# Enterprise model providers
pip install langchain-openai          # Azure OpenAI + OpenAI
pip install langchain-google-vertexai # Google Vertex AI

# RAG essentials
pip install langchain-chroma          # ChromaDB vector store
pip install langchain-community       # Document loaders, misc integrations

# Agents & multi-agent
pip install langgraph                 # State-graph multi-agent orchestration

# Memory persistence
pip install langchain-redis           # Redis-backed chat history
pip install langchain-postgres        # PostgreSQL-backed chat history

# MCP integration
pip install langchain-mcp-adapters    # Bridge MCP servers ↔ LangChain tools

| Package | Purpose | Key Imports |
| --- | --- | --- |
| langchain-core | Base abstractions: runnables, prompts, output parsers, callbacks | ChatPromptTemplate, StrOutputParser, RunnablePassthrough |
| langchain-openai | OpenAI + Azure OpenAI chat models and embeddings | AzureChatOpenAI, AzureOpenAIEmbeddings |
| langchain-google-vertexai | Google Vertex AI (Gemini) chat models | ChatVertexAI, VertexAIEmbeddings |
| langgraph | Multi-agent state-graph orchestration | StateGraph, END, MessagesState |
| langchain-mcp-adapters | Model Context Protocol bridge | MultiServerMCPClient |

Module 1 — Core Models & Enterprise Integration

Enterprise LLM deployments require secure credential handling, region-specific endpoints, and grounding against authoritative data sources. This module covers Azure OpenAI, Google Vertex AI, and enterprise grounding patterns.

🔑 Credentials → 🏗️ Model Init → 💬 Chat Call → 🌍 Grounding

1.1 — Azure OpenAI Setup

Use AzureChatOpenAI from langchain-openai. Credentials are loaded from environment variables — never hardcode secrets. The azure_endpoint, api_version, and azure_deployment map to your Azure OpenAI resource.

Environment Variables

# .env — load via python-dotenv or set in your shell / CI pipeline
AZURE_OPENAI_API_KEY=your-api-key-here
AZURE_OPENAI_ENDPOINT=https://your-resource.openai.azure.com/
AZURE_OPENAI_API_VERSION=2024-12-01-preview
AZURE_OPENAI_DEPLOYMENT=gpt-4o
AZURE_OPENAI_EMBEDDINGS_DEPLOYMENT=text-embedding-3-large
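
Because every example in this module reads its settings from the environment, it can help to fail fast when a variable is missing rather than hitting a KeyError deep inside model setup. The following is a minimal sketch, not part of LangChain: the helper name `missing_env_vars` and the `REQUIRED_AZURE_VARS` tuple are hypothetical, and the variable names are simply those from the .env listing above.

```python
from typing import Iterable, Mapping

# Variable names copied from the .env listing; the tuple itself is illustrative.
REQUIRED_AZURE_VARS = (
    "AZURE_OPENAI_API_KEY",
    "AZURE_OPENAI_ENDPOINT",
    "AZURE_OPENAI_API_VERSION",
    "AZURE_OPENAI_DEPLOYMENT",
)


def missing_env_vars(
    env: Mapping[str, str],
    required: Iterable[str] = REQUIRED_AZURE_VARS,
) -> list[str]:
    """Return the required variable names that are unset or blank in `env`."""
    return [name for name in required if not env.get(name, "").strip()]


# Example with an in-memory mapping; in an application, pass os.environ instead.
example_env = {
    "AZURE_OPENAI_API_KEY": "your-api-key-here",
    "AZURE_OPENAI_ENDPOINT": "https://your-resource.openai.azure.com/",
    "AZURE_OPENAI_API_VERSION": "",  # blank counts as missing
}
print(missing_env_vars(example_env))
```

Calling this once at startup and raising if the list is non-empty keeps configuration errors separate from model errors.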

Basic Chat Call

import os
from dotenv import load_dotenv
from langchain_openai import AzureChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage

# Load credentials from .env — keeps secrets out of source code
load_dotenv()

# Initialize the Azure OpenAI chat model
llm = AzureChatOpenAI(
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    azure_deployment=os.environ["AZURE_OPENAI_DEPLOYMENT"],
    api_version=os.environ["AZURE_OPENAI_API_VERSION"],
    # api_key is auto-read from AZURE_OPENAI_API_KEY env var
    temperature=0.0,       # Minimizes sampling randomness (outputs can still vary slightly)
    max_tokens=1024,
)

# Simple invocation with typed messages
messages: list = [
    SystemMessage(content="You are a helpful enterprise assistant."),
    HumanMessage(content="Summarize our Q4 revenue in three bullet points."),
]

response = llm.invoke(messages)
print(response.content)
💡
Managed Identity: For production Azure deployments, replace the API key with DefaultAzureCredential from azure-identity. Pass azure_ad_token_provider to AzureChatOpenAI to use workload identity, managed identity, or service principal auth without any stored secrets.

Azure AD Token Provider (Managed Identity)

import os
from azure.identity import DefaultAzureCredential, get_bearer_token_provider
from langchain_openai import AzureChatOpenAI

# DefaultAzureCredential auto-detects managed identity, CLI login, etc.
credential = DefaultAzureCredential()
token_provider = get_bearer_token_provider(
    credential,
    "https://cognitiveservices.azure.com/.default",
)

llm = AzureChatOpenAI(
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    azure_deployment=os.environ["AZURE_OPENAI_DEPLOYMENT"],
    api_version=os.environ["AZURE_OPENAI_API_VERSION"],
    azure_ad_token_provider=token_provider,  # No API key needed
    temperature=0.0,
)

response = llm.invoke("What is our current Azure spend?")
print(response.content)

LCEL Chain — Prompt → Model → Parser

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# Define a reusable prompt template
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a concise enterprise analyst."),
    ("human", "{question}"),
])

# LCEL pipe: prompt → model → string output parser
chain = prompt | llm | StrOutputParser()

# Invoke with a dict of template variables
result: str = chain.invoke({"question": "Explain Azure OpenAI rate limits."})
print(result)
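
To make the pipe syntax less magical, here is a toy illustration of how `a | b` composition can work in plain Python. It is a sketch of the idea only and not LangChain's implementation: the `Step` class and the three stand-in steps are hypothetical, and real runnables also handle batching, streaming, and async.

```python
from typing import Any, Callable


class Step:
    """A tiny stand-in for a runnable: wraps a function and supports `|`."""

    def __init__(self, fn: Callable[[Any], Any]) -> None:
        self.fn = fn

    def __or__(self, other: "Step") -> "Step":
        # a | b builds a new step that feeds a's output into b
        return Step(lambda value: other.invoke(self.invoke(value)))

    def invoke(self, value: Any) -> Any:
        return self.fn(value)


# Hypothetical stand-ins for a prompt, a model, and an output parser
fake_prompt = Step(lambda d: f"Q: {d['question']}")
fake_model = Step(lambda text: {"content": text.upper()})
fake_parser = Step(lambda msg: msg["content"])

toy_chain = fake_prompt | fake_model | fake_parser
print(toy_chain.invoke({"question": "what is lcel?"}))
```

Each stage only needs to accept the previous stage's output, which is why the same prompt and parser can be reused with a different model.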

1.2 — Google Vertex AI Setup

Vertex AI uses Google Cloud Application Default Credentials (ADC). Authenticate via gcloud auth application-default login locally, or use a service account in production. The ChatVertexAI wrapper connects to Gemini models.

Authentication

# Terminal: authenticate locally (one-time)
gcloud auth application-default login

# Or set a service account key (CI/CD environments)
export GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account.json

# Ensure your project is set
gcloud config set project your-gcp-project-id

Vertex AI Chat Model

from langchain_google_vertexai import ChatVertexAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# Initialize Gemini 2.0 Flash via Vertex AI
# Credentials auto-loaded from ADC (gcloud login or service account)
llm = ChatVertexAI(
    model_name="gemini-2.0-flash",
    project="your-gcp-project-id",
    location="us-central1",
    temperature=0.0,
    max_output_tokens=1024,
)

# Build an LCEL chain identical to the Azure example
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a GCP infrastructure expert."),
    ("human", "{question}"),
])

chain = prompt | llm | StrOutputParser()

result: str = chain.invoke({"question": "Explain Vertex AI quotas for Gemini 2.0."})
print(result)
ℹ️
Model names: Use gemini-2.0-flash for fast, cost-effective tasks or gemini-2.5-pro for complex reasoning. Both support tool calling natively via the LangChain wrapper.

1.3 — Enterprise Grounding Samples

Grounding connects your LLM to authoritative data sources so responses are factual and traceable. Below are patterns for both Azure (via Azure AI Search) and Google (via Vertex AI grounding with Google Search).

Azure Grounding — Azure AI Search RAG retriever

Pattern: Use Azure AI Search as a retriever to ground Azure OpenAI responses against your indexed enterprise documents.

Hybrid search · Semantic ranking · Enterprise docs

Azure AI Search Grounding

import os
from dotenv import load_dotenv
from langchain_openai import AzureChatOpenAI, AzureOpenAIEmbeddings
from langchain_community.vectorstores.azuresearch import AzureSearch
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

load_dotenv()

# 1. Embeddings model for vectorizing queries
embeddings = AzureOpenAIEmbeddings(
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    azure_deployment=os.environ["AZURE_OPENAI_EMBEDDINGS_DEPLOYMENT"],
    api_version=os.environ["AZURE_OPENAI_API_VERSION"],
)

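# 2. Azure AI Search as a vector store / retriever
#    (expects AZURE_SEARCH_ENDPOINT and AZURE_SEARCH_KEY in the environment,
#    in addition to the variables listed in section 1.1)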
vector_store = AzureSearch(
    azure_search_endpoint=os.environ["AZURE_SEARCH_ENDPOINT"],
    azure_search_key=os.environ["AZURE_SEARCH_KEY"],
    index_name="enterprise-docs",
    embedding_function=embeddings.embed_query,
    search_type="hybrid",  # Combines keyword + vector for best recall
)
retriever = vector_store.as_retriever(search_kwargs={"k": 5})

# 3. LLM for generating grounded answers
llm = AzureChatOpenAI(
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    azure_deployment=os.environ["AZURE_OPENAI_DEPLOYMENT"],
    api_version=os.environ["AZURE_OPENAI_API_VERSION"],
    temperature=0.0,
)

# 4. Grounded prompt — instructs the model to use ONLY retrieved context
prompt = ChatPromptTemplate.from_messages([
    ("system", """Answer the question using ONLY the provided context.
If the context does not contain the answer, say "I don't have enough information."
Always cite the source document.

Context:
{context}"""),
    ("human", "{question}"),
])

# Helper: format retrieved documents into a single string
def format_docs(docs) -> str:
    return "\n\n".join(
        f"[{doc.metadata.get('source', 'unknown')}]: {doc.page_content}"
        for doc in docs
    )

# 5. LCEL RAG chain: retrieve → format → prompt → LLM → parse
grounded_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)

# Invoke — the response is grounded against Azure AI Search documents
answer = grounded_chain.invoke("What is our data retention policy?")
print(answer)

Vertex AI Grounding — Google Search Native grounding

Pattern: Use Vertex AI's built-in Google Search grounding to augment Gemini responses with real-time web data.

Real-time web · Dynamic grounding · Zero infra

Vertex AI with Google Search Grounding

from langchain_google_vertexai import ChatVertexAI
from google.cloud.aiplatform_v1beta1.types import Tool as VertexTool
from langchain_core.messages import HumanMessage

# Configure Google Search grounding as a built-in Vertex AI tool.
# Gemini 2.0+ models take the `google_search` field; the older
# `google_search_retrieval` field applies to Gemini 1.5 models.
# Verify the field name against your installed SDK version.
google_search_tool = VertexTool(google_search={})

# Initialize ChatVertexAI as usual; grounding is attached per call
llm = ChatVertexAI(
    model_name="gemini-2.0-flash",
    project="your-gcp-project-id",
    location="us-central1",
    temperature=0.0,
)

# Pass the grounding tool through the `tools` keyword at invocation time
response = llm.invoke(
    [HumanMessage(content="What were Google's Q3 2025 earnings?")],
    tools=[google_search_tool],
)

print(response.content)

# Access grounding metadata (sources) from the response
if hasattr(response, "response_metadata"):
    grounding_meta = response.response_metadata.get("grounding_metadata")
    if grounding_meta:
        for chunk in grounding_meta.get("grounding_chunks", []):
            print(f"  Source: {chunk.get('web', {}).get('uri', 'N/A')}")
💡
Dynamic grounding threshold: Vertex AI supports a dynamic_retrieval_config with a threshold (0.0–1.0) to control when Google Search is triggered. A lower threshold means the model grounds more aggressively. This is configured at the API level via DynamicRetrievalConfig.

Side-by-Side Comparison

| Aspect | Azure AI Search Grounding | Vertex AI Google Search |
| --- | --- | --- |
| Data source | Your indexed enterprise documents | Live web via Google Search |
| Latency | Low (pre-indexed, hybrid retrieval) | Moderate (real-time web fetch) |
| Privacy | Data stays in your Azure tenant | Queries hit Google Search APIs |
| Best for | Internal knowledge bases, compliance docs, policy lookup | Real-time facts, public information, market data |
| Setup effort | Requires index creation + embedding pipeline | Zero setup (built into Vertex AI) |
| LangChain pattern | LCEL retriever chain | Built-in search tool passed at invocation time |

Production tip: For most enterprise use cases, combine both approaches. Use Azure AI Search for internal/private data grounding, and Vertex AI Google Search for supplementary real-time public information. Route via a LangGraph decision node that picks the right grounding source based on query classification.

Module 2 — Retrieval-Augmented Generation (RAG)

RAG is the dominant pattern for grounding LLMs against your own data without fine-tuning. This module walks through the full pipeline — from raw documents to a production retrieval chain — using LCEL and modern langchain-core abstractions.

📄 Load Docs → ✂️ Split Text → 🔢 Embed → 🗄️ Vector Store → 🔍 Retrieve → 💬 Generate

2.1 — Document Loading

LangChain provides loaders for PDFs, web pages, CSVs, Notion, and more. Each loader returns a list of Document objects with page_content and metadata.

from langchain_community.document_loaders import (
    PyPDFLoader,
    TextLoader,
    WebBaseLoader,
    CSVLoader,
)

# Load a PDF — each page becomes one Document
pdf_docs = PyPDFLoader("docs/architecture-guide.pdf").load()
print(f"Loaded {len(pdf_docs)} pages from PDF")

# Load a plain text file
txt_docs = TextLoader("docs/changelog.md", encoding="utf-8").load()

# Load a web page (requires beautifulsoup4)
web_docs = WebBaseLoader("https://docs.langchain.com/overview").load()

# Load a CSV — each row becomes one Document
csv_docs = CSVLoader("data/customers.csv").load()

# Combine all sources into a unified corpus
all_docs = pdf_docs + txt_docs + web_docs + csv_docs
print(f"Total documents loaded: {len(all_docs)}")
💡
Lazy loading: For large datasets, use .lazy_load() instead of .load() to get a generator that yields documents one at a time, avoiding memory spikes.
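
A lazily loaded corpus is usually consumed in batches so that only a handful of documents is in memory at once. The sketch below shows that batching step with the standard library only; the `batched` helper is hypothetical, and the generator of fake page strings stands in for whatever a loader's `.lazy_load()` would yield.

```python
from itertools import islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def batched(items: Iterable[T], batch_size: int) -> Iterator[list[T]]:
    """Yield lists of up to `batch_size` items without materializing `items`."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


# Stand-in for loader.lazy_load(): a generator of fake page texts
fake_pages = (f"page {i}" for i in range(5))
for batch in batched(fake_pages, batch_size=2):
    print(batch)  # each batch could be split, embedded, and indexed here
```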

2.2 — Text Splitting

Documents are rarely the right size for embedding. Splitters break them into semantically coherent chunks with configurable overlap to preserve context across boundaries.

from langchain_text_splitters import RecursiveCharacterTextSplitter

# RecursiveCharacterTextSplitter tries to split on paragraphs, then
# sentences, then words — preserving semantic units as much as possible.
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,        # Max characters per chunk
    chunk_overlap=200,       # Overlap to retain context across chunks
    separators=[             # Priority order of split boundaries
        "\n\n",               # Paragraph breaks first
        "\n",                 # Line breaks second
        ". ",                 # Sentence boundaries third
        " ",                  # Word boundaries last resort
    ],
    length_function=len,
)

# Split all loaded documents — metadata is preserved on each chunk
chunks = text_splitter.split_documents(all_docs)
print(f"Split into {len(chunks)} chunks")

# Inspect a chunk
print(f"Chunk 0 length: {len(chunks[0].page_content)} chars")
print(f"Chunk 0 metadata: {chunks[0].metadata}")

| Parameter | Guidance | Typical Range |
| --- | --- | --- |
| chunk_size | Larger chunks retain more context but reduce retrieval precision | 500 – 2000 chars |
| chunk_overlap | Overlap prevents information loss at chunk boundaries | 10–20% of chunk_size |
| separators | Tune to your doc format (e.g., add "## " for Markdown headers) | Paragraphs → sentences → words |
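
To see how chunk_size and chunk_overlap interact, consider a deliberately simplified fixed-width splitter. This is not how RecursiveCharacterTextSplitter works (it splits on separators first); the `sliding_chunks` function is a hypothetical illustration of the window arithmetic only, where each window starts `chunk_size - chunk_overlap` characters after the previous one.

```python
def sliding_chunks(text: str, chunk_size: int, chunk_overlap: int) -> list[str]:
    """Cut `text` into fixed-width windows that overlap by `chunk_overlap` chars."""
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must satisfy 0 <= chunk_overlap < chunk_size")
    stride = chunk_size - chunk_overlap
    chunks: list[str] = []
    start = 0
    while start < len(text):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # this window already reached the end of the text
        start += stride
    return chunks


sample = "abcdefghijklmnopqrstuvwxyz"  # 26 characters
windows = sliding_chunks(sample, chunk_size=10, chunk_overlap=2)
print(windows)  # → ['abcdefghij', 'ijklmnopqr', 'qrstuvwxyz']
```

The last two characters of each window repeat at the start of the next, which is the context carried across the boundary.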

2.3 — Embeddings

Embeddings convert text chunks into dense vectors for similarity search. Choose the embedding model that matches your vector store and retrieval needs.

import os
from dotenv import load_dotenv

load_dotenv()

# ── Option A: Azure OpenAI Embeddings ─────────────────────────────
from langchain_openai import AzureOpenAIEmbeddings

azure_embeddings = AzureOpenAIEmbeddings(
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    azure_deployment=os.environ["AZURE_OPENAI_EMBEDDINGS_DEPLOYMENT"],
    api_version=os.environ["AZURE_OPENAI_API_VERSION"],
)

# ── Option B: Google Vertex AI Embeddings ─────────────────────────
from langchain_google_vertexai import VertexAIEmbeddings

vertex_embeddings = VertexAIEmbeddings(
    model_name="text-embedding-005",
    project="your-gcp-project-id",
    location="us-central1",
)

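# ── Option C: Open-source / local (e.g., sentence-transformers) ───
# Requires: pip install langchain-huggingface sentence-transformers
# (the langchain_community import path for this class is deprecated)
from langchain_huggingface import HuggingFaceEmbeddings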

local_embeddings = HuggingFaceEmbeddings(
    model_name="sentence-transformers/all-MiniLM-L6-v2",
    model_kwargs={"device": "cpu"},  # Use "cuda" for GPU
)

# Pick one for the rest of this module
embeddings = azure_embeddings
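
Similarity search over embeddings typically comes down to comparing vectors, most commonly by cosine similarity. The sketch below computes it by hand on made-up three-dimensional vectors; real embedding vectors have hundreds or thousands of dimensions, and vector stores use optimized index structures rather than a Python loop.

```python
import math
from typing import Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two equal-length vectors (0.0 if either is zero)."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same length")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)


# Made-up vectors standing in for a query embedding and three chunk embeddings
query_vec = [1.0, 0.0, 1.0]
chunk_vecs = {
    "doc_a": [1.0, 0.0, 1.0],  # same direction as the query
    "doc_b": [0.0, 1.0, 0.0],  # orthogonal to the query
    "doc_c": [1.0, 1.0, 0.0],  # partially aligned
}

ranked = sorted(
    chunk_vecs,
    key=lambda name: cosine_similarity(query_vec, chunk_vecs[name]),
    reverse=True,
)
print(ranked)  # → ['doc_a', 'doc_c', 'doc_b']
```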

2.4 — Vector Store Initialization

The vector store indexes your embedded chunks for fast similarity retrieval. Chroma is shown here for local development; swap in Pinecone, Weaviate, pgvector, or Azure AI Search for production.

from langchain_chroma import Chroma

# Create a persistent Chroma vector store from the chunks
# This embeds all chunks and stores them on disk at ./chroma_db
vector_store = Chroma.from_documents(
    documents=chunks,
    embedding=embeddings,
    persist_directory="./chroma_db",       # Persistent storage path
    collection_name="enterprise_docs",    # Logical namespace
)

print(f"Indexed {vector_store._collection.count()} chunks in Chroma")

# To reload an existing store (no re-embedding needed)
vector_store = Chroma(
    persist_directory="./chroma_db",
    embedding_function=embeddings,
    collection_name="enterprise_docs",
)
ℹ️
Production vector stores: For production, use a managed service — langchain-pinecone for Pinecone, langchain-postgres with pgvector for PostgreSQL, or langchain-community's AzureSearch for Azure AI Search (shown in Module 1). Chroma is ideal for local prototyping and tests.

2.5 — Standard RAG Retrieval Chain (LCEL)

This is the canonical RAG pattern: retrieve relevant chunks, inject them into a prompt, and generate a grounded answer. The entire pipeline is a single LCEL expression.

from langchain_openai import AzureChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

# 1. Create a retriever from the vector store
retriever = vector_store.as_retriever(
    search_type="similarity",    # or "mmr" for max marginal relevance
    search_kwargs={"k": 5},       # Return top-5 most relevant chunks
)

# 2. LLM — reuse the Azure OpenAI model from Module 1
llm = AzureChatOpenAI(
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    azure_deployment=os.environ["AZURE_OPENAI_DEPLOYMENT"],
    api_version=os.environ["AZURE_OPENAI_API_VERSION"],
    temperature=0.0,
)

# 3. RAG prompt — instructs the model to answer from retrieved context only
rag_prompt = ChatPromptTemplate.from_messages([
    ("system", """You are a helpful assistant. Answer the user's question
using ONLY the provided context. If the answer is not in the context,
say "I don't have enough information to answer that."

Context:
{context}"""),
    ("human", "{question}"),
])

# 4. Helper to format retrieved Document objects into a single string
def format_docs(docs: list) -> str:
    """Join document contents with source metadata for traceability."""
    return "\n\n---\n\n".join(
        f"[Source: {d.metadata.get('source', 'unknown')}]\n{d.page_content}"
        for d in docs
    )

# 5. LCEL RAG chain
#    - "context" branch: query → retriever → format_docs
#    - "question" branch: query passes through unchanged
#    Both feed into the prompt → LLM → string parser
rag_chain = (
    {
        "context": retriever | format_docs,
        "question": RunnablePassthrough(),
    }
    | rag_prompt
    | llm
    | StrOutputParser()
)

# 6. Invoke the chain
answer: str = rag_chain.invoke("What are the key architectural decisions in our system?")
print(answer)
💡
Streaming: Replace .invoke() with .stream() to get token-by-token output: for chunk in rag_chain.stream("question"): print(chunk, end=""). Works out of the box with LCEL — no code changes needed to the chain itself.

Full Pipeline — End-to-End Script

Here's every step combined in a single, copy-pasteable script:

"""
Complete RAG pipeline — load, split, embed, store, retrieve, generate.
Install: pip install langchain langchain-openai langchain-chroma langchain-community pypdf python-dotenv
"""
import os
from dotenv import load_dotenv
from langchain_community.document_loaders import PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_openai import AzureChatOpenAI, AzureOpenAIEmbeddings
from langchain_chroma import Chroma
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

load_dotenv()

# ── Load ───────────────────────────────────────────────────────────
docs = PyPDFLoader("docs/architecture-guide.pdf").load()

# ── Split ──────────────────────────────────────────────────────────
splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
chunks = splitter.split_documents(docs)

# ── Embed + Store ─────────────────────────────────────────────────
embeddings = AzureOpenAIEmbeddings(
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    azure_deployment=os.environ["AZURE_OPENAI_EMBEDDINGS_DEPLOYMENT"],
    api_version=os.environ["AZURE_OPENAI_API_VERSION"],
)
store = Chroma.from_documents(chunks, embeddings, persist_directory="./chroma_db")

# ── Retrieve + Generate ───────────────────────────────────────────
retriever = store.as_retriever(search_kwargs={"k": 5})
llm = AzureChatOpenAI(
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    azure_deployment=os.environ["AZURE_OPENAI_DEPLOYMENT"],
    api_version=os.environ["AZURE_OPENAI_API_VERSION"],
    temperature=0.0,
)

prompt = ChatPromptTemplate.from_messages([
    ("system", "Answer using ONLY this context:\n{context}"),
    ("human", "{question}"),
])

def format_docs(docs):
    return "\n\n".join(d.page_content for d in docs)

chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt | llm | StrOutputParser()
)

print(chain.invoke("Summarize the key architectural decisions."))

2.6 — Advanced: Retrieval with Metadata Filtering

In production, you rarely want to search the entire corpus. Metadata filters let you scope retrieval to specific departments, document types, date ranges, or access levels — critical for multi-tenant and compliance-sensitive systems.

Filtered Retrieval search_kwargs → filter

Use when: You need to restrict results to a specific department, document type, date range, or tenant ID — common in enterprise multi-tenant RAG systems.

Multi-tenant · Access control · Scoped search · Compliance

Adding Rich Metadata During Ingestion

from langchain_core.documents import Document

# Enrich chunks with structured metadata during ingestion.
# This metadata enables precise filtered retrieval later.
enriched_chunks: list[Document] = []
for chunk in chunks:
    # Preserve original metadata and add business context
    chunk.metadata.update({
        "department": "engineering",       # Business unit tag
        "doc_type": "architecture",        # Document classification
        "year": 2025,                      # Temporal filter
        "access_level": "internal",       # Access control tier
    })
    enriched_chunks.append(chunk)

# Re-index with enriched metadata
filtered_store = Chroma.from_documents(
    documents=enriched_chunks,
    embedding=embeddings,
    persist_directory="./chroma_filtered_db",
    collection_name="enterprise_docs_filtered",
)

Retrieving with Metadata Filters

# ── Filter 1: Single field — only "engineering" department docs ────
eng_retriever = filtered_store.as_retriever(
    search_type="similarity",
    search_kwargs={
        "k": 5,
        "filter": {"department": "engineering"},
    },
)

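# ── Filter 2: Multiple fields (AND logic) ─────────────────────────
# Recent Chroma versions expect multiple conditions to be wrapped in an
# explicit $and; a plain dict with several keys is rejected.
scoped_retriever = filtered_store.as_retriever(
    search_type="similarity",
    search_kwargs={
        "k": 5,
        "filter": {
            "$and": [
                {"department": "engineering"},
                {"year": 2025},
                {"access_level": "internal"},
            ]
        },
    },
)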

# ── Filter 3: Chroma advanced operators ($gte, $in, $and, $or) ────
advanced_retriever = filtered_store.as_retriever(
    search_type="similarity",
    search_kwargs={
        "k": 5,
        "filter": {
            "$and": [
                {"department": {"$in": ["engineering", "product"]}},
                {"year": {"$gte": 2024}},
            ]
        },
    },
)

Filtered RAG Chain — Complete Example

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

# Build a RAG chain that only retrieves engineering and product docs from 2024 onward
filtered_rag_chain = (
    {
        "context": advanced_retriever | format_docs,
        "question": RunnablePassthrough(),
    }
    | rag_prompt
    | llm
    | StrOutputParser()
)

# This query will only search engineering + product docs from 2024+
answer = filtered_rag_chain.invoke(
    "What microservices patterns did we adopt in the latest architecture review?"
)
print(answer)

Dynamic Filters — User-Driven Scoping

In real apps, filters come from the user's session (role, department, tenant). Here's a pattern using RunnableLambda to inject dynamic filters at runtime:

from langchain_core.runnables import RunnableLambda

def get_filtered_retriever(input_dict: dict) -> str:
    """Retrieve and format docs scoped to the user's department and access level."""
    # Recent Chroma versions expect multiple conditions under an explicit $and
    user_filters = {
        "$and": [
            {"department": input_dict["user_department"]},
            {"access_level": input_dict["user_access_level"]},
        ]
    }
    retriever = filtered_store.as_retriever(
        search_kwargs={"k": 5, "filter": user_filters}
    )
    # Run the retriever with the user's actual question
    docs = retriever.invoke(input_dict["question"])
    return format_docs(docs)

# Dynamic RAG chain — filters are resolved at invocation time
dynamic_rag_chain = (
    {
        "context": RunnableLambda(get_filtered_retriever),
        "question": lambda x: x["question"],
    }
    | rag_prompt
    | llm
    | StrOutputParser()
)

# Invoke with user context — retrieval is automatically scoped
answer = dynamic_rag_chain.invoke({
    "question": "What's the deployment process for our API gateway?",
    "user_department": "engineering",
    "user_access_level": "internal",
})
print(answer)
Filter syntax varies by store: The filter dict syntax shown above is Chroma-specific. Pinecone uses a similar dict-based syntax with its own operator set. The langchain-postgres PGVector store also takes a dict-based filter, again with its own operators. Azure AI Search takes OData filter expressions as strings. Always consult your vector store's docs for the exact filter grammar.
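
When filters are assembled from optional user attributes, a small builder avoids hand-writing the nested dict each time. The sketch below is a hypothetical helper, not a LangChain or Chroma API: it assumes the Chroma-style convention used in Filter 3 above, where several conditions are combined under a single top-level `$and`, and it skips any attribute whose value is None.

```python
from typing import Any, Optional


def build_chroma_filter(conditions: dict[str, Any]) -> Optional[dict[str, Any]]:
    """Combine field conditions into one Chroma-style filter dict.

    None values are skipped, a single condition is returned as-is,
    and several conditions are wrapped in `$and`.
    """
    clauses = [
        {field: value} for field, value in conditions.items() if value is not None
    ]
    if not clauses:
        return None  # no filter: search the whole collection
    if len(clauses) == 1:
        return clauses[0]
    return {"$and": clauses}


print(build_chroma_filter({"department": "engineering", "year": {"$gte": 2024}}))
print(build_chroma_filter({"department": "engineering", "access_level": None}))
```

The result can be passed as the `filter` entry of `search_kwargs`; for other vector stores the builder would need to emit that store's own grammar.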

Retrieval Strategy Comparison

| Strategy | When to Use | LangChain Config |
| --- | --- | --- |
| Similarity | Default: nearest neighbors by vector similarity (the distance metric depends on the store) | search_type="similarity" |
| MMR | When you need diverse results (avoid near-duplicates) | search_type="mmr", search_kwargs={"fetch_k": 20, "lambda_mult": 0.5} |
| Similarity + Score | When you need a relevance threshold (discard low-quality matches) | search_type="similarity_score_threshold", search_kwargs={"score_threshold": 0.7} |
| Filtered | Multi-tenant, access-controlled, or time-bounded retrieval | search_kwargs={"filter": {...}} |
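
The difference between plain similarity and MMR is easiest to see on a tiny example. The following is a sketch of the standard greedy maximal-marginal-relevance selection in plain Python; store-specific implementations in LangChain may differ in detail, and the two-dimensional vectors are made up for illustration. Here `lambda_mult=1.0` reduces to pure relevance, while lower values penalize candidates that resemble ones already chosen.

```python
import math
from typing import Sequence

Vector = Sequence[float]


def cosine(a: Vector, b: Vector) -> float:
    """Cosine similarity; returns 0.0 if either vector has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def mmr_select(
    query: Vector, candidates: Sequence[Vector], k: int, lambda_mult: float = 0.5
) -> list[int]:
    """Greedily pick up to k candidate indices, trading relevance against redundancy."""
    remaining = list(range(len(candidates)))
    selected: list[int] = []
    while remaining and len(selected) < k:

        def score(i: int) -> float:
            relevance = cosine(query, candidates[i])
            # Redundancy: similarity to the closest already-selected candidate
            redundancy = max(
                (cosine(candidates[i], candidates[j]) for j in selected),
                default=0.0,
            )
            return lambda_mult * relevance - (1.0 - lambda_mult) * redundancy

        best = max(remaining, key=score)
        selected.append(best)
        remaining.remove(best)
    return selected


# Candidates 0 and 1 are near-duplicates; candidate 2 points elsewhere
query = [1.0, 0.5]
candidates = [[1.0, 0.4], [1.0, 0.45], [0.2, 1.0]]

print(mmr_select(query, candidates, k=2, lambda_mult=1.0))  # → [1, 0]
print(mmr_select(query, candidates, k=2, lambda_mult=0.5))  # → [1, 2]
```

With pure relevance both near-duplicates are returned; with `lambda_mult=0.5` the second slot goes to the less similar but non-redundant candidate.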

Module 3 — Memory Management

LLMs are stateless — every call starts from scratch. Memory gives your chains conversational continuity. LangChain provides two tiers: short-term (within a session) and long-term (across sessions, persisted to a database). Both integrate cleanly with LCEL via RunnableWithMessageHistory.

💬 User Message → 📖 Load History → 🧠 LLM (with context) → 💾 Save to Store

3.1 — Short-Term Memory (In-Session Conversation History)

Short-term memory tracks the current conversation using a session_id. The RunnableWithMessageHistory wrapper automatically loads past messages before each call and saves new ones after. For in-memory (non-persistent) use cases, ChatMessageHistory is the simplest store.

Core Concept: RunnableWithMessageHistory

import os
from dotenv import load_dotenv
from langchain_openai import AzureChatOpenAI
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.output_parsers import StrOutputParser
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_community.chat_message_histories import ChatMessageHistory

load_dotenv()

# 1. LLM setup
llm = AzureChatOpenAI(
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    azure_deployment=os.environ["AZURE_OPENAI_DEPLOYMENT"],
    api_version=os.environ["AZURE_OPENAI_API_VERSION"],
    temperature=0.3,
)

# 2. Prompt with a placeholder for injected chat history
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant. Use the conversation history to maintain context."),
    MessagesPlaceholder(variable_name="history"),  # ← history is injected here
    ("human", "{input}"),
])

# 3. LCEL chain — no memory awareness needed at the chain level
chain = prompt | llm | StrOutputParser()

# 4. In-memory session store — maps session_id → ChatMessageHistory
#    In production, replace this dict with a database-backed store.
session_store: dict[str, ChatMessageHistory] = {}

def get_session_history(session_id: str) -> BaseChatMessageHistory:
    """Return or create the message history for a given session."""
    if session_id not in session_store:
        session_store[session_id] = ChatMessageHistory()
    return session_store[session_id]

# 5. Wrap the chain with message history management
chain_with_history = RunnableWithMessageHistory(
    chain,
    get_session_history,
    input_messages_key="input",       # Key in the input dict for user message
    history_messages_key="history",   # Matches the MessagesPlaceholder name
)

# 6. Invoke with a session_id — history is auto-managed
config = {"configurable": {"session_id": "user-123-session-abc"}}

# Turn 1
r1 = chain_with_history.invoke({"input": "My name is Alice and I work in the data team."}, config=config)
print(f"Turn 1: {r1}")

# Turn 2 — the model remembers Alice's name and team
r2 = chain_with_history.invoke({"input": "What team did I say I work in?"}, config=config)
print(f"Turn 2: {r2}")  # e.g. "You said you work in the data team." (exact wording varies)

# Turn 3 — different session = fresh history
config_new = {"configurable": {"session_id": "user-456-session-xyz"}}
r3 = chain_with_history.invoke({"input": "Do you know my name?"}, config=config_new)
print(f"Turn 3 (new session): {r3}")  # e.g. "I don't know your name yet." (exact wording varies)
💡
Streaming with history: chain_with_history.stream({"input": "..."}, config=config) works out of the box — history is loaded/saved while tokens stream to the client.

How It Works Under the Hood

| Step | What Happens |
| --- | --- |
| 1. Before invoke | get_session_history(session_id) retrieves the history object |
| 2. Inject | Past messages are injected into the MessagesPlaceholder("history") slot |
| 3. LLM call | The chain runs with the full conversation context |
| 4. After invoke | The user input and AI response are appended to the history for next turn |
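
The four steps above can be sketched in a few lines of plain Python. This is a conceptual illustration only, not how RunnableWithMessageHistory is implemented: the `toy_store` dict, the `run_turn` function, and the `echo_model` stand-in are all hypothetical.

```python
from typing import Callable

Message = tuple[str, str]  # (role, text)

# session_id -> message list; a plain-dict stand-in for a history store
toy_store: dict[str, list[Message]] = {}


def run_turn(
    session_id: str, user_input: str, model: Callable[[list[Message]], str]
) -> str:
    history = toy_store.setdefault(session_id, [])  # 1. load (or create) history
    messages = history + [("human", user_input)]    # 2. inject history before the new input
    reply = model(messages)                         # 3. call the model with full context
    history.extend([("human", user_input), ("ai", reply)])  # 4. save both sides of the turn
    return reply


def echo_model(messages: list[Message]) -> str:
    """Hypothetical model: reports how many earlier messages it was shown."""
    return f"seen {len(messages) - 1} earlier messages"


print(run_turn("s1", "hello", echo_model))  # → seen 0 earlier messages
print(run_turn("s1", "again", echo_model))  # → seen 2 earlier messages
print(run_turn("s2", "hi", echo_model))     # → seen 0 earlier messages
```

The second call in session "s1" sees the two messages saved by the first, while session "s2" starts empty.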

Trimming History (Token Budget Management)

Unbounded history will eventually exceed the model's context window. Use trim_messages to keep the history within a token budget:

from operator import itemgetter

from langchain_core.messages import trim_messages
from langchain_core.runnables import RunnablePassthrough

# Trim strategy: keep only the most recent messages that fit the token budget
trimmer = trim_messages(
    max_tokens=4000,                    # Max tokens to keep in history
    strategy="last",                     # Keep the most recent messages
    token_counter=llm,                   # Use the LLM's tokenizer for accuracy
    include_system=True,                # Keep a leading system message if the list has one
    allow_partial=False,                # Don't split messages mid-content
    start_on="human",                   # Trimmed history should begin with a human turn
)

# The trimmer expects a list of messages, but the chain input is a dict
# ({"input": ..., "history": [...]}), so apply it to the "history" key only.
chain_with_trimming = (
    RunnablePassthrough.assign(history=itemgetter("history") | trimmer)
    | prompt
    | llm
    | StrOutputParser()
)

# Wrap with history as before
trimmed_chain_with_history = RunnableWithMessageHistory(
    chain_with_trimming,
    get_session_history,
    input_messages_key="input",
    history_messages_key="history",
)
ℹ️
Alternatives to trimming: For smarter compression, use a summary memory pattern — periodically summarize older messages into a single summary message, keeping recent turns verbatim. This preserves long-range context while staying within token limits.
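
A minimal sketch of the summary memory pattern in plain Python. It assumes messages are simple (role, text) tuples; `compress_history` and `naive_summarize` are hypothetical helpers, and in a real system the summarizer would be an LLM call rather than string concatenation.

```python
from typing import Callable

Message = tuple[str, str]  # (role, text), simplified

def compress_history(
    messages: list[Message],
    keep_last: int,
    summarize: Callable[[list[Message]], str],
) -> list[Message]:
    """Fold everything except the last `keep_last` messages into one summary message."""
    if keep_last <= 0:
        raise ValueError("keep_last must be positive")
    if len(messages) <= keep_last:
        return list(messages)  # nothing old enough to summarize
    older, recent = messages[:-keep_last], messages[-keep_last:]
    summary = summarize(older)
    return [("system", f"Summary of earlier conversation: {summary}"), *recent]

def naive_summarize(messages: list[Message]) -> str:
    """Placeholder summarizer; in practice this would be an LLM call."""
    return " | ".join(text for _, text in messages)

history = [
    ("human", "I'm Alice from the data team."),
    ("ai", "Nice to meet you, Alice."),
    ("human", "We use PostgreSQL."),
    ("ai", "Noted."),
]
compressed = compress_history(history, keep_last=2, summarize=naive_summarize)
for role, text in compressed:
    print(f"{role}: {text}")
```

The two most recent messages stay verbatim, and the older ones collapse into a single system message, so the prompt keeps long-range facts at a fraction of the token cost.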

3.2 — Long-Term Memory (Persistent Across Sessions)

Long-term memory persists conversation history to external databases so it survives server restarts, session timeouts, and re-deployments. LangChain provides drop-in history backends for Redis, PostgreSQL, and more. Simply swap the get_session_history function — the rest of the chain stays identical.

Redis-Backed Memory langchain-redis

Best for: Fast read/write, session-oriented apps, microservices with shared state. Redis TTL auto-expires old sessions.

Sub-ms latency TTL expiration Horizontally scalable

Option A: Redis

# pip install langchain-redis redis
from langchain_redis import RedisChatMessageHistory
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.runnables.history import RunnableWithMessageHistory

# Redis connection URL — use rediss:// for TLS in production
REDIS_URL = "redis://localhost:6379/0"

def get_redis_history(session_id: str) -> BaseChatMessageHistory:
    """Persistent history backed by Redis. Messages survive restarts."""
    return RedisChatMessageHistory(
        session_id=session_id,
        redis_url=REDIS_URL,
        ttl=86400,  # Auto-expire after 24 hours (optional)
    )

# Wrap your existing LCEL chain — zero changes to the chain itself
redis_chain = RunnableWithMessageHistory(
    chain,                              # Same chain from 3.1
    get_redis_history,
    input_messages_key="input",
    history_messages_key="history",
)

# Session persists across app restarts — Redis stores the messages
config = {"configurable": {"session_id": "user-123-persistent"}}
response = redis_chain.invoke({"input": "Remember: our API rate limit is 1000 req/min."}, config=config)
print(response)

# Later (even after restart) the model still knows the rate limit
response = redis_chain.invoke({"input": "What's our API rate limit?"}, config=config)
print(response)  # → "Your API rate limit is 1000 requests per minute."
PostgreSQL-Backed Memory langchain-postgres

Best for: Apps already using PostgreSQL, compliance requirements for durable storage, complex queries on history.

ACID durable SQL queryable Existing infra

Option B: PostgreSQL

# pip install langchain-postgres psycopg[binary]
from langchain_postgres import PostgresChatMessageHistory
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.runnables.history import RunnableWithMessageHistory
import psycopg

# Connection string — use a secrets manager in production, never hardcode
PG_CONN = "postgresql://user:password@localhost:5432/chatdb"

# Create the messages table (run once at startup).
# create_tables takes both arguments positionally.
with psycopg.connect(PG_CONN) as setup_conn:
    PostgresChatMessageHistory.create_tables(setup_conn, "chat_history")

def get_pg_history(session_id: str) -> BaseChatMessageHistory:
    """Persistent history stored in a PostgreSQL table."""
    return PostgresChatMessageHistory(
        "chat_history",                     # table_name (positional-only)
        session_id,                         # must be a valid UUID string
        sync_connection=psycopg.connect(PG_CONN),
    )

# Same pattern: swap the history factory, chain stays unchanged
pg_chain = RunnableWithMessageHistory(
    chain,
    get_pg_history,
    input_messages_key="input",
    history_messages_key="history",
)

# langchain-postgres rejects session IDs that are not UUIDs,
# so derive a stable UUID from your own session key
import uuid
session_id = str(uuid.uuid5(uuid.NAMESPACE_URL, "user-789-durable"))
config = {"configurable": {"session_id": session_id}}
response = pg_chain.invoke({"input": "I prefer Python over JavaScript."}, config=config)
print(response)
Connection management: In production, use a connection pool (e.g., psycopg_pool.ConnectionPool) instead of creating a new connection per request. The snippet above uses psycopg.connect() for clarity.
Vector Store-Based Long-Term Memory Semantic recall

Best for: Recalling relevant past conversations by meaning, not just replaying the last N turns. Ideal for personalization and user preference recall across many sessions.

Semantic retrieval Cross-session Personalization

Option C: Vector Store Memory (Semantic Long-Term Recall)

Instead of replaying raw message history, this pattern embeds past interactions and retrieves the most relevant ones via similarity search. This is powerful for agents that need to recall facts from hundreds of past sessions.

import os
from datetime import datetime
from langchain_openai import AzureChatOpenAI, AzureOpenAIEmbeddings
from langchain_chroma import Chroma
from langchain_core.documents import Document
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableLambda

embeddings = AzureOpenAIEmbeddings(
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    azure_deployment=os.environ["AZURE_OPENAI_EMBEDDINGS_DEPLOYMENT"],
    api_version=os.environ["AZURE_OPENAI_API_VERSION"],
)

# A dedicated vector store for long-term memory (separate from RAG docs)
memory_store = Chroma(
    collection_name="long_term_memory",
    embedding_function=embeddings,
    persist_directory="./memory_db",
)

# ── Save a memory ─────────────────────────────────────────────────
def save_memory(user_id: str, user_msg: str, ai_msg: str) -> None:
    """Embed and store a conversation turn for long-term recall."""
    memory_doc = Document(
        page_content=f"User: {user_msg}\nAssistant: {ai_msg}",
        metadata={
            "user_id": user_id,
            "timestamp": datetime.now().isoformat(),
        },
    )
    memory_store.add_documents([memory_doc])

# ── Recall relevant memories ──────────────────────────────────────
def recall_memories(input_dict: dict) -> str:
    """Retrieve past interactions semantically relevant to the current query."""
    retriever = memory_store.as_retriever(
        search_kwargs={
            "k": 3,
            "filter": {"user_id": input_dict["user_id"]},
        }
    )
    docs = retriever.invoke(input_dict["input"])
    if not docs:
        return "No relevant past interactions found."
    return "\n---\n".join(d.page_content for d in docs)

# ── Chain with semantic memory ────────────────────────────────────
llm = AzureChatOpenAI(
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    azure_deployment=os.environ["AZURE_OPENAI_DEPLOYMENT"],
    api_version=os.environ["AZURE_OPENAI_API_VERSION"],
    temperature=0.3,
)

memory_prompt = ChatPromptTemplate.from_messages([
    ("system", """You are a personalized assistant with long-term memory.
Use these relevant past interactions to personalize your response:

{memories}"""),
    ("human", "{input}"),
])

memory_chain = (
    {
        "memories": RunnableLambda(recall_memories),
        "input": lambda x: x["input"],
    }
    | memory_prompt
    | llm
    | StrOutputParser()
)

# ── Usage: Save some memories, then query ─────────────────────────
save_memory("alice", "I prefer dark mode in all my tools.", "Noted! I'll remember your preference for dark mode.")
save_memory("alice", "Our tech stack is FastAPI + React + PostgreSQL.", "Got it — FastAPI backend, React frontend, PostgreSQL database.")

# Later — even in a new session — the model recalls relevant past context
result = memory_chain.invoke({
    "user_id": "alice",
    "input": "Can you suggest a deployment setup for our stack?",
})
print(result)  # → References FastAPI + React + PostgreSQL from memory
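
To make the retrieval step concrete, here is a toy version of "filter by user, then rank by similarity" using only the standard library. The bag-of-words `embed` function is a deliberately crude stand-in for a real embedding model, and `recall` is a hypothetical helper, not part of Chroma or LangChain.

```python
import math
from collections import Counter

def embed(text: str) -> Counter:
    """Toy bag-of-words 'embedding' (real systems use a learned embedding model)."""
    return Counter(text.lower().split())

def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse word-count vectors."""
    dot = sum(a[t] * b[t] for t in a)
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0

def recall(memories: list[dict], user_id: str, query: str, k: int = 3) -> list[str]:
    """Filter by user, then return the k memories most similar to the query."""
    q = embed(query)
    candidates = [m for m in memories if m["user_id"] == user_id]
    ranked = sorted(candidates, key=lambda m: cosine(embed(m["text"]), q), reverse=True)
    return [m["text"] for m in ranked[:k]]

memories = [
    {"user_id": "alice", "text": "I prefer dark mode in all my tools"},
    {"user_id": "alice", "text": "our tech stack is FastAPI React PostgreSQL"},
    {"user_id": "bob", "text": "our stack is Django"},
]
top = recall(memories, "alice", "suggest a deployment setup for our stack", k=1)
print(top)
```

The metadata filter runs before ranking, so another user's memories can never be returned regardless of how similar they are to the query.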

Memory Strategy Comparison

Strategy | Scope | Backend | Best For
ChatMessageHistory (in-memory) | Single process, lost on restart | Python dict | Prototyping, tests
Redis | Cross-process, TTL expirable | Redis server | Session-oriented apps, microservices
PostgreSQL | Durable, queryable via SQL | PostgreSQL | Compliance, audit trails, existing PG infra
Vector Store | Semantic cross-session recall | Chroma, Pinecone, pgvector | Personalization, long-running agents, user preference recall
💡
Hybrid approach: Production systems often combine strategies — Redis for fast short-term session history (last N turns) plus a vector store for long-term semantic recall. The prompt receives both recent history and retrieved long-term memories, giving the model both recency and relevance.
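
A rough sketch of how the two sources might be merged into prompt variables. `build_hybrid_inputs` is a hypothetical helper; it assumes the prompt has a history placeholder plus `{memories}` and `{input}` variables, as in the examples above, and that history fetching and memory recall happen elsewhere.

```python
def build_hybrid_inputs(
    user_input: str,
    recent_history: list[tuple[str, str]],
    recalled_memories: list[str],
    max_recent: int = 6,
) -> dict:
    """Assemble prompt variables from short-term history and long-term recall."""
    memories_text = (
        "\n---\n".join(recalled_memories)
        if recalled_memories
        else "No relevant past interactions found."
    )
    return {
        "input": user_input,
        "history": recent_history[-max_recent:],  # recency: last N messages only
        "memories": memories_text,                # relevance: semantic matches
    }

inputs = build_hybrid_inputs(
    "Which database should the new service use?",
    recent_history=[("human", "Hi"), ("ai", "Hello!"), ("human", "I have a design question.")],
    recalled_memories=["User: Our tech stack is FastAPI + React + PostgreSQL."],
    max_recent=2,
)
print(inputs["history"])
print(inputs["memories"])
```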

Module 4 — Tools & Model Context Protocol

Tools give LLMs the ability to take actions — call APIs, query databases, run calculations, or interact with external systems. LangChain provides a clean abstraction: define a tool, bind it to a model, and let the model decide when and how to invoke it. The Model Context Protocol (MCP) extends this by standardizing how tools and context are exposed across platforms.

🔧 Define Tool 🔗 Bind to LLM 🤖 Model Decides ⚡ Execute 💬 Return Result

4.1 — Adding Tools

LangChain offers two primary ways to create tools: the @tool decorator for simple functions and StructuredTool for more control over schema and validation.

Method 1: @tool Decorator (Simple & Fast)

from langchain_core.tools import tool

@tool
def get_weather(city: str) -> str:
    """Get the current weather for a given city.

    Args:
        city: The name of the city to look up weather for.
    """
    # In production, call a real weather API here
    weather_data = {
        "London": "15°C, cloudy",
        "New York": "22°C, sunny",
        "Tokyo": "18°C, light rain",
    }
    return weather_data.get(city, f"Weather data not available for {city}")

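import ast
import operator

# Whitelisted arithmetic operators; anything else (names, calls, **) is rejected
_ALLOWED_BINOPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
}

def _eval_arithmetic(node: ast.AST) -> float:
    """Recursively evaluate a parsed expression made only of numbers and + - * /."""
    if isinstance(node, ast.Constant) and type(node.value) in (int, float):
        return node.value
    if isinstance(node, ast.BinOp) and type(node.op) in _ALLOWED_BINOPS:
        return _ALLOWED_BINOPS[type(node.op)](_eval_arithmetic(node.left), _eval_arithmetic(node.right))
    if isinstance(node, ast.UnaryOp) and isinstance(node.op, (ast.UAdd, ast.USub)):
        value = _eval_arithmetic(node.operand)
        return -value if isinstance(node.op, ast.USub) else value
    raise ValueError("expression contains unsupported syntax")

@tool
def calculate(expression: str) -> str:
    """Evaluate a mathematical expression safely.

    Args:
        expression: A mathematical expression to evaluate, e.g. '2 + 2 * 3'.
    """
    # Parse to an AST and walk it instead of calling eval(), which is unsafe on untrusted input
    try:
        tree = ast.parse(expression.strip(), mode="eval")
        return str(round(float(_eval_arithmetic(tree.body)), 6))
    except Exception as e:
        return f"Error: {e}"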

# Inspect the tool schema — this is what the LLM sees
print(get_weather.name)         # → "get_weather"
print(get_weather.description)  # → the full docstring, Args section included (use @tool(parse_docstring=True) to keep only the summary)
print(get_weather.args_schema.model_json_schema())  # → JSON schema for args
💡
Docstrings matter: The @tool decorator uses the function's docstring as the tool description the LLM sees. Write clear, specific descriptions — they directly affect how reliably the model selects and invokes your tool.

Method 2: StructuredTool (Full Control)

Use StructuredTool when you need a Pydantic model for input validation, custom error handling, or when the tool wraps an existing function you don't want to decorate.

from langchain_core.tools import StructuredTool
from pydantic import BaseModel, Field

# Define a Pydantic schema for rich input validation
class SearchInput(BaseModel):
    """Input schema for the enterprise search tool."""
    query: str = Field(description="The search query string")
    department: str = Field(
        default="all",
        description="Filter results to a specific department",
    )
    max_results: int = Field(
        default=5,
        ge=1,
        le=20,
        description="Maximum number of results to return (1-20)",
    )

# The actual implementation function
def search_enterprise_docs(query: str, department: str = "all", max_results: int = 5) -> str:
    """Search internal documents. In production, call your search API here."""
    return f"Found {max_results} results for '{query}' in {department} department."

# Create the tool with explicit schema, name, and description
search_tool = StructuredTool.from_function(
    func=search_enterprise_docs,
    name="search_enterprise_docs",
    description="Search the company's internal document repository. Use this when the user asks about internal policies, procedures, or company information.",
    args_schema=SearchInput,
    return_direct=False,  # False = LLM can process the result before responding
)

# Test it directly
print(search_tool.invoke({"query": "vacation policy", "department": "hr"}))

Async Tools

For I/O-bound operations (API calls, database queries), define async tools to avoid blocking:

import httpx
from langchain_core.tools import tool

@tool
async def fetch_stock_price(symbol: str) -> str:
    """Fetch the current stock price for a given ticker symbol.

    Args:
        symbol: Stock ticker symbol, e.g. 'AAPL', 'GOOGL'.
    """
    async with httpx.AsyncClient() as client:
        # Example API call — replace with your actual data provider
        resp = await client.get(
            f"https://api.example.com/stock/{symbol}",
            timeout=10.0,
        )
        resp.raise_for_status()
        data = resp.json()
        return f"{symbol}: ${data['price']:.2f}"

4.2 — Tool Binding

Defining tools is only half the story. You must bind them to a chat model so it knows they exist and can decide when to call them. LangChain's .bind_tools() method attaches tool schemas to the model. The model then returns tool_calls in its response when it decides a tool should be invoked.

Binding Tools to a Model

import os
from dotenv import load_dotenv
from langchain_openai import AzureChatOpenAI

load_dotenv()

llm = AzureChatOpenAI(
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    azure_deployment=os.environ["AZURE_OPENAI_DEPLOYMENT"],
    api_version=os.environ["AZURE_OPENAI_API_VERSION"],
    temperature=0.0,
)

# Bind tools — the model now knows about get_weather, calculate, search_tool
tools = [get_weather, calculate, search_tool]
llm_with_tools = llm.bind_tools(tools)

# Ask a question that should trigger a tool call
response = llm_with_tools.invoke("What's the weather in London?")

# The model doesn't call the tool itself — it returns a tool_call request
print(response.content)       # → "" (empty when tool call is made)
print(response.tool_calls)    # → [{'name': 'get_weather', 'args': {'city': 'London'}, 'id': '...'}]
ℹ️
Key concept: bind_tools() doesn't execute tools. The model returns a tool_calls list describing which tool to call and with what arguments. Your code (or an agent runtime such as LangGraph's create_react_agent) is responsible for actually executing the tool and feeding the result back. This is the standard "tool calling" protocol supported by OpenAI, Azure, Gemini, and Anthropic models.

Complete Tool Calling Loop (Manual)

Here's the full loop: invoke the model → detect tool calls → execute tools → feed results back → get final answer.

from langchain_core.messages import HumanMessage, ToolMessage

# Step 1: User asks a question
messages = [HumanMessage(content="What's 15% of 340, and what's the weather in Tokyo?")]

# Step 2: Model decides which tools to call (may call multiple)
ai_response = llm_with_tools.invoke(messages)
messages.append(ai_response)  # Add AI response to conversation

# Step 3: Execute each tool call and collect results
tool_map = {t.name: t for t in tools}
for tool_call in ai_response.tool_calls:
    tool_fn = tool_map[tool_call["name"]]
    result = tool_fn.invoke(tool_call["args"])
    # ToolMessage carries the result back, linked by tool_call_id
    messages.append(ToolMessage(
        content=str(result),
        tool_call_id=tool_call["id"],
    ))

# Step 4: Model synthesizes tool results into a natural language answer
final_response = llm_with_tools.invoke(messages)
print(final_response.content)
# → "15% of 340 is 51. The weather in Tokyo is 18°C with light rain."
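
The loop above handles a single round of tool calls. Models sometimes request further tools after seeing the first results, so a more general version repeats until the reply contains no tool calls, with a cap on rounds. The sketch below shows that control flow in plain Python under simplifying assumptions: messages are dicts rather than LangChain message objects, and `run_tool_loop` and `scripted_model` are hypothetical stand-ins.

```python
from typing import Callable

def run_tool_loop(
    model: Callable[[list[dict]], dict],
    tools: dict[str, Callable[..., str]],
    messages: list[dict],
    max_rounds: int = 5,
) -> str:
    """Call the model until it stops requesting tools or the round cap is hit."""
    for _ in range(max_rounds):
        reply = model(messages)
        messages.append(reply)
        if not reply.get("tool_calls"):
            return reply["content"]  # final natural-language answer
        for call in reply["tool_calls"]:
            fn = tools.get(call["name"])
            try:
                result = fn(**call["args"]) if fn else f"Error: unknown tool {call['name']}"
            except Exception as exc:  # report failures back to the model instead of crashing
                result = f"Error: {exc}"
            messages.append({"role": "tool", "content": str(result), "tool_call_id": call["id"]})
    raise RuntimeError(f"No final answer after {max_rounds} rounds")

def scripted_model(messages: list[dict]) -> dict:
    """Scripted stand-in for a chat model: requests one tool, then answers."""
    tool_results = [m for m in messages if m.get("role") == "tool"]
    if not tool_results:
        return {"role": "ai", "content": "", "tool_calls": [
            {"name": "get_weather", "args": {"city": "Tokyo"}, "id": "call_1"}]}
    return {"role": "ai", "content": f"Tokyo: {tool_results[-1]['content']}", "tool_calls": []}

answer = run_tool_loop(
    scripted_model,
    {"get_weather": lambda city: "18°C, light rain"},
    [{"role": "human", "content": "Weather in Tokyo?"}],
)
print(answer)
```

The round cap guards against a model that keeps requesting tools indefinitely, and returning errors as tool results gives the model a chance to recover.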

LCEL Chain with Auto Tool Execution

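For simpler cases, use tool_choice to force a specific tool and pipe the resulting tool call directly into that tool:

from langchain_core.messages import AIMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableLambda

prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant with access to tools."),
    ("human", "{input}"),
])

# Force the model to always call a specific tool
forced_weather = llm.bind_tools([get_weather], tool_choice="get_weather")

def run_forced_call(ai_msg: AIMessage) -> str:
    """Execute the single tool call the model was forced to make."""
    return get_weather.invoke(ai_msg.tool_calls[0]["args"])

# prompt → model (emits the tool call) → tool (executes it)
weather_chain = prompt | forced_weather | RunnableLambda(run_forced_call)
print(weather_chain.invoke({"input": "How is the weather in Tokyo?"}))

# Or let the model decide freely (tool_choice="auto" is the default)
auto_tools = llm.bind_tools(tools, tool_choice="auto")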

4.3 — Model Context Protocol (MCP)

The Model Context Protocol is an open standard (originally from Anthropic) that defines how LLMs discover and invoke tools, access resources, and receive context — regardless of the model provider or client. Think of it as a USB-C for AI tools: one standard interface that works everywhere.

MCP in the LangChain Ecosystem langchain-mcp-adapters

How it fits: The langchain-mcp-adapters package bridges MCP servers (which expose tools via the MCP protocol) with LangChain's tool abstraction. Any MCP server becomes a set of LangChain-compatible tools — usable with bind_tools(), agents, and LangGraph.

Open standard Provider-agnostic Tool interop Stdio & SSE transports

MCP Architecture

Component | Role | Example
MCP Server | Exposes tools, resources, and prompts via the MCP protocol | A file system server, a database server, a GitHub server
MCP Client | Connects to servers, discovers tools, and routes invocations | MultiServerMCPClient from langchain-mcp-adapters
LangChain Bridge | Converts MCP tools → LangChain BaseTool instances | Automatic via adapter; tools appear native to LangChain
Transport | Communication channel between client and server | stdio (local process), sse (HTTP streaming), streamable_http (the newer HTTP transport that supersedes SSE in the MCP spec)

Connecting an MCP Server to LangChain (Stdio Transport)

# pip install langchain-mcp-adapters
import asyncio
import os
from dotenv import load_dotenv
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_openai import AzureChatOpenAI
from langgraph.prebuilt import create_react_agent

load_dotenv()

async def main():
    # 1. Configure MCP server connections
    #    Each server is a separate process communicating via stdio
    mcp_client = MultiServerMCPClient(
        {
            "filesystem": {
                "command": "npx",
                "args": [
                    "-y",
                    "@modelcontextprotocol/server-filesystem",
                    "/path/to/allowed/directory",
                ],
                "transport": "stdio",
            },
            "github": {
                "command": "npx",
                "args": ["-y", "@modelcontextprotocol/server-github"],
                "transport": "stdio",
                "env": {
                    # The reference GitHub server reads GITHUB_PERSONAL_ACCESS_TOKEN
                    "GITHUB_PERSONAL_ACCESS_TOKEN": os.environ["GITHUB_TOKEN"],
                },
            },
        }
    )

    # 2. Get LangChain-compatible tools from ALL configured MCP servers.
    #    Since langchain-mcp-adapters 0.1.0 the client is no longer an async
    #    context manager, and get_tools() is a coroutine.
    mcp_tools = await mcp_client.get_tools()
    print(f"Discovered {len(mcp_tools)} tools from MCP servers:")
    for t in mcp_tools:
        print(f"  - {t.name}: {t.description[:80]}")

    # 3. Create a LangGraph ReAct agent with MCP tools
    #    The agent can now read files, list directories, create GitHub issues, etc.
    llm = AzureChatOpenAI(
        azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
        azure_deployment=os.environ["AZURE_OPENAI_DEPLOYMENT"],
        api_version=os.environ["AZURE_OPENAI_API_VERSION"],
        temperature=0.0,
    )

    agent = create_react_agent(llm, mcp_tools)

    # 4. Invoke: the agent uses MCP tools as if they were native
    result = await agent.ainvoke({
        "messages": [("human", "List all Python files in the project and summarize README.md")],
    })
    print(result["messages"][-1].content)

asyncio.run(main())

Connecting via SSE (Remote MCP Server)

For remote MCP servers exposed over HTTP (e.g., deployed as a microservice), use the sse transport:

# Run inside an async function: get_tools() is a coroutine
mcp_client = MultiServerMCPClient(
    {
        "remote_tools": {
            "url": "http://mcp-server.internal:8080/sse",
            "transport": "sse",
        },
    }
)
tools = await mcp_client.get_tools()
# Use tools with any LangChain agent or chain...

Bridging a Custom MCP Server with LangChain

If you've built your own MCP server (e.g., exposing a proprietary database), here's the conceptual pattern for bridging it into a LangChain agent:

"""
Conceptual architecture: Custom MCP Server → LangChain Agent

1. Your MCP server exposes tools via the MCP protocol:
   - list_customers(region: str) → customer records
   - get_order_status(order_id: str) → status
   - create_support_ticket(subject: str, body: str) → ticket ID

2. langchain-mcp-adapters discovers these tools at runtime
3. Tools are automatically converted to LangChain BaseTool instances
4. Any LangChain agent can invoke them: LangGraph's create_react_agent, a manual tool loop, etc.
"""
import asyncio
from langchain_mcp_adapters.client import MultiServerMCPClient
from langgraph.prebuilt import create_react_agent
from langchain_openai import AzureChatOpenAI

async def run_custom_mcp_agent():
    # Connect to your custom MCP server
    client = MultiServerMCPClient(
        {
            "crm": {
                "command": "python",
                "args": ["my_mcp_server.py"],    # Your custom MCP server
                "transport": "stdio",
            },
        }
    )

    # get_tools() is a coroutine in langchain-mcp-adapters 0.1.0+
    crm_tools = await client.get_tools()
    # crm_tools now contains:
    #   - list_customers (LangChain BaseTool)
    #   - get_order_status (LangChain BaseTool)
    #   - create_support_ticket (LangChain BaseTool)

    # Combine MCP tools with native LangChain tools
    all_tools = crm_tools + [get_weather, calculate]  # Mix and match

    llm = AzureChatOpenAI(
        azure_endpoint="...",
        azure_deployment="gpt-4o",
        api_version="2024-12-01-preview",
    )

    # Agent has access to both MCP and native tools seamlessly
    agent = create_react_agent(llm, all_tools)
    result = await agent.ainvoke({
        "messages": [("human", "List all customers in EMEA and check order #12345 status")],
    })
    print(result["messages"][-1].content)

asyncio.run(run_custom_mcp_agent())
Security: MCP servers have access to real systems. Always run them with least-privilege credentials, validate tool inputs, and audit tool invocations. Use the env dict in the client config to scope secrets per server — never expose credentials globally.
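
One way to scope secrets per server is to build each env dict from an explicit allow-list, so a server only receives the variables it was granted. The helper below is a sketch: `scoped_env`, the `ticketing` server, its script name, and the `CRM_API_KEY` / `TICKETS_API_KEY` variable names are all hypothetical, and `fake_environ` stands in for `os.environ`.

```python
from typing import Mapping

def scoped_env(required: list[str], source: Mapping[str, str]) -> dict[str, str]:
    """Return only the named variables, failing fast if any is missing."""
    missing = [name for name in required if name not in source]
    if missing:
        raise KeyError(f"Missing required secrets: {', '.join(missing)}")
    return {name: source[name] for name in required}

# Example environment; in real code pass os.environ as the source
fake_environ = {
    "CRM_API_KEY": "crm_example",
    "TICKETS_API_KEY": "tickets_example",
    "UNRELATED_SECRET": "should-not-leak",
}

server_config = {
    "crm": {
        "command": "python",
        "args": ["my_mcp_server.py"],
        "transport": "stdio",
        "env": scoped_env(["CRM_API_KEY"], fake_environ),      # sees only its own secret
    },
    "ticketing": {
        "command": "python",
        "args": ["ticketing_server.py"],                       # hypothetical second server
        "transport": "stdio",
        "env": scoped_env(["TICKETS_API_KEY"], fake_environ),
    },
}
print(server_config["crm"]["env"])
```

Failing fast on a missing secret surfaces misconfiguration at startup rather than at the first tool call.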

MCP vs. Native LangChain Tools — When to Use What

Aspect | Native LangChain Tools | MCP Tools
Definition | @tool decorator or StructuredTool in Python | MCP server (any language) exposes tools via protocol
Discovery | Manually registered in code | Auto-discovered at runtime from connected servers
Language | Python only | Any (TypeScript, Python, Go, Rust…)
Reusability | Within your LangChain codebase | Across any MCP-compatible client (Claude, Copilot, LangChain, etc.)
Best for | App-specific logic tightly coupled to your chain | Shared platform tools, org-wide services, polyglot environments
Setup overhead | Minimal, just Python functions | Requires an MCP server process
💡
Best practice: Use native @tool for chain-specific logic (formatters, calculators, custom RAG helpers). Use MCP for organization-wide shared tools (database access, ticketing systems, file systems) that multiple AI clients need to use. The two approaches compose seamlessly — all_tools = mcp_tools + native_tools.
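
When combining tool lists from several sources, a name collision is easy to miss: a lookup map such as `{t.name: t for t in tools}` silently keeps only the last tool with a given name. A small guard like the one below can catch that early. `merge_tools` is a hypothetical helper, and the `SimpleNamespace` objects merely stand in for real tool instances.

```python
from types import SimpleNamespace

def merge_tools(*tool_lists: list) -> list:
    """Concatenate tool lists, rejecting duplicate tool names."""
    merged, seen = [], set()
    for tools in tool_lists:
        for t in tools:
            if t.name in seen:
                raise ValueError(f"Duplicate tool name: {t.name!r}")
            seen.add(t.name)
            merged.append(t)
    return merged

# Stand-ins for tool objects; only the `.name` attribute matters here
mcp_tools = [SimpleNamespace(name="list_customers"), SimpleNamespace(name="get_order_status")]
native_tools = [SimpleNamespace(name="get_weather"), SimpleNamespace(name="calculate")]

all_tools = merge_tools(mcp_tools, native_tools)
print([t.name for t in all_tools])
```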

Module 5 — Middleware & Callbacks

Callbacks are LangChain's observability layer — hooks that fire at key lifecycle events (LLM start, tool invocation, chain error, token streamed, etc.). They power logging, tracing, streaming, cost tracking, and custom monitoring without modifying your chain logic. Think of them as middleware for your AI pipeline.

📥 Chain Start 🧠 LLM Start 🔤 Token Stream ✅ LLM End 📤 Chain End

5.1 — Built-In Callbacks & Streaming

Token-Level Streaming

The most common use of callbacks — streaming tokens to the user as they're generated, giving instant feedback instead of waiting for the full response:

import os
from dotenv import load_dotenv
from langchain_openai import AzureChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

load_dotenv()

llm = AzureChatOpenAI(
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    azure_deployment=os.environ["AZURE_OPENAI_DEPLOYMENT"],
    api_version=os.environ["AZURE_OPENAI_API_VERSION"],
    temperature=0.7,
    streaming=True,
)

prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a concise technical writer."),
    ("human", "{question}"),
])

chain = prompt | llm

# Method 1: .stream() — yields chunks as they arrive
for chunk in chain.stream({"question": "Explain LCEL in 3 sentences"}):
    print(chunk.content, end="", flush=True)

# Method 2: .astream() — async streaming (FastAPI, async apps)
import asyncio

async def stream_response():
    async for chunk in chain.astream({"question": "What is RAG?"}):
        print(chunk.content, end="", flush=True)

asyncio.run(stream_response())

Streaming Events (Full Lifecycle Visibility)

.astream_events() gives you fine-grained access to every event in the chain — useful for building progress indicators, debugging, or advanced UIs:

from langchain_core.output_parsers import StrOutputParser

chain = prompt | llm | StrOutputParser()

async def observe_events():
    async for event in chain.astream_events(
        {"question": "What are LangChain callbacks?"},
        version="v2",
    ):
        kind = event["event"]
        if kind == "on_chat_model_stream":
            # Token-level streaming from the LLM
            token = event["data"]["chunk"].content
            print(token, end="", flush=True)
        elif kind == "on_chain_start":
            print(f"\n⏳ Chain started: {event['name']}")
        elif kind == "on_chain_end":
            print(f"\n✅ Chain finished: {event['name']}")
        elif kind == "on_tool_start":
            print(f"\n🔧 Tool invoked: {event['name']}")

asyncio.run(observe_events())
ℹ️
Event types: on_chain_start/end, on_chat_model_start/end/stream, on_llm_start/end/stream, on_tool_start/end, on_retriever_start/end, on_parser_start/end. Each event includes name, data, run_id, and parent_ids for correlation.
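
Because every event is a dict with an `event` type and a `data` payload, post-processing a recorded stream is straightforward. The sketch below counts events by type and reassembles the streamed text. The sample events are hand-written to mimic the shape described above (they are not captured output), `summarize_events` is a hypothetical helper, and `SimpleNamespace` stands in for a message chunk with a `.content` attribute.

```python
from collections import Counter
from types import SimpleNamespace

def summarize_events(events: list[dict]) -> dict:
    """Count events by type and reassemble the streamed text."""
    counts = Counter(e["event"] for e in events)
    text = "".join(
        e["data"]["chunk"].content
        for e in events
        if e["event"] == "on_chat_model_stream"
    )
    return {"counts": dict(counts), "text": text}

# Hand-written sample events mimicking the documented shape
sample_events = [
    {"event": "on_chain_start", "name": "RunnableSequence", "run_id": "r1", "data": {}},
    {"event": "on_chat_model_stream", "name": "AzureChatOpenAI", "run_id": "r2",
     "data": {"chunk": SimpleNamespace(content="Hello")}},
    {"event": "on_chat_model_stream", "name": "AzureChatOpenAI", "run_id": "r2",
     "data": {"chunk": SimpleNamespace(content=", world")}},
    {"event": "on_chain_end", "name": "RunnableSequence", "run_id": "r1", "data": {}},
]
summary = summarize_events(sample_events)
print(summary["text"])
print(summary["counts"])
```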

Inline Callbacks with Handler List

You can pass callback handlers directly to any .invoke() or .stream() call via the config parameter:

from langchain_core.callbacks import StdOutCallbackHandler

# StdOutCallbackHandler prints chain start/end and tool/agent events to stdout
result = chain.invoke(
    {"question": "What is LCEL?"},
    config={"callbacks": [StdOutCallbackHandler()]},
)

# Multiple handlers compose: each receives all events.
# my_custom_handler stands for any BaseCallbackHandler instance (see 5.2).
result = chain.invoke(
    {"question": "What is LCEL?"},
    config={"callbacks": [StdOutCallbackHandler(), my_custom_handler]},
)

5.2 — Custom Callback Handlers

For production systems, you'll want custom handlers that log to your observability stack, track costs, enforce guardrails, or measure latency. Extend BaseCallbackHandler (sync) or AsyncCallbackHandler (async) and override only the hooks you need.

Production Logging Handler

import time
import logging
from typing import Any
from uuid import UUID
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.messages import BaseMessage
from langchain_core.outputs import LLMResult

logger = logging.getLogger("langchain.telemetry")

class ProductionLoggingHandler(BaseCallbackHandler):
    """Logs LLM calls, latency, token usage, and errors to structured logging."""

    def __init__(self):
        self._start_times: dict[UUID, float] = {}

    def on_chat_model_start(
        self,
        serialized: dict[str, Any],
        messages: list[list[BaseMessage]],
        *,
        run_id: UUID,
        **kwargs: Any,
    ) -> None:
        self._start_times[run_id] = time.perf_counter()
        model = serialized.get("kwargs", {}).get("model_name", "unknown")
        logger.info(
            "LLM call started",
            extra={"run_id": str(run_id), "model": model},
        )

    def on_llm_end(
        self,
        response: LLMResult,
        *,
        run_id: UUID,
        **kwargs: Any,
    ) -> None:
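        # Guard against a missing start time (e.g. a non-chat LLM run) instead of subtracting 0
        started = self._start_times.pop(run_id, None)
        elapsed = (time.perf_counter() - started) if started is not None else 0.0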
        usage = response.llm_output.get("token_usage", {}) if response.llm_output else {}
        logger.info(
            "LLM call completed",
            extra={
                "run_id": str(run_id),
                "latency_ms": round(elapsed * 1000, 2),
                "prompt_tokens": usage.get("prompt_tokens"),
                "completion_tokens": usage.get("completion_tokens"),
                "total_tokens": usage.get("total_tokens"),
            },
        )

    def on_llm_error(
        self,
        error: BaseException,
        *,
        run_id: UUID,
        **kwargs: Any,
    ) -> None:
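        # Guard against a missing start time instead of subtracting 0
        started = self._start_times.pop(run_id, None)
        elapsed = (time.perf_counter() - started) if started is not None else 0.0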
        logger.error(
            "LLM call failed",
            extra={
                "run_id": str(run_id),
                "latency_ms": round(elapsed * 1000, 2),
                "error": str(error),
            },
        )

Cost Tracking Handler

from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.outputs import LLMResult
from uuid import UUID
from typing import Any

# Pricing per 1K tokens (update with your Azure/OpenAI pricing)
PRICING = {
    "gpt-4o":      {"input": 0.0025, "output": 0.01},
    "gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
    "gpt-4.1":     {"input": 0.002, "output": 0.008},
}

class CostTracker(BaseCallbackHandler):
    """Track cumulative token usage and estimated cost across a session."""

    def __init__(self, model_name: str = "gpt-4o"):
        self.model_name = model_name
        self.total_prompt_tokens = 0
        self.total_completion_tokens = 0
        self.total_cost = 0.0
        self.call_count = 0

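    def on_llm_end(self, response: LLMResult, *, run_id: UUID, **kwargs: Any) -> None:
        usage = (response.llm_output or {}).get("token_usage") or {}
        if not usage and response.generations and response.generations[0]:
            # Streaming runs may omit llm_output usage; fall back to the message's usage_metadata
            message = getattr(response.generations[0][0], "message", None)
            meta = getattr(message, "usage_metadata", None) or {}
            usage = {
                "prompt_tokens": meta.get("input_tokens", 0),
                "completion_tokens": meta.get("output_tokens", 0),
            }
        prompt_tokens = usage.get("prompt_tokens", 0)
        completion_tokens = usage.get("completion_tokens", 0)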

        self.total_prompt_tokens += prompt_tokens
        self.total_completion_tokens += completion_tokens
        self.call_count += 1

        prices = PRICING.get(self.model_name, PRICING["gpt-4o"])
        cost = (prompt_tokens / 1000) * prices["input"] + \
               (completion_tokens / 1000) * prices["output"]
        self.total_cost += cost

    def report(self) -> str:
        return (
            f"📊 Session Report:\n"
            f"   Calls: {self.call_count}\n"
            f"   Prompt tokens: {self.total_prompt_tokens:,}\n"
            f"   Completion tokens: {self.total_completion_tokens:,}\n"
            f"   Estimated cost: ${self.total_cost:.4f}"
        )

# Usage
cost_tracker = CostTracker(model_name="gpt-4o")
log_handler = ProductionLoggingHandler()

result = chain.invoke(
    {"question": "Explain microservices"},
    config={"callbacks": [cost_tracker, log_handler]},
)

print(cost_tracker.report())
# 📊 Session Report:
#    Calls: 1
#    Prompt tokens: 28
#    Completion tokens: 142
#    Estimated cost: $0.0015
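
As a worked check of the sample report, the arithmetic can be reproduced on its own using the gpt-4o rates from the PRICING table above: 28 prompt tokens cost 28 / 1000 × 0.0025 = 0.00007, 142 completion tokens cost 142 / 1000 × 0.01 = 0.00142, and the sum 0.00149 displays as $0.0015 at four decimal places. `estimate_cost` is a hypothetical standalone helper mirroring the formula in `CostTracker`.

```python
def estimate_cost(
    prompt_tokens: int,
    completion_tokens: int,
    input_per_1k: float,
    output_per_1k: float,
) -> float:
    """Cost = tokens / 1000 * price per 1K tokens, summed over input and output."""
    return (prompt_tokens / 1000) * input_per_1k + (completion_tokens / 1000) * output_per_1k

# Figures from the sample report, at the gpt-4o rates listed in PRICING
cost = estimate_cost(28, 142, input_per_1k=0.0025, output_per_1k=0.01)
print(f"${cost:.4f}")
```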

Async Callback Handler (for I/O-bound telemetry)

When your callback needs to make network calls (send metrics to Datadog, post to a webhook), use AsyncCallbackHandler to avoid blocking the chain:

import httpx
from langchain_core.callbacks import AsyncCallbackHandler
from langchain_core.outputs import LLMResult
from uuid import UUID
from typing import Any

class WebhookNotifier(AsyncCallbackHandler):
    """Send LLM completion events to a webhook endpoint asynchronously."""

    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url

    async def on_llm_end(
        self,
        response: LLMResult,
        *,
        run_id: UUID,
        **kwargs: Any,
    ) -> None:
        usage = response.llm_output.get("token_usage", {}) if response.llm_output else {}
        async with httpx.AsyncClient() as client:
            await client.post(
                self.webhook_url,
                json={
                    "event": "llm_completion",
                    "run_id": str(run_id),
                    "tokens": usage,
                },
                timeout=5.0,
            )

    async def on_llm_error(
        self,
        error: BaseException,
        *,
        run_id: UUID,
        **kwargs: Any,
    ) -> None:
        async with httpx.AsyncClient() as client:
            await client.post(
                self.webhook_url,
                json={
                    "event": "llm_error",
                    "run_id": str(run_id),
                    "error": str(error),
                },
                timeout=5.0,
            )

Streaming to FastAPI with Callbacks

A common production pattern — streaming LLM output to a web client via Server-Sent Events:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain_openai import AzureChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

app = FastAPI()

llm = AzureChatOpenAI(
    azure_endpoint="...",
    azure_deployment="gpt-4o",
    api_version="2024-12-01-preview",
    streaming=True,
)

prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant."),
    ("human", "{question}"),
])

chain = prompt | llm | StrOutputParser()

@app.get("/stream")
async def stream_answer(question: str):
    async def event_generator():
        async for chunk in chain.astream({"question": question}):
            yield f"data: {chunk}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
    )
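
One detail worth guarding against in the generator above: in the SSE wire format a blank line ends an event, so a chunk that itself contains a newline can be cut short or split on the client. A minimal sketch of a framing helper, assuming the client is willing to JSON-decode each data payload (the helper name sse_event is hypothetical):

```python
import json


def sse_event(chunk: str) -> str:
    """Frame one text chunk as a single Server-Sent Event.

    JSON-encoding escapes embedded newlines, so the payload stays on one
    `data:` line and the trailing blank line is the only event terminator.
    """
    return f"data: {json.dumps(chunk)}\n\n"


print(repr(sse_event("line one\nline two")))
```

Inside event_generator, yielding sse_event(chunk) would take the place of the raw f-string; the browser side would then call JSON.parse(event.data) on each message.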
Performance: Sync callbacks (BaseCallbackHandler) run inline with the chain and hold it up until they return. For network calls (logging to external services, sending webhooks), use AsyncCallbackHandler together with the async invocation methods so that handler I/O does not stall the event loop or add avoidable latency to your chain.

All Available Callback Hooks

| Hook | Fires When | Key Data |
| --- | --- | --- |
| on_chat_model_start | Chat model receives messages | Serialized model, input messages, run_id |
| on_llm_start | Any LLM receives prompts | Serialized LLM, prompt strings |
| on_llm_new_token | Each streamed token arrives | Token string, optional generation chunk |
| on_llm_end | LLM returns full response | LLMResult with generations + token_usage |
| on_llm_error | LLM call raises an exception | Exception object, run_id |
| on_chain_start / end / error | Any Runnable chain starts/finishes/fails | Serialized chain, inputs/outputs |
| on_tool_start / end / error | Tool invocation lifecycle | Tool name, input, output |
| on_retriever_start / end / error | Retriever fetches documents | Query, retrieved documents |
| on_retry | A retry attempt is made | Retry state, error that triggered it |
💡
LangSmith integration: For production tracing without building custom handlers, use LangSmith. Set LANGCHAIN_TRACING_V2=true and LANGCHAIN_API_KEY=... in your environment, and all chain executions are automatically traced with latency, token usage, inputs/outputs, and error details — zero code changes needed.

Module 6 — Guardrails & Output Validation

Production LLM applications need deterministic, validated outputs. LangChain provides first-class support for structured output parsing, Pydantic validation, and fallback chains — ensuring your pipelines degrade gracefully and always return data your downstream code can trust.

🧠 LLM Output 📋 Parse ✅ Validate 🔄 Retry / Fallback 📦 Typed Data

6.1 — Structured Output with Pydantic

The most reliable approach: use .with_structured_output() to force the model to return data matching a Pydantic schema. This uses the model's native tool-calling / JSON mode under the hood — no fragile prompt engineering required.

import os
from dotenv import load_dotenv
from langchain_openai import AzureChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from pydantic import BaseModel, Field

load_dotenv()

# Define the expected output schema
class SentimentAnalysis(BaseModel):
    """Structured sentiment analysis result."""
    sentiment: str = Field(description="One of: positive, negative, neutral")
    confidence: float = Field(ge=0.0, le=1.0, description="Confidence score 0-1")
    reasoning: str = Field(description="Brief explanation of the classification")
    key_phrases: list[str] = Field(description="Key phrases that drove the classification")

llm = AzureChatOpenAI(
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    azure_deployment=os.environ["AZURE_OPENAI_DEPLOYMENT"],
    api_version=os.environ["AZURE_OPENAI_API_VERSION"],
    temperature=0.0,
)

# .with_structured_output() returns a Pydantic instance directly
structured_llm = llm.with_structured_output(SentimentAnalysis)

prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a sentiment analysis engine. Analyze the given text."),
    ("human", "{text}"),
])

chain = prompt | structured_llm

# Result is a validated Pydantic object — not a string!
result: SentimentAnalysis = chain.invoke({
    "text": "The new API is blazing fast but the docs are terrible."
})

print(result.sentiment)      # → "negative" or "neutral"
print(result.confidence)     # → 0.72
print(result.key_phrases)    # → ["blazing fast", "docs are terrible"]
print(result.model_dump())   # → Full dict for serialization
💡
Why .with_structured_output() over manual parsing? It uses the model's native function-calling / JSON schema mode, which is significantly more reliable than asking the LLM to produce JSON in free text and then parsing it. Pydantic validation is applied automatically: a confidence of 1.5 violates the le=1.0 constraint and raises a validation error instead of reaching your downstream code.
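
To see what that validation step does in isolation, here is a small sketch that runs Pydantic (v2) against hand-written dictionaries standing in for model output. StrictSentiment and try_validate are illustrative names, and constraining sentiment with Literal is a tightening of the schema above, not something the original schema does:

```python
from typing import Literal

from pydantic import BaseModel, Field, ValidationError


class StrictSentiment(BaseModel):
    """Variant of the schema with the label restricted to three values."""

    sentiment: Literal["positive", "negative", "neutral"]
    confidence: float = Field(ge=0.0, le=1.0)


def try_validate(payload: dict) -> str:
    """Return 'ok', or the name of the first field that failed validation."""
    try:
        StrictSentiment.model_validate(payload)
        return "ok"
    except ValidationError as exc:
        return str(exc.errors()[0]["loc"][0])


print(try_validate({"sentiment": "positive", "confidence": 0.9}))  # → ok
print(try_validate({"sentiment": "positive", "confidence": 1.5}))  # → confidence
print(try_validate({"sentiment": "mixed", "confidence": 0.5}))     # → sentiment
```

Encoding the allowed labels in the type moves the check into the schema itself, so it no longer relies on the field description alone.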

Complex Nested Schemas

from pydantic import BaseModel, Field
from enum import Enum

class Priority(str, Enum):
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"

class ActionItem(BaseModel):
    task: str = Field(description="Specific action to take")
    assignee: str = Field(description="Suggested owner for this task")
    priority: Priority = Field(description="Priority level")

class MeetingNotes(BaseModel):
    """Structured meeting summary with decisions and action items."""
    title: str = Field(description="Meeting title")
    summary: str = Field(description="2-3 sentence summary of the meeting")
    decisions: list[str] = Field(description="Key decisions made")
    action_items: list[ActionItem] = Field(description="Concrete action items")
    follow_up_date: str | None = Field(
        default=None,
        description="Next follow-up date if mentioned, ISO format",
    )

# The model will produce valid nested objects
structured_llm = llm.with_structured_output(MeetingNotes)
notes = structured_llm.invoke("Summarize: We met on Monday to discuss Q3 launch...")

for item in notes.action_items:
    print(f"[{item.priority.value}] {item.task} → {item.assignee}")

PydanticOutputParser (Manual Approach)

For models that don't support native structured output, or when you need more control over the prompt, use PydanticOutputParser:

from langchain_core.output_parsers import PydanticOutputParser

parser = PydanticOutputParser(pydantic_object=SentimentAnalysis)

prompt = ChatPromptTemplate.from_messages([
    ("system", "Analyze the sentiment of the text.\n{format_instructions}"),
    ("human", "{text}"),
])

# Inject format_instructions into the prompt (tells model the JSON schema)
chain = prompt.partial(format_instructions=parser.get_format_instructions()) | llm | parser

result = chain.invoke({"text": "Best product I've ever used!"})
# result is a validated SentimentAnalysis instance

Output Fixing Parser (Auto-Correct Malformed Output)

from langchain.output_parsers import OutputFixingParser

# Wraps another parser — if parsing fails, asks the LLM to fix the output
fixing_parser = OutputFixingParser.from_llm(parser=parser, llm=llm)

# If the model returns malformed JSON, fixing_parser will:
# 1. Catch the parse error
# 2. Send the error + original output back to the LLM
# 3. Ask the LLM to fix it
# 4. Parse again
chain_with_fix = prompt.partial(
    format_instructions=parser.get_format_instructions()
) | llm | fixing_parser
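
The four steps in the comments above amount to a small parse, repair, re-parse loop. The sketch below reproduces that control flow with the standard library only; it is not OutputFixingParser's implementation, and fake_fixer is a hypothetical stand-in for the LLM repair call:

```python
import json
from typing import Callable


def parse_with_fix(raw: str, fix: Callable[[str, str], str], max_fixes: int = 1) -> dict:
    """Parse JSON; on failure, hand the text and the error to `fix` and retry."""
    for attempt in range(max_fixes + 1):
        try:
            return json.loads(raw)
        except json.JSONDecodeError as exc:
            if attempt == max_fixes:
                raise  # out of repair attempts: surface the parse error
            raw = fix(raw, str(exc))
    raise ValueError("max_fixes must be >= 0")


def fake_fixer(bad_output: str, error: str) -> str:
    """Stand-in for the LLM repair call: swaps single quotes for double quotes."""
    return bad_output.replace("'", '"')


print(parse_with_fix("{'sentiment': 'positive'}", fake_fixer))
# → {'sentiment': 'positive'}
```

Capping the number of repair attempts matters because every repair is another model call, with its own latency and cost.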

6.2 — Fallbacks & Retry Logic

LLMs are non-deterministic and external APIs fail. LangChain's .with_fallbacks() and .with_retry() let you build resilient chains that gracefully degrade without complex try/except trees.

Model Fallback Chain

If the primary model fails (rate limit, outage, timeout), automatically fall back to an alternative:

import os

from langchain_openai import AzureChatOpenAI

# Primary: GPT-4o (best quality)
primary_llm = AzureChatOpenAI(
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    azure_deployment="gpt-4o",
    api_version="2024-12-01-preview",
    temperature=0.0,
    request_timeout=30,
)

# Fallback: GPT-4o-mini (faster, cheaper, still good)
fallback_llm = AzureChatOpenAI(
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    azure_deployment="gpt-4o-mini",
    api_version="2024-12-01-preview",
    temperature=0.0,
    request_timeout=30,
)

# If primary raises ANY exception, fallback is tried
resilient_llm = primary_llm.with_fallbacks([fallback_llm])

# Use it exactly like a normal LLM — fallback is transparent
result = resilient_llm.invoke("Explain quantum computing in one paragraph.")

Retry with Exponential Backoff

# Retry on transient errors (rate limits, network blips)
retrying_llm = primary_llm.with_retry(
    stop_after_attempt=3,           # Max 3 attempts
    wait_exponential_jitter=True,    # Exponential backoff with jitter
)

# Combine retry + fallback for maximum resilience
robust_llm = primary_llm.with_retry(
    stop_after_attempt=2,
    wait_exponential_jitter=True,
).with_fallbacks([
    fallback_llm.with_retry(stop_after_attempt=2, wait_exponential_jitter=True)
])

# Execution order: primary (try 1) → primary (try 2) → fallback (try 1) → fallback (try 2)
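
The execution order in the comment above can be checked with a plain-Python simulation. This is a sketch of the general retry-then-fallback technique, not LangChain's internals; call_with_retry, call_with_fallbacks, and the two fake model functions are hypothetical, and sleeping is injected so the demo runs instantly:

```python
import random
import time
from typing import Callable, Sequence


def call_with_retry(
    fn: Callable[[], str],
    attempts: int,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Call fn up to `attempts` times with exponential backoff plus jitter."""
    for attempt in range(attempts):
        try:
            return fn()
        except Exception:
            if attempt == attempts - 1:
                raise  # retries exhausted
            sleep(base_delay * (2 ** attempt) + random.uniform(0, 1))
    raise ValueError("attempts must be >= 1")


def call_with_fallbacks(
    fns: Sequence[Callable[[], str]],
    attempts: int,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Try each candidate in order; each one gets its own retry budget."""
    last_error = None
    for fn in fns:
        try:
            return call_with_retry(fn, attempts, sleep=sleep)
        except Exception as exc:
            last_error = exc
    raise RuntimeError("all candidates failed") from last_error


calls: list[str] = []


def primary() -> str:
    calls.append("primary")
    raise TimeoutError("simulated outage")


def fallback() -> str:
    calls.append("fallback")
    if calls.count("fallback") < 2:
        raise ConnectionError("simulated network blip")
    return "answer from fallback"


result = call_with_fallbacks([primary, fallback], attempts=2, sleep=lambda s: None)
print(calls)   # → ['primary', 'primary', 'fallback', 'fallback']
print(result)  # → answer from fallback
```

Note that the worst-case latency is the sum of every attempt plus every backoff delay, which is worth budgeting for when you choose the attempt counts.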

Chain-Level Fallbacks

Fallbacks work at the chain level too — not just on individual models:

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# Detailed chain with structured output
detailed_prompt = ChatPromptTemplate.from_messages([
    ("system", "Provide a detailed, structured analysis with sections and bullet points."),
    ("human", "{question}"),
])
detailed_chain = detailed_prompt | primary_llm | StrOutputParser()

# Simple chain as fallback — less fancy, but always works
simple_prompt = ChatPromptTemplate.from_messages([
    ("system", "Give a brief, concise answer."),
    ("human", "{question}"),
])
simple_chain = simple_prompt | fallback_llm | StrOutputParser()

# If the detailed chain fails for any reason, fall back to the simple one
resilient_chain = detailed_chain.with_fallbacks([simple_chain])

Conditional Validation with RunnableLambda

Add custom validation logic anywhere in the chain using RunnableLambda:

from langchain_core.runnables import RunnableLambda

def validate_sentiment(result: SentimentAnalysis) -> SentimentAnalysis:
    """Post-processing validation — raise if output is suspicious."""
    valid_sentiments = {"positive", "negative", "neutral"}
    if result.sentiment.lower() not in valid_sentiments:
        raise ValueError(f"Invalid sentiment: {result.sentiment}")
    if not result.key_phrases:
        raise ValueError("key_phrases cannot be empty")
    return result

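# Chain: prompt → structured LLM → validate → output
# (`prompt` here is the two-message sentiment prompt from 6.1, not the {format_instructions} variant)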
validated_chain = (
    prompt
    | llm.with_structured_output(SentimentAnalysis)
    | RunnableLambda(validate_sentiment)
)

# Add a fallback for validation failures
fallback_chain = (
    prompt
    | fallback_llm.with_structured_output(SentimentAnalysis)
    | RunnableLambda(validate_sentiment)
)

safe_chain = validated_chain.with_fallbacks([fallback_chain])
ℹ️
Fallback strategies in practice:
Model degradation: GPT-4o → GPT-4o-mini (quality trade-off for reliability)
Provider failover: Azure OpenAI → OpenAI direct → Anthropic Claude
Approach change: Structured output → free text with manual parsing
Graceful default: Return cached/default response if all else fails

Putting It All Together — Production Guardrails Pattern

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableLambda
from langchain_openai import AzureChatOpenAI
from pydantic import BaseModel, Field

class ExtractedEntity(BaseModel):
    name: str = Field(description="Entity name")
    entity_type: str = Field(description="Type: person, org, location, date")
    context: str = Field(description="Sentence where entity appears")

class EntityExtractionResult(BaseModel):
    entities: list[ExtractedEntity] = Field(description="All extracted entities")
    source_language: str = Field(description="Detected language of input")

prompt = ChatPromptTemplate.from_messages([
    ("system", "Extract all named entities from the text."),
    ("human", "{text}"),
])

# Build the full production chain:
# 1. Structured output with Pydantic validation
# 2. Retry on transient failures
# 3. Fallback to cheaper model if primary fails
# 4. Custom post-validation

def post_validate(result: EntityExtractionResult) -> EntityExtractionResult:
    # Remove any entities with empty names (hallucination guard)
    result.entities = [e for e in result.entities if e.name.strip()]
    return result

primary_chain = (
    prompt
    | primary_llm.with_structured_output(EntityExtractionResult).with_retry(
        stop_after_attempt=2, wait_exponential_jitter=True
    )
    | RunnableLambda(post_validate)
)

fallback_chain = (
    prompt
    | fallback_llm.with_structured_output(EntityExtractionResult).with_retry(
        stop_after_attempt=2, wait_exponential_jitter=True
    )
    | RunnableLambda(post_validate)
)

production_chain = primary_chain.with_fallbacks([fallback_chain])

# Use it — resilient, validated, type-safe
result = production_chain.invoke({
    "text": "Tim Cook announced that Apple will open a new office in Berlin by March 2026."
})
for e in result.entities:
    print(f"  {e.entity_type}: {e.name}")
#   person: Tim Cook
#   org: Apple
#   location: Berlin
#   date: March 2026

Module 7 — Agents & Multi-Agent Systems

Agents are LLM-powered programs that can reason, plan, and take actions in a loop. Unlike chains (which follow a fixed path), agents dynamically decide which tools to call, in what order, and when to stop. LangChain supports two approaches: legacy AgentExecutor (simple, good for prototyping) and LangGraph (production-grade, full control over state and flow).

Agent Architecture: The ReAct Loop

How agents work: The model receives a task + tool descriptions → reasons about what to do → emits a tool call → your code executes the tool → result goes back to the model → repeats until the model decides it has a final answer. This is the Reasoning + Acting (ReAct) pattern.

💬 User Task 🧠 Reason 🔧 Tool Call 📊 Observe 🔁 Loop or 📤 Answer
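
The loop described above fits in a few lines of plain Python. The following is a toy sketch with no LLM involved: scripted_model is a hypothetical stand-in that requests one tool call and then answers, and the message dictionaries are a simplified shape chosen for the example, not LangChain's message classes:

```python
from typing import Callable


def get_user_info(user_id: str) -> str:
    """Toy tool with a canned response."""
    return f"User {user_id}: Jane Smith, Engineering"


TOOLS: dict[str, Callable[..., str]] = {"get_user_info": get_user_info}


def scripted_model(messages: list[dict]) -> dict:
    """Stand-in for the LLM: call a tool first, answer once a result exists."""
    if not any(m["role"] == "tool" for m in messages):
        return {
            "role": "assistant",
            "content": "",
            "tool_calls": [{"name": "get_user_info", "args": {"user_id": "u-1234"}}],
        }
    return {
        "role": "assistant",
        "content": f"Found: {messages[-1]['content']}",
        "tool_calls": [],
    }


def run_agent(task: str, max_steps: int = 5) -> list[dict]:
    messages: list[dict] = [{"role": "user", "content": task}]
    for _ in range(max_steps):                     # hard cap on the loop
        reply = scripted_model(messages)           # Reason
        messages.append(reply)
        if not reply["tool_calls"]:                # no tool requested: final answer
            break
        for call in reply["tool_calls"]:           # Act
            observation = TOOLS[call["name"]](**call["args"])
            messages.append({"role": "tool", "content": observation})  # Observe
    return messages


history = run_agent("Look up user u-1234")
print([m["role"] for m in history])  # → ['user', 'assistant', 'tool', 'assistant']
print(history[-1]["content"])        # → Found: User u-1234: Jane Smith, Engineering
```

Everything an agent framework adds (state persistence, streaming, interrupts) is layered on top of this same reason, act, observe cycle.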

7.1 — Single Agent with Tool Calling

LangGraph ReAct Agent (Recommended)

create_react_agent from LangGraph is the modern, production-recommended way to build agents. It gives you a stateful graph with built-in tool execution, message history, and streaming — all composable with LangGraph's state machine primitives.

import os
from dotenv import load_dotenv
from langchain_openai import AzureChatOpenAI
from langchain_core.tools import tool
from langgraph.prebuilt import create_react_agent

load_dotenv()

# Define tools
@tool
def search_docs(query: str) -> str:
    """Search the knowledge base for relevant documents.

    Args:
        query: The search query to find relevant information.
    """
    # In production, call your retriever / search API here
    return f"Found 3 documents matching '{query}': [Doc about API design, Doc about testing, Doc about deployment]"

@tool
def create_ticket(title: str, description: str, priority: str = "medium") -> str:
    """Create a support ticket in the ticketing system.

    Args:
        title: Short title for the ticket.
        description: Detailed description of the issue.
        priority: Priority level - low, medium, or high.
    """
    # In production, call your ticketing API here
    return f"✅ Ticket created: '{title}' (priority: {priority}) — ID: TICKET-4521"

@tool
def get_user_info(user_id: str) -> str:
    """Look up information about a user by their ID.

    Args:
        user_id: The unique user identifier.
    """
    return f"User {user_id}: Jane Smith, Engineering, joined 2023-06"

# Create the model
llm = AzureChatOpenAI(
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    azure_deployment=os.environ["AZURE_OPENAI_DEPLOYMENT"],
    api_version=os.environ["AZURE_OPENAI_API_VERSION"],
    temperature=0.0,
)

# Create the agent — this is a full LangGraph StateGraph under the hood
agent = create_react_agent(
    model=llm,
    tools=[search_docs, create_ticket, get_user_info],
    prompt="You are an IT support agent. Help users with their technical issues. Use your tools to look up information and create tickets when needed.",
)

# Invoke — the agent loops: reason → tool → observe → repeat
result = agent.invoke({
    "messages": [("human", "User u-1234 is having API timeout issues. Search the docs for solutions and if there's no quick fix, create a high priority ticket.")],
})

# Print the full conversation (shows reasoning + tool calls + final answer)
for msg in result["messages"]:
    print(f"{msg.__class__.__name__}: {msg.content[:120]}")

Streaming Agent Output

# Stream the agent's execution step-by-step
import asyncio

async def stream_agent():
    async for event in agent.astream_events(
        {"messages": [("human", "Look up user u-5678 and create a ticket for their login issue")]},
        version="v2",
    ):
        kind = event["event"]
        if kind == "on_chat_model_stream":
            token = event["data"]["chunk"].content
            if token:
                print(token, end="", flush=True)
        elif kind == "on_tool_start":
            print(f"\n🔧 Calling tool: {event['name']}")
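        elif kind == "on_tool_end":
            # The output may be a ToolMessage object rather than a plain string
            output = event["data"].get("output", "")
            text = getattr(output, "content", output)
            print(f"\n✅ Tool result: {str(text)[:100]}")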

asyncio.run(stream_agent())

Agent with Memory (Stateful Conversations)

LangGraph agents support persistent memory via checkpointers — each thread maintains its full conversation history across invocations:

from langgraph.checkpoint.memory import MemorySaver

# In-memory checkpointer (use PostgresSaver or RedisSaver in production)
memory = MemorySaver()

agent_with_memory = create_react_agent(
    model=llm,
    tools=[search_docs, create_ticket, get_user_info],
    prompt="You are an IT support agent. Remember our conversation history.",
    checkpointer=memory,
)

# Thread ID groups messages into a conversation
config = {"configurable": {"thread_id": "session-001"}}

# First message
result1 = agent_with_memory.invoke(
    {"messages": [("human", "Look up user u-1234")]},
    config=config,
)
print(result1["messages"][-1].content)

# Follow-up — agent remembers the previous context
result2 = agent_with_memory.invoke(
    {"messages": [("human", "Create a ticket for them about slow builds")]},
    config=config,
)
print(result2["messages"][-1].content)
# → Agent knows "them" = user u-1234 (Jane Smith) from previous turn
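
What the thread_id buys you can be illustrated with a toy store that keeps one message list per thread. This is a conceptual sketch only; ToyCheckpointer is a hypothetical class and does not mirror LangGraph's checkpointer interface:

```python
from collections import defaultdict


class ToyCheckpointer:
    """Keeps one message list per thread_id (illustration only)."""

    def __init__(self) -> None:
        self._threads: dict[str, list[str]] = defaultdict(list)

    def append(self, thread_id: str, message: str) -> list[str]:
        """Store a message and return the full history visible to that thread."""
        self._threads[thread_id].append(message)
        return list(self._threads[thread_id])


store = ToyCheckpointer()
store.append("session-001", "Look up user u-1234")
print(store.append("session-001", "Create a ticket for them"))
# → ['Look up user u-1234', 'Create a ticket for them']
print(store.append("session-002", "Create a ticket for them"))
# → ['Create a ticket for them']
```

The second call shows the failure mode to watch for: with a different thread_id the follow-up arrives without the earlier turn, so "them" has nothing to refer to.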
Legacy AgentExecutor: You may see create_tool_calling_agent() + AgentExecutor in older tutorials. While still functional, LangGraph's create_react_agent is the recommended approach for new projects — it offers better streaming, state management, error recovery, and composability. The AgentExecutor will eventually be deprecated.

7.2 — Multi-Agent Architectures with LangGraph

For complex tasks that exceed a single agent's scope, LangGraph lets you compose multiple specialized agents into a coordinated workflow. Each agent has its own tools and expertise, and a supervisor (or a graph topology) routes tasks between them.

Architecture Patterns

| Pattern | Description | Best For |
| --- | --- | --- |
| Supervisor | One coordinator agent routes tasks to worker agents | Heterogeneous tasks (research + code + review) |
| Sequential | Agents form a pipeline; the output of one feeds the next | Multi-step workflows (draft → review → publish) |
| Hierarchical | Supervisors delegate to sub-supervisors → workers | Large organizations with specialized teams |
| Collaborative | Agents communicate peer-to-peer on a shared state | Debate, negotiation, multi-perspective analysis |

Supervisor Multi-Agent Pattern

The most common pattern: a supervisor LLM decides which specialist agent to invoke based on the task. Each worker agent has its own tools and system prompt.

from typing import Annotated, Literal
from typing_extensions import TypedDict
import operator
import os

from langchain_openai import AzureChatOpenAI
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage, BaseMessage
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import create_react_agent

# ─── Shared State ───
class AgentState(TypedDict):
    messages: Annotated[list[BaseMessage], operator.add]
    next_agent: str

# ─── Define Specialist Tools ───
@tool
def search_web(query: str) -> str:
    """Search the web for current information.

    Args:
        query: Search query string.
    """
    return f"Web results for '{query}': [Article about latest trends, Research paper, Blog post]"

@tool
def run_python(code: str) -> str:
    """Execute Python code for data analysis.

    Args:
        code: Python code to execute safely.
    """
    # Stub for illustration; a real implementation should run the code in an isolated sandbox
    return "Code executed successfully. Output: [Analysis results with 3 charts]"

@tool
def write_document(title: str, content: str) -> str:
    """Write and save a document.

    Args:
        title: Document title.
        content: Full document content in markdown.
    """
    return f"Document '{title}' saved successfully ({len(content)} chars)"

# ─── Create Specialist Agents ───
llm = AzureChatOpenAI(
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    azure_deployment=os.environ["AZURE_OPENAI_DEPLOYMENT"],
    api_version=os.environ["AZURE_OPENAI_API_VERSION"],
    temperature=0.0,
)

researcher = create_react_agent(
    model=llm,
    tools=[search_web],
    prompt="You are a research specialist. Search the web thoroughly and provide detailed findings with sources.",
)

analyst = create_react_agent(
    model=llm,
    tools=[run_python],
    prompt="You are a data analyst. Analyze data using Python and produce clear summaries with key metrics.",
)

writer = create_react_agent(
    model=llm,
    tools=[write_document],
    prompt="You are a technical writer. Take research and analysis and produce a well-structured report.",
)

Building the Supervisor Graph

from pydantic import BaseModel, Field

# ─── Supervisor decides which agent to call next ───
class RouterDecision(BaseModel):
    """The supervisor's routing decision."""
    next: Literal["researcher", "analyst", "writer", "FINISH"] = Field(
        description="Which agent should handle the next step, or FINISH if done."
    )
    reasoning: str = Field(description="Why this agent was chosen.")

supervisor_llm = llm.with_structured_output(RouterDecision)

SUPERVISOR_PROMPT = """You are a project supervisor managing a team of specialists:
- **researcher**: Searches the web for information and current data
- **analyst**: Runs Python code for data analysis and computations
- **writer**: Produces well-structured documents and reports

Given the conversation so far, decide which agent should act next.
If the task is fully complete, respond with FINISH.
Always route research tasks before analysis, and analysis before writing."""

def supervisor_node(state: AgentState) -> AgentState:
    # Ask the supervisor LLM to decide
    decision = supervisor_llm.invoke([
        ("system", SUPERVISOR_PROMPT),
        *state["messages"],
        ("human", "Based on the progress so far, which agent should act next?"),
    ])
    return {"messages": [], "next_agent": decision.next}

# ─── Worker node wrappers ───
def researcher_node(state: AgentState) -> AgentState:
    result = researcher.invoke({"messages": state["messages"]})
    return {"messages": result["messages"][-1:], "next_agent": ""}

def analyst_node(state: AgentState) -> AgentState:
    result = analyst.invoke({"messages": state["messages"]})
    return {"messages": result["messages"][-1:], "next_agent": ""}

def writer_node(state: AgentState) -> AgentState:
    result = writer.invoke({"messages": state["messages"]})
    return {"messages": result["messages"][-1:], "next_agent": ""}

# ─── Build the Graph ───
graph = StateGraph(AgentState)

# Add nodes
graph.add_node("supervisor", supervisor_node)
graph.add_node("researcher", researcher_node)
graph.add_node("analyst", analyst_node)
graph.add_node("writer", writer_node)

# Entry point → supervisor
graph.set_entry_point("supervisor")

# Supervisor routes to workers or END
graph.add_conditional_edges(
    "supervisor",
    lambda state: state["next_agent"],
    {
        "researcher": "researcher",
        "analyst": "analyst",
        "writer": "writer",
        "FINISH": END,
    },
)

# Workers always report back to supervisor
graph.add_edge("researcher", "supervisor")
graph.add_edge("analyst", "supervisor")
graph.add_edge("writer", "supervisor")

# Compile the graph
multi_agent = graph.compile()

# Run the multi-agent system
result = multi_agent.invoke({
    "messages": [
        HumanMessage(content="Research the latest trends in LLM efficiency, analyze the key metrics, and write a summary report.")
    ],
    "next_agent": "",
})

# Typical routing for this task (the supervisor LLM decides at run time): researcher → analyst → writer → FINISH
print(result["messages"][-1].content)
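
Because every worker reports back to the supervisor, the graph is a loop, and a supervisor that never answers FINISH would cycle indefinitely. LangGraph bounds this with the recursion_limit config key; the plain-Python sketch below shows the same idea in isolation. The names route, run_supervisor, and max_hops are hypothetical, and the router is a fixed rule standing in for the supervisor LLM:

```python
from typing import Callable


def route(done: list[str]) -> str:
    """Stand-in for the supervisor LLM: research, then analysis, then writing."""
    for worker in ("researcher", "analyst", "writer"):
        if worker not in done:
            return worker
    return "FINISH"


def run_supervisor(router: Callable[[list[str]], str], max_hops: int = 10) -> list[str]:
    """Follow routing decisions until FINISH, or fail loudly at the hop limit."""
    done: list[str] = []
    for _ in range(max_hops):
        nxt = router(done)
        if nxt == "FINISH":
            return done
        done.append(nxt)  # a real worker agent would run here and report back
    raise RuntimeError(f"supervisor did not finish within {max_hops} hops")


print(run_supervisor(route))  # → ['researcher', 'analyst', 'writer']
```

Failing with an explicit error at the limit is usually preferable to returning a partial result silently, since the caller can then decide whether to retry or escalate.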
ℹ️
LangGraph advantages over AgentExecutor:
Explicit state: You define exactly what's in your agent state (messages, intermediate results, metadata)
Conditional routing: Supervisor patterns, branching, loops, and early termination via graph edges
Persistence: Built-in checkpointers (Memory, Redis, PostgreSQL) for long-running workflows
Human-in-the-loop: Add interrupt_before / interrupt_after on any node for human review
Streaming: Stream events from any node in the graph in real-time

Human-in-the-Loop Approval Pattern

For high-stakes actions (creating tickets, sending emails, deploying code), add human approval checkpoints:

from langgraph.checkpoint.memory import MemorySaver

# Create an agent that pauses for human approval before any tool runs
agent_with_approval = create_react_agent(
    model=llm,
    tools=[search_docs, create_ticket, get_user_info],
    prompt="You are an IT support agent. Ask to create tickets when appropriate.",
    checkpointer=MemorySaver(),
    interrupt_before=["tools"],  # Pause BEFORE any tool execution
)

config = {"configurable": {"thread_id": "approval-session"}}

# Start the agent — it will plan a tool call, then pause
result = agent_with_approval.invoke(
    {"messages": [("human", "Create a high-priority ticket for the API outage")]},
    config=config,
)

# At this point, the agent has PLANNED the tool call but NOT executed it
# You can inspect what it wants to do:
pending_tool_calls = result["messages"][-1].tool_calls
print(f"Agent wants to call: {pending_tool_calls[0]['name']}")
print(f"With args: {pending_tool_calls[0]['args']}")

# If approved, resume execution (the agent will execute the tool and continue)
final_result = agent_with_approval.invoke(None, config=config)
print(final_result["messages"][-1].content)
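
Since interrupt_before=["tools"] pauses before every tool call, including read-only lookups, it can help to decide in code which pending calls actually need a person. A minimal sketch, assuming the pending calls are dictionaries with name, args, and id keys like the ones inspected above; SENSITIVE_TOOLS and calls_needing_approval are hypothetical names:

```python
# Hypothetical policy: only these tools require a human sign-off.
SENSITIVE_TOOLS = {"create_ticket"}


def calls_needing_approval(tool_calls: list[dict]) -> list[dict]:
    """Return the pending tool calls that a human should review."""
    return [tc for tc in tool_calls if tc["name"] in SENSITIVE_TOOLS]


pending = [
    {"name": "search_docs", "args": {"query": "API outage"}, "id": "call_1"},
    {
        "name": "create_ticket",
        "args": {"title": "API outage", "description": "Requests time out", "priority": "high"},
        "id": "call_2",
    },
]

flagged = calls_needing_approval(pending)
print([tc["name"] for tc in flagged])  # → ['create_ticket']
```

If the flagged list is empty, the application could resume immediately; otherwise it would show the flagged calls to a reviewer before resuming.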

Custom LangGraph Agent (Full Control)

For maximum flexibility, build your agent graph from scratch instead of using create_react_agent:

from typing import Annotated
from typing_extensions import TypedDict
import operator

from langchain_core.messages import BaseMessage, ToolMessage
from langgraph.graph import StateGraph, END

# ─── Define State ───
class CustomAgentState(TypedDict):
    messages: Annotated[list[BaseMessage], operator.add]
    iteration_count: int

# ─── Agent Node: reason and possibly call tools ───
def agent_node(state: CustomAgentState) -> CustomAgentState:
    llm_with_tools = llm.bind_tools([search_docs, create_ticket])
    response = llm_with_tools.invoke(state["messages"])
    return {
        "messages": [response],
        "iteration_count": state["iteration_count"] + 1,
    }

# ─── Tool Node: execute any pending tool calls ───
tool_map = {"search_docs": search_docs, "create_ticket": create_ticket}

def tool_node(state: CustomAgentState) -> CustomAgentState:
    last_msg = state["messages"][-1]
    tool_messages = []
    for tc in last_msg.tool_calls:
        result = tool_map[tc["name"]].invoke(tc["args"])
        tool_messages.append(ToolMessage(content=str(result), tool_call_id=tc["id"]))
    return {"messages": tool_messages, "iteration_count": state["iteration_count"]}

# ─── Router: decide if we should call tools, continue, or stop ───
def should_continue(state: CustomAgentState) -> str:
    last_msg = state["messages"][-1]
    # Safety: stop after 10 iterations to prevent infinite loops
    if state["iteration_count"] >= 10:
        return "end"
    # If the model made tool calls, execute them
    if hasattr(last_msg, "tool_calls") and last_msg.tool_calls:
        return "tools"
    # Otherwise, the agent is done
    return "end"

# ─── Build Graph ───
graph = StateGraph(CustomAgentState)
graph.add_node("agent", agent_node)
graph.add_node("tools", tool_node)

graph.set_entry_point("agent")

graph.add_conditional_edges(
    "agent",
    should_continue,
    {"tools": "tools", "end": END},
)
graph.add_edge("tools", "agent")  # After tools → back to agent

custom_agent = graph.compile()

# Run
result = custom_agent.invoke({
    "messages": [("human", "Search for deployment best practices and create a ticket to update our runbook")],
    "iteration_count": 0,
})
print(result["messages"][-1].content)
💡
Production checklist for agents:
Max iterations: Always set a loop limit to prevent runaway agents (and runaway costs)
Timeout: Set a request_timeout on the LLM to prevent hung calls
Observability: Use callbacks or LangSmith to trace every step of the agent's reasoning
Tool safety: Validate tool inputs, use least-privilege credentials, audit tool invocations
Fallbacks: Use .with_fallbacks() on the LLM so agent survives model outages
Human-in-the-loop: For destructive actions, add interrupt_before checkpoints
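
As one concrete reading of the tool-safety item, the arguments a model supplies can be normalized and checked before they reach a real API. The sketch below does this for the create_ticket arguments; validate_ticket_args, ALLOWED_PRIORITIES, and the 120-character title limit are assumptions made for the example:

```python
ALLOWED_PRIORITIES = {"low", "medium", "high"}
MAX_TITLE_LENGTH = 120  # hypothetical limit


def validate_ticket_args(title: str, description: str, priority: str = "medium") -> dict:
    """Normalize and check create_ticket arguments before calling the real API."""
    title = title.strip()
    priority = priority.strip().lower()
    if not title:
        raise ValueError("title must not be empty")
    if len(title) > MAX_TITLE_LENGTH:
        raise ValueError(f"title exceeds {MAX_TITLE_LENGTH} characters")
    if priority not in ALLOWED_PRIORITIES:
        raise ValueError(f"priority must be one of {sorted(ALLOWED_PRIORITIES)}")
    return {"title": title, "description": description.strip(), "priority": priority}


print(validate_ticket_args("  API outage ", "Requests time out", "HIGH"))
# → {'title': 'API outage', 'description': 'Requests time out', 'priority': 'high'}
```

Raising a clear error message is useful here because the message can be returned to the model as the tool result, giving it a chance to correct the call.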

When to Use What

| Approach | Complexity | Use When |
| --- | --- | --- |
| prompt \| llm.bind_tools() | Low | Single-turn tool call, no looping needed |
| create_react_agent() | Medium | Standard agent with tools that needs to loop (most use cases) |
| Custom StateGraph | High | Custom state, branching logic, multi-agent, human-in-the-loop |
| Supervisor + Workers | High | Multiple specialist agents coordinating on complex tasks |