芥末
发布于 2025-09-23 / 0 阅读
0
0

用 LangGraph 搭建 Claude Code 风格的代码智能体:ReAct、SubAgent、Todo 与上下文压缩

Claude Code 是 Anthropic 推出的 CLI(Command Line Interface,命令行接口)代码智能体。它能读文件、改代码、执行命令、拆解任务,还会在危险操作前向用户确认。这样的工具看起来复杂,但核心并不是某个神秘的大模型技巧,而是一套 Agent 工程设计:状态管理、工具调用、人工介入、子任务隔离、任务列表、上下文压缩和流式交互。

LangGraph 很适合复刻这类代码智能体。它不像高层 Agent 框架那样把流程封装得很死,而是把智能体拆成几个可控的概念:

概念作用
State保存对话消息、任务列表、压缩记录等运行状态
Node图中的执行节点,例如调用 LLM、执行工具、人工确认、上下文压缩
Edge节点之间的跳转关系,可以是固定跳转,也可以按状态动态路由
Tool暴露给 LLM(Large Language Model,大语言模型)的能力,例如读文件、写文件、执行命令、创建 SubAgent
Checkpointer保存图运行过程中的检查点,用于中断恢复和多轮会话

一个 Claude Code 风格的智能体,可以按下面这条路线逐步搭起来:

flowchart TD
    A[基础 ReAct Agent] --> B[人工确认与检查点恢复]
    B --> C[TaskTool 创建 SubAgent]
    C --> D[TodoRead / TodoWrite 任务跟踪]
    D --> E[上下文压缩]
    E --> F[流式输出与实时中断恢复]

从最小 ReAct Agent 开始

ReAct Agent 的基本循环很简单:模型先思考并决定是否调用工具;如果要调用工具,就执行工具并把结果放回消息列表;模型看到工具结果后继续生成,直到不再需要工具。

flowchart LR
    START([START]) --> LLM[调用 LLM]
    LLM -->|有 tool_calls| TOOLS[执行工具]
    TOOLS --> LLM
    LLM -->|无 tool_calls| END([END])

用 LangGraphJS 写出来,大致是这样的:

import { Annotation, END, START, StateGraph } from "@langchain/langgraph";
import { ToolNode } from "@langchain/langgraph/prebuilt";
import type { BaseMessage } from "@langchain/core/messages";

const agentState = Annotation.Root({
  messages: Annotation<BaseMessage[]>({
    reducer: safeMessagesStateReducer,
    default: () => [],
  }),
});

const toolNode = new ToolNode(tools);

const shouldContinue = (state: typeof agentState.State) => {
  const { messages } = state;
  const lastMessage = messages[messages.length - 1];

  if (
    "tool_calls" in lastMessage &&
    Array.isArray(lastMessage.tool_calls) &&
    lastMessage.tool_calls.length > 0
  ) {
    return "tools";
  }

  return END;
};

const callModel = async (state: typeof agentState.State) => {
  const response = await modelWithTools.invoke(state.messages);

  return {
    messages: [response],
  };
};

const workflow = new StateGraph(agentState)
  .addNode("llm", callModel)
  .addNode("tools", toolNode)
  .addEdge(START, "llm")
  .addConditionalEdges("llm", shouldContinue, ["tools", END])
  .addEdge("tools", "llm");

const agent = workflow.compile();

这里已经出现了 LangGraph 最重要的三个东西:

  • Statemessages 是整个 Agent 的上下文。
  • Nodellmtools 都是图节点。
  • Edge:模型输出决定下一步进入工具节点还是结束。

如果需要把运行过程实时推给前端,可以使用 streamEvents

const config = {
  configurable: {
    thread_id: sessionId,
  },
  streamMode: ["values", "messages", "updates"],
  version: "v2" as const,
};

const stream = agent.streamEvents(
  {
    messages: [inputMessage],
  },
  config,
);

for await (const event of stream) {
  // 根据 event 类型推送给前端
}

这个基础版本已经能完成工具调用,但离代码智能体还差几个关键能力:权限确认、长任务拆解、任务跟踪、上下文管理和可中断交互。

人工确认:让危险操作进入 Human-in-the-loop

代码智能体经常需要读写文件、执行命令。读文件通常风险较低,写文件和执行 shell 命令风险更高,因此在真正执行工具之前加入人工确认是必要的。

典型流程是:

flowchart LR
    START([START]) --> AGENT[Agent 判断下一步]
    AGENT -->|无需工具| END([END])
    AGENT -->|需要工具| REVIEW[人工确认]
    REVIEW -->|同意| TOOLS[执行工具]
    REVIEW -->|拒绝或修改| AGENT
    TOOLS --> AGENT

