第 12 课

第12课:Worktree隔离 —— 各干各的不冲突(终章)

一句话总结:多个Agent同时改代码会打架(文件冲突)。用git worktree给每个任务创建独立的工作目录,各写各的互不干扰,干完再合并回来。加上EventBus做全局日志,谁干了啥一清二楚。

一句话总结:多个Agent同时改代码会打架(文件冲突)。用git worktree给每个任务创建独立的工作目录,各写各的互不干扰,干完再合并回来。加上EventBus做全局日志,谁干了啥一清二楚。


你将学到什么


核心概念:用目录隔离解决并行冲突

问题:大家在同一个目录里改代码,必然打架

上一课我们的Agent已经能自己找活干了,但有一个致命问题:

所有Agent都在同一个代码目录里工作

text
场景:两个Agent同时改代码 alice正在改 auth.py,改到一半... bob也要改 auth.py,直接覆盖了alice的修改... alice: "我的代码呢???" 或者更隐蔽的: alice: git checkout -b feature-login bob: git checkout -b fix-bug → bob的checkout把alice正在改的文件全切走了!

类比:图书馆只有一张桌子

text
想象一个图书馆只有一张大桌子: alice在桌上摊开论文资料,正在写第三章... bob也来了,把alice的资料推到一边,摊开自己的... charlie也来了... → 大家互相干扰,谁都写不好 解决方案:给每人一个小隔间! 隔间1: alice安静写她的论文 隔间2: bob安静写他的论文 隔间3: charlie安静写他的论文 → 各写各的,互不干扰 → 写完了,各自交给装订处(合并回主分支)

git worktree就是"小隔间"

传统方式(一个目录):               worktree方式(多个目录):

my-project/                         my-project/           ← 主目录(main分支)
  ├── src/                          .worktrees/
  ├── tests/                          ├── auth-refactor/  ← 隔间1(wt/auth-refactor分支)
  └── ...                             │     ├── src/
  所有人都在这里改                    │     └── tests/
  切分支就会互相影响                  ├── fix-bug/        ← 隔间2(wt/fix-bug分支)
                                      │     ├── src/
                                      │     └── tests/
                                      └── add-tests/      ← 隔间3(wt/add-tests分支)
                                            ├── src/
                                            └── tests/
                                    每个隔间是独立的完整代码副本!

两个面:控制面 vs 执行面

控制面(Task):                    执行面(Worktree):
"做什么"                           "在哪做"

.tasks/                            .worktrees/
  task_1.json:                       auth-refactor/
    subject: "重构认证"                ├── src/
    status: "in_progress"              └── ...(独立的代码目录)
    worktree: "auth-refactor"  ──────→
                                     fix-bug/
  task_2.json:                         ├── src/
    subject: "修复登录bug"              └── ...(独立的代码目录)
    worktree: "fix-bug"  ────────────→

任务管理 ← task_id → 目录绑定       命令在这里执行

核心设计理念: 用任务ID把"做什么"和"在哪做"连接起来。

EventBus:全局黑匣子

text
所有重要事件都记到一个JSONL文件里(每行一条JSON): events.jsonl: {"event":"worktree.create.before", "ts":1234, "task":{"id":1}, "worktree":{"name":"auth"}} {"event":"worktree.create.after", "ts":1235, "task":{"id":1}, "worktree":{"name":"auth","status":"active"}} {"event":"task.completed", "ts":1300, "task":{"id":1,"subject":"重构认证","status":"completed"}} {"event":"worktree.remove.after", "ts":1301, "worktree":{"name":"auth","status":"removed"}} 好处: - 出了问题可以回溯:谁在什么时候做了什么 - Agent可以查看历史:worktree_events工具读取最近N条事件 - 不污染对话:事件是侧通道,不会塞进和模型的对话里

流程图:一个任务的完整生命周期

用户输入: "创建任务:重构认证模块"
    │
    ▼
┌─────────────┐    EventBus记录
│ task_create  │───→ (无,任务创建暂不记事件)
│ 创建任务#1   │
│ status:pending│
└─────┬───────┘
      │
      ▼
┌──────────────────┐    EventBus记录
│ worktree_create   │───→ worktree.create.before
│ name:"auth"       │───→ worktree.create.after
│ task_id: 1        │
│                   │
│ 实际执行:          │
│ git worktree add  │
│ -b wt/auth        │
│ .worktrees/auth   │
│ HEAD              │
└─────┬────────────┘
      │  同时自动:task#1.worktree = "auth"
      │           task#1.status = "in_progress"
      ▼
┌──────────────────┐
│ worktree_run      │    在隔间里执行命令
│ name:"auth"       │    cwd = .worktrees/auth/
│ cmd:"编写代码..."  │    互不影响其他worktree
└─────┬────────────┘
      │
      ▼ 工作完成,两种选择:
      │
  ┌───┴───┐
  │       │
  ▼       ▼
┌─────┐ ┌──────────────────┐
│keep │ │ remove            │    EventBus记录
│保留  │ │ complete_task=True│───→ task.completed
│隔间  │ │ 删除隔间+完成任务  │───→ worktree.remove.after
└─────┘ └──────────────────┘

和上一课的对比

