From 09f4afcadadeb05f4294789ee3163f750a179197 Mon Sep 17 00:00:00 2001 From: Richard Schleich Date: Fri, 14 Oct 2022 17:50:02 +0200 Subject: [PATCH] Adds support for the Sagemcom T210-D-r smart meter installed by Austrian grid operators. The smart meter uses the DSMR/P1 standard and embeds the data in an encrypted and authenticated "DLMS General Global Cipher" frame. The encryption/decryption is handled by the "dlms_cosem" lib. Test cases are included. Adds OBIS codes for reactive energy. --- dsmr_parser/obis_references.py | 8 ++ dsmr_parser/parsers.py | 41 +++++++++- dsmr_parser/telegram_specifications.py | 30 +++++++ setup.py | 3 +- test/example_telegrams.py | 24 ++++++ test/test_parse_sagemcom_t210_d_r.py | 107 +++++++++++++++++++++++++ tox.ini | 1 + 7 files changed, 211 insertions(+), 3 deletions(-) create mode 100644 test/test_parse_sagemcom_t210_d_r.py diff --git a/dsmr_parser/obis_references.py b/dsmr_parser/obis_references.py index b52d1e6..a5ffd61 100644 --- a/dsmr_parser/obis_references.py +++ b/dsmr_parser/obis_references.py @@ -12,6 +12,14 @@ ELECTRICITY_USED_TARIFF_1 = r'\d-\d:1\.8\.1.+?\r\n' ELECTRICITY_USED_TARIFF_2 = r'\d-\d:1\.8\.2.+?\r\n' ELECTRICITY_DELIVERED_TARIFF_1 = r'\d-\d:2\.8\.1.+?\r\n' ELECTRICITY_DELIVERED_TARIFF_2 = r'\d-\d:2\.8\.2.+?\r\n' +CURRENT_REACTIVE_EXPORTED = r'\d-\d:3\.7\.0.+?\r\n' +ELECTRICITY_REACTIVE_IMPORTED_TOTAL = r'\d-\d:3\.8\.0.+?\r\n' +ELECTRICITY_REACTIVE_IMPORTED_TARIFF_1 = r'\d-\d:3\.8\.1.+?\r\n' +ELECTRICITY_REACTIVE_IMPORTED_TARIFF_2 = r'\d-\d:3\.8\.2.+?\r\n' +CURRENT_REACTIVE_IMPORTED = r'\d-\d:4\.7\.0.+?\r\n' +ELECTRICITY_REACTIVE_EXPORTED_TOTAL = r'\d-\d:4\.8\.0.+?\r\n' +ELECTRICITY_REACTIVE_EXPORTED_TARIFF_1 = r'\d-\d:4\.8\.1.+?\r\n' +ELECTRICITY_REACTIVE_EXPORTED_TARIFF_2 = r'\d-\d:4\.8\.2.+?\r\n' ELECTRICITY_ACTIVE_TARIFF = r'\d-\d:96\.14\.0.+?\r\n' EQUIPMENT_IDENTIFIER = r'\d-\d:96\.1\.1.+?\r\n' CURRENT_ELECTRICITY_USAGE = r'\d-\d:1\.7\.0.+?\r\n' diff --git a/dsmr_parser/parsers.py b/dsmr_parser/parsers.py index fab9a50..984253e 100644 --- a/dsmr_parser/parsers.py +++ b/dsmr_parser/parsers.py @@ -1,8 +1,12 @@ import logging import re +from binascii import unhexlify from ctypes import c_ushort +from dlms_cosem.connection import XDlmsApduFactory +from dlms_cosem.protocol.xdlms import GeneralGlobalCipher + from dsmr_parser.objects import MBusObject, CosemObject, ProfileGenericObject from dsmr_parser.exceptions import ParseError, InvalidChecksumError @@ -22,14 +26,15 @@ class TelegramParser(object): self.telegram_specification = telegram_specification self.apply_checksum_validation = apply_checksum_validation - def parse(self, telegram_data): + def parse(self, telegram_data, encryption_key="", authentication_key=""): # noqa: C901 """ Parse telegram from string to dict. - The telegram str type makes python 2.x integration easier. :param str telegram_data: full telegram from start ('/') to checksum ('!ABCD') including line endings in between the telegram's lines + :param str encryption_key: encryption key + :param str authentication_key: authentication key :rtype: dict :returns: Shortened example: { @@ -43,6 +48,38 @@ class TelegramParser(object): :raises InvalidChecksumError: """ + if "general_global_cipher" in self.telegram_specification: + if self.telegram_specification["general_global_cipher"]: + enc_key = unhexlify(encryption_key) + auth_key = unhexlify(authentication_key) + telegram_data = unhexlify(telegram_data) + apdu = XDlmsApduFactory.apdu_from_bytes(apdu_bytes=telegram_data) + if apdu.security_control.security_suite != 0: + logger.warning("Untested security suite") + if apdu.security_control.authenticated and not apdu.security_control.encrypted: + logger.warning("Untested authentication only") + if not apdu.security_control.authenticated and not apdu.security_control.encrypted: + logger.warning("Untested not encrypted or authenticated") + if apdu.security_control.compressed: + logger.warning("Untested compression") + if apdu.security_control.broadcast_key: + logger.warning("Untested broadcast key") + telegram_data = apdu.to_plain_apdu(enc_key, auth_key).decode("ascii") + else: + try: + if unhexlify(telegram_data[0:2])[0] == GeneralGlobalCipher.TAG: + raise RuntimeError("Looks like a general_global_cipher frame " + "but telegram specification is not matching!") + except Exception: + pass + else: + try: + if unhexlify(telegram_data[0:2])[0] == GeneralGlobalCipher.TAG: + raise RuntimeError( + "Looks like a general_global_cipher frame but telegram specification is not matching!") + except Exception: + pass + if self.apply_checksum_validation \ and self.telegram_specification['checksum_support']: self.validate_checksum(telegram_data) diff --git a/dsmr_parser/telegram_specifications.py b/dsmr_parser/telegram_specifications.py index ca2f23f..000d61c 100644 --- a/dsmr_parser/telegram_specifications.py +++ b/dsmr_parser/telegram_specifications.py @@ -201,3 +201,33 @@ Q3D = { obis.Q3D_EQUIPMENT_SERIALNUMBER: CosemParser(ValueParser(str)), }, } + + +SAGEMCOM_T210_D_R = { + "general_global_cipher": True, + "checksum_support": True, + 'objects': { + obis.P1_MESSAGE_HEADER: CosemParser(ValueParser(str)), + obis.P1_MESSAGE_TIMESTAMP: CosemParser(ValueParser(timestamp)), + obis.ELECTRICITY_IMPORTED_TOTAL: CosemParser(ValueParser(Decimal)), + obis.ELECTRICITY_USED_TARIFF_1: CosemParser(ValueParser(Decimal)), + obis.ELECTRICITY_USED_TARIFF_2: CosemParser(ValueParser(Decimal)), + obis.CURRENT_ELECTRICITY_USAGE: CosemParser(ValueParser(Decimal)), + + obis.ELECTRICITY_REACTIVE_EXPORTED_TOTAL: CosemParser(ValueParser(Decimal)), + obis.ELECTRICITY_REACTIVE_EXPORTED_TARIFF_1: CosemParser(ValueParser(Decimal)), + obis.ELECTRICITY_REACTIVE_EXPORTED_TARIFF_2: CosemParser(ValueParser(Decimal)), + obis.CURRENT_REACTIVE_IMPORTED: CosemParser(ValueParser(Decimal)), + + obis.ELECTRICITY_EXPORTED_TOTAL: CosemParser(ValueParser(Decimal)), + obis.ELECTRICITY_DELIVERED_TARIFF_1: CosemParser(ValueParser(Decimal)), + obis.ELECTRICITY_DELIVERED_TARIFF_2: CosemParser(ValueParser(Decimal)), + obis.CURRENT_ELECTRICITY_DELIVERY: CosemParser(ValueParser(Decimal)), + + obis.ELECTRICITY_REACTIVE_IMPORTED_TOTAL: CosemParser(ValueParser(Decimal)), + obis.ELECTRICITY_REACTIVE_IMPORTED_TARIFF_1: CosemParser(ValueParser(Decimal)), + obis.ELECTRICITY_REACTIVE_IMPORTED_TARIFF_2: CosemParser(ValueParser(Decimal)), + obis.CURRENT_REACTIVE_EXPORTED: CosemParser(ValueParser(Decimal)), + } +} +AUSTRIA_ENERGIENETZE_STEIERMARK = SAGEMCOM_T210_D_R diff --git a/setup.py b/setup.py index cb4f092..062872b 100644 --- a/setup.py +++ b/setup.py @@ -13,7 +13,8 @@ setup( 'pyserial>=3,<4', 'pyserial-asyncio<1', 'pytz', - 'Tailer==0.4.1' + 'Tailer==0.4.1', + 'dlms_cosem==21.3.2' ], entry_points={ 'console_scripts': ['dsmr_console=dsmr_parser.__main__:console'] diff --git a/test/example_telegrams.py b/test/example_telegrams.py index 1ccb8ce..04b9005 100644 --- a/test/example_telegrams.py +++ b/test/example_telegrams.py @@ -169,3 +169,27 @@ TELEGRAM_ESY5Q3DA1004_V304 = ( ' 25818685\r\n' 'DE0000000000000000000000000000003\r\n' ) + +TELEGRAM_SAGEMCOM_T210_D_R = ( + '/EST5\\253710000_A\r\n' + '\r\n' + '1-3:0.2.8(50)\r\n' + '0-0:1.0.0(221006155014S)\r\n' + '1-0:1.8.0(006545766*Wh)\r\n' + '1-0:1.8.1(005017120*Wh)\r\n' + '1-0:1.8.2(001528646*Wh)\r\n' + '1-0:1.7.0(000000286*W)\r\n' + '1-0:2.8.0(000000058*Wh)\r\n' + '1-0:2.8.1(000000000*Wh)\r\n' + '1-0:2.8.2(000000058*Wh)\r\n' + '1-0:2.7.0(000000000*W)\r\n' + '1-0:3.8.0(000000747*varh)\r\n' + '1-0:3.8.1(000000000*varh)\r\n' + '1-0:3.8.2(000000747*varh)\r\n' + '1-0:3.7.0(000000000*var)\r\n' + '1-0:4.8.0(003897726*varh)\r\n' + '1-0:4.8.1(002692848*varh)\r\n' + '1-0:4.8.2(001204878*varh)\r\n' + '1-0:4.7.0(000000166*var)\r\n' + '!7EF9\r\n' +) diff --git a/test/test_parse_sagemcom_t210_d_r.py b/test/test_parse_sagemcom_t210_d_r.py new file mode 100644 index 0000000..525172c --- /dev/null +++ b/test/test_parse_sagemcom_t210_d_r.py @@ -0,0 +1,107 @@ +from binascii import unhexlify +from copy import deepcopy + +import unittest + +from dlms_cosem.exceptions import DecryptionError +from dlms_cosem.protocol.xdlms import GeneralGlobalCipher +from dlms_cosem.security import SecurityControlField, encrypt + +from dsmr_parser import telegram_specifications +from dsmr_parser.exceptions import ParseError +from dsmr_parser.parsers import TelegramParser +from test.example_telegrams import TELEGRAM_SAGEMCOM_T210_D_R + + +class TelegramParserEncryptedTest(unittest.TestCase): + """ Test parsing of a DSML encypted DSMR v5.x telegram. """ + DUMMY_ENCRYPTION_KEY = "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA" + DUMMY_AUTHENTICATION_KEY = "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB" + + def __generate_encrypted(self, security_suite=0, authenticated=True, encrypted=True): + security_control = SecurityControlField( + security_suite=security_suite, authenticated=authenticated, encrypted=encrypted + ) + encryption_key = unhexlify(self.DUMMY_ENCRYPTION_KEY) + authentication_key = unhexlify(self.DUMMY_AUTHENTICATION_KEY) + system_title = "SYSTEMID".encode("ascii") + invocation_counter = int.from_bytes(bytes.fromhex("10000001"), "big") + plain_data = TELEGRAM_SAGEMCOM_T210_D_R.encode("ascii") + + encrypted = encrypt( + security_control=security_control, + key=encryption_key, + auth_key=authentication_key, + system_title=system_title, + invocation_counter=invocation_counter, + plain_text=plain_data, + ) + + full_frame = bytearray(GeneralGlobalCipher.TAG.to_bytes(1, "big", signed=False)) + full_frame.extend(len(system_title).to_bytes(1, "big", signed=False)) + full_frame.extend(system_title) + full_frame.extend([0x82]) # Length of the following length bytes + # https://github.com/pwitab/dlms-cosem/blob/739f81a58e5f07663a512d4a128851333a0ed5e6/dlms_cosem/a_xdr.py#L33 + + security_control = security_control.to_bytes() + invocation_counter = invocation_counter.to_bytes(4, "big", signed=False) + full_frame.extend((len(encrypted) + + len(invocation_counter) + + len(security_control)).to_bytes(2, "big", signed=False)) + full_frame.extend(security_control) + full_frame.extend(invocation_counter) + full_frame.extend(encrypted) + + return full_frame + + def test_parse(self): + parser = TelegramParser(telegram_specifications.SAGEMCOM_T210_D_R) + result = parser.parse(self.__generate_encrypted().hex(), + self.DUMMY_ENCRYPTION_KEY, + self.DUMMY_AUTHENTICATION_KEY) + self.assertEqual(len(result), 18) + + def test_damaged_frame(self): + # If the frame is damaged decrypting fails (crc is technically not needed) + parser = TelegramParser(telegram_specifications.SAGEMCOM_T210_D_R) + + generated = self.__generate_encrypted() + generated[150] = 0x00 + generated = generated.hex() + + with self.assertRaises(DecryptionError): + parser.parse(generated, self.DUMMY_ENCRYPTION_KEY, self.DUMMY_AUTHENTICATION_KEY) + + def test_plain(self): + # If a plain request is parsed with "general_global_cipher": True it fails + parser = TelegramParser(telegram_specifications.SAGEMCOM_T210_D_R) + + with self.assertRaises(Exception): + parser.parse(TELEGRAM_SAGEMCOM_T210_D_R, self.DUMMY_ENCRYPTION_KEY, self.DUMMY_AUTHENTICATION_KEY) + + def test_general_global_cipher_not_specified(self): + # If a GGC frame is detected but general_global_cipher is not set it fails + parser = TelegramParser(telegram_specifications.SAGEMCOM_T210_D_R) + parser = deepcopy(parser) # We do not want to change the module value + parser.telegram_specification['general_global_cipher'] = False + + with self.assertRaises(ParseError): + parser.parse(self.__generate_encrypted().hex(), self.DUMMY_ENCRYPTION_KEY, self.DUMMY_AUTHENTICATION_KEY) + + def test_only_encrypted(self): + # Not implemented by dlms_cosem + parser = TelegramParser(telegram_specifications.SAGEMCOM_T210_D_R) + + only_auth = self.__generate_encrypted(0, authenticated=False, encrypted=True).hex() + + with self.assertRaises(ValueError): + parser.parse(only_auth, self.DUMMY_ENCRYPTION_KEY) + + def test_only_auth(self): + # Not implemented by dlms_cosem + parser = TelegramParser(telegram_specifications.SAGEMCOM_T210_D_R) + + only_auth = self.__generate_encrypted(0, authenticated=True, encrypted=False).hex() + + with self.assertRaises(ValueError): + parser.parse(only_auth, authentication_key=self.DUMMY_AUTHENTICATION_KEY) diff --git a/tox.ini b/tox.ini index 27fc713..011d394 100644 --- a/tox.ini +++ b/tox.ini @@ -5,6 +5,7 @@ deps= pylama pytest-asyncio pytest-mock + dlms_cosem commands= py.test --cov=dsmr_parser test {posargs} pylama dsmr_parser test