define checksum support in telegram specification; moved telegram v2 exception temporarily from parser to MBUSObject;

This commit is contained in:
Nigel Dokter 2017-01-21 10:33:17 +01:00
parent 07634abed1
commit 45f5fe2c36
4 changed files with 109 additions and 106 deletions

View File

@ -12,26 +12,23 @@ class MBusObject(DSMRObject):
@property @property
def value(self): def value(self):
# TODO temporary workaround for DSMR v2.2. Maybe use the same type of
# TODO object, but let the parse set them differently? So don't use
# TODO hardcoded indexes here.
if len(self.values) != 2: # v2
return self.values[5]['value']
else:
return self.values[1]['value'] return self.values[1]['value']
@property @property
def unit(self): def unit(self):
return self.values[1]['unit'] # TODO temporary workaround for DSMR v2.2. Maybe use the same type of
# TODO object, but let the parse set them differently? So don't use
# TODO hardcoded indexes here.
class MBusObjectV2_2(DSMRObject): if len(self.values) != 2: # v2
@property
def datetime(self):
return self.values[0]['value']
@property
def value(self):
return self.values[5]['value']
@property
def unit(self):
return self.values[4]['value'] return self.values[4]['value']
else:
return self.values[1]['unit']
class CosemObject(DSMRObject): class CosemObject(DSMRObject):

View File

@ -3,7 +3,7 @@ import re
from PyCRC.CRC16 import CRC16 from PyCRC.CRC16 import CRC16
from dsmr_parser.objects import MBusObject, MBusObjectV2_2, CosemObject from dsmr_parser.objects import MBusObject, CosemObject
from dsmr_parser.exceptions import ParseError, InvalidChecksumError from dsmr_parser.exceptions import ParseError, InvalidChecksumError
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -11,14 +11,15 @@ logger = logging.getLogger(__name__)
class TelegramParser(object): class TelegramParser(object):
def __init__(self, telegram_specification, def __init__(self, telegram_specification, apply_checksum_validation=True):
enable_checksum_validation=False):
""" """
:param telegram_specification: determines how the telegram is parsed :param telegram_specification: determines how the telegram is parsed
:param apply_checksum_validation: validate checksum if applicable for
telegram DSMR version (v4 and up).
:type telegram_specification: dict :type telegram_specification: dict
""" """
self.telegram_specification = telegram_specification self.telegram_specification = telegram_specification
self.enable_checksum_validation = enable_checksum_validation self.apply_checksum_validation = apply_checksum_validation
def parse(self, telegram_data): def parse(self, telegram_data):
""" """
@ -32,19 +33,22 @@ class TelegramParser(object):
:returns: Shortened example: :returns: Shortened example:
{ {
.. ..
r'0-0:96\.1\.1': <CosemObject>, # EQUIPMENT_IDENTIFIER r'\d-\d:96\.1\.1.+?\r\n': <CosemObject>, # EQUIPMENT_IDENTIFIER
r'1-0:1\.8\.1': <CosemObject>, # ELECTRICITY_USED_TARIFF_1 r'\d-\d:1\.8\.1.+?\r\n': <CosemObject>, # ELECTRICITY_USED_TARIFF_1
r'0-\d:24\.3\.0': <MBusObject>, # GAS_METER_READING r'\d-\d:24\.3\.0.+?\r\n.+?\r\n': <MBusObject>, # GAS_METER_READING
.. ..
} }
:raises ParseError:
:raises InvalidChecksumError:
""" """
if self.enable_checksum_validation: if self.apply_checksum_validation \
and self.telegram_specification['checksum_support']:
self.validate_checksum(telegram_data) self.validate_checksum(telegram_data)
telegram = {} telegram = {}
for signature, parser in self.telegram_specification.items(): for signature, parser in self.telegram_specification['objects'].items():
match = re.search(signature, telegram_data, re.DOTALL) match = re.search(signature, telegram_data, re.DOTALL)
if match: if match:
@ -124,11 +128,7 @@ class MBusParser(DSMRObjectParser):
""" """
def parse(self, line): def parse(self, line):
values = self._parse(line) return MBusObject(self._parse(line))
if len(values) == 2:
return MBusObject(values)
else:
return MBusObjectV2_2(values)
class CosemParser(DSMRObjectParser): class CosemParser(DSMRObjectParser):

View File

