Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Generic Formatter] Get a working flat data structure for bulk upload #6

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/server/status/collectors/get_ops_stats_collector.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ export function getOpsStatsCollector(server, kbnServer) {
type: KIBANA_STATS_TYPE,
fetch: () => {
return {
kibana: getKibanaInfoForStats(server, kbnServer),
kibana: getKibanaInfoForStats(server, kbnServer), // TODO this should probably be injected in one place at a higher level where all collector data is combined
...kbnServer.metrics // latest metrics captured from the ops event listener in src/server/status/index
};
},
Expand Down
11 changes: 8 additions & 3 deletions src/server/usage/classes/collector.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,16 @@ export class Collector {
throw new Error('Collector must be instantiated with a options.fetch function property');
}

this.log = getCollectorLogger(server);

Object.assign(this, options); // spread in other properties and mutate "this"

this.type = type;
this.init = init;
this.fetch = fetch;

const defaultFormatterForBulkUpload = result => [ { type, payload: result } ];
const defaultFormatterForBulkUpload = result => ({ type, payload: result });
this._formatForBulkUpload = formatForBulkUpload || defaultFormatterForBulkUpload;

this.log = getCollectorLogger(server);
}

/*
Expand All @@ -65,6 +65,11 @@ export class Collector {
return this.fetch(fetchMechanisms);
}

/*
* A hook for allowing the fetched data payload to be organized into a typed
* data model for internal bulk upload. See defaultFormatterForBulkUpload for
* a generic example.
*/
formatForBulkUpload(result) {
return this._formatForBulkUpload(result);
}
Expand Down
18 changes: 2 additions & 16 deletions src/server/usage/classes/collector_set.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

