working version of the Telegram object
This commit is contained in:
parent
8bdf77c78d
commit
c36f68a884
@ -6,6 +6,7 @@ import serial_asyncio
|
||||
from dsmr_parser.clients.telegram_buffer import TelegramBuffer
|
||||
from dsmr_parser.exceptions import ParseError, InvalidChecksumError
|
||||
from dsmr_parser.parsers import TelegramParser
|
||||
from dsmr_parser.objects import Telegram
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@ -41,6 +42,25 @@ class SerialReader(object):
|
||||
except ParseError as e:
|
||||
logger.error('Failed to parse telegram: %s', e)
|
||||
|
||||
def read_as_object(self):
|
||||
"""
|
||||
Read complete DSMR telegram's from the serial interface and return a Telegram object.
|
||||
|
||||
:rtype: generator
|
||||
"""
|
||||
with serial.Serial(**self.serial_settings) as serial_handle:
|
||||
while True:
|
||||
data = serial_handle.readline()
|
||||
self.telegram_buffer.append(data.decode('ascii'))
|
||||
|
||||
for telegram in self.telegram_buffer.get_all():
|
||||
try:
|
||||
yield Telegram(telegram, telegram_parser, telegram_specification)
|
||||
except InvalidChecksumError as e:
|
||||
logger.warning(str(e))
|
||||
except ParseError as e:
|
||||
logger.error('Failed to parse telegram: %s', e)
|
||||
|
||||
|
||||
class AsyncSerialReader(SerialReader):
|
||||
"""Serial reader using asyncio pyserial."""
|
||||
|
54
dsmr_parser/obis_name_mapping.py
Normal file
54
dsmr_parser/obis_name_mapping.py
Normal file
@ -0,0 +1,54 @@
|
||||
from dsmr_parser import obis_references as obis
|
||||
|
||||
"""
|
||||
dsmr_parser.obis_name_mapping
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
This module contains a mapping of obis references to names.
|
||||
"""
|
||||
|
||||
EN = {
|
||||
obis.P1_MESSAGE_HEADER: 'P1_MESSAGE_HEADER',
|
||||
obis.P1_MESSAGE_TIMESTAMP: 'P1_MESSAGE_TIMESTAMP',
|
||||
obis.ELECTRICITY_IMPORTED_TOTAL : 'ELECTRICITY_IMPORTED_TOTAL',
|
||||
obis.ELECTRICITY_USED_TARIFF_1 : 'ELECTRICITY_USED_TARIFF_1',
|
||||
obis.ELECTRICITY_USED_TARIFF_2 : 'ELECTRICITY_USED_TARIFF_2',
|
||||
obis.ELECTRICITY_DELIVERED_TARIFF_1 : 'ELECTRICITY_DELIVERED_TARIFF_1',
|
||||
obis.ELECTRICITY_DELIVERED_TARIFF_2 : 'ELECTRICITY_DELIVERED_TARIFF_2',
|
||||
obis.ELECTRICITY_ACTIVE_TARIFF : 'ELECTRICITY_ACTIVE_TARIFF',
|
||||
obis.EQUIPMENT_IDENTIFIER : 'EQUIPMENT_IDENTIFIER',
|
||||
obis.CURRENT_ELECTRICITY_USAGE : 'CURRENT_ELECTRICITY_USAGE',
|
||||
obis.CURRENT_ELECTRICITY_DELIVERY : 'CURRENT_ELECTRICITY_DELIVERY',
|
||||
obis.LONG_POWER_FAILURE_COUNT : 'LONG_POWER_FAILURE_COUNT',
|
||||
obis.SHORT_POWER_FAILURE_COUNT : 'SHORT_POWER_FAILURE_COUNT',
|
||||
obis.POWER_EVENT_FAILURE_LOG : 'POWER_EVENT_FAILURE_LOG',
|
||||
obis.VOLTAGE_SAG_L1_COUNT : 'VOLTAGE_SAG_L1_COUNT',
|
||||
obis.VOLTAGE_SAG_L2_COUNT : 'VOLTAGE_SAG_L2_COUNT',
|
||||
obis.VOLTAGE_SAG_L3_COUNT : 'VOLTAGE_SAG_L3_COUNT',
|
||||
obis.VOLTAGE_SWELL_L1_COUNT : 'VOLTAGE_SWELL_L1_COUNT',
|
||||
obis.VOLTAGE_SWELL_L2_COUNT : 'VOLTAGE_SWELL_L2_COUNT',
|
||||
obis.VOLTAGE_SWELL_L3_COUNT : 'VOLTAGE_SWELL_L3_COUNT',
|
||||
obis.INSTANTANEOUS_VOLTAGE_L1 : 'INSTANTANEOUS_VOLTAGE_L1',
|
||||
obis.INSTANTANEOUS_VOLTAGE_L2 : 'INSTANTANEOUS_VOLTAGE_L2',
|
||||
obis.INSTANTANEOUS_VOLTAGE_L3 : 'INSTANTANEOUS_VOLTAGE_L3',
|
||||
obis.INSTANTANEOUS_CURRENT_L1 : 'INSTANTANEOUS_CURRENT_L1',
|
||||
obis.INSTANTANEOUS_CURRENT_L2 : 'INSTANTANEOUS_CURRENT_L2',
|
||||
obis.INSTANTANEOUS_CURRENT_L3 : 'INSTANTANEOUS_CURRENT_L3',
|
||||
obis.TEXT_MESSAGE_CODE : 'TEXT_MESSAGE_CODE',
|
||||
obis.TEXT_MESSAGE : 'TEXT_MESSAGE',
|
||||
obis.DEVICE_TYPE : 'DEVICE_TYPE',
|
||||
obis.INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE : 'INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE',
|
||||
obis.INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE : 'INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE',
|
||||
obis.INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE : 'INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE',
|
||||
obis.INSTANTANEOUS_ACTIVE_POWER_L1_NEGATIVE : 'INSTANTANEOUS_ACTIVE_POWER_L1_NEGATIVE',
|
||||
obis.INSTANTANEOUS_ACTIVE_POWER_L2_NEGATIVE : 'INSTANTANEOUS_ACTIVE_POWER_L2_NEGATIVE',
|
||||
obis.INSTANTANEOUS_ACTIVE_POWER_L3_NEGATIVE : 'INSTANTANEOUS_ACTIVE_POWER_L3_NEGATIVE',
|
||||
obis.EQUIPMENT_IDENTIFIER_GAS : 'EQUIPMENT_IDENTIFIER_GAS',
|
||||
obis.HOURLY_GAS_METER_READING : 'HOURLY_GAS_METER_READING',
|
||||
obis.GAS_METER_READING : 'GAS_METER_READING',
|
||||
obis.ACTUAL_TRESHOLD_ELECTRICITY : 'ACTUAL_TRESHOLD_ELECTRICITY',
|
||||
obis.ACTUAL_SWITCH_POSITION : 'ACTUAL_SWITCH_POSITION',
|
||||
obis.VALVE_POSITION_GAS : 'VALVE_POSITION_GAS'
|
||||
}
|
||||
|
||||
REVERSE_EN = dict([ (v,k) for k,v in EN.items()])
|
@ -1,3 +1,57 @@
|
||||
import dsmr_parser.obis_name_mapping
|
||||
|
||||
class Telegram(object):
|
||||
"""
|
||||
Container for raw and parsed telegram data.
|
||||
Initializing:
|
||||
from dsmr_parser import telegram_specifications
|
||||
from dsmr_parser.exceptions import InvalidChecksumError, ParseError
|
||||
from dsmr_parser.objects import CosemObject, MBusObject, Telegram
|
||||
from dsmr_parser.parsers import TelegramParser
|
||||
from test.example_telegrams import TELEGRAM_V4_2
|
||||
parser = TelegramParser(telegram_specifications.V4)
|
||||
telegram = Telegram(TELEGRAM_V4_2, parser, telegram_specifications.V4)
|
||||
|
||||
Attributes can be accessed on a telegram object by addressing by their english name, for example:
|
||||
telegram.ELECTRICITY_USED_TARIFF_1
|
||||
|
||||
All attributes in a telegram can be iterated over, for example:
|
||||
[k for k,v in telegram]
|
||||
yields:
|
||||
['P1_MESSAGE_HEADER', 'P1_MESSAGE_TIMESTAMP', 'EQUIPMENT_IDENTIFIER', ...]
|
||||
"""
|
||||
def __init__(self, telegram_data, telegram_parser, telegram_specification):
|
||||
self._telegram_data = telegram_data
|
||||
self._telegram_specification = telegram_specification
|
||||
self._telegram_parser = telegram_parser
|
||||
self._obis_name_mapping = dsmr_parser.obis_name_mapping.EN
|
||||
self._reverse_obis_name_mapping = dsmr_parser.obis_name_mapping.REVERSE_EN
|
||||
self._dictionary = self._telegram_parser.parse(telegram_data)
|
||||
self._item_names = self._get_item_names()
|
||||
|
||||
def __getattr__(self, name):
|
||||
''' will only get called for undefined attributes '''
|
||||
obis_reference = self._reverse_obis_name_mapping[name]
|
||||
value = self._dictionary[obis_reference]
|
||||
setattr(self, name, value)
|
||||
return value
|
||||
|
||||
def _get_item_names(self):
|
||||
return [self._obis_name_mapping[k] for k, v in self._dictionary.items()]
|
||||
|
||||
def __iter__(self):
|
||||
for attr in self._item_names:
|
||||
value = getattr(self, attr)
|
||||
yield attr, value
|
||||
|
||||
def __str__(self):
|
||||
output = ""
|
||||
for attr,value in self:
|
||||
output += " " if not output == "" else ""
|
||||
output += "{}: \t {} \t[{}]\n".format(attr,str(value.value),str(value.unit))
|
||||
return output
|
||||
|
||||
|
||||
class DSMRObject(object):
|
||||
"""
|
||||
Represents all data from a single telegram line.
|
||||
|
@ -3,7 +3,7 @@ import re
|
||||
|
||||
from PyCRC.CRC16 import CRC16
|
||||
|
||||
from dsmr_parser.objects import MBusObject, CosemObject
|
||||
from dsmr_parser.objects import MBusObject, CosemObject, Telegram
|
||||
from dsmr_parser.exceptions import ParseError, InvalidChecksumError
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
30
test/test_telegram.py
Normal file
30
test/test_telegram.py
Normal file
@ -0,0 +1,30 @@
|
||||
from decimal import Decimal
|
||||
|
||||
import datetime
|
||||
import unittest
|
||||
|
||||
import pytz
|
||||
|
||||
from dsmr_parser import obis_references as obis
|
||||
from dsmr_parser import telegram_specifications
|
||||
from dsmr_parser.exceptions import InvalidChecksumError, ParseError
|
||||
from dsmr_parser.objects import CosemObject, MBusObject, Telegram
|
||||
from dsmr_parser.parsers import TelegramParser
|
||||
from test.example_telegrams import TELEGRAM_V4_2
|
||||
|
||||
class TelegramTest(unittest.TestCase):
|
||||
""" Test instantiation of Telegram object """
|
||||
|
||||
def test_instantiate(self):
|
||||
parser = TelegramParser(telegram_specifications.V4)
|
||||
#result = parser.parse(TELEGRAM_V4_2)
|
||||
telegram = Telegram(TELEGRAM_V4_2, parser, telegram_specifications.V4)
|
||||
|
||||
|
||||
|
||||
|
||||
# P1_MESSAGE_HEADER (1-3:0.2.8)
|
||||
#assert isinstance(result[obis.P1_MESSAGE_HEADER], CosemObject)
|
||||
#assert result[obis.P1_MESSAGE_HEADER].unit is None
|
||||
#assert isinstance(result[obis.P1_MESSAGE_HEADER].value, str)
|
||||
#assert result[obis.P1_MESSAGE_HEADER].value == '50'
|
Loading…
Reference in New Issue
Block a user