From 31523046791b96f719f5441e380eae320c4e6ed6 Mon Sep 17 00:00:00 2001 From: Nigel Dokter Date: Wed, 8 Feb 2023 19:39:10 +0100 Subject: [PATCH] issue-51-telegram work in progress --- CHANGELOG.rst | 1 - README.rst | 25 +++++++-- dsmr_parser/objects.py | 88 ++++++++++++++++++++++++------- dsmr_parser/parsers.py | 24 ++++----- test/test_device.py | 23 ++++++++ test/test_telegram.py | 116 ++++++++++++++++++++++++++--------------- 6 files changed, 200 insertions(+), 77 deletions(-) create mode 100644 test/test_device.py diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 588657e..8eda744 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -5,7 +5,6 @@ Change Log - Add instantaneous reactive power + fixed swapped reactive total import export (`pull request #124 `_ by `yada75 `_) - **1.0.0** (2022-12-22) - switched to new numbering scheme https://semver.org/ diff --git a/README.rst b/README.rst index 75e572c..a19d2f6 100644 --- a/README.rst +++ b/README.rst @@ -16,7 +16,6 @@ Features DSMR Parser supports DSMR versions 2, 3, 4 and 5. See for the `currently supported/tested Python versions here `_. - Client module usage ------------------- @@ -264,10 +263,30 @@ Example to get some of the values: # See dsmr_reader.obis_references for all readable telegram values. # Note that the available values differ per DSMR version. -Telegram as an Object +Telegram object --------------------- -An object version of the telegram is available as well. +.. code-block:: python + + # DSMR v5 telegram example + from dsmr_parser import telegram_specifications + from dsmr_parser.parsers import TelegramParser + from test.example_telegrams import TELEGRAM_V5 + + parser = TelegramParser(telegram_specifications.V5) + telegram = parser.parse(TELEGRAM_V5) + + # Get telegram message timestamp. + telegram.get(obis_references.P1_MESSAGE_TIMESTAMP) + + # Get current electricity usage + telegram.get(obis_references.CURRENT_ELECTRICITY_USAGE) + + # Get gas meter readings. Note that this returns a list if multiple gas meter readings are found. + # These gas reading have a channel attribute that can be used to filter them. Or you can supply a channel + # as an argument: + gas_readings = telegram.get(obis_references.HOURLY_GAS_METER_READING) + gas_reading_channel_1 = telegram.get(obis_references.HOURLY_GAS_METER_READING, channel=1) .. code-block:: python diff --git a/dsmr_parser/objects.py b/dsmr_parser/objects.py index ffc9b55..7f85fc8 100644 --- a/dsmr_parser/objects.py +++ b/dsmr_parser/objects.py @@ -20,6 +20,9 @@ class Telegram(object): """ def __init__(self): self._telegram_data = defaultdict(list) + self._mbus_devices = defaultdict(MbusDevice) + + # Reverse name mapping and attribute related: self._obis_name_mapping = dsmr_parser.obis_name_mapping.EN self._reverse_obis_name_mapping = dsmr_parser.obis_name_mapping.REVERSE_EN self._item_names = [] @@ -30,21 +33,43 @@ class Telegram(object): # Update name mapping used to get value by attribute. Example: telegram.P1_MESSAGE_HEADER self._item_names.append(self._obis_name_mapping[obis_reference]) - def get(self, obis_reference, channel=None): - """ - Get value by OBIS reference (regex). If multiple values exist a list is returned, unless filtering by channel. - May assume that values are sorted by channel. - """ - if channel is None: - try: - return self._telegram_data[obis_reference] - except KeyError: - raise LookupError('No value found for OBIS reference "{}"'.format(obis_reference)) + # Detect Mbus readingsusing obis id+channel and group these into MbusDevice + if dsmr_object.is_mbus_reading: + channel_id = dsmr_object.obis_id_code[1] + mbus_device = self._mbus_devices[channel_id] + mbus_device.add(obis_reference, dsmr_object) - try: - return [v for v in self._telegram_data[obis_reference] if v.channel == channel][0] - except IndexError: - raise LookupError('No value found for OBIS reference "{}" on channel "{}"'.format(obis_reference, channel)) + def get_mbus_devices(self): + """ + Return MbusDevice objects which are used for water, heat and gas meters. + """ + # TODO sort by channel ID + return list(self._mbus_devices.values()) + + def get_mbus_device_by_channel(self, channel_id=None): + return self._mbus_devices[channel_id] + + # # TODO devices groeperen. alle values van dat channel daar in groeperen en wrappen in device object gebruik makende van device id + # def get(self, obis_reference, channel=None): + # """ + # Get values by OBIS reference (regex). If multiple values exist a list is returned, unless filtering by channel. + # May assume that values are sorted by channel. + # """ + # if channel is None: + # try: + # values = self._telegram_data[obis_reference] + # except KeyError: + # raise LookupError('No value found for OBIS reference "{}"'.format(obis_reference)) + # + # if len(values) == 1: + # return values[0] + # else: + # return values + # + # try: + # return [v for v in self._telegram_data[obis_reference] if v.channel == channel][0] + # except IndexError: + # raise LookupError('No value found for OBIS reference "{}" on channel "{}"'.format(obis_reference, channel)) def __getattr__(self, name): """ will only get called for undefined attributes """ @@ -88,11 +113,16 @@ class DSMRObject(object): """ Represents all data from a single telegram line. """ - - def __init__(self, channel, values): - self.channel = channel # TODO consider if only MBus should have channels + def __init__(self, obis_id_code, values): + self.obis_id_code = obis_id_code self.values = values + @property + def is_mbus_reading(self): + """ Detect Mbus related readings using obis id + channel """ + obis_id, channel_id = self.obis_id_code + + return obis_id == 0 and channel_id != 0 class MBusObject(DSMRObject): @@ -255,7 +285,7 @@ class ProfileGenericObject(DSMRObject): offset = values_offset + i * 2 self._buffer_list.append( MBusObject( - channel=self.channel, + obis_id_code=self.obis_id_code, values=[self.values[offset], self.values[offset + 1]] ) ) @@ -294,3 +324,25 @@ class ProfileGenericObject(DSMRObject): list.append(['buffer', buffer_repr]) output = dict(list) return json.dumps(output) + + +class MbusDevice: + + def __init__(self): + self._telegram_data = {} + self._obis_name_mapping = dsmr_parser.obis_name_mapping.EN + self._reverse_obis_name_mapping = dsmr_parser.obis_name_mapping.REVERSE_EN + self._item_names = [] + + def add(self, obis_reference, dsmr_object): + self._telegram_data[obis_reference] = dsmr_object + + # Update name mapping used to get value by attribute. Example: telegram.P1_MESSAGE_HEADER + self._item_names.append(self._obis_name_mapping[obis_reference]) + + def __getattr__(self, name): + """ will only get called for undefined attributes """ + obis_reference = self._reverse_obis_name_mapping[name] + value = self._telegram_data[obis_reference] + setattr(self, name, value) + return value diff --git a/dsmr_parser/parsers.py b/dsmr_parser/parsers.py index bff6ed3..c80de39 100644 --- a/dsmr_parser/parsers.py +++ b/dsmr_parser/parsers.py @@ -174,19 +174,19 @@ class DSMRObjectParser(object): return [self.value_formats[i].parse(value) for i, value in enumerate(values)] - def _parse_channel(self, line): + def _parse_obis_id_code(self, line): """ - Get the channel identifier of a line. + Get the OBIS ID code - Line format: + Example line: '0-2:24.2.1(200426223001S)(00246.138*m3)' - ^ - channel + + OBIS ID code = 0-2 returned as tuple """ try: - return int(line[2]) + return int(line[0]), int(line[2]) except ValueError: - raise ParseError("Invalid channel for line '%s' in '%s'", line, self) + raise ParseError("Invalid OBIS ID code for line '%s' in '%s'", line, self) def _parse(self, line): # Match value groups, but exclude the parentheses @@ -222,7 +222,7 @@ class MBusParser(DSMRObjectParser): def parse(self, line): return MBusObject( - channel=self._parse_channel(line), + obis_id_code=self._parse_obis_id_code(line), values=self._parse(line) ) @@ -252,7 +252,7 @@ class MaxDemandParser(DSMRObjectParser): pattern = re.compile(r'((?<=\()[0-9a-zA-Z\.\*\-\:]{0,}(?=\)))') values = re.findall(pattern, line) - channel = self._parse_channel(line) + obis_id_code = self._parse_obis_id_code(line) objects = [] @@ -262,7 +262,7 @@ class MaxDemandParser(DSMRObjectParser): timestamp_occurred = ValueParser(timestamp).parse(values[i * 3 + 1]) value = ValueParser(Decimal).parse(values[i * 3 + 2]) objects.append(MBusObjectPeak( - channel=channel, + obis_id_code=obis_id_code, values=[timestamp_month, timestamp_occurred, value] )) @@ -291,7 +291,7 @@ class CosemParser(DSMRObjectParser): def parse(self, line): return CosemObject( - channel=self._parse_channel(line), + obis_id_code=self._parse_obis_id_code(line), values=self._parse(line) ) @@ -353,7 +353,7 @@ class ProfileGenericParser(DSMRObjectParser): def parse(self, line): return ProfileGenericObject( - channel=self._parse_channel(line), + obis_id_code=self._parse_obis_id_code(line), values=self._parse(line) ) diff --git a/test/test_device.py b/test/test_device.py new file mode 100644 index 0000000..2b3b73f --- /dev/null +++ b/test/test_device.py @@ -0,0 +1,23 @@ +import unittest +import datetime +import pytz + +from dsmr_parser import telegram_specifications +from dsmr_parser import obis_name_mapping +from dsmr_parser import obis_references as obis +from dsmr_parser.objects import CosemObject, MbusDevice +from dsmr_parser.objects import MBusObject +from dsmr_parser.objects import ProfileGenericObject +from dsmr_parser.parsers import TelegramParser +from test.example_telegrams import TELEGRAM_V4_2, TELEGRAM_V5_TWO_MBUS +from decimal import Decimal + + +class DeviceObjectTest(unittest.TestCase): + + def test_tmp(self): + parser = TelegramParser(telegram_specifications.V5) + telegram = parser.parse(TELEGRAM_V5_TWO_MBUS) + # print('val: ', telegram.HOURLY_GAS_METER_READING) + + device = MbusDevice() diff --git a/test/test_telegram.py b/test/test_telegram.py index 3b91b0c..a380b9e 100644 --- a/test/test_telegram.py +++ b/test/test_telegram.py @@ -5,7 +5,7 @@ import pytz from dsmr_parser import telegram_specifications from dsmr_parser import obis_name_mapping from dsmr_parser import obis_references as obis -from dsmr_parser.objects import CosemObject +from dsmr_parser.objects import CosemObject, MbusDevice from dsmr_parser.objects import MBusObject from dsmr_parser.objects import ProfileGenericObject from dsmr_parser.parsers import TelegramParser @@ -327,54 +327,84 @@ class TelegramTest(unittest.TestCase): self.assertEqual(len(telegram), 35) - def test_get(self): - """ Retrieve MBUS device without supplying channel which fetches all (two) records found. """ + # def test_get(self): + # """ Retrieve MBUS device without supplying channel which fetches all (two) records found. """ + # parser = TelegramParser(telegram_specifications.V5) + # telegram = parser.parse(TELEGRAM_V5_TWO_MBUS) + # + # # A single value is returned for the current electricity usage + # electricity_used_value = telegram.get(obis.CURRENT_ELECTRICITY_USAGE) + # self.assertEqual(type(electricity_used_value), CosemObject) + # self.assertEqual(electricity_used_value.channel, 0) + # self.assertEqual(electricity_used_value.value, Decimal('0.111')) + # + # # Multiple values are returned for the gas reading + # gas_values = telegram.get(obis.HOURLY_GAS_METER_READING) + # self.assertEqual(len(gas_values), 2) + # + # gas_value_1 = gas_values[0] + # self.assertEqual(type(gas_value_1), MBusObject) + # self.assertEqual(gas_value_1.channel, 1) + # self.assertEqual(gas_value_1.value, 0) + # + # gas_value_2 = gas_values[1] + # self.assertEqual(type(gas_value_2), MBusObject) + # self.assertEqual(gas_value_2.channel, 2) + # self.assertEqual(gas_value_2.value, Decimal('246.138')) + # + # def test_get_with_channel(self): + # parser = TelegramParser(telegram_specifications.V5) + # telegram = parser.parse(TELEGRAM_V5_TWO_MBUS) + # + # gas_value_1 = telegram.get(obis.HOURLY_GAS_METER_READING, channel=1) + # gas_value_2 = telegram.get(obis.HOURLY_GAS_METER_READING, channel=2) + # + # self.assertEqual(type(gas_value_1), MBusObject) + # self.assertEqual(gas_value_1.channel, 1) + # self.assertEqual(gas_value_1.value, 0) + # + # self.assertEqual(type(gas_value_2), MBusObject) + # self.assertEqual(gas_value_2.channel, 2) + # self.assertEqual(gas_value_2.value, Decimal('246.138')) + # + # def test_get_unknown_value(self): + # """ Retrieve MBUS device without supplying channel which fetches the first MBUS record found """ + # parser = TelegramParser(telegram_specifications.V5) + # telegram = parser.parse(TELEGRAM_V5_TWO_MBUS) + # + # # Test valid OBIS reference with wrong channel + # with self.assertRaises(LookupError) as exception_context: + # telegram.get(obis.HOURLY_GAS_METER_READING, channel=123) + # + # self.assertEqual( + # str(exception_context.exception), + # 'No value found for OBIS reference "\\d-\\d:24\\.2\\.1.+?\\r\\n" on channel "123"' + # ) + # + # # Test invalid OBIS reference + # with self.assertRaises(LookupError): + # telegram.get('invalid_obis_reference', channel=1) + + # TODO + def test_get_mbus_devices(self): parser = TelegramParser(telegram_specifications.V5) telegram = parser.parse(TELEGRAM_V5_TWO_MBUS) - gas_values = telegram.get(obis.HOURLY_GAS_METER_READING) + mbus_devices = telegram.get_mbus_devices() + print(mbus_devices) + self.assertEqual(len(mbus_devices), 2) - self.assertEqual(len(gas_values), 2) + mbus_device_1 = mbus_devices[0] + self.assertEqual(type(mbus_device_1), MbusDevice) + print('mbus_device_1.HOURLY_GAS_METER_READING: ', mbus_device_1.HOURLY_GAS_METER_READING) - gas_value_1 = gas_values[0] - self.assertEqual(type(gas_value_1), MBusObject) - self.assertEqual(gas_value_1.channel, 1) - self.assertEqual(gas_value_1.value, 0) + mbus_device_2 = mbus_devices[1] + self.assertEqual(type(mbus_device_2), MbusDevice) + print('mbus_device_2.HOURLY_GAS_METER_READING: ', mbus_device_2.HOURLY_GAS_METER_READING) - gas_value_2 = gas_values[1] - self.assertEqual(type(gas_value_2), MBusObject) - self.assertEqual(gas_value_2.channel, 2) - self.assertEqual(gas_value_2.value, Decimal('246.138')) - - def test_get_with_channel(self): + # TODO + def test_get_mbus_device_by_channel(self): parser = TelegramParser(telegram_specifications.V5) telegram = parser.parse(TELEGRAM_V5_TWO_MBUS) - gas_value_1 = telegram.get(obis.HOURLY_GAS_METER_READING, channel=1) - gas_value_2 = telegram.get(obis.HOURLY_GAS_METER_READING, channel=2) - - self.assertEqual(type(gas_value_1), MBusObject) - self.assertEqual(gas_value_1.channel, 1) - self.assertEqual(gas_value_1.value, 0) - - self.assertEqual(type(gas_value_2), MBusObject) - self.assertEqual(gas_value_2.channel, 2) - self.assertEqual(gas_value_2.value, Decimal('246.138')) - - def test_get_unknown_value(self): - """ Retrieve MBUS device without supplying channel which fetches the first MBUS record found """ - parser = TelegramParser(telegram_specifications.V5) - telegram = parser.parse(TELEGRAM_V5_TWO_MBUS) - - # Test valid OBIS reference with wrong channel - with self.assertRaises(LookupError) as exception_context: - telegram.get(obis.HOURLY_GAS_METER_READING, channel=123) - - self.assertEqual( - str(exception_context.exception), - 'No value found for OBIS reference "\\d-\\d:24\\.2\\.1.+?\\r\\n" on channel "123"' - ) - - # Test invalid OBIS reference - with self.assertRaises(LookupError): - telegram.get('invalid_obis_reference', channel=1) + print('by channel: ', telegram.get_mbus_device_by_channel(2).HOURLY_GAS_METER_READING)