Merge pull request #73 from hogend/keep-alive
Optional keep alive monitoring for TCP/IP connections
This commit is contained in:
		
						commit
						399532f244
					
				| @ -16,8 +16,8 @@ def console(): | |||||||
|                         help='alternatively connect using TCP host.') |                         help='alternatively connect using TCP host.') | ||||||
|     parser.add_argument('--port', default=None, |     parser.add_argument('--port', default=None, | ||||||
|                         help='TCP port to use for connection') |                         help='TCP port to use for connection') | ||||||
|     parser.add_argument('--version', default='2.2', choices=['2.2', '4'], |     parser.add_argument('--version', default='2.2', choices=['2.2', '4', '5', '5B', '5L'], | ||||||
|                         help='DSMR version (2.2, 4)') |                         help='DSMR version (2.2, 4, 5, 5B, 5L)') | ||||||
|     parser.add_argument('--verbose', '-v', action='count') |     parser.add_argument('--verbose', '-v', action='count') | ||||||
| 
 | 
 | ||||||
|     args = parser.parse_args() |     args = parser.parse_args() | ||||||
|  | |||||||
| @ -14,7 +14,7 @@ from dsmr_parser.clients.settings import SERIAL_SETTINGS_V2_2, \ | |||||||
|     SERIAL_SETTINGS_V4, SERIAL_SETTINGS_V5 |     SERIAL_SETTINGS_V4, SERIAL_SETTINGS_V5 | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def create_dsmr_protocol(dsmr_version, telegram_callback, loop=None): | def create_dsmr_protocol(dsmr_version, telegram_callback, loop=None, **kwargs): | ||||||
|     """Creates a DSMR asyncio protocol.""" |     """Creates a DSMR asyncio protocol.""" | ||||||
| 
 | 
 | ||||||
|     if dsmr_version == '2.2': |     if dsmr_version == '2.2': | ||||||
| @ -37,7 +37,7 @@ def create_dsmr_protocol(dsmr_version, telegram_callback, loop=None): | |||||||
|                                   dsmr_version) |                                   dsmr_version) | ||||||
| 
 | 
 | ||||||
|     protocol = partial(DSMRProtocol, loop, TelegramParser(specification), |     protocol = partial(DSMRProtocol, loop, TelegramParser(specification), | ||||||
|                        telegram_callback=telegram_callback) |                        telegram_callback=telegram_callback, **kwargs) | ||||||
| 
 | 
 | ||||||
|     return protocol, serial_settings |     return protocol, serial_settings | ||||||
| 
 | 
 | ||||||
| @ -53,12 +53,14 @@ def create_dsmr_reader(port, dsmr_version, telegram_callback, loop=None): | |||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def create_tcp_dsmr_reader(host, port, dsmr_version, | def create_tcp_dsmr_reader(host, port, dsmr_version, | ||||||
|                            telegram_callback, loop=None): |                            telegram_callback, loop=None, | ||||||
|  |                            keep_alive_interval=None): | ||||||
|     """Creates a DSMR asyncio protocol coroutine using TCP connection.""" |     """Creates a DSMR asyncio protocol coroutine using TCP connection.""" | ||||||
|     if not loop: |     if not loop: | ||||||
|         loop = asyncio.get_event_loop() |         loop = asyncio.get_event_loop() | ||||||
|     protocol, _ = create_dsmr_protocol( |     protocol, _ = create_dsmr_protocol( | ||||||
|         dsmr_version, telegram_callback, loop=loop) |         dsmr_version, telegram_callback, loop=loop, | ||||||
|  |         keep_alive_interval=keep_alive_interval) | ||||||
|     conn = loop.create_connection(protocol, host, port) |     conn = loop.create_connection(protocol, host, port) | ||||||
|     return conn |     return conn | ||||||
| 
 | 
 | ||||||
| @ -69,7 +71,8 @@ class DSMRProtocol(asyncio.Protocol): | |||||||
|     transport = None |     transport = None | ||||||
|     telegram_callback = None |     telegram_callback = None | ||||||
| 
 | 
 | ||||||
|     def __init__(self, loop, telegram_parser, telegram_callback=None): |     def __init__(self, loop, telegram_parser, | ||||||
|  |                  telegram_callback=None, keep_alive_interval=None): | ||||||
|         """Initialize class.""" |         """Initialize class.""" | ||||||
|         self.loop = loop |         self.loop = loop | ||||||
|         self.log = logging.getLogger(__name__) |         self.log = logging.getLogger(__name__) | ||||||
| @ -80,21 +83,38 @@ class DSMRProtocol(asyncio.Protocol): | |||||||
|         self.telegram_buffer = TelegramBuffer() |         self.telegram_buffer = TelegramBuffer() | ||||||
|         # keep a lock until the connection is closed |         # keep a lock until the connection is closed | ||||||
|         self._closed = asyncio.Event() |         self._closed = asyncio.Event() | ||||||
|  |         self._keep_alive_interval = keep_alive_interval | ||||||
|  |         self._active = True | ||||||
| 
 | 
 | ||||||