固定人工确认节点

可以在图里加入一个 human_review 节点,让所有需要审查的工具调用都先经过它。

import { Command, interrupt } from "@langchain/langgraph";
import { HumanMessage } from "@langchain/core/messages";

const humanReviewNode = async (state: typeof agentState.State) => {
  const lastMessage = state.messages[state.messages.length - 1];

  const answer = interrupt({
    question: "确认是否执行该工具调用?",
    toolCall: lastMessage,
    options: ["approve", "reject", "modify"],
  });

  if (answer.type === "approve") {
    return new Command({
      goto: "tools",
      update: {
        messages: [new HumanMessage("用户已确认执行该工具调用。")],
      },
    });
  }

  if (answer.type === "reject") {
    return new Command({
      goto: "llm",
      update: {
        messages: [
          new HumanMessage(
            `用户拒绝执行该工具调用,原因:${answer.reason ?? "未提供"}`,
          ),
        ],
      },
    });
  }

  return new Command({
    goto: "llm",
    update: {
      messages: [
        new HumanMessage(`用户修改了要求:${answer.content}`),
      ],
    },
  });
};

图结构可以改成:

const workflowWithReview = new StateGraph(agentState)
  .addNode("llm", callModel)
  .addNode("human_review", humanReviewNode)
  .addNode("tools", toolNode)
  .addEdge(START, "llm")
  .addConditionalEdges("llm", shouldContinueWithReview, [
    "human_review",
    END,
  ])
  .addEdge("human_review", "tools")
  .addEdge("tools", "llm");

这里有两个 LangGraph API 很关键。

interrupt 用于暂停图运行。它会让当前执行停在这个节点,等待外部输入。暂停时需要配合检查点,否则服务重启或者请求打到另一台机器后,状态就丢了。

Command 用于在节点内部动态指定跳转目标。普通 edge 是预先定义好的固定边,而 Command 可以根据用户输入、模型输出或者业务状态决定下一步去哪。

让模型主动请求人工协助

人工确认不一定只用于工具权限。模型在任务不清楚、需求有歧义、需要用户补充信息时,也应该能主动提问。做法是把“询问用户”伪装成一个工具交给模型。

import { tool } from "@langchain/core/tools";
import { ToolMessage } from "@langchain/core/messages";
import { getCurrentTaskInput, Command } from "@langchain/langgraph";
import { z } from "zod";

function createAskHumanTool() {
  const executor = async (
    args: { question: string },
    config: any,
  ) => {
    const state = getCurrentTaskInput() as typeof agentState.State;

    const toolResult = new ToolMessage({
      content: `已发起人工询问:${args.question}`,
      name: "askHuman",
      tool_call_id: config.toolCall.id,
    });

    return new Command({
      graph: Command.PARENT,
      goto: "askHuman",
      update: {
        messages: state.messages.concat(toolResult),
      },
    });
  };

  return tool(executor, {
    name: "askHuman",
    description:
      "当需求不明确、需要用户确认方案、需要用户补充信息时,调用该工具向用户提问。",
    schema: z.object({
      question: z.string().describe("需要询问用户的问题"),
    }),
  });
}

这个设计的关键点是:模型只知道自己调用了一个工具,但工具内部并不执行普通业务逻辑,而是跳转到人工交互节点。为了保持消息链完整,还需要补一个 ToolMessage,否则后续模型可能会看到缺失的工具调用结果。

检查点持久化

只要有人工中断,就一定要保存检查点。单机 demo 可以用 MemorySaver,生产环境更适合 Redis、MongoDB、PostgreSQL 这类外部存储。

import { MemorySaver } from "@langchain/langgraph";

const app = workflowWithReview.compile({
  checkpointer: new MemorySaver(),
});

const config = {
  configurable: {
    thread_id: "review-session-1",
  },
};

await app.invoke(
  {
    messages: [inputMessage],
  },
  config,
);

// 用户确认后继续
await app.invoke(null, config);

如果自己实现 Redis Checkpointer,核心就是实现几类读写方法:

方法作用
put保存某个检查点
putWrites保存节点运行过程中的中间写入
getTuple根据 thread_id 等信息读取检查点及其元数据
list按条件列出检查点历史,便于调试和回放

人工确认、流式中断、长任务恢复都依赖这套能力。

SubAgent:用 TaskTool 拆出隔离执行环境

