pycyphal.transport.serial package

Module contents

Cyphal/Serial transport overview

The Cyphal/Serial transport is experimental and is not yet part of the Cyphal specification. Future revisions may break wire compatibility until the transport is formally specified. Context: https://forum.opencyphal.org/t/alternative-transport-protocols/324, also see the discussion at https://forum.opencyphal.org/t/yukon-design-megathread/390/115?u=pavel.kirienko.

The Cyphal/Serial transport is designed for OSI L1 byte-level duplex serial links and tunnels:

  • UART, RS-422/485/232 (duplex); the recommended rates are: 115200 bps, 921600 bps, 3 Mbps, 10 Mbps, 100 Mbps.

  • USB CDC ACM.

  • TCP/IP encapsulation.

It may also be suited for raw transport log storage, because one-dimensional flat binary files are structurally similar to serial byte-level links.

This transport module contains no media sublayers because the media abstraction is handled directly by the PySerial library and the underlying operating system.

The serial transport supports all transfer categories:

Supported transfers

Unicast

Broadcast

Message

Yes, non-spec extension

Yes

Service

Yes

Banned by Specification

Protocol definition

The packet header is defined as follows (byte/bit ordering in this definition follow the DSDL specification: least significant first):

uint8   version              # Always zero. Discard the frame if not.
uint8   priority             # 0 = highest, 7 = lowest; the rest are unused.
uint16  source node ID       # 0xFFFF = anonymous.
uint16  destination node ID  # 0xFFFF = broadcast.
uint16  data specifier

void64                       # Reserved; later may be leveraged for runtime type identification.
uint64  transfer-ID

uint32  frame index EOT      # MSB set if last frame of the transfer; i.e., 0x8000_0000 if single-frame transfer.
uint32  header CRC           # CRC-32C (Castagnoli) of the header (all fields above).

For message frames, the data specifier field contains the subject-ID value, so that the most significant bit is always cleared. For service frames, the most significant bit (15th) is always set, and the second-to-most-significant bit (14th) is set for response transfers only; the remaining 14 least significant bits contain the service-ID value.

Total header size: 32 bytes (256 bits).

The header is prepended before the frame payload; the resulting structure is encoded into its serialized form using the following packet format:

Frame delimiter 0x00

Escaped header

Escaped payload

Escaped CRC-32C of the payload

Frame delimiter 0x00

1 byte

32 bytes

>=0 bytes

4 bytes

1 byte

Single-byte frame delimiter 0x00. Begins a new frame and possibly terminates the previous frame.

Four bytes long, little-endian byte order; The CRC is computed over the unescaped (i.e., original form) payload, not including the header (because the header has a dedicated CRC).

Same frame delimiter as at the start. Terminates the current frame and possibly begins the next frame.

This part is escaped using COBS alorithm by Chesire and Baker http://www.stuartcheshire.org/papers/COBSforToN.pdf. A frame delimiter (0) is guaranteed to never occur here.

The frame encoding overhead is 1 byte in every 254 bytes of the header+payload+CRC, which is about ~0.4%. There is a somewhat relevant discussion at https://forum.opencyphal.org/t/uavcan-serial-issues-with-dma-friendliness-and-bandwidth-overhead/846.

The last four bytes of a multi-frame transfer payload contain the CRC32C (Castagnoli) hash of the transfer payload in little-endian byte order. The multi-frame transfer logic (decomposition and reassembly) is implemented in a separate transport-agnostic module pycyphal.transport.commons.high_overhead_transport. Despite the fact that the support for multi-frame transfers is built into this transport, it should not be relied on and it may be removed later. The reason is that serial links do not have native support for framing, and as such, it is possible to configure the MTU to be arbitrarily high to avoid multi-frame transfers completely. The lack of multi-frame transfers simplifies implementations drastically, which is important for deeply-embedded systems. As such, all serial transfers should be single-frame transfers.

