Skip to content

Commit

Permalink
optimize: @GlobalTransactional and @GlobalLock can now customize lock…
Browse files Browse the repository at this point in the history
… retry config (apache#2962)
  • Loading branch information
selfishlover committed Oct 12, 2020
1 parent f1cf404 commit 6c0c759
Show file tree
Hide file tree
Showing 17 changed files with 609 additions and 147 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.core.context;

import io.seata.core.model.GlobalLockConfig;

/** use this class to access current GlobalLockConfig from anywhere
* @author selfishlover
*/
public class GlobalLockConfigHolder {

private static ThreadLocal<GlobalLockConfig> holder = new ThreadLocal<>();

public static GlobalLockConfig getCurrentGlobalLockConfig() {
return holder.get();
}

public static GlobalLockConfig setAndReturnPrevious(GlobalLockConfig config) {
GlobalLockConfig previous = holder.get();
holder.set(config);
return previous;
}

public static void remove() {
holder.remove();
}
}
42 changes: 42 additions & 0 deletions core/src/main/java/io/seata/core/model/GlobalLockConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.core.model;

/**
* @author selfishlover
*/
public class GlobalLockConfig {

private int lockRetryInternal;

private int lockRetryTimes;

public int getLockRetryInternal() {
return lockRetryInternal;
}

public void setLockRetryInternal(int lockRetryInternal) {
this.lockRetryInternal = lockRetryInternal;
}

public int getLockRetryTimes() {
return lockRetryTimes;
}

public void setLockRetryTimes(int lockRetryTimes) {
this.lockRetryTimes = lockRetryTimes;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.core.context;

import io.seata.core.model.GlobalLockConfig;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.*;

class GlobalLockConfigHolderTest {

@BeforeEach
void setUp() {
assertNull(GlobalLockConfigHolder.getCurrentGlobalLockConfig(), "should be null at first");
}

@Test
void setAndReturnPrevious() {
GlobalLockConfig config1 = new GlobalLockConfig();
assertNull(GlobalLockConfigHolder.setAndReturnPrevious(config1), "should return null");
assertSame(config1, GlobalLockConfigHolder.getCurrentGlobalLockConfig(), "holder fail to store config");

GlobalLockConfig config2 = new GlobalLockConfig();
assertSame(config1, GlobalLockConfigHolder.setAndReturnPrevious(config2), "fail to get previous config");
assertSame(config2, GlobalLockConfigHolder.getCurrentGlobalLockConfig(), "holder fail to store latest config");
}

@AfterEach
void tearDown() {
assertDoesNotThrow(GlobalLockConfigHolder::remove, "clear method should not throw anything");
}
}
38 changes: 38 additions & 0 deletions rm-datasource/src/main/java/io/seata/rm/GlobalLockExecutor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.rm;

import io.seata.core.model.GlobalLockConfig;

/**
* executor to execute business logic that require global lock
* @author selfishlover
*/
public interface GlobalLockExecutor {

/**
* execute business logic
* @return business return
* @throws Throwable whatever throw during execution
*/
Object execute() throws Throwable;

/**
* global lock config info
* @return
*/
GlobalLockConfig getGlobalLockConfig();
}
54 changes: 26 additions & 28 deletions rm-datasource/src/main/java/io/seata/rm/GlobalLockTemplate.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,45 +15,43 @@
*/
package io.seata.rm;

import java.util.concurrent.Callable;

import io.seata.core.context.GlobalLockConfigHolder;
import io.seata.core.context.RootContext;
import io.seata.core.model.GlobalLockConfig;

/**
* Template of executing business logic in a local transaction with Global lock.
*
* @param <T>
* @author deyou
* executor template for local transaction which need global lock
* @author selfishlover
*/
public class GlobalLockTemplate<T> {

/**
* Execute object.
*
* @param business the business
* @return the object
* @throws Exception
*/
public Object execute(Callable<T> business) throws Exception {
public class GlobalLockTemplate {

Object rs;
//fix nested situation
boolean hasInGlobalLock = RootContext.requireGlobalLock();
// add global lock declare
if (!hasInGlobalLock) {
public Object execute(GlobalLockExecutor executor) throws Throwable {
boolean alreadyInGlobalLock = RootContext.requireGlobalLock();
if (!alreadyInGlobalLock) {
RootContext.bindGlobalLockFlag();
}

// set my config to config holder so that it can be access in further execution
// for example, LockRetryController can access it with config holder
GlobalLockConfig myConfig = executor.getGlobalLockConfig();
GlobalLockConfig previousConfig = GlobalLockConfigHolder.setAndReturnPrevious(myConfig);

try {
// Do Your Business
rs = business.call();
return executor.execute();
} finally {
//clean the global lock declare
if (!hasInGlobalLock) {
// only unbind when this is the root caller.
// otherwise, the outer caller would lose global lock flag
if (!alreadyInGlobalLock) {
RootContext.unbindGlobalLockFlag();
}
}

return rs;
// if previous config is not null, we need to set it back
// so that the outer logic can still use their config
if (previousConfig != null) {
GlobalLockConfigHolder.setAndReturnPrevious(previousConfig);
} else {
GlobalLockConfigHolder.remove();
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,41 @@
*/
package io.seata.rm.datasource.exec;

import io.seata.common.DefaultValues;
import io.seata.common.util.NumberUtils;
import io.seata.config.Configuration;
import io.seata.config.ConfigurationCache;
import io.seata.config.ConfigurationChangeEvent;
import io.seata.config.ConfigurationChangeListener;
import io.seata.config.ConfigurationFactory;
import io.seata.core.constants.ConfigurationKeys;

import static io.seata.common.DefaultValues.DEFAULT_CLIENT_LOCK_RETRY_INTERVAL;
import static io.seata.common.DefaultValues.DEFAULT_CLIENT_LOCK_RETRY_TIMES;
import io.seata.core.context.GlobalLockConfigHolder;
import io.seata.core.model.GlobalLockConfig;

/**
* The type Lock retry controller.
* Lock retry controller
*
* @author sharajava
*/
public class LockRetryController {
private static int LOCK_RETRY_INTERNAL =
ConfigurationFactory.getInstance().getInt(ConfigurationKeys.CLIENT_LOCK_RETRY_INTERVAL, DEFAULT_CLIENT_LOCK_RETRY_INTERVAL);
private static int LOCK_RETRY_TIMES =
ConfigurationFactory.getInstance().getInt(ConfigurationKeys.CLIENT_LOCK_RETRY_TIMES, DEFAULT_CLIENT_LOCK_RETRY_TIMES);

private int lockRetryInternal = LOCK_RETRY_INTERNAL;
private int lockRetryTimes = LOCK_RETRY_TIMES;
private static final GlobalConfig LISTENER = new GlobalConfig();

static {
ConfigurationCache.addConfigListener(ConfigurationKeys.CLIENT_LOCK_RETRY_INTERVAL, LISTENER);
ConfigurationCache.addConfigListener(ConfigurationKeys.CLIENT_LOCK_RETRY_TIMES, LISTENER);
}

private int lockRetryInternal;

private int lockRetryTimes;

/**
* Instantiates a new Lock retry controller.
*/
public LockRetryController() {
this.lockRetryInternal = getLockRetryInternal();
this.lockRetryTimes = getLockRetryTimes();
}

/**
Expand All @@ -57,4 +68,66 @@ public void sleep(Exception e) throws LockWaitTimeoutException {
} catch (InterruptedException ignore) {
}
}

int getLockRetryInternal() {
// get customized config first
GlobalLockConfig config = GlobalLockConfigHolder.getCurrentGlobalLockConfig();
if (config != null) {
int configInternal = config.getLockRetryInternal();
if (configInternal > 0) {
return configInternal;
}
}
// if there is no customized config, use global config instead
return LISTENER.getGlobalLockRetryInternal();
}

int getLockRetryTimes() {
// get customized config first
GlobalLockConfig config = GlobalLockConfigHolder.getCurrentGlobalLockConfig();
if (config != null) {
int configTimes = config.getLockRetryTimes();
if (configTimes >= 0) {
return configTimes;
}
}
// if there is no customized config, use global config instead
return LISTENER.getGlobalLockRetryTimes();
}

static class GlobalConfig implements ConfigurationChangeListener {

private volatile int globalLockRetryInternal;

private volatile int globalLockRetryTimes;

private final int defaultRetryInternal = DefaultValues.DEFAULT_CLIENT_LOCK_RETRY_INTERVAL;
private final int defaultRetryTimes = DefaultValues.DEFAULT_CLIENT_LOCK_RETRY_TIMES;

public GlobalConfig() {
Configuration configuration = ConfigurationFactory.getInstance();
globalLockRetryInternal = configuration.getInt(ConfigurationKeys.CLIENT_LOCK_RETRY_INTERVAL, defaultRetryInternal);
globalLockRetryTimes = configuration.getInt(ConfigurationKeys.CLIENT_LOCK_RETRY_TIMES, defaultRetryTimes);
}

@Override
public void onChangeEvent(ConfigurationChangeEvent event) {
String dataId = event.getDataId();
String newValue = event.getNewValue();
if (ConfigurationKeys.CLIENT_LOCK_RETRY_INTERVAL.equals(dataId)) {
globalLockRetryInternal = NumberUtils.toInt(newValue, defaultRetryInternal);
}
if (ConfigurationKeys.CLIENT_LOCK_RETRY_TIMES.equals(dataId)) {
globalLockRetryTimes = NumberUtils.toInt(newValue, defaultRetryTimes);
}
}

public int getGlobalLockRetryInternal() {
return globalLockRetryInternal;
}

public int getGlobalLockRetryTimes() {
return globalLockRetryTimes;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,7 @@ public T doExecute(Object... args) throws Throwable {
DatabaseMetaData dbmd = conn.getMetaData();
T rs;
Savepoint sp = null;
LockRetryController lockRetryController = new LockRetryController();
boolean originalAutoCommit = conn.getAutoCommit();
ArrayList<List<Object>> paramAppenderList = new ArrayList<>();
String selectPKSQL = buildSelectSQL(paramAppenderList);
try {
if (originalAutoCommit) {
/*
Expand All @@ -82,6 +79,9 @@ public T doExecute(Object... args) throws Throwable {
throw new SQLException("not support savepoint. please check your db version");
}

LockRetryController lockRetryController = new LockRetryController();
ArrayList<List<Object>> paramAppenderList = new ArrayList<>();
String selectPKSQL = buildSelectSQL(paramAppenderList);
while (true) {
try {
// #870
Expand Down
Loading

0 comments on commit 6c0c759

Please sign in to comment.