Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Remove ChainedIdGenerator. (#8123)
Browse files Browse the repository at this point in the history
It's just a thin wrapper around two ID gens to make `get_current_token`
and `get_next` return tuples. This can easily be replaced by calling the
appropriate methods on the underlying ID gens directly.
  • Loading branch information
erikjohnston authored Aug 19, 2020
1 parent f594e43 commit c9c544c
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 95 deletions.
1 change: 1 addition & 0 deletions changelog.d/8123.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Remove `ChainedIdGenerator`.
10 changes: 4 additions & 6 deletions synapse/replication/slave/storage/push_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
from synapse.replication.tcp.streams import PushRulesStream
from synapse.storage.databases.main.push_rule import PushRulesWorkerStore

from .events import SlavedEventStore


class SlavedPushRuleStore(SlavedEventStore, PushRulesWorkerStore):
def get_push_rules_stream_token(self):
return (
self._push_rules_stream_id_gen.get_current_token(),
self._stream_id_gen.get_current_token(),
)

def get_max_push_rules_stream_id(self):
return self._push_rules_stream_id_gen.get_current_token()

def process_replication_rows(self, stream_name, instance_name, token, rows):
# We assert this for the benefit of mypy
assert isinstance(self._push_rules_stream_id_gen, SlavedIdTracker)

if stream_name == PushRulesStream.NAME:
self._push_rules_stream_id_gen.advance(token)
for row in rows:
Expand Down
2 changes: 1 addition & 1 deletion synapse/replication/tcp/streams/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ def __init__(self, hs):
)

def _current_token(self, instance_name: str) -> int:
push_rules_token, _ = self.store.get_push_rules_stream_token()
push_rules_token = self.store.get_max_push_rules_stream_id()
return push_rules_token


Expand Down
2 changes: 1 addition & 1 deletion synapse/rest/client/v1/push_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ def on_OPTIONS(self, request, path):
return 200, {}

def notify_user(self, user_id):
stream_id, _ = self.store.get_push_rules_stream_token()
stream_id = self.store.get_max_push_rules_stream_id()
self.notifier.on_new_event("push_rules_key", stream_id, users=[user_id])

async def set_rule_attr(self, user_id, spec, val):
Expand Down
36 changes: 17 additions & 19 deletions synapse/storage/databases/main/push_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
from synapse.storage.push_rule import InconsistentRuleException, RuleNotFoundException
from synapse.storage.util.id_generators import ChainedIdGenerator
from synapse.storage.util.id_generators import StreamIdGenerator
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.stream_change_cache import StreamChangeCache
Expand Down Expand Up @@ -82,9 +82,9 @@ def __init__(self, database: DatabasePool, db_conn, hs):
super(PushRulesWorkerStore, self).__init__(database, db_conn, hs)

