芥末
发布于 2025-12-16 / 0 阅读
0
0

LangGraph 多智能体应用开发:图工作流、状态管理与 Supervisor/Swarm 架构

单个 Agent(智能体)适合处理边界清晰、步骤不多的任务,比如查询天气、总结一段文本、调用一个接口完成简单操作。一旦任务变成“先理解需求,再检索资料,再调用多个工具,再根据结果继续决策,必要时还要让人确认”,线性链式调用就会开始变得笨重。

LangGraph 解决的正是这类问题。它把 LLM(Large Language Model,大语言模型)应用的执行过程抽象成一张有向图:节点负责处理任务,边负责决定执行顺序,状态在节点之间流转。这样一来,Agent 不再只是“一步接一步”的链,而可以拥有分支、循环、并行、持久化、人工中断、恢复执行等能力。

一个典型 LangGraph 工作流可以理解成下面这张图:

flowchart LR
    START((START)) --> A[解析用户请求]
    A --> B{需要调用工具?}
    B -- 是 --> C[工具节点]
    C --> D[模型总结]
    B -- 否 --> D
    D --> E{需要人工确认?}
    E -- 是 --> F[人工审核]
    F --> G[生成最终结果]
    E -- 否 --> G
    G --> END((END))

LangGraph 适合解决什么问题

传统 LangChain 的 Chain 更像流水线,适合固定步骤:

flowchart LR
    A[输入] --> B[Prompt]
    B --> C[LLM]
    C --> D[解析输出]

但真实业务里的 Agent 往往不是固定流水线,而是会反复判断:

  • 当前信息是否足够?
  • 是否需要调用工具?
  • 工具失败后是否需要重试?
  • 是否需要把任务交给另一个 Agent?
  • 是否需要人工审批?
  • 是否需要从某个历史状态重新执行?

LangGraph 的图结构更适合表达这种非线性流程:

flowchart TD
    A[用户请求] --> B[Planner 规划]
    B --> C{任务类型}
    C -- 查询类 --> D[检索 Agent]
    C -- 计算类 --> E[计算 Agent]
    C -- 操作类 --> F[执行 Agent]
    D --> G[汇总结果]
    E --> G
    F --> H{高风险操作?}
    H -- 是 --> I[人工确认]
    I --> G
    H -- 否 --> G
    G --> J[返回用户]

LangGraph 的核心价值主要体现在几件事上:

能力解决的问题
图结构编排把复杂任务拆成节点,用边表达执行关系
状态管理所有节点共享同一份 State,避免状态散落在各处
条件路由根据当前状态决定后续执行哪个节点
循环执行Agent 可以反复思考、调用工具、修正答案
并行分支多个子任务可以同时执行,再汇总结果
检查点每一步保存状态,支持恢复、审计和调试
人机协作在关键节点暂停,等待人工输入后继续
多智能体多个专业 Agent 可以协作完成复杂任务

快速创建一个 ReAct Agent

安装 LangGraph:

pip install -U langgraph

LangGraph 提供了开箱即用的 create_react_agent,可以快速创建一个 ReAct 风格的 Agent。ReAct 指的是 Reasoning + Acting,也就是模型先推理,再决定是否调用工具,然后继续推理。

from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
from langgraph.prebuilt import create_react_agent

def get_weather(city: str) -> str:
    """获取指定城市的天气信息"""
    return f"{city}今天是晴天,气温 25 摄氏度。"

model = ChatOpenAI(
    model="gpt-4o-mini",
    base_url="https://api.example.com/v1",
    api_key="your-api-key",
)

agent = create_react_agent(
    model=model,
    tools=[get_weather],
    prompt="你是一个天气助手,需要在回答天气问题时调用工具。"
)

result = agent.invoke({
    "messages": [
        HumanMessage(content="深圳今天的天气怎么样?")
    ]
})

print(result["messages"][-1].content)

Agent 支持同步和异步两种调用方式:

模式方法
同步一次性返回invoke()
同步流式返回stream()
异步一次性返回ainvoke()
异步流式返回astream()

为了防止 Agent 在“思考—调用工具—再思考”的循环里跑不出来,可以设置递归限制:

result = agent.invoke(
    {
        "messages": [
            {"role": "user", "content": "帮我预订深圳到北京的机票"}
        ]
    },
    config={
        "recursion_limit": 10
    }
)

recursion_limit 控制一次图执行最多走多少步。超过限制后,LangGraph 会抛出 GraphRecursionError,这对生产环境很重要,因为模型偶尔会因为提示词、工具结果或状态设计问题进入循环。

LangGraph 的三个核心元素:State、Node、Edge

LangGraph 工作流由三类核心元素组成:

flowchart LR
    S[State 状态] <--> N1[Node 节点 A]
    S <--> N2[Node 节点 B]
    N1 -- Edge 边 --> N2
  • State:贯穿整个工作流的共享状态。
  • Node:实际执行逻辑的处理单元。
  • Edge:定义节点之间如何流转。

State:工作流的共享数据

State 是整张图运行过程中的状态快照。它可以保存用户消息、工具结果、检索文档、当前步骤、业务变量等数据。

在 Python 里,State 常用 TypedDict 或 Pydantic BaseModel 定义:

from typing import TypedDict
from langgraph.graph import StateGraph

class GraphState(TypedDict):
    user_input: str
    answer: str
    tool_result: dict

builder = StateGraph(GraphState)

节点接收 State,并返回要更新的字段:

def parse_input(state: GraphState) -> dict:
    text = state["user_input"]
    return {
        "answer": f"收到用户输入:{text}"
    }

注意,节点通常不应该把数据存在自己的全局变量里。更稳妥的方式是让所有可追踪、可恢复、可调试的数据都进入 State。

Node:图中的执行步骤

Node 是一个函数,也可以是一个 Agent、一个工具调用、一个子图,或者任意业务逻辑。

def input_node(state: GraphState) -> dict:
    return {
        "tool_result": {
            "input_checked": True
        }
    }

def output_node(state: GraphState) -> dict:
    return {
        "answer": "处理完成"
    }

builder.add_node("input", input_node)
builder.add_node("output", output_node)

如果节点需要额外参数,可以用 functools.partial 绑定:

from functools import partial

def process_node(state: GraphState, retry_count: int, mode: str) -> dict:
    return {
        "tool_result": {
            "mode": mode,
            "retry_count": retry_count
        }
    }

process_with_config = partial(process_node, retry_count=3, mode="strict")
builder.add_node("process", process_with_config)

节点设计建议:

原则说明
单一职责一个节点只做一类事情,比如检索、计算、审核、总结
无内部状态节点不要依赖隐藏的全局变量,状态应来自 State 或运行时上下文
可重试同样输入尽量产生同样输出,失败后重跑不会破坏业务数据
可测试节点最好能脱离整张图单独单元测试
副作用隔离写数据库、发消息、扣款等操作要特别处理幂等性

Edge:控制执行顺序

Edge 定义节点之间的连接关系。

LangGraph 有两个特殊节点:

  • START:图的入口。
  • END:图的终点。
from langgraph.graph import START, END