|     def connection_made(self, transport): |     def connection_made(self, transport): | ||||||
|         """Just logging for now.""" |         """Just logging for now.""" | ||||||
|         self.transport = transport |         self.transport = transport | ||||||
|         self.log.debug('connected') |         self.log.debug('connected') | ||||||
|  |         self._active = False | ||||||
|  |         if self.loop and self._keep_alive_interval: | ||||||
|  |             self.loop.call_later(self._keep_alive_interval, self.keep_alive) | ||||||
| 
 | 
 | ||||||
|     def data_received(self, data): |     def data_received(self, data): | ||||||
|         """Add incoming data to buffer.""" |         """Add incoming data to buffer.""" | ||||||
|         data = data.decode('ascii') |         data = data.decode('ascii') | ||||||
|  |         self._active = True | ||||||
|         self.log.debug('received data: %s', data) |         self.log.debug('received data: %s', data) | ||||||
|         self.telegram_buffer.append(data) |         self.telegram_buffer.append(data) | ||||||
| 
 | 
 | ||||||
|         for telegram in self.telegram_buffer.get_all(): |         for telegram in self.telegram_buffer.get_all(): | ||||||
|             self.handle_telegram(telegram) |             self.handle_telegram(telegram) | ||||||
| 
 | 
 | ||||||
|  |     def keep_alive(self): | ||||||
|  |         if self._active: | ||||||
|  |             self.log.debug('keep-alive checked') | ||||||
|  |             self._active = False | ||||||
|  |             if self.loop: | ||||||
|  |                 self.loop.call_later(self._keep_alive_interval, self.keep_alive) | ||||||
|  |         else: | ||||||
|  |             self.log.warning('keep-alive check failed') | ||||||
|  |             if self.transport: | ||||||
|  |                 self.transport.close() | ||||||
|  | 
 | ||||||
|     def connection_lost(self, exc): |     def connection_lost(self, exc): | ||||||
|         """Stop when connection is lost.""" |         """Stop when connection is lost.""" | ||||||
|         if exc: |         if exc: | ||||||
|  | |||||||
| @ -5,7 +5,7 @@ import unittest | |||||||
| from dsmr_parser import obis_references as obis | from dsmr_parser import obis_references as obis | ||||||
| from dsmr_parser import telegram_specifications | from dsmr_parser import telegram_specifications | ||||||
| from dsmr_parser.parsers import TelegramParser | from dsmr_parser.parsers import TelegramParser | ||||||
| from dsmr_parser.clients.protocol import DSMRProtocol | from dsmr_parser.clients.protocol import create_dsmr_protocol | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| TELEGRAM_V2_2 = ( | TELEGRAM_V2_2 = ( | ||||||
| @ -35,9 +35,10 @@ TELEGRAM_V2_2 = ( | |||||||
| class ProtocolTest(unittest.TestCase): | class ProtocolTest(unittest.TestCase): | ||||||
| 
 | 
 | ||||||
|     def setUp(self): |     def setUp(self): | ||||||
|         telegram_parser = TelegramParser(telegram_specifications.V2_2) |         new_protocol, _ = create_dsmr_protocol('2.2', | ||||||
|         self.protocol = DSMRProtocol(None, telegram_parser, |                                                telegram_callback=Mock(), | ||||||
|                                      telegram_callback=Mock()) |                                                keep_alive_interval=1) | ||||||
|  |         self.protocol = new_protocol() | ||||||
| 
 | 
 | ||||||
|     def test_complete_packet(self): |     def test_complete_packet(self): | ||||||
|         """Protocol should assemble incoming lines into complete packet.""" |         """Protocol should assemble incoming lines into complete packet.""" | ||||||
| @ -52,3 +53,23 @@ class ProtocolTest(unittest.TestCase): | |||||||
| 
 | 
 | ||||||
|         assert float(telegram[obis.GAS_METER_READING].value) == 1.001 |         assert float(telegram[obis.GAS_METER_READING].value) == 1.001 | ||||||
|         assert telegram[obis.GAS_METER_READING].unit == 'm3' |         assert telegram[obis.GAS_METER_READING].unit == 'm3' | ||||||
|  | 
 | ||||||
|  |     def test_receive_packet(self): | ||||||
|  |         """Protocol packet reception.""" | ||||||
|  | 
 | ||||||
|  |         mock_transport = Mock() | ||||||
|  |         self.protocol.connection_made(mock_transport) | ||||||
|  |         assert not self.protocol._active | ||||||
|  | 
 | ||||||
|  |         self.protocol.data_received(TELEGRAM_V2_2.encode('ascii')) | ||||||
|  |         assert self.protocol._active | ||||||
|  | 
 | ||||||
|  |         # 1st call of keep_alive resets 'active' flag | ||||||
|  |         self.protocol.keep_alive() | ||||||
|  |         assert not self.protocol._active | ||||||
|  | 
 | ||||||
|  |         # 2nd call of keep_alive should close the transport | ||||||
|  |         self.protocol.keep_alive() | ||||||
|  |         assert mock_transport.close.called_once() | ||||||
|  | 
 | ||||||
|  |         self.protocol.connection_lost(None) | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user