LangGraph Practitioner's Handbook

Production-grade reference for building stateful, long-running agents with LangGraph 1.x — covering agents, tools, RAG, memory, streaming, and debugging.

LangGraph 1.1+ Python 3.11+ Gemini · GPT · Claude LangSmith Tracing

📦Install & Setup

# Core install
pip install langgraph langchain-core langchain-community

# Model provider packages
pip install langchain-google-vertexai          # Gemini / Vertex AI
pip install langchain-openai                   # OpenAI / Azure OpenAI
pip install langchain-anthropic                # Claude

# Persistence backends
pip install langgraph-checkpoint-postgres      # Postgres (production)
pip install langgraph-checkpoint-sqlite        # SQLite (dev/test)

# Tools
pip install langchain-tavily                   # Tavily web search
pip install langchain-community                # DuckDuckGo, Wikipedia, etc.

# Observability
pip install langsmith

# .env: LangSmith tracing (highly recommended from day one)
LANGCHAIN_TRACING_V2=true
LANGCHAIN_API_KEY=ls__...
LANGCHAIN_PROJECT=my-langgraph-app

# Model keys
OPENAI_API_KEY=sk-...
TAVILY_API_KEY=tvly-...

# Google / Vertex — use Application Default Credentials (ADC) or service account
GOOGLE_APPLICATION_CREDENTIALS=/path/to/sa.json
GOOGLE_CLOUD_PROJECT=my-gcp-project
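
LangGraph 1.x API stability: The 1.0 release (Oct 2025) commits to no breaking changes until 2.0. The core primitives (StateGraph, MessagesState, START/END, add_messages, InMemorySaver) are stable; MemorySaver remains available as an alias of InMemorySaver. langgraph-prebuilt is installed automatically as a dependency of langgraph, so langgraph.prebuilt needs no separate install.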

🧠Core Concepts

PRIMITIVE
State
A TypedDict (or Pydantic model) that flows through the graph. Every node reads it and returns partial updates. Reducers merge updates (e.g., add_messages appends instead of replacing).
PRIMITIVE
Node
A Python function (sync or async) that takes state and returns a dict of updates. Nodes are the units of work. Can call LLMs, tools, databases, or any code.
PRIMITIVE
Edge
Connects nodes. Regular edges are unconditional. Conditional edges route to different nodes based on state — the core of branching logic.
RUNTIME
Checkpointer
Saves state snapshot at every superstep. Enables fault tolerance, resume-after-crash, human-in-the-loop, and time-travel debugging.
RUNTIME
Thread
A unique thread_id in config. Groups checkpoints for a single conversation/workflow instance. Multi-tenant apps need one thread per session.
RUNTIME
Superstep
One round of node executions. All nodes that can run in parallel do so in one superstep. The graph stops when no more nodes are ready — reaching END.
START → node_a → conditional_edge → node_b or node_c → END
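
The flow above can be modelled in plain Python to make the roles of partial updates, reducers, and conditional edges concrete. This is a minimal sketch, not LangGraph's implementation: `REDUCERS`, `apply_update`, `route`, and `run` are illustrative names, and the real runtime additionally handles supersteps, checkpoints, and parallel nodes.

```python
from operator import add
from typing import Callable

# Reducers per state key; keys without a reducer are last-write-wins.
REDUCERS: dict[str, Callable] = {"log": add}

def apply_update(state: dict, update: dict) -> dict:
    """Merge a node's partial update into a copy of the state."""
    merged = dict(state)
    for key, value in update.items():
        reducer = REDUCERS.get(key)
        if reducer and key in merged:
            merged[key] = reducer(merged[key], value)   # e.g. list concatenation
        else:
            merged[key] = value                         # overwrite
    return merged

def node_a(state: dict) -> dict:
    return {"log": ["a"], "score": len(state["text"])}

def node_b(state: dict) -> dict:
    return {"log": ["b"], "label": "long"}

def node_c(state: dict) -> dict:
    return {"log": ["c"], "label": "short"}

def route(state: dict) -> str:
    """Conditional edge: pick the next node from the current state."""
    return "node_b" if state["score"] > 5 else "node_c"

NODES = {"node_a": node_a, "node_b": node_b, "node_c": node_c}

def run(state: dict) -> dict:
    state = apply_update(state, NODES["node_a"](state))
    state = apply_update(state, NODES[route(state)](state))
    return state

final = run({"text": "hello world", "log": []})
print(final["log"], final["label"])
```

Each node returns only the keys it changes; the reducer decides whether a key accumulates (`log`) or is overwritten (`score`, `label`).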

🚀First Agent — Minimal ReAct

from typing import Annotated
from typing_extensions import TypedDict
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.prebuilt import ToolNode, tools_condition
from langgraph.checkpoint.memory import InMemorySaver

# ── 1. Define tools ────────────────────────────────────────
@tool
def multiply(a: int, b: int) -> int:
    """Multiply two integers."""
    return a * b

tools = [multiply]

# ── 2. Bind model ───────────────────────────────────────────
llm = ChatOpenAI(model="gpt-4o", temperature=0)
llm_with_tools = llm.bind_tools(tools)

# ── 3. Define nodes ─────────────────────────────────────────
def call_model(state: MessagesState):
    return {"messages": [llm_with_tools.invoke(state["messages"])]}

# ── 4. Build graph ──────────────────────────────────────────
builder = StateGraph(MessagesState)
builder.add_node("agent", call_model)
builder.add_node("tools", ToolNode(tools))

builder.add_edge(START, "agent")
builder.add_conditional_edges("agent", tools_condition)  # routes to "tools" or END
builder.add_edge("tools", "agent")                       # loop back after tool call

# ── 5. Compile with memory ──────────────────────────────────
graph = builder.compile(checkpointer=InMemorySaver())

# ── 6. Invoke ───────────────────────────────────────────────
config = {"configurable": {"thread_id": "session-1"}}
result = graph.invoke(
    {"messages": [{"role": "user", "content": "What is 6 × 7?"}]},
    config=config
)
print(result["messages"][-1].content)

💡 MessagesState is a built-in state schema whose messages key uses add_messages as its reducer. New messages are appended rather than overwriting the list; a message that reuses an existing ID replaces the earlier copy. It is the standard choice for chat and agent applications. For custom state, extend it or define your own TypedDict.
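
To illustrate what an append-style message reducer does, here is a plain-Python model. It is a sketch under assumptions: messages are simple dicts, and `merge_messages` is a hypothetical stand-in for the behaviour of add_messages (append by default, replace when an ID is reused), not its actual code.

```python
def merge_messages(existing: list[dict], new: list[dict]) -> list[dict]:
    """Append new messages; a message whose id already exists replaces the old one."""
    merged = list(existing)
    index_by_id = {m["id"]: i for i, m in enumerate(merged) if m.get("id") is not None}
    for msg in new:
        msg_id = msg.get("id")
        if msg_id is not None and msg_id in index_by_id:
            merged[index_by_id[msg_id]] = msg          # same id: update in place
        else:
            if msg_id is not None:
                index_by_id[msg_id] = len(merged)
            merged.append(msg)                         # new id: append
    return merged

history = [{"id": "1", "role": "user", "content": "What is 6 x 7?"}]
history = merge_messages(history, [{"id": "2", "role": "assistant", "content": "42"}])
history = merge_messages(history, [{"id": "1", "role": "user", "content": "What is 6 x 8?"}])
print([m["content"] for m in history])
```

A node therefore returns only the messages it produced, never the full history.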

🌐Gemini — Vertex AI

from langchain_google_vertexai import ChatVertexAI
from google.auth import default as gauth

# ── Auth: Application Default Credentials (recommended) ─────
# Run once locally: gcloud auth application-default login
# In GCP (Cloud Run / GKE): uses attached service account automatically

credentials, project = gauth()

# ── Model init ──────────────────────────────────────────────
llm = ChatVertexAI(
    model="gemini-2.0-flash",      # or "gemini-2.0-pro", "gemini-1.5-pro"
    project="my-gcp-project",
    location="us-central1",
    temperature=0.1,
    max_output_tokens=4096,
    streaming=True,                  # enable token streaming
)

# ── With tools ──────────────────────────────────────────────
llm_with_tools = llm.bind_tools(tools)

# ── Gemini native grounding (Google Search) ─────────────────
from vertexai.generative_models import Tool, grounding

# Enable Google Search grounding on the model
grounding_tool = Tool.from_google_search_retrieval(
    google_search_retrieval=grounding.GoogleSearchRetrieval()
)

llm_grounded = ChatVertexAI(
    model="gemini-2.0-flash",
    project="my-gcp-project",
    location="us-central1",
    tools=[grounding_tool],           # native Vertex AI grounding
)

Vertex AI Agent Engine (managed deployment)

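import vertexai
from vertexai.preview import reasoning_engines

vertexai.init(
    project="my-project",
    location="us-central1",
    staging_bucket="gs://my-staging-bucket",  # placeholder bucket used to upload the packaged app
)

# AdkApp wraps ADK agents. A compiled LangGraph graph can instead be wrapped
# in a small custom class that exposes set_up() and query().
class LangGraphApp:
    def set_up(self):
        # Build the graph on the remote worker instead of pickling it locally.
        # build_graph is a hypothetical factory that returns builder.compile().
        self.graph = build_graph()

    def query(self, **kwargs):
        return self.graph.invoke(kwargs)

# Deploy to Vertex AI Agent Engine
remote_app = reasoning_engines.ReasoningEngine.create(
    LangGraphApp(),
    requirements=["langgraph>=1.0", "langchain-google-vertexai"],
    display_name="my-agent",
)

# Invoke remotely
response = remote_app.query(
    messages=[{"role": "user", "content": "What is the exchange rate for USD/INR?"}]
)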

☁️GPT — Azure OpenAI

from langchain_openai import AzureChatOpenAI
import os

# ── Azure OpenAI init ────────────────────────────────────────
llm = AzureChatOpenAI(
    azure_deployment="gpt-4o",           # your deployment name in Azure
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    api_key=os.environ["AZURE_OPENAI_API_KEY"],
    api_version="2025-01-01-preview",     # pin a version your resource supports (this one is a preview)
    temperature=0,
    streaming=True,
    max_retries=3,
)

# ── With tools ──────────────────────────────────────────────
llm_with_tools = llm.bind_tools(tools)

# ── Azure env var pattern (recommended) ─────────────────────
# Set in .env / Key Vault / CI secret, never in code:
# AZURE_OPENAI_ENDPOINT = https://my-resource.openai.azure.com/
# AZURE_OPENAI_API_KEY  = <key>
# OPENAI_API_VERSION    = 2025-01-01-preview
# When these are set, AzureChatOpenAI() picks them up automatically.

# ── Azure with Managed Identity (prod, no key in env) ───────
from azure.identity import DefaultAzureCredential, get_bearer_token_provider
from langchain_openai import AzureChatOpenAI

token_provider = get_bearer_token_provider(
    DefaultAzureCredential(),
    "https://cognitiveservices.azure.com/.default"
)

llm_managed = AzureChatOpenAI(
    azure_deployment="gpt-4o",
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    api_version="2025-01-01-preview",
    azure_ad_token_provider=token_provider,  # no API key needed
    temperature=0,
)

Multi-Model Patterns

Route different nodes to different models. Use a fast/cheap model for routing decisions, a powerful model for generation, a local model for sensitive data.

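from typing import Annotated
from typing_extensions import TypedDict
from langchain_google_vertexai import ChatVertexAI
from langchain_openai import AzureChatOpenAI
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages

class State(TypedDict):
    query: str
    route: str
    messages: Annotated[list, add_messages]

# Router: fast Gemini Flash for routing decisions
router_llm = ChatVertexAI(model="gemini-2.0-flash", temperature=0)

# Generator: GPT-4o via Azure for content generation
gen_llm = AzureChatOpenAI(azure_deployment="gpt-4o", temperature=0.7)

# Analyst: Gemini Pro for long context analysis (2M tokens)
analyst_llm = ChatVertexAI(model="gemini-2.0-pro", temperature=0)

def router_node(state: State) -> State:
    decision = router_llm.invoke(
        [("system", "Route this query. Reply with exactly: GENERATE or ANALYSE"),
         ("user", state["query"])]
    )
    return {"route": decision.content.strip()}

def generate_node(state: State) -> State:
    response = gen_llm.invoke(state["messages"])
    return {"messages": [response]}

def analyse_node(state: State) -> State:
    response = analyst_llm.invoke(state["messages"])
    return {"messages": [response]}

def route_decision(state: State) -> str:
    # Normalise casing so "generate" or "Generate" still routes correctly
    return "generate" if state["route"].upper() == "GENERATE" else "analyse"

builder = StateGraph(State)
builder.add_node("router", router_node)
builder.add_node("generate", generate_node)
builder.add_node("analyse", analyse_node)
builder.add_edge(START, "router")
builder.add_conditional_edges("router", route_decision, ["generate", "analyse"])
builder.add_edge("generate", END)
builder.add_edge("analyse", END)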
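
Router models do not always reply with exactly the requested token, so it can help to normalise the reply before it reaches the conditional edge. The sketch below is one possible approach; `ROUTES` and `normalise_route` are hypothetical helpers, and the choice of fallback route is an assumption you should adapt.

```python
ROUTES = {"GENERATE": "generate", "ANALYSE": "analyse"}

def normalise_route(reply: str, default: str = "analyse") -> str:
    """Map a free-text model reply onto a known node name."""
    token = reply.strip().strip(".!\"'").upper()
    if token in ROUTES:
        return ROUTES[token]
    # Fall back to a substring match, e.g. "Route: ANALYSE"
    matches = [node for label, node in ROUTES.items() if label in reply.upper()]
    # An ambiguous or empty match falls through to the default route
    return matches[0] if len(matches) == 1 else default

print(normalise_route(" generate.\n"))
print(normalise_route("Route: ANALYSE"))
```

Returning a guaranteed-valid node name keeps a stray reply from raising a routing error mid-run.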

🔧Tool Binding

import aiohttp                      # used by the async fetch_url tool below
from langchain_core.tools import tool, StructuredTool
from pydantic import BaseModel, Field

# ── Simple tool with @tool decorator ────────────────────────
@tool
def get_stock_price(ticker: str) -> str:
    """Get current stock price for a ticker symbol like RELIANCE.NS or TCS.NS."""
    # real impl: call yfinance / Alpha Vantage / NSE API
    return "₹2,450.00 (mock)"

# ── Pydantic schema for complex input ───────────────────────
class SearchInput(BaseModel):
    query: str = Field(description="Search query string")
    max_results: int = Field(default=5, description="Number of results")

@tool(args_schema=SearchInput)
def web_search(query: str, max_results: int = 5) -> str:
    """Search the web for current information."""
    # impl: call Tavily / DuckDuckGo
    ...

# ── StructuredTool from function ────────────────────────────
def _run_python(code: str) -> str:
    """Execute Python code in sandbox and return output."""
    ...

python_tool = StructuredTool.from_function(
    func=_run_python,
    name="execute_python",
    description="Run Python code and return stdout. Use for calculations.",
)

# ── Async tool ──────────────────────────────────────────────
@tool
async def fetch_url(url: str) -> str:
    """Fetch content from a URL."""
    async with aiohttp.ClientSession() as s:
        async with s.get(url) as r:
            return await r.text()

# ── Bind all tools to model ──────────────────────────────────
all_tools = [get_stock_price, web_search, python_tool, fetch_url]
llm_with_tools = llm.bind_tools(all_tools)

📌Grounding & Citations

Grounding anchors model responses to verifiable sources. There are two patterns: tool-based grounding (the agent actively searches and cites) and native grounding (built into Vertex AI for Gemini).

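from langchain_core.messages import SystemMessage
from langchain_core.tools import tool
from langchain_tavily import TavilySearch

tavily = TavilySearch(max_results=5)

@tool
def search_and_cite(query: str) -> str:
    """Search the web and return results with source URLs for citation."""
    response = tavily.invoke({"query": query})
    # TavilySearch returns a dict; the individual hits are under "results".
    # Format each hit with its source so the model can cite it.
    formatted = []
    for r in response["results"]:
        formatted.append(f"[Source: {r['url']}]\n{r['content']}")
    return "\n\n---\n\n".join(formatted)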

def grounded_agent_node(state: MessagesState):
    sys = SystemMessage(content="""You answer questions using provided sources.
Always cite sources inline as [Source: URL]. Never make up facts.""")
    response = llm_with_tools.invoke([sys] + state["messages"])
    return {"messages": [response]}
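
With tool-based grounding, a prompt instruction alone does not guarantee that cited URLs were actually retrieved. A post-check along the following lines can flag invented citations. This is a sketch that assumes the `[Source: URL]` tag format used above; `cited_urls` and `unsupported_citations` are hypothetical helpers.

```python
import re

SOURCE_TAG = re.compile(r"\[Source:\s*(\S+?)\]")

def cited_urls(text: str) -> set[str]:
    """Collect every URL that appears inside a [Source: ...] tag."""
    return set(SOURCE_TAG.findall(text))

def unsupported_citations(answer: str, tool_output: str) -> set[str]:
    """URLs cited in the answer that never appeared in the retrieved sources."""
    return cited_urls(answer) - cited_urls(tool_output)

tool_output = (
    "[Source: https://a.example/x]\nFirst passage"
    "\n\n---\n\n"
    "[Source: https://b.example/y]\nSecond passage"
)
answer = (
    "Fact one [Source: https://a.example/x]. "
    "Fact two [Source: https://c.example/z]."
)
print(unsupported_citations(answer, tool_output))
```

A non-empty result is a reasonable trigger for a retry or for stripping the unsupported claim.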

# ── Native grounding: Google Search built into Gemini ───────
from vertexai.generative_models import GenerativeModel, Tool, grounding
import vertexai

vertexai.init(project="my-project", location="us-central1")

# Google Search grounding built into Gemini on Vertex AI
model = GenerativeModel(
    model_name="gemini-2.0-flash",
    tools=[Tool.from_google_search_retrieval(
        google_search_retrieval=grounding.GoogleSearchRetrieval(
            dynamic_retrieval_config=grounding.DynamicRetrievalConfig(
                dynamic_threshold=0.7  # only search when confidence low
            )
        )
    )]
)

# Response includes grounding_metadata with citations
response = model.generate_content("What is the current Nifty 50 level?")
for chunk in response.candidates[0].grounding_metadata.grounding_chunks:
    print(chunk.web.uri, chunk.web.title)

🔩ToolNode — Prebuilt Tool Executor

from langgraph.prebuilt import ToolNode, tools_condition

# ToolNode automatically:
#  1. Extracts tool_calls from the last AIMessage
#  2. Executes each tool (parallel by default in v1.x)
#  3. Returns ToolMessages with results

tool_node = ToolNode(
    tools=all_tools,
    handle_tool_errors=True,   # catches exceptions, returns error as ToolMessage
    name="tools",
)

# tools_condition: routes to "tools" if last msg has tool_calls, else END
builder.add_conditional_edges("agent", tools_condition)
builder.add_edge("tools", "agent")

# Custom tool error handling: the callable receives the raised exception and
# returns the text placed in the ToolMessage that goes back to the model
def handle_tool_error(error: Exception) -> str:
    return f"Error: {error!r}\nRetry with corrected input."

tool_node_safe = ToolNode(all_tools, handle_tool_errors=handle_tool_error)
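
The three steps ToolNode performs can be modelled in a few lines of plain Python. This is an illustrative sketch only: messages are plain dicts, `TOOL_MAP` and `run_tool_calls` are hypothetical names, and the loop runs sequentially whereas the handbook notes that ToolNode executes calls in parallel.

```python
def multiply(a: int, b: int) -> int:
    return a * b

def divide(a: float, b: float) -> float:
    return a / b

TOOL_MAP = {"multiply": multiply, "divide": divide}

def run_tool_calls(ai_message: dict) -> list[dict]:
    """Execute each requested tool and return one tool-result message per call."""
    results = []
    for call in ai_message.get("tool_calls", []):
        try:
            content = str(TOOL_MAP[call["name"]](**call["args"]))
        except Exception as exc:
            # Report the failure to the model instead of crashing the graph
            content = f"Error: {exc!r}\nRetry with corrected input."
        results.append({"role": "tool", "tool_call_id": call["id"], "content": content})
    return results

ai_message = {
    "role": "assistant",
    "tool_calls": [
        {"id": "call-1", "name": "multiply", "args": {"a": 6, "b": 7}},
        {"id": "call-2", "name": "divide", "args": {"a": 1, "b": 0}},
    ],
}
tool_messages = run_tool_calls(ai_message)
for m in tool_messages:
    print(m["tool_call_id"], m["content"])
```

Every tool call gets exactly one reply carrying its `tool_call_id`, which is what lets the model match results to requests on the next turn.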

🗺Workflow Patterns

PATTERN
Sequential
Fixed pipeline: A → B → C → END. No branching. Ideal for ETL-style workflows, document processing, data enrichment chains.
PATTERN
Conditional Branching
Router node decides next step at runtime. Core of most agent architectures. Use add_conditional_edges with a router function.
PATTERN
Cycles / Loops
Nodes can loop back: agent → tools → agent. Required for ReAct. Always bound the loop, either with a step counter in state or with the recursion_limit run config, to prevent infinite loops.
PATTERN
Fan-out / Fan-in (Map-Reduce)
Use Send() to spawn parallel branches with different inputs. Results merge back in an aggregator node. Ideal for parallel research.
PATTERN
Supervisor / Multi-Agent
A supervisor node routes tasks to specialist agents. Each specialist is itself a compiled subgraph. Hierarchical control with clear separation.
PATTERN
Human-in-the-Loop
interrupt() pauses the graph, saves state, waits for human input. Resume with Command(resume=value). Works across hours or days.

Fan-out with Send (parallel map-reduce)

from operator import add
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send

class ResearchState(TypedDict):
    topics: list[str]
    results: Annotated[list[str], add]  # reducer: append results from all branches
    final_report: str                   # written by aggregate

def generate_topics(state: ResearchState) -> ResearchState:
    return {"topics": ["market analysis", "risk factors", "valuation"]}

def fan_out(state: ResearchState) -> list[Send]:
    # Spawn one branch per topic; they run in parallel
    return [Send("research_topic", {"topic": t}) for t in state["topics"]]

def research_topic(state: dict) -> ResearchState:
    # Each branch receives only the payload passed to Send
    result = llm.invoke(f"Research: {state['topic']}")
    return {"results": [result.content]}

def aggregate(state: ResearchState) -> ResearchState:
    combined = "\n\n".join(state["results"])
    synthesis = llm.invoke(f"Synthesise: {combined}")
    return {"final_report": synthesis.content}

builder = StateGraph(ResearchState)
builder.add_node("topics", generate_topics)
builder.add_node("research_topic", research_topic)
builder.add_node("aggregate", aggregate)
builder.add_edge(START, "topics")
builder.add_conditional_edges("topics", fan_out, ["research_topic"])
builder.add_edge("research_topic", "aggregate")  # runs once all branches finish
builder.add_edge("aggregate", END)
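
The fan-out/fan-in mechanics can also be seen without the framework. The following sketch models them with a thread pool and the same `add` reducer; `run_map_reduce` is a hypothetical helper and the branch function is a stand-in for the LLM call, so treat it as an analogy rather than how Send is implemented.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import reduce
from operator import add

def research_topic(branch_state: dict) -> dict:
    # Stand-in for an LLM call; each branch sees only its own payload
    return {"results": [f"notes on {branch_state['topic']}"]}

def fan_out(state: dict) -> list[dict]:
    """One payload per topic, mirroring one Send per branch."""
    return [{"topic": t} for t in state["topics"]]

def run_map_reduce(state: dict) -> dict:
    with ThreadPoolExecutor() as pool:
        updates = list(pool.map(research_topic, fan_out(state)))
    # Fan-in: the reducer concatenates every branch's partial result
    results = reduce(add, (u["results"] for u in updates), state.get("results", []))
    return {**state, "results": results}

out = run_map_reduce({"topics": ["market analysis", "risk factors"]})
print(out["results"])
```

Because every branch writes to the same key, the reducer on that key is what prevents branches from overwriting each other.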

🔄ReAct Agent (Prebuilt)

from langgraph.prebuilt import create_react_agent
from langgraph.checkpoint.memory import InMemorySaver

# Fastest way to a production ReAct agent
graph = create_react_agent(
    model=llm,
    tools=all_tools,
    checkpointer=InMemorySaver(),
    prompt="""You are a financial research assistant for Indian markets.
Always cite your sources. Use INR currency formatting.
If you don't know, say so — never hallucinate data.""",
)

# Stream results
config = {
    "configurable": {"thread_id": "research-session-1"},
    "recursion_limit": 15,    # guard against infinite loops (counts supersteps)
}
async for chunk in graph.astream(
    {"messages": [("user", "Analyse the latest Reliance Industries earnings")]},
    config=config,
    stream_mode="messages",
):
    msg, meta = chunk
    if hasattr(msg, "content") and msg.content:
        print(msg.content, end="", flush=True)

📋Plan & Execute

For complex, multi-step tasks. A planner LLM creates a task list; an executor works through each step, potentially updating the plan.

from typing import Annotated
from operator import add

class PlanState(TypedDict):
    task: str
    plan: list[str]          # steps to execute
    results: Annotated[list, add]  # accumulated step results
    final: str

def planner(state: PlanState) -> PlanState:
    plan_prompt = f"""Break this task into 3-5 actionable steps:
Task: {state['task']}
Return as a numbered list, one step per line."""
    response = llm.invoke(plan_prompt)
    steps = [s.strip() for s in response.content.split("\n") if s.strip()]
    return {"plan": steps}

def executor(state: PlanState) -> PlanState:
    step = state["plan"][0]              # take next step
    remaining = state["plan"][1:]         # pop it
    # executor_agent: any compiled agent, e.g. create_react_agent(llm, tools=all_tools)
    result = executor_agent.invoke({"messages": [("user", step)]})
    return {
        "plan": remaining,
        "results": [result["messages"][-1].content]
    }

def synthesiser(state: PlanState) -> PlanState:
    all_results = "\n".join(state["results"])
    final = llm.invoke(f"Synthesise these findings:\n{all_results}")
    return {"final": final.content}

def should_continue(state: PlanState) -> str:
    return "execute" if state["plan"] else "synthesise"

builder = StateGraph(PlanState)
builder.add_node("plan",       planner)
builder.add_node("execute",    executor)
builder.add_node("synthesise", synthesiser)
builder.add_edge(START, "plan")
builder.add_edge("plan", "execute")
builder.add_conditional_edges("execute", should_continue)
builder.add_edge("synthesise", END)
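
The planner above splits the reply on newlines, which leaves list markers such as "1." inside each step. A small parser like the one below can clean them up. It is a sketch: `parse_plan` is a hypothetical helper, and it does not try to detect preamble lines such as "Here is the plan:".

```python
import re

# Matches "1.", "2)", "-", "*" or "•" at the start of a line
STEP_PREFIX = re.compile(r"^\s*(?:\d+[.)]|[-*•])\s+")

def parse_plan(text: str) -> list[str]:
    """Turn a numbered or bulleted reply into clean step strings."""
    steps = []
    for line in text.splitlines():
        if not line.strip():
            continue                      # skip blank lines between steps
        steps.append(STEP_PREFIX.sub("", line).strip())
    return steps

reply = "1. Gather filings\n2) Compare margins\n\n- Write summary"
print(parse_plan(reply))
```

Asking the model for structured output (for example via with_structured_output, used in the Adaptive RAG section) avoids text parsing altogether.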

🤝Multi-Agent Systems

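from typing import Annotated
from typing_extensions import TypedDict
from langchain_core.messages import HumanMessage
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import create_react_agent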

# ── Specialist sub-agents ────────────────────────────────────
research_agent = create_react_agent(llm, tools=[tavily], name="Researcher")
analyst_agent  = create_react_agent(llm, tools=[python_tool], name="Analyst")
writer_agent   = create_react_agent(llm, tools=[], name="Writer")

class SupervisorState(TypedDict):
    messages: Annotated[list, add_messages]
    next_agent: str

AGENTS = ["researcher", "analyst", "writer", "FINISH"]

def supervisor_node(state: SupervisorState) -> SupervisorState:
    prompt = f"""You are a supervisor. Based on the conversation, choose the next worker.
Workers: {AGENTS}. Reply with exactly one worker name or FINISH."""
    response = llm.invoke([("system", prompt)] + state["messages"])
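    choice = response.content.strip()
    # Any reply outside the known names ends the run instead of raising a routing error
    return {"next_agent": choice if choice in AGENTS else "FINISH"}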

def make_agent_node(agent, name: str):
    def node(state: SupervisorState):
        result = agent.invoke({"messages": state["messages"]})
        last = result["messages"][-1]
        return {"messages": [HumanMessage(content=last.content, name=name)]}
    return node

builder = StateGraph(SupervisorState)
builder.add_node("supervisor",  supervisor_node)
builder.add_node("researcher",  make_agent_node(research_agent, "Researcher"))
builder.add_node("analyst",     make_agent_node(analyst_agent,  "Analyst"))
builder.add_node("writer",      make_agent_node(writer_agent,   "Writer"))

builder.add_edge(START, "supervisor")
builder.add_conditional_edges(
    "supervisor",
    lambda s: s["next_agent"],
    {"researcher": "researcher", "analyst": "analyst",
     "writer": "writer", "FINISH": END}
)
for agent_name in ["researcher", "analyst", "writer"]:
    builder.add_edge(agent_name, "supervisor")

📚RAG Patterns

from langchain_community.vectorstores import Chroma
from langchain_google_vertexai import VertexAIEmbeddings
from langchain_core.documents import Document

# ── Vector store setup ───────────────────────────────────────
embeddings = VertexAIEmbeddings(model="text-embedding-005")
vectorstore = Chroma(
    collection_name="financial_docs",
    embedding_function=embeddings,
    persist_directory="./chroma_db",
)
retriever = vectorstore.as_retriever(search_kwargs={"k": 5})

# ── RAG State ───────────────────────────────────────────────
class RAGState(TypedDict):
    question: str
    context: list[Document]
    answer: str

# ── Nodes ───────────────────────────────────────────────────
def retrieve(state: RAGState) -> RAGState:
    docs = retriever.invoke(state["question"])
    return {"context": docs}

def generate(state: RAGState) -> RAGState:
    context_text = "\n\n".join(d.page_content for d in state["context"])
    prompt = f"""Answer using only the provided context. Cite page numbers.

Context:
{context_text}

Question: {state['question']}
Answer:"""
    response = llm.invoke(prompt)
    return {"answer": response.content}

builder = StateGraph(RAGState)
builder.add_node("retrieve", retrieve)
builder.add_node("generate", generate)
builder.add_edge(START, "retrieve")
builder.add_edge("retrieve", "generate")
builder.add_edge("generate", END)

🔬Adaptive RAG / Self-RAG

Adaptive RAG uses a grader to evaluate retrieved documents and the generated answer — rewriting the query or falling back to web search when quality is poor.

from pydantic import BaseModel

class GradeDoc(BaseModel):
    relevant: bool
    reason: str

class GradeAnswer(BaseModel):
    grounded: bool
    useful: bool

grader_llm = llm.with_structured_output(GradeDoc)
answer_grader = llm.with_structured_output(GradeAnswer)

class AdaptiveRAGState(TypedDict):
    question: str
    context: list[Document]
    answer: str
    retries: int

def grade_documents(state: AdaptiveRAGState) -> AdaptiveRAGState:
    filtered = []
    for doc in state["context"]:
        grade = grader_llm.invoke(
            f"Question: {state['question']}\nDocument: {doc.page_content}\nIs relevant?"
        )
        if grade.relevant:
            filtered.append(doc)
    return {"context": filtered}

def decide_after_grading(state: AdaptiveRAGState) -> str:
    if not state["context"] or state["retries"] > 2:
        return "web_search"     # fall back to web
    return "generate"

def rewrite_query(state: AdaptiveRAGState) -> AdaptiveRAGState:
    rewrite = llm.invoke(
        f"Rewrite this query for better retrieval: {state['question']}"
    )
    return {"question": rewrite.content, "retries": state["retries"] + 1}

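def grade_answer(state: AdaptiveRAGState) -> str:
    grade = answer_grader.invoke(
        f"Question: {state['question']}\nAnswer: {state['answer']}"
    )
    # Accept a good answer, or stop once the retry budget is spent
    if (grade.grounded and grade.useful) or state["retries"] > 2:
        return END
    return "rewrite"

builder = StateGraph(AdaptiveRAGState)   # invoke with "retries": 0 in the input
builder.add_node("retrieve",       retrieve)
builder.add_node("grade_docs",     grade_documents)
builder.add_node("generate",       generate)
builder.add_node("rewrite",        rewrite_query)
builder.add_node("web_search",     web_search_node)   # not shown: fills "context" from a web search
builder.add_edge(START, "retrieve")
builder.add_edge("retrieve", "grade_docs")
builder.add_conditional_edges("grade_docs", decide_after_grading, ["generate", "web_search"])
builder.add_edge("web_search", "generate")
# grade_answer is a routing function, so it hangs off "generate" directly
builder.add_conditional_edges("generate", grade_answer, ["rewrite", END])
builder.add_edge("rewrite", "retrieve")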
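
The control flow of the adaptive loop is easier to reason about when stripped of the graph plumbing. The sketch below is a simplified model with stub functions in place of the retriever, graders, and LLM; `adaptive_answer` and all stub names are hypothetical, and the retry budget is an assumption.

```python
def adaptive_answer(question, retrieve, grade_doc, generate, grade_answer,
                    rewrite, web_search, max_retries=2):
    """Retrieve, filter, generate, grade; rewrite the query until the budget is spent."""
    retries = 0
    while True:
        docs = [d for d in retrieve(question) if grade_doc(question, d)]
        if not docs:
            docs = web_search(question)          # nothing relevant locally
        answer = generate(question, docs)
        if grade_answer(question, answer) or retries >= max_retries:
            return answer, retries
        question = rewrite(question)
        retries += 1

# Stubs standing in for the retriever, graders, and model
stubs = dict(
    retrieve=lambda q: [f"doc about {q}"],
    grade_doc=lambda q, d: "revenue" in d,
    generate=lambda q, docs: f"{q}: {len(docs)} docs",
    grade_answer=lambda q, a: "revenue" in a,
    rewrite=lambda q: q + " revenue",
    web_search=lambda q: ["web result"],
)
answer, retries = adaptive_answer("sales", **stubs)
print(answer, retries)
```

The explicit budget check is the important part: without it, a question the grader never accepts would loop until the graph's recursion limit is hit.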

🗃State Design

from typing import Annotated
from operator import add
from langgraph.graph.message import add_messages
from pydantic import BaseModel, field_validator

# ── 1. TypedDict (standard) ─────────────────────────────────
class AgentState(TypedDict):
    messages: Annotated[list, add_messages]  # reducer: append messages
    steps: Annotated[int, lambda a, b: a + b]  # reducer: sum
    context: str                               # last-write-wins (no reducer)
    max_steps: int                             # configuration constant

# ── 2. Pydantic (validation + defaults) ─────────────────────
class ValidatedState(BaseModel):
    messages: Annotated[list, add_messages] = []
    confidence: float = 0.0
    retries: int = 0

    @field_validator("confidence")
    def clamp_confidence(cls, v): return max(0.0, min(1.0, v))

# ── 3. Input/Output schemas (restrict what callers see) ──────
class InputState(TypedDict):
    question: str

class OutputState(TypedDict):
    answer: str
    sources: list[str]

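# Input/output schemas are declared on the StateGraph, not at compile time
builder = StateGraph(AgentState, input_schema=InputState, output_schema=OutputState)
graph = builder.compile()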

# ── 4. Private state (internal only, not in input/output) ────
class PrivateState(TypedDict):
    _intermediate_steps: list  # underscore = private by convention

Keep state minimal. Don't store transient values (loop variables, intermediate strings) in state; keep them in local variables inside the node. Persist only what must survive across nodes or be read by more than one node. Large state objects increase checkpoint size and slow persistence, because the state is saved at every superstep.
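
A rough way to see the cost of a large state is to compare serialized sizes. The sketch below uses JSON as a stand-in serializer, which is an assumption (LangGraph's checkpointers use their own format), and `checkpoint_size` is a hypothetical helper; the relative difference is the point, not the absolute numbers.

```python
import json

# Five synthetic documents of 6,000 characters each
documents = {f"doc-{i}": "lorem ipsum " * 500 for i in range(5)}

heavy_state = {"question": "q", "context": list(documents.values())}   # full text in state
light_state = {"question": "q", "context_ids": list(documents.keys())} # references only

def checkpoint_size(state: dict) -> int:
    """Approximate bytes written per checkpoint, using JSON as a stand-in."""
    return len(json.dumps(state).encode("utf-8"))

print(checkpoint_size(heavy_state), checkpoint_size(light_state))
```

Storing identifiers and re-fetching content inside the node that needs it trades a lookup for a much smaller write at every step.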

🧩Memory & Checkpointers

from langgraph.checkpoint.memory import InMemorySaver    # dev/testing only
from langgraph.checkpoint.sqlite import SqliteSaver       # local / small apps
from langgraph.checkpoint.postgres import PostgresSaver   # production
from psycopg_pool import ConnectionPool

# ── Dev: in-memory (no persistence between restarts) ────────
checkpointer = InMemorySaver()

# ── Dev: SQLite (persists to disk) ──────────────────────────
# A file path persists across restarts; ":memory:" would not
with SqliteSaver.from_conn_string("checkpoints.db") as cp:
    graph = builder.compile(checkpointer=cp)

# ── Production: Postgres ────────────────────────────────────
from psycopg.rows import dict_row

DB_URI = "postgresql://user:pass@host:5432/langgraph?sslmode=require"
pool = ConnectionPool(
    conninfo=DB_URI,
    max_size=10,
    kwargs={"autocommit": True, "row_factory": dict_row},  # required by PostgresSaver
    open=False,
)
pool.open()
pg_saver = PostgresSaver(pool)      # pass the pool so each operation borrows a connection
pg_saver.setup()                    # creates tables on first run
graph = builder.compile(checkpointer=pg_saver)

# ── Thread-scoped config ─────────────────────────────────────
config = {
    "configurable": {
        # One thread per tenant/user/session keeps checkpoints isolated.
        # checkpoint_ns is left unset: LangGraph uses it for subgraph namespaces.
        "thread_id": f"tenant-{tenant_id}:user-{user_id}:session-{session_id}",
    }
}

# ── Time travel: replay from a specific checkpoint ───────────
# get_state_history yields snapshots newest-first, so index 0 is the latest
checkpoints = list(graph.get_state_history(config))
past_config = checkpoints[3].config     # 3 steps back
graph.invoke(None, config=past_config)   # replay from there

# ── Inspect current state ───────────────────────────────────
state = graph.get_state(config)
print(state.values)          # current state dict
print(state.next)            # next nodes to execute
print(state.metadata)        # step number, source

Long-term cross-session memory

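from langchain_core.runnables import RunnableConfig
from langgraph.store.base import BaseStore
from langgraph.store.memory import InMemoryStore

store = InMemoryStore()

def memory_node(state: MessagesState, config: RunnableConfig, *, store: BaseStore):
    # The compiled graph injects the store; user_id comes from the run config
    # because MessagesState has no user_id key
    user_id = config["configurable"]["user_id"]
    item = store.get(("user_prefs", user_id), "preferences")
    prefs = item.value if item else {}

    # After generating response, save new facts
    store.put(
        ("user_prefs", user_id), "preferences",
        {**prefs, "currency": "INR", "domain": "finance"}
    )
    return {"messages": []}   # no message changes from this node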

graph = builder.compile(checkpointer=InMemorySaver(), store=store)

🔲Subgraphs

Subgraphs are independently compiled graphs used as nodes in a parent graph. They manage their own internal state and expose only what the parent needs.

from langgraph.graph import StateGraph, START, END

# ── Subgraph: internal state ─────────────────────────────────
class ResearchSubState(TypedDict):
    query: str        # shared with parent (same key name)
    search_results: list  # internal to subgraph
    summary: str      # output back to parent

def search_step(state: ResearchSubState):
    results = tavily.invoke(state["query"])
    return {"search_results": results}

def summarise_step(state: ResearchSubState):
    summary = llm.invoke(f"Summarise: {state['search_results']}")
    return {"summary": summary.content}

sub_builder = StateGraph(ResearchSubState)
sub_builder.add_node("search",   search_step)
sub_builder.add_node("summarise", summarise_step)
sub_builder.add_edge(START, "search")
sub_builder.add_edge("search", "summarise")
sub_builder.add_edge("summarise", END)
research_subgraph = sub_builder.compile()

# ── Parent graph: add subgraph as a node ─────────────────────
class ParentState(TypedDict):
    query: str           # same key as subgraph — state is passed down
    summary: str         # output from subgraph flows back up
    final_report: str

parent = StateGraph(ParentState)
parent.add_node("research", research_subgraph)  # compiled subgraph as node
parent.add_node("report", lambda s: {"final_report": s["summary"]})
parent.add_edge(START, "research")
parent.add_edge("research", "report")
parent.add_edge("report", END)

graph = parent.compile()

# Stream with subgraph events visible
async for event in graph.astream(input, stream_mode="updates", subgraphs=True):
    namespace, data = event              # namespace is () for the parent graph
    print(f"[{' > '.join(namespace) or 'root'}]", data)

Streaming

Mode      What streams                                     Best for
messages  Token-by-token LLM output (AIMessageChunk)       Chat UI, real-time display
updates   Dict of state changes after each node            Dashboards, progress tracking
values    Full state snapshot after each superstep         Debugging, state inspection
custom    Data a node emits through get_stream_writer()    Tool call results, intermediate data
debug     Every internal event (verbose)                   Deep debugging

import asyncio
from langchain_core.messages import AIMessageChunk

# ── Token streaming (chat UI) ────────────────────────────────
async for chunk in graph.astream(
    {"messages": [("user", "Explain SEBI regulations")]},
    config=config,
    stream_mode="messages",
):
    msg, metadata = chunk
    if isinstance(msg, AIMessageChunk) and msg.content:
        print(msg.content, end="", flush=True)

# ── Node updates (dashboard) ─────────────────────────────────
async for update in graph.astream(input, stream_mode="updates"):
    node_name = list(update.keys())[0]
    print(f"✓ {node_name} completed")

# ── Multi-mode: tokens + node events ────────────────────────
async for chunk in graph.astream(
    input, config=config,
    stream_mode=["messages", "updates"],
    subgraphs=True,
    version="v2",              # v2: chunk has type/ns/data keys
):
    if chunk["type"] == "messages":
        msg, _ = chunk["data"]
        if isinstance(msg, AIMessageChunk) and msg.content:
            print(msg.content, end="", flush=True)
    elif chunk["type"] == "updates":
        node = list(chunk["data"].keys())[0]
        ns = chunk["ns"]
        print(f"\n[{' > '.join(ns) or 'root'}::{node}]")

# ── Custom stream events ─────────────────────────────────────
from langchain_core.callbacks.manager import adispatch_custom_event

async def my_node(state):
    await adispatch_custom_event("tool_start", {"tool": "web_search"})
    result = await do_search(state["query"])
    await adispatch_custom_event("tool_end", {"results": len(result)})
    return {"context": result}

async for event in graph.astream_events(input, version="v2"):
    if event["event"] == "on_custom_event":
        print(event["name"], event["data"])

🙋Human-in-the-Loop (HITL)

from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import InMemorySaver

def approve_action(state: MessagesState):
    # Graph pauses here — state is checkpointed
    action = state["proposed_action"]
    user_input = interrupt({
        "message": f"Approve this action?\n{action}",
        "options": ["yes", "no", "edit"]
    })  # pauses the run; on resume the node re-runs from the top and interrupt() returns the resume value

    if user_input == "yes":
        return {"approved": True}
    elif user_input == "no":
        return {"approved": False}
    else:
        # Loop: ask again with edited action
        edited = interrupt({"message": "Enter edited action:"})
        return {"proposed_action": edited, "approved": True}

graph = builder.compile(checkpointer=InMemorySaver())

# 1. Run until interrupt
result = graph.invoke(input, config=config)
# result["__interrupt__"] contains the interrupt payload

# 2. Inspect state
state = graph.get_state(config)

# 3. Resume with user's answer
graph.invoke(Command(resume="yes"), config=config)

# 4. Or edit state first, then continue
graph.update_state(config, {"proposed_action": "revised plan"})
graph.invoke(None, config=config)  # re-runs the pending node; an interrupt() inside it pauses again until resumed

# Compile with breakpoints: simpler than interrupt()
graph = builder.compile(
    checkpointer=InMemorySaver(),
    interrupt_before=["execute_tools"],   # pause BEFORE this node runs
    interrupt_after=["planner"],           # pause AFTER this node runs
)

# Inspect, optionally modify, then continue
state = graph.get_state(config)
print(state.next)                     # which node will run next
graph.update_state(config, {"plan": modified_plan})
graph.invoke(None, config=config)      # continue
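
One consequence of interrupt() is easy to miss: the documented behaviour is that a resumed node starts again from its first line, with earlier resume values replayed to the interrupt() calls in order. The sketch below is a simplified, framework-free model of that behaviour, not LangGraph's implementation. `Paused`, `run_node`, and `audit_log` are hypothetical names introduced only for illustration.

```python
# Simplified model of interrupt()/resume semantics. NOT LangGraph's implementation.

class Paused(Exception):
    """Raised when the node asks a question that has no stored answer yet."""
    def __init__(self, payload):
        super().__init__(payload)
        self.payload = payload


def run_node(node, state, answers):
    """Run `node` from the top, replaying stored answers in call order."""
    remaining = iter(answers)

    def interrupt(payload):
        try:
            return next(remaining)          # replay an earlier answer
        except StopIteration:
            raise Paused(payload) from None  # no answer yet: pause

    return node(state, interrupt)


audit_log = []  # records a side effect so re-execution is visible


def approve_action(state, interrupt):
    audit_log.append("loaded proposal")      # runs on EVERY attempt
    choice = interrupt({"message": f"Approve {state['proposed_action']}?"})
    if choice == "edit":
        edited = interrupt({"message": "Enter edited action:"})
        return {"proposed_action": edited, "approved": True}
    return {"approved": choice == "yes"}


state = {"proposed_action": "delete 3 rows"}
answers = []
result = None
for reply in [None, "edit", "archive 3 rows"]:
    if reply is not None:
        answers.append(reply)               # analogous to Command(resume=reply)
    try:
        result = run_node(approve_action, state, answers)
    except Paused as pause:
        print("paused:", pause.payload["message"])

print(result)
print(len(audit_log))  # one entry per attempt, not one per run
```

The node body ran three times for a single approval, so anything placed before interrupt() (API calls, writes, notifications) should be idempotent or moved after the call.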

🛡Error Handling

from tenacity import retry, stop_after_attempt, wait_exponential
from langchain_core.messages import ToolMessage

# ── Retry on node failure ────────────────────────────────────
@retry(stop=stop_after_attempt(3), wait=wait_exponential(min=1, max=10))
async def resilient_node(state: AgentState):
    response = await llm.ainvoke(state["messages"])
    return {"messages": [response]}   # nodes return a dict of state updates

# ── Max steps guard (prevent infinite loops) ────────────────
def should_continue(state: AgentState) -> str:
    if state["steps"] >= state.get("max_steps", 20):
        return "force_end"     # emergency exit
    last_msg = state["messages"][-1]
    return "tools" if last_msg.tool_calls else END

# ── Tool error as ToolMessage (not exception) ───────────────
def safe_tool_node(state: MessagesState):
    last = state["messages"][-1]
    results = []
    for tc in last.tool_calls:
        try:
            result = tool_map[tc["name"]].invoke(tc["args"])
            results.append(ToolMessage(content=str(result), tool_call_id=tc["id"]))
        except Exception as e:
            results.append(ToolMessage(
                content=f"Error: {e}. Try a different approach.",
                tool_call_id=tc["id"]
            ))
    return {"messages": results}

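# ── Fallback LLM node ────────────────────────────────────────
from langchain_google_vertexai import ChatVertexAI
from langchain_openai import AzureChatOpenAI

primary_llm = ChatVertexAI(model="gemini-2.0-flash")
fallback_llm = AzureChatOpenAI(azure_deployment="gpt-4o")  # endpoint, key, and API version come from env vars
resilient_llm = primary_llm.with_fallbacks([fallback_llm])  # tries fallback_llm if the primary call raises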
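
The max-steps guard above only works if some node increments `steps`. The following is a minimal, framework-free sketch of that contract, with plain dicts standing in for graph state and a `while` loop standing in for the agent/tools cycle. `fake_agent` and `run` are hypothetical helpers, and the `END` constant is a stand-in for `langgraph.graph.END`.

```python
from typing import TypedDict

END = "__end__"  # stand-in for langgraph.graph.END in this sketch


class AgentState(TypedDict, total=False):
    messages: list
    steps: int
    max_steps: int


def fake_agent(state: AgentState) -> dict:
    """Hypothetical agent node that always asks for another tool call."""
    msg = {"role": "ai", "tool_calls": [{"name": "web_search"}]}
    return {
        # a real graph would rely on the add_messages reducer instead
        "messages": state["messages"] + [msg],
        "steps": state.get("steps", 0) + 1,   # the guard depends on this
    }


def should_continue(state: AgentState) -> str:
    if state.get("steps", 0) >= state.get("max_steps", 20):
        return "force_end"                    # emergency exit
    return "tools" if state["messages"][-1].get("tool_calls") else END


def run(state: AgentState):
    """Loop agent -> router until the router stops asking for tools."""
    while True:
        state = {**state, **fake_agent(state)}
        route = should_continue(state)
        if route != "tools":
            return route, state


route, final = run({"messages": [], "max_steps": 5})
print(route, final["steps"])
```

Without the `steps` increment in the agent node the router would return "tools" indefinitely. LangGraph's own `recursion_limit` config setting is a separate backstop and does not replace an explicit guard.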

🔬Debugging — LangSmith & Local

LangGraph 1.x integrates deeply with LangSmith. Every node execution, LLM call, and tool call becomes a traced span. This is the primary production debugging path.

# Enable LangSmith tracing — add to .env before starting your process
LANGCHAIN_TRACING_V2=true
LANGCHAIN_API_KEY=ls__xxxxxxxxxxxxxxxx
LANGCHAIN_PROJECT=arthavidya-agent
LANGCHAIN_ENDPOINT=https://api.smith.langchain.com  # default

from langgraph.graph import StateGraph

# ── 1. Visualise graph structure ─────────────────────────────
graph = builder.compile()
print(graph.get_graph().draw_ascii())          # terminal ASCII art (needs the grandalf package)
png = graph.get_graph().draw_mermaid_png()     # returns PNG bytes
graph.get_graph(xray=True).draw_mermaid_png(   # xray=True shows subgraph internals
    output_file_path="graph.png"               # write the image to disk
)

# ── 2. Debug stream — every internal event ───────────────────
for event in graph.stream(input, config=config, stream_mode="debug"):
    print(event["type"], event.get("step"))

# ── 3. Inspect state at any checkpoint ──────────────────────
state = graph.get_state(config)
print("current values:",  state.values)
print("next nodes:",      state.next)
print("at step:",         state.metadata["step"])

# ── 4. Full state history (time travel) ─────────────────────
for snapshot in graph.get_state_history(config):
    print(snapshot.metadata["step"], snapshot.next, snapshot.config)

# ── 5. Print every message in a run ─────────────────────────
final = graph.invoke(input, config=config)
for msg in final["messages"]:
    print(f"[{msg.type}] {msg.content[:200]}")

# ── 6. astream_events — fine-grained event trace ────────────
async for event in graph.astream_events(input, version="v2"):
    kind = event["event"]
    name = event["name"]
    if kind == "on_chat_model_start":
        print(f"→ LLM start: {name}")
    elif kind == "on_tool_start":
        print(f"→ Tool: {name} args={event['data']['input']}")
    elif kind == "on_chat_model_end":
        output = event["data"]["output"]
        tokens = output.usage_metadata
        print(f"← LLM done | tokens: {tokens}")
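
The per-call token print in step 6 can be folded into a running total for the whole run. A minimal sketch, assuming each `on_chat_model_end` event carries an output whose `usage_metadata` is either missing or a mapping with `input_tokens`, `output_tokens`, and `total_tokens`. `add_usage` is a hypothetical helper, and the events are hand-built stand-ins rather than real stream output.

```python
from collections import Counter
from types import SimpleNamespace


def add_usage(totals: Counter, event: dict) -> Counter:
    """Fold one astream_events-style event into running token totals."""
    if event.get("event") != "on_chat_model_end":
        return totals
    output = event.get("data", {}).get("output")
    # Some providers report no usage; treat that as zero tokens
    usage = getattr(output, "usage_metadata", None) or {}
    for key in ("input_tokens", "output_tokens", "total_tokens"):
        totals[key] += usage.get(key, 0)
    return totals


def fake_output(usage):
    """Stand-in for an AI message object with a usage_metadata attribute."""
    return SimpleNamespace(usage_metadata=usage)


events = [
    {"event": "on_chat_model_start", "name": "planner", "data": {}},
    {"event": "on_chat_model_end", "name": "planner", "data": {
        "output": fake_output(
            {"input_tokens": 120, "output_tokens": 30, "total_tokens": 150})}},
    {"event": "on_tool_start", "name": "web_search", "data": {"input": {"q": "x"}}},
    {"event": "on_chat_model_end", "name": "summariser", "data": {
        "output": fake_output(None)}},          # provider sent no usage
    {"event": "on_chat_model_end", "name": "writer", "data": {
        "output": fake_output(
            {"input_tokens": 80, "output_tokens": 40, "total_tokens": 120})}},
]

totals = Counter()
for event in events:
    add_usage(totals, event)

print(dict(totals))
```

In a real run the same function would be called inside the `async for event in graph.astream_events(...)` loop.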

LangSmith key debugging workflows

LANGSMITH
Trace Explorer
Every graph run becomes a trace with nested spans. Click any node span to see input state, output state, latency, token usage. Filter by project, tag, or date range at smith.langchain.com.
LANGSMITH
Dataset + Evaluation
Save failing traces to a dataset. Run evaluate() against it with a custom evaluator. Catch regressions in CI by running the same evaluation from pytest (the LangSmith pytest plugin provides a @pytest.mark.langsmith marker).
LANGSMITH
LangSmith Studio
Visual graph editor. Run and replay traces interactively. Modify node inputs and re-run from any point. Invaluable for debugging HITL flows.
LANGSMITH
Playground
Replay any traced LLM call with modified prompts. Compare model outputs side-by-side. Iterate on system prompt without re-running the full graph.

from langsmith import traceable, Client

client = Client()

# Add metadata to any graph run for filtering in LangSmith
config = {
    "configurable": {"thread_id": "t1"},
    "metadata": {
        "user_id": "u123",
        "session_type": "financial_research",
        "model_version": "arthavidya-v0.2",
    },
    "tags": ["production", "nse"],
}

# Add feedback to a run (run_id is the ID of an existing traced run)
client.create_feedback(
    run_id=run_id,
    key="correctness",
    score=1.0,
    comment="Answer was accurate and well-cited"
)

# @traceable wraps any function in a LangSmith span
@traceable(name="custom_retriever")
def hybrid_search(query: str) -> list:
    bm25 = keyword_search(query)
    dense = vector_search(query)
    return rerank(bm25 + dense)

Debug tip: state not updating? Check your reducer. A key without a reducer is overwritten by each update, so a later node replaces whatever an earlier node wrote. If two nodes running in parallel in the same superstep both write such a key, LangGraph raises InvalidUpdateError rather than picking a winner. For accumulation, declare the key with a reducer, e.g. Annotated[list, operator.add].
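
To make the reducer rule concrete, here is a simplified, framework-free model of how updates from one superstep could be merged: keys annotated with a reducer are combined, and a second write to a key without one is rejected. `merge_step` is a hypothetical helper, not LangGraph's implementation, and it raises a plain `ValueError` where LangGraph would report its own error.

```python
from operator import add
from typing import Annotated, TypedDict, get_type_hints


class ResearchState(TypedDict):
    findings: Annotated[list, add]   # reducer: concatenate updates
    status: str                      # no reducer: one write per step


def merge_step(schema, state: dict, updates: list) -> dict:
    """Apply all updates produced in one step, using reducers where declared."""
    hints = get_type_hints(schema, include_extras=True)
    merged = dict(state)
    written = set()
    for update in updates:
        for key, value in update.items():
            # Annotated[...] types expose their extras via __metadata__
            metadata = getattr(hints[key], "__metadata__", ())
            if metadata:
                reducer = metadata[0]
                merged[key] = reducer(merged[key], value)
            elif key in written:
                raise ValueError(
                    f"{key!r} written twice in one step without a reducer")
            else:
                merged[key] = value
                written.add(key)
    return merged


state = {"findings": [], "status": "new"}

# Two parallel nodes both append findings: the reducer combines them
state = merge_step(ResearchState, state,
                   [{"findings": ["a"]}, {"findings": ["b"]}])
print(state["findings"])

# Two parallel nodes both set status: there is no rule for combining them
conflict = None
try:
    merge_step(ResearchState, state, [{"status": "ok"}, {"status": "failed"}])
except ValueError as exc:
    conflict = str(exc)
print(conflict)
```

The same reading applies to a real state schema: if a key can be written by more than one node in a step, it needs a reducer.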

📏Standards & Best Practices

| Rule | Severity | Rationale |
| --- | --- | --- |
| Always type state with TypedDict or Pydantic | MUST | Untyped dicts break IDE support, hide bugs, and cause silent key errors at runtime |
| Assign a meaningful thread_id on every invocation | MUST | Required for checkpointing, HITL, and time-travel debugging to work |
| Use Postgres checkpointer in production (not InMemorySaver) | MUST | InMemorySaver loses all state on process restart, which is unacceptable in production |
| Set max_steps guard on every cyclical graph | MUST | Prevents infinite loops from runaway models or bad tool outputs |
| Never raise unhandled exceptions in tool functions | MUST | An unhandled exception crashes the graph node. Return the error as a ToolMessage instead |
| Enable LangSmith tracing from day one | SHOULD | Complex agents are very hard to debug without traces. Traces are free on the developer tier |
| Prefer async nodes for I/O-bound work | SHOULD | Async nodes let parallel branches and Send() fan-out overlap I/O on one event loop instead of occupying worker threads |
| Separate tool definitions from graph wiring | SHOULD | Tools should be independently testable. The graph file only does wiring |
| Pass input_schema / output_schema to StateGraph | SHOULD | Hides internal state from callers and gives a cleaner API surface |
| Keep state minimal; hold transient data in function scope | SHOULD | Bloated state means large checkpoints, slow persistence, and hard-to-read debug traces |
| Use connection pooling for Postgres checkpointer | SHOULD | One connection per request leads to connection exhaustion under load |
| Avoid storing LangChain RunnableConfig in state | AVOID | Config is not serializable and breaks checkpointing silently |
| Avoid using .invoke() for streaming-first apps | AVOID | Returns only after full completion, so users watch a spinner for the whole run (30+ seconds on long runs) |
| Avoid top-level graph mutations after compile() | AVOID | The compiled graph is immutable. Add new nodes by rebuilding the StateGraph |

Project layout

my_agent/
├── graphs/
│   ├── __init__.py
│   ├── main_graph.py        # StateGraph wiring only
│   ├── research_subgraph.py # subgraph definitions
│   └── rag_graph.py
├── nodes/
│   ├── llm_nodes.py         # LLM call nodes
│   ├── tool_nodes.py        # custom tool executor nodes
│   └── router_nodes.py      # conditional edge functions
├── tools/
│   ├── web_search.py        # tool definitions (independently testable)
│   ├── database.py
│   └── calculators.py
├── state/
│   └── schemas.py           # TypedDict / Pydantic state definitions
├── config/
│   └── settings.py          # pydantic-settings, env var loading
├── tests/
│   ├── test_nodes.py        # unit tests per node
│   ├── test_tools.py
│   └── test_graph.py        # integration tests with InMemorySaver
└── CLAUDE.md                # langgraph context for Claude Code
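
Because a node is a plain function from state to a partial update, tests/test_nodes.py can exercise nodes without compiling a graph or calling a model. A minimal sketch of that pattern, assuming the LLM call is injected as a callable. `RagState`, `make_answer_node`, and `fake_generate` are hypothetical names, not part of any library.

```python
from typing import Callable, TypedDict


class RagState(TypedDict, total=False):
    query: str
    context: list[str]
    answer: str


def make_answer_node(generate: Callable[[str], str]):
    """Build a node whose model call is injected, so tests can fake it."""
    def answer_node(state: RagState) -> dict:
        context = "\n".join(state.get("context", []))
        prompt = f"Context:\n{context}\n\nQuestion: {state['query']}"
        return {"answer": generate(prompt)}   # partial update only
    return answer_node


def test_answer_node_returns_partial_update():
    seen_prompts = []

    def fake_generate(prompt: str) -> str:
        seen_prompts.append(prompt)
        return "42"

    node = make_answer_node(fake_generate)
    update = node({"query": "What is the answer?",
                   "context": ["doc A", "doc B"]})

    assert update == {"answer": "42"}          # no unrelated keys returned
    assert "doc A" in seen_prompts[0]
    assert "What is the answer?" in seen_prompts[0]


test_answer_node_returns_partial_update()
```

In the graph file the same factory would receive a real model call, which keeps the wiring in graphs/ and the logic in nodes/ as the layout above suggests.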

🔗Reference Links