-
Notifications
You must be signed in to change notification settings - Fork 43
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Get_Flags Function to Modem #359
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
#=========================================================================== | ||
# | ||
# Modem get_flags handler. | ||
# | ||
#=========================================================================== | ||
from .. import log | ||
from .. import message as Msg | ||
from .Base import Base | ||
|
||
LOG = log.get_logger() | ||
|
||
|
||
class ModemGetFlags(Base): | ||
"""Modem get_flags handler. | ||
|
||
This handles a `get flags` command being sent to the modem. The response | ||
to this command is a single message containing the flags and 2 spare | ||
bytes. | ||
""" | ||
def __init__(self, modem, on_done=None): | ||
"""Constructor | ||
|
||
Args | ||
modem (Modem): The Insteon modem object. | ||
on_done: The finished callback. Calling signature: | ||
on_done( bool success, str message, data ) | ||
""" | ||
super().__init__(on_done) | ||
|
||
self.modem = modem | ||
|
||
#----------------------------------------------------------------------- | ||
def msg_received(self, protocol, msg): | ||
"""See if we can handle the message. | ||
|
||
If we get an ACK of the user reset, we'll clear the modem database. | ||
|
||
Args: | ||
protocol (Protocol): The Insteon Protocol object | ||
msg: Insteon message object that was read. | ||
|
||
Returns: | ||
Msg.UNKNOWN if we can't handle this message. | ||
Msg.CONTINUE if we handled the message and expect more. | ||
Msg.FINISHED if we handled the message and are done. | ||
""" | ||
if not self._PLM_sent: | ||
# If PLM hasn't sent our message yet, this can't be for us | ||
return Msg.UNKNOWN | ||
if isinstance(msg, Msg.OutGetModemFlags): | ||
if msg.is_ack: | ||
LOG.ui("Modem flag byte is: %s, spare bytes are: %s, %s", | ||
msg.modem_flags, msg.spare1, msg.spare2) | ||
self.on_done(True, "Modem get_flags complete", None) | ||
else: | ||
LOG.error("Modem get_flags failed") | ||
self.on_done(False, "Modem get_flags failed", None) | ||
|
||
return Msg.FINISHED | ||
|
||
return Msg.UNKNOWN | ||
|
||
#----------------------------------------------------------------------- |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
#=========================================================================== | ||
# | ||
# Output insteon reset the PLM modem message. | ||
# | ||
#=========================================================================== | ||
from .Base import Base | ||
|
||
|
||
class OutGetModemFlags(Base): | ||
"""Command requesting the modem configuration | ||
|
||
This command will retun 6 bytes: | ||
- 0x02 | ||
- 0x73 | ||
- Modem Configuration Flags | ||
- Spare 1 | ||
- Spare 2 | ||
- Ack/Nak | ||
""" | ||
msg_code = 0x73 | ||
fixed_msg_size = 6 | ||
|
||
#----------------------------------------------------------------------- | ||
@classmethod | ||
def from_bytes(cls, raw): | ||
"""Read the message from a byte stream. | ||
|
||
This should only be called if raw[1] == msg_code and len(raw) >= | ||
msg_size(). | ||
|
||
You cannot pass the output of to_bytes() to this. to_bytes() is used | ||
to output to the PLM but the modem sends back the same message with | ||
an extra ack byte which this function can read. | ||
|
||
Args: | ||
raw (bytes): The current byte stream to read from. | ||
|
||
Returns: | ||
Returns the constructed OutResetModem object. | ||
""" | ||
assert len(raw) >= cls.fixed_msg_size | ||
assert raw[0] == 0x02 and raw[1] == cls.msg_code | ||
modem_flags = raw[2] | ||
spare1 = raw[3] | ||
spare2 = raw[4] | ||
is_ack = raw[5] == 0x06 | ||
return OutGetModemFlags(is_ack, modem_flags, spare1, spare2) | ||
|
||
#----------------------------------------------------------------------- | ||
def __init__(self, is_ack=None, modem_flags=None, spare1=None, | ||
spare2=None): | ||
"""Constructor | ||
|
||
Args: | ||
is_ack (bool): True for ACK, False for NAK. None for output | ||
commands to the modem. | ||
""" | ||
super().__init__() | ||
|
||
self.is_ack = is_ack | ||
self.modem_flags = modem_flags | ||
self.spare1 = spare1 | ||
self.spare2 = spare2 | ||
|
||
#----------------------------------------------------------------------- | ||
def to_bytes(self): | ||
"""Convert the message to a byte array. | ||
|
||
Returns: | ||
bytes: Returns the message as bytes. | ||
""" | ||
return bytes([0x02, self.msg_code]) | ||
|
||
#----------------------------------------------------------------------- | ||
def __str__(self): | ||
ack = "" | ||
flags = "" | ||
spares = "" | ||
if self.is_ack is not None: | ||
ack = " ack: %s" % str(self.is_ack) | ||
flags = " modem flags: %s" % str(self.modem_flags) | ||
spares = " spares: %s %s" % (str(self.spare1), str(self.spare2)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It may be more useful to print flags/spares in hex (assuming the conversion to string does so in decimal by default). |
||
return "OutGetModemFlags%s%s%s" % (flags, spares, ack) | ||
|
||
#----------------------------------------------------------------------- | ||
|
||
#=========================================================================== |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
#=========================================================================== | ||
# | ||
# Tests for: insteont_mqtt/handler/ModemGetFlags.py | ||
# | ||
# pylint: disable=attribute-defined-outside-init | ||
#=========================================================================== | ||
import insteon_mqtt as IM | ||
import insteon_mqtt.message as Msg | ||
import helpers as H | ||
|
||
|
||
class Test_ModemGetFlags: | ||
def test_acks(self, tmpdir): | ||
calls = [] | ||
|
||
def callback(success, msg, done): | ||
calls.append((success, msg, done)) | ||
|
||
modem = H.main.MockModem(tmpdir) | ||
proto = H.main.MockProtocol() | ||
handler = IM.handler.ModemGetFlags(modem, callback) | ||
handler._PLM_sent = True | ||
handler._PLM_ACK = True | ||
|
||
#Try a good message | ||
msg = Msg.OutGetModemFlags(is_ack=True, modem_flags=0x01, spare1=0x02, | ||
spare2=0x03) | ||
r = handler.msg_received(proto, msg) | ||
assert r == Msg.FINISHED | ||
assert calls[0][0] | ||
|
||
#Try a NAK message | ||
msg = Msg.OutGetModemFlags(is_ack=False, modem_flags=0x01, spare1=0x02, | ||
spare2=0x03) | ||
r = handler.msg_received(proto, msg) | ||
assert r == Msg.FINISHED | ||
assert not calls[1][0] | ||
|
||
#Wrong Message | ||
msg = Msg.OutResetModem(is_ack=True) | ||
r = handler.msg_received(proto, msg) | ||
assert r == Msg.UNKNOWN | ||
|
||
#----------------------------------------------------------------------- | ||
def test_plm_sent(self, tmpdir): | ||
calls = [] | ||
|
||
def callback(success, msg, done): | ||
calls.append((success, msg, done)) | ||
|
||
modem = H.main.MockModem(tmpdir) | ||
proto = H.main.MockProtocol() | ||
handler = IM.handler.ModemGetFlags(modem, callback) | ||
assert not handler._PLM_sent | ||
|
||
#Try a message prior to sent | ||
msg = Msg.OutGetModemFlags(is_ack=False, modem_flags=0x01, spare1=0x02, | ||
spare2=0x03) | ||
r = handler.msg_received(proto, msg) | ||
assert r == Msg.UNKNOWN | ||
|
||
# Signal Sent | ||
handler.sending_message(msg) | ||
assert handler._PLM_sent | ||
|
||
#Try a message prior to sent | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This one is actually after sending ... |
||
msg = Msg.OutGetModemFlags(is_ack=False, modem_flags=0x01, spare1=0x02, | ||
spare2=0x03) | ||
r = handler.msg_received(proto, msg) | ||
assert r == Msg.FINISHED |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
#=========================================================================== | ||
# | ||
# Tests for: insteont_mqtt/message/OutGetModemFlags.py | ||
# | ||
#=========================================================================== | ||
import insteon_mqtt.message as Msg | ||
|
||
|
||
class Test_OutGetModemFlags: | ||
#----------------------------------------------------------------------- | ||
def test_out(self): | ||
obj = Msg.OutGetModemFlags() | ||
assert obj.fixed_msg_size == 6 | ||
|
||
b = obj.to_bytes() | ||
rt = bytes([0x02, 0x73]) | ||
assert b == rt | ||
|
||
str(obj) | ||
|
||
#----------------------------------------------------------------------- | ||
def test_in_ack(self): | ||
b = bytes([0x02, 0x73, 0x01, 0x02, 0x03, 0x06]) | ||
obj = Msg.OutGetModemFlags.from_bytes(b) | ||
assert obj.is_ack is True | ||
assert obj.modem_flags == 0x01 | ||
assert obj.spare1 == 0x02 | ||
assert obj.spare2 == 0x03 | ||
|
||
str(obj) | ||
|
||
#----------------------------------------------------------------------- | ||
def test_in_nack(self): | ||
b = bytes([0x02, 0x73, 0x01, 0x02, 0x03, 0x15]) | ||
obj = Msg.OutGetModemFlags.from_bytes(b) | ||
assert obj.is_ack is False | ||
assert obj.modem_flags == 0x01 | ||
assert obj.spare1 == 0x02 | ||
assert obj.spare2 == 0x03 | ||
|
||
str(obj) | ||
|
||
|
||
#----------------------------------------------------------------------- | ||
|
||
|
||
#=========================================================================== |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It may be more useful to print flags/spares in hex (assuming the conversion to string does so in decimal by default).