diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/WhenCondition.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/WhenCondition.java index ba43fb123..ae4d2f17e 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/WhenCondition.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/WhenCondition.java @@ -7,6 +7,7 @@ */ package com.yomahub.liteflow.flow.element.condition; +import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.yomahub.liteflow.common.LocalDefaultFlowConstant; import com.yomahub.liteflow.enums.ConditionTypeEnum; @@ -70,7 +71,7 @@ public class WhenCondition extends Condition { // 此方法其实只会初始化一次Executor,不会每次都会初始化。Executor是唯一的 ExecutorService parallelExecutor = ExecutorHelper.loadInstance() - .buildWhenExecutor(this.getThreadExecutorClass()); + .buildWhenExecutor(this.getThreadExecutorClass()); // 获得liteflow的参数 LiteflowConfig liteflowConfig = LiteflowConfigGetter.get(); @@ -85,24 +86,34 @@ public class WhenCondition extends Condition { // 3.根据condition.getNodeList()的集合进行流处理,用map进行把executable对象转换成List> // 4.在转的过程中,套入CompletableFutureTimeout方法进行超时判断,如果超时则用WhenFutureObj.timeOut返回超时的对象 // 5.第2个参数是主要的本体CompletableFuture,传入了ParallelSupplier和线程池对象 + Integer whenMaxWaitTime; + TimeUnit whenMaxWaitTimeUnit; + + if (ObjectUtil.isNotNull(liteflowConfig.getWhenMaxWaitSeconds())){ + whenMaxWaitTime = liteflowConfig.getWhenMaxWaitSeconds(); + whenMaxWaitTimeUnit = TimeUnit.SECONDS; + }else{ + whenMaxWaitTime = liteflowConfig.getWhenMaxWaitTime(); + whenMaxWaitTimeUnit = liteflowConfig.getWhenMaxWaitTimeUnit(); + } + List> completableFutureList = this.getExecutableList() - .stream() - .filter(executable -> !(executable instanceof PreCondition) && !(executable instanceof FinallyCondition)) - .filter(executable -> { - try { - return executable.isAccess(slotIndex); - } - catch (Exception e) { - LOG.error("there was an error when executing the when component isAccess", e); - return false; - } - }) - .map(executable -> CompletableFutureTimeout.completeOnTimeout( - WhenFutureObj.timeOut(executable.getId()), - CompletableFuture.supplyAsync(new ParallelSupplier(executable, currChainName, slotIndex), - parallelExecutor), - liteflowConfig.getWhenMaxWaitSeconds(), TimeUnit.SECONDS)) - .collect(Collectors.toList()); + .stream() + .filter(executable -> !(executable instanceof PreCondition) && !(executable instanceof FinallyCondition)) + .filter(executable -> { + try { + return executable.isAccess(slotIndex); + } catch (Exception e) { + LOG.error("there was an error when executing the when component isAccess", e); + return false; + } + }) + .map(executable -> CompletableFutureTimeout.completeOnTimeout( + WhenFutureObj.timeOut(executable.getId()), + CompletableFuture.supplyAsync(new ParallelSupplier(executable, currChainName, slotIndex), + parallelExecutor), + whenMaxWaitTime, whenMaxWaitTimeUnit)) + .collect(Collectors.toList()); CompletableFuture resultCompletableFuture; @@ -112,19 +123,17 @@ public class WhenCondition extends Condition { if (this.isAny()) { // 把这些CompletableFuture通过anyOf合成一个CompletableFuture resultCompletableFuture = CompletableFuture - .anyOf(completableFutureList.toArray(new CompletableFuture[] {})); - } - else { + .anyOf(completableFutureList.toArray(new CompletableFuture[] {})); + } else { // 把这些CompletableFuture通过allOf合成一个CompletableFuture resultCompletableFuture = CompletableFuture - .allOf(completableFutureList.toArray(new CompletableFuture[] {})); + .allOf(completableFutureList.toArray(new CompletableFuture[] {})); } try { // 进行执行,这句执行完后,就意味着所有的任务要么执行完毕,要么超时返回 resultCompletableFuture.get(); - } - catch (InterruptedException | ExecutionException e) { + } catch (InterruptedException | ExecutionException e) { LOG.error("there was an error when executing the CompletableFuture", e); interrupted[0] = true; } @@ -137,16 +146,14 @@ public class WhenCondition extends Condition { // 过滤出已经完成的,没完成的就直接终止 if (f.isDone()) { return true; - } - else { + } else { f.cancel(true); return false; } }).map(f -> { try { return f.get(); - } - catch (InterruptedException | ExecutionException e) { + } catch (InterruptedException | ExecutionException e) { interrupted[0] = true; return null; } @@ -155,8 +162,8 @@ public class WhenCondition extends Condition { // 判断超时,上面已经拿到了所有已经完成的CompletableFuture // 那我们只要过滤出超时的CompletableFuture List timeOutWhenFutureObjList = allCompletableWhenFutureObjList.stream() - .filter(WhenFutureObj::isTimeout) - .collect(Collectors.toList()); + .filter(WhenFutureObj::isTimeout) + .collect(Collectors.toList()); // 输出超时信息 timeOutWhenFutureObjList.forEach(whenFutureObj -> LOG.warn( @@ -167,7 +174,7 @@ public class WhenCondition extends Condition { if (!this.isIgnoreError()) { if (interrupted[0]) { throw new WhenExecuteException(StrUtil - .format("requestId [{}] when execute interrupted. errorResume [false].", slot.getRequestId())); + .format("requestId [{}] when execute interrupted. errorResume [false].", slot.getRequestId())); } // 循环判断CompletableFuture的返回值,如果异步执行失败,则抛出相应的业务异常 @@ -178,8 +185,7 @@ public class WhenCondition extends Condition { throw whenFutureObj.getEx(); } } - } - else if (interrupted[0]) { + } else if (interrupted[0]) { // 这里由于配置了ignoreError,所以只打印warn日志 LOG.warn("requestId [{}] executing when condition timeout , but ignore with errorResume.", slot.getRequestId()); diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/WhenFutureObj.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/WhenFutureObj.java index ec5abd2ff..c0953b891 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/WhenFutureObj.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/WhenFutureObj.java @@ -43,8 +43,7 @@ public class WhenFutureObj { result.setTimeout(true); result.setExecutorName(executorName); result.setEx(new WhenTimeoutException( - StrUtil.format("Timed out when executing the component[{}],when-max-timeout-seconds config is:{}(s)", - executorName, LiteflowConfigGetter.get().getWhenMaxWaitSeconds()))); + StrUtil.format("Timed out when executing the component[{}]",executorName))); return result; } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/property/LiteflowConfig.java b/liteflow-core/src/main/java/com/yomahub/liteflow/property/LiteflowConfig.java index b75f05566..8bb229ca3 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/property/LiteflowConfig.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/property/LiteflowConfig.java @@ -12,6 +12,7 @@ import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import java.util.Map; +import java.util.concurrent.TimeUnit; /** * liteflow的配置实体类 这个类中的属性为什么不用基本类型,而用包装类型呢 @@ -42,8 +43,13 @@ public class LiteflowConfig { private String threadExecutorClass; // 异步线程最大等待秒数 + @Deprecated private Integer whenMaxWaitSeconds; + private Integer whenMaxWaitTime; + + private TimeUnit whenMaxWaitTimeUnit; + // 是否打印监控log private Boolean enableLog; @@ -139,15 +145,15 @@ public class LiteflowConfig { this.slotSize = slotSize; } + @Deprecated public Integer getWhenMaxWaitSeconds() { - if (ObjectUtil.isNull(whenMaxWaitSeconds)) { - return 15; - } - else { - return whenMaxWaitSeconds; + if (whenMaxWaitSeconds == null || whenMaxWaitSeconds == 0){ + return null; } + return whenMaxWaitSeconds; } + @Deprecated public void setWhenMaxWaitSeconds(Integer whenMaxWaitSeconds) { this.whenMaxWaitSeconds = whenMaxWaitSeconds; } @@ -382,4 +388,25 @@ public class LiteflowConfig { this.ruleSourceExtDataMap = ruleSourceExtDataMap; } + public Integer getWhenMaxWaitTime() { + if (ObjectUtil.isNull(whenMaxWaitTime)){ + return 15000; + } + return whenMaxWaitTime; + } + + public void setWhenMaxWaitTime(Integer whenMaxWaitTime) { + this.whenMaxWaitTime = whenMaxWaitTime; + } + + public TimeUnit getWhenMaxWaitTimeUnit() { + if (ObjectUtil.isNull(whenMaxWaitTimeUnit)){ + return TimeUnit.MILLISECONDS; + } + return whenMaxWaitTimeUnit; + } + + public void setWhenMaxWaitTimeUnit(TimeUnit whenMaxWaitTimeUnit) { + this.whenMaxWaitTimeUnit = whenMaxWaitTimeUnit; + } } diff --git a/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowProperty.java b/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowProperty.java index dcf36ff87..173f673f4 100644 --- a/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowProperty.java +++ b/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowProperty.java @@ -3,6 +3,7 @@ package com.yomahub.liteflow.springboot; import org.springframework.boot.context.properties.ConfigurationProperties; import java.util.Map; +import java.util.concurrent.TimeUnit; /** * 执行流程主要的参数类 @@ -37,8 +38,13 @@ public class LiteflowProperty { private String threadExecutorClass; // 异步线程最大等待描述 + @Deprecated private int whenMaxWaitSeconds; + private int whenMaxWaitTime; + + private TimeUnit whenMaxWaitTimeUnit; + // 异步线程池最大线程数 private int whenMaxWorkers; @@ -106,10 +112,12 @@ public class LiteflowProperty { this.slotSize = slotSize; } + @Deprecated public int getWhenMaxWaitSeconds() { return whenMaxWaitSeconds; } + @Deprecated public void setWhenMaxWaitSeconds(int whenMaxWaitSeconds) { this.whenMaxWaitSeconds = whenMaxWaitSeconds; } @@ -234,4 +242,19 @@ public class LiteflowProperty { this.ruleSourceExtDataMap = ruleSourceExtDataMap; } + public int getWhenMaxWaitTime() { + return whenMaxWaitTime; + } + + public void setWhenMaxWaitTime(int whenMaxWaitTime) { + this.whenMaxWaitTime = whenMaxWaitTime; + } + + public TimeUnit getWhenMaxWaitTimeUnit() { + return whenMaxWaitTimeUnit; + } + + public void setWhenMaxWaitTimeUnit(TimeUnit whenMaxWaitTimeUnit) { + this.whenMaxWaitTimeUnit = whenMaxWaitTimeUnit; + } } diff --git a/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/config/LiteflowPropertyAutoConfiguration.java b/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/config/LiteflowPropertyAutoConfiguration.java index a5b0d1424..0571c662e 100644 --- a/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/config/LiteflowPropertyAutoConfiguration.java +++ b/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/config/LiteflowPropertyAutoConfiguration.java @@ -28,6 +28,8 @@ public class LiteflowPropertyAutoConfiguration { liteflowConfig.setSlotSize(property.getSlotSize()); liteflowConfig.setThreadExecutorClass(property.getThreadExecutorClass()); liteflowConfig.setWhenMaxWaitSeconds(property.getWhenMaxWaitSeconds()); + liteflowConfig.setWhenMaxWaitTime(property.getWhenMaxWaitTime()); + liteflowConfig.setWhenMaxWaitTimeUnit(property.getWhenMaxWaitTimeUnit()); liteflowConfig.setEnableLog(liteflowMonitorProperty.isEnableLog()); liteflowConfig.setQueueLimit(liteflowMonitorProperty.getQueueLimit()); liteflowConfig.setDelay(liteflowMonitorProperty.getDelay()); diff --git a/liteflow-spring-boot-starter/src/main/resources/META-INF/additional-spring-configuration-metadata.json b/liteflow-spring-boot-starter/src/main/resources/META-INF/additional-spring-configuration-metadata.json index 5258f753d..5aadf9e05 100644 --- a/liteflow-spring-boot-starter/src/main/resources/META-INF/additional-spring-configuration-metadata.json +++ b/liteflow-spring-boot-starter/src/main/resources/META-INF/additional-spring-configuration-metadata.json @@ -73,7 +73,21 @@ "type": "java.lang.Integer", "description": "Set the async thread max wait seconds on \" when \" mode.", "sourceType": "com.yomahub.liteflow.springboot.LiteflowProperty", - "defaultValue": 15 + "defaultValue": 0 + }, + { + "name": "liteflow.when-max-wait-time", + "type": "java.lang.Integer", + "description": "Set the async thread max wait time on \" when \" mode.", + "sourceType": "com.yomahub.liteflow.springboot.LiteflowProperty", + "defaultValue": 15000 + }, + { + "name": "liteflow.when-max-wait-time-unit", + "type": "java.util.concurrent.TimeUnit", + "description": "Set the async thread max wait time unit on \" when \" mode.", + "sourceType": "com.yomahub.liteflow.springboot.LiteflowProperty", + "defaultValue": "MILLISECONDS" }, { "name": "liteflow.when-max-workers", diff --git a/liteflow-spring-boot-starter/src/main/resources/META-INF/liteflow-default.properties b/liteflow-spring-boot-starter/src/main/resources/META-INF/liteflow-default.properties index b3f17ce32..6ebe3d3be 100644 --- a/liteflow-spring-boot-starter/src/main/resources/META-INF/liteflow-default.properties +++ b/liteflow-spring-boot-starter/src/main/resources/META-INF/liteflow-default.properties @@ -5,7 +5,8 @@ liteflow.main-executor-works=64 liteflow.main-executor-class=com.yomahub.liteflow.thread.LiteFlowDefaultMainExecutorBuilder liteflow.request-id-generator-class=com.yomahub.liteflow.flow.id.DefaultRequestIdGenerator liteflow.thread-executor-class=com.yomahub.liteflow.thread.LiteFlowDefaultWhenExecutorBuilder -liteflow.when-max-wait-seconds=15 +liteflow.when-max-wait-time=15000 +liteflow.when-max-wait-time-unit=MILLISECONDS liteflow.when-max-workers=16 liteflow.when-queue-limit=512 liteflow.parse-on-start=true diff --git a/liteflow-testcase-el/liteflow-testcase-el-nospring/src/test/java/com/yomahub/liteflow/test/config/LiteflowConfigTest1.java b/liteflow-testcase-el/liteflow-testcase-el-nospring/src/test/java/com/yomahub/liteflow/test/config/LiteflowConfigTest1.java index f5eac6ee8..6c7ba197c 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-nospring/src/test/java/com/yomahub/liteflow/test/config/LiteflowConfigTest1.java +++ b/liteflow-testcase-el/liteflow-testcase-el-nospring/src/test/java/com/yomahub/liteflow/test/config/LiteflowConfigTest1.java @@ -10,6 +10,8 @@ import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; +import java.util.concurrent.TimeUnit; + /** * 非spring环境下参数单元测试 * @@ -33,7 +35,8 @@ public class LiteflowConfigTest1 extends BaseTest { LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg"); Assert.assertTrue(response.isSuccess()); Assert.assertEquals("config/flow.el.xml", config.getRuleSource()); - Assert.assertEquals(15, config.getWhenMaxWaitSeconds().intValue()); + Assert.assertEquals(15000, config.getWhenMaxWaitTime().intValue()); + Assert.assertEquals(TimeUnit.MILLISECONDS, config.getWhenMaxWaitTimeUnit()); Assert.assertEquals(200, config.getQueueLimit().intValue()); Assert.assertEquals(300000L, config.getDelay().longValue()); Assert.assertEquals(300000L, config.getPeriod().longValue()); diff --git a/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/config/LiteflowConfigELSpringTest.java b/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/config/LiteflowConfigELSpringTest.java index e3647352c..3d5083c5d 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/config/LiteflowConfigELSpringTest.java +++ b/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/config/LiteflowConfigELSpringTest.java @@ -13,6 +13,7 @@ import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringRunner; import javax.annotation.Resource; +import java.util.concurrent.TimeUnit; /** * spring环境下参数单元测试 @@ -36,7 +37,8 @@ public class LiteflowConfigELSpringTest extends BaseTest { LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg"); Assert.assertTrue(response.isSuccess()); Assert.assertEquals("config/flow.el.json", config.getRuleSource()); - Assert.assertEquals(15, config.getWhenMaxWaitSeconds().intValue()); + Assert.assertEquals(15000, config.getWhenMaxWaitTime().intValue()); + Assert.assertEquals(TimeUnit.MILLISECONDS, config.getWhenMaxWaitTimeUnit()); Assert.assertEquals(200, config.getQueueLimit().intValue()); Assert.assertEquals(300000L, config.getDelay().longValue()); Assert.assertEquals(300000L, config.getPeriod().longValue());