builder.add_edge(START, "input")
builder.add_edge("input", "process")
builder.add_edge("process", "output")
builder.add_edge("output", END)

graph = builder.compile()
result = graph.invoke({
    "user_input": "你好",
    "answer": "",
    "tool_result": {}
})

完整执行关系如下:

flowchart LR
    START((START)) --> A[input]
    A --> B[process]
    B --> C[output]
    C --> END((END))

compile() 会检查图结构是否合法,比如边指向了不存在的节点,就会在编译阶段报错。

节点重试与缓存

生产环境里的节点经常会调用外部服务,比如搜索、数据库、模型接口、第三方 API。这些调用可能因为网络抖动或限流失败,因此需要重试。

from langgraph.types import RetryPolicy
from requests import RequestException
from timeout_decorator import TimeoutError

retry_policy = RetryPolicy(
    max_attempts=3,
    initial_interval=1,
    backoff_factor=2,
    jitter=True,
    retry_on=[RequestException, TimeoutError],
)

builder.add_node("call_api", call_api_node, retry=retry_policy)

参数含义:

参数作用
max_attempts最大尝试次数
initial_interval第一次重试等待时间
backoff_factor每次重试等待时间的增长倍数
jitter加入随机抖动,避免大量请求同时重试
retry_on只对指定异常类型重试

对于耗时但结果短时间内稳定的节点,可以使用缓存:

import time
from typing import TypedDict
from langgraph.graph import StateGraph
from langgraph.cache.memory import InMemoryCache
from langgraph.types import CachePolicy

class CacheState(TypedDict):
    x: int
    result: int

def expensive_node(state: CacheState) -> dict:
    time.sleep(2)
    return {
        "result": state["x"] * 2
    }

builder = StateGraph(CacheState)
builder.add_node(
    "expensive",
    expensive_node,
    cache_policy=CachePolicy(ttl=60)
)
builder.set_entry_point("expensive")
builder.set_finish_point("expensive")

graph = builder.compile(cache=InMemoryCache())

缓存适合:

场景建议
静态知识查询可以设置较长 TTL
天气、库存、价格TTL 要短,避免返回旧数据
有副作用操作不要随便缓存,比如支付、下单、发邮件
依赖用户权限的数据缓存键必须包含用户身份或权限信息

State 如何合并:Reducer

多个节点都可能更新 State。同一个字段被更新时,LangGraph 需要知道是覆盖、追加,还是用自定义逻辑合并。这个逻辑叫 Reducer。

默认策略:覆盖

如果字段没有指定 Reducer,后写入的值会覆盖旧值:

from typing import TypedDict

class OverrideState(TypedDict):
    data: dict

比如节点 A 返回:

{"data": {"step": "A"}}

节点 B 返回:

{"data": {"step": "B"}}

最终 data 会变成:

{"step": "B"}

使用 Annotated 指定合并策略

可以用 typing.Annotated 为字段指定合并函数。常见做法是用 operator.add 实现数字相加、列表追加、字符串拼接。

from typing import TypedDict, Annotated
from operator import add

class AddState(TypedDict):
    count: Annotated[int, add]
    logs: Annotated[list[str], add]
    text: Annotated[str, add]

def node_a(state: AddState) -> dict:
    return {
        "count": 1,
        "logs": ["node_a"],
        "text": "hello "
    }

def node_b(state: AddState) -> dict:
    return {
        "count": 2,
        "logs": ["node_b"],
        "text": "world"
    }

执行后:

{
    "count": 3,
    "logs": ["node_a", "node_b"],
    "text": "hello world"
}

add_messages:对话消息专用合并器

对话系统常用 add_messages 管理消息列表。它不是简单追加,而是会根据消息 ID 做合并:

  • 新消息 ID 不存在时追加。
  • 新消息 ID 已存在时覆盖旧消息。
  • 字符串可以自动转换成消息对象。
  • 适合多轮对话、工具调用结果、流式消息更新。
from typing import TypedDict, Annotated
from langgraph.graph.message import add_messages
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage

class MessageState(TypedDict):
    messages: Annotated[list, add_messages]

def system_node(state: MessageState) -> dict:
    return {
        "messages": [
            SystemMessage(content="你是 LangGraph 工程助手。")
        ]
    }

def user_node(state: MessageState) -> dict:
    return {
        "messages": [
            HumanMessage(content="LangGraph 的 State 是什么?")
        ]
    }

def answer_node(state: MessageState) -> dict:
    return {
        "messages": [
            AIMessage(content="State 是图执行过程中的共享状态。")
        ]
    }

自定义 Reducer

业务状态经常需要自定义合并逻辑,比如字典合并、保留最大分数、按时间排序、去重追加。

from typing import TypedDict, Annotated

def merge_dict(old: dict, new: dict) -> dict:
    merged = old.copy()
    merged.update(new)
    return merged

def keep_max(old: int, new: int) -> int:
    return max(old, new)

class CustomState(TypedDict):
    metadata: Annotated[dict, merge_dict]
    max_score: Annotated[int, keep_max]

Reducer 的选择会直接影响图的行为。并行节点同时写同一字段时,Reducer 尤其重要,否则状态很容易被意外覆盖。

条件边:根据状态动态选择路径

固定边只能表达确定流程。真实 Agent 经常需要根据状态选择不同节点,比如:

  • 用户意图是查订单还是退货?
  • 模型置信度是否足够?
  • 工具调用是否失败?
  • 是否需要人工审核?

条件边通过路由函数实现。

from typing import Literal
from langgraph.graph import StateGraph, START, END

class RouteState(TypedDict):
    sentiment: str
    result: str

def route_by_sentiment(state: RouteState) -> Literal["positive", "negative", "neutral"]:
    return state["sentiment"]

def positive_node(state: RouteState) -> dict:
    return {"result": "用户情绪积极"}

def negative_node(state: RouteState) -> dict:
    return {"result": "用户情绪消极,需要优先处理"}

def neutral_node(state: RouteState) -> dict:
    return {"result": "用户情绪中性"}

builder = StateGraph(RouteState)
builder.add_node("positive_handler", positive_node)
builder.add_node("negative_handler", negative_node)
builder.add_node("neutral_handler", neutral_node)

builder.add_conditional_edges(
    START,
    route_by_sentiment,
    {
        "positive": "positive_handler",
        "negative": "negative_handler",
        "neutral": "neutral_handler",
    }
)

builder.add_edge("positive_handler", END)
builder.add_edge("negative_handler", END)
builder.add_edge("neutral_handler", END)

graph = builder.compile()

执行结构如下:

flowchart TD
    START((START)) --> R{route_by_sentiment}
    R -- positive --> A[positive_handler]
    R -- negative --> B[negative_handler]
    R -- neutral --> C[neutral_handler]
    A --> END((END))
    B --> END
    C --> END

LangGraph 支持把图导出成 Mermaid 图片,便于检查实际执行结构:

png_data = graph.get_graph().draw_mermaid_png()

with open("graph.png", "wb") as f:
    f.write(png_data)

Send:动态并行分发任务

Send 用来动态创建多个执行分支,适合 Map-Reduce 场景。例如用户要求总结 10 篇文档,可以为每篇文档创建一个并行任务,再把结果合并。

