Skip to content

Commit

Permalink
Merge pull request #284 from krkeegan/Broadcast_Pause
Browse files Browse the repository at this point in the history
Add Pause After Broadcast Received
  • Loading branch information
krkeegan authored Dec 29, 2020
2 parents 8d618ed + a7edf59 commit 6ef6121
Show file tree
Hide file tree
Showing 8 changed files with 125 additions and 35 deletions.
6 changes: 5 additions & 1 deletion HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
[Hub Instructions](https://github.com/TD22057/insteon-mqtt/blob/dev/docs/hub.md)
([PR 201][P201])

- More pyTests, up to 75% coverage now. ([PR 262][P262] & [PR 268][P268])
- More pyTests, up to 76% coverage now. ([PR 262][P262] & [PR 268][P268])

- Significant improvement to the Modem database handling. There is no longer
a requirement to perform the command `refresh modem` for anything with the
Expand Down Expand Up @@ -43,6 +43,9 @@
- Refactor code around Pair(). Add significant amount of unit tests.
([PR 277][P277])

- Improved message timing after receiving a broadcast command from a device.
([PR 284][P284])

## [0.7.4]

### Additions
Expand Down Expand Up @@ -515,3 +518,4 @@ will add new features.
[P279]: https://github.com/TD22057/insteon-mqtt/pull/279
[P282]: https://github.com/TD22057/insteon-mqtt/pull/282
[P288]: https://github.com/TD22057/insteon-mqtt/pull/288
[P284]: https://github.com/TD22057/insteon-mqtt/pull/284
20 changes: 12 additions & 8 deletions insteon_mqtt/Protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import collections
import enum
import time
import datetime
from . import log
from . import message as Msg
from .Signal import Signal
Expand Down Expand Up @@ -230,12 +231,19 @@ def send(self, msg, msg_handler, high_priority=False, after=None):
def set_wait_time(self, wait_time):
"""Set the Next Time that a Message Can be Sent to Avoid Collision.
Next time that a message can be written.
Next time that a message can be written. If the wait time is set to
zero, it will cancel all pending wait time. If the wait time is
less than the current pending wait time, it will be ignored.
Args:
wait_time (epoch Seconds): The next time a message can be sent
"""
self._next_write_time = wait_time
if wait_time == 0 or wait_time > self._next_write_time:
wait_time = time.time() if wait_time == 0 else wait_time
self._next_write_time = wait_time
print_time = datetime.datetime.fromtimestamp(
self._next_write_time).strftime('%H:%M:%S.%f')[:-3]
LOG.debug("Setting next write time: %s", print_time)

#-----------------------------------------------------------------------
def is_addr_in_write_queue(self, addr):
Expand Down Expand Up @@ -303,10 +311,7 @@ def _data_read(self, link, data):
start = self._buf.find(0x15)
if start == 0:
LOG.info("PLM is busy, pausing briefly")
# Pause for 1/3 of a second if we are not already waiting
# longer
if self._next_write_time + .3 < time.time():
self._next_write_time = time.time() + .3
self.set_wait_time(time.time() + .3)
self._buf = self._buf[1:]
continue

Expand Down Expand Up @@ -393,8 +398,7 @@ def _is_duplicate(self, msg):

# Update the next allowed write time based on the number of hops that
# are remaining on the inbound message.
self._next_write_time = msg.expire_time
LOG.debug("Setting next write time: %f", self._next_write_time)
self.set_wait_time(msg.expire_time)

# See if we have a duplicate message.
if msg in self._read_history:
Expand Down
11 changes: 0 additions & 11 deletions insteon_mqtt/device/Base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
#
#===========================================================================
import json
import time
import os.path
from .MsgHistory import MsgHistory
from ..Address import Address
Expand Down Expand Up @@ -1094,16 +1093,6 @@ def update_linked_devices(self, msg):
LOG.debug("Found %s responders in group %s", len(responders), group)
LOG.debug("Group %s -> %s", group, [i.addr.hex for i in responders])

# A device broadcast will be followed up a series of cleanup messages
# between the devices and sent to the modem. Don't send anything
# during this time to avoid causing a collision. Time is equal to .5
# second of overhead, plus .5 seconds per responer device. This is
# based off the same 87 msec empircal testing performed when designing
# misterhouse. Each device causes a cleanup and an ack. Assuminng
# a max of three hops in each direction that is 6 * .087 or .522 per
# device.
self.protocol.set_wait_time(time.time() + .5 + (len(responders) * .5))

# For each device that we're the controller of call it's handler for
# the broadcast message.
for elem in responders:
Expand Down
78 changes: 66 additions & 12 deletions insteon_mqtt/handler/Broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# Broadcast message handler.
#
#===========================================================================
import time
from .. import log
from .. import message as Msg
from .Base import Base
Expand All @@ -24,6 +25,11 @@ class Broadcast(Base):
message gets ignored). So if we get the broadcast, the cleanup is
ignored.
Finally a broadcast LINK_CLEANUP_REPORT is sent. This message indicates
if the device received ACKs from all linked devices or not. This message
indicates that the device is finished sending messages. However, as
broadcast message, it is not guaranteed to be received.
This handler will call device.handle_broadcast(msg) for the device that
sends the message.
Expand Down Expand Up @@ -67,27 +73,62 @@ def msg_received(self, protocol, msg):
if not isinstance(msg, Msg.InpStandard):
return Msg.UNKNOWN