复杂代码任务通常不适合全部塞进一个主 Agent。比如“分析整个项目的架构并找出性能问题”,主 Agent 可以创建一个专门的代码分析 SubAgent,让它独立搜索文件、读取代码、产出结论,再把结果交回主 Agent 汇总。

整体结构可以这样理解:

flowchart TD
    USER[用户请求] --> MAIN[主 Agent]
    MAIN -->|调用 TaskTool| TASK[TaskTool]
    TASK --> SA1[general-purpose SubAgent]
    TASK --> SA2[code-analyzer SubAgent]
    TASK --> SA3[document-writer SubAgent]
    SA1 --> RESULT[子任务结果]
    SA2 --> RESULT
    SA3 --> RESULT
    RESULT --> MAIN
    MAIN --> USER

SubAgent 设计有几个原则:

设计点说明
入口收敛只允许主 Agent 通过 TaskTool 创建 SubAgent
上下文隔离每个 SubAgent 使用独立 messages,不继承主 Agent 的完整上下文
工具隔离不同类型 SubAgent 只拿到允许使用的工具
防递归SubAgent 默认不能再调用 TaskTool,避免无限创建
结果回传SubAgent 返回结构化结果,由主 Agent 负责解释和汇总

SubAgent 配置

先定义几类 SubAgent:

interface SubAgentConfig {
  type: "general-purpose" | "code-analyzer" | "document-writer";
  systemPrompt: string;
  allowedTools: string[] | null;
}

const subAgentConfigs: SubAgentConfig[] = [
  {
    type: "general-purpose",
    systemPrompt: `
你是一个通用任务 Agent,适合处理复杂搜索、内容分析和多步骤任务。
处理问题时要拆解步骤,给出清晰、可验证的结论。
    `.trim(),
    allowedTools: null,
  },
  {
    type: "code-analyzer",
    systemPrompt: `
你是代码分析 Agent,重点关注:
- 代码质量问题
- 架构设计问题
- 性能瓶颈
- 安全风险
输出必须具体到文件、函数或代码片段,并给出可执行修改建议。
    `.trim(),
    allowedTools: ["Read", "Grep", "Glob", "LS"],
  },
  {
    type: "document-writer",
    systemPrompt: `
你是技术文档 Agent,重点生成:
- API 文档
- 使用说明
- 项目结构说明
- 面向开发者的教程
文档要结构清晰,并尽量包含示例。
    `.trim(),
    allowedTools: ["Read", "Write", "Edit"],
  },
];

TaskTool 实现

TaskTool 本质上还是一个工具,只是它内部会启动另一个 Agent。

import { createReactAgent } from "@langchain/langgraph/prebuilt";
import { tool } from "@langchain/core/tools";
import { z } from "zod";

function createTaskTool(
  baseTools: any[],
  model: any,
  subAgentConfigs: SubAgentConfig[],
) {
  const agentInstances = new Map<string, any>();

  for (const config of subAgentConfigs) {
    const filteredTools = config.allowedTools
      ? baseTools.filter((tool) => config.allowedTools!.includes(tool.name))
      : baseTools.filter((tool) => tool.name !== "TaskTool");

    agentInstances.set(
      config.type,
      createReactAgent({
        llm: model,
        tools: filteredTools,
        systemMessage: config.systemPrompt,
      }),
    );
  }

  return tool(
    async (args: { description: string; subagent_type: string }) => {
      const { description, subagent_type } = args;

      const agent = agentInstances.get(subagent_type);
      if (!agent) {
        throw new Error(`未知 SubAgent 类型:${subagent_type}`);
      }

      const subAgentState = {
        messages: [
          {
            role: "user",
            content: description,
          },
        ],
      };

      const result = await agent.invoke(subAgentState);
      const finalMessage = result.messages[result.messages.length - 1];

      return `
SubAgent 执行完成。

类型:${subagent_type}

任务:
${description}

结果:
${finalMessage.content}

请主 Agent 根据该结果提取关键信息,并面向用户给出最终回答。
      `.trim();
    },
    {
      name: "TaskTool",
      description: `
启动专门的 SubAgent 来处理复杂多步骤任务。

可用类型:
- general-purpose:通用任务,适合搜索、分析和多步骤执行
- code-analyzer:代码分析,适合架构、质量、性能、安全问题定位
- document-writer:文档编写,适合 API 文档、使用说明和项目文档

使用规则:
- 复杂任务、跨文件分析、大范围搜索时优先使用
- 每次调用都是独立执行环境
- 任务描述必须具体,包含目标、范围和输出要求
- SubAgent 完成后,由主 Agent 负责总结给用户
      `.trim(),
      schema: z.object({
        description: z.string().describe("给 SubAgent 的详细任务描述"),
        subagent_type: z
          .enum(["general-purpose", "code-analyzer", "document-writer"])
          .describe("要使用的 SubAgent 类型"),
      }),
    },
  );
}

