add encrytion_key and authentication_key to TelegramParser object

This commit is contained in:
skrutzler 2023-11-12 09:59:09 +01:00 committed by Stefan Krutzler
parent 739419e3ee
commit 76b55bee8d
No known key found for this signature in database
5 changed files with 37 additions and 15 deletions

View File

@ -16,8 +16,12 @@ def console():
help='alternatively connect using TCP host.')
parser.add_argument('--port', default=None,
help='TCP port to use for connection')
parser.add_argument('--version', default='2.2', choices=['2.2', '4', '5', '5B', '5L', '5S', 'Q3D'],
help='DSMR version (2.2, 4, 5, 5B, 5L, 5S, Q3D)')
parser.add_argument('--version', default='2.2', choices=['2.2', '4', '5', '5B', '5L', '5S', 'Q3D', 'SAGEMCOM_T210_D_R'],
help='DSMR version (2.2, 4, 5, 5B, 5L, 5S, Q3D, SAGEMCOM_T210_D_R)')
parser.add_argument('--encryption_key', default="",
help='configure the global cipher encryption_key')
parser.add_argument('--authentication_key', default="",
help='configure the global cipher authentication_key')
parser.add_argument('--verbose', '-v', action='count')
args = parser.parse_args()
@ -45,7 +49,8 @@ def console():
else:
create_connection = partial(create_dsmr_reader,
args.device, args.version,
print_callback, loop=loop)
print_callback, loop=loop,
encryption_key=args.encryption_key, authentication_key=args.authentication_key)
try:
# connect and keep connected until interrupted by ctrl-c

View File

@ -3,6 +3,7 @@
from functools import partial
import asyncio
import logging
import copy
from serial_asyncio import create_serial_connection
@ -14,16 +15,16 @@ from dsmr_parser.clients.settings import SERIAL_SETTINGS_V2_2, \
SERIAL_SETTINGS_V4, SERIAL_SETTINGS_V5
def create_dsmr_protocol(dsmr_version, telegram_callback, loop=None, **kwargs):
def create_dsmr_protocol(dsmr_version, telegram_callback, loop=None, encryption_key="", authentication_key="", **kwargs):
"""Creates a DSMR asyncio protocol."""
protocol = _create_dsmr_protocol(dsmr_version, telegram_callback,
DSMRProtocol, loop, **kwargs)
DSMRProtocol, loop, encryption_key, authentication_key, **kwargs)
return protocol
# pylama noqa - because of "complex" (too long) if-elif-else.
# Match - case might be a solution but it is not available in <3.10
def _create_dsmr_protocol(dsmr_version, telegram_callback, protocol, loop=None, **kwargs): #noqa
def _create_dsmr_protocol(dsmr_version, telegram_callback, protocol, loop=None, encryption_key="", authentication_key="", **kwargs): #noqa
"""Creates a DSMR asyncio protocol."""
if dsmr_version == '2.2':
@ -50,6 +51,9 @@ def _create_dsmr_protocol(dsmr_version, telegram_callback, protocol, loop=None,
elif dsmr_version == "Q3D":
specification = telegram_specifications.Q3D
serial_settings = SERIAL_SETTINGS_V5
elif dsmr_version == "SAGEMCOM_T210_D_R":
specification = telegram_specifications.SAGEMCOM_T210_D_R
serial_settings = SERIAL_SETTINGS_V5
elif dsmr_version == 'ISKRA_IE':
specification = telegram_specifications.ISKRA_IE
serial_settings = SERIAL_SETTINGS_V5
@ -60,16 +64,21 @@ def _create_dsmr_protocol(dsmr_version, telegram_callback, protocol, loop=None,
raise NotImplementedError("No telegram parser found for version: %s",
dsmr_version)
if "general_global_cipher" in specification and specification["general_global_cipher"]:
specification = copy.deepcopy(specification)
specification["encryption_key"] = encryption_key
specification["authentication_key"] = authentication_key
protocol = partial(protocol, loop, TelegramParser(specification),
telegram_callback=telegram_callback, **kwargs)
return protocol, serial_settings
def create_dsmr_reader(port, dsmr_version, telegram_callback, loop=None):
def create_dsmr_reader(port, dsmr_version, telegram_callback, loop=None, encryption_key="", authentication_key=""):
"""Creates a DSMR asyncio protocol coroutine using serial port."""
protocol, serial_settings = create_dsmr_protocol(
dsmr_version, telegram_callback, loop=None)
dsmr_version, telegram_callback, loop=None, encryption_key=encryption_key, authentication_key=authentication_key)
serial_settings['url'] = port
conn = create_serial_connection(loop, protocol, **serial_settings)

View File

@ -6,17 +6,17 @@ from serial_asyncio import create_serial_connection
from .protocol import DSMRProtocol, _create_dsmr_protocol
def create_rfxtrx_dsmr_protocol(dsmr_version, telegram_callback, loop=None, **kwargs):
def create_rfxtrx_dsmr_protocol(dsmr_version, telegram_callback, loop=None, encryption_key="", authentication_key="", **kwargs):
"""Creates a RFXtrxDSMR asyncio protocol."""
protocol = _create_dsmr_protocol(dsmr_version, telegram_callback,
RFXtrxDSMRProtocol, loop, **kwargs)
RFXtrxDSMRProtocol, loop, encryption_key, authentication_key,**kwargs)
return protocol
def create_rfxtrx_dsmr_reader(port, dsmr_version, telegram_callback, loop=None):
def create_rfxtrx_dsmr_reader(port, dsmr_version, telegram_callback, loop=None, encryption_key="", authentication_key=""):
"""Creates a DSMR asyncio protocol coroutine using a RFXtrx serial port."""
protocol, serial_settings = create_rfxtrx_dsmr_protocol(
dsmr_version, telegram_callback, loop=None)
dsmr_version, telegram_callback, loop=None, encryption_key=encryption_key, authentication_key=authentication_key)
serial_settings['url'] = port
conn = create_serial_connection(loop, protocol, **serial_settings)

