Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Reintroduce #14376, with bugfix for monoliths #14468

Merged
merged 5 commits into from
Nov 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/14376.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Remove old stream ID tracking code. Contributed by Nick @Beeper (@fizzadar).
1 change: 1 addition & 0 deletions changelog.d/14468.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Remove old stream ID tracking code. Contributed by Nick @Beeper (@fizzadar).
3 changes: 3 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ disallow_untyped_defs = True
[mypy-tests.state.test_profile]
disallow_untyped_defs = True

[mypy-tests.storage.test_id_generators]
disallow_untyped_defs = True

[mypy-tests.storage.test_profile]
disallow_untyped_defs = True

Expand Down
13 changes: 0 additions & 13 deletions synapse/replication/slave/__init__.py

This file was deleted.

13 changes: 0 additions & 13 deletions synapse/replication/slave/storage/__init__.py

This file was deleted.

50 changes: 0 additions & 50 deletions synapse/replication/slave/storage/_slaved_id_tracker.py

This file was deleted.

30 changes: 10 additions & 20 deletions synapse/storage/databases/main/account_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
)

from synapse.api.constants import AccountDataTypes
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
from synapse.replication.tcp.streams import AccountDataStream, TagAccountDataStream
from synapse.storage._base import db_to_json
from synapse.storage.database import (
Expand Down Expand Up @@ -68,12 +67,11 @@ def __init__(
# to write account data. A value of `True` implies that `_account_data_id_gen`
# is an `AbstractStreamIdGenerator` and not just a tracker.
self._account_data_id_gen: AbstractStreamIdTracker
self._can_write_to_account_data = (
self._instance_name in hs.config.worker.writers.account_data
)

if isinstance(database.engine, PostgresEngine):
self._can_write_to_account_data = (
self._instance_name in hs.config.worker.writers.account_data
)

self._account_data_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
Expand All @@ -95,21 +93,13 @@ def __init__(
# `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
# updated over replication. (Multiple writers are not supported for
# SQLite).
if self._instance_name in hs.config.worker.writers.account_data:
self._can_write_to_account_data = True
self._account_data_id_gen = StreamIdGenerator(
db_conn,
"room_account_data",
"stream_id",
extra_tables=[("room_tags_revisions", "stream_id")],
)
else:
self._account_data_id_gen = SlavedIdTracker(
db_conn,
"room_account_data",
"stream_id",
extra_tables=[("room_tags_revisions", "stream_id")],
)
self._account_data_id_gen = StreamIdGenerator(
db_conn,
"room_account_data",
"stream_id",
extra_tables=[("room_tags_revisions", "stream_id")],
is_writer=self._instance_name in hs.config.worker.writers.account_data,
)

account_max = self.get_max_account_data_stream_id()
self._account_data_stream_cache = StreamChangeCache(
Expand Down
36 changes: 13 additions & 23 deletions synapse/storage/databases/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
whitelisted_homeserver,
)
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
from synapse.replication.tcp.streams._base import DeviceListsStream, UserSignatureStream
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import (
Expand Down Expand Up @@ -86,28 +85,19 @@ def __init__(
):
super().__init__(database, db_conn, hs)

if hs.config.worker.worker_app is None:
self._device_list_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
db_conn,
"device_lists_stream",
"stream_id",
extra_tables=[
("user_signature_stream", "stream_id"),
("device_lists_outbound_pokes", "stream_id"),
("device_lists_changes_in_room", "stream_id"),
],
)
else:
self._device_list_id_gen = SlavedIdTracker(
db_conn,
"device_lists_stream",
"stream_id",
extra_tables=[
("user_signature_stream", "stream_id"),
("device_lists_outbound_pokes", "stream_id"),
("device_lists_changes_in_room", "stream_id"),
],
)
# In the worker store this is an ID tracker which we overwrite in the non-worker
# class below that is used on the main process.
self._device_list_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
db_conn,
"device_lists_stream",
"stream_id",
extra_tables=[
("user_signature_stream", "stream_id"),
("device_lists_outbound_pokes", "stream_id"),
("device_lists_changes_in_room", "stream_id"),
],
is_writer=hs.config.worker.worker_app is None,
)

# Type-ignore: _device_list_id_gen is mixed in from either DataStore (as a
# StreamIdGenerator) or SlavedDataStore (as a SlavedIdTracker).
Expand Down
35 changes: 14 additions & 21 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
run_as_background_process,
wrap_as_background_process,
)
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
from synapse.replication.tcp.streams import BackfillStream
from synapse.replication.tcp.streams.events import EventsStream
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
Expand Down Expand Up @@ -213,26 +212,20 @@ def __init__(
# `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
# updated over replication. (Multiple writers are not supported for
# SQLite).
if hs.get_instance_name() in hs.config.worker.writers.events:
self._stream_id_gen = StreamIdGenerator(
db_conn,
"events",
"stream_ordering",
)
self._backfill_id_gen = StreamIdGenerator(
db_conn,
"events",
"stream_ordering",
step=-1,
extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
)
else:
self._stream_id_gen = SlavedIdTracker(
db_conn, "events", "stream_ordering"
)
self._backfill_id_gen = SlavedIdTracker(
db_conn, "events", "stream_ordering", step=-1
)
self._stream_id_gen = StreamIdGenerator(
db_conn,
"events",
"stream_ordering",
is_writer=hs.get_instance_name() in hs.config.worker.writers.events,
)
self._backfill_id_gen = StreamIdGenerator(
db_conn,
"events",
"stream_ordering",
step=-1,
extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
is_writer=hs.get_instance_name() in hs.config.worker.writers.events,
)

events_max = self._stream_id_gen.get_current_token()
curr_state_delta_prefill, min_curr_state_delta_id = self.db_pool.get_cache_dict(
Expand Down
17 changes: 8 additions & 9 deletions synapse/storage/databases/main/push_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

from synapse.api.errors import StoreError
from synapse.config.homeserver import ExperimentalConfig
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
from synapse.replication.tcp.streams import PushRulesStream
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import (
Expand Down Expand Up @@ -111,14 +110,14 @@ def __init__(
):
super().__init__(database, db_conn, hs)

if hs.config.worker.worker_app is None:
self._push_rules_stream_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
db_conn, "push_rules_stream", "stream_id"
)
else:
self._push_rules_stream_id_gen = SlavedIdTracker(
db_conn, "push_rules_stream", "stream_id"
)
# In the worker store this is an ID tracker which we overwrite in the non-worker
# class below that is used on the main process.
self._push_rules_stream_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
db_conn,
"push_rules_stream",
"stream_id",
is_writer=hs.config.worker.worker_app is None,
)

push_rules_prefill, push_rules_id = self.db_pool.get_cache_dict(
db_conn,
Expand Down
24 changes: 9 additions & 15 deletions synapse/storage/databases/main/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
)

from synapse.push import PusherConfig, ThrottleParams
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
from synapse.replication.tcp.streams import PushersStream
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import (
Expand Down Expand Up @@ -59,20 +58,15 @@ def __init__(
):
super().__init__(database, db_conn, hs)

if hs.config.worker.worker_app is None:
self._pushers_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
db_conn,
"pushers",
"id",
extra_tables=[("deleted_pushers", "stream_id")],
)
else:
self._pushers_id_gen = SlavedIdTracker(
db_conn,
"pushers",
"id",
extra_tables=[("deleted_pushers", "stream_id")],
)
# In the worker store this is an ID tracker which we overwrite in the non-worker
# class below that is used on the main process.
self._pushers_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
db_conn,
"pushers",
"id",
extra_tables=[("deleted_pushers", "stream_id")],
is_writer=hs.config.worker.worker_app is None,
)

self.db_pool.updates.register_background_update_handler(
"remove_deactivated_pushers",
Expand Down
18 changes: 9 additions & 9 deletions synapse/storage/databases/main/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
)

from synapse.api.constants import EduTypes
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
from synapse.replication.tcp.streams import ReceiptsStream
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import (
Expand Down Expand Up @@ -61,6 +60,9 @@ def __init__(
hs: "HomeServer",
):
self._instance_name = hs.get_instance_name()

# In the worker store this is an ID tracker which we overwrite in the non-worker
# class below that is used on the main process.
self._receipts_id_gen: AbstractStreamIdTracker

if isinstance(database.engine, PostgresEngine):
Expand All @@ -87,14 +89,12 @@ def __init__(
# `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
# updated over replication. (Multiple writers are not supported for
# SQLite).
if hs.get_instance_name() in hs.config.worker.writers.receipts:
self._receipts_id_gen = StreamIdGenerator(
db_conn, "receipts_linearized", "stream_id"
)
else:
self._receipts_id_gen = SlavedIdTracker(
db_conn, "receipts_linearized", "stream_id"
)
self._receipts_id_gen = StreamIdGenerator(
db_conn,
"receipts_linearized",
"stream_id",
is_writer=hs.get_instance_name() in hs.config.worker.writers.receipts,
)

super().__init__(database, db_conn, hs)

Expand Down
13 changes: 10 additions & 3 deletions synapse/storage/util/id_generators.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,13 @@ def __init__(
column: str,
extra_tables: Iterable[Tuple[str, str]] = (),
step: int = 1,
is_writer: bool = True,
) -> None:
assert step != 0
self._lock = threading.Lock()
self._step: int = step
self._current: int = _load_current_id(db_conn, table, column, step)
self._is_writer = is_writer
for table, column in extra_tables:
self._current = (max if step > 0 else min)(
self._current, _load_current_id(db_conn, table, column, step)
Expand All @@ -204,9 +206,11 @@ def __init__(
self._unfinished_ids: OrderedDict[int, int] = OrderedDict()

def advance(self, instance_name: str, new_id: int) -> None:
# `StreamIdGenerator` should only be used when there is a single writer,
# so replication should never happen.
raise Exception("Replication is not supported by StreamIdGenerator")
# Advance should never be called on a writer instance, only over replication
if self._is_writer:
raise Exception("Replication is not supported by writer StreamIdGenerator")

self._current = (max if self._step > 0 else min)(self._current, new_id)

def get_next(self) -> AsyncContextManager[int]:
with self._lock:
Expand Down Expand Up @@ -249,6 +253,9 @@ def manager() -> Generator[Sequence[int], None, None]:
return _AsyncCtxManagerWrapper(manager())

def get_current_token(self) -> int:
if not self._is_writer:
return self._current

with self._lock:
if self._unfinished_ids:
return next(iter(self._unfinished_ids)) - self._step
Expand Down
Loading