# Source code for ewoksfluo.xrffit.queue_messages

from typing import NamedTuple, Optional, Union
import numpy
from typing_extensions import TypeGuard


class StopMsg(NamedTuple):
    """Queue message carrying the ``"STOP"`` command tag.

    Holds only ``sendid`` and the ``cmd`` tag, which defaults to ``"STOP"``.
    """

    # NOTE(review): presumably identifies the sender of the message -- confirm.
    sendid: int
    # Command tag used to tell message kinds apart.
    cmd: str = "STOP"
class GroupMsg(NamedTuple):
    """Queue message carrying the ``"GROUP"`` command tag.

    Bundles a group name with a ``dict`` payload; ``cmd`` defaults to
    ``"GROUP"``.
    """

    # NOTE(review): presumably identifies the sender of the message -- confirm.
    sendid: int
    # NOTE(review): presumably identifies the intended receiver -- confirm.
    destinationid: int
    # Group name (assumed to be a path-like name in the output -- TODO confirm).
    group: str
    # Payload attached to the group; its schema is not visible here.
    data: dict
    # Command tag used to tell message kinds apart.
    cmd: str = "GROUP"
class DatasetMsg(NamedTuple):
    """Queue message carrying the ``"DATASET"`` command tag.

    Describes a named dataset inside a group (number of points, attributes
    and an optional number of scans); ``cmd`` defaults to ``"DATASET"``.
    ``nscans`` has no default and must always be passed, possibly as ``None``.
    """

    # NOTE(review): presumably identifies the sender of the message -- confirm.
    sendid: int
    # NOTE(review): presumably identifies the intended receiver -- confirm.
    destinationid: int
    # Name of the group the dataset belongs to.
    group: str
    # Name of the dataset.
    name: str
    # Number of points (assumed to size the dataset -- TODO confirm).
    npoints: int
    # Attributes attached to the dataset; schema is not visible here.
    attrs: dict
    # Number of scans, or None (meaning of None not visible here -- confirm).
    nscans: Optional[int]
    # Command tag used to tell message kinds apart.
    cmd: str = "DATASET"
class DataMsg(NamedTuple):
    """Queue message carrying the ``"DATA"`` command tag.

    Transports an array value for a named dataset inside a group, together
    with an optional scan index; ``cmd`` defaults to ``"DATA"``.
    ``scan_index`` has no default and must always be passed, possibly as
    ``None``.
    """

    # NOTE(review): presumably identifies the sender of the message -- confirm.
    sendid: int
    # NOTE(review): presumably identifies the intended receiver -- confirm.
    destinationid: int
    # Name of the group the dataset belongs to.
    group: str
    # Name of the dataset the value is meant for.
    name: str
    # Array payload; shape and dtype are not fixed by this module.
    value: numpy.ndarray
    # Scan index, or None (meaning of None not visible here -- confirm).
    scan_index: Optional[int]
    # Command tag used to tell message kinds apart.
    cmd: str = "DATA"
# Union of every message type defined in this module; used as the parameter
# type of the is_*Msg type guards.
QueueMsg = Union[StopMsg, GroupMsg, DatasetMsg, DataMsg]
def is_StopMsg(msg: QueueMsg) -> TypeGuard[StopMsg]:
    """Return ``True`` when ``msg.cmd`` equals ``"STOP"``.

    The check is made on the ``cmd`` tag, not on the class of ``msg``; the
    ``TypeGuard`` return annotation lets static type checkers narrow ``msg``
    to ``StopMsg`` in the true branch.

    :param msg: message to inspect.
    :returns: whether the message carries the ``"STOP"`` tag.
    """
    return msg.cmd == "STOP"
def is_GroupMsg(msg: QueueMsg) -> TypeGuard[GroupMsg]:
    """Return ``True`` when ``msg.cmd`` equals ``"GROUP"``.

    The check is made on the ``cmd`` tag, not on the class of ``msg``; the
    ``TypeGuard`` return annotation lets static type checkers narrow ``msg``
    to ``GroupMsg`` in the true branch.

    :param msg: message to inspect.
    :returns: whether the message carries the ``"GROUP"`` tag.
    """
    return msg.cmd == "GROUP"
def is_DatasetMsg(msg: QueueMsg) -> TypeGuard[DatasetMsg]:
    """Return ``True`` when ``msg.cmd`` equals ``"DATASET"``.

    The check is made on the ``cmd`` tag, not on the class of ``msg``; the
    ``TypeGuard`` return annotation lets static type checkers narrow ``msg``
    to ``DatasetMsg`` in the true branch.

    :param msg: message to inspect.
    :returns: whether the message carries the ``"DATASET"`` tag.
    """
    return msg.cmd == "DATASET"
def is_DataMsg(msg: QueueMsg) -> TypeGuard[DataMsg]:
    """Return ``True`` when ``msg.cmd`` equals ``"DATA"``.

    The check is made on the ``cmd`` tag, not on the class of ``msg``; the
    ``TypeGuard`` return annotation lets static type checkers narrow ``msg``
    to ``DataMsg`` in the true branch.

    :param msg: message to inspect.
    :returns: whether the message carries the ``"DATA"`` tag.
    """
    return msg.cmd == "DATA"