## Overview
LangGraph supports multiple streaming modes to provide real-time feedback, build responsive UIs, and monitor graph execution. Instead of waiting for the entire graph to complete, you can process results as they become available.
## Stream Modes

LangGraph offers seven streaming modes, each serving different use cases:

```python
from langgraph.graph import StateGraph

graph = builder.compile()

# Single mode
for event in graph.stream(input_data, stream_mode="values"):
    print(event)

# Multiple modes simultaneously
for event in graph.stream(input_data, stream_mode=["values", "updates"]):
    print(event)
```
### Mode Overview

| Mode | Description |
|------|-------------|
| `values` | Complete state after each step |
| `updates` | Individual node outputs |
| `messages` | LLM token streaming |
| `custom` | User-defined events |
| `checkpoints` | State snapshots |
| `tasks` | Task execution events |
| `debug` | Debugging information |
## Values Mode

Emits the complete state after each step:

```python
from typing_extensions import TypedDict

class State(TypedDict):
    messages: list[str]
    count: int

for state in graph.stream({"messages": [], "count": 0}, stream_mode="values"):
    print(state)

# {'messages': ['Hello'], 'count': 1}
# {'messages': ['Hello', 'Response'], 'count': 2}
# {'messages': ['Hello', 'Response', 'Done'], 'count': 3}
```

Use cases:

- Display complete state in a UI
- Monitor full state changes
- Simple progress tracking

> `values` is the default stream mode. It includes the initial state before any nodes execute.
## Updates Mode

Emits individual node outputs as they complete:

```python
for event in graph.stream({"count": 0}, stream_mode="updates"):
    print(event)

# {'node1': {'count': 1}}
# {'node2': {'count': 2}}
# {'node3': {'count': 3}}
```

Each event is a dictionary with:

- Key: the node name
- Value: the node's output (state update)

Use cases:

- Track which nodes executed
- Show per-node progress
- Collect individual results
### Parallel Node Updates

```python
# If nodes run in parallel, you get separate events
for event in graph.stream(input_data, stream_mode="updates"):
    print(event)

# {'fetch_user': {'user': {...}}}
# {'fetch_orders': {'orders': [...]}}
# {'combine': {'result': {...}}}
```
## Messages Mode

Stream LLM tokens in real time:

```python
from langgraph.graph import MessagesState

class State(MessagesState):
    pass

def llm_node(state: State):
    # LLM calls are automatically streamed
    response = llm.invoke(state["messages"])
    return {"messages": [response]}

for event in graph.stream(
    {"messages": [("user", "Tell me a story")]},
    stream_mode="messages",
):
    # Event format: (message_chunk, metadata)
    chunk, metadata = event
    print(chunk.content, end="", flush=True)
```

Metadata includes:

- `langgraph_node`: which node emitted this chunk
- `langgraph_step`: execution step
- `langgraph_path`: node path in nested graphs

```python
for event in graph.stream(input_data, stream_mode="messages"):
    chunk, metadata = event
    print(f"Node: {metadata['langgraph_node']}")
    print(f"Step: {metadata['langgraph_step']}")
    print(f"Content: {chunk.content}")

    # For tool calls
    if hasattr(chunk, "tool_calls"):
        print(f"Tool calls: {chunk.tool_calls}")
```

Use cases:

- Real-time chat interfaces
- Streaming chatbots
- Progressive text generation
## Custom Mode

Emit custom events from within nodes:

```python
from langgraph.types import StreamWriter

def my_node(state: State, writer: StreamWriter) -> dict:
    """Node that emits custom events."""
    # Emit progress updates
    writer("Starting processing...")

    for i, item in enumerate(state["items"]):
        result = process_item(item)
        # Emit custom event
        writer({
            "type": "progress",
            "item_index": i,
            "result": result,
            "percentage": (i + 1) / len(state["items"]) * 100,
        })

    writer("Processing complete!")
    return {"status": "done"}

# Stream custom events
for event in graph.stream(input_data, stream_mode="custom"):
    print(event)

# "Starting processing..."
# {'type': 'progress', 'item_index': 0, ...}
# {'type': 'progress', 'item_index': 1, ...}
# "Processing complete!"
```

> `StreamWriter` is automatically injected when requested as a parameter. It is a no-op when you are not streaming with `stream_mode="custom"`.

Use cases:

- Fine-grained progress tracking
- Custom metrics/telemetry
- Application-specific events
## Checkpoints Mode

Emits state snapshots when checkpoints are created:

```python
for event in graph.stream(input_data, stream_mode="checkpoints"):
    print(event)

# StateSnapshot(
#     values={'messages': [...], 'count': 1},
#     next=('node2',),
#     config={'configurable': {'thread_id': '1'}},
#     metadata={'step': 0, 'source': 'loop'},
#     created_at='2024-01-15T10:30:00Z',
#     tasks=(...),
# )
```

Each checkpoint includes:

- `values`: current state
- `next`: upcoming nodes
- `config`: runtime configuration
- `metadata`: step info and source
- `created_at`: timestamp
- `tasks`: pending tasks

Use cases:

- Monitor checkpointing
- Display execution timeline
- Debug state persistence

> Requires a checkpointer to be configured.
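A minimal checkpointer setup might look like the following, assuming the in-memory `MemorySaver` (persistent stores such as the SQLite or Postgres savers follow the same pattern); `builder` and `input_data` are the same placeholders used in the snippets above:

```python
from langgraph.checkpoint.memory import MemorySaver

# Compile with a checkpointer so snapshots are actually created
graph = builder.compile(checkpointer=MemorySaver())

# A thread_id groups checkpoints into one logical run/conversation
config = {"configurable": {"thread_id": "thread-1"}}

for snapshot in graph.stream(input_data, config=config, stream_mode="checkpoints"):
    print(snapshot)
```

Without a `thread_id` in the config, most checkpointers have nowhere to attach the snapshots and streaming them will fail or emit nothing.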
## Tasks Mode

Emits events for the task lifecycle:

```python
for event in graph.stream(input_data, stream_mode="tasks"):
    print(event)

# ('task', {
#     'id': 'abc-123',
#     'name': 'process_data',
#     'path': ('process_data',),
#     'result': None,  # None while running
#     'error': None,
# })
#
# ('task_result', {
#     'id': 'abc-123',
#     'name': 'process_data',
#     'path': ('process_data',),
#     'result': {'count': 1},
#     'error': None,
# })
```

Event types:

- `task`: task started
- `task_result`: task completed (with result or error)

Use cases:

- Monitor task execution
- Track task duration
- Debug failures
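One way to track task duration is to record a timestamp when a `task` event arrives and compute the delta when the matching `task_result` arrives, keyed by task `id`. A minimal sketch over simulated events (the tuples follow the event shapes shown above; in practice they would come from `graph.stream(input_data, stream_mode="tasks")`):

```python
import time

def track_durations(events):
    """Pair task/task_result events by id; return elapsed seconds per task name."""
    started = {}    # task id -> start time
    durations = {}  # task name -> elapsed seconds
    for event_type, payload in events:
        if event_type == "task":
            started[payload["id"]] = time.monotonic()
        elif event_type == "task_result":
            start = started.pop(payload["id"], None)
            if start is not None:
                durations[payload["name"]] = time.monotonic() - start
    return durations

# Simulated event stream
events = [
    ("task", {"id": "abc-123", "name": "process_data"}),
    ("task_result", {"id": "abc-123", "name": "process_data",
                     "result": {"count": 1}, "error": None}),
]
print(track_durations(events))
```

Pairing by `id` rather than `name` matters when the same node runs multiple times (loops or parallel fan-out).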
## Debug Mode

Combines checkpoint and task events for comprehensive debugging:

```python
for event in graph.stream(input_data, stream_mode="debug"):
    event_type, event_data = event

    if event_type == "checkpoint":
        print(f"Checkpoint at step {event_data.metadata['step']}")
    elif event_type == "task":
        print(f"Task {event_data['name']} started")
    elif event_type == "task_result":
        print(f"Task {event_data['name']} completed")
```

Use cases:

- Development debugging
- Troubleshooting execution
- Performance analysis
## Multiple Stream Modes

Combine modes for richer output. When `stream_mode` is a list, each event is a `(mode, data)` tuple identifying which mode produced it:

```python
for mode, data in graph.stream(
    input_data,
    stream_mode=["values", "updates", "messages"],
):
    if mode == "values":
        print(f"State: {data}")
    elif mode == "updates":
        print(f"Update: {data}")
    elif mode == "messages":
        chunk, metadata = data
        print(f"Token: {chunk.content}")
```

Event shapes by mode:

- `("values", <state>)`
- `("updates", <update>)`
- `("messages", (<chunk>, <metadata>))`
## Async Streaming

All streaming modes support async iteration:

```python
async for event in graph.astream(
    input_data,
    stream_mode="updates",
):
    await process_event(event)
```

Use async streaming for:

- Async I/O operations
- Concurrent event processing
- WebSocket connections
- Server-sent events (SSE)
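Concurrent event processing can be sketched with a handler draining an `asyncio.Queue`, so slow per-event work does not stall consumption of the stream. This is a generic asyncio pattern, not a LangGraph API; `fake_astream` is a stand-in for `graph.astream(input_data, stream_mode="updates")`:

```python
import asyncio

async def fake_astream():
    """Stand-in for graph.astream(input_data, stream_mode="updates")."""
    for i in range(3):
        yield {"node": {"count": i}}
        await asyncio.sleep(0)

async def main():
    queue: asyncio.Queue = asyncio.Queue()
    handled = []

    async def handler():
        # Drain events independently of the producer's pace
        while True:
            event = await queue.get()
            if event is None:  # Sentinel: stream finished
                return
            handled.append(event)  # Slow per-event work would go here

    worker = asyncio.create_task(handler())
    async for event in fake_astream():
        await queue.put(event)
    await queue.put(None)
    await worker
    return handled

events = asyncio.run(main())
print(events)
```

The same shape works for fanning events out to a WebSocket or SSE writer while the graph keeps running.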
## Streaming with Subgraphs

Control subgraph streaming:

```python
# Stream events from subgraphs too
for event in graph.stream(
    input_data,
    stream_mode="updates",
    subgraphs=True,  # Include subgraph events
):
    print(event)

# Events include namespace information
# {'parent_node:subgraph_node': {'count': 1}}
```
## Building a Streaming UI

### Real-Time Chat Interface

```python
from langgraph.graph import MessagesState

class ChatState(MessagesState):
    pass

def display_stream(user_message: str):
    config = {"configurable": {"thread_id": "chat-1"}}

    # Stream LLM tokens
    print("Assistant: ", end="")
    for event in graph.stream(
        {"messages": [("user", user_message)]},
        config=config,
        stream_mode="messages",
    ):
        chunk, metadata = event
        print(chunk.content, end="", flush=True)
    print()  # Newline

display_stream("What's the weather?")
# Assistant: I don't have access to real-time weather...
```
### Progress Bar with Custom Events

```python
from tqdm import tqdm
from langgraph.types import StreamWriter

def batch_processor(state: State, writer: StreamWriter) -> dict:
    items = state["items"]
    for i, item in enumerate(items):
        result = process(item)
        writer({"processed": i + 1, "total": len(items)})
    return {"status": "complete"}

# Display progress; total matches the number of items the node will emit
with tqdm(total=len(input_data["items"])) as pbar:
    for event in graph.stream(input_data, stream_mode="custom"):
        if isinstance(event, dict) and "processed" in event:
            pbar.update(1)
```
### FastAPI SSE Endpoint

```python
import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.post("/stream")
async def stream_graph(request: dict):
    async def event_generator():
        async for event in graph.astream(
            request["input"],
            stream_mode="updates",
        ):
            yield f"data: {json.dumps(event)}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
    )
```
## Stream Configuration

### Early Emission

Force eager event emission:

```python
graph = builder.compile(
    stream_eager=True  # Emit events as soon as possible
)
```

> By default, events are batched; `stream_eager=True` reduces latency.
### Filtering Stream Channels

Limit which state keys are streamed:

```python
from langgraph.pregel import Pregel

# Using the low-level Pregel API
app = Pregel(
    nodes={...},
    channels={...},
    output_channels=["messages", "status"],
    stream_channels=["messages"],  # Only stream 'messages'
)
```
## Best Practices

- Use `values` for state monitoring and simple UIs
- Use `updates` to track individual node execution
- Use `messages` for chat interfaces with LLMs
- Use `custom` for application-specific events
- Use `debug` during development
- Combine modes when you need multiple perspectives
- Buffer tokens before displaying (avoid flickering)
- Show loading indicators between node executions
- Handle reconnection for long-running streams
- Display node names from `updates` mode
- Use custom events for progress bars
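Token buffering can be as simple as accumulating chunks and flushing on whitespace or a size threshold, so the UI repaints per word rather than per token. A generic sketch (not a LangGraph API; `tokens` stands in for the `chunk.content` values from `messages` mode):

```python
def buffer_tokens(chunks, max_buffer=16):
    """Group raw token chunks into word-sized pieces for smoother display."""
    buffer = ""
    for token in chunks:
        buffer += token
        # Flush on whitespace or when the buffer grows too large
        if buffer.endswith((" ", "\n")) or len(buffer) >= max_buffer:
            yield buffer
            buffer = ""
    if buffer:  # Flush whatever remains at end of stream
        yield buffer

# Simulated token stream
tokens = ["Hel", "lo ", "wor", "ld", "!"]
print(list(buffer_tokens(tokens)))  # ['Hello ', 'world!']
```

The `max_buffer` cap keeps long unbroken strings (URLs, code) from stalling the display indefinitely.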
## Troubleshooting

### No events received

- Verify the correct stream mode
- Check that the graph has nodes
- Ensure nodes return state updates
- For `messages` mode, confirm an LLM is used
- For `custom` mode, verify `StreamWriter` calls

### Events arrive slowly

- Enable `stream_eager=True`
- Check for buffering in the transport layer
- Verify async streaming is used correctly
- Review node execution time

### Missing tokens in `messages` mode

- Ensure the LLM supports streaming
- Check that the LLM is configured for streaming
- Verify the message format is correct
- Review the LangChain callback configuration
## Next Steps

- **Human-in-the-Loop**: Combine streaming with interrupts for human oversight
- **Checkpointing**: Use checkpoint streaming for state monitoring