pycyphal.transport package

Module contents

Abstract transport model

The transport layer submodule defines a high-level interface that abstracts transport-specific implementation details from the transport-agnostic library core. The main component is the interface class pycyphal.transport.Transport, accompanied by several auxiliary entities that encapsulate and model different aspects of the Cyphal protocol stack.

These classes are specifically designed to map well onto the Cyphal v1 transport layer model (first discussed in this post: https://forum.opencyphal.org/t/alternative-transport-protocols/324). The following transfer metadata taxonomy table is the essence of the model; one can map it onto the aforementioned auxiliary definitions:

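+-------------------------------------+----------------+---------------------------------+--------------------------------------------------------------+
| Transfer metadata taxonomy          | Messages       | Services                        | Comments                                                     |
+=====================================+================+=================================+==============================================================+
| Transfer priority                                                                      | Not used above the transport layer.                          |
+-------------------+-----------------+----------------+---------------------------------+--------------------------------------------------------------+
| Session specifier | Route specifier | Source node-ID | Source node-ID                  | Transport route information. If the destination node-ID is   |
|                   |                 +----------------+---------------------------------+ not provided, broadcast is implied.                          |
|                   |                 |                | Destination node-ID             |                                                              |
|                   +-----------------+----------------+---------------------------------+--------------------------------------------------------------+
|                   | Data specifier  | Kind                                             | Contained information: kind of transfer (message or          |
|                   |                 +----------------+------------+---------+----------+ service); subject-ID for messages; service-ID with           |
|                   |                 | Subject-ID     | Service-ID | Request | Response | request/response role selector for services.                 |
+-------------------+-----------------+----------------+------------+---------+----------+--------------------------------------------------------------+
| Transfer-ID                                                                            | Transfer sequence number.                                    |
+----------------------------------------------------------------------------------------+--------------------------------------------------------------+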

Sessions

PyCyphal transport relies heavily on the concept of a session. In PyCyphal, a session represents a flow of data through the network, defined by a particular session specifier, that either originates or terminates at the local node. Whenever the application desires to establish communication (such as subscribing to a subject or invoking a service), it commands the transport layer to open a particular session. The session abstraction is sufficiently high-level to permit efficient mapping to features natively available to concrete transport implementations. For example, the Cyphal/CAN transport uses the set of active input sessions to automatically compute the optimal hardware acceptance filter configuration; the Cyphal/UDP transport can map sessions onto UDP port numbers, establishing close equivalence between sessions and Berkeley sockets.

There can be at most one session per session specifier. When a transport is requested to provide a session, it will first check if there is one for the specifier, and return the existing one if so; otherwise, a new session will be created, stored, and returned. Once created, the session will remain active until explicitly closed, or until the transport instance that owns it is closed.
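The lookup-or-create rule above can be illustrated with a small toy model. This is only a sketch: ToySpecifier, ToySession, and ToyTransport are hypothetical names invented for the illustration and are not part of PyCyphal; the only property being modeled is that a given specifier always maps to the same session object until that session is closed.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass(frozen=True)
class ToySpecifier:
    """Hypothetical stand-in for a session specifier (immutable and hashable)."""

    subject_id: int
    remote_node_id: Optional[int]


class ToySession:
    def __init__(self, specifier: ToySpecifier) -> None:
        self.specifier = specifier
        self.closed = False


class ToyTransport:
    """Keeps at most one session per specifier."""

    def __init__(self) -> None:
        self._sessions: Dict[ToySpecifier, ToySession] = {}

    def get_session(self, specifier: ToySpecifier) -> ToySession:
        try:
            return self._sessions[specifier]  # reuse the existing session
        except KeyError:
            session = self._sessions[specifier] = ToySession(specifier)
            return session

    def close_session(self, specifier: ToySpecifier) -> None:
        session = self._sessions.pop(specifier, None)
        if session is not None:
            session.closed = True


transport = ToyTransport()
a = transport.get_session(ToySpecifier(1234, None))
b = transport.get_session(ToySpecifier(1234, None))  # same specifier, same object
c = transport.get_session(ToySpecifier(1234, 42))    # different specifier, new object
```

Using a frozen dataclass as the dictionary key mirrors the idea that two specifiers with equal fields denote the same session.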

An output session that doesn’t have a remote node-ID specified is called a broadcast session; the opposite is called a unicast session.

An input session that doesn’t have a remote node-ID specified is called a promiscuous session, meaning that it accepts transfers with matching data specifier from any remote node. An input session where a remote node-ID is specified is called a selective session; such a session accepts transfers from a particular remote node-ID only. Selective sessions are useful for service transfers.

From the above description it is easy to see that a set of transfers that are valid for a given selective session is a subset of transfers that are valid for a given promiscuous session sharing the same data specifier. For example, consider two sessions sharing a data specifier D, one of which is promiscuous and the other is selective bound to remote node-ID N. Suppose that a transfer matching the data specifier D is received by the local node from remote node N, thereby matching both sessions. In cases like this, the transport implementation is required to deliver the received transfer into both matching sessions. The order (whether selective or promiscuous is served first) is implementation-defined.
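The delivery rule for overlapping sessions can be sketched as follows. The code is a toy dispatcher, not PyCyphal code: the deliver() function and the use of a plain string as the data specifier are assumptions made for brevity. It shows a transfer from node 7 reaching both the promiscuous and the selective session, while a transfer from node 8 reaches only the promiscuous one.

```python
from typing import Dict, List, Optional, Tuple

# Key: (data specifier, remote node-ID, or None for a promiscuous session).
# Value: the list of payloads delivered to that session.
SessionKey = Tuple[str, Optional[int]]


def deliver(
    sessions: Dict[SessionKey, List[bytes]],
    data_specifier: str,
    source_node_id: int,
    payload: bytes,
) -> int:
    """Deliver the payload to every matching session; return the number of deliveries."""
    delivered = 0
    # Selective first, promiscuous second. This order is an arbitrary choice of the
    # sketch; the text above leaves it implementation-defined.
    for key in ((data_specifier, source_node_id), (data_specifier, None)):
        inbox = sessions.get(key)
        if inbox is not None:
            inbox.append(payload)
            delivered += 1
    return delivered


sessions: Dict[SessionKey, List[bytes]] = {
    ("D", None): [],  # promiscuous session
    ("D", 7): [],     # selective session bound to remote node-ID 7
}
n_from_7 = deliver(sessions, "D", 7, b"abc")  # matches both sessions
n_from_8 = deliver(sessions, "D", 8, b"xyz")  # matches the promiscuous session only
```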

Sniffing/snooping and tracing

Set up live capture on a transport using Transport.begin_capture(). We are using the loopback transport here for demonstration but other transports follow the same interface:

>>> from pycyphal.transport import Capture
>>> from pycyphal.transport.loopback import LoopbackTransport
>>> captured_events = []
>>> def on_capture(cap: Capture) -> None:
...     captured_events.append(cap)
>>> tr = LoopbackTransport(None)
>>> tr.begin_capture(on_capture)

Multiple different transports can be set up to deliver capture events into the same handler since they all share the same transport-agnostic API. This way, heterogeneous redundant transports can write and parse a single shared log file.

Emit a random transfer and see it captured:

>>> from pycyphal.transport import MessageDataSpecifier, PayloadMetadata, OutputSessionSpecifier, Transfer
>>> from pycyphal.transport import Timestamp, Priority
>>> import asyncio
>>> ses = tr.get_output_session(OutputSessionSpecifier(MessageDataSpecifier(1234), None), PayloadMetadata(1024))
>>> doctest_await(ses.send(Transfer(Timestamp.now(), Priority.LOW, 1234567890, [memoryview(b'abc')]),
...                        monotonic_deadline=asyncio.get_event_loop().time() + 1.0))
True
>>> captured_events
[LoopbackCapture(...priority=LOW, transfer_id=1234567890...)]

The captured events can be processed afterwards: logged, displayed, or reconstructed into high-level events. The latter is done with the help of Tracer instantiated using the static factory method Transport.make_tracer():

>>> tracer = LoopbackTransport.make_tracer()
>>> tracer.update(captured_events[0])  # Captures could be read from live network or from a log file, for instance.
TransferTrace(...priority=LOW, transfer_id=1234567890...)

Implementing new transports

New transports can be added trivially by subclassing pycyphal.transport.Transport. This module contains several nested submodules providing standard transport implementations according to the Cyphal specification (e.g., the Cyphal/CAN transport) alongside experimental implementations.

Each specific transport implementation included in the library shall reside in its own separate submodule under pycyphal.transport. The name of the submodule should be the lowercase name of the transport. The name of the implementation class that inherits from pycyphal.transport.Transport should begin with the capitalized name of the submodule followed by Transport. If the new transport contains a media sub-layer, the media interface class should be at pycyphal.transport.*.media.Media, where the asterisk is the transport name placeholder; the media sub-layer should follow the same organization patterns as the transport layer. See the Cyphal/CAN transport as an example.

Implementations included in the library are never auto-imported, nor do they need to be. The same should be true for transport-specific media sub-layers. The application is required to explicitly import the transport (and media sub-layer) implementations that are needed. A highly generic, transport-agnostic application may benefit from the helper functions available in pycyphal.util, designed specifically to ease discovery and use of entities defined in submodules that are not auto-imported and whose names are not known in advance.

Users can define their custom transports and/or media sub-layers outside of the library scope. The library itself does not care about the location of its components.

Class inheritance diagram

Below is the class inheritance diagram for this module (trivial classes may be omitted):

[Figure: inheritance diagram of pycyphal.transport._transport, pycyphal.transport._error, pycyphal.transport._session, pycyphal.transport._data_specifier, pycyphal.transport._transfer, pycyphal.transport._payload_metadata, pycyphal.transport._tracer]

class pycyphal.transport.Transport

Bases: ABC

An abstract Cyphal transport interface. Please read the module documentation for details.

Implementations should ensure that properties do not raise exceptions.

property loop: AbstractEventLoop

Deprecated.

abstract property protocol_parameters: ProtocolParameters

Provides information about the properties of the transport protocol implemented by the instance. See ProtocolParameters.

abstract property local_node_id: int | None

The node-ID is set once during initialization of the transport, either explicitly (e.g., CAN) or by deriving the node-ID value from the configuration of the underlying protocol layers (e.g., UDP/IP).

If the transport does not have a node-ID, this property has the value of None, and the transport (and the node that uses it) is said to be in the anonymous mode. While in the anonymous mode, some transports may choose to operate in a particular regime to facilitate plug-and-play node-ID allocation (for example, a CAN transport may disable automatic retransmission).

Protip: If you feel like assigning the node-ID after initialization, make a proxy that implements this interface and keeps a private transport instance. When the node-ID is assigned, the private transport instance is destroyed, a new one is implicitly created in its place, and all of the dependent session instances are automatically recreated transparently for the user of the proxy. This logic is implemented in the redundant transport, which can be used even if no redundancy is needed.

abstract close() -> None

Closes all active sessions, underlying media instances, and other resources related to this transport instance.

After a transport is closed, neither its methods nor its dependent objects (such as sessions) can be used. Methods invoked on a closed transport or any of its dependent objects should immediately raise pycyphal.transport.ResourceClosedError. Subsequent calls to close() will have no effect.

Failure to close any of the resources does not prevent the method from closing other resources (best effort policy). Related exceptions may be suppressed and logged; the last occurred exception may be raised after all resources are closed if such behavior is considered to be meaningful.
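The best-effort policy described above could look roughly like the sketch below. It is an illustration under stated assumptions, not the code of any PyCyphal transport: close_all_best_effort() is a hypothetical helper, and re-raising the last error is only one of the behaviors the text permits.

```python
import logging
from typing import Callable, Iterable, List, Optional

_logger = logging.getLogger(__name__)


def close_all_best_effort(closers: Iterable[Callable[[], None]]) -> None:
    """Invoke every closer even if some of them fail; re-raise the last failure at the end."""
    last_error: Optional[Exception] = None
    for closer in closers:
        try:
            closer()
        except Exception as ex:  # one failure must not prevent closing the rest
            _logger.exception("Failed to close a resource: %s", ex)
            last_error = ex
    if last_error is not None:
        raise last_error


closed: List[str] = []


def failing_closer() -> None:
    raise OSError("device busy")


raised: Optional[Exception] = None
try:
    close_all_best_effort(
        [lambda: closed.append("session"), failing_closer, lambda: closed.append("media")]
    )
except OSError as ex:
    raised = ex
```

Note that the resource listed after the failing one is still closed, and the error surfaces only after the whole loop has finished.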

abstract get_input_session(specifier: InputSessionSpecifier, payload_metadata: PayloadMetadata) -> InputSession

This factory method is the only valid way of constructing input session instances. Beware that construction and retirement of sessions may be costly.

The transport will always return the same instance unless there is no session object with the requested specifier, in which case it will be created and stored internally until closed. The payload metadata parameter is used only when a new instance is created, ignored otherwise. Implementations are encouraged to use a covariant return type annotation.

abstract get_output_session(specifier: OutputSessionSpecifier, payload_metadata: PayloadMetadata) -> OutputSession

This factory method is the only valid way of constructing output session instances. Beware that construction and retirement of sessions may be costly.

The transport will always return the same instance unless there is no session object with the requested specifier, in which case it will be created and stored internally until closed. The payload metadata parameter is used only when a new instance is created, ignored otherwise. Implementations are encouraged to use a covariant return type annotation.

abstract sample_statistics() -> TransportStatistics

Samples the low-level transport stats. The returned object shall be new or cloned (should not refer to an internal field). Implementations should annotate the return type as a derived custom type.

abstract property input_sessions: Sequence[InputSession]

Immutable view of all input sessions that are currently open.

abstract property output_sessions: Sequence[OutputSession]

Immutable view of all output sessions that are currently open.

abstract begin_capture(handler: Callable[[Capture], None]) -> None

Warning

This API entity is not yet stable. Suggestions and feedback are welcomed at https://forum.opencyphal.org.

Activates low-level monitoring of the transport interface. Also see related method make_tracer().

This method puts the transport instance into the low-level capture mode which does not interfere with its normal operation but may significantly increase the computing load due to the need to process every frame exchanged over the network (not just frames that originate or terminate at the local node). This usually involves reconfiguration of the local networking hardware. For instance, the network card may be put into promiscuous mode, the CAN adapter will have its acceptance filters disabled, etc.

The capture handler is invoked for every transmitted or received transport frame and, possibly, some additional transport-implementation-specific events (e.g., network errors or hardware state changes) which are described in the specific transport implementation docs. The temporal order of the events delivered to the user may be distorted, depending on the guarantees provided by the hardware and its driver. This means that if the network hardware sees TX frame A and then RX frame B separated by a very short time interval, the user may occasionally see the sequence inverted as (B, A).

There may be an arbitrary number of capture handlers installed; when a new handler is installed, it is added to the existing ones, if any.

If the transport does not support capture, this method may have no observable effect. Technically, the capture protocol, as you can see, does not present any requirements to the emitted events, so an implementation that pretends to enter the capture mode while not actually doing anything is compliant.

Since capture reflects actual network events, FEC will make the instance emit duplicate frames for affected transfers (although this is probably obvious enough without this elaboration).

It is not possible to disable capture. Once enabled, it will go on until the transport instance is destroyed.

Parameters:

handler – A one-argument callable invoked to inform the user about low-level network events. The type of the argument is Capture, see transport-specific docs for the list of the possible concrete types and what events they represent. The handler may be invoked from a different thread so the user should ensure synchronization. If the handler raises an exception, it is suppressed and logged.
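Because the handler may be invoked from a different thread, one simple way to satisfy the synchronization requirement is to have the handler do nothing except push the event into a thread-safe queue. The sketch below simulates this with a plain worker thread and string placeholders instead of real Capture objects; on_capture() and the events queue are names chosen for the illustration.

```python
import queue
import threading

# Thread-safe hand-off point between the capturing thread and the consumer.
events: "queue.Queue[object]" = queue.Queue()


def on_capture(capture: object) -> None:
    """Hypothetical capture handler: it only enqueues, so it is safe to call from any thread."""
    events.put_nowait(capture)


def simulated_transport_thread() -> None:
    # Stands in for a transport invoking the handler from its own worker thread.
    for i in range(3):
        on_capture(f"frame-{i}")


worker = threading.Thread(target=simulated_transport_thread)
worker.start()
worker.join()

# Back on the consumer side: drain whatever has been captured so far.
received = [events.get_nowait() for _ in range(events.qsize())]
```

An asyncio-based application could instead forward the event to its event loop with loop.call_soon_threadsafe(), which serves the same purpose.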

abstract property capture_active: bool

Whether begin_capture() was invoked and packet capture is being performed on this transport.

abstract static make_tracer() -> Tracer

Warning

This API entity is not yet stable. Suggestions and feedback are welcomed at https://forum.opencyphal.org.

Use this factory method for constructing tracer implementations for specific transports. Concrete tracers may be Voldemort types themselves. See also: Tracer, begin_capture().

abstract async spoof(transfer: AlienTransfer, monotonic_deadline: float) -> bool

Warning

This API entity is not yet stable. Suggestions and feedback are welcomed at https://forum.opencyphal.org.

Send a spoofed transfer to the network. The configuration of the local transport instance has no effect on spoofed transfers; as such, even anonymous instances may send arbitrary spoofed transfers. The only relevant property of the instance is which network interface to use for spoofing.

When this method is invoked for the first time, the transport instance may need to perform one-time initialization such as reconfiguring the networking hardware or loading additional drivers. Once this one-time initialization is performed, the transport instance will reside in the spoofing mode until the instance is closed; it is not possible to leave the spoofing mode without closing the instance. Some transports/platforms may require special permissions to perform spoofing (esp. IP-based transports).

If the source node-ID is not provided, an anonymous transfer will be emitted. If anonymous transfers are not supported, pycyphal.transport.OperationNotDefinedForAnonymousNodeError will be raised. Same will happen if one attempted to transmit a multi-frame anonymous transfer.

If the destination node-ID is not provided, a broadcast transfer will be emitted. If the data specifier is that of a service, an UnsupportedSessionConfigurationError will be raised. The reverse conflict for messages is handled identically.

Transports with cyclic transfer-ID will compute the modulo automatically.

This method will update the appropriate statistical counters as usual.

Returns:

True on success, False on timeout.

__repr__() -> str

Implementations should never override this method. Instead, see _get_repr_fields().

class pycyphal.transport.ProtocolParameters(transfer_id_modulo: int, max_nodes: int, mtu: int)

Bases: object

Basic transport capabilities. These parameters are defined by the underlying transport specifications.

Normally, the values should never change for a particular transport instance. This is not a hard guarantee, however. For example, a redundant transport aggregator may return a different set of parameters after the set of aggregated transports is changed (i.e., a transport is added or removed).

transfer_id_modulo: int

The cardinality of the set of distinct transfer-ID values; i.e., the overflow period. All high-overhead transports (UDP, Serial, etc.) use a sufficiently large value that will never overflow in a realistic, practical scenario. The background and motivation are explained at https://forum.opencyphal.org/t/alternative-transport-protocols/324. Example: 32 for CAN, (2**64) for UDP.

max_nodes: int

How many nodes can the transport accommodate in a given network. Example: 128 for CAN, 65535 for UDP.

mtu: int

The maximum number of payload bytes in a single-frame transfer. If the number of payload bytes in a transfer exceeds this limit, the transport will spill the data into a multi-frame transfer. Example: 7 for Classic CAN, <=63 for CAN FD.

__delattr__(name)
__eq__(other)
__hash__()
__init__(transfer_id_modulo: int, max_nodes: int, mtu: int) -> None
__match_args__ = ('transfer_id_modulo', 'max_nodes', 'mtu')
__repr__()
__setattr__(name, value)

class pycyphal.transport.TransportStatistics

Bases: object

Base class for transport-specific low-level statistical counters. Not to be confused with pycyphal.transport.SessionStatistics, which is tracked per-session.

__eq__(other)
__hash__ = None
__init__() -> None
__match_args__ = ()
__repr__()

class pycyphal.transport.Transfer(timestamp: Timestamp, priority: Priority, transfer_id: int, fragmented_payload: Sequence[memoryview])

Bases: object

Cyphal transfer representation.

timestamp: Timestamp

For output (tx) transfers this field contains the transfer creation timestamp. For input (rx) transfers this field contains the first frame reception timestamp.

priority: Priority

See Priority.

transfer_id: int

When transmitting, the appropriate modulus will be computed by the transport automatically. Higher layers shall use monotonically increasing transfer-ID counters.
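As a worked illustration of the modulus computation, the sketch below keeps a monotonically increasing counter in the higher layer and reduces it with the modulo value quoted for CAN in ProtocolParameters (32). The helper on_wire_transfer_id() is a hypothetical name; the real reduction is performed inside the transport.

```python
import itertools

CAN_TRANSFER_ID_MODULO = 32  # example value given for CAN in ProtocolParameters

# The higher layer keeps a monotonically increasing counter; start near the wrap point.
counter = itertools.count(30)


def on_wire_transfer_id(transfer_id: int, modulo: int) -> int:
    """What a transport with a cyclic transfer-ID would put on the wire (sketch)."""
    return transfer_id % modulo


wire_ids = [on_wire_transfer_id(next(counter), CAN_TRANSFER_ID_MODULO) for _ in range(4)]
# Counter values 30, 31, 32, 33 become 30, 31, 0, 1 on the wire.
```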

__delattr__(name)
__eq__(other)
__hash__()
__init__(timestamp: Timestamp, priority: Priority, transfer_id: int, fragmented_payload: Sequence[memoryview]) -> None
__match_args__ = ('timestamp', 'priority', 'transfer_id', 'fragmented_payload')
__setattr__(name, value)
fragmented_payload: Sequence[memoryview]

See FragmentedPayload. This is the serialized application-level payload. Fragmentation may be completely arbitrary. Received transfers usually have it fragmented such that one fragment corresponds to one received frame. Outgoing transfers usually fragment it according to the structure of the serialized data object. The purpose of fragmentation is to eliminate unnecessary data copying within the protocol stack. pycyphal.transport.commons.refragment() is designed to facilitate regrouping when sending a transfer.
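To make the zero-copy idea concrete, the following sketch builds a fragmented payload from slices of one buffer and shows two helpers for working with it. payload_size() and join_fragments() are hypothetical helpers written for this illustration; they are not the library's refragment() function.

```python
from typing import Sequence


def payload_size(fragments: Sequence[memoryview]) -> int:
    """Total number of payload bytes across all fragments."""
    return sum(f.nbytes for f in fragments)


def join_fragments(fragments: Sequence[memoryview]) -> bytes:
    """Concatenate the fragments; this is the one place where the data is copied."""
    return b"".join(fragments)


buffer = bytes(range(10))
view = memoryview(buffer)
# Slicing a memoryview creates new views over the same memory without copying it.
fragments = [view[:4], view[4:7], view[7:]]

total = payload_size(fragments)
joined = join_fragments(fragments)
```

The fragment boundaries here are arbitrary, which matches the statement that fragmentation carries no meaning of its own.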

__repr__() -> str

class pycyphal.transport.TransferFrom(timestamp: Timestamp, priority: Priority, transfer_id: int, fragmented_payload: Sequence[memoryview], source_node_id: int | None)

Bases: Transfer

Specialization for received transfers.

__delattr__(name)
__eq__(other)
__hash__()
__init__(timestamp: Timestamp, priority: Priority, transfer_id: int, fragmented_payload: Sequence[memoryview], source_node_id: int | None) -> None
__match_args__ = ('timestamp', 'priority', 'transfer_id', 'fragmented_payload', 'source_node_id')
__setattr__(name, value)
source_node_id: int | None

None indicates anonymous transfers.

class pycyphal.transport.Priority(value)

Bases: IntEnum

Transfer priority enumeration follows the recommended names provided in the Cyphal specification. We use integers here in order to allow usage of static lookup tables for conversion into transport-specific priority values. The particular integer values used here may be meaningless for some transports.

EXCEPTIONAL = 0
IMMEDIATE = 1
FAST = 2
HIGH = 3
NOMINAL = 4
LOW = 5
SLOW = 6
OPTIONAL = 7
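
The static lookup table mentioned in the description of this enumeration can be sketched as follows. The Priority class below is a local mirror of the values listed above so that the example runs on its own, and the four-level target scale is a made-up transport invented for the illustration.

```python
from enum import IntEnum


class Priority(IntEnum):
    """Local mirror of the enumeration values listed above."""

    EXCEPTIONAL = 0
    IMMEDIATE = 1
    FAST = 2
    HIGH = 3
    NOMINAL = 4
    LOW = 5
    SLOW = 6
    OPTIONAL = 7


# Hypothetical transport with only four priority levels (0 is the most urgent).
# The tuple is indexed directly by the integer value of Priority.
_TRANSPORT_PRIORITY = (0, 0, 1, 1, 2, 2, 3, 3)


def to_transport_priority(priority: Priority) -> int:
    return _TRANSPORT_PRIORITY[int(priority)]


mapped = {p.name: to_transport_priority(p) for p in Priority}
```

Indexing a tuple by the enum value is what makes the integer representation convenient here; a dictionary keyed by enum members would work equally well.
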
class pycyphal.transport.DataSpecifier

Bases: object

The data specifier defines what category and type of data is exchanged over a transport session. See the abstract transport model for details.

__delattr__(name)
__eq__(other)
__hash__()
__init__() -> None
__match_args__ = ()
__repr__()
__setattr__(name, value)

class pycyphal.transport.MessageDataSpecifier(subject_id: int)

Bases: DataSpecifier

SUBJECT_ID_MASK = 8191
subject_id: int
__delattr__(name)
__eq__(other)
__hash__()
__init__(subject_id: int) -> None
__match_args__ = ('subject_id',)
__repr__()
__setattr__(name, value)

class pycyphal.transport.ServiceDataSpecifier(service_id: int, role: Role)

Bases: DataSpecifier

class Role(value)

Bases: Enum

An enumeration.

REQUEST = 1

Request output role is for clients. Request input role is for servers.

RESPONSE = 2

Response output role is for servers. Response input role is for clients.

SERVICE_ID_MASK = 511
service_id: int
__delattr__(name)
__eq__(other)
__hash__()
__init__(service_id: int, role: Role) -> None
__match_args__ = ('service_id', 'role')
__repr__()
__setattr__(name, value)
role: Role

class pycyphal.transport.SessionSpecifier(data_specifier: DataSpecifier, remote_node_id: int | None)

Bases: object

This dataclass models the session specifier (https://forum.opencyphal.org/t/alternative-transport-protocols/324) except that we assume that one end of the session terminates at the local node. There are specializations for input and output sessions with additional logic, but they do not add extra data, because this class follows the protocol model definition.

data_specifier: DataSpecifier

See pycyphal.transport.DataSpecifier.

remote_node_id: int | None

If not None: output sessions are unicast to that node-ID, and input sessions ignore all transfers except those that originate from the specified remote node-ID. If None: output sessions are broadcast and input sessions are promiscuous.

__delattr__(name)
__eq__(other)
__hash__()
__init__(data_specifier: DataSpecifier, remote_node_id: int | None) -> None
__match_args__ = ('data_specifier', 'remote_node_id')
__repr__()
__setattr__(name, value)

class pycyphal.transport.InputSessionSpecifier(data_specifier: DataSpecifier, remote_node_id: int | None)

Bases: SessionSpecifier

If the remote node-ID is set, this is a selective session (accept data from the specified remote node only); otherwise this is a promiscuous session (accept data from any node).

property is_promiscuous: bool
__delattr__(name)
__eq__(other)
__hash__()
__init__(data_specifier: DataSpecifier, remote_node_id: int | None) -> None
__match_args__ = ('data_specifier', 'remote_node_id')
__repr__()
__setattr__(name, value)

class pycyphal.transport.OutputSessionSpecifier(data_specifier: DataSpecifier, remote_node_id: int | None)

Bases: SessionSpecifier

If the remote node-ID is set, this is a unicast session (use unicast transfers); otherwise this is a broadcast session (use broadcast transfers). The Specification v1.0 allows the following kinds of transfers:

  • Broadcast message transfers.

  • Unicast service transfers.

Anything else is invalid per Cyphal v1.0. A future version of the specification may add support for unicast messages for at least some transports. Here, we go ahead and assume that unicast message transfers are valid in general; it is up to a particular transport implementation to choose whether they are supported. Beware that this is a non-standard experimental protocol extension and it may be removed depending on how the next versions of the Specification evolve. You can influence that by leaving feedback at https://forum.opencyphal.org.

To summarize:

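+---------+--------------------------------------+--------------------------+
|         | Unicast                              | Broadcast                |
+=========+======================================+==========================+
| Message | Experimental, may be allowed in v1.x | Allowed by Specification |
+---------+--------------------------------------+--------------------------+
| Service | Allowed by Specification             | Banned by Specification  |
+---------+--------------------------------------+--------------------------+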
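
The summary above can also be expressed as a small decision function. This is a sketch of the classification only: classify_output() is a hypothetical helper, and whether a given transport actually rejects or accepts a combination is up to that transport.

```python
from typing import Optional


def classify_output(is_service: bool, remote_node_id: Optional[int]) -> str:
    """Classify an output session per the summary above (illustrative helper)."""
    is_broadcast = remote_node_id is None  # no remote node-ID means broadcast
    if is_service:
        return "banned" if is_broadcast else "allowed"
    return "allowed" if is_broadcast else "experimental"


summary = {
    "message/broadcast": classify_output(False, None),
    "message/unicast": classify_output(False, 42),
    "service/unicast": classify_output(True, 42),
    "service/broadcast": classify_output(True, None),
}
```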

property is_broadcast: bool
__delattr__(name)
__eq__(other)
__hash__()
__init__(data_specifier: DataSpecifier, remote_node_id: int | None) -> None
__match_args__ = ('data_specifier', 'remote_node_id')
__repr__()
__setattr__(name, value)

class pycyphal.transport.Session

Bases: ABC

Abstract session base class. This is further specialized by input and output. Properties should not raise exceptions.

abstract property specifier: SessionSpecifier
abstract property payload_metadata: PayloadMetadata
abstract sample_statistics() -> SessionStatistics

Samples and returns the approximated statistics. We say “approximated” because implementations are not required to sample the counters atomically, although normally they should strive to do so when possible.

abstract close() -> None

After a session is closed, none of its methods can be used. Methods invoked on a closed session should immediately raise pycyphal.transport.ResourceClosedError. Subsequent calls to close() will have no effect (no exception either).

Methods where a task is blocked (such as receive()) at the time of close() will raise a pycyphal.transport.ResourceClosedError upon next invocation or sooner. Callers of such blocking methods are recommended to avoid usage of large timeouts to facilitate faster reaction to transport closure.

__repr__() -> str

class pycyphal.transport.InputSession

Bases: Session

Either promiscuous or selective input session. The configuration cannot be changed once instantiated.

Users shall never construct instances themselves; instead, the factory method pycyphal.transport.Transport.get_input_session() shall be used.

abstract property specifier: InputSessionSpecifier
abstract async receive(monotonic_deadline: float) -> TransferFrom | None

Attempts to receive the transfer before the deadline [second]. Returns None if the transfer is not received before the deadline. The deadline is compared against asyncio.AbstractEventLoop.time(). If the deadline is in the past, checks once if there is a transfer and then returns immediately without context switching.
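The deadline semantics can be modeled with a plain asyncio queue. The sketch below is not an InputSession implementation; the receive() coroutine is a toy that only reproduces the three cases described above: a transfer is available, nothing arrives before the deadline, and the deadline is already in the past.

```python
import asyncio
from typing import List, Optional


async def receive(q: "asyncio.Queue[bytes]", monotonic_deadline: float) -> Optional[bytes]:
    """Toy model: return an item, or None if the deadline passes first."""
    timeout = monotonic_deadline - asyncio.get_running_loop().time()
    if timeout <= 0:
        try:
            return q.get_nowait()  # deadline in the past: check once, do not wait
        except asyncio.QueueEmpty:
            return None
    try:
        return await asyncio.wait_for(q.get(), timeout)
    except asyncio.TimeoutError:
        return None


async def demo() -> List[Optional[bytes]]:
    q: "asyncio.Queue[bytes]" = asyncio.Queue()
    loop = asyncio.get_running_loop()
    q.put_nowait(b"abc")
    first = await receive(q, loop.time() + 1.0)    # an item is already queued
    second = await receive(q, loop.time() + 0.05)  # nothing arrives in time
    third = await receive(q, loop.time() - 1.0)    # deadline already passed
    return [first, second, third]


results = asyncio.run(demo())
```

The deadline is an absolute value on the event loop clock, so the caller computes it as loop.time() plus the desired timeout rather than passing a relative duration.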

Implementations that use internal queues are recommended to permit the consumer to continue reading queued transfers after the instance is closed until the queue is empty. In other words, it is recommended not to raise pycyphal.transport.ResourceClosedError until the instance is closed AND the queue is empty.

abstract property transfer_id_timeout: float

By default, the transfer-ID timeout [second] is initialized with the default value provided in the Cyphal specification. It can be overridden using this interface if necessary (it rarely is). An attempt to assign an invalid timeout value raises ValueError.

property source_node_id: int | None

Alias for .specifier.remote_node_id. For promiscuous sessions this is always None. For selective sessions this is the node-ID of the source.

class pycyphal.transport.OutputSession

Bases: Session

Either broadcast or unicast output session. The configuration cannot be changed once instantiated.

Users shall never construct instances themselves; instead, the factory method pycyphal.transport.Transport.get_output_session() shall be used.

abstract property specifier: OutputSessionSpecifier
abstract async send(transfer: Transfer, monotonic_deadline: float) -> bool

Sends the transfer; blocks if necessary until the specified deadline [second]. The deadline value is compared against asyncio.AbstractEventLoop.time(). Returns when transmission is completed, in which case the return value is True; or when the deadline is reached, in which case the return value is False. In the case of timeout, a multi-frame transfer may be emitted partially, thereby rendering the receiving end unable to process it. If the deadline is in the past, the method attempts to send the frames anyway as long as that doesn’t involve blocking (i.e., task context switching).

Some transports or media sub-layers may be unable to guarantee transmission strictly before the deadline; for example, that may be the case if there is an additional buffering layer under the transport/media implementation (e.g., that could be the case with SLCAN-interfaced CAN bus adapters, IEEE 802.15.4 radios, and so on, where the data is pushed through an intermediary interface and briefly buffered again before being pushed onto the media). This is a design limitation imposed by the underlying non-real-time platform that Python runs on; it is considered acceptable since PyCyphal is designed for soft-real-time applications at most.

abstract enable_feedback(handler: Callable[[Feedback], None]) -> None

The output feedback feature makes the transport invoke the specified handler soon after the first frame of each transfer originating from this session instance is delivered to the network interface or similar underlying logic (not to be confused with delivery to the destination node!). This is designed for transmission timestamping, which in turn is necessary for certain protocol features such as highly accurate time synchronization.

The handler is invoked with one argument of type pycyphal.transport.Feedback which contains the timing information. The transport implementation is allowed to invoke the handler from any context, possibly from another thread. The caller should ensure adequate synchronization. The actual delay between the emission of the first frame and invocation of the callback is implementation-defined, but implementations should strive to minimize it.

Output feedback is disabled by default. It can be enabled by invoking this method. While the feedback is enabled, the performance of the transport in general (not just this session instance) may be reduced, possibly resulting in higher input/output latencies and increased CPU load.

When feedback is already enabled at the time of invocation, this method removes the old callback and installs the new one instead.

Design motivation: We avoid full-transfer loopback such as used in Libuavcan (at least in its old version) on purpose because that would make it impossible for us to timestamp outgoing transfers independently per transport interface (assuming redundant transports here), since the transport aggregation logic would deduplicate redundant received transfers, thus making the valuable timing information unavailable.

abstract disable_feedback() -> None

Restores the original state. Does nothing if the callback is already disabled.

property destination_node_id: int | None

Alias for .specifier.remote_node_id. For broadcast sessions this is always None. For unicast sessions this is the node-ID of the destination.

class pycyphal.transport.PayloadMetadata(extent_bytes: int)

Bases: object

This information is obtained from the data type definition.

Eventually, this type might include the runtime type identification information, if it is ever implemented in Cyphal. The alpha revision used to contain the “data type hash” field here, but this concept was found deficient and removed from the proposal. You can find related discussion in https://forum.opencyphal.org/t/alternative-transport-protocols-in-uavcan/324.

extent_bytes: int

The minimum amount of memory required to hold any serialized representation of any compatible version of the data type; in other words, it is the maximum possible size of received objects. The size is specified in bytes because extent is guaranteed (by definition) to be an integer number of bytes long.

This parameter is determined by the data type author at the data type definition time. It is typically larger than the maximum object size in order to allow the data type author to introduce more fields in the future versions of the type; for example, MyMessage.1.0 may have the maximum size of 100 bytes and the extent 200 bytes; a revised version MyMessage.1.1 may have the maximum size anywhere between 0 and 200 bytes. It is always safe to pick a larger value if not sure. You will find a more rigorous description in the Cyphal Specification.

Transport implementations may use this information to statically size receive buffers.
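The MyMessage example above can be sketched as follows. This is an illustration only: PayloadMetadataSketch, make_receive_buffer, and store are hypothetical names, and dropping the bytes that exceed the extent is an assumption made here for the sake of the example rather than a statement about any particular transport implementation.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PayloadMetadataSketch:
    """Hypothetical stand-in mirroring pycyphal.transport.PayloadMetadata."""

    extent_bytes: int


def make_receive_buffer(meta: PayloadMetadataSketch) -> bytearray:
    # A buffer of extent_bytes can hold any compatible version of the type.
    return bytearray(meta.extent_bytes)


def store(buffer: bytearray, serialized: bytes) -> int:
    """Copy a serialized object into the buffer; return the number of bytes stored."""
    # Assumption of this sketch: bytes past the extent are dropped.
    n = min(len(serialized), len(buffer))
    buffer[:n] = serialized[:n]
    return n


meta = PayloadMetadataSketch(extent_bytes=200)
buf = make_receive_buffer(meta)

stored_v1_0 = store(buf, bytes(100))       # MyMessage.1.0: at most 100 bytes
stored_v1_1 = store(buf, bytes(180))       # MyMessage.1.1: anywhere up to 200 bytes
stored_oversized = store(buf, bytes(250))  # Larger than the extent
```

The buffer is allocated once and never resized, which is the point of sizing it by the extent rather than by the maximum size of one specific version.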

__delattr__(name)[source]
__eq__(other)[source]
__hash__()[source]
__init__(extent_bytes: int) None[source]
__match_args__ = ('extent_bytes',)
__repr__()[source]
__setattr__(name, value)[source]
class pycyphal.transport.SessionStatistics(transfers: int = 0, frames: int = 0, payload_bytes: int = 0, errors: int = 0, drops: int = 0)[source]

Bases: object

Abstract transport-agnostic session statistics. Transport implementations are encouraged to extend this class to add more transport-specific information. The statistical counters start from zero when a session is first instantiated.

transfers: int = 0

Successful transfer count.

frames: int = 0

Cyphal transport frame count (CAN frames, UDP packets, wireless frames, etc).

payload_bytes: int = 0

Successful transfer payload bytes (not including transport metadata or padding).

errors: int = 0

Failures of any kind, even if they are also logged using other means, excepting drops.

drops: int = 0

Frames lost to buffer overruns and expired deadlines.

__eq__(other: object) bool[source]

The statistic comparison operator is defined for any combination of derived classes. It compares only those fields that are available in both operands, ignoring unique fields. This is useful for testing.
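One way such a comparison could be written is sketched below. This is not the library's implementation; StatsSketch and DerivedStatsSketch are hypothetical classes, and reassembly_errors is a made-up transport-specific counter.

```python
import dataclasses


@dataclasses.dataclass(eq=False)
class StatsSketch:
    """Hypothetical stand-in for pycyphal.transport.SessionStatistics."""

    transfers: int = 0
    frames: int = 0
    payload_bytes: int = 0
    errors: int = 0
    drops: int = 0

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, StatsSketch):
            return NotImplemented
        mine = dataclasses.asdict(self)
        theirs = dataclasses.asdict(other)
        # Compare only the fields present in both operands.
        shared = mine.keys() & theirs.keys()
        return all(mine[k] == theirs[k] for k in shared)


@dataclasses.dataclass(eq=False)
class DerivedStatsSketch(StatsSketch):
    # Made-up transport-specific counter, ignored when comparing with the base class.
    reassembly_errors: int = 0


base = StatsSketch(transfers=3, frames=5)
derived = DerivedStatsSketch(transfers=3, frames=5, reassembly_errors=7)
same = base == derived
different = base == StatsSketch(transfers=4, frames=5)
```

Defining __eq__ without __hash__ makes the instances unhashable, which is consistent with __hash__ = None in the class listing below.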

__hash__ = None
__init__(transfers: int = 0, frames: int = 0, payload_bytes: int = 0, errors: int = 0, drops: int = 0) None[source]
__match_args__ = ('transfers', 'frames', 'payload_bytes', 'errors', 'drops')
__repr__()[source]
class pycyphal.transport.Feedback[source]

Bases: ABC

Abstract output transfer feedback for transmission timestamping. If feedback is enabled for an output session, an instance of this class is delivered back to the application via a callback soon after the first frame of the transfer is emitted.

The upper layers can match a feedback object with its transfer by the transfer creation timestamp.

abstract property original_transfer_timestamp: Timestamp[source]

This is the timestamp value of the original outgoing transfer object; normally it is the transfer creation timestamp. This value can be used by the upper layers to match each transmitted transfer with its transmission timestamp. The timestamp is used for matching because the alternatives are unsuitable:

  • The priority is rarely unique, hence unfit for matching.

  • Transfer-ID may be modified by the transport layer by computing its modulus, which is difficult to reliably account for in the application, especially in heterogeneous redundant transports.

  • The fragmented payload may contain references to the actual memory of the serialized object, meaning that it may actually change after the object is transmitted, also rendering it unfit for matching.

abstract property first_frame_transmission_timestamp: Timestamp[source]

This is the best-effort estimate of the transmission timestamp. Transport implementations are not required to adhere to any specific accuracy goals. They may use either software or hardware timestamping under the hood, depending on the capabilities of the underlying media driver. The timestamp of a multi-frame transfer is the timestamp of its first frame. The overall TX latency can be computed by subtracting the original transfer timestamp from this value.
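The matching and the latency computation described above can be sketched as follows. TimestampSketch and FeedbackSketch are hypothetical stand-ins for Timestamp and Feedback, and the pending dictionary is an assumption about how an application might track transfers awaiting feedback. The sketch subtracts the monotonic components because the monotonic clock is the one intended for interval measurement.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class TimestampSketch:
    """Hypothetical stand-in for pycyphal.transport.Timestamp (hashable, immutable)."""

    system_ns: int
    monotonic_ns: int


@dataclass(frozen=True)
class FeedbackSketch:
    """Hypothetical stand-in for pycyphal.transport.Feedback."""

    original_transfer_timestamp: TimestampSketch
    first_frame_transmission_timestamp: TimestampSketch


def tx_latency_ns(fb: FeedbackSketch) -> int:
    # Transmission timestamp minus creation timestamp, on the monotonic clock.
    return (
        fb.first_frame_transmission_timestamp.monotonic_ns
        - fb.original_transfer_timestamp.monotonic_ns
    )


# Transfers awaiting feedback, keyed by their creation timestamp.
pending: dict[TimestampSketch, str] = {}
created = TimestampSketch(system_ns=1_700_000_000_000_000_000, monotonic_ns=5_000_000)
pending[created] = "heartbeat #1"

# Later, the transport delivers feedback for that transfer.
fb = FeedbackSketch(
    original_transfer_timestamp=created,
    first_frame_transmission_timestamp=TimestampSketch(
        system_ns=1_700_000_000_000_250_000, monotonic_ns=5_250_000
    ),
)
matched = pending.pop(fb.original_transfer_timestamp)
latency_ns = tx_latency_ns(fb)
```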

__repr__() str[source]
class pycyphal.transport.Timestamp(system_ns: int, monotonic_ns: int)[source]

Bases: object

Timestamps are hashable and immutable. Timestamps can be compared for exact equality; relational comparison operators are not defined.

A timestamp instance always contains a pair of time samples: the system time, also known as “wall time” or local civil time, and the monotonic time, which is used only for time interval measurement.

__init__(system_ns: int, monotonic_ns: int) None[source]

Manual construction is rarely needed, except when implementing network drivers. See the static factory methods.

Parameters:
  • system_ns – Belongs to the domain of time.time_ns(). Units are nanoseconds.

  • monotonic_ns – Belongs to the domain of time.monotonic_ns(). Units are nanoseconds.

static from_seconds(system: float | int | Decimal, monotonic: float | int | Decimal) Timestamp[source]

Both inputs are in seconds (not nanoseconds) of any numerical type.

static now() Timestamp[source]

Constructs a new timestamp instance populated with current time.

Important

Clocks are sampled non-atomically! The monotonic clock is sampled first.
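The sampling order can be illustrated with the standard library clocks named in the constructor parameters. This is a sketch under the assumption that the two samples are taken by consecutive calls; TimestampSketch and now_sketch are hypothetical names, not the library's code.

```python
import time
from dataclasses import dataclass


@dataclass(frozen=True)
class TimestampSketch:
    """Hypothetical stand-in for pycyphal.transport.Timestamp."""

    system_ns: int
    monotonic_ns: int


def now_sketch() -> TimestampSketch:
    # The two clocks cannot be read atomically, so the samples are slightly apart.
    monotonic_ns = time.monotonic_ns()  # Sampled first
    system_ns = time.time_ns()          # Sampled second
    return TimestampSketch(system_ns=system_ns, monotonic_ns=monotonic_ns)


first = now_sketch()
second = now_sketch()

# Intervals should be measured on the monotonic component only.
elapsed_ns = second.monotonic_ns - first.monotonic_ns
```

The system component may jump if the wall clock is adjusted between samples, which is why only the monotonic component is used for the interval here.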

static combine_oldest(*arguments: Timestamp) Timestamp[source]

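Picks the lowest system time value and the lowest monotonic time value from the provided set of timestamps and constructs a new instance from them. The two values are chosen independently, so they may originate from different arguments.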

This can be useful for transfer reception logic where the oldest frame timestamp is used as the transfer timestamp for multi-frame transfers to reduce possible timestamping error variation introduced in the media layer.

>>> Timestamp.combine_oldest(
...     Timestamp(12345, 45600),
...     Timestamp(12300, 45699),
...     Timestamp(12399, 45678),
... )
Timestamp(system_ns=12300, monotonic_ns=45600)
property system: Decimal[source]

System time in seconds.

property monotonic: Decimal[source]

Monotonic time in seconds.

property system_ns: int[source]
property monotonic_ns: int[source]
__eq__(other: Any) bool[source]

Performs an exact comparison of the timestamp components with nanosecond resolution.

__hash__() int[source]
__str__() str[source]
__repr__() str[source]
exception pycyphal.transport.TransportError[source]

Bases: RuntimeError

This is the root exception class for all transport-related errors. Exception types defined at the higher layers up the protocol stack (e.g., the presentation layer) also inherit from this type, so the application may use this type as the base exception type for all Cyphal-related errors that occur at runtime.

This exception type hierarchy is intentionally separated from DSDL-related errors that may occur at code generation time.
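The practical consequence of a single root type is that one except clause can catch everything, while more specific clauses can be placed before it. The sketch below mirrors part of the hierarchy documented in this section with locally defined classes so that it runs on its own; in an application these names would be imported from pycyphal.transport instead. The run helper and the two failing operations are hypothetical.

```python
# Local mirror of part of the documented hierarchy, for illustration only.
class TransportError(RuntimeError):
    pass


class InvalidTransportConfigurationError(TransportError):
    pass


class InvalidMediaConfigurationError(InvalidTransportConfigurationError):
    pass


class ResourceClosedError(TransportError):
    pass


def run(operation) -> str:
    """Invoke the operation and describe the outcome."""
    try:
        operation()
    except InvalidTransportConfigurationError as ex:
        # Specific handlers go first; this also covers media configuration errors.
        return f"bad configuration: {ex}"
    except TransportError as ex:
        # The root type catches every remaining transport-related error.
        return f"transport failure: {ex}"
    return "ok"


def open_bad_media() -> None:
    raise InvalidMediaConfigurationError("no such interface")


def use_closed_session() -> None:
    raise ResourceClosedError("session closed")


outcomes = [run(open_bad_media), run(use_closed_session), run(lambda: None)]
```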

exception pycyphal.transport.UnsupportedSessionConfigurationError[source]

Bases: TransportError

The requested session configuration is not supported by this transport. For example, this exception would be raised if one attempted to create a unicast output for messages over the CAN bus transport.

exception pycyphal.transport.OperationNotDefinedForAnonymousNodeError[source]

Bases: TransportError

The requested action would normally be possible, but it is currently not because the transport instance does not have a node-ID assigned.

exception pycyphal.transport.InvalidTransportConfigurationError[source]

Bases: TransportError

The transport could not be initialized or the operation could not be performed because the specified configuration is invalid.

exception pycyphal.transport.InvalidMediaConfigurationError[source]

Bases: InvalidTransportConfigurationError

The transport could not be initialized or the operation could not be performed because the specified media configuration is invalid.

exception pycyphal.transport.ResourceClosedError[source]

Bases: TransportError

The requested operation could not be performed because an associated resource has already been terminated. Double-close should not raise exceptions.
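The expected behavior can be sketched with a toy resource: operations after closing raise, while closing again does nothing. ClosedErrorSketch and SessionSketch are hypothetical names used only for this illustration.

```python
class ClosedErrorSketch(RuntimeError):
    """Hypothetical stand-in for pycyphal.transport.ResourceClosedError."""


class SessionSketch:
    """Toy resource demonstrating idempotent close."""

    def __init__(self) -> None:
        self._closed = False

    def send(self, payload: bytes) -> int:
        if self._closed:
            raise ClosedErrorSketch("the session is closed")
        return len(payload)

    def close(self) -> None:
        # Idempotent: closing an already closed session is a no-op.
        self._closed = True


session = SessionSketch()
sent = session.send(b"abc")
session.close()
session.close()  # Double-close does not raise

try:
    session.send(b"def")
except ClosedErrorSketch:
    raised_after_close = True
else:
    raised_after_close = False
```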

class pycyphal.transport.Capture(timestamp: Timestamp)[source]

Bases: object

This is the abstract data class for all events reported via the capture API.

If a transport implementation defines multiple event types, it is recommended to define a common superclass for them such that it is always possible to determine which transport an event has arrived from using a single instance check.

timestamp: Timestamp
static get_transport_type() Type[Transport][source]

Static reference to the type of transport that can emit captures of this type. For example, for Cyphal/serial it would be pycyphal.transport.serial.SerialTransport. Although the method is static, it shall be overridden by all inheritors.
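The recommendation about a common superclass can be sketched as follows. All names here are hypothetical: FooTransport stands for some transport type, CaptureSketch stands in for Capture, and the timestamp is reduced to a plain integer to keep the example self-contained.

```python
from dataclasses import dataclass


class FooTransport:
    """Hypothetical transport type."""


@dataclass(frozen=True)
class CaptureSketch:
    """Hypothetical stand-in for pycyphal.transport.Capture."""

    timestamp_ns: int

    @staticmethod
    def get_transport_type() -> type:
        raise NotImplementedError


@dataclass(frozen=True)
class FooCapture(CaptureSketch):
    """Common superclass for every event type emitted by FooTransport."""

    @staticmethod
    def get_transport_type() -> type:
        return FooTransport


@dataclass(frozen=True)
class FooFrameCapture(FooCapture):
    frame: bytes


@dataclass(frozen=True)
class FooErrorCapture(FooCapture):
    reason: str


events = [FooFrameCapture(100, b"\x01\x02"), FooErrorCapture(200, "bad CRC")]

# A single instance check tells which transport each event came from.
from_foo = [isinstance(e, FooCapture) for e in events]
transport_types = {e.get_transport_type() for e in events}
```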

__delattr__(name)[source]
__eq__(other)[source]
__hash__()[source]
__init__(timestamp: Timestamp) None[source]
__match_args__ = ('timestamp',)
__repr__()[source]
__setattr__(name, value)[source]
class pycyphal.transport.AlienSessionSpecifier(source_node_id: int | None, destination_node_id: int | None, data_specifier: DataSpecifier)[source]

Bases: object

See AlienTransfer and the abstract transport model.

source_node_id: int | None

None represents an anonymous transfer.

destination_node_id: int | None

None represents a broadcast transfer.

data_specifier: DataSpecifier
__repr__() str[source]
__delattr__(name)[source]
__eq__(other)[source]
__hash__()[source]
__init__(source_node_id: int | None, destination_node_id: int | None, data_specifier: DataSpecifier) None[source]
__match_args__ = ('source_node_id', 'destination_node_id', 'data_specifier')
__setattr__(name, value)[source]
class pycyphal.transport.AlienTransferMetadata(priority: 'pycyphal.transport.Priority', transfer_id: 'int', session_specifier: 'AlienSessionSpecifier')[source]

Bases: object

priority: Priority
transfer_id: int

For outgoing transfers over transports with a cyclic transfer-ID, the modulo is computed automatically, so the user does not need to do it. However, to match a spoofed transfer with some follow-up activity (such as a service response), the user needs to compute the modulo manually, because the follow-up carries the reduced transfer-ID value rather than the original one.
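The manual computation amounts to a single modulo operation. The sketch below assumes a transfer-ID modulo of 32, which corresponds to the 5-bit transfer-ID field of Cyphal/CAN; other transports may use a different modulo or a non-cyclic transfer-ID. The function names are hypothetical.

```python
# Assumption: 5-bit cyclic transfer-ID, as in Cyphal/CAN.
CAN_TRANSFER_ID_MODULO = 32


def wire_transfer_id(transfer_id: int, modulo: int) -> int:
    """The transfer-ID value as it appears on a cyclic transfer-ID transport."""
    return transfer_id % modulo


def response_matches(request_transfer_id: int, response_transfer_id: int, modulo: int) -> bool:
    """Check whether a captured response corresponds to a spoofed request."""
    return wire_transfer_id(request_transfer_id, modulo) == response_transfer_id


on_wire = wire_transfer_id(999, CAN_TRANSFER_ID_MODULO)
matches = response_matches(999, 7, CAN_TRANSFER_ID_MODULO)
mismatch = response_matches(999, 8, CAN_TRANSFER_ID_MODULO)
```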

session_specifier: AlienSessionSpecifier
__repr__() str[source]
__delattr__(name)[source]
__eq__(other)[source]
__hash__()[source]
__init__(priority: Priority, transfer_id: int, session_specifier: AlienSessionSpecifier) None[source]
__match_args__ = ('priority', 'transfer_id', 'session_specifier')
__setattr__(name, value)[source]
class pycyphal.transport.AlienTransfer(metadata: AlienTransferMetadata, fragmented_payload: Sequence[memoryview])[source]

Bases: object

This type models a captured (sniffed) decoded transfer exchanged between a local node and a remote node or between remote nodes, a misaddressed transfer, or a spoofed transfer.

It is different from pycyphal.transport.Transfer because the latter is intended for normal communication, whereas this type is designed for advanced network diagnostics, which is a very different use case. You may notice that the regular transfer model does not include some information such as, say, the route specifier, because the respective behaviors are managed by the transport configuration.

metadata: AlienTransferMetadata
fragmented_payload: Sequence[memoryview]

For reconstructed transfers, the number of fragments equals the number of frames in the transfer. For outgoing transfers, the number of fragments may be arbitrary; the payload is always rearranged correctly.

__eq__(other: object) bool[source]

Transfers whose payload is fragmented differently but content-wise is identical compare equal.

>>> from pycyphal.transport import MessageDataSpecifier, Priority
>>> meta = AlienTransferMetadata(Priority.LOW, 999, AlienSessionSpecifier(123, None, MessageDataSpecifier(888)))
>>> a =  AlienTransfer(meta, fragmented_payload=[memoryview(b'abc'), memoryview(b'def')])
>>> a == AlienTransfer(meta, fragmented_payload=[memoryview(b'abcd'), memoryview(b''), memoryview(b'ef')])
True
>>> a == AlienTransfer(meta, fragmented_payload=[memoryview(b'abcdef')])
True
>>> a == AlienTransfer(meta, fragmented_payload=[])
False
__repr__() str[source]
__delattr__(name)[source]
__hash__()[source]
__init__(metadata: AlienTransferMetadata, fragmented_payload: Sequence[memoryview]) None[source]
__match_args__ = ('metadata', 'fragmented_payload')
__setattr__(name, value)[source]
class pycyphal.transport.Trace(timestamp: Timestamp)[source]

Bases: object

Base event reconstructed by Tracer. Transport-specific implementations may define custom subclasses.

timestamp: Timestamp

The local time when the traced event took place or was commenced. For transfers, this is the timestamp of the first frame.

__delattr__(name)[source]
__eq__(other)[source]
__hash__()[source]
__init__(timestamp: Timestamp) None[source]
__match_args__ = ('timestamp',)
__repr__()[source]
__setattr__(name, value)[source]
class pycyphal.transport.ErrorTrace(timestamp: Timestamp)[source]

Bases: Trace

This trace is yielded when the tracer has determined that it is unable to reconstruct a transfer. It may be further specialized by transport implementations.

__delattr__(name)[source]
__eq__(other)[source]
__hash__()[source]
__init__(timestamp: Timestamp) None[source]
__match_args__ = ('timestamp',)
__repr__()[source]
__setattr__(name, value)[source]
class pycyphal.transport.TransferTrace(timestamp: Timestamp, transfer: AlienTransfer, transfer_id_timeout: float)[source]

Bases: Trace

Reconstructed network data transfer (possibly exchanged between remote nodes) along with metadata.

__delattr__(name)[source]
__eq__(other)[source]
__hash__()[source]
__init__(timestamp: Timestamp, transfer: AlienTransfer, transfer_id_timeout: float) None[source]
__match_args__ = ('timestamp', 'transfer', 'transfer_id_timeout')
__repr__()[source]
__setattr__(name, value)[source]
transfer: AlienTransfer
transfer_id_timeout: float

The tracer uses heuristics to automatically deduce the optimal transfer-ID timeout value per session based on the supplied captures. Whenever a new transfer is reassembled, the auto-deduced transfer-ID timeout that is currently used for its session is reported for informational purposes. This value may be used later to perform transfer deduplication if redundant tracers are used; for that, see pycyphal.transport.redundant.

class pycyphal.transport.Tracer[source]

Bases: ABC

The tracer takes single instances of Capture at the input and delivers a reconstructed high-level view of network events (modeled by Trace) at the output. It keeps massive internal state that is modified whenever update() is invoked. The class may be used either for real-time analysis on a live network, or for post-mortem analysis with capture events read from a black box recorder or a log file.

Instances of this class are entirely isolated from the outside world; they do not perform any IO and do not hold any resources, they are purely computing entities. To reset the state (e.g., in order to start analyzing a new log) simply discard the old instance and use a new one.

The user should never attempt to instantiate implementations manually; instead, the factory method pycyphal.transport.Transport.make_tracer() should be used.

Each transport implementation typically implements its own tracer.

abstract update(cap: Capture) Trace | None[source]

Takes a captured low-level network event at the input, returns a reconstructed high-level event at the output. If the event is considered irrelevant or did not update the internal state significantly (e.g., it is a non-last frame of a multi-frame transfer), the output is None. Reconstructed multi-frame transfers are reported as a single event when the last frame is received.

Capture instances that are not supported by the current transport are silently ignored and None is returned.
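The contract of update() can be illustrated with a toy tracer. This is a heavily simplified sketch, not a real transport tracer: it assumes in-order frames, performs no integrity checks, and uses hypothetical types (FrameCaptureSketch, TransferTraceSketch, ToyTracer) with a made-up session key.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class FrameCaptureSketch:
    """Hypothetical capture of one frame."""

    timestamp_ns: int
    session: int  # Made-up key identifying the session
    payload: bytes
    end_of_transfer: bool


@dataclass(frozen=True)
class TransferTraceSketch:
    """Hypothetical reconstructed transfer."""

    timestamp_ns: int  # Timestamp of the first frame
    payload: bytes


class ToyTracer:
    def __init__(self) -> None:
        # Internal state: frames of incomplete transfers, per session.
        self._pending: dict[int, list[FrameCaptureSketch]] = {}

    def update(self, cap: object) -> Optional[TransferTraceSketch]:
        if not isinstance(cap, FrameCaptureSketch):
            return None  # Unsupported captures are silently ignored
        frames = self._pending.setdefault(cap.session, [])
        frames.append(cap)
        if not cap.end_of_transfer:
            return None  # Non-last frame: state updated, nothing to report
        del self._pending[cap.session]
        return TransferTraceSketch(
            timestamp_ns=frames[0].timestamp_ns,
            payload=b"".join(f.payload for f in frames),
        )


tracer = ToyTracer()
results = [
    tracer.update(FrameCaptureSketch(10, 1, b"abc", False)),
    tracer.update("not a capture"),
    tracer.update(FrameCaptureSketch(20, 1, b"def", True)),
]
```

Because the tracer holds no external resources, starting a new analysis only requires creating a fresh instance.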