fix from cr

This commit is contained in:
houxinyu
2023-08-16 18:12:58 +08:00
parent b5452dec69
commit 132729141d
7 changed files with 56 additions and 30 deletions

View File

@@ -5,6 +5,7 @@ import cn.hutool.crypto.digest.DigestUtil;
import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder;
import com.yomahub.liteflow.flow.FlowBus;
import com.yomahub.liteflow.log.LFLog;
import com.yomahub.liteflow.log.LFLoggerManager;
import com.yomahub.liteflow.parser.redis.mode.RClient;
import com.yomahub.liteflow.parser.redis.vo.RedisParserVO;
@@ -16,7 +17,7 @@ import java.util.*;
* @author hxinyu
* @since 2.11.0
*/
public class ChainPollingTask {
public class ChainPollingTask implements Runnable {
private RedisParserVO redisParserVO;
@@ -26,24 +27,29 @@ public class ChainPollingTask {
private Map<String, String> chainSHAMap;
LFLog LOG;
private String keyLua;
public ChainPollingTask(RedisParserVO redisParserVO, RClient chainClient, Integer chainNum, Map<String, String> chainSHAMap, LFLog LOG) {
private String valueLua;
LFLog LOG = LFLoggerManager.getLogger(ChainPollingTask.class);
public ChainPollingTask(RedisParserVO redisParserVO, RClient chainClient, Integer chainNum, Map<String, String> chainSHAMap, String keyLua, String valueLua) {
this.redisParserVO = redisParserVO;
this.chainClient = chainClient;
this.chainNum = chainNum;
this.chainSHAMap = chainSHAMap;
this.LOG = LOG;
this.keyLua = keyLua;
this.valueLua = valueLua;
}
/**
* 用于返回chain轮询任务的Runnable实例
* 用于返回chain轮询任务
* 先根据hash中value的SHA值修改变化的和被删除的chain
* 再根据hash中field数量的变化拉取新增的chain
*/
public Runnable pollChainTask(String keyLua, String valueLua) {
Runnable r = () -> {
@Override
public void run() {
try {
String chainKey = redisParserVO.getChainKey();
//Lua获取chainKey中最新的chain数量
String keyNum = chainClient.evalSha(keyLua, chainKey);
@@ -52,7 +58,7 @@ public class ChainPollingTask {
List<String> needDelete = new ArrayList<>();
//遍历Map,判断各个chain的value有无变化修改变化了值的chain和被删除的chain
for(Map.Entry<String, String> entry: chainSHAMap.entrySet()) {
for (Map.Entry<String, String> entry : chainSHAMap.entrySet()) {
String chainId = entry.getKey();
String oldSHA = entry.getValue();
//在redis服务端通过Lua脚本计算SHA值
@@ -101,7 +107,8 @@ public class ChainPollingTask {
}
}
}
};
return r;
} catch (Exception e) {
LOG.error("[Exception during chain polling] " + e.getMessage(), e);
}
}
}

View File

@@ -44,7 +44,7 @@ public class RedisParserPollingMode implements RedisParserHelper {
private Map<String, String> scriptSHAMap = new HashMap<>();
//定时任务线程池核心线程数
private static final int CORE_POOL_SIZE = 1;
private static final int CORE_POOL_SIZE = 2;
//计算hash中field数量的lua脚本
private final String luaOfKey = "local keys = redis.call(\"hkeys\", KEYS[1]);\n" +
@@ -187,9 +187,9 @@ public class RedisParserPollingMode implements RedisParserHelper {
new ThreadPoolExecutor.DiscardOldestPolicy());
//添加轮询chain的定时任务
ChainPollingTask chainTask = new ChainPollingTask(redisParserVO, chainClient, chainNum, chainSHAMap, LOG);
pollExecutor.scheduleAtFixedRate(chainTask.pollChainTask(keyLuaOfChain, valueLuaOfChain),
60, redisParserVO.getPollingInterval().longValue(), TimeUnit.SECONDS);
ChainPollingTask chainTask = new ChainPollingTask(redisParserVO, chainClient, chainNum, chainSHAMap, keyLuaOfChain, valueLuaOfChain);
pollExecutor.scheduleAtFixedRate(chainTask, redisParserVO.getPollingStartTime().longValue(),
redisParserVO.getPollingInterval().longValue(), TimeUnit.SECONDS);
//如果有脚本
if (ObjectUtil.isNotNull(scriptClient) && ObjectUtil.isNotNull(redisParserVO.getScriptDataBase())
@@ -199,9 +199,9 @@ public class RedisParserPollingMode implements RedisParserHelper {
String valueLuaOfScript = scriptClient.scriptLoad(luaOfValue);
//添加轮询script的定时任务
ScriptPollingTask scriptTask = new ScriptPollingTask(redisParserVO, scriptClient, scriptNum, scriptSHAMap, LOG);
pollExecutor.scheduleAtFixedRate(scriptTask.pollScriptTask(keyLuaOfScript, valueLuaOfScript),
60, redisParserVO.getPollingInterval().longValue(), TimeUnit.SECONDS);
ScriptPollingTask scriptTask = new ScriptPollingTask(redisParserVO, scriptClient, scriptNum, scriptSHAMap, keyLuaOfScript, valueLuaOfScript);
pollExecutor.scheduleAtFixedRate(scriptTask, redisParserVO.getPollingStartTime().longValue(),
redisParserVO.getPollingInterval().longValue(), TimeUnit.SECONDS);
}
}
}

View File

@@ -4,6 +4,7 @@ import cn.hutool.core.util.StrUtil;
import cn.hutool.crypto.digest.DigestUtil;
import com.yomahub.liteflow.flow.FlowBus;
import com.yomahub.liteflow.log.LFLog;
import com.yomahub.liteflow.log.LFLoggerManager;
import com.yomahub.liteflow.parser.redis.mode.RClient;
import com.yomahub.liteflow.parser.redis.mode.RedisParserHelper;
import com.yomahub.liteflow.parser.redis.vo.RedisParserVO;
@@ -19,7 +20,7 @@ import java.util.Set;
* @author hxinyu
* @since 2.11.0
*/
public class ScriptPollingTask {
public class ScriptPollingTask implements Runnable {
private RedisParserVO redisParserVO;
@@ -29,24 +30,29 @@ public class ScriptPollingTask {
private Map<String, String> scriptSHAMap;
LFLog LOG;
private String keyLua;
public ScriptPollingTask(RedisParserVO redisParserVO, RClient scriptClient, Integer scriptNum, Map<String, String> scriptSHAMap, LFLog LOG) {
private String valueLua;
LFLog LOG = LFLoggerManager.getLogger(ScriptPollingTask.class);
public ScriptPollingTask(RedisParserVO redisParserVO, RClient scriptClient, Integer scriptNum, Map<String, String> scriptSHAMap, String keyLua, String valueLua) {
this.redisParserVO = redisParserVO;
this.scriptClient = scriptClient;
this.scriptNum = scriptNum;
this.scriptSHAMap = scriptSHAMap;
this.LOG = LOG;
this.keyLua = keyLua;
this.valueLua = valueLua;
}
/**
* 用于返回script轮询任务的Runnable实例
* 用于返回script轮询任务
* 首先根据hash中field数量的变化拉取新增的script
* 再根据hash中value的SHA值修改变化的和被删除的script
*/
public Runnable pollScriptTask(String keyLua, String valueLua) {
Runnable r = () -> {
@Override
public void run() {
try {
String scriptKey = redisParserVO.getScriptKey();
//Lua获取scriptKey中最新的script数量
String keyNum = scriptClient.evalSha(keyLua, scriptKey);
@@ -105,7 +111,8 @@ public class ScriptPollingTask {
}
}
}
};
return r;
} catch (Exception e) {
LOG.error("[Exception during script polling] " + e.getMessage(), e);
}
}
}

View File

@@ -23,9 +23,12 @@ public class RedisParserVO {
/*监听机制 轮询为poll 订阅为subscribe 默认为poll*/
private RedisParserMode mode = RedisParserMode.POLL;
/*轮询时间间隔(s) 默认1分钟 若选择订阅机制可不配置*/
/*轮询时间间隔(s) 默认60s 若选择订阅机制可不配置*/
private Integer pollingInterval = 60;
/*规则配置后首次轮询的起始时间 默认为60s 若选择订阅机制可不配置*/
private Integer pollingStartTime = 60;
/*chain表配置的数据库号*/
private Integer chainDataBase;
@@ -77,6 +80,14 @@ public class RedisParserVO {
}
}
public Integer getPollingStartTime() {
return pollingStartTime;
}
public void setPollingStartTime(Integer pollingStartTime) {
this.pollingStartTime = pollingStartTime;
}
public Integer getPollingInterval() {
return pollingInterval;
}

View File

@@ -43,6 +43,7 @@
<version>${revision}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@@ -9,7 +9,6 @@ import com.yomahub.liteflow.slot.DefaultContext;
import com.yomahub.liteflow.test.BaseTest;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.MockitoAnnotations;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;

View File

@@ -2,6 +2,7 @@ liteflow.rule-source-ext-data={\
"host":"localhost",\
"port":6379,\
"pollingInterval":1,\
"pollingStartTime":1,\
"chainDataBase":1,\
"chainKey":"pollChainKey",\
"scriptDataBase":1,\