From 76b55bee8d31ab5aca2eb4a51080886aef4b357d Mon Sep 17 00:00:00 2001 From: skrutzler Date: Sun, 12 Nov 2023 09:59:09 +0100 Subject: [PATCH] add encrytion_key and authentication_key to TelegramParser object --- dsmr_parser/__main__.py | 11 ++++++++--- dsmr_parser/clients/protocol.py | 19 ++++++++++++++----- dsmr_parser/clients/rfxtrx_protocol.py | 8 ++++---- dsmr_parser/parsers.py | 4 ++-- test/test_parse_sagemcom_t210_d_r.py | 10 +++++++++- 5 files changed, 37 insertions(+), 15 deletions(-) diff --git a/dsmr_parser/__main__.py b/dsmr_parser/__main__.py index 9169318..30cf1c3 100644 --- a/dsmr_parser/__main__.py +++ b/dsmr_parser/__main__.py @@ -16,8 +16,12 @@ def console(): help='alternatively connect using TCP host.') parser.add_argument('--port', default=None, help='TCP port to use for connection') - parser.add_argument('--version', default='2.2', choices=['2.2', '4', '5', '5B', '5L', '5S', 'Q3D'], - help='DSMR version (2.2, 4, 5, 5B, 5L, 5S, Q3D)') + parser.add_argument('--version', default='2.2', choices=['2.2', '4', '5', '5B', '5L', '5S', 'Q3D', 'SAGEMCOM_T210_D_R'], + help='DSMR version (2.2, 4, 5, 5B, 5L, 5S, Q3D, SAGEMCOM_T210_D_R)') + parser.add_argument('--encryption_key', default="", + help='configure the global cipher encryption_key') + parser.add_argument('--authentication_key', default="", + help='configure the global cipher authentication_key') parser.add_argument('--verbose', '-v', action='count') args = parser.parse_args() @@ -45,7 +49,8 @@ def console(): else: create_connection = partial(create_dsmr_reader, args.device, args.version, - print_callback, loop=loop) + print_callback, loop=loop, + encryption_key=args.encryption_key, authentication_key=args.authentication_key) try: # connect and keep connected until interrupted by ctrl-c diff --git a/dsmr_parser/clients/protocol.py b/dsmr_parser/clients/protocol.py index 4605fcf..2db6896 100644 --- a/dsmr_parser/clients/protocol.py +++ b/dsmr_parser/clients/protocol.py @@ -3,6 +3,7 @@ from functools import partial import asyncio import logging +import copy from serial_asyncio import create_serial_connection @@ -14,16 +15,16 @@ from dsmr_parser.clients.settings import SERIAL_SETTINGS_V2_2, \ SERIAL_SETTINGS_V4, SERIAL_SETTINGS_V5 -def create_dsmr_protocol(dsmr_version, telegram_callback, loop=None, **kwargs): +def create_dsmr_protocol(dsmr_version, telegram_callback, loop=None, encryption_key="", authentication_key="", **kwargs): """Creates a DSMR asyncio protocol.""" protocol = _create_dsmr_protocol(dsmr_version, telegram_callback, - DSMRProtocol, loop, **kwargs) + DSMRProtocol, loop, encryption_key, authentication_key, **kwargs) return protocol # pylama noqa - because of "complex" (too long) if-elif-else. # Match - case might be a solution but it is not available in <3.10 -def _create_dsmr_protocol(dsmr_version, telegram_callback, protocol, loop=None, **kwargs): #noqa +def _create_dsmr_protocol(dsmr_version, telegram_callback, protocol, loop=None, encryption_key="", authentication_key="", **kwargs): #noqa """Creates a DSMR asyncio protocol.""" if dsmr_version == '2.2': @@ -50,6 +51,9 @@ def _create_dsmr_protocol(dsmr_version, telegram_callback, protocol, loop=None, elif dsmr_version == "Q3D": specification = telegram_specifications.Q3D serial_settings = SERIAL_SETTINGS_V5 + elif dsmr_version == "SAGEMCOM_T210_D_R": + specification = telegram_specifications.SAGEMCOM_T210_D_R + serial_settings = SERIAL_SETTINGS_V5 elif dsmr_version == 'ISKRA_IE': specification = telegram_specifications.ISKRA_IE serial_settings = SERIAL_SETTINGS_V5 @@ -60,16 +64,21 @@ def _create_dsmr_protocol(dsmr_version, telegram_callback, protocol, loop=None, raise NotImplementedError("No telegram parser found for version: %s", dsmr_version) + if "general_global_cipher" in specification and specification["general_global_cipher"]: + specification = copy.deepcopy(specification) + specification["encryption_key"] = encryption_key + specification["authentication_key"] = authentication_key + protocol = partial(protocol, loop, TelegramParser(specification), telegram_callback=telegram_callback, **kwargs) return protocol, serial_settings -def create_dsmr_reader(port, dsmr_version, telegram_callback, loop=None): +def create_dsmr_reader(port, dsmr_version, telegram_callback, loop=None, encryption_key="", authentication_key=""): """Creates a DSMR asyncio protocol coroutine using serial port.""" protocol, serial_settings = create_dsmr_protocol( - dsmr_version, telegram_callback, loop=None) + dsmr_version, telegram_callback, loop=None, encryption_key=encryption_key, authentication_key=authentication_key) serial_settings['url'] = port conn = create_serial_connection(loop, protocol, **serial_settings) diff --git a/dsmr_parser/clients/rfxtrx_protocol.py b/dsmr_parser/clients/rfxtrx_protocol.py index 848de71..21feb57 100644 --- a/dsmr_parser/clients/rfxtrx_protocol.py +++ b/dsmr_parser/clients/rfxtrx_protocol.py @@ -6,17 +6,17 @@ from serial_asyncio import create_serial_connection from .protocol import DSMRProtocol, _create_dsmr_protocol -def create_rfxtrx_dsmr_protocol(dsmr_version, telegram_callback, loop=None, **kwargs): +def create_rfxtrx_dsmr_protocol(dsmr_version, telegram_callback, loop=None, encryption_key="", authentication_key="", **kwargs): """Creates a RFXtrxDSMR asyncio protocol.""" protocol = _create_dsmr_protocol(dsmr_version, telegram_callback, - RFXtrxDSMRProtocol, loop, **kwargs) + RFXtrxDSMRProtocol, loop, encryption_key, authentication_key,**kwargs) return protocol -def create_rfxtrx_dsmr_reader(port, dsmr_version, telegram_callback, loop=None): +def create_rfxtrx_dsmr_reader(port, dsmr_version, telegram_callback, loop=None, encryption_key="", authentication_key=""): """Creates a DSMR asyncio protocol coroutine using a RFXtrx serial port.""" protocol, serial_settings = create_rfxtrx_dsmr_protocol( - dsmr_version, telegram_callback, loop=None) + dsmr_version, telegram_callback, loop=None, encryption_key=encryption_key, authentication_key=authentication_key) serial_settings['url'] = port conn = create_serial_connection(loop, protocol, **serial_settings) diff --git a/dsmr_parser/parsers.py b/dsmr_parser/parsers.py index 61515fc..723f642 100644 --- a/dsmr_parser/parsers.py +++ b/dsmr_parser/parsers.py @@ -49,8 +49,8 @@ class TelegramParser(object): if "general_global_cipher" in self.telegram_specification: if self.telegram_specification["general_global_cipher"]: - enc_key = unhexlify(encryption_key) - auth_key = unhexlify(authentication_key) + enc_key = unhexlify(self.telegram_specification["encryption_key"] if "encryption_key" in self.telegram_specification else encryption_key) + auth_key = unhexlify(self.telegram_specification["authentication_key"] if "authentication_key" in self.telegram_specification else authentication_key) telegram_data = unhexlify(telegram_data) apdu = XDlmsApduFactory.apdu_from_bytes(apdu_bytes=telegram_data) if apdu.security_control.security_suite != 0: diff --git a/test/test_parse_sagemcom_t210_d_r.py b/test/test_parse_sagemcom_t210_d_r.py index 525172c..a9b66f1 100644 --- a/test/test_parse_sagemcom_t210_d_r.py +++ b/test/test_parse_sagemcom_t210_d_r.py @@ -54,6 +54,14 @@ class TelegramParserEncryptedTest(unittest.TestCase): return full_frame + def test_parsing_encryption_key_within_specification(self): + specification = deepcopy(telegram_specifications.SAGEMCOM_T210_D_R) + specification['encryption_key'] = self.DUMMY_ENCRYPTION_KEY + specification['authentication_key'] = self.DUMMY_AUTHENTICATION_KEY + parser = TelegramParser(specification) + result = parser.parse(self.__generate_encrypted().hex()) + self.assertEqual(len(result), 18) + def test_parse(self): parser = TelegramParser(telegram_specifications.SAGEMCOM_T210_D_R) result = parser.parse(self.__generate_encrypted().hex(), @@ -104,4 +112,4 @@ class TelegramParserEncryptedTest(unittest.TestCase): only_auth = self.__generate_encrypted(0, authenticated=True, encrypted=False).hex() with self.assertRaises(ValueError): - parser.parse(only_auth, authentication_key=self.DUMMY_AUTHENTICATION_KEY) + parser.parse(only_auth, authentication_key=self.DUMMY_AUTHENTICATION_KEY) \ No newline at end of file