feat(agent): track per-invocation chat usage across ReAct reasoning steps

Add ChatUsageTrackingHook that accumulates ChatUsage from every
PostReasoningEvent within a single process() call, expose it via
ReActAgentContext#getChatUsage(), and emit a per-step usage line in
ReActLoggingHook. The hook is cached on AgentSession and reset at the
start of each process() so the snapshot reflects the full invocation
(not just the last reasoning step).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
everywhere.z
2026-05-18 20:50:23 +08:00
parent 8ea01d2e88
commit 083bdfa0f2
6 changed files with 150 additions and 4 deletions

View File

@@ -184,10 +184,20 @@ protected void handleReply(Msg reply) {
ctx().getSlot().getContextBean(MyAgentCtx.class).setReply(getNodeId(), text);
// 选择 2以 nodeId 为 key 存到 slot 输出,避免相互覆盖
ctx().getSlot().setOutput(getNodeId(), text);
// 选择 3把本轮累计 token 用量一起落盘,供下游节点或调用方读取
ChatUsage usage = ctx().getChatUsage();
if (usage != null) {
ctx().getSlot().setOutput(getNodeId() + ".usage", Map.of(
"inputTokens", usage.getInputTokens(),
"outputTokens", usage.getOutputTokens(),
"totalTokens", usage.getTotalTokens(),
"timeSeconds", usage.getTime()
));
}
}
```
下游节点对应使用 `slot.getContextBean(MyAgentCtx.class)``slot.getOutput(nodeId)` 读取。
下游节点对应使用 `slot.getContextBean(MyAgentCtx.class)``slot.getOutput(nodeId)` 读取。`ChatUsage` 来自 `io.agentscope.core.model.ChatUsage``ctx().getChatUsage()` 的语义与边界见 [§ 3](#3-reactagentcomponent-扩展点) 中 `ReActAgentContext` 表格说明。
> **多 Agent 节点共存的注意事项**:默认 `responseData` 是 slot 级别的单一字段,后写覆盖先写。链路中存在多个 ReAct Agent 时,请覆写 `handleReply` 用 `setOutput(nodeId, ...)` 或自定义 ContextBean 区分各 Agent 的输出。
@@ -261,7 +271,7 @@ if (response.isSuccess()) {
| `handleReply(reply)` | 否 | 写入 `slot.responseData` | 自定义回复处理逻辑 |
| `buildModel()` | 否 | 委派 `model().resolve(agentConfig())` | 逃生舱:完全自行构造 agentscope `Model` |
`ReActAgentContext` 可通过组件的 `ctx()` 方法取得,提供四项执行上下文:
`ReActAgentContext` 可通过组件的 `ctx()` 方法取得,提供以下执行上下文:
| 方法 | 说明 |
| --- | --- |
@@ -269,9 +279,12 @@ if (response.isSuccess()) {
| `getConversationId()` | 安全化后的 conversation ID决定 workspace 子目录 |
| `getAgentKey()` | 安全化后的 Agent key默认来自 `nodeId` |
| `getWorkspaceDir()` | 当前 conversation 对应的 workspace 目录 |
| `getChatUsage()` | 本次 `process()` 截至当前已累计的 token 用量agentscope `ChatUsage`,含 `getInputTokens()` / `getOutputTokens()` / `getTotalTokens()` / `getTime()`(秒));模型未上报或本轮尚未发生过 reasoning step 时返回 `null` |
注意:`systemPrompt()` 只在同一 `(conversationId, agentKey)` 下首次构建 Agent 时调用;后续调用会复用同一个 Agent 实例和 memory。动态输入应放在 `userPrompt()` 中,并通过 `ctx()``getSlot()` 读取当次 invocation 的数据。
`getChatUsage()` 的累计口径:底层 agentscope 每次 `reasoning(iter)` 都会新建一个 `ReasoningContext`,所以 `Msg#getChatUsage()` 反映的是**当前这一步** LLM 调用的累计(流式聚合),而不是跨多步 ReAct 循环的累计。框架默认始终注册一个内部 `ChatUsageTrackingHook`,在每次 `PostReasoningEvent` 时把该步 usage 累加到 Session 缓存的计数器,并在每次 `process()` 开始前清零,因此 `ctx().getChatUsage()` 给出的是**整次 `process()` 调用**累计后的值。该 hook 无配置开关,业务无需启用;如果完全不希望产生这部分计数开销,可通过覆写 `buildModel()` 走完全自定义路径绕开 `ReActAgentComponent` 的默认构建流程。
同理,`tools()``hooks()``skills()``enableSkills()``enableReActLogging()``buildModel()` 都属于 Agent 构建期能力声明,通常只在同一 `(conversationId, agentKey)` 首次构建缓存 Agent 时生效。不要让这些方法依赖单次请求数据;如果确实需要按请求隔离模型、工具或 hook请把请求维度体现在 `agentKey()``conversationId` 中,让框架构建新的 AgentSession。
---
@@ -719,10 +732,13 @@ liteflow.agent.shell.mode=disabled
| --- | --- |
| `PreReasoningEvent` | `[agent:reason][conversationId:agentKey] >>> model=... messages=N` |
| `PostReasoningEvent` | `[agent:reason][conversationId:agentKey] <<< text=... toolCalls=[...]` |
| `PostReasoningEvent`usage 附加行) | `[agent:reason][conversationId:agentKey] <<< usage input=N output=N total=N time=Ns`(仅当本步消息上报了 `ChatUsage` 时输出) |
| `PreActingEvent` | `[agent:act][conversationId:agentKey] >>> tool=... input=...` |
| `PostActingEvent` | `[agent:act][conversationId:agentKey] <<< tool=... result=...` |
| `ErrorEvent` | `[agent:error][conversationId:agentKey] ...` |
`PostReasoningEvent` 的 usage 行直接来自当前这一步 reasoning message 的 `ChatUsage`agentscope 单步累计 / 流式聚合的结果),不是跨步累计;想要看整次 `process()` 调用的总 token请使用 `ctx().getChatUsage()`(见 [§ 3](#3-reactagentcomponent-扩展点))。
- 全局开关:`liteflow.agent.logging.react-enabled`(默认 `true`)。
- 单组件开关:覆写 `enableReActLogging()` 强制返回 `true` / `false`
- 输出 logger 名:`com.yomahub.liteflow.agent.hook.ReActLoggingHook`(可在 logback / log4j2 中独立调级)。
@@ -1070,6 +1086,8 @@ liteflow.agent.anthropic-compatible.gateway.base-url=https://anthropic-gateway.e
| `ctx() must be called during process()` | 在构造器、Bean 初始化、异步线程或 `process()` 结束后的生命周期中调用了 `ctx()` | 只在 `userPrompt()`、工具回调、Hook 回调和 `handleReply()``process()` 触发的调用链内读取;跨 invocation 缓存的对象不要保存 `ReActAgentContext` |
| 注册 listener 后链路失败 | `eventListener` 回调中抛出了异常,或执行了阻塞 I/O 导致上游超时 | listener 内只做轻量处理并自行捕获异常;重型转发逻辑放到外部队列或线程池 |
| `WHEN` 中多个 Agent 看起来没有并行 | 多个组件解析到了相同 `(conversationId, agentKey)`,共用了同一把锁;或下游等待最慢分支 | 确保需要并行的 Agent 使用不同 `agentKey()`;如还要隔离文件,则使用不同 conversation 或子目录 |
| `ctx().getChatUsage()` 返回 `null` | 本轮还没发生过 reasoning step`userPrompt()` 阶段就读取),或模型 / 网关未在响应里上报 `ChatUsage`(部分流式实现或代理网关会丢失 usage | 改到 `handleReply()` 等本轮 reasoning 结束后的时机再读;或确认模型 / 网关在响应 metadata 中带回 usage 字段 |
| `ctx().getChatUsage()` 的累计 token 比 SDK 单次响应大 | 同一次 `process()` 内 ReAct 触发了多轮 reasoning框架内置 `ChatUsageTrackingHook` 会把每步 LLM 调用的 usage 都累加进来 | 这是预期行为:`getChatUsage()` 是本次调用的总计;若需要单步 usage`ReActLoggingHook``PostReasoningEvent` 日志或自定义 hook 读取 |
---

View File

@@ -1,6 +1,7 @@
package com.yomahub.liteflow.agent.component;
import com.yomahub.liteflow.agent.exception.AgentConfigException;
import com.yomahub.liteflow.agent.hook.ChatUsageTrackingHook;
import com.yomahub.liteflow.agent.hook.ReActLoggingHook;
import com.yomahub.liteflow.agent.skill.SkillBoxFactory;
import com.yomahub.liteflow.agent.skill.SkillLoadResult;
@@ -300,6 +301,7 @@ public abstract class ReActAgentComponent extends NodeComponent {
BuiltAgent built = buildAgent();
agent = built.agent();
session.setSkillTrackingHook(built.skillTrackingHook());
session.setChatUsageTrackingHook(built.chatUsageTrackingHook());
mgr.loadIfExists(session, agent);
session.setAgent(agent);
}
@@ -308,6 +310,11 @@ public abstract class ReActAgentComponent extends NodeComponent {
skillHook.clear();
slot.setAttachment(skillHookKey(), skillHook);
}
ChatUsageTrackingHook usageHook = session.getChatUsageTrackingHook();
if (usageHook != null) {
usageHook.reset();
ctx.setChatUsageTrackingHook(usageHook);
}
Throwable processError = null;
try {
Msg userMsg = Msg.builder().textContent(userPrompt()).build();
@@ -408,7 +415,8 @@ public abstract class ReActAgentComponent extends NodeComponent {
return null;
}
private record BuiltAgent(ReActAgent agent, SkillTrackingHook skillTrackingHook) {
private record BuiltAgent(ReActAgent agent, SkillTrackingHook skillTrackingHook,
ChatUsageTrackingHook chatUsageTrackingHook) {
}
private BuiltAgent buildAgent() {
@@ -430,6 +438,9 @@ public abstract class ReActAgentComponent extends NodeComponent {
allHooks.add(new ReActLoggingHook(ctx.getConversationId() + ":" + ctx.getAgentKey()));
}
ChatUsageTrackingHook chatUsageTrackingHook = new ChatUsageTrackingHook();
allHooks.add(chatUsageTrackingHook);
SkillTrackingHook skillTrackingHook = null;
SkillBox skillBox = null;
if (enableSkills()) {
@@ -452,7 +463,7 @@ public abstract class ReActAgentComponent extends NodeComponent {
builder.skillBox(skillBox);
}
return new BuiltAgent(builder.build(), skillTrackingHook);
return new BuiltAgent(builder.build(), skillTrackingHook, chatUsageTrackingHook);
}
/** 持有单例 AgentSessionManager首次 process() 时懒创建。 */

View File

@@ -1,6 +1,8 @@
package com.yomahub.liteflow.agent.component;
import com.yomahub.liteflow.agent.hook.ChatUsageTrackingHook;
import com.yomahub.liteflow.slot.Slot;
import io.agentscope.core.model.ChatUsage;
import java.nio.file.Path;
import java.util.Objects;
@@ -28,6 +30,7 @@ public class ReActAgentContext {
private final String conversationId;
private final String agentKey;
private final Path workspaceDir;
private volatile ChatUsageTrackingHook chatUsageTrackingHook;
public ReActAgentContext(Slot slot, String conversationId, String agentKey, Path workspaceDir) {
this.slot = Objects.requireNonNull(slot, "slot");
@@ -43,4 +46,25 @@ public class ReActAgentContext {
public String getAgentKey() { return agentKey; }
public Path getWorkspaceDir() { return workspaceDir; }
/**
* 由框架注入:本次 {@code process()} 调用使用的 token 累加 hook。
*/
public void setChatUsageTrackingHook(ChatUsageTrackingHook hook) {
this.chatUsageTrackingHook = hook;
}
/**
* 返回本次 {@code process()} 截至当前已累计的 token 用量。
*
* <p>{@link ChatUsage#getInputTokens()} / {@link ChatUsage#getOutputTokens()} /
* {@link ChatUsage#getTotalTokens()} 给出累计 token{@link ChatUsage#getTime()}
* 给出累计推理耗时(秒)。在 {@code handleReply()} 中调用拿到的就是整次调用的累计值。
*
* @return 累计 ChatUsage若未观察到任何 usage模型未上报或 reply 为 null则返回 {@code null}
*/
public ChatUsage getChatUsage() {
ChatUsageTrackingHook hook = this.chatUsageTrackingHook;
return hook == null ? null : hook.snapshot();
}
}

View File

@@ -0,0 +1,73 @@
package com.yomahub.liteflow.agent.hook;
import io.agentscope.core.hook.Hook;
import io.agentscope.core.hook.HookEvent;
import io.agentscope.core.hook.PostReasoningEvent;
import io.agentscope.core.message.Msg;
import io.agentscope.core.model.ChatUsage;
import reactor.core.publisher.Mono;
/**
* 累加单次 {@code process()} 调用内所有 reasoning step 的 token 用量。
*
* <p>底层 agentscope 每次 {@code reasoning(iter)} 都新建一个 {@link io.agentscope.core.agent.accumulator.ReasoningContext
* ReasoningContext},因此 {@link PostReasoningEvent#getReasoningMessage()} 的 metadata 中携带的
* {@link ChatUsage} 只是本步 LLM call 的累计(流式聚合),不是跨多步 ReAct 循环的累计。
* 本 hook 在每次 PostReasoningEvent 触发时把当步 usage 累加到内部计数器,
* 暴露整次调用累计后的 {@link #snapshot()}。
*
* <p>实例与缓存 ReActAgent 同生命周期;每次 {@code process()} 开始前必须调用 {@link #reset()}
* 清零,避免上次调用的余量被带入。
*/
public class ChatUsageTrackingHook implements Hook {
private int inputTokens;
private int outputTokens;
private double time;
private int steps;
@Override
public synchronized <T extends HookEvent> Mono<T> onEvent(T event) {
if (event instanceof PostReasoningEvent e) {
Msg msg = e.getReasoningMessage();
if (msg != null) {
ChatUsage usage = msg.getChatUsage();
if (usage != null) {
inputTokens += usage.getInputTokens();
outputTokens += usage.getOutputTokens();
time += usage.getTime();
steps++;
}
}
}
return Mono.just(event);
}
public synchronized void reset() {
this.inputTokens = 0;
this.outputTokens = 0;
this.time = 0;
this.steps = 0;
}
/**
* 返回到目前为止累计的 token 用量;若尚未观察到任何 usage返回 {@code null}。
*/
public synchronized ChatUsage snapshot() {
if (steps == 0) {
return null;
}
return ChatUsage.builder()
.inputTokens(inputTokens)
.outputTokens(outputTokens)
.time(time)
.build();
}
/**
* 已经累加过 usage 的 reasoning step 次数。
*/
public synchronized int getSteps() {
return steps;
}
}

View File

@@ -11,6 +11,7 @@ import io.agentscope.core.message.Msg;
import io.agentscope.core.message.ToolResultBlock;
import io.agentscope.core.message.ToolUseBlock;
import io.agentscope.core.message.ThinkingBlock;
import io.agentscope.core.model.ChatUsage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
@@ -69,6 +70,15 @@ public class ReActLoggingHook implements Hook {
sessionId, text, summarizeToolUses(tools));
}
}
ChatUsage usage = reply == null ? null : reply.getChatUsage();
if (usage != null) {
LOG.info("[agent:reason][{}] <<< usage input={} output={} total={} time={}s",
sessionId,
usage.getInputTokens(),
usage.getOutputTokens(),
usage.getTotalTokens(),
usage.getTime());
}
} else if (event instanceof PreActingEvent e) {
ToolUseBlock t = e.getToolUse();
LOG.info("[agent:act][{}] >>> tool={} input={}",

View File

@@ -1,5 +1,6 @@
package com.yomahub.liteflow.agent.session;
import com.yomahub.liteflow.agent.hook.ChatUsageTrackingHook;
import com.yomahub.liteflow.agent.skill.SkillTrackingHook;
import java.nio.file.Path;
@@ -30,6 +31,7 @@ public class AgentSession {
private final ReentrantLock lock = new ReentrantLock();
private volatile Object agent;
private volatile SkillTrackingHook skillTrackingHook;
private volatile ChatUsageTrackingHook chatUsageTrackingHook;
private volatile Instant lastActive = Instant.now();
public AgentSession(String conversationId, String agentKey, String cacheKey, Path workspaceDir) {
@@ -78,6 +80,14 @@ public class AgentSession {
this.skillTrackingHook = skillTrackingHook;
}
public ChatUsageTrackingHook getChatUsageTrackingHook() {
return chatUsageTrackingHook;
}
public void setChatUsageTrackingHook(ChatUsageTrackingHook chatUsageTrackingHook) {
this.chatUsageTrackingHook = chatUsageTrackingHook;
}
public Instant getLastActive() {
return lastActive;
}