From 931b9a95b19bdbe24a76ca72255100d294f553d9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 22 Feb 2021 17:38:06 +0000 Subject: [PATCH 1/5] Split ShardedWorkerHandlingConfig This is so that we have a type level understanding of when it is safe to call `get_instance(..)` (as opposed to `should_handle(..)`). --- synapse/config/_base.py | 16 ++++++++++++++++ synapse/config/_base.pyi | 2 ++ synapse/config/workers.py | 11 +++++++++-- 3 files changed, 27 insertions(+), 2 deletions(-) diff --git a/synapse/config/_base.py b/synapse/config/_base.py index e89decda3480..2097378cb682 100644 --- a/synapse/config/_base.py +++ b/synapse/config/_base.py @@ -876,4 +876,20 @@ def get_instance(self, key: str) -> str: return self.instances[remainder] +@attr.s +class RoutableShardedWorkerHandlingConfig(ShardedWorkerHandlingConfig): + """A version of `ShardedWorkerHandlingConfig` that is used for config + options where all instances know which instances are responsible for the + sharded work. + """ + + def __attrs_post_init__(self): + # We require that `self.instances` is non-empty. + assert self.instances + + def get_instance(self, key: str) -> str: + """Get the instance responsible for handling the given key.""" + return self._get_instance(key) + + __all__ = ["Config", "RootConfig", "ShardedWorkerHandlingConfig"] diff --git a/synapse/config/_base.pyi b/synapse/config/_base.pyi index 70025b5d6075..db16c86f5084 100644 --- a/synapse/config/_base.pyi +++ b/synapse/config/_base.pyi @@ -149,4 +149,6 @@ class ShardedWorkerHandlingConfig: instances: List[str] def __init__(self, instances: List[str]) -> None: ... def should_handle(self, instance_name: str, key: str) -> bool: ... + +class RoutableShardedWorkerHandlingConfig(ShardedWorkerHandlingConfig): def get_instance(self, key: str) -> str: ... diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 7a0ca16da8b7..81a813f957ea 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -17,7 +17,12 @@ import attr -from ._base import Config, ConfigError, ShardedWorkerHandlingConfig +from ._base import ( + Config, + ConfigError, + RoutableShardedWorkerHandlingConfig, + ShardedWorkerHandlingConfig, +) from .server import ListenerConfig, parse_listener_def @@ -164,7 +169,9 @@ def read_config(self, config, **kwargs): "Must only specify one instance to handle `receipts` messages." ) - self.events_shard_config = ShardedWorkerHandlingConfig(self.writers.events) + self.events_shard_config = RoutableShardedWorkerHandlingConfig( + self.writers.events + ) # Whether this worker should run background tasks or not. # From 6b93ad31617dda4756aa6f148980654246108111 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 22 Feb 2021 15:42:51 +0000 Subject: [PATCH 2/5] Remove special cases in ShardedWorkerHandlingConfig. `ShardedWorkerHandlingConfig` tried to handle the various different ways it was possible to configure federation senders and pushers. This led to special cases that weren't hit during testing. To fix this the handling of the different cases is moved from there and `generic_worker` into the worker config class. This allows us to have the logic in one place and allows the rest of the code to ignore the different cases. --- synapse/app/admin_cmd.py | 2 + synapse/app/generic_worker.py | 32 -------- synapse/config/_base.py | 16 ++-- synapse/config/push.py | 5 +- synapse/config/server.py | 1 - synapse/config/workers.py | 79 +++++++++++++++++-- synapse/push/pusherpool.py | 4 +- synapse/server.py | 7 +- .../tcp/streams/test_federation.py | 2 +- tests/replication/test_federation_ack.py | 2 +- .../test_federation_sender_shard.py | 2 +- tests/replication/test_pusher_shard.py | 2 +- 12 files changed, 94 insertions(+), 60 deletions(-) diff --git a/synapse/app/admin_cmd.py b/synapse/app/admin_cmd.py index b4bd4d8e7afb..9f99651aa219 100644 --- a/synapse/app/admin_cmd.py +++ b/synapse/app/admin_cmd.py @@ -210,7 +210,9 @@ def start(config_options): config.update_user_directory = False config.run_background_tasks = False config.start_pushers = False + config.pusher_shard_config.instances = [] config.send_federation = False + config.federation_shard_config.instances = [] synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 6ed405e66b1f..dc0d3eb72503 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -919,22 +919,6 @@ def start(config_options): # For other worker types we force this to off. config.appservice.notify_appservices = False - if config.worker_app == "synapse.app.pusher": - if config.server.start_pushers: - sys.stderr.write( - "\nThe pushers must be disabled in the main synapse process" - "\nbefore they can be run in a separate worker." - "\nPlease add ``start_pushers: false`` to the main config" - "\n" - ) - sys.exit(1) - - # Force the pushers to start since they will be disabled in the main config - config.server.start_pushers = True - else: - # For other worker types we force this to off. - config.server.start_pushers = False - if config.worker_app == "synapse.app.user_dir": if config.server.update_user_directory: sys.stderr.write( @@ -951,22 +935,6 @@ def start(config_options): # For other worker types we force this to off. config.server.update_user_directory = False - if config.worker_app == "synapse.app.federation_sender": - if config.worker.send_federation: - sys.stderr.write( - "\nThe send_federation must be disabled in the main synapse process" - "\nbefore they can be run in a separate worker." - "\nPlease add ``send_federation: false`` to the main config" - "\n" - ) - sys.exit(1) - - # Force the pushers to start since they will be disabled in the main config - config.worker.send_federation = True - else: - # For other worker types we force this to off. - config.worker.send_federation = False - synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts hs = GenericWorkerServer( diff --git a/synapse/config/_base.py b/synapse/config/_base.py index 2097378cb682..365187bbf129 100644 --- a/synapse/config/_base.py +++ b/synapse/config/_base.py @@ -845,21 +845,21 @@ class ShardedWorkerHandlingConfig: def should_handle(self, instance_name: str, key: str) -> bool: """Whether this instance is responsible for handling the given key.""" # If multiple instances are not defined we always return true - if not self.instances or len(self.instances) == 1: - return True + if not self.instances: + return False - return self.get_instance(key) == instance_name + return self._get_instance(key) == instance_name - def get_instance(self, key: str) -> str: + def _get_instance(self, key: str) -> str: """Get the instance responsible for handling the given key. - Note: For things like federation sending the config for which instance - is sending is known only to the sender instance if there is only one. - Therefore `should_handle` should be used where possible. + Note: For federation sending and pushers the config for which instance + is sending is known only to the sender instance, so we don't expose this + method by default. """ if not self.instances: - return "master" + raise Exception("Unknown worker") if len(self.instances) == 1: return self.instances[0] diff --git a/synapse/config/push.py b/synapse/config/push.py index 3adbfb73e6d3..7831a2ef7921 100644 --- a/synapse/config/push.py +++ b/synapse/config/push.py @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import Config, ShardedWorkerHandlingConfig +from ._base import Config class PushConfig(Config): @@ -27,9 +27,6 @@ def read_config(self, config, **kwargs): "group_unread_count_by_room", True ) - pusher_instances = config.get("pusher_instances") or [] - self.pusher_shard_config = ShardedWorkerHandlingConfig(pusher_instances) - # There was a a 'redact_content' setting but mistakenly read from the # 'email'section'. Check for the flag in the 'push' section, and log, # but do not honour it to avoid nasty surprises when people upgrade. diff --git a/synapse/config/server.py b/synapse/config/server.py index 0bfd4398e225..2afca36e7d16 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -397,7 +397,6 @@ def read_config(self, config, **kwargs): if self.public_baseurl is not None: if self.public_baseurl[-1] != "/": self.public_baseurl += "/" - self.start_pushers = config.get("start_pushers", True) # (undocumented) option for torturing the worker-mode replication a bit, # for testing. The value defines the number of milliseconds to pause before diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 81a813f957ea..e4c0f349b4a2 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -25,6 +25,20 @@ ) from .server import ListenerConfig, parse_listener_def +_FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR = """ +The send_federation must be disabled in the main synapse process +before they can be run in a separate worker. + +Please add ``send_federation: false`` to the main config +""" + +_PUSHER_WITH_START_PUSHERS_ENABLED_ERROR = """ +The start_pushers must be disabled in the main synapse process +before they can be run in a separate worker. + +Please add ``start_pushers: false`` to the main config +""" + def _instance_to_list_converter(obj: Union[str, List[str]]) -> List[str]: """Helper for allowing parsing a string or list of strings to a config @@ -108,6 +122,7 @@ def read_config(self, config, **kwargs): self.worker_replication_secret = config.get("worker_replication_secret", None) self.worker_name = config.get("worker_name", self.worker_app) + self.instance_name = self.worker_name or "master" self.worker_main_http_uri = config.get("worker_main_http_uri", None) @@ -123,12 +138,41 @@ def read_config(self, config, **kwargs): ) ) - # Whether to send federation traffic out in this process. This only - # applies to some federation traffic, and so shouldn't be used to - # "disable" federation - self.send_federation = config.get("send_federation", True) + # Handle federation sender configuration. + # + # There are two ways of configuring which instances handle federation + # sending: + # 1. The old way where "send_federation" is set to false and running a + # `synapse.app.federation_sender` worker app. + # 2. Specifying the workers sending federation in + # `federation_sender_instances`. + # + + send_federation = config.get("send_federation", True) - federation_sender_instances = config.get("federation_sender_instances") or [] + federation_sender_instances = config.get("federation_sender_instances") + if federation_sender_instances is None: + # Default to an empty list, which means "another, unknown, worker is + # responsible for it". + federation_sender_instances = [] + + # If no federation sender instances are set we check if + # `send_federation` is set, which means use master + if send_federation: + federation_sender_instances = ["master"] + + if self.worker_app == "synapse.app.federation_sender": + if send_federation: + # If we're running federation senders, and not using + # `federation_sender_instances`, then we should have + # explicitly set `send_federation` to false. + raise ConfigError( + _FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR + ) + + federation_sender_instances = [self.worker_name] + + self.send_federation = self.instance_name in federation_sender_instances self.federation_shard_config = ShardedWorkerHandlingConfig( federation_sender_instances ) @@ -173,6 +217,31 @@ def read_config(self, config, **kwargs): self.writers.events ) + # Handle sharded push + start_pushers = config.get("start_pushers", True) + pusher_instances = config.get("pusher_instances") + if pusher_instances is None: + # Default to an empty list, which means "another, unknown, worker is + # responsible for it". + pusher_instances = [] + + # If no pushers instances are set we check if `start_pushers` is + # set, which means use master + if start_pushers: + pusher_instances = ["master"] + + if self.worker_app == "synapse.app.pusher": + if start_pushers: + # If we're running pushers, and not using + # `pusher_instances`, then we should have explicitly set + # `start_pushers` to false. + raise ConfigError(_PUSHER_WITH_START_PUSHERS_ENABLED_ERROR) + + pusher_instances = [self.instance_name] + + self.start_pushers = self.instance_name in pusher_instances + self.pusher_shard_config = ShardedWorkerHandlingConfig(pusher_instances) + # Whether this worker should run background tasks or not. # # As a note for developers, the background tasks guarded by this should diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 3936bf878479..21f14f05f0a5 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -59,7 +59,6 @@ class PusherPool: def __init__(self, hs: "HomeServer"): self.hs = hs self.pusher_factory = PusherFactory(hs) - self._should_start_pushers = hs.config.start_pushers self.store = self.hs.get_datastore() self.clock = self.hs.get_clock() @@ -68,6 +67,9 @@ def __init__(self, hs: "HomeServer"): # We shard the handling of push notifications by user ID. self._pusher_shard_config = hs.config.push.pusher_shard_config self._instance_name = hs.get_instance_name() + self._should_start_pushers = ( + self._instance_name in self._pusher_shard_config.instances + ) # We can only delete pushers on master. self._remove_pusher_client = None diff --git a/synapse/server.py b/synapse/server.py index 5de87820002b..4b9ec7f0ae98 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -248,7 +248,7 @@ def __init__( self.start_time = None # type: Optional[int] self._instance_id = random_string(5) - self._instance_name = config.worker_name or "master" + self._instance_name = config.worker.instance_name self.version_string = version_string @@ -760,7 +760,4 @@ def get_outbound_redis_connection(self) -> Optional["RedisProtocol"]: def should_send_federation(self) -> bool: "Should this server be sending federation traffic directly?" - return self.config.send_federation and ( - not self.config.worker_app - or self.config.worker_app == "synapse.app.federation_sender" - ) + return self.config.send_federation diff --git a/tests/replication/tcp/streams/test_federation.py b/tests/replication/tcp/streams/test_federation.py index 2babea4e3e60..aa4bf1c7e3b0 100644 --- a/tests/replication/tcp/streams/test_federation.py +++ b/tests/replication/tcp/streams/test_federation.py @@ -24,7 +24,7 @@ def _get_worker_hs_config(self) -> dict: # enable federation sending on the worker config = super()._get_worker_hs_config() # TODO: make it so we don't need both of these - config["send_federation"] = True + config["send_federation"] = False config["worker_app"] = "synapse.app.federation_sender" return config diff --git a/tests/replication/test_federation_ack.py b/tests/replication/test_federation_ack.py index 18536675588e..f235f1bd83e2 100644 --- a/tests/replication/test_federation_ack.py +++ b/tests/replication/test_federation_ack.py @@ -27,7 +27,7 @@ class FederationAckTestCase(HomeserverTestCase): def default_config(self) -> dict: config = super().default_config() config["worker_app"] = "synapse.app.federation_sender" - config["send_federation"] = True + config["send_federation"] = False return config def make_homeserver(self, reactor, clock): diff --git a/tests/replication/test_federation_sender_shard.py b/tests/replication/test_federation_sender_shard.py index fffdb742c8ea..2f2d117858f0 100644 --- a/tests/replication/test_federation_sender_shard.py +++ b/tests/replication/test_federation_sender_shard.py @@ -49,7 +49,7 @@ def test_send_event_single_sender(self): self.make_worker_hs( "synapse.app.federation_sender", - {"send_federation": True}, + {"send_federation": False}, federation_http_client=mock_client, ) diff --git a/tests/replication/test_pusher_shard.py b/tests/replication/test_pusher_shard.py index f118fe32af60..ab2988a6ba47 100644 --- a/tests/replication/test_pusher_shard.py +++ b/tests/replication/test_pusher_shard.py @@ -95,7 +95,7 @@ def test_send_push_single_worker(self): self.make_worker_hs( "synapse.app.pusher", - {"start_pushers": True}, + {"start_pushers": False}, proxied_blacklisted_http_client=http_client_mock, ) From 35b841649324c15c186229942b8934f315b3b146 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 22 Feb 2021 18:14:30 +0000 Subject: [PATCH 3/5] Newsfile --- changelog.d/9466.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/9466.bugfix diff --git a/changelog.d/9466.bugfix b/changelog.d/9466.bugfix new file mode 100644 index 000000000000..2ab4f315c11f --- /dev/null +++ b/changelog.d/9466.bugfix @@ -0,0 +1 @@ +Fix deleting pushers when using sharded pushers. From 13af8a2d608c230b4a8c04758fc0350c861b1ca6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 23 Feb 2021 16:13:24 +0000 Subject: [PATCH 4/5] Check 'self.writers.events' is non-empty, and raise more helpful error in RoutableShardedWorkerHandlingConfig --- synapse/config/_base.py | 3 ++- synapse/config/workers.py | 3 +++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/synapse/config/_base.py b/synapse/config/_base.py index 365187bbf129..c608b2d34407 100644 --- a/synapse/config/_base.py +++ b/synapse/config/_base.py @@ -885,7 +885,8 @@ class RoutableShardedWorkerHandlingConfig(ShardedWorkerHandlingConfig): def __attrs_post_init__(self): # We require that `self.instances` is non-empty. - assert self.instances + if not self.instances: + raise Exception("Got empty list of instances for shard config") def get_instance(self, key: str) -> str: """Get the instance responsible for handling the given key.""" diff --git a/synapse/config/workers.py b/synapse/config/workers.py index e4c0f349b4a2..35a0c5e0053c 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -213,6 +213,9 @@ def read_config(self, config, **kwargs): "Must only specify one instance to handle `receipts` messages." ) + if len(self.writers.events) == 0: + raise ConfigError("Must specify at least one instance to handle `events`.") + self.events_shard_config = RoutableShardedWorkerHandlingConfig( self.writers.events ) From 173c79e32d9d361c38012618443c1d17adf01871 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 24 Feb 2021 10:40:08 +0000 Subject: [PATCH 5/5] Fix up comments/errors --- synapse/config/_base.py | 3 ++- synapse/config/workers.py | 8 ++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/synapse/config/_base.py b/synapse/config/_base.py index c608b2d34407..402696671193 100644 --- a/synapse/config/_base.py +++ b/synapse/config/_base.py @@ -844,7 +844,8 @@ class ShardedWorkerHandlingConfig: def should_handle(self, instance_name: str, key: str) -> bool: """Whether this instance is responsible for handling the given key.""" - # If multiple instances are not defined we always return true + # If no instances are defined we assume some other worker is handling + # this. if not self.instances: return False diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 35a0c5e0053c..ac92375a85e6 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -26,15 +26,15 @@ from .server import ListenerConfig, parse_listener_def _FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR = """ -The send_federation must be disabled in the main synapse process -before they can be run in a separate worker. +The send_federation config option must be disabled in the main +synapse process before they can be run in a separate worker. Please add ``send_federation: false`` to the main config """ _PUSHER_WITH_START_PUSHERS_ENABLED_ERROR = """ -The start_pushers must be disabled in the main synapse process -before they can be run in a separate worker. +The start_pushers config option must be disabled in the main +synapse process before they can be run in a separate worker. Please add ``start_pushers: false`` to the main config """