Skip to content

Commit

Permalink
Merge pull request #762 from endlessm/background-downloads
Browse files Browse the repository at this point in the history
Download extra channels in the background
  • Loading branch information
pwithnall authored Aug 24, 2023
2 parents d288b2a + e9de59d commit 8702e58
Show file tree
Hide file tree
Showing 25 changed files with 450 additions and 154 deletions.
4 changes: 2 additions & 2 deletions kolibri_explore_plugin/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Copyright 2021-2023 Endless OS Foundation LLC
# SPDX-License-Identifier: GPL-2.0-or-later
try:
from ._version import __version__ # noqa: F401
except ModuleNotFoundError:
__version__ = "0.dev0"

default_app_config = "kolibri_explore_plugin.apps_config.ExploreConfig"
2 changes: 2 additions & 0 deletions kolibri_explore_plugin/api_urls.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# Copyright 2021-2023 Endless OS Foundation LLC
# SPDX-License-Identifier: GPL-2.0-or-later
from django.conf.urls import include
from django.conf.urls import url
from rest_framework import routers
Expand Down
11 changes: 0 additions & 11 deletions kolibri_explore_plugin/apps_config.py

This file was deleted.

171 changes: 44 additions & 127 deletions kolibri_explore_plugin/collectionviews.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# Copyright 2022-2023 Endless OS Foundation LLC
# SPDX-License-Identifier: GPL-2.0-or-later
import logging
import os
import time
Expand All @@ -6,13 +8,10 @@

from django.utils.translation import gettext_lazy as _
from kolibri.core.content.errors import InsufficientStorageSpaceError
from kolibri.core.content.models import ChannelMetadata
from kolibri.core.content.tasks import remotechannelimport
from kolibri.core.content.utils.content_manifest import ContentManifest
from kolibri.core.content.utils.content_manifest import (
ContentManifestParseError,
)
from kolibri.core.tasks.job import Priority
from kolibri.core.tasks.job import State as JobState
from kolibri.core.tasks.main import job_storage
from kolibri.utils import conf
Expand All @@ -21,10 +20,13 @@
from rest_framework.exceptions import APIException
from rest_framework.response import Response

from .tasks import applyexternaltags
from .tasks import BACKGROUND_QUEUE
from .tasks import QUEUE
from .tasks import remotecontentimport
from .jobs import enqueue_next_background_task
from .jobs import enqueue_task
from .jobs import get_applyexternaltags_task
from .jobs import get_channel_metadata
from .jobs import get_remotechannelimport_task
from .jobs import get_remotecontentimport_task
from .models import BackgroundTask

logger = logging.getLogger(__name__)

Expand All @@ -50,8 +52,7 @@

PROGRESS_STEPS = {
"importing": 0.1,
"downloading": 0.8,
"tagging": 0.9,
"downloading": 0.9,
"completed": 1,
}

Expand Down Expand Up @@ -121,7 +122,7 @@ def get_channelimport_tasks(self):
For all the channels in this content manifest.
"""
return [
_build_remotechannelimport_task(channel_id)
get_remotechannelimport_task(channel_id)
for channel_id in self.get_channel_ids()
]

Expand All @@ -131,7 +132,7 @@ def get_extra_channelimport_tasks(self):
For all channels featured in Endless Key content manifests.
"""
return [
_build_remotechannelimport_task(channel_id)
get_remotechannelimport_task(channel_id)
for channel_id in self.get_extra_channel_ids()
]

Expand All @@ -143,22 +144,14 @@ def get_contentimport_tasks(self):
tasks = []

for channel_id in self.get_channel_ids():
channel_metadata = _get_channel_metadata(channel_id)
channel_metadata = get_channel_metadata(channel_id)
node_ids = list(
self._get_node_ids_for_channel(channel_metadata, channel_id)
)
tasks.append(
{
"task": "remotecontentimport",
"params": {
"channel_id": channel_id,
"channel_name": channel_metadata.name,
"node_ids": list(
self._get_node_ids_for_channel(
channel_metadata, channel_id
)
),
"exclude_node_ids": [],
"fail_on_error": True,
},
}
get_remotecontentimport_task(
channel_id, channel_metadata.name, node_ids
)
)

return tasks
Expand All @@ -176,15 +169,7 @@ def get_applyexternaltags_tasks(self):
for tagged in self.metadata["tagged_node_ids"]:
node_id = tagged["node_id"]
tags = tagged["tags"]
tasks.append(
{
"task": "applyexternaltags",
"params": {
"node_id": node_id,
"tags": tags,
},
}
)
tasks.append(get_applyexternaltags_task(node_id, tags))

return tasks

Expand All @@ -193,25 +178,10 @@ def get_contentthumbnail_tasks(self):
For all the channels in this content manifest.
"""
tasks = []

for channel_id in _get_channel_ids_for_all_content_manifests():
channel_metadata = _get_channel_metadata(channel_id)
tasks.append(
{
"task": "remotecontentimport",
"params": {
"channel_id": channel_id,
"channel_name": channel_metadata.name,
"node_ids": [],
"exclude_node_ids": [],
"all_thumbnails": True,
"fail_on_error": True,
},
}
)

return tasks
return [
get_remotecontentimport_task(channel_id, all_thumbnails=True)
for channel_id in self.get_channel_ids()
]

def _get_node_ids_for_channel(self, channel_metadata, channel_id):
"""Get node IDs regardless of the version
Expand Down Expand Up @@ -247,9 +217,6 @@ class DownloadStage(IntEnum):
IMPORTING_CHANNELS = auto()
IMPORTING_CONTENT = auto()
APPLYING_EXTERNAL_TAGS = auto()
IMPORTING_EXTRA_CHANNELS = auto()
FOREGROUND_COMPLETED = auto()
IMPORTING_ALL_THUMBNAILS = auto()
COMPLETED = auto()


