Skip to content

Commit

Permalink
bugfix: the issue where the TC occasionally fails to go offline from …
Browse files Browse the repository at this point in the history
…the NamingServer (apache#6781)
  • Loading branch information
funky-eyes committed Aug 26, 2024
1 parent 8af2d84 commit d7e780c
Show file tree
Hide file tree
Showing 17 changed files with 186 additions and 91 deletions.
1 change: 1 addition & 0 deletions changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#6769](https://github.com/apache/incubator-seata/pull/6769)] fix tcc fence deadLock
- [[#6778](https://github.com/apache/incubator-seata/pull/6778)] fix namingserver node term
- [[#6765](https://github.com/apache/incubator-seata/pull/6765)] fix MySQL driver loading by replacing custom classloader with system classloader for better compatibility and simplified process
- [[#6781](https://github.com/apache/incubator-seata/pull/6781)] the issue where the TC occasionally fails to go offline from the NamingServer


### optimize:
Expand Down
1 change: 1 addition & 0 deletions changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
- [[#6769](https://github.com/apache/incubator-seata/pull/6769)] 修复tcc fence死锁
- [[#6778](https://github.com/apache/incubator-seata/pull/6778)] 修复namingserver的节点term为0问题
- [[#6765](https://github.com/apache/incubator-seata/pull/6765)] 改进MySQL驱动加载机制,将自定义类加载器替换为系统类加载器,更兼容简化流程
- [[#6781](https://github.com/apache/incubator-seata/pull/6781)] 修复tc下线时,由于定时任务没有先关闭,导致下线后还会被注册上,需要靠namingserver的健康检查来下线的bug


### optimize:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,20 @@ public class ClusterData {
private String clusterType;
private final Map<String, Unit> unitData;

private Lock lock = new ReentrantLock();
private final Lock lock = new ReentrantLock();


public ClusterData() {
unitData = new ConcurrentHashMap<>(32);
this.unitData = new ConcurrentHashMap<>();
}

public ClusterData(String clusterName) {
unitData = new ConcurrentHashMap<>(32);
this.unitData = new ConcurrentHashMap<>();
this.clusterName = clusterName;
}

public ClusterData(String clusterName, String clusterType) {
unitData = new ConcurrentHashMap<>(32);
unitData = new ConcurrentHashMap<>();
this.clusterName = clusterName;
this.clusterType = clusterType;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.ArrayList;
Expand Down Expand Up @@ -167,47 +168,38 @@ public Result<String> createGroup(String namespace, String vGroup, String cluste
return new Result<>(String.valueOf(closeableHttpResponse.getStatusLine().getStatusCode()),
"add vGroup in new cluster failed");
}
LOGGER.info("namespace: {} add vGroup: {} in new cluster: {} successfully!", namespace, vGroup, clusterName);
} catch (IOException e) {
LOGGER.warn("add vGroup in new cluster failed");
return new Result<>("500", "add vGroup in new cluster failed");
}
}
addGroup(namespace,clusterName,unitName,vGroup);
return new Result<>("200", "add vGroup successfully!");
}

public Result<String> removeGroup(String namespace, String clusterName,String vGroup, String unitName) {
List<Cluster> clusterList = getClusterListByVgroup(vGroup, namespace);
for (Cluster cluster : clusterList) {
if (!StringUtils.equals(clusterName, cluster.getClusterName())) {
continue;
}
if (cluster.getUnitData() != null && cluster.getUnitData().size() > 0) {
Unit unit = cluster.getUnitData().get(0);
if (unit != null && unit.getNamingInstanceList() != null && unit.getNamingInstanceList().size() > 0) {
Node node = unit.getNamingInstanceList().get(0);
String httpUrl = NamingServerConstants.HTTP_PREFIX + node.getControl().getHost()
+ NamingServerConstants.IP_PORT_SPLIT_CHAR + node.getControl().getPort()
+ NamingServerConstants.HTTP_REMOVE_GROUP_SUFFIX;
HashMap<String, String> params = new HashMap<>();
params.put(CONSTANT_GROUP, vGroup);
params.put(NamingServerConstants.CONSTANT_UNIT, unitName);
Map<String, String> header = new HashMap<>();
header.put(HTTP.CONTENT_TYPE, ContentType.APPLICATION_FORM_URLENCODED.getMimeType());
try (CloseableHttpResponse closeableHttpResponse =
HttpClientUtil.doGet(httpUrl, params, header, 3000)) {
if (closeableHttpResponse == null
|| closeableHttpResponse.getStatusLine().getStatusCode() != 200) {
LOGGER.warn("remove vGroup in old cluster failed");
return new Result<>(String.valueOf(closeableHttpResponse.getStatusLine().getStatusCode()),
"removing vGroup " + vGroup + " in old cluster " + cluster + " failed");
}
} catch (IOException e) {
LOGGER.warn("handle removing vGroup in old cluster failed");
return new Result<>("500",
"handle removing vGroup " + vGroup + " in old cluster " + cluster + " failed");
}
public Result<String> removeGroup(Unit unit, String vGroup, String clusterName, String namespace, String unitName) {
if (unit != null && !CollectionUtils.isEmpty(unit.getNamingInstanceList())) {
Node node = unit.getNamingInstanceList().get(0);
String httpUrl = NamingServerConstants.HTTP_PREFIX + node.getControl().getHost()
+ NamingServerConstants.IP_PORT_SPLIT_CHAR + node.getControl().getPort()
+ NamingServerConstants.HTTP_REMOVE_GROUP_SUFFIX;
HashMap<String, String> params = new HashMap<>();
params.put(CONSTANT_GROUP, vGroup);
params.put(NamingServerConstants.CONSTANT_UNIT, unitName);
Map<String, String> header = new HashMap<>();
header.put(HTTP.CONTENT_TYPE, ContentType.APPLICATION_FORM_URLENCODED.getMimeType());
try (CloseableHttpResponse closeableHttpResponse = HttpClientUtil.doGet(httpUrl, params, header, 3000)) {
if (closeableHttpResponse == null || closeableHttpResponse.getStatusLine().getStatusCode() != 200) {
LOGGER.warn("remove vGroup in old cluster failed");
return new Result<>(String.valueOf(closeableHttpResponse.getStatusLine().getStatusCode()),
"removing vGroup " + vGroup + " in old cluster " + clusterName + " failed");
}
LOGGER.info("namespace: {} remove vGroup: {} in new cluster: {} successfully!", namespace, vGroup,
clusterName);
} catch (IOException e) {
LOGGER.warn("handle removing vGroup in old cluster failed");
return new Result<>("500",
"handle removing vGroup " + vGroup + " in old cluster " + clusterName + " failed");
}
}
return new Result<>("200", "remove group in old cluster successfully!");
Expand Down Expand Up @@ -248,20 +240,21 @@ public boolean registerInstance(NamingServerNode node, String namespace, String
ClusterData clusterData = clusterDataHashMap.computeIfAbsent(clusterName,
key -> new ClusterData(clusterName, (String)node.getMetadata().get("cluster-type")));
boolean hasChanged = clusterData.registerInstance(node, unitName);
Object mappingObj = node.getMetadata().get(CONSTANT_GROUP);
// if extended metadata includes vgroup mapping relationship, add it in clusterData
Optional.ofNullable(node.getMetadata().get(CONSTANT_GROUP)).ifPresent(mappingObj -> {
if (mappingObj instanceof Map) {
Map<String, String> vGroups = (Map<String, String>) mappingObj;
if (mappingObj instanceof Map) {
Map<String, String> vGroups = (Map<String, String>)mappingObj;
if (!CollectionUtils.isEmpty(vGroups)) {
vGroups.forEach((k, v) -> {
// In non-raft mode, a unit is one-to-one with a node, and the unitName is stored on the node.
// In raft mode, the unitName is equal to the raft-group, so the node's unitName cannot be used.
addGroup(namespace, clusterName, StringUtils.isBlank(v) ? unitName : v, k);
if (hasChanged) {
notifyClusterChange(k,namespace, clusterName, unitName,node.getTerm());
notifyClusterChange(k, namespace, clusterName, unitName, node.getTerm());
}
});
}
});
}
instanceLiveTable.put(
new InetSocketAddress(node.getTransaction().getHost(), node.getTransaction().getPort()),
System.currentTimeMillis());
Expand Down Expand Up @@ -367,24 +360,26 @@ public void instanceHeartBeatCheck() {
}

public Result<String> changeGroup(String namespace, String vGroup, String clusterName, String unitName) {
ConcurrentMap<String, NamespaceBO> namespaceMap =
new ConcurrentHashMap<>(vGroupMap.get(vGroup));
ConcurrentMap<String, NamespaceBO> namespaceMap = new ConcurrentHashMap<>(vGroupMap.get(vGroup));
Set<String> currentNamespaces = namespaceMap.keySet();
Map<String, Set<String>> namespaceClusters = new HashMap<>();
for (String currentNamespace : currentNamespaces) {
namespaceClusters.put(currentNamespace,
new HashSet<>(namespaceMap.get(currentNamespace).getClusterMap().keySet()));
}
createGroup(namespace, vGroup, clusterName, unitName);
AtomicReference<Result<String>> result = new AtomicReference<>();
namespaceMap.forEach((currentNamespace, namespaceBO) -> namespaceBO.getClusterMap().forEach((currentCluster, clusterBO) -> {
for (String currentUnitName : clusterBO.getUnitNames()) {
if (StringUtils.isBlank(unitName)) {
if (StringUtils.equalsIgnoreCase(clusterName, currentCluster)) {
continue;
}
result.set(removeGroup(currentNamespace, currentCluster, vGroup, unitName));
} else {
if (!StringUtils.equalsIgnoreCase(unitName, currentUnitName)) {
result.set(removeGroup(currentNamespace, currentCluster, vGroup, unitName));
}
}
namespaceClusters.forEach((oldNamespace, clusters) -> {
for (String cluster : clusters) {
Optional.ofNullable(namespaceClusterDataMap.get(oldNamespace))
.flatMap(map -> Optional.ofNullable(map.get(cluster))).ifPresent(clusterData -> {
if (!CollectionUtils.isEmpty(clusterData.getUnitData())) {
clusterData.getUnitData().forEach((unit, unitData) -> result
.set(removeGroup(unitData, vGroup, cluster, oldNamespace, unitName)));
}
});
}
}));
});
return Optional.ofNullable(result.get()).orElseGet(() -> new Result<>("200", "change vGroup successfully!"));
}

Expand Down
7 changes: 6 additions & 1 deletion script/client/conf/registry.conf
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
#

registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa、custom、raft
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa、custom、raft、namingserver
type = "file"

raft {
Expand Down Expand Up @@ -44,6 +44,11 @@ registry {
##if use Nacos naming meta-data for SLB service registry, specify nacos address pattern rules here
#slbPattern = ""
}
namingserver {
server-addr = "127.0.0.1:8081"
namespace = "public"
heartbeat-period = 5000
}
eureka {
serviceUrl = "http://localhost:8761/eureka"
weight = "1"
Expand Down
3 changes: 3 additions & 0 deletions script/client/spring/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ seata.registry.etcd3.server-addr=http://localhost:2379

seata.registry.eureka.weight=1
seata.registry.eureka.service-url=http://localhost:8761/eureka
seata.registry.namingserver.server-addr=127.0.0.1:8081
seata.registry.namingserver.namespace=public
seata.registry.namingserver.heartbeat-period=5000

seata.registry.nacos.application=seata-server
seata.registry.nacos.server-addr=127.0.0.1:8848
Expand Down
4 changes: 4 additions & 0 deletions script/client/spring/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ seata:
name:
registry:
type: file
namingserver:
server-addr: 127.0.0.1:8081
namespace: public
heartbeat-period: 5000
raft:
server-addr:
metadata-max-age-ms: 30000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.seata.spring.boot.autoconfigure.properties.registry.RegistryEtcd3Properties;
import org.apache.seata.spring.boot.autoconfigure.properties.registry.RegistryEurekaProperties;
import org.apache.seata.spring.boot.autoconfigure.properties.registry.RegistryNacosProperties;
import org.apache.seata.spring.boot.autoconfigure.properties.registry.RegistryNamingServerProperties;
import org.apache.seata.spring.boot.autoconfigure.properties.registry.RegistryProperties;
import org.apache.seata.spring.boot.autoconfigure.properties.registry.RegistryRaftProperties;
import org.apache.seata.spring.boot.autoconfigure.properties.registry.RegistryRedisProperties;
Expand All @@ -59,6 +60,7 @@
import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.REGISTRY_ETCD3_PREFIX;
import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.REGISTRY_EUREKA_PREFIX;
import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.REGISTRY_NACOS_PREFIX;
import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.REGISTRY_NAMINGSERVER_PREFIX;
import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.REGISTRY_PREFIX;
import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.REGISTRY_RAFT_PREFIX;
import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.REGISTRY_REDIS_PREFIX;
Expand Down Expand Up @@ -100,6 +102,7 @@ public static void init() {
PROPERTY_BEAN_MAP.put(REGISTRY_ETCD3_PREFIX, RegistryEtcd3Properties.class);
PROPERTY_BEAN_MAP.put(REGISTRY_EUREKA_PREFIX, RegistryEurekaProperties.class);
PROPERTY_BEAN_MAP.put(REGISTRY_NACOS_PREFIX, RegistryNacosProperties.class);
PROPERTY_BEAN_MAP.put(REGISTRY_NAMINGSERVER_PREFIX, RegistryNamingServerProperties.class);
PROPERTY_BEAN_MAP.put(REGISTRY_REDIS_PREFIX, RegistryRedisProperties.class);
PROPERTY_BEAN_MAP.put(REGISTRY_SOFA_PREFIX, RegistrySofaProperties.class);
PROPERTY_BEAN_MAP.put(REGISTRY_ZK_PREFIX, RegistryZooKeeperProperties.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public interface StarterConstants {
String REGISTRY_REDIS_PREFIX = REGISTRY_PREFIX + ".redis";
String REGISTRY_ZK_PREFIX = REGISTRY_PREFIX + ".zk";
String REGISTRY_CONSUL_PREFIX = REGISTRY_PREFIX + ".consul";
String REGISTRY_NAMINGSERVER_PREFIX = REGISTRY_PREFIX + ".namingserver";
String REGISTRY_ETCD3_PREFIX = REGISTRY_PREFIX + ".etcd3";
String REGISTRY_SOFA_PREFIX = REGISTRY_PREFIX + ".sofa";
String REGISTRY_CUSTOM_PREFIX = REGISTRY_PREFIX + ".custom";
Expand Down Expand Up @@ -96,4 +97,5 @@ public interface StarterConstants {
String SPECIAL_KEY_GROUPLIST = "grouplist";
String SPECIAL_KEY_SERVICE = "service";
String SPECIAL_KEY_VGROUP_MAPPING = "vgroupMapping";

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.spring.boot.autoconfigure.properties.registry;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.REGISTRY_NAMINGSERVER_PREFIX;

@Component
@ConfigurationProperties(prefix = REGISTRY_NAMINGSERVER_PREFIX)
public class RegistryNamingServerProperties {
private String cluster = "default";
private String serverAddr = "127.0.0.1:8081";
private String namespace = "public";

private int heartbeatPeriod = 5000;

public String getCluster() {
return cluster;
}

public RegistryNamingServerProperties setCluster(String cluster) {
this.cluster = cluster;
return this;
}

public String getServerAddr() {
return serverAddr;
}

public RegistryNamingServerProperties setServerAddr(String serverAddr) {
this.serverAddr = serverAddr;
return this;
}

public String getNamespace() {
return namespace;
}

public void setNamespace(String namespace) {
this.namespace = namespace;
}

public int getHeartbeatPeriod() {
return heartbeatPeriod;
}

public void setHeartbeatPeriod(int heartbeatPeriod) {
this.heartbeatPeriod = heartbeatPeriod;
}
}
Loading

0 comments on commit d7e780c

Please sign in to comment.