第 07 课

第7课:Task System —— 持久化任务管理

一句话总结:把任务存成 JSON 文件,Agent 重启、上下文压缩都不怕丢,还能自动管理任务之间的依赖关系。

一句话总结:把任务存成 JSON 文件,Agent 重启、上下文压缩都不怕丢,还能自动管理任务之间的依赖关系。


你将学到什么


核心概念:从"记在脑子里"到"写在纸上"

问题:上下文一压缩,任务就丢了

前面几课我们的 Agent 已经能用工具了,但有个致命问题:

假设你让 Agent 管理一个项目,创建了10个任务。聊着聊着,对话越来越长,上下文窗口装不下了,系统做了一次"上下文压缩"(总结之前的对话)。

结果: 之前创建的任务全丢了!因为它们只存在对话历史里。

类比: 你在白板上列了一堆待办事项,结果有人把白板擦了。你只记得"好像有些事要做",但具体是什么?不知道了。

解决方案:把任务写到文件里

之前(内存模式):                    现在(文件模式):

对话历史                              .tasks/ 目录
+---------------------------+         +---------------------------+
| user: 创建任务A            |         | task_1.json               |
| assistant: 好的,任务A创建  |         |   {"id":1, "subject":"A"} |
| user: 创建任务B            |         | task_2.json               |
| ...                       |         |   {"id":2, "subject":"B"} |
| [上下文压缩:之前的内容被   |         | task_3.json               |
|  总结成一段话]              |         |   {"id":3, "subject":"C"} |
| 任务信息丢失!              |         +---------------------------+
+---------------------------+         文件永远在磁盘上,不会丢!

每个任务都是一个独立的 JSON 文件,存在 .tasks/ 目录下。Agent 重启了?没关系,读文件就行。上下文压缩了?没关系,任务还在文件里。

任务依赖:blockedBy

真实项目中,任务之间有先后顺序:

text
任务1:搭建数据库 状态:pending 任务2:写 API 接口 状态:pending, blockedBy: [1] 任务3:写前端页面 状态:pending, blockedBy: [2]

任务2 被任务1 阻塞 —— 数据库没搭好,API 接口写不了。当你把任务1 标记为 completed,系统会自动把任务2 的阻塞移除。

这就像工厂流水线:上一道工序没完成,下一道不能开始。


和上一课的对比

第5课的结构:                        第7课的结构:

+-- 初始化配置                       +-- 初始化配置
+-- System Prompt                   +-- System Prompt(加了任务管理说明)
+-- safe_path                       +-- safe_path
+-- SkillManager                    +-- SkillManager
+-- TOOL_HANDLERS(5个工具)         +-- TaskManager(新增!持久化任务管理)
+-- TOOLS(5个工具)                 +-- TOOL_HANDLERS(9个工具!+4个任务工具)
+-- agent_loop(没变)               +-- TOOLS(9个工具!)
+-- main                            +-- agent_loop(没变!)
                                    +-- main

关键发现: 又一次,agent_loop 函数一行都没改!我们只是:

  1. 加了一个 TaskManager 类(处理文件读写)
  2. 加了4个工具定义和处理函数
  3. Agent 就自动会管理任务了

ASCII 流程图

用户: "帮我规划这个项目的任务"
   |
   v
+---------------------+
| Agent 调用           |
| task_create          |--------> TaskManager.create()
| 创建任务1            |              |
+---------------------+              v
   |                          .tasks/task_1.json  (写入磁盘)
   v
+---------------------+
| Agent 调用           |
| task_create          |--------> TaskManager.create()
| 创建任务2            |              |
| blockedBy: [1]       |              v
+---------------------+          .tasks/task_2.json  (写入磁盘)
   |
   v
+---------------------+
| Agent 调用           |
| task_update          |--------> TaskManager.update()
| task_id=1            |              |
| status=completed     |              v
+---------------------+          1. 更新 task_1.json (status=completed)
   |                             2. 自动扫描所有任务
   |                             3. 发现 task_2 的 blockedBy 包含 1
   v                             4. 自动移除 task_2 的阻塞!
