From 717b03f6c3cfa50df9cabf92befc69aa137a47c3 Mon Sep 17 00:00:00 2001 From: sikadai <466608943@qq.com> Date: Thu, 20 Jan 2022 21:03:42 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=AF=E6=8C=81when=E7=BB=84=E4=BB=B6?= =?UTF-8?q?=E7=BB=B4=E5=BA=A6=E4=BD=BF=E7=94=A8=E7=BA=BF=E7=A8=8B=E6=B1=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../builder/LiteFlowWhenConditionBuilder.java | 9 +++ .../yomahub/liteflow/entity/flow/Chain.java | 3 +- .../liteflow/entity/flow/Condition.java | 10 ++++ .../liteflow/entity/flow/WhenCondition.java | 1 + .../liteflow/parser/JsonFlowParser.java | 9 +-- .../liteflow/parser/XmlFlowParser.java | 11 +--- .../liteflow/thread/ExecutorHelper.java | 56 +++++++++++++------ .../test/builder/BuilderSpringbootTest.java | 14 ++++- .../CustomThreadExecutor1.java | 38 +++++++++++++ .../CustomThreadExecutor2.java | 37 ++++++++++++ 10 files changed, 153 insertions(+), 35 deletions(-) create mode 100644 liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customThreadPool/CustomThreadExecutor1.java create mode 100644 liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customThreadPool/CustomThreadExecutor2.java diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/builder/LiteFlowWhenConditionBuilder.java b/liteflow-core/src/main/java/com/yomahub/liteflow/builder/LiteFlowWhenConditionBuilder.java index c61a1dda5..5e71ec8dd 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/builder/LiteFlowWhenConditionBuilder.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/builder/LiteFlowWhenConditionBuilder.java @@ -48,4 +48,13 @@ public class LiteFlowWhenConditionBuilder extends LiteFlowConditionBuilder{ } return setAny(Boolean.parseBoolean(any)); } + + + public LiteFlowWhenConditionBuilder setThreadExecutorClass(String executorServiceName){ + if (StrUtil.isBlank(executorServiceName)) { + return this; + } + this.condition.setThreadExecutorClass(executorServiceName); + return this; + } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java index 2c85a4623..b30c5f549 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java @@ -10,7 +10,6 @@ package com.yomahub.liteflow.entity.flow; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.StrUtil; -import com.alibaba.ttl.threadpool.TtlExecutors; import com.yomahub.liteflow.entity.data.DataBus; import com.yomahub.liteflow.entity.data.Slot; import com.yomahub.liteflow.entity.flow.parallel.CompletableFutureTimeout; @@ -120,7 +119,7 @@ public class Chain implements Executable { Slot slot = DataBus.getSlot(slotIndex); //此方法其实只会初始化一次Executor,不会每次都会初始化。Executor是唯一的 - ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildExecutor(); + ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildExecutor(condition.getThreadExecutorClass()); //获得liteflow的参数 LiteflowConfig liteflowConfig = LiteflowConfigGetter.get(); diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Condition.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Condition.java index 6dbbeb23e..07cf4ac53 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Condition.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Condition.java @@ -31,6 +31,8 @@ public class Condition { //只在when类型下有效,为true的话说明在多个并行节点下,任意一个成功,整个when就成功 private boolean any = false; + // when单独的线程池名称 + private String threadExecutorClass; public Condition(List nodeList) { this.nodeList = nodeList; @@ -77,4 +79,12 @@ public class Condition { public void setAny(boolean any) { this.any = any; } + + public String getThreadExecutorClass() { + return threadExecutorClass; + } + + public void setThreadExecutorClass(String threadExecutorClass) { + this.threadExecutorClass = threadExecutorClass; + } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/WhenCondition.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/WhenCondition.java index 4199b5bf0..507541966 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/WhenCondition.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/WhenCondition.java @@ -19,6 +19,7 @@ public class WhenCondition extends Condition{ super.setGroup(condition.getGroup()); super.setErrorResume(condition.isErrorResume()); super.setAny(condition.isAny()); + super.setThreadExecutorClass(condition.getThreadExecutorClass()); } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/parser/JsonFlowParser.java b/liteflow-core/src/main/java/com/yomahub/liteflow/parser/JsonFlowParser.java index 11e3e751c..48de2c2e2 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/parser/JsonFlowParser.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/parser/JsonFlowParser.java @@ -2,7 +2,6 @@ package com.yomahub.liteflow.parser; import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.collection.ListUtil; -import cn.hutool.core.io.resource.ResourceUtil; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.alibaba.fastjson.JSONArray; @@ -11,23 +10,18 @@ import com.alibaba.fastjson.parser.Feature; import com.yomahub.liteflow.builder.LiteFlowChainBuilder; import com.yomahub.liteflow.builder.LiteFlowConditionBuilder; import com.yomahub.liteflow.builder.LiteFlowNodeBuilder; -import com.yomahub.liteflow.common.LocalDefaultFlowConstant; import com.yomahub.liteflow.core.NodeComponent; -import com.yomahub.liteflow.entity.flow.*; import com.yomahub.liteflow.enums.ConditionTypeEnum; import com.yomahub.liteflow.enums.NodeTypeEnum; import com.yomahub.liteflow.exception.EmptyConditionValueException; -import com.yomahub.liteflow.exception.ExecutableItemNotFoundException; import com.yomahub.liteflow.exception.NodeTypeNotSupportException; import com.yomahub.liteflow.exception.NotSupportConditionException; import com.yomahub.liteflow.flow.FlowBus; import com.yomahub.liteflow.spring.ComponentScanner; -import org.dom4j.Element; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.*; -import java.util.function.Consumer; /** * Json格式解析器 @@ -136,6 +130,7 @@ public abstract class JsonFlowParser extends FlowParser { String group; String errorResume; String any; + String threadExecutorClass; //构建chainBuilder String chainName = chainObject.getString("name"); @@ -148,6 +143,7 @@ public abstract class JsonFlowParser extends FlowParser { errorResume = condObject.getString("errorResume"); group = condObject.getString("group"); any = condObject.getString("any"); + threadExecutorClass = condObject.getString("threadExecutorClass"); if (ObjectUtil.isNull(conditionType)){ throw new NotSupportConditionException("ConditionType is not supported"); @@ -165,6 +161,7 @@ public abstract class JsonFlowParser extends FlowParser { .setErrorResume(errorResume) .setGroup(group) .setAny(any) + .setThreadExecutorClass(threadExecutorClass) .setValue(condValueStr) .build() ).build(); diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/parser/XmlFlowParser.java b/liteflow-core/src/main/java/com/yomahub/liteflow/parser/XmlFlowParser.java index 63986d3a1..26e74963c 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/parser/XmlFlowParser.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/parser/XmlFlowParser.java @@ -2,18 +2,12 @@ package com.yomahub.liteflow.parser; import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.collection.ListUtil; -import cn.hutool.core.io.resource.ResourceUtil; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.yomahub.liteflow.builder.LiteFlowChainBuilder; import com.yomahub.liteflow.builder.LiteFlowConditionBuilder; import com.yomahub.liteflow.builder.LiteFlowNodeBuilder; -import com.yomahub.liteflow.common.LocalDefaultFlowConstant; import com.yomahub.liteflow.core.NodeComponent; -import com.yomahub.liteflow.entity.flow.Chain; -import com.yomahub.liteflow.entity.flow.Condition; -import com.yomahub.liteflow.entity.flow.Executable; -import com.yomahub.liteflow.entity.flow.Node; import com.yomahub.liteflow.enums.ConditionTypeEnum; import com.yomahub.liteflow.enums.NodeTypeEnum; import com.yomahub.liteflow.exception.*; @@ -25,11 +19,9 @@ import org.dom4j.Element; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map.Entry; -import java.util.function.Consumer; /** * xml形式的解析器 @@ -127,6 +119,7 @@ public abstract class XmlFlowParser extends FlowParser { String group; String errorResume; String any; + String threadExecutorClass; ConditionTypeEnum conditionType; //构建chainBuilder @@ -140,6 +133,7 @@ public abstract class XmlFlowParser extends FlowParser { errorResume = condE.attributeValue("errorResume"); group = condE.attributeValue("group"); any = condE.attributeValue("any"); + threadExecutorClass = condE.attributeValue("threadExecutorClass"); if (ObjectUtil.isNull(conditionType)){ throw new NotSupportConditionException("ConditionType is not supported"); @@ -156,6 +150,7 @@ public abstract class XmlFlowParser extends FlowParser { .setErrorResume(errorResume) .setGroup(group) .setAny(any) + .setThreadExecutorClass(threadExecutorClass) .setValue(condValueStr) .build() ).build(); diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java b/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java index cf9a78822..d49c65930 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java @@ -1,6 +1,7 @@ /** *

Title: liteflow

*

Description: 轻量级的组件式流程框架

+ * * @author Bryan.Zhang * @email weenyc31@163.com * @Date 2020/4/1 @@ -8,31 +9,39 @@ package com.yomahub.liteflow.thread; import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; +import com.google.common.collect.Maps; import com.yomahub.liteflow.exception.ThreadExecutorServiceCreateException; import com.yomahub.liteflow.property.LiteflowConfig; import com.yomahub.liteflow.util.SpringAware; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import java.util.Map; import java.util.concurrent.*; /** * 线程池工具类 + * * @author Bryan.Zhang */ public class ExecutorHelper { - + private final Logger LOG = LoggerFactory.getLogger(ExecutorHelper.class); private static ExecutorHelper executorHelper; private ExecutorService executorService; - + + private Map executorServiceMap; + private ExecutorHelper() { + executorServiceMap = Maps.newConcurrentMap(); } - public static ExecutorHelper loadInstance(){ - if (ObjectUtil.isNull(executorHelper)){ + public static ExecutorHelper loadInstance() { + if (ObjectUtil.isNull(executorHelper)) { executorHelper = new ExecutorHelper(); } return executorHelper; @@ -56,7 +65,7 @@ public class ExecutorHelper { * @param timeout 等待时间 */ public void shutdownAwaitTermination(ExecutorService pool, - long timeout) { + long timeout) { pool.shutdown(); try { if (!pool.awaitTermination(timeout, TimeUnit.SECONDS)) { @@ -72,22 +81,37 @@ public class ExecutorHelper { } public ExecutorService buildExecutor() { - if (ObjectUtil.isNull(executorService)){ + if (ObjectUtil.isNull(executorService)) { LiteflowConfig liteflowConfig = SpringAware.getBean(LiteflowConfig.class); - - try{ - assert liteflowConfig != null; - ExecutorBuilder executorBuilder = (ExecutorBuilder)Class.forName(liteflowConfig.getThreadExecutorClass()).newInstance(); - executorService = executorBuilder.buildExecutor(); - }catch (Exception e){ - LOG.error(e.getMessage(), e); - throw new ThreadExecutorServiceCreateException(e.getMessage()); - } - + assert liteflowConfig != null; + executorService = buildExecutor(liteflowConfig.getThreadExecutorClass()); } return executorService; } + public ExecutorService buildExecutor(String threadExecutorClass) { + try { + if (StrUtil.isBlank(threadExecutorClass)) { + return buildExecutor(); + } + ExecutorService executorServiceFromCache = executorServiceMap.get(threadExecutorClass); + if (executorServiceFromCache != null) { + return executorServiceFromCache; + } else { + ExecutorService executorService = getExecutorBuilder(threadExecutorClass).buildExecutor(); + executorServiceMap.put(threadExecutorClass, executorService); + return executorService; + } + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new ThreadExecutorServiceCreateException(e.getMessage()); + } + } + + private ExecutorBuilder getExecutorBuilder(String threadExecutorClass) throws Exception { + return (ExecutorBuilder) Class.forName(threadExecutorClass).newInstance(); + } + public ExecutorService getExecutorService() { return executorService; } diff --git a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/builder/BuilderSpringbootTest.java b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/builder/BuilderSpringbootTest.java index aaa3a3894..b773b0354 100644 --- a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/builder/BuilderSpringbootTest.java +++ b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/builder/BuilderSpringbootTest.java @@ -8,6 +8,8 @@ import com.yomahub.liteflow.entity.data.DefaultSlot; import com.yomahub.liteflow.entity.data.LiteflowResponse; import com.yomahub.liteflow.enums.NodeTypeEnum; import com.yomahub.liteflow.test.BaseTest; +import com.yomahub.liteflow.test.customThreadPool.CustomThreadExecutor1; +import com.yomahub.liteflow.test.customThreadPool.CustomThreadExecutor2; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -71,13 +73,19 @@ public class BuilderSpringbootTest extends BaseTest { LiteFlowChainBuilder.createChain().setChainName("chain2").setCondition( - LiteFlowConditionBuilder.createThenCondition().setValue("c,d").build() + LiteFlowConditionBuilder.createWhenCondition().setValue("c,d").build() ).build(); LiteFlowChainBuilder.createChain().setChainName("chain1").setCondition( - LiteFlowConditionBuilder.createThenCondition().setValue("a,b").build() + LiteFlowConditionBuilder + .createWhenCondition() + .setAny(true) + .setThreadExecutorClass(CustomThreadExecutor2.class.getName()) + .setValue("a,b").build() ).setCondition( - LiteFlowConditionBuilder.createWhenCondition().setValue("e(f|g|chain2)").build() + LiteFlowConditionBuilder.createWhenCondition() + .setThreadExecutorClass(CustomThreadExecutor1.class.getName()) + .setValue("e(f|g|chain2)").build() ).build(); LiteflowResponse response = flowExecutor.execute2Resp("chain1"); diff --git a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customThreadPool/CustomThreadExecutor1.java b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customThreadPool/CustomThreadExecutor1.java new file mode 100644 index 000000000..0cc24856c --- /dev/null +++ b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customThreadPool/CustomThreadExecutor1.java @@ -0,0 +1,38 @@ +package com.yomahub.liteflow.test.customThreadPool; + +import cn.hutool.core.util.ObjectUtil; +import com.alibaba.ttl.threadpool.TtlExecutors; +import com.yomahub.liteflow.property.LiteflowConfig; +import com.yomahub.liteflow.thread.ExecutorBuilder; +import com.yomahub.liteflow.util.SpringAware; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; + +public class CustomThreadExecutor1 implements ExecutorBuilder { + + @Override + public ExecutorService buildExecutor() { + LiteflowConfig liteflowConfig = SpringAware.getBean(LiteflowConfig.class); + //只有在非spring的场景下liteflowConfig才会为null + if (ObjectUtil.isNull(liteflowConfig)) { + liteflowConfig = new LiteflowConfig(); + } + return TtlExecutors.getTtlExecutorService(new ThreadPoolExecutor(liteflowConfig.getWhenMaxWorkers(), + liteflowConfig.getWhenMaxWorkers(), + 0L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(liteflowConfig.getWhenQueueLimit()), + new ThreadFactory() { + private final AtomicLong number = new AtomicLong(); + + @Override + public Thread newThread(Runnable r) { + Thread newThread = Executors.defaultThreadFactory().newThread(r); + newThread.setName("Customer-when-thead-" + number.getAndIncrement()); + newThread.setDaemon(false); + return newThread; + } + }, + new ThreadPoolExecutor.AbortPolicy())); + } +} diff --git a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customThreadPool/CustomThreadExecutor2.java b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customThreadPool/CustomThreadExecutor2.java new file mode 100644 index 000000000..36de8f3ae --- /dev/null +++ b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/customThreadPool/CustomThreadExecutor2.java @@ -0,0 +1,37 @@ +package com.yomahub.liteflow.test.customThreadPool; + +import cn.hutool.core.util.ObjectUtil; +import com.alibaba.ttl.threadpool.TtlExecutors; +import com.yomahub.liteflow.property.LiteflowConfig; +import com.yomahub.liteflow.thread.ExecutorBuilder; +import com.yomahub.liteflow.util.SpringAware; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; + +public class CustomThreadExecutor2 implements ExecutorBuilder { + @Override + public ExecutorService buildExecutor() { + LiteflowConfig liteflowConfig = SpringAware.getBean(LiteflowConfig.class); + //只有在非spring的场景下liteflowConfig才会为null + if (ObjectUtil.isNull(liteflowConfig)) { + liteflowConfig = new LiteflowConfig(); + } + return TtlExecutors.getTtlExecutorService(new ThreadPoolExecutor(liteflowConfig.getWhenMaxWorkers(), + liteflowConfig.getWhenMaxWorkers(), + 0L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(liteflowConfig.getWhenQueueLimit()), + new ThreadFactory() { + private final AtomicLong number = new AtomicLong(); + + @Override + public Thread newThread(Runnable r) { + Thread newThread = Executors.defaultThreadFactory().newThread(r); + newThread.setName("Customer-when-222thead-" + number.getAndIncrement()); + newThread.setDaemon(false); + return newThread; + } + }, + new ThreadPoolExecutor.AbortPolicy())); + } +}