feat(core): add ExecuteOption and ConversationIdGenerator to FlowExecutor

新增 ExecuteOption 统一入口,支持 requestId、conversationId、上下文等维度的自由组合,
避免 execute2RespWithXxx 系列方法命名爆炸。同时新增 ConversationIdGenerator 为
agent 对话连续性提供 NanoId 格式的 conversationId 生成能力。LiteflowResponse 增加
conversationId 字段用于返回给调用方。

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
everywhere.z
2026-05-09 15:01:19 +08:00
parent 36d945fca2
commit a5735c3f4d
4 changed files with 229 additions and 1 deletions

View File

@@ -0,0 +1,111 @@
package com.yomahub.liteflow.core;
/**
* {@link FlowExecutor} 的执行选项参数对象。
*
* <p>用于在不增加方法 overload 数量的前提下,灵活组合各种执行维度——例如同时
* 指定 {@code requestId}、{@code conversationId} 与自定义上下文。所有字段都是
* 可选的:未设置即沿用框架默认行为。
*
* <p>典型用法:
* <pre>{@code
* // 仅指定 conversationIdagent 连续对话场景)
* flowExecutor.execute2Resp("chain1", param,
* ExecuteOption.of().conversationId("user-1024-task-abc"));
*
* // 让框架自动生成 conversationId
* LiteflowResponse r = flowExecutor.execute2Resp("chain1", param,
* ExecuteOption.of().autoConversationId());
* String cid = r.getConversationId(); // 取回后续调用可复用
*
* // 同时指定 rid 和 cid并附带上下文 Class
* flowExecutor.execute2Resp("chain1", param,
* ExecuteOption.of()
* .requestId(rid)
* .conversationId(cid)
* .contextClass(MyCtx.class));
* }</pre>
*/
public class ExecuteOption {
private String requestId;
private String conversationId;
private boolean autoConversationId;
private Class<?>[] contextBeanClasses;
private Object[] contextBeans;
private ExecuteOption() {}
/** 创建一个空的执行选项,所有字段均未设置。 */
public static ExecuteOption of() {
return new ExecuteOption();
}
/**
* 指定本次执行的 requestId。{@code null} 或空字符串等价于未设置——
* 框架会按既有逻辑自动生成。
*/
public ExecuteOption requestId(String requestId) {
this.requestId = requestId;
return this;
}
/**
* 指定本次执行的 conversationId业务会话标识
*
* <p>主要用于 ReAct Agent 连续对话场景:同一段对话中的所有 agent
* 共享 workspace 目录,跨次调用传入相同 conversationId 即可恢复会话。
*
* <p>显式调用本方法会取消 {@link #autoConversationId()} 的语义。
*/
public ExecuteOption conversationId(String conversationId) {
this.conversationId = conversationId;
this.autoConversationId = false;
return this;
}
/**
* 声明本次执行需要 conversationId 但具体值由框架生成NanoId 格式)。
* 生成的值可通过 {@link com.yomahub.liteflow.flow.LiteflowResponse#getConversationId()}
* 取回,调用方据此在下一次调用中传回 {@link #conversationId(String)} 以延续会话。
*
* <p>调用本方法会清掉之前 {@link #conversationId(String)} 设置的具体值。
*/
public ExecuteOption autoConversationId() {
this.autoConversationId = true;
this.conversationId = null;
return this;
}
/** 指定上下文 bean 的 Class 数组。框架会根据 Class 创建实例。 */
public ExecuteOption contextClass(Class<?>... contextBeanClasses) {
this.contextBeanClasses = contextBeanClasses;
return this;
}
/** 指定上下文 bean 实例数组。 */
public ExecuteOption contextBean(Object... contextBeans) {
this.contextBeans = contextBeans;
return this;
}
public String getRequestId() {
return requestId;
}
public String getConversationId() {
return conversationId;
}
public boolean isAutoConversationId() {
return autoConversationId;
}
public Class<?>[] getContextBeanClasses() {
return contextBeanClasses;
}
public Object[] getContextBeans() {
return contextBeans;
}
}

View File

