Skip to content

Commit

Permalink
bugfix: repeat execute hook (apache#5997)
Browse files Browse the repository at this point in the history
  • Loading branch information
jsbxyyx committed Nov 6, 2023
1 parent af72c60 commit 967906b
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 62 deletions.
3 changes: 2 additions & 1 deletion changes/en-us/2.0.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ The version is updated as follows:
- [[#5971](https://github.com/seata/seata/pull/5971)] fix some configurations that are not deprecated show "Deprecated"
- [[#5977](https://github.com/seata/seata/pull/5977)] fix that rpcserver is not closed when raftServer is closed
- [[#5954](https://github.com/seata/seata/pull/5954)] fix the issue of saved branch session status does not match the actual branch session status

- [[#5887](https://github.com/seata/seata/pull/5887)] fix global transaction hook repeat execute

### optimize:
- [[#5966](https://github.com/seata/seata/pull/5966)] decouple saga expression handling and remove evaluator package
Expand Down Expand Up @@ -208,6 +208,7 @@ Thanks to these contributors for their code commits. Please report an unintended
- [iquanzhan](https://github.com/iquanzhan)
- [leizhiyuan](https://github.com/leizhiyuan)
- [Aruato](https://github.com/Aruato)
- [jsbxyyx](https://github.com/jsbxyyx)

Also, we receive many valuable issues, questions and advices from our community. Thanks for you all.

Expand Down
3 changes: 3 additions & 0 deletions changes/zh-cn/2.0.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ Seata 是一款开源的分布式事务解决方案,提供高性能和简单
- [[#5971](https://github.com/seata/seata/pull/5971)] 修复某些未弃用的配置显示"已弃用"
- [[#5977](https://github.com/seata/seata/pull/5977)] 修复当raft server关闭时,rpc server未关闭的问题
- [[#5954](https://github.com/seata/seata/pull/5954)] 修复保存的分支会话状态与实际的分支会话状态不一致的问题
- [[#5887](https://github.com/seata/seata/pull/5887)] 修复全局事务钩子重复执行


### optimize:
Expand Down Expand Up @@ -209,6 +210,8 @@ Seata 是一款开源的分布式事务解决方案,提供高性能和简单
- [leizhiyuan](https://github.com/leizhiyuan)
- [Aruato](https://github.com/Aruato)
- [ggbocoder](https://github.com/ggbocoder)
- [jsbxyyx](https://github.com/jsbxyyx)


同时,我们收到了社区反馈的很多有价值的issue和建议,非常感谢大家。

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,9 @@ public void recordStateMachineFinished(StateMachineInstance machineInstance, Pro

protected void reportTransactionFinished(StateMachineInstance machineInstance, ProcessContext context) {
if (sagaTransactionalTemplate != null) {
GlobalTransaction globalTransaction = null;
try {
GlobalTransaction globalTransaction = getGlobalTransaction(machineInstance, context);
globalTransaction = getGlobalTransaction(machineInstance, context);
if (globalTransaction == null) {

throw new EngineExecutionException("Global transaction is not exists",
Expand Down Expand Up @@ -234,7 +235,7 @@ protected void reportTransactionFinished(StateMachineInstance machineInstance, P
// clear
RootContext.unbind();
RootContext.unbindBranchType();
sagaTransactionalTemplate.triggerAfterCompletion();
sagaTransactionalTemplate.triggerAfterCompletion(globalTransaction);
sagaTransactionalTemplate.cleanUp();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.seata.tm.TMClient;
import io.seata.tm.api.GlobalTransaction;
import io.seata.tm.api.GlobalTransactionContext;
import io.seata.tm.api.GlobalTransactionRole;
import io.seata.tm.api.TransactionalExecutor;
import io.seata.tm.api.TransactionalExecutor.ExecutionException;
import io.seata.tm.api.transaction.TransactionHook;
Expand Down Expand Up @@ -63,9 +64,9 @@ public class DefaultSagaTransactionalTemplate
@Override
public void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
try {
triggerBeforeCommit();
triggerBeforeCommit(tx);
tx.commit();
triggerAfterCommit();
triggerAfterCommit(tx);
} catch (TransactionException txe) {
// 4.1 Failed to commit
throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.CommitFailure);
Expand All @@ -75,19 +76,19 @@ public void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor
@Override
public void rollbackTransaction(GlobalTransaction tx, Throwable ex)
throws TransactionException, TransactionalExecutor.ExecutionException {
triggerBeforeRollback();
triggerBeforeRollback(tx);
tx.rollback();
triggerAfterRollback();
triggerAfterRollback(tx);
// Successfully rolled back
}

@Override
public GlobalTransaction beginTransaction(TransactionInfo txInfo) throws TransactionalExecutor.ExecutionException {
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
try {
triggerBeforeBegin();
triggerBeforeBegin(tx);
tx.begin(txInfo.getTimeOut(), txInfo.getName());
triggerAfterBegin();
triggerAfterBegin(tx);
} catch (TransactionException txe) {
throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.BeginFailure);

Expand All @@ -105,7 +106,7 @@ public void reportTransaction(GlobalTransaction tx, GlobalStatus globalStatus)
throws TransactionalExecutor.ExecutionException {
try {
tx.globalReport(globalStatus);
triggerAfterCompletion();
triggerAfterCompletion(tx);
} catch (TransactionException txe) {

throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.ReportFailure);
Expand All @@ -125,73 +126,87 @@ public void branchReport(String xid, long branchId, BranchStatus status, String
DefaultResourceManager.get().branchReport(BranchType.SAGA, xid, branchId, status, applicationData);
}

protected void triggerBeforeBegin() {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.beforeBegin();
} catch (Exception e) {
LOGGER.error("Failed execute beforeBegin in hook {}", e.getMessage(), e);
protected void triggerBeforeBegin(GlobalTransaction tx) {
if (tx.getGlobalTransactionRole() == GlobalTransactionRole.Launcher) {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.beforeBegin();
} catch (Exception e) {
LOGGER.error("Failed execute beforeBegin in hook {}", e.getMessage(), e);
}
}
}
}

protected void triggerAfterBegin() {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.afterBegin();
} catch (Exception e) {
LOGGER.error("Failed execute afterBegin in hook {} ", e.getMessage(), e);
protected void triggerAfterBegin(GlobalTransaction tx) {
if (tx.getGlobalTransactionRole() == GlobalTransactionRole.Launcher) {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.afterBegin();
} catch (Exception e) {
LOGGER.error("Failed execute afterBegin in hook {} ", e.getMessage(), e);
}
}
}
}

protected void triggerBeforeRollback() {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.beforeRollback();
} catch (Exception e) {
LOGGER.error("Failed execute beforeRollback in hook {} ", e.getMessage(), e);
protected void triggerBeforeRollback(GlobalTransaction tx) {
if (tx.getGlobalTransactionRole() == GlobalTransactionRole.Launcher) {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.beforeRollback();
} catch (Exception e) {
LOGGER.error("Failed execute beforeRollback in hook {} ", e.getMessage(), e);
}
}
}
}

protected void triggerAfterRollback() {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.afterRollback();
} catch (Exception e) {
LOGGER.error("Failed execute afterRollback in hook {}", e.getMessage(), e);
protected void triggerAfterRollback(GlobalTransaction tx) {
if (tx.getGlobalTransactionRole() == GlobalTransactionRole.Launcher) {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.afterRollback();
} catch (Exception e) {
LOGGER.error("Failed execute afterRollback in hook {}", e.getMessage(), e);
}
}
}
}

protected void triggerBeforeCommit() {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.beforeCommit();
} catch (Exception e) {
LOGGER.error("Failed execute beforeCommit in hook {}", e.getMessage(), e);
protected void triggerBeforeCommit(GlobalTransaction tx) {
if (tx.getGlobalTransactionRole() == GlobalTransactionRole.Launcher) {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.beforeCommit();
} catch (Exception e) {
LOGGER.error("Failed execute beforeCommit in hook {}", e.getMessage(), e);
}
}
}
}

protected void triggerAfterCommit() {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.afterCommit();
} catch (Exception e) {
LOGGER.error("Failed execute afterCommit in hook {}", e.getMessage(), e);
protected void triggerAfterCommit(GlobalTransaction tx) {
if (tx.getGlobalTransactionRole() == GlobalTransactionRole.Launcher) {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.afterCommit();
} catch (Exception e) {
LOGGER.error("Failed execute afterCommit in hook {}", e.getMessage(), e);
}
}
}
}

@Override
public void triggerAfterCompletion() {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.afterCompletion();
} catch (Exception e) {
LOGGER.error("Failed execute afterCompletion in hook {}", e.getMessage(), e);
public void triggerAfterCompletion(GlobalTransaction tx) {
if (tx.getGlobalTransactionRole() == GlobalTransactionRole.Launcher) {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.afterCompletion();
} catch (Exception e) {
LOGGER.error("Failed execute afterCompletion in hook {}", e.getMessage(), e);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ long branchRegister(String resourceId, String clientId, String xid, String appli
void branchReport(String xid, long branchId, BranchStatus status, String applicationData)
throws TransactionException;

void triggerAfterCompletion();
void triggerAfterCompletion(GlobalTransaction tx);

void cleanUp();
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void branchReport(String xid, long branchId, BranchStatus status, String
}

@Override
public void triggerAfterCompletion() {
public void triggerAfterCompletion(GlobalTransaction tx) {

}

Expand Down
35 changes: 27 additions & 8 deletions tm/src/main/java/io/seata/tm/api/TransactionalTemplate.java
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public Object execute(TransactionalExecutor business) throws Throwable {
} finally {
//5. clear
resumeGlobalLockConfig(previousConfig);
triggerAfterCompletion();
triggerAfterCompletion(tx);
cleanUp();
}
} finally {
Expand Down Expand Up @@ -200,6 +200,12 @@ private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTran

private void commitTransaction(GlobalTransaction tx, TransactionInfo txInfo)
throws TransactionalExecutor.ExecutionException, TransactionException {
if (tx.getGlobalTransactionRole() != GlobalTransactionRole.Launcher) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore commit: just involved in global transaction [{}]", tx.getXid());
}
return;
}
if (isTimeout(tx.getCreateTime(), txInfo)) {
// business execution timeout
Exception exx = new TmTransactionException(TransactionExceptionCode.TransactionTimeout,
Expand Down Expand Up @@ -245,7 +251,12 @@ private void commitTransaction(GlobalTransaction tx, TransactionInfo txInfo)
}

private void rollbackTransaction(GlobalTransaction tx, Throwable originalException) throws TransactionException, TransactionalExecutor.ExecutionException {

if (tx.getGlobalTransactionRole() != GlobalTransactionRole.Launcher) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore rollback: just involved in global transaction [{}]", tx.getXid());
}
return;
}
try {
triggerBeforeRollback();
tx.rollback();
Expand Down Expand Up @@ -285,6 +296,12 @@ private void rollbackTransaction(GlobalTransaction tx, Throwable originalExcepti
}

private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
if (tx.getGlobalTransactionRole() != GlobalTransactionRole.Launcher) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore begin: just involved in global transaction [{}]", tx.getXid());
}
return;
}
try {
triggerBeforeBegin();
tx.begin(txInfo.getTimeOut(), txInfo.getName());
Expand Down Expand Up @@ -356,12 +373,14 @@ private void triggerAfterCommit() {
}
}

private void triggerAfterCompletion() {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.afterCompletion();
} catch (Exception e) {
LOGGER.error("Failed execute afterCompletion in hook {}", e.getMessage(), e);
private void triggerAfterCompletion(GlobalTransaction tx) {
if (tx == null || tx.getGlobalTransactionRole() == GlobalTransactionRole.Launcher) {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.afterCompletion();
} catch (Exception e) {
LOGGER.error("Failed execute afterCompletion in hook {}", e.getMessage(), e);
}
}
}
}
Expand Down

0 comments on commit 967906b

Please sign in to comment.