Skip to content

Commit

Permalink
Merge pull request #5 from tsullivan/alerting/task-scheduler-middlewa…
Browse files Browse the repository at this point in the history
…re-extension-points

Task scheduler middleware extension points
  • Loading branch information
chrisdavies authored Sep 17, 2018
2 parents 936637d + 06df98c commit 78261aa
Show file tree
Hide file tree
Showing 8 changed files with 191 additions and 179 deletions.
46 changes: 0 additions & 46 deletions src/server/task_manager/client_wrapper.ts

This file was deleted.

80 changes: 0 additions & 80 deletions src/server/task_manager/default_client.ts

This file was deleted.

59 changes: 59 additions & 0 deletions src/server/task_manager/middleware.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import { RunContext, TaskInstance } from './task';

/*
* BeforeSaveMiddlewareParams is nearly identical to RunContext, but
* taskInstance is before save (no _id property)
*
* taskInstance property is guaranteed to exist. The params can optionally
* include fields from an "options" object passed as the 2nd parameter to
* taskManager.schedule()
*/
export interface BeforeSaveMiddlewareParams {
taskInstance: TaskInstance;
}

export type BeforeSaveFunction = (
params: BeforeSaveMiddlewareParams
) => Promise<BeforeSaveMiddlewareParams>;

export type BeforeRunFunction = (params: RunContext) => Promise<RunContext>;

export interface Middleware {
beforeSave: BeforeSaveFunction;
beforeRun: BeforeRunFunction;
}

