281 lines
8.8 KiB
Python
Raw Normal View History

2023-01-14 19:47:24 +01:00
from collections import defaultdict
from decimal import Decimal
2019-06-06 05:41:55 +02:00
import dsmr_parser.obis_name_mapping
import datetime
2020-05-25 01:38:14 +02:00
import json
2019-06-06 05:41:55 +02:00
2019-06-06 05:41:55 +02:00
class Telegram(object):
    """
    Container for parsed telegram data.

    Attributes can be accessed on a telegram object by addressing by their english name, for example:
        telegram.ELECTRICITY_USED_TARIFF_1
    All attributes in a telegram can be iterated over, for example:
        [k for k,v in telegram]
    yields:
        ['P1_MESSAGE_HEADER', 'P1_MESSAGE_TIMESTAMP', 'EQUIPMENT_IDENTIFIER', ...]
    """

    def __init__(self):
        # Maps an OBIS reference to the list of objects added for it.
        self._telegram_data = defaultdict(list)
        self._obis_name_mapping = dsmr_parser.obis_name_mapping.EN
        self._reverse_obis_name_mapping = dsmr_parser.obis_name_mapping.REVERSE_EN
        # English names of the references that currently hold data, in
        # insertion order. Kept up to date by add().
        self._item_names = self._get_item_names()

    def add(self, obis_reference, dsmr_object):
        """
        Store a parsed object under its OBIS reference.

        Several objects may share one reference; they are kept in insertion
        order. The english name of the reference is registered the first time
        it is seen, so objects added after construction show up when
        iterating, printing or serialising the telegram.

        :param obis_reference: key the object is stored under
        :param dsmr_object: object to store
        """
        self._telegram_data[obis_reference].append(dsmr_object)
        try:
            name = self._obis_name_mapping[obis_reference]
        except KeyError:
            # No english name known for this reference: the object stays
            # reachable through get() and [] but is not iterated over.
            return
        if name not in self._item_names:
            self._item_names.append(name)

    def _objects_for(self, obis_reference):
        """ Return the objects stored for a reference without creating an entry on a miss. """
        # Plain indexing on the defaultdict would insert an empty list for an
        # unknown reference and thereby change len() of the telegram.
        return self._telegram_data.get(obis_reference, [])

    # TODO experiment with api to see what is nice
    def get(self, obis_reference, channel=None):
        """
        Look up the objects stored for an OBIS reference.

        :param obis_reference: key to look up
        :param channel: when given, only the first object whose ``channel``
            equals this value is returned
        :return: the list of stored objects (empty when there are none) if
            ``channel`` is None, otherwise the first matching object or None
        """
        objects = self._objects_for(obis_reference)
        if channel is None:
            return objects
        return next((obj for obj in objects if obj.channel == channel), None)

    def __getattr__(self, name):
        """
        Resolve an english item name to the first object stored for it.

        Will only get called for undefined attributes. The resolved object is
        cached as a regular instance attribute.

        :raises AttributeError: when the name is unknown or no object has been
            added for it
        """
        if name.startswith('_'):
            # Private names are never telegram items. Failing here also stops
            # the infinite recursion that occurs when the mapping attributes
            # themselves are missing (instance created without __init__, as
            # done by copy and pickle).
            raise AttributeError(
                "'{}' object has no attribute '{}'".format(type(self).__name__, name)
            )
        try:
            obis_reference = self._reverse_obis_name_mapping[name]
            value = self._objects_for(obis_reference)[0]
        except (KeyError, IndexError):
            # The attribute protocol (hasattr, getattr with a default) only
            # understands AttributeError.
            raise AttributeError(
                "'{}' object has no attribute '{}'".format(type(self).__name__, name)
            ) from None
        setattr(self, name, value)
        return value

    def __getitem__(self, obis_reference):
        """
        Return the first object stored for an OBIS reference.

        :raises IndexError: when nothing is stored for the reference
        """
        return self._objects_for(obis_reference)[0]

    def __len__(self):
        return len(self._telegram_data)  # TODO: its nested now

    def _get_item_names(self):
        """ Return the english names of all references that currently hold data. """
        return [
            self._obis_name_mapping[reference]
            for reference in self._telegram_data
            if reference in self._obis_name_mapping
        ]

    def __iter__(self):
        """ Yield ``(english name, first stored object)`` pairs. """
        for attr in self._item_names:
            yield attr, getattr(self, attr)

    def __str__(self):
        return "".join(
            "{}: \t {}\n".format(attr, str(value)) for attr, value in self
        )

    def to_json(self):
        """ Return a JSON object string mapping each english name to its object's JSON. """
        return json.dumps({attr: json.loads(value.to_json()) for attr, value in self})
