Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize: reduce the number of lock conflict exception #1469

Merged
merged 7 commits into from
Sep 10, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@ public class ConfigurationKeys {
* The constant CLIENT_REPORT_RETRY_COUNT.
*/
public static final String CLIENT_REPORT_RETRY_COUNT = CLIENT_PREFIX + "report.retry.count";

/**
* The constant CLIENT_LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT.
*/
public static final String CLIENT_LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT = CLIENT_PREFIX + "lock.retry.policy.branch-rollback-on-conflict";

/**
* The constant CLIENT_TABLE_META_CHECK_ENABLE.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.Callable;

import com.alibaba.druid.util.JdbcConstants;

Expand All @@ -28,6 +29,7 @@
import io.seata.core.model.BranchType;
import io.seata.rm.DefaultResourceManager;
import io.seata.rm.datasource.exec.LockConflictException;
import io.seata.rm.datasource.exec.LockRetryController;
import io.seata.rm.datasource.undo.SQLUndoLog;
import io.seata.rm.datasource.undo.UndoLogManager;
import io.seata.rm.datasource.undo.UndoLogManagerOracle;
Expand All @@ -50,6 +52,8 @@ public class ConnectionProxy extends AbstractConnectionProxy {
private static int REPORT_RETRY_COUNT = ConfigurationFactory.getInstance().getInt(
ConfigurationKeys.CLIENT_REPORT_RETRY_COUNT, DEFAULT_REPORT_RETRY_COUNT);

private final static LockRetryPolicy LOCK_RETRY_POLICY = new LockRetryPolicy();

/**
* Instantiates a new Connection proxy.
*
Expand Down Expand Up @@ -158,6 +162,19 @@ public void appendLockKey(String lockKey) {

@Override
public void commit() throws SQLException {
try {
LOCK_RETRY_POLICY.execute(() -> {
doCommit();
return null;
});
} catch (SQLException e) {
throw e;
} catch (Exception e) {
throw new SQLException(e);
}
}

private void doCommit() throws SQLException {
if (context.inGlobalTransaction()) {
processGlobalTransactionCommit();
} else if (context.isGlobalLockRequire()) {
Expand Down Expand Up @@ -224,7 +241,7 @@ public void rollback() throws SQLException {
public void setAutoCommit(boolean autoCommit) throws SQLException {
if ((autoCommit) && !getAutoCommit()) {
// change autocommit from false to true, we should commit() first according to JDBC spec.
commit();
doCommit();
}
targetConnection.setAutoCommit(autoCommit);
}
Expand All @@ -247,4 +264,41 @@ private void report(boolean commitDone) throws SQLException {
}
}
}

public static class LockRetryPolicy {
protected final static boolean LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT =
ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.CLIENT_LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT, true);

public <T> T execute(Callable<T> callable) throws Exception {
if (LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT) {
return callable.call();
} else {
return doRetryOnLockConflict(callable);
}
}

protected <T> T doRetryOnLockConflict(Callable<T> callable) throws Exception {
LockRetryController lockRetryController = new LockRetryController();
while (true) {
try {
return callable.call();
} catch (LockConflictException lockConflict) {
onException(lockConflict);
lockRetryController.sleep(lockConflict);
} catch (Exception e) {
onException(e);
throw e;
}
}
}

/**
* Callback on exception in doLockRetryOnConflict.
*
* @param e invocation exception
* @throws Exception error
*/
protected void onException(Exception e) throws Exception {
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@
*/
package io.seata.rm.datasource.exec;

import java.sql.SQLException;
import java.sql.Statement;

import io.seata.rm.datasource.AbstractConnectionProxy;
import io.seata.rm.datasource.ConnectionProxy;
import io.seata.rm.datasource.StatementProxy;
Expand All @@ -26,13 +23,17 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.Callable;

/**
* The type Abstract dml base executor.
*
* @author sharajava
*
* @param <T> the type parameter
* @param <S> the type parameter
* @author sharajava
*/
public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {

Expand Down Expand Up @@ -83,34 +84,23 @@ protected T executeAutoCommitFalse(Object[] args) throws Exception {
* @throws Throwable the throwable
*/
protected T executeAutoCommitTrue(Object[] args) throws Throwable {
T result = null;
AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
LockRetryController lockRetryController = new LockRetryController();
try {
connectionProxy.setAutoCommit(false);
while (true) {
try {
result = executeAutoCommitFalse(args);
connectionProxy.commit();
break;
} catch (LockConflictException lockConflict) {
connectionProxy.getTargetConnection().rollback();
lockRetryController.sleep(lockConflict);
} catch (Exception exx) {
connectionProxy.getTargetConnection().rollback();
throw exx;
}
}

return new LockRetryPolicy(connectionProxy.getTargetConnection()).execute(() -> {
T result = executeAutoCommitFalse(args);
connectionProxy.commit();
return result;
});
} catch (Exception e) {
// when exception occur in finally,this exception will lost, so just print it here
LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
connectionProxy.rollback();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just used for LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT=false

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should it be okay to do ConnectionProxy#rollback in both cases?

throw e;
} finally {
((ConnectionProxy)connectionProxy).getContext().reset();
((ConnectionProxy) connectionProxy).getContext().reset();
connectionProxy.setAutoCommit(true);
}
return result;
}

/**
Expand All @@ -130,4 +120,25 @@ protected T executeAutoCommitTrue(Object[] args) throws Throwable {
*/
protected abstract TableRecords afterImage(TableRecords beforeImage) throws SQLException;

private static class LockRetryPolicy extends ConnectionProxy.LockRetryPolicy {
private final Connection connection;

LockRetryPolicy(final Connection connection) {
this.connection = connection;
}

@Override
public <T> T execute(Callable<T> callable) throws Exception {
if (LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT) {
return doRetryOnLockConflict(callable);
} else {
return callable.call();
}
}

@Override
protected void onException(Exception e) throws Exception {
connection.rollback();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.rm.datasource;

import io.seata.core.exception.TransactionException;
import io.seata.core.exception.TransactionExceptionCode;
import io.seata.core.model.BranchType;
import io.seata.core.model.ResourceManager;
import io.seata.rm.DefaultResourceManager;
import io.seata.rm.datasource.exec.LockConflictException;
import io.seata.rm.datasource.exec.LockWaitTimeoutException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

/**
* ConnectionProxy test
*
* @author ggndnn
*/
public class ConnectionProxyTest {
private DataSourceProxy dataSourceProxy;

private final static String TEST_RESOURCE_ID = "testResourceId";

private final static String TEST_XID = "testXid";

private Field branchRollbackFlagField;

@BeforeEach
public void initBeforeEach() throws Exception {
branchRollbackFlagField = ConnectionProxy.LockRetryPolicy.class.getDeclaredField("LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT");
Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
modifiersField.setInt(branchRollbackFlagField, branchRollbackFlagField.getModifiers() & ~Modifier.FINAL);
branchRollbackFlagField.setAccessible(true);
boolean branchRollbackFlag = (boolean) branchRollbackFlagField.get(null);
Assertions.assertTrue(branchRollbackFlag);

dataSourceProxy = Mockito.mock(DataSourceProxy.class);
Mockito.when(dataSourceProxy.getResourceId())
.thenReturn(TEST_RESOURCE_ID);
ResourceManager rm = Mockito.mock(ResourceManager.class);
Mockito.when(rm.branchRegister(BranchType.AT, dataSourceProxy.getResourceId(), null, TEST_XID, null, null))
.thenThrow(new TransactionException(TransactionExceptionCode.LockKeyConflict));
DefaultResourceManager defaultResourceManager = DefaultResourceManager.get();
Assertions.assertNotNull(defaultResourceManager);
DefaultResourceManager.mockResourceManager(BranchType.AT, rm);
}

@Test
public void testLockRetryPolicyRollbackOnConflict() throws Exception {
boolean oldBranchRollbackFlag = (boolean) branchRollbackFlagField.get(null);
branchRollbackFlagField.set(null, true);
ConnectionProxy connectionProxy = new ConnectionProxy(dataSourceProxy, null);
connectionProxy.bind(TEST_XID);
Assertions.assertThrows(LockConflictException.class, connectionProxy::commit);
branchRollbackFlagField.set(null, oldBranchRollbackFlag);
}

@Test
public void testLockRetryPolicyNotRollbackOnConflict() throws Exception {
boolean oldBranchRollbackFlag = (boolean) branchRollbackFlagField.get(null);
branchRollbackFlagField.set(null, false);
ConnectionProxy connectionProxy = new ConnectionProxy(dataSourceProxy, null);
connectionProxy.bind(TEST_XID);
Assertions.assertThrows(LockWaitTimeoutException.class, connectionProxy::commit);
branchRollbackFlagField.set(null, oldBranchRollbackFlag);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.rm.datasource.exec;

import io.seata.rm.datasource.ConnectionContext;
import io.seata.rm.datasource.ConnectionProxy;
import io.seata.rm.datasource.PreparedStatementProxy;
import io.seata.rm.datasource.sql.SQLInsertRecognizer;
import io.seata.rm.datasource.sql.struct.TableMeta;
import io.seata.rm.datasource.sql.struct.TableRecords;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.sql.Connection;

/**
* AbstractDMLBaseExecutor test
*
* @author ggndnn
*/
public class AbstractDMLBaseExecutorTest {
private ConnectionProxy connectionProxy;

private AbstractDMLBaseExecutor executor;

private Field branchRollbackFlagField;

@BeforeEach
public void initBeforeEach() throws Exception {
branchRollbackFlagField = ConnectionProxy.LockRetryPolicy.class.getDeclaredField("LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT");
Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
modifiersField.setInt(branchRollbackFlagField, branchRollbackFlagField.getModifiers() & ~Modifier.FINAL);
branchRollbackFlagField.setAccessible(true);
boolean branchRollbackFlag = (boolean) branchRollbackFlagField.get(null);
Assertions.assertTrue(branchRollbackFlag);

Connection targetConnection = Mockito.mock(Connection.class);
connectionProxy = Mockito.mock(ConnectionProxy.class);
Mockito.doThrow(new LockConflictException())
.when(connectionProxy).commit();
Mockito.when(connectionProxy.getAutoCommit())
.thenReturn(Boolean.TRUE);
Mockito.when(connectionProxy.getTargetConnection())
.thenReturn(targetConnection);
Mockito.when(connectionProxy.getContext())
.thenReturn(new ConnectionContext());
PreparedStatementProxy statementProxy = Mockito.mock(PreparedStatementProxy.class);
Mockito.when(statementProxy.getConnectionProxy())
.thenReturn(connectionProxy);
StatementCallback statementCallback = Mockito.mock(StatementCallback.class);
SQLInsertRecognizer sqlInsertRecognizer = Mockito.mock(SQLInsertRecognizer.class);
TableMeta tableMeta = Mockito.mock(TableMeta.class);
executor = Mockito.spy(new InsertExecutor(statementProxy, statementCallback, sqlInsertRecognizer));
Mockito.doReturn(tableMeta)
.when(executor).getTableMeta();
TableRecords tableRecords = new TableRecords();
Mockito.doReturn(tableRecords)
.when(executor).beforeImage();
Mockito.doReturn(tableRecords)
.when(executor).afterImage(tableRecords);
}

@Test
public void testLockRetryPolicyRollbackOnConflict() throws Exception {
boolean oldBranchRollbackFlag = (boolean) branchRollbackFlagField.get(null);
branchRollbackFlagField.set(null, true);
Assertions.assertThrows(LockWaitTimeoutException.class, executor::execute);
Mockito.verify(connectionProxy, Mockito.times(1))
.rollback();
Mockito.verify(connectionProxy.getTargetConnection(), Mockito.atLeastOnce())
.rollback();
branchRollbackFlagField.set(null, oldBranchRollbackFlag);
}

@Test
public void testLockRetryPolicyNotRollbackOnConflict() throws Throwable {
boolean oldBranchRollbackFlag = (boolean) branchRollbackFlagField.get(null);
branchRollbackFlagField.set(null, false);
Assertions.assertThrows(LockConflictException.class, executor::execute);
Mockito.verify(connectionProxy, Mockito.times(1))
.rollback();
Mockito.verify(connectionProxy.getTargetConnection(), Mockito.never())
.rollback();
branchRollbackFlagField.set(null, oldBranchRollbackFlag);
}
}
Loading