diff --git a/pom.xml b/pom.xml index dd7befa8e..85ac7b747 100644 --- a/pom.xml +++ b/pom.xml @@ -350,13 +350,6 @@ ${fastjson.version} - - - redis.clients - jedis - 5.1.0 - - io.github.linpeilie mapstruct-plus-spring-boot-starter diff --git a/ruoyi-common/ruoyi-common-dubbo/pom.xml b/ruoyi-common/ruoyi-common-dubbo/pom.xml index 344e3a1ae..87dcae628 100644 --- a/ruoyi-common/ruoyi-common-dubbo/pom.xml +++ b/ruoyi-common/ruoyi-common-dubbo/pom.xml @@ -37,20 +37,10 @@ - org.apache.dubbo.extensions - dubbo-metadata-report-redis - - - redis.clients - jedis - - - - - redis.clients - jedis - 5.2.0 + org.dromara + ruoyi-common-redis + org.projectlombok lombok diff --git a/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/apache/dubbo/metadata/store/redis/RedisMetadataReport.java b/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/apache/dubbo/metadata/store/redis/RedisMetadataReport.java deleted file mode 100644 index fa47ed1fe..000000000 --- a/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/apache/dubbo/metadata/store/redis/RedisMetadataReport.java +++ /dev/null @@ -1,538 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dubbo.metadata.store.redis; - -import org.apache.commons.pool2.impl.GenericObjectPoolConfig; -import org.apache.dubbo.common.URL; -import org.apache.dubbo.common.config.configcenter.ConfigItem; -import org.apache.dubbo.common.logger.ErrorTypeAwareLogger; -import org.apache.dubbo.common.logger.LoggerFactory; -import org.apache.dubbo.common.utils.*; -import org.apache.dubbo.metadata.MappingChangedEvent; -import org.apache.dubbo.metadata.MappingListener; -import org.apache.dubbo.metadata.MetadataInfo; -import org.apache.dubbo.metadata.ServiceNameMapping; -import org.apache.dubbo.metadata.report.identifier.*; -import org.apache.dubbo.metadata.report.support.AbstractMetadataReport; -import org.apache.dubbo.rpc.RpcException; -import redis.clients.jedis.*; -import redis.clients.jedis.params.SetParams; -import redis.clients.jedis.util.JedisClusterCRC16; - -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; - -import static org.apache.dubbo.common.constants.CommonConstants.*; -import static org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_RESPONSE; -import static org.apache.dubbo.metadata.MetadataConstants.META_DATA_STORE_TAG; -import static org.apache.dubbo.metadata.ServiceNameMapping.DEFAULT_MAPPING_GROUP; -import static org.apache.dubbo.metadata.ServiceNameMapping.getAppNames; -import static org.apache.dubbo.metadata.report.support.Constants.DEFAULT_METADATA_REPORT_CYCLE_REPORT; - -/** - * RedisMetadataReport - */ -public class RedisMetadataReport extends AbstractMetadataReport { - - private static final String REDIS_DATABASE_KEY = "database"; - private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(RedisMetadataReport.class); - - // protected , for test - protected JedisPool pool; - private Set jedisClusterNodes; - private int timeout; - private String username; - private String password; - private final String root; - private final ConcurrentHashMap mappingDataListenerMap = new ConcurrentHashMap<>(); - private SetParams jedisParams = SetParams.setParams(); - - public RedisMetadataReport(URL url) { - super(url); - timeout = url.getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT); - username = url.getUsername(); - password = url.getPassword(); - this.root = url.getGroup(DEFAULT_ROOT); - if (url.getParameter(CYCLE_REPORT_KEY, DEFAULT_METADATA_REPORT_CYCLE_REPORT)) { - // ttl default is twice the cycle-report time - jedisParams.px(ONE_DAY_IN_MILLISECONDS * 2); - } - if (url.getParameter(CLUSTER_KEY, false)) { - jedisClusterNodes = new HashSet<>(); - List urls = url.getBackupUrls(); - for (URL tmpUrl : urls) { - jedisClusterNodes.add(new HostAndPort(tmpUrl.getHost(), tmpUrl.getPort())); - } - } else { - int database = url.getParameter(REDIS_DATABASE_KEY, 0); - pool = new JedisPool(new JedisPoolConfig(), url.getHost(), url.getPort(), timeout, username, password, database); - } - } - - @Override - protected void doStoreProviderMetadata(MetadataIdentifier providerMetadataIdentifier, String serviceDefinitions) { - this.storeMetadata(providerMetadataIdentifier, serviceDefinitions, true); - } - - @Override - protected void doStoreConsumerMetadata(MetadataIdentifier consumerMetadataIdentifier, String value) { - this.storeMetadata(consumerMetadataIdentifier, value, true); - } - - @Override - protected void doSaveMetadata(ServiceMetadataIdentifier serviceMetadataIdentifier, URL url) { - this.storeMetadata(serviceMetadataIdentifier, URL.encode(url.toFullString()), false); - } - - @Override - protected void doRemoveMetadata(ServiceMetadataIdentifier serviceMetadataIdentifier) { - this.deleteMetadata(serviceMetadataIdentifier); - } - - @Override - protected List doGetExportedURLs(ServiceMetadataIdentifier metadataIdentifier) { - String content = getMetadata(metadataIdentifier); - if (StringUtils.isEmpty(content)) { - return Collections.emptyList(); - } - return new ArrayList<>(Arrays.asList(URL.decode(content))); - } - - @Override - protected void doSaveSubscriberData(SubscriberMetadataIdentifier subscriberMetadataIdentifier, String urlListStr) { - this.storeMetadata(subscriberMetadataIdentifier, urlListStr, false); - } - - @Override - protected String doGetSubscribedURLs(SubscriberMetadataIdentifier subscriberMetadataIdentifier) { - return this.getMetadata(subscriberMetadataIdentifier); - } - - @Override - public String getServiceDefinition(MetadataIdentifier metadataIdentifier) { - return this.getMetadata(metadataIdentifier); - } - - private void storeMetadata(BaseMetadataIdentifier metadataIdentifier, String v, boolean ephemeral) { - if (pool != null) { - storeMetadataStandalone(metadataIdentifier, v, ephemeral); - } else { - storeMetadataInCluster(metadataIdentifier, v, ephemeral); - } - } - - private void storeMetadataInCluster(BaseMetadataIdentifier metadataIdentifier, String v, boolean ephemeral) { - try (JedisCluster jedisCluster = - new JedisCluster(jedisClusterNodes, timeout, timeout, 2, password, new GenericObjectPoolConfig<>())) { - if (ephemeral) { - jedisCluster.set(metadataIdentifier.getIdentifierKey() + META_DATA_STORE_TAG, v, jedisParams); - } else { - jedisCluster.set(metadataIdentifier.getIdentifierKey() + META_DATA_STORE_TAG, v); - } - } catch (Throwable e) { - String msg = - "Failed to put " + metadataIdentifier + " to redis cluster " + v + ", cause: " + e.getMessage(); - logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e); - throw new RpcException(msg, e); - } - } - - private void storeMetadataStandalone(BaseMetadataIdentifier metadataIdentifier, String v, boolean ephemeral) { - try (Jedis jedis = pool.getResource()) { - if (ephemeral) { - jedis.set(metadataIdentifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY), v, jedisParams); - } else { - jedis.set(metadataIdentifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY), v); - } - } catch (Throwable e) { - String msg = "Failed to put " + metadataIdentifier + " to redis " + v + ", cause: " + e.getMessage(); - logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e); - throw new RpcException(msg, e); - } - } - - private void deleteMetadata(BaseMetadataIdentifier metadataIdentifier) { - if (pool != null) { - deleteMetadataStandalone(metadataIdentifier); - } else { - deleteMetadataInCluster(metadataIdentifier); - } - } - - private void deleteMetadataInCluster(BaseMetadataIdentifier metadataIdentifier) { - try (JedisCluster jedisCluster = - new JedisCluster(jedisClusterNodes, timeout, timeout, 2, password, new GenericObjectPoolConfig<>())) { - jedisCluster.del(metadataIdentifier.getIdentifierKey() + META_DATA_STORE_TAG); - } catch (Throwable e) { - String msg = "Failed to delete " + metadataIdentifier + " from redis cluster , cause: " + e.getMessage(); - logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e); - throw new RpcException(msg, e); - } - } - - private void deleteMetadataStandalone(BaseMetadataIdentifier metadataIdentifier) { - try (Jedis jedis = pool.getResource()) { - jedis.del(metadataIdentifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY)); - } catch (Throwable e) { - String msg = "Failed to delete " + metadataIdentifier + " from redis , cause: " + e.getMessage(); - logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e); - throw new RpcException(msg, e); - } - } - - private String getMetadata(BaseMetadataIdentifier metadataIdentifier) { - if (pool != null) { - return getMetadataStandalone(metadataIdentifier); - } else { - return getMetadataInCluster(metadataIdentifier); - } - } - - private String getMetadataInCluster(BaseMetadataIdentifier metadataIdentifier) { - try (JedisCluster jedisCluster = - new JedisCluster(jedisClusterNodes, timeout, timeout, 2, password, new GenericObjectPoolConfig<>())) { - return jedisCluster.get(metadataIdentifier.getIdentifierKey() + META_DATA_STORE_TAG); - } catch (Throwable e) { - String msg = "Failed to get " + metadataIdentifier + " from redis cluster , cause: " + e.getMessage(); - logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e); - throw new RpcException(msg, e); - } - } - - private String getMetadataStandalone(BaseMetadataIdentifier metadataIdentifier) { - try (Jedis jedis = pool.getResource()) { - return jedis.get(metadataIdentifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY)); - } catch (Throwable e) { - String msg = "Failed to get " + metadataIdentifier + " from redis , cause: " + e.getMessage(); - logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e); - throw new RpcException(msg, e); - } - } - - /** - * Store class and application names using Redis hashes - * key: default 'dubbo:mapping' - * field: class (serviceInterface) - * value: application_names - * @param serviceInterface field(class) - * @param defaultMappingGroup {@link ServiceNameMapping#DEFAULT_MAPPING_GROUP} - * @param newConfigContent new application_names - * @param ticket previous application_names - * @return - */ - @Override - public boolean registerServiceAppMapping( - String serviceInterface, String defaultMappingGroup, String newConfigContent, Object ticket) { - try { - if (null != ticket && !(ticket instanceof String)) { - throw new IllegalArgumentException("redis publishConfigCas requires stat type ticket"); - } - String pathKey = buildMappingKey(defaultMappingGroup); - - return storeMapping(pathKey, serviceInterface, newConfigContent, (String) ticket); - } catch (Exception e) { - logger.warn(TRANSPORT_FAILED_RESPONSE, "", "", "redis publishConfigCas failed.", e); - return false; - } - } - - private boolean storeMapping(String key, String field, String value, String ticket) { - if (pool != null) { - return storeMappingStandalone(key, field, value, ticket); - } else { - return storeMappingInCluster(key, field, value, ticket); - } - } - - /** - * use 'watch' to implement cas. - * Find information about slot distribution by key. - */ - private boolean storeMappingInCluster(String key, String field, String value, String ticket) { - try (JedisCluster jedisCluster = - new JedisCluster(jedisClusterNodes, timeout, timeout, 2, password, new GenericObjectPoolConfig<>())) { - Jedis jedis = new Jedis(jedisCluster.getConnectionFromSlot(JedisClusterCRC16.getSlot(key))); - jedis.watch(key); - String oldValue = jedis.hget(key, field); - if (null == oldValue || null == ticket || oldValue.equals(ticket)) { - Transaction transaction = jedis.multi(); - transaction.hset(key, field, value); - List result = transaction.exec(); - if (null != result) { - jedisCluster.publish(buildPubSubKey(), field); - return true; - } - } else { - jedis.unwatch(); - } - jedis.close(); - } catch (Throwable e) { - String msg = "Failed to put " + key + ":" + field + " to redis " + value + ", cause: " + e.getMessage(); - logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e); - throw new RpcException(msg, e); - } - return false; - } - - /** - * use 'watch' to implement cas. - * Find information about slot distribution by key. - */ - private boolean storeMappingStandalone(String key, String field, String value, String ticket) { - try (Jedis jedis = pool.getResource()) { - jedis.watch(key); - String oldValue = jedis.hget(key, field); - if (null == oldValue || null == ticket || oldValue.equals(ticket)) { - Transaction transaction = jedis.multi(); - transaction.hset(key, field, value); - List result = transaction.exec(); - if (null != result) { - jedis.publish(buildPubSubKey(), field); - return true; - } - } - jedis.unwatch(); - } catch (Throwable e) { - String msg = "Failed to put " + key + ":" + field + " to redis " + value + ", cause: " + e.getMessage(); - logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e); - throw new RpcException(msg, e); - } - return false; - } - - /** - * build mapping key - * @param defaultMappingGroup {@link ServiceNameMapping#DEFAULT_MAPPING_GROUP} - * @return - */ - private String buildMappingKey(String defaultMappingGroup) { - return this.root + GROUP_CHAR_SEPARATOR + defaultMappingGroup; - } - - /** - * build pub/sub key - */ - private String buildPubSubKey() { - return buildMappingKey(DEFAULT_MAPPING_GROUP) + GROUP_CHAR_SEPARATOR + QUEUES_KEY; - } - - /** - * get content and use content to complete cas - * @param serviceKey class - * @param group {@link ServiceNameMapping#DEFAULT_MAPPING_GROUP} - */ - @Override - public ConfigItem getConfigItem(String serviceKey, String group) { - String key = buildMappingKey(group); - String content = getMappingData(key, serviceKey); - - return new ConfigItem(content, content); - } - - /** - * get current application_names - */ - private String getMappingData(String key, String field) { - if (pool != null) { - return getMappingDataStandalone(key, field); - } else { - return getMappingDataInCluster(key, field); - } - } - - private String getMappingDataInCluster(String key, String field) { - try (JedisCluster jedisCluster = - new JedisCluster(jedisClusterNodes, timeout, timeout, 2, password, new GenericObjectPoolConfig<>())) { - return jedisCluster.hget(key, field); - } catch (Throwable e) { - String msg = "Failed to get " + key + ":" + field + " from redis cluster , cause: " + e.getMessage(); - logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e); - throw new RpcException(msg, e); - } - } - - private String getMappingDataStandalone(String key, String field) { - try (Jedis jedis = pool.getResource()) { - return jedis.hget(key, field); - } catch (Throwable e) { - String msg = "Failed to get " + key + ":" + field + " from redis , cause: " + e.getMessage(); - logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e); - throw new RpcException(msg, e); - } - } - - /** - * remove listener. If have no listener,thread will dead - */ - @Override - public void removeServiceAppMappingListener(String serviceKey, MappingListener listener) { - MappingDataListener mappingDataListener = mappingDataListenerMap.get(buildPubSubKey()); - if (null != mappingDataListener) { - NotifySub notifySub = mappingDataListener.getNotifySub(); - notifySub.removeListener(serviceKey, listener); - if (notifySub.isEmpty()) { - mappingDataListener.shutdown(); - } - } - } - - /** - * Start a thread and subscribe to {@link this#buildPubSubKey()}. - * Notify {@link MappingListener} if there is a change in the 'application_names' message. - */ - @Override - public Set getServiceAppMapping(String serviceKey, MappingListener listener, URL url) { - MappingDataListener mappingDataListener = - ConcurrentHashMapUtils.computeIfAbsent(mappingDataListenerMap, buildPubSubKey(), k -> { - MappingDataListener dataListener = new MappingDataListener(buildPubSubKey()); - dataListener.start(); - return dataListener; - }); - mappingDataListener.getNotifySub().addListener(serviceKey, listener); - return this.getServiceAppMapping(serviceKey, url); - } - - @Override - public Set getServiceAppMapping(String serviceKey, URL url) { - String key = buildMappingKey(DEFAULT_MAPPING_GROUP); - return getAppNames(getMappingData(key, serviceKey)); - } - - @Override - public MetadataInfo getAppMetadata(SubscriberMetadataIdentifier identifier, Map instanceMetadata) { - String content = this.getMetadata(identifier); - return JsonUtils.toJavaObject(content, MetadataInfo.class); - } - - @Override - public void publishAppMetadata(SubscriberMetadataIdentifier identifier, MetadataInfo metadataInfo) { - this.storeMetadata(identifier, metadataInfo.getContent(), false); - } - - @Override - public void unPublishAppMetadata(SubscriberMetadataIdentifier identifier, MetadataInfo metadataInfo) { - this.deleteMetadata(identifier); - } - - // for test - public MappingDataListener getMappingDataListener() { - return mappingDataListenerMap.get(buildPubSubKey()); - } - - /** - * Listen for changes in the 'application_names' message and notify the listener. - */ - class NotifySub extends JedisPubSub { - - private final Map> listeners = new ConcurrentHashMap<>(); - - public void addListener(String key, MappingListener listener) { - Set listenerSet = listeners.computeIfAbsent(key, k -> new ConcurrentHashSet<>()); - listenerSet.add(listener); - } - - public void removeListener(String serviceKey, MappingListener listener) { - Set listenerSet = this.listeners.get(serviceKey); - if (listenerSet != null) { - listenerSet.remove(listener); - if (listenerSet.isEmpty()) { - this.listeners.remove(serviceKey); - } - } - } - - public Boolean isEmpty() { - return this.listeners.isEmpty(); - } - - @Override - public void onMessage(String key, String msg) { - logger.info("sub from redis:" + key + " message:" + msg); - String applicationNames = getMappingData(buildMappingKey(DEFAULT_MAPPING_GROUP), msg); - MappingChangedEvent mappingChangedEvent = new MappingChangedEvent(msg, getAppNames(applicationNames)); - if (!CollectionUtils.isEmpty(listeners.get(msg))) { - for (MappingListener mappingListener : listeners.get(msg)) { - mappingListener.onEvent(mappingChangedEvent); - } - } - } - - @Override - public void onPMessage(String pattern, String key, String msg) { - onMessage(key, msg); - } - - @Override - public void onPSubscribe(String pattern, int subscribedChannels) { - super.onPSubscribe(pattern, subscribedChannels); - } - } - - /** - * Subscribe application names change message. - */ - class MappingDataListener extends Thread { - - private String path; - - private final NotifySub notifySub = new NotifySub(); - // for test - protected volatile boolean running = true; - - public MappingDataListener(String path) { - this.path = path; - } - - public NotifySub getNotifySub() { - return notifySub; - } - - @Override - public void run() { - while (running) { - if (pool != null) { - try (Jedis jedis = pool.getResource()) { - jedis.subscribe(notifySub, path); - } catch (Throwable e) { - String msg = "Failed to subscribe " + path + ", cause: " + e.getMessage(); - logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e); - throw new RpcException(msg, e); - } - } else { - try (JedisCluster jedisCluster = new JedisCluster( - jedisClusterNodes, timeout, timeout, 2, password, new GenericObjectPoolConfig<>())) { - jedisCluster.subscribe(notifySub, path); - } catch (Throwable e) { - String msg = "Failed to subscribe " + path + ", cause: " + e.getMessage(); - logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e); - throw new RpcException(msg, e); - } - } - } - } - - public void shutdown() { - try { - running = false; - notifySub.unsubscribe(path); - } catch (Throwable e) { - String msg = "Failed to unsubscribe " + path + ", cause: " + e.getMessage(); - logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e); - } - } - } -} diff --git a/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/apache/dubbo/metadata/store/redis/RedissonMetadataReport.java b/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/apache/dubbo/metadata/store/redis/RedissonMetadataReport.java new file mode 100644 index 000000000..834ef63f7 --- /dev/null +++ b/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/apache/dubbo/metadata/store/redis/RedissonMetadataReport.java @@ -0,0 +1,301 @@ +package org.apache.dubbo.metadata.store.redis; + +import lombok.extern.slf4j.Slf4j; +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.config.configcenter.ConfigItem; +import org.apache.dubbo.common.utils.CollectionUtils; +import org.apache.dubbo.common.utils.ConcurrentHashSet; +import org.apache.dubbo.metadata.MappingChangedEvent; +import org.apache.dubbo.metadata.MappingListener; +import org.apache.dubbo.metadata.MetadataInfo; +import org.apache.dubbo.metadata.report.identifier.*; +import org.apache.dubbo.metadata.report.support.AbstractMetadataReport; +import org.apache.dubbo.rpc.RpcException; +import org.dromara.common.core.utils.SpringUtils; +import org.redisson.api.RScript; +import org.redisson.api.RTopic; +import org.redisson.api.RedissonClient; +import org.redisson.client.codec.StringCodec; + +import java.time.Duration; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +import static org.apache.dubbo.common.constants.CommonConstants.*; +import static org.apache.dubbo.metadata.MetadataConstants.META_DATA_STORE_TAG; +import static org.apache.dubbo.metadata.ServiceNameMapping.DEFAULT_MAPPING_GROUP; +import static org.apache.dubbo.metadata.ServiceNameMapping.getAppNames; +import static org.apache.dubbo.metadata.report.support.Constants.DEFAULT_METADATA_REPORT_CYCLE_REPORT; + +/** + * 使用 Redisson 重新实现元数据中心 + */ +@Slf4j +public class RedissonMetadataReport extends AbstractMetadataReport { + + // Lua script for atomic CAS on a hash field: + // If the current value equals ticket (or field is absent, or ticket is empty), update and return 1; else return 0. + private static final String CAS_LUA =""" + local old = redis.call('HGET', KEYS[1], ARGV[1]) + if old == false or ARGV[3] == '' or old == ARGV[3] then + redis.call('HSET', KEYS[1], ARGV[1], ARGV[2]) + return 1 + end + return 0"""; + + private final String root; + private final long ttlMs; + + // Lazily initialized — Dubbo SPI creates this class before Spring is fully ready + private volatile RedissonClient redissonClient; + + // topic key → RTopic (keeps the subscription alive) + private final ConcurrentHashMap topicMap = new ConcurrentHashMap<>(); + // serviceKey → listeners (for dispatching mapping change events) + private final ConcurrentHashMap> listenerMap = new ConcurrentHashMap<>(); + + public RedissonMetadataReport(URL url) { + super(url); + this.root = url.getGroup(DEFAULT_ROOT); + this.ttlMs = url.getParameter(CYCLE_REPORT_KEY, DEFAULT_METADATA_REPORT_CYCLE_REPORT) + ? ONE_DAY_IN_MILLISECONDS * 2L + : 0L; + } + + // ------------------------------------------------------------------------- + // Lazy RedissonClient accessor + // ------------------------------------------------------------------------- + + private RedissonClient getRedisson() { + if (redissonClient == null) { + synchronized (this) { + if (redissonClient == null) { + redissonClient = SpringUtils.getBean(RedissonClient.class); + } + } + } + return redissonClient; + } + + // ------------------------------------------------------------------------- + // AbstractMetadataReport — provider / consumer metadata + // ------------------------------------------------------------------------- + + @Override + protected void doStoreProviderMetadata(MetadataIdentifier id, String serviceDefinitions) { + storeMetadata(id.getUniqueKey(KeyTypeEnum.UNIQUE_KEY), serviceDefinitions, true); + } + + @Override + protected void doStoreConsumerMetadata(MetadataIdentifier id, String value) { + storeMetadata(id.getUniqueKey(KeyTypeEnum.UNIQUE_KEY), value, true); + } + + @Override + protected void doSaveMetadata(ServiceMetadataIdentifier id, URL url) { + storeMetadata(id.getIdentifierKey() + META_DATA_STORE_TAG, URL.encode(url.toFullString()), false); + } + + @Override + protected void doRemoveMetadata(ServiceMetadataIdentifier id) { + deleteMetadata(id.getIdentifierKey() + META_DATA_STORE_TAG); + } + + @Override + protected List doGetExportedURLs(ServiceMetadataIdentifier id) { + String content = getMetadata(id.getIdentifierKey() + META_DATA_STORE_TAG); + if (content == null || content.isEmpty()) { + return Collections.emptyList(); + } + return Collections.singletonList(URL.decode(content)); + } + + @Override + protected void doSaveSubscriberData(SubscriberMetadataIdentifier id, String urlListStr) { + storeMetadata(id.getUniqueKey(KeyTypeEnum.UNIQUE_KEY), urlListStr, false); + } + + @Override + protected String doGetSubscribedURLs(SubscriberMetadataIdentifier id) { + return getMetadata(id.getUniqueKey(KeyTypeEnum.UNIQUE_KEY)); + } + + @Override + public String getServiceDefinition(MetadataIdentifier id) { + return getMetadata(id.getUniqueKey(KeyTypeEnum.UNIQUE_KEY)); + } + + // ------------------------------------------------------------------------- + // App-level metadata (Dubbo 3.x application-level service discovery) + // ------------------------------------------------------------------------- + + @Override + public void publishAppMetadata(SubscriberMetadataIdentifier id, MetadataInfo metadataInfo) { + storeMetadata(id.getUniqueKey(KeyTypeEnum.UNIQUE_KEY), metadataInfo.getContent(), false); + } + + @Override + public MetadataInfo getAppMetadata(SubscriberMetadataIdentifier id, Map instanceMetadata) { + String content = getMetadata(id.getUniqueKey(KeyTypeEnum.UNIQUE_KEY)); + return org.apache.dubbo.common.utils.JsonUtils.toJavaObject(content, MetadataInfo.class); + } + + @Override + public void unPublishAppMetadata(SubscriberMetadataIdentifier id, MetadataInfo metadataInfo) { + deleteMetadata(id.getUniqueKey(KeyTypeEnum.UNIQUE_KEY)); + } + + // ------------------------------------------------------------------------- + // Service-to-application mapping + // ------------------------------------------------------------------------- + + @Override + public boolean registerServiceAppMapping( + String serviceInterface, String defaultMappingGroup, String newConfigContent, Object ticket) { + try { + if (ticket != null && !(ticket instanceof String)) { + throw new IllegalArgumentException("Redis CAS requires a String ticket"); + } + return storeMappingWithCas( + buildMappingKey(defaultMappingGroup), + serviceInterface, + newConfigContent, + (String) ticket); + } catch (Exception e) { + log.warn("registerServiceAppMapping failed.", e); + return false; + } + } + + @Override + public ConfigItem getConfigItem(String serviceKey, String group) { + String key = buildMappingKey(group); + String content = getMappingField(key, serviceKey); + return new ConfigItem(content, content); + } + + @Override + public Set getServiceAppMapping(String serviceKey, MappingListener listener, URL url) { + String pubSubKey = buildPubSubKey(); + // Register the RTopic listener once per pubSubKey + topicMap.computeIfAbsent(pubSubKey, k -> { + RTopic topic = getRedisson().getTopic(k, StringCodec.INSTANCE); + topic.addListener(String.class, (channel, msg) -> { + String applicationNames = getMappingField(buildMappingKey(DEFAULT_MAPPING_GROUP), msg); + MappingChangedEvent event = new MappingChangedEvent(msg, getAppNames(applicationNames)); + Set ls = listenerMap.get(msg); + if (!CollectionUtils.isEmpty(ls)) { + ls.forEach(l -> l.onEvent(event)); + } + }); + return topic; + }); + listenerMap.computeIfAbsent(serviceKey, k -> new ConcurrentHashSet<>()).add(listener); + return getServiceAppMapping(serviceKey, url); + } + + @Override + public Set getServiceAppMapping(String serviceKey, URL url) { + return getAppNames(getMappingField(buildMappingKey(DEFAULT_MAPPING_GROUP), serviceKey)); + } + + @Override + public void removeServiceAppMappingListener(String serviceKey, MappingListener listener) { + Set ls = listenerMap.get(serviceKey); + if (ls != null) { + ls.remove(listener); + if (ls.isEmpty()) { + listenerMap.remove(serviceKey); + // If no listeners remain for any key, remove the topic subscription + if (listenerMap.isEmpty()) { + RTopic topic = topicMap.remove(buildPubSubKey()); + if (topic != null) { + topic.removeAllListeners(); + } + } + } + } + } + + // ------------------------------------------------------------------------- + // Internal Redis helpers + // ------------------------------------------------------------------------- + + private void storeMetadata(String key, String value, boolean ephemeral) { + try { + if (ephemeral && ttlMs > 0) { + getRedisson().getBucket(key, StringCodec.INSTANCE).set(value, Duration.ofMillis(ttlMs)); + } else { + getRedisson().getBucket(key, StringCodec.INSTANCE).set(value); + } + } catch (Exception e) { + String msg = "Failed to store metadata key=" + key + ", cause: " + e.getMessage(); + log.error(msg, e); + throw new RpcException(msg, e); + } + } + + private String getMetadata(String key) { + try { + return getRedisson().getBucket(key, StringCodec.INSTANCE).get(); + } catch (Exception e) { + String msg = "Failed to get metadata key=" + key + ", cause: " + e.getMessage(); + log.error(msg, e); + throw new RpcException(msg, e); + } + } + + private void deleteMetadata(String key) { + try { + getRedisson().getBucket(key, StringCodec.INSTANCE).delete(); + } catch (Exception e) { + String msg = "Failed to delete metadata key=" + key + ", cause: " + e.getMessage(); + log.error(msg, e); + throw new RpcException(msg, e); + } + } + + private String getMappingField(String key, String field) { + try { + return getRedisson().getMap(key, StringCodec.INSTANCE).get(field); + } catch (Exception e) { + String msg = "Failed to get mapping key=" + key + " field=" + field + ", cause: " + e.getMessage(); + log.error(msg, e); + throw new RpcException(msg, e); + } + } + + /** + * Atomic CAS on a hash field via Lua script. + * Updates field to newValue only when the current value equals ticket (or field is absent / ticket is null). + * On success, publishes a change notification to the pub/sub channel. + */ + private boolean storeMappingWithCas(String key, String field, String newValue, String ticket) { + try { + Long result = getRedisson().getScript(StringCodec.INSTANCE).eval( + RScript.Mode.READ_WRITE, + CAS_LUA, + RScript.ReturnType.INTEGER, + Collections.singletonList(key), + field, newValue, ticket == null ? "" : ticket + ); + if (Long.valueOf(1L).equals(result)) { + getRedisson().getTopic(buildPubSubKey(), StringCodec.INSTANCE).publish(field); + return true; + } + return false; + } catch (Exception e) { + String msg = "Failed to store mapping key=" + key + " field=" + field + ", cause: " + e.getMessage(); + log.error(msg, e); + throw new RpcException(msg, e); + } + } + + private String buildMappingKey(String mappingGroup) { + return this.root + GROUP_CHAR_SEPARATOR + mappingGroup; + } + + private String buildPubSubKey() { + return buildMappingKey(DEFAULT_MAPPING_GROUP) + GROUP_CHAR_SEPARATOR + QUEUES_KEY; + } +} diff --git a/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/apache/dubbo/metadata/store/redis/RedissonMetadataReportFactory.java b/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/apache/dubbo/metadata/store/redis/RedissonMetadataReportFactory.java new file mode 100644 index 000000000..51aa80f03 --- /dev/null +++ b/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/apache/dubbo/metadata/store/redis/RedissonMetadataReportFactory.java @@ -0,0 +1,17 @@ +package org.apache.dubbo.metadata.store.redis; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.metadata.report.MetadataReport; +import org.apache.dubbo.metadata.report.support.AbstractMetadataReportFactory; + +/** + * RedisMetadataReportFactory. + */ +public class RedissonMetadataReportFactory extends AbstractMetadataReportFactory { + + @Override + public MetadataReport createMetadataReport(URL url) { + return new RedissonMetadataReport(url); + } + +} diff --git a/ruoyi-common/ruoyi-common-dubbo/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.metadata.report.MetadataReportFactory b/ruoyi-common/ruoyi-common-dubbo/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.metadata.report.MetadataReportFactory new file mode 100644 index 000000000..c69ece412 --- /dev/null +++ b/ruoyi-common/ruoyi-common-dubbo/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.metadata.report.MetadataReportFactory @@ -0,0 +1 @@ +redis=org.apache.dubbo.metadata.store.redis.RedissonMetadataReportFactory diff --git a/ruoyi-common/ruoyi-common-dubbo/src/main/resources/common-dubbo.yml b/ruoyi-common/ruoyi-common-dubbo/src/main/resources/common-dubbo.yml index ddf7f6980..3000f2b3a 100644 --- a/ruoyi-common/ruoyi-common-dubbo/src/main/resources/common-dubbo.yml +++ b/ruoyi-common/ruoyi-common-dubbo/src/main/resources/common-dubbo.yml @@ -19,6 +19,8 @@ dubbo: password: ${spring.cloud.nacos.password} parameters: namespace: ${spring.profiles.active} + # 已经采用框架内 Redisson 实现元数据中心 + # 以下配置不用管占位用 不然 dubbo 会直接用 nacos 当注册中心 直接配好框架自带的 Redisson 即可 metadata-report: address: redis://${spring.data.redis.host:localhost}:${spring.data.redis.port:6379} group: DUBBO_GROUP diff --git a/script/config/nacos/application-common.yml b/script/config/nacos/application-common.yml index 053113150..5e5a8023d 100644 --- a/script/config/nacos/application-common.yml +++ b/script/config/nacos/application-common.yml @@ -32,13 +32,6 @@ dubbo: consumer: # 超时时间 timeout: 3000 - metadata-report: - # Redis集群开关 - cluster: false - parameters: - # 集群地址 cluster 为 true 生效 - # 集群把所有Redis集群节点写到这里就行了 - backup: 127.0.0.1:6379,127.0.0.1:6381 # 自定义配置 custom: # 全局请求log