From e6956487283dfa794ff909f3aefffcea974dfc00 Mon Sep 17 00:00:00 2001 From: "everywhere.z" Date: Fri, 23 May 2025 19:46:09 +0800 Subject: [PATCH] =?UTF-8?q?bug=20#IC9ZZ8=20reids=20poll=E6=A8=A1=E5=BC=8F?= =?UTF-8?q?=E4=BB=A3=E7=A0=81=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/yomahub/liteflow/util/TupleOf3.java | 46 +++++++ .../redis/mode/polling/ChainPollingTask.java | 113 +++++++++--------- 2 files changed, 105 insertions(+), 54 deletions(-) create mode 100644 liteflow-core/src/main/java/com/yomahub/liteflow/util/TupleOf3.java diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/util/TupleOf3.java b/liteflow-core/src/main/java/com/yomahub/liteflow/util/TupleOf3.java new file mode 100644 index 000000000..8675b03ae --- /dev/null +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/util/TupleOf3.java @@ -0,0 +1,46 @@ +package com.yomahub.liteflow.util; + +/** + * 三元值对象 + * + * @author Bryan.Zhang + * @since 2.13.3 + */ +public class TupleOf3 { + + private A a; + + private B b; + + private C c; + + public TupleOf3(A a, B b, C c) { + this.a = a; + this.b = b; + this.c = c; + } + + public A getA() { + return a; + } + + public B getB() { + return b; + } + + public void setA(A a) { + this.a = a; + } + + public void setB(B b) { + this.b = b; + } + + public C getC() { + return c; + } + + public void setC(C c) { + this.c = c; + } +} diff --git a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/polling/ChainPollingTask.java b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/polling/ChainPollingTask.java index 814e29844..49014a38a 100644 --- a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/polling/ChainPollingTask.java +++ b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/polling/ChainPollingTask.java @@ -1,6 +1,8 @@ package com.yomahub.liteflow.parser.redis.mode.polling; +import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.lang.Pair; +import cn.hutool.core.util.BooleanUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.crypto.digest.DigestUtil; import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder; @@ -10,13 +12,18 @@ import com.yomahub.liteflow.log.LFLoggerManager; import com.yomahub.liteflow.parser.redis.mode.RClient; import com.yomahub.liteflow.parser.redis.vo.RedisParserVO; import com.yomahub.liteflow.util.RuleParsePluginUtil; +import com.yomahub.liteflow.util.TupleOf3; import java.util.*; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; /** * 用于轮询chain的定时任务 * * @author hxinyu + * @author Bryan.Zhang * @since 2.11.0 */ public class ChainPollingTask implements Runnable { @@ -27,6 +34,7 @@ public class ChainPollingTask implements Runnable { private Integer chainNum; + //key为chainId,value为缓存的SHA值 private Map chainSHAMap; private String keyLua; @@ -58,69 +66,66 @@ public class ChainPollingTask implements Runnable { //修改chainNum为最新chain数量 chainNum = Integer.parseInt(keyNum); - List needDelete = new ArrayList<>(); - //遍历Map,判断各个chain的value有无变化:修改变化了值的chain和被删除的chain - for (Map.Entry entry : chainSHAMap.entrySet()) { - String chainId = entry.getKey(); - String oldSHA = entry.getValue(); - Pair pair = RuleParsePluginUtil.parseIdKey(chainId); - // 如果是停用,就直接进删除 - if (pair.getKey()){ - FlowBus.removeChain(pair.getValue()); - needDelete.add(chainId); - continue; + //拿到所有的Chain的HashKey + Set newChainHashKeySet = chainClient.hkeys(chainKey); + + List> tupleOf3List = newChainHashKeySet.stream().map(chainHashKey -> { + Pair pair = RuleParsePluginUtil.parseIdKey(chainHashKey); + //TupleOf3为三元值对象,在这里A为chainHashKey,B为解析到的chainId,C为是否启用 + return new TupleOf3<>(chainHashKey, pair.getValue(), pair.getKey()); + }).collect(Collectors.toList()); + + tupleOf3List.forEach(tupleOf3 -> { + String chainHashKey = tupleOf3.getA(); + String chainId = tupleOf3.getB(); + Boolean enable = tupleOf3.getC(); + + // 如果是停用,就直接删除 + if (BooleanUtil.isFalse(enable)){ + LOG.info("starting reload flow config... delete key={}", chainId); + chainSHAMap.remove(chainId); + FlowBus.removeChain(chainId); + return; } //在redis服务端通过Lua脚本计算SHA值 - String newSHA = chainClient.evalSha(valueLua, chainKey, chainId); - if (StrUtil.equals(newSHA, "nil")) { - //新SHA值为nil, 即未获取到该chain,表示该chain已被删除 - FlowBus.removeChain(pair.getValue()); - LOG.info("starting reload flow config... delete key={}", chainId); + String newSHA = chainClient.evalSha(valueLua, chainKey, chainHashKey); - //添加到待删除的list 后续统一从SHAMap中移除 - //不在这里直接移除是为了避免先删除导致chainSHAMap并没有完全遍历完 chain删除不全 - needDelete.add(chainId); + if (StrUtil.isBlank(newSHA) || "nil".equals(newSHA)){ + FlowBus.removeChain(chainId); + return; } - else if (!StrUtil.equals(newSHA, oldSHA)) { - //SHA值发生变化,表示该chain的值已被修改,重新拉取变化的chain - String chainData = chainClient.hget(chainKey, chainId); - LiteFlowChainELBuilder.createChain().setChainId(pair.getValue()).setEL(chainData).build(); - LOG.info("starting reload flow config... update key={} new value={},", chainId, chainData); - //修改SHAMap + // chainSHAMap不含有redis取到的chainId,说明是新增的 + if (!chainSHAMap.containsKey(chainId)){ + String chainEL = chainClient.hget(chainKey, chainHashKey); + LiteFlowChainELBuilder.createChain().setChainId(chainId).setEL(chainEL).build(); chainSHAMap.put(chainId, newSHA); - } - //SHA值无变化,表示该chain未改变 - } - - //统一从SHAMap中移除要删除的chain - for (String chainId : needDelete) { - chainSHAMap.remove(chainId); - } - - //处理新添加chain和chainId被修改的情况 - if (chainNum > chainSHAMap.size()) { - //如果封装的SHAMap数量比最新chain总数少, 说明有两种情况: - // 1、添加了新chain - // 2、修改了chainId:因为遍历到旧的id时会取到nil,SHAMap会把原来的chainId删掉,但没有机会添加新的chainId - // 3、上述两者结合 - //在此处重新拉取所有chainId集合,补充添加新chain - Set newChainSet = chainClient.hkeys(chainKey); - for (String chainId : newChainSet) { - Pair pair = RuleParsePluginUtil.parseIdKey(chainId); - - if (!chainSHAMap.containsKey(chainId)) { - //将新chainId添加到LiteFlowChainELBuilder和SHAMap - String chainData = chainClient.hget(chainKey, chainId); - // 如果是启用,才装配 - if (pair.getKey()){ - LiteFlowChainELBuilder.createChain().setChainId(pair.getValue()).setEL(chainData).build(); - LOG.info("starting reload flow config... create key={} new value={},", chainId, chainData); - chainSHAMap.put(chainId, DigestUtil.sha1Hex(chainData)); - } + }else{ + String oldSHA = chainSHAMap.get(chainId); + if (!StrUtil.equals(newSHA, oldSHA)) { + //SHA值发生变化,表示该chain的值已被修改,重新拉取变化的chain + String chainEL = chainClient.hget(chainKey, chainHashKey); + LiteFlowChainELBuilder.createChain().setChainId(chainId).setEL(chainEL).build(); + LOG.info("starting reload flow config... update key={} new value={},", chainId, chainEL); + //修改SHAMap + chainSHAMap.put(chainId, newSHA); } } + }); + + // 这里是为了处理在redis服务端删除,但是本地缓存还存在chainId的情况 + // 这表明是服务端这边已经删除了chain + if (CollectionUtil.isNotEmpty(chainSHAMap)){ + Set newChainIdSet = tupleOf3List.stream().map(TupleOf3::getB).collect(Collectors.toSet()); + + Collection deletedChainIdSet = CollectionUtil.subtract(chainSHAMap.keySet(), newChainIdSet); + + deletedChainIdSet.forEach(chainId -> { + chainSHAMap.remove(chainId); + FlowBus.removeChain(chainId); + LOG.info("starting reload flow config... delete key={}", chainId); + }); } } catch (Exception e) { LOG.error("[Exception during chain polling] " + e.getMessage(), e);