From e3569e07199ad4db955017a88bdaef413e8c828a Mon Sep 17 00:00:00 2001 From: Johan Bloemberg Date: Mon, 21 Nov 2016 15:47:29 +0100 Subject: [PATCH] Add asyncio protocol implementation. --- .envrc | 1 - dsmr_parser/__main__.py | 30 ++++++++------ dsmr_parser/protocol.py | 90 +++++++++++++++++++++++++++++++++++++++++ test/test_async.py | 57 -------------------------- test/test_protocol.py | 39 ++++++++++++++++++ tox.ini | 1 + 6 files changed, 148 insertions(+), 70 deletions(-) delete mode 100644 .envrc create mode 100644 dsmr_parser/protocol.py delete mode 100644 test/test_async.py create mode 100644 test/test_protocol.py diff --git a/.envrc b/.envrc deleted file mode 100644 index 94840b3..0000000 --- a/.envrc +++ /dev/null @@ -1 +0,0 @@ -layout python3 diff --git a/dsmr_parser/__main__.py b/dsmr_parser/__main__.py index 92c0dce..5cb6b72 100644 --- a/dsmr_parser/__main__.py +++ b/dsmr_parser/__main__.py @@ -1,6 +1,8 @@ import argparse -from dsmr_parser.serial import SERIAL_SETTINGS_V2_2, SERIAL_SETTINGS_V4, SerialReader -from dsmr_parser import telegram_specifications +import asyncio +import logging + +from .protocol import create_dsmr_reader def console(): @@ -11,22 +13,26 @@ def console(): help='port to read DSMR data from') parser.add_argument('--version', default='2.2', choices=['2.2', '4'], help='DSMR version (2.2, 4)') + parser.add_argument('--verbose', '-v', action='count') args = parser.parse_args() - settings = { - '2.2': (SERIAL_SETTINGS_V2_2, telegram_specifications.V2_2), - '4': (SERIAL_SETTINGS_V4, telegram_specifications.V4), - } + if args.verbose: + level = logging.DEBUG + else: + level = logging.ERROR + logging.basicConfig(level=level) - serial_reader = SerialReader( - device=args.device, - serial_settings=settings[args.version][0], - telegram_specification=settings[args.version][1], - ) + loop = asyncio.get_event_loop() - for telegram in serial_reader.read(): + def print_callback(telegram): + """Callback that prints telegram values.""" for obiref, obj in telegram.items(): if obj: print(obj.value, obj.unit) print() + + conn = create_dsmr_reader(args.device, args.version, print_callback, loop=loop) + + loop.create_task(conn) + loop.run_forever() diff --git a/dsmr_parser/protocol.py b/dsmr_parser/protocol.py new file mode 100644 index 0000000..a4e3c1a --- /dev/null +++ b/dsmr_parser/protocol.py @@ -0,0 +1,90 @@ +"""Asyncio protocol implementation for handling telegrams.""" + +import asyncio +import logging +from functools import partial + +from serial_asyncio import create_serial_connection + +from . import telegram_specifications +from .exceptions import ParseError +from .parsers import TelegramParser, TelegramParserV2_2 +from .serial import (SERIAL_SETTINGS_V2_2, SERIAL_SETTINGS_V4, + is_end_of_telegram) + + +def create_dsmr_reader(port, dsmr_version, telegram_callback, loop=None): + """Creates a DSMR asyncio protocol coroutine.""" + + if dsmr_version == '2.2': + specifications = telegram_specifications.V2_2 + telegram_parser = TelegramParserV2_2 + serial_settings = SERIAL_SETTINGS_V2_2 + elif dsmr_version == '4': + specifications = telegram_specifications.V4 + telegram_parser = TelegramParser + serial_settings = SERIAL_SETTINGS_V4 + + serial_settings['url'] = port + + protocol = partial(DSMRProtocol, loop, telegram_parser(specifications), + telegram_callback=telegram_callback) + + conn = create_serial_connection(loop, protocol, **serial_settings) + + return conn + + +class DSMRProtocol(asyncio.Protocol): + """Assemble and handle incoming data into complete DSM telegrams.""" + + transport = None + telegram_callback = None + + def __init__(self, loop, telegram_parser, telegram_callback=None): + """Initialize class.""" + self.loop = loop + self.log = logging.getLogger(__name__) + self.telegram_parser = telegram_parser + # callback to call on complete telegram + self.telegram_callback = telegram_callback + # buffer to keep incoming telegram lines + self.telegram = [] + # buffer to keep incomplete incoming data + self.buffer = '' + + def connection_made(self, transport): + """Just logging for now.""" + self.transport = transport + self.log.debug('connected') + + def data_received(self, data): + """Add incoming data to buffer.""" + data = data.decode() + self.log.debug('received data: %s', data.strip()) + self.buffer += data + self.handle_lines() + + def handle_lines(self): + """Assemble incoming data into single lines.""" + while "\r\n" in self.buffer: + line, self.buffer = self.buffer.split("\r\n", 1) + self.log.debug('got line: %s', line) + self.telegram.append(line) + if is_end_of_telegram(line): + try: + parsed_telegram = self.telegram_parser.parse(self.telegram) + self.handle_telegram(parsed_telegram) + except ParseError: + self.log.exception("failed to parse telegram") + + def connection_lost(self, exc): + """Stop when connection is lost.""" + self.log.error('disconnected') + + def handle_telegram(self, telegram): + """Send off parsed telegram to handling callback.""" + self.log.debug('got telegram: %s', telegram) + + if self.telegram_callback: + self.telegram_callback(telegram) diff --git a/test/test_async.py b/test/test_async.py deleted file mode 100644 index b8a05cb..0000000 --- a/test/test_async.py +++ /dev/null @@ -1,57 +0,0 @@ -"""Test async read/parse.""" - -import pytest -import asyncio -from dsmr_parser.serial import AsyncSerialReader, SERIAL_SETTINGS_V2_2 -from dsmr_parser.obis_references import CURRENT_ELECTRICITY_USAGE, GAS_METER_READING -from dsmr_parser import telegram_specifications - -TELEGRAM_V2_2 = [ - "/ISk5\2MT382-1004", - "", - "0-0:96.1.1(00000000000000)", - "1-0:1.8.1(00001.001*kWh)", - "1-0:1.8.2(00001.001*kWh)", - "1-0:2.8.1(00001.001*kWh)", - "1-0:2.8.2(00001.001*kWh)", - "0-0:96.14.0(0001)", - "1-0:1.7.0(0001.01*kW)", - "1-0:2.7.0(0000.00*kW)", - "0-0:17.0.0(0999.00*kW)", - "0-0:96.3.10(1)", - "0-0:96.13.1()", - "0-0:96.13.0()", - "0-1:24.1.0(3)", - "0-1:96.1.0(000000000000)", - "0-1:24.3.0(161107190000)(00)(60)(1)(0-1:24.2.1)(m3)", - "(00001.001)", - "0-1:24.4.0(1)", - "!", -] - - -@pytest.mark.asyncio -def test_async_read(event_loop, mocker): - """Test async read and parse.""" - - mock_open_serial_connection = mocker.patch('serial_asyncio.open_serial_connection') - mock_open_serial_connection.return_value = (mocker.stub(), None) - - queue = asyncio.Queue() - - serial_reader = AsyncSerialReader( - device='/dev/ttyUSB0', - serial_settings=SERIAL_SETTINGS_V2_2, - telegram_specification=telegram_specifications.V2_2, - ) - - event_loop.run_until_complete(serial_reader.read(queue)) - - assert not queue.get_nowait() - - result = yield from queue.get() - - assert float(result[CURRENT_ELECTRICITY_USAGE].value) == 1.01 - assert result[CURRENT_ELECTRICITY_USAGE].unit == 'kW' - assert float(result[GAS_METER_READING].value) == 1.001 - assert result[GAS_METER_READING].unit == 'm3' diff --git a/test/test_protocol.py b/test/test_protocol.py new file mode 100644 index 0000000..79e7aa7 --- /dev/null +++ b/test/test_protocol.py @@ -0,0 +1,39 @@ +"""Test DSMR serial protocol.""" + +from unittest.mock import Mock + +import pytest +from dsmr_parser import obis_references as obis +from dsmr_parser import telegram_specifications +from dsmr_parser.parsers import TelegramParserV2_2 +from dsmr_parser.protocol import DSMRProtocol + +from .test_parse_v2_2 import TELEGRAM_V2_2 + + +@pytest.fixture +def protocol(): + """DSMRprotocol instance with mocked telegram_callback.""" + + parser = TelegramParserV2_2 + specification = telegram_specifications.V2_2 + + telegram_parser = parser(specification) + return DSMRProtocol(None, telegram_parser, + telegram_callback=Mock()) + + +def test_complete_packet(protocol): + """Protocol should assemble incoming lines into complete packet.""" + + for line in TELEGRAM_V2_2: + protocol.data_received(bytes(line + '\r\n', 'ascii')) + + telegram = protocol.telegram_callback.call_args_list[0][0][0] + assert isinstance(telegram, dict) + + assert float(telegram[obis.CURRENT_ELECTRICITY_USAGE].value) == 1.01 + assert telegram[obis.CURRENT_ELECTRICITY_USAGE].unit == 'kW' + + assert float(telegram[obis.GAS_METER_READING].value) == 1.001 + assert telegram[obis.GAS_METER_READING].unit == 'm3' diff --git a/tox.ini b/tox.ini index 9acedbc..616fc67 100644 --- a/tox.ini +++ b/tox.ini @@ -6,6 +6,7 @@ deps= pytest pylama pytest-asyncio + pytest-catchlog pytest-mock commands= py.test test {posargs}