Skip to content

Commit

Permalink
[ML] file import
Browse files Browse the repository at this point in the history
  • Loading branch information
darnautov committed Feb 7, 2020
1 parent e5e6098 commit 5570202
Show file tree
Hide file tree
Showing 5 changed files with 288 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import chrome from 'ui/chrome';

import { http } from '../http_service';

const basePath = chrome.addBasePath('/api');
const basePath = chrome.addBasePath('/api/ml');

export const fileDatavisualizer = {
analyzeFile(obj, params = {}) {
Expand All @@ -22,7 +22,7 @@ export const fileDatavisualizer = {
}
}
return http({
url: `${basePath}/ml/file_data_visualizer/analyze_file${paramString}`,
url: `${basePath}/file_data_visualizer/analyze_file${paramString}`,
method: 'POST',
data: obj,
});
Expand All @@ -33,7 +33,7 @@ export const fileDatavisualizer = {
const { index, data, settings, mappings, ingestPipeline } = obj;

return http({
url: `${basePath}/fileupload/import${paramString}`,
url: `${basePath}/file_data_visualizer/import${paramString}`,
method: 'POST',
data: {
index,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@
import Boom from 'boom';
import { RequestHandlerContext } from 'kibana/server';

export interface InputData {
[key: string]: any;
}
export type InputData = any[];

export interface InputOverrides {
[key: string]: string;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { RequestHandlerContext } from 'kibana/server';
import { INDEX_META_DATA_CREATED_BY } from '../../../common/constants/file_datavisualizer';
import { InputData } from './file_data_visualizer';

/**
 * Options forwarded to the Elasticsearch bulk API. `index` and `body` are
 * always present; `pipeline` is added only when an ingest pipeline id was
 * supplied. The index signature lets additional ES options pass through.
 */
export interface Settings {
  pipeline?: string;
  index: string;
  body: any[];
  [key: string]: any;
}

/** Field mappings for the destination index (ES mapping `properties` object). */
export interface Mappings {
  [key: string]: any;
}

/** An ingest pipeline definition together with the id to register it under. */
export interface InjectPipeline {
  id: string;
  pipeline: any;
}

/** One failed document from a bulk response: its position, reason, and source doc. */
interface Failure {
  item: number;
  reason: string;
  doc: any;
}

/**
 * Creates the file-import service bound to the current request context.
 *
 * Returns an `importData` function that, on the first chunk (`id` undefined),
 * creates the destination index and — when supplied — an ingest pipeline, then
 * bulk-indexes the documents. Follow-up chunks pass the `id` returned by the
 * first call and reuse the already-created index/pipeline.
 */
export function importDataProvider(context: RequestHandlerContext) {
  const callAsCurrentUser = context.ml!.mlClient.callAsCurrentUser;

  /**
   * Imports one chunk of documents.
   *
   * @param id `undefined` for the first chunk (triggers index/pipeline
   *   creation); the id generated by that first call for follow-up chunks.
   * @param index Name of the destination index.
   * @param settings Index settings applied when the index is created.
   * @param mappings Field mappings for the destination index.
   * @param ingestPipeline Pipeline id + definition; pipeline creation is
   *   skipped when `ingestPipeline.id` is `undefined`.
   * @param data The documents for this chunk.
   * @returns A result object; `success: false` results carry the error,
   *   any per-document `failures`, and an `ingestError` flag.
   */
  async function importData(
    id: string,
    index: string,
    settings: Settings,
    mappings: Mappings,
    ingestPipeline: InjectPipeline,
    data: InputData
  ) {
    // explicit `| undefined` — both stay unset if creation fails part-way,
    // so the error payload reports only what actually got created
    let createdIndex: string | undefined;
    let createdPipelineId: string | undefined;
    const docCount = data.length;

    try {
      const { id: pipelineId, pipeline } = ingestPipeline;

      if (id === undefined) {
        // first chunk of data, create the index and id to return
        id = generateId();

        await createIndex(index, settings, mappings);
        createdIndex = index;

        // create the pipeline if one has been supplied
        if (pipelineId !== undefined) {
          const success = await createPipeline(pipelineId, pipeline);
          if (success.acknowledged !== true) {
            throw success;
          }
        }
        createdPipelineId = pipelineId;
      } else {
        // follow-up chunk: index and pipeline already exist
        createdIndex = index;
        createdPipelineId = pipelineId;
      }

      let failures: Failure[] = [];
      if (data.length) {
        const resp = await indexData(index, createdPipelineId, data);
        if (resp.success === false) {
          if (resp.ingestError) {
            // all docs failed, abort
            throw resp;
          } else {
            // some docs failed.
            // still report success but with a list of failures
            failures = resp.failures || [];
          }
        }
      }

      return {
        success: true,
        id,
        index: createdIndex,
        pipelineId: createdPipelineId,
        docCount,
        failures,
      };
    } catch (error) {
      return {
        success: false,
        id,
        index: createdIndex,
        pipelineId: createdPipelineId,
        // indexData throws enriched objects carrying `.error`; unwrap if present
        error: error.error !== undefined ? error.error : error,
        docCount,
        ingestError: error.ingestError,
        failures: error.failures || [],
      };
    }
  }

  /** Creates the destination index, tagging it as created by the ML file importer. */
  async function createIndex(index: string, settings: Settings, mappings: Mappings) {
    const body: { mappings: Mappings; settings?: Settings } = {
      mappings: {
        _meta: {
          created_by: INDEX_META_DATA_CREATED_BY,
        },
        properties: mappings,
      },
    };

    // only attach settings when the caller supplied a non-empty object
    if (settings && Object.keys(settings).length) {
      body.settings = settings;
    }

    await callAsCurrentUser('indices.create', { index, body });
  }

  /**
   * Bulk-indexes `data` into `index`, optionally through `pipelineId`.
   * Never rejects: returns `{ success: false, ... }` with per-document
   * failures (or `ingestError: true` for non-bulk errors) instead.
   */
  // `pipelineId` may legitimately be undefined (no pipeline supplied) —
  // declaring it `string | undefined` matches the guard below and the
  // possibly-unset `createdPipelineId` passed by `importData`.
  async function indexData(index: string, pipelineId: string | undefined, data: InputData) {
    try {
      // interleave action/source lines as required by the bulk API
      const body = [];
      for (let i = 0; i < data.length; i++) {
        body.push({ index: {} });
        body.push(data[i]);
      }

      const settings: Settings = { index, body };
      if (pipelineId !== undefined) {
        settings.pipeline = pipelineId;
      }

      const resp = await callAsCurrentUser('bulk', settings);
      if (resp.errors) {
        throw resp;
      } else {
        return {
          success: true,
          docs: data.length,
          failures: [],
        };
      }
    } catch (error) {
      let failures: Failure[] = [];
      let ingestError = false;
      if (error.errors !== undefined && Array.isArray(error.items)) {
        // an expected error where some or all of the bulk request
        // docs have failed to be ingested.
        failures = getFailures(error.items, data);
      } else {
        // some other error has happened.
        ingestError = true;
      }

      return {
        success: false,
        error,
        docCount: data.length,
        failures,
        ingestError,
      };
    }
  }

  /** Registers (or overwrites) the ingest pipeline under the given id. */
  async function createPipeline(id: string, pipeline: any) {
    return await callAsCurrentUser('ingest.putPipeline', { id, body: pipeline });
  }

  /** Maps failed items of a bulk response back to their source documents. */
  function getFailures(items: any[], data: InputData): Failure[] {
    const failures: Failure[] = [];
    for (const [i, item] of items.entries()) {
      if (item.index && item.index.error) {
        failures.push({
          item: i,
          reason: item.index.error.reason,
          doc: data[i],
        });
      }
    }
    return failures;
  }

  return {
    importData,
  };
}

/**
 * Generates a short pseudo-random id (up to 9 base-36 characters) used to
 * correlate follow-up import chunks with the index created by the first one.
 * Not cryptographically secure; only per-session uniqueness is needed here.
 */
function generateId(): string {
  // Math.random() renders as "0.xxxx…" in base 36; drop the "0." prefix.
  // slice(2, 11) is the non-deprecated equivalent of substr(2, 9).
  return Math.random()
    .toString(36)
    .slice(2, 11);
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@ export {
InputData,
AnalysisResult,
} from './file_data_visualizer';

export { importDataProvider, Settings, InjectPipeline, Mappings } from './import_data';
85 changes: 84 additions & 1 deletion x-pack/legacy/plugins/ml/server/routes/file_data_visualizer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,49 @@

import { schema } from '@kbn/config-schema';
import { RequestHandlerContext } from 'kibana/server';
import { MAX_BYTES } from '../../common/constants/file_datavisualizer';
import { wrapError } from '../client/error_wrapper';
import {
InputOverrides,
InputData,
fileDataVisualizerProvider,
importDataProvider,
Settings,
InjectPipeline,
Mappings,
} from '../models/file_data_visualizer';

import { licensePreRoutingFactory } from '../new_platform/licence_check_pre_routing_factory';
import { RouteInitialization } from '../new_platform/plugin';
import { incrementFileDataVisualizerIndexCreationCount } from '../lib/ml_telemetry';

/**
 * Runs file-structure analysis for the given request context.
 * Thin wrapper: instantiates the visualizer model and delegates directly.
 */
function analyzeFiles(context: RequestHandlerContext, data: InputData, overrides: InputOverrides) {
  return fileDataVisualizerProvider(context).analyzeFile(data, overrides);
}

/**
 * Imports one chunk of file data using the import model bound to this
 * request context. Thin wrapper that forwards all arguments unchanged.
 */
function importData(
  context: RequestHandlerContext,
  id: string,
  index: string,
  settings: Settings,
  mappings: Mappings,
  ingestPipeline: InjectPipeline,
  data: InputData
) {
  const provider = importDataProvider(context);
  return provider.importData(id, index, settings, mappings, ingestPipeline, data);
}

/**
* Routes for the file data visualizer.
*/
export function fileDataVisualizerRoutes({ router, xpackMainPlugin }: RouteInitialization) {
export function fileDataVisualizerRoutes({
router,
xpackMainPlugin,
savedObjects,
elasticsearchPlugin,
}: RouteInitialization) {
/**
* @apiGroup FileDataVisualizer
*
Expand All @@ -38,6 +62,12 @@ export function fileDataVisualizerRoutes({ router, xpackMainPlugin }: RouteIniti
validate: {
body: schema.any(),
},
options: {
body: {
accepts: ['text/*', 'application/json'],
maxBytes: MAX_BYTES,
},
},
},
licensePreRoutingFactory(xpackMainPlugin, async (context, request, response) => {
try {
Expand All @@ -48,4 +78,57 @@ export function fileDataVisualizerRoutes({ router, xpackMainPlugin }: RouteIniti
}
})
);

// Route: POST /api/ml/file_data_visualizer/import
// Imports file data into an index, optionally through an ingest pipeline.
// The optional `id` query parameter distinguishes the first chunk (absent:
// a new index is created) from follow-up chunks that append more data.
router.post(
  {
    path: '/api/ml/file_data_visualizer/import',
    validate: {
      query: schema.object({
        // absent on the first call; generated by it and echoed on later chunks
        id: schema.maybe(schema.string()),
      }),
      body: schema.object({
        index: schema.maybe(schema.string()),
        data: schema.arrayOf(schema.any()),
        settings: schema.maybe(schema.any()),
        mappings: schema.any(),
        ingestPipeline: schema.object({
          id: schema.string(),
          pipeline: schema.maybe(schema.any()),
        }),
      }),
    },
    options: {
      body: {
        accepts: ['application/json'],
        // file uploads can be large; raise the default payload size limit
        maxBytes: MAX_BYTES,
      },
    },
  },
  licensePreRoutingFactory(xpackMainPlugin, async (context, request, response) => {
    try {
      const { id } = request.query;
      const { index, data, settings, mappings, ingestPipeline } = request.body;

      // `id` being `undefined` tells us that this is a new import due to create a new index.
      // follow-up import calls to just add additional data will include the `id` of the created
      // index, we'll ignore those and don't increment the counter.
      if (id === undefined) {
        await incrementFileDataVisualizerIndexCreationCount(elasticsearchPlugin, savedObjects!);
      }

      const result = await importData(
        context,
        id,
        index,
        settings,
        mappings,
        ingestPipeline,
        data
      );
      return response.ok({ body: result });
    } catch (e) {
      return response.customError(wrapError(e));
    }
  })
);
}

0 comments on commit 5570202

Please sign in to comment.