Adds support for the Sagemcom T210-D-r smart meter installed by Austrian grid operators.

The smart meter uses the DSMR/P1 standard and embeds the data in an encrypted and authenticated "DLMS General Global Cipher" frame.
The encryption/decryption is handled by the "dlms_cosem" lib.
Test cases are included.
Adds OBIS codes for reactive energy.
This commit is contained in:
Richard Schleich 2022-10-14 17:50:02 +02:00
parent 2aba206c6f
commit 09f4afcada
No known key found for this signature in database
GPG Key ID: 6FDD55C9F0B68D64
7 changed files with 211 additions and 3 deletions

View File

@ -12,6 +12,14 @@ ELECTRICITY_USED_TARIFF_1 = r'\d-\d:1\.8\.1.+?\r\n'
ELECTRICITY_USED_TARIFF_2 = r'\d-\d:1\.8\.2.+?\r\n' ELECTRICITY_USED_TARIFF_2 = r'\d-\d:1\.8\.2.+?\r\n'
ELECTRICITY_DELIVERED_TARIFF_1 = r'\d-\d:2\.8\.1.+?\r\n' ELECTRICITY_DELIVERED_TARIFF_1 = r'\d-\d:2\.8\.1.+?\r\n'
ELECTRICITY_DELIVERED_TARIFF_2 = r'\d-\d:2\.8\.2.+?\r\n' ELECTRICITY_DELIVERED_TARIFF_2 = r'\d-\d:2\.8\.2.+?\r\n'
CURRENT_REACTIVE_EXPORTED = r'\d-\d:3\.7\.0.+?\r\n'
ELECTRICITY_REACTIVE_IMPORTED_TOTAL = r'\d-\d:3\.8\.0.+?\r\n'
ELECTRICITY_REACTIVE_IMPORTED_TARIFF_1 = r'\d-\d:3\.8\.1.+?\r\n'
ELECTRICITY_REACTIVE_IMPORTED_TARIFF_2 = r'\d-\d:3\.8\.2.+?\r\n'
CURRENT_REACTIVE_IMPORTED = r'\d-\d:4\.7\.0.+?\r\n'
ELECTRICITY_REACTIVE_EXPORTED_TOTAL = r'\d-\d:4\.8\.0.+?\r\n'
ELECTRICITY_REACTIVE_EXPORTED_TARIFF_1 = r'\d-\d:4\.8\.1.+?\r\n'
ELECTRICITY_REACTIVE_EXPORTED_TARIFF_2 = r'\d-\d:4\.8\.2.+?\r\n'
ELECTRICITY_ACTIVE_TARIFF = r'\d-\d:96\.14\.0.+?\r\n' ELECTRICITY_ACTIVE_TARIFF = r'\d-\d:96\.14\.0.+?\r\n'
EQUIPMENT_IDENTIFIER = r'\d-\d:96\.1\.1.+?\r\n' EQUIPMENT_IDENTIFIER = r'\d-\d:96\.1\.1.+?\r\n'
CURRENT_ELECTRICITY_USAGE = r'\d-\d:1\.7\.0.+?\r\n' CURRENT_ELECTRICITY_USAGE = r'\d-\d:1\.7\.0.+?\r\n'

View File

