第 10 课

第10课:Team Protocols —— 队友之间的"规矩"

一句话总结:队友之间需要正式的沟通流程。关闭协议:先请求再关闭;方案审批:先提交再执行。两个协议都用同一个 request_id 来关联请求和回复。

一句话总结:队友之间需要正式的沟通流程。关闭协议:先请求再关闭;方案审批:先提交再执行。两个协议都用同一个 request_id 来关联请求和回复。


你将学到什么


核心概念:从"自由沟通"到"正式流程"

问题:没有规矩的团队会出乱子

上一课我们的Agent团队能互相发消息了,但有两个问题:

问题1:Lead想关掉一个队友,直接杀掉线程?

text
Lead: "alice你不用了,我把你关了" (直接杀掉alice的线程) alice可能正在写文件,写到一半被杀了 --> 文件损坏!

问题2:队友做了一个重要决定,没人审批?

text
alice: "我觉得应该把整个数据库重构一下" (直接动手重构) Lead: "等等!我没让你重构啊!" --> 为时已晚

类比: 这就像公司里——你不能直接开除员工(要走离职流程),员工做重大决策前要走审批(不能擅自做主)。

解决方案:两个协议

协议1:关闭协议                         协议2:方案审批

Lead: "alice请关闭"                    alice: "我想重构数据库,可以吗?"
  |                                      |
  v                                      v
alice: "好的,我同意关闭"               Lead: "可以,批准了" / "不行,换个方案"
  |                                      |
  v                                      v
Lead: 确认后关闭alice                   alice: 执行批准的方案 / 修改方案重新提交

关键设计:request_id 关联模式

两个协议看起来不同,但底层用的是同一个模式

text
1. 发起方生成一个唯一的 request_id(比如 "abc123") 2. 请求消息带着这个 request_id 发出去 3. 回复消息也带着同一个 request_id 回来 4. 通过 request_id 就能把请求和回复对应上 这就像快递单号:你寄了一个包裹,快递单号是 abc123。 收件人签收后,你通过 abc123 就能查到"已签收"。

状态机:pending -> approved / rejected

每个协议请求都有一个状态:

                    +-- approved(同意)
                    |
pending(等待中)---+
                    |
                    +-- rejected(拒绝)

这两种状态转换覆盖了所有场景,没有别的可能。


ASCII流程图

关闭协议流程

Lead                                    队友 (alice)
  |                                       |
  | 1. shutdown_request                   |
  |    {request_id: "abc"}                |
  |-------------------------------------->|
  |                                       |
  |                  2. 队友收到请求       |
  |                     决定:同意还是拒绝?|
  |                                       |
  |    3. shutdown_response               |
  |    {request_id: "abc", approve: true} |
  |<--------------------------------------|
  |                                       |
  | 4. Lead确认,队友关闭                  |
  |                                  [线程结束]

状态追踪器(Lead端):
  shutdown_requests = {
    "abc": {"target": "alice", "status": "pending"}
       --> 收到回复后变成 "approved" 或 "rejected"
  }

方案审批流程

队友 (alice)                            Lead
  |                                       |
  | 1. plan_approval                      |
  |    {request_id: "xyz", plan: "重构"}  |
  |-------------------------------------->|
  |                                       |
  |                  2. Lead收到方案       |
  |                     审批:批准还是驳回?|
  |                                       |
  |    3. plan_approval_response          |
  |    {request_id: "xyz", approve: true} |
  |<--------------------------------------|
  |                                       |
  | 4. alice收到批准,开始执行             |
  |                                       |

状态追踪器(Lead端):
  plan_requests = {
    "xyz": {"from": "alice", "plan": "重构", "status": "pending"}
       --> 审批后变成 "approved" 或 "rejected"
  }

和上一课的对比

第9课的结构:                          第10课的结构:

+-- 初始化配置                         +-- 初始化配置
+-- MessageBus(信箱系统)             +-- MessageBus(一样)
+-- TeammateManager                   +-- 状态追踪器(新增!shutdown_requests, plan_requests)
                                      +-- TeammateManager(队友多了2个工具)
      队友6个工具                            队友8个工具:
      - bash, read_file, ...                 - bash, read_file, ...(基础4个)
      - send_message, read_inbox             - send_message, read_inbox(通信2个)
                                             - shutdown_response(新增!回复关闭请求)
                                             - plan_approval(新增!提交方案审批)
