Source code for sundog.server

import functools
import typing

import attr
import pymodbus.datastore
import pymodbus.device
import pymodbus.server.trio
import pymodbus.interfaces
import sunspec2.mb
import sunspec2.modbus.client
import trio

import sundog


# Register address used by this module as the SunSpec base address.
# ``Server.build`` assigns it to the device's ``base_addr`` and lays the models
# out immediately after the base address sentinel that starts here.
base_address = 40_000


@attr.s(auto_attribs=True)
class ModelSummary:
    """Summarize a model by its ID and length.

    While models of fixed length would not need the length provided, those with
    repeatable blocks need a length to indicate the number of repetitions of the
    repeating block.
    """

    id: int
    """The integer model ID."""

    length: int
    """The model length inclusive of the fixed and repeating blocks and exclusive of
    the model's ID and length header."""
@attr.s(auto_attribs=True)
class SunSpecModbusSlaveContext(pymodbus.interfaces.IModbusSlaveContext):
    """A :mod:`pymodbus` slave context that is backed by the ``pysunspec2`` device
    object.

    Every read and write rebuilds the full register image from the device, so the
    device object remains the single source of truth for the served data.
    """

    sunspec_device: sunspec2.modbus.client.SunSpecModbusClientDevice
    """The ``pysunspec2`` device object used for local storage of the SunSpec data."""

    def getValues(self, fx: int, address: int, count: int = 1) -> bytearray:
        """See :meth:`pymodbus.interfaces.IModbusSlaveContext.getValues`.

        Arguments:
            fx: The Modbus function code.  Not used by this implementation.
            address: The first register address to read.
            count: The number of registers to read.

        Returns:
            The raw bytes of the requested registers, two bytes per register.
        """
        request = PreparedRequest.build(
            base_address=self.sunspec_device.base_addr,
            requested_address=address,
            count=count,
            all_registers=self.sunspec_device.get_mb(),
        )
        return request.data[request.slice]

    def setValues(self, fx: int, address: int, values: bytes) -> None:
        """See :meth:`pymodbus.interfaces.IModbusSlaveContext.setValues`.

        Arguments:
            fx: The Modbus function code.  Not used by this implementation.
            address: The first register address to write.
            values: The raw bytes to write, two bytes per register.
        """
        request = PreparedRequest.build(
            base_address=self.sunspec_device.base_addr,
            requested_address=address,
            # The values are raw bytes and each register is two bytes wide.
            count=len(values) // 2,
            all_registers=self.sunspec_device.get_mb(),
        )
        # Patch the written bytes into a copy of the full image, then hand the
        # image back to the device without the leading base address sentinel.
        data = bytearray(request.data)
        data[request.slice] = values
        self.sunspec_device.set_mb(data=data[len(sundog.base_address_sentinel) :])

    def validate(self, fx: int, address: int, count: int = 1) -> bool:
        """See :meth:`pymodbus.interfaces.IModbusSlaveContext.validate`.

        Arguments:
            fx: The Modbus function code.  Not used by this implementation.
            address: The first register address of the request.
            count: The number of registers in the request.

        Returns:
            :obj:`True` if the whole requested range lies between the device's
            base address and the exclusive end address.
        """
        return (
            self.sunspec_device.base_addr <= address
            and address + count <= self._end_address()
        )

    def _end_address(self) -> int:
        """Calculate the exclusive last address.  This is the first address which
        cannot be read.

        Returns:
            The first register address past the served range.
        """
        register_count = (
            len(sundog.base_address_sentinel) + len(self.sunspec_device.get_mb())
        ) // 2
        # Measure from the device's own base address so that the upper bound
        # agrees with the lower bound checked in validate() and with the base
        # used by getValues()/setValues().
        # NOTE(review): the trailing 2 registers are assumed to be the end
        # model's ID and length header -- confirm against the served data.
        return self.sunspec_device.base_addr + register_count + 2
