From a5735c3f4d00b2f5bfd32a5b7a7d91af447c2a06 Mon Sep 17 00:00:00 2001 From: "everywhere.z" Date: Sat, 9 May 2026 15:01:19 +0800 Subject: [PATCH] feat(core): add ExecuteOption and ConversationIdGenerator to FlowExecutor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 新增 ExecuteOption 统一入口,支持 requestId、conversationId、上下文等维度的自由组合, 避免 execute2RespWithXxx 系列方法命名爆炸。同时新增 ConversationIdGenerator 为 agent 对话连续性提供 NanoId 格式的 conversationId 生成能力。LiteflowResponse 增加 conversationId 字段用于返回给调用方。 Co-Authored-By: Claude Opus 4.7 --- .../yomahub/liteflow/core/ExecuteOption.java | 111 ++++++++++++++++++ .../yomahub/liteflow/core/FlowExecutor.java | 84 ++++++++++++- .../liteflow/flow/LiteflowResponse.java | 4 + .../util/ConversationIdGenerator.java | 31 +++++ 4 files changed, 229 insertions(+), 1 deletion(-) create mode 100644 liteflow-core/src/main/java/com/yomahub/liteflow/core/ExecuteOption.java create mode 100644 liteflow-core/src/main/java/com/yomahub/liteflow/util/ConversationIdGenerator.java diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/core/ExecuteOption.java b/liteflow-core/src/main/java/com/yomahub/liteflow/core/ExecuteOption.java new file mode 100644 index 000000000..2deacb8e7 --- /dev/null +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/core/ExecuteOption.java @@ -0,0 +1,111 @@ +package com.yomahub.liteflow.core; + +/** + * {@link FlowExecutor} 的执行选项参数对象。 + * + *

用于在不增加方法 overload 数量的前提下,灵活组合各种执行维度——例如同时 + * 指定 {@code requestId}、{@code conversationId} 与自定义上下文。所有字段都是 + * 可选的:未设置即沿用框架默认行为。 + * + *

典型用法: + *

{@code
+ *     // 仅指定 conversationId(agent 连续对话场景)
+ *     flowExecutor.execute2Resp("chain1", param,
+ *         ExecuteOption.of().conversationId("user-1024-task-abc"));
+ *
+ *     // 让框架自动生成 conversationId
+ *     LiteflowResponse r = flowExecutor.execute2Resp("chain1", param,
+ *         ExecuteOption.of().autoConversationId());
+ *     String cid = r.getConversationId(); // 取回后续调用可复用
+ *
+ *     // 同时指定 rid 和 cid,并附带上下文 Class
+ *     flowExecutor.execute2Resp("chain1", param,
+ *         ExecuteOption.of()
+ *                 .requestId(rid)
+ *                 .conversationId(cid)
+ *                 .contextClass(MyCtx.class));
+ * }
+ */ +public class ExecuteOption { + + private String requestId; + private String conversationId; + private boolean autoConversationId; + private Class[] contextBeanClasses; + private Object[] contextBeans; + + private ExecuteOption() {} + + /** 创建一个空的执行选项,所有字段均未设置。 */ + public static ExecuteOption of() { + return new ExecuteOption(); + } + + /** + * 指定本次执行的 requestId。{@code null} 或空字符串等价于未设置—— + * 框架会按既有逻辑自动生成。 + */ + public ExecuteOption requestId(String requestId) { + this.requestId = requestId; + return this; + } + + /** + * 指定本次执行的 conversationId(业务会话标识)。 + * + *

主要用于 ReAct Agent 连续对话场景:同一段对话中的所有 agent + * 共享 workspace 目录,跨次调用传入相同 conversationId 即可恢复会话。 + * + *

显式调用本方法会取消 {@link #autoConversationId()} 的语义。 + */ + public ExecuteOption conversationId(String conversationId) { + this.conversationId = conversationId; + this.autoConversationId = false; + return this; + } + + /** + * 声明本次执行需要 conversationId 但具体值由框架生成(NanoId 格式)。 + * 生成的值可通过 {@link com.yomahub.liteflow.flow.LiteflowResponse#getConversationId()} + * 取回,调用方据此在下一次调用中传回 {@link #conversationId(String)} 以延续会话。 + * + *

调用本方法会清掉之前 {@link #conversationId(String)} 设置的具体值。 + */ + public ExecuteOption autoConversationId() { + this.autoConversationId = true; + this.conversationId = null; + return this; + } + + /** 指定上下文 bean 的 Class 数组。框架会根据 Class 创建实例。 */ + public ExecuteOption contextClass(Class... contextBeanClasses) { + this.contextBeanClasses = contextBeanClasses; + return this; + } + + /** 指定上下文 bean 实例数组。 */ + public ExecuteOption contextBean(Object... contextBeans) { + this.contextBeans = contextBeans; + return this; + } + + public String getRequestId() { + return requestId; + } + + public String getConversationId() { + return conversationId; + } + + public boolean isAutoConversationId() { + return autoConversationId; + } + + public Class[] getContextBeanClasses() { + return contextBeanClasses; + } + + public Object[] getContextBeans() { + return contextBeans; + } +} diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java index a6e482855..853b7ad77 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java @@ -43,6 +43,7 @@ import com.yomahub.liteflow.slot.Slot; import com.yomahub.liteflow.spi.holder.ContextCmpInitHolder; import com.yomahub.liteflow.spi.holder.PathContentParserHolder; import com.yomahub.liteflow.thread.ExecutorHelper; +import com.yomahub.liteflow.util.ConversationIdGenerator; import com.yomahub.liteflow.util.ElRegexUtil; import java.util.*; @@ -386,6 +387,69 @@ public class FlowExecutor { return this.execute2Resp(chainId, param, requestId, null, contextBeanArray); } + /** + * 使用 {@link ExecuteOption} 执行 chain,自由组合 requestId、conversationId、上下文等执行维度。 + * + *

这是新代码的推荐入口。当需要 conversationId(典型场景:ReAct Agent 连续对话)、 + * 同时传入 requestId 与多上下文时,相比多个 {@code WithXxx} 方法 overload, + * 单一 ExecuteOption 入口能避免命名爆炸: + *

{@code
+	 *     flowExecutor.execute2Resp("chain1", param,
+	 *         ExecuteOption.of()
+	 *             .requestId(rid)
+	 *             .conversationId(cid)
+	 *             .contextClass(MyCtx.class));
+	 * }
+ * + *

已有 {@code execute2RespWithRid(...)} 等方法继续可用,行为不变。 + * + * @param chainId chain id + * @param param 入参,将作为 chainReqData 注入 slot + * @param option 执行选项;{@code null} 等价于 {@link ExecuteOption#of()} + * @return LiteflowResponse + */ + public LiteflowResponse execute2Resp(String chainId, Object param, ExecuteOption option) { + ExecuteOption opt = option == null ? ExecuteOption.of() : option; + return this.execute2Resp(chainId, param, opt.getRequestId(), resolveConversationId(opt), + opt.getContextBeanClasses(), opt.getContextBeans()); + } + + /** + * 异步版本:{@link #execute2Resp(String, Object, ExecuteOption)}。 + */ + public Future execute2Future(String chainId, Object param, ExecuteOption option) { + ExecuteOption opt = option == null ? ExecuteOption.of() : option; + // 注意:cid 解析必须在主线程完成,否则 NanoId 会延迟到 worker 线程才生成, + // 导致调用方拿到的 Future 关联的 cid 与 response 不一致的可能性。这里提前定型。 + String resolvedCid = resolveConversationId(opt); + String requestId = opt.getRequestId(); + Class[] ctxClasses = opt.getContextBeanClasses(); + Object[] ctxBeans = opt.getContextBeans(); + return ExecutorHelper.loadInstance() + .buildMainExecutor(liteflowConfig.getMainExecutorClass()) + .submit(() -> FlowExecutorHolder.loadInstance().execute2Resp( + chainId, param, requestId, resolvedCid, ctxClasses, ctxBeans)); + } + + /** + * 把 ExecuteOption 中关于 conversationId 的两类语义(显式值 / 自动生成) + * 归约成一个最终值: + *

+ */ + private String resolveConversationId(ExecuteOption opt) { + if (StrUtil.isNotBlank(opt.getConversationId())) { + return opt.getConversationId(); + } + if (opt.isAutoConversationId()) { + return ConversationIdGenerator.generate(); + } + return null; + } + public List executeRouteChainWithRid(Object param, String requestId, Object... contextBeanArray) { return this.executeWithRoute(null, param, requestId, null, contextBeanArray); } @@ -433,7 +497,12 @@ public class FlowExecutor { private LiteflowResponse execute2Resp(String chainId, Object param, String requestId, Class[] contextBeanClazzArray, Object[] contextBeanArray) { - Slot slot = doExecute(chainId, param, requestId, contextBeanClazzArray, contextBeanArray, ChainExecuteModeEnum.BODY); + return execute2Resp(chainId, param, requestId, null, contextBeanClazzArray, contextBeanArray); + } + + private LiteflowResponse execute2Resp(String chainId, Object param, String requestId, String conversationId, + Class[] contextBeanClazzArray, Object[] contextBeanArray) { + Slot slot = doExecute(chainId, param, requestId, conversationId, contextBeanClazzArray, contextBeanArray, ChainExecuteModeEnum.BODY); return LiteflowResponse.newMainResponse(slot); } @@ -444,6 +513,12 @@ public class FlowExecutor { private Slot doExecute(String chainId, Object param, String requestId, Class[] contextBeanClazzArray, Object[] contextBeanArray, ChainExecuteModeEnum chainExecuteModeEnum) { + return doExecute(chainId, param, requestId, null, contextBeanClazzArray, contextBeanArray, chainExecuteModeEnum); + } + + private Slot doExecute(String chainId, Object param, String requestId, String conversationId, + Class[] contextBeanClazzArray, Object[] contextBeanArray, + ChainExecuteModeEnum chainExecuteModeEnum) { if (FlowBus.needInit()) { init(true); } @@ -483,6 +558,13 @@ public class FlowExecutor { LOG.info("requestId has generated"); } + // 如果调用方明确传入了 conversationId,则写入 slot;用于 ReAct Agent 等 + // 需要在 chain 内多个组件之间共享会话上下文的场景。未传入时不主动设置, + // 由具体组件按其默认策略处理(例如 ReActAgentComponent 会按需懒生成)。 + if (StrUtil.isNotBlank(conversationId)){ + slot.setConversationId(conversationId); + } + LOG.info("slot[{}] offered", slotIndex); if (ObjectUtil.isNotNull(param)) { diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/LiteflowResponse.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/LiteflowResponse.java index d5eec0e8e..eb63aa7e7 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/LiteflowResponse.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/LiteflowResponse.java @@ -181,6 +181,10 @@ public class LiteflowResponse { return this.getSlot().getRequestId(); } + public String getConversationId() { + return this.getSlot().getConversationId(); + } + public String getChainId() { return chainId; } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/util/ConversationIdGenerator.java b/liteflow-core/src/main/java/com/yomahub/liteflow/util/ConversationIdGenerator.java new file mode 100644 index 000000000..ba9634120 --- /dev/null +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/util/ConversationIdGenerator.java @@ -0,0 +1,31 @@ +package com.yomahub.liteflow.util; + +import cn.hutool.core.date.DateUtil; +import cn.hutool.core.lang.id.NanoId; +import cn.hutool.core.util.StrUtil; + +import java.util.Date; + +/** + * 默认的 {@code conversationId} 生成器:日期前缀 + 12 位 NanoId。 + * + *

{@code conversationId} 用于在 chain 执行期间标识一段"业务会话",由 + * {@link com.yomahub.liteflow.slot.Slot#setConversationId(String)} 写入 slot, + * 同 chain 内所有需要会话上下文的组件(典型如 ReAct Agent)共享。 + * + *

不传 conversationId 时由 {@link com.yomahub.liteflow.core.FlowExecutor} 在 slot + * 创建后调用本工具生成一次性标识;传入则原样使用,便于跨调用恢复会话。 + */ +public final class ConversationIdGenerator { + + private static final char[] CODE_ALPHABET = + "123456789ABCDEFGHIJKLMNPQRSTUVWXYZ".toCharArray(); + + private ConversationIdGenerator() {} + + public static String generate() { + String date = DateUtil.format(new Date(), "yyyyMMdd"); + String code = NanoId.randomNanoId(null, CODE_ALPHABET, 12); + return StrUtil.format("{}_{}", date, code); + } +}