维度 S11 自治Agent S12 Worktree隔离
解决的问题 Agent被动等老板派活 多Agent改代码互相冲突
核心机制 IDLE/WORK模式 + 扫描认领 git worktree + 目录隔离
任务管理 有(扫描+认领) 有(创建+绑定worktree)
代码隔离 无(都在同一目录) 有(每任务独立目录)
生命周期追踪 EventBus全程记录
收尾方式 任务标记完成 keep(保留)或remove(删除)worktree
类比 看板管理:自己揭任务 图书馆隔间:各写各的论文

进化关系: S11让Agent会自己找活干,S12让Agent干活时不互相打架。两者结合 = 自驱动 + 无冲突的完美团队。


完整代码(带全面中文注释)

python
#!/usr/bin/env python3 # 第12课:Worktree + Task 隔离 # 用目录级隔离实现并行任务执行,互不冲突 """ s12_worktree_task_isolation.py - Worktree + Task Isolation 目录级别的隔离,让并行任务执行不冲突。 任务(Task)是控制面,工作树(Worktree)是执行面。 .tasks/task_12.json ← 控制面:记录"做什么" { "id": 12, "subject": "Implement auth refactor", "status": "in_progress", "worktree": "auth-refactor" ← 绑定到哪个执行目录 } .worktrees/index.json ← 执行面:记录"在哪做" { "worktrees": [ { "name": "auth-refactor", "path": ".../.worktrees/auth-refactor", "branch": "wt/auth-refactor", "task_id": 12, "status": "active" } ] } 核心理念:"用目录隔离执行,用任务ID协调关联。" """ import json import os import re import subprocess import time from pathlib import Path from anthropic import Anthropic from dotenv import load_dotenv # 加载环境变量 load_dotenv(override=True) # 如果用的是第三方兼容API,清掉多余的认证变量 if os.getenv("ANTHROPIC_BASE_URL"): os.environ.pop("ANTHROPIC_AUTH_TOKEN", None) WORKDIR = Path.cwd() # 当前工作目录 client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL")) # API客户端 MODEL = os.environ["MODEL_ID"] # 模型ID def detect_repo_root(cwd: Path) -> Path | None: """检测当前目录是否在git仓库里,返回仓库根目录。""" try: r = subprocess.run( ["git", "rev-parse", "--show-toplevel"], # git命令:获取仓库根目录 cwd=cwd, capture_output=True, text=True, timeout=10, ) if r.returncode != 0: return None # 不在git仓库里 root = Path(r.stdout.strip()) return root if root.exists() else None except Exception: return None # 检测仓库根目录,找不到就用当前目录兜底 # 这样即使不在git仓库里,任务和事件功能仍然可以演示 REPO_ROOT = detect_repo_root(WORKDIR) or WORKDIR # 系统提示词:告诉Agent它有哪些工具可以用 SYSTEM = ( f"You are a coding agent at {WORKDIR}. " "Use task + worktree tools for multi-task work. " "For parallel or risky changes: create tasks, allocate worktree lanes, " "run commands in those lanes, then choose keep/remove for closeout. " "Use worktree_events when you need lifecycle visibility." ) # ================================================================ # EventBus:追加写入的事件日志,用于全局可观测性 # ================================================================ # 类比:飞机上的黑匣子,记录所有重要操作 # 好处:事件是"侧通道",不会塞进模型对话里污染上下文 class EventBus: def __init__(self, event_log_path: Path): self.path = event_log_path self.path.parent.mkdir(parents=True, exist_ok=True) # 确保目录存在 if not self.path.exists(): self.path.write_text("") # 创建空日志文件 def emit( self, event: str, # 事件名称,如 "worktree.create.before" task: dict | None = None, # 关联的任务信息 worktree: dict | None = None, # 关联的worktree信息 error: str | None = None, # 错误信息(如果有) ): """写入一条事件到JSONL日志文件(追加写入,每行一条JSON)。""" payload = { "event": event, "ts": time.time(), # 时间戳 "task": task or {}, "worktree": worktree or {}, } if error: payload["error"] = error # 追加写入,不会覆盖已有记录 with self.path.open("a", encoding="utf-8") as f: f.write(json.dumps(payload) + "\n") def list_recent(self, limit: int = 20) -> str: """读取最近N条事件,供Agent查询历史。 这是一个只读工具——让模型可以回顾之前发生了什么, 用于推理和决策,而不是凭记忆猜。 """ n = max(1, min(int(limit or 20), 200)) # 限制范围:1~200条 lines = self.path.read_text(encoding="utf-8").splitlines() recent = lines[-n:] # 取最后N行 items = [] for line in recent: try: items.append(json.loads(line)) except Exception: items.append({"event": "parse_error", "raw": line}) # 解析失败也不丢 return json.dumps(items, indent=2) # ================================================================ # TaskManager:持久化的任务看板,支持worktree绑定 # ================================================================ # 每个任务存为一个JSON文件:.tasks/task_1.json, task_2.json, ... # 关键字段:id, subject, status, worktree(绑定的工作目录名) class TaskManager: def __init__(self, tasks_dir: Path): self.dir = tasks_dir self.dir.mkdir(parents=True, exist_ok=True) # 确保目录存在 self._next_id = self._max_id() + 1 # 自增ID def _max_id(self) -> int: """扫描已有任务文件,找到最大ID。""" ids = [] for f in self.dir.glob("task_*.json"): try: ids.append(int(f.stem.split("_")[1])) # 从文件名提取ID except Exception: pass return max(ids) if ids else 0 def _path(self, task_id: int) -> Path: """任务文件路径:.tasks/task_1.json""" return self.dir / f"task_{task_id}.json" def _load(self, task_id: int) -> dict: """加载一个任务。""" path = self._path(task_id) if not path.exists(): raise ValueError(f"Task {task_id} not found") return json.loads(path.read_text()) def _save(self, task: dict): """保存一个任务到文件。""" self._path(task["id"]).write_text(json.dumps(task, indent=2)) def create(self, subject: str, description: str = "") -> str: """创建新任务。worktree字段初始为空,等待后续绑定。 worktree字段是控制面(任务)和执行面(工作目录)之间的桥梁。 """ task = { "id": self._next_id, "subject": subject, # 任务标题 "description": description, # 任务描述 "status": "pending", # 初始状态:待处理 "owner": "", # 负责人(暂空) "worktree": "", # 绑定的worktree名(暂空) "blockedBy": [], # 依赖的其他任务 "created_at": time.time(), "updated_at": time.time(), } self._save(task) self._next_id += 1 return json.dumps(task, indent=2) def get(self, task_id: int) -> str: """获取任务详情。""" return json.dumps(self._load(task_id), indent=2) def exists(self, task_id: int) -> bool: """检查任务是否存在。""" return self._path(task_id).exists() def update(self, task_id: int, status: str = None, owner: str = None) -> str: """更新任务状态或负责人。""" task = self._load(task_id) if status: if status not in ("pending", "in_progress", "completed"): raise ValueError(f"Invalid status: {status}") task["status"] = status if owner is not None: task["owner"] = owner task["updated_at"] = time.time() self._save(task) return json.dumps(task, indent=2) def bind_worktree(self, task_id: int, worktree: str, owner: str = "") -> str: """把任务绑定到一个worktree。 绑定意味着"这个任务将在这个独立目录里执行"。 如果任务还是pending状态,绑定时自动变为in_progress—— 因为分配了执行环境,就说明工作开始了。 """ task = self._load(task_id) task["worktree"] = worktree if owner: task["owner"] = owner if task["status"] == "pending": # 绑定执行环境 = 隐式开始工作 task["status"] = "in_progress" task["updated_at"] = time.time() self._save(task) return json.dumps(task, indent=2) def unbind_worktree(self, task_id: int) -> str: """解除任务和worktree的绑定(worktree被删除后调用)。""" task = self._load(task_id) task["worktree"] = "" task["updated_at"] = time.time() self._save(task) return json.dumps(task, indent=2) def list_all(self) -> str: """列出所有任务,显示状态、负责人、绑定的worktree。""" tasks = [] for f in sorted(self.dir.glob("task_*.json")): tasks.append(json.loads(f.read_text())) if not tasks: return "No tasks." lines = [] for t in tasks: # 状态标记:[ ] 待处理 [>] 进行中 [x] 已完成 marker = { "pending": "[ ]", "in_progress": "[>]", "completed": "[x]", }.get(t["status"], "[?]") owner = f" owner={t['owner']}" if t.get("owner") else "" wt = f" wt={t['worktree']}" if t.get("worktree") else "" lines.append(f"{marker} #{t['id']}: {t['subject']}{owner}{wt}") return "\n".join(lines) # 初始化任务管理器和事件总线 TASKS = TaskManager(REPO_ROOT / ".tasks") EVENTS = EventBus(REPO_ROOT / ".worktrees" / "events.jsonl") # ================================================================ # WorktreeManager:git worktree的完整生命周期管理 # ================================================================ # 核心能力:创建/列出/执行命令/保留/删除 worktree # 每个操作都通过EventBus记录生命周期事件 class WorktreeManager: def __init__(self, repo_root: Path, tasks: TaskManager, events: EventBus): self.repo_root = repo_root self.tasks = tasks # 需要TaskManager来做绑定 self.events = events # 需要EventBus来记日志 self.dir = repo_root / ".worktrees" self.dir.mkdir(parents=True, exist_ok=True) # index.json:记录所有worktree的元信息 self.index_path = self.dir / "index.json" if not self.index_path.exists(): self.index_path.write_text(json.dumps({"worktrees": []}, indent=2)) self.git_available = self._is_git_repo() # 检查是否在git仓库里 def _is_git_repo(self) -> bool: """检查当前目录是否是git仓库。""" try: r = subprocess.run( ["git", "rev-parse", "--is-inside-work-tree"], cwd=self.repo_root, capture_output=True, text=True, timeout=10, ) return r.returncode == 0 except Exception: return False def _run_git(self, args: list[str]) -> str: """统一的git命令执行入口。 所有git worktree操作都走这一个入口, 好处是:失败时统一转成Python异常,方便记录事件日志。 """ if not self.git_available: raise RuntimeError("Not in a git repository. worktree tools require git.") r = subprocess.run( ["git", *args], cwd=self.repo_root, capture_output=True, text=True, timeout=120, # 最多等2分钟 ) if r.returncode != 0: msg = (r.stdout + r.stderr).strip() raise RuntimeError(msg or f"git {' '.join(args)} failed") return (r.stdout + r.stderr).strip() or "(no output)" def _load_index(self) -> dict: """加载worktree索引文件。""" return json.loads(self.index_path.read_text()) def _save_index(self, data: dict): """保存worktree索引文件。""" self.index_path.write_text(json.dumps(data, indent=2)) def _find(self, name: str) -> dict | None: """按名称查找worktree。""" idx = self._load_index() for wt in idx.get("worktrees", []): if wt.get("name") == name: return wt return None def _validate_name(self, name: str): """校验worktree名称:只允许字母、数字、点、下划线、横线,1~40字符。""" if not re.fullmatch(r"[A-Za-z0-9._-]{1,40}", name or ""): raise ValueError( "Invalid worktree name. Use 1-40 chars: letters, numbers, ., _, -" ) def create(self, name: str, task_id: int = None, base_ref: str = "HEAD") -> str: """创建一个新的worktree(独立工作目录)。 核心命令:git worktree add -b wt/<name> .worktrees/<name> HEAD 这会同时创建一个新分支和一个checkout出来的目录, 给每个任务一条独立的执行通道(lane)。 如果指定了task_id,会自动把任务和这个worktree绑定。 """ self._validate_name(name) if self._find(name): raise ValueError(f"Worktree '{name}' already exists in index") if task_id is not None and not self.tasks.exists(task_id): raise ValueError(f"Task {task_id} not found") path = self.dir / name # worktree的物理路径 branch = f"wt/{name}" # 对应的git分支名 # 创建前记录事件 self.events.emit( "worktree.create.before", task={"id": task_id} if task_id is not None else {}, worktree={"name": name, "base_ref": base_ref}, ) try: # 关键命令!创建worktree + 新分支 # -b branch:创建新分支 # str(path):checkout到这个目录 # base_ref:基于哪个提交创建(默认HEAD) self._run_git(["worktree", "add", "-b", branch, str(path), base_ref]) # worktree的元信息 entry = { "name": name, "path": str(path), "branch": branch, "task_id": task_id, "status": "active", # 状态:活跃 "created_at": time.time(), } # 写入索引 idx = self._load_index() idx["worktrees"].append(entry) self._save_index(idx) if task_id is not None: # 在任务看板上记录:这个任务在哪个目录里执行 # 这样人类和模型后续都能找到正确的目录 self.tasks.bind_worktree(task_id, name) # 创建后记录事件 self.events.emit( "worktree.create.after", task={"id": task_id} if task_id is not None else {}, worktree={ "name": name, "path": str(path), "branch": branch, "status": "active", }, ) return json.dumps(entry, indent=2) except Exception as e: # 创建失败也要记录事件,方便排查 self.events.emit( "worktree.create.failed", task={"id": task_id} if task_id is not None else {}, worktree={"name": name, "base_ref": base_ref}, error=str(e), ) raise def list_all(self) -> str: """列出所有worktree及其状态。""" idx = self._load_index() wts = idx.get("worktrees", []) if not wts: return "No worktrees in index." lines = [] for wt in wts: suffix = f" task={wt['task_id']}" if wt.get("task_id") else "" lines.append( f"[{wt.get('status', 'unknown')}] {wt['name']} -> " f"{wt['path']} ({wt.get('branch', '-')}){suffix}" ) return "\n".join(lines) def status(self, name: str) -> str: """查看指定worktree的git状态(有没有未提交的修改等)。""" wt = self._find(name) if not wt: return f"Error: Unknown worktree '{name}'" path = Path(wt["path"]) if not path.exists(): return f"Error: Worktree path missing: {path}" r = subprocess.run( ["git", "status", "--short", "--branch"], cwd=path, # 在worktree目录里执行git status capture_output=True, text=True, timeout=60, ) text = (r.stdout + r.stderr).strip() return text or "Clean worktree" def run(self, name: str, command: str) -> str: """在指定的worktree目录里执行shell命令。 这就是worktree隔离的核心价值: 每个并行任务都在自己的checkout目录里改文件, 不会影响其他worktree或主目录。 """ # 安全检查:拦截危险命令 dangerous = ["rm -rf /", "sudo", "shutdown", "reboot", "> /dev/"] if any(d in command for d in dangerous): return "Error: Dangerous command blocked" wt = self._find(name) if not wt: return f"Error: Unknown worktree '{name}'" path = Path(wt["path"]) if not path.exists(): return f"Error: Worktree path missing: {path}" try: # 关键:cwd=path,命令在worktree自己的目录里执行 r = subprocess.run( command, shell=True, cwd=path, # ← 这里是隔离的关键! capture_output=True, text=True, timeout=300, # 最多等5分钟 ) out = (r.stdout + r.stderr).strip() return out[:50000] if out else "(no output)" except subprocess.TimeoutExpired: return "Error: Timeout (300s)" def remove(self, name: str, force: bool = False, complete_task: bool = False) -> str: """删除一个worktree。 可选参数: - force:强制删除(即使有未提交的修改) - complete_task:同时把绑定的任务标记为completed 类比:论文写完了,把小隔间清空还回去,论文交给装订处。 """ wt = self._find(name) if not wt: return f"Error: Unknown worktree '{name}'" # 删除前记录事件 self.events.emit( "worktree.remove.before", task={"id": wt.get("task_id")} if wt.get("task_id") is not None else {}, worktree={"name": name, "path": wt.get("path")}, ) try: # 执行 git worktree remove args = ["worktree", "remove"] if force: args.append("--force") args.append(wt["path"]) self._run_git(args) # 如果需要同时完成任务 if complete_task and wt.get("task_id") is not None: task_id = wt["task_id"] before = json.loads(self.tasks.get(task_id)) self.tasks.update(task_id, status="completed") self.tasks.unbind_worktree(task_id) # 任务完成是"一等公民"事件—— # 因为worktree的收尾往往比git输出本身更重要 self.events.emit( "task.completed", task={ "id": task_id, "subject": before.get("subject", ""), "status": "completed", }, worktree={"name": name}, ) # 更新索引:标记为removed idx = self._load_index() for item in idx.get("worktrees", []): if item.get("name") == name: item["status"] = "removed" item["removed_at"] = time.time() self._save_index(idx) # 删除后记录事件 self.events.emit( "worktree.remove.after", task={"id": wt.get("task_id")} if wt.get("task_id") is not None else {}, worktree={"name": name, "path": wt.get("path"), "status": "removed"}, ) return f"Removed worktree '{name}'" except Exception as e: # 删除失败也要记录 self.events.emit( "worktree.remove.failed", task={"id": wt.get("task_id")} if wt.get("task_id") is not None else {}, worktree={"name": name, "path": wt.get("path")}, error=str(e), ) raise def keep(self, name: str) -> str: """保留worktree(不删除),标记为kept状态。 类比:论文还没写完,隔间先留着,下次继续。 或者:论文写完了但想再检查一遍,隔间先不还。 """ wt = self._find(name) if not wt: return f"Error: Unknown worktree '{name}'" idx = self._load_index() kept = None for item in idx.get("worktrees", []): if item.get("name") == name: item["status"] = "kept" # 标记为"保留" item["kept_at"] = time.time() kept = item self._save_index(idx) # 记录保留事件 self.events.emit( "worktree.keep", task={"id": wt.get("task_id")} if wt.get("task_id") is not None else {}, worktree={ "name": name, "path": wt.get("path"), "status": "kept", }, ) return json.dumps(kept, indent=2) if kept else f"Error: Unknown worktree '{name}'" # 初始化Worktree管理器(依赖TaskManager和EventBus) WORKTREES = WorktreeManager(REPO_ROOT, TASKS, EVENTS) # ================================================================ # 基础工具:文件读写、命令执行(和之前课程一样的风格) # ================================================================ def safe_path(p: str) -> Path: """安全路径检查:确保不会逃逸出工作目录。""" path = (WORKDIR / p).resolve() if not path.is_relative_to(WORKDIR): raise ValueError(f"Path escapes workspace: {p}") return path def run_bash(command: str) -> str: """在工作目录执行shell命令(注意:这是在主目录,不是在worktree里)。""" dangerous = ["rm -rf /", "sudo", "shutdown", "reboot", "> /dev/"] if any(d in command for d in dangerous): return "Error: Dangerous command blocked" try: r = subprocess.run( command, shell=True, cwd=WORKDIR, capture_output=True, text=True, timeout=120, ) out = (r.stdout + r.stderr).strip() return out[:50000] if out else "(no output)" except subprocess.TimeoutExpired: return "Error: Timeout (120s)" def run_read(path: str, limit: int = None) -> str: """读取文件内容。""" try: lines = safe_path(path).read_text().splitlines() if limit and limit < len(lines): lines = lines[:limit] + [f"... ({len(lines) - limit} more)"] return "\n".join(lines)[:50000] except Exception as e: return f"Error: {e}" def run_write(path: str, content: str) -> str: """写入文件内容。""" try: fp = safe_path(path) fp.parent.mkdir(parents=True, exist_ok=True) fp.write_text(content) return f"Wrote {len(content)} bytes" except Exception as e: return f"Error: {e}" def run_edit(path: str, old_text: str, new_text: str) -> str: """替换文件中的指定文本。""" try: fp = safe_path(path) c = fp.read_text() if old_text not in c: return f"Error: Text not found in {path}" fp.write_text(c.replace(old_text, new_text, 1)) return f"Edited {path}" except Exception as e: return f"Error: {e}" # ================================================================ # 工具注册表:把所有工具函数映射到工具名称 # ================================================================ TOOL_HANDLERS = { # --- 基础工具 --- "bash": lambda **kw: run_bash(kw["command"]), "read_file": lambda **kw: run_read(kw["path"], kw.get("limit")), "write_file": lambda **kw: run_write(kw["path"], kw["content"]), "edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]), # --- 任务工具(控制面)--- "task_create": lambda **kw: TASKS.create(kw["subject"], kw.get("description", "")), "task_list": lambda **kw: TASKS.list_all(), "task_get": lambda **kw: TASKS.get(kw["task_id"]), "task_update": lambda **kw: TASKS.update(kw["task_id"], kw.get("status"), kw.get("owner")), "task_bind_worktree": lambda **kw: TASKS.bind_worktree(kw["task_id"], kw["worktree"], kw.get("owner", "")), # --- Worktree工具(执行面)--- "worktree_create": lambda **kw: WORKTREES.create(kw["name"], kw.get("task_id"), kw.get("base_ref", "HEAD")), "worktree_list": lambda **kw: WORKTREES.list_all(), "worktree_status": lambda **kw: WORKTREES.status(kw["name"]), "worktree_run": lambda **kw: WORKTREES.run(kw["name"], kw["command"]), "worktree_keep": lambda **kw: WORKTREES.keep(kw["name"]), "worktree_remove": lambda **kw: WORKTREES.remove(kw["name"], kw.get("force", False), kw.get("complete_task", False)), # --- 事件工具(可观测性)--- "worktree_events": lambda **kw: EVENTS.list_recent(kw.get("limit", 20)), } # ================================================================ # 工具定义:告诉Claude每个工具的名称、描述、参数格式 # ================================================================ TOOLS = [ # --- 4个基础工具 --- { "name": "bash", "description": "Run a shell command in the current workspace (blocking).", "input_schema": { "type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"], }, }, { "name": "read_file", "description": "Read file contents.", "input_schema": { "type": "object", "properties": { "path": {"type": "string"}, "limit": {"type": "integer"}, }, "required": ["path"], }, }, { "name": "write_file", "description": "Write content to file.", "input_schema": { "type": "object", "properties": { "path": {"type": "string"}, "content": {"type": "string"}, }, "required": ["path", "content"], }, }, { "name": "edit_file", "description": "Replace exact text in file.", "input_schema": { "type": "object", "properties": { "path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}, }, "required": ["path", "old_text", "new_text"], }, }, # --- 4个任务工具(控制面)--- { "name": "task_create", "description": "Create a new task on the shared task board.", "input_schema": { "type": "object", "properties": { "subject": {"type": "string"}, "description": {"type": "string"}, }, "required": ["subject"], }, }, { "name": "task_list", "description": "List all tasks with status, owner, and worktree binding.", "input_schema": {"type": "object", "properties": {}}, }, { "name": "task_get", "description": "Get task details by ID.", "input_schema": { "type": "object", "properties": {"task_id": {"type": "integer"}}, "required": ["task_id"], }, }, { "name": "task_update", "description": "Update task status or owner.", "input_schema": { "type": "object", "properties": { "task_id": {"type": "integer"}, "status": { "type": "string", "enum": ["pending", "in_progress", "completed"], }, "owner": {"type": "string"}, }, "required": ["task_id"], }, }, { "name": "task_bind_worktree", "description": "Bind a task to a worktree name.", "input_schema": { "type": "object", "properties": { "task_id": {"type": "integer"}, "worktree": {"type": "string"}, "owner": {"type": "string"}, }, "required": ["task_id", "worktree"], }, }, # --- 6个Worktree工具(执行面)--- { "name": "worktree_create", "description": "Create a git worktree and optionally bind it to a task.", "input_schema": { "type": "object", "properties": { "name": {"type": "string"}, "task_id": {"type": "integer"}, "base_ref": {"type": "string"}, }, "required": ["name"], }, }, { "name": "worktree_list", "description": "List worktrees tracked in .worktrees/index.json.", "input_schema": {"type": "object", "properties": {}}, }, { "name": "worktree_status", "description": "Show git status for one worktree.", "input_schema": { "type": "object", "properties": {"name": {"type": "string"}}, "required": ["name"], }, }, { "name": "worktree_run", "description": "Run a shell command in a named worktree directory.", "input_schema": { "type": "object", "properties": { "name": {"type": "string"}, "command": {"type": "string"}, }, "required": ["name", "command"], }, }, { "name": "worktree_remove", "description": "Remove a worktree and optionally mark its bound task completed.", "input_schema": { "type": "object", "properties": { "name": {"type": "string"}, "force": {"type": "boolean"}, "complete_task": {"type": "boolean"}, }, "required": ["name"], }, }, { "name": "worktree_keep", "description": "Mark a worktree as kept in lifecycle state without removing it.", "input_schema": { "type": "object", "properties": {"name": {"type": "string"}}, "required": ["name"], }, }, # --- 1个事件工具(可观测性)--- { "name": "worktree_events", "description": "List recent worktree/task lifecycle events from .worktrees/events.jsonl.", "input_schema": { "type": "object", "properties": {"limit": {"type": "integer"}}, }, }, ] # ================================================================ # Agent主循环:和之前课程完全一样的结构 # ================================================================ # 到了第12课,循环本身还是那个循环。 # 复杂度都在工具和持久化状态里,而不在对话轮次机制里。 def agent_loop(messages: list): while True: # 调用API response = client.messages.create( model=MODEL, system=SYSTEM, messages=messages, tools=TOOLS, max_tokens=8000, ) messages.append({"role": "assistant", "content": response.content}) # 如果不需要调用工具,结束循环 if response.stop_reason != "tool_use": return # 处理工具调用 results = [] for block in response.content: if block.type == "tool_use": handler = TOOL_HANDLERS.get(block.name) try: output = handler(**block.input) if handler else f"Unknown tool: {block.name}" except Exception as e: output = f"Error: {e}" print(f"> {block.name}:") print(str(output)[:200]) results.append( { "type": "tool_result", "tool_use_id": block.id, "content": str(output), } ) messages.append({"role": "user", "content": results}) # ================================================================ # 入口:交互式命令行 # ================================================================ if __name__ == "__main__": print(f"Repo root for s12: {REPO_ROOT}") if not WORKTREES.git_available: # 即使不在git仓库里也能运行(任务和事件功能照常工作) # 但worktree操作会返回明确的错误 print("Note: Not in a git repo. worktree_* tools will return errors.") history = [] while True: try: query = input("\033[36ms12 >> \033[0m") # 青色提示符 except (EOFError, KeyboardInterrupt): break if query.strip().lower() in ("q", "exit", ""): break history.append({"role": "user", "content": query}) agent_loop(history) # 打印Agent的文本回复 response_content = history[-1]["content"] if isinstance(response_content, list): for block in response_content: if hasattr(block, "text"): print(block.text) print()