+---------------------+
| Agent 调用           |
| task_list            |--------> 读取所有 .tasks/task_*.json
| 查看进度             |              |
+---------------------+              v
   |                          [x] #1: 搭建数据库
   v                          [ ] #2: 写API接口 (不再被阻塞!)
Agent 回复用户进展

完整代码

python
#!/usr/bin/env python3 """ 第7课:持久化任务系统 把任务存成 JSON 文件,支持状态管理和依赖关系。 Agent 重启、上下文压缩都不会丢失任务。 """ import json # 用来读写 JSON 文件 import subprocess # 用来执行 shell 命令 from pathlib import Path # 用来操作文件路径 from anthropic import Anthropic # Anthropic 官方 SDK # ── 基础配置 ────────────────────────────────────────────────── MODEL = "claude-sonnet-4-20250514" # 使用的模型 WORKDIR = Path.cwd() # 工作目录:当前运行目录 TASKS_DIR = WORKDIR / ".tasks" # 任务文件存放目录 client = Anthropic() # 创建 API 客户端 # ── 系统提示词 ──────────────────────────────────────────────── # 告诉 AI 它有哪些能力,以及怎么管理任务 SYSTEM = f"""You are a helpful coding agent. Working directory: {WORKDIR} You have task management tools to organize work: - task_create: Create a new task with a subject and optional description - task_update: Update task status (pending/in_progress/completed) or dependencies - task_list: List all tasks and their status - task_get: Get full details of a specific task Task workflow: 1. Break down user requests into tasks 2. Use blockedBy to mark dependencies between tasks 3. Work on tasks in order, updating status as you go 4. When completing a task, dependencies are automatically cleared Always use task_list to check current state before starting work. """ # ── 路径安全函数 ────────────────────────────────────────────── def safe_path(p: str) -> Path: """ 确保路径在工作目录下,防止 AI 访问系统敏感文件。 比如 AI 传入 "../../etc/passwd",这个函数会把它限制在 WORKDIR 下。 """ resolved = (WORKDIR / p).resolve() if not str(resolved).startswith(str(WORKDIR.resolve())): raise ValueError(f"Path escapes working directory: {p}") return resolved # ── TaskManager:持久化任务管理器 ────────────────────────────── class TaskManager: """ 把每个任务存成独立的 JSON 文件。 文件名格式:task_1.json, task_2.json, ... 存放在 .tasks/ 目录下。 """ def __init__(self, tasks_dir: Path): self.dir = tasks_dir # 任务文件存放目录 self.dir.mkdir(exist_ok=True) # 目录不存在就创建 self._next_id = self._max_id() + 1 # 下一个可用的任务 ID def _max_id(self) -> int: """ 扫描已有的任务文件,找出最大的 ID。 比如目录里有 task_1.json, task_3.json,返回 3。 如果没有任务文件,返回 0。 """ ids = [int(f.stem.split("_")[1]) for f in self.dir.glob("task_*.json")] return max(ids) if ids else 0 def _load(self, task_id: int) -> dict: """ 从文件加载一个任务。 task_id=1 就读取 .tasks/task_1.json。 """ path = self.dir / f"task_{task_id}.json" if not path.exists(): raise ValueError(f"Task {task_id} not found") return json.loads(path.read_text()) def _save(self, task: dict): """ 把任务保存到文件。 ensure_ascii=False 是为了让中文正常显示,不会变成 \u4e2d\u6587 这种。 """ path = self.dir / f"task_{task['id']}.json" path.write_text(json.dumps(task, indent=2, ensure_ascii=False)) def create(self, subject: str, description: str = "") -> str: """ 创建一个新任务。 - subject: 任务标题(必填) - description: 任务详细描述(选填) 返回创建好的任务 JSON。 """ task = { "id": self._next_id, # 自增 ID "subject": subject, # 任务标题 "description": description, # 任务描述 "status": "pending", # 初始状态:待处理 "blockedBy": [], # 依赖列表:被哪些任务阻塞 "owner": "", # 负责人(预留字段) } self._save(task) # 写入文件 self._next_id += 1 # ID 自增 return json.dumps(task, indent=2, ensure_ascii=False) def update(self, task_id: int, status: str = None, add_blocked_by: list = None, remove_blocked_by: list = None) -> str: """ 更新任务状态或依赖关系。 - status: 新状态(pending / in_progress / completed) - add_blocked_by: 添加依赖(比如 [1, 3] 表示被任务1和3阻塞) - remove_blocked_by: 移除依赖 """ task = self._load(task_id) # 从文件读取任务 # 更新状态 if status: if status not in ("pending", "in_progress", "completed"): raise ValueError(f"Invalid status: {status}") task["status"] = status # 关键:完成任务时,自动解除其他任务对它的依赖 if status == "completed": self._clear_dependency(task_id) # 添加依赖 if add_blocked_by: # 用 set 去重,防止重复添加 task["blockedBy"] = list(set(task["blockedBy"] + add_blocked_by)) # 移除依赖 if remove_blocked_by: task["blockedBy"] = [x for x in task["blockedBy"] if x not in remove_blocked_by] self._save(task) # 保存更新后的任务 return json.dumps(task, indent=2, ensure_ascii=False) def _clear_dependency(self, completed_id: int): """ 当一个任务完成时,扫描所有任务,自动移除对它的依赖。 比如任务1完成了,所有 blockedBy 包含 1 的任务都会把 1 移除。 """ for f in self.dir.glob("task_*.json"): task = json.loads(f.read_text()) if completed_id in task.get("blockedBy", []): task["blockedBy"].remove(completed_id) self._save(task) def list_all(self) -> str: """ 列出所有任务的简要信息。 输出格式: [x] #1: 搭建数据库 [>] #2: 写API接口 [ ] #3: 写前端页面 (blocked by: [2]) """ tasks = [] # 按 ID 排序读取所有任务文件 files = sorted(self.dir.glob("task_*.json"), key=lambda f: int(f.stem.split("_")[1])) for f in files: tasks.append(json.loads(f.read_text())) if not tasks: return "No tasks." lines = [] for t in tasks: # 根据状态显示不同的标记 marker = { "pending": "[ ]", # 待处理 "in_progress": "[>]", # 进行中 "completed": "[x]", # 已完成 }.get(t["status"], "[?]") # 如果有阻塞依赖,显示出来 blocked = f" (blocked by: {t['blockedBy']})" if t.get("blockedBy") else "" lines.append(f"{marker} #{t['id']}: {t['subject']}{blocked}") return "\n".join(lines) def get(self, task_id: int) -> str: """获取单个任务的完整 JSON 详情。""" return json.dumps(self._load(task_id), indent=2, ensure_ascii=False) # ── 创建 TaskManager 实例 ────────────────────────────────────── TASKS = TaskManager(TASKS_DIR) # ── Bash 执行函数 ───────────────────────────────────────────── def run_bash(command: str, timeout: int = 30) -> str: """ 执行 shell 命令并返回输出。 有超时保护,默认 30 秒。 """ try: result = subprocess.run( command, shell=True, cwd=WORKDIR, capture_output=True, text=True, timeout=timeout ) output = (result.stdout + result.stderr).strip() return output[:50000] if output else "(no output)" except subprocess.TimeoutExpired: return f"Error: Command timed out after {timeout}s" except Exception as e: return f"Error: {e}" # ── 文件操作函数 ────────────────────────────────────────────── def run_read(path: str, offset: int = 0, limit: int = 2000) -> str: """读取文件内容,支持偏移和行数限制。""" p = safe_path(path) lines = p.read_text().splitlines() selected = lines[offset:offset + limit] # 加上行号,方便 AI 定位 return "\n".join(f"{i + offset + 1}\t{line}" for i, line in enumerate(selected)) def run_write(path: str, content: str) -> str: """写入文件内容。""" p = safe_path(path) p.parent.mkdir(parents=True, exist_ok=True) # 确保父目录存在 p.write_text(content) return f"Wrote {len(content)} bytes to {path}" def run_edit(path: str, old_string: str, new_string: str) -> str: """精确替换文件中的指定文本。""" p = safe_path(path) text = p.read_text() if old_string not in text: return "Error: old_string not found in file" if text.count(old_string) > 1: return "Error: old_string appears multiple times, be more specific" p.write_text(text.replace(old_string, new_string, 1)) return "OK" # ── 工具处理分发表 ──────────────────────────────────────────── # 每个工具名对应一个执行函数 # lambda 的作用是把 AI 传来的参数转发给对应的函数 TOOL_HANDLERS = { # 基础工具 "bash": lambda **kw: run_bash(kw["command"], kw.get("timeout", 30)), "read_file": lambda **kw: run_read(kw["path"], kw.get("offset", 0), kw.get("limit", 2000)), "write_file": lambda **kw: run_write(kw["path"], kw["content"]), "edit_file": lambda **kw: run_edit(kw["path"], kw["old_string"], kw["new_string"]), # 任务管理工具(新增!) "task_create": lambda **kw: TASKS.create(kw["subject"], kw.get("description", "")), "task_update": lambda **kw: TASKS.update( kw["task_id"], kw.get("status"), kw.get("addBlockedBy"), kw.get("removeBlockedBy") ), "task_list": lambda **kw: TASKS.list_all(), "task_get": lambda **kw: TASKS.get(kw["task_id"]), } # ── 工具定义列表(发给 AI 的) ────────────────────────────────── # 这个列表告诉 AI 有哪些工具可用,每个工具需要什么参数 TOOLS = [ # ---- 基础工具 ---- { "name": "bash", "description": "Run a shell command.", "input_schema": { "type": "object", "properties": { "command": {"type": "string", "description": "The command to run"}, "timeout": {"type": "integer", "description": "Timeout in seconds (default 30)"}, }, "required": ["command"], }, }, { "name": "read_file", "description": "Read file content with line numbers.", "input_schema": { "type": "object", "properties": { "path": {"type": "string", "description": "File path relative to working directory"}, "offset": {"type": "integer", "description": "Starting line (0-based)"}, "limit": {"type": "integer", "description": "Max lines to read"}, }, "required": ["path"], }, }, { "name": "write_file", "description": "Write content to a file.", "input_schema": { "type": "object", "properties": { "path": {"type": "string", "description": "File path"}, "content": {"type": "string", "description": "File content"}, }, "required": ["path", "content"], }, }, { "name": "edit_file", "description": "Replace exact text in a file.", "input_schema": { "type": "object", "properties": { "path": {"type": "string", "description": "File path"}, "old_string": {"type": "string", "description": "Text to find"}, "new_string": {"type": "string", "description": "Replacement text"}, }, "required": ["path", "old_string", "new_string"], }, }, # ---- 任务管理工具(本课新增!) ---- { "name": "task_create", "description": "Create a new task. Returns the created task as JSON.", "input_schema": { "type": "object", "properties": { "subject": {"type": "string", "description": "Task title/subject"}, "description": {"type": "string", "description": "Detailed description"}, }, "required": ["subject"], }, }, { "name": "task_update", "description": "Update task status or dependencies. When a task is completed, its blockers on other tasks are auto-cleared.", "input_schema": { "type": "object", "properties": { "task_id": {"type": "integer", "description": "Task ID to update"}, "status": {"type": "string", "enum": ["pending", "in_progress", "completed"], "description": "New status"}, "addBlockedBy": {"type": "array", "items": {"type": "integer"}, "description": "Task IDs to add as blockers"}, "removeBlockedBy": {"type": "array", "items": {"type": "integer"}, "description": "Task IDs to remove from blockers"}, }, "required": ["task_id"], }, }, { "name": "task_list", "description": "List all tasks with status markers: [ ] pending, [>] in_progress, [x] completed.", "input_schema": { "type": "object", "properties": {}, # 不需要任何参数 }, }, { "name": "task_get", "description": "Get full JSON details of a specific task.", "input_schema": { "type": "object", "properties": { "task_id": {"type": "integer", "description": "Task ID"}, }, "required": ["task_id"], }, }, ] # ── Agent 循环(和之前一模一样!) ──────────────────────────── def agent_loop(messages: list): """ 核心 Agent 循环:发消息 -> 收回复 -> 如果要用工具就执行 -> 继续循环。 注意:这个函数从第1课到现在,核心逻辑没变过! """ while True: # 1. 把对话历史发给 AI response = client.messages.create( model=MODEL, system=SYSTEM, messages=messages, tools=TOOLS, max_tokens=8000, ) # 2. 把 AI 的回复加入历史 messages.append({"role": "assistant", "content": response.content}) # 3. 如果 AI 不想用工具了,说明它已经有最终答案,退出循环 if response.stop_reason != "tool_use": return # 4. 遍历所有工具调用,执行并收集结果 results = [] for block in response.content: if block.type == "tool_use": handler = TOOL_HANDLERS.get(block.name) # 查表找执行函数 try: output = handler(**block.input) if handler else f"Unknown tool: {block.name}" except Exception as e: output = f"Error: {e}" print(f" > {block.name}:") # 打印工具调用日志 print(f" {str(output)[:200]}") # 只显示前200字符 results.append({ "type": "tool_result", "tool_use_id": block.id, "content": str(output), }) # 5. 把工具结果作为 user 消息加入历史,继续循环 messages.append({"role": "user", "content": results}) # ── 主函数 ────────────────────────────────────────────────── if __name__ == "__main__": history = [] print("Task System Agent (type 'quit' to exit)") print(f"Tasks directory: {TASKS_DIR}") print("-" * 50) while True: try: query = input("\033[36ms07 >> \033[0m") # 青色提示符 except (EOFError, KeyboardInterrupt): break if query.strip().lower() in ("quit", "exit"): break if not query.strip(): continue # 把用户输入加入对话历史 history.append({"role": "user", "content": query}) # 运行 Agent 循环 agent_loop(history) # 打印 AI 的最终回复(最后一条 assistant 消息中的文字部分) for block in history[-1]["content"]: if hasattr(block, "text"): print(f"\n{block.text}")

