Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize: file session manager changed to singleton and optimize task thread pool model #4451

Merged
merged 89 commits into from
Apr 21, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
89 commits
Select commit Hold shift + click to select a range
1d89b88
optimize: skip reload
caohdgege Feb 18, 2022
87234e6
optimize: close undo log delete
caohdgege Feb 18, 2022
df5b93a
add md
caohdgege Feb 18, 2022
3c8939c
Merge branch 'develop' into skip-reload_
wangliang181230 Feb 23, 2022
8e39036
fix logic and ci
caohdgege Feb 28, 2022
a54426b
Merge remote-tracking branch 'origin/skip-reload_' into skip-reload_
caohdgege Feb 28, 2022
9c48ec9
Merge branch 'develop' into skip-reload_
caohdgege Feb 28, 2022
125e29f
fix pmd
caohdgege Feb 28, 2022
47b2bfa
Merge remote-tracking branch 'origin/skip-reload_' into skip-reload_
caohdgege Feb 28, 2022
6534549
fix ci
caohdgege Feb 28, 2022
7309dad
md
caohdgege Mar 3, 2022
f3cc555
fixed the inability to get some remote configurations
funky-eyes Mar 3, 2022
0db8a8f
update
funky-eyes Mar 3, 2022
2598db9
update
funky-eyes Mar 3, 2022
1a6618a
code formatting
funky-eyes Mar 3, 2022
5013b19
fix
funky-eyes Mar 3, 2022
b80cead
bugfix
funky-eyes Mar 4, 2022
e9dce09
bugfix
funky-eyes Mar 4, 2022
d445556
bugfix
funky-eyes Mar 4, 2022
7117812
bugfix
funky-eyes Mar 4, 2022
eb53e00
bugfix
funky-eyes Mar 4, 2022
db19a9f
bugfix
funky-eyes Mar 4, 2022
a0d800d
bugfix
funky-eyes Mar 4, 2022
643d214
add a log && set val for errorStatusGlobalSessions
caohdgege Mar 5, 2022
320c334
add log
caohdgege Mar 5, 2022
03b2739
optimize
caohdgege Mar 5, 2022
9bb1ec9
Merge branch 'develop' into 0303
funky-eyes Mar 6, 2022
204f234
Merge branch 'develop' into 0303
wangliang181230 Mar 7, 2022
a61fc96
Merge branch 'develop' into skip-reload_
slievrly Mar 7, 2022
756f542
Merge branch 'develop' into 0303
slievrly Mar 7, 2022
e03f8c3
optimize
funky-eyes Mar 8, 2022
97ed206
Merge branch '0303' of github.com:a364176773/seata into 0303
funky-eyes Mar 8, 2022
36467e2
optimize
funky-eyes Mar 8, 2022
8307356
optimize
funky-eyes Mar 8, 2022
8d6634b
fix typo
caohdgege Mar 8, 2022
12c0569
fix
caohdgege Mar 8, 2022
1cf3830
Merge remote-tracking branch 'origin/skip-reload_' into skip-reload_
caohdgege Mar 8, 2022
4b16d2b
reverse space
caohdgege Mar 8, 2022
a98cf19
UnusedImports
caohdgege Mar 8, 2022
e414172
fix ci
caohdgege Mar 8, 2022
0bf17a9
Merge branch 'develop' into skip-reload_
caohdgege Mar 8, 2022
6367c9b
fix ci
caohdgege Mar 8, 2022
67d0fae
Merge remote-tracking branch 'origin/skip-reload_' into skip-reload_
caohdgege Mar 8, 2022
106290f
Merge branch 'develop' of github.com:seata/seata into 0309
funky-eyes Mar 9, 2022
a92eeb3
optimize: seata-server timing tasks
funky-eyes Mar 9, 2022
8271ee7
optimize: seata-server timing tasks
funky-eyes Mar 9, 2022
1509b2f
opt
funky-eyes Mar 9, 2022
7f24217
Merge branch 'skip-reload_' of https://github.com/caohdgege/seata int…
funky-eyes Mar 9, 2022
8bc2229
commit
funky-eyes Mar 9, 2022
b758b77
commit
funky-eyes Mar 9, 2022
cfa29cd
Merge branch 'skip-reload_' of github.com:caohdgege/seata into 0309
funky-eyes Mar 10, 2022
46aff5a
optimize
funky-eyes Mar 10, 2022
7cb8d59
optimize
funky-eyes Mar 10, 2022
61f4e5e
commit
funky-eyes Mar 10, 2022
3c9a03d
Merge branch '0309' of github.com:a364176773/seata into 0309
funky-eyes Mar 10, 2022
3841138
Merge branch 'develop' into 0309
funky-eyes Mar 10, 2022
78db1b0
optimize
funky-eyes Mar 11, 2022
e8b6b71
optimize
funky-eyes Mar 11, 2022
ee8b5f9
Merge branch 'develop' into 0309
funky-eyes Mar 12, 2022
acf8151
commit
funky-eyes Mar 13, 2022
657d26d
Merge branch 'develop' into 0309
slievrly Mar 14, 2022
162e1c1
Merge branch 'develop' of github.com:seata/seata into 0309
funky-eyes Mar 14, 2022
7b18221
Merge branch 'develop' of github.com:seata/seata into 0309
funky-eyes Mar 14, 2022
54b1b02
opt
funky-eyes Mar 14, 2022
308768b
opt
funky-eyes Mar 14, 2022
3899a69
Merge branch 'develop' into 0309
funky-eyes Mar 18, 2022
6b9c5cf
Merge branch 'develop' into 0309
funky-eyes Mar 19, 2022
da327dd
Merge branch 'develop' into 0309
funky-eyes Apr 6, 2022
030ee9f
modify test case
funky-eyes Apr 6, 2022
43938d8
Merge remote-tracking branch 'origin/0309' into 0309
funky-eyes Apr 6, 2022
8e3953e
opt
funky-eyes Apr 6, 2022
099d0d3
optimize
funky-eyes Apr 7, 2022
e795464
modify test case
funky-eyes Apr 8, 2022
3d9080e
Merge remote-tracking branch 'origin/0309' into 0309
funky-eyes Apr 8, 2022
c38fe21
modify test case
funky-eyes Apr 8, 2022
0b3c928
modify test case
funky-eyes Apr 8, 2022
5a4c5fb
fix test case fail
funky-eyes Apr 9, 2022
d586673
Merge branch 'develop' of github.com:seata/seata into 0309
funky-eyes Apr 9, 2022
988c759
opt
funky-eyes Apr 19, 2022
bc58bc4
Merge branch 'develop' of github.com:seata/seata into 0309
funky-eyes Apr 19, 2022
2228bc9
opt
funky-eyes Apr 19, 2022
4e25ee7
opt
funky-eyes Apr 19, 2022
aeabd59
opt
funky-eyes Apr 19, 2022
4631220
bugfix
funky-eyes Apr 19, 2022
47fa7de
optimize
funky-eyes Apr 20, 2022
474e866
Merge branch '0309' of github.com:a364176773/seata into 0309
funky-eyes Apr 20, 2022
3abb98e
optimize
funky-eyes Apr 20, 2022
f7bd2e5
fix case
funky-eyes Apr 20, 2022
4b3f1a9
fix case
funky-eyes Apr 20, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
optimize: seata-server timing tasks
  • Loading branch information
