This repository has been archived by the owner on Aug 4, 2023. It is now read-only.

Merge popularity calculations and data refresh into a single DAG #496

Merged · 31 commits into main · May 13, 2022

Conversation

@stacimc stacimc commented May 2, 2022

Fixes

Fixes WordPress/openverse#1603 by @AetherUnbound

Description

For each media type, this PR merges the following DAGs into a single DAG:

  • refresh_all_*_popularity_data: currently runs @monthly. It updates the underlying popularity DB tables.
  • refresh_*_view_data: currently runs @daily, except on the first of the month (when the previous DAG runs instead). It refreshes the materialized view that calculates popularity for records.
  • *_data_refresh: runs @weekly. It syncs data (including popularity data) from the Catalog up to the API DB and Elasticsearch cluster, so that it is accessible via the frontend.

The new, single DAG combines all the steps and looks like this:
[Screenshot: the new merged DAG, collapsed view]

Key benefits:

  • Scheduling of the popularity refresh becomes much easier. With the current setup, it is not possible to schedule the data refresh step so that it always occurs after the matview refresh while also guaranteeing it never collides with the monthly popularity recalculation.
  • It is much easier to see how the tasks are related and to check the status of the larger process at a glance.
  • When we add a new media type, all we have to do is add a new DataRefresh config, and all of the DAGs are generated immediately. Previously we would have needed to write at least two new DAGs for the popularity calculations alone.
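As a rough sketch of what such a config might look like (the field names here are illustrative, not the exact ones from data_refresh_types.py in this PR):

```python
from dataclasses import dataclass
from datetime import timedelta


# Illustrative sketch only; see data_refresh_types.py in this PR for the
# real DataRefresh definition and its exact fields.
@dataclass
class DataRefresh:
    media_type: str
    schedule_interval: str = "@weekly"
    refresh_matview_timeout: timedelta = timedelta(hours=1)


# Adding support for a new media type is then just a new config entry;
# a DAG factory iterates over this list and generates one DAG per entry.
DATA_REFRESH_CONFIGS = [
    DataRefresh(media_type="audio"),
    DataRefresh(media_type="image", refresh_matview_timeout=timedelta(days=1)),
]
```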

How the DAG works

Here's the full DAG, with the TaskGroups expanded so you can see all the steps:
[Screenshot: the full DAG with TaskGroups expanded]

The steps are:

  1. month_check: a branching operator that checks whether this is the first run of this DAG this month. If so, it proceeds to the full popularity recalculation; if not, it goes directly to the matview refresh.
  2. refresh_popularity_metrics_and_constants: this TaskGroup is skipped if this is not the first run of the month. In this step, we update the underlying popularity DB tables:
    1. update_media_popularity_metrics_table: adds any new popularity metrics and updates the configured percentile.
    2. update_media_popularity_constants_view: completely recalculates the popularity constant for each provider.
  3. update_materialized_popularity_view: always runs. It refreshes the materialized popularity view, which updates popularity data for existing records and adds data for records ingested since the last refresh.
  4. data_refresh: triggers the actual data refresh on the ingestion server and awaits its completion. Read more in Add data refresh to Airflow #397.

Only the wait_for_data_refresh step (the first step of the remote data refresh) is restricted to the data_refresh pool, which has a single worker slot. This ensures that remote data refreshes cannot run concurrently, while the popularity refresh steps preceding it still can.

There is also a force_refresh_metrics option that can be passed in the DagRun config: set it to true to make the metrics refresh tasks run even when it is not the first run of the month (for example, if a new metric is added or the constants need to be recalculated mid-month), or set it to false to skip this time-consuming step when we have reason to.
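Taken together, the month_check branching and the force_refresh_metrics override reduce to a decision like the following (a simplified sketch with illustrative task ids; in the DAG a function along these lines would be the callable of an Airflow BranchPythonOperator):

```python
# Simplified sketch of the month_check branching logic; the task ids and
# function name are illustrative, not taken verbatim from the PR.
def choose_branch(is_first_dagrun_of_month: bool, dagrun_conf: dict) -> str:
    force = dagrun_conf.get("force_refresh_metrics")
    if force is not None:
        # An explicit config value wins: true forces the metrics refresh,
        # false skips it regardless of the date.
        refresh_metrics = force
    else:
        refresh_metrics = is_first_dagrun_of_month
    return (
        "refresh_popularity_metrics_and_constants"
        if refresh_metrics
        else "update_materialized_popularity_view"
    )
```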

Testing Instructions

Prereqs for setup

  • Make sure your ingestion server is running locally (run just up from the api repo)
  • Set up an Airflow connection to the ingestion server in openverse-catalog/.env: AIRFLOW_CONN_DATA_REFRESH="http://172.17.0.1:8001"
  • Create the special data_refresh pool through the Airflow UI (Admin -> Pools). Name it data_refresh and give it 1 Slot

Tests

First run of the month
If you have any runs of audio_data_refresh from this month, delete them. Then run audio_data_refresh (recommended because it is much faster than image). Ensure that all tasks, including the refresh_all_popularity_data tasks, run successfully.

Subsequent runs
Once it has finished, re-run audio_data_refresh and make sure the refresh_all_popularity_data tasks skip, but all others run successfully

Force refresh_all_popularity_data tasks to run with config
In Airflow, click on the run button and select "Trigger DAG with config". You'll be prompted to provide config data as json.

  • Run with {"force_refresh_metrics": true}, and verify that the refresh_all_popularity_data tasks do not skip even though it is not the first run of the month
  • Run with {"force_refresh_metrics": false} and verify they skip again
  • Run with an empty config and verify they skip

Multiple DAGs
The data_refresh steps should not run concurrently, but all previous steps can. To test this we can take advantage of how slow the image refresh is compared to audio.

  • Start the image refresh and wait for it to get to the wait_for_data_refresh step. Immediately start the audio refresh. It should complete the popularity calculation steps and get to wait_for_data_refresh before the image refresh is complete; thus its wait_for_data_refresh should enter up_for_reschedule, and only succeed when image finishes.
  • Also try starting image and audio at the same time and observe that the month_check/popularity steps can happen concurrently

Checklist

  • My pull request has a descriptive title (not a vague title like Update index.md).
  • My pull request targets the default branch of the repository (main) or a parent feature branch.
  • My commit messages follow best practices.
  • My code follows the established code style of the repository.
  • I added or updated tests for the changes I made (if applicable).
  • I added or updated documentation (if applicable).
  • I tried running the project locally and verified that there are no visible errors.

@stacimc stacimc added labels May 2, 2022: 🟨 priority: medium (Not blocking but should be addressed soon), ✨ goal: improvement (Improvement to an existing user-facing feature), 💻 aspect: code (Concerns the software code in the repository), 🐍 tech: python (Requires familiarity with Python), 🔧 tech: airflow (Requires familiarity with Apache Airflow)
@stacimc stacimc added this to In progress in Openverse PRs via automation May 2, 2022
@stacimc stacimc self-assigned this May 2, 2022
Note for reviewers: This file just pulls out all the old data_refresh steps into a new subfactory.

@stacimc stacimc changed the title from "[WIP] Merge popularity calculations and data refresh into a single DAG" to "Merge popularity calculations and data refresh into a single DAG" on May 5, 2022
@AetherUnbound (Contributor) left a comment:
Just a few more small things!

openverse_catalog/dags/common/constants.py (review thread resolved)
openverse_catalog/dags/data_refresh/data_refresh_types.py (review thread resolved)
python_callable=sql.update_db_view,
op_args=[POSTGRES_CONN_ID, media_type],
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
doc_md=create_refresh_view_data_task.__doc__,
A reviewer commented on this snippet:
💖

@stacimc stacimc requested a review from a team May 10, 2022 17:06

obulat commented May 11, 2022

I cannot test this. On the first step, when I trigger audio_data_refresh, I get an error on trigger_data_refresh task:

requests.exceptions.ConnectionError: HTTPSConnectionPool(host='172.17.0.1', port=8001): Max retries exceeded with url: /task (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0xffff84e84e50>: Failed to establish a new connection: [Errno 110] Connection timed out'))
[2022-05-11, 15:30:56 +03] {taskinstance.py:1272} INFO - Marking task as UP_FOR_RETRY. dag_id=audio_data_refresh, task_id=data_refresh.trigger_data_refresh, execution_date=20220501T000000, start_date=20220511T122846, end_date=20220511T123056
[2022-05-11, 15:30:56 +03] {standard_task_runner.py:89} ERROR - Failed to execute job 7 for task data_refresh.trigger_data_refresh

The ingestion server seems to finish the task, at least that's what I understand from all the logs, and their last lines:

2022-05-11 12:27:49,691 INFO indexer.py:389 - Created 'audio' index alias pointing to audio-5a96b0da35c447cea8604ecd5adbd64e
2022-05-11 12:27:49,691 INFO tasks.py:137 - Task a167d816-d173-4410-a4ad-10672404db3b exited.

I really love being able to run both the catalog and the API ingestion server locally! I hope I can actually finish the refresh, though :)


stacimc commented May 11, 2022

Thanks for testing @obulat! Can you check your .env for the AIRFLOW_CONN_DATA_REFRESH variable? Previously we had to URL-encode the whole URL and prepend an extra scheme, so the variable would be set to something like:

AIRFLOW_CONN_DATA_REFRESH="http://http%3A%2F%2Fhost.docker.internal%3A8001"

But with the changes in #480 we can now just configure:

AIRFLOW_CONN_DATA_REFRESH="http://host.docker.internal:8001"

If you haven't updated your connection variables to the improved format, you'll see those connection errors.
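For reference, the old value was just the plain URL percent-encoded and prefixed with an extra scheme; the standard library reproduces it:

```python
from urllib.parse import quote

# Percent-encode the whole URL (safe="" encodes ':' and '/' too),
# then prepend a scheme, as the old connection format required.
url = "http://host.docker.internal:8001"
encoded = quote(url, safe="")
old_format = f"http://{encoded}"
print(old_format)  # http://http%3A%2F%2Fhost.docker.internal%3A8001
```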


You can find more background information on this process in the following
issues and related PRs:

- [[Feature] Data refresh orchestration DAG](
https://github.com/WordPress/openverse-catalog/issues/353)
- [[Feature] Merge popularity calculations and data refresh into a single DAG](
A reviewer commented:
Nice to have these PRs linked here


obulat commented May 12, 2022

If you haven't updated your connection variables to the improved format, you'll see those connection errors.

Thank you, @stacimc! This fixed it, and I managed to run all of the steps you describe in the Testing instructions. It all works really well, and the workflow is much simpler now.

One thing I don't understand is this: I have not ingested any data, I was running data_refresh on the fresh install of the Catalog. What data was it using to run it? What was it refreshing?

@obulat (Contributor) left a comment:
Thank you for such detailed instructions for testing! It all works great and simplifies a lot in the process ❤️

Openverse PRs automation moved this from Needs review to Reviewer approved May 12, 2022

stacimc commented May 12, 2022

One thing I don't understand is this: I have not ingested any data, I was running data_refresh on the fresh install of the Catalog. What data was it using to run it? What was it refreshing?

@obulat The process syncs the catalog with the API DB, but as far as I understand it isn't smart enough to detect whether there have actually been any changes.

Actually, when you're testing locally the data refresh server doesn't even really connect to your local catalog by default. It connects to a mock upstream_db with sample data. You can force it to connect to your local catalog by updating the ingestion-server/env.docker:

UPSTREAM_DB_HOST="172.17.0.1"
UPSTREAM_DB_PORT="5434" # Note you have to update the port too!


stacimc commented May 13, 2022

I just realized that we needed to update the execution_timeout for some of the popularity calculation steps for Image; the default of 1 hour was being applied, which certainly would not have been enough for production data 😄

@AetherUnbound, @obulat -- just wanted to give you an opportunity to object/add feedback to the last commit if you'd like before I merge. Sorry about that!

@AetherUnbound (Contributor) left a comment:
Good call on the timeouts! I love being able to pass around the data classes themselves 😄


obulat commented May 13, 2022

Actually, when you're testing locally the data refresh server doesn't even really connect to your local catalog by default. It connects to a mock upstream_db with sample data. You can force it to connect to your local catalog by updating the ingestion-server/env.docker:

Have you tried running both the catalog and the API locally? I have wanted to try it for a year now :) I managed to get it to work, but maybe there's a simpler way?

Simply setting the upstream_db values didn't work. I looked at the local networks (docker network ls) and saw that there are two bridge networks, one for the API and one for the Catalog, and they are not connected. So, to get the two into the same network locally, I added a networks property to the API docker-compose file like this:

networks:
  default:
    external:
      name: openverse-catalog_default

Based on https://docs.docker.com/compose/networking/#use-a-pre-existing-network

And it worked, and now the local catalog and the API are talking to each other.

And none of this is related to this PR, of course, 😂

@stacimc stacimc merged commit 3ef7138 into main May 13, 2022
Openverse PRs automation moved this from Reviewer approved to Merged! May 13, 2022
@stacimc stacimc deleted the feature/single-dag-for-popularity-and-data-refresh branch May 13, 2022 16:30
Linked issue: More tightly couple matview refresh, popularity recalculation, and data refresh