Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chg: [serializer] add support for phoenix version 2 serialization #68

Open
wants to merge 37 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
9e23d75
chg: [serializer] add support for phoenix version 2 serialization
gallypette Jul 24, 2023
52ece71
Merge branch 'master' of https://github.com/supabase-community/realti…
gallypette Oct 4, 2023
e5bd333
add: [send] sending to the channel poc
gallypette Oct 4, 2023
93bf498
add: [send] listener on a msg ref
gallypette Oct 4, 2023
4b3215b
add: [leave] leaving a channel
gallypette Oct 5, 2023
ca06e8e
chg: [logging] handling control msg
gallypette Oct 17, 2023
172f9c1
chg: [logging] handling control msg - again
gallypette Oct 17, 2023
ccc9e78
chg: [connection] leave on kb interrupt
gallypette Oct 18, 2023
3e4424d
chg: [async] remove non-async API
gallypette Oct 20, 2023
f92d755
chg: [async] {} -> self.params
gallypette Oct 25, 2023
5009fcf
Fix reconnection
maxbaluev Oct 31, 2023
86d9306
Update connection.py, fix join async
maxbaluev Oct 31, 2023
d010483
Improve error handling
maxbaluev Nov 1, 2023
e9b4d66
fix _handle_reconnection
maxbaluev Nov 1, 2023
e9e44d8
test improvements
Nov 6, 2023
f271851
test improvements
Nov 6, 2023
fcdc3de
test improvements
Nov 6, 2023
1251851
test improvements
Nov 6, 2023
6b32e15
test improvements
Nov 7, 2023
8debb8e
break on terminate
Nov 7, 2023
f227115
fix message
Nov 7, 2023
fdc3440
fix message
Nov 8, 2023
d00acfb
fix message
Nov 8, 2023
001eb5a
fix message
Nov 8, 2023
a6be915
fix message
Nov 8, 2023
52af8ff
fix message
Nov 8, 2023
7a5474d
fix message
Nov 8, 2023
cb1b695
raise exceptions
Nov 10, 2023
6f8d9c8
Test callback improvements (#1)
karvetskiy Nov 11, 2023
4790742
do not raise exceptions on disconnect
Nov 13, 2023
fac7bfe
Merge remote-tracking branch 'origin/master'
Nov 13, 2023
373383a
retry connection to socket
Nov 23, 2023
080419e
chg: [connection] put sync callback in threads
gallypette Feb 8, 2024
7c70dd5
merge: supabase/master 30699c9
gallypette Feb 8, 2024
a1c3b18
chg: [doc] change the readme, adds examples
gallypette Feb 13, 2024
efded7f
chg: [python] bumps python to 3.9 for better functools
gallypette Feb 13, 2024
46725e7
chg: [docs] typos, links to examples
gallypette Feb 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
test improvements
  • Loading branch information
seva committed Nov 6, 2023
commit f27185169c85ad69b9e54c96c4fc9ba1d329e99e
104 changes: 54 additions & 50 deletions realtime/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from realtime.exceptions import NotConnectedError
from realtime.message import HEARTBEAT_PAYLOAD, PHOENIX_CHANNEL, ChannelEvents, Message


T_Retval = TypeVar("T_Retval")
T_ParamSpec = ParamSpec("T_ParamSpec")

Expand All @@ -40,12 +39,12 @@ def wrapper(*args: T_ParamSpec.args, **kwargs: T_ParamSpec.kwargs) -> T_Retval:

class Socket:
def __init__(
self,
url: str,
auto_reconnect: bool = False,
params: Dict[str, Any] = {},
hb_interval: int = 30,
version: int = 2,
self,
url: str,
auto_reconnect: bool = False,
params: Dict[str, Any] = {},
hb_interval: int = 30,
version: int = 2,
) -> None:
"""
`Socket` is the abstraction for an actual socket connection that receives and 'reroutes' `Message` according to its `topic` and `event`.
Expand All @@ -68,6 +67,51 @@ def __init__(

self.channels: DefaultDict[str, List[Channel]] = defaultdict(list)

async def _process_message(self, msg: str):
try:
if self.version == 1:
msg = Message(**json.loads(msg))
elif self.version == 2:
msg_array = json.loads(msg)
msg = Message(
join_ref=msg_array[0],
ref=msg_array[1],
topic=msg_array[2],
event=msg_array[3],
payload=msg_array[4],
)
if msg.event == ChannelEvents.reply:
for channel in self.channels.get(msg.topic, []):
if msg.ref == channel.control_msg_ref:
if msg.payload["status"] == "error":
logging.info(
f"Error joining channel: {msg.topic} - {msg.payload['response']['reason']}"
)
break
elif msg.payload["status"] == "ok":
logging.info(f"Successfully joined {msg.topic}")
continue
else:
for cl in channel.listeners:
if cl.ref in ["*", msg.ref]:
cl.callback(msg.payload)

if msg.event == ChannelEvents.close:
for channel in self.channels.get(msg.topic, []):
if msg.join_ref == channel.join_ref:
logging.info(f"Successfully left {msg.topic}")
continue

for channel in self.channels.get(msg.topic, []):
for cl in channel.listeners:
if cl.event in ["*", msg.event]:
if asyncio.iscoroutinefunction(cl.callback):
await cl.callback(msg.payload)
else:
cl.callback(msg.payload)
except Exception as e:
logging.error(f"Error processing message: {e}", exc_info=True)

@ensure_connection
async def listen(self) -> None:
"""
Expand All @@ -79,47 +123,7 @@ async def listen(self) -> None:
while True:
try:
msg = await self.ws_connection.recv()
if self.version == 1:
msg = Message(**json.loads(msg))
elif self.version == 2:
msg_array = json.loads(msg)
msg = Message(
join_ref=msg_array[0],
ref=msg_array[1],
topic=msg_array[2],
event=msg_array[3],
payload=msg_array[4],
)
if msg.event == ChannelEvents.reply:
for channel in self.channels.get(msg.topic, []):
if msg.ref == channel.control_msg_ref:
if msg.payload["status"] == "error":
logging.info(
f"Error joining channel: {msg.topic} - {msg.payload['response']['reason']}"
)
break
elif msg.payload["status"] == "ok":
logging.info(f"Successfully joined {msg.topic}")
continue
else:
for cl in channel.listeners:
if cl.ref in ["*", msg.ref]:
cl.callback(msg.payload)

if msg.event == ChannelEvents.close:
for channel in self.channels.get(msg.topic, []):
if msg.join_ref == channel.join_ref:
logging.info(f"Successfully left {msg.topic}")
continue

for channel in self.channels.get(msg.topic, []):
for cl in channel.listeners:
if cl.event in ["*", msg.event]:
if asyncio.iscoroutinefunction(cl.callback):
asyncio.create_task(cl.callback(msg.payload))
else:
cl.callback(msg.payload)

asyncio.create_task(self._process_message(msg))

except ConnectionClosedOK:
logging.info("Connection was closed normally.")
Expand All @@ -145,7 +149,7 @@ async def listen(self) -> None:
await self.leave_all()

except (
Exception
Exception
) as e: # A general exception handler should be the last resort
logging.error(f"Unexpected error in listen: {e}")
await self._handle_reconnection()
Expand Down Expand Up @@ -209,7 +213,7 @@ async def keep_alive(self) -> None:
await self._handle_reconnection()

except (
Exception
Exception
) as e: # A general exception handler should be the last resort
logging.error(f"Unexpected error in keep_alive: {e}")

Expand Down