Merge pull request #110 from debug-richard/master
Adds support for the Sagemcom T210-D-r smart meter
This commit is contained in:
		
						commit
						594db890fe
					
				| @ -12,6 +12,14 @@ ELECTRICITY_USED_TARIFF_1 = r'\d-\d:1\.8\.1.+?\r\n' | |||||||
| ELECTRICITY_USED_TARIFF_2 = r'\d-\d:1\.8\.2.+?\r\n' | ELECTRICITY_USED_TARIFF_2 = r'\d-\d:1\.8\.2.+?\r\n' | ||||||
| ELECTRICITY_DELIVERED_TARIFF_1 = r'\d-\d:2\.8\.1.+?\r\n' | ELECTRICITY_DELIVERED_TARIFF_1 = r'\d-\d:2\.8\.1.+?\r\n' | ||||||
| ELECTRICITY_DELIVERED_TARIFF_2 = r'\d-\d:2\.8\.2.+?\r\n' | ELECTRICITY_DELIVERED_TARIFF_2 = r'\d-\d:2\.8\.2.+?\r\n' | ||||||
|  | CURRENT_REACTIVE_EXPORTED = r'\d-\d:3\.7\.0.+?\r\n' | ||||||
|  | ELECTRICITY_REACTIVE_IMPORTED_TOTAL = r'\d-\d:3\.8\.0.+?\r\n' | ||||||
|  | ELECTRICITY_REACTIVE_IMPORTED_TARIFF_1 = r'\d-\d:3\.8\.1.+?\r\n' | ||||||
|  | ELECTRICITY_REACTIVE_IMPORTED_TARIFF_2 = r'\d-\d:3\.8\.2.+?\r\n' | ||||||
|  | CURRENT_REACTIVE_IMPORTED = r'\d-\d:4\.7\.0.+?\r\n' | ||||||
|  | ELECTRICITY_REACTIVE_EXPORTED_TOTAL = r'\d-\d:4\.8\.0.+?\r\n' | ||||||
|  | ELECTRICITY_REACTIVE_EXPORTED_TARIFF_1 = r'\d-\d:4\.8\.1.+?\r\n' | ||||||
|  | ELECTRICITY_REACTIVE_EXPORTED_TARIFF_2 = r'\d-\d:4\.8\.2.+?\r\n' | ||||||
| ELECTRICITY_ACTIVE_TARIFF = r'\d-\d:96\.14\.0.+?\r\n' | ELECTRICITY_ACTIVE_TARIFF = r'\d-\d:96\.14\.0.+?\r\n' | ||||||
| EQUIPMENT_IDENTIFIER = r'\d-\d:96\.1\.1.+?\r\n' | EQUIPMENT_IDENTIFIER = r'\d-\d:96\.1\.1.+?\r\n' | ||||||
| CURRENT_ELECTRICITY_USAGE = r'\d-\d:1\.7\.0.+?\r\n' | CURRENT_ELECTRICITY_USAGE = r'\d-\d:1\.7\.0.+?\r\n' | ||||||
|  | |||||||
| @ -1,8 +1,12 @@ | |||||||
| import logging | import logging | ||||||
| import re | import re | ||||||
|  | from binascii import unhexlify | ||||||
| 
 | 
 | ||||||
| from ctypes import c_ushort | from ctypes import c_ushort | ||||||
| 
 | 
 | ||||||
|  | from dlms_cosem.connection import XDlmsApduFactory | ||||||
|  | from dlms_cosem.protocol.xdlms import GeneralGlobalCipher | ||||||
|  | 
 | ||||||
| from dsmr_parser.objects import MBusObject, CosemObject, ProfileGenericObject | from dsmr_parser.objects import MBusObject, CosemObject, ProfileGenericObject | ||||||
| from dsmr_parser.exceptions import ParseError, InvalidChecksumError | from dsmr_parser.exceptions import ParseError, InvalidChecksumError | ||||||
| 
 | 
 | ||||||
| @ -22,14 +26,15 @@ class TelegramParser(object): | |||||||
|         self.telegram_specification = telegram_specification |         self.telegram_specification = telegram_specification | ||||||
|         self.apply_checksum_validation = apply_checksum_validation |         self.apply_checksum_validation = apply_checksum_validation | ||||||
| 
 | 
 | ||||||
