Skip to content

Commit

Permalink
bugfix: filter repeated lock (apache#1839)
Browse files Browse the repository at this point in the history
  • Loading branch information
zjinlei authored and xingfudeshi committed Oct 31, 2019
1 parent 9dab05d commit 5730057
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,5 +109,9 @@ enum TransactionExceptionCodeProto {
*/
FailedWriteSession = 16;

/**
* FailedStore
*/
FailedStore = 17;

}
21 changes: 21 additions & 0 deletions common/src/main/java/io/seata/common/util/LambdaUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.seata.common.util;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Predicate;

/**
* The type Lambda util.
*
* @author zjinlei
* @date 2019 /10/29
*/
public class LambdaUtils {

public static <T> Predicate<T> distinctByKey(Function<? super T, Object> keyExtractor) {
Map<Object, Boolean> seen = new ConcurrentHashMap<>();
return object -> seen.putIfAbsent(keyExtractor.apply(object), Boolean.TRUE) == null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.seata.core.store.db;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
Expand All @@ -25,13 +26,12 @@
import java.util.Set;
import java.util.stream.Collectors;

import javax.sql.DataSource;

import io.seata.common.exception.DataAccessException;
import io.seata.common.exception.StoreException;
import io.seata.common.executor.Initialize;
import io.seata.common.loader.LoadLevel;
import io.seata.common.util.CollectionUtils;
import io.seata.common.util.LambdaUtils;
import io.seata.common.util.StringUtils;
import io.seata.config.Configuration;
import io.seata.config.ConfigurationFactory;
Expand Down Expand Up @@ -104,9 +104,11 @@ public boolean acquireLock(List<LockDO> lockDOs) {
Connection conn = null;
PreparedStatement ps = null;
ResultSet rs = null;
List<LockDO> unrepeatedLockDOs = null;
Set<String> dbExistedRowKeys = new HashSet<>();
boolean originalAutoCommit = true;
if (lockDOs.size() > 1) {
lockDOs = lockDOs.stream().filter(LambdaUtils.distinctByKey(LockDO::getRowKey)).collect(Collectors.toList());
}
try {
conn = logStoreDataSource.getConnection();
if (originalAutoCommit = conn.getAutoCommit()) {
Expand Down Expand Up @@ -149,6 +151,7 @@ public boolean acquireLock(List<LockDO> lockDOs) {
conn.rollback();
return false;
}
List<LockDO> unrepeatedLockDOs = null;
if (CollectionUtils.isNotEmpty(dbExistedRowKeys)) {
unrepeatedLockDOs = lockDOs.stream().filter(lockDO -> !dbExistedRowKeys.contains(lockDO.getRowKey()))
.collect(Collectors.toList());
Expand All @@ -159,7 +162,6 @@ public boolean acquireLock(List<LockDO> lockDOs) {
conn.rollback();
return true;
}

//lock
for (LockDO lockDO : unrepeatedLockDOs) {
if (!doAcquireLock(conn, lockDO)) {
Expand Down

0 comments on commit 5730057

Please sign in to comment.