代码逐行拆解

第一部分:TaskManager 类

这是本课的核心,我们一个方法一个方法地看:

初始化和 ID 管理

python
def __init__(self, tasks_dir: Path): self.dir = tasks_dir # 记住目录位置 self.dir.mkdir(exist_ok=True) # 创建目录,已存在也不报错 self._next_id = self._max_id() + 1 # 找到已有的最大 ID + 1

这里有个巧妙的设计:每次启动时,先扫描目录里已有的文件,找到最大的 ID 号。这样就算 Agent 重启了,ID 也不会重复。

python
def _max_id(self) -> int: # f.stem 是文件名去掉后缀,比如 "task_3.json" -> "task_3" # split("_")[1] 取 "_" 后面的数字部分 -> "3" # int() 转成整数 -> 3 ids = [int(f.stem.split("_")[1]) for f in self.dir.glob("task_*.json")] return max(ids) if ids else 0

读写文件

python
def _load(self, task_id: int) -> dict: path = self.dir / f"task_{task_id}.json" # 拼出文件路径 if not path.exists(): raise ValueError(f"Task {task_id} not found") # 文件不存在就报错 return json.loads(path.read_text()) # 读文件 -> 解析 JSON -> 返回字典 def _save(self, task: dict): path = self.dir / f"task_{task['id']}.json" # indent=2 让 JSON 格式化好看 # ensure_ascii=False 让中文正常显示 path.write_text(json.dumps(task, indent=2, ensure_ascii=False))

