import typing
import async_generator
import attr
import pymodbus.client.asynchronous.schedulers
import pymodbus.client.asynchronous.tcp
import pymodbus.client.asynchronous.trio
import pymodbus.client.common
import sunspec2.mb
import sunspec2.modbus.client
import pymodbus.pdu
import sundog
@async_generator.asynccontextmanager
async def open_client(host: str, port: int) -> typing.AsyncIterator["Client"]:
    """Provide a SunSpec client connected over Modbus TCP for the life of the context.

    A trio-scheduled pymodbus TCP client and a fresh pysunspec2 device object are
    created, then the :class:`Client` wrapping them is yielded from inside the
    Modbus client's ``manage_connection()`` context.

    Arguments:
        host: The host name or IP address of the device.
        port: The TCP port number of the device.

    Yields:
        The SunSpec client bound to the managed connection.
    """
    tcp_client = pymodbus.client.asynchronous.tcp.AsyncModbusTCPClient(
        scheduler=pymodbus.client.asynchronous.schedulers.TRIO,
        host=host,
        port=port,
    )
    device = sunspec2.modbus.client.SunSpecModbusClientDevice()

    async with tcp_client.manage_connection() as connected_protocol:
        client = Client(
            modbus_client=tcp_client,
            sunspec_device=device,
            protocol=connected_protocol,
        )
        yield client
