Module contents


Bases: abc.ABC

CAN hardware abstraction interface.

It is recognized that the availability of some of the media implementations may be conditional on the type of platform (e.g., SocketCAN is Linux-only) and the availability of third-party software (e.g., PySerial may be needed for SLCAN). Python packages containing such media implementations shall be always importable.


The frames handler is non-blocking and non-yielding; returns immediately. The timestamp is provided individually per frame.

alias of Callable[[Sequence[Tuple[pycyphal.transport._timestamp.Timestamp,]]], None]

VALID_MTU_SET = {8, 12, 16, 20, 24, 32, 48, 64}

Valid MTU values for Classic CAN and CAN FD.

property loop:[source]


abstract property interface_name: str[source]

The name of the interface on the local system. For example:

  • can0 for SocketCAN;

  • /dev/serial/by-id/usb-Zubax_Robotics_Zubax_Babel_28002E0001514D593833302000000000-if00 for SLCAN;

  • COM9 for SLCAN.

abstract property mtu: int[source]

The value belongs to VALID_MTU_SET. Observe that the media interface doesn’t care whether we’re using CAN FD or CAN 2.0 because the Cyphal CAN transport protocol itself doesn’t care. The transport simply does not distinguish them.

abstract property number_of_acceptance_filters: int[source]

The number of hardware acceptance filters supported by the underlying CAN controller. Some media drivers, such as SocketCAN, may implement acceptance filtering in software instead of hardware. The returned value shall be a positive integer. If the hardware does not support filtering at all, the media driver shall emulate at least one filter in software.

abstract start(handler: ReceivedFramesHandler, no_automatic_retransmission: bool) None[source]

Every received frame shall be timestamped. Both monotonic and system timestamps are required. There are no timestamping accuracy requirements. An empty set of frames should never be reported.

The media implementation shall drop all non-data frames (RTR frames, error frames, etc.).

If the set contains more than one frame, all frames must be ordered by the time of their arrival, which also should be reflected in their timestamps; that is, the timestamp of a frame at index N generally should not be higher than the timestamp of a frame at index N+1. The timestamp ordering, however, is not a strict requirement because it is recognized that due to error variations in the timestamping algorithms timestamp values may not be monotonically increasing.

The implementation should strive to return as many frames per call as possible as long as that does not increase the worst case latency.

The handler shall be invoked on the event loop returned by loop.

The transport is guaranteed to invoke this method exactly once during (or shortly after) initialization; it can be used to perform a lazy start of the receive loop task/thread/whatever. It is undefined behavior to invoke this method more than once on the same instance.

  • handler – Behold my transformation. You are empowered to do as you please.

  • no_automatic_retransmission – If True, the CAN controller should be configured to abort transmission of CAN frames after first error or arbitration loss (time-triggered transmission mode). This mode is used by Cyphal to facilitate the PnP node-ID allocation process on the client side. Its support is not mandatory but highly recommended to avoid excessive disturbance of the bus while PnP allocations are in progress.

abstract configure_acceptance_filters(configuration: Sequence[]) None[source]

This method is invoked whenever the subscription set is changed in order to communicate to the underlying CAN controller hardware which CAN frames should be accepted and which ones should be ignored.

An empty set of configurations means that the transport is not interested in any frames, i.e., all frames should be rejected by the controller. That is also the recommended default configuration (ignore all frames until explicitly requested otherwise).

abstract async send(frames: Iterable[], monotonic_deadline: float) int[source]

All passed frames are guaranteed to share the same CAN-ID. This guarantee may enable some optimizations. The frames shall be delivered to the bus in the same order. The iterable is guaranteed to be non-empty.

The method returns when the deadline is reached even if some of the frames could not be transmitted. The returned value is the number of frames that have been sent. If the returned number is lower than the number of supplied frames, the outer transport logic will register an error, which is then propagated upwards all the way to the application level.

The method should avoid yielding the execution flow; instead, it is recommended to unload the frames into an internal transmission queue and return ASAP, as that minimizes the likelihood of inner priority inversion. If that approach is used, implementations are advised to keep track of transmission deadline on a per-frame basis to meet the timing requirements imposed by the application.

