dev progress
This commit is contained in:
		
							parent
							
								
									60317a0dc5
								
							
						
					
					
						commit
						0e7819b535
					
				| @ -67,7 +67,7 @@ class DSMRProtocol(asyncio.Protocol): | |||||||
|         # callback to call on complete telegram |         # callback to call on complete telegram | ||||||
|         self.telegram_callback = telegram_callback |         self.telegram_callback = telegram_callback | ||||||
|         # buffer to keep incomplete incoming data |         # buffer to keep incomplete incoming data | ||||||
|         self.telegram_buffer = TelegramBuffer(self.handle_telegram) |         self.telegram_buffer = TelegramBuffer() | ||||||
|         # keep a lock until the connection is closed |         # keep a lock until the connection is closed | ||||||
|         self._closed = asyncio.Event() |         self._closed = asyncio.Event() | ||||||
| 
 | 
 | ||||||
| @ -80,7 +80,8 @@ class DSMRProtocol(asyncio.Protocol): | |||||||
|         """Add incoming data to buffer.""" |         """Add incoming data to buffer.""" | ||||||
|         data = data.decode('ascii') |         data = data.decode('ascii') | ||||||
|         self.log.debug('received data: %s', data) |         self.log.debug('received data: %s', data) | ||||||
|         self.telegram_buffer.append(data) |         self.telegram_buffer.put(data) | ||||||
|  |         map(self.handle_telegram, self.telegram_buffer.get_all()) | ||||||
| 
 | 
 | ||||||
|     def connection_lost(self, exc): |     def connection_lost(self, exc): | ||||||
|         """Stop when connection is lost.""" |         """Stop when connection is lost.""" | ||||||
| @ -94,8 +95,12 @@ class DSMRProtocol(asyncio.Protocol): | |||||||
|         """Send off parsed telegram to handling callback.""" |         """Send off parsed telegram to handling callback.""" | ||||||
|         self.log.debug('got telegram: %s', telegram) |         self.log.debug('got telegram: %s', telegram) | ||||||
| 
 | 
 | ||||||
|         if self.telegram_callback: |         try: | ||||||
|             self.telegram_callback(telegram) |             parsed_telegram = self.telegram_parser.parse(telegram) | ||||||
|  |         except ParseError: | ||||||
|  |             self.log.exception("failed to parse telegram") | ||||||
|  |         else: | ||||||
|  |             self.telegram_callback(parsed_telegram) | ||||||
| 
 | 
 | ||||||
|     @asyncio.coroutine |     @asyncio.coroutine | ||||||
|     def wait_closed(self): |     def wait_closed(self): | ||||||
|  | |||||||
| @ -63,7 +63,7 @@ class SerialReader(object): | |||||||
|             telegram_parser = TelegramParser |             telegram_parser = TelegramParser | ||||||
| 
 | 
 | ||||||
|         self.telegram_parser = telegram_parser(telegram_specification) |         self.telegram_parser = telegram_parser(telegram_specification) | ||||||
|         self.telegram_buffer = TelegramBuffer(self.handle_telegram) |         self.telegram_buffer = TelegramBuffer() | ||||||
| 
 | 
 | ||||||
|     def read(self): |     def read(self): | ||||||
|         """ |         """ | ||||||
| @ -75,15 +75,16 @@ class SerialReader(object): | |||||||
|         with serial.Serial(**self.serial_settings) as serial_handle: |         with serial.Serial(**self.serial_settings) as serial_handle: | ||||||
|             while True: |             while True: | ||||||
|                 data = serial_handle.readline() |                 data = serial_handle.readline() | ||||||
|                 self.telegram_buffer.append(data.decode('ascii')) |                 self.telegram_buffer.put(data.decode('ascii')) | ||||||
| 
 | 
 | ||||||
|     def handle_telegram(self, telegram): |                 for telegram in self.telegram_buffer.get_all(): | ||||||
|                     try: |                     try: | ||||||
|                         yield self.telegram_parser.parse(telegram) |                         yield self.telegram_parser.parse(telegram) | ||||||
|                     except ParseError as e: |                     except ParseError as e: | ||||||
|                         logger.error('Failed to parse telegram: %s', e) |                         logger.error('Failed to parse telegram: %s', e) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | 
 | ||||||
