Skip to content

Commit

Permalink
Merge branch 'master' into add-ghaction
Browse files Browse the repository at this point in the history
  • Loading branch information
J0 authored Sep 13, 2022
2 parents 428e2bd + d475aea commit 966f986
Show file tree
Hide file tree
Showing 12 changed files with 121 additions and 93 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Python Client Library to interface with the Phoenix Realtime Server

## Installation
```bash
pip3 install realtime_py==0.1.1a0
pip3 install realtime==0.1.1a0
```

## Installation from source
Expand All @@ -18,7 +18,7 @@ python3 usage.py

## Quick Start
```python
from realtime_py.connection import Socket
from realtime.connection import Socket

def callback1(payload):
print("Callback 1: ", payload)
Expand Down Expand Up @@ -47,7 +47,7 @@ if __name__ == "__main__":
Here's how you could connect to your realtime endpoint using Supabase endpoint. Correct as of 5th June 2021. Please replace `SUPABASE_ID` and `API_KEY` with your own `SUPABASE_ID` and `API_KEY`. The variables shown below are fake and they will not work if you try to run the snippet.

```python
from realtime_py.connection import Socket
from realtime.connection import Socket

SUPABASE_ID = "dlzlllxhaakqdmaapvji"
API_KEY = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJyb2xlIjoiYW5vbiIsImlhdCI6MT"
Expand Down
6 changes: 3 additions & 3 deletions docs/realtime/channel.html
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ <h1 class="title">Module <code>realtime.channel</code></h1>
&#34;&#34;&#34;
`Channel` is an abstraction for a topic listener for an existing socket connection.
Each Channel has its own topic and a list of event-callbacks that responds to messages.
Should only be instantiated through `connection.Socket().set_chanel(topic)`
Should only be instantiated through `connection.Socket().set_channel(topic)`
Topic-Channel has a 1-many relationship.
&#34;&#34;&#34;

Expand Down Expand Up @@ -141,7 +141,7 @@ <h3>Instance variables</h3>
<dd>
<div class="desc"><p><code><a title="realtime.channel.Channel" href="#realtime.channel.Channel">Channel</a></code> is an abstraction for a topic listener for an existing socket connection.
Each Channel has its own topic and a list of event-callbacks that responds to messages.
Should only be instantiated through <code>connection.Socket().set_chanel(topic)</code>
Should only be instantiated through <code>connection.Socket().set_channel(topic)</code>
Topic-Channel has a 1-many relationship.</p>
<p>:param socket: Socket object
:param topic: Topic that it subscribes to on the realtime server
Expand All @@ -154,7 +154,7 @@ <h3>Instance variables</h3>
&#34;&#34;&#34;
`Channel` is an abstraction for a topic listener for an existing socket connection.
Each Channel has its own topic and a list of event-callbacks that responds to messages.
Should only be instantiated through `connection.Socket().set_chanel(topic)`
Should only be instantiated through `connection.Socket().set_channel(topic)`
Topic-Channel has a 1-many relationship.
&#34;&#34;&#34;

Expand Down
9 changes: 5 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
[tool.poetry]
name = "realtime-py"
version = "0.1.2"
name = "realtime"
version = "0.0.4"
description = ""
authors = ["None"]
authors = ["Joel"]

[tool.poetry.dependencies]
python = "^3.7"
dataclasses = "^0.6"
websockets = "^10.1"
websockets = "^10.3"
python-dateutil = "^2.8.1"
typing-extensions = "^4.2.0"

[tool.poetry.dev-dependencies]
pytest = "^6.2.5"
Expand Down
7 changes: 7 additions & 0 deletions realtime/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
__version__ = "0.0.4"

from realtime.channel import CallbackListener, Channel
from realtime.connection import Socket
from realtime.exceptions import NotConnectedError
from realtime.message import *
from realtime.transformers import *
55 changes: 31 additions & 24 deletions realtime_py/channel.py → realtime/channel.py
Original file line number Diff line number Diff line change
@@ -1,72 +1,79 @@
from __future__ import annotations

