From 368dc3dcf483d5afc81a261c12f51970b66ee225 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 3 Oct 2022 14:28:33 +0100 Subject: [PATCH 1/7] Add BG task to clear stale staged push actions --- .../databases/main/event_push_actions.py | 44 +++++++++++++++++++ synapse/storage/schema/__init__.py | 1 + .../main/delta/73/05old_push_actions.sql | 18 ++++++++ 3 files changed, 63 insertions(+) create mode 100644 synapse/storage/schema/main/delta/73/05old_push_actions.sql diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 3fdf128d9e24..1857b632ae3e 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -224,6 +224,10 @@ def __init__( self._rotate_notifs, 30 * 1000 ) + self._clear_old_staging_loop = self._clock.looping_call( + self._clear_old_push_actions_staging, 30 * 60 * 1000 + ) + self.db_pool.updates.register_background_index_update( "event_push_summary_unique_index", index_name="event_push_summary_unique_index", @@ -802,6 +806,7 @@ def _gen_entry( is_highlight, # highlight column int(count_as_unread), # unread column thread_id, # thread_id column + self._clock.time_msec(), # inserted_ts column ) await self.db_pool.simple_insert_many( @@ -814,6 +819,7 @@ def _gen_entry( "highlight", "unread", "thread_id", + "inserted_ts", ), values=[ _gen_entry(user_id, actions) @@ -1340,6 +1346,44 @@ def remove_old_push_actions_that_have_rotated_txn( if done: break + @wrap_as_background_process("_clear_old_push_actions_staging") + async def _clear_old_push_actions_staging(self) -> None: + """Clear out any old event push actions from the staging table for + events that we failed to persist. + """ + + # We delete anything more than an hour old, on the assumption that we'll + # never take more than an hour to persist an event. + delete_before_ts = self._clock.time_msec() - 60 * 60 * 1000 + + # We don't have an index on `inserted_ts`, instead we assume that the + # number of "live" rows in `event_push_actions_staging` is small enough + # that an infrequent periodic scan won't cause a problem + limit = 1000 + sql = """ + DELETE FROM event_push_actions_staging WHERE event_id IN ( + SELECT event_id FROM event_push_actions_staging WHERE + inserted_ts < ? + LIMIT ? + ) + """ + + def _clear_old_push_actions_staging_txn(txn: LoggingTransaction) -> bool: + txn.execute(sql, (delete_before_ts, limit)) + return txn.rowcount >= limit + + while True: + # Returns true if we have more stuff to delete from the table. + deleted = await self.db_pool.runInteraction( + "_clear_old_push_actions_staging", _clear_old_push_actions_staging_txn + ) + + if not deleted: + return + + # We sleep to ensure that we don't overwhelm the DB. + self._clock.sleep(1.0) + class EventPushActionsStore(EventPushActionsWorkerStore): EPA_HIGHLIGHT_INDEX = "epa_highlight_index" diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py index f29424d17a74..4a5c947699eb 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py @@ -85,6 +85,7 @@ events over federation. - Add indexes to various tables (`event_failed_pull_attempts`, `insertion_events`, `batch_events`) to make it easy to delete all associated rows when purging a room. + - `inserted_ts` column is added to `event_push_actions_staging` table. """ diff --git a/synapse/storage/schema/main/delta/73/05old_push_actions.sql b/synapse/storage/schema/main/delta/73/05old_push_actions.sql new file mode 100644 index 000000000000..e8d0abe46187 --- /dev/null +++ b/synapse/storage/schema/main/delta/73/05old_push_actions.sql @@ -0,0 +1,18 @@ +/* Copyright 2022 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Add a column so that we know when a push action was inserted, to make it +-- easier to clear out old ones. +ALTER TABLE event_push_actions_staging ADD COLUMN inserted_ts BIGINT; From 481bbb19d737aba76fc442f05eb2f01e6ed7b2cf Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 3 Oct 2022 15:36:11 +0100 Subject: [PATCH 2/7] Handle existing rows --- .../databases/main/event_push_actions.py | 16 +++++++++++-- ...ns.sql => 05old_push_actions.sql.postgres} | 4 ++++ .../delta/73/05old_push_actions.sql.sqlite | 24 +++++++++++++++++++ 3 files changed, 42 insertions(+), 2 deletions(-) rename synapse/storage/schema/main/delta/73/{05old_push_actions.sql => 05old_push_actions.sql.postgres} (76%) create mode 100644 synapse/storage/schema/main/delta/73/05old_push_actions.sql.sqlite diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 1857b632ae3e..cb7948768f86 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -205,6 +205,9 @@ def __init__( ): super().__init__(database, db_conn, hs) + # Track when the process started. + self._started_ts = self._clock.time_msec() + # These get correctly set by _find_stream_orderings_for_times_txn self.stream_ordering_month_ago: Optional[int] = None self.stream_ordering_day_ago: Optional[int] = None @@ -1356,14 +1359,23 @@ async def _clear_old_push_actions_staging(self) -> None: # never take more than an hour to persist an event. delete_before_ts = self._clock.time_msec() - 60 * 60 * 1000 + if self._started_ts > self._clock.time_msec() - delete_before_ts: + # We need to wait for at least an hour before we started deleting, + # so that we know it's safe to delete rows with NULL `inserted_ts`. + return + # We don't have an index on `inserted_ts`, instead we assume that the # number of "live" rows in `event_push_actions_staging` is small enough - # that an infrequent periodic scan won't cause a problem + # that an infrequent periodic scan won't cause a problem. + # + # Note: we also delete any columns with NULL `inserted_ts`, this is safe + # as we added a default value to new rows and so they must be at least + # an hour old. limit = 1000 sql = """ DELETE FROM event_push_actions_staging WHERE event_id IN ( SELECT event_id FROM event_push_actions_staging WHERE - inserted_ts < ? + inserted_ts < ? OR inserted_ts IS NULL LIMIT ? ) """ diff --git a/synapse/storage/schema/main/delta/73/05old_push_actions.sql b/synapse/storage/schema/main/delta/73/05old_push_actions.sql.postgres similarity index 76% rename from synapse/storage/schema/main/delta/73/05old_push_actions.sql rename to synapse/storage/schema/main/delta/73/05old_push_actions.sql.postgres index e8d0abe46187..4af1a8470bb6 100644 --- a/synapse/storage/schema/main/delta/73/05old_push_actions.sql +++ b/synapse/storage/schema/main/delta/73/05old_push_actions.sql.postgres @@ -16,3 +16,7 @@ -- Add a column so that we know when a push action was inserted, to make it -- easier to clear out old ones. ALTER TABLE event_push_actions_staging ADD COLUMN inserted_ts BIGINT; + +-- We now add a default for *new* rows. We don't do this above as we don't want +-- to have to update every remove with the new default. +ALTER TABLE event_push_actions_staging ALTER COLUMN inserted_ts SET DEFAULT extract(epoch from now()) * 1000; diff --git a/synapse/storage/schema/main/delta/73/05old_push_actions.sql.sqlite b/synapse/storage/schema/main/delta/73/05old_push_actions.sql.sqlite new file mode 100644 index 000000000000..ab09e33bf7b1 --- /dev/null +++ b/synapse/storage/schema/main/delta/73/05old_push_actions.sql.sqlite @@ -0,0 +1,24 @@ +/* Copyright 2022 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- On SQLite we must be in monolith mode and updating the database from Synapse, +-- so its safe to assume that `event_push_actions_staging` should be empty (as +-- over restart an event must either have been fully persisted or we'll +-- recalculate the push actions) +DELETE FROM event_push_actions_staging; + +-- Add a column so that we know when a push action was inserted, to make it +-- easier to clear out old ones. +ALTER TABLE event_push_actions_staging ADD COLUMN inserted_ts BIGINT NOT NULL; From 8f04bbe71fc7633c512c726f558f3d825939897d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 3 Oct 2022 15:37:41 +0100 Subject: [PATCH 3/7] Newsfile --- changelog.d/14020.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/14020.misc diff --git a/changelog.d/14020.misc b/changelog.d/14020.misc new file mode 100644 index 000000000000..85550b307d42 --- /dev/null +++ b/changelog.d/14020.misc @@ -0,0 +1 @@ +Clear out stale entries in `event_push_actions_staging` table. From 4d0995faa423dd5b4fb0680af4a6b4d36ec61a34 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 3 Oct 2022 15:45:31 +0100 Subject: [PATCH 4/7] Typing --- synapse/storage/databases/main/event_push_actions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index cb7948768f86..70b9e008b7d1 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -798,7 +798,7 @@ async def add_push_actions_to_staging( # can be used to insert into the `event_push_actions_staging` table. def _gen_entry( user_id: str, actions: Collection[Union[Mapping, str]] - ) -> Tuple[str, str, str, int, int, int, str]: + ) -> Tuple[str, str, str, int, int, int, str, int]: is_highlight = 1 if _action_has_highlight(actions) else 0 notif = 1 if "notify" in actions else 0 return ( From 58f0c2c8a32c29d4a3f4c1683a5e483228774f6a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 3 Oct 2022 17:06:40 +0100 Subject: [PATCH 5/7] Fix SQLite CI --- .../storage/schema/main/delta/73/05old_push_actions.sql.sqlite | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/schema/main/delta/73/05old_push_actions.sql.sqlite b/synapse/storage/schema/main/delta/73/05old_push_actions.sql.sqlite index ab09e33bf7b1..7482dabba224 100644 --- a/synapse/storage/schema/main/delta/73/05old_push_actions.sql.sqlite +++ b/synapse/storage/schema/main/delta/73/05old_push_actions.sql.sqlite @@ -21,4 +21,4 @@ DELETE FROM event_push_actions_staging; -- Add a column so that we know when a push action was inserted, to make it -- easier to clear out old ones. -ALTER TABLE event_push_actions_staging ADD COLUMN inserted_ts BIGINT NOT NULL; +ALTER TABLE event_push_actions_staging ADD COLUMN inserted_ts BIGINT; From 608eb066b9275532e486231cd63ae887121a51e7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 3 Oct 2022 17:27:49 +0100 Subject: [PATCH 6/7] Update synapse/storage/databases/main/event_push_actions.py Co-authored-by: Patrick Cloke --- synapse/storage/databases/main/event_push_actions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 70b9e008b7d1..b0f9572ed63c 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -1394,7 +1394,7 @@ def _clear_old_push_actions_staging_txn(txn: LoggingTransaction) -> bool: return # We sleep to ensure that we don't overwhelm the DB. - self._clock.sleep(1.0) + await self._clock.sleep(1.0) class EventPushActionsStore(EventPushActionsWorkerStore): From bfc259d6a7578cd390ae004e0b261d6c7462beeb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 3 Oct 2022 17:57:44 +0100 Subject: [PATCH 7/7] Update synapse/storage/databases/main/event_push_actions.py Co-authored-by: Patrick Cloke --- synapse/storage/databases/main/event_push_actions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index b0f9572ed63c..cdc9ee5a3780 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -1359,7 +1359,7 @@ async def _clear_old_push_actions_staging(self) -> None: # never take more than an hour to persist an event. delete_before_ts = self._clock.time_msec() - 60 * 60 * 1000 - if self._started_ts > self._clock.time_msec() - delete_before_ts: + if self._started_ts > delete_before_ts: # We need to wait for at least an hour before we started deleting, # so that we know it's safe to delete rows with NULL `inserted_ts`. return