pycyphal.transport.can package
Subpackages
Module contents
Cyphal/CAN transport overview
This module implements Cyphal/CAN – the CAN transport for Cyphal, both Classic CAN and CAN FD, as defined in the Cyphal specification. Cyphal does not distinguish between the two aside from the MTU difference; neither does this implementation. Classic CAN is essentially treated as CAN FD with an MTU of 8 bytes.
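To illustrate why the MTU is the only difference that matters here, the following sketch estimates how many frames a transfer occupies under each MTU. It assumes the framing rules of the Cyphal specification (one tail byte per frame, plus a 16-bit transfer CRC appended to multi-frame transfers) and ignores CAN FD length padding, which does not change the frame count. The names `count_frames`, `TAIL_BYTE_SIZE`, and `TRANSFER_CRC_SIZE` are hypothetical and are not part of pycyphal.

```python
import math

CLASSIC_CAN_MTU = 8    # bytes of data per Classic CAN frame
CAN_FD_MTU = 64        # bytes of data per CAN FD frame
TAIL_BYTE_SIZE = 1     # assumption per the Cyphal spec: one tail byte ends every frame
TRANSFER_CRC_SIZE = 2  # assumption per the Cyphal spec: multi-frame transfers carry a 16-bit CRC


def count_frames(payload_size: int, mtu: int) -> int:
    """Estimate the number of CAN frames one transfer occupies for the given MTU."""
    if payload_size < 0 or mtu <= TAIL_BYTE_SIZE:
        raise ValueError("invalid payload size or MTU")
    per_frame = mtu - TAIL_BYTE_SIZE  # payload bytes that fit next to the tail byte
    if payload_size <= per_frame:
        return 1  # single-frame transfer; no transfer CRC is added
    # Multi-frame transfer: the CRC travels with the payload and may spill into an extra frame.
    return math.ceil((payload_size + TRANSFER_CRC_SIZE) / per_frame)
```

For example, `count_frames(100, CLASSIC_CAN_MTU)` and `count_frames(100, CAN_FD_MTU)` use the same formula; only the `mtu` argument differs.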
Different CAN hardware is supported through the media sublayer; please refer to pycyphal.transport.can.media.
Per the Cyphal specification, the CAN transport supports broadcast messages and unicast services:
Supported transfers | Unicast | Broadcast |
---|---|---|
Message | No | Yes |
Service | Yes | Banned by Specification |
Tooling
Some of the media sublayer implementations support virtual CAN bus interfaces (e.g., SocketCAN on GNU/Linux); they are often useful for testing. Please read the media sublayer documentation for details.
Inheritance diagram
- class pycyphal.transport.can.CANTransport(media: pycyphal.transport.can.media._media.Media, local_node_id: Optional[int], *, loop: Optional[asyncio.events.AbstractEventLoop] = None)[source]
Bases:
pycyphal.transport._transport.Transport
The standard Cyphal/CAN transport implementation as defined in the Cyphal specification. Please read the module documentation for details.
- TRANSFER_ID_MODULO = 32
- __init__(media: pycyphal.transport.can.media._media.Media, local_node_id: Optional[int], *, loop: Optional[asyncio.events.AbstractEventLoop] = None)[source]
- Parameters
media – The media implementation.
local_node_id – The node-ID to use. Can’t be changed. None means anonymous (useful for PnP allocation).
loop – Deprecated.
- property protocol_parameters: pycyphal.transport._transport.ProtocolParameters[source]
- property local_node_id: Optional[int][source]
If the local node-ID is not assigned, automatic retransmission in the media implementation is disabled to facilitate plug-and-play node-ID allocation.
- property input_sessions: Sequence[pycyphal.transport.can._session._input.CANInputSession][source]
- property output_sessions: Sequence[pycyphal.transport.can._session._output.CANOutputSession][source]
- sample_statistics() pycyphal.transport.can._can.CANTransportStatistics [source]
- get_input_session(specifier: pycyphal.transport._session.InputSessionSpecifier, payload_metadata: pycyphal.transport._payload_metadata.PayloadMetadata) pycyphal.transport.can._session._input.CANInputSession [source]
See the base class docs for background. Whenever an input session is created or destroyed, the hardware acceptance filters are reconfigured automatically; computation of a new configuration and its deployment on the CAN controller may be slow.
- get_output_session(specifier: pycyphal.transport._session.OutputSessionSpecifier, payload_metadata: pycyphal.transport._payload_metadata.PayloadMetadata) pycyphal.transport.can._session._output.CANOutputSession [source]
- begin_capture(handler: Callable[[pycyphal.transport._tracer.Capture], None]) None [source]
Capture is implemented by reconfiguring the acceptance filter to accept everything and forcing loopback for every outgoing frame. Forced loopback ensures that transmitted frames are timestamped very accurately. Captured frames are encapsulated inside pycyphal.transport.can.CANCapture.
- static make_tracer() pycyphal.transport.can._tracer.CANTracer [source]
See CANTracer.
- async spoof_frames(frames: Sequence[pycyphal.transport.can.media._frame.DataFrame], monotonic_deadline: float) None [source]
Inject arbitrary frames into the transport directly. Frames that could not be delivered to the underlying media driver before the deadline are silently dropped. This method is mostly intended for co-existence with other communication protocols that use the same CAN interface (e.g., DroneCAN).
- async spoof(transfer: pycyphal.transport._tracer.AlienTransfer, monotonic_deadline: float) bool [source]
Spoofing over the CAN transport is trivial and it does not involve reconfiguration of the media layer. It can be invoked at no cost at any time (unlike, say, Cyphal/UDP). See the overridden method pycyphal.transport.Transport.spoof() for details.
- class pycyphal.transport.can.CANInputSession(specifier: pycyphal.transport._session.InputSessionSpecifier, payload_metadata: pycyphal.transport._payload_metadata.PayloadMetadata, finalizer: Callable[[], None])[source]
Bases:
pycyphal.transport.can.CANSession, pycyphal.transport._session.InputSession
- DEFAULT_TRANSFER_ID_TIMEOUT = 2
Per the Cyphal specification. Units are seconds. Can be overridden after instantiation if needed.
- __init__(specifier: pycyphal.transport._session.InputSessionSpecifier, payload_metadata: pycyphal.transport._payload_metadata.PayloadMetadata, finalizer: Callable[[], None])[source]
Use the factory method.
- property frame_queue_capacity: Optional[int][source]
Capacity of the input frame queue. None means that the capacity is unlimited, which is the default. Beware that an unlimited queue may deplete the heap if input transfers are not consumed quickly enough.
If the capacity is changed and the new value is smaller than the number of frames currently in the queue, the newest frames will be discarded and the number of queue overruns will be incremented accordingly. The complexity of a queue capacity change may be up to linear in the number of frames currently in the queue. If the value is not None, it must be a positive integer; otherwise, a ValueError is raised.
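The capacity semantics described above can be sketched with a small stand-alone queue. This is a minimal illustration under stated assumptions, not the pycyphal implementation: the class `BoundedFrameQueue`, its `overruns` counter, and the choice to reject an incoming frame when the queue is full are all hypothetical.

```python
from collections import deque
from typing import Deque, Optional


class BoundedFrameQueue:
    """Hypothetical stand-in for the input frame queue; not part of pycyphal."""

    def __init__(self) -> None:
        self._frames: Deque[object] = deque()
        self._capacity: Optional[int] = None  # None means unlimited (the default)
        self.overruns = 0

    @property
    def capacity(self) -> Optional[int]:
        return self._capacity

    @capacity.setter
    def capacity(self, value: Optional[int]) -> None:
        if value is not None:
            if not isinstance(value, int) or value <= 0:
                raise ValueError(f"capacity must be a positive integer or None, got {value!r}")
            # Shrinking: discard the newest frames (right end) until the queue fits.
            # This loop is linear in the number of queued frames in the worst case.
            while len(self._frames) > value:
                self._frames.pop()
                self.overruns += 1
        self._capacity = value

    def push(self, frame: object) -> bool:
        # Assumption: when the queue is full, the incoming frame is dropped and counted.
        if self._capacity is not None and len(self._frames) >= self._capacity:
            self.overruns += 1
            return False
        self._frames.append(frame)
        return True

    def pop(self) -> object:
        return self._frames.popleft()  # oldest frame first
```

Setting `capacity` to a smaller value trims the queue immediately, which is why the change may take time proportional to the number of queued frames.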
- property specifier: pycyphal.transport._session.InputSessionSpecifier[source]
- property payload_metadata: pycyphal.transport._payload_metadata.PayloadMetadata[source]
- sample_statistics() pycyphal.transport.can._session._input.CANInputSessionStatistics [source]
- async receive(monotonic_deadline: float) Optional[pycyphal.transport._transfer.TransferFrom] [source]
- class pycyphal.transport.can.CANOutputSession(transport: pycyphal.transport.can._can.CANTransport, send_handler: Callable[[pycyphal.transport.can.SendTransaction], Awaitable[bool]], specifier: pycyphal.transport._session.OutputSessionSpecifier, payload_metadata: pycyphal.transport._payload_metadata.PayloadMetadata, finalizer: Callable[[], None])[source]
Bases:
pycyphal.transport.can.CANSession, pycyphal.transport._session.OutputSession
This is actually an abstract class, but its concrete inheritors are hidden from the API. The implementation is chosen according to the type of the session requested: broadcast or unicast.
- __init__(transport: pycyphal.transport.can._can.CANTransport, send_handler: Callable[[pycyphal.transport.can.SendTransaction], Awaitable[bool]], specifier: pycyphal.transport._session.OutputSessionSpecifier, payload_metadata: pycyphal.transport._payload_metadata.PayloadMetadata, finalizer: Callable[[], None])[source]
Use the factory method.
- property specifier: pycyphal.transport._session.OutputSessionSpecifier[source]
- property payload_metadata: pycyphal.transport._payload_metadata.PayloadMetadata[source]
- enable_feedback(handler: Callable[[pycyphal.transport._session.Feedback], None]) None [source]
- sample_statistics() pycyphal.transport._session.SessionStatistics [source]
- class pycyphal.transport.can.CANTransportStatistics(in_frames: int = 0, in_frames_cyphal: int = 0, in_frames_cyphal_accepted: int = 0, in_frames_loopback: int = 0, in_frames_errored: int = 0, out_frames: int = 0, out_frames_timeout: int = 0, out_frames_loopback: int = 0)[source]
Bases:
pycyphal.transport._transport.TransportStatistics
The following invariants apply:
out_frames >= out_frames_loopback
in_frames >= in_frames_cyphal >= in_frames_cyphal_accepted
out_frames_loopback >= in_frames_loopback
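The invariants above can be checked mechanically against a sampled set of counters. The sketch below uses a hypothetical `StatsSnapshot` dataclass that mirrors the field names of the constructor signature so that it runs without pycyphal; `invariants_hold` is likewise an illustrative helper, not a library function.

```python
from dataclasses import dataclass


@dataclass
class StatsSnapshot:
    """Hypothetical mirror of the counters listed above; not the library class."""

    in_frames: int = 0
    in_frames_cyphal: int = 0
    in_frames_cyphal_accepted: int = 0
    in_frames_loopback: int = 0
    in_frames_errored: int = 0
    out_frames: int = 0
    out_frames_timeout: int = 0
    out_frames_loopback: int = 0


def invariants_hold(s: StatsSnapshot) -> bool:
    """Return True if all three documented invariants hold for this snapshot."""
    return (
        s.out_frames >= s.out_frames_loopback
        and s.in_frames >= s.in_frames_cyphal >= s.in_frames_cyphal_accepted
        and s.out_frames_loopback >= s.in_frames_loopback
    )
```

A check of this kind may be useful in tests of a custom media driver, where a violated invariant points at miscounted or spurious frames.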
- property media_acceptance_filtering_efficiency: float[source]
An efficiency metric for the acceptance filtering implemented in the media instance. The value of 1.0 (100%) indicates perfect filtering, where the media can sort out relevant frames from irrelevant ones completely autonomously. The value of 0 indicates that none of the frames passed over from the media instance are useful for the application (all ignored).
- property lost_loopback_frames: int[source]
The number of loopback frames that have been requested but never returned. Normally the value should be zero. The value may transiently increase to small values if the counters happen to be sampled while loopback frames are still waiting in the transmission queue of the CAN controller. If the value remains positive for long periods of time, the media driver is probably misbehaving. A negative value means that the media instance is sending more loopback frames than requested (bad).
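One plausible reading of the two derived metrics is sketched below as plain functions over the raw counters. The formulas are assumptions inferred from the descriptions (the ratio of accepted to received Cyphal frames, and the difference between requested and returned loopback frames); the library's actual computation may differ, and both function names are hypothetical.

```python
def acceptance_filtering_efficiency(in_frames_cyphal: int, in_frames_cyphal_accepted: int) -> float:
    """Fraction of Cyphal frames passed up by the media that were actually useful."""
    if in_frames_cyphal <= 0:
        return 1.0  # assumption: with no traffic, nothing was wasted
    return in_frames_cyphal_accepted / in_frames_cyphal


def lost_loopback_frames(out_frames_loopback: int, in_frames_loopback: int) -> int:
    """Loopback frames requested minus loopback frames returned; may be negative."""
    return out_frames_loopback - in_frames_loopback
```

Under this reading, an efficiency of 0.5 means that half of the Cyphal frames delivered by the media were discarded in software, and a persistently positive loopback difference suggests a misbehaving driver.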
- __hash__ = None
- class pycyphal.transport.can.CANInputSessionStatistics(transfers: 'int' = 0, frames: 'int' = 0, payload_bytes: 'int' = 0, errors: 'int' = 0, drops: 'int' = 0, reception_error_counters: 'typing.Dict[TransferReassemblyErrorID, int]' = <factory>)[source]
Bases:
pycyphal.transport._session.SessionStatistics
- reception_error_counters: Dict[pycyphal.transport.can._session._transfer_reassembler.TransferReassemblyErrorID, int]
- __hash__ = None
- class pycyphal.transport.can.TransferReassemblyErrorID(value)[source]
Bases:
enum.Enum
Transfer reassembly error codes. Used in the extended error statistics. See the Cyphal specification for background info. We have ID in the name to make clear that this is not an exception type.
- MISSED_START_OF_TRANSFER = 1
- UNEXPECTED_TOGGLE_BIT = 2
- UNEXPECTED_TRANSFER_ID = 3
- TRANSFER_CRC_MISMATCH = 4
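As an illustration of how the extended error statistics might be consumed, the sketch below tallies errors in a dictionary keyed by the enumeration, matching the shape of `reception_error_counters`. The enum is redeclared locally so that the example runs without pycyphal; the helpers `new_error_counters` and `summarize`, and the choice to pre-populate every key with zero, are assumptions made for this example.

```python
import enum
from typing import Dict


class TransferReassemblyErrorID(enum.Enum):
    # Local copy of the members listed above; the real type lives in pycyphal.transport.can.
    MISSED_START_OF_TRANSFER = 1
    UNEXPECTED_TOGGLE_BIT = 2
    UNEXPECTED_TRANSFER_ID = 3
    TRANSFER_CRC_MISMATCH = 4


def new_error_counters() -> Dict[TransferReassemblyErrorID, int]:
    """Create a counter dictionary with one zeroed entry per error code (assumed layout)."""
    return {e: 0 for e in TransferReassemblyErrorID}


def summarize(counters: Dict[TransferReassemblyErrorID, int]) -> str:
    """Render only the non-zero counters as a compact, human-readable string."""
    parts = [f"{e.name}={n}" for e, n in counters.items() if n > 0]
    return ", ".join(parts) or "no errors"
```

Because the keys are enumeration members rather than exception types, the counters can be logged or compared directly without any exception handling.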
- class pycyphal.transport.can.CANCapture(timestamp: pycyphal.transport._timestamp.Timestamp, frame: pycyphal.transport.can.media._frame.DataFrame, own: bool)[source]
Bases:
pycyphal.transport._tracer.Capture
See pycyphal.transport.can.CANTransport.begin_capture() for details.
- own: bool
True if the captured frame was sent by the local transport instance. False if it was received from the bus.
- parse() Optional[Tuple[pycyphal.transport._tracer.AlienSessionSpecifier, pycyphal.transport._transfer.Priority, pycyphal.transport.can.CyphalFrame]] [source]
- static get_transport_type() Type[pycyphal.transport.can._can.CANTransport] [source]
- __init__(timestamp: pycyphal.transport._timestamp.Timestamp, frame: pycyphal.transport.can.media._frame.DataFrame, own: bool) None [source]
- class pycyphal.transport.can.CANErrorTrace(timestamp: 'pycyphal.transport.Timestamp', error: 'TransferReassemblyErrorID')[source]
- class pycyphal.transport.can.CANTracer[source]
Bases:
pycyphal.transport._tracer.Tracer
The CAN tracer does not differentiate between RX and TX frames; they are treated uniformly. Return types from update():
- update(cap: pycyphal.transport._tracer.Capture) Optional[pycyphal.transport._tracer.Trace] [source]