From 34b8060b210a8340965861d9d6aa4579a8757e54 Mon Sep 17 00:00:00 2001 From: zjinlei <75718677@qq.com> Date: Mon, 2 Dec 2019 15:41:03 +0800 Subject: [PATCH] feature: solve the lock_key length problem (#1905) --- .../constants/ServerTableColumnsName.java | 5 - .../io/seata/core/lock/AbstractLocker.java | 11 ++ .../main/java/io/seata/core/lock/Locker.java | 18 +++ .../seata/core/store/BranchTransactionDO.java | 20 ---- .../java/io/seata/core/store/LockStore.java | 4 + .../core/store/db/LockStoreDataBaseDAO.java | 46 ++++++++ .../io/seata/core/store/db/LockStoreSqls.java | 39 +++++++ .../core/store/db/LogStoreDataBaseDAO.java | 10 +- .../io/seata/core/store/db/LogStoreSqls.java | 6 +- .../store/db/LogStoreDataBaseDAOTest.java | 5 - .../server/lock/AbstractLockManager.java | 73 +++++++++++- .../seata/server/lock/DefaultLockManager.java | 109 +----------------- .../io/seata/server/lock/LockerFactory.java | 23 ++-- .../server/lock/db/DataBaseLockManager.java | 60 ++++++++++ .../seata/server/lock/db/DataBaseLocker.java | 36 +++++- .../db/DatabaseTransactionStoreManager.java | 2 - server/src/main/resources/db_store.sql | 8 +- 17 files changed, 313 insertions(+), 162 deletions(-) create mode 100644 server/src/main/java/io/seata/server/lock/db/DataBaseLockManager.java diff --git a/core/src/main/java/io/seata/core/constants/ServerTableColumnsName.java b/core/src/main/java/io/seata/core/constants/ServerTableColumnsName.java index 09ef6f64bd3..cd90815f642 100644 --- a/core/src/main/java/io/seata/core/constants/ServerTableColumnsName.java +++ b/core/src/main/java/io/seata/core/constants/ServerTableColumnsName.java @@ -107,11 +107,6 @@ public class ServerTableColumnsName { */ public static final String BRANCH_TABLE_RESOURCE_ID = "resource_id"; - /** - * The constant branch_table column name lock_key - */ - public static final String BRANCH_TABLE_LOCK_KEY = "lock_key"; - /** * The constant branch_table column name branch_type */ diff --git a/core/src/main/java/io/seata/core/lock/AbstractLocker.java b/core/src/main/java/io/seata/core/lock/AbstractLocker.java index f6e1c56a4d3..7306b42f2f0 100644 --- a/core/src/main/java/io/seata/core/lock/AbstractLocker.java +++ b/core/src/main/java/io/seata/core/lock/AbstractLocker.java @@ -83,4 +83,15 @@ protected String getRowKey(String resourceId, String tableName, String pk) { public void cleanAllLocks() { } + + @Override + public boolean releaseLock(String xid, Long branchId) { + return false; + } + + @Override + public boolean releaseLock(String xid, List branchIds) { + return false; + } + } diff --git a/core/src/main/java/io/seata/core/lock/Locker.java b/core/src/main/java/io/seata/core/lock/Locker.java index 723042a4225..d3e7087ff52 100644 --- a/core/src/main/java/io/seata/core/lock/Locker.java +++ b/core/src/main/java/io/seata/core/lock/Locker.java @@ -41,6 +41,24 @@ public interface Locker { */ boolean releaseLock(List rowLock); + /** + * Un lock boolean. + * + * @param xid the xid + * @param branchId the branchId + * @return the boolean + */ + boolean releaseLock(String xid, Long branchId); + + /** + * Un lock boolean. + * + * @param xid the xid + * @param branchIds the branchIds + * @return the boolean + */ + boolean releaseLock(String xid, List branchIds); + /** * Is lockable boolean. * diff --git a/core/src/main/java/io/seata/core/store/BranchTransactionDO.java b/core/src/main/java/io/seata/core/store/BranchTransactionDO.java index 2a646a3cf7b..6957ba1a8d9 100644 --- a/core/src/main/java/io/seata/core/store/BranchTransactionDO.java +++ b/core/src/main/java/io/seata/core/store/BranchTransactionDO.java @@ -38,8 +38,6 @@ public class BranchTransactionDO { private String resourceId; - private String lockKey; - private String branchType; private int status = BranchStatus.Unknown.getCode(); @@ -142,24 +140,6 @@ public void setResourceId(String resourceId) { this.resourceId = resourceId; } - /** - * Gets lock key. - * - * @return the lock key - */ - public String getLockKey() { - return lockKey; - } - - /** - * Sets lock key. - * - * @param lockKey the lock key - */ - public void setLockKey(String lockKey) { - this.lockKey = lockKey; - } - /** * Gets branch type. * diff --git a/core/src/main/java/io/seata/core/store/LockStore.java b/core/src/main/java/io/seata/core/store/LockStore.java index 49e92e6ab10..fe39c2ab9c9 100644 --- a/core/src/main/java/io/seata/core/store/LockStore.java +++ b/core/src/main/java/io/seata/core/store/LockStore.java @@ -58,6 +58,10 @@ public interface LockStore { */ boolean unLock(List lockDOs); + boolean unLock(String xid, Long branchId); + + boolean unLock(String xid, List branchIds); + /** * Is lockable boolean. * diff --git a/core/src/main/java/io/seata/core/store/db/LockStoreDataBaseDAO.java b/core/src/main/java/io/seata/core/store/db/LockStoreDataBaseDAO.java index 9330cd619c2..272846a82fe 100644 --- a/core/src/main/java/io/seata/core/store/db/LockStoreDataBaseDAO.java +++ b/core/src/main/java/io/seata/core/store/db/LockStoreDataBaseDAO.java @@ -232,6 +232,52 @@ public boolean unLock(List lockDOs) { return true; } + @Override + public boolean unLock(String xid, Long branchId) { + Connection conn = null; + PreparedStatement ps = null; + try { + conn = logStoreDataSource.getConnection(); + conn.setAutoCommit(true); + //batch release lock by branch + String batchDeleteSQL = LockStoreSqls.getBatchDeleteLockSqlByBranch(lockTable, dbType); + ps = conn.prepareStatement(batchDeleteSQL); + ps.setString(1, xid); + ps.setLong(2, branchId); + ps.executeUpdate(); + } catch (SQLException e) { + throw new StoreException(e); + } finally { + IOUtil.close(ps, conn); + } + return true; + } + + @Override + public boolean unLock(String xid, List branchIds) { + Connection conn = null; + PreparedStatement ps = null; + try { + conn = logStoreDataSource.getConnection(); + conn.setAutoCommit(true); + StringJoiner sj = new StringJoiner(","); + branchIds.stream().forEach(branchId -> sj.add("?")); + //batch release lock by branch list + String batchDeleteSQL = LockStoreSqls.getBatchDeleteLockSqlByBranchs(lockTable, sj.toString(), dbType); + ps = conn.prepareStatement(batchDeleteSQL); + ps.setString(1, xid); + for (int i = 0; i < branchIds.size(); i++) { + ps.setLong(i + 2, branchIds.get(i)); + } + ps.executeUpdate(); + } catch (SQLException e) { + throw new StoreException(e); + } finally { + IOUtil.close(ps, conn); + } + return true; + } + @Override public boolean isLockable(List lockDOs) { Connection conn = null; diff --git a/core/src/main/java/io/seata/core/store/db/LockStoreSqls.java b/core/src/main/java/io/seata/core/store/db/LockStoreSqls.java index d286c77e273..587d20ddde7 100644 --- a/core/src/main/java/io/seata/core/store/db/LockStoreSqls.java +++ b/core/src/main/java/io/seata/core/store/db/LockStoreSqls.java @@ -73,6 +73,20 @@ public class LockStoreSqls { public static final String BATCH_DELETE_LOCK_SQL = "delete from " + LOCK_TABLE_PLACEHOLD + " where " + ServerTableColumnsName.LOCK_TABLE_XID + " = ? and " + ServerTableColumnsName.LOCK_TABLE_ROW_KEY + " in (" + IN_PARAMS_PLACEHOLD + ") "; + /** + * The constant BATCH_DELETE_LOCK_BY_BRANCH_SQL. + */ + public static final String BATCH_DELETE_LOCK_BY_BRANCH_SQL = "delete from " + LOCK_TABLE_PLACEHOLD + + " where " + ServerTableColumnsName.LOCK_TABLE_XID + " = ? and " + ServerTableColumnsName.LOCK_TABLE_BRANCH_ID + " = ? "; + + + /** + * The constant BATCH_DELETE_LOCK_BY_BRANCHS_SQL. + */ + public static final String BATCH_DELETE_LOCK_BY_BRANCHS_SQL = "delete from " + LOCK_TABLE_PLACEHOLD + + " where " + ServerTableColumnsName.LOCK_TABLE_XID + " = ? and " + ServerTableColumnsName.LOCK_TABLE_BRANCH_ID + " in (" + IN_PARAMS_PLACEHOLD + ") "; + + /** * The constant QUERY_LOCK_SQL. */ @@ -128,6 +142,31 @@ public static String getBatchDeleteLockSql(String lockTable, String paramPlaceHo paramPlaceHold); } + + /** + * Get batch delete lock sql string. + * + * @param lockTable the lock table + * @param dbType the db type + * @return the string + */ + public static String getBatchDeleteLockSqlByBranch(String lockTable, String dbType) { + return BATCH_DELETE_LOCK_BY_BRANCH_SQL.replace(LOCK_TABLE_PLACEHOLD, lockTable); + } + + /** + * Get batch delete lock sql string. + * + * @param lockTable the lock table + * @param paramPlaceHold the param place hold + * @param dbType the db type + * @return the string + */ + public static String getBatchDeleteLockSqlByBranchs(String lockTable, String paramPlaceHold, String dbType) { + return BATCH_DELETE_LOCK_BY_BRANCHS_SQL.replace(LOCK_TABLE_PLACEHOLD, lockTable).replace(IN_PARAMS_PLACEHOLD, + paramPlaceHold); + } + /** * Get query lock sql string. * diff --git a/core/src/main/java/io/seata/core/store/db/LogStoreDataBaseDAO.java b/core/src/main/java/io/seata/core/store/db/LogStoreDataBaseDAO.java index 5a15b876eee..8a815eed06e 100644 --- a/core/src/main/java/io/seata/core/store/db/LogStoreDataBaseDAO.java +++ b/core/src/main/java/io/seata/core/store/db/LogStoreDataBaseDAO.java @@ -339,11 +339,10 @@ public boolean insertBranchTransactionDO(BranchTransactionDO branchTransactionDO ps.setLong(3, branchTransactionDO.getBranchId()); ps.setString(4, branchTransactionDO.getResourceGroupId()); ps.setString(5, branchTransactionDO.getResourceId()); - ps.setString(6, branchTransactionDO.getLockKey()); - ps.setString(7, branchTransactionDO.getBranchType()); - ps.setInt(8, branchTransactionDO.getStatus()); - ps.setString(9, branchTransactionDO.getClientId()); - ps.setString(10, branchTransactionDO.getApplicationData()); + ps.setString(6, branchTransactionDO.getBranchType()); + ps.setInt(7, branchTransactionDO.getStatus()); + ps.setString(8, branchTransactionDO.getClientId()); + ps.setString(9, branchTransactionDO.getApplicationData()); return ps.executeUpdate() > 0; } catch (SQLException e) { throw new StoreException(e); @@ -448,7 +447,6 @@ private BranchTransactionDO convertBranchTransactionDO(ResultSet rs) throws SQLE branchTransactionDO.setStatus(rs.getInt(ServerTableColumnsName.BRANCH_TABLE_STATUS)); branchTransactionDO.setApplicationData(rs.getString(ServerTableColumnsName.BRANCH_TABLE_APPLICATION_DATA)); branchTransactionDO.setClientId(rs.getString(ServerTableColumnsName.BRANCH_TABLE_CLIENT_ID)); - branchTransactionDO.setLockKey(rs.getString(ServerTableColumnsName.BRANCH_TABLE_LOCK_KEY)); branchTransactionDO.setXid(rs.getString(ServerTableColumnsName.BRANCH_TABLE_XID)); branchTransactionDO.setResourceId(rs.getString(ServerTableColumnsName.BRANCH_TABLE_RESOURCE_ID)); branchTransactionDO.setBranchId(rs.getLong(ServerTableColumnsName.BRANCH_TABLE_BRANCH_ID)); diff --git a/core/src/main/java/io/seata/core/store/db/LogStoreSqls.java b/core/src/main/java/io/seata/core/store/db/LogStoreSqls.java index 2c3eccb55f4..0f6e1186706 100644 --- a/core/src/main/java/io/seata/core/store/db/LogStoreSqls.java +++ b/core/src/main/java/io/seata/core/store/db/LogStoreSqls.java @@ -62,7 +62,7 @@ public class LogStoreSqls { protected static final String ALL_BRANCH_COLUMNS = ServerTableColumnsName.BRANCH_TABLE_XID + ", " + ServerTableColumnsName.BRANCH_TABLE_TRANSACTION_ID + ", " + ServerTableColumnsName.BRANCH_TABLE_BRANCH_ID + ", " + ServerTableColumnsName.BRANCH_TABLE_RESOURCE_GROUP_ID + ", " - + ServerTableColumnsName.BRANCH_TABLE_RESOURCE_ID + ", " + ServerTableColumnsName.BRANCH_TABLE_LOCK_KEY + ", " + + ServerTableColumnsName.BRANCH_TABLE_RESOURCE_ID + ", " + ServerTableColumnsName.BRANCH_TABLE_BRANCH_TYPE + ", " + ServerTableColumnsName.BRANCH_TABLE_STATUS + ", " + ServerTableColumnsName.BRANCH_TABLE_CLIENT_ID + ", " + ServerTableColumnsName.BRANCH_TABLE_APPLICATION_DATA + ", " + ServerTableColumnsName.BRANCH_TABLE_GMT_CREATE + ", " + ServerTableColumnsName.BRANCH_TABLE_GMT_MODIFIED; @@ -143,14 +143,14 @@ public class LogStoreSqls { */ public static final String INSERT_BRANCH_TRANSACTION_MYSQL = "insert into " + BRANCH_TABLE_PLACEHOLD + "(" + ALL_BRANCH_COLUMNS + ")" + - "values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, now(), now())"; + "values (?, ?, ?, ?, ?, ?, ?, ?, ?, now(), now())"; /** * The constant INSERT_BRANCH_TRANSACTION_ORACLE. */ public static final String INSERT_BRANCH_TRANSACTION_ORACLE = "insert into " + BRANCH_TABLE_PLACEHOLD + "(" + ALL_BRANCH_COLUMNS + ")" + - "values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, sysdate, sysdate)"; + "values (?, ?, ?, ?, ?, ?, ?, ?, ?, sysdate, sysdate)"; /** * The constant UPDATE_BRANCH_TRANSACTION_STATUS_MYSQL. diff --git a/core/src/test/java/io/seata/core/store/db/LogStoreDataBaseDAOTest.java b/core/src/test/java/io/seata/core/store/db/LogStoreDataBaseDAOTest.java index 698e1736041..a450c953b12 100644 --- a/core/src/test/java/io/seata/core/store/db/LogStoreDataBaseDAOTest.java +++ b/core/src/test/java/io/seata/core/store/db/LogStoreDataBaseDAOTest.java @@ -445,7 +445,6 @@ public void queryBranchTransactionDO() throws SQLException { branchTransactionDO.setBranchId(345465676); branchTransactionDO.setBranchType("TCC"); branchTransactionDO.setResourceGroupId("abc"); - branchTransactionDO.setLockKey("t:1,2,3;t2,4,5,6"); branchTransactionDO.setResourceGroupId("a"); branchTransactionDO.setClientId("1.1.1.1"); branchTransactionDO.setStatus(1); @@ -464,7 +463,6 @@ public void queryBranchTransactionDO() throws SQLException { branchTransactionDO.setBranchId(78563453); branchTransactionDO.setBranchType("TCC"); branchTransactionDO.setResourceGroupId("abc"); - branchTransactionDO.setLockKey("t:6;t2:7"); branchTransactionDO.setResourceGroupId("a"); branchTransactionDO.setClientId("1.1.1.1"); branchTransactionDO.setStatus(1); @@ -508,7 +506,6 @@ public void insertBranchTransactionDO() throws SQLException { branchTransactionDO.setBranchId(1234508); branchTransactionDO.setBranchType("TCC"); branchTransactionDO.setResourceGroupId("abc"); - branchTransactionDO.setLockKey("t:1,2,3;t2,4,5,6"); branchTransactionDO.setResourceGroupId("a"); branchTransactionDO.setClientId("1.1.1.1"); branchTransactionDO.setStatus(1); @@ -548,7 +545,6 @@ public void updateBranchTransactionDO() throws SQLException { branchTransactionDO.setBranchId(343434318); branchTransactionDO.setBranchType("TCC"); branchTransactionDO.setResourceGroupId("abc"); - branchTransactionDO.setLockKey("t:1,2,3;t2,4,5,6"); branchTransactionDO.setResourceGroupId("a"); branchTransactionDO.setClientId("1.1.1.1"); branchTransactionDO.setStatus(1); @@ -590,7 +586,6 @@ public void deleteBranchTransactionDO() throws SQLException { branchTransactionDO.setBranchId(34567798); branchTransactionDO.setBranchType("TCC"); branchTransactionDO.setResourceGroupId("abc"); - branchTransactionDO.setLockKey("t:1,2,3;t2,4,5,6"); branchTransactionDO.setResourceGroupId("a"); branchTransactionDO.setClientId("1.1.1.1"); branchTransactionDO.setStatus(1); diff --git a/server/src/main/java/io/seata/server/lock/AbstractLockManager.java b/server/src/main/java/io/seata/server/lock/AbstractLockManager.java index 1208578d1f2..9d0b8f848ed 100644 --- a/server/src/main/java/io/seata/server/lock/AbstractLockManager.java +++ b/server/src/main/java/io/seata/server/lock/AbstractLockManager.java @@ -19,7 +19,10 @@ import java.util.List; import io.seata.common.XID; +import io.seata.common.util.CollectionUtils; import io.seata.common.util.StringUtils; +import io.seata.core.exception.TransactionException; +import io.seata.core.lock.Locker; import io.seata.core.lock.RowLock; import io.seata.server.session.BranchSession; import org.slf4j.Logger; @@ -29,7 +32,6 @@ * The type Abstract lock manager. * * @author zhangsen - * @data 2019 /4/25 */ public abstract class AbstractLockManager implements LockManager { @@ -38,6 +40,75 @@ public abstract class AbstractLockManager implements LockManager { */ protected static final Logger LOGGER = LoggerFactory.getLogger(AbstractLockManager.class); + @Override + public boolean acquireLock(BranchSession branchSession) throws TransactionException { + if (branchSession == null) { + throw new IllegalArgumentException("branchSession can't be null for memory/file locker."); + } + String lockKey = branchSession.getLockKey(); + if (StringUtils.isNullOrEmpty(lockKey)) { + //no lock + return true; + } + //get locks of branch + List locks = collectRowLocks(branchSession); + if (CollectionUtils.isEmpty(locks)) { + //no lock + return true; + } + return getLocker(branchSession).acquireLock(locks); + } + + @Override + public boolean releaseLock(BranchSession branchSession) throws TransactionException { + if (branchSession == null) { + throw new IllegalArgumentException("branchSession can't be null for memory/file locker."); + } + List locks = collectRowLocks(branchSession); + try { + return getLocker(branchSession).releaseLock(locks); + } catch (Exception t) { + LOGGER.error("unLock error, branchSession:{}", branchSession, t); + return false; + } + } + + @Override + public boolean isLockable(String xid, String resourceId, String lockKey) throws TransactionException { + List locks = collectRowLocks(lockKey, resourceId, xid); + try { + return getLocker().isLockable(locks); + } catch (Exception t) { + LOGGER.error("isLockable error, xid:{} resourceId:{}, lockKey:{}", xid, resourceId, lockKey, t); + return false; + } + } + + + @Override + public void cleanAllLocks() throws TransactionException { + getLocker().cleanAllLocks(); + } + + /** + * Gets locker. + * + * @return the locker + */ + protected Locker getLocker() { + return getLocker(null); + } + + /** + * Gets locker. + * + * @param branchSession the branch session + * @return the locker + */ + protected Locker getLocker(BranchSession branchSession) { + return LockerFactory.get(branchSession); + } + /** * Collect row locks list.` * diff --git a/server/src/main/java/io/seata/server/lock/DefaultLockManager.java b/server/src/main/java/io/seata/server/lock/DefaultLockManager.java index eab0326f048..a3c89978cc7 100644 --- a/server/src/main/java/io/seata/server/lock/DefaultLockManager.java +++ b/server/src/main/java/io/seata/server/lock/DefaultLockManager.java @@ -16,17 +16,8 @@ package io.seata.server.lock; import java.util.ArrayList; -import java.util.List; -import io.seata.common.util.CollectionUtils; -import io.seata.common.util.StringUtils; -import io.seata.config.Configuration; -import io.seata.config.ConfigurationFactory; -import io.seata.core.constants.ConfigurationKeys; import io.seata.core.exception.TransactionException; -import io.seata.core.lock.Locker; -import io.seata.core.lock.RowLock; -import io.seata.core.store.StoreMode; import io.seata.server.session.BranchSession; import io.seata.server.session.GlobalSession; @@ -34,109 +25,19 @@ * The type Default lock manager. * * @author zhangsen - * @data 2019 -05-15 */ public class DefaultLockManager extends AbstractLockManager { - private static Locker locker = null; - - /** - * The constant CONFIG. - */ - protected static final Configuration CONFIG = ConfigurationFactory.getInstance(); - - @Override - public boolean acquireLock(BranchSession branchSession) throws TransactionException { - if (branchSession == null) { - throw new IllegalArgumentException("branchSession can't be null for memory/file locker."); - } - String lockKey = branchSession.getLockKey(); - if (StringUtils.isNullOrEmpty(lockKey)) { - //no lock - return true; - } - //get locks of branch - List locks = collectRowLocks(branchSession); - if (CollectionUtils.isEmpty(locks)) { - //no lock - return true; - } - return getLocker(branchSession).acquireLock(locks); - } - - @Override - public boolean releaseLock(BranchSession branchSession) throws TransactionException { - if (branchSession == null) { - throw new IllegalArgumentException("branchSession can't be null for memory/file locker."); - } - List locks = collectRowLocks(branchSession); - try { - return getLocker(branchSession).releaseLock(locks); - } catch (Exception t) { - LOGGER.error("unLock error, branchSession:{}",branchSession, t); - return false; - } - } - @Override public boolean releaseGlobalSessionLock(GlobalSession globalSession) throws TransactionException { ArrayList branchSessions = globalSession.getBranchSessions(); - String storeMode = CONFIG.getConfig(ConfigurationKeys.STORE_MODE); - if (StoreMode.DB.name().equalsIgnoreCase(storeMode)) { - List locks = new ArrayList<>(); - for (BranchSession branchSession : branchSessions) { - locks.addAll(collectRowLocks(branchSession)); + boolean releaseLockResult = true; + for (BranchSession branchSession : branchSessions) { + if (!this.releaseLock(branchSession)) { + releaseLockResult = false; } - try { - return getLocker(null).releaseLock(locks); - } catch (Exception t) { - LOGGER.error("unLock globalSession error, xid:{}", globalSession.getXid(), t); - return false; - } - } else { - boolean releaseLockResult = true; - for (BranchSession branchSession : branchSessions) { - if (!this.releaseLock(branchSession)) { - releaseLockResult = false; - } - } - return releaseLockResult; } - } - - @Override - public boolean isLockable(String xid, String resourceId, String lockKey) throws TransactionException { - List locks = collectRowLocks(lockKey, resourceId, xid); - try { - return getLocker().isLockable(locks); - } catch (Exception t) { - LOGGER.error("isLockable error, xid:{} resourceId:{}, lockKey:{}", xid,resourceId,lockKey,t); - return false; - } - } - - @Override - public void cleanAllLocks() throws TransactionException { - getLocker().cleanAllLocks(); - } - - /** - * Gets locker. - * - * @return the locker - */ - protected Locker getLocker() { - return getLocker(null); - } - - /** - * Gets locker. - * - * @param branchSession the branch session - * @return the locker - */ - protected Locker getLocker(BranchSession branchSession) { - return LockerFactory.get(branchSession); + return releaseLockResult; } } diff --git a/server/src/main/java/io/seata/server/lock/LockerFactory.java b/server/src/main/java/io/seata/server/lock/LockerFactory.java index eda7d0a965b..7f2592b2e40 100644 --- a/server/src/main/java/io/seata/server/lock/LockerFactory.java +++ b/server/src/main/java/io/seata/server/lock/LockerFactory.java @@ -15,21 +15,23 @@ */ package io.seata.server.lock; +import javax.sql.DataSource; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + import io.seata.common.loader.EnhancedServiceLoader; +import io.seata.common.util.StringUtils; import io.seata.config.Configuration; import io.seata.config.ConfigurationFactory; import io.seata.core.constants.ConfigurationKeys; import io.seata.core.lock.Locker; import io.seata.core.store.StoreMode; import io.seata.core.store.db.DataSourceGenerator; +import io.seata.server.lock.db.DataBaseLockManager; import io.seata.server.session.BranchSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.sql.DataSource; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - /** * The type Lock manager factory. * @@ -60,7 +62,7 @@ public class LockerFactory { /** * The constant lockManager. */ - protected static LockManager lockManager = new DefaultLockManager(); + protected static LockManager lockManager; /** * Get lock manager. @@ -68,6 +70,13 @@ public class LockerFactory { * @return the lock manager */ public static final LockManager getLockManager() { + if (lockManager == null) { + if (StringUtils.equalsIgnoreCase(StoreMode.DB.name(), CONFIG.getConfig(ConfigurationKeys.STORE_MODE))) { + lockManager = new DataBaseLockManager(); + } else { + lockManager = new DefaultLockManager(); + } + } return lockManager; } @@ -79,7 +88,7 @@ public static final LockManager getLockManager() { */ public static final Locker get(BranchSession branchSession) { String storeMode = CONFIG.getConfig(ConfigurationKeys.STORE_MODE); - if (StoreMode.DB.name().equalsIgnoreCase(storeMode)) { + if (StringUtils.equalsIgnoreCase(StoreMode.DB.name(), storeMode)) { if (lockerMap.get(storeMode) != null) { return lockerMap.get(storeMode); } @@ -91,7 +100,7 @@ public static final Locker get(BranchSession branchSession) { locker = EnhancedServiceLoader.load(Locker.class, storeMode, new Class[] {DataSource.class}, new Object[] {logStoreDataSource}); lockerMap.putIfAbsent(storeMode, locker); - } else if (StoreMode.FILE.name().equalsIgnoreCase(storeMode)) { + } else if (StringUtils.equalsIgnoreCase(StoreMode.FILE.name(), storeMode)) { locker = EnhancedServiceLoader.load(Locker.class, storeMode, new Class[] {BranchSession.class}, new Object[] {branchSession}); } else { diff --git a/server/src/main/java/io/seata/server/lock/db/DataBaseLockManager.java b/server/src/main/java/io/seata/server/lock/db/DataBaseLockManager.java new file mode 100644 index 00000000000..c314f923ad0 --- /dev/null +++ b/server/src/main/java/io/seata/server/lock/db/DataBaseLockManager.java @@ -0,0 +1,60 @@ +/* + * Copyright 1999-2019 Seata.io Group. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.server.lock.db; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +import io.seata.common.util.CollectionUtils; +import io.seata.core.exception.TransactionException; +import io.seata.server.lock.AbstractLockManager; +import io.seata.server.session.BranchSession; +import io.seata.server.session.GlobalSession; + +/** + * The type db lock manager. + * + * @author zjinlei + */ +public class DataBaseLockManager extends AbstractLockManager { + + @Override + public boolean releaseLock(BranchSession branchSession) throws TransactionException { + try { + return getLocker().releaseLock(branchSession.getXid(), branchSession.getBranchId()); + } catch (Exception t) { + LOGGER.error("unLock error, xid {}, branchId:{}", branchSession.getXid(), branchSession.getBranchId(), t); + return false; + } + } + + @Override + public boolean releaseGlobalSessionLock(GlobalSession globalSession) throws TransactionException { + ArrayList branchSessions = globalSession.getBranchSessions(); + if (CollectionUtils.isEmpty(branchSessions)) { + return true; + } + List branchIds = branchSessions.stream().map(BranchSession::getBranchId).collect(Collectors.toList()); + try { + return getLocker().releaseLock(globalSession.getXid(), branchIds); + } catch (Exception t) { + LOGGER.error("unLock globalSession error, xid:{} branchIds:{}", globalSession.getXid(), + CollectionUtils.toString(branchIds), t); + return false; + } + } +} diff --git a/server/src/main/java/io/seata/server/lock/db/DataBaseLocker.java b/server/src/main/java/io/seata/server/lock/db/DataBaseLocker.java index ce4c0ab00e6..d3125c71436 100644 --- a/server/src/main/java/io/seata/server/lock/db/DataBaseLocker.java +++ b/server/src/main/java/io/seata/server/lock/db/DataBaseLocker.java @@ -15,9 +15,8 @@ */ package io.seata.server.lock.db; -import java.util.List; - import javax.sql.DataSource; +import java.util.List; import io.seata.common.exception.DataAccessException; import io.seata.common.exception.StoreException; @@ -33,7 +32,6 @@ * The type Data base locker. * * @author zhangsen - * @data 2019 -05-15 */ @LoadLevel(name = "db") public class DataBaseLocker extends AbstractLocker { @@ -67,7 +65,7 @@ public boolean acquireLock(List locks) { } catch (StoreException e) { throw e; } catch (Exception t) { - LOGGER.error("AcquireLock error, locks:{}",CollectionUtils.toString(locks), t); + LOGGER.error("AcquireLock error, locks:{}", CollectionUtils.toString(locks), t); return false; } } @@ -83,7 +81,35 @@ public boolean releaseLock(List locks) { } catch (StoreException e) { throw e; } catch (Exception t) { - LOGGER.error("unLock error, locks:{}",CollectionUtils.toString(locks), t); + LOGGER.error("unLock error, locks:{}", CollectionUtils.toString(locks), t); + return false; + } + } + + @Override + public boolean releaseLock(String xid, Long branchId) { + try { + return lockStore.unLock(xid, branchId); + } catch (StoreException e) { + throw e; + } catch (Exception t) { + LOGGER.error("unLock by branchId error, xid {}, branchId:{}", xid, branchId, t); + return false; + } + } + + @Override + public boolean releaseLock(String xid, List branchIds) { + if (CollectionUtils.isEmpty(branchIds)) { + //no lock + return true; + } + try { + return lockStore.unLock(xid, branchIds); + } catch (StoreException e) { + throw e; + } catch (Exception t) { + LOGGER.error("unLock by branchIds error, xid {}, branchIds:{}", xid, CollectionUtils.toString(branchIds), t); return false; } } diff --git a/server/src/main/java/io/seata/server/store/db/DatabaseTransactionStoreManager.java b/server/src/main/java/io/seata/server/store/db/DatabaseTransactionStoreManager.java index 4f3251b9993..999599d7a45 100644 --- a/server/src/main/java/io/seata/server/store/db/DatabaseTransactionStoreManager.java +++ b/server/src/main/java/io/seata/server/store/db/DatabaseTransactionStoreManager.java @@ -263,7 +263,6 @@ private BranchSession convertBranchSession(BranchTransactionDO branchTransaction branchSession.setBranchType(BranchType.valueOf(branchTransactionDO.getBranchType())); branchSession.setResourceId(branchTransactionDO.getResourceId()); branchSession.setClientId(branchTransactionDO.getClientId()); - branchSession.setLockKey(branchTransactionDO.getLockKey()); branchSession.setResourceGroupId(branchTransactionDO.getResourceGroupId()); branchSession.setStatus(BranchStatus.get(branchTransactionDO.getStatus())); return branchSession; @@ -301,7 +300,6 @@ private BranchTransactionDO convertBranchTransactionDO(SessionStorable session) branchTransactionDO.setBranchId(branchSession.getBranchId()); branchTransactionDO.setBranchType(branchSession.getBranchType().name()); branchTransactionDO.setClientId(branchSession.getClientId()); - branchTransactionDO.setLockKey(branchSession.getLockKey()); branchTransactionDO.setResourceGroupId(branchSession.getResourceGroupId()); branchTransactionDO.setTransactionId(branchSession.getTransactionId()); branchTransactionDO.setApplicationData(branchSession.getApplicationData()); diff --git a/server/src/main/resources/db_store.sql b/server/src/main/resources/db_store.sql index 919237083ff..c98f2b29cb3 100644 --- a/server/src/main/resources/db_store.sql +++ b/server/src/main/resources/db_store.sql @@ -25,7 +25,6 @@ create table `branch_table` ( `transaction_id` bigint , `resource_group_id` varchar(32), `resource_id` varchar(256) , - `lock_key` varchar(128) , `branch_type` varchar(8) , `status` tinyint, `client_id` varchar(64), @@ -41,12 +40,13 @@ drop table if exists `lock_table`; create table `lock_table` ( `row_key` varchar(128) not null, `xid` varchar(96), - `transaction_id` long , - `branch_id` long, + `transaction_id` bigint , + `branch_id` bigint not null, `resource_id` varchar(256) , `table_name` varchar(32) , `pk` varchar(36) , `gmt_create` datetime , `gmt_modified` datetime, - primary key(`row_key`) + primary key(`row_key`), + key `idx_branch_id` (`branch_id`) );