Move client tests to own folder

This commit is contained in:
Nigel Dokter 2024-12-22 13:37:21 +01:00
parent 99ad21c896
commit eb0d3d7353
5 changed files with 128 additions and 0 deletions

1
.gitignore vendored
View File

@ -3,6 +3,7 @@
.tox
.cache
.venv
.history
*.egg-info
/.project
/.pydevproject

0
test/clients/__init__.py Normal file
View File

View File

@ -0,0 +1,21 @@
import unittest
import tempfile
from dsmr_parser.clients.filereader import FileReader
from dsmr_parser.telegram_specifications import V5
from test.example_telegrams import TELEGRAM_V5
class FileReaderTest(unittest.TestCase):
def test_read_as_object(self):
with tempfile.NamedTemporaryFile() as file:
with open(file.name, "w") as f:
f.write(TELEGRAM_V5)
telegrams = []
reader = FileReader(file=file.name, telegram_specification=V5)
# Call
for telegram in reader.read_as_object():
telegrams.append(telegram)
self.assertEqual(len(telegrams), 1)

View File

@ -0,0 +1,77 @@
from unittest.mock import Mock
import unittest
from dsmr_parser import obis_references as obis
from dsmr_parser.clients.rfxtrx_protocol import create_rfxtrx_dsmr_protocol, PACKETTYPE_DSMR, SUBTYPE_P1
from dsmr_parser.objects import Telegram
TELEGRAM_V2_2 = (
'/ISk5\2MT382-1004\r\n'
'\r\n'
'0-0:96.1.1(00000000000000)\r\n'
'1-0:1.8.1(00001.001*kWh)\r\n'
'1-0:1.8.2(00001.001*kWh)\r\n'
'1-0:2.8.1(00001.001*kWh)\r\n'
'1-0:2.8.2(00001.001*kWh)\r\n'
'0-0:96.14.0(0001)\r\n'
'1-0:1.7.0(0001.01*kW)\r\n'
'1-0:2.7.0(0000.00*kW)\r\n'
'0-0:17.0.0(0999.00*kW)\r\n'
'0-0:96.3.10(1)\r\n'
'0-0:96.13.1()\r\n'
'0-0:96.13.0()\r\n'
'0-1:24.1.0(3)\r\n'
'0-1:96.1.0(000000000000)\r\n'
'0-1:24.3.0(161107190000)(00)(60)(1)(0-1:24.2.1)(m3)\r\n'
'(00001.001)\r\n'
'0-1:24.4.0(1)\r\n'
'!\r\n'
)
OTHER_RF_PACKET = b'\x03\x01\x02\x03'
def encode_telegram_as_RF_packets(telegram):
data = b''
for line in telegram.split('\n'):
packet_data = (line + '\n').encode('ascii')
packet_header = bytes(bytearray([
len(packet_data) + 3, # excluding length byte
PACKETTYPE_DSMR,
SUBTYPE_P1,
0 # seq num (ignored)
]))
data += packet_header + packet_data
# other RF packets can pass by on the line
data += OTHER_RF_PACKET
return data
class RFXtrxProtocolTest(unittest.TestCase):
def setUp(self):
new_protocol, _ = create_rfxtrx_dsmr_protocol('2.2',
telegram_callback=Mock(),
keep_alive_interval=1)
self.protocol = new_protocol()
def test_complete_packet(self):
"""Protocol should assemble incoming lines into complete packet."""
data = encode_telegram_as_RF_packets(TELEGRAM_V2_2)
# send data broken up in two parts
self.protocol.data_received(data[0:200])
self.protocol.data_received(data[200:])
telegram = self.protocol.telegram_callback.call_args_list[0][0][0]
assert isinstance(telegram, Telegram)
assert float(telegram[obis.CURRENT_ELECTRICITY_USAGE].value) == 1.01
assert telegram[obis.CURRENT_ELECTRICITY_USAGE].unit == 'kW'
assert float(telegram[obis.GAS_METER_READING].value) == 1.001
assert telegram[obis.GAS_METER_READING].unit == 'm3'

View File

@ -0,0 +1,29 @@
import unittest
import tempfile
from unittest import mock
from dsmr_parser import telegram_specifications
from dsmr_parser.clients.filereader import FileReader
from dsmr_parser.clients.serial_ import SerialReader
from dsmr_parser.clients.settings import SERIAL_SETTINGS_V5
from test.example_telegrams import TELEGRAM_V5
class SerialReaderTest(unittest.TestCase):
@mock.patch('dsmr_parser.clients.serial_.serial.Serial')
def test_read_as_object(self, mock_serial):
serial_handle_mock = mock_serial.return_value
# mock_serial.return_value.in_waiting = 1024
mock_serial.return_value.read.return_value = [b'Telegram data...', b''] # Return data, then empty bytes
serial_reader = SerialReader(
device='/dev/ttyUSB0',
serial_settings=SERIAL_SETTINGS_V5,
telegram_specification=telegram_specifications.V5
)
for telegram in serial_reader.read():
print(telegram) # see 'Telegram object' docs below