Skip to content

Commit

Permalink
Revert on_success on_failure usage due to their incorrect work
Browse files Browse the repository at this point in the history
  • Loading branch information
Marishka17 committed Mar 24, 2023
1 parent 2b504d3 commit 549911b
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 53 deletions.
24 changes: 18 additions & 6 deletions cvat-core/src/server-proxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -768,15 +768,17 @@ async function importDataset(
};

const url = `${backendAPI}/projects/${id}/dataset`;
let rqId: string;

async function wait() {
return new Promise<void>((resolve, reject) => {
async function requestStatus() {
try {
const response = await Axios.get(url, {
params: { ...params, action: 'import_status' },
params: { ...params, action: 'import_status', rq_id: rqId },
});
if (response.status === 202) {
rqId = response.data.rq_id;
if (response.data.message) {
options.updateStatusCallback(response.data.message, response.data.progress || 0);
}
Expand All @@ -797,10 +799,11 @@ async function importDataset(

if (isCloudStorage) {
try {
await Axios.post(url,
const response = await Axios.post(url,
new FormData(), {
params,
});
rqId = response.data.rq_id;
} catch (errorData) {
throw generateError(errorData);
}
Expand All @@ -822,11 +825,12 @@ async function importDataset(
headers: { 'Upload-Start': true },
});
await chunkUpload(file, uploadConfig);
await Axios.post(url,
const response = await Axios.post(url,
new FormData(), {
params,
headers: { 'Upload-Finish': true },
});
rqId = response.data.rq_id;
} catch (errorData) {
throw generateError(errorData);
}
Expand Down Expand Up @@ -1602,20 +1606,26 @@ async function uploadAnnotations(
filename: typeof file === 'string' ? file : file.name,
conv_mask_to_poly: options.convMaskToPoly,
};
let rqId: string;

const url = `${backendAPI}/${session}s/${id}/annotations`;
async function wait() {
return new Promise<void>((resolve, reject) => {
async function requestStatus() {
try {
const data = new FormData();
if (rqId) {
data.set('rq_id', rqId);
}
const response = await Axios.put(
url,
new FormData(),
data,
{
params,
},
);
if (response.status === 202) {
rqId = response.data.rq_id;
setTimeout(requestStatus, 3000);
} else {
resolve();
Expand All @@ -1631,10 +1641,11 @@ async function uploadAnnotations(

if (isCloudStorage) {
try {
await Axios.post(url,
const response = await Axios.post(url,
new FormData(), {
params,
});
rqId = response.data.rq_id;
} catch (errorData) {
throw generateError(errorData);
}
Expand All @@ -1652,11 +1663,12 @@ async function uploadAnnotations(
headers: { 'Upload-Start': true },
});
await chunkUpload(file, uploadConfig);
await Axios.post(url,
const response = await Axios.post(url,
new FormData(), {
params,
headers: { 'Upload-Finish': true },
});
rqId = response.data.rq_id;
} catch (errorData) {
throw generateError(errorData);
}
Expand Down
6 changes: 4 additions & 2 deletions cvat/apps/dataset_manager/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from .annotation import AnnotationIR
from .bindings import ProjectData, load_dataset_data
from .formats.registry import make_exporter, make_importer
from .util import remove_resources

def export_project(project_id, dst_file, format_name,
server_url=None, save_images=False):
Expand Down Expand Up @@ -160,7 +161,8 @@ def data(self) -> dict:
raise NotImplementedError()

@transaction.atomic
def import_dataset_as_project(project_id, dataset_file, format_name, conv_mask_to_poly):
@remove_resources
def import_dataset_as_project(src_file, project_id, format_name, conv_mask_to_poly):
rq_job = rq.get_current_job()
rq_job.meta['status'] = 'Dataset import has been started...'
rq_job.meta['progress'] = 0.
Expand All @@ -170,5 +172,5 @@ def import_dataset_as_project(project_id, dataset_file, format_name, conv_mask_t
project.init_from_db()

importer = make_importer(format_name)
with open(dataset_file, 'rb') as f:
with open(src_file, 'rb') as f:
project.import_dataset(f, importer, conv_mask_to_poly=conv_mask_to_poly)
8 changes: 5 additions & 3 deletions cvat/apps/dataset_manager/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from .annotation import AnnotationIR, AnnotationManager
from .bindings import TaskData, JobData
from .formats.registry import make_exporter, make_importer
from .util import bulk_create
from .util import bulk_create, remove_resources


class dotdict(OrderedDict):
Expand Down Expand Up @@ -789,7 +789,8 @@ def export_task(task_id, dst_file, format_name, server_url=None, save_images=Fal
task.export(f, exporter, host=server_url, save_images=save_images)

@transaction.atomic
def import_task_annotations(task_id, src_file, format_name, conv_mask_to_poly):
@remove_resources
def import_task_annotations(src_file, task_id, format_name, conv_mask_to_poly):
task = TaskAnnotation(task_id)
task.init_from_db()

Expand All @@ -798,7 +799,8 @@ def import_task_annotations(task_id, src_file, format_name, conv_mask_to_poly):
task.import_annotations(f, importer, conv_mask_to_poly=conv_mask_to_poly)

@transaction.atomic
def import_job_annotations(job_id, src_file, format_name, conv_mask_to_poly):
@remove_resources
def import_job_annotations(src_file, job_id, format_name, conv_mask_to_poly):
job = JobAnnotation(job_id)
job.init_from_db()

Expand Down
14 changes: 14 additions & 0 deletions cvat/apps/dataset_manager/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import inspect
import os, os.path as osp
import zipfile
import functools
from contextlib import suppress
from django.conf import settings


Expand Down Expand Up @@ -35,3 +37,15 @@ def bulk_create(db_model, objects, flt_param):
return db_model.objects.bulk_create(objects)

return []


def remove_resources(func):
@functools.wraps(func)
def wrapper(src_file, *args, **kwargs):
try:
func(src_file, *args, **kwargs)
finally:
with suppress(FileNotFoundError):
os.remove(src_file)
return None
return wrapper
24 changes: 18 additions & 6 deletions cvat/apps/engine/backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@
import re
import shutil
import tempfile
import functools
from typing import Any, Dict, Iterable
import uuid
from zipfile import ZipFile
from datetime import datetime
from tempfile import NamedTemporaryFile
from contextlib import suppress

import django_rq
from django.conf import settings
Expand All @@ -36,7 +38,7 @@
LabeledDataSerializer, SegmentSerializer, SimpleJobSerializer, TaskReadSerializer,
ProjectReadSerializer, ProjectFileSerializer, TaskFileSerializer)
from cvat.apps.engine.utils import (
av_scan_paths, process_failed_job, configure_dependent_job, get_rq_job_meta, handle_finished_or_failed_job
av_scan_paths, process_failed_job, configure_dependent_job, get_rq_job_meta
)
from cvat.apps.engine.models import (
StorageChoice, StorageMethodChoice, DataChoice, Task, Project, Location,
Expand All @@ -55,6 +57,17 @@ class Version(Enum):
IMPORT_CACHE_FAILED_TTL = timedelta(hours=10).total_seconds()
IMPORT_CACHE_SUCCESS_TTL = timedelta(hours=1).total_seconds()

def remove_resources(func):
@functools.wraps(func)
def wrapper(filename, *args, **kwargs):
try:
result = func(filename, *args, **kwargs)
finally:
with suppress(FileNotFoundError):
os.remove(filename)
return result
return wrapper

def _get_label_mapping(db_labels):
label_mapping = {db_label.id: db_label.name for db_label in db_labels}
for db_label in db_labels:
Expand Down Expand Up @@ -633,6 +646,7 @@ def import_task(self):
return self._db_task

@transaction.atomic
@remove_resources
def _import_task(filename, user, org_id):
av_scan_paths(filename)
task_importer = TaskImporter(filename, user, org_id)
Expand Down Expand Up @@ -752,6 +766,7 @@ def import_project(self):
return self._db_project

@transaction.atomic
@remove_resources
def _import_project(filename, user, org_id):
av_scan_paths(filename)
project_importer = ProjectImporter(filename, user, org_id)
Expand Down Expand Up @@ -947,15 +962,12 @@ def _import(importer, request, queue, rq_id, Serializer, file_field_name, locati
**get_rq_job_meta(request=request, db_obj=None)
},
depends_on=dependent_job,
on_success=handle_finished_or_failed_job,
on_failure=handle_finished_or_failed_job,
result_ttl=IMPORT_CACHE_SUCCESS_TTL,
failure_ttl=IMPORT_CACHE_FAILED_TTL
)
else:
if rq_job.is_finished:
project_id = rq_job.return_value()
handle_finished_or_failed_job(rq_job)
rq_job.delete()
return Response({'id': project_id}, status=status.HTTP_201_CREATED)
elif rq_job.is_failed or \
Expand All @@ -981,7 +993,7 @@ def import_project(request, queue_name, filename=None):
if 'rq_id' in request.data:
rq_id = request.data['rq_id']
else:
rq_id = f"import:project.{uuid.uuid4()}-by-{request.user}"
rq_id = settings.COMMON_IMPORT_RQ_ID_TEMPLATE.format('project', uuid.uuid4(), 'backup', request.user)
Serializer = ProjectFileSerializer
file_field_name = 'project_file'

Expand All @@ -1007,7 +1019,7 @@ def import_task(request, queue_name, filename=None):
if 'rq_id' in request.data:
rq_id = request.data['rq_id']
else:
rq_id = f"import:task.{uuid.uuid4()}-by-{request.user}"
rq_id = settings.COMMON_IMPORT_RQ_ID_TEMPLATE.format('task', uuid.uuid4(), 'backup', request.user)
Serializer = TaskFileSerializer
file_field_name = 'task_file'

Expand Down
23 changes: 19 additions & 4 deletions cvat/apps/engine/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import base64
from unittest import mock
import uuid
import django_rq

from django.conf import settings
from django.core.cache import cache
Expand Down Expand Up @@ -170,12 +171,26 @@ def init_tus_upload(self, request):
if message_id:
metadata["message_id"] = base64.b64decode(message_id)

file_exists = os.path.lexists(os.path.join(self.get_upload_dir(), filename))
file_path = os.path.join(self.get_upload_dir(), filename)
file_exists = os.path.lexists(file_path)

if file_exists:
# check whether the rw_job is in progress or has been finished/failed
object_class_name = self._object.__class__.__name__.lower()
import_type = request.path.strip('/').split('/')[-1]
if import_type != 'backup':
template = settings.COMMON_IMPORT_RQ_ID_TEMPLATE.format(object_class_name, self._object.pk, import_type, request.user)
queue = django_rq.get_queue(settings.CVAT_QUEUES.IMPORT_DATA.value)
finished_job_ids = queue.finished_job_registry.get_job_ids()
failed_job_ids = queue.failed_job_registry.get_job_ids()
if template in finished_job_ids or template in failed_job_ids:
os.remove(file_path)
file_exists = False

if file_exists:
return self._tus_response(status=status.HTTP_409_CONFLICT,
data="File with same name already exists")

# TODO: check if rq job exists and is active
file_size = int(request.META.get("HTTP_UPLOAD_LENGTH", "0"))
if file_size > int(self._tus_max_file_size):
return self._tus_response(status=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
Expand Down Expand Up @@ -283,7 +298,7 @@ def export_annotations(self, request, db_obj, export_func, callback, get_data=No
if serializer.is_valid(raise_exception=True):
return Response(serializer.data)

def import_annotations(self, request, db_obj, import_func, rq_func, rq_id):
def import_annotations(self, request, db_obj, import_func, rq_func, rq_id_template):
is_tus_request = request.headers.get('Upload-Length', None) is not None or \
request.method == 'OPTIONS'
if is_tus_request:
Expand All @@ -305,7 +320,7 @@ def import_annotations(self, request, db_obj, import_func, rq_func, rq_id):

return import_func(
request=request,
rq_id=rq_id,
rq_id_template=rq_id_template,
rq_func=rq_func,
db_obj=self._object,
format_name=format_name,
Expand Down
5 changes: 0 additions & 5 deletions cvat/apps/engine/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ def parse_exception_message(msg):
return parsed_msg

def process_failed_job(rq_job: Job):
handle_finished_or_failed_job(rq_job)
exc_info = str(rq_job.exc_info or rq_job.dependency.exc_info)
if rq_job.dependency:
rq_job.dependency.delete()
Expand All @@ -142,9 +141,6 @@ def process_failed_job(rq_job: Job):
log.error(msg)
return msg

def handle_finished_or_failed_job(rq_job: Job, *args, **kwargs):
if os.path.exists(rq_job.meta['tmp_file']):
os.remove(rq_job.meta['tmp_file'])

def configure_dependent_job(
queue: DjangoRQ,
Expand All @@ -166,7 +162,6 @@ def configure_dependent_job(
args=(db_storage, filename, key),
job_id=rq_job_id_download_file,
meta=get_rq_job_meta(request=request, db_obj=db_storage),
on_failure=handle_finished_or_failed_job,
result_ttl=result_ttl,
failure_ttl=failure_ttl
)
Expand Down
Loading

0 comments on commit 549911b

Please sign in to comment.