Compare commits
	
		
			4 Commits
		
	
	
		
			master
			...
			autodetect
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 98652418cc | |||
|  | d216996fe6 | ||
|  | aef37837c5 | ||
|  | 6305d3d37f | 
| @ -1,6 +1,9 @@ | ||||
| Change Log | ||||
| ---------- | ||||
| 
 | ||||
| **1.5.0** (2024-08-25) | ||||
| - Allow the telegram specification to optionally be autodetected (`PR #87 <https://github.com/ndokter/dsmr_parser/pull/87>`_ by `clonerswords <https://github.com/clonerswords>`_) | ||||
| 
 | ||||
| **1.4.2** (2024-07-14) | ||||
| 
 | ||||
| - Bump Github Actions to latest versions in favor of Node deprecations (`PR #159 <https://github.com/ndokter/dsmr_parser/pull/159>`_ by `dennissiemensma <https://github.com/dennissiemensma>`_) | ||||
|  | ||||
| @ -4,3 +4,7 @@ class ParseError(Exception): | ||||
| 
 | ||||
| class InvalidChecksumError(ParseError): | ||||
|     pass | ||||
| 
 | ||||
| 
 | ||||
| class TelegramSpecificationMatchError(ParseError): | ||||
|     pass | ||||
|  | ||||
| @ -9,7 +9,7 @@ from dlms_cosem.connection import XDlmsApduFactory | ||||
| from dlms_cosem.protocol.xdlms import GeneralGlobalCipher | ||||
| 
 | ||||
| from dsmr_parser.objects import MBusObject, MBusObjectPeak, CosemObject, ProfileGenericObject, Telegram | ||||
| from dsmr_parser.exceptions import ParseError, InvalidChecksumError | ||||
| from dsmr_parser.exceptions import ParseError, InvalidChecksumError, TelegramSpecificationMatchError | ||||
| from dsmr_parser.value_types import timestamp | ||||
| 
 | ||||
| logger = logging.getLogger(__name__) | ||||
| @ -18,20 +18,17 @@ logger = logging.getLogger(__name__) | ||||
| class TelegramParser(object): | ||||
|     crc16_tab = [] | ||||
| 
 | ||||
|     def __init__(self, telegram_specification, apply_checksum_validation=True): | ||||
|     def __init__(self, telegram_specification=None, apply_checksum_validation=True): | ||||
|         """ | ||||
|         :param telegram_specification: determines how the telegram is parsed | ||||
|         :param telegram_specification: determines how the telegram is parsed. | ||||
|             Will attempt to autodetect if omitted. | ||||
|         :param apply_checksum_validation: validate checksum if applicable for | ||||
|             telegram DSMR version (v4 and up). | ||||
|         :type telegram_specification: dict | ||||
|         """ | ||||
|         self.apply_checksum_validation = apply_checksum_validation | ||||
|         self.telegram_specification = telegram_specification | ||||
|         # Regexes are compiled once to improve performance | ||||
|         self.telegram_specification_regexes = { | ||||
|             object["obis_reference"]: re.compile(object["obis_reference"], re.DOTALL | re.MULTILINE) | ||||
|             for object in self.telegram_specification['objects'] | ||||
|         } | ||||
|         self._telegram_specification_regex = None | ||||
| 
 | ||||
|     def parse(self, telegram_data, encryption_key="", authentication_key="", throw_ex=False):  # noqa: C901 | ||||
|         """ | ||||
| @ -46,6 +43,15 @@ class TelegramParser(object): | ||||
|         :raises ParseError: | ||||
|         :raises InvalidChecksumError: | ||||
|         """ | ||||
|         if not self.telegram_specification: | ||||
|             self.telegram_specification = \ | ||||
|                 match_telegram_specification(telegram_data) | ||||
|         if not self._telegram_specification_regex: | ||||
|             # Regexes are compiled once to improve performance | ||||
|             self._telegram_specification_regexes = { | ||||
|                 object["obis_reference"]: re.compile(object["obis_reference"], re.DOTALL | re.MULTILINE) | ||||
|                 for object in self.telegram_specification['objects'] | ||||
|             } | ||||
| 
 | ||||
