Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize: fall back to any of available cluster address when query cluster address is empty #6797

Merged
merged 10 commits into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#6787](https://github.com/apache/incubator-seata/pull/6787)] upgrade elliptic to 6.5.7
- [[#6783](https://github.com/apache/incubator-seata/pull/6783)] rename the server naming/v1 api to vgroup/v1
- [[#6793](https://github.com/apache/incubator-seata/pull/6793)] fix npmjs conflicts
- [[#6797](https://github.com/apache/incubator-seata/pull/6797)] fall back to any of available cluster address when query cluster address is empty
funky-eyes marked this conversation as resolved.
Show resolved Hide resolved

### refactor:

Expand Down Expand Up @@ -112,6 +113,7 @@ Thanks to these contributors for their code commits. Please report an unintended
- [imashimaro](https://github.com/hmj776521114)
- [lyl2008dsg](https://github.com/lyl2008dsg)
- [l81893521](https://github.com/l81893521)
- [laywin](https://github.com/laywin)


Also, we receive many valuable issues, questions and advices from our community. Thanks for you all.
3 changes: 2 additions & 1 deletion changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
- [[#6787](https://github.com/apache/incubator-seata/pull/6787)] 升级 elliptic 至 6.5.7 版本
- [[#6783](https://github.com/apache/incubator-seata/pull/6783)] 将server事务分组修改接口改为/vgroup/v1
- [[#6793](https://github.com/apache/incubator-seata/pull/6793)] 修复 npmjs 依赖冲突问题

- [[#6797](https://github.com/apache/incubator-seata/pull/6797)] 当查询的集群地址为空时,获取可用的任意集群地址

### refactor:

Expand Down Expand Up @@ -116,6 +116,7 @@
- [imashimaro](https://github.com/hmj776521114)
- [lyl2008dsg](https://github.com/lyl2008dsg)
- [l81893521](https://github.com/l81893521)
- [laywin](https://github.com/laywin)



Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ public class ConsulRegistryServiceImpl implements RegistryService<ConsulListener
private static final int THREAD_POOL_NUM = 1;
private static final int MAP_INITIAL_CAPACITY = 8;

private String transactionServiceGroup;

/**
* default tcp check interval
*/
Expand Down Expand Up @@ -161,6 +163,7 @@ public void unsubscribe(String cluster, ConsulListener listener) throws Exceptio

@Override
public List<InetSocketAddress> lookup(String key) throws Exception {
transactionServiceGroup = key;
final String cluster = getServiceGroup(key);
if (cluster == null) {
String missingDataId = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key;
Expand Down Expand Up @@ -311,7 +314,7 @@ private void refreshCluster(String cluster, List<HealthService> services) {

clusterAddressMap.put(cluster, addresses);

removeOfflineAddressesIfNecessary(cluster, addresses);
removeOfflineAddressesIfNecessary(transactionServiceGroup, cluster, addresses);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
*/
package org.apache.seata.discovery.registry;

import org.apache.seata.common.util.CollectionUtils;
import org.apache.seata.config.ConfigurationFactory;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -27,8 +30,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import org.apache.seata.config.ConfigurationFactory;

/**
* The interface Registry service.
*
Expand All @@ -54,7 +55,7 @@ public interface RegistryService<T> {
/**
* Service node health check
*/
Map<String,List<InetSocketAddress>> CURRENT_ADDRESS_MAP = new ConcurrentHashMap<>();
Map<String, Map<String, List<InetSocketAddress>>> CURRENT_ADDRESS_MAP = new ConcurrentHashMap<>();
/**
* Register.
*
Expand Down Expand Up @@ -119,12 +120,28 @@ default String getServiceGroup(String key) {
}

default List<InetSocketAddress> aliveLookup(String transactionServiceGroup) {
return CURRENT_ADDRESS_MAP.computeIfAbsent(getServiceGroup(transactionServiceGroup), k -> new ArrayList<>());
Map<String, List<InetSocketAddress>> clusterAddressMap = CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup,
k -> new ConcurrentHashMap<>());

String clusterName = getServiceGroup(transactionServiceGroup);
List<InetSocketAddress> inetSocketAddresses = clusterAddressMap.get(clusterName);
if (CollectionUtils.isNotEmpty(inetSocketAddresses)) {
return inetSocketAddresses;
}

// fall back to addresses of any cluster
return clusterAddressMap.values().stream().findAny().orElse(Collections.emptyList());
funky-eyes marked this conversation as resolved.
Show resolved Hide resolved
}

default List<InetSocketAddress> refreshAliveLookup(String transactionServiceGroup,
List<InetSocketAddress> aliveAddress) {
return CURRENT_ADDRESS_MAP.put(getServiceGroup(transactionServiceGroup), aliveAddress);

Map<String, List<InetSocketAddress>> clusterAddressMap = CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup,
key -> new ConcurrentHashMap<>());

String clusterName = getServiceGroup(transactionServiceGroup);

return clusterAddressMap.put(clusterName, aliveAddress);
}


Expand All @@ -137,15 +154,18 @@ default List<InetSocketAddress> refreshAliveLookup(String transactionServiceGrou
* @param clusterName
* @param newAddressed
*/
default void removeOfflineAddressesIfNecessary(String clusterName, Collection<InetSocketAddress> newAddressed) {
default void removeOfflineAddressesIfNecessary(String transactionGroupService, String clusterName, Collection<InetSocketAddress> newAddressed) {

Map<String, List<InetSocketAddress>> clusterAddressMap = CURRENT_ADDRESS_MAP.computeIfAbsent(transactionGroupService,
key -> new ConcurrentHashMap<>());

List<InetSocketAddress> currentAddresses = CURRENT_ADDRESS_MAP.getOrDefault(clusterName, Collections.emptyList());
List<InetSocketAddress> currentAddresses = clusterAddressMap.getOrDefault(clusterName, new ArrayList<>());

List<InetSocketAddress> inetSocketAddresses = currentAddresses
.stream().filter(newAddressed::contains).collect(
Collectors.toList());

CURRENT_ADDRESS_MAP.put(clusterName, inetSocketAddresses);
clusterAddressMap.put(clusterName, inetSocketAddresses);
funky-eyes marked this conversation as resolved.
Show resolved Hide resolved
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ public class EtcdRegistryServiceImpl implements RegistryService<Watch.Listener>
private static final int MAP_INITIAL_CAPACITY = 8;
private static final int THREAD_POOL_SIZE = 2;
private ExecutorService executorService;

private String transactionServiceGroup;
/**
* TTL for lease
*/
Expand Down Expand Up @@ -181,6 +183,7 @@ public void unsubscribe(String cluster, Watch.Listener listener) throws Exceptio

@Override
public List<InetSocketAddress> lookup(String key) throws Exception {
transactionServiceGroup = key;
final String cluster = getServiceGroup(key);
if (cluster == null) {
String missingDataId = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key;
Expand Down Expand Up @@ -252,7 +255,7 @@ private void refreshCluster(String cluster) throws Exception {
}).collect(Collectors.toList());
clusterAddressMap.put(cluster, new Pair<>(getResponse.getHeader().getRevision(), instanceList));

removeOfflineAddressesIfNecessary(cluster, instanceList);
removeOfflineAddressesIfNecessary(transactionServiceGroup, cluster, instanceList);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ public class EurekaRegistryServiceImpl implements RegistryService<EurekaEventLis
private static volatile EurekaRegistryServiceImpl instance;
private static volatile EurekaClient eurekaClient;

private String transactionServiceGroup;

private EurekaRegistryServiceImpl() {
}

Expand Down Expand Up @@ -130,6 +132,7 @@ public void unsubscribe(String cluster, EurekaEventListener listener) throws Exc

@Override
public List<InetSocketAddress> lookup(String key) throws Exception {
transactionServiceGroup = key;
String clusterName = getServiceGroup(key);
if (clusterName == null) {
String missingDataId = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key;
Expand Down Expand Up @@ -169,7 +172,7 @@ private void refreshCluster(String clusterName) {
.collect(Collectors.toList());
CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList);

removeOfflineAddressesIfNecessary(clusterName, newAddressList);
removeOfflineAddressesIfNecessary(transactionServiceGroup, clusterName, newAddressList);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ public class NacosRegistryServiceImpl implements RegistryService<EventListener>
private static final Pattern DEFAULT_SLB_REGISTRY_PATTERN = Pattern.compile("(?!.*internal)(?=.*seata).*mse.aliyuncs.com");
private static volatile Boolean useSLBWay;

private String transactionServiceGroup;

private NacosRegistryServiceImpl() {
String configForNacosSLB = FILE_CONFIG.getConfig(getNacosUrlPatternOfSLB());
Pattern patternOfNacosRegistryForSLB = StringUtils.isBlank(configForNacosSLB)
Expand Down Expand Up @@ -193,7 +195,7 @@ public List<InetSocketAddress> lookup(String key) throws Exception {
.collect(Collectors.toList());
CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList);

removeOfflineAddressesIfNecessary(clusterName, newAddressList);
removeOfflineAddressesIfNecessary(transactionServiceGroup, clusterName, newAddressList);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.seata.discovery.registry.namingserver;


import java.io.IOException;
import java.net.InetSocketAddress;
import java.rmi.RemoteException;
Expand All @@ -41,22 +40,22 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.http.HttpStatus;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.entity.ContentType;
import org.apache.http.protocol.HTTP;
import org.apache.http.util.EntityUtils;
import org.apache.seata.common.metadata.Node;
import org.apache.seata.common.metadata.namingserver.Instance;
import org.apache.seata.common.metadata.namingserver.MetaResponse;
import org.apache.seata.common.thread.NamedThreadFactory;
import org.apache.seata.common.util.HttpClientUtil;
import org.apache.seata.common.util.NetUtil;
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.config.Configuration;
import org.apache.seata.config.ConfigurationFactory;
import org.apache.seata.common.util.HttpClientUtil;
import org.apache.seata.discovery.registry.RegistryService;
import org.apache.http.HttpStatus;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -322,17 +321,6 @@ public void unsubscribe(String vGroup) throws Exception {
isSubscribed = false;
}

@Override
public List<InetSocketAddress> aliveLookup(String transactionServiceGroup) {
funky-eyes marked this conversation as resolved.
Show resolved Hide resolved
return CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup, k -> new ArrayList<>());
}

@Override
public List<InetSocketAddress> refreshAliveLookup(String transactionServiceGroup,
funky-eyes marked this conversation as resolved.
Show resolved Hide resolved
List<InetSocketAddress> aliveAddress) {
return CURRENT_ADDRESS_MAP.put(transactionServiceGroup, aliveAddress);
}

/**
* @param key vGroup name
* @return List<InetSocketAddress> available instance list
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ public class RedisRegistryServiceImpl implements RegistryService<RedisListener>
private static final long KEY_TTL = 5L;
private static final long KEY_REFRESH_PERIOD = 2000L;

private String transactionServiceGroup;

private ScheduledExecutorService threadPoolExecutorForSubscribe = new ScheduledThreadPoolExecutor(1,
new NamedThreadFactory("RedisRegistryService-subscribe", 1));
private ScheduledExecutorService threadPoolExecutorForUpdateMap = new ScheduledThreadPoolExecutor(1,
Expand Down Expand Up @@ -219,6 +221,7 @@ public void unsubscribe(String cluster, RedisListener listener) {

@Override
public List<InetSocketAddress> lookup(String key) {
transactionServiceGroup = key;
String clusterName = getServiceGroup(key);
if (clusterName == null) {
String missingDataId = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key;
Expand Down Expand Up @@ -280,7 +283,7 @@ private void removeServerAddressByPushEmptyProtection(String notifyCluserName, S
}
socketAddresses.remove(inetSocketAddress);

removeOfflineAddressesIfNecessary(notifyCluserName, socketAddresses);
removeOfflineAddressesIfNecessary(transactionServiceGroup, notifyCluserName, socketAddresses);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ public class SofaRegistryServiceImpl implements RegistryService<SubscriberDataOb

private static volatile SofaRegistryServiceImpl instance;

private String transactionServiceGroup;

private SofaRegistryServiceImpl() {
}

Expand Down Expand Up @@ -159,6 +161,7 @@ public void unsubscribe(String cluster, SubscriberDataObserver listener) throws

@Override
public List<InetSocketAddress> lookup(String key) throws Exception {
transactionServiceGroup = key;
String clusterName = getServiceGroup(key);
if (clusterName == null) {
String missingDataId = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key;
Expand All @@ -174,7 +177,7 @@ public List<InetSocketAddress> lookup(String key) throws Exception {
List<InetSocketAddress> newAddressList = flatData(instances);
CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList);

removeOfflineAddressesIfNecessary(clusterName, newAddressList);
removeOfflineAddressesIfNecessary(transactionServiceGroup, clusterName, newAddressList);
}
respondRegistries.countDown();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ public class ZookeeperRegisterServiceImpl implements RegistryService<IZkChildLis
private static final int REGISTERED_PATH_SET_SIZE = 1;
private static final Set<String> REGISTERED_PATH_SET = Collections.synchronizedSet(new HashSet<>(REGISTERED_PATH_SET_SIZE));

private String transactionServiceGroup;

private ZookeeperRegisterServiceImpl() {
}

Expand Down Expand Up @@ -175,6 +177,7 @@ public void unsubscribe(String cluster, IZkChildListener listener) throws Except
*/
@Override
public List<InetSocketAddress> lookup(String key) throws Exception {
transactionServiceGroup = key;
String clusterName = getServiceGroup(key);

if (clusterName == null) {
Expand Down Expand Up @@ -309,7 +312,7 @@ private void refreshClusterAddressMap(String clusterName, List<String> instances
}
CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList);

removeOfflineAddressesIfNecessary(clusterName, newAddressList);
removeOfflineAddressesIfNecessary(transactionServiceGroup, clusterName, newAddressList);
}

private String getClusterName() {
Expand Down
Loading
Loading