@ -1,8 +1,12 @@
import logging import logging
import re import re
from binascii import unhexlify
from ctypes import c_ushort from ctypes import c_ushort
from dlms_cosem.connection import XDlmsApduFactory
from dlms_cosem.protocol.xdlms import GeneralGlobalCipher
from dsmr_parser.objects import MBusObject, CosemObject, ProfileGenericObject from dsmr_parser.objects import MBusObject, CosemObject, ProfileGenericObject
from dsmr_parser.exceptions import ParseError, InvalidChecksumError from dsmr_parser.exceptions import ParseError, InvalidChecksumError
@ -22,14 +26,15 @@ class TelegramParser(object):
self.telegram_specification = telegram_specification self.telegram_specification = telegram_specification
self.apply_checksum_validation = apply_checksum_validation self.apply_checksum_validation = apply_checksum_validation
def parse(self, telegram_data): def parse(self, telegram_data, encryption_key="", authentication_key=""): # noqa: C901
""" """
Parse telegram from string to dict. Parse telegram from string to dict.
The telegram str type makes python 2.x integration easier. The telegram str type makes python 2.x integration easier.
:param str telegram_data: full telegram from start ('/') to checksum :param str telegram_data: full telegram from start ('/') to checksum
('!ABCD') including line endings in between the telegram's lines ('!ABCD') including line endings in between the telegram's lines
:param str encryption_key: encryption key
:param str authentication_key: authentication key
:rtype: dict :rtype: dict
:returns: Shortened example: :returns: Shortened example:
{ {
@ -43,6 +48,38 @@ class TelegramParser(object):
:raises InvalidChecksumError: :raises InvalidChecksumError:
""" """
if "general_global_cipher" in self.telegram_specification:
if self.telegram_specification["general_global_cipher"]:
enc_key = unhexlify(encryption_key)
auth_key = unhexlify(authentication_key)
telegram_data = unhexlify(telegram_data)
apdu = XDlmsApduFactory.apdu_from_bytes(apdu_bytes=telegram_data)
if apdu.security_control.security_suite != 0:
logger.warning("Untested security suite")
if apdu.security_control.authenticated and not apdu.security_control.encrypted:
logger.warning("Untested authentication only")
if not apdu.security_control.authenticated and not apdu.security_control.encrypted:
logger.warning("Untested not encrypted or authenticated")
if apdu.security_control.compressed:
logger.warning("Untested compression")
if apdu.security_control.broadcast_key:
logger.warning("Untested broadcast key")
telegram_data = apdu.to_plain_apdu(enc_key, auth_key).decode("ascii")
else:
try:
if unhexlify(telegram_data[0:2])[0] == GeneralGlobalCipher.TAG:
raise RuntimeError("Looks like a general_global_cipher frame "
"but telegram specification is not matching!")
except Exception:
pass
else:
try:
if unhexlify(telegram_data[0:2])[0] == GeneralGlobalCipher.TAG:
raise RuntimeError(
"Looks like a general_global_cipher frame but telegram specification is not matching!")
except Exception:
pass
if self.apply_checksum_validation \ if self.apply_checksum_validation \
and self.telegram_specification['checksum_support']: and self.telegram_specification['checksum_support']:
self.validate_checksum(telegram_data) self.validate_checksum(telegram_data)

View File

@ -201,3 +201,33 @@ Q3D = {
obis.Q3D_EQUIPMENT_SERIALNUMBER: CosemParser(ValueParser(str)), obis.Q3D_EQUIPMENT_SERIALNUMBER: CosemParser(ValueParser(str)),
}, },
} }
SAGEMCOM_T210_D_R = {
"general_global_cipher": True,
"checksum_support": True,
'objects': {
obis.P1_MESSAGE_HEADER: CosemParser(ValueParser(str)),
obis.P1_MESSAGE_TIMESTAMP: CosemParser(ValueParser(timestamp)),
obis.ELECTRICITY_IMPORTED_TOTAL: CosemParser(ValueParser(Decimal)),
obis.ELECTRICITY_USED_TARIFF_1: CosemParser(ValueParser(Decimal)),
obis.ELECTRICITY_USED_TARIFF_2: CosemParser(ValueParser(Decimal)),
obis.CURRENT_ELECTRICITY_USAGE: CosemParser(ValueParser(Decimal)),
obis.ELECTRICITY_REACTIVE_EXPORTED_TOTAL: CosemParser(ValueParser(Decimal)),
obis.ELECTRICITY_REACTIVE_EXPORTED_TARIFF_1: CosemParser(ValueParser(Decimal)),
obis.ELECTRICITY_REACTIVE_EXPORTED_TARIFF_2: CosemParser(ValueParser(Decimal)),
obis.CURRENT_REACTIVE_IMPORTED: CosemParser(ValueParser(Decimal)),
obis.ELECTRICITY_EXPORTED_TOTAL: CosemParser(ValueParser(Decimal)),
obis.ELECTRICITY_DELIVERED_TARIFF_1: CosemParser(ValueParser(Decimal)),
obis.ELECTRICITY_DELIVERED_TARIFF_2: CosemParser(ValueParser(Decimal)),
obis.CURRENT_ELECTRICITY_DELIVERY: CosemParser(ValueParser(Decimal)),
obis.ELECTRICITY_REACTIVE_IMPORTED_TOTAL: CosemParser(ValueParser(Decimal)),
obis.ELECTRICITY_REACTIVE_IMPORTED_TARIFF_1: CosemParser(ValueParser(Decimal)),
obis.ELECTRICITY_REACTIVE_IMPORTED_TARIFF_2: CosemParser(ValueParser(Decimal)),
obis.CURRENT_REACTIVE_EXPORTED: CosemParser(ValueParser(Decimal)),
}
}
AUSTRIA_ENERGIENETZE_STEIERMARK = SAGEMCOM_T210_D_R

