From bbd73897a0c4e8bdd75dd65f1ea4a4ad798e8c74 Mon Sep 17 00:00:00 2001 From: Rene Hogendoorn Date: Tue, 19 Jan 2021 08:26:28 +0100 Subject: [PATCH 1/3] Optional keep alive monitoring for TCP/IP connections * Since dsmr-parser is listen-only, it will not notice interrupted connections and DSMR device restarts. The connection will be reset after an (optional) keep-alive interval if no messages were received from the device. --- dsmr_parser/__main__.py | 4 ++-- dsmr_parser/clients/protocol.py | 28 +++++++++++++++++++++++----- 2 files changed, 25 insertions(+), 7 deletions(-) diff --git a/dsmr_parser/__main__.py b/dsmr_parser/__main__.py index 8d9da8b..a9fbaa0 100644 --- a/dsmr_parser/__main__.py +++ b/dsmr_parser/__main__.py @@ -16,8 +16,8 @@ def console(): help='alternatively connect using TCP host.') parser.add_argument('--port', default=None, help='TCP port to use for connection') - parser.add_argument('--version', default='2.2', choices=['2.2', '4'], - help='DSMR version (2.2, 4)') + parser.add_argument('--version', default='2.2', choices=['2.2', '4', '5', '5B', '5L'], + help='DSMR version (2.2, 4, 5, 5B, 5L)') parser.add_argument('--verbose', '-v', action='count') args = parser.parse_args() diff --git a/dsmr_parser/clients/protocol.py b/dsmr_parser/clients/protocol.py index 66d0a39..ef48ae7 100644 --- a/dsmr_parser/clients/protocol.py +++ b/dsmr_parser/clients/protocol.py @@ -14,7 +14,7 @@ from dsmr_parser.clients.settings import SERIAL_SETTINGS_V2_2, \ SERIAL_SETTINGS_V4, SERIAL_SETTINGS_V5 -def create_dsmr_protocol(dsmr_version, telegram_callback, loop=None): +def create_dsmr_protocol(dsmr_version, telegram_callback, loop=None, **args): """Creates a DSMR asyncio protocol.""" if dsmr_version == '2.2': @@ -37,7 +37,7 @@ def create_dsmr_protocol(dsmr_version, telegram_callback, loop=None): dsmr_version) protocol = partial(DSMRProtocol, loop, TelegramParser(specification), - telegram_callback=telegram_callback) + telegram_callback=telegram_callback, **args) return protocol, serial_settings @@ -53,12 +53,14 @@ def create_dsmr_reader(port, dsmr_version, telegram_callback, loop=None): def create_tcp_dsmr_reader(host, port, dsmr_version, - telegram_callback, loop=None): + telegram_callback, loop=None, + keep_alive_interval=None): """Creates a DSMR asyncio protocol coroutine using TCP connection.""" if not loop: loop = asyncio.get_event_loop() protocol, _ = create_dsmr_protocol( - dsmr_version, telegram_callback, loop=loop) + dsmr_version, telegram_callback, loop=loop, + keep_alive_interval=keep_alive_interval) conn = loop.create_connection(protocol, host, port) return conn @@ -69,7 +71,8 @@ class DSMRProtocol(asyncio.Protocol): transport = None telegram_callback = None - def __init__(self, loop, telegram_parser, telegram_callback=None): + def __init__(self, loop, telegram_parser, + telegram_callback=None, keep_alive_interval=None): """Initialize class.""" self.loop = loop self.log = logging.getLogger(__name__) @@ -80,21 +83,36 @@ class DSMRProtocol(asyncio.Protocol): self.telegram_buffer = TelegramBuffer() # keep a lock until the connection is closed self._closed = asyncio.Event() + self._keep_alive_interval = keep_alive_interval + self._active = True def connection_made(self, transport): """Just logging for now.""" self.transport = transport self.log.debug('connected') + self._active = False + if self._keep_alive_interval: + self.loop.call_later(self._keep_alive_interval, self.keep_alive) def data_received(self, data): """Add incoming data to buffer.""" data = data.decode('ascii') + self._active = True self.log.debug('received data: %s', data) self.telegram_buffer.append(data) for telegram in self.telegram_buffer.get_all(): self.handle_telegram(telegram) + def keep_alive(self): + if self._active: + self.log.debug('keep-alive checked') + self._active = False + self.loop.call_later(self._keep_alive_interval, self.keep_alive) + else: + self.log.debug('keep-alive failed') + self.transport.close() + def connection_lost(self, exc): """Stop when connection is lost.""" if exc: From 74535349278e3b136894388888949584f28c1725 Mon Sep 17 00:00:00 2001 From: Rene Hogendoorn Date: Tue, 19 Jan 2021 10:55:20 +0100 Subject: [PATCH 2/3] Raised log level to warning for failed keep-alive check --- dsmr_parser/clients/protocol.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dsmr_parser/clients/protocol.py b/dsmr_parser/clients/protocol.py index ef48ae7..e5e6a66 100644 --- a/dsmr_parser/clients/protocol.py +++ b/dsmr_parser/clients/protocol.py @@ -14,7 +14,7 @@ from dsmr_parser.clients.settings import SERIAL_SETTINGS_V2_2, \ SERIAL_SETTINGS_V4, SERIAL_SETTINGS_V5 -def create_dsmr_protocol(dsmr_version, telegram_callback, loop=None, **args): +def create_dsmr_protocol(dsmr_version, telegram_callback, loop=None, **kwargs): """Creates a DSMR asyncio protocol.""" if dsmr_version == '2.2': @@ -37,7 +37,7 @@ def create_dsmr_protocol(dsmr_version, telegram_callback, loop=None, **args): dsmr_version) protocol = partial(DSMRProtocol, loop, TelegramParser(specification), - telegram_callback=telegram_callback, **args) + telegram_callback=telegram_callback, **kwargs) return protocol, serial_settings @@ -110,7 +110,7 @@ class DSMRProtocol(asyncio.Protocol): self._active = False self.loop.call_later(self._keep_alive_interval, self.keep_alive) else: - self.log.debug('keep-alive failed') + self.log.warning('keep-alive check failed') self.transport.close() def connection_lost(self, exc): From b901b3f74eddd52883e7f7e75f78ec13d2e5d819 Mon Sep 17 00:00:00 2001 From: Rene Hogendoorn Date: Thu, 11 Feb 2021 11:08:19 +0100 Subject: [PATCH 3/3] Add unit test for keep-alive --- dsmr_parser/clients/protocol.py | 8 +++++--- test/test_protocol.py | 29 +++++++++++++++++++++++++---- 2 files changed, 30 insertions(+), 7 deletions(-) diff --git a/dsmr_parser/clients/protocol.py b/dsmr_parser/clients/protocol.py index e5e6a66..9b4536e 100644 --- a/dsmr_parser/clients/protocol.py +++ b/dsmr_parser/clients/protocol.py @@ -91,7 +91,7 @@ class DSMRProtocol(asyncio.Protocol): self.transport = transport self.log.debug('connected') self._active = False - if self._keep_alive_interval: + if self.loop and self._keep_alive_interval: self.loop.call_later(self._keep_alive_interval, self.keep_alive) def data_received(self, data): @@ -108,10 +108,12 @@ class DSMRProtocol(asyncio.Protocol): if self._active: self.log.debug('keep-alive checked') self._active = False - self.loop.call_later(self._keep_alive_interval, self.keep_alive) + if self.loop: + self.loop.call_later(self._keep_alive_interval, self.keep_alive) else: self.log.warning('keep-alive check failed') - self.transport.close() + if self.transport: + self.transport.close() def connection_lost(self, exc): """Stop when connection is lost.""" diff --git a/test/test_protocol.py b/test/test_protocol.py index 2fb14e0..c298d5c 100644 --- a/test/test_protocol.py +++ b/test/test_protocol.py @@ -5,7 +5,7 @@ import unittest from dsmr_parser import obis_references as obis from dsmr_parser import telegram_specifications from dsmr_parser.parsers import TelegramParser -from dsmr_parser.clients.protocol import DSMRProtocol +from dsmr_parser.clients.protocol import create_dsmr_protocol TELEGRAM_V2_2 = ( @@ -35,9 +35,10 @@ TELEGRAM_V2_2 = ( class ProtocolTest(unittest.TestCase): def setUp(self): - telegram_parser = TelegramParser(telegram_specifications.V2_2) - self.protocol = DSMRProtocol(None, telegram_parser, - telegram_callback=Mock()) + new_protocol, _ = create_dsmr_protocol('2.2', + telegram_callback=Mock(), + keep_alive_interval=1) + self.protocol = new_protocol() def test_complete_packet(self): """Protocol should assemble incoming lines into complete packet.""" @@ -52,3 +53,23 @@ class ProtocolTest(unittest.TestCase): assert float(telegram[obis.GAS_METER_READING].value) == 1.001 assert telegram[obis.GAS_METER_READING].unit == 'm3' + + def test_receive_packet(self): + """Protocol packet reception.""" + + mock_transport = Mock() + self.protocol.connection_made(mock_transport) + assert not self.protocol._active + + self.protocol.data_received(TELEGRAM_V2_2.encode('ascii')) + assert self.protocol._active + + # 1st call of keep_alive resets 'active' flag + self.protocol.keep_alive() + assert not self.protocol._active + + # 2nd call of keep_alive should close the transport + self.protocol.keep_alive() + assert mock_transport.close.called_once() + + self.protocol.connection_lost(None)