Compare commits

...

4 Commits

Author SHA1 Message Date
98652418cc Merged master into autodetecttelegram 2024-08-25 15:26:54 +02:00
Nigel Dokter
d216996fe6 updated changelog in preparation of v0.9 2017-03-02 19:15:19 +01:00
Nigel Dokter
aef37837c5 Merge branch 'issue_22' into development 2017-03-02 19:08:17 +01:00
Nigel Dokter
6305d3d37f finished match_telegram_specification 2017-03-02 19:05:17 +01:00
10 changed files with 134 additions and 13 deletions

View File

@ -1,6 +1,9 @@
Change Log
----------
**1.5.0** (2024-08-25)
- Allow the telegram specification to optionally be autodetected (`PR #87 <https://github.com/ndokter/dsmr_parser/pull/87>`_ by `clonerswords <https://github.com/clonerswords>`_)
**1.4.2** (2024-07-14)
- Bump Github Actions to latest versions in favor of Node deprecations (`PR #159 <https://github.com/ndokter/dsmr_parser/pull/159>`_ by `dennissiemensma <https://github.com/dennissiemensma>`_)
@ -167,7 +170,7 @@ Remove deprecated asyncio coroutine decorator (`PR #76 <https://github.com/ndokt
**0.12** (2018-09-23)
- Add serial settings for DSMR v5.0 (`PR #31 <https://github.com/ndokter/dsmr_parser/pull/31>`_).
- Lux-creos-obis-1.8.0 (`PR #32 <https://github.com/ndokter/dsmr_parser/pull/32>`_).
- Lux-creos-obis-1.8.0 (`PR #32 <https://github.com/ndokter/dsmr_parser/pull/32>`_).
**0.11** (2017-09-18)

View File

@ -4,3 +4,7 @@ class ParseError(Exception):
class InvalidChecksumError(ParseError):
pass
class TelegramSpecificationMatchError(ParseError):
pass

View File

@ -9,7 +9,7 @@ from dlms_cosem.connection import XDlmsApduFactory
from dlms_cosem.protocol.xdlms import GeneralGlobalCipher
from dsmr_parser.objects import MBusObject, MBusObjectPeak, CosemObject, ProfileGenericObject, Telegram
from dsmr_parser.exceptions import ParseError, InvalidChecksumError
from dsmr_parser.exceptions import ParseError, InvalidChecksumError, TelegramSpecificationMatchError
from dsmr_parser.value_types import timestamp
logger = logging.getLogger(__name__)
@ -18,20 +18,17 @@ logger = logging.getLogger(__name__)
class TelegramParser(object):
crc16_tab = []
def __init__(self, telegram_specification, apply_checksum_validation=True):
def __init__(self, telegram_specification=None, apply_checksum_validation=True):
"""
:param telegram_specification: determines how the telegram is parsed
:param telegram_specification: determines how the telegram is parsed.
Will attempt to autodetect if omitted.
:param apply_checksum_validation: validate checksum if applicable for
telegram DSMR version (v4 and up).
:type telegram_specification: dict
"""
self.apply_checksum_validation = apply_checksum_validation
self.telegram_specification = telegram_specification
# Regexes are compiled once to improve performance
self.telegram_specification_regexes = {
object["obis_reference"]: re.compile(object["obis_reference"], re.DOTALL | re.MULTILINE)
for object in self.telegram_specification['objects']
}
self._telegram_specification_regex = None
def parse(self, telegram_data, encryption_key="", authentication_key="", throw_ex=False): # noqa: C901
"""
@ -46,6 +43,15 @@ class TelegramParser(object):
:raises ParseError:
:raises InvalidChecksumError:
"""
if not self.telegram_specification:
self.telegram_specification = \
match_telegram_specification(telegram_data)
if not self._telegram_specification_regex:
# Regexes are compiled once to improve performance
self._telegram_specification_regexes = {
object["obis_reference"]: re.compile(object["obis_reference"], re.DOTALL | re.MULTILINE)
for object in self.telegram_specification['objects']
}
if "general_global_cipher" in self.telegram_specification:
if self.telegram_specification["general_global_cipher"]:
@ -85,7 +91,7 @@ class TelegramParser(object):
telegram = Telegram()
for object in self.telegram_specification['objects']:
pattern = self.telegram_specification_regexes[object["obis_reference"]]
pattern = self._telegram_specification_regexes[object["obis_reference"]]
matches = pattern.findall(telegram_data)
# Some signatures are optional and may not be present,
@ -173,6 +179,37 @@ class TelegramParser(object):
return crcValue
def match_telegram_specification(telegram_data):
"""
Find telegram specification that matches the telegram data by trying all
specifications.
Could be further optimized to check the actual 0.2.8 OBIS reference which
is available for DSMR version 4 and up.
:param str telegram_data: full telegram from start ('/') to checksum
('!ABCD') including line endings in between the telegram's lines
:return: telegram specification
:rtype: dict
"""
# Prevent circular import
from dsmr_parser import telegram_specifications
for specification in telegram_specifications.ALL:
try:
TelegramParser(specification).parse(telegram_data)
except ParseError:
pass
else:
return specification
raise TelegramSpecificationMatchError(
'Could automatically match telegram specification. Make sure the data'
'is not corrupt. Alternatively manually specify one.'
)
class DSMRObjectParser(object):
"""
Parses an object (can also be see as a 'line') from a telegram.

View File

@ -486,8 +486,6 @@ V5 = {
]
}
ALL = (V2_2, V3, V4, V5)
BELGIUM_FLUVIUS = {
'checksum_support': True,
'objects': [
@ -1368,3 +1366,17 @@ EON_HUNGARY = {
}
]
}
ALL = (
V2_2,
V3,
V4,
V5,
BELGIUM_FLUVIUS,
LUXEMBOURG_SMARTY,
SWEDEN,
SAGEMCOM_T210_D_R,
AUSTRIA_ENERGIENETZE_STEIERMARK,
ISKRA_IE,
EON_HUNGARY
)

View File

@ -7,7 +7,7 @@ setup(
author_email='mail@nldr.net',
license='MIT',
url='https://github.com/ndokter/dsmr_parser',
version='1.4.2',
version='1.5.0',
packages=find_packages(exclude=('test', 'test.*')),
install_requires=[
'pyserial>=3,<4',

View File

@ -0,0 +1,29 @@
import unittest
from dsmr_parser.exceptions import TelegramSpecificationMatchError
from dsmr_parser.parsers import match_telegram_specification
from dsmr_parser import telegram_specifications
from test import example_telegrams
class MatchTelegramSpecificationTest(unittest.TestCase):
def test_v2_2(self):
assert match_telegram_specification(example_telegrams.TELEGRAM_V2_2) \
== telegram_specifications.V2_2
def test_v3(self):
assert match_telegram_specification(example_telegrams.TELEGRAM_V3) \
== telegram_specifications.V3
def test_v4_2(self):
assert match_telegram_specification(example_telegrams.TELEGRAM_V4_2) \
== telegram_specifications.V4
def test_v5(self):
assert match_telegram_specification(example_telegrams.TELEGRAM_V5) \
== telegram_specifications.V5
def test_malformed_telegram(self):
with self.assertRaises(TelegramSpecificationMatchError):
match_telegram_specification(example_telegrams.TELEGRAM_V5[:-4])

View File

@ -12,6 +12,15 @@ from test.example_telegrams import TELEGRAM_V2_2
class TelegramParserV2_2Test(unittest.TestCase):
""" Test parsing of a DSMR v2.2 telegram. """
def test_telegram_specification_matching(self):
parser = TelegramParser()
parser.parse(TELEGRAM_V2_2)
self.assertEqual(
parser.telegram_specification,
telegram_specifications.V2_2
)
def test_parse(self):
parser = TelegramParser(telegram_specifications.V2_2)
try:

View File

@ -12,6 +12,15 @@ from test.example_telegrams import TELEGRAM_V3
class TelegramParserV3Test(unittest.TestCase):
""" Test parsing of a DSMR v3 telegram. """
def test_telegram_specification_matching(self):
parser = TelegramParser()
parser.parse(TELEGRAM_V3)
self.assertEqual(
parser.telegram_specification,
telegram_specifications.V3
)
def test_parse(self):
parser = TelegramParser(telegram_specifications.V3)
try:

View File

@ -15,6 +15,15 @@ from test.example_telegrams import TELEGRAM_V4_2
class TelegramParserV4_2Test(unittest.TestCase):
""" Test parsing of a DSMR v4.2 telegram. """
def test_telegram_specification_matching(self):
parser = TelegramParser()
parser.parse(TELEGRAM_V4_2)
self.assertEqual(
parser.telegram_specification,
telegram_specifications.V4
)
def test_parse(self):
parser = TelegramParser(telegram_specifications.V4)
try:

View File

@ -15,6 +15,15 @@ from test.example_telegrams import TELEGRAM_V5
class TelegramParserV5Test(unittest.TestCase):
""" Test parsing of a DSMR v5.x telegram. """
def test_telegram_specification_matching(self):
parser = TelegramParser()
parser.parse(TELEGRAM_V5)
self.assertEqual(
parser.telegram_specification,
telegram_specifications.V5
)
def test_parse(self):
parser = TelegramParser(telegram_specifications.V5)
try: