Merge branch 'dev' of https://gitee.com/dromara/liteFlow into issues/I61D1N-v2

# Conflicts:
#	liteflow-core/src/main/java/com/yomahub/liteflow/parser/helper/ParserHelper.java
This commit is contained in:
tangkaicheng
2024-03-20 11:00:02 +08:00
384 changed files with 6094 additions and 1039 deletions

View File

@@ -1,5 +1,6 @@
package com.yomahub.liteflow.annotation;
import com.yomahub.liteflow.enums.BooleanTypeEnum;
import com.yomahub.liteflow.enums.NodeTypeEnum;
import java.lang.annotation.Documented;
@@ -20,4 +21,6 @@ import java.lang.annotation.Target;
@Documented
@Inherited
public @interface FallbackCmp {
BooleanTypeEnum value() default BooleanTypeEnum.NOT_BOOL;
}

View File

@@ -12,6 +12,12 @@ import java.lang.annotation.*;
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Deprecated
/**
* This class has been deprecated due to its only component retry function. Please use the retry method in the EL expression.
* @Deprecated
* @see # retry(int retryTimes) e.g. THEN( a, b.retry(3) ); WHEN( a, b ).retry(3);
*/
public @interface LiteflowRetry {
@LFAliasFor("retry")

View File

@@ -3,6 +3,7 @@ package com.yomahub.liteflow.annotation.util;
import cn.hutool.core.annotation.AnnotationUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.ReflectUtil;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.annotation.LFAliasFor;
import java.lang.annotation.Annotation;
@@ -10,15 +11,25 @@ import java.lang.reflect.AnnotatedElement;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 注解工具类
* 此工具类带缓存
*
* @author Bryan.Zhang
*/
public class AnnoUtil {
private static Map<String, Annotation> annoCacheMap = new ConcurrentHashMap<>();
public static <A extends Annotation> A getAnnotation(AnnotatedElement annotatedElement, Class<A> annotationType) {
String cacheKey = StrUtil.format("{}-{}", annotatedElement, annotationType.getSimpleName());
if (annoCacheMap.containsKey(cacheKey)){
return (A)annoCacheMap.get(cacheKey);
}
A annotation = AnnotationUtil.getAnnotation(annotatedElement, annotationType);
if (ObjectUtil.isNull(annotation)) {
return null;
@@ -42,6 +53,8 @@ public class AnnoUtil {
}
});
annoCacheMap.put(cacheKey, annotation);
return annotation;
}
@@ -53,5 +66,4 @@ public class AnnoUtil {
return null;
}
}
}

View File