这段实现里最重要的是隔离:SubAgent 的输入只有 description,不会自动拿到主 Agent 的全部历史消息。这样可以减少上下文污染,也能降低 token 消耗。

并发安全与非并发安全工具

代码智能体里有些工具天然适合并发,例如读文件、搜索文件;有些工具必须串行,例如写文件、执行命令。可以把工具分成两组,交给不同的 ToolNode。

const safeConcurrencyTools = [
  taskTool,
  readFileTool,
  grepTool,
  globTool,
  lsTool,
];

const unsafeConcurrencyTools = [
  writeFileTool,
  editFileTool,
  bashTool,
];

const safeConcurrencyToolNode = new ToolNode(safeConcurrencyTools, {
  maxConcurrency: 10,
  name: "safeConcurrencyTools",
});

const unsafeConcurrencyToolNode = new ToolNode(unsafeConcurrencyTools, {
  maxConcurrency: 1,
  name: "unsafeConcurrencyTools",
});

路由逻辑根据模型调用的工具名决定去哪:

function getToolCalls(message: any) {
  return Array.isArray(message.tool_calls) ? message.tool_calls : [];
}

function isSafeConcurrencyTool(name: string) {
  return safeConcurrencyTools.some((tool) => tool.name === name);
}

const shouldContinue = (state: typeof agentState.State) => {
  const { messages } = state;
  const lastMessage = messages[messages.length - 1];
  const toolCalls = getToolCalls(lastMessage);

  if (toolCalls.length === 0) {
    return END;
  }

  const allSafe = toolCalls.every((call) => isSafeConcurrencyTool(call.name));

  return allSafe ? "safeConcurrencyTools" : "unsafeConcurrencyTools";
};

const graph = new StateGraph(agentState)
  .addNode("llm", callModel)
  .addNode("safeConcurrencyTools", safeConcurrencyToolNode)
  .addNode("unsafeConcurrencyTools", unsafeConcurrencyToolNode)
  .addEdge(START, "llm")
  .addConditionalEdges("llm", shouldContinue, [
    "safeConcurrencyTools",
    "unsafeConcurrencyTools",
    END,
  ])
  .addEdge("safeConcurrencyTools", "llm")
  .addEdge("unsafeConcurrencyTools", "llm");

这种分组方式比“所有工具都并发”更安全,也比“所有工具都串行”更快。

Todo 任务管理:让 Agent 自己维护进度

Claude Code 风格的代码智能体有一个很实用的能力:遇到复杂任务时自动创建 Todo,并在执行过程中持续更新状态。工程实现并不复杂,关键在于把任务列表作为状态,同时提供 TodoReadTodoWrite 两个工具。

流程如下:

flowchart TD
    USER[复杂用户请求] --> AGENT[Agent]
    AGENT -->|判断需要拆任务| WRITE[TodoWrite 创建任务列表]
    WRITE --> AGENT
    AGENT --> READ[TodoRead 读取当前任务]
    READ --> WORK[执行某个任务]
    WORK --> UPDATE[TodoWrite 更新状态]
    UPDATE --> AGENT
    AGENT -->|全部完成| FINAL[汇总回复用户]

状态定义

export enum TaskStatus {
  Pending = "pending",
  InProgress = "in_progress",
  Completed = "completed",
  Failed = "failed",
  Blocked = "blocked",
}

export interface TodoItem {
  id: string;
  name: string;
  desc: string;
  status: TaskStatus;
  priority?: "high" | "medium" | "low";
  startTime?: string;
  endTime?: string;
  error?: string;
}

const agentState = Annotation.Root({
  messages: Annotation<BaseMessage[]>({
    reducer: safeMessagesStateReducer,
    default: () => [],
  }),

  todoList: Annotation<TodoItem[]>({
    reducer: (_oldValue, newValue) => newValue,
    default: () => [],
  }),
});

这里把 todoList 放进 LangGraph State。生产环境也可以把它落到文件、Redis 或数据库里,State 中只保留当前快照。

TodoWrite 工具

TodoWrite 负责接受模型生成的新任务列表,并写回 State。

