单个 Agent(智能体)适合处理边界清晰、步骤不多的任务,比如查询天气、总结一段文本、调用一个接口完成简单操作。一旦任务变成“先理解需求,再检索资料,再调用多个工具,再根据结果继续决策,必要时还要让人确认”,线性链式调用就会开始变得笨重。
LangGraph 解决的正是这类问题。它把 LLM(Large Language Model,大语言模型)应用的执行过程抽象成一张有向图:节点负责处理任务,边负责决定执行顺序,状态在节点之间流转。这样一来,Agent 不再只是“一步接一步”的链,而可以拥有分支、循环、并行、持久化、人工中断、恢复执行等能力。
一个典型 LangGraph 工作流可以理解成下面这张图:
flowchart LR
START((START)) --> A[解析用户请求]
A --> B{需要调用工具?}
B -- 是 --> C[工具节点]
C --> D[模型总结]
B -- 否 --> D
D --> E{需要人工确认?}
E -- 是 --> F[人工审核]
F --> G[生成最终结果]
E -- 否 --> G
G --> END((END))
LangGraph 适合解决什么问题
传统 LangChain 的 Chain 更像流水线,适合固定步骤:
flowchart LR
A[输入] --> B[Prompt]
B --> C[LLM]
C --> D[解析输出]
但真实业务里的 Agent 往往不是固定流水线,而是会反复判断:
- 当前信息是否足够?
- 是否需要调用工具?
- 工具失败后是否需要重试?
- 是否需要把任务交给另一个 Agent?
- 是否需要人工审批?
- 是否需要从某个历史状态重新执行?
LangGraph 的图结构更适合表达这种非线性流程:
flowchart TD
A[用户请求] --> B[Planner 规划]
B --> C{任务类型}
C -- 查询类 --> D[检索 Agent]
C -- 计算类 --> E[计算 Agent]
C -- 操作类 --> F[执行 Agent]
D --> G[汇总结果]
E --> G
F --> H{高风险操作?}
H -- 是 --> I[人工确认]
I --> G
H -- 否 --> G
G --> J[返回用户]
LangGraph 的核心价值主要体现在几件事上:
| 能力 | 解决的问题 |
|---|---|
| 图结构编排 | 把复杂任务拆成节点,用边表达执行关系 |
| 状态管理 | 所有节点共享同一份 State,避免状态散落在各处 |
| 条件路由 | 根据当前状态决定后续执行哪个节点 |
| 循环执行 | Agent 可以反复思考、调用工具、修正答案 |
| 并行分支 | 多个子任务可以同时执行,再汇总结果 |
| 检查点 | 每一步保存状态,支持恢复、审计和调试 |
| 人机协作 | 在关键节点暂停,等待人工输入后继续 |
| 多智能体 | 多个专业 Agent 可以协作完成复杂任务 |
快速创建一个 ReAct Agent
安装 LangGraph:
pip install -U langgraph
LangGraph 提供了开箱即用的 create_react_agent,可以快速创建一个 ReAct 风格的 Agent。ReAct 指的是 Reasoning + Acting,也就是模型先推理,再决定是否调用工具,然后继续推理。
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
from langgraph.prebuilt import create_react_agent
def get_weather(city: str) -> str:
"""获取指定城市的天气信息"""
return f"{city}今天是晴天,气温 25 摄氏度。"
model = ChatOpenAI(
model="gpt-4o-mini",
base_url="https://api.example.com/v1",
api_key="your-api-key",
)
agent = create_react_agent(
model=model,
tools=[get_weather],
prompt="你是一个天气助手,需要在回答天气问题时调用工具。"
)
result = agent.invoke({
"messages": [
HumanMessage(content="深圳今天的天气怎么样?")
]
})
print(result["messages"][-1].content)
Agent 支持同步和异步两种调用方式:
| 模式 | 方法 |
|---|---|
| 同步一次性返回 | invoke() |
| 同步流式返回 | stream() |
| 异步一次性返回 | ainvoke() |
| 异步流式返回 | astream() |
为了防止 Agent 在“思考—调用工具—再思考”的循环里跑不出来,可以设置递归限制:
result = agent.invoke(
{
"messages": [
{"role": "user", "content": "帮我预订深圳到北京的机票"}
]
},
config={
"recursion_limit": 10
}
)
recursion_limit 控制一次图执行最多走多少步。超过限制后,LangGraph 会抛出 GraphRecursionError,这对生产环境很重要,因为模型偶尔会因为提示词、工具结果或状态设计问题进入循环。
LangGraph 的三个核心元素:State、Node、Edge
LangGraph 工作流由三类核心元素组成:
flowchart LR
S[State 状态] <--> N1[Node 节点 A]
S <--> N2[Node 节点 B]
N1 -- Edge 边 --> N2
- State:贯穿整个工作流的共享状态。
- Node:实际执行逻辑的处理单元。
- Edge:定义节点之间如何流转。
State:工作流的共享数据
State 是整张图运行过程中的状态快照。它可以保存用户消息、工具结果、检索文档、当前步骤、业务变量等数据。
在 Python 里,State 常用 TypedDict 或 Pydantic BaseModel 定义:
from typing import TypedDict
from langgraph.graph import StateGraph
class GraphState(TypedDict):
user_input: str
answer: str
tool_result: dict
builder = StateGraph(GraphState)
节点接收 State,并返回要更新的字段:
def parse_input(state: GraphState) -> dict:
text = state["user_input"]
return {
"answer": f"收到用户输入:{text}"
}
注意,节点通常不应该把数据存在自己的全局变量里。更稳妥的方式是让所有可追踪、可恢复、可调试的数据都进入 State。
Node:图中的执行步骤
Node 是一个函数,也可以是一个 Agent、一个工具调用、一个子图,或者任意业务逻辑。
def input_node(state: GraphState) -> dict:
return {
"tool_result": {
"input_checked": True
}
}
def output_node(state: GraphState) -> dict:
return {
"answer": "处理完成"
}
builder.add_node("input", input_node)
builder.add_node("output", output_node)
如果节点需要额外参数,可以用 functools.partial 绑定:
from functools import partial
def process_node(state: GraphState, retry_count: int, mode: str) -> dict:
return {
"tool_result": {
"mode": mode,
"retry_count": retry_count
}
}
process_with_config = partial(process_node, retry_count=3, mode="strict")
builder.add_node("process", process_with_config)
节点设计建议:
| 原则 | 说明 |
|---|---|
| 单一职责 | 一个节点只做一类事情,比如检索、计算、审核、总结 |
| 无内部状态 | 节点不要依赖隐藏的全局变量,状态应来自 State 或运行时上下文 |
| 可重试 | 同样输入尽量产生同样输出,失败后重跑不会破坏业务数据 |
| 可测试 | 节点最好能脱离整张图单独单元测试 |
| 副作用隔离 | 写数据库、发消息、扣款等操作要特别处理幂等性 |
Edge:控制执行顺序
Edge 定义节点之间的连接关系。
LangGraph 有两个特殊节点:
START:图的入口。END:图的终点。
from langgraph.graph import START, END
builder.add_edge(START, "input")
builder.add_edge("input", "process")
builder.add_edge("process", "output")
builder.add_edge("output", END)
graph = builder.compile()
result = graph.invoke({
"user_input": "你好",
"answer": "",
"tool_result": {}
})
完整执行关系如下:
flowchart LR
START((START)) --> A[input]
A --> B[process]
B --> C[output]
C --> END((END))
compile() 会检查图结构是否合法,比如边指向了不存在的节点,就会在编译阶段报错。
节点重试与缓存
生产环境里的节点经常会调用外部服务,比如搜索、数据库、模型接口、第三方 API。这些调用可能因为网络抖动或限流失败,因此需要重试。
from langgraph.types import RetryPolicy
from requests import RequestException
from timeout_decorator import TimeoutError
retry_policy = RetryPolicy(
max_attempts=3,
initial_interval=1,
backoff_factor=2,
jitter=True,
retry_on=[RequestException, TimeoutError],
)
builder.add_node("call_api", call_api_node, retry=retry_policy)
参数含义:
| 参数 | 作用 |
|---|---|
max_attempts | 最大尝试次数 |
initial_interval | 第一次重试等待时间 |
backoff_factor | 每次重试等待时间的增长倍数 |
jitter | 加入随机抖动,避免大量请求同时重试 |
retry_on | 只对指定异常类型重试 |
对于耗时但结果短时间内稳定的节点,可以使用缓存:
import time
from typing import TypedDict
from langgraph.graph import StateGraph
from langgraph.cache.memory import InMemoryCache
from langgraph.types import CachePolicy
class CacheState(TypedDict):
x: int
result: int
def expensive_node(state: CacheState) -> dict:
time.sleep(2)
return {
"result": state["x"] * 2
}
builder = StateGraph(CacheState)
builder.add_node(
"expensive",
expensive_node,
cache_policy=CachePolicy(ttl=60)
)
builder.set_entry_point("expensive")
builder.set_finish_point("expensive")
graph = builder.compile(cache=InMemoryCache())
缓存适合:
| 场景 | 建议 |
|---|---|
| 静态知识查询 | 可以设置较长 TTL |
| 天气、库存、价格 | TTL 要短,避免返回旧数据 |
| 有副作用操作 | 不要随便缓存,比如支付、下单、发邮件 |
| 依赖用户权限的数据 | 缓存键必须包含用户身份或权限信息 |
State 如何合并:Reducer
多个节点都可能更新 State。同一个字段被更新时,LangGraph 需要知道是覆盖、追加,还是用自定义逻辑合并。这个逻辑叫 Reducer。
默认策略:覆盖
如果字段没有指定 Reducer,后写入的值会覆盖旧值:
from typing import TypedDict
class OverrideState(TypedDict):
data: dict
比如节点 A 返回:
{"data": {"step": "A"}}
节点 B 返回:
{"data": {"step": "B"}}
最终 data 会变成:
{"step": "B"}
使用 Annotated 指定合并策略
可以用 typing.Annotated 为字段指定合并函数。常见做法是用 operator.add 实现数字相加、列表追加、字符串拼接。
from typing import TypedDict, Annotated
from operator import add
class AddState(TypedDict):
count: Annotated[int, add]
logs: Annotated[list[str], add]
text: Annotated[str, add]
def node_a(state: AddState) -> dict:
return {
"count": 1,
"logs": ["node_a"],
"text": "hello "
}
def node_b(state: AddState) -> dict:
return {
"count": 2,
"logs": ["node_b"],
"text": "world"
}
执行后:
{
"count": 3,
"logs": ["node_a", "node_b"],
"text": "hello world"
}
add_messages:对话消息专用合并器
对话系统常用 add_messages 管理消息列表。它不是简单追加,而是会根据消息 ID 做合并:
- 新消息 ID 不存在时追加。
- 新消息 ID 已存在时覆盖旧消息。
- 字符串可以自动转换成消息对象。
- 适合多轮对话、工具调用结果、流式消息更新。
from typing import TypedDict, Annotated
from langgraph.graph.message import add_messages
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage
class MessageState(TypedDict):
messages: Annotated[list, add_messages]
def system_node(state: MessageState) -> dict:
return {
"messages": [
SystemMessage(content="你是 LangGraph 工程助手。")
]
}
def user_node(state: MessageState) -> dict:
return {
"messages": [
HumanMessage(content="LangGraph 的 State 是什么?")
]
}
def answer_node(state: MessageState) -> dict:
return {
"messages": [
AIMessage(content="State 是图执行过程中的共享状态。")
]
}
自定义 Reducer
业务状态经常需要自定义合并逻辑,比如字典合并、保留最大分数、按时间排序、去重追加。
from typing import TypedDict, Annotated
def merge_dict(old: dict, new: dict) -> dict:
merged = old.copy()
merged.update(new)
return merged
def keep_max(old: int, new: int) -> int:
return max(old, new)
class CustomState(TypedDict):
metadata: Annotated[dict, merge_dict]
max_score: Annotated[int, keep_max]
Reducer 的选择会直接影响图的行为。并行节点同时写同一字段时,Reducer 尤其重要,否则状态很容易被意外覆盖。
条件边:根据状态动态选择路径
固定边只能表达确定流程。真实 Agent 经常需要根据状态选择不同节点,比如:
- 用户意图是查订单还是退货?
- 模型置信度是否足够?
- 工具调用是否失败?
- 是否需要人工审核?
条件边通过路由函数实现。
from typing import Literal
from langgraph.graph import StateGraph, START, END
class RouteState(TypedDict):
sentiment: str
result: str
def route_by_sentiment(state: RouteState) -> Literal["positive", "negative", "neutral"]:
return state["sentiment"]
def positive_node(state: RouteState) -> dict:
return {"result": "用户情绪积极"}
def negative_node(state: RouteState) -> dict:
return {"result": "用户情绪消极,需要优先处理"}
def neutral_node(state: RouteState) -> dict:
return {"result": "用户情绪中性"}
builder = StateGraph(RouteState)
builder.add_node("positive_handler", positive_node)
builder.add_node("negative_handler", negative_node)
builder.add_node("neutral_handler", neutral_node)
builder.add_conditional_edges(
START,
route_by_sentiment,
{
"positive": "positive_handler",
"negative": "negative_handler",
"neutral": "neutral_handler",
}
)
builder.add_edge("positive_handler", END)
builder.add_edge("negative_handler", END)
builder.add_edge("neutral_handler", END)
graph = builder.compile()
执行结构如下:
flowchart TD
START((START)) --> R{route_by_sentiment}
R -- positive --> A[positive_handler]
R -- negative --> B[negative_handler]
R -- neutral --> C[neutral_handler]
A --> END((END))
B --> END
C --> END
LangGraph 支持把图导出成 Mermaid 图片,便于检查实际执行结构:
png_data = graph.get_graph().draw_mermaid_png()
with open("graph.png", "wb") as f:
f.write(png_data)
Send:动态并行分发任务
Send 用来动态创建多个执行分支,适合 Map-Reduce 场景。例如用户要求总结 10 篇文档,可以为每篇文档创建一个并行任务,再把结果合并。
flowchart TD
A[生成任务列表] --> B{Send 多个任务}
B --> C1[处理任务 1]
B --> C2[处理任务 2]
B --> C3[处理任务 3]
C1 --> D[汇总结果]
C2 --> D
C3 --> D
示例代码:
from typing import TypedDict, Annotated
from operator import add
from langgraph.types import Send
from langgraph.graph import StateGraph, START, END
class MapReduceState(TypedDict):
tasks: list[str]
results: Annotated[list[str], add]
final_answer: str
def generate_tasks(state: MapReduceState) -> dict:
return {
"tasks": ["文档A", "文档B", "文档C"]
}
def route_tasks(state: MapReduceState) -> list[Send]:
return [
Send("process_task", {"tasks": [task], "results": [], "final_answer": ""})
for task in state["tasks"]
]
def process_task(state: MapReduceState) -> dict:
task = state["tasks"][0]
return {
"results": [f"{task} 的摘要"]
}
def reduce_results(state: MapReduceState) -> dict:
return {
"final_answer": "\n".join(state["results"])
}
builder = StateGraph(MapReduceState)
builder.add_node("generate_tasks", generate_tasks)
builder.add_node("process_task", process_task)
builder.add_node("reduce_results", reduce_results)
builder.add_edge(START, "generate_tasks")
builder.add_conditional_edges("generate_tasks", route_tasks)
builder.add_edge("process_task", "reduce_results")
builder.add_edge("reduce_results", END)
graph = builder.compile()
这里 results 使用 Annotated[list[str], add],因为多个并行分支都会写入结果列表。如果没有合并器,后完成的分支可能覆盖先完成的结果。
Command:同时更新状态和控制路由
条件边只负责决定去哪个节点。Command 更进一步,它可以在节点返回时同时做两件事:
- 更新 State。
- 指定后续跳转节点。
适合以下场景:
- Agent 判断自己无法处理,要移交给专家 Agent。
- 人工输入恢复后,需要跳转到指定节点。
- 子图需要跳回父图。
- 节点内部根据执行结果决定后续路径,同时写入状态。
from typing import TypedDict
from langgraph.types import Command
class AgentState(TypedDict):
messages: list
need_expert: bool
route_reason: str
def triage_agent(state: AgentState) -> Command:
if state["need_expert"]:
return Command(
goto="expert_agent",
update={
"route_reason": "当前问题需要专家 Agent 处理"
}
)
return Command(
goto="normal_agent",
update={
"route_reason": "普通 Agent 可以处理"
}
)
Send 和 Command 的差异可以这样理解:
| 机制 | 主要用途 | 是否更新状态 | 是否适合并行 |
|---|---|---|---|
| 条件边 | 根据条件选择路径 | 否 | 一般不用于动态并行 |
| Send | 动态创建多个分支 | 给目标节点传入局部状态 | 适合 |
| Command | 状态更新 + 路由控制 | 是 | 可以结合 Send 使用 |
检查点:让工作流可以恢复
Agent 工作流可能执行很多步,中间还可能调用外部工具。如果进程重启、服务异常或需要人工确认,不能让整个任务从头再跑。
LangGraph 的检查点机制会在每个步骤后保存状态快照。快照里包含:
- 当前 State 的值。
- 后续要执行的节点。
- 当前配置。
- 元数据。
- 待执行任务。
- 中断信息。
- 错误信息。
sequenceDiagram
participant App as 应用
participant Graph as LangGraph
participant Store as Checkpointer
App->>Graph: invoke(input, thread_id)
Graph->>Graph: 执行节点 A
Graph->>Store: 保存检查点 A
Graph->>Graph: 执行节点 B
Graph->>Store: 保存检查点 B
Graph-->>App: 返回结果或中断信息
内存检查点示例:
from langgraph.checkpoint.memory import MemorySaver
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)
config = {
"configurable": {
"thread_id": "user-session-001"
}
}
result = graph.invoke(
{"value": 5, "operations": []},
config=config
)
thread_id 是 LangGraph 用来区分会话的 ID,不是操作系统线程 ID。同一个用户的连续对话、同一个长任务的多次恢复,都应该使用同一个 thread_id。
获取历史检查点:
history = list(graph.get_state_history(config))
for index, snapshot in enumerate(history):
print(index, snapshot.next, snapshot.values)
常见 Checkpointer:
| 存储 | 适合场景 |
|---|---|
| 内存 | 本地测试、单进程 Demo |
| SQLite/PostgreSQL | 需要持久化、可审计的业务任务 |
| Redis | 高并发、短期会话、恢复速度要求高 |
| 自定义存储 | 企业内部已有状态存储系统 |
时间旅行:从历史状态重新执行
启用检查点后,LangGraph 可以回到某个历史状态,检查当时的 State,甚至修改状态后从那里重新执行。这种能力通常叫时间旅行。
适用场景:
| 场景 | 用法 |
|---|---|
| 调试 | 回到出错前的状态,重跑后续节点 |
| 审计 | 查看某一步模型看到了什么状态 |
| 分支探索 | 修改历史状态,比较不同执行路径 |
| 人工修正 | 人发现中间结果错了,改完继续跑 |
示例:
checkpoints = list(graph.get_state_history(config))
# 注意:历史列表通常按时间倒序返回,index 0 往往是最新状态
target = checkpoints[2]
graph.update_state(
target.config,
{
"operations": ["人工修正后的操作记录"]
}
)
result = graph.invoke(None, config=target.config)
时间旅行很适合调试 Agent,因为 Agent 的错误很多不是代码异常,而是“某一步模型做了错误判断”。如果没有完整状态记录,只看最终结果很难定位问题。
人机协作:在关键节点暂停
HIL(Human-in-the-Loop,人机协作)是在自动化流程中加入人工确认。常见场景包括:
- 执行命令前让用户确认。
- 下单、付款、删除数据前人工审批。
- AI 置信度低时转人工。
- 生成代码补丁后等待开发者接受。
- 合规审核、风控审核、内容审核。
LangGraph 通过 interrupt() 暂停工作流,再通过 Command(resume=...) 恢复。
from typing import TypedDict, Annotated
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import MemorySaver
class ReviewState(TypedDict):
request: str
analysis: str
approved: bool
human_feedback: str
def analyze_node(state: ReviewState) -> dict:
return {
"analysis": f"对请求进行分析:{state['request']}"
}
def human_review_node(state: ReviewState) -> dict:
review_payload = {
"type": "human_review",
"request": state["request"],
"analysis": state["analysis"],
"prompt": "请输入 approve 或 reject,并给出原因"
}
human_input = interrupt(review_payload)
return {
"approved": human_input["decision"] == "approve",
"human_feedback": human_input["feedback"]
}
def route_after_review(state: ReviewState) -> str:
return "execute" if state["approved"] else "reject"
恢复执行:
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)
config = {
"configurable": {
"thread_id": "review-task-001"
}
}
# 第一次执行,到 human_review_node 会暂停
result = graph.invoke(
{
"request": "删除生产环境中的过期数据",
"analysis": "",
"approved": False,
"human_feedback": ""
},
config=config
)
# 人工输入来自 Web 页面、后台系统或命令行
resume_cmd = Command(
resume={
"decision": "approve",
"feedback": "确认只删除 30 天前的过期数据"
}
)
final_result = graph.invoke(resume_cmd, config=config)
人机协作的执行过程:
sequenceDiagram
participant User as 用户/审核人
participant App as 业务系统
participant Graph as LangGraph
participant CP as Checkpointer
App->>Graph: invoke(input)
Graph->>Graph: 执行 analyze
Graph->>CP: 保存状态
Graph->>Graph: 执行 human_review
Graph-->>App: 返回 interrupt 信息
App-->>User: 展示待审核内容
User-->>App: 提交审核意见
App->>Graph: invoke(Command(resume=...), same thread_id)
Graph->>Graph: 从 interrupt 位置恢复
Graph->>Graph: 执行后续节点
Graph-->>App: 返回最终结果
一个很容易踩的坑:interrupt() 不是阻塞等待。它会让当前 invoke() 返回中断信息。恢复时,节点函数会从调用 interrupt() 的位置重新进入,因此 interrupt() 之前的代码可能被重复执行。
错误写法:
def review_node(state):
# 恢复执行时可能重复写数据库
db.insert_audit_log(state["request"])
human_input = interrupt({"prompt": "是否通过?"})
return {"approved": human_input["approved"]}
更稳妥的写法是把副作用操作放到独立节点,或者加入幂等键:
def review_node(state):
human_input = interrupt({"prompt": "是否通过?"})
return {"approved": human_input["approved"]}
def write_audit_node(state):
# 使用 request_id 做幂等控制
db.upsert_audit_log(
request_id=state["request_id"],
approved=state["approved"]
)
return {}
记忆:短期记忆和长期记忆
Agent 的记忆可以分成两类:
| 类型 | 生命周期 | 实现方式 | 用途 |
|---|---|---|---|
| 短期记忆 | 单个会话内 | State + Checkpoint | 多轮对话上下文、当前任务状态 |
| 长期记忆 | 跨会话 | Store | 用户偏好、历史资料、长期画像 |
短期记忆
短期记忆依赖 thread_id。同一个 thread_id 下,图可以从之前的消息和状态继续执行。
from langgraph.checkpoint.memory import InMemorySaver
checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)
config = {
"configurable": {
"thread_id": "chat-user-001"
}
}
graph.invoke(
{
"messages": [
{"role": "user", "content": "我叫张三"}
]
},
config=config
)
graph.invoke(
{
"messages": [
{"role": "user", "content": "我叫什么?"}
]
},
config=config
)
长期记忆
长期记忆通过 Store 实现。它类似键值数据库,也可以结合向量检索做语义搜索。
from langgraph.store.memory import InMemoryStore
from langgraph.config import get_store
namespace = ("users", "profile")
def write_profile(state: dict) -> dict:
store = get_store()
store.put(
namespace,
"user_123",
{
"name": "张三",
"city": "深圳",
"preference": "喜欢简洁回答"
}
)
return {}
def read_profile(state: dict) -> dict:
store = get_store()
profile = store.get(namespace, "user_123")
search_result = store.search(namespace, query="深圳", limit=10)
return {
"profile": profile.value if profile else None,
"search_result": [item.value for item in search_result]
}
store = InMemoryStore()
graph = builder.compile(store=store)
长期记忆要谨慎设计命名空间,避免不同用户或不同业务的数据互相污染。
子图:把复杂工作流拆成可复用模块
LangGraph 支持把一张完整的图作为另一张图的节点。这个能力适合复杂系统拆分:
flowchart TD
A[父图: 接收用户任务] --> B[子图: 资料检索]
B --> C[子图: 数据分析]
C --> D[子图: 报告生成]
D --> E[父图: 汇总返回]
当父图和子图有同名 State 字段时,这些字段可以共享。
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
class ParentState(TypedDict):
messages: list[str]
class SubgraphState(TypedDict):
messages: list[str]
sub_result: str
def sub_node(state: SubgraphState) -> dict:
return {
"messages": state["messages"] + ["子图处理完成"],
"sub_result": "子图结果"
}
sub_builder = StateGraph(SubgraphState)
sub_builder.add_node("sub_node", sub_node)
sub_builder.add_edge(START, "sub_node")
sub_builder.add_edge("sub_node", END)
subgraph = sub_builder.compile()
parent_builder = StateGraph(ParentState)
parent_builder.add_node("subgraph", subgraph)
parent_builder.add_edge(START, "subgraph")
parent_builder.add_edge("subgraph", END)
parent_graph = parent_builder.compile()
如果父图和子图的状态结构不同,可以用代理节点做状态转换:
def call_subgraph(state: ParentState) -> dict:
sub_input = {
"messages": state["messages"],
"sub_result": ""
}
sub_output = subgraph.invoke(sub_input)
return {
"messages": sub_output["messages"]
}
parent_builder.add_node("call_subgraph", call_subgraph)
子图的好处是边界清晰。检索、分析、生成、审核都可以做成独立子图,分别测试,再组合到父图里。
集成 MCP:把外部工具接入 Agent
MCP(Model Context Protocol,模型上下文协议)是一套开放协议,用来规范应用如何向大模型提供工具和上下文。LangGraph 可以通过 langchain-mcp-adapters 使用 MCP Server 暴露的工具。
安装依赖:
pip install langchain-mcp-adapters
创建 MCP 工具服务
from datetime import datetime
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("demo-tools")
@mcp.tool()
def get_weather(location: str) -> str:
"""获取指定地点天气"""
return f"{location}今天晴天"
@mcp.tool()
def get_time() -> str:
"""获取当前时间"""
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
@mcp.tool()
def add(a: int, b: int) -> int:
"""两个整数相加"""
return a + b
@mcp.tool()
def multiply(a: int, b: int) -> int:
"""两个整数相乘"""
return a * b
if __name__ == "__main__":
mcp.run(transport="streamable-http")
在 Agent 中加载 MCP 工具
import asyncio
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
from langchain_mcp_adapters.client import MultiServerMCPClient
from langgraph.prebuilt import create_react_agent
async def build_agent():
client = MultiServerMCPClient(
{
"demo": {
"url": "http://localhost:8000/mcp",
"transport": "streamable_http",
}
}
)
tools = await client.get_tools()
model = ChatOpenAI(
model="gpt-4o-mini",
base_url="https://api.example.com/v1",
api_key="your-api-key"
)
return create_react_agent(model, tools)
async def main():
agent = await build_agent()
result = await agent.ainvoke({
"messages": [
HumanMessage(content="计算 (15 + 7) * 3,并告诉我现在几点")
]
})
print(result["messages"][-1].content)
asyncio.run(main())
MCP 的价值在于工具标准化。不同系统只要按 MCP 暴露工具,Agent 侧就可以统一接入,而不需要为每个工具写一套私有适配逻辑。
运行时上下文:传递不属于 State 的信息
State 适合保存工作流状态,但有些信息不应该进入 State,例如:
- 当前使用的模型供应商。
- 数据库连接。
- 租户 ID。
- 环境配置。
- 日志追踪 ID。
LangGraph 可以定义运行时上下文:
from dataclasses import dataclass
from langgraph.runtime import Runtime
@dataclass
class ContextSchema:
llm_provider: str = "openai"
trace_id: str = ""
def node_a(state: dict, runtime: Runtime[ContextSchema]) -> dict:
provider = runtime.context.llm_provider
trace_id = runtime.context.trace_id
return {
"provider_used": provider,
"trace_id": trace_id
}
builder = StateGraph(dict, context_schema=ContextSchema)
builder.add_node("node_a", node_a)
graph = builder.compile()
graph.invoke(
{"input": "hello"},
context={
"llm_provider": "deepseek",
"trace_id": "trace-001"
}
)
简单判断标准:会影响流程恢复和业务结果的数据放进 State;运行依赖、连接对象、追踪信息放进 Context。
为什么需要多智能体
单 Agent 在简单任务里很好用,但复杂系统会遇到三个典型问题:
- 工具太多:一个 Agent 挂几十个工具后,模型可能选错工具,或者反复调用同一个工具。
- 上下文太长:所有信息都塞进一个 Agent 的消息里,容易超过上下文窗口,也会增加推理干扰。
- 职责太杂:同一个 Agent 既做规划、又做检索、又写代码、又审核安全,提示词会越来越难维护。
多智能体系统把任务拆给多个专业 Agent:
flowchart TD
U[用户任务] --> P[规划 Agent]
P --> R[检索 Agent]
P --> C[代码 Agent]
P --> A[分析 Agent]
R --> S[汇总 Agent]
C --> S
A --> S
S --> U
拆分后的收益更具体:
| 收益 | 说明 |
|---|---|
| 模块化 | 每个 Agent 独立开发、测试、替换 |
| 专业化 | 不同 Agent 使用不同提示词、工具和模型 |
| 可控性 | 可以明确限制 Agent 之间如何通信 |
| 可观测性 | 更容易定位哪个 Agent 做错了 |
| 成本优化 | 简单 Agent 用小模型,关键 Agent 用强模型 |
常见多智能体架构
多智能体系统常见结构如下:
| 架构 | 工作方式 | 适合场景 | 代价 |
|---|---|---|---|
| Network 网络型 | 每个 Agent 都能和其他 Agent 通信 | 高自由度探索、研究型任务 | 路径难预测,调试复杂 |
| Supervisor 主管型 | 中央主管决定调用哪个 Agent | 流程明确、需要稳定编排 | 主管容易成为瓶颈 |
| Supervisor as Tools | 子 Agent 被包装成工具,主管通过工具调用它们 | 接口边界清晰的专业能力调用 | 子 Agent 自主性较弱 |
| Hierarchical 层级型 | 多层 Supervisor 管理多个团队 | 大型复杂任务、企业流程 | 架构设计和状态管理更复杂 |
| Custom 自定义图 | 用 LangGraph 自定义节点和边 | 业务规则强、流程特殊 | 需要更多工程设计 |
结构示意:
flowchart TD
subgraph Supervisor 模式
U1[用户] --> S[主管 Agent]
S --> A1[航班 Agent]
S --> A2[酒店 Agent]
S --> A3[支付 Agent]
end
subgraph Swarm 模式
U2[用户] --> B1[航班 Agent]
B1 <--> B2[酒店 Agent]
B2 <--> B3[行程 Agent]
end
Agent 之间如何通信
多智能体通信主要有两种模式:handoff 和 tool calling。
| 通信方式 | 含义 | 适合场景 |
|---|---|---|
| Handoff 交接 | 一个 Agent 把控制权交给另一个 Agent | 自主协作、流程不固定 |
| Tool Calling 工具调用 | 一个 Agent 把另一个 Agent 当工具调用 | 中央调度、接口明确 |
共享完整历史,还是只共享结果
Agent 之间传递消息时,还有一个关键选择:传全部推理过程,还是只传最终结果。
| 策略 | 优点 | 缺点 |
|---|---|---|
| 共享完整历史 | 其他 Agent 能理解完整上下文和中间步骤 | State 膨胀快,成本高,容易引入噪声 |
| 只共享最终结果 | 状态简洁,边界清楚,便于控制 | 其他 Agent 看不到推理依据,协作能力弱 |
| 共享结构化摘要 | 兼顾信息量和可控性 | 需要设计摘要格式 |
实践中更推荐共享结构化摘要,而不是直接把所有中间消息丢给下一个 Agent。
{
"task": "预订酒店",
"constraints": {
"city": "深圳",
"budget": "500元以内",
"date": "2026-06-10"
},
"previous_result": {
"flight": "北京到深圳航班已预订"
}
}
Supervisor:集中调度的多智能体架构
Supervisor 架构里,中央主管 Agent 负责理解用户任务、拆分任务、选择工作 Agent、整合结果。
sequenceDiagram
participant U as 用户
participant S as Supervisor
participant F as 航班 Agent
participant H as 酒店 Agent
U->>S: 帮我订机票和酒店
S->>F: 处理航班预订
F-->>S: 航班预订成功
S->>H: 处理酒店预订
H-->>S: 酒店预订成功
S-->>U: 汇总预订结果
安装:
pip install langgraph-supervisor
示例:
from langgraph.prebuilt import create_react_agent
from langgraph_supervisor import create_supervisor
def book_flight(from_airport: str, to_airport: str) -> str:
"""预订航班"""
return f"已预订从 {from_airport} 到 {to_airport} 的航班。"
def book_hotel(hotel_name: str) -> str:
"""预订酒店"""
return f"已预订 {hotel_name}。"
flight_agent = create_react_agent(
model=model,
tools=[book_flight],
name="flight_agent",
prompt=(
"你是航班预订助手,只处理航班预订。"
"提取出发地和目的地后调用 book_flight。"
"完成后向主管汇报结果,不要重复调用工具。"
)
)
hotel_agent = create_react_agent(
model=model,
tools=[book_hotel],
name="hotel_agent",
prompt=(
"你是酒店预订助手,只处理酒店预订。"
"如果用户没有指定酒店,选择经济型酒店。"
"完成后向主管汇报结果,不要重复调用工具。"
)
)
workflow = create_supervisor(
agents=[flight_agent, hotel_agent],
model=model,
prompt=(
"你是任务调度主管。"
"用户需要航班时调用 flight_agent,用户需要酒店时调用 hotel_agent。"
"每个任务完成一次即可,不要重复调用。"
"所有任务完成后汇总结果并结束。"
)
).compile()
result = workflow.invoke({
"messages": [
{
"role": "user",
"content": "帮我预订北京到深圳的机票,再订一个酒店"
}
]
})
Supervisor 可以控制工作 Agent 的消息输出模式:
# 保留工作 Agent 的完整消息历史
workflow = create_supervisor(
agents=[agent1, agent2],
model=model,
output_mode="full_history"
)
# 只保留工作 Agent 的最后一条消息
workflow = create_supervisor(
agents=[agent1, agent2],
model=model,
output_mode="last_message"
)
多层 Supervisor 可以组成层级架构:
research_team = create_supervisor(
agents=[search_agent, math_agent],
model=model,
supervisor_name="research_supervisor"
).compile(name="research_team")
writing_team = create_supervisor(
agents=[outline_agent, editor_agent],
model=model,
supervisor_name="writing_supervisor"
).compile(name="writing_team")
top_supervisor = create_supervisor(
agents=[research_team, writing_team],
model=model,
supervisor_name="top_supervisor"
).compile(name="top_supervisor")
给 Supervisor 添加短期记忆和长期记忆:
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.store.memory import InMemoryStore
checkpointer = InMemorySaver()
store = InMemoryStore()
workflow = create_supervisor(
agents=[flight_agent, hotel_agent],
model=model
).compile(
checkpointer=checkpointer,
store=store
)
Supervisor 适合流程边界比较明确、需要集中管控的任务,比如客服工单流转、企业审批、运维自动化、金融风控分析。
Swarm:去中心化交接的多智能体架构
Swarm 架构没有固定主管。每个 Agent 都可以根据任务状态决定是否把控制权交给另一个 Agent。
flowchart LR
U[用户] --> F[航班 Agent]
F -- 需要酒店 --> H[酒店 Agent]
H -- 需要行程规划 --> T[行程 Agent]
T -- 需要补充航班 --> F
安装:
pip install langgraph-swarm
示例:
from langgraph.prebuilt import create_react_agent
from langgraph_swarm import create_swarm, create_handoff_tool
from langchain_core.messages import HumanMessage
def book_flight(from_airport: str, to_airport: str) -> str:
"""预订航班"""
return f"已预订从 {from_airport} 到 {to_airport} 的航班。"
def book_hotel(hotel_name: str) -> str:
"""预订酒店"""
return f"已预订 {hotel_name}。"
transfer_to_hotel = create_handoff_tool(
agent_name="hotel_agent",
description="用户需要预订酒店时,把任务交给酒店 Agent。"
)
transfer_to_flight = create_handoff_tool(
agent_name="flight_agent",
description="用户需要预订航班时,把任务交给航班 Agent。"
)
flight_agent = create_react_agent(
model=model,
tools=[book_flight, transfer_to_hotel],
name="flight_agent",
prompt=(
"你是航班预订助手。"
"需要订航班时调用 book_flight。"
"如果用户还需要酒店,完成航班后调用 transfer_to_hotel。"
)
)
hotel_agent = create_react_agent(
model=model,
tools=[book_hotel, transfer_to_flight],
name="hotel_agent",
prompt=(
"你是酒店预订助手。"
"需要订酒店时调用 book_hotel。"
"如果用户还需要航班,调用 transfer_to_flight。"
"所有需求完成后向用户确认。"
)
)
swarm = create_swarm(
agents=[flight_agent, hotel_agent],
default_active_agent="flight_agent"
).compile()
result = swarm.invoke({
"messages": [
HumanMessage(content="帮我预订北京到上海的航班,并预订如家酒店")
]
})
Swarm 也支持短期记忆和长期记忆:
checkpointer = InMemorySaver()
store = InMemoryStore()
swarm = create_swarm(
agents=[flight_agent, hotel_agent],
default_active_agent="flight_agent"
).compile(
checkpointer=checkpointer,
store=store
)
Supervisor 和 Swarm 的选择可以参考这张表:
| 对比项 | Supervisor | Swarm |
|---|---|---|
| 控制方式 | 中央主管统一调度 | Agent 之间自主交接 |
| 可预测性 | 更强 | 较弱 |
| 灵活性 | 中等 | 更强 |
| 调试难度 | 相对低 | 相对高 |
| 适合任务 | 企业流程、审批、客服、运维 | 探索型任务、开放协作、复杂咨询 |
| 主要风险 | 主管判断错误会影响全局 | 可能来回交接或循环 |
复杂系统可以混合使用:关键路径用 Supervisor 管控,开放探索环节用 Swarm 协作。
自定义 handoff:控制交接时传什么状态
handoff 的核心是两件事:
- 目标 Agent 是谁。
- 给目标 Agent 传什么状态。
可以自己实现交接工具,把任务描述和当前 State 转成目标 Agent 的输入。
from typing import Annotated
from langchain_core.tools import tool
from langgraph.graph import MessagesState
from langgraph.prebuilt import InjectedState
from langgraph.types import Command, Send
def create_task_handoff_tool(agent_name: str, description: str):
tool_name = f"transfer_to_{agent_name}"
@tool(tool_name, description=description)
def handoff_tool(
task_description: Annotated[str, "给下一个 Agent 的任务说明"],
state: Annotated[MessagesState, InjectedState],
) -> Command:
agent_input = {
**state,
"messages": [
{
"role": "user",
"content": task_description
}
]
}
return Command(
goto=[Send(agent_name, agent_input)],
graph=Command.PARENT,
)
return handoff_tool
使用自定义交接工具:
transfer_to_hotel = create_task_handoff_tool(
agent_name="hotel_agent",
description="把任务交给酒店预订 Agent"
)
transfer_to_flight = create_task_handoff_tool(
agent_name="flight_agent",
description="把任务交给航班预订 Agent"
)
自定义 handoff 适合需要强约束输入格式的场景。比如只允许传结构化 JSON,不允许把完整聊天历史交给下一个 Agent。
多智能体系统的稳定性问题
多智能体示例跑通不难,跑稳比较难。常见问题包括:
| 问题 | 原因 | 处理方式 |
|---|---|---|
| 重复调用工具 | 提示词没有明确终止条件,状态没有记录任务完成 | 在 State 里记录 completed tasks,提示词要求完成后停止 |
| Agent 来回交接 | handoff 工具描述太宽泛,路由条件不清晰 | 明确每个 Agent 的职责边界 |
| 上下文膨胀 | 全量共享历史 | 只共享结构化摘要或最后结果 |
| 工具选错 | 工具太多、描述相似 | 拆分 Agent,减少单个 Agent 的工具数量 |
| 死循环 | 条件边没有出口,模型反复规划 | 设置 recursion_limit,增加终止节点 |
| 恢复后副作用重复 | interrupt 前执行了写操作 | 副作用独立节点化,或者用幂等键 |
| 成本过高 | 每个 Agent 都用大模型 | 按任务难度选择不同模型 |
提示词也要工程化。不要只写“你是一个很专业的助手”,而要写清楚:
- 这个 Agent 负责什么。
- 不负责什么。
- 什么时候调用工具。
- 工具最多调用几次。
- 完成任务后如何汇报。
- 什么条件下必须结束。
- 什么条件下移交给其他 Agent。
Java 生态:LangChain4J 与 LangGraph4J
Python 生态适合快速实验,但很多企业后端系统以 Java 为主。Java 项目可以用 LangChain4J 调用模型、工具、记忆和防护机制,再用 LangGraph4J 编排复杂工作流。
Maven 依赖
<!-- LangChain4J 核心包 -->
<dependency>
<groupId>dev.langchain4j</groupId>
<artifactId>langchain4j</artifactId>
<version>1.6.0</version>
</dependency>
<!-- OpenAI 兼容接口 -->
<dependency>
<groupId>dev.langchain4j</groupId>
<artifactId>langchain4j-open-ai</artifactId>
<version>1.2.0</version>
</dependency>
<!-- LangGraph4J -->
<dependency>
<groupId>org.bsc.langgraph4j</groupId>
<artifactId>langgraph4j-core</artifactId>
<version>1.5.2</version>
</dependency>
用 LangChain4J 调用大模型
import dev.langchain4j.model.chat.ChatModel;
import dev.langchain4j.model.openai.OpenAiChatModel;
import dev.langchain4j.data.message.SystemMessage;
import dev.langchain4j.data.message.UserMessage;
import dev.langchain4j.model.chat.response.ChatResponse;
import dev.langchain4j.data.message.AiMessage;
import java.time.Duration;
public class ChatDemo {
public static void main(String[] args) {
ChatModel chatModel = OpenAiChatModel.builder()
.baseUrl("https://api.example.com/v1")
.apiKey("your-api-key")
.modelName("gpt-4o-mini")
.timeout(Duration.ofSeconds(60))
.maxRetries(3)
.temperature(0.7)
.logRequests(true)
.logResponses(true)
.build();
SystemMessage systemMessage = SystemMessage.from(
"你是 LangChain 和 LangGraph 工程助手。"
);
UserMessage userMessage = UserMessage.from(
"介绍一下 LangGraph 的核心概念。"
);
ChatResponse response = chatModel.chat(systemMessage, userMessage);
AiMessage aiMessage = response.aiMessage();
System.out.println(aiMessage.text());
}
}
提示词模板
import dev.langchain4j.model.input.PromptTemplate;
import java.util.Map;
PromptTemplate template = PromptTemplate.from(
"你是一个{{domain}}领域专家,请回答:{{question}}"
);
String prompt = template.apply(Map.of(
"domain", "LangGraph",
"question", "State、Node、Edge 分别是什么?"
)).text();
chatModel.chat(prompt);
AI Service:把模型能力封装成 Java 接口
AI Service 让模型调用看起来像普通 Java Service。
import dev.langchain4j.service.AiServices;
import dev.langchain4j.service.SystemMessage;
import dev.langchain4j.service.UserMessage;
import dev.langchain4j.service.V;
interface GraphAssistant {
@SystemMessage("你是 LangGraph 专家。")
@UserMessage("请回答用户问题:{{question}}")
String answer(@V("question") String question);
}
public class AiServiceDemo {
public static void main(String[] args) {
GraphAssistant assistant = AiServices.create(
GraphAssistant.class,
chatModel
);
String answer = assistant.answer("LangGraph 的 Reducer 有什么用?");
System.out.println(answer);
}
}
添加短期记忆
import dev.langchain4j.memory.chat.MessageWindowChatMemory;
GraphAssistant assistant = AiServices.builder(GraphAssistant.class)
.chatModel(chatModel)
.chatMemory(MessageWindowChatMemory.withMaxMessages(10))
.build();
使用工具
import dev.langchain4j.agent.tool.Tool;
import dev.langchain4j.agent.tool.P;
public class StockTools {
@Tool("查询公司股价")
public String getStockPrice(@P("公司名称") String company) {
return company + " 当前股价为 100";
}
}
StockAssistant assistant = AiServices.builder(StockAssistant.class)
.chatModel(chatModel)
.tools(new StockTools())
.build();
Guardrail:输入输出防护
Guardrail 用来在模型调用前后做安全控制:
| 类型 | 执行时机 | 典型用途 |
|---|---|---|
| 输入 Guardrail | 用户输入进入模型前 | 敏感词拦截、提示注入检测、格式校验 |
| 输出 Guardrail | 模型结果返回用户前 | 内容安全、JSON 校验、结果重试、格式修正 |
示例:
import dev.langchain4j.data.message.UserMessage;
import dev.langchain4j.data.message.AiMessage;
import dev.langchain4j.guardrail.InputGuardrail;
import dev.langchain4j.guardrail.InputGuardrailResult;
import dev.langchain4j.guardrail.OutputGuardrail;
import dev.langchain4j.guardrail.OutputGuardrailResult;
import java.util.Set;
public class SensitiveInputGuardrail implements InputGuardrail {
private static final Set<String> SENSITIVE_WORDS =
Set.of("攻击", "作弊", "开挂");
@Override
public InputGuardrailResult validate(UserMessage userMessage) {
String input = userMessage.singleText();
for (String word : SENSITIVE_WORDS) {
if (input.contains(word)) {
return fatal("请求包含违规内容,已拦截。");
}
}
return InputGuardrailResult.success();
}
}
public class SafetyOutputGuardrail implements OutputGuardrail {
@Override
public OutputGuardrailResult validate(AiMessage aiMessage) {
String output = aiMessage.text();
if (output.contains("不安全内容")) {
return retry("请用安全、中立、合规的方式重新回答。");
}
return OutputGuardrailResult.success();
}
}
应用到 AI Service:
import dev.langchain4j.service.guardrail.InputGuardrails;
import dev.langchain4j.service.guardrail.OutputGuardrails;
@InputGuardrails(SensitiveInputGuardrail.class)
@OutputGuardrails(SafetyOutputGuardrail.class)
interface SafeAssistant {
String chat(String userMessage);
}
多模态调用
如果模型支持图像输入,可以构造包含文本和图片的消息。
import dev.langchain4j.data.message.UserMessage;
import dev.langchain4j.data.message.TextContent;
import dev.langchain4j.data.message.ImageContent;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Base64;
byte[] imageBytes = Files.readAllBytes(Paths.get("demo.png"));
String base64 = Base64.getEncoder().encodeToString(imageBytes);
UserMessage message = UserMessage.from(
TextContent.from("描述图片内容"),
ImageContent.from(base64, "image/png")
);
chatModel.chat(message);
用 LangGraph4J 构建工作流
LangGraph4J 的概念和 Python 版一致:State、Node、Edge、条件边、检查点、人机协作,只是 API 写法换成 Java。
创建 Node、Edge、State
import org.bsc.langgraph4j.StateGraph;
import org.bsc.langgraph4j.CompiledGraph;
import org.bsc.langgraph4j.GraphStateException;
import org.bsc.langgraph4j.action.AsyncNodeAction;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
public class GraphDemo {
public static void main(String[] args) throws GraphStateException {
StateGraph<AgentState> graph = new StateGraph<>(AgentState::new);
graph.addNode("input_node", AsyncNodeAction.node_async(state -> {
System.out.println("[input_node] state = " + state.data());
return Map.of("input_checked", true);
}));
graph.addNode("process_node", AsyncNodeAction.node_async(state -> {
System.out.println("[process_node] state = " + state.data());
return Map.of("processed", true);
}));
graph.addEdge(StateGraph.START, "input_node");
graph.addEdge("input_node", "process_node");
graph.addEdge("process_node", StateGraph.END);
CompiledGraph<AgentState> compiled = graph.compile();
Map<String, Object> input = new HashMap<>();
input.put("user_input", "hello");
Optional<AgentState> result = compiled.invoke(input);
result.ifPresent(state -> System.out.println(state.data()));
}
}
Channels:Java 版状态合并策略
LangGraph4J 通过 Channels 定义字段合并规则,作用类似 Python 版 Reducer。
import org.bsc.langgraph4j.state.Channel;
import org.bsc.langgraph4j.state.Channels;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Map;
Map<String, Channel<?>> channels = new LinkedHashMap<>();
// 列表追加
channels.put("messages", Channels.appender(ArrayList::new));
// 数字累加
channels.put("counter", Channels.base(Integer::sum, () -> 0));
// 保留最大值
channels.put("max_score", Channels.base(Math::max, () -> 0));
StateGraph<AgentState> graph = new StateGraph<>(channels, AgentState::new);
条件边
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Map<String, String> mapping = new HashMap<>();
mapping.put("pass", "pass_handler");
mapping.put("fail", "fail_handler");
graph.addConditionalEdges(
"score_node",
state -> {
int score = (Integer) state.value("score").orElse(0);
String route = score >= 90 ? "pass" : "fail";
return CompletableFuture.completedFuture(route);
},
mapping
);
检查点
import org.bsc.langgraph4j.checkpoint.MemorySaver;
import org.bsc.langgraph4j.CompileConfig;
MemorySaver checkpoint = new MemorySaver();
CompileConfig config = CompileConfig.builder()
.checkpointSaver(checkpoint)
.build();
CompiledGraph<AgentState> compiled = graph.compile(config);
compiled.invoke(new HashMap<>());
Java 版人机协作
LangGraph4J 可以在指定节点前中断,等待人工输入后继续执行。
import org.bsc.langgraph4j.RunnableConfig;
MemorySaver checkpointer = new MemorySaver();
StateGraph<AgentState> graph = new StateGraph<>(AgentState::new);
graph.addNode("receive_question", AsyncNodeAction.node_async(state -> {
return Map.of(
"status", "received",
"timestamp", System.currentTimeMillis()
);
}));
graph.addNode("ai_answer", AsyncNodeAction.node_async(state -> {
return Map.of(
"status", "ai_answered",
"ai_response", "需要人工处理",
"confidence", 40
);
}));
graph.addNode("human_agent", AsyncNodeAction.node_async(state -> {
return Map.of(
"human_response", "人工回复内容",
"handled_by", "human",
"status", "human_handled"
);
}));
graph.addNode("complete", AsyncNodeAction.node_async(state -> {
return Map.of(
"status", "completed",
"completion_time", System.currentTimeMillis()
);
}));
graph.addEdge(StateGraph.START, "receive_question");
graph.addEdge("receive_question", "ai_answer");
graph.addConditionalEdges(
"ai_answer",
state -> {
int confidence = (Integer) state.value("confidence").orElse(0);
return CompletableFuture.completedFuture(
confidence < 60 ? "human" : "complete"
);
},
Map.of(
"human", "human_agent",
"complete", "complete"
)
);
graph.addEdge("human_agent", "complete");
graph.addEdge("complete", StateGraph.END);
CompileConfig config = CompileConfig.builder()
.checkpointSaver(checkpointer)
.interruptBefore("human_agent")
.build();
CompiledGraph<AgentState> compiled = graph.compile(config);
RunnableConfig runnableConfig = RunnableConfig.builder()
.threadId("thread-001")
.build();
// 第一次执行,在 human_agent 前中断
compiled.invoke(
Map.of("user_question", "如何退款?"),
runnableConfig
);
// 人工输入后更新状态
Map<String, Object> humanInput = new HashMap<>();
humanInput.put("human_response", "已确认退款条件,允许继续处理。");
RunnableConfig updatedConfig = compiled.updateState(
runnableConfig,
humanInput
);
// 从中断位置继续执行
compiled.invoke(null, updatedConfig);
落地 LangGraph 时需要注意的坑
1. State 不要无限膨胀
把所有消息、所有工具结果、所有中间推理都塞进 State,会让上下文越来越大。更好的方式是分层保存:
| 数据 | 建议 |
|---|---|
| 当前任务必要信息 | 放 State |
| 长期用户画像 | 放 Store |
| 调试日志 | 放日志系统 |
| 大文件、大文档 | 放对象存储,State 里只放引用 |
| 中间推理 | 视情况保留摘要 |
2. 每个节点要有明确输入输出
节点之间最好传结构化数据,而不是让后续节点从一大段自然语言里猜。
{
"intent": "book_flight",
"slots": {
"from": "北京",
"to": "深圳",
"date": "2026-06-10"
},
"missing_slots": []
}
结构化状态更容易测试,也更容易做条件边。
3. 高风险工具必须加人工确认
删除数据、执行命令、发起支付、发送邮件、修改配置这类操作,不应该完全交给模型自动执行。比较稳妥的流程是:
flowchart LR
A[模型生成操作计划] --> B[展示给人工审核]
B --> C{是否通过}
C -- 通过 --> D[执行工具]
C -- 拒绝 --> E[返回修改意见]
4. 对循环设置出口
Agent 循环必须有终止条件,比如:
- 工具调用次数达到上限。
- 已经得到最终答案。
- 用户要求已全部完成。
- 置信度低时转人工。
- 超过递归限制直接失败。
5. 多智能体不要一开始就拆太细
Agent 拆分过细会引入额外通信成本,也会增加路由错误概率。比较稳妥的做法是按职责边界拆:
- 一个 Agent 对应一类工具。
- 一个 Agent 对应一个业务阶段。
- 一个 Agent 对应一种专业能力。
- 不要为了“多智能体”而拆 Agent。
6. 提示词要写终止规则
多智能体系统里,提示词要明确“什么时候停”。例如:
完成工具调用并得到成功结果后,立即向上级 Agent 汇报,不要再次调用同一个工具。
如果用户需求已经全部满足,输出最终总结并结束。
如果缺少必要参数,只询问缺失参数,不要猜测。
7. 生产环境要接入可观测性
至少记录这些信息:
| 信息 | 用途 |
|---|---|
| thread_id | 定位一次会话或任务 |
| 节点执行顺序 | 排查路由问题 |
| State 快照 | 调试决策过程 |
| 工具入参和出参 | 排查工具调用错误 |
| 模型输入输出 | 分析提示词和模型行为 |
| 中断和恢复记录 | 审计人工参与过程 |
| token 和耗时 | 成本与性能优化 |
适合从哪里开始用 LangGraph
如果只是做一个简单问答机器人,直接用普通 LangChain Chain 或 ReAct Agent 就够了。出现以下需求时,LangGraph 会更合适:
| 需求 | 是否适合 LangGraph |
|---|---|
| 固定一次模型调用 | 不一定需要 |
| 简单工具调用 Agent | 可以用预构建 ReAct Agent |
| 多步骤任务编排 | 适合 |
| 条件分支和循环 | 适合 |
| 需要断点恢复 | 适合 |
| 需要人工审批 | 适合 |
| 多 Agent 协作 | 适合 |
| 需要审计执行过程 | 适合 |
LangGraph 的核心不是“让 Agent 更神秘”,而是把 Agent 的执行过程工程化:用图表达流程,用 State 管理上下文,用 Reducer 控制状态合并,用 Checkpoint 保证可恢复,用 HIL 控制风险,用多智能体架构拆分复杂职责。对于复杂 LLM 应用来说,这些能力比单纯调用一个更强的模型更关键。