Skip to content

Commit

Permalink
Merge branch 'develop' into patch-3
Browse files Browse the repository at this point in the history
  • Loading branch information
slievrly committed Jan 29, 2023
2 parents 35da27a + fda8060 commit b721e0d
Show file tree
Hide file tree
Showing 18 changed files with 126 additions and 33 deletions.
2 changes: 1 addition & 1 deletion build/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@

<!-- The version of spring-boot for 'spring-boot-dependencies' and 'spring-boot-maven-plugin' -->
<spring-boot.version>2.5.13</spring-boot.version>
<spring-framework.version>5.3.19</spring-framework.version>
<spring-framework.version>5.3.20</spring-framework.version>

<!-- For test -->
<junit-jupiter.version>5.8.2</junit-jupiter.version>
Expand Down
8 changes: 8 additions & 0 deletions changes/en-us/develop.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Add changes here for all PR submitted to the develop branch.
- [[#5023](https://github.com/seata/seata/pull/5203)] Fix `seata-core` dependency transitive conflict in `seata-dubbo`
- [[#5224](https://github.com/seata/seata/pull/5224)] fix oracle initialize script index_name is duplicate
- [[#5233](https://github.com/seata/seata/pull/5233)] fix the inconsistent configuration item names related to LoadBalance
- [[#5245](https://github.com/seata/seata/pull/5245)] fix the incomplete dependency of distribution module



Expand All @@ -19,6 +20,11 @@ Add changes here for all PR submitted to the develop branch.
- [[#5212](https://github.com/seata/seata/pull/5212)] optimize log message level
- [[#5237](https://github.com/seata/seata/pull/5237)] optimize exception log message print(EnhancedServiceLoader.loadFile#cahtch)
- [[#5243](https://github.com/seata/seata/pull/5243)] optimize kryo 5.4.0 optimize compatibility with jdk17
- [[#5153](https://github.com/seata/seata/pull/5153)] Only AT mode try to get channel with other app
- [[#5177](https://github.com/seata/seata/pull/5177)] If `server.session.enable-branch-async-remove` is true, delete the branch asynchronously and unlock it synchronously.

### security:
- [[#5172](https://github.com/seata/seata/pull/5172)] fix some security vulnerabilities


### test:
Expand All @@ -34,6 +40,8 @@ Thanks to these contributors for their code commits. Please report an unintended
- [yuruixin](https://github.com/yuruixin)
- [CrazyLionLi](https://github.com/JavaLionLi)
- [xingfudeshi](https://github.com/xingfudeshi)
- [Bughue](https://github.com/Bughue)
- [pengten](https://github.com/pengten)


Also, we receive many valuable issues, questions and advices from our community. Thanks for you all.
7 changes: 7 additions & 0 deletions changes/zh-cn/develop.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,19 @@
- [[#5023](https://github.com/seata/seata/pull/5203)] 修复 `seata-core` 模块传递依赖冲突
- [[#5224](https://github.com/seata/seata/pull/5224)] 修复 oracle初始化脚本索引名重复的问题
- [[#5233](https://github.com/seata/seata/pull/5233)] 修复LoadBalance相关配置不一致的问题
- [[#5245](https://github.com/seata/seata/pull/5245)] 修复不完整的distribution模块依赖


### optimize:
- [[#5208](https://github.com/seata/seata/pull/5208)] 优化多次重复获取Throwable#getCause问题
- [[#5212](https://github.com/seata/seata/pull/5212)] 优化不合理的日志信息级别
- [[#5237](https://github.com/seata/seata/pull/5237)] 优化异常日志打印(EnhancedServiceLoader.loadFile#cahtch)
- [[#5243](https://github.com/seata/seata/pull/5243)] 升级 kryo 5.4.0 优化对jdk17的兼容性
- [[#5153](https://github.com/seata/seata/pull/5153)] 只允许AT去尝试跨RM获取channel
- [[#5177](https://github.com/seata/seata/pull/5177)] 如果 `server.session.enable-branch-async-remove` 为真,异步删除分支,同步解锁。

### security:
- [[#5172](https://github.com/seata/seata/pull/5172)] 修复一些安全漏洞的版本

### test:
- [[#xxx](https://github.com/seata/seata/pull/xxx)] 增加 xxx 测试
Expand All @@ -33,6 +38,8 @@
- [yuruixin](https://github.com/yuruixin)
- [CrazyLionLi](https://github.com/JavaLionLi)
- [xingfudeshi](https://github.com/xingfudeshi)
- [Bughue](https://github.com/Bughue)
- [pengten](https://github.com/pengten)


同时,我们收到了社区反馈的很多有价值的issue和建议,非常感谢大家。
3 changes: 2 additions & 1 deletion core/src/main/java/io/seata/core/rpc/RemotingServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@ public interface RemotingServer {
* @param resourceId rm client resourceId
* @param clientId rm client id
* @param msg transaction message {@code io.seata.core.protocol}
* @param tryOtherApp try other app
* @return client result message
* @throws TimeoutException TimeoutException
*/
Object sendSyncRequest(String resourceId, String clientId, Object msg) throws TimeoutException;
Object sendSyncRequest(String resourceId, String clientId, Object msg, boolean tryOtherApp) throws TimeoutException;

/**
* server send sync request.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ public AbstractNettyRemotingServer(ThreadPoolExecutor messageExecutor, NettyServ
}

@Override
public Object sendSyncRequest(String resourceId, String clientId, Object msg) throws TimeoutException {
Channel channel = ChannelManager.getChannel(resourceId, clientId);
public Object sendSyncRequest(String resourceId, String clientId, Object msg, boolean tryOtherApp)
throws TimeoutException {
Channel channel = ChannelManager.getChannel(resourceId, clientId, tryOtherApp);
if (channel == null) {
throw new RuntimeException("rm client is not connected. dbkey:" + resourceId + ",clientId:" + clientId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ private static Channel getChannelFromSameClientMap(Map<Integer, RpcContext> clie
* @param clientId Client ID - ApplicationId:IP:Port
* @return Corresponding channel, NULL if not found.
*/
public static Channel getChannel(String resourceId, String clientId) {
public static Channel getChannel(String resourceId, String clientId, boolean tryOtherApp) {
Channel resultChannel = null;

String[] clientIdInfo = readClientId(clientId);
Expand Down Expand Up @@ -381,7 +381,7 @@ public static Channel getChannel(String resourceId, String clientId) {
}
}

if (resultChannel == null) {
if (resultChannel == null && tryOtherApp) {
resultChannel = tryOtherApp(applicationIdMap, targetApplicationId);

if (resultChannel == null) {
Expand Down
16 changes: 11 additions & 5 deletions dependencies/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
<brpc.version>2.5.9</brpc.version>
<hsf.version>1.8.3</hsf.version>
<bytebuddy.version>1.12.13</bytebuddy.version>
<dubbo.alibaba.version>2.6.5</dubbo.alibaba.version>
<dubbo.alibaba.version>2.6.10</dubbo.alibaba.version>
<sofa.rpc.version>5.5.3</sofa.rpc.version>
<fastjson.version>1.2.83</fastjson.version>
<protostuff.version>1.5.9</protostuff.version>
Expand All @@ -55,7 +55,7 @@
<nacos-client.version>1.4.2</nacos-client.version>
<etcd-client-v3.version>0.5.0</etcd-client-v3.version>
<testcontainers.version>1.11.2</testcontainers.version>
<guava.version>27.0.1-jre</guava.version>
<guava.version>30.1-jre</guava.version>
<javax-inject.version>1</javax-inject.version>
<javax.annotation-api.version>1.3.2</javax.annotation-api.version>
<javax.servlet-api.version>4.0.1</javax.servlet-api.version>
Expand All @@ -69,7 +69,7 @@
<ant.version>1.10.12</ant.version>
<lz4.version>1.7.1</lz4.version>

<protobuf.version>3.11.4</protobuf.version>
<protobuf.version>3.16.3</protobuf.version>
<grpc.version>1.27.1</grpc.version>
<kryo.version>5.4.0</kryo.version>
<kryo-serializers.version>0.45</kryo-serializers.version>
Expand All @@ -80,6 +80,7 @@
<xstream.version>1.4.19</xstream.version>

<jwt.version>0.10.5</jwt.version>
<jackson.databind.version>2.13.4.1</jackson.databind.version>
<prometheus.client.version>0.6.0</prometheus.client.version>
<logback.version>1.2.9</logback.version>
<logstash-logback-encoder.version>6.5</logstash-logback-encoder.version>
Expand All @@ -90,9 +91,9 @@

<!-- # for database -->
<!-- db -->
<mysql.version>5.1.35</mysql.version>
<mysql.version>5.1.42</mysql.version>
<ojdbc.version>19.3.0.0</ojdbc.version>
<postgresql.version>42.1.4</postgresql.version>
<postgresql.version>42.3.3</postgresql.version>
<h2.version>1.4.181</h2.version>
<mariadb.version>2.7.2</mariadb.version>
<!-- db connection pool -->
Expand Down Expand Up @@ -615,6 +616,11 @@
<artifactId>jjwt-jackson</artifactId>
<version>${jwt.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.databind.version}</version>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_httpserver</artifactId>
Expand Down
5 changes: 5 additions & 0 deletions distribution/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@
<artifactId>seata-server</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>apm-seata-skywalking-plugin</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
<finalName>seata</finalName>
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@
<image.publish.skip>false</image.publish.skip>
<image.tags>latest</image.tags>
<dependencies.copy.skip>false</dependencies.copy.skip>
<mysql.jdbc.version>5.1.35</mysql.jdbc.version>
<mysql.jdbc.version>5.1.42</mysql.jdbc.version>
<mysql8.jdbc.version>8.0.27</mysql8.jdbc.version>
</properties>
</profile>
Expand Down
2 changes: 1 addition & 1 deletion server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@
<profile>
<id>release-seata</id>
<properties>
<mysql.jdbc.version>5.1.35</mysql.jdbc.version>
<mysql.jdbc.version>5.1.42</mysql.jdbc.version>
<mysql8.jdbc.version>8.0.27</mysql8.jdbc.version>
<dependencies.copy.skip>false</dependencies.copy.skip>
</properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,9 @@ public BranchStatus branchCommit(GlobalSession globalSession, BranchSession bran

protected BranchStatus branchCommitSend(BranchCommitRequest request, GlobalSession globalSession,
BranchSession branchSession) throws IOException, TimeoutException {

BranchCommitResponse response = (BranchCommitResponse) remotingServer.sendSyncRequest(
branchSession.getResourceId(), branchSession.getClientId(), request);
branchSession.getResourceId(), branchSession.getClientId(), request, branchSession.isAT());
return response.getBranchStatus();
}

Expand All @@ -196,8 +197,9 @@ public BranchStatus branchRollback(GlobalSession globalSession, BranchSession br

protected BranchStatus branchRollbackSend(BranchRollbackRequest request, GlobalSession globalSession,
BranchSession branchSession) throws IOException, TimeoutException {

BranchRollbackResponse response = (BranchRollbackResponse) remotingServer.sendSyncRequest(
branchSession.getResourceId(), branchSession.getClientId(), request);
branchSession.getResourceId(), branchSession.getClientId(), request, branchSession.isAT());
return response.getBranchStatus();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import io.seata.server.session.SessionCondition;
import io.seata.server.session.SessionHelper;
import io.seata.server.session.SessionHolder;
import io.seata.server.store.StoreConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
Expand All @@ -77,6 +78,7 @@
import static io.seata.common.Constants.UNDOLOG_DELETE;
import static io.seata.common.DefaultValues.DEFAULT_ASYNC_COMMITTING_RETRY_PERIOD;
import static io.seata.common.DefaultValues.DEFAULT_COMMITING_RETRY_PERIOD;
import static io.seata.common.DefaultValues.DEFAULT_ENABLE_BRANCH_ASYNC_REMOVE;
import static io.seata.common.DefaultValues.DEFAULT_MAX_COMMIT_RETRY_TIMEOUT;
import static io.seata.common.DefaultValues.DEFAULT_MAX_ROLLBACK_RETRY_TIMEOUT;
import static io.seata.common.DefaultValues.DEFAULT_ROLLBACKING_RETRY_PERIOD;
Expand Down Expand Up @@ -138,7 +140,7 @@ public class DefaultCoordinator extends AbstractTCInboundHandler implements Tran
/**
* the pool size of branch asynchronous remove thread pool
*/
private static final int BRANCH_ASYNC_POOL_SIZE = Runtime.getRuntime().availableProcessors();
private static final int BRANCH_ASYNC_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2;

private static final Duration MAX_COMMIT_RETRY_TIMEOUT = ConfigurationFactory.getInstance().getDuration(
ConfigurationKeys.MAX_COMMIT_RETRY_TIMEOUT, DurationUtil.DEFAULT_DURATION, DEFAULT_MAX_COMMIT_RETRY_TIMEOUT);
Expand Down Expand Up @@ -170,12 +172,7 @@ public class DefaultCoordinator extends AbstractTCInboundHandler implements Tran
private final GlobalStatus[] retryCommittingStatuses =
new GlobalStatus[] {GlobalStatus.Committing, GlobalStatus.CommitRetrying};

private final ThreadPoolExecutor branchRemoveExecutor = new ThreadPoolExecutor(BRANCH_ASYNC_POOL_SIZE, BRANCH_ASYNC_POOL_SIZE,
Integer.MAX_VALUE, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(
CONFIG.getInt(ConfigurationKeys.SESSION_BRANCH_ASYNC_QUEUE_SIZE, DEFAULT_BRANCH_ASYNC_QUEUE_SIZE)
), new NamedThreadFactory("branchSessionRemove", BRANCH_ASYNC_POOL_SIZE),
new ThreadPoolExecutor.CallerRunsPolicy());
private final ThreadPoolExecutor branchRemoveExecutor;

private RemotingServer remotingServer;

Expand All @@ -194,6 +191,19 @@ private DefaultCoordinator(RemotingServer remotingServer) {
}
this.remotingServer = remotingServer;
this.core = new DefaultCore(remotingServer);
boolean enableBranchAsyncRemove = CONFIG.getBoolean(
ConfigurationKeys.ENABLE_BRANCH_ASYNC_REMOVE, DEFAULT_ENABLE_BRANCH_ASYNC_REMOVE);
// create branchRemoveExecutor
if (enableBranchAsyncRemove && StoreConfig.getSessionMode() != StoreConfig.SessionMode.FILE) {
branchRemoveExecutor = new ThreadPoolExecutor(BRANCH_ASYNC_POOL_SIZE, BRANCH_ASYNC_POOL_SIZE,
Integer.MAX_VALUE, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(
CONFIG.getInt(ConfigurationKeys.SESSION_BRANCH_ASYNC_QUEUE_SIZE, DEFAULT_BRANCH_ASYNC_QUEUE_SIZE)
), new NamedThreadFactory("branchSessionRemove", BRANCH_ASYNC_POOL_SIZE),
new ThreadPoolExecutor.CallerRunsPolicy());
} else {
branchRemoveExecutor = null;
}
}

public static DefaultCoordinator getInstance(RemotingServer remotingServer) {
Expand Down Expand Up @@ -533,14 +543,18 @@ public void destroy() {
asyncCommitting.shutdown();
timeoutCheck.shutdown();
undoLogDelete.shutdown();
branchRemoveExecutor.shutdown();
if (branchRemoveExecutor != null) {
branchRemoveExecutor.shutdown();
}
try {
retryRollbacking.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);
retryCommitting.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);
asyncCommitting.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);
timeoutCheck.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);
undoLogDelete.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);
branchRemoveExecutor.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);
if (branchRemoveExecutor != null) {
branchRemoveExecutor.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);
}
} catch (InterruptedException ignore) {

}
Expand Down Expand Up @@ -582,6 +596,9 @@ static class BranchRemoveTask implements Runnable {
*/
public BranchRemoveTask(GlobalSession globalSession, BranchSession branchSession) {
this.globalSession = globalSession;
if (branchSession == null) {
throw new IllegalArgumentException("BranchSession can`t be null!");
}
this.branchSession = branchSession;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,10 @@ public boolean unlock() throws TransactionException {
return true;
}

public boolean isAT() {
return this.getBranchType() == BranchType.AT;
}

public LockStatus getLockStatus() {
return lockStatus;
}
Expand Down
12 changes: 11 additions & 1 deletion server/src/main/java/io/seata/server/session/GlobalSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -312,20 +312,30 @@ public void loadBranchs() {
}

@Override
public void removeBranch(BranchSession branchSession) throws TransactionException {
public void unlockBranch(BranchSession branchSession) throws TransactionException {
// do not unlock if global status in (Committing, CommitRetrying, AsyncCommitting),
// because it's already unlocked in 'DefaultCore.commit()'
if (status != Committing && status != CommitRetrying && status != AsyncCommitting) {
if (!branchSession.unlock()) {
throw new TransactionException("Unlock branch lock failed, xid = " + this.xid + ", branchId = " + branchSession.getBranchId());
}
}
}

@Override
public void removeBranch(BranchSession branchSession) throws TransactionException {
for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onRemoveBranch(this, branchSession);
}
remove(branchSession);
}

@Override
public void removeAndUnlockBranch(BranchSession branchSession) throws TransactionException {
unlockBranch(branchSession);
removeBranch(branchSession);
}

/**
* Gets branch.
*
Expand Down
27 changes: 21 additions & 6 deletions server/src/main/java/io/seata/server/session/SessionHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,8 @@ public static Boolean forEach(Collection<BranchSession> sessions, BranchSessionH
*/
public static void removeBranch(GlobalSession globalSession, BranchSession branchSession, boolean isAsync)
throws TransactionException {
if (Objects.equals(Boolean.TRUE, ENABLE_BRANCH_ASYNC_REMOVE) && isAsync) {
globalSession.unlockBranch(branchSession);
if (isEnableBranchRemoveAsync() && isAsync) {
COORDINATOR.doBranchRemoveAsync(globalSession, branchSession);
} else {
globalSession.removeBranch(branchSession);
Expand All @@ -312,12 +313,26 @@ public static void removeAllBranch(GlobalSession globalSession, boolean isAsync)
if (branchSessions == null || branchSessions.isEmpty()) {
return;
}
if (Objects.equals(Boolean.TRUE, ENABLE_BRANCH_ASYNC_REMOVE) && isAsync) {
COORDINATOR.doBranchRemoveAllAsync(globalSession);
} else {
for (BranchSession branchSession : branchSessions) {
globalSession.removeBranch(branchSession);
boolean isAsyncRemove = isEnableBranchRemoveAsync() && isAsync;
for (BranchSession branchSession : branchSessions) {
if (isAsyncRemove) {
globalSession.unlockBranch(branchSession);
} else {
globalSession.removeAndUnlockBranch(branchSession);
}
}
if (isAsyncRemove) {
COORDINATOR.doBranchRemoveAllAsync(globalSession);
}
}

/**
* if true, enable delete the branch asynchronously
*
* @return the boolean
*/
private static boolean isEnableBranchRemoveAsync() {
return Objects.equals(Boolean.TRUE, DELAY_HANDLE_SESSION)
&& Objects.equals(Boolean.TRUE, ENABLE_BRANCH_ASYNC_REMOVE);
}
}
Loading

0 comments on commit b721e0d

Please sign in to comment.