pycyphal.transport.can package

Subpackages

Module contents

Cyphal/CAN transport overview

This module implements Cyphal/CAN – the CAN transport for Cyphal, both Classic CAN and CAN FD, as defined in the Cyphal specification. Cyphal does not distinguish between the two aside from the MTU difference; neither does this implementation. Classic CAN is essentially treated as CAN FD with MTU of 8 bytes.

Different CAN hardware is supported through the media sublayer; please refer to pycyphal.transport.can.media.

Per the Cyphal specification, the CAN transport supports broadcast messages and unicast services:

Supported transfers

Unicast

Broadcast

Message

No

Yes

Service

Yes

Banned by Specification

Tooling

Some of the media sub-layer implementations support virtual CAN bus interfaces (e.g., SocketCAN on GNU/Linux); they are often useful for testing. Please read the media sub-layer documentation for details.

Inheritance diagram

Inheritance diagram of pycyphal.transport.can._can, pycyphal.transport.can._session._input, pycyphal.transport.can._session._output, pycyphal.transport.can._tracer

class pycyphal.transport.can.CANTransport(media: Media, local_node_id: int | None, *, loop: AbstractEventLoop | None = None)[source]

Bases: Transport

The standard Cyphal/CAN transport implementation as defined in the Cyphal specification. Please read the module documentation for details.

TRANSFER_ID_MODULO = 32
__init__(media: Media, local_node_id: int | None, *, loop: AbstractEventLoop | None = None)[source]
Parameters:
  • media – The media implementation.

  • local_node_id – The node-ID to use. Can’t be changed. None means anonymous (useful for PnP allocation).

  • loop – Deprecated.

property protocol_parameters: ProtocolParameters[source]
property local_node_id: int | None[source]

If the local node-ID is not assigned, automatic retransmission in the media implementation is disabled to facilitate plug-and-play node-ID allocation.

property input_sessions: Sequence[CANInputSession][source]
property output_sessions: Sequence[CANOutputSession][source]
close() None[source]
sample_statistics() CANTransportStatistics[source]
get_input_session(specifier: InputSessionSpecifier, payload_metadata: PayloadMetadata) CANInputSession[source]

See the base class docs for background. Whenever an input session is created or destroyed, the hardware acceptance filters are reconfigured automatically; computation of a new configuration and its deployment on the CAN controller may be slow.

get_output_session(specifier: OutputSessionSpecifier, payload_metadata: PayloadMetadata) CANOutputSession[source]
begin_capture(handler: Callable[[Capture], None]) None[source]

Capture is implemented by reconfiguring the acceptance filter to accept everything and forcing loopback for every outgoing frame. Forced loopback ensures that transmitted frames are timestamped very accurately. Captured frames are encapsulated inside pycyphal.transport.can.CANCapture.

property capture_active: bool[source]
static make_tracer() CANTracer[source]

See CANTracer.

async spoof_frames(frames: Sequence[DataFrame], monotonic_deadline: float) None[source]

Inject arbitrary frames into the transport directly. Frames that could not be delivered to the underlying media driver before the deadline are silently dropped. This method is mostly intended for co-existence with other communication protocols that use the same CAN interface (e.g., DroneCAN).

async spoof(transfer: AlienTransfer, monotonic_deadline: float) bool[source]

Spoofing over the CAN transport is trivial and it does not involve reconfiguration of the media layer. It can be invoked at no cost at any time (unlike, say, Cyphal/UDP). See the overridden method pycyphal.transport.Transport.spoof() for details.

class pycyphal.transport.can.CANInputSession(specifier: InputSessionSpecifier, payload_metadata: PayloadMetadata, finalizer: Callable[[], None])[source]

Bases: pycyphal.transport.can.CANSession, InputSession

DEFAULT_TRANSFER_ID_TIMEOUT = 2

Per the Cyphal specification. Units are seconds. Can be overridden after instantiation if needed.

__init__(specifier: InputSessionSpecifier, payload_metadata: PayloadMetadata, finalizer: Callable[[], None])[source]

Use the factory method.

property frame_queue_capacity: int | None[source]

Capacity of the input frame queue. None means that the capacity is unlimited, which is the default. This may deplete the heap if input transfers are not consumed quickly enough so beware.

If the capacity is changed and the new value is smaller than the number of frames currently in the queue, the newest frames will be discarded and the number of queue overruns will be incremented accordingly. The complexity of a queue capacity change may be up to linear of the number of frames currently in the queue. If the value is not None, it must be a positive integer, otherwise you get a ValueError.

property specifier: InputSessionSpecifier[source]
property payload_metadata: PayloadMetadata[source]
sample_statistics() CANInputSessionStatistics[source]
property transfer_id_timeout: float[source]
async receive(monotonic_deadline: float) TransferFrom | None[source]
close() None[source]
class pycyphal.transport.can.CANOutputSession(transport: CANTransport, send_handler: Callable[[pycyphal.transport.can.SendTransaction], Awaitable[bool]], specifier: OutputSessionSpecifier, payload_metadata: PayloadMetadata, finalizer: Callable[[], None])[source]

Bases: pycyphal.transport.can.CANSession, OutputSession

This is actually an abstract class, but its concrete inheritors are hidden from the API. The implementation is chosen according to the type of the session requested: broadcast or unicast.

__init__(transport: CANTransport, send_handler: Callable[[pycyphal.transport.can.SendTransaction], Awaitable[bool]], specifier: OutputSessionSpecifier, payload_metadata: PayloadMetadata, finalizer: Callable[[], None])[source]

Use the factory method.

