Skip to content

Commit

Permalink
python: move send-side flow control to Channel
Browse files Browse the repository at this point in the history
AsyncChannel and ProtocolChannel each have their own version of some
very-similar looking flow control code.

Let's unify those and move them to the base Channel type before we go
and add yet another copy.  All channels ought to support flow control
anyway.  Rename the constants to BLOCK_SIZE and SEND_WINDOW to make them
more accessible (and meaningful) to subclasses.  In particular,
BLOCK_SIZE is the size of blocks that channel implementations should
aim to send.

Add some docs as well, and implement the 'flow-control' option (it was
missing for ProtocolChannel).

Note: we don't do the same with receive-side flow control because it's
easier to implement (no windowing algorithm — just reply to pings) and
also because the two implementations we have are very different so we
can't really share code.
  • Loading branch information
allisonkarlitskaya committed Jan 31, 2023
1 parent 0d6754f commit 8d980ba
Showing 1 changed file with 60 additions and 48 deletions.
108 changes: 60 additions & 48 deletions src/cockpit/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@
from __future__ import annotations

import asyncio
import logging

from typing import ClassVar, Dict, List, Optional, Sequence, Tuple, Type

from .router import Endpoint, Router, RoutingRule


logger = logging.getLogger(__name__)


class ChannelRoutingRule(RoutingRule):
table: Dict[str, List[Type[Channel]]]

Expand Down Expand Up @@ -80,9 +84,15 @@ def __init__(self, problem, **kwargs):

class Channel(Endpoint):
# Values borrowed from C implementation
CHANNEL_FLOW_PING = 16 * 1024
CHANNEL_FLOW_WINDOW = 2 * 1024 * 1024
BLOCK_SIZE = 16 * 1024
SEND_WINDOW = 2 * 1024 * 1024

# Flow control book-keeping
_send_pings: bool = False
_out_sequence: int = 0
_out_window: int = SEND_WINDOW

# Must be filled in by the channel implementation
payload: ClassVar[str]
restrictions: ClassVar[Sequence[Tuple[str, object]]] = ()

Expand All @@ -95,6 +105,8 @@ def do_control(self, command, message):
# 'message' field for handlers that don't need it.
if command == 'open':
self.channel = message['channel']
if message.get('flow-control'):
self._send_pings = True
self.do_open(message)
elif command == 'ready':
self.do_ready()
Expand Down Expand Up @@ -130,13 +142,10 @@ def do_done(self):
def do_close(self):
pass

def do_pong(self, message):
pass

def do_options(self, message):
raise ChannelError('not-supported', message='This channel does not implement "options"')

# 'reasonable' default, overridden in AsyncChannel for flow control
# 'reasonable' default, overridden in other channels for receive-side flow control
def do_ping(self, message):
self.send_pong(message)

Expand All @@ -161,9 +170,40 @@ def done(self):
def close(self, **kwargs):
self.send_control('close', **kwargs)

def send_data(self, data: bytes) -> None:
def send_data(self, data: bytes) -> bool:
"""Send data and handle book-keeping for flow control.
The flow control is "advisory". The data is sent immediately, even if
it's larger than the window. In general you should try to send packets
which are approximately Channel.BLOCK_SIZE in size.
Returns True if there is still room in the window, or False if you
should stop writing for now. In that case, `.do_resume_send()` will be
called later when there is more room.
"""
self.send_channel_data(self.channel, data)

if self._send_pings:
out_sequence = self._out_sequence + len(data)
if self._out_sequence // Channel.BLOCK_SIZE != out_sequence // Channel.BLOCK_SIZE:
self.send_control(command='ping', sequence=out_sequence)
self._out_sequence = out_sequence

return self._out_sequence < self._out_window

def do_pong(self, message):
if not self._send_pings: # huh?
logger.warning("Got wild pong on channel %s", self.channel)
return

self._out_window = message['sequence'] + Channel.SEND_WINDOW
if self._out_sequence < self._out_window:
self.do_resume_send()

def do_resume_send(self) -> None:
"""Called to indicate that the channel may start sending again."""
pass # change to `raise NotImplementedError` after everyone implements it

def send_message(self, **kwargs):
self.send_channel_message(self.channel, **kwargs)

Expand Down Expand Up @@ -192,10 +232,6 @@ class ProtocolChannel(Channel, asyncio.Protocol):
_send_pongs: bool = True
_last_ping: Optional[Dict[str, object]]

_send_pings: bool = False
_out_sequence: int = 0
_out_window: int = Channel.CHANNEL_FLOW_WINDOW

# read-side EOF handling
_close_on_eof: bool = False
_eof: bool = False
Expand Down Expand Up @@ -235,8 +271,13 @@ def do_done(self) -> None:
self._transport.write_eof()

def data_received(self, data: bytes) -> None:
self.send_data(data)
self._write_flow_control(len(data))
assert self._transport is not None
if not self.send_data(data):
self._transport.pause_reading()

def do_resume_send(self) -> None:
assert self._transport is not None
self._transport.resume_reading()

def close_on_eof(self) -> None:
"""Mark the channel to be closed on EOF.
Expand All @@ -260,21 +301,6 @@ def eof_received(self) -> bool:
self.done()
return not self._close_on_eof

# Channel send-side flow control
def _write_flow_control(self, n_bytes):
out_sequence = self._out_sequence + n_bytes
if self._out_sequence // Channel.CHANNEL_FLOW_PING != out_sequence // Channel.CHANNEL_FLOW_PING:
self.send_control(command='ping', sequence=out_sequence)
self._out_sequence = out_sequence

if self._out_window <= self._out_sequence:
self._transport.pause_reading()

def do_pong(self, message):
self._out_window = message['sequence'] + Channel.CHANNEL_FLOW_WINDOW
if self._out_sequence < self._out_window:
self._transport.resume_reading()

# Channel receive-side flow control
def do_ping(self, message):
if self._send_pongs:
Expand Down Expand Up @@ -317,9 +343,7 @@ class AsyncChannel(Channel):
# do_data() without blocking, we have no choice.
receive_queue = None

# Send-side flow control: no buffers here, just bookkeeping.
out_sequence = 0
out_window = Channel.CHANNEL_FLOW_WINDOW
# Send-side flow control
write_waiter = None

async def run(self, options):
Expand All @@ -338,29 +362,17 @@ async def read(self):
return item

async def write(self, data):
if self.flow_control:
assert len(data) <= AsyncChannel.CHANNEL_FLOW_WINDOW
if not self.send_data(data):
self.write_waiter = asyncio.get_running_loop().create_future()
await self.write_waiter

out_sequence = self.out_sequence + len(data)
if self.out_sequence // Channel.CHANNEL_FLOW_PING != out_sequence // Channel.CHANNEL_FLOW_PING:
self.send_control(command='ping', sequence=out_sequence)
self.out_sequence = out_sequence

while self.out_window < self.out_sequence:
self.write_waiter = asyncio.get_running_loop().create_future()
await self.write_waiter

self.send_data(data)

def do_pong(self, message):
self.out_window = message['sequence'] + AsyncChannel.CHANNEL_FLOW_WINDOW
if self.out_sequence <= self.out_window and self.write_waiter is not None:
def do_resume_send(self) -> None:
if self.write_waiter is not None:
self.write_waiter.set_result(None)
self.write_waiter = None

def do_open(self, options):
self.receive_queue = asyncio.Queue()
self.flow_control = options.get('flow-control') is True
asyncio.create_task(self.run_wrapper(options), name=f'{self.__class__.__name__}.run_wrapper({options})')

def do_done(self):
Expand Down

0 comments on commit 8d980ba

Please sign in to comment.