Skip to content

Commit

Permalink
[Maps] Geo containment latency and concurrent containment fix (#86980)
Browse files Browse the repository at this point in the history
  • Loading branch information
Aaron Caldwell authored Jan 28, 2021
1 parent 7593cf7 commit 80b720d
Show file tree
Hide file tree
Showing 5 changed files with 289 additions and 145 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ export interface GeoContainmentAlertParams extends AlertTypeParams {
boundaryIndexId: string;
boundaryGeoField: string;
boundaryNameField?: string;
delayOffsetWithUnits?: string;
indexQuery?: Query;
boundaryIndexQuery?: Query;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ export const ParamsSchema = schema.object({
boundaryIndexId: schema.string({ minLength: 1 }),
boundaryGeoField: schema.string({ minLength: 1 }),
boundaryNameField: schema.maybe(schema.string({ minLength: 1 })),
delayOffsetWithUnits: schema.maybe(schema.string({ minLength: 1 })),
indexQuery: schema.maybe(schema.any({})),
boundaryIndexQuery: schema.maybe(schema.any({})),
});
Expand All @@ -114,7 +113,6 @@ export interface GeoContainmentParams extends AlertTypeParams {
boundaryIndexId: string;
boundaryGeoField: string;
boundaryNameField?: string;
delayOffsetWithUnits?: string;
indexQuery?: Query;
boundaryIndexQuery?: Query;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export function transformResults(
results: SearchResponse<unknown> | undefined,
dateField: string,
geoField: string
): Map<string, LatestEntityLocation> {
): Map<string, LatestEntityLocation[]> {
if (!results) {
return new Map();
}
Expand Down Expand Up @@ -64,12 +64,15 @@ export function transformResults(
// Get unique
.reduce(
(
accu: Map<string, LatestEntityLocation>,
accu: Map<string, LatestEntityLocation[]>,
el: LatestEntityLocation & { entityName: string }
) => {
const { entityName, ...locationData } = el;
if (!accu.has(entityName)) {
accu.set(entityName, locationData);
if (entityName) {
if (!accu.has(entityName)) {
accu.set(entityName, []);
}
accu.get(entityName)!.push(locationData);
}
return accu;
},
Expand All @@ -78,26 +81,9 @@ export function transformResults(
return orderedResults;
}

function getOffsetTime(delayOffsetWithUnits: string, oldTime: Date): Date {
const timeUnit = delayOffsetWithUnits.slice(-1);
const time: number = +delayOffsetWithUnits.slice(0, -1);

const adjustedDate = new Date(oldTime.getTime());
if (timeUnit === 's') {
adjustedDate.setSeconds(adjustedDate.getSeconds() - time);
} else if (timeUnit === 'm') {
adjustedDate.setMinutes(adjustedDate.getMinutes() - time);
} else if (timeUnit === 'h') {
adjustedDate.setHours(adjustedDate.getHours() - time);
} else if (timeUnit === 'd') {
adjustedDate.setDate(adjustedDate.getDate() - time);
}
return adjustedDate;
}

export function getActiveEntriesAndGenerateAlerts(
prevLocationMap: Record<string, LatestEntityLocation>,
currLocationMap: Map<string, LatestEntityLocation>,
prevLocationMap: Map<string, LatestEntityLocation[]>,
currLocationMap: Map<string, LatestEntityLocation[]>,
alertInstanceFactory: AlertServices<
GeoContainmentInstanceState,
GeoContainmentInstanceContext,
Expand All @@ -106,32 +92,55 @@ export function getActiveEntriesAndGenerateAlerts(
shapesIdsNamesMap: Record<string, unknown>,
currIntervalEndTime: Date
) {
const allActiveEntriesMap: Map<string, LatestEntityLocation> = new Map([
...Object.entries(prevLocationMap || {}),
const allActiveEntriesMap: Map<string, LatestEntityLocation[]> = new Map([
...prevLocationMap,
...currLocationMap,
]);
allActiveEntriesMap.forEach(({ location, shapeLocationId, dateInShape, docId }, entityName) => {
const containingBoundaryName = shapesIdsNamesMap[shapeLocationId] || shapeLocationId;
const context = {
entityId: entityName,
entityDateTime: dateInShape ? new Date(dateInShape).toISOString() : null,
entityDocumentId: docId,
detectionDateTime: new Date(currIntervalEndTime).toISOString(),
entityLocation: `POINT (${location[0]} ${location[1]})`,
containingBoundaryId: shapeLocationId,
containingBoundaryName,
};
const alertInstanceId = `${entityName}-${containingBoundaryName}`;
if (shapeLocationId === OTHER_CATEGORY) {
allActiveEntriesMap.forEach((locationsArr, entityName) => {
// Generate alerts
locationsArr.forEach(({ location, shapeLocationId, dateInShape, docId }) => {
const context = {
entityId: entityName,
entityDateTime: dateInShape ? new Date(dateInShape).toISOString() : null,
entityDocumentId: docId,
detectionDateTime: new Date(currIntervalEndTime).toISOString(),
entityLocation: `POINT (${location[0]} ${location[1]})`,
containingBoundaryId: shapeLocationId,
containingBoundaryName: shapesIdsNamesMap[shapeLocationId] || shapeLocationId,
};
const alertInstanceId = `${entityName}-${context.containingBoundaryName}`;
if (shapeLocationId !== OTHER_CATEGORY) {
alertInstanceFactory(alertInstanceId).scheduleActions(ActionGroupId, context);
}
});

if (locationsArr[0].shapeLocationId === OTHER_CATEGORY) {
allActiveEntriesMap.delete(entityName);
return;
}

const otherCatIndex = locationsArr.findIndex(
({ shapeLocationId }) => shapeLocationId === OTHER_CATEGORY
);
if (otherCatIndex >= 0) {
const afterOtherLocationsArr = locationsArr.slice(0, otherCatIndex);
allActiveEntriesMap.set(entityName, afterOtherLocationsArr);
} else {
alertInstanceFactory(alertInstanceId).scheduleActions(ActionGroupId, context);
allActiveEntriesMap.set(entityName, locationsArr);
}
});
return allActiveEntriesMap;
}

export const getGeoContainmentExecutor = (log: Logger): GeoContainmentAlertType['executor'] =>
async function ({ previousStartedAt, startedAt, services, params, alertId, state }) {
async function ({
previousStartedAt: currIntervalStartTime,
startedAt: currIntervalEndTime,
services,
params,
alertId,
state,
}) {
const { shapesFilters, shapesIdsNamesMap } = state.shapesFilters
? state
: await getShapesFilters(
Expand All @@ -147,15 +156,6 @@ export const getGeoContainmentExecutor = (log: Logger): GeoContainmentAlertType[

const executeEsQuery = await executeEsQueryFactory(params, services, log, shapesFilters);

let currIntervalStartTime = previousStartedAt;
let currIntervalEndTime = startedAt;
if (params.delayOffsetWithUnits) {
if (currIntervalStartTime) {
currIntervalStartTime = getOffsetTime(params.delayOffsetWithUnits, currIntervalStartTime);
}
currIntervalEndTime = getOffsetTime(params.delayOffsetWithUnits, currIntervalEndTime);
}

// Start collecting data only on the first cycle
let currentIntervalResults: SearchResponse<unknown> | undefined;
if (!currIntervalStartTime) {
Expand All @@ -169,14 +169,17 @@ export const getGeoContainmentExecutor = (log: Logger): GeoContainmentAlertType[
currentIntervalResults = await executeEsQuery(currIntervalStartTime, currIntervalEndTime);
}

const currLocationMap: Map<string, LatestEntityLocation> = transformResults(
const currLocationMap: Map<string, LatestEntityLocation[]> = transformResults(
currentIntervalResults,
params.dateField,
params.geoField
);

const prevLocationMap: Map<string, LatestEntityLocation[]> = new Map([
...Object.entries((state.prevLocationMap as Record<string, LatestEntityLocation[]>) || {}),
]);
const allActiveEntriesMap = getActiveEntriesAndGenerateAlerts(
state.prevLocationMap as Record<string, LatestEntityLocation>,
prevLocationMap,
currLocationMap,
services.alertInstanceFactory,
shapesIdsNamesMap,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ describe('alertType', () => {
boundaryIndexId: 'testIndex',
boundaryGeoField: 'testField',
boundaryNameField: 'testField',
delayOffsetWithUnits: 'testOffset',
};

expect(alertType.validate?.params?.validate(params)).toBeTruthy();
Expand Down
Loading

0 comments on commit 80b720d

Please sign in to comment.