pycyphal.transport.redundant package

Module contents

Redundant pseudo-transport overview

Native support for redundant transports is one of the core features of Cyphal. The class RedundantTransport implements this feature within PyCyphal. It works by aggregating zero or more instances of pycyphal.transport.Transport into a composite that implements the redundant transport management logic as defined in the Cyphal specification:

  • Every outgoing transfer is replicated into all of the available redundant interfaces.

  • Incoming transfers are deduplicated so that the local node receives at most one copy of each unique transfer received from the bus.

There exist two approaches to implementing transport-layer redundancy. The differences are confined to the specifics of a particular implementation, they are not manifested on the bus – nodes exhibit identical behavior regardless of the chosen strategy:

  • Frame-level redundancy. In this case, multiple redundant interfaces are managed by the same transport state machine. This strategy is more efficient in the sense of computing power and memory resources required to accommodate a given amount of networking workload compared to the alternative. Its limitation is that the redundant transports shall implement the same protocol (e.g., CAN), and all involved transports shall be configured to use the same MTU.

  • Transfer-level redundancy. In this case, redundant interfaces are managed one level of abstraction higher: not at the level of separate transport frames, but at the level of complete Cyphal transfers (if these terms sound unfamiliar, please read the Cyphal specification). This approach complicates the data flow inside the library, but it supports dissimilar transport redundancy, allowing one to aggregate transports implementing different protocols (e.g., UDP with serial, possibly with different MTU). Dissimilar redundancy is often sought in high-reliability/safety-critical applications, as reviewed in https://forum.opencyphal.org/t/557.

In accordance with its design goals, PyCyphal implements the transfer-level redundancy management strategy since it offers greater flexibility and a wider set of available design options. It is expected though that real-time embedded applications may often find frame-level redundancy preferable.

This implementation uses the term inferior to refer to a member of a redundant group:

  • Inferior transport is a transport that belongs to a redundant transport group.

  • Inferior session is a transport session that is owned by an inferior transport.

Whenever a redundant transport is requested to construct a new session, it does so by initializing an instance of RedundantInputSession or RedundantOutputSession. The constructed instance then holds a set of inferior sessions, one from each inferior transport, all sharing the same session specifier (pycyphal.transport.SessionSpecifier). The resulting relationship between inferior transports and inferior sessions can be conceptualized as a matrix where columns represent inferior transports and rows represent sessions:

Transport 0

Transport 1

Transport M

Session 0

S0T0

S0T1

S0Tm

Session 1

S1T0

S1T1

S1Tm

Session N

SnT0

SnT1

SnTm

Attachment/detachment of a transport is modeled as an addition/removal of a column; likewise, construction/retirement of a session is modeled as an addition/removal of a row. While the construction of a row or a column is in progress, the matrix resides in an inconsistent state. If any error occurs in the process, the matrix is rolled back to the previous consistent state, and the already-constructed sessions of the new vector are retired.

Existing redundant sessions retain validity across any changes in the matrix configuration. Logic that relies on a redundant instance is completely shielded from any changes in the underlying transport configuration, meaning that the entire underlying transport structure may be swapped out with a completely different one without affecting the higher levels. A practical extreme case is where a redundant transport is constructed with zero inferior transports, its session instances are configured, and the inferior transports are added later. This is expected to be useful for long-running applications that have to retain the presentation-level structure across changes in the transport configuration done on-the-fly without stopping the application.

Since the redundant transport itself also implements the interface pycyphal.transport.Transport, it technically could be used as an inferior of another redundant transport instance, although the practicality of such arrangement is questionable. Attaching a redundant transport as an inferior of itself is expressly prohibited and results in an error.

Inferior aggregation restrictions

Transports are categorized into one of the following two categories by the value of their transfer-ID (TID) modulo (i.e., the transfer-ID overflow period).

Transports where the set of transfer-ID values contains less than 2**48 (0x_1_0000_0000_0000) distinct elements are said to have cyclic transfer-ID. In such transports, the value of the transfer-ID increases steadily starting from zero, incremented once per emitted transfer, until the highest value is reached, then the value is wrapped over to zero:

