Merge branch 'crc_check'

This commit is contained in:
Nigel Dokter 2016-12-29 19:32:24 +01:00
commit 6dec45ae2c
12 changed files with 213 additions and 92 deletions

View File

@ -1,6 +1,10 @@
Change Log Change Log
---------- ----------
**0.5** (2016-12-29)
- CRC checksum verification for DSMR v4 telegrams (`issue #10 <https://github.com/ndokter/dsmr_parser/issues/10>`_)
**0.4** (2016-11-21) **0.4** (2016-11-21)
- DSMR v2.2 serial settings now uses parity serial.EVEN by default (`pull request #5 <https://github.com/ndokter/dsmr_parser/pull/5>`_) - DSMR v2.2 serial settings now uses parity serial.EVEN by default (`pull request #5 <https://github.com/ndokter/dsmr_parser/pull/5>`_)
@ -8,7 +12,7 @@ Change Log
**0.3** (2016-11-12) **0.3** (2016-11-12)
- asyncio reader for non-blocking reads. (`pull request #3 <https://github.com/ndokter/dsmr_parser/pull/3>`_) - asyncio reader for non-blocking reads (`pull request #3 <https://github.com/ndokter/dsmr_parser/pull/3>`_)
**0.2** (2016-11-08) **0.2** (2016-11-08)

View File

@ -70,9 +70,3 @@ If the serial settings SERIAL_SETTINGS_V2_2 or SERIAL_SETTINGS_V4 don't work.
Make sure to try and replace the parity settings to EVEN or NONE. Make sure to try and replace the parity settings to EVEN or NONE.
It's possible that alternative settings will be added in the future if these It's possible that alternative settings will be added in the future if these
settings don't work for the majority of meters. settings don't work for the majority of meters.
TODO
----
- verify telegram checksum
- improve ease of use

View File

@ -1,2 +1,6 @@
class ParseError(Exception): class ParseError(Exception):
pass pass
class InvalidChecksumError(ParseError):
pass

View File

@ -1,8 +1,10 @@
import logging import logging
import re import re
from PyCRC.CRC16 import CRC16
from .objects import MBusObject, MBusObjectV2_2, CosemObject from .objects import MBusObject, MBusObjectV2_2, CosemObject
from .exceptions import ParseError from .exceptions import ParseError, InvalidChecksumError
from .obis_references import GAS_METER_READING from .obis_references import GAS_METER_READING
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -18,7 +20,6 @@ class TelegramParser(object):
self.telegram_specification = telegram_specification self.telegram_specification = telegram_specification
def _find_line_parser(self, line_value): def _find_line_parser(self, line_value):
for obis_reference, parser in self.telegram_specification.items(): for obis_reference, parser in self.telegram_specification.items():
if re.search(obis_reference, line_value): if re.search(obis_reference, line_value):
return obis_reference, parser return obis_reference, parser
@ -29,7 +30,10 @@ class TelegramParser(object):
telegram = {} telegram = {}
for line_value in line_values: for line_value in line_values:
obis_reference, dsmr_object = self.parse_line(line_value.strip()) # TODO temporarily strip newline characters.
line_value = line_value.strip()
obis_reference, dsmr_object = self.parse_line(line_value)
telegram[obis_reference] = dsmr_object telegram[obis_reference] = dsmr_object
@ -47,7 +51,51 @@ class TelegramParser(object):
return obis_reference, parser.parse(line_value) return obis_reference, parser.parse(line_value)
class TelegramParserV4(TelegramParser):
@staticmethod
def validate_telegram_checksum(line_values):
"""
:type line_values: list
:raises ParseError:
:raises InvalidChecksumError:
"""
full_telegram = ''.join(line_values)
# Extract the bytes that count towards the checksum.
checksum_contents = re.search(r'\/.+\!', full_telegram, re.DOTALL)
# Extract the hexadecimal checksum value itself.
checksum_hex = re.search(r'((?<=\!)[0-9A-Z]{4}(?=\r\n))+', full_telegram)
if not checksum_contents or not checksum_hex:
raise ParseError(
'Failed to perform CRC validation because the telegram is '
'incomplete. The checksum and/or content values are missing.'
)
calculated_crc = CRC16().calculate(checksum_contents.group(0))
expected_crc = checksum_hex.group(0)
expected_crc = int(expected_crc, base=16)
if calculated_crc != expected_crc:
raise InvalidChecksumError(
"Invalid telegram. The CRC checksum '{}' does not match the "
"expected '{}'".format(
calculated_crc,
expected_crc
)
)
def parse(self, line_values):
self.validate_telegram_checksum(line_values)
return super().parse(line_values)
class TelegramParserV2_2(TelegramParser): class TelegramParserV2_2(TelegramParser):
def parse(self, line_values): def parse(self, line_values):
"""Join lines for gas meter.""" """Join lines for gas meter."""

