pycyphal.transport.serial package

Module contents

Cyphal/serial transport overview

The Cyphal/serial transport is designed for byte-level communication channels, such as:

  • TCP/IP

  • UART, RS-422/232

  • USB CDC ACM

It may also be suited for raw transport log storage.

This transport module contains no media sublayers because the media abstraction is handled directly by the PySerial library and the underlying operating system.

For the full protocol definition, please refer to the Cyphal Specification.

Forward error correction (FEC)

This transport supports optional FEC through full duplication of transfers. This feature is discussed in detail in the documentation for the UDP transport pycyphal.transport.udp.
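The idea behind duplication-based FEC can be illustrated with a toy model. The sketch below is not the library's implementation: frames are reduced to hypothetical (transfer_id, payload) tuples, and the helper names duplicate and deduplicate are invented for this example. It only shows why a transfer survives as long as at least one of its copies gets through.

```python
from typing import Iterable, List, Set, Tuple

Frame = Tuple[int, bytes]  # (transfer_id, payload); a stand-in, not a real serial frame


def duplicate(transfers: Iterable[Frame], multiplier: int) -> List[Frame]:
    """Sender side: emit every transfer `multiplier` times in a row."""
    return [t for t in transfers for _ in range(multiplier)]


def deduplicate(frames: Iterable[Frame]) -> List[Frame]:
    """Receiver side: accept the first copy of each transfer-ID, drop the rest."""
    seen: Set[int] = set()
    accepted: List[Frame] = []
    for transfer_id, payload in frames:
        if transfer_id not in seen:
            seen.add(transfer_id)
            accepted.append((transfer_id, payload))
    return accepted


transfers = [(0, b"a"), (1, b"b"), (2, b"c")]
on_wire = duplicate(transfers, multiplier=2)  # six frames in total
# Simulate the link losing one copy of transfer 1 and one copy of transfer 2.
damaged = [f for i, f in enumerate(on_wire) if i not in (2, 5)]
recovered = deduplicate(damaged)
```

In this model every transfer is still delivered exactly once, because each one has at least one surviving copy.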

Usage

>>> import asyncio
>>> import pycyphal
>>> import pycyphal.transport.serial
>>> tr = pycyphal.transport.serial.SerialTransport('loop://', local_node_id=1234, baudrate=115200)
>>> tr.local_node_id
1234
>>> tr.serial_port.baudrate
115200
>>> pm = pycyphal.transport.PayloadMetadata(1024)
>>> ds = pycyphal.transport.MessageDataSpecifier(2345)
>>> pub = tr.get_output_session(pycyphal.transport.OutputSessionSpecifier(ds, None), pm)
>>> sub = tr.get_input_session(pycyphal.transport.InputSessionSpecifier(ds, None), pm)
>>> doctest_await(pub.send(pycyphal.transport.Transfer(pycyphal.transport.Timestamp.now(),
...                                                    pycyphal.transport.Priority.LOW,
...                                                    1111,
...                                                    fragmented_payload=[]),
...                        asyncio.get_event_loop().time() + 1.0))
True
>>> doctest_await(sub.receive(asyncio.get_event_loop().time() + 1.0))
TransferFrom(..., transfer_id=1111, ...)
>>> tr.close()

Tooling

Serial data logging

The underlying PySerial library can log the data exchanged through a serial port into a file. To enable this feature, wrap the name of the serial port in a spy:// URI, as in spy:///dev/ttyUSB0?file=dump.txt, where /dev/ttyUSB0 is the name of the serial port and dump.txt is the name of the log file.
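A small helper can assemble such a URI from its two parts. This is only a sketch: the function name make_spy_uri is hypothetical, and percent-encoding the file name with urlencode assumes that PySerial decodes the query string in the usual way.

```python
from urllib.parse import urlencode


def make_spy_uri(port: str, log_file: str) -> str:
    """Build a spy:// URI that logs the exchange on `port` into `log_file`."""
    return f"spy://{port}?{urlencode({'file': log_file})}"


uri_linux = make_spy_uri("/dev/ttyUSB0", "dump.txt")
uri_windows = make_spy_uri("COM3", "dump.txt")
print(uri_linux)    # spy:///dev/ttyUSB0?file=dump.txt
print(uri_windows)  # spy://COM3?file=dump.txt
```

The resulting string would then be passed as the serial_port argument in place of the plain port name.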

TCP/IP tunneling

For testing or experimentation, it is often convenient to use a virtual link instead of a real one. The underlying PySerial library supports tunneling of raw serial data over TCP connections, which can be leveraged for local testing without accessing any physical serial ports. This option can be accessed by specifying a URI of the form socket://<address>:<port> instead of a real serial port name when establishing the connection.

The location specified in the URI must point to the TCP server port that forwards the data to and from the other end of the link. Such a server can be trivially coded manually, but the effort can be avoided by relying on the TCP connection brokering mode available in Ncat (which is a part of the Nmap project, thanks Fyodor).

For example, one could set up the TCP broker as follows (the port number is chosen at random here; add -v to see what’s happening; more info at https://nmap.org/ncat/guide/ncat-broker.html):

ncat --broker --listen -p 50905

And then use a serial transport with socket://127.0.0.1:50905 (N.B.: using localhost may significantly increase initialization latency on Windows due to slow DNS lookup). All nodes whose transports are configured like that will be able to communicate with each other, as if they were connected to the same bus. Essentially, this can be seen as a virtualized RS-485 bus, where the same concerns regarding medium access coordination apply.

The location of the URI doesn’t have to be local, of course – one can use this approach to link Cyphal nodes via conventional IP networks.

The exchange over the virtual bus can be dumped trivially for analysis:

nc localhost 50905 > dump.bin
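Where Ncat is not available, the broker role described above can be approximated with a few lines of asyncio. The following is a minimal sketch under stated assumptions, not a replacement for Ncat: the names Broker, serve, and demo are invented for this example, there is no access control, and a slow client is allowed to stall the others.

```python
import asyncio
from typing import Set


class Broker:
    """Forward every chunk received from one TCP client to all other clients."""

    def __init__(self) -> None:
        self._writers: Set[asyncio.StreamWriter] = set()

    @property
    def client_count(self) -> int:
        return len(self._writers)

    async def handle(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
        self._writers.add(writer)
        try:
            while True:
                chunk = await reader.read(4096)
                if not chunk:  # EOF: this client has disconnected
                    break
                for other in list(self._writers):
                    if other is not writer:
                        other.write(chunk)
                        await other.drain()
        except ConnectionError:
            pass  # A connection broke; stop serving this client.
        finally:
            self._writers.discard(writer)
            writer.close()


async def serve(host: str = "127.0.0.1", port: int = 50905) -> None:
    """Run the broker until cancelled, e.g. via asyncio.run(serve())."""
    server = await asyncio.start_server(Broker().handle, host, port)
    async with server:
        await server.serve_forever()


async def demo() -> bytes:
    """Start a broker on an OS-chosen port and pass one chunk between two clients."""
    broker = Broker()
    server = await asyncio.start_server(broker.handle, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    _, tx = await asyncio.open_connection("127.0.0.1", port)
    rx, rx_writer = await asyncio.open_connection("127.0.0.1", port)
    while broker.client_count < 2:  # wait until the broker has registered both clients
        await asyncio.sleep(0.01)
    tx.write(b"\x00frame\x00")
    await tx.drain()
    received = await asyncio.wait_for(rx.readexactly(7), timeout=2.0)
    tx.close()
    rx_writer.close()
    server.close()
    return received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Calling asyncio.run(serve()) keeps the broker listening on port 50905, after which the nodes would connect with socket://127.0.0.1:50905 exactly as with Ncat.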

Inheritance diagram

Inheritance diagram of pycyphal.transport.serial._serial, pycyphal.transport.serial._frame, pycyphal.transport.serial._session._base, pycyphal.transport.serial._session._input, pycyphal.transport.serial._session._output, pycyphal.transport.serial._tracer

class pycyphal.transport.serial.SerialTransport(serial_port: str | SerialBase, local_node_id: int | None, *, mtu: int = 1073741824, service_transfer_multiplier: int = 2, baudrate: int | None = None, loop: AbstractEventLoop | None = None)

Bases: Transport

The Cyphal/Serial transport is designed for OSI L1 byte-level serial links and tunnels, such as UART, RS-422/485/232 (duplex), USB CDC ACM, TCP/IP, etc. Please read the module documentation for details.

TRANSFER_ID_MODULO = 18446744073709551616
VALID_MTU_RANGE = (1024, 1073741824)

The maximum MTU is practically unlimited, and it is also the default MTU. This is by design to ensure that all transfers are single-frame transfers. Compliant implementations of the serial transport do not have to support multi-frame transfers, which removes the greatest chunk of complexity from the protocol.
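The magnitude of these constants is easier to see as powers of two. The sketch below copies the values from the attributes above; the helper check_mtu is hypothetical, and treating both bounds as inclusive is an assumption (the upper bound must be inclusive, since it is also the default).

```python
TRANSFER_ID_MODULO = 18446744073709551616  # copied from the class attributes
VALID_MTU_RANGE = (1024, 1073741824)       # copied from the class attributes

assert TRANSFER_ID_MODULO == 2**64   # transfer-IDs are 64 bits wide
assert VALID_MTU_RANGE[1] == 2**30   # 1 GiB, which is also the default MTU


def check_mtu(mtu: int) -> int:
    """Return `mtu` unchanged if it lies within VALID_MTU_RANGE, else raise ValueError."""
    low, high = VALID_MTU_RANGE
    if not low <= mtu <= high:
        raise ValueError(f"MTU {mtu} is outside of [{low}, {high}]")
    return mtu
```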

DEFAULT_SERVICE_TRANSFER_MULTIPLIER = 2
VALID_SERVICE_TRANSFER_MULTIPLIER_RANGE = (1, 5)
__init__(serial_port: str | SerialBase, local_node_id: int | None, *, mtu: int = 1073741824, service_transfer_multiplier: int = 2, baudrate: int | None = None, loop: AbstractEventLoop | None = None)
Parameters:
  • serial_port

    The serial port instance to communicate over, or its name. In the latter case, the port will be constructed via serial.serial_for_url() (refer to the PySerial docs for the background). The new instance takes ownership of the port; when the instance is closed, its port will also be closed. Examples:

    • /dev/ttyACM0 – a regular serial port on GNU/Linux (USB CDC ACM in this example).

    • COM9 – likewise, on Windows.

    • /dev/serial/by-id/usb-Black_Sphere_Technologies_Black_Magic_Probe_B5DCABF5-if02 – a regular USB CDC ACM port referenced by the device name and ID (GNU/Linux).

    • hwgrep:///dev/serial/by-id/*Black_Magic_Probe*-if02 – glob instead of exact name.

    • socket://127.0.0.1:50905 – a TCP/IP tunnel instead of a physical port.

    • spy://COM3?file=dump.txt – open a regular port and dump all data exchange into a text file.

    Read the PySerial docs for more info.

  • local_node_id – The node-ID to use. Can’t be changed after initialization. None means that the transport will operate in the anonymous mode.

  • mtu

    Use single-frame transfers for all outgoing transfers containing not more than this many bytes of payload. Otherwise, use multi-frame transfers.

    By default, the MTU is virtually unlimited (to be precise, it is set to a very large number that is unattainable in practice), meaning that all transfers will be single-frame transfers. Such behavior is optimal for the serial transport because it does not have native framing and as such it supports frames of arbitrary sizes. Implementations may omit the support for multi-frame transfers completely, which removes the greatest chunk of complexity from the protocol.

    This setting does not affect transfer reception – the RX MTU is always set to the maximum valid MTU (i.e., practically unlimited).

  • service_transfer_multiplier – Forward error correction for service transfers. This parameter specifies the number of times each outgoing service transfer will be repeated. This setting does not affect message transfers.

  • baudrate – If not None, the specified baud rate will be configured on the serial port. Otherwise, the baud rate will be left unchanged.

  • loop – Deprecated.

property protocol_parameters: ProtocolParameters
property local_node_id: int | None
close() → None
get_input_session(specifier: InputSessionSpecifier, payload_metadata: PayloadMetadata) → SerialInputSession
get_output_session(specifier: OutputSessionSpecifier, payload_metadata: PayloadMetadata) → SerialOutputSession
property input_sessions: Sequence[SerialInputSession]
property output_sessions: Sequence[SerialOutputSession]
property serial_port: SerialBase
sample_statistics() → SerialTransportStatistics
begin_capture(handler: Callable[[Capture], None]) → None

The reported events are of type SerialCapture; please read its documentation for details. The events may be reported from a different thread, so the handler must synchronize access to any shared state (use locks).
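Because the handler may run in another thread, any shared state it touches needs a lock. The sketch below shows one way to do that; the class name CaptureLog and its drain method are invented for this example, and the capture objects are treated as opaque.

```python
import threading
from typing import Any, List


class CaptureLog:
    """Collect capture events reported from any thread."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._events: List[Any] = []

    def __call__(self, capture: Any) -> None:
        # May be invoked from a worker thread, so guard the shared list.
        with self._lock:
            self._events.append(capture)

    def drain(self) -> List[Any]:
        """Return everything collected so far and start a fresh list."""
        with self._lock:
            events, self._events = self._events, []
        return events
```

An instance is callable with a single argument, so it could be passed to begin_capture() directly, with drain() polled from the application thread.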

property capture_active: bool
static make_tracer() → SerialTracer

See SerialTracer.

async spoof(transfer: AlienTransfer, monotonic_deadline: float) → bool

Spoofing over the serial transport is trivial and it does not involve reconfiguration of the media layer. It can be invoked at no cost at any time (unlike, say, Cyphal/UDP). See the overridden method pycyphal.transport.Transport.spoof() for details.

Notice that if the transport operates over the virtual loopback port loop:// with capture enabled, every spoofed frame will be captured twice: once as TX and once as RX. The same goes for regular transfers.

class pycyphal.transport.serial.SerialTransportStatistics(in_bytes: int = 0, in_frames: int = 0, in_out_of_band_bytes: int = 0, out_bytes: int = 0, out_frames: int = 0, out_transfers: int = 0, out_incomplete: int = 0)

Bases: TransportStatistics

in_bytes: int = 0
in_frames: int = 0
in_out_of_band_bytes: int = 0
out_bytes: int = 0
out_frames: int = 0
out_transfers: int = 0
out_incomplete: int = 0
__eq__(other)
__hash__ = None
__init__(in_bytes: int = 0, in_frames: int = 0, in_out_of_band_bytes: int = 0, out_bytes: int = 0, out_frames: int = 0, out_transfers: int = 0, out_incomplete: int = 0) → None
__match_args__ = ('in_bytes', 'in_frames', 'in_out_of_band_bytes', 'out_bytes', 'out_frames', 'out_transfers', 'out_incomplete')
__repr__()
class pycyphal.transport.serial.SerialSession(finalizer: Callable[[], None])

Bases: object

__init__(finalizer: Callable[[], None])
close() → None
class pycyphal.transport.serial.SerialInputSession(specifier: InputSessionSpecifier, payload_metadata: PayloadMetadata, finalizer: Callable[[], None])

Bases: SerialSession, InputSession

DEFAULT_TRANSFER_ID_TIMEOUT = 2.0

Units are seconds. Can be overridden after instantiation if needed.

__init__(specifier: InputSessionSpecifier, payload_metadata: PayloadMetadata, finalizer: Callable[[], None])

Do not call this directly. Instead, use the factory method pycyphal.transport.serial.SerialTransport.get_input_session().

async receive(monotonic_deadline: float) → TransferFrom | None
property transfer_id_timeout: float
property specifier: InputSessionSpecifier
property payload_metadata: PayloadMetadata
sample_statistics() → SerialInputSessionStatistics
class pycyphal.transport.serial.SerialOutputSession(specifier: OutputSessionSpecifier, payload_metadata: PayloadMetadata, mtu: int, local_node_id: int | None, send_handler: Callable[[List[SerialFrame], float], Awaitable[Timestamp | None]], finalizer: Callable[[], None])

Bases: SerialSession, OutputSession

__init__(specifier: OutputSessionSpecifier, payload_metadata: PayloadMetadata, mtu: int, local_node_id: int | None, send_handler: Callable[[List[SerialFrame], float], Awaitable[Timestamp | None]], finalizer: Callable[[], None])

Do not call this directly. Instead, use the factory method pycyphal.transport.serial.SerialTransport.get_output_session().

async send(transfer: Transfer, monotonic_deadline: float) → bool
enable_feedback(handler: Callable[[Feedback], None]) → None
disable_feedback() → None
property specifier: OutputSessionSpecifier
property payload_metadata: PayloadMetadata
sample_statistics() → SessionStatistics
close() → None
class pycyphal.transport.serial.SerialFeedback(original_transfer_timestamp: Timestamp, first_frame_transmission_timestamp: Timestamp)

Bases: Feedback

__init__(original_transfer_timestamp: Timestamp, first_frame_transmission_timestamp: Timestamp)
property original_transfer_timestamp: Timestamp
property first_frame_transmission_timestamp: Timestamp
class pycyphal.transport.serial.SerialInputSessionStatistics(transfers: int = 0, frames: int = 0, payload_bytes: int = 0, errors: int = 0, drops: int = 0, reassembly_errors_per_source_node_id: Dict[int, Dict[TransferReassembler.Error, int]] = <factory>)

Bases: SessionStatistics

reassembly_errors_per_source_node_id: Dict[int, Dict[Error, int]]

Keys are source node-IDs; values are dicts where keys are error enum members and values are counts.
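The nested mapping can be flattened into per-node totals with a single comprehension. In the sketch below, the Error enum is only a stand-in for TransferReassembler.Error with made-up member names, and total_errors_per_node is a hypothetical helper.

```python
import enum
from typing import Dict


class Error(enum.Enum):
    """Stand-in for TransferReassembler.Error; the member names are invented."""

    EXAMPLE_A = enum.auto()
    EXAMPLE_B = enum.auto()


def total_errors_per_node(stats: Dict[int, Dict[Error, int]]) -> Dict[int, int]:
    """Sum the error counts of every source node-ID."""
    return {node_id: sum(counts.values()) for node_id, counts in stats.items()}


sample = {
    1234: {Error.EXAMPLE_A: 2, Error.EXAMPLE_B: 1},
    42: {Error.EXAMPLE_A: 5},
}
print(total_errors_per_node(sample))  # {1234: 3, 42: 5}
```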

__eq__(other)
__hash__ = None
__init__(transfers: int = 0, frames: int = 0, payload_bytes: int = 0, errors: int = 0, drops: int = 0, reassembly_errors_per_source_node_id: Dict[int, Dict[TransferReassembler.Error, int]] = <factory>) → None
__match_args__ = ('transfers', 'frames', 'payload_bytes', 'errors', 'drops', 'reassembly_errors_per_source_node_id')
__repr__()
class pycyphal.transport.serial.SerialFrame(priority: Priority, transfer_id: int, index: int, end_of_transfer: bool, payload: memoryview, source_node_id: int | None, destination_node_id: int | None, data_specifier: DataSpecifier, user_data: int)

Bases: Frame

VERSION = 1
NODE_ID_MASK = 65535
TRANSFER_ID_MASK = 18446744073709551615
INDEX_MASK = 2147483647
NUM_OVERHEAD_BYTES_EXCEPT_DELIMITERS_AND_ESCAPING = 24
NODE_ID_RANGE = range(0, 65535)
FRAME_DELIMITER_BYTE = 0
source_node_id: int | None
destination_node_id: int | None
data_specifier: DataSpecifier
user_data: int
compile_into(out_buffer: bytearray) → memoryview

Compiles the frame into the specified output buffer, escaping the data as necessary. The buffer must be large enough to accommodate the frame header with the payload and CRC, including escape sequences.

Returns:

View of the memory from the beginning of the buffer until the end of the compiled frame.

static calc_cobs_size(payload_size_bytes: int) → int
Returns:

Worst-case COBS-encoded message size for a given payload size.
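To make the notion of a worst-case COBS size concrete, here is a self-contained sketch of a COBS codec. It is not the codec used by the library, and the function names are invented. The bound computed by cobs_size_bound is a conservative one that holds for this particular encoder; calc_cobs_size may compute a slightly different figure.

```python
def cobs_encode(data: bytes) -> bytes:
    """Encode `data` so that the output contains no zero bytes."""
    out = bytearray()
    block = bytearray()
    for byte in data:
        if byte == 0:
            out.append(len(block) + 1)  # code byte: distance to the removed zero
            out += block
            block.clear()
        else:
            block.append(byte)
            if len(block) == 254:       # longest run a single code byte can cover
                out.append(0xFF)
                out += block
                block.clear()
    out.append(len(block) + 1)          # final (possibly empty) block
    out += block
    return bytes(out)


def cobs_decode(encoded: bytes) -> bytes:
    """Invert cobs_encode(); raise ValueError on malformed input."""
    out = bytearray()
    i = 0
    while i < len(encoded):
        code = encoded[i]
        block = encoded[i + 1 : i + code]
        if code == 0 or len(block) != code - 1 or 0 in block:
            raise ValueError("malformed COBS data")
        out += block
        i += code
        if code != 0xFF and i < len(encoded):
            out.append(0)               # restore the zero that the code byte replaced
    return bytes(out)


def cobs_size_bound(payload_size_bytes: int) -> int:
    """Upper bound on len(cobs_encode(x)) for any x of the given size."""
    return payload_size_bytes + payload_size_bytes // 254 + 1


payload = bytes(range(256)) * 3
encoded = cobs_encode(payload)
```

Since the encoded form never contains a zero byte, zero can serve as an unambiguous frame delimiter on the wire.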

static parse_from_cobs_image(image: memoryview) → SerialFrame | None

Delimiters will be stripped if present, but they are not required.

Returns:

Frame or None if the image is invalid.

static parse_from_unescaped_image(image: memoryview) → SerialFrame | None
Returns:

Frame or None if the image is invalid.

__delattr__(name)
__eq__(other)
__hash__()
__init__(priority: Priority, transfer_id: int, index: int, end_of_transfer: bool, payload: memoryview, source_node_id: int | None, destination_node_id: int | None, data_specifier: DataSpecifier, user_data: int) → None
__match_args__ = ('priority', 'transfer_id', 'index', 'end_of_transfer', 'payload', 'source_node_id', 'destination_node_id', 'data_specifier', 'user_data')
__setattr__(name, value)
class pycyphal.transport.serial.SerialCapture(timestamp: Timestamp, fragment: memoryview, own: bool)

Bases: Capture

Since Cyphal/serial operates on top of unstructured L1 data links, there is no native concept of framing. Therefore, the capture type defines only the timestamp, a raw chunk of bytes, and the direction (RX/TX).

When capturing data from a live interface, it is guaranteed by this library that each capture will contain AT MOST one frame along with the delimiter bytes (at least the last byte of the fragment is zero). When reading data from a file, it is trivial to split the data into frames by looking for the frame separators, which are simply zero bytes.

fragment: memoryview
own: bool

True if the captured fragment was sent by the local transport instance. False if it was received from the port.

__repr__() → str

Captures that contain large fragments are truncated and appended with an ellipsis.

static get_transport_type() → Type[SerialTransport]
__delattr__(name)
__eq__(other)
__hash__()
__init__(timestamp: Timestamp, fragment: memoryview, own: bool) → None
__match_args__ = ('timestamp', 'fragment', 'own')
__setattr__(name, value)
class pycyphal.transport.serial.SerialTracer

Bases: Tracer

This tracer does not differentiate between input and output traces, but it keeps separate parsers for input and output captures such that there is no RX/TX state conflict. If necessary, the user can distinguish RX/TX traces by checking SerialCapture.own before invoking update().

Return types from update():

__init__() → None
update(cap: Capture) → Trace | None

If the capture encapsulates more than one serialized frame, a ValueError will be raised. To avoid this, always ensure that the captured fragments are split on the frame delimiters (which are simply zero bytes). Captures provided by PyCyphal are always fragmented correctly, but you may need to implement fragmentation manually when reading data from an external file.
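When the data comes from an external file, the manual fragmentation mentioned above can be as simple as splitting on zero bytes. The sketch below uses a hypothetical helper, split_frames; it returns the complete fragments (each terminated by one delimiter) and, separately, any unterminated tail, which should be kept until more data arrives.

```python
from typing import List, Tuple


def split_frames(dump: bytes) -> Tuple[List[bytes], bytes]:
    """Split a raw dump into zero-terminated fragments plus an unterminated tail."""
    *complete, tail = dump.split(b"\x00")
    # Consecutive delimiters yield empty chunks, which carry no frame and are skipped.
    frames = [chunk + b"\x00" for chunk in complete if chunk]
    return frames, tail


frames, leftover = split_frames(b"\x00abc\x00\x00de\x00fg")
print(frames)    # [b'abc\x00', b'de\x00']
print(leftover)  # b'fg'
```

Each fragment then holds at most one frame, so it could be wrapped into a SerialCapture (with a timestamp and direction chosen by the caller) and passed to update() without triggering the ValueError.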

class pycyphal.transport.serial.SerialErrorTrace(timestamp: Timestamp, error: TransferReassembler.Error)

Bases: ErrorTrace

error: Error
__delattr__(name)
__eq__(other)
__hash__()
__init__(timestamp: Timestamp, error: Error) → None
__match_args__ = ('timestamp', 'error')
__repr__()
__setattr__(name, value)
class pycyphal.transport.serial.SerialOutOfBandTrace(timestamp: Timestamp, data: memoryview)

Bases: ErrorTrace

Out-of-band data or a malformed frame received. See pycyphal.serial.StreamParser.

data: memoryview
__delattr__(name)
__eq__(other)
__hash__()
__init__(timestamp: Timestamp, data: memoryview) → None
__match_args__ = ('timestamp', 'data')
__repr__()
__setattr__(name, value)