View File

@ -13,7 +13,8 @@ setup(
'pyserial>=3,<4', 'pyserial>=3,<4',
'pyserial-asyncio<1', 'pyserial-asyncio<1',
'pytz', 'pytz',
'Tailer==0.4.1' 'Tailer==0.4.1',
'dlms_cosem==21.3.2'
], ],
entry_points={ entry_points={
'console_scripts': ['dsmr_console=dsmr_parser.__main__:console'] 'console_scripts': ['dsmr_console=dsmr_parser.__main__:console']

View File

@ -169,3 +169,27 @@ TELEGRAM_ESY5Q3DA1004_V304 = (
' 25818685\r\n' ' 25818685\r\n'
'DE0000000000000000000000000000003\r\n' 'DE0000000000000000000000000000003\r\n'
) )
TELEGRAM_SAGEMCOM_T210_D_R = (
'/EST5\\253710000_A\r\n'
'\r\n'
'1-3:0.2.8(50)\r\n'
'0-0:1.0.0(221006155014S)\r\n'
'1-0:1.8.0(006545766*Wh)\r\n'
'1-0:1.8.1(005017120*Wh)\r\n'
'1-0:1.8.2(001528646*Wh)\r\n'
'1-0:1.7.0(000000286*W)\r\n'
'1-0:2.8.0(000000058*Wh)\r\n'
'1-0:2.8.1(000000000*Wh)\r\n'
'1-0:2.8.2(000000058*Wh)\r\n'
'1-0:2.7.0(000000000*W)\r\n'
'1-0:3.8.0(000000747*varh)\r\n'
'1-0:3.8.1(000000000*varh)\r\n'
'1-0:3.8.2(000000747*varh)\r\n'
'1-0:3.7.0(000000000*var)\r\n'
'1-0:4.8.0(003897726*varh)\r\n'
'1-0:4.8.1(002692848*varh)\r\n'
'1-0:4.8.2(001204878*varh)\r\n'
'1-0:4.7.0(000000166*var)\r\n'
'!7EF9\r\n'
)

View File

@ -0,0 +1,107 @@
from binascii import unhexlify
from copy import deepcopy
import unittest
from dlms_cosem.exceptions import DecryptionError
from dlms_cosem.protocol.xdlms import GeneralGlobalCipher
from dlms_cosem.security import SecurityControlField, encrypt
from dsmr_parser import telegram_specifications
from dsmr_parser.exceptions import ParseError
from dsmr_parser.parsers import TelegramParser
from test.example_telegrams import TELEGRAM_SAGEMCOM_T210_D_R
class TelegramParserEncryptedTest(unittest.TestCase):
""" Test parsing of a DSML encypted DSMR v5.x telegram. """
DUMMY_ENCRYPTION_KEY = "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
DUMMY_AUTHENTICATION_KEY = "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"
def __generate_encrypted(self, security_suite=0, authenticated=True, encrypted=True):
security_control = SecurityControlField(
security_suite=security_suite, authenticated=authenticated, encrypted=encrypted
)
encryption_key = unhexlify(self.DUMMY_ENCRYPTION_KEY)
authentication_key = unhexlify(self.DUMMY_AUTHENTICATION_KEY)
system_title = "SYSTEMID".encode("ascii")
invocation_counter = int.from_bytes(bytes.fromhex("10000001"), "big")
plain_data = TELEGRAM_SAGEMCOM_T210_D_R.encode("ascii")
encrypted = encrypt(
security_control=security_control,
key=encryption_key,
auth_key=authentication_key,
system_title=system_title,
invocation_counter=invocation_counter,
plain_text=plain_data,
)
full_frame = bytearray(GeneralGlobalCipher.TAG.to_bytes(1, "big", signed=False))
full_frame.extend(len(system_title).to_bytes(1, "big", signed=False))
full_frame.extend(system_title)
full_frame.extend([0x82]) # Length of the following length bytes
# https://github.com/pwitab/dlms-cosem/blob/739f81a58e5f07663a512d4a128851333a0ed5e6/dlms_cosem/a_xdr.py#L33
security_control = security_control.to_bytes()
invocation_counter = invocation_counter.to_bytes(4, "big", signed=False)
full_frame.extend((len(encrypted)
+ len(invocation_counter)
+ len(security_control)).to_bytes(2, "big", signed=False))
full_frame.extend(security_control)
full_frame.extend(invocation_counter)
full_frame.extend(encrypted)
return full_frame
def test_parse(self):
parser = TelegramParser(telegram_specifications.SAGEMCOM_T210_D_R)
result = parser.parse(self.__generate_encrypted().hex(),
self.DUMMY_ENCRYPTION_KEY,
self.DUMMY_AUTHENTICATION_KEY)
self.assertEqual(len(result), 18)
def test_damaged_frame(self):
# If the frame is damaged decrypting fails (crc is technically not needed)
parser = TelegramParser(telegram_specifications.SAGEMCOM_T210_D_R)
generated = self.__generate_encrypted()
generated[150] = 0x00
generated = generated.hex()
with self.assertRaises(DecryptionError):
parser.parse(generated, self.DUMMY_ENCRYPTION_KEY, self.DUMMY_AUTHENTICATION_KEY)
def test_plain(self):
# If a plain request is parsed with "general_global_cipher": True it fails
parser = TelegramParser(telegram_specifications.SAGEMCOM_T210_D_R)
with self.assertRaises(Exception):
parser.parse(TELEGRAM_SAGEMCOM_T210_D_R, self.DUMMY_ENCRYPTION_KEY, self.DUMMY_AUTHENTICATION_KEY)
def test_general_global_cipher_not_specified(self):
# If a GGC frame is detected but general_global_cipher is not set it fails
parser = TelegramParser(telegram_specifications.SAGEMCOM_T210_D_R)
parser = deepcopy(parser) # We do not want to change the module value
parser.telegram_specification['general_global_cipher'] = False
with self.assertRaises(ParseError):
parser.parse(self.__generate_encrypted().hex(), self.DUMMY_ENCRYPTION_KEY, self.DUMMY_AUTHENTICATION_KEY)
def test_only_encrypted(self):
# Not implemented by dlms_cosem
parser = TelegramParser(telegram_specifications.SAGEMCOM_T210_D_R)
only_auth = self.__generate_encrypted(0, authenticated=False, encrypted=True).hex()
with self.assertRaises(ValueError):
parser.parse(only_auth, self.DUMMY_ENCRYPTION_KEY)
def test_only_auth(self):
# Not implemented by dlms_cosem
parser = TelegramParser(telegram_specifications.SAGEMCOM_T210_D_R)
only_auth = self.__generate_encrypted(0, authenticated=True, encrypted=False).hex()
with self.assertRaises(ValueError):
parser.parse(only_auth, authentication_key=self.DUMMY_AUTHENTICATION_KEY)

View File

@ -5,6 +5,7 @@ deps=
pylama pylama
pytest-asyncio pytest-asyncio
pytest-mock pytest-mock
dlms_cosem
commands= commands=
py.test --cov=dsmr_parser test {posargs} py.test --cov=dsmr_parser test {posargs}
pylama dsmr_parser test pylama dsmr_parser test