View File

@ -8,9 +8,15 @@ from serial_asyncio import create_serial_connection
from . import telegram_specifications from . import telegram_specifications
from .exceptions import ParseError from .exceptions import ParseError
from .parsers import TelegramParser, TelegramParserV2_2 from .parsers import (
from .serial import (SERIAL_SETTINGS_V2_2, SERIAL_SETTINGS_V4, TelegramParserV2_2,
is_end_of_telegram, is_start_of_telegram) TelegramParserV4
)
from .serial import (
SERIAL_SETTINGS_V2_2, SERIAL_SETTINGS_V4,
is_end_of_telegram,
is_start_of_telegram
)
def create_dsmr_reader(port, dsmr_version, telegram_callback, loop=None): def create_dsmr_reader(port, dsmr_version, telegram_callback, loop=None):
@ -22,7 +28,7 @@ def create_dsmr_reader(port, dsmr_version, telegram_callback, loop=None):
serial_settings = SERIAL_SETTINGS_V2_2 serial_settings = SERIAL_SETTINGS_V2_2
elif dsmr_version == '4': elif dsmr_version == '4':
specifications = telegram_specifications.V4 specifications = telegram_specifications.V4
telegram_parser = TelegramParser telegram_parser = TelegramParserV4
serial_settings = SERIAL_SETTINGS_V4 serial_settings = SERIAL_SETTINGS_V4
serial_settings['url'] = port serial_settings['url'] = port

View File

