update 优化 使用redisson重写dubbo元数据中心实现(dubbo官方jedis实现类断连问题太多) 彻底解决重新部署服务 元数据更新失败问题

This commit is contained in:
疯狂的狮子Li
2026-04-16 12:40:13 +08:00
parent 93adb1080a
commit a356e030cd
8 changed files with 324 additions and 565 deletions

View File

@@ -37,20 +37,10 @@
</dependency>
<dependency>
<groupId>org.apache.dubbo.extensions</groupId>
<artifactId>dubbo-metadata-report-redis</artifactId>
<exclusions>
<exclusion>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>5.2.0</version>
<groupId>org.dromara</groupId>
<artifactId>ruoyi-common-redis</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>

View File

@@ -1,538 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.metadata.store.redis;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.config.configcenter.ConfigItem;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.*;
import org.apache.dubbo.metadata.MappingChangedEvent;
import org.apache.dubbo.metadata.MappingListener;
import org.apache.dubbo.metadata.MetadataInfo;
import org.apache.dubbo.metadata.ServiceNameMapping;
import org.apache.dubbo.metadata.report.identifier.*;
import org.apache.dubbo.metadata.report.support.AbstractMetadataReport;
import org.apache.dubbo.rpc.RpcException;
import redis.clients.jedis.*;
import redis.clients.jedis.params.SetParams;
import redis.clients.jedis.util.JedisClusterCRC16;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import static org.apache.dubbo.common.constants.CommonConstants.*;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_RESPONSE;
import static org.apache.dubbo.metadata.MetadataConstants.META_DATA_STORE_TAG;
import static org.apache.dubbo.metadata.ServiceNameMapping.DEFAULT_MAPPING_GROUP;
import static org.apache.dubbo.metadata.ServiceNameMapping.getAppNames;
import static org.apache.dubbo.metadata.report.support.Constants.DEFAULT_METADATA_REPORT_CYCLE_REPORT;
/**
* RedisMetadataReport
*/
public class RedisMetadataReport extends AbstractMetadataReport {
private static final String REDIS_DATABASE_KEY = "database";
private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(RedisMetadataReport.class);
// protected , for test
protected JedisPool pool;
private Set<HostAndPort> jedisClusterNodes;
private int timeout;
private String username;
private String password;
private final String root;
private final ConcurrentHashMap<String, MappingDataListener> mappingDataListenerMap = new ConcurrentHashMap<>();
private SetParams jedisParams = SetParams.setParams();
public RedisMetadataReport(URL url) {
super(url);
timeout = url.getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
username = url.getUsername();
password = url.getPassword();
this.root = url.getGroup(DEFAULT_ROOT);
if (url.getParameter(CYCLE_REPORT_KEY, DEFAULT_METADATA_REPORT_CYCLE_REPORT)) {
// ttl default is twice the cycle-report time
jedisParams.px(ONE_DAY_IN_MILLISECONDS * 2);
}
if (url.getParameter(CLUSTER_KEY, false)) {
jedisClusterNodes = new HashSet<>();
List<URL> urls = url.getBackupUrls();
for (URL tmpUrl : urls) {
jedisClusterNodes.add(new HostAndPort(tmpUrl.getHost(), tmpUrl.getPort()));
}
} else {
int database = url.getParameter(REDIS_DATABASE_KEY, 0);
pool = new JedisPool(new JedisPoolConfig(), url.getHost(), url.getPort(), timeout, username, password, database);
}
}
@Override
protected void doStoreProviderMetadata(MetadataIdentifier providerMetadataIdentifier, String serviceDefinitions) {
this.storeMetadata(providerMetadataIdentifier, serviceDefinitions, true);
}
@Override
protected void doStoreConsumerMetadata(MetadataIdentifier consumerMetadataIdentifier, String value) {
this.storeMetadata(consumerMetadataIdentifier, value, true);
}
@Override
protected void doSaveMetadata(ServiceMetadataIdentifier serviceMetadataIdentifier, URL url) {
this.storeMetadata(serviceMetadataIdentifier, URL.encode(url.toFullString()), false);
}
@Override
protected void doRemoveMetadata(ServiceMetadataIdentifier serviceMetadataIdentifier) {
this.deleteMetadata(serviceMetadataIdentifier);
}
@Override
protected List<String> doGetExportedURLs(ServiceMetadataIdentifier metadataIdentifier) {
String content = getMetadata(metadataIdentifier);
if (StringUtils.isEmpty(content)) {
return Collections.emptyList();
}
return new ArrayList<>(Arrays.asList(URL.decode(content)));
}
@Override
protected void doSaveSubscriberData(SubscriberMetadataIdentifier subscriberMetadataIdentifier, String urlListStr) {
this.storeMetadata(subscriberMetadataIdentifier, urlListStr, false);
}
@Override
protected String doGetSubscribedURLs(SubscriberMetadataIdentifier subscriberMetadataIdentifier) {
return this.getMetadata(subscriberMetadataIdentifier);
}
@Override
public String getServiceDefinition(MetadataIdentifier metadataIdentifier) {
return this.getMetadata(metadataIdentifier);
}
private void storeMetadata(BaseMetadataIdentifier metadataIdentifier, String v, boolean ephemeral) {
if (pool != null) {
storeMetadataStandalone(metadataIdentifier, v, ephemeral);
} else {
storeMetadataInCluster(metadataIdentifier, v, ephemeral);
}
}
private void storeMetadataInCluster(BaseMetadataIdentifier metadataIdentifier, String v, boolean ephemeral) {
try (JedisCluster jedisCluster =
new JedisCluster(jedisClusterNodes, timeout, timeout, 2, password, new GenericObjectPoolConfig<>())) {
if (ephemeral) {
jedisCluster.set(metadataIdentifier.getIdentifierKey() + META_DATA_STORE_TAG, v, jedisParams);
} else {
jedisCluster.set(metadataIdentifier.getIdentifierKey() + META_DATA_STORE_TAG, v);
}
} catch (Throwable e) {
String msg =
"Failed to put " + metadataIdentifier + " to redis cluster " + v + ", cause: " + e.getMessage();
logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e);
throw new RpcException(msg, e);
}
}
private void storeMetadataStandalone(BaseMetadataIdentifier metadataIdentifier, String v, boolean ephemeral) {
try (Jedis jedis = pool.getResource()) {
if (ephemeral) {
jedis.set(metadataIdentifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY), v, jedisParams);
} else {
jedis.set(metadataIdentifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY), v);
}
} catch (Throwable e) {
String msg = "Failed to put " + metadataIdentifier + " to redis " + v + ", cause: " + e.getMessage();
logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e);
throw new RpcException(msg, e);
}
}
private void deleteMetadata(BaseMetadataIdentifier metadataIdentifier) {
if (pool != null) {
deleteMetadataStandalone(metadataIdentifier);
} else {
deleteMetadataInCluster(metadataIdentifier);
}
}
private void deleteMetadataInCluster(BaseMetadataIdentifier metadataIdentifier) {
try (JedisCluster jedisCluster =
new JedisCluster(jedisClusterNodes, timeout, timeout, 2, password, new GenericObjectPoolConfig<>())) {
jedisCluster.del(metadataIdentifier.getIdentifierKey() + META_DATA_STORE_TAG);
} catch (Throwable e) {
String msg = "Failed to delete " + metadataIdentifier + " from redis cluster , cause: " + e.getMessage();
logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e);
throw new RpcException(msg, e);
}
}
private void deleteMetadataStandalone(BaseMetadataIdentifier metadataIdentifier) {
try (Jedis jedis = pool.getResource()) {
jedis.del(metadataIdentifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY));
} catch (Throwable e) {
String msg = "Failed to delete " + metadataIdentifier + " from redis , cause: " + e.getMessage();
logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e);
throw new RpcException(msg, e);
}
}
private String getMetadata(BaseMetadataIdentifier metadataIdentifier) {
if (pool != null) {
return getMetadataStandalone(metadataIdentifier);
} else {
return getMetadataInCluster(metadataIdentifier);
}
}
private String getMetadataInCluster(BaseMetadataIdentifier metadataIdentifier) {
try (JedisCluster jedisCluster =
new JedisCluster(jedisClusterNodes, timeout, timeout, 2, password, new GenericObjectPoolConfig<>())) {
return jedisCluster.get(metadataIdentifier.getIdentifierKey() + META_DATA_STORE_TAG);
} catch (Throwable e) {
String msg = "Failed to get " + metadataIdentifier + " from redis cluster , cause: " + e.getMessage();
logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e);
throw new RpcException(msg, e);
}
}
private String getMetadataStandalone(BaseMetadataIdentifier metadataIdentifier) {
try (Jedis jedis = pool.getResource()) {
return jedis.get(metadataIdentifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY));
} catch (Throwable e) {
String msg = "Failed to get " + metadataIdentifier + " from redis , cause: " + e.getMessage();
logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e);
throw new RpcException(msg, e);
}
}
/**
* Store class and application names using Redis hashes
* key: default 'dubbo:mapping'
* field: class (serviceInterface)
* value: application_names
* @param serviceInterface field(class)
* @param defaultMappingGroup {@link ServiceNameMapping#DEFAULT_MAPPING_GROUP}
* @param newConfigContent new application_names
* @param ticket previous application_names
* @return
*/
@Override
public boolean registerServiceAppMapping(
String serviceInterface, String defaultMappingGroup, String newConfigContent, Object ticket) {
try {
if (null != ticket && !(ticket instanceof String)) {
throw new IllegalArgumentException("redis publishConfigCas requires stat type ticket");
}
String pathKey = buildMappingKey(defaultMappingGroup);
return storeMapping(pathKey, serviceInterface, newConfigContent, (String) ticket);
} catch (Exception e) {
logger.warn(TRANSPORT_FAILED_RESPONSE, "", "", "redis publishConfigCas failed.", e);
return false;
}
}
private boolean storeMapping(String key, String field, String value, String ticket) {
if (pool != null) {
return storeMappingStandalone(key, field, value, ticket);
} else {
return storeMappingInCluster(key, field, value, ticket);
}
}
/**
* use 'watch' to implement cas.
* Find information about slot distribution by key.
*/
private boolean storeMappingInCluster(String key, String field, String value, String ticket) {
try (JedisCluster jedisCluster =
new JedisCluster(jedisClusterNodes, timeout, timeout, 2, password, new GenericObjectPoolConfig<>())) {
Jedis jedis = new Jedis(jedisCluster.getConnectionFromSlot(JedisClusterCRC16.getSlot(key)));
jedis.watch(key);
String oldValue = jedis.hget(key, field);
if (null == oldValue || null == ticket || oldValue.equals(ticket)) {
Transaction transaction = jedis.multi();
transaction.hset(key, field, value);
List<Object> result = transaction.exec();
if (null != result) {
jedisCluster.publish(buildPubSubKey(), field);
return true;
}
} else {
jedis.unwatch();
}
jedis.close();
} catch (Throwable e) {
String msg = "Failed to put " + key + ":" + field + " to redis " + value + ", cause: " + e.getMessage();
logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e);
throw new RpcException(msg, e);
}
return false;
}
/**
* use 'watch' to implement cas.
* Find information about slot distribution by key.
*/
private boolean storeMappingStandalone(String key, String field, String value, String ticket) {
try (Jedis jedis = pool.getResource()) {
jedis.watch(key);
String oldValue = jedis.hget(key, field);
if (null == oldValue || null == ticket || oldValue.equals(ticket)) {
Transaction transaction = jedis.multi();
transaction.hset(key, field, value);
List<Object> result = transaction.exec();
if (null != result) {
jedis.publish(buildPubSubKey(), field);
return true;
}
}
jedis.unwatch();
} catch (Throwable e) {
String msg = "Failed to put " + key + ":" + field + " to redis " + value + ", cause: " + e.getMessage();
logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e);
throw new RpcException(msg, e);
}
return false;
}
/**
* build mapping key
* @param defaultMappingGroup {@link ServiceNameMapping#DEFAULT_MAPPING_GROUP}
* @return
*/
private String buildMappingKey(String defaultMappingGroup) {
return this.root + GROUP_CHAR_SEPARATOR + defaultMappingGroup;
}
/**
* build pub/sub key
*/
private String buildPubSubKey() {
return buildMappingKey(DEFAULT_MAPPING_GROUP) + GROUP_CHAR_SEPARATOR + QUEUES_KEY;
}
/**
* get content and use content to complete cas
* @param serviceKey class
* @param group {@link ServiceNameMapping#DEFAULT_MAPPING_GROUP}
*/
@Override
public ConfigItem getConfigItem(String serviceKey, String group) {
String key = buildMappingKey(group);
String content = getMappingData(key, serviceKey);
return new ConfigItem(content, content);
}
/**
* get current application_names
*/
private String getMappingData(String key, String field) {
if (pool != null) {
return getMappingDataStandalone(key, field);
} else {
return getMappingDataInCluster(key, field);
}
}
private String getMappingDataInCluster(String key, String field) {
try (JedisCluster jedisCluster =
new JedisCluster(jedisClusterNodes, timeout, timeout, 2, password, new GenericObjectPoolConfig<>())) {
return jedisCluster.hget(key, field);
} catch (Throwable e) {
String msg = "Failed to get " + key + ":" + field + " from redis cluster , cause: " + e.getMessage();
logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e);
throw new RpcException(msg, e);
}
}
private String getMappingDataStandalone(String key, String field) {
try (Jedis jedis = pool.getResource()) {
return jedis.hget(key, field);
} catch (Throwable e) {
String msg = "Failed to get " + key + ":" + field + " from redis , cause: " + e.getMessage();
logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e);
throw new RpcException(msg, e);
}
}
/**
* remove listener. If have no listener,thread will dead
*/
@Override
public void removeServiceAppMappingListener(String serviceKey, MappingListener listener) {
MappingDataListener mappingDataListener = mappingDataListenerMap.get(buildPubSubKey());
if (null != mappingDataListener) {
NotifySub notifySub = mappingDataListener.getNotifySub();
notifySub.removeListener(serviceKey, listener);
if (notifySub.isEmpty()) {
mappingDataListener.shutdown();
}
}
}
/**
* Start a thread and subscribe to {@link this#buildPubSubKey()}.
* Notify {@link MappingListener} if there is a change in the 'application_names' message.
*/
@Override
public Set<String> getServiceAppMapping(String serviceKey, MappingListener listener, URL url) {
MappingDataListener mappingDataListener =
ConcurrentHashMapUtils.computeIfAbsent(mappingDataListenerMap, buildPubSubKey(), k -> {
MappingDataListener dataListener = new MappingDataListener(buildPubSubKey());
dataListener.start();
return dataListener;
});
mappingDataListener.getNotifySub().addListener(serviceKey, listener);
return this.getServiceAppMapping(serviceKey, url);
}
@Override
public Set<String> getServiceAppMapping(String serviceKey, URL url) {
String key = buildMappingKey(DEFAULT_MAPPING_GROUP);
return getAppNames(getMappingData(key, serviceKey));
}
@Override
public MetadataInfo getAppMetadata(SubscriberMetadataIdentifier identifier, Map<String, String> instanceMetadata) {
String content = this.getMetadata(identifier);
return JsonUtils.toJavaObject(content, MetadataInfo.class);
}
@Override
public void publishAppMetadata(SubscriberMetadataIdentifier identifier, MetadataInfo metadataInfo) {
this.storeMetadata(identifier, metadataInfo.getContent(), false);
}
@Override
public void unPublishAppMetadata(SubscriberMetadataIdentifier identifier, MetadataInfo metadataInfo) {
this.deleteMetadata(identifier);
}
// for test
public MappingDataListener getMappingDataListener() {
return mappingDataListenerMap.get(buildPubSubKey());
}
/**
* Listen for changes in the 'application_names' message and notify the listener.
*/
class NotifySub extends JedisPubSub {
private final Map<String, Set<MappingListener>> listeners = new ConcurrentHashMap<>();
public void addListener(String key, MappingListener listener) {
Set<MappingListener> listenerSet = listeners.computeIfAbsent(key, k -> new ConcurrentHashSet<>());
listenerSet.add(listener);
}
public void removeListener(String serviceKey, MappingListener listener) {
Set<MappingListener> listenerSet = this.listeners.get(serviceKey);
if (listenerSet != null) {
listenerSet.remove(listener);
if (listenerSet.isEmpty()) {
this.listeners.remove(serviceKey);
}
}
}
public Boolean isEmpty() {
return this.listeners.isEmpty();
}
@Override
public void onMessage(String key, String msg) {
logger.info("sub from redis:" + key + " message:" + msg);
String applicationNames = getMappingData(buildMappingKey(DEFAULT_MAPPING_GROUP), msg);
MappingChangedEvent mappingChangedEvent = new MappingChangedEvent(msg, getAppNames(applicationNames));
if (!CollectionUtils.isEmpty(listeners.get(msg))) {
for (MappingListener mappingListener : listeners.get(msg)) {
mappingListener.onEvent(mappingChangedEvent);
}
}
}
@Override
public void onPMessage(String pattern, String key, String msg) {
onMessage(key, msg);
}
@Override
public void onPSubscribe(String pattern, int subscribedChannels) {
super.onPSubscribe(pattern, subscribedChannels);
}
}
/**
* Subscribe application names change message.
*/
class MappingDataListener extends Thread {
private String path;
private final NotifySub notifySub = new NotifySub();
// for test
protected volatile boolean running = true;
public MappingDataListener(String path) {
this.path = path;
}
public NotifySub getNotifySub() {
return notifySub;
}
@Override
public void run() {
while (running) {
if (pool != null) {
try (Jedis jedis = pool.getResource()) {
jedis.subscribe(notifySub, path);
} catch (Throwable e) {
String msg = "Failed to subscribe " + path + ", cause: " + e.getMessage();
logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e);
throw new RpcException(msg, e);
}
} else {
try (JedisCluster jedisCluster = new JedisCluster(
jedisClusterNodes, timeout, timeout, 2, password, new GenericObjectPoolConfig<>())) {
jedisCluster.subscribe(notifySub, path);
} catch (Throwable e) {
String msg = "Failed to subscribe " + path + ", cause: " + e.getMessage();
logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e);
throw new RpcException(msg, e);
}
}
}
}
public void shutdown() {
try {
running = false;
notifySub.unsubscribe(path);
} catch (Throwable e) {
String msg = "Failed to unsubscribe " + path + ", cause: " + e.getMessage();
logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e);
}
}
}
}