+-- TOOL_HANDLERS(9个工具)           +-- 协议处理函数(新增!)
                                      +-- TOOL_HANDLERS(12个工具!+3个协议工具)
+-- TOOLS(9个工具)                   +-- TOOLS(12个工具!)
+-- agent_loop(读信箱)               +-- agent_loop(一样!)
+-- main                              +-- main

关键发现: agent_loop 一行都没改!协议只是多了几种消息类型和对应的工具。

新增的3个Lead工具:

  1. shutdown_request:发起关闭请求
  2. shutdown_response:查看关闭请求状态
  3. plan_approval:审批队友提交的方案

新增的2个队友工具:

  1. shutdown_response:回复关闭请求(同意/拒绝)
  2. plan_approval:提交方案等待审批

完整代码

源代码文件:agents/s10_team_protocols.py

python
#!/usr/bin/env python3 """ s10_team_protocols.py - 团队协议 在s09团队通信的基础上,加入两个正式协议: 1. 关闭协议:Lead请求关闭队友,队友同意后才关闭 2. 方案审批:队友提交方案,Lead审批通过后才执行 核心模式:request_id 关联(请求和回复通过同一个ID对应) """ import json import os import subprocess import threading import time import uuid # 新增:用来生成唯一的request_id from pathlib import Path from anthropic import Anthropic from dotenv import load_dotenv # ============================================================ # 初始化配置(和s09一样) # ============================================================ load_dotenv(override=True) if os.getenv("ANTHROPIC_BASE_URL"): os.environ.pop("ANTHROPIC_AUTH_TOKEN", None) WORKDIR = Path.cwd() client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL")) MODEL = os.environ["MODEL_ID"] TEAM_DIR = WORKDIR / ".team" INBOX_DIR = TEAM_DIR / "inbox" # 系统提示词,提到了协议 SYSTEM = f"You are a team lead at {WORKDIR}. Manage teammates with shutdown and plan approval protocols." VALID_MSG_TYPES = { "message", "broadcast", "shutdown_request", "shutdown_response", "plan_approval_response", } # ============================================================ # 【新增】状态追踪器 # 两个字典,分别追踪关闭请求和方案审批请求的状态 # 这就是协议的"记忆"——记住谁发了什么请求,现在是什么状态 # ============================================================ shutdown_requests = {} # {request_id: {"target": 队友名, "status": "pending/approved/rejected"}} plan_requests = {} # {request_id: {"from": 队友名, "plan": 方案内容, "status": "pending/approved/rejected"}} _tracker_lock = threading.Lock() # 多线程安全锁,防止同时读写追踪器 # ============================================================ # MessageBus:信箱系统(和s09完全一样) # ============================================================ class MessageBus: def __init__(self, inbox_dir: Path): self.dir = inbox_dir self.dir.mkdir(parents=True, exist_ok=True) def send(self, sender: str, to: str, content: str, msg_type: str = "message", extra: dict = None) -> str: if msg_type not in VALID_MSG_TYPES: return f"Error: Invalid type '{msg_type}'. Valid: {VALID_MSG_TYPES}" msg = { "type": msg_type, "from": sender, "content": content, "timestamp": time.time(), } if extra: msg.update(extra) inbox_path = self.dir / f"{to}.jsonl" with open(inbox_path, "a") as f: f.write(json.dumps(msg) + "\n") return f"Sent {msg_type} to {to}" def read_inbox(self, name: str) -> list: inbox_path = self.dir / f"{name}.jsonl" if not inbox_path.exists(): return [] messages = [] for line in inbox_path.read_text().strip().splitlines(): if line: messages.append(json.loads(line)) inbox_path.write_text("") return messages def broadcast(self, sender: str, content: str, teammates: list) -> str: count = 0 for name in teammates: if name != sender: self.send(sender, name, content, "broadcast") count += 1 return f"Broadcast to {count} teammates" BUS = MessageBus(INBOX_DIR) # ============================================================ # TeammateManager:队友管理器(在s09基础上加了协议支持) # ============================================================ class TeammateManager: def __init__(self, team_dir: Path): self.dir = team_dir self.dir.mkdir(exist_ok=True) self.config_path = self.dir / "config.json" self.config = self._load_config() self.threads = {} def _load_config(self) -> dict: if self.config_path.exists(): return json.loads(self.config_path.read_text()) return {"team_name": "default", "members": []} def _save_config(self): self.config_path.write_text(json.dumps(self.config, indent=2)) def _find_member(self, name: str) -> dict: for m in self.config["members"]: if m["name"] == name: return m return None def spawn(self, name: str, role: str, prompt: str) -> str: """创建队友(和s09一样)""" member = self._find_member(name) if member: if member["status"] not in ("idle", "shutdown"): return f"Error: '{name}' is currently {member['status']}" member["status"] = "working" member["role"] = role else: member = {"name": name, "role": role, "status": "working"} self.config["members"].append(member) self._save_config() thread = threading.Thread( target=self._teammate_loop, args=(name, role, prompt), daemon=True, ) self.threads[name] = thread thread.start() return f"Spawned '{name}' (role: {role})" def _teammate_loop(self, name: str, role: str, prompt: str): """ 队友的Agent循环(在s09基础上加了协议支持) 关键变化: 1. 系统提示词提到了协议 2. 多了should_exit标志,用于优雅关闭 3. 多了shutdown_response和plan_approval两个工具 """ # 【变化1】系统提示词里提到了协议 sys_prompt = ( f"You are '{name}', role: {role}, at {WORKDIR}. " f"Submit plans via plan_approval before major work. " # 做大事前要审批 f"Respond to shutdown_request with shutdown_response." # 收到关闭请求要回复 ) messages = [{"role": "user", "content": prompt}] tools = self._teammate_tools() # 【变化2】should_exit标志:队友同意关闭后,完成当前工具调用再退出 # 不是立即"杀掉",而是优雅地结束 should_exit = False for _ in range(50): # 读信箱 inbox = BUS.read_inbox(name) for msg in inbox: messages.append({"role": "user", "content": json.dumps(msg)}) # 如果已经同意关闭了,完成当前轮次后退出 if should_exit: break try: response = client.messages.create( model=MODEL, system=sys_prompt, messages=messages, tools=tools, max_tokens=8000, ) except Exception: break messages.append({"role": "assistant", "content": response.content}) if response.stop_reason != "tool_use": break results = [] for block in response.content: if block.type == "tool_use": output = self._exec(name, block.name, block.input) print(f" [{name}] {block.name}: {str(output)[:120]}") results.append({ "type": "tool_result", "tool_use_id": block.id, "content": str(output), }) # 【变化3】如果队友同意了关闭,标记should_exit if block.name == "shutdown_response" and block.input.get("approve"): should_exit = True messages.append({"role": "user", "content": results}) # 【变化4】退出时根据是否是主动关闭来设置状态 member = self._find_member(name) if member: member["status"] = "shutdown" if should_exit else "idle" self._save_config() def _exec(self, sender: str, tool_name: str, args: dict) -> str: """ 队友的工具执行器 比s09多了2个工具:shutdown_response 和 plan_approval """ # 基础4个工具(和s09一样) if tool_name == "bash": return _run_bash(args["command"]) if tool_name == "read_file": return _run_read(args["path"]) if tool_name == "write_file": return _run_write(args["path"], args["content"]) if tool_name == "edit_file": return _run_edit(args["path"], args["old_text"], args["new_text"]) # 通信2个工具(和s09一样) if tool_name == "send_message": return BUS.send(sender, args["to"], args["content"], args.get("msg_type", "message")) if tool_name == "read_inbox": return json.dumps(BUS.read_inbox(sender), indent=2) # 【新增】关闭协议:队友回复关闭请求 if tool_name == "shutdown_response": req_id = args["request_id"] # 关联的请求ID approve = args["approve"] # 同意还是拒绝 # 更新Lead端的状态追踪器 with _tracker_lock: if req_id in shutdown_requests: shutdown_requests[req_id]["status"] = "approved" if approve else "rejected" # 通过信箱把回复发给Lead BUS.send( sender, "lead", args.get("reason", ""), "shutdown_response", {"request_id": req_id, "approve": approve}, ) return f"Shutdown {'approved' if approve else 'rejected'}" # 【新增】方案审批:队友提交方案 if tool_name == "plan_approval": plan_text = args.get("plan", "") req_id = str(uuid.uuid4())[:8] # 生成唯一的请求ID # 在追踪器里记录这个方案请求 with _tracker_lock: plan_requests[req_id] = { "from": sender, "plan": plan_text, "status": "pending", } # 把方案发到Lead的信箱 BUS.send( sender, "lead", plan_text, "plan_approval_response", {"request_id": req_id, "plan": plan_text}, ) return f"Plan submitted (request_id={req_id}). Waiting for lead approval." return f"Unknown tool: {tool_name}" def _teammate_tools(self) -> list: """队友可用的工具列表(8个,比s09多了2个协议工具)""" return [ # 基础4个 {"name": "bash", "description": "Run a shell command.", "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}}, {"name": "read_file", "description": "Read file contents.", "input_schema": {"type": "object", "properties": {"path": {"type": "string"}}, "required": ["path"]}}, {"name": "write_file", "description": "Write content to file.", "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}}, {"name": "edit_file", "description": "Replace exact text in file.", "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}}, # 通信2个 {"name": "send_message", "description": "Send message to a teammate.", "input_schema": {"type": "object", "properties": {"to": {"type": "string"}, "content": {"type": "string"}, "msg_type": {"type": "string", "enum": list(VALID_MSG_TYPES)}}, "required": ["to", "content"]}}, {"name": "read_inbox", "description": "Read and drain your inbox.", "input_schema": {"type": "object", "properties": {}}}, # 【新增】协议2个 {"name": "shutdown_response", "description": "Respond to a shutdown request. Approve to shut down, reject to keep working.", "input_schema": {"type": "object", "properties": { "request_id": {"type": "string"}, "approve": {"type": "boolean"}, "reason": {"type": "string"} }, "required": ["request_id", "approve"]}}, {"name": "plan_approval", "description": "Submit a plan for lead approval. Provide plan text.", "input_schema": {"type": "object", "properties": { "plan": {"type": "string"} }, "required": ["plan"]}}, ] def list_all(self) -> str: if not self.config["members"]: return "No teammates." lines = [f"Team: {self.config['team_name']}"] for m in self.config["members"]: lines.append(f" {m['name']} ({m['role']}): {m['status']}") return "\n".join(lines) def member_names(self) -> list: return [m["name"] for m in self.config["members"]] TEAM = TeammateManager(TEAM_DIR) # ============================================================ # 基础工具实现(和s09完全一样,不赘述) # ============================================================ def _safe_path(p: str) -> Path: path = (WORKDIR / p).resolve() if not path.is_relative_to(WORKDIR): raise ValueError(f"Path escapes workspace: {p}") return path def _run_bash(command: str) -> str: dangerous = ["rm -rf /", "sudo", "shutdown", "reboot"] if any(d in command for d in dangerous): return "Error: Dangerous command blocked" try: r = subprocess.run(command, shell=True, cwd=WORKDIR, capture_output=True, text=True, timeout=120) out = (r.stdout + r.stderr).strip() return out[:50000] if out else "(no output)" except subprocess.TimeoutExpired: return "Error: Timeout (120s)" def _run_read(path: str, limit: int = None) -> str: try: lines = _safe_path(path).read_text().splitlines() if limit and limit < len(lines): lines = lines[:limit] + [f"... ({len(lines) - limit} more)"] return "\n".join(lines)[:50000] except Exception as e: return f"Error: {e}" def _run_write(path: str, content: str) -> str: try: fp = _safe_path(path) fp.parent.mkdir(parents=True, exist_ok=True) fp.write_text(content) return f"Wrote {len(content)} bytes" except Exception as e: return f"Error: {e}" def _run_edit(path: str, old_text: str, new_text: str) -> str: try: fp = _safe_path(path) c = fp.read_text() if old_text not in c: return f"Error: Text not found in {path}" fp.write_text(c.replace(old_text, new_text, 1)) return f"Edited {path}" except Exception as e: return f"Error: {e}" # ============================================================ # 【新增】Lead端的协议处理函数 # ============================================================ def handle_shutdown_request(teammate: str) -> str: """ Lead发起关闭请求 1. 生成唯一的request_id 2. 在追踪器里记录(状态:pending) 3. 通过信箱发送关闭请求给队友 """ req_id = str(uuid.uuid4())[:8] # 生成8位唯一ID with _tracker_lock: # 在追踪器里登记这个请求 shutdown_requests[req_id] = {"target": teammate, "status": "pending"} # 通过信箱发送关闭请求 BUS.send( "lead", teammate, "Please shut down gracefully.", "shutdown_request", {"request_id": req_id}, ) return f"Shutdown request {req_id} sent to '{teammate}' (status: pending)" def handle_plan_review(request_id: str, approve: bool, feedback: str = "") -> str: """ Lead审批队友的方案 1. 通过request_id找到对应的方案请求 2. 更新追踪器状态 3. 通过信箱把审批结果发回给队友 """ with _tracker_lock: req = plan_requests.get(request_id) if not req: return f"Error: Unknown plan request_id '{request_id}'" # 更新状态 with _tracker_lock: req["status"] = "approved" if approve else "rejected" # 把审批结果发回给队友 BUS.send( "lead", req["from"], feedback, "plan_approval_response", {"request_id": request_id, "approve": approve, "feedback": feedback}, ) return f"Plan {req['status']} for '{req['from']}'" def _check_shutdown_status(request_id: str) -> str: """查看关闭请求的当前状态""" with _tracker_lock: return json.dumps(shutdown_requests.get(request_id, {"error": "not found"})) # ============================================================ # Lead的工具分发表(12个工具,比s09多了3个协议工具) # ============================================================ TOOL_HANDLERS = { # 基础4个(和之前一样) "bash": lambda **kw: _run_bash(kw["command"]), "read_file": lambda **kw: _run_read(kw["path"], kw.get("limit")), "write_file": lambda **kw: _run_write(kw["path"], kw["content"]), "edit_file": lambda **kw: _run_edit(kw["path"], kw["old_text"], kw["new_text"]), # 团队管理3个(和s09一样) "spawn_teammate": lambda **kw: TEAM.spawn(kw["name"], kw["role"], kw["prompt"]), "list_teammates": lambda **kw: TEAM.list_all(), "broadcast": lambda **kw: BUS.broadcast("lead", kw["content"], TEAM.member_names()), # 通信2个(和s09一样) "send_message": lambda **kw: BUS.send("lead", kw["to"], kw["content"], kw.get("msg_type", "message")), "read_inbox": lambda **kw: json.dumps(BUS.read_inbox("lead"), indent=2), # 【新增】协议3个 "shutdown_request": lambda **kw: handle_shutdown_request(kw["teammate"]), "shutdown_response": lambda **kw: _check_shutdown_status(kw.get("request_id", "")), "plan_approval": lambda **kw: handle_plan_review(kw["request_id"], kw["approve"], kw.get("feedback", "")), } # Lead的工具定义列表(12个) TOOLS = [ # 基础4个 {"name": "bash", "description": "Run a shell command.", "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}}, {"name": "read_file", "description": "Read file contents.", "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "limit": {"type": "integer"}}, "required": ["path"]}}, {"name": "write_file", "description": "Write content to file.", "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}}, {"name": "edit_file", "description": "Replace exact text in file.", "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}}, # 团队管理3个 {"name": "spawn_teammate", "description": "Spawn a persistent teammate.", "input_schema": {"type": "object", "properties": {"name": {"type": "string"}, "role": {"type": "string"}, "prompt": {"type": "string"}}, "required": ["name", "role", "prompt"]}}, {"name": "list_teammates", "description": "List all teammates.", "input_schema": {"type": "object", "properties": {}}}, {"name": "broadcast", "description": "Send a message to all teammates.", "input_schema": {"type": "object", "properties": {"content": {"type": "string"}}, "required": ["content"]}}, # 通信2个 {"name": "send_message", "description": "Send a message to a teammate.", "input_schema": {"type": "object", "properties": {"to": {"type": "string"}, "content": {"type": "string"}, "msg_type": {"type": "string", "enum": list(VALID_MSG_TYPES)}}, "required": ["to", "content"]}}, {"name": "read_inbox", "description": "Read and drain the lead's inbox.", "input_schema": {"type": "object", "properties": {}}}, # 【新增】协议3个 {"name": "shutdown_request", "description": "Request a teammate to shut down gracefully. Returns a request_id for tracking.", "input_schema": {"type": "object", "properties": {"teammate": {"type": "string"}}, "required": ["teammate"]}}, {"name": "shutdown_response", "description": "Check the status of a shutdown request by request_id.", "input_schema": {"type": "object", "properties": {"request_id": {"type": "string"}}, "required": ["request_id"]}}, {"name": "plan_approval", "description": "Approve or reject a teammate's plan. Provide request_id + approve + optional feedback.", "input_schema": {"type": "object", "properties": {"request_id": {"type": "string"}, "approve": {"type": "boolean"}, "feedback": {"type": "string"}}, "required": ["request_id", "approve"]}}, ] # ============================================================ # Lead的Agent循环(和s09完全一样!) # ============================================================ def agent_loop(messages: list): while True: # 读Lead的信箱(队友的协议消息也会出现在这里) inbox = BUS.read_inbox("lead") if inbox: messages.append({ "role": "user", "content": f"<inbox>{json.dumps(inbox, indent=2)}</inbox>", }) response = client.messages.create( model=MODEL, system=SYSTEM, messages=messages, tools=TOOLS, max_tokens=8000, ) messages.append({"role": "assistant", "content": response.content}) if response.stop_reason != "tool_use": return results = [] for block in response.content: if block.type == "tool_use": handler = TOOL_HANDLERS.get(block.name) try: output = handler(**block.input) if handler else f"Unknown tool: {block.name}" except Exception as e: output = f"Error: {e}" print(f"> {block.name}:") print(str(output)[:200]) results.append({ "type": "tool_result", "tool_use_id": block.id, "content": str(output), }) messages.append({"role": "user", "content": results}) # ============================================================ # 主程序入口 # ============================================================ if __name__ == "__main__": history = [] while True: try: query = input("\033[36ms10 >> \033[0m") except (EOFError, KeyboardInterrupt): break if query.strip().lower() in ("q", "exit", ""): break if query.strip() == "/team": print(TEAM.list_all()) continue if query.strip() == "/inbox": print(json.dumps(BUS.read_inbox("lead"), indent=2)) continue history.append({"role": "user", "content": query}) agent_loop(history) response_content = history[-1]["content"] if isinstance(response_content, list): for block in response_content: if hasattr(block, "text"): print(block.text) print()