@ -1,11 +1,11 @@
import asyncio import asyncio
import logging import logging
import serial import serial
import serial_asyncio import serial_asyncio
from dsmr_parser.exceptions import ParseError from dsmr_parser.exceptions import ParseError
from dsmr_parser.parsers import TelegramParser, TelegramParserV2_2 from dsmr_parser.parsers import TelegramParser, TelegramParserV2_2, \
TelegramParserV4
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -32,15 +32,20 @@ SERIAL_SETTINGS_V4 = {
def is_start_of_telegram(line): def is_start_of_telegram(line):
"""
:type line: line
"""
return line.startswith('/') return line.startswith('/')
def is_end_of_telegram(line): def is_end_of_telegram(line):
"""
:type line: line
"""
return line.startswith('!') return line.startswith('!')
class SerialReader(object): class SerialReader(object):
PORT_KEY = 'port' PORT_KEY = 'port'
def __init__(self, device, serial_settings, telegram_specification): def __init__(self, device, serial_settings, telegram_specification):
@ -49,8 +54,11 @@ class SerialReader(object):
if serial_settings is SERIAL_SETTINGS_V2_2: if serial_settings is SERIAL_SETTINGS_V2_2:
telegram_parser = TelegramParserV2_2 telegram_parser = TelegramParserV2_2
elif serial_settings is SERIAL_SETTINGS_V4:
telegram_parser = TelegramParserV4
else: else:
telegram_parser = TelegramParser telegram_parser = TelegramParser
self.telegram_parser = telegram_parser(telegram_specification) self.telegram_parser = telegram_parser(telegram_specification)
def read(self): def read(self):
@ -65,7 +73,7 @@ class SerialReader(object):
while True: while True:
line = serial_handle.readline() line = serial_handle.readline()
line = line.decode('ascii') line = line.decode('ascii') # TODO move this to the parser?
# Telegrams need to be complete because the values belong to a # Telegrams need to be complete because the values belong to a
# particular reading and can also be related to eachother. # particular reading and can also be related to eachother.
@ -75,7 +83,12 @@ class SerialReader(object):
telegram.append(line) telegram.append(line)
if is_end_of_telegram(line): if is_end_of_telegram(line):
yield self.telegram_parser.parse(telegram)
try:
yield self.telegram_parser.parse(telegram)
except ParseError as e:
logger.error('Failed to parse telegram: %s', e)
telegram = [] telegram = []
@ -119,7 +132,7 @@ class AsyncSerialReader(SerialReader):
parsed_telegram = self.telegram_parser.parse(telegram) parsed_telegram = self.telegram_parser.parse(telegram)
# push new parsed telegram onto queue # push new parsed telegram onto queue
queue.put_nowait(parsed_telegram) queue.put_nowait(parsed_telegram)
except ParseError: except ParseError as e:
logger.exception("failed to parse telegram") logger.warning('Failed to parse telegram: %s', e)
telegram = [] telegram = []

View File

@ -4,8 +4,9 @@ import pytz
def timestamp(value): def timestamp(value):
naive_datetime = datetime.datetime.strptime(value[:-1], '%y%m%d%H%M%S') naive_datetime = datetime.datetime.strptime(value[:-1], '%y%m%d%H%M%S')
# TODO comment on this exception
if len(value) == 13: if len(value) == 13:
is_dst = value[12] == 'S' # assume format 160322150000W is_dst = value[12] == 'S' # assume format 160322150000W
else: else:

View File

@ -6,12 +6,13 @@ setup(
author='Nigel Dokter', author='Nigel Dokter',
author_email='nigeldokter@gmail.com', author_email='nigeldokter@gmail.com',
url='https://github.com/ndokter/dsmr_parser', url='https://github.com/ndokter/dsmr_parser',
version='0.4', version='0.5',
packages=find_packages(), packages=find_packages(),
install_requires=[ install_requires=[
'pyserial>=3,<4', 'pyserial>=3,<4',
'pyserial-asyncio<1', 'pyserial-asyncio<1',
'pytz' 'pytz',
'PyCRC>=1.2,<2'
], ],
entry_points={ entry_points={
'console_scripts': ['dsmr_console=dsmr_parser.__main__:console'] 'console_scripts': ['dsmr_console=dsmr_parser.__main__:console']

View File

@ -5,26 +5,26 @@ from dsmr_parser import telegram_specifications
from dsmr_parser import obis_references as obis from dsmr_parser import obis_references as obis
TELEGRAM_V2_2 = [ TELEGRAM_V2_2 = [
"/ISk5\2MT382-1004", '/ISk5\2MT382-1004',
"", '',
"0-0:96.1.1(00000000000000)", '0-0:96.1.1(00000000000000)',
"1-0:1.8.1(00001.001*kWh)", '1-0:1.8.1(00001.001*kWh)',
"1-0:1.8.2(00001.001*kWh)", '1-0:1.8.2(00001.001*kWh)',
"1-0:2.8.1(00001.001*kWh)", '1-0:2.8.1(00001.001*kWh)',
"1-0:2.8.2(00001.001*kWh)", '1-0:2.8.2(00001.001*kWh)',
"0-0:96.14.0(0001)", '0-0:96.14.0(0001)',
"1-0:1.7.0(0001.01*kW)", '1-0:1.7.0(0001.01*kW)',
"1-0:2.7.0(0000.00*kW)", '1-0:2.7.0(0000.00*kW)',
"0-0:17.0.0(0999.00*kW)", '0-0:17.0.0(0999.00*kW)',
"0-0:96.3.10(1)", '0-0:96.3.10(1)',
"0-0:96.13.1()", '0-0:96.13.1()',
"0-0:96.13.0()", '0-0:96.13.0()',
"0-1:24.1.0(3)", '0-1:24.1.0(3)',
"0-1:96.1.0(000000000000)", '0-1:96.1.0(000000000000)',
"0-1:24.3.0(161107190000)(00)(60)(1)(0-1:24.2.1)(m3)", '0-1:24.3.0(161107190000)(00)(60)(1)(0-1:24.2.1)(m3)',
"(00001.001)", '(00001.001)',
"0-1:24.4.0(1)", '0-1:24.4.0(1)',
"!", '!',
] ]

View File

@ -6,51 +6,77 @@ import pytz
from dsmr_parser import obis_references as obis from dsmr_parser import obis_references as obis
from dsmr_parser import telegram_specifications from dsmr_parser import telegram_specifications
from dsmr_parser.exceptions import InvalidChecksumError, ParseError
from dsmr_parser.objects import CosemObject, MBusObject from dsmr_parser.objects import CosemObject, MBusObject
from dsmr_parser.parsers import TelegramParser from dsmr_parser.parsers import TelegramParser, TelegramParserV4
TELEGRAM_V4_2 = [ TELEGRAM_V4_2 = [
'1-3:0.2.8(42)', '/KFM5KAIFA-METER\r\n',
'0-0:1.0.0(161113205757W)', '\r\n',
'0-0:96.1.1(1231231231231231231231231231231231)', '1-3:0.2.8(42)\r\n',
'1-0:1.8.1(001511.267*kWh)', '0-0:1.0.0(161113205757W)\r\n',
'1-0:1.8.2(001265.173*kWh)', '0-0:96.1.1(3960221976967177082151037881335713)\r\n',
'1-0:2.8.1(000000.000*kWh)', '1-0:1.8.1(001581.123*kWh)\r\n',
'1-0:2.8.2(000000.000*kWh)', '1-0:1.8.2(001435.706*kWh)\r\n',
'0-0:96.14.0(0001)', '1-0:2.8.1(000000.000*kWh)\r\n',
'1-0:1.7.0(00.235*kW)', '1-0:2.8.2(000000.000*kWh)\r\n',
'1-0:2.7.0(00.000*kW)', '0-0:96.14.0(0002)\r\n',
'0-0:96.7.21(00015)', '1-0:1.7.0(02.027*kW)\r\n',
'0-0:96.7.9(00007)', '1-0:2.7.0(00.000*kW)\r\n',
('1-0:99.97.0(3)(0-0:96.7.19)(000103180420W)(0000237126*s)' '0-0:96.7.21(00015)\r\n',
'(000101000001W)(2147483647*s)(000101000001W)(2147483647*s)'), '0-0:96.7.9(00007)\r\n',
'1-0:32.32.0(00000)', '1-0:99.97.0(3)(0-0:96.7.19)(000104180320W)(0000237126*s)(000101000001W)'
'1-0:52.32.0(00000)', '(2147583646*s)(000102000003W)(2317482647*s)\r\n',
'1-0:72.32.0(00000)', '1-0:32.32.0(00000)\r\n',
'1-0:32.36.0(00000)', '1-0:52.32.0(00000)\r\n',
'1-0:52.36.0(00000)', '1-0:72.32.0(00000)\r\n',
'1-0:72.36.0(00000)', '1-0:32.36.0(00000)\r\n',
'0-0:96.13.1()', '1-0:52.36.0(00000)\r\n',
'0-0:96.13.0()', '1-0:72.36.0(00000)\r\n',
'1-0:31.7.0(000*A)', '0-0:96.13.1()\r\n',
'1-0:51.7.0(000*A)', '0-0:96.13.0()\r\n',
'1-0:71.7.0(000*A)', '1-0:31.7.0(000*A)\r\n',
'1-0:21.7.0(00.095*kW)', '1-0:51.7.0(006*A)\r\n',
'1-0:22.7.0(00.000*kW)', '1-0:71.7.0(002*A)\r\n',
'1-0:41.7.0(00.025*kW)', '1-0:21.7.0(00.170*kW)\r\n',
'1-0:42.7.0(00.000*kW)', '1-0:22.7.0(00.000*kW)\r\n',
'1-0:61.7.0(00.115*kW)', '1-0:41.7.0(01.247*kW)\r\n',
'1-0:62.7.0(00.000*kW)', '1-0:42.7.0(00.000*kW)\r\n',
'0-1:24.1.0(003)', '1-0:61.7.0(00.209*kW)\r\n',
'0-1:96.1.0(3404856892390357246729543587524029)', '1-0:62.7.0(00.000*kW)\r\n',
'0-1:24.2.1(161113200000W)(00915.219*m3)', '0-1:24.1.0(003)\r\n',
'!5D83', '0-1:96.1.0(4819243993373755377509728609491464)\r\n',
'0-1:24.2.1(161129200000W)(00981.443*m3)\r\n',
'!6796\r\n'
] ]
class TelegramParserV4_2Test(unittest.TestCase): class TelegramParserV4_2Test(unittest.TestCase):
""" Test parsing of a DSMR v4.2 telegram. """ """ Test parsing of a DSMR v4.2 telegram. """
def test_valid(self):
# No exception is raised.
TelegramParserV4.validate_telegram_checksum(
TELEGRAM_V4_2
)
def test_invalid(self):
# Remove one the electricty used data value. This causes the checksum to
# not match anymore.
telegram = [line
for line in TELEGRAM_V4_2
if '1-0:1.8.1' not in line]
with self.assertRaises(InvalidChecksumError):
TelegramParserV4.validate_telegram_checksum(telegram)
def test_missing_checksum(self):
# Remove the checksum value causing a ParseError.
telegram = TELEGRAM_V4_2[:-1]
with self.assertRaises(ParseError):
TelegramParserV4.validate_telegram_checksum(telegram)
def test_parse(self): def test_parse(self):
parser = TelegramParser(telegram_specifications.V4) parser = TelegramParser(telegram_specifications.V4)
result = parser.parse(TELEGRAM_V4_2) result = parser.parse(TELEGRAM_V4_2)
@ -72,13 +98,13 @@ class TelegramParserV4_2Test(unittest.TestCase):
assert isinstance(result[obis.ELECTRICITY_USED_TARIFF_1], CosemObject) assert isinstance(result[obis.ELECTRICITY_USED_TARIFF_1], CosemObject)
assert result[obis.ELECTRICITY_USED_TARIFF_1].unit == 'kWh' assert result[obis.ELECTRICITY_USED_TARIFF_1].unit == 'kWh'
assert isinstance(result[obis.ELECTRICITY_USED_TARIFF_1].value, Decimal) assert isinstance(result[obis.ELECTRICITY_USED_TARIFF_1].value, Decimal)
assert result[obis.ELECTRICITY_USED_TARIFF_1].value == Decimal('1511.267') assert result[obis.ELECTRICITY_USED_TARIFF_1].value == Decimal('1581.123')
# ELECTRICITY_USED_TARIFF_2 (1-0:1.8.2) # ELECTRICITY_USED_TARIFF_2 (1-0:1.8.2)
assert isinstance(result[obis.ELECTRICITY_USED_TARIFF_2], CosemObject) assert isinstance(result[obis.ELECTRICITY_USED_TARIFF_2], CosemObject)
assert result[obis.ELECTRICITY_USED_TARIFF_2].unit == 'kWh' assert result[obis.ELECTRICITY_USED_TARIFF_2].unit == 'kWh'
assert isinstance(result[obis.ELECTRICITY_USED_TARIFF_2].value, Decimal) assert isinstance(result[obis.ELECTRICITY_USED_TARIFF_2].value, Decimal)
assert result[obis.ELECTRICITY_USED_TARIFF_2].value == Decimal('1265.173') assert result[obis.ELECTRICITY_USED_TARIFF_2].value == Decimal('1435.706')
# ELECTRICITY_DELIVERED_TARIFF_1 (1-0:2.8.1) # ELECTRICITY_DELIVERED_TARIFF_1 (1-0:2.8.1)
assert isinstance(result[obis.ELECTRICITY_DELIVERED_TARIFF_1], CosemObject) assert isinstance(result[obis.ELECTRICITY_DELIVERED_TARIFF_1], CosemObject)
@ -96,19 +122,19 @@ class TelegramParserV4_2Test(unittest.TestCase):
assert isinstance(result[obis.ELECTRICITY_ACTIVE_TARIFF], CosemObject) assert isinstance(result[obis.ELECTRICITY_ACTIVE_TARIFF], CosemObject)
assert result[obis.ELECTRICITY_ACTIVE_TARIFF].unit is None assert result[obis.ELECTRICITY_ACTIVE_TARIFF].unit is None
assert isinstance(result[obis.ELECTRICITY_ACTIVE_TARIFF].value, str) assert isinstance(result[obis.ELECTRICITY_ACTIVE_TARIFF].value, str)
assert result[obis.ELECTRICITY_ACTIVE_TARIFF].value == '0001' assert result[obis.ELECTRICITY_ACTIVE_TARIFF].value == '0002'
# EQUIPMENT_IDENTIFIER (0-0:96.1.1) # EQUIPMENT_IDENTIFIER (0-0:96.1.1)
assert isinstance(result[obis.EQUIPMENT_IDENTIFIER], CosemObject) assert isinstance(result[obis.EQUIPMENT_IDENTIFIER], CosemObject)
assert result[obis.EQUIPMENT_IDENTIFIER].unit is None assert result[obis.EQUIPMENT_IDENTIFIER].unit is None
assert isinstance(result[obis.EQUIPMENT_IDENTIFIER].value, str) assert isinstance(result[obis.EQUIPMENT_IDENTIFIER].value, str)
assert result[obis.EQUIPMENT_IDENTIFIER].value == '1231231231231231231231231231231231' assert result[obis.EQUIPMENT_IDENTIFIER].value == '3960221976967177082151037881335713'
# CURRENT_ELECTRICITY_USAGE (1-0:1.7.0) # CURRENT_ELECTRICITY_USAGE (1-0:1.7.0)
assert isinstance(result[obis.CURRENT_ELECTRICITY_USAGE], CosemObject) assert isinstance(result[obis.CURRENT_ELECTRICITY_USAGE], CosemObject)
assert result[obis.CURRENT_ELECTRICITY_USAGE].unit == 'kW' assert result[obis.CURRENT_ELECTRICITY_USAGE].unit == 'kW'
assert isinstance(result[obis.CURRENT_ELECTRICITY_USAGE].value, Decimal) assert isinstance(result[obis.CURRENT_ELECTRICITY_USAGE].value, Decimal)
assert result[obis.CURRENT_ELECTRICITY_USAGE].value == Decimal('0.235') assert result[obis.CURRENT_ELECTRICITY_USAGE].value == Decimal('2.027')
# CURRENT_ELECTRICITY_DELIVERY (1-0:2.7.0) # CURRENT_ELECTRICITY_DELIVERY (1-0:2.7.0)
assert isinstance(result[obis.CURRENT_ELECTRICITY_DELIVERY], CosemObject) assert isinstance(result[obis.CURRENT_ELECTRICITY_DELIVERY], CosemObject)
@ -178,19 +204,19 @@ class TelegramParserV4_2Test(unittest.TestCase):
assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE], CosemObject) assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE], CosemObject)
assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE].unit == 'kW' assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE].unit == 'kW'
assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE].value, Decimal) assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE].value, Decimal)
assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE].value == Decimal('0.095') assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE].value == Decimal('0.170')
# INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE (1-0:41.7.0) # INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE (1-0:41.7.0)
assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE], CosemObject) assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE], CosemObject)
assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE].unit == 'kW' assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE].unit == 'kW'
assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE].value, Decimal) assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE].value, Decimal)
assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE].value == Decimal('0.025') assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE].value == Decimal('1.247')
# INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE (1-0:61.7.0) # INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE (1-0:61.7.0)
assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE], CosemObject) assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE], CosemObject)
assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE].unit == 'kW' assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE].unit == 'kW'
assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE].value, Decimal) assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE].value, Decimal)
assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE].value == Decimal('0.115') assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE].value == Decimal('0.209')
# INSTANTANEOUS_ACTIVE_POWER_L1_NEGATIVE (1-0:22.7.0) # INSTANTANEOUS_ACTIVE_POWER_L1_NEGATIVE (1-0:22.7.0)
assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L1_NEGATIVE], CosemObject) assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L1_NEGATIVE], CosemObject)
@ -214,13 +240,13 @@ class TelegramParserV4_2Test(unittest.TestCase):
assert isinstance(result[obis.EQUIPMENT_IDENTIFIER_GAS], CosemObject) assert isinstance(result[obis.EQUIPMENT_IDENTIFIER_GAS], CosemObject)
assert result[obis.EQUIPMENT_IDENTIFIER_GAS].unit is None assert result[obis.EQUIPMENT_IDENTIFIER_GAS].unit is None
assert isinstance(result[obis.EQUIPMENT_IDENTIFIER_GAS].value, str) assert isinstance(result[obis.EQUIPMENT_IDENTIFIER_GAS].value, str)
assert result[obis.EQUIPMENT_IDENTIFIER_GAS].value == '3404856892390357246729543587524029' assert result[obis.EQUIPMENT_IDENTIFIER_GAS].value == '4819243993373755377509728609491464'
# HOURLY_GAS_METER_READING (0-1:24.2.1) # HOURLY_GAS_METER_READING (0-1:24.2.1)
assert isinstance(result[obis.HOURLY_GAS_METER_READING], MBusObject) assert isinstance(result[obis.HOURLY_GAS_METER_READING], MBusObject)
assert result[obis.HOURLY_GAS_METER_READING].unit == 'm3' assert result[obis.HOURLY_GAS_METER_READING].unit == 'm3'
assert isinstance(result[obis.HOURLY_GAS_METER_READING].value, Decimal) assert isinstance(result[obis.HOURLY_GAS_METER_READING].value, Decimal)
assert result[obis.HOURLY_GAS_METER_READING].value == Decimal('915.219') assert result[obis.HOURLY_GAS_METER_READING].value == Decimal('981.443')
# POWER_EVENT_FAILURE_LOG (99.97.0) # POWER_EVENT_FAILURE_LOG (99.97.0)
# TODO to be implemented # TODO to be implemented