import { Command, getCurrentTaskInput } from "@langchain/langgraph";
import { ToolMessage } from "@langchain/core/messages";
import { tool } from "@langchain/core/tools";
import { z } from "zod";

export function createTodoWriteTool() {
  const executor = async (
    args: { todoList: TodoItem[] },
    config: any,
  ) => {
    const state = getCurrentTaskInput() as typeof agentState.State;

    const normalizedTodoList = args.todoList.map((item) => ({
      ...item,
      id: item.id || crypto.randomUUID(),
    }));

    const responseMsg = new ToolMessage({
      content: `任务列表已更新,共 ${normalizedTodoList.length} 个任务。`,
      name: "TodoWrite",
      tool_call_id: config.toolCall.id,
    });

    return new Command({
      update: {
        todoList: normalizedTodoList,
        messages: state.messages.concat(responseMsg),
      },
    });
  };

  return tool(executor, {
    name: "TodoWrite",
    description: `
创建和更新当前会话的任务列表。

适合使用:
- 用户请求包含 3 个以上步骤
- 任务需要跨文件修改或多轮操作
- 用户明确要求制定计划或跟踪进度
- 执行过程中发现新的后续任务
- 开始任务前需要标记 in_progress
- 完成任务后需要标记 completed

不适合使用:
- 单步、简单、可立即完成的请求
- 纯解释型问题
- 无需执行动作的信息查询

规则:
- 同一时间通常只保持一个任务为 in_progress
- 只有确认完成后才能标记 completed
- 遇到阻塞要标记 blocked,并说明原因
- 执行失败要标记 failed,并写入 error
    `.trim(),
    schema: z.object({
      todoList: z.array(
        z.object({
          id: z.string().describe("任务唯一标识"),
          name: z.string().describe("任务名称"),
          desc: z.string().describe("任务描述"),
          status: z.nativeEnum(TaskStatus).describe("任务状态"),
          priority: z.enum(["high", "medium", "low"]).optional(),
          startTime: z.string().optional(),
          endTime: z.string().optional(),
          error: z.string().optional(),
        }),
      ),
    }),
  });
}

TodoRead 工具

TodoRead 负责把当前任务列表返回给模型。

export function createTodoReadTool() {
  const executor = async (_args: {}, config: any) => {
    const state = getCurrentTaskInput() as typeof agentState.State;
    const currentTasks = state.todoList ?? [];

    return new ToolMessage({
      content: `
请继续使用 TodoRead 和 TodoWrite 跟踪任务进度。

当前任务列表:
${JSON.stringify(currentTasks, null, 2)}
      `.trim(),
      name: "TodoRead",
      tool_call_id: config.toolCall.id,
    });
  };

  return tool(executor, {
    name: "TodoRead",
    description:
      "读取当前会话的任务列表。开始复杂任务、完成任务、遇到阻塞或不确定下一步时,应主动读取任务状态。",
    schema: z.object({}),
  });
}

Todo 工具能否稳定工作,很大程度取决于提示词。工程侧只负责保存和读取,真正决定“什么时候创建任务、什么时候更新状态”的,是工具描述和系统提示词。

可以把任务管理规则放进主 Agent 的 system prompt:

const todoPrompt = `
你具备任务管理能力,需要遵守这些规则:

1. 对复杂请求使用 TodoWrite 创建任务列表:
   - 任务包含 3 个以上步骤
   - 需要跨文件搜索、修改、验证
   - 用户给出多个目标
   - 用户明确要求制定计划

2. 执行前使用 TodoRead 查看当前任务状态。

3. 开始处理某个任务前,将它标记为 in_progress。

4. 完成任务后,将它标记为 completed。

5. 遇到无法继续的情况,将任务标记为 blocked,并说明阻塞原因。

6. 执行失败时,将任务标记为 failed,并写明错误信息。

7. 不要为简单单步问题创建任务列表。
`.trim();

const systemPrompt = `
你是一个代码智能体,可以阅读文件、搜索代码、修改文件、执行安全命令,并通过 Todo 工具管理复杂任务。

${todoPrompt}
`.trim();

一个好用的 Todo 机制,本质是“规则”和“智能”的分界:状态写入、状态校验、持久化由代码保证;拆不拆任务、任务怎么命名、何时更新状态交给模型判断。

上下文压缩:长对话里的记忆管理

代码智能体会产生大量消息:用户需求、工具调用、文件内容、命令输出、修改记录、错误日志。如果全部塞进模型上下文,很快会触达上下文窗口上限。

常见做法是在每次调用模型前检查 token 使用量,超过阈值后把历史消息压缩成摘要。

