Skip to content

Commit

Permalink
optimize: refactor the redis lock string to hash (apache#3016)
Browse files Browse the repository at this point in the history
  • Loading branch information
lightClouds917 committed Sep 3, 2020
1 parent afd9eaa commit fc0c9ce
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 126 deletions.
14 changes: 11 additions & 3 deletions common/src/main/java/io/seata/common/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
* @author slievrly
*/
public class Constants {

/**
* The constant IP_PORT_SPLIT_CHAR.
*/
Expand All @@ -40,8 +41,15 @@ public class Constants {
*/
public static final String DBKEYS_SPLIT_CHAR = ",";

/** the start time of transaction */
public static final String START_TIME = "start-time";
/**
* The constant ROW_LOCK_KEY_SPLIT_CHAR.
*/
public static final String ROW_LOCK_KEY_SPLIT_CHAR = ";";

/**
* the start time of transaction
*/
public static final String START_TIME = "start-time";

/**
* app name
Expand Down Expand Up @@ -102,7 +110,7 @@ public class Constants {
* default charset name
*/
public static final String DEFAULT_CHARSET_NAME = "UTF-8";

/**
* default charset is utf-8
*/
Expand Down
237 changes: 117 additions & 120 deletions server/src/main/java/io/seata/server/storage/redis/lock/RedisLocker.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,176 +15,193 @@
*/
package io.seata.server.storage.redis.lock;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.Map;
import java.util.HashMap;
import java.util.StringJoiner;
import java.util.stream.Collectors;

import com.alibaba.fastjson.JSON;
import io.seata.common.util.CollectionUtils;
import io.seata.common.util.LambdaUtils;
import io.seata.common.util.StringUtils;
import io.seata.config.ConfigurationFactory;
import io.seata.core.constants.ConfigurationKeys;
import io.seata.core.lock.AbstractLocker;
import io.seata.core.lock.RowLock;
import io.seata.core.store.LockDO;
import io.seata.server.storage.redis.JedisPooledFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;

import static io.seata.common.Constants.ROW_LOCK_KEY_SPLIT_CHAR;

/**
* The redis lock store operation
*
* @author funkye
* @author wangzhongxiang
*/
public class RedisLocker extends AbstractLocker {

private static final Integer DEFAULT_QUERY_LIMIT = 100;
private static final Integer SUCCEED = 1;

private static final String DEFAULT_REDIS_SEATA_LOCK_PREFIX = "SEATA_LOCK_";
private static final Integer FAILED = 0;

private static final String DEFAULT_REDIS_SEATA_LOCK_XID_PREFIX = "SEATA_LOCK_XID_";
private static final String DEFAULT_REDIS_SEATA_ROW_LOCK_PREFIX = "SEATA_ROW_LOCK_";

/**
* The query limit.
*/
private int logQueryLimit;
private static final String DEFAULT_REDIS_SEATA_GLOBAL_LOCK_PREFIX = "SEATA_GLOBAL_LOCK";

private static final String XID = "xid";

private static final String TRANSACTION_ID = "transactionId";

private static final String BRANCH_ID = "branchId";

private static final String RESOURCE_ID = "resourceId";

private static final String TABLE_NAME = "tableName";

private static final String PK = "pk";

private static final String ROW_KEY = "rowKey";

/**
* Instantiates a new Redis locker.
*/
public RedisLocker() {
logQueryLimit =
ConfigurationFactory.getInstance().getInt(ConfigurationKeys.STORE_REDIS_QUERY_LIMIT, DEFAULT_QUERY_LIMIT);
}

@Override
public boolean acquireLock(List<RowLock> rowLocks) {
if (CollectionUtils.isEmpty(rowLocks)) {
// no lock
return true;
}
Set<String> successList = new HashSet<>();
long status = 1;
Integer status = SUCCEED;
String needLockXid = rowLocks.get(0).getXid();
Long branchId = rowLocks.get(0).getBranchId();

try (Jedis jedis = JedisPooledFactory.getJedisInstance()) {
List<LockDO> locks = convertToLockDO(rowLocks);
if (locks.size() > 1) {
locks =
locks.stream().filter(LambdaUtils.distinctByKey(LockDO::getRowKey)).collect(Collectors.toList());
List<LockDO> needLockDOS = convertToLockDO(rowLocks);
if (needLockDOS.size() > 1) {
needLockDOS = needLockDOS.stream().
filter(LambdaUtils.distinctByKey(LockDO::getRowKey))
.collect(Collectors.toList());
}
List<String> existedKeyList = new ArrayList<>();
locks.forEach(lockDO -> {
existedKeyList.add(getLockKey(lockDO.getRowKey()));
});
List<String> lockList = jedis.mget(existedKeyList.toArray(new String[0]));
Map<String, String> map = new HashMap<>(existedKeyList.size(), 1);
for (int i = 0; i < existedKeyList.size(); i++) {
String existedValue = lockList.get(i);
if (existedValue == null) {
String key = existedKeyList.get(i);
map.put(key, JSON.toJSONString(locks.get(i)));
List<String> needLockKeys = new ArrayList<>();
needLockDOS.forEach(lockDO -> needLockKeys.add(buildLockKey(lockDO.getRowKey())));

Pipeline pipeline1 = jedis.pipelined();
needLockKeys.stream().forEachOrdered(needLockKey -> pipeline1.hget(needLockKey, XID));
List<String> existedLockInfos = (List<String>) (List) pipeline1.syncAndReturnAll();
Map<String, LockDO> needAddLock = new HashMap<>(needLockKeys.size(), 1);

for (int i = 0; i < needLockKeys.size(); i++) {
String existedLockXid = existedLockInfos.get(i);
if (StringUtils.isEmpty(existedLockXid)) {
//If empty,we need to lock this row
needAddLock.put(needLockKeys.get(i), needLockDOS.get(i));
} else {
LockDO existed = JSON.parseObject(existedValue, LockDO.class);
if (!StringUtils.equals(existed.getXid(), locks.get(i).getXid())) {
if (!StringUtils.equals(existedLockXid, needLockXid)) {
//If not equals,means the rowkey is holding by another global transaction
return false;
}
}
}
if (map.isEmpty()) {

if (needAddLock.isEmpty()) {
return true;
}
Pipeline pipeline = jedis.pipelined();
List<String> readyKeys = new ArrayList<>();
map.forEach((key, value) -> {
pipeline.setnx(key, value);
needAddLock.forEach((key, value) -> {
pipeline.hsetnx(key, XID, value.getXid());
pipeline.hsetnx(key, TRANSACTION_ID, value.getTransactionId().toString());
pipeline.hsetnx(key, BRANCH_ID, value.getBranchId().toString());
pipeline.hsetnx(key, RESOURCE_ID, value.getResourceId());
pipeline.hsetnx(key, TABLE_NAME, value.getTableName());
pipeline.hsetnx(key, ROW_KEY, value.getRowKey());
pipeline.hsetnx(key, PK, value.getPk());
readyKeys.add(key);
});
List<Object> results = pipeline.syncAndReturnAll();
for (int i = 0; i < results.size(); i++) {
Long result = (long)results.get(i);
List<Integer> results = (List<Integer>) (List) pipeline.syncAndReturnAll();
List<List<Integer>> partitions = Lists.partition(results, 7);

String[] success = new String[partitions.size()];
for (int i = 0; i < partitions.size(); i++) {
String key = readyKeys.get(i);
if (result != 1) {
status = result;
if (partitions.get(i).contains(FAILED)) {
status = FAILED;
} else {
successList.add(key);
success[0] = key;
}
}
if (status != 1) {
String[] rms = successList.toArray(new String[0]);
if (rms.length > 0) {
jedis.del(rms);

//If someone has failed,all the lockkey which has been added need to be delete.
if (FAILED.equals(status)) {
if (success.length > 0) {
jedis.del(success);
}
return false;
} else {
try {
String xidLockKey = getXidLockKey(locks.get(0).getXid());
jedis.lpush(xidLockKey, readyKeys.toArray(new String[0]));
} catch (Exception e) {
return false;
}
return true;
}
String xidLockKey = buildXidLockKey(needLockXid);
StringJoiner lockKeysString = new StringJoiner(ROW_LOCK_KEY_SPLIT_CHAR);
needLockKeys.stream().forEach(lockKey -> lockKeysString.add(lockKey));
jedis.hset(xidLockKey, branchId.toString(), lockKeysString.toString());
return true;
}
}

@Override
public boolean releaseLock(List<RowLock> rowLocks) {
if (CollectionUtils.isEmpty(rowLocks)) {
// no lock
return true;
}
String[] keys = new String[rowLocks.size()];
List<LockDO> locks = convertToLockDO(rowLocks);
for (int i = 0; i < locks.size(); i++) {
String key = getLockKey(locks.get(i).getRowKey());
keys[i] = key;
String currentXid = rowLocks.get(0).getXid();
Long branchId = rowLocks.get(0).getBranchId();
List<LockDO> needReleaseLocks = convertToLockDO(rowLocks);
String[] needReleaseKeys = new String[needReleaseLocks.size()];
for (int i = 0; i < needReleaseLocks.size(); i ++) {
needReleaseKeys[i] = buildLockKey(needReleaseLocks.get(i).getRowKey());
}

try (Jedis jedis = JedisPooledFactory.getJedisInstance()) {
String xidLockKey = getXidLockKey(locks.get(0).getXid());
Pipeline pipeline = jedis.pipelined();
pipeline.del(keys);
Arrays.stream(keys).forEach(key -> pipeline.lrem(xidLockKey, 0, key));
pipeline.sync();
Pipeline pipelined = jedis.pipelined();
pipelined.del(needReleaseKeys);
pipelined.hdel(buildXidLockKey(currentXid), branchId.toString());
pipelined.sync();
return true;
}
}

@Override
public boolean releaseLock(String xid, List<Long> branchIds) {
if (CollectionUtils.isEmpty(branchIds)) {
// no lock
return true;
}
try (Jedis jedis = JedisPooledFactory.getJedisInstance()) {
String lockListKey = getXidLockKey(xid);
Set<String> keys = lRange(jedis, lockListKey);
if (CollectionUtils.isNotEmpty(keys)) {
List<String> delKeys = new ArrayList<>();
List<String> values = jedis.mget(keys.toArray(new String[0]));
for (String value : values) {
Iterator<Long> it = branchIds.iterator();
LockDO lock = JSON.parseObject(value, LockDO.class);
while (it.hasNext()) {
Long branchId = it.next();
if (lock != null && Objects.equals(lock.getBranchId(), branchId)) {
delKeys.add(getLockKey(lock.getRowKey()));
break;
String xidLockKey = buildXidLockKey(xid);
String[] branchIdsArray = new String[branchIds.size()];
for (int i = 0; i < branchIds.size(); i++) {
branchIdsArray[i] = branchIds.get(i).toString();
}
List<String> rowKeys = jedis.hmget(xidLockKey, branchIdsArray);

if (CollectionUtils.isNotEmpty(rowKeys)) {
Pipeline pipelined = jedis.pipelined();
pipelined.hdel(xidLockKey, branchIdsArray);
rowKeys.stream().forEach(rowKeyStr -> {
if (StringUtils.isNotEmpty(rowKeyStr)) {
if (rowKeyStr.contains(ROW_LOCK_KEY_SPLIT_CHAR)) {
String[] keys = rowKeyStr.split(ROW_LOCK_KEY_SPLIT_CHAR);
pipelined.del(keys);
} else {
pipelined.del(rowKeyStr);
}
}
}
if (CollectionUtils.isNotEmpty(delKeys)) {
Pipeline pipeline = jedis.pipelined();
pipeline.del(delKeys.toArray(new String[0]));
for (String key : delKeys) {
pipeline.lrem(lockListKey, 0, key);
}
pipeline.sync();
}
});
pipelined.sync();
}
return true;
}
Expand All @@ -200,49 +217,29 @@ public boolean releaseLock(String xid, Long branchId) {
@Override
public boolean isLockable(List<RowLock> rowLocks) {
if (CollectionUtils.isEmpty(rowLocks)) {
// no lock
return true;
}
try (Jedis jedis = JedisPooledFactory.getJedisInstance()) {
List<LockDO> locks = convertToLockDO(rowLocks);
Set<String> lockKeys = new HashSet<>();
for (LockDO rowlock : locks) {
lockKeys.add(getLockKey(rowlock.getRowKey()));
lockKeys.add(buildLockKey(rowlock.getRowKey()));
}
List<String> rowlockJsons = jedis.mget(lockKeys.toArray(new String[0]));

String xid = rowLocks.get(0).getXid();
for (String rowlockJson : rowlockJsons) {
if (!StringUtils.isEmpty(rowlockJson)) {
LockDO lock = JSON.parseObject(rowlockJson, LockDO.class);
if (lock != null && !Objects.equals(lock.getXid(), xid)) {
return false;
}
}
}
Pipeline pipeline = jedis.pipelined();
lockKeys.stream().forEach(key -> pipeline.hget(key, XID));
List<String> existedXids = (List<String>) (List) pipeline.syncAndReturnAll();
return existedXids.stream().allMatch(existedXid -> existedXid == null || xid.equals(existedXid));
}
return true;
}

private Set<String> lRange(Jedis jedis, String key) {
Set<String> keys = new HashSet<>();
List<String> redisLockJson;
int start = 0;
int stop = logQueryLimit;
do {
redisLockJson = jedis.lrange(key, start, stop);
keys.addAll(redisLockJson);
start = keys.size();
stop = start + logQueryLimit;
} while (CollectionUtils.isNotEmpty(redisLockJson));
return keys;
}

private String getXidLockKey(String xid) {
return DEFAULT_REDIS_SEATA_LOCK_XID_PREFIX + xid;
private String buildXidLockKey(String xid) {
return DEFAULT_REDIS_SEATA_GLOBAL_LOCK_PREFIX + xid;
}

private String getLockKey(String rowKey) {
return DEFAULT_REDIS_SEATA_LOCK_PREFIX + rowKey;
private String buildLockKey(String rowKey) {
return DEFAULT_REDIS_SEATA_ROW_LOCK_PREFIX + rowKey;
}

}
Loading

0 comments on commit fc0c9ce

Please sign in to comment.