This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import re
from PyCRC.CRC16 import CRC16
from dsmr_parser.objects import MBusObject, MBusObjectV2_2, CosemObject
from dsmr_parser.exceptions import ParseError, InvalidChecksumError
from dsmr_parser.obis_references import GAS_METER_READING
logger = logging.getLogger(__name__)
class TelegramParser(object):
    """
    Parse a raw telegram into a dict keyed by OBIS reference.

    Every line of the telegram is matched against the regular expressions
    that form the keys of the telegram specification; the parser object
    stored under the first matching key is used to parse that line.
    """

    def __init__(self, telegram_specification):
        """
        :param telegram_specification: determines how the telegram is parsed.
            Keys are OBIS reference regular expressions, values are objects
            exposing a ``parse(line)`` method.
        :type telegram_specification: dict
        """
        self.telegram_specification = telegram_specification

    def _find_line_parser(self, line_value):
        """
        Look up the parser responsible for a single telegram line.

        :param str line_value: one line of the telegram
        :returns: ``(obis_reference, parser)`` for the first specification
            entry whose pattern is found in the line, or ``(None, None)``
            when no entry matches
        :rtype: tuple
        """
        for obis_reference, parser in self.telegram_specification.items():
            if re.search(obis_reference, line_value):
                return obis_reference, parser

        return None, None

    def parse(self, telegram):
        """
        Parse telegram from string to dict.

        The telegram str type makes python 2.x integration easier.

        Lines that match no entry of the telegram specification (for example
        the header, blank lines and the checksum line) are left out of the
        result.

        :param str telegram: full telegram from start ('/') to checksum
            ('!ABCD') including line endings inbetween the telegram's lines
        :rtype: dict
        :returns: Shortened example:
            {
                ..
                r'0-0:96\.1\.1': <CosemObject>,  # EQUIPMENT_IDENTIFIER
                r'1-0:1\.8\.1': <CosemObject>,   # ELECTRICITY_USED_TARIFF_1
                r'0-\d:24\.3\.0': <MBusObject>,  # GAS_METER_READING
                ..
            }
        """
        parsed_lines = map(self.parse_line, telegram.splitlines())

        # parse_line() yields (None, None) for unrecognised lines; keeping
        # those would add a meaningless ``None: None`` entry to the result.
        return {obis_reference: dsmr_object
                for obis_reference, dsmr_object in parsed_lines
                if obis_reference is not None}

    def parse_line(self, line):
        """
        Parse a single telegram line.

        :param str line: one line of the telegram
        :returns: ``(obis_reference, parsed_object)``, or ``(None, None)``
            when the specification has no parser for the line
        :rtype: tuple
        """
        logger.debug("Parsing line '%s'", line)

        obis_reference, parser = self._find_line_parser(line)

        if not obis_reference:
            logger.debug("No line class found for: '%s'", line)
            return None, None

        return obis_reference, parser.parse(line)
class TelegramParserV4(TelegramParser):
    """Telegram parser that validates the CRC16 checksum before parsing."""

    @staticmethod
    def validate_telegram_checksum(telegram):
        """
        Compare the checksum carried by the telegram with a calculated CRC16.

        :param str telegram: full telegram, from '/' up to and including the
            four hexadecimal checksum characters that follow '!'
        :raises ParseError: when the checksummed content or the checksum
            value cannot be found in the telegram
        :raises InvalidChecksumError: when the calculated CRC16 differs from
            the checksum carried by the telegram
        """
        # Extract the part for which the checksum applies: everything from
        # the first '/' up to and including the last '!'.
        checksum_contents = re.search(r'\/.+\!', telegram, re.DOTALL)

        # Extract the hexadecimal checksum value itself.
        # The line ending '\r\n' for the checksum line can be ignored.
        # Only hexadecimal digits are accepted, so a malformed checksum is
        # reported as ParseError below instead of surfacing as a ValueError
        # from int().
        checksum_hex = re.search(r'(?<=\!)[0-9A-F]{4}', telegram)

        if not checksum_contents or not checksum_hex:
            raise ParseError(
                'Failed to perform CRC validation because the telegram is '
                'incomplete. The checksum and/or content values are missing.'
            )

        calculated_crc = CRC16().calculate(checksum_contents.group(0))
        expected_crc = int(checksum_hex.group(0), base=16)

        if calculated_crc != expected_crc:
            raise InvalidChecksumError(
                "Invalid telegram. The CRC checksum '{}' does not match the "
                "expected '{}'".format(
                    calculated_crc,
                    expected_crc
                )
            )

    def parse(self, telegram):
        """
        Validate the telegram checksum, then parse the telegram.

        :param str telegram:
        :rtype: dict
        :raises ParseError: see :meth:`validate_telegram_checksum`
        :raises InvalidChecksumError: see :meth:`validate_telegram_checksum`
        """
        self.validate_telegram_checksum(telegram)

        return super().parse(telegram)
