Skip to content

Commit

Permalink
IGNITE-18783 Fix DistributedProcess hangs on AssertionError (apache#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
timoninmaxim authored Feb 16, 2023
1 parent f22ceb2 commit ab27886
Show file tree
Hide file tree
Showing 9 changed files with 228 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1568,7 +1568,7 @@ private IgniteInternalFuture<EmptyResult> prepareMasterKeyChange(MasterKeyChange
* @param res Results.
* @param err Errors.
*/
private void finishPrepareMasterKeyChange(UUID id, Map<UUID, EmptyResult> res, Map<UUID, Exception> err) {
private void finishPrepareMasterKeyChange(UUID id, Map<UUID, EmptyResult> res, Map<UUID, Throwable> err) {
if (!err.isEmpty()) {
if (masterKeyChangeRequest != null && masterKeyChangeRequest.requestId().equals(id))
masterKeyChangeRequest = null;
Expand Down Expand Up @@ -1613,23 +1613,23 @@ private IgniteInternalFuture<EmptyResult> performMasterKeyChange(MasterKeyChange
* @param res Results.
* @param err Errors.
*/
private void finishPerformMasterKeyChange(UUID id, Map<UUID, EmptyResult> res, Map<UUID, Exception> err) {
private void finishPerformMasterKeyChange(UUID id, Map<UUID, EmptyResult> res, Map<UUID, Throwable> err) {
completeMasterKeyChangeFuture(id, err);
}

/**
* @param reqId Request id.
* @param err Exception.
*/
private void completeMasterKeyChangeFuture(UUID reqId, Map<UUID, Exception> err) {
private void completeMasterKeyChangeFuture(UUID reqId, Map<UUID, Throwable> err) {
synchronized (opsMux) {
boolean isInitiator = masterKeyChangeFut != null && masterKeyChangeFut.id().equals(reqId);

if (!isInitiator || masterKeyChangeFut.isDone())
return;

if (!F.isEmpty(err)) {
Exception e = err.values().stream().findFirst().get();
Throwable e = err.values().stream().findFirst().get();

masterKeyChangeFut.onDone(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ private IgniteInternalFuture<EmptyResult> prepare(ChangeCacheEncryptionRequest r
* @param res Results.
* @param err Errors.
*/
private void finishPrepare(UUID id, Map<UUID, EmptyResult> res, Map<UUID, Exception> err) {
private void finishPrepare(UUID id, Map<UUID, EmptyResult> res, Map<UUID, Throwable> err) {
if (!err.isEmpty()) {
if (req != null && req.requestId().equals(id))
req = null;
Expand Down Expand Up @@ -318,7 +318,7 @@ private IgniteInternalFuture<EmptyResult> perform(ChangeCacheEncryptionRequest r
* @param res Results.
* @param err Errors.
*/
private void finishPerform(UUID id, Map<UUID, EmptyResult> res, Map<UUID, Exception> err) {
private void finishPerform(UUID id, Map<UUID, EmptyResult> res, Map<UUID, Throwable> err) {
completeFuture(id, err, fut);
}

Expand All @@ -328,7 +328,7 @@ private void finishPerform(UUID id, Map<UUID, EmptyResult> res, Map<UUID, Except
* @param fut Key change future.
* @return {@code True} if future was completed by this call.
*/
private boolean completeFuture(UUID reqId, Map<UUID, Exception> err, GroupKeyChangeFuture fut) {
private boolean completeFuture(UUID reqId, Map<UUID, Throwable> err, GroupKeyChangeFuture fut) {
boolean isInitiator = fut != null && fut.id().equals(reqId);

if (!isInitiator || fut.isDone())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -891,7 +891,7 @@ private IgniteInternalFuture<SnapshotOperationResponse> initLocalSnapshotStartSt
* @param res Results.
* @param err Errors.
*/
private void processLocalSnapshotStartStageResult(UUID id, Map<UUID, SnapshotOperationResponse> res, Map<UUID, Exception> err) {
private void processLocalSnapshotStartStageResult(UUID id, Map<UUID, SnapshotOperationResponse> res, Map<UUID, Throwable> err) {
if (cctx.kernalContext().clientNode())
return;

Expand Down Expand Up @@ -1106,7 +1106,7 @@ private void storeWarnings(SnapshotOperationRequest snpReq) {
* @param res Results.
* @param err Errors.
*/
private void processLocalSnapshotEndStageResult(UUID id, Map<UUID, SnapshotOperationResponse> res, Map<UUID, Exception> err) {
private void processLocalSnapshotEndStageResult(UUID id, Map<UUID, SnapshotOperationResponse> res, Map<UUID, Throwable> err) {
SnapshotOperationRequest snpReq = clusterSnpReq;

if (snpReq == null || !F.eq(id, snpReq.requestId()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -771,13 +771,13 @@ private SnapshotRestoreContext prepareContext(
* @param res Results.
* @param errs Errors.
*/
private void finishPrepare(UUID reqId, Map<UUID, SnapshotRestoreOperationResponse> res, Map<UUID, Exception> errs) {
private void finishPrepare(UUID reqId, Map<UUID, SnapshotRestoreOperationResponse> res, Map<UUID, Throwable> errs) {
if (ctx.clientNode())
return;

SnapshotRestoreContext opCtx0 = opCtx;

Exception failure = F.first(errs.values());
Throwable failure = F.first(errs.values());

assert opCtx0 != null || failure != null : "Context has not been created on the node " + ctx.localNodeId();

Expand Down Expand Up @@ -1173,13 +1173,13 @@ private void restoreMappings(File marshallerDir, BooleanSupplier stopChecker) th
* @param res Results.
* @param errs Errors.
*/
private void finishPreload(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Exception> errs) {
private void finishPreload(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Throwable> errs) {
if (ctx.clientNode())
return;

SnapshotRestoreContext opCtx0 = opCtx;

Exception failure = errs.values().stream().findFirst().
Throwable failure = errs.values().stream().findFirst().
orElse(checkNodeLeft(opCtx0.nodes(), res.keySet()));

opCtx0.errHnd.accept(failure);
Expand Down Expand Up @@ -1232,13 +1232,13 @@ private IgniteInternalFuture<Boolean> cacheStart(UUID reqId) {
* @param res Results.
* @param errs Errors.
*/
private void finishCacheStart(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Exception> errs) {
private void finishCacheStart(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Throwable> errs) {
if (ctx.clientNode())
return;

SnapshotRestoreContext opCtx0 = opCtx;

Exception failure = errs.values().stream().findFirst().
Throwable failure = errs.values().stream().findFirst().
orElse(checkNodeLeft(opCtx0.nodes(), res.keySet()));

if (failure == null) {
Expand Down Expand Up @@ -1375,7 +1375,7 @@ private IgniteInternalFuture<Boolean> rollback(UUID reqId) {
* @param res Results.
* @param errs Errors.
*/
private void finishRollback(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Exception> errs) {
private void finishRollback(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Throwable> errs) {
if (ctx.clientNode())
return;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public DistributedProcess(
GridKernalContext ctx,
DistributedProcessType type,
Function<I, IgniteInternalFuture<R>> exec,
CI3<UUID, Map<UUID, R>, Map<UUID, Exception>> finish
CI3<UUID, Map<UUID, R>, Map<UUID, Throwable>> finish
) {
this(ctx, type, exec, finish, (id, req) -> new InitMessage<>(id, type, req));
}
Expand All @@ -112,7 +112,7 @@ public DistributedProcess(
GridKernalContext ctx,
DistributedProcessType type,
Function<I, IgniteInternalFuture<R>> exec,
CI3<UUID, Map<UUID, R>, Map<UUID, Exception>> finish,
CI3<UUID, Map<UUID, R>, Map<UUID, Throwable>> finish,
BiFunction<UUID, I, ? extends InitMessage<I>> initMsgFactory
) {
this.ctx = ctx;
Expand Down Expand Up @@ -146,22 +146,32 @@ public DistributedProcess(
if (crd.isLocal())
initCoordinator(p, topVer);

IgniteInternalFuture<R> fut = exec.apply((I)msg.request());
try {
IgniteInternalFuture<R> fut = exec.apply((I)msg.request());

fut.listen(f -> {
if (f.error() != null)
p.resFut.onDone(f.error());
else
p.resFut.onDone(f.result());
fut.listen(f -> {
if (f.error() != null)
p.resFut.onDone(f.error());
else
p.resFut.onDone(f.result());

if (!ctx.clientNode()) {
assert crd != null;
if (!ctx.clientNode()) {
assert crd != null;

sendSingleMessage(p);
}
});
sendSingleMessage(p);
}
});

p.initFut.onDone();
}
catch (Throwable err) {
U.error(log, "Failed to handle InitMessage [id=" + p.id + ']', err);

p.initFut.onDone();
p.resFut.onDone(err);

if (!ctx.clientNode())
sendSingleMessage(p);
}
});

ctx.discovery().setCustomEventListener(FullMessage.class, (topVer, snd, msg0) -> {
Expand All @@ -180,9 +190,15 @@ public DistributedProcess(
return;
}

finish.apply(p.id, msg.result(), msg.error());

processes.remove(msg.processId());
try {
finish.apply(p.id, msg.result(), msg.error());
}
catch (Throwable err) {
U.error(log, "Failed to handle FullMessage [id=" + p.id + ']', err);
}
finally {
processes.remove(msg.processId());
}
});

ctx.io().addMessageListener(GridTopic.TOPIC_DISTRIBUTED_PROCESS, (nodeId, msg0, plc) -> {
Expand Down Expand Up @@ -274,8 +290,7 @@ private void initCoordinator(Process p, AffinityTopologyVersion topVer) {
private void sendSingleMessage(Process p) {
assert p.resFut.isDone();

SingleNodeMessage<R> singleMsg = new SingleNodeMessage<>(p.id, type, p.resFut.result(),
(Exception)p.resFut.error());
SingleNodeMessage<R> singleMsg = new SingleNodeMessage<>(p.id, type, p.resFut.result(), p.resFut.error());

UUID crdId = p.crdId;

Expand Down Expand Up @@ -333,7 +348,7 @@ private void onSingleNodeMessageReceived(SingleNodeMessage<R> msg, UUID nodeId)
private void finishProcess(Process p) {
HashMap<UUID, R> res = new HashMap<>();

HashMap<UUID, Exception> err = new HashMap<>();
HashMap<UUID, Throwable> err = new HashMap<>();

p.singleMsgs.forEach((uuid, msg) -> {
if (msg.hasError())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@ public class FullMessage<R extends Serializable> implements DiscoveryCustomMessa
private Map<UUID, R> res;

/** Errors. */
private Map<UUID, Exception> err;
private Map<UUID, Throwable> err;

/**
* @param processId Process id.
* @param type Process type.
* @param res Results.
* @param err Errors
*/
public FullMessage(UUID processId, DistributedProcessType type, Map<UUID, R> res, Map<UUID, Exception> err) {
public FullMessage(UUID processId, DistributedProcessType type, Map<UUID, R> res, Map<UUID, Throwable> err) {
this.processId = processId;
this.type = type.ordinal();
this.res = res;
Expand Down Expand Up @@ -106,7 +106,7 @@ public Map<UUID, R> result() {
}

/** @return Nodes errors. */
public Map<UUID, Exception> error() {
public Map<UUID, Throwable> error() {
return err;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class SingleNodeMessage<R extends Serializable> implements Message {
private R resp;

/** Error. */
private Exception err;
private Throwable err;

/** Empty constructor for marshalling purposes. */
public SingleNodeMessage() {
Expand All @@ -63,7 +63,7 @@ public SingleNodeMessage() {
* @param resp Single node response.
* @param err Error.
*/
public SingleNodeMessage(UUID processId, DistributedProcessType type, R resp, Exception err) {
public SingleNodeMessage(UUID processId, DistributedProcessType type, R resp, Throwable err) {
this.processId = processId;
this.type = type.ordinal();
this.resp = resp;
Expand Down Expand Up @@ -194,7 +194,7 @@ public boolean hasError() {
}

/** @return Error. */
public Exception error() {
public Throwable error() {
return err;
}
}
Loading

0 comments on commit ab27886

Please sign in to comment.