From e56960cb8e55d2e19aafc552efa3bdd739cb7ca1 Mon Sep 17 00:00:00 2001 From: luoyi <972849752@qq.com> Date: Fri, 4 Jul 2025 16:55:27 +0800 Subject: [PATCH] =?UTF-8?q?bug=20=E8=B0=83=E6=95=B4=20FlowExecutor=20?= =?UTF-8?q?=E6=A0=BC=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../yomahub/liteflow/core/FlowExecutor.java | 1202 ++++++++--------- 1 file changed, 601 insertions(+), 601 deletions(-) diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java index 3afa01ffc..1648fa379 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java @@ -58,616 +58,616 @@ import java.util.stream.Collectors; */ public class FlowExecutor { - private static final LFLog LOG = LFLoggerManager.getLogger(FlowExecutor.class); - - private static final String PREFIX_FORMAT_CONFIG_REGEX = "el_xml:|el_json:|el_yml:"; - - private LiteflowConfig liteflowConfig; - - public FlowExecutor() { - // 设置FlowExecutor的Holder,虽然大部分地方都可以通过Spring上下文获取到,但放入Holder,还是为了某些地方能方便的取到 - FlowExecutorHolder.setHolder(this); - // 初始化DataBus - DataBus.init(); - } - - public FlowExecutor(LiteflowConfig liteflowConfig) { - this.liteflowConfig = liteflowConfig; - // 把liteFlowConfig设到LiteFlowGetter中去 - LiteflowConfigGetter.setLiteflowConfig(liteflowConfig); - // 设置FlowExecutor的Holder,虽然大部分地方都可以通过Spring上下文获取到,但放入Holder,还是为了某些地方能方便的取到 - FlowExecutorHolder.setHolder(this); - if (!liteflowConfig.getParseMode().equals(ParseModeEnum.PARSE_ALL_ON_FIRST_EXEC)) { - this.init(true); - } - // 初始化DataBus - DataBus.init(); - } - - /** - * FlowExecutor的初始化化方式,主要用于parse规则文件 - * isStart表示是否是系统启动阶段,启动阶段要做额外的事情,而因为reload所调用的init就不用做 - */ - public void init(boolean isStart) { - if (ObjectUtil.isNull(liteflowConfig)) { - throw new ConfigErrorException("config error, please check liteflow config property"); - } - - // 在相应的环境下进行节点的初始化工作 - // 在spring体系下会获得spring扫描后的节点,接入元数据 - // 在非spring体系下是一个空实现,等于不做此步骤 - ContextCmpInitHolder.loadContextCmpInit().initCmp(); - - if (isStart){ - // 进行id生成器的初始化 - IdGeneratorHolder.init(); - } - - String ruleSource = liteflowConfig.getRuleSource(); - if (StrUtil.isBlank(ruleSource)) { - // 查看有没有Parser的SPI实现 - // 所有的Parser的SPI实现都是以custom形式放入的,且只支持xml形式 - ServiceLoader loader = ServiceLoader.load(ParserClassNameSpi.class); - Iterator it = loader.iterator(); - if (it.hasNext()) { - ParserClassNameSpi parserClassNameSpi = it.next(); - ruleSource = "el_xml:" + parserClassNameSpi.getSpiClassName(); - liteflowConfig.setRuleSource(ruleSource); - } - else { - // ruleSource为空,而且没有spi形式的扩展,那么说明真的没有ruleSource - // 这种情况有可能是基于代码动态构建的 - return; - } - } - - // 如果有前缀的,则不需要再进行分割了,说明是一个整体 - // 如果没有前缀,说明是本地文件,可能配置多个,所以需要分割 - List sourceRulePathList; - if (ReUtil.contains(PREFIX_FORMAT_CONFIG_REGEX, ruleSource)) { - sourceRulePathList = ListUtil.toList(ruleSource); - } - else { - String afterHandleRuleSource = ruleSource.replace(StrUtil.SPACE, StrUtil.EMPTY); - sourceRulePathList = ListUtil.toList(afterHandleRuleSource.split(",|;")); - } - - FlowParser parser = null; - Set parserNameSet = new HashSet<>(); - List rulePathList = new ArrayList<>(); - for (String path : sourceRulePathList) { - try { - // 查找对应的解析器 - parser = FlowParserProvider.lookup(path); - parserNameSet.add(parser.getClass().getName()); - // 替换掉前缀标识(如:xml:/json:),保留剩下的完整地址,并统一路径格式 - path = ReUtil.replaceAll(path, PREFIX_FORMAT_CONFIG_REGEX, "").replace("\\", "/"); - rulePathList.add(path); - - // 支持多类型的配置文件,分别解析 - if (BooleanUtil.isTrue(liteflowConfig.isSupportMultipleType())) { - // 解析文件 - parser.parseMain(ListUtil.toList(path)); - } - } - catch (CyclicDependencyException e) { - LOG.error(e.getMessage()); - throw e; - } - catch (Exception e) { - String errorMsg = StrUtil.format("init flow executor cause error for path {},reason:{}", path, - e.getMessage()); - LOG.error(e.getMessage(), e); - throw new FlowExecutorNotInitException(errorMsg); - } - } - - // 单类型的配置文件,需要一起解析 - if (BooleanUtil.isFalse(liteflowConfig.isSupportMultipleType())) { - // 检查Parser是否只有一个,因为多个不同的parser会造成子流程的混乱 - if (parserNameSet.size() > 1) { - String errorMsg = "cannot have multiple different parsers"; - LOG.error(errorMsg); - throw new MultipleParsersException(errorMsg); - } - - // 进行多个配置文件的一起解析 - try { - if (parser != null) { - // 解析文件 - parser.parseMain(rulePathList); - } - else { - throw new ConfigErrorException("parse error, please check liteflow config property"); - } - } - catch (CyclicDependencyException e) { - LOG.error(e.getMessage(), e); - LOG.error(e.getMessage()); - throw e; - } - catch (ChainDuplicateException e) { - LOG.error(e.getMessage(), e); - throw e; - } - catch (RouteELInvalidException e) { - LOG.error(e.getMessage(), e); - throw e; - } - catch (Exception e) { - String errorMsg = StrUtil.format("init flow executor cause error for path {},reason: {}", rulePathList, - e.getMessage()); - LOG.error(e.getMessage(), e); - throw new FlowExecutorNotInitException(errorMsg); - } - } - - // 如果是ruleSource方式的,最后判断下有没有解析出来,如果没有解析出来则报错 - if (StrUtil.isBlank(liteflowConfig.getRuleSourceExtData()) - && MapUtil.isEmpty(liteflowConfig.getRuleSourceExtDataMap())) { - if (FlowBus.getChainMap().isEmpty()) { - String errMsg = StrUtil.format("no valid rule config found in rule path [{}]", - liteflowConfig.getRuleSource()); - throw new ConfigErrorException(errMsg); - } - } - - // 执行钩子 - if (isStart) { - FlowInitHook.executeHook(); - } - - // 文件监听 - if (isStart && liteflowConfig.getEnableMonitorFile()) { - try { - addMonitorFilePaths(rulePathList); - MonitorFile.getInstance().create(); - } - catch (Exception e) { - String errMsg = StrUtil.format("file monitor init error for path:{}", rulePathList); - throw new MonitorFileInitErrorException(errMsg); - } - - } - } - - // 此方法就是从原有的配置源主动拉取新的进行刷新 - // 和FlowBus.refreshFlowMetaData的区别就是一个为主动拉取,一个为被动监听到新的内容进行刷新 - public void reloadRule() { - long start = System.currentTimeMillis(); - init(false); - LOG.info("reload rules takes {}ms", System.currentTimeMillis() - start); - } - - // 单独调用某一个node - @Deprecated - public void invoke(String nodeId, Integer slotIndex) throws Exception { - Node node = FlowBus.getNode(nodeId); - node.execute(slotIndex); - } - - // 调用一个流程并返回LiteflowResponse,上下文为默认的DefaultContext,初始参数为null - public LiteflowResponse execute2Resp(String chainId) { - return this.execute2Resp(chainId, null, DefaultContext.class); - } - - // 调用一个流程并返回LiteflowResponse,上下文为默认的DefaultContext - public LiteflowResponse execute2Resp(String chainId, Object param) { - return this.execute2Resp(chainId, param, DefaultContext.class); - } - - // 调用一个流程并返回LiteflowResponse,允许多上下文的传入 - public LiteflowResponse execute2Resp(String chainId, Object param, Class... contextBeanClazzArray) { - return this.execute2Resp(chainId, param, null, contextBeanClazzArray, null); - } - - /** - * 直接执行 EL 表达式 - * - * @param elStr EL 表达式 - * @return LiteflowResponse - * @throws ELParseException - */ - public LiteflowResponse execute2RespWithEL(String elStr) throws Exception { - return this.execute2RespWithEL(elStr, null, null, DefaultContext.class); - } - - /** - * 直接执行 EL 表达式 - * - * @param elStr EL 表达式 - * @param param 入参 - * @return LiteflowResponse - * @throws ELParseException - */ - public LiteflowResponse execute2RespWithEL(String elStr, Object param) throws Exception { - return this.execute2RespWithEL(elStr, param, null, DefaultContext.class); - } - - /** - * 直接执行 EL 表达式 - * - * @param elStr EL 表达式 - * @param param 入参 - * @param requestId 请求 ID - * @param contextBeanClazzArray 上下文 Class - * @return LiteflowResponse - * @throws ELParseException - */ - public LiteflowResponse execute2RespWithEL(String elStr, Object param, String requestId, Class... contextBeanClazzArray) throws Exception { - return this.execute2RespWithEL(elStr, param, requestId, contextBeanClazzArray, null); - } - - /** - * 直接执行 EL 表达式 - * - * @param elStr EL 表达式 - * @param param 入参 - * @param requestId 请求 ID - * @param contextBeanArray 上下文对象 - * @return LiteflowResponse - * @throws ELParseException - */ - public LiteflowResponse execute2RespWithEL(String elStr, Object param, String requestId, Object... contextBeanArray) throws Exception { - return this.execute2RespWithEL(elStr, param, requestId, null, contextBeanArray); - } - - /** - * 直接执行 EL 表达式 - * - * @param elStr EL 表达式 - * @param param 入参 - * @param requestId 请求 ID - * @param contextBeanClazzArray 上下文 Class 数组 - * @param contextBeanArray 上下文对象数组 - * @return LiteflowResponse - * @throws ELParseException - */ - private LiteflowResponse execute2RespWithEL(String elStr, Object param, String requestId, Class[] contextBeanClazzArray, Object[] contextBeanArray) throws Exception { - // 规范化 el 表达式 - String normalizedEl = ElRegexUtil.normalize(elStr); - - // 校验 EL 是否正常 - ValidationResp validationResp = LiteFlowChainELBuilder.validateWithEx(normalizedEl); - - if (!validationResp.isSuccess()) { - // 实际封装的是 ELParseException 类型 - throw validationResp.getCause(); - } - - // 计算 EL MD5 值,并检查对应的 chain 是否已加载到内存中 - String elMd5 = MD5.create().digestHex(normalizedEl); - - String chainId; - - if (StrUtil.isEmpty(chainId = FlowBus.getChainIdByElMd5(elMd5))) { - // 调用表达式构造 chain,并且返回 UUID 作为 chainId - chainId = IdUtil.fastSimpleUUID(); - LiteFlowChainELBuilder.createChain() - .setChainId(chainId) - .setEL(normalizedEl) - .setElMd5(elMd5) - .build(); - } - - return this.execute2Resp(chainId, param, requestId, contextBeanClazzArray, contextBeanArray); - } - - public List executeRouteChain(Object param, Class... contextBeanClazzArray){ - return this.executeWithRoute(null, param, null, contextBeanClazzArray, null); - } - - public List executeRouteChain(String namespace, Object param, Class... contextBeanClazzArray){ - return this.executeWithRoute(namespace, param, null, contextBeanClazzArray, null); - } - - public LiteflowResponse execute2Resp(String chainId, Object param, Object... contextBeanArray) { - return this.execute2Resp(chainId, param, null, null, contextBeanArray); - } - - public List executeRouteChain(Object param, Object... contextBeanArray){ - return this.executeWithRoute(null, param, null, null, contextBeanArray); - } - - public List executeRouteChain(String namespace, Object param, Object... contextBeanArray){ - return this.executeWithRoute(namespace, param, null, null, contextBeanArray); - } - - public LiteflowResponse execute2RespWithRid(String chainId, Object param, String requestId, Class... contextBeanClazzArray) { - return this.execute2Resp(chainId, param, requestId, contextBeanClazzArray, null); - } - - public List executeRouteChainWithRid(Object param, String requestId, Class... contextBeanClazzArray) { - return this.executeWithRoute(null, param, requestId, contextBeanClazzArray, null); - } - - public List executeRouteChainWithRid(String namespace, Object param, String requestId, Class... contextBeanClazzArray) { - return this.executeWithRoute(namespace, param, requestId, contextBeanClazzArray, null); - } - - public LiteflowResponse execute2RespWithRid(String chainId, Object param, String requestId, Object... contextBeanArray) { - return this.execute2Resp(chainId, param, requestId, null, contextBeanArray); - } - - public List executeRouteChainWithRid(Object param, String requestId, Object... contextBeanArray) { - return this.executeWithRoute(null, param, requestId, null, contextBeanArray); - } - - public List executeRouteChainWithRid(String namespace, Object param, String requestId, Object... contextBeanArray) { - return this.executeWithRoute(namespace, param, requestId, null, contextBeanArray); - } - - // 调用一个流程并返回Future,允许多上下文的传入 - public Future execute2Future(String chainId, Object param, Class... contextBeanClazzArray) { - return ExecutorHelper.loadInstance() - .buildMainExecutor(liteflowConfig.getMainExecutorClass()) - .submit(() -> FlowExecutorHolder.loadInstance().execute2Resp(chainId, param, contextBeanClazzArray)); - } - - public Future execute2Future(String chainId, Object param, Object... contextBeanArray) { - return ExecutorHelper.loadInstance() - .buildMainExecutor(liteflowConfig.getMainExecutorClass()) - .submit(() -> FlowExecutorHolder.loadInstance().execute2Resp(chainId, param, contextBeanArray)); - } - - public Future execute2FutureWithRid(String chainId, Object param, String requestId, Class... contextBeanClazzArray) { - return ExecutorHelper.loadInstance() - .buildMainExecutor(liteflowConfig.getMainExecutorClass()) - .submit(() -> FlowExecutorHolder.loadInstance().execute2RespWithRid(chainId, param, requestId, contextBeanClazzArray)); - } - - public Future execute2FutureWithRid(String chainId, Object param, String requestId, Object... contextBeanArray) { - return ExecutorHelper.loadInstance() - .buildMainExecutor(liteflowConfig.getMainExecutorClass()) - .submit(() -> FlowExecutorHolder.loadInstance().execute2RespWithRid(chainId, param, requestId, contextBeanArray)); - } - - // 调用一个流程,返回默认的上下文,适用于简单的调用 - @Deprecated - public DefaultContext execute(String chainId, Object param) throws Exception { - LiteflowResponse response = this.execute2Resp(chainId, param, DefaultContext.class); - if (!response.isSuccess()) { - throw response.getCause(); - } - else { - return response.getFirstContextBean(); - } - } - - private LiteflowResponse execute2Resp(String chainId, Object param, String requestId, Class[] contextBeanClazzArray, - Object[] contextBeanArray) { - Slot slot = doExecute(chainId, param, requestId, contextBeanClazzArray, contextBeanArray, ChainExecuteModeEnum.BODY); - return LiteflowResponse.newMainResponse(slot); - } - - private List executeWithRoute(String namespace, Object param, String requestId, Class[] contextBeanClazzArray, Object[] contextBeanArray){ - List slotList = doExecuteWithRoute(namespace, param, requestId, contextBeanClazzArray, contextBeanArray); - return slotList.stream().map(LiteflowResponse::newMainResponse).collect(Collectors.toList()); - } - - private Slot doExecute(String chainId, Object param, String requestId, Class[] contextBeanClazzArray, Object[] contextBeanArray, - ChainExecuteModeEnum chainExecuteModeEnum) { - if (FlowBus.needInit()) { - init(true); - } - - Integer slotIndex; - // 这里可以根据class分配,也可以根据bean去分配 - if (ArrayUtil.isNotEmpty(contextBeanClazzArray)) { - slotIndex = DataBus.offerSlotByClass(ListUtil.toList(contextBeanClazzArray)); - } - else { - slotIndex = DataBus.offerSlotByBean(ListUtil.toList(contextBeanArray)); - } - - if (slotIndex == -1) { - throw new NoAvailableSlotException("there is no available slot"); - } - - Slot slot = DataBus.getSlot(slotIndex); - if (ObjectUtil.isNull(slot)) { - throw new NoAvailableSlotException(StrUtil.format("the slot[{}] is not exist", slotIndex)); - } - - // 如果有FlowExecute生命周期实现,则执行 - if (CollUtil.isNotEmpty(LifeCycleHolder.getPostProcessFlowExecuteLifeCycleList())){ - LifeCycleHolder.getPostProcessFlowExecuteLifeCycleList().forEach( - postProcessFlowExecuteLifeCycle -> postProcessFlowExecuteLifeCycle.postProcessBeforeFlowExecute(chainId, slot) - ); - } - - //如果传入了用户的RequestId,则用这个请求Id,如果没传入,则进行生成 - if (StrUtil.isNotBlank(requestId)){ - slot.putRequestId(requestId); - LFLoggerManager.setRequestId(requestId); - }else if(StrUtil.isBlank(slot.getRequestId())){ - slot.generateRequestId(); - LFLoggerManager.setRequestId(slot.getRequestId()); - LOG.info("requestId has generated"); - } - - LOG.info("slot[{}] offered", slotIndex); - - if (ObjectUtil.isNotNull(param)) { - slot.setRequestData(param); - } - - Chain chain = null; - try { - chain = FlowBus.getChain(chainId); - - if (ObjectUtil.isNull(chain)) { - String errorMsg = StrUtil.format("couldn't find chain with the id[{}]", chainId); - throw new ChainNotFoundException(errorMsg); - } - // 根据chain执行模式执行chain - if (chainExecuteModeEnum.equals(ChainExecuteModeEnum.BODY)){ - chain.execute(slotIndex); - }else if(chainExecuteModeEnum.equals(ChainExecuteModeEnum.ROUTE)){ - chain.executeRoute(slotIndex); - }else{ - throw new LiteFlowException("chain execute mode error"); - } - } - catch (ChainEndException e) { - if (ObjectUtil.isNotNull(chain)) { - String warnMsg = StrUtil.format("chain[{}] execute end on slot[{}]", chain.getChainId(), slotIndex); - LOG.warn(warnMsg); - } - } - catch (Exception e) { - if (ObjectUtil.isNotNull(chain)) { - String errMsg = StrUtil.format("chain[{}] execute error on slot[{}]", chain.getChainId(), slotIndex); - LOG.error(errMsg, e); - } - else { - LOG.error(e.getMessage(), e); - } - - slot.setException(e); - Deque executeSteps = slot.getExecuteSteps(); - try { - Iterator cmpStepIterator = executeSteps.descendingIterator(); - while(cmpStepIterator.hasNext()) { - CmpStep cmpStep = cmpStepIterator.next(); - if(cmpStep.getInstance().isRollback()) { - Rollbackable rollbackItem = cmpStep.getRefNode(); - rollbackItem.rollback(slotIndex); - } - } - } catch (Exception exception) { - LOG.error(exception.getMessage()); - } - finally { - slot.printRollbackStep(); - } - } - finally { - slot.printStep(); - DataBus.releaseSlot(slotIndex); - LFLoggerManager.removeRequestId(); - - // 如果有FlowExecute生命周期实现,则执行 - if (CollUtil.isNotEmpty(LifeCycleHolder.getPostProcessFlowExecuteLifeCycleList())){ - LifeCycleHolder.getPostProcessFlowExecuteLifeCycleList().forEach( - postProcessFlowExecuteLifeCycle -> postProcessFlowExecuteLifeCycle.postProcessAfterFlowExecute(chainId, slot) - ); - } - } - return slot; - } - - public LiteflowConfig getLiteflowConfig() { - return liteflowConfig; - } - - public void setLiteflowConfig(LiteflowConfig liteflowConfig) { - this.liteflowConfig = liteflowConfig; - // 把liteFlowConfig设到LiteFlowGetter中去 - LiteflowConfigGetter.setLiteflowConfig(liteflowConfig); - } - - /** - * 添加监听文件路径 - * @param pathList 文件路径 - */ - private void addMonitorFilePaths(List pathList) throws Exception { - // 添加规则文件监听 - List fileAbsolutePath = PathContentParserHolder.loadContextAware().getFileAbsolutePath(pathList); - MonitorFile.getInstance().addMonitorFilePaths(fileAbsolutePath); - } - - private List doExecuteWithRoute(String namespace, Object param, String requestId, Class[] contextBeanClazzArray, Object[] contextBeanArray){ - if (FlowBus.needInit()) { - init(true); - } - - if (StrUtil.isBlank(namespace)){ - namespace = ChainConstant.DEFAULT_NAMESPACE; - } - - String finalNamespace = namespace; - List routeChainList = FlowBus.getChainMap().values().stream() - .filter(chain -> chain.getNamespace().equals(finalNamespace)) - .filter(chain -> chain.getRouteItem() != null).collect(Collectors.toList()); - - if (CollUtil.isEmpty(routeChainList)){ - String errorMsg = StrUtil.format("no route found for namespace[{}]", finalNamespace); - throw new RouteChainNotFoundException(errorMsg); - } - - String finalRequestId; - if (StrUtil.isBlank(requestId)){ - finalRequestId = IdGeneratorHolder.getInstance().generate(); - }else{ - finalRequestId = requestId; - } - - // 异步执行route el - List routeTupleList = new ArrayList<>(); - for (Chain routeChain : routeChainList){ - CompletableFuture f = CompletableFuture.supplyAsync( + private static final LFLog LOG = LFLoggerManager.getLogger(FlowExecutor.class); + + private static final String PREFIX_FORMAT_CONFIG_REGEX = "el_xml:|el_json:|el_yml:"; + + private LiteflowConfig liteflowConfig; + + public FlowExecutor() { + // 设置FlowExecutor的Holder,虽然大部分地方都可以通过Spring上下文获取到,但放入Holder,还是为了某些地方能方便的取到 + FlowExecutorHolder.setHolder(this); + // 初始化DataBus + DataBus.init(); + } + + public FlowExecutor(LiteflowConfig liteflowConfig) { + this.liteflowConfig = liteflowConfig; + // 把liteFlowConfig设到LiteFlowGetter中去 + LiteflowConfigGetter.setLiteflowConfig(liteflowConfig); + // 设置FlowExecutor的Holder,虽然大部分地方都可以通过Spring上下文获取到,但放入Holder,还是为了某些地方能方便的取到 + FlowExecutorHolder.setHolder(this); + if (!liteflowConfig.getParseMode().equals(ParseModeEnum.PARSE_ALL_ON_FIRST_EXEC)) { + this.init(true); + } + // 初始化DataBus + DataBus.init(); + } + + /** + * FlowExecutor的初始化化方式,主要用于parse规则文件 + * isStart表示是否是系统启动阶段,启动阶段要做额外的事情,而因为reload所调用的init就不用做 + */ + public void init(boolean isStart) { + if (ObjectUtil.isNull(liteflowConfig)) { + throw new ConfigErrorException("config error, please check liteflow config property"); + } + + // 在相应的环境下进行节点的初始化工作 + // 在spring体系下会获得spring扫描后的节点,接入元数据 + // 在非spring体系下是一个空实现,等于不做此步骤 + ContextCmpInitHolder.loadContextCmpInit().initCmp(); + + if (isStart){ + // 进行id生成器的初始化 + IdGeneratorHolder.init(); + } + + String ruleSource = liteflowConfig.getRuleSource(); + if (StrUtil.isBlank(ruleSource)) { + // 查看有没有Parser的SPI实现 + // 所有的Parser的SPI实现都是以custom形式放入的,且只支持xml形式 + ServiceLoader loader = ServiceLoader.load(ParserClassNameSpi.class); + Iterator it = loader.iterator(); + if (it.hasNext()) { + ParserClassNameSpi parserClassNameSpi = it.next(); + ruleSource = "el_xml:" + parserClassNameSpi.getSpiClassName(); + liteflowConfig.setRuleSource(ruleSource); + } + else { + // ruleSource为空,而且没有spi形式的扩展,那么说明真的没有ruleSource + // 这种情况有可能是基于代码动态构建的 + return; + } + } + + // 如果有前缀的,则不需要再进行分割了,说明是一个整体 + // 如果没有前缀,说明是本地文件,可能配置多个,所以需要分割 + List sourceRulePathList; + if (ReUtil.contains(PREFIX_FORMAT_CONFIG_REGEX, ruleSource)) { + sourceRulePathList = ListUtil.toList(ruleSource); + } + else { + String afterHandleRuleSource = ruleSource.replace(StrUtil.SPACE, StrUtil.EMPTY); + sourceRulePathList = ListUtil.toList(afterHandleRuleSource.split(",|;")); + } + + FlowParser parser = null; + Set parserNameSet = new HashSet<>(); + List rulePathList = new ArrayList<>(); + for (String path : sourceRulePathList) { + try { + // 查找对应的解析器 + parser = FlowParserProvider.lookup(path); + parserNameSet.add(parser.getClass().getName()); + // 替换掉前缀标识(如:xml:/json:),保留剩下的完整地址,并统一路径格式 + path = ReUtil.replaceAll(path, PREFIX_FORMAT_CONFIG_REGEX, "").replace("\\", "/"); + rulePathList.add(path); + + // 支持多类型的配置文件,分别解析 + if (BooleanUtil.isTrue(liteflowConfig.isSupportMultipleType())) { + // 解析文件 + parser.parseMain(ListUtil.toList(path)); + } + } + catch (CyclicDependencyException e) { + LOG.error(e.getMessage()); + throw e; + } + catch (Exception e) { + String errorMsg = StrUtil.format("init flow executor cause error for path {},reason:{}", path, + e.getMessage()); + LOG.error(e.getMessage(), e); + throw new FlowExecutorNotInitException(errorMsg); + } + } + + // 单类型的配置文件,需要一起解析 + if (BooleanUtil.isFalse(liteflowConfig.isSupportMultipleType())) { + // 检查Parser是否只有一个,因为多个不同的parser会造成子流程的混乱 + if (parserNameSet.size() > 1) { + String errorMsg = "cannot have multiple different parsers"; + LOG.error(errorMsg); + throw new MultipleParsersException(errorMsg); + } + + // 进行多个配置文件的一起解析 + try { + if (parser != null) { + // 解析文件 + parser.parseMain(rulePathList); + } + else { + throw new ConfigErrorException("parse error, please check liteflow config property"); + } + } + catch (CyclicDependencyException e) { + LOG.error(e.getMessage(), e); + LOG.error(e.getMessage()); + throw e; + } + catch (ChainDuplicateException e) { + LOG.error(e.getMessage(), e); + throw e; + } + catch (RouteELInvalidException e) { + LOG.error(e.getMessage(), e); + throw e; + } + catch (Exception e) { + String errorMsg = StrUtil.format("init flow executor cause error for path {},reason: {}", rulePathList, + e.getMessage()); + LOG.error(e.getMessage(), e); + throw new FlowExecutorNotInitException(errorMsg); + } + } + + // 如果是ruleSource方式的,最后判断下有没有解析出来,如果没有解析出来则报错 + if (StrUtil.isBlank(liteflowConfig.getRuleSourceExtData()) + && MapUtil.isEmpty(liteflowConfig.getRuleSourceExtDataMap())) { + if (FlowBus.getChainMap().isEmpty()) { + String errMsg = StrUtil.format("no valid rule config found in rule path [{}]", + liteflowConfig.getRuleSource()); + throw new ConfigErrorException(errMsg); + } + } + + // 执行钩子 + if (isStart) { + FlowInitHook.executeHook(); + } + + // 文件监听 + if (isStart && liteflowConfig.getEnableMonitorFile()) { + try { + addMonitorFilePaths(rulePathList); + MonitorFile.getInstance().create(); + } + catch (Exception e) { + String errMsg = StrUtil.format("file monitor init error for path:{}", rulePathList); + throw new MonitorFileInitErrorException(errMsg); + } + + } + } + + // 此方法就是从原有的配置源主动拉取新的进行刷新 + // 和FlowBus.refreshFlowMetaData的区别就是一个为主动拉取,一个为被动监听到新的内容进行刷新 + public void reloadRule() { + long start = System.currentTimeMillis(); + init(false); + LOG.info("reload rules takes {}ms", System.currentTimeMillis() - start); + } + + // 单独调用某一个node + @Deprecated + public void invoke(String nodeId, Integer slotIndex) throws Exception { + Node node = FlowBus.getNode(nodeId); + node.execute(slotIndex); + } + + // 调用一个流程并返回LiteflowResponse,上下文为默认的DefaultContext,初始参数为null + public LiteflowResponse execute2Resp(String chainId) { + return this.execute2Resp(chainId, null, DefaultContext.class); + } + + // 调用一个流程并返回LiteflowResponse,上下文为默认的DefaultContext + public LiteflowResponse execute2Resp(String chainId, Object param) { + return this.execute2Resp(chainId, param, DefaultContext.class); + } + + // 调用一个流程并返回LiteflowResponse,允许多上下文的传入 + public LiteflowResponse execute2Resp(String chainId, Object param, Class... contextBeanClazzArray) { + return this.execute2Resp(chainId, param, null, contextBeanClazzArray, null); + } + + /** + * 直接执行 EL 表达式 + * + * @param elStr EL 表达式 + * @return LiteflowResponse + * @throws ELParseException + */ + public LiteflowResponse execute2RespWithEL(String elStr) throws Exception { + return this.execute2RespWithEL(elStr, null, null, DefaultContext.class); + } + + /** + * 直接执行 EL 表达式 + * + * @param elStr EL 表达式 + * @param param 入参 + * @return LiteflowResponse + * @throws ELParseException + */ + public LiteflowResponse execute2RespWithEL(String elStr, Object param) throws Exception { + return this.execute2RespWithEL(elStr, param, null, DefaultContext.class); + } + + /** + * 直接执行 EL 表达式 + * + * @param elStr EL 表达式 + * @param param 入参 + * @param requestId 请求 ID + * @param contextBeanClazzArray 上下文 Class + * @return LiteflowResponse + * @throws ELParseException + */ + public LiteflowResponse execute2RespWithEL(String elStr, Object param, String requestId, Class... contextBeanClazzArray) throws Exception { + return this.execute2RespWithEL(elStr, param, requestId, contextBeanClazzArray, null); + } + + /** + * 直接执行 EL 表达式 + * + * @param elStr EL 表达式 + * @param param 入参 + * @param requestId 请求 ID + * @param contextBeanArray 上下文对象 + * @return LiteflowResponse + * @throws ELParseException + */ + public LiteflowResponse execute2RespWithEL(String elStr, Object param, String requestId, Object... contextBeanArray) throws Exception { + return this.execute2RespWithEL(elStr, param, requestId, null, contextBeanArray); + } + + /** + * 直接执行 EL 表达式 + * + * @param elStr EL 表达式 + * @param param 入参 + * @param requestId 请求 ID + * @param contextBeanClazzArray 上下文 Class 数组 + * @param contextBeanArray 上下文对象数组 + * @return LiteflowResponse + * @throws ELParseException + */ + private LiteflowResponse execute2RespWithEL(String elStr, Object param, String requestId, Class[] contextBeanClazzArray, Object[] contextBeanArray) throws Exception { + // 规范化 el 表达式 + String normalizedEl = ElRegexUtil.normalize(elStr); + + // 校验 EL 是否正常 + ValidationResp validationResp = LiteFlowChainELBuilder.validateWithEx(normalizedEl); + + if (!validationResp.isSuccess()) { + // 实际封装的是 ELParseException 类型 + throw validationResp.getCause(); + } + + // 计算 EL MD5 值,并检查对应的 chain 是否已加载到内存中 + String elMd5 = MD5.create().digestHex(normalizedEl); + + String chainId; + + if (StrUtil.isEmpty(chainId = FlowBus.getChainIdByElMd5(elMd5))) { + // 调用表达式构造 chain,并且返回 UUID 作为 chainId + chainId = IdUtil.fastSimpleUUID(); + LiteFlowChainELBuilder.createChain() + .setChainId(chainId) + .setEL(normalizedEl) + .setElMd5(elMd5) + .build(); + } + + return this.execute2Resp(chainId, param, requestId, contextBeanClazzArray, contextBeanArray); + } + + public List executeRouteChain(Object param, Class... contextBeanClazzArray){ + return this.executeWithRoute(null, param, null, contextBeanClazzArray, null); + } + + public List executeRouteChain(String namespace, Object param, Class... contextBeanClazzArray){ + return this.executeWithRoute(namespace, param, null, contextBeanClazzArray, null); + } + + public LiteflowResponse execute2Resp(String chainId, Object param, Object... contextBeanArray) { + return this.execute2Resp(chainId, param, null, null, contextBeanArray); + } + + public List executeRouteChain(Object param, Object... contextBeanArray){ + return this.executeWithRoute(null, param, null, null, contextBeanArray); + } + + public List executeRouteChain(String namespace, Object param, Object... contextBeanArray){ + return this.executeWithRoute(namespace, param, null, null, contextBeanArray); + } + + public LiteflowResponse execute2RespWithRid(String chainId, Object param, String requestId, Class... contextBeanClazzArray) { + return this.execute2Resp(chainId, param, requestId, contextBeanClazzArray, null); + } + + public List executeRouteChainWithRid(Object param, String requestId, Class... contextBeanClazzArray) { + return this.executeWithRoute(null, param, requestId, contextBeanClazzArray, null); + } + + public List executeRouteChainWithRid(String namespace, Object param, String requestId, Class... contextBeanClazzArray) { + return this.executeWithRoute(namespace, param, requestId, contextBeanClazzArray, null); + } + + public LiteflowResponse execute2RespWithRid(String chainId, Object param, String requestId, Object... contextBeanArray) { + return this.execute2Resp(chainId, param, requestId, null, contextBeanArray); + } + + public List executeRouteChainWithRid(Object param, String requestId, Object... contextBeanArray) { + return this.executeWithRoute(null, param, requestId, null, contextBeanArray); + } + + public List executeRouteChainWithRid(String namespace, Object param, String requestId, Object... contextBeanArray) { + return this.executeWithRoute(namespace, param, requestId, null, contextBeanArray); + } + + // 调用一个流程并返回Future,允许多上下文的传入 + public Future execute2Future(String chainId, Object param, Class... contextBeanClazzArray) { + return ExecutorHelper.loadInstance() + .buildMainExecutor(liteflowConfig.getMainExecutorClass()) + .submit(() -> FlowExecutorHolder.loadInstance().execute2Resp(chainId, param, contextBeanClazzArray)); + } + + public Future execute2Future(String chainId, Object param, Object... contextBeanArray) { + return ExecutorHelper.loadInstance() + .buildMainExecutor(liteflowConfig.getMainExecutorClass()) + .submit(() -> FlowExecutorHolder.loadInstance().execute2Resp(chainId, param, contextBeanArray)); + } + + public Future execute2FutureWithRid(String chainId, Object param, String requestId, Class... contextBeanClazzArray) { + return ExecutorHelper.loadInstance() + .buildMainExecutor(liteflowConfig.getMainExecutorClass()) + .submit(() -> FlowExecutorHolder.loadInstance().execute2RespWithRid(chainId, param, requestId, contextBeanClazzArray)); + } + + public Future execute2FutureWithRid(String chainId, Object param, String requestId, Object... contextBeanArray) { + return ExecutorHelper.loadInstance() + .buildMainExecutor(liteflowConfig.getMainExecutorClass()) + .submit(() -> FlowExecutorHolder.loadInstance().execute2RespWithRid(chainId, param, requestId, contextBeanArray)); + } + + // 调用一个流程,返回默认的上下文,适用于简单的调用 + @Deprecated + public DefaultContext execute(String chainId, Object param) throws Exception { + LiteflowResponse response = this.execute2Resp(chainId, param, DefaultContext.class); + if (!response.isSuccess()) { + throw response.getCause(); + } + else { + return response.getFirstContextBean(); + } + } + + private LiteflowResponse execute2Resp(String chainId, Object param, String requestId, Class[] contextBeanClazzArray, + Object[] contextBeanArray) { + Slot slot = doExecute(chainId, param, requestId, contextBeanClazzArray, contextBeanArray, ChainExecuteModeEnum.BODY); + return LiteflowResponse.newMainResponse(slot); + } + + private List executeWithRoute(String namespace, Object param, String requestId, Class[] contextBeanClazzArray, Object[] contextBeanArray){ + List slotList = doExecuteWithRoute(namespace, param, requestId, contextBeanClazzArray, contextBeanArray); + return slotList.stream().map(LiteflowResponse::newMainResponse).collect(Collectors.toList()); + } + + private Slot doExecute(String chainId, Object param, String requestId, Class[] contextBeanClazzArray, Object[] contextBeanArray, + ChainExecuteModeEnum chainExecuteModeEnum) { + if (FlowBus.needInit()) { + init(true); + } + + Integer slotIndex; + // 这里可以根据class分配,也可以根据bean去分配 + if (ArrayUtil.isNotEmpty(contextBeanClazzArray)) { + slotIndex = DataBus.offerSlotByClass(ListUtil.toList(contextBeanClazzArray)); + } + else { + slotIndex = DataBus.offerSlotByBean(ListUtil.toList(contextBeanArray)); + } + + if (slotIndex == -1) { + throw new NoAvailableSlotException("there is no available slot"); + } + + Slot slot = DataBus.getSlot(slotIndex); + if (ObjectUtil.isNull(slot)) { + throw new NoAvailableSlotException(StrUtil.format("the slot[{}] is not exist", slotIndex)); + } + + // 如果有FlowExecute生命周期实现,则执行 + if (CollUtil.isNotEmpty(LifeCycleHolder.getPostProcessFlowExecuteLifeCycleList())){ + LifeCycleHolder.getPostProcessFlowExecuteLifeCycleList().forEach( + postProcessFlowExecuteLifeCycle -> postProcessFlowExecuteLifeCycle.postProcessBeforeFlowExecute(chainId, slot) + ); + } + + //如果传入了用户的RequestId,则用这个请求Id,如果没传入,则进行生成 + if (StrUtil.isNotBlank(requestId)){ + slot.putRequestId(requestId); + LFLoggerManager.setRequestId(requestId); + }else if(StrUtil.isBlank(slot.getRequestId())){ + slot.generateRequestId(); + LFLoggerManager.setRequestId(slot.getRequestId()); + LOG.info("requestId has generated"); + } + + LOG.info("slot[{}] offered", slotIndex); + + if (ObjectUtil.isNotNull(param)) { + slot.setRequestData(param); + } + + Chain chain = null; + try { + chain = FlowBus.getChain(chainId); + + if (ObjectUtil.isNull(chain)) { + String errorMsg = StrUtil.format("couldn't find chain with the id[{}]", chainId); + throw new ChainNotFoundException(errorMsg); + } + // 根据chain执行模式执行chain + if (chainExecuteModeEnum.equals(ChainExecuteModeEnum.BODY)){ + chain.execute(slotIndex); + }else if(chainExecuteModeEnum.equals(ChainExecuteModeEnum.ROUTE)){ + chain.executeRoute(slotIndex); + }else{ + throw new LiteFlowException("chain execute mode error"); + } + } + catch (ChainEndException e) { + if (ObjectUtil.isNotNull(chain)) { + String warnMsg = StrUtil.format("chain[{}] execute end on slot[{}]", chain.getChainId(), slotIndex); + LOG.warn(warnMsg); + } + } + catch (Exception e) { + if (ObjectUtil.isNotNull(chain)) { + String errMsg = StrUtil.format("chain[{}] execute error on slot[{}]", chain.getChainId(), slotIndex); + LOG.error(errMsg, e); + } + else { + LOG.error(e.getMessage(), e); + } + + slot.setException(e); + Deque executeSteps = slot.getExecuteSteps(); + try { + Iterator cmpStepIterator = executeSteps.descendingIterator(); + while(cmpStepIterator.hasNext()) { + CmpStep cmpStep = cmpStepIterator.next(); + if(cmpStep.getInstance().isRollback()) { + Rollbackable rollbackItem = cmpStep.getRefNode(); + rollbackItem.rollback(slotIndex); + } + } + } catch (Exception exception) { + LOG.error(exception.getMessage()); + } + finally { + slot.printRollbackStep(); + } + } + finally { + slot.printStep(); + DataBus.releaseSlot(slotIndex); + LFLoggerManager.removeRequestId(); + + // 如果有FlowExecute生命周期实现,则执行 + if (CollUtil.isNotEmpty(LifeCycleHolder.getPostProcessFlowExecuteLifeCycleList())){ + LifeCycleHolder.getPostProcessFlowExecuteLifeCycleList().forEach( + postProcessFlowExecuteLifeCycle -> postProcessFlowExecuteLifeCycle.postProcessAfterFlowExecute(chainId, slot) + ); + } + } + return slot; + } + + public LiteflowConfig getLiteflowConfig() { + return liteflowConfig; + } + + public void setLiteflowConfig(LiteflowConfig liteflowConfig) { + this.liteflowConfig = liteflowConfig; + // 把liteFlowConfig设到LiteFlowGetter中去 + LiteflowConfigGetter.setLiteflowConfig(liteflowConfig); + } + + /** + * 添加监听文件路径 + * @param pathList 文件路径 + */ + private void addMonitorFilePaths(List pathList) throws Exception { + // 添加规则文件监听 + List fileAbsolutePath = PathContentParserHolder.loadContextAware().getFileAbsolutePath(pathList); + MonitorFile.getInstance().addMonitorFilePaths(fileAbsolutePath); + } + + private List doExecuteWithRoute(String namespace, Object param, String requestId, Class[] contextBeanClazzArray, Object[] contextBeanArray){ + if (FlowBus.needInit()) { + init(true); + } + + if (StrUtil.isBlank(namespace)){ + namespace = ChainConstant.DEFAULT_NAMESPACE; + } + + String finalNamespace = namespace; + List routeChainList = FlowBus.getChainMap().values().stream() + .filter(chain -> chain.getNamespace().equals(finalNamespace)) + .filter(chain -> chain.getRouteItem() != null).collect(Collectors.toList()); + + if (CollUtil.isEmpty(routeChainList)){ + String errorMsg = StrUtil.format("no route found for namespace[{}]", finalNamespace); + throw new RouteChainNotFoundException(errorMsg); + } + + String finalRequestId; + if (StrUtil.isBlank(requestId)){ + finalRequestId = IdGeneratorHolder.getInstance().generate(); + }else{ + finalRequestId = requestId; + } + + // 异步执行route el + List routeTupleList = new ArrayList<>(); + for (Chain routeChain : routeChainList){ + CompletableFuture f = CompletableFuture.supplyAsync( () -> doExecute(routeChain.getChainId(), param, finalRequestId, contextBeanClazzArray, contextBeanArray, ChainExecuteModeEnum.ROUTE), - ExecutorHelper.loadInstance().buildWhenExecutor() - ); + ExecutorHelper.loadInstance().buildWhenExecutor() + ); - routeTupleList.add(new Tuple(routeChain, f)); - } + routeTupleList.add(new Tuple(routeChain, f)); + } - CompletableFuture resultRouteCf = CompletableFuture.allOf(routeTupleList.stream().map( - (Function>) tuple -> tuple.get(1) - ).collect(Collectors.toList()).toArray(new CompletableFuture[] {})); - try{ - resultRouteCf.get(); - }catch (Exception e){ - throw new LiteFlowException("There is An error occurred while executing the route.", e); - } + CompletableFuture resultRouteCf = CompletableFuture.allOf(routeTupleList.stream().map( + (Function>) tuple -> tuple.get(1) + ).collect(Collectors.toList()).toArray(new CompletableFuture[] {})); + try{ + resultRouteCf.get(); + }catch (Exception e){ + throw new LiteFlowException("There is An error occurred while executing the route.", e); + } - // 把route执行为true都过滤出来 - List matchedRouteChainList = routeTupleList.stream().filter(tuple -> { - try{ - CompletableFuture f = tuple.get(1); - Slot slot = f.get(); - return BooleanUtil.isTrue(slot.getRouteResult()); - }catch (Exception e){ - return false; - } - }).map( - (Function) tuple -> tuple.get(0) - ).collect(Collectors.toList()); + // 把route执行为true都过滤出来 + List matchedRouteChainList = routeTupleList.stream().filter(tuple -> { + try{ + CompletableFuture f = tuple.get(1); + Slot slot = f.get(); + return BooleanUtil.isTrue(slot.getRouteResult()); + }catch (Exception e){ + return false; + } + }).map( + (Function) tuple -> tuple.get(0) + ).collect(Collectors.toList()); - if (CollUtil.isEmpty(matchedRouteChainList)){ - throw new NoMatchedRouteChainException("there is no matched route chain"); - } + if (CollUtil.isEmpty(matchedRouteChainList)){ + throw new NoMatchedRouteChainException("there is no matched route chain"); + } - // 异步分别执行这些chain - List> executeChainCfList = new ArrayList<>(); - for (Chain chain : matchedRouteChainList){ - CompletableFuture cf = CompletableFuture.supplyAsync( - () -> doExecute(chain.getChainId(), param, finalRequestId, contextBeanClazzArray, contextBeanArray, ChainExecuteModeEnum.BODY), - ExecutorHelper.loadInstance().buildWhenExecutor() - ); - executeChainCfList.add(cf); - } + // 异步分别执行这些chain + List> executeChainCfList = new ArrayList<>(); + for (Chain chain : matchedRouteChainList){ + CompletableFuture cf = CompletableFuture.supplyAsync( + () -> doExecute(chain.getChainId(), param, finalRequestId, contextBeanClazzArray, contextBeanArray, ChainExecuteModeEnum.BODY), + ExecutorHelper.loadInstance().buildWhenExecutor() + ); + executeChainCfList.add(cf); + } - CompletableFuture resultChainCf = CompletableFuture.allOf(executeChainCfList.toArray(new CompletableFuture[] {})); - try{ - resultChainCf.get(); - }catch (Exception e){ - throw new LiteFlowException("There is An error occurred while executing the matched chain.", e); - } + CompletableFuture resultChainCf = CompletableFuture.allOf(executeChainCfList.toArray(new CompletableFuture[] {})); + try{ + resultChainCf.get(); + }catch (Exception e){ + throw new LiteFlowException("There is An error occurred while executing the matched chain.", e); + } - List resultSlotList = executeChainCfList.stream().map(slotCompletableFuture -> { - try{ - return slotCompletableFuture.get(); - }catch (Exception e){ - return null; - } - }).filter(Objects::nonNull).collect(Collectors.toList()); + List resultSlotList = executeChainCfList.stream().map(slotCompletableFuture -> { + try{ + return slotCompletableFuture.get(); + }catch (Exception e){ + return null; + } + }).filter(Objects::nonNull).collect(Collectors.toList()); - LOG.info("chain namespace:[{}], total size:[{}], matched size:[{}]", namespace, routeChainList.size(), resultSlotList.size()); + LOG.info("chain namespace:[{}], total size:[{}], matched size:[{}]", namespace, routeChainList.size(), resultSlotList.size()); - return resultSlotList; - } + return resultSlotList; + } } \ No newline at end of file