108 lines
4.7 KiB
Python
108 lines
4.7 KiB
Python
|
from binascii import unhexlify
|
||
|
from copy import deepcopy
|
||
|
|
||
|
import unittest
|
||
|
|
||
|
from dlms_cosem.exceptions import DecryptionError
|
||
|
from dlms_cosem.protocol.xdlms import GeneralGlobalCipher
|
||
|
from dlms_cosem.security import SecurityControlField, encrypt
|
||
|
|
||
|
from dsmr_parser import telegram_specifications
|
||
|
from dsmr_parser.exceptions import ParseError
|
||
|
from dsmr_parser.parsers import TelegramParser
|
||
|
from test.example_telegrams import TELEGRAM_SAGEMCOM_T210_D_R
|
||
|
|
||
|
|
||
|
class TelegramParserEncryptedTest(unittest.TestCase):
|
||
|
""" Test parsing of a DSML encypted DSMR v5.x telegram. """
|
||
|
DUMMY_ENCRYPTION_KEY = "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
|
||
|
DUMMY_AUTHENTICATION_KEY = "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"
|
||
|
|
||
|
def __generate_encrypted(self, security_suite=0, authenticated=True, encrypted=True):
|
||
|
security_control = SecurityControlField(
|
||
|
security_suite=security_suite, authenticated=authenticated, encrypted=encrypted
|
||
|
)
|
||
|
encryption_key = unhexlify(self.DUMMY_ENCRYPTION_KEY)
|
||
|
authentication_key = unhexlify(self.DUMMY_AUTHENTICATION_KEY)
|
||
|
system_title = "SYSTEMID".encode("ascii")
|
||
|
invocation_counter = int.from_bytes(bytes.fromhex("10000001"), "big")
|
||
|
plain_data = TELEGRAM_SAGEMCOM_T210_D_R.encode("ascii")
|
||
|
|
||
|
encrypted = encrypt(
|
||
|
security_control=security_control,
|
||
|
key=encryption_key,
|
||
|
auth_key=authentication_key,
|
||
|
system_title=system_title,
|
||
|
invocation_counter=invocation_counter,
|
||
|
plain_text=plain_data,
|
||
|
)
|
||
|
|
||
|
full_frame = bytearray(GeneralGlobalCipher.TAG.to_bytes(1, "big", signed=False))
|
||
|
full_frame.extend(len(system_title).to_bytes(1, "big", signed=False))
|
||
|
full_frame.extend(system_title)
|
||
|
full_frame.extend([0x82]) # Length of the following length bytes
|
||
|
# https://github.com/pwitab/dlms-cosem/blob/739f81a58e5f07663a512d4a128851333a0ed5e6/dlms_cosem/a_xdr.py#L33
|
||
|
|
||
|
security_control = security_control.to_bytes()
|
||
|
invocation_counter = invocation_counter.to_bytes(4, "big", signed=False)
|
||
|
full_frame.extend((len(encrypted)
|
||
|
+ len(invocation_counter)
|
||
|
+ len(security_control)).to_bytes(2, "big", signed=False))
|
||
|
full_frame.extend(security_control)
|
||
|
full_frame.extend(invocation_counter)
|
||
|
full_frame.extend(encrypted)
|
||
|
|
||
|
return full_frame
|
||
|
|
||
|
def test_parse(self):
|
||
|
parser = TelegramParser(telegram_specifications.SAGEMCOM_T210_D_R)
|
||
|
result = parser.parse(self.__generate_encrypted().hex(),
|
||
|
self.DUMMY_ENCRYPTION_KEY,
|
||
|
self.DUMMY_AUTHENTICATION_KEY)
|
||
|
self.assertEqual(len(result), 18)
|
||
|
|
||
|
def test_damaged_frame(self):
|
||
|
# If the frame is damaged decrypting fails (crc is technically not needed)
|
||
|
parser = TelegramParser(telegram_specifications.SAGEMCOM_T210_D_R)
|
||
|
|
||
|
generated = self.__generate_encrypted()
|
||
|
generated[150] = 0x00
|
||
|
generated = generated.hex()
|
||
|
|
||
|
with self.assertRaises(DecryptionError):
|
||
|
parser.parse(generated, self.DUMMY_ENCRYPTION_KEY, self.DUMMY_AUTHENTICATION_KEY)
|
||
|
|
||
|
def test_plain(self):
|
||
|
# If a plain request is parsed with "general_global_cipher": True it fails
|
||
|
parser = TelegramParser(telegram_specifications.SAGEMCOM_T210_D_R)
|
||
|
|
||
|
with self.assertRaises(Exception):
|
||
|
parser.parse(TELEGRAM_SAGEMCOM_T210_D_R, self.DUMMY_ENCRYPTION_KEY, self.DUMMY_AUTHENTICATION_KEY)
|
||
|
|
||
|
def test_general_global_cipher_not_specified(self):
|
||
|
# If a GGC frame is detected but general_global_cipher is not set it fails
|
||
|
parser = TelegramParser(telegram_specifications.SAGEMCOM_T210_D_R)
|
||
|
parser = deepcopy(parser) # We do not want to change the module value
|
||
|
parser.telegram_specification['general_global_cipher'] = False
|
||
|
|
||
|
with self.assertRaises(ParseError):
|
||
|
parser.parse(self.__generate_encrypted().hex(), self.DUMMY_ENCRYPTION_KEY, self.DUMMY_AUTHENTICATION_KEY)
|
||
|
|
||
|
def test_only_encrypted(self):
|
||
|
# Not implemented by dlms_cosem
|
||
|
parser = TelegramParser(telegram_specifications.SAGEMCOM_T210_D_R)
|
||
|
|
||
|
only_auth = self.__generate_encrypted(0, authenticated=False, encrypted=True).hex()
|
||
|
|
||
|
with self.assertRaises(ValueError):
|
||
|
parser.parse(only_auth, self.DUMMY_ENCRYPTION_KEY)
|
||
|
|
||
|
def test_only_auth(self):
|
||
|
# Not implemented by dlms_cosem
|
||
|
parser = TelegramParser(telegram_specifications.SAGEMCOM_T210_D_R)
|
||
|
|
||
|
only_auth = self.__generate_encrypted(0, authenticated=True, encrypted=False).hex()
|
||
|
|
||
|
with self.assertRaises(ValueError):
|
||
|
parser.parse(only_auth, authentication_key=self.DUMMY_AUTHENTICATION_KEY)
|