flowchart TD
    A[生成任务列表] --> B{Send 多个任务}
    B --> C1[处理任务 1]
    B --> C2[处理任务 2]
    B --> C3[处理任务 3]
    C1 --> D[汇总结果]
    C2 --> D
    C3 --> D

示例代码:

from typing import TypedDict, Annotated
from operator import add
from langgraph.types import Send
from langgraph.graph import StateGraph, START, END

class MapReduceState(TypedDict):
    tasks: list[str]
    results: Annotated[list[str], add]
    final_answer: str

def generate_tasks(state: MapReduceState) -> dict:
    return {
        "tasks": ["文档A", "文档B", "文档C"]
    }

def route_tasks(state: MapReduceState) -> list[Send]:
    return [
        Send("process_task", {"tasks": [task], "results": [], "final_answer": ""})
        for task in state["tasks"]
    ]

def process_task(state: MapReduceState) -> dict:
    task = state["tasks"][0]
    return {
        "results": [f"{task} 的摘要"]
    }

def reduce_results(state: MapReduceState) -> dict:
    return {
        "final_answer": "\n".join(state["results"])
    }

builder = StateGraph(MapReduceState)
builder.add_node("generate_tasks", generate_tasks)
builder.add_node("process_task", process_task)
builder.add_node("reduce_results", reduce_results)

builder.add_edge(START, "generate_tasks")
builder.add_conditional_edges("generate_tasks", route_tasks)
builder.add_edge("process_task", "reduce_results")
builder.add_edge("reduce_results", END)

graph = builder.compile()

这里 results 使用 Annotated[list[str], add],因为多个并行分支都会写入结果列表。如果没有合并器,后完成的分支可能覆盖先完成的结果。

Command:同时更新状态和控制路由

条件边只负责决定去哪个节点。Command 更进一步,它可以在节点返回时同时做两件事:

  1. 更新 State。
  2. 指定后续跳转节点。

适合以下场景:

  • Agent 判断自己无法处理,要移交给专家 Agent。
  • 人工输入恢复后,需要跳转到指定节点。
  • 子图需要跳回父图。
  • 节点内部根据执行结果决定后续路径,同时写入状态。
from typing import TypedDict
from langgraph.types import Command

class AgentState(TypedDict):
    messages: list
    need_expert: bool
    route_reason: str

def triage_agent(state: AgentState) -> Command:
    if state["need_expert"]:
        return Command(
            goto="expert_agent",
            update={
                "route_reason": "当前问题需要专家 Agent 处理"
            }
        )

    return Command(
        goto="normal_agent",
        update={
            "route_reason": "普通 Agent 可以处理"
        }
    )

SendCommand 的差异可以这样理解:

机制主要用途是否更新状态是否适合并行
条件边根据条件选择路径一般不用于动态并行
Send动态创建多个分支给目标节点传入局部状态适合
Command状态更新 + 路由控制可以结合 Send 使用

检查点:让工作流可以恢复

Agent 工作流可能执行很多步,中间还可能调用外部工具。如果进程重启、服务异常或需要人工确认,不能让整个任务从头再跑。

LangGraph 的检查点机制会在每个步骤后保存状态快照。快照里包含:

  • 当前 State 的值。
  • 后续要执行的节点。
  • 当前配置。
  • 元数据。
  • 待执行任务。
  • 中断信息。
  • 错误信息。
sequenceDiagram
    participant App as 应用
    participant Graph as LangGraph
    participant Store as Checkpointer

    App->>Graph: invoke(input, thread_id)
    Graph->>Graph: 执行节点 A
    Graph->>Store: 保存检查点 A
    Graph->>Graph: 执行节点 B
    Graph->>Store: 保存检查点 B
    Graph-->>App: 返回结果或中断信息

内存检查点示例:

from langgraph.checkpoint.memory import MemorySaver

memory = MemorySaver()
graph = builder.compile(checkpointer=memory)

config = {
    "configurable": {
        "thread_id": "user-session-001"
    }
}

result = graph.invoke(
    {"value": 5, "operations": []},
    config=config
)

thread_id 是 LangGraph 用来区分会话的 ID,不是操作系统线程 ID。同一个用户的连续对话、同一个长任务的多次恢复,都应该使用同一个 thread_id

获取历史检查点:

history = list(graph.get_state_history(config))

for index, snapshot in enumerate(history):
    print(index, snapshot.next, snapshot.values)

常见 Checkpointer:

存储适合场景
内存本地测试、单进程 Demo
SQLite/PostgreSQL需要持久化、可审计的业务任务
Redis高并发、短期会话、恢复速度要求高
自定义存储企业内部已有状态存储系统

时间旅行:从历史状态重新执行

启用检查点后,LangGraph 可以回到某个历史状态,检查当时的 State,甚至修改状态后从那里重新执行。这种能力通常叫时间旅行。

适用场景:

场景用法
调试回到出错前的状态,重跑后续节点
审计查看某一步模型看到了什么状态
分支探索修改历史状态,比较不同执行路径
人工修正人发现中间结果错了,改完继续跑

示例:

checkpoints = list(graph.get_state_history(config))

# 注意:历史列表通常按时间倒序返回,index 0 往往是最新状态
target = checkpoints[2]

graph.update_state(
    target.config,
    {
        "operations": ["人工修正后的操作记录"]
    }
)

result = graph.invoke(None, config=target.config)

时间旅行很适合调试 Agent,因为 Agent 的错误很多不是代码异常,而是“某一步模型做了错误判断”。如果没有完整状态记录,只看最终结果很难定位问题。

人机协作:在关键节点暂停

HIL(Human-in-the-Loop,人机协作)是在自动化流程中加入人工确认。常见场景包括:

  • 执行命令前让用户确认。
  • 下单、付款、删除数据前人工审批。
  • AI 置信度低时转人工。
  • 生成代码补丁后等待开发者接受。
  • 合规审核、风控审核、内容审核。

LangGraph 通过 interrupt() 暂停工作流,再通过 Command(resume=...) 恢复。

from typing import TypedDict, Annotated
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import MemorySaver

class ReviewState(TypedDict):
    request: str
    analysis: str
    approved: bool
    human_feedback: str

def analyze_node(state: ReviewState) -> dict:
    return {
        "analysis": f"对请求进行分析:{state['request']}"
    }

def human_review_node(state: ReviewState) -> dict:
    review_payload = {
        "type": "human_review",
        "request": state["request"],
        "analysis": state["analysis"],
        "prompt": "请输入 approve 或 reject,并给出原因"
    }

    human_input = interrupt(review_payload)

    return {
        "approved": human_input["decision"] == "approve",
        "human_feedback": human_input["feedback"]
    }

def route_after_review(state: ReviewState) -> str:
    return "execute" if state["approved"] else "reject"

恢复执行:

memory = MemorySaver()
graph = builder.compile(checkpointer=memory)

config = {
    "configurable": {
        "thread_id": "review-task-001"
    }
}

# 第一次执行,到 human_review_node 会暂停
result = graph.invoke(
    {
        "request": "删除生产环境中的过期数据",
        "analysis": "",
        "approved": False,
        "human_feedback": ""
    },
    config=config
)