class TelegramParserV2_2(TelegramParser):
    """Telegram parser that re-joins the gas meter reading split over two lines."""

    def parse(self, telegram):
        """
        Merge each gas meter reading line with the line that follows it and
        parse the resulting telegram.

        A line that starts with the GAS_METER_READING pattern is held back and
        concatenated (without separator) with the next line. Such a line is
        not emitted when no further line follows it.

        :param str telegram:
        :rtype: dict
        """
        # TODO fix this in the specification: telegram_specifications.V2_2
        gas_line_pattern = re.compile(GAS_METER_READING)

        # TODO temporary workaround
        merged_lines = []
        pending = None

        for current in telegram.splitlines():
            if pending:
                # Previous line was a gas reading: glue this line onto it.
                merged_lines.append(pending + current)
                pending = None
            elif gas_line_pattern.match(current):
                pending = current
            else:
                merged_lines.append(current)

        return super().parse('\r\n'.join(merged_lines))
class DSMRObjectParser(object):
    """
    Base class for line parsers.

    Splits a telegram line into its parenthesised value groups and hands each
    group to the value format at the same position.
    """

    # Match value groups, but exclude the parentheses.
    # Compiled once for the class instead of on every _parse() call.
    _VALUE_GROUPS = re.compile(r'((?<=\()[0-9a-zA-Z\.\*]{0,}(?=\)))+')

    def __init__(self, *value_formats):
        """
        :param value_formats: one object with a ``parse(value)`` method per
            value group expected in the line, in order
        """
        self.value_formats = value_formats

    def _parse(self, line):
        """
        Extract and parse all value groups of a line.

        :param str line: one line of the telegram
        :returns: the result of each value format's ``parse`` call, in the
            order the value groups occur in the line
        :rtype: list
        :raises ParseError: when the line has no value groups or the number
            of value groups differs from the number of value formats
        """
        # Convert empty value groups to None for clarity.
        values = [value or None for value in self._VALUE_GROUPS.findall(line)]

        if not values or len(values) != len(self.value_formats):
            # Interpolate explicitly: exceptions do not apply logging-style
            # lazy formatting to extra positional arguments.
            raise ParseError("Invalid '%s' line for '%s'" % (line, self))

        return [value_format.parse(value)
                for value_format, value in zip(self.value_formats, values)]
class MBusParser(DSMRObjectParser):
    """
    Parser for gas meter values.

    Handles lines that carry a timestamp together with a gas meter value.

    Line format:
    'ID (TST) (Mv1*U1)'
     1   2     3   4

    1) OBIS Reduced ID-code
    2) Time Stamp (TST) of capture time of measurement value
    3) Measurement value 1 (most recent entry of buffer attribute without unit)
    4) Unit of measurement values (Unit of capture objects attribute)
    """

    def parse(self, line):
        """
        Parse a gas meter line.

        :param str line: one line of the telegram
        :returns: an MBusObject when the line yields exactly two values,
            otherwise an MBusObjectV2_2
        """
        values = self._parse(line)
        object_class = MBusObject if len(values) == 2 else MBusObjectV2_2
        return object_class(values)
class CosemParser(DSMRObjectParser):
    """
    Parser for Cosem objects.

    Handles data objects that hold a single value, optionally followed by a
    unit of measurement.

    Line format:
    ID (Mv*U)
    1  23  45

    1) OBIS Reduced ID-code
    2) Separator "(", ASCII 28h
    3) COSEM object attribute value
    4) Unit of measurement values (Unit of capture objects attribute) only if
       applicable
    5) Separator ")", ASCII 29h
    """

    def parse(self, line):
        """
        Parse a Cosem line.

        :param str line: one line of the telegram
        :returns: a CosemObject built from the parsed values of the line
        """
        values = self._parse(line)
        return CosemObject(values)
class ProfileGenericParser(DSMRObjectParser):
    """
    Parser for the power failure log (not implemented).

    Handles data objects made up of multiple repeating groups of values.

    Line format:
    ID (z) (ID1) (TST) (Bv1*U1) (TST) (Bvz*Uz)
    1   2   3     4     5   6    7     8   9

    1) OBIS Reduced ID-code
    2) Number of values z (max 10).
    3) Identifications of buffer values (OBIS Reduced ID codes of capture
       objects attribute)
    4) Time Stamp (TST) of power failure end time
    5) Buffer value 1 (most recent entry of buffer attribute without unit)
    6) Unit of buffer values (Unit of capture objects attribute)
    7) Time Stamp (TST) of power failure end time
    8) Buffer value 2 (oldest entry of buffer attribute without unit)
    9) Unit of buffer values (Unit of capture objects attribute)
    """

    def parse(self, line):
        """
        Not implemented; always raises.

        :param str line: one line of the telegram
        :raises NotImplementedError: always
        """
        raise NotImplementedError()
class ValueParser(object):
    """Split a raw value group into a coerced value and its optional unit."""

    def __init__(self, coerce_type):
        """
        :param coerce_type: callable applied to the value part of a group to
            convert it to its final type
        """
        self.coerce_type = coerce_type

    def parse(self, value):
        """
        Parse one value group.

        :param value: content of the value group, or None for an empty group
        :returns: dict with the coerced value under 'value' and the text after
            '*' under 'unit' (None when the group has no '*')
        :rtype: dict
        """
        unit = None
        raw = value

        if raw and '*' in raw:
            raw, unit = raw.split('*')

        # A value group is not required to have a value, and then coercing
        # does not apply.
        if raw is None:
            coerced = None
        else:
            coerced = self.coerce_type(raw)

        return dict(value=coerced, unit=unit)