From c6e667a59511744fc62a4a24a6160af0c64cf59e Mon Sep 17 00:00:00 2001 From: bryan31 Date: Sat, 31 Jul 2021 22:37:01 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81,=E9=81=BF?= =?UTF-8?q?=E5=85=8D=E8=BF=87=E5=A4=9A=E7=9A=84=E7=B1=BB=E5=B1=9E=E6=80=A7?= =?UTF-8?q?=EF=BC=8C=E4=BC=9A=E9=80=A0=E6=88=90=E6=B5=8B=E8=AF=95=E7=94=A8?= =?UTF-8?q?=E4=BE=8B=E7=9A=84=E9=94=99=E8=AF=AF=EF=BC=8C=E5=9B=A0=E4=B8=BA?= =?UTF-8?q?=E6=B5=8B=E8=AF=95=E7=94=A8=E4=BE=8Bjvm=E5=8F=AA=E5=90=AF?= =?UTF-8?q?=E5=8A=A8=E4=B8=80=E9=81=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../yomahub/liteflow/entity/data/DataBus.java | 10 +--- .../yomahub/liteflow/entity/flow/Chain.java | 29 +++--------- .../property/LiteflowConfigGetter.java | 22 +++++++++ .../yomahub/liteflow/util/ExecutorHelper.java | 47 ++++++++++++++----- .../util/LiteFlowExecutorPoolShutdown.java | 2 +- .../LiteflowExecutorAutoConfiguration.java | 4 +- 6 files changed, 68 insertions(+), 46 deletions(-) create mode 100644 liteflow-core/src/main/java/com/yomahub/liteflow/property/LiteflowConfigGetter.java diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/DataBus.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/DataBus.java index 8051fc0bd..577b5e374 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/DataBus.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/DataBus.java @@ -9,6 +9,7 @@ package com.yomahub.liteflow.entity.data; import cn.hutool.core.util.ObjectUtil; import com.yomahub.liteflow.property.LiteflowConfig; +import com.yomahub.liteflow.property.LiteflowConfigGetter; import com.yomahub.liteflow.util.SpringAware; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,16 +35,9 @@ public class DataBus { private static ConcurrentLinkedQueue QUEUE; static { - LiteflowConfig liteflowConfig = SpringAware.getBean(LiteflowConfig.class); - - if (ObjectUtil.isNull(liteflowConfig)){ - //liteflowConfig有自己的默认值 - liteflowConfig = new LiteflowConfig(); - } + LiteflowConfig liteflowConfig = LiteflowConfigGetter.get(); int slotSize = liteflowConfig.getSlotSize(); - SLOTS = new AtomicReferenceArray<>(slotSize); - QUEUE = IntStream.range(0, slotSize - 1).boxed().collect(Collectors.toCollection(ConcurrentLinkedQueue::new)); } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java index 1b0d9f2db..d816be4b1 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java @@ -18,6 +18,7 @@ import com.yomahub.liteflow.exception.ConfigErrorException; import com.yomahub.liteflow.exception.FlowSystemException; import com.yomahub.liteflow.exception.WhenExecuteException; import com.yomahub.liteflow.property.LiteflowConfig; +import com.yomahub.liteflow.property.LiteflowConfigGetter; import com.yomahub.liteflow.util.ExecutorHelper; import com.yomahub.liteflow.util.SpringAware; import org.slf4j.Logger; @@ -39,28 +40,6 @@ public class Chain implements Executable { private List conditionList; - private static ExecutorService parallelExecutor; - - private static LiteflowConfig liteflowConfig; - - static { - //这里liteflowConfig不可能为null - //如果在springboot环境,由于自动装配,所以不可能为null - //在spring环境,如果xml没配置,在FlowExecutor的init时候就已经报错了 - liteflowConfig = SpringAware.getBean(LiteflowConfig.class); - - //这里为了非spring环境下的严谨,还是判断 - if (ObjectUtil.isNull(liteflowConfig)){ - //liteflowConfig有自己的默认值 - liteflowConfig = new LiteflowConfig(); - } - - parallelExecutor = SpringAware.getBean(ExecutorService.class); - if (ObjectUtil.isNull(parallelExecutor)){ - parallelExecutor = ExecutorHelper.buildExecutor(liteflowConfig.getWhenMaxWorkers(), liteflowConfig.getWhenQueueLimit(), "liteflow-when-thread", false); - } - } - public Chain(String chainName, List conditionList) { this.chainName = chainName; this.conditionList = conditionList; @@ -89,6 +68,8 @@ public class Chain implements Executable { throw new FlowSystemException("no conditionList in this chain[" + chainName + "]"); } + LiteflowConfig liteflowConfig = LiteflowConfigGetter.get(); + Slot slot = DataBus.getSlot(slotIndex); //循环chain里包含的condition,每一个condition有可能是then,也有可能是when @@ -133,6 +114,10 @@ public class Chain implements Executable { final CountDownLatch latch = new CountDownLatch(condition.getNodeList().size()); final List> futures = new ArrayList<>(condition.getNodeList().size()); + //此方法其实只会初始化一次Executor,不会每次都会初始化。Executor是唯一的 + ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildExecutor(); + + LiteflowConfig liteflowConfig = LiteflowConfigGetter.get(); for (int i = 0; i < condition.getNodeList().size(); i++) { futures.add(parallelExecutor.submit( diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/property/LiteflowConfigGetter.java b/liteflow-core/src/main/java/com/yomahub/liteflow/property/LiteflowConfigGetter.java new file mode 100644 index 000000000..3e5b683cf --- /dev/null +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/property/LiteflowConfigGetter.java @@ -0,0 +1,22 @@ +package com.yomahub.liteflow.property; + +import cn.hutool.core.util.ObjectUtil; +import com.yomahub.liteflow.util.SpringAware; + +/** + * liteflow的配置获取器 + */ +public class LiteflowConfigGetter { + + public static LiteflowConfig get(){ + LiteflowConfig liteflowConfig = SpringAware.getBean(LiteflowConfig.class); + //这里liteflowConfig不可能为null + //如果在springboot环境,由于自动装配,所以不可能为null + //在spring环境,如果xml没配置,在FlowExecutor的init时候就已经报错了 + //只有在非spring环境下,是为null + if (ObjectUtil.isNull(liteflowConfig)){ + liteflowConfig = new LiteflowConfig(); + } + return liteflowConfig; + } +} diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/util/ExecutorHelper.java b/liteflow-core/src/main/java/com/yomahub/liteflow/util/ExecutorHelper.java index 2212f67fa..cd06e527c 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/util/ExecutorHelper.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/util/ExecutorHelper.java @@ -7,6 +7,8 @@ */ package com.yomahub.liteflow.util; +import cn.hutool.core.util.ObjectUtil; +import com.yomahub.liteflow.property.LiteflowConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -20,18 +22,29 @@ import java.util.concurrent.atomic.AtomicLong; */ public class ExecutorHelper { - private static final Logger LOG = LoggerFactory.getLogger(ExecutorHelper.class); + private final Logger LOG = LoggerFactory.getLogger(ExecutorHelper.class); + + private static ExecutorHelper executorHelper; + + private ExecutorService executorService; private ExecutorHelper() { } + public static ExecutorHelper loadInstance(){ + if (ObjectUtil.isNull(executorHelper)){ + executorHelper = new ExecutorHelper(); + } + return executorHelper; + } + /** * 使用默认的等待时间1分钟,来关闭目标线程组。 *

* * @param pool 需要关闭的线程组. */ - public static void shutdownAwaitTermination(ExecutorService pool) { + public void shutdownAwaitTermination(ExecutorService pool) { shutdownAwaitTermination(pool, 60L); } @@ -42,7 +55,7 @@ public class ExecutorHelper { * @param pool 需要关闭的管理者 * @param timeout 等待时间 */ - public static void shutdownAwaitTermination(ExecutorService pool, + public void shutdownAwaitTermination(ExecutorService pool, long timeout) { pool.shutdown(); try { @@ -65,7 +78,7 @@ public class ExecutorHelper { * @param name 名称. * @return 线程工厂实例. */ - public static ThreadFactory buildExecutorFactory(final String name) { + public ThreadFactory buildExecutorFactory(final String name) { return buildExecutorFactory(name, false); } @@ -76,7 +89,7 @@ public class ExecutorHelper { * @param daemon 是否为后台线程. * @return 线程工厂实例. */ - public static ThreadFactory buildExecutorFactory(final String name, final boolean daemon) { + public ThreadFactory buildExecutorFactory(final String name, final boolean daemon) { return new ThreadFactory() { private final AtomicLong number = new AtomicLong(); @@ -92,12 +105,22 @@ public class ExecutorHelper { }; } - public static ExecutorService buildExecutor(int worker, int queue, String namePrefix, boolean daemon) { - return new ThreadPoolExecutor(worker, worker, - 0L, TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<>(queue), - buildExecutorFactory(namePrefix, daemon), - new ThreadPoolExecutor.AbortPolicy() - ); + public ExecutorService buildExecutor() { + if (ObjectUtil.isNull(executorService)){ + LiteflowConfig liteflowConfig = SpringAware.getBean(LiteflowConfig.class); + //只有在非spring的场景下liteflowConfig才会为null + if (ObjectUtil.isNull(liteflowConfig)){ + liteflowConfig = new LiteflowConfig(); + } + + + executorService = new ThreadPoolExecutor(liteflowConfig.getWhenMaxWorkers(), + liteflowConfig.getWhenMaxWorkers(), + 0L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(liteflowConfig.getWhenQueueLimit()), + buildExecutorFactory("liteflow-when-thead", false), + new ThreadPoolExecutor.AbortPolicy()); + } + return executorService; } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/util/LiteFlowExecutorPoolShutdown.java b/liteflow-core/src/main/java/com/yomahub/liteflow/util/LiteFlowExecutorPoolShutdown.java index d4f913f36..a2eade017 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/util/LiteFlowExecutorPoolShutdown.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/util/LiteFlowExecutorPoolShutdown.java @@ -20,7 +20,7 @@ public class LiteFlowExecutorPoolShutdown { ExecutorService executorService = SpringAware.getBean("whenExecutors"); LOG.info("Start closing the liteflow-when-calls..."); - ExecutorHelper.shutdownAwaitTermination(executorService); + ExecutorHelper.loadInstance().shutdownAwaitTermination(executorService); LOG.info("Succeed closing the liteflow-when-calls ok..."); } } diff --git a/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowExecutorAutoConfiguration.java b/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowExecutorAutoConfiguration.java index 25a08cf15..12c1af196 100644 --- a/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowExecutorAutoConfiguration.java +++ b/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowExecutorAutoConfiguration.java @@ -24,9 +24,7 @@ public class LiteflowExecutorAutoConfiguration { @Bean("whenExecutors") public ExecutorService executorService(LiteflowConfig liteflowConfig) { - Integer useWorker = liteflowConfig.getWhenMaxWorkers(); - Integer useQueue = liteflowConfig.getWhenQueueLimit(); - return ExecutorHelper.buildExecutor(useWorker, useQueue, "liteflow-when-thead", false); + return ExecutorHelper.loadInstance().buildExecutor(); } @Bean