Note that we use CRC-32C (Castagnoli) as the header/frame CRC instead of CRC-32K2 (Koopman-2) which is superior at short data blocks offering the Hamming distance of 6 as opposed to 4. This is because Castagnoli is superior for transfer CRC which is often sufficiently long to flip the balance in favor of Castagnoli rather than Koopman. We could use Koopman for the header/frame CRC and keep Castagnoli for the transfer CRC, but such diversity is harmful because it would require implementers to keep two separate CRC tables which may be costly in embedded applications and may deteriorate the performance of CPU caches.

Usage

>>> import asyncio
>>> import pycyphal
>>> import pycyphal.transport.serial
>>> tr = pycyphal.transport.serial.SerialTransport('loop://', local_node_id=1234, baudrate=115200)
>>> tr.local_node_id
1234
>>> tr.serial_port.baudrate
115200
>>> pm = pycyphal.transport.PayloadMetadata(1024)
>>> ds = pycyphal.transport.MessageDataSpecifier(2345)
>>> pub = tr.get_output_session(pycyphal.transport.OutputSessionSpecifier(ds, None), pm)
>>> sub = tr.get_input_session(pycyphal.transport.InputSessionSpecifier(ds, None), pm)
>>> doctest_await(pub.send(pycyphal.transport.Transfer(pycyphal.transport.Timestamp.now(),
...                                                    pycyphal.transport.Priority.LOW,
...                                                    1111,
...                                                    fragmented_payload=[]),
...                        asyncio.get_event_loop().time() + 1.0))
True
>>> doctest_await(sub.receive(asyncio.get_event_loop().time() + 1.0))
TransferFrom(..., transfer_id=1111, ...)
>>> tr.close()

Tooling

Serial data logging

The underlying PySerial library provides a convenient method of logging exchange through a serial port into a file. To invoke this feature, embed the name of the serial port into the URI spy:///dev/ttyUSB0?file=dump.txt, where /dev/ttyUSB0 is the name of the serial port, dump.txt is the name of the log file.

TCP/IP tunneling

For testing or experimentation it is often convenient to use a virtual link instead of a real one. The underlying PySerial library supports tunneling of raw serial data over TCP connections, which can be leveraged for local testing without accessing any physical serial ports. This option can be accessed by specifying the URI of the form socket://<address>:<port> instead of a real serial port name when establishing the connection.

The location specified in the URL must point to the TCP server port that will forward the data to and from the other end of the link. While such a server can be trivially coded manually by the developer, it is possible to avoid the effort by relying on the TCP connection brokering mode available in Ncat (which is a part of the Nmap project, thanks Fyodor).

