From 60317a0dc52cc63918e9b884fc1ebcbad99ea4ef Mon Sep 17 00:00:00 2001 From: Nigel Dokter Date: Sat, 7 Jan 2017 21:26:21 +0100 Subject: [PATCH] dev progress --- dsmr_parser/protocol.py | 36 +++---------------- dsmr_parser/serial.py | 78 +++++++++++++++++------------------------ test/telegram_buffer.py | 2 -- 3 files changed, 36 insertions(+), 80 deletions(-) diff --git a/dsmr_parser/protocol.py b/dsmr_parser/protocol.py index dc5c54e..aeffc10 100644 --- a/dsmr_parser/protocol.py +++ b/dsmr_parser/protocol.py @@ -10,7 +10,7 @@ from . import telegram_specifications from .exceptions import ParseError from .parsers import TelegramParserV2_2, TelegramParserV4 from .serial import (SERIAL_SETTINGS_V2_2, SERIAL_SETTINGS_V4, - is_end_of_telegram, is_start_of_telegram) + is_end_of_telegram, is_start_of_telegram, TelegramBuffer) def create_dsmr_protocol(dsmr_version, telegram_callback, loop=None): @@ -66,10 +66,8 @@ class DSMRProtocol(asyncio.Protocol): self.telegram_parser = telegram_parser # callback to call on complete telegram self.telegram_callback = telegram_callback - # buffer to keep incoming telegram lines - self.telegram = '' # buffer to keep incomplete incoming data - self.buffer = '' + self.telegram_buffer = TelegramBuffer(self.handle_telegram) # keep a lock until the connection is closed self._closed = asyncio.Event() @@ -81,34 +79,8 @@ class DSMRProtocol(asyncio.Protocol): def data_received(self, data): """Add incoming data to buffer.""" data = data.decode('ascii') - self.log.debug('received data: %s', data.strip()) - self.buffer += data - self.handle_lines() - - def handle_lines(self): - """Assemble incoming data into single lines.""" - crlf = "\r\n" - while crlf in self.buffer: - line, self.buffer = self.buffer.split(crlf, 1) - self.log.debug('got line: %s', line) - line += crlf # add the trailing crlf again - - # Telegrams need to be complete because the values belong to a - # particular reading and can also be related to eachother. - if not self.telegram and not is_start_of_telegram(line): - continue - - self.telegram += line - - if is_end_of_telegram(line): - try: - parsed_telegram = self.telegram_parser.parse(self.telegram) - except ParseError as e: - self.log.error('Failed to parse telegram: %s', e) - else: - self.handle_telegram(parsed_telegram) - - self.telegram = [] + self.log.debug('received data: %s', data) + self.telegram_buffer.append(data) def connection_lost(self, exc): """Stop when connection is lost.""" diff --git a/dsmr_parser/serial.py b/dsmr_parser/serial.py index 3e51f48..8aebb03 100644 --- a/dsmr_parser/serial.py +++ b/dsmr_parser/serial.py @@ -63,6 +63,7 @@ class SerialReader(object): telegram_parser = TelegramParser self.telegram_parser = telegram_parser(telegram_specification) + self.telegram_buffer = TelegramBuffer(self.handle_telegram) def read(self): """ @@ -72,25 +73,15 @@ class SerialReader(object): :rtype: generator """ with serial.Serial(**self.serial_settings) as serial_handle: - telegram = '' - while True: - line = serial_handle.readline() - line.decode('ascii') + data = serial_handle.readline() + self.telegram_buffer.append(data.decode('ascii')) - # Build up buffer from the start of the telegram. - if not telegram and not is_start_of_telegram(line): - continue - - telegram += line - - if is_end_of_telegram(line): - try: - yield self.telegram_parser.parse(telegram) - except ParseError as e: - logger.error('Failed to parse telegram: %s', e) - - telegram = '' + def handle_telegram(self, telegram): + try: + yield self.telegram_parser.parse(telegram) + except ParseError as e: + logger.error('Failed to parse telegram: %s', e) class AsyncSerialReader(SerialReader): @@ -113,48 +104,40 @@ class AsyncSerialReader(SerialReader): conn = serial_asyncio.open_serial_connection(**self.serial_settings) reader, _ = yield from conn - telegram = '' - while True: # Read line if available or give control back to loop until new # data has arrived. - line = yield from reader.readline() - line = line.decode('ascii') + data = yield from reader.readline() + self.telegram_buffer.append(data.decode('ascii')) - # Build up buffer from the start of the telegram. - if not telegram and not is_start_of_telegram(line): - continue - - telegram += line - - if is_end_of_telegram(line): - try: - # Push new parsed telegram onto queue. - queue.put_nowait( - self.telegram_parser.parse(telegram) - ) - except ParseError as e: - logger.warning('Failed to parse telegram: %s', e) - - telegram = '' + # TODO + # try: + # # Push new parsed telegram onto queue. + # queue.put_nowait( + # self.telegram_parser.parse(telegram) + # ) + # except ParseError as e: + # logger.warning('Failed to parse telegram: %s', e) class TelegramBuffer(object): + """ + Used as a buffer for a stream or telegram data. Returns telegram from buffer + when complete. + """ def __init__(self, callback): - self._callback = callback self._buffer = '' + self._callback = callback def append(self, data): """ - Add telegram data to buffer. The callback is called with a full telegram - when data is complete. - :param str data: chars or lines of telegram data - :return: + Add telegram data to buffer. + :param str data: chars, lines or full telegram strings of telegram data """ self._buffer += data - for telegram in self.find_telegrams(self._buffer): + for telegram in self._find_telegrams(): self._callback(telegram) self._remove(telegram) @@ -170,8 +153,7 @@ class TelegramBuffer(object): self._buffer = self._buffer[index:] - @staticmethod - def find_telegrams(buffer): + def _find_telegrams(self): """ Find complete telegrams from buffer from start ('/') till ending checksum ('!AB12\r\n'). @@ -183,4 +165,8 @@ class TelegramBuffer(object): # checksum that's found. # - The checksum is optional '{0,4}' because not all telegram versions # support it. - return re.findall(r'\/[^\/]+?\![A-F0-9]{0,4}\r\n', buffer, re.DOTALL) + return re.findall( + r'\/[^\/]+?\![A-F0-9]{0,4}\r\n', + self._buffer, + re.DOTALL + ) diff --git a/test/telegram_buffer.py b/test/telegram_buffer.py index 92df5b9..2cb479b 100644 --- a/test/telegram_buffer.py +++ b/test/telegram_buffer.py @@ -71,7 +71,6 @@ class TelegramBufferTest(TestCase): self.assertEqual(self.telegram_buffer._buffer, incomplete_telegram) def test_v42_telegram_adding_line_by_line(self): - for line in TELEGRAM_V4_2.splitlines(keepends=True): self.telegram_buffer.append(line) @@ -79,7 +78,6 @@ class TelegramBufferTest(TestCase): self.assertEqual(self.telegram_buffer._buffer, '') def test_v42_telegram_adding_char_by_char(self): - for char in TELEGRAM_V4_2: self.telegram_buffer.append(char)