|         if "general_global_cipher" in self.telegram_specification: | ||||
|             if self.telegram_specification["general_global_cipher"]: | ||||
| @ -85,7 +91,7 @@ class TelegramParser(object): | ||||
|         telegram = Telegram() | ||||
| 
 | ||||
|         for object in self.telegram_specification['objects']: | ||||
|             pattern = self.telegram_specification_regexes[object["obis_reference"]] | ||||
|             pattern = self._telegram_specification_regexes[object["obis_reference"]] | ||||
|             matches = pattern.findall(telegram_data) | ||||
| 
 | ||||
|             # Some signatures are optional and may not be present, | ||||
| @ -173,6 +179,37 @@ class TelegramParser(object): | ||||
|         return crcValue | ||||
| 
 | ||||
| 
 | ||||
| def match_telegram_specification(telegram_data): | ||||
|     """ | ||||
|     Find telegram specification that matches the telegram data by trying all | ||||
|     specifications. | ||||
| 
 | ||||
|     Could be further optimized to check the actual 0.2.8 OBIS reference which | ||||
|     is available for DSMR version 4 and up. | ||||
| 
 | ||||
|     :param str telegram_data: full telegram from start ('/') to checksum | ||||
|             ('!ABCD') including line endings in between the telegram's lines | ||||
|     :return: telegram specification | ||||
|     :rtype: dict | ||||
|     """ | ||||
|     # Prevent circular import | ||||
|     from dsmr_parser import telegram_specifications | ||||
| 
 | ||||
|     for specification in telegram_specifications.ALL: | ||||
|         try: | ||||
|             TelegramParser(specification).parse(telegram_data) | ||||
|         except ParseError: | ||||
|             pass | ||||
|         else: | ||||
|             return specification | ||||
| 
 | ||||
