From 6d113d1b2b1349b120e20099cca847f8a5f1d2af Mon Sep 17 00:00:00 2001 From: bryan31 Date: Fri, 5 Nov 2021 00:23:56 +0800 Subject: [PATCH] =?UTF-8?q?enhancement=20#I4GZ1Q=20=E5=A2=9E=E5=BC=BA?= =?UTF-8?q?=E5=BC=82=E6=AD=A5=E7=BA=BF=E7=A8=8B=E8=B6=85=E6=97=B6=E7=9A=84?= =?UTF-8?q?=E6=83=85=E5=86=B5=E4=B8=8B=E6=89=93=E5=8D=B0=E5=87=BA=E5=85=B7?= =?UTF-8?q?=E4=BD=93=E8=B6=85=E6=97=B6=E8=8A=82=E7=82=B9=E7=9A=84=E4=BF=A1?= =?UTF-8?q?=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../yomahub/liteflow/entity/flow/Chain.java | 42 +++++++++++-------- .../WhenTimeOutSpringbootTest.java | 41 ++++++++++++++++++ .../liteflow/test/whenTimeOut/cmp/ACmp.java | 20 +++++++++ .../liteflow/test/whenTimeOut/cmp/BCmp.java | 26 ++++++++++++ .../liteflow/test/whenTimeOut/cmp/CCmp.java | 26 ++++++++++++ .../whenTimeOut/application.properties | 2 + .../src/test/resources/whenTimeOut/flow.xml | 6 +++ 7 files changed, 146 insertions(+), 17 deletions(-) create mode 100644 liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/whenTimeOut/WhenTimeOutSpringbootTest.java create mode 100644 liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/whenTimeOut/cmp/ACmp.java create mode 100644 liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/whenTimeOut/cmp/BCmp.java create mode 100644 liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/whenTimeOut/cmp/CCmp.java create mode 100644 liteflow-testcase-springboot/src/test/resources/whenTimeOut/application.properties create mode 100644 liteflow-testcase-springboot/src/test/resources/whenTimeOut/flow.xml diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java index 429ef70d8..bc9c6d403 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java @@ -22,10 +22,13 @@ import com.yomahub.liteflow.property.LiteflowConfigGetter; import com.yomahub.liteflow.util.ExecutorHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; + +import java.lang.reflect.Array; +import java.util.*; import java.util.concurrent.*; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.stream.IntStream; /** * chain对象,实现可执行器 @@ -122,27 +125,32 @@ public class Chain implements Executable { // 使用线程池执行when并发流程 private void executeAsyncCondition(WhenCondition condition, Integer slotIndex, String requestId) { final CountDownLatch latch = new CountDownLatch(condition.getNodeList().size()); - final List> futures = new ArrayList<>(condition.getNodeList().size()); + final Map> futureMap = new HashMap<>(); //此方法其实只会初始化一次Executor,不会每次都会初始化。Executor是唯一的 ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildExecutor(); LiteflowConfig liteflowConfig = LiteflowConfigGetter.get(); - for (int i = 0; i < condition.getNodeList().size(); i++) { - futures.add(parallelExecutor.submit( - TtlCallable.get(new ParallelCallable(condition.getNodeList().get(i), slotIndex, requestId, latch, liteflowConfig.getRetryCount())) - )); - } + condition.getNodeList().forEach(executable -> { + Future future = parallelExecutor.submit( + Objects.requireNonNull(TtlCallable.get(new ParallelCallable(executable, slotIndex, requestId, latch, liteflowConfig.getRetryCount()))) + ); + futureMap.put(executable.getExecuteName(), future); + }); boolean interrupted = false; try { if (!latch.await(liteflowConfig.getWhenMaxWaitSeconds(), TimeUnit.SECONDS)) { - for (Future f : futures) { - f.cancel(true); - } + + futureMap.forEach((name, f) -> { + boolean flag = f.cancel(true); + //如果flag为true,说明线程被成功cancel掉了,需要打出这个线程对应的执行器单元的name,说明这个线程超时了 + if (flag){ + LOG.warn("requestId [{}] executing thread has reached max-wait-seconds, thread canceled.Execute-item: [{}]", requestId, name); + } + }); interrupted = true; - LOG.warn("requestId [{}] executing async condition has reached max-wait-seconds, condition canceled.", requestId); } } catch (InterruptedException e) { interrupted = true; @@ -154,15 +162,15 @@ public class Chain implements Executable { throw new WhenExecuteException(StrUtil.format("requestId [{}] when execute interrupted. errorResume [false].", requestId)); } - for (Future f : futures) { + futureMap.forEach((name, f) -> { try { if (!f.get()) { - throw new WhenExecuteException(StrUtil.format("requestId [{}] when execute failed. errorResume [false].", requestId)); + throw new WhenExecuteException(StrUtil.format("requestId [{}] when-executor[{}] execute failed. errorResume [false].", name, requestId)); } } catch (InterruptedException | ExecutionException e) { - throw new WhenExecuteException(StrUtil.format("requestId [{}] when execute failed. errorResume [false].", requestId)); + throw new WhenExecuteException(StrUtil.format("requestId [{}] when-executor[{}] execute failed. errorResume [false].", name, requestId)); } - } + }); } else if (interrupted) { // 这里由于配置了errorResume,所以只打印warn日志 LOG.warn("requestId [{}] executing when condition timeout , but ignore with errorResume.", requestId); diff --git a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/whenTimeOut/WhenTimeOutSpringbootTest.java b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/whenTimeOut/WhenTimeOutSpringbootTest.java new file mode 100644 index 000000000..66444ae55 --- /dev/null +++ b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/whenTimeOut/WhenTimeOutSpringbootTest.java @@ -0,0 +1,41 @@ +package com.yomahub.liteflow.test.whenTimeOut; + +import cn.hutool.core.io.resource.ResourceUtil; +import com.yomahub.liteflow.core.FlowExecutor; +import com.yomahub.liteflow.entity.data.DefaultSlot; +import com.yomahub.liteflow.entity.data.LiteflowResponse; +import com.yomahub.liteflow.enums.FlowParserTypeEnum; +import com.yomahub.liteflow.flow.FlowBus; +import com.yomahub.liteflow.test.BaseTest; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.junit4.SpringRunner; + +import javax.annotation.Resource; + +/** + * springboot环境下异步线程超时日志打印测试 + * @author Bryan.Zhang + * @since 2.6.4 + */ +@RunWith(SpringRunner.class) +@TestPropertySource(value = "classpath:/whenTimeOut/application.properties") +@SpringBootTest(classes = WhenTimeOutSpringbootTest.class) +@EnableAutoConfiguration +@ComponentScan({"com.yomahub.liteflow.test.whenTimeOut.cmp"}) +public class WhenTimeOutSpringbootTest extends BaseTest { + + @Resource + private FlowExecutor flowExecutor; + + @Test + public void testWhenTimeOut() throws Exception{ + LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg"); + Assert.assertTrue(response.isSuccess()); + } +} diff --git a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/whenTimeOut/cmp/ACmp.java b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/whenTimeOut/cmp/ACmp.java new file mode 100644 index 000000000..19b0bc06d --- /dev/null +++ b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/whenTimeOut/cmp/ACmp.java @@ -0,0 +1,20 @@ +/** + *

