-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Generic Formatter] Get a working flat data structure for bulk upload #6
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,7 +4,7 @@ | |
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
|
||
import { compact, flatten, uniq } from 'lodash'; | ||
import { defaultsDeep, isEmpty, uniq } from 'lodash'; | ||
import { callClusterFactory } from '../../../xpack_main'; | ||
import { | ||
LOGGING_TAG, | ||
|
@@ -13,6 +13,7 @@ import { | |
import { | ||
sendBulkPayload, | ||
monitoringBulk, | ||
getKibanaInfoForStats, | ||
} from './lib'; | ||
|
||
const LOGGING_TAGS = [LOGGING_TAG, KIBANA_MONITORING_LOGGING_TAG]; | ||
|
@@ -34,7 +35,7 @@ const LOGGING_TAGS = [LOGGING_TAG, KIBANA_MONITORING_LOGGING_TAG]; | |
* @param {Object} xpackInfo server.plugins.xpack_main.info object | ||
*/ | ||
export class BulkUploader { | ||
constructor(server, { interval }) { | ||
constructor(server, { kbnServer, interval }) { | ||
if (typeof interval !== 'number') { | ||
throw new Error('interval number of milliseconds is required'); | ||
} | ||
|
@@ -53,6 +54,7 @@ export class BulkUploader { | |
|
||
this._callClusterWithInternalUser = callClusterFactory(server).getCallClusterInternal(); | ||
this._savedObjectsClient = server.savedObjects.getUnscopedSavedObjectsClient(this._callClusterWithInternalUser); | ||
this._getKibanaInfoForStats = () => getKibanaInfoForStats(server, kbnServer); | ||
} | ||
|
||
/* | ||
|
@@ -100,7 +102,7 @@ export class BulkUploader { | |
callCluster: this._callClusterWithInternalUser, | ||
savedObjectsClient: this._savedObjectsClient, | ||
}); | ||
const payload = BulkUploader.toBulkUploadFormat(data, collectorSet); | ||
const payload = this.toBulkUploadFormat(data, collectorSet); | ||
|
||
if (payload) { | ||
try { | ||
|
@@ -119,41 +121,36 @@ export class BulkUploader { | |
return sendBulkPayload(this._client, this._interval, payload); | ||
} | ||
|
||
static deepMergeUploadData(uploadData, collectorSet) { | ||
const deepMergeAndGroup = collectorSet.bulkFormat(uploadData).reduce((accum, datas) => { | ||
for (const data of datas) { | ||
accum[data.type] = accum[data.type] || {}; | ||
for (const key in data.payload) { | ||
if (typeof accum[data.type][key] === 'object') { | ||
accum[data.type][key] = { | ||
...accum[data.type][key], | ||
...data.payload[key] | ||
}; | ||
} else { | ||
accum[data.type][key] = data.payload[key]; | ||
} | ||
} | ||
/* | ||
* Bulk stats are transformed into a bulk upload format | ||
* Non-legacy transformation is done in CollectorSet.toApiStats | ||
*/ | ||
toBulkUploadFormat(rawData, collectorSet) { | ||
// convert the raw data to a nested object by taking each payload through | ||
// its formatter, organizing it per-type | ||
const typesNested = rawData.reduce((accum, { type, result }) => { | ||
if (isEmpty(result)) { | ||
return accum; | ||
} | ||
return accum; | ||
const { type: uploadType, payload: uploadData } = collectorSet.getCollectorByType(type).formatForBulkUpload(result); | ||
return defaultsDeep(accum, { [uploadType]: uploadData }); | ||
}, {}); | ||
|
||
return Object.keys(deepMergeAndGroup).reduce((accum, type) => { | ||
accum.push([ | ||
// convert the nested object into a flat array, with each payload prefixed | ||
// with an 'index' instruction, for bulk upload | ||
const flat = Object.keys(typesNested).reduce((accum, type) => { | ||
return [ | ||
...accum, | ||
{ index: { _type: type } }, | ||
deepMergeAndGroup[type], | ||
]); | ||
return accum; | ||
{ | ||
kibana: this._getKibanaInfoForStats(), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I had to make |
||
interval_ms: this._interval, | ||
...typesNested[type], | ||
} | ||
]; | ||
}, []); | ||
} | ||
|
||
/* | ||
* Bulk stats are transformed into a bulk upload format | ||
* Non-legacy transformation is done in CollectorSet.toApiStats | ||
*/ | ||
static toBulkUploadFormat(uploadData, collectorSet) { | ||
if (compact(uploadData).length > 0) { | ||
return flatten(BulkUploader.deepMergeUploadData(uploadData, collectorSet)); | ||
} | ||
return flat; | ||
} | ||
|
||
static checkPayloadTypesUnique(payload) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,8 +27,14 @@ export function opsBuffer(server) { | |
}, | ||
|
||
flush() { | ||
let cloud; // a property that will be left out of the result if the details are undefined | ||
const cloudDetails = cloudDetector.getCloudDetails(); | ||
if (cloudDetails != null) { | ||
cloud = { cloud: cloudDetails }; | ||
} | ||
|
||
return { | ||
cloud: cloudDetector.getCloudDetails(), | ||
...cloud, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For checking if payloads are empty, this field needed to be taken out when value is undefined |
||
...eventRoller.flush() | ||
}; | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It didn't make a lot of sense to have this in OSS; I moved it to
BulkUploader.toBulkUploadFormat