issue-51-telegram work in progress

This commit is contained in:
Nigel Dokter 2023-01-14 19:47:24 +01:00
parent 29872ae6bb
commit 3e0332963c
2 changed files with 28 additions and 15 deletions

View File

@ -1,7 +1,9 @@
from collections import defaultdict
from decimal import Decimal
import dsmr_parser.obis_name_mapping import dsmr_parser.obis_name_mapping
import datetime import datetime
import json import json
from decimal import Decimal
class Telegram(object): class Telegram(object):
@ -16,28 +18,33 @@ class Telegram(object):
yields: yields:
['P1_MESSAGE_HEADER', 'P1_MESSAGE_TIMESTAMP', 'EQUIPMENT_IDENTIFIER', ...] ['P1_MESSAGE_HEADER', 'P1_MESSAGE_TIMESTAMP', 'EQUIPMENT_IDENTIFIER', ...]
""" """
def __init__(self, telegram_data, telegram_specification): def __init__(self):
self._telegram_specification = telegram_specification self._telegram_data = defaultdict(list)
self._obis_name_mapping = dsmr_parser.obis_name_mapping.EN self._obis_name_mapping = dsmr_parser.obis_name_mapping.EN
self._reverse_obis_name_mapping = dsmr_parser.obis_name_mapping.REVERSE_EN self._reverse_obis_name_mapping = dsmr_parser.obis_name_mapping.REVERSE_EN
self._dictionary = telegram_data
self._item_names = self._get_item_names() self._item_names = self._get_item_names()
def add(self, obis_reference, value):
self._telegram_data[obis_reference].append(value)
def get(self, obis_reference, channel):
return next(filter(lambda x: x.channel == channel, self._telegram_data[obis_reference]))
def __getattr__(self, name): def __getattr__(self, name):
""" will only get called for undefined attributes """ """ will only get called for undefined attributes """
obis_reference = self._reverse_obis_name_mapping[name] obis_reference = self._reverse_obis_name_mapping[name]
value = self._dictionary[obis_reference] value = self._telegram_data[obis_reference][0]
setattr(self, name, value) setattr(self, name, value)
return value return value
def __getitem__(self, obis_reference): def __getitem__(self, obis_reference):
return self._dictionary[obis_reference] return self._telegram_data[obis_reference][0]
def __len__(self): def __len__(self):
return len(self._dictionary) return len(self._telegram_data) #TODO: its nested now
def _get_item_names(self): def _get_item_names(self):
return [self._obis_name_mapping[k] for k, v in self._dictionary.items()] return [self._obis_name_mapping[k] for k, v in self._telegram_data.items()]
def __iter__(self): def __iter__(self):
for attr in self._item_names: for attr in self._item_names:
@ -62,6 +69,10 @@ class DSMRObject(object):
def __init__(self, values): def __init__(self, values):
self.values = values self.values = values
@property
def channel(self):
return 0 # TODO
class MBusObject(DSMRObject): class MBusObject(DSMRObject):

View File

@ -74,25 +74,27 @@ class TelegramParser(object):
except Exception: except Exception:
pass pass
if self.apply_checksum_validation \ if self.apply_checksum_validation and self.telegram_specification['checksum_support']:
and self.telegram_specification['checksum_support']:
self.validate_checksum(telegram_data) self.validate_checksum(telegram_data)
telegram = {} telegram = Telegram()
for signature, parser in self.telegram_specification['objects'].items(): for signature, parser in self.telegram_specification['objects'].items():
match = re.search(signature, telegram_data, re.DOTALL) pattern = re.compile(signature, re.DOTALL)
matches = pattern.findall(telegram_data)
# Some signatures are optional and may not be present, # Some signatures are optional and may not be present,
# so only parse lines that match # so only parse lines that match
if match: for match in matches:
try: try:
telegram[signature] = parser.parse(match.group(0)) value = parser.parse(match)
except Exception: except Exception:
logger.error("ignore line with signature {}, because parsing failed.".format(signature), logger.error("ignore line with signature {}, because parsing failed.".format(signature),
exc_info=True) exc_info=True)
else:
telegram.add(obis_reference=signature, value=value)
return Telegram(telegram, self.telegram_specification) return telegram
@staticmethod @staticmethod
def validate_checksum(telegram): def validate_checksum(telegram):