代码逐行拆解

模块一:状态追踪器(协议的"记忆")

整个协议系统的核心就是两个字典:

python
# 关闭请求追踪器 shutdown_requests = {} # 结构:{request_id: {"target": "alice", "status": "pending"}} # 方案审批追踪器 plan_requests = {} # 结构:{request_id: {"from": "alice", "plan": "重构数据库", "status": "pending"}} # 多线程安全锁 _tracker_lock = threading.Lock()

为什么需要锁? 因为Lead和队友在不同线程里运行,可能同时读写追踪器。加锁保证同一时间只有一个线程在操作。

为什么用字典而不用数据库? 因为简单!这个追踪器只在进程运行期间存在。如果要持久化,存成JSON文件就行(和s07的任务管理一样的思路)。

模块二:关闭协议的完整流程

第1步:Lead发起关闭请求

python
def handle_shutdown_request(teammate): # 生成唯一ID(取UUID前8位就够了) req_id = str(uuid.uuid4())[:8] # 比如 "a1b2c3d4" # 在追踪器里登记 shutdown_requests[req_id] = {"target": teammate, "status": "pending"} # 发送请求到队友的信箱 BUS.send("lead", teammate, "Please shut down gracefully.", "shutdown_request", {"request_id": req_id}) # 返回request_id给Lead,方便后续查状态

