pycyphal.presentation.subscription_synchronizer package
Submodules
Module contents
- pycyphal.presentation.subscription_synchronizer.get_timestamp_field(item: Tuple[Any, pycyphal.transport._transfer.TransferFrom]) float [source]
Message ordering key function that defines key as the value of the
timestamp
field of the message converted to seconds. The field is expected to be of typeuavcan.time.SynchronizedTimestamp
. This function will fail with an attribute error if such field is not present in the message.
- pycyphal.presentation.subscription_synchronizer.get_local_reception_timestamp(item: Tuple[Any, pycyphal.transport._transfer.TransferFrom]) float [source]
Message ordering key function that defines key as the local system (wall) reception timestamp (in seconds). This function works for messages of any type.
- pycyphal.presentation.subscription_synchronizer.get_local_reception_monotonic_timestamp(item: Tuple[Any, pycyphal.transport._transfer.TransferFrom]) float [source]
Message ordering key function that defines key as the local monotonic reception timestamp (in seconds). This function works for messages of any type. This function may perform worse than the wall time alternative because monotonic timestamp is usually less accurate.
- class pycyphal.presentation.subscription_synchronizer.Synchronizer(subscribers: Iterable[pycyphal.presentation._port._subscriber.Subscriber[Any]])[source]
Bases:
abc.ABC
Synchronizer is used to receive messages from multiple subjects concurrently such that messages that belong to the same group, and only those, are delivered to the application synchronously in one batch. Different synchronization policies may be provided by different implementations of this abstract class.
Related sources:
Caution
Synchronizers may not be notified when the underlying subscribers are closed. That is, closing any or all of the subscribers will not automatically unblock data consumers blocked on their synchronizer. This may be changed later.
Warning
This API (incl. all derived types) is experimental and subject to breaking changes.
- __init__(subscribers: Iterable[pycyphal.presentation._port._subscriber.Subscriber[Any]]) None [source]
- property subscribers: tuple[Subscriber[Any], ...][source]
The set of subscribers whose outputs are synchronized. The ordering matches that of the output data.
- async get(timeout: float = 0) tuple[Any, ...] | None [source]
Like
receive_for()
but without transfer metadata, only message objects.
- abstract receive_in_background(handler: Callable[[...], None]) None [source]
See
pycyphal.presentation.Subscriber
. The for N subscribers, the callback receives N tuples ofMessageWithMetadata
.
- get_in_background(handler: Callable[[...], None]) None [source]
This is like
receive_in_background()
but the callback receives message objects directly rather than the tuples of (message, metadata). The two methods cannot be used concurrently.
- __aiter__() pycyphal.presentation.subscription_synchronizer._common.Synchronizer [source]
Iterator API support. Returns self unchanged.
- async __anext__() tuple[tuple[MessageWithMetadata, Subscriber[Any]], ...] [source]
This is like
receive()
with an infinite timeout, so it always returns something. Iteration stops when the instance isclose()
d.The return type is not just a message with metadata but is a tuple of that with its subscriber. The reason we need the subscriber here is to enhance usability because it is not possible to use
zip
,enumerate
, and other combinators with async iterators. The typical usage is then like (synchronizing two subjects here):async for (((msg_a, meta_a), subscriber_a), ((msg_b, meta_b), subscriber_b),) in synchronizer: ...