diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/DataBus.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/DataBus.java index 5e072fad4..ccc87bfe1 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/DataBus.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/DataBus.java @@ -7,15 +7,18 @@ */ package com.yomahub.liteflow.entity.data; -import java.util.concurrent.atomic.AtomicInteger; - import cn.hutool.core.util.ObjectUtil; -import com.yomahub.liteflow.exception.ConfigErrorException; import com.yomahub.liteflow.property.LiteflowConfig; import com.yomahub.liteflow.util.SpringAware; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + /** * 数据BUS,主要用来管理Slot,用以分配和回收 * @author Bryan.Zhang @@ -26,7 +29,9 @@ public class DataBus { public static AtomicInteger OCCUPY_COUNT = new AtomicInteger(0); - private static Slot[] slots; + private static AtomicReferenceArray SLOTS; + + private static ConcurrentLinkedQueue QUEUE; static { LiteflowConfig liteflowConfig = SpringAware.getBean(LiteflowConfig.class); @@ -36,20 +41,22 @@ public class DataBus { liteflowConfig = new LiteflowConfig(); } int slotSize = liteflowConfig.getSlotSize(); - slots = new Slot[slotSize]; + + SLOTS = new AtomicReferenceArray<>(slotSize); + + QUEUE = IntStream.range(0, slotSize - 1).boxed().collect(Collectors.toCollection(ConcurrentLinkedQueue::new)); } - public synchronized static int offerSlot(Class slotClazz){ - try{ - for(int i = 0; i < slots.length; i++){ - if(ObjectUtil.isNull(slots[i])){ - slots[i] = slotClazz.newInstance(); - OCCUPY_COUNT.incrementAndGet(); - return i; - } + public synchronized static int offerSlot(Class slotClazz) { + try { + Slot slot = slotClazz.newInstance(); + Integer slotIndex = QUEUE.poll(); + if (ObjectUtil.isNotNull(slotIndex) && SLOTS.compareAndSet(slotIndex, null, slot)) { + OCCUPY_COUNT.incrementAndGet(); + return slotIndex; } - }catch(Exception e){ - LOG.error("offer slot error",e); + } catch (Exception e) { + LOG.error("offer slot error", e); return -1; } return -1; @@ -57,13 +64,14 @@ public class DataBus { @SuppressWarnings("unchecked") public static T getSlot(int slotIndex){ - return (T)slots[slotIndex]; + return (T)SLOTS.get(slotIndex); } public static void releaseSlot(int slotIndex){ - if(ObjectUtil.isNotNull(slots[slotIndex])){ - LOG.info("[{}]:slot[{}] released",slots[slotIndex].getRequestId(),slotIndex); - slots[slotIndex] = null; + if(ObjectUtil.isNotNull(SLOTS.get(slotIndex))){ + LOG.info("[{}]:slot[{}] released",SLOTS.get(slotIndex).getRequestId(),slotIndex); + SLOTS.set(slotIndex, null); + QUEUE.add(slotIndex); OCCUPY_COUNT.decrementAndGet(); }else{ LOG.warn("slot[{}] already has been released",slotIndex);