property specifier: OutputSessionSpecifier[source]
property payload_metadata: PayloadMetadata[source]
enable_feedback(handler: Callable[[Feedback], None]) None[source]
disable_feedback() None[source]
sample_statistics() SessionStatistics[source]
close() None[source]
class pycyphal.transport.can.CANTransportStatistics(in_frames: int = 0, in_frames_cyphal: int = 0, in_frames_cyphal_accepted: int = 0, in_frames_loopback: int = 0, in_frames_errored: int = 0, out_frames: int = 0, out_frames_timeout: int = 0, out_frames_loopback: int = 0)[source]

Bases: TransportStatistics

The following invariants apply:

out_frames >= out_frames_loopback
in_frames >= in_frames_cyphal >= in_frames_cyphal_accepted
out_frames_loopback >= in_frames_loopback
in_frames: int = 0

Number of genuine frames received from the bus (loopback not included).

in_frames_cyphal: int = 0

Subset of the above that happen to be valid Cyphal frames.

in_frames_cyphal_accepted: int = 0

Subset of the above that are useful for the local application.

in_frames_loopback: int = 0

Number of loopback frames received from the media instance (not bus).

in_frames_errored: int = 0

How many frames of any kind could not be successfully processed.

out_frames: int = 0

Number of frames sent to the media instance successfully.

out_frames_timeout: int = 0

Number of frames that were supposed to be sent but timed out.

out_frames_loopback: int = 0

Number of sent frames that we requested loopback for.

property media_acceptance_filtering_efficiency: float[source]

An efficiency metric for the acceptance filtering implemented in the media instance. The value of 1.0 (100%) indicates perfect filtering, where the media can sort out relevant frames from irrelevant ones completely autonomously. The value of 0 indicates that none of the frames passed over from the media instance are useful for the application (all ignored).

property lost_loopback_frames: int[source]

The number of loopback frames that have been requested but never returned. Normally the value should be zero. The value may transiently increase to small values if the counters happened to be sampled while the loopback frames reside in the transmission queue of the CAN controller awaiting being processed. If the value remains positive for long periods of time, the media driver is probably misbehaving. A negative value means that the media instance is sending more loopback frames than requested (bad).

property in_frames_uavcan: int[source]
property in_frames_uavcan_accepted: int[source]
__eq__(other)[source]
__hash__ = None
__init__(in_frames: int = 0, in_frames_cyphal: int = 0, in_frames_cyphal_accepted: int = 0, in_frames_loopback: int = 0, in_frames_errored: int = 0, out_frames: int = 0, out_frames_timeout: int = 0, out_frames_loopback: int = 0) None[source]
__match_args__ = ('in_frames', 'in_frames_cyphal', 'in_frames_cyphal_accepted', 'in_frames_loopback', 'in_frames_errored', 'out_frames', 'out_frames_timeout', 'out_frames_loopback')
__repr__()[source]
class pycyphal.transport.can.CANInputSessionStatistics(transfers: 'int' = 0, frames: 'int' = 0, payload_bytes: 'int' = 0, errors: 'int' = 0, drops: 'int' = 0, reception_error_counters: 'typing.Dict[TransferReassemblyErrorID, int]' = <factory>)[source]

Bases: SessionStatistics

reception_error_counters: Dict[TransferReassemblyErrorID, int]
__eq__(other)[source]
__hash__ = None
__init__(transfers: int = 0, frames: int = 0, payload_bytes: int = 0, errors: int = 0, drops: int = 0, reception_error_counters: ~typing.Dict[~pycyphal.transport.can._session._transfer_reassembler.TransferReassemblyErrorID, int] = <factory>) None[source]
__match_args__ = ('transfers', 'frames', 'payload_bytes', 'errors', 'drops', 'reception_error_counters')
__repr__()[source]
class pycyphal.transport.can.TransferReassemblyErrorID(value)[source]

Bases: Enum

Transfer reassembly error codes. Used in the extended error statistics. See the Cyphal specification for background info. We have ID in the name to make clear that this is not an exception type.

MISSED_START_OF_TRANSFER = 1
UNEXPECTED_TOGGLE_BIT = 2
UNEXPECTED_TRANSFER_ID = 3
TRANSFER_CRC_MISMATCH = 4
class pycyphal.transport.can.CANCapture(timestamp: Timestamp, frame: DataFrame, own: bool)[source]

Bases: Capture

See pycyphal.transport.can.CANTransport.begin_capture() for details.

frame: DataFrame
own: bool

True if the captured frame was sent by the local transport instance. False if it was received from the bus.

parse() Tuple[AlienSessionSpecifier, Priority, pycyphal.transport.can.CyphalFrame] | None[source]
__repr__() str[source]
static get_transport_type() Type[CANTransport][source]
__delattr__(name)[source]
__eq__(other)[source]
__hash__()[source]
__init__(timestamp: Timestamp, frame: DataFrame, own: bool) None[source]
__match_args__ = ('timestamp', 'frame', 'own')
__setattr__(name, value)[source]
class pycyphal.transport.can.CANErrorTrace(timestamp: 'pycyphal.transport.Timestamp', error: 'TransferReassemblyErrorID')[source]

Bases: ErrorTrace

error: TransferReassemblyErrorID
__delattr__(name)[source]
__eq__(other)[source]
__hash__()[source]
__init__(timestamp: Timestamp, error: TransferReassemblyErrorID) None[source]
__match_args__ = ('timestamp', 'error')
__repr__()[source]
__setattr__(name, value)[source]
class pycyphal.transport.can.CANTracer[source]

Bases: Tracer

The CAN tracer does not differentiate between RX and TX frames, they are treated uniformly. Return types from update():

__init__() None[source]
update(cap: Capture) Trace | None[source]