This commit is contained in:
daiqi
2022-01-18 01:19:28 +08:00
parent e4819f17cb
commit a58e3da8db
6 changed files with 79 additions and 21 deletions

View File

@@ -13,6 +13,7 @@ import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.ReUtil; import cn.hutool.core.util.ReUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.yomahub.liteflow.entity.flow.Node;
import com.yomahub.liteflow.enums.FlowParserTypeEnum; import com.yomahub.liteflow.enums.FlowParserTypeEnum;
import com.yomahub.liteflow.exception.*; import com.yomahub.liteflow.exception.*;
import com.yomahub.liteflow.parser.*; import com.yomahub.liteflow.parser.*;
@@ -266,6 +267,11 @@ public class FlowExecutor {
this.execute(chainId, param, slotClazz, slotIndex, true); this.execute(chainId, param, slotClazz, slotIndex, true);
} }
public <T extends Slot> void invoke(String nodeId, Integer slotIndex) throws Exception {
Node node = FlowBus.getNode(nodeId);
node.execute(slotIndex);
}
public DefaultSlot execute(String chainId) throws Exception { public DefaultSlot execute(String chainId) throws Exception {
return this.execute(chainId, null, DefaultSlot.class, null, false); return this.execute(chainId, null, DefaultSlot.class, null, false);
} }

View File

@@ -127,6 +127,15 @@ public abstract class AbsSlot implements Slot {
} }
} }
public <T> Queue<T> getPrivateDeliveryQueue(String nodeId){
String privateDKey = PRIVATE_DELIVERY_PREFIX + nodeId;
if(dataMap.containsKey(privateDKey)){
return (Queue<T>) dataMap.get(privateDKey);
}else{
return null;
}
}
public <T> T getPrivateDeliveryData(String nodeId){ public <T> T getPrivateDeliveryData(String nodeId){
String privateDKey = PRIVATE_DELIVERY_PREFIX + nodeId; String privateDKey = PRIVATE_DELIVERY_PREFIX + nodeId;
if(dataMap.containsKey(privateDKey)){ if(dataMap.containsKey(privateDKey)){

View File

@@ -8,23 +8,36 @@
package com.yomahub.liteflow.test.privateDelivery.cmp; package com.yomahub.liteflow.test.privateDelivery.cmp;
import com.yomahub.liteflow.annotation.LiteflowComponent; import com.yomahub.liteflow.annotation.LiteflowComponent;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.core.NodeComponent; import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.entity.data.DefaultSlot;
import com.yomahub.liteflow.entity.data.Slot; import com.yomahub.liteflow.entity.data.Slot;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.HashSet; import java.util.HashSet;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
@LiteflowComponent("a") @LiteflowComponent("a")
public class ACmp extends NodeComponent { public class ACmp extends NodeComponent {
@Autowired
private FlowExecutor flowExecutor;
@Override @Override
public void process() { public void process() {
System.out.println("ACmp executed!"); System.out.println("ACmp executed!");
Slot slot = getSlot(); Slot slot = getSlot();
slot.setData("testSet", new HashSet<>()); slot.setData("testSet", new HashSet<>());
for (int i = 0; i < 100; i++) { try {
this.sendPrivateDeliveryData("b",i+1); Queue<Integer> queue = new ConcurrentLinkedQueue<>();
for (int i = 1; i <= 100; i++) {
queue.add(i);
}
flowExecutor.execute2Resp("chain2", queue);
}catch (Exception e) {
e.printStackTrace();
} }
} }
} }

View File

@@ -1,26 +1,42 @@
/** /**
* <p>Title: liteflow</p> * <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p> * <p>Description: 轻量级的组件式流程框架</p>
*
* @author Bryan.Zhang * @author Bryan.Zhang
* @email weenyc31@163.com * @email weenyc31@163.com
* @Date 2020/4/1 * @Date 2020/4/1
*/ */
package com.yomahub.liteflow.test.privateDelivery.cmp; package com.yomahub.liteflow.test.privateDelivery.cmp;
import cn.hutool.core.collection.CollUtil;
import com.yomahub.liteflow.annotation.LiteflowComponent; import com.yomahub.liteflow.annotation.LiteflowComponent;
import com.yomahub.liteflow.core.NodeComponent; import com.yomahub.liteflow.core.NodeComponent;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.Queue;
import java.util.Set; import java.util.Set;
@LiteflowComponent("b") @LiteflowComponent("b")
public class BCmp extends NodeComponent { public class BCmp extends NodeComponent {
@Override @Override
public void process() { public boolean isAccess() {
System.out.println("BCmp executed!"); Queue<Integer> values = this.getSlot().getRequestData();
Integer value = this.getPrivateDeliveryData(); System.out.println("BCmp executed! values.size" + values.size());
Set<Integer> testSet = this.getSlot().getData("testSet"); if (CollUtil.isEmpty(values)) {
testSet.add(value); return false;
} }
Integer value = values.poll();
if (value == null) {
return false;
}
this.sendPrivateDeliveryData(this.getNodeId(), value);
return true;
}
@Override
public void process() {
Integer value = getPrivateDeliveryData();
System.out.println("BCmp executed!" + value);
}
} }

View File

@@ -0,0 +1,21 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.privateDelivery.cmp;
import com.yomahub.liteflow.annotation.LiteflowComponent;
import com.yomahub.liteflow.core.NodeComponent;
@LiteflowComponent("d")
public class DCmp extends NodeComponent {
@Override
public void process() {
System.out.println("CCmp executed!");
}
}

View File

@@ -2,17 +2,10 @@
<flow> <flow>
<chain name="chain1"> <chain name="chain1">
<then value="a"/> <then value="a"/>
<!-- 100个b组件并发 -->
<when value="b,b,b,b,b,b,b,b,b,b"/>
<when value="b,b,b,b,b,b,b,b,b,b"/>
<when value="b,b,b,b,b,b,b,b,b,b"/>
<when value="b,b,b,b,b,b,b,b,b,b"/>
<when value="b,b,b,b,b,b,b,b,b,b"/>
<when value="b,b,b,b,b,b,b,b,b,b"/>
<when value="b,b,b,b,b,b,b,b,b,b"/>
<when value="b,b,b,b,b,b,b,b,b,b"/>
<when value="b,b,b,b,b,b,b,b,b,b"/>
<when value="b,b,b,b,b,b,b,b,b,b"/>
<then value="c"/> <then value="c"/>
</chain> </chain>
<chain name="chain2">
<pre value="d"/>
<when value="b,b,b,b,b,b,b,b,b,b"/>
</chain>
</flow> </flow>