flowchart TD
    A[进入 LLM 节点前] --> B[检查最新 token usage]
    B --> C{是否超过阈值}
    C -->|否| D[正常调用 LLM]
    C -->|是| E[调用压缩模型生成摘要]
    E --> F[删除部分历史消息]
    F --> G[插入摘要消息]
    G --> D

在 State 中记录压缩历史

interface CompressionRecord {
  beforeTokens: number;
  afterTokens: number;
  createdAt: string;
  summary: string;
}

const agentStateWithCompression = Annotation.Root({
  messages: Annotation<BaseMessage[]>({
    reducer: safeMessagesStateReducer,
    default: () => [],
  }),

  todoList: Annotation<TodoItem[]>({
    reducer: (_oldValue, newValue) => newValue,
    default: () => [],
  }),

  compressionHistory: Annotation<CompressionRecord[]>({
    reducer: (oldValue, newValue) => [
      ...(oldValue ?? []),
      ...(newValue ?? []),
    ],
    default: () => [],
  }),
});

单独设置压缩节点

压缩逻辑可以写在 callModel 内部,也可以拆成单独节点。拆成节点更容易观察、测试和调试。

const workflow = new StateGraph(agentStateWithCompression)
  .addNode("compression", compressionNode)
  .addNode("llm", callModel)
  .addNode("tools", toolNode)
  .addEdge(START, "compression")
  .addEdge("compression", "llm")
  .addConditionalEdges("llm", shouldContinue, ["tools", END])
  .addEdge("tools", "compression");

这样每轮工具执行回来后,都会先检查是否需要压缩,再决定是否进入模型。

Token 使用量检测

有些模型会在响应元数据里返回 token 使用情况。为了减少扫描成本,可以从最新消息倒序找 usage。

import { AIMessage, BaseMessage } from "@langchain/core/messages";

function isAIMessage(msg: BaseMessage): msg is AIMessage {
  return msg.getType() === "ai";
}

export function getLatestTokenUsage(messages: BaseMessage[]): number {
  for (let i = messages.length - 1; i >= 0; i--) {
    const msg = messages[i];

    if (isAIMessage(msg) && msg.response_metadata?.usage) {
      const usage = msg.response_metadata.usage;

      return (
        (usage.total_tokens ?? 0) +
        (usage.cache_creation_tokens ?? 0) +
        (usage.cache_read_tokens ?? 0)
      );
    }
  }

  return estimateTokens(messages);
}

压缩阈值不要简单写成“达到模型最大上下文就压缩”,因为还要给模型输出预留空间。

function needsCompress(params: {
  usedTokens: number;
  maxContextTokens: number;
  reservedOutputTokens: number;
  thresholdRatio: number;
}) {
  const usableInputTokens =
    params.maxContextTokens - params.reservedOutputTokens;

  return params.usedTokens >= usableInputTokens * params.thresholdRatio;
}

const shouldCompress = needsCompress({
  usedTokens,
  maxContextTokens: 200_000,
  reservedOutputTokens: 16_000,
  thresholdRatio: 0.92,
});

压缩节点实现

import { AIMessage, RemoveMessage } from "@langchain/core/messages";

async function compressionNode(
  state: typeof agentStateWithCompression.State,
) {
  const usedTokens = getLatestTokenUsage(state.messages);

  const shouldCompress = needsCompress({
    usedTokens,
    maxContextTokens: 200_000,
    reservedOutputTokens: 16_000,
    thresholdRatio: 0.92,
  });

  if (!shouldCompress) {
    return {};
  }

  const compressionPrompt = generateCompressionPrompt();

  const compressedSummary = await compressionModel.invoke([
    {
      role: "system",
      content: compressionPrompt,
    },
    ...state.messages,
  ]);

  const messagesToRemove = selectMessagesToRemove(state.messages);

  const summaryMessage = new AIMessage({
    content: `
以下是此前对话和工作的压缩摘要。后续继续执行任务时,必须把它当作已有上下文。

${compressedSummary.content}
    `.trim(),
  });

  return {
    messages: [
      ...messagesToRemove.map((message) => new RemoveMessage({ id: message.id! })),
      summaryMessage,
    ],
    compressionHistory: [
      {
        beforeTokens: usedTokens,
        afterTokens: estimateTokens([summaryMessage]),
        createdAt: new Date().toISOString(),
        summary: String(compressedSummary.content),
      },
    ],
  };
}