第2步:队友收到请求,决定同意还是拒绝

队友的信箱里会出现这条消息:

json
{"type": "shutdown_request", "from": "lead", "content": "Please shut down gracefully.", "request_id": "a1b2c3d4"}

模型看到后,会调用 shutdown_response 工具:

python
# 在队友的_exec方法里 if tool_name == "shutdown_response": req_id = args["request_id"] # "a1b2c3d4" —— 和请求对应 approve = args["approve"] # True 或 False # 更新Lead端的追踪器 shutdown_requests[req_id]["status"] = "approved" if approve else "rejected" # 把回复发到Lead的信箱 BUS.send(sender, "lead", reason, "shutdown_response", {"request_id": req_id, "approve": approve})

第3步:队友优雅退出

python
# 在_teammate_loop里 for block in response.content: if block.type == "tool_use": output = self._exec(name, block.name, block.input) # 如果队友同意了关闭,标记should_exit if block.name == "shutdown_response" and block.input.get("approve"): should_exit = True # 不是立即退出,而是完成当前轮次后退出 # 退出后设置状态 member["status"] = "shutdown" if should_exit else "idle"

关键设计:为什么不直接杀线程?

模块三:方案审批的完整流程

第1步:队友提交方案

python
# 在队友的_exec方法里 if tool_name == "plan_approval": plan_text = args["plan"] # 比如 "我想重构数据库" req_id = str(uuid.uuid4())[:8] # 生成唯一ID # 在追踪器里登记 plan_requests[req_id] = {"from": sender, "plan": plan_text, "status": "pending"} # 发到Lead的信箱 BUS.send(sender, "lead", plan_text, "plan_approval_response", {"request_id": req_id, "plan": plan_text}) return f"Plan submitted (request_id={req_id}). Waiting for lead approval." # 队友会看到这个返回值,知道要等待审批

