Skip to content

Commit

Permalink
feature: support raft cluster (apache#5226)
Browse files Browse the repository at this point in the history
  • Loading branch information
funky-eyes committed Oct 25, 2023
1 parent 2d6ae18 commit e9919e4
Show file tree
Hide file tree
Showing 131 changed files with 5,659 additions and 297 deletions.
5 changes: 5 additions & 0 deletions all/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@
<artifactId>seata-discovery-sofa</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-discovery-raft</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-discovery-zk</artifactId>
Expand Down
5 changes: 5 additions & 0 deletions bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@
<artifactId>seata-discovery-sofa</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-discovery-raft</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-brpc</artifactId>
Expand Down
1 change: 1 addition & 0 deletions changes/en-us/2.0.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ The version is updated as follows:
- [[#5907](https://github.com/seata/seata/pull/5907)] support polardb-x 2.0 in AT mode
- [[#5932](https://github.com/seata/seata/pull/5932)] support Dameng database
- [[#5946](https://github.com/seata/seata/pull/5946)] add sqlserver's adaptation to the console paging interface
- [[#5226](https://github.com/seata/seata/pull/5226)] support raft cluster and store mode

### bugfix:
- [[#5677](https://github.com/seata/seata/pull/5677)] fix saga mode serviceTask inputParams json autoType convert exception
Expand Down
1 change: 1 addition & 0 deletions changes/zh-cn/2.0.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ Seata 是一款开源的分布式事务解决方案,提供高性能和简单
- [[#5907](https://github.com/seata/seata/pull/5907)] 增加AT模式的PolarDB-X 2.0数据库支持
- [[#5932](https://github.com/seata/seata/pull/5932)] AT模式支持达梦数据库
- [[#5946](https://github.com/seata/seata/pull/5946)] 增加sqlserver对控制台分页接口的适配
- [[#5226](https://github.com/seata/seata/pull/5226)] 支持Raft集群部署和事务存储模式

### bugfix:
- [[#5677](https://github.com/seata/seata/pull/5677)] 修复saga模式下serviceTask入参autoType转化失败问题
Expand Down
81 changes: 81 additions & 0 deletions common/src/main/java/io/seata/common/ConfigurationKeys.java
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,11 @@ public interface ConfigurationKeys {
*/
String SERVER_SERVICE_PORT_CAMEL = SERVER_PREFIX + "servicePort";

/**
* The constant SERVER_RAFT_PORT.
*/
String SERVER_RAFT_PORT_CAMEL = SERVER_PREFIX + "raftPort";

/**
* The constant SERVER_SERVICE_PORT_CONFIG.
*/
Expand Down Expand Up @@ -866,6 +871,81 @@ public interface ConfigurationKeys {
*/
String ENABLE_BRANCH_ASYNC_REMOVE = SERVER_PREFIX + SESSION_PREFIX + "enableBranchAsyncRemove";

/**
* The constant SERVER_RAFT.
*/
String SERVER_RAFT = SERVER_PREFIX + "raft.";

/**
* The constant SERVER_RAFT_SERVER_ADDR.
*/
String SERVER_RAFT_SERVER_ADDR = SERVER_RAFT + "serverAddr";

/**
* The constant SERVER_RAFT_GROUP.
*/
String SERVER_RAFT_GROUP = SERVER_RAFT + "group";

/**
* The constant SERVER_RAFT_SNAPSHOT_INTERVAL.
*/
String SERVER_RAFT_SNAPSHOT_INTERVAL = SERVER_RAFT + "snapshotInterval";

/**
* The constant SERVER_RAFT_DISRUPTOR_BUFFER_SIZE.
*/
String SERVER_RAFT_DISRUPTOR_BUFFER_SIZE = SERVER_RAFT + "disruptorBufferSize";

/**
* The constant SERVER_RAFT_MAX_REPLICATOR_INFLIGHT_MSGS.
*/
String SERVER_RAFT_MAX_REPLICATOR_INFLIGHT_MSGS = SERVER_RAFT + "maxReplicatorInflightMsgs";

/**
* The constant SERVER_RAFT_SYNC.
*/
String SERVER_RAFT_SYNC = SERVER_RAFT + "sync";

/**
* The constant SERVER_RAFT_MAX_APPEND_BUFFER_SIZE.
*/
String SERVER_RAFT_MAX_APPEND_BUFFER_SIZE = SERVER_RAFT + "maxAppendBufferSize";

/**
* The constant SERVER_RAFT_APPLY_BATCH.
*/
String SERVER_RAFT_APPLY_BATCH = SERVER_RAFT + "applyBatch";

/**
* The constant SERVER_RAFT_APPLY_BATCH.
*/
String SERVER_RAFT_ELECTION_TIMEOUT_MS = SERVER_RAFT + "electionTimeoutMs";

/**
* The constant SERVER_RAFT_REPORTER_ENABLED.
*/
String SERVER_RAFT_REPORTER_ENABLED = SERVER_RAFT + "reporterEnabled";

/**
* The constant SERVER_RAFT_REPORTER_INITIAL_DELAY.
*/
String SERVER_RAFT_REPORTER_INITIAL_DELAY = SERVER_RAFT + "reporterInitialDelay";

/**
* The constant SERVER_RAFT_SERIALIZATION.
*/
String SERVER_RAFT_SERIALIZATION = SERVER_RAFT + "serialization";

/**
* The constant SERVER_RAFT_COMPRESSOR.
*/
String SERVER_RAFT_COMPRESSOR = SERVER_RAFT + "compressor";

/**
* The constant CLIENT_METADATA_MAX_AGE_MS.
*/
String CLIENT_METADATA_MAX_AGE_MS = CLIENT_PREFIX + "metadataMaxAgeMs";

/**
* The constant IS_USE_CLOUD_NAMESPACE_PARSING.
*/
Expand All @@ -891,6 +971,7 @@ public interface ConfigurationKeys {
*/
String XA_CONNECTION_TWO_PHASE_HOLD_TIMEOUT = CLIENT_RM_PREFIX + "connectionTwoPhaseHoldTimeoutXA";


/**
* The constant ENABLE_PARALLEL_REQUEST_HANDLE_KEY
*/
Expand Down
15 changes: 14 additions & 1 deletion common/src/main/java/io/seata/common/DefaultValues.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,15 @@ public interface DefaultValues {
long DEFAULT_TABLE_META_CHECKER_INTERVAL = 60000L;
boolean DEFAULT_TM_DEGRADE_CHECK = false;
boolean DEFAULT_CLIENT_SAGA_BRANCH_REGISTER_ENABLE = false;

/**
* The default session store dir
*/
String DEFAULT_SESSION_STORE_FILE_DIR = "sessionStore";
boolean DEFAULT_CLIENT_SAGA_RETRY_PERSIST_MODE_UPDATE = false;
boolean DEFAULT_CLIENT_SAGA_COMPENSATE_PERSIST_MODE_UPDATE = false;
String DEFAULT_RAFT_SERIALIZATION = "jackson";
String DEFAULT_RAFT_COMPRESSOR = "none";

/**
* Shutdown timeout default 3s
Expand All @@ -53,7 +60,6 @@ public interface DefaultValues {
boolean DEFAULT_ENABLE_RM_CLIENT_BATCH_SEND_REQUEST = true;
boolean DEFAULT_ENABLE_TC_SERVER_BATCH_SEND_RESPONSE = false;


String DEFAULT_BOSS_THREAD_PREFIX = "NettyBoss";
String DEFAULT_NIO_WORKER_THREAD_PREFIX = "NettyServerNIOWorker";
String DEFAULT_EXECUTOR_THREAD_PREFIX = "NettyServerBizHandler";
Expand Down Expand Up @@ -120,6 +126,8 @@ public interface DefaultValues {
String DEFAULT_LOAD_BALANCE = "XID";
int VIRTUAL_NODES_DEFAULT = 10;

String DEFAULT_SEATA_GROUP = "default";

/**
* the constant DEFAULT_CLIENT_UNDO_COMPRESS_ENABLE
*/
Expand All @@ -135,6 +143,7 @@ public interface DefaultValues {
*/
String DEFAULT_CLIENT_UNDO_COMPRESS_THRESHOLD = "64k";


/**
* the constant DEFAULT_RETRY_DEAD_THRESHOLD
*/
Expand Down Expand Up @@ -203,6 +212,10 @@ public interface DefaultValues {
*/
int DEFAULT_XA_CONNECTION_TWO_PHASE_HOLD_TIMEOUT = 10000;

/**
* the constant DEFAULT_SERVER_RAFT_ELECTION_TIMEOUT_MS
*/
int DEFAULT_SERVER_RAFT_ELECTION_TIMEOUT_MS = 1000;
/**
* the constant DEFAULT_COMMITING_RETRY_PERIOD
*/
Expand Down
54 changes: 54 additions & 0 deletions common/src/main/java/io/seata/common/metadata/ClusterRole.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.common.metadata;

/**
* @author funkye
*/
public enum ClusterRole {

/**
* raft mode leader
*/
LEADER(0),
/**
* raft mode follower
*/
FOLLOWER(1),
/**
* raft mode learner
*/
LEARNER(2),
/**
* cluster mode member
*/
MEMBER(3);

private int roleCode;

ClusterRole(int roleCode) {
this.roleCode = roleCode;
}

public int getRoleCode() {
return roleCode;
}

public void setRoleCode(int roleCode) {
this.roleCode = roleCode;
}

}
115 changes: 115 additions & 0 deletions common/src/main/java/io/seata/common/metadata/Metadata.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.common.metadata;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;

import io.seata.common.store.StoreMode;
import io.seata.common.util.StringUtils;

/**
* @author funkye
*/
public class Metadata {

private final Map<String/*vgroup*/, Map<String/*raft-group*/, Node>> leaders = new ConcurrentHashMap<>();

private final Map<String/*vgroup*/, Map<String/*raft-group*/, Long/*term*/>> clusterTerm = new ConcurrentHashMap<>();

private final Map<String/*vgroup*/, Map<String/*raft-group*/, List<Node>>> clusterNodes =
new ConcurrentHashMap<>();

private StoreMode storeMode = StoreMode.FILE;

public Node getLeader(String clusterName) {
Map<String/*raft-group*/, Node> map = leaders.computeIfAbsent(clusterName, k -> new ConcurrentHashMap<>());
List<Node> nodes = new ArrayList<>(map.values());
return nodes.size() > 0 ? nodes.get(ThreadLocalRandom.current().nextInt(nodes.size())) : null;
}

public void setLeaderNode(String clusterName, Node node) {
String group = node.getGroup();
Map<String/*raft-group*/, Node> map = leaders.computeIfAbsent(clusterName, k -> new ConcurrentHashMap<>());
map.put(group, node);
this.leaders.put(clusterName, map);
}

public List<Node> getNodes(String clusterName, String group) {
return clusterNodes.computeIfAbsent(clusterName, k -> new ConcurrentHashMap<>()).get(group);
}

public List<Node> getNodes(String clusterName) {
return clusterNodes.computeIfAbsent(clusterName, k -> new ConcurrentHashMap<>()).values().stream()
.flatMap(List::stream).collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
}

public void setNodes(String clusterName, String group, List<Node> nodes) {
this.clusterNodes.computeIfAbsent(clusterName, k -> new ConcurrentHashMap<>()).put(group, nodes);
}

public boolean containsGroup(String group) {
return clusterNodes.containsKey(group);
}

public Set<String> groups(String clusterName) {
return clusterNodes.computeIfAbsent(clusterName, k -> new ConcurrentHashMap<>()).keySet();
}

public StoreMode getStoreMode() {
return storeMode;
}

public boolean isRaftMode() {
return Objects.equals(storeMode, StoreMode.RAFT);
}

public void setStoreMode(StoreMode storeMode) {
this.storeMode = storeMode;
}

public Map<String, Long> getClusterTerm(String clusterName) {
return clusterTerm.computeIfAbsent(clusterName, k -> new ConcurrentHashMap<>());
}

public void refreshMetadata(String clusterName, MetadataResponse metadataResponse) {
List<Node> list = new ArrayList<>();
for (Node node : metadataResponse.getNodes()) {
if (node.getRole() == ClusterRole.LEADER) {
this.setLeaderNode(clusterName, node);
}
list.add(node);
}
this.storeMode = StoreMode.get(metadataResponse.getStoreMode());
if (!list.isEmpty()) {
String group = list.get(0).getGroup();
this.setNodes(clusterName, group, list);
this.clusterTerm.computeIfAbsent(clusterName, k -> new ConcurrentHashMap<>()).put(group,
metadataResponse.getTerm());
}
}

@Override
public String toString() {
return StringUtils.toString(this);
}

}
Loading

0 comments on commit e9919e4

Please sign in to comment.