Skip to content

Commit

Permalink
Added docstrings
Browse files Browse the repository at this point in the history
  • Loading branch information
lionellloh committed Sep 30, 2020
1 parent 01058df commit 5d8f9f5
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 5 deletions.
38 changes: 37 additions & 1 deletion channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,48 @@
from typing import List
from collections import namedtuple

"""
Callback Listener is a tuple with `event` and `callback`
"""
CallbackListener = namedtuple("CallbackListener", "event callback")


class Channel:
def __init__(self, socket, topic: str, params: dict):
"""
`Channel` is an abstraction for a topic listener for an existing socket connection.
Each Channel has its own topic and a list of event-callbacks that responds to messages.
Should only be instantiated through `connection.Socket().set_chanel(topic)`
Topic-Channel has a 1-many relationship.
"""

def __init__(self, socket, topic: str, params: dict = {}):
"""
:param socket: Socket object
:param topic: Topic that it subscribes to on the realtime server
:param params:
"""
self.socket = socket
self.topic: str = topic
self.params: dict = params
self.listeners: List[CallbackListener] = []
self.joined: bool = False

def join(self):
"""
Wrapper for async def _join() to expose a non-async interface
Essentially gets the only event loop and attempt joining a topic
:return: None
"""
loop = asyncio.get_event_loop()
loop.run_until_complete(self._join())
return self

async def _join(self):
"""
Coroutine that attempts to join Phoenix Realtime server via a certain topic
:return: Channel.channel
"""
join_req = dict(topic=self.topic, event="phx_join", payload={}, ref=None)

try:
Expand All @@ -30,10 +55,21 @@ async def _join(self):
return

def on(self, event: str, callback):
"""
:param event: A specific event will have a specific callback
:param callback: Callback that takes msg payload as its first argument
:return: NOne
"""

# TODO: Should I return self so that I can allow chaining?
cl = CallbackListener(event=event, callback=callback)
self.listeners.append(cl)

def off(self, event: str):
"""
:param event: Stop responding to a certain event
:return: None
"""
self.listeners = [callback for callback in self.listeners if callback.event != event]
37 changes: 33 additions & 4 deletions connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@
class Socket:

def __init__(self, url: str, params: dict = {}, hb_interval: int = 5):
"""
`Socket` is the abstraction for an actual socket connection that receives and 'reroutes' `Message` according to its `topic` and `event`.
Socket-Channel has a 1-many relationship.
Socket-Topic has a 1-many relationship.
:param url: Websocket URL of the Realtime server. starts with `ws://` or `wss://`
:param params: Optional parameters for connection.
:param hb_interval: WS connection is kept alive by sending a heartbeat message. Optional, defaults to 5.
"""
self.url = url
self.channels = defaultdict(list)
self.connected = False
Expand All @@ -18,18 +26,25 @@ def __init__(self, url: str, params: dict = {}, hb_interval: int = 5):
self.kept_alive: bool = False

def listen(self):

"""
Wrapper for async def _listen() to expose a non-async interface
In most cases, this should be the last method executed as it starts an infinite listening loop.
:return: None
"""
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(self._listen(), self._keep_alive()))

async def _listen(self):
"""
An infinite loop that keeps listening.
:return: None
"""
while True:
try:
msg = await self.ws_connection.recv()
msg = Message(**json.loads(msg))
if msg.event == ChannelEvents.reply:
continue
# TODO: use a named tuple?
for channel in self.channels.get(msg.topic, []):
for cl in channel.listeners:
if cl.event == msg.event:
Expand All @@ -40,10 +55,15 @@ async def _listen(self):
break

def connect(self):
"""
Wrapper for async def _connect() to expose a non-async interface
"""
loop = asyncio.get_event_loop()
loop.run_until_complete(self._connect())
self.connected = True

async def _connect(self):

ws_connection = await websockets.connect(self.url)
if ws_connection.open:
# TODO: Include a logger to indicate successful connection
Expand All @@ -69,14 +89,23 @@ async def _keep_alive(self):
break

def set_channel(self, topic: str):
"""
:param topic: Initializes a channel and creates a two-way association with the socket
:return: None
"""
assert self.connected, "Socket must be connected before trying to set a channel!"

chan = Channel(self, topic, self.params)
self.channels[topic].append(chan)

return chan

# TODO: Implement this to show summary to subscriptions

def summary(self):
# print a summary of subscriptions from the socket
"""
Prints a list of topics and event the socket is listening to
:return: None
"""
for topic, chans in self.channels.items():
for chan in chans:
print(f"Topic: {topic} | Events: {[e for e, _ in chan.callbacks]}]")
7 changes: 7 additions & 0 deletions messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@

@dataclass
class Message:
"""
Dataclass abstraction for message
"""
event: str
payload: dict
ref: any
Expand All @@ -13,6 +16,10 @@ def __hash__(self):


class ChannelEvents(str, Enum):
"""
ChannelEvents are a bunch of constant strings that are defined according to
what the Phoenix realtime server expects.
"""
close = "phx_close"
error = "phx_error"
join = "phx_join"
Expand Down

0 comments on commit 5d8f9f5

Please sign in to comment.