一句话总结:多个Agent同时改代码会打架(文件冲突)。用git worktree给每个任务创建独立的工作目录,各写各的互不干扰,干完再合并回来。加上EventBus做全局日志,谁干了啥一清二楚。
一句话总结:多个Agent同时改代码会打架(文件冲突)。用git worktree给每个任务创建独立的工作目录,各写各的互不干扰,干完再合并回来。加上EventBus做全局日志,谁干了啥一清二楚。
上一课我们的Agent已经能自己找活干了,但有一个致命问题:
所有Agent都在同一个代码目录里工作
text场景:两个Agent同时改代码 alice正在改 auth.py,改到一半... bob也要改 auth.py,直接覆盖了alice的修改... alice: "我的代码呢???" 或者更隐蔽的: alice: git checkout -b feature-login bob: git checkout -b fix-bug → bob的checkout把alice正在改的文件全切走了!
类比:图书馆只有一张桌子
text想象一个图书馆只有一张大桌子: alice在桌上摊开论文资料,正在写第三章... bob也来了,把alice的资料推到一边,摊开自己的... charlie也来了... → 大家互相干扰,谁都写不好 解决方案:给每人一个小隔间! 隔间1: alice安静写她的论文 隔间2: bob安静写他的论文 隔间3: charlie安静写他的论文 → 各写各的,互不干扰 → 写完了,各自交给装订处(合并回主分支)
传统方式(一个目录): worktree方式(多个目录):
my-project/ my-project/ ← 主目录(main分支)
├── src/ .worktrees/
├── tests/ ├── auth-refactor/ ← 隔间1(wt/auth-refactor分支)
└── ... │ ├── src/
所有人都在这里改 │ └── tests/
切分支就会互相影响 ├── fix-bug/ ← 隔间2(wt/fix-bug分支)
│ ├── src/
│ └── tests/
└── add-tests/ ← 隔间3(wt/add-tests分支)
├── src/
└── tests/
每个隔间是独立的完整代码副本!控制面(Task): 执行面(Worktree):
"做什么" "在哪做"
.tasks/ .worktrees/
task_1.json: auth-refactor/
subject: "重构认证" ├── src/
status: "in_progress" └── ...(独立的代码目录)
worktree: "auth-refactor" ──────→
fix-bug/
task_2.json: ├── src/
subject: "修复登录bug" └── ...(独立的代码目录)
worktree: "fix-bug" ────────────→
任务管理 ← task_id → 目录绑定 命令在这里执行核心设计理念: 用任务ID把"做什么"和"在哪做"连接起来。
text所有重要事件都记到一个JSONL文件里(每行一条JSON): events.jsonl: {"event":"worktree.create.before", "ts":1234, "task":{"id":1}, "worktree":{"name":"auth"}} {"event":"worktree.create.after", "ts":1235, "task":{"id":1}, "worktree":{"name":"auth","status":"active"}} {"event":"task.completed", "ts":1300, "task":{"id":1,"subject":"重构认证","status":"completed"}} {"event":"worktree.remove.after", "ts":1301, "worktree":{"name":"auth","status":"removed"}} 好处: - 出了问题可以回溯:谁在什么时候做了什么 - Agent可以查看历史:worktree_events工具读取最近N条事件 - 不污染对话:事件是侧通道,不会塞进和模型的对话里
用户输入: "创建任务:重构认证模块"
│
▼
┌─────────────┐ EventBus记录
│ task_create │───→ (无,任务创建暂不记事件)
│ 创建任务#1 │
│ status:pending│
└─────┬───────┘
│
▼
┌──────────────────┐ EventBus记录
│ worktree_create │───→ worktree.create.before
│ name:"auth" │───→ worktree.create.after
│ task_id: 1 │
│ │
│ 实际执行: │
│ git worktree add │
│ -b wt/auth │
│ .worktrees/auth │
│ HEAD │
└─────┬────────────┘
│ 同时自动:task#1.worktree = "auth"
│ task#1.status = "in_progress"
▼
┌──────────────────┐
│ worktree_run │ 在隔间里执行命令
│ name:"auth" │ cwd = .worktrees/auth/
│ cmd:"编写代码..." │ 互不影响其他worktree
└─────┬────────────┘
│
▼ 工作完成,两种选择:
│
┌───┴───┐
│ │
▼ ▼
┌─────┐ ┌──────────────────┐
│keep │ │ remove │ EventBus记录
│保留 │ │ complete_task=True│───→ task.completed
│隔间 │ │ 删除隔间+完成任务 │───→ worktree.remove.after
└─────┘ └──────────────────┘| 维度 | S11 自治Agent | S12 Worktree隔离 |
|---|---|---|
| 解决的问题 | Agent被动等老板派活 | 多Agent改代码互相冲突 |
| 核心机制 | IDLE/WORK模式 + 扫描认领 | git worktree + 目录隔离 |
| 任务管理 | 有(扫描+认领) | 有(创建+绑定worktree) |
| 代码隔离 | 无(都在同一目录) | 有(每任务独立目录) |
| 生命周期追踪 | 无 | EventBus全程记录 |
| 收尾方式 | 任务标记完成 | keep(保留)或remove(删除)worktree |
| 类比 | 看板管理:自己揭任务 | 图书馆隔间:各写各的论文 |
进化关系: S11让Agent会自己找活干,S12让Agent干活时不互相打架。两者结合 = 自驱动 + 无冲突的完美团队。
python#!/usr/bin/env python3 # 第12课:Worktree + Task 隔离 # 用目录级隔离实现并行任务执行,互不冲突 """ s12_worktree_task_isolation.py - Worktree + Task Isolation 目录级别的隔离,让并行任务执行不冲突。 任务(Task)是控制面,工作树(Worktree)是执行面。 .tasks/task_12.json ← 控制面:记录"做什么" { "id": 12, "subject": "Implement auth refactor", "status": "in_progress", "worktree": "auth-refactor" ← 绑定到哪个执行目录 } .worktrees/index.json ← 执行面:记录"在哪做" { "worktrees": [ { "name": "auth-refactor", "path": ".../.worktrees/auth-refactor", "branch": "wt/auth-refactor", "task_id": 12, "status": "active" } ] } 核心理念:"用目录隔离执行,用任务ID协调关联。" """ import json import os import re import subprocess import time from pathlib import Path from anthropic import Anthropic from dotenv import load_dotenv # 加载环境变量 load_dotenv(override=True) # 如果用的是第三方兼容API,清掉多余的认证变量 if os.getenv("ANTHROPIC_BASE_URL"): os.environ.pop("ANTHROPIC_AUTH_TOKEN", None) WORKDIR = Path.cwd() # 当前工作目录 client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL")) # API客户端 MODEL = os.environ["MODEL_ID"] # 模型ID def detect_repo_root(cwd: Path) -> Path | None: """检测当前目录是否在git仓库里,返回仓库根目录。""" try: r = subprocess.run( ["git", "rev-parse", "--show-toplevel"], # git命令:获取仓库根目录 cwd=cwd, capture_output=True, text=True, timeout=10, ) if r.returncode != 0: return None # 不在git仓库里 root = Path(r.stdout.strip()) return root if root.exists() else None except Exception: return None # 检测仓库根目录,找不到就用当前目录兜底 # 这样即使不在git仓库里,任务和事件功能仍然可以演示 REPO_ROOT = detect_repo_root(WORKDIR) or WORKDIR # 系统提示词:告诉Agent它有哪些工具可以用 SYSTEM = ( f"You are a coding agent at {WORKDIR}. " "Use task + worktree tools for multi-task work. " "For parallel or risky changes: create tasks, allocate worktree lanes, " "run commands in those lanes, then choose keep/remove for closeout. " "Use worktree_events when you need lifecycle visibility." ) # ================================================================ # EventBus:追加写入的事件日志,用于全局可观测性 # ================================================================ # 类比:飞机上的黑匣子,记录所有重要操作 # 好处:事件是"侧通道",不会塞进模型对话里污染上下文 class EventBus: def __init__(self, event_log_path: Path): self.path = event_log_path self.path.parent.mkdir(parents=True, exist_ok=True) # 确保目录存在 if not self.path.exists(): self.path.write_text("") # 创建空日志文件 def emit( self, event: str, # 事件名称,如 "worktree.create.before" task: dict | None = None, # 关联的任务信息 worktree: dict | None = None, # 关联的worktree信息 error: str | None = None, # 错误信息(如果有) ): """写入一条事件到JSONL日志文件(追加写入,每行一条JSON)。""" payload = { "event": event, "ts": time.time(), # 时间戳 "task": task or {}, "worktree": worktree or {}, } if error: payload["error"] = error # 追加写入,不会覆盖已有记录 with self.path.open("a", encoding="utf-8") as f: f.write(json.dumps(payload) + "\n") def list_recent(self, limit: int = 20) -> str: """读取最近N条事件,供Agent查询历史。 这是一个只读工具——让模型可以回顾之前发生了什么, 用于推理和决策,而不是凭记忆猜。 """ n = max(1, min(int(limit or 20), 200)) # 限制范围:1~200条 lines = self.path.read_text(encoding="utf-8").splitlines() recent = lines[-n:] # 取最后N行 items = [] for line in recent: try: items.append(json.loads(line)) except Exception: items.append({"event": "parse_error", "raw": line}) # 解析失败也不丢 return json.dumps(items, indent=2) # ================================================================ # TaskManager:持久化的任务看板,支持worktree绑定 # ================================================================ # 每个任务存为一个JSON文件:.tasks/task_1.json, task_2.json, ... # 关键字段:id, subject, status, worktree(绑定的工作目录名) class TaskManager: def __init__(self, tasks_dir: Path): self.dir = tasks_dir self.dir.mkdir(parents=True, exist_ok=True) # 确保目录存在 self._next_id = self._max_id() + 1 # 自增ID def _max_id(self) -> int: """扫描已有任务文件,找到最大ID。""" ids = [] for f in self.dir.glob("task_*.json"): try: ids.append(int(f.stem.split("_")[1])) # 从文件名提取ID except Exception: pass return max(ids) if ids else 0 def _path(self, task_id: int) -> Path: """任务文件路径:.tasks/task_1.json""" return self.dir / f"task_{task_id}.json" def _load(self, task_id: int) -> dict: """加载一个任务。""" path = self._path(task_id) if not path.exists(): raise ValueError(f"Task {task_id} not found") return json.loads(path.read_text()) def _save(self, task: dict): """保存一个任务到文件。""" self._path(task["id"]).write_text(json.dumps(task, indent=2)) def create(self, subject: str, description: str = "") -> str: """创建新任务。worktree字段初始为空,等待后续绑定。 worktree字段是控制面(任务)和执行面(工作目录)之间的桥梁。 """ task = { "id": self._next_id, "subject": subject, # 任务标题 "description": description, # 任务描述 "status": "pending", # 初始状态:待处理 "owner": "", # 负责人(暂空) "worktree": "", # 绑定的worktree名(暂空) "blockedBy": [], # 依赖的其他任务 "created_at": time.time(), "updated_at": time.time(), } self._save(task) self._next_id += 1 return json.dumps(task, indent=2) def get(self, task_id: int) -> str: """获取任务详情。""" return json.dumps(self._load(task_id), indent=2) def exists(self, task_id: int) -> bool: """检查任务是否存在。""" return self._path(task_id).exists() def update(self, task_id: int, status: str = None, owner: str = None) -> str: """更新任务状态或负责人。""" task = self._load(task_id) if status: if status not in ("pending", "in_progress", "completed"): raise ValueError(f"Invalid status: {status}") task["status"] = status if owner is not None: task["owner"] = owner task["updated_at"] = time.time() self._save(task) return json.dumps(task, indent=2) def bind_worktree(self, task_id: int, worktree: str, owner: str = "") -> str: """把任务绑定到一个worktree。 绑定意味着"这个任务将在这个独立目录里执行"。 如果任务还是pending状态,绑定时自动变为in_progress—— 因为分配了执行环境,就说明工作开始了。 """ task = self._load(task_id) task["worktree"] = worktree if owner: task["owner"] = owner if task["status"] == "pending": # 绑定执行环境 = 隐式开始工作 task["status"] = "in_progress" task["updated_at"] = time.time() self._save(task) return json.dumps(task, indent=2) def unbind_worktree(self, task_id: int) -> str: """解除任务和worktree的绑定(worktree被删除后调用)。""" task = self._load(task_id) task["worktree"] = "" task["updated_at"] = time.time() self._save(task) return json.dumps(task, indent=2) def list_all(self) -> str: """列出所有任务,显示状态、负责人、绑定的worktree。""" tasks = [] for f in sorted(self.dir.glob("task_*.json")): tasks.append(json.loads(f.read_text())) if not tasks: return "No tasks." lines = [] for t in tasks: # 状态标记:[ ] 待处理 [>] 进行中 [x] 已完成 marker = { "pending": "[ ]", "in_progress": "[>]", "completed": "[x]", }.get(t["status"], "[?]") owner = f" owner={t['owner']}" if t.get("owner") else "" wt = f" wt={t['worktree']}" if t.get("worktree") else "" lines.append(f"{marker} #{t['id']}: {t['subject']}{owner}{wt}") return "\n".join(lines) # 初始化任务管理器和事件总线 TASKS = TaskManager(REPO_ROOT / ".tasks") EVENTS = EventBus(REPO_ROOT / ".worktrees" / "events.jsonl") # ================================================================ # WorktreeManager:git worktree的完整生命周期管理 # ================================================================ # 核心能力:创建/列出/执行命令/保留/删除 worktree # 每个操作都通过EventBus记录生命周期事件 class WorktreeManager: def __init__(self, repo_root: Path, tasks: TaskManager, events: EventBus): self.repo_root = repo_root self.tasks = tasks # 需要TaskManager来做绑定 self.events = events # 需要EventBus来记日志 self.dir = repo_root / ".worktrees" self.dir.mkdir(parents=True, exist_ok=True) # index.json:记录所有worktree的元信息 self.index_path = self.dir / "index.json" if not self.index_path.exists(): self.index_path.write_text(json.dumps({"worktrees": []}, indent=2)) self.git_available = self._is_git_repo() # 检查是否在git仓库里 def _is_git_repo(self) -> bool: """检查当前目录是否是git仓库。""" try: r = subprocess.run( ["git", "rev-parse", "--is-inside-work-tree"], cwd=self.repo_root, capture_output=True, text=True, timeout=10, ) return r.returncode == 0 except Exception: return False def _run_git(self, args: list[str]) -> str: """统一的git命令执行入口。 所有git worktree操作都走这一个入口, 好处是:失败时统一转成Python异常,方便记录事件日志。 """ if not self.git_available: raise RuntimeError("Not in a git repository. worktree tools require git.") r = subprocess.run( ["git", *args], cwd=self.repo_root, capture_output=True, text=True, timeout=120, # 最多等2分钟 ) if r.returncode != 0: msg = (r.stdout + r.stderr).strip() raise RuntimeError(msg or f"git {' '.join(args)} failed") return (r.stdout + r.stderr).strip() or "(no output)" def _load_index(self) -> dict: """加载worktree索引文件。""" return json.loads(self.index_path.read_text()) def _save_index(self, data: dict): """保存worktree索引文件。""" self.index_path.write_text(json.dumps(data, indent=2)) def _find(self, name: str) -> dict | None: """按名称查找worktree。""" idx = self._load_index() for wt in idx.get("worktrees", []): if wt.get("name") == name: return wt return None def _validate_name(self, name: str): """校验worktree名称:只允许字母、数字、点、下划线、横线,1~40字符。""" if not re.fullmatch(r"[A-Za-z0-9._-]{1,40}", name or ""): raise ValueError( "Invalid worktree name. Use 1-40 chars: letters, numbers, ., _, -" ) def create(self, name: str, task_id: int = None, base_ref: str = "HEAD") -> str: """创建一个新的worktree(独立工作目录)。 核心命令:git worktree add -b wt/<name> .worktrees/<name> HEAD 这会同时创建一个新分支和一个checkout出来的目录, 给每个任务一条独立的执行通道(lane)。 如果指定了task_id,会自动把任务和这个worktree绑定。 """ self._validate_name(name) if self._find(name): raise ValueError(f"Worktree '{name}' already exists in index") if task_id is not None and not self.tasks.exists(task_id): raise ValueError(f"Task {task_id} not found") path = self.dir / name # worktree的物理路径 branch = f"wt/{name}" # 对应的git分支名 # 创建前记录事件 self.events.emit( "worktree.create.before", task={"id": task_id} if task_id is not None else {}, worktree={"name": name, "base_ref": base_ref}, ) try: # 关键命令!创建worktree + 新分支 # -b branch:创建新分支 # str(path):checkout到这个目录 # base_ref:基于哪个提交创建(默认HEAD) self._run_git(["worktree", "add", "-b", branch, str(path), base_ref]) # worktree的元信息 entry = { "name": name, "path": str(path), "branch": branch, "task_id": task_id, "status": "active", # 状态:活跃 "created_at": time.time(), } # 写入索引 idx = self._load_index() idx["worktrees"].append(entry) self._save_index(idx) if task_id is not None: # 在任务看板上记录:这个任务在哪个目录里执行 # 这样人类和模型后续都能找到正确的目录 self.tasks.bind_worktree(task_id, name) # 创建后记录事件 self.events.emit( "worktree.create.after", task={"id": task_id} if task_id is not None else {}, worktree={ "name": name, "path": str(path), "branch": branch, "status": "active", }, ) return json.dumps(entry, indent=2) except Exception as e: # 创建失败也要记录事件,方便排查 self.events.emit( "worktree.create.failed", task={"id": task_id} if task_id is not None else {}, worktree={"name": name, "base_ref": base_ref}, error=str(e), ) raise def list_all(self) -> str: """列出所有worktree及其状态。""" idx = self._load_index() wts = idx.get("worktrees", []) if not wts: return "No worktrees in index." lines = [] for wt in wts: suffix = f" task={wt['task_id']}" if wt.get("task_id") else "" lines.append( f"[{wt.get('status', 'unknown')}] {wt['name']} -> " f"{wt['path']} ({wt.get('branch', '-')}){suffix}" ) return "\n".join(lines) def status(self, name: str) -> str: """查看指定worktree的git状态(有没有未提交的修改等)。""" wt = self._find(name) if not wt: return f"Error: Unknown worktree '{name}'" path = Path(wt["path"]) if not path.exists(): return f"Error: Worktree path missing: {path}" r = subprocess.run( ["git", "status", "--short", "--branch"], cwd=path, # 在worktree目录里执行git status capture_output=True, text=True, timeout=60, ) text = (r.stdout + r.stderr).strip() return text or "Clean worktree" def run(self, name: str, command: str) -> str: """在指定的worktree目录里执行shell命令。 这就是worktree隔离的核心价值: 每个并行任务都在自己的checkout目录里改文件, 不会影响其他worktree或主目录。 """ # 安全检查:拦截危险命令 dangerous = ["rm -rf /", "sudo", "shutdown", "reboot", "> /dev/"] if any(d in command for d in dangerous): return "Error: Dangerous command blocked" wt = self._find(name) if not wt: return f"Error: Unknown worktree '{name}'" path = Path(wt["path"]) if not path.exists(): return f"Error: Worktree path missing: {path}" try: # 关键:cwd=path,命令在worktree自己的目录里执行 r = subprocess.run( command, shell=True, cwd=path, # ← 这里是隔离的关键! capture_output=True, text=True, timeout=300, # 最多等5分钟 ) out = (r.stdout + r.stderr).strip() return out[:50000] if out else "(no output)" except subprocess.TimeoutExpired: return "Error: Timeout (300s)" def remove(self, name: str, force: bool = False, complete_task: bool = False) -> str: """删除一个worktree。 可选参数: - force:强制删除(即使有未提交的修改) - complete_task:同时把绑定的任务标记为completed 类比:论文写完了,把小隔间清空还回去,论文交给装订处。 """ wt = self._find(name) if not wt: return f"Error: Unknown worktree '{name}'" # 删除前记录事件 self.events.emit( "worktree.remove.before", task={"id": wt.get("task_id")} if wt.get("task_id") is not None else {}, worktree={"name": name, "path": wt.get("path")}, ) try: # 执行 git worktree remove args = ["worktree", "remove"] if force: args.append("--force") args.append(wt["path"]) self._run_git(args) # 如果需要同时完成任务 if complete_task and wt.get("task_id") is not None: task_id = wt["task_id"] before = json.loads(self.tasks.get(task_id)) self.tasks.update(task_id, status="completed") self.tasks.unbind_worktree(task_id) # 任务完成是"一等公民"事件—— # 因为worktree的收尾往往比git输出本身更重要 self.events.emit( "task.completed", task={ "id": task_id, "subject": before.get("subject", ""), "status": "completed", }, worktree={"name": name}, ) # 更新索引:标记为removed idx = self._load_index() for item in idx.get("worktrees", []): if item.get("name") == name: item["status"] = "removed" item["removed_at"] = time.time() self._save_index(idx) # 删除后记录事件 self.events.emit( "worktree.remove.after", task={"id": wt.get("task_id")} if wt.get("task_id") is not None else {}, worktree={"name": name, "path": wt.get("path"), "status": "removed"}, ) return f"Removed worktree '{name}'" except Exception as e: # 删除失败也要记录 self.events.emit( "worktree.remove.failed", task={"id": wt.get("task_id")} if wt.get("task_id") is not None else {}, worktree={"name": name, "path": wt.get("path")}, error=str(e), ) raise def keep(self, name: str) -> str: """保留worktree(不删除),标记为kept状态。 类比:论文还没写完,隔间先留着,下次继续。 或者:论文写完了但想再检查一遍,隔间先不还。 """ wt = self._find(name) if not wt: return f"Error: Unknown worktree '{name}'" idx = self._load_index() kept = None for item in idx.get("worktrees", []): if item.get("name") == name: item["status"] = "kept" # 标记为"保留" item["kept_at"] = time.time() kept = item self._save_index(idx) # 记录保留事件 self.events.emit( "worktree.keep", task={"id": wt.get("task_id")} if wt.get("task_id") is not None else {}, worktree={ "name": name, "path": wt.get("path"), "status": "kept", }, ) return json.dumps(kept, indent=2) if kept else f"Error: Unknown worktree '{name}'" # 初始化Worktree管理器(依赖TaskManager和EventBus) WORKTREES = WorktreeManager(REPO_ROOT, TASKS, EVENTS) # ================================================================ # 基础工具:文件读写、命令执行(和之前课程一样的风格) # ================================================================ def safe_path(p: str) -> Path: """安全路径检查:确保不会逃逸出工作目录。""" path = (WORKDIR / p).resolve() if not path.is_relative_to(WORKDIR): raise ValueError(f"Path escapes workspace: {p}") return path def run_bash(command: str) -> str: """在工作目录执行shell命令(注意:这是在主目录,不是在worktree里)。""" dangerous = ["rm -rf /", "sudo", "shutdown", "reboot", "> /dev/"] if any(d in command for d in dangerous): return "Error: Dangerous command blocked" try: r = subprocess.run( command, shell=True, cwd=WORKDIR, capture_output=True, text=True, timeout=120, ) out = (r.stdout + r.stderr).strip() return out[:50000] if out else "(no output)" except subprocess.TimeoutExpired: return "Error: Timeout (120s)" def run_read(path: str, limit: int = None) -> str: """读取文件内容。""" try: lines = safe_path(path).read_text().splitlines() if limit and limit < len(lines): lines = lines[:limit] + [f"... ({len(lines) - limit} more)"] return "\n".join(lines)[:50000] except Exception as e: return f"Error: {e}" def run_write(path: str, content: str) -> str: """写入文件内容。""" try: fp = safe_path(path) fp.parent.mkdir(parents=True, exist_ok=True) fp.write_text(content) return f"Wrote {len(content)} bytes" except Exception as e: return f"Error: {e}" def run_edit(path: str, old_text: str, new_text: str) -> str: """替换文件中的指定文本。""" try: fp = safe_path(path) c = fp.read_text() if old_text not in c: return f"Error: Text not found in {path}" fp.write_text(c.replace(old_text, new_text, 1)) return f"Edited {path}" except Exception as e: return f"Error: {e}" # ================================================================ # 工具注册表:把所有工具函数映射到工具名称 # ================================================================ TOOL_HANDLERS = { # --- 基础工具 --- "bash": lambda **kw: run_bash(kw["command"]), "read_file": lambda **kw: run_read(kw["path"], kw.get("limit")), "write_file": lambda **kw: run_write(kw["path"], kw["content"]), "edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]), # --- 任务工具(控制面)--- "task_create": lambda **kw: TASKS.create(kw["subject"], kw.get("description", "")), "task_list": lambda **kw: TASKS.list_all(), "task_get": lambda **kw: TASKS.get(kw["task_id"]), "task_update": lambda **kw: TASKS.update(kw["task_id"], kw.get("status"), kw.get("owner")), "task_bind_worktree": lambda **kw: TASKS.bind_worktree(kw["task_id"], kw["worktree"], kw.get("owner", "")), # --- Worktree工具(执行面)--- "worktree_create": lambda **kw: WORKTREES.create(kw["name"], kw.get("task_id"), kw.get("base_ref", "HEAD")), "worktree_list": lambda **kw: WORKTREES.list_all(), "worktree_status": lambda **kw: WORKTREES.status(kw["name"]), "worktree_run": lambda **kw: WORKTREES.run(kw["name"], kw["command"]), "worktree_keep": lambda **kw: WORKTREES.keep(kw["name"]), "worktree_remove": lambda **kw: WORKTREES.remove(kw["name"], kw.get("force", False), kw.get("complete_task", False)), # --- 事件工具(可观测性)--- "worktree_events": lambda **kw: EVENTS.list_recent(kw.get("limit", 20)), } # ================================================================ # 工具定义:告诉Claude每个工具的名称、描述、参数格式 # ================================================================ TOOLS = [ # --- 4个基础工具 --- { "name": "bash", "description": "Run a shell command in the current workspace (blocking).", "input_schema": { "type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"], }, }, { "name": "read_file", "description": "Read file contents.", "input_schema": { "type": "object", "properties": { "path": {"type": "string"}, "limit": {"type": "integer"}, }, "required": ["path"], }, }, { "name": "write_file", "description": "Write content to file.", "input_schema": { "type": "object", "properties": { "path": {"type": "string"}, "content": {"type": "string"}, }, "required": ["path", "content"], }, }, { "name": "edit_file", "description": "Replace exact text in file.", "input_schema": { "type": "object", "properties": { "path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}, }, "required": ["path", "old_text", "new_text"], }, }, # --- 4个任务工具(控制面)--- { "name": "task_create", "description": "Create a new task on the shared task board.", "input_schema": { "type": "object", "properties": { "subject": {"type": "string"}, "description": {"type": "string"}, }, "required": ["subject"], }, }, { "name": "task_list", "description": "List all tasks with status, owner, and worktree binding.", "input_schema": {"type": "object", "properties": {}}, }, { "name": "task_get", "description": "Get task details by ID.", "input_schema": { "type": "object", "properties": {"task_id": {"type": "integer"}}, "required": ["task_id"], }, }, { "name": "task_update", "description": "Update task status or owner.", "input_schema": { "type": "object", "properties": { "task_id": {"type": "integer"}, "status": { "type": "string", "enum": ["pending", "in_progress", "completed"], }, "owner": {"type": "string"}, }, "required": ["task_id"], }, }, { "name": "task_bind_worktree", "description": "Bind a task to a worktree name.", "input_schema": { "type": "object", "properties": { "task_id": {"type": "integer"}, "worktree": {"type": "string"}, "owner": {"type": "string"}, }, "required": ["task_id", "worktree"], }, }, # --- 6个Worktree工具(执行面)--- { "name": "worktree_create", "description": "Create a git worktree and optionally bind it to a task.", "input_schema": { "type": "object", "properties": { "name": {"type": "string"}, "task_id": {"type": "integer"}, "base_ref": {"type": "string"}, }, "required": ["name"], }, }, { "name": "worktree_list", "description": "List worktrees tracked in .worktrees/index.json.", "input_schema": {"type": "object", "properties": {}}, }, { "name": "worktree_status", "description": "Show git status for one worktree.", "input_schema": { "type": "object", "properties": {"name": {"type": "string"}}, "required": ["name"], }, }, { "name": "worktree_run", "description": "Run a shell command in a named worktree directory.", "input_schema": { "type": "object", "properties": { "name": {"type": "string"}, "command": {"type": "string"}, }, "required": ["name", "command"], }, }, { "name": "worktree_remove", "description": "Remove a worktree and optionally mark its bound task completed.", "input_schema": { "type": "object", "properties": { "name": {"type": "string"}, "force": {"type": "boolean"}, "complete_task": {"type": "boolean"}, }, "required": ["name"], }, }, { "name": "worktree_keep", "description": "Mark a worktree as kept in lifecycle state without removing it.", "input_schema": { "type": "object", "properties": {"name": {"type": "string"}}, "required": ["name"], }, }, # --- 1个事件工具(可观测性)--- { "name": "worktree_events", "description": "List recent worktree/task lifecycle events from .worktrees/events.jsonl.", "input_schema": { "type": "object", "properties": {"limit": {"type": "integer"}}, }, }, ] # ================================================================ # Agent主循环:和之前课程完全一样的结构 # ================================================================ # 到了第12课,循环本身还是那个循环。 # 复杂度都在工具和持久化状态里,而不在对话轮次机制里。 def agent_loop(messages: list): while True: # 调用API response = client.messages.create( model=MODEL, system=SYSTEM, messages=messages, tools=TOOLS, max_tokens=8000, ) messages.append({"role": "assistant", "content": response.content}) # 如果不需要调用工具,结束循环 if response.stop_reason != "tool_use": return # 处理工具调用 results = [] for block in response.content: if block.type == "tool_use": handler = TOOL_HANDLERS.get(block.name) try: output = handler(**block.input) if handler else f"Unknown tool: {block.name}" except Exception as e: output = f"Error: {e}" print(f"> {block.name}:") print(str(output)[:200]) results.append( { "type": "tool_result", "tool_use_id": block.id, "content": str(output), } ) messages.append({"role": "user", "content": results}) # ================================================================ # 入口:交互式命令行 # ================================================================ if __name__ == "__main__": print(f"Repo root for s12: {REPO_ROOT}") if not WORKTREES.git_available: # 即使不在git仓库里也能运行(任务和事件功能照常工作) # 但worktree操作会返回明确的错误 print("Note: Not in a git repo. worktree_* tools will return errors.") history = [] while True: try: query = input("\033[36ms12 >> \033[0m") # 青色提示符 except (EOFError, KeyboardInterrupt): break if query.strip().lower() in ("q", "exit", ""): break history.append({"role": "user", "content": query}) agent_loop(history) # 打印Agent的文本回复 response_content = history[-1]["content"] if isinstance(response_content, list): for block in response_content: if hasattr(block, "text"): print(block.text) print()
pythonclass EventBus: def __init__(self, event_log_path: Path): self.path = event_log_path # 日志文件路径
为什么需要EventBus?
Agent和模型的对话是"主通道",如果什么都往对话里塞,上下文会爆炸。EventBus是"侧通道"——所有生命周期事件(创建、删除、失败...)都写到一个文件里,需要的时候再查。
JSONL格式的妙处:
text一行一条JSON,追加写入,不用读取-修改-写回整个文件: {"event":"worktree.create.before","ts":1234,...} {"event":"worktree.create.after","ts":1235,...} {"event":"task.completed","ts":1300,...} 对比JSON数组格式(每次要读取整个文件再追加): [ {"event":"worktree.create.before",...}, {"event":"worktree.create.after",...}, ... 一万条?全部读进来再加一条?太浪费了 ]
pythonclass TaskManager: def create(self, subject, description="") -> str: task = { "id": self._next_id, "worktree": "", # ← 关键:初始为空,等待绑定 }
worktree字段是桥梁:
任务创建时worktree是空的。当Agent决定"这个任务需要一个独立目录来做"时,调用bind_worktree填上worktree名称。这样任务(控制面)和目录(执行面)就关联起来了。
bind_worktree的隐式状态变更:
pythondef bind_worktree(self, task_id, worktree, owner=""): if task["status"] == "pending": task["status"] = "in_progress" # 分配了执行环境 = 工作开始了
这是一个设计巧思:你不用手动改任务状态为"进行中",绑定worktree的时候就自动变了。因为你给任务分配了执行环境,肯定是要开始干活了。
创建worktree的核心命令:
pythonself._run_git(["worktree", "add", "-b", branch, str(path), base_ref])
翻译成命令行就是:
bashgit worktree add -b wt/auth-refactor .worktrees/auth-refactor HEAD
这一条命令做了三件事:
wt/auth-refactor.worktrees/auth-refactor/ 目录checkout这个分支worktree_run的隔离关键:
pythondef run(self, name, command): r = subprocess.run(command, shell=True, cwd=path, ...) # ^^^^^^^^ # 在worktree目录里执行!
cwd=path 就是隔离的全部秘密。命令在worktree自己的目录里执行,改的文件也在那个目录里,不会碰到其他worktree或主目录。
两种收尾方式:keep vs remove
textkeep(保留): 隔间先不还,标记为"kept" 适用于:还想继续改、想review一下、暂时搁置 remove(删除): 清空隔间还回去 可以同时标记任务完成(complete_task=True) 适用于:工作完成,合并到主分支后删掉
pythonTOOL_HANDLERS = { # 4个基础工具 + 5个任务工具 + 7个worktree工具 = 16个工具 "bash": ..., # 在主目录执行命令 "worktree_run": ..., # 在worktree目录执行命令(关键区别!) "worktree_events": ..., # 查看事件日志 }
Agent循环本身和第1课完全一样——这说明架构设计得好:复杂度在工具里,不在循环里。
bash# 启动(需要在git仓库里) python agents/s12_worktree_task_isolation.py # 如果不在git仓库里,会提示: # Note: Not in a git repo. worktree_* tools will return errors.
对话示例:
texts12 >> 帮我创建两个并行任务:重构认证模块 和 修复登录bug > task_create: {"id": 1, "subject": "重构认证模块", "status": "pending", "worktree": ""} > task_create: {"id": 2, "subject": "修复登录bug", "status": "pending", "worktree": ""} > worktree_create: {"name": "auth-refactor", "branch": "wt/auth-refactor", "task_id": 1, "status": "active"} > worktree_create: {"name": "fix-login", "branch": "wt/fix-login", "task_id": 2, "status": "active"} Agent: 已创建两个独立的工作目录: - auth-refactor(任务#1:重构认证模块) - fix-login(任务#2:修复登录bug) 每个任务都在自己的目录里工作,互不干扰。 s12 >> 在auth-refactor里创建一个新文件 > worktree_run: name: "auth-refactor" command: "echo 'new auth code' > auth_v2.py" → (no output) # 这个文件只存在于auth-refactor目录里 # fix-login目录和主目录都看不到这个文件! s12 >> 查看事件日志 > worktree_events: [ {"event": "worktree.create.before", "ts": 1234, "task": {"id": 1}}, {"event": "worktree.create.after", "ts": 1235, "task": {"id": 1}}, {"event": "worktree.create.before", "ts": 1236, "task": {"id": 2}}, {"event": "worktree.create.after", "ts": 1237, "task": {"id": 2}} ] s12 >> 认证重构完成了,删除worktree并标记任务完成 > worktree_remove: name: "auth-refactor", complete_task: true → Removed worktree 'auth-refactor' > worktree_events(最近事件): {"event": "task.completed", "task": {"id": 1, "subject": "重构认证模块", "status": "completed"}} {"event": "worktree.remove.after", "worktree": {"name": "auth-refactor", "status": "removed"}}
多Agent同时工作,不隔离就必然冲突。git worktree提供了目录级别的隔离,让每个任务都有自己的"小隔间"。
任务(Task)管"做什么",Worktree管"在哪做"。用task_id把两者连接起来。这种分离让系统更清晰,也更容易扩展。
不往对话里塞日志,而是写到单独的JSONL文件里。需要的时候用工具查询。这样既不污染上下文,又保留了完整的审计线索。
创建-使用-保留/删除,每个阶段都有对应的工具和事件记录。不会出现"隔间用完了忘记还"或"不知道谁在用哪个隔间"的情况。
从第1课到第12课,Agent的主循环结构始终是一样的:调API -> 检查是否要调工具 -> 调工具 -> 把结果喂回去。所有的进化都在工具和状态管理里。
S01 Agent Loop S02 Tool Use S03 Todo Write
基础对话循环 给Agent装上手脚 持久化任务管理
┌──────────┐ ┌──────────┐ ┌──────────┐
│ while: │ │ while: │ │ while: │
│ ask() │ ──装工具──→ │ ask() │ ──加状态──→ │ ask() │
│ reply() │ │ tools() │ │ tools() │
│ repeat │ │ repeat │ │ todos[] │
└──────────┘ └──────────┘ └──────────┘
"只会说话" "能读写文件了" "能记住待办了"
│ │ │
└────────────────────────────┴──────────────────────────┘
基础能力层
│
▼
S04 Subagent S05 Skill Loading S06 Context Compact
一个人干不完,派小弟 动态加载技能包 上下文快爆了,压缩
┌──────────┐ ┌──────────┐ ┌──────────┐
│ main │ │ agent │ │ agent │
│ ├─sub1 │ ──加技能──→ │ ├─skill │ ──压上下文──→│ compact │
│ └─sub2 │ │ └─skill │ │ resume │
└──────────┘ └──────────┘ └──────────┘
"老板+打工人" "按需学新技能" "忘掉细节记住重点"
│ │ │
└────────────────────────────┴──────────────────────────┘
效率提升层
│
▼
S07 Task System S08 Background Tasks S09 Agent Teams
正经的任务板 后台跑耗时任务 多Agent组队
┌──────────┐ ┌──────────┐ ┌──────────┐
│ tasks/ │ │ main │ │ lead │
│ t1.json │ ──异步化──→ │ bg_task │ ──组团队──→ │ ├─alice │
│ t2.json │ │ poll() │ │ └─bob │
└──────────┘ └──────────┘ └──────────┘
"文件化任务管理" "不用干等结果了" "三个人一起干活"
│ │ │
└────────────────────────────┴──────────────────────────┘
协作能力层
│
▼
S10 Team Protocols S11 Autonomous Agents S12 Worktree Isolation
团队沟通协议 自驱动Agent 目录隔离,终极形态
┌──────────┐ ┌──────────┐ ┌──────────────┐
│ lead │ │ agent │ │ task ←→ wt │
│ msg() │ ──自驱动──→ │ scan() │ ──加隔离──→ │ EventBus │
│ proto[] │ │ claim() │ │ .worktrees/ │
└──────────┘ └──────────┘ └──────────────┘
"统一沟通语言" "自己找活干" "各干各的不冲突"| 课程 | 一句话 | 解决的核心问题 |
|---|---|---|
| S01 | Agent Loop:最简单的对话循环 | 怎么和AI来回对话 |
| S02 | Tool Use:给Agent装上手脚 | AI只会说不会做 |
| S03 | Todo Write:让Agent记住任务 | 做到一半忘了 |
| S04 | Subagent:一个人干不完就派小弟 | 一个Agent忙不过来 |
| S05 | Skill Loading:按需加载技能 | 不可能什么都会 |
| S06 | Context Compact:上下文压缩 | 聊多了上下文爆了 |
| S07 | Task System:正经的任务看板 | 任务管理太原始 |
| S08 | Background Tasks:后台执行 | 耗时任务卡住主流程 |
| S09 | Agent Teams:多Agent组队 | 一个人干活太慢 |
| S10 | Team Protocols:团队协议 | 多人协作没有规矩 |
| S11 | Autonomous Agents:自驱动 | 老板累死了 |
| S12 | Worktree Isolation:目录隔离 | 多人改代码打架 |
单Agent能力: S01 对话 → S02 工具 → S03 记忆 → S05 技能 → S06 压缩
└── 一个Agent从"只会说话"到"能干活还不会忘事"
拆分与并行: S04 子Agent → S08 后台任务
└── 从"一个人干"到"能派活+异步"
团队协作: S07 任务板 → S09 组队 → S10 协议 → S11 自驱动
└── 从"有任务板"到"自己找活干的团队"
终极隔离: S12 Worktree
└── 团队并行工作时互不干扰的最后一块拼图恭喜你完成了全部12课! 你已经从零开始,一步步构建了一个完整的多Agent协作系统:能对话、能执行、能记忆、能分工、能自驱动、能隔离并行。这就是Claude Code背后的核心思想。