# 人工输入来自 Web 页面、后台系统或命令行
resume_cmd = Command(
    resume={
        "decision": "approve",
        "feedback": "确认只删除 30 天前的过期数据"
    }
)

final_result = graph.invoke(resume_cmd, config=config)

人机协作的执行过程:

sequenceDiagram
    participant User as 用户/审核人
    participant App as 业务系统
    participant Graph as LangGraph
    participant CP as Checkpointer

    App->>Graph: invoke(input)
    Graph->>Graph: 执行 analyze
    Graph->>CP: 保存状态
    Graph->>Graph: 执行 human_review
    Graph-->>App: 返回 interrupt 信息
    App-->>User: 展示待审核内容
    User-->>App: 提交审核意见
    App->>Graph: invoke(Command(resume=...), same thread_id)
    Graph->>Graph: 从 interrupt 位置恢复
    Graph->>Graph: 执行后续节点
    Graph-->>App: 返回最终结果

一个很容易踩的坑:interrupt() 不是阻塞等待。它会让当前 invoke() 返回中断信息。恢复时,节点函数会从调用 interrupt() 的位置重新进入,因此 interrupt() 之前的代码可能被重复执行。

错误写法:

def review_node(state):
    # 恢复执行时可能重复写数据库
    db.insert_audit_log(state["request"])

    human_input = interrupt({"prompt": "是否通过?"})
    return {"approved": human_input["approved"]}

更稳妥的写法是把副作用操作放到独立节点,或者加入幂等键:

def review_node(state):
    human_input = interrupt({"prompt": "是否通过?"})
    return {"approved": human_input["approved"]}

def write_audit_node(state):
    # 使用 request_id 做幂等控制
    db.upsert_audit_log(
        request_id=state["request_id"],
        approved=state["approved"]
    )
    return {}

记忆:短期记忆和长期记忆

Agent 的记忆可以分成两类:

类型生命周期实现方式用途
短期记忆单个会话内State + Checkpoint多轮对话上下文、当前任务状态
长期记忆跨会话Store用户偏好、历史资料、长期画像

短期记忆

短期记忆依赖 thread_id。同一个 thread_id 下,图可以从之前的消息和状态继续执行。

from langgraph.checkpoint.memory import InMemorySaver

checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)

config = {
    "configurable": {
        "thread_id": "chat-user-001"
    }
}

graph.invoke(
    {
        "messages": [
            {"role": "user", "content": "我叫张三"}
        ]
    },
    config=config
)

graph.invoke(
    {
        "messages": [
            {"role": "user", "content": "我叫什么?"}
        ]
    },
    config=config
)

长期记忆

长期记忆通过 Store 实现。它类似键值数据库,也可以结合向量检索做语义搜索。

from langgraph.store.memory import InMemoryStore
from langgraph.config import get_store

namespace = ("users", "profile")

def write_profile(state: dict) -> dict:
    store = get_store()
    store.put(
        namespace,
        "user_123",
        {
            "name": "张三",
            "city": "深圳",
            "preference": "喜欢简洁回答"
        }
    )
    return {}

def read_profile(state: dict) -> dict:
    store = get_store()
    profile = store.get(namespace, "user_123")
    search_result = store.search(namespace, query="深圳", limit=10)

    return {
        "profile": profile.value if profile else None,
        "search_result": [item.value for item in search_result]
    }

store = InMemoryStore()
graph = builder.compile(store=store)

长期记忆要谨慎设计命名空间,避免不同用户或不同业务的数据互相污染。

子图:把复杂工作流拆成可复用模块

LangGraph 支持把一张完整的图作为另一张图的节点。这个能力适合复杂系统拆分:

flowchart TD
    A[父图: 接收用户任务] --> B[子图: 资料检索]
    B --> C[子图: 数据分析]
    C --> D[子图: 报告生成]
    D --> E[父图: 汇总返回]

当父图和子图有同名 State 字段时,这些字段可以共享。

from typing import TypedDict
from langgraph.graph import StateGraph, START, END

class ParentState(TypedDict):
    messages: list[str]

class SubgraphState(TypedDict):
    messages: list[str]
    sub_result: str

def sub_node(state: SubgraphState) -> dict:
    return {
        "messages": state["messages"] + ["子图处理完成"],
        "sub_result": "子图结果"
    }

sub_builder = StateGraph(SubgraphState)
sub_builder.add_node("sub_node", sub_node)
sub_builder.add_edge(START, "sub_node")
sub_builder.add_edge("sub_node", END)
subgraph = sub_builder.compile()

parent_builder = StateGraph(ParentState)
parent_builder.add_node("subgraph", subgraph)
parent_builder.add_edge(START, "subgraph")
parent_builder.add_edge("subgraph", END)

parent_graph = parent_builder.compile()

如果父图和子图的状态结构不同,可以用代理节点做状态转换:

def call_subgraph(state: ParentState) -> dict:
    sub_input = {
        "messages": state["messages"],
        "sub_result": ""
    }

    sub_output = subgraph.invoke(sub_input)

    return {
        "messages": sub_output["messages"]
    }

parent_builder.add_node("call_subgraph", call_subgraph)

子图的好处是边界清晰。检索、分析、生成、审核都可以做成独立子图,分别测试,再组合到父图里。

集成 MCP:把外部工具接入 Agent

MCP(Model Context Protocol,模型上下文协议)是一套开放协议,用来规范应用如何向大模型提供工具和上下文。LangGraph 可以通过 langchain-mcp-adapters 使用 MCP Server 暴露的工具。

安装依赖:

pip install langchain-mcp-adapters

创建 MCP 工具服务

from datetime import datetime
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("demo-tools")

@mcp.tool()
def get_weather(location: str) -> str:
    """获取指定地点天气"""
    return f"{location}今天晴天"

@mcp.tool()
def get_time() -> str:
    """获取当前时间"""
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

@mcp.tool()
def add(a: int, b: int) -> int:
    """两个整数相加"""
    return a + b

@mcp.tool()
def multiply(a: int, b: int) -> int:
    """两个整数相乘"""
    return a * b

if __name__ == "__main__":
    mcp.run(transport="streamable-http")

在 Agent 中加载 MCP 工具

import asyncio
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
from langchain_mcp_adapters.client import MultiServerMCPClient
from langgraph.prebuilt import create_react_agent

async def build_agent():
    client = MultiServerMCPClient(
        {
            "demo": {
                "url": "http://localhost:8000/mcp",
                "transport": "streamable_http",
            }
        }
    )

    tools = await client.get_tools()

    model = ChatOpenAI(
        model="gpt-4o-mini",
        base_url="https://api.example.com/v1",
        api_key="your-api-key"
    )

    return create_react_agent(model, tools)

async def main():
    agent = await build_agent()

    result = await agent.ainvoke({
        "messages": [
            HumanMessage(content="计算 (15 + 7) * 3,并告诉我现在几点")
        ]
    })

    print(result["messages"][-1].content)

asyncio.run(main())