if hs.config.worker.worker_app is None:
self._push_rules_stream_id_gen = ChainedIdGenerator(
self._stream_id_gen, db_conn, "push_rules_stream", "stream_id"
) # type: Union[ChainedIdGenerator, SlavedIdTracker]
self._push_rules_stream_id_gen = StreamIdGenerator(
db_conn, "push_rules_stream", "stream_id"
) # type: Union[StreamIdGenerator, SlavedIdTracker]
else:
self._push_rules_stream_id_gen = SlavedIdTracker(
db_conn, "push_rules_stream", "stream_id"
Expand Down Expand Up @@ -338,8 +338,9 @@ async def add_push_rule(
) -> None:
conditions_json = json_encoder.encode(conditions)
actions_json = json_encoder.encode(actions)
with self._push_rules_stream_id_gen.get_next() as ids:
stream_id, event_stream_ordering = ids
with self._push_rules_stream_id_gen.get_next() as stream_id:
event_stream_ordering = self._stream_id_gen.get_current_token()

if before or after:
await self.db_pool.runInteraction(
"_add_push_rule_relative_txn",
Expand Down Expand Up @@ -559,8 +560,9 @@ def delete_push_rule_txn(txn, stream_id, event_stream_ordering):
txn, stream_id, event_stream_ordering, user_id, rule_id, op="DELETE"
)

with self._push_rules_stream_id_gen.get_next() as ids:
stream_id, event_stream_ordering = ids
with self._push_rules_stream_id_gen.get_next() as stream_id:
event_stream_ordering = self._stream_id_gen.get_current_token()

await self.db_pool.runInteraction(
"delete_push_rule",
delete_push_rule_txn,
Expand All @@ -569,8 +571,9 @@ def delete_push_rule_txn(txn, stream_id, event_stream_ordering):
)

async def set_push_rule_enabled(self, user_id, rule_id, enabled) -> None:
with self._push_rules_stream_id_gen.get_next() as ids:
stream_id, event_stream_ordering = ids
with self._push_rules_stream_id_gen.get_next() as stream_id:
event_stream_ordering = self._stream_id_gen.get_current_token()

await self.db_pool.runInteraction(
"_set_push_rule_enabled_txn",
self._set_push_rule_enabled_txn,
Expand Down Expand Up @@ -643,8 +646,9 @@ def set_push_rule_actions_txn(txn, stream_id, event_stream_ordering):
data={"actions": actions_json},
)

with self._push_rules_stream_id_gen.get_next() as ids:
stream_id, event_stream_ordering = ids
with self._push_rules_stream_id_gen.get_next() as stream_id:
event_stream_ordering = self._stream_id_gen.get_current_token()

await self.db_pool.runInteraction(
"set_push_rule_actions",
set_push_rule_actions_txn,
Expand Down Expand Up @@ -673,11 +677,5 @@ def _insert_push_rules_update_txn(
self.push_rules_stream_cache.entity_has_changed, user_id, stream_id
)

def get_push_rules_stream_token(self):
"""Get the position of the push rules stream.
Returns a pair of a stream id for the push_rules stream and the
room stream ordering it corresponds to."""
return self._push_rules_stream_id_gen.get_current_token()

def get_max_push_rules_stream_id(self):
return self.get_push_rules_stream_token()[0]
return self._push_rules_stream_id_gen.get_current_token()
68 changes: 1 addition & 67 deletions synapse/storage/util/id_generators.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import contextlib
import threading
from collections import deque
from typing import Dict, Set, Tuple
from typing import Dict, Set

from typing_extensions import Deque

Expand Down Expand Up @@ -167,72 +167,6 @@ def get_current_token_for_writer(self, instance_name: str) -> int:
return self.get_current_token()


class ChainedIdGenerator(object):
"""Used to generate new stream ids where the stream must be kept in sync
with another stream. It generates pairs of IDs, the first element is an
integer ID for this stream, the second element is the ID for the stream
that this stream needs to be kept in sync with."""

def __init__(self, chained_generator, db_conn, table, column):
self.chained_generator = chained_generator
self._table = table
self._lock = threading.Lock()
self._current_max = _load_current_id(db_conn, table, column)
self._unfinished_ids = deque() # type: Deque[Tuple[int, int]]

def get_next(self):
"""
Usage:
with stream_id_gen.get_next() as (stream_id, chained_id):
# ... persist event ...
"""
with self._lock:
self._current_max += 1
next_id = self._current_max
chained_id = self.chained_generator.get_current_token()

self._unfinished_ids.append((next_id, chained_id))

@contextlib.contextmanager
def manager():
try:
yield (next_id, chained_id)
finally:
with self._lock:
self._unfinished_ids.remove((next_id, chained_id))

return manager()

def get_current_token(self):
"""Returns the maximum stream id such that all stream ids less than or
equal to it have been successfully persisted.
"""
with self._lock:
if self._unfinished_ids:
stream_id, chained_id = self._unfinished_ids[0]
return stream_id - 1, chained_id

return self._current_max, self.chained_generator.get_current_token()

def advance(self, token: int):
"""Stub implementation for advancing the token when receiving updates
over replication; raises an exception as this instance should be the
only source of updates.
"""

raise Exception(
"Attempted to advance token on source for table %r", self._table
)

def get_current_token_for_writer(self, instance_name: str) -> Tuple[int, int]:
"""Returns the position of the given writer.
For streams with single writers this is equivalent to
`get_current_token`.
"""
return self.get_current_token()


class MultiWriterIdGenerator:
"""An ID generator that tracks a stream that can have multiple writers.
Expand Down
2 changes: 1 addition & 1 deletion synapse/streams/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def __init__(self, hs):
self.store = hs.get_datastore()

def get_current_token(self) -> StreamToken:
push_rules_key, _ = self.store.get_push_rules_stream_token()
push_rules_key = self.store.get_max_push_rules_stream_id()
to_device_key = self.store.get_to_device_stream_token()
device_list_key = self.store.get_device_stream_token()
groups_key = self.store.get_group_stream_token()
Expand Down

0 comments on commit c9c544c

Please sign in to comment.