issue-51-telegram updated documentation; refactored V5 unittest to use telegram attributes instead of keys;
This commit is contained in:
parent
253d043b7b
commit
b527e991ef
27
README.rst
27
README.rst
@ -171,6 +171,8 @@ Parsing module usage
|
|||||||
The parsing module accepts complete unaltered telegram strings and parses these
|
The parsing module accepts complete unaltered telegram strings and parses these
|
||||||
into a Telegram object. This previously was a dictionary, but the Telegram object is mostly compatible.
|
into a Telegram object. This previously was a dictionary, but the Telegram object is mostly compatible.
|
||||||
|
|
||||||
|
Getting full telegrams from a bytestream can be made easier by using the TelegramBuffer helper class.
|
||||||
|
|
||||||
.. code-block:: python
|
.. code-block:: python
|
||||||
|
|
||||||
from dsmr_parser import telegram_specifications
|
from dsmr_parser import telegram_specifications
|
||||||
@ -210,16 +212,12 @@ into a Telegram object. This previously was a dictionary, but the Telegram objec
|
|||||||
Telegram object
|
Telegram object
|
||||||
---------------------
|
---------------------
|
||||||
|
|
||||||
|
A Telegram has attributes for all the parsed values according to the given telegram specification. Each value is a DsmrObject which have a 'value' and 'unit' property. MBusObject's, which are DsmrObject's as well additionally have a 'datetime' property. The 'value' can contain any python type (int, str, Decimal) depending on the field. The 'unit' contains 'kW', 'A', 'kWh' or 'm3'.
|
||||||
|
|
||||||
|
Below are some examples on how to get the meter data. Alternatively check out the following unit test for a complete example: TelegramParserV5Test.test_parse
|
||||||
|
|
||||||
.. code-block:: python
|
.. code-block:: python
|
||||||
|
|
||||||
# DSMR v5 telegram example
|
|
||||||
from dsmr_parser import telegram_specifications
|
|
||||||
from dsmr_parser.parsers import TelegramParser
|
|
||||||
from test.example_telegrams import TELEGRAM_V5
|
|
||||||
|
|
||||||
parser = TelegramParser(telegram_specifications.V5)
|
|
||||||
telegram = parser.parse(TELEGRAM_V5)
|
|
||||||
|
|
||||||
# Print contents of all available values
|
# Print contents of all available values
|
||||||
# See dsmr_parser.obis_name_mapping for all readable telegram values.
|
# See dsmr_parser.obis_name_mapping for all readable telegram values.
|
||||||
# The available values differ per DSMR version and meter.
|
# The available values differ per DSMR version and meter.
|
||||||
@ -231,21 +229,22 @@ Telegram object
|
|||||||
# etc.
|
# etc.
|
||||||
|
|
||||||
# Example to get current electricity usage
|
# Example to get current electricity usage
|
||||||
print(telegram.CURRENT_ELECTRICITY_USAGE)
|
print(telegram.CURRENT_ELECTRICITY_USAGE) # <dsmr_parser.objects.CosemObject at 0x7f5e98ae5ac8>
|
||||||
print(telegram.CURRENT_ELECTRICITY_USAGE.value)
|
print(telegram.CURRENT_ELECTRICITY_USAGE.value) # Decimal('2.027')
|
||||||
print(telegram.CURRENT_ELECTRICITY_USAGE.unit)
|
print(telegram.CURRENT_ELECTRICITY_USAGE.unit) # 'kW'
|
||||||
# <dsmr_parser.objects.CosemObject at 0x7f5e98ae5ac8>
|
|
||||||
# Decimal('2.027')
|
|
||||||
# 'kW'
|
|
||||||
|
|
||||||
# All Mbus device readings like gas meters and water meters can be retrieved as follows:
|
# All Mbus device readings like gas meters and water meters can be retrieved as follows:
|
||||||
mbus_devices = telegram.get_mbus_devices()
|
mbus_devices = telegram.get_mbus_devices()
|
||||||
|
|
||||||
# A specific device based on the channel the device is connected to can be retrieved as follows:
|
# A specific device based on the channel the device is connected to can be retrieved as follows:
|
||||||
mbus_device = telegram.get_mbus_device_by_channel(1)
|
mbus_device = telegram.get_mbus_device_by_channel(1)
|
||||||
|
print(mbus_device.DEVICE_TYPE.value) # 3
|
||||||
print(mbus_device.EQUIPMENT_IDENTIFIER_GAS.value) # '4730303339303031393336393930363139'
|
print(mbus_device.EQUIPMENT_IDENTIFIER_GAS.value) # '4730303339303031393336393930363139'
|
||||||
print(mbus_device.HOURLY_GAS_METER_READING.value) # Decimal('246.138')
|
print(mbus_device.HOURLY_GAS_METER_READING.value) # Decimal('246.138')
|
||||||
|
|
||||||
|
# A deprecated way of getting the values is by key
|
||||||
|
telegram[obis_references.CURRENT_ELECTRICITY_USAGE]
|
||||||
|
|
||||||
Installation
|
Installation
|
||||||
------------
|
------------
|
||||||
|
|
||||||
|
@ -5,7 +5,6 @@ import unittest
|
|||||||
|
|
||||||
import pytz
|
import pytz
|
||||||
|
|
||||||
from dsmr_parser import obis_references as obis
|
|
||||||
from dsmr_parser import telegram_specifications
|
from dsmr_parser import telegram_specifications
|
||||||
from dsmr_parser.exceptions import InvalidChecksumError, ParseError
|
from dsmr_parser.exceptions import InvalidChecksumError, ParseError
|
||||||
from dsmr_parser.objects import CosemObject, MBusObject
|
from dsmr_parser.objects import CosemObject, MBusObject
|
||||||
@ -18,211 +17,222 @@ class TelegramParserV5Test(unittest.TestCase):
|
|||||||
|
|
||||||
def test_parse(self):
|
def test_parse(self):
|
||||||
parser = TelegramParser(telegram_specifications.V5)
|
parser = TelegramParser(telegram_specifications.V5)
|
||||||
result = parser.parse(TELEGRAM_V5)
|
telegram = parser.parse(TELEGRAM_V5)
|
||||||
|
|
||||||
# P1_MESSAGE_HEADER (1-3:0.2.8)
|
# P1_MESSAGE_HEADER (1-3:0.2.8)
|
||||||
assert isinstance(result[obis.P1_MESSAGE_HEADER], CosemObject)
|
assert isinstance(telegram.P1_MESSAGE_HEADER, CosemObject)
|
||||||
assert result[obis.P1_MESSAGE_HEADER].unit is None
|
assert telegram.P1_MESSAGE_HEADER.unit is None
|
||||||
assert isinstance(result[obis.P1_MESSAGE_HEADER].value, str)
|
assert isinstance(telegram.P1_MESSAGE_HEADER.value, str)
|
||||||
assert result[obis.P1_MESSAGE_HEADER].value == '50'
|
assert telegram.P1_MESSAGE_HEADER.value == '50'
|
||||||
|
|
||||||
# P1_MESSAGE_TIMESTAMP (0-0:1.0.0)
|
# P1_MESSAGE_TIMESTAMP (0-0:1.0.0)
|
||||||
assert isinstance(result[obis.P1_MESSAGE_TIMESTAMP], CosemObject)
|
assert isinstance(telegram.P1_MESSAGE_TIMESTAMP, CosemObject)
|
||||||
assert result[obis.P1_MESSAGE_TIMESTAMP].unit is None
|
assert telegram.P1_MESSAGE_TIMESTAMP.unit is None
|
||||||
assert isinstance(result[obis.P1_MESSAGE_TIMESTAMP].value, datetime.datetime)
|
assert isinstance(telegram.P1_MESSAGE_TIMESTAMP.value, datetime.datetime)
|
||||||
assert result[obis.P1_MESSAGE_TIMESTAMP].value == \
|
assert telegram.P1_MESSAGE_TIMESTAMP.value == \
|
||||||
datetime.datetime(2017, 1, 2, 18, 20, 2, tzinfo=pytz.UTC)
|
datetime.datetime(2017, 1, 2, 18, 20, 2, tzinfo=pytz.UTC)
|
||||||
|
|
||||||
# ELECTRICITY_USED_TARIFF_1 (1-0:1.8.1)
|
# ELECTRICITY_USED_TARIFF_1 (1-0:1.8.1)
|
||||||
assert isinstance(result[obis.ELECTRICITY_USED_TARIFF_1], CosemObject)
|
assert isinstance(telegram.ELECTRICITY_USED_TARIFF_1, CosemObject)
|
||||||
assert result[obis.ELECTRICITY_USED_TARIFF_1].unit == 'kWh'
|
assert telegram.ELECTRICITY_USED_TARIFF_1.unit == 'kWh'
|
||||||
assert isinstance(result[obis.ELECTRICITY_USED_TARIFF_1].value, Decimal)
|
assert isinstance(telegram.ELECTRICITY_USED_TARIFF_1.value, Decimal)
|
||||||
assert result[obis.ELECTRICITY_USED_TARIFF_1].value == Decimal('4.426')
|
assert telegram.ELECTRICITY_USED_TARIFF_1.value == Decimal('4.426')
|
||||||
|
|
||||||
# ELECTRICITY_USED_TARIFF_2 (1-0:1.8.2)
|
# ELECTRICITY_USED_TARIFF_2 (1-0:1.8.2)
|
||||||
assert isinstance(result[obis.ELECTRICITY_USED_TARIFF_2], CosemObject)
|
assert isinstance(telegram.ELECTRICITY_USED_TARIFF_2, CosemObject)
|
||||||
assert result[obis.ELECTRICITY_USED_TARIFF_2].unit == 'kWh'
|
assert telegram.ELECTRICITY_USED_TARIFF_2.unit == 'kWh'
|
||||||
assert isinstance(result[obis.ELECTRICITY_USED_TARIFF_2].value, Decimal)
|
assert isinstance(telegram.ELECTRICITY_USED_TARIFF_2.value, Decimal)
|
||||||
assert result[obis.ELECTRICITY_USED_TARIFF_2].value == Decimal('2.399')
|
assert telegram.ELECTRICITY_USED_TARIFF_2.value == Decimal('2.399')
|
||||||
|
|
||||||
# ELECTRICITY_DELIVERED_TARIFF_1 (1-0:2.8.1)
|
# ELECTRICITY_DELIVERED_TARIFF_1 (1-0:2.8.1)
|
||||||
assert isinstance(result[obis.ELECTRICITY_DELIVERED_TARIFF_1], CosemObject)
|
assert isinstance(telegram.ELECTRICITY_DELIVERED_TARIFF_1, CosemObject)
|
||||||
assert result[obis.ELECTRICITY_DELIVERED_TARIFF_1].unit == 'kWh'
|
assert telegram.ELECTRICITY_DELIVERED_TARIFF_1.unit == 'kWh'
|
||||||
assert isinstance(result[obis.ELECTRICITY_DELIVERED_TARIFF_1].value, Decimal)
|
assert isinstance(telegram.ELECTRICITY_DELIVERED_TARIFF_1.value, Decimal)
|
||||||
assert result[obis.ELECTRICITY_DELIVERED_TARIFF_1].value == Decimal('2.444')
|
assert telegram.ELECTRICITY_DELIVERED_TARIFF_1.value == Decimal('2.444')
|
||||||
|
|
||||||
# ELECTRICITY_DELIVERED_TARIFF_2 (1-0:2.8.2)
|
# ELECTRICITY_DELIVERED_TARIFF_2 (1-0:2.8.2)
|
||||||
assert isinstance(result[obis.ELECTRICITY_DELIVERED_TARIFF_2], CosemObject)
|
assert isinstance(telegram.ELECTRICITY_DELIVERED_TARIFF_2, CosemObject)
|
||||||
assert result[obis.ELECTRICITY_DELIVERED_TARIFF_2].unit == 'kWh'
|
assert telegram.ELECTRICITY_DELIVERED_TARIFF_2.unit == 'kWh'
|
||||||
assert isinstance(result[obis.ELECTRICITY_DELIVERED_TARIFF_2].value, Decimal)
|
assert isinstance(telegram.ELECTRICITY_DELIVERED_TARIFF_2.value, Decimal)
|
||||||
assert result[obis.ELECTRICITY_DELIVERED_TARIFF_2].value == Decimal('0')
|
assert telegram.ELECTRICITY_DELIVERED_TARIFF_2.value == Decimal('0')
|
||||||
|
|
||||||
# ELECTRICITY_ACTIVE_TARIFF (0-0:96.14.0)
|
# ELECTRICITY_ACTIVE_TARIFF (0-0:96.14.0)
|
||||||
assert isinstance(result[obis.ELECTRICITY_ACTIVE_TARIFF], CosemObject)
|
assert isinstance(telegram.ELECTRICITY_ACTIVE_TARIFF, CosemObject)
|
||||||
assert result[obis.ELECTRICITY_ACTIVE_TARIFF].unit is None
|
assert telegram.ELECTRICITY_ACTIVE_TARIFF.unit is None
|
||||||
assert isinstance(result[obis.ELECTRICITY_ACTIVE_TARIFF].value, str)
|
assert isinstance(telegram.ELECTRICITY_ACTIVE_TARIFF.value, str)
|
||||||
assert result[obis.ELECTRICITY_ACTIVE_TARIFF].value == '0002'
|
assert telegram.ELECTRICITY_ACTIVE_TARIFF.value == '0002'
|
||||||
|
|
||||||
# EQUIPMENT_IDENTIFIER (0-0:96.1.1)
|
# EQUIPMENT_IDENTIFIER (0-0:96.1.1)
|
||||||
assert isinstance(result[obis.EQUIPMENT_IDENTIFIER], CosemObject)
|
assert isinstance(telegram.EQUIPMENT_IDENTIFIER, CosemObject)
|
||||||
assert result[obis.EQUIPMENT_IDENTIFIER].unit is None
|
assert telegram.EQUIPMENT_IDENTIFIER.unit is None
|
||||||
assert isinstance(result[obis.EQUIPMENT_IDENTIFIER].value, str)
|
assert isinstance(telegram.EQUIPMENT_IDENTIFIER.value, str)
|
||||||
assert result[obis.EQUIPMENT_IDENTIFIER].value == '4B384547303034303436333935353037'
|
assert telegram.EQUIPMENT_IDENTIFIER.value == '4B384547303034303436333935353037'
|
||||||
|
|
||||||
# CURRENT_ELECTRICITY_USAGE (1-0:1.7.0)
|
# CURRENT_ELECTRICITY_USAGE (1-0:1.7.0)
|
||||||
assert isinstance(result[obis.CURRENT_ELECTRICITY_USAGE], CosemObject)
|
assert isinstance(telegram.CURRENT_ELECTRICITY_USAGE, CosemObject)
|
||||||
assert result[obis.CURRENT_ELECTRICITY_USAGE].unit == 'kW'
|
assert telegram.CURRENT_ELECTRICITY_USAGE.unit == 'kW'
|
||||||
assert isinstance(result[obis.CURRENT_ELECTRICITY_USAGE].value, Decimal)
|
assert isinstance(telegram.CURRENT_ELECTRICITY_USAGE.value, Decimal)
|
||||||
assert result[obis.CURRENT_ELECTRICITY_USAGE].value == Decimal('0.244')
|
assert telegram.CURRENT_ELECTRICITY_USAGE.value == Decimal('0.244')
|
||||||
|
|
||||||
# CURRENT_ELECTRICITY_DELIVERY (1-0:2.7.0)
|
# CURRENT_ELECTRICITY_DELIVERY (1-0:2.7.0)
|
||||||
assert isinstance(result[obis.CURRENT_ELECTRICITY_DELIVERY], CosemObject)
|
assert isinstance(telegram.CURRENT_ELECTRICITY_DELIVERY, CosemObject)
|
||||||
assert result[obis.CURRENT_ELECTRICITY_DELIVERY].unit == 'kW'
|
assert telegram.CURRENT_ELECTRICITY_DELIVERY.unit == 'kW'
|
||||||
assert isinstance(result[obis.CURRENT_ELECTRICITY_DELIVERY].value, Decimal)
|
assert isinstance(telegram.CURRENT_ELECTRICITY_DELIVERY.value, Decimal)
|
||||||
assert result[obis.CURRENT_ELECTRICITY_DELIVERY].value == Decimal('0')
|
assert telegram.CURRENT_ELECTRICITY_DELIVERY.value == Decimal('0')
|
||||||
|
|
||||||
# LONG_POWER_FAILURE_COUNT (96.7.9)
|
# LONG_POWER_FAILURE_COUNT (96.7.9)
|
||||||
assert isinstance(result[obis.LONG_POWER_FAILURE_COUNT], CosemObject)
|
assert isinstance(telegram.LONG_POWER_FAILURE_COUNT, CosemObject)
|
||||||
assert result[obis.LONG_POWER_FAILURE_COUNT].unit is None
|
assert telegram.LONG_POWER_FAILURE_COUNT.unit is None
|
||||||
assert isinstance(result[obis.LONG_POWER_FAILURE_COUNT].value, int)
|
assert isinstance(telegram.LONG_POWER_FAILURE_COUNT.value, int)
|
||||||
assert result[obis.LONG_POWER_FAILURE_COUNT].value == 0
|
assert telegram.LONG_POWER_FAILURE_COUNT.value == 0
|
||||||
|
|
||||||
# SHORT_POWER_FAILURE_COUNT (1-0:96.7.21)
|
# SHORT_POWER_FAILURE_COUNT (1-0:96.7.21)
|
||||||
assert isinstance(result[obis.SHORT_POWER_FAILURE_COUNT], CosemObject)
|
assert isinstance(telegram.SHORT_POWER_FAILURE_COUNT, CosemObject)
|
||||||
assert result[obis.SHORT_POWER_FAILURE_COUNT].unit is None
|
assert telegram.SHORT_POWER_FAILURE_COUNT.unit is None
|
||||||
assert isinstance(result[obis.SHORT_POWER_FAILURE_COUNT].value, int)
|
assert isinstance(telegram.SHORT_POWER_FAILURE_COUNT.value, int)
|
||||||
assert result[obis.SHORT_POWER_FAILURE_COUNT].value == 13
|
assert telegram.SHORT_POWER_FAILURE_COUNT.value == 13
|
||||||
|
|
||||||
# VOLTAGE_SAG_L1_COUNT (1-0:32.32.0)
|
# VOLTAGE_SAG_L1_COUNT (1-0:32.32.0)
|
||||||
assert isinstance(result[obis.VOLTAGE_SAG_L1_COUNT], CosemObject)
|
assert isinstance(telegram.VOLTAGE_SAG_L1_COUNT, CosemObject)
|
||||||
assert result[obis.VOLTAGE_SAG_L1_COUNT].unit is None
|
assert telegram.VOLTAGE_SAG_L1_COUNT.unit is None
|
||||||
assert isinstance(result[obis.VOLTAGE_SAG_L1_COUNT].value, int)
|
assert isinstance(telegram.VOLTAGE_SAG_L1_COUNT.value, int)
|
||||||
assert result[obis.VOLTAGE_SAG_L1_COUNT].value == 0
|
assert telegram.VOLTAGE_SAG_L1_COUNT.value == 0
|
||||||
|
|
||||||
# VOLTAGE_SAG_L2_COUNT (1-0:52.32.0)
|
# VOLTAGE_SAG_L2_COUNT (1-0:52.32.0)
|
||||||
assert isinstance(result[obis.VOLTAGE_SAG_L2_COUNT], CosemObject)
|
assert isinstance(telegram.VOLTAGE_SAG_L2_COUNT, CosemObject)
|
||||||
assert result[obis.VOLTAGE_SAG_L2_COUNT].unit is None
|
assert telegram.VOLTAGE_SAG_L2_COUNT.unit is None
|
||||||
assert isinstance(result[obis.VOLTAGE_SAG_L2_COUNT].value, int)
|
assert isinstance(telegram.VOLTAGE_SAG_L2_COUNT.value, int)
|
||||||
assert result[obis.VOLTAGE_SAG_L2_COUNT].value == 0
|
assert telegram.VOLTAGE_SAG_L2_COUNT.value == 0
|
||||||
|
|
||||||
# VOLTAGE_SAG_L3_COUNT (1-0:72.32.0)
|
# VOLTAGE_SAG_L3_COUNT (1-0:72.32.0)
|
||||||
assert isinstance(result[obis.VOLTAGE_SAG_L3_COUNT], CosemObject)
|
assert isinstance(telegram.VOLTAGE_SAG_L3_COUNT, CosemObject)
|
||||||
assert result[obis.VOLTAGE_SAG_L3_COUNT].unit is None
|
assert telegram.VOLTAGE_SAG_L3_COUNT.unit is None
|
||||||
assert isinstance(result[obis.VOLTAGE_SAG_L3_COUNT].value, int)
|
assert isinstance(telegram.VOLTAGE_SAG_L3_COUNT.value, int)
|
||||||
assert result[obis.VOLTAGE_SAG_L3_COUNT].value == 0
|
assert telegram.VOLTAGE_SAG_L3_COUNT.value == 0
|
||||||
|
|
||||||
# VOLTAGE_SWELL_L1_COUNT (1-0:32.36.0)
|
# VOLTAGE_SWELL_L1_COUNT (1-0:32.36.0)
|
||||||
assert isinstance(result[obis.VOLTAGE_SWELL_L1_COUNT], CosemObject)
|
assert isinstance(telegram.VOLTAGE_SWELL_L1_COUNT, CosemObject)
|
||||||
assert result[obis.VOLTAGE_SWELL_L1_COUNT].unit is None
|
assert telegram.VOLTAGE_SWELL_L1_COUNT.unit is None
|
||||||
assert isinstance(result[obis.VOLTAGE_SWELL_L1_COUNT].value, int)
|
assert isinstance(telegram.VOLTAGE_SWELL_L1_COUNT.value, int)
|
||||||
assert result[obis.VOLTAGE_SWELL_L1_COUNT].value == 0
|
assert telegram.VOLTAGE_SWELL_L1_COUNT.value == 0
|
||||||
|
|
||||||
# VOLTAGE_SWELL_L2_COUNT (1-0:52.36.0)
|
# VOLTAGE_SWELL_L2_COUNT (1-0:52.36.0)
|
||||||
assert isinstance(result[obis.VOLTAGE_SWELL_L2_COUNT], CosemObject)
|
assert isinstance(telegram.VOLTAGE_SWELL_L2_COUNT, CosemObject)
|
||||||
assert result[obis.VOLTAGE_SWELL_L2_COUNT].unit is None
|
assert telegram.VOLTAGE_SWELL_L2_COUNT.unit is None
|
||||||
assert isinstance(result[obis.VOLTAGE_SWELL_L2_COUNT].value, int)
|
assert isinstance(telegram.VOLTAGE_SWELL_L2_COUNT.value, int)
|
||||||
assert result[obis.VOLTAGE_SWELL_L2_COUNT].value == 0
|
assert telegram.VOLTAGE_SWELL_L2_COUNT.value == 0
|
||||||
|
|
||||||
# VOLTAGE_SWELL_L3_COUNT (1-0:72.36.0)
|
# VOLTAGE_SWELL_L3_COUNT (1-0:72.36.0)
|
||||||
assert isinstance(result[obis.VOLTAGE_SWELL_L3_COUNT], CosemObject)
|
assert isinstance(telegram.VOLTAGE_SWELL_L3_COUNT, CosemObject)
|
||||||
assert result[obis.VOLTAGE_SWELL_L3_COUNT].unit is None
|
assert telegram.VOLTAGE_SWELL_L3_COUNT.unit is None
|
||||||
assert isinstance(result[obis.VOLTAGE_SWELL_L3_COUNT].value, int)
|
assert isinstance(telegram.VOLTAGE_SWELL_L3_COUNT.value, int)
|
||||||
assert result[obis.VOLTAGE_SWELL_L3_COUNT].value == 0
|
assert telegram.VOLTAGE_SWELL_L3_COUNT.value == 0
|
||||||
|
|
||||||
# INSTANTANEOUS_VOLTAGE_L1 (1-0:32.7.0)
|
# INSTANTANEOUS_VOLTAGE_L1 (1-0:32.7.0)
|
||||||
assert isinstance(result[obis.INSTANTANEOUS_VOLTAGE_L1], CosemObject)
|
assert isinstance(telegram.INSTANTANEOUS_VOLTAGE_L1, CosemObject)
|
||||||
assert result[obis.INSTANTANEOUS_VOLTAGE_L1].unit == 'V'
|
assert telegram.INSTANTANEOUS_VOLTAGE_L1.unit == 'V'
|
||||||
assert isinstance(result[obis.INSTANTANEOUS_VOLTAGE_L1].value, Decimal)
|
assert isinstance(telegram.INSTANTANEOUS_VOLTAGE_L1.value, Decimal)
|
||||||
assert result[obis.INSTANTANEOUS_VOLTAGE_L1].value == Decimal('230.0')
|
assert telegram.INSTANTANEOUS_VOLTAGE_L1.value == Decimal('230.0')
|
||||||
|
|
||||||
# INSTANTANEOUS_VOLTAGE_L2 (1-0:52.7.0)
|
# INSTANTANEOUS_VOLTAGE_L2 (1-0:52.7.0)
|
||||||
assert isinstance(result[obis.INSTANTANEOUS_VOLTAGE_L2], CosemObject)
|
assert isinstance(telegram.INSTANTANEOUS_VOLTAGE_L2, CosemObject)
|
||||||
assert result[obis.INSTANTANEOUS_VOLTAGE_L2].unit == 'V'
|
assert telegram.INSTANTANEOUS_VOLTAGE_L2.unit == 'V'
|
||||||
assert isinstance(result[obis.INSTANTANEOUS_VOLTAGE_L2].value, Decimal)
|
assert isinstance(telegram.INSTANTANEOUS_VOLTAGE_L2.value, Decimal)
|
||||||
assert result[obis.INSTANTANEOUS_VOLTAGE_L2].value == Decimal('230.0')
|
assert telegram.INSTANTANEOUS_VOLTAGE_L2.value == Decimal('230.0')
|
||||||
|
|
||||||
# INSTANTANEOUS_VOLTAGE_L3 (1-0:72.7.0)
|
# INSTANTANEOUS_VOLTAGE_L3 (1-0:72.7.0)
|
||||||
assert isinstance(result[obis.INSTANTANEOUS_VOLTAGE_L3], CosemObject)
|
assert isinstance(telegram.INSTANTANEOUS_VOLTAGE_L3, CosemObject)
|
||||||
assert result[obis.INSTANTANEOUS_VOLTAGE_L3].unit == 'V'
|
assert telegram.INSTANTANEOUS_VOLTAGE_L3.unit == 'V'
|
||||||
assert isinstance(result[obis.INSTANTANEOUS_VOLTAGE_L3].value, Decimal)
|
assert isinstance(telegram.INSTANTANEOUS_VOLTAGE_L3.value, Decimal)
|
||||||
assert result[obis.INSTANTANEOUS_VOLTAGE_L3].value == Decimal('229.0')
|
assert telegram.INSTANTANEOUS_VOLTAGE_L3.value == Decimal('229.0')
|
||||||
|
|
||||||
# INSTANTANEOUS_CURRENT_L1 (1-0:31.7.0)
|
# INSTANTANEOUS_CURRENT_L1 (1-0:31.7.0)
|
||||||
assert isinstance(result[obis.INSTANTANEOUS_CURRENT_L1], CosemObject)
|
assert isinstance(telegram.INSTANTANEOUS_CURRENT_L1, CosemObject)
|
||||||
assert result[obis.INSTANTANEOUS_CURRENT_L1].unit == 'A'
|
assert telegram.INSTANTANEOUS_CURRENT_L1.unit == 'A'
|
||||||
assert isinstance(result[obis.INSTANTANEOUS_CURRENT_L1].value, Decimal)
|
assert isinstance(telegram.INSTANTANEOUS_CURRENT_L1.value, Decimal)
|
||||||
assert result[obis.INSTANTANEOUS_CURRENT_L1].value == Decimal('0.48')
|
assert telegram.INSTANTANEOUS_CURRENT_L1.value == Decimal('0.48')
|
||||||
|
|
||||||
# INSTANTANEOUS_CURRENT_L2 (1-0:51.7.0)
|
# INSTANTANEOUS_CURRENT_L2 (1-0:51.7.0)
|
||||||
assert isinstance(result[obis.INSTANTANEOUS_CURRENT_L2], CosemObject)
|
assert isinstance(telegram.INSTANTANEOUS_CURRENT_L2, CosemObject)
|
||||||
assert result[obis.INSTANTANEOUS_CURRENT_L2].unit == 'A'
|
assert telegram.INSTANTANEOUS_CURRENT_L2.unit == 'A'
|
||||||
assert isinstance(result[obis.INSTANTANEOUS_CURRENT_L2].value, Decimal)
|
assert isinstance(telegram.INSTANTANEOUS_CURRENT_L2.value, Decimal)
|
||||||
assert result[obis.INSTANTANEOUS_CURRENT_L2].value == Decimal('0.44')
|
assert telegram.INSTANTANEOUS_CURRENT_L2.value == Decimal('0.44')
|
||||||
|
|
||||||
# INSTANTANEOUS_CURRENT_L3 (1-0:71.7.0)
|
# INSTANTANEOUS_CURRENT_L3 (1-0:71.7.0)
|
||||||
assert isinstance(result[obis.INSTANTANEOUS_CURRENT_L3], CosemObject)
|
assert isinstance(telegram.INSTANTANEOUS_CURRENT_L3, CosemObject)
|
||||||
assert result[obis.INSTANTANEOUS_CURRENT_L3].unit == 'A'
|
assert telegram.INSTANTANEOUS_CURRENT_L3.unit == 'A'
|
||||||
assert isinstance(result[obis.INSTANTANEOUS_CURRENT_L3].value, Decimal)
|
assert isinstance(telegram.INSTANTANEOUS_CURRENT_L3.value, Decimal)
|
||||||
assert result[obis.INSTANTANEOUS_CURRENT_L3].value == Decimal('0.86')
|
assert telegram.INSTANTANEOUS_CURRENT_L3.value == Decimal('0.86')
|
||||||
|
|
||||||
# TEXT_MESSAGE (0-0:96.13.0)
|
# TEXT_MESSAGE (0-0:96.13.0)
|
||||||
assert isinstance(result[obis.TEXT_MESSAGE], CosemObject)
|
assert isinstance(telegram.TEXT_MESSAGE, CosemObject)
|
||||||
assert result[obis.TEXT_MESSAGE].unit is None
|
assert telegram.TEXT_MESSAGE.unit is None
|
||||||
assert result[obis.TEXT_MESSAGE].value is None
|
assert telegram.TEXT_MESSAGE.value is None
|
||||||
|
|
||||||
# DEVICE_TYPE (0-x:24.1.0)
|
# DEVICE_TYPE (0-x:24.1.0)
|
||||||
assert isinstance(result[obis.DEVICE_TYPE], CosemObject)
|
assert isinstance(telegram.DEVICE_TYPE, CosemObject)
|
||||||
assert result[obis.DEVICE_TYPE].unit is None
|
assert telegram.DEVICE_TYPE.unit is None
|
||||||
assert isinstance(result[obis.DEVICE_TYPE].value, int)
|
assert isinstance(telegram.DEVICE_TYPE.value, int)
|
||||||
assert result[obis.DEVICE_TYPE].value == 3
|
assert telegram.DEVICE_TYPE.value == 3
|
||||||
|
|
||||||
# INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE (1-0:21.7.0)
|
# INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE (1-0:21.7.0)
|
||||||
assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE], CosemObject)
|
assert isinstance(telegram.INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE, CosemObject)
|
||||||
assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE].unit == 'kW'
|
assert telegram.INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE.unit == 'kW'
|
||||||
assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE].value, Decimal)
|
assert isinstance(telegram.INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE.value, Decimal)
|
||||||
assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE].value == Decimal('0.070')
|
assert telegram.INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE.value == Decimal('0.070')
|
||||||
|
|
||||||
# INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE (1-0:41.7.0)
|
# INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE (1-0:41.7.0)
|
||||||
assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE], CosemObject)
|
assert isinstance(telegram.INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE, CosemObject)
|
||||||
assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE].unit == 'kW'
|
assert telegram.INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE.unit == 'kW'
|
||||||
assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE].value, Decimal)
|
assert isinstance(telegram.INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE.value, Decimal)
|
||||||
assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE].value == Decimal('0.032')
|
assert telegram.INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE.value == Decimal('0.032')
|
||||||
|
|
||||||
# INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE (1-0:61.7.0)
|
# INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE (1-0:61.7.0)
|
||||||
assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE], CosemObject)
|
assert isinstance(telegram.INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE, CosemObject)
|
||||||
assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE].unit == 'kW'
|
assert telegram.INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE.unit == 'kW'
|
||||||
assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE].value, Decimal)
|
assert isinstance(telegram.INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE.value, Decimal)
|
||||||
assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE].value == Decimal('0.142')
|
assert telegram.INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE.value == Decimal('0.142')
|
||||||
|
|
||||||
# INSTANTANEOUS_ACTIVE_POWER_L1_NEGATIVE (1-0:22.7.0)
|
# INSTANTANEOUS_ACTIVE_POWER_L1_NEGATIVE (1-0:22.7.0)
|
||||||
assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L1_NEGATIVE], CosemObject)
|
assert isinstance(telegram.INSTANTANEOUS_ACTIVE_POWER_L1_NEGATIVE, CosemObject)
|
||||||
assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L1_NEGATIVE].unit == 'kW'
|
assert telegram.INSTANTANEOUS_ACTIVE_POWER_L1_NEGATIVE.unit == 'kW'
|
||||||
assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L1_NEGATIVE].value, Decimal)
|
assert isinstance(telegram.INSTANTANEOUS_ACTIVE_POWER_L1_NEGATIVE.value, Decimal)
|
||||||
assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L1_NEGATIVE].value == Decimal('0')
|
assert telegram.INSTANTANEOUS_ACTIVE_POWER_L1_NEGATIVE.value == Decimal('0')
|
||||||
|
|
||||||
# INSTANTANEOUS_ACTIVE_POWER_L2_NEGATIVE (1-0:42.7.0)
|
# INSTANTANEOUS_ACTIVE_POWER_L2_NEGATIVE (1-0:42.7.0)
|
||||||
assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L2_NEGATIVE], CosemObject)
|
assert isinstance(telegram.INSTANTANEOUS_ACTIVE_POWER_L2_NEGATIVE, CosemObject)
|
||||||
assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L2_NEGATIVE].unit == 'kW'
|
assert telegram.INSTANTANEOUS_ACTIVE_POWER_L2_NEGATIVE.unit == 'kW'
|
||||||
assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L2_NEGATIVE].value, Decimal)
|
assert isinstance(telegram.INSTANTANEOUS_ACTIVE_POWER_L2_NEGATIVE.value, Decimal)
|
||||||
assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L2_NEGATIVE].value == Decimal('0')
|
assert telegram.INSTANTANEOUS_ACTIVE_POWER_L2_NEGATIVE.value == Decimal('0')
|
||||||
|
|
||||||
# INSTANTANEOUS_ACTIVE_POWER_L3_NEGATIVE (1-0:62.7.0)
|
# INSTANTANEOUS_ACTIVE_POWER_L3_NEGATIVE (1-0:62.7.0)
|
||||||
assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L3_NEGATIVE], CosemObject)
|
assert isinstance(telegram.INSTANTANEOUS_ACTIVE_POWER_L3_NEGATIVE, CosemObject)
|
||||||
assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L3_NEGATIVE].unit == 'kW'
|
assert telegram.INSTANTANEOUS_ACTIVE_POWER_L3_NEGATIVE.unit == 'kW'
|
||||||
assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L3_NEGATIVE].value, Decimal)
|
assert isinstance(telegram.INSTANTANEOUS_ACTIVE_POWER_L3_NEGATIVE.value, Decimal)
|
||||||
assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L3_NEGATIVE].value == Decimal('0')
|
assert telegram.INSTANTANEOUS_ACTIVE_POWER_L3_NEGATIVE.value == Decimal('0')
|
||||||
|
|
||||||
|
# There's only one Mbus device (gas meter) in this case. Alternatively
|
||||||
|
# use get_mbget_mbus_device_by_channel
|
||||||
|
gas_meter_devices = telegram.get_mbus_devices()
|
||||||
|
gas_meter_device = gas_meter_devices[0]
|
||||||
|
|
||||||
# EQUIPMENT_IDENTIFIER_GAS (0-x:96.1.0)
|
# EQUIPMENT_IDENTIFIER_GAS (0-x:96.1.0)
|
||||||
assert isinstance(result[obis.EQUIPMENT_IDENTIFIER_GAS], CosemObject)
|
assert isinstance(gas_meter_device.DEVICE_TYPE, CosemObject)
|
||||||
assert result[obis.EQUIPMENT_IDENTIFIER_GAS].unit is None
|
assert gas_meter_device.DEVICE_TYPE.unit is None
|
||||||
assert isinstance(result[obis.EQUIPMENT_IDENTIFIER_GAS].value, str)
|
assert isinstance(gas_meter_device.DEVICE_TYPE.value, int)
|
||||||
assert result[obis.EQUIPMENT_IDENTIFIER_GAS].value == '3232323241424344313233343536373839'
|
assert gas_meter_device.DEVICE_TYPE.value == 3
|
||||||
|
|
||||||
|
# EQUIPMENT_IDENTIFIER_GAS (0-x:96.1.0)
|
||||||
|
assert isinstance(gas_meter_device.EQUIPMENT_IDENTIFIER_GAS, CosemObject)
|
||||||
|
assert gas_meter_device.EQUIPMENT_IDENTIFIER_GAS.unit is None
|
||||||
|
assert isinstance(gas_meter_device.EQUIPMENT_IDENTIFIER_GAS.value, str)
|
||||||
|
assert gas_meter_device.EQUIPMENT_IDENTIFIER_GAS.value == '3232323241424344313233343536373839'
|
||||||
|
|
||||||
# HOURLY_GAS_METER_READING (0-1:24.2.1)
|
# HOURLY_GAS_METER_READING (0-1:24.2.1)
|
||||||
assert isinstance(result[obis.HOURLY_GAS_METER_READING], MBusObject)
|
assert isinstance(gas_meter_device.HOURLY_GAS_METER_READING, MBusObject)
|
||||||
assert result[obis.HOURLY_GAS_METER_READING].unit == 'm3'
|
assert gas_meter_device.HOURLY_GAS_METER_READING.unit == 'm3'
|
||||||
assert isinstance(result[obis.HOURLY_GAS_METER_READING].value, Decimal)
|
assert isinstance(telegram.HOURLY_GAS_METER_READING.value, Decimal)
|
||||||
assert result[obis.HOURLY_GAS_METER_READING].value == Decimal('0.107')
|
assert gas_meter_device.HOURLY_GAS_METER_READING.value == Decimal('0.107')
|
||||||
|
|
||||||
def test_checksum_valid(self):
|
def test_checksum_valid(self):
|
||||||
# No exception is raised.
|
# No exception is raised.
|
||||||
|
@ -334,10 +334,12 @@ class TelegramTest(unittest.TestCase):
|
|||||||
self.assertEqual(len(mbus_devices), 2)
|
self.assertEqual(len(mbus_devices), 2)
|
||||||
|
|
||||||
mbus_device_1 = mbus_devices[0]
|
mbus_device_1 = mbus_devices[0]
|
||||||
|
self.assertEqual(mbus_device_1.DEVICE_TYPE.value, 3)
|
||||||
self.assertEqual(mbus_device_1.EQUIPMENT_IDENTIFIER_GAS.value, None)
|
self.assertEqual(mbus_device_1.EQUIPMENT_IDENTIFIER_GAS.value, None)
|
||||||
self.assertEqual(mbus_device_1.HOURLY_GAS_METER_READING.value, Decimal('0'))
|
self.assertEqual(mbus_device_1.HOURLY_GAS_METER_READING.value, Decimal('0'))
|
||||||
|
|
||||||
mbus_device_2 = mbus_devices[1]
|
mbus_device_2 = mbus_devices[1]
|
||||||
|
self.assertEqual(mbus_device_2.DEVICE_TYPE.value, 3)
|
||||||
self.assertEqual(mbus_device_2.EQUIPMENT_IDENTIFIER_GAS.value, '4730303339303031393336393930363139')
|
self.assertEqual(mbus_device_2.EQUIPMENT_IDENTIFIER_GAS.value, '4730303339303031393336393930363139')
|
||||||
self.assertEqual(mbus_device_2.HOURLY_GAS_METER_READING.value, Decimal('246.138'))
|
self.assertEqual(mbus_device_2.HOURLY_GAS_METER_READING.value, Decimal('246.138'))
|
||||||
|
|
||||||
@ -346,9 +348,11 @@ class TelegramTest(unittest.TestCase):
|
|||||||
telegram = parser.parse(TELEGRAM_V5_TWO_MBUS)
|
telegram = parser.parse(TELEGRAM_V5_TWO_MBUS)
|
||||||
|
|
||||||
mbus_device_1 = telegram.get_mbus_device_by_channel(1)
|
mbus_device_1 = telegram.get_mbus_device_by_channel(1)
|
||||||
|
self.assertEqual(mbus_device_1.DEVICE_TYPE.value, 3)
|
||||||
self.assertEqual(mbus_device_1.EQUIPMENT_IDENTIFIER_GAS.value, None)
|
self.assertEqual(mbus_device_1.EQUIPMENT_IDENTIFIER_GAS.value, None)
|
||||||
self.assertEqual(mbus_device_1.HOURLY_GAS_METER_READING.value, Decimal('0'))
|
self.assertEqual(mbus_device_1.HOURLY_GAS_METER_READING.value, Decimal('0'))
|
||||||
|
|
||||||
mbus_device_2 = telegram.get_mbus_device_by_channel(2)
|
mbus_device_2 = telegram.get_mbus_device_by_channel(2)
|
||||||
|
self.assertEqual(mbus_device_2.DEVICE_TYPE.value, 3)
|
||||||
self.assertEqual(mbus_device_2.EQUIPMENT_IDENTIFIER_GAS.value, '4730303339303031393336393930363139')
|
self.assertEqual(mbus_device_2.EQUIPMENT_IDENTIFIER_GAS.value, '4730303339303031393336393930363139')
|
||||||
self.assertEqual(mbus_device_2.HOURLY_GAS_METER_READING.value, Decimal('246.138'))
|
self.assertEqual(mbus_device_2.HOURLY_GAS_METER_READING.value, Decimal('246.138'))
|
||||||
|
Loading…
Reference in New Issue
Block a user