第 08 课

第8课:后台任务 - 让Agent边想边干,不傻等

一句话总结:用 Python 线程在后台跑耗时命令,Agent 不用干等,命令跑完后通过"通知队列"把结果送回来。

一句话总结:用 Python 线程在后台跑耗时命令,Agent 不用干等,命令跑完后通过"通知队列"把结果送回来。


你将学到什么


核心概念:点了外卖就别站门口等

问题:有些命令跑起来要好几分钟

想象你让 Agent 跑一个测试套件:

text
s08 >> 帮我跑一下所有测试

测试要跑 3 分钟。如果 Agent 傻等着,这 3 分钟里它啥都干不了。你问它话,它不理你。你让它看个文件,它也不理你。就像你点了外卖,然后站在门口盯着路口等骑手 —— 3 分钟白白浪费。

解决方案:扔到后台,干完了通知我

聪明的做法是:点完外卖,回去继续干活,手机响了再去拿

对应到代码里就是:

  1. 把命令丢给一个后台线程去跑
  2. Agent 立刻拿到一个"任务ID",继续干别的
  3. 后台线程跑完后,把结果丢进通知队列
  4. Agent 下次和 LLM 对话前,先检查通知队列,看看有没有跑完的任务
类比:

你(Agent)                          外卖骑手(后台线程)
+-------------------------------+    +---------------------------+
| 1. 点外卖(启动后台命令)       |    |                           |
| 2. 拿到订单号(任务ID)        |    | 1. 接单                   |
| 3. 继续工作(不傻等)          |    | 2. 取餐                   |
| 4. ...做其他事...             |    | 3. 送餐中...              |
| 5. 手机响了!(检查通知队列)   | <--| 4. 到了!(结果入队列)     |
| 6. 拿到外卖(获取命令结果)     |    |                           |
+-------------------------------+    +---------------------------+

ASCII 流程图

主线程(Agent Loop)                    后台线程 A              后台线程 B
   |                                      |                      |
   |--- background_run("pytest") -------->|                      |
   |    返回 task_id: "a1b2c3d4"          |                      |
   |                                      | [pytest 运行中...]    |
   |--- background_run("npm build") ----------------------------->|
   |    返回 task_id: "e5f6g7h8"          |                      |
   |                                      |                      | [npm 编译中...]
   |--- 用户问了别的问题                    |                      |
   |--- Agent 正常处理,不受影响            |                      |
   |                                      |                      |
   |                                      | pytest 跑完了!       |
   |                                      |---> 通知队列.append() |
   |                                      |                      |
   |=== 下次 LLM 调用前 ===               |                      |
   |<-- drain_notifications()             |                      |
   |    拿到 pytest 结果                   |                      |
   |    注入为 <background-results> 消息    |                      |
   |--- LLM 看到结果,做出反应              |                      |
   |                                      |                      | npm 编译完了!
   |                                      |                      |---> 通知队列.append()
   |=== 再下次 LLM 调用前 ===              |                      |
   |<-- drain_notifications()                                    |
   |    拿到 npm build 结果                                       |
   |--- LLM 看到结果,做出反应                                     |

关键点:主线程从不等待后台线程,后台线程通过通知队列单向传递结果。


和上一课的对比

第7课的结构:                         第8课的结构:

+-- 初始化配置                        +-- 初始化配置
+-- System Prompt                    +-- System Prompt(提到 background_run)
+-- safe_path                        +-- safe_path
+-- TaskManager                      +-- BackgroundManager(新增!线程+队列)
+-- TOOL_HANDLERS(9个工具)          +-- TOOL_HANDLERS(6个工具,含2个后台工具)
+-- TOOLS                            +-- TOOLS
+-- agent_loop(没变)                +-- agent_loop(有变化!加了 drain 逻辑)
+-- main                             +-- main

本课的重大变化: agent_loop 终于改了!在每次 LLM 调用前,多了一步"检查通知队列"的操作。但改动很小,就多了几行代码。


完整代码

