Skip to main content

Overview

Human-in-the-loop (HITL) enables LangGraph agents to pause for human approval, input, or correction. This is critical for high-stakes decisions, quality control, and building trustworthy AI systems.

Interrupt Points

The simplest HITL pattern: pause execution at specific nodes.

Interrupt Before

Pause before a node executes:
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySaver
from typing_extensions import TypedDict

class State(TypedDict):
    plan: str
    approved: bool

def create_plan(state: State) -> dict:
    return {"plan": "Execute action X"}

def execute_plan(state: State) -> dict:
    # This only runs after human approval
    return {"approved": True}

builder = StateGraph(State)
builder.add_node("planner", create_plan)
builder.add_node("executor", execute_plan)
builder.add_edge(START, "planner")
builder.add_edge("planner", "executor")
builder.add_edge("executor", END)

# Pause before executor runs
checkpointer = InMemorySaver()
graph = builder.compile(
    checkpointer=checkpointer,
    interrupt_before=["executor"]
)

Interrupt After

Pause after a node completes:
# Review output before continuing
graph = builder.compile(
    checkpointer=checkpointer,
    interrupt_after=["planner"]  # Review plan before execution
)

Interrupt All

Pause at every node:
graph = builder.compile(
    checkpointer=checkpointer,
    interrupt_before="*",  # Pause before every node
    interrupt_after="*"     # Or after every node
)
Interrupts require a checkpointer. Without one, the graph cannot save state to resume later.

Using Interrupts

Basic Workflow

config = {"configurable": {"thread_id": "workflow-1"}}

# First invocation - runs until interrupt
result = graph.invoke({"plan": "", "approved": False}, config)

# Check state at interrupt
state = graph.get_state(config)
print(state.values)  # {'plan': 'Execute action X', 'approved': False}
print(state.next)    # ('executor',) - waiting to run executor

# Human reviews the plan...
if human_approves(state.values["plan"]):
    # Resume execution
    result = graph.invoke(None, config)
    print(result)  # {'plan': '...', 'approved': True}

Streaming with Interrupts

config = {"configurable": {"thread_id": "workflow-2"}}

# Stream until interrupt
for event in graph.stream({"plan": ""}, config, stream_mode="updates"):
    print(event)
    # {'planner': {'plan': 'Execute action X'}}
    # Pauses here at interrupt_before=["executor"]

# Resume streaming
for event in graph.stream(None, config, stream_mode="updates"):
    print(event)
    # {'executor': {'approved': True}}

Dynamic Interrupts

Trigger interrupts programmatically from within nodes:
from langgraph.types import interrupt

def review_node(state: State) -> dict:
    # Analyze the content
    if requires_review(state["content"]):
        # Interrupt and ask for human input
        human_input = interrupt(
            {
                "question": "Does this look correct?",
                "content": state["content"],
                "options": ["approve", "reject", "edit"]
            }
        )
        
        # This code runs AFTER human resumes
        if human_input == "reject":
            return {"status": "rejected"}
        elif human_input == "edit":
            return {"content": get_edited_content()}
    
    return {"status": "approved"}

builder.add_node("review", review_node)

Using interrupt()

The interrupt() function:
  1. First call: Raises GraphInterrupt, pausing execution
  2. Surfaces the interrupt value to the client
  3. Resume with value: On next invocation, returns the resume value
  4. Node re-executes from the start with the resume value
from langgraph.types import Command

config = {"configurable": {"thread_id": "review-1"}}

# First run - encounters interrupt
for event in graph.stream({"content": "Draft"}, config):
    if "__interrupt__" in event:
        interrupt_info = event["__interrupt__"][0]
        print(interrupt_info.value)
        # {'question': 'Does this look correct?', ...}

# Get current state
state = graph.get_state(config)
interrupt = state.interrupts[0]
print(interrupt.value)  # Same as above

# Resume with human decision
for event in graph.stream(
    Command(resume="approve"),  # Provide resume value
    config
):
    print(event)
    # {'review': {'status': 'approved'}}

Multiple Interrupts

A node can have multiple interrupt points:
def complex_node(state: State) -> dict:
    # First interrupt
    step1_approval = interrupt("Approve step 1?")
    
    if step1_approval:
        process_step1()
    
    # Second interrupt
    step2_approval = interrupt("Approve step 2?")
    
    if step2_approval:
        process_step2()
    
    return {"status": "complete"}

# Resume with list of values (matched in order)
for event in graph.stream(
    Command(resume=[True, True]),  # Approve both steps
    config
):
    print(event)
Resume values are matched to interrupts in the order they appear in the node’s execution.

Editing State During Interrupts

Modify state while paused:
config = {"configurable": {"thread_id": "edit-1"}}

# Run until interrupt
graph.invoke({"draft": "Initial content"}, config)

# Check state
state = graph.get_state(config)
print(state.values["draft"])

# Human edits the draft
updated_config = graph.update_state(
    config,
    values={"draft": "Edited content"},
    as_node="editor"  # Pretend 'editor' node made this change
)

# Resume with edited state
result = graph.invoke(None, updated_config)

Update and Resume in One Step

Combine state update with resuming:
from langgraph.types import Command

# Update state and resume interrupt
for event in graph.stream(
    Command(
        update={"draft": "Edited content"},
        resume="approve"
    ),
    config
):
    print(event)

Interrupt Metadata

Access detailed information about interrupts:
state = graph.get_state(config)

for interrupt in state.interrupts:
    print(interrupt.value)  # Data passed to interrupt()
    print(interrupt.id)     # Unique interrupt ID

# Use interrupt ID to resume specific interrupt
for event in graph.stream(
    Command(resume={interrupt.id: "approval_value"}),
    config
):
    print(event)