funky-eyes committed Mar 9, 2022
commit a92eeb32a50a45281dde371092583759df734c71
20 changes: 18 additions & 2 deletions common/src/main/java/io/seata/common/ConfigurationKeys.java
Original file line number Diff line number Diff line change
Expand Up @@ -364,9 +364,24 @@ public interface ConfigurationKeys {
String RECOVERY_PREFIX = SERVER_PREFIX + "recovery.";

/**
* The constant HANDLE_ALL_SESSION_PERIOD.
* The constant COMMITING_RETRY_PERIOD.
*/
String HANDLE_ALL_SESSION_PERIOD = RECOVERY_PREFIX + "handleAllSessionPeriod";
String COMMITING_RETRY_PERIOD = RECOVERY_PREFIX + "committingRetryPeriod";

/**
* The constant ASYN_COMMITING_RETRY_PERIOD.
*/
String ASYN_COMMITING_RETRY_PERIOD = RECOVERY_PREFIX + "asynCommittingRetryPeriod";

/**
* The constant ROLLBACKING_RETRY_PERIOD.
*/
String ROLLBACKING_RETRY_PERIOD = RECOVERY_PREFIX + "rollbackingRetryPeriod";

/**
* The constant TIMEOUT_RETRY_PERIOD.
*/
String TIMEOUT_RETRY_PERIOD = RECOVERY_PREFIX + "timeoutRetryPeriod";

/**
* The constant CLIENT_UNDO_PREFIX.
Expand All @@ -377,6 +392,7 @@ public interface ConfigurationKeys {
* The constant TRANSACTION_UNDO_DATA_VALIDATION.
*/
String TRANSACTION_UNDO_DATA_VALIDATION = CLIENT_UNDO_PREFIX + "dataValidation";

/**
* The constant TRANSACTION_UNDO_LOG_SERIALIZATION.
*/
Expand Down
185 changes: 70 additions & 115 deletions server/src/main/java/io/seata/server/coordinator/DefaultCoordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,10 @@
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;


import static io.seata.common.Constants.HANDLE_ALL_SESSION;
import static io.seata.common.Constants.ASYNC_COMMITTING;
import static io.seata.common.Constants.RETRY_COMMITTING;
import static io.seata.common.Constants.RETRY_ROLLBACKING;
import static io.seata.common.Constants.TX_TIMEOUT_CHECK;
import static io.seata.common.Constants.UNDOLOG_DELETE;

/**
Expand All @@ -92,10 +94,27 @@ public class DefaultCoordinator extends AbstractTCInboundHandler implements Tran
private static final int TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS = 5000;

/**
* The constant HANDLE_ALL_SESSION_PERIOD.
* The constant COMMITTING_RETRY_PERIOD.
*/
protected static final long COMMITTING_RETRY_PERIOD = CONFIG.getLong(ConfigurationKeys.COMMITING_RETRY_PERIOD,
1000L);

/**
* The constant ASYNC_COMMITTING_RETRY_PERIOD.
*/
protected static final long ASYNC_COMMITTING_RETRY_PERIOD = CONFIG.getLong(
ConfigurationKeys.ASYN_COMMITING_RETRY_PERIOD, 1000L);

/**
* The constant ROLLBACKING_RETRY_PERIOD.
*/
protected static final long ROLLBACKING_RETRY_PERIOD = CONFIG.getLong(ConfigurationKeys.ROLLBACKING_RETRY_PERIOD,
1000L);

/**
* The constant TIMEOUT_RETRY_PERIOD.
*/
protected static final long HANDLE_ALL_SESSION_PERIOD = CONFIG.getLong(ConfigurationKeys.HANDLE_ALL_SESSION_PERIOD,
1000L);
protected static final long TIMEOUT_RETRY_PERIOD = CONFIG.getLong(ConfigurationKeys.TIMEOUT_RETRY_PERIOD, 1000L);

/**
* The Transaction undo log delete period.
Expand Down Expand Up @@ -129,30 +148,26 @@ public class DefaultCoordinator extends AbstractTCInboundHandler implements Tran
private static final boolean ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE = ConfigurationFactory.getInstance().getBoolean(
ConfigurationKeys.ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE, false);

private final ExecutorService retryRollbacking = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(), new NamedThreadFactory("RetryRollbacking", 1));

private final ExecutorService retryCommitting = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(), new NamedThreadFactory("RetryCommitting", 1));
private final ScheduledThreadPoolExecutor retryRollbacking =
new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("RetryRollbacking", 1));

private final ExecutorService asyncCommitting = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(), new NamedThreadFactory("AsyncCommitting", 1));
private final ScheduledThreadPoolExecutor retryCommitting =
new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("RetryCommitting", 1));

private final ExecutorService timeoutCheck = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(), new NamedThreadFactory("TxTimeoutCheck", 1));
private final ScheduledThreadPoolExecutor asyncCommitting =
new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("AsyncCommitting", 1));

private final ScheduledThreadPoolExecutor undoLogDelete = new ScheduledThreadPoolExecutor(1,
new NamedThreadFactory("UndoLogDelete", 1));
private final ScheduledThreadPoolExecutor timeoutCheck =
new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("TxTimeoutCheck", 1));

private final ScheduledThreadPoolExecutor handleAllSession = new ScheduledThreadPoolExecutor(1,
new NamedThreadFactory("HandleAllSession", 1));
private final ScheduledThreadPoolExecutor undoLogDelete =
new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("UndoLogDelete", 1));

private final List<GlobalStatus> rollbackingStatuses =
Collections.unmodifiableList(Arrays.asList(GlobalStatus.TimeoutRollbacking,
GlobalStatus.TimeoutRollbackRetrying, GlobalStatus.RollbackRetrying, GlobalStatus.Rollbacking));
private final GlobalStatus[] rollbackingStatuses =
new GlobalStatus[] {GlobalStatus.TimeoutRollbacking,
GlobalStatus.TimeoutRollbackRetrying, GlobalStatus.RollbackRetrying, GlobalStatus.Rollbacking};

private final List<GlobalStatus> retryCommittingStatuses = Collections.unmodifiableList(
Arrays.asList(GlobalStatus.Committing, GlobalStatus.CommitRetrying));
private final GlobalStatus[] retryCommittingStatuses = new GlobalStatus[]{GlobalStatus.Committing, GlobalStatus.CommitRetrying};

private final ThreadPoolExecutor branchRemoveExecutor = new ThreadPoolExecutor(BRANCH_ASYNC_POOL_SIZE, BRANCH_ASYNC_POOL_SIZE,
Integer.MAX_VALUE, TimeUnit.MILLISECONDS,
Expand Down Expand Up @@ -293,78 +308,14 @@ protected void doLockCheck(GlobalLockQueryRequest request, GlobalLockQueryRespon
core.lockQuery(request.getBranchType(), request.getResourceId(), request.getXid(), request.getLockKey()));
}

/**
* Handle all session.
*/
protected void handleAllSession() {
SessionCondition sessionCondition = new SessionCondition(GlobalStatus.values());
sessionCondition.setLazyLoadBranch(true);
Collection<GlobalSession> allSessions =
SessionHolder.getRootSessionManager().findGlobalSessions(sessionCondition);
if (CollectionUtils.isEmpty(allSessions)) {
return;
}
List<GlobalSession> retryRollbackingSessions = new ArrayList<>();
List<GlobalSession> beginGlobalSessions = new ArrayList<>();
List<GlobalSession> retryCommittingSessions = new ArrayList<>();
List<GlobalSession> asyncCommittingSessions = new ArrayList<>();
for (GlobalSession session : allSessions) {
if (rollbackingStatuses.contains(session.getStatus())) {
retryRollbackingSessions.add(session);
} else if (retryCommittingStatuses.contains(session.getStatus())) {
retryCommittingSessions.add(session);
} else if (GlobalStatus.AsyncCommitting.equals(session.getStatus())) {
asyncCommittingSessions.add(session);
} else if (GlobalStatus.Begin.equals(session.getStatus())) {
beginGlobalSessions.add(session);
}
}
List<CompletableFuture<Void>> futures = new ArrayList<>(4);
if (!retryRollbackingSessions.isEmpty()) {
futures.add(
CompletableFuture.runAsync(() -> handleRetryRollbacking(retryRollbackingSessions), retryRollbacking));
}
if (!beginGlobalSessions.isEmpty()) {
futures.add(CompletableFuture.runAsync(() -> timeoutCheck(beginGlobalSessions), timeoutCheck));
}
if (!retryCommittingSessions.isEmpty()) {
futures.add(
CompletableFuture.runAsync(() -> handleRetryCommitting(retryCommittingSessions), retryCommitting));
}
if (!asyncCommittingSessions.isEmpty()) {
futures.add(
CompletableFuture.runAsync(() -> handleAsyncCommitting(asyncCommittingSessions), asyncCommitting));
}
if (CollectionUtils.isNotEmpty(futures)) {
try {
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
} catch (InterruptedException e) {
LOGGER.error("transaction task thread ran abnormally: {}", e.getMessage(), e);
} catch (ExecutionException e) {
Throwable throwable = e.getCause() != null ? e.getCause() : e;
LOGGER.error("task execution exception: {}", throwable.getMessage(), throwable);
}
}
}

/**
* Timeout check.
*/
@Deprecated
protected void timeoutCheck() {
SessionCondition sessionCondition = new SessionCondition(new GlobalStatus[] {GlobalStatus.Begin});
SessionCondition sessionCondition = new SessionCondition(GlobalStatus.Begin);
sessionCondition.setLazyLoadBranch(true);
Collection<GlobalSession> allSessions =
Collection<GlobalSession> beginGlobalsessions =
SessionHolder.getRootSessionManager().findGlobalSessions(sessionCondition);
timeoutCheck(allSessions);
}

/**
* Timeout check.
*
* @param beginGlobalsessions
*/
protected void timeoutCheck(Collection<GlobalSession> beginGlobalsessions) {
if (CollectionUtils.isEmpty(beginGlobalsessions)) {
return;
}
Expand Down Expand Up @@ -403,24 +354,14 @@ protected void timeoutCheck(Collection<GlobalSession> beginGlobalsessions) {

}


/**
* Handle retry rollbacking.
*/
@Deprecated
protected void handleRetryRollbacking() {
SessionCondition sessionCondition =
new SessionCondition(rollbackingStatuses.toArray(new GlobalStatus[0]));
SessionCondition sessionCondition = new SessionCondition(rollbackingStatuses);
Collection<GlobalSession> rollbackingSessions =
SessionHolder.getRetryRollbackingSessionManager().findGlobalSessions(sessionCondition);
handleRetryRollbacking(rollbackingSessions);
}

/**
* Handle retry rollbacking.
*
* @param rollbackingSessions
*/
protected void handleRetryRollbacking(Collection<GlobalSession> rollbackingSessions) {
SessionHolder.getRetryRollbackingSessionManager().findGlobalSessions(sessionCondition);
long now = System.currentTimeMillis();
SessionHelper.forEach(rollbackingSessions, rollbackingSession -> {
try {
Expand Down Expand Up @@ -454,10 +395,11 @@ protected void handleRetryRollbacking(Collection<GlobalSession> rollbackingSessi

/**
* Handle retry committing.
*
* @param committingSessions
*/
protected void handleRetryCommitting(Collection<GlobalSession> committingSessions) {
protected void handleRetryCommitting() {
SessionCondition retryCommittingSessionCondition = new SessionCondition(retryCommittingStatuses);
Collection<GlobalSession> committingSessions =
SessionHolder.getRootSessionManager().findGlobalSessions(retryCommittingSessionCondition);
if (CollectionUtils.isEmpty(committingSessions)) {
return;
}
Expand Down Expand Up @@ -495,10 +437,11 @@ private boolean isRetryTimeout(long now, long timeout, long beginTime) {

/**
* Handle async committing.
*
* @param asyncCommittingSessions
*/
protected void handleAsyncCommitting(Collection<GlobalSession> asyncCommittingSessions) {
protected void handleAsyncCommitting() {
SessionCondition asyncCommittingSessionCondition = new SessionCondition(GlobalStatus.AsyncCommitting);
Collection<GlobalSession> asyncCommittingSessions =
SessionHolder.getRootSessionManager().findGlobalSessions(asyncCommittingSessionCondition);
if (CollectionUtils.isEmpty(asyncCommittingSessions)) {
return;
}
Expand Down Expand Up @@ -542,11 +485,25 @@ protected void undoLogDelete() {
* Init.
*/
public void init() {
handleAllSession.scheduleAtFixedRate(
() -> SessionHolder.distributedLockAndExecute(HANDLE_ALL_SESSION, this::handleAllSession), 0,
HANDLE_ALL_SESSION_PERIOD, TimeUnit.MILLISECONDS);
undoLogDelete.scheduleAtFixedRate(() -> SessionHolder.distributedLockAndExecute(UNDOLOG_DELETE, this::undoLogDelete),
UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);
retryRollbacking.scheduleAtFixedRate(
() -> SessionHolder.distributedLockAndExecute(RETRY_ROLLBACKING, this::handleRetryRollbacking), 0,
ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

retryCommitting.scheduleAtFixedRate(
() -> SessionHolder.distributedLockAndExecute(RETRY_COMMITTING, this::handleRetryCommitting), 0,
COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

asyncCommitting.scheduleAtFixedRate(
() -> SessionHolder.distributedLockAndExecute(ASYNC_COMMITTING, this::handleAsyncCommitting), 0,
ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

timeoutCheck.scheduleAtFixedRate(
() -> SessionHolder.distributedLockAndExecute(TX_TIMEOUT_CHECK, this::timeoutCheck), 0,
TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS);

undoLogDelete.scheduleAtFixedRate(
() -> SessionHolder.distributedLockAndExecute(UNDOLOG_DELETE, this::undoLogDelete),
UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);
}

@Override
Expand All @@ -571,14 +528,12 @@ public void onResponse(AbstractResultMessage response, RpcContext context) {
@Override
public void destroy() {
// 1. first shutdown timed task
handleAllSession.shutdown();
retryRollbacking.shutdown();
retryCommitting.shutdown();
asyncCommitting.shutdown();
timeoutCheck.shutdown();
branchRemoveExecutor.shutdown();
try {
handleAllSession.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);
retryRollbacking.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);
retryCommitting.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);
asyncCommitting.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public void test_handleRetryRollbacking() throws TransactionException, Interrupt
Assertions.assertNotNull(branchId);

Thread.sleep(100);
defaultCoordinator.handleAllSession();
defaultCoordinator.timeoutCheck();
defaultCoordinator.handleRetryRollbacking();

GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
Expand Down