selectMessagesToRemove 需要谨慎设计。一般会保留最近几轮对话、当前工具调用链、系统消息和未完成任务相关内容,把更早的长历史压缩掉。

摘要提示词结构

上下文压缩最怕丢失关键细节。可以把摘要固定成 8 个必选部分,再加一个可选下一步:

部分要保留的信息
主要请求和意图用户明确提出过的目标、约束、偏好
关键技术概念涉及的框架、库、架构、协议、模型
文件和代码段读过、改过、创建过的文件,以及关键代码片段
错误和修复遇到的报错、原因、修复方式、用户反馈
已解决问题已经完成的分析、修改、验证结果
用户消息所有非工具结果的用户指令,尤其是需求变化
待处理任务明确还没完成的事项
当前工作压缩发生前正在做的具体操作
可选下一步和当前工作直接相关的下一步行动

提示词可以这样写:

你的任务是为当前代码智能体会话生成详细摘要,用于在压缩上下文后继续开发工作。

摘要必须保留技术细节、文件路径、代码结构、架构决策、用户明确要求和仍未完成的任务。

生成最终摘要前,请先使用 <analysis> 标签梳理信息,检查是否遗漏关键内容。

最终摘要必须包含:

1. 主要请求和意图
2. 关键技术概念
3. 文件和代码段
4. 错误和修复
5. 已解决问题
6. 用户消息
7. 待处理任务
8. 当前工作
9. 可选下一步

不要泛泛而谈。涉及代码时保留必要代码片段、函数名、文件名和修改原因。

上下文压缩是一种有损操作。越激进,越省 token,也越容易丢信息。代码智能体通常更适合保守压缩:多保留近期上下文、完整保留用户需求、谨慎删除工具结果。

流式输出与实时中断恢复

普通流式输出只是把模型生成过程实时展示出来。代码智能体还需要更进一步:用户在 Agent 执行中途输入新指令时,系统可以中断当前运行,保存状态,然后用新输入继续。

可以用 AbortSignal + Checkpointer + streamEvents 做一个基础版本。

sequenceDiagram
    participant U as 用户
    participant F as 前端
    participant S as 服务端
    participant G as LangGraph
    participant C as Checkpointer

    U->>F: 输入任务
    F->>S: 发起请求并携带 AbortSignal
    S->>G: streamEvents 启动图
    G->>C: 持续保存检查点
    G-->>F: 流式返回事件
    U->>F: 中途输入新指令
    F->>S: abort 当前请求
    S->>G: 中断图运行
    G->>C: 保存中断前状态
    F->>S: 发送新指令
    S->>C: 读取检查点
    S->>G: 从检查点继续运行

服务端把 signal 传给 LangGraph:

const config = {
  configurable: {
    thread_id: sessionId,
  },
  streamMode: ["values", "messages", "updates"] as any,
  version: "v2" as const,
  signal: abortSignal,
};

const stream = agent.streamEvents(
  {
    messages: [inputMessage],
  },
  config,
);

for await (const event of stream) {
  sendToClient(event);
}

前端请求使用 AbortController

const abortController = new AbortController();

const response = await fetch("/api/agent", {
  method: "POST",
  body: JSON.stringify({
    sessionId,
    message,
  }),
  headers: {
    "Content-Type": "application/json",
  },
  signal: abortController.signal,
});

// 用户输入新指令时
abortController.abort();

中断后,新请求继续使用同一个 thread_id,LangGraph 就能从检查点恢复。

await agent.invoke(
  {
    messages: [
      {
        role: "user",
        content: newUserMessage,
      },
    ],
  },
  {
    configurable: {
      thread_id: sessionId,
    },
  },
);

要做得更接近成熟代码智能体,还需要补几层工程能力:

能力作用
消息队列用户连续输入时按顺序处理
背压控制防止前端、服务端、模型事件流互相压垮
意图合并用户新输入可能是补充、纠错、取消或改变方向
工具取消中断时需要停止正在执行的 shell、文件扫描或网络请求
部分结果保留已完成的工具结果应进入上下文,未完成结果不能伪装成成功

基础版本可以先实现“中断当前运行并用新消息恢复”,再逐步补齐队列和取消语义。

完整图结构

把前面的模块组合起来,可以得到一个简版 Claude Code 风格智能体:

