Merge branch 'dev' of https://gitee.com/dromara/liteFlow into issue/#I8MW6Q-2

 Conflicts:
	liteflow-core/src/main/java/com/yomahub/liteflow/script/ScriptExecutor.java
This commit is contained in:
DaleLee
2024-02-02 21:42:19 +08:00
109 changed files with 2964 additions and 121 deletions

View File

@@ -12,6 +12,12 @@ import java.lang.annotation.*;
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Deprecated
/**
* This class has been deprecated due to its only component retry function. Please use the retry method in the EL expression.
* @Deprecated
* @see # retry(int retryTimes) e.g. THEN( a, b.retry(3) ); WHEN( a, b ).retry(3);
*/
public @interface LiteflowRetry {
@LFAliasFor("retry")

View File

@@ -3,6 +3,7 @@ package com.yomahub.liteflow.annotation.util;
import cn.hutool.core.annotation.AnnotationUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.ReflectUtil;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.annotation.LFAliasFor;
import java.lang.annotation.Annotation;
@@ -10,15 +11,25 @@ import java.lang.reflect.AnnotatedElement;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 注解工具类
* 此工具类带缓存
*
* @author Bryan.Zhang
*/
public class AnnoUtil {
private static Map<String, Annotation> annoCacheMap = new ConcurrentHashMap<>();
public static <A extends Annotation> A getAnnotation(AnnotatedElement annotatedElement, Class<A> annotationType) {
String cacheKey = StrUtil.format("{}-{}", annotatedElement, annotationType.getSimpleName());
if (annoCacheMap.containsKey(cacheKey)){
return (A)annoCacheMap.get(cacheKey);
}
A annotation = AnnotationUtil.getAnnotation(annotatedElement, annotationType);
if (ObjectUtil.isNull(annotation)) {
return null;
@@ -42,6 +53,8 @@ public class AnnoUtil {
}
});
annoCacheMap.put(cacheKey, annotation);
return annotation;
}
@@ -53,5 +66,4 @@ public class AnnoUtil {
return null;
}
}
}

View File

