Skip to content

Commit

Permalink
apply comments
Browse files Browse the repository at this point in the history
  • Loading branch information
azhavoro committed Jan 6, 2023
1 parent b96fa54 commit 2e00feb
Show file tree
Hide file tree
Showing 8 changed files with 46 additions and 32 deletions.
14 changes: 7 additions & 7 deletions cvat/apps/engine/backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -740,13 +740,13 @@ def export(db_instance, request, queue_name):
"Unexpected action specified for the request")

if isinstance(db_instance, Task):
filename_prefix = 'task'
obj_type = 'task'
logger = slogger.task[db_instance.pk]
Exporter = TaskExporter
cache_ttl = TASK_CACHE_TTL
use_target_storage_conf = request.query_params.get('use_default_location', True)
elif isinstance(db_instance, Project):
filename_prefix = 'project'
obj_type = 'project'
logger = slogger.project[db_instance.pk]
Exporter = ProjectExporter
cache_ttl = PROJECT_CACHE_TTL
Expand All @@ -763,7 +763,7 @@ def export(db_instance, request, queue_name):
)

queue = django_rq.get_queue(queue_name)
rq_id = f"api-{filename_prefix}s-{db_instance.pk}/backup"
rq_id = f"export:{obj_type}.id{db_instance.pk}-by-{request.user}"
rq_job = queue.fetch_job(rq_id)
if rq_job:
last_project_update_time = timezone.localtime(db_instance.updated_date)
Expand All @@ -780,7 +780,7 @@ def export(db_instance, request, queue_name):
timestamp = datetime.strftime(last_project_update_time,
"%Y_%m_%d_%H_%M_%S")
filename = filename or "{}_{}_backup_{}{}".format(
filename_prefix, db_instance.name, timestamp,
obj_type, db_instance.name, timestamp,
os.path.splitext(file_path)[1]).lower()

location = location_conf.get('location')
Expand Down Expand Up @@ -820,7 +820,7 @@ def export(db_instance, request, queue_name):
ttl = dm.views.PROJECT_CACHE_TTL.total_seconds()
queue.enqueue_call(
func=_create_backup,
args=(db_instance, Exporter, '{}_backup.zip'.format(filename_prefix), logger, cache_ttl),
args=(db_instance, Exporter, '{}_backup.zip'.format(obj_type), logger, cache_ttl),
job_id=rq_id,
meta={ 'request_time': timezone.localtime() },
result_ttl=ttl, failure_ttl=ttl)
Expand Down Expand Up @@ -908,7 +908,7 @@ def import_project(request, queue_name, filename=None):
if 'rq_id' in request.data:
rq_id = request.data['rq_id']
else:
rq_id = f"{request.user}$-api-projects-{uuid.uuid4()}-import"
rq_id = f"import:project.{uuid.uuid4()}-by-{request.user}"
Serializer = ProjectFileSerializer
file_field_name = 'project_file'

Expand All @@ -934,7 +934,7 @@ def import_task(request, queue_name, filename=None):
if 'rq_id' in request.data:
rq_id = request.data['rq_id']
else:
rq_id = f"{request.user}$-api-tasks-{uuid.uuid4()}-import"
rq_id = f"import:task.{uuid.uuid4()}-by-{request.user}"
Serializer = TaskFileSerializer
file_field_name = 'task_file'

Expand Down
5 changes: 3 additions & 2 deletions cvat/apps/engine/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ def upload_finished(self, request):

class AnnotationMixin:
def export_annotations(self, request, pk, db_obj, export_func, callback, get_data=None):
format_name = request.query_params.get("format")
format_name = request.query_params.get("format", "")
action = request.query_params.get("action", "").lower()
filename = request.query_params.get("filename", "")

Expand All @@ -259,7 +259,8 @@ def export_annotations(self, request, pk, db_obj, export_func, callback, get_dat
field_name=StorageType.TARGET,
)

rq_id = f"api-{self._object.__class__.__name__.lower()}-{pk}-annotations-{format_name}"
object_name = self._object.__class__.__name__.lower()
rq_id = f"export:annotations-for-{object_name}.id{pk}-in-{format_name.replace(' ', '_')}-format"

if format_name:
return export_func(db_instance=self._object,
Expand Down
4 changes: 2 additions & 2 deletions cvat/apps/engine/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@

############################# Low Level server API

def create(tid, data):
def create(tid, data, username):
"""Schedule the task"""
q = django_rq.get_queue(settings.CVAT_QUEUES.IMPORT_DATA.value)
q.enqueue_call(func=_create_thread, args=(tid, data),
job_id=f"api-tasks-{tid}")
job_id=f"create:task.id{tid}-by-{username}")

@transaction.atomic
def rq_handler(job, exc_type, exc_value, traceback):
Expand Down
33 changes: 20 additions & 13 deletions cvat/apps/engine/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@ def tasks(self, request, pk):
url_path=r'dataset/?$')
def dataset(self, request, pk):
self._object = self.get_object() # force to call check_object_permissions
rq_id = f"import:dataset-for-porject.id{pk}-by-{request.user}"