@ -14,6 +14,8 @@ how the telegram lines are parsed.
""" """
V2_2 = { V2_2 = {
'checksum_support': False,
'objects': {
obis.EQUIPMENT_IDENTIFIER: CosemParser(ValueParser(str)), obis.EQUIPMENT_IDENTIFIER: CosemParser(ValueParser(str)),
obis.ELECTRICITY_USED_TARIFF_1: CosemParser(ValueParser(Decimal)), obis.ELECTRICITY_USED_TARIFF_1: CosemParser(ValueParser(Decimal)),
obis.ELECTRICITY_USED_TARIFF_2: CosemParser(ValueParser(Decimal)), obis.ELECTRICITY_USED_TARIFF_2: CosemParser(ValueParser(Decimal)),
@ -37,9 +39,12 @@ V2_2 = {
ValueParser(str), ValueParser(str),
ValueParser(Decimal), ValueParser(Decimal),
), ),
}
} }
V4 = { V4 = {
'checksum_support': True,
'objects': {
obis.P1_MESSAGE_HEADER: CosemParser(ValueParser(str)), obis.P1_MESSAGE_HEADER: CosemParser(ValueParser(str)),
obis.P1_MESSAGE_TIMESTAMP: CosemParser(ValueParser(timestamp)), obis.P1_MESSAGE_TIMESTAMP: CosemParser(ValueParser(timestamp)),
obis.ELECTRICITY_USED_TARIFF_1: CosemParser(ValueParser(Decimal)), obis.ELECTRICITY_USED_TARIFF_1: CosemParser(ValueParser(Decimal)),
@ -70,4 +75,5 @@ V4 = {
obis.EQUIPMENT_IDENTIFIER_GAS: CosemParser(ValueParser(str)), obis.EQUIPMENT_IDENTIFIER_GAS: CosemParser(ValueParser(str)),
obis.HOURLY_GAS_METER_READING: MBusParser(ValueParser(timestamp), obis.HOURLY_GAS_METER_READING: MBusParser(ValueParser(timestamp),
ValueParser(Decimal)) ValueParser(Decimal))
}
} }

View File

@ -15,28 +15,6 @@ from test.example_telegrams import TELEGRAM_V4_2
class TelegramParserV4_2Test(unittest.TestCase): class TelegramParserV4_2Test(unittest.TestCase):
""" Test parsing of a DSMR v4.2 telegram. """ """ Test parsing of a DSMR v4.2 telegram. """
def test_valid(self):
# No exception is raised.
TelegramParser.validate_checksum(TELEGRAM_V4_2)
def test_invalid(self):
# Remove the electricty used data value. This causes the checksum to
# not match anymore.
corrupted_telegram = TELEGRAM_V4_2.replace(
'1-0:1.8.1(001581.123*kWh)\r\n',
''
)
with self.assertRaises(InvalidChecksumError):
TelegramParser.validate_checksum(corrupted_telegram)
def test_missing_checksum(self):
# Remove the checksum value causing a ParseError.
corrupted_telegram = TELEGRAM_V4_2.replace('!6796\r\n', '')
with self.assertRaises(ParseError):
TelegramParser.validate_checksum(corrupted_telegram)
def test_parse(self): def test_parse(self):
parser = TelegramParser(telegram_specifications.V4) parser = TelegramParser(telegram_specifications.V4)
result = parser.parse(TELEGRAM_V4_2) result = parser.parse(TELEGRAM_V4_2)
@ -219,3 +197,25 @@ class TelegramParserV4_2Test(unittest.TestCase):
# VALVE_POSITION_GAS (0-x:24.4.0) # VALVE_POSITION_GAS (0-x:24.4.0)
# TODO to be implemented # TODO to be implemented
def test_checksum_valid(self):
# No exception is raised.
TelegramParser.validate_checksum(TELEGRAM_V4_2)
def test_checksum_invalid(self):
# Remove the electricty used data value. This causes the checksum to
# not match anymore.
corrupted_telegram = TELEGRAM_V4_2.replace(
'1-0:1.8.1(001581.123*kWh)\r\n',
''
)
with self.assertRaises(InvalidChecksumError):
TelegramParser.validate_checksum(corrupted_telegram)
def test_checksum_missing(self):
# Remove the checksum value causing a ParseError.
corrupted_telegram = TELEGRAM_V4_2.replace('!6796\r\n', '')
with self.assertRaises(ParseError):
TelegramParser.validate_checksum(corrupted_telegram)