LangGraph Practitioner's Handbook
Production-grade reference for building stateful, long-running agents with LangGraph 1.x — covering agents, tools, RAG, memory, streaming, and debugging.
📦Install & Setup
```shell
# Core install
pip install langgraph langchain-core langchain-community

# Model provider packages
pip install langchain-google-vertexai   # Gemini / Vertex AI
pip install langchain-openai            # OpenAI / Azure OpenAI
pip install langchain-anthropic         # Claude

# Persistence backends
pip install langgraph-checkpoint-postgres  # Postgres (production)
pip install langgraph-checkpoint-sqlite    # SQLite (dev/test)

# Tools
pip install langchain-tavily     # Tavily web search
pip install langchain-community  # DuckDuckGo, Wikipedia, etc.

# Observability
pip install langsmith
```
```shell
# LangSmith tracing (highly recommended from day one)
LANGCHAIN_TRACING_V2=true
LANGCHAIN_API_KEY=ls__...
LANGCHAIN_PROJECT=my-langgraph-app

# Model keys
OPENAI_API_KEY=sk-...
TAVILY_API_KEY=tvly-...

# Google / Vertex — use Application Default Credentials (ADC) or service account
GOOGLE_APPLICATION_CREDENTIALS=/path/to/sa.json
GOOGLE_CLOUD_PROJECT=my-gcp-project
```
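These keys are read from the process environment at startup. In practice you'd load them with python-dotenv; as an illustration of what that loading does, here is a minimal stdlib sketch (`load_env` is a hypothetical helper, not a LangChain API):

```python
import os
from pathlib import Path

def load_env(path: str = ".env") -> dict[str, str]:
    """Parse simple KEY=VALUE lines; skip blanks and '#' comments."""
    env: dict[str, str] = {}
    p = Path(path)
    if not p.exists():
        return env
    for line in p.read_text().splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        env[key.strip()] = value.strip()
    os.environ.update(env)  # make keys visible to LangChain / LangSmith
    return env
```

Call `load_env()` once at process start, before any LangChain import that reads tracing variables.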
The core APIs — StateGraph, MessagesState, START/END, add_messages, MemorySaver — are stable. langgraph-prebuilt is now bundled into the main package.

🧠Core Concepts

- Reducers — per-key merge functions in the state schema that control how node updates combine (e.g. add_messages appends instead of replacing).
- Threads — identified by thread_id in config. A thread groups checkpoints for a single conversation/workflow instance. Multi-tenant apps need one thread per session.

🚀First Agent — Minimal ReAct
```python
from typing import Annotated
from typing_extensions import TypedDict
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.prebuilt import ToolNode, tools_condition
from langgraph.checkpoint.memory import InMemorySaver

# ── 1. Define tools ────────────────────────────────────────
@tool
def multiply(a: int, b: int) -> int:
    """Multiply two integers."""
    return a * b

tools = [multiply]

# ── 2. Bind model ───────────────────────────────────────────
llm = ChatOpenAI(model="gpt-4o", temperature=0)
llm_with_tools = llm.bind_tools(tools)

# ── 3. Define nodes ─────────────────────────────────────────
def call_model(state: MessagesState):
    return {"messages": [llm_with_tools.invoke(state["messages"])]}

# ── 4. Build graph ──────────────────────────────────────────
builder = StateGraph(MessagesState)
builder.add_node("agent", call_model)
builder.add_node("tools", ToolNode(tools))
builder.add_edge(START, "agent")
builder.add_conditional_edges("agent", tools_condition)  # routes to "tools" or END
builder.add_edge("tools", "agent")  # loop back after tool call

# ── 5. Compile with memory ──────────────────────────────────
graph = builder.compile(checkpointer=InMemorySaver())

# ── 6. Invoke ───────────────────────────────────────────────
config = {"configurable": {"thread_id": "session-1"}}
result = graph.invoke(
    {"messages": [{"role": "user", "content": "What is 6 × 7?"}]},
    config=config,
)
print(result["messages"][-1].content)
```
MessagesState is a prebuilt state schema whose messages key uses add_messages as its reducer. Messages are appended — never replaced. It's the standard choice for chat/agent applications. For custom state, extend it or define your own TypedDict.

🌐Gemini — Vertex AI
```python
from langchain_google_vertexai import ChatVertexAI
from google.auth import default as gauth

# ── Auth: Application Default Credentials (recommended) ─────
# Run once locally:  gcloud auth application-default login
# In GCP (Cloud Run / GKE): uses the attached service account automatically
credentials, project = gauth()

# ── Model init ──────────────────────────────────────────────
llm = ChatVertexAI(
    model="gemini-2.0-flash",  # or "gemini-1.5-pro" for long context
    project="my-gcp-project",
    location="us-central1",
    temperature=0.1,
    max_output_tokens=4096,
    streaming=True,  # enable token streaming
)

# ── With tools ──────────────────────────────────────────────
llm_with_tools = llm.bind_tools(tools)

# ── Gemini native grounding (Google Search) ─────────────────
from vertexai.generative_models import Tool, grounding

grounding_tool = Tool.from_google_search_retrieval(
    google_search_retrieval=grounding.GoogleSearchRetrieval()
)

llm_grounded = ChatVertexAI(
    model="gemini-2.0-flash",
    project="my-gcp-project",
    location="us-central1",
    tools=[grounding_tool],  # native Vertex AI grounding
)
```
Vertex AI Agent Engine (managed deployment)
```python
import vertexai
from vertexai.preview import reasoning_engines

vertexai.init(project="my-project", location="us-central1")

# Agent Engine deploys any object exposing set_up()/query() —
# wrap the compiled graph in a small app class
class LangGraphApp:
    def set_up(self):
        self.graph = build_graph()  # your builder.compile(...), rebuilt at startup

    def query(self, *, input: str) -> str:
        result = self.graph.invoke({"messages": [("user", input)]})
        return result["messages"][-1].content

# Deploy to Vertex AI Agent Engine
remote_app = reasoning_engines.ReasoningEngine.create(
    LangGraphApp(),
    requirements=["langgraph>=1.0", "langchain-google-vertexai"],
    display_name="my-agent",
)

# Invoke remotely
response = remote_app.query(input="What is the exchange rate for USD/INR?")
```
☁️GPT — Azure OpenAI
```python
import os
from langchain_openai import AzureChatOpenAI

# ── Azure OpenAI init ────────────────────────────────────────
llm = AzureChatOpenAI(
    azure_deployment="gpt-4o",  # your deployment name in Azure
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    api_key=os.environ["AZURE_OPENAI_API_KEY"],
    api_version="2025-01-01-preview",  # use latest stable API version
    temperature=0,
    streaming=True,
    max_retries=3,
)

# ── With tools ──────────────────────────────────────────────
llm_with_tools = llm.bind_tools(tools)

# ── Azure env var pattern (recommended) ─────────────────────
# Set in .env / Key Vault / CI secret, never in code:
#   AZURE_OPENAI_ENDPOINT = https://my-resource.openai.azure.com/
#   AZURE_OPENAI_API_KEY  = <key>
#   OPENAI_API_VERSION    = 2025-01-01-preview
# When these are set, AzureChatOpenAI() picks them up automatically.

# ── Azure with Managed Identity (prod, no key in env) ───────
from azure.identity import DefaultAzureCredential, get_bearer_token_provider

token_provider = get_bearer_token_provider(
    DefaultAzureCredential(), "https://cognitiveservices.azure.com/.default"
)

llm_managed = AzureChatOpenAI(
    azure_deployment="gpt-4o",
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    api_version="2025-01-01-preview",
    azure_ad_token_provider=token_provider,  # no API key needed
    temperature=0,
)
```
⚡Multi-Model Patterns
Route different nodes to different models. Use a fast/cheap model for routing decisions, a powerful model for generation, a local model for sensitive data.
```python
from typing import Annotated
from typing_extensions import TypedDict
from langchain_google_vertexai import ChatVertexAI
from langchain_openai import AzureChatOpenAI
from langgraph.graph.message import add_messages

class State(TypedDict):
    messages: Annotated[list, add_messages]
    query: str
    route: str

# Router: fast Gemini Flash for routing decisions
router_llm = ChatVertexAI(model="gemini-2.0-flash", temperature=0)

# Generator: GPT-4o via Azure for content generation
gen_llm = AzureChatOpenAI(azure_deployment="gpt-4o", temperature=0.7)

# Analyst: Gemini 1.5 Pro for long-context analysis (2M-token window)
analyst_llm = ChatVertexAI(model="gemini-1.5-pro", temperature=0)

def router_node(state: State) -> State:
    decision = router_llm.invoke(
        [("system", "Route this query. Reply with exactly: GENERATE or ANALYSE"),
         ("user", state["query"])]
    )
    return {"route": decision.content.strip()}

def generate_node(state: State) -> State:
    response = gen_llm.invoke(state["messages"])
    return {"messages": [response]}

def analyse_node(state: State) -> State:
    response = analyst_llm.invoke(state["messages"])
    return {"messages": [response]}

def route_decision(state: State) -> str:
    return "generate" if state["route"] == "GENERATE" else "analyse"

builder.add_conditional_edges("router", route_decision)
```
🔧Tool Binding
```python
import aiohttp
from langchain_core.tools import tool, StructuredTool
from pydantic import BaseModel, Field

# ── Simple tool with @tool decorator ────────────────────────
@tool
def get_stock_price(ticker: str) -> str:
    """Get current stock price for a ticker symbol like RELIANCE.NS or TCS.NS."""
    # real impl: call yfinance / Alpha Vantage / NSE API
    return "₹2,450.00 (mock)"

# ── Pydantic schema for complex input ───────────────────────
class SearchInput(BaseModel):
    query: str = Field(description="Search query string")
    max_results: int = Field(default=5, description="Number of results")

@tool(args_schema=SearchInput)
def web_search(query: str, max_results: int = 5) -> str:
    """Search the web for current information."""
    # impl: call Tavily / DuckDuckGo
    ...

# ── StructuredTool from function ────────────────────────────
def _run_python(code: str) -> str:
    """Execute Python code in sandbox and return output."""
    ...

python_tool = StructuredTool.from_function(
    func=_run_python,
    name="execute_python",
    description="Run Python code and return stdout. Use for calculations.",
)

# ── Async tool ──────────────────────────────────────────────
@tool
async def fetch_url(url: str) -> str:
    """Fetch content from a URL."""
    async with aiohttp.ClientSession() as s:
        async with s.get(url) as r:
            return await r.text()

# ── Bind all tools to model ──────────────────────────────────
all_tools = [get_stock_price, web_search, python_tool, fetch_url]
llm_with_tools = llm.bind_tools(all_tools)
```
🔍Web Search Tools
```python
from langchain_tavily import TavilySearch

# Tavily is purpose-built for LLM-agent web search
tavily = TavilySearch(
    max_results=5,
    topic="general",          # "general" | "news" | "finance"
    include_answer=True,      # get a direct answer summary
    include_raw_content=False,
    days=7,                   # restrict to last N days (news mode)
)

# Use directly in graph as tool
tools = [tavily]
llm_with_tools = llm.bind_tools(tools)
```
```python
from langchain_community.tools import DuckDuckGoSearchRun, DuckDuckGoSearchResults

# Simple text result
ddg = DuckDuckGoSearchRun()

# Structured results with URL + snippet
ddg_full = DuckDuckGoSearchResults(
    output_format="list",
    num_results=5,
)

tools = [ddg_full]
```
```python
import os
from langchain_google_community import GoogleSearchAPIWrapper, GoogleSearchRun

search = GoogleSearchAPIWrapper(
    google_api_key=os.environ["GOOGLE_API_KEY"],
    google_cse_id=os.environ["GOOGLE_CSE_ID"],
    k=5,
)
google_tool = GoogleSearchRun(api_wrapper=search)
tools = [google_tool]
```
📌Grounding & Citations
Grounding anchors model responses to verifiable sources. There are two patterns: tool-based grounding (the agent actively searches and cites) and native grounding (built into Vertex AI for Gemini).
```python
from langchain_core.messages import SystemMessage

@tool
def search_and_cite(query: str) -> str:
    """Search the web and return results with source URLs for citation."""
    response = tavily.invoke({"query": query})  # TavilySearch returns a dict
    # Format with sources for the model to cite
    formatted = []
    for r in response["results"]:
        formatted.append(f"[Source: {r['url']}]\n{r['content']}")
    return "\n\n---\n\n".join(formatted)

def grounded_agent_node(state: MessagesState):
    sys = SystemMessage(content="""You answer questions using provided sources.
Always cite sources inline as [Source: URL]. Never make up facts.""")
    response = llm_with_tools.invoke([sys] + state["messages"])
    return {"messages": [response]}
```
```python
import vertexai
from vertexai.generative_models import GenerativeModel, Tool, grounding

vertexai.init(project="my-project", location="us-central1")

# Google Search grounding built into Gemini on Vertex AI
model = GenerativeModel(
    model_name="gemini-2.0-flash",
    tools=[Tool.from_google_search_retrieval(
        google_search_retrieval=grounding.GoogleSearchRetrieval(
            dynamic_retrieval_config=grounding.DynamicRetrievalConfig(
                dynamic_threshold=0.7  # only search when confidence is low
            )
        )
    )],
)

# Response includes grounding_metadata with citations
response = model.generate_content("What is the current Nifty 50 level?")
for chunk in response.candidates[0].grounding_metadata.grounding_chunks:
    print(chunk.web.uri, chunk.web.title)
```
🔩ToolNode — Prebuilt Tool Executor
```python
from langgraph.prebuilt import ToolNode, tools_condition

# ToolNode automatically:
# 1. Extracts tool_calls from the last AIMessage
# 2. Executes each tool (parallel by default in v1.x)
# 3. Returns ToolMessages with results
tool_node = ToolNode(
    tools=all_tools,
    handle_tool_errors=True,  # catch exceptions, return error text as ToolMessage
    name="tools",
)

# tools_condition: routes to "tools" if last msg has tool_calls, else END
builder.add_conditional_edges("agent", tools_condition)
builder.add_edge("tools", "agent")

# Custom tool error handling — handle_tool_errors also accepts a callable
# that receives the exception and returns the ToolMessage content string
def handle_tool_error(e: Exception) -> str:
    return f"Error: {repr(e)}. Retry with corrected input."

tool_node_safe = ToolNode(all_tools, handle_tool_errors=handle_tool_error)
```
🗺Workflow Patterns
- Branching — route with add_conditional_edges and a router function.
- Fan-out — use Send() to spawn parallel branches with different inputs. Results merge back in an aggregator node. Ideal for parallel research.

Fan-out with Send (parallel map-reduce)
```python
from operator import add
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.types import Send

class ResearchState(TypedDict):
    topics: list[str]
    results: Annotated[list[str], add]  # reducer: append results from all branches
    final_report: str

def generate_topics(state: ResearchState) -> ResearchState:
    return {"topics": ["market analysis", "risk factors", "valuation"]}

def fan_out(state: ResearchState) -> list[Send]:
    # Spawn one branch per topic — they run in parallel
    return [Send("research_topic", {"topic": t}) for t in state["topics"]]

def research_topic(state: dict) -> ResearchState:
    result = llm.invoke(f"Research: {state['topic']}")
    return {"results": [result.content]}

def aggregate(state: ResearchState) -> ResearchState:
    combined = "\n\n".join(state["results"])
    synthesis = llm.invoke(f"Synthesise: {combined}")
    return {"final_report": synthesis.content}

builder = StateGraph(ResearchState)
builder.add_node("topics", generate_topics)
builder.add_node("research_topic", research_topic)
builder.add_node("aggregate", aggregate)
builder.add_edge(START, "topics")
builder.add_conditional_edges("topics", fan_out, ["research_topic"])
builder.add_edge("research_topic", "aggregate")
builder.add_edge("aggregate", END)
```
🔄ReAct Agent (Prebuilt)
```python
from langgraph.prebuilt import create_react_agent
from langgraph.checkpoint.memory import InMemorySaver

# Fastest way to a production ReAct agent
graph = create_react_agent(
    model=llm,
    tools=all_tools,
    checkpointer=InMemorySaver(),
    prompt="""You are a financial research assistant for Indian markets.
Always cite your sources. Use INR currency formatting.
If you don't know, say so — never hallucinate data.""",
)

# Stream results — recursion_limit guards against infinite loops
config = {
    "configurable": {"thread_id": "research-session-1"},
    "recursion_limit": 15,
}
async for chunk in graph.astream(
    {"messages": [("user", "Analyse the latest Reliance Industries earnings")]},
    config=config,
    stream_mode="messages",
):
    msg, meta = chunk
    if hasattr(msg, "content") and msg.content:
        print(msg.content, end="", flush=True)
```
📋Plan & Execute
For complex, multi-step tasks. A planner LLM creates a task list; an executor works through each step, potentially updating the plan.
```python
from operator import add
from typing import Annotated
from typing_extensions import TypedDict

class PlanState(TypedDict):
    task: str
    plan: list[str]                # steps to execute
    results: Annotated[list, add]  # accumulated step results
    final: str

def planner(state: PlanState) -> PlanState:
    plan_prompt = f"""Break this task into 3-5 actionable steps:
Task: {state['task']}
Return as a numbered list, one step per line."""
    response = llm.invoke(plan_prompt)
    steps = [s.strip() for s in response.content.split("\n") if s.strip()]
    return {"plan": steps}

def executor(state: PlanState) -> PlanState:
    step = state["plan"][0]        # take next step
    remaining = state["plan"][1:]  # pop it
    # executor_agent: any runnable agent, e.g. create_react_agent(llm, all_tools)
    result = executor_agent.invoke({"messages": [("user", step)]})
    return {
        "plan": remaining,
        "results": [result["messages"][-1].content],
    }

def synthesiser(state: PlanState) -> PlanState:
    all_results = "\n".join(state["results"])
    final = llm.invoke(f"Synthesise these findings:\n{all_results}")
    return {"final": final.content}

def should_continue(state: PlanState) -> str:
    return "execute" if state["plan"] else "synthesise"

builder = StateGraph(PlanState)
builder.add_node("plan", planner)
builder.add_node("execute", executor)
builder.add_node("synthesise", synthesiser)
builder.add_edge(START, "plan")
builder.add_edge("plan", "execute")
builder.add_conditional_edges("execute", should_continue)
builder.add_edge("synthesise", END)
```
🤝Multi-Agent Systems
```python
from typing import Annotated
from typing_extensions import TypedDict
from langchain_core.messages import HumanMessage
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import create_react_agent

# ── Specialist sub-agents ────────────────────────────────────
research_agent = create_react_agent(llm, tools=[tavily], name="Researcher")
analyst_agent = create_react_agent(llm, tools=[python_tool], name="Analyst")
writer_agent = create_react_agent(llm, tools=[], name="Writer")

class SupervisorState(TypedDict):
    messages: Annotated[list, add_messages]
    next_agent: str

AGENTS = ["researcher", "analyst", "writer", "FINISH"]

def supervisor_node(state: SupervisorState) -> SupervisorState:
    prompt = f"""You are a supervisor. Based on the conversation, choose the next worker.
Workers: {AGENTS}. Reply with exactly one worker name or FINISH."""
    response = llm.invoke([("system", prompt)] + state["messages"])
    return {"next_agent": response.content.strip()}

def make_agent_node(agent, name: str):
    def node(state: SupervisorState):
        result = agent.invoke({"messages": state["messages"]})
        last = result["messages"][-1]
        return {"messages": [HumanMessage(content=last.content, name=name)]}
    return node

builder = StateGraph(SupervisorState)
builder.add_node("supervisor", supervisor_node)
builder.add_node("researcher", make_agent_node(research_agent, "Researcher"))
builder.add_node("analyst", make_agent_node(analyst_agent, "Analyst"))
builder.add_node("writer", make_agent_node(writer_agent, "Writer"))
builder.add_edge(START, "supervisor")
builder.add_conditional_edges(
    "supervisor",
    lambda s: s["next_agent"],
    {"researcher": "researcher", "analyst": "analyst", "writer": "writer", "FINISH": END},
)
for agent_name in ["researcher", "analyst", "writer"]:
    builder.add_edge(agent_name, "supervisor")
```
📚RAG Patterns
```python
from typing_extensions import TypedDict
from langchain_community.vectorstores import Chroma
from langchain_core.documents import Document
from langchain_google_vertexai import VertexAIEmbeddings

# ── Vector store setup ───────────────────────────────────────
embeddings = VertexAIEmbeddings(model="text-embedding-005")
vectorstore = Chroma(
    collection_name="financial_docs",
    embedding_function=embeddings,
    persist_directory="./chroma_db",
)
retriever = vectorstore.as_retriever(search_kwargs={"k": 5})

# ── RAG State ───────────────────────────────────────────────
class RAGState(TypedDict):
    question: str
    context: list[Document]
    answer: str

# ── Nodes ───────────────────────────────────────────────────
def retrieve(state: RAGState) -> RAGState:
    docs = retriever.invoke(state["question"])
    return {"context": docs}

def generate(state: RAGState) -> RAGState:
    context_text = "\n\n".join(d.page_content for d in state["context"])
    prompt = f"""Answer using only the provided context. Cite page numbers.
Context: {context_text}
Question: {state['question']}
Answer:"""
    response = llm.invoke(prompt)
    return {"answer": response.content}

builder = StateGraph(RAGState)
builder.add_node("retrieve", retrieve)
builder.add_node("generate", generate)
builder.add_edge(START, "retrieve")
builder.add_edge("retrieve", "generate")
builder.add_edge("generate", END)
```
🔬Adaptive RAG / Self-RAG
Adaptive RAG uses a grader to evaluate retrieved documents and the generated answer — rewriting the query or falling back to web search when quality is poor.
```python
from pydantic import BaseModel

class GradeDoc(BaseModel):
    relevant: bool
    reason: str

class GradeAnswer(BaseModel):
    grounded: bool
    useful: bool

grader_llm = llm.with_structured_output(GradeDoc)
answer_grader = llm.with_structured_output(GradeAnswer)

class AdaptiveRAGState(TypedDict):
    question: str
    context: list[Document]
    answer: str
    retries: int

def grade_documents(state: AdaptiveRAGState) -> AdaptiveRAGState:
    filtered = []
    for doc in state["context"]:
        grade = grader_llm.invoke(
            f"Question: {state['question']}\nDocument: {doc.page_content}\nIs relevant?"
        )
        if grade.relevant:
            filtered.append(doc)
    return {"context": filtered}

def decide_after_grading(state: AdaptiveRAGState) -> str:
    if state["context"]:
        return "generate"
    if state["retries"] > 2:
        return "web_search"  # give up on the index, fall back to web
    return "rewrite"         # no relevant docs yet — try a better query

def rewrite_query(state: AdaptiveRAGState) -> AdaptiveRAGState:
    rewrite = llm.invoke(
        f"Rewrite this query for better retrieval: {state['question']}"
    )
    return {"question": rewrite.content, "retries": state["retries"] + 1}

def grade_answer(state: AdaptiveRAGState) -> str:
    grade = answer_grader.invoke(
        f"Question: {state['question']}\nAnswer: {state['answer']}"
    )
    return END if (grade.grounded and grade.useful) else "rewrite"

# retrieve / generate as in the RAG section; web_search_node is your web fallback
builder.add_node("retrieve", retrieve)
builder.add_node("grade_docs", grade_documents)
builder.add_node("generate", generate)
builder.add_node("rewrite", rewrite_query)
builder.add_node("web_search", web_search_node)
builder.add_edge(START, "retrieve")
builder.add_edge("retrieve", "grade_docs")
builder.add_conditional_edges("grade_docs", decide_after_grading)
builder.add_edge("rewrite", "retrieve")
builder.add_conditional_edges("generate", grade_answer)  # END or back to "rewrite"
```
🗃State Design
```python
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph.message import add_messages
from pydantic import BaseModel, field_validator

# ── 1. TypedDict (standard) ─────────────────────────────────
class AgentState(TypedDict):
    messages: Annotated[list, add_messages]    # reducer: append messages
    steps: Annotated[int, lambda a, b: a + b]  # reducer: sum
    context: str                               # last-write-wins (no reducer)
    max_steps: int                             # configuration constant

# ── 2. Pydantic (validation + defaults) ─────────────────────
class ValidatedState(BaseModel):
    messages: Annotated[list, add_messages] = []
    confidence: float = 0.0
    retries: int = 0

    @field_validator("confidence")
    @classmethod
    def clamp_confidence(cls, v):
        return max(0.0, min(1.0, v))

# ── 3. Input/Output schemas (restrict what callers see) ──────
class InputState(TypedDict):
    question: str

class OutputState(TypedDict):
    answer: str
    sources: list[str]

builder = StateGraph(AgentState, input_schema=InputState, output_schema=OutputState)
graph = builder.compile()

# ── 4. Private state (internal only, not in input/output) ────
class PrivateState(TypedDict):
    _intermediate_steps: list  # underscore = private by convention
```
🧩Memory & Checkpointers
```python
from psycopg_pool import ConnectionPool
from langgraph.checkpoint.memory import InMemorySaver    # dev/testing only
from langgraph.checkpoint.sqlite import SqliteSaver      # local / small apps
from langgraph.checkpoint.postgres import PostgresSaver  # production

# ── Dev: in-memory (no persistence between restarts) ────────
checkpointer = InMemorySaver()

# ── Dev: SQLite (persists to disk) ──────────────────────────
with SqliteSaver.from_conn_string("checkpoints.db") as cp:
    graph = builder.compile(checkpointer=cp)

# ── Production: Postgres ────────────────────────────────────
DB_URI = "postgresql://user:pass@host:5432/langgraph?sslmode=require"
pool = ConnectionPool(conninfo=DB_URI, max_size=10)
pg_saver = PostgresSaver(pool)  # accepts a connection or a pool
pg_saver.setup()                # creates tables on first run
graph = builder.compile(checkpointer=pg_saver)

# ── Thread-scoped config ─────────────────────────────────────
config = {
    "configurable": {
        "thread_id": f"tenant-{tenant_id}:user-{user_id}:session-{session_id}",
        "checkpoint_ns": f"tenant-{tenant_id}",  # multi-tenant isolation
    }
}

# ── Time travel: replay from a specific checkpoint ───────────
checkpoints = list(graph.get_state_history(config))  # newest first
past_config = checkpoints[3].config                  # 3 steps back
graph.invoke(None, config=past_config)               # replay from there

# ── Inspect current state ───────────────────────────────────
state = graph.get_state(config)
print(state.values)    # current state dict
print(state.next)      # next nodes to execute
print(state.metadata)  # step number, source
```
Long-term cross-session memory
```python
from langgraph.store.memory import InMemoryStore
from langgraph.store.base import BaseStore

store = InMemoryStore()

def memory_node(state: MessagesState, config, *, store: BaseStore):
    # Load user preferences from the long-term store (injected by LangGraph)
    user_id = config["configurable"]["user_id"]
    prefs = store.get(("user_prefs", user_id), "preferences")

    # After generating a response, save new facts
    store.put(
        ("user_prefs", user_id),
        "preferences",
        {"currency": "INR", "domain": "finance"},
    )
    return {}  # no state update here

graph = builder.compile(checkpointer=InMemorySaver(), store=store)
```
🔲Subgraphs
Subgraphs are independently compiled graphs used as nodes in a parent graph. They manage their own internal state and expose only what the parent needs.
```python
from langgraph.graph import StateGraph, START, END

# ── Subgraph: internal state ─────────────────────────────────
class ResearchSubState(TypedDict):
    query: str            # shared with parent (same key name)
    search_results: list  # internal to subgraph
    summary: str          # output back to parent

def search_step(state: ResearchSubState):
    results = tavily.invoke(state["query"])
    return {"search_results": results}

def summarise_step(state: ResearchSubState):
    summary = llm.invoke(f"Summarise: {state['search_results']}")
    return {"summary": summary.content}

sub_builder = StateGraph(ResearchSubState)
sub_builder.add_node("search", search_step)
sub_builder.add_node("summarise", summarise_step)
sub_builder.add_edge(START, "search")
sub_builder.add_edge("search", "summarise")
sub_builder.add_edge("summarise", END)
research_subgraph = sub_builder.compile()

# ── Parent graph: add subgraph as a node ─────────────────────
class ParentState(TypedDict):
    query: str         # same key as subgraph — state is passed down
    summary: str       # output from subgraph flows back up
    final_report: str

parent = StateGraph(ParentState)
parent.add_node("research", research_subgraph)  # compiled subgraph as node
parent.add_node("report", lambda s: {"final_report": s["summary"]})
parent.add_edge(START, "research")
parent.add_edge("research", "report")
parent.add_edge("report", END)
graph = parent.compile()

# Stream with subgraph events visible
async for event in graph.astream(input, stream_mode="updates", subgraphs=True):
    namespace, data = event
    print(f"[{' > '.join(namespace) or 'root'}]", data)
```
⚡Streaming
| Mode | What streams | Best for |
|---|---|---|
| messages | Token-by-token LLM output (AIMessageChunk) | Chat UI, real-time display |
| updates | Dict of state changes after each node | Dashboards, progress tracking |
| values | Full state snapshot after each superstep | Debugging, state inspection |
| custom | Anything you emit yourself (custom events / stream writer) | Tool call results, intermediate data |
| debug | Every internal event (verbose) | Deep debugging |
```python
from langchain_core.messages import AIMessageChunk

# ── Token streaming (chat UI) ────────────────────────────────
async for chunk in graph.astream(
    {"messages": [("user", "Explain SEBI regulations")]},
    config=config,
    stream_mode="messages",
):
    msg, metadata = chunk
    if isinstance(msg, AIMessageChunk) and msg.content:
        print(msg.content, end="", flush=True)

# ── Node updates (dashboard) ─────────────────────────────────
async for update in graph.astream(input, stream_mode="updates"):
    node_name = list(update.keys())[0]
    print(f"✓ {node_name} completed")

# ── Multi-mode: tokens + node events ────────────────────────
# With a list of stream modes (+ subgraphs=True), chunks arrive
# as (namespace, mode, data) tuples
async for namespace, mode, data in graph.astream(
    input,
    config=config,
    stream_mode=["messages", "updates"],
    subgraphs=True,
):
    if mode == "messages":
        msg, _meta = data
        if isinstance(msg, AIMessageChunk) and msg.content:
            print(msg.content, end="", flush=True)
    elif mode == "updates":
        node = list(data.keys())[0]
        print(f"\n[{' > '.join(namespace) or 'root'}::{node}]")

# ── Custom stream events ─────────────────────────────────────
from langchain_core.callbacks.manager import adispatch_custom_event

async def my_node(state):
    await adispatch_custom_event("tool_start", {"tool": "web_search"})
    result = await do_search(state["query"])
    await adispatch_custom_event("tool_end", {"results": len(result)})
    return {"context": result}

async for event in graph.astream_events(input, version="v2"):
    if event["event"] == "on_custom_event":
        print(event["name"], event["data"])
```
🙋Human-in-the-Loop (HITL)
```python
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import InMemorySaver

# Assumes a custom state schema that extends MessagesState with
# proposed_action / approved keys
def approve_action(state):
    # Graph pauses here — state is checkpointed
    action = state["proposed_action"]
    user_input = interrupt({
        "message": f"Approve this action?\n{action}",
        "options": ["yes", "no", "edit"],
    })  # blocks until Command(resume=...) is called
    if user_input == "yes":
        return {"approved": True}
    elif user_input == "no":
        return {"approved": False}
    else:
        # Loop: ask again with edited action
        edited = interrupt({"message": "Enter edited action:"})
        return {"proposed_action": edited, "approved": True}

graph = builder.compile(checkpointer=InMemorySaver())

# 1. Run until interrupt
result = graph.invoke(input, config=config)
# result["__interrupt__"] contains the interrupt payload

# 2. Inspect state
state = graph.get_state(config)

# 3. Resume with the user's answer
graph.invoke(Command(resume="yes"), config=config)

# 4. Or update state and continue
graph.update_state(config, {"proposed_action": "revised plan"})
graph.invoke(None, config=config)  # resume from updated state
```
```python
# Compile with breakpoints — simpler than interrupt()
graph = builder.compile(
    checkpointer=InMemorySaver(),
    interrupt_before=["execute_tools"],  # pause BEFORE this node runs
    interrupt_after=["planner"],         # pause AFTER this node runs
)

# Inspect, optionally modify, then continue
state = graph.get_state(config)
print(state.next)  # which node will run next
graph.update_state(config, {"plan": modified_plan})
graph.invoke(None, config=config)  # continue
```
🛡Error Handling
```python
from tenacity import retry, stop_after_attempt, wait_exponential
from langchain_core.messages import ToolMessage

# ── Retry on node failure ──────────────────────────────────
@retry(stop=stop_after_attempt(3), wait=wait_exponential(min=1, max=10))
async def resilient_node(state: AgentState):
    return await llm.ainvoke(state["messages"])

# ── Max steps guard (prevent infinite loops) ────────────────
def should_continue(state: AgentState) -> str:
    if state["steps"] >= state.get("max_steps", 20):
        return "force_end"  # emergency exit
    last_msg = state["messages"][-1]
    return "tools" if last_msg.tool_calls else END

# ── Tool error as ToolMessage (not exception) ───────────────
tool_map = {t.name: t for t in all_tools}

def safe_tool_node(state: MessagesState):
    last = state["messages"][-1]
    results = []
    for tc in last.tool_calls:
        try:
            result = tool_map[tc["name"]].invoke(tc["args"])
            results.append(ToolMessage(content=str(result), tool_call_id=tc["id"]))
        except Exception as e:
            results.append(ToolMessage(
                content=f"Error: {e}. Try a different approach.",
                tool_call_id=tc["id"],
            ))
    return {"messages": results}

# ── Fallback LLM node ───────────────────────────────────────
primary_llm = ChatVertexAI(model="gemini-2.0-flash")
fallback_llm = AzureChatOpenAI(azure_deployment="gpt-4o")
resilient_llm = primary_llm.with_fallbacks([fallback_llm])
```
🔬Debugging — LangSmith & Local
LangGraph 1.x integrates deeply with LangSmith. Every node execution, LLM call, and tool call becomes a traced span. This is the primary production debugging path.
```shell
# Enable LangSmith tracing — add to .env before starting your process
LANGCHAIN_TRACING_V2=true
LANGCHAIN_API_KEY=ls__xxxxxxxxxxxxxxxx
LANGCHAIN_PROJECT=arthavidya-agent
LANGCHAIN_ENDPOINT=https://api.smith.langchain.com  # default
```
```python
from langgraph.graph import StateGraph

# ── 1. Visualise graph structure ─────────────────────────────
graph = builder.compile()
print(graph.get_graph().draw_ascii())       # terminal ASCII art
png = graph.get_graph().draw_mermaid_png()  # PNG bytes — write to a file yourself
png_xray = graph.get_graph(xray=True).draw_mermaid_png()  # show subgraph internals

# ── 2. Debug stream — every internal event ──────────────────
for event in graph.stream(input, config=config, stream_mode="debug"):
    print(event["type"], event.get("step"))

# ── 3. Inspect state at any checkpoint ──────────────────────
state = graph.get_state(config)
print("current values:", state.values)
print("next nodes:", state.next)
print("at step:", state.metadata["step"])

# ── 4. Full state history (time travel) ─────────────────────
for snapshot in graph.get_state_history(config):
    print(snapshot.metadata["step"], snapshot.next, snapshot.config)

# ── 5. Print every message in a run ─────────────────────────
final = graph.invoke(input, config=config)
for msg in final["messages"]:
    print(f"[{msg.type}] {msg.content[:200]}")

# ── 6. astream_events — fine-grained event trace ────────────
async for event in graph.astream_events(input, version="v2"):
    kind = event["event"]
    name = event["name"]
    if kind == "on_chat_model_start":
        print(f"→ LLM start: {name}")
    elif kind == "on_tool_start":
        print(f"→ Tool: {name} args={event['data']['input']}")
    elif kind == "on_chat_model_end":
        output = event["data"]["output"]
        tokens = output.usage_metadata
        print(f"← LLM done | tokens: {tokens}")
```
LangSmith key debugging workflows
Run evaluate() against a dataset with a custom evaluator. Catch regressions in CI: pytest --langsmith-eval.

```python
from langsmith import traceable, Client

client = Client()

# Add metadata to any graph run for filtering in LangSmith
config = {
    "configurable": {"thread_id": "t1"},
    "metadata": {
        "user_id": "u123",
        "session_type": "financial_research",
        "model_version": "arthavidya-v0.2",
    },
    "tags": ["production", "nse"],
}

# Add feedback to a run
client.create_feedback(
    run_id=run_id,
    key="correctness",
    score=1.0,
    comment="Answer was accurate and well-cited"
)

# @traceable wraps any function in a LangSmith span
@traceable(name="custom_retriever")
def hybrid_search(query: str) -> list:
    bm25 = keyword_search(query)
    dense = vector_search(query)
    return rerank(bm25 + dense)
```
Use Annotated[list, add] reducers for accumulation.

📏Standards & Best Practices
| Rule | Severity | Rationale |
|---|---|---|
| Always type state with TypedDict or Pydantic | MUST | Untyped dicts break IDE support, hide bugs, and cause silent key errors at runtime |
| Assign a meaningful thread_id on every invocation | MUST | Required for checkpointing, HITL, and time-travel debugging to work |
| Use a Postgres checkpointer in production (not InMemorySaver) | MUST | InMemorySaver loses all state on process restart — unacceptable for prod |
| Set a max_steps guard on every cyclical graph | MUST | Prevents infinite loops from runaway models / bad tool outputs |
| Never raise unhandled exceptions in tool functions | MUST | Crashes the graph node. Return the error string as a ToolMessage instead |
| Enable LangSmith tracing from day one | SHOULD | Impossible to debug complex agents without traces. Traces are free in the dev tier |
| Prefer async nodes for I/O-bound work | SHOULD | Parallel subgraphs and Send() fan-out use asyncio — sync nodes block the event loop |
| Separate tool definitions from graph wiring | SHOULD | Tools should be independently testable; the graph file only does wiring |
| Use input_schema / output_schema on compiled graphs | SHOULD | Hides internal state from callers; enables a cleaner API surface |
| Keep state minimal — transient data in function scope | SHOULD | Bloated state = large checkpoints = slow persistence = hard-to-read debug traces |
| Use connection pooling for the Postgres checkpointer | SHOULD | One connection per request = connection exhaustion under load |
| Avoid storing LangChain RunnableConfig in state | AVOID | Config is not serializable; breaks checkpointing silently |
| Avoid .invoke() for streaming-first apps | AVOID | Returns only after full completion — users see a spinner for 30+ second runs |
| Avoid top-level graph mutations after compile() | AVOID | The compiled graph is immutable; add new nodes by rebuilding the StateGraph |
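The "keep state minimal" rule often comes down to pruning message history before each checkpoint. A minimal sketch (the helper name and window size are illustrative; langchain_core.messages also provides trim_messages for token-aware trimming):

```python
def trim_history(messages: list, *, keep_last: int = 10) -> list:
    """Keep the first (system) message plus the most recent window.

    Smaller state means smaller checkpoints, faster persistence,
    and debug traces that are actually readable.
    """
    if len(messages) <= keep_last + 1:
        return messages  # already within budget
    return [messages[0], *messages[-keep_last:]]

# 1 system message + 30 turns → system + last 10
history = ["system"] + [f"msg-{i}" for i in range(30)]
trimmed = trim_history(history, keep_last=10)
assert trimmed[0] == "system"
assert len(trimmed) == 11
assert trimmed[-1] == "msg-29"
```

A node that returns the trimmed list through a replace-style reducer (rather than add_messages) keeps checkpoints bounded even in long-running threads.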
Project layout
```
my_agent/
├── graphs/
│   ├── __init__.py
│   ├── main_graph.py          # StateGraph wiring only
│   ├── research_subgraph.py   # subgraph definitions
│   └── rag_graph.py
├── nodes/
│   ├── llm_nodes.py           # LLM call nodes
│   ├── tool_nodes.py          # custom tool executor nodes
│   └── router_nodes.py        # conditional edge functions
├── tools/
│   ├── web_search.py          # tool definitions (independently testable)
│   ├── database.py
│   └── calculators.py
├── state/
│   └── schemas.py             # TypedDict / Pydantic state definitions
├── config/
│   └── settings.py            # pydantic-settings, env var loading
├── tests/
│   ├── test_nodes.py          # unit tests per node
│   ├── test_tools.py
│   └── test_graph.py          # integration tests with InMemorySaver
└── CLAUDE.md                  # langgraph context for Claude Code
```
🔗Reference Links
- Docs: docs.langchain.com — LangGraph Overview
- API: reference.langchain.com — LangGraph API Reference
- GitHub: github.com/langchain-ai/langgraph
- Quickstart: docs.langchain.com — LangGraph Quickstart
- Guides: How-to Guides (streaming, memory, subgraphs)
- HITL: docs.langchain.com — Interrupts & HITL
- Checkpointers: PyPI — langgraph-checkpoint
- LangSmith: smith.langchain.com — Tracing & Evaluation
- Academy: academy.langchain.com — Free LangGraph Course
- Vertex AI: Vertex AI Agent Engine — LangGraph docs
- Azure: Azure OpenAI Service docs
- Templates: LangGraph examples on GitHub
- Forum: forum.langchain.com — Community Q&A
- Best practices: LangGraph Best Practices — Swarnendu De