abstract close() None[source]

After the media instance is closed, none of its methods can be used anymore. If a method is invoked after close, pycyphal.transport.ResourceClosedError should be raised. This method is an exception to that rule: if invoked on a closed instance, it shall do nothing.

static list_available_interface_names() Iterable[str][source]

Returns the list of interface names that can be used with the media class implementing it. For example, for the SocketCAN media class it would return the SocketCAN interface names such as “vcan0”; for SLCAN it would return the list of serial ports.

Implementations should strive to sort the output so that the interfaces that are most likely to be used are listed first – this helps GUI applications.

If the media implementation cannot be used on the local platform, the method shall return an empty set instead of raising an error. This guarantee supports an important use case where the caller would just iterate over all inheritors of this Media interface and ask each one to yield the list of available interfaces, and then just present that to the user.

__repr__() str[source]

Bases: enum.IntEnum

An enumeration.

BASE = 11
class 'FrameFormat', identifier: 'int', data: 'bytearray')[source]

Bases: object

identifier: int
data: bytearray
property dlc: int[source]

Not to be confused with len(data).

static convert_dlc_to_length(dlc: int) int[source]
static get_required_padding(data_length: int) int[source]

Computes padding to nearest valid CAN FD frame size.

>>> DataFrame.get_required_padding(6)
>>> DataFrame.get_required_padding(61)
__repr__() str[source]
__init__(format:, identifier: int, data: bytearray) None[source]
__setattr__(name, value)[source]
class, loopback: bool)[source]

Bases: object

The envelope models a singular input/output frame transaction. It is a media layer frame extended with IO-related metadata.

loopback: bool

Loopback request for outgoing frames; loopback indicator for received frames.

__init__(frame:, loopback: bool) None[source]
__setattr__(name, value)[source]
class 'int', mask: 'int', format: 'typing.Optional[FrameFormat]')[source]

Bases: object

identifier: int

The reference CAN ID value.

mask: int

Mask applies to the identifier only. It does not contain any special flags.

format: Optional[]

None means no preference – both formats will be accepted.

property identifier_bit_length: int[source]
static new_promiscuous(frame_format: Optional[] = None)[source]

Returns a configuration that accepts all frames of the specified format. If the format is not specified, no distinction will be made. Note that some CAN controllers may have difficulty supporting both formats on a single filter.

property rank: int[source]

This is the number of set bits in the mask. This is a part of the CAN acceptance filter configuration optimization algorithm; see optimize_filter_configurations().

We return negative rank for configurations which do not distinguish between extended and base frames in order to discourage merger of configurations of different frame types, since they are hard to support in certain CAN controllers. The effect of this is that we guarantee that an ambivalent filter configuration will never appear if the controller has at least two acceptance filters. Negative rank is computed by subtracting the number of bits in the CAN ID (or 29 if the filter accepts both base and extended identifiers) from the original rank.


This is a part of the CAN acceptance filter configuration optimization algorithm; see optimize_filter_configurations().

Given two filter configurations A and B, where A accepts CAN frames whose identifiers belong to Ca and likewise Cb for B, the merge product of A and B would be a new filter configuration that accepts CAN frames belonging to a new set which is a superset of the union of Ca and Cb.

__str__() str[source]
__init__(identifier: int, mask: int, format: Optional[]) None[source]
__setattr__(name, value)[source] Iterable[], target_number_of_configurations: int) Sequence[][source]

Implements the CAN acceptance filter configuration optimization algorithm described in the Specification. The algorithm was originally proposed by P. Kirienko and I. Sheremet.

Given a set of K filter configurations that accept CAN frames whose identifiers belong to the set C, and N acceptance filters implemented in hardware, where 1 <= N < K, find a new set of K' filter configurations that accept CAN frames whose identifiers belong to the set C', such that K' <= N, C' is a superset of C, and |C'| is minimized.

The algorithm is not defined for N >= K because this configuration is considered optimal. The function returns the input set unchanged in this case. If the target number of configurations is not positive, a ValueError is raised.

The time complexity of this implementation is O(K!); it should be optimized.