@@ -92,6 +92,7 @@ public class LiteFlowChainELBuilder {
EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.MAX_WAIT_SECONDS, Object.class, new MaxWaitSecondsOperator());
EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.MAX_WAIT_MILLISECONDS, Object.class, new MaxWaitMillisecondsOperator());
EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.PARALLEL, Object.class, new ParallelOperator());
EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.RETRY, Object.class, new RetryOperator());
}
public static LiteFlowChainELBuilder createChain() {

View File

@@ -0,0 +1,35 @@
package com.yomahub.liteflow.builder.el.operator;
import com.yomahub.liteflow.builder.el.operator.base.BaseOperator;
import com.yomahub.liteflow.builder.el.operator.base.OperatorHelper;
import com.yomahub.liteflow.flow.element.Condition;
import com.yomahub.liteflow.flow.element.Executable;
import com.yomahub.liteflow.flow.element.condition.RetryCondition;
/**
*
* @author Rain
* @since 2.12.0
*
*/
public class RetryOperator extends BaseOperator<Condition> {
@Override
public Condition build(Object[] objects) throws Exception {
OperatorHelper.checkObjectSizeGteTwo(objects);
Executable executable = OperatorHelper.convert(objects[0], Executable.class);
Integer retryTimes = OperatorHelper.convert(objects[1], Integer.class);
RetryCondition retryCondition = new RetryCondition();
retryCondition.addExecutable(executable);
retryCondition.setRetryTimes(retryTimes);
if(objects.length > 2) {
Class[] forExceptions = new Class[objects.length - 2];
for(int i = 2; i < objects.length; i ++) {
String className = OperatorHelper.convert(objects[i], String.class);
forExceptions[i - 2] = Class.forName(className);
}
retryCondition.setRetryForExceptions(forExceptions);
}
return retryCondition;
}
}

View File

@@ -94,4 +94,6 @@ public interface ChainConstant {
String EXTENDS = "extends";
String RETRY = "retry";
}

View File

@@ -280,6 +280,10 @@ public abstract class NodeComponent {
return this.getSlot().getContextBean(contextBeanClazz);
}
public <T> T getContextBean(String contextName) {
return this.getSlot().getContextBean(contextName);
}
public String getNodeId() {
return nodeId;
}

View File

@@ -101,6 +101,10 @@ public class LiteflowResponse {
return this.getSlot().getContextBean(contextBeanClazz);
}
public <T> T getContextBean(String contextName) {
return this.getSlot().getContextBean(contextName);
}
public Map<String, List<CmpStep>> getExecuteSteps() {
Map<String, List<CmpStep>> map = new LinkedHashMap<>();
this.getSlot().getExecuteSteps().forEach(cmpStep -> {

View File

@@ -21,8 +21,6 @@ import com.yomahub.liteflow.flow.executor.NodeExecutor;
import com.yomahub.liteflow.flow.executor.NodeExecutorHelper;
import com.yomahub.liteflow.log.LFLog;
import com.yomahub.liteflow.log.LFLoggerManager;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.property.LiteflowConfigGetter;
import com.yomahub.liteflow.slot.DataBus;
import com.yomahub.liteflow.slot.Slot;
@@ -30,6 +28,7 @@ import com.yomahub.liteflow.slot.Slot;
* Node节点实现可执行器 Node节点并不是单例的每构建一次都会copy出一个新的实例
*
* @author Bryan.Zhang
* @author luo yi
*/
public class Node implements Executable, Cloneable, Rollbackable{
@@ -57,6 +56,9 @@ public class Node implements Executable, Cloneable, Rollbackable{
private String currChainId;
// node 的 isAccess 结果,主要用于 WhenCondition 的提前 isAccess 判断,避免 isAccess 方法重复执行
private TransmittableThreadLocal<Boolean> accessResult = new TransmittableThreadLocal<>();
private TransmittableThreadLocal<Integer> loopIndexTL = new TransmittableThreadLocal<>();
private TransmittableThreadLocal<Object> currLoopObject = new TransmittableThreadLocal<>();
@@ -125,16 +127,13 @@ public class Node implements Executable, Cloneable, Rollbackable{
throw new FlowSystemException("there is no instance for node id " + id);
}
Slot slot = DataBus.getSlot(slotIndex);
try {
// 把线程属性赋值给组件对象
instance.setSlotIndex(slotIndex);
instance.setRefNode(this);
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
// 判断是否可执行所以isAccess经常作为一个组件进入的实际判断要素用作检查slot里的参数的完备性
if (instance.isAccess()) {
if (getAccessResult() || instance.isAccess()) {
LOG.info("[O]start component[{}] execution", instance.getDisplayName());
// 这里开始进行重试的逻辑和主逻辑的运行
@@ -142,8 +141,7 @@ public class Node implements Executable, Cloneable, Rollbackable{
.buildNodeExecutor(instance.getNodeExecutorClass());
// 调用节点执行器进行执行
nodeExecutor.execute(instance);
}
else {
} else {
LOG.info("[X]skip component[{}] execution", instance.getDisplayName());
}
// 如果组件覆盖了isEnd方法或者在在逻辑中主要调用了setEnd(true)的话,流程就会立马结束
@@ -178,6 +176,7 @@ public class Node implements Executable, Cloneable, Rollbackable{
instance.removeIsEnd();
instance.removeRefNode();
removeLoopIndex();
removeAccessResult();
}
}
@@ -253,6 +252,19 @@ public class Node implements Executable, Cloneable, Rollbackable{
return currChainId;
}
public boolean getAccessResult() {
Boolean result = this.accessResult.get();
return result == null ? false : result;
}
public void setAccessResult(boolean accessResult) {
this.accessResult.set(accessResult);
}
public void removeAccessResult() {
this.accessResult.remove();
}
public void setLoopIndex(int index) {
this.loopIndexTL.set(index);
}
@@ -299,6 +311,7 @@ public class Node implements Executable, Cloneable, Rollbackable{
Node node = (Node)this.clone();
node.loopIndexTL = new TransmittableThreadLocal<>();
node.currLoopObject = new TransmittableThreadLocal<>();
node.accessResult = new TransmittableThreadLocal<>();
return node;
}
}

View File

@@ -0,0 +1,108 @@
package com.yomahub.liteflow.flow.element.condition;
import cn.hutool.core.text.StrFormatter;
import cn.hutool.core.util.ObjectUtil;
import com.yomahub.liteflow.exception.ChainEndException;
import com.yomahub.liteflow.flow.element.Chain;
import com.yomahub.liteflow.flow.element.Condition;
import com.yomahub.liteflow.flow.element.Executable;
import com.yomahub.liteflow.flow.element.Node;
import com.yomahub.liteflow.log.LFLog;
import com.yomahub.liteflow.log.LFLoggerManager;
import com.yomahub.liteflow.slot.DataBus;
import java.util.Arrays;
import java.util.List;
/**
*
* @author Rain
* @since 2.12.0
*
*/
public class RetryCondition extends ThenCondition{
private final LFLog LOG = LFLoggerManager.getLogger(this.getClass());
private Integer retryTimes;
private Class<? extends Exception>[] retryForExceptions = new Class[] { Exception.class };
public Class<? extends Exception>[] getRetryForExceptions() {
return retryForExceptions;
}
public void setRetryForExceptions(Class<? extends Exception>[] retryForExceptions) {
this.retryForExceptions = retryForExceptions;
}
public Integer getRetryTimes() {
return retryTimes;
}
public void setRetryTimes(Integer retryTimes) {
this.retryTimes = retryTimes;
}
@Override
public void executeCondition(Integer slotIndex) throws Exception {
int retryTimes = this.getRetryTimes() < 0 ? 0 : this.getRetryTimes();
List<Class<? extends Exception>> forExceptions = Arrays.asList(this.getRetryForExceptions());
for (int i = 0; i <= retryTimes; i ++) {
try {
if(i == 0) {
super.executeCondition(slotIndex);
} else {
retry(slotIndex, i);
}
break;
} catch (ChainEndException e) {
throw e;
} catch (Exception e) {
// 判断抛出的异常是不是指定异常的子类
boolean flag = forExceptions.stream().anyMatch(clazz -> clazz.isAssignableFrom(e.getClass()));
if(!flag || i >= retryTimes) {
if(retryTimes > 0) {
String retryFailMsg = StrFormatter.format("retry fail when executing the chain[{}] because {} occurs {}.",
this.getCurrChainId(), this.getCurrentExecutableId(), e);
LOG.error(retryFailMsg);
}
throw e;
} else {
DataBus.getSlot(slotIndex).removeException();
}
}
}
}
private void retry(Integer slotIndex, int retryTime) throws Exception {
LOG.info("{} performs {} retry ", this.getCurrentExecutableId(), retryTime);
super.executeCondition(slotIndex);
}
/**
* 获取当前组件的 id
*
* @return
*/
private String getCurrentExecutableId() {
// retryCondition 只有一个 Executable
Executable executable = this.getExecutableList().get(0);
if (ObjectUtil.isNotNull(executable.getId())) {
// 已经有 id 了
return executable.getId();
}
// 定义 id
switch (executable.getExecuteType()) {
// chain 和 node 一般都有 id
case CHAIN:
return ((Chain) executable).getChainId();
case CONDITION:
return "condition-" + ((Condition) executable).getConditionType().getName();
case NODE:
return "node-" + ((Node) executable).getType().getCode();
default:
return "unknown-executable";
}
}
}

View File

@@ -51,13 +51,17 @@ public class ThenCondition extends Condition {
}
catch (Exception e) {
Slot slot = DataBus.getSlot(slotIndex);
String chainId = this.getCurrChainId();
// 这里事先取到exception set到slot里为了方便finally取到exception
if (slot.isSubChain(chainId)) {
slot.setSubException(chainId, e);
}
else {
slot.setException(e);
//正常情况下slot不可能为null
//当设置了超时后还在运行的组件就有可能因为主流程已经结束释放slot而导致slot为null
if (slot != null){
String chainId = this.getCurrChainId();
// 这里事先取到exception set到slot里为了方便finally取到exception
if (slot.isSubChain(chainId)) {
slot.setSubException(chainId, e);
}
else {
slot.setException(e);
}
}
throw e;
}

View File

@@ -31,9 +31,10 @@ public class AllOfParallelExecutor extends ParallelStrategyExecutor {
}
//allOf这个场景中不需要过滤
//allOf 这个场景中,不需要过滤
@Override
protected Stream<Executable> filterAccess(Stream<Executable> stream, Integer slotIndex) {
return stream;
}
}

View File

@@ -1,12 +1,10 @@
package com.yomahub.liteflow.flow.parallel.strategy;
import com.yomahub.liteflow.flow.element.Executable;
import com.yomahub.liteflow.flow.element.condition.WhenCondition;
import com.yomahub.liteflow.flow.parallel.WhenFutureObj;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
/**
* 完成任一任务
@@ -31,18 +29,4 @@ public class AnyOfParallelExecutor extends ParallelStrategyExecutor {
}
//在anyOf这个场景中需要过滤掉isAccess为false的场景
//因为不过滤这个的话,如果加上了 any那么 isAccess 为 false 那就是最快的了
//换句话说就是anyOf这个场景isAccess会被执行两次
@Override
protected Stream<Executable> filterAccess(Stream<Executable> stream, Integer slotIndex) {
return stream.filter(executable -> {
try {
return executable.isAccess(slotIndex);
} catch (Exception e) {
LOG.error("there was an error when executing the when component isAccess", e);
return false;
}
});
}
}

View File

@@ -6,6 +6,7 @@ import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.enums.ParallelStrategyEnum;
import com.yomahub.liteflow.exception.WhenExecuteException;
import com.yomahub.liteflow.flow.element.Executable;
import com.yomahub.liteflow.flow.element.Node;
import com.yomahub.liteflow.flow.element.condition.FinallyCondition;
import com.yomahub.liteflow.flow.element.condition.PreCondition;
import com.yomahub.liteflow.flow.element.condition.WhenCondition;
@@ -93,20 +94,25 @@ public abstract class ParallelStrategyExecutor {
// 1.先进行过滤,前置和后置组件过滤掉,因为在 EL Chain 处理的时候已经提出来了
// 2.过滤 isAccess 为 false 的情况,因为不过滤这个的话,如果加上了 any那么 isAccess 为 false 那就是最快的了
Stream<Executable> stream = executableList.stream()
.filter(executable -> !(executable instanceof PreCondition) && !(executable instanceof FinallyCondition))
.filter(executable -> {
try {
return executable.isAccess(slotIndex);
} catch (Exception e) {
LOG.error("there was an error when executing the when component isAccess", e);
return false;
}
});
.filter(executable -> !(executable instanceof PreCondition) && !(executable instanceof FinallyCondition));
return filterAccess(stream, slotIndex);
}
//过滤isAccess的抽象接口方法
protected abstract Stream<Executable> filterAccess(Stream<Executable> stream, Integer slotIndex);
// 过滤 isAccess 的方法,默认实现,同时为避免同一个 node 的 isAccess 方法重复执行,给 node 设置 isAccess 方法执行结果
protected Stream<Executable> filterAccess(Stream<Executable> stream, Integer slotIndex) {
return stream.filter(executable -> {
try {
boolean access = executable.isAccess(slotIndex);
if (executable instanceof Node) {
((Node) executable).setAccessResult(access);
}
return access;
} catch (Exception e) {
LOG.error("there was an error when executing the when component isAccess", e);
return false;
}
});
}
/**
* 获取 WHEN 所需线程池

View File

@@ -1,14 +1,12 @@
package com.yomahub.liteflow.flow.parallel.strategy;
import cn.hutool.core.collection.CollUtil;
import com.yomahub.liteflow.flow.element.Executable;
import com.yomahub.liteflow.flow.element.condition.WhenCondition;
import com.yomahub.liteflow.flow.parallel.WhenFutureObj;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.stream.Stream;
/**
* 完成指定任务执行器,使用 ID 进行比较
@@ -77,19 +75,4 @@ public class SpecifyParallelExecutor extends ParallelStrategyExecutor {
}
//在must这个场景中需要过滤掉isAccess为false的场景
//因为不过滤这个的话,如果加上了 any那么 isAccess 为 false 那就是最快的了
//换句话说就是must这个场景isAccess会被执行两次
@Override
protected Stream<Executable> filterAccess(Stream<Executable> stream, Integer slotIndex) {
return stream.filter(executable -> {
try {
return executable.isAccess(slotIndex);
} catch (Exception e) {
LOG.error("there was an error when executing the when component isAccess", e);
return false;
}
});
}
}

View File

@@ -7,11 +7,10 @@ import com.yomahub.liteflow.annotation.util.AnnoUtil;
import com.yomahub.liteflow.context.ContextBean;
import com.yomahub.liteflow.enums.ScriptTypeEnum;
import com.yomahub.liteflow.exception.LiteFlowException;
import com.yomahub.liteflow.flow.FlowBus;
import com.yomahub.liteflow.slot.DataBus;
import com.yomahub.liteflow.slot.Slot;
import java.util.List;
import javax.script.ScriptException;
import java.util.Map;
import java.util.function.BiConsumer;
@@ -29,12 +28,6 @@ public abstract class ScriptExecutor {
public abstract void load(String nodeId, String script);
// 卸载脚本(不包含 node
public abstract void unLoad(String nodeId);
// 获取该执行器下的所有 nodeId
public abstract List<String> getNodeIds();
public Object execute(ScriptExecuteWrap wrap) throws Exception{
try {
return executeScript(wrap);
@@ -62,17 +55,7 @@ public abstract class ScriptExecutor {
// key的规则为自定义上下文的simpleName
// 比如你的自定义上下文为AbcContext那么key就为:abcContext
// 这里不统一放一个map的原因是考虑到有些用户会调用上下文里的方法而不是参数所以脚本语言的绑定表里也是放多个上下文
DataBus.getContextBeanList(wrap.getSlotIndex()).forEach(o -> {
ContextBean contextBean = AnnoUtil.getAnnotation(o.getClass(), ContextBean.class);
String key;
if (contextBean != null && contextBean.value().trim().length() > 0) {
key = contextBean.value();
}
else {
key = StrUtil.lowerFirst(o.getClass().getSimpleName());
}
putConsumer.accept(key, o);
});
DataBus.getContextBeanList(wrap.getSlotIndex()).forEach(tuple -> putConsumer.accept(tuple.get(0), tuple.get(1)));
// 把wrap对象转换成元数据map
Map<String, Object> metaMap = BeanUtil.beanToMap(wrap);
@@ -94,4 +77,12 @@ public abstract class ScriptExecutor {
ScriptBeanManager.getScriptBeanMap().forEach(putIfAbsentConsumer);
}
/**
* 利用相应框架编译脚本
*
* @param script 脚本
* @return boolean
* @throws Exception 例外
*/
public abstract Object compile(String script) throws Exception;
}

View File

@@ -46,8 +46,7 @@ public abstract class JSR223ScriptExecutor extends ScriptExecutor {
@Override
public void load(String nodeId, String script) {
try {
CompiledScript compiledScript = ((Compilable) scriptEngine).compile(convertScript(script));
compiledScriptMap.put(nodeId, compiledScript);
compiledScriptMap.put(nodeId, (CompiledScript) compile(script));
}
catch (Exception e) {
String errorMsg = StrUtil.format("script loading error for node[{}], error msg:{}", nodeId, e.getMessage());
@@ -85,4 +84,12 @@ public abstract class JSR223ScriptExecutor extends ScriptExecutor {
compiledScriptMap.clear();
}
@Override
public Object compile(String script) throws ScriptException {
if(scriptEngine == null) {
LOG.error("script engine has not init");
}
return ((Compilable) scriptEngine).compile(convertScript(script));
}
}

View File

@@ -0,0 +1,104 @@
package com.yomahub.liteflow.script.validator;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder;
import com.yomahub.liteflow.enums.ScriptTypeEnum;
import com.yomahub.liteflow.log.LFLog;
import com.yomahub.liteflow.log.LFLoggerManager;
import com.yomahub.liteflow.script.ScriptExecutor;
import java.util.*;
/**
* 脚本验证类
*
* @author Ge_Zuao
* @since 2.12.0
*/
public class ScriptValidator {
private static final LFLog LOG = LFLoggerManager.getLogger(ScriptValidator.class);
private static Map<ScriptTypeEnum, ScriptExecutor> scriptExecutors;
static {
List<ScriptExecutor> scriptExecutorList = new ArrayList<>();
scriptExecutors = new HashMap<>();
ServiceLoader.load(ScriptExecutor.class).forEach(scriptExecutorList::add);
scriptExecutorList.stream()
.peek(ScriptExecutor::init)
.forEach(scriptExecutor -> scriptExecutors.put(scriptExecutor.scriptType(), scriptExecutor));
}
/**
* 验证脚本逻辑的公共部分
*
* @param script 脚本
* @param scriptType 脚本类型
* @return boolean
*/
private static boolean validateScript(String script, ScriptTypeEnum scriptType){
// 未加载任何脚本模块
if(scriptExecutors.isEmpty()){
LOG.error("The loaded script modules not found.");
return false;
}
// 指定脚本语言未加载
if (scriptType != null && !scriptExecutors.containsKey(scriptType)) {
LOG.error(StrUtil.format("Specified script language {} was not found.", scriptType));
return false;
}
// 加载多个脚本语言需要指定语言验证
if (scriptExecutors.size() > 1 && scriptType == null) {
LOG.error("The loaded script modules more than 1. Please specify the script language.");
return false;
}
ScriptExecutor scriptExecutor = (scriptType != null) ? scriptExecutors.get(scriptType) : scriptExecutors.values().iterator().next();
try {
scriptExecutor.compile(script);
} catch (Exception e) {
LOG.error(StrUtil.format("{} Script component validate failure. ", scriptExecutor.scriptType()) + e.getMessage());
return false;
}
return true;
}
/**
* 只引入一种脚本语言时,可以不指定语言验证
*
* @param script 脚本
* @return boolean
*/
public static boolean validate(String script){
return validateScript(script, null);
}
/**
* 指定脚本语言验证
*
* @param script 脚本
* @param scriptType 脚本类型
* @return boolean
*/
public static boolean validate(String script, ScriptTypeEnum scriptType){
return validateScript(script, scriptType);
}
/**
* 多语言脚本批量验证
*
* @param scripts 脚本
* @return boolean
*/
public static boolean validate(Map<ScriptTypeEnum, String> scripts){
for(Map.Entry<ScriptTypeEnum, String> script : scripts.entrySet()){
if(!validate(script.getValue(), script.getKey())){
return false;
}
}
return true;
}
}

View File

@@ -7,10 +7,14 @@
*/
package com.yomahub.liteflow.slot;
import cn.hutool.core.lang.Tuple;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.BooleanUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.ReflectUtil;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.annotation.util.AnnoUtil;
import com.yomahub.liteflow.context.ContextBean;
import com.yomahub.liteflow.log.LFLog;
import com.yomahub.liteflow.log.LFLoggerManager;
import com.yomahub.liteflow.property.LiteflowConfig;
@@ -74,13 +78,22 @@ public class DataBus {
.map((Function<Class<?>, Object>) ReflectUtil::newInstanceIfPossible)
.collect(Collectors.toList());
Slot slot = new Slot(contextBeanList);
return offerIndex(slot);
return offerSlotByBean(contextBeanList);
}
public static int offerSlotByBean(List<Object> contextList) {
Slot slot = new Slot(contextList);
List<Tuple> contextBeanList = contextList.stream().map(object -> {
ContextBean contextBean = AnnoUtil.getAnnotation(object.getClass(), ContextBean.class);
String contextKey;
if (contextBean != null && StrUtil.isNotBlank(contextBean.value())){
contextKey = contextBean.value();
}else{
contextKey = StrUtil.lowerFirst(object.getClass().getSimpleName());
}
return new Tuple(contextKey, object);
}).collect(Collectors.toList());
Slot slot = new Slot(contextBeanList);
return offerIndex(slot);
}
@@ -128,7 +141,7 @@ public class DataBus {
return SLOTS.get(slotIndex);
}
public static List<Object> getContextBeanList(int slotIndex) {
public static List<Tuple> getContextBeanList(int slotIndex) {
Slot slot = getSlot(slotIndex);
return slot.getContextBeanList();
}

View File

@@ -9,8 +9,10 @@ package com.yomahub.liteflow.slot;
import cn.hutool.core.collection.ConcurrentHashSet;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.lang.Tuple;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.ttl.TransmittableThreadLocal;
import com.yomahub.liteflow.exception.NoSuchContextBeanException;
import com.yomahub.liteflow.exception.NullParamException;
import com.yomahub.liteflow.flow.element.Condition;
@@ -29,6 +31,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Predicate;
/**
* Slot的抽象类实现
@@ -90,14 +93,14 @@ public class Slot {
protected ConcurrentHashMap<String, Object> metaDataMap = new ConcurrentHashMap<>();
private List<Object> contextBeanList;
private List<Tuple> contextBeanList;
private static final ThreadLocal<Deque<Condition>> conditionStack = ThreadLocal.withInitial(LinkedList::new);
private static final TransmittableThreadLocal<Deque<Condition>> conditionStack = TransmittableThreadLocal.withInitial(ConcurrentLinkedDeque::new);
public Slot() {
}
public Slot(List<Object> contextBeanList) {
public Slot(List<Tuple> contextBeanList) {
this.contextBeanList = contextBeanList;
}
@@ -448,21 +451,30 @@ public class Slot {
metaDataMap.remove(SUB_EXCEPTION_PREFIX + chainId);
}
public List<Object> getContextBeanList() {
public List<Tuple> getContextBeanList() {
return this.contextBeanList;
}
public <T> T getContextBean(Class<T> contextBeanClazz) {
T t = (T) contextBeanList.stream().filter(o -> o.getClass().getName().equals(contextBeanClazz.getName())).findFirst().orElse(null);
if (t == null) {
Tuple contextTuple = contextBeanList.stream().filter(tuple -> tuple.get(1).getClass().getName().equals(contextBeanClazz.getName())).findFirst().orElse(null);
if (contextTuple == null) {
contextBeanList.forEach(o -> LOG.info("ChainId[{}], Context class:{},Request class:{}", this.getChainId(), o.getClass().getName(), contextBeanClazz.getName()));
throw new NoSuchContextBeanException("this type is not in the context type passed in");
}
return t;
return contextTuple.get(1);
}
public <T> T getContextBean(String contextBeanKey) {
Tuple contextTuple = contextBeanList.stream().filter(tuple -> tuple.get(0).equals(contextBeanKey)).findFirst().orElse(null);
if (contextTuple == null) {
contextBeanList.forEach(o -> LOG.info("ChainId[{}], Context class:{},Request contextBeanKey:{}", this.getChainId(), o.getClass().getName(), contextBeanKey));
throw new NoSuchContextBeanException("this context key is not defined");
}
return contextTuple.get(1);
}
public <T> T getFirstContextBean() {
Class<T> firstContextBeanClazz = (Class<T>) this.getContextBeanList().get(0).getClass();
Class<T> firstContextBeanClazz = (Class<T>) this.getContextBeanList().get(0).get(1).getClass();
return this.getContextBean(firstContextBeanClazz);
}