Skip to content

Commit

Permalink
optimize: the namingserver code to improve readability (apache#6761)
Browse files Browse the repository at this point in the history
  • Loading branch information
funky-eyes committed Aug 18, 2024
1 parent f7c3866 commit b7fc0c1
Show file tree
Hide file tree
Showing 7 changed files with 187 additions and 54 deletions.
1 change: 1 addition & 0 deletions changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#6748](https://github.com/apache/incubator-seata/pull/6748)] optimize ConsistentHashLoadBalance Algorithm
- [[#6747](https://github.com/apache/incubator-seata/pull/6747)] optimize fastjson deserialization
- [[#6755](https://github.com/apache/incubator-seata/pull/6755)] optimize namingserver code logic
- [[#6761](https://github.com/apache/incubator-seata/pull/6761)] optimize the namingserver code to improve readability


### refactor:
Expand Down
2 changes: 1 addition & 1 deletion changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
- [[#6748](https://github.com/apache/incubator-seata/pull/6748)] 优化 ConsistentHashLoadBalance 算法
- [[#6747](https://github.com/apache/incubator-seata/pull/6747)] 优化 fastjson 反序列化
- [[#6755](https://github.com/apache/incubator-seata/pull/6755)] 优化namingserver代码逻辑

- [[#6761](https://github.com/apache/incubator-seata/pull/6761)] 提升namingserver manager代码可读性

### refactor:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
*/
package org.apache.seata.common.metadata;

import org.apache.seata.common.metadata.namingserver.Unit;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.seata.common.metadata.namingserver.Unit;

public class Cluster {
private String clusterName;
private String clusterType;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.namingserver.entity.bo;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class ClusterBO {

Set<String> unitNames = ConcurrentHashMap.newKeySet();

public ClusterBO(Set<String> unitNames) {
this.unitNames = unitNames;
}

public ClusterBO() {
}

public Set<String> getUnitNames() {
return unitNames;
}

public void setUnitNames(Set<String> unitNames) {
this.unitNames = unitNames;
}

public void remove(String realUnitName) {
this.unitNames.remove(realUnitName);
}

public void addUnit(String unitName) {
this.unitNames.add(unitName);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.namingserver.entity.bo;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.seata.common.metadata.Cluster;
import org.apache.seata.namingserver.entity.pojo.ClusterData;

public class NamespaceBO {

Map<String, ClusterBO> clusterMap = new ConcurrentHashMap<>();

public NamespaceBO() {
}

public Map<String, ClusterBO> getClusterMap() {
return clusterMap;
}

public List<Cluster> getCluster(ConcurrentMap<String/* clusterName */, ClusterData> clusterDataMap) {
List<Cluster> list = new ArrayList<>();
clusterMap.forEach((clusterName, unitNameSet) -> {
ClusterData clusterData = clusterDataMap.get(clusterName);
if (clusterData != null) {
list.add(clusterData.getClusterByUnits(unitNameSet.getUnitNames()));
}
});
return list;
}

public void setClusterMap(Map<String, ClusterBO> clusterMap) {
this.clusterMap = clusterMap;
}

public ClusterBO getCluster(String clusterName) {
return clusterMap.computeIfAbsent(clusterName, k -> new ClusterBO());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import org.apache.seata.common.util.HttpClientUtil;
import org.apache.seata.common.NamingServerConstants;
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.namingserver.entity.bo.ClusterBO;
import org.apache.seata.namingserver.entity.bo.NamespaceBO;
import org.apache.seata.namingserver.listener.ClusterChangeEvent;
import org.apache.seata.namingserver.entity.pojo.ClusterData;
import org.apache.seata.namingserver.entity.vo.monitor.ClusterVO;
Expand All @@ -63,8 +65,7 @@
public class NamingManager {
private static final Logger LOGGER = LoggerFactory.getLogger(NamingManager.class);
private final ConcurrentMap<InetSocketAddress, Long> instanceLiveTable;
private final ConcurrentMap<String/* VGroup */, ConcurrentMap<String/* namespace */,
ConcurrentMap<String/* clusterName */, Set<String>/* unitName */>>> vGroupMap;
private final ConcurrentMap<String/* VGroup */, ConcurrentMap<String/* namespace */, NamespaceBO>> vGroupMap;
private final ConcurrentMap<String/* namespace */,
ConcurrentMap<String/* clusterName */, ClusterData>> namespaceClusterDataMap;

Expand Down Expand Up @@ -111,19 +112,17 @@ public List<ClusterVO> monitorCluster(String namespace) {
LOGGER.warn("no cluster in namespace:" + namespace);
}

for (Map.Entry<String, ConcurrentMap<String, ConcurrentMap<String, Set<String>>>> entry : vGroupMap
.entrySet()) {
String vGroup = entry.getKey();
ConcurrentMap<String, ConcurrentMap<String, Set<String>>> namespaceMap = entry.getValue();
ConcurrentMap<String, Set<String>> pair = namespaceMap.get(namespace);
pair.keySet().stream().findFirst().ifPresent(clusterName -> {
ClusterVO clusterVO = clusterVOHashMap.get(clusterName);
if (clusterVO != null) {
clusterVO.addMapping(vGroup);
}
});
}

vGroupMap.forEach((vGroup, namespaceMap) -> {
NamespaceBO namespaceBO = namespaceMap.get(namespace);
if (namespaceBO != null) {
namespaceBO.getClusterMap().forEach((clusterName, clusterBO) -> {
ClusterVO clusterVO = clusterVOHashMap.get(clusterName);
if (clusterVO != null) {
clusterVO.addMapping(vGroup);
}
});
}
});
return new ArrayList<>(clusterVOHashMap.values());
}

Expand Down Expand Up @@ -195,11 +194,10 @@ public Result<String> removeGroup(String namespace, String clusterName,String vG

public void addGroup(String namespace, String clusterName, String unitName, String vGroup) {
try {
Set<String> units = vGroupMap.computeIfAbsent(vGroup, k -> new ConcurrentHashMap<>())
.computeIfAbsent(namespace, k -> new ConcurrentHashMap<>())
.computeIfAbsent(clusterName, k -> ConcurrentHashMap.newKeySet());
if (!units.contains(unitName)) {
units.add(unitName);
ClusterBO clusterBO = vGroupMap.computeIfAbsent(vGroup, k -> new ConcurrentHashMap<>())
.computeIfAbsent(namespace, k -> new NamespaceBO()).getCluster(clusterName);
if (clusterBO != null && !clusterBO.getUnitNames().contains(unitName)) {
clusterBO.addUnit(unitName);
applicationContext.publishEvent(new ClusterChangeEvent(this, vGroup, System.currentTimeMillis()));
}
} catch (Exception e) {
Expand All @@ -208,17 +206,16 @@ public void addGroup(String namespace, String clusterName, String unitName, Stri
}

public void notifyClusterChange(String namespace, String clusterName, String unitName, long term) {
for (Map.Entry<String, ConcurrentMap<String, ConcurrentMap<String, Set<String>>>> entry : vGroupMap
.entrySet()) {
String vGroup = entry.getKey();
Map<String, ConcurrentMap<String, Set<String>>> namespaceMap = entry.getValue();
Optional.ofNullable(namespaceMap.get(namespace)).flatMap(pair -> Optional.ofNullable(pair.get(clusterName)))
.ifPresent(unitSet -> {
if (StringUtils.isBlank(unitName) || unitSet.contains(unitName)) {
vGroupMap.forEach((vGroup, namespaceMap) -> {
Optional.ofNullable(namespaceMap.get(namespace))
.flatMap(namespaceBO -> Optional.ofNullable(namespaceBO.getCluster(clusterName)))
.ifPresent(clusterBO -> {
Set<String> units = clusterBO.getUnitNames();
if (StringUtils.isBlank(unitName) || units.contains(unitName)) {
applicationContext.publishEvent(new ClusterChangeEvent(this, vGroup, term));
}
});
}
});
}

public boolean registerInstance(NamingServerNode node, String namespace, String clusterName, String unitName) {
Expand All @@ -234,11 +231,11 @@ public boolean registerInstance(NamingServerNode node, String namespace, String
// if extended metadata includes vgroup mapping relationship, add it in clusterData
Optional.ofNullable(node.getMetadata().get(CONSTANT_GROUP)).ifPresent(mappingObj -> {
if (mappingObj instanceof Map) {
Map<String, Object> vGroups = (Map<String, Object>) mappingObj;
Map<String, String> vGroups = (Map<String, String>) mappingObj;
vGroups.forEach((k, v) -> {
// In non-raft mode, a unit is one-to-one with a node, and the unitName is stored on the node.
// In raft mode, the unitName is equal to the raft-group, so the node's unitName cannot be used.
addGroup(namespace, clusterName, v == null ? unitName : (String)v, k);
addGroup(namespace, clusterName, StringUtils.isBlank(v) ? unitName : v, k);
});
}
});
Expand Down Expand Up @@ -267,7 +264,7 @@ public boolean unregisterInstance(String namespace, String clusterName, String u
Object vgroupMap = node.getMetadata().get(CONSTANT_GROUP);
if (vgroupMap instanceof Map) {
((Map<String, Object>)vgroupMap).forEach((group, realUnitName) -> vGroupMap.get(group)
.get(namespace).get(clusterName).remove(realUnitName));
.get(namespace).getCluster(clusterName).remove((String) realUnitName));
}
notifyClusterChange(namespace, clusterName, unitName, node.getTerm());
instanceLiveTable.remove(
Expand All @@ -283,20 +280,14 @@ public boolean unregisterInstance(String namespace, String clusterName, String u

public List<Cluster> getClusterListByVgroup(String vGroup, String namespace) {
// find the cluster where the transaction group is located
ConcurrentMap<String/* namespace */,
ConcurrentMap<String/* clusterName */, Set<String>/* unitName */>> vgroupNamespaceMap =
ConcurrentMap<String/* namespace */, NamespaceBO> vgroupNamespaceMap =
vGroupMap.get(vGroup);
List<Cluster> clusterList = new ArrayList<>();
if (!CollectionUtils.isEmpty(vgroupNamespaceMap)) {
ConcurrentMap<String, Set<String>> clusterUnitPair = vgroupNamespaceMap.get(namespace);
NamespaceBO namespaceBO = vgroupNamespaceMap.get(namespace);
ConcurrentMap<String/* clusterName */, ClusterData> clusterDataMap = namespaceClusterDataMap.get(namespace);
if (clusterUnitPair != null && !CollectionUtils.isEmpty(clusterDataMap)) {
clusterUnitPair.forEach((clusterName, unitNameSet) -> {
ClusterData clusterData = clusterDataMap.get(clusterName);
if (clusterData != null) {
clusterList.add(clusterData.getClusterByUnits(unitNameSet));
}
});
if (namespaceBO != null && !CollectionUtils.isEmpty(clusterDataMap)) {
clusterList.addAll(namespaceBO.getCluster(clusterDataMap));
}
}
return clusterList;
Expand Down Expand Up @@ -333,9 +324,10 @@ public void instanceHeartBeatCheck() {
Object vgoupMap = instance.getMetadata().get(CONSTANT_GROUP);
if (vgoupMap instanceof Map) {
((Map<String, Object>)vgoupMap).forEach((group, unitName) -> {
Set<String> units =
vGroupMap.get(group).get(namespace).get(clusterData.getClusterName());
units.remove(unitName);
ClusterBO clusterBO =
vGroupMap.get(group).get(namespace).getCluster(clusterData.getClusterName());
Set<String> units = clusterBO.getUnitNames();
units.remove((String)unitName);
});
}

Expand All @@ -350,12 +342,12 @@ public void instanceHeartBeatCheck() {
}

public Result<String> changeGroup(String namespace, String vGroup, String clusterName, String unitName) {
ConcurrentMap<String, ConcurrentMap<String, Set<String>>> namespaceMap =
ConcurrentMap<String, NamespaceBO> namespaceMap =
new ConcurrentHashMap<>(vGroupMap.get(vGroup));
createGroup(namespace, vGroup, clusterName, unitName);
AtomicReference<Result<String>> result = new AtomicReference<>();
namespaceMap.forEach((currentNamespace, clusterMap) -> clusterMap.forEach((currentCluster, unitSet) -> {
for (String currentUnitName : unitSet) {
namespaceMap.forEach((currentNamespace, namespaceBO) -> namespaceBO.getClusterMap().forEach((currentCluster, clusterBO) -> {
for (String currentUnitName : clusterBO.getUnitNames()) {
if (StringUtils.isBlank(unitName)) {
if (StringUtils.equalsIgnoreCase(clusterName, currentCluster)) {
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -106,7 +104,7 @@ void mockUnregisterGracefully() {
String unitName2 = UUID.randomUUID().toString();
vGroups2.put("vgroup2",unitName2);
meatadata2.put(CONSTANT_GROUP, vGroups2);
namingController.registerInstance(namespace, "cluster2", unitName2, node2);
namingController.registerInstance(namespace, "cluster1", unitName2, node2);
String vGroup = "vgroup1";
MetaResponse metaResponse = namingController.discovery(vGroup, namespace);
assertNotNull(metaResponse);
Expand All @@ -125,7 +123,7 @@ void mockUnregisterGracefully() {
metaResponse = namingController.discovery(vGroup, namespace);
assertNotNull(metaResponse);
assertNotNull(metaResponse.getClusterList());
assertEquals(0, metaResponse.getClusterList().get(0).getUnitData().size());
assertEquals(1, metaResponse.getClusterList().get(0).getUnitData().size());
}

@Test
Expand Down Expand Up @@ -265,4 +263,41 @@ void mockHeartbeat() throws InterruptedException {
assertEquals(8091, node1.getTransaction().getPort());
}

@Test
void mockIntermediateState() {
String clusterName = "cluster1";
String namespace = "public6";
String vGroup = "vgroup1";
String unitName = String.valueOf(UUID.randomUUID());
NamingServerNode node = new NamingServerNode();
node.setTransaction(new Node.Endpoint("127.0.0.1", 8091, "netty"));
node.setControl(new Node.Endpoint("127.0.0.1", 7091, "http"));
Map<String, Object> meatadata = node.getMetadata();
Map<String,Object> vGroups = new HashMap<>();
vGroups.put(vGroup,unitName);
meatadata.put(CONSTANT_GROUP, vGroups);
namingController.registerInstance(namespace, clusterName, unitName, node);
NamingServerNode node2 = new NamingServerNode();
node2.setTransaction(new Node.Endpoint("127.0.0.1", 8092, "netty"));
node2.setControl(new Node.Endpoint("127.0.0.1", 7092, "http"));
Map<String, Object> meatadata2 = node2.getMetadata();
Map<String,Object> vGroups2 = new HashMap<>();
String unitName2 = UUID.randomUUID().toString();
vGroups2.put(vGroup,unitName2);
meatadata2.put(CONSTANT_GROUP, vGroups2);
namingController.registerInstance(namespace, "clusterName2", unitName2, node2);
MetaResponse metaResponse = namingController.discovery(vGroup, namespace);
assertNotNull(metaResponse);
assertNotNull(metaResponse.getClusterList());
assertEquals(2, metaResponse.getClusterList().size());
Cluster cluster = metaResponse.getClusterList().get(1);
assertNotNull(cluster.getUnitData());
assertEquals(1, cluster.getUnitData().size());
Unit unit = cluster.getUnitData().get(0);
assertNotNull(unit.getNamingInstanceList());
assertEquals(1, unit.getNamingInstanceList().size());
Node unit2 = unit.getNamingInstanceList().get(0);
assertEquals(unit2.getTransaction(), node2.getTransaction());
}

}

0 comments on commit b7fc0c1

Please sign in to comment.