From 332ce3ba0951946ecb6396053196640a9c82abb0 Mon Sep 17 00:00:00 2001 From: bryan31 Date: Tue, 9 Nov 2021 13:45:18 +0800 Subject: [PATCH] =?UTF-8?q?enhancement=20#I4HD8L=20=E6=94=AF=E6=8C=81?= =?UTF-8?q?=E5=BC=82=E6=AD=A5=E8=8A=82=E7=82=B9=E8=BF=94=E5=9B=9E=E8=87=AA?= =?UTF-8?q?=E5=AE=9A=E4=B9=89=E7=9A=84=E9=94=99=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../liteflow/asynctool/callback/IWorker.java | 2 +- .../asynctool/wrapper/WorkerWrapper.java | 4 + .../asynctool/test/seqwork/SeqWorkTest.java | 7 +- .../yomahub/liteflow/entity/data/AbsSlot.java | 4 + .../yomahub/liteflow/entity/data/Slot.java | 2 + .../yomahub/liteflow/entity/flow/Chain.java | 84 +++++------ .../entity/flow/ParallelCallable.java | 43 ------ .../liteflow/entity/flow/ParallelWorker.java | 29 ++++ .../asyncNode/AsyncNodeSpringbootTest.java | 107 ++++++++++++++ .../cmp1 => asyncNode/cmp}/ACmp.java | 2 +- .../cmp1 => asyncNode/cmp}/BCmp.java | 2 +- .../cmp1 => asyncNode/cmp}/CCmp.java | 2 +- .../cmp1 => asyncNode/cmp}/DCmp.java | 2 +- .../cmp1 => asyncNode/cmp}/ECmp.java | 3 +- .../cmp1 => asyncNode/cmp}/FCmp.java | 2 +- .../cmp1 => asyncNode/cmp}/GCmp.java | 2 +- .../cmp1 => asyncNode/cmp}/HCmp.java | 2 +- .../liteflow/test/asyncNode/cmp/ICmp.java | 24 ++++ .../cmp1 => asyncNode/cmp}/JCmp.java | 2 +- .../asyncNode/exception/TestException.java | 4 + .../test/condition/BaseConditionFlowTest.java | 136 ------------------ .../liteflow/test/condition/cmp1/ICmp.java | 19 --- .../UseTTLInWhenSpringbootTest.java | 2 +- .../asyncNode/application.properties | 1 + .../{condition => asyncNode}/flow.xml | 2 - .../application-condition.properties | 1 - .../condition/application-xml.properties | 1 - 27 files changed, 233 insertions(+), 258 deletions(-) delete mode 100644 liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/ParallelCallable.java create mode 100644 liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/ParallelWorker.java create mode 100644 liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/AsyncNodeSpringbootTest.java rename liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/{condition/cmp1 => asyncNode/cmp}/ACmp.java (83%) rename liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/{condition/cmp1 => asyncNode/cmp}/BCmp.java (83%) rename liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/{condition/cmp1 => asyncNode/cmp}/CCmp.java (84%) rename liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/{condition/cmp1 => asyncNode/cmp}/DCmp.java (85%) rename liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/{condition/cmp1 => asyncNode/cmp}/ECmp.java (75%) rename liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/{condition/cmp1 => asyncNode/cmp}/FCmp.java (84%) rename liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/{condition/cmp1 => asyncNode/cmp}/GCmp.java (84%) rename liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/{condition/cmp1 => asyncNode/cmp}/HCmp.java (84%) create mode 100644 liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/ICmp.java rename liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/{condition/cmp1 => asyncNode/cmp}/JCmp.java (86%) create mode 100644 liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/exception/TestException.java delete mode 100644 liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/condition/BaseConditionFlowTest.java delete mode 100644 liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/condition/cmp1/ICmp.java create mode 100644 liteflow-testcase-springboot/src/test/resources/asyncNode/application.properties rename liteflow-testcase-springboot/src/test/resources/{condition => asyncNode}/flow.xml (98%) delete mode 100644 liteflow-testcase-springboot/src/test/resources/condition/application-condition.properties delete mode 100644 liteflow-testcase-springboot/src/test/resources/condition/application-xml.properties diff --git a/liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/callback/IWorker.java b/liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/callback/IWorker.java index 9b520bf01..9a318271a 100755 --- a/liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/callback/IWorker.java +++ b/liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/callback/IWorker.java @@ -17,7 +17,7 @@ public interface IWorker { * @param object object * @param allWrappers 任务包装 */ - V action(T object, Map allWrappers); + V action(T object, Map allWrappers) throws Exception; /** * 超时、异常时,返回的默认值 diff --git a/liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/wrapper/WorkerWrapper.java b/liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/wrapper/WorkerWrapper.java index d4010d439..8d6811e46 100755 --- a/liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/wrapper/WorkerWrapper.java +++ b/liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/wrapper/WorkerWrapper.java @@ -607,4 +607,8 @@ public class WorkerWrapper { } } + + public IWorker getWorker() { + return worker; + } } diff --git a/liteflow-async-tool/src/test/java/com/yomahub/liteflow/asynctool/test/seqwork/SeqWorkTest.java b/liteflow-async-tool/src/test/java/com/yomahub/liteflow/asynctool/test/seqwork/SeqWorkTest.java index 6dd483498..53ed747e7 100644 --- a/liteflow-async-tool/src/test/java/com/yomahub/liteflow/asynctool/test/seqwork/SeqWorkTest.java +++ b/liteflow-async-tool/src/test/java/com/yomahub/liteflow/asynctool/test/seqwork/SeqWorkTest.java @@ -68,19 +68,18 @@ public class SeqWorkTest { .worker(seqWork2) .callback(callback2) .param("param2") - .depend(workerWrapper1) +// .depend(workerWrapper1) .build(); WorkerWrapper workerWrapper3 = new WorkerWrapper.Builder() .worker(seqWork3) .callback(callback3) .param("param3") - .depend(workerWrapper2) +// .depend(workerWrapper2) .build(); try{ - boolean flag = Async.beginWork(2500,workerWrapper1); - System.out.println(workerWrapper3.getWorkResult().getResultState()); + boolean flag = Async.beginWork(4000,workerWrapper1,workerWrapper2,workerWrapper3); System.out.println(flag); }catch (Exception e){ e.printStackTrace(); diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/AbsSlot.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/AbsSlot.java index 8ebeaca69..f2f95132a 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/AbsSlot.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/AbsSlot.java @@ -87,6 +87,10 @@ public abstract class AbsSlot implements Slot { dataMap.put(CHAIN_REQ_PREFIX + chainId, t); } + public boolean hasData(String key){ + return dataMap.containsKey(key); + } + public T getData(String key){ return (T)dataMap.get(key); } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/Slot.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/Slot.java index 023f1e934..6e97136ec 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/Slot.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/Slot.java @@ -32,6 +32,8 @@ public interface Slot { void setResponseData(T t); + boolean hasData(String key); + T getData(String key); void setData(String key, T t); diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java index 720b337a8..7bea2b6d3 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java @@ -10,11 +10,14 @@ package com.yomahub.liteflow.entity.flow; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.StrUtil; -import com.alibaba.ttl.TtlCallable; +import com.alibaba.ttl.threadpool.TtlExecutors; +import com.yomahub.liteflow.asynctool.executor.Async; +import com.yomahub.liteflow.asynctool.worker.ResultState; +import com.yomahub.liteflow.asynctool.worker.WorkResult; +import com.yomahub.liteflow.asynctool.wrapper.WorkerWrapper; import com.yomahub.liteflow.entity.data.DataBus; import com.yomahub.liteflow.entity.data.Slot; import com.yomahub.liteflow.enums.ExecuteTypeEnum; -import com.yomahub.liteflow.exception.ChainEndException; import com.yomahub.liteflow.exception.FlowSystemException; import com.yomahub.liteflow.exception.WhenExecuteException; import com.yomahub.liteflow.property.LiteflowConfig; @@ -22,13 +25,9 @@ import com.yomahub.liteflow.property.LiteflowConfigGetter; import com.yomahub.liteflow.util.ExecutorHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import java.lang.reflect.Array; import java.util.*; import java.util.concurrent.*; -import java.util.function.BiConsumer; -import java.util.function.Consumer; -import java.util.stream.IntStream; +import java.util.stream.Collectors; /** * chain对象,实现可执行器 @@ -97,57 +96,62 @@ public class Chain implements Executable { } - // 使用线程池执行when并发流程 - private void executeAsyncCondition(WhenCondition condition, Integer slotIndex, String requestId) { - final CountDownLatch latch = new CountDownLatch(condition.getNodeList().size()); - final Map> futureMap = new HashMap<>(); + //使用线程池执行when并发流程 + private void executeAsyncCondition(WhenCondition condition, Integer slotIndex, String requestId) throws Exception{ //此方法其实只会初始化一次Executor,不会每次都会初始化。Executor是唯一的 - ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildExecutor(); + ExecutorService parallelExecutor = TtlExecutors.getTtlExecutorService(ExecutorHelper.loadInstance().buildExecutor()); + LiteflowConfig liteflowConfig = LiteflowConfigGetter.get(); - condition.getNodeList().forEach(executable -> { - Future future = parallelExecutor.submit( - Objects.requireNonNull(TtlCallable.get(new ParallelCallable(executable, slotIndex, requestId, latch))) - ); - futureMap.put(executable.getExecuteName(), future); - }); + //封装asyncTool的workerWrapper对象 + List> parallelWorkerWrapperList = condition.getNodeList().stream() + .map(executable -> new WorkerWrapper.Builder() + .worker(new ParallelWorker(executable, slotIndex)) + .next(new WorkerWrapper.Builder().worker((object, allWrappers) -> Void.TYPE.newInstance()).build(), true) + .build()) + .collect(Collectors.toList()); boolean interrupted = false; - try { - if (!latch.await(liteflowConfig.getWhenMaxWaitSeconds(), TimeUnit.SECONDS)) { + boolean asyncToolResult; - futureMap.forEach((name, f) -> { - boolean flag = f.cancel(true); - //如果flag为true,说明线程被成功cancel掉了,需要打出这个线程对应的执行器单元的name,说明这个线程超时了 - if (flag){ - LOG.warn("requestId [{}] executing thread has reached max-wait-seconds, thread canceled.Execute-item: [{}]", requestId, name); - } - }); - interrupted = true; - } - } catch (InterruptedException e) { + //这里利用asyncTool框架进行并行调用 + try{ + asyncToolResult = Async.beginWork(liteflowConfig.getWhenMaxWaitSeconds()*1000, + parallelExecutor, + parallelWorkerWrapperList.toArray(new WorkerWrapper[]{})); + }catch (Exception e){ + throw new WhenExecuteException(StrUtil.format("requestId [{}] AsyncTool framework execution exception.", requestId)); + } + + //asyncToolResult为false,说明是timeout状态了 + //遍历wrapper拿到worker,拿到defaultValue,其实就是nodeId,打印出来 + if (!asyncToolResult){ + parallelWorkerWrapperList.forEach(workerWrapper -> { + if(workerWrapper.getWorkResult().getResultState().equals(ResultState.TIMEOUT)){ + LOG.warn("requestId [{}] executing thread has reached max-wait-seconds, thread canceled.Execute-item: [{}]", + requestId, workerWrapper.getWorker().defaultValue()); + } + }); interrupted = true; } - //当配置了errorResume = false,出现interrupted或者!f.get()的情况,将抛出WhenExecuteException + //errorResume是一个condition里的参数,如果为true,表示即便出现了错误,也继续执行下一个condition + //当配置了errorResume = false,出现interrupted或者其中一个线程执行出错的情况,将抛出WhenExecuteException if (!condition.isErrorResume()) { if (interrupted) { throw new WhenExecuteException(StrUtil.format("requestId [{}] when execute interrupted. errorResume [false].", requestId)); } - futureMap.forEach((name, f) -> { - try { - if (!f.get()) { - throw new WhenExecuteException(StrUtil.format("requestId [{}] when-executor[{}] execute failed. errorResume [false].", name, requestId)); - } - } catch (InterruptedException | ExecutionException e) { - throw new WhenExecuteException(StrUtil.format("requestId [{}] when-executor[{}] execute failed. errorResume [false].", name, requestId)); + for (WorkerWrapper workerWrapper : parallelWorkerWrapperList){ + WorkResult workResult = workerWrapper.getWorkResult(); + if (!workResult.getResultState().equals(ResultState.SUCCESS)){ + throw workResult.getEx(); } - }); + } } else if (interrupted) { - // 这里由于配置了errorResume,所以只打印warn日志 + // 这里由于配置了errorResume=true,所以只打印warn日志 LOG.warn("requestId [{}] executing when condition timeout , but ignore with errorResume.", requestId); } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/ParallelCallable.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/ParallelCallable.java deleted file mode 100644 index 82ee1c3f5..000000000 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/ParallelCallable.java +++ /dev/null @@ -1,43 +0,0 @@ -package com.yomahub.liteflow.entity.flow; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.util.concurrent.Callable; - -import java.util.concurrent.CountDownLatch; - -/** - * 并行器线程 - * @author Bryan.Zhang - */ -public class ParallelCallable implements Callable { - - private static final Logger LOG = LoggerFactory.getLogger(ParallelCallable.class); - - private final Executable executableItem; - - private final Integer slotIndex; - - private final String requestId; - - private final CountDownLatch latch; - - public ParallelCallable(Executable executableItem, Integer slotIndex, String requestId, CountDownLatch latch) { - this.executableItem = executableItem; - this.slotIndex = slotIndex; - this.requestId = requestId; - this.latch = latch; - } - - @Override - public Boolean call() throws Exception { - try { - executableItem.execute(slotIndex); - return true; - } catch (Exception e){ - return false; - } finally { - latch.countDown(); - } - } -} diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/ParallelWorker.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/ParallelWorker.java new file mode 100644 index 000000000..f5d82f368 --- /dev/null +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/ParallelWorker.java @@ -0,0 +1,29 @@ +package com.yomahub.liteflow.entity.flow; + +import com.yomahub.liteflow.asynctool.callback.IWorker; +import com.yomahub.liteflow.asynctool.wrapper.WorkerWrapper; + +import java.util.Map; + +public class ParallelWorker implements IWorker { + + private final Executable executableItem; + + private final Integer slotIndex; + + public ParallelWorker(Executable executableItem, Integer slotIndex) { + this.executableItem = executableItem; + this.slotIndex = slotIndex; + } + + @Override + public String action(Void object, Map allWrappers) throws Exception{ + executableItem.execute(slotIndex); + return executableItem.getExecuteName(); + } + + @Override + public String defaultValue() { + return executableItem.getExecuteName(); + } +} diff --git a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/AsyncNodeSpringbootTest.java b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/AsyncNodeSpringbootTest.java new file mode 100644 index 000000000..e065ec453 --- /dev/null +++ b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/AsyncNodeSpringbootTest.java @@ -0,0 +1,107 @@ +package com.yomahub.liteflow.test.asyncNode; + +import cn.hutool.core.collection.ListUtil; +import com.yomahub.liteflow.core.FlowExecutor; +import com.yomahub.liteflow.entity.data.DefaultSlot; +import com.yomahub.liteflow.entity.data.LiteflowResponse; +import com.yomahub.liteflow.test.BaseTest; +import com.yomahub.liteflow.test.asyncNode.exception.TestException; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.junit4.SpringRunner; + +import javax.annotation.Resource; + +/** + * 测试隐式调用子流程 + * 单元测试 + * + * @author ssss + */ +@RunWith(SpringRunner.class) +@TestPropertySource(value = "classpath:/asyncNode/application.properties") +@SpringBootTest(classes = AsyncNodeSpringbootTest.class) +@EnableAutoConfiguration +@ComponentScan({"com.yomahub.liteflow.test.asyncNode.cmp"}) +public class AsyncNodeSpringbootTest extends BaseTest { + @Resource + private FlowExecutor flowExecutor; + + /***** + * 标准chain 嵌套选择 嵌套子chain进行执行 + * 验证了when情况下 多个node是并行执行 + * 验证了默认参数情况下 when可以加载执行 + * **/ + @Test + public void testBaseConditionFlow1() { + LiteflowResponse response = flowExecutor.execute2Resp("chain1", "it's a base request"); + Assert.assertTrue(response.isSuccess()); + System.out.println(response.getSlot().printStep()); + } + + @Test + public void testBaseConditionFlow2() { + LiteflowResponse response = flowExecutor.execute2Resp("chain2", "it's a base request"); + Assert.assertTrue(ListUtil.toList("b==>j==>g==>f==>h","b==>j==>g==>h==>f", + "b==>j==>h==>g==>f","b==>j==>h==>f==>g", + "b==>j==>f==>h==>g","b==>j==>f==>g==>h" + ).contains(response.getSlot().printStep())); + } + + //相同group的并行组,会合并,并且errorResume根据第一个when来,这里第一个when配置了不抛错 + @Test + public void testBaseErrorResumeConditionFlow4() { + LiteflowResponse response = flowExecutor.execute2Resp("chain4", "it's a base request"); + //因为不记录错误,所以最终结果是true + Assert.assertTrue(response.isSuccess()); + //因为是并行组,所以即便抛错了,其他组件也会执行,i在流程里配置了2遍,i抛错,但是也执行了2遍,这里验证下 + Integer count = response.getSlot().getData("count"); + Assert.assertEquals(new Integer(2), count); + //因为配置了不抛错,所以response里的cause应该为null + Assert.assertNull(response.getCause()); + } + + //相同group的并行组,会合并,并且errorResume根据第一个when来,这里第一个when配置了会抛错 + @Test + public void testBaseErrorResumeConditionFlow5() throws Exception { + LiteflowResponse response = flowExecutor.execute2Resp("chain5", "it's a base request"); + //整个并行组是报错的,所以最终结果是false + Assert.assertFalse(response.isSuccess()); + //因为是并行组,所以即便抛错了,其他组件也会执行,i在流程里配置了2遍,i抛错,但是也执行了2遍,这里验证下 + Integer count = response.getSlot().getData("count"); + Assert.assertEquals(new Integer(2), count); + //因为第一个when配置了会报错,所以response里的cause里应该会有TestException + Assert.assertEquals(TestException.class, response.getCause().getClass()); + } + + //不同group的并行组,不会合并,第一个when的errorResume是false,会抛错,那第二个when就不会执行 + @Test + public void testBaseErrorResumeConditionFlow6() throws Exception { + LiteflowResponse response = flowExecutor.execute2Resp("chain6", "it's a base request"); + //第一个when会抛错,所以最终结果是false + Assert.assertFalse(response.isSuccess()); + //因为是不同组并行组,第一组的when里的i就抛错了,所以i就执行了1遍 + Integer count = response.getSlot().getData("count"); + Assert.assertEquals(new Integer(1), count); + //第一个when会报错,所以最终response的cause里应该会有TestException + Assert.assertEquals(TestException.class, response.getCause().getClass()); + } + + //不同group的并行组,不会合并,第一个when的errorResume是true,不会报错,那第二个when还会继续执行,但是第二个when的errorResume是false,所以第二个when会报错 + @Test + public void testBaseErrorResumeConditionFlow7() throws Exception { + LiteflowResponse response = flowExecutor.execute2Resp("chain7", "it's a base request"); + //第二个when会抛错,所以最终结果是false + Assert.assertFalse(response.isSuccess()); + // 传递了slotIndex,则set的size==2 + Integer count = response.getSlot().getData("count"); + Assert.assertEquals(new Integer(2), count); + //第一个when会报错,所以最终response的cause里应该会有TestException + Assert.assertEquals(TestException.class, response.getCause().getClass()); + } +} diff --git a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/condition/cmp1/ACmp.java b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/ACmp.java similarity index 83% rename from liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/condition/cmp1/ACmp.java rename to liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/ACmp.java index b1af3e08b..d1891c141 100644 --- a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/condition/cmp1/ACmp.java +++ b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/ACmp.java @@ -1,4 +1,4 @@ -package com.yomahub.liteflow.test.condition.cmp1; +package com.yomahub.liteflow.test.asyncNode.cmp; import com.yomahub.liteflow.core.NodeComponent; import org.springframework.stereotype.Component; diff --git a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/condition/cmp1/BCmp.java b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/BCmp.java similarity index 83% rename from liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/condition/cmp1/BCmp.java rename to liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/BCmp.java index 36cc501e6..88f97fc36 100644 --- a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/condition/cmp1/BCmp.java +++ b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/BCmp.java @@ -1,4 +1,4 @@ -package com.yomahub.liteflow.test.condition.cmp1; +package com.yomahub.liteflow.test.asyncNode.cmp; import com.yomahub.liteflow.core.NodeComponent; import org.springframework.stereotype.Component; diff --git a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/condition/cmp1/CCmp.java b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/CCmp.java similarity index 84% rename from liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/condition/cmp1/CCmp.java rename to liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/CCmp.java index 0d0630e3d..8d72549d0 100644 --- a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/condition/cmp1/CCmp.java +++ b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/CCmp.java @@ -1,4 +1,4 @@ -package com.yomahub.liteflow.test.condition.cmp1; +package com.yomahub.liteflow.test.asyncNode.cmp; import com.yomahub.liteflow.core.NodeComponent; import org.springframework.stereotype.Component; diff --git a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/condition/cmp1/DCmp.java b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/DCmp.java similarity index 85% rename from liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/condition/cmp1/DCmp.java rename to liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/DCmp.java index 40a607222..46624de0d 100644 --- a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/condition/cmp1/DCmp.java +++ b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/DCmp.java @@ -1,4 +1,4 @@ -package com.yomahub.liteflow.test.condition.cmp1; +package com.yomahub.liteflow.test.asyncNode.cmp; import com.yomahub.liteflow.core.NodeComponent; import org.springframework.stereotype.Component; diff --git a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/condition/cmp1/ECmp.java b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/ECmp.java similarity index 75% rename from liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/condition/cmp1/ECmp.java rename to liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/ECmp.java index 5b1e01042..19e9b27bf 100644 --- a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/condition/cmp1/ECmp.java +++ b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/ECmp.java @@ -1,6 +1,5 @@ -package com.yomahub.liteflow.test.condition.cmp1; +package com.yomahub.liteflow.test.asyncNode.cmp; -import com.yomahub.liteflow.core.NodeComponent; import com.yomahub.liteflow.core.NodeCondComponent; import org.springframework.stereotype.Component; diff --git a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/condition/cmp1/FCmp.java b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/FCmp.java similarity index 84% rename from liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/condition/cmp1/FCmp.java rename to liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/FCmp.java index db2b77c8c..eb9322c81 100644 --- a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/condition/cmp1/FCmp.java +++ b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/FCmp.java @@ -1,4 +1,4 @@ -package com.yomahub.liteflow.test.condition.cmp1; +package com.yomahub.liteflow.test.asyncNode.cmp; import com.yomahub.liteflow.core.NodeComponent; import org.springframework.stereotype.Component; diff --git a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/condition/cmp1/GCmp.java b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/GCmp.java similarity index 84% rename from liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/condition/cmp1/GCmp.java rename to liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/GCmp.java index 519f76c05..0660c076a 100644 --- a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/condition/cmp1/GCmp.java +++ b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/GCmp.java @@ -1,4 +1,4 @@ -package com.yomahub.liteflow.test.condition.cmp1; +package com.yomahub.liteflow.test.asyncNode.cmp; import com.yomahub.liteflow.core.NodeComponent; import org.springframework.stereotype.Component; diff --git a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/condition/cmp1/HCmp.java b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/HCmp.java similarity index 84% rename from liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/condition/cmp1/HCmp.java rename to liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/HCmp.java index 1d9202d2e..fef8fef49 100644 --- a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/condition/cmp1/HCmp.java +++ b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/HCmp.java @@ -1,4 +1,4 @@ -package com.yomahub.liteflow.test.condition.cmp1; +package com.yomahub.liteflow.test.asyncNode.cmp; import com.yomahub.liteflow.core.NodeComponent; import org.springframework.stereotype.Component; diff --git a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/ICmp.java b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/ICmp.java new file mode 100644 index 000000000..7526ad545 --- /dev/null +++ b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/ICmp.java @@ -0,0 +1,24 @@ +package com.yomahub.liteflow.test.asyncNode.cmp; + +import com.yomahub.liteflow.core.NodeComponent; +import com.yomahub.liteflow.entity.data.Slot; +import com.yomahub.liteflow.test.asyncNode.exception.TestException; +import org.springframework.stereotype.Component; + + +@Component("i") +public class ICmp extends NodeComponent { + + @Override + public void process() throws Exception { + Slot slot = this.getSlot(); + if (slot.hasData("count")){ + Integer count = slot.getData("count"); + slot.setData("count", ++count); + } else{ + slot.setData("count", 1); + } + System.out.println("Icomp executed! throw Exception!"); + throw new TestException(); + } +} diff --git a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/condition/cmp1/JCmp.java b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/JCmp.java similarity index 86% rename from liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/condition/cmp1/JCmp.java rename to liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/JCmp.java index 3a0e93bc0..41d95782d 100644 --- a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/condition/cmp1/JCmp.java +++ b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/JCmp.java @@ -1,4 +1,4 @@ -package com.yomahub.liteflow.test.condition.cmp1; +package com.yomahub.liteflow.test.asyncNode.cmp; import com.yomahub.liteflow.core.NodeCondComponent; import org.springframework.stereotype.Component; diff --git a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/exception/TestException.java b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/exception/TestException.java new file mode 100644 index 000000000..e786e9f86 --- /dev/null +++ b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/exception/TestException.java @@ -0,0 +1,4 @@ +package com.yomahub.liteflow.test.asyncNode.exception; + +public class TestException extends Exception{ +} diff --git a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/condition/BaseConditionFlowTest.java b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/condition/BaseConditionFlowTest.java deleted file mode 100644 index 3ea946704..000000000 --- a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/condition/BaseConditionFlowTest.java +++ /dev/null @@ -1,136 +0,0 @@ -package com.yomahub.liteflow.test.condition; - -import cn.hutool.core.collection.ListUtil; -import com.google.common.collect.Lists; -import com.yomahub.liteflow.core.FlowExecutor; -import com.yomahub.liteflow.entity.data.DefaultSlot; -import com.yomahub.liteflow.entity.data.LiteflowResponse; -import com.yomahub.liteflow.exception.WhenExecuteException; -import com.yomahub.liteflow.test.BaseTest; -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.springframework.boot.autoconfigure.EnableAutoConfiguration; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.context.annotation.ComponentScan; -import org.springframework.test.context.TestPropertySource; -import org.springframework.test.context.junit4.SpringRunner; -import org.springframework.util.ReflectionUtils; - -import javax.annotation.Resource; -import java.util.List; - -/** - * 测试隐式调用子流程 - * 单元测试 - * - * @author ssss - */ -@RunWith(SpringRunner.class) -@TestPropertySource(value = "classpath:/condition/application-condition.properties") -@SpringBootTest(classes = BaseConditionFlowTest.class) -@EnableAutoConfiguration -@ComponentScan({"com.yomahub.liteflow.test.condition.cmp1"}) -public class BaseConditionFlowTest extends BaseTest { - @Resource - private FlowExecutor flowExecutor; - - public static List RUN_TIME_SLOT = Lists.newArrayList(); - - /***** - * 标准chain 嵌套选择 嵌套子chain进行执行 - * 验证了when情况下 多个node是并行执行 - * 验证了默认参数情况下 when可以加载执行 - * **/ - @Test - public void testBaseConditionFlow1() { - LiteflowResponse response = flowExecutor.execute2Resp("chain1", "it's a base request"); - Assert.assertTrue(response.isSuccess()); - System.out.println(response.getSlot().printStep()); - } - - @Test - public void testBaseConditionFlow2() { - LiteflowResponse response = flowExecutor.execute2Resp("chain2", "it's a base request"); - Assert.assertTrue(ListUtil.toList("b==>j==>g==>f==>h","b==>j==>g==>h==>f", - "b==>j==>h==>g==>f","b==>j==>h==>f==>g", - "b==>j==>f==>h==>g","b==>j==>f==>g==>h" - ).contains(response.getSlot().printStep())); - } - - /***** - * 标准chain - * 验证多层when 相同组 会合并node - * 验证多层when errorResume 合并 并参照最上层 errorResume配置 - * **/ - @Test - public void testBaseErrorResumeConditionFlow4() { - RUN_TIME_SLOT.clear(); - LiteflowResponse response = flowExecutor.execute2Resp("chain4", "it's a base request"); - Assert.assertTrue(response.isSuccess()); - // 传递了slotIndex,则set的size==2 - Assert.assertEquals(2, RUN_TIME_SLOT.size()); - // set中第一次设置的requestId和response中的requestId一致 - Assert.assertTrue(RUN_TIME_SLOT.contains(response.getSlot().getRequestId())); - - } - - /***** - * 标准chain - * 验证多层when 相同组 会合并node - * 验证多层when errorResume 合并 并参照最上层 errorResume配置 - * **/ - @Test(expected = WhenExecuteException.class) - public void testBaseErrorResumeConditionFlow5() throws Exception { - RUN_TIME_SLOT.clear(); - LiteflowResponse response = flowExecutor.execute2Resp("chain5", "it's a base request"); - System.out.println(response.isSuccess()); - //System.out.println(response.getSlot().printStep()); - Assert.assertFalse(response.isSuccess()); - // 传递了slotIndex,则set的size==2 - Assert.assertEquals(2, RUN_TIME_SLOT.size()); - // set中第一次设置的requestId和response中的requestId一致 - //Assert.assertTrue(RUN_TIME_SLOT.contains(response.getSlot().getRequestId())); - ReflectionUtils.rethrowException(response.getCause()); - } - - /***** - * 标准chain - * 验证多层when 不同组 不会合并node - * 验证多层when errorResume 不同组 配置分开配置 - * **/ - @Test(expected = WhenExecuteException.class) - public void testBaseErrorResumeConditionFlow6() throws Exception { - RUN_TIME_SLOT.clear(); - LiteflowResponse response = flowExecutor.execute2Resp("chain6", "it's a base request"); - System.out.println(response.isSuccess()); - //System.out.println(response.getSlot().printStep()); - Assert.assertFalse(response.isSuccess()); - // 传递了slotIndex,则set的size==1 - Assert.assertEquals(1, RUN_TIME_SLOT.size()); - // set中第一次设置的requestId和response中的requestId一致 - //Assert.assertTrue(RUN_TIME_SLOT.contains(response.getSlot().getRequestId())); - ReflectionUtils.rethrowException(response.getCause()); - - } - - /***** - * 标准chain - * 验证多层when 不同组 不会合并node - * 验证多层when errorResume 不同组 配置分开配置 - * **/ - @Test(expected = WhenExecuteException.class) - public void testBaseErrorResumeConditionFlow7() throws Exception { - RUN_TIME_SLOT.clear(); - LiteflowResponse response = flowExecutor.execute2Resp("chain7", "it's a base request"); - System.out.println(response.isSuccess()); - //System.out.println(response.getSlot().printStep()); - Assert.assertFalse(response.isSuccess()); - // 传递了slotIndex,则set的size==2 - Assert.assertEquals(2, BaseConditionFlowTest.RUN_TIME_SLOT.size()); - // set中第一次设置的requestId和response中的requestId一致 - //Assert.assertTrue(RUN_TIME_SLOT.contains(response.getSlot().getRequestId())); - ReflectionUtils.rethrowException(response.getCause()); - - } -} diff --git a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/condition/cmp1/ICmp.java b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/condition/cmp1/ICmp.java deleted file mode 100644 index 05ff95470..000000000 --- a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/condition/cmp1/ICmp.java +++ /dev/null @@ -1,19 +0,0 @@ -package com.yomahub.liteflow.test.condition.cmp1; - -import com.yomahub.liteflow.core.NodeComponent; -import com.yomahub.liteflow.test.condition.BaseConditionFlowTest; -import org.springframework.stereotype.Component; - - - -@Component("i") -public class ICmp extends NodeComponent { - - @Override - public void process() throws Exception { - BaseConditionFlowTest.RUN_TIME_SLOT.add(this.getSlot().getRequestId()); - System.out.println(BaseConditionFlowTest.RUN_TIME_SLOT.size()); - System.out.println("Icomp executed! throw Exception!"); - throw new RuntimeException("主动抛出异常"); - } -} diff --git a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/useTTLInWhen/UseTTLInWhenSpringbootTest.java b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/useTTLInWhen/UseTTLInWhenSpringbootTest.java index 8a3df7d16..bfe0c7d8f 100644 --- a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/useTTLInWhen/UseTTLInWhenSpringbootTest.java +++ b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/useTTLInWhen/UseTTLInWhenSpringbootTest.java @@ -32,7 +32,7 @@ public class UseTTLInWhenSpringbootTest extends BaseTest { private FlowExecutor flowExecutor; @Test - public void testPrivateDelivery() throws Exception{ + public void testUseTTLInWhen() throws Exception{ LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg"); Assert.assertEquals("hello,b", response.getSlot().getData("b")); Assert.assertEquals("hello,c", response.getSlot().getData("c")); diff --git a/liteflow-testcase-springboot/src/test/resources/asyncNode/application.properties b/liteflow-testcase-springboot/src/test/resources/asyncNode/application.properties new file mode 100644 index 000000000..db0c76e72 --- /dev/null +++ b/liteflow-testcase-springboot/src/test/resources/asyncNode/application.properties @@ -0,0 +1 @@ +liteflow.rule-source=asyncNode/flow.xml \ No newline at end of file diff --git a/liteflow-testcase-springboot/src/test/resources/condition/flow.xml b/liteflow-testcase-springboot/src/test/resources/asyncNode/flow.xml similarity index 98% rename from liteflow-testcase-springboot/src/test/resources/condition/flow.xml rename to liteflow-testcase-springboot/src/test/resources/asyncNode/flow.xml index 83f5738b5..18924f8a0 100644 --- a/liteflow-testcase-springboot/src/test/resources/condition/flow.xml +++ b/liteflow-testcase-springboot/src/test/resources/asyncNode/flow.xml @@ -37,6 +37,4 @@ - - \ No newline at end of file diff --git a/liteflow-testcase-springboot/src/test/resources/condition/application-condition.properties b/liteflow-testcase-springboot/src/test/resources/condition/application-condition.properties deleted file mode 100644 index 15371f39d..000000000 --- a/liteflow-testcase-springboot/src/test/resources/condition/application-condition.properties +++ /dev/null @@ -1 +0,0 @@ -liteflow.rule-source=condition/flow.xml \ No newline at end of file diff --git a/liteflow-testcase-springboot/src/test/resources/condition/application-xml.properties b/liteflow-testcase-springboot/src/test/resources/condition/application-xml.properties deleted file mode 100644 index 15371f39d..000000000 --- a/liteflow-testcase-springboot/src/test/resources/condition/application-xml.properties +++ /dev/null @@ -1 +0,0 @@ -liteflow.rule-source=condition/flow.xml \ No newline at end of file