diff --git a/core/src/main/java/io/seata/core/constants/ConfigurationKeys.java b/core/src/main/java/io/seata/core/constants/ConfigurationKeys.java index 9e5e9f42094..a2cb3d99c66 100644 --- a/core/src/main/java/io/seata/core/constants/ConfigurationKeys.java +++ b/core/src/main/java/io/seata/core/constants/ConfigurationKeys.java @@ -97,6 +97,12 @@ public class ConfigurationKeys { * The constant CLIENT_REPORT_RETRY_COUNT. */ public static final String CLIENT_REPORT_RETRY_COUNT = CLIENT_PREFIX + "report.retry.count"; + + /** + * The constant CLIENT_LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT. + */ + public static final String CLIENT_LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT = CLIENT_PREFIX + "lock.retry.policy.branch-rollback-on-conflict"; + /** * The constant CLIENT_TABLE_META_CHECK_ENABLE. */ diff --git a/rm-datasource/src/main/java/io/seata/rm/datasource/ConnectionProxy.java b/rm-datasource/src/main/java/io/seata/rm/datasource/ConnectionProxy.java index 0c1ac51f750..e498578d60b 100644 --- a/rm-datasource/src/main/java/io/seata/rm/datasource/ConnectionProxy.java +++ b/rm-datasource/src/main/java/io/seata/rm/datasource/ConnectionProxy.java @@ -17,6 +17,7 @@ import java.sql.Connection; import java.sql.SQLException; +import java.util.concurrent.Callable; import com.alibaba.druid.util.JdbcConstants; @@ -29,6 +30,7 @@ import io.seata.core.model.BranchType; import io.seata.rm.DefaultResourceManager; import io.seata.rm.datasource.exec.LockConflictException; +import io.seata.rm.datasource.exec.LockRetryController; import io.seata.rm.datasource.undo.SQLUndoLog; import io.seata.rm.datasource.undo.UndoLogManager; import io.seata.rm.datasource.undo.UndoLogManagerOracle; @@ -51,6 +53,8 @@ public class ConnectionProxy extends AbstractConnectionProxy { private static final int REPORT_RETRY_COUNT = ConfigurationFactory.getInstance().getInt( ConfigurationKeys.CLIENT_REPORT_RETRY_COUNT, DEFAULT_REPORT_RETRY_COUNT); + private final static LockRetryPolicy LOCK_RETRY_POLICY = new LockRetryPolicy(); + /** * Instantiates a new Connection proxy. * @@ -169,6 +173,19 @@ public void appendLockKey(String lockKey) { @Override public void commit() throws SQLException { + try { + LOCK_RETRY_POLICY.execute(() -> { + doCommit(); + return null; + }); + } catch (SQLException e) { + throw e; + } catch (Exception e) { + throw new SQLException(e); + } + } + + private void doCommit() throws SQLException { if (context.inGlobalTransaction()) { processGlobalTransactionCommit(); } else if (context.isGlobalLockRequire()) { @@ -235,7 +252,7 @@ public void rollback() throws SQLException { public void setAutoCommit(boolean autoCommit) throws SQLException { if ((autoCommit) && !getAutoCommit()) { // change autocommit from false to true, we should commit() first according to JDBC spec. - commit(); + doCommit(); } targetConnection.setAutoCommit(autoCommit); } @@ -258,4 +275,41 @@ private void report(boolean commitDone) throws SQLException { } } } + + public static class LockRetryPolicy { + protected final static boolean LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT = + ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.CLIENT_LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT, true); + + public T execute(Callable callable) throws Exception { + if (LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT) { + return callable.call(); + } else { + return doRetryOnLockConflict(callable); + } + } + + protected T doRetryOnLockConflict(Callable callable) throws Exception { + LockRetryController lockRetryController = new LockRetryController(); + while (true) { + try { + return callable.call(); + } catch (LockConflictException lockConflict) { + onException(lockConflict); + lockRetryController.sleep(lockConflict); + } catch (Exception e) { + onException(e); + throw e; + } + } + } + + /** + * Callback on exception in doLockRetryOnConflict. + * + * @param e invocation exception + * @throws Exception error + */ + protected void onException(Exception e) throws Exception { + } + } } diff --git a/rm-datasource/src/main/java/io/seata/rm/datasource/exec/AbstractDMLBaseExecutor.java b/rm-datasource/src/main/java/io/seata/rm/datasource/exec/AbstractDMLBaseExecutor.java index 30f9b20b45c..cf72fde0b2b 100644 --- a/rm-datasource/src/main/java/io/seata/rm/datasource/exec/AbstractDMLBaseExecutor.java +++ b/rm-datasource/src/main/java/io/seata/rm/datasource/exec/AbstractDMLBaseExecutor.java @@ -15,9 +15,6 @@ */ package io.seata.rm.datasource.exec; -import java.sql.SQLException; -import java.sql.Statement; - import io.seata.rm.datasource.AbstractConnectionProxy; import io.seata.rm.datasource.ConnectionProxy; import io.seata.rm.datasource.StatementProxy; @@ -26,13 +23,17 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.concurrent.Callable; + /** * The type Abstract dml base executor. * - * @author sharajava - * * @param the type parameter * @param the type parameter + * @author sharajava */ public abstract class AbstractDMLBaseExecutor extends BaseTransactionalExecutor { @@ -83,34 +84,23 @@ protected T executeAutoCommitFalse(Object[] args) throws Exception { * @throws Throwable the throwable */ protected T executeAutoCommitTrue(Object[] args) throws Throwable { - T result = null; AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy(); - LockRetryController lockRetryController = new LockRetryController(); try { connectionProxy.setAutoCommit(false); - while (true) { - try { - result = executeAutoCommitFalse(args); - connectionProxy.commit(); - break; - } catch (LockConflictException lockConflict) { - connectionProxy.getTargetConnection().rollback(); - lockRetryController.sleep(lockConflict); - } catch (Exception exx) { - connectionProxy.getTargetConnection().rollback(); - throw exx; - } - } - + return new LockRetryPolicy(connectionProxy.getTargetConnection()).execute(() -> { + T result = executeAutoCommitFalse(args); + connectionProxy.commit(); + return result; + }); } catch (Exception e) { // when exception occur in finally,this exception will lost, so just print it here LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e); + connectionProxy.rollback(); throw e; } finally { - ((ConnectionProxy)connectionProxy).getContext().reset(); + ((ConnectionProxy) connectionProxy).getContext().reset(); connectionProxy.setAutoCommit(true); } - return result; } /** @@ -130,4 +120,25 @@ protected T executeAutoCommitTrue(Object[] args) throws Throwable { */ protected abstract TableRecords afterImage(TableRecords beforeImage) throws SQLException; + private static class LockRetryPolicy extends ConnectionProxy.LockRetryPolicy { + private final Connection connection; + + LockRetryPolicy(final Connection connection) { + this.connection = connection; + } + + @Override + public T execute(Callable callable) throws Exception { + if (LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT) { + return doRetryOnLockConflict(callable); + } else { + return callable.call(); + } + } + + @Override + protected void onException(Exception e) throws Exception { + connection.rollback(); + } + } } diff --git a/rm-datasource/src/test/java/io/seata/rm/datasource/ConnectionProxyTest.java b/rm-datasource/src/test/java/io/seata/rm/datasource/ConnectionProxyTest.java new file mode 100644 index 00000000000..4c986422562 --- /dev/null +++ b/rm-datasource/src/test/java/io/seata/rm/datasource/ConnectionProxyTest.java @@ -0,0 +1,87 @@ +/* + * Copyright 1999-2019 Seata.io Group. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.rm.datasource; + +import io.seata.core.exception.TransactionException; +import io.seata.core.exception.TransactionExceptionCode; +import io.seata.core.model.BranchType; +import io.seata.core.model.ResourceManager; +import io.seata.rm.DefaultResourceManager; +import io.seata.rm.datasource.exec.LockConflictException; +import io.seata.rm.datasource.exec.LockWaitTimeoutException; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; + +/** + * ConnectionProxy test + * + * @author ggndnn + */ +public class ConnectionProxyTest { + private DataSourceProxy dataSourceProxy; + + private final static String TEST_RESOURCE_ID = "testResourceId"; + + private final static String TEST_XID = "testXid"; + + private Field branchRollbackFlagField; + + @BeforeEach + public void initBeforeEach() throws Exception { + branchRollbackFlagField = ConnectionProxy.LockRetryPolicy.class.getDeclaredField("LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT"); + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(branchRollbackFlagField, branchRollbackFlagField.getModifiers() & ~Modifier.FINAL); + branchRollbackFlagField.setAccessible(true); + boolean branchRollbackFlag = (boolean) branchRollbackFlagField.get(null); + Assertions.assertTrue(branchRollbackFlag); + + dataSourceProxy = Mockito.mock(DataSourceProxy.class); + Mockito.when(dataSourceProxy.getResourceId()) + .thenReturn(TEST_RESOURCE_ID); + ResourceManager rm = Mockito.mock(ResourceManager.class); + Mockito.when(rm.branchRegister(BranchType.AT, dataSourceProxy.getResourceId(), null, TEST_XID, null, null)) + .thenThrow(new TransactionException(TransactionExceptionCode.LockKeyConflict)); + DefaultResourceManager defaultResourceManager = DefaultResourceManager.get(); + Assertions.assertNotNull(defaultResourceManager); + DefaultResourceManager.mockResourceManager(BranchType.AT, rm); + } + + @Test + public void testLockRetryPolicyRollbackOnConflict() throws Exception { + boolean oldBranchRollbackFlag = (boolean) branchRollbackFlagField.get(null); + branchRollbackFlagField.set(null, true); + ConnectionProxy connectionProxy = new ConnectionProxy(dataSourceProxy, null); + connectionProxy.bind(TEST_XID); + Assertions.assertThrows(LockConflictException.class, connectionProxy::commit); + branchRollbackFlagField.set(null, oldBranchRollbackFlag); + } + + @Test + public void testLockRetryPolicyNotRollbackOnConflict() throws Exception { + boolean oldBranchRollbackFlag = (boolean) branchRollbackFlagField.get(null); + branchRollbackFlagField.set(null, false); + ConnectionProxy connectionProxy = new ConnectionProxy(dataSourceProxy, null); + connectionProxy.bind(TEST_XID); + Assertions.assertThrows(LockWaitTimeoutException.class, connectionProxy::commit); + branchRollbackFlagField.set(null, oldBranchRollbackFlag); + } +} diff --git a/rm-datasource/src/test/java/io/seata/rm/datasource/exec/AbstractDMLBaseExecutorTest.java b/rm-datasource/src/test/java/io/seata/rm/datasource/exec/AbstractDMLBaseExecutorTest.java new file mode 100644 index 00000000000..6e108c10f1b --- /dev/null +++ b/rm-datasource/src/test/java/io/seata/rm/datasource/exec/AbstractDMLBaseExecutorTest.java @@ -0,0 +1,104 @@ +/* + * Copyright 1999-2019 Seata.io Group. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.rm.datasource.exec; + +import io.seata.rm.datasource.ConnectionContext; +import io.seata.rm.datasource.ConnectionProxy; +import io.seata.rm.datasource.PreparedStatementProxy; +import io.seata.rm.datasource.sql.SQLInsertRecognizer; +import io.seata.rm.datasource.sql.struct.TableMeta; +import io.seata.rm.datasource.sql.struct.TableRecords; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.sql.Connection; + +/** + * AbstractDMLBaseExecutor test + * + * @author ggndnn + */ +public class AbstractDMLBaseExecutorTest { + private ConnectionProxy connectionProxy; + + private AbstractDMLBaseExecutor executor; + + private Field branchRollbackFlagField; + + @BeforeEach + public void initBeforeEach() throws Exception { + branchRollbackFlagField = ConnectionProxy.LockRetryPolicy.class.getDeclaredField("LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT"); + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(branchRollbackFlagField, branchRollbackFlagField.getModifiers() & ~Modifier.FINAL); + branchRollbackFlagField.setAccessible(true); + boolean branchRollbackFlag = (boolean) branchRollbackFlagField.get(null); + Assertions.assertTrue(branchRollbackFlag); + + Connection targetConnection = Mockito.mock(Connection.class); + connectionProxy = Mockito.mock(ConnectionProxy.class); + Mockito.doThrow(new LockConflictException()) + .when(connectionProxy).commit(); + Mockito.when(connectionProxy.getAutoCommit()) + .thenReturn(Boolean.TRUE); + Mockito.when(connectionProxy.getTargetConnection()) + .thenReturn(targetConnection); + Mockito.when(connectionProxy.getContext()) + .thenReturn(new ConnectionContext()); + PreparedStatementProxy statementProxy = Mockito.mock(PreparedStatementProxy.class); + Mockito.when(statementProxy.getConnectionProxy()) + .thenReturn(connectionProxy); + StatementCallback statementCallback = Mockito.mock(StatementCallback.class); + SQLInsertRecognizer sqlInsertRecognizer = Mockito.mock(SQLInsertRecognizer.class); + TableMeta tableMeta = Mockito.mock(TableMeta.class); + executor = Mockito.spy(new InsertExecutor(statementProxy, statementCallback, sqlInsertRecognizer)); + Mockito.doReturn(tableMeta) + .when(executor).getTableMeta(); + TableRecords tableRecords = new TableRecords(); + Mockito.doReturn(tableRecords) + .when(executor).beforeImage(); + Mockito.doReturn(tableRecords) + .when(executor).afterImage(tableRecords); + } + + @Test + public void testLockRetryPolicyRollbackOnConflict() throws Exception { + boolean oldBranchRollbackFlag = (boolean) branchRollbackFlagField.get(null); + branchRollbackFlagField.set(null, true); + Assertions.assertThrows(LockWaitTimeoutException.class, executor::execute); + Mockito.verify(connectionProxy, Mockito.times(1)) + .rollback(); + Mockito.verify(connectionProxy.getTargetConnection(), Mockito.atLeastOnce()) + .rollback(); + branchRollbackFlagField.set(null, oldBranchRollbackFlag); + } + + @Test + public void testLockRetryPolicyNotRollbackOnConflict() throws Throwable { + boolean oldBranchRollbackFlag = (boolean) branchRollbackFlagField.get(null); + branchRollbackFlagField.set(null, false); + Assertions.assertThrows(LockConflictException.class, executor::execute); + Mockito.verify(connectionProxy, Mockito.times(1)) + .rollback(); + Mockito.verify(connectionProxy.getTargetConnection(), Mockito.never()) + .rollback(); + branchRollbackFlagField.set(null, oldBranchRollbackFlag); + } +} diff --git a/server/src/main/resources/nacos-config.txt b/server/src/main/resources/nacos-config.txt index 79f9b677dba..ea7236c7dae 100644 --- a/server/src/main/resources/nacos-config.txt +++ b/server/src/main/resources/nacos-config.txt @@ -19,6 +19,7 @@ service.max.rollback.retry.timeout=-1 client.async.commit.buffer.limit=10000 client.lock.retry.internal=10 client.lock.retry.times=30 +client.lock.retry.policy.branch-rollback-on-conflict=true client.table.meta.check.enable=true store.mode=file store.file.dir=file_store/data