Skip to content

Commit

Permalink
Merge branch 'develop' into timeout-global-config
Browse files Browse the repository at this point in the history
  • Loading branch information
slievrly committed Jun 17, 2020
2 parents 52c535c + 37d2ea6 commit 7d4e581
Show file tree
Hide file tree
Showing 19 changed files with 223 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;

import io.seata.common.loader.EnhancedServiceLoader;
import io.seata.saga.engine.StateMachineConfig;
import io.seata.saga.engine.evaluation.EvaluatorFactoryManager;
import io.seata.saga.engine.evaluation.exception.ExceptionMatchEvaluatorFactory;
Expand All @@ -29,8 +31,14 @@
import io.seata.saga.engine.expression.spel.SpringELExpressionFactory;
import io.seata.saga.engine.invoker.ServiceInvokerManager;
import io.seata.saga.engine.invoker.impl.SpringBeanServiceInvoker;
import io.seata.saga.engine.pcext.InterceptableStateHandler;
import io.seata.saga.engine.pcext.InterceptableStateRouter;
import io.seata.saga.engine.pcext.StateHandler;
import io.seata.saga.engine.pcext.StateHandlerInterceptor;
import io.seata.saga.engine.pcext.StateMachineProcessHandler;
import io.seata.saga.engine.pcext.StateMachineProcessRouter;
import io.seata.saga.engine.pcext.StateRouter;
import io.seata.saga.engine.pcext.StateRouterInterceptor;
import io.seata.saga.engine.repo.StateLogRepository;
import io.seata.saga.engine.repo.StateMachineRepository;
import io.seata.saga.engine.repo.impl.StateLogRepositoryImpl;
Expand Down Expand Up @@ -194,13 +202,15 @@ protected void init() throws Exception {
}
}