import asyncio
import json
from collections import namedtuple
from typing import List
from typing import Any, List, Dict, TYPE_CHECKING, NamedTuple

from realtime.types import Callback

if TYPE_CHECKING:
from realtime.connection import Socket

"""
Callback Listener is a tuple with `event` and `callback`
"""
CallbackListener = namedtuple("CallbackListener", "event callback")

class CallbackListener(NamedTuple):
"""A tuple with `event` and `callback` """
event: str
callback: Callback


class Channel:
"""
`Channel` is an abstraction for a topic listener for an existing socket connection.
Each Channel has its own topic and a list of event-callbacks that responds to messages.
Should only be instantiated through `connection.Socket().set_chanel(topic)`
Should only be instantiated through `connection.Socket().set_channel(topic)`
Topic-Channel has a 1-many relationship.
"""

def __init__(self, socket, topic: str, params: dict = {}):
def __init__(self, socket: Socket, topic: str, params: Dict[str, Any] = {}) -> None:
"""
:param socket: Socket object
:param topic: Topic that it subscribes to on the realtime server
:param params:
"""
self.socket = socket
self.topic: str = topic
self.params: dict = params
self.params = params
self.topic = topic
self.listeners: List[CallbackListener] = []
self.joined: bool = False
self.joined = False

def join(self):
def join(self) -> Channel:
"""
Wrapper for async def _join() to expose a non-async interface
Essentially gets the only event loop and attempt joining a topic
:return: None
:return: Channel
"""
loop = asyncio.get_event_loop()
loop = asyncio.get_event_loop() # TODO: replace with get_running_loop
loop.run_until_complete(self._join())
return self

async def _join(self):
async def _join(self) -> None:
"""
Coroutine that attempts to join Phoenix Realtime server via a certain topic
:return: Channel.channel
:return: None
"""
join_req = dict(topic=self.topic, event="phx_join", payload={}, ref=None)
join_req = dict(topic=self.topic, event="phx_join",
payload={}, ref=None)

try:
await self.socket.ws_connection.send(json.dumps(join_req))

except Exception as e:
print(str(e))
print(str(e)) # TODO: better error propagation
return

def on(self, event: str, callback):
def on(self, event: str, callback: Callback) -> Channel:
"""
:param event: A specific event will have a specific callback
:param callback: Callback that takes msg payload as its first argument
:return: None
:return: Channel
"""

cl = CallbackListener(event=event, callback=callback)
self.listeners.append(cl)
return self

def off(self, event: str):
def off(self, event: str) -> None:
"""
:param event: Stop responding to a certain event
:return: None
"""
self.listeners = [callback for callback in self.listeners if callback.event != event]
self.listeners = [
callback for callback in self.listeners if callback.event != event]
76 changes: 43 additions & 33 deletions realtime_py/connection.py → realtime/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,35 @@
import logging
from collections import defaultdict
from functools import wraps
from typing import Any, Callable, List, Dict, cast, TypeVar

import websockets
from typing_extensions import ParamSpec

from realtime_py.channel import Channel
from realtime_py.exceptions import NotConnectedError
from realtime_py.message import HEARTBEAT_PAYLOAD, PHOENIX_CHANNEL, ChannelEvents, Message
from realtime.channel import Channel
from realtime.exceptions import NotConnectedError
from realtime.message import HEARTBEAT_PAYLOAD, PHOENIX_CHANNEL, ChannelEvents, Message

logging.basicConfig(format="%(asctime)s:%(levelname)s - %(message)s", level=logging.INFO)

T_Retval = TypeVar("T_Retval")
T_ParamSpec = ParamSpec("T_ParamSpec")

class Socket:
def ensure_connection(func):
@wraps(func)
def wrapper(*args, **kwargs):
if not args[0].connected:
raise NotConnectedError(func.__name__)
logging.basicConfig(
format="%(asctime)s:%(levelname)s - %(message)s", level=logging.INFO)

