diff --git a/dsmr_parser/protocol.py b/dsmr_parser/protocol.py index aeffc10..6342e9d 100644 --- a/dsmr_parser/protocol.py +++ b/dsmr_parser/protocol.py @@ -67,7 +67,7 @@ class DSMRProtocol(asyncio.Protocol): # callback to call on complete telegram self.telegram_callback = telegram_callback # buffer to keep incomplete incoming data - self.telegram_buffer = TelegramBuffer(self.handle_telegram) + self.telegram_buffer = TelegramBuffer() # keep a lock until the connection is closed self._closed = asyncio.Event() @@ -80,7 +80,8 @@ class DSMRProtocol(asyncio.Protocol): """Add incoming data to buffer.""" data = data.decode('ascii') self.log.debug('received data: %s', data) - self.telegram_buffer.append(data) + self.telegram_buffer.put(data) + map(self.handle_telegram, self.telegram_buffer.get_all()) def connection_lost(self, exc): """Stop when connection is lost.""" @@ -94,8 +95,12 @@ class DSMRProtocol(asyncio.Protocol): """Send off parsed telegram to handling callback.""" self.log.debug('got telegram: %s', telegram) - if self.telegram_callback: - self.telegram_callback(telegram) + try: + parsed_telegram = self.telegram_parser.parse(telegram) + except ParseError: + self.log.exception("failed to parse telegram") + else: + self.telegram_callback(parsed_telegram) @asyncio.coroutine def wait_closed(self): diff --git a/dsmr_parser/serial.py b/dsmr_parser/serial.py index 8aebb03..5ed809e 100644 --- a/dsmr_parser/serial.py +++ b/dsmr_parser/serial.py @@ -63,7 +63,7 @@ class SerialReader(object): telegram_parser = TelegramParser self.telegram_parser = telegram_parser(telegram_specification) - self.telegram_buffer = TelegramBuffer(self.handle_telegram) + self.telegram_buffer = TelegramBuffer() def read(self): """ @@ -75,13 +75,14 @@ class SerialReader(object): with serial.Serial(**self.serial_settings) as serial_handle: while True: data = serial_handle.readline() - self.telegram_buffer.append(data.decode('ascii')) + self.telegram_buffer.put(data.decode('ascii')) + + for telegram in self.telegram_buffer.get_all(): + try: + yield self.telegram_parser.parse(telegram) + except ParseError as e: + logger.error('Failed to parse telegram: %s', e) - def handle_telegram(self, telegram): - try: - yield self.telegram_parser.parse(telegram) - except ParseError as e: - logger.error('Failed to parse telegram: %s', e) class AsyncSerialReader(SerialReader): @@ -108,16 +109,16 @@ class AsyncSerialReader(SerialReader): # Read line if available or give control back to loop until new # data has arrived. data = yield from reader.readline() - self.telegram_buffer.append(data.decode('ascii')) + self.telegram_buffer.put(data.decode('ascii')) - # TODO - # try: - # # Push new parsed telegram onto queue. - # queue.put_nowait( - # self.telegram_parser.parse(telegram) - # ) - # except ParseError as e: - # logger.warning('Failed to parse telegram: %s', e) + for telegram in self.telegram_buffer.get_all(): + try: + # Push new parsed telegram onto queue. + queue.put_nowait( + self.telegram_parser.parse(telegram) + ) + except ParseError as e: + logger.warning('Failed to parse telegram: %s', e) class TelegramBuffer(object): @@ -126,21 +127,25 @@ class TelegramBuffer(object): when complete. """ - def __init__(self, callback): + def __init__(self): self._buffer = '' - self._callback = callback - def append(self, data): + def get_all(self): + """ + Remove complete telegrams from buffer and yield them + :rtype generator: + """ + for telegram in self._find_telegrams(): + self._remove(telegram) + yield telegram + + def put(self, data): """ Add telegram data to buffer. :param str data: chars, lines or full telegram strings of telegram data """ self._buffer += data - for telegram in self._find_telegrams(): - self._callback(telegram) - self._remove(telegram) - def _remove(self, telegram): """ Remove telegram from buffer and incomplete data preceding it. This diff --git a/test/telegram_buffer.py b/test/telegram_buffer.py index 2cb479b..17a3643 100644 --- a/test/telegram_buffer.py +++ b/test/telegram_buffer.py @@ -8,40 +8,51 @@ from test.example_telegrams import TELEGRAM_V2_2, TELEGRAM_V4_2 class TelegramBufferTest(TestCase): def setUp(self): - self.callback = mock.MagicMock() - self.telegram_buffer = TelegramBuffer(self.callback) + self.telegram_buffer = TelegramBuffer() def test_v22_telegram(self): - self.telegram_buffer.append(TELEGRAM_V2_2) + self.telegram_buffer.put(TELEGRAM_V2_2) - self.callback.assert_called_once_with(TELEGRAM_V2_2) + telegram = next(self.telegram_buffer.get_all()) + + self.assertEqual(telegram, TELEGRAM_V2_2) self.assertEqual(self.telegram_buffer._buffer, '') def test_v42_telegram(self): - self.telegram_buffer.append(TELEGRAM_V4_2) + self.telegram_buffer.put(TELEGRAM_V4_2) - self.callback.assert_called_once_with(TELEGRAM_V4_2) + telegram = next(self.telegram_buffer.get_all()) + + self.assertEqual(telegram, TELEGRAM_V4_2) self.assertEqual(self.telegram_buffer._buffer, '') def test_multiple_mixed_telegrams(self): - self.telegram_buffer.append( + self.telegram_buffer.put( ''.join((TELEGRAM_V2_2, TELEGRAM_V4_2, TELEGRAM_V2_2)) ) - self.callback.assert_has_calls([ - call(TELEGRAM_V2_2), - call(TELEGRAM_V4_2), - call(TELEGRAM_V2_2), - ]) + telegrams = list(self.telegram_buffer.get_all()) + + self.assertListEqual( + telegrams, + [ + TELEGRAM_V2_2, + TELEGRAM_V4_2, + TELEGRAM_V2_2 + ] + ) + self.assertEqual(self.telegram_buffer._buffer, '') def test_v42_telegram_preceded_with_unclosed_telegram(self): # There are unclosed telegrams at the start of the buffer. incomplete_telegram = TELEGRAM_V4_2[:-1] - self.telegram_buffer.append(incomplete_telegram + TELEGRAM_V4_2) + self.telegram_buffer.put(incomplete_telegram + TELEGRAM_V4_2) - self.callback.assert_called_once_with(TELEGRAM_V4_2) + telegram = next(self.telegram_buffer.get_all()) + + self.assertEqual(telegram, TELEGRAM_V4_2) self.assertEqual(self.telegram_buffer._buffer, '') def test_v42_telegram_preceded_with_unopened_telegram(self): @@ -49,37 +60,47 @@ class TelegramBufferTest(TestCase): # the buffer was being filled while the telegram was outputted halfway. incomplete_telegram = TELEGRAM_V4_2[1:] - self.telegram_buffer.append(incomplete_telegram + TELEGRAM_V4_2) + self.telegram_buffer.put(incomplete_telegram + TELEGRAM_V4_2) - self.callback.assert_called_once_with(TELEGRAM_V4_2) + telegram = next(self.telegram_buffer.get_all()) + + self.assertEqual(telegram, TELEGRAM_V4_2) self.assertEqual(self.telegram_buffer._buffer, '') def test_v42_telegram_trailed_by_unclosed_telegram(self): incomplete_telegram = TELEGRAM_V4_2[:-1] - self.telegram_buffer.append(TELEGRAM_V4_2 + incomplete_telegram) + self.telegram_buffer.put(TELEGRAM_V4_2 + incomplete_telegram) - self.callback.assert_called_once_with(TELEGRAM_V4_2) + telegram = next(self.telegram_buffer.get_all()) + + self.assertEqual(telegram, TELEGRAM_V4_2) self.assertEqual(self.telegram_buffer._buffer, incomplete_telegram) def test_v42_telegram_trailed_by_unopened_telegram(self): incomplete_telegram = TELEGRAM_V4_2[1:] - self.telegram_buffer.append(TELEGRAM_V4_2 + incomplete_telegram) + self.telegram_buffer.put(TELEGRAM_V4_2 + incomplete_telegram) - self.callback.assert_called_once_with(TELEGRAM_V4_2) + telegram = next(self.telegram_buffer.get_all()) + + self.assertEqual(telegram, TELEGRAM_V4_2) self.assertEqual(self.telegram_buffer._buffer, incomplete_telegram) def test_v42_telegram_adding_line_by_line(self): for line in TELEGRAM_V4_2.splitlines(keepends=True): - self.telegram_buffer.append(line) + self.telegram_buffer.put(line) - self.callback.assert_called_once_with(TELEGRAM_V4_2) + telegram = next(self.telegram_buffer.get_all()) + + self.assertEqual(telegram, TELEGRAM_V4_2) self.assertEqual(self.telegram_buffer._buffer, '') def test_v42_telegram_adding_char_by_char(self): for char in TELEGRAM_V4_2: - self.telegram_buffer.append(char) + self.telegram_buffer.put(char) - self.callback.assert_called_once_with(TELEGRAM_V4_2) + telegram = next(self.telegram_buffer.get_all()) + + self.assertEqual(telegram, TELEGRAM_V4_2) self.assertEqual(self.telegram_buffer._buffer, '')