pycyphal.presentation.subscription_synchronizer package

Submodules

Module contents

pycyphal.presentation.subscription_synchronizer.get_timestamp_field(item: Tuple[Any, TransferFrom]) -> float

Message ordering key function that defines the key as the value of the timestamp field of the message, converted to seconds. The field is expected to be of type uavcan.time.SynchronizedTimestamp. This function fails with an attribute error if no such field is present in the message.
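A minimal sketch of such a key function, written against stand-in objects rather than real DSDL-generated messages. The name `timestamp_field_key` is hypothetical, and the `microsecond` attribute is an assumption modeled on uavcan.time.SynchronizedTimestamp; this is not the library's own implementation.

```python
from types import SimpleNamespace
from typing import Any, Tuple


def timestamp_field_key(item: Tuple[Any, Any]) -> float:
    """Ordering key: the message's `timestamp` field, in seconds."""
    message, _metadata = item
    # Assumption: the field stores an integer microsecond count.
    return float(message.timestamp.microsecond) * 1e-6


# Stand-in for a message stamped at 1,500,000 microseconds.
stamped = SimpleNamespace(timestamp=SimpleNamespace(microsecond=1_500_000))
key = timestamp_field_key((stamped, None))

# A message without the field raises AttributeError, as described above.
try:
    timestamp_field_key((SimpleNamespace(value=42), None))
    missing_field_detected = False
except AttributeError:
    missing_field_detected = True
```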

pycyphal.presentation.subscription_synchronizer.get_local_reception_timestamp(item: Tuple[Any, TransferFrom]) -> float

Message ordering key function that defines the key as the local system (wall) reception timestamp, in seconds. This function works for messages of any type.

pycyphal.presentation.subscription_synchronizer.get_local_reception_monotonic_timestamp(item: Tuple[Any, TransferFrom]) -> float

Message ordering key function that defines the key as the local monotonic reception timestamp, in seconds. This function works for messages of any type. It may perform worse than the wall time alternative because the monotonic timestamp is usually less accurate.
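The two reception-time keys differ only in which clock they read from the transfer metadata. The sketch below illustrates this with stand-in classes; `FakeTimestamp`, `FakeTransferFrom`, `wall_key`, `monotonic_key`, and `make_item` are hypothetical names, and the `system` and `monotonic` attributes are assumptions about the shape of the metadata timestamp.

```python
from dataclasses import dataclass
from decimal import Decimal
from typing import Any, Tuple


@dataclass(frozen=True)
class FakeTimestamp:
    """Stand-in for the reception timestamp of a transfer (assumed shape)."""
    system: Decimal     # wall-clock reception time, seconds
    monotonic: Decimal  # monotonic-clock reception time, seconds


@dataclass(frozen=True)
class FakeTransferFrom:
    """Stand-in for TransferFrom; only the timestamp is modeled."""
    timestamp: FakeTimestamp


def wall_key(item: Tuple[Any, FakeTransferFrom]) -> float:
    return float(item[1].timestamp.system)


def monotonic_key(item: Tuple[Any, FakeTransferFrom]) -> float:
    return float(item[1].timestamp.monotonic)


def make_item(message: Any, system: str, monotonic: str) -> Tuple[Any, FakeTransferFrom]:
    stamp = FakeTimestamp(Decimal(system), Decimal(monotonic))
    return message, FakeTransferFrom(stamp)


# The keys never inspect the message, so any message type works.
items = [
    make_item("late", "1700000002.5", "12.5"),
    make_item(123, "1700000001.0", "11.0"),
]
by_wall = [msg for msg, _ in sorted(items, key=wall_key)]
by_monotonic = [msg for msg, _ in sorted(items, key=monotonic_key)]
```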

class pycyphal.presentation.subscription_synchronizer.Synchronizer(subscribers: Iterable[Subscriber[Any]])

Bases: ABC

Synchronizer is used to receive messages from multiple subjects concurrently such that messages that belong to the same group, and only those, are delivered to the application synchronously in one batch. Different synchronization policies may be provided by different implementations of this abstract class.
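To make the notion of a group concrete, here is a toy offline policy that batches items whose ordering keys lie within a tolerance of each other. It is only an illustration of the idea under assumed inputs (pre-recorded sequences and a hypothetical `group_by_key` helper); it does not reproduce any synchronizer shipped with the library.

```python
from typing import Any, Callable, List, Sequence, Tuple


def group_by_key(
    streams: Sequence[Sequence[Any]],
    key: Callable[[Any], float],
    tolerance: float,
) -> List[Tuple[Any, ...]]:
    """For each item of the first stream, pick the closest item from every
    other stream; emit the group only if all picks are within tolerance."""
    first, *rest = streams
    groups: List[Tuple[Any, ...]] = []
    for anchor in first:
        group = [anchor]
        for other in rest:
            if not other:
                break  # nothing to pair with, so no complete group
            closest = min(other, key=lambda x: abs(key(x) - key(anchor)))
            if abs(key(closest) - key(anchor)) > tolerance:
                break  # this stream has no match, so drop the anchor
            group.append(closest)
        else:
            groups.append(tuple(group))  # every stream contributed one item
    return groups


# Items are (key_in_seconds, payload) pairs from two hypothetical subjects.
stream_a = [(0.00, "a0"), (1.00, "a1"), (2.00, "a2")]
stream_b = [(0.01, "b0"), (1.40, "b1"), (2.02, "b2")]
groups = group_by_key([stream_a, stream_b], key=lambda x: x[0], tolerance=0.1)
```

Here `a1` has no partner within the tolerance, so it is not delivered at all; only complete groups reach the consumer.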

Related sources:

Caution

Synchronizers may not be notified when the underlying subscribers are closed. That is, closing any or all of the subscribers will not automatically unblock data consumers blocked on their synchronizer. This may be changed later.

Warning

This API (incl. all derived types) is experimental and subject to breaking changes.

__init__(subscribers: Iterable[Subscriber[Any]]) -> None

property subscribers: tuple[Subscriber[Any], ...]

The set of subscribers whose outputs are synchronized. The ordering matches that of the output data.

abstract async receive_for(timeout: float) -> Tuple[Tuple[Any, TransferFrom], ...] | None

See pycyphal.presentation.Subscriber

async receive(monotonic_deadline: float) -> Tuple[Tuple[Any, TransferFrom], ...] | None

See pycyphal.presentation.Subscriber
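The deadline-based and timeout-based variants can be related as sketched below. This assumes the deadline is expressed on the asyncio event loop's clock (`loop.time()`), as is conventional for the subscriber API; `receive_with_deadline` and `fake_receive_for` are hypothetical names used only for illustration.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


async def receive_with_deadline(
    receive_for: Callable[[float], Awaitable[Any]],
    monotonic_deadline: float,
) -> Any:
    """Translate an absolute event-loop deadline into a relative timeout."""
    loop = asyncio.get_running_loop()
    timeout = max(0.0, monotonic_deadline - loop.time())
    return await receive_for(timeout)


async def _demo() -> float:
    seen: List[float] = []

    async def fake_receive_for(timeout: float) -> None:
        seen.append(timeout)  # record the timeout instead of waiting
        return None

    loop = asyncio.get_running_loop()
    await receive_with_deadline(fake_receive_for, loop.time() + 0.5)
    return seen[0]


observed_timeout = asyncio.run(_demo())
```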

async get(timeout: float = 0) -> tuple[Any, ...] | None

Like receive_for() but without transfer metadata, only message objects.
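The relationship between the two return shapes can be sketched as a plain transformation. The helper name `strip_metadata` is hypothetical, and the strings stand in for real messages and metadata.

```python
from typing import Any, Optional, Tuple


def strip_metadata(
    batch: Optional[Tuple[Tuple[Any, Any], ...]],
) -> Optional[Tuple[Any, ...]]:
    """Keep only the message objects of a synchronized batch."""
    if batch is None:  # timeout: nothing was received
        return None
    return tuple(message for message, _metadata in batch)


batch = (("msg_a", "meta_a"), ("msg_b", "meta_b"))
messages = strip_metadata(batch)
```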

abstract receive_in_background(handler: Callable[[...], None]) -> None

See pycyphal.presentation.Subscriber. For N subscribers, the callback receives N tuples of MessageWithMetadata.

get_in_background(handler: Callable[[...], None]) -> None

This is like receive_in_background() but the callback receives message objects directly rather than the tuples of (message, metadata). The two methods cannot be used concurrently.
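The difference between the two callback shapes can be shown with a small adapter: a handler that expects bare messages is wrapped so that it accepts (message, metadata) tuples. The name `messages_only` is hypothetical, and the strings stand in for real messages and metadata.

```python
from typing import Any, Callable, List, Tuple


def messages_only(handler: Callable[..., None]) -> Callable[..., None]:
    """Wrap a message-only handler so it accepts (message, metadata) tuples."""
    def wrapper(*items: Tuple[Any, Any]) -> None:
        handler(*(message for message, _metadata in items))
    return wrapper


received: List[Tuple[Any, Any]] = []


def on_messages(msg_a: Any, msg_b: Any) -> None:
    # One positional argument per synchronized subscriber.
    received.append((msg_a, msg_b))


wrapped = messages_only(on_messages)
wrapped(("msg_a", "meta_a"), ("msg_b", "meta_b"))
```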

__aiter__() -> Synchronizer

Iterator API support. Returns self unchanged.

async __anext__() -> tuple[tuple[Tuple[Any, TransferFrom], Subscriber[Any]], ...]

This is like receive() with an infinite timeout, so it always returns something. Iteration stops when the instance is close()d.

The return type is not just a message with metadata but a tuple of that with its subscriber. The subscriber is included to improve usability, because zip, enumerate, and other combinators cannot be used with async iterators. Typical usage, here synchronizing two subjects:

async for (((msg_a, meta_a), subscriber_a), ((msg_b, meta_b), subscriber_b),) in synchronizer:
    ...
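
The unpacking pattern above can be tried without a network by iterating over a stand-in object. `FakeSynchronizer` below is hypothetical: it only replays pre-recorded groups through the async iterator protocol, and strings stand in for messages, metadata, and subscribers.

```python
import asyncio
from typing import Any, List, Tuple


class FakeSynchronizer:
    """Stand-in that replays pre-recorded synchronized groups."""

    def __init__(self, groups: List[Tuple[Any, ...]]) -> None:
        self._groups = list(groups)

    def __aiter__(self) -> "FakeSynchronizer":
        return self

    async def __anext__(self) -> Tuple[Any, ...]:
        if not self._groups:
            raise StopAsyncIteration  # plays the role of a closed instance
        return self._groups.pop(0)


async def collect(synchronizer: FakeSynchronizer) -> List[Tuple[Any, ...]]:
    rows = []
    async for ((msg_a, meta_a), sub_a), ((msg_b, meta_b), sub_b) in synchronizer:
        rows.append((msg_a, msg_b, sub_a, sub_b))
    return rows


recorded = [
    ((("a0", "meta"), "sub_a"), (("b0", "meta"), "sub_b")),
    ((("a1", "meta"), "sub_a"), (("b1", "meta"), "sub_b")),
]
collected = asyncio.run(collect(FakeSynchronizer(recorded)))
```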

close() -> None

Idempotent.

__repr__() -> str