Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add groups to sync stream #2378

Merged
merged 5 commits into from
Jul 21, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions synapse/app/synchrotron.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.groups import SlavedGroupServerStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
Expand Down Expand Up @@ -75,6 +76,7 @@ class SynchrotronSlavedStore(
SlavedRegistrationStore,
SlavedFilteringStore,
SlavedPresenceStore,
SlavedGroupServerStore,
SlavedDeviceInboxStore,
SlavedDeviceStore,
SlavedClientIpStore,
Expand Down Expand Up @@ -409,6 +411,10 @@ def process_and_notify(self, stream_name, token, rows):
)
elif stream_name == "presence":
yield self.presence_handler.process_replication_rows(token, rows)
elif stream_name == "receipts":
self.notifier.on_new_event(
"groups_key", token, users=[row.user_id for row in rows],
)


def start(config_options):
Expand Down
21 changes: 17 additions & 4 deletions synapse/handlers/groups_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def __init__(self, hs):
self.is_mine_id = hs.is_mine_id
self.signing_key = hs.config.signing_key[0]
self.server_name = hs.hostname
self.notifier = hs.get_notifier()
self.attestations = hs.get_groups_attestation_signing()

# Ensure attestations get renewed
Expand Down Expand Up @@ -211,13 +212,16 @@ def accept_invite(self, group_id, user_id, content):
user_id=user_id,
)

