mirror of
https://gitee.com/dromara/liteFlow.git
synced 2026-05-14 12:12:08 +08:00
!50 调整线程构建者获取方式和调整pre节点执行位置
Merge pull request !50 from sikadai/v2.6.11
This commit is contained in:
@@ -13,9 +13,15 @@ import cn.hutool.core.util.ObjectUtil;
|
||||
import cn.hutool.core.util.ReUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.yomahub.liteflow.entity.data.DataBus;
|
||||
import com.yomahub.liteflow.entity.data.DefaultSlot;
|
||||
import com.yomahub.liteflow.entity.data.LiteflowResponse;
|
||||
import com.yomahub.liteflow.entity.data.Slot;
|
||||
import com.yomahub.liteflow.entity.flow.Chain;
|
||||
import com.yomahub.liteflow.entity.flow.Node;
|
||||
import com.yomahub.liteflow.enums.FlowParserTypeEnum;
|
||||
import com.yomahub.liteflow.exception.*;
|
||||
import com.yomahub.liteflow.flow.FlowBus;
|
||||
import com.yomahub.liteflow.parser.*;
|
||||
import com.yomahub.liteflow.property.LiteflowConfig;
|
||||
import com.yomahub.liteflow.property.LiteflowConfigGetter;
|
||||
@@ -23,15 +29,6 @@ import com.yomahub.liteflow.spi.holder.ContextAwareHolder;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.yomahub.liteflow.entity.flow.Chain;
|
||||
import com.yomahub.liteflow.entity.data.DataBus;
|
||||
import com.yomahub.liteflow.entity.data.DefaultSlot;
|
||||
import com.yomahub.liteflow.entity.data.LiteflowResponse;
|
||||
import com.yomahub.liteflow.entity.data.Slot;
|
||||
import com.yomahub.liteflow.flow.FlowBus;
|
||||
import com.yomahub.liteflow.parser.LocalXmlFlowParser;
|
||||
import com.yomahub.liteflow.parser.XmlFlowParser;
|
||||
import com.yomahub.liteflow.parser.ZookeeperXmlFlowParser;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
@@ -386,7 +383,8 @@ public class FlowExecutor {
|
||||
String errorMsg = StrUtil.format("[{}]:couldn't find chain with the id[{}]", slot.getRequestId(), chainId);
|
||||
throw new ChainNotFoundException(errorMsg);
|
||||
}
|
||||
|
||||
// 执行前置
|
||||
chain.executePre(slotIndex);
|
||||
// 执行chain
|
||||
chain.execute(slotIndex);
|
||||
} catch (ChainEndException e) {
|
||||
|
||||
@@ -24,8 +24,13 @@ import com.yomahub.liteflow.property.LiteflowConfigGetter;
|
||||
import com.yomahub.liteflow.thread.ExecutorHelper;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.*;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
@@ -74,15 +79,8 @@ public class Chain implements Executable {
|
||||
if (CollUtil.isEmpty(conditionList)) {
|
||||
throw new FlowSystemException("no conditionList in this chain[" + chainName + "]");
|
||||
}
|
||||
|
||||
//循环chain里包含的condition,每一个condition分四种类型:pre,then,when,finally
|
||||
//这里conditionList其实已经是有序的,pre一定在最前面,finally一定在最后面
|
||||
for (Condition condition : conditionList) {
|
||||
if (condition instanceof PreCondition){
|
||||
for (Executable executableItem : condition.getNodeList()) {
|
||||
executableItem.execute(slotIndex);
|
||||
}
|
||||
} else if (condition instanceof ThenCondition) {
|
||||
if (condition instanceof ThenCondition) {
|
||||
for (Executable executableItem : condition.getNodeList()) {
|
||||
executableItem.execute(slotIndex);
|
||||
}
|
||||
@@ -92,17 +90,33 @@ public class Chain implements Executable {
|
||||
}
|
||||
}
|
||||
|
||||
// 执行pre节点
|
||||
public void executePre(Integer slotIndex) throws Exception {
|
||||
doExecuteForPointConditionType(slotIndex, ConditionTypeEnum.TYPE_PRE);
|
||||
}
|
||||
|
||||
public void executeFinally(Integer slotIndex) throws Exception {
|
||||
//先把finally的节点过滤出来
|
||||
List<Condition> finallyConditionList = conditionList.stream().filter(condition ->
|
||||
condition.getConditionType().equals(ConditionTypeEnum.TYPE_FINALLY)).collect(Collectors.toList());
|
||||
for (Condition finallyCondition : finallyConditionList){
|
||||
for(Executable executableItem : finallyCondition.getNodeList()){
|
||||
doExecuteForPointConditionType(slotIndex, ConditionTypeEnum.TYPE_FINALLY);
|
||||
}
|
||||
|
||||
// 执行指定的conditionType的节点
|
||||
private void doExecuteForPointConditionType(Integer slotIndex, ConditionTypeEnum conditionTypeEnum) throws Exception {
|
||||
//先把指定condition类型的节点过滤出来
|
||||
List<Condition> conditions =filterCondition(conditionTypeEnum);
|
||||
for (Condition condition : conditions){
|
||||
for(Executable executableItem : condition.getNodeList()){
|
||||
executableItem.execute(slotIndex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 根据节点condition类型过去出节点列表
|
||||
private List<Condition> filterCondition(ConditionTypeEnum conditionTypeEnum) {
|
||||
assert conditionTypeEnum != null;
|
||||
return conditionList.stream().filter(condition ->
|
||||
condition.getConditionType().equals(conditionTypeEnum)).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExecuteTypeEnum getExecuteType() {
|
||||
return ExecuteTypeEnum.CHAIN;
|
||||
|
||||
@@ -14,6 +14,7 @@ import com.google.common.collect.Maps;
|
||||
import com.yomahub.liteflow.exception.ThreadExecutorServiceCreateException;
|
||||
import com.yomahub.liteflow.property.LiteflowConfig;
|
||||
import com.yomahub.liteflow.property.LiteflowConfigGetter;
|
||||
import com.yomahub.liteflow.spi.holder.ContextAwareHolder;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@@ -116,14 +117,15 @@ public class ExecutorHelper {
|
||||
* 根据线程执行构建者Class类名获取ExecutorBuilder实例
|
||||
* </p>
|
||||
*
|
||||
* @param threadExecutorClass
|
||||
* @param threadExecutorClass 线程执行class全量名
|
||||
* @return com.yomahub.liteflow.thread.ExecutorBuilder
|
||||
* @author sikadai
|
||||
* @date 2022/1/21 23:04
|
||||
*/
|
||||
private ExecutorBuilder getExecutorBuilder(String threadExecutorClass) {
|
||||
try {
|
||||
return (ExecutorBuilder) Class.forName(threadExecutorClass).newInstance();
|
||||
Class<ExecutorBuilder> executorClass = (Class<ExecutorBuilder>) Class.forName(threadExecutorClass);
|
||||
return ContextAwareHolder.loadContextAware().registerBean(executorClass);
|
||||
} catch (Exception e) {
|
||||
LOG.error(e.getMessage(), e);
|
||||
throw new ThreadExecutorServiceCreateException(e.getMessage());
|
||||
|
||||
Reference in New Issue
Block a user