diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/DataBus.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/DataBus.java index f49b0119c..95f37657e 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/DataBus.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/DataBus.java @@ -13,7 +13,9 @@ import com.yomahub.liteflow.property.LiteflowConfigGetter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.stream.Collectors; @@ -29,22 +31,48 @@ public class DataBus { public static AtomicInteger OCCUPY_COUNT = new AtomicInteger(0); - private static final AtomicReferenceArray SLOTS; + //这里为什么采用ConcurrentHashMap作为slot存放的容器? + //因为ConcurrentHashMap的随机取值复杂度也和数组一样为O(1),并且没有并发问题,还有自动扩容的功能 + //用数组的话,扩容涉及copy,线程安全问题还要自己处理 + private static final ConcurrentHashMap SLOTS; private static final ConcurrentLinkedQueue QUEUE; + //当前slot的下标index的最大值 + private static Integer currentIndexMaxValue; + static { LiteflowConfig liteflowConfig = LiteflowConfigGetter.get(); - int slotSize = liteflowConfig.getSlotSize(); - SLOTS = new AtomicReferenceArray<>(slotSize); - QUEUE = IntStream.range(0, slotSize).boxed().collect(Collectors.toCollection(ConcurrentLinkedQueue::new)); + currentIndexMaxValue = liteflowConfig.getSlotSize(); + + SLOTS = new ConcurrentHashMap<>(); + QUEUE = IntStream.range(0, currentIndexMaxValue).boxed().collect(Collectors.toCollection(ConcurrentLinkedQueue::new)); } public static int offerSlot(Class slotClazz) { try { Slot slot = slotClazz.newInstance(); Integer slotIndex = QUEUE.poll(); - if (ObjectUtil.isNotNull(slotIndex) && SLOTS.compareAndSet(slotIndex, null, slot)) { + + if (ObjectUtil.isNull(slotIndex)){ + //只有在扩容的时候需要用到synchronized重量级锁 + //扩一次容,增强原来size的0.75,因为初始slot容量为1024,从某种层面来说,即便并发很大。但是扩容的次数不会很多。 + //因为单个机器的tps上限总归是有一个极限的,不可能无限制的增长。 + synchronized (DataBus.class){ + //在扩容的一刹那,去竞争这个锁的线程还是有一些,所以获得这个锁的线程这里要再次取一次。如果为null,再真正扩容 + slotIndex = QUEUE.poll(); + if (ObjectUtil.isNull(slotIndex)){ + int nextMaxIndex = (int) Math.round(currentIndexMaxValue * 1.75); + QUEUE.addAll(IntStream.range(currentIndexMaxValue, nextMaxIndex).boxed().collect(Collectors.toCollection(ConcurrentLinkedQueue::new))); + currentIndexMaxValue = nextMaxIndex; + //扩容好,从队列里再取出扩容好的index + slotIndex = QUEUE.poll(); + } + } + } + + if (ObjectUtil.isNotNull(slotIndex)) { + SLOTS.put(slotIndex, slot); OCCUPY_COUNT.incrementAndGet(); return slotIndex; } @@ -63,7 +91,7 @@ public class DataBus { public static void releaseSlot(int slotIndex){ if(ObjectUtil.isNotNull(SLOTS.get(slotIndex))){ LOG.info("[{}]:slot[{}] released",SLOTS.get(slotIndex).getRequestId(),slotIndex); - SLOTS.set(slotIndex, null); + SLOTS.remove(slotIndex); QUEUE.add(slotIndex); OCCUPY_COUNT.decrementAndGet(); }else{ diff --git a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/resizeSlot/ResizeSlotSpringbootTest.java b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/resizeSlot/ResizeSlotSpringbootTest.java new file mode 100644 index 000000000..e5f379a1f --- /dev/null +++ b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/resizeSlot/ResizeSlotSpringbootTest.java @@ -0,0 +1,52 @@ +package com.yomahub.liteflow.test.resizeSlot; + +import com.yomahub.liteflow.core.FlowExecutor; +import com.yomahub.liteflow.entity.data.DefaultSlot; +import com.yomahub.liteflow.entity.data.LiteflowResponse; +import com.yomahub.liteflow.test.BaseTest; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.junit4.SpringRunner; + +import javax.annotation.Resource; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.*; +import java.util.function.Consumer; + +/** + * springboot环境下slot扩容测试 + * @author Bryan.Zhang + * @since 2.5.0 + */ +@RunWith(SpringRunner.class) +@TestPropertySource(value = "classpath:/resizeSlot/application.properties") +@SpringBootTest(classes = ResizeSlotSpringbootTest.class) +@EnableAutoConfiguration +@ComponentScan({"com.yomahub.liteflow.test.resizeSlot.cmp"}) +public class ResizeSlotSpringbootTest extends BaseTest { + + @Resource + private FlowExecutor flowExecutor; + + @Test + public void testSpringboot() throws Exception{ + ExecutorService pool = Executors.newCachedThreadPool(); + + List>> futureList = new ArrayList<>(); + for (int i = 0; i < 500; i++) { + Future> future = pool.submit(() -> flowExecutor.execute2Resp("chain1", "arg")); + futureList.add(future); + } + + for(Future> future : futureList){ + Assert.assertTrue(future.get().isSuccess()); + } + System.out.println("success"); + } +} diff --git a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/resizeSlot/cmp/ACmp.java b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/resizeSlot/cmp/ACmp.java new file mode 100644 index 000000000..4f466e1c5 --- /dev/null +++ b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/resizeSlot/cmp/ACmp.java @@ -0,0 +1,20 @@ +/** + *

Title: liteflow

+ *

Description: 轻量级的组件式流程框架

+ * @author Bryan.Zhang + * @email weenyc31@163.com + * @Date 2020/4/1 + */ +package com.yomahub.liteflow.test.resizeSlot.cmp; + +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +@Component("a") +public class ACmp extends NodeComponent { + + @Override + public void process() { + System.out.println("ACmp executed!"); + } +} diff --git a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/resizeSlot/cmp/BCmp.java b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/resizeSlot/cmp/BCmp.java new file mode 100644 index 000000000..0c76deae4 --- /dev/null +++ b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/resizeSlot/cmp/BCmp.java @@ -0,0 +1,21 @@ +/** + *

Title: liteflow

+ *

Description: 轻量级的组件式流程框架

+ * @author Bryan.Zhang + * @email weenyc31@163.com + * @Date 2020/4/1 + */ +package com.yomahub.liteflow.test.resizeSlot.cmp; + +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +@Component("b") +public class BCmp extends NodeComponent { + + @Override + public void process() { + System.out.println("BCmp executed!"); + } + +} diff --git a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/resizeSlot/cmp/CCmp.java b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/resizeSlot/cmp/CCmp.java new file mode 100644 index 000000000..7dc427c11 --- /dev/null +++ b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/resizeSlot/cmp/CCmp.java @@ -0,0 +1,21 @@ +/** + *

Title: liteflow

+ *

Description: 轻量级的组件式流程框架

+ * @author Bryan.Zhang + * @email weenyc31@163.com + * @Date 2020/4/1 + */ +package com.yomahub.liteflow.test.resizeSlot.cmp; + +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +@Component("c") +public class CCmp extends NodeComponent { + + @Override + public void process() { + System.out.println("CCmp executed!"); + } + +} diff --git a/liteflow-testcase-springboot/src/test/resources/resizeSlot/application.properties b/liteflow-testcase-springboot/src/test/resources/resizeSlot/application.properties new file mode 100644 index 000000000..6bea4bd1c --- /dev/null +++ b/liteflow-testcase-springboot/src/test/resources/resizeSlot/application.properties @@ -0,0 +1,2 @@ +liteflow.rule-source=resizeSlot/flow.xml +liteflow.slot-size=4 \ No newline at end of file diff --git a/liteflow-testcase-springboot/src/test/resources/resizeSlot/flow.xml b/liteflow-testcase-springboot/src/test/resources/resizeSlot/flow.xml new file mode 100644 index 000000000..22870d94f --- /dev/null +++ b/liteflow-testcase-springboot/src/test/resources/resizeSlot/flow.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file