From 45f5fe2c3691fc2df9a6d24120b0db3ba8543b05 Mon Sep 17 00:00:00 2001 From: Nigel Dokter Date: Sat, 21 Jan 2017 10:33:17 +0100 Subject: [PATCH] define checksum support in telegram specification; moved telegram v2 exception temporarily from parser to MBUSObject; --- dsmr_parser/objects.py | 31 ++++--- dsmr_parser/parsers.py | 28 +++---- dsmr_parser/telegram_specifications.py | 112 +++++++++++++------------ test/test_parse_v4_2.py | 44 +++++----- 4 files changed, 109 insertions(+), 106 deletions(-) diff --git a/dsmr_parser/objects.py b/dsmr_parser/objects.py index f09fda5..890dd50 100644 --- a/dsmr_parser/objects.py +++ b/dsmr_parser/objects.py @@ -12,26 +12,23 @@ class MBusObject(DSMRObject): @property def value(self): - return self.values[1]['value'] + # TODO temporary workaround for DSMR v2.2. Maybe use the same type of + # TODO object, but let the parse set them differently? So don't use + # TODO hardcoded indexes here. + if len(self.values) != 2: # v2 + return self.values[5]['value'] + else: + return self.values[1]['value'] @property def unit(self): - return self.values[1]['unit'] - - -class MBusObjectV2_2(DSMRObject): - - @property - def datetime(self): - return self.values[0]['value'] - - @property - def value(self): - return self.values[5]['value'] - - @property - def unit(self): - return self.values[4]['value'] + # TODO temporary workaround for DSMR v2.2. Maybe use the same type of + # TODO object, but let the parse set them differently? So don't use + # TODO hardcoded indexes here. + if len(self.values) != 2: # v2 + return self.values[4]['value'] + else: + return self.values[1]['unit'] class CosemObject(DSMRObject): diff --git a/dsmr_parser/parsers.py b/dsmr_parser/parsers.py index 2cf6ab9..069c02a 100644 --- a/dsmr_parser/parsers.py +++ b/dsmr_parser/parsers.py @@ -3,7 +3,7 @@ import re from PyCRC.CRC16 import CRC16 -from dsmr_parser.objects import MBusObject, MBusObjectV2_2, CosemObject +from dsmr_parser.objects import MBusObject, CosemObject from dsmr_parser.exceptions import ParseError, InvalidChecksumError logger = logging.getLogger(__name__) @@ -11,14 +11,15 @@ logger = logging.getLogger(__name__) class TelegramParser(object): - def __init__(self, telegram_specification, - enable_checksum_validation=False): + def __init__(self, telegram_specification, apply_checksum_validation=True): """ :param telegram_specification: determines how the telegram is parsed + :param apply_checksum_validation: validate checksum if applicable for + telegram DSMR version (v4 and up). :type telegram_specification: dict """ self.telegram_specification = telegram_specification - self.enable_checksum_validation = enable_checksum_validation + self.apply_checksum_validation = apply_checksum_validation def parse(self, telegram_data): """ @@ -32,19 +33,22 @@ class TelegramParser(object): :returns: Shortened example: { .. - r'0-0:96\.1\.1': , # EQUIPMENT_IDENTIFIER - r'1-0:1\.8\.1': , # ELECTRICITY_USED_TARIFF_1 - r'0-\d:24\.3\.0': , # GAS_METER_READING + r'\d-\d:96\.1\.1.+?\r\n': , # EQUIPMENT_IDENTIFIER + r'\d-\d:1\.8\.1.+?\r\n': , # ELECTRICITY_USED_TARIFF_1 + r'\d-\d:24\.3\.0.+?\r\n.+?\r\n': , # GAS_METER_READING .. } + :raises ParseError: + :raises InvalidChecksumError: """ - if self.enable_checksum_validation: + if self.apply_checksum_validation \ + and self.telegram_specification['checksum_support']: self.validate_checksum(telegram_data) telegram = {} - for signature, parser in self.telegram_specification.items(): + for signature, parser in self.telegram_specification['objects'].items(): match = re.search(signature, telegram_data, re.DOTALL) if match: @@ -124,11 +128,7 @@ class MBusParser(DSMRObjectParser): """ def parse(self, line): - values = self._parse(line) - if len(values) == 2: - return MBusObject(values) - else: - return MBusObjectV2_2(values) + return MBusObject(self._parse(line)) class CosemParser(DSMRObjectParser): diff --git a/dsmr_parser/telegram_specifications.py b/dsmr_parser/telegram_specifications.py index adc5e62..b4f48c2 100644 --- a/dsmr_parser/telegram_specifications.py +++ b/dsmr_parser/telegram_specifications.py @@ -14,60 +14,66 @@ how the telegram lines are parsed. """ V2_2 = { - obis.EQUIPMENT_IDENTIFIER: CosemParser(ValueParser(str)), - obis.ELECTRICITY_USED_TARIFF_1: CosemParser(ValueParser(Decimal)), - obis.ELECTRICITY_USED_TARIFF_2: CosemParser(ValueParser(Decimal)), - obis.ELECTRICITY_DELIVERED_TARIFF_1: CosemParser(ValueParser(Decimal)), - obis.ELECTRICITY_DELIVERED_TARIFF_2: CosemParser(ValueParser(Decimal)), - obis.ELECTRICITY_ACTIVE_TARIFF: CosemParser(ValueParser(str)), - obis.CURRENT_ELECTRICITY_USAGE: CosemParser(ValueParser(Decimal)), - obis.CURRENT_ELECTRICITY_DELIVERY: CosemParser(ValueParser(Decimal)), - obis.ACTUAL_TRESHOLD_ELECTRICITY: CosemParser(ValueParser(Decimal)), - obis.ACTUAL_SWITCH_POSITION: CosemParser(ValueParser(str)), - obis.TEXT_MESSAGE_CODE: CosemParser(ValueParser(int)), - obis.TEXT_MESSAGE: CosemParser(ValueParser(str)), - obis.EQUIPMENT_IDENTIFIER_GAS: CosemParser(ValueParser(str)), - obis.DEVICE_TYPE: CosemParser(ValueParser(str)), - obis.VALVE_POSITION_GAS: CosemParser(ValueParser(str)), - obis.GAS_METER_READING: MBusParser( - ValueParser(timestamp), - ValueParser(int), - ValueParser(int), - ValueParser(int), - ValueParser(str), - ValueParser(Decimal), - ), + 'checksum_support': False, + 'objects': { + obis.EQUIPMENT_IDENTIFIER: CosemParser(ValueParser(str)), + obis.ELECTRICITY_USED_TARIFF_1: CosemParser(ValueParser(Decimal)), + obis.ELECTRICITY_USED_TARIFF_2: CosemParser(ValueParser(Decimal)), + obis.ELECTRICITY_DELIVERED_TARIFF_1: CosemParser(ValueParser(Decimal)), + obis.ELECTRICITY_DELIVERED_TARIFF_2: CosemParser(ValueParser(Decimal)), + obis.ELECTRICITY_ACTIVE_TARIFF: CosemParser(ValueParser(str)), + obis.CURRENT_ELECTRICITY_USAGE: CosemParser(ValueParser(Decimal)), + obis.CURRENT_ELECTRICITY_DELIVERY: CosemParser(ValueParser(Decimal)), + obis.ACTUAL_TRESHOLD_ELECTRICITY: CosemParser(ValueParser(Decimal)), + obis.ACTUAL_SWITCH_POSITION: CosemParser(ValueParser(str)), + obis.TEXT_MESSAGE_CODE: CosemParser(ValueParser(int)), + obis.TEXT_MESSAGE: CosemParser(ValueParser(str)), + obis.EQUIPMENT_IDENTIFIER_GAS: CosemParser(ValueParser(str)), + obis.DEVICE_TYPE: CosemParser(ValueParser(str)), + obis.VALVE_POSITION_GAS: CosemParser(ValueParser(str)), + obis.GAS_METER_READING: MBusParser( + ValueParser(timestamp), + ValueParser(int), + ValueParser(int), + ValueParser(int), + ValueParser(str), + ValueParser(Decimal), + ), + } } V4 = { - obis.P1_MESSAGE_HEADER: CosemParser(ValueParser(str)), - obis.P1_MESSAGE_TIMESTAMP: CosemParser(ValueParser(timestamp)), - obis.ELECTRICITY_USED_TARIFF_1: CosemParser(ValueParser(Decimal)), - obis.ELECTRICITY_USED_TARIFF_2: CosemParser(ValueParser(Decimal)), - obis.ELECTRICITY_DELIVERED_TARIFF_1: CosemParser(ValueParser(Decimal)), - obis.ELECTRICITY_DELIVERED_TARIFF_2: CosemParser(ValueParser(Decimal)), - obis.ELECTRICITY_ACTIVE_TARIFF: CosemParser(ValueParser(str)), - obis.EQUIPMENT_IDENTIFIER: CosemParser(ValueParser(str)), - obis.CURRENT_ELECTRICITY_USAGE: CosemParser(ValueParser(Decimal)), - obis.CURRENT_ELECTRICITY_DELIVERY: CosemParser(ValueParser(Decimal)), - obis.LONG_POWER_FAILURE_COUNT: CosemParser(ValueParser(int)), - # POWER_EVENT_FAILURE_LOG: ProfileGenericParser(), TODO - obis.VOLTAGE_SAG_L1_COUNT: CosemParser(ValueParser(int)), - obis.VOLTAGE_SAG_L2_COUNT: CosemParser(ValueParser(int)), - obis.VOLTAGE_SAG_L3_COUNT: CosemParser(ValueParser(int)), - obis.VOLTAGE_SWELL_L1_COUNT: CosemParser(ValueParser(int)), - obis.VOLTAGE_SWELL_L2_COUNT: CosemParser(ValueParser(int)), - obis.VOLTAGE_SWELL_L3_COUNT: CosemParser(ValueParser(int)), - obis.TEXT_MESSAGE_CODE: CosemParser(ValueParser(int)), - obis.TEXT_MESSAGE: CosemParser(ValueParser(str)), - obis.DEVICE_TYPE: CosemParser(ValueParser(int)), - obis.INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE: CosemParser(ValueParser(Decimal)), - obis.INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE: CosemParser(ValueParser(Decimal)), - obis.INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE: CosemParser(ValueParser(Decimal)), - obis.INSTANTANEOUS_ACTIVE_POWER_L1_NEGATIVE: CosemParser(ValueParser(Decimal)), - obis.INSTANTANEOUS_ACTIVE_POWER_L2_NEGATIVE: CosemParser(ValueParser(Decimal)), - obis.INSTANTANEOUS_ACTIVE_POWER_L3_NEGATIVE: CosemParser(ValueParser(Decimal)), - obis.EQUIPMENT_IDENTIFIER_GAS: CosemParser(ValueParser(str)), - obis.HOURLY_GAS_METER_READING: MBusParser(ValueParser(timestamp), - ValueParser(Decimal)) + 'checksum_support': True, + 'objects': { + obis.P1_MESSAGE_HEADER: CosemParser(ValueParser(str)), + obis.P1_MESSAGE_TIMESTAMP: CosemParser(ValueParser(timestamp)), + obis.ELECTRICITY_USED_TARIFF_1: CosemParser(ValueParser(Decimal)), + obis.ELECTRICITY_USED_TARIFF_2: CosemParser(ValueParser(Decimal)), + obis.ELECTRICITY_DELIVERED_TARIFF_1: CosemParser(ValueParser(Decimal)), + obis.ELECTRICITY_DELIVERED_TARIFF_2: CosemParser(ValueParser(Decimal)), + obis.ELECTRICITY_ACTIVE_TARIFF: CosemParser(ValueParser(str)), + obis.EQUIPMENT_IDENTIFIER: CosemParser(ValueParser(str)), + obis.CURRENT_ELECTRICITY_USAGE: CosemParser(ValueParser(Decimal)), + obis.CURRENT_ELECTRICITY_DELIVERY: CosemParser(ValueParser(Decimal)), + obis.LONG_POWER_FAILURE_COUNT: CosemParser(ValueParser(int)), + # POWER_EVENT_FAILURE_LOG: ProfileGenericParser(), TODO + obis.VOLTAGE_SAG_L1_COUNT: CosemParser(ValueParser(int)), + obis.VOLTAGE_SAG_L2_COUNT: CosemParser(ValueParser(int)), + obis.VOLTAGE_SAG_L3_COUNT: CosemParser(ValueParser(int)), + obis.VOLTAGE_SWELL_L1_COUNT: CosemParser(ValueParser(int)), + obis.VOLTAGE_SWELL_L2_COUNT: CosemParser(ValueParser(int)), + obis.VOLTAGE_SWELL_L3_COUNT: CosemParser(ValueParser(int)), + obis.TEXT_MESSAGE_CODE: CosemParser(ValueParser(int)), + obis.TEXT_MESSAGE: CosemParser(ValueParser(str)), + obis.DEVICE_TYPE: CosemParser(ValueParser(int)), + obis.INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE: CosemParser(ValueParser(Decimal)), + obis.INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE: CosemParser(ValueParser(Decimal)), + obis.INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE: CosemParser(ValueParser(Decimal)), + obis.INSTANTANEOUS_ACTIVE_POWER_L1_NEGATIVE: CosemParser(ValueParser(Decimal)), + obis.INSTANTANEOUS_ACTIVE_POWER_L2_NEGATIVE: CosemParser(ValueParser(Decimal)), + obis.INSTANTANEOUS_ACTIVE_POWER_L3_NEGATIVE: CosemParser(ValueParser(Decimal)), + obis.EQUIPMENT_IDENTIFIER_GAS: CosemParser(ValueParser(str)), + obis.HOURLY_GAS_METER_READING: MBusParser(ValueParser(timestamp), + ValueParser(Decimal)) + } } diff --git a/test/test_parse_v4_2.py b/test/test_parse_v4_2.py index 1e04f11..681783b 100644 --- a/test/test_parse_v4_2.py +++ b/test/test_parse_v4_2.py @@ -15,28 +15,6 @@ from test.example_telegrams import TELEGRAM_V4_2 class TelegramParserV4_2Test(unittest.TestCase): """ Test parsing of a DSMR v4.2 telegram. """ - def test_valid(self): - # No exception is raised. - TelegramParser.validate_checksum(TELEGRAM_V4_2) - - def test_invalid(self): - # Remove the electricty used data value. This causes the checksum to - # not match anymore. - corrupted_telegram = TELEGRAM_V4_2.replace( - '1-0:1.8.1(001581.123*kWh)\r\n', - '' - ) - - with self.assertRaises(InvalidChecksumError): - TelegramParser.validate_checksum(corrupted_telegram) - - def test_missing_checksum(self): - # Remove the checksum value causing a ParseError. - corrupted_telegram = TELEGRAM_V4_2.replace('!6796\r\n', '') - - with self.assertRaises(ParseError): - TelegramParser.validate_checksum(corrupted_telegram) - def test_parse(self): parser = TelegramParser(telegram_specifications.V4) result = parser.parse(TELEGRAM_V4_2) @@ -219,3 +197,25 @@ class TelegramParserV4_2Test(unittest.TestCase): # VALVE_POSITION_GAS (0-x:24.4.0) # TODO to be implemented + + def test_checksum_valid(self): + # No exception is raised. + TelegramParser.validate_checksum(TELEGRAM_V4_2) + + def test_checksum_invalid(self): + # Remove the electricty used data value. This causes the checksum to + # not match anymore. + corrupted_telegram = TELEGRAM_V4_2.replace( + '1-0:1.8.1(001581.123*kWh)\r\n', + '' + ) + + with self.assertRaises(InvalidChecksumError): + TelegramParser.validate_checksum(corrupted_telegram) + + def test_checksum_missing(self): + # Remove the checksum value causing a ParseError. + corrupted_telegram = TELEGRAM_V4_2.replace('!6796\r\n', '') + + with self.assertRaises(ParseError): + TelegramParser.validate_checksum(corrupted_telegram)