Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Move catchup of replication streams to worker. #7024

Merged
merged 17 commits into from
Mar 25, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 10 additions & 30 deletions docs/tcp_replication.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@ example flow would be (where '>' indicates master to worker and
'<' worker to master flows):

> SERVER example.com
< REPLICATE events 53
< REPLICATE
> POSITION events 53
> RDATA events 54 ["$foo1:bar.com", ...]
> RDATA events 55 ["$foo4:bar.com", ...]

The example shows the server accepting a new connection and sending its
identity with the `SERVER` command, followed by the client asking to
subscribe to the `events` stream from the token `53`. The server then
periodically sends `RDATA` commands which have the format
`RDATA <stream_name> <token> <row>`, where the format of `<row>` is
defined by the individual streams.
The example shows the server accepting a new connection and sending its identity
with the `SERVER` command, followed by the client server to respond with the
position of all streams. The server then periodically sends `RDATA` commands
which have the format `RDATA <stream_name> <token> <row>`, where the format of
`<row>` is defined by the individual streams.

Error reporting happens by either the client or server sending an ERROR
command, and usually the connection will be closed.
Expand All @@ -32,9 +32,6 @@ Since the protocol is a simple line based, its possible to manually
connect to the server using a tool like netcat. A few things should be
noted when manually using the protocol:

- When subscribing to a stream using `REPLICATE`, the special token
`NOW` can be used to get all future updates. The special stream name
`ALL` can be used with `NOW` to subscribe to all available streams.
- The federation stream is only available if federation sending has
been disabled on the main process.
- The server will only time connections out that have sent a `PING`
Expand Down Expand Up @@ -91,9 +88,7 @@ The client:
- Sends a `NAME` command, allowing the server to associate a human
friendly name with the connection. This is optional.
- Sends a `PING` as above
- For each stream the client wishes to subscribe to it sends a
`REPLICATE` with the `stream_name` and token it wants to subscribe
from.
- Sends a `REPLICATE` to get the current position of all streams.
- On receipt of a `SERVER` command, checks that the server name
matches the expected server name.

Expand Down Expand Up @@ -140,9 +135,7 @@ the wire:
> PING 1490197665618
< NAME synapse.app.appservice
< PING 1490197665618
< REPLICATE events 1
< REPLICATE backfill 1
< REPLICATE caches 1
< REPLICATE
> POSITION events 1
> POSITION backfill 1
> POSITION caches 1
Expand Down Expand Up @@ -199,20 +192,7 @@ client (C):

#### REPLICATE (C)

Asks the server to replicate a given stream. The syntax is:
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved

```
REPLICATE <stream_name> <token>
```

Where `<token>` may be either:
* a numeric stream_id to stream updates since (exclusive)
* `NOW` to stream all subsequent updates.

The `<stream_name>` is the name of a replication stream to subscribe
to (see [here](../synapse/replication/tcp/streams/_base.py) for a list
of streams). It can also be `ALL` to subscribe to all known streams,
in which case the `<token>` must be set to `NOW`.
Asks the server for the current position of all streams.

#### USER_SYNC (C)

Expand Down
17 changes: 6 additions & 11 deletions synapse/replication/tcp/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,29 +179,24 @@ class NameCommand(Command):


class ReplicateCommand(Command):
"""Sent by the client to subscribe to the stream.
"""Sent by the client to subscribe to streams.

erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
Format::

REPLICATE <stream_name>

The <stream_name> can be "ALL" to subscribe to all known streams
REPLICATE
"""

NAME = "REPLICATE"

def __init__(self, stream_name):
self.stream_name = stream_name
def __init__(self):
pass

@classmethod
def from_line(cls, line):
return cls(line)
return cls()

def to_line(self):
return self.stream_name

def get_logcontext_id(self):
return "REPLICATE-" + self.stream_name
return ""


class UserSyncCommand(Command):
Expand Down
6 changes: 2 additions & 4 deletions synapse/replication/tcp/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,7 @@
> PING 1490197665618
< NAME synapse.app.appservice
< PING 1490197665618
< REPLICATE events 1
< REPLICATE backfill 1
< REPLICATE caches 1
< REPLICATE
> POSITION events 1
> POSITION backfill 1
> POSITION caches 1
Expand Down Expand Up @@ -662,7 +660,7 @@ def replicate(self):
"""
logger.info("[%s] Subscribing to replication streams", self.id())

self.send_command(ReplicateCommand("ALL"))
self.send_command(ReplicateCommand())

def on_connection_closed(self):
BaseReplicationStreamProtocol.on_connection_closed(self)
Expand Down
4 changes: 2 additions & 2 deletions tests/replication/tcp/streams/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ def replicate(self):
self.streamer.on_notifier_poke()
self.pump(0.1)

def replicate_stream(self, stream, token="NOW"):
def replicate_stream(self):
"""Make the client end a REPLICATE command to set up a subscription to a stream"""
self.client.send_command(ReplicateCommand(stream))
self.client.send_command(ReplicateCommand())


class TestReplicationClientHandler(object):
Expand Down
2 changes: 1 addition & 1 deletion tests/replication/tcp/streams/test_receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def test_receipt(self):
self.reconnect()

# make the client subscribe to the receipts stream
self.replicate_stream("receipts", "NOW")
self.replicate_stream()
self.test_handler.streams.add("receipts")

# tell the master to send a new receipt
Expand Down