pycyphal.presentation.subscription_synchronizer.monotonic_clustering module

class pycyphal.presentation.subscription_synchronizer.monotonic_clustering.MonotonicClusteringSynchronizer(subscribers: Iterable[pycyphal.presentation.Subscriber[Any]], f_key: KeyFunction, tolerance: float, *, depth: int = 15)

Bases: Synchronizer

Messages are clustered by the message ordering key with the specified tolerance. The key shall be monotonically non-decreasing except under special circumstances such as time adjustment. Once a full cluster is collected, it is delivered to the application, and this and all older clusters are dropped (where “older” means smaller key). Each received message is used at most once (it follows that the output frequency is not higher than the frequency of the slowest subject). If a given cluster receives multiple messages from the same subject, the latest one is used (this situation occurs if the subjects are updated at different rates).

The maximum number of clusters, or depth, is limited (the oldest cluster is dropped when the limit is exceeded). This is needed to address the case when the message ordering key leaps backward (for example, if the synchronized time is adjusted), because some clusters may end up in the future and there needs to be a mechanism in place to remove them. This is also necessary to ensure that the worst-case complexity is well-bounded.

Old cluster removal is based on a simple non-overflowing sequence counter whose value is assigned to each new cluster and then incremented; when the depth limit is exceeded, the cluster with the smallest sequence number is dropped. This approach allows us to reason about temporal ordering even if the key is not monotonically non-decreasing.

This synchronizer is well-suited for use in real-time embedded systems, where the clustering logic can be based on Cavl + O1Heap. The attainable worst-case time complexity is O(log d), where d is the depth limit; the memory requirement is c*s, where c is the number of clusters (at most d) and s is the number of subscribers, assuming unit message size.
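The clustering rules described above can be sketched in plain Python. This is an illustrative model only, not the library's implementation: the names `ClusteringSketch`, `_Cluster`, and `push` are hypothetical, the nearest-cluster selection is an assumption, and the linear scans make it O(d) per message rather than the O(log d) attainable with a tree-based container.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, List, Optional, Tuple

_MISSING = object()  # Marks a subject slot that has not received a message yet.


@dataclass
class _Cluster:
    key: float        # Key of the message that opened the cluster.
    seq_no: int       # Creation order; assigned once and never reused.
    slots: List[Any]  # One slot per subject.


class ClusteringSketch:
    """Hypothetical model of key-based clustering with a depth limit."""

    def __init__(self, subject_count: int, tolerance: float, depth: int = 15) -> None:
        self.tolerance = tolerance
        self._n = subject_count
        self._depth = depth
        self._clusters: List[_Cluster] = []
        self._seq = 0

    def push(self, subject_index: int, key: float, message: Any) -> Optional[Tuple[Any, ...]]:
        # Look for the closest existing cluster whose key is within the tolerance.
        near = [c for c in self._clusters if abs(c.key - key) <= self.tolerance]
        if near:
            cluster = min(near, key=lambda c: abs(c.key - key))
        else:
            cluster = _Cluster(key, self._seq, [_MISSING] * self._n)
            self._seq += 1
            self._clusters.append(cluster)
            if len(self._clusters) > self._depth:
                # Depth exceeded: drop the cluster with the smallest sequence number.
                self._clusters.remove(min(self._clusters, key=lambda c: c.seq_no))
        cluster.slots[subject_index] = message  # The latest message per subject wins.
        if any(s is _MISSING for s in cluster.slots):
            return None  # The cluster is still incomplete.
        # Complete: deliver it and drop this and all older (smaller-key) clusters.
        self._clusters = [c for c in self._clusters if c.key > cluster.key]
        return tuple(cluster.slots)
```

Feeding messages in key order returns `None` until every subject has contributed to one cluster, at which point the full tuple is returned and the incomplete older clusters are discarded.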

The behavior is illustrated on the following timeline:


Figure: Time synchronization across multiple subjects with jitter, message loss, and publication frequency variation. Time is increasing left to right. Messages that were identified as belonging to the same synchronized group are connected.

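A usage example is provided below. The doctest_await helper used throughout comes from the documentation's test environment and runs a coroutine to completion. First it is necessary to prepare some scaffolding: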

>>> import asyncio
>>> from uavcan.primitive.scalar import Integer64_1, Bit_1
>>> from pycyphal.transport.loopback import LoopbackTransport
>>> from pycyphal.presentation import Presentation
>>> pres = Presentation(LoopbackTransport(1234))
>>> pub_a = pres.make_publisher(Integer64_1, 2000)
>>> pub_b = pres.make_publisher(Integer64_1, 2001)
>>> pub_c = pres.make_publisher(Bit_1, 2002)
>>> sub_a = pres.make_subscriber(pub_a.dtype, pub_a.port_id)
>>> sub_b = pres.make_subscriber(pub_b.dtype, pub_b.port_id)
>>> sub_c = pres.make_subscriber(pub_c.dtype, pub_c.port_id)

Set up the synchronizer. It will take ownership of our subscribers. In this example, we are using the local reception timestamp for synchronization, but we could also use the message timestamp field or any other suitable key by swapping the ordering key function here:

>>> from pycyphal.presentation.subscription_synchronizer import get_local_reception_timestamp
>>> from pycyphal.presentation.subscription_synchronizer.monotonic_clustering import MonotonicClusteringSynchronizer
>>> synchronizer = MonotonicClusteringSynchronizer([sub_a, sub_b, sub_c], get_local_reception_timestamp, 0.1)
>>> synchronizer.tolerance
0.1
>>> synchronizer.tolerance = 0.75  # Tolerance can be changed at any moment.

Publish some messages in an arbitrary order:

>>> _ = doctest_await(pub_a.publish(Integer64_1(123)))
>>> _ = doctest_await(pub_a.publish(Integer64_1(234)))  # Replaces the previous one because newer.
>>> _ = doctest_await(pub_b.publish(Integer64_1(321)))
>>> _ = doctest_await(pub_c.publish(Bit_1(True)))
>>> doctest_await(asyncio.sleep(2.0))               # Wait a little and publish another group.
>>> _ = doctest_await(pub_c.publish(Bit_1(False)))
>>> _ = doctest_await(pub_b.publish(Integer64_1(654)))
>>> _ = doctest_await(pub_a.publish(Integer64_1(456)))
>>> doctest_await(asyncio.sleep(1.5))
>>> _ = doctest_await(pub_a.publish(Integer64_1(789)))
>>> # This group is incomplete because we did not publish on subject B, so no output will be generated.
>>> _ = doctest_await(pub_c.publish(Bit_1(False)))
>>> doctest_await(asyncio.sleep(1.5))
>>> _ = doctest_await(pub_a.publish(Integer64_1(741)))
>>> _ = doctest_await(pub_b.publish(Integer64_1(852)))
>>> _ = doctest_await(pub_c.publish(Bit_1(True)))
>>> doctest_await(asyncio.sleep(1.0))

Now the synchronizer will automatically sort our messages into well-defined synchronized groups:

>>> doctest_await(synchronizer.get())  # First group.
(...Integer64.1...(value=234), ...Integer64.1...(value=321), ...Bit.1...(value=True))
>>> doctest_await(synchronizer.get())  # Second group.
(...Integer64.1...(value=456), ...Integer64.1...(value=654), ...Bit.1...(value=False))
>>> doctest_await(synchronizer.get())  # Fourth group -- the third one was incomplete so dropped.
(...Integer64.1...(value=741), ...Integer64.1...(value=852), ...Bit.1...(value=True))
>>> doctest_await(synchronizer.get()) is None  # No more groups.
True

Closing the synchronizer will also close all subscribers we passed to it (if necessary you can create additional subscribers for the same subjects):

>>> synchronizer.close()

KeyFunction

alias of Callable[[Tuple[Any, TransferFrom]], float]

__init__(subscribers: Iterable[pycyphal.presentation.Subscriber[Any]], f_key: KeyFunction, tolerance: float, *, depth: int = 15) -> None

Parameters:
  • subscribers – The set of subscribers to synchronize data from. The constructed instance takes ownership of the subscribers – they will be closed on close().

  • f_key – Message ordering key function; e.g., pycyphal.presentation.subscription_synchronizer.get_local_reception_timestamp(). Any monotonic non-decreasing function of the received message with its metadata is acceptable, and it doesn’t necessarily have to be time-related.

  • tolerance – Messages whose absolute key difference does not exceed this limit will be clustered together. This value can be changed dynamically, which can be leveraged for automatic tolerance configuration as some function of the output frequency.

  • depth – At most this many newest clusters will be maintained at any moment. This limits the time and memory requirements. If the depth is too small, some valid clusters may be dropped prematurely.
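As noted for f_key, the ordering key does not have to be time-related. The sketch below shows a key function with the shape of KeyFunction, taking a (message, metadata) tuple and returning a float. The `FakeMessage` and `FakeMetadata` types and the `sample_index` field are hypothetical stand-ins used only to keep the example self-contained; a real application would receive generated message objects and TransferFrom metadata instead.

```python
from dataclasses import dataclass
from typing import Any, Tuple


@dataclass
class FakeMessage:
    """Hypothetical message carrying a monotonically increasing sample counter."""
    sample_index: int


@dataclass
class FakeMetadata:
    """Hypothetical stand-in for the transfer metadata object."""
    transfer_id: int


def key_from_sample_index(item: Tuple[Any, Any]) -> float:
    """Ordering key derived from message content rather than from time."""
    message, _metadata = item  # The metadata is not needed for this key.
    return float(message.sample_index)
```

With such a key, a tolerance of 0 would group only messages with identical sample indices, while a tolerance of 1 would also accept neighbors that are off by one.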

property tolerance: float

The current tolerance value.

Auto-tuning with feedback can be implemented on top of this synchronizer such that when a new synchronized group is delivered, the key delta from the previous group is computed and the tolerance is updated as some function of that. If the tolerance is low, more synchronized groups will be skipped (delta increased); therefore, at the next successful synchronized group reassembly the tolerance will be increased. With this method, if the initial tolerance is large, the synchronizer may initially output poorly grouped messages, but it will converge to a more sensible tolerance setting in a few iterations.
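One possible shape of such a feedback loop is sketched below. It is not part of the library: the class name `ToleranceAutoTuner`, the `ratio` and `smoothing` parameters, and the clamping bounds are all assumptions chosen for illustration. The tolerance is moved toward a fixed fraction of the key delta between consecutive delivered groups.

```python
from typing import Optional


class ToleranceAutoTuner:
    """Hypothetical helper: tolerance follows a fraction of the inter-group key delta."""

    def __init__(
        self,
        initial: float,
        ratio: float = 0.5,       # Target tolerance as a fraction of the key delta (assumed).
        smoothing: float = 0.5,   # 0 keeps the old value, 1 jumps straight to the target.
        lower: float = 1e-6,
        upper: float = 1e6,
    ) -> None:
        self.tolerance = initial
        self._ratio = ratio
        self._smoothing = smoothing
        self._lower = lower
        self._upper = upper
        self._prev_key: Optional[float] = None

    def update(self, group_key: float) -> float:
        """Call once per delivered group; returns the tolerance to apply next."""
        if self._prev_key is not None:
            delta = abs(group_key - self._prev_key)
            target = delta * self._ratio
            blended = self.tolerance + self._smoothing * (target - self.tolerance)
            # Clamp to keep the tolerance within sane bounds.
            self.tolerance = min(max(blended, self._lower), self._upper)
        self._prev_key = group_key
        return self.tolerance
```

In an application, the returned value would be assigned to the tolerance property after each delivered group, with the group key obtained from the same key function that was passed to the synchronizer.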

async receive_for(timeout: float) -> Tuple[Tuple[Any, TransferFrom], ...] | None
receive_in_background(handler: Callable[[...], None]) -> None