def ensure_connection(func: Callable[T_ParamSpec, T_Retval]):
@wraps(func)
def wrapper(*args: T_ParamSpec.args, **kwargs: T_ParamSpec.kwargs) -> T_Retval:
if not args[0].connected:
raise NotConnectedError(func.__name__)

return func(*args, **kwargs)

return func(*args, **kwargs)
return wrapper

return wrapper

def __init__(self, url: str, params: dict = {}, hb_interval: int = 5):
class Socket:
def __init__(self, url: str, params: Dict[str, Any] = {}, hb_interval: int = 5) -> None:
"""
`Socket` is the abstraction for an actual socket connection that receives and 'reroutes' `Message` according to its `topic` and `event`.
Socket-Channel has a 1-many relationship.
Expand All @@ -36,22 +43,25 @@ def __init__(self, url: str, params: dict = {}, hb_interval: int = 5):
self.url = url
self.channels = defaultdict(list)
self.connected = False
self.params: dict = params
self.hb_interval: int = hb_interval
self.ws_connection: websockets.client.WebSocketClientProtocol = None
self.kept_alive: bool = False
self.params = params
self.hb_interval = hb_interval
self.ws_connection: websockets.client.WebSocketClientProtocol
self.kept_alive = False

self.channels = cast(defaultdict[str, List[Channel]], self.channels)

@ensure_connection
def listen(self):
def listen(self) -> None:
"""
Wrapper for async def _listen() to expose a non-async interface
In most cases, this should be the last method executed as it starts an infinite listening loop.
:return: None
"""
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(self._listen(), self._keep_alive()))
loop = asyncio.get_event_loop() # TODO: replace with get_running_loop
loop.run_until_complete(asyncio.gather(
self._listen(), self._keep_alive()))

async def _listen(self):
async def _listen(self) -> None:
"""
An infinite loop that keeps listening.
:return: None
Expand All @@ -60,37 +70,37 @@ async def _listen(self):
try:
msg = await self.ws_connection.recv()
msg = Message(**json.loads(msg))

if msg.event == ChannelEvents.reply:
continue

for channel in self.channels.get(msg.topic, []):
for cl in channel.listeners:
if cl.event == msg.event:
cl.callback(msg.payload)

except websockets.exceptions.ConnectionClosed:
logging.exception("Connection closed")
break

def connect(self):
def connect(self) -> None:
"""
Wrapper for async def _connect() to expose a non-async interface
"""
loop = asyncio.get_event_loop()
loop = asyncio.get_event_loop() # TODO: replace with get_running
loop.run_until_complete(self._connect())
self.connected = True

async def _connect(self):

async def _connect(self) -> None:
ws_connection = await websockets.connect(self.url)

if ws_connection.open:
logging.info("Connection was successful")
self.ws_connection = ws_connection
self.connected = True

else:
raise Exception("Connection Failed")

async def _keep_alive(self):
async def _keep_alive(self) -> None:
"""
Sending heartbeat to server every 5 seconds
Ping - pong messages to verify connection is alive
Expand All @@ -110,22 +120,22 @@ async def _keep_alive(self):
break

@ensure_connection
def set_channel(self, topic: str):
def set_channel(self, topic: str) -> Channel:
"""
:param topic: Initializes a channel and creates a two-way association with the socket
:return: None
:return: Channel
"""

chan = Channel(self, topic, self.params)
self.channels[topic].append(chan)

return chan

def summary(self):
def summary(self) -> None:
"""
Prints a list of topics and event the socket is listening to
:return: None
"""
for topic, chans in self.channels.items():
for chan in chans:
print(f"Topic: {topic} | Events: {[e for e, _ in chan.callbacks]}]")
print(
f"Topic: {topic} | Events: {[e for e, _ in chan.callbacks]}]")
File renamed without changes.
6 changes: 3 additions & 3 deletions realtime_py/message.py → realtime/message.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, Any


@dataclass
class Message:
"""
Dataclass abstraction for message
"""

event: str
payload: dict
ref: any
payload: Dict[str, Any]
ref: Any
topic: str

def __hash__(self):
Expand Down
Loading

0 comments on commit 966f986

Please sign in to comment.