Merge remote-tracking branch 'upstream/dev' into dev_rule_cache_2

This commit is contained in:
DaleLee
2024-12-05 22:23:13 +08:00
25 changed files with 203 additions and 51 deletions

View File

@@ -65,6 +65,8 @@ public abstract class MaxWaitTimeOperator extends BaseOperator<Condition> {
timeoutCondition.addExecutable(executable);
timeoutCondition.setMaxWaitTime(maxWaitTime);
timeoutCondition.setMaxWaitTimeUnit(getMaxWaitTimeUnit());
timeoutCondition.setId(executable.getId());
timeoutCondition.setTag(executable.getTag());
return timeoutCondition;
}

View File

@@ -159,6 +159,8 @@ public class OperatorHelper {
/**
* 所谓Boolean item指的是那些最终的结果值为布尔类型的Item
* 布尔类型的items有ifwhilebreak类型的Node以及AndOrCondition以及NotCondition
* @param object 检查的对象
* @throws Exception 检查过程中抛的错
*/
public static void checkObjMustBeBooleanTypeItem(Object object) throws Exception{
if (!(object instanceof Executable)){

View File

@@ -484,20 +484,10 @@ public class FlowExecutor {
catch (Exception e) {
if (ObjectUtil.isNotNull(chain)) {
String errMsg = StrUtil.format("chain[{}] execute error on slot[{}]", chain.getChainId(), slotIndex);
if (BooleanUtil.isTrue(liteflowConfig.getPrintExecutionLog())) {
LOG.error(errMsg, e);
}
else {
LOG.error(errMsg);
}
LOG.error(errMsg, e);
}
else {
if (BooleanUtil.isTrue(liteflowConfig.getPrintExecutionLog())) {
LOG.error(e.getMessage(), e);
}
else {
LOG.error(e.getMessage());
}
LOG.error(e.getMessage(), e);
}
// 如果是正常流程需要把异常设置到slot的exception属性里

View File

@@ -406,6 +406,9 @@ public abstract class NodeComponent{
}
public void removeRefNode() {
if (this.refNodeStackTL.get() == null){
return;
}
if (this.refNodeStackTL.get().size() > 1) {
this.refNodeStackTL.get().pop();
}else{

View File

@@ -20,6 +20,7 @@ import com.yomahub.liteflow.core.ScriptComponent;
import com.yomahub.liteflow.core.proxy.DeclWarpBean;
import com.yomahub.liteflow.enums.FlowParserTypeEnum;
import com.yomahub.liteflow.enums.NodeTypeEnum;
import com.yomahub.liteflow.enums.ParseModeEnum;
import com.yomahub.liteflow.exception.ComponentCannotRegisterException;
import com.yomahub.liteflow.exception.NullNodeTypeException;
import com.yomahub.liteflow.flow.element.Chain;
@@ -53,6 +54,7 @@ import java.util.stream.Stream;
*
* @author Bryan.Zhang
* @author DaleLee
* @author Jay li
*/
public class FlowBus {
@@ -130,7 +132,9 @@ public class FlowBus {
/**
* 添加已托管的节点Spring、Solon 管理的节点)
* */
* @param nodeId nodeId
* @param nodeComponent nodeComponent
*/
public static void addManagedNode(String nodeId, NodeComponent nodeComponent) {
// 根据class来猜测类型
NodeTypeEnum type = NodeTypeEnum.guessType(nodeComponent.getClass());
@@ -180,12 +184,38 @@ public class FlowBus {
* @param name 节点名称
* @param nodeType 节点类型
* @param script 脚本
* @param language 语言
*/
public static void addScriptNode(String nodeId, String name, NodeTypeEnum nodeType, String script,
String language) {
addNode(nodeId, name, nodeType, ScriptComponent.ScriptComponentClassMap.get(nodeType), script, language);
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
// 如果是PARSE_ONE_ON_FIRST_EXEC模式则不进行脚本加载而是直接把脚本内容放到node中
if (liteflowConfig.getParseMode().equals(ParseModeEnum.PARSE_ONE_ON_FIRST_EXEC)) {
Node node = new Node(nodeId, name, nodeType, script, language);
nodeMap.put(nodeId, node);
} else {
addScriptNodeAndCompile(nodeId, name, nodeType, script, language);
}
}
/**
* 添加脚本 node并且编译脚本
* @param nodeId nodeId
* @param name name
* @param type type
* @param script script content
* @param language language
* @return NodeComponent instance
*/
public static NodeComponent addScriptNodeAndCompile(String nodeId, String name, NodeTypeEnum type, String script,
String language) {
addNode(nodeId, name, type, ScriptComponent.ScriptComponentClassMap.get(type), script, language);
return nodeMap.get(nodeId).getInstance();
}
private static void addNode(String nodeId, String name, NodeTypeEnum type, Class<?> cmpClazz, String script,
String language) {
try {
@@ -245,6 +275,7 @@ public class FlowBus {
}
String activeNodeId = StrUtil.isEmpty(cmpInstance.getNodeId()) ? nodeId : cmpInstance.getNodeId();
node.setCompiled(true);
put2NodeMap(activeNodeId, node);
addFallbackNode(node);
}

View File

@@ -70,6 +70,7 @@ public class Chain implements Executable{
/**
* @deprecated 请使用{@link #getChainId()}
* @return chainId
*/
@Deprecated
public String getChainName() {
@@ -77,7 +78,7 @@ public class Chain implements Executable{
}
/**
* @param chainName
* @param chainName chainId
* @deprecated 请使用 {@link #setChainId(String)}
*/
public void setChainName(String chainName) {

View File

@@ -173,6 +173,7 @@ public abstract class Condition implements Executable{
/**
* 请使用 {@link #setCurrChainId(String)}
* @return currChainId
*/
@Deprecated
public String getCurrChainName() {

View File

@@ -18,7 +18,7 @@ public interface Executable{
ExecuteableTypeEnum getExecuteType();
/**
* @return
* @return executeName or id
* @deprecated 请使用 {@link #getId()}
*/
@Deprecated
@@ -27,7 +27,7 @@ public interface Executable{
}
/**
* @param currentChainName
* @param currentChainName currentChainName
* @deprecated 请使用 {@link #setCurrChainId(String)}
*/
default void setCurrChainName(String currentChainName) {

View File

@@ -22,13 +22,14 @@ import com.yomahub.liteflow.flow.executor.NodeExecutor;
import com.yomahub.liteflow.flow.executor.NodeExecutorHelper;
import com.yomahub.liteflow.log.LFLog;
import com.yomahub.liteflow.log.LFLoggerManager;
import com.yomahub.liteflow.slot.DataBus;
import com.yomahub.liteflow.slot.Slot;
import com.yomahub.liteflow.util.TupleOf2;
import java.util.Stack;
import java.util.concurrent.locks.ReentrantLock;
import static com.yomahub.liteflow.flow.FlowBus.*;
/**
* Node节点实现可执行器 Node节点并不是单例的每构建一次都会copy出一个新的实例
*
@@ -64,6 +65,8 @@ public class Node implements Executable, Cloneable, Rollbackable{
private String currChainId;
private boolean isCompiled = true;
// node 的 isAccess 结果,主要用于 WhenCondition 的提前 isAccess 判断,避免 isAccess 方法重复执行
private TransmittableThreadLocal<Boolean> accessResult = new TransmittableThreadLocal<>();
@@ -94,6 +97,17 @@ public class Node implements Executable, Cloneable, Rollbackable{
this.clazz = instance.getClass().getName();
}
public Node(String nodeId, String name, NodeTypeEnum nodeType, String script, String language) {
this.id = nodeId;
this.name = name;
this.type = nodeType;
this.script = script;
this.language = language;
this.isCompiled = false;
}
@Override
public String getId() {
return id;
@@ -139,6 +153,10 @@ public class Node implements Executable, Cloneable, Rollbackable{
}
public NodeComponent getInstance() {
// 没有编译的情况,需重新编译
if (!this.isCompiled()) {
this.instance = addScriptNodeAndCompile(id, name, type, script, language);
}
return instance;
}
@@ -150,7 +168,7 @@ public class Node implements Executable, Cloneable, Rollbackable{
// 所有的可执行节点其实最终都会落到node上来因为chain中包含的也是node
@Override
public void execute(Integer slotIndex) throws Exception {
if (ObjectUtil.isNull(instance)) {
if (ObjectUtil.isNull(getInstance())) {
throw new FlowSystemException("there is no instance for node id " + id);
}
@@ -212,12 +230,10 @@ public class Node implements Executable, Cloneable, Rollbackable{
// 回滚的主要逻辑
@Override
public void rollback(Integer slotIndex) throws Exception {
Slot slot = DataBus.getSlot(slotIndex);
try {
// 把线程属性赋值给组件对象
this.setSlotIndex(slotIndex);
instance.setRefNode(this);
getInstance().setRefNode(this);
instance.doRollback();
}
catch (Exception e) {
@@ -239,7 +255,7 @@ public class Node implements Executable, Cloneable, Rollbackable{
public boolean isAccess(Integer slotIndex) throws Exception {
// 把线程属性赋值给组件对象
this.setSlotIndex(slotIndex);
instance.setRefNode(this);
getInstance().setRefNode(this);
return instance.isAccess();
}
@@ -469,9 +485,17 @@ public class Node implements Executable, Cloneable, Rollbackable{
this.language = language;
}
public boolean isCompiled() {
return isCompiled;
}
public void setCompiled(boolean compiled) {
isCompiled = compiled;
}
@Override
public <T> T getItemResultMetaValue(Integer slotIndex) {
return instance.getItemResultMetaValue(slotIndex);
return getInstance().getItemResultMetaValue(slotIndex);
}
@Override

View File

@@ -21,6 +21,8 @@ public abstract class NodeExecutor {
/**
* 执行器执行入口-若需要更大维度的执行方式可以重写该方法
* @param instance instance
* @throws Exception 执行过程中抛的错
*/
public void execute(NodeComponent instance) throws Exception {
int retryCount = instance.getRetryCount();
@@ -54,6 +56,9 @@ public abstract class NodeExecutor {
/**
* 执行重试逻辑 - 子类通过实现该方法进行重试逻辑的控制
* @param instance instance
* @param currentRetryCount currentRetryCount
* @throws Exception 抛出重试执行过程中的错
*/
protected void retry(NodeComponent instance, int currentRetryCount) throws Exception {
Slot slot = DataBus.getSlot(instance.getSlotIndex());

View File

@@ -34,7 +34,7 @@ public class NodeExecutorHelper {
/**
* 获取帮助者的实例
* @return
* @return NodeExecutorHelper
*/
public static NodeExecutorHelper loadInstance() {
// 外围类能直接访问内部类(不管是否是静态的)的私有变量

View File

@@ -10,6 +10,7 @@ public class CompletableFutureExpand {
*
* @param timeout 在出现 TimeoutException 异常完成之前等待多长时间,以 {@code unit} 为单位
* @param unit 一个 {@link TimeUnit},结合 {@code timeout} 参数,表示给定粒度单位的持续时间
* @param timeoutDefaultObj timeoutDefaultObj
* @return 入参的 CompletableFuture
*/
public static <T> CompletableFuture<T> completeOnTimeout(CompletableFuture<T> future, long timeout, TimeUnit unit, T timeoutDefaultObj) {

View File

@@ -42,12 +42,12 @@ public abstract class ParallelStrategyExecutor {
/**
* 封装 CompletableFuture 对象
* @param executable
* @param parallelExecutor
* @param whenCondition
* @param currChainId
* @param slotIndex
* @return
* @param executable executable
* @param parallelExecutor parallelExecutor
* @param whenCondition whenCondition
* @param currChainId currChainId
* @param slotIndex slotIndex
* @return CompletableFuture
*/
protected CompletableFuture<WhenFutureObj> wrappedFutureObj(Executable executable, ExecutorService parallelExecutor,
WhenCondition whenCondition, String currChainId, Integer slotIndex) {
@@ -62,7 +62,7 @@ public abstract class ParallelStrategyExecutor {
/**
* 设置 WhenCondition 参数
* @param whenCondition
* @param whenCondition whenCondition
*/
protected void setWhenConditionParams(WhenCondition whenCondition) {
// 获得 liteflow 的参数
@@ -87,9 +87,9 @@ public abstract class ParallelStrategyExecutor {
/**
* 过滤 WHEN 待执行任务
* @param executableList 所有任务列表
* @param slotIndex
* @param slotIndex slotIndex
* @param currentChainId 当前执行的 chainId
* @return
* @return Executable的Stream对象
*/
protected Stream<Executable> filterWhenTaskList(List<Executable> executableList, Integer slotIndex, String currentChainId) {
// 1.先进行过滤,前置和后置组件过滤掉,因为在 EL Chain 处理的时候已经提出来了
@@ -120,9 +120,9 @@ public abstract class ParallelStrategyExecutor {
/**
* 获取所有任务 CompletableFuture 集合
* @param whenCondition
* @param slotIndex
* @return
* @param whenCondition whenCondition
* @param slotIndex slotIndex
* @return List
*/
protected List<CompletableFuture<WhenFutureObj>> getWhenAllTaskList(WhenCondition whenCondition, Integer slotIndex) {
@@ -152,7 +152,7 @@ public abstract class ParallelStrategyExecutor {
* @param slotIndex 当前 slot 的 index
* @param whenAllFutureList 并行组件中所有任务列表
* @param specifyTask 指定预先完成的任务,详见 {@link ParallelStrategyEnum}
* @throws Exception
* @throws Exception 处理过程中抛的异常
*/
protected void handleTaskResult(WhenCondition whenCondition, Integer slotIndex, List<CompletableFuture<WhenFutureObj>> whenAllFutureList,
CompletableFuture<?> specifyTask) throws Exception {

View File

@@ -64,7 +64,7 @@ public class ParallelStrategyHelper {
/**
* 默认需完成所有任务
* @return
* @return ParallelStrategyExecutor
*/
public ParallelStrategyExecutor buildParallelExecutor() {
return buildParallelExecutor(ParallelStrategyEnum.ALL);

View File

@@ -373,6 +373,8 @@ public class LFLog implements Logger{
public void error(String s, Throwable throwable) {
if (isPrint()) {
this.log.error(getRId() + s, throwable);
}else{
this.log.error(getRId() + s);
}
}

View File

@@ -53,7 +53,7 @@ public class MonitorFile {
/**
* 创建文件监听
*/
*/
public void create() throws Exception {
for (String path : PATH_SET) {
long interval = TimeUnit.MILLISECONDS.toMillis(2);

View File

@@ -382,9 +382,7 @@ public class Slot {
if (ObjectUtil.isNull(this.executeStepsStr)) {
this.executeStepsStr = getExecuteStepStr(true);
}
if (LiteflowConfigGetter.get().getPrintExecutionLog()) {
LOG.info("CHAIN_NAME[{}]\n{}", this.getChainName(), this.executeStepsStr);
}
LOG.info("CHAIN_NAME[{}]\n{}", this.getChainName(), this.executeStepsStr);
}
public void addRollbackStep(CmpStep step) {
@@ -418,9 +416,7 @@ public class Slot {
if (ObjectUtil.isNull(this.rollbackStepsStr)) {
this.rollbackStepsStr = getRollbackStepStr(true);
}
if (LiteflowConfigGetter.get().getPrintExecutionLog()) {
LOG.info("ROLLBACK_CHAIN_NAME[{}]\n{}", this.getChainName(), this.rollbackStepsStr);
}
LOG.info("ROLLBACK_CHAIN_NAME[{}]\n{}", this.getChainName(), this.rollbackStepsStr);
}

View File

@@ -6,7 +6,7 @@ package com.yomahub.liteflow.thread.ExecutorCondition;
* <p>Description: 执行器条件对象</p>
*
* @author jason
* @Date 2024/11/12
* @since 2.13.0
*/
public class ExecutorCondition {
private final boolean conditionLevel;

View File

@@ -15,13 +15,18 @@ import com.yomahub.liteflow.property.LiteflowConfig;
* <p>Description: 执行器构建对象</p>
*
* @author jason
* @Date 2024/11/12
* @since 2.13.0
*/
public class ExecutorConditionBuilder {
/**
* 构建执行器条件
* @param condition condition
* @param chain chain
* @param liteflowConfig liteflowConfig
* @param type type
* @return ExecutorCondition
*/
public static ExecutorCondition buildExecutorCondition(
Condition condition,

View File

@@ -0,0 +1,72 @@
package com.yomahub.liteflow.test.script.qlexpress;
import cn.hutool.core.io.resource.ResourceUtil;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.enums.FlowParserTypeEnum;
import com.yomahub.liteflow.flow.FlowBus;
import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.flow.element.Node;
import com.yomahub.liteflow.flow.entity.CmpStep;
import com.yomahub.liteflow.slot.DefaultContext;
import com.yomahub.liteflow.test.BaseTest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import javax.annotation.Resource;
import java.util.*;
import java.util.stream.Collectors;
import static com.yomahub.liteflow.enums.NodeTypeEnum.SCRIPT;
/**
* PARSE_ONE_ON_FIRST_EXEC 第一次执行时解析脚本节点
* 测试springboot下的脚本组件
*
* @author jay li
* @since 2.12.4
*/
@ExtendWith(SpringExtension.class)
@TestPropertySource(value = "classpath:/xml-script/application-parse-first.properties")
@SpringBootTest(classes = LiteflowXmlScriptQLExpressELParseModeTest.class)
@EnableAutoConfiguration
@ComponentScan({ "com.yomahub.liteflow.test.script.qlexpress.cmp" })
public class LiteflowXmlScriptQLExpressELParseModeTest extends BaseTest {
@Resource
private FlowExecutor flowExecutor;
// 测试启动时未加载,执行后加载
@Test
public void testScript() {
Map<String, Node> nodeMap = FlowBus.getNodeMap();
for (Map.Entry<String, Node> entry : nodeMap.entrySet()) {
Node node = entry.getValue();
if (SCRIPT.equals(node.getType())) {
Assertions.assertFalse(node.isCompiled());
}
}
LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg");
DefaultContext context = response.getFirstContextBean();
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals(Integer.valueOf(6), context.getData("s1"));
// 验证脚本节点是否被重新编译
Set<String> nodeIds = response.getExecuteStepQueue().stream().map(CmpStep::getNodeId).collect(Collectors.toSet());
nodeMap = FlowBus.getNodeMap();
for (Map.Entry<String, Node> entry : nodeMap.entrySet()) {
Node node = entry.getValue();
if (SCRIPT.equals(node.getType()) && nodeIds.contains(node.getId())) {
Assertions.assertTrue(node.isCompiled());
Assertions.assertNotNull(node.getInstance());
}
}
}
}

View File

@@ -0,0 +1,2 @@
liteflow.rule-source=xml-script/flow.el.xml
liteflow.parse-mode=PARSE_ONE_ON_FIRST_EXEC

View File

@@ -111,6 +111,11 @@ public class MaxWaitSecondsELSpringbootTest extends BaseTest {
assertNotTimeout("switch2");
}
@Test
public void testSwitch3() {
assertNotTimeout("switch3");
}
// 测试 IF 的超时情况
@Test
public void testIf1() {

View File

@@ -62,6 +62,11 @@
SWITCH(s).TO(a, b).maxWaitSeconds(3);
</chain>
<chain name="switch3">
<!-- 不超时 -->
SWITCH(s).TO(a.maxWaitSeconds(3), b.maxWaitSeconds(2));
</chain>
<!-- 条件编排测试 -->
<!-- f 返回 true -->
<chain name="if1">

View File

@@ -80,6 +80,12 @@
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-jdbc</artifactId>
<version>${shardingsphere.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-test-util</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>

View File

@@ -39,7 +39,7 @@
</scm>
<properties>
<revision>2.12.4.2</revision>
<revision>2.13.0</revision>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<maven.compiler.source>8</maven.compiler.source>
@@ -380,7 +380,6 @@
<artifactId>maven-site-plugin</artifactId>
<version>3.7.1</version>
</plugin>
<!-- Javadoc -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>