Expand All @@ -258,12 +225,6 @@ class DownloadError(Exception):


class CollectionDownloadManager:
TASKS_MAPPING = {
"remotechannelimport": remotechannelimport,
"remotecontentimport": remotecontentimport,
"applyexternaltags": applyexternaltags,
}

def __init__(self):
self._set_empty_state()

Expand Down Expand Up @@ -444,7 +405,7 @@ def get_status(self):
progress = (
PROGRESS_STEPS["downloading"]
+ (
PROGRESS_STEPS["tagging"]
PROGRESS_STEPS["completed"]
- PROGRESS_STEPS["downloading"]
)
* current_task_number
Expand All @@ -453,18 +414,7 @@ def get_status(self):
else:
progress = PROGRESS_STEPS["downloading"]

elif self._stage == DownloadStage.IMPORTING_EXTRA_CHANNELS:
if total_tasks_number > 0:
progress = (
PROGRESS_STEPS["tagging"]
+ (PROGRESS_STEPS["completed"] - PROGRESS_STEPS["tagging"])
* current_task_number
/ total_tasks_number
)
else:
progress = PROGRESS_STEPS["tagging"]

elif self._stage >= DownloadStage.FOREGROUND_COMPLETED:
elif self._stage >= DownloadStage.COMPLETED:
progress = PROGRESS_STEPS["completed"]

return {
Expand Down Expand Up @@ -507,7 +457,6 @@ def _next_task_or_stage(self, user):

def _set_next_stage(self, user):
if self._stage == DownloadStage.COMPLETED:
logger.info("Download completed!")
return

tasks = []
Expand All @@ -519,14 +468,22 @@ def _set_next_stage(self, user):
tasks = self._content_manifest.get_contentimport_tasks()
elif self._stage == DownloadStage.APPLYING_EXTERNAL_TAGS:
tasks = self._content_manifest.get_applyexternaltags_tasks()
elif self._stage == DownloadStage.IMPORTING_EXTRA_CHANNELS:
tasks = self._content_manifest.get_extra_channelimport_tasks()
elif self._stage == DownloadStage.IMPORTING_ALL_THUMBNAILS:
# Download the remaining content thumbnails in the background.
for (
task
) in self._content_manifest.get_contentthumbnail_tasks():
self._enqueue_background_task(user, task)

if self._stage == DownloadStage.COMPLETED:
logger.info("Download completed!")

# Download the manifest content thumbnails and the extra channels
# in the background.
thumbnail_tasks = (
self._content_manifest.get_contentthumbnail_tasks()
)
extra_channel_tasks = (
self._content_manifest.get_extra_channelimport_tasks()
)
for task in thumbnail_tasks + extra_channel_tasks:
BackgroundTask.create_from_task_data(task)
logger.info("Starting background download tasks")
enqueue_next_background_task()

self._tasks_pending = tasks
self._tasks_previously_completed.extend(self._tasks_completed)
Expand All @@ -543,49 +500,13 @@ def _enqueue_current_task(self, user):
f" at {self._enqueuing_timestamp}"
)

task = self.TASKS_MAPPING[self._current_task["task"]]
task = self._current_task["task"]
params = self._current_task["params"]
self._current_job_id = _call_task(task, user, **params)
self._current_job_id = enqueue_task(task, user, **params)
self._enqueuing_timestamp = None
self._enqueued_timestamp = time.time()
logger.info(f"Enqueued job id {self._current_job_id}")

def _enqueue_background_task(self, user, task):
task_func = self.TASKS_MAPPING[task["task"]]
job_id = _call_task(
task_func,
user,
queue=BACKGROUND_QUEUE,
priority=Priority.REGULAR,
**task["params"],
)
logger.info(f"Enqueued task {task} in job {job_id}")


def _call_task(task, user, queue=QUEUE, priority=Priority.HIGH, **params):
"""Create, validate and enqueue a job."""
job, _enqueue_args = task.validate_job_data(user, params)
job_id = job_storage.enqueue_job(job, queue=queue, priority=priority)
return job_id


def _build_remotechannelimport_task(channel_id):
# Try to get the channel name from an existing channel database, but
# this will fail on first import.
try:
channel_metadata = _get_channel_metadata(channel_id)
except ChannelMetadata.DoesNotExist:
channel_name = "unknown"
else:
channel_name = channel_metadata.name
return {
"task": "remotechannelimport",
"params": {
"channel_id": channel_id,
"channel_name": channel_name,
},
}


_content_manifests = []
_content_manifests_by_grade_name = {}
Expand Down Expand Up @@ -653,10 +574,6 @@ def _get_channel_ids_for_all_content_manifests():
return channel_ids


def _get_channel_metadata(channel_id):
return ChannelMetadata.objects.get(id=channel_id)


@api_view(["GET"])
def get_collection_info(request):
"""Return the collection metadata and availability."""
Expand Down
Loading

0 comments on commit 8702e58

Please sign in to comment.