@@ -34,22 +34,14 @@ public class LiteFlowNodeBuilder {
return new LiteFlowNodeBuilder(NodeTypeEnum.SWITCH);
}
public static LiteFlowNodeBuilder createIfNode() {
return new LiteFlowNodeBuilder(NodeTypeEnum.IF);
public static LiteFlowNodeBuilder createBooleanNode() {
return new LiteFlowNodeBuilder(NodeTypeEnum.BOOLEAN);
}
public static LiteFlowNodeBuilder createForNode() {
return new LiteFlowNodeBuilder(NodeTypeEnum.FOR);
}
public static LiteFlowNodeBuilder createWhileNode() {
return new LiteFlowNodeBuilder(NodeTypeEnum.WHILE);
}
public static LiteFlowNodeBuilder createBreakNode() {
return new LiteFlowNodeBuilder(NodeTypeEnum.BREAK);
}
public static LiteFlowNodeBuilder createIteratorNode() {
return new LiteFlowNodeBuilder(NodeTypeEnum.ITERATOR);
}
@@ -62,22 +54,14 @@ public class LiteFlowNodeBuilder {
return new LiteFlowNodeBuilder(NodeTypeEnum.SWITCH_SCRIPT);
}
public static LiteFlowNodeBuilder createScriptIfNode() {
return new LiteFlowNodeBuilder(NodeTypeEnum.IF_SCRIPT);
public static LiteFlowNodeBuilder createScriptBooleanNode() {
return new LiteFlowNodeBuilder(NodeTypeEnum.BOOLEAN_SCRIPT);
}
public static LiteFlowNodeBuilder createScriptForNode() {
return new LiteFlowNodeBuilder(NodeTypeEnum.FOR_SCRIPT);
}
public static LiteFlowNodeBuilder createScriptWhileNode() {
return new LiteFlowNodeBuilder(NodeTypeEnum.WHILE_SCRIPT);
}
public static LiteFlowNodeBuilder createScriptBreakNode() {
return new LiteFlowNodeBuilder(NodeTypeEnum.BREAK_SCRIPT);
}
public LiteFlowNodeBuilder() {
this.node = new Node();
}

View File

@@ -21,6 +21,7 @@ import com.yomahub.liteflow.exception.ParseException;
import com.yomahub.liteflow.flow.FlowBus;
import com.yomahub.liteflow.flow.element.Chain;
import com.yomahub.liteflow.flow.element.Condition;
import com.yomahub.liteflow.flow.element.Executable;
import com.yomahub.liteflow.flow.element.Node;
import com.yomahub.liteflow.log.LFLog;
import com.yomahub.liteflow.log.LFLoggerManager;
@@ -44,6 +45,11 @@ public class LiteFlowChainELBuilder {
private Chain chain;
/**
* 这是route EL的文本
*/
private Executable route;
/**
* 这是主体的Condition //声明这个变量而不是用chain.getConditionList的目的是为了辅助平滑加载
* 虽然FlowBus里面的map都是CopyOnWrite类型的但是在buildCondition的时候为了平滑加载所以不能事先把chain.getConditionList给设为空List
@@ -92,6 +98,7 @@ public class LiteFlowChainELBuilder {
EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.MAX_WAIT_SECONDS, Object.class, new MaxWaitSecondsOperator());
EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.MAX_WAIT_MILLISECONDS, Object.class, new MaxWaitMillisecondsOperator());
EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.PARALLEL, Object.class, new ParallelOperator());
EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.RETRY, Object.class, new RetryOperator());
}
public static LiteFlowChainELBuilder createChain() {
@@ -129,6 +136,45 @@ public class LiteFlowChainELBuilder {
return this;
}
public LiteFlowChainELBuilder setRoute(String routeEl){
if (StrUtil.isBlank(routeEl)) {
String errMsg = StrUtil.format("You have defined the label <route> but there is no content in the chain[{}].", chain.getChainId());
throw new FlowSystemException(errMsg);
}
List<String> errorList = new ArrayList<>();
try {
DefaultContext<String, Object> context = new DefaultContext<>();
// 往上下文里放入所有的node使得el表达式可以直接引用到nodeId
FlowBus.getNodeMap().keySet().forEach(nodeId -> context.put(nodeId, FlowBus.getNode(nodeId)));
// 解析route el成为一个executable
Executable routeExecutable = (Executable) EXPRESS_RUNNER.execute(routeEl, context, errorList, true, true);
if (Objects.isNull(routeExecutable)){
throw new QLException(StrUtil.format("parse route el fail,el:[{}]", routeEl));
}
// 把主要的condition加入
this.route = routeExecutable;
return this;
} catch (QLException e) {
// EL 底层会包装异常,这里是曲线处理
if (ObjectUtil.isNotNull(e.getCause()) && Objects.equals(e.getCause().getMessage(), DataNotFoundException.MSG)) {
// 构建错误信息
String msg = buildDataNotFoundExceptionMsg(routeEl);
throw new ELParseException(msg);
}else if (ObjectUtil.isNotNull(e.getCause())){
throw new ELParseException(e.getCause().getMessage());
}else{
throw new ELParseException(e.getMessage());
}
} catch (Exception e) {
String errMsg = StrUtil.format("parse el fail in this chain[{}];\r\n", chain.getChainId());
throw new ELParseException(errMsg + e.getMessage());
}
}
public LiteFlowChainELBuilder setEL(String elStr) {
if (StrUtil.isBlank(elStr)) {
String errMsg = StrUtil.format("no content in this chain[{}]", chain.getChainId());
@@ -196,6 +242,7 @@ public class LiteFlowChainELBuilder {
}
public void build() {
this.chain.setRouteItem(this.route);
this.chain.setConditionList(this.conditionList);
//暂且去掉循环依赖检测因为有发现循环依赖检测在对大的EL进行检测的时候会导致CPU飙升也或许是jackson低版本的问题

View File

@@ -16,7 +16,7 @@ import com.yomahub.liteflow.flow.element.condition.BooleanConditionTypeEnum;
public class AndOperator extends BaseOperator<AndOrCondition> {
@Override
public AndOrCondition build(Object[] objects) throws Exception {
OperatorHelper.checkObjectSizeGtTwo(objects);
OperatorHelper.checkObjectSizeGteTwo(objects);
AndOrCondition andOrCondition = new AndOrCondition();
andOrCondition.setBooleanConditionType(BooleanConditionTypeEnum.AND);

View File

@@ -19,7 +19,7 @@ public class MustOperator extends BaseOperator<WhenCondition> {
@Override
public WhenCondition build(Object[] objects) throws Exception {
OperatorHelper.checkObjectSizeGtTwo(objects);
OperatorHelper.checkObjectSizeGteTwo(objects);
String errorMsg = "The caller must be WhenCondition item";
WhenCondition whenCondition = OperatorHelper.convert(objects[0], WhenCondition.class, errorMsg);

View File

@@ -16,7 +16,7 @@ import com.yomahub.liteflow.flow.element.condition.BooleanConditionTypeEnum;
public class OrOperator extends BaseOperator<AndOrCondition> {
@Override
public AndOrCondition build(Object[] objects) throws Exception {
OperatorHelper.checkObjectSizeGtTwo(objects);
OperatorHelper.checkObjectSizeGteTwo(objects);
AndOrCondition andOrCondition = new AndOrCondition();
andOrCondition.setBooleanConditionType(BooleanConditionTypeEnum.OR);

View File

@@ -0,0 +1,35 @@
package com.yomahub.liteflow.builder.el.operator;
import com.yomahub.liteflow.builder.el.operator.base.BaseOperator;
import com.yomahub.liteflow.builder.el.operator.base.OperatorHelper;
import com.yomahub.liteflow.flow.element.Condition;
import com.yomahub.liteflow.flow.element.Executable;
import com.yomahub.liteflow.flow.element.condition.RetryCondition;
/**
*
* @author Rain
* @since 2.12.0
*
*/
public class RetryOperator extends BaseOperator<Condition> {
@Override
public Condition build(Object[] objects) throws Exception {
OperatorHelper.checkObjectSizeGteTwo(objects);
Executable executable = OperatorHelper.convert(objects[0], Executable.class);
Integer retryTimes = OperatorHelper.convert(objects[1], Integer.class);
RetryCondition retryCondition = new RetryCondition();
retryCondition.addExecutable(executable);
retryCondition.setRetryTimes(retryTimes);
if(objects.length > 2) {
Class[] forExceptions = new Class[objects.length - 2];
for(int i = 2; i < objects.length; i ++) {
String className = OperatorHelper.convert(objects[i], String.class);
forExceptions[i - 2] = Class.forName(className);
}
retryCondition.setRetryForExceptions(forExceptions);
}
return retryCondition;
}
}

View File

@@ -15,7 +15,7 @@ public class ToOperator extends BaseOperator<SwitchCondition> {
@Override
public SwitchCondition build(Object[] objects) throws Exception {
OperatorHelper.checkObjectSizeGtTwo(objects);
OperatorHelper.checkObjectSizeGteTwo(objects);
String errorMsg = "The caller must be SwitchCondition item";
SwitchCondition switchCondition = OperatorHelper.convert(objects[0], SwitchCondition.class, errorMsg);

View File

@@ -10,8 +10,6 @@ import com.yomahub.liteflow.exception.DataNotFoundException;
import com.yomahub.liteflow.flow.element.Condition;
import com.yomahub.liteflow.flow.element.Executable;
import com.yomahub.liteflow.flow.element.Node;
import com.yomahub.liteflow.flow.element.condition.AndOrCondition;
import com.yomahub.liteflow.flow.element.condition.NotCondition;
import java.util.Objects;
@@ -35,13 +33,13 @@ public class OperatorHelper {
}
/**
* 检查参数数量,大于 2
* 检查参数数量,大于等于 2
* @param objects objects
* @throws QLException QLException
*/
public static void checkObjectSizeGtTwo(Object[] objects) throws QLException {
public static void checkObjectSizeGteTwo(Object[] objects) throws QLException {
checkObjectSizeGtZero(objects);
if (objects.length <= 1) {
if (objects.length < 2) {
throw new QLException("parameter error");
}
}
@@ -119,7 +117,7 @@ public class OperatorHelper {
if (clazz.isAssignableFrom(object.getClass())) {
if (object instanceof Node) {
Node node = (Node) object;
return (T) node.copy();
return (T) node.clone();
}
else {
return (T) object;
@@ -169,12 +167,8 @@ public class OperatorHelper {
Executable item = (Executable) object;
if (item.getExecuteType().equals(ExecuteTypeEnum.NODE)){
Node node = (Node) item;
if (!ListUtil.toList(NodeTypeEnum.IF,
NodeTypeEnum.IF_SCRIPT,
NodeTypeEnum.WHILE,
NodeTypeEnum.WHILE_SCRIPT,
NodeTypeEnum.BREAK,
NodeTypeEnum.BREAK_SCRIPT,
if (!ListUtil.toList(NodeTypeEnum.BOOLEAN,
NodeTypeEnum.BOOLEAN_SCRIPT,
NodeTypeEnum.FALLBACK).contains(node.getType())){
throw new QLException(StrUtil.format("The node[{}] must be boolean type Node.", node.getId()));
}

View File

@@ -10,6 +10,10 @@ public interface ChainConstant {
String CHAIN = "chain";
String ROUTE = "route";
String BODY = "body";
String FLOW = "flow";
String NODES = "nodes";
@@ -96,4 +100,6 @@ public interface ChainConstant {
String EXTENDS = "extends";
String RETRY = "retry";
}

View File

@@ -0,0 +1,26 @@
package com.yomahub.liteflow.core;
import com.yomahub.liteflow.slot.DataBus;
/**
* BOOLEAN类型的抽象节点
*
* @author Bryan.Zhang
* @since 2.12.0
*/
public abstract class NodeBooleanComponent extends NodeComponent {
@Override
public void process() throws Exception {
boolean result = this.processBoolean();
this.getSlot().setIfResult(this.getMetaValueKey(), result);
}
public abstract boolean processBoolean() throws Exception;
@Override
@SuppressWarnings("unchecked")
public Boolean getItemResultMetaValue(Integer slotIndex) {
return DataBus.getSlot(slotIndex).getIfResult(this.getMetaValueKey());
}
}

View File

@@ -1,32 +0,0 @@
package com.yomahub.liteflow.core;
import com.yomahub.liteflow.slot.DataBus;
import com.yomahub.liteflow.slot.Slot;
import com.yomahub.liteflow.core.proxy.LiteFlowProxyUtil;
/**
* 循环跳出节点逻辑抽象类
*
* @author Bryan.Zhang
* @since 2.9.0
*/
public abstract class NodeBreakComponent extends NodeComponent {
@Override
public void process() throws Exception {
boolean breakFlag = processBreak();
Slot slot = this.getSlot();
Class<?> originalClass = LiteFlowProxyUtil.getUserClass(this.getClass());
slot.setBreakResult(originalClass.getName(), breakFlag);
}
public abstract boolean processBreak() throws Exception;
@Override
@SuppressWarnings("unchecked")
public Boolean getItemResultMetaValue(Integer slotIndex) {
Class<?> originalClass = LiteFlowProxyUtil.getUserClass(this.getClass());
return DataBus.getSlot(slotIndex).getBreakResult(originalClass.getName());
}
}

View File

@@ -7,44 +7,37 @@
*/
package com.yomahub.liteflow.core;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.date.StopWatch;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.ttl.TransmittableThreadLocal;
import com.yomahub.liteflow.exception.ChainEndException;
import com.yomahub.liteflow.core.proxy.LiteFlowProxyUtil;
import com.yomahub.liteflow.enums.CmpStepTypeEnum;
import com.yomahub.liteflow.enums.NodeTypeEnum;
import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.flow.element.Node;
import com.yomahub.liteflow.flow.executor.NodeExecutor;
import com.yomahub.liteflow.flow.entity.CmpStep;
import com.yomahub.liteflow.flow.executor.DefaultNodeExecutor;
import com.yomahub.liteflow.enums.NodeTypeEnum;
import com.yomahub.liteflow.flow.executor.NodeExecutor;
import com.yomahub.liteflow.log.LFLog;
import com.yomahub.liteflow.log.LFLoggerManager;
import com.yomahub.liteflow.spi.holder.CmpAroundAspectHolder;
import com.yomahub.liteflow.util.JsonUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.yomahub.liteflow.flow.entity.CmpStep;
import com.yomahub.liteflow.enums.CmpStepTypeEnum;
import com.yomahub.liteflow.slot.DataBus;
import com.yomahub.liteflow.slot.Slot;
import com.yomahub.liteflow.flow.element.Executable;
import com.yomahub.liteflow.monitor.CompStatistics;
import com.yomahub.liteflow.monitor.MonitorBus;
import com.yomahub.liteflow.slot.DataBus;
import com.yomahub.liteflow.slot.Slot;
import com.yomahub.liteflow.spi.holder.CmpAroundAspectHolder;
import com.yomahub.liteflow.util.JsonUtil;
import java.lang.reflect.Method;
import java.util.Date;
import java.util.Deque;
import java.util.Map;
import java.util.function.Predicate;
/**
* 普通组件抽象类
*
* @author Bryan.Zhang
* @author luo yi
*/
public abstract class NodeComponent {
public abstract class NodeComponent{
private final LFLog LOG = LFLoggerManager.getLogger(this.getClass());
@@ -77,16 +70,11 @@ public abstract class NodeComponent {
private final TransmittableThreadLocal<Node> refNodeTL = new TransmittableThreadLocal<>();
/**
******************* 以下的属性为线程附加属性******************** 线程属性是指每一个request的值都是不一样的
******************* 以下的属性为线程附加属性********************
* 线程属性是指每一个request的值都是不一样的
* 这里NodeComponent是单例所以要用ThreadLocal来修饰
*/
// 当前slot的index
private final TransmittableThreadLocal<Integer> slotIndexTL = new TransmittableThreadLocal<>();
// 是否结束整个流程,这个只对串行流程有效,并行流程无效
private final TransmittableThreadLocal<Boolean> isEndTL = new TransmittableThreadLocal<>();
public NodeComponent() {
// 反射判断是否重写了rollback方法
Class<?> clazz = this.getClass();
@@ -237,39 +225,29 @@ public abstract class NodeComponent {
// 是否结束整个流程(不往下继续执行)
public boolean isEnd() {
Boolean isEnd = isEndTL.get();
Boolean isEnd = this.refNodeTL.get().getIsEnd();
if (ObjectUtil.isNull(isEnd)) {
return false;
}
else {
return isEndTL.get();
}else {
return isEnd;
}
}
// 设置是否结束整个流程
public void setIsEnd(boolean isEnd) {
this.isEndTL.set(isEnd);
this.refNodeTL.get().setIsEnd(isEnd);
}
public void removeIsEnd() {
this.isEndTL.remove();
}
public NodeComponent setSlotIndex(Integer slotIndex) {
this.slotIndexTL.set(slotIndex);
return this;
public void setIsContinueOnError(boolean isContinueOnError) {
this.refNodeTL.get().setIsContinueOnErrorResult(isContinueOnError);
}
public Integer getSlotIndex() {
return this.slotIndexTL.get();
}
public void removeSlotIndex() {
this.slotIndexTL.remove();
return this.refNodeTL.get().getSlotIndex();
}
public Slot getSlot() {
return DataBus.getSlot(this.slotIndexTL.get());
return DataBus.getSlot(this.getSlotIndex());
}
public <T> T getFirstContextBean() {
@@ -280,6 +258,10 @@ public abstract class NodeComponent {
return this.getSlot().getContextBean(contextBeanClazz);
}
public <T> T getContextBean(String contextName) {
return this.getSlot().getContextBean(contextName);
}
public String getNodeId() {
return nodeId;
}
@@ -454,4 +436,9 @@ public abstract class NodeComponent {
public <T> T getItemResultMetaValue(Integer slotIndex){
return null;
}
protected String getMetaValueKey(){
Class<?> originalClass = LiteFlowProxyUtil.getUserClass(this.getClass());
return originalClass.getName();
}
}

View File

@@ -16,8 +16,7 @@ public abstract class NodeForComponent extends NodeComponent {
public void process() throws Exception {
int forCount = processFor();
Slot slot = this.getSlot();
Class<?> originalClass = LiteFlowProxyUtil.getUserClass(this.getClass());
slot.setForResult(originalClass.getName(), forCount);
slot.setForResult(this.getMetaValueKey(), forCount);
}
public abstract int processFor() throws Exception;
@@ -25,8 +24,7 @@ public abstract class NodeForComponent extends NodeComponent {
@Override
@SuppressWarnings("unchecked")
public Integer getItemResultMetaValue(Integer slotIndex) {
Class<?> originalClass = LiteFlowProxyUtil.getUserClass(this.getClass());
return DataBus.getSlot(slotIndex).getForResult(originalClass.getName());
return DataBus.getSlot(slotIndex).getForResult(this.getMetaValueKey());
}
}

View File

@@ -1,29 +0,0 @@
package com.yomahub.liteflow.core;
import com.yomahub.liteflow.slot.DataBus;
import com.yomahub.liteflow.core.proxy.LiteFlowProxyUtil;
/**
* IF节点抽象类
*
* @author Bryan.Zhang
* @since 2.8.5
*/
public abstract class NodeIfComponent extends NodeComponent {
@Override
public void process() throws Exception {
boolean result = this.processIf();
Class<?> originalClass = LiteFlowProxyUtil.getUserClass(this.getClass());
this.getSlot().setIfResult(originalClass.getName(), result);
}
public abstract boolean processIf() throws Exception;
@Override
@SuppressWarnings("unchecked")
public Boolean getItemResultMetaValue(Integer slotIndex) {
Class<?> originalClass = LiteFlowProxyUtil.getUserClass(this.getClass());
return DataBus.getSlot(slotIndex).getIfResult(originalClass.getName());
}
}

View File

@@ -18,8 +18,7 @@ public abstract class NodeIteratorComponent extends NodeComponent {
public void process() throws Exception {
Iterator<?> it = processIterator();
Slot slot = this.getSlot();
Class<?> originalClass = LiteFlowProxyUtil.getUserClass(this.getClass());
slot.setIteratorResult(originalClass.getName(), it);
slot.setIteratorResult(this.getMetaValueKey(), it);
}
public abstract Iterator<?> processIterator() throws Exception;
@@ -27,8 +26,7 @@ public abstract class NodeIteratorComponent extends NodeComponent {
@Override
@SuppressWarnings("unchecked")
public Iterator<?> getItemResultMetaValue(Integer slotIndex) {
Class<?> originalClass = LiteFlowProxyUtil.getUserClass(this.getClass());
return DataBus.getSlot(slotIndex).getIteratorResult(originalClass.getName());
return DataBus.getSlot(slotIndex).getIteratorResult(this.getMetaValueKey());
}
}

View File

@@ -20,8 +20,7 @@ public abstract class NodeSwitchComponent extends NodeComponent {
@Override
public void process() throws Exception {
String nodeId = this.processSwitch();
Class<?> originalClass = LiteFlowProxyUtil.getUserClass(this.getClass());
this.getSlot().setSwitchResult(originalClass.getName(), nodeId);
this.getSlot().setSwitchResult(this.getMetaValueKey(), nodeId);
}
// 用以返回路由节点的beanId
@@ -30,8 +29,7 @@ public abstract class NodeSwitchComponent extends NodeComponent {
@Override
@SuppressWarnings("unchecked")
public String getItemResultMetaValue(Integer slotIndex) {
Class<?> originalClass = LiteFlowProxyUtil.getUserClass(this.getClass());
return DataBus.getSlot(slotIndex).getSwitchResult(originalClass.getName());
return DataBus.getSlot(slotIndex).getSwitchResult(this.getMetaValueKey());
}
}

View File

@@ -1,32 +0,0 @@
package com.yomahub.liteflow.core;
import com.yomahub.liteflow.slot.DataBus;
import com.yomahub.liteflow.slot.Slot;
import com.yomahub.liteflow.core.proxy.LiteFlowProxyUtil;
/**
* WHILE条件节点抽象类
*
* @author Bryan.Zhang
* @since 2.9.0
*/
public abstract class NodeWhileComponent extends NodeComponent {
@Override
public void process() throws Exception {
boolean whileFlag = processWhile();
Slot slot = this.getSlot();
Class<?> originalClass = LiteFlowProxyUtil.getUserClass(this.getClass());
slot.setWhileResult(originalClass.getName(), whileFlag);
}
public abstract boolean processWhile() throws Exception;
@Override
@SuppressWarnings("unchecked")
public Boolean getItemResultMetaValue(Integer slotIndex) {
Class<?> originalClass = LiteFlowProxyUtil.getUserClass(this.getClass());
return DataBus.getSlot(slotIndex).getWhileResult(originalClass.getName());
}
}

View File

@@ -3,18 +3,16 @@ package com.yomahub.liteflow.core;
import com.yomahub.liteflow.script.ScriptExecuteWrap;
import com.yomahub.liteflow.script.ScriptExecutorFactory;
import java.util.Map;
/**
* 脚本IF节点
* 脚本BOOLEAN节点
*
* @author Bryan.Zhang
* @since 2.8.5
* @since 2.12.0
*/
public class ScriptIfComponent extends NodeIfComponent implements ScriptComponent {
public class ScriptBooleanComponent extends NodeBooleanComponent implements ScriptComponent {
@Override
public boolean processIf() throws Exception {
public boolean processBoolean() throws Exception {
ScriptExecuteWrap wrap = this.buildWrap(this);
return (boolean) ScriptExecutorFactory.loadInstance()
.getScriptExecutor(this.getRefNode().getLanguage())

View File

@@ -1,29 +0,0 @@
package com.yomahub.liteflow.core;
import com.yomahub.liteflow.script.ScriptExecuteWrap;
import com.yomahub.liteflow.script.ScriptExecutorFactory;
import java.util.Map;
/**
* 脚本BREAK节点
*
* @author Bryan.Zhang
* @since 2.9.0
*/
public class ScriptBreakComponent extends NodeBreakComponent implements ScriptComponent {
@Override
public boolean processBreak() throws Exception {
ScriptExecuteWrap wrap = this.buildWrap(this);
return (boolean) ScriptExecutorFactory.loadInstance()
.getScriptExecutor(this.getRefNode().getLanguage())
.execute(wrap);
}
@Override
public void loadScript(String script, String language) {
ScriptExecutorFactory.loadInstance().getScriptExecutor(language).load(getNodeId(), script);
}
}

View File

@@ -21,10 +21,8 @@ public interface ScriptComponent {
{
put(NodeTypeEnum.SCRIPT, ScriptCommonComponent.class);
put(NodeTypeEnum.SWITCH_SCRIPT, ScriptSwitchComponent.class);
put(NodeTypeEnum.IF_SCRIPT, ScriptIfComponent.class);
put(NodeTypeEnum.BOOLEAN_SCRIPT, ScriptBooleanComponent.class);
put(NodeTypeEnum.FOR_SCRIPT, ScriptForComponent.class);
put(NodeTypeEnum.WHILE_SCRIPT, ScriptWhileComponent.class);
put(NodeTypeEnum.BREAK_SCRIPT, ScriptBreakComponent.class);
}
};

View File

@@ -1,29 +0,0 @@
package com.yomahub.liteflow.core;
import com.yomahub.liteflow.script.ScriptExecuteWrap;
import com.yomahub.liteflow.script.ScriptExecutorFactory;
import java.util.Map;
/**
* 脚本WHILE节点
*
* @author Bryan.Zhang
* @since 2.9.0
*/
public class ScriptWhileComponent extends NodeWhileComponent implements ScriptComponent {
@Override
public boolean processWhile() throws Exception {
ScriptExecuteWrap wrap = this.buildWrap(this);
return (boolean) ScriptExecutorFactory.loadInstance()
.getScriptExecutor(this.getRefNode().getLanguage())
.execute(wrap);
}
@Override
public void loadScript(String script, String language) {
ScriptExecutorFactory.loadInstance().getScriptExecutor(language).load(getNodeId(), script);
}
}

View File

@@ -0,0 +1,13 @@
package com.yomahub.liteflow.enums;
/**
* 布尔节点的细分TYPE
* 主要用于组件降级
*
* @author Bryan.Zhang
* @since 2.12.0
*/
public enum BooleanTypeEnum {
NOT_BOOL,IF,WHILE,BREAK
}

View File

@@ -4,10 +4,8 @@ public enum LiteFlowMethodEnum {
PROCESS("process", true),
PROCESS_SWITCH("processSwitch", true),
PROCESS_IF("processIf", true),
PROCESS_BOOLEAN("processBoolean", true),
PROCESS_FOR("processFor", true),
PROCESS_WHILE("processWhile", true),
PROCESS_BREAK("processBreak", true),
PROCESS_ITERATOR("processIterator", true),

View File

@@ -25,28 +25,20 @@ public enum NodeTypeEnum {
SWITCH("switch", "选择", false, NodeSwitchComponent.class),
IF("if", "条件", false, NodeIfComponent.class),
BOOLEAN("boolean", "布尔", false, NodeBooleanComponent.class),
FOR("for", "循环次数", false, NodeForComponent.class),
WHILE("while", "循环条件", false, NodeWhileComponent.class),
BREAK("break", "循环跳出", false, NodeBreakComponent.class),
ITERATOR("iterator", "循环迭代", false, NodeIteratorComponent.class),
SCRIPT("script", "脚本", true, ScriptCommonComponent.class),
SWITCH_SCRIPT("switch_script", "选择脚本", true, ScriptSwitchComponent.class),
IF_SCRIPT("if_script", "条件脚本", true, ScriptIfComponent.class),
BOOLEAN_SCRIPT("boolean_script", "布尔脚本", true, ScriptBooleanComponent.class),
FOR_SCRIPT("for_script", "循环次数脚本", true, ScriptForComponent.class),
WHILE_SCRIPT("while_script", "循环条件脚本", true, ScriptWhileComponent.class),
BREAK_SCRIPT("break_script", "循环跳出脚本", true, ScriptBreakComponent.class),
FALLBACK("fallback", "降级", false, null);
private static final LFLog LOG = LFLoggerManager.getLogger(NodeTypeEnum.class);

View File

@@ -16,11 +16,13 @@ import com.yomahub.liteflow.core.ComponentInitializer;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.core.ScriptComponent;
import com.yomahub.liteflow.core.proxy.DeclWarpBean;
import com.yomahub.liteflow.enums.BooleanTypeEnum;
import com.yomahub.liteflow.enums.FlowParserTypeEnum;
import com.yomahub.liteflow.enums.NodeTypeEnum;
import com.yomahub.liteflow.exception.ComponentCannotRegisterException;
import com.yomahub.liteflow.exception.NullNodeTypeException;
import com.yomahub.liteflow.flow.element.Chain;
import com.yomahub.liteflow.flow.element.Condition;
import com.yomahub.liteflow.flow.element.Node;
import com.yomahub.liteflow.log.LFLog;
import com.yomahub.liteflow.log.LFLoggerManager;
@@ -40,7 +42,9 @@ import com.yomahub.liteflow.core.proxy.LiteFlowProxyUtil;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* 流程元数据类
@@ -56,7 +60,7 @@ public class FlowBus {
private static final Map<String, Node> nodeMap;
private static final Map<NodeTypeEnum, Node> fallbackNodeMap;
private static final Map<String, Node> fallbackNodeMap;
private static AtomicBoolean initStat = new AtomicBoolean(false);
@@ -241,6 +245,14 @@ public class FlowBus {
return nodeMap.get(nodeId);
}
// 获取某一个 chainId 下的所有 nodeId
public static List<Node> getNodesByChainId(String chainId) {
Chain chain = getChain(chainId);
return chain.getConditionList().stream().flatMap(
(Function<Condition, Stream<Node>>) condition -> condition.getAllNodeInCondition().stream()
).collect(Collectors.toList());
}
public static Map<String, Node> getNodeMap() {
return nodeMap;
}
@@ -250,7 +262,12 @@ public class FlowBus {
}
public static Node getFallBackNode(NodeTypeEnum nodeType) {
return fallbackNodeMap.get(nodeType);
return getFallBackNode(nodeType, BooleanTypeEnum.NOT_BOOL);
}
public static Node getFallBackNode(NodeTypeEnum nodeType, BooleanTypeEnum booleanTypeEnum){
String key = StrUtil.format("{}_{}", nodeType.name(), booleanTypeEnum.name());
return fallbackNodeMap.get(key);
}
public static void cleanCache() {
@@ -297,6 +314,11 @@ public class FlowBus {
Arrays.stream(chainIds).forEach(FlowBus::removeChain);
}
// 移除节点
public static boolean removeNode(String nodeId) {
return nodeMap.remove(nodeId) != null;
}
// 判断是否是降级组件,如果是则添加到 fallbackNodeMap
private static void addFallbackNode(Node node) {
NodeComponent nodeComponent = node.getInstance();
@@ -306,7 +328,35 @@ public class FlowBus {
}
NodeTypeEnum nodeType = node.getType();
fallbackNodeMap.put(nodeType, node);
String key = StrUtil.format("{}_{}", nodeType.name(), fallbackCmp.value().name());
fallbackNodeMap.put(key, node);
}
// 重新加载脚本
public static void reloadScript(String nodeId, String script) {
Node node = getNode(nodeId);
if (node == null || !node.getType().isScript()) {
return;
}
// 更新脚本
node.setScript(script);
ScriptExecutorFactory.loadInstance()
.getScriptExecutor(node.getLanguage())
.load(nodeId, script);
}
// 卸载脚本节点
public static boolean unloadScriptNode(String nodeId) {
Node node = getNode(nodeId);
if (node == null || !node.getType().isScript()) {
return false;
}
// 卸载脚本
ScriptExecutorFactory.loadInstance()
.getScriptExecutor(node.getLanguage())
.unLoad(nodeId);
// 移除脚本
return removeNode(nodeId);
}
public static void clearStat(){

View File

@@ -101,6 +101,10 @@ public class LiteflowResponse {
return this.getSlot().getContextBean(contextBeanClazz);
}
public <T> T getContextBean(String contextName) {
return this.getSlot().getContextBean(contextName);
}
public Map<String, List<CmpStep>> getExecuteSteps() {
Map<String, List<CmpStep>> map = new LinkedHashMap<>();
this.getSlot().getExecuteSteps().forEach(cmpStep -> {

View File

@@ -30,6 +30,8 @@ public class Chain implements Executable{
private String chainId;
private Executable routeItem;
private List<Condition> conditionList = new ArrayList<>();
public Chain(String chainName) {
@@ -133,4 +135,12 @@ public class Chain implements Executable{
public String getTag() {
return null;
}
public Executable getRouteItem() {
return routeItem;
}
public void setRouteItem(Executable routeItem) {
this.routeItem = routeItem;
}
}

View File

@@ -22,6 +22,11 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Condition的抽象类
@@ -103,6 +108,24 @@ public abstract class Condition implements Executable{
}
}
public List<Node> getAllNodeInCondition(){
List<Executable> executableList = this.executableGroup.entrySet().stream().flatMap(
(Function<Map.Entry<String, List<Executable>>, Stream<Executable>>) entry -> entry.getValue().stream()
).collect(Collectors.toList());
List<Node> resultList = new ArrayList<>();
executableList.stream().forEach(executable -> {
if (executable instanceof Condition){
resultList.addAll(((Condition)executable).getAllNodeInCondition());
}else if(executable instanceof Node){
resultList.add((Node)executable);
}
});
return resultList;
}
public void setExecutableList(List<Executable> executableList) {
this.executableGroup.put(ConditionKey.DEFAULT_KEY, executableList);
}

View File

@@ -2,6 +2,8 @@ package com.yomahub.liteflow.flow.element;
import cn.hutool.core.text.StrFormatter;
import cn.hutool.core.util.ObjectUtil;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.enums.BooleanTypeEnum;
import com.yomahub.liteflow.enums.ConditionTypeEnum;
import com.yomahub.liteflow.enums.NodeTypeEnum;
import com.yomahub.liteflow.exception.FallbackCmpNotFoundException;
@@ -64,7 +66,7 @@ public class FallbackNode extends Node {
this.getCurrChainId()));
}
// 使用 node 的副本
this.fallbackNode = node.copy();
this.fallbackNode = node.clone();
}
private Node findFallbackNode(Condition condition) {
@@ -88,7 +90,8 @@ public class FallbackNode extends Node {
return findNodeInIterator((IteratorCondition) condition);
case TYPE_NOT_OPT:
case TYPE_AND_OR_OPT:
return FlowBus.getFallBackNode(NodeTypeEnum.IF);
//组件降级用在与并或中只能用在IF表达式中
return FlowBus.getFallBackNode(NodeTypeEnum.BOOLEAN, BooleanTypeEnum.IF);
default:
return null;
}
@@ -98,7 +101,7 @@ public class FallbackNode extends Node {
Executable ifItem = ifCondition.getIfItem();
if (ifItem == this) {
// 需要条件组件
return FlowBus.getFallBackNode(NodeTypeEnum.IF);
return FlowBus.getFallBackNode(NodeTypeEnum.BOOLEAN, BooleanTypeEnum.IF);
}
// 需要普通组件
@@ -126,7 +129,7 @@ public class FallbackNode extends Node {
private Node findNodeInWhile(WhileCondition whileCondition) {
Executable whileItem = whileCondition.getWhileItem();
if (whileItem == this) {
return FlowBus.getFallBackNode(NodeTypeEnum.WHILE);
return FlowBus.getFallBackNode(NodeTypeEnum.BOOLEAN, BooleanTypeEnum.WHILE);
}
return findNodeInLoop(whileCondition);
@@ -144,7 +147,7 @@ public class FallbackNode extends Node {
private Node findNodeInLoop(LoopCondition loopCondition) {
Executable breakItem = loopCondition.getExecutableOne(ConditionKey.BREAK_KEY);
if (breakItem == this) {
return FlowBus.getFallBackNode(NodeTypeEnum.BREAK);
return FlowBus.getFallBackNode(NodeTypeEnum.BOOLEAN, BooleanTypeEnum.BREAK);
}
return FlowBus.getFallBackNode(NodeTypeEnum.COMMON);
@@ -162,13 +165,21 @@ public class FallbackNode extends Node {
return this.fallbackNode.isAccess(slotIndex);
}
@Override
public NodeComponent getInstance() {
if (fallbackNode == null){
return null;
}
return fallbackNode.getInstance();
}
@Override
public String getId() {
return this.fallbackNode == null ? null : this.fallbackNode.getId();
}
@Override
public Node copy() {
public Node clone() {
// 代理节点不复制
return this;
}

View File

@@ -21,8 +21,6 @@ import com.yomahub.liteflow.flow.executor.NodeExecutor;
import com.yomahub.liteflow.flow.executor.NodeExecutorHelper;
import com.yomahub.liteflow.log.LFLog;
import com.yomahub.liteflow.log.LFLoggerManager;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.property.LiteflowConfigGetter;
import com.yomahub.liteflow.slot.DataBus;
import com.yomahub.liteflow.slot.Slot;
@@ -30,6 +28,7 @@ import com.yomahub.liteflow.slot.Slot;
* Node节点实现可执行器 Node节点并不是单例的每构建一次都会copy出一个新的实例
*
* @author Bryan.Zhang
* @author luo yi
*/
public class Node implements Executable, Cloneable, Rollbackable{
@@ -57,10 +56,24 @@ public class Node implements Executable, Cloneable, Rollbackable{
private String currChainId;
// node 的 isAccess 结果,主要用于 WhenCondition 的提前 isAccess 判断,避免 isAccess 方法重复执行
private TransmittableThreadLocal<Boolean> accessResult = new TransmittableThreadLocal<>();
// 循环下标
private TransmittableThreadLocal<Integer> loopIndexTL = new TransmittableThreadLocal<>();
// 迭代对象
private TransmittableThreadLocal<Object> currLoopObject = new TransmittableThreadLocal<>();
// 当前slot的index
private TransmittableThreadLocal<Integer> slotIndexTL = new TransmittableThreadLocal<>();
// 是否结束整个流程,这个只对串行流程有效,并行流程无效
private TransmittableThreadLocal<Boolean> isEndTL = new TransmittableThreadLocal<>();
// isContinueOnError 结果
private TransmittableThreadLocal<Boolean> isContinueOnErrorResult = new TransmittableThreadLocal<>();
public Node() {
}
@@ -125,16 +138,13 @@ public class Node implements Executable, Cloneable, Rollbackable{
throw new FlowSystemException("there is no instance for node id " + id);
}
Slot slot = DataBus.getSlot(slotIndex);
try {
// 把线程属性赋值给组件对象
instance.setSlotIndex(slotIndex);
this.setSlotIndex(slotIndex);
instance.setRefNode(this);
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
// 判断是否可执行所以isAccess经常作为一个组件进入的实际判断要素用作检查slot里的参数的完备性
if (instance.isAccess()) {
if (getAccessResult() || instance.isAccess()) {
LOG.info("[O]start component[{}] execution", instance.getDisplayName());
// 这里开始进行重试的逻辑和主逻辑的运行
@@ -142,8 +152,7 @@ public class Node implements Executable, Cloneable, Rollbackable{
.buildNodeExecutor(instance.getNodeExecutorClass());
// 调用节点执行器进行执行
nodeExecutor.execute(instance);
}
else {
} else {
LOG.info("[X]skip component[{}] execution", instance.getDisplayName());
}
// 如果组件覆盖了isEnd方法或者在在逻辑中主要调用了setEnd(true)的话,流程就会立马结束
@@ -162,7 +171,7 @@ public class Node implements Executable, Cloneable, Rollbackable{
throw new ChainEndException(errorInfo);
}
// 如果组件覆盖了isContinueOnError方法返回为true那即便出了异常也会继续流程
else if (instance.isContinueOnError()) {
else if (getIsContinueOnErrorResult() || instance.isContinueOnError()) {
String errorMsg = StrUtil.format("component[{}] cause error,but flow is still go on", id);
LOG.error(errorMsg);
}
@@ -174,10 +183,12 @@ public class Node implements Executable, Cloneable, Rollbackable{
}
finally {
// 移除threadLocal里的信息
instance.removeSlotIndex();
instance.removeIsEnd();
instance.removeRefNode();
this.getInstance().removeRefNode();
removeSlotIndex();
removeIsEnd();
removeLoopIndex();
removeAccessResult();
removeIsContinueOnErrorResult();
}
}
@@ -188,7 +199,7 @@ public class Node implements Executable, Cloneable, Rollbackable{
Slot slot = DataBus.getSlot(slotIndex);
try {
// 把线程属性赋值给组件对象
instance.setSlotIndex(slotIndex);
this.setSlotIndex(slotIndex);
instance.setRefNode(this);
instance.doRollback();
}
@@ -198,7 +209,7 @@ public class Node implements Executable, Cloneable, Rollbackable{
}
finally {
// 移除threadLocal里的信息
instance.removeSlotIndex();
this.removeSlotIndex();
instance.removeRefNode();
}
}
@@ -210,7 +221,7 @@ public class Node implements Executable, Cloneable, Rollbackable{
@Override
public boolean isAccess(Integer slotIndex) throws Exception {
// 把线程属性赋值给组件对象
instance.setSlotIndex(slotIndex);
this.setSlotIndex(slotIndex);
instance.setRefNode(this);
return instance.isAccess();
}
@@ -253,6 +264,32 @@ public class Node implements Executable, Cloneable, Rollbackable{
return currChainId;
}
public boolean getAccessResult() {
Boolean result = this.accessResult.get();
return result != null && result;
}
public void setAccessResult(boolean accessResult) {
this.accessResult.set(accessResult);
}
public void removeAccessResult() {
this.accessResult.remove();
}
public boolean getIsContinueOnErrorResult() {
Boolean result = this.isContinueOnErrorResult.get();
return result != null && result;
}
public void setIsContinueOnErrorResult(boolean accessResult) {
this.isContinueOnErrorResult.set(accessResult);
}
public void removeIsContinueOnErrorResult() {
this.isContinueOnErrorResult.remove();
}
public void setLoopIndex(int index) {
this.loopIndexTL.set(index);
}
@@ -277,6 +314,30 @@ public class Node implements Executable, Cloneable, Rollbackable{
this.currLoopObject.remove();
}
public Integer getSlotIndex(){
return this.slotIndexTL.get();
}
public void setSlotIndex(Integer slotIndex){
this.slotIndexTL.set(slotIndex);
}
public void removeSlotIndex(){
this.slotIndexTL.remove();
}
public Boolean getIsEnd(){
return this.isEndTL.get();
}
public void setIsEnd(Boolean isEnd){
this.isEndTL.set(isEnd);
}
public void removeIsEnd(){
this.isEndTL.remove();
}
public String getLanguage() {
return language;
}
@@ -291,14 +352,14 @@ public class Node implements Executable, Cloneable, Rollbackable{
}
@Override
protected Object clone() throws CloneNotSupportedException {
return super.clone();
}
public Node copy() throws Exception {
Node node = (Node)this.clone();
public Node clone() throws CloneNotSupportedException {
Node node = (Node)super.clone();
node.loopIndexTL = new TransmittableThreadLocal<>();
node.currLoopObject = new TransmittableThreadLocal<>();
node.accessResult = new TransmittableThreadLocal<>();
node.slotIndexTL = new TransmittableThreadLocal<>();
node.isEndTL = new TransmittableThreadLocal<>();
node.isContinueOnErrorResult = new TransmittableThreadLocal<>();
return node;
}
}

View File

@@ -33,13 +33,15 @@ public class ForCondition extends LoopCondition {
throw new NoForNodeException(errorInfo);
}
// 提前设置 chainId避免无法在 isAccess 方法中获取到
forNode.setCurrChainId(this.getCurrChainId());
// 先去判断isAccess方法如果isAccess方法都返回false整个FOR表达式不执行
if (!this.getForNode().isAccess(slotIndex)) {
if (!forNode.isAccess(slotIndex)) {
return;
}
// 执行forCount组件
forNode.setCurrChainId(this.getCurrChainId());
forNode.execute(slotIndex);
// 获得循环次数

View File

@@ -3,7 +3,8 @@ package com.yomahub.liteflow.flow.element.condition;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.enums.ConditionTypeEnum;
import com.yomahub.liteflow.exception.*;
import com.yomahub.liteflow.exception.IfTargetCannotBePreOrFinallyException;
import com.yomahub.liteflow.exception.NoIfTrueNodeException;
import com.yomahub.liteflow.flow.element.Condition;
import com.yomahub.liteflow.flow.element.Executable;
import com.yomahub.liteflow.slot.DataBus;
@@ -21,13 +22,15 @@ public class IfCondition extends Condition {
public void executeCondition(Integer slotIndex) throws Exception {
Executable ifItem = this.getIfItem();
// 提前设置 chainId避免无法在 isAccess 方法中获取到
ifItem.setCurrChainId(this.getCurrChainId());
// 先去判断isAccess方法如果isAccess方法都返回false整个IF表达式不执行
if (!ifItem.isAccess(slotIndex)) {
return;
}
// 先执行IF节点
ifItem.setCurrChainId(this.getCurrChainId());
ifItem.execute(slotIndex);
// 拿到If执行过的结果

View File

@@ -29,13 +29,15 @@ public class IteratorCondition extends LoopCondition {
throw new NoIteratorNodeException(errorInfo);
}
// 提前设置 chainId避免无法在 isAccess 方法中获取到
iteratorNode.setCurrChainId(this.getCurrChainId());
// 先去判断isAccess方法如果isAccess方法都返回false整个ITERATOR表达式不执行
if (!iteratorNode.isAccess(slotIndex)) {
return;
}
// 执行Iterator组件
iteratorNode.setCurrChainId(this.getCurrChainId());
iteratorNode.execute(slotIndex);
Iterator<?> it = iteratorNode.getItemResultMetaValue(slotIndex);

View File

@@ -26,7 +26,7 @@ public class NotCondition extends Condition {
Slot slot = DataBus.getSlot(slotIndex);
String resultKey = StrUtil.format("{}_{}",this.getClass().getName(),this.hashCode());
slot.setAndOrResult(resultKey, !flag);
slot.setNotResult(resultKey, !flag);
}
@@ -35,7 +35,7 @@ public class NotCondition extends Condition {
public Boolean getItemResultMetaValue(Integer slotIndex) {
Slot slot = DataBus.getSlot(slotIndex);
String resultKey = StrUtil.format("{}_{}",this.getClass().getName(),this.hashCode());
return slot.getAndOrResult(resultKey);
return slot.getNotResult(resultKey);
}
@Override

View File

@@ -0,0 +1,108 @@
package com.yomahub.liteflow.flow.element.condition;
import cn.hutool.core.text.StrFormatter;
import cn.hutool.core.util.ObjectUtil;
import com.yomahub.liteflow.exception.ChainEndException;
import com.yomahub.liteflow.flow.element.Chain;
import com.yomahub.liteflow.flow.element.Condition;
import com.yomahub.liteflow.flow.element.Executable;
import com.yomahub.liteflow.flow.element.Node;
import com.yomahub.liteflow.log.LFLog;
import com.yomahub.liteflow.log.LFLoggerManager;
import com.yomahub.liteflow.slot.DataBus;
import java.util.Arrays;
import java.util.List;
/**
*
* @author Rain
* @since 2.12.0
*
*/
public class RetryCondition extends ThenCondition{
private final LFLog LOG = LFLoggerManager.getLogger(this.getClass());
private Integer retryTimes;
private Class<? extends Exception>[] retryForExceptions = new Class[] { Exception.class };
public Class<? extends Exception>[] getRetryForExceptions() {
return retryForExceptions;
}
public void setRetryForExceptions(Class<? extends Exception>[] retryForExceptions) {
this.retryForExceptions = retryForExceptions;
}
public Integer getRetryTimes() {
return retryTimes;
}
public void setRetryTimes(Integer retryTimes) {
this.retryTimes = retryTimes;
}
@Override
public void executeCondition(Integer slotIndex) throws Exception {
int retryTimes = this.getRetryTimes() < 0 ? 0 : this.getRetryTimes();
List<Class<? extends Exception>> forExceptions = Arrays.asList(this.getRetryForExceptions());
for (int i = 0; i <= retryTimes; i ++) {
try {
if(i == 0) {
super.executeCondition(slotIndex);
} else {
retry(slotIndex, i);
}
break;
} catch (ChainEndException e) {
throw e;
} catch (Exception e) {
// 判断抛出的异常是不是指定异常的子类
boolean flag = forExceptions.stream().anyMatch(clazz -> clazz.isAssignableFrom(e.getClass()));
if(!flag || i >= retryTimes) {
if(retryTimes > 0) {
String retryFailMsg = StrFormatter.format("retry fail when executing the chain[{}] because {} occurs {}.",
this.getCurrChainId(), this.getCurrentExecutableId(), e);
LOG.error(retryFailMsg);
}
throw e;
} else {
DataBus.getSlot(slotIndex).removeException();
}
}
}
}
private void retry(Integer slotIndex, int retryTime) throws Exception {
LOG.info("{} performs {} retry ", this.getCurrentExecutableId(), retryTime);
super.executeCondition(slotIndex);
}
/**
* 获取当前组件的 id
*
* @return
*/
private String getCurrentExecutableId() {
// retryCondition 只有一个 Executable
Executable executable = this.getExecutableList().get(0);
if (ObjectUtil.isNotNull(executable.getId())) {
// 已经有 id 了
return executable.getId();
}
// 定义 id
switch (executable.getExecuteType()) {
// chain 和 node 一般都有 id
case CHAIN:
return ((Chain) executable).getChainId();
case CONDITION:
return "condition-" + ((Condition) executable).getConditionType().getName();
case NODE:
return "node-" + ((Node) executable).getType().getCode();
default:
return "unknown-executable";
}
}
}

View File

@@ -1,13 +1,10 @@
package com.yomahub.liteflow.flow.element.condition;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.enums.ConditionTypeEnum;
import com.yomahub.liteflow.enums.NodeTypeEnum;
import com.yomahub.liteflow.exception.NoSwitchTargetNodeException;
import com.yomahub.liteflow.exception.SwitchTargetCannotBePreOrFinallyException;
import com.yomahub.liteflow.exception.SwitchTypeErrorException;
import com.yomahub.liteflow.flow.element.Condition;
import com.yomahub.liteflow.flow.element.Executable;
import com.yomahub.liteflow.flow.element.Node;
@@ -35,13 +32,15 @@ public class SwitchCondition extends Condition {
// 获取target List
List<Executable> targetList = this.getTargetList();
// 提前设置 chainId避免无法在 isAccess 方法中获取到
switchNode.setCurrChainId(this.getCurrChainId());
// 先去判断isAccess方法如果isAccess方法都返回false整个SWITCH表达式不执行
if (!switchNode.isAccess(slotIndex)) {
return;
}
// 先执行switch节点
switchNode.setCurrChainId(this.getCurrChainId());
switchNode.execute(slotIndex);
// 拿到switch节点的结果

View File

@@ -51,13 +51,17 @@ public class ThenCondition extends Condition {
}
catch (Exception e) {
Slot slot = DataBus.getSlot(slotIndex);
String chainId = this.getCurrChainId();
// 这里事先取到exception set到slot里为了方便finally取到exception
if (slot.isSubChain(chainId)) {
slot.setSubException(chainId, e);
}
else {
slot.setException(e);
//正常情况下slot不可能为null
//当设置了超时后还在运行的组件就有可能因为主流程已经结束释放slot而导致slot为null
if (slot != null){
String chainId = this.getCurrChainId();
// 这里事先取到exception set到slot里为了方便finally取到exception
if (slot.isSubChain(chainId)) {
slot.setSubException(chainId, e);
}
else {
slot.setException(e);
}
}
throw e;
}

View File

@@ -3,7 +3,6 @@ package com.yomahub.liteflow.flow.element.condition;
import cn.hutool.core.util.ObjectUtil;
import com.yomahub.liteflow.enums.ConditionTypeEnum;
import com.yomahub.liteflow.flow.element.Executable;
import com.yomahub.liteflow.flow.element.Node;
import com.yomahub.liteflow.flow.parallel.LoopFutureObj;
import com.yomahub.liteflow.thread.ExecutorHelper;
@@ -24,6 +23,9 @@ public class WhileCondition extends LoopCondition {
public void executeCondition(Integer slotIndex) throws Exception {
Executable whileItem = this.getWhileItem();
// 提前设置 chainId避免无法在 isAccess 方法中获取到
whileItem.setCurrChainId(this.getCurrChainId());
// 先去判断isAccess方法如果isAccess方法都返回false整个WHILE表达式不执行
if (!whileItem.isAccess(slotIndex)) {
return;
@@ -84,7 +86,6 @@ public class WhileCondition extends LoopCondition {
private boolean getWhileResult(Integer slotIndex, int loopIndex) throws Exception {
Executable whileItem = this.getWhileItem();
// 执行while组件
whileItem.setCurrChainId(this.getCurrChainId());
setLoopIndex(whileItem, loopIndex);
whileItem.execute(slotIndex);

View File

@@ -0,0 +1,87 @@
package com.yomahub.liteflow.flow.parallel;
import java.util.concurrent.*;
import java.util.function.BiConsumer;
public class CompletableFutureExpand {
/**
* 如果在给定超时之前未完成,则异常完成此 CompletableFuture 并抛出 {@link TimeoutException} 。
*
* @param timeout 在出现 TimeoutException 异常完成之前等待多长时间,以 {@code unit} 为单位
* @param unit 一个 {@link TimeUnit},结合 {@code timeout} 参数,表示给定粒度单位的持续时间
* @return 入参的 CompletableFuture
*/
public static <T> CompletableFuture<T> completeOnTimeout(CompletableFuture<T> future, long timeout, TimeUnit unit, T timeoutDefaultObj) {
if (future.isDone()) {
return future;
}
return future.whenComplete(new Canceller(Delayer.delay(new Timeout<>(future, timeoutDefaultObj), timeout, unit)));
}
/**
* 超时时异常完成的操作
*/
static final class Timeout<T> implements Runnable {
final CompletableFuture<T> future;
final T timeoutDefaultObj;
Timeout(CompletableFuture<T> future, T timeoutDefaultObj) {
this.future = future;
this.timeoutDefaultObj = timeoutDefaultObj;
}
public void run() {
if (null != future && !future.isDone()) {
future.complete(timeoutDefaultObj);
}
}
}
/**
* 取消不需要的超时的操作
*/
static final class Canceller implements BiConsumer<Object, Throwable> {
final Future<?> future;
Canceller(Future<?> future) {
this.future = future;
}
public void accept(Object ignore, Throwable ex) {
if (null == ex && null != future && !future.isDone()) {
future.cancel(false);
}
}
}
/**
* 单例延迟调度器,仅用于启动和取消任务,一个线程就足够
*/
static final class Delayer {
static final ScheduledThreadPoolExecutor delayer;
static ScheduledFuture<?> delay(Runnable command, long delay, TimeUnit unit) {
return delayer.schedule(command, delay, unit);
}
static final class DaemonThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("CompletableFutureExpandUtilsDelayScheduler");
return t;
}
}
static {
delayer = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory());
delayer.setRemoveOnCancelPolicy(true);
}
}
}

View File

@@ -63,8 +63,7 @@ public class CompletableFutureTimeout {
}
// 哪个先完成 就apply哪一个结果 这是一个关键的API,exceptionally出现异常后返回默认值
public static <T> CompletableFuture<T> completeOnTimeout(T t, CompletableFuture<T> future, long timeout,
TimeUnit unit) {
public static <T> CompletableFuture<T> completeOnTimeout(CompletableFuture<T> future, long timeout, TimeUnit unit, T t) {
final CompletableFuture<T> timeoutFuture = timeoutAfter(timeout, unit);
return future.applyToEither(timeoutFuture, Function.identity()).exceptionally((throwable) -> t);
}

View File

@@ -31,9 +31,10 @@ public class AllOfParallelExecutor extends ParallelStrategyExecutor {
}
//allOf这个场景中不需要过滤
//allOf 这个场景中,不需要过滤
@Override
protected Stream<Executable> filterAccess(Stream<Executable> stream, Integer slotIndex) {
protected Stream<Executable> filterAccess(Stream<Executable> stream, Integer slotIndex, String currentChainId) {
return stream;
}
}

View File

@@ -1,12 +1,10 @@
package com.yomahub.liteflow.flow.parallel.strategy;
import com.yomahub.liteflow.flow.element.Executable;
import com.yomahub.liteflow.flow.element.condition.WhenCondition;
import com.yomahub.liteflow.flow.parallel.WhenFutureObj;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
/**
* 完成任一任务
@@ -31,18 +29,4 @@ public class AnyOfParallelExecutor extends ParallelStrategyExecutor {
}
//在anyOf这个场景中需要过滤掉isAccess为false的场景
//因为不过滤这个的话,如果加上了 any那么 isAccess 为 false 那就是最快的了
//换句话说就是anyOf这个场景isAccess会被执行两次
@Override
protected Stream<Executable> filterAccess(Stream<Executable> stream, Integer slotIndex) {
return stream.filter(executable -> {
try {
return executable.isAccess(slotIndex);
} catch (Exception e) {
LOG.error("there was an error when executing the when component isAccess", e);
return false;
}
});
}
}

View File

@@ -6,10 +6,11 @@ import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.enums.ParallelStrategyEnum;
import com.yomahub.liteflow.exception.WhenExecuteException;
import com.yomahub.liteflow.flow.element.Executable;
import com.yomahub.liteflow.flow.element.Node;
import com.yomahub.liteflow.flow.element.condition.FinallyCondition;
import com.yomahub.liteflow.flow.element.condition.PreCondition;
import com.yomahub.liteflow.flow.element.condition.WhenCondition;
import com.yomahub.liteflow.flow.parallel.CompletableFutureTimeout;
import com.yomahub.liteflow.flow.parallel.CompletableFutureExpand;
import com.yomahub.liteflow.flow.parallel.ParallelSupplier;
import com.yomahub.liteflow.flow.parallel.WhenFutureObj;
import com.yomahub.liteflow.log.LFLog;
@@ -44,19 +45,19 @@ public abstract class ParallelStrategyExecutor {
* @param executable
* @param parallelExecutor
* @param whenCondition
* @param currChainName
* @param currChainId
* @param slotIndex
* @return
*/
protected CompletableFuture<WhenFutureObj> wrappedFutureObj(Executable executable, ExecutorService parallelExecutor,
WhenCondition whenCondition, String currChainName, Integer slotIndex) {
WhenCondition whenCondition, String currChainId, Integer slotIndex) {
// 套入 CompletableFutureTimeout 方法进行超时判断,如果超时则用 WhenFutureObj.timeOut 返回超时的对象
// 第 2 个参数是主要的本体 CompletableFuture传入了 ParallelSupplier 和线程池对象
return CompletableFutureTimeout.completeOnTimeout(
WhenFutureObj.timeOut(executable.getId()),
CompletableFuture.supplyAsync(new ParallelSupplier(executable, currChainName, slotIndex), parallelExecutor),
return CompletableFutureExpand.completeOnTimeout(
CompletableFuture.supplyAsync(new ParallelSupplier(executable, currChainId, slotIndex), parallelExecutor),
whenCondition.getMaxWaitTime(),
whenCondition.getMaxWaitTimeUnit());
whenCondition.getMaxWaitTimeUnit(),
WhenFutureObj.timeOut(executable.getId()));
}
/**
@@ -87,26 +88,34 @@ public abstract class ParallelStrategyExecutor {
* 过滤 WHEN 待执行任务
* @param executableList 所有任务列表
* @param slotIndex
* @param currentChainId 当前执行的 chainId
* @return
*/
protected Stream<Executable> filterWhenTaskList(List<Executable> executableList, Integer slotIndex) {
protected Stream<Executable> filterWhenTaskList(List<Executable> executableList, Integer slotIndex, String currentChainId) {
// 1.先进行过滤,前置和后置组件过滤掉,因为在 EL Chain 处理的时候已经提出来了
// 2.过滤 isAccess 为 false 的情况,因为不过滤这个的话,如果加上了 any那么 isAccess 为 false 那就是最快的了
Stream<Executable> stream = executableList.stream()
.filter(executable -> !(executable instanceof PreCondition) && !(executable instanceof FinallyCondition))
.filter(executable -> {
try {
return executable.isAccess(slotIndex);
} catch (Exception e) {
LOG.error("there was an error when executing the when component isAccess", e);
return false;
}
});
return filterAccess(stream, slotIndex);
.filter(executable -> !(executable instanceof PreCondition) && !(executable instanceof FinallyCondition));
return filterAccess(stream, slotIndex, currentChainId);
}
//过滤isAccess的抽象接口方法
protected abstract Stream<Executable> filterAccess(Stream<Executable> stream, Integer slotIndex);
// 过滤 isAccess 的方法,默认实现,同时为避免同一个 node 的 isAccess 方法重复执行,给 node 设置 isAccess 方法执行结果
protected Stream<Executable> filterAccess(Stream<Executable> stream, Integer slotIndex, String currentChainId) {
return stream.filter(executable -> {
try {
// 提前设置 chainId避免无法在 isAccess 方法中获取到
executable.setCurrChainId(currentChainId);
boolean access = executable.isAccess(slotIndex);
if (executable instanceof Node) {
((Node) executable).setAccessResult(access);
}
return access;
} catch (Exception e) {
LOG.error("there was an error when executing the when component isAccess", e);
return false;
}
});
}
/**
* 获取 WHEN 所需线程池
@@ -140,18 +149,18 @@ public abstract class ParallelStrategyExecutor {
*/
protected List<CompletableFuture<WhenFutureObj>> getWhenAllTaskList(WhenCondition whenCondition, Integer slotIndex) {
String currChainName = whenCondition.getCurrChainId();
String currChainId = whenCondition.getCurrChainId();
// 设置 whenCondition 参数
setWhenConditionParams(whenCondition);
this.setWhenConditionParams(whenCondition);
// 获取 WHEN 所需线程池
ExecutorService parallelExecutor = getWhenExecutorService(whenCondition);
// 这里主要是做了封装 CompletableFuture 对象,用 lambda 表达式做了很多事情,这句代码要仔细理清
// 根据 condition.getNodeList() 的集合进行流处理,用 map 进行把 executable 对象转换成 List<CompletableFuture<WhenFutureObj>>
List<CompletableFuture<WhenFutureObj>> completableFutureList = filterWhenTaskList(whenCondition.getExecutableList(), slotIndex)
.map(executable -> wrappedFutureObj(executable, parallelExecutor, whenCondition, currChainName, slotIndex))
List<CompletableFuture<WhenFutureObj>> completableFutureList = filterWhenTaskList(whenCondition.getExecutableList(), slotIndex, currChainId)
.map(executable -> wrappedFutureObj(executable, parallelExecutor, whenCondition, currChainId, slotIndex))
.collect(Collectors.toList());
return completableFutureList;
@@ -161,11 +170,11 @@ public abstract class ParallelStrategyExecutor {
* 任务结果处理
* @param whenCondition 并行组件对象
* @param slotIndex 当前 slot 的 index
* @param whenAllTaskList 并行组件中所有任务列表
* @param whenAllFutureList 并行组件中所有任务列表
* @param specifyTask 指定预先完成的任务,详见 {@link ParallelStrategyEnum}
* @throws Exception
*/
protected void handleTaskResult(WhenCondition whenCondition, Integer slotIndex, List<CompletableFuture<WhenFutureObj>> whenAllTaskList,
protected void handleTaskResult(WhenCondition whenCondition, Integer slotIndex, List<CompletableFuture<WhenFutureObj>> whenAllFutureList,
CompletableFuture<?> specifyTask) throws Exception {
Slot slot = DataBus.getSlot(slotIndex);
@@ -187,17 +196,23 @@ public abstract class ParallelStrategyExecutor {
// 如果 any 为 true那么这里拿到的是第一个完成的任务
// 如果为 must那么这里获取到的就是指定的任务
// 这里过滤和转换一起用 lambda 做了
List<WhenFutureObj> allCompletableWhenFutureObjList = whenAllTaskList.stream().filter(f -> {
List<WhenFutureObj> allCompletableWhenFutureObjList = whenAllFutureList.stream().filter(f -> {
// 过滤出已经完成的,没完成的就直接终止
if (f.isDone()) {
return true;
} else {
//事实上CompletableFuture并不能cancel掉底层的线程
f.cancel(true);
return false;
}
}).map(f -> {
try {
return f.get();
WhenFutureObj whenFutureObj = f.get();
if (whenFutureObj.isTimeout()){
//事实上CompletableFuture并不能cancel掉底层的线程
f.cancel(true);
}
return whenFutureObj;
} catch (InterruptedException | ExecutionException e) {
interrupted[0] = true;
return null;

View File

@@ -1,14 +1,12 @@
package com.yomahub.liteflow.flow.parallel.strategy;
import cn.hutool.core.collection.CollUtil;
import com.yomahub.liteflow.flow.element.Executable;
import com.yomahub.liteflow.flow.element.condition.WhenCondition;
import com.yomahub.liteflow.flow.parallel.WhenFutureObj;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.stream.Stream;
/**
* 完成指定任务执行器,使用 ID 进行比较
@@ -22,7 +20,7 @@ public class SpecifyParallelExecutor extends ParallelStrategyExecutor {
@Override
public void execute(WhenCondition whenCondition, Integer slotIndex) throws Exception {
String currChainName = whenCondition.getCurrChainId();
String currChainId = whenCondition.getCurrChainId();
// 设置 whenCondition 参数
this.setWhenConditionParams(whenCondition);
@@ -43,10 +41,10 @@ public class SpecifyParallelExecutor extends ParallelStrategyExecutor {
List<CompletableFuture<WhenFutureObj>> allTaskList = new ArrayList<>();
// 遍历 when 所有 node进行筛选及处理
filterWhenTaskList(whenCondition.getExecutableList(), slotIndex)
filterWhenTaskList(whenCondition.getExecutableList(), slotIndex, currChainId)
.forEach(executable -> {
// 处理 task封装成 CompletableFuture 对象
CompletableFuture<WhenFutureObj> completableFutureTask = wrappedFutureObj(executable, parallelExecutor, whenCondition, currChainName, slotIndex);
CompletableFuture<WhenFutureObj> completableFutureTask = wrappedFutureObj(executable, parallelExecutor, whenCondition, currChainId, slotIndex);
// 存在 must 指定 ID 的 task且该任务只会有一个或者没有
if (whenCondition.getSpecifyIdSet().contains(executable.getId())) {
// 设置指定任务 future 对象
@@ -77,19 +75,4 @@ public class SpecifyParallelExecutor extends ParallelStrategyExecutor {
}
//在must这个场景中需要过滤掉isAccess为false的场景
//因为不过滤这个的话,如果加上了 any那么 isAccess 为 false 那就是最快的了
//换句话说就是must这个场景isAccess会被执行两次
@Override
protected Stream<Executable> filterAccess(Stream<Executable> stream, Integer slotIndex) {
return stream.filter(executable -> {
try {
return executable.isAccess(slotIndex);
} catch (Exception e) {
LOG.error("there was an error when executing the when component isAccess", e);
return false;
}
});
}
}

View File

@@ -2,14 +2,12 @@ package com.yomahub.liteflow.script;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.annotation.util.AnnoUtil;
import com.yomahub.liteflow.context.ContextBean;
import com.yomahub.liteflow.enums.ScriptTypeEnum;
import com.yomahub.liteflow.exception.LiteFlowException;
import com.yomahub.liteflow.slot.DataBus;
import com.yomahub.liteflow.slot.Slot;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
@@ -27,6 +25,12 @@ public abstract class ScriptExecutor {
public abstract void load(String nodeId, String script);
// 卸载脚本(不包含 node
public abstract void unLoad(String nodeId);
// 获取该执行器下的所有 nodeId
public abstract List<String> getNodeIds();
public Object execute(ScriptExecuteWrap wrap) throws Exception{
try {
return executeScript(wrap);
@@ -54,17 +58,7 @@ public abstract class ScriptExecutor {
// key的规则为自定义上下文的simpleName
// 比如你的自定义上下文为AbcContext那么key就为:abcContext
// 这里不统一放一个map的原因是考虑到有些用户会调用上下文里的方法而不是参数所以脚本语言的绑定表里也是放多个上下文
DataBus.getContextBeanList(wrap.getSlotIndex()).forEach(o -> {
ContextBean contextBean = AnnoUtil.getAnnotation(o.getClass(), ContextBean.class);
String key;
if (contextBean != null && contextBean.value().trim().length() > 0) {
key = contextBean.value();
}
else {
key = StrUtil.lowerFirst(o.getClass().getSimpleName());
}
putConsumer.accept(key, o);
});
DataBus.getContextBeanList(wrap.getSlotIndex()).forEach(tuple -> putConsumer.accept(tuple.get(0), tuple.get(1)));
// 把wrap对象转换成元数据map
Map<String, Object> metaMap = BeanUtil.beanToMap(wrap);
@@ -86,4 +80,12 @@ public abstract class ScriptExecutor {
ScriptBeanManager.getScriptBeanMap().forEach(putIfAbsentConsumer);
}
/**
* 利用相应框架编译脚本
*
* @param script 脚本
* @return boolean
* @throws Exception 例外
*/
public abstract Object compile(String script) throws Exception;
}

View File

@@ -7,7 +7,16 @@ import com.yomahub.liteflow.script.ScriptExecuteWrap;
import com.yomahub.liteflow.script.ScriptExecutor;
import com.yomahub.liteflow.script.exception.ScriptLoadException;
import com.yomahub.liteflow.util.CopyOnWriteHashMap;
import javax.script.*;
import javax.script.Bindings;
import javax.script.Compilable;
import javax.script.CompiledScript;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;
import javax.script.SimpleBindings;
import javax.script.ScriptException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
@@ -38,14 +47,22 @@ public abstract class JSR223ScriptExecutor extends ScriptExecutor {
@Override
public void load(String nodeId, String script) {
try {
CompiledScript compiledScript = ((Compilable) scriptEngine).compile(convertScript(script));
compiledScriptMap.put(nodeId, compiledScript);
compiledScriptMap.put(nodeId, (CompiledScript) compile(script));
}
catch (Exception e) {
String errorMsg = StrUtil.format("script loading error for node[{}], error msg:{}", nodeId, e.getMessage());
throw new ScriptLoadException(errorMsg);
}
}
@Override
public void unLoad(String nodeId) {
compiledScriptMap.remove(nodeId);
}
@Override
public List<String> getNodeIds() {
return new ArrayList<>(compiledScriptMap.keySet());
}
@Override
@@ -68,4 +85,12 @@ public abstract class JSR223ScriptExecutor extends ScriptExecutor {
compiledScriptMap.clear();
}
@Override
public Object compile(String script) throws ScriptException {
if(scriptEngine == null) {
LOG.error("script engine has not init");
}
return ((Compilable) scriptEngine).compile(convertScript(script));
}
}

View File

@@ -0,0 +1,104 @@
package com.yomahub.liteflow.script.validator;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder;
import com.yomahub.liteflow.enums.ScriptTypeEnum;
import com.yomahub.liteflow.log.LFLog;
import com.yomahub.liteflow.log.LFLoggerManager;
import com.yomahub.liteflow.script.ScriptExecutor;
import java.util.*;
/**
* 脚本验证类
*
* @author Ge_Zuao
* @since 2.12.0
*/
public class ScriptValidator {
private static final LFLog LOG = LFLoggerManager.getLogger(ScriptValidator.class);
private static Map<ScriptTypeEnum, ScriptExecutor> scriptExecutors;
static {
List<ScriptExecutor> scriptExecutorList = new ArrayList<>();
scriptExecutors = new HashMap<>();
ServiceLoader.load(ScriptExecutor.class).forEach(scriptExecutorList::add);
scriptExecutorList.stream()
.peek(ScriptExecutor::init)
.forEach(scriptExecutor -> scriptExecutors.put(scriptExecutor.scriptType(), scriptExecutor));
}
/**
* 验证脚本逻辑的公共部分
*
* @param script 脚本
* @param scriptType 脚本类型
* @return boolean
*/
private static boolean validateScript(String script, ScriptTypeEnum scriptType){
// 未加载任何脚本模块
if(scriptExecutors.isEmpty()){
LOG.error("The loaded script modules not found.");
return false;
}
// 指定脚本语言未加载
if (scriptType != null && !scriptExecutors.containsKey(scriptType)) {
LOG.error(StrUtil.format("Specified script language {} was not found.", scriptType));
return false;
}
// 加载多个脚本语言需要指定语言验证
if (scriptExecutors.size() > 1 && scriptType == null) {
LOG.error("The loaded script modules more than 1. Please specify the script language.");
return false;
}
ScriptExecutor scriptExecutor = (scriptType != null) ? scriptExecutors.get(scriptType) : scriptExecutors.values().iterator().next();
try {
scriptExecutor.compile(script);
} catch (Exception e) {
LOG.error(StrUtil.format("{} Script component validate failure. ", scriptExecutor.scriptType()) + e.getMessage());
return false;
}
return true;
}
/**
* 只引入一种脚本语言时,可以不指定语言验证
*
* @param script 脚本
* @return boolean
*/
public static boolean validate(String script){
return validateScript(script, null);
}
/**
* 指定脚本语言验证
*
* @param script 脚本
* @param scriptType 脚本类型
* @return boolean
*/
public static boolean validate(String script, ScriptTypeEnum scriptType){
return validateScript(script, scriptType);
}
/**
* 多语言脚本批量验证
*
* @param scripts 脚本
* @return boolean
*/
public static boolean validate(Map<ScriptTypeEnum, String> scripts){
for(Map.Entry<ScriptTypeEnum, String> script : scripts.entrySet()){
if(!validate(script.getValue(), script.getKey())){
return false;
}
}
return true;
}
}

View File

@@ -7,10 +7,14 @@
*/
package com.yomahub.liteflow.slot;
import cn.hutool.core.lang.Tuple;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.BooleanUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.ReflectUtil;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.annotation.util.AnnoUtil;
import com.yomahub.liteflow.context.ContextBean;
import com.yomahub.liteflow.log.LFLog;
import com.yomahub.liteflow.log.LFLoggerManager;
import com.yomahub.liteflow.property.LiteflowConfig;
@@ -74,13 +78,22 @@ public class DataBus {
.map((Function<Class<?>, Object>) ReflectUtil::newInstanceIfPossible)
.collect(Collectors.toList());
Slot slot = new Slot(contextBeanList);
return offerIndex(slot);
return offerSlotByBean(contextBeanList);
}
public static int offerSlotByBean(List<Object> contextList) {
Slot slot = new Slot(contextList);
List<Tuple> contextBeanList = contextList.stream().map(object -> {
ContextBean contextBean = AnnoUtil.getAnnotation(object.getClass(), ContextBean.class);
String contextKey;
if (contextBean != null && StrUtil.isNotBlank(contextBean.value())){
contextKey = contextBean.value();
}else{
contextKey = StrUtil.lowerFirst(object.getClass().getSimpleName());
}
return new Tuple(contextKey, object);
}).collect(Collectors.toList());
Slot slot = new Slot(contextBeanList);
return offerIndex(slot);
}
@@ -128,7 +141,7 @@ public class DataBus {
return SLOTS.get(slotIndex);
}
public static List<Object> getContextBeanList(int slotIndex) {
public static List<Tuple> getContextBeanList(int slotIndex) {
Slot slot = getSlot(slotIndex);
return slot.getContextBeanList();
}

View File

@@ -9,8 +9,10 @@ package com.yomahub.liteflow.slot;
import cn.hutool.core.collection.ConcurrentHashSet;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.lang.Tuple;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.ttl.TransmittableThreadLocal;
import com.yomahub.liteflow.exception.NoSuchContextBeanException;
import com.yomahub.liteflow.exception.NullParamException;
import com.yomahub.liteflow.flow.element.Condition;
@@ -29,6 +31,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Predicate;
/**
* Slot的抽象类实现
@@ -90,14 +93,14 @@ public class Slot {
protected ConcurrentHashMap<String, Object> metaDataMap = new ConcurrentHashMap<>();
private List<Object> contextBeanList;
private List<Tuple> contextBeanList;
private static final ThreadLocal<Deque<Condition>> conditionStack = ThreadLocal.withInitial(LinkedList::new);
private static final TransmittableThreadLocal<Deque<Condition>> conditionStack = TransmittableThreadLocal.withInitial(ConcurrentLinkedDeque::new);
public Slot() {
}
public Slot(List<Object> contextBeanList) {
public Slot(List<Tuple> contextBeanList) {
this.contextBeanList = contextBeanList;
}
@@ -448,21 +451,30 @@ public class Slot {
metaDataMap.remove(SUB_EXCEPTION_PREFIX + chainId);
}
public List<Object> getContextBeanList() {
public List<Tuple> getContextBeanList() {
return this.contextBeanList;
}
public <T> T getContextBean(Class<T> contextBeanClazz) {
T t = (T) contextBeanList.stream().filter(o -> o.getClass().getName().equals(contextBeanClazz.getName())).findFirst().orElse(null);
if (t == null) {
Tuple contextTuple = contextBeanList.stream().filter(tuple -> tuple.get(1).getClass().getName().equals(contextBeanClazz.getName())).findFirst().orElse(null);
if (contextTuple == null) {
contextBeanList.forEach(o -> LOG.info("ChainId[{}], Context class:{},Request class:{}", this.getChainId(), o.getClass().getName(), contextBeanClazz.getName()));
throw new NoSuchContextBeanException("this type is not in the context type passed in");
}
return t;
return contextTuple.get(1);
}
public <T> T getContextBean(String contextBeanKey) {
Tuple contextTuple = contextBeanList.stream().filter(tuple -> tuple.get(0).equals(contextBeanKey)).findFirst().orElse(null);
if (contextTuple == null) {
contextBeanList.forEach(o -> LOG.info("ChainId[{}], Context class:{},Request contextBeanKey:{}", this.getChainId(), o.getClass().getName(), contextBeanKey));
throw new NoSuchContextBeanException("this context key is not defined");
}
return contextTuple.get(1);
}
public <T> T getFirstContextBean() {
Class<T> firstContextBeanClazz = (Class<T>) this.getContextBeanList().get(0).getClass();
Class<T> firstContextBeanClazz = (Class<T>) this.getContextBeanList().get(0).get(1).getClass();
return this.getContextBean(firstContextBeanClazz);
}

View File

@@ -3,7 +3,9 @@
<!ELEMENT flow ((nodes)? , (chain)+)>
<!ELEMENT nodes (node)+>
<!ELEMENT node (#PCDATA | EMPTY)*>
<!ELEMENT chain (#PCDATA)>
<!ELEMENT chain ((route)? | (body)? | #PCDATA)>
<!ELEMENT route (#PCDATA)>
<!ELEMENT body (#PCDATA)>
<!ATTLIST node
id CDATA #REQUIRED