Skip to content

Commit

Permalink
feature: support redis lua for transaction storage & global lock (apa…
Browse files Browse the repository at this point in the history
  • Loading branch information
conghuhu committed Feb 12, 2023
1 parent 3097fd6 commit 51f6a09
Show file tree
Hide file tree
Showing 48 changed files with 2,682 additions and 472 deletions.
1 change: 1 addition & 0 deletions changes/en-us/2.0.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ The version is updated as follows:
- [[#4880](https://github.com/seata/seata/pull/4880)] optimize logs when commit/rollback catch an exception
- [[#5322](https://github.com/seata/seata/pull/5322)] optimize the log of SPI
- [[#5323](https://github.com/seata/seata/pull/5323)] add time info for global transaction timeout log
- [[#5328](https://github.com/seata/seata/pull/5333)] add corresponding lua implementation for Redis mode of global transaction and transaction storage

### test:
- [[#5308](https://github.com/seata/seata/pull/5308)] add unit test [FileLoader, ObjectHolder, StringUtils]
Expand Down
1 change: 1 addition & 0 deletions changes/zh-cn/2.0.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ Seata 是一款开源的分布式事务解决方案,提供高性能和简单
- [[#4880](https://github.com/seata/seata/pull/4880)] 优化提交和回滚遇到异常时的日志输出
- [[#5322](https://github.com/seata/seata/pull/5322)] 优化SPI加载日志
- [[#5323](https://github.com/seata/seata/pull/5323)] 为全局事务超时日志添加时间信息
- [[#5328](https://github.com/seata/seata/pull/5333)] 为全局事务和事务存储的Redis模式,增加对应的lua实现

### security:
- [[#5172](https://github.com/seata/seata/pull/5172)] 修复一些安全漏洞的版本
Expand Down
5 changes: 5 additions & 0 deletions common/src/main/java/io/seata/common/ConfigurationKeys.java
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,11 @@ public interface ConfigurationKeys {
*/
String STORE_REDIS_MODE = STORE_REDIS_PREFIX + "mode";

/**
* The constant STORE_REDIS_TYPE. lua pipeline
*/
String STORE_REDIS_TYPE = STORE_REDIS_PREFIX + "type";

/**
* The constant STORE_REDIS_HOST.
*/
Expand Down
7 changes: 6 additions & 1 deletion common/src/main/java/io/seata/common/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -171,4 +171,9 @@ public interface Constants {
*/
String COMPENSATION_METHOD = "sys::compensation";

}
/**
* phase STORE_REDIS_TYPE_PIPELINE
*/
String STORE_REDIS_TYPE_PIPELINE = "pipeline";

}
5 changes: 5 additions & 0 deletions core/src/main/java/io/seata/core/lock/AbstractLocker.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ public void cleanAllLocks() {

}

@Override
public boolean releaseLock(List<RowLock> rowLock) {
return false;
}

@Override
public boolean releaseLock(String xid, Long branchId) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public void testGetPkValues_NotSupportYetException() {
}

private void mockParameters_with_null_and_insertRows_with_placeholder_null() {
Map<Integer,ArrayList<Object>> paramters = new HashMap<>(5);
Map<Integer,ArrayList<Object>> paramters = new HashMap<>();
ArrayList arrayList0 = new ArrayList<>();
arrayList0.add("userId1");
ArrayList arrayList1 = new ArrayList<>();
Expand All @@ -214,7 +214,7 @@ private void mockParameters_with_null_and_insertRows_with_placeholder_null() {
}

private void mockParameters_with_number_and_insertRows_with_placeholde_null() {
Map<Integer,ArrayList<Object>> paramters = new HashMap<>(5);
Map<Integer,ArrayList<Object>> paramters = new HashMap<>();
ArrayList arrayList0 = new ArrayList<>();
arrayList0.add("userId1");
ArrayList arrayList1 = new ArrayList<>();
Expand Down
1 change: 1 addition & 0 deletions script/config-center/config.txt
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ store.db.maxWait=5000

#These configurations are required if the `store mode` is `redis`. If `store.mode,store.lock.mode,store.session.mode` are not equal to `redis`, you can remove the configuration block.
store.redis.mode=single
store.redis.type=pipeline
store.redis.single.host=127.0.0.1
store.redis.single.port=6379
store.redis.sentinel.masterName=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ public class StoreRedisProperties {
* single, sentinel
*/
private String mode = "single";
private String password;
private String type = "pipeline";
private Integer maxConn = DEFAULT_REDIS_MAX_IDLE;
private String password = null;
private Integer minConn = DEFAULT_REDIS_MIN_IDLE;
private Integer database = 0;
private Integer queryLimit = DEFAULT_QUERY_LIMIT;
Expand All @@ -51,6 +52,15 @@ public StoreRedisProperties setMode(String mode) {
return this;
}

public String getType() {
return type;
}

public StoreRedisProperties setType(String type) {
this.type = type;
return this;
}

public String getPassword() {
return password;
}
Expand Down Expand Up @@ -105,7 +115,6 @@ public StoreRedisProperties setMaxTotal(Integer maxTotal) {
return this;
}


@Component
@ConfigurationProperties(prefix = STORE_REDIS_SINGLE_PREFIX)
public static class Single {
Expand All @@ -131,8 +140,6 @@ public Single setPort(Integer port) {
}
}



@Component
@ConfigurationProperties(prefix = STORE_REDIS_SENTINEL_PREFIX)
public static class Sentinel {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.seata.core.store.BranchTransactionDO;
import io.seata.server.console.service.BranchSessionService;
import io.seata.server.storage.redis.store.RedisTransactionStoreManager;
import io.seata.server.storage.redis.store.RedisTransactionStoreManagerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
Expand All @@ -47,7 +48,7 @@ public PageResult<BranchSessionVO> queryByXid(String xid) {

List<BranchSessionVO> branchSessionVos = new ArrayList<>();

RedisTransactionStoreManager instance = RedisTransactionStoreManager.getInstance();
RedisTransactionStoreManager instance = RedisTransactionStoreManagerFactory.getInstance();

List<BranchTransactionDO> branchSessionDos = instance.findBranchSessionByXid(xid);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.seata.server.session.GlobalSession;
import io.seata.server.session.SessionCondition;
import io.seata.server.storage.redis.store.RedisTransactionStoreManager;
import io.seata.server.storage.redis.store.RedisTransactionStoreManagerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
Expand Down Expand Up @@ -60,7 +61,7 @@ public PageResult<GlobalSessionVO> query(GlobalSessionParam param) {
}
List<GlobalSession> globalSessions = new ArrayList<>();

RedisTransactionStoreManager instance = RedisTransactionStoreManager.getInstance();
RedisTransactionStoreManager instance = RedisTransactionStoreManagerFactory.getInstance();

checkPage(param);

Expand Down
152 changes: 152 additions & 0 deletions server/src/main/java/io/seata/server/storage/redis/LuaParser.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.server.storage.redis;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import io.seata.common.exception.StoreException;
import io.seata.common.io.FileLoader;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisDataException;

/**
* lua related utils
*
* @author conghuhu
*/
public class LuaParser {

private static final String WHITE_SPACE = " ";

private static final String ANNOTATION_LUA = "--";

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

public final static class LuaResult implements Serializable {
private static final long serialVersionUID = -4160065043902060730L;
private Boolean success;
private String status;
private String data;

public Boolean getSuccess() {
return success;
}

public void setSuccess(Boolean success) {
this.success = success;
}

public String getStatus() {
return status;
}

public void setStatus(String status) {
this.status = status;
}

public String getData() {
return data;
}

public void setData(String data) {
this.data = data;
}

@Override public String toString() {
return "LuaResult{" +
"success=" + success +
", type='" + status + '\'' +
", data='" + data + '\'' +
'}';
}
}

public final static class LuaErrorStatus {

public static final String ANOTHER_ROLLBACKING = "AnotherRollbackIng";

public static final String ANOTHER_HOLDING = "AnotherHoldIng";

public static final String XID_NOT_EXISTED = "NotExisted";

public static final String ILLEGAL_CHANGE_STATUS = "ChangeStatusFail";
}

/**
* get lua string from lua file.
*
* @param fileName
* @return
* @throws IOException
*/
public static Map<String, String> getEvalShaMapFromFile(String fileName) throws IOException {
File luaFile = FileLoader.load(fileName);
if (luaFile == null) {
throw new IOException("no lua file: " + fileName);
}
StringBuilder luaByFile = new StringBuilder();
try (FileInputStream fis = new FileInputStream(luaFile)) {
BufferedReader br = new BufferedReader(new InputStreamReader(fis));
String line;
while ((line = br.readLine()) != null) {
if (line.trim().startsWith(ANNOTATION_LUA)) {
continue;
}
luaByFile.append(line);
luaByFile.append(WHITE_SPACE);
}
} catch (IOException e) {
throw new IOException(e);
}
Map<String, String> resultMap = new ConcurrentHashMap<>(1);
try (Jedis jedis = JedisPooledFactory.getJedisInstance()) {
resultMap.put(fileName, jedis.scriptLoad(luaByFile.toString()));
return resultMap;
} catch (UnsupportedOperationException | JedisDataException e) {
throw new IOException(e);
}
}

public static <T> T getObjectFromJson(String json, Class<T> classz) {
try {
return OBJECT_MAPPER.readValue(json, classz);
} catch (JsonProcessingException e) {
throw new StoreException(e.getMessage());
}
}

public static <T> List<T> getListFromJson(String json, Class<T> classz) {
ObjectMapper objectMapper = new ObjectMapper();
try {
return objectMapper.readValue(json, new TypeReference<List<T>>() {
});
} catch (JsonProcessingException e) {
throw new StoreException(e.getMessage());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class RedisLockManager extends AbstractLockManager implements Initialize

@Override
public void init() {
locker = new RedisLocker();
locker = RedisLockerFactory.getLocker();
}

@Override
Expand Down
Loading

0 comments on commit 51f6a09

Please sign in to comment.