Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added DLMS data parser for use in GET.WITH_LIST #48

Merged
merged 1 commit into from
Jun 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Added DLMS data parser for use in GET.WITH_LIST
Fixes #47.
  • Loading branch information
Krolken committed Jun 14, 2021
commit bfeb4b9dd7645591eed15bd7bd72ccb9edb8a6c8
5 changes: 5 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ and this project adheres to [Calendar Versioning](https://calver.org/)

### Added

* To handle the more complicated parsing problem of GET.WITH_LIST with compound data
elements a new parser, DlmsDataParser, was added that focuses on only A-XDR DLMS data.
Hopefully this can be be used instead of the A-XDR Parser when the parsing of ACSE
services APDUs is built away

### Changed

### Deprecated
Expand Down
44 changes: 0 additions & 44 deletions dlms_cosem/a_xdr.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,50 +65,6 @@ def get_axdr_length(data: bytearray):
return int.from_bytes(length_data, "big")


def decode_variable_integer(bytes_input: bytes):
"""
If the length is fitting in 7 bits it can be encoded in 1 bytes.
If it is larger then 7 bybitstes the last bit of the first byte indicates
that the length of the lenght is encoded in the first byte and the length
is encoded in the following bytes.
Ex. 0b00000010 -> Length = 2
Ex 0b100000010, 0b000001111, 0b11111111 -> Lenght = 4095
:param bytes_input: Input where the variable integer is at the beginning of
the bytes
:return: First variable integer the function finds. and the residual bytes
"""

# is the length encoded in single byte or mutliple?
is_mutliple_bytes = bool(bytes_input[0] & 0b10000000)
if is_mutliple_bytes:
length_length = int(bytes_input[0] & 0b01111111)
length_data = bytes_input[1 : (length_length + 1)]
length = int.from_bytes(length_data, "big")
return length, bytes_input[length_length + 1 :]

else:
length = int(bytes_input[0] & 0b01111111)
return length, bytes_input[1:]


def encode_variable_integer(length: int):
if length > 0b01111111:
encoded_length = 1
while True:
try:
length.to_bytes(encoded_length, "big")
except OverflowError:
encoded_length += 1
continue
break

length_byte = (0b10000000 + encoded_length).to_bytes(1, "big")
return length_byte + length.to_bytes(encoded_length, "big")

else:
return length.to_bytes(1, "big")


@attr.s
class DataSequenceEncoding:
attribute_name: str = attr.ib()
Expand Down
1 change: 1 addition & 0 deletions dlms_cosem/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,7 @@ def unprotect(self, event):
elif isinstance(event, xdlms.GeneralGlobalCipher):
self.update_meter_invocation_counter(event.invocation_counter)
plain_text = self.decrypt(event.ciphered_text)
LOG.warning(f"apdu_bytes: {plain_text!r}")
return XDlmsApduFactory.apdu_from_bytes(plain_text)

return event
Expand Down
182 changes: 181 additions & 1 deletion dlms_cosem/dlms_data.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import abc
import datetime
from typing import *
from typing import List, Optional

import attr

Expand Down Expand Up @@ -72,6 +73,20 @@ class DataArray(BaseDlmsData):
TAG = 1
LENGTH = VARIABLE_LENGTH

def to_bytes(self) -> bytes:
out = bytearray()
out.append(self.TAG)
out.append(encode_variable_integer(len(self.value)))
for item in self.value:
out.extend(item.to_bytes())
return bytes(out)

def to_python(self) -> List[Any]:
values = list()
for item in self.value:
values.append(item.to_python())
return values


@attr.s(auto_attribs=True)
class DataStructure(BaseDlmsData):
Expand All @@ -80,6 +95,20 @@ class DataStructure(BaseDlmsData):
TAG = 2
LENGTH = VARIABLE_LENGTH

def to_bytes(self) -> bytes:
out = bytearray()
out.append(self.TAG)
out.extend(encode_variable_integer(len(self.value)))
for item in self.value:
out.extend(item.to_bytes())
return bytes(out)

def to_python(self) -> List[Any]:
values = list()
for item in self.value:
values.append(item.to_python())
return values


@attr.s(auto_attribs=True)
class BooleanData(BaseDlmsData):
Expand All @@ -97,6 +126,9 @@ class BitStringData(BaseDlmsData):
TAG = 4
LENGTH = VARIABLE_LENGTH

def value_to_bytes(self) -> bytes:
return self.value


@attr.s(auto_attribs=True)
class DoubleLongData(BaseDlmsData):
Expand All @@ -109,6 +141,9 @@ class DoubleLongData(BaseDlmsData):
def from_bytes(cls, bytes_data: bytes):
return cls(value=int.from_bytes(bytes_data, "big", signed=True))

def value_to_bytes(self) -> bytes:
return self.value.to_bytes(4, "big", signed=True)


@attr.s(auto_attribs=True)
class DoubleLongUnsignedData(BaseDlmsData):
Expand Down Expand Up @@ -171,7 +206,7 @@ def from_bytes(cls, bytes_data: bytes):
return cls(value=int.from_bytes(bytes_data, "big", signed=True))

def value_to_bytes(self) -> bytes:
return self.value.to_bytes(1, "big")
return self.value.to_bytes(1, "big", signed=True)


@attr.s(auto_attribs=True)
Expand All @@ -185,6 +220,9 @@ class LongData(BaseDlmsData):
def from_bytes(cls, bytes_data: bytes):
return cls(value=int.from_bytes(bytes_data, "big", signed=True))

def value_to_bytes(self) -> bytes:
return self.value.to_bytes(2, "big", signed=True)


@attr.s(auto_attribs=True)
class UnsignedIntegerData(BaseDlmsData):
Expand All @@ -197,6 +235,9 @@ class UnsignedIntegerData(BaseDlmsData):
def from_bytes(cls, bytes_data: bytes):
return cls(value=int.from_bytes(bytes_data, "big"))

def value_to_bytes(self) -> bytes:
return self.value.to_bytes(1, "big")


@attr.s(auto_attribs=True)
class UnsignedLongData(BaseDlmsData):
Expand Down Expand Up @@ -238,6 +279,9 @@ class Long64Data(BaseDlmsData):
def from_bytes(cls, bytes_data: bytes):
return cls(value=int.from_bytes(bytes_data, "big", signed=True))

def value_to_bytes(self) -> bytes:
return self.value.to_bytes(8, "big", signed=True)


@attr.s(auto_attribs=True)
class UnsignedLong64Data(BaseDlmsData):
Expand All @@ -252,6 +296,9 @@ class UnsignedLong64Data(BaseDlmsData):
def from_bytes(cls, bytes_data: bytes):
return cls(value=int.from_bytes(bytes_data, "big"))

def value_to_bytes(self) -> bytes:
return self.value.to_bytes(8, "big")


@attr.s(auto_attribs=True)
class EnumData(BaseDlmsData):
Expand All @@ -266,6 +313,9 @@ class EnumData(BaseDlmsData):
def from_bytes(cls, bytes_data: bytes):
return cls(value=int.from_bytes(bytes_data, "big"))

def value_to_bytes(self) -> bytes:
return self.value.to_bytes(1, "big")


@attr.s(auto_attribs=True)
class Float32Data(BaseDlmsData):
Expand Down Expand Up @@ -384,3 +434,133 @@ class DlmsDataFactory:
@classmethod
def get_data_class(cls, tag: int):
return cls.MAP[tag]


@attr.s(auto_attribs=True)
class DlmsDataParser:

buffer: bytearray = attr.ib(factory=bytearray, init=False)
pointer: int = attr.ib(default=0, init=False)
data: List[AbstractDlmsData] = attr.ib(factory=list, init=False)

@property
def buffer_empty(self) -> bool:
return self.pointer == len(self.buffer)

def parse(self, data: bytes, limit: Optional[int] = None):
# clear previous results
self.data = list()
self.buffer = bytearray()
self.pointer = 0
# fill the buffer
self.buffer += data

while not self.buffer_empty:
self.data.append(self.parse_one_entry())
if limit:
if len(self.data) >= limit:
break

return self.data

def parse_one_entry(self):

tag = self.get_bytes(1)
klass = DlmsDataFactory.get_data_class(int.from_bytes(tag, "big"))
if klass == DataArray:
return self.decode_array()
elif klass == DataStructure:
return self.decode_structure()
else:

return self.decode_data(klass)

def get_buffer_tail(self) -> bytearray:
return self.buffer[self.pointer :]

def decode_data(self, data_class) -> AbstractDlmsData:
if data_class.LENGTH == VARIABLE_LENGTH:
length = self.decode_variable_integer()
return data_class.from_bytes(self.get_bytes(length))
else:
return data_class.from_bytes(self.get_bytes(data_class.LENGTH))

def decode_array(self) -> DataArray:
item_count = self.decode_variable_integer()
elements = list()
for _ in range(0, item_count):
elements.append(self.parse_one_entry())
return DataArray(value=elements)

def decode_structure(self) -> DataStructure:
item_count = self.decode_variable_integer()
elements = list()
for _ in range(0, item_count):
elements.append(self.parse_one_entry())

return DataStructure(value=elements)

def get_bytes(self, length: int) -> bytearray:
"""Gets some bytes from the buffer and moves the pointer forward."""
part = self.buffer[self.pointer : self.pointer + length]
self.pointer += length
return part

@property
def remaining_buffer(self) -> bytearray:
return self.buffer[self.pointer :]

def decode_variable_integer(self) -> int:
length_data = bytearray()
first_byte = int.from_bytes(self.get_bytes(1), "big")
length_is_multiple_bytes = bool(first_byte & 0b10000000)
if not length_is_multiple_bytes:
return first_byte
number_of_bytes_representing_the_length = first_byte & 0b01111111
for _ in range(0, number_of_bytes_representing_the_length):
length_data.extend(self.get_bytes(1))
return int.from_bytes(length_data, "big")


def decode_variable_integer(bytes_input: bytes):
"""
If the length is fitting in 7 bits it can be encoded in 1 bytes.
If it is larger then 7 bybitstes the last bit of the first byte indicates
that the length of the lenght is encoded in the first byte and the length
is encoded in the following bytes.
Ex. 0b00000010 -> Length = 2
Ex 0b100000010, 0b000001111, 0b11111111 -> Lenght = 4095
:param bytes_input: Input where the variable integer is at the beginning of
the bytes
:return: First variable integer the function finds. and the residual bytes
"""

# is the length encoded in single byte or mutliple?
is_mutliple_bytes = bool(bytes_input[0] & 0b10000000)
if is_mutliple_bytes:
length_length = int(bytes_input[0] & 0b01111111)
length_data = bytes_input[1 : (length_length + 1)]
length = int.from_bytes(length_data, "big")
return length, bytes_input[length_length + 1 :]

else:
length = int(bytes_input[0] & 0b01111111)
return length, bytes_input[1:]


def encode_variable_integer(length: int):
if length > 0b01111111:
encoded_length = 1
while True:
try:
length.to_bytes(encoded_length, "big")
except OverflowError:
encoded_length += 1
continue
break

length_byte = (0b10000000 + encoded_length).to_bytes(1, "big")
return length_byte + length.to_bytes(encoded_length, "big")

else:
return length.to_bytes(1, "big")
Loading