issue-51-telegram work in progress

This commit is contained in:
Nigel Dokter 2023-02-08 19:39:10 +01:00
parent 98df0922e9
commit 3152304679
6 changed files with 200 additions and 77 deletions

View File

@ -5,7 +5,6 @@ Change Log
- Add instantaneous reactive power + fixed swapped reactive total import export (`pull request #124 <https://github.com/ndokter/dsmr_parser/pull/124>`_ by `yada75 <https://github.com/yada75>`_)
**1.0.0** (2022-12-22)
- switched to new numbering scheme https://semver.org/

View File

@ -16,7 +16,6 @@ Features
DSMR Parser supports DSMR versions 2, 3, 4 and 5. See for the `currently supported/tested Python versions here <https://github.com/ndokter/dsmr_parser/blob/master/.github/workflows/tests.yml#L14>`_.
Client module usage
-------------------
@ -264,10 +263,30 @@ Example to get some of the values:
# See dsmr_reader.obis_references for all readable telegram values.
# Note that the available values differ per DSMR version.
Telegram as an Object
Telegram object
---------------------
An object version of the telegram is available as well.
.. code-block:: python
# DSMR v5 telegram example
from dsmr_parser import telegram_specifications
from dsmr_parser.parsers import TelegramParser
from test.example_telegrams import TELEGRAM_V5
parser = TelegramParser(telegram_specifications.V5)
telegram = parser.parse(TELEGRAM_V5)
# Get telegram message timestamp.
telegram.get(obis_references.P1_MESSAGE_TIMESTAMP)
# Get current electricity usage
telegram.get(obis_references.CURRENT_ELECTRICITY_USAGE)
# Get gas meter readings. Note that this returns a list if multiple gas meter readings are found.
# These gas reading have a channel attribute that can be used to filter them. Or you can supply a channel
# as an argument:
gas_readings = telegram.get(obis_references.HOURLY_GAS_METER_READING)
gas_reading_channel_1 = telegram.get(obis_references.HOURLY_GAS_METER_READING, channel=1)
.. code-block:: python

View File