@attr.s(auto_attribs=True)
class Server:
    """A SunSpec Modbus TCP server.

    Communication is handled by the :mod:`trio` support in :mod:`pymodbus` while
    `pysunspec2` is used for loading models and holding the local cache of the
    data.  The actual TCP server can be launched using :meth:`Server.tcp_server`.

    .. code-block:: python

        await nursery.start(
            functools.partial(
                trio.serve_tcp,
                server.tcp_server,
                host="127.0.0.1",
                port=0,
            ),
        )

    .. automethod:: __getitem__
    """

    slave_context: SunSpecModbusSlaveContext
    """The single slave context to be served by this server.  This is backed by the
    SunSpec device object.
    """

    server_context: pymodbus.datastore.ModbusServerContext
    """The datastore for this pymodbus server.  Presently only a single slave context
    is supported."""

    identity: pymodbus.device.ModbusDeviceIdentification
    """The identity information for this Modbus server."""

    @classmethod
    def build(cls, model_summaries: typing.Sequence[ModelSummary]) -> "Server":
        """Create a server providing the models described by the summaries.

        Only the model layout is established here.  Any per-point or bulk data
        update must be done separately.

        Arguments:
            model_summaries: The models which you want the server to provide.

        Returns:
            The instance of the server datastore pieces.
        """
        # The first model starts right after the base address sentinel.
        next_address = base_address + len(sundog.base_address_sentinel) // 2

        device = sunspec2.modbus.client.SunSpecModbusClientDevice()
        device.base_addr = base_address

        for summary in model_summaries:
            device.add_model(
                sunspec2.modbus.client.SunSpecModbusClientModel(
                    model_id=summary.id,
                    model_addr=next_address,
                    model_len=summary.length,
                    mb_device=device,
                )
            )
            # Step over this model's two register ID/length header and its body.
            next_address += 2 + summary.length

        context = SunSpecModbusSlaveContext(sunspec_device=device)
        datastore = pymodbus.datastore.ModbusServerContext(
            slaves=context,
            single=True,
        )
        identity = pymodbus.device.ModbusDeviceIdentification()

        return cls(
            slave_context=context,
            server_context=datastore,
            identity=identity,
        )

    def __getitem__(
        self, item: typing.Union[int, str]
    ) -> sunspec2.modbus.client.SunSpecModbusClientModel:
        """SunSpec models are accessible by indexing the server using either the
        model number or model name.

        .. code-block:: python

            model_1 = server[1]
            model_common = server["common"]
            assert model_1 is model_common

        Arguments:
            item: The integer or string identifying the model.

        Returns:
            The requested model.

        Raises:
            ValueError: If the lookup does not yield exactly one model.
        """
        matches = self.slave_context.sunspec_device.models[item]
        # Unpacking enforces that exactly one model matched.
        (model,) = matches
        return model

    async def tcp_server(self, server_stream: trio.SocketStream) -> None:
        """Serve this server's datastore over a single stream.

        See :class:`Server` for an example.

        Arguments:
            server_stream: The stream to communicate over.
        """
        serve = pymodbus.server.trio.tcp_server
        await serve(
            server_stream=server_stream,
            context=self.server_context,
            identity=self.identity,
        )
@attr.s(auto_attribs=True)
class PreparedRequest:
    """Holds some common bits used in serving a request."""

    data: bytearray
    """The entire block of registers.  Each register is a 2-byte chunk stored in
    big-endian byte order.  The first element is the high byte of the server's
    register located at the base address.
    """

    slice: slice
    """The slice covering the bytes of the registers to be operated on."""

    offset_address: int
    """The offset in 16-bit/2-byte registers relative to the server's base address."""

    bytes_offset_address: int
    """The offset in bytes relative to the server's base address."""

    @classmethod
    def build(
        cls, base_address: int, requested_address: int, count: int, all_registers: bytes
    ) -> "PreparedRequest":
        """Build the instance based on the passed raw request information.

        The register image is laid out as the base address sentinel, then the raw
        registers of all models, then the two register end model header.

        Arguments:
            base_address: The SunSpec base register address.
            requested_address: The requested address.
            count: The requested register count.
            all_registers: The raw register data for all models.

        Returns:
            The prepared request information.
        """
        # This is super lazy, what with building _all_ data even if you only need a
        # register or two.  But, optimize when we need to.
        data = bytearray(sundog.base_address_sentinel)
        data.extend(all_registers)
        data.extend(
            sunspec2.mb.SUNS_END_MODEL_ID.to_bytes(
                length=2, byteorder="big", signed=False
            )
        )
        # The end model is a complete header: its ID is followed by a length
        # register holding zero.  Without it the image is one register shorter
        # than the two trailing registers the slave context's address validation
        # allows, so a validated request reaching the last register would get a
        # short slice.
        data.extend((0).to_bytes(length=2, byteorder="big", signed=False))

        offset_address = requested_address - base_address
        # Registers are two bytes wide, so byte offsets are twice the register
        # offsets.
        bytes_offset_address = 2 * offset_address

        return cls(
            data=data,
            slice=slice(bytes_offset_address, 2 * (offset_address + count)),
            offset_address=offset_address,
            bytes_offset_address=bytes_offset_address,
        )