yield self.store.register_user_group_membership(
token = yield self.store.register_user_group_membership(
group_id, user_id,
membership="join",
is_admin=False,
local_attestation=local_attestation,
remote_attestation=remote_attestation,
)
self.notifier.on_new_event(
"groups_key", token, users=[user_id],
)

defer.returnValue({})

Expand Down Expand Up @@ -257,11 +261,14 @@ def on_invite(self, group_id, user_id, content):
if "avatar_url" in content["profile"]:
local_profile["avatar_url"] = content["profile"]["avatar_url"]

yield self.store.register_user_group_membership(
token = yield self.store.register_user_group_membership(
group_id, user_id,
membership="invite",
content={"profile": local_profile, "inviter": content["inviter"]},
)
self.notifier.on_new_event(
"groups_key", token, users=[user_id],
)

defer.returnValue({"state": "invite"})

Expand All @@ -270,10 +277,13 @@ def remove_user_from_group(self, group_id, user_id, requester_user_id, content):
"""Remove a user from a group
"""
if user_id == requester_user_id:
yield self.store.register_user_group_membership(
token = yield self.store.register_user_group_membership(
group_id, user_id,
membership="leave",
)
self.notifier.on_new_event(
"groups_key", token, users=[user_id],
)

# TODO: Should probably remember that we tried to leave so that we can
# retry if the group server is currently down.
Expand All @@ -296,10 +306,13 @@ def user_removed_from_group(self, group_id, user_id, content):
"""One of our users was removed/kicked from a group
"""
# TODO: Check if user in group
yield self.store.register_user_group_membership(
token = yield self.store.register_user_group_membership(
group_id, user_id,
membership="leave",
)
self.notifier.on_new_event(
"groups_key", token, users=[user_id],
)

@defer.inlineCallbacks
def get_joined_groups(self, user_id):
Expand Down
64 changes: 63 additions & 1 deletion synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,17 @@ def __nonzero__(self):
return True


class GroupsSyncResult(collections.namedtuple("GroupsSyncResult", [
"join",
"invite",
"leave",
])):
__slots__ = []

def __nonzero__(self):
return bool(self.join or self.invite or self.leave)


class SyncResult(collections.namedtuple("SyncResult", [
"next_batch", # Token for the next sync
"presence", # List of presence events for the user.
Expand All @@ -119,6 +130,7 @@ class SyncResult(collections.namedtuple("SyncResult", [
"device_lists", # List of user_ids whose devices have chanegd
"device_one_time_keys_count", # Dict of algorithm to count for one time keys
# for this device
"groups",
])):
__slots__ = []

Expand All @@ -134,7 +146,8 @@ def __nonzero__(self):
self.archived or
self.account_data or
self.to_device or
self.device_lists
self.device_lists or
self.groups
)


Expand Down Expand Up @@ -560,6 +573,8 @@ def generate_sync_result(self, sync_config, since_token=None, full_state=False):
user_id, device_id
)

yield self._generate_sync_entry_for_groups(sync_result_builder)

defer.returnValue(SyncResult(
presence=sync_result_builder.presence,
account_data=sync_result_builder.account_data,
Expand All @@ -568,10 +583,56 @@ def generate_sync_result(self, sync_config, since_token=None, full_state=False):
archived=sync_result_builder.archived,
to_device=sync_result_builder.to_device,
device_lists=device_lists,
groups=sync_result_builder.groups,
device_one_time_keys_count=one_time_key_counts,
next_batch=sync_result_builder.now_token,
))

@measure_func("_generate_sync_entry_for_groups")
@defer.inlineCallbacks
def _generate_sync_entry_for_groups(self, sync_result_builder):
user_id = sync_result_builder.sync_config.user.to_string()
since_token = sync_result_builder.since_token
now_token = sync_result_builder.now_token

if since_token and since_token.groups_key:
results = yield self.store.get_groups_changes_for_user(
user_id, since_token.groups_key, now_token.groups_key,
)
else:
results = yield self.store.get_all_groups_for_user(
user_id, now_token.groups_key,
)

invited = {}
joined = {}
left = {}
for result in results:
membership = result["membership"]
group_id = result["group_id"]
gtype = result["type"]
content = result["content"]

if membership == "join":
if gtype == "membership":
content.pop("membership", None)
invited[group_id] = content["content"]
else:
joined.setdefault(group_id, {})[gtype] = content
elif membership == "invite":
if gtype == "membership":
content.pop("membership", None)
invited[group_id] = content["content"]
else:
if gtype == "membership":
left[group_id] = content["content"]

sync_result_builder.groups = GroupsSyncResult(
join=joined,
invite=invited,
leave=left,
)

@measure_func("_generate_sync_entry_for_device_list")
@defer.inlineCallbacks
def _generate_sync_entry_for_device_list(self, sync_result_builder):
Expand Down Expand Up @@ -1260,6 +1321,7 @@ def __init__(self, sync_config, full_state, since_token, now_token):
self.invited = []
self.archived = []
self.device = []
self.groups = None


class RoomSyncResultBuilder(object):
Expand Down
54 changes: 54 additions & 0 deletions synapse/replication/slave/storage/groups.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker
from synapse.storage import DataStore
from synapse.util.caches.stream_change_cache import StreamChangeCache


class SlavedGroupServerStore(BaseSlavedStore):
def __init__(self, db_conn, hs):
super(SlavedGroupServerStore, self).__init__(db_conn, hs)

self.hs = hs

self._group_updates_id_gen = SlavedIdTracker(
db_conn, "local_group_updates", "stream_id",
)
self._group_updates_stream_cache = StreamChangeCache(
"_group_updates_stream_cache", self._group_updates_id_gen.get_current_token(),
)

get_groups_changes_for_user = DataStore.get_groups_changes_for_user.__func__
get_group_stream_token = DataStore.get_group_stream_token.__func__
get_all_groups_for_user = DataStore.get_all_groups_for_user.__func__

def stream_positions(self):
result = super(SlavedGroupServerStore, self).stream_positions()
result["groups"] = self._group_updates_id_gen.get_current_token()
return result

def process_replication_rows(self, stream_name, token, rows):
if stream_name == "groups":
self._group_updates_id_gen.advance(token)
for row in rows:
self._group_updates_stream_cache.entity_has_changed(
row.user_id, token
)

return super(SlavedGroupServerStore, self).process_replication_rows(
stream_name, token, rows
)
20 changes: 20 additions & 0 deletions synapse/replication/tcp/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@
"state_key", # str
"event_id", # str, optional
))
GroupsStreamRow = namedtuple("GroupsStreamRow", (
"group_id", # str
"user_id", # str
"type", # str
"content", # dict
))


class Stream(object):
Expand Down Expand Up @@ -464,6 +470,19 @@ def __init__(self, hs):
super(CurrentStateDeltaStream, self).__init__(hs)


class GroupServerStream(Stream):
NAME = "groups"
ROW_TYPE = GroupsStreamRow

def __init__(self, hs):
store = hs.get_datastore()

self.current_token = store.get_group_stream_token
self.update_function = store.get_all_groups_changes

super(GroupServerStream, self).__init__(hs)


STREAMS_MAP = {
stream.NAME: stream
for stream in (
Expand All @@ -482,5 +501,6 @@ def __init__(self, hs):
TagAccountDataStream,
AccountDataStream,
CurrentStateDeltaStream,
GroupServerStream,
)
}
5 changes: 5 additions & 0 deletions synapse/rest/client/v2_alpha/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,11 @@ def encode_response(time_now, sync_result, access_token_id, filter):
"invite": invited,
"leave": archived,
},
"groups": {
"join": sync_result.groups.join,
"invite": sync_result.groups.invite,
"leave": sync_result.groups.leave,
},
"device_one_time_keys_count": sync_result.device_one_time_keys_count,
"next_batch": sync_result.next_batch.to_string(),
}
Expand Down
15 changes: 15 additions & 0 deletions synapse/storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ def __init__(self, db_conn, hs):
db_conn, "pushers", "id",
extra_tables=[("deleted_pushers", "stream_id")],
)
self._group_updates_id_gen = StreamIdGenerator(
db_conn, "local_group_updates", "stream_id",
)

if isinstance(self.database_engine, PostgresEngine):
self._cache_id_gen = StreamIdGenerator(
Expand Down Expand Up @@ -236,6 +239,18 @@ def __init__(self, db_conn, hs):
prefilled_cache=curr_state_delta_prefill,
)

_group_updates_prefill, min_group_updates_id = self._get_cache_dict(
db_conn, "local_group_updates",
entity_column="user_id",
stream_column="stream_id",
max_value=self._group_updates_id_gen.get_current_token(),
limit=1000,
)
self._group_updates_stream_cache = StreamChangeCache(
"_group_updates_stream_cache", min_group_updates_id,
prefilled_cache=_group_updates_prefill,
)

cur = LoggingTransaction(
db_conn.cursor(),
name="_find_stream_orderings_for_times_txn",
Expand Down
Loading