Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Revert "Remove slaved id tracker" #14463

Merged
merged 1 commit into from
Nov 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion changelog.d/14376.misc

This file was deleted.

13 changes: 13 additions & 0 deletions synapse/replication/slave/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
13 changes: 13 additions & 0 deletions synapse/replication/slave/storage/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
50 changes: 50 additions & 0 deletions synapse/replication/slave/storage/_slaved_id_tracker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List, Optional, Tuple

from synapse.storage.database import LoggingDatabaseConnection
from synapse.storage.util.id_generators import AbstractStreamIdTracker, _load_current_id


class SlavedIdTracker(AbstractStreamIdTracker):
"""Tracks the "current" stream ID of a stream with a single writer.

See `AbstractStreamIdTracker` for more details.

Note that this class does not work correctly when there are multiple
writers.
"""

def __init__(
self,
db_conn: LoggingDatabaseConnection,
table: str,
column: str,
extra_tables: Optional[List[Tuple[str, str]]] = None,
step: int = 1,
):
self.step = step
self._current = _load_current_id(db_conn, table, column, step)
if extra_tables:
for table, column in extra_tables:
self.advance(None, _load_current_id(db_conn, table, column))

def advance(self, instance_name: Optional[str], new_id: int) -> None:
self._current = (max if self.step > 0 else min)(self._current, new_id)

def get_current_token(self) -> int:
return self._current

def get_current_token_for_writer(self, instance_name: str) -> int:
return self.get_current_token()
30 changes: 20 additions & 10 deletions synapse/storage/databases/main/account_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
)