2019-06-06 05:41:55 +02:00
2016-08-22 20:16:11 +02:00
class DSMRObject(object):
    """
    Base holder for all data parsed from a single telegram line.

    The channel and the parsed values are stored exactly as handed in.
    """

    def __init__(self, channel, values):
        """
        :param channel: stored unchanged on ``self.channel``
        :param values: stored unchanged on ``self.values``
        """
        # TODO consider if only MBus should have channels
        self.channel, self.values = channel, values
class MBusObject(DSMRObject):
    """
    Telegram line object exposing a timestamp, a value and a unit.

    All three are read from fixed positions in ``self.values``.
    """

    @property
    def datetime(self):
        """ The 'value' entry of the first element of ``self.values``. """
        return self.values[0]['value']

    @property
    def value(self):
        # TODO temporary workaround for DSMR v2.2. Maybe use the same type of
        # TODO object, but let the parse set them differently? So don't use
        # TODO hardcoded indexes here.
        if len(self.values) != 2:  # v2
            return self.values[6]['value']
        else:
            return self.values[1]['value']

    @property
    def unit(self):
        # TODO temporary workaround for DSMR v2.2. Maybe use the same type of
        # TODO object, but let the parse set them differently? So don't use
        # TODO hardcoded indexes here.
        if len(self.values) != 2:  # v2
            return self.values[5]['value']
        else:
            return self.values[1]['unit']

    def __str__(self):
        timestamp = self.datetime
        # Same guard as to_json(): only real datetimes can be converted to the
        # local timezone; anything else is printed as-is instead of raising.
        if isinstance(timestamp, datetime.datetime):
            timestamp = timestamp.astimezone().isoformat()
        return "{}\t[{}] at {}".format(str(self.value), str(self.unit), str(timestamp))

    def to_json(self):
        """
        :return: A json string with the keys 'datetime', 'value' and 'unit'.
                 Datetimes are rendered as ISO 8601 strings in the local
                 timezone, Decimal values as floats.
        """
        timestamp = self.datetime
        if isinstance(timestamp, datetime.datetime):
            timestamp = timestamp.astimezone().isoformat()
        value = self.value
        if isinstance(value, datetime.datetime):
            value = value.astimezone().isoformat()
        elif isinstance(value, Decimal):
            value = float(value)
        output = {
            'datetime': timestamp,
            'value': value,
            'unit': self.unit
        }
        return json.dumps(output)
class MBusObjectPeak(DSMRObject):
    """
    Telegram line object exposing two timestamps (``datetime`` and
    ``occurred``), a value and a unit, read from the first three elements of
    ``self.values``.
    """

    @property
    def datetime(self):
        """ The 'value' entry of the first element of ``self.values``. """
        return self.values[0]['value']

    @property
    def occurred(self):
        """ The 'value' entry of the second element of ``self.values``. """
        return self.values[1]['value']

    @property
    def value(self):
        """ The 'value' entry of the third element of ``self.values``. """
        return self.values[2]['value']

    @property
    def unit(self):
        """ The 'unit' entry of the third element of ``self.values``. """
        return self.values[2]['unit']

    @staticmethod
    def _localized(timestamp):
        """ Render a datetime as a local-time ISO 8601 string; pass anything else through. """
        if isinstance(timestamp, datetime.datetime):
            return timestamp.astimezone().isoformat()
        return timestamp

    def __str__(self):
        # Both timestamps go through the same guard as in to_json(), so a
        # missing or unparsed timestamp is printed as-is instead of raising.
        output = "{}\t[{}] at {} occurred {}"\
            .format(str(self.value), str(self.unit), str(self._localized(self.datetime)),
                    str(self._localized(self.occurred)))
        return output

    def to_json(self):
        """
        :return: A json string with the keys 'datetime', 'occurred', 'value'
                 and 'unit'. Datetimes are rendered as ISO 8601 strings in the
                 local timezone, Decimal values as floats.
        """
        value = self.value
        if isinstance(value, datetime.datetime):
            value = value.astimezone().isoformat()
        elif isinstance(value, Decimal):
            value = float(value)
        output = {
            'datetime': self._localized(self.datetime),
            'occurred': self._localized(self.occurred),
            'value': value,
            'unit': self.unit
        }
        return json.dumps(output)
