一句话总结:一个Agent不够用了,我们需要一个"团队"。每个队友是独立运行的Agent,有自己的线程、对话历史和角色,队友之间通过"信箱"(JSONL文件)互相发消息。
一句话总结:一个Agent不够用了,我们需要一个"团队"。每个队友是独立运行的Agent,有自己的线程、对话历史和角色,队友之间通过"信箱"(JSONL文件)互相发消息。
前面几课的Agent都是"单打独斗"——一个模型处理所有事情。但如果你让它同时写前端、写后端、跑测试,它只能一件一件来,效率很低。
类比: 你一个人开公司,既当CEO又当程序员又当设计师。活少的时候还行,活多了根本忙不过来。
之前(单Agent): 现在(Agent团队):
你 --> Lead Agent 你 --> Lead Agent(老板)
| |
v +---> alice(前端开发)
一个人干所有的活 +---> bob(后端开发)
+---> charlie(测试)
每个人都是独立的Agent!每个队友(Teammate)都是一个独立的Agent,有自己的:
text子Agent(s04): 队友(s09): spawn --> 执行 --> 返回结果 --> 销毁 spawn --> 干活 --> 空闲 --> 干活 --> ... --> 关闭 一次性的,用完就扔 持久的,一直活着,随时可以交流 像临时工 像正式员工 没有自己的通信渠道 有自己的"信箱"
队友之间怎么聊天?答案是文件信箱:
.team/inbox/ 目录
+------------------+
| alice.jsonl | <-- alice的信箱
| bob.jsonl | <-- bob的信箱
| lead.jsonl | <-- Lead的信箱
+------------------+
发消息 = 往对方的.jsonl文件里追加一行JSON
收消息 = 读取自己的.jsonl文件,读完就清空(取信)
举个例子,Lead给alice发消息:
--> 打开 alice.jsonl
--> 追加一行:{"type":"message","from":"lead","content":"去写首页"}
--> 关闭文件
alice读信箱:
--> 读取 alice.jsonl 所有行
--> 解析每行JSON,得到消息列表
--> 清空 alice.jsonl(信取走了)
--> 返回消息列表类比: 就像公司里每个人桌上有一个收件箱,别人给你发文件就放进去,你拿走之后收件箱就空了。
用户输入
|
v
+-------------------+
| Lead Agent | Lead有9个工具:
| (主Agent/老板) | - bash, read_file, write_file, edit_file(基础4个)
+-------------------+ - spawn_teammate(创建队友)
| - list_teammates(查看团队)
| spawn_teammate - send_message(发消息)
| - read_inbox(读信箱)
v - broadcast(群发)
+---+-------+-------+
| | |
v v v
Thread A Thread B Thread C
+--------+ +--------+ +--------+
| alice | | bob | | charlie|
| 前端 | | 后端 | | 测试 |
+--------+ +--------+ +--------+
| | |
v v v
各自的 各自的 各自的
信箱 信箱 信箱
alice.jsonl bob.jsonl charlie.jsonl
消息流向:
Lead --send_message--> alice.jsonl --> alice读取
alice --send_message--> lead.jsonl --> Lead读取
Lead --broadcast--> 所有人的.jsonl --> 所有人读取第7课的结构: 第9课的结构:
+-- 初始化配置 +-- 初始化配置
+-- System Prompt +-- System Prompt(加了团队管理说明)
+-- safe_path +-- safe_path
+-- TaskManager +-- MessageBus(新增!信箱系统)
+-- TOOL_HANDLERS(9个工具) +-- TeammateManager(新增!团队管理)
+-- TOOLS(9个工具) +-- TOOL_HANDLERS(9个工具,换了4个)
+-- agent_loop(没变) +-- TOOLS(9个工具)
+-- main +-- agent_loop(几乎没变!只加了读信箱)
+-- main关键发现: agent_loop的核心逻辑(调API -> 处理工具 -> 循环)完全没变!我们只是:
MessageBus 类来处理消息收发TeammateManager 类来管理队友源代码文件:agents/s09_agent_teams.py
python#!/usr/bin/env python3 """ s09_agent_teams.py - Agent团队 核心思想:多个Agent组成团队,每个队友在自己的线程里运行, 通过JSONL文件信箱互相通信。 子Agent(s04): 创建 -> 执行 -> 返回结果 -> 销毁 队友(s09): 创建 -> 干活 -> 空闲 -> 干活 -> ... -> 关闭 """ import json import os import subprocess import threading import time from pathlib import Path from anthropic import Anthropic from dotenv import load_dotenv # ============================================================ # 初始化配置(和之前几课一样) # ============================================================ load_dotenv(override=True) if os.getenv("ANTHROPIC_BASE_URL"): os.environ.pop("ANTHROPIC_AUTH_TOKEN", None) WORKDIR = Path.cwd() client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL")) MODEL = os.environ["MODEL_ID"] # 团队相关的目录 TEAM_DIR = WORKDIR / ".team" # 团队配置目录 INBOX_DIR = TEAM_DIR / "inbox" # 信箱目录,每个队友一个.jsonl文件 # Lead的系统提示词 SYSTEM = f"You are a team lead at {WORKDIR}. Spawn teammates and communicate via inboxes." # 支持的消息类型(5种) # message: 普通消息 # broadcast: 广播消息(群发) # shutdown_request: 关闭请求(第10课用) # shutdown_response: 关闭回复(第10课用) # plan_approval_response: 方案审批回复(第10课用) VALID_MSG_TYPES = { "message", "broadcast", "shutdown_request", "shutdown_response", "plan_approval_response", } # ============================================================ # MessageBus:信箱系统 # 核心机制:每个队友一个JSONL文件,发消息=追加一行,收消息=读取并清空 # ============================================================ class MessageBus: def __init__(self, inbox_dir: Path): self.dir = inbox_dir # 创建信箱目录 self.dir.mkdir(parents=True, exist_ok=True) def send(self, sender: str, to: str, content: str, msg_type: str = "message", extra: dict = None) -> str: """发送消息:往收件人的JSONL文件里追加一行""" # 检查消息类型是否合法 if msg_type not in VALID_MSG_TYPES: return f"Error: Invalid type '{msg_type}'. Valid: {VALID_MSG_TYPES}" # 构造消息信封(统一格式,方便后续扩展) msg = { "type": msg_type, # 消息类型 "from": sender, # 发送者 "content": content, # 消息内容 "timestamp": time.time(),# 时间戳 } # 如果有额外字段(比如request_id),合并进去 if extra: msg.update(extra) # 关键操作:追加写入收件人的信箱文件 inbox_path = self.dir / f"{to}.jsonl" with open(inbox_path, "a") as f: f.write(json.dumps(msg) + "\n") return f"Sent {msg_type} to {to}" def read_inbox(self, name: str) -> list: """读取信箱:读取所有消息,然后清空信箱(取信模式)""" inbox_path = self.dir / f"{name}.jsonl" if not inbox_path.exists(): return [] # 读取所有行,每行解析为一条消息 messages = [] for line in inbox_path.read_text().strip().splitlines(): if line: messages.append(json.loads(line)) # 重点:读完就清空!这是"取信"模式,不是"查看"模式 # 这样每条消息只会被处理一次 inbox_path.write_text("") return messages def broadcast(self, sender: str, content: str, teammates: list) -> str: """广播:给所有队友发消息(除了自己)""" count = 0 for name in teammates: if name != sender: # 不给自己发 self.send(sender, name, content, "broadcast") count += 1 return f"Broadcast to {count} teammates" # 全局信箱实例 BUS = MessageBus(INBOX_DIR) # ============================================================ # TeammateManager:队友管理器 # 负责创建队友、管理团队配置、运行队友的Agent循环 # ============================================================ class TeammateManager: def __init__(self, team_dir: Path): self.dir = team_dir self.dir.mkdir(exist_ok=True) self.config_path = self.dir / "config.json" # 团队配置文件 self.config = self._load_config() # 加载或初始化配置 self.threads = {} # 存放队友线程的字典 def _load_config(self) -> dict: """加载团队配置,没有就创建一个空的""" if self.config_path.exists(): return json.loads(self.config_path.read_text()) return {"team_name": "default", "members": []} def _save_config(self): """保存团队配置到文件""" self.config_path.write_text(json.dumps(self.config, indent=2)) def _find_member(self, name: str) -> dict: """按名字查找队友""" for m in self.config["members"]: if m["name"] == name: return m return None def spawn(self, name: str, role: str, prompt: str) -> str: """创建一个新队友(或重新激活已有的队友)""" member = self._find_member(name) if member: # 队友已存在,检查状态 if member["status"] not in ("idle", "shutdown"): return f"Error: '{name}' is currently {member['status']}" member["status"] = "working" member["role"] = role else: # 新队友,加入团队 member = {"name": name, "role": role, "status": "working"} self.config["members"].append(member) self._save_config() # 关键:每个队友在自己的线程里运行! # daemon=True 表示主线程退出时子线程也跟着退出 thread = threading.Thread( target=self._teammate_loop, # 队友的Agent循环 args=(name, role, prompt), daemon=True, ) self.threads[name] = thread thread.start() return f"Spawned '{name}' (role: {role})" def _teammate_loop(self, name: str, role: str, prompt: str): """ 队友的Agent循环 —— 这是每个队友独立运行的核心! 和Lead的agent_loop结构一样:调API -> 处理工具 -> 循环 区别是:队友有自己的系统提示词、工具集和消息列表 """ # 队友自己的系统提示词,告诉它自己是谁 sys_prompt = ( f"You are '{name}', role: {role}, at {WORKDIR}. " f"Use send_message to communicate. Complete your task." ) # 队友自己的消息列表(Lead看不到这个!) # 这就是队友的"私人记忆",只有信箱里的消息是共享的 messages = [{"role": "user", "content": prompt}] tools = self._teammate_tools() # 最多循环50轮(防止无限循环) for _ in range(50): # 先检查信箱,有新消息就加到对话里 inbox = BUS.read_inbox(name) for msg in inbox: # 信箱里的消息变成"用户消息",这样模型就能看到了 messages.append({"role": "user", "content": json.dumps(msg)}) try: # 调用API(和Lead一样的模式) response = client.messages.create( model=MODEL, system=sys_prompt, messages=messages, tools=tools, max_tokens=8000, ) except Exception: break messages.append({"role": "assistant", "content": response.content}) # 没有工具调用就结束(任务完成了) if response.stop_reason != "tool_use": break # 处理工具调用(和Lead一样的模式) results = [] for block in response.content: if block.type == "tool_use": # 关键:工具执行时自动注入sender身份 # 这样send_message就知道是谁发的 output = self._exec(name, block.name, block.input) print(f" [{name}] {block.name}: {str(output)[:120]}") results.append({ "type": "tool_result", "tool_use_id": block.id, "content": str(output), }) messages.append({"role": "user", "content": results}) # 循环结束,把队友状态设为idle member = self._find_member(name) if member and member["status"] != "shutdown": member["status"] = "idle" self._save_config() def _exec(self, sender: str, tool_name: str, args: dict) -> str: """ 队友的工具执行器 重点:send_message时自动注入sender身份,队友不需要手动填自己的名字 """ if tool_name == "bash": return _run_bash(args["command"]) if tool_name == "read_file": return _run_read(args["path"]) if tool_name == "write_file": return _run_write(args["path"], args["content"]) if tool_name == "edit_file": return _run_edit(args["path"], args["old_text"], args["new_text"]) if tool_name == "send_message": # sender身份由系统注入,不是模型自己填的 return BUS.send(sender, args["to"], args["content"], args.get("msg_type", "message")) if tool_name == "read_inbox": return json.dumps(BUS.read_inbox(sender), indent=2) return f"Unknown tool: {tool_name}" def _teammate_tools(self) -> list: """队友可用的工具列表(6个)""" return [ # 基础4个工具(和s02一样) {"name": "bash", "description": "Run a shell command.", "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}}, {"name": "read_file", "description": "Read file contents.", "input_schema": {"type": "object", "properties": {"path": {"type": "string"}}, "required": ["path"]}}, {"name": "write_file", "description": "Write content to file.", "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}}, {"name": "edit_file", "description": "Replace exact text in file.", "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}}, # 通信工具(队友专用) {"name": "send_message", "description": "Send message to a teammate.", "input_schema": {"type": "object", "properties": {"to": {"type": "string"}, "content": {"type": "string"}, "msg_type": {"type": "string", "enum": list(VALID_MSG_TYPES)}}, "required": ["to", "content"]}}, {"name": "read_inbox", "description": "Read and drain your inbox.", "input_schema": {"type": "object", "properties": {}}}, ] def list_all(self) -> str: """列出所有队友的状态""" if not self.config["members"]: return "No teammates." lines = [f"Team: {self.config['team_name']}"] for m in self.config["members"]: lines.append(f" {m['name']} ({m['role']}): {m['status']}") return "\n".join(lines) def member_names(self) -> list: """返回所有队友的名字列表""" return [m["name"] for m in self.config["members"]] # 全局团队管理器实例 TEAM = TeammateManager(TEAM_DIR) # ============================================================ # 基础工具实现(和s02完全一样,不赘述) # ============================================================ def _safe_path(p: str) -> Path: path = (WORKDIR / p).resolve() if not path.is_relative_to(WORKDIR): raise ValueError(f"Path escapes workspace: {p}") return path def _run_bash(command: str) -> str: dangerous = ["rm -rf /", "sudo", "shutdown", "reboot"] if any(d in command for d in dangerous): return "Error: Dangerous command blocked" try: r = subprocess.run(command, shell=True, cwd=WORKDIR, capture_output=True, text=True, timeout=120) out = (r.stdout + r.stderr).strip() return out[:50000] if out else "(no output)" except subprocess.TimeoutExpired: return "Error: Timeout (120s)" def _run_read(path: str, limit: int = None) -> str: try: lines = _safe_path(path).read_text().splitlines() if limit and limit < len(lines): lines = lines[:limit] + [f"... ({len(lines) - limit} more)"] return "\n".join(lines)[:50000] except Exception as e: return f"Error: {e}" def _run_write(path: str, content: str) -> str: try: fp = _safe_path(path) fp.parent.mkdir(parents=True, exist_ok=True) fp.write_text(content) return f"Wrote {len(content)} bytes" except Exception as e: return f"Error: {e}" def _run_edit(path: str, old_text: str, new_text: str) -> str: try: fp = _safe_path(path) c = fp.read_text() if old_text not in c: return f"Error: Text not found in {path}" fp.write_text(c.replace(old_text, new_text, 1)) return f"Edited {path}" except Exception as e: return f"Error: {e}" # ============================================================ # Lead的工具分发表(9个工具) # 比队友多了3个管理工具:spawn_teammate, list_teammates, broadcast # ============================================================ TOOL_HANDLERS = { # 基础4个(和之前一样) "bash": lambda **kw: _run_bash(kw["command"]), "read_file": lambda **kw: _run_read(kw["path"], kw.get("limit")), "write_file": lambda **kw: _run_write(kw["path"], kw["content"]), "edit_file": lambda **kw: _run_edit(kw["path"], kw["old_text"], kw["new_text"]), # 团队管理3个(Lead专用) "spawn_teammate": lambda **kw: TEAM.spawn(kw["name"], kw["role"], kw["prompt"]), "list_teammates": lambda **kw: TEAM.list_all(), "broadcast": lambda **kw: BUS.broadcast("lead", kw["content"], TEAM.member_names()), # 通信2个(Lead也有信箱) "send_message": lambda **kw: BUS.send("lead", kw["to"], kw["content"], kw.get("msg_type", "message")), "read_inbox": lambda **kw: json.dumps(BUS.read_inbox("lead"), indent=2), } # Lead的工具定义列表(给API看的) TOOLS = [ {"name": "bash", "description": "Run a shell command.", "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}}, {"name": "read_file", "description": "Read file contents.", "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "limit": {"type": "integer"}}, "required": ["path"]}}, {"name": "write_file", "description": "Write content to file.", "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}}, {"name": "edit_file", "description": "Replace exact text in file.", "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}}, {"name": "spawn_teammate", "description": "Spawn a persistent teammate that runs in its own thread.", "input_schema": {"type": "object", "properties": {"name": {"type": "string"}, "role": {"type": "string"}, "prompt": {"type": "string"}}, "required": ["name", "role", "prompt"]}}, {"name": "list_teammates", "description": "List all teammates with name, role, status.", "input_schema": {"type": "object", "properties": {}}}, {"name": "send_message", "description": "Send a message to a teammate's inbox.", "input_schema": {"type": "object", "properties": {"to": {"type": "string"}, "content": {"type": "string"}, "msg_type": {"type": "string", "enum": list(VALID_MSG_TYPES)}}, "required": ["to", "content"]}}, {"name": "read_inbox", "description": "Read and drain the lead's inbox.", "input_schema": {"type": "object", "properties": {}}}, {"name": "broadcast", "description": "Send a message to all teammates.", "input_schema": {"type": "object", "properties": {"content": {"type": "string"}}, "required": ["content"]}}, ] # ============================================================ # Lead的Agent循环 # 和之前几乎一样!只是在每轮开头多了"读信箱"的操作 # ============================================================ def agent_loop(messages: list): while True: # 【新增】每轮开始前,先检查Lead的信箱 # 队友发来的消息会出现在这里 inbox = BUS.read_inbox("lead") if inbox: # 把信箱里的消息作为"用户消息"插入对话 # 这样模型下一次推理就能看到队友说了什么 messages.append({ "role": "user", "content": f"<inbox>{json.dumps(inbox, indent=2)}</inbox>", }) # 调用API(和之前完全一样) response = client.messages.create( model=MODEL, system=SYSTEM, messages=messages, tools=TOOLS, max_tokens=8000, ) messages.append({"role": "assistant", "content": response.content}) # 没有工具调用就结束 if response.stop_reason != "tool_use": return # 处理工具调用(和之前完全一样) results = [] for block in response.content: if block.type == "tool_use": handler = TOOL_HANDLERS.get(block.name) try: output = handler(**block.input) if handler else f"Unknown tool: {block.name}" except Exception as e: output = f"Error: {e}" print(f"> {block.name}:") print(str(output)[:200]) results.append({ "type": "tool_result", "tool_use_id": block.id, "content": str(output), }) messages.append({"role": "user", "content": results}) # ============================================================ # 主程序入口 # ============================================================ if __name__ == "__main__": history = [] while True: try: query = input("\033[36ms09 >> \033[0m") except (EOFError, KeyboardInterrupt): break if query.strip().lower() in ("q", "exit", ""): break # 本地快捷命令,不经过模型 if query.strip() == "/team": print(TEAM.list_all()) continue if query.strip() == "/inbox": print(json.dumps(BUS.read_inbox("lead"), indent=2)) continue history.append({"role": "user", "content": query}) agent_loop(history) # 打印模型的文字回复 response_content = history[-1]["content"] if isinstance(response_content, list): for block in response_content: if hasattr(block, "text"): print(block.text) print()
这是整个团队通信的基础设施,大概40行代码搞定了一个完整的消息系统。
核心思路: 每个人一个JSONL文件,发消息就追加一行,收消息就全部读出来然后清空。
pythonclass MessageBus: def send(self, sender, to, content, msg_type="message", extra=None): # 1. 构造统一格式的消息信封 msg = {"type": msg_type, "from": sender, "content": content, "timestamp": time.time()} # 2. 追加写入收件人的文件(append模式,不会覆盖已有消息) with open(self.dir / f"{to}.jsonl", "a") as f: f.write(json.dumps(msg) + "\n") def read_inbox(self, name): # 1. 读取文件所有行 # 2. 清空文件(取信模式) # 3. 返回消息列表
为什么用JSONL而不是JSON?
为什么"读完就清空"?
这是团队管理的核心,负责创建队友、运行队友的Agent循环。
spawn方法:创建队友
pythondef spawn(self, name, role, prompt): # 1. 检查队友是否已存在 # 2. 更新或创建配置 # 3. 关键:启动一个新线程来运行队友的Agent循环 thread = threading.Thread( target=self._teammate_loop, args=(name, role, prompt), daemon=True, # 主线程退出时自动结束 ) thread.start()
_teammate_loop方法:队友自己的Agent循环
这个方法是每个队友的"大脑",运行在独立线程中:
pythondef _teammate_loop(self, name, role, prompt): # 1. 队友有自己的系统提示词 sys_prompt = f"You are '{name}', role: {role}..." # 2. 队友有自己的消息列表(私有的,Lead看不到) messages = [{"role": "user", "content": prompt}] # 3. 标准Agent循环(最多50轮) for _ in range(50): # 3a. 先读信箱,把新消息加入对话 inbox = BUS.read_inbox(name) for msg in inbox: messages.append({"role": "user", "content": json.dumps(msg)}) # 3b. 调用API response = client.messages.create(...) # 3c. 没有工具调用就结束 if response.stop_reason != "tool_use": break # 3d. 执行工具 for block in response.content: if block.type == "tool_use": output = self._exec(name, block.name, block.input) # 注意:sender身份是由系统注入的,不是模型自己填的 # 4. 循环结束,状态改为idle
关键设计:sender身份注入
pythondef _exec(self, sender, tool_name, args): if tool_name == "send_message": # sender是系统传入的,不是模型填的 # 这样模型不需要"记住自己是谁",减少出错 return BUS.send(sender, args["to"], args["content"], ...)
Lead的循环和之前几课几乎一模一样,唯一的区别是每轮开始时多了"读信箱":
pythondef agent_loop(messages): while True: # 【唯一新增的部分】读Lead的信箱 inbox = BUS.read_inbox("lead") if inbox: messages.append({ "role": "user", "content": f"<inbox>{json.dumps(inbox, indent=2)}</inbox>", }) # 以下和之前完全一样... response = client.messages.create(...)
text.team/ config.json <-- 团队配置 inbox/ alice.jsonl <-- alice的信箱 bob.jsonl <-- bob的信箱 lead.jsonl <-- Lead的信箱
config.json 的内容:
json{ "team_name": "default", "members": [ {"name": "alice", "role": "coder", "status": "idle"}, {"name": "bob", "role": "tester", "status": "working"} ] }
texts09 >> 创建两个队友:alice负责写代码,bob负责写测试 > spawn_teammate: Spawned 'alice' (role: coder) > spawn_teammate: Spawned 'bob' (role: tester) 好的,我已经创建了两个队友: - alice(角色:coder)—— 负责写代码 - bob(角色:tester)—— 负责写测试
texts09 >> /team Team: default alice (coder): working bob (tester): working
texts09 >> 告诉alice写一个hello.py > send_message: Sent message to alice [alice] write_file: Wrote 45 bytes [alice] send_message: Sent message to lead (稍等片刻,alice完成后会往Lead的信箱发消息) > read_inbox: [{"type": "message", "from": "alice", "content": "已完成hello.py的编写"}]
texts09 >> 通知所有人下班 > broadcast: Broadcast to 2 teammates
队友能互相发消息了,但还缺"规矩":
第10课我们会加入关闭协议和方案审批协议——让团队协作有章可循。