from synapse.api.constants import AccountDataTypes
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
from synapse.replication.tcp.streams import AccountDataStream, TagAccountDataStream
from synapse.storage._base import db_to_json
from synapse.storage.database import (
Expand Down Expand Up @@ -67,11 +68,12 @@ def __init__(
# to write account data. A value of `True` implies that `_account_data_id_gen`
# is an `AbstractStreamIdGenerator` and not just a tracker.
self._account_data_id_gen: AbstractStreamIdTracker
self._can_write_to_account_data = (
self._instance_name in hs.config.worker.writers.account_data
)

if isinstance(database.engine, PostgresEngine):
self._can_write_to_account_data = (
self._instance_name in hs.config.worker.writers.account_data
)

self._account_data_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
Expand All @@ -93,13 +95,21 @@ def __init__(
# `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
# updated over replication. (Multiple writers are not supported for
# SQLite).
self._account_data_id_gen = StreamIdGenerator(
db_conn,
"room_account_data",
"stream_id",
extra_tables=[("room_tags_revisions", "stream_id")],
is_writer=self._instance_name in hs.config.worker.writers.account_data,
)
if self._instance_name in hs.config.worker.writers.account_data:
self._can_write_to_account_data = True
self._account_data_id_gen = StreamIdGenerator(
db_conn,
"room_account_data",
"stream_id",
extra_tables=[("room_tags_revisions", "stream_id")],
)
else:
self._account_data_id_gen = SlavedIdTracker(
db_conn,
"room_account_data",
"stream_id",
extra_tables=[("room_tags_revisions", "stream_id")],
)

account_max = self.get_max_account_data_stream_id()
self._account_data_stream_cache = StreamChangeCache(
Expand Down
36 changes: 23 additions & 13 deletions synapse/storage/databases/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
whitelisted_homeserver,
)
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
from synapse.replication.tcp.streams._base import DeviceListsStream, UserSignatureStream
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import (
Expand Down Expand Up @@ -85,19 +86,28 @@ def __init__(
):
super().__init__(database, db_conn, hs)

# In the worker store this is an ID tracker which we overwrite in the non-worker
# class below that is used on the main process.
self._device_list_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
db_conn,
"device_lists_stream",
"stream_id",
extra_tables=[
("user_signature_stream", "stream_id"),
("device_lists_outbound_pokes", "stream_id"),
("device_lists_changes_in_room", "stream_id"),
],
is_writer=hs.config.worker.worker_app is None,
)
if hs.config.worker.worker_app is None:
self._device_list_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
db_conn,
"device_lists_stream",
"stream_id",
extra_tables=[
("user_signature_stream", "stream_id"),
("device_lists_outbound_pokes", "stream_id"),
("device_lists_changes_in_room", "stream_id"),
],
)
else:
self._device_list_id_gen = SlavedIdTracker(
db_conn,
"device_lists_stream",
"stream_id",
extra_tables=[
("user_signature_stream", "stream_id"),
("device_lists_outbound_pokes", "stream_id"),
("device_lists_changes_in_room", "stream_id"),
],
)

# Type-ignore: _device_list_id_gen is mixed in from either DataStore (as a
# StreamIdGenerator) or SlavedDataStore (as a SlavedIdTracker).
Expand Down
35 changes: 21 additions & 14 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
run_as_background_process,
wrap_as_background_process,
)
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
from synapse.replication.tcp.streams import BackfillStream
from synapse.replication.tcp.streams.events import EventsStream
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
Expand Down Expand Up @@ -212,20 +213,26 @@ def __init__(
# `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
# updated over replication. (Multiple writers are not supported for
# SQLite).
self._stream_id_gen = StreamIdGenerator(
db_conn,
"events",
"stream_ordering",
is_writer=hs.get_instance_name() in hs.config.worker.writers.events,
)
self._backfill_id_gen = StreamIdGenerator(
db_conn,
"events",
"stream_ordering",
step=-1,
extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
is_writer=hs.get_instance_name() in hs.config.worker.writers.events,
)
if hs.get_instance_name() in hs.config.worker.writers.events:
self._stream_id_gen = StreamIdGenerator(
db_conn,
"events",
"stream_ordering",
)
self._backfill_id_gen = StreamIdGenerator(
db_conn,
"events",
"stream_ordering",
step=-1,
extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
)
else:
self._stream_id_gen = SlavedIdTracker(
db_conn, "events", "stream_ordering"
)
self._backfill_id_gen = SlavedIdTracker(
db_conn, "events", "stream_ordering", step=-1
)

events_max = self._stream_id_gen.get_current_token()
curr_state_delta_prefill, min_curr_state_delta_id = self.db_pool.get_cache_dict(
Expand Down
17 changes: 9 additions & 8 deletions synapse/storage/databases/main/push_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

from synapse.api.errors import StoreError
from synapse.config.homeserver import ExperimentalConfig
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
from synapse.replication.tcp.streams import PushRulesStream
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import (
Expand Down Expand Up @@ -110,14 +111,14 @@ def __init__(
):
super().__init__(database, db_conn, hs)

# In the worker store this is an ID tracker which we overwrite in the non-worker
# class below that is used on the main process.
self._push_rules_stream_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
db_conn,
"push_rules_stream",
"stream_id",
is_writer=hs.config.worker.worker_app is None,
)
if hs.config.worker.worker_app is None:
self._push_rules_stream_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
db_conn, "push_rules_stream", "stream_id"
)
else:
self._push_rules_stream_id_gen = SlavedIdTracker(
db_conn, "push_rules_stream", "stream_id"
)

push_rules_prefill, push_rules_id = self.db_pool.get_cache_dict(
db_conn,
Expand Down
24 changes: 15 additions & 9 deletions synapse/storage/databases/main/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
)

from synapse.push import PusherConfig, ThrottleParams
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
from synapse.replication.tcp.streams import PushersStream
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import (
Expand Down Expand Up @@ -58,15 +59,20 @@ def __init__(
):
super().__init__(database, db_conn, hs)

# In the worker store this is an ID tracker which we overwrite in the non-worker
# class below that is used on the main process.
self._pushers_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
db_conn,
"pushers",
"id",
extra_tables=[("deleted_pushers", "stream_id")],
is_writer=hs.config.worker.worker_app is None,
)
if hs.config.worker.worker_app is None:
self._pushers_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
db_conn,
"pushers",
"id",
extra_tables=[("deleted_pushers", "stream_id")],
)
else:
self._pushers_id_gen = SlavedIdTracker(
db_conn,
"pushers",
"id",
extra_tables=[("deleted_pushers", "stream_id")],
)

self.db_pool.updates.register_background_update_handler(
"remove_deactivated_pushers",
Expand Down
18 changes: 9 additions & 9 deletions synapse/storage/databases/main/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
)

from synapse.api.constants import EduTypes
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
from synapse.replication.tcp.streams import ReceiptsStream
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import (
Expand Down Expand Up @@ -60,9 +61,6 @@ def __init__(
hs: "HomeServer",
):
self._instance_name = hs.get_instance_name()

# In the worker store this is an ID tracker which we overwrite in the non-worker
# class below that is used on the main process.
self._receipts_id_gen: AbstractStreamIdTracker

if isinstance(database.engine, PostgresEngine):
Expand All @@ -89,12 +87,14 @@ def __init__(
# `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
# updated over replication. (Multiple writers are not supported for
# SQLite).
self._receipts_id_gen = StreamIdGenerator(
db_conn,
"receipts_linearized",
"stream_id",
is_writer=hs.get_instance_name() in hs.config.worker.writers.receipts,
)
if hs.get_instance_name() in hs.config.worker.writers.receipts:
self._receipts_id_gen = StreamIdGenerator(
db_conn, "receipts_linearized", "stream_id"
)
else:
self._receipts_id_gen = SlavedIdTracker(
db_conn, "receipts_linearized", "stream_id"
)

super().__init__(database, db_conn, hs)

Expand Down
13 changes: 3 additions & 10 deletions synapse/storage/util/id_generators.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,13 +186,11 @@ def __init__(
column: str,
extra_tables: Iterable[Tuple[str, str]] = (),
step: int = 1,
is_writer: bool = True,
) -> None:
assert step != 0
self._lock = threading.Lock()
self._step: int = step
self._current: int = _load_current_id(db_conn, table, column, step)
self._is_writer = is_writer
for table, column in extra_tables:
self._current = (max if step > 0 else min)(
self._current, _load_current_id(db_conn, table, column, step)
Expand All @@ -206,11 +204,9 @@ def __init__(
self._unfinished_ids: OrderedDict[int, int] = OrderedDict()

def advance(self, instance_name: str, new_id: int) -> None:
# Advance should never be called on a writer instance, only over replication
if self._is_writer:
raise Exception("Replication is not supported by writer StreamIdGenerator")

self._current = (max if self._step > 0 else min)(self._current, new_id)
# `StreamIdGenerator` should only be used when there is a single writer,
# so replication should never happen.
raise Exception("Replication is not supported by StreamIdGenerator")

def get_next(self) -> AsyncContextManager[int]:
with self._lock:
Expand Down Expand Up @@ -253,9 +249,6 @@ def manager() -> Generator[Sequence[int], None, None]:
return _AsyncCtxManagerWrapper(manager())

def get_current_token(self) -> int:
if self._is_writer:
return self._current

with self._lock:
if self._unfinished_ids:
return next(iter(self._unfinished_ids)) - self._step
Expand Down