第2步:Lead审批

Lead的信箱里会出现方案。Lead审批后调用 plan_approval 工具:

python
def handle_plan_review(request_id, approve, feedback=""): # 通过request_id找到方案 req = plan_requests[request_id] # 更新状态 req["status"] = "approved" if approve else "rejected" # 把审批结果发回给队友 BUS.send("lead", req["from"], feedback, "plan_approval_response", {"request_id": request_id, "approve": approve, "feedback": feedback})

第3步:队友收到审批结果

队友的信箱里会出现:

json
{"type": "plan_approval_response", "from": "lead", "approve": true, "feedback": "可以,但要注意备份"}

队友的模型看到后,就知道方案被批准了,可以开始执行。

模块四:两个协议的共同模式

text
关闭协议 方案审批 发起方: Lead 队友 接收方: 队友 Lead request_id:Lead生成 队友生成 追踪器: shutdown_requests plan_requests 状态流转: pending -> approved/rejected(完全一样!)

两个协议的底层模式完全相同:

  1. 发起方生成 request_id
  2. 发起方在追踪器里记录(status: pending)
  3. 接收方通过信箱收到请求
  4. 接收方回复时带上同一个 request_id
  5. 追踪器状态更新为 approved 或 rejected

这就是"协议"的本质:约定好的消息格式 + 状态流转规则。


