diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java index 3b2eb8e20..c0db15eb4 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java @@ -13,9 +13,15 @@ import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.ReUtil; import cn.hutool.core.util.StrUtil; import com.google.common.collect.Lists; +import com.yomahub.liteflow.entity.data.DataBus; +import com.yomahub.liteflow.entity.data.DefaultSlot; +import com.yomahub.liteflow.entity.data.LiteflowResponse; +import com.yomahub.liteflow.entity.data.Slot; +import com.yomahub.liteflow.entity.flow.Chain; import com.yomahub.liteflow.entity.flow.Node; import com.yomahub.liteflow.enums.FlowParserTypeEnum; import com.yomahub.liteflow.exception.*; +import com.yomahub.liteflow.flow.FlowBus; import com.yomahub.liteflow.parser.*; import com.yomahub.liteflow.property.LiteflowConfig; import com.yomahub.liteflow.property.LiteflowConfigGetter; @@ -23,15 +29,6 @@ import com.yomahub.liteflow.spi.holder.ContextAwareHolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.yomahub.liteflow.entity.flow.Chain; -import com.yomahub.liteflow.entity.data.DataBus; -import com.yomahub.liteflow.entity.data.DefaultSlot; -import com.yomahub.liteflow.entity.data.LiteflowResponse; -import com.yomahub.liteflow.entity.data.Slot; -import com.yomahub.liteflow.flow.FlowBus; -import com.yomahub.liteflow.parser.LocalXmlFlowParser; -import com.yomahub.liteflow.parser.XmlFlowParser; -import com.yomahub.liteflow.parser.ZookeeperXmlFlowParser; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -386,7 +383,8 @@ public class FlowExecutor { String errorMsg = StrUtil.format("[{}]:couldn't find chain with the id[{}]", slot.getRequestId(), chainId); throw new ChainNotFoundException(errorMsg); } - + // 执行前置 + chain.executePre(slotIndex); // 执行chain chain.execute(slotIndex); } catch (ChainEndException e) { diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java index b30c5f549..0f8267643 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java @@ -24,8 +24,13 @@ import com.yomahub.liteflow.property.LiteflowConfigGetter; import com.yomahub.liteflow.thread.ExecutorHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; -import java.util.concurrent.*; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; /** @@ -74,15 +79,8 @@ public class Chain implements Executable { if (CollUtil.isEmpty(conditionList)) { throw new FlowSystemException("no conditionList in this chain[" + chainName + "]"); } - - //循环chain里包含的condition,每一个condition分四种类型:pre,then,when,finally - //这里conditionList其实已经是有序的,pre一定在最前面,finally一定在最后面 for (Condition condition : conditionList) { - if (condition instanceof PreCondition){ - for (Executable executableItem : condition.getNodeList()) { - executableItem.execute(slotIndex); - } - } else if (condition instanceof ThenCondition) { + if (condition instanceof ThenCondition) { for (Executable executableItem : condition.getNodeList()) { executableItem.execute(slotIndex); } @@ -92,17 +90,33 @@ public class Chain implements Executable { } } + // 执行pre节点 + public void executePre(Integer slotIndex) throws Exception { + doExecuteForPointConditionType(slotIndex, ConditionTypeEnum.TYPE_PRE); + } + public void executeFinally(Integer slotIndex) throws Exception { - //先把finally的节点过滤出来 - List finallyConditionList = conditionList.stream().filter(condition -> - condition.getConditionType().equals(ConditionTypeEnum.TYPE_FINALLY)).collect(Collectors.toList()); - for (Condition finallyCondition : finallyConditionList){ - for(Executable executableItem : finallyCondition.getNodeList()){ + doExecuteForPointConditionType(slotIndex, ConditionTypeEnum.TYPE_FINALLY); + } + + // 执行指定的conditionType的节点 + private void doExecuteForPointConditionType(Integer slotIndex, ConditionTypeEnum conditionTypeEnum) throws Exception { + //先把指定condition类型的节点过滤出来 + List conditions =filterCondition(conditionTypeEnum); + for (Condition condition : conditions){ + for(Executable executableItem : condition.getNodeList()){ executableItem.execute(slotIndex); } } } + // 根据节点condition类型过去出节点列表 + private List filterCondition(ConditionTypeEnum conditionTypeEnum) { + assert conditionTypeEnum != null; + return conditionList.stream().filter(condition -> + condition.getConditionType().equals(conditionTypeEnum)).collect(Collectors.toList()); + } + @Override public ExecuteTypeEnum getExecuteType() { return ExecuteTypeEnum.CHAIN; diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java b/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java index 2bfca78cf..9bf23839b 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java @@ -14,6 +14,7 @@ import com.google.common.collect.Maps; import com.yomahub.liteflow.exception.ThreadExecutorServiceCreateException; import com.yomahub.liteflow.property.LiteflowConfig; import com.yomahub.liteflow.property.LiteflowConfigGetter; +import com.yomahub.liteflow.spi.holder.ContextAwareHolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -116,14 +117,15 @@ public class ExecutorHelper { * 根据线程执行构建者Class类名获取ExecutorBuilder实例 *

* - * @param threadExecutorClass + * @param threadExecutorClass 线程执行class全量名 * @return com.yomahub.liteflow.thread.ExecutorBuilder * @author sikadai * @date 2022/1/21 23:04 */ private ExecutorBuilder getExecutorBuilder(String threadExecutorClass) { try { - return (ExecutorBuilder) Class.forName(threadExecutorClass).newInstance(); + Class executorClass = (Class) Class.forName(threadExecutorClass); + return ContextAwareHolder.loadContextAware().registerBean(executorClass); } catch (Exception e) { LOG.error(e.getMessage(), e); throw new ThreadExecutorServiceCreateException(e.getMessage());