|     def parse(self, telegram_data): |     def parse(self, telegram_data, encryption_key="", authentication_key=""):  # noqa: C901 | ||||||
|         """ |         """ | ||||||
|         Parse telegram from string to dict. |         Parse telegram from string to dict. | ||||||
| 
 |  | ||||||
|         The telegram str type makes python 2.x integration easier. |         The telegram str type makes python 2.x integration easier. | ||||||
| 
 | 
 | ||||||
|         :param str telegram_data: full telegram from start ('/') to checksum |         :param str telegram_data: full telegram from start ('/') to checksum | ||||||
|             ('!ABCD') including line endings in between the telegram's lines |             ('!ABCD') including line endings in between the telegram's lines | ||||||
|  |         :param str encryption_key: encryption key | ||||||
|  |         :param str authentication_key: authentication key | ||||||
|         :rtype: dict |         :rtype: dict | ||||||
|         :returns: Shortened example: |         :returns: Shortened example: | ||||||
|             { |             { | ||||||
| @ -43,6 +48,38 @@ class TelegramParser(object): | |||||||
|         :raises InvalidChecksumError: |         :raises InvalidChecksumError: | ||||||
|         """ |         """ | ||||||
| 
 | 
 | ||||||
|  |         if "general_global_cipher" in self.telegram_specification: | ||||||
|  |             if self.telegram_specification["general_global_cipher"]: | ||||||
|  |                 enc_key = unhexlify(encryption_key) | ||||||
|  |                 auth_key = unhexlify(authentication_key) | ||||||
|  |                 telegram_data = unhexlify(telegram_data) | ||||||
|  |                 apdu = XDlmsApduFactory.apdu_from_bytes(apdu_bytes=telegram_data) | ||||||
|  |                 if apdu.security_control.security_suite != 0: | ||||||
|  |                     logger.warning("Untested security suite") | ||||||
|  |                 if apdu.security_control.authenticated and not apdu.security_control.encrypted: | ||||||
|  |                     logger.warning("Untested authentication only") | ||||||
|  |                 if not apdu.security_control.authenticated and not apdu.security_control.encrypted: | ||||||
|  |                     logger.warning("Untested not encrypted or authenticated") | ||||||
|  |                 if apdu.security_control.compressed: | ||||||
|  |                     logger.warning("Untested compression") | ||||||
|  |                 if apdu.security_control.broadcast_key: | ||||||
|  |                     logger.warning("Untested broadcast key") | ||||||
|  |                 telegram_data = apdu.to_plain_apdu(enc_key, auth_key).decode("ascii") | ||||||
|  |             else: | ||||||
|  |                 try: | ||||||
|  |                     if unhexlify(telegram_data[0:2])[0] == GeneralGlobalCipher.TAG: | ||||||
|  |                         raise RuntimeError("Looks like a general_global_cipher frame " | ||||||
|  |                                            "but telegram specification is not matching!") | ||||||
|  |                 except Exception: | ||||||
|  |                     pass | ||||||
|  |         else: | ||||||
|  |             try: | ||||||
|  |                 if unhexlify(telegram_data[0:2])[0] == GeneralGlobalCipher.TAG: | ||||||
|  |                     raise RuntimeError( | ||||||
|  |                         "Looks like a general_global_cipher frame but telegram specification is not matching!") | ||||||
|  |             except Exception: | ||||||
|  |                 pass | ||||||
|  | 
 | ||||||
|         if self.apply_checksum_validation \ |         if self.apply_checksum_validation \ | ||||||
|                 and self.telegram_specification['checksum_support']: |                 and self.telegram_specification['checksum_support']: | ||||||
|             self.validate_checksum(telegram_data) |             self.validate_checksum(telegram_data) | ||||||
|  | |||||||
| @ -201,3 +201,33 @@ Q3D = { | |||||||
|         obis.Q3D_EQUIPMENT_SERIALNUMBER: CosemParser(ValueParser(str)), |         obis.Q3D_EQUIPMENT_SERIALNUMBER: CosemParser(ValueParser(str)), | ||||||
|     }, |     }, | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | SAGEMCOM_T210_D_R = { | ||||||
|  |     "general_global_cipher": True, | ||||||
|  |     "checksum_support": True, | ||||||
|  |     'objects': { | ||||||
|  |         obis.P1_MESSAGE_HEADER: CosemParser(ValueParser(str)), | ||||||
|  |         obis.P1_MESSAGE_TIMESTAMP: CosemParser(ValueParser(timestamp)), | ||||||
|  |         obis.ELECTRICITY_IMPORTED_TOTAL: CosemParser(ValueParser(Decimal)), | ||||||
|  |         obis.ELECTRICITY_USED_TARIFF_1: CosemParser(ValueParser(Decimal)), | ||||||
|  |         obis.ELECTRICITY_USED_TARIFF_2: CosemParser(ValueParser(Decimal)), | ||||||
|  |         obis.CURRENT_ELECTRICITY_USAGE: CosemParser(ValueParser(Decimal)), | ||||||
|  | 
 | ||||||
|  |         obis.ELECTRICITY_REACTIVE_EXPORTED_TOTAL: CosemParser(ValueParser(Decimal)), | ||||||
|  |         obis.ELECTRICITY_REACTIVE_EXPORTED_TARIFF_1: CosemParser(ValueParser(Decimal)), | ||||||
|  |         obis.ELECTRICITY_REACTIVE_EXPORTED_TARIFF_2: CosemParser(ValueParser(Decimal)), | ||||||
|  |         obis.CURRENT_REACTIVE_IMPORTED: CosemParser(ValueParser(Decimal)), | ||||||
|  | 
 | ||||||
|  |         obis.ELECTRICITY_EXPORTED_TOTAL: CosemParser(ValueParser(Decimal)), | ||||||
|  |         obis.ELECTRICITY_DELIVERED_TARIFF_1: CosemParser(ValueParser(Decimal)), | ||||||
|  |         obis.ELECTRICITY_DELIVERED_TARIFF_2: CosemParser(ValueParser(Decimal)), | ||||||
|  |         obis.CURRENT_ELECTRICITY_DELIVERY: CosemParser(ValueParser(Decimal)), | ||||||
|  | 
 | ||||||
|  |         obis.ELECTRICITY_REACTIVE_IMPORTED_TOTAL: CosemParser(ValueParser(Decimal)), | ||||||
|  |         obis.ELECTRICITY_REACTIVE_IMPORTED_TARIFF_1: CosemParser(ValueParser(Decimal)), | ||||||
|  |         obis.ELECTRICITY_REACTIVE_IMPORTED_TARIFF_2: CosemParser(ValueParser(Decimal)), | ||||||
|  |         obis.CURRENT_REACTIVE_EXPORTED: CosemParser(ValueParser(Decimal)), | ||||||
|  |     } | ||||||
|  | } | ||||||
|  | AUSTRIA_ENERGIENETZE_STEIERMARK = SAGEMCOM_T210_D_R | ||||||
|  | |||||||
							
								
								
									
										3
									
								
								setup.py
									
									
									
									
									
								
							
							
						
						
									
										3
									
								
								setup.py
									
									
									
									
									
								
							| @ -13,7 +13,8 @@ setup( | |||||||
|         'pyserial>=3,<4', |         'pyserial>=3,<4', | ||||||
|         'pyserial-asyncio<1', |         'pyserial-asyncio<1', | ||||||
|         'pytz', |         'pytz', | ||||||
|         'Tailer==0.4.1' |         'Tailer==0.4.1', | ||||||
|  |         'dlms_cosem==21.3.2' | ||||||
|     ], |     ], | ||||||
|     entry_points={ |     entry_points={ | ||||||
|         'console_scripts': ['dsmr_console=dsmr_parser.__main__:console'] |         'console_scripts': ['dsmr_console=dsmr_parser.__main__:console'] | ||||||
|  | |||||||
| @ -169,3 +169,27 @@ TELEGRAM_ESY5Q3DA1004_V304 = ( | |||||||
|     '  25818685\r\n' |     '  25818685\r\n' | ||||||
|     'DE0000000000000000000000000000003\r\n' |     'DE0000000000000000000000000000003\r\n' | ||||||
| ) | ) | ||||||
|  | 
 | ||||||
|  | TELEGRAM_SAGEMCOM_T210_D_R = ( | ||||||
|  |     '/EST5\\253710000_A\r\n' | ||||||
|  |     '\r\n' | ||||||
|  |     '1-3:0.2.8(50)\r\n' | ||||||
|  |     '0-0:1.0.0(221006155014S)\r\n' | ||||||
|  |     '1-0:1.8.0(006545766*Wh)\r\n' | ||||||
|  |     '1-0:1.8.1(005017120*Wh)\r\n' | ||||||
|  |     '1-0:1.8.2(001528646*Wh)\r\n' | ||||||
|  |     '1-0:1.7.0(000000286*W)\r\n' | ||||||
|  |     '1-0:2.8.0(000000058*Wh)\r\n' | ||||||
|  |     '1-0:2.8.1(000000000*Wh)\r\n' | ||||||
|  |     '1-0:2.8.2(000000058*Wh)\r\n' | ||||||
|  |     '1-0:2.7.0(000000000*W)\r\n' | ||||||
|  |     '1-0:3.8.0(000000747*varh)\r\n' | ||||||
|  |     '1-0:3.8.1(000000000*varh)\r\n' | ||||||
|  |     '1-0:3.8.2(000000747*varh)\r\n' | ||||||
|  |     '1-0:3.7.0(000000000*var)\r\n' | ||||||
|  |     '1-0:4.8.0(003897726*varh)\r\n' | ||||||
|  |     '1-0:4.8.1(002692848*varh)\r\n' | ||||||
|  |     '1-0:4.8.2(001204878*varh)\r\n' | ||||||
|  |     '1-0:4.7.0(000000166*var)\r\n' | ||||||
|  |     '!7EF9\r\n' | ||||||
|  | ) | ||||||
|  | |||||||
							
								
								
									
										107
									
								
								test/test_parse_sagemcom_t210_d_r.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										107
									
								
								test/test_parse_sagemcom_t210_d_r.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,107 @@ | |||||||
|  | from binascii import unhexlify | ||||||
|  | from copy import deepcopy | ||||||
|  | 
 | ||||||
|  | import unittest | ||||||
|  | 
 | ||||||
|  | from dlms_cosem.exceptions import DecryptionError | ||||||
|  | from dlms_cosem.protocol.xdlms import GeneralGlobalCipher | ||||||
|  | from dlms_cosem.security import SecurityControlField, encrypt | ||||||
|  | 
 | ||||||
|  | from dsmr_parser import telegram_specifications | ||||||
|  | from dsmr_parser.exceptions import ParseError | ||||||
|  | from dsmr_parser.parsers import TelegramParser | ||||||
|  | from test.example_telegrams import TELEGRAM_SAGEMCOM_T210_D_R | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | class TelegramParserEncryptedTest(unittest.TestCase): | ||||||
|  |     """ Test parsing of a DSML encypted DSMR v5.x telegram. """ | ||||||
|  |     DUMMY_ENCRYPTION_KEY = "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA" | ||||||
|  |     DUMMY_AUTHENTICATION_KEY = "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB" | ||||||
|  | 
 | ||||||
|  |     def __generate_encrypted(self, security_suite=0, authenticated=True, encrypted=True): | ||||||
|  |         security_control = SecurityControlField( | ||||||
|  |             security_suite=security_suite, authenticated=authenticated, encrypted=encrypted | ||||||
|  |         ) | ||||||
|  |         encryption_key = unhexlify(self.DUMMY_ENCRYPTION_KEY) | ||||||
|  |         authentication_key = unhexlify(self.DUMMY_AUTHENTICATION_KEY) | ||||||
|  |         system_title = "SYSTEMID".encode("ascii") | ||||||
|  |         invocation_counter = int.from_bytes(bytes.fromhex("10000001"), "big") | ||||||
|  |         plain_data = TELEGRAM_SAGEMCOM_T210_D_R.encode("ascii") | ||||||
|  | 
 | ||||||
|  |         encrypted = encrypt( | ||||||
|  |             security_control=security_control, | ||||||
|  |             key=encryption_key, | ||||||
|  |             auth_key=authentication_key, | ||||||
|  |             system_title=system_title, | ||||||
|  |             invocation_counter=invocation_counter, | ||||||
|  |             plain_text=plain_data, | ||||||
|  |         ) | ||||||
|  | 
 | ||||||
|  |         full_frame = bytearray(GeneralGlobalCipher.TAG.to_bytes(1, "big", signed=False)) | ||||||
|  |         full_frame.extend(len(system_title).to_bytes(1, "big", signed=False)) | ||||||
|  |         full_frame.extend(system_title) | ||||||
|  |         full_frame.extend([0x82])  # Length of the following length bytes | ||||||
|  |         # https://github.com/pwitab/dlms-cosem/blob/739f81a58e5f07663a512d4a128851333a0ed5e6/dlms_cosem/a_xdr.py#L33 | ||||||
|  | 
 | ||||||
|  |         security_control = security_control.to_bytes() | ||||||
|  |         invocation_counter = invocation_counter.to_bytes(4, "big", signed=False) | ||||||
|  |         full_frame.extend((len(encrypted) | ||||||
|  |                            + len(invocation_counter) | ||||||
|  |                            + len(security_control)).to_bytes(2, "big", signed=False)) | ||||||
|  |         full_frame.extend(security_control) | ||||||
|  |         full_frame.extend(invocation_counter) | ||||||
|  |         full_frame.extend(encrypted) | ||||||
|  | 
 | ||||||
|  |         return full_frame | ||||||
|  | 
 | ||||||
|  |     def test_parse(self): | ||||||
|  |         parser = TelegramParser(telegram_specifications.SAGEMCOM_T210_D_R) | ||||||
|  |         result = parser.parse(self.__generate_encrypted().hex(), | ||||||
|  |                               self.DUMMY_ENCRYPTION_KEY, | ||||||
|  |                               self.DUMMY_AUTHENTICATION_KEY) | ||||||
|  |         self.assertEqual(len(result), 18) | ||||||
|  | 
 | ||||||
|  |     def test_damaged_frame(self): | ||||||
|  |         # If the frame is damaged decrypting fails (crc is technically not needed) | ||||||
|  |         parser = TelegramParser(telegram_specifications.SAGEMCOM_T210_D_R) | ||||||
|  | 
 | ||||||
|  |         generated = self.__generate_encrypted() | ||||||
|  |         generated[150] = 0x00 | ||||||
|  |         generated = generated.hex() | ||||||
|  | 
 | ||||||
|  |         with self.assertRaises(DecryptionError): | ||||||
|  |             parser.parse(generated, self.DUMMY_ENCRYPTION_KEY, self.DUMMY_AUTHENTICATION_KEY) | ||||||
|  | 
 | ||||||
|  |     def test_plain(self): | ||||||
|  |         # If a plain request is parsed with "general_global_cipher": True it fails | ||||||
|  |         parser = TelegramParser(telegram_specifications.SAGEMCOM_T210_D_R) | ||||||
|  | 
 | ||||||
|  |         with self.assertRaises(Exception): | ||||||
|  |             parser.parse(TELEGRAM_SAGEMCOM_T210_D_R, self.DUMMY_ENCRYPTION_KEY, self.DUMMY_AUTHENTICATION_KEY) | ||||||
|  | 
 | ||||||
|  |     def test_general_global_cipher_not_specified(self): | ||||||
|  |         # If a GGC frame is detected but general_global_cipher is not set it fails | ||||||
|  |         parser = TelegramParser(telegram_specifications.SAGEMCOM_T210_D_R) | ||||||
|  |         parser = deepcopy(parser)  # We do not want to change the module value | ||||||
|  |         parser.telegram_specification['general_global_cipher'] = False | ||||||
|  | 
 | ||||||
|  |         with self.assertRaises(ParseError): | ||||||
|  |             parser.parse(self.__generate_encrypted().hex(), self.DUMMY_ENCRYPTION_KEY, self.DUMMY_AUTHENTICATION_KEY) | ||||||
|  | 
 | ||||||
|  |     def test_only_encrypted(self): | ||||||
|  |         # Not implemented by dlms_cosem | ||||||
|  |         parser = TelegramParser(telegram_specifications.SAGEMCOM_T210_D_R) | ||||||
|  | 
 | ||||||
|  |         only_auth = self.__generate_encrypted(0, authenticated=False, encrypted=True).hex() | ||||||
|  | 
 | ||||||
|  |         with self.assertRaises(ValueError): | ||||||
|  |             parser.parse(only_auth, self.DUMMY_ENCRYPTION_KEY) | ||||||
|  | 
 | ||||||
|  |     def test_only_auth(self): | ||||||
|  |         # Not implemented by dlms_cosem | ||||||
|  |         parser = TelegramParser(telegram_specifications.SAGEMCOM_T210_D_R) | ||||||
|  | 
 | ||||||
|  |         only_auth = self.__generate_encrypted(0, authenticated=True, encrypted=False).hex() | ||||||
|  | 
 | ||||||
|  |         with self.assertRaises(ValueError): | ||||||
|  |             parser.parse(only_auth, authentication_key=self.DUMMY_AUTHENTICATION_KEY) | ||||||
		Loading…
	
		Reference in New Issue
	
	Block a user