moved decryption logic and simplified it a bit

This commit is contained in:
Nigel Dokter 2024-12-22 14:28:22 +01:00
parent eb0d3d7353
commit bd2e64b0cd
3 changed files with 43 additions and 62 deletions

View File

@ -32,6 +32,7 @@ class TelegramParser(object):
object["obis_reference"]: re.compile(object["obis_reference"], re.DOTALL | re.MULTILINE)
for object in self.telegram_specification['objects']
}
self._telegram_encryption_active = None
def parse(self, telegram_data, encryption_key="", authentication_key="", throw_ex=False): # noqa: C901
"""
@ -46,38 +47,11 @@ class TelegramParser(object):
:raises ParseError:
:raises InvalidChecksumError:
"""
if "general_global_cipher" in self.telegram_specification:
if self.telegram_specification["general_global_cipher"]:
enc_key = unhexlify(encryption_key)
auth_key = unhexlify(authentication_key)
telegram_data = unhexlify(telegram_data)
apdu = XDlmsApduFactory.apdu_from_bytes(apdu_bytes=telegram_data)
if apdu.security_control.security_suite != 0:
logger.warning("Untested security suite")
if apdu.security_control.authenticated and not apdu.security_control.encrypted:
logger.warning("Untested authentication only")
if not apdu.security_control.authenticated and not apdu.security_control.encrypted:
logger.warning("Untested not encrypted or authenticated")
if apdu.security_control.compressed:
logger.warning("Untested compression")
if apdu.security_control.broadcast_key:
logger.warning("Untested broadcast key")
telegram_data = apdu.to_plain_apdu(enc_key, auth_key).decode("ascii")
else:
try:
if unhexlify(telegram_data[0:2])[0] == GeneralGlobalCipher.TAG:
raise RuntimeError("Looks like a general_global_cipher frame "
"but telegram specification is not matching!")
except Exception:
pass
else:
try:
if unhexlify(telegram_data[0:2])[0] == GeneralGlobalCipher.TAG:
raise RuntimeError(
"Looks like a general_global_cipher frame but telegram specification is not matching!")
except Exception:
pass
telegram_data = self.decrypt_telegram_data(
telegram_data=telegram_data,
encryption_key=encryption_key,
authentication_key=authentication_key
)
if self.apply_checksum_validation and self.telegram_specification['checksum_support']:
self.validate_checksum(telegram_data)
@ -112,6 +86,42 @@ class TelegramParser(object):
return telegram
def decrypt_telegram_data(self, encryption_key, authentication_key, telegram_data):
"""
Check if telegram data is encrypted and decrypt if applicable.
"""
# if self._telegram_encryption_active is False:
# # If encryption is not working, stop trying and logging warnings.
# return telegram_data
if self.telegram_specification.get("general_global_cipher"):
enc_key = unhexlify(encryption_key)
auth_key = unhexlify(authentication_key)
telegram_data = unhexlify(telegram_data)
apdu = XDlmsApduFactory.apdu_from_bytes(apdu_bytes=telegram_data)
if apdu.security_control.security_suite != 0:
logger.warning("Untested security suite")
if apdu.security_control.authenticated and not apdu.security_control.encrypted:
logger.warning("Untested authentication only")
if not apdu.security_control.authenticated and not apdu.security_control.encrypted:
logger.warning("Untested not encrypted or authenticated")
if apdu.security_control.compressed:
logger.warning("Untested compression")
if apdu.security_control.broadcast_key:
logger.warning("Untested broadcast key")
telegram_data = apdu.to_plain_apdu(enc_key, auth_key).decode("ascii")
self._telegram_encryption_active = True
else:
try:
if unhexlify(telegram_data[0:2])[0] == GeneralGlobalCipher.TAG:
logger.warning("Looks like a general_global_cipher frame "
"but telegram specification is not matching!")
except Exception:
pass
self._telegram_encryption_active = False
return telegram_data
@staticmethod
def validate_checksum(telegram):
"""

View File

@ -1,29 +0,0 @@
import unittest
import tempfile
from unittest import mock
from dsmr_parser import telegram_specifications
from dsmr_parser.clients.filereader import FileReader
from dsmr_parser.clients.serial_ import SerialReader
from dsmr_parser.clients.settings import SERIAL_SETTINGS_V5
from test.example_telegrams import TELEGRAM_V5
class SerialReaderTest(unittest.TestCase):
@mock.patch('dsmr_parser.clients.serial_.serial.Serial')
def test_read_as_object(self, mock_serial):
serial_handle_mock = mock_serial.return_value
# mock_serial.return_value.in_waiting = 1024
mock_serial.return_value.read.return_value = [b'Telegram data...', b''] # Return data, then empty bytes
serial_reader = SerialReader(
device='/dev/ttyUSB0',
serial_settings=SERIAL_SETTINGS_V5,
telegram_specification=telegram_specifications.V5
)
for telegram in serial_reader.read():
print(telegram) # see 'Telegram object' docs below

View File

@ -21,7 +21,7 @@ class TelegramParserV5Test(unittest.TestCase):
telegram = parser.parse(TELEGRAM_V5, throw_ex=True)
except Exception as ex:
assert False, f"parse trigged an exception {ex}"
print('test: ', type(telegram.P1_MESSAGE_HEADER), telegram.P1_MESSAGE_HEADER.__dict__)
# P1_MESSAGE_HEADER (1-3:0.2.8)
assert isinstance(telegram.P1_MESSAGE_HEADER, CosemObject)
assert telegram.P1_MESSAGE_HEADER.unit is None