issue-51-telegram work in progress

This commit is contained in:
Nigel Dokter 2023-02-08 11:53:08 +01:00
parent ef66c8422c
commit ee2db04ec0
2 changed files with 55 additions and 25 deletions

View File

@ -22,15 +22,28 @@ class Telegram(object):
self._telegram_data = defaultdict(list) self._telegram_data = defaultdict(list)
self._obis_name_mapping = dsmr_parser.obis_name_mapping.EN self._obis_name_mapping = dsmr_parser.obis_name_mapping.EN
self._reverse_obis_name_mapping = dsmr_parser.obis_name_mapping.REVERSE_EN self._reverse_obis_name_mapping = dsmr_parser.obis_name_mapping.REVERSE_EN
self._item_names = self._get_item_names() self._item_names = []
def add(self, obis_reference, dsmr_object): def add(self, obis_reference, dsmr_object):
self._telegram_data[obis_reference].append(dsmr_object) self._telegram_data[obis_reference].append(dsmr_object)
def get_by_channel(self, obis_reference, channel): # Update name mapping used to get value by attribute. Example: telegram.P1_MESSAGE_HEADER
self._item_names.append(self._obis_name_mapping[obis_reference])
def get(self, obis_reference, channel=None):
"""
Get value by OBIS reference (regex). If multiple values exist a list is returned, unless filtering by channel.
May assume that values are sorted by channel.
"""
if channel is None:
try:
return self._telegram_data[obis_reference]
except KeyError:
raise LookupError('No value found for OBIS reference "{}"'.format(obis_reference))
try: try:
return next(filter(lambda x: x.channel == channel, self._telegram_data[obis_reference])) return [v for v in self._telegram_data[obis_reference] if v.channel == channel][0]
except StopIteration: except IndexError:
raise LookupError('No value found for OBIS reference "{}" on channel "{}"'.format(obis_reference, channel)) raise LookupError('No value found for OBIS reference "{}" on channel "{}"'.format(obis_reference, channel))
def __getattr__(self, name): def __getattr__(self, name):
@ -41,6 +54,12 @@ class Telegram(object):
return value return value
def __getitem__(self, obis_reference): def __getitem__(self, obis_reference):
"""
Has the limitation that it will return the first occurrence of the OBIS reference. Will miss values in case of
multiple MBUS devices like gas or water meters. In this case use .get(..) instead.
Example usage: telegram[obis_references.P1_MESSAGE_HEADER]
"""
try: try:
return self._telegram_data[obis_reference][0] return self._telegram_data[obis_reference][0]
except IndexError: except IndexError:
@ -48,10 +67,7 @@ class Telegram(object):
raise KeyError raise KeyError
def __len__(self): def __len__(self):
return len(self._telegram_data) # TODO: its nested now return len(self._telegram_data)
def _get_item_names(self):
return [self._obis_name_mapping[k] for k, v in self._telegram_data.items()]
def __iter__(self): def __iter__(self):
for attr in self._item_names: for attr in self._item_names:

View File

@ -321,31 +321,45 @@ class TelegramTest(unittest.TestCase):
assert item_names_tested_set == V4_name_set assert item_names_tested_set == V4_name_set
def test_len(self):
parser = TelegramParser(telegram_specifications.V5)
telegram = parser.parse(TELEGRAM_V5_TWO_MBUS)
self.assertEqual(len(telegram), 35)
def test_get(self): def test_get(self):
""" Retrieve MBUS device without supplying channel which fetches all (two) records found. """
parser = TelegramParser(telegram_specifications.V5) parser = TelegramParser(telegram_specifications.V5)
telegram = parser.parse(TELEGRAM_V5_TWO_MBUS) telegram = parser.parse(TELEGRAM_V5_TWO_MBUS)
mbus_1 = telegram.get(obis.HOURLY_GAS_METER_READING, channel=1) gas_values = telegram.get(obis.HOURLY_GAS_METER_READING)
mbus_2 = telegram.get(obis.HOURLY_GAS_METER_READING, channel=2)
self.assertEqual(type(mbus_1), MBusObject) self.assertEqual(len(gas_values), 2)
self.assertEqual(mbus_1.channel, 1)
self.assertEqual(mbus_1.value, 0)
self.assertEqual(type(mbus_2), MBusObject) gas_value_1 = gas_values[0]
self.assertEqual(mbus_2.channel, 2) self.assertEqual(type(gas_value_1), MBusObject)
self.assertEqual(mbus_2.value, Decimal('246.138')) self.assertEqual(gas_value_1.channel, 1)
self.assertEqual(gas_value_1.value, 0)
def test_get_without_channel(self): gas_value_2 = gas_values[1]
""" Retrieve MBUS device without supplying channel which fetches the first MBUS record found """ self.assertEqual(type(gas_value_2), MBusObject)
self.assertEqual(gas_value_2.channel, 2)
self.assertEqual(gas_value_2.value, Decimal('246.138'))
def test_get_with_channel(self):
parser = TelegramParser(telegram_specifications.V5) parser = TelegramParser(telegram_specifications.V5)
telegram = parser.parse(TELEGRAM_V5_TWO_MBUS) telegram = parser.parse(TELEGRAM_V5_TWO_MBUS)
mbus = telegram.get(obis.HOURLY_GAS_METER_READING) gas_value_1 = telegram.get(obis.HOURLY_GAS_METER_READING, channel=1)
gas_value_2 = telegram.get(obis.HOURLY_GAS_METER_READING, channel=2)
self.assertEqual(type(mbus), MBusObject) self.assertEqual(type(gas_value_1), MBusObject)
self.assertEqual(mbus.channel, 1) self.assertEqual(gas_value_1.channel, 1)
self.assertEqual(mbus.value, 0) self.assertEqual(gas_value_1.value, 0)
self.assertEqual(type(gas_value_2), MBusObject)
self.assertEqual(gas_value_2.channel, 2)
self.assertEqual(gas_value_2.value, Decimal('246.138'))
def test_get_unknown_value(self): def test_get_unknown_value(self):
""" Retrieve MBUS device without supplying channel which fetches the first MBUS record found """ """ Retrieve MBUS device without supplying channel which fetches the first MBUS record found """
@ -354,13 +368,13 @@ class TelegramTest(unittest.TestCase):
# Test valid OBIS reference with wrong channel # Test valid OBIS reference with wrong channel
with self.assertRaises(LookupError) as exception_context: with self.assertRaises(LookupError) as exception_context:
telegram.get_by_channel(obis.HOURLY_GAS_METER_READING, channel=123) telegram.get(obis.HOURLY_GAS_METER_READING, channel=123)
self.assertEqual( self.assertEqual(
str(exception_context.exception), str(exception_context.exception),
'No value found for OBIS reference "\d-\d:24\.2\.1.+?\\r\\n" on channel "123"' 'No value found for OBIS reference "\\d-\\d:24\\.2\\.1.+?\\r\\n" on channel "123"'
) )
# Test invalid OBIS reference # Test invalid OBIS reference
with self.assertRaises(LookupError): with self.assertRaises(LookupError):
telegram.get_by_channel('invalid_obis_reference', channel=1) telegram.get('invalid_obis_reference', channel=1)