View File

@ -49,8 +49,8 @@ class TelegramParser(object):
if "general_global_cipher" in self.telegram_specification:
if self.telegram_specification["general_global_cipher"]:
enc_key = unhexlify(encryption_key)
auth_key = unhexlify(authentication_key)
enc_key = unhexlify(self.telegram_specification["encryption_key"] if "encryption_key" in self.telegram_specification else encryption_key)
auth_key = unhexlify(self.telegram_specification["authentication_key"] if "authentication_key" in self.telegram_specification else authentication_key)
telegram_data = unhexlify(telegram_data)
apdu = XDlmsApduFactory.apdu_from_bytes(apdu_bytes=telegram_data)
if apdu.security_control.security_suite != 0:

View File

@ -54,6 +54,14 @@ class TelegramParserEncryptedTest(unittest.TestCase):
return full_frame
def test_parsing_encryption_key_within_specification(self):
specification = deepcopy(telegram_specifications.SAGEMCOM_T210_D_R)
specification['encryption_key'] = self.DUMMY_ENCRYPTION_KEY
specification['authentication_key'] = self.DUMMY_AUTHENTICATION_KEY
parser = TelegramParser(specification)
result = parser.parse(self.__generate_encrypted().hex())
self.assertEqual(len(result), 18)
def test_parse(self):
parser = TelegramParser(telegram_specifications.SAGEMCOM_T210_D_R)
result = parser.parse(self.__generate_encrypted().hex(),
@ -104,4 +112,4 @@ class TelegramParserEncryptedTest(unittest.TestCase):
only_auth = self.__generate_encrypted(0, authenticated=True, encrypted=False).hex()
with self.assertRaises(ValueError):
parser.parse(only_auth, authentication_key=self.DUMMY_AUTHENTICATION_KEY)
parser.parse(only_auth, authentication_key=self.DUMMY_AUTHENTICATION_KEY)