2016-08-22 20:16:11 +02:00
class CosemObject(DSMRObject):
    """
    Telegram line object whose value and unit both come from the first
    element of ``self.values``.
    """

    @property
    def value(self):
        """ The 'value' entry of the first element of ``self.values``. """
        first = self.values[0]
        return first['value']

    @property
    def unit(self):
        """ The 'unit' entry of the first element of ``self.values``. """
        first = self.values[0]
        return first['unit']

    def __str__(self):
        shown = self.value
        # Datetimes are shown as ISO 8601 strings in the local timezone.
        if isinstance(shown, datetime.datetime):
            shown = shown.astimezone().isoformat()
        return "{}\t[{}]".format(str(shown), str(self.unit))

    def to_json(self):
        """
        :return: A json string with the keys 'value' and 'unit'. Datetime
                 values are rendered as ISO 8601 strings in the local
                 timezone, Decimal values as floats.
        """
        raw = self.value
        if isinstance(raw, datetime.datetime):
            serialised = raw.astimezone().isoformat()
        elif isinstance(raw, Decimal):
            serialised = float(raw)
        else:
            serialised = raw
        return json.dumps({'value': serialised, 'unit': self.unit})
2016-08-22 20:16:11 +02:00
class ProfileGenericObject(DSMRObject):
    """
    Represents all data in a GenericProfile value.
    All buffer values are returned as a list of MBusObjects,
    containing the datetime (timestamp) and the value.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Filled on first access of the ``buffer`` property.
        self._buffer_list = None

    @property
    def value(self):
        # value is added to make sure the telegram iterator does not break
        return self.values

    @property
    def unit(self):
        # value is added to make sure all items have a unit so code that relies on that does not break
        return None

    @property
    def buffer_length(self):
        """ The 'value' entry of the first element of ``self.values``. """
        return self.values[0]['value']

    @property
    def buffer_type(self):
        """ The 'value' entry of the second element of ``self.values``. """
        return self.values[1]['value']

    @property
    def buffer(self):
        """
        The buffer entries as a list of MBusObjects, built once and cached.

        Entries start at index 2 of ``self.values`` and occupy two consecutive
        elements each; ``buffer_length`` entries are read.
        """
        if self._buffer_list is None:
            self._buffer_list = []
            values_offset = 2

            for i in range(self.buffer_length):
                offset = values_offset + i * 2
                self._buffer_list.append(
                    MBusObject(
                        channel=self.channel,
                        values=[self.values[offset], self.values[offset + 1]]
                    )
                )

        return self._buffer_list

    def __str__(self):
        parts = [
            "\t buffer length: {}\n".format(self.buffer_length),
            "\t buffer type: {}".format(self.buffer_type),
        ]
        for buffer_value in self.buffer:
            timestamp = buffer_value.datetime
            if isinstance(timestamp, datetime.datetime):
                timestamp = str(timestamp.astimezone().isoformat())
            parts.append("\n\t event occured at: {}".format(timestamp))
            parts.append("\t for: {} [{}]".format(buffer_value.value, buffer_value.unit))
        return "".join(parts)

    def to_json(self):
        """
        :return: A json of all values in the GenericProfileObject , with the following structure
                  {'buffer_length': n,
                   'buffer_type': obis_ref,
                   'buffer': [{'datetime': d1,
                               'value': v1,
                               'unit': u1},
                              ...
                              {'datetime': dn,
                               'value': vn,
                               'unit': un}
                             ]
                  }
        """
        # Built as a dict literal: the previous local named ``list`` shadowed
        # the builtin. Key order is unchanged.
        output = {
            'buffer_length': self.buffer_length,
            'buffer_type': self.buffer_type,
            'buffer': [json.loads(buffer_item.to_json()) for buffer_item in self.buffer],
        }
        return json.dumps(output)