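In one sentence: until now, teammates were passive and only worked when the boss handed out assignments. Now teammates actively scan the task board and claim tasks themselves. We go from "the boss assigns" to "kanban management": to-do items are pinned to a wall, and whoever is free pulls one down and does it.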
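- `scan_unclaimed_tasks()`: scans the task board for tasks nobody has claimed
- `claim_task()`: claims a task atomically, using a lock so two teammates can't grab the same task

In the previous lesson our Lead could direct teammates, but there was one big problem: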
Every task had to be assigned by the Lead personally.
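```text
Lead: "alice, go write the login page"
Lead: "bob, go fix that bug"
Lead: "alice, you're done? Then go write the signup page"
Lead: "bob, you're done too? Then go write the tests"
...
```

The Lead is worn out and has turned into a human scheduler.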
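Analogy: this is like "traditional management" in a factory, where the director personally hands every worker a task. When a worker finishes, they just wait around until the director comes back with more work. As soon as the director gets busy, the director becomes the bottleneck.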
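```text
Traditional mode (the boss assigns):
  Lead: "alice, do task 1"
  Lead: "bob, do task 2"
  Lead: "alice, you're done? Do task 3"

Kanban mode (find your own work):
  Lead:  posts tasks 1, 2 and 3 on the task board
  alice: "I'm free, let me check the board... nobody's on task 1, I'll take it!"
  bob:   "I'm free too... nobody's on task 2, I'll take it!"
  alice: "Task 1 is done, checking the board... nobody's on task 3, I'll take it!"

  Result: the Lead only posts tasks and no longer handles assignment.
  Result: teammates drive themselves, and the Lead has far less to manage.
```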
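Mechanism 1: switching between IDLE and WORK modes

A teammate spends its whole life bouncing back and forth between two states:

```text
WORK mode: doing work (calling the API, writing code, reading files...)
    |
    | work finished, calls the idle tool
    v
IDLE mode: nothing to do, checks the task board every 5 seconds
    |
    | finds a new task / receives a new message
    v
WORK mode: there's work again!
    |
    | no work found for 60 seconds
    v
Shutdown: this teammate exits (so it doesn't waste resources)
```

Mechanism 2: scan + claim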
```text
Task board (.tasks/ directory):
  task_1.json: {status: "pending",     owner: null}     <-- unclaimed, up for grabs
  task_2.json: {status: "in_progress", owner: "alice"}  <-- already claimed by alice
  task_3.json: {status: "pending",     owner: null}     <-- unclaimed, up for grabs

alice goes idle --> scan_unclaimed_tasks() --> finds task_1 and task_3
                --> claim_task(1, "alice") --> success! task_1 now belongs to alice

bob goes idle   --> scan_unclaimed_tasks() --> finds task_3 (task_1 was already taken)
                --> claim_task(3, "bob")   --> success! task_3 now belongs to bob
```
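To make the flow above concrete, here is a minimal, self-contained sketch that builds a throwaway task board in a temporary directory and replays alice's and bob's turns. It re-implements simplified versions of `scan_unclaimed_tasks()` and `claim_task()` that take an explicit `tasks_dir` argument (an assumption for testability; the lesson's versions use the global `TASKS_DIR`), and the task file contents are made up for the demo.

```python
import json
import tempfile
import threading
from pathlib import Path

_claim_lock = threading.Lock()


def scan_unclaimed_tasks(tasks_dir: Path) -> list:
    """Return tasks that are pending, have no owner, and are not blocked."""
    unclaimed = []
    for f in sorted(tasks_dir.glob("task_*.json")):
        task = json.loads(f.read_text())
        if (task.get("status") == "pending"
                and not task.get("owner")
                and not task.get("blockedBy")):
            unclaimed.append(task)
    return unclaimed


def claim_task(tasks_dir: Path, task_id: int, owner: str) -> str:
    """Claim under the lock so read, check and write are not interleaved."""
    with _claim_lock:
        path = tasks_dir / f"task_{task_id}.json"
        task = json.loads(path.read_text())
        if task.get("owner") or task.get("status") != "pending":
            return f"Error: Task {task_id} is not claimable"
        task["owner"] = owner
        task["status"] = "in_progress"
        path.write_text(json.dumps(task, indent=2))
        return f"Claimed task #{task_id} for {owner}"


with tempfile.TemporaryDirectory() as tmp:
    board = Path(tmp)
    # The three tasks from the diagram: task_2 is already owned by alice.
    seed = [
        {"id": 1, "subject": "task 1", "status": "pending", "owner": None},
        {"id": 2, "subject": "task 2", "status": "in_progress", "owner": "alice"},
        {"id": 3, "subject": "task 3", "status": "pending", "owner": None},
    ]
    for t in seed:
        (board / f"task_{t['id']}.json").write_text(json.dumps(t))

    # alice goes idle: she sees tasks 1 and 3 and takes the first one.
    alice_sees = [t["id"] for t in scan_unclaimed_tasks(board)]
    alice_result = claim_task(board, alice_sees[0], "alice")

    # bob goes idle afterwards: task 1 is gone, so only task 3 is left.
    bob_sees = [t["id"] for t in scan_unclaimed_tasks(board)]
    bob_result = claim_task(board, bob_sees[0], "bob")

print(alice_sees, alice_result)  # [1, 3] Claimed task #1 for alice
print(bob_sees, bob_result)      # [3] Claimed task #3 for bob
```

Because bob scans after alice's claim has been written to disk, task_1 has already dropped out of his scan results, exactly as in the diagram.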
Mechanism 3: identity re-injection
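```text
Problem: after a long conversation, the history gets compressed and the agent may "forget who it is".
Fix:     before resuming work, insert an identity reminder at the start of the conversation:

messages = [
    {"role": "user", "content": "You are 'coder', role: backend, team: my-team"},
    {"role": "assistant", "content": "I am coder. Continuing work."},
    ... rest of the conversation ...
]
```

It's like glancing at your badge when you wake up: "Oh right, I'm the backend engineer."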
```text
                +--------------------+
                | Lead posts tasks   |
                | 1, 2, 3 to board   |
                +---------+----------+
                          |
            +-------------+-------------+
            |                           |
      +-----v-----+               +-----v-----+
      |   alice   |               |    bob    |
      |  thread   |               |  thread   |
      +-----+-----+               +-----+-----+
            |                           |
   [gets initial task]         [gets initial task]
            |                           |
      +-----v-----+               +-----v-----+
      | WORK mode |               | WORK mode |
      | calls LLM |               | calls LLM |
      +-----+-----+               +-----+-----+
            |                           |
    [calls idle tool]           [calls idle tool]
            |                           |
      +-----v-----+               +-----v-----+
      | IDLE mode |               | IDLE mode |
      | polls/5s  |               | polls/5s  |
      +-----+-----+               +-----+-----+
            |                           |
      [scans board]               [scans board]
     [finds task_2]              [finds task_3]
       [claim OK!]                 [claim OK!]
            |                           |
      +-----v-----+               +-----v-----+
      | WORK mode |               | WORK mode |
      | continues |               | continues |
      +-----+-----+               +-----+-----+
            |                           |
   [60s, no new work]          [60s, no new work]
            |                           |
      +-----v-----+               +-----v-----+
      | shut down |               | shut down |
      +-----------+               +-----------+
```

| Aspect | Lesson 10 (Team Protocols) | Lesson 11 (Autonomous Agents) |
|---|---|---|
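| Core theme | Rules for how teammates communicate | Teammates find their own work |
| Task assignment | Lead assigns via messages | Teammates scan the task board and claim tasks |
| When a teammate is idle | Waits for a message from the Lead | Actively polls the task board |
| New mechanisms | Shutdown protocol, plan approval | IDLE/WORK switching, scan/claim, identity re-injection |
| Lead's role | Commander (assigns tasks one by one) | Task publisher (just posts tasks) |
| Teammate states | working / shutdown | working / idle / shutdown (adds idle) |
| Concurrency safety | Not needed (the Lead assigns from a single thread) | Needs `_claim_lock` (multiple threads race for tasks) |
| agent_loop | Unchanged | Unchanged (the core loop is still stable) |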
In one sentence: Lesson 10 gave teammates "rules"; Lesson 11 gives them "initiative".
```python
#!/usr/bin/env python3
"""
s11_autonomous_agents.py - Autonomous agents

Core idea:
    Teammates no longer passively wait for the Lead to assign tasks; they
    scan the task board and claim tasks themselves. After finishing their
    work they enter IDLE mode and check the task board every 5 seconds. If
    there's work, they keep going; after 60 seconds with no work, they shut
    down automatically so they don't waste resources.

Teammate lifecycle:

    +-------+
    | spawn |
    +---+---+
        |
        v
    +-------+   tool call    +-------+
    | WORK  | <------------- |  LLM  |
    +---+---+                +-------+
        |
        | LLM says "I'm done" (stop_reason is not tool_use,
        | or it calls the idle tool)
        v
    +--------+
    |  IDLE  |  poll every 5s, wait at most 60s
    +---+----+
        |
        +---> check inbox -> any message? -> back to WORK
        |
        +---> scan .tasks/ -> unclaimed task? -> claim -> back to WORK
        |
        +---> timeout (60s) -> shut down

Identity re-injection:
    After a long conversation gets compressed, the agent may "forget who it
    is". Before resuming work, insert an identity reminder at the start:
        messages = [identity reminder, ...remaining conversation...]
        "You are 'coder', role: backend, team: my-team"
"""

import json
import os
import subprocess
import threading
import time
import uuid
from pathlib import Path

from anthropic import Anthropic
from dotenv import load_dotenv

# ========== Environment setup ==========
load_dotenv(override=True)
if os.getenv("ANTHROPIC_BASE_URL"):
    # When using a third-party compatible API, drop auth variables that may conflict
    os.environ.pop("ANTHROPIC_AUTH_TOKEN", None)

WORKDIR = Path.cwd()                                          # working directory (project root)
client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL"))  # API client
MODEL = os.environ["MODEL_ID"]                                # model ID to use

# ========== Directory layout ==========
TEAM_DIR = WORKDIR / ".team"     # team config directory
INBOX_DIR = TEAM_DIR / "inbox"   # message inbox directory
TASKS_DIR = WORKDIR / ".tasks"   # [NEW] task board directory, holds every task JSON file

# ========== IDLE mode parameters ==========
POLL_INTERVAL = 5   # while idle, poll every 5 seconds
IDLE_TIMEOUT = 60   # shut down automatically after 60 seconds idle

# ========== System prompt ==========
# Tell the Lead: your teammates are autonomous and find work themselves
SYSTEM = f"You are a team lead at {WORKDIR}. Teammates are autonomous -- they find work themselves."

# ========== Valid message types ==========
VALID_MSG_TYPES = {
    "message",                 # plain message
    "broadcast",               # broadcast message
    "shutdown_request",        # shutdown request
    "shutdown_response",       # shutdown reply
    "plan_approval_response",  # plan approval reply
}

# ========== Request trackers ==========
shutdown_requests = {}            # shutdown tracking: {request_id: {target, status}}
plan_requests = {}                # plan approval tracking: {request_id: {from, plan, status}}
_tracker_lock = threading.Lock()  # lock for the trackers (thread safety)

# [CORE NEW] Task claim lock
# Why? Several teammates may now scan the same unclaimed task at the same time.
# Without a lock, two of them could both read "unclaimed" and then both write
# their own name, so one task ends up "claimed" by two people.
# The lock ensures only one teammate can be claiming at any moment.
_claim_lock = threading.Lock()


# ============================================================
# MessageBus: message bus (as before, one JSONL file per teammate as the inbox)
# ============================================================
class MessageBus:
    def __init__(self, inbox_dir: Path):
        self.dir = inbox_dir
        self.dir.mkdir(parents=True, exist_ok=True)

    def send(self, sender: str, to: str, content: str,
             msg_type: str = "message", extra: dict = None) -> str:
        """Send a message to a teammate (append it to their inbox file)."""
        if msg_type not in VALID_MSG_TYPES:
            return f"Error: Invalid type '{msg_type}'. Valid: {VALID_MSG_TYPES}"
        msg = {
            "type": msg_type,
            "from": sender,
            "content": content,
            "timestamp": time.time(),
        }
        if extra:
            msg.update(extra)
        # Append one JSON line to the recipient's inbox file
        inbox_path = self.dir / f"{to}.jsonl"
        with open(inbox_path, "a") as f:
            f.write(json.dumps(msg) + "\n")
        return f"Sent {msg_type} to {to}"

    def read_inbox(self, name: str) -> list:
        """Read and drain your own inbox (cleared after reading to avoid reprocessing)."""
        inbox_path = self.dir / f"{name}.jsonl"
        if not inbox_path.exists():
            return []
        messages = []
        for line in inbox_path.read_text().strip().splitlines():
            if line:
                messages.append(json.loads(line))
        inbox_path.write_text("")  # clear after reading
        return messages

    def broadcast(self, sender: str, content: str, teammates: list) -> str:
        """Send a message to every teammate except the sender."""
        count = 0
        for name in teammates:
            if name != sender:
                self.send(sender, name, content, "broadcast")
                count += 1
        return f"Broadcast to {count} teammates"


BUS = MessageBus(INBOX_DIR)


# ============================================================
# [CORE NEW] Task board scanning and claiming
# ============================================================
def scan_unclaimed_tasks() -> list:
    """
    Scan the task board for every task that can be claimed.

    A task is claimable only if all three conditions hold:
      1. status == "pending"  (waiting to be done)
      2. owner is empty       (nobody has claimed it)
      3. blockedBy is empty   (not blocked by another task)

    Like sticky notes on a kanban board: you can only pull down a note that is
    in the "to do" column, has no name on it, and isn't marked "waiting on X".
    """
    TASKS_DIR.mkdir(exist_ok=True)
    unclaimed = []
    for f in sorted(TASKS_DIR.glob("task_*.json")):
        task = json.loads(f.read_text())
        # All three conditions are required
        if (task.get("status") == "pending"
                and not task.get("owner")
                and not task.get("blockedBy")):
            unclaimed.append(task)
    return unclaimed


def claim_task(task_id: int, owner: str) -> str:
    """
    Claim a task atomically (the lock keeps anyone else from grabbing it mid-claim).

    Why a lock? Imagine two teammates discover task_1 is unclaimed at the same time:
        alice reads task_1 -> status=pending, owner=null -> "Great, I'll take it!"
        bob reads task_1   -> status=pending, owner=null -> "Great, me too!"
        alice writes owner=alice
        bob writes owner=bob   --> alice's claim is overwritten!

    With _claim_lock:
        alice acquires the lock -> reads -> writes owner=alice -> releases the lock
        bob acquires the lock   -> reads -> sees owner=alice -> "already claimed" -> releases the lock
    """
    with _claim_lock:
        path = TASKS_DIR / f"task_{task_id}.json"
        if not path.exists():
            return f"Error: Task {task_id} not found"
        task = json.loads(path.read_text())

        # Has someone else already claimed it?
        if task.get("owner"):
            existing_owner = task.get("owner") or "someone else"
            return f"Error: Task {task_id} has already been claimed by {existing_owner}"

        # Is the status pending?
        if task.get("status") != "pending":
            status = task.get("status")
            return f"Error: Task {task_id} cannot be claimed because its status is '{status}'"

        # Is it blocked by other tasks?
        if task.get("blockedBy"):
            return f"Error: Task {task_id} is blocked by other task(s) and cannot be claimed yet"

        # All good, claim it!
        task["owner"] = owner
        task["status"] = "in_progress"
        path.write_text(json.dumps(task, indent=2))
        return f"Claimed task #{task_id} for {owner}"


# ============================================================
# [CORE NEW] Identity re-injection
# ============================================================
def make_identity_block(name: str, role: str, team_name: str) -> dict:
    """
    Build an "identity reminder" message.

    Why? After a long conversation the history may get compressed (to save
    tokens), and afterwards the agent may "forget" who it is, what its role is,
    and which team it belongs to.

    It's like checking your badge on your first day at work, or re-reading
    your job description after a long vacation.

    Returns a user message that tells the agent its identity.
    """
    return {
        "role": "user",
        "content": f"<identity>You are '{name}', role: {role}, team: {team_name}. Continue your work.</identity>",
    }


# ============================================================
# TeammateManager: teammate manager (now with autonomy)
# ============================================================
class TeammateManager:
    def __init__(self, team_dir: Path):
        self.dir = team_dir
        self.dir.mkdir(exist_ok=True)
        self.config_path = self.dir / "config.json"
        self.config = self._load_config()
        self.threads = {}  # one thread per teammate

    def _load_config(self) -> dict:
        """Load the team config file."""
        if self.config_path.exists():
            return json.loads(self.config_path.read_text())
        return {"team_name": "default", "members": []}

    def _save_config(self):
        """Save the team config file."""
        self.config_path.write_text(json.dumps(self.config, indent=2))

    def _find_member(self, name: str) -> dict:
        """Look up a teammate by name."""
        for m in self.config["members"]:
            if m["name"] == name:
                return m
        return None

    def _set_status(self, name: str, status: str):
        """Update a teammate's status (working/idle/shutdown)."""
        member = self._find_member(name)
        if member:
            member["status"] = status
            self._save_config()

    def spawn(self, name: str, role: str, prompt: str) -> str:
        """
        Create and start an autonomous teammate.

        Difference from before: teammates now enter IDLE mode and scan the task
        board on their own. The Lead only needs to spawn them once; they keep
        working until there is nothing left to do.
        """
        member = self._find_member(name)
        if member:
            # The teammate already exists: check its status
            if member["status"] not in ("idle", "shutdown"):
                return f"Error: '{name}' is currently {member['status']}"
            member["status"] = "working"
            member["role"] = role
        else:
            # New teammate
            member = {"name": name, "role": role, "status": "working"}
            self.config["members"].append(member)
        self._save_config()

        # Start a daemon thread that runs the teammate's main loop
        thread = threading.Thread(
            target=self._loop,          # the teammate's main loop
            args=(name, role, prompt),
            daemon=True,                # ends automatically when the main program exits
        )
        self.threads[name] = thread
        thread.start()
        return f"Spawned '{name}' (role: {role})"

    def _loop(self, name: str, role: str, prompt: str):
        """
        [CORE] The teammate's main loop: endless switching between WORK and IDLE.

        This is the heart of the whole autonomy mechanism:
          1. WORK phase: a normal agent loop (call the LLM, run tools)
          2. IDLE phase: poll every 5 seconds (check the inbox + scan the task board)
          3. New work found -> back to WORK; no work for 60 seconds -> shut down
        """
        team_name = self.config["team_name"]

        # Teammate system prompt: tells it "you will auto-claim tasks"
        sys_prompt = (
            f"You are '{name}', role: {role}, team: {team_name}, at {WORKDIR}. "
            f"Use idle tool when you have no more work. You will auto-claim new tasks."
        )
        messages = [{"role": "user", "content": prompt}]
        tools = self._teammate_tools()

        while True:
            # ==========================================
            # WORK phase: the standard agent tool loop
            # ==========================================
            for _ in range(50):  # at most 50 rounds of tool calls (guards against endless loops)
                # First check the inbox for new messages
                inbox = BUS.read_inbox(name)
                for msg in inbox:
                    # On a shutdown request, exit gracefully right away
                    if msg.get("type") == "shutdown_request":
                        self._set_status(name, "shutdown")
                        return
                    # Any other message goes into the conversation context
                    messages.append({"role": "user", "content": json.dumps(msg)})

                # Call the LLM
                try:
                    response = client.messages.create(
                        model=MODEL,
                        system=sys_prompt,
                        messages=messages,
                        tools=tools,
                        max_tokens=8000,
                    )
                except Exception:
                    # API call failed: mark as idle and end this thread
                    self._set_status(name, "idle")
                    return

                # Append the LLM's reply to the conversation history
                messages.append({"role": "assistant", "content": response.content})

                # If the LLM called no tools, it is "done talking": leave the WORK phase
                if response.stop_reason != "tool_use":
                    break

                # Run the tools the LLM called
                results = []
                idle_requested = False
                for block in response.content:
                    if block.type == "tool_use":
                        if block.name == "idle":
                            # [KEY] idle is a "control signal": the LLM is saying
                            # "I have no more work", which triggers the IDLE phase
                            idle_requested = True
                            output = "Entering idle phase. Will poll for new tasks."
                        else:
                            # Run any other tool normally
                            output = self._exec(name, block.name, block.input)
                        print(f"  [{name}] {block.name}: {str(output)[:120]}")
                        results.append({
                            "type": "tool_result",
                            "tool_use_id": block.id,
                            "content": str(output),
                        })
                messages.append({"role": "user", "content": results})

                # If the LLM asked to idle, leave the WORK loop and enter the IDLE phase
                if idle_requested:
                    break

            # ==========================================
            # IDLE phase: poll the inbox and the task board
            # ==========================================
            self._set_status(name, "idle")
            resume = False
            polls = IDLE_TIMEOUT // max(POLL_INTERVAL, 1)  # 60 / 5 = 12 polls

            for _ in range(polls):
                time.sleep(POLL_INTERVAL)  # look once every 5 seconds

                # Priority 1: check the inbox
                inbox = BUS.read_inbox(name)
                if inbox:
                    for msg in inbox:
                        if msg.get("type") == "shutdown_request":
                            self._set_status(name, "shutdown")
                            return
                        messages.append({"role": "user", "content": json.dumps(msg)})
                    resume = True
                    break

                # Priority 2: scan the task board
                unclaimed = scan_unclaimed_tasks()
                if unclaimed:
                    task = unclaimed[0]  # take the first unclaimed task
                    result = claim_task(task["id"], name)
                    if result.startswith("Error:"):
                        # Claim failed (someone got there first): keep polling
                        continue

                    # Claimed! Build the task prompt
                    task_prompt = (
                        f"<auto-claimed>Task #{task['id']}: {task['subject']}\n"
                        f"{task.get('description', '')}</auto-claimed>"
                    )

                    # [IDENTITY RE-INJECTION] A very short history may have been compressed:
                    # put the identity back at the front
                    if len(messages) <= 3:
                        messages.insert(0, make_identity_block(name, role, team_name))
                        messages.insert(1, {"role": "assistant", "content": f"I am {name}. Continuing."})

                    # Add the new task to the conversation
                    messages.append({"role": "user", "content": task_prompt})
                    messages.append({"role": "assistant", "content": f"Claimed task #{task['id']}. Working on it."})
                    resume = True
                    break

            if not resume:
                # No work found within 60 seconds: shut down automatically
                # instead of spinning idle and holding on to resources
                self._set_status(name, "shutdown")
                return

            # Found new work: back to WORK mode
            self._set_status(name, "working")

    def _exec(self, sender: str, tool_name: str, args: dict) -> str:
        """Execute a tool call (dispatch to the matching tool function)."""
        if tool_name == "bash":
            return _run_bash(args["command"])
        if tool_name == "read_file":
            return _run_read(args["path"])
        if tool_name == "write_file":
            return _run_write(args["path"], args["content"])
        if tool_name == "edit_file":
            return _run_edit(args["path"], args["old_text"], args["new_text"])
        if tool_name == "send_message":
            return BUS.send(sender, args["to"], args["content"], args.get("msg_type", "message"))
        if tool_name == "read_inbox":
            return json.dumps(BUS.read_inbox(sender), indent=2)
        if tool_name == "shutdown_response":
            # Handle the reply to a shutdown request
            req_id = args["request_id"]
            with _tracker_lock:
                if req_id in shutdown_requests:
                    shutdown_requests[req_id]["status"] = "approved" if args["approve"] else "rejected"
            BUS.send(
                sender, "lead", args.get("reason", ""),
                "shutdown_response", {"request_id": req_id, "approve": args["approve"]},
            )
            return f"Shutdown {'approved' if args['approve'] else 'rejected'}"
        if tool_name == "plan_approval":
            # Submit a plan and wait for approval
            plan_text = args.get("plan", "")
            req_id = str(uuid.uuid4())[:8]
            with _tracker_lock:
                plan_requests[req_id] = {"from": sender, "plan": plan_text, "status": "pending"}
            BUS.send(
                sender, "lead", plan_text, "plan_approval_response",
                {"request_id": req_id, "plan": plan_text},
            )
            return f"Plan submitted (request_id={req_id}). Waiting for approval."
        if tool_name == "claim_task":
            # Teammates can also claim explicitly via a tool call (not only via IDLE auto-claim)
            return claim_task(args["task_id"], sender)
        return f"Unknown tool: {tool_name}"

    def _teammate_tools(self) -> list:
        """Tools available to teammates (fewer management tools than the Lead; idle and claim_task are new)."""
        return [
            # Basic tools (same as before)
            {"name": "bash", "description": "Run a shell command.",
             "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
            {"name": "read_file", "description": "Read file contents.",
             "input_schema": {"type": "object", "properties": {"path": {"type": "string"}}, "required": ["path"]}},
            {"name": "write_file", "description": "Write content to file.",
             "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}},
            {"name": "edit_file", "description": "Replace exact text in file.",
             "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}},
            # Communication tools
            {"name": "send_message", "description": "Send message to a teammate.",
             "input_schema": {"type": "object", "properties": {"to": {"type": "string"}, "content": {"type": "string"}, "msg_type": {"type": "string", "enum": list(VALID_MSG_TYPES)}}, "required": ["to", "content"]}},
            {"name": "read_inbox", "description": "Read and drain your inbox.",
             "input_schema": {"type": "object", "properties": {}}},
            # Protocol tools (same as Lesson 10)
            {"name": "shutdown_response", "description": "Respond to a shutdown request.",
             "input_schema": {"type": "object", "properties": {"request_id": {"type": "string"}, "approve": {"type": "boolean"}, "reason": {"type": "string"}}, "required": ["request_id", "approve"]}},
            {"name": "plan_approval", "description": "Submit a plan for lead approval.",
             "input_schema": {"type": "object", "properties": {"plan": {"type": "string"}}, "required": ["plan"]}},
            # [NEW] Autonomy tools
            {"name": "idle", "description": "Signal that you have no more work. Enters idle polling phase.",
             "input_schema": {"type": "object", "properties": {}}},
            {"name": "claim_task", "description": "Claim a task from the task board by ID.",
             "input_schema": {"type": "object", "properties": {"task_id": {"type": "integer"}}, "required": ["task_id"]}},
        ]

    def list_all(self) -> str:
        """List every teammate and its status."""
        if not self.config["members"]:
            return "No teammates."
        lines = [f"Team: {self.config['team_name']}"]
        for m in self.config["members"]:
            lines.append(f"  {m['name']} ({m['role']}): {m['status']}")
        return "\n".join(lines)

    def member_names(self) -> list:
        """Return the names of all teammates."""
        return [m["name"] for m in self.config["members"]]


TEAM = TeammateManager(TEAM_DIR)


# ============================================================
# Basic tool implementations (exactly as before)
# ============================================================
def _safe_path(p: str) -> Path:
    """Path safety check: refuse paths that escape the working directory."""
    path = (WORKDIR / p).resolve()
    if not path.is_relative_to(WORKDIR):
        raise ValueError(f"Path escapes workspace: {p}")
    return path


def _run_bash(command: str) -> str:
    """Run a shell command (with a dangerous-command filter and a timeout)."""
    dangerous = ["rm -rf /", "sudo", "shutdown", "reboot"]
    if any(d in command for d in dangerous):
        return "Error: Dangerous command blocked"
    try:
        r = subprocess.run(
            command, shell=True, cwd=WORKDIR,
            capture_output=True, text=True, timeout=120,
        )
        out = (r.stdout + r.stderr).strip()
        return out[:50000] if out else "(no output)"
    except subprocess.TimeoutExpired:
        return "Error: Timeout (120s)"


def _run_read(path: str, limit: int = None) -> str:
    """Read a file's contents."""
    try:
        lines = _safe_path(path).read_text().splitlines()
        if limit and limit < len(lines):
            lines = lines[:limit] + [f"... ({len(lines) - limit} more)"]
        return "\n".join(lines)[:50000]
    except Exception as e:
        return f"Error: {e}"


def _run_write(path: str, content: str) -> str:
    """Write content to a file."""
    try:
        fp = _safe_path(path)
        fp.parent.mkdir(parents=True, exist_ok=True)
        fp.write_text(content)
        return f"Wrote {len(content)} bytes"
    except Exception as e:
        return f"Error: {e}"


def _run_edit(path: str, old_text: str, new_text: str) -> str:
    """Edit a file (find and replace one occurrence of some text)."""
    try:
        fp = _safe_path(path)
        c = fp.read_text()
        if old_text not in c:
            return f"Error: Text not found in {path}"
        fp.write_text(c.replace(old_text, new_text, 1))
        return f"Edited {path}"
    except Exception as e:
        return f"Error: {e}"


# ============================================================
# Lead-only protocol handlers (same as Lesson 10)
# ============================================================
def handle_shutdown_request(teammate: str) -> str:
    """The Lead sends a shutdown request."""
    req_id = str(uuid.uuid4())[:8]
    with _tracker_lock:
        shutdown_requests[req_id] = {"target": teammate, "status": "pending"}
    BUS.send(
        "lead", teammate, "Please shut down gracefully.",
        "shutdown_request", {"request_id": req_id},
    )
    return f"Shutdown request {req_id} sent to '{teammate}'"


def handle_plan_review(request_id: str, approve: bool, feedback: str = "") -> str:
    """The Lead approves or rejects a plan submitted by a teammate."""
    with _tracker_lock:
        req = plan_requests.get(request_id)
    if not req:
        return f"Error: Unknown plan request_id '{request_id}'"
    with _tracker_lock:
        req["status"] = "approved" if approve else "rejected"
    BUS.send(
        "lead", req["from"], feedback, "plan_approval_response",
        {"request_id": request_id, "approve": approve, "feedback": feedback},
    )
    return f"Plan {req['status']} for '{req['from']}'"


def _check_shutdown_status(request_id: str) -> str:
    """Look up the status of a shutdown request."""
    with _tracker_lock:
        return json.dumps(shutdown_requests.get(request_id, {"error": "not found"}))


# ============================================================
# The Lead's tool dispatch table (14 tools)
# ============================================================
# The Lead and the teammates share one task board, so the Lead can claim tasks too
TOOL_HANDLERS = {
    "bash":             lambda **kw: _run_bash(kw["command"]),
    "read_file":        lambda **kw: _run_read(kw["path"], kw.get("limit")),
    "write_file":       lambda **kw: _run_write(kw["path"], kw["content"]),
    "edit_file":        lambda **kw: _run_edit(kw["path"], kw["old_text"], kw["new_text"]),
    "spawn_teammate":   lambda **kw: TEAM.spawn(kw["name"], kw["role"], kw["prompt"]),
    "list_teammates":   lambda **kw: TEAM.list_all(),
    "send_message":     lambda **kw: BUS.send("lead", kw["to"], kw["content"], kw.get("msg_type", "message")),
    "read_inbox":       lambda **kw: json.dumps(BUS.read_inbox("lead"), indent=2),
    "broadcast":        lambda **kw: BUS.broadcast("lead", kw["content"], TEAM.member_names()),
    "shutdown_request": lambda **kw: handle_shutdown_request(kw["teammate"]),
    "shutdown_response": lambda **kw: _check_shutdown_status(kw.get("request_id", "")),
    "plan_approval":    lambda **kw: handle_plan_review(kw["request_id"], kw["approve"], kw.get("feedback", "")),
    "idle":             lambda **kw: "Lead does not idle.",              # the Lead never idles
    "claim_task":       lambda **kw: claim_task(kw["task_id"], "lead"),  # the Lead can claim tasks too
}

# The Lead's tool definitions (sent to the API)
TOOLS = [
    {"name": "bash", "description": "Run a shell command.",
     "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
    {"name": "read_file", "description": "Read file contents.",
     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "limit": {"type": "integer"}}, "required": ["path"]}},
    {"name": "write_file", "description": "Write content to file.",
     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}},
    {"name": "edit_file", "description": "Replace exact text in file.",
     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}},
    {"name": "spawn_teammate", "description": "Spawn an autonomous teammate.",
     "input_schema": {"type": "object", "properties": {"name": {"type": "string"}, "role": {"type": "string"}, "prompt": {"type": "string"}}, "required": ["name", "role", "prompt"]}},
    {"name": "list_teammates", "description": "List all teammates.",
     "input_schema": {"type": "object", "properties": {}}},
    {"name": "send_message", "description": "Send a message to a teammate.",
     "input_schema": {"type": "object", "properties": {"to": {"type": "string"}, "content": {"type": "string"}, "msg_type": {"type": "string", "enum": list(VALID_MSG_TYPES)}}, "required": ["to", "content"]}},
    {"name": "read_inbox", "description": "Read and drain the lead's inbox.",
     "input_schema": {"type": "object", "properties": {}}},
    {"name": "broadcast", "description": "Send a message to all teammates.",
     "input_schema": {"type": "object", "properties": {"content": {"type": "string"}}, "required": ["content"]}},
    {"name": "shutdown_request", "description": "Request a teammate to shut down.",
     "input_schema": {"type": "object", "properties": {"teammate": {"type": "string"}}, "required": ["teammate"]}},
    {"name": "shutdown_response", "description": "Check shutdown request status.",
     "input_schema": {"type": "object", "properties": {"request_id": {"type": "string"}}, "required": ["request_id"]}},
    {"name": "plan_approval", "description": "Approve or reject a teammate's plan.",
     "input_schema": {"type": "object", "properties": {"request_id": {"type": "string"}, "approve": {"type": "boolean"}, "feedback": {"type": "string"}}, "required": ["request_id", "approve"]}},
    {"name": "idle", "description": "Enter idle state (for lead -- rarely used).",
     "input_schema": {"type": "object", "properties": {}}},
    {"name": "claim_task", "description": "Claim a task from the board by ID.",
     "input_schema": {"type": "object", "properties": {"task_id": {"type": "integer"}}, "required": ["task_id"]}},
]


# ============================================================
# The Lead's main loop (same as before, unchanged!)
# ============================================================
def agent_loop(messages: list):
    """
    The Lead's agent loop.

    Note: this function has barely changed since Lesson 2!
    Every new feature is added by "adding tools" and "giving teammates new
    abilities", while the core loop stays stable.
    """
    while True:
        # First check the Lead's inbox
        inbox = BUS.read_inbox("lead")
        if inbox:
            messages.append({
                "role": "user",
                "content": f"<inbox>{json.dumps(inbox, indent=2)}</inbox>",
            })

        # Call the LLM
        response = client.messages.create(
            model=MODEL,
            system=SYSTEM,
            messages=messages,
            tools=TOOLS,
            max_tokens=8000,
        )
        messages.append({"role": "assistant", "content": response.content})

        # If the LLM needs no tools, this turn is over
        if response.stop_reason != "tool_use":
            return

        # Run the tool calls
        results = []
        for block in response.content:
            if block.type == "tool_use":
                handler = TOOL_HANDLERS.get(block.name)
                try:
                    output = handler(**block.input) if handler else f"Unknown tool: {block.name}"
                except Exception as e:
                    output = f"Error: {e}"
                print(f"> {block.name}:")
                print(str(output)[:200])
                results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": str(output),
                })
        messages.append({"role": "user", "content": results})


# ============================================================
# Entry point: interactive command line
# ============================================================
if __name__ == "__main__":
    history = []
    while True:
        try:
            query = input("\033[36ms11 >> \033[0m")
        except (EOFError, KeyboardInterrupt):
            break
        if query.strip().lower() in ("q", "exit", ""):
            break

        # Shortcut: show team status
        if query.strip() == "/team":
            print(TEAM.list_all())
            continue
        # Shortcut: show the Lead's inbox
        if query.strip() == "/inbox":
            print(json.dumps(BUS.read_inbox("lead"), indent=2))
            continue
        # [NEW] Shortcut: show the task board
        if query.strip() == "/tasks":
            TASKS_DIR.mkdir(exist_ok=True)
            for f in sorted(TASKS_DIR.glob("task_*.json")):
                t = json.loads(f.read_text())
                # A different marker for each task status
                marker = {
                    "pending": "[ ]",      # pending
                    "in_progress": "[>]",  # in progress
                    "completed": "[x]",    # completed
                }.get(t["status"], "[?]")
                owner = f" @{t['owner']}" if t.get("owner") else ""
                print(f"  {marker} #{t['id']}: {t['subject']}{owner}")
            continue

        # Normal conversation
        history.append({"role": "user", "content": query})
        agent_loop(history)
        response_content = history[-1]["content"]
        if isinstance(response_content, list):
            for block in response_content:
                if hasattr(block, "text"):
                    print(block.text)
        print()
```
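```python
import threading
from pathlib import Path

WORKDIR = Path.cwd()  # project root, as in the full listing

TASKS_DIR = WORKDIR / ".tasks"   # task board directory, one JSON file per task
POLL_INTERVAL = 5                # in IDLE mode, poll every 5 seconds
IDLE_TIMEOUT = 60                # shut down after more than 60 seconds idle
_claim_lock = threading.Lock()   # claim lock, stops two teammates grabbing the same task
```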
This is the infrastructure of a kanban system: a wall (the `.tasks/` directory), a timer (`POLL_INTERVAL`), and a "first come, first served" rule (`_claim_lock`).
```python
def scan_unclaimed_tasks() -> list:
    unclaimed = []
    for f in sorted(TASKS_DIR.glob("task_*.json")):
        task = json.loads(f.read_text())
        if (task.get("status") == "pending"      # condition 1: pending
                and not task.get("owner")        # condition 2: nobody has claimed it
                and not task.get("blockedBy")):  # condition 3: not blocked
            unclaimed.append(task)
    return unclaimed
```
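Core logic: walk through every task file in the `.tasks/` directory and keep the tasks that satisfy all three conditions. It's like glancing at the kanban wall and looking only at the notes in the "to do" column that have no name on them.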
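One detail worth knowing: `sorted()` on `Path` objects compares the file names as strings, so once the board grows past nine tasks, `task_10.json` sorts before `task_2.json`, and "take the first unclaimed task" no longer means "take the lowest ID". If that ordering matters to you, one option (an assumption, not something the lesson does) is to sort by the numeric suffix. `task_files_by_id` is a hypothetical helper, and it assumes every file is named `task_<integer>.json`.

```python
import tempfile
from pathlib import Path


def task_files_by_id(tasks_dir: Path) -> list:
    """Return task_*.json paths ordered by numeric ID instead of by string.

    Assumes every matching file is named task_<integer>.json.
    """
    # "task_10.json" -> stem "task_10" -> "10" -> 10
    return sorted(tasks_dir.glob("task_*.json"),
                  key=lambda p: int(p.stem.split("_", 1)[1]))


with tempfile.TemporaryDirectory() as tmp:
    board = Path(tmp)
    for i in (1, 2, 10):
        (board / f"task_{i}.json").write_text("{}")
    by_string = [p.name for p in sorted(board.glob("task_*.json"))]
    by_id = [p.name for p in task_files_by_id(board)]

print(by_string)  # ['task_1.json', 'task_10.json', 'task_2.json']
print(by_id)      # ['task_1.json', 'task_2.json', 'task_10.json']
```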
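```python
def claim_task(task_id: int, owner: str) -> str:
    with _claim_lock:                         # acquire the lock (only one thread at a time)
        path = TASKS_DIR / f"task_{task_id}.json"
        task = json.loads(path.read_text())
        if task.get("owner"):                 # already claimed by someone else
            return "Error: ..."
        task["owner"] = owner                 # record who claimed it
        task["status"] = "in_progress"        # mark it as in progress
        path.write_text(json.dumps(task))
        return f"Claimed task #{task_id}"     # leaving the with block releases the lock
```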
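Why a lock? Without one, two threads can both read "unclaimed" at the same moment and then both write their own name, with the later write overwriting the earlier one. With `with _claim_lock`, only one thread at a time can do the read and the write, so the read-check-write sequence is atomic with respect to every other `claim_task()` call in the process.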
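A quick way to watch the lock do its job is to let several threads race for the same task. The sketch below is an illustration under simplified assumptions: an in-memory dict stands in for `task_1.json`, `run_race` is a hypothetical helper, and a short `time.sleep` widens the gap between the check and the write so the race is easy to hit. With the lock, exactly one thread wins.

```python
import contextlib
import threading
import time


def run_race(use_lock: bool, n_threads: int = 8):
    """Let n_threads claim the same task at once; return (winners, final owner)."""
    task = {"id": 1, "status": "pending", "owner": None}  # stand-in for task_1.json
    lock = threading.Lock()
    start = threading.Barrier(n_threads)  # releases all threads at the same moment
    winners = []                          # every thread that believed its claim worked

    def claim(name: str) -> None:
        start.wait()
        guard = lock if use_lock else contextlib.nullcontext()
        with guard:
            # read + check
            if task["owner"] is None and task["status"] == "pending":
                time.sleep(0.01)  # simulate slow file I/O between check and write
                # write
                task["owner"] = name
                task["status"] = "in_progress"
                winners.append(name)

    threads = [threading.Thread(target=claim, args=(f"agent{i}",)) for i in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return winners, task["owner"]


locked_winners, locked_owner = run_race(use_lock=True)
unlocked_winners, _ = run_race(use_lock=False)
print(len(locked_winners), locked_winners == [locked_owner])  # 1 True
print(len(unlocked_winners))  # usually > 1: several threads "won" the same task
```

Without the lock, the unlocked run typically reports several winners even though the task ends up with a single owner, which is exactly the "overwritten claim" problem described above.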
```python
def make_identity_block(name, role, team_name):
    return {
        "role": "user",
        "content": f"<identity>You are '{name}', role: {role}, team: {team_name}. Continue your work.</identity>",
    }
```
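When is it used? When the conversation history is very short (`len(messages) <= 3`), it may have been compressed. In that case this identity reminder is inserted at the start of the conversation so the agent "remembers who it is". In this lesson the check runs right after a successful auto-claim in the IDLE phase.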
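The check and the two inserts can be pulled out into a small helper so they're easy to test. This is a sketch: `maybe_reinject_identity` and its `threshold` parameter are hypothetical names for illustration; the lesson inlines this logic in `_loop` with a fixed threshold of 3.

```python
def make_identity_block(name: str, role: str, team_name: str) -> dict:
    """Same identity reminder as in the lesson."""
    return {
        "role": "user",
        "content": f"<identity>You are '{name}', role: {role}, team: {team_name}. Continue your work.</identity>",
    }


def maybe_reinject_identity(messages: list, name: str, role: str,
                            team_name: str, threshold: int = 3) -> bool:
    """Prepend the identity pair if the history looks compressed; mutates messages."""
    if len(messages) > threshold:
        return False  # long history: assume the agent still knows who it is
    messages.insert(0, make_identity_block(name, role, team_name))
    # A short assistant reply keeps user/assistant turns alternating
    messages.insert(1, {"role": "assistant", "content": f"I am {name}. Continuing."})
    return True


history = [{"role": "user", "content": "<auto-claimed>Task #3: Write tests</auto-claimed>"}]
print(maybe_reinject_identity(history, "coder", "backend", "my-team"))  # True
print([m["role"] for m in history])  # ['user', 'assistant', 'user']
```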
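```python
# IDLE phase begins (excerpt from TeammateManager._loop; the full listing also
# appends inbox messages and the claimed task to the conversation)
self._set_status(name, "idle")
resume = False
polls = IDLE_TIMEOUT // max(POLL_INTERVAL, 1)   # at most 12 polls (60s / 5s)
for _ in range(polls):
    time.sleep(POLL_INTERVAL)                   # wait 5 seconds

    # Priority 1: check the inbox
    inbox = BUS.read_inbox(name)
    if inbox:
        resume = True                           # got a message! back to work
        break

    # Priority 2: scan the task board
    unclaimed = scan_unclaimed_tasks()
    if unclaimed:
        result = claim_task(unclaimed[0]["id"], name)
        if not result.startswith("Error:"):
            resume = True                       # claimed it! back to work
            break

if not resume:
    self._set_status(name, "shutdown")          # no work left, shut down
    return
```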
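This code is the soul of "autonomy": after finishing its work, a teammate doesn't just sit and wait; every 5 seconds it actively checks for new work. It looks at the inbox first (maybe the Lead sent new instructions), then at the task board (maybe there's a new task to claim). If it finds something within 60 seconds it keeps working; otherwise it exits gracefully.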
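To reason about (or unit-test) the polling logic without an LLM or real waiting, the same control flow can be written as a function with injected callbacks. Everything here is hypothetical scaffolding: `idle_phase`, `check_inbox`, `try_claim` and the injectable `sleep` stand in for the IDLE block, `BUS.read_inbox`, the scan+claim step and `time.sleep`; the shutdown-request handling is left out.

```python
import time
from typing import Callable


def idle_phase(check_inbox: Callable[[], list],
               try_claim: Callable[[], bool],
               poll_interval: int = 5,
               idle_timeout: int = 60,
               sleep: Callable[[float], None] = time.sleep) -> str:
    """Return "resume" as soon as there is work, or "shutdown" after idle_timeout."""
    polls = idle_timeout // max(poll_interval, 1)  # 60 // 5 = 12 polls
    for _ in range(polls):
        sleep(poll_interval)
        if check_inbox():   # priority 1: new messages
            return "resume"
        if try_claim():     # priority 2: an unclaimed task we managed to claim
            return "resume"
    return "shutdown"


# Usage with fakes: no real waiting, and a task "appears" on the 3rd poll.
sleeps = []
attempts = {"n": 0}


def fake_claim() -> bool:
    attempts["n"] += 1
    return attempts["n"] == 3


outcome = idle_phase(lambda: [], fake_claim, sleep=sleeps.append)
print(outcome, len(sleeps))  # resume 3

quiet = []
final = idle_phase(lambda: [], lambda: False, sleep=quiet.append)
print(final, len(quiet))  # shutdown 12
```

Injecting `sleep` is the design choice that makes this testable: the fake simply records each requested interval, so a full 60-second timeout runs instantly.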
```python
if block.name == "idle":
    idle_requested = True
    output = "Entering idle phase. Will poll for new tasks."
```
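Key design: `idle` isn't a real "tool" (it performs no action); it's a control signal. By calling `idle`, the LLM tells the system "I think I'm done", and the system then moves it into the IDLE polling phase. It's a "contract" between the LLM and the system.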
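Here is a self-contained sketch of that dispatch step. `ToolUse` is a hypothetical stand-in for the SDK's tool-use content block (only the `id`, `name` and `input` fields are modeled), and `run_tool_blocks` is an illustrative name. Note that `idle` still gets a `tool_result`: the Messages API expects every `tool_use` block in an assistant turn to be answered by a `tool_result` with the same id in the next user turn, so skipping it would leave the conversation malformed.

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class ToolUse:
    """Hypothetical stand-in for a tool_use content block from the API."""
    id: str
    name: str
    input: dict = field(default_factory=dict)


def run_tool_blocks(blocks: list, execute: Callable[[str, dict], str]):
    """Run real tools, treat idle as a flag; return (tool_results, idle_requested)."""
    results = []
    idle_requested = False
    for block in blocks:
        if block.name == "idle":
            # Control signal: nothing is executed, we only remember the request
            idle_requested = True
            output = "Entering idle phase. Will poll for new tasks."
        else:
            output = execute(block.name, block.input)
        # Every tool_use, including idle, gets a matching tool_result
        results.append({"type": "tool_result", "tool_use_id": block.id, "content": str(output)})
    return results, idle_requested


blocks = [ToolUse("t1", "bash", {"command": "ls"}), ToolUse("t2", "idle")]
results, idle = run_tool_blocks(blocks, lambda name, args: f"ran {name}")
print(idle, [r["content"] for r in results])
# True ['ran bash', 'Entering idle phase. Will poll for new tasks.']
```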
```python
if query.strip() == "/tasks":
    for f in sorted(TASKS_DIR.glob("task_*.json")):
        t = json.loads(f.read_text())
        # Pick the marker for this status; unknown statuses show as [?]
        marker = {"pending": "[ ]", "in_progress": "[>]", "completed": "[x]"}.get(t["status"], "[?]")
        owner = f" @{t['owner']}" if t.get("owner") else ""
        print(f"  {marker} #{t['id']}: {t['subject']}{owner}")
```
Type `/tasks` to see the task board's live status, which makes it easy to follow your teammates' progress.
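If you want to test the `/tasks` display without a real `.tasks/` directory, one option is to split the formatting into a pure function. `render_board` is a hypothetical helper, the sample tasks are made up, and task 4's `"blocked"` status is invented just to show the `[?]` fallback.

```python
def render_board(tasks: list) -> list:
    """Format task dicts the way /tasks prints them, one line per task."""
    markers = {"pending": "[ ]", "in_progress": "[>]", "completed": "[x]"}
    lines = []
    for t in tasks:
        marker = markers.get(t["status"], "[?]")  # unknown statuses show as [?]
        owner = f" @{t['owner']}" if t.get("owner") else ""
        lines.append(f"  {marker} #{t['id']}: {t['subject']}{owner}")
    return lines


board = [
    {"id": 1, "subject": "Write login page", "status": "completed", "owner": "alice"},
    {"id": 2, "subject": "Write signup page", "status": "completed", "owner": "bob"},
    {"id": 3, "subject": "Write tests", "status": "in_progress", "owner": "alice"},
    {"id": 4, "subject": "Deploy", "status": "blocked"},
]
print("\n".join(render_board(board)))
#   [x] #1: Write login page @alice
#   [x] #2: Write signup page @bob
#   [>] #3: Write tests @alice
#   [?] #4: Deploy
```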
```bash
export MODEL_ID=claude-sonnet-4-20250514
python agents/s11_autonomous_agents.py
```
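Step 1: have the Lead create tasks and spawn teammates

```text
s11 >> Create 3 tasks in the .tasks/ directory: task_1 is writing the login page, task_2 is writing the signup page, task_3 is writing tests. Then create two teammates, alice (frontend) and bob (testing).
> write_file: Wrote 89 bytes
> write_file: Wrote 91 bytes
> write_file: Wrote 85 bytes
> spawn_teammate: Spawned 'alice' (role: frontend)
> spawn_teammate: Spawned 'bob' (role: testing)
```

Step 2: watch the teammates claim tasks on their own

```text
s11 >> /tasks
  [>] #1: Write login page @alice
  [>] #2: Write signup page @bob
  [ ] #3: Write tests
```

(alice and bob claimed task_1 and task_2 automatically! Nobody assigned them; they pulled the notes off the board themselves.)

Step 3: wait a bit and check again

```text
s11 >> /tasks
  [x] #1: Write login page @alice
  [x] #2: Write signup page @bob
  [>] #3: Write tests @alice
```

(After finishing task_1, alice went IDLE, found that nobody had task_3, and claimed it automatically!)

Step 4: check the team status

```text
s11 >> /team
Team: default
  alice (frontend): working
  bob (testing): idle
```

(bob is done and waiting for work; alice is still working on task_3.)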
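The LLM calls `idle` to say "I'm done", and once the system receives that signal it switches to IDLE mode. It's an elegant way for the LLM and the system to "negotiate". `with _claim_lock` makes claiming atomic: the read, the check, and the write happen as one step that no other claim can interleave with.

Lesson 12: Context Management. Teammates can now find their own work, but what happens as conversations keep getting longer, and what if the tokens run out? In the next lesson we'll learn how to compress conversations and manage the context window so the agent can keep working within a limited token budget.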