@ -20,6 +20,9 @@ class Telegram(object):
"""
def __init__(self):
self._telegram_data = defaultdict(list)
self._mbus_devices = defaultdict(MbusDevice)
# Reverse name mapping and attribute related:
self._obis_name_mapping = dsmr_parser.obis_name_mapping.EN
self._reverse_obis_name_mapping = dsmr_parser.obis_name_mapping.REVERSE_EN
self._item_names = []
@ -30,21 +33,43 @@ class Telegram(object):
# Update name mapping used to get value by attribute. Example: telegram.P1_MESSAGE_HEADER
self._item_names.append(self._obis_name_mapping[obis_reference])
def get(self, obis_reference, channel=None):
"""
Get value by OBIS reference (regex). If multiple values exist a list is returned, unless filtering by channel.
May assume that values are sorted by channel.
"""
if channel is None:
try:
return self._telegram_data[obis_reference]
except KeyError:
raise LookupError('No value found for OBIS reference "{}"'.format(obis_reference))
# Detect Mbus readingsusing obis id+channel and group these into MbusDevice
if dsmr_object.is_mbus_reading:
channel_id = dsmr_object.obis_id_code[1]
mbus_device = self._mbus_devices[channel_id]
mbus_device.add(obis_reference, dsmr_object)
try:
return [v for v in self._telegram_data[obis_reference] if v.channel == channel][0]
except IndexError:
raise LookupError('No value found for OBIS reference "{}" on channel "{}"'.format(obis_reference, channel))
def get_mbus_devices(self):
"""
Return MbusDevice objects which are used for water, heat and gas meters.
"""
# TODO sort by channel ID
return list(self._mbus_devices.values())
def get_mbus_device_by_channel(self, channel_id=None):
return self._mbus_devices[channel_id]
# # TODO devices groeperen. alle values van dat channel daar in groeperen en wrappen in device object gebruik makende van device id
# def get(self, obis_reference, channel=None):
# """
# Get values by OBIS reference (regex). If multiple values exist a list is returned, unless filtering by channel.
# May assume that values are sorted by channel.
# """
# if channel is None:
# try:
# values = self._telegram_data[obis_reference]
# except KeyError:
# raise LookupError('No value found for OBIS reference "{}"'.format(obis_reference))
#
# if len(values) == 1:
# return values[0]
# else:
# return values
#
# try:
# return [v for v in self._telegram_data[obis_reference] if v.channel == channel][0]
# except IndexError:
# raise LookupError('No value found for OBIS reference "{}" on channel "{}"'.format(obis_reference, channel))
def __getattr__(self, name):
""" will only get called for undefined attributes """
@ -88,11 +113,16 @@ class DSMRObject(object):
"""
Represents all data from a single telegram line.
"""
def __init__(self, channel, values):
self.channel = channel # TODO consider if only MBus should have channels
def __init__(self, obis_id_code, values):
self.obis_id_code = obis_id_code
self.values = values
@property
def is_mbus_reading(self):
""" Detect Mbus related readings using obis id + channel """
obis_id, channel_id = self.obis_id_code
return obis_id == 0 and channel_id != 0
class MBusObject(DSMRObject):
@ -255,7 +285,7 @@ class ProfileGenericObject(DSMRObject):
offset = values_offset + i * 2
self._buffer_list.append(
MBusObject(
channel=self.channel,
obis_id_code=self.obis_id_code,
values=[self.values[offset], self.values[offset + 1]]
)
)
@ -294,3 +324,25 @@ class ProfileGenericObject(DSMRObject):
list.append(['buffer', buffer_repr])
output = dict(list)
return json.dumps(output)
class MbusDevice:
def __init__(self):
self._telegram_data = {}
self._obis_name_mapping = dsmr_parser.obis_name_mapping.EN
self._reverse_obis_name_mapping = dsmr_parser.obis_name_mapping.REVERSE_EN
self._item_names = []
def add(self, obis_reference, dsmr_object):
self._telegram_data[obis_reference] = dsmr_object
# Update name mapping used to get value by attribute. Example: telegram.P1_MESSAGE_HEADER
self._item_names.append(self._obis_name_mapping[obis_reference])
def __getattr__(self, name):
""" will only get called for undefined attributes """
obis_reference = self._reverse_obis_name_mapping[name]
value = self._telegram_data[obis_reference]
setattr(self, name, value)
return value

View File

@ -174,19 +174,19 @@ class DSMRObjectParser(object):
return [self.value_formats[i].parse(value)
for i, value in enumerate(values)]
def _parse_channel(self, line):
def _parse_obis_id_code(self, line):
"""
Get the channel identifier of a line.
Get the OBIS ID code
Line format:
Example line:
'0-2:24.2.1(200426223001S)(00246.138*m3)'
^
channel
OBIS ID code = 0-2 returned as tuple
"""
try:
return int(line[2])
return int(line[0]), int(line[2])
except ValueError:
raise ParseError("Invalid channel for line '%s' in '%s'", line, self)
raise ParseError("Invalid OBIS ID code for line '%s' in '%s'", line, self)
def _parse(self, line):
# Match value groups, but exclude the parentheses
@ -222,7 +222,7 @@ class MBusParser(DSMRObjectParser):
def parse(self, line):
return MBusObject(
channel=self._parse_channel(line),
obis_id_code=self._parse_obis_id_code(line),
values=self._parse(line)
)
@ -252,7 +252,7 @@ class MaxDemandParser(DSMRObjectParser):
pattern = re.compile(r'((?<=\()[0-9a-zA-Z\.\*\-\:]{0,}(?=\)))')
values = re.findall(pattern, line)
channel = self._parse_channel(line)
obis_id_code = self._parse_obis_id_code(line)
objects = []
@ -262,7 +262,7 @@ class MaxDemandParser(DSMRObjectParser):
timestamp_occurred = ValueParser(timestamp).parse(values[i * 3 + 1])
value = ValueParser(Decimal).parse(values[i * 3 + 2])
objects.append(MBusObjectPeak(
channel=channel,
obis_id_code=obis_id_code,
values=[timestamp_month, timestamp_occurred, value]
))
@ -291,7 +291,7 @@ class CosemParser(DSMRObjectParser):
def parse(self, line):
return CosemObject(
channel=self._parse_channel(line),
obis_id_code=self._parse_obis_id_code(line),
values=self._parse(line)
)
@ -353,7 +353,7 @@ class ProfileGenericParser(DSMRObjectParser):
def parse(self, line):
return ProfileGenericObject(
channel=self._parse_channel(line),
obis_id_code=self._parse_obis_id_code(line),
values=self._parse(line)
)