@@ -43,6 +43,7 @@ import com.yomahub.liteflow.slot.Slot;
import com.yomahub.liteflow.spi.holder.ContextCmpInitHolder;
import com.yomahub.liteflow.spi.holder.PathContentParserHolder;
import com.yomahub.liteflow.thread.ExecutorHelper;
import com.yomahub.liteflow.util.ConversationIdGenerator;
import com.yomahub.liteflow.util.ElRegexUtil;
import java.util.*;
@@ -386,6 +387,69 @@ public class FlowExecutor {
return this.execute2Resp(chainId, param, requestId, null, contextBeanArray);
}
/**
* 使用 {@link ExecuteOption} 执行 chain自由组合 requestId、conversationId、上下文等执行维度。
*
* <p>这是新代码的推荐入口。当需要 conversationId典型场景ReAct Agent 连续对话)、
* 同时传入 requestId 与多上下文时,相比多个 {@code WithXxx} 方法 overload
* 单一 ExecuteOption 入口能避免命名爆炸:
* <pre>{@code
* flowExecutor.execute2Resp("chain1", param,
* ExecuteOption.of()
* .requestId(rid)
* .conversationId(cid)
* .contextClass(MyCtx.class));
* }</pre>
*
* <p>已有 {@code execute2RespWithRid(...)} 等方法继续可用,行为不变。
*
* @param chainId chain id
* @param param 入参,将作为 chainReqData 注入 slot
* @param option 执行选项;{@code null} 等价于 {@link ExecuteOption#of()}
* @return LiteflowResponse
*/
public LiteflowResponse execute2Resp(String chainId, Object param, ExecuteOption option) {
ExecuteOption opt = option == null ? ExecuteOption.of() : option;
return this.execute2Resp(chainId, param, opt.getRequestId(), resolveConversationId(opt),
opt.getContextBeanClasses(), opt.getContextBeans());
}
/**
* 异步版本:{@link #execute2Resp(String, Object, ExecuteOption)}。
*/
public Future<LiteflowResponse> execute2Future(String chainId, Object param, ExecuteOption option) {
ExecuteOption opt = option == null ? ExecuteOption.of() : option;
// 注意cid 解析必须在主线程完成,否则 NanoId 会延迟到 worker 线程才生成,
// 导致调用方拿到的 Future 关联的 cid 与 response 不一致的可能性。这里提前定型。
String resolvedCid = resolveConversationId(opt);
String requestId = opt.getRequestId();
Class<?>[] ctxClasses = opt.getContextBeanClasses();
Object[] ctxBeans = opt.getContextBeans();
return ExecutorHelper.loadInstance()
.buildMainExecutor(liteflowConfig.getMainExecutorClass())
.submit(() -> FlowExecutorHolder.loadInstance().execute2Resp(
chainId, param, requestId, resolvedCid, ctxClasses, ctxBeans));
}
/**
* 把 ExecuteOption 中关于 conversationId 的两类语义(显式值 / 自动生成)
* 归约成一个最终值:
* <ul>
* <li>显式 {@code conversationId(...)}(非空白)→ 用之;</li>
* <li>{@code autoConversationId()} → 调用 {@link ConversationIdGenerator} 生成;</li>
* <li>都未声明 → 返回 {@code null},由后续组件按需自行处理(不主动写入 slot。</li>
* </ul>
*/
private String resolveConversationId(ExecuteOption opt) {
if (StrUtil.isNotBlank(opt.getConversationId())) {
return opt.getConversationId();
}
if (opt.isAutoConversationId()) {
return ConversationIdGenerator.generate();
}
return null;
}
public List<LiteflowResponse> executeRouteChainWithRid(Object param, String requestId, Object... contextBeanArray) {
return this.executeWithRoute(null, param, requestId, null, contextBeanArray);
}
@@ -433,7 +497,12 @@ public class FlowExecutor {
private LiteflowResponse execute2Resp(String chainId, Object param, String requestId, Class<?>[] contextBeanClazzArray,
Object[] contextBeanArray) {
Slot slot = doExecute(chainId, param, requestId, contextBeanClazzArray, contextBeanArray, ChainExecuteModeEnum.BODY);
return execute2Resp(chainId, param, requestId, null, contextBeanClazzArray, contextBeanArray);
}
private LiteflowResponse execute2Resp(String chainId, Object param, String requestId, String conversationId,
Class<?>[] contextBeanClazzArray, Object[] contextBeanArray) {
Slot slot = doExecute(chainId, param, requestId, conversationId, contextBeanClazzArray, contextBeanArray, ChainExecuteModeEnum.BODY);
return LiteflowResponse.newMainResponse(slot);
}
@@ -444,6 +513,12 @@ public class FlowExecutor {
private Slot doExecute(String chainId, Object param, String requestId, Class<?>[] contextBeanClazzArray, Object[] contextBeanArray,
ChainExecuteModeEnum chainExecuteModeEnum) {
return doExecute(chainId, param, requestId, null, contextBeanClazzArray, contextBeanArray, chainExecuteModeEnum);
}
private Slot doExecute(String chainId, Object param, String requestId, String conversationId,
Class<?>[] contextBeanClazzArray, Object[] contextBeanArray,
ChainExecuteModeEnum chainExecuteModeEnum) {
if (FlowBus.needInit()) {
init(true);
}
@@ -483,6 +558,13 @@ public class FlowExecutor {
LOG.info("requestId has generated");
}
// 如果调用方明确传入了 conversationId则写入 slot用于 ReAct Agent 等
// 需要在 chain 内多个组件之间共享会话上下文的场景。未传入时不主动设置,
// 由具体组件按其默认策略处理(例如 ReActAgentComponent 会按需懒生成)。
if (StrUtil.isNotBlank(conversationId)){
slot.setConversationId(conversationId);
}
LOG.info("slot[{}] offered", slotIndex);
if (ObjectUtil.isNotNull(param)) {

View File

@@ -181,6 +181,10 @@ public class LiteflowResponse {
return this.getSlot().getRequestId();
}
public String getConversationId() {
return this.getSlot().getConversationId();
}
public String getChainId() {
return chainId;
}

View File

@@ -0,0 +1,31 @@
package com.yomahub.liteflow.util;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.lang.id.NanoId;
import cn.hutool.core.util.StrUtil;
import java.util.Date;
/**
* 默认的 {@code conversationId} 生成器:日期前缀 + 12 位 NanoId。
*
* <p>{@code conversationId} 用于在 chain 执行期间标识一段"业务会话",由
* {@link com.yomahub.liteflow.slot.Slot#setConversationId(String)} 写入 slot
* 同 chain 内所有需要会话上下文的组件(典型如 ReAct Agent共享。
*
* <p>不传 conversationId 时由 {@link com.yomahub.liteflow.core.FlowExecutor} 在 slot
* 创建后调用本工具生成一次性标识;传入则原样使用,便于跨调用恢复会话。
*/
public final class ConversationIdGenerator {
private static final char[] CODE_ALPHABET =
"123456789ABCDEFGHIJKLMNPQRSTUVWXYZ".toCharArray();
private ConversationIdGenerator() {}
public static String generate() {
String date = DateUtil.format(new Date(), "yyyyMMdd");
String code = NanoId.randomNanoId(null, CODE_ALPHABET, 12);
return StrUtil.format("{}_{}", date, code);
}
}