Skip to content

Commit

Permalink
Merge worker apps into one. (matrix-org#6964)
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston authored and phil-flex committed Mar 27, 2020
1 parent 660cd73 commit 96b1910
Show file tree
Hide file tree
Showing 16 changed files with 1,052 additions and 2,327 deletions.
1 change: 1 addition & 0 deletions changelog.d/6964.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Merge worker apps together.
156 changes: 3 additions & 153 deletions synapse/app/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,161 +13,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import sys

from twisted.internet import defer, reactor
from twisted.web.resource import NoResource

import synapse
from synapse import events
from synapse.app import _base
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
from synapse.http.site import SynapseSite
from synapse.logging.context import LoggingContext, run_in_background
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.directory import DirectoryStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.server import HomeServer
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string

logger = logging.getLogger("synapse.app.appservice")


class AppserviceSlaveStore(
DirectoryStore,
SlavedEventStore,
SlavedApplicationServiceStore,
SlavedRegistrationStore,
):
pass


class AppserviceServer(HomeServer):
DATASTORE_CLASS = AppserviceSlaveStore

def _listen_http(self, listener_config):
port = listener_config["port"]
bind_addresses = listener_config["bind_addresses"]
site_tag = listener_config.get("tag", port)
resources = {}
for res in listener_config["resources"]:
for name in res["names"]:
if name == "metrics":
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)

root_resource = create_resource_tree(resources, NoResource())

_base.listen_tcp(
bind_addresses,
port,
SynapseSite(
"synapse.access.http.%s" % (site_tag,),
site_tag,
listener_config,
root_resource,
self.version_string,
),
)

logger.info("Synapse appservice now listening on port %d", port)

def start_listening(self, listeners):
for listener in listeners:
if listener["type"] == "http":
self._listen_http(listener)
elif listener["type"] == "manhole":
_base.listen_tcp(
listener["bind_addresses"],
listener["port"],
manhole(
username="matrix", password="rabbithole", globals={"hs": self}
),
)
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warning(
(
"Metrics listener configured, but "
"enable_metrics is not True!"
)
)
else:
_base.listen_metrics(listener["bind_addresses"], listener["port"])
else:
logger.warning("Unrecognized listener type: %s", listener["type"])

self.get_tcp_replication().start_replication(self)

def build_tcp_replication(self):
return ASReplicationHandler(self)

import sys

class ASReplicationHandler(ReplicationClientHandler):
def __init__(self, hs):
super(ASReplicationHandler, self).__init__(hs.get_datastore())
self.appservice_handler = hs.get_application_service_handler()

async def on_rdata(self, stream_name, token, rows):
await super(ASReplicationHandler, self).on_rdata(stream_name, token, rows)

if stream_name == "events":
max_stream_id = self.store.get_room_max_stream_ordering()
run_in_background(self._notify_app_services, max_stream_id)

@defer.inlineCallbacks
def _notify_app_services(self, room_stream_id):
try:
yield self.appservice_handler.notify_interested_services(room_stream_id)
except Exception:
logger.exception("Error notifying application services of event")


def start(config_options):
try:
config = HomeServerConfig.load_config("Synapse appservice", config_options)
except ConfigError as e:
sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)

assert config.worker_app == "synapse.app.appservice"

events.USE_FROZEN_DICTS = config.use_frozen_dicts

if config.notify_appservices:
sys.stderr.write(
"\nThe appservices must be disabled in the main synapse process"
"\nbefore they can be run in a separate worker."
"\nPlease add ``notify_appservices: false`` to the main config"
"\n"
)
sys.exit(1)

# Force the pushers to start since they will be disabled in the main config
config.notify_appservices = True

ps = AppserviceServer(
config.server_name,
config=config,
version_string="Synapse/" + get_version_string(synapse),
)

setup_logging(ps, config, use_worker_options=True)

ps.setup()
reactor.addSystemEventTrigger(
"before", "startup", _base.start, ps, config.worker_listeners
)

_base.start_worker_reactor("synapse-appservice", config)

from synapse.app.generic_worker import start
from synapse.util.logcontext import LoggingContext

if __name__ == "__main__":
with LoggingContext("main"):
Expand Down
190 changes: 3 additions & 187 deletions synapse/app/client_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,195 +13,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import sys

from twisted.internet import reactor
from twisted.web.resource import NoResource

