Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

let _claimPendingJobs have a valid job when updating leads to version conflict #21980

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 63 additions & 8 deletions x-pack/plugins/reporting/server/lib/esqueue/__tests__/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import expect from 'expect.js';
import sinon from 'sinon';
import moment from 'moment';
import { noop, random, get, find } from 'lodash';
import { noop, random, get, find, identity } from 'lodash';
import { ClientMock } from './fixtures/elasticsearch';
import { QueueMock } from './fixtures/queue';
import { Worker } from '../worker';
Expand Down Expand Up @@ -494,25 +494,57 @@ describe('Worker class', function () {
expect(msg).to.equal(false);
});

it('should return true on version errors', function () {
it('should reject the promise on version errors', function () {
mockQueue.client.update.restore();
sinon.stub(mockQueue.client, 'update').returns(Promise.reject({ statusCode: 409 }));
return worker._claimJob(job)
.then((res) => expect(res).to.equal(true));
.catch(err => {
expect(err).to.eql({ statusCode: 409 });
});
});

it('should return false on other errors', function () {
it('should reject the promise on other errors', function () {
mockQueue.client.update.restore();
sinon.stub(mockQueue.client, 'update').returns(Promise.reject({ statusCode: 401 }));
return worker._claimJob(job)
.then((res) => expect(res).to.equal(false));
.catch(err => {
expect(err).to.eql({ statusCode: 401 });
});
});
});

describe('find a pending job to claim', function () {
const getMockJobs = (status = 'pending') => ([{
_index: 'myIndex',
_type: 'test',
_id: 12345,
_version: 3,
found: true,
_source: {
jobtype: 'jobtype',
created_by: false,
payload: { id: 'sample-job-1', now: 'Mon Apr 25 2016 14:13:04 GMT-0700 (MST)' },
priority: 10,
timeout: 10000,
created_at: '2016-04-25T21:13:04.738Z',
attempts: 0,
max_attempts: 3,
status
},
}]);

it('should emit on other errors', function (done) {
beforeEach(function () {
worker = new Worker(mockQueue, 'test', noop, defaultWorkerOptions);
});

afterEach(() => {
mockQueue.client.update.restore();
});

it('should emit for errors from claiming job', function (done) {
sinon.stub(mockQueue.client, 'update').returns(Promise.reject({ statusCode: 401 }));

worker.on(constants.EVENT_WORKER_JOB_CLAIM_ERROR, function (err) {
worker.once(constants.EVENT_WORKER_JOB_CLAIM_ERROR, function (err) {
try {
expect(err).to.have.property('error');
expect(err).to.have.property('job');
Expand All @@ -523,7 +555,30 @@ describe('Worker class', function () {
done(e);
}
});
worker._claimJob(job);

worker._claimPendingJobs(getMockJobs());
});

it('should reject the promise if an error claiming the job', function () {
sinon.stub(mockQueue.client, 'update').returns(Promise.reject({ statusCode: 409 }));
return worker._claimPendingJobs(getMockJobs())
.catch(err => {
expect(err).to.eql({ statusCode: 409 });
});
});

it('should get the pending job', function () {
sinon.stub(mockQueue.client, 'update').returns(Promise.resolve({ test: 'cool' }));
sinon.stub(worker, '_performJob').callsFake(identity);
return worker._claimPendingJobs(getMockJobs())
.then(claimedJob => {
expect(claimedJob._index).to.be('myIndex');
expect(claimedJob._type).to.be('test');
expect(claimedJob._source.jobtype).to.be('jobtype');
expect(claimedJob._source.status).to.be('processing');
expect(claimedJob.test).to.be('cool');
worker._performJob.restore();
});
});
});

Expand Down
68 changes: 37 additions & 31 deletions x-pack/plugins/reporting/server/lib/esqueue/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,22 @@ function formatJobObject(job) {
};
}

function getLogger(opts, id, logLevel) {
return (msg, err) => {
const logger = opts.logger || function () {};

const message = `${id} - ${msg}`;
const tags = ['worker', logLevel];

if (err) {
logger(`${message}: ${err.stack ? err.stack : err }`, tags);
return;
}

logger(message, tags);
};
}

export class Worker extends events.EventEmitter {
constructor(queue, type, workerFn, opts) {
if (typeof type !== 'string') throw new Error('type must be a string');
Expand All @@ -40,19 +56,8 @@ export class Worker extends events.EventEmitter {
this.checkSize = opts.size || 10;
this.doctype = opts.doctype || constants.DEFAULT_SETTING_DOCTYPE;

this.debug = (msg, err) => {
const logger = opts.logger || function () {};

const message = `${this.id} - ${msg}`;
const tags = ['worker', 'debug'];

if (err) {
logger(`${message}: ${err.stack ? err.stack : err }`, tags);
return;
}

logger(message, tags);
};
this.debug = getLogger(opts, this.id, 'debug');
this.warn = getLogger(opts, this.id, 'warn');

this._running = true;
this.debug(`Created worker for job type ${this.jobtype}`);
Expand Down Expand Up @@ -134,17 +139,11 @@ export class Worker extends events.EventEmitter {
...doc
};
return updatedJob;
})
.catch((err) => {
if (err.statusCode === 409) return true;
this.debug(`_claimJob failed on job ${job._id}`, err);
this.emit(constants.EVENT_WORKER_JOB_CLAIM_ERROR, this._formatErrorParams(err, job));
return false;
});
}

_failJob(job, output = false) {
this.debug(`Failing job ${job._id}`);
this.warn(`Failing job ${job._id}`);

const completedTime = moment().toISOString();
const docOutput = this._formatOutput(output);
Expand All @@ -170,7 +169,7 @@ export class Worker extends events.EventEmitter {
.then(() => true)
.catch((err) => {
if (err.statusCode === 409) return true;
this.debug(`_failJob failed to update job ${job._id}`, err);
this.warn(`_failJob failed to update job ${job._id}`, err);
this.emit(constants.EVENT_WORKER_FAIL_UPDATE_ERROR, this._formatErrorParams(err, job));
return false;
});
Expand Down Expand Up @@ -215,7 +214,7 @@ export class Worker extends events.EventEmitter {
if (isResolved) return;

cancellationToken.cancel();
this.debug(`Timeout processing job ${job._id}`);
this.warn(`Timeout processing job ${job._id}`);
reject(new WorkerTimeoutError(`Worker timed out, timeout = ${job._source.timeout}`, {
timeout: job._source.timeout,
jobId: job._id,
Expand Down Expand Up @@ -253,7 +252,7 @@ export class Worker extends events.EventEmitter {
})
.catch((err) => {
if (err.statusCode === 409) return false;
this.debug(`Failure saving job output ${job._id}`, err);
this.warn(`Failure saving job output ${job._id}`, err);
this.emit(constants.EVENT_WORKER_JOB_UPDATE_ERROR, this._formatErrorParams(err, job));
});
}, (jobErr) => {
Expand All @@ -265,7 +264,7 @@ export class Worker extends events.EventEmitter {

// job execution failed
if (jobErr.name === 'WorkerTimeoutError') {
this.debug(`Timeout on job ${job._id}`);
this.warn(`Timeout on job ${job._id}`);
this.emit(constants.EVENT_WORKER_JOB_TIMEOUT, this._formatErrorParams(jobErr, job));
return;

Expand All @@ -278,7 +277,7 @@ export class Worker extends events.EventEmitter {
}
}

this.debug(`Failure occurred on job ${job._id}`, jobErr);
this.warn(`Failure occurred on job ${job._id}`, jobErr);
this.emit(constants.EVENT_WORKER_JOB_EXECUTION_ERROR, this._formatErrorParams(jobErr, job));
return this._failJob(job, (jobErr.toString) ? jobErr.toString() : false);
});
Expand Down Expand Up @@ -316,23 +315,30 @@ export class Worker extends events.EventEmitter {

return this._claimJob(job)
.then((claimResult) => {
if (claimResult !== false) {
claimed = true;
return claimResult;
claimed = true;
return claimResult;
})
.catch((err) => {
if (err.statusCode === 409) {
this.warn(`_claimPendingJobs encountered a version conflict on updating pending job ${job._id}`, err);
return; // continue reducing and looking for a different job to claim
}
this.emit(constants.EVENT_WORKER_JOB_CLAIM_ERROR, this._formatErrorParams(err, job));
return Promise.reject(err);
});
});
}, Promise.resolve())
.then((claimedJob) => {
if (!claimedJob) {
this.debug(`All ${jobs.length} jobs already claimed`);
this.debug(`Found no claimable jobs out of ${jobs.length} total`);
return;
}
this.debug(`Claimed job ${claimedJob._id}`);
return this._performJob(claimedJob);
})
.catch((err) => {
this.debug('Error claiming jobs', err);
this.warn('Error claiming jobs', err);
return Promise.reject(err);
});
}

Expand Down Expand Up @@ -384,7 +390,7 @@ export class Worker extends events.EventEmitter {
// ignore missing indices errors
if (err && err.status === 404) return [];

this.debug('job querying failed', err);
this.warn('job querying failed', err);
this.emit(constants.EVENT_WORKER_JOB_SEARCH_ERROR, this._formatErrorParams(err));
throw err;
});
Expand Down