feature #I4ZVCL 执行器response增加future的支持

This commit is contained in:
bryan31
2022-04-27 23:28:30 +08:00
parent 2acec6c7e2
commit 653053c8b6
30 changed files with 547 additions and 37 deletions

View File

@@ -26,6 +26,7 @@ import com.yomahub.liteflow.parser.*;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.property.LiteflowConfigGetter;
import com.yomahub.liteflow.spi.holder.ContextAwareHolder;
import com.yomahub.liteflow.thread.ExecutorHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,6 +34,8 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
/**
* 流程规则主要执行器类
@@ -58,7 +61,9 @@ public class FlowExecutor {
private LiteflowConfig liteflowConfig;
public FlowExecutor(){
public FlowExecutor() {
//设置FlowExecutor的Holder虽然大部分地方都可以通过Spring上下文获取到但放入Holder还是为了某些地方能方便的取到
FlowExecutorHolder.setHolder(this);
//初始化DataBus
DataBus.init();
}
@@ -67,6 +72,8 @@ public class FlowExecutor {
this.liteflowConfig = liteflowConfig;
//把liteFlowConfig设到LiteFlowGetter中去
LiteflowConfigGetter.setLiteflowConfig(liteflowConfig);
//设置FlowExecutor的Holder虽然大部分地方都可以通过Spring上下文获取到但放入Holder还是为了某些地方能方便的取到
FlowExecutorHolder.setHolder(this);
if (liteflowConfig.isParseOnStart()){
this.init();
}
@@ -327,6 +334,12 @@ public class FlowExecutor {
return this.execute2Resp(chainId, param, slotClazz, null, false);
}
public <T extends Slot> Future<LiteflowResponse<T>> execute2Future(String chainId, Object param, Class<T> slotClazz) {
return ExecutorHelper.loadInstance().buildMainExecutor(liteflowConfig.getMainExecutorClass()).submit(()
-> FlowExecutorHolder.loadInstance().execute2Resp(chainId, param, slotClazz, null, false));
}
public <T extends Slot> LiteflowResponse<T> execute2Resp(String chainId, Object param, Class<T> slotClazz, Integer slotIndex,
boolean isInnerChain) {
LiteflowResponse<T> response = new LiteflowResponse<>();
@@ -427,6 +440,7 @@ public class FlowExecutor {
public void setLiteflowConfig(LiteflowConfig liteflowConfig) {
this.liteflowConfig = liteflowConfig;
//把liteFlowConfig设到LiteFlowGetter中去
LiteflowConfigGetter.setLiteflowConfig(liteflowConfig);
}
}

View File

@@ -22,6 +22,10 @@ public class FlowExecutorHolder {
return flowExecutor;
}
public static void setHolder(FlowExecutor flowExecutor){
FlowExecutorHolder.flowExecutor = flowExecutor;
}
public static void clean(){
flowExecutor = null;
}

View File

@@ -133,13 +133,13 @@ public class Chain implements Executable {
Slot slot = DataBus.getSlot(slotIndex);
//此方法其实只会初始化一次Executor不会每次都会初始化。Executor是唯一的
ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildExecutor(condition.getThreadExecutorClass());
ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildWhenExecutor(condition.getThreadExecutorClass());
//获得liteflow的参数
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
//定义是否中断参数
//这里为什么要定义成数组呢因为后面lumbda要用到根据final不能修改引用的原则这里用了数组对象
//这里为什么要定义成数组呢因为后面lambda要用到根据final不能修改引用的原则这里用了数组对象
final boolean[] interrupted = {false};
//这里主要是做了封装CompletableFuture对象用lumbda表达式做了很多事情这句代码要仔细理清

View File

@@ -76,6 +76,12 @@ public class LiteflowConfig {
//是否打印liteflow banner
private Boolean printBanner;
//FlowExecutor的execute2Future的线程数
private Integer mainExecutorWorks;
//FlowExecutor的execute2Future的自定义线程池
private String mainExecutorClass;
public Boolean getEnable() {
if (ObjectUtil.isNull(enable)) {
return true;
@@ -254,7 +260,7 @@ public class LiteflowConfig {
public String getThreadExecutorClass() {
if (StrUtil.isBlank(threadExecutorClass)){
return "com.yomahub.liteflow.thread.LiteFlowDefaultExecutorBuilder";
return "com.yomahub.liteflow.thread.LiteFlowDefaultWhenExecutorBuilder";
}else{
return threadExecutorClass;
}
@@ -275,4 +281,28 @@ public class LiteflowConfig {
public void setNodeExecutorClass(String nodeExecutorClass) {
this.nodeExecutorClass = nodeExecutorClass;
}
public Integer getMainExecutorWorks() {
if (ObjectUtil.isNull(mainExecutorWorks)){
return 64;
}else{
return mainExecutorWorks;
}
}
public void setMainExecutorWorks(Integer mainExecutorWorks) {
this.mainExecutorWorks = mainExecutorWorks;
}
public String getMainExecutorClass() {
if (StrUtil.isBlank(mainExecutorClass)){
return "com.yomahub.liteflow.thread.LiteFlowDefaultMainExecutorBuilder";
}else{
return mainExecutorClass;
}
}
public void setMainExecutorClass(String mainExecutorClass) {
this.mainExecutorClass = mainExecutorClass;
}
}

View File

@@ -9,6 +9,7 @@
package com.yomahub.liteflow.thread;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.google.common.collect.Maps;
import com.yomahub.liteflow.exception.ThreadExecutorServiceCreateException;
@@ -87,46 +88,49 @@ public class ExecutorHelper {
}
}
//构建全局默认线程池
public ExecutorService buildExecutor() {
//构建默认when线程池
public ExecutorService buildWhenExecutor() {
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
if (!executorServiceMap.containsKey(liteflowConfig.getThreadExecutorClass())) {
ExecutorService executorService = getExecutorBuilder(liteflowConfig.getThreadExecutorClass()).buildExecutor();
executorServiceMap.put(liteflowConfig.getThreadExecutorClass(), executorService);
}
return executorServiceMap.get(liteflowConfig.getThreadExecutorClass());
return buildWhenExecutor(liteflowConfig.getThreadExecutorClass());
}
//构建线程池执行器 - 支持多个when公用一个线程池
public ExecutorService buildExecutor(String threadExecutorClass) {
if (StrUtil.isBlank(threadExecutorClass)) {
return buildExecutor();
//构建when线程池 - 支持多个when公用一个线程池
public ExecutorService buildWhenExecutor(String clazz) {
if (StrUtil.isBlank(clazz)) {
return buildWhenExecutor();
}
ExecutorService executorServiceFromCache = executorServiceMap.get(threadExecutorClass);
if (executorServiceFromCache != null) {
return executorServiceFromCache;
} else {
ExecutorService executorService = getExecutorBuilder(threadExecutorClass).buildExecutor();
executorServiceMap.put(threadExecutorClass, executorService);
return executorService;
return getExecutorService(clazz);
}
//构建默认的FlowExecutor线程池用于execute2Future方法
public ExecutorService buildMainExecutor(){
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
return buildMainExecutor(liteflowConfig.getMainExecutorClass());
}
public ExecutorService buildMainExecutor(String clazz){
if (StrUtil.isBlank(clazz)) {
return buildMainExecutor();
}
return getExecutorService(clazz);
}
/**
* <p>
* 根据线程执行构建者Class类名获取ExecutorBuilder实例
* </p>
*
* @param threadExecutorClass 线程执行class全量名
* @return com.yomahub.liteflow.thread.ExecutorBuilder
* @author sikadai
* @date 2022/1/21 23:04
* 根据线程执行构建者Class类名获取ExecutorService实例
*/
private ExecutorBuilder getExecutorBuilder(String threadExecutorClass) {
try {
Class<ExecutorBuilder> executorClass = (Class<ExecutorBuilder>) Class.forName(threadExecutorClass);
return ContextAwareHolder.loadContextAware().registerBean(executorClass);
} catch (Exception e) {
private ExecutorService getExecutorService(String clazz) {
try{
ExecutorService executorServiceFromCache = executorServiceMap.get(clazz);
if (ObjectUtil.isNotNull(executorServiceFromCache)) {
return executorServiceFromCache;
} else {
Class<ExecutorBuilder> executorClass = (Class<ExecutorBuilder>) Class.forName(clazz);
ExecutorBuilder executorBuilder = ContextAwareHolder.loadContextAware().registerBean(executorClass);
ExecutorService executorService = executorBuilder.buildExecutor();
executorServiceMap.put(clazz, executorService);
return executorService;
}
}catch (Exception e){
LOG.error(e.getMessage(), e);
throw new ThreadExecutorServiceCreateException(e.getMessage());
}

View File

@@ -0,0 +1,23 @@
package com.yomahub.liteflow.thread;
import cn.hutool.core.util.ObjectUtil;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.property.LiteflowConfigGetter;
import java.util.concurrent.ExecutorService;
public class LiteFlowDefaultMainExecutorBuilder implements ExecutorBuilder{
@Override
public ExecutorService buildExecutor() {
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
//只有在非spring的场景下liteflowConfig才会为null
if (ObjectUtil.isNull(liteflowConfig)){
liteflowConfig = new LiteflowConfig();
}
return buildDefaultExecutor(
liteflowConfig.getMainExecutorWorks(),
liteflowConfig.getMainExecutorWorks(),
200,
"lf-main-thead-");
}
}

View File

@@ -11,7 +11,7 @@ import java.util.concurrent.*;
* @author Bryan.Zhang
* @since 2.6.6
*/
public class LiteFlowDefaultExecutorBuilder implements ExecutorBuilder{
public class LiteFlowDefaultWhenExecutorBuilder implements ExecutorBuilder{
@Override
public ExecutorService buildExecutor() {
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();