enhancement #ICANTH 隐式子流程改版

This commit is contained in:
everywhere.z
2025-08-20 18:10:41 +08:00
parent 7ac6799681
commit fba92dace1
15 changed files with 307 additions and 19 deletions

View File

@@ -496,7 +496,7 @@ public class FlowExecutor {
LOG.info("slot[{}] offered", slotIndex);
if (ObjectUtil.isNotNull(param)) {
slot.setRequestData(param);
slot.setChainReqData(chainId, param);
}
Chain chain = null;

View File

@@ -8,6 +8,7 @@
package com.yomahub.liteflow.core;
import cn.hutool.core.date.StopWatch;
import cn.hutool.core.lang.Tuple;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.ReUtil;
import cn.hutool.core.util.StrUtil;
@@ -36,6 +37,9 @@ import java.lang.reflect.Method;
import java.util.Date;
import java.util.List;
import java.util.Stack;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* 普通组件抽象类
@@ -347,15 +351,7 @@ public abstract class NodeComponent{
}
public <T> T getRequestData() {
return getSlot().getRequestData();
}
public <T> T getSubChainReqData() {
return getSlot().getChainReqData(this.getCurrChainId());
}
public <T> T getSubChainReqDataInAsync() {
return getSlot().getChainReqDataFromQueue(this.getCurrChainId());
return getSlot().getChainReqData(getCurrChainId());
}
public boolean isRollback() {
@@ -535,4 +531,19 @@ public abstract class NodeComponent{
return originalClass.getName();
}
public LiteflowResponse invoke2Resp(String chainId, Object requestData){
return invoke2Resp(chainId, requestData, this.getSlot());
}
public LiteflowResponse invoke2Resp(String chainId, Object requestData, Slot slot){
LiteflowResponse response = FlowExecutorHolder.loadInstance().execute2RespWithRid(chainId,
requestData,
slot.getRequestId(),
slot.getContextBeanList().stream().map(tuple -> tuple.get(1)).toArray());
response.getExecuteStepQueue().forEach(slot::addStep);
return response;
}
}

View File

@@ -77,7 +77,7 @@ public abstract class ScriptExecutor {
// 在元数据里放入主Chain的流程参数
Slot slot = DataBus.getSlot(wrap.getSlotIndex());
metaMap.put("requestData", slot.getRequestData());
metaMap.put("requestData", wrap.cmp.getRequestData());
// 如果有隐式流程,则放入隐式流程的流程参数
Object subRequestData = slot.getChainReqData(wrap.getCurrChainId());

View File

@@ -159,14 +159,6 @@ public class Slot {
putMetaDataMap(NODE_OUTPUT_PREFIX + nodeId, t);
}
public <T> T getRequestData() {
return (T) metaDataMap.get(REQUEST);
}
public <T> void setRequestData(T t) {
putMetaDataMap(REQUEST, t);
}
public <T> T getResponseData() {
return (T) metaDataMap.get(RESPONSE);
}

View File

@@ -0,0 +1,70 @@
package com.yomahub.liteflow.test.implicitChain;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.slot.DefaultContext;
import com.yomahub.liteflow.test.BaseTest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.test.context.TestPropertySource;
import javax.annotation.Resource;
import java.util.HashSet;
import java.util.Set;
/**
* 测试隐式调用子流程 单元测试
*
* @author justin.xu
*/
@TestPropertySource(value = "classpath:/implicitChain/application.properties")
@SpringBootTest(classes = ImplicitChainELSpringbootTest.class)
@EnableAutoConfiguration
@ComponentScan({ "com.yomahub.liteflow.test.implicitChain.cmp" })
public class ImplicitChainELSpringbootTest extends BaseTest {
@Resource
private FlowExecutor flowExecutor;
public static final Set<String> RUN_TIME_SLOT = new HashSet<>();
// 这里GCmp中隐式的调用chain4从而执行了hm
@Test
public void testImplicitSubFlow1() {
LiteflowResponse response = flowExecutor.execute2Resp("chain3", "it's a request");
DefaultContext context = response.getFirstContextBean();
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("f==>g==>h==>m", response.getExecuteStepStr());
// 传递了slotIndex则set的size==1
Assertions.assertEquals(1, RUN_TIME_SLOT.size());
// set中第一次设置的requestId和response中的requestId一致
Assertions.assertTrue(RUN_TIME_SLOT.contains(response.getSlot().getRequestId()));
// requestData的取值正确
Assertions.assertEquals("it's implicit subflow.", context.getData("innerRequest"));
}
// 在p里多线程调用q 10次每个q取到的参数都是不同的。
@Test
public void testImplicitSubFlow2() {
LiteflowResponse response = flowExecutor.execute2Resp("c1", "it's a request");
DefaultContext context = response.getFirstContextBean();
Assertions.assertTrue(response.isSuccess());
Set<String> set = context.getData("test");
// requestData的取值正确
Assertions.assertEquals(10, set.size());
}
@Test
public void testImplicitSubFlow3() {
LiteflowResponse response = flowExecutor.execute2Resp("chain_r", "it's a request");
Assertions.assertTrue(response.isSuccess());
}
}

View File

@@ -0,0 +1,22 @@
package com.yomahub.liteflow.test.implicitChain.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.slot.DefaultContext;
import com.yomahub.liteflow.test.implicitChain.ImplicitChainELSpringbootTest;
import org.springframework.stereotype.Component;
@Component("f")
public class FCmp extends NodeComponent {
@Override
public void process() throws Exception {
ImplicitChainELSpringbootTest.RUN_TIME_SLOT.add(this.getSlot().getRequestId());
DefaultContext context = this.getFirstContextBean();
context.setData("innerRequestData", "inner request");
System.out.println("Fcomp executed!");
}
}

View File

@@ -0,0 +1,26 @@
package com.yomahub.liteflow.test.implicitChain.cmp;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.test.implicitChain.ImplicitChainELSpringbootTest;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component("g")
public class GCmp extends NodeComponent {
@Autowired
private FlowExecutor flowExecutor;
@Override
public void process() throws Exception {
ImplicitChainELSpringbootTest.RUN_TIME_SLOT.add(this.getSlot().getRequestId());
System.out.println("Gcmp executed!");
this.invoke2Resp("chain4", "it's implicit subflow.");
}
}

View File

@@ -0,0 +1,23 @@
package com.yomahub.liteflow.test.implicitChain.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.slot.DefaultContext;
import com.yomahub.liteflow.test.implicitChain.ImplicitChainELSpringbootTest;
import org.springframework.stereotype.Component;
@Component("h")
public class HCmp extends NodeComponent {
@Override
public void process() throws Exception {
String requestData = this.getRequestData();
DefaultContext context = this.getFirstContextBean();
context.setData("innerRequest", requestData);
ImplicitChainELSpringbootTest.RUN_TIME_SLOT.add(this.getSlot().getRequestId());
System.out.println("Hcomp executed!");
}
}

View File

@@ -0,0 +1,19 @@
package com.yomahub.liteflow.test.implicitChain.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.test.implicitChain.ImplicitChainELSpringbootTest;
import org.springframework.stereotype.Component;
@Component("m")
public class MCmp extends NodeComponent {
@Override
public void process() throws Exception {
ImplicitChainELSpringbootTest.RUN_TIME_SLOT.add(this.getSlot().getRequestId());
System.out.println("Mcomp executed!");
}
}

View File

@@ -0,0 +1,39 @@
package com.yomahub.liteflow.test.implicitChain.cmp;
import com.alibaba.ttl.threadpool.TtlExecutors;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.slot.Slot;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Component("p")
public class PCmp extends NodeComponent {
private ExecutorService executorService = TtlExecutors.getTtlExecutorService(Executors.newCachedThreadPool());
@Override
public void process() throws Exception {
List<CompletableFuture<Void>> fList = new ArrayList<>();
Slot slot = this.getSlot();
for (int i = 0; i < 10; i++) {
int finalI = i;
CompletableFuture<Void> future = CompletableFuture.runAsync(
() -> invoke2Resp("c2", "it's implicit subflow " + finalI, slot), executorService
);
fList.add(future);
}
CompletableFuture.allOf(fList.toArray(new CompletableFuture[0])).get();
}
}

View File

@@ -0,0 +1,31 @@
package com.yomahub.liteflow.test.implicitChain.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.slot.DefaultContext;
import org.springframework.stereotype.Component;
import java.util.HashSet;
import java.util.Set;
@Component("q")
public class QCmp extends NodeComponent {
@Override
public void process() throws Exception {
String requestData = this.getRequestData();
DefaultContext context = this.getFirstContextBean();
synchronized (QCmp.class) {
if (context.hasData("test")) {
Set<String> set = context.getData("test");
set.add(requestData);
}
else {
Set<String> set = new HashSet<>();
set.add(requestData);
context.setData("test", set);
}
}
}
}

View File

@@ -0,0 +1,14 @@
package com.yomahub.liteflow.test.implicitChain.cmp;
import com.yomahub.liteflow.annotation.LiteflowComponent;
import com.yomahub.liteflow.core.NodeComponent;
@LiteflowComponent("r")
public class R extends NodeComponent {
@Override
public void process() throws Exception {
this.invoke2Resp("chain_s", "");
}
}

View File

@@ -0,0 +1,14 @@
package com.yomahub.liteflow.test.implicitChain.cmp;
import com.yomahub.liteflow.annotation.LiteflowComponent;
import com.yomahub.liteflow.core.NodeComponent;
@LiteflowComponent("s")
public class S extends NodeComponent {
@Override
public void process() throws Exception {
throw new RuntimeException("test");
}
}

View File

@@ -0,0 +1 @@
liteflow.rule-source=implicitChain/flow.xml

View File

@@ -0,0 +1,26 @@
<?xml version="1.0" encoding="UTF-8"?>
<flow>
<chain name="chain3">
THEN(f, g);
</chain>
<chain name="chain4">
THEN(h, m);
</chain>
<chain name="c1">
THEN(p);
</chain>
<chain name="c2">
THEN(q);
</chain>
<chain name="chain_r">
THEN(r);
</chain>
<chain name="chain_s">
THEN(s);
</chain>
</flow>