# Calculate the total time this process could take
# A device broadcast will be followed up a series of cleanup messages
# between the devices and sent to the modem. Don't send anything
# during this time to avoid causing a collision. Time is equal to .5
# second of overhead, plus .522 seconds per responer device. This is
# based off the same 87 msec empircal testing performed when designing
# misterhouse. Each device causes a cleanup and an ack. Assuminng
# a max of three hops in each direction that is 6 * .087 or .522 per
# device.
device = self.modem.find(msg.from_addr)
wait_time = 0
if device:
responders = device.db.find_group(msg.group)
wait_time = .5 + (len(responders) * .522)

# Process the all link broadcast.
if msg.flags.type == Msg.Flags.Type.ALL_LINK_BROADCAST:
self._last_broadcast = msg
return self._process(msg)
if msg.cmd1 == Msg.CmdType.LINK_CLEANUP_REPORT:
# This is the final broadcast signalling completion.
# All of these messages will be forwarded to the device
# potentially even duplicates
# Re-enable sending
# First clear wait time
protocol.set_wait_time(0)
# Then set as expire time of this message
protocol.set_wait_time(msg.expire_time)
# cmd2 identifies the number of failed devices
if msg.cmd2 == 0x00:
LOG.debug("Cleanup report for %s, grp %s success.",
msg.from_addr, msg.group)
else:
text = "Cleanup report for %s, grp %s had %d fails."
LOG.warning(text, msg.from_addr, msg.group, msg.cmd2)
return self._process(msg, protocol, wait_time)
else:
# This is the initial broadcast or an echo of it.
if self._should_process(msg, wait_time):
return self._process(msg, protocol, wait_time)
else:
return Msg.CONTINUE

# Clean up message is basically the same data but addressed to the
# modem. If we saw the broadcast, we don't need to handle this. But
# if we missed the broadcast, this gives us a second chance to
# trigger the scene.
elif msg.flags.type == Msg.Flags.Type.ALL_LINK_CLEANUP:
if self._should_process(msg):
return self._process(msg)

self._last_broadcast = None
return Msg.CONTINUE
if self._should_process(msg, wait_time):
return self._process(msg, protocol, wait_time)
else:
return Msg.CONTINUE

# Different message flags than we expected.
return Msg.UNKNOWN