MCP 的价值在于工具标准化。不同系统只要按 MCP 暴露工具,Agent 侧就可以统一接入,而不需要为每个工具写一套私有适配逻辑。

运行时上下文:传递不属于 State 的信息

State 适合保存工作流状态,但有些信息不应该进入 State,例如:

  • 当前使用的模型供应商。
  • 数据库连接。
  • 租户 ID。
  • 环境配置。
  • 日志追踪 ID。

LangGraph 可以定义运行时上下文:

from dataclasses import dataclass
from langgraph.runtime import Runtime

@dataclass
class ContextSchema:
    llm_provider: str = "openai"
    trace_id: str = ""

def node_a(state: dict, runtime: Runtime[ContextSchema]) -> dict:
    provider = runtime.context.llm_provider
    trace_id = runtime.context.trace_id

    return {
        "provider_used": provider,
        "trace_id": trace_id
    }

builder = StateGraph(dict, context_schema=ContextSchema)
builder.add_node("node_a", node_a)

graph = builder.compile()

graph.invoke(
    {"input": "hello"},
    context={
        "llm_provider": "deepseek",
        "trace_id": "trace-001"
    }
)

简单判断标准:会影响流程恢复和业务结果的数据放进 State;运行依赖、连接对象、追踪信息放进 Context。

为什么需要多智能体

单 Agent 在简单任务里很好用,但复杂系统会遇到三个典型问题:

  1. 工具太多:一个 Agent 挂几十个工具后,模型可能选错工具,或者反复调用同一个工具。
  2. 上下文太长:所有信息都塞进一个 Agent 的消息里,容易超过上下文窗口,也会增加推理干扰。
  3. 职责太杂:同一个 Agent 既做规划、又做检索、又写代码、又审核安全,提示词会越来越难维护。

多智能体系统把任务拆给多个专业 Agent:

flowchart TD
    U[用户任务] --> P[规划 Agent]
    P --> R[检索 Agent]
    P --> C[代码 Agent]
    P --> A[分析 Agent]
    R --> S[汇总 Agent]
    C --> S
    A --> S
    S --> U

拆分后的收益更具体:

收益说明
模块化每个 Agent 独立开发、测试、替换
专业化不同 Agent 使用不同提示词、工具和模型
可控性可以明确限制 Agent 之间如何通信
可观测性更容易定位哪个 Agent 做错了
成本优化简单 Agent 用小模型,关键 Agent 用强模型

常见多智能体架构

多智能体系统常见结构如下:

架构工作方式适合场景代价
Network 网络型每个 Agent 都能和其他 Agent 通信高自由度探索、研究型任务路径难预测,调试复杂
Supervisor 主管型中央主管决定调用哪个 Agent流程明确、需要稳定编排主管容易成为瓶颈
Supervisor as Tools子 Agent 被包装成工具,主管通过工具调用它们接口边界清晰的专业能力调用子 Agent 自主性较弱
Hierarchical 层级型多层 Supervisor 管理多个团队大型复杂任务、企业流程架构设计和状态管理更复杂
Custom 自定义图用 LangGraph 自定义节点和边业务规则强、流程特殊需要更多工程设计

结构示意:

flowchart TD
    subgraph Supervisor 模式
        U1[用户] --> S[主管 Agent]
        S --> A1[航班 Agent]
        S --> A2[酒店 Agent]
        S --> A3[支付 Agent]
    end

    subgraph Swarm 模式
        U2[用户] --> B1[航班 Agent]
        B1 <--> B2[酒店 Agent]
        B2 <--> B3[行程 Agent]
    end

Agent 之间如何通信

多智能体通信主要有两种模式:handoff 和 tool calling。

通信方式含义适合场景
Handoff 交接一个 Agent 把控制权交给另一个 Agent自主协作、流程不固定
Tool Calling 工具调用一个 Agent 把另一个 Agent 当工具调用中央调度、接口明确

共享完整历史,还是只共享结果

Agent 之间传递消息时,还有一个关键选择:传全部推理过程,还是只传最终结果。

策略优点缺点
共享完整历史其他 Agent 能理解完整上下文和中间步骤State 膨胀快,成本高,容易引入噪声
只共享最终结果状态简洁,边界清楚,便于控制其他 Agent 看不到推理依据,协作能力弱
共享结构化摘要兼顾信息量和可控性需要设计摘要格式

实践中更推荐共享结构化摘要,而不是直接把所有中间消息丢给下一个 Agent。

{
    "task": "预订酒店",
    "constraints": {
        "city": "深圳",
        "budget": "500元以内",
        "date": "2026-06-10"
    },
    "previous_result": {
        "flight": "北京到深圳航班已预订"
    }
}

Supervisor:集中调度的多智能体架构

Supervisor 架构里,中央主管 Agent 负责理解用户任务、拆分任务、选择工作 Agent、整合结果。

sequenceDiagram
    participant U as 用户
    participant S as Supervisor
    participant F as 航班 Agent
    participant H as 酒店 Agent

    U->>S: 帮我订机票和酒店
    S->>F: 处理航班预订
    F-->>S: 航班预订成功
    S->>H: 处理酒店预订
    H-->>S: 酒店预订成功
    S-->>U: 汇总预订结果

安装:

pip install langgraph-supervisor

示例:

from langgraph.prebuilt import create_react_agent
from langgraph_supervisor import create_supervisor

def book_flight(from_airport: str, to_airport: str) -> str:
    """预订航班"""
    return f"已预订从 {from_airport} 到 {to_airport} 的航班。"

def book_hotel(hotel_name: str) -> str:
    """预订酒店"""
    return f"已预订 {hotel_name}。"

flight_agent = create_react_agent(
    model=model,
    tools=[book_flight],
    name="flight_agent",
    prompt=(
        "你是航班预订助手,只处理航班预订。"
        "提取出发地和目的地后调用 book_flight。"
        "完成后向主管汇报结果,不要重复调用工具。"
    )
)

hotel_agent = create_react_agent(
    model=model,
    tools=[book_hotel],
    name="hotel_agent",
    prompt=(
        "你是酒店预订助手,只处理酒店预订。"
        "如果用户没有指定酒店,选择经济型酒店。"
        "完成后向主管汇报结果,不要重复调用工具。"
    )
)

workflow = create_supervisor(
    agents=[flight_agent, hotel_agent],
    model=model,
    prompt=(
        "你是任务调度主管。"
        "用户需要航班时调用 flight_agent,用户需要酒店时调用 hotel_agent。"
        "每个任务完成一次即可,不要重复调用。"
        "所有任务完成后汇总结果并结束。"
    )
).compile()

result = workflow.invoke({
    "messages": [
        {
            "role": "user",
            "content": "帮我预订北京到深圳的机票,再订一个酒店"
        }
    ]
})

Supervisor 可以控制工作 Agent 的消息输出模式:

# 保留工作 Agent 的完整消息历史
workflow = create_supervisor(
    agents=[agent1, agent2],
    model=model,
    output_mode="full_history"
)

# 只保留工作 Agent 的最后一条消息
workflow = create_supervisor(
    agents=[agent1, agent2],
    model=model,
    output_mode="last_message"
)

