Skip to content

Commit

Permalink
[ML] Separate datafeed and job
Browse files Browse the repository at this point in the history
  • Loading branch information
qn895 committed Jan 26, 2021
1 parent e515028 commit 9f446c2
Show file tree
Hide file tree
Showing 10 changed files with 67 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
* you may not use this file except in compliance with the Elastic License.
*/

import { cloneDeep } from 'lodash';
import { Datafeed } from './datafeed';
import { DatafeedStats } from './datafeed_stats';
import { Job } from './job';
Expand All @@ -25,16 +24,6 @@ export interface CombinedJobWithStats extends JobWithStats {
datafeed_config: DatafeedWithStats;
}

/**
 * Splits a combined job configuration into its separate `job` and
 * `datafeed` parts.
 *
 * The input is deep-cloned first, so the caller's object is never mutated.
 *
 * @param combinedJob Combined job whose `datafeed_config` should be split out.
 * @returns The job (without `datafeed_config`) and the extracted datafeed.
 */
export function expandCombinedJobConfig(combinedJob: CombinedJob) {
  // Operate on a deep copy so the original combined job stays intact.
  const jobCopy = cloneDeep(combinedJob);
  const extractedDatafeed = jobCopy.datafeed_config;
  // @ts-expect-error datafeed_config is a required property of CombinedJob,
  // but the expanded job must not carry it.
  delete jobCopy.datafeed_config;

  return { job: jobCopy, datafeed: extractedDatafeed };
}

/**
 * Type guard for `CombinedJobWithStats`.
 *
 * Only checks that `job_id` is a string; no deeper structural validation of
 * the stats or datafeed properties is performed.
 *
 * @param arg Value to test; may be anything, including null/undefined.
 * @returns true if `arg` looks like a combined job with stats.
 */
