phxsocket

A synchronous Phoenix WebSocket client that uses callbacks

Phoenix channels

Requirements

websockets

Usage

Import the package

import phxsocket

Create socket client

socket = phxsocket.Client("wss://target.url/websocket", {"options": "something"})

Connect and join a channel

if socket.connect(): # blocking, raises exception on failure
  channel = socket.channel("room:roomname", {"more options": "something else"})
  join_success, resp = channel.join()

Alternatively, connect without blocking and join the channel from the on_open callback

def connect_to_channel(socket):
  channel = socket.channel("room:roomname", {"more options": "something else"})
  join_success, resp = channel.join()
  
socket.on_open = connect_to_channel
connection = socket.connect(blocking=False)

connection.wait() # blocking, raises exception on failure

Reconnect on disconnection

socket.on_close = lambda socket: socket.connect()
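
The one-line reconnect above retries immediately and only once per close event. If the server stays down, a retry loop with increasing delays may be gentler. The following is a minimal sketch, not part of phxsocket: `connect_with_backoff` and its parameters are hypothetical names, and it relies only on the behaviour stated above, that a blocking `connect()` raises an exception on failure.

```python
import time


def connect_with_backoff(connect, max_attempts=5, base_delay=1.0,
                         max_delay=30.0, sleep=time.sleep):
    """Call connect() until it succeeds, doubling the delay after each failure.

    `connect` is any zero-argument callable that raises on failure,
    for example the bound method `socket.connect` (assumption: it is
    called in its default blocking mode).
    """
    delay = base_delay
    for attempt in range(1, max_attempts + 1):
        try:
            return connect()
        except Exception:
            # Give up and re-raise once the attempt budget is spent.
            if attempt == max_attempts:
                raise
            sleep(delay)
            delay = min(delay * 2, max_delay)


# Hypothetical wiring, mirroring the lambda shown above:
# socket.on_close = lambda socket: connect_with_backoff(socket.connect)
```

The `sleep` parameter exists only so the delay can be replaced in tests; in normal use the default `time.sleep` applies.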

Subscribe to events

def do_something(payload):
  thing = payload["thing"]

channel.on("eventname", do_something)
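
Event callbacks may run on a thread other than the one that created the socket (an assumption; this README does not say which thread invokes them). One way to keep handlers short is to hand each payload to a `queue.Queue` and process it elsewhere. This is a sketch using only the standard library; `events`, `enqueue_event`, and `next_event` are hypothetical names.

```python
import queue

# Thread-safe buffer between the event callback and the consumer.
events = queue.Queue()


def enqueue_event(payload):
    """Callback for channel.on(): store the payload and return at once."""
    events.put(payload)


def next_event(timeout=None):
    """Return the next payload, or None if nothing arrives within `timeout` seconds."""
    try:
        return events.get(timeout=timeout)
    except queue.Empty:
        return None


# Hypothetical wiring, using the channel.on call shown above:
# channel.on("eventname", enqueue_event)
# payload = next_event(timeout=5.0)
```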

Push data to a channel

channel.push("eventname", {"some": "data"})

Push data and wait for a response

message = channel.push("eventname", {"some": "data"}, reply=True)
response = message.wait_for_response() # blocking
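
Because `wait_for_response()` blocks, a reply that never arrives would stall the caller. This README does not document a timeout argument, so the sketch below bounds the wait from the outside with a daemon thread. `call_with_timeout` is a hypothetical helper, not part of phxsocket, and the worker thread keeps running in the background after a timeout.

```python
import threading


def call_with_timeout(fn, timeout):
    """Run fn() in a daemon thread and return its result.

    Raises TimeoutError if fn() has not finished after `timeout` seconds.
    Exceptions raised by fn() are re-raised in the caller.
    """
    result = {}

    def worker():
        try:
            result["value"] = fn()
        except Exception as exc:
            result["error"] = exc

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    thread.join(timeout)

    if thread.is_alive():
        raise TimeoutError(f"no result after {timeout} seconds")
    if "error" in result:
        raise result["error"]
    return result["value"]


# Hypothetical usage with the message object from the example above:
# response = call_with_timeout(message.wait_for_response, 5.0)
```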

Push data and react to the response with a callback

def respond(payload):
  print(payload)

channel.push("eventname", {"some": "data"}, respond)

Leave a channel

channel.leave()

Disconnect

socket.close()