import synapse
from synapse import events
from synapse.app import _base
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
from synapse.http.server import JsonResource
from synapse.http.site import SynapseSite
from synapse.logging.context import LoggingContext
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.slave.storage.directory import DirectoryStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.groups import SlavedGroupServerStore
from synapse.replication.slave.storage.keys import SlavedKeyStore
from synapse.replication.slave.storage.profile import SlavedProfileStore
from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v1.login import LoginRestServlet
from synapse.rest.client.v1.push_rule import PushRuleRestServlet
from synapse.rest.client.v1.room import (
JoinedRoomMemberListRestServlet,
PublicRoomListRestServlet,
RoomEventContextServlet,
RoomMemberListRestServlet,
RoomMessageListRestServlet,
RoomStateRestServlet,
)
from synapse.rest.client.v1.voip import VoipRestServlet
from synapse.rest.client.v2_alpha import groups
from synapse.rest.client.v2_alpha.account import ThreepidRestServlet
from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet
from synapse.rest.client.v2_alpha.register import RegisterRestServlet
from synapse.rest.client.versions import VersionsRestServlet
from synapse.server import HomeServer
from synapse.storage.data_stores.main.monthly_active_users import (
MonthlyActiveUsersWorkerStore,
)
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string

logger = logging.getLogger("synapse.app.client_reader")


class ClientReaderSlavedStore(
SlavedDeviceInboxStore,
SlavedDeviceStore,
SlavedReceiptsStore,
SlavedPushRuleStore,
SlavedGroupServerStore,
SlavedAccountDataStore,
SlavedEventStore,
SlavedKeyStore,
RoomStore,
DirectoryStore,
SlavedApplicationServiceStore,
SlavedRegistrationStore,
SlavedTransactionStore,
SlavedProfileStore,
SlavedClientIpStore,
MonthlyActiveUsersWorkerStore,
BaseSlavedStore,
):
pass


class ClientReaderServer(HomeServer):
DATASTORE_CLASS = ClientReaderSlavedStore

def _listen_http(self, listener_config):
port = listener_config["port"]
bind_addresses = listener_config["bind_addresses"]
site_tag = listener_config.get("tag", port)
resources = {}
for res in listener_config["resources"]:
for name in res["names"]:
if name == "metrics":
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
elif name == "client":
resource = JsonResource(self, canonical_json=False)

PublicRoomListRestServlet(self).register(resource)
RoomMemberListRestServlet(self).register(resource)
JoinedRoomMemberListRestServlet(self).register(resource)
RoomStateRestServlet(self).register(resource)
RoomEventContextServlet(self).register(resource)
RoomMessageListRestServlet(self).register(resource)
RegisterRestServlet(self).register(resource)
LoginRestServlet(self).register(resource)
ThreepidRestServlet(self).register(resource)
KeyQueryServlet(self).register(resource)
KeyChangesServlet(self).register(resource)
VoipRestServlet(self).register(resource)
PushRuleRestServlet(self).register(resource)
VersionsRestServlet(self).register(resource)

groups.register_servlets(self, resource)

resources.update({"/_matrix/client": resource})

root_resource = create_resource_tree(resources, NoResource())

_base.listen_tcp(
bind_addresses,
port,
SynapseSite(
"synapse.access.http.%s" % (site_tag,),
site_tag,
listener_config,
root_resource,
self.version_string,
),
)

logger.info("Synapse client reader now listening on port %d", port)

def start_listening(self, listeners):
for listener in listeners:
if listener["type"] == "http":
self._listen_http(listener)
elif listener["type"] == "manhole":
_base.listen_tcp(
listener["bind_addresses"],
listener["port"],
manhole(
username="matrix", password="rabbithole", globals={"hs": self}
),
)
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warning(
(
"Metrics listener configured, but "
"enable_metrics is not True!"
)
)
else:
_base.listen_metrics(listener["bind_addresses"], listener["port"])
else:
logger.warning("Unrecognized listener type: %s", listener["type"])

self.get_tcp_replication().start_replication(self)

def build_tcp_replication(self):
return ReplicationClientHandler(self.get_datastore())


def start(config_options):
try:
config = HomeServerConfig.load_config("Synapse client reader", config_options)
except ConfigError as e:
sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)

assert config.worker_app == "synapse.app.client_reader"

events.USE_FROZEN_DICTS = config.use_frozen_dicts

ss = ClientReaderServer(
config.server_name,
config=config,
version_string="Synapse/" + get_version_string(synapse),
)

setup_logging(ss, config, use_worker_options=True)

ss.setup()
reactor.addSystemEventTrigger(
"before", "startup", _base.start, ss, config.worker_listeners
)

_base.start_worker_reactor("synapse-client-reader", config)
import sys

from synapse.app.generic_worker import start
from synapse.util.logcontext import LoggingContext

if __name__ == "__main__":
with LoggingContext("main"):
Expand Down
Loading

0 comments on commit 96b1910

Please sign in to comment.