Improve `opds2_feed_reaper` performance for large feeds. (PP-1756) (#2089)

* Improve `opds2_feed_reaper` performance for large feeds. (PP-1756)
tdilauro authored Sep 28, 2024
1 parent 7032573 commit 881a590
Showing 1 changed file with 65 additions and 26 deletions.
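The heart of the change is parsing the feed's identifier URNs in fixed-size batches rather than in one call over the whole set. A minimal, self-contained sketch of that batching pattern, with `parse_batch` as a hypothetical stand-in for `Identifier.parse_urns(self._db, batch, autocreate=False)`:

```python
import itertools
from collections.abc import Iterable, Iterator


def parse_batch(urns: list[str]) -> tuple[dict[str, int], list[str]]:
    # Hypothetical stand-in for Identifier.parse_urns: returns the successful
    # URN -> database-id mappings plus the URNs that could not be parsed.
    parsed = {u: abs(hash(u)) for u in urns if u.startswith("urn:")}
    return parsed, [u for u in urns if u not in parsed]


def map_in_batches(
    urns: Iterable[str], batch_size: int = 500
) -> tuple[dict[str, int], int]:
    """Consume `urns` in fixed-size slices so no single lookup sees them all."""
    mapped: dict[str, int] = {}
    failure_total = 0
    it: Iterator[str] = iter(urns)
    # islice() pulls the next `batch_size` items; the walrus-driven loop
    # ends when the iterator is exhausted and the slice comes back empty.
    while batch := list(itertools.islice(it, batch_size)):
        parsed, failures = parse_batch(batch)
        mapped |= parsed  # merge this batch's mappings into the total
        failure_total += len(failures)
    return mapped, failure_total


if __name__ == "__main__":
    urns = (f"urn:isbn:{n:013d}" for n in range(1_200))
    mapped, failed = map_in_batches(urns, batch_size=500)
    print(len(mapped), failed)  # 1200 0
```

Each lookup (and the SQL behind it) then sees at most `batch_size` URNs, and progress can be logged per batch, which is exactly what the diff below does.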
91 changes: 65 additions & 26 deletions bin/opds2_reaper_monitor
@@ -1,9 +1,11 @@
 #!/usr/bin/env python
 """Remove availability of items no longer present in OPDS 2.0 import collections."""
+import itertools
 import json
 from collections.abc import Generator
 from typing import Any, cast

+from sqlalchemy.orm import raiseload
 from webpub_manifest_parser.opds2 import OPDS2FeedParserFactory

 from palace.manager.core.coverage import CoverageFailure
@@ -103,53 +105,88 @@ class OPDS2ReaperMonitor(OPDS2ImportMonitor):
         :param progress: A TimestampData, ignored.
         """
         super().run_once(progress)
+        feed_id_count = len(self.seen_identifiers)
         self.log.info(
             f"Feed contained {self.publication_count} publication entries, "
-            f"{len(self.seen_identifiers)} unique identifiers, "
+            f"{feed_id_count} unique identifiers, "
             f"{self.missing_id_count} missing identifiers."
         )

+        # Number of ORM objects to buffer at a time.
+        query_batch_size = 500
+
         # Convert feed identifiers to our identifiers, so we can find them.
         # Unlike the import case, we don't want to create identifiers, if
         # they don't already exist.
-        identifiers, failures = Identifier.parse_urns(
-            self._db, self.seen_identifiers, autocreate=False
+        self.log.info(
+            f"Mapping {feed_id_count} feed identifiers to database identifiers."
         )
+        failure_total = 0
+        id_looked_up_count = 0
+        db_identifiers: dict[str, Identifier] = {}
+
+        feed_id_generator = (id_ for id_ in self.seen_identifiers)
+        while _feed_id_batch := list(
+            itertools.islice(feed_id_generator, query_batch_size)
+        ):
+            _batch_size = len(_feed_id_batch)
+            _batch_db_ids, _batch_failures = Identifier.parse_urns(
+                self._db, _feed_id_batch, autocreate=False
+            )
+            db_identifiers |= _batch_db_ids
+            id_looked_up_count += _batch_size
+            _success_count = len(_batch_db_ids)
+            _failure_count = len(_batch_failures)
+            failure_total += _failure_count
+            self.log.info(
+                f"Mapped batch of {_batch_size} feed identifier(s) to database identifier(s) "
+                f"(cumulative: {id_looked_up_count} of {feed_id_count} feed ids) "
+                f"with {_success_count} success(es) and {_failure_count} failure(s)."
+            )
+
+        self.log.info(
+            f"Successfully mapped {len(db_identifiers)} feed identifier(s) to database identifier(s)."
+        )
-        identifier_ids = [x.id for x in list(identifiers.values())]
-        if failures:
+        if failure_total > 0:
             self.log.warning(
-                f"Unable to parse {len(failures)} of {len(self.seen_identifiers)} identifiers."
+                f"Unable to parse {failure_total} of {feed_id_count} identifiers."
             )

         collection_license_pools_qu = self._db.query(LicensePool).filter(
             LicensePool.collection_id == self.collection.id
         )
-        collection_license_pools = collection_license_pools_qu.count()
+        collection_lp_count = collection_license_pools_qu.count()

-        unlimited_access_license_pools_qu = collection_license_pools_qu.filter(
+        eligible_license_pools_qu = collection_license_pools_qu.filter(
             LicensePool.licenses_available == LicensePool.UNLIMITED_ACCESS
         )
-        unlimited_access_license_pools = unlimited_access_license_pools_qu.count()
+        eligible_lp_count = eligible_license_pools_qu.count()

         # At this point we've gone through the feed and collected all the identifiers.
         # If there's anything we didn't see, we know it's no longer available.
-        to_be_reaped_qu = unlimited_access_license_pools_qu.join(Identifier).filter(
-            ~Identifier.id.in_(identifier_ids)
-        )
-        reap_count = to_be_reaped_qu.count()
         self.log.info(
-            f"Reaping {reap_count} of {unlimited_access_license_pools} unlimited (of {collection_license_pools} total) license pools from collection '{self.collection.name}'. "
+            f"{eligible_lp_count} of collection's {collection_lp_count} license pool(s) "
+            "are unlimited and eligible to be reaped, if missing from the feed."
         )

-        if self.dry_run:
-            # TODO: Need to prevent timestamp update for dry runs.
-            achievements = f"Dry run: {reap_count} license pools would have been removed. Failures parsing identifiers from feed: {len(failures)}."
-        else:
-            achievements = f"License pools removed: {reap_count}. Failures parsing identifiers from feed: {len(failures)}."
-            for pool in to_be_reaped_qu:
-                pool.unlimited_access = False
-        self.log.info(achievements)
+        reap_count = 0
+        pool: LicensePool
+        db_identifier_ids = {x.id for x in list(db_identifiers.values())}
+
+        # Note: We need to turn off eager loading, so that `yield_per` works safely.
+        # `raiseload` will let us know if we're accidentally accessing a joined table.
+        for pool in eligible_license_pools_qu.options(raiseload("*")).yield_per(
+            query_batch_size
+        ):
+            if pool.identifier_id not in db_identifier_ids:
+                reap_count += 1
+                # Don't actually reap, unless this is explicitly NOT a dry run.
+                if self.dry_run is False:
+                    pool.unlimited_access = False
+
+        achievements = (
+            f"Dry run: {reap_count} of {eligible_lp_count} eligible license pool(s) would have been marked unavailable. {failure_total} failures parsing identifiers from feed."
+            if self.dry_run
+            else f"{reap_count} of {eligible_lp_count} eligible license pool(s) marked unavailable. {failure_total} failures parsing identifiers from feed."
+        )
         return TimestampData(achievements=achievements)
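The reap pass itself changes from one `NOT IN (...)` query, whose parameter list grew with the size of the feed, to a streamed scan of the eligible pools with an in-memory membership check. Below is a self-contained sketch of the same streaming pattern; the `Pool` model and SQLite setup are invented for illustration, while `raiseload` and `yield_per` are the actual SQLAlchemy calls the commit uses:

```python
from sqlalchemy import Integer, create_engine
from sqlalchemy.orm import (
    DeclarativeBase,
    Mapped,
    Session,
    mapped_column,
    raiseload,
)


class Base(DeclarativeBase):
    pass


class Pool(Base):
    # Illustrative stand-in for LicensePool.
    __tablename__ = "pool"
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    identifier_id: Mapped[int] = mapped_column(Integer)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all(Pool(identifier_id=n) for n in range(10))
    session.commit()

    seen_ids = {0, 2, 4, 6, 8}  # identifiers still present in the feed
    reap_count = 0
    # yield_per() streams rows in fixed-size chunks instead of materializing
    # the whole result; raiseload("*") disables relationship loading so an
    # accidental lazy load fails loudly rather than undermining the stream.
    for pool in session.query(Pool).options(raiseload("*")).yield_per(5):
        if pool.identifier_id not in seen_ids:
            reap_count += 1  # a real run would mark the pool unavailable here
    print(reap_count)  # 5
```

This is why the commit's comment insists on turning off eager loading: `yield_per` only works safely when related rows are not being eagerly joined in, and checking membership in a Python set costs constant time per row regardless of feed size.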


@@ -211,7 +248,9 @@ class OPDS2ReaperScript(CollectionInputScript):
             self.log.error("No collections specified.")
             return

-        self.log.info(f"Reaping books from {len(collections)} collections.")
+        self.log.info(
+            f"Reaping books from {len(collections)} collection{'s' if len(collections) != 1 else ''}."
+        )
         for collection in collections:
             self.run_monitor(
                 collection,
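Note also how the dry-run behavior is folded into the reap loop itself: the count is always computed and only the mutation is guarded, so a dry run reports exactly what a real run would change. A stripped-down sketch of that guard, with a hypothetical `FakePool` in place of `LicensePool`:

```python
from dataclasses import dataclass


@dataclass
class FakePool:
    # Illustrative stand-in for LicensePool.
    identifier_id: int
    unlimited_access: bool = True


def reap(pools: list[FakePool], seen_ids: set[int], dry_run: bool) -> int:
    """Count pools missing from the feed; mutate them only on a real run."""
    reap_count = 0
    for pool in pools:
        if pool.identifier_id not in seen_ids:
            reap_count += 1
            if dry_run is False:  # mirrors the explicit guard in the commit
                pool.unlimited_access = False
    return reap_count


pools = [FakePool(identifier_id=n) for n in range(4)]
assert reap(pools, seen_ids={0, 1}, dry_run=True) == 2
assert all(p.unlimited_access for p in pools)  # dry run: nothing mutated
assert reap(pools, seen_ids={0, 1}, dry_run=False) == 2
assert not pools[2].unlimited_access  # real run: pools 2 and 3 reaped
```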
