mirror of
https://gitee.com/dromara/liteFlow.git
synced 2026-05-14 12:12:08 +08:00
性能优化 DataBus Lock Free
This commit is contained in:
@@ -7,15 +7,18 @@
|
||||
*/
|
||||
package com.yomahub.liteflow.entity.data;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import com.yomahub.liteflow.exception.ConfigErrorException;
|
||||
import com.yomahub.liteflow.property.LiteflowConfig;
|
||||
import com.yomahub.liteflow.util.SpringAware;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReferenceArray;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
/**
|
||||
* 数据BUS,主要用来管理Slot,用以分配和回收
|
||||
* @author Bryan.Zhang
|
||||
@@ -26,7 +29,9 @@ public class DataBus {
|
||||
|
||||
public static AtomicInteger OCCUPY_COUNT = new AtomicInteger(0);
|
||||
|
||||
private static Slot[] slots;
|
||||
private static AtomicReferenceArray<Slot> SLOTS;
|
||||
|
||||
private static ConcurrentLinkedQueue<Integer> QUEUE;
|
||||
|
||||
static {
|
||||
LiteflowConfig liteflowConfig = SpringAware.getBean(LiteflowConfig.class);
|
||||
@@ -36,20 +41,22 @@ public class DataBus {
|
||||
liteflowConfig = new LiteflowConfig();
|
||||
}
|
||||
int slotSize = liteflowConfig.getSlotSize();
|
||||
slots = new Slot[slotSize];
|
||||
|
||||
SLOTS = new AtomicReferenceArray<>(slotSize);
|
||||
|
||||
QUEUE = IntStream.range(0, slotSize - 1).boxed().collect(Collectors.toCollection(ConcurrentLinkedQueue::new));
|
||||
}
|
||||
|
||||
public synchronized static int offerSlot(Class<? extends Slot> slotClazz){
|
||||
try{
|
||||
for(int i = 0; i < slots.length; i++){
|
||||
if(ObjectUtil.isNull(slots[i])){
|
||||
slots[i] = slotClazz.newInstance();
|
||||
OCCUPY_COUNT.incrementAndGet();
|
||||
return i;
|
||||
}
|
||||
public synchronized static int offerSlot(Class<? extends Slot> slotClazz) {
|
||||
try {
|
||||
Slot slot = slotClazz.newInstance();
|
||||
Integer slotIndex = QUEUE.poll();
|
||||
if (ObjectUtil.isNotNull(slotIndex) && SLOTS.compareAndSet(slotIndex, null, slot)) {
|
||||
OCCUPY_COUNT.incrementAndGet();
|
||||
return slotIndex;
|
||||
}
|
||||
}catch(Exception e){
|
||||
LOG.error("offer slot error",e);
|
||||
} catch (Exception e) {
|
||||
LOG.error("offer slot error", e);
|
||||
return -1;
|
||||
}
|
||||
return -1;
|
||||
@@ -57,13 +64,13 @@ public class DataBus {
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public static <T extends Slot> T getSlot(int slotIndex){
|
||||
return (T)slots[slotIndex];
|
||||
return (T)SLOTS.get(slotIndex);
|
||||
}
|
||||
|
||||
public static void releaseSlot(int slotIndex){
|
||||
if(ObjectUtil.isNotNull(slots[slotIndex])){
|
||||
LOG.info("[{}]:slot[{}] released",slots[slotIndex].getRequestId(),slotIndex);
|
||||
slots[slotIndex] = null;
|
||||
if(ObjectUtil.isNotNull(SLOTS.get(slotIndex))){
|
||||
LOG.info("[{}]:slot[{}] released",SLOTS.get(slotIndex).getRequestId(),slotIndex);
|
||||
SLOTS.set(slotIndex, null);
|
||||
OCCUPY_COUNT.decrementAndGet();
|
||||
}else{
|
||||
LOG.warn("slot[{}] already has been released",slotIndex);
|
||||
|
||||
Reference in New Issue
Block a user