| class AsyncSerialReader(SerialReader): | class AsyncSerialReader(SerialReader): | ||||||
|     """Serial reader using asyncio pyserial.""" |     """Serial reader using asyncio pyserial.""" | ||||||
| 
 | 
 | ||||||
| @ -108,16 +109,16 @@ class AsyncSerialReader(SerialReader): | |||||||
|             # Read line if available or give control back to loop until new |             # Read line if available or give control back to loop until new | ||||||
|             # data has arrived. |             # data has arrived. | ||||||
|             data = yield from reader.readline() |             data = yield from reader.readline() | ||||||
|             self.telegram_buffer.append(data.decode('ascii')) |             self.telegram_buffer.put(data.decode('ascii')) | ||||||
| 
 | 
 | ||||||
|             # TODO |             for telegram in self.telegram_buffer.get_all(): | ||||||
|             # try: |                 try: | ||||||
|             #     # Push new parsed telegram onto queue. |                     # Push new parsed telegram onto queue. | ||||||
|             #     queue.put_nowait( |                     queue.put_nowait( | ||||||
|             #         self.telegram_parser.parse(telegram) |                         self.telegram_parser.parse(telegram) | ||||||
|             #     ) |                     ) | ||||||
|             # except ParseError as e: |                 except ParseError as e: | ||||||
|             #     logger.warning('Failed to parse telegram: %s', e) |                     logger.warning('Failed to parse telegram: %s', e) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| class TelegramBuffer(object): | class TelegramBuffer(object): | ||||||
| @ -126,21 +127,25 @@ class TelegramBuffer(object): | |||||||
|     when complete. |     when complete. | ||||||
|     """ |     """ | ||||||
| 
 | 
 | ||||||
|     def __init__(self, callback): |     def __init__(self): | ||||||
|         self._buffer = '' |         self._buffer = '' | ||||||
|         self._callback = callback |  | ||||||
| 
 | 
 | ||||||
|     def append(self, data): |     def get_all(self): | ||||||
|  |         """ | ||||||
|  |         Remove complete telegrams from buffer and yield them | ||||||
|  |         :rtype generator: | ||||||
|  |         """ | ||||||
|  |         for telegram in self._find_telegrams(): | ||||||
|  |             self._remove(telegram) | ||||||
|  |             yield telegram | ||||||
|  | 
 | ||||||
|  |     def put(self, data): | ||||||
|         """ |         """ | ||||||
|         Add telegram data to buffer. |         Add telegram data to buffer. | ||||||
|         :param str data: chars, lines or full telegram strings of telegram data |         :param str data: chars, lines or full telegram strings of telegram data | ||||||
|         """ |         """ | ||||||
|         self._buffer += data |         self._buffer += data | ||||||
| 
 | 
 | ||||||
|         for telegram in self._find_telegrams(): |  | ||||||
|             self._callback(telegram) |  | ||||||
|             self._remove(telegram) |  | ||||||
| 
 |  | ||||||
|     def _remove(self, telegram): |     def _remove(self, telegram): | ||||||
|         """ |         """ | ||||||
|         Remove telegram from buffer and incomplete data preceding it. This |         Remove telegram from buffer and incomplete data preceding it. This | ||||||
|  | |||||||
| @ -8,40 +8,51 @@ from test.example_telegrams import TELEGRAM_V2_2, TELEGRAM_V4_2 | |||||||
| class TelegramBufferTest(TestCase): | class TelegramBufferTest(TestCase): | ||||||
| 
 | 
 | ||||||
|     def setUp(self): |     def setUp(self): | ||||||
|         self.callback = mock.MagicMock() |         self.telegram_buffer = TelegramBuffer() | ||||||
|         self.telegram_buffer = TelegramBuffer(self.callback) |  | ||||||
| 
 | 
 | ||||||
|     def test_v22_telegram(self): |     def test_v22_telegram(self): | ||||||
|         self.telegram_buffer.append(TELEGRAM_V2_2) |         self.telegram_buffer.put(TELEGRAM_V2_2) | ||||||
| 
 | 
 | ||||||
|         self.callback.assert_called_once_with(TELEGRAM_V2_2) |         telegram = next(self.telegram_buffer.get_all()) | ||||||
|  | 
 | ||||||