python
#!/usr/bin/env python3 """ 第8课:后台任务 用线程在后台跑耗时命令,Agent 不用傻等。 命令跑完后通过通知队列把结果送回 Agent。 """ import os # 操作系统相关(环境变量等) import subprocess # 执行 shell 命令 import threading # Python 多线程(本课核心!) import uuid # 生成唯一任务 ID from pathlib import Path # 文件路径操作 from anthropic import Anthropic # Anthropic 官方 SDK from dotenv import load_dotenv # 读取 .env 文件中的环境变量 # ── 基础配置 ────────────────────────────────────────────────── load_dotenv(override=True) # 加载 .env 文件,override=True 表示覆盖已有的环境变量 # 如果设置了自定义 API 端点,清掉可能冲突的认证变量 if os.getenv("ANTHROPIC_BASE_URL"): os.environ.pop("ANTHROPIC_AUTH_TOKEN", None) WORKDIR = Path.cwd() # 工作目录:当前运行目录 client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL")) # 创建 API 客户端 MODEL = os.environ["MODEL_ID"] # 从环境变量读取模型 ID # 系统提示词:告诉 AI 可以用 background_run 跑耗时命令 SYSTEM = f"You are a coding agent at {WORKDIR}. Use background_run for long-running commands." # ══════════════════════════════════════════════════════════════ # BackgroundManager:本课的核心!线程执行 + 通知队列 # ══════════════════════════════════════════════════════════════ class BackgroundManager: """ 管理后台任务的类。 职责: 1. 接收命令,在后台线程中执行 2. 执行完毕后,把结果放入通知队列 3. 提供查询接口,让 Agent 可以主动检查任务状态 """ def __init__(self): # tasks 字典:记录所有任务的状态,是"持久化的真相来源" # 格式:{task_id: {"status": "running/completed/error", "result": "...", "command": "..."}} self.tasks = {} # 通知队列:只存放"刚跑完、Agent 还没看到"的结果 # Agent 看过之后就清空,保证每个结果只通知一次 self._notification_queue = [] # 线程锁:因为后台线程和主线程会同时操作通知队列, # 必须用锁来防止"两个人同时往同一个篮子里放东西"导致数据混乱 self._lock = threading.Lock() def run(self, command: str) -> str: """ 启动一个后台任务。 关键:这个函数会立刻返回,不会等命令跑完! 返回值是任务ID,Agent 可以用它来查询状态。 """ # 生成一个8位的短UUID作为任务ID(完整UUID太长了) task_id = str(uuid.uuid4())[:8] # 在字典中记录这个任务,初始状态是 "running" self.tasks[task_id] = { "status": "running", "result": None, "command": command } # 创建一个后台线程来执行命令 # daemon=True 的意思是:主程序退出时,这个线程也跟着退出 # 如果不设 daemon,主程序想退出时会被还在跑的线程卡住 thread = threading.Thread( target=self._execute, # 线程要执行的函数 args=(task_id, command), # 传给函数的参数 daemon=True # 设为守护线程 ) thread.start() # 启动线程!这行代码会立刻返回,不会等 # 立刻告诉 Agent:任务已启动,这是你的任务ID return f"Background task {task_id} started: {command[:80]}" def _execute(self, task_id: str, command: str): """ 在后台线程中执行命令。 这个函数在另一个线程里跑,主线程不会被阻塞。 """ try: # 后台任务给 300 秒超时(比前台的 120 秒更长) # 因为后台任务本来就是给"跑得慢"的命令用的 r = subprocess.run( command, shell=True, # 用 shell 解释命令(支持管道、通配符等) cwd=WORKDIR, # 在工作目录下执行 capture_output=True, # 捕获 stdout 和 stderr text=True, # 输出为字符串(不是 bytes) timeout=300 # 5 分钟超时 ) # 合并标准输出和错误输出,截取前 50000 字符(防止输出太大撑爆内存) output = (r.stdout + r.stderr).strip()[:50000] status = "completed" except subprocess.TimeoutExpired: output = "Error: Timeout (300s)" status = "timeout" except Exception as e: output = f"Error: {e}" status = "error" # 更新任务状态(写入 tasks 字典) self.tasks[task_id]["status"] = status self.tasks[task_id]["result"] = output or "(no output)" # 把结果放入通知队列 # 注意:这里必须加锁!因为主线程可能同时在读通知队列 with self._lock: self._notification_queue.append({ "task_id": task_id, "status": status, "command": command[:80], # 命令只取前80字符(节省 token) "result": (output or "(no output)")[:500], # 结果只取前500字符 }) def check(self, task_id: str = None) -> str: """ 查询任务状态。 - 传 task_id:查看某个具体任务的详情 - 不传:列出所有任务的简要状态 """ if task_id: t = self.tasks.get(task_id) if not t: return f"Error: Unknown task {task_id}" return f"[{t['status']}] {t['command'][:60]}\n{t.get('result') or '(running)'}" # 列出所有任务 lines = [] for tid, t in self.tasks.items(): lines.append(f"{tid}: [{t['status']}] {t['command'][:60]}") return "\n".join(lines) if lines else "No background tasks." def drain_notifications(self) -> list: """ 取出并清空通知队列。 这是"精确一次投递"的关键: 1. 复制队列内容 2. 清空队列 3. 返回复制的内容 这样每个完成的任务只会被 Agent 看到一次,不会重复通知。 必须加锁,因为后台线程可能正在往队列里加东西。 """ with self._lock: notifs = list(self._notification_queue) # 复制一份 self._notification_queue.clear() # 清空原队列 return notifs # 创建全局的后台管理器实例 BG = BackgroundManager() # ── 工具函数 ───────────────────────────────────────────────── def safe_path(p: str) -> Path: """确保路径不会逃出工作目录(安全保障)。""" path = (WORKDIR / p).resolve() if not path.is_relative_to(WORKDIR): raise ValueError(f"Path escapes workspace: {p}") return path def run_bash(command: str) -> str: """ 执行 shell 命令(前台阻塞模式)。 和 background_run 的区别:这个会等命令跑完才返回。 适合快速命令,比如 ls、cat、echo。 """ # 安全检查:拦截危险命令 dangerous = ["rm -rf /", "sudo", "shutdown", "reboot", "> /dev/"] if any(d in command for d in dangerous): return "Error: Dangerous command blocked" try: r = subprocess.run( command, shell=True, cwd=WORKDIR, capture_output=True, text=True, timeout=120 # 前台超时 120 秒 ) out = (r.stdout + r.stderr).strip() return out[:50000] if out else "(no output)" except subprocess.TimeoutExpired: return "Error: Timeout (120s)" def run_read(path: str, limit: int = None) -> str: """读取文件内容,可以限制行数。""" try: lines = safe_path(path).read_text().splitlines() if limit and limit < len(lines): lines = lines[:limit] + [f"... ({len(lines) - limit} more)"] return "\n".join(lines)[:50000] except Exception as e: return f"Error: {e}" def run_write(path: str, content: str) -> str: """写入文件内容。""" try: fp = safe_path(path) fp.parent.mkdir(parents=True, exist_ok=True) fp.write_text(content) return f"Wrote {len(content)} bytes" except Exception as e: return f"Error: {e}" def run_edit(path: str, old_text: str, new_text: str) -> str: """精确替换文件中的文本。""" try: fp = safe_path(path) c = fp.read_text() if old_text not in c: return f"Error: Text not found in {path}" fp.write_text(c.replace(old_text, new_text, 1)) return f"Edited {path}" except Exception as e: return f"Error: {e}" # ── 工具注册 ───────────────────────────────────────────────── # 工具处理分发表:工具名 -> 执行函数 TOOL_HANDLERS = { "bash": lambda **kw: run_bash(kw["command"]), "read_file": lambda **kw: run_read(kw["path"], kw.get("limit")), "write_file": lambda **kw: run_write(kw["path"], kw["content"]), "edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]), # 本课新增的两个工具: "background_run": lambda **kw: BG.run(kw["command"]), # 启动后台任务 "check_background": lambda **kw: BG.check(kw.get("task_id")), # 查询后台任务 } # 工具定义列表:告诉 AI 有哪些工具、每个工具需要什么参数 TOOLS = [ { "name": "bash", "description": "Run a shell command (blocking).", "input_schema": { "type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"], }, }, { "name": "read_file", "description": "Read file contents.", "input_schema": { "type": "object", "properties": { "path": {"type": "string"}, "limit": {"type": "integer"}, }, "required": ["path"], }, }, { "name": "write_file", "description": "Write content to file.", "input_schema": { "type": "object", "properties": { "path": {"type": "string"}, "content": {"type": "string"}, }, "required": ["path", "content"], }, }, { "name": "edit_file", "description": "Replace exact text in file.", "input_schema": { "type": "object", "properties": { "path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}, }, "required": ["path", "old_text", "new_text"], }, }, { # 本课重点工具1:后台执行命令 "name": "background_run", "description": "Run command in background thread. Returns task_id immediately.", "input_schema": { "type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"], }, }, { # 本课重点工具2:查询后台任务状态 "name": "check_background", "description": "Check background task status. Omit task_id to list all.", "input_schema": { "type": "object", "properties": {"task_id": {"type": "string"}}, }, }, ] # ══════════════════════════════════════════════════════════════ # Agent Loop —— 本课有变化!多了 drain_notifications 的逻辑 # ══════════════════════════════════════════════════════════════ def agent_loop(messages: list): """ 核心 Agent 循环。 和之前的区别:每次调用 LLM 之前,先检查后台任务有没有跑完的, 如果有,就把结果作为一条新消息注入对话历史。 """ while True: # ★ 新增:在调用 LLM 之前,先检查后台任务的通知队列 notifs = BG.drain_notifications() if notifs and messages: # 把后台完成的结果拼成一段文字 notif_text = "\n".join( f"[bg:{n['task_id']}] {n['status']}: {n['result']}" for n in notifs ) # 作为一条 "user" 消息注入对话历史 # 用 <background-results> 标签包裹,让 LLM 知道这是后台结果 messages.append({ "role": "user", "content": f"<background-results>\n{notif_text}\n</background-results>" }) # 以下和之前的课程完全一样 ↓↓↓ # 1. 调用 LLM response = client.messages.create( model=MODEL, system=SYSTEM, messages=messages, tools=TOOLS, max_tokens=8000, ) # 2. 把 AI 回复加入历史 messages.append({"role": "assistant", "content": response.content}) # 3. 如果 AI 不想用工具了,退出循环 if response.stop_reason != "tool_use": return # 4. 执行所有工具调用 results = [] for block in response.content: if block.type == "tool_use": handler = TOOL_HANDLERS.get(block.name) try: output = handler(**block.input) if handler else f"Unknown tool: {block.name}" except Exception as e: output = f"Error: {e}" print(f"> {block.name}:") print(str(output)[:200]) results.append({ "type": "tool_result", "tool_use_id": block.id, "content": str(output), }) # 5. 把工具结果加入历史,继续循环 messages.append({"role": "user", "content": results}) # ── 主函数 ────────────────────────────────────────────────── if __name__ == "__main__": history = [] while True: try: query = input("\033[36ms08 >> \033[0m") # 青色提示符 except (EOFError, KeyboardInterrupt): break if query.strip().lower() in ("q", "exit", ""): break history.append({"role": "user", "content": query}) agent_loop(history) # 打印 AI 的最终回复 response_content = history[-1]["content"] if isinstance(response_content, list): for block in response_content: if hasattr(block, "text"): print(block.text) print()

