Commit 80ea72a

bugfix: fix AsyncWorker potential OOM problem (apache#3258)
selfishlover committed Dec 21, 2020
1 parent de56e17 commit 80ea72a
Showing 3 changed files with 208 additions and 186 deletions.
271 changes: 125 additions & 146 deletions rm-datasource/src/main/java/io/seata/rm/datasource/AsyncWorker.java

@@ -17,28 +17,25 @@
 import java.sql.Connection;
 import java.sql.SQLException;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedHashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
-import io.seata.common.exception.NotSupportYetException;
-import io.seata.common.exception.ShouldNeverHappenException;
+import com.google.common.collect.Lists;
 import io.seata.common.thread.NamedThreadFactory;
-import io.seata.common.util.CollectionUtils;
 import io.seata.config.ConfigurationFactory;
-import io.seata.core.exception.TransactionException;
 import io.seata.core.model.BranchStatus;
-import io.seata.core.model.BranchType;
-import io.seata.core.model.ResourceManagerInbound;
-import io.seata.rm.DefaultResourceManager;
+import io.seata.rm.datasource.undo.UndoLogManager;
 import io.seata.rm.datasource.undo.UndoLogManagerFactory;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,178 +48,160 @@
  *
  * @author sharajava
  */
-public class AsyncWorker implements ResourceManagerInbound {
+public class AsyncWorker {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(AsyncWorker.class);
 
     private static final int DEFAULT_RESOURCE_SIZE = 16;
 
     private static final int UNDOLOG_DELETE_LIMIT_SIZE = 1000;
 
-    private static class Phase2Context {
-
-        /**
-         * Instantiates a new Phase 2 context.
-         *
-         * @param branchType the branchType
-         * @param xid the xid
-         * @param branchId the branch id
-         * @param resourceId the resource id
-         * @param applicationData the application data
-         */
-        public Phase2Context(BranchType branchType, String xid, long branchId, String resourceId,
-                             String applicationData) {
-            this.xid = xid;
-            this.branchId = branchId;
-            this.resourceId = resourceId;
-            this.applicationData = applicationData;
-            this.branchType = branchType;
-        }
-
-        /**
-         * The Xid.
-         */
-        String xid;
-        /**
-         * The Branch id.
-         */
-        long branchId;
-        /**
-         * The Resource id.
-         */
-        String resourceId;
-        /**
-         * The Application data.
-         */
-        String applicationData;
-
-        /**
-         * the branch Type
-         */
-        BranchType branchType;
-    }
-
-    private static int ASYNC_COMMIT_BUFFER_LIMIT = ConfigurationFactory.getInstance().getInt(
+    private static final int ASYNC_COMMIT_BUFFER_LIMIT = ConfigurationFactory.getInstance().getInt(
         CLIENT_ASYNC_COMMIT_BUFFER_LIMIT, DEFAULT_CLIENT_ASYNC_COMMIT_BUFFER_LIMIT);
 
-    private static final BlockingQueue<Phase2Context> ASYNC_COMMIT_BUFFER = new LinkedBlockingQueue<>(
-        ASYNC_COMMIT_BUFFER_LIMIT);
+    private final DataSourceManager dataSourceManager;
+
+    private final BlockingQueue<Phase2Context> commitQueue;
+
+    private final ScheduledExecutorService scheduledExecutor;
+
+    public AsyncWorker(DataSourceManager dataSourceManager) {
+        this.dataSourceManager = dataSourceManager;
+
+        LOGGER.info("Async Commit Buffer Limit: {}", ASYNC_COMMIT_BUFFER_LIMIT);
+        commitQueue = new LinkedBlockingQueue<>(ASYNC_COMMIT_BUFFER_LIMIT);
+
+        ThreadFactory threadFactory = new NamedThreadFactory("AsyncWorker", 2, true);
+        scheduledExecutor = new ScheduledThreadPoolExecutor(2, threadFactory);
+        scheduledExecutor.scheduleAtFixedRate(this::doBranchCommitSafely, 10, 1000, TimeUnit.MILLISECONDS);
+    }
 
-    @Override
-    public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
-                                     String applicationData) throws TransactionException {
-        if (!ASYNC_COMMIT_BUFFER.offer(new Phase2Context(branchType, xid, branchId, resourceId, applicationData))) {
-            LOGGER.warn("Async commit buffer is FULL. Rejected branch [{}/{}] will be handled by housekeeping later.", branchId, xid);
-        }
+    public BranchStatus branchCommit(String xid, long branchId, String resourceId) {
+        Phase2Context context = new Phase2Context(xid, branchId, resourceId);
+        addToCommitQueue(context);
         return BranchStatus.PhaseTwo_Committed;
     }
 
     /**
-     * Init.
+     * try add context to commitQueue directly, if fail(which means the queue is full),
+     * then doBranchCommit urgently(so that the queue could be empty again) and retry this process.
      */
-    public synchronized void init() {
-        LOGGER.info("Async Commit Buffer Limit: {}", ASYNC_COMMIT_BUFFER_LIMIT);
-        ScheduledExecutorService timerExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("AsyncWorker", 1, true));
-        timerExecutor.scheduleAtFixedRate(() -> {
-            try {
-                doBranchCommits();
-            } catch (Throwable e) {
-                LOGGER.info("Failed at async committing ... {}", e.getMessage());
-            }
-        }, 10, 1000 * 1, TimeUnit.MILLISECONDS);
+    private void addToCommitQueue(Phase2Context context) {
+        if (commitQueue.offer(context)) {
+            return;
+        }
+        CompletableFuture.runAsync(this::doBranchCommitSafely, scheduledExecutor)
+            .thenRun(() -> addToCommitQueue(context));
+    }
+
+    void doBranchCommitSafely() {
+        try {
+            doBranchCommit();
+        } catch (Throwable e) {
+            LOGGER.error("Exception occur when doing branch commit", e);
+        }
     }
 
-    private void doBranchCommits() {
-        if (ASYNC_COMMIT_BUFFER.isEmpty()) {
+    private void doBranchCommit() {
+        if (commitQueue.isEmpty()) {
             return;
         }
 
-        Map<String, List<Phase2Context>> mappedContexts = new HashMap<>(DEFAULT_RESOURCE_SIZE);
-        List<Phase2Context> contextsGroupedByResourceId;
-        while (!ASYNC_COMMIT_BUFFER.isEmpty()) {
-            Phase2Context commitContext = ASYNC_COMMIT_BUFFER.poll();
-            contextsGroupedByResourceId = CollectionUtils.computeIfAbsent(mappedContexts, commitContext.resourceId, key -> new ArrayList<>());
-            contextsGroupedByResourceId.add(commitContext);
-        }
-
-        for (Map.Entry<String, List<Phase2Context>> entry : mappedContexts.entrySet()) {
-            Connection conn = null;
-            DataSourceProxy dataSourceProxy;
-            try {
-                try {
-                    DataSourceManager resourceManager = (DataSourceManager) DefaultResourceManager.get()
-                        .getResourceManager(BranchType.AT);
-                    dataSourceProxy = resourceManager.get(entry.getKey());
-                    if (dataSourceProxy == null) {
-                        throw new ShouldNeverHappenException("Failed to find resource on " + entry.getKey());
-                    }
-                    conn = dataSourceProxy.getPlainConnection();
-                } catch (SQLException sqle) {
-                    LOGGER.warn("Failed to get connection for async committing on " + entry.getKey(), sqle);
-                    continue;
-                }
-                contextsGroupedByResourceId = entry.getValue();
-                Set<String> xids = new LinkedHashSet<>(UNDOLOG_DELETE_LIMIT_SIZE);
-                Set<Long> branchIds = new LinkedHashSet<>(UNDOLOG_DELETE_LIMIT_SIZE);
-                for (Phase2Context commitContext : contextsGroupedByResourceId) {
-                    xids.add(commitContext.xid);
-                    branchIds.add(commitContext.branchId);
-                    int maxSize = Math.max(xids.size(), branchIds.size());
-                    if (maxSize == UNDOLOG_DELETE_LIMIT_SIZE) {
-                        try {
-                            UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).batchDeleteUndoLog(
-                                xids, branchIds, conn);
-                        } catch (Exception ex) {
-                            LOGGER.warn("Failed to batch delete undo log [" + branchIds + "/" + xids + "]", ex);
-                        }
-                        xids.clear();
-                        branchIds.clear();
-                    }
-                }
-
-                if (CollectionUtils.isEmpty(xids) || CollectionUtils.isEmpty(branchIds)) {
-                    return;
-                }
-
-                try {
-                    UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).batchDeleteUndoLog(xids,
-                        branchIds, conn);
-                } catch (Exception ex) {
-                    LOGGER.warn("Failed to batch delete undo log [" + branchIds + "/" + xids + "]", ex);
-                }
-
-                if (!conn.getAutoCommit()) {
-                    conn.commit();
-                }
-            } catch (Throwable e) {
-                LOGGER.error(e.getMessage(), e);
-                try {
-                    if (conn != null) {
-                        conn.rollback();
-                    }
-                } catch (SQLException rollbackEx) {
-                    LOGGER.warn("Failed to rollback JDBC resource while deleting undo_log ", rollbackEx);
-                }
-            } finally {
-                if (conn != null) {
-                    try {
-                        conn.close();
-                    } catch (SQLException closeEx) {
-                        LOGGER.warn("Failed to close JDBC resource while deleting undo_log ", closeEx);
-                    }
-                }
-            }
-        }
-    }
-
-    @Override
-    public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
-                                       String applicationData) throws TransactionException {
-        throw new NotSupportYetException();
+        // transfer all context currently received to this list
+        List<Phase2Context> allContexts = new LinkedList<>();
+        commitQueue.drainTo(allContexts);
+
+        // group context by their resourceId
+        Map<String, List<Phase2Context>> groupedContexts = groupedByResourceId(allContexts);
+
+        groupedContexts.forEach(this::dealWithGroupedContexts);
+    }
+
+    Map<String, List<Phase2Context>> groupedByResourceId(List<Phase2Context> contexts) {
+        Map<String, List<Phase2Context>> groupedContexts = new HashMap<>(DEFAULT_RESOURCE_SIZE);
+        contexts.forEach(context -> {
+            List<Phase2Context> group = groupedContexts.computeIfAbsent(context.resourceId, key -> new LinkedList<>());
+            group.add(context);
+        });
+        return groupedContexts;
+    }
+
+    private void dealWithGroupedContexts(String resourceId, List<Phase2Context> contexts) {
+        DataSourceProxy dataSourceProxy = dataSourceManager.get(resourceId);
+        if (dataSourceProxy == null) {
+            LOGGER.warn("Failed to find resource for {}", resourceId);
+            return;
+        }
+
+        Connection conn;
+        try {
+            conn = dataSourceProxy.getPlainConnection();
+        } catch (SQLException sqle) {
+            LOGGER.error("Failed to get connection for async committing on {}", resourceId, sqle);
+            return;
+        }
+
+        UndoLogManager undoLogManager = UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType());
+
+        // split contexts into several lists, with each list contain no more element than limit size
+        List<List<Phase2Context>> splitByLimit = Lists.partition(contexts, UNDOLOG_DELETE_LIMIT_SIZE);
+        splitByLimit.forEach(partition -> deleteUndoLog(conn, undoLogManager, partition));
+    }
+
+    private void deleteUndoLog(Connection conn, UndoLogManager undoLogManager, List<Phase2Context> contexts) {
+        Set<String> xids = new LinkedHashSet<>(contexts.size());
+        Set<Long> branchIds = new LinkedHashSet<>(contexts.size());
+        contexts.forEach(context -> {
+            xids.add(context.xid);
+            branchIds.add(context.branchId);
+        });
+
+        try {
+            undoLogManager.batchDeleteUndoLog(xids, branchIds, conn);
+            if (!conn.getAutoCommit()) {
+                conn.commit();
+            }
+        } catch (SQLException e) {
+            LOGGER.error("Failed to batch delete undo log", e);
+            try {
+                conn.rollback();
+            } catch (SQLException rollbackEx) {
+                LOGGER.error("Failed to rollback JDBC resource after deleting undo log failed", rollbackEx);
+            }
+        } finally {
+            try {
+                conn.close();
+            } catch (SQLException closeEx) {
+                LOGGER.error("Failed to close JDBC resource after deleting undo log", closeEx);
+            }
+        }
+    }
+
+    static class Phase2Context {
+
+        /**
+         * AT Phase 2 context
+         * @param xid the xid
+         * @param branchId the branch id
+         * @param resourceId the resource id
+         */
+        public Phase2Context(String xid, long branchId, String resourceId) {
+            this.xid = xid;
+            this.branchId = branchId;
+            this.resourceId = resourceId;
+        }
+
+        /**
+         * The Xid.
+         */
+        String xid;
+        /**
+         * The Branch id.
+         */
+        long branchId;
+        /**
+         * The Resource id.
+         */
+        String resourceId;
     }
 }
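
The heart of the change is the addToCommitQueue path above. The old code funneled every branch through one static ASYNC_COMMIT_BUFFER and, when it was full, logged "Async commit buffer is FULL" and dropped the Phase2Context; the rewrite gives each worker its own bounded commitQueue and turns a failed offer into an immediate drain on the scheduler followed by a retry, so backpressure replaces silent drops without letting memory use grow without bound. A minimal, self-contained sketch of that drain-and-retry pattern (all class, method, and variable names here are illustrative, not taken from the commit):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;

    public class BackpressureQueue<T> {

        private final BlockingQueue<T> queue;   // bounded, so memory use is capped
        private final ExecutorService executor;

        public BackpressureQueue(int capacity, ExecutorService executor) {
            this.queue = new LinkedBlockingQueue<>(capacity);
            this.executor = executor;
        }

        // Offer the item; if the queue is full, drain it on the executor
        // and retry after the drain instead of discarding the item.
        public void add(T item) {
            if (queue.offer(item)) {
                return;
            }
            CompletableFuture.runAsync(this::drain, executor)
                .thenRun(() -> add(item));
        }

        private void drain() {
            List<T> batch = new ArrayList<>();
            queue.drainTo(batch);
            batch.forEach(item -> System.out.println("processed " + item)); // stand-in for real work
        }

        public static void main(String[] args) throws InterruptedException {
            ExecutorService executor = Executors.newSingleThreadExecutor();
            BackpressureQueue<Integer> queue = new BackpressureQueue<>(4, executor);
            for (int i = 0; i < 20; i++) {
                queue.add(i); // overflow triggers drain-and-retry, never a drop
            }
            Thread.sleep(500); // let the async drains finish
            executor.shutdown();
        }
    }

Because the retry re-enters add(), a queue that other producers refill in the meantime simply schedules another drain; every element is eventually processed or enqueued, a guarantee the old buffer's "handled by housekeeping later" warning could not make.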
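
With the ResourceManagerInbound methods gone, phase-two commits now reach the worker through an instance owned by its DataSourceManager; that wiring is in one of the other two files changed by this commit and is not shown on this page. A hypothetical caller, inferred only from the new constructor and branchCommit signatures above (the xid and resource id are made-up example values):

    import io.seata.core.model.BranchStatus;
    import io.seata.rm.datasource.AsyncWorker;
    import io.seata.rm.datasource.DataSourceManager;

    public class AsyncWorkerSketch {

        public static void main(String[] args) {
            // Assumed wiring: the real owner is the accompanying DataSourceManager change.
            DataSourceManager dataSourceManager = new DataSourceManager();
            AsyncWorker asyncWorker = new AsyncWorker(dataSourceManager); // constructor starts the 2-thread scheduler

            // Plain instance call: no BranchType or applicationData parameters,
            // and no checked TransactionException.
            BranchStatus status = asyncWorker.branchCommit(
                "127.0.0.1:8091:2000042936", 2000042938L, "jdbc:mysql://127.0.0.1:3306/demo");
            System.out.println(status); // PhaseTwo_Committed; undo logs are deleted asynchronously
        }
    }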