From df0f168848b9a60565960fabcc6b52a8a1e3a487 Mon Sep 17 00:00:00 2001 From: bryan31 Date: Wed, 21 Jul 2021 16:02:40 +0800 Subject: [PATCH] =?UTF-8?q?bug=20#I41S18=20=E5=9C=A8=E5=BC=80=E5=90=AF?= =?UTF-8?q?=E7=9B=91=E6=8E=A7=E7=9A=84=E6=83=85=E5=86=B5=E4=B8=8B=EF=BC=8C?= =?UTF-8?q?=E5=81=B6=E5=B0=94=E4=BC=9A=E6=8A=A5=E5=87=BA=E7=A9=BA=E6=8C=87?= =?UTF-8?q?=E9=92=88=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../entity/monitor/CompStatistics.java | 8 +- .../yomahub/liteflow/monitor/MonitorBus.java | 10 +-- .../util/BoundedPriorityBlockingQueue.java | 81 +++++++++++++++++++ .../LiteflowMainAutoConfiguration.java | 4 +- 4 files changed, 92 insertions(+), 11 deletions(-) create mode 100644 liteflow-core/src/main/java/com/yomahub/liteflow/util/BoundedPriorityBlockingQueue.java diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/monitor/CompStatistics.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/monitor/CompStatistics.java index 70a8ec1a4..ceb0941d3 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/monitor/CompStatistics.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/monitor/CompStatistics.java @@ -11,7 +11,7 @@ package com.yomahub.liteflow.entity.monitor; * 统计类 * @author Bryan.Zhang */ -public class CompStatistics implements Comparable{ +public class CompStatistics implements Comparable{ private String componentClazzName; @@ -55,9 +55,9 @@ public class CompStatistics implements Comparable{ } @Override - public int compareTo(Object o) { - if( o instanceof CompStatistics) { - return this.recordTime >= ((CompStatistics) o).getRecordTime() ? -1 : 1; + public int compareTo(CompStatistics o) { + if(o != null) { + return this.recordTime >= o.getRecordTime() ? -1 : 1; } return 1; } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/monitor/MonitorBus.java b/liteflow-core/src/main/java/com/yomahub/liteflow/monitor/MonitorBus.java index d2109aaa1..d0d13a092 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/monitor/MonitorBus.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/monitor/MonitorBus.java @@ -19,9 +19,7 @@ import java.util.Map.Entry; import java.util.Timer; import java.util.concurrent.ConcurrentHashMap; -import cn.hutool.core.util.ObjectUtil; -import cn.hutool.core.collection.BoundedPriorityQueue; - +import com.yomahub.liteflow.util.BoundedPriorityBlockingQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,7 +38,7 @@ public class MonitorBus { private final Logger LOG = LoggerFactory.getLogger(this.getClass()); - private final ConcurrentHashMap> statisticsMap = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> statisticsMap = new ConcurrentHashMap<>(); public MonitorBus(LiteflowConfig liteflowConfig) { this.liteflowConfig = liteflowConfig; @@ -55,7 +53,7 @@ public class MonitorBus { if(statisticsMap.containsKey(statistics.getComponentClazzName())){ statisticsMap.get(statistics.getComponentClazzName()).add(statistics); }else{ - BoundedPriorityQueue queue = new BoundedPriorityQueue<>(liteflowConfig.getQueueLimit()); + BoundedPriorityBlockingQueue queue = new BoundedPriorityBlockingQueue<>(liteflowConfig.getQueueLimit()); queue.offer(statistics); statisticsMap.put(statistics.getComponentClazzName(), queue); } @@ -65,7 +63,7 @@ public class MonitorBus { try{ Map compAverageTimeSpent = new HashMap(); - for(Entry> entry : statisticsMap.entrySet()){ + for(Entry> entry : statisticsMap.entrySet()){ long totalTimeSpent = 0; for(CompStatistics statistics : entry.getValue()){ totalTimeSpent += statistics.getTimeSpent(); diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/util/BoundedPriorityBlockingQueue.java b/liteflow-core/src/main/java/com/yomahub/liteflow/util/BoundedPriorityBlockingQueue.java new file mode 100644 index 000000000..ff624e627 --- /dev/null +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/util/BoundedPriorityBlockingQueue.java @@ -0,0 +1,81 @@ +package com.yomahub.liteflow.util; + +import java.util.*; +import java.util.concurrent.PriorityBlockingQueue; + +public class BoundedPriorityBlockingQueue extends PriorityBlockingQueue { + + private static final long serialVersionUID = -1; + + //容量 + private final int capacity; + private final Comparator comparator; + + public BoundedPriorityBlockingQueue(int capacity) { + this(capacity, null); + } + + /** + * 构造 + * @param capacity 容量 + * @param comparator 比较器 + */ + public BoundedPriorityBlockingQueue(int capacity, final Comparator comparator) { + super(capacity, (o1, o2) -> { + int cResult; + if(comparator != null) { + cResult = comparator.compare(o1, o2); + }else { + @SuppressWarnings("unchecked") + Comparable o1c = (Comparable)o1; + cResult = o1c.compareTo(o2); + } + + return - cResult; + }); + this.capacity = capacity; + this.comparator = comparator; + } + + /** + * 加入元素,当队列满时,淘汰末尾元素 + * @param e 元素 + * @return 加入成功与否 + */ + @Override + public boolean offer(E e) { + if(size() >= capacity) { + E head = peek(); + if (this.comparator().compare(e, head) <= 0){ + return true; + } + //当队列满时,就要淘汰顶端队列 + poll(); + } + return super.offer(e); + } + + /** + * 添加多个元素
+ * 参数为集合的情况请使用{@link PriorityQueue#addAll} + * @param c 元素数组 + * @return 是否发生改变 + */ + public boolean addAll(E[] c) { + return this.addAll(Arrays.asList(c)); + } + + /** + * @return 返回排序后的列表 + */ + public ArrayList toList() { + final ArrayList list = new ArrayList<>(this); + list.sort(comparator); + return list; + } + + @Override + public Iterator iterator() { + return toList().iterator(); + } +} diff --git a/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowMainAutoConfiguration.java b/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowMainAutoConfiguration.java index 459f7420f..8e806379c 100644 --- a/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowMainAutoConfiguration.java +++ b/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowMainAutoConfiguration.java @@ -16,6 +16,7 @@ import org.springframework.context.annotation.Import; * 主要的业务装配器 * 在这个装配器里装配了执行器,执行器初始化类,监控器 * 这个装配前置条件是需要LiteflowConfig,LiteflowPropertyAutoConfiguration以及SpringAware + * * @author Bryan.Zhang */ @Configuration @@ -37,12 +38,13 @@ public class LiteflowMainAutoConfiguration { } @Bean - @ConditionalOnProperty(prefix = "liteflow",name = "parse-on-start",havingValue = "true") + @ConditionalOnProperty(prefix = "liteflow", name = "parse-on-start", havingValue = "true") public LiteflowExecutorInit liteflowExecutorInit(FlowExecutor flowExecutor) { return new LiteflowExecutorInit(flowExecutor); } @Bean + @ConditionalOnProperty(prefix = "liteflow", name = "monitor.enable-log", havingValue = "true") public MonitorBus monitorBus(LiteflowConfig liteflowConfig) { return new MonitorBus(liteflowConfig); }