flowchart TD
    START([START]) --> COMPRESS[上下文压缩检查]
    COMPRESS --> LLM[调用 LLM]

    LLM -->|无工具调用| END([END])

    LLM -->|需要人工确认的工具| REVIEW[人工确认]
    REVIEW -->|同意| ROUTER[工具路由]
    REVIEW -->|拒绝 / 修改| LLM

    LLM -->|无需人工确认的工具| ROUTER

    ROUTER -->|并发安全| SAFE[并发安全工具节点]
    ROUTER -->|非并发安全| UNSAFE[串行工具节点]

    SAFE --> COMPRESS
    UNSAFE --> COMPRESS

    SAFE -.包含.-> TASK[TaskTool]
    TASK -.创建.-> SUB[SubAgent]

    SAFE -.包含.-> TODOREAD[TodoRead]
    SAFE -.包含.-> TODOWRITE[TodoWrite]

对应能力关系如下:

模块LangGraph 实现方式解决的问题
ReAct 循环llm 节点 + tools 节点 + 条件边让模型自主选择工具并循环执行
人工确认interrupt + Command危险操作前暂停,等待用户决定
检查点checkpointer中断恢复、多轮会话、跨机器恢复
SubAgentTaskTool 内部创建独立 Agent复杂任务隔离执行
并发工具多个 ToolNode + 路由函数区分读操作并发和写操作串行
TodotodoList State + TodoRead/TodoWrite长任务进度跟踪
上下文压缩独立 compression 节点长对话避免超过上下文窗口
实时中断AbortSignal + 检查点恢复执行中接收用户新指令

工程落地时的几个坑

不要把所有逻辑都交给模型

适合交给代码强约束的逻辑包括:

  • token 是否超过阈值
  • 哪些工具需要人工确认
  • 哪些工具允许并发
  • 检查点如何保存
  • 文件写入前后的 diff 校验
  • shell 命令超时和取消

适合交给模型判断的逻辑包括:

  • 是否需要拆任务
  • 任务如何命名和排序
  • 是否需要创建 SubAgent
  • 当前结果如何解释给用户
  • 需求不明确时该问什么问题

规则驱动负责可靠性,模型驱动负责灵活性。两者混在一起时,系统会变得既不可控又不好调试。

SubAgent 必须限制工具

如果 SubAgent 能继续调用 TaskTool,很容易出现递归创建 Agent 的问题。更安全的做法是:

const filteredTools = baseTools.filter((tool) => tool.name !== "TaskTool");

对于专用 SubAgent,还应该只给必要工具。例如文档 Agent 不一定需要 Bash,代码分析 Agent 多数情况下也不需要写文件。

Todo 工具的提示词比代码更重要

TodoWrite 的代码只是保存列表。模型是否会主动更新任务,主要取决于工具描述和 system prompt。提示词里要清楚写出:

  • 什么时候用
  • 什么时候不用
  • 状态怎么流转
  • 什么情况下不能标记完成
  • 遇到失败或阻塞怎么处理

只写“管理任务列表”这种描述,效果通常不稳定。

压缩摘要必须保留用户需求变化

代码智能体长时间执行时,用户可能中途改变目标。如果压缩摘要只保留技术细节,不保留用户消息和需求变化,后续 Agent 很容易继续执行旧目标。

因此摘要中一定要有“用户消息”和“当前工作”两个部分。

流式中断不等于真正取消一切

AbortSignal 可以中断请求和图运行,但工具内部也要支持取消。例如 shell 命令、文件扫描、远程请求都需要接收取消信号。否则前端看起来已经停止,后端任务仍然在跑。

一个可执行的建设顺序

真正动手时,不需要一次实现所有模块。更稳妥的顺序是:

阶段目标
1搭出基础 ReAct Agent,支持读文件和搜索
2加入写文件、执行命令,并为危险工具增加人工确认
3接入 Checkpointer,让中断和多轮会话可恢复
4增加 TodoRead / TodoWrite,让复杂任务可跟踪
5增加 TaskTool 和少量 SubAgent 类型
6区分并发安全工具和串行工具
7增加上下文压缩节点
8接入流式输出、AbortSignal 和前端中断恢复

做到第 4 阶段,已经能得到一个可用的代码助手;做到第 6 阶段,复杂任务体验会明显改善;做到第 8 阶段,交互方式会更接近成熟的代码智能体。

Claude Code 风格的核心设计并不是单点能力,而是多个简单机制叠加后的系统效果:用 ReAct 完成工具循环,用 Human-in-the-loop 控制风险,用 SubAgent 隔离复杂任务,用 Todo 维持执行进度,用上下文压缩支撑长会话,再用流式中断让用户随时改变方向。LangGraph 的价值就在于这些机制都可以显式建模,既能让模型保持自主性,又能在关键路径上用工程规则兜底。


评论