if request.method in {'POST', 'OPTIONS'}:
return self.import_annotations(
Expand All @@ -423,13 +424,13 @@ def dataset(self, request, pk):
db_obj=self._object,
import_func=_import_project_dataset,
rq_func=dm.project.import_dataset_as_project,
rq_id=f"api-project-{pk}-dataset-import",
rq_id=rq_id,
)
else:
action = request.query_params.get("action", "").lower()
if action in ("import_status",):
queue = django_rq.get_queue(settings.CVAT_QUEUES.IMPORT_DATA.value)
rq_job = queue.fetch_job(f"api-project-{pk}-dataset-import")
rq_job = queue.fetch_job(rq_id)
if rq_job is None:
return Response(status=status.HTTP_404_NOT_FOUND)
elif rq_job.is_finished:
Expand All @@ -451,7 +452,7 @@ def dataset(self, request, pk):
return Response(
data=self._get_rq_response(
settings.CVAT_QUEUES.IMPORT_DATA.value,
f'api-project-{pk}-dataset-import'
rq_id,
),
status=status.HTTP_202_ACCEPTED
)
Expand Down Expand Up @@ -498,7 +499,7 @@ def upload_finished(self, request):
return _import_project_dataset(
request=request,
filename=uploaded_file,
rq_id=f"api-project-{self._object.pk}-dataset-import",
rq_id=f"import:dataset-for-porject.id{self._object.pk}-by-{request.user}",
rq_func=dm.project.import_dataset_as_project,
pk=self._object.pk,
format_name=format_name,
Expand Down Expand Up @@ -940,7 +941,8 @@ def upload_finished(self, request):
return _import_annotations(
request=request,
filename=annotation_file,
rq_id=f"{request.user}$-api-tasks-{self._object.pk}-annotations-upload",
rq_id=(f"import:annotations-for-task.id{self._object.pk}-"
f"in-{format_name.replace(' ', '_')}-by-{request.user}"),
rq_func=dm.task.import_task_annotations,
pk=self._object.pk,
format_name=format_name,
Expand Down Expand Up @@ -979,7 +981,7 @@ def upload_finished(self, request):
# the value specified by the user or it's default value from the database
if 'stop_frame' not in serializer.validated_data:
data['stop_frame'] = None
task.create(self._object.id, data)
task.create(self._object.id, data, request.user)
return Response(serializer.data, status=status.HTTP_202_ACCEPTED)
elif self.action == 'import_backup':
filename = request.query_params.get("filename", "")
Expand Down Expand Up @@ -1166,16 +1168,17 @@ def annotations(self, request, pk):
return Response(data="Exporting annotations from a task without data is not allowed",
status=status.HTTP_400_BAD_REQUEST)
elif request.method == 'POST' or request.method == 'OPTIONS':
format_name = request.query_params.get('format', '')
return self.import_annotations(
request=request,
pk=pk,
db_obj=self._object,
import_func=_import_annotations,
rq_func=dm.task.import_task_annotations,
rq_id = f"{request.user}$-api-tasks-{pk}-annotations-upload"
rq_id = f"import:annotations-for-task.id{pk}-in-{format_name.replace(' ', '_')}-by-{request.user}"
)
elif request.method == 'PUT':
format_name = request.query_params.get('format')
format_name = request.query_params.get('format', '')
if format_name:
use_settings = strtobool(str(request.query_params.get('use_default_location', True)))
conv_mask_to_poly = strtobool(request.query_params.get('conv_mask_to_poly', 'True'))
Expand All @@ -1185,7 +1188,7 @@ def annotations(self, request, pk):
)
return _import_annotations(
request=request,
rq_id=f"{request.user}$-api-tasks-{pk}-annotations-upload",
rq_id = f"import:annotations-for-task.id{pk}-in-{format_name.replace(' ', '_')}-by-{request.user}",
rq_func=dm.task.import_task_annotations,
pk=pk,
format_name=format_name,
Expand Down Expand Up @@ -1239,7 +1242,7 @@ def status(self, request, pk):
self.get_object() # force to call check_object_permissions
response = self._get_rq_response(
queue=settings.CVAT_QUEUES.IMPORT_DATA.value,
job_id=f"api-tasks-{pk}"
job_id=f"create:task.id{pk}-by-{request.user}"
)
serializer = RqStatusSerializer(data=response)

Expand Down Expand Up @@ -1451,7 +1454,8 @@ def upload_finished(self, request):
return _import_annotations(
request=request,
filename=annotation_file,
rq_id=f"{request.user}$-api-jobs-{self._object.pk}-annotations-upload",
rq_id=(f"import:annotations-for-job.id{self._object.pk}-"
f"in-{format_name.replace(' ', '_')}-by-{request.user}"),
rq_func=dm.task.import_job_annotations,
pk=self._object.pk,
format_name=format_name,
Expand Down Expand Up @@ -1558,13 +1562,15 @@ def annotations(self, request, pk):
)

elif request.method == 'POST' or request.method == 'OPTIONS':
format_name = request.query_params.get('format', '')
return self.import_annotations(
request=request,
pk=pk,
db_obj=self._object.segment.task,
import_func=_import_annotations,
rq_func=dm.task.import_job_annotations,
rq_id = f"{request.user}$-api-jobs-{pk}-annotations-upload"
rq_id=(f"import:annotations-for-job.id{self._object.pk}-"
f"in-{format_name.replace(' ', '_')}-by-{request.user}"),
)

elif request.method == 'PUT':
Expand All @@ -1578,7 +1584,8 @@ def annotations(self, request, pk):
)
return _import_annotations(
request=request,
rq_id=f"{request.user}$-api-jobs-{pk}-annotations-upload",
rq_id=(f"import:annotations-for-job.id{pk}-"
f"in-{format_name.replace(' ', '_')}-by-{request.user}"),
rq_func=dm.task.import_job_annotations,
pk=pk,
format_name=format_name,
Expand Down
16 changes: 11 additions & 5 deletions supervisord/all.conf
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,22 @@ command=bash -c "rm /tmp/ssh-agent.sock -f && /usr/bin/ssh-agent -d -a /tmp/ssh-
priority=1
autorestart=true

[program:rqworker_default]
[program:rqworker_export]
command=%(ENV_HOME)s/wait-for-it.sh %(ENV_CVAT_REDIS_HOST)s:6379 -t 0 -- bash -ic \
"exec python3 %(ENV_HOME)s/manage.py rqworker -v 3 default"
"exec python3 %(ENV_HOME)s/manage.py rqworker -v 3 export"
environment=SSH_AUTH_SOCK="/tmp/ssh-agent.sock"
numprocs=2
process_name=rqworker_default_%(process_num)s

[program:rqworker_low]
command=%(ENV_HOME)s/wait-for-it.sh %(ENV_CVAT_REDIS_HOST)s:6379 -t 0 -- bash -ic \
"exec python3 %(ENV_HOME)s/manage.py rqworker -v 3 low"
"exec python3 %(ENV_HOME)s/manage.py rqworker -v 3 import"
environment=SSH_AUTH_SOCK="/tmp/ssh-agent.sock"
numprocs=2
process_name=rqworker_default_%(process_num)s

[program:rqworker_annotation]
command=%(ENV_HOME)s/wait-for-it.sh %(ENV_CVAT_REDIS_HOST)s:6379 -t 0 -- bash -ic \
"exec python3 %(ENV_HOME)s/manage.py rqworker -v 3 annotation"
environment=SSH_AUTH_SOCK="/tmp/ssh-agent.sock"
numprocs=1

Expand All @@ -49,7 +55,7 @@ numprocs=1

[program:rqscheduler]
command=%(ENV_HOME)s/wait-for-it.sh %(ENV_CVAT_REDIS_HOST)s:6379 -t 0 -- bash -ic \
"python3 /opt/venv/bin/rqscheduler --host %(ENV_CVAT_REDIS_HOST)s -i 30"
"python3 /opt/venv/bin/rqscheduler --host %(ENV_CVAT_REDIS_HOST)s --password '%(ENV_CVAT_REDIS_PASSWORD)s' -i 30"
environment=SSH_AUTH_SOCK="/tmp/ssh-agent.sock"
numprocs=1

Expand Down
2 changes: 1 addition & 1 deletion supervisord/worker.annotation.conf
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ command=bash -c "rm /tmp/ssh-agent.sock -f && /usr/bin/ssh-agent -d -a /tmp/ssh-
priority=1
autorestart=true

[program:rqworker_low]
[program:rqworker_annotation]
command=%(ENV_HOME)s/wait-for-it.sh %(ENV_CVAT_REDIS_HOST)s:6379 -t 0 -- bash -ic " \
exec python3 %(ENV_HOME)s/manage.py rqworker -v 3 annotation \
--worker-class cvat.rqworker.DefaultWorker \
Expand Down
2 changes: 1 addition & 1 deletion supervisord/worker.export.conf
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ command=bash -c "rm /tmp/ssh-agent.sock -f && /usr/bin/ssh-agent -d -a /tmp/ssh-
priority=1
autorestart=true

[program:rqworker_default]
[program:rqworker_export]
command=%(ENV_HOME)s/wait-for-it.sh %(ENV_CVAT_REDIS_HOST)s:6379 -t 0 -- bash -ic " \
exec python3 %(ENV_HOME)s/manage.py rqworker -v 3 export \
--worker-class cvat.rqworker.DefaultWorker \
Expand Down
2 changes: 1 addition & 1 deletion supervisord/worker.import.conf
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ command=bash -c "rm /tmp/ssh-agent.sock -f && /usr/bin/ssh-agent -d -a /tmp/ssh-
priority=1
autorestart=true

[program:rqworker_default]
[program:rqworker_import]
command=%(ENV_HOME)s/wait-for-it.sh %(ENV_CVAT_REDIS_HOST)s:6379 -t 0 -- bash -ic " \
exec python3 %(ENV_HOME)s/manage.py rqworker -v 3 import \
--worker-class cvat.rqworker.DefaultWorker \
Expand Down

0 comments on commit 2e00feb

Please sign in to comment.