多层 Supervisor 可以组成层级架构:

research_team = create_supervisor(
    agents=[search_agent, math_agent],
    model=model,
    supervisor_name="research_supervisor"
).compile(name="research_team")

writing_team = create_supervisor(
    agents=[outline_agent, editor_agent],
    model=model,
    supervisor_name="writing_supervisor"
).compile(name="writing_team")

top_supervisor = create_supervisor(
    agents=[research_team, writing_team],
    model=model,
    supervisor_name="top_supervisor"
).compile(name="top_supervisor")

给 Supervisor 添加短期记忆和长期记忆:

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.store.memory import InMemoryStore

checkpointer = InMemorySaver()
store = InMemoryStore()

workflow = create_supervisor(
    agents=[flight_agent, hotel_agent],
    model=model
).compile(
    checkpointer=checkpointer,
    store=store
)

Supervisor 适合流程边界比较明确、需要集中管控的任务,比如客服工单流转、企业审批、运维自动化、金融风控分析。

Swarm:去中心化交接的多智能体架构

Swarm 架构没有固定主管。每个 Agent 都可以根据任务状态决定是否把控制权交给另一个 Agent。

flowchart LR
    U[用户] --> F[航班 Agent]
    F -- 需要酒店 --> H[酒店 Agent]
    H -- 需要行程规划 --> T[行程 Agent]
    T -- 需要补充航班 --> F

安装:

pip install langgraph-swarm

示例:

from langgraph.prebuilt import create_react_agent
from langgraph_swarm import create_swarm, create_handoff_tool
from langchain_core.messages import HumanMessage

def book_flight(from_airport: str, to_airport: str) -> str:
    """预订航班"""
    return f"已预订从 {from_airport} 到 {to_airport} 的航班。"

def book_hotel(hotel_name: str) -> str:
    """预订酒店"""
    return f"已预订 {hotel_name}。"

transfer_to_hotel = create_handoff_tool(
    agent_name="hotel_agent",
    description="用户需要预订酒店时,把任务交给酒店 Agent。"
)

transfer_to_flight = create_handoff_tool(
    agent_name="flight_agent",
    description="用户需要预订航班时,把任务交给航班 Agent。"
)

flight_agent = create_react_agent(
    model=model,
    tools=[book_flight, transfer_to_hotel],
    name="flight_agent",
    prompt=(
        "你是航班预订助手。"
        "需要订航班时调用 book_flight。"
        "如果用户还需要酒店,完成航班后调用 transfer_to_hotel。"
    )
)

hotel_agent = create_react_agent(
    model=model,
    tools=[book_hotel, transfer_to_flight],
    name="hotel_agent",
    prompt=(
        "你是酒店预订助手。"
        "需要订酒店时调用 book_hotel。"
        "如果用户还需要航班,调用 transfer_to_flight。"
        "所有需求完成后向用户确认。"
    )
)

swarm = create_swarm(
    agents=[flight_agent, hotel_agent],
    default_active_agent="flight_agent"
).compile()

result = swarm.invoke({
    "messages": [
        HumanMessage(content="帮我预订北京到上海的航班,并预订如家酒店")
    ]
})

Swarm 也支持短期记忆和长期记忆:

checkpointer = InMemorySaver()
store = InMemoryStore()

swarm = create_swarm(
    agents=[flight_agent, hotel_agent],
    default_active_agent="flight_agent"
).compile(
    checkpointer=checkpointer,
    store=store
)

Supervisor 和 Swarm 的选择可以参考这张表:

对比项SupervisorSwarm
控制方式中央主管统一调度Agent 之间自主交接
可预测性更强较弱
灵活性中等更强
调试难度相对低相对高
适合任务企业流程、审批、客服、运维探索型任务、开放协作、复杂咨询
主要风险主管判断错误会影响全局可能来回交接或循环

复杂系统可以混合使用:关键路径用 Supervisor 管控,开放探索环节用 Swarm 协作。

自定义 handoff:控制交接时传什么状态

handoff 的核心是两件事:

  1. 目标 Agent 是谁。
  2. 给目标 Agent 传什么状态。

可以自己实现交接工具,把任务描述和当前 State 转成目标 Agent 的输入。

from typing import Annotated
from langchain_core.tools import tool
from langgraph.graph import MessagesState
from langgraph.prebuilt import InjectedState
from langgraph.types import Command, Send

def create_task_handoff_tool(agent_name: str, description: str):
    tool_name = f"transfer_to_{agent_name}"

    @tool(tool_name, description=description)
    def handoff_tool(
        task_description: Annotated[str, "给下一个 Agent 的任务说明"],
        state: Annotated[MessagesState, InjectedState],
    ) -> Command:
        agent_input = {
            **state,
            "messages": [
                {
                    "role": "user",
                    "content": task_description
                }
            ]
        }

        return Command(
            goto=[Send(agent_name, agent_input)],
            graph=Command.PARENT,
        )

    return handoff_tool

使用自定义交接工具:

transfer_to_hotel = create_task_handoff_tool(
    agent_name="hotel_agent",
    description="把任务交给酒店预订 Agent"
)

transfer_to_flight = create_task_handoff_tool(
    agent_name="flight_agent",
    description="把任务交给航班预订 Agent"
)

自定义 handoff 适合需要强约束输入格式的场景。比如只允许传结构化 JSON,不允许把完整聊天历史交给下一个 Agent。

多智能体系统的稳定性问题

多智能体示例跑通不难,跑稳比较难。常见问题包括:

问题原因处理方式
重复调用工具提示词没有明确终止条件,状态没有记录任务完成在 State 里记录 completed tasks,提示词要求完成后停止
Agent 来回交接handoff 工具描述太宽泛,路由条件不清晰明确每个 Agent 的职责边界
上下文膨胀全量共享历史只共享结构化摘要或最后结果
工具选错工具太多、描述相似拆分 Agent,减少单个 Agent 的工具数量
死循环条件边没有出口,模型反复规划设置 recursion_limit,增加终止节点
恢复后副作用重复interrupt 前执行了写操作副作用独立节点化,或者用幂等键
成本过高每个 Agent 都用大模型按任务难度选择不同模型

提示词也要工程化。不要只写“你是一个很专业的助手”,而要写清楚:

  • 这个 Agent 负责什么。
  • 不负责什么。
  • 什么时候调用工具。
  • 工具最多调用几次。
  • 完成任务后如何汇报。
  • 什么条件下必须结束。
  • 什么条件下移交给其他 Agent。

Java 生态:LangChain4J 与 LangGraph4J

Python 生态适合快速实验,但很多企业后端系统以 Java 为主。Java 项目可以用 LangChain4J 调用模型、工具、记忆和防护机制,再用 LangGraph4J 编排复杂工作流。

Maven 依赖

<!-- LangChain4J 核心包 -->
<dependency>
    <groupId>dev.langchain4j</groupId>
    <artifactId>langchain4j</artifactId>
    <version>1.6.0</version>
</dependency>

<!-- OpenAI 兼容接口 -->
<dependency>
    <groupId>dev.langchain4j</groupId>
    <artifactId>langchain4j-open-ai</artifactId>
    <version>1.2.0</version>