For example, one could set up the TCP broker as follows (add -v to see what’s happening; more info at https://nmap.org/ncat/guide/ncat-broker.html) (the port number is chosen at random here):

ncat --broker --listen -p 50905

And then use a serial transport with socket://127.0.0.1:50905 (N.B.: using localhost may significantly increase initialization latency on Windows due to slow DNS lookup). All nodes whose transports are configured like that will be able to communicate with each other, as if they were connected to the same bus. Essentially, this can be seen as a virtualized RS-485 bus, where same concerns regarding medium access coordination apply.

The location of the URI doesn’t have to be local, of course – one can use this approach to link Cyphal nodes via conventional IP networks.

The exchange over the virtual bus can be dumped trivially for analysis:

nc localhost 50905 > dump.bin

Inheritance diagram

Inheritance diagram of pycyphal.transport.serial._serial, pycyphal.transport.serial._frame, pycyphal.transport.serial._session._base, pycyphal.transport.serial._session._input, pycyphal.transport.serial._session._output, pycyphal.transport.serial._tracer

class pycyphal.transport.serial.SerialTransport(serial_port: Union[str, serial.serialutil.SerialBase], local_node_id: Optional[int], *, mtu: int = 1073741824, service_transfer_multiplier: int = 2, baudrate: Optional[int] = None, loop: Optional[asyncio.events.AbstractEventLoop] = None)[source]

Bases: pycyphal.transport._transport.Transport

The Cyphal/Serial transport is designed for OSI L1 byte-level serial links and tunnels, such as UART, RS-422/485/232 (duplex), USB CDC ACM, TCP/IP, etc. Please read the module documentation for details.

TRANSFER_ID_MODULO = 18446744073709551616
VALID_MTU_RANGE = (1024, 1073741824)

The maximum MTU is practically unlimited, and it is also the default MTU. This is by design to ensure that all frames are single-frame transfers. Compliant implementations of the serial transport do not have to support multi-frame transfers, which removes the greatest chunk of complexity from the protocol.

DEFAULT_SERVICE_TRANSFER_MULTIPLIER = 2
VALID_SERVICE_TRANSFER_MULTIPLIER_RANGE = (1, 5)
__init__(serial_port: Union[str, serial.serialutil.SerialBase], local_node_id: Optional[int], *, mtu: int = 1073741824, service_transfer_multiplier: int = 2, baudrate: Optional[int] = None, loop: Optional[asyncio.events.AbstractEventLoop] = None)[source]
Parameters
  • serial_port

    The serial port instance to communicate over, or its name. In the latter case, the port will be constructed via serial.serial_for_url() (refer to the PySerial docs for the background). The new instance takes ownership of the port; when the instance is closed, its port will also be closed. Examples:

    • /dev/ttyACM0 – a regular serial port on GNU/Linux (USB CDC ACM in this example).

    • COM9 – likewise, on Windows.

    • /dev/serial/by-id/usb-Black_Sphere_Technologies_Black_Magic_Probe_B5DCABF5-if02 – a regular USB CDC ACM port referenced by the device name and ID (GNU/Linux).

    • hwgrep:///dev/serial/by-id/*Black_Magic_Probe*-if02 – glob instead of exact name.

    • socket://127.0.0.1:50905 – a TCP/IP tunnel instead of a physical port.

    • spy://COM3?file=dump.txt – open a regular port and dump all data exchange into a text file.

    Read the PySerial docs for more info.

  • local_node_id – The node-ID to use. Can’t be changed after initialization. None means that the transport will operate in the anonymous mode.

  • mtu

    Use single-frame transfers for all outgoing transfers containing not more than than this many bytes of payload. Otherwise, use multi-frame transfers.

    By default, the MTU is virtually unlimited (to be precise, it is set to a very large number that is unattainable in practice), meaning that all transfers will be single-frame transfers. Such behavior is optimal for the serial transport because it does not have native framing and as such it supports frames of arbitrary sizes. Implementations may omit the support for multi-frame transfers completely, which removes the greatest chunk of complexity from the protocol.

    This setting does not affect transfer reception – the RX MTU is always set to the maximum valid MTU (i.e., practically unlimited).

  • service_transfer_multiplier – Deterministic data loss mitigation for service transfers. This parameter specifies the number of times each outgoing service transfer will be repeated. This setting does not affect message transfers.

  • baudrate – If not None, the specified baud rate will be configured on the serial port. Otherwise, the baudrate will be left unchanged.

  • loop – Deprecated.

property protocol_parameters: pycyphal.transport._transport.ProtocolParameters[source]
property local_node_id: Optional[int][source]
close() None[source]
get_input_session(specifier: pycyphal.transport._session.InputSessionSpecifier, payload_metadata: pycyphal.transport._payload_metadata.PayloadMetadata) pycyphal.transport.serial._session._input.SerialInputSession[source]
get_output_session(specifier: pycyphal.transport._session.OutputSessionSpecifier, payload_metadata: pycyphal.transport._payload_metadata.PayloadMetadata) pycyphal.transport.serial._session._output.SerialOutputSession[source]
property input_sessions: Sequence[pycyphal.transport.serial._session._input.SerialInputSession][source]
property output_sessions: Sequence[pycyphal.transport.serial._session._output.SerialOutputSession][source]
property serial_port: serial.serialutil.SerialBase[source]
sample_statistics() pycyphal.transport.serial._serial.SerialTransportStatistics[source]
begin_capture(handler: Callable[[pycyphal.transport._tracer.Capture], None]) None[source]

The reported events are of type SerialCapture, please read its documentation for details. The events may be reported from a different thread (use locks).

property capture_active: bool[source]
static make_tracer() pycyphal.transport.serial._tracer.SerialTracer[source]

See SerialTracer.

async spoof(transfer: pycyphal.transport._tracer.AlienTransfer, monotonic_deadline: float) bool[source]

Spoofing over the serial transport is trivial and it does not involve reconfiguration of the media layer. It can be invoked at no cost at any time (unlike, say, Cyphal/UDP). See the overridden method pycyphal.transport.Transport.spoof() for details.

Notice that if the transport operates over the virtual loopback port loop:// with capture enabled, every spoofed frame will be captured twice: one TX, one RX. Same goes for regular transfers.

class pycyphal.transport.serial.SerialTransportStatistics(in_bytes: int = 0, in_frames: int = 0, in_out_of_band_bytes: int = 0, out_bytes: int = 0, out_frames: int = 0, out_transfers: int = 0, out_incomplete: int = 0)[source]

Bases: pycyphal.transport._transport.TransportStatistics

in_bytes: int = 0
in_frames: int = 0
in_out_of_band_bytes: int = 0
out_bytes: int = 0
out_frames: int = 0
out_transfers: int = 0
out_incomplete: int = 0
__eq__(other)[source]
__hash__ = None
__init__(in_bytes: int = 0, in_frames: int = 0, in_out_of_band_bytes: int = 0, out_bytes: int = 0, out_frames: int = 0, out_transfers: int = 0, out_incomplete: int = 0) None[source]
__repr__()[source]
class pycyphal.transport.serial.SerialSession(finalizer: Callable[[], None])[source]

Bases: object

__init__(finalizer: Callable[[], None])[source]
close() None[source]
class pycyphal.transport.serial.SerialInputSession(specifier: pycyphal.transport._session.InputSessionSpecifier, payload_metadata: pycyphal.transport._payload_metadata.PayloadMetadata, finalizer: Callable[[], None])[source]

Bases: pycyphal.transport.serial._session._base.SerialSession, pycyphal.transport._session.InputSession

DEFAULT_TRANSFER_ID_TIMEOUT = 2.0

Units are seconds. Can be overridden after instantiation if needed.

__init__(specifier: pycyphal.transport._session.InputSessionSpecifier, payload_metadata: pycyphal.transport._payload_metadata.PayloadMetadata, finalizer: Callable[[], None])[source]

Do not call this directly. Instead, use the factory method pycyphal.transport.serial.SerialTransport.get_input_session().

async receive(monotonic_deadline: float) Optional[pycyphal.transport._transfer.TransferFrom][source]
property transfer_id_timeout: float[source]
property specifier: pycyphal.transport._session.InputSessionSpecifier[source]
property payload_metadata: pycyphal.transport._payload_metadata.PayloadMetadata[source]
sample_statistics() pycyphal.transport.serial._session._input.SerialInputSessionStatistics[source]
class pycyphal.transport.serial.SerialOutputSession(specifier: pycyphal.transport._session.OutputSessionSpecifier, payload_metadata: pycyphal.transport._payload_metadata.PayloadMetadata, mtu: int, local_node_id: Optional[int], send_handler: Callable[[List[pycyphal.transport.serial._frame.SerialFrame], float], Awaitable[Optional[pycyphal.transport._timestamp.Timestamp]]], finalizer: Callable[[], None])[source]

Bases: pycyphal.transport.serial._session._base.SerialSession, pycyphal.transport._session.OutputSession

__init__(specifier: pycyphal.transport._session.OutputSessionSpecifier, payload_metadata: pycyphal.transport._payload_metadata.PayloadMetadata, mtu: int, local_node_id: Optional[int], send_handler: Callable[[List[pycyphal.transport.serial._frame.SerialFrame], float], Awaitable[Optional[pycyphal.transport._timestamp.Timestamp]]], finalizer: Callable[[], None])[source]

Do not call this directly. Instead, use the factory method pycyphal.transport.serial.SerialTransport.get_output_session().

async send(transfer: pycyphal.transport._transfer.Transfer, monotonic_deadline: float) bool[source]
enable_feedback(handler: Callable[[pycyphal.transport._session.Feedback], None]) None[source]
disable_feedback() None[source]
property specifier: pycyphal.transport._session.OutputSessionSpecifier[source]
property payload_metadata: pycyphal.transport._payload_metadata.PayloadMetadata[source]
sample_statistics() pycyphal.transport._session.SessionStatistics[source]
close() None[source]
class pycyphal.transport.serial.SerialFeedback(original_transfer_timestamp: pycyphal.transport._timestamp.Timestamp, first_frame_transmission_timestamp: pycyphal.transport._timestamp.Timestamp)[source]

Bases: pycyphal.transport._session.Feedback

__init__(original_transfer_timestamp: pycyphal.transport._timestamp.Timestamp, first_frame_transmission_timestamp: pycyphal.transport._timestamp.Timestamp)[source]
property original_transfer_timestamp: pycyphal.transport._timestamp.Timestamp[source]
property first_frame_transmission_timestamp: pycyphal.transport._timestamp.Timestamp[source]
class pycyphal.transport.serial.SerialInputSessionStatistics(transfers: 'int' = 0, frames: 'int' = 0, payload_bytes: 'int' = 0, errors: 'int' = 0, drops: 'int' = 0, reassembly_errors_per_source_node_id: 'typing.Dict[int, typing.Dict[TransferReassembler.Error, int]]' = <factory>)[source]

Bases: pycyphal.transport._session.SessionStatistics

reassembly_errors_per_source_node_id: Dict[int, Dict[pycyphal.transport.commons.high_overhead_transport._transfer_reassembler.TransferReassembler.Error, int]]

Keys are source node-IDs; values are dicts where keys are error enum members and values are counts.

__eq__(other)[source]
__hash__ = None
__init__(transfers: int = 0, frames: int = 0, payload_bytes: int = 0, errors: int = 0, drops: int = 0, reassembly_errors_per_source_node_id: Dict[int, Dict[pycyphal.transport.commons.high_overhead_transport._transfer_reassembler.TransferReassembler.Error, int]] = <factory>) None[source]
__repr__()[source]
class pycyphal.transport.serial.SerialFrame(priority: 'pycyphal.transport.Priority', transfer_id: 'int', index: 'int', end_of_transfer: 'bool', payload: 'memoryview', source_node_id: 'typing.Optional[int]', destination_node_id: 'typing.Optional[int]', data_specifier: 'pycyphal.transport.DataSpecifier')[source]

Bases: pycyphal.transport.commons.high_overhead_transport._frame.Frame

NODE_ID_MASK = 4095
TRANSFER_ID_MASK = 18446744073709551615
INDEX_MASK = 2147483647
NODE_ID_RANGE = range(0, 4096)
FRAME_DELIMITER_BYTE = 0
NUM_OVERHEAD_BYTES_EXCEPT_DELIMITERS_AND_ESCAPING = 36
source_node_id: Optional[int]
destination_node_id: Optional[int]
data_specifier: pycyphal.transport._data_specifier.DataSpecifier
compile_into(out_buffer: bytearray) memoryview[source]

Compiles the frame into the specified output buffer, escaping the data as necessary. The buffer must be large enough to accommodate the frame header with the payload and CRC, including escape sequences. :returns: View of the memory from the beginning of the buffer until the end of the compiled frame.

static calc_cobs_size(payload_size_bytes: int) int[source]
Returns

worst case COBS-encoded message size for a given payload size.

static parse_from_cobs_image(image: memoryview) Optional[pycyphal.transport.serial._frame.SerialFrame][source]

Delimiters will be stripped if present but they are not required. :returns: Frame or None if the image is invalid.

static parse_from_unescaped_image(header_payload_crc_image: memoryview) Optional[pycyphal.transport.serial._frame.SerialFrame][source]
Returns

Frame or None if the image is invalid.

__delattr__(name)[source]
__eq__(other)[source]
__hash__()[source]
__init__(priority: pycyphal.transport._transfer.Priority, transfer_id: int, index: int, end_of_transfer: bool, payload: memoryview, source_node_id: Optional[int], destination_node_id: Optional[int], data_specifier: pycyphal.transport._data_specifier.DataSpecifier) None[source]
__setattr__(name, value)[source]
class pycyphal.transport.serial.SerialCapture(timestamp: pycyphal.transport._timestamp.Timestamp, fragment: memoryview, own: bool)[source]

Bases: pycyphal.transport._tracer.Capture

Since Cyphal/serial operates on top of unstructured L1 data links, there is no native concept of framing. Therefore, the capture type defines only the timestamp, a raw chunk of bytes, and the direction (RX/TX).

When capturing data from a live interface, it is guaranteed by this library that each capture will contain AT MOST one frame along with the delimiter bytes (at least the last byte of the fragment is zero). When reading data from a file, it is trivial to split the data into frames by looking for the frame separators, which are simply zero bytes.

fragment: memoryview
own: bool

True if the captured fragment was sent by the local transport instance. False if it was received from the port.

__repr__() str[source]

Captures that contain large fragments are truncated and appended with an ellipsis.

static get_transport_type() Type[pycyphal.transport.serial._serial.SerialTransport][source]
__delattr__(name)[source]
__eq__(other)[source]
__hash__()[source]
__init__(timestamp: pycyphal.transport._timestamp.Timestamp, fragment: memoryview, own: bool) None[source]
__setattr__(name, value)[source]
class pycyphal.transport.serial.SerialTracer[source]

Bases: pycyphal.transport._tracer.Tracer

This tracer does not differentiate between input and output traces, but it keeps separate parsers for input and output captures such that there is no RX/TX state conflict. If necessary, the user can distinguish RX/TX traces by checking SerialCapture.direction before invoking update().

Return types from update():

__init__() None[source]
update(cap: pycyphal.transport._tracer.Capture) Optional[pycyphal.transport._tracer.Trace][source]

If the capture encapsulates more than one serialized frame, a ValueError will be raised. To avoid this, always ensure that the captured fragments are split on the frame delimiters (which are simply zero bytes). Captures provided by PyCyphal are always fragmented correctly, but you may need to implement fragmentation manually when reading data from an external file.

class pycyphal.transport.serial.SerialErrorTrace(timestamp: 'pycyphal.transport.Timestamp', error: 'TransferReassembler.Error')[source]

Bases: pycyphal.transport._tracer.ErrorTrace

error: pycyphal.transport.commons.high_overhead_transport._transfer_reassembler.TransferReassembler.Error
__delattr__(name)[source]
__eq__(other)[source]
__hash__()[source]
__init__(timestamp: pycyphal.transport._timestamp.Timestamp, error: pycyphal.transport.commons.high_overhead_transport._transfer_reassembler.TransferReassembler.Error) None[source]
__repr__()[source]
__setattr__(name, value)[source]
class pycyphal.transport.serial.SerialOutOfBandTrace(timestamp: pycyphal.transport._timestamp.Timestamp, data: memoryview)[source]

Bases: pycyphal.transport._tracer.ErrorTrace

Out-of-band data or a malformed frame received. See pycyphal.serial.StreamParser.

data: memoryview
__delattr__(name)[source]
__eq__(other)[source]
__hash__()[source]
__init__(timestamp: pycyphal.transport._timestamp.Timestamp, data: memoryview) None[source]
__repr__()[source]
__setattr__(name, value)[source]