View File

@ -3,12 +3,35 @@
from unittest.mock import Mock from unittest.mock import Mock
import pytest import pytest
from dsmr_parser import obis_references as obis from dsmr_parser import obis_references as obis
from dsmr_parser import telegram_specifications from dsmr_parser import telegram_specifications
from dsmr_parser.parsers import TelegramParserV2_2 from dsmr_parser.parsers import TelegramParserV2_2
from dsmr_parser.protocol import DSMRProtocol from dsmr_parser.protocol import DSMRProtocol
from .test_parse_v2_2 import TELEGRAM_V2_2
TELEGRAM_V2_2 = [
"/ISk5\2MT382-1004",
"",
"0-0:96.1.1(00000000000000)",
"1-0:1.8.1(00001.001*kWh)",
"1-0:1.8.2(00001.001*kWh)",
"1-0:2.8.1(00001.001*kWh)",
"1-0:2.8.2(00001.001*kWh)",
"0-0:96.14.0(0001)",
"1-0:1.7.0(0001.01*kW)",
"1-0:2.7.0(0000.00*kW)",
"0-0:17.0.0(0999.00*kW)",
"0-0:96.3.10(1)",
"0-0:96.13.1()",
"0-0:96.13.0()",
"0-1:24.1.0(3)",
"0-1:96.1.0(000000000000)",
"0-1:24.3.0(161107190000)(00)(60)(1)(0-1:24.2.1)(m3)",
"(00001.001)",
"0-1:24.4.0(1)",
"!",
]
@pytest.fixture @pytest.fixture

View File

@ -9,6 +9,7 @@ deps=
pytest-asyncio pytest-asyncio
pytest-catchlog pytest-catchlog
pytest-mock pytest-mock
PyCRC
commands= commands=
py.test --cov=dsmr_parser test {posargs} py.test --cov=dsmr_parser test {posargs}
pylama dsmr_parser test pylama dsmr_parser test