mirror of
https://gitee.com/dromara/liteFlow.git
synced 2026-05-14 12:12:08 +08:00
bug #I41S18 在开启监控的情况下,偶尔会报出空指针的问题
This commit is contained in:
@@ -11,7 +11,7 @@ package com.yomahub.liteflow.entity.monitor;
|
|||||||
* 统计类
|
* 统计类
|
||||||
* @author Bryan.Zhang
|
* @author Bryan.Zhang
|
||||||
*/
|
*/
|
||||||
public class CompStatistics implements Comparable{
|
public class CompStatistics implements Comparable<CompStatistics>{
|
||||||
|
|
||||||
private String componentClazzName;
|
private String componentClazzName;
|
||||||
|
|
||||||
@@ -55,9 +55,9 @@ public class CompStatistics implements Comparable{
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int compareTo(Object o) {
|
public int compareTo(CompStatistics o) {
|
||||||
if( o instanceof CompStatistics) {
|
if(o != null) {
|
||||||
return this.recordTime >= ((CompStatistics) o).getRecordTime() ? -1 : 1;
|
return this.recordTime >= o.getRecordTime() ? -1 : 1;
|
||||||
}
|
}
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -19,9 +19,7 @@ import java.util.Map.Entry;
|
|||||||
import java.util.Timer;
|
import java.util.Timer;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
import cn.hutool.core.util.ObjectUtil;
|
import com.yomahub.liteflow.util.BoundedPriorityBlockingQueue;
|
||||||
import cn.hutool.core.collection.BoundedPriorityQueue;
|
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
@@ -40,7 +38,7 @@ public class MonitorBus {
|
|||||||
|
|
||||||
private final Logger LOG = LoggerFactory.getLogger(this.getClass());
|
private final Logger LOG = LoggerFactory.getLogger(this.getClass());
|
||||||
|
|
||||||
private final ConcurrentHashMap<String, BoundedPriorityQueue<CompStatistics>> statisticsMap = new ConcurrentHashMap<>();
|
private final ConcurrentHashMap<String, BoundedPriorityBlockingQueue<CompStatistics>> statisticsMap = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
public MonitorBus(LiteflowConfig liteflowConfig) {
|
public MonitorBus(LiteflowConfig liteflowConfig) {
|
||||||
this.liteflowConfig = liteflowConfig;
|
this.liteflowConfig = liteflowConfig;
|
||||||
@@ -55,7 +53,7 @@ public class MonitorBus {
|
|||||||
if(statisticsMap.containsKey(statistics.getComponentClazzName())){
|
if(statisticsMap.containsKey(statistics.getComponentClazzName())){
|
||||||
statisticsMap.get(statistics.getComponentClazzName()).add(statistics);
|
statisticsMap.get(statistics.getComponentClazzName()).add(statistics);
|
||||||
}else{
|
}else{
|
||||||
BoundedPriorityQueue<CompStatistics> queue = new BoundedPriorityQueue<>(liteflowConfig.getQueueLimit());
|
BoundedPriorityBlockingQueue<CompStatistics> queue = new BoundedPriorityBlockingQueue<>(liteflowConfig.getQueueLimit());
|
||||||
queue.offer(statistics);
|
queue.offer(statistics);
|
||||||
statisticsMap.put(statistics.getComponentClazzName(), queue);
|
statisticsMap.put(statistics.getComponentClazzName(), queue);
|
||||||
}
|
}
|
||||||
@@ -65,7 +63,7 @@ public class MonitorBus {
|
|||||||
try{
|
try{
|
||||||
Map<String, BigDecimal> compAverageTimeSpent = new HashMap<String, BigDecimal>();
|
Map<String, BigDecimal> compAverageTimeSpent = new HashMap<String, BigDecimal>();
|
||||||
|
|
||||||
for(Entry<String, BoundedPriorityQueue<CompStatistics>> entry : statisticsMap.entrySet()){
|
for(Entry<String, BoundedPriorityBlockingQueue<CompStatistics>> entry : statisticsMap.entrySet()){
|
||||||
long totalTimeSpent = 0;
|
long totalTimeSpent = 0;
|
||||||
for(CompStatistics statistics : entry.getValue()){
|
for(CompStatistics statistics : entry.getValue()){
|
||||||
totalTimeSpent += statistics.getTimeSpent();
|
totalTimeSpent += statistics.getTimeSpent();
|
||||||
|
|||||||
@@ -0,0 +1,81 @@
|
|||||||
|
package com.yomahub.liteflow.util;
|
||||||
|
|
||||||
|
import java.util.*;
|
||||||
|
import java.util.concurrent.PriorityBlockingQueue;
|
||||||
|
|
||||||
|
public class BoundedPriorityBlockingQueue<E> extends PriorityBlockingQueue<E> {
|
||||||
|
|
||||||
|
private static final long serialVersionUID = -1;
|
||||||
|
|
||||||
|
//容量
|
||||||
|
private final int capacity;
|
||||||
|
private final Comparator<? super E> comparator;
|
||||||
|
|
||||||
|
public BoundedPriorityBlockingQueue(int capacity) {
|
||||||
|
this(capacity, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 构造
|
||||||
|
* @param capacity 容量
|
||||||
|
* @param comparator 比较器
|
||||||
|
*/
|
||||||
|
public BoundedPriorityBlockingQueue(int capacity, final Comparator<? super E> comparator) {
|
||||||
|
super(capacity, (o1, o2) -> {
|
||||||
|
int cResult;
|
||||||
|
if(comparator != null) {
|
||||||
|
cResult = comparator.compare(o1, o2);
|
||||||
|
}else {
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
Comparable<E> o1c = (Comparable<E>)o1;
|
||||||
|
cResult = o1c.compareTo(o2);
|
||||||
|
}
|
||||||
|
|
||||||
|
return - cResult;
|
||||||
|
});
|
||||||
|
this.capacity = capacity;
|
||||||
|
this.comparator = comparator;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 加入元素,当队列满时,淘汰末尾元素
|
||||||
|
* @param e 元素
|
||||||
|
* @return 加入成功与否
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public boolean offer(E e) {
|
||||||
|
if(size() >= capacity) {
|
||||||
|
E head = peek();
|
||||||
|
if (this.comparator().compare(e, head) <= 0){
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
//当队列满时,就要淘汰顶端队列
|
||||||
|
poll();
|
||||||
|
}
|
||||||
|
return super.offer(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 添加多个元素<br>
|
||||||
|
* 参数为集合的情况请使用{@link PriorityQueue#addAll}
|
||||||
|
* @param c 元素数组
|
||||||
|
* @return 是否发生改变
|
||||||
|
*/
|
||||||
|
public boolean addAll(E[] c) {
|
||||||
|
return this.addAll(Arrays.asList(c));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return 返回排序后的列表
|
||||||
|
*/
|
||||||
|
public ArrayList<E> toList() {
|
||||||
|
final ArrayList<E> list = new ArrayList<>(this);
|
||||||
|
list.sort(comparator);
|
||||||
|
return list;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Iterator<E> iterator() {
|
||||||
|
return toList().iterator();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -16,6 +16,7 @@ import org.springframework.context.annotation.Import;
|
|||||||
* 主要的业务装配器
|
* 主要的业务装配器
|
||||||
* 在这个装配器里装配了执行器,执行器初始化类,监控器
|
* 在这个装配器里装配了执行器,执行器初始化类,监控器
|
||||||
* 这个装配前置条件是需要LiteflowConfig,LiteflowPropertyAutoConfiguration以及SpringAware
|
* 这个装配前置条件是需要LiteflowConfig,LiteflowPropertyAutoConfiguration以及SpringAware
|
||||||
|
*
|
||||||
* @author Bryan.Zhang
|
* @author Bryan.Zhang
|
||||||
*/
|
*/
|
||||||
@Configuration
|
@Configuration
|
||||||
@@ -37,12 +38,13 @@ public class LiteflowMainAutoConfiguration {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
@ConditionalOnProperty(prefix = "liteflow",name = "parse-on-start",havingValue = "true")
|
@ConditionalOnProperty(prefix = "liteflow", name = "parse-on-start", havingValue = "true")
|
||||||
public LiteflowExecutorInit liteflowExecutorInit(FlowExecutor flowExecutor) {
|
public LiteflowExecutorInit liteflowExecutorInit(FlowExecutor flowExecutor) {
|
||||||
return new LiteflowExecutorInit(flowExecutor);
|
return new LiteflowExecutorInit(flowExecutor);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
|
@ConditionalOnProperty(prefix = "liteflow", name = "monitor.enable-log", havingValue = "true")
|
||||||
public MonitorBus monitorBus(LiteflowConfig liteflowConfig) {
|
public MonitorBus monitorBus(LiteflowConfig liteflowConfig) {
|
||||||
return new MonitorBus(liteflowConfig);
|
return new MonitorBus(liteflowConfig);
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user