</dependency>

<!-- LangGraph4J -->
<dependency>
    <groupId>org.bsc.langgraph4j</groupId>
    <artifactId>langgraph4j-core</artifactId>
    <version>1.5.2</version>
</dependency>

用 LangChain4J 调用大模型

import dev.langchain4j.model.chat.ChatModel;
import dev.langchain4j.model.openai.OpenAiChatModel;
import dev.langchain4j.data.message.SystemMessage;
import dev.langchain4j.data.message.UserMessage;
import dev.langchain4j.model.chat.response.ChatResponse;
import dev.langchain4j.data.message.AiMessage;

import java.time.Duration;

public class ChatDemo {

    public static void main(String[] args) {
        ChatModel chatModel = OpenAiChatModel.builder()
                .baseUrl("https://api.example.com/v1")
                .apiKey("your-api-key")
                .modelName("gpt-4o-mini")
                .timeout(Duration.ofSeconds(60))
                .maxRetries(3)
                .temperature(0.7)
                .logRequests(true)
                .logResponses(true)
                .build();

        SystemMessage systemMessage = SystemMessage.from(
                "你是 LangChain 和 LangGraph 工程助手。"
        );

        UserMessage userMessage = UserMessage.from(
                "介绍一下 LangGraph 的核心概念。"
        );

        ChatResponse response = chatModel.chat(systemMessage, userMessage);
        AiMessage aiMessage = response.aiMessage();

        System.out.println(aiMessage.text());
    }
}

提示词模板

import dev.langchain4j.model.input.PromptTemplate;

import java.util.Map;

PromptTemplate template = PromptTemplate.from(
        "你是一个{{domain}}领域专家,请回答:{{question}}"
);

String prompt = template.apply(Map.of(
        "domain", "LangGraph",
        "question", "State、Node、Edge 分别是什么?"
)).text();

chatModel.chat(prompt);

AI Service:把模型能力封装成 Java 接口

AI Service 让模型调用看起来像普通 Java Service。

import dev.langchain4j.service.AiServices;
import dev.langchain4j.service.SystemMessage;
import dev.langchain4j.service.UserMessage;
import dev.langchain4j.service.V;

interface GraphAssistant {

    @SystemMessage("你是 LangGraph 专家。")
    @UserMessage("请回答用户问题:{{question}}")
    String answer(@V("question") String question);
}

public class AiServiceDemo {

    public static void main(String[] args) {
        GraphAssistant assistant = AiServices.create(
                GraphAssistant.class,
                chatModel
        );

        String answer = assistant.answer("LangGraph 的 Reducer 有什么用?");
        System.out.println(answer);
    }
}

添加短期记忆

import dev.langchain4j.memory.chat.MessageWindowChatMemory;

GraphAssistant assistant = AiServices.builder(GraphAssistant.class)
        .chatModel(chatModel)
        .chatMemory(MessageWindowChatMemory.withMaxMessages(10))
        .build();

使用工具

import dev.langchain4j.agent.tool.Tool;
import dev.langchain4j.agent.tool.P;

public class StockTools {

    @Tool("查询公司股价")
    public String getStockPrice(@P("公司名称") String company) {
        return company + " 当前股价为 100";
    }
}

StockAssistant assistant = AiServices.builder(StockAssistant.class)
        .chatModel(chatModel)
        .tools(new StockTools())
        .build();

Guardrail:输入输出防护

Guardrail 用来在模型调用前后做安全控制:

类型执行时机典型用途
输入 Guardrail用户输入进入模型前敏感词拦截、提示注入检测、格式校验
输出 Guardrail模型结果返回用户前内容安全、JSON 校验、结果重试、格式修正

示例:

import dev.langchain4j.data.message.UserMessage;
import dev.langchain4j.data.message.AiMessage;
import dev.langchain4j.guardrail.InputGuardrail;
import dev.langchain4j.guardrail.InputGuardrailResult;
import dev.langchain4j.guardrail.OutputGuardrail;
import dev.langchain4j.guardrail.OutputGuardrailResult;

import java.util.Set;

public class SensitiveInputGuardrail implements InputGuardrail {

    private static final Set<String> SENSITIVE_WORDS =
            Set.of("攻击", "作弊", "开挂");

    @Override
    public InputGuardrailResult validate(UserMessage userMessage) {
        String input = userMessage.singleText();

        for (String word : SENSITIVE_WORDS) {
            if (input.contains(word)) {
                return fatal("请求包含违规内容,已拦截。");
            }
        }

        return InputGuardrailResult.success();
    }
}

public class SafetyOutputGuardrail implements OutputGuardrail {

    @Override
    public OutputGuardrailResult validate(AiMessage aiMessage) {
        String output = aiMessage.text();

        if (output.contains("不安全内容")) {
            return retry("请用安全、中立、合规的方式重新回答。");
        }

        return OutputGuardrailResult.success();
    }
}

应用到 AI Service:

import dev.langchain4j.service.guardrail.InputGuardrails;
import dev.langchain4j.service.guardrail.OutputGuardrails;

@InputGuardrails(SensitiveInputGuardrail.class)
@OutputGuardrails(SafetyOutputGuardrail.class)
interface SafeAssistant {
    String chat(String userMessage);
}

多模态调用

如果模型支持图像输入,可以构造包含文本和图片的消息。

import dev.langchain4j.data.message.UserMessage;
import dev.langchain4j.data.message.TextContent;
import dev.langchain4j.data.message.ImageContent;

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Base64;

byte[] imageBytes = Files.readAllBytes(Paths.get("demo.png"));
String base64 = Base64.getEncoder().encodeToString(imageBytes);

UserMessage message = UserMessage.from(
        TextContent.from("描述图片内容"),
        ImageContent.from(base64, "image/png")
);

chatModel.chat(message);

用 LangGraph4J 构建工作流

LangGraph4J 的概念和 Python 版一致:State、Node、Edge、条件边、检查点、人机协作,只是 API 写法换成 Java。

创建 Node、Edge、State

import org.bsc.langgraph4j.StateGraph;
import org.bsc.langgraph4j.CompiledGraph;
import org.bsc.langgraph4j.GraphStateException;
import org.bsc.langgraph4j.action.AsyncNodeAction;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class GraphDemo {

    public static void main(String[] args) throws GraphStateException {
        StateGraph<AgentState> graph = new StateGraph<>(AgentState::new);

        graph.addNode("input_node", AsyncNodeAction.node_async(state -> {
            System.out.println("[input_node] state = " + state.data());
            return Map.of("input_checked", true);
        }));

        graph.addNode("process_node", AsyncNodeAction.node_async(state -> {
            System.out.println("[process_node] state = " + state.data());
            return Map.of("processed", true);
        }));

        graph.addEdge(StateGraph.START, "input_node");
        graph.addEdge("input_node", "process_node");
        graph.addEdge("process_node", StateGraph.END);

        CompiledGraph<AgentState> compiled = graph.compile();

        Map<String, Object> input = new HashMap<>();
        input.put("user_input", "hello");

        Optional<AgentState> result = compiled.invoke(input);
        result.ifPresent(state -> System.out.println(state.data()));
    }
}