modulo
     /|   /|   /|
    / |  / |  / |
   /  | /  | /  | /
  /   |/   |/   |/
0 ----------------->
        time

Transports where the set of transfer-ID values is larger are said to have monotonic transfer-ID. In such transports, the set is considered to be large enough to be inexhaustible for any practical application, hence a wrap-over to zero is expected to never occur. (For example, a Cyphal/UDP transport operating over a 10 GbE link at the theoretical throughput limit of 14.9 million transfers per second will exhaust the set in approx. 153 years in the worst case.)

Monotonic transports impose a higher data overhead per frame due to the requirement to accommodate a sufficiently wide integer field for the transfer-ID value. Their advantage is that transfer-ID values carried over inferior transports of a redundant group are guaranteed to remain in-phase for the entire lifetime of the network. The importance of this guarantee can be demonstrated with the following counter-example of two transports leveraging different transfer-ID modulo for the same session, where the unambiguous mapping between their transfer-ID values is lost with the beginning of the epoch B1 after the first overflow:

A0    A1    A2    A3
    /|    /|    /|
   / |   / |   / |   /
  /  |  /  |  /  |  /
 /   | /   | /   | /
/    |/    |/    |/

B0   B1   B2   B3   B4
   /|   /|   /|   /|
  / |  / |  / |  / |
 /  | /  | /  | /  | /
/   |/   |/   |/   |/
---------------------->
         time

The phase ambiguity of cyclic-TID transports results in the following hard requirements:

  1. Inferior transports under the same redundant transport instance shall belong to the same TID monotonicity category: either all cyclic or all monotonic.

  2. In the case where the inferiors utilize cyclic TID counters, the TID modulo shall be identical for all inferiors.

The implementation raises an error if an attempt is made to violate any of the above requirements. The TID monotonicity category of an inferior is determined by querying pycyphal.transport.Transport.protocol_parameters.

Transmission

As stated in the Specification, every emitted transfer shall be replicated into all available redundant interfaces. The rest of the logic does not concern wire compatibility, and hence it is implementation-defined.

This implementation applies an optimistic result aggregation policy where it considers a transmission successful if at least one inferior was able to successfully complete it. The handling of time-outs, exceptions, and other edge cases is described in detail in the documentation for RedundantOutputSession.

Every outgoing transfer will be serialized and transmitted by each inferior independently from each other. This may result in different number of transport frames emitted if the inferiors are configured to use different MTU, or if they implement different transport protocols.

Inferiors compute the modulus of the transfer-ID according to the protocol they implement independently from each other; however, despite the independent computation, it is guaranteed that they will always arrive at the same final transfer-ID value thanks to the aggregation restrictions introduced earlier. This guarantee is paramount for service calls, because Cyphal requires the caller to match a service response with the appropriate request state by comparing its transfer-ID value, which in turn requires that the logic that performs such matching is aware about the transfer-ID modulo in use.

Reception

Received transfers need to be deduplicated (dereplicated) so that the higher layers of the protocol stack would not receive each unique transfer more than once (as demanded by the Specification).

Transfer reception and deduplication are managed by the class RedundantInputSession. There exist two deduplication strategies, chosen automatically depending on the TID monotonicity category of the inferiors (as described earlier, it is enforced that all inferiors in a redundant group belong to the same TID monotonicity category).