Approval Workflows

Simple Approval

def requires_approval(state: State) -> dict:
    if state["amount"] > 1000:
        approval = interrupt(f"Approve ${state['amount']} transaction?")
        if not approval:
            return {"status": "rejected"}
    
    return {"status": "approved"}

Multi-Level Approval

def multi_level_approval(state: State) -> dict:
    amount = state["amount"]
    
    # Level 1: Manager approval for $1k+
    if amount >= 1000:
        manager_approval = interrupt(
            {"level": "manager", "amount": amount}
        )
        if not manager_approval:
            return {"status": "rejected"}
    
    # Level 2: Director approval for $10k+
    if amount >= 10000:
        director_approval = interrupt(
            {"level": "director", "amount": amount}
        )
        if not director_approval:
            return {"status": "rejected"}
    
    return {"status": "approved", "amount": amount}

Review and Edit Workflows

Content Review

def content_reviewer(state: State) -> dict:
    draft = state["draft"]
    
    # Human reviews the draft
    feedback = interrupt({
        "action": "review",
        "draft": draft,
        "options": ["approve", "request_changes", "reject"]
    })
    
    if feedback["action"] == "approve":
        return {"status": "approved", "final": draft}
    
    elif feedback["action"] == "request_changes":
        # Loop back for revisions
        return {
            "status": "needs_revision",
            "feedback": feedback["comments"]
        }
    
    else:  # reject
        return {"status": "rejected"}

Interactive Editing

def interactive_editor(state: State) -> dict:
    content = state["content"]
    
    while True:
        # Show content and get feedback
        action = interrupt({
            "content": content,
            "prompt": "Edit, approve, or reject?"
        })
        
        if action["type"] == "edit":
            content = action["new_content"]
            continue
        
        elif action["type"] == "approve":
            return {"content": content, "status": "approved"}
        
        else:  # reject
            return {"status": "rejected"}
Be cautious with loops in interrupt logic. Each iteration re-executes the node from the start.

Error Handling

Retry After Human Review

def safe_operation(state: State) -> dict:
    try:
        result = risky_operation(state["input"])
        return {"result": result}
    
    except Exception as e:
        # Let human decide whether to retry
        action = interrupt({
            "error": str(e),
            "question": "Retry with different input?",
            "current_input": state["input"]
        })
        
        if action["retry"]:
            # Try with modified input
            result = risky_operation(action["new_input"])
            return {"result": result, "input": action["new_input"]}
        else:
            raise

Building HITL UIs

CLI Interface

import uuid

config = {"configurable": {"thread_id": str(uuid.uuid4())}}

while True:
    # Run until interrupt or completion
    for event in graph.stream(user_input, config):
        if "__interrupt__" in event:
            # Display interrupt to user
            interrupt_data = event["__interrupt__"][0].value
            print(f"\n⚠️  Approval Required:")
            print(f"   {interrupt_data}")
            
            # Get human input
            response = input("\nYour decision (yes/no): ").lower()
            approval = response == "yes"
            
            # Resume with decision
            user_input = Command(resume=approval)
            break
        else:
            print(event)
    else:
        # No interrupt, we're done
        break

Web API

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()

class ResumeRequest(BaseModel):
    thread_id: str
    resume_value: Any

@app.post("/start")
async def start_workflow(input_data: dict):
    thread_id = str(uuid.uuid4())
    config = {"configurable": {"thread_id": thread_id}}
    
    # Run until interrupt
    result = graph.invoke(input_data, config)
    
    # Check for interrupts
    state = graph.get_state(config)
    if state.interrupts:
        return {
            "status": "interrupted",
            "thread_id": thread_id,
            "interrupt": state.interrupts[0].value,
            "next": state.next
        }
    
    return {"status": "complete", "result": result}

@app.post("/resume")
async def resume_workflow(request: ResumeRequest):
    config = {"configurable": {"thread_id": request.thread_id}}
    
    # Resume with human input
    result = graph.invoke(
        Command(resume=request.resume_value),
        config
    )
    
    # Check if more interrupts
    state = graph.get_state(config)
    if state.interrupts:
        return {
            "status": "interrupted",
            "interrupt": state.interrupts[0].value
        }
    
    return {"status": "complete", "result": result}

Best Practices

  • High-stakes decisions (financial, legal, medical)
  • Quality control and content moderation
  • Ambiguous situations requiring judgment
  • Initial system deployment (reduce automation gradually)
  • Compliance and audit requirements
  • Training and validation scenarios
  • Provide clear context in interrupt values
  • Include relevant state data
  • Offer specific action options
  • Set appropriate timeout expectations
  • Log all human decisions for audit
  • Make interrupt points resumable
  • Always use persistent checkpointers in production
  • Test resume logic thoroughly
  • Handle state updates gracefully
  • Validate human input before resuming
  • Consider versioning for long-running workflows
  • Show progress before and after interrupts
  • Provide clear instructions to humans
  • Display relevant context efficiently
  • Support undo/redo when possible
  • Track and display approval history

Troubleshooting

  • Verify checkpointer is configured
  • Check interrupt_before/after configuration
  • Ensure node names are correct
  • For dynamic interrupts, verify interrupt() is called
  • Confirm thread_id matches
  • Check Command(resume=…) syntax
  • Verify interrupt ID for multi-interrupt nodes
  • Review node re-execution logic
  • Use persistent checkpointer (not InMemorySaver in production)
  • Verify database connectivity
  • Check thread_id consistency
  • Review checkpoint write logs

Next Steps

Checkpointing

Deep dive into state persistence for HITL

Streaming

Stream events to build responsive HITL UIs