From 557020220334a24e13f75a11966100e469edd6a4 Mon Sep 17 00:00:00 2001 From: Dima Arnautov Date: Fri, 7 Feb 2020 13:25:00 +0100 Subject: [PATCH] [ML] file import --- .../services/ml_api_service/datavisualizer.js | 6 +- .../file_data_visualizer.ts | 4 +- .../file_data_visualizer/import_data.ts | 198 ++++++++++++++++++ .../models/file_data_visualizer/index.ts | 2 + .../ml/server/routes/file_data_visualizer.ts | 85 +++++++- 5 files changed, 288 insertions(+), 7 deletions(-) create mode 100644 x-pack/legacy/plugins/ml/server/models/file_data_visualizer/import_data.ts diff --git a/x-pack/legacy/plugins/ml/public/application/services/ml_api_service/datavisualizer.js b/x-pack/legacy/plugins/ml/public/application/services/ml_api_service/datavisualizer.js index 584641068405c1..c9f6bc08e75ec9 100644 --- a/x-pack/legacy/plugins/ml/public/application/services/ml_api_service/datavisualizer.js +++ b/x-pack/legacy/plugins/ml/public/application/services/ml_api_service/datavisualizer.js @@ -8,7 +8,7 @@ import chrome from 'ui/chrome'; import { http } from '../http_service'; -const basePath = chrome.addBasePath('/api'); +const basePath = chrome.addBasePath('/api/ml'); export const fileDatavisualizer = { analyzeFile(obj, params = {}) { @@ -22,7 +22,7 @@ export const fileDatavisualizer = { } } return http({ - url: `${basePath}/ml/file_data_visualizer/analyze_file${paramString}`, + url: `${basePath}/file_data_visualizer/analyze_file${paramString}`, method: 'POST', data: obj, }); @@ -33,7 +33,7 @@ export const fileDatavisualizer = { const { index, data, settings, mappings, ingestPipeline } = obj; return http({ - url: `${basePath}/fileupload/import${paramString}`, + url: `${basePath}/file_data_visualizer/import${paramString}`, method: 'POST', data: { index, diff --git a/x-pack/legacy/plugins/ml/server/models/file_data_visualizer/file_data_visualizer.ts b/x-pack/legacy/plugins/ml/server/models/file_data_visualizer/file_data_visualizer.ts index 5ac8868b5b9647..fd5b5221393fc9 100644 --- a/x-pack/legacy/plugins/ml/server/models/file_data_visualizer/file_data_visualizer.ts +++ b/x-pack/legacy/plugins/ml/server/models/file_data_visualizer/file_data_visualizer.ts @@ -7,9 +7,7 @@ import Boom from 'boom'; import { RequestHandlerContext } from 'kibana/server'; -export interface InputData { - [key: string]: any; -} +export type InputData = any[]; export interface InputOverrides { [key: string]: string; diff --git a/x-pack/legacy/plugins/ml/server/models/file_data_visualizer/import_data.ts b/x-pack/legacy/plugins/ml/server/models/file_data_visualizer/import_data.ts new file mode 100644 index 00000000000000..008efb43a6c07e --- /dev/null +++ b/x-pack/legacy/plugins/ml/server/models/file_data_visualizer/import_data.ts @@ -0,0 +1,198 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { RequestHandlerContext } from 'kibana/server'; +import { INDEX_META_DATA_CREATED_BY } from '../../../common/constants/file_datavisualizer'; +import { InputData } from './file_data_visualizer'; + +export interface Settings { + pipeline?: string; + index: string; + body: any[]; + [key: string]: any; +} + +export interface Mappings { + [key: string]: any; +} + +export interface InjectPipeline { + id: string; + pipeline: any; +} + +interface Failure { + item: number; + reason: string; + doc: any; +} + +export function importDataProvider(context: RequestHandlerContext) { + const callAsCurrentUser = context.ml!.mlClient.callAsCurrentUser; + + async function importData( + id: string, + index: string, + settings: Settings, + mappings: Mappings, + ingestPipeline: InjectPipeline, + data: InputData + ) { + let createdIndex; + let createdPipelineId; + const docCount = data.length; + + try { + const { id: pipelineId, pipeline } = ingestPipeline; + + if (id === undefined) { + // first chunk of data, create the index and id to return + id = generateId(); + + await createIndex(index, settings, mappings); + createdIndex = index; + + // create the pipeline if one has been supplied + if (pipelineId !== undefined) { + const success = await createPipeline(pipelineId, pipeline); + if (success.acknowledged !== true) { + throw success; + } + } + createdPipelineId = pipelineId; + } else { + createdIndex = index; + createdPipelineId = pipelineId; + } + + let failures: Failure[] = []; + if (data.length) { + const resp = await indexData(index, createdPipelineId, data); + if (resp.success === false) { + if (resp.ingestError) { + // all docs failed, abort + throw resp; + } else { + // some docs failed. + // still report success but with a list of failures + failures = resp.failures || []; + } + } + } + + return { + success: true, + id, + index: createdIndex, + pipelineId: createdPipelineId, + docCount, + failures, + }; + } catch (error) { + return { + success: false, + id, + index: createdIndex, + pipelineId: createdPipelineId, + error: error.error !== undefined ? error.error : error, + docCount, + ingestError: error.ingestError, + failures: error.failures || [], + }; + } + } + + async function createIndex(index: string, settings: Settings, mappings: Mappings) { + const body: { mappings: Mappings; settings?: Settings } = { + mappings: { + _meta: { + created_by: INDEX_META_DATA_CREATED_BY, + }, + properties: mappings, + }, + }; + + if (settings && Object.keys(settings).length) { + body.settings = settings; + } + + await callAsCurrentUser('indices.create', { index, body }); + } + + async function indexData(index: string, pipelineId: string, data: InputData) { + try { + const body = []; + for (let i = 0; i < data.length; i++) { + body.push({ index: {} }); + body.push(data[i]); + } + + const settings: Settings = { index, body }; + if (pipelineId !== undefined) { + settings.pipeline = pipelineId; + } + + const resp = await callAsCurrentUser('bulk', settings); + if (resp.errors) { + throw resp; + } else { + return { + success: true, + docs: data.length, + failures: [], + }; + } + } catch (error) { + let failures: Failure[] = []; + let ingestError = false; + if (error.errors !== undefined && Array.isArray(error.items)) { + // an expected error where some or all of the bulk request + // docs have failed to be ingested. + failures = getFailures(error.items, data); + } else { + // some other error has happened. + ingestError = true; + } + + return { + success: false, + error, + docCount: data.length, + failures, + ingestError, + }; + } + } + + async function createPipeline(id: string, pipeline: any) { + return await callAsCurrentUser('ingest.putPipeline', { id, body: pipeline }); + } + + function getFailures(items: any[], data: InputData): Failure[] { + const failures = []; + for (let i = 0; i < items.length; i++) { + const item = items[i]; + if (item.index && item.index.error) { + failures.push({ + item: i, + reason: item.index.error.reason, + doc: data[i], + }); + } + } + return failures; + } + + return { + importData, + }; +} + +function generateId() { + return Math.random() + .toString(36) + .substr(2, 9); +} diff --git a/x-pack/legacy/plugins/ml/server/models/file_data_visualizer/index.ts b/x-pack/legacy/plugins/ml/server/models/file_data_visualizer/index.ts index 0e0289e1874c18..94529dc111696e 100644 --- a/x-pack/legacy/plugins/ml/server/models/file_data_visualizer/index.ts +++ b/x-pack/legacy/plugins/ml/server/models/file_data_visualizer/index.ts @@ -10,3 +10,5 @@ export { InputData, AnalysisResult, } from './file_data_visualizer'; + +export { importDataProvider, Settings, InjectPipeline, Mappings } from './import_data'; diff --git a/x-pack/legacy/plugins/ml/server/routes/file_data_visualizer.ts b/x-pack/legacy/plugins/ml/server/routes/file_data_visualizer.ts index a86b29a8c3e10c..530085b42c47a7 100644 --- a/x-pack/legacy/plugins/ml/server/routes/file_data_visualizer.ts +++ b/x-pack/legacy/plugins/ml/server/routes/file_data_visualizer.ts @@ -6,25 +6,49 @@ import { schema } from '@kbn/config-schema'; import { RequestHandlerContext } from 'kibana/server'; +import { MAX_BYTES } from '../../common/constants/file_datavisualizer'; import { wrapError } from '../client/error_wrapper'; import { InputOverrides, InputData, fileDataVisualizerProvider, + importDataProvider, + Settings, + InjectPipeline, + Mappings, } from '../models/file_data_visualizer'; import { licensePreRoutingFactory } from '../new_platform/licence_check_pre_routing_factory'; import { RouteInitialization } from '../new_platform/plugin'; +import { incrementFileDataVisualizerIndexCreationCount } from '../lib/ml_telemetry'; function analyzeFiles(context: RequestHandlerContext, data: InputData, overrides: InputOverrides) { const { analyzeFile } = fileDataVisualizerProvider(context); return analyzeFile(data, overrides); } +function importData( + context: RequestHandlerContext, + id: string, + index: string, + settings: Settings, + mappings: Mappings, + ingestPipeline: InjectPipeline, + data: InputData +) { + const { importData: importDataFunc } = importDataProvider(context); + return importDataFunc(id, index, settings, mappings, ingestPipeline, data); +} + /** * Routes for the file data visualizer. */ -export function fileDataVisualizerRoutes({ router, xpackMainPlugin }: RouteInitialization) { +export function fileDataVisualizerRoutes({ + router, + xpackMainPlugin, + savedObjects, + elasticsearchPlugin, +}: RouteInitialization) { /** * @apiGroup FileDataVisualizer * @@ -38,6 +62,12 @@ export function fileDataVisualizerRoutes({ router, xpackMainPlugin }: RouteIniti validate: { body: schema.any(), }, + options: { + body: { + accepts: ['text/*', 'application/json'], + maxBytes: MAX_BYTES, + }, + }, }, licensePreRoutingFactory(xpackMainPlugin, async (context, request, response) => { try { @@ -48,4 +78,57 @@ export function fileDataVisualizerRoutes({ router, xpackMainPlugin }: RouteIniti } }) ); + + router.post( + { + path: '/api/ml/file_data_visualizer/import', + validate: { + query: schema.object({ + id: schema.maybe(schema.string()), + }), + body: schema.object({ + index: schema.maybe(schema.string()), + data: schema.arrayOf(schema.any()), + settings: schema.maybe(schema.any()), + mappings: schema.any(), + ingestPipeline: schema.object({ + id: schema.string(), + pipeline: schema.maybe(schema.any()), + }), + }), + }, + options: { + body: { + accepts: ['application/json'], + maxBytes: MAX_BYTES, + }, + }, + }, + licensePreRoutingFactory(xpackMainPlugin, async (context, request, response) => { + try { + const { id } = request.query; + const { index, data, settings, mappings, ingestPipeline } = request.body; + + // `id` being `undefined` tells us that this is a new import due to create a new index. + // follow-up import calls to just add additional data will include the `id` of the created + // index, we'll ignore those and don't increment the counter. + if (id === undefined) { + await incrementFileDataVisualizerIndexCreationCount(elasticsearchPlugin, savedObjects!); + } + + const result = await importData( + context, + id, + index, + settings, + mappings, + ingestPipeline, + data + ); + return response.ok({ body: result }); + } catch (e) { + return response.customError(wrapError(e)); + } + }) + ); }