diff --git a/dsmr_parser/clients/protocol.py b/dsmr_parser/clients/protocol.py index 40cdfc3..a7fb74f 100644 --- a/dsmr_parser/clients/protocol.py +++ b/dsmr_parser/clients/protocol.py @@ -48,6 +48,9 @@ def _create_dsmr_protocol(dsmr_version, telegram_callback, protocol, loop=None, elif dsmr_version == "Q3D": specification = telegram_specifications.Q3D serial_settings = SERIAL_SETTINGS_V5 + elif dsmr_version == 'ISKRA_IE': + specification = telegram_specifications.ISKRA_IE + serial_settings = SERIAL_SETTINGS_V5 else: raise NotImplementedError("No telegram parser found for version: %s", dsmr_version) diff --git a/dsmr_parser/telegram_specifications.py b/dsmr_parser/telegram_specifications.py index 214fa52..83eb29d 100644 --- a/dsmr_parser/telegram_specifications.py +++ b/dsmr_parser/telegram_specifications.py @@ -309,3 +309,33 @@ SAGEMCOM_T210_D_R = { } } AUSTRIA_ENERGIENETZE_STEIERMARK = SAGEMCOM_T210_D_R + +ISKRA_IE = { + "checksum_support": False, + 'objects': { + obis.EQUIPMENT_IDENTIFIER_GAS: CosemParser(ValueParser(str)), + obis.P1_MESSAGE_TIMESTAMP: CosemParser(ValueParser(timestamp)), + obis.ELECTRICITY_USED_TARIFF_1: CosemParser(ValueParser(Decimal)), + obis.ELECTRICITY_USED_TARIFF_2: CosemParser(ValueParser(Decimal)), + obis.ELECTRICITY_DELIVERED_TARIFF_1: CosemParser(ValueParser(Decimal)), + obis.ELECTRICITY_DELIVERED_TARIFF_2: CosemParser(ValueParser(Decimal)), + obis.ELECTRICITY_ACTIVE_TARIFF: CosemParser(ValueParser(str)), + obis.CURRENT_ELECTRICITY_USAGE: CosemParser(ValueParser(Decimal)), + obis.CURRENT_ELECTRICITY_DELIVERY: CosemParser(ValueParser(Decimal)), + obis.INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE: CosemParser(ValueParser(Decimal)), + obis.INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE: CosemParser(ValueParser(Decimal)), + obis.INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE: CosemParser(ValueParser(Decimal)), + obis.INSTANTANEOUS_ACTIVE_POWER_L1_NEGATIVE: CosemParser(ValueParser(Decimal)), + obis.INSTANTANEOUS_ACTIVE_POWER_L2_NEGATIVE: CosemParser(ValueParser(Decimal)), + obis.INSTANTANEOUS_ACTIVE_POWER_L3_NEGATIVE: CosemParser(ValueParser(Decimal)), + obis.INSTANTANEOUS_VOLTAGE_L1: CosemParser(ValueParser(Decimal)), + obis.INSTANTANEOUS_VOLTAGE_L2: CosemParser(ValueParser(Decimal)), + obis.INSTANTANEOUS_VOLTAGE_L3: CosemParser(ValueParser(Decimal)), + obis.INSTANTANEOUS_CURRENT_L1: CosemParser(ValueParser(Decimal)), + obis.INSTANTANEOUS_CURRENT_L2: CosemParser(ValueParser(Decimal)), + obis.INSTANTANEOUS_CURRENT_L3: CosemParser(ValueParser(Decimal)), + obis.ACTUAL_SWITCH_POSITION: CosemParser(ValueParser(str)), + obis.TEXT_MESSAGE: CosemParser(ValueParser(str)), + obis.EQUIPMENT_IDENTIFIER: CosemParser(ValueParser(str)), + } +} diff --git a/dsmr_parser/value_types.py b/dsmr_parser/value_types.py index 4bc9ef3..487e98c 100644 --- a/dsmr_parser/value_types.py +++ b/dsmr_parser/value_types.py @@ -4,14 +4,23 @@ import pytz def timestamp(value): - naive_datetime = datetime.datetime.strptime(value[:-1], '%y%m%d%H%M%S') + try: + naive_datetime = datetime.datetime.strptime(value[:-1], '%y%m%d%H%M%S') + except ValueError: + return None - # TODO comment on this exception + # Timestamp has the following format: + # YYMMDDhhmmssX + # ASCII presentation of Time stamp with + # Year, Month, Day, Hour, Minute, Second, + # and an indication whether DST is active + # (X=S) or DST is not active (X=W) if len(value) == 13: is_dst = value[12] == 'S' # assume format 160322150000W else: is_dst = False + # TODO : Use system timezone local_tz = pytz.timezone('Europe/Amsterdam') localized_datetime = local_tz.localize(naive_datetime, is_dst=is_dst) diff --git a/test/example_telegrams.py b/test/example_telegrams.py index 36ea45e..d59ce83 100644 --- a/test/example_telegrams.py +++ b/test/example_telegrams.py @@ -280,3 +280,34 @@ TELEGRAM_SAGEMCOM_T210_D_R = ( '1-0:4.7.0(000000166*var)\r\n' '!7EF9\r\n' ) + +TELEGRAM_ISKRA_IE = ( + '/ISk5\2MIE5T-200\r\n' + '\r\n' + '1-0:0.0.0(00000000)\r\n' + '0-0:96.1.0(09610)\r\n' + '0-0:1.0.0(230202132747S)\r\n' + '1-0:1.8.1(000010.181*kWh)\r\n' + '1-0:1.8.2(000010.182*kWh)\r\n' + '1-0:2.8.1(000010.281*kWh)\r\n' + '1-0:2.8.2(000010.282*kWh)\r\n' + '0-0:96.14.0(0001)\r\n' + '1-0:1.7.0(00.170*kW)\r\n' + '1-0:2.7.0(00.270*kW)\r\n' + '1-0:21.7.0(00.217*kW)\r\n' + '1-0:41.7.0(00.417*kW)\r\n' + '1-0:61.7.0(00.617*kW)\r\n' + '1-0:22.7.0(00.227*kW)\r\n' + '1-0:42.7.0(00.427*kW)\r\n' + '1-0:62.7.0(00.627*kW)\r\n' + '1-0:32.7.0(242.5*V)\r\n' + '1-0:52.7.0(241.7*V)\r\n' + '1-0:72.7.0(243.3*V)\r\n' + '1-0:31.7.0(000*A)\r\n' + '1-0:51.7.0(000*A)\r\n' + '1-0:71.7.0(000*A)\r\n' + '0-0:96.3.10(1)\r\n' + '0-0:96.13.0()\r\n' + '0-1:96.1.1()\r\n' + '!AD3B\r\n' +) diff --git a/test/experiment_telegram.py b/test/experiment_telegram.py index 9556b23..c815072 100644 --- a/test/experiment_telegram.py +++ b/test/experiment_telegram.py @@ -1,6 +1,6 @@ from dsmr_parser import telegram_specifications from dsmr_parser.parsers import TelegramParser -from example_telegrams import TELEGRAM_V4_2 +from test.example_telegrams import TELEGRAM_V4_2 parser = TelegramParser(telegram_specifications.V4) telegram = parser.parse(TELEGRAM_V4_2) diff --git a/test/test_parse_iskra_ie.py b/test/test_parse_iskra_ie.py new file mode 100644 index 0000000..642fc0d --- /dev/null +++ b/test/test_parse_iskra_ie.py @@ -0,0 +1,174 @@ +import unittest + +from decimal import Decimal + +from dsmr_parser.exceptions import InvalidChecksumError, ParseError +from dsmr_parser.objects import CosemObject +from dsmr_parser.parsers import TelegramParser +from dsmr_parser import telegram_specifications +from dsmr_parser import obis_references as obis +from test.example_telegrams import TELEGRAM_ISKRA_IE + + +class TelegramParserIskraIETest(unittest.TestCase): + """ Test parsing of a Iskra IE5 telegram. """ + + def test_parse(self): + parser = TelegramParser(telegram_specifications.ISKRA_IE) + result = parser.parse(TELEGRAM_ISKRA_IE) + + # EQUIPMENT_IDENTIFIER_GAS (0-0:96.1.0) + assert isinstance(result[obis.EQUIPMENT_IDENTIFIER_GAS], CosemObject) + assert result[obis.EQUIPMENT_IDENTIFIER_GAS].unit is None + assert isinstance(result[obis.EQUIPMENT_IDENTIFIER_GAS].value, str) + assert result[obis.EQUIPMENT_IDENTIFIER_GAS].value == '09610' + + # ELECTRICITY_USED_TARIFF_1 (1-0:1.8.1) + assert isinstance(result[obis.ELECTRICITY_USED_TARIFF_1], CosemObject) + assert result[obis.ELECTRICITY_USED_TARIFF_1].unit == 'kWh' + assert isinstance(result[obis.ELECTRICITY_USED_TARIFF_1].value, Decimal) + assert result[obis.ELECTRICITY_USED_TARIFF_1].value == Decimal('10.181') + + # ELECTRICITY_USED_TARIFF_2 (1-0:1.8.2) + assert isinstance(result[obis.ELECTRICITY_USED_TARIFF_2], CosemObject) + assert result[obis.ELECTRICITY_USED_TARIFF_2].unit == 'kWh' + assert isinstance(result[obis.ELECTRICITY_USED_TARIFF_2].value, Decimal) + assert result[obis.ELECTRICITY_USED_TARIFF_2].value == Decimal('10.182') + + # ELECTRICITY_DELIVERED_TARIFF_1 (1-0:2.8.1) + assert isinstance(result[obis.ELECTRICITY_DELIVERED_TARIFF_1], CosemObject) + assert result[obis.ELECTRICITY_DELIVERED_TARIFF_1].unit == 'kWh' + assert isinstance(result[obis.ELECTRICITY_DELIVERED_TARIFF_1].value, Decimal) + assert result[obis.ELECTRICITY_DELIVERED_TARIFF_1].value == Decimal('10.281') + + # ELECTRICITY_DELIVERED_TARIFF_2 (1-0:2.8.2) + assert isinstance(result[obis.ELECTRICITY_DELIVERED_TARIFF_2], CosemObject) + assert result[obis.ELECTRICITY_DELIVERED_TARIFF_2].unit == 'kWh' + assert isinstance(result[obis.ELECTRICITY_DELIVERED_TARIFF_2].value, Decimal) + assert result[obis.ELECTRICITY_DELIVERED_TARIFF_2].value == Decimal('10.282') + + # ELECTRICITY_ACTIVE_TARIFF (0-0:96.14.0) + assert isinstance(result[obis.ELECTRICITY_ACTIVE_TARIFF], CosemObject) + assert result[obis.ELECTRICITY_ACTIVE_TARIFF].unit is None + assert isinstance(result[obis.ELECTRICITY_ACTIVE_TARIFF].value, str) + assert result[obis.ELECTRICITY_ACTIVE_TARIFF].value == '0001' + + # CURRENT_ELECTRICITY_USAGE (1-0:1.7.0) + assert isinstance(result[obis.CURRENT_ELECTRICITY_USAGE], CosemObject) + assert result[obis.CURRENT_ELECTRICITY_USAGE].unit == 'kW' + assert isinstance(result[obis.CURRENT_ELECTRICITY_USAGE].value, Decimal) + assert result[obis.CURRENT_ELECTRICITY_USAGE].value == Decimal('0.170') + + # CURRENT_ELECTRICITY_DELIVERY (1-0:2.7.0) + assert isinstance(result[obis.CURRENT_ELECTRICITY_DELIVERY], CosemObject) + assert result[obis.CURRENT_ELECTRICITY_DELIVERY].unit == 'kW' + assert isinstance(result[obis.CURRENT_ELECTRICITY_DELIVERY].value, Decimal) + assert result[obis.CURRENT_ELECTRICITY_DELIVERY].value == Decimal('0.270') + + # INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE (1-0:21.7.0) + assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE], CosemObject) + assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE].unit == 'kW' + assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE].value, Decimal) + assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE].value == Decimal('0.217') + + # INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE (1-0:41.7.0) + assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE], CosemObject) + assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE].unit == 'kW' + assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE].value, Decimal) + assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE].value == Decimal('0.417') + + # INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE (1-0:61.7.0) + assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE], CosemObject) + assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE].unit == 'kW' + assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE].value, Decimal) + assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE].value == Decimal('0.617') + + # INSTANTANEOUS_ACTIVE_POWER_L1_NEGATIVE (1-0:22.7.0) + assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L1_NEGATIVE], CosemObject) + assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L1_NEGATIVE].unit == 'kW' + assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L1_NEGATIVE].value, Decimal) + assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L1_NEGATIVE].value == Decimal('0.227') + + # INSTANTANEOUS_ACTIVE_POWER_L2_NEGATIVE (1-0:42.7.0) + assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L2_NEGATIVE], CosemObject) + assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L2_NEGATIVE].unit == 'kW' + assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L2_NEGATIVE].value, Decimal) + assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L2_NEGATIVE].value == Decimal('0.427') + + # INSTANTANEOUS_ACTIVE_POWER_L3_NEGATIVE (1-0:62.7.0) + assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L3_NEGATIVE], CosemObject) + assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L3_NEGATIVE].unit == 'kW' + assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L3_NEGATIVE].value, Decimal) + assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L3_NEGATIVE].value == Decimal('0.627') + + # INSTANTANEOUS_VOLTAGE_L1 (1-0:32.7.0) + assert isinstance(result[obis.INSTANTANEOUS_VOLTAGE_L1], CosemObject) + assert result[obis.INSTANTANEOUS_VOLTAGE_L1].unit == 'V' + assert isinstance(result[obis.INSTANTANEOUS_VOLTAGE_L1].value, Decimal) + assert result[obis.INSTANTANEOUS_VOLTAGE_L1].value == Decimal('242.5') + + # INSTANTANEOUS_VOLTAGE_L2 (1-0:52.7.0) + assert isinstance(result[obis.INSTANTANEOUS_VOLTAGE_L2], CosemObject) + assert result[obis.INSTANTANEOUS_VOLTAGE_L2].unit == 'V' + assert isinstance(result[obis.INSTANTANEOUS_VOLTAGE_L2].value, Decimal) + assert result[obis.INSTANTANEOUS_VOLTAGE_L2].value == Decimal('241.7') + + # INSTANTANEOUS_VOLTAGE_L3 (1-0:72.7.0) + assert isinstance(result[obis.INSTANTANEOUS_VOLTAGE_L3], CosemObject) + assert result[obis.INSTANTANEOUS_VOLTAGE_L3].unit == 'V' + assert isinstance(result[obis.INSTANTANEOUS_VOLTAGE_L3].value, Decimal) + assert result[obis.INSTANTANEOUS_VOLTAGE_L3].value == Decimal('243.3') + + # INSTANTANEOUS_CURRENT_L1 (1-0:31.7.0) + assert isinstance(result[obis.INSTANTANEOUS_CURRENT_L1], CosemObject) + assert result[obis.INSTANTANEOUS_CURRENT_L1].unit == 'A' + assert isinstance(result[obis.INSTANTANEOUS_CURRENT_L1].value, Decimal) + assert result[obis.INSTANTANEOUS_CURRENT_L1].value == Decimal('0.000') + + # INSTANTANEOUS_CURRENT_L2 (1-0:51.7.0) + assert isinstance(result[obis.INSTANTANEOUS_CURRENT_L2], CosemObject) + assert result[obis.INSTANTANEOUS_CURRENT_L2].unit == 'A' + assert isinstance(result[obis.INSTANTANEOUS_CURRENT_L2].value, Decimal) + assert result[obis.INSTANTANEOUS_CURRENT_L2].value == Decimal('0.000') + + # INSTANTANEOUS_CURRENT_L3 (1-0:71.7.0) + assert isinstance(result[obis.INSTANTANEOUS_CURRENT_L3], CosemObject) + assert result[obis.INSTANTANEOUS_CURRENT_L3].unit == 'A' + assert isinstance(result[obis.INSTANTANEOUS_CURRENT_L3].value, Decimal) + assert result[obis.INSTANTANEOUS_CURRENT_L3].value == Decimal('0.000') + + # ACTUAL_SWITCH_POSITION (0-0:96.3.10) + assert isinstance(result[obis.ACTUAL_SWITCH_POSITION], CosemObject) + assert result[obis.ACTUAL_SWITCH_POSITION].unit is None + assert isinstance(result[obis.ACTUAL_SWITCH_POSITION].value, str) + assert result[obis.ACTUAL_SWITCH_POSITION].value == '1' + + # TEXT_MESSAGE (0-0:96.13.0) + assert isinstance(result[obis.TEXT_MESSAGE], CosemObject) + assert result[obis.TEXT_MESSAGE].unit is None + assert result[obis.TEXT_MESSAGE].value is None + + # EQUIPMENT_IDENTIFIER (0-0:96.1.1) + assert isinstance(result[obis.EQUIPMENT_IDENTIFIER], CosemObject) + assert result[obis.EQUIPMENT_IDENTIFIER].unit is None + assert result[obis.EQUIPMENT_IDENTIFIER].value is None + + def test_checksum_valid(self): + # No exception is raised. + TelegramParser.validate_checksum(TELEGRAM_ISKRA_IE) + + def test_checksum_invalid(self): + # Remove the electricty used data value. This causes the checksum to not match anymore. + corrupted_telegram = TELEGRAM_ISKRA_IE.replace( + '1-0:1.8.1(000010.181*kWh)\r\n', + '' + ) + + with self.assertRaises(InvalidChecksumError): + TelegramParser.validate_checksum(corrupted_telegram) + + def test_checksum_missing(self): + # Remove the checksum value causing a ParseError. + corrupted_telegram = TELEGRAM_ISKRA_IE.replace('!AD3B\r\n', '') + with self.assertRaises(ParseError): + TelegramParser.validate_checksum(corrupted_telegram) diff --git a/test/test_parse_v5.py b/test/test_parse_v5.py index 0d4896c..088fe74 100644 --- a/test/test_parse_v5.py +++ b/test/test_parse_v5.py @@ -254,3 +254,21 @@ class TelegramParserV5Test(unittest.TestCase): corrupted_telegram = TELEGRAM_V5.replace('!6EEE\r\n', '') with self.assertRaises(ParseError): TelegramParser.validate_checksum(corrupted_telegram) + + def test_gas_timestamp_invalid(self): + # Issue 120 + # Sometimes a MBUS device (For ex a Gas Meter) returns an invalid timestamp + # Instead of failing, we should just ignore the timestamp + invalid_date_telegram = TELEGRAM_V5.replace( + '0-1:24.2.1(170102161005W)(00000.107*m3)\r\n', + '0-1:24.2.1(632525252525S)(00000.000)\r\n' + ) + invalid_date_telegram = invalid_date_telegram.replace('!6EEE\r\n', '!90C2\r\n') + parser = TelegramParser(telegram_specifications.V5) + result = parser.parse(invalid_date_telegram) + + # HOURLY_GAS_METER_READING (0-1:24.2.1) + assert isinstance(result[obis.HOURLY_GAS_METER_READING], MBusObject) + assert result[obis.HOURLY_GAS_METER_READING].unit is None + assert isinstance(result[obis.HOURLY_GAS_METER_READING].value, Decimal) + assert result[obis.HOURLY_GAS_METER_READING].value == Decimal('0.000')