export function addMiddlewareToChain(prevMiddleware: Middleware, middleware: Middleware) {
const beforeSave = middleware.beforeSave
? (params: BeforeSaveMiddlewareParams) =>
middleware.beforeSave(params).then(prevMiddleware.beforeSave)
: prevMiddleware.beforeSave;

const beforeRun = middleware.beforeRun
? (params: RunContext) => middleware.beforeRun(params).then(prevMiddleware.beforeRun)
: prevMiddleware.beforeRun;

return {
beforeSave,
beforeRun,
};
}
10 changes: 1 addition & 9 deletions src/server/task_manager/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,9 @@ export type ElasticJs = (action: string, args: any) => Promise<any>;
* The run context is passed into a task's run function as its sole argument.
*/
export interface RunContext {
/**
* The elastic search js wrapper function which the task can use
* to access Elastic.
*/
callCluster: ElasticJs;

/**
* The Kibana server object. This gives tasks full-access to the server object,
* but if the task needs to query ES in the context of the user who scheduled
* the task, it hsould use `callCluster` rather than the various ES options
* available in kbnServer.
* including the various ES options client functions
*/
kbnServer: object;

Expand Down
119 changes: 106 additions & 13 deletions src/server/task_manager/task_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,35 +17,128 @@
* under the License.
*/

import { TaskInstance } from './task';
import { fillPool } from './fill_pool';
import { TaskManagerLogger } from './logger';
import { addMiddlewareToChain, BeforeSaveMiddlewareParams, Middleware } from './middleware';
import {
ConcreteTaskInstance,
RunContext,
SanitizedTaskDefinition,
TaskDictionary,
TaskInstance,
} from './task';
import { TaskPoller } from './task_poller';
import { TaskPool } from './task_pool';
import { TaskManagerRunner } from './task_runner';
import { FetchOpts, FetchResult, RawTaskDoc, RemoveResult, TaskStore } from './task_store';

interface Opts {
poller: TaskPoller;
store: TaskStore;
interface ConstructOpts {
logger: TaskManagerLogger;
maxWorkers: number;
definitions: TaskDictionary<SanitizedTaskDefinition>;
}

export class TaskManager {
private poller: TaskPoller;
private store: TaskStore;
private logger: TaskManagerLogger;
private maxWorkers: number;
private definitions: TaskDictionary<SanitizedTaskDefinition>;
private middleware: Middleware;
private poller: TaskPoller | null;
private store: TaskStore | null;
private initialized: boolean;

constructor(opts: Opts) {
this.poller = opts.poller;
this.store = opts.store;
constructor(opts: ConstructOpts) {
this.initialized = false;

const { logger, maxWorkers, definitions } = opts;
this.logger = logger;
this.maxWorkers = maxWorkers;
this.definitions = definitions;

this.middleware = {
beforeSave: async (saveOpts: BeforeSaveMiddlewareParams) => saveOpts,
beforeRun: async (runOpts: RunContext) => runOpts,
};

this.poller = null;
this.store = null;
}

public async afterPluginsInit(kbnServer: any, server: any, config: any) {
const callCluster = server.plugins.elasticsearch.getCluster('admin').callWithInternalUser;
const store = new TaskStore({
index: config.get('task_manager.index'),
callCluster,
maxAttempts: config.get('task_manager.max_attempts'),
supportedTypes: Object.keys(this.definitions),
});
this.store = store;

this.logger.debug('Initializing the task manager index');
await store.init();

const pool = new TaskPool({
logger: this.logger,
maxWorkers: this.maxWorkers,
});

const poller = new TaskPoller({
logger: this.logger,
pollInterval: config.get('task_manager.poll_interval'),
work: () =>
fillPool(
pool.run,
store.fetchAvailableTasks,
(instance: ConcreteTaskInstance) =>
new TaskManagerRunner({
logger: this.logger,
definition: this.definitions[instance.taskType],
kbnServer,
instance,
store,
beforeRun: this.middleware.beforeRun,
})
),
});
this.poller = poller;
await this.poller.start();

this.initialized = true;
}

public addMiddleware(middleware: Middleware) {
const prevMiddleWare = this.middleware;
this.middleware = addMiddlewareToChain(prevMiddleWare, middleware);
}

public async schedule(task: TaskInstance): Promise<RawTaskDoc> {
const result = await this.store.schedule(task);
/*
* Saves a task
* @param {TaskInstance} taskInstance
*/
public async schedule(taskInstance: TaskInstance, options?: any): Promise<RawTaskDoc> {
if (!this.initialized || !this.poller || !this.store) {
throw new Error('Task Manager service is not ready for tasks to be scheduled');
}
const { taskInstance: modifiedTask } = await this.middleware.beforeSave({
...options,
taskInstance,
});
const result = await this.store.schedule(modifiedTask);
this.poller.attemptWork();
return result;
}

public fetch(opts: FetchOpts = {}): Promise<FetchResult> {
public fetch(opts: FetchOpts = {}): Promise<FetchResult> | null {
if (!this.initialized || !this.store) {
throw new Error('Task Manager service is not ready to fetch tasks');
}
return this.store.fetch(opts);
}

public remove(id: string): Promise<RemoveResult> {
public remove(id: string): Promise<RemoveResult> | null {
if (!this.initialized || !this.store) {
throw new Error('Task Manager service is not ready to remove a task');
}
return this.store.remove(id);
}
}
21 changes: 5 additions & 16 deletions src/server/task_manager/task_manager_mixin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,14 @@
*/

import Joi from 'joi';
import { TaskManagerClientWrapper } from './client_wrapper';
import { getDefaultClient } from './default_client';
import { TaskManagerLogger } from './logger';
import {
SanitizedTaskDefinition,
TaskDefinition,
TaskDictionary,
validateTaskDefinition,
} from './task';
import { TaskManager } from './task_manager';

export async function taskManagerMixin(kbnServer: any, server: any, config: any) {
const logger = new TaskManagerLogger((...args: any[]) => server.log(...args));
Expand All @@ -37,22 +36,12 @@ export async function taskManagerMixin(kbnServer: any, server: any, config: any)
config.get('task_manager.override_num_workers')
);

server.decorate('server', 'taskManager', new TaskManagerClientWrapper());

kbnServer.afterPluginsInit(async () => {
const client = await getDefaultClient(
kbnServer,
server,
config,
logger,
maxWorkers,
definitions
);
server.taskManager.setClient(client);
});
const client = new TaskManager({ logger, maxWorkers, definitions });
server.decorate('server', 'taskManager', client);
kbnServer.afterPluginsInit(() => client.afterPluginsInit(kbnServer, server, config));
}

// TODO, move this to a file and properly test it, validate the taskDefinition via Joi or something
// TODO, move this to a file and properly test it
function extractTaskDefinitions(
maxWorkers: number,
taskDefinitions: TaskDictionary<TaskDefinition> = {},
Expand Down
Loading

0 comments on commit 78261aa

Please sign in to comment.