23
test/test_device.py Normal file
View File

@ -0,0 +1,23 @@
import unittest
import datetime
import pytz
from dsmr_parser import telegram_specifications
from dsmr_parser import obis_name_mapping
from dsmr_parser import obis_references as obis
from dsmr_parser.objects import CosemObject, MbusDevice
from dsmr_parser.objects import MBusObject
from dsmr_parser.objects import ProfileGenericObject
from dsmr_parser.parsers import TelegramParser
from test.example_telegrams import TELEGRAM_V4_2, TELEGRAM_V5_TWO_MBUS
from decimal import Decimal
class DeviceObjectTest(unittest.TestCase):
def test_tmp(self):
parser = TelegramParser(telegram_specifications.V5)
telegram = parser.parse(TELEGRAM_V5_TWO_MBUS)
# print('val: ', telegram.HOURLY_GAS_METER_READING)
device = MbusDevice()

View File

@ -5,7 +5,7 @@ import pytz
from dsmr_parser import telegram_specifications
from dsmr_parser import obis_name_mapping
from dsmr_parser import obis_references as obis
from dsmr_parser.objects import CosemObject
from dsmr_parser.objects import CosemObject, MbusDevice
from dsmr_parser.objects import MBusObject
from dsmr_parser.objects import ProfileGenericObject
from dsmr_parser.parsers import TelegramParser
@ -327,54 +327,84 @@ class TelegramTest(unittest.TestCase):
self.assertEqual(len(telegram), 35)
def test_get(self):
""" Retrieve MBUS device without supplying channel which fetches all (two) records found. """
# def test_get(self):
# """ Retrieve MBUS device without supplying channel which fetches all (two) records found. """
# parser = TelegramParser(telegram_specifications.V5)
# telegram = parser.parse(TELEGRAM_V5_TWO_MBUS)
#
# # A single value is returned for the current electricity usage
# electricity_used_value = telegram.get(obis.CURRENT_ELECTRICITY_USAGE)
# self.assertEqual(type(electricity_used_value), CosemObject)
# self.assertEqual(electricity_used_value.channel, 0)
# self.assertEqual(electricity_used_value.value, Decimal('0.111'))
#
# # Multiple values are returned for the gas reading
# gas_values = telegram.get(obis.HOURLY_GAS_METER_READING)
# self.assertEqual(len(gas_values), 2)
#
# gas_value_1 = gas_values[0]
# self.assertEqual(type(gas_value_1), MBusObject)
# self.assertEqual(gas_value_1.channel, 1)
# self.assertEqual(gas_value_1.value, 0)
#
# gas_value_2 = gas_values[1]
# self.assertEqual(type(gas_value_2), MBusObject)
# self.assertEqual(gas_value_2.channel, 2)
# self.assertEqual(gas_value_2.value, Decimal('246.138'))
#
# def test_get_with_channel(self):
# parser = TelegramParser(telegram_specifications.V5)
# telegram = parser.parse(TELEGRAM_V5_TWO_MBUS)
#
# gas_value_1 = telegram.get(obis.HOURLY_GAS_METER_READING, channel=1)
# gas_value_2 = telegram.get(obis.HOURLY_GAS_METER_READING, channel=2)
#
# self.assertEqual(type(gas_value_1), MBusObject)
# self.assertEqual(gas_value_1.channel, 1)
# self.assertEqual(gas_value_1.value, 0)
#
# self.assertEqual(type(gas_value_2), MBusObject)
# self.assertEqual(gas_value_2.channel, 2)
# self.assertEqual(gas_value_2.value, Decimal('246.138'))
#
# def test_get_unknown_value(self):
# """ Retrieve MBUS device without supplying channel which fetches the first MBUS record found """
# parser = TelegramParser(telegram_specifications.V5)
# telegram = parser.parse(TELEGRAM_V5_TWO_MBUS)
#
# # Test valid OBIS reference with wrong channel
# with self.assertRaises(LookupError) as exception_context:
# telegram.get(obis.HOURLY_GAS_METER_READING, channel=123)
#
# self.assertEqual(
# str(exception_context.exception),
# 'No value found for OBIS reference "\\d-\\d:24\\.2\\.1.+?\\r\\n" on channel "123"'
# )
#
# # Test invalid OBIS reference
# with self.assertRaises(LookupError):
# telegram.get('invalid_obis_reference', channel=1)
# TODO
def test_get_mbus_devices(self):
parser = TelegramParser(telegram_specifications.V5)
telegram = parser.parse(TELEGRAM_V5_TWO_MBUS)
gas_values = telegram.get(obis.HOURLY_GAS_METER_READING)
mbus_devices = telegram.get_mbus_devices()
print(mbus_devices)
self.assertEqual(len(mbus_devices), 2)
self.assertEqual(len(gas_values), 2)
mbus_device_1 = mbus_devices[0]
self.assertEqual(type(mbus_device_1), MbusDevice)
print('mbus_device_1.HOURLY_GAS_METER_READING: ', mbus_device_1.HOURLY_GAS_METER_READING)
gas_value_1 = gas_values[0]
self.assertEqual(type(gas_value_1), MBusObject)
self.assertEqual(gas_value_1.channel, 1)
self.assertEqual(gas_value_1.value, 0)
mbus_device_2 = mbus_devices[1]
self.assertEqual(type(mbus_device_2), MbusDevice)
print('mbus_device_2.HOURLY_GAS_METER_READING: ', mbus_device_2.HOURLY_GAS_METER_READING)
gas_value_2 = gas_values[1]
self.assertEqual(type(gas_value_2), MBusObject)
self.assertEqual(gas_value_2.channel, 2)
self.assertEqual(gas_value_2.value, Decimal('246.138'))
def test_get_with_channel(self):
# TODO
def test_get_mbus_device_by_channel(self):
parser = TelegramParser(telegram_specifications.V5)
telegram = parser.parse(TELEGRAM_V5_TWO_MBUS)
gas_value_1 = telegram.get(obis.HOURLY_GAS_METER_READING, channel=1)
gas_value_2 = telegram.get(obis.HOURLY_GAS_METER_READING, channel=2)
self.assertEqual(type(gas_value_1), MBusObject)
self.assertEqual(gas_value_1.channel, 1)
self.assertEqual(gas_value_1.value, 0)
self.assertEqual(type(gas_value_2), MBusObject)
self.assertEqual(gas_value_2.channel, 2)
self.assertEqual(gas_value_2.value, Decimal('246.138'))
def test_get_unknown_value(self):
""" Retrieve MBUS device without supplying channel which fetches the first MBUS record found """
parser = TelegramParser(telegram_specifications.V5)
telegram = parser.parse(TELEGRAM_V5_TWO_MBUS)
# Test valid OBIS reference with wrong channel
with self.assertRaises(LookupError) as exception_context:
telegram.get(obis.HOURLY_GAS_METER_READING, channel=123)
self.assertEqual(
str(exception_context.exception),
'No value found for OBIS reference "\\d-\\d:24\\.2\\.1.+?\\r\\n" on channel "123"'
)
# Test invalid OBIS reference
with self.assertRaises(LookupError):
telegram.get('invalid_obis_reference', channel=1)
print('by channel: ', telegram.get_mbus_device_by_channel(2).HOURLY_GAS_METER_READING)