private ProcessControllerImpl createProcessorController(ProcessCtrlEventPublisher eventPublisher) throws Exception {
protected ProcessControllerImpl createProcessorController(ProcessCtrlEventPublisher eventPublisher) throws Exception {

StateMachineProcessRouter stateMachineProcessRouter = new StateMachineProcessRouter();
stateMachineProcessRouter.initDefaultStateRouters();
loadStateRouterInterceptors(stateMachineProcessRouter.getStateRouters());

StateMachineProcessHandler stateMachineProcessHandler = new StateMachineProcessHandler();
stateMachineProcessHandler.initDefaultHandlers();
loadStateHandlerInterceptors(stateMachineProcessHandler.getStateHandlers());

DefaultRouterHandler defaultRouterHandler = new DefaultRouterHandler();
defaultRouterHandler.setEventPublisher(eventPublisher);
Expand All @@ -225,6 +235,42 @@ private ProcessControllerImpl createProcessorController(ProcessCtrlEventPublishe
return processorController;
}

protected void loadStateHandlerInterceptors(Map<String, StateHandler> stateHandlerMap) {
for (StateHandler stateHandler : stateHandlerMap.values()) {
if (stateHandler instanceof InterceptableStateHandler) {
InterceptableStateHandler interceptableStateHandler = (InterceptableStateHandler) stateHandler;
List<StateHandlerInterceptor> interceptorList = EnhancedServiceLoader.loadAll(StateHandlerInterceptor.class);
for (StateHandlerInterceptor interceptor : interceptorList) {
if (interceptor.match(interceptableStateHandler.getClass())) {
interceptableStateHandler.addInterceptor(interceptor);
}

if (interceptor instanceof ApplicationContextAware) {
((ApplicationContextAware) interceptor).setApplicationContext(getApplicationContext());
}
}
}
}
}

protected void loadStateRouterInterceptors(Map<String, StateRouter> stateRouterMap) {
for (StateRouter stateRouter : stateRouterMap.values()) {
if (stateRouter instanceof InterceptableStateRouter) {
InterceptableStateRouter interceptableStateRouter = (InterceptableStateRouter) stateRouter;
List<StateRouterInterceptor> interceptorList = EnhancedServiceLoader.loadAll(StateRouterInterceptor.class);
for (StateRouterInterceptor interceptor : interceptorList) {
if (interceptor.match(interceptableStateRouter.getClass())) {
interceptableStateRouter.addInterceptor(interceptor);
}

if (interceptor instanceof ApplicationContextAware) {
((ApplicationContextAware) interceptor).setApplicationContext(getApplicationContext());
}
}
}
}
}

@Override
public void afterPropertiesSet() throws Exception {
init();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
*
* @author lorne.cl
*/
public interface InterceptibleStateHandler extends StateHandler {
public interface InterceptableStateHandler extends StateHandler {

List<StateHandlerInterceptor> getInterceptors();

void addInterceptor(StateHandlerInterceptor interceptor);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
import java.util.List;

/**
* Interceptible State Router
* Interceptable State Router
*
* @author lorne.cl
*/
public interface InterceptibleStateRouter extends StateRouter {
public interface InterceptableStateRouter extends StateRouter {

List<StateRouterInterceptor> getInterceptors();

void addInterceptor(StateRouterInterceptor interceptor);
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,6 @@ public interface StateHandlerInterceptor {
void preProcess(ProcessContext context) throws EngineExecutionException;

void postProcess(ProcessContext context, Exception e) throws EngineExecutionException;

boolean match(Class<? extends InterceptableStateHandler> clazz);
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import io.seata.saga.engine.pcext.handlers.ServiceTaskStateHandler;
import io.seata.saga.engine.pcext.handlers.SubStateMachineHandler;
import io.seata.saga.engine.pcext.handlers.SucceedEndStateHandler;
import io.seata.saga.engine.pcext.interceptors.ServiceTaskHandlerInterceptor;
import io.seata.saga.proctrl.ProcessContext;
import io.seata.saga.proctrl.handler.ProcessHandler;
import io.seata.saga.statelang.domain.DomainConstants;
Expand All @@ -52,8 +51,8 @@ public void process(ProcessContext context) throws FrameworkException {
StateHandler stateHandler = stateHandlers.get(stateType);

List<StateHandlerInterceptor> interceptors = null;
if (stateHandler instanceof InterceptibleStateHandler) {
interceptors = ((InterceptibleStateHandler)stateHandler).getInterceptors();
if (stateHandler instanceof InterceptableStateHandler) {
interceptors = ((InterceptableStateHandler)stateHandler).getInterceptors();
}

List<StateHandlerInterceptor> executedInterceptors = null;
Expand Down Expand Up @@ -87,18 +86,11 @@ public void process(ProcessContext context) throws FrameworkException {
public void initDefaultHandlers() {
if (stateHandlers.size() == 0) {

//ServiceTask
ServiceTaskStateHandler serviceTaskStateHandler = new ServiceTaskStateHandler();
List<StateHandlerInterceptor> stateHandlerInterceptors = new ArrayList<>(1);
stateHandlerInterceptors.add(new ServiceTaskHandlerInterceptor());
serviceTaskStateHandler.setInterceptors(stateHandlerInterceptors);
stateHandlers.put(DomainConstants.STATE_TYPE_SERVICE_TASK, serviceTaskStateHandler);
stateHandlers.put(DomainConstants.STATE_TYPE_SERVICE_TASK, new ServiceTaskStateHandler());

stateHandlers.put(DomainConstants.STATE_TYPE_SUB_MACHINE_COMPENSATION, serviceTaskStateHandler);
stateHandlers.put(DomainConstants.STATE_TYPE_SUB_MACHINE_COMPENSATION, new ServiceTaskStateHandler());

SubStateMachineHandler subStateMachineHandler = new SubStateMachineHandler();
subStateMachineHandler.setInterceptors(stateHandlerInterceptors);
stateHandlers.put(DomainConstants.STATE_TYPE_SUB_STATE_MACHINE, subStateMachineHandler);
stateHandlers.put(DomainConstants.STATE_TYPE_SUB_STATE_MACHINE, new SubStateMachineHandler());

stateHandlers.put(DomainConstants.STATE_TYPE_CHOICE, new ChoiceStateHandler());
stateHandlers.put(DomainConstants.STATE_TYPE_SUCCEED, new SucceedEndStateHandler());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import io.seata.common.exception.FrameworkException;
import io.seata.saga.engine.StateMachineConfig;
import io.seata.saga.engine.pcext.interceptors.EndStateRouterInterceptor;
import io.seata.saga.engine.pcext.routers.EndStateRouter;
import io.seata.saga.engine.pcext.routers.TaskStateRouter;
import io.seata.saga.engine.pcext.utils.EngineUtils;
Expand Down Expand Up @@ -67,8 +66,8 @@ public Instruction route(ProcessContext context) throws FrameworkException {
Instruction instruction = null;

List<StateRouterInterceptor> interceptors = null;
if (router instanceof InterceptibleStateRouter) {
interceptors = ((InterceptibleStateRouter)router).getInterceptors();
if (router instanceof InterceptableStateRouter) {
interceptors = ((InterceptableStateRouter)router).getInterceptors();
}

List<StateRouterInterceptor> executedInterceptors = null;
Expand Down Expand Up @@ -114,13 +113,8 @@ public void initDefaultStateRouters() {
this.stateRouters.put(DomainConstants.STATE_TYPE_SUB_STATE_MACHINE, taskStateRouter);
this.stateRouters.put(DomainConstants.STATE_TYPE_SUB_MACHINE_COMPENSATION, taskStateRouter);

EndStateRouter endStateRouter = new EndStateRouter();
List<StateRouterInterceptor> stateRouterInterceptors = new ArrayList<>(1);
stateRouterInterceptors.add(new EndStateRouterInterceptor());
endStateRouter.setInterceptors(stateRouterInterceptors);

this.stateRouters.put(DomainConstants.STATE_TYPE_SUCCEED, endStateRouter);
this.stateRouters.put(DomainConstants.STATE_TYPE_FAIL, endStateRouter);
this.stateRouters.put(DomainConstants.STATE_TYPE_SUCCEED, new EndStateRouter());
this.stateRouters.put(DomainConstants.STATE_TYPE_FAIL, new EndStateRouter());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,7 @@ public interface StateRouterInterceptor {
*/
void postRoute(ProcessContext context, State state, Instruction instruction, Exception e)
throws EngineExecutionException;


boolean match(Class<? extends InterceptableStateRouter> clazz);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import io.seata.saga.engine.StateMachineEngine;
import io.seata.saga.engine.exception.EngineExecutionException;
import io.seata.saga.engine.invoker.ServiceInvoker;
import io.seata.saga.engine.pcext.InterceptibleStateHandler;
import io.seata.saga.engine.pcext.InterceptableStateHandler;
import io.seata.saga.engine.pcext.StateHandler;
import io.seata.saga.engine.pcext.StateHandlerInterceptor;
import io.seata.saga.engine.pcext.StateInstruction;
Expand All @@ -50,11 +50,11 @@
*
* @author lorne.cl
*/
public class ServiceTaskStateHandler implements StateHandler, InterceptibleStateHandler {
public class ServiceTaskStateHandler implements StateHandler, InterceptableStateHandler {

private static final Logger LOGGER = LoggerFactory.getLogger(ServiceTaskStateHandler.class);

private List<StateHandlerInterceptor> interceptors;
private List<StateHandlerInterceptor> interceptors = new ArrayList<>();

public static void handleException(ProcessContext context, AbstractTaskState state, Throwable e) {
List<TaskState.ExceptionMatch> catches = state.getCatches();
Expand Down Expand Up @@ -239,6 +239,13 @@ public List<StateHandlerInterceptor> getInterceptors() {
return interceptors;
}

@Override
public void addInterceptor(StateHandlerInterceptor interceptor) {
if (interceptors != null && !interceptors.contains(interceptor)) {
interceptors.add(interceptor);
}
}

public void setInterceptors(List<StateHandlerInterceptor> interceptors) {
this.interceptors = interceptors;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.seata.saga.engine.pcext.handlers;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -24,7 +25,7 @@
import io.seata.saga.engine.StateMachineEngine;
import io.seata.saga.engine.exception.EngineExecutionException;
import io.seata.saga.engine.exception.ForwardInvalidException;
import io.seata.saga.engine.pcext.InterceptibleStateHandler;
import io.seata.saga.engine.pcext.InterceptableStateHandler;
import io.seata.saga.engine.pcext.StateHandler;
import io.seata.saga.engine.pcext.StateHandlerInterceptor;
import io.seata.saga.engine.pcext.StateInstruction;
Expand All @@ -46,11 +47,11 @@
*
* @author lorne.cl
*/
public class SubStateMachineHandler implements StateHandler, InterceptibleStateHandler {
public class SubStateMachineHandler implements StateHandler, InterceptableStateHandler {

private static final Logger LOGGER = LoggerFactory.getLogger(SubStateMachineHandler.class);

private List<StateHandlerInterceptor> interceptors;
private List<StateHandlerInterceptor> interceptors = new ArrayList<>();

private static ExecutionStatus decideStatus(StateMachineInstance stateMachineInstance, boolean isForward) {

Expand Down Expand Up @@ -200,6 +201,13 @@ public List<StateHandlerInterceptor> getInterceptors() {
return interceptors;
}

@Override
public void addInterceptor(StateHandlerInterceptor interceptor) {
if (interceptors != null && !interceptors.contains(interceptor)) {
interceptors.add(interceptor);
}
}

public void setInterceptors(List<StateHandlerInterceptor> interceptors) {
this.interceptors = interceptors;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
*/
package io.seata.saga.engine.pcext.interceptors;

import io.seata.common.loader.LoadLevel;
import io.seata.saga.engine.exception.EngineExecutionException;
import io.seata.saga.engine.pcext.InterceptableStateRouter;
import io.seata.saga.engine.pcext.StateRouterInterceptor;
import io.seata.saga.engine.pcext.routers.EndStateRouter;
import io.seata.saga.engine.pcext.utils.EngineUtils;
import io.seata.saga.proctrl.Instruction;
import io.seata.saga.proctrl.ProcessContext;
Expand All @@ -27,6 +30,7 @@
*
* @author lorne.cl
*/
@LoadLevel(name = "EndState", order = 100)
public class EndStateRouterInterceptor implements StateRouterInterceptor {

@Override
Expand All @@ -39,4 +43,9 @@ public void postRoute(ProcessContext context, State state, Instruction instructi
throws EngineExecutionException {
EngineUtils.endStateMachine(context);
}

@Override
public boolean match(Class<? extends InterceptableStateRouter> clazz) {
return clazz != null && EndStateRouter.class.isAssignableFrom(clazz);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Map;

import io.seata.common.exception.FrameworkErrorCode;
import io.seata.common.loader.LoadLevel;
import io.seata.saga.engine.StateMachineConfig;
import io.seata.saga.engine.evaluation.Evaluator;
import io.seata.saga.engine.evaluation.EvaluatorFactory;
Expand All @@ -32,8 +33,11 @@
import io.seata.saga.engine.expression.ExpressionFactory;
import io.seata.saga.engine.expression.ExpressionFactoryManager;
import io.seata.saga.engine.expression.seq.SequenceExpression;
import io.seata.saga.engine.pcext.InterceptableStateHandler;
import io.seata.saga.engine.pcext.StateHandlerInterceptor;
import io.seata.saga.engine.pcext.StateInstruction;
import io.seata.saga.engine.pcext.handlers.ServiceTaskStateHandler;
import io.seata.saga.engine.pcext.handlers.SubStateMachineHandler;
import io.seata.saga.engine.pcext.utils.CompensationHolder;
import io.seata.saga.engine.pcext.utils.EngineUtils;
import io.seata.saga.engine.utils.ExceptionUtils;
Expand All @@ -50,14 +54,22 @@
import org.springframework.util.StringUtils;

/**
* ServiceTaskHandler Interceptor
* StateInterceptor for ServiceTask, SubStateMachine, CompensateState
*
* @author lorne.cl
*/
@LoadLevel(name = "ServiceTask", order = 100)
public class ServiceTaskHandlerInterceptor implements StateHandlerInterceptor {

private static final Logger LOGGER = LoggerFactory.getLogger(ServiceTaskHandlerInterceptor.class);

@Override
public boolean match(Class<? extends InterceptableStateHandler> clazz) {
return clazz != null &&
(ServiceTaskStateHandler.class.isAssignableFrom(clazz)
|| SubStateMachineHandler.class.isAssignableFrom(clazz));
}

private static List<Object> createInputParams(ExpressionFactoryManager expressionFactoryManager,
StateInstanceImpl stateInstance,
ServiceTaskStateImpl serviceTaskState, Object variablesFrom) {
Expand Down
Loading

0 comments on commit 7d4e581

Please sign in to comment.