Context Engineering
This article is based on Claude Code v2.1.87 and takes a deep dive into the high-density context design inside the agent harness.
I originally wanted to intersperse short, high-frequency code snippets throughout, but there turned out to be far too much material, so I reluctantly gave that up.
Prepared
See src/query.ts: the core is the query loop, roughly 1,500 lines of code. From it we learn that Claude Code does not hand the model its entire history; instead it reconstructs the context shown to the model on every turn. That is what leads into everything that follows: the compact mechanism, prompt layering, and context input/output.
Once input arrives, the REPL path goes through handlePromptSubmit, and the first problem is preventing concurrent turns within the same session. In src/utils/QueryGuard.ts, a Guard (watch-like) handles queuing: the state transitions from idle to dispatching, and if the state is not idle (another query/dispatch is already in progress) it returns false.
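The guard can be sketched as a tiny two-state machine. This is an illustrative sketch, not the real QueryGuard API — the names `TurnGuard`, `tryDispatch`, and `release` are invented here:

```ts
// Minimal sketch of a per-session concurrency guard, assuming just two
// states. TurnGuard/tryDispatch/release are illustrative names only.
type GuardState = 'idle' | 'dispatching'

class TurnGuard {
  private state: GuardState = 'idle'

  // Returns false if another turn is already in flight for this session.
  tryDispatch(): boolean {
    if (this.state !== 'idle') return false
    this.state = 'dispatching'
    return true
  }

  // Called when the turn finishes (success or failure).
  release(): void {
    this.state = 'idle'
  }
}

const guard = new TurnGuard()
const first = guard.tryDispatch() // turn starts
const second = guard.tryDispatch() // rejected while in flight
guard.release()
const third = guard.tryDispatch() // allowed again after release
```

The real implementation additionally queues the rejected submission instead of dropping it, which is why the article calls it "queue handling".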
Next comes the system prefix, layered in order as system prompt, system context, and user context; the system prompt itself is further divided into a static segment and a dynamic segment:
- the static segment: intro, system, doing tasks, action, tool…
- the dynamic segment: session guidance, memory, language, mcp, token budget…
Before the request is actually sent, fetchSystemPromptParts in queryContext.ts assembles defaultSystemPrompt + userContext + systemContext:
```ts
export async function fetchSystemPromptParts({
  tools,
  mainLoopModel,
  additionalWorkingDirectories,
  mcpClients,
  customSystemPrompt,
}: {
  tools: Tools
  mainLoopModel: string
  additionalWorkingDirectories: string[]
  mcpClients: MCPServerConnection[]
  customSystemPrompt: string | undefined
}): Promise<{
  defaultSystemPrompt: string[]
  userContext: { [k: string]: string }
  systemContext: { [k: string]: string }
}> {
  const [defaultSystemPrompt, userContext, systemContext] = await Promise.all([
    customSystemPrompt !== undefined
      ? Promise.resolve([])
      : getSystemPrompt(
          tools,
          mainLoopModel,
          additionalWorkingDirectories,
          mcpClients,
        ),
    getUserContext(),
    customSystemPrompt !== undefined ? Promise.resolve({}) : getSystemContext(),
  ])
  return { defaultSystemPrompt, userContext, systemContext }
}
```

From a technical standpoint (I barely know TypeScript, so go easy on me):
- Performance: Promise.all runs the fetches in parallel instead of serially (e.g., generating the prompt and fetching the user context can happen at the same time).
- Short-circuiting: the customSystemPrompt check loads things on demand — if the user supplied their own prompt, no resources are wasted generating the default one.
- Separation of concerns: defaultSystemPrompt (framework-generated) and customSystemPrompt (user-supplied) are kept logically separate, which makes priority handling easier when they are concatenated later.
This is instructive for us too: if we bring our own customSystemPrompt built up in practice, defaultSystemPrompt and systemContext simply come back as [] and {}. Generation is skipped without polluting any environment state — the lighter approach.
Next, splitSysPromptPrefix slices the prompt into cache blocks along a boundary marker:
```ts
export function splitSysPromptPrefix(
  systemPrompt: SystemPrompt,
  options?: { skipGlobalCacheForSystemPrompt?: boolean },
): SystemPromptBlock[] {
  const useGlobalCacheFeature = shouldUseGlobalCacheScope()
  if (useGlobalCacheFeature && options?.skipGlobalCacheForSystemPrompt) {
    logEvent('tengu_sysprompt_using_tool_based_cache', {
      promptBlockCount: systemPrompt.length,
    })

    // Filter out boundary marker, return blocks without global scope
    let attributionHeader: string | undefined
    let systemPromptPrefix: string | undefined
    const rest: string[] = []

    for (const prompt of systemPrompt) {
      if (!prompt) continue
      if (prompt === SYSTEM_PROMPT_DYNAMIC_BOUNDARY) continue // Skip boundary
      if (prompt.startsWith('x-anthropic-billing-header')) {
        attributionHeader = prompt
      } else if (CLI_SYSPROMPT_PREFIXES.has(prompt)) {
        systemPromptPrefix = prompt
      } else {
        rest.push(prompt)
      }
    }

    const result: SystemPromptBlock[] = []
    if (attributionHeader) {
      result.push({ text: attributionHeader, cacheScope: null })
    }
    if (systemPromptPrefix) {
      result.push({ text: systemPromptPrefix, cacheScope: 'org' })
    }
    const restJoined = rest.join('\n\n')
    if (restJoined) {
      result.push({ text: restJoined, cacheScope: 'org' })
    }
    return result
  }

  if (useGlobalCacheFeature) {
    const boundaryIndex = systemPrompt.findIndex(
      s => s === SYSTEM_PROMPT_DYNAMIC_BOUNDARY,
    )
    if (boundaryIndex !== -1) {
      let attributionHeader: string | undefined
      let systemPromptPrefix: string | undefined
      const staticBlocks: string[] = []
      const dynamicBlocks: string[] = []

      for (let i = 0; i < systemPrompt.length; i++) {
        const block = systemPrompt[i]
        if (!block || block === SYSTEM_PROMPT_DYNAMIC_BOUNDARY) continue

        if (block.startsWith('x-anthropic-billing-header')) {
          attributionHeader = block
        } else if (CLI_SYSPROMPT_PREFIXES.has(block)) {
          systemPromptPrefix = block
        } else if (i < boundaryIndex) {
          staticBlocks.push(block)
        } else {
          dynamicBlocks.push(block)
        }
      }

      const result: SystemPromptBlock[] = []
      if (attributionHeader)
        result.push({ text: attributionHeader, cacheScope: null })
      if (systemPromptPrefix)
        result.push({ text: systemPromptPrefix, cacheScope: null })
      const staticJoined = staticBlocks.join('\n\n')
      if (staticJoined)
        result.push({ text: staticJoined, cacheScope: 'global' })
      const dynamicJoined = dynamicBlocks.join('\n\n')
      if (dynamicJoined) result.push({ text: dynamicJoined, cacheScope: null })

      logEvent('tengu_sysprompt_boundary_found', {
        blockCount: result.length,
        staticBlockLength: staticJoined.length,
        dynamicBlockLength: dynamicJoined.length,
      })

      return result
    } else {
      logEvent('tengu_sysprompt_missing_boundary_marker', {
        promptBlockCount: systemPrompt.length,
      })
    }
  }

  let attributionHeader: string | undefined
  let systemPromptPrefix: string | undefined
  const rest: string[] = []

  for (const block of systemPrompt) {
    if (!block) continue

    if (block.startsWith('x-anthropic-billing-header')) {
      attributionHeader = block
    } else if (CLI_SYSPROMPT_PREFIXES.has(block)) {
      systemPromptPrefix = block
    } else {
      rest.push(block)
    }
  }

  const result: SystemPromptBlock[] = []
  if (attributionHeader)
    result.push({ text: attributionHeader, cacheScope: null })
  if (systemPromptPrefix)
    result.push({ text: systemPromptPrefix, cacheScope: 'org' })
  const restJoined = rest.join('\n\n')
  if (restJoined) result.push({ text: restJoined, cacheScope: 'org' })
  return result
}
```

The key piece is the cache scope type:
```ts
type CacheScope = 'global' | 'org' | null
```

'global' is a global cache (reusable across all users, sessions, and requests); 'org' is an organization-level cache (reused within the same tenant, isolated between organizations).
The caching itself is implemented by the underlying inference stack (vLLM/SGLang, Anthropic context caching) to cut repeated token transmission and reduce latency and cost.
When the global cache (with the boundary marker) is enabled:
- Content before the boundary marker is treated as a "static template" and can safely be cached globally.
- Content after the boundary marker is treated as "dynamic context" that changes per request and is not cached.
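On the wire, the public Anthropic Messages API expresses caching not as named scopes but as `cache_control` breakpoints on content blocks. Here is a plausible mapping of the static/dynamic split onto that format — the block shape `{ type: 'text', text, cache_control: { type: 'ephemeral' } }` is the real public API, but the mapping function itself is an illustrative assumption, not Claude Code's actual serializer:

```ts
// Sketch: map the static/dynamic system split to Anthropic-style system
// content with a cache_control breakpoint after the static template.
type SystemBlock = {
  type: 'text'
  text: string
  cache_control?: { type: 'ephemeral' }
}

function toSystemBlocks(staticPart: string, dynamicPart: string): SystemBlock[] {
  const blocks: SystemBlock[] = [
    // Everything up to the boundary is stable -> safe to mark as a cache point.
    { type: 'text', text: staticPart, cache_control: { type: 'ephemeral' } },
  ]
  if (dynamicPart) {
    // Per-request content after the breakpoint is never cached.
    blocks.push({ type: 'text', text: dynamicPart })
  }
  return blocks
}

const blocks = toSystemBlocks('intro + tools + doing tasks…', 'token budget: 58k')
```

The design point survives the translation: everything that is byte-identical across requests goes before the breakpoint, everything volatile after it.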
Finally there is the fallback mode, taken when useGlobalCacheFeature === false, or when useGlobalCacheFeature === true but boundaryIndex === -1 (no marker found).
Its logic: iterate over all blocks and classify — billing header -> null; CLI prefix -> 'org'; everything else -> merged, 'org'.
Telemetry events are logged along the way; how the underlying cache is actually managed is left to Anthropic context caching (or vLLM).
To recap:
- system prefix = system prompt + system context + user context
- system prompt = static layer + dynamic layer (split by SYSTEM_PROMPT_DYNAMIC_BOUNDARY), which is precisely what makes the prompt-cache handling above convenient.
As for user context, it is fairly trivial: the CLAUDE.md / rules file system. The load order is the obvious one — the closer a file sits to the current directory, the higher its priority. But CLAUDE.md is only the surface; underneath there is also dynamically injected memory / attachments. Nested memory matched by file path goes through src/utils/attachments.ts: memoryFilesToAttachments (synchronous) and getNestedMemoryAttachmentsForFile (asynchronous).
- The former converts memory files into LLM attachments, deduplicating and maintaining an LRU cache.
- The latter walks the directory tree in priority order, finds every memory rule applicable to the target file, and hands them to the former.
Together they implement dynamic, layered, deduplicated context injection, so that every LLM request gets the most relevant project rules while keeping token consumption and cache efficiency under control.
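The directory-walk half can be sketched as a pure path computation. `candidateMemoryPaths` is an invented name for illustration — the real getNestedMemoryAttachmentsForFile also hits the filesystem, filters by existence, and feeds an LRU:

```ts
// Sketch: for a target file, list every CLAUDE.md location from the repo
// root down to the file's own directory. Entries closer to the file come
// last, so a caller applying them in order gives nearer rules priority.
function candidateMemoryPaths(root: string, filePath: string): string[] {
  const rel = filePath.slice(root.length).split('/').filter(Boolean)
  rel.pop() // drop the filename, keep the directory segments
  const out: string[] = [`${root}/CLAUDE.md`]
  let cur = root
  for (const seg of rel) {
    cur = `${cur}/${seg}`
    out.push(`${cur}/CLAUDE.md`)
  }
  return out
}

const paths = candidateMemoryPaths('/repo', '/repo/src/utils/attachments.ts')
// root first, nearest directory last
```

A dedup set (like the loadedNestedMemoryPaths field described later) would then filter out paths already injected this session.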
Runtime
At runtime, src/Tool.ts is the bus (ToolUseContext). It is not merely a tool container but a bus carrying messages, readFileState, appState, the permission context, contentReplacementState, and renderedSystemPrompt. At runtime, Claude Code's context engineering essentially revolves around it.
```ts
export type ToolUseContext = {
  options: {
    commands: Command[]
    debug: boolean
    mainLoopModel: string
    tools: Tools
    verbose: boolean
    thinkingConfig: ThinkingConfig
    mcpClients: MCPServerConnection[]
    mcpResources: Record<string, ServerResource[]>
    isNonInteractiveSession: boolean
    agentDefinitions: AgentDefinitionsResult
    maxBudgetUsd?: number
    /** Custom system prompt that replaces the default system prompt */
    customSystemPrompt?: string
    /** Additional system prompt appended after the main system prompt */
    appendSystemPrompt?: string
    /** Override querySource for analytics tracking */
    querySource?: QuerySource
    /** Optional callback to get the latest tools (e.g., after MCP servers connect mid-query) */
    refreshTools?: () => Tools
  }
  abortController: AbortController
  readFileState: FileStateCache
  getAppState(): AppState
  setAppState(f: (prev: AppState) => AppState): void
  /**
   * Always-shared setAppState for session-scoped infrastructure (background
   * tasks, session hooks). Unlike setAppState, which is no-op for async agents
   * (see createSubagentContext), this always reaches the root store so agents
   * at any nesting depth can register/clean up infrastructure that outlives
   * a single turn. Only set by createSubagentContext; main-thread contexts
   * fall back to setAppState.
   */
  setAppStateForTasks?: (f: (prev: AppState) => AppState) => void
  /**
   * Optional handler for URL elicitations triggered by tool call errors (-32042).
   * In print/SDK mode, this delegates to structuredIO.handleElicitation.
   * In REPL mode, this is undefined and the queue-based UI path is used.
   */
  handleElicitation?: (
    serverName: string,
    params: ElicitRequestURLParams,
    signal: AbortSignal,
  ) => Promise<ElicitResult>
  setToolJSX?: SetToolJSXFn
  addNotification?: (notif: Notification) => void
  /** Append a UI-only system message to the REPL message list. Stripped at the
   * normalizeMessagesForAPI boundary — the Exclude<> makes that type-enforced. */
  appendSystemMessage?: (
    msg: Exclude<SystemMessage, SystemLocalCommandMessage>,
  ) => void
  /** Send an OS-level notification (iTerm2, Kitty, Ghostty, bell, etc.) */
  sendOSNotification?: (opts: {
    message: string
    notificationType: string
  }) => void
  nestedMemoryAttachmentTriggers?: Set<string>
  /**
   * CLAUDE.md paths already injected as nested_memory attachments this
   * session. Dedup for memoryFilesToAttachments — readFileState is an LRU
   * that evicts entries in busy sessions, so its .has() check alone can
   * re-inject the same CLAUDE.md dozens of times.
   */
  loadedNestedMemoryPaths?: Set<string>
  dynamicSkillDirTriggers?: Set<string>
  /** Skill names surfaced via skill_discovery this session. Telemetry only (feeds was_discovered). */
  discoveredSkillNames?: Set<string>
  userModified?: boolean
  setInProgressToolUseIDs: (f: (prev: Set<string>) => Set<string>) => void
  /** Only wired in interactive (REPL) contexts; SDK/QueryEngine don't set this. */
  setHasInterruptibleToolInProgress?: (v: boolean) => void
  setResponseLength: (f: (prev: number) => number) => void
  /** Ant-only: push a new API metrics entry for OTPS tracking.
   * Called by subagent streaming when a new API request starts.
   */
  pushApiMetricsEntry?: (ttftMs: number) => void
  setStreamMode?: (mode: SpinnerMode) => void
  onCompactProgress?: (event: CompactProgressEvent) => void
  setSDKStatus?: (status: SDKStatus) => void
  openMessageSelector?: () => void
  updateFileHistoryState: (
    updater: (prev: FileHistoryState) => FileHistoryState,
  ) => void
  updateAttributionState: (
    updater: (prev: AttributionState) => AttributionState,
  ) => void
  setConversationId?: (id: UUID) => void
  agentId?: AgentId // Only set for subagents; use getSessionId() for session ID. Hooks use this to distinguish subagent calls.
  agentType?: string // Subagent type name. For the main thread's --agent type, hooks fall back to getMainThreadAgentType().
  /** When true, canUseTool must always be called even when hooks auto-approve.
   * Used by speculation for overlay file path rewriting. */
  requireCanUseTool?: boolean
  messages: Message[]
  fileReadingLimits?: {
    maxTokens?: number
    maxSizeBytes?: number
  }
  globLimits?: { maxResults?: number }
  toolDecisions?: Map<
    string,
    {
      source: string
      decision: 'accept' | 'reject'
      timestamp: number
    }
  >
  queryTracking?: QueryChainTracking
  /** Callback factory for requesting interactive prompts from the user.
   * Returns a prompt callback bound to the given source name.
   * Only available in interactive (REPL) contexts.
   */
  requestPrompt?: (
    sourceName: string,
    toolInputSummary?: string | null,
  ) => (request: PromptRequest) => Promise<PromptResponse>
  toolUseId?: string
  criticalSystemReminder_EXPERIMENTAL?: string
  /** When true, preserve toolUseResult on messages even for subagents.
   * Used by in-process teammates whose transcripts are viewable by the user. */
  preserveToolUseResults?: boolean
  /** Local denial tracking state for async subagents whose setAppState is a
   * no-op. Without this, the denial counter never accumulates and the
   * fallback-to-prompting threshold is never reached. Mutable — the
   * permissions code updates it in place. */
  localDenialTracking?: DenialTrackingState
  /**
   * Per-conversation-thread content replacement state for the tool result
   * budget. When present, query.ts applies the aggregate tool result budget.
   * Main thread: REPL provisions once (never resets — stale UUID keys
   * are inert). Subagents: createSubagentContext clones the parent's state
   * by default (cache-sharing forks need identical decisions), or
   * resumeAgentBackground threads one reconstructed from sidechain records.
   */
  contentReplacementState?: ContentReplacementState
  /**
   * Parent's rendered system prompt bytes, frozen at turn start.
   * Used by fork subagents to share the parent's prompt cache — re-calling
   * getSystemPrompt() at fork-spawn time can diverge (GrowthBook cold→warm)
   * and bust the cache. See forkSubagent.ts.
   */
  renderedSystemPrompt?: SystemPrompt
}
```

From here, the runtime context splits into two paths:
- one is the compact path during runtime;
- the other is /resume, which rebuilds logical state (session memory and the transcript).
compact
Four layers of compaction:
Microcompact
Rule-driven: filter by an allowlist, keep the most recent N results, clear the older ones.
The allowlist: FileRead/Edit…, Bash, Grep, Glob, WebSearch, WebFetch — these tools' outputs usually don't stay relevant for long, so they can be filtered out.
The catch: filtering must not disturb the prefix cache too much.
So there are two paths:
Cache Microcompact (fine-grained)
The user is in a continuous conversation and the server-side cache is still warm.
Deletes via the Cache Editing API (without touching the prompt prefix) and maintains global state tracking.
Time-based Microcompact (coarse-grained)
The user comes back after being away for a while and the server-side cache has expired.
Rewrites message content directly — old outputs are replaced with placeholders — with no extra state to maintain.
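The coarse-grained path is simple enough to sketch directly. The allowlist contents, placeholder text, and function name are assumptions for illustration; only the keep-recent-N-clear-older rule comes from the source:

```ts
// Sketch of time-based microcompact: keep the most recent N allowlisted
// tool results verbatim, replace older ones with a placeholder in place.
const PLACEHOLDER = '[older tool result cleared by microcompact]' // assumed text
const CLEARABLE = new Set(['Bash', 'Grep', 'Glob', 'WebSearch', 'WebFetch'])

type ToolResult = { tool: string; content: string }

function microcompact(results: ToolResult[], keepRecent: number): ToolResult[] {
  let seenFromEnd = 0
  const out = [...results]
  // Walk backwards so "recent" is counted from the tail of the history.
  for (let i = out.length - 1; i >= 0; i--) {
    const r = out[i]!
    if (!CLEARABLE.has(r.tool)) continue
    seenFromEnd++
    if (seenFromEnd > keepRecent) out[i] = { ...r, content: PLACEHOLDER }
  }
  return out
}

const history: ToolResult[] = [
  { tool: 'Grep', content: 'old hit' },
  { tool: 'Edit', content: 'patch' }, // not on the allowlist: untouched
  { tool: 'Bash', content: 'recent output' },
]
const compacted = microcompact(history, 1)
```

Because the cache has already expired on this path, rewriting the messages in place costs nothing cache-wise — which is exactly why no extra state tracking is needed.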
Detail: Cache Microcompact only applies to the main thread. We once found via packet capture that subagents run background tasks such as session-memory extraction and prompt suggestions; if those subagents registered their tool results into the global state, the main thread's next deletion attempt would try to delete tools that don't exist in its own conversation history at all.
Session memory compaction
Extracts structured facts (it does not summarize the conversation — no encoding-only summarization), organized by project structure / user preferences / task progress, and persists them to MEMORY.md.
It first waits for the background memory extraction to finish, then reads the contents of MEMORY.md.
Key design: how many recent messages to keep (minimum tokens (10k), minimum 5 messages, maximum tokens).
Details:
- tool_use and tool_result cannot be separated: keep both or drop both.
- During streaming, a single message id may be split across multiple messages — thinking in one, tool_use in another — and they all have to be kept together.
The advantage of session-memory compaction: no model-generated summary is needed; structured memory, plus the most recent raw messages.
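The "keep both or drop both" constraint amounts to adjusting the cut point when trimming a keep-suffix of the history. A minimal sketch with a simplified message shape (the real logic also handles same-id streaming splits and token budgets):

```ts
// Sketch: when choosing a keep-suffix of the history, never split a
// tool_use from its tool_result — if the cut lands on a tool_result,
// widen the window backwards until the pair is intact.
type Msg = {
  role: 'assistant' | 'user'
  kind: 'text' | 'tool_use' | 'tool_result'
}

function safeKeepStart(msgs: Msg[], wantedStart: number): number {
  let start = wantedStart
  // A tool_result at the cut means its tool_use sits just before it.
  while (start > 0 && msgs[start]!.kind === 'tool_result') start--
  return start
}

const msgs: Msg[] = [
  { role: 'user', kind: 'text' },
  { role: 'assistant', kind: 'tool_use' },
  { role: 'user', kind: 'tool_result' },
  { role: 'assistant', kind: 'text' },
]
const start = safeKeepStart(msgs, 2) // cut would split the pair -> moves to 1
```

Dropping a tool_result while keeping its tool_use (or vice versa) produces an API-invalid conversation, which is why the pair is atomic.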
Full compaction
When session-memory compaction isn't enough, full compaction kicks in, and this layer does call the model (the prompt in the source is worth studying).
The analysis mechanism: the compaction prompt explicitly makes the model think things through before emitting the summary — essentially a chain of thought.
A practical problem: the compaction request itself can trigger "prompt too long".
The source's fix: group messages by API turn, drop groups starting from the earliest, and retry at most three times.
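The drop-and-retry loop can be sketched as follows. `sendCompaction` is a stand-in for the real model call, and the exact failure detection in the source is richer than a generic catch:

```ts
// Sketch of the retry loop: on a "prompt too long" failure, drop the
// earliest turn group and retry, at most 3 retries.
type Turn = { messages: string[] }

async function compactWithRetry(
  turns: Turn[],
  sendCompaction: (turns: Turn[]) => Promise<string>,
): Promise<string> {
  let attempt = 0
  let remaining = turns
  for (;;) {
    try {
      return await sendCompaction(remaining)
    } catch (err) {
      attempt++
      if (attempt > 3 || remaining.length <= 1) throw err
      remaining = remaining.slice(1) // drop the earliest turn group
    }
  }
}
```

Grouping by API turn (rather than by raw message) keeps each dropped unit self-consistent: a turn's tool_use/tool_result pairs leave the window together.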
After compaction completes:
clear the file-read cache -> re-inject recent files (50k-token budget) -> re-inject the plan file -> re-inject skill content -> re-inject MCP tool descriptions -> re-inject the agent list.
Auto-triggering
After each model call -> check token usage (if over threshold) -> session-memory compaction -> full compaction -> circuit breaker: 3 consecutive failures (stop).
The session_memory subagent and the compact subagent never trigger autocompact themselves; they are forked subagents to begin with, and if they could autocompact too, it would deadlock.
Tool memory
Core tools are loaded at startup; extension tools are discovered and loaded on demand via toolSearch. Every tool defines its input parameters with a Zod schema, and the JSON the model produces must pass validation before execution.
If a tool's output is too large, the system spills it to external tool result storage and gives the model a summary plus a pointer, to be fetched on demand.
session memory
In src/services/SessionMemory/sessionMemory.ts, session_memory is triggered by a post-sampling hook and uses a forked subagent to update a markdown file; the trigger threshold is governed jointly by token growth and tool-call count.
/resume
transcript: written by recordTranscript() in src/utils/sessionStorage.ts:
```ts
export async function recordTranscript(
  messages: Message[],
  teamInfo?: TeamInfo,
  startingParentUuidHint?: UUID,
  allMessages?: readonly Message[],
): Promise<UUID | null> {
  const cleanedMessages = cleanMessagesForLogging(messages, allMessages)
  const sessionId = getSessionId() as UUID
  const messageSet = await getSessionMessages(sessionId)
  const newMessages: typeof cleanedMessages = []
  let startingParentUuid: UUID | undefined = startingParentUuidHint
  let seenNewMessage = false
  for (const m of cleanedMessages) {
    if (messageSet.has(m.uuid as UUID)) {
      // Only track skipped messages that form a prefix. After compaction,
      // messagesToKeep appear AFTER new CB/summary, so this skips them.
      if (!seenNewMessage && isChainParticipant(m)) {
        startingParentUuid = m.uuid as UUID
      }
    } else {
      newMessages.push(m)
      seenNewMessage = true
    }
  }
  if (newMessages.length > 0) {
    await getProject().insertMessageChain(
      newMessages,
      false,
      undefined,
      startingParentUuid,
      teamInfo,
    )
  }
  // Return the last ACTUALLY recorded chain-participant's UUID, OR the
  // prefix-tracked UUID if no new chain participants were recorded. This lets
  // callers (useLogMessages) maintain the correct parent chain even when the
  // slice is all-recorded (rewind, /resume scenarios where every message is
  // already in messageSet). Progress is skipped — it's written to the JSONL
  // but nothing chains TO it (see isChainParticipant).
  const lastRecorded = newMessages.findLast(isChainParticipant)
  return (lastRecorded?.uuid as UUID | undefined) ?? startingParentUuid ?? null
}
```

After preprocessing (removing sensitive information and the like), it iterates over the messages, separates already-recorded ones from new ones, and dynamically updates the parent uuid so the conversation chain keeps correct parent-child relationships even under compaction, replay, and concurrency. Callers then only need to maintain a single parentUuid value without caring about the internal complexity.
The core structure: JSONL + a parentUuid chain; the compact boundary deliberately breaks the chain, ensuring that resume anchors off the summary.
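That structure can be demonstrated with a toy chain walk — a simplified stand-in for what buildConversationChain does, with invented record data:

```ts
// Sketch: rebuild the active conversation by walking parentUuid links from
// a leaf back to the root. A compact boundary shows up as a message whose
// parentUuid is null, so the walk naturally stops at the summary and the
// pre-compact turns are left behind as an orphaned branch.
type Rec = { uuid: string; parentUuid: string | null; text: string }

function chainFromLeaf(byUuid: Map<string, Rec>, leaf: string): string[] {
  const out: string[] = []
  let cur: Rec | undefined = byUuid.get(leaf)
  while (cur) {
    out.unshift(cur.text)
    cur = cur.parentUuid ? byUuid.get(cur.parentUuid) : undefined
  }
  return out
}

const records: Rec[] = [
  { uuid: 'a', parentUuid: null, text: 'pre-compact turn' }, // orphaned by the boundary
  { uuid: 's', parentUuid: null, text: 'compact summary' }, // chain restarts here
  { uuid: 'b', parentUuid: 's', text: 'next user turn' },
  { uuid: 'c', parentUuid: 'b', text: 'assistant reply' },
]
const byUuid = new Map(records.map(r => [r.uuid, r]))
const resumed = chainFromLeaf(byUuid, 'c')
```

Because the JSONL is append-only, breaking the chain is purely a matter of what the new summary record points at — nothing pre-boundary needs rewriting.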
resume rebuilds logical state (it does not restore the process) via loadTranscriptFile() in src/utils/sessionStorage: it reads the transcript, bridges legacy progress entries, restores content replacement, and handles the preserved segment:
```ts
export async function loadTranscriptFile(
  filePath: string,
  opts?: { keepAllLeaves?: boolean },
): Promise<{
  messages: Map<UUID, TranscriptMessage>
  summaries: Map<UUID, string>
  customTitles: Map<UUID, string>
  tags: Map<UUID, string>
  agentNames: Map<UUID, string>
  agentColors: Map<UUID, string>
  agentSettings: Map<UUID, string>
  prNumbers: Map<UUID, number>
  prUrls: Map<UUID, string>
  prRepositories: Map<UUID, string>
  modes: Map<UUID, string>
  worktreeStates: Map<UUID, PersistedWorktreeSession | null>
  fileHistorySnapshots: Map<UUID, FileHistorySnapshotMessage>
  attributionSnapshots: Map<UUID, AttributionSnapshotMessage>
  contentReplacements: Map<UUID, ContentReplacementRecord[]>
  agentContentReplacements: Map<AgentId, ContentReplacementRecord[]>
  contextCollapseCommits: ContextCollapseCommitEntry[]
  contextCollapseSnapshot: ContextCollapseSnapshotEntry | undefined
  leafUuids: Set<UUID>
}> {
  const messages = new Map<UUID, TranscriptMessage>()
  const summaries = new Map<UUID, string>()
  const customTitles = new Map<UUID, string>()
  const tags = new Map<UUID, string>()
  const agentNames = new Map<UUID, string>()
  const agentColors = new Map<UUID, string>()
  const agentSettings = new Map<UUID, string>()
  const prNumbers = new Map<UUID, number>()
  const prUrls = new Map<UUID, string>()
  const prRepositories = new Map<UUID, string>()
  const modes = new Map<UUID, string>()
  const worktreeStates = new Map<UUID, PersistedWorktreeSession | null>()
  const fileHistorySnapshots = new Map<UUID, FileHistorySnapshotMessage>()
  const attributionSnapshots = new Map<UUID, AttributionSnapshotMessage>()
  const contentReplacements = new Map<UUID, ContentReplacementRecord[]>()
  const agentContentReplacements = new Map<
    AgentId,
    ContentReplacementRecord[]
  >()
  // Array, not Map — commit order matters (nested collapses).
  const contextCollapseCommits: ContextCollapseCommitEntry[] = []
  // Last-wins — later entries supersede.
  let contextCollapseSnapshot: ContextCollapseSnapshotEntry | undefined

  try {
    // For large transcripts, avoid materializing megabytes of stale content.
    // Single forward chunked read: attribution-snapshot lines are skipped at
    // the fd level (never buffered), compact boundaries truncate the
    // accumulator in-stream. Peak allocation is the OUTPUT size, not the
    // file size — a 151 MB session that is 84% stale attr-snaps allocates
    // ~32 MB instead of 159+64 MB. This matters because mimalloc does not
    // return those pages to the OS even after JS-level GC frees the backing
    // buffers (measured: arrayBuffers=0 after Bun.gc(true) but RSS stuck at
    // ~316 MB on the old scan+strip path vs ~155 MB here).
    //
    // Pre-boundary metadata (agent-setting, mode, pr-link, etc.) is recovered
    // via a cheap byte-level forward scan of [0, boundary).
    let buf: Buffer | null = null
    let metadataLines: string[] | null = null
    let hasPreservedSegment = false
    if (!isEnvTruthy(process.env.CLAUDE_CODE_DISABLE_PRECOMPACT_SKIP)) {
      const { size } = await stat(filePath)
      if (size > SKIP_PRECOMPACT_THRESHOLD) {
        const scan = await readTranscriptForLoad(filePath, size)
        buf = scan.postBoundaryBuf
        hasPreservedSegment = scan.hasPreservedSegment
        // >0 means we truncated pre-boundary bytes and must recover
        // session-scoped metadata from that range. A preservedSegment
        // boundary does not truncate (preserved messages are physically
        // pre-boundary), so offset stays 0 unless an EARLIER non-preserved
        // boundary already truncated — in which case the preserved messages
        // for the later boundary are post-that-earlier-boundary and were
        // kept, and we still want the metadata scan.
        if (scan.boundaryStartOffset > 0) {
          metadataLines = await scanPreBoundaryMetadata(
            filePath,
            scan.boundaryStartOffset,
          )
        }
      }
    }
    buf ??= await readFile(filePath)
    // For large buffers (which here means readTranscriptForLoad output with
    // attr-snaps already stripped at the fd level — the <5MB readFile path
    // falls through the size gate below), the dominant cost is parsing dead
    // fork branches that buildConversationChain would discard anyway. Skip
    // when the caller needs all
    // leaves (loadAllLogsFromSessionFile for /insights picks the branch with
    // most user messages, not the latest), when the boundary has a
    // preservedSegment (those messages keep their pre-compact parentUuid on
    // disk -- applyPreservedSegmentRelinks splices them in-memory AFTER
    // parse, so a pre-parse chain walk would drop them as orphans), and when
    // CLAUDE_CODE_DISABLE_PRECOMPACT_SKIP is set (that kill switch means
    // "load everything, skip nothing"; this is another skip-before-parse
    // optimization and the scan it depends on for hasPreservedSegment did
    // not run).
    if (
      !opts?.keepAllLeaves &&
      !hasPreservedSegment &&
      !isEnvTruthy(process.env.CLAUDE_CODE_DISABLE_PRECOMPACT_SKIP) &&
      buf.length > SKIP_PRECOMPACT_THRESHOLD
    ) {
      buf = walkChainBeforeParse(buf)
    }

    // First pass: process metadata-only lines collected during the boundary scan.
    // These populate the session-scoped maps (agentSettings, modes, prNumbers,
    // etc.) for entries written before the compact boundary. Any overlap with
    // the post-boundary buffer is harmless — later values overwrite earlier ones.
    if (metadataLines && metadataLines.length > 0) {
      const metaEntries = parseJSONL<Entry>(
        Buffer.from(metadataLines.join('\n')),
      )
      for (const entry of metaEntries) {
        if (entry.type === 'summary' && entry.leafUuid) {
          summaries.set(entry.leafUuid, entry.summary)
        } else if (entry.type === 'custom-title' && entry.sessionId) {
          customTitles.set(entry.sessionId, entry.customTitle)
        } else if (entry.type === 'tag' && entry.sessionId) {
          tags.set(entry.sessionId, entry.tag)
        } else if (entry.type === 'agent-name' && entry.sessionId) {
          agentNames.set(entry.sessionId, entry.agentName)
        } else if (entry.type === 'agent-color' && entry.sessionId) {
          agentColors.set(entry.sessionId, entry.agentColor)
        } else if (entry.type === 'agent-setting' && entry.sessionId) {
          agentSettings.set(entry.sessionId, entry.agentSetting)
        } else if (entry.type === 'mode' && entry.sessionId) {
          modes.set(entry.sessionId, entry.mode)
        } else if (entry.type === 'worktree-state' && entry.sessionId) {
          worktreeStates.set(entry.sessionId, entry.worktreeSession)
        } else if (entry.type === 'pr-link' && entry.sessionId) {
          prNumbers.set(entry.sessionId, entry.prNumber)
          prUrls.set(entry.sessionId, entry.prUrl)
          prRepositories.set(entry.sessionId, entry.prRepository)
        }
      }
    }

    const entries = parseJSONL<Entry>(buf)

    // Bridge map for legacy progress entries: progress_uuid → progress_parent_uuid.
    // PR #24099 removed progress from isTranscriptMessage, so old transcripts with
    // progress in the parentUuid chain would truncate at buildConversationChain
    // when messages.get(progressUuid) returns undefined. Since transcripts are
    // append-only (parents before children), we record each progress→parent link
    // as we see it, chain-resolving through consecutive progress entries, then
    // rewrite any subsequent message whose parentUuid lands in the bridge.
    const progressBridge = new Map<UUID, UUID | null>()

    for (const entry of entries) {
      // Legacy progress check runs before the Entry-typed else-if chain —
      // progress is not in the Entry union, so checking it after TypeScript
      // has narrowed `entry` intersects to `never`.
      if (isLegacyProgressEntry(entry)) {
        // Chain-resolve through consecutive progress entries so a later
        // message pointing at the tail of a progress run bridges to the
        // nearest non-progress ancestor in one lookup.
        const parent = entry.parentUuid
        progressBridge.set(
          entry.uuid,
          parent && progressBridge.has(parent)
            ? (progressBridge.get(parent) ?? null)
            : parent,
        )
        continue
      }
      if (isTranscriptMessage(entry)) {
        if (entry.parentUuid && progressBridge.has(entry.parentUuid)) {
          entry.parentUuid = progressBridge.get(entry.parentUuid) ?? null
        }
        messages.set(entry.uuid, entry)
        // Compact boundary: prior marble-origami-commit entries reference
        // messages that won't be in the post-boundary chain. The >5MB
        // backward-scan path discards them naturally by never reading the
        // pre-boundary bytes; the <5MB path reads everything, so discard
        // here. Without this, getStats().collapsedSpans in /context
        // overcounts (projectView silently skips the stale commits but
        // they're still in the log).
        if (isCompactBoundaryMessage(entry)) {
          contextCollapseCommits.length = 0
          contextCollapseSnapshot = undefined
        }
      } else if (entry.type === 'summary' && entry.leafUuid) {
        summaries.set(entry.leafUuid, entry.summary)
      } else if (entry.type === 'custom-title' && entry.sessionId) {
        customTitles.set(entry.sessionId, entry.customTitle)
      } else if (entry.type === 'tag' && entry.sessionId) {
        tags.set(entry.sessionId, entry.tag)
      } else if (entry.type === 'agent-name' && entry.sessionId) {
        agentNames.set(entry.sessionId, entry.agentName)
      } else if (entry.type === 'agent-color' && entry.sessionId) {
        agentColors.set(entry.sessionId, entry.agentColor)
      } else if (entry.type === 'agent-setting' && entry.sessionId) {
        agentSettings.set(entry.sessionId, entry.agentSetting)
      } else if (entry.type === 'mode' && entry.sessionId) {
        modes.set(entry.sessionId, entry.mode)
      } else if (entry.type === 'worktree-state' && entry.sessionId) {
        worktreeStates.set(entry.sessionId, entry.worktreeSession)
      } else if (entry.type === 'pr-link' && entry.sessionId) {
        prNumbers.set(entry.sessionId, entry.prNumber)
        prUrls.set(entry.sessionId, entry.prUrl)
        prRepositories.set(entry.sessionId, entry.prRepository)
      } else if (entry.type === 'file-history-snapshot') {
        fileHistorySnapshots.set(entry.messageId, entry)
      } else if (entry.type === 'attribution-snapshot') {
        attributionSnapshots.set(entry.messageId, entry)
      } else if (entry.type === 'content-replacement') {
        // Subagent decisions key by agentId (sidechain resume); main-thread
        // decisions key by sessionId (/resume).
        if (entry.agentId) {
          const existing = agentContentReplacements.get(entry.agentId) ?? []
          agentContentReplacements.set(entry.agentId, existing)
          existing.push(...entry.replacements)
        } else {
          const existing = contentReplacements.get(entry.sessionId) ?? []
          contentReplacements.set(entry.sessionId, existing)
          existing.push(...entry.replacements)
        }
      } else if (entry.type === 'marble-origami-commit') {
        contextCollapseCommits.push(entry)
      } else if (entry.type === 'marble-origami-snapshot') {
        contextCollapseSnapshot = entry
      }
    }
  } catch {
    // File doesn't exist or can't be read
  }

  applyPreservedSegmentRelinks(messages)
  applySnipRemovals(messages)

  // Compute leaf UUIDs once at load time
  // Only user/assistant messages should be considered as leaves for anchoring resume.
  // Other message types (system, attachment) are metadata or auxiliary and shouldn't
  // anchor a conversation chain.
  //
  // We use standard parent relationship for main chain detection, but also need to
  // handle cases where the last message is a system/metadata message.
  // For each conversation chain (identified by following parent links), the leaf
  // is the most recent user/assistant message.
  const allMessages = [...messages.values()]

  // Standard leaf computation using parent relationships
  const parentUuids = new Set(
    allMessages
      .map(msg => msg.parentUuid)
      .filter((uuid): uuid is UUID => uuid !== null),
  )

  // Find all terminal messages (messages with no children)
  const terminalMessages = allMessages.filter(msg => !parentUuids.has(msg.uuid))

  const leafUuids = new Set<UUID>()
  let hasCycle = false

  if (getFeatureValue_CACHED_MAY_BE_STALE('tengu_pebble_leaf_prune', false)) {
    // Build a set of UUIDs that have user/assistant children
    // (these are mid-conversation nodes, not dead ends)
    const hasUserAssistantChild = new Set<UUID>()
    for (const msg of allMessages) {
      if (msg.parentUuid && (msg.type === 'user' || msg.type === 'assistant')) {
        hasUserAssistantChild.add(msg.parentUuid)
      }
    }

    // For each terminal message, walk back to find the nearest user/assistant ancestor.
    // Skip ancestors that already have user/assistant children - those are mid-conversation
    // nodes where the conversation continued (e.g., an assistant tool_use message whose
    // progress child is terminal, but whose tool_result child continues the conversation).
    for (const terminal of terminalMessages) {
      const seen = new Set<UUID>()
      let current: TranscriptMessage | undefined = terminal
      while (current) {
        if (seen.has(current.uuid)) {
          hasCycle = true
          break
        }
        seen.add(current.uuid)
        if (current.type === 'user' || current.type === 'assistant') {
          if (!hasUserAssistantChild.has(current.uuid)) {
            leafUuids.add(current.uuid)
          }
          break
        }
        current = current.parentUuid
          ? messages.get(current.parentUuid)
          : undefined
      }
    }
  } else {
    // Original leaf computation: walk back from terminal messages to find
    // the nearest user/assistant ancestor unconditionally
    for (const terminal of terminalMessages) {
      const seen = new Set<UUID>()
      let current: TranscriptMessage | undefined = terminal
      while (current) {
        if (seen.has(current.uuid)) {
          hasCycle = true
          break
        }
        seen.add(current.uuid)
        if (current.type === 'user' || current.type === 'assistant') {
          leafUuids.add(current.uuid)
          break
        }
        current = current.parentUuid
          ? messages.get(current.parentUuid)
          : undefined
      }
    }
  }

  if (hasCycle) {
    logEvent('tengu_transcript_parent_cycle', {})
  }

  return {
    messages,
    summaries,
    customTitles,
    tags,
    agentNames,
    agentColors,
    agentSettings,
    prNumbers,
    prUrls,
    prRepositories,
    modes,
    worktreeStates,
    fileHistorySnapshots,
    attributionSnapshots,
    contentReplacements,
    agentContentReplacements,
    contextCollapseCommits,
    contextCollapseSnapshot,
    leafUuids,
  }
}
```

In the same file, buildConversationChain() walks the parentUuid chain to restore the conversation, including tool result branches (notably, Claude Code applies finer-grained control to tool_use, tool_result, and thinking). In addition, src/utils/conversationRecovery.ts filters out unresolved tool use, orphaned thinking, and whitespace-only assistant messages, inserting a continuation sentinel when needed.
The forked subagent mentioned earlier is a subagent designed specifically around prompt-cache hits:
```ts
export async function runForkedAgent({
  promptMessages,
  cacheSafeParams,
  canUseTool,
  querySource,
  forkLabel,
  overrides,
  maxOutputTokens,
  maxTurns,
  onMessage,
  skipTranscript,
  skipCacheWrite,
}: ForkedAgentParams): Promise<ForkedAgentResult> {
  const startTime = Date.now()
  const outputMessages: Message[] = []
  let totalUsage: NonNullableUsage = { ...EMPTY_USAGE }

  const {
    systemPrompt,
    userContext,
    systemContext,
    toolUseContext,
    forkContextMessages,
  } = cacheSafeParams

  // Create isolated context to prevent mutation of parent state
  const isolatedToolUseContext = createSubagentContext(
    toolUseContext,
    overrides,
  )

  // Do NOT filterIncompleteToolCalls here — it drops the whole assistant on
  // partial tool batches, orphaning the paired results (API 400). Dangling
  // tool_uses are repaired downstream by ensureToolResultPairing in claude.ts,
  // same as the main thread — identical post-repair prefix keeps the cache hit.
  const initialMessages: Message[] = [...forkContextMessages, ...promptMessages]

  // Generate agent ID and record initial messages for transcript
  // When skipTranscript is set, skip agent ID creation and all transcript I/O
  const agentId = skipTranscript ? undefined : createAgentId(forkLabel)
  let lastRecordedUuid: UUID | null = null
  if (agentId) {
    await recordSidechainTranscript(initialMessages, agentId).catch(err =>
      logForDebugging(
        `Forked agent [${forkLabel}] failed to record initial transcript: ${err}`,
      ),
    )
    // Track the last recorded message UUID for parent chain continuity
    lastRecordedUuid =
      initialMessages.length > 0
        ? initialMessages[initialMessages.length - 1]!.uuid
        : null
  }

  // Run the query loop with isolated context (cache-safe params preserved)
  try {
    for await (const message of query({
      messages: initialMessages,
      systemPrompt,
      userContext,
      systemContext,
      canUseTool,
      toolUseContext: isolatedToolUseContext,
      querySource,
      maxOutputTokensOverride: maxOutputTokens,
      maxTurns,
      skipCacheWrite,
    })) {
      // Extract real usage from message_delta stream events (final usage per API call)
      if (message.type === 'stream_event') {
        if (
          'event' in message &&
          message.event?.type === 'message_delta' &&
          message.event.usage
        ) {
          const turnUsage = updateUsage({ ...EMPTY_USAGE }, message.event.usage)
          totalUsage = accumulateUsage(totalUsage, turnUsage)
        }
        continue
      }
      if (message.type === 'stream_request_start') {
        continue
      }

      logForDebugging(
        `Forked agent [${forkLabel}] received message: type=${message.type}`,
      )

      outputMessages.push(message as Message)
      onMessage?.(message as Message)

      // Record transcript for recordable message types (same pattern as runAgent.ts)
      const msg = message as Message
      if (
        agentId &&
        (msg.type === 'assistant' ||
          msg.type === 'user' ||
          msg.type === 'progress')
      ) {
        await recordSidechainTranscript([msg], agentId, lastRecordedUuid).catch(
          err =>
            logForDebugging(
              `Forked agent [${forkLabel}] failed to record transcript: ${err}`,
            ),
        )
        if (msg.type !== 'progress') {
          lastRecordedUuid = msg.uuid
        }
      }
    }
  } finally {
    // Release cloned file state cache memory (same pattern as runAgent.ts)
    isolatedToolUseContext.readFileState.clear()
    // Release the cloned fork context messages
    initialMessages.length = 0
  }

  logForDebugging(
    `Forked agent [${forkLabel}] finished: ${outputMessages.length} messages, types=[${outputMessages.map(m => m.type).join(', ')}], totalUsage: input=${totalUsage.input_tokens} output=${totalUsage.output_tokens} cacheRead=${totalUsage.cache_read_input_tokens} cacheCreate=${totalUsage.cache_creation_input_tokens}`,
  )

  const durationMs = Date.now() - startTime

  // Log the fork query metrics with full NonNullableUsage
  logForkAgentQueryEvent({
    forkLabel,
    querySource,
    durationMs,
    messageCount: outputMessages.length,
    totalUsage,
    queryTracking: toolUseContext.queryTracking,
  })

  return {
    messages: outputMessages,
    totalUsage,
  }
}
```

A forked agent runs an independent LLM query loop, streams messages back to the parent in real time, and maintains its own conversation record and usage statistics:
- createSubagentContext clones the toolUseContext, preventing pollution of the parent agent's state (another useful takeaway: much multi-agent collaboration can isolate subagents along exactly this kind of context boundary);
- the static prompt in cacheSafeParams is reused, improving the cache hit rate;
- stream events such as message_delta are processed in real time via query's async iterator.
And buildForkedMessages() in src/tools/AgentTool/forkSubagent.ts deliberately constructs a nearly identical prefix for every child, which also raises the cache hit rate:
```ts
export function buildForkedMessages(
  directive: string,
  assistantMessage: AssistantMessage,
): MessageType[] {
  // Clone the assistant message to avoid mutating the original, keeping all
  // content blocks (thinking, text, and every tool_use)
  const fullAssistantMessage: AssistantMessage = {
    ...assistantMessage,
    uuid: randomUUID(),
    message: {
      ...assistantMessage.message,
      content: [...assistantMessage.message.content],
    },
  }

  // Collect all tool_use blocks from the assistant message
  const toolUseBlocks = assistantMessage.message.content.filter(
    (block): block is BetaToolUseBlock => block.type === 'tool_use',
  )

  if (toolUseBlocks.length === 0) {
    logForDebugging(
      `No tool_use blocks found in assistant message for fork directive: ${directive.slice(0, 50)}...`,
      { level: 'error' },
    )
    return [
      createUserMessage({
        content: [
          { type: 'text' as const, text: buildChildMessage(directive) },
        ],
      }),
    ]
  }

  // Build tool_result blocks for every tool_use, all with identical placeholder text
  const toolResultBlocks = toolUseBlocks.map(block => ({
    type: 'tool_result' as const,
    tool_use_id: block.id,
    content: [
      {
        type: 'text' as const,
        text: FORK_PLACEHOLDER_RESULT,
      },
    ],
  }))

  // Build a single user message: all placeholder tool_results + the per-child directive
  // TODO(smoosh): this text sibling creates a [tool_result, text] pattern on the wire
  // (renders as </function_results>\n\nHuman:<text>). One-off per-child construction,
  // not a repeated teacher, so low-priority. If we ever care, use smooshIntoToolResult
  // from src/utils/messages.ts to fold the directive into the last tool_result.content.
  const toolResultMessage = createUserMessage({
    content: [
      ...toolResultBlocks,
      {
        type: 'text' as const,
        text: buildChildMessage(directive),
      },
    ],
  })

  return [fullAssistantMessage, toolResultMessage]
}
```

It converts the parent agent's assistant message (tool calls included) into the child agent's initial conversation context, using placeholder tool_results to simulate tools that have already run, so the child can continue reasoning on top of "hypothetical tool output".
Finally, src/utils/agentContext uses AsyncLocalStorage to isolate concurrent agents — cache-aware context inheritance is emphasized here to an extreme degree.
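AsyncLocalStorage itself is a Node.js built-in; here is a minimal sketch of the isolation pattern (the store shape and names like `agentStore`/`runAgent` are invented for illustration — the source confirms only that agentContext uses AsyncLocalStorage):

```ts
import { AsyncLocalStorage } from 'node:async_hooks'

// Sketch: each concurrent agent runs inside its own AsyncLocalStorage
// scope, so "which agent am I?" is answered anywhere in the call tree
// without threading an argument through every function.
const agentStore = new AsyncLocalStorage<{ agentId: string }>()

async function doToolCall(): Promise<string> {
  // Deliberately interleave the two agents across the event loop.
  await new Promise(r => setTimeout(r, Math.random() * 5))
  return agentStore.getStore()?.agentId ?? 'unknown'
}

async function runAgent(agentId: string): Promise<string> {
  // Everything awaited inside run() sees this agent's store, even when
  // another agent's continuations run in between.
  return agentStore.run({ agentId }, () => doToolCall())
}

const results = await Promise.all([runAgent('main'), runAgent('fork-1')])
```

The payoff is that deeply nested code (tool execution, transcript recording, telemetry) can ask for the current agent's identity without any explicit context parameter, and concurrent forks never see each other's state.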
final
Key points:
compact / session memory, and resume / fork cache sharing.
bye~
