From fc0c9ced86c317fafb2bffd37289d8afa56e7be8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?IT=E4=BA=91=E6=B8=85?= <33415199+lightClouds917@users.noreply.github.com> Date: Thu, 3 Sep 2020 15:25:13 +0800 Subject: [PATCH] optimize: refactor the redis lock string to hash (#3016) --- .../main/java/io/seata/common/Constants.java | 14 +- .../storage/redis/lock/RedisLocker.java | 237 +++++++++--------- .../lock/redis/RedisLockManagerTest.java | 6 +- 3 files changed, 131 insertions(+), 126 deletions(-) diff --git a/common/src/main/java/io/seata/common/Constants.java b/common/src/main/java/io/seata/common/Constants.java index 0cc11201a63..798c8855716 100644 --- a/common/src/main/java/io/seata/common/Constants.java +++ b/common/src/main/java/io/seata/common/Constants.java @@ -23,6 +23,7 @@ * @author slievrly */ public class Constants { + /** * The constant IP_PORT_SPLIT_CHAR. */ @@ -40,8 +41,15 @@ public class Constants { */ public static final String DBKEYS_SPLIT_CHAR = ","; - /** the start time of transaction */ - public static final String START_TIME = "start-time"; + /** + * The constant ROW_LOCK_KEY_SPLIT_CHAR. + */ + public static final String ROW_LOCK_KEY_SPLIT_CHAR = ";"; + + /** + * the start time of transaction + */ + public static final String START_TIME = "start-time"; /** * app name @@ -102,7 +110,7 @@ public class Constants { * default charset name */ public static final String DEFAULT_CHARSET_NAME = "UTF-8"; - + /** * default charset is utf-8 */ diff --git a/server/src/main/java/io/seata/server/storage/redis/lock/RedisLocker.java b/server/src/main/java/io/seata/server/storage/redis/lock/RedisLocker.java index 7cb8b0e3586..44936c30c55 100644 --- a/server/src/main/java/io/seata/server/storage/redis/lock/RedisLocker.java +++ b/server/src/main/java/io/seata/server/storage/redis/lock/RedisLocker.java @@ -15,23 +15,19 @@ */ package io.seata.server.storage.redis.lock; +import com.google.common.collect.Lists; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashSet; -import java.util.Iterator; import java.util.List; -import java.util.Objects; import java.util.Set; import java.util.Map; import java.util.HashMap; +import java.util.StringJoiner; import java.util.stream.Collectors; -import com.alibaba.fastjson.JSON; import io.seata.common.util.CollectionUtils; import io.seata.common.util.LambdaUtils; import io.seata.common.util.StringUtils; -import io.seata.config.ConfigurationFactory; -import io.seata.core.constants.ConfigurationKeys; import io.seata.core.lock.AbstractLocker; import io.seata.core.lock.RowLock; import io.seata.core.store.LockDO; @@ -39,117 +35,142 @@ import redis.clients.jedis.Jedis; import redis.clients.jedis.Pipeline; +import static io.seata.common.Constants.ROW_LOCK_KEY_SPLIT_CHAR; + /** + * The redis lock store operation + * * @author funkye + * @author wangzhongxiang */ public class RedisLocker extends AbstractLocker { - private static final Integer DEFAULT_QUERY_LIMIT = 100; + private static final Integer SUCCEED = 1; - private static final String DEFAULT_REDIS_SEATA_LOCK_PREFIX = "SEATA_LOCK_"; + private static final Integer FAILED = 0; - private static final String DEFAULT_REDIS_SEATA_LOCK_XID_PREFIX = "SEATA_LOCK_XID_"; + private static final String DEFAULT_REDIS_SEATA_ROW_LOCK_PREFIX = "SEATA_ROW_LOCK_"; - /** - * The query limit. - */ - private int logQueryLimit; + private static final String DEFAULT_REDIS_SEATA_GLOBAL_LOCK_PREFIX = "SEATA_GLOBAL_LOCK"; + + private static final String XID = "xid"; + + private static final String TRANSACTION_ID = "transactionId"; + + private static final String BRANCH_ID = "branchId"; + + private static final String RESOURCE_ID = "resourceId"; + + private static final String TABLE_NAME = "tableName"; + + private static final String PK = "pk"; + + private static final String ROW_KEY = "rowKey"; /** * Instantiates a new Redis locker. */ public RedisLocker() { - logQueryLimit = - ConfigurationFactory.getInstance().getInt(ConfigurationKeys.STORE_REDIS_QUERY_LIMIT, DEFAULT_QUERY_LIMIT); } @Override public boolean acquireLock(List rowLocks) { if (CollectionUtils.isEmpty(rowLocks)) { - // no lock return true; } - Set successList = new HashSet<>(); - long status = 1; + Integer status = SUCCEED; + String needLockXid = rowLocks.get(0).getXid(); + Long branchId = rowLocks.get(0).getBranchId(); + try (Jedis jedis = JedisPooledFactory.getJedisInstance()) { - List locks = convertToLockDO(rowLocks); - if (locks.size() > 1) { - locks = - locks.stream().filter(LambdaUtils.distinctByKey(LockDO::getRowKey)).collect(Collectors.toList()); + List needLockDOS = convertToLockDO(rowLocks); + if (needLockDOS.size() > 1) { + needLockDOS = needLockDOS.stream(). + filter(LambdaUtils.distinctByKey(LockDO::getRowKey)) + .collect(Collectors.toList()); } - List existedKeyList = new ArrayList<>(); - locks.forEach(lockDO -> { - existedKeyList.add(getLockKey(lockDO.getRowKey())); - }); - List lockList = jedis.mget(existedKeyList.toArray(new String[0])); - Map map = new HashMap<>(existedKeyList.size(), 1); - for (int i = 0; i < existedKeyList.size(); i++) { - String existedValue = lockList.get(i); - if (existedValue == null) { - String key = existedKeyList.get(i); - map.put(key, JSON.toJSONString(locks.get(i))); + List needLockKeys = new ArrayList<>(); + needLockDOS.forEach(lockDO -> needLockKeys.add(buildLockKey(lockDO.getRowKey()))); + + Pipeline pipeline1 = jedis.pipelined(); + needLockKeys.stream().forEachOrdered(needLockKey -> pipeline1.hget(needLockKey, XID)); + List existedLockInfos = (List) (List) pipeline1.syncAndReturnAll(); + Map needAddLock = new HashMap<>(needLockKeys.size(), 1); + + for (int i = 0; i < needLockKeys.size(); i++) { + String existedLockXid = existedLockInfos.get(i); + if (StringUtils.isEmpty(existedLockXid)) { + //If empty,we need to lock this row + needAddLock.put(needLockKeys.get(i), needLockDOS.get(i)); } else { - LockDO existed = JSON.parseObject(existedValue, LockDO.class); - if (!StringUtils.equals(existed.getXid(), locks.get(i).getXid())) { + if (!StringUtils.equals(existedLockXid, needLockXid)) { + //If not equals,means the rowkey is holding by another global transaction return false; } } } - if (map.isEmpty()) { + + if (needAddLock.isEmpty()) { return true; } Pipeline pipeline = jedis.pipelined(); List readyKeys = new ArrayList<>(); - map.forEach((key, value) -> { - pipeline.setnx(key, value); + needAddLock.forEach((key, value) -> { + pipeline.hsetnx(key, XID, value.getXid()); + pipeline.hsetnx(key, TRANSACTION_ID, value.getTransactionId().toString()); + pipeline.hsetnx(key, BRANCH_ID, value.getBranchId().toString()); + pipeline.hsetnx(key, RESOURCE_ID, value.getResourceId()); + pipeline.hsetnx(key, TABLE_NAME, value.getTableName()); + pipeline.hsetnx(key, ROW_KEY, value.getRowKey()); + pipeline.hsetnx(key, PK, value.getPk()); readyKeys.add(key); }); - List results = pipeline.syncAndReturnAll(); - for (int i = 0; i < results.size(); i++) { - Long result = (long)results.get(i); + List results = (List) (List) pipeline.syncAndReturnAll(); + List> partitions = Lists.partition(results, 7); + + String[] success = new String[partitions.size()]; + for (int i = 0; i < partitions.size(); i++) { String key = readyKeys.get(i); - if (result != 1) { - status = result; + if (partitions.get(i).contains(FAILED)) { + status = FAILED; } else { - successList.add(key); + success[0] = key; } } - if (status != 1) { - String[] rms = successList.toArray(new String[0]); - if (rms.length > 0) { - jedis.del(rms); + + //If someone has failed,all the lockkey which has been added need to be delete. + if (FAILED.equals(status)) { + if (success.length > 0) { + jedis.del(success); } return false; - } else { - try { - String xidLockKey = getXidLockKey(locks.get(0).getXid()); - jedis.lpush(xidLockKey, readyKeys.toArray(new String[0])); - } catch (Exception e) { - return false; - } - return true; } + String xidLockKey = buildXidLockKey(needLockXid); + StringJoiner lockKeysString = new StringJoiner(ROW_LOCK_KEY_SPLIT_CHAR); + needLockKeys.stream().forEach(lockKey -> lockKeysString.add(lockKey)); + jedis.hset(xidLockKey, branchId.toString(), lockKeysString.toString()); + return true; } } @Override public boolean releaseLock(List rowLocks) { if (CollectionUtils.isEmpty(rowLocks)) { - // no lock return true; } - String[] keys = new String[rowLocks.size()]; - List locks = convertToLockDO(rowLocks); - for (int i = 0; i < locks.size(); i++) { - String key = getLockKey(locks.get(i).getRowKey()); - keys[i] = key; + String currentXid = rowLocks.get(0).getXid(); + Long branchId = rowLocks.get(0).getBranchId(); + List needReleaseLocks = convertToLockDO(rowLocks); + String[] needReleaseKeys = new String[needReleaseLocks.size()]; + for (int i = 0; i < needReleaseLocks.size(); i ++) { + needReleaseKeys[i] = buildLockKey(needReleaseLocks.get(i).getRowKey()); } + try (Jedis jedis = JedisPooledFactory.getJedisInstance()) { - String xidLockKey = getXidLockKey(locks.get(0).getXid()); - Pipeline pipeline = jedis.pipelined(); - pipeline.del(keys); - Arrays.stream(keys).forEach(key -> pipeline.lrem(xidLockKey, 0, key)); - pipeline.sync(); + Pipeline pipelined = jedis.pipelined(); + pipelined.del(needReleaseKeys); + pipelined.hdel(buildXidLockKey(currentXid), branchId.toString()); + pipelined.sync(); return true; } } @@ -157,34 +178,30 @@ public boolean releaseLock(List rowLocks) { @Override public boolean releaseLock(String xid, List branchIds) { if (CollectionUtils.isEmpty(branchIds)) { - // no lock return true; } try (Jedis jedis = JedisPooledFactory.getJedisInstance()) { - String lockListKey = getXidLockKey(xid); - Set keys = lRange(jedis, lockListKey); - if (CollectionUtils.isNotEmpty(keys)) { - List delKeys = new ArrayList<>(); - List values = jedis.mget(keys.toArray(new String[0])); - for (String value : values) { - Iterator it = branchIds.iterator(); - LockDO lock = JSON.parseObject(value, LockDO.class); - while (it.hasNext()) { - Long branchId = it.next(); - if (lock != null && Objects.equals(lock.getBranchId(), branchId)) { - delKeys.add(getLockKey(lock.getRowKey())); - break; + String xidLockKey = buildXidLockKey(xid); + String[] branchIdsArray = new String[branchIds.size()]; + for (int i = 0; i < branchIds.size(); i++) { + branchIdsArray[i] = branchIds.get(i).toString(); + } + List rowKeys = jedis.hmget(xidLockKey, branchIdsArray); + + if (CollectionUtils.isNotEmpty(rowKeys)) { + Pipeline pipelined = jedis.pipelined(); + pipelined.hdel(xidLockKey, branchIdsArray); + rowKeys.stream().forEach(rowKeyStr -> { + if (StringUtils.isNotEmpty(rowKeyStr)) { + if (rowKeyStr.contains(ROW_LOCK_KEY_SPLIT_CHAR)) { + String[] keys = rowKeyStr.split(ROW_LOCK_KEY_SPLIT_CHAR); + pipelined.del(keys); + } else { + pipelined.del(rowKeyStr); } } - } - if (CollectionUtils.isNotEmpty(delKeys)) { - Pipeline pipeline = jedis.pipelined(); - pipeline.del(delKeys.toArray(new String[0])); - for (String key : delKeys) { - pipeline.lrem(lockListKey, 0, key); - } - pipeline.sync(); - } + }); + pipelined.sync(); } return true; } @@ -200,49 +217,29 @@ public boolean releaseLock(String xid, Long branchId) { @Override public boolean isLockable(List rowLocks) { if (CollectionUtils.isEmpty(rowLocks)) { - // no lock return true; } try (Jedis jedis = JedisPooledFactory.getJedisInstance()) { List locks = convertToLockDO(rowLocks); Set lockKeys = new HashSet<>(); for (LockDO rowlock : locks) { - lockKeys.add(getLockKey(rowlock.getRowKey())); + lockKeys.add(buildLockKey(rowlock.getRowKey())); } - List rowlockJsons = jedis.mget(lockKeys.toArray(new String[0])); + String xid = rowLocks.get(0).getXid(); - for (String rowlockJson : rowlockJsons) { - if (!StringUtils.isEmpty(rowlockJson)) { - LockDO lock = JSON.parseObject(rowlockJson, LockDO.class); - if (lock != null && !Objects.equals(lock.getXid(), xid)) { - return false; - } - } - } + Pipeline pipeline = jedis.pipelined(); + lockKeys.stream().forEach(key -> pipeline.hget(key, XID)); + List existedXids = (List) (List) pipeline.syncAndReturnAll(); + return existedXids.stream().allMatch(existedXid -> existedXid == null || xid.equals(existedXid)); } - return true; - } - - private Set lRange(Jedis jedis, String key) { - Set keys = new HashSet<>(); - List redisLockJson; - int start = 0; - int stop = logQueryLimit; - do { - redisLockJson = jedis.lrange(key, start, stop); - keys.addAll(redisLockJson); - start = keys.size(); - stop = start + logQueryLimit; - } while (CollectionUtils.isNotEmpty(redisLockJson)); - return keys; } - private String getXidLockKey(String xid) { - return DEFAULT_REDIS_SEATA_LOCK_XID_PREFIX + xid; + private String buildXidLockKey(String xid) { + return DEFAULT_REDIS_SEATA_GLOBAL_LOCK_PREFIX + xid; } - private String getLockKey(String rowKey) { - return DEFAULT_REDIS_SEATA_LOCK_PREFIX + rowKey; + private String buildLockKey(String rowKey) { + return DEFAULT_REDIS_SEATA_ROW_LOCK_PREFIX + rowKey; } } diff --git a/server/src/test/java/io/seata/server/lock/redis/RedisLockManagerTest.java b/server/src/test/java/io/seata/server/lock/redis/RedisLockManagerTest.java index 95304b59d18..2a75573062c 100644 --- a/server/src/test/java/io/seata/server/lock/redis/RedisLockManagerTest.java +++ b/server/src/test/java/io/seata/server/lock/redis/RedisLockManagerTest.java @@ -16,6 +16,7 @@ package io.seata.server.lock.redis; +import io.seata.server.storage.redis.lock.RedisLockManager; import java.io.IOException; import org.junit.jupiter.api.AfterAll; @@ -29,7 +30,6 @@ import io.seata.core.lock.Locker; import io.seata.server.lock.LockManager; import io.seata.server.session.BranchSession; -import io.seata.server.storage.file.lock.FileLockManager; import io.seata.server.storage.redis.JedisPooledFactory; import io.seata.server.storage.redis.lock.RedisLocker; import redis.clients.jedis.JedisPool; @@ -72,7 +72,7 @@ public void unLock() throws TransactionException { branchSession.setBranchId(204565); branchSession.setResourceId("abcss"); branchSession.setLockKey("t1:3,4;t2:4,5"); - Assertions.assertTrue(lockManager.acquireLock(branchSession)); + Assertions.assertTrue(lockManager.releaseLock(branchSession)); } @Test @@ -100,7 +100,7 @@ public static void after() { server = null; } - public static class RedisLockManagerForTest extends FileLockManager { + public static class RedisLockManagerForTest extends RedisLockManager { public RedisLockManagerForTest() {}