Fix export of resources to cloud storage (#7317)

### Motivation and context
In the previous implementation, the logic for uploading created files to cloud storage lived in the request-handling code, so the server responded with a 504 status whenever an upload took longer than the request timeout. This PR moves the export and the upload into a single background (RQ) job: the request handler only enqueues the job, and the client polls for its completion.
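
For illustration, a minimal sketch of the pattern this PR adopts (not the exact CVAT code; the queue name and the `create_file`/`upload_file` callables are illustrative): the view enqueues one RQ job that both builds the file and uploads it, then returns immediately.

```python
import django_rq
from rest_framework import status
from rest_framework.response import Response

def create_and_upload(create_file, upload_file):
    # Runs inside the RQ worker, so a slow cloud upload can no longer
    # outlive the HTTP request timeout and surface as a 504.
    file_path = create_file()
    upload_file(file_path)
    return file_path

def handle_export_request(request, create_file, upload_file):
    # The view only enqueues the combined job and returns 202 Accepted;
    # the client polls for completion (see server-proxy.ts below).
    queue = django_rq.get_queue('export')
    queue.enqueue_call(func=create_and_upload, args=(create_file, upload_file))
    return Response(status=status.HTTP_202_ACCEPTED)
```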
### How has this been tested?

### Checklist
- [x] I submit my changes into the `develop` branch
- [x] I have created a changelog fragment
- [ ] I have updated the documentation accordingly
- [ ] I have added tests to cover my changes
- [ ] I have linked related issues (see [GitHub docs](https://help.github.com/en/github/managing-your-work-on-github/linking-a-pull-request-to-an-issue#linking-a-pull-request-to-an-issue-using-a-keyword))
- [x] I have increased versions of npm packages if it is necessary
  ([cvat-canvas](https://github.com/opencv/cvat/tree/develop/cvat-canvas#versioning),
  [cvat-core](https://github.com/opencv/cvat/tree/develop/cvat-core#versioning),
  [cvat-data](https://github.com/opencv/cvat/tree/develop/cvat-data#versioning) and
  [cvat-ui](https://github.com/opencv/cvat/tree/develop/cvat-ui#versioning))

### License

- [x] I submit _my code changes_ under the same [MIT License](https://github.com/opencv/cvat/blob/develop/LICENSE) that covers the project.
  Feel free to contact the maintainers if that's a concern.
Marishka17 committed Jan 15, 2024
1 parent c02580c commit 906260c
Showing 12 changed files with 256 additions and 152 deletions.
New changelog fragment:

```diff
@@ -0,0 +1,6 @@
+### Fixed
+
+- 504 Timeout error when exporting resources to cloud storage
+  (<https://github.com/opencv/cvat/pull/7317>)
+- Enqueuing deferred jobs when their dependencies have been started -> cancelled -> restarted -> finished
+  (<https://github.com/opencv/cvat/pull/7317>)
```
2 changes: 1 addition & 1 deletion cvat-core/package.json
```diff
@@ -1,6 +1,6 @@
 {
     "name": "cvat-core",
-    "version": "14.0.3",
+    "version": "14.0.4",
     "type": "module",
     "description": "Part of Computer Vision Tool which presents an interface for client-side integration",
     "main": "src/api.ts",
```
15 changes: 9 additions & 6 deletions cvat-core/src/server-proxy.ts
```diff
@@ -797,10 +797,11 @@ function exportDataset(instanceType: 'projects' | 'jobs' | 'tasks') {
             .then((response) => {
                 const isCloudStorage = targetStorage.location === StorageLocation.CLOUD_STORAGE;
                 const { status } = response;
-                if (status === 201) params.action = 'download';
-                if (status === 202 || (isCloudStorage && status === 201)) {
+
+                if (status === 202) {
                     setTimeout(request, 3000);
                 } else if (status === 201) {
+                    params.action = 'download';
                     resolve(`${baseURL}?${new URLSearchParams(params).toString()}`);
                 } else if (isCloudStorage && status === 200) {
                     resolve();
@@ -927,10 +928,11 @@ async function backupTask(id: number, targetStorage: Storage, useDefaultSettings
             });
             const isCloudStorage = targetStorage.location === StorageLocation.CLOUD_STORAGE;
             const { status } = response;
-            if (status === 201) params.action = 'download';
-            if (status === 202 || (isCloudStorage && status === 201)) {
+
+            if (status === 202) {
                 setTimeout(request, 3000);
             } else if (status === 201) {
+                params.action = 'download';
                 resolve(`${url}?${new URLSearchParams(params).toString()}`);
             } else if (isCloudStorage && status === 200) {
                 resolve();
@@ -1032,10 +1034,11 @@ async function backupProject(
             });
             const isCloudStorage = targetStorage.location === StorageLocation.CLOUD_STORAGE;
             const { status } = response;
-            if (status === 201) params.action = 'download';
-            if (status === 202 || (isCloudStorage && status === 201)) {
+
+            if (status === 202) {
                 setTimeout(request, 3000);
             } else if (status === 201) {
+                params.action = 'download';
                 resolve(`${url}?${new URLSearchParams(params).toString()}`);
             } else if (isCloudStorage && status === 200) {
                 resolve();
```
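
To summarize the protocol above: 202 means the export job is still running, 201 means a file is ready in the local export cache, and 200 for a cloud-storage target means the server has already uploaded the result. A rough Python equivalent of the client's polling loop (a sketch, not the actual cvat-core code; endpoint handling is simplified):

```python
import time
from urllib.parse import urlencode

import requests

def poll_export(base_url: str, params: dict, is_cloud_storage: bool):
    """Poll an export endpoint until the result is ready.

    Returns a download URL for local exports, or None once the server
    reports that the file was uploaded to cloud storage.
    """
    while True:
        response = requests.get(base_url, params=params)
        if response.status_code == 202:
            # Job still running; retry in 3 s, mirroring setTimeout(request, 3000).
            time.sleep(3)
        elif response.status_code == 201:
            # File created locally; a follow-up request with action=download
            # streams it back to the caller.
            params['action'] = 'download'
            return f'{base_url}?{urlencode(params)}'
        elif is_cloud_storage and response.status_code == 200:
            # Upload finished server-side; nothing to download.
            return None
        else:
            response.raise_for_status()
```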
108 changes: 59 additions & 49 deletions cvat/apps/engine/backup.py
```diff
@@ -26,7 +26,7 @@
 from rest_framework.parsers import JSONParser
 from rest_framework.renderers import JSONRenderer
 from rest_framework.response import Response
-from rest_framework.exceptions import ValidationError, PermissionDenied, NotFound
+from rest_framework.exceptions import ValidationError
 
 import cvat.apps.dataset_manager as dm
 from cvat.apps.engine import models
@@ -38,12 +38,12 @@
 from cvat.apps.engine.utils import (
     av_scan_paths, process_failed_job, configure_dependent_job_to_download_from_cs,
     get_rq_job_meta, get_import_rq_id, import_resource_with_clean_up_after,
-    sendfile, define_dependent_job, get_rq_lock_by_user
+    sendfile, define_dependent_job, get_rq_lock_by_user, build_backup_file_name,
 )
 from cvat.apps.engine.models import (
     StorageChoice, StorageMethodChoice, DataChoice, Task, Project, Location)
 from cvat.apps.engine.task import JobFileMapping, _create_thread
-from cvat.apps.engine.cloud_provider import db_storage_to_storage_instance
+from cvat.apps.engine.cloud_provider import download_file_from_bucket, export_resource_to_cloud_storage
 from cvat.apps.engine.location import StorageType, get_location_configuration
 from cvat.apps.engine.view_utils import get_cloud_storage_for_import_or_export
 from cvat.apps.dataset_manager.views import TASK_CACHE_TTL, PROJECT_CACHE_TTL, get_export_cache_dir, clear_export_cache, log_exception
@@ -955,54 +955,49 @@ def export(db_instance, request, queue_name):
     queue = django_rq.get_queue(queue_name)
     rq_id = f"export:{obj_type}.id{db_instance.pk}-by-{request.user}"
     rq_job = queue.fetch_job(rq_id)
 
+    last_instance_update_time = timezone.localtime(db_instance.updated_date)
+    timestamp = datetime.strftime(last_instance_update_time, "%Y_%m_%d_%H_%M_%S")
+    location = location_conf.get('location')
+
     if rq_job:
-        last_project_update_time = timezone.localtime(db_instance.updated_date)
         rq_request = rq_job.meta.get('request', None)
         request_time = rq_request.get("timestamp", None) if rq_request else None
-        if request_time is None or request_time < last_project_update_time:
-            rq_job.cancel()
+        if request_time is None or request_time < last_instance_update_time:
+            # in case the server is configured with ONE_RUNNING_JOB_IN_QUEUE_PER_USER
+            # we have to enqueue dependent jobs after canceling one
+            rq_job.cancel(enqueue_dependents=settings.ONE_RUNNING_JOB_IN_QUEUE_PER_USER)
             rq_job.delete()
         else:
             if rq_job.is_finished:
-                file_path = rq_job.return_value()
-                if action == "download" and os.path.exists(file_path):
-                    rq_job.delete()
+                if location == Location.LOCAL:
+                    file_path = rq_job.return_value()
+
+                    if not file_path:
+                        return Response('A result for exporting job was not found for finished RQ job', status=status.HTTP_500_INTERNAL_SERVER_ERROR)
 
-                    timestamp = datetime.strftime(last_project_update_time,
-                        "%Y_%m_%d_%H_%M_%S")
-                    filename = filename or "{}_{}_backup_{}{}".format(
-                        obj_type, db_instance.name, timestamp,
-                        os.path.splitext(file_path)[1]).lower()
+                    elif not os.path.exists(file_path):
+                        return Response('The result file does not exist in export cache', status=status.HTTP_500_INTERNAL_SERVER_ERROR)
 
-                    location = location_conf.get('location')
-                    if location == Location.LOCAL:
+                    filename = filename or build_backup_file_name(
+                        class_name=obj_type,
+                        identifier=db_instance.name,
+                        timestamp=timestamp,
+                        extension=os.path.splitext(file_path)[1]
+                    )
+
+                    if action == "download":
+                        rq_job.delete()
                         return sendfile(request, file_path, attachment=True,
                             attachment_filename=filename)
-                    elif location == Location.CLOUD_STORAGE:
-                        try:
-                            storage_id = location_conf['storage_id']
-                        except KeyError:
-                            raise serializers.ValidationError(
-                                'Cloud storage location was selected as the destination,'
-                                ' but cloud storage id was not specified')
-
-                        db_storage = get_cloud_storage_for_import_or_export(
-                            storage_id=storage_id, request=request,
-                            is_default=location_conf['is_default'])
-                        storage = db_storage_to_storage_instance(db_storage)
-
-                        try:
-                            storage.upload_file(file_path, filename)
-                        except (ValidationError, PermissionDenied, NotFound) as ex:
-                            msg = str(ex) if not isinstance(ex, ValidationError) else \
-                                '\n'.join([str(d) for d in ex.detail])
-                            return Response(data=msg, status=ex.status_code)
-                        return Response(status=status.HTTP_200_OK)
-                    else:
-                        raise NotImplementedError()
+
+                    return Response(status=status.HTTP_201_CREATED)
+
+                elif location == Location.CLOUD_STORAGE:
+                    rq_job.delete()
+                    return Response(status=status.HTTP_200_OK)
                 else:
-                    if os.path.exists(file_path):
-                        return Response(status=status.HTTP_201_CREATED)
+                    raise NotImplementedError()
             elif rq_job.is_failed:
                 exc_info = rq_job.meta.get('formatted_exception', str(rq_job.exc_info))
                 rq_job.delete()
@@ -1014,10 +1009,31 @@ def export(db_instance, request, queue_name):
     ttl = dm.views.PROJECT_CACHE_TTL.total_seconds()
     user_id = request.user.id
 
+    func = _create_backup if location == Location.LOCAL else export_resource_to_cloud_storage
+    func_args = (db_instance, Exporter, '{}_backup.zip'.format(obj_type), logger, cache_ttl)
+
+    if location == Location.CLOUD_STORAGE:
+        try:
+            storage_id = location_conf['storage_id']
+        except KeyError:
+            raise serializers.ValidationError(
+                'Cloud storage location was selected as the destination,'
+                ' but cloud storage id was not specified')
+
+        db_storage = get_cloud_storage_for_import_or_export(
+            storage_id=storage_id, request=request,
+            is_default=location_conf['is_default'])
+        filename_pattern = build_backup_file_name(
+            class_name=obj_type,
+            identifier=db_instance.name,
+            timestamp=timestamp,
+        )
+        func_args = (db_storage, filename, filename_pattern, _create_backup) + func_args
+
     with get_rq_lock_by_user(queue, user_id):
         queue.enqueue_call(
-            func=_create_backup,
-            args=(db_instance, Exporter, '{}_backup.zip'.format(obj_type), logger, cache_ttl),
+            func=func,
+            args=func_args,
             job_id=rq_id,
             meta=get_rq_job_meta(request=request, db_obj=db_instance),
             depends_on=define_dependent_job(queue, user_id, rq_id=rq_id),
@@ -1027,12 +1043,6 @@ def export(db_instance, request, queue_name):
     return Response(status=status.HTTP_202_ACCEPTED)
 
 
-def _download_file_from_bucket(db_storage, filename, key):
-    storage = db_storage_to_storage_instance(db_storage)
-
-    with storage.download_fileobj(key) as data, open(filename, 'wb+') as f:
-        f.write(data.getbuffer())
-
 def _import(importer, request, queue, rq_id, Serializer, file_field_name, location_conf, filename=None):
     rq_job = queue.fetch_job(rq_id)
 
@@ -1077,7 +1087,7 @@ def _import(importer, request, queue, rq_id, Serializer, file_field_name, locati
             dependent_job = configure_dependent_job_to_download_from_cs(
                 queue=queue,
                 rq_id=rq_id,
-                rq_func=_download_file_from_bucket,
+                rq_func=download_file_from_bucket,
                 db_storage=db_storage,
                 filename=filename,
                 key=key,
```
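
The second changelog entry is addressed by the `rq_job.cancel(enqueue_dependents=settings.ONE_RUNNING_JOB_IN_QUEUE_PER_USER)` call above: when the server allows only one running job per user, other jobs are deferred behind the running one, and canceling a stale export must push its dependents back onto the queue. A minimal sketch of that RQ behavior (queue and job ids are illustrative):

```python
import django_rq

queue = django_rq.get_queue('export')
stale_job = queue.fetch_job('export:task.id42-by-admin')
if stale_job is not None:
    # Without enqueue_dependents=True, jobs deferred behind this one would
    # remain stuck in the deferred-job registry after the cancellation.
    stale_job.cancel(enqueue_dependents=True)
    stale_job.delete()
```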
22 changes: 21 additions & 1 deletion cvat/apps/engine/cloud_provider.py
```diff
@@ -10,7 +10,7 @@
 from enum import Enum
 from io import BytesIO
 from multiprocessing.pool import ThreadPool
-from typing import Dict, List, Optional, Any
+from typing import Dict, List, Optional, Any, Callable
 
 import boto3
 from azure.core.exceptions import HttpResponseError, ResourceExistsError
@@ -962,3 +962,23 @@ def db_storage_to_storage_instance(db_storage):
         'specific_attributes': db_storage.get_specific_attributes()
     }
     return get_cloud_storage_instance(cloud_provider=db_storage.provider_type, **details)
+
+def download_file_from_bucket(db_storage: Any, filename: str, key: str) -> None:
+    storage = db_storage_to_storage_instance(db_storage)
+
+    with storage.download_fileobj(key) as data, open(filename, 'wb+') as f:
+        f.write(data.getbuffer())
+
+def export_resource_to_cloud_storage(
+    db_storage: Any,
+    key: str,
+    key_pattern: str,
+    func: Callable[[int, Optional[str], Optional[str]], str],
+    *args,
+    **kwargs,
+) -> str:
+    file_path = func(*args, **kwargs)
+    storage = db_storage_to_storage_instance(db_storage)
+    storage.upload_file(file_path, key if key else key_pattern.format(os.path.splitext(file_path)[1].lower()))
+
+    return file_path
```
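
`export_resource_to_cloud_storage` is the wrapper the worker now runs: it first calls the original export callable, then uploads the result. A hypothetical worker-side call, where `make_backup_file` and the `db_storage` argument are illustrative stand-ins rather than real CVAT objects:

```python
from cvat.apps.engine.cloud_provider import export_resource_to_cloud_storage

def make_backup_file() -> str:
    # Stand-in for CVAT's _create_backup: writes the archive, returns its path.
    path = '/tmp/task_backup.zip'
    return path

def run_cloud_export(db_storage) -> str:
    # If `key` (a user-supplied filename) is empty, key_pattern is formatted
    # with the produced file's extension instead, e.g. '...{}'.format('.zip').
    return export_resource_to_cloud_storage(
        db_storage,                              # cloud storage model instance
        '',                                      # key: no explicit filename given
        'task_42_backup_2024_01_15_10_00_00{}',  # key_pattern, filled with '.zip'
        make_backup_file,                        # func: runs first in the worker
    )
```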
13 changes: 11 additions & 2 deletions cvat/apps/engine/mixins.py
```diff
@@ -12,6 +12,7 @@
 from pathlib import Path
 from tempfile import NamedTemporaryFile
 from unittest import mock
+from typing import Optional, Callable, Dict, Any
 
 import django_rq
 from attr.converters import to_bool
@@ -384,7 +385,15 @@ def upload_finished(self, request):
         raise NotImplementedError('Must be implemented in the derived class')
 
 class AnnotationMixin:
-    def export_annotations(self, request, db_obj, export_func, callback, get_data=None):
+    def export_annotations(
+        self,
+        request,
+        db_obj,
+        export_func,
+        callback: Callable[[int, Optional[str], Optional[str]], str],
+        *,
+        get_data: Optional[Callable[[int], Dict[str, Any]]]= None,
+    ):
         format_name = request.query_params.get("format", "")
         action = request.query_params.get("action", "").lower()
         filename = request.query_params.get("filename", "")
@@ -399,7 +408,7 @@ def export_annotations(self, request, db_obj, export_func, callback, get_data=No
         )
 
         object_name = self._object.__class__.__name__.lower()
-        rq_id = f"export:annotations-for-{object_name}.id{self._object.pk}-in-{format_name.replace(' ', '_')}-format"
+        rq_id = f"export:{request.path.strip('/').split('/')[-1]}-for-{object_name}.id{self._object.pk}-in-{format_name.replace(' ', '_')}-format"
 
         if format_name:
             return export_func(db_instance=self._object,
```
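
The new `rq_id` takes its prefix from the last segment of the request path, so dataset and annotations exports for the same object no longer collide on one job id. For example (illustrative values):

```python
# Illustrative: how the new rq_id distinguishes export endpoints.
request_path = '/api/tasks/42/dataset'
object_name, pk, format_name = 'task', 42, 'CVAT for images 1.1'

rq_id = (
    f"export:{request_path.strip('/').split('/')[-1]}"
    f"-for-{object_name}.id{pk}"
    f"-in-{format_name.replace(' ', '_')}-format"
)
assert rq_id == 'export:dataset-for-task.id42-in-CVAT_for_images_1.1-format'
```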
27 changes: 27 additions & 0 deletions cvat/apps/engine/utils.py
```diff
@@ -409,3 +409,30 @@ def preload_image(image: tuple[str, str, str])-> tuple[Image.Image, str, str]:
 
 def preload_images(images: Iterable[tuple[str, str, str]]) -> list[tuple[Image.Image, str, str]]:
     return list(map(preload_image, images))
+
+def build_backup_file_name(
+    *,
+    class_name: str,
+    identifier: str | int,
+    timestamp: str,
+    extension: str = "{}",
+) -> str:
+    # "<project|task>_<name>_backup_<timestamp>.zip"
+    return "{}_{}_backup_{}{}".format(
+        class_name, identifier, timestamp, extension,
+    ).lower()
+
+def build_annotations_file_name(
+    *,
+    class_name: str,
+    identifier: str | int,
+    timestamp: str,
+    format_name: str,
+    is_annotation_file: bool = True,
+    extension: str = "{}",
+) -> str:
+    # "<project|task|job>_<name|id>_<annotations|dataset>_<timestamp>_<format>.zip"
+    return "{}_{}_{}_{}_{}{}".format(
+        class_name, identifier, 'annotations' if is_annotation_file else 'dataset',
+        timestamp, format_name, extension,
+    ).lower()
```
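
Note how `extension` defaults to the literal `"{}"`: the same helper yields either a finished filename or a pattern that `export_resource_to_cloud_storage` later fills via `key_pattern.format(...)`. For example (values are illustrative):

```python
from cvat.apps.engine.utils import build_backup_file_name

# Finished filename: extension supplied up front.
build_backup_file_name(
    class_name='task', identifier='RoadSigns',
    timestamp='2024_01_15_10_00_00', extension='.zip',
)
# -> 'task_roadsigns_backup_2024_01_15_10_00_00.zip'

# Key pattern: extension left as the '{}' placeholder, filled in later
# by export_resource_to_cloud_storage via key_pattern.format('.zip').
build_backup_file_name(
    class_name='task', identifier='RoadSigns',
    timestamp='2024_01_15_10_00_00',
)
# -> 'task_roadsigns_backup_2024_01_15_10_00_00{}'
```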