pycyphal.presentation.subscription_synchronizer.transfer_id module

class pycyphal.presentation.subscription_synchronizer.transfer_id.TransferIDSynchronizer(subscribers: Iterable[Subscriber[Any]], span: int = 30)

Bases: Synchronizer

Messages that share the same (source node-ID, transfer-ID) are assumed synchronous (i.e., all messages in a synchronized group always originate from the same node). Each received message is used at most once (it follows that the output frequency is not higher than the frequency of the slowest subject). Anonymous messages are dropped unconditionally (because the source node-ID is not defined for them).

The Cyphal Specification does not recommend this mode of synchronization, but it is provided for completeness. If unsure, use one of the other synchronizers instead.
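The matching rule described above (same source node-ID and transfer-ID, anonymous messages dropped, each message used at most once) can be illustrated without any transport. The following is a minimal sketch, not the library's implementation: the class TransferIDGrouper and its push() method are hypothetical names invented for this illustration, and subjects are identified by a plain integer index.

```python
from typing import Any, Dict, List, Optional, Tuple


class TransferIDGrouper:
    """Illustrative stand-in (hypothetical); not the pycyphal implementation."""

    def __init__(self, subject_count: int) -> None:
        self._subject_count = subject_count
        # (source node-ID, transfer-ID) -> one slot per subject; an empty slot is None.
        self._clusters: Dict[Tuple[int, int], List[Optional[Tuple[Any]]]] = {}

    def push(
        self,
        subject_index: int,
        source_node_id: Optional[int],
        transfer_id: int,
        message: Any,
    ) -> Optional[Tuple[Any, ...]]:
        """Return a complete group once every subject has contributed, else None."""
        if source_node_id is None:
            return None  # Anonymous message: there is no source node-ID to match on.
        key = (source_node_id, transfer_id)
        slots = self._clusters.setdefault(key, [None] * self._subject_count)
        # The 1-tuple wrapper keeps falsy payloads distinct from an empty slot.
        slots[subject_index] = (message,)
        if any(slot is None for slot in slots):
            return None
        del self._clusters[key]  # Consume the cluster: each message is used at most once.
        return tuple(slot[0] for slot in slots)


grouper = TransferIDGrouper(subject_count=3)
grouper.push(0, 1234, 0, 123)            # Incomplete group, returns None.
grouper.push(1, 1234, 0, 321)            # Still incomplete, returns None.
first = grouper.push(2, 1234, 0, True)   # Completes the group: (123, 321, True)
anonymous = grouper.push(0, None, 1, 9)  # Dropped, returns None.
```

The arrival order within a group does not matter in this sketch, because the output tuple is ordered by subject index rather than by time of reception.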

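Prepare some scaffolding for the demo (doctest_await is a helper supplied by the doctest environment that runs an awaitable to completion; it is not defined in this module):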

>>> import asyncio
>>> from uavcan.primitive.scalar import Integer64_1, Bit_1
>>> from pycyphal.transport.loopback import LoopbackTransport
>>> from pycyphal.presentation import Presentation
>>> pres = Presentation(LoopbackTransport(1234))
>>> pub_a = pres.make_publisher(Integer64_1, 2000)
>>> pub_b = pres.make_publisher(Integer64_1, 2001)
>>> pub_c = pres.make_publisher(Bit_1, 2002)
>>> sub_a = pres.make_subscriber(pub_a.dtype, pub_a.port_id)
>>> sub_b = pres.make_subscriber(pub_b.dtype, pub_b.port_id)
>>> sub_c = pres.make_subscriber(pub_c.dtype, pub_c.port_id)

Set up the synchronizer. It will take ownership of our subscribers:

>>> from pycyphal.presentation.subscription_synchronizer.transfer_id import TransferIDSynchronizer
>>> synchronizer = TransferIDSynchronizer([sub_a, sub_b, sub_c])

Publish some messages in an arbitrary order:

>>> _ = doctest_await(pub_a.publish(Integer64_1(123)))
>>> _ = doctest_await(pub_b.publish(Integer64_1(321)))
>>> _ = doctest_await(pub_c.publish(Bit_1(True)))
>>> doctest_await(asyncio.sleep(1.0))               # Wait a little and publish another group.
>>> _ = doctest_await(pub_c.publish(Bit_1(False)))
>>> _ = doctest_await(pub_b.publish(Integer64_1(654)))
>>> _ = doctest_await(pub_a.publish(Integer64_1(456)))
>>> doctest_await(asyncio.sleep(1.0))
>>> _ = doctest_await(pub_b.publish(Integer64_1(654)))  # This group is incomplete, no output produced.
>>> doctest_await(asyncio.sleep(1.0))

Now the synchronizer will automatically sort our messages into well-defined synchronized groups:

>>> doctest_await(synchronizer.get())  # First group.
(...Integer64.1...(value=123), ...Integer64.1...(value=321), ...Bit.1...(value=True))
>>> doctest_await(synchronizer.get())  # Second group.
(...Integer64.1...(value=456), ...Integer64.1...(value=654), ...Bit.1...(value=False))
>>> doctest_await(synchronizer.get()) is None  # No more groups.
True

Closing the synchronizer will also close all subscribers we passed to it (if necessary you can create additional subscribers for the same subjects):

>>> synchronizer.close()

DEFAULT_SPAN = 30
__init__(subscribers: Iterable[Subscriber[Any]], span: int = 30) → None
Parameters:
  • subscribers – The set of subscribers to synchronize data from. The constructed instance takes ownership of the subscribers – they will be closed on close().

  • span – Old clusters are removed so that the sequence-number delta between the oldest and the newest cluster does not exceed this limit. This protects against mismatches when a cyclic transfer-ID is used, and it bounds the time and memory requirements.
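The effect of span can be sketched as follows. This is only an illustration under stated assumptions, not the library's code: it assumes that each cluster is stamped with a monotonically increasing sequence number when it is created, and the names BoundedClusterStore and get_or_create() are hypothetical.

```python
from typing import Any, Dict, Hashable, Tuple


class BoundedClusterStore:
    """Hypothetical cluster store that evicts entries older than `span`."""

    def __init__(self, span: int = 30) -> None:
        self._span = span
        self._next_seq = 0  # Assumed: a counter incremented per new cluster.
        # key -> (sequence number at creation, per-subject messages)
        self._clusters: Dict[Hashable, Tuple[int, Dict[int, Any]]] = {}

    def get_or_create(self, key: Hashable) -> Dict[int, Any]:
        entry = self._clusters.get(key)
        if entry is None:
            entry = (self._next_seq, {})
            self._clusters[key] = entry
            self._next_seq += 1
            self._prune()
        return entry[1]

    def _prune(self) -> None:
        newest = self._next_seq - 1
        # Drop every cluster whose distance from the newest one exceeds the span.
        stale = [k for k, (seq, _) in self._clusters.items() if newest - seq > self._span]
        for k in stale:
            del self._clusters[k]

    def __len__(self) -> int:
        return len(self._clusters)

    def __contains__(self, key: Hashable) -> bool:
        return key in self._clusters


store = BoundedClusterStore(span=2)
for key in ("k0", "k1", "k2"):
    store.get_or_create(key)  # Sequence numbers 0, 1, 2: all within the span.
store.get_or_create("k3")     # Sequence number 3: "k0" is now 3 behind and is evicted.
```

With a bounded store like this, an incomplete cluster cannot linger long enough to be matched against a later message that reuses the same transfer-ID after wraparound, and memory use stays proportional to span.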

async receive_for(timeout: float) → Tuple[Tuple[Any, TransferFrom], ...] | None

receive_in_background(handler: Callable[[...], None]) → None