diff --git a/common/src/main/java/io/seata/common/DefaultValues.java b/common/src/main/java/io/seata/common/DefaultValues.java index 11c228e441b..971648ffeb4 100644 --- a/common/src/main/java/io/seata/common/DefaultValues.java +++ b/common/src/main/java/io/seata/common/DefaultValues.java @@ -33,6 +33,9 @@ public interface DefaultValues { boolean DEFAULT_CLIENT_TABLE_META_CHECK_ENABLE = false; boolean DEFAULT_TM_DEGRADE_CHECK = false; boolean DEFAULT_CLIENT_SAGA_BRANCH_REGISTER_ENABLE = false; + boolean DEFAULT_CLIENT_SAGA_RETRY_PERSIST_MODE_UPDATE = false; + boolean DEFAULT_CLIENT_SAGA_COMPENSATE_PERSIST_MODE_UPDATE = false; + /** * Shutdown timeout default 3s */ diff --git a/core/src/main/java/io/seata/core/constants/ConfigurationKeys.java b/core/src/main/java/io/seata/core/constants/ConfigurationKeys.java index ede07f29150..036eae57582 100644 --- a/core/src/main/java/io/seata/core/constants/ConfigurationKeys.java +++ b/core/src/main/java/io/seata/core/constants/ConfigurationKeys.java @@ -133,6 +133,16 @@ public interface ConfigurationKeys { */ String CLIENT_SAGA_JSON_PARSER = CLIENT_RM_PREFIX + "sagaJsonParser"; + /** + * The constant CLIENT_SAGA_RETRY_PERSIST_MODE_UPDATE. + */ + String CLIENT_SAGA_RETRY_PERSIST_MODE_UPDATE = CLIENT_RM_PREFIX + "sagaRetryPersistModeUpdate"; + + /** + * The constant CLIENT_SAGA_COMPENSATE_PERSIST_MODE_UPDATE. + */ + String CLIENT_SAGA_COMPENSATE_PERSIST_MODE_UPDATE = CLIENT_RM_PREFIX + "sagaCompensatePersistModeUpdate"; + /** * The constant CLIENT_REPORT_RETRY_COUNT. */ diff --git a/saga/seata-saga-engine-store/src/main/java/io/seata/saga/engine/config/DbStateMachineConfig.java b/saga/seata-saga-engine-store/src/main/java/io/seata/saga/engine/config/DbStateMachineConfig.java index 0ccdab187b6..50f2fbf9da5 100644 --- a/saga/seata-saga-engine-store/src/main/java/io/seata/saga/engine/config/DbStateMachineConfig.java +++ b/saga/seata-saga-engine-store/src/main/java/io/seata/saga/engine/config/DbStateMachineConfig.java @@ -36,6 +36,8 @@ import static io.seata.common.DefaultValues.DEFAULT_CLIENT_REPORT_SUCCESS_ENABLE; import static io.seata.common.DefaultValues.DEFAULT_CLIENT_SAGA_BRANCH_REGISTER_ENABLE; +import static io.seata.common.DefaultValues.DEFAULT_CLIENT_SAGA_COMPENSATE_PERSIST_MODE_UPDATE; +import static io.seata.common.DefaultValues.DEFAULT_CLIENT_SAGA_RETRY_PERSIST_MODE_UPDATE; import static io.seata.common.DefaultValues.DEFAULT_SAGA_JSON_PARSER; /** @@ -66,6 +68,10 @@ public DbStateMachineConfig() { setSagaJsonParser(configuration.getConfig(ConfigurationKeys.CLIENT_SAGA_JSON_PARSER, DEFAULT_SAGA_JSON_PARSER)); this.applicationId = configuration.getConfig(ConfigurationKeys.APPLICATION_ID); this.txServiceGroup = configuration.getConfig(ConfigurationKeys.TX_SERVICE_GROUP); + setSagaRetryPersistModeUpdate(configuration.getBoolean(ConfigurationKeys.CLIENT_SAGA_RETRY_PERSIST_MODE_UPDATE, + DEFAULT_CLIENT_SAGA_RETRY_PERSIST_MODE_UPDATE)); + setSagaCompensatePersistModeUpdate(configuration.getBoolean(ConfigurationKeys.CLIENT_SAGA_COMPENSATE_PERSIST_MODE_UPDATE, + DEFAULT_CLIENT_SAGA_COMPENSATE_PERSIST_MODE_UPDATE)); } } catch (Exception e) { LOGGER.warn("Load SEATA configuration failed, use default configuration instead.", e); diff --git a/saga/seata-saga-engine-store/src/main/java/io/seata/saga/engine/store/db/DbAndReportTcStateLogStore.java b/saga/seata-saga-engine-store/src/main/java/io/seata/saga/engine/store/db/DbAndReportTcStateLogStore.java index 9d3450d69fd..fcf41c42349 100644 --- a/saga/seata-saga-engine-store/src/main/java/io/seata/saga/engine/store/db/DbAndReportTcStateLogStore.java +++ b/saga/seata-saga-engine-store/src/main/java/io/seata/saga/engine/store/db/DbAndReportTcStateLogStore.java @@ -36,6 +36,8 @@ import io.seata.saga.engine.StateMachineConfig; import io.seata.saga.engine.config.DbStateMachineConfig; import io.seata.saga.engine.exception.EngineExecutionException; +import io.seata.saga.engine.impl.DefaultStateMachineConfig; +import io.seata.saga.engine.pcext.StateInstruction; import io.seata.saga.engine.pcext.utils.EngineUtils; import io.seata.saga.engine.sequence.SeqGenerator; import io.seata.saga.engine.serializer.Serializer; @@ -46,7 +48,9 @@ import io.seata.saga.statelang.domain.DomainConstants; import io.seata.saga.statelang.domain.ExecutionStatus; import io.seata.saga.statelang.domain.StateInstance; +import io.seata.saga.statelang.domain.StateMachine; import io.seata.saga.statelang.domain.StateMachineInstance; +import io.seata.saga.statelang.domain.impl.ServiceTaskStateImpl; import io.seata.saga.statelang.domain.impl.StateInstanceImpl; import io.seata.saga.statelang.domain.impl.StateMachineInstanceImpl; import io.seata.saga.tm.SagaTransactionalTemplate; @@ -90,11 +94,7 @@ public void recordStateMachineStarted(StateMachineInstance machineInstance, Proc //if parentId is not null, machineInstance is a SubStateMachine, do not start a new global transaction, //use parent transaction instead. String parentId = machineInstance.getParentId(); - if (StringUtils.hasLength(parentId)) { - if (StringUtils.isEmpty(machineInstance.getId())) { - machineInstance.setId(parentId); - } - } else { + if (StringUtils.isEmpty(parentId)) { beginTransaction(machineInstance, context); } @@ -263,28 +263,39 @@ public void recordStateMachineRestarted(StateMachineInstance machineInstance, Pr @Override public void recordStateStarted(StateInstance stateInstance, ProcessContext context) { if (stateInstance != null) { - //if this state is for retry, do not register branch, but generate id - if (StringUtils.hasLength(stateInstance.getStateIdRetriedFor())) { - stateInstance.setId(generateRetryStateInstanceId(stateInstance)); - } - //if this state is for compensation, do not register branch, but generate id - else if (StringUtils.hasLength(stateInstance.getStateIdCompensatedFor())) { + boolean isUpdateMode = isUpdateMode(stateInstance, context); - stateInstance.setId(generateCompensateStateInstanceId(stateInstance)); + // if this state is for retry, do not register branch + if (StringUtils.hasLength(stateInstance.getStateIdRetriedFor())) { + if (isUpdateMode) { + stateInstance.setId(stateInstance.getStateIdRetriedFor()); + } else { + // generate id by default + stateInstance.setId(generateRetryStateInstanceId(stateInstance)); + } } - else { + // if this state is for compensation, do not register branch + else if (StringUtils.hasLength(stateInstance.getStateIdCompensatedFor())) { + stateInstance.setId(generateCompensateStateInstanceId(stateInstance, isUpdateMode)); + } else { branchRegister(stateInstance, context); } - if (StringUtils.isEmpty(stateInstance.getId()) && seqGenerator != null) { stateInstance.setId(seqGenerator.generate(DomainConstants.SEQ_ENTITY_STATE_INST)); } stateInstance.setSerializedInputParams(paramsSerializer.serialize(stateInstance.getInputParams())); - executeUpdate(stateLogStoreSqls.getRecordStateStartedSql(dbType), STATE_INSTANCE_TO_STATEMENT_FOR_INSERT, - stateInstance); + if (!isUpdateMode) { + executeUpdate(stateLogStoreSqls.getRecordStateStartedSql(dbType), + STATE_INSTANCE_TO_STATEMENT_FOR_INSERT, stateInstance); + } else { + // if this retry/compensate state do not need persist, just update last inst + executeUpdate(stateLogStoreSqls.getUpdateStateExecutionStatusSql(dbType), + stateInstance.getStatus().name(), new Timestamp(System.currentTimeMillis()), + stateInstance.getMachineInstanceId(), stateInstance.getId()); + } } } @@ -376,9 +387,13 @@ private String generateRetryStateInstanceId(StateInstance stateInstance) { * @param stateInstance * @return */ - private String generateCompensateStateInstanceId(StateInstance stateInstance) { + private String generateCompensateStateInstanceId(StateInstance stateInstance, boolean isUpdateMode) { String originalCompensateStateInstId = stateInstance.getStateIdCompensatedFor(); int maxIndex = 1; + // if update mode, means update last compensate inst + if (isUpdateMode) { + return originalCompensateStateInstId + "-" + maxIndex; + } for (StateInstance aStateInstance : stateInstance.getStateMachineInstance().getStateList()) { if (aStateInstance != stateInstance && originalCompensateStateInstId.equals(aStateInstance.getStateIdCompensatedFor())) { @@ -405,6 +420,40 @@ private int getIdIndex(String stateInstanceId, String separator) { return -1; } + private boolean isUpdateMode(StateInstance stateInstance, ProcessContext context) { + DefaultStateMachineConfig stateMachineConfig = (DefaultStateMachineConfig)context.getVariable( + DomainConstants.VAR_NAME_STATEMACHINE_CONFIG); + StateInstruction instruction = context.getInstruction(StateInstruction.class); + ServiceTaskStateImpl state = (ServiceTaskStateImpl)instruction.getState(context); + StateMachine stateMachine = stateInstance.getStateMachineInstance().getStateMachine(); + + if (StringUtils.hasLength(stateInstance.getStateIdRetriedFor())) { + + if (null != state.isRetryPersistModeUpdate()) { + return state.isRetryPersistModeUpdate(); + } else if (null != stateMachine.isRetryPersistModeUpdate()) { + return stateMachine.isRetryPersistModeUpdate(); + } + return stateMachineConfig.isSagaRetryPersistModeUpdate(); + + } else if (StringUtils.hasLength(stateInstance.getStateIdCompensatedFor())) { + + // find if this compensate has been executed + for (StateInstance aStateInstance : stateInstance.getStateMachineInstance().getStateList()) { + if (aStateInstance.isForCompensation() && aStateInstance.getName().equals(stateInstance.getName())) { + if (null != state.isCompensatePersistModeUpdate()) { + return state.isCompensatePersistModeUpdate(); + } else if (null != stateMachine.isCompensatePersistModeUpdate()) { + return stateMachine.isCompensatePersistModeUpdate(); + } + return stateMachineConfig.isSagaCompensatePersistModeUpdate(); + } + } + return false; + } + return false; + } + @Override public void recordStateFinished(StateInstance stateInstance, ProcessContext context) { if (stateInstance != null) { @@ -444,7 +493,11 @@ protected void branchReport(StateInstance stateInstance, ProcessContext context) StateInstance originalStateInst = null; if (StringUtils.hasLength(stateInstance.getStateIdRetriedFor())) { - originalStateInst = findOutOriginalStateInstanceOfRetryState(stateInstance); + if (isUpdateMode(stateInstance, context)) { + originalStateInst = stateInstance; + } else { + originalStateInst = findOutOriginalStateInstanceOfRetryState(stateInstance); + } if (ExecutionStatus.SU.equals(stateInstance.getStatus())) { branchStatus = BranchStatus.PhaseTwo_Committed; @@ -457,7 +510,12 @@ protected void branchReport(StateInstance stateInstance, ProcessContext context) } else if (StringUtils.hasLength(stateInstance.getStateIdCompensatedFor())) { - originalStateInst = findOutOriginalStateInstanceOfCompensateState(stateInstance); + if (isUpdateMode(stateInstance, context)) { + originalStateInst = stateInstance.getStateMachineInstance().getStateMap().get( + stateInstance.getStateIdCompensatedFor()); + } else { + originalStateInst = findOutOriginalStateInstanceOfCompensateState(stateInstance); + } } if (originalStateInst == null) { @@ -774,6 +832,7 @@ public void toStatement(StateInstance stateInstance, PreparedStatement statement statement.setString(12, stateInstance.getBusinessKey()); statement.setString(13, stateInstance.getStateIdCompensatedFor()); statement.setString(14, stateInstance.getStateIdRetriedFor()); + statement.setTimestamp(15, new Timestamp(stateInstance.getGmtUpdated().getTime())); } } @@ -785,8 +844,9 @@ public void toStatement(StateInstance stateInstance, PreparedStatement statement stateInstance.getException() != null ? (byte[]) stateInstance.getSerializedException() : null); statement.setString(3, stateInstance.getStatus().name()); statement.setObject(4, stateInstance.getSerializedOutputParams()); - statement.setString(5, stateInstance.getId()); - statement.setString(6, stateInstance.getMachineInstanceId()); + statement.setTimestamp(5, new Timestamp(stateInstance.getGmtEnd().getTime())); + statement.setString(6, stateInstance.getId()); + statement.setString(7, stateInstance.getMachineInstanceId()); } } diff --git a/saga/seata-saga-engine-store/src/main/java/io/seata/saga/engine/store/db/StateLogStoreSqls.java b/saga/seata-saga-engine-store/src/main/java/io/seata/saga/engine/store/db/StateLogStoreSqls.java index 06b3c15bfa6..3c237ae64a0 100644 --- a/saga/seata-saga-engine-store/src/main/java/io/seata/saga/engine/store/db/StateLogStoreSqls.java +++ b/saga/seata-saga-engine-store/src/main/java/io/seata/saga/engine/store/db/StateLogStoreSqls.java @@ -68,15 +68,15 @@ public class StateLogStoreSqls { private static final String RECORD_STATE_STARTED_SQL = "INSERT INTO ${TABLE_PREFIX}state_inst (id, machine_inst_id, name, type," + " gmt_started, service_name, service_method, service_type, is_for_update, input_params, status, " - + "business_key, " - + "state_id_compensated_for, state_id_retried_for)\n" + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + + "business_key, state_id_compensated_for, state_id_retried_for, gmt_updated)\n" + + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; - private static final String RECORD_STATE_FINISHED_SQL - = "UPDATE ${TABLE_PREFIX}state_inst SET gmt_end = ?, excep = ?, status = ?, output_params = ? WHERE id = ? " - + "AND machine_inst_id = ?"; + private static final String RECORD_STATE_FINISHED_SQL = + "UPDATE ${TABLE_PREFIX}state_inst SET gmt_end = ?, excep = ?, status = ?, output_params = ?, gmt_updated = ? " + + "WHERE id = ? AND machine_inst_id = ?"; private static final String UPDATE_STATE_EXECUTION_STATUS_SQL - = "UPDATE ${TABLE_PREFIX}state_inst SET status = ? WHERE machine_inst_id = ? AND id = ?"; + = "UPDATE ${TABLE_PREFIX}state_inst SET status = ?, gmt_updated = ? WHERE machine_inst_id = ? AND id = ?"; private static final String QUERY_STATE_INSTANCES_BY_MACHINE_INSTANCE_ID_SQL = "SELECT " + STATE_INSTANCE_FIELDS + " FROM ${TABLE_PREFIX}state_inst WHERE machine_inst_id = ? ORDER BY gmt_started, ID ASC"; diff --git a/saga/seata-saga-engine/src/main/java/io/seata/saga/engine/impl/DefaultStateMachineConfig.java b/saga/seata-saga-engine/src/main/java/io/seata/saga/engine/impl/DefaultStateMachineConfig.java index 86853017c91..5ec5299ee96 100644 --- a/saga/seata-saga-engine/src/main/java/io/seata/saga/engine/impl/DefaultStateMachineConfig.java +++ b/saga/seata-saga-engine/src/main/java/io/seata/saga/engine/impl/DefaultStateMachineConfig.java @@ -70,6 +70,8 @@ import javax.script.ScriptEngineManager; +import static io.seata.common.DefaultValues.DEFAULT_CLIENT_SAGA_COMPENSATE_PERSIST_MODE_UPDATE; +import static io.seata.common.DefaultValues.DEFAULT_CLIENT_SAGA_RETRY_PERSIST_MODE_UPDATE; import static io.seata.common.DefaultValues.DEFAULT_SAGA_JSON_PARSER; /** @@ -108,6 +110,8 @@ public class DefaultStateMachineConfig implements StateMachineConfig, Applicatio private String defaultTenantId = "000001"; private ScriptEngineManager scriptEngineManager; private String sagaJsonParser = DEFAULT_SAGA_JSON_PARSER; + private boolean sagaRetryPersistModeUpdate = DEFAULT_CLIENT_SAGA_RETRY_PERSIST_MODE_UPDATE; + private boolean sagaCompensatePersistModeUpdate = DEFAULT_CLIENT_SAGA_COMPENSATE_PERSIST_MODE_UPDATE; protected void init() throws Exception { @@ -477,4 +481,20 @@ public String getSagaJsonParser() { public void setSagaJsonParser(String sagaJsonParser) { this.sagaJsonParser = sagaJsonParser; } + + public boolean isSagaRetryPersistModeUpdate() { + return sagaRetryPersistModeUpdate; + } + + public void setSagaRetryPersistModeUpdate(boolean sagaRetryPersistModeUpdate) { + this.sagaRetryPersistModeUpdate = sagaRetryPersistModeUpdate; + } + + public boolean isSagaCompensatePersistModeUpdate() { + return sagaCompensatePersistModeUpdate; + } + + public void setSagaCompensatePersistModeUpdate(boolean sagaCompensatePersistModeUpdate) { + this.sagaCompensatePersistModeUpdate = sagaCompensatePersistModeUpdate; + } } \ No newline at end of file diff --git a/saga/seata-saga-engine/src/main/java/io/seata/saga/engine/pcext/interceptors/ServiceTaskHandlerInterceptor.java b/saga/seata-saga-engine/src/main/java/io/seata/saga/engine/pcext/interceptors/ServiceTaskHandlerInterceptor.java index d879b39a69d..a24064b3b44 100644 --- a/saga/seata-saga-engine/src/main/java/io/seata/saga/engine/pcext/interceptors/ServiceTaskHandlerInterceptor.java +++ b/saga/seata-saga-engine/src/main/java/io/seata/saga/engine/pcext/interceptors/ServiceTaskHandlerInterceptor.java @@ -122,6 +122,7 @@ public void preProcess(ProcessContext context) throws EngineExecutionException { stateInstance.setStateMachineInstance(stateMachineInstance); stateInstance.setName(state.getName()); stateInstance.setGmtStarted(new Date()); + stateInstance.setGmtUpdated(stateInstance.getGmtStarted()); stateInstance.setStatus(ExecutionStatus.RU); stateInstance.setStateIdRetriedFor( diff --git a/saga/seata-saga-statelang/src/main/java/io/seata/saga/statelang/domain/ServiceTaskState.java b/saga/seata-saga-statelang/src/main/java/io/seata/saga/statelang/domain/ServiceTaskState.java index d35f33ff8c8..d4c877c0ff0 100644 --- a/saga/seata-saga-statelang/src/main/java/io/seata/saga/statelang/domain/ServiceTaskState.java +++ b/saga/seata-saga-statelang/src/main/java/io/seata/saga/statelang/domain/ServiceTaskState.java @@ -58,4 +58,18 @@ public interface ServiceTaskState extends TaskState { * @return */ boolean isPersist(); + + /** + * Is update last retry execution log, default append new + * + * @return + */ + Boolean isRetryPersistModeUpdate(); + + /** + * Is update last compensate execution log, default append new + * + * @return + */ + Boolean isCompensatePersistModeUpdate(); } \ No newline at end of file diff --git a/saga/seata-saga-statelang/src/main/java/io/seata/saga/statelang/domain/StateInstance.java b/saga/seata-saga-statelang/src/main/java/io/seata/saga/statelang/domain/StateInstance.java index 4b037581a13..d6a92fc3328 100644 --- a/saga/seata-saga-statelang/src/main/java/io/seata/saga/statelang/domain/StateInstance.java +++ b/saga/seata-saga-statelang/src/main/java/io/seata/saga/statelang/domain/StateInstance.java @@ -150,6 +150,20 @@ public interface StateInstance { */ void setGmtStarted(Date gmtStarted); + /** + * get update time + * + * @return + */ + Date getGmtUpdated(); + + /** + * set update time + * + * @param gmtUpdated + */ + void setGmtUpdated(Date gmtUpdated); + /** * get end time * diff --git a/saga/seata-saga-statelang/src/main/java/io/seata/saga/statelang/domain/StateMachine.java b/saga/seata-saga-statelang/src/main/java/io/seata/saga/statelang/domain/StateMachine.java index c5684ff1d3b..080cb78d2c7 100644 --- a/saga/seata-saga-statelang/src/main/java/io/seata/saga/statelang/domain/StateMachine.java +++ b/saga/seata-saga-statelang/src/main/java/io/seata/saga/statelang/domain/StateMachine.java @@ -142,6 +142,20 @@ public interface StateMachine { */ boolean isPersist(); + /** + * Is update last retry execution log, default append new + * + * @return + */ + Boolean isRetryPersistModeUpdate(); + + /** + * Is update last compensate execution log, default append new + * + * @return + */ + Boolean isCompensatePersistModeUpdate(); + /** * State language text * diff --git a/saga/seata-saga-statelang/src/main/java/io/seata/saga/statelang/domain/impl/AbstractTaskState.java b/saga/seata-saga-statelang/src/main/java/io/seata/saga/statelang/domain/impl/AbstractTaskState.java index 487eed98417..173ce6e9b90 100644 --- a/saga/seata-saga-statelang/src/main/java/io/seata/saga/statelang/domain/impl/AbstractTaskState.java +++ b/saga/seata-saga-statelang/src/main/java/io/seata/saga/statelang/domain/impl/AbstractTaskState.java @@ -39,6 +39,8 @@ public abstract class AbstractTaskState extends BaseState implements TaskState { private List inputExpressions; private Map outputExpressions; private boolean isPersist = true; + private Boolean retryPersistModeUpdate; + private Boolean compensatePersistModeUpdate; @Override public String getCompensateState() { @@ -113,6 +115,22 @@ public void setPersist(boolean persist) { isPersist = persist; } + public Boolean isRetryPersistModeUpdate() { + return retryPersistModeUpdate; + } + + public void setRetryPersistModeUpdate(Boolean retryPersistModeUpdate) { + this.retryPersistModeUpdate = retryPersistModeUpdate; + } + + public Boolean isCompensatePersistModeUpdate() { + return compensatePersistModeUpdate; + } + + public void setCompensatePersistModeUpdate(Boolean compensatePersistModeUpdate) { + this.compensatePersistModeUpdate = compensatePersistModeUpdate; + } + public List getInputExpressions() { return inputExpressions; } diff --git a/saga/seata-saga-statelang/src/main/java/io/seata/saga/statelang/domain/impl/StateInstanceImpl.java b/saga/seata-saga-statelang/src/main/java/io/seata/saga/statelang/domain/impl/StateInstanceImpl.java index a2daecb89b6..8222619efc3 100644 --- a/saga/seata-saga-statelang/src/main/java/io/seata/saga/statelang/domain/impl/StateInstanceImpl.java +++ b/saga/seata-saga-statelang/src/main/java/io/seata/saga/statelang/domain/impl/StateInstanceImpl.java @@ -38,6 +38,7 @@ public class StateInstanceImpl implements StateInstance { private String serviceType; private String businessKey; private Date gmtStarted; + private Date gmtUpdated; private Date gmtEnd; private boolean isForUpdate; private Exception exception; @@ -143,6 +144,16 @@ public void setGmtStarted(Date gmtStarted) { this.gmtStarted = gmtStarted; } + @Override + public Date getGmtUpdated() { + return gmtUpdated; + } + + @Override + public void setGmtUpdated(Date gmtUpdated) { + this.gmtUpdated = gmtUpdated; + } + @Override public Date getGmtEnd() { return gmtEnd; diff --git a/saga/seata-saga-statelang/src/main/java/io/seata/saga/statelang/domain/impl/StateMachineImpl.java b/saga/seata-saga-statelang/src/main/java/io/seata/saga/statelang/domain/impl/StateMachineImpl.java index b6135526d2d..159e81e3657 100644 --- a/saga/seata-saga-statelang/src/main/java/io/seata/saga/statelang/domain/impl/StateMachineImpl.java +++ b/saga/seata-saga-statelang/src/main/java/io/seata/saga/statelang/domain/impl/StateMachineImpl.java @@ -40,6 +40,8 @@ public class StateMachineImpl implements StateMachine { private Status status = Status.AC; private RecoverStrategy recoverStrategy; private boolean isPersist = true; + private Boolean retryPersistModeUpdate; + private Boolean compensatePersistModeUpdate; private String type = "STATE_LANG"; private transient String content; private Date gmtCreate; @@ -189,4 +191,22 @@ public Date getGmtCreate() { public void setGmtCreate(Date gmtCreate) { this.gmtCreate = gmtCreate; } + + @Override + public Boolean isRetryPersistModeUpdate() { + return retryPersistModeUpdate; + } + + public void setRetryPersistModeUpdate(Boolean retryPersistModeUpdate) { + this.retryPersistModeUpdate = retryPersistModeUpdate; + } + + @Override + public Boolean isCompensatePersistModeUpdate() { + return compensatePersistModeUpdate; + } + + public void setCompensatePersistModeUpdate(Boolean compensatePersistModeUpdate) { + this.compensatePersistModeUpdate = compensatePersistModeUpdate; + } } \ No newline at end of file diff --git a/saga/seata-saga-statelang/src/main/java/io/seata/saga/statelang/parser/impl/AbstractTaskStateParser.java b/saga/seata-saga-statelang/src/main/java/io/seata/saga/statelang/parser/impl/AbstractTaskStateParser.java index 263dd411d47..3f1065e7829 100644 --- a/saga/seata-saga-statelang/src/main/java/io/seata/saga/statelang/parser/impl/AbstractTaskStateParser.java +++ b/saga/seata-saga-statelang/src/main/java/io/seata/saga/statelang/parser/impl/AbstractTaskStateParser.java @@ -46,6 +46,18 @@ protected void parseTaskAttributes(AbstractTaskState state, Object node) { state.setPersist(false); } + // customize if update origin or append new retryStateInstLog + Object isRetryPersistModeUpdate = nodeMap.get("IsRetryPersistModeUpdate"); + if (isRetryPersistModeUpdate instanceof Boolean) { + state.setRetryPersistModeUpdate(Boolean.TRUE.equals(isRetryPersistModeUpdate)); + } + + // customize if update last or append new compensateStateInstLog + Object isCompensatePersistModeUpdate = nodeMap.get("IsCompensatePersistModeUpdate"); + if (isCompensatePersistModeUpdate instanceof Boolean) { + state.setCompensatePersistModeUpdate(Boolean.TRUE.equals(isCompensatePersistModeUpdate)); + } + List retryList = (List) nodeMap.get("Retry"); if (retryList != null) { state.setRetry(parseRetry(retryList)); diff --git a/saga/seata-saga-statelang/src/main/java/io/seata/saga/statelang/parser/impl/StateMachineParserImpl.java b/saga/seata-saga-statelang/src/main/java/io/seata/saga/statelang/parser/impl/StateMachineParserImpl.java index b548bb9af18..5fd73e75f0e 100644 --- a/saga/seata-saga-statelang/src/main/java/io/seata/saga/statelang/parser/impl/StateMachineParserImpl.java +++ b/saga/seata-saga-statelang/src/main/java/io/seata/saga/statelang/parser/impl/StateMachineParserImpl.java @@ -79,6 +79,18 @@ public StateMachine parse(String json) { stateMachine.setPersist(false); } + // customize if update origin or append new retryStateInstLog + Object isRetryPersistModeUpdate = node.get("IsRetryPersistModeUpdate"); + if (isRetryPersistModeUpdate instanceof Boolean) { + stateMachine.setRetryPersistModeUpdate(Boolean.TRUE.equals(isRetryPersistModeUpdate)); + } + + // customize if update last or append new compensateStateInstLog + Object isCompensatePersistModeUpdate = node.get("IsCompensatePersistModeUpdate"); + if (isCompensatePersistModeUpdate instanceof Boolean) { + stateMachine.setCompensatePersistModeUpdate(Boolean.TRUE.equals(isCompensatePersistModeUpdate)); + } + Map statesNode = (Map) node.get("States"); statesNode.forEach((stateName, value) -> { Map stateNode = (Map) value; diff --git a/script/client/conf/file.conf b/script/client/conf/file.conf index 08aef22beef..7c58ec1ef0f 100644 --- a/script/client/conf/file.conf +++ b/script/client/conf/file.conf @@ -51,6 +51,9 @@ client { tableMetaCheckEnable = false reportSuccessEnable = false sagaBranchRegisterEnable = false + sagaJsonParser = jackson + sagaRetryPersistModeUpdate = false + sagaCompensatePersistModeUpdate = false } tm { commitRetryCount = 5 diff --git a/script/client/saga/db/db2.sql b/script/client/saga/db/db2.sql index 78de4769e6f..3a8d48d0a65 100644 --- a/script/client/saga/db/db2.sql +++ b/script/client/saga/db/db2.sql @@ -58,6 +58,7 @@ create table seata_state_inst output_params clob(65536) inline length 1024, status varchar(2) not null, excep blob(10240), + gmt_updated timestamp(3), gmt_end timestamp(3), primary key(id, machine_inst_id) ); \ No newline at end of file diff --git a/script/client/saga/db/h2.sql b/script/client/saga/db/h2.sql index 19ba12e7ca7..8493c64277c 100644 --- a/script/client/saga/db/h2.sql +++ b/script/client/saga/db/h2.sql @@ -52,6 +52,7 @@ create table if not exists seata_state_inst output_params clob comment 'output parameters', status varchar(2) not null comment 'status(SU succeed|FA failed|UN unknown|SK skipped|RU running)', excep blob comment 'exception', + gmt_updated timestamp(3) comment 'update time', gmt_end timestamp(3) comment 'end time', primary key (id, machine_inst_id) ); \ No newline at end of file diff --git a/script/client/saga/db/mysql.sql b/script/client/saga/db/mysql.sql index 35f45824e69..429e04fb3c7 100644 --- a/script/client/saga/db/mysql.sql +++ b/script/client/saga/db/mysql.sql @@ -57,6 +57,7 @@ CREATE TABLE IF NOT EXISTS `seata_state_inst` `output_params` TEXT COMMENT 'output parameters', `status` VARCHAR(2) NOT NULL COMMENT 'status(SU succeed|FA failed|UN unknown|SK skipped|RU running)', `excep` BLOB COMMENT 'exception', + `gmt_updated` DATETIME(3) COMMENT 'update time', `gmt_end` DATETIME(3) COMMENT 'end time', PRIMARY KEY (`id`, `machine_inst_id`) ) ENGINE = InnoDB diff --git a/script/client/saga/db/oracle.sql b/script/client/saga/db/oracle.sql index 825e69f44ae..6e16506b44e 100644 --- a/script/client/saga/db/oracle.sql +++ b/script/client/saga/db/oracle.sql @@ -58,6 +58,7 @@ CREATE TABLE seata_state_inst output_params CLOB, status VARCHAR(2) NOT NULL, excep BLOB, + gmt_updated TIMESTAMP(3), gmt_end TIMESTAMP(3), PRIMARY KEY (id, machine_inst_id) ); diff --git a/script/client/saga/db/postgresql.sql b/script/client/saga/db/postgresql.sql index b74e278c1a8..3110c8713e1 100644 --- a/script/client/saga/db/postgresql.sql +++ b/script/client/saga/db/postgresql.sql @@ -52,7 +52,8 @@ CREATE TABLE IF NOT EXISTS public.seata_state_inst input_params TEXT, output_params TEXT, status VARCHAR(2) NOT NULL, - excep BYTEA, + excep BYTEA, + gmt_updated TIMESTAMP(3) DEFAULT now(), gmt_end TIMESTAMP(3) DEFAULT now(), CONSTRAINT pk_seata_state_inst PRIMARY KEY (id, machine_inst_id) ); diff --git a/script/client/spring/application.properties b/script/client/spring/application.properties index 2704a7fcdd5..8da397cb8e5 100755 --- a/script/client/spring/application.properties +++ b/script/client/spring/application.properties @@ -27,6 +27,8 @@ seata.client.rm.table-meta-check-enable=false seata.client.rm.report-success-enable=false seata.client.rm.saga-branch-register-enable=false seata.client.rm.saga-json-parser=fastjson +seata.client.rm.saga-retry-persist-mode-update=false +seata.client.rm.saga-compensate-persist-mode-update=false seata.client.rm.lock.retry-interval=10 seata.client.rm.lock.retry-times=30 seata.client.rm.lock.retry-policy-branch-rollback-on-conflict=true diff --git a/script/client/spring/application.yml b/script/client/spring/application.yml index e0934cd2557..5fcb18f3849 100755 --- a/script/client/spring/application.yml +++ b/script/client/spring/application.yml @@ -14,6 +14,8 @@ seata: report-success-enable: false saga-branch-register-enable: false saga-json-parser: fastjson + saga-retry-persist-mode-update: false + saga-compensate-persist-mode-update: false lock: retry-interval: 10 retry-times: 30 diff --git a/seata-spring-boot-starter/src/main/java/io/seata/spring/boot/autoconfigure/properties/client/RmProperties.java b/seata-spring-boot-starter/src/main/java/io/seata/spring/boot/autoconfigure/properties/client/RmProperties.java index dd69e1bd4b1..2f5115ab52d 100644 --- a/seata-spring-boot-starter/src/main/java/io/seata/spring/boot/autoconfigure/properties/client/RmProperties.java +++ b/seata-spring-boot-starter/src/main/java/io/seata/spring/boot/autoconfigure/properties/client/RmProperties.java @@ -22,6 +22,8 @@ import static io.seata.common.DefaultValues.DEFAULT_CLIENT_REPORT_RETRY_COUNT; import static io.seata.common.DefaultValues.DEFAULT_CLIENT_REPORT_SUCCESS_ENABLE; import static io.seata.common.DefaultValues.DEFAULT_CLIENT_SAGA_BRANCH_REGISTER_ENABLE; +import static io.seata.common.DefaultValues.DEFAULT_CLIENT_SAGA_COMPENSATE_PERSIST_MODE_UPDATE; +import static io.seata.common.DefaultValues.DEFAULT_CLIENT_SAGA_RETRY_PERSIST_MODE_UPDATE; import static io.seata.common.DefaultValues.DEFAULT_CLIENT_TABLE_META_CHECK_ENABLE; import static io.seata.common.DefaultValues.DEFAULT_SAGA_JSON_PARSER; import static io.seata.spring.boot.autoconfigure.StarterConstants.CLIENT_RM_PREFIX; @@ -38,6 +40,8 @@ public class RmProperties { private boolean reportSuccessEnable = DEFAULT_CLIENT_REPORT_SUCCESS_ENABLE; private boolean sagaBranchRegisterEnable = DEFAULT_CLIENT_SAGA_BRANCH_REGISTER_ENABLE; private String sagaJsonParser = DEFAULT_SAGA_JSON_PARSER; + private boolean sagaRetryPersistModeUpdate = DEFAULT_CLIENT_SAGA_RETRY_PERSIST_MODE_UPDATE; + private boolean sagaCompensatePersistModeUpdate = DEFAULT_CLIENT_SAGA_COMPENSATE_PERSIST_MODE_UPDATE; public int getAsyncCommitBufferLimit() { return asyncCommitBufferLimit; @@ -90,4 +94,20 @@ public String getSagaJsonParser() { public void setSagaJsonParser(String sagaJsonParser) { this.sagaJsonParser = sagaJsonParser; } + + public boolean isSagaRetryPersistModeUpdate() { + return sagaRetryPersistModeUpdate; + } + + public void setSagaRetryPersistModeUpdate(boolean sagaRetryPersistModeUpdate) { + this.sagaRetryPersistModeUpdate = sagaRetryPersistModeUpdate; + } + + public boolean isSagaCompensatePersistModeUpdate() { + return sagaCompensatePersistModeUpdate; + } + + public void setSagaCompensatePersistModeUpdate(boolean sagaCompensatePersistModeUpdate) { + this.sagaCompensatePersistModeUpdate = sagaCompensatePersistModeUpdate; + } } diff --git a/seata-spring-boot-starter/src/main/resources/META-INF/additional-spring-configuration-metadata.json b/seata-spring-boot-starter/src/main/resources/META-INF/additional-spring-configuration-metadata.json index a49c6d1601f..cd36eac2af0 100644 --- a/seata-spring-boot-starter/src/main/resources/META-INF/additional-spring-configuration-metadata.json +++ b/seata-spring-boot-starter/src/main/resources/META-INF/additional-spring-configuration-metadata.json @@ -72,6 +72,18 @@ "sourceType": "io.seata.spring.boot.autoconfigure.properties.client.RmProperties", "defaultValue": "fastjson" }, + { + "name": "seata.client.rm.saga-retry-persist-mode-update", + "type": "java.lang.Boolean", + "sourceType": "io.seata.spring.boot.autoconfigure.properties.client.RmProperties", + "defaultValue": false + }, + { + "name": "seata.client.rm.saga-compensate-persist-mode-update", + "type": "java.lang.Boolean", + "sourceType": "io.seata.spring.boot.autoconfigure.properties.client.RmProperties", + "defaultValue": false + }, { "name": "seata.client.rm.lock.retry-interval", "type": "java.lang.Integer", diff --git a/seata-spring-boot-starter/src/test/java/io/seata/spring/boot/autoconfigure/PropertiesTest.java b/seata-spring-boot-starter/src/test/java/io/seata/spring/boot/autoconfigure/PropertiesTest.java index 9459039aa9f..104c43df73d 100644 --- a/seata-spring-boot-starter/src/test/java/io/seata/spring/boot/autoconfigure/PropertiesTest.java +++ b/seata-spring-boot-starter/src/test/java/io/seata/spring/boot/autoconfigure/PropertiesTest.java @@ -91,6 +91,8 @@ public void testRmProperties() { assertEquals(5, context.getBean(RmProperties.class).getReportRetryCount()); assertFalse(context.getBean(RmProperties.class).isTableMetaCheckEnable()); assertFalse(context.getBean(RmProperties.class).isReportSuccessEnable()); + assertFalse(context.getBean(RmProperties.class).isSagaRetryPersistModeUpdate()); + assertFalse(context.getBean(RmProperties.class).isSagaCompensatePersistModeUpdate()); } @Test diff --git a/test/src/test/java/io/seata/saga/engine/db/StateMachineDBTests.java b/test/src/test/java/io/seata/saga/engine/db/StateMachineDBTests.java index 774ad2e2830..ba74a08b40d 100644 --- a/test/src/test/java/io/seata/saga/engine/db/StateMachineDBTests.java +++ b/test/src/test/java/io/seata/saga/engine/db/StateMachineDBTests.java @@ -740,6 +740,99 @@ public void testStateMachineRecordFailed() { Assertions.assertNull(RootContext.getXID()); } + @Test + public void testSimpleRetryStateAsUpdateMode() throws Exception { + long start = System.currentTimeMillis(); + + Map paramMap = new HashMap<>(1); + paramMap.put("a", 1); + paramMap.put("barThrowException", "true"); + + String stateMachineName = "simpleUpdateStateMachine"; + + StateMachineInstance inst = stateMachineEngine.start(stateMachineName, null, paramMap); + + long cost = System.currentTimeMillis() - start; + System.out.println("====== cost :" + cost); + + Assertions.assertNotNull(inst.getException()); + Assertions.assertEquals(inst.getStatus(), ExecutionStatus.UN); + + Thread.sleep(sleepTime); + inst = stateMachineEngine.getStateMachineConfig().getStateLogStore().getStateMachineInstance(inst.getId()); + Assertions.assertEquals(inst.getStateList().size(), 2); + } + + @Test + public void testSimpleCompensateStateAsUpdateMode() throws Exception { + long start = System.currentTimeMillis(); + + Map paramMap = new HashMap<>(1); + paramMap.put("a", 2); + paramMap.put("barThrowException", "true"); + paramMap.put("compensateBarThrowException", "true"); + + String stateMachineName = "simpleUpdateStateMachine"; + + StateMachineInstance inst = stateMachineEngine.start(stateMachineName, null, paramMap); + + long cost = System.currentTimeMillis() - start; + System.out.println("====== cost :" + cost); + + Assertions.assertNotNull(inst.getException()); + Assertions.assertEquals(inst.getStatus(), ExecutionStatus.UN); + + Thread.sleep(sleepTime); + inst = stateMachineEngine.getStateMachineConfig().getStateLogStore().getStateMachineInstance(inst.getId()); + Assertions.assertEquals(inst.getStateList().size(), 3); + } + + @Test + public void testSimpleSubRetryStateAsUpdateMode() throws Exception { + long start = System.currentTimeMillis(); + + Map paramMap = new HashMap<>(1); + paramMap.put("a", 3); + paramMap.put("barThrowException", "true"); + + String stateMachineName = "simpleStateMachineWithCompensationAndSubMachine"; + + StateMachineInstance inst = stateMachineEngine.start(stateMachineName, null, paramMap); + + long cost = System.currentTimeMillis() - start; + System.out.println("====== cost :" + cost); + + Assertions.assertEquals(inst.getStatus(), ExecutionStatus.UN); + + Thread.sleep(sleepTime); + inst = stateMachineEngine.getStateMachineConfig().getStateLogStore().getStateMachineInstance(inst.getId()); + + Assertions.assertEquals(inst.getStateList().size(), 2); + } + + @Test + public void testSimpleSubCompensateStateAsUpdateMode() throws Exception { + long start = System.currentTimeMillis(); + + Map paramMap = new HashMap<>(1); + paramMap.put("a", 4); + paramMap.put("barThrowException", "true"); + + String stateMachineName = "simpleStateMachineWithCompensationAndSubMachine"; + + StateMachineInstance inst = stateMachineEngine.start(stateMachineName, null, paramMap); + + long cost = System.currentTimeMillis() - start; + System.out.println("====== cost :" + cost); + + Assertions.assertEquals(inst.getStatus(), ExecutionStatus.UN); + + Thread.sleep(sleepTime); + inst = stateMachineEngine.getStateMachineConfig().getStateLogStore().getStateMachineInstance(inst.getId()); + + Assertions.assertEquals(inst.getStateList().size(), 2); + } + private void doTestStateMachineTransTimeout(Map paramMap) throws Exception { long start = System.currentTimeMillis(); diff --git a/test/src/test/resources/file.conf b/test/src/test/resources/file.conf index e9fb03b7601..7873ab23568 100644 --- a/test/src/test/resources/file.conf +++ b/test/src/test/resources/file.conf @@ -12,5 +12,7 @@ client { reportSuccessEnable = false sagaBranchRegisterEnable = false sagaJsonParser = jackson + sagaRetryPersistModeUpdate = false + sagaCompensatePersistModeUpdate = false } } \ No newline at end of file diff --git a/test/src/test/resources/saga/spring/statemachine_engine_db_test.xml b/test/src/test/resources/saga/spring/statemachine_engine_db_test.xml index 83bd60ae51b..3a7a05f9972 100644 --- a/test/src/test/resources/saga/spring/statemachine_engine_db_test.xml +++ b/test/src/test/resources/saga/spring/statemachine_engine_db_test.xml @@ -52,6 +52,8 @@ + +