!22 DataBus 性能优化 Lock Free

Merge pull request !22 from 王大锤/lock-free
This commit is contained in:
铂赛东
2021-05-19 16:59:39 +08:00
committed by Gitee

View File

@@ -7,15 +7,18 @@
*/
package com.yomahub.liteflow.entity.data;
import java.util.concurrent.atomic.AtomicInteger;
import cn.hutool.core.util.ObjectUtil;
import com.yomahub.liteflow.exception.ConfigErrorException;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.util.SpringAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
* 数据BUS主要用来管理Slot用以分配和回收
* @author Bryan.Zhang
@@ -26,7 +29,9 @@ public class DataBus {
public static AtomicInteger OCCUPY_COUNT = new AtomicInteger(0);
private static Slot[] slots;
private static AtomicReferenceArray<Slot> SLOTS;
private static ConcurrentLinkedQueue<Integer> QUEUE;
static {
LiteflowConfig liteflowConfig = SpringAware.getBean(LiteflowConfig.class);
@@ -36,20 +41,22 @@ public class DataBus {
liteflowConfig = new LiteflowConfig();
}
int slotSize = liteflowConfig.getSlotSize();
slots = new Slot[slotSize];
SLOTS = new AtomicReferenceArray<>(slotSize);
QUEUE = IntStream.range(0, slotSize - 1).boxed().collect(Collectors.toCollection(ConcurrentLinkedQueue::new));
}
public synchronized static int offerSlot(Class<? extends Slot> slotClazz){
try{
for(int i = 0; i < slots.length; i++){
if(ObjectUtil.isNull(slots[i])){
slots[i] = slotClazz.newInstance();
OCCUPY_COUNT.incrementAndGet();
return i;
}
public synchronized static int offerSlot(Class<? extends Slot> slotClazz) {
try {
Slot slot = slotClazz.newInstance();
Integer slotIndex = QUEUE.poll();
if (ObjectUtil.isNotNull(slotIndex) && SLOTS.compareAndSet(slotIndex, null, slot)) {
OCCUPY_COUNT.incrementAndGet();
return slotIndex;
}
}catch(Exception e){
LOG.error("offer slot error",e);
} catch (Exception e) {
LOG.error("offer slot error", e);
return -1;
}
return -1;
@@ -57,13 +64,14 @@ public class DataBus {
@SuppressWarnings("unchecked")
public static <T extends Slot> T getSlot(int slotIndex){
return (T)slots[slotIndex];
return (T)SLOTS.get(slotIndex);
}
public static void releaseSlot(int slotIndex){
if(ObjectUtil.isNotNull(slots[slotIndex])){
LOG.info("[{}]:slot[{}] released",slots[slotIndex].getRequestId(),slotIndex);
slots[slotIndex] = null;
if(ObjectUtil.isNotNull(SLOTS.get(slotIndex))){
LOG.info("[{}]:slot[{}] released",SLOTS.get(slotIndex).getRequestId(),slotIndex);
SLOTS.set(slotIndex, null);
QUEUE.add(slotIndex);
OCCUPY_COUNT.decrementAndGet();
}else{
LOG.warn("slot[{}] already has been released",slotIndex);