|  |         self.assertEqual(telegram, TELEGRAM_V2_2) | ||||||
|         self.assertEqual(self.telegram_buffer._buffer, '') |         self.assertEqual(self.telegram_buffer._buffer, '') | ||||||
| 
 | 
 | ||||||
|     def test_v42_telegram(self): |     def test_v42_telegram(self): | ||||||
|         self.telegram_buffer.append(TELEGRAM_V4_2) |         self.telegram_buffer.put(TELEGRAM_V4_2) | ||||||
| 
 | 
 | ||||||
|         self.callback.assert_called_once_with(TELEGRAM_V4_2) |         telegram = next(self.telegram_buffer.get_all()) | ||||||
|  | 
 | ||||||
|  |         self.assertEqual(telegram, TELEGRAM_V4_2) | ||||||
|         self.assertEqual(self.telegram_buffer._buffer, '') |         self.assertEqual(self.telegram_buffer._buffer, '') | ||||||
| 
 | 
 | ||||||
|     def test_multiple_mixed_telegrams(self): |     def test_multiple_mixed_telegrams(self): | ||||||
|         self.telegram_buffer.append( |         self.telegram_buffer.put( | ||||||
|             ''.join((TELEGRAM_V2_2, TELEGRAM_V4_2, TELEGRAM_V2_2)) |             ''.join((TELEGRAM_V2_2, TELEGRAM_V4_2, TELEGRAM_V2_2)) | ||||||
|         ) |         ) | ||||||
| 
 | 
 | ||||||
|         self.callback.assert_has_calls([ |         telegrams = list(self.telegram_buffer.get_all()) | ||||||
|             call(TELEGRAM_V2_2), | 
 | ||||||
|             call(TELEGRAM_V4_2), |         self.assertListEqual( | ||||||
|             call(TELEGRAM_V2_2), |             telegrams, | ||||||
|         ]) |             [ | ||||||
|  |                 TELEGRAM_V2_2, | ||||||
|  |                 TELEGRAM_V4_2, | ||||||
|  |                 TELEGRAM_V2_2 | ||||||
|  |             ] | ||||||
|  |         ) | ||||||
|  | 
 | ||||||
|         self.assertEqual(self.telegram_buffer._buffer, '') |         self.assertEqual(self.telegram_buffer._buffer, '') | ||||||
| 
 | 
 | ||||||
|     def test_v42_telegram_preceded_with_unclosed_telegram(self): |     def test_v42_telegram_preceded_with_unclosed_telegram(self): | ||||||
|         # There are unclosed telegrams at the start of the buffer. |         # There are unclosed telegrams at the start of the buffer. | ||||||
|         incomplete_telegram = TELEGRAM_V4_2[:-1] |         incomplete_telegram = TELEGRAM_V4_2[:-1] | ||||||
| 
 | 
 | ||||||
|         self.telegram_buffer.append(incomplete_telegram + TELEGRAM_V4_2) |         self.telegram_buffer.put(incomplete_telegram + TELEGRAM_V4_2) | ||||||
| 
 | 
 | ||||||
|         self.callback.assert_called_once_with(TELEGRAM_V4_2) |         telegram = next(self.telegram_buffer.get_all()) | ||||||
|  | 
 | ||||||
|  |         self.assertEqual(telegram, TELEGRAM_V4_2) | ||||||
|         self.assertEqual(self.telegram_buffer._buffer, '') |         self.assertEqual(self.telegram_buffer._buffer, '') | ||||||
| 
 | 
 | ||||||
|     def test_v42_telegram_preceded_with_unopened_telegram(self): |     def test_v42_telegram_preceded_with_unopened_telegram(self): | ||||||
| @ -49,37 +60,47 @@ class TelegramBufferTest(TestCase): | |||||||
|         # the buffer was being filled while the telegram was outputted halfway. |         # the buffer was being filled while the telegram was outputted halfway. | ||||||
|         incomplete_telegram = TELEGRAM_V4_2[1:] |         incomplete_telegram = TELEGRAM_V4_2[1:] | ||||||
| 
 | 
 | ||||||
|         self.telegram_buffer.append(incomplete_telegram + TELEGRAM_V4_2) |         self.telegram_buffer.put(incomplete_telegram + TELEGRAM_V4_2) | ||||||
| 
 | 
 | ||||||
