From eb0d3d7353177e994d4d56f87f952d1195da0306 Mon Sep 17 00:00:00 2001 From: Nigel Dokter <mail@nldr.net> Date: Sun, 22 Dec 2024 13:37:21 +0100 Subject: [PATCH 1/2] Move client tests to own folder --- .gitignore | 1 + test/clients/__init__.py | 0 test/clients/test_filereader.py | 21 ++++++++ test/clients/test_rfxtrx_protocol.py | 77 ++++++++++++++++++++++++++++ test/clients/test_serialreader.py | 29 +++++++++++ 5 files changed, 128 insertions(+) create mode 100644 test/clients/__init__.py create mode 100644 test/clients/test_filereader.py create mode 100644 test/clients/test_rfxtrx_protocol.py create mode 100644 test/clients/test_serialreader.py diff --git a/.gitignore b/.gitignore index 6789bb2..fd75316 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ .tox .cache .venv +.history *.egg-info /.project /.pydevproject diff --git a/test/clients/__init__.py b/test/clients/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/clients/test_filereader.py b/test/clients/test_filereader.py new file mode 100644 index 0000000..857a111 --- /dev/null +++ b/test/clients/test_filereader.py @@ -0,0 +1,21 @@ +import unittest +import tempfile + +from dsmr_parser.clients.filereader import FileReader +from dsmr_parser.telegram_specifications import V5 +from test.example_telegrams import TELEGRAM_V5 + + +class FileReaderTest(unittest.TestCase): + def test_read_as_object(self): + with tempfile.NamedTemporaryFile() as file: + with open(file.name, "w") as f: + f.write(TELEGRAM_V5) + + telegrams = [] + reader = FileReader(file=file.name, telegram_specification=V5) + # Call + for telegram in reader.read_as_object(): + telegrams.append(telegram) + + self.assertEqual(len(telegrams), 1) diff --git a/test/clients/test_rfxtrx_protocol.py b/test/clients/test_rfxtrx_protocol.py new file mode 100644 index 0000000..6770bd5 --- /dev/null +++ b/test/clients/test_rfxtrx_protocol.py @@ -0,0 +1,77 @@ +from unittest.mock import Mock + +import unittest + +from dsmr_parser import obis_references as obis +from dsmr_parser.clients.rfxtrx_protocol import create_rfxtrx_dsmr_protocol, PACKETTYPE_DSMR, SUBTYPE_P1 +from dsmr_parser.objects import Telegram + +TELEGRAM_V2_2 = ( + '/ISk5\2MT382-1004\r\n' + '\r\n' + '0-0:96.1.1(00000000000000)\r\n' + '1-0:1.8.1(00001.001*kWh)\r\n' + '1-0:1.8.2(00001.001*kWh)\r\n' + '1-0:2.8.1(00001.001*kWh)\r\n' + '1-0:2.8.2(00001.001*kWh)\r\n' + '0-0:96.14.0(0001)\r\n' + '1-0:1.7.0(0001.01*kW)\r\n' + '1-0:2.7.0(0000.00*kW)\r\n' + '0-0:17.0.0(0999.00*kW)\r\n' + '0-0:96.3.10(1)\r\n' + '0-0:96.13.1()\r\n' + '0-0:96.13.0()\r\n' + '0-1:24.1.0(3)\r\n' + '0-1:96.1.0(000000000000)\r\n' + '0-1:24.3.0(161107190000)(00)(60)(1)(0-1:24.2.1)(m3)\r\n' + '(00001.001)\r\n' + '0-1:24.4.0(1)\r\n' + '!\r\n' +) + +OTHER_RF_PACKET = b'\x03\x01\x02\x03' + + +def encode_telegram_as_RF_packets(telegram): + data = b'' + + for line in telegram.split('\n'): + packet_data = (line + '\n').encode('ascii') + packet_header = bytes(bytearray([ + len(packet_data) + 3, # excluding length byte + PACKETTYPE_DSMR, + SUBTYPE_P1, + 0 # seq num (ignored) + ])) + + data += packet_header + packet_data + # other RF packets can pass by on the line + data += OTHER_RF_PACKET + + return data + + +class RFXtrxProtocolTest(unittest.TestCase): + + def setUp(self): + new_protocol, _ = create_rfxtrx_dsmr_protocol('2.2', + telegram_callback=Mock(), + keep_alive_interval=1) + self.protocol = new_protocol() + + def test_complete_packet(self): + """Protocol should assemble incoming lines into complete packet.""" + + data = encode_telegram_as_RF_packets(TELEGRAM_V2_2) + # send data broken up in two parts + self.protocol.data_received(data[0:200]) + self.protocol.data_received(data[200:]) + + telegram = self.protocol.telegram_callback.call_args_list[0][0][0] + assert isinstance(telegram, Telegram) + + assert float(telegram[obis.CURRENT_ELECTRICITY_USAGE].value) == 1.01 + assert telegram[obis.CURRENT_ELECTRICITY_USAGE].unit == 'kW' + + assert float(telegram[obis.GAS_METER_READING].value) == 1.001 + assert telegram[obis.GAS_METER_READING].unit == 'm3' diff --git a/test/clients/test_serialreader.py b/test/clients/test_serialreader.py new file mode 100644 index 0000000..3457f4c --- /dev/null +++ b/test/clients/test_serialreader.py @@ -0,0 +1,29 @@ +import unittest +import tempfile +from unittest import mock + +from dsmr_parser import telegram_specifications +from dsmr_parser.clients.filereader import FileReader +from dsmr_parser.clients.serial_ import SerialReader +from dsmr_parser.clients.settings import SERIAL_SETTINGS_V5 + +from test.example_telegrams import TELEGRAM_V5 + + +class SerialReaderTest(unittest.TestCase): + + @mock.patch('dsmr_parser.clients.serial_.serial.Serial') + def test_read_as_object(self, mock_serial): + serial_handle_mock = mock_serial.return_value + # mock_serial.return_value.in_waiting = 1024 + mock_serial.return_value.read.return_value = [b'Telegram data...', b''] # Return data, then empty bytes + + + serial_reader = SerialReader( + device='/dev/ttyUSB0', + serial_settings=SERIAL_SETTINGS_V5, + telegram_specification=telegram_specifications.V5 + ) + + for telegram in serial_reader.read(): + print(telegram) # see 'Telegram object' docs below \ No newline at end of file From bd2e64b0cdc0e522862241a01f19361ea5847c1b Mon Sep 17 00:00:00 2001 From: Nigel Dokter <mail@nldr.net> Date: Sun, 22 Dec 2024 14:28:22 +0100 Subject: [PATCH 2/2] moved decryption logic and simplified it a bit --- dsmr_parser/parsers.py | 74 ++++++++++++++++++------------- test/clients/test_serialreader.py | 29 ------------ test/test_parse_v5.py | 2 +- 3 files changed, 43 insertions(+), 62 deletions(-) delete mode 100644 test/clients/test_serialreader.py diff --git a/dsmr_parser/parsers.py b/dsmr_parser/parsers.py index d49d2bd..017b877 100644 --- a/dsmr_parser/parsers.py +++ b/dsmr_parser/parsers.py @@ -32,6 +32,7 @@ class TelegramParser(object): object["obis_reference"]: re.compile(object["obis_reference"], re.DOTALL | re.MULTILINE) for object in self.telegram_specification['objects'] } + self._telegram_encryption_active = None def parse(self, telegram_data, encryption_key="", authentication_key="", throw_ex=False): # noqa: C901 """ @@ -46,38 +47,11 @@ class TelegramParser(object): :raises ParseError: :raises InvalidChecksumError: """ - - if "general_global_cipher" in self.telegram_specification: - if self.telegram_specification["general_global_cipher"]: - enc_key = unhexlify(encryption_key) - auth_key = unhexlify(authentication_key) - telegram_data = unhexlify(telegram_data) - apdu = XDlmsApduFactory.apdu_from_bytes(apdu_bytes=telegram_data) - if apdu.security_control.security_suite != 0: - logger.warning("Untested security suite") - if apdu.security_control.authenticated and not apdu.security_control.encrypted: - logger.warning("Untested authentication only") - if not apdu.security_control.authenticated and not apdu.security_control.encrypted: - logger.warning("Untested not encrypted or authenticated") - if apdu.security_control.compressed: - logger.warning("Untested compression") - if apdu.security_control.broadcast_key: - logger.warning("Untested broadcast key") - telegram_data = apdu.to_plain_apdu(enc_key, auth_key).decode("ascii") - else: - try: - if unhexlify(telegram_data[0:2])[0] == GeneralGlobalCipher.TAG: - raise RuntimeError("Looks like a general_global_cipher frame " - "but telegram specification is not matching!") - except Exception: - pass - else: - try: - if unhexlify(telegram_data[0:2])[0] == GeneralGlobalCipher.TAG: - raise RuntimeError( - "Looks like a general_global_cipher frame but telegram specification is not matching!") - except Exception: - pass + telegram_data = self.decrypt_telegram_data( + telegram_data=telegram_data, + encryption_key=encryption_key, + authentication_key=authentication_key + ) if self.apply_checksum_validation and self.telegram_specification['checksum_support']: self.validate_checksum(telegram_data) @@ -112,6 +86,42 @@ class TelegramParser(object): return telegram + def decrypt_telegram_data(self, encryption_key, authentication_key, telegram_data): + """ + Check if telegram data is encrypted and decrypt if applicable. + """ + # if self._telegram_encryption_active is False: + # # If encryption is not working, stop trying and logging warnings. + # return telegram_data + + if self.telegram_specification.get("general_global_cipher"): + enc_key = unhexlify(encryption_key) + auth_key = unhexlify(authentication_key) + telegram_data = unhexlify(telegram_data) + apdu = XDlmsApduFactory.apdu_from_bytes(apdu_bytes=telegram_data) + if apdu.security_control.security_suite != 0: + logger.warning("Untested security suite") + if apdu.security_control.authenticated and not apdu.security_control.encrypted: + logger.warning("Untested authentication only") + if not apdu.security_control.authenticated and not apdu.security_control.encrypted: + logger.warning("Untested not encrypted or authenticated") + if apdu.security_control.compressed: + logger.warning("Untested compression") + if apdu.security_control.broadcast_key: + logger.warning("Untested broadcast key") + telegram_data = apdu.to_plain_apdu(enc_key, auth_key).decode("ascii") + self._telegram_encryption_active = True + else: + try: + if unhexlify(telegram_data[0:2])[0] == GeneralGlobalCipher.TAG: + logger.warning("Looks like a general_global_cipher frame " + "but telegram specification is not matching!") + except Exception: + pass + self._telegram_encryption_active = False + + return telegram_data + @staticmethod def validate_checksum(telegram): """ diff --git a/test/clients/test_serialreader.py b/test/clients/test_serialreader.py deleted file mode 100644 index 3457f4c..0000000 --- a/test/clients/test_serialreader.py +++ /dev/null @@ -1,29 +0,0 @@ -import unittest -import tempfile -from unittest import mock - -from dsmr_parser import telegram_specifications -from dsmr_parser.clients.filereader import FileReader -from dsmr_parser.clients.serial_ import SerialReader -from dsmr_parser.clients.settings import SERIAL_SETTINGS_V5 - -from test.example_telegrams import TELEGRAM_V5 - - -class SerialReaderTest(unittest.TestCase): - - @mock.patch('dsmr_parser.clients.serial_.serial.Serial') - def test_read_as_object(self, mock_serial): - serial_handle_mock = mock_serial.return_value - # mock_serial.return_value.in_waiting = 1024 - mock_serial.return_value.read.return_value = [b'Telegram data...', b''] # Return data, then empty bytes - - - serial_reader = SerialReader( - device='/dev/ttyUSB0', - serial_settings=SERIAL_SETTINGS_V5, - telegram_specification=telegram_specifications.V5 - ) - - for telegram in serial_reader.read(): - print(telegram) # see 'Telegram object' docs below \ No newline at end of file diff --git a/test/test_parse_v5.py b/test/test_parse_v5.py index a321b21..db9d588 100644 --- a/test/test_parse_v5.py +++ b/test/test_parse_v5.py @@ -21,7 +21,7 @@ class TelegramParserV5Test(unittest.TestCase): telegram = parser.parse(TELEGRAM_V5, throw_ex=True) except Exception as ex: assert False, f"parse trigged an exception {ex}" - print('test: ', type(telegram.P1_MESSAGE_HEADER), telegram.P1_MESSAGE_HEADER.__dict__) + # P1_MESSAGE_HEADER (1-3:0.2.8) assert isinstance(telegram.P1_MESSAGE_HEADER, CosemObject) assert telegram.P1_MESSAGE_HEADER.unit is None