import { snakeCase, isEmpty } from 'lodash';
import { snakeCase } from 'lodash';
import Promise from 'bluebird';
import { getCollectorLogger } from '../lib';
import { Collector } from './collector';
Expand All @@ -26,7 +26,7 @@ import { UsageCollector } from './usage_collector';
/*
* A collector object has types registered into it with the register(type)
* function. Each type that gets registered defines how to fetch its own data
* and combine it into a unified payload for bulk upload.
* and optionally, how to combine it into a unified payload for bulk upload.
*/
export class CollectorSet {

Expand Down Expand Up @@ -94,20 +94,6 @@ export class CollectorSet {
return Promise.all(fetchPromises);
}

bulkFormat(data) {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It didn't make a lot of sense to have this in OSS; I moved it to BulkUploader.toBulkUploadFormat

return data.reduce((accum, { type, result }) => {
if (isEmpty(result)) {
return accum;
}

const payload = this.getCollectorByType(type).formatForBulkUpload(result);
return [
...accum,
payload // TODO flatten it here
];
}, []);
}

/*
* @return {new CollectorSet}
*/
Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugins/monitoring/common/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export const MONITORING_SYSTEM_API_VERSION = '6';
* The type name used within the Monitoring index to publish Kibana ops stats.
* @type {string}
*/
export const KIBANA_STATS_TYPE_MONITORING = 'kibana_stats_monitoring'; // similar to KIBANA_STATS_TYPE but rolled up into 10s stats from 5s intervals through ops_buffer
export const KIBANA_STATS_TYPE_MONITORING = 'kibana_stats_internal'; // similar to KIBANA_STATS_TYPE but rolled up into 10s stats from 5s intervals through ops_buffer
/**
* The type name used within the Monitoring index to publish Kibana stats.
* @type {string}
Expand Down
61 changes: 29 additions & 32 deletions x-pack/plugins/monitoring/server/kibana_monitoring/bulk_uploader.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/

import { compact, flatten, uniq } from 'lodash';
import { defaultsDeep, isEmpty, uniq } from 'lodash';
import { callClusterFactory } from '../../../xpack_main';
import {
LOGGING_TAG,
Expand All @@ -13,6 +13,7 @@ import {
import {
sendBulkPayload,
monitoringBulk,
getKibanaInfoForStats,
} from './lib';

const LOGGING_TAGS = [LOGGING_TAG, KIBANA_MONITORING_LOGGING_TAG];
Expand All @@ -34,7 +35,7 @@ const LOGGING_TAGS = [LOGGING_TAG, KIBANA_MONITORING_LOGGING_TAG];
* @param {Object} xpackInfo server.plugins.xpack_main.info object
*/
export class BulkUploader {
constructor(server, { interval }) {
constructor(server, { kbnServer, interval }) {
if (typeof interval !== 'number') {
throw new Error('interval number of milliseconds is required');
}
Expand All @@ -53,6 +54,7 @@ export class BulkUploader {

this._callClusterWithInternalUser = callClusterFactory(server).getCallClusterInternal();
this._savedObjectsClient = server.savedObjects.getUnscopedSavedObjectsClient(this._callClusterWithInternalUser);
this._getKibanaInfoForStats = () => getKibanaInfoForStats(server, kbnServer);
}

/*
Expand Down Expand Up @@ -100,7 +102,7 @@ export class BulkUploader {
callCluster: this._callClusterWithInternalUser,
savedObjectsClient: this._savedObjectsClient,
});
const payload = BulkUploader.toBulkUploadFormat(data, collectorSet);
const payload = this.toBulkUploadFormat(data, collectorSet);

if (payload) {
try {
Expand All @@ -119,41 +121,36 @@ export class BulkUploader {
return sendBulkPayload(this._client, this._interval, payload);
}

static deepMergeUploadData(uploadData, collectorSet) {
const deepMergeAndGroup = collectorSet.bulkFormat(uploadData).reduce((accum, datas) => {
for (const data of datas) {
accum[data.type] = accum[data.type] || {};
for (const key in data.payload) {
if (typeof accum[data.type][key] === 'object') {
accum[data.type][key] = {
...accum[data.type][key],
...data.payload[key]
};
} else {
accum[data.type][key] = data.payload[key];
}
}
/*
* Bulk stats are transformed into a bulk upload format
* Non-legacy transformation is done in CollectorSet.toApiStats
*/
toBulkUploadFormat(rawData, collectorSet) {
// convert the raw data to a nested object by taking each payload through
// its formatter, organizing it per-type
const typesNested = rawData.reduce((accum, { type, result }) => {
if (isEmpty(result)) {
return accum;
}
return accum;
const { type: uploadType, payload: uploadData } = collectorSet.getCollectorByType(type).formatForBulkUpload(result);
return defaultsDeep(accum, { [uploadType]: uploadData });
}, {});

return Object.keys(deepMergeAndGroup).reduce((accum, type) => {
accum.push([
// convert the nested object into a flat array, with each payload prefixed
// with an 'index' instruction, for bulk upload
const flat = Object.keys(typesNested).reduce((accum, type) => {
return [
...accum,
{ index: { _type: type } },
deepMergeAndGroup[type],
]);
return accum;
{
kibana: this._getKibanaInfoForStats(),
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to make BulkUploader.toBulkUploadFormat a non-static method to support this

interval_ms: this._interval,
...typesNested[type],
}
];
}, []);
}

/*
* Bulk stats are transformed into a bulk upload format
* Non-legacy transformation is done in CollectorSet.toApiStats
*/
static toBulkUploadFormat(uploadData, collectorSet) {
if (compact(uploadData).length > 0) {
return flatten(BulkUploader.deepMergeUploadData(uploadData, collectorSet));
}
return flat;
}

static checkPayloadTypesUnique(payload) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ export function getKibanaUsageCollector(server) {
return savedObjectsClient.summarize();
},
formatForBulkUpload: result => {
return [{
return {
type: 'kibana_stats',
payload: {
usage: result
}
}];
};
}
});
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,11 @@

import { KIBANA_STATS_TYPE_MONITORING } from '../../../common/constants';
import { opsBuffer } from './ops_buffer';
import { getKibanaInfoForStats } from '../lib';

/*
* Initialize a collector for Kibana Ops Stats
*/
export function getOpsStatsCollector(server, kbnServer) {
export function getOpsStatsCollector(server) {
let monitor;
const buffer = opsBuffer(server);
const onOps = event => buffer.push(event);
Expand Down Expand Up @@ -48,28 +47,13 @@ export function getOpsStatsCollector(server, kbnServer) {
type: KIBANA_STATS_TYPE_MONITORING,
init: start,
fetch: () => {
return {
kibana: getKibanaInfoForStats(server, kbnServer),
...buffer.flush()
};
return buffer.flush();
},
formatForBulkUpload: result => {
const { kibana, ...rest } = result;
return [
{
type: 'kibana_stats',
payload: {
kibana,
...rest,
}
},
{
type: 'kibana_settings',
payload: {
kibana,
}
}
];
return {
type: 'kibana_stats',
payload: result
};
}
});
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import { get } from 'lodash';
import { XPACK_DEFAULT_ADMIN_EMAIL_UI_SETTING } from '../../../../../server/lib/constants';
import { KIBANA_SETTINGS_TYPE } from '../../../common/constants';
import { getKibanaInfoForStats } from '../lib';

/*
* Check if Cluster Alert email notifications is enabled in config
Expand Down Expand Up @@ -54,7 +53,7 @@ export async function checkForEmailValue(
}
}

export function getSettingsCollector(server, kbnServer) {
export function getSettingsCollector(server) {
const config = server.config();
const { collectorSet } = server.usage;

Expand All @@ -79,10 +78,13 @@ export function getSettingsCollector(server, kbnServer) {
// remember the current email so that we can mark it as successful if the bulk does not error out
shouldUseNull = !!defaultAdminEmail;

return kibanaSettingsData;
},
formatForBulkUpload: result => {
return {
kibana: getKibanaInfoForStats(server, kbnServer),
...kibanaSettingsData
type: 'kibana_settings',
payload: result
};
},
}
});
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,14 @@ export function opsBuffer(server) {
},

flush() {
let cloud; // a property that will be left out of the result if the details are undefined
const cloudDetails = cloudDetector.getCloudDetails();
if (cloudDetails != null) {
cloud = { cloud: cloudDetails };
}

return {
cloud: cloudDetector.getCloudDetails(),
...cloud,
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For checking if payloads are empty, this field needed to be taken out when value is undefined

...eventRoller.flush()
};
}
Expand Down
3 changes: 2 additions & 1 deletion x-pack/plugins/monitoring/server/kibana_monitoring/init.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ import { BulkUploader } from './bulk_uploader';
* @param {Object} kbnServer manager of Kibana services - see `src/server/kbn_server` in Kibana core
* @param {Object} server HapiJS server instance
*/
export function initBulkUploader(_kbnServer, server) {
export function initBulkUploader(kbnServer, server) {
const config = server.config();
const interval = config.get('xpack.monitoring.kibana.collection.interval');

return new BulkUploader(server, {
kbnServer,
interval
});
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ export function getReportingUsageCollector(server) {
};
},
formatForBulkUpload: result => {
return [{
return {
type: 'kibana_stats',
payload: {
usage: {
Expand All @@ -158,7 +158,7 @@ export function getReportingUsageCollector(server) {
}
}
}
}];
};
}
});
}