代码逐行拆解

第一层:EventBus —— 全局黑匣子

python
class EventBus: def __init__(self, event_log_path: Path): self.path = event_log_path # 日志文件路径

为什么需要EventBus?

Agent和模型的对话是"主通道",如果什么都往对话里塞,上下文会爆炸。EventBus是"侧通道"——所有生命周期事件(创建、删除、失败...)都写到一个文件里,需要的时候再查。

JSONL格式的妙处:

text
一行一条JSON,追加写入,不用读取-修改-写回整个文件: {"event":"worktree.create.before","ts":1234,...} {"event":"worktree.create.after","ts":1235,...} {"event":"task.completed","ts":1300,...} 对比JSON数组格式(每次要读取整个文件再追加): [ {"event":"worktree.create.before",...}, {"event":"worktree.create.after",...}, ... 一万条?全部读进来再加一条?太浪费了 ]

第二层:TaskManager —— 任务看板

python
class TaskManager: def create(self, subject, description="") -> str: task = { "id": self._next_id, "worktree": "", # ← 关键:初始为空,等待绑定 }

worktree字段是桥梁:

任务创建时worktree是空的。当Agent决定"这个任务需要一个独立目录来做"时,调用bind_worktree填上worktree名称。这样任务(控制面)和目录(执行面)就关联起来了。

bind_worktree的隐式状态变更:

python
def bind_worktree(self, task_id, worktree, owner=""): if task["status"] == "pending": task["status"] = "in_progress" # 分配了执行环境 = 工作开始了

这是一个设计巧思:你不用手动改任务状态为"进行中",绑定worktree的时候就自动变了。因为你给任务分配了执行环境,肯定是要开始干活了。

第三层:WorktreeManager —— 隔间管理

创建worktree的核心命令:

python
self._run_git(["worktree", "add", "-b", branch, str(path), base_ref])

翻译成命令行就是:

bash
git worktree add -b wt/auth-refactor .worktrees/auth-refactor HEAD

这一条命令做了三件事:

