Skip to content

Commit

Permalink
Merge branch 'develop' into changecdn
Browse files Browse the repository at this point in the history
  • Loading branch information
funky-eyes committed Feb 28, 2022
2 parents 0a71602 + f86892d commit cf88315
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 9 deletions.
2 changes: 2 additions & 0 deletions changes/1.5.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,8 @@ Seata 是一款开源的分布式事务解决方案,提供高性能和简单
- [[#4400](https://github.com/seata/seata/pull/4400)] 异步二阶段任务支持并行处理提升效率
- [[#4391](https://github.com/seata/seata/pull/4391)] commit/rollback 重试超时事件
- [[#4282](https://github.com/seata/seata/pull/4282)] 优化回滚镜像构建逻辑
- [[#4407](https://github.com/seata/seata/pull/4407)] file模式下无需延迟删除globasession



### test:
Expand Down
2 changes: 2 additions & 0 deletions changes/en-us/1.5.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@
- [[#4400](https://github.com/seata/seata/pull/4400)] asynchronous tasks handle global transactions in parallel
- [[#4391](https://github.com/seata/seata/pull/4391)] commit/rollback retry timeout event
- [[#4282](https://github.com/seata/seata/pull/4282)] optimize build UndoItem logic
- [[#4407](https://github.com/seata/seata/pull/4407)] file mode does not require lazy processing of sessions


### test:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,22 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import io.netty.channel.Channel;
import io.seata.common.thread.NamedThreadFactory;
import io.seata.common.util.CollectionUtils;
import io.seata.common.util.DurationUtil;
import io.seata.common.util.StringUtils;
import io.seata.config.ConfigurationFactory;
import io.seata.core.constants.ConfigurationKeys;
import io.seata.core.context.RootContext;
import io.seata.core.event.EventBus;
import io.seata.core.exception.TransactionException;
import io.seata.core.model.GlobalStatus;
import io.seata.core.protocol.AbstractMessage;
Expand Down Expand Up @@ -67,8 +67,8 @@
import io.seata.core.rpc.TransactionMessageHandler;
import io.seata.core.rpc.netty.ChannelManager;
import io.seata.core.rpc.netty.NettyRemotingServer;
import io.seata.core.store.StoreMode;
import io.seata.server.AbstractTCInboundHandler;
import io.seata.server.event.EventBusManager;
import io.seata.server.session.BranchSession;
import io.seata.server.session.GlobalSession;
import io.seata.server.session.SessionCondition;
Expand Down Expand Up @@ -165,16 +165,19 @@ public class DefaultCoordinator extends AbstractTCInboundHandler implements Tran

private final DefaultCore core;

private final EventBus eventBus = EventBusManager.get();

private static volatile DefaultCoordinator instance;

private final boolean delayHandleSession;

/**
* Instantiates a new Default coordinator.
*
* @param remotingServer the remoting server
*/
private DefaultCoordinator(RemotingServer remotingServer) {
String mode = CONFIG.getConfig(ConfigurationKeys.STORE_MODE);
// file mode requires no delay in processing
this.delayHandleSession = !StringUtils.equalsIgnoreCase(mode, StoreMode.FILE.getName());
if (remotingServer == null) {
throw new IllegalArgumentException("RemotingServer not allowed be null.");
}
Expand Down Expand Up @@ -422,8 +425,9 @@ protected void handleRetryRollbacking(Collection<GlobalSession> rollbackingSessi
SessionHelper.forEach(rollbackingSessions, rollbackingSession -> {
try {
// prevent repeated rollback
if (rollbackingSession.getStatus().equals(GlobalStatus.Rollbacking) && !rollbackingSession.isDeadSession()) {
//The function of this 'return' is 'continue'.
if (delayHandleSession && rollbackingSession.getStatus().equals(GlobalStatus.Rollbacking)
&& !rollbackingSession.isDeadSession()) {
// The function of this 'return' is 'continue'.
return;
}
if (isRetryTimeout(now, MAX_ROLLBACK_RETRY_TIMEOUT.toMillis(), rollbackingSession.getBeginTime())) {
Expand Down Expand Up @@ -461,8 +465,9 @@ protected void handleRetryCommitting(Collection<GlobalSession> committingSession
SessionHelper.forEach(committingSessions, committingSession -> {
try {
// prevent repeated commit
if (committingSession.getStatus().equals(GlobalStatus.Committing) && !committingSession.isDeadSession()) {
//The function of this 'return' is 'continue'.
if (delayHandleSession && committingSession.getStatus().equals(GlobalStatus.Committing)
&& !committingSession.isDeadSession()) {
// The function of this 'return' is 'continue'.
return;
}
if (isRetryTimeout(now, MAX_COMMIT_RETRY_TIMEOUT.toMillis(), committingSession.getBeginTime())) {
Expand Down

0 comments on commit cf88315

Please sign in to comment.