代码逐行拆解

第一部分:BackgroundManager 类

这是本课的绝对核心。它有三个关键数据结构:

三大数据结构

python
def __init__(self): self.tasks = {} # 所有任务的完整状态(长期保存) self._notification_queue = [] # 等待通知的结果(一次性消费) self._lock = threading.Lock() # 线程锁(防止数据竞争)

为什么要分 tasks_notification_queue 两个东西?

就像公司的通知系统:

run() 方法:启动后台任务

python
def run(self, command: str) -> str: task_id = str(uuid.uuid4())[:8] # 生成8位短ID,比如 "a1b2c3d4" self.tasks[task_id] = {...} # 登记任务,状态为 "running" thread = threading.Thread( target=self._execute, # 这个线程要执行 _execute 函数 args=(task_id, command), # 传给 _execute 的参数 daemon=True # 守护线程:主程序退出时自动终止 ) thread.start() # 启动线程,立刻返回! return f"Background task {task_id} started: ..." # 告诉 Agent 任务已启动

关键理解:thread.start() 不会等线程执行完。它只是说"你去后台跑吧",然后自己继续往下走。这就是"非阻塞"。

_execute() 方法:后台线程的执行体

python
def _execute(self, task_id, command): try: r = subprocess.run(command, shell=True, ..., timeout=300) output = (r.stdout + r.stderr).strip()[:50000] status = "completed" except subprocess.TimeoutExpired: output = "Error: Timeout (300s)" status = "timeout" except Exception as e: output = f"Error: {e}" status = "error" # 更新任务状态 self.tasks[task_id]["status"] = status self.tasks[task_id]["result"] = output or "(no output)" # ★ 关键:把结果放入通知队列 with self._lock: # 加锁! self._notification_queue.append({ "task_id": task_id, "status": status, "command": command[:80], # 只取前80字符,节省 token "result": (output or "(no output)")[:500], # 只取前500字符 })

