pycyphal.transport.can.media package
Subpackages
Module contents
- class pycyphal.transport.can.media.Media[source]
Bases: ABC
CAN hardware abstraction interface.
It is recognized that the availability of some of the media implementations may be conditional on the type of platform (e.g., SocketCAN is Linux-only) and the availability of third-party software (e.g., PySerial may be needed for SLCAN). Python packages containing such media implementations shall always be importable.
- ReceivedFramesHandler
The frames handler is non-blocking and non-yielding; returns immediately. The timestamp is provided individually per frame.
alias of Callable[[Sequence[Tuple[Timestamp, Envelope]]], None]
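The handler contract can be illustrated with a minimal sketch. The types `StubTimestamp` and `StubEnvelope` are hypothetical stand-ins for `Timestamp` and `Envelope` (their fields are assumptions made for this example), and `on_frames` is an illustrative name. The handler only appends to a queue so that it returns immediately:

```python
from collections import deque
from dataclasses import dataclass
from typing import Deque, Sequence, Tuple


@dataclass(frozen=True)
class StubTimestamp:
    """Hypothetical stand-in for Timestamp; field names are assumptions."""
    system_ns: int
    monotonic_ns: int


@dataclass(frozen=True)
class StubEnvelope:
    """Hypothetical stand-in for Envelope; field names are assumptions."""
    identifier: int
    data: bytes
    loopback: bool


# Frames wait here until some other part of the application processes them.
pending: Deque[Tuple[StubTimestamp, StubEnvelope]] = deque()


def on_frames(frames: Sequence[Tuple[StubTimestamp, StubEnvelope]]) -> None:
    # Non-blocking and non-yielding: store the batch and return immediately.
    pending.extend(frames)
```

Each frame carries its own timestamp, so a batch can be stored as-is without losing per-frame timing information.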
- VALID_MTU_SET = {8, 12, 16, 20, 24, 32, 48, 64}
Valid MTU values for Classic CAN and CAN FD.
- abstract property interface_name: str[source]
The name of the interface on the local system. For example: can0 for SocketCAN; /dev/serial/by-id/usb-Zubax_Robotics_Zubax_Babel_28002E0001514D593833302000000000-if00 for SLCAN; COM9 for SLCAN.
- abstract property mtu: int[source]
The value belongs to VALID_MTU_SET. Observe that the media interface doesn’t care whether we’re using CAN FD or CAN 2.0 because the Cyphal CAN transport protocol itself doesn’t care. The transport simply does not distinguish them.
- abstract property number_of_acceptance_filters: int[source]
The number of hardware acceptance filters supported by the underlying CAN controller. Some media drivers, such as SocketCAN, may implement acceptance filtering in software instead of hardware. The returned value shall be a positive integer. If the hardware does not support filtering at all, the media driver shall emulate at least one filter in software.
- abstract start(handler: ReceivedFramesHandler, no_automatic_retransmission: bool) → None [source]
Every received frame shall be timestamped. Both monotonic and system timestamps are required. There are no timestamping accuracy requirements. An empty set of frames should never be reported.
The media implementation shall drop all non-data frames (RTR frames, error frames, etc.).
If the set contains more than one frame, all frames must be ordered by the time of their arrival, which also should be reflected in their timestamps; that is, the timestamp of a frame at index N generally should not be higher than the timestamp of a frame at index N+1. The timestamp ordering, however, is not a strict requirement because it is recognized that due to error variations in the timestamping algorithms timestamp values may not be monotonically increasing.
The implementation should strive to return as many frames per call as possible as long as that does not increase the worst case latency.
The handler shall be invoked on the event loop returned by loop.
The transport is guaranteed to invoke this method exactly once during (or shortly after) initialization; it can be used to perform a lazy start of the receive loop task/thread/whatever. It is undefined behavior to invoke this method more than once on the same instance.
- Parameters:
handler – The callback that the media implementation invokes with the received frames; see ReceivedFramesHandler.
no_automatic_retransmission – If True, the CAN controller should be configured to abort transmission of CAN frames after first error or arbitration loss (time-triggered transmission mode). This mode is used by Cyphal to facilitate the PnP node-ID allocation process on the client side. Its support is not mandatory but highly recommended to avoid excessive disturbance of the bus while PnP allocations are in progress.
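One possible shape of a lazily started receive loop is sketched below. It is not the pycyphal implementation: `SketchReceiver`, the `(monotonic_ns, payload)` frame tuple, and the canned batches standing in for hardware reads are all hypothetical. The sketch assumes that start() is called from the running event loop and shows a worker thread handing non-empty batches over to that loop:

```python
import asyncio
import threading
from typing import Callable, List, Optional, Sequence, Tuple

Frame = Tuple[int, bytes]  # hypothetical (monotonic_ns, payload) pair
Handler = Callable[[Sequence[Frame]], None]


class SketchReceiver:
    """Hypothetical receive side of a media driver; not the pycyphal API."""

    def __init__(self, batches: List[List[Frame]]) -> None:
        self._batches = batches  # canned data standing in for hardware reads
        self._thread: Optional[threading.Thread] = None

    def start(self, handler: Handler) -> None:
        if self._thread is not None:
            raise RuntimeError("start() shall be invoked only once")
        loop = asyncio.get_running_loop()  # assumption: called from the loop

        def worker() -> None:
            for batch in self._batches:
                if batch:  # an empty set of frames is never reported
                    # Hand the whole batch over to the event loop thread.
                    loop.call_soon_threadsafe(handler, batch)

        self._thread = threading.Thread(target=worker, daemon=True)
        self._thread.start()

    def join(self) -> None:
        assert self._thread is not None
        self._thread.join()


async def demo() -> List[Frame]:
    received: List[Frame] = []
    receiver = SketchReceiver([[(1, b"a"), (2, b"b")], [], [(3, b"c")]])
    receiver.start(received.extend)
    # Wait for the worker without blocking the loop; the callbacks scheduled
    # by the worker are queued ahead of the wake-up of this coroutine.
    await asyncio.get_running_loop().run_in_executor(None, receiver.join)
    return received
```

Running `asyncio.run(demo())` collects the frames in arrival order, with the empty batch skipped.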
- abstract configure_acceptance_filters(configuration: Sequence[FilterConfiguration]) → None [source]
This method is invoked whenever the subscription set is changed in order to communicate to the underlying CAN controller hardware which CAN frames should be accepted and which ones should be ignored.
An empty set of configurations means that the transport is not interested in any frames, i.e., all frames should be rejected by the controller. That is also the recommended default configuration (ignore all frames until explicitly requested otherwise).
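For media drivers that emulate filtering in software, the acceptance decision can be sketched as follows. The `(identifier, mask)` tuple and the function name `is_accepted` are assumptions made for this example, and the frame format check is omitted for brevity. A frame passes if it agrees with at least one filter on every bit selected by that filter's mask:

```python
from typing import Iterable, Tuple

Filter = Tuple[int, int]  # hypothetical (identifier, mask) pair


def is_accepted(can_id: int, filters: Iterable[Filter]) -> bool:
    # Only the bits set in the mask are compared; cleared bits are "don't care".
    # With no filters configured, any() is False, so every frame is rejected.
    return any((can_id & mask) == (ident & mask) for ident, mask in filters)
```

A filter with a zero mask accepts every identifier, which corresponds to a promiscuous configuration.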
- abstract async send(frames: Iterable[Envelope], monotonic_deadline: float) → int [source]
All passed frames are guaranteed to share the same CAN-ID. This guarantee may enable some optimizations. The frames shall be delivered to the bus in the same order. The iterable is guaranteed to be non-empty.
The method returns when the deadline is reached even if some of the frames could not be transmitted. The returned value is the number of frames that have been sent. If the returned number is lower than the number of supplied frames, the outer transport logic will register an error, which is then propagated upwards all the way to the application level.
The method should avoid yielding the execution flow; instead, it is recommended to unload the frames into an internal transmission queue and return ASAP, as that minimizes the likelihood of inner priority inversion. If that approach is used, implementations are advised to keep track of transmission deadline on a per-frame basis to meet the timing requirements imposed by the application.
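The queue-based approach recommended above could look roughly like the sketch below. `SketchTransmitter`, its bounded `capacity`, and `pop_ready` are hypothetical names, frames are modeled as plain `bytes`, and counting an enqueued frame as sent is an assumption of this sketch rather than a statement about any real driver:

```python
import asyncio
from collections import deque
from typing import Deque, Iterable, List, Tuple


class SketchTransmitter:
    """Hypothetical transmit side of a media driver; not the pycyphal API."""

    def __init__(self, capacity: int) -> None:
        self._capacity = capacity
        self._queue: Deque[Tuple[float, bytes]] = deque()  # (deadline, frame)

    async def send(self, frames: Iterable[bytes], monotonic_deadline: float) -> int:
        accepted = 0
        for frame in frames:
            if len(self._queue) >= self._capacity:
                break  # the caller learns about the shortfall from the count
            # The deadline is stored per frame so it can be checked later.
            self._queue.append((monotonic_deadline, frame))
            accepted += 1
        return accepted  # no await above: the call never yields

    def pop_ready(self, now: float) -> List[bytes]:
        """Drain the queue, keeping the order and dropping expired frames."""
        ready = [frame for deadline, frame in self._queue if deadline > now]
        self._queue.clear()
        return ready
```

In a real driver, a background task or thread would call something like `pop_ready` with the current monotonic time and write the surviving frames to the bus in order.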
- abstract close() → None [source]
After the media instance is closed, none of its methods can be used anymore. If a method is invoked after close, pycyphal.transport.ResourceClosedError should be raised. This method is an exception to that rule: if invoked on a closed instance, it shall do nothing.
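The closing rule can be sketched as follows. `SketchClosedError` is a hypothetical stand-in for pycyphal.transport.ResourceClosedError, and `SketchMedia` with its `configure` method is an illustrative class, not part of the library:

```python
class SketchClosedError(Exception):
    """Hypothetical stand-in for pycyphal.transport.ResourceClosedError."""


class SketchMedia:
    """Illustrative media object showing only the closing behavior."""

    def __init__(self) -> None:
        self._closed = False

    def _raise_if_closed(self) -> None:
        if self._closed:
            raise SketchClosedError("the media instance is closed")

    def configure(self) -> None:
        # Every ordinary method checks the closed flag first.
        self._raise_if_closed()

    def close(self) -> None:
        # Idempotent: closing an already closed instance does nothing.
        self._closed = True
```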
- static list_available_interface_names() → Iterable[str] [source]
Returns the list of interface names that can be used with the media class implementing it. For example, for the SocketCAN media class it would return the SocketCAN interface names such as “vcan0”; for SLCAN it would return the list of serial ports.
Implementations should strive to sort the output so that the interfaces that are most likely to be used are listed first – this helps GUI applications.
If the media implementation cannot be used on the local platform, the method shall return an empty set instead of raising an error. This guarantee supports an important use case where the caller would just iterate over all inheritors of this Media interface and ask each one to yield the list of available interfaces, and then just present that to the user.
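The use case described above can be sketched with stand-in classes. `SketchMediaBase`, `SketchSocketCAN`, `SketchUnavailable`, and `collect_interfaces` are hypothetical names and the returned interface names are made up for the example; a real caller would iterate over the inheritors of Media instead:

```python
from typing import Dict, Iterable, List


class SketchMediaBase:
    """Hypothetical stand-in for the Media interface."""

    @staticmethod
    def list_available_interface_names() -> Iterable[str]:
        return []


class SketchSocketCAN(SketchMediaBase):
    @staticmethod
    def list_available_interface_names() -> Iterable[str]:
        return ["vcan0", "can0"]  # made-up values for the example


class SketchUnavailable(SketchMediaBase):
    @staticmethod
    def list_available_interface_names() -> Iterable[str]:
        return []  # unusable on this platform: empty result, no exception


def collect_interfaces() -> Dict[str, List[str]]:
    # No try/except is needed because no implementation is allowed to raise.
    return {
        cls.__name__: list(cls.list_available_interface_names())
        for cls in SketchMediaBase.__subclasses__()
    }
```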
- class pycyphal.transport.can.media.FrameFormat(value)[source]
Bases: IntEnum
An enumeration.
- BASE = 11
- EXTENDED = 29
- class pycyphal.transport.can.media.DataFrame(format: 'FrameFormat', identifier: 'int', data: 'bytearray')[source]
Bases: object
- format: FrameFormat
- static get_required_padding(data_length: int) → int [source]
Computes padding to nearest valid CAN FD frame size.
>>> DataFrame.get_required_padding(6)
0
>>> DataFrame.get_required_padding(61)
3
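The computation can be reproduced with a short standalone sketch. `VALID_LENGTHS` and `required_padding` are names chosen for this example; the set of lengths assumes the standard CAN FD data length coding (0 to 8, then 12, 16, 20, 24, 32, 48, 64 bytes), and the library's own implementation may differ:

```python
# Data lengths representable in a CAN FD frame (assumed for this sketch).
VALID_LENGTHS = list(range(9)) + [12, 16, 20, 24, 32, 48, 64]


def required_padding(data_length: int) -> int:
    """Number of bytes to add to reach the nearest valid frame length."""
    if data_length < 0:
        raise ValueError("data length cannot be negative")
    for length in VALID_LENGTHS:  # sorted in ascending order
        if length >= data_length:
            return length - data_length
    raise ValueError("data does not fit into a single CAN FD frame")
```

The results for 6 and 61 bytes agree with the doctest above.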
- __match_args__ = ('format', 'identifier', 'data')
- class pycyphal.transport.can.media.Envelope(frame: DataFrame, loopback: bool)[source]
Bases: object
The envelope models a singular input/output frame transaction. It is a media layer frame extended with IO-related metadata.
- __match_args__ = ('frame', 'loopback')
- class pycyphal.transport.can.media.FilterConfiguration(identifier: 'int', mask: 'int', format: 'typing.Optional[FrameFormat]')[source]
Bases: object
- format: FrameFormat | None
None means no preference – both formats will be accepted.
- static new_promiscuous(frame_format: FrameFormat | None = None) → FilterConfiguration [source]
Returns a configuration that accepts all frames of the specified format. If the format is not specified, no distinction will be made. Note that some CAN controllers may have difficulty supporting both formats on a single filter.
- property rank: int[source]
This is the number of set bits in the mask. This is a part of the CAN acceptance filter configuration optimization algorithm; see optimize_filter_configurations().
We return negative rank for configurations which do not distinguish between extended and base frames in order to discourage merger of configurations of different frame types, since they are hard to support in certain CAN controllers. The effect of this is that we guarantee that an ambivalent filter configuration will never appear if the controller has at least two acceptance filters. Negative rank is computed by subtracting the number of bits in the CAN ID (or 29 if the filter accepts both base and extended identifiers) from the original rank.
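One reading of the rank rule is sketched below. The function `sketch_rank` and its `format_bits` parameter (11, 29, or None for an ambivalent filter) are assumptions made for this example, and the library's exact arithmetic may differ:

```python
from typing import Optional

EXTENDED_BITS = 29  # width of an extended CAN identifier


def sketch_rank(mask: int, format_bits: Optional[int]) -> int:
    """Count the significant set bits of the mask; penalize ambivalence."""
    width = format_bits if format_bits is not None else EXTENDED_BITS
    rank = bin(mask & ((1 << width) - 1)).count("1")
    if format_bits is None:
        # Ambivalent filters are pushed to zero or below so that any
        # format-specific alternative always ranks higher.
        rank -= EXTENDED_BITS
    return rank
```

Under this reading, an ambivalent filter never ranks above zero, whereas a format-specific filter never ranks below zero.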
- merge(other: FilterConfiguration) → FilterConfiguration [source]
This is a part of the CAN acceptance filter configuration optimization algorithm; see optimize_filter_configurations().
Given two filter configurations A and B, where A accepts CAN frames whose identifiers belong to Ca and likewise Cb for B, the merge product of A and B would be a new filter configuration that accepts CAN frames belonging to a new set which is a superset of the union of Ca and Cb.
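A common way to build such a merge product for mask-based filters is sketched below. It operates on hypothetical `(identifier, mask)` tuples, ignores the frame format, and is not claimed to match the library's implementation. The merged mask keeps only the bits that both inputs care about and on which their identifiers agree:

```python
from typing import Tuple

Filter = Tuple[int, int]  # hypothetical (identifier, mask) pair
ID_MASK = (1 << 29) - 1   # confine the result to 29 identifier bits


def accepts(flt: Filter, can_id: int) -> bool:
    ident, mask = flt
    return (can_id & mask) == (ident & mask)


def sketch_merge(a: Filter, b: Filter) -> Filter:
    # A bit stays significant only if both masks select it and the two
    # identifiers have the same value there; all other bits become wildcards.
    mask = a[1] & b[1] & ~(a[0] ^ b[0]) & ID_MASK
    return a[0] & mask, mask
```

For example, merging `(0x100, 0x7FF)` with `(0x101, 0x7FF)` yields `(0x100, 0x7FE)`, which accepts both original identifiers and nothing else.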
- __match_args__ = ('identifier', 'mask', 'format')
- pycyphal.transport.can.media.optimize_filter_configurations(configurations: Iterable[FilterConfiguration], target_number_of_configurations: int) → Sequence[FilterConfiguration] [source]
Implements the CAN acceptance filter configuration optimization algorithm described in the Specification. The algorithm was originally proposed by P. Kirienko and I. Sheremet.
Given a set of K filter configurations that accept CAN frames whose identifiers belong to the set C, and N acceptance filters implemented in hardware, where 1 <= N < K, find a new set of K' filter configurations that accept CAN frames whose identifiers belong to the set C', such that K' <= N, C' is a superset of C, and |C'| is minimized.
The algorithm is not defined for N >= K because this configuration is considered optimal. The function returns the input set unchanged in this case. If the target number of configurations is not positive, a ValueError is raised.
The time complexity of this implementation is O(K!); it should be optimized.
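A greedy approximation of the problem stated above is sketched below: it repeatedly replaces the pair of filters whose merge product keeps the most mask bits. This is an illustration only. It works on hypothetical `(identifier, mask)` tuples, ignores the frame format, does not guarantee that |C'| is minimal, and should not be read as the algorithm from the Specification or the library's implementation:

```python
import itertools
from typing import List, Sequence, Tuple

Filter = Tuple[int, int]  # hypothetical (identifier, mask) pair
ID_MASK = (1 << 29) - 1


def _merge(a: Filter, b: Filter) -> Filter:
    # Keep only the bits both masks select and both identifiers agree on.
    mask = a[1] & b[1] & ~(a[0] ^ b[0]) & ID_MASK
    return a[0] & mask, mask


def _rank(flt: Filter) -> int:
    return bin(flt[1]).count("1")  # more set bits means a more selective filter


def sketch_optimize(filters: Sequence[Filter], target: int) -> List[Filter]:
    if target < 1:
        raise ValueError("the target number of configurations must be positive")
    result = list(filters)
    while len(result) > target:
        # Pick the pair whose merge product loses the least selectivity.
        i, j = max(
            itertools.combinations(range(len(result)), 2),
            key=lambda pair: _rank(_merge(result[pair[0]], result[pair[1]])),
        )
        result[i] = _merge(result[i], result[j])
        del result[j]  # j > i, so index i is unaffected by the deletion
    return result
```

With the input `[(0x100, 0x7FF), (0x101, 0x7FF), (0x400, 0x7FF)]` and a target of 2, the first two filters are merged into `(0x100, 0x7FE)` and the third is left untouched.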