-
Notifications
You must be signed in to change notification settings - Fork 8.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
EMT-661: use new metadata current #74394
Changes from 32 commits
c7c37ff
b222ae8
a518df4
c0a4d84
3603f5e
a356c92
70c5311
482c054
614cfd8
106628d
30b7e7a
d5578b6
74c4988
0ad91e9
b56369a
eca0a8d
edb1d30
e09e512
13442a8
77e4395
d450f9d
b0a51a2
cbf7d2d
c21b7c8
349fcd1
4f8a9e6
d592bcb
07bc7ba
b00c65c
da3ad7c
eb0c4df
5e84d99
2786976
a6225eb
7f26338
5e8ad6e
c2fae69
cc1d123
5dafd4d
4affdb1
301a299
0d8ccee
bdf394c
0bf0fa3
809a3d7
f22d28f
1bb921b
952824e
1b769c3
51302c7
2bc5d3a
db0b200
6e1532d
192a1c4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,174 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License; | ||
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
|
||
import { SavedObjectsClientContract } from 'kibana/server'; | ||
|
||
import { saveInstalledEsRefs } from '../../packages/install'; | ||
import * as Registry from '../../registry'; | ||
import { | ||
Dataset, | ||
ElasticsearchAssetType, | ||
EsAssetReference, | ||
RegistryPackage, | ||
} from '../../../../../common/types/models'; | ||
import { CallESAsCurrentUser } from '../../../../types'; | ||
import { getInstallation } from '../../packages'; | ||
import { deleteTransforms, deleteTransformRefs } from './remove'; | ||
import { getAsset } from '../../registry'; | ||
|
||
interface TransformInstallation { | ||
installationName: string; | ||
content: string; | ||
} | ||
|
||
interface TransformPathDataset { | ||
path: string; | ||
dataset: Dataset; | ||
} | ||
|
||
export const installTransformForDataset = async ( | ||
registryPackage: RegistryPackage, | ||
paths: string[], | ||
callCluster: CallESAsCurrentUser, | ||
savedObjectsClient: SavedObjectsClientContract | ||
) => { | ||
const installation = await getInstallation({ savedObjectsClient, pkgName: registryPackage.name }); | ||
let previousInstalledTransformEsAssets: EsAssetReference[] = []; | ||
if (installation) { | ||
previousInstalledTransformEsAssets = installation.installed_es.filter( | ||
({ type, id }) => type === ElasticsearchAssetType.transform | ||
); | ||
} | ||
|
||
// delete all previous transform | ||
await deleteTransforms( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The thing that jumps out at me here is that there are no There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, even if I put a try catch I will still have to re-throw the error. As per discussions I have had, the is supposed to be a global catch that will initiate a rollback. My initial plan was to restart the old transform, but that required that transform for the same version be differentiated . There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, it should be handle globally as otherwise we have a package which is partially new and partially old. Either all the assets need to be new or all need to be old. |
||
callCluster, | ||
previousInstalledTransformEsAssets.map((asset) => asset.id) | ||
); | ||
// install the latest dataset | ||
const datasets = registryPackage.datasets; | ||
if (!datasets?.length) return []; | ||
const installNameSuffix = `${registryPackage.version}`; | ||
|
||
const transformPaths = paths.filter((path) => isTransform(path)); | ||
|
||
const transformPathDatasets = datasets.reduce<TransformPathDataset[]>((acc, dataset) => { | ||
neptunian marked this conversation as resolved.
Show resolved
Hide resolved
|
||
transformPaths.forEach((path) => { | ||
if (isDatasetTransform(path, dataset.path)) { | ||
acc.push({ path, dataset }); | ||
} | ||
}); | ||
return acc; | ||
}, []); | ||
|
||
const transformRefs = transformPathDatasets.reduce<EsAssetReference[]>( | ||
(acc, transformPathDataset) => { | ||
if (transformPathDataset) { | ||
acc.push({ | ||
id: getTransformNameForInstallation(transformPathDataset, installNameSuffix), | ||
type: ElasticsearchAssetType.transform, | ||
}); | ||
} | ||
return acc; | ||
}, | ||
[] | ||
); | ||
|
||
// get and save transform refs before installing transforms | ||
await saveInstalledEsRefs(savedObjectsClient, registryPackage.name, transformRefs); | ||
neptunian marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
const transforms: TransformInstallation[] = transformPathDatasets.map( | ||
(transformPathDataset: TransformPathDataset) => { | ||
return { | ||
installationName: getTransformNameForInstallation(transformPathDataset, installNameSuffix), | ||
content: getAsset(transformPathDataset.path).toString('utf-8'), | ||
}; | ||
} | ||
); | ||
|
||
const installationPromises = transforms.map(async (transform) => { | ||
return installTransform({ callCluster, transform }); | ||
}); | ||
|
||
const installedTransforms = await Promise.all([ | ||
Promise.all(installationPromises), | ||
]).then((results) => results.flat()); | ||
|
||
const currentInstallation = await getInstallation({ | ||
neptunian marked this conversation as resolved.
Show resolved
Hide resolved
|
||
savedObjectsClient, | ||
pkgName: registryPackage.name, | ||
}); | ||
|
||
// remove the saved object reference | ||
await deleteTransformRefs( | ||
savedObjectsClient, | ||
currentInstallation?.installed_es || [], | ||
registryPackage.name, | ||
previousInstalledTransformEsAssets.map((asset) => asset.id), | ||
installedTransforms.map((installed) => installed.id) | ||
); | ||
|
||
return installedTransforms; | ||
}; | ||
|
||
const isTransform = (path: string) => { | ||
const pathParts = Registry.pathParts(path); | ||
return pathParts.type === ElasticsearchAssetType.transform; | ||
}; | ||
|
||
const isDatasetTransform = (path: string, datasetName: string) => { | ||
const pathParts = Registry.pathParts(path); | ||
return ( | ||
!path.endsWith('/') && | ||
pathParts.type === ElasticsearchAssetType.transform && | ||
pathParts.dataset !== undefined && | ||
datasetName === pathParts.dataset | ||
); | ||
}; | ||
|
||
async function installTransform({ | ||
callCluster, | ||
transform, | ||
}: { | ||
callCluster: CallESAsCurrentUser; | ||
transform: TransformInstallation; | ||
}): Promise<EsAssetReference> { | ||
const callClusterParams: { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was confused at first by the name of this variable as I thought it might be something that is reused. But this is about creating the transform and as far as I can see, it is not reused. If we use a var, I would defined it closer to where it is used or specify it inline like you did on line 153. Otherwise reading the code is a bit confusing as I first read code that creates the transform and then deletes it, but turns out it happens in the other order. At least I was confused at first. |
||
method: string; | ||
path: string; | ||
query: string; | ||
ignore?: number[]; | ||
body: any; | ||
headers?: any; | ||
} = { | ||
method: 'PUT', | ||
path: `/_transform/${transform.installationName}`, | ||
query: 'defer_validation=true', | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Perhaps worth a comment that this is because the source index does not exist? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added comment. |
||
body: transform.content, | ||
}; | ||
|
||
await callCluster('transport.request', { | ||
neptunian marked this conversation as resolved.
Show resolved
Hide resolved
|
||
method: 'DELETE', | ||
query: 'force=true', | ||
path: `_transform/${transform.installationName}`, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. All the other paths start with a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. rectified this, it did not make a difference, but good to be consistent. |
||
ignore: [404, 400], | ||
}); | ||
await callCluster('transport.request', callClusterParams); | ||
await callCluster('transport.request', { | ||
method: 'POST', | ||
path: `/_transform/${transform.installationName}/_start`, | ||
}); | ||
|
||
return { id: transform.installationName, type: ElasticsearchAssetType.transform }; | ||
} | ||
|
||
const getTransformNameForInstallation = ( | ||
transformDataset: TransformPathDataset, | ||
suffix: string | ||
) => { | ||
const filename = transformDataset?.path.split('/')?.pop()?.split('.')[0]; | ||
neptunian marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return `${transformDataset.dataset.type}-${transformDataset.dataset.name}-${filename}-${suffix}`; | ||
}; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License; | ||
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
|
||
import { SavedObjectsClientContract } from 'kibana/server'; | ||
import { CallESAsCurrentUser, ElasticsearchAssetType, EsAssetReference } from '../../../../types'; | ||
import { PACKAGES_SAVED_OBJECT_TYPE } from '../../../../../common/constants'; | ||
|
||
export const stopTransforms = async (transformIds: string[], callCluster: CallESAsCurrentUser) => { | ||
for (const transformId of transformIds) { | ||
await callCluster('transport.request', { | ||
ruflin marked this conversation as resolved.
Show resolved
Hide resolved
|
||
method: 'POST', | ||
path: `/_transform/${transformId}/_stop`, | ||
query: 'force=true', | ||
ignore: [404], | ||
}); | ||
} | ||
}; | ||
|
||
export const deleteTransforms = async ( | ||
callCluster: CallESAsCurrentUser, | ||
transformIds: string[] | ||
) => { | ||
for (const transformId of transformIds) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the case where there are many transforms to delete, this could take a while as it awaits each transform before proceeding to the next. Is there a reason we can't use map and Promise.all to make the requests happen at once? |
||
await stopTransforms([transformId], callCluster); | ||
await callCluster('transport.request', { | ||
method: 'DELETE', | ||
query: 'force=true', | ||
path: `_transform/${transformId}`, | ||
ignore: [404], | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the above delete command, you ignore 404 and 400. What is the difference? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removed all the 400, it is a vestige of my initial testing. |
||
}); | ||
} | ||
}; | ||
|
||
export const deleteTransformRefs = async ( | ||
savedObjectsClient: SavedObjectsClientContract, | ||
installedEsAssets: EsAssetReference[], | ||
pkgName: string, | ||
installedEsIdToRemove: string[], | ||
currentInstalledEsTransformIds: string[] | ||
) => { | ||
const filteredAssets = installedEsAssets.filter(({ type, id }) => { | ||
if (type !== ElasticsearchAssetType.transform) return true; | ||
return currentInstalledEsTransformIds.includes(id) || !installedEsIdToRemove.includes(id); | ||
neptunian marked this conversation as resolved.
Show resolved
Hide resolved
|
||
}); | ||
return savedObjectsClient.update(PACKAGES_SAVED_OBJECT_TYPE, pkgName, { | ||
installed_es: filteredAssets, | ||
}); | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FYI @ruflin