_load_save 是带下划线开头的方法,表示"内部使用"。外部不需要直接调用它们。

创建任务

python
def create(self, subject, description=""): task = { "id": self._next_id, # 自增 ID,保证唯一 "subject": subject, # 任务标题 "description": description, # 详细描述 "status": "pending", # 新任务默认是 pending "blockedBy": [], # 默认没有阻塞 "owner": "", # 预留字段 } self._save(task) # 写入文件 self._next_id += 1 # ID 递增,准备给下一个任务 return json.dumps(task, ...) # 返回 JSON 字符串给 AI 看

更新任务(核心!)

python
def update(self, task_id, status=None, add_blocked_by=None, remove_blocked_by=None): task = self._load(task_id) # 先从文件读出来 if status: task["status"] = status if status == "completed": self._clear_dependency(task_id) # 完成时自动解除依赖! if add_blocked_by: task["blockedBy"] = list(set(task["blockedBy"] + add_blocked_by)) if remove_blocked_by: task["blockedBy"] = [x for x in task["blockedBy"] if x not in remove_blocked_by] self._save(task) # 写回文件

最关键的是 _clear_dependency:当任务1完成时,遍历所有任务文件,把所有 blockedBy 里的 1 都移除掉。

python
def _clear_dependency(self, completed_id): for f in self.dir.glob("task_*.json"): # 遍历所有任务文件 task = json.loads(f.read_text()) if completed_id in task.get("blockedBy", []): task["blockedBy"].remove(completed_id) # 移除依赖 self._save(task) # 保存

第二部分:工具注册

和之前一样,每个工具需要两个东西:

  1. TOOL_HANDLERS 里的处理函数(告诉程序怎么执行):
python
"task_create": lambda **kw: TASKS.create(kw["subject"], kw.get("description", "")), "task_update": lambda **kw: TASKS.update(kw["task_id"], kw.get("status"), ...), "task_list": lambda **kw: TASKS.list_all(), "task_get": lambda **kw: TASKS.get(kw["task_id"]),
  1. TOOLS 里的工具定义(告诉 AI 有什么工具可用、参数是什么)。

第三部分:Agent Loop

和之前一模一样! 这就是好架构的力量 —— 加新功能不需要改核心逻辑。


运行效果

启动后,你可以这样和 Agent 对话:

text
$ python s07_task_system.py Task System Agent (type 'quit' to exit) Tasks directory: /home/user/project/.tasks -------------------------------------------------- s07 >> 帮我规划一个博客系统的开发任务 > task_create: {"id": 1, "subject": "设计数据库模型", ...} > task_create: {"id": 2, "subject": "实现用户认证API", ...} > task_create: {"id": 3, "subject": "实现文章CRUD API", ...} > task_update: {"id": 2, "blockedBy": [1], ...} > task_update: {"id": 3, "blockedBy": [1], ...} > task_list: [ ] #1: 设计数据库模型 [ ] #2: 实现用户认证API (blocked by: [1]) [ ] #3: 实现文章CRUD API (blocked by: [1]) 我已经创建了3个任务来开发博客系统: 1. 设计数据库模型 - 这是基础,其他任务都依赖它 2. 实现用户认证API - 被任务1阻塞,需要先有数据库 3. 实现文章CRUD API - 同样被任务1阻塞 任务2和3都依赖任务1,等数据库设计好后才能开始。 s07 >> 任务1完成了 > task_update: {"id": 1, "status": "completed", ...} > task_list: [x] #1: 设计数据库模型 [ ] #2: 实现用户认证API [ ] #3: 实现文章CRUD API 任务1已标记为完成!注意,任务2和3的阻塞已经自动解除了, 现在可以开始开发了。建议先从任务2(用户认证)开始。

注意看:任务1完成后,任务2和3的 (blocked by: [1]) 自动消失了!

重启程序后再看:

text
s07 >> 看看现在的任务 > task_list: [x] #1: 设计数据库模型 [ ] #2: 实现用户认证API [ ] #3: 实现文章CRUD API 任务都还在!之前的进度没有丢失。

任务都存在文件里,重启不丢。


关键收获

  1. 持久化 = 写文件:最简单也最可靠的持久化方式就是写 JSON 文件。不需要数据库,不需要 Redis,一个 .tasks/ 目录搞定。
  1. 每个任务一个文件:为什么不把所有任务存在一个大 JSON 文件里?因为多个任务文件可以独立读写,不会有并发冲突的问题。
  1. 依赖自动清除:完成任务时自动扫描并清除依赖,Agent 不需要手动管理,减少出错的可能。
  1. Agent Loop 不需要改:这是第7课了,Agent Loop 的核心代码还是那几行。所有新功能都是通过"加工具"实现的。
  1. ID 自动恢复:启动时扫描文件找最大 ID,保证重启后 ID 不会重复。

下一课预告

第8课我们来解决另一个实际问题:后台任务。有些命令要跑很久(比如运行测试、编译大项目),Agent 不应该傻等着。我们会用线程让命令在后台跑,Agent 可以继续做别的事,等命令跑完了再通知它。

上一课 06. 上下文压缩