Skip to content

Commit

Permalink
fix(flow): fail parent on failure by default (#2682)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed Aug 14, 2024
1 parent b5993b3 commit fe0cd9c
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 49 deletions.
8 changes: 1 addition & 7 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const logger = debuglog('bull');

const optsDecodeMap = {
de: 'debounce',
fpof: 'failParentOnFailure',
fpof: 'failParentOnFailure', // TODO: deprecate it in next breaking change
idof: 'ignoreDependencyOnFailure',
kl: 'keepLogs',
rdof: 'removeDependencyOnFailure',
Expand Down Expand Up @@ -1210,12 +1210,6 @@ export class Job<
throw new Error(`Delay and repeat options could not be used together`);
}

if (this.opts.removeDependencyOnFailure && this.opts.failParentOnFailure) {
throw new Error(
`RemoveDependencyOnFailure and failParentOnFailure options can not be used together`,
);
}

if (`${parseInt(this.id, 10)}` === this.id) {
throw new Error('Custom Ids cannot be integers');
}
Expand Down
16 changes: 8 additions & 8 deletions src/commands/includes/moveParentFromWaitingChildrenToFailed.lua
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,21 @@ local function moveParentFromWaitingChildrenToFailed( parentQueueKey, parentKey,

if jobAttributes[1] then
local parentData = cjson.decode(jobAttributes[1])
if parentData['fpof'] then
if parentData['rdof'] then
local grandParentKey = parentData['queueKey'] .. ':' .. parentData['id']
local grandParentDependenciesSet = grandParentKey .. ":dependencies"
if rcall("SREM", grandParentDependenciesSet, parentKey) == 1 then
moveParentToWaitIfNeeded(parentData['queueKey'], grandParentDependenciesSet,
grandParentKey, parentData['id'], timestamp)
end
else
moveParentFromWaitingChildrenToFailed(
parentData['queueKey'],
parentData['queueKey'] .. ':' .. parentData['id'],
parentData['id'],
parentKey,
timestamp
)
elseif parentData['rdof'] then
local grandParentKey = parentData['queueKey'] .. ':' .. parentData['id']
local grandParentDependenciesSet = grandParentKey .. ":dependencies"
if rcall("SREM", grandParentDependenciesSet, parentKey) == 1 then
moveParentToWaitIfNeeded(parentData['queueKey'], grandParentDependenciesSet,
grandParentKey, parentData['id'], timestamp)
end
end
end
end
Expand Down
12 changes: 6 additions & 6 deletions src/commands/moveToFinished-14.lua
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
opts - maxMetricsSize
opts - fpof - fail parent on fail
opts - idof - ignore dependency on fail
opts - rdof - remove dependency on fail
opts - rdof - remove dependency on fail TODO: remove it in next breaking change
Output:
0 OK
Expand Down Expand Up @@ -140,11 +140,7 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists
ARGV[4], timestamp)
end
else
if opts['fpof'] then
moveParentFromWaitingChildrenToFailed(parentQueueKey, parentKey,
parentId, jobIdKey,
timestamp)
elseif opts['idof'] or opts['rdof'] then
if opts['idof'] or opts['rdof'] then
local dependenciesSet = parentKey .. ":dependencies"
if rcall("SREM", dependenciesSet, jobIdKey) == 1 then
moveParentToWaitIfNeeded(parentQueueKey, dependenciesSet,
Expand All @@ -154,6 +150,10 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists
rcall("HSET", failedSet, jobIdKey, ARGV[4])
end
end
else
moveParentFromWaitingChildrenToFailed(parentQueueKey, parentKey,
parentId, jobIdKey,
timestamp)
end
end
end
Expand Down
27 changes: 14 additions & 13 deletions tests/test_flow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ describe('flows', () => {
});

describe('when removeOnFail is true in last pending child', () => {
it('moves parent to wait without getting stuck', async () => {
it('moves parent to failed without getting stuck', async () => {
const worker = new Worker(
queueName,
async job => {
Expand All @@ -51,9 +51,14 @@ describe('flows', () => {
{ connection, prefix },
);
await worker.waitUntilReady();
const queueEvents = new QueueEvents(queueName, {
connection,
prefix,
});
await queueEvents.waitUntilReady();

const flow = new FlowProducer({ connection, prefix });
await flow.add({
const tree = await flow.add({
name: 'parent',
data: {},
queueName,
Expand All @@ -74,21 +79,17 @@ describe('flows', () => {
],
});

const completed = new Promise<void>((resolve, reject) => {
worker.on('completed', async (job: Job) => {
try {
if (job.name === 'parent') {
const { processed } = await job.getDependenciesCount();
expect(processed).to.equal(1);
resolve();
}
} catch (err) {
reject(err);
const failed = new Promise<void>(resolve => {
queueEvents.on('failed', async ({ jobId, failedReason, prev }) => {
if (jobId === tree.job.id) {
const { processed } = await tree.job!.getDependenciesCount();
expect(processed).to.equal(1);
resolve();
}
});
});

await completed;
await failed;
await flow.close();
await worker.close();
});
Expand Down
9 changes: 7 additions & 2 deletions tests/test_getters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -809,20 +809,25 @@ describe('Jobs getters', function () {
});
});

await queue.add('test', {});
const flow = new FlowProducer({ connection, prefix });
await flow.add({
name: 'parent-job',
queueName,
data: {},
children: [
{ name: 'child-1', data: { idx: 0, foo: 'bar' }, queueName },
{
name: 'child-1',
data: { idx: 0, foo: 'bar' },
queueName,
opts: { delay: 6000 },
},
{ name: 'child-2', data: { idx: 1, foo: 'baz' }, queueName },
{ name: 'child-3', data: { idx: 2, foo: 'bac' }, queueName },
{ name: 'child-4', data: { idx: 3, foo: 'bad' }, queueName },
],
});

await queue.add('test', { idx: 2 }, { delay: 5000 });
await queue.add('test', { idx: 3 }, { priority: 5 });

await completing;
Expand Down
13 changes: 0 additions & 13 deletions tests/test_job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,19 +139,6 @@ describe('Job', function () {
});
});

describe('when removeDependencyOnFailure and failParentOnFailure options are provided', () => {
it('throws an error', async () => {
const data = { foo: 'bar' };
const opts = {
removeDependencyOnFailure: true,
failParentOnFailure: true,
};
await expect(Job.create(queue, 'test', data, opts)).to.be.rejectedWith(
'RemoveDependencyOnFailure and failParentOnFailure options can not be used together',
);
});
});

describe('when priority option is provided as float', () => {
it('throws an error', async () => {
const data = { foo: 'bar' };
Expand Down

0 comments on commit fe0cd9c

Please sign in to comment.