bug #I5F73R 隐式子流程如果是并发的,则初始参数在并发中会被覆盖的问题

This commit is contained in:
everywhere.z
2022-07-19 00:49:26 +08:00
parent 8d1b9e701e
commit 0409561d45
17 changed files with 350 additions and 6 deletions

View File

@@ -7,6 +7,7 @@
*/
package com.yomahub.liteflow.slot;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.util.ObjectUtil;
import com.yomahub.liteflow.exception.NoSuchContextBeanException;
import com.yomahub.liteflow.exception.NullParamException;
@@ -112,11 +113,23 @@ public class Slot{
}
public <T> T getChainReqData(String chainId) {
return (T) metaDataMap.get(CHAIN_REQ_PREFIX + chainId);
String key = CHAIN_REQ_PREFIX + chainId;
if (hasMetaData(key)){
Queue<Object> queue = (Queue<Object>) metaDataMap.get(key);
return (T)queue.poll();
}else{
return null;
}
}
public <T> void setChainReqData(String chainId, T t) {
putMetaDataMap(CHAIN_REQ_PREFIX + chainId, t);
String key = CHAIN_REQ_PREFIX + chainId;
if (hasMetaData(key)){
Queue<Object> queue = (Queue<Object>) metaDataMap.get(key);
queue.offer(t);
}else{
putMetaDataMap(key, new ConcurrentLinkedQueue<>(ListUtil.toList(t)));
}
}
public <T> void setPrivateDeliveryData(String nodeId, T t){

View File

@@ -36,7 +36,7 @@ public class ImplicitSubFlowELDeclSpringbootTest extends BaseTest {
//这里GCmp中隐式的调用chain4从而执行了hm
@Test
public void testImplicitSubFlow() {
public void testImplicitSubFlow1() {
LiteflowResponse response = flowExecutor.execute2Resp("chain3", "it's a request");
DefaultContext context = response.getFirstContextBean();
Assert.assertTrue(response.isSuccess());
@@ -49,4 +49,17 @@ public class ImplicitSubFlowELDeclSpringbootTest extends BaseTest {
//requestData的取值正确
Assert.assertEquals("it's implicit subflow.", context.getData("innerRequest"));
}
//在p里多线程调用q 10次每个q取到的参数都是不同的。
@Test
public void testImplicitSubFlow2() {
LiteflowResponse response = flowExecutor.execute2Resp("c1", "it's a request");
DefaultContext context = response.getFirstContextBean();
Assert.assertTrue(response.isSuccess());
Set<String> set = context.getData("test");
//requestData的取值正确
Assert.assertEquals(10, set.size());
}
}

View File

@@ -0,0 +1,35 @@
package com.yomahub.liteflow.test.subflow.cmp2;
import com.yomahub.liteflow.annotation.LiteflowCmpDefine;
import com.yomahub.liteflow.annotation.LiteflowMethod;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.enums.LiteFlowMethodEnum;
import com.yomahub.liteflow.flow.element.Node;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component("p")
@LiteflowCmpDefine
public class PCmp{
@Autowired
private FlowExecutor flowExecutor;
@LiteflowMethod(LiteFlowMethodEnum.PROCESS)
public void process(NodeComponent bindCmp) throws Exception {
int slotIndex = bindCmp.getSlotIndex();
for (int i = 0; i < 10; i++) {
int finalI = i;
new Thread(() -> {
try {
flowExecutor.invoke("c2", "it's implicit subflow " + finalI, slotIndex);
} catch (Exception e) {
throw new RuntimeException(e);
}
}).start();
}
Thread.sleep(1000);
}
}

View File

@@ -0,0 +1,33 @@
package com.yomahub.liteflow.test.subflow.cmp2;
import com.yomahub.liteflow.annotation.LiteflowCmpDefine;
import com.yomahub.liteflow.annotation.LiteflowMethod;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.enums.LiteFlowMethodEnum;
import com.yomahub.liteflow.slot.DefaultContext;
import org.springframework.stereotype.Component;
import java.util.HashSet;
import java.util.Set;
@Component("q")
@LiteflowCmpDefine
public class QCmp{
@LiteflowMethod(LiteFlowMethodEnum.PROCESS)
public void process(NodeComponent bindCmp) throws Exception {
String requestData = bindCmp.getSubChainReqData();
DefaultContext context = bindCmp.getFirstContextBean();
synchronized (QCmp.class){
if (context.hasData("test")){
Set<String> set = context.getData("test");
set.add(requestData);
}else{
Set<String> set = new HashSet<>();
set.add(requestData);
context.setData("test", set);
}
}
}
}

View File

@@ -7,4 +7,12 @@
<chain name="chain4">
THEN(h, m);
</chain>
<chain name="c1">
THEN(p);
</chain>
<chain name="c2">
THEN(q);
</chain>
</flow>

View File

@@ -34,7 +34,7 @@ public class ImplicitSubFlowTest extends BaseTest {
//这里GCmp中隐式的调用chain4从而执行了hm
@Test
public void testImplicitSubFlow() {
public void testImplicitSubFlow1() {
LiteflowResponse response = flowExecutor.execute2Resp("chain3", "it's a request");
DefaultContext context = response.getFirstContextBean();
Assert.assertTrue(response.isSuccess());
@@ -47,4 +47,17 @@ public class ImplicitSubFlowTest extends BaseTest {
//requestData的取值正确
Assert.assertEquals("it's implicit subflow.", context.getData("innerRequest"));
}
//在p里多线程调用q 10次每个q取到的参数都是不同的。
@Test
public void testImplicitSubFlow2() {
LiteflowResponse response = flowExecutor.execute2Resp("c1", "it's a request");
DefaultContext context = response.getFirstContextBean();
Assert.assertTrue(response.isSuccess());
Set<String> set = context.getData("test");
//requestData的取值正确
Assert.assertEquals(10, set.size());
}
}

View File

@@ -0,0 +1,27 @@
package com.yomahub.liteflow.test.subflow.cmp2;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.core.FlowExecutorHolder;
import com.yomahub.liteflow.core.NodeComponent;
public class PCmp extends NodeComponent {
private FlowExecutor flowExecutor = FlowExecutorHolder.loadInstance();
@Override
public void process() throws Exception {
int slotIndex = this.getSlotIndex();
for (int i = 0; i < 10; i++) {
int finalI = i;
new Thread(() -> {
try {
flowExecutor.invoke("c2", "it's implicit subflow " + finalI, slotIndex);
} catch (Exception e) {
throw new RuntimeException(e);
}
}).start();
}
Thread.sleep(1000);
}
}

View File

@@ -0,0 +1,27 @@
package com.yomahub.liteflow.test.subflow.cmp2;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.slot.DefaultContext;
import java.util.HashSet;
import java.util.Set;
public class QCmp extends NodeComponent {
@Override
public void process() throws Exception {
String requestData = this.getSubChainReqData();
DefaultContext context = this.getFirstContextBean();
synchronized (QCmp.class){
if (context.hasData("test")){
Set<String> set = context.getData("test");
set.add(requestData);
}else{
Set<String> set = new HashSet<>();
set.add(requestData);
context.setData("test", set);
}
}
}
}

View File

@@ -5,7 +5,10 @@
<node id="g" class="com.yomahub.liteflow.test.subflow.cmp2.GCmp"/>
<node id="h" class="com.yomahub.liteflow.test.subflow.cmp2.HCmp"/>
<node id="m" class="com.yomahub.liteflow.test.subflow.cmp2.MCmp"/>
<node id="p" class="com.yomahub.liteflow.test.subflow.cmp2.PCmp"/>
<node id="q" class="com.yomahub.liteflow.test.subflow.cmp2.QCmp"/>
</nodes>
<chain name="chain3">
THEN(f, g);
</chain>
@@ -13,4 +16,12 @@
<chain name="chain4">
THEN(h, m);
</chain>
<chain name="c1">
THEN(p);
</chain>
<chain name="c2">
THEN(q);
</chain>
</flow>

View File

@@ -36,7 +36,7 @@ public class ImplicitSubFlowELSpringbootTest extends BaseTest {
//这里GCmp中隐式的调用chain4从而执行了hm
@Test
public void testImplicitSubFlow() {
public void testImplicitSubFlow1() {
LiteflowResponse response = flowExecutor.execute2Resp("chain3", "it's a request");
DefaultContext context = response.getFirstContextBean();
Assert.assertTrue(response.isSuccess());
@@ -49,4 +49,17 @@ public class ImplicitSubFlowELSpringbootTest extends BaseTest {
//requestData的取值正确
Assert.assertEquals("it's implicit subflow.", context.getData("innerRequest"));
}
//在p里多线程调用q 10次每个q取到的参数都是不同的。
@Test
public void testImplicitSubFlow2() {
LiteflowResponse response = flowExecutor.execute2Resp("c1", "it's a request");
DefaultContext context = response.getFirstContextBean();
Assert.assertTrue(response.isSuccess());
Set<String> set = context.getData("test");
//requestData的取值正确
Assert.assertEquals(10, set.size());
}
}

View File

@@ -0,0 +1,32 @@
package com.yomahub.liteflow.test.subflow.cmp2;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.core.NodeComponent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import static com.yomahub.liteflow.test.subflow.ImplicitSubFlowELSpringbootTest.RUN_TIME_SLOT;
@Component("p")
public class PCmp extends NodeComponent {
@Autowired
private FlowExecutor flowExecutor;
@Override
public void process() throws Exception {
int slotIndex = this.getSlotIndex();
for (int i = 0; i < 10; i++) {
int finalI = i;
new Thread(() -> {
try {
flowExecutor.invoke("c2", "it's implicit subflow " + finalI, slotIndex);
} catch (Exception e) {
throw new RuntimeException(e);
}
}).start();
}
Thread.sleep(1000);
}
}

View File

@@ -0,0 +1,31 @@
package com.yomahub.liteflow.test.subflow.cmp2;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.slot.DefaultContext;
import org.springframework.stereotype.Component;
import java.util.HashSet;
import java.util.Set;
import static com.yomahub.liteflow.test.subflow.ImplicitSubFlowELSpringbootTest.RUN_TIME_SLOT;
@Component("q")
public class QCmp extends NodeComponent {
@Override
public void process() throws Exception {
String requestData = this.getSubChainReqData();
DefaultContext context = this.getFirstContextBean();
synchronized (QCmp.class){
if (context.hasData("test")){
Set<String> set = context.getData("test");
set.add(requestData);
}else{
Set<String> set = new HashSet<>();
set.add(requestData);
context.setData("test", set);
}
}
}
}

View File

@@ -7,4 +7,12 @@
<chain name="chain4">
THEN(h, m);
</chain>
<chain name="c1">
THEN(p);
</chain>
<chain name="c2">
THEN(q);
</chain>
</flow>

View File

@@ -30,7 +30,7 @@ public class ImplicitSubFlowELSpringTest extends BaseTest {
//这里GCmp中隐式的调用chain4从而执行了hm
@Test
public void testImplicitSubFlow() {
public void testImplicitSubFlow1() {
LiteflowResponse response = flowExecutor.execute2Resp("chain3", "it's a request");
DefaultContext context = response.getFirstContextBean();
Assert.assertTrue(response.isSuccess());
@@ -43,4 +43,17 @@ public class ImplicitSubFlowELSpringTest extends BaseTest {
//requestData的取值正确
Assert.assertEquals("it's implicit subflow.", context.getData("innerRequest"));
}
//在p里多线程调用q 10次每个q取到的参数都是不同的。
@Test
public void testImplicitSubFlow2() {
LiteflowResponse response = flowExecutor.execute2Resp("c1", "it's a request");
DefaultContext context = response.getFirstContextBean();
Assert.assertTrue(response.isSuccess());
Set<String> set = context.getData("test");
//requestData的取值正确
Assert.assertEquals(10, set.size());
}
}

View File

@@ -0,0 +1,30 @@
package com.yomahub.liteflow.test.subflow.cmp2;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.core.NodeComponent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component("p")
public class PCmp extends NodeComponent {
@Autowired
private FlowExecutor flowExecutor;
@Override
public void process() throws Exception {
int slotIndex = this.getSlotIndex();
for (int i = 0; i < 10; i++) {
int finalI = i;
new Thread(() -> {
try {
flowExecutor.invoke("c2", "it's implicit subflow " + finalI, slotIndex);
} catch (Exception e) {
throw new RuntimeException(e);
}
}).start();
}
Thread.sleep(1000);
}
}

View File

@@ -0,0 +1,29 @@
package com.yomahub.liteflow.test.subflow.cmp2;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.slot.DefaultContext;
import org.springframework.stereotype.Component;
import java.util.HashSet;
import java.util.Set;
@Component("q")
public class QCmp extends NodeComponent {
@Override
public void process() throws Exception {
String requestData = this.getSubChainReqData();
DefaultContext context = this.getFirstContextBean();
synchronized (QCmp.class){
if (context.hasData("test")){
Set<String> set = context.getData("test");
set.add(requestData);
}else{
Set<String> set = new HashSet<>();
set.add(requestData);
context.setData("test", set);
}
}
}
}

View File

@@ -7,4 +7,12 @@
<chain name="chain4">
THEN(h, m)
</chain>
<chain name="c1">
THEN(p);
</chain>
<chain name="c2">
THEN(q);
</chain>
</flow>