Compare commits
4 Commits
master
...
autodetect
Author | SHA1 | Date | |
---|---|---|---|
98652418cc | |||
|
d216996fe6 | ||
|
aef37837c5 | ||
|
6305d3d37f |
@ -1,6 +1,9 @@
|
||||
Change Log
|
||||
----------
|
||||
|
||||
**1.5.0** (2024-08-25)
|
||||
- Allow the telegram specification to optionally be autodetected (`PR #87 <https://github.com/ndokter/dsmr_parser/pull/87>`_ by `clonerswords <https://github.com/clonerswords>`_)
|
||||
|
||||
**1.4.2** (2024-07-14)
|
||||
|
||||
- Bump Github Actions to latest versions in favor of Node deprecations (`PR #159 <https://github.com/ndokter/dsmr_parser/pull/159>`_ by `dennissiemensma <https://github.com/dennissiemensma>`_)
|
||||
@ -167,7 +170,7 @@ Remove deprecated asyncio coroutine decorator (`PR #76 <https://github.com/ndokt
|
||||
**0.12** (2018-09-23)
|
||||
|
||||
- Add serial settings for DSMR v5.0 (`PR #31 <https://github.com/ndokter/dsmr_parser/pull/31>`_).
|
||||
- Lux-creos-obis-1.8.0 (`PR #32 <https://github.com/ndokter/dsmr_parser/pull/32>`_).
|
||||
- Lux-creos-obis-1.8.0 (`PR #32 <https://github.com/ndokter/dsmr_parser/pull/32>`_).
|
||||
|
||||
**0.11** (2017-09-18)
|
||||
|
||||
|
@ -4,3 +4,7 @@ class ParseError(Exception):
|
||||
|
||||
class InvalidChecksumError(ParseError):
|
||||
pass
|
||||
|
||||
|
||||
class TelegramSpecificationMatchError(ParseError):
|
||||
pass
|
||||
|
@ -9,7 +9,7 @@ from dlms_cosem.connection import XDlmsApduFactory
|
||||
from dlms_cosem.protocol.xdlms import GeneralGlobalCipher
|
||||
|
||||
from dsmr_parser.objects import MBusObject, MBusObjectPeak, CosemObject, ProfileGenericObject, Telegram
|
||||
from dsmr_parser.exceptions import ParseError, InvalidChecksumError
|
||||
from dsmr_parser.exceptions import ParseError, InvalidChecksumError, TelegramSpecificationMatchError
|
||||
from dsmr_parser.value_types import timestamp
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@ -18,20 +18,17 @@ logger = logging.getLogger(__name__)
|
||||
class TelegramParser(object):
|
||||
crc16_tab = []
|
||||
|
||||
def __init__(self, telegram_specification, apply_checksum_validation=True):
|
||||
def __init__(self, telegram_specification=None, apply_checksum_validation=True):
|
||||
"""
|
||||
:param telegram_specification: determines how the telegram is parsed
|
||||
:param telegram_specification: determines how the telegram is parsed.
|
||||
Will attempt to autodetect if omitted.
|
||||
:param apply_checksum_validation: validate checksum if applicable for
|
||||
telegram DSMR version (v4 and up).
|
||||
:type telegram_specification: dict
|
||||
"""
|
||||
self.apply_checksum_validation = apply_checksum_validation
|
||||
self.telegram_specification = telegram_specification
|
||||
# Regexes are compiled once to improve performance
|
||||
self.telegram_specification_regexes = {
|
||||
object["obis_reference"]: re.compile(object["obis_reference"], re.DOTALL | re.MULTILINE)
|
||||
for object in self.telegram_specification['objects']
|
||||
}
|
||||
self._telegram_specification_regex = None
|
||||
|
||||
def parse(self, telegram_data, encryption_key="", authentication_key="", throw_ex=False): # noqa: C901
|
||||
"""
|
||||
@ -46,6 +43,15 @@ class TelegramParser(object):
|
||||
:raises ParseError:
|
||||
:raises InvalidChecksumError:
|
||||
"""
|
||||
if not self.telegram_specification:
|
||||
self.telegram_specification = \
|
||||
match_telegram_specification(telegram_data)
|
||||
if not self._telegram_specification_regex:
|
||||
# Regexes are compiled once to improve performance
|
||||
self._telegram_specification_regexes = {
|
||||
object["obis_reference"]: re.compile(object["obis_reference"], re.DOTALL | re.MULTILINE)
|
||||
for object in self.telegram_specification['objects']
|
||||
}
|
||||
|
||||
if "general_global_cipher" in self.telegram_specification:
|
||||
if self.telegram_specification["general_global_cipher"]:
|
||||
@ -85,7 +91,7 @@ class TelegramParser(object):
|
||||
telegram = Telegram()
|
||||
|
||||
for object in self.telegram_specification['objects']:
|
||||
pattern = self.telegram_specification_regexes[object["obis_reference"]]
|
||||
pattern = self._telegram_specification_regexes[object["obis_reference"]]
|
||||
matches = pattern.findall(telegram_data)
|
||||
|
||||
# Some signatures are optional and may not be present,
|
||||
@ -173,6 +179,37 @@ class TelegramParser(object):
|
||||
return crcValue
|
||||
|
||||
|
||||
def match_telegram_specification(telegram_data):
|
||||
"""
|
||||
Find telegram specification that matches the telegram data by trying all
|
||||
specifications.
|
||||
|
||||
Could be further optimized to check the actual 0.2.8 OBIS reference which
|
||||
is available for DSMR version 4 and up.
|
||||
|
||||
:param str telegram_data: full telegram from start ('/') to checksum
|
||||
('!ABCD') including line endings in between the telegram's lines
|
||||
:return: telegram specification
|
||||
:rtype: dict
|
||||
"""
|
||||
# Prevent circular import
|
||||
from dsmr_parser import telegram_specifications
|
||||
|
||||
for specification in telegram_specifications.ALL:
|
||||
try:
|
||||
TelegramParser(specification).parse(telegram_data)
|
||||
except ParseError:
|
||||
pass
|
||||
else:
|
||||
return specification
|
||||
|
||||
raise TelegramSpecificationMatchError(
|
||||
'Could automatically match telegram specification. Make sure the data'
|
||||
'is not corrupt. Alternatively manually specify one.'
|
||||
)
|
||||
|
||||
|
||||
|
||||
class DSMRObjectParser(object):
|
||||
"""
|
||||
Parses an object (can also be see as a 'line') from a telegram.
|
||||
|
@ -486,8 +486,6 @@ V5 = {
|
||||
]
|
||||
}
|
||||
|
||||
ALL = (V2_2, V3, V4, V5)
|
||||
|
||||
BELGIUM_FLUVIUS = {
|
||||
'checksum_support': True,
|
||||
'objects': [
|
||||
@ -1368,3 +1366,17 @@ EON_HUNGARY = {
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
ALL = (
|
||||
V2_2,
|
||||
V3,
|
||||
V4,
|
||||
V5,
|
||||
BELGIUM_FLUVIUS,
|
||||
LUXEMBOURG_SMARTY,
|
||||
SWEDEN,
|
||||
SAGEMCOM_T210_D_R,
|
||||
AUSTRIA_ENERGIENETZE_STEIERMARK,
|
||||
ISKRA_IE,
|
||||
EON_HUNGARY
|
||||
)
|
2
setup.py
2
setup.py
@ -7,7 +7,7 @@ setup(
|
||||
author_email='mail@nldr.net',
|
||||
license='MIT',
|
||||
url='https://github.com/ndokter/dsmr_parser',
|
||||
version='1.4.2',
|
||||
version='1.5.0',
|
||||
packages=find_packages(exclude=('test', 'test.*')),
|
||||
install_requires=[
|
||||
'pyserial>=3,<4',
|
||||
|
29
test/test_match_telegram_specification.py
Normal file
29
test/test_match_telegram_specification.py
Normal file
@ -0,0 +1,29 @@
|
||||
import unittest
|
||||
|
||||
from dsmr_parser.exceptions import TelegramSpecificationMatchError
|
||||
from dsmr_parser.parsers import match_telegram_specification
|
||||
from dsmr_parser import telegram_specifications
|
||||
from test import example_telegrams
|
||||
|
||||
|
||||
class MatchTelegramSpecificationTest(unittest.TestCase):
|
||||
|
||||
def test_v2_2(self):
|
||||
assert match_telegram_specification(example_telegrams.TELEGRAM_V2_2) \
|
||||
== telegram_specifications.V2_2
|
||||
|
||||
def test_v3(self):
|
||||
assert match_telegram_specification(example_telegrams.TELEGRAM_V3) \
|
||||
== telegram_specifications.V3
|
||||
|
||||
def test_v4_2(self):
|
||||
assert match_telegram_specification(example_telegrams.TELEGRAM_V4_2) \
|
||||
== telegram_specifications.V4
|
||||
|
||||
def test_v5(self):
|
||||
assert match_telegram_specification(example_telegrams.TELEGRAM_V5) \
|
||||
== telegram_specifications.V5
|
||||
|
||||
def test_malformed_telegram(self):
|
||||
with self.assertRaises(TelegramSpecificationMatchError):
|
||||
match_telegram_specification(example_telegrams.TELEGRAM_V5[:-4])
|
@ -12,6 +12,15 @@ from test.example_telegrams import TELEGRAM_V2_2
|
||||
class TelegramParserV2_2Test(unittest.TestCase):
|
||||
""" Test parsing of a DSMR v2.2 telegram. """
|
||||
|
||||
def test_telegram_specification_matching(self):
|
||||
parser = TelegramParser()
|
||||
parser.parse(TELEGRAM_V2_2)
|
||||
|
||||
self.assertEqual(
|
||||
parser.telegram_specification,
|
||||
telegram_specifications.V2_2
|
||||
)
|
||||
|
||||
def test_parse(self):
|
||||
parser = TelegramParser(telegram_specifications.V2_2)
|
||||
try:
|
||||
|
@ -12,6 +12,15 @@ from test.example_telegrams import TELEGRAM_V3
|
||||
class TelegramParserV3Test(unittest.TestCase):
|
||||
""" Test parsing of a DSMR v3 telegram. """
|
||||
|
||||
def test_telegram_specification_matching(self):
|
||||
parser = TelegramParser()
|
||||
parser.parse(TELEGRAM_V3)
|
||||
|
||||
self.assertEqual(
|
||||
parser.telegram_specification,
|
||||
telegram_specifications.V3
|
||||
)
|
||||
|
||||
def test_parse(self):
|
||||
parser = TelegramParser(telegram_specifications.V3)
|
||||
try:
|
||||
|
@ -15,6 +15,15 @@ from test.example_telegrams import TELEGRAM_V4_2
|
||||
class TelegramParserV4_2Test(unittest.TestCase):
|
||||
""" Test parsing of a DSMR v4.2 telegram. """
|
||||
|
||||
def test_telegram_specification_matching(self):
|
||||
parser = TelegramParser()
|
||||
parser.parse(TELEGRAM_V4_2)
|
||||
|
||||
self.assertEqual(
|
||||
parser.telegram_specification,
|
||||
telegram_specifications.V4
|
||||
)
|
||||
|
||||
def test_parse(self):
|
||||
parser = TelegramParser(telegram_specifications.V4)
|
||||
try:
|
||||
|
@ -15,6 +15,15 @@ from test.example_telegrams import TELEGRAM_V5
|
||||
class TelegramParserV5Test(unittest.TestCase):
|
||||
""" Test parsing of a DSMR v5.x telegram. """
|
||||
|
||||
def test_telegram_specification_matching(self):
|
||||
parser = TelegramParser()
|
||||
parser.parse(TELEGRAM_V5)
|
||||
|
||||
self.assertEqual(
|
||||
parser.telegram_specification,
|
||||
telegram_specifications.V5
|
||||
)
|
||||
|
||||
def test_parse(self):
|
||||
parser = TelegramParser(telegram_specifications.V5)
|
||||
try:
|
||||
|
Loading…
Reference in New Issue
Block a user