Skip to content

Commit

Permalink
feature: Saga support customize whether update last retry log (apache…
Browse files Browse the repository at this point in the history
…#3372)

* feature: support customize if persist retry and compensate execution log
  • Loading branch information
anselleeyy committed Dec 24, 2020
1 parent 2598b16 commit 491a9d7
Show file tree
Hide file tree
Showing 35 changed files with 529 additions and 28 deletions.
3 changes: 3 additions & 0 deletions common/src/main/java/io/seata/common/DefaultValues.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ public interface DefaultValues {
boolean DEFAULT_CLIENT_TABLE_META_CHECK_ENABLE = false;
boolean DEFAULT_TM_DEGRADE_CHECK = false;
boolean DEFAULT_CLIENT_SAGA_BRANCH_REGISTER_ENABLE = false;
boolean DEFAULT_CLIENT_SAGA_RETRY_PERSIST_MODE_UPDATE = false;
boolean DEFAULT_CLIENT_SAGA_COMPENSATE_PERSIST_MODE_UPDATE = false;

/**
* Shutdown timeout default 3s
*/
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/java/io/seata/core/constants/ConfigurationKeys.java
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,16 @@ public interface ConfigurationKeys {
*/
String CLIENT_SAGA_JSON_PARSER = CLIENT_RM_PREFIX + "sagaJsonParser";

/**
* The constant CLIENT_SAGA_RETRY_PERSIST_MODE_UPDATE.
*/
String CLIENT_SAGA_RETRY_PERSIST_MODE_UPDATE = CLIENT_RM_PREFIX + "sagaRetryPersistModeUpdate";

/**
* The constant CLIENT_SAGA_COMPENSATE_PERSIST_MODE_UPDATE.
*/
String CLIENT_SAGA_COMPENSATE_PERSIST_MODE_UPDATE = CLIENT_RM_PREFIX + "sagaCompensatePersistModeUpdate";

/**
* The constant CLIENT_REPORT_RETRY_COUNT.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@

import static io.seata.common.DefaultValues.DEFAULT_CLIENT_REPORT_SUCCESS_ENABLE;
import static io.seata.common.DefaultValues.DEFAULT_CLIENT_SAGA_BRANCH_REGISTER_ENABLE;
import static io.seata.common.DefaultValues.DEFAULT_CLIENT_SAGA_COMPENSATE_PERSIST_MODE_UPDATE;
import static io.seata.common.DefaultValues.DEFAULT_CLIENT_SAGA_RETRY_PERSIST_MODE_UPDATE;
import static io.seata.common.DefaultValues.DEFAULT_SAGA_JSON_PARSER;

/**
Expand Down Expand Up @@ -66,6 +68,10 @@ public DbStateMachineConfig() {
setSagaJsonParser(configuration.getConfig(ConfigurationKeys.CLIENT_SAGA_JSON_PARSER, DEFAULT_SAGA_JSON_PARSER));
this.applicationId = configuration.getConfig(ConfigurationKeys.APPLICATION_ID);
this.txServiceGroup = configuration.getConfig(ConfigurationKeys.TX_SERVICE_GROUP);
setSagaRetryPersistModeUpdate(configuration.getBoolean(ConfigurationKeys.CLIENT_SAGA_RETRY_PERSIST_MODE_UPDATE,
DEFAULT_CLIENT_SAGA_RETRY_PERSIST_MODE_UPDATE));
setSagaCompensatePersistModeUpdate(configuration.getBoolean(ConfigurationKeys.CLIENT_SAGA_COMPENSATE_PERSIST_MODE_UPDATE,
DEFAULT_CLIENT_SAGA_COMPENSATE_PERSIST_MODE_UPDATE));
}
} catch (Exception e) {
LOGGER.warn("Load SEATA configuration failed, use default configuration instead.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import io.seata.saga.engine.StateMachineConfig;
import io.seata.saga.engine.config.DbStateMachineConfig;
import io.seata.saga.engine.exception.EngineExecutionException;
import io.seata.saga.engine.impl.DefaultStateMachineConfig;
import io.seata.saga.engine.pcext.StateInstruction;
import io.seata.saga.engine.pcext.utils.EngineUtils;
import io.seata.saga.engine.sequence.SeqGenerator;
import io.seata.saga.engine.serializer.Serializer;
Expand All @@ -46,7 +48,9 @@
import io.seata.saga.statelang.domain.DomainConstants;
import io.seata.saga.statelang.domain.ExecutionStatus;
import io.seata.saga.statelang.domain.StateInstance;
import io.seata.saga.statelang.domain.StateMachine;
import io.seata.saga.statelang.domain.StateMachineInstance;
import io.seata.saga.statelang.domain.impl.ServiceTaskStateImpl;
import io.seata.saga.statelang.domain.impl.StateInstanceImpl;
import io.seata.saga.statelang.domain.impl.StateMachineInstanceImpl;
import io.seata.saga.tm.SagaTransactionalTemplate;
Expand Down Expand Up @@ -90,11 +94,7 @@ public void recordStateMachineStarted(StateMachineInstance machineInstance, Proc
//if parentId is not null, machineInstance is a SubStateMachine, do not start a new global transaction,
//use parent transaction instead.
String parentId = machineInstance.getParentId();
if (StringUtils.hasLength(parentId)) {
if (StringUtils.isEmpty(machineInstance.getId())) {
machineInstance.setId(parentId);
}
} else {
if (StringUtils.isEmpty(parentId)) {
beginTransaction(machineInstance, context);
}

Expand Down Expand Up @@ -263,28 +263,39 @@ public void recordStateMachineRestarted(StateMachineInstance machineInstance, Pr
@Override
public void recordStateStarted(StateInstance stateInstance, ProcessContext context) {
if (stateInstance != null) {
//if this state is for retry, do not register branch, but generate id
if (StringUtils.hasLength(stateInstance.getStateIdRetriedFor())) {

stateInstance.setId(generateRetryStateInstanceId(stateInstance));
}
//if this state is for compensation, do not register branch, but generate id
else if (StringUtils.hasLength(stateInstance.getStateIdCompensatedFor())) {
boolean isUpdateMode = isUpdateMode(stateInstance, context);

stateInstance.setId(generateCompensateStateInstanceId(stateInstance));
// if this state is for retry, do not register branch
if (StringUtils.hasLength(stateInstance.getStateIdRetriedFor())) {
if (isUpdateMode) {
stateInstance.setId(stateInstance.getStateIdRetriedFor());
} else {
// generate id by default
stateInstance.setId(generateRetryStateInstanceId(stateInstance));
}
}
else {
// if this state is for compensation, do not register branch
else if (StringUtils.hasLength(stateInstance.getStateIdCompensatedFor())) {
stateInstance.setId(generateCompensateStateInstanceId(stateInstance, isUpdateMode));
} else {
branchRegister(stateInstance, context);
}


if (StringUtils.isEmpty(stateInstance.getId()) && seqGenerator != null) {
stateInstance.setId(seqGenerator.generate(DomainConstants.SEQ_ENTITY_STATE_INST));
}

stateInstance.setSerializedInputParams(paramsSerializer.serialize(stateInstance.getInputParams()));
executeUpdate(stateLogStoreSqls.getRecordStateStartedSql(dbType), STATE_INSTANCE_TO_STATEMENT_FOR_INSERT,
stateInstance);
if (!isUpdateMode) {
executeUpdate(stateLogStoreSqls.getRecordStateStartedSql(dbType),
STATE_INSTANCE_TO_STATEMENT_FOR_INSERT, stateInstance);
} else {
// if this retry/compensate state do not need persist, just update last inst
executeUpdate(stateLogStoreSqls.getUpdateStateExecutionStatusSql(dbType),
stateInstance.getStatus().name(), new Timestamp(System.currentTimeMillis()),
stateInstance.getMachineInstanceId(), stateInstance.getId());
}
}
}

Expand Down Expand Up @@ -376,9 +387,13 @@ private String generateRetryStateInstanceId(StateInstance stateInstance) {
* @param stateInstance
* @return
*/
private String generateCompensateStateInstanceId(StateInstance stateInstance) {
private String generateCompensateStateInstanceId(StateInstance stateInstance, boolean isUpdateMode) {
String originalCompensateStateInstId = stateInstance.getStateIdCompensatedFor();
int maxIndex = 1;
// if update mode, means update last compensate inst
if (isUpdateMode) {
return originalCompensateStateInstId + "-" + maxIndex;
}
for (StateInstance aStateInstance : stateInstance.getStateMachineInstance().getStateList()) {
if (aStateInstance != stateInstance
&& originalCompensateStateInstId.equals(aStateInstance.getStateIdCompensatedFor())) {
Expand All @@ -405,6 +420,40 @@ private int getIdIndex(String stateInstanceId, String separator) {
return -1;
}

private boolean isUpdateMode(StateInstance stateInstance, ProcessContext context) {
DefaultStateMachineConfig stateMachineConfig = (DefaultStateMachineConfig)context.getVariable(
DomainConstants.VAR_NAME_STATEMACHINE_CONFIG);
StateInstruction instruction = context.getInstruction(StateInstruction.class);
ServiceTaskStateImpl state = (ServiceTaskStateImpl)instruction.getState(context);
StateMachine stateMachine = stateInstance.getStateMachineInstance().getStateMachine();

if (StringUtils.hasLength(stateInstance.getStateIdRetriedFor())) {

if (null != state.isRetryPersistModeUpdate()) {
return state.isRetryPersistModeUpdate();
} else if (null != stateMachine.isRetryPersistModeUpdate()) {
return stateMachine.isRetryPersistModeUpdate();
}
return stateMachineConfig.isSagaRetryPersistModeUpdate();

} else if (StringUtils.hasLength(stateInstance.getStateIdCompensatedFor())) {

// find if this compensate has been executed
for (StateInstance aStateInstance : stateInstance.getStateMachineInstance().getStateList()) {
if (aStateInstance.isForCompensation() && aStateInstance.getName().equals(stateInstance.getName())) {
if (null != state.isCompensatePersistModeUpdate()) {
return state.isCompensatePersistModeUpdate();
} else if (null != stateMachine.isCompensatePersistModeUpdate()) {
return stateMachine.isCompensatePersistModeUpdate();
}
return stateMachineConfig.isSagaCompensatePersistModeUpdate();
}
}
return false;
}
return false;
}

@Override
public void recordStateFinished(StateInstance stateInstance, ProcessContext context) {
if (stateInstance != null) {
Expand Down Expand Up @@ -444,7 +493,11 @@ protected void branchReport(StateInstance stateInstance, ProcessContext context)
StateInstance originalStateInst = null;
if (StringUtils.hasLength(stateInstance.getStateIdRetriedFor())) {

originalStateInst = findOutOriginalStateInstanceOfRetryState(stateInstance);
if (isUpdateMode(stateInstance, context)) {
originalStateInst = stateInstance;
} else {
originalStateInst = findOutOriginalStateInstanceOfRetryState(stateInstance);
}

if (ExecutionStatus.SU.equals(stateInstance.getStatus())) {
branchStatus = BranchStatus.PhaseTwo_Committed;
Expand All @@ -457,7 +510,12 @@ protected void branchReport(StateInstance stateInstance, ProcessContext context)

} else if (StringUtils.hasLength(stateInstance.getStateIdCompensatedFor())) {

originalStateInst = findOutOriginalStateInstanceOfCompensateState(stateInstance);
if (isUpdateMode(stateInstance, context)) {
originalStateInst = stateInstance.getStateMachineInstance().getStateMap().get(
stateInstance.getStateIdCompensatedFor());
} else {
originalStateInst = findOutOriginalStateInstanceOfCompensateState(stateInstance);
}
}

if (originalStateInst == null) {
Expand Down Expand Up @@ -774,6 +832,7 @@ public void toStatement(StateInstance stateInstance, PreparedStatement statement
statement.setString(12, stateInstance.getBusinessKey());
statement.setString(13, stateInstance.getStateIdCompensatedFor());
statement.setString(14, stateInstance.getStateIdRetriedFor());
statement.setTimestamp(15, new Timestamp(stateInstance.getGmtUpdated().getTime()));
}
}

Expand All @@ -785,8 +844,9 @@ public void toStatement(StateInstance stateInstance, PreparedStatement statement
stateInstance.getException() != null ? (byte[]) stateInstance.getSerializedException() : null);
statement.setString(3, stateInstance.getStatus().name());
statement.setObject(4, stateInstance.getSerializedOutputParams());
statement.setString(5, stateInstance.getId());
statement.setString(6, stateInstance.getMachineInstanceId());
statement.setTimestamp(5, new Timestamp(stateInstance.getGmtEnd().getTime()));
statement.setString(6, stateInstance.getId());
statement.setString(7, stateInstance.getMachineInstanceId());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,15 @@ public class StateLogStoreSqls {
private static final String RECORD_STATE_STARTED_SQL =
"INSERT INTO ${TABLE_PREFIX}state_inst (id, machine_inst_id, name, type,"
+ " gmt_started, service_name, service_method, service_type, is_for_update, input_params, status, "
+ "business_key, "
+ "state_id_compensated_for, state_id_retried_for)\n" + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+ "business_key, state_id_compensated_for, state_id_retried_for, gmt_updated)\n"
+ "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";

private static final String RECORD_STATE_FINISHED_SQL
= "UPDATE ${TABLE_PREFIX}state_inst SET gmt_end = ?, excep = ?, status = ?, output_params = ? WHERE id = ? "
+ "AND machine_inst_id = ?";
private static final String RECORD_STATE_FINISHED_SQL =
"UPDATE ${TABLE_PREFIX}state_inst SET gmt_end = ?, excep = ?, status = ?, output_params = ?, gmt_updated = ? "
+ "WHERE id = ? AND machine_inst_id = ?";

private static final String UPDATE_STATE_EXECUTION_STATUS_SQL
= "UPDATE ${TABLE_PREFIX}state_inst SET status = ? WHERE machine_inst_id = ? AND id = ?";
= "UPDATE ${TABLE_PREFIX}state_inst SET status = ?, gmt_updated = ? WHERE machine_inst_id = ? AND id = ?";

private static final String QUERY_STATE_INSTANCES_BY_MACHINE_INSTANCE_ID_SQL = "SELECT " + STATE_INSTANCE_FIELDS
+ " FROM ${TABLE_PREFIX}state_inst WHERE machine_inst_id = ? ORDER BY gmt_started, ID ASC";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@

import javax.script.ScriptEngineManager;

import static io.seata.common.DefaultValues.DEFAULT_CLIENT_SAGA_COMPENSATE_PERSIST_MODE_UPDATE;
import static io.seata.common.DefaultValues.DEFAULT_CLIENT_SAGA_RETRY_PERSIST_MODE_UPDATE;
import static io.seata.common.DefaultValues.DEFAULT_SAGA_JSON_PARSER;

/**
Expand Down Expand Up @@ -108,6 +110,8 @@ public class DefaultStateMachineConfig implements StateMachineConfig, Applicatio
private String defaultTenantId = "000001";
private ScriptEngineManager scriptEngineManager;
private String sagaJsonParser = DEFAULT_SAGA_JSON_PARSER;
private boolean sagaRetryPersistModeUpdate = DEFAULT_CLIENT_SAGA_RETRY_PERSIST_MODE_UPDATE;
private boolean sagaCompensatePersistModeUpdate = DEFAULT_CLIENT_SAGA_COMPENSATE_PERSIST_MODE_UPDATE;

protected void init() throws Exception {

Expand Down Expand Up @@ -477,4 +481,20 @@ public String getSagaJsonParser() {
public void setSagaJsonParser(String sagaJsonParser) {
this.sagaJsonParser = sagaJsonParser;
}

public boolean isSagaRetryPersistModeUpdate() {
return sagaRetryPersistModeUpdate;
}

public void setSagaRetryPersistModeUpdate(boolean sagaRetryPersistModeUpdate) {
this.sagaRetryPersistModeUpdate = sagaRetryPersistModeUpdate;
}

public boolean isSagaCompensatePersistModeUpdate() {
return sagaCompensatePersistModeUpdate;
}

public void setSagaCompensatePersistModeUpdate(boolean sagaCompensatePersistModeUpdate) {
this.sagaCompensatePersistModeUpdate = sagaCompensatePersistModeUpdate;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ public void preProcess(ProcessContext context) throws EngineExecutionException {
stateInstance.setStateMachineInstance(stateMachineInstance);
stateInstance.setName(state.getName());
stateInstance.setGmtStarted(new Date());
stateInstance.setGmtUpdated(stateInstance.getGmtStarted());
stateInstance.setStatus(ExecutionStatus.RU);

stateInstance.setStateIdRetriedFor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,18 @@ public interface ServiceTaskState extends TaskState {
* @return
*/
boolean isPersist();

/**
* Is update last retry execution log, default append new
*
* @return
*/
Boolean isRetryPersistModeUpdate();

/**
* Is update last compensate execution log, default append new
*
* @return
*/
Boolean isCompensatePersistModeUpdate();
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,20 @@ public interface StateInstance {
*/
void setGmtStarted(Date gmtStarted);

/**
* get update time
*
* @return
*/
Date getGmtUpdated();

/**
* set update time
*
* @param gmtUpdated
*/
void setGmtUpdated(Date gmtUpdated);

/**
* get end time
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,20 @@ public interface StateMachine {
*/
boolean isPersist();

/**
* Is update last retry execution log, default append new
*
* @return
*/
Boolean isRetryPersistModeUpdate();

/**
* Is update last compensate execution log, default append new
*
* @return
*/
Boolean isCompensatePersistModeUpdate();

/**
* State language text
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public abstract class AbstractTaskState extends BaseState implements TaskState {
private List<Object> inputExpressions;
private Map<String, Object> outputExpressions;
private boolean isPersist = true;
private Boolean retryPersistModeUpdate;
private Boolean compensatePersistModeUpdate;

@Override
public String getCompensateState() {
Expand Down Expand Up @@ -113,6 +115,22 @@ public void setPersist(boolean persist) {
isPersist = persist;
}

public Boolean isRetryPersistModeUpdate() {
return retryPersistModeUpdate;
}

public void setRetryPersistModeUpdate(Boolean retryPersistModeUpdate) {
this.retryPersistModeUpdate = retryPersistModeUpdate;
}

public Boolean isCompensatePersistModeUpdate() {
return compensatePersistModeUpdate;
}

public void setCompensatePersistModeUpdate(Boolean compensatePersistModeUpdate) {
this.compensatePersistModeUpdate = compensatePersistModeUpdate;
}

public List<Object> getInputExpressions() {
return inputExpressions;
}
Expand Down
Loading

0 comments on commit 491a9d7

Please sign in to comment.