View File

@@ -0,0 +1,301 @@
package org.apache.dubbo.metadata.store.redis;
import lombok.extern.slf4j.Slf4j;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.config.configcenter.ConfigItem;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.ConcurrentHashSet;
import org.apache.dubbo.metadata.MappingChangedEvent;
import org.apache.dubbo.metadata.MappingListener;
import org.apache.dubbo.metadata.MetadataInfo;
import org.apache.dubbo.metadata.report.identifier.*;
import org.apache.dubbo.metadata.report.support.AbstractMetadataReport;
import org.apache.dubbo.rpc.RpcException;
import org.dromara.common.core.utils.SpringUtils;
import org.redisson.api.RScript;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.StringCodec;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import static org.apache.dubbo.common.constants.CommonConstants.*;
import static org.apache.dubbo.metadata.MetadataConstants.META_DATA_STORE_TAG;
import static org.apache.dubbo.metadata.ServiceNameMapping.DEFAULT_MAPPING_GROUP;
import static org.apache.dubbo.metadata.ServiceNameMapping.getAppNames;
import static org.apache.dubbo.metadata.report.support.Constants.DEFAULT_METADATA_REPORT_CYCLE_REPORT;
/**
* 使用 Redisson 重新实现元数据中心
*/
@Slf4j
public class RedissonMetadataReport extends AbstractMetadataReport {
// Lua script for atomic CAS on a hash field:
// If the current value equals ticket (or field is absent, or ticket is empty), update and return 1; else return 0.
private static final String CAS_LUA ="""
local old = redis.call('HGET', KEYS[1], ARGV[1])
if old == false or ARGV[3] == '' or old == ARGV[3] then
redis.call('HSET', KEYS[1], ARGV[1], ARGV[2])
return 1
end
return 0""";
private final String root;
private final long ttlMs;
// Lazily initialized — Dubbo SPI creates this class before Spring is fully ready
private volatile RedissonClient redissonClient;
// topic key → RTopic (keeps the subscription alive)
private final ConcurrentHashMap<String, RTopic> topicMap = new ConcurrentHashMap<>();
// serviceKey → listeners (for dispatching mapping change events)
private final ConcurrentHashMap<String, Set<MappingListener>> listenerMap = new ConcurrentHashMap<>();
public RedissonMetadataReport(URL url) {
super(url);
this.root = url.getGroup(DEFAULT_ROOT);
this.ttlMs = url.getParameter(CYCLE_REPORT_KEY, DEFAULT_METADATA_REPORT_CYCLE_REPORT)
? ONE_DAY_IN_MILLISECONDS * 2L
: 0L;
}
// -------------------------------------------------------------------------
// Lazy RedissonClient accessor
// -------------------------------------------------------------------------
private RedissonClient getRedisson() {
if (redissonClient == null) {
synchronized (this) {
if (redissonClient == null) {
redissonClient = SpringUtils.getBean(RedissonClient.class);
}
}
}
return redissonClient;
}
// -------------------------------------------------------------------------
// AbstractMetadataReport — provider / consumer metadata
// -------------------------------------------------------------------------
@Override
protected void doStoreProviderMetadata(MetadataIdentifier id, String serviceDefinitions) {
storeMetadata(id.getUniqueKey(KeyTypeEnum.UNIQUE_KEY), serviceDefinitions, true);
}
@Override
protected void doStoreConsumerMetadata(MetadataIdentifier id, String value) {
storeMetadata(id.getUniqueKey(KeyTypeEnum.UNIQUE_KEY), value, true);
}
@Override
protected void doSaveMetadata(ServiceMetadataIdentifier id, URL url) {
storeMetadata(id.getIdentifierKey() + META_DATA_STORE_TAG, URL.encode(url.toFullString()), false);
}
@Override
protected void doRemoveMetadata(ServiceMetadataIdentifier id) {
deleteMetadata(id.getIdentifierKey() + META_DATA_STORE_TAG);
}
@Override
protected List<String> doGetExportedURLs(ServiceMetadataIdentifier id) {
String content = getMetadata(id.getIdentifierKey() + META_DATA_STORE_TAG);
if (content == null || content.isEmpty()) {
return Collections.emptyList();
}
return Collections.singletonList(URL.decode(content));
}
@Override
protected void doSaveSubscriberData(SubscriberMetadataIdentifier id, String urlListStr) {
storeMetadata(id.getUniqueKey(KeyTypeEnum.UNIQUE_KEY), urlListStr, false);
}
@Override
protected String doGetSubscribedURLs(SubscriberMetadataIdentifier id) {
return getMetadata(id.getUniqueKey(KeyTypeEnum.UNIQUE_KEY));
}
@Override
public String getServiceDefinition(MetadataIdentifier id) {
return getMetadata(id.getUniqueKey(KeyTypeEnum.UNIQUE_KEY));
}
// -------------------------------------------------------------------------
// App-level metadata (Dubbo 3.x application-level service discovery)
// -------------------------------------------------------------------------
@Override
public void publishAppMetadata(SubscriberMetadataIdentifier id, MetadataInfo metadataInfo) {
storeMetadata(id.getUniqueKey(KeyTypeEnum.UNIQUE_KEY), metadataInfo.getContent(), false);
}
@Override
public MetadataInfo getAppMetadata(SubscriberMetadataIdentifier id, Map<String, String> instanceMetadata) {
String content = getMetadata(id.getUniqueKey(KeyTypeEnum.UNIQUE_KEY));
return org.apache.dubbo.common.utils.JsonUtils.toJavaObject(content, MetadataInfo.class);
}
@Override
public void unPublishAppMetadata(SubscriberMetadataIdentifier id, MetadataInfo metadataInfo) {
deleteMetadata(id.getUniqueKey(KeyTypeEnum.UNIQUE_KEY));
}
// -------------------------------------------------------------------------
// Service-to-application mapping
// -------------------------------------------------------------------------
@Override
public boolean registerServiceAppMapping(
String serviceInterface, String defaultMappingGroup, String newConfigContent, Object ticket) {
try {
if (ticket != null && !(ticket instanceof String)) {
throw new IllegalArgumentException("Redis CAS requires a String ticket");
}
return storeMappingWithCas(
buildMappingKey(defaultMappingGroup),
serviceInterface,
newConfigContent,
(String) ticket);
} catch (Exception e) {
log.warn("registerServiceAppMapping failed.", e);
return false;
}
}
@Override
public ConfigItem getConfigItem(String serviceKey, String group) {
String key = buildMappingKey(group);
String content = getMappingField(key, serviceKey);
return new ConfigItem(content, content);
}
@Override
public Set<String> getServiceAppMapping(String serviceKey, MappingListener listener, URL url) {
String pubSubKey = buildPubSubKey();
// Register the RTopic listener once per pubSubKey
topicMap.computeIfAbsent(pubSubKey, k -> {
RTopic topic = getRedisson().getTopic(k, StringCodec.INSTANCE);
topic.addListener(String.class, (channel, msg) -> {
String applicationNames = getMappingField(buildMappingKey(DEFAULT_MAPPING_GROUP), msg);
MappingChangedEvent event = new MappingChangedEvent(msg, getAppNames(applicationNames));
Set<MappingListener> ls = listenerMap.get(msg);
if (!CollectionUtils.isEmpty(ls)) {
ls.forEach(l -> l.onEvent(event));
}
});
return topic;
});
listenerMap.computeIfAbsent(serviceKey, k -> new ConcurrentHashSet<>()).add(listener);
return getServiceAppMapping(serviceKey, url);
}
@Override
public Set<String> getServiceAppMapping(String serviceKey, URL url) {
return getAppNames(getMappingField(buildMappingKey(DEFAULT_MAPPING_GROUP), serviceKey));
}
@Override
public void removeServiceAppMappingListener(String serviceKey, MappingListener listener) {
Set<MappingListener> ls = listenerMap.get(serviceKey);
if (ls != null) {
ls.remove(listener);
if (ls.isEmpty()) {
listenerMap.remove(serviceKey);
// If no listeners remain for any key, remove the topic subscription
if (listenerMap.isEmpty()) {
RTopic topic = topicMap.remove(buildPubSubKey());
if (topic != null) {
topic.removeAllListeners();
}
}
}
}
}
// -------------------------------------------------------------------------
// Internal Redis helpers
// -------------------------------------------------------------------------
private void storeMetadata(String key, String value, boolean ephemeral) {
try {
if (ephemeral && ttlMs > 0) {
getRedisson().<String>getBucket(key, StringCodec.INSTANCE).set(value, Duration.ofMillis(ttlMs));
} else {
getRedisson().<String>getBucket(key, StringCodec.INSTANCE).set(value);
}
} catch (Exception e) {
String msg = "Failed to store metadata key=" + key + ", cause: " + e.getMessage();
log.error(msg, e);
throw new RpcException(msg, e);
}
}
private String getMetadata(String key) {
try {
return getRedisson().<String>getBucket(key, StringCodec.INSTANCE).get();
} catch (Exception e) {
String msg = "Failed to get metadata key=" + key + ", cause: " + e.getMessage();
log.error(msg, e);
throw new RpcException(msg, e);
}
}
private void deleteMetadata(String key) {
try {
getRedisson().getBucket(key, StringCodec.INSTANCE).delete();
} catch (Exception e) {
String msg = "Failed to delete metadata key=" + key + ", cause: " + e.getMessage();
log.error(msg, e);
throw new RpcException(msg, e);
}
}
private String getMappingField(String key, String field) {
try {
return getRedisson().<String, String>getMap(key, StringCodec.INSTANCE).get(field);
} catch (Exception e) {
String msg = "Failed to get mapping key=" + key + " field=" + field + ", cause: " + e.getMessage();
log.error(msg, e);
throw new RpcException(msg, e);
}
}
/**
* Atomic CAS on a hash field via Lua script.
* Updates field to newValue only when the current value equals ticket (or field is absent / ticket is null).
* On success, publishes a change notification to the pub/sub channel.
*/
private boolean storeMappingWithCas(String key, String field, String newValue, String ticket) {
try {
Long result = getRedisson().getScript(StringCodec.INSTANCE).eval(
RScript.Mode.READ_WRITE,
CAS_LUA,
RScript.ReturnType.INTEGER,
Collections.singletonList(key),
field, newValue, ticket == null ? "" : ticket
);
if (Long.valueOf(1L).equals(result)) {
getRedisson().getTopic(buildPubSubKey(), StringCodec.INSTANCE).publish(field);
return true;
}
return false;
} catch (Exception e) {
String msg = "Failed to store mapping key=" + key + " field=" + field + ", cause: " + e.getMessage();
log.error(msg, e);
throw new RpcException(msg, e);
}
}
private String buildMappingKey(String mappingGroup) {
return this.root + GROUP_CHAR_SEPARATOR + mappingGroup;
}
private String buildPubSubKey() {
return buildMappingKey(DEFAULT_MAPPING_GROUP) + GROUP_CHAR_SEPARATOR + QUEUES_KEY;
}
}

View File

@@ -0,0 +1,17 @@
package org.apache.dubbo.metadata.store.redis;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.metadata.report.MetadataReport;
import org.apache.dubbo.metadata.report.support.AbstractMetadataReportFactory;
/**
* RedisMetadataReportFactory.
*/
public class RedissonMetadataReportFactory extends AbstractMetadataReportFactory {
@Override
public MetadataReport createMetadataReport(URL url) {
return new RedissonMetadataReport(url);
}
}

View File

@@ -0,0 +1 @@
redis=org.apache.dubbo.metadata.store.redis.RedissonMetadataReportFactory

View File

@@ -19,6 +19,8 @@ dubbo:
password: ${spring.cloud.nacos.password}
parameters:
namespace: ${spring.profiles.active}
# 已经采用框架内 Redisson 实现元数据中心
# 以下配置不用管占位用 不然 dubbo 会直接用 nacos 当注册中心 直接配好框架自带的 Redisson 即可
metadata-report:
address: redis://${spring.data.redis.host:localhost}:${spring.data.redis.port:6379}
group: DUBBO_GROUP