export function isCombinedJobWithStats(arg: any): arg is CombinedJobWithStats {
  // Optional chaining keeps the guard safe for null/undefined input instead
  // of throwing a TypeError on property access.
  return typeof arg?.job_id === 'string';
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ export function loadFullJob(jobId) {
export function loadJobForExport(jobId) {
return new Promise((resolve, reject) => {
ml.jobs
.jobsForExport([jobId])
.then((jobs) => {
if (jobs.length) {
resolve(jobs[0]);
.jobForExport(jobId)
.then((resp) => {
if (resp?.job) {
resolve(resp);
} else {
throw new Error(`Could not find job ${jobId}`);
}
Expand Down Expand Up @@ -197,21 +197,14 @@ function showResults(resp, action) {

export async function cloneJob(jobId) {
try {
const [cloneableJob, originalJob] = await Promise.all([
const [{ job: cloneableJob, datafeed }, originalJob] = await Promise.all([
loadJobForExport(jobId),
loadFullJob(jobId, false),
]);
if (cloneableJob !== undefined && originalJob?.custom_settings?.created_by !== undefined) {
// if the job is from a wizards, i.e. contains a created_by property
// use tempJobCloningObjects to temporarily store the job
// cloneableJob.custom_settings.created_by = originalJob?.custom_settings?.created_by;

if (cloneableJob.custom_settings === undefined) {
cloneableJob.custom_settings = originalJob.custom_settings;
} else {
cloneableJob.custom_settings.created_by = originalJob.custom_settings.created_by;
}

mlJobService.tempJobCloningObjects.createdBy = originalJob?.custom_settings?.created_by;
mlJobService.tempJobCloningObjects.job = cloneableJob;

if (
Expand Down Expand Up @@ -244,7 +237,13 @@ export async function cloneJob(jobId) {
} else {
// otherwise use the tempJobCloningObjects
mlJobService.tempJobCloningObjects.job = cloneableJob;
// resets the createdBy field in case it still retains previous settings
mlJobService.tempJobCloningObjects.createdBy = undefined;
}
if (datafeed !== undefined) {
mlJobService.tempJobCloningObjects.datafeed = datafeed;
}

if (originalJob.calendars) {
mlJobService.tempJobCloningObjects.calendars = await mlCalendarService.fetchCalendarsByIds(
originalJob.calendars
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,19 @@ import { ApplicationStart } from 'kibana/public';
import { IndexPatternsContract } from '../../../../../../../../../src/plugins/data/public';
import { mlJobService } from '../../../../services/job_service';
import { loadIndexPatterns, getIndexPatternIdFromName } from '../../../../util/index_utils';
import { CombinedJob } from '../../../../../../common/types/anomaly_detection_jobs';
import { Datafeed, Job } from '../../../../../../common/types/anomaly_detection_jobs';
import { CREATED_BY_LABEL, JOB_TYPE } from '../../../../../../common/constants/new_job';

export async function preConfiguredJobRedirect(
indexPatterns: IndexPatternsContract,
basePath: string,
navigateToUrl: ApplicationStart['navigateToUrl']
) {
const { job } = mlJobService.tempJobCloningObjects;
if (job) {
const { createdBy, job, datafeed } = mlJobService.tempJobCloningObjects;
if (job && datafeed) {
try {
await loadIndexPatterns(indexPatterns);
const redirectUrl = getWizardUrlFromCloningJob(job);
const redirectUrl = getWizardUrlFromCloningJob(createdBy, job, datafeed);
await navigateToUrl(`${basePath}/app/ml/${redirectUrl}`);
return Promise.reject();
} catch (error) {
Expand All @@ -33,8 +33,8 @@ export async function preConfiguredJobRedirect(
}
}

function getWizardUrlFromCloningJob(job: CombinedJob) {
const created = job?.custom_settings?.created_by;
function getWizardUrlFromCloningJob(createdBy: string | undefined, job: Job, datafeed: Datafeed) {
const created = createdBy;
let page = '';

switch (created) {
Expand All @@ -55,7 +55,7 @@ function getWizardUrlFromCloningJob(job: CombinedJob) {
break;
}

const indexPatternId = getIndexPatternIdFromName(job.datafeed_config.indices.join());
const indexPatternId = getIndexPatternIdFromName(datafeed.indices.join());

return `jobs/new_job/${page}?index=${indexPatternId}&_g=()`;
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import { useMlContext } from '../../../../contexts/ml';
import { getTimeFilterRange } from '../../../../components/full_time_range_selector';
import { getTimeBucketsFromCache } from '../../../../util/time_buckets';
import { ExistingJobsAndGroups, mlJobService } from '../../../../services/job_service';
import { expandCombinedJobConfig } from '../../../../../../common/types/anomaly_detection_jobs';
import { newJobCapsService } from '../../../../services/new_job_capabilities_service';
import { EVENT_RATE_FIELD_ID } from '../../../../../../common/types/fields';
import { getNewJobDefaults } from '../../../../services/ml_server_info';
Expand Down Expand Up @@ -74,10 +73,11 @@ export const Page: FC<PageProps> = ({ existingJobsAndGroups, jobType }) => {

if (mlJobService.tempJobCloningObjects.job !== undefined) {
// cloning a job
const clonedJob = mlJobService.cloneJob(mlJobService.tempJobCloningObjects.job);
const { job, datafeed } = expandCombinedJobConfig(clonedJob);
const clonedJob = mlJobService.tempJobCloningObjects.job;
const clonedDatafeed = mlJobService.cloneDatafeed(mlJobService.tempJobCloningObjects.datafeed);

initCategorizationSettings();
jobCreator.cloneFromExistingJob(job, datafeed);
jobCreator.cloneFromExistingJob(clonedJob, clonedDatafeed);

// if we're not skipping the time range, this is a standard job clone, so wipe the jobId
if (mlJobService.tempJobCloningObjects.skipTimeRangeStep === false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ declare interface JobService {
jobs: CombinedJob[];
createResultsUrlForJobs: (jobs: any[], target: string, timeRange?: TimeRange) => string;
tempJobCloningObjects: {
createdBy?: string;
datafeed?: Datafeed;
job: any;
skipTimeRangeStep: boolean;
start?: number;
Expand All @@ -26,7 +28,7 @@ declare interface JobService {
};
skipTimeRangeStep: boolean;
saveNewJob(job: any): Promise<any>;
cloneJob(job: any): any;
cloneDatafeed(datafeed: any): Datafeed;
openJob(jobId: string): Promise<any>;
saveNewDatafeed(datafeedConfig: any, jobId: string): Promise<any>;
startDatafeed(
Expand Down
54 changes: 15 additions & 39 deletions x-pack/plugins/ml/public/application/services/job_service.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ class JobService {
// if populated when loading the job management page, the start datafeed modal
// is automatically opened.
this.tempJobCloningObjects = {
createdBy: undefined,
datafeed: undefined,
job: undefined,
skipTimeRangeStep: false,
start: undefined,
Expand Down Expand Up @@ -325,58 +327,32 @@ class JobService {
return ml.addJob({ jobId: job.job_id, job }).then(func).catch(func);
}

cloneJob(job) {
// create a deep copy of a job object
// also remove items from the job which are set by the server and not needed
// in the future this formatting could be optional

const tempJob = cloneDeep(job);

// remove all of the items which should not be copied
// such as counts, state and times
delete tempJob.calendars;
delete tempJob.data_counts;
delete tempJob.model_size_stats;
delete tempJob.forecasts_stats;
delete tempJob.state;
delete tempJob.timing_stats;

delete tempJob.analysis_config.use_per_partition_normalization;

each(tempJob.analysis_config.detectors, (d) => {
delete d.detector_index;
});
cloneDatafeed(datafeed) {
const tempDatafeed = cloneDeep(datafeed);

// remove parts of the datafeed config which should not be copied
if (tempJob.datafeed_config) {
delete tempJob.datafeed_config.datafeed_id;
delete tempJob.datafeed_config.job_id;
delete tempJob.datafeed_config.state;
delete tempJob.datafeed_config.node;
delete tempJob.datafeed_config.timing_stats;
delete tempJob.datafeed_config.assignment_explanation;
if (tempDatafeed) {
delete tempDatafeed.datafeed_id;
delete tempDatafeed.job_id;
delete tempDatafeed.state;
delete tempDatafeed.node;
delete tempDatafeed.timing_stats;
delete tempDatafeed.assignment_explanation;

// remove query_delay if it's between 60s and 120s
// the back-end produces a random value between 60 and 120 and so
// by deleting it, the back-end will produce a new random value
if (tempJob.datafeed_config.query_delay) {
const interval = parseInterval(tempJob.datafeed_config.query_delay);
if (tempDatafeed.query_delay) {
const interval = parseInterval(tempDatafeed.query_delay);
if (interval !== null) {
const queryDelay = interval.asSeconds();
if (queryDelay > 60 && queryDelay < 120) {
delete tempJob.datafeed_config.query_delay;
delete tempDatafeed.query_delay;
}
}
}
}

// when jumping from a wizard to the advanced job creation,
// the wizard's created_by information should be stripped.
if (tempJob.custom_settings && tempJob.custom_settings.created_by) {
delete tempJob.custom_settings.created_by;
}

return tempJob;
return tempDatafeed;
}

// find a job based on the id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import {
MlJobWithTimeRange,
MlSummaryJobs,
CombinedJobWithStats,
Job,
Datafeed,
} from '../../../../common/types/anomaly_detection_jobs';
import { JobMessage } from '../../../../common/types/audit_message';
import { AggFieldNamePair } from '../../../../common/types/fields';
Expand Down Expand Up @@ -48,10 +50,10 @@ export const jobsApiProvider = (httpService: HttpService) => ({
});
},

jobsForExport(jobIds: string[]) {
const body = JSON.stringify({ jobIds });
return httpService.http<CombinedJobWithStats[]>({
path: `${basePath()}/jobs/jobs_for_export`,
jobForExport(jobId: string) {
const body = JSON.stringify({ jobId });
return httpService.http<{ job: Job; datafeed: Datafeed } | undefined>({
path: `${basePath()}/jobs/job_for_export`,
method: 'POST',
body,
});
Expand Down
28 changes: 8 additions & 20 deletions x-pack/plugins/ml/server/models/job_service/jobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import {
AuditMessage,
DatafeedWithStats,
CombinedJobWithStats,
CombinedJob,
Datafeed,
} from '../../../common/types/anomaly_detection_jobs';
import {
Expand Down Expand Up @@ -259,16 +258,11 @@ export function jobsProvider(client: IScopedClusterClient, mlClient: MlClient) {
return { jobs, jobsMap };
}

async function createJobsListForExport(jobIds: string[] = []) {
const jobs: CombinedJob[] = [];
async function getJobForExport(jobId: string) {
const datafeeds: { [id: string]: Datafeed } = {};

const jobIdsString = jobIds.join();

const [{ body: jobResults }, { body: datafeedResults }] = await Promise.all([
mlClient.getJobs<MlJobsResponse>(
jobIds.length > 0 ? { job_id: jobIdsString, exclude_generated: true } : undefined
),
mlClient.getJobs<MlJobsResponse>({ job_id: jobId, exclude_generated: true }),
mlClient.getDatafeeds<MlDatafeedsResponse>({ exclude_generated: true }),
]);

Expand All @@ -280,18 +274,12 @@ export function jobsProvider(client: IScopedClusterClient, mlClient: MlClient) {

// create jobs objects containing job stats, datafeeds, datafeed stats and calendars
if (jobResults && jobResults.jobs) {
jobResults.jobs.forEach((job) => {
const tempJob = job as CombinedJob;

const datafeed = datafeeds[job.job_id];
if (datafeed !== undefined) {
tempJob.datafeed_config = datafeed;
}

jobs.push(tempJob);
});
const job = jobResults.jobs.find((j) => j.job_id === jobId);
if (job && datafeeds[job.job_id]) {
return { job, datafeed: datafeeds[job.job_id] };
}
}
return jobs;
return undefined;
}

async function createFullJobsList(jobIds: string[] = []) {
Expand Down Expand Up @@ -540,7 +528,7 @@ export function jobsProvider(client: IScopedClusterClient, mlClient: MlClient) {
forceStopAndCloseJob,
jobsSummary,
jobsWithTimerange,
createJobsListForExport,
getJobForExport,
createFullJobsList,
deletingJobTasks,
jobsExist,
Expand Down
15 changes: 7 additions & 8 deletions x-pack/plugins/ml/server/routes/job_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -275,29 +275,28 @@ export function jobServiceRoutes({ router, routeGuard }: RouteInitialization) {
/**
* @apiGroup JobService
*
* @api {post} /api/ml/jobs/jobs_for_export Get job
* @api {post} /api/ml/jobs/job_for_export Get job
* @apiName GetJobForExport
* @apiDescription Get the job configuration with auto generated fields excluded for cloning
*
* @apiSchema (body) jobIdsSchema
* @apiSchema (body) jobIdSchema
*/
router.post(
{
path: '/api/ml/jobs/jobs_for_export',
path: '/api/ml/jobs/job_for_export',
validate: {
body: jobIdsSchema,
body: jobIdSchema,
},
options: {
tags: ['access:ml:canGetJobs'],
},
},
routeGuard.fullLicenseAPIGuard(async ({ client, mlClient, request, response }) => {
try {
const { createJobsListForExport } = jobServiceProvider(client, mlClient);
const { jobIds } = request.body;

const resp = await createJobsListForExport(jobIds);
const { getJobForExport } = jobServiceProvider(client, mlClient);
const { jobId } = request.body;

const resp = await getJobForExport(jobId);
return response.ok({
body: resp,
});
Expand Down
5 changes: 5 additions & 0 deletions x-pack/plugins/ml/server/routes/schemas/job_service_schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ export const forceStartDatafeedSchema = schema.object({
end: schema.maybe(schema.number()),
});

export const jobIdSchema = schema.object({
  /** Optional job ID. The route handler reads `jobId` from the validated
   * body, so the key must be `jobId` (not `jobIds`). */
  jobId: schema.maybe(schema.string()),
});

export const jobIdsSchema = schema.object({
/** Optional list of job IDs. */
jobIds: schema.maybe(schema.arrayOf(schema.maybe(schema.string()))),
Expand Down

0 comments on commit 9f446c2

Please sign in to comment.