Skip to content

Commit

Permalink
Added ability to call analytics report manually (#7805)
Browse files Browse the repository at this point in the history
  • Loading branch information
bsekachev committed May 3, 2024
1 parent af182a3 commit 39afcd4
Show file tree
Hide file tree
Showing 23 changed files with 481 additions and 389 deletions.
4 changes: 4 additions & 0 deletions changelog.d/20240426_143800_boris_manual_analytics_report.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
### Changed

- Analytics reports calculation may be initiated manually instead of automatic scheduling
(<https://github.com/cvat-ai/cvat/pull/7805>)
7 changes: 4 additions & 3 deletions cvat-core/src/analytics-report.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
// Copyright (C) 2023 CVAT.ai Corporation
// Copyright (C) 2023-2024 CVAT.ai Corporation
//
// SPDX-License-Identifier: MIT

import {
SerializedAnalyticsEntry, SerializedAnalyticsReport, SerializedDataEntry, SerializedTransformationEntry,
SerializedAnalyticsEntry, SerializedAnalyticsReport,
SerializedDataEntry, SerializedTransformationEntry,
} from './server-response-types';
import { ArgumentError } from './exceptions';

Expand Down Expand Up @@ -126,7 +127,7 @@ export default class AnalyticsReport {
#statistics: AnalyticsEntry[];

constructor(initialData: SerializedAnalyticsReport) {
this.#id = initialData.id;
this.#id = initialData.job_id || initialData.task_id || initialData.project_id;
this.#target = initialData.target as AnalyticsReportTarget;
this.#createdDate = initialData.created_date;
this.#statistics = [];
Expand Down
36 changes: 30 additions & 6 deletions cvat-core/src/api-implementation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ import CloudStorage from './cloud-storage';
import Organization, { Invitation } from './organization';
import Webhook from './webhook';
import { ArgumentError } from './exceptions';
import { SerializedAsset } from './server-response-types';
import {
AnalyticsReportFilter, QualityConflictsFilter, QualityReportsFilter,
QualitySettingsFilter, SerializedAsset,
} from './server-response-types';
import QualityReport from './quality-report';
import QualityConflict, { ConflictSeverity } from './quality-conflict';
import QualitySettings from './quality-settings';
Expand Down Expand Up @@ -403,7 +406,7 @@ export default function implementAPI(cvat: CVATCore): CVATCore {
return webhooks;
});

implementationMixin(cvat.analytics.quality.reports, async (filter) => {
implementationMixin(cvat.analytics.quality.reports, async (filter: QualityReportsFilter) => {
checkFilter(filter, {
page: isInteger,
pageSize: isPageSize,
Expand All @@ -426,7 +429,7 @@ export default function implementAPI(cvat: CVATCore): CVATCore {
);
return reports;
});
implementationMixin(cvat.analytics.quality.conflicts, async (filter) => {
implementationMixin(cvat.analytics.quality.conflicts, async (filter: QualityConflictsFilter) => {
checkFilter(filter, {
reportID: isInteger,
});
Expand Down Expand Up @@ -502,7 +505,7 @@ export default function implementAPI(cvat: CVATCore): CVATCore {

return mergedConflicts;
});
implementationMixin(cvat.analytics.quality.settings.get, async (filter) => {
implementationMixin(cvat.analytics.quality.settings.get, async (filter: QualitySettingsFilter) => {
checkFilter(filter, {
taskID: isInteger,
});
Expand All @@ -512,7 +515,7 @@ export default function implementAPI(cvat: CVATCore): CVATCore {
const settings = await serverProxy.analytics.quality.settings.get(params);
return new QualitySettings({ ...settings });
});
implementationMixin(cvat.analytics.performance.reports, async (filter) => {
implementationMixin(cvat.analytics.performance.reports, async (filter: AnalyticsReportFilter) => {
checkFilter(filter, {
jobID: isInteger,
taskID: isInteger,
Expand All @@ -527,9 +530,30 @@ export default function implementAPI(cvat: CVATCore): CVATCore {
const reportData = await serverProxy.analytics.performance.reports(params);
return new AnalyticsReport(reportData);
});
implementationMixin(cvat.analytics.performance.calculate, async (
body: Parameters<CVATCore['analytics']['performance']['calculate']>[0],
onUpdate: Parameters<CVATCore['analytics']['performance']['calculate']>[1],
) => {
checkFilter(body, {
jobID: isInteger,
taskID: isInteger,
projectID: isInteger,
});

checkExclusiveFields(body, ['jobID', 'taskID', 'projectID'], []);
if (!('jobID' in body || 'taskID' in body || 'projectID' in body)) {
throw new ArgumentError('One of "jobID", "taskID", "projectID" is required, but not provided');
}

const params = fieldsToSnakeCase(body);
await serverProxy.analytics.performance.calculate(params, onUpdate);
});
implementationMixin(cvat.frames.getMeta, async (type, id) => {
const result = await serverProxy.frames.getMeta(type, id);
return new FramesMetaData({ ...result });
return new FramesMetaData({
...result,
deleted_frames: Object.fromEntries(result.deleted_frames.map((_frame) => [_frame, true])),
});
});

return cvat;
Expand Down
8 changes: 8 additions & 0 deletions cvat-core/src/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,14 @@ function build(): CVATCore {
const result = await PluginRegistry.apiWrapper(cvat.analytics.performance.reports, filter);
return result;
},
async calculate(body, onUpdate) {
const result = await PluginRegistry.apiWrapper(
cvat.analytics.performance.calculate,
body,
onUpdate,
);
return result;
},
},
quality: {
async reports(filter = {}) {
Expand Down
14 changes: 9 additions & 5 deletions cvat-core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,11 @@ export default interface CVATCore {
projects: {
get: (
filter: {
id: number;
page: number;
search: string;
sort: string;
filter: string;
id?: number;
page?: number;
search?: string;
sort?: string;
filter?: string;
}
) => Promise<PaginatedResource<Project>>;
searchNames: any;
Expand Down Expand Up @@ -141,6 +141,10 @@ export default interface CVATCore {
};
performance: {
reports: (filter: AnalyticsReportFilter) => Promise<AnalyticsReport>;
calculate: (
body: { jobID?: number; taskID?: number; projectID?: number; },
onUpdate: (status: enums.RQStatus, progress: number, message: string) => void,
) => Promise<void>;
};
};
frames: {
Expand Down
121 changes: 102 additions & 19 deletions cvat-core/src/server-proxy.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Copyright (C) 2019-2022 Intel Corporation
// Copyright (C) 2022-2023 CVAT.ai Corporation
// Copyright (C) 2022-2024 CVAT.ai Corporation
//
// SPDX-License-Identifier: MIT

Expand All @@ -16,8 +16,8 @@ import {
SerializedAbout, SerializedRemoteFile, SerializedUserAgreement,
SerializedRegister, JobsFilter, SerializedJob, SerializedGuide, SerializedAsset, SerializedAPISchema,
SerializedInvitationData, SerializedCloudStorage, SerializedFramesMetaData, SerializedCollection,
SerializedQualitySettingsData, ApiQualitySettingsFilter, SerializedQualityConflictData, ApiQualityConflictsFilter,
SerializedQualityReportData, ApiQualityReportsFilter, SerializedAnalyticsReport, ApiAnalyticsReportFilter,
SerializedQualitySettingsData, APIQualitySettingsFilter, SerializedQualityConflictData, APIQualityConflictsFilter,
SerializedQualityReportData, APIQualityReportsFilter, SerializedAnalyticsReport, APIAnalyticsReportFilter,
} from './server-response-types';
import { PaginatedResource } from './core-types';
import { Storage } from './storage';
Expand Down Expand Up @@ -1185,18 +1185,20 @@ async function restoreProject(storage: Storage, file: File | string) {
return wait();
}

const listenToCreateCallbacks: Record<number, {
promise: Promise<SerializedTask>;
type LongProcessListener<R> = Record<number, {
promise: Promise<R>;
onUpdate: ((state: string, progress: number, message: string) => void)[];
}> = {};
}>;

const listenToCreateTaskCallbacks: LongProcessListener<SerializedTask> = {};

function listenToCreateTask(
id, onUpdate: (state: RQStatus, progress: number, message: string) => void,
): Promise<SerializedTask> {
if (id in listenToCreateCallbacks) {
listenToCreateCallbacks[id].onUpdate.push(onUpdate);
if (id in listenToCreateTaskCallbacks) {
listenToCreateTaskCallbacks[id].onUpdate.push(onUpdate);
// to avoid extra status check requests we do not create any more promises
return listenToCreateCallbacks[id].promise;
return listenToCreateTaskCallbacks[id].promise;
}

const promise = new Promise<SerializedTask>((resolve, reject) => {
Expand All @@ -1208,7 +1210,7 @@ function listenToCreateTask(
const state = response.data.state?.toLowerCase();
if ([RQStatus.QUEUED, RQStatus.STARTED].includes(state)) {
// notify all the subscribtions when data status changed
listenToCreateCallbacks[id].onUpdate.forEach((callback) => {
listenToCreateTaskCallbacks[id].onUpdate.forEach((callback) => {
callback(
state,
response.data.progress || 0,
Expand All @@ -1223,14 +1225,14 @@ function listenToCreateTask(
resolve(createdTask);
} else if (state === RQStatus.FAILED) {
const failMessage = 'Images processing failed';
listenToCreateCallbacks[id].onUpdate.forEach((callback) => {
listenToCreateTaskCallbacks[id].onUpdate.forEach((callback) => {
callback(state, 0, failMessage);
});

reject(new ServerError(filterPythonTraceback(response.data.message), 400));
} else {
const failMessage = 'Unknown status received';
listenToCreateCallbacks[id].onUpdate.forEach((callback) => {
listenToCreateTaskCallbacks[id].onUpdate.forEach((callback) => {
callback(state || RQStatus.UNKNOWN, 0, failMessage);
});
reject(
Expand All @@ -1241,7 +1243,7 @@ function listenToCreateTask(
);
}
} catch (errorData) {
listenToCreateCallbacks[id].onUpdate.forEach((callback) => {
listenToCreateTaskCallbacks[id].onUpdate.forEach((callback) => {
callback('failed', 0, 'Server request failed');
});
reject(generateError(errorData));
Expand All @@ -1251,13 +1253,13 @@ function listenToCreateTask(
setTimeout(checkStatus, 100);
});

listenToCreateCallbacks[id] = {
listenToCreateTaskCallbacks[id] = {
promise,
onUpdate: [onUpdate],
};
promise.catch(() => {
// do nothing, avoid uncaught promise exceptions
}).finally(() => delete listenToCreateCallbacks[id]);
}).finally(() => delete listenToCreateTaskCallbacks[id]);
return promise;
}

Expand Down Expand Up @@ -2337,7 +2339,7 @@ async function createAsset(file: File, guideId: number): Promise<SerializedAsset
}

async function getQualitySettings(
filter: ApiQualitySettingsFilter,
filter: APIQualitySettingsFilter,
): Promise<SerializedQualitySettingsData> {
const { backendAPI } = config;

Expand Down Expand Up @@ -2373,7 +2375,7 @@ async function updateQualitySettings(
}

async function getQualityConflicts(
filter: ApiQualityConflictsFilter,
filter: APIQualityConflictsFilter,
): Promise<SerializedQualityConflictData[]> {
const params = enableOrganization();
const { backendAPI } = config;
Expand All @@ -2391,7 +2393,7 @@ async function getQualityConflicts(
}

async function getQualityReports(
filter: ApiQualityReportsFilter,
filter: APIQualityReportsFilter,
): Promise<PaginatedResource<SerializedQualityReportData>> {
const { backendAPI } = config;

Expand All @@ -2410,7 +2412,7 @@ async function getQualityReports(
}

async function getAnalyticsReports(
filter: ApiAnalyticsReportFilter,
filter: APIAnalyticsReportFilter,
): Promise<SerializedAnalyticsReport> {
const { backendAPI } = config;

Expand All @@ -2427,6 +2429,86 @@ async function getAnalyticsReports(
}
}

const listenToCreateAnalyticsReportCallbacks: {
job: LongProcessListener<void>;
task: LongProcessListener<void>;
project: LongProcessListener<void>;
} = {
job: {},
task: {},
project: {},
};

async function calculateAnalyticsReport(
body: {
job_id?: number;
task_id?: number;
project_id?: number;
},
onUpdate: (state: string, progress: number, message: string) => void,
): Promise<void> {
const id = body.job_id || body.task_id || body.project_id;
const { backendAPI } = config;
const params = enableOrganization();
let listenerStorage: LongProcessListener<void> = null;

if (Number.isInteger(body.job_id)) {
listenerStorage = listenToCreateAnalyticsReportCallbacks.job;
} else if (Number.isInteger(body.task_id)) {
listenerStorage = listenToCreateAnalyticsReportCallbacks.task;
} else if (Number.isInteger(body.project_id)) {
listenerStorage = listenToCreateAnalyticsReportCallbacks.project;
}

if (listenerStorage[id]) {
listenerStorage[id].onUpdate.push(onUpdate);
return listenerStorage[id].promise;
}

const promise = new Promise<void>((resolve, reject) => {
Axios.post(`${backendAPI}/analytics/reports`, {
...body,
...params,
}).then(({ data: { rq_id: rqID } }) => {
listenerStorage[id].onUpdate.forEach((_onUpdate) => _onUpdate(RQStatus.QUEUED, 0, 'Analytics report request sent'));
const checkStatus = (): void => {
Axios.post(`${backendAPI}/analytics/reports`, {
...body,
...params,
}, { params: { rq_id: rqID } }).then((response) => {
// TODO: rewrite server logic, now it returns 202, 201 codes, but we need RQ statuses and details
// after this patch is merged https://github.com/cvat-ai/cvat/pull/7537
if (response.status === 201) {
listenerStorage[id].onUpdate.forEach((_onUpdate) => _onUpdate(RQStatus.FINISHED, 0, 'Done'));
resolve();
return;
}

listenerStorage[id].onUpdate.forEach((_onUpdate) => _onUpdate(RQStatus.QUEUED, 0, 'Analytics report calculation is in progress'));
setTimeout(checkStatus, 10000);
}).catch((errorData) => {
reject(generateError(errorData));
});
};

setTimeout(checkStatus, 2500);
}).catch((errorData) => {
reject(generateError(errorData));
});
});

listenerStorage[id] = {
promise,
onUpdate: [onUpdate],
};

promise.finally(() => {
delete listenerStorage[id];
});

return promise;
}

export default Object.freeze({
server: Object.freeze({
setAuthData,
Expand Down Expand Up @@ -2578,6 +2660,7 @@ export default Object.freeze({
analytics: Object.freeze({
performance: Object.freeze({
reports: getAnalyticsReports,
calculate: calculateAnalyticsReport,
}),
quality: Object.freeze({
reports: getQualityReports,
Expand Down
Loading

0 comments on commit 39afcd4

Please sign in to comment.