运行效果

示例1:关闭协议

text
s10 >> 创建一个队友alice负责写代码 > spawn_teammate: Spawned 'alice' (role: coder) s10 >> 请关闭alice > shutdown_request: Shutdown request a1b2c3d4 sent to 'alice' (status: pending) [alice] shutdown_response: Shutdown approved > read_inbox: [{"type": "shutdown_response", "from": "alice", "request_id": "a1b2c3d4", "approve": true}] alice已经同意关闭,正在优雅退出。 s10 >> /team Team: default alice (coder): shutdown

示例2:方案审批

text
s10 >> 创建一个队友bob负责重构项目 > spawn_teammate: Spawned 'bob' (role: refactorer) [bob] plan_approval: Plan submitted (request_id=x1y2z3). Waiting for lead approval. (bob在开始工作前自动提交了方案) > read_inbox: [{"type": "plan_approval_response", "from": "bob", "request_id": "x1y2z3", "plan": "我计划将所有数据库查询迁移到ORM模式"}] s10 >> 批准bob的方案 > plan_approval: Plan approved for 'bob' [bob] bash: (开始执行批准的方案...)

示例3:拒绝方案

text
s10 >> 驳回bob的方案,让他换个思路 > plan_approval: Plan rejected for 'bob' [bob] plan_approval: Plan submitted (request_id=a2b3c4). Waiting for lead approval. (bob修改方案后重新提交了)

关键收获

  1. 协议 = 约定 + 追踪:约定好消息格式(request_id + approve),追踪状态变化(pending -> approved/rejected)
  2. request_id 关联模式:请求和回复通过同一个ID对应,这是所有协议的基础模式
  3. 状态机思维:每个请求只有三种状态(pending / approved / rejected),简单清晰
  4. 优雅关闭:不是直接杀线程,而是发请求、等回复、完成手头工作再退出
  5. 方案审批:重大决策前先审批,防止队友"擅自做主"
  6. 线程安全:多线程操作共享数据时要加锁(_tracker_lock)
  7. agent_loop 还是没变:协议只是多了几种消息类型和工具,核心循环完全不动

下一课预告

到目前为止,我们的Agent系统已经具备了:

接下来的课程会探索更高级的主题,比如如何让Agent团队处理更复杂的真实场景。

上一课 09. Agent 团队