Optional keep alive monitoring for TCP/IP connections
* Since dsmr-parser is listen-only, it will not notice interrupted connections and DSMR device restarts. The connection will be reset after an (optional) keep-alive interval if no messages were received from the device.
This commit is contained in:
		
							parent
							
								
									3dc77a8231
								
							
						
					
					
						commit
						bbd73897a0
					
				| @ -16,8 +16,8 @@ def console(): | |||||||
|                         help='alternatively connect using TCP host.') |                         help='alternatively connect using TCP host.') | ||||||
|     parser.add_argument('--port', default=None, |     parser.add_argument('--port', default=None, | ||||||
|                         help='TCP port to use for connection') |                         help='TCP port to use for connection') | ||||||
|     parser.add_argument('--version', default='2.2', choices=['2.2', '4'], |     parser.add_argument('--version', default='2.2', choices=['2.2', '4', '5', '5B', '5L'], | ||||||
|                         help='DSMR version (2.2, 4)') |                         help='DSMR version (2.2, 4, 5, 5B, 5L)') | ||||||
|     parser.add_argument('--verbose', '-v', action='count') |     parser.add_argument('--verbose', '-v', action='count') | ||||||
| 
 | 
 | ||||||
|     args = parser.parse_args() |     args = parser.parse_args() | ||||||
|  | |||||||
| @ -14,7 +14,7 @@ from dsmr_parser.clients.settings import SERIAL_SETTINGS_V2_2, \ | |||||||
|     SERIAL_SETTINGS_V4, SERIAL_SETTINGS_V5 |     SERIAL_SETTINGS_V4, SERIAL_SETTINGS_V5 | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def create_dsmr_protocol(dsmr_version, telegram_callback, loop=None): | def create_dsmr_protocol(dsmr_version, telegram_callback, loop=None, **args): | ||||||
|     """Creates a DSMR asyncio protocol.""" |     """Creates a DSMR asyncio protocol.""" | ||||||
| 
 | 
 | ||||||
|     if dsmr_version == '2.2': |     if dsmr_version == '2.2': | ||||||
| @ -37,7 +37,7 @@ def create_dsmr_protocol(dsmr_version, telegram_callback, loop=None): | |||||||
|                                   dsmr_version) |                                   dsmr_version) | ||||||
| 
 | 
 | ||||||
|     protocol = partial(DSMRProtocol, loop, TelegramParser(specification), |     protocol = partial(DSMRProtocol, loop, TelegramParser(specification), | ||||||
|                        telegram_callback=telegram_callback) |                        telegram_callback=telegram_callback, **args) | ||||||
| 
 | 
 | ||||||
|     return protocol, serial_settings |     return protocol, serial_settings | ||||||
| 
 | 
 | ||||||
| @ -53,12 +53,14 @@ def create_dsmr_reader(port, dsmr_version, telegram_callback, loop=None): | |||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def create_tcp_dsmr_reader(host, port, dsmr_version, | def create_tcp_dsmr_reader(host, port, dsmr_version, | ||||||
|                            telegram_callback, loop=None): |                            telegram_callback, loop=None, | ||||||
|  |                            keep_alive_interval=None): | ||||||
|     """Creates a DSMR asyncio protocol coroutine using TCP connection.""" |     """Creates a DSMR asyncio protocol coroutine using TCP connection.""" | ||||||
|     if not loop: |     if not loop: | ||||||
|         loop = asyncio.get_event_loop() |         loop = asyncio.get_event_loop() | ||||||
|     protocol, _ = create_dsmr_protocol( |     protocol, _ = create_dsmr_protocol( | ||||||
|         dsmr_version, telegram_callback, loop=loop) |         dsmr_version, telegram_callback, loop=loop, | ||||||
|  |         keep_alive_interval=keep_alive_interval) | ||||||
|     conn = loop.create_connection(protocol, host, port) |     conn = loop.create_connection(protocol, host, port) | ||||||
|     return conn |     return conn | ||||||
| 
 | 
 | ||||||
| @ -69,7 +71,8 @@ class DSMRProtocol(asyncio.Protocol): | |||||||
|     transport = None |     transport = None | ||||||
|     telegram_callback = None |     telegram_callback = None | ||||||
| 
 | 
 | ||||||
|     def __init__(self, loop, telegram_parser, telegram_callback=None): |     def __init__(self, loop, telegram_parser, | ||||||
|  |                  telegram_callback=None, keep_alive_interval=None): | ||||||
|         """Initialize class.""" |         """Initialize class.""" | ||||||
|         self.loop = loop |         self.loop = loop | ||||||
|         self.log = logging.getLogger(__name__) |         self.log = logging.getLogger(__name__) | ||||||
| @ -80,21 +83,36 @@ class DSMRProtocol(asyncio.Protocol): | |||||||
|         self.telegram_buffer = TelegramBuffer() |         self.telegram_buffer = TelegramBuffer() | ||||||
|         # keep a lock until the connection is closed |         # keep a lock until the connection is closed | ||||||
|         self._closed = asyncio.Event() |         self._closed = asyncio.Event() | ||||||
|  |         self._keep_alive_interval = keep_alive_interval | ||||||
|  |         self._active = True | ||||||
| 
 | 
 | ||||||
|     def connection_made(self, transport): |     def connection_made(self, transport): | ||||||
|         """Just logging for now.""" |         """Just logging for now.""" | ||||||
|         self.transport = transport |         self.transport = transport | ||||||
|         self.log.debug('connected') |         self.log.debug('connected') | ||||||
|  |         self._active = False | ||||||
|  |         if self._keep_alive_interval: | ||||||
|  |             self.loop.call_later(self._keep_alive_interval, self.keep_alive) | ||||||
| 
 | 
 | ||||||
|     def data_received(self, data): |     def data_received(self, data): | ||||||
|         """Add incoming data to buffer.""" |         """Add incoming data to buffer.""" | ||||||
|         data = data.decode('ascii') |         data = data.decode('ascii') | ||||||
|  |         self._active = True | ||||||
|         self.log.debug('received data: %s', data) |         self.log.debug('received data: %s', data) | ||||||
|         self.telegram_buffer.append(data) |         self.telegram_buffer.append(data) | ||||||
| 
 | 
 | ||||||
|         for telegram in self.telegram_buffer.get_all(): |         for telegram in self.telegram_buffer.get_all(): | ||||||
|             self.handle_telegram(telegram) |             self.handle_telegram(telegram) | ||||||
| 
 | 
 | ||||||
|  |     def keep_alive(self): | ||||||
|  |         if self._active: | ||||||
|  |             self.log.debug('keep-alive checked') | ||||||
|  |             self._active = False | ||||||
|  |             self.loop.call_later(self._keep_alive_interval, self.keep_alive) | ||||||
|  |         else: | ||||||
|  |             self.log.debug('keep-alive failed') | ||||||
|  |             self.transport.close() | ||||||
|  | 
 | ||||||
|     def connection_lost(self, exc): |     def connection_lost(self, exc): | ||||||
|         """Stop when connection is lost.""" |         """Stop when connection is lost.""" | ||||||
|         if exc: |         if exc: | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user