Deploying LangGraph applications requires careful consideration of persistence, scalability, monitoring, and infrastructure.
Deployment Options
LangSmith Deployment
The easiest way to deploy LangGraph applications:
# Install CLI
pip install langgraph-cli
# Initialize project
langgraph init
# Deploy to LangSmith
langgraph deploy
Benefits:
- Managed infrastructure
- Built-in observability
- Automatic scaling
- Production checkpointers
- LangGraph Studio integration
LangSmith Deployment handles persistence, scaling, and monitoring automatically.
Self-Hosted Deployment
For self-hosted deployments, you’ll need to configure:
- Web server (FastAPI, Flask)
- Persistent checkpointer (PostgreSQL, SQLite)
- Message queue (for async processing)
- Load balancer
- Monitoring and logging
Production Setup
Use a persistent checkpointer:
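import os

from langgraph.checkpoint.postgres import PostgresSaver

# Production database (raises KeyError at startup if DATABASE_URL is unset)
DB_URI = os.environ["DATABASE_URL"]

with PostgresSaver.from_conn_string(DB_URI) as checkpointer:
    checkpointer.setup()  # Create the checkpoint tables on first run
    app = graph.compile(checkpointer=checkpointer)
    # Use `app` inside this block; the connection closes when the block exits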
Wrap your graph in a web API:
import json

import uvicorn
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

api = FastAPI()

class InvokeRequest(BaseModel):
    input: dict
    thread_id: str

class InvokeResponse(BaseModel):
    output: dict
    thread_id: str

@api.post("/invoke", response_model=InvokeResponse)
async def invoke_graph(request: InvokeRequest):
    """Invoke the graph."""
    try:
        config = {"configurable": {"thread_id": request.thread_id}}
        # For async endpoints, AsyncPostgresSaver (langgraph.checkpoint.postgres.aio)
        # is the async counterpart of PostgresSaver
        result = await app.ainvoke(request.input, config)
        return InvokeResponse(
            output=result,
            thread_id=request.thread_id,
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@api.post("/stream")
async def stream_graph(request: InvokeRequest):
    """Stream graph execution."""
    config = {"configurable": {"thread_id": request.thread_id}}

    async def event_generator():
        async for chunk in app.astream(request.input, config):
            # default=str stringifies values that json cannot encode
            yield f"data: {json.dumps(chunk, default=str)}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
    )

if __name__ == "__main__":
    uvicorn.run(api, host="0.0.0.0", port=8000)
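The streaming endpoint above frames each chunk as a Server-Sent Event. A minimal sketch of that framing follows, assuming chunks may hold values that are not JSON serializable (for example, message objects). The helper name `format_sse` is hypothetical and not part of LangGraph or FastAPI.

```python
import json
from typing import Any, Optional


def format_sse(data: Any, event: Optional[str] = None) -> str:
    """Frame one payload as a Server-Sent Event (hypothetical helper)."""
    # default=str is a fallback assumption: anything json cannot encode
    # is sent as its string representation instead of raising TypeError.
    payload = json.dumps(data, default=str)
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    lines.append(f"data: {payload}")
    # A blank line terminates the event.
    return "\n".join(lines) + "\n\n"
```

Inside the generator, `yield format_sse(chunk)` would replace the inline f-string.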
Add a health check endpoint:
@api.get("/health")
def health_check():
    """Health check endpoint."""
    try:
        # Check database connection by reading checkpoint state.
        # A plain `def` lets FastAPI run this blocking call in its thread pool.
        app.get_state({"configurable": {"thread_id": "health-check"}})
        return {"status": "healthy"}
    except Exception as e:
        raise HTTPException(status_code=503, detail=f"Unhealthy: {e}")
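Load configuration from environment variables:
from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    # Read values from .env when present; real environment variables take precedence
    model_config = SettingsConfigDict(env_file=".env")

    database_url: str
    openai_api_key: str
    anthropic_api_key: str
    log_level: str = "INFO"
    max_workers: int = 4

settings = Settings()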
Containerization
Dockerfile
FROM python:3.11-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application
COPY . .
# Run server
CMD ["uvicorn", "main:api", "--host", "0.0.0.0", "--port", "8000"]
Docker Compose
version: '3.8'
services:
  app:
    build: .
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql://user:password@db:5432/langgraph
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    depends_on:
      - db
  db:
    image: postgres:15
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=password
      - POSTGRES_DB=langgraph
    volumes:
      - postgres_data:/var/lib/postgresql/data
volumes:
  postgres_data:
Kubernetes Deployment
Deployment Manifest
apiVersion: apps/v1
kind: Deployment
metadata:
  name: langgraph-app
spec:
  replicas: 3
  selector:
    matchLabels:
      app: langgraph
  template:
    metadata:
      labels:
        app: langgraph
    spec:
      containers:
        - name: app
          image: your-registry/langgraph-app:latest
          ports:
            - containerPort: 8000
          env:
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: app-secrets
                  key: database-url
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: app-secrets
                  key: openai-api-key
          resources:
            requests:
              memory: "512Mi"
              cpu: "500m"
            limits:
              memory: "1Gi"
              cpu: "1000m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
Service Manifest
apiVersion: v1
kind: Service
metadata:
  name: langgraph-service
spec:
  selector:
    app: langgraph
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8000
  type: LoadBalancer
Scaling Considerations
Horizontal Scaling
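LangGraph applications can scale horizontally because graph state lives in the checkpointer rather than in process memory, so replicas can share one database. Give each replica a connection pool:
# Use connection pooling
from psycopg.rows import dict_row
from psycopg_pool import ConnectionPool

pool = ConnectionPool(
    conninfo=DB_URI,
    min_size=2,
    max_size=10,
    # PostgresSaver expects autocommit connections that return dict rows
    kwargs={"autocommit": True, "prepare_threshold": 0, "row_factory": dict_row},
)
checkpointer = PostgresSaver(pool)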
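Each replica opens its own pool, so the database can see up to replicas × max_size connections at once. A rough sizing check follows, assuming the three replicas from the Deployment manifest and a PostgreSQL `max_connections` of 100 (the usual default; check your server's setting). The function name and the reserved-connection headroom are illustrative assumptions.

```python
def max_pool_size(max_connections: int, replicas: int, reserved: int = 10) -> int:
    """Largest per-replica pool size that fits the database connection limit."""
    if replicas < 1:
        raise ValueError("replicas must be at least 1")
    # Keep some connections free for admin and migration sessions.
    usable = max_connections - reserved
    if usable < replicas:
        raise ValueError("not enough connections for one per replica")
    return usable // replicas


# 3 replicas against a 100-connection limit, 10 held in reserve
print(max_pool_size(max_connections=100, replicas=3))  # 30
```

With these numbers, the `max_size=10` used above leaves room to add replicas before the limit is reached.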
Async Processing
Handle long-running workflows asynchronously:
from celery import Celery

celery_app = Celery('langgraph', broker='redis://localhost:6379')

@celery_app.task
def process_graph(input_data: dict, thread_id: str):
    """Process graph in background."""
    config = {"configurable": {"thread_id": thread_id}}
    result = app.invoke(input_data, config)
    return result

# API endpoint
@api.post("/invoke-async")
async def invoke_async(request: InvokeRequest):
    task = process_graph.delay(request.input, request.thread_id)
    return {"task_id": task.id}
Caching
Cache node results to avoid recomputing them. The cache passed at compile time applies only to nodes that define a cache policy:
from langgraph.cache.memory import InMemoryCache

cache = InMemoryCache()
app = graph.compile(
    checkpointer=checkpointer,
    cache=cache,
)
Monitoring
LangSmith Integration
import os
# Enable LangSmith tracing
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-api-key"
os.environ["LANGCHAIN_PROJECT"] = "production-app"
# Traces automatically sent to LangSmith
result = app.invoke(input_data, config)
Custom Metrics
import time

from prometheus_client import Counter, Histogram

# Define metrics
invocation_counter = Counter(
    'langgraph_invocations_total',
    'Total graph invocations',
    ['status']
)
invocation_duration = Histogram(
    'langgraph_invocation_duration_seconds',
    'Graph invocation duration'
)

# Instrument code
def invoke_with_metrics(input_data, config):
    start = time.perf_counter()
    try:
        result = app.invoke(input_data, config)
        invocation_counter.labels(status='success').inc()
        return result
    except Exception:
        invocation_counter.labels(status='error').inc()
        raise
    finally:
        # Record duration for both successful and failed invocations
        invocation_duration.observe(time.perf_counter() - start)
Logging
import json
import logging
from datetime import datetime, timezone

# Structured logging
logger = logging.getLogger(__name__)

class StructuredLogger:
    @staticmethod
    def log_invocation(thread_id: str, input_data: dict, result: dict):
        logger.info(json.dumps({
            "event": "graph_invocation",
            "thread_id": thread_id,
            "input": input_data,
            "output": result,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }, default=str))  # default=str stringifies values json cannot encode

# Use in API
@api.post("/invoke")
async def invoke_graph(request: InvokeRequest):
    config = {"configurable": {"thread_id": request.thread_id}}
    result = await app.ainvoke(request.input, config)
    StructuredLogger.log_invocation(
        request.thread_id,
        request.input,
        result,
    )
    return result
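An alternative to serializing inside each call is a `logging.Formatter` that emits every record as JSON. This is a sketch using only the standard library; the class name `JsonFormatter`, the logger name, and the `thread_id` field passed through `extra` are assumptions for illustration.

```python
import json
import logging
from datetime import datetime, timezone


class JsonFormatter(logging.Formatter):
    """Render each log record as one JSON object per line (illustrative)."""

    def format(self, record: logging.LogRecord) -> str:
        entry = {
            "timestamp": datetime.fromtimestamp(
                record.created, tz=timezone.utc
            ).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "event": record.getMessage(),
        }
        # Fields passed via `extra=` become attributes on the record.
        thread_id = getattr(record, "thread_id", None)
        if thread_id is not None:
            entry["thread_id"] = thread_id
        return json.dumps(entry, default=str)


handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logger = logging.getLogger("langgraph_app")
logger.addHandler(handler)
logger.setLevel(logging.INFO)

logger.info("graph_invocation", extra={"thread_id": "thread-1"})
```

Keeping the formatting in one place means call sites log plain messages, and the decision about which fields to include (full inputs and outputs may contain sensitive data) is made once.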
Security
Authentication
from fastapi import Depends, HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

security = HTTPBearer()

def verify_token(credentials: HTTPAuthorizationCredentials = Security(security)):
    """Verify JWT token."""
    token = credentials.credentials
    # is_valid_token and get_user_from_token are placeholders for your own JWT logic
    if not is_valid_token(token):
        raise HTTPException(status_code=401, detail="Invalid token")
    return get_user_from_token(token)

@api.post("/invoke")
async def invoke_graph(
    request: InvokeRequest,
    user = Depends(verify_token),
):
    # Use user-specific thread_id
    config = {"configurable": {"thread_id": f"{user.id}-{request.thread_id}"}}
    return await app.ainvoke(request.input, config)
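Joining the user ID and thread ID with a hyphen can collide when either value itself contains a hyphen: user `a-b` with thread `c` and user `a` with thread `b-c` both yield `a-b-c`. A minimal sketch of a collision-free key follows, assuming string IDs; `scoped_thread_id` is a hypothetical helper, not a LangGraph API.

```python
from urllib.parse import quote


def scoped_thread_id(user_id: str, thread_id: str) -> str:
    """Build a per-user thread key that cannot collide across users."""
    if not user_id or not thread_id:
        raise ValueError("user_id and thread_id must be non-empty")
    # Percent-encode every reserved character (including ':') in each part,
    # so the ':' separator below is unambiguous.
    return f"{quote(user_id, safe='')}:{quote(thread_id, safe='')}"
```

The result would be used as the `thread_id` value in `config["configurable"]`.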
Rate Limiting
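from fastapi import Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)
api.state.limiter = limiter
# Return HTTP 429 when a client exceeds its limit
api.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@api.post("/invoke")
@limiter.limit("10/minute")
async def invoke_graph(request: Request, invoke_request: InvokeRequest):
    # Rate limited to 10 requests per minute per client IP
    config = {"configurable": {"thread_id": invoke_request.thread_id}}
    return await app.ainvoke(invoke_request.input, config)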
Best Practices
- Use persistent checkpointers: PostgreSQL or managed services for production
- Implement health checks: Monitor application and database health
- Enable tracing: Use LangSmith for observability
- Handle errors gracefully: Return meaningful error messages
- Validate input: Check user input before processing
- Set resource limits: Prevent resource exhaustion
- Use environment variables: Never hardcode secrets
- Implement retries: Handle transient failures
- Monitor performance: Track latency and throughput
- Plan for scaling: Design for horizontal scaling from the start
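The retry guidance above can be sketched as exponential backoff with jitter. This example uses only the standard library; `retry_with_backoff`, its parameters, and the choice of which exceptions count as transient are assumptions to adapt to your application.

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    func: Callable[[], T],
    retries: int = 3,
    base_delay: float = 0.5,
    transient: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying transient errors with exponential backoff and jitter."""
    for attempt in range(retries + 1):
        try:
            return func()
        except transient:
            if attempt == retries:
                raise  # out of retries: surface the last error
            # Delay doubles each attempt; jitter spreads out simultaneous retries.
            delay = base_delay * (2 ** attempt)
            sleep(delay + random.uniform(0, delay / 2))
    raise AssertionError("unreachable")
```

A call such as `retry_with_backoff(lambda: app.invoke(input_data, config))` would retry only the listed exception types; any other error propagates immediately.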