Add test and exceptions for V2_2 implementation.
This commit is contained in:
		
							parent
							
								
									4a82066144
								
							
						
					
					
						commit
						447f2a24fb
					
				| @ -19,6 +19,21 @@ class MBusObject(DSMRObject): | ||||
|         return self.values[1]['unit'] | ||||
| 
 | ||||
| 
 | ||||
| class MBusObjectV2_2(DSMRObject): | ||||
| 
 | ||||
|     @property | ||||
|     def datetime(self): | ||||
|         return self.values[0]['value'] | ||||
| 
 | ||||
|     @property | ||||
|     def value(self): | ||||
|         return self.values[5]['value'] | ||||
| 
 | ||||
|     @property | ||||
|     def unit(self): | ||||
|         return self.values[4]['unit'] | ||||
| 
 | ||||
| 
 | ||||
| class CosemObject(DSMRObject): | ||||
| 
 | ||||
|     @property | ||||
|  | ||||
| @ -1,9 +1,9 @@ | ||||
| import logging | ||||
| import re | ||||
| 
 | ||||
| from .objects import MBusObject, CosemObject | ||||
| from .objects import MBusObject, MBusObjectV2_2, CosemObject | ||||
| from .exceptions import ParseError | ||||
| 
 | ||||
| from .obis_references import GAS_METER_READING | ||||
| 
 | ||||
| logger = logging.getLogger(__name__) | ||||
| 
 | ||||
| @ -29,7 +29,7 @@ class TelegramParser(object): | ||||
|         telegram = {} | ||||
| 
 | ||||
|         for line_value in line_values: | ||||
|             obis_reference, dsmr_object = self.parse_line(line_value) | ||||
|             obis_reference, dsmr_object = self.parse_line(line_value.strip()) | ||||
| 
 | ||||
|             telegram[obis_reference] = dsmr_object | ||||
| 
 | ||||
| @ -47,6 +47,26 @@ class TelegramParser(object): | ||||
|         return obis_reference, parser.parse(line_value) | ||||
| 
 | ||||
| 
 | ||||
| class TelegramParserV2_2(TelegramParser): | ||||
|     def parse(self, line_values): | ||||
|         """Join lines for gas meter.""" | ||||
| 
 | ||||
|         def join_lines(line_values): | ||||
|             join_next = re.compile(GAS_METER_READING) | ||||
| 
 | ||||
|             join = None | ||||
|             for line_value in line_values: | ||||
|                 if join: | ||||
|                     yield join.strip() + line_value | ||||
|                     join = None | ||||
|                 elif join_next.match(line_value): | ||||
|                     join = line_value | ||||
|                 else: | ||||
|                     yield line_value | ||||
| 
 | ||||
|         return super().parse(join_lines(line_values)) | ||||
| 
 | ||||
| 
 | ||||
| class DSMRObjectParser(object): | ||||
| 
 | ||||
|     def __init__(self, *value_formats): | ||||
| @ -85,7 +105,11 @@ class MBusParser(DSMRObjectParser): | ||||
|     """ | ||||
| 
 | ||||
|     def parse(self, line): | ||||
|         return MBusObject(self._parse(line)) | ||||
|         values = self._parse(line) | ||||
|         if len(values) == 2: | ||||
|             return MBusObject(values) | ||||
|         else: | ||||
|             return MBusObjectV2_2(values) | ||||
| 
 | ||||
| 
 | ||||
| class CosemParser(DSMRObjectParser): | ||||
|  | ||||
| @ -1,6 +1,6 @@ | ||||
| import serial | ||||
| 
 | ||||
| from dsmr_parser.parsers import TelegramParser | ||||
| from dsmr_parser.parsers import TelegramParser, V3TelegramParser | ||||
| 
 | ||||
| SERIAL_SETTINGS_V2_2 = { | ||||
|     'baudrate': 9600, | ||||
| @ -36,7 +36,12 @@ class SerialReader(object): | ||||
|     def __init__(self, device, serial_settings, telegram_specification): | ||||
|         self.serial_settings = serial_settings | ||||
|         self.serial_settings['port'] = device | ||||
|         self.telegram_parser = TelegramParser(telegram_specification) | ||||
| 
 | ||||
|         if serial_settings is SERIAL_SETTINGS_V2_2: | ||||
|             telegram_parser = V3TelegramParser | ||||
|         else: | ||||
|             telegram_parser = TelegramParser | ||||
|         self.telegram_parser = telegram_parser(telegram_specification) | ||||
| 
 | ||||
|     def read(self): | ||||
|         """ | ||||
|  | ||||
| @ -6,7 +6,10 @@ import pytz | ||||
| def timestamp(value): | ||||
| 
 | ||||
|     naive_datetime = datetime.datetime.strptime(value[:-1], '%y%m%d%H%M%S') | ||||
|     if len(value) == 13: | ||||
|         is_dst = value[12] == 'S'  # assume format 160322150000W | ||||
|     else: | ||||
|         is_dst = False | ||||
| 
 | ||||
|     local_tz = pytz.timezone('Europe/Amsterdam') | ||||
|     localized_datetime = local_tz.localize(naive_datetime, is_dst=is_dst) | ||||
|  | ||||
							
								
								
									
										38
									
								
								test/test_parse.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										38
									
								
								test/test_parse.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,38 @@ | ||||
| """Test telegram parsing.""" | ||||
| 
 | ||||
| from dsmr_parser.parsers import TelegramParserV2_2 | ||||
| from dsmr_parser import telegram_specifications | ||||
| from dsmr_parser.obis_references import CURRENT_ELECTRICITY_USAGE, GAS_METER_READING | ||||
| 
 | ||||
| TELEGRAM_V2_2 = [ | ||||
|     "/ISk5\2MT382-1004", | ||||
|     "", | ||||
|     "0-0:96.1.1(00000000000000)", | ||||
|     "1-0:1.8.1(00001.001*kWh)", | ||||
|     "1-0:1.8.2(00001.001*kWh)", | ||||
|     "1-0:2.8.1(00001.001*kWh)", | ||||
|     "1-0:2.8.2(00001.001*kWh)", | ||||
|     "0-0:96.14.0(0001)", | ||||
|     "1-0:1.7.0(0001.01*kW)", | ||||
|     "1-0:2.7.0(0000.00*kW)", | ||||
|     "0-0:17.0.0(0999.00*kW)", | ||||
|     "0-0:96.3.10(1)", | ||||
|     "0-0:96.13.1()", | ||||
|     "0-0:96.13.0()", | ||||
|     "0-1:24.1.0(3)", | ||||
|     "0-1:96.1.0(000000000000)", | ||||
|     "0-1:24.3.0(161107190000)(00)(60)(1)(0-1:24.2.1)(m3)", | ||||
|     "(00001.001)", | ||||
|     "0-1:24.4.0(1)", | ||||
|     "!", | ||||
| ] | ||||
| 
 | ||||
| 
 | ||||
| def test_parse_v2_2(): | ||||
|     """Test if telegram parsing results in correct results.""" | ||||
| 
 | ||||
|     parser = TelegramParserV2_2(telegram_specifications.V2_2) | ||||
|     result = parser.parse(TELEGRAM_V2_2) | ||||
| 
 | ||||
|     assert float(result[CURRENT_ELECTRICITY_USAGE].value) == 1.01 | ||||
|     assert float(result[GAS_METER_READING].value) == 1.001 | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user