Title: liteflow

+ *

Description: 轻量级的组件式流程框架

+ * @author Bryan.Zhang + * @email weenyc31@163.com + * @Date 2020/4/1 + */ +package com.yomahub.liteflow.test.whenTimeOut.cmp; + +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +@Component("a") +public class ACmp extends NodeComponent { + + @Override + public void process() { + System.out.println("ACmp executed!"); + } +} diff --git a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/whenTimeOut/cmp/BCmp.java b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/whenTimeOut/cmp/BCmp.java new file mode 100644 index 000000000..41a1d901c --- /dev/null +++ b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/whenTimeOut/cmp/BCmp.java @@ -0,0 +1,26 @@ +/** + *

Title: liteflow

+ *

Description: 轻量级的组件式流程框架

+ * @author Bryan.Zhang + * @email weenyc31@163.com + * @Date 2020/4/1 + */ +package com.yomahub.liteflow.test.whenTimeOut.cmp; + +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +@Component("b") +public class BCmp extends NodeComponent { + + @Override + public void process() { + try { + Thread.sleep(6000); + }catch (Exception ignored){ + + } + System.out.println("BCmp executed!"); + } + +} diff --git a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/whenTimeOut/cmp/CCmp.java b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/whenTimeOut/cmp/CCmp.java new file mode 100644 index 000000000..d1537b04f --- /dev/null +++ b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/whenTimeOut/cmp/CCmp.java @@ -0,0 +1,26 @@ +/** + *

Title: liteflow

+ *

Description: 轻量级的组件式流程框架

+ * @author Bryan.Zhang + * @email weenyc31@163.com + * @Date 2020/4/1 + */ +package com.yomahub.liteflow.test.whenTimeOut.cmp; + +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +@Component("c") +public class CCmp extends NodeComponent { + + @Override + public void process() { + try { + Thread.sleep(8000); + }catch (Exception ignored){ + + } + System.out.println("CCmp executed!"); + } + +} diff --git a/liteflow-testcase-springboot/src/test/resources/whenTimeOut/application.properties b/liteflow-testcase-springboot/src/test/resources/whenTimeOut/application.properties new file mode 100644 index 000000000..a35693877 --- /dev/null +++ b/liteflow-testcase-springboot/src/test/resources/whenTimeOut/application.properties @@ -0,0 +1,2 @@ +liteflow.rule-source=whenTimeOut/flow.xml +liteflow.when-max-wait-seconds=5 \ No newline at end of file diff --git a/liteflow-testcase-springboot/src/test/resources/whenTimeOut/flow.xml b/liteflow-testcase-springboot/src/test/resources/whenTimeOut/flow.xml new file mode 100644 index 000000000..657f64cc3 --- /dev/null +++ b/liteflow-testcase-springboot/src/test/resources/whenTimeOut/flow.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file