|         self.callback.assert_called_once_with(TELEGRAM_V4_2) |         telegram = next(self.telegram_buffer.get_all()) | ||||||
|  | 
 | ||||||
|  |         self.assertEqual(telegram, TELEGRAM_V4_2) | ||||||
|         self.assertEqual(self.telegram_buffer._buffer, '') |         self.assertEqual(self.telegram_buffer._buffer, '') | ||||||
| 
 | 
 | ||||||
|     def test_v42_telegram_trailed_by_unclosed_telegram(self): |     def test_v42_telegram_trailed_by_unclosed_telegram(self): | ||||||
|         incomplete_telegram = TELEGRAM_V4_2[:-1] |         incomplete_telegram = TELEGRAM_V4_2[:-1] | ||||||
| 
 | 
 | ||||||
|         self.telegram_buffer.append(TELEGRAM_V4_2 + incomplete_telegram) |         self.telegram_buffer.put(TELEGRAM_V4_2 + incomplete_telegram) | ||||||
| 
 | 
 | ||||||
|         self.callback.assert_called_once_with(TELEGRAM_V4_2) |         telegram = next(self.telegram_buffer.get_all()) | ||||||
|  | 
 | ||||||
|  |         self.assertEqual(telegram, TELEGRAM_V4_2) | ||||||
|         self.assertEqual(self.telegram_buffer._buffer, incomplete_telegram) |         self.assertEqual(self.telegram_buffer._buffer, incomplete_telegram) | ||||||
| 
 | 
 | ||||||
|     def test_v42_telegram_trailed_by_unopened_telegram(self): |     def test_v42_telegram_trailed_by_unopened_telegram(self): | ||||||
|         incomplete_telegram = TELEGRAM_V4_2[1:] |         incomplete_telegram = TELEGRAM_V4_2[1:] | ||||||
| 
 | 
 | ||||||
|         self.telegram_buffer.append(TELEGRAM_V4_2 + incomplete_telegram) |         self.telegram_buffer.put(TELEGRAM_V4_2 + incomplete_telegram) | ||||||
| 
 | 
 | ||||||
|         self.callback.assert_called_once_with(TELEGRAM_V4_2) |         telegram = next(self.telegram_buffer.get_all()) | ||||||
|  | 
 | ||||||
|  |         self.assertEqual(telegram, TELEGRAM_V4_2) | ||||||
|         self.assertEqual(self.telegram_buffer._buffer, incomplete_telegram) |         self.assertEqual(self.telegram_buffer._buffer, incomplete_telegram) | ||||||
| 
 | 
 | ||||||
|     def test_v42_telegram_adding_line_by_line(self): |     def test_v42_telegram_adding_line_by_line(self): | ||||||
|         for line in TELEGRAM_V4_2.splitlines(keepends=True): |         for line in TELEGRAM_V4_2.splitlines(keepends=True): | ||||||
|             self.telegram_buffer.append(line) |             self.telegram_buffer.put(line) | ||||||
| 
 | 
 | ||||||
|         self.callback.assert_called_once_with(TELEGRAM_V4_2) |         telegram = next(self.telegram_buffer.get_all()) | ||||||
|  | 
 | ||||||
|  |         self.assertEqual(telegram, TELEGRAM_V4_2) | ||||||
|         self.assertEqual(self.telegram_buffer._buffer, '') |         self.assertEqual(self.telegram_buffer._buffer, '') | ||||||
| 
 | 
 | ||||||
|     def test_v42_telegram_adding_char_by_char(self): |     def test_v42_telegram_adding_char_by_char(self): | ||||||
|         for char in TELEGRAM_V4_2: |         for char in TELEGRAM_V4_2: | ||||||
|             self.telegram_buffer.append(char) |             self.telegram_buffer.put(char) | ||||||
| 
 | 
 | ||||||
|         self.callback.assert_called_once_with(TELEGRAM_V4_2) |         telegram = next(self.telegram_buffer.get_all()) | ||||||
|  | 
 | ||||||
|  |         self.assertEqual(telegram, TELEGRAM_V4_2) | ||||||
|         self.assertEqual(self.telegram_buffer._buffer, '') |         self.assertEqual(self.telegram_buffer._buffer, '') | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user