注意两个截断:

这是有意为之的设计:通知消息要精简(因为它会被注入对话历史,占 token),但完整结果要保留着以备查询。

drain_notifications() 方法:取走并清空通知

python
def drain_notifications(self) -> list: with self._lock: # 加锁 notifs = list(self._notification_queue) # 复制一份 self._notification_queue.clear() # 清空原队列 return notifs # 返回复制的那份

为什么要"复制再清空"而不是直接返回?

如果直接 return self._notification_queue 然后 self._notification_queue = [],在这两行之间如果后台线程刚好往队列里加了个结果,那个结果就永远丢失了

"复制再清空"是在锁的保护下一步完成的,不会有这个问题。

这个模式叫做 "精确一次投递"(exactly-once delivery):每个后台结果恰好被 Agent 看到一次,不会多也不会少。

第二部分:Agent Loop 的变化

python
def agent_loop(messages: list): while True: # ★ 新增的几行:检查后台通知 notifs = BG.drain_notifications() if notifs and messages: notif_text = "\n".join( f"[bg:{n['task_id']}] {n['status']}: {n['result']}" for n in notifs ) messages.append({ "role": "user", "content": f"<background-results>\n{notif_text}\n</background-results>" }) # 以下和之前一样... response = client.messages.create(...)

就加了这么几行!作用是:

  1. 每次要调用 LLM 之前,先看看通知队列有没有东西
  2. 如果有,把后台结果包装成一条 user 消息,注入到对话历史里
  3. LLM 就会看到这些结果,并做出相应的反应