|     raise TelegramSpecificationMatchError( | ||||
|         'Could automatically match telegram specification. Make sure the data' | ||||
|         'is not corrupt. Alternatively manually specify one.' | ||||
|     ) | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| class DSMRObjectParser(object): | ||||
|     """ | ||||
|     Parses an object (can also be see as a 'line') from a telegram. | ||||
|  | ||||
| @ -486,8 +486,6 @@ V5 = { | ||||
|     ] | ||||
| } | ||||
| 
 | ||||
| ALL = (V2_2, V3, V4, V5) | ||||
| 
 | ||||
| BELGIUM_FLUVIUS = { | ||||
|     'checksum_support': True, | ||||
|     'objects': [ | ||||
| @ -1368,3 +1366,17 @@ EON_HUNGARY = { | ||||
|         } | ||||
|     ] | ||||
| } | ||||
| 
 | ||||
| ALL = ( | ||||
|     V2_2, | ||||
|     V3, | ||||
|     V4, | ||||
|     V5, | ||||
|     BELGIUM_FLUVIUS, | ||||
|     LUXEMBOURG_SMARTY, | ||||
|     SWEDEN, | ||||
|     SAGEMCOM_T210_D_R, | ||||
|     AUSTRIA_ENERGIENETZE_STEIERMARK, | ||||
|     ISKRA_IE, | ||||
|     EON_HUNGARY | ||||
| ) | ||||
							
								
								
									
										2
									
								
								setup.py
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								setup.py
									
									
									
									
									
								
							| @ -7,7 +7,7 @@ setup( | ||||
|     author_email='mail@nldr.net', | ||||
|     license='MIT', | ||||
|     url='https://github.com/ndokter/dsmr_parser', | ||||
|     version='1.4.2', | ||||
|     version='1.5.0', | ||||
|     packages=find_packages(exclude=('test', 'test.*')), | ||||
|     install_requires=[ | ||||
|         'pyserial>=3,<4', | ||||
|  | ||||
							
								
								
									
										29
									
								
								test/test_match_telegram_specification.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										29
									
								
								test/test_match_telegram_specification.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,29 @@ | ||||
| import unittest | ||||
| 
 | ||||
| from dsmr_parser.exceptions import TelegramSpecificationMatchError | ||||
| from dsmr_parser.parsers import match_telegram_specification | ||||
| from dsmr_parser import telegram_specifications | ||||
| from test import example_telegrams | ||||
| 
 | ||||
| 
 | ||||
| class MatchTelegramSpecificationTest(unittest.TestCase): | ||||
| 
 | ||||
|     def test_v2_2(self): | ||||
|         assert match_telegram_specification(example_telegrams.TELEGRAM_V2_2) \ | ||||
|            == telegram_specifications.V2_2 | ||||
| 
 | ||||
|     def test_v3(self): | ||||
|         assert match_telegram_specification(example_telegrams.TELEGRAM_V3) \ | ||||
|            == telegram_specifications.V3 | ||||
| 
 | ||||
|     def test_v4_2(self): | ||||
|         assert match_telegram_specification(example_telegrams.TELEGRAM_V4_2) \ | ||||
|            == telegram_specifications.V4 | ||||
| 
 | ||||
|     def test_v5(self): | ||||
|         assert match_telegram_specification(example_telegrams.TELEGRAM_V5) \ | ||||
|            == telegram_specifications.V5 | ||||
| 
 | ||||
|     def test_malformed_telegram(self): | ||||
|         with self.assertRaises(TelegramSpecificationMatchError): | ||||
|             match_telegram_specification(example_telegrams.TELEGRAM_V5[:-4]) | ||||
| @ -12,6 +12,15 @@ from test.example_telegrams import TELEGRAM_V2_2 | ||||
| class TelegramParserV2_2Test(unittest.TestCase): | ||||
|     """ Test parsing of a DSMR v2.2 telegram. """ | ||||
| 
 | ||||
|     def test_telegram_specification_matching(self): | ||||
|         parser = TelegramParser() | ||||
|         parser.parse(TELEGRAM_V2_2) | ||||
| 
 | ||||
|         self.assertEqual( | ||||
|             parser.telegram_specification, | ||||
|             telegram_specifications.V2_2 | ||||
|         ) | ||||
| 
 | ||||
|     def test_parse(self): | ||||
|         parser = TelegramParser(telegram_specifications.V2_2) | ||||
|         try: | ||||
|  | ||||
| @ -12,6 +12,15 @@ from test.example_telegrams import TELEGRAM_V3 | ||||
| class TelegramParserV3Test(unittest.TestCase): | ||||
|     """ Test parsing of a DSMR v3 telegram. """ | ||||
| 
 | ||||
|     def test_telegram_specification_matching(self): | ||||
|         parser = TelegramParser() | ||||
|         parser.parse(TELEGRAM_V3) | ||||
| 
 | ||||
|         self.assertEqual( | ||||
|             parser.telegram_specification, | ||||
|             telegram_specifications.V3 | ||||
|         ) | ||||
| 
 | ||||
|     def test_parse(self): | ||||
|         parser = TelegramParser(telegram_specifications.V3) | ||||
|         try: | ||||
|  | ||||
| @ -15,6 +15,15 @@ from test.example_telegrams import TELEGRAM_V4_2 | ||||
| class TelegramParserV4_2Test(unittest.TestCase): | ||||
|     """ Test parsing of a DSMR v4.2 telegram. """ | ||||
| 
 | ||||
|     def test_telegram_specification_matching(self): | ||||
|         parser = TelegramParser() | ||||
|         parser.parse(TELEGRAM_V4_2) | ||||
| 
 | ||||
|         self.assertEqual( | ||||
|             parser.telegram_specification, | ||||
|             telegram_specifications.V4 | ||||
|         ) | ||||
| 
 | ||||
|     def test_parse(self): | ||||
|         parser = TelegramParser(telegram_specifications.V4) | ||||
|         try: | ||||
|  | ||||
| @ -15,6 +15,15 @@ from test.example_telegrams import TELEGRAM_V5 | ||||
| class TelegramParserV5Test(unittest.TestCase): | ||||
|     """ Test parsing of a DSMR v5.x telegram. """ | ||||
| 
 | ||||
|     def test_telegram_specification_matching(self): | ||||
|         parser = TelegramParser() | ||||
|         parser.parse(TELEGRAM_V5) | ||||
| 
 | ||||
|         self.assertEqual( | ||||
|             parser.telegram_specification, | ||||
|             telegram_specifications.V5 | ||||
|         ) | ||||
| 
 | ||||
|     def test_parse(self): | ||||
|         parser = TelegramParser(telegram_specifications.V5) | ||||
|         try: | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user