Channels:Java 版状态合并策略

LangGraph4J 通过 Channels 定义字段合并规则,作用类似 Python 版 Reducer。

import org.bsc.langgraph4j.state.Channel;
import org.bsc.langgraph4j.state.Channels;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Map;

Map<String, Channel<?>> channels = new LinkedHashMap<>();

// 列表追加
channels.put("messages", Channels.appender(ArrayList::new));

// 数字累加
channels.put("counter", Channels.base(Integer::sum, () -> 0));

// 保留最大值
channels.put("max_score", Channels.base(Math::max, () -> 0));

StateGraph<AgentState> graph = new StateGraph<>(channels, AgentState::new);

条件边

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

Map<String, String> mapping = new HashMap<>();
mapping.put("pass", "pass_handler");
mapping.put("fail", "fail_handler");

graph.addConditionalEdges(
        "score_node",
        state -> {
            int score = (Integer) state.value("score").orElse(0);
            String route = score >= 90 ? "pass" : "fail";
            return CompletableFuture.completedFuture(route);
        },
        mapping
);

检查点

import org.bsc.langgraph4j.checkpoint.MemorySaver;
import org.bsc.langgraph4j.CompileConfig;

MemorySaver checkpoint = new MemorySaver();

CompileConfig config = CompileConfig.builder()
        .checkpointSaver(checkpoint)
        .build();

CompiledGraph<AgentState> compiled = graph.compile(config);
compiled.invoke(new HashMap<>());

Java 版人机协作

LangGraph4J 可以在指定节点前中断,等待人工输入后继续执行。

import org.bsc.langgraph4j.RunnableConfig;

MemorySaver checkpointer = new MemorySaver();

StateGraph<AgentState> graph = new StateGraph<>(AgentState::new);

graph.addNode("receive_question", AsyncNodeAction.node_async(state -> {
    return Map.of(
            "status", "received",
            "timestamp", System.currentTimeMillis()
    );
}));

graph.addNode("ai_answer", AsyncNodeAction.node_async(state -> {
    return Map.of(
            "status", "ai_answered",
            "ai_response", "需要人工处理",
            "confidence", 40
    );
}));

graph.addNode("human_agent", AsyncNodeAction.node_async(state -> {
    return Map.of(
            "human_response", "人工回复内容",
            "handled_by", "human",
            "status", "human_handled"
    );
}));

graph.addNode("complete", AsyncNodeAction.node_async(state -> {
    return Map.of(
            "status", "completed",
            "completion_time", System.currentTimeMillis()
    );
}));

graph.addEdge(StateGraph.START, "receive_question");
graph.addEdge("receive_question", "ai_answer");

graph.addConditionalEdges(
        "ai_answer",
        state -> {
            int confidence = (Integer) state.value("confidence").orElse(0);
            return CompletableFuture.completedFuture(
                    confidence < 60 ? "human" : "complete"
            );
        },
        Map.of(
                "human", "human_agent",
                "complete", "complete"
        )
);

graph.addEdge("human_agent", "complete");
graph.addEdge("complete", StateGraph.END);

CompileConfig config = CompileConfig.builder()
        .checkpointSaver(checkpointer)
        .interruptBefore("human_agent")
        .build();

CompiledGraph<AgentState> compiled = graph.compile(config);

RunnableConfig runnableConfig = RunnableConfig.builder()
        .threadId("thread-001")
        .build();

// 第一次执行,在 human_agent 前中断
compiled.invoke(
        Map.of("user_question", "如何退款?"),
        runnableConfig
);

// 人工输入后更新状态
Map<String, Object> humanInput = new HashMap<>();
humanInput.put("human_response", "已确认退款条件,允许继续处理。");

RunnableConfig updatedConfig = compiled.updateState(
        runnableConfig,
        humanInput
);

// 从中断位置继续执行
compiled.invoke(null, updatedConfig);

落地 LangGraph 时需要注意的坑

1. State 不要无限膨胀

把所有消息、所有工具结果、所有中间推理都塞进 State,会让上下文越来越大。更好的方式是分层保存:

数据建议
当前任务必要信息放 State
长期用户画像放 Store
调试日志放日志系统
大文件、大文档放对象存储,State 里只放引用
中间推理视情况保留摘要

2. 每个节点要有明确输入输出

节点之间最好传结构化数据,而不是让后续节点从一大段自然语言里猜。

{
    "intent": "book_flight",
    "slots": {
        "from": "北京",
        "to": "深圳",
        "date": "2026-06-10"
    },
    "missing_slots": []
}

结构化状态更容易测试,也更容易做条件边。

3. 高风险工具必须加人工确认

删除数据、执行命令、发起支付、发送邮件、修改配置这类操作,不应该完全交给模型自动执行。比较稳妥的流程是:

flowchart LR
    A[模型生成操作计划] --> B[展示给人工审核]
    B --> C{是否通过}
    C -- 通过 --> D[执行工具]
    C -- 拒绝 --> E[返回修改意见]

4. 对循环设置出口

Agent 循环必须有终止条件,比如:

  • 工具调用次数达到上限。
  • 已经得到最终答案。
  • 用户要求已全部完成。
  • 置信度低时转人工。
  • 超过递归限制直接失败。

5. 多智能体不要一开始就拆太细

Agent 拆分过细会引入额外通信成本,也会增加路由错误概率。比较稳妥的做法是按职责边界拆:

  • 一个 Agent 对应一类工具。
  • 一个 Agent 对应一个业务阶段。
  • 一个 Agent 对应一种专业能力。
  • 不要为了“多智能体”而拆 Agent。

6. 提示词要写终止规则

多智能体系统里,提示词要明确“什么时候停”。例如:

完成工具调用并得到成功结果后,立即向上级 Agent 汇报,不要再次调用同一个工具。
如果用户需求已经全部满足,输出最终总结并结束。
如果缺少必要参数,只询问缺失参数,不要猜测。

7. 生产环境要接入可观测性

至少记录这些信息:

信息用途
thread_id定位一次会话或任务
节点执行顺序排查路由问题
State 快照调试决策过程
工具入参和出参排查工具调用错误
模型输入输出分析提示词和模型行为
中断和恢复记录审计人工参与过程
token 和耗时成本与性能优化

适合从哪里开始用 LangGraph

如果只是做一个简单问答机器人,直接用普通 LangChain Chain 或 ReAct Agent 就够了。出现以下需求时,LangGraph 会更合适:

需求是否适合 LangGraph
固定一次模型调用不一定需要
简单工具调用 Agent可以用预构建 ReAct Agent
多步骤任务编排适合
条件分支和循环适合
需要断点恢复适合
需要人工审批适合
多 Agent 协作适合
需要审计执行过程适合

LangGraph 的核心不是“让 Agent 更神秘”,而是把 Agent 的执行过程工程化:用图表达流程,用 State 管理上下文,用 Reducer 控制状态合并,用 Checkpoint 保证可恢复,用 HIL 控制风险,用多智能体架构拆分复杂职责。对于复杂 LLM 应用来说,这些能力比单纯调用一个更强的模型更关键。


评论