Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Allow multiple workers to write to receipts stream. #16432

Merged
merged 15 commits into from
Oct 25, 2023
84 changes: 84 additions & 0 deletions synapse/types/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,90 @@ async def to_string(self, store: "DataStore") -> str:
return "s%d" % (self.stream,)


@attr.s(frozen=True, slots=True, order=False)
class MultiWriterStreamToken(AbstractMultiWriterStreamToken):
"""A basic stream token class for streams that supports multiple writers."""

@classmethod
async def parse(cls, store: "DataStore", string: str) -> "MultiWriterStreamToken":
clokep marked this conversation as resolved.
Show resolved Hide resolved
try:
if string[0].isdigit():
return cls(stream=int(string))
if string[0] == "m":
parts = string[1:].split("~")
stream = int(parts[0])

instance_map = {}
for part in parts[1:]:
key, value = part.split(".")
instance_id = int(key)
pos = int(value)

instance_name = await store.get_name_from_instance_id(instance_id)
instance_map[instance_name] = pos

return cls(
stream=stream,
instance_map=immutabledict(instance_map),
)
except CancelledError:
raise
except Exception:
pass
raise SynapseError(400, "Invalid stream token %r" % (string,))

async def to_string(self, store: "DataStore") -> str:
if self.instance_map:
entries = []
for name, pos in self.instance_map.items():
if pos <= self.stream:
# Ignore instances who are below the minimum stream position
# (we might know they've advanced without seeing a recent
# write from them).
continue

instance_id = await store.get_id_for_instance(name)
entries.append(f"{instance_id}.{pos}")

encoded_map = "~".join(entries)
return f"m{self.stream}~{encoded_map}"
else:
return str(self.stream)

@staticmethod
def is_stream_position_in_range(
low: Optional["AbstractMultiWriterStreamToken"],
high: Optional["AbstractMultiWriterStreamToken"],
instance_name: Optional[str],
pos: int,
) -> bool:
"""Checks if a given persisted position is between the two given tokens.

If `instance_name` is None then the row was persisted before multi
writer support.
"""

if low:
if instance_name:
low_stream = low.instance_map.get(instance_name, low.stream)
else:
low_stream = low.stream

if pos <= low_stream:
return False

if high:
if instance_name:
high_stream = high.instance_map.get(instance_name, high.stream)
else:
high_stream = high.stream

if high_stream < pos:
return False

return True


class StreamKeyType(Enum):
"""Known stream types.

Expand Down