The cyclic-TID deduplication strategy picks a transport interface at random and stays with it as long as the interface keeps delivering transfers. If the currently used interface ceases to deliver transfers, the strategy may switch to another one, thus manifesting the automatic fail-over. The cyclic-TID strategy cannot utilize more than one interface simultaneously due to the risk of transfer duplication induced by a possible transport latency disbalance (this is discussed at https://github.com/OpenCyphal/specification/issues/8 and in the Specification).

The monotonic-TID deduplication strategy always picks the first transfer to arrive. This approach provides instant fail-over in the case of an interface failure and ensures that the worst case transfer latency is bounded by the latency of the best-performing transport.

The following two swim lane diagrams should illustrate the difference. First, the case of cyclic-TID:

A   B     Deduplicated
|   |     |
T0  |     T0     <-- First transfer received from transport A.
T1  T0    T1     <-- Transport B is auto-assigned as a back-up.
T2  T1    T2     <-- Up to this point the transport functions normally.
X   T2    |      <-- Transport A fails here.
    T3    |      <-- Valid transfers from transport B are ignored due to the mandatory fail-over delay.
    ...   |
    Tn    Tn     <-- After the delay, the deduplicator switches over to the back-up transport.
    Tn+1  Tn+1   <-- Now, the roles of the back-up transport and the main transport are swapped.
    Tn+2  Tn+2

Monotonic-TID:

A   B     Deduplicated
|   |     |
T0  |     T0    <-- The monotonic-TID strategy always picks the first transfer to arrive.
T1  T0    T1    <-- All available interfaces are always considered.
T2  T1    T2    <-- The result is that the transfer latency is defined by the best-performing transport.
|   T2    |     <-- Here, the latency of transport A has increased temporarily.
|   T3    T3    <-- The deduplication strategy reacts by picking the next transfer from transport B.
T3  X     |     <-- Shall one transport fail, the deduplication strategy fails over immediately.
T4        T4

Anonymous transfers are a special case: a deduplicator has to keep local state per session in order to perform its functions; since anonymous transfers are fundamentally stateless, they are always accepted unconditionally. The implication is that redundant transfers may be replicated. This behavior is due to the design of the protocol and is not specific to this implementation.

Inheritance diagram

Inheritance diagram of pycyphal.transport.redundant._redundant_transport, pycyphal.transport.redundant._error, pycyphal.transport.redundant._session._base, pycyphal.transport.redundant._session._input, pycyphal.transport.redundant._session._output

Usage

A freshly constructed redundant transport is empty. Redundant transport instances are intentionally designed to be very mutable, allowing one to reconfigure them freely on-the-fly to support the needs of highly dynamic applications. Such flexibility allows one to do things that are illegal per the Cyphal specification, such as changing the node-ID while the node is running, so beware.

>>> tr = RedundantTransport()
>>> tr.inferiors  # By default, there are none.
[]

It is possible to begin creating session instances immediately, before configuring the inferiors. Any future changes will update all dependent session instances automatically.

>>> from pycyphal.transport import OutputSessionSpecifier, InputSessionSpecifier, MessageDataSpecifier
>>> from pycyphal.transport import PayloadMetadata, Transfer, Timestamp, Priority, ProtocolParameters
>>> pm = PayloadMetadata(1024)
>>> s0 = tr.get_output_session(OutputSessionSpecifier(MessageDataSpecifier(2345), None), pm)
>>> s0.inferiors    # No inferior transports; hence, no inferior sessions.
[]

If we attempted to transmit or receive a transfer while there are no inferiors, the call would just time out.

In this example, we will be experimenting with the loopback transport. Below we are attaching a new inferior transport instance; the session instances are updated automatically.

>>> from pycyphal.transport.loopback import LoopbackTransport
>>> lo_0 = LoopbackTransport(local_node_id=42)
>>> tr.attach_inferior(lo_0)
>>> tr.inferiors
[LoopbackTransport(...)]
>>> s0.inferiors
[LoopbackOutputSession(...)]

Add another inferior and another session:

>>> lo_1 = LoopbackTransport(local_node_id=42)
>>> tr.attach_inferior(lo_1)
>>> s1 = tr.get_input_session(InputSessionSpecifier(MessageDataSpecifier(2345), None), pm)
>>> len(tr.inferiors)
2
>>> len(s0.inferiors)  # Updated automatically.
2
>>> len(s1.inferiors)
2
>>> assert tr.inferiors[0].output_sessions[0] is s0.inferiors[0]    # Navigating the session matrix.
>>> assert tr.inferiors[1].output_sessions[0] is s0.inferiors[1]
>>> assert tr.inferiors[0].input_sessions[0] is s1.inferiors[0]
>>> assert tr.inferiors[1].input_sessions[0] is s1.inferiors[1]

A simple exchange test (remember this is a loopback, so we get back whatever we send):

>>> import asyncio
>>> doctest_await(s0.send(Transfer(Timestamp.now(), Priority.LOW, 1111, fragmented_payload=[]),
...                       asyncio.get_event_loop().time() + 1.0))
True
>>> doctest_await(s1.receive(asyncio.get_event_loop().time() + 1.0))
RedundantTransferFrom(..., transfer_id=1111, fragmented_payload=[], ...)

Inject a failure into one inferior. The redundant transport will continue to function with the other inferior; an error message will be logged:

>>> lo_0.output_sessions[0].exception = RuntimeError('Injected failure')  
>>> doctest_await(s0.send(Transfer(Timestamp.now(), Priority.LOW, 1112, fragmented_payload=[]),
...                       asyncio.get_event_loop().time() + 1.0))
True
>>> doctest_await(s1.receive(asyncio.get_event_loop().time() + 1.0))   # Still works.
RedundantTransferFrom(..., transfer_id=1112, fragmented_payload=[], ...)

Inferiors that are no longer needed can be detached. The redundant transport cleans up after itself by closing all inferior sessions in the detached transport.

>>> tr.detach_inferior(lo_0)
>>> len(tr.inferiors)   # Yup, removed.
1
>>> len(s0.inferiors)   # And the existing session instances are updated.
1
>>> len(s1.inferiors)   # Indeed they are.
1

One cannot mix inferiors with incompatible TID monotonicity or different node-ID. For example, it is not possible to use CAN with UDP in the same redundant group.

>>> lo_0 = LoopbackTransport(local_node_id=42)
>>> lo_0.protocol_parameters = ProtocolParameters(transfer_id_modulo=32, max_nodes=128, mtu=8)
>>> tr.attach_inferior(lo_0)                        # TID monotonicity mismatch.    
Traceback (most recent call last):
    ...
InconsistentInferiorConfigurationError: The new inferior shall use monotonic transfer-ID counters...
>>> tr.attach_inferior(LoopbackTransport(local_node_id=None))  # Node-ID mismatch.  
Traceback (most recent call last):
    ...
InconsistentInferiorConfigurationError: The inferior has a different node-ID...

The parameters of a redundant transport are computed from the inferiors. If the inferior set is changed, the transport parameters may also be changed. This may create unexpected complications because parameters of real transports are generally immutable, so it is best to avoid unnecessary runtime transformations unless required by the business logic.

>>> tr.local_node_id
42
>>> tr.protocol_parameters
ProtocolParameters(...)
>>> tr.close()                  # All inferiors and all sessions are closed.
>>> tr.inferiors
[]
>>> tr.local_node_id is None
True
>>> tr.protocol_parameters
ProtocolParameters(transfer_id_modulo=0, max_nodes=0, mtu=0)

A redundant transport can be used with just one inferior to implement ad-hoc PnP allocation as follows: the transport is set up with an anonymous inferior which is disposed of upon completing the allocation procedure; the new inferior is then installed in the place of the old one configured to use the newly allocated node-ID value.

class pycyphal.transport.redundant.RedundantTransport(*, loop: Optional[asyncio.events.AbstractEventLoop] = None)[source]

Bases: pycyphal.transport._transport.Transport

This is a composite over a set of pycyphal.transport.Transport. Please read the module documentation for details.

__init__(*, loop: Optional[asyncio.events.AbstractEventLoop] = None) None[source]
Parameters

loop – Deprecated.

property protocol_parameters: pycyphal.transport._transport.ProtocolParameters[source]

Aggregate parameters constructed from all inferiors. If there are no inferiors (i.e., if the instance is closed), the value is all-zeros. Beware that if the set of inferiors is changed, this value may also be changed.

The values are obtained from the set of inferiors by applying the following reductions:

  • min transfer-ID modulo

  • min max-nodes

  • min MTU

property local_node_id: Optional[int][source]

All inferiors share the same local node-ID. If there are no inferiors, the value is None (anonymous).

get_input_session(specifier: pycyphal.transport._session.InputSessionSpecifier, payload_metadata: pycyphal.transport._payload_metadata.PayloadMetadata) pycyphal.transport.redundant._session._input.RedundantInputSession[source]
get_output_session(specifier: pycyphal.transport._session.OutputSessionSpecifier, payload_metadata: pycyphal.transport._payload_metadata.PayloadMetadata) pycyphal.transport.redundant._session._output.RedundantOutputSession[source]
sample_statistics() pycyphal.transport.redundant._redundant_transport.RedundantTransportStatistics[source]
property input_sessions: Sequence[pycyphal.transport.redundant._session._input.RedundantInputSession][source]
property output_sessions: Sequence[pycyphal.transport.redundant._session._output.RedundantOutputSession][source]
property inferiors: Sequence[pycyphal.transport._transport.Transport][source]

Read-only access to the list of inferior transports. The inferiors are guaranteed to be ordered according to the temporal order of their attachment.

attach_inferior(transport: pycyphal.transport._transport.Transport) None[source]

Adds a new transport to the redundant group. The new transport shall not be closed.

If the transport is already added or it is the redundant transport itself (recursive attachment), a ValueError will be raised.

If the configuration of the new transport is not compatible with the other inferiors or with the redundant transport instance itself, an instance of InconsistentInferiorConfigurationError will be raised. Specifically, the following preconditions are checked:

  • The new inferior shall operate on the same event loop as the redundant transport instance it is added to.

  • The local node-ID shall be the same for all inferiors, or all shall be anonymous.

  • The transfer-ID modulo shall meet either of the following conditions:

    • Identical for all inferiors.

    • Not less than MONOTONIC_TRANSFER_ID_MODULO_THRESHOLD for all inferiors.

If an exception is raised while the setup of the new inferior is in progress, the operation will be rolled back to ensure state consistency.

detach_inferior(transport: pycyphal.transport._transport.Transport) None[source]

Removes the specified transport from the redundant group. If there is no such transport, a ValueError will be raised.

All sessions of the removed inferior that are managed by the redundant transport instance will be automatically closed, but the inferior itself will not be (the caller will have to do that manually if desired).

close() None[source]

Closes all redundant session instances, detaches and closes all inferior transports. Any exceptions occurring in the process will be suppressed and logged.

Upon completion, the session matrix will be returned into its original empty state. It can be populated back by adding new transports and/or instantiating new redundant sessions if needed. In other words, closing is reversible here, which is uncommon for the library; consider this feature experimental.

If the session matrix is empty, this method has no effect.

begin_capture(handler: Callable[[pycyphal.transport._tracer.Capture], None]) None[source]

Stores the handler in the local list of handlers. Invokes pycyphal.transport.Transport.begin_capture on each inferior. If at least one inferior raises an exception, it is propagated immediately and the remaining inferiors will remain in an inconsistent state. When a new inferior is added later, the stored handlers will be automatically used to enable capture on it. If such auto-restoration behavior is undesirable, configure capture individually per-inferior instead.

Every capture emitted by the inferiors is wrapped into RedundantCapture, which contains additional metadata about the inferior transport instance that emitted the capture. This is done to let users understand which transport of the redundant group has provided the capture and also this information is used by RedundantTracer to automatically manage transfer deduplication.

property capture_active: bool[source]
static make_tracer() pycyphal.transport.redundant._tracer.RedundantTracer[source]

See RedundantTracer.

async spoof(transfer: pycyphal.transport._tracer.AlienTransfer, monotonic_deadline: float) bool[source]

Simply propagates the call to every inferior. The return value is a logical AND for all inferiors; False if there are no inferiors.

First exception to occur terminates the operation and is raised immediately. This is different from regular sending; the assumption is that the caller necessarily wants to ensure that spoofing takes place against every inferior. If this is not the case, spoof each inferior separately.

class pycyphal.transport.redundant.RedundantTransportStatistics(inferiors: List[pycyphal.transport._transport.TransportStatistics] = <factory>)[source]

Bases: pycyphal.transport._transport.TransportStatistics

Aggregate statistics for all inferior transports in a redundant group. This is an atomic immutable sample; it is not updated after construction.

inferiors: List[pycyphal.transport._transport.TransportStatistics]

The ordering is guaranteed to match that of RedundantTransport.inferiors.

__eq__(other)[source]
__hash__ = None
__init__(inferiors: List[pycyphal.transport._transport.TransportStatistics] = <factory>) None[source]
__repr__()[source]
class pycyphal.transport.redundant.RedundantSession[source]

Bases: abc.ABC

The base for all redundant session instances.

A redundant session may be constructed even if the redundant transport itself has no inferiors. When a new inferior transport is attached/detached to/from the redundant group, dependent session instances are automatically reconfigured, transparently to the user.

The higher layers of the protocol stack are therefore shielded from any changes made to the stack below the redundant transport instance; existing sessions and other instances are never invalidated. This guarantee allows one to construct applications whose underlying transport configuration can be changed at runtime.

abstract property specifier: pycyphal.transport._session.SessionSpecifier[source]
abstract property payload_metadata: pycyphal.transport._payload_metadata.PayloadMetadata[source]
abstract property inferiors: Sequence[pycyphal.transport._session.Session][source]

Read-only access to the list of inferiors. The ordering is guaranteed to match that of RedundantTransport.inferiors.

abstract close() None[source]

Closes and detaches all inferior sessions. If any of the sessions fail to close, an error message will be logged, but no exception will be raised. The instance will no longer be usable afterward.

class pycyphal.transport.redundant.RedundantInputSession(specifier: pycyphal.transport._session.InputSessionSpecifier, payload_metadata: pycyphal.transport._payload_metadata.PayloadMetadata, tid_modulo_provider: Callable[[], int], finalizer: Callable[[], None])[source]

Bases: pycyphal.transport.redundant._session._base.RedundantSession, pycyphal.transport._session.InputSession

This is a composite of a group of pycyphal.transport.InputSession.

The transfer deduplication strategy is chosen between cyclic and monotonic automatically when the first inferior is added.

__init__(specifier: pycyphal.transport._session.InputSessionSpecifier, payload_metadata: pycyphal.transport._payload_metadata.PayloadMetadata, tid_modulo_provider: Callable[[], int], finalizer: Callable[[], None])[source]

Do not call this directly! Use the factory method instead.

property inferiors: Sequence[pycyphal.transport._session.InputSession][source]
async receive(monotonic_deadline: float) Optional[pycyphal.transport.redundant.RedundantTransferFrom][source]

Reads one deduplicated transfer received from all inferiors concurrently. Returns None on timeout. If there are no inferiors at the time of the invocation and none appear by the expiration of the timeout, returns None.

Exceptions raised by inferiors are propagated normally, but it is possible for an exception to be delayed until the next invocation of this method.

property transfer_id_timeout: float[source]

Assignment of a new transfer-ID timeout is transferred to all inferior sessions, so that their settings are always kept consistent. When the transfer-ID timeout value is queried, the maximum value from the inferior sessions is returned; if there are no inferiors, zero is returned. The transfer-ID timeout is not kept by the redundant session itself.

When a new inferior session is added, its transfer-ID timeout is assigned to match other inferiors. When all inferior sessions are removed, the transfer-ID timeout configuration becomes lost. Therefore, when the first inferior is added, the redundant session assumes its transfer-ID timeout configuration as its own; all inferiors added later will inherit the same setting.

property specifier: pycyphal.transport._session.InputSessionSpecifier[source]
property payload_metadata: pycyphal.transport._payload_metadata.PayloadMetadata[source]
sample_statistics() pycyphal.transport.redundant._session._base.RedundantSessionStatistics[source]
  • transfers - the number of successfully received deduplicated transfers (unique transfer count).

  • errors - the number of receive calls that could not be completed due to an exception.

  • payload_bytes - the number of payload bytes in successful deduplicated transfers counted in transfers.

  • drops - the total number of drops summed from all inferiors (i.e., total drop count). This value is invalidated when the set of inferiors is changed. The semantics may change later.

  • frames - the total number of frames summed from all inferiors (i.e., replicated frame count). This value is invalidated when the set of inferiors is changed. The semantics may change later.

close() None[source]
class pycyphal.transport.redundant.RedundantOutputSession(specifier: pycyphal.transport._session.OutputSessionSpecifier, payload_metadata: pycyphal.transport._payload_metadata.PayloadMetadata, finalizer: Callable[[], None])[source]

Bases: pycyphal.transport.redundant._session._base.RedundantSession, pycyphal.transport._session.OutputSession

This is a composite of a group of pycyphal.transport.OutputSession. Every outgoing transfer is simply forked into each of the inferior sessions. The result aggregation policy is documented in send().

__init__(specifier: pycyphal.transport._session.OutputSessionSpecifier, payload_metadata: pycyphal.transport._payload_metadata.PayloadMetadata, finalizer: Callable[[], None])[source]

Do not call this directly! Use the factory method instead.

property inferiors: Sequence[pycyphal.transport._session.OutputSession][source]
enable_feedback(handler: Callable[[pycyphal.transport.redundant._session._output.RedundantFeedback], None]) None[source]

The operation is atomic on all inferiors. If at least one inferior fails to enable feedback, all inferiors are rolled back into the disabled state.

disable_feedback() None[source]

The method implements the best-effort policy if any of the inferior sessions fail to disable feedback.

async send(transfer: pycyphal.transport._transfer.Transfer, monotonic_deadline: float) bool[source]

Sends the transfer via all of the inferior sessions concurrently. Returns when the first of the inferior calls succeeds; the remaining will keep sending in the background; that is, the redundant transport operates at the rate of the fastest inferior, delegating the slower ones to background tasks. Edge cases:

  • If there are no inferiors, the method will await until either the deadline is expired or an inferior(s) is (are) added. In the former case, the method returns False. In the latter case, the transfer is transmitted via the new inferior(s) using the remaining time until the deadline.

  • If at least one inferior succeeds, True is returned (logical OR). If the other inferiors raise exceptions, they are logged as errors and suppressed.

  • If all inferiors raise exceptions, one of them is propagated, the rest are logged as errors and suppressed.

  • If all inferiors time out, False is returned (logical OR).

In other words, the error handling strategy is optimistic: if one inferior reported success, the call is assumed to have succeeded; best result is always returned.

property specifier: pycyphal.transport._session.OutputSessionSpecifier[source]
property payload_metadata: pycyphal.transport._payload_metadata.PayloadMetadata[source]
sample_statistics() pycyphal.transport.redundant._session._base.RedundantSessionStatistics[source]
  • transfers - the number of redundant transfers where at least ONE inferior succeeded (success count).

  • errors - the number of redundant transfers where ALL inferiors raised exceptions (failure count).

  • payload_bytes - the number of payload bytes in successful redundant transfers counted in transfers.

  • drops - the number of redundant transfers where ALL inferiors timed out (timeout count).

  • frames - the total number of frames summed from all inferiors (i.e., replicated frame count). This value is invalidated when the set of inferiors is changed. The semantics may change later.

close() None[source]
class pycyphal.transport.redundant.RedundantSessionStatistics(transfers: int = 0, frames: int = 0, payload_bytes: int = 0, errors: int = 0, drops: int = 0, inferiors: List[pycyphal.transport._session.SessionStatistics] = <factory>)[source]

Bases: pycyphal.transport._session.SessionStatistics

Aggregate statistics for all inferior sessions in a redundant group. This is an atomic immutable sample; it is not updated after construction.

inferiors: List[pycyphal.transport._session.SessionStatistics]

The ordering is guaranteed to match that of RedundantSession.inferiors.

__eq__(other)[source]
__hash__ = None
__init__(transfers: int = 0, frames: int = 0, payload_bytes: int = 0, errors: int = 0, drops: int = 0, inferiors: List[pycyphal.transport._session.SessionStatistics] = <factory>) None[source]
__repr__()[source]
class pycyphal.transport.redundant.RedundantFeedback(inferior_feedback: pycyphal.transport._session.Feedback, inferior_session: pycyphal.transport._session.OutputSession)[source]

Bases: pycyphal.transport._session.Feedback

This is the output feedback extended with the reference to the inferior transport session that this feedback originates from.

A redundant output session provides one feedback entry per inferior session; for example, if there are three inferiors in a redundant transport group, each outgoing transfer will generate three feedback entries (unless inferior sessions fail to provide their feedback entries for whatever reason).

__init__(inferior_feedback: pycyphal.transport._session.Feedback, inferior_session: pycyphal.transport._session.OutputSession)[source]
property original_transfer_timestamp: pycyphal.transport._timestamp.Timestamp[source]
property first_frame_transmission_timestamp: pycyphal.transport._timestamp.Timestamp[source]
property inferior_feedback: pycyphal.transport._session.Feedback[source]

The original feedback instance from the inferior session.

property inferior_session: pycyphal.transport._session.OutputSession[source]

The inferior session that generated this feedback entry.

exception pycyphal.transport.redundant.InconsistentInferiorConfigurationError[source]

Bases: pycyphal.transport._error.InvalidTransportConfigurationError

Raised when a redundant transport instance is asked to attach a new inferior whose configuration does not match that of the other inferiors or of the redundant transport itself.

class pycyphal.transport.redundant.RedundantCapture(timestamp: pycyphal.transport._timestamp.Timestamp, inferior: pycyphal.transport._tracer.Capture, iface_id: int, transfer_id_modulo: int)[source]

Bases: pycyphal.transport._tracer.Capture

Composes pycyphal.transport.Capture with a reference to the transport instance that yielded this capture. The user may construct such captures manually when performing postmortem analysis of a network data dump to feed them later into RedundantTracer.

inferior: pycyphal.transport._tracer.Capture

The original capture from the inferior transport.

iface_id: int

A unique number that identifies this transport in its redundant group.

transfer_id_modulo: int

The number of unique transfer-ID values (that is, the maximum possible transfer-ID plus one) for the transport that emitted this capture. This is actually a transport-specific constant. This value is used by RedundantTracer to select the appropriate transfer deduplication strategy.

static get_transport_type() Type[pycyphal.transport.redundant._redundant_transport.RedundantTransport][source]
__delattr__(name)[source]
__eq__(other)[source]
__hash__()[source]
__init__(timestamp: pycyphal.transport._timestamp.Timestamp, inferior: pycyphal.transport._tracer.Capture, iface_id: int, transfer_id_modulo: int) None[source]
__repr__()[source]
__setattr__(name, value)[source]
class pycyphal.transport.redundant.RedundantDuplicateTransferTrace(timestamp: pycyphal.transport._timestamp.Timestamp)[source]

Bases: pycyphal.transport._tracer.Trace

Indicates that the last capture object completed a valid transfer that was discarded as a duplicate (either received from another redundant interface or deterministic data loss mitigation (DDLM) is employed).

Observe that it is NOT a subclass of pycyphal.transport.TransferTrace! It shall not be one because duplicates should not be processed normally.

__delattr__(name)[source]
__eq__(other)[source]
__hash__()[source]
__init__(timestamp: pycyphal.transport._timestamp.Timestamp) None[source]
__repr__()[source]
__setattr__(name, value)[source]
timestamp: pycyphal.transport._timestamp.Timestamp

The local time when the traced event took place or was commenced. For transfers, this is the timestamp of the first frame.

class pycyphal.transport.redundant.RedundantTracer[source]

Bases: pycyphal.transport._tracer.Tracer

The redundant tracer automatically deduplicates transfers received from multiple redundant transports. It can be used either in real-time or during postmortem analysis. In the latter case the user would construct instances of RedundantCapture manually and feed them into the tracer one-by-one.

__init__() None[source]
update(cap: pycyphal.transport._tracer.Capture) Optional[pycyphal.transport._tracer.Trace][source]

All instances of pycyphal.transport.TransferTrace are deduplicated, duplicates are simply dropped and RedundantDuplicateTransferTrace is returned. All other instances (such as pycyphal.transport.ErrorTrace) are returned unchanged.

__repr__() str[source]