@attr.s(auto_attribs=True)
class Client:
    """A SunSpec Modbus TCP client using :mod:`trio` support in :mod:`pymodbus` for
    communication and `pysunspec2` for loading models and holding the local cache of
    the data.  The existing communication abilities of the `pysunspec2` objects are
    left intact but should not be used.

    .. automethod:: __getitem__
    """

    modbus_client: pymodbus.client.asynchronous.trio.TrioModbusTcpClient
    """The Modbus TCP client used for communication."""
    protocol: pymodbus.client.common.ModbusClientMixin
    """The Modbus client protocol."""
    sunspec_device: sunspec2.modbus.client.SunSpecModbusClientDevice
    """The SunSpec device object that holds the local data cache and model structures.
    """

    def __getitem__(
        self, item: typing.Union[int, str]
    ) -> sunspec2.modbus.client.SunSpecModbusClientModel:
        """SunSpec models are accessible by indexing the client using either the model
        number or model name.

        .. code-block:: python

            model_1 = client[1]
            model_common = client["common"]
            assert model_1 is model_common

        Arguments:
            item: The model number or model name to look up.

        Returns:
            The requested model.

        Raises:
            ValueError: When the lookup does not hold exactly one model, since the
                single-element unpacking below requires exactly one entry.
        """
        # The device's lookup yields a sequence; exactly one entry is expected.
        [model] = self.sunspec_device.models[item]
        return model

    async def scan(self) -> None:
        """Scan the device to identify the base address, if not already set, and
        collect the model list.  This also populates all the data.

        Raises:
            sundog.BaseAddressNotFoundError: When no candidate base address holds
                the SunSpec sentinel.
            sundog.InvalidBaseAddressError: When the already configured base
                address does not hold the SunSpec sentinel.
            sundog.ModbusError: When a Modbus exception response is received.
        """
        # The sentinel is a byte string; registers are two bytes each.
        sentinel_register_count = len(sundog.base_address_sentinel) // 2

        if self.sunspec_device.base_addr is None:
            # Probe each candidate address and keep the first holding the sentinel.
            for maybe_base_address in self.sunspec_device.base_addr_list:
                read_bytes = await self.read_registers(
                    address=maybe_base_address,
                    count=sentinel_register_count,
                )
                if read_bytes == sundog.base_address_sentinel:
                    self.sunspec_device.base_addr = maybe_base_address
                    break
            else:
                # for/else: reached only when no candidate matched.
                raise sundog.BaseAddressNotFoundError(
                    addresses=self.sunspec_device.base_addr_list
                )
        else:
            # A base address was preset; verify it rather than trusting it.
            read_bytes = await self.read_registers(
                address=self.sunspec_device.base_addr,
                count=sentinel_register_count,
            )
            if read_bytes != sundog.base_address_sentinel:
                raise sundog.InvalidBaseAddressError(
                    address=self.sunspec_device.base_addr,
                    value=read_bytes,
                )

        # The first model header immediately follows the sentinel.
        address = self.sunspec_device.base_addr + sentinel_register_count

        # Each model starts with a one register ID and a one register length.
        model_id_length = 1
        model_length_length = 1
        header_length = model_id_length + model_length_length

        while True:
            model_address = address

            read_bytes = await self.read_registers(
                address=model_address, count=model_id_length
            )
            maybe_model_id = int.from_bytes(read_bytes, byteorder="big", signed=False)
            if maybe_model_id == sunspec2.mb.SUNS_END_MODEL_ID:
                # The end marker terminates the model list.
                break
            model_id = maybe_model_id

            read_bytes = await self.read_registers(
                address=model_address + model_id_length, count=model_length_length
            )
            model_length = int.from_bytes(read_bytes, byteorder="big", signed=False)

            # The reported length excludes the header, so add it back to read the
            # whole model starting from its ID register.
            whole_model_length = header_length + model_length
            # This read must be awaited; otherwise a coroutine object, not the
            # register bytes, would be handed to the model as its data.
            model_data = await self.read_registers(
                address=model_address, count=whole_model_length
            )
            address += whole_model_length

            model = sunspec2.modbus.client.SunSpecModbusClientModel(
                model_id=model_id,
                model_addr=model_address,
                model_len=model_length,
                data=model_data,
                mb_device=self.sunspec_device,
            )
            self.sunspec_device.add_model(model)

    # TODO: should the local data be updated?
    async def read_registers(self, address: int, count: int) -> bytes:
        """Read from the specified sequential register range in the device.  Based on
        the 16-bit Modbus register size, the data in the returned bytes is in 2-byte
        chunks with each having a big-endian byte order.  The local data is not
        updated.

        Arguments:
            address: The first register to read.
            count: The total number of sequential registers to read.

        Returns:
            The raw bytes read from the device.

        Raises:
            sundog.ModbusError: When a Modbus exception response is received.
        """
        response = await self.protocol.read_holding_registers(
            address=address, count=count, unit=0x01
        )

        if isinstance(response, pymodbus.pdu.ExceptionResponse):
            raise sundog.ModbusError(exception=response)

        # NOTE(review): this assumes ``response.registers`` is already a bytes-like
        # sequence of raw register data.  If it is a list of 16-bit integers, each
        # would need packing as two big-endian bytes instead -- confirm against the
        # protocol implementation in use.
        return bytes(response.registers)

    async def read_point(
        self, point: sunspec2.modbus.client.SunSpecModbusClientPoint
    ) -> typing.Union[float, int]:
        """Read the passed point from the device and update the local data.  When the
        point has a scale factor, the scale factor point is read first.

        Arguments:
            point: The SunSpec point object to read.

        Returns:
            The new computed value of the point.

        Raises:
            sundog.ModbusError: When a Modbus exception response is received.
        """
        if point.sf is not None:
            # Refresh the scale factor first so the computed value is current.
            await self.read_point(point=point.model.points[point.sf])

        read_bytes = await self.read_registers(
            address=self.point_address(point=point),
            count=point.len,
        )
        point.set_mb(data=read_bytes)

        if point.pdef["type"] == "sunssf":
            # This point is itself a scale factor: push its new value to every
            # point in the model that references it by name.
            for other_point in point.model.points.values():
                if other_point.sf == point.pdef["name"]:
                    # Capture the computed value before changing the scale factor
                    # and assign it back afterwards -- presumably so the point
                    # keeps the same computed value under the new scale factor.
                    other_cvalue = other_point.cvalue
                    other_point.sf_value = point.cvalue
                    if other_cvalue is not None:
                        other_point.cvalue = other_cvalue

        return point.cvalue  # type: ignore[no-any-return]

    def point_address(
        self, point: sunspec2.modbus.client.SunSpecModbusClientPoint
    ) -> int:
        """Calculate the start address of a given SunSpec point.

        Arguments:
            point: The SunSpec point object to locate.

        Returns:
            The address of the first register of the point.
        """
        # The point's offset is relative to the start address of its model.
        return point.model.model_addr + point.offset  # type: ignore[no-any-return]

    async def write_registers(self, address: int, values: bytes) -> None:
        """Write to the specified sequential register range in the device.  Based on
        the 16-bit Modbus register size, the data in the passed bytes should be in
        2-byte chunks with each having a big-endian byte order.  The local data is
        not updated.

        Arguments:
            address: The first register to write.
            values: The raw bytes to be written to the device.

        Raises:
            sundog.ModbusError: When a Modbus exception response is received.
        """
        # NOTE(review): the bytes are passed through unchanged; this assumes the
        # protocol's ``write_registers`` accepts raw bytes for ``values`` -- confirm
        # against the protocol implementation in use.
        response = await self.protocol.write_registers(
            address=address, values=values, unit=0x01
        )

        if isinstance(response, pymodbus.pdu.ExceptionResponse):
            raise sundog.ModbusError(exception=response)

    async def write_point(
        self, point: sunspec2.modbus.client.SunSpecModbusClientPoint
    ) -> None:
        """Write the passed point from the local data to the device.  When the point
        has a scale factor, the scale factor point is read from the device first.

        Arguments:
            point: The SunSpec point object to write.

        Raises:
            sundog.ModbusError: When a Modbus exception response is received.
        """
        if point.sf is not None:
            # Refresh the scale factor before encoding the point's registers.
            await self.read_point(point=point.model.points[point.sf])

        bytes_to_write = point.get_mb()
        await self.write_registers(
            address=self.point_address(point=point),
            values=bytes_to_write,
        )