为什么注入为 user 消息而不是 assistant 消息?因为在 Anthropic API 中,assistant 消息是 AI 说的话,user 消息是"外部输入"。后台结果是外部来的信息,所以作为 user 消息注入是正确的。

标签包裹,是为了让 LLM 明确知道"这不是用户说的话,而是后台任务的结果"。

第三部分:两个新工具

本课新增的工具很简单:

工具名 功能 阻塞?
bash 执行命令,等它跑完 是(最多等120秒)
background_run 启动后台命令,立刻返回 否(立刻返回任务ID)
check_background 查询后台任务状态

Agent 需要自己判断什么时候用 bash(快速命令),什么时候用 background_run(耗时命令)。System Prompt 里的那句 "Use background_run for long-running commands" 就是在引导它做这个判断。


运行效果

text
$ python s08_background_tasks.py s08 >> 帮我在后台跑一下测试,然后趁等的时候看看 README 文件 > background_run: Background task a1b2c3d4 started: pytest --tb=short > read_file: # My Project This is a demo project... 好的,我已经做了两件事: 1. 在后台启动了测试(任务ID: a1b2c3d4),正在运行中 2. 同时读取了 README 文件的内容:... 等测试跑完后我会收到通知。 s08 >> 测试跑完了吗? > check_background: a1b2c3d4: [completed] pytest --tb=short 测试已经跑完了!结果显示全部通过。 s08 >> 同时在后台跑编译和代码检查 > background_run: Background task e5f6g7h8 started: npm run build > background_run: Background task i9j0k1l2 started: npm run lint 好的,我同时启动了两个后台任务: 1. 编译(e5f6g7h8) 2. 代码检查(i9j0k1l2) 它们会并行运行,跑完后我会收到通知。 s08 >> 现在任务都什么状态? > check_background: e5f6g7h8: [running] npm run build i9j0k1l2: [completed] npm run lint 编译还在跑,代码检查已经完成了,没有发现问题。

注意看关键流程:

  1. Agent 用 background_run 启动任务后立刻就能做别的事(读文件)
  2. Agent 可以随时用 check_background 主动查询状态
  3. 当后台任务完成时,结果会在下次 LLM 调用前自动注入

关键收获

  1. 阻塞 vs 非阻塞bash 是阻塞的(等命令跑完才返回),background_run 是非阻塞的(立刻返回任务ID)。选哪个取决于命令要跑多久。
  1. 线程 + 队列 = 后台通知:后台线程执行命令,通知队列传递结果。两者通过线程锁保证数据安全。这是经典的生产者-消费者模式。
  1. drain 的时机很讲究:在每次 LLM 调用之前检查通知队列。太早了(任务还没跑完)没东西可取,太晚了(LLM 已经回复了)Agent 看不到结果。刚好在调用前是最佳时机。
  1. 精确一次投递drain_notifications 的"复制再清空"保证每个结果恰好通知一次。不会漏掉,也不会重复。
  1. Agent Loop 改动极小:整个后台任务系统,对 Agent Loop 的改动就是加了几行 drain 逻辑。核心的"调用LLM -> 执行工具 -> 循环"骨架没变。好的架构就是这样 —— 加功能的成本很低。
  1. 通知消息要精简:注入对话历史的通知只取前500字符,完整结果保留在 tasks 字典里。这是在 token 消耗和信息完整性之间的权衡。

下一课预告

到目前为止,我们的 Agent 已经能用工具、管任务、跑后台命令了。但还有一个问题:Agent 看到的信息太多了。当对话越来越长,上下文窗口会被塞满。下一课我们来看怎么用上下文压缩(Context Compaction)来解决这个问题,让 Agent 能处理更长的对话。

上一课 07. 任务系统