#-----------------------------------------------------------------------
def _process(self, msg):
def _process(self, msg, protocol, wait_time):
"""Process the all link broadcast message.
Args:
Expand All @@ -107,29 +148,42 @@ def _process(self, msg):
LOG.info("Handling all link broadcast for %s '%s'", device.addr,
device.name)

# Save for deduplication detection
self._last_broadcast = msg

# Delay sending, see above
protocol.set_wait_time(time.time() + wait_time)

# Tell the device about it. This will look up all the responders for
# this group and tell them that the scene has been activated.
device.handle_broadcast(msg)
return Msg.CONTINUE

#-----------------------------------------------------------------------
def _should_process(self, msg):
def _should_process(self, msg, wait_time):
"""Should we process a cleanup message?
Checks if this is a duplicate of a message we have already seen.
Args:
msg (Msg.InpStandard): Cleanup message to handle.
wait_time (float): The number of seconds this process could
take to complate. Used to find duplicates
Returns:
bool: True if the message should be procssed, False otherwise.
"""
if not self._last_broadcast:
return True

# Don't process the cleanup if we just got the corresponding broadcast
# message from the same device.
# Don't process the message if we just got a corresponding broadcast
# message from the same device. Wait time plus expire time is used
# to ignore old messages. Expire_time adds more time then we need
# but the timings don't have to be perfect
if (self._last_broadcast.from_addr == msg.from_addr and
self._last_broadcast.group == msg.group and
self._last_broadcast.cmd1 == msg.cmd1):
self._last_broadcast.cmd1 == msg.cmd1 and
self._last_broadcast.expire_time + wait_time > time.time()):
return False

return True
2 changes: 1 addition & 1 deletion insteon_mqtt/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def initialize(level=None, screen=None, file=None, config=None):
log_obj.setLevel(level)

# Add handlers for the optional screen and file output.
fmt = '%(asctime)s %(levelname)s %(module)s: %(message)s'
fmt = '%(asctime)s.%(msecs)03d %(levelname)s %(module)s: %(message)s'
datefmt = '%Y-%m-%d %H:%M:%S'
formatter = logging.Formatter(fmt, datefmt)

Expand Down
4 changes: 3 additions & 1 deletion insteon_mqtt/message/Flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ def __eq__(self, rhs):

#-----------------------------------------------------------------------
def __str__(self):
return "%s%s" % (self.type, '' if not self.is_ext else ' ext')
return "%s%s mh:%s hl:%s" % (self.type,
'' if not self.is_ext else ' ext',
self.max_hops, self.hops_left)

#-----------------------------------------------------------------------
26 changes: 25 additions & 1 deletion tests/handler/test_Broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#
# pylint: disable=protected-access
#===========================================================================
import time
import insteon_mqtt as IM
import insteon_mqtt.message as Msg

Expand All @@ -21,21 +22,37 @@ def test_acks(self, tmpdir):

r = handler.msg_received(proto, "dummy")
assert r == Msg.UNKNOWN
assert len(calls) == 0

flags = Msg.Flags(Msg.Flags.Type.ALL_LINK_BROADCAST, False)
msg = Msg.InpStandard(addr, broadcast_to_addr, flags, 0x11, 0x01)

# no device
r = handler.msg_received(proto, msg)
assert r == Msg.UNKNOWN
assert len(calls) == 0

# test good broadcat
assert proto.wait_time == 0
device = IM.device.Base(proto, modem, addr, "foo")

# add 10 device db entries for this group
for count in range(10):
e_addr = IM.Address(0x10, 0xab, count)
db_flags = Msg.DbFlags(in_use=True, is_controller=True,
is_last_rec=False)
entry = IM.db.DeviceEntry(e_addr, 0x01, count, db_flags,
bytes([0x01, 0x02, 0x03]))
device.db.add_entry(entry)

device.handle_broadcast = calls.append
modem.add(device)
r = handler.msg_received(proto, msg)

assert r == Msg.CONTINUE
assert len(calls) == 1
# should be at least 5.5 seconds ahead
assert proto.wait_time - time.time() > 5

# cleanup should be ignored since prev was processed.
flags = Msg.Flags(Msg.Flags.Type.ALL_LINK_CLEANUP, False)
Expand All @@ -46,7 +63,7 @@ def test_acks(self, tmpdir):
assert len(calls) == 1

# If broadcast wasn't found, cleanup should be handled.
handler._handled = False
handler._last_broadcast = None
r = handler.msg_received(proto, msg)

assert r == Msg.CONTINUE
Expand All @@ -58,13 +75,16 @@ def test_acks(self, tmpdir):
assert r == Msg.UNKNOWN

# Success Report Broadcast
pre_success_time = proto.wait_time
flags = Msg.Flags(Msg.Flags.Type.ALL_LINK_BROADCAST, False)
success_report_to_addr = IM.Address(0x11, 1, 0x1)
msg = Msg.InpStandard(addr, addr, flags, 0x06, 0x00)
r = handler.msg_received(proto, msg)

assert r == Msg.CONTINUE
assert len(calls) == 3
# wait time should be cleared
assert proto.wait_time < pre_success_time

# Pretend that a new broadcast message dropped / not received by PLM

Expand All @@ -84,6 +104,10 @@ def test_acks(self, tmpdir):
class MockProto:
def __init__(self):
self.signal_received = IM.Signal()
self.wait_time = 0

def add_handler(self, *args):
pass

def set_wait_time(self, wait_time):
self.wait_time = wait_time
13 changes: 13 additions & 0 deletions tests/test_Protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@
# pylint: disable=protected-access
#===========================================================================
import time
import pytest
import insteon_mqtt as IM
import insteon_mqtt.message as Msg

@pytest.fixture
def test_proto():
link = MockSerial()
return IM.Protocol(link)

class Test_Protocol:
def test_reads(self):
Expand Down Expand Up @@ -56,6 +61,14 @@ def test_duplicate(self):
assert proto._read_history[0] == msg_keep

#-----------------------------------------------------------------------
def test_set_wait_time(self, test_proto):
assert test_proto._next_write_time == 0
test_proto.set_wait_time(5)
assert test_proto._next_write_time == 5
test_proto.set_wait_time(2)
assert test_proto._next_write_time == 5
test_proto.set_wait_time(0)
assert test_proto._next_write_time > 5

#===========================================================================

Expand Down

0 comments on commit 6ef6121

Please sign in to comment.