Skip to content

Commit

Permalink
logzeromq: Create sockets in threads that use them.
Browse files Browse the repository at this point in the history
With sockets inside the thread functions, it should not be possible to
use them from multiple threads (accidentally or otherwise).
  • Loading branch information
zbynekwinkler committed Apr 15, 2020
1 parent c91d067 commit a601b3a
Showing 1 changed file with 32 additions and 35 deletions.
67 changes: 32 additions & 35 deletions osgar/drivers/logzeromq.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,31 +13,16 @@ class LogZeroMQ:
def __init__(self, config, bus):
bus.register('raw:null')
mode = config['mode']
endpoint = config['endpoint']
self.endpoint = config['endpoint']
self.timeout = config.get('timeout', 1) # default recv timeout 1s

self.context = zmq.Context()
print("Connecting to hello world server ...")
if mode == 'PULL':
self.socket = self.context.socket(zmq.PULL)
if 'timeout' in config:
# https://stackoverflow.com/questions/7538988/zeromq-how-to-prevent-infinite-wait
self.socket.RCVTIMEO = int(config['timeout'] * 1000) # in milliseconds
self.thread = Thread(target=self.run_input, daemon=False)
self.thread = Thread(target=self.run_input)
elif mode == 'PUSH':
self.socket = self.context.socket(zmq.PUSH)
# https://stackoverflow.com/questions/24619490/how-do-i-clear-the-buffer-upon-start-exit-in-zmq-socket-to-prevent-server-from
# ZMQ_LINGER: Set linger period for socket shutdown
# The default value of -1 specifies an infinite linger period.
# Pending messages shall not be discarded after a call to
# zmq_close(); attempting to terminate the socket's context with
# zmq_term() shall block until all pending messages have been sent
# to a peer.
self.socket.setsockopt(zmq.LINGER, 100) # milliseconds
self.thread = Thread(target=self.run_output, daemon=False)
self.thread = Thread(target=self.run_output)
else:
assert False, mode # unknown/unsupported mode

self.socket.connect(endpoint)
self.thread.name = bus.name
self.bus = bus

Expand All @@ -47,35 +32,47 @@ def start(self):
def join(self, timeout=None):
self.thread.join(timeout=timeout)

def _close(self):
self.socket.close()
self.context.term()

def run_input(self):
context = zmq.Context()
socket = context.socket(zmq.PULL)
# https://stackoverflow.com/questions/7538988/zeromq-how-to-prevent-infinite-wait
socket.RCVTIMEO = int(self.timeout * 1000) # convert to milliseconds
socket.connect(self.endpoint)

while self.bus.is_alive():
try:
message = self.socket.recv()
message = socket.recv()
self.bus.publish('raw', message)
except zmq.error.Again:
pass
self._close()

def slot_raw(self, timestamp, data):
while self.bus.is_alive():
try:
self.socket.send(data, zmq.NOBLOCK)
break
except zmq.error.Again:
self.bus.sleep(0.1)
socket.close()
context.term()

def run_output(self):
context = zmq.Context()
socket = context.socket(zmq.PUSH)
# https://stackoverflow.com/questions/24619490/how-do-i-clear-the-buffer-upon-start-exit-in-zmq-socket-to-prevent-server-from
# ZMQ_LINGER: Set linger period for socket shutdown
# The default value of -1 specifies an infinite linger period.
# Pending messages shall not be discarded after a call to
# zmq_close(); attempting to terminate the socket's context with
# zmq_term() shall block until all pending messages have been sent
# to a peer.
socket.setsockopt(zmq.LINGER, 100) # milliseconds
socket.connect(self.endpoint)
try:
while True:
dt, __, data = self.bus.listen()
self.slot_raw(dt, data)
while self.bus.is_alive():
try:
socket.send(data, zmq.NOBLOCK)
break
except zmq.error.Again:
self.bus.sleep(0.1)
except BusShutdownException:
pass
self._close()
socket.close()
context.term()

def request_stop(self):
self.bus.shutdown()
Expand Down

0 comments on commit a601b3a

Please sign in to comment.