  1. 基于HEAD创建新分支 wt/auth-refactor
  2. .worktrees/auth-refactor/ 目录checkout这个分支
  3. 现在这个目录就是一个完整的代码副本,可以独立修改

worktree_run的隔离关键:

python
def run(self, name, command): r = subprocess.run(command, shell=True, cwd=path, ...) # ^^^^^^^^ # 在worktree目录里执行!

cwd=path 就是隔离的全部秘密。命令在worktree自己的目录里执行,改的文件也在那个目录里,不会碰到其他worktree或主目录。

两种收尾方式:keep vs remove

text
keep(保留): 隔间先不还,标记为"kept" 适用于:还想继续改、想review一下、暂时搁置 remove(删除): 清空隔间还回去 可以同时标记任务完成(complete_task=True) 适用于:工作完成,合并到主分支后删掉

第四层:工具注册和Agent循环

python
TOOL_HANDLERS = { # 4个基础工具 + 5个任务工具 + 7个worktree工具 = 16个工具 "bash": ..., # 在主目录执行命令 "worktree_run": ..., # 在worktree目录执行命令(关键区别!) "worktree_events": ..., # 查看事件日志 }

Agent循环本身和第1课完全一样——这说明架构设计得好:复杂度在工具里,不在循环里。


运行效果

bash
# 启动(需要在git仓库里) python agents/s12_worktree_task_isolation.py # 如果不在git仓库里,会提示: # Note: Not in a git repo. worktree_* tools will return errors.

对话示例:

text
s12 >> 帮我创建两个并行任务:重构认证模块 和 修复登录bug > task_create: {"id": 1, "subject": "重构认证模块", "status": "pending", "worktree": ""} > task_create: {"id": 2, "subject": "修复登录bug", "status": "pending", "worktree": ""} > worktree_create: {"name": "auth-refactor", "branch": "wt/auth-refactor", "task_id": 1, "status": "active"} > worktree_create: {"name": "fix-login", "branch": "wt/fix-login", "task_id": 2, "status": "active"} Agent: 已创建两个独立的工作目录: - auth-refactor(任务#1:重构认证模块) - fix-login(任务#2:修复登录bug) 每个任务都在自己的目录里工作,互不干扰。 s12 >> 在auth-refactor里创建一个新文件 > worktree_run: name: "auth-refactor" command: "echo 'new auth code' > auth_v2.py" → (no output) # 这个文件只存在于auth-refactor目录里 # fix-login目录和主目录都看不到这个文件! s12 >> 查看事件日志 > worktree_events: [ {"event": "worktree.create.before", "ts": 1234, "task": {"id": 1}}, {"event": "worktree.create.after", "ts": 1235, "task": {"id": 1}}, {"event": "worktree.create.before", "ts": 1236, "task": {"id": 2}}, {"event": "worktree.create.after", "ts": 1237, "task": {"id": 2}} ] s12 >> 认证重构完成了,删除worktree并标记任务完成 > worktree_remove: name: "auth-refactor", complete_task: true → Removed worktree 'auth-refactor' > worktree_events(最近事件): {"event": "task.completed", "task": {"id": 1, "subject": "重构认证模块", "status": "completed"}} {"event": "worktree.remove.after", "worktree": {"name": "auth-refactor", "status": "removed"}}

关键收获

1. 隔离是并行的前提

多Agent同时工作,不隔离就必然冲突。git worktree提供了目录级别的隔离,让每个任务都有自己的"小隔间"。

2. 控制面和执行面分离

任务(Task)管"做什么",Worktree管"在哪做"。用task_id把两者连接起来。这种分离让系统更清晰,也更容易扩展。

3. EventBus是侧通道观测

不往对话里塞日志,而是写到单独的JSONL文件里。需要的时候用工具查询。这样既不污染上下文,又保留了完整的审计线索。

4. 生命周期管理很重要

创建-使用-保留/删除,每个阶段都有对应的工具和事件记录。不会出现"隔间用完了忘记还"或"不知道谁在用哪个隔间"的情况。

5. Agent循环本身没变

从第1课到第12课,Agent的主循环结构始终是一样的:调API -> 检查是否要调工具 -> 调工具 -> 把结果喂回去。所有的进化都在工具和状态管理里。


整个课程回顾:从零到完整Agent系统的进化之路

12课进化全景图

S01 Agent Loop                S02 Tool Use               S03 Todo Write
基础对话循环                    给Agent装上手脚             持久化任务管理
┌──────────┐                 ┌──────────┐               ┌──────────┐
│ while:   │                 │ while:   │               │ while:   │
│  ask()   │   ──装工具──→   │  ask()   │  ──加状态──→  │  ask()   │
│  reply() │                 │  tools() │               │  tools() │
│  repeat  │                 │  repeat  │               │  todos[] │
└──────────┘                 └──────────┘               └──────────┘
"只会说话"                    "能读写文件了"               "能记住待办了"
     │                            │                          │
     └────────────────────────────┴──────────────────────────┘
                              基础能力层
                                  │
                                  ▼
S04 Subagent                 S05 Skill Loading           S06 Context Compact
一个人干不完,派小弟            动态加载技能包              上下文快爆了,压缩
┌──────────┐                 ┌──────────┐               ┌──────────┐
│ main     │                 │ agent    │               │ agent    │
│  ├─sub1  │   ──加技能──→   │  ├─skill │  ──压上下文──→│  compact │
│  └─sub2  │                 │  └─skill │               │  resume  │
└──────────┘                 └──────────┘               └──────────┘
"老板+打工人"                 "按需学新技能"               "忘掉细节记住重点"
     │                            │                          │
     └────────────────────────────┴──────────────────────────┘
                              效率提升层
                                  │
                                  ▼
S07 Task System              S08 Background Tasks        S09 Agent Teams
正经的任务板                    后台跑耗时任务              多Agent组队
┌──────────┐                 ┌──────────┐               ┌──────────┐
│ tasks/   │                 │ main     │               │ lead     │
│  t1.json │   ──异步化──→   │  bg_task │  ──组团队──→  │  ├─alice │
│  t2.json │                 │  poll()  │               │  └─bob   │
└──────────┘                 └──────────┘               └──────────┘
"文件化任务管理"               "不用干等结果了"             "三个人一起干活"
     │                            │                          │
     └────────────────────────────┴──────────────────────────┘
                              协作能力层
                                  │
                                  ▼
S10 Team Protocols           S11 Autonomous Agents       S12 Worktree Isolation
团队沟通协议                    自驱动Agent                目录隔离,终极形态
┌──────────┐                 ┌──────────┐               ┌──────────────┐
│ lead     │                 │ agent    │               │ task ←→ wt   │
│  msg()   │  ──自驱动──→    │  scan()  │  ──加隔离──→  │ EventBus     │
│  proto[] │                 │  claim() │               │ .worktrees/  │
└──────────┘                 └──────────┘               └──────────────┘
"统一沟通语言"                "自己找活干"                "各干各的不冲突"

每课一句话总结

课程 一句话 解决的核心问题
S01 Agent Loop:最简单的对话循环 怎么和AI来回对话
S02 Tool Use:给Agent装上手脚 AI只会说不会做
S03 Todo Write:让Agent记住任务 做到一半忘了
S04 Subagent:一个人干不完就派小弟 一个Agent忙不过来
S05 Skill Loading:按需加载技能 不可能什么都会
S06 Context Compact:上下文压缩 聊多了上下文爆了
S07 Task System:正经的任务看板 任务管理太原始
S08 Background Tasks:后台执行 耗时任务卡住主流程
S09 Agent Teams:多Agent组队 一个人干活太慢
S10 Team Protocols:团队协议 多人协作没有规矩
S11 Autonomous Agents:自驱动 老板累死了
S12 Worktree Isolation:目录隔离 多人改代码打架

能力进化主线

单Agent能力:       S01 对话 → S02 工具 → S03 记忆 → S05 技能 → S06 压缩
                         └── 一个Agent从"只会说话"到"能干活还不会忘事"

拆分与并行:        S04 子Agent → S08 后台任务
                         └── 从"一个人干"到"能派活+异步"

团队协作:          S07 任务板 → S09 组队 → S10 协议 → S11 自驱动
                         └── 从"有任务板"到"自己找活干的团队"

终极隔离:          S12 Worktree
                         └── 团队并行工作时互不干扰的最后一块拼图

恭喜你完成了全部12课! 你已经从零开始,一步步构建了一个完整的多Agent协作系统:能对话、能执行、能记忆、能分工、能自驱动、能隔离并行。这就是Claude Code背后的核心思想。

上一课 11. 自治 Agent