From d7e780c39d453353ebde3b1c108c4dae5533a8ff Mon Sep 17 00:00:00 2001 From: funkye Date: Mon, 26 Aug 2024 21:11:13 +0800 Subject: [PATCH] bugfix: the issue where the TC occasionally fails to go offline from the NamingServer (#6781) --- changes/en-us/2.x.md | 1 + changes/zh-cn/2.x.md | 1 + .../namingserver/entity/pojo/ClusterData.java | 8 +- .../namingserver/manager/NamingManager.java | 99 +++++++++---------- script/client/conf/registry.conf | 7 +- script/client/spring/application.properties | 3 + script/client/spring/application.yml | 4 + .../SeataCoreEnvironmentPostProcessor.java | 3 + .../boot/autoconfigure/StarterConstants.java | 2 + .../RegistryNamingServerProperties.java | 66 +++++++++++++ .../java/org/apache/seata/server/Server.java | 50 +++++----- .../org/apache/seata/server/ServerRunner.java | 6 +- .../DataBaseVGroupMappingStoreManager.java | 2 +- .../db/store/VGroupMappingDataBaseDAO.java | 3 +- .../store/RedisVGroupMappingStoreManager.java | 9 +- .../store/VGroupMappingStoreManager.java | 7 +- .../main/resources/application.example.yml | 6 +- 17 files changed, 186 insertions(+), 91 deletions(-) create mode 100644 seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/registry/RegistryNamingServerProperties.java diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index e9a57c4bf92..3de47dd8d0a 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -27,6 +27,7 @@ Add changes here for all PR submitted to the 2.x branch. - [[#6769](https://github.com/apache/incubator-seata/pull/6769)] fix tcc fence deadLock - [[#6778](https://github.com/apache/incubator-seata/pull/6778)] fix namingserver node term - [[#6765](https://github.com/apache/incubator-seata/pull/6765)] fix MySQL driver loading by replacing custom classloader with system classloader for better compatibility and simplified process +- [[#6781](https://github.com/apache/incubator-seata/pull/6781)] the issue where the TC occasionally fails to go offline from the NamingServer ### optimize: diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index d38a48d2d81..2b440e474b2 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -28,6 +28,7 @@ - [[#6769](https://github.com/apache/incubator-seata/pull/6769)] 修复tcc fence死锁 - [[#6778](https://github.com/apache/incubator-seata/pull/6778)] 修复namingserver的节点term为0问题 - [[#6765](https://github.com/apache/incubator-seata/pull/6765)] 改进MySQL驱动加载机制,将自定义类加载器替换为系统类加载器,更兼容简化流程 +- [[#6781](https://github.com/apache/incubator-seata/pull/6781)] 修复tc下线时,由于定时任务没有先关闭,导致下线后还会被注册上,需要靠namingserver的健康检查来下线的bug ### optimize: diff --git a/namingserver/src/main/java/org/apache/seata/namingserver/entity/pojo/ClusterData.java b/namingserver/src/main/java/org/apache/seata/namingserver/entity/pojo/ClusterData.java index 250f8509388..971b768e40c 100644 --- a/namingserver/src/main/java/org/apache/seata/namingserver/entity/pojo/ClusterData.java +++ b/namingserver/src/main/java/org/apache/seata/namingserver/entity/pojo/ClusterData.java @@ -42,20 +42,20 @@ public class ClusterData { private String clusterType; private final Map unitData; - private Lock lock = new ReentrantLock(); + private final Lock lock = new ReentrantLock(); public ClusterData() { - unitData = new ConcurrentHashMap<>(32); + this.unitData = new ConcurrentHashMap<>(); } public ClusterData(String clusterName) { - unitData = new ConcurrentHashMap<>(32); + this.unitData = new ConcurrentHashMap<>(); this.clusterName = clusterName; } public ClusterData(String clusterName, String clusterType) { - unitData = new ConcurrentHashMap<>(32); + unitData = new ConcurrentHashMap<>(); this.clusterName = clusterName; this.clusterType = clusterType; } diff --git a/namingserver/src/main/java/org/apache/seata/namingserver/manager/NamingManager.java b/namingserver/src/main/java/org/apache/seata/namingserver/manager/NamingManager.java index 09fa57d7519..bd89851b32e 100644 --- a/namingserver/src/main/java/org/apache/seata/namingserver/manager/NamingManager.java +++ b/namingserver/src/main/java/org/apache/seata/namingserver/manager/NamingManager.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.ArrayList; @@ -167,47 +168,38 @@ public Result createGroup(String namespace, String vGroup, String cluste return new Result<>(String.valueOf(closeableHttpResponse.getStatusLine().getStatusCode()), "add vGroup in new cluster failed"); } + LOGGER.info("namespace: {} add vGroup: {} in new cluster: {} successfully!", namespace, vGroup, clusterName); } catch (IOException e) { LOGGER.warn("add vGroup in new cluster failed"); return new Result<>("500", "add vGroup in new cluster failed"); } } - addGroup(namespace,clusterName,unitName,vGroup); return new Result<>("200", "add vGroup successfully!"); } - public Result removeGroup(String namespace, String clusterName,String vGroup, String unitName) { - List clusterList = getClusterListByVgroup(vGroup, namespace); - for (Cluster cluster : clusterList) { - if (!StringUtils.equals(clusterName, cluster.getClusterName())) { - continue; - } - if (cluster.getUnitData() != null && cluster.getUnitData().size() > 0) { - Unit unit = cluster.getUnitData().get(0); - if (unit != null && unit.getNamingInstanceList() != null && unit.getNamingInstanceList().size() > 0) { - Node node = unit.getNamingInstanceList().get(0); - String httpUrl = NamingServerConstants.HTTP_PREFIX + node.getControl().getHost() - + NamingServerConstants.IP_PORT_SPLIT_CHAR + node.getControl().getPort() - + NamingServerConstants.HTTP_REMOVE_GROUP_SUFFIX; - HashMap params = new HashMap<>(); - params.put(CONSTANT_GROUP, vGroup); - params.put(NamingServerConstants.CONSTANT_UNIT, unitName); - Map header = new HashMap<>(); - header.put(HTTP.CONTENT_TYPE, ContentType.APPLICATION_FORM_URLENCODED.getMimeType()); - try (CloseableHttpResponse closeableHttpResponse = - HttpClientUtil.doGet(httpUrl, params, header, 3000)) { - if (closeableHttpResponse == null - || closeableHttpResponse.getStatusLine().getStatusCode() != 200) { - LOGGER.warn("remove vGroup in old cluster failed"); - return new Result<>(String.valueOf(closeableHttpResponse.getStatusLine().getStatusCode()), - "removing vGroup " + vGroup + " in old cluster " + cluster + " failed"); - } - } catch (IOException e) { - LOGGER.warn("handle removing vGroup in old cluster failed"); - return new Result<>("500", - "handle removing vGroup " + vGroup + " in old cluster " + cluster + " failed"); - } + public Result removeGroup(Unit unit, String vGroup, String clusterName, String namespace, String unitName) { + if (unit != null && !CollectionUtils.isEmpty(unit.getNamingInstanceList())) { + Node node = unit.getNamingInstanceList().get(0); + String httpUrl = NamingServerConstants.HTTP_PREFIX + node.getControl().getHost() + + NamingServerConstants.IP_PORT_SPLIT_CHAR + node.getControl().getPort() + + NamingServerConstants.HTTP_REMOVE_GROUP_SUFFIX; + HashMap params = new HashMap<>(); + params.put(CONSTANT_GROUP, vGroup); + params.put(NamingServerConstants.CONSTANT_UNIT, unitName); + Map header = new HashMap<>(); + header.put(HTTP.CONTENT_TYPE, ContentType.APPLICATION_FORM_URLENCODED.getMimeType()); + try (CloseableHttpResponse closeableHttpResponse = HttpClientUtil.doGet(httpUrl, params, header, 3000)) { + if (closeableHttpResponse == null || closeableHttpResponse.getStatusLine().getStatusCode() != 200) { + LOGGER.warn("remove vGroup in old cluster failed"); + return new Result<>(String.valueOf(closeableHttpResponse.getStatusLine().getStatusCode()), + "removing vGroup " + vGroup + " in old cluster " + clusterName + " failed"); } + LOGGER.info("namespace: {} remove vGroup: {} in new cluster: {} successfully!", namespace, vGroup, + clusterName); + } catch (IOException e) { + LOGGER.warn("handle removing vGroup in old cluster failed"); + return new Result<>("500", + "handle removing vGroup " + vGroup + " in old cluster " + clusterName + " failed"); } } return new Result<>("200", "remove group in old cluster successfully!"); @@ -248,20 +240,21 @@ public boolean registerInstance(NamingServerNode node, String namespace, String ClusterData clusterData = clusterDataHashMap.computeIfAbsent(clusterName, key -> new ClusterData(clusterName, (String)node.getMetadata().get("cluster-type"))); boolean hasChanged = clusterData.registerInstance(node, unitName); + Object mappingObj = node.getMetadata().get(CONSTANT_GROUP); // if extended metadata includes vgroup mapping relationship, add it in clusterData - Optional.ofNullable(node.getMetadata().get(CONSTANT_GROUP)).ifPresent(mappingObj -> { - if (mappingObj instanceof Map) { - Map vGroups = (Map) mappingObj; + if (mappingObj instanceof Map) { + Map vGroups = (Map)mappingObj; + if (!CollectionUtils.isEmpty(vGroups)) { vGroups.forEach((k, v) -> { // In non-raft mode, a unit is one-to-one with a node, and the unitName is stored on the node. // In raft mode, the unitName is equal to the raft-group, so the node's unitName cannot be used. addGroup(namespace, clusterName, StringUtils.isBlank(v) ? unitName : v, k); if (hasChanged) { - notifyClusterChange(k,namespace, clusterName, unitName,node.getTerm()); + notifyClusterChange(k, namespace, clusterName, unitName, node.getTerm()); } }); } - }); + } instanceLiveTable.put( new InetSocketAddress(node.getTransaction().getHost(), node.getTransaction().getPort()), System.currentTimeMillis()); @@ -367,24 +360,26 @@ public void instanceHeartBeatCheck() { } public Result changeGroup(String namespace, String vGroup, String clusterName, String unitName) { - ConcurrentMap namespaceMap = - new ConcurrentHashMap<>(vGroupMap.get(vGroup)); + ConcurrentMap namespaceMap = new ConcurrentHashMap<>(vGroupMap.get(vGroup)); + Set currentNamespaces = namespaceMap.keySet(); + Map> namespaceClusters = new HashMap<>(); + for (String currentNamespace : currentNamespaces) { + namespaceClusters.put(currentNamespace, + new HashSet<>(namespaceMap.get(currentNamespace).getClusterMap().keySet())); + } createGroup(namespace, vGroup, clusterName, unitName); AtomicReference> result = new AtomicReference<>(); - namespaceMap.forEach((currentNamespace, namespaceBO) -> namespaceBO.getClusterMap().forEach((currentCluster, clusterBO) -> { - for (String currentUnitName : clusterBO.getUnitNames()) { - if (StringUtils.isBlank(unitName)) { - if (StringUtils.equalsIgnoreCase(clusterName, currentCluster)) { - continue; - } - result.set(removeGroup(currentNamespace, currentCluster, vGroup, unitName)); - } else { - if (!StringUtils.equalsIgnoreCase(unitName, currentUnitName)) { - result.set(removeGroup(currentNamespace, currentCluster, vGroup, unitName)); - } - } + namespaceClusters.forEach((oldNamespace, clusters) -> { + for (String cluster : clusters) { + Optional.ofNullable(namespaceClusterDataMap.get(oldNamespace)) + .flatMap(map -> Optional.ofNullable(map.get(cluster))).ifPresent(clusterData -> { + if (!CollectionUtils.isEmpty(clusterData.getUnitData())) { + clusterData.getUnitData().forEach((unit, unitData) -> result + .set(removeGroup(unitData, vGroup, cluster, oldNamespace, unitName))); + } + }); } - })); + }); return Optional.ofNullable(result.get()).orElseGet(() -> new Result<>("200", "change vGroup successfully!")); } diff --git a/script/client/conf/registry.conf b/script/client/conf/registry.conf index 4a5329b4881..48f2cd60b46 100644 --- a/script/client/conf/registry.conf +++ b/script/client/conf/registry.conf @@ -16,7 +16,7 @@ # registry { - # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa、custom、raft + # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa、custom、raft、namingserver type = "file" raft { @@ -44,6 +44,11 @@ registry { ##if use Nacos naming meta-data for SLB service registry, specify nacos address pattern rules here #slbPattern = "" } + namingserver { + server-addr = "127.0.0.1:8081" + namespace = "public" + heartbeat-period = 5000 + } eureka { serviceUrl = "http://localhost:8761/eureka" weight = "1" diff --git a/script/client/spring/application.properties b/script/client/spring/application.properties index f74bcbb53bd..1715c373622 100755 --- a/script/client/spring/application.properties +++ b/script/client/spring/application.properties @@ -133,6 +133,9 @@ seata.registry.etcd3.server-addr=http://localhost:2379 seata.registry.eureka.weight=1 seata.registry.eureka.service-url=http://localhost:8761/eureka +seata.registry.namingserver.server-addr=127.0.0.1:8081 +seata.registry.namingserver.namespace=public +seata.registry.namingserver.heartbeat-period=5000 seata.registry.nacos.application=seata-server seata.registry.nacos.server-addr=127.0.0.1:8848 diff --git a/script/client/spring/application.yml b/script/client/spring/application.yml index 951002d5f86..e1a170c7645 100755 --- a/script/client/spring/application.yml +++ b/script/client/spring/application.yml @@ -131,6 +131,10 @@ seata: name: registry: type: file + namingserver: + server-addr: 127.0.0.1:8081 + namespace: public + heartbeat-period: 5000 raft: server-addr: metadata-max-age-ms: 30000 diff --git a/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/SeataCoreEnvironmentPostProcessor.java b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/SeataCoreEnvironmentPostProcessor.java index df9478809d5..67bc7bd75ab 100644 --- a/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/SeataCoreEnvironmentPostProcessor.java +++ b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/SeataCoreEnvironmentPostProcessor.java @@ -34,6 +34,7 @@ import org.apache.seata.spring.boot.autoconfigure.properties.registry.RegistryEtcd3Properties; import org.apache.seata.spring.boot.autoconfigure.properties.registry.RegistryEurekaProperties; import org.apache.seata.spring.boot.autoconfigure.properties.registry.RegistryNacosProperties; +import org.apache.seata.spring.boot.autoconfigure.properties.registry.RegistryNamingServerProperties; import org.apache.seata.spring.boot.autoconfigure.properties.registry.RegistryProperties; import org.apache.seata.spring.boot.autoconfigure.properties.registry.RegistryRaftProperties; import org.apache.seata.spring.boot.autoconfigure.properties.registry.RegistryRedisProperties; @@ -59,6 +60,7 @@ import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.REGISTRY_ETCD3_PREFIX; import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.REGISTRY_EUREKA_PREFIX; import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.REGISTRY_NACOS_PREFIX; +import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.REGISTRY_NAMINGSERVER_PREFIX; import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.REGISTRY_PREFIX; import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.REGISTRY_RAFT_PREFIX; import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.REGISTRY_REDIS_PREFIX; @@ -100,6 +102,7 @@ public static void init() { PROPERTY_BEAN_MAP.put(REGISTRY_ETCD3_PREFIX, RegistryEtcd3Properties.class); PROPERTY_BEAN_MAP.put(REGISTRY_EUREKA_PREFIX, RegistryEurekaProperties.class); PROPERTY_BEAN_MAP.put(REGISTRY_NACOS_PREFIX, RegistryNacosProperties.class); + PROPERTY_BEAN_MAP.put(REGISTRY_NAMINGSERVER_PREFIX, RegistryNamingServerProperties.class); PROPERTY_BEAN_MAP.put(REGISTRY_REDIS_PREFIX, RegistryRedisProperties.class); PROPERTY_BEAN_MAP.put(REGISTRY_SOFA_PREFIX, RegistrySofaProperties.class); PROPERTY_BEAN_MAP.put(REGISTRY_ZK_PREFIX, RegistryZooKeeperProperties.class); diff --git a/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/StarterConstants.java b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/StarterConstants.java index f5fe4fe6721..3d1d7dcce13 100644 --- a/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/StarterConstants.java +++ b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/StarterConstants.java @@ -53,6 +53,7 @@ public interface StarterConstants { String REGISTRY_REDIS_PREFIX = REGISTRY_PREFIX + ".redis"; String REGISTRY_ZK_PREFIX = REGISTRY_PREFIX + ".zk"; String REGISTRY_CONSUL_PREFIX = REGISTRY_PREFIX + ".consul"; + String REGISTRY_NAMINGSERVER_PREFIX = REGISTRY_PREFIX + ".namingserver"; String REGISTRY_ETCD3_PREFIX = REGISTRY_PREFIX + ".etcd3"; String REGISTRY_SOFA_PREFIX = REGISTRY_PREFIX + ".sofa"; String REGISTRY_CUSTOM_PREFIX = REGISTRY_PREFIX + ".custom"; @@ -96,4 +97,5 @@ public interface StarterConstants { String SPECIAL_KEY_GROUPLIST = "grouplist"; String SPECIAL_KEY_SERVICE = "service"; String SPECIAL_KEY_VGROUP_MAPPING = "vgroupMapping"; + } diff --git a/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/registry/RegistryNamingServerProperties.java b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/registry/RegistryNamingServerProperties.java new file mode 100644 index 00000000000..86dc8577543 --- /dev/null +++ b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/registry/RegistryNamingServerProperties.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.spring.boot.autoconfigure.properties.registry; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.REGISTRY_NAMINGSERVER_PREFIX; + +@Component +@ConfigurationProperties(prefix = REGISTRY_NAMINGSERVER_PREFIX) +public class RegistryNamingServerProperties { + private String cluster = "default"; + private String serverAddr = "127.0.0.1:8081"; + private String namespace = "public"; + + private int heartbeatPeriod = 5000; + + public String getCluster() { + return cluster; + } + + public RegistryNamingServerProperties setCluster(String cluster) { + this.cluster = cluster; + return this; + } + + public String getServerAddr() { + return serverAddr; + } + + public RegistryNamingServerProperties setServerAddr(String serverAddr) { + this.serverAddr = serverAddr; + return this; + } + + public String getNamespace() { + return namespace; + } + + public void setNamespace(String namespace) { + this.namespace = namespace; + } + + public int getHeartbeatPeriod() { + return heartbeatPeriod; + } + + public void setHeartbeatPeriod(int heartbeatPeriod) { + this.heartbeatPeriod = heartbeatPeriod; + } +} diff --git a/server/src/main/java/org/apache/seata/server/Server.java b/server/src/main/java/org/apache/seata/server/Server.java index 8289ded6b8d..4f4de537cca 100644 --- a/server/src/main/java/org/apache/seata/server/Server.java +++ b/server/src/main/java/org/apache/seata/server/Server.java @@ -24,6 +24,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import javax.annotation.Resource; import org.apache.seata.common.XID; import org.apache.seata.common.holder.ObjectHolder; import org.apache.seata.common.metadata.Node; @@ -41,6 +42,8 @@ import org.apache.seata.server.session.SessionHolder; import org.apache.seata.server.store.StoreConfig; import org.apache.seata.server.store.VGroupMappingStoreManager; +import org.apache.seata.spring.boot.autoconfigure.properties.registry.RegistryNamingServerProperties; +import org.apache.seata.spring.boot.autoconfigure.properties.registry.RegistryProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; @@ -48,15 +51,11 @@ import org.springframework.core.env.ConfigurableEnvironment; import org.springframework.core.env.EnumerablePropertySource; import org.springframework.core.env.PropertySource; +import org.springframework.stereotype.Component; import org.springframework.web.context.support.GenericWebApplicationContext; -import static org.apache.seata.common.ConfigurationKeys.CLUSTER_NAME_KEY; -import static org.apache.seata.common.ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR; -import static org.apache.seata.common.ConfigurationKeys.FILE_ROOT_REGISTRY; -import static org.apache.seata.common.ConfigurationKeys.FILE_ROOT_TYPE; import static org.apache.seata.common.ConfigurationKeys.META_PREFIX; -import static org.apache.seata.common.ConfigurationKeys.NAMESPACE_KEY; import static org.apache.seata.common.ConfigurationKeys.NAMING_SERVER; import static org.apache.seata.common.Constants.OBJECT_KEY_SPRING_APPLICATION_CONTEXT; import static org.apache.seata.common.Constants.OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT; @@ -66,24 +65,31 @@ /** * The type Server. */ +@Component("seataServer") public class Server { private static final Logger LOGGER = LoggerFactory.getLogger(Server.class); - protected static final ScheduledExecutorService EXECUTOR_SERVICE = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("scheduledExcuter", 1, true)); + protected static volatile ScheduledExecutorService EXECUTOR_SERVICE; - public static void metadataInit() { + @Resource + RegistryNamingServerProperties registryNamingServerProperties; + + @Resource + RegistryProperties registryProperties; + + public void metadataInit() { VGroupMappingStoreManager vGroupMappingStoreManager = SessionHolder.getRootVGroupMappingManager(); - if (StringUtils.equals(ConfigurationFactory.getInstance().getConfig(FILE_ROOT_REGISTRY - + FILE_CONFIG_SPLIT_CHAR + FILE_ROOT_TYPE), NAMING_SERVER)) { + if (StringUtils.equals(registryProperties.getType(), NAMING_SERVER)) { + EXECUTOR_SERVICE = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("scheduledExcuter", 1, true)); ConfigurableEnvironment environment = (ConfigurableEnvironment) ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT); // load node properties Instance instance = Instance.getInstance(); // load namespace - String namespace = environment.getProperty(NAMESPACE_KEY, "public"); + String namespace = registryNamingServerProperties.getNamespace(); instance.setNamespace(namespace); // load cluster name - String clusterName = environment.getProperty(CLUSTER_NAME_KEY, "default"); + String clusterName = registryNamingServerProperties.getCluster(); instance.setClusterName(clusterName); // load cluster type @@ -109,17 +115,18 @@ public static void metadataInit() { } } } - // load vgroup mapping relationship instance.addMetadata("vGroup", vGroupMappingStoreManager.loadVGroups()); + + EXECUTOR_SERVICE.scheduleAtFixedRate(() -> { + try { + vGroupMappingStoreManager.notifyMapping(); + } catch (Exception e) { + LOGGER.error("Naming server register Exception", e); + } + }, registryNamingServerProperties.getHeartbeatPeriod(), registryNamingServerProperties.getHeartbeatPeriod(), TimeUnit.MILLISECONDS); + ServerRunner.addDisposable(EXECUTOR_SERVICE::shutdown); } - EXECUTOR_SERVICE.scheduleAtFixedRate(() -> { - try { - vGroupMappingStoreManager.notifyMapping(); - } catch (Exception e) { - LOGGER.error("Naming server register Exception", e); - } - }, 0, 5000, TimeUnit.MILLISECONDS); } @@ -128,7 +135,7 @@ public static void metadataInit() { * * @param args the input arguments */ - public static void start(String[] args) { + public void start(String[] args) { //initialize the parameter parser //Note that the parameter parser should always be the first line to execute. //Because, here we need to parse the parameters needed for startup. @@ -172,10 +179,9 @@ public static void start(String[] args) { coordinator.init(); nettyRemotingServer.setHandler(coordinator); + metadataInit(); // let ServerRunner do destroy instead ShutdownHook, see https://github.com/seata/seata/issues/4028 ServerRunner.addDisposable(coordinator); - metadataInit(); - nettyRemotingServer.init(); } } diff --git a/server/src/main/java/org/apache/seata/server/ServerRunner.java b/server/src/main/java/org/apache/seata/server/ServerRunner.java index 437f427895e..a48c7379fdf 100644 --- a/server/src/main/java/org/apache/seata/server/ServerRunner.java +++ b/server/src/main/java/org/apache/seata/server/ServerRunner.java @@ -19,6 +19,7 @@ import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; +import javax.annotation.Resource; import org.apache.seata.core.rpc.Disposable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,11 +54,14 @@ public static void addDisposable(Disposable disposable) { DISPOSABLE_LIST.add(disposable); } + @Resource + Server seataServer; + @Override public void run(String... args) { try { long start = System.currentTimeMillis(); - Server.start(args); + seataServer.start(args); started = true; long cost = System.currentTimeMillis() - start; diff --git a/server/src/main/java/org/apache/seata/server/storage/db/store/DataBaseVGroupMappingStoreManager.java b/server/src/main/java/org/apache/seata/server/storage/db/store/DataBaseVGroupMappingStoreManager.java index 6389168daef..d83ecf34c37 100644 --- a/server/src/main/java/org/apache/seata/server/storage/db/store/DataBaseVGroupMappingStoreManager.java +++ b/server/src/main/java/org/apache/seata/server/storage/db/store/DataBaseVGroupMappingStoreManager.java @@ -16,12 +16,12 @@ */ package org.apache.seata.server.storage.db.store; +import org.apache.seata.common.ConfigurationKeys; import org.apache.seata.common.loader.EnhancedServiceLoader; import org.apache.seata.common.loader.LoadLevel; import org.apache.seata.common.metadata.namingserver.Instance; import org.apache.seata.config.Configuration; import org.apache.seata.config.ConfigurationFactory; -import org.apache.seata.core.constants.ConfigurationKeys; import org.apache.seata.core.store.MappingDO; import org.apache.seata.core.store.db.DataSourceProvider; import org.apache.seata.server.store.VGroupMappingStoreManager; diff --git a/server/src/main/java/org/apache/seata/server/storage/db/store/VGroupMappingDataBaseDAO.java b/server/src/main/java/org/apache/seata/server/storage/db/store/VGroupMappingDataBaseDAO.java index ed2d68d6354..84fea262d57 100644 --- a/server/src/main/java/org/apache/seata/server/storage/db/store/VGroupMappingDataBaseDAO.java +++ b/server/src/main/java/org/apache/seata/server/storage/db/store/VGroupMappingDataBaseDAO.java @@ -42,7 +42,7 @@ public class VGroupMappingDataBaseDAO { private static final Logger LOGGER = LoggerFactory.getLogger(VGroupMappingDataBaseDAO.class); - protected DataSource vGroupMappingDataSource = null; + protected DataSource vGroupMappingDataSource; protected final String vMapping; @@ -98,7 +98,6 @@ public boolean deleteMappingDOByVGroup(String vGroup) { PreparedStatement ps = null; try { conn = vGroupMappingDataSource.getConnection(); - conn.setAutoCommit(true); ps = conn.prepareStatement(sql); ps.setString(1, vGroup); ps.setString(2, instance.getClusterName()); diff --git a/server/src/main/java/org/apache/seata/server/storage/redis/store/RedisVGroupMappingStoreManager.java b/server/src/main/java/org/apache/seata/server/storage/redis/store/RedisVGroupMappingStoreManager.java index 470f39cde43..1367ad281f3 100644 --- a/server/src/main/java/org/apache/seata/server/storage/redis/store/RedisVGroupMappingStoreManager.java +++ b/server/src/main/java/org/apache/seata/server/storage/redis/store/RedisVGroupMappingStoreManager.java @@ -51,8 +51,13 @@ public boolean removeVGroup(String vGroup) { Instance instance = Instance.getInstance(); String namespace = REDIS_PREFIX + instance.getNamespace(); try (Jedis jedis = JedisPooledFactory.getJedisInstance()) { - jedis.hdel(namespace, vGroup); - return true; + String currentVgroup = jedis.hget(namespace, vGroup); + if (StringUtils.equalsIgnoreCase(currentVgroup, instance.getClusterName())) { + jedis.hdel(namespace, vGroup); + return true; + } else { + return false; + } } catch (Exception ex) { throw new RedisException(ex); } diff --git a/server/src/main/java/org/apache/seata/server/store/VGroupMappingStoreManager.java b/server/src/main/java/org/apache/seata/server/store/VGroupMappingStoreManager.java index cf45795a7b8..d4ce6a5412f 100644 --- a/server/src/main/java/org/apache/seata/server/store/VGroupMappingStoreManager.java +++ b/server/src/main/java/org/apache/seata/server/store/VGroupMappingStoreManager.java @@ -24,6 +24,7 @@ import java.net.InetSocketAddress; import java.util.HashMap; +import java.util.Map; public interface VGroupMappingStoreManager { /** @@ -55,12 +56,12 @@ default HashMap readVGroups() { * notify mapping relationship to all namingserver nodes */ default void notifyMapping() { - Instance instance = Instance.getInstance(); - instance.addMetadata("vGroup", this.readVGroups()); + Map map = this.readVGroups(); + instance.addMetadata("vGroup", map); try { InetSocketAddress address = new InetSocketAddress(XID.getIpAddress(), XID.getPort()); - for (RegistryService registryService : MultiRegistryFactory.getInstances()) { + for (RegistryService registryService : MultiRegistryFactory.getInstances()) { registryService.register(address); } } catch (Exception e) { diff --git a/server/src/main/resources/application.example.yml b/server/src/main/resources/application.example.yml index 0a7897ab930..09360ccbaf7 100644 --- a/server/src/main/resources/application.example.yml +++ b/server/src/main/resources/application.example.yml @@ -80,12 +80,12 @@ seata: type: file preferred-networks: 30.240.* metadata: - weight: 1 + weight: 100 namingserver: - server-addr: 127.0.0.1:8080 + server-addr: 127.0.0.1:8081 cluster: default namespace: public - heartbeat-period: 1000 + heartbeat-period: 5000 nacos: application: seata-server server-addr: 127.0.0.1:8848