pycyphal.presentation package

Subpackages

Module contents

Presentation layer overview

The presentation layer is responsible for serializing and deserializing DSDL objects and for providing a higher-level object-oriented interface on top of the transport layer. A typical application should not access this layer directly; instead, it should rely on the high-level API provided by pycyphal.application.

The presentation layer uses the term port to refer to an instance of publisher, subscriber, service client, or service server for a specific subject or service (see the inheritance diagram below).

Multiple ports can access the same underlying transport layer instance concurrently; the presentation layer takes care of all related data management and synchronization issues automatically. This minimizes the logical coupling between different components of the application that have to rely on the same Cyphal network resource. For example, when the application creates more than one subscriber for a given subject, the presentation layer will distribute received messages into every subscription instance requested by the application. Likewise, different components of the application may publish messages over the same subject or invoke the same service on the same remote server node.
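The fan-out behaviour described above can be pictured with a small standard-library sketch. The `FanOut` class and its method names are hypothetical stand-ins and not part of PyCyphal; they only illustrate how one received message could be delivered to every subscription created for the same subject.

```python
import asyncio


class FanOut:
    """Hypothetical stand-in for the hidden per-subject implementation object."""

    def __init__(self) -> None:
        self._queues = []

    def add_subscription(self) -> asyncio.Queue:
        # Each subscription gets its own independent queue.
        queue = asyncio.Queue()
        self._queues.append(queue)
        return queue

    def on_message(self, message: object) -> None:
        # One received message is delivered to every subscription.
        for queue in self._queues:
            queue.put_nowait(message)


async def demo() -> list:
    fan_out = FanOut()
    first = fan_out.add_subscription()
    second = fan_out.add_subscription()
    fan_out.on_message("hello")
    return [await first.get(), await second.get()]
```

Running `asyncio.run(demo())` yields the same message once per subscription.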

The inheritance diagram for the presentation layer is shown below. Classes named *Impl are not accessible to the user; their instances are managed automatically by the presentation layer controller class. Trivial types may be omitted from the diagram.

Inheritance diagram of pycyphal.presentation._port._publisher, pycyphal.presentation._port._subscriber, pycyphal.presentation._port._server, pycyphal.presentation._port._client, pycyphal.presentation._port._error

Usage example

Attention

A typical application should not instantiate presentation-layer entities directly; instead, use the higher-level API provided by pycyphal.application.

The main entity of the presentation layer is the class pycyphal.presentation.Presentation; the following demo shows how it can be used. This example is based on a simple loopback transport that does not interact with the outside world (it doesn’t perform IO with the OS), which makes it well-suited for demo needs.

>>> import uavcan.node, uavcan.diagnostic         # Import what we need from DSDL-generated packages.
>>> import pycyphal.transport.loopback            # Import the demo transport implementation.
>>> transport = pycyphal.transport.loopback.LoopbackTransport(None)  # Use your real transport instead.
>>> presentation = pycyphal.presentation.Presentation(transport)

Having prepared a presentation layer controller, we can create ports. They are the main points of network access for the application. Let’s start with a publisher and a subscriber:

>>> pub_record = presentation.make_publisher_with_fixed_subject_id(uavcan.diagnostic.Record_1_1)
>>> sub_record = presentation.make_subscriber_with_fixed_subject_id(uavcan.diagnostic.Record_1_1)

Publish a message and then receive it (the loopback transport just returns all outgoing transfers back):

>>> record = uavcan.diagnostic.Record_1_1(
...     severity=uavcan.diagnostic.Severity_1_0(uavcan.diagnostic.Severity_1_0.INFO),
...     text='Neither man nor animal can be influenced by anything but suggestion.')
>>> doctest_await(pub_record.publish(record))  # publish() returns False on timeout.
True
>>> message, metadata = doctest_await(sub_record.receive_for(timeout=0.5))
>>> message.text.tobytes().decode()  # Calling .tobytes().decode() won't be needed when DSDL supports strings natively.
'Neither man nor animal can be influenced by anything but suggestion.'
>>> metadata.transfer_id, metadata.source_node_id, metadata.timestamp
(0, None, Timestamp(system_ns=..., monotonic_ns=...))

We can use a custom subject-ID with any data type, even if a fixed subject-ID is provided for it (the background is explained in the Specification, please read it). Here is an example, which also shows that a receive call returns None when it times out:

>>> sub_record_custom = presentation.make_subscriber(uavcan.diagnostic.Record_1_1, subject_id=2345)
>>> doctest_await(sub_record_custom.get(timeout=0.5))  # Times out and returns None.

You can see above that the source node-ID in the received transfer metadata is None. That is because the transfer is anonymous, which in turn is because our node is anonymous; i.e., it doesn’t have a node-ID.

>>> presentation.transport.local_node_id is None    # Yup, it's anonymous.
True

Next we’re going to create a service. Services can’t be used with anonymous nodes (which is natural – how do you send a unicast transfer to an anonymous node?), so we’ll have to create a new transport with a node-ID of its own.

>>> transport = pycyphal.transport.loopback.LoopbackTransport(1234)  # The range of valid values is transport-dependent.
>>> presentation = pycyphal.presentation.Presentation(transport)  # Start anew, this time not anonymous.
>>> presentation.transport.local_node_id
1234

Generally, anonymous nodes are useful in two cases:

  1. You only need to listen and you know that you are not going to emit any transfers (no point tinkering with node-ID if you’re not going to use it anyway).

  2. You need to allocate a node-ID using the plug-and-play autoconfiguration protocol. In this case, you would normally create a transport, run the PnP allocation procedure to obtain a node-ID value from the PnP allocator, and then replace your transport instance with a new one (similar to what we just did here) initialized with the node-ID value provided by the PnP allocator.

Having configured the node-ID, let’s set up a service and invoke it:

>>> async def on_request(request: uavcan.node.ExecuteCommand_1_1.Request,
...                      metadata: pycyphal.presentation.ServiceRequestMetadata) \
...         -> uavcan.node.ExecuteCommand_1_1.Response:
...     print(f'Received command {request.command} from node {metadata.client_node_id}')
...     return uavcan.node.ExecuteCommand_1_1.Response(uavcan.node.ExecuteCommand_1_1.Response.STATUS_BAD_COMMAND)
>>> srv_exec_command = presentation.get_server_with_fixed_service_id(uavcan.node.ExecuteCommand_1_1)
>>> srv_exec_command.serve_in_background(on_request)
>>> client_exec_command = presentation.make_client_with_fixed_service_id(uavcan.node.ExecuteCommand_1_1,
...                                                                      server_node_id=1234)
>>> request_object = uavcan.node.ExecuteCommand_1_1.Request(
...     uavcan.node.ExecuteCommand_1_1.Request.COMMAND_BEGIN_SOFTWARE_UPDATE,
...     '/path/to/the/firmware/image.bin')
>>> received_response, response_transfer = doctest_await(client_exec_command.call(request_object))
Received command 65533 from node 1234
>>> received_response
uavcan.node.ExecuteCommand.Response.1.1(status=3)

Methods that receive data from the network return None on timeout. For example, here we create a client for a nonexistent service; the call times out and returns None:

>>> bad_client = presentation.make_client(uavcan.node.ExecuteCommand_1_1,
...                                       service_id=234,       # There is no such service.
...                                       server_node_id=321)   # There is no such server.
>>> bad_client.response_timeout = 0.1                           # Override the default.
>>> bad_client.priority = pycyphal.transport.Priority.HIGH      # Override the default.
>>> doctest_await(bad_client(request_object))                   # Times out and returns None.
class pycyphal.presentation.Presentation(transport: Transport)[source]

Bases: object

This is the presentation layer controller. It weaves the fabric of peace and maintains balance even when it looks like the darkest of skies spins above.

Methods named make_*() create a new instance upon every invocation. Such instances implement the RAII pattern, managing the life cycle of the underlying resource automatically, so the user does not necessarily have to call close() manually, although it is recommended for determinism.

Methods named get_*() create a new instance only the first time they are invoked for the particular key parameter; the same instance is returned for every subsequent call for the same key parameter until it is manually closed by the caller.
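The difference between the two naming conventions can be sketched as follows. The `Controller` class, `make_thing()`, and `get_thing()` are hypothetical names used only to contrast "new instance on every call" with "one instance per key"; they do not reproduce the actual controller.

```python
class Controller:
    """Hypothetical stand-in; only the make_*/get_* contrast is modeled."""

    def __init__(self) -> None:
        self._by_key = {}

    def make_thing(self, key: int) -> object:
        # make_*(): a fresh instance on every invocation.
        return object()

    def get_thing(self, key: int) -> object:
        # get_*(): created on first use for this key, then reused.
        if key not in self._by_key:
            self._by_key[key] = object()
        return self._by_key[key]

    def close_thing(self, key: int) -> None:
        # After a manual close, the next get_*() call creates a new instance.
        self._by_key.pop(key, None)
```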

__init__(transport: Transport) None[source]

The presentation controller takes ownership of the supplied transport. When the presentation instance is closed, its transport is also closed (and so are all its sessions).

property output_transfer_id_map: Dict[OutputSessionSpecifier, OutgoingTransferIDCounter][source]

This property is designed for very short-lived processes like CLI tools. Most applications will not benefit from it and should not use it.

Access to the output transfer-ID map allows short-running applications to store/restore the map to/from a persistent storage that retains data across restarts of the application. That may allow applications with very short life cycles (typically under several seconds) to adhere to the transfer-ID computation requirements presented in the specification. If the requirement were to be violated, then upon restart a process using the same node-ID could be unable to initiate communication using the same port-ID until the receiving nodes reached the transfer-ID timeout state.

The typical usage pattern is as follows: Upon launch, check if there is a transfer-ID map stored in a predefined location (e.g., a file or a database). If there is, and the storage was last written recently (no point restoring a map that is definitely obsolete), load it and commit to this instance by invoking dict.update() on the object returned by this property. If there isn’t, do nothing. When the application is finished running (e.g., this could be implemented via atexit.register()), access the map via this property and write it to the predefined storage location atomically. Make sure to shard the location by node-ID because nodes that use different node-ID values obviously shall not share their transfer-ID maps. Nodes sharing the same node-ID cannot exist on the same transport, but the local system might be running nodes under the same node-ID on independent networks concurrently, so this may need to be accounted for.
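The usage pattern above could be sketched roughly as shown next. This is a minimal illustration under stated assumptions: the helper names (`map_path`, `restore_map`, `store_map`), the file naming scheme, and the use of `pickle` are all choices made for this example, and it assumes the keys and values of the map can be pickled. A plain `dict` stands in for the object returned by the property.

```python
import os
import pickle
import tempfile
import time
from pathlib import Path


def map_path(directory: Path, node_id: int) -> Path:
    # Shard the storage location by node-ID (the naming is an assumption).
    return directory / f"transfer_id_map.{node_id}.pickle"


def restore_map(target: dict, path: Path, max_age: float) -> bool:
    """Load the stored map into `target` if it exists and is recent enough."""
    try:
        age = time.time() - path.stat().st_mtime
    except FileNotFoundError:
        return False  # Nothing stored: do nothing.
    if age > max_age:
        return False  # Definitely obsolete: do not restore.
    with path.open("rb") as f:
        target.update(pickle.load(f))
    return True


def store_map(source: dict, path: Path) -> None:
    """Write atomically: dump to a temporary file, then rename over the target."""
    fd, tmp_name = tempfile.mkstemp(dir=path.parent)
    with os.fdopen(fd, "wb") as f:
        pickle.dump(dict(source), f)
    os.replace(tmp_name, path)
```

In an application, `target` and `source` would be the object returned by `output_transfer_id_map`, and `store_map()` could be scheduled with `atexit.register()`.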

property transport: Transport[source]

Direct reference to the underlying transport instance. The presentation layer instance owns its transport.

property loop: AbstractEventLoop[source]

Deprecated.

make_publisher(dtype: Type[pycyphal.presentation.T], subject_id: int) Publisher[pycyphal.presentation.T][source]

Creates a new publisher instance for the specified subject-ID. All publishers created for a specific subject share the same underlying implementation object which is hidden from the user; the implementation is reference counted and it is destroyed automatically along with its underlying transport level session instance when the last publisher is closed. The publisher instance will be closed automatically from the finalizer when garbage collected if the user did not bother to do that manually. This logic follows the RAII pattern.

See Publisher for further information about publishers.
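The reference counting described for make_publisher() can be pictured with the sketch below. `SharedImpl` and `Proxy` are hypothetical stand-ins for the hidden implementation object and the user-facing port; the real classes are more involved.

```python
class SharedImpl:
    """Hypothetical stand-in for the hidden, reference-counted implementation."""

    def __init__(self) -> None:
        self.proxy_count = 0
        self.closed = False

    def register(self) -> None:
        self.proxy_count += 1

    def release(self) -> None:
        self.proxy_count -= 1
        if self.proxy_count == 0:
            # The last proxy is gone: this is where the session would be destroyed.
            self.closed = True


class Proxy:
    """Hypothetical stand-in for a user-facing port instance."""

    def __init__(self, impl: SharedImpl) -> None:
        self._impl = impl
        self._closed = False
        impl.register()

    def close(self) -> None:
        # Closing is idempotent; only the first call releases the reference.
        if not self._closed:
            self._closed = True
            self._impl.release()

    def __del__(self) -> None:
        # Finalizer fallback if the user never called close().
        self.close()
```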

make_subscriber(dtype: Type[pycyphal.presentation.T], subject_id: int, queue_capacity: int | None = None) Subscriber[pycyphal.presentation.T][source]

Creates a new subscriber instance for the specified subject-ID. All subscribers created for a specific subject share the same underlying implementation object which is hidden from the user; the implementation is reference counted and it is destroyed automatically along with its underlying transport level session instance when the last subscriber is closed. The subscriber instance will be closed automatically from the finalizer when garbage collected if the user did not bother to do that manually. This logic follows the RAII pattern.

By default, the size of the input queue is unlimited; the user may provide a positive integer value to override this. If the user is not reading the received messages quickly enough and the size of the queue is limited (technically, it is always limited at least by the amount of the available memory), the queue may become full in which case newer messages will be dropped and the overrun counter will be incremented once per dropped message.

See Subscriber for further information about subscribers.
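The overrun behaviour of a limited input queue can be sketched as follows. `BoundedInbox` is a hypothetical stand-in for a subscriber's queue; it only models the rule that a newer message is dropped and counted when the queue is full.

```python
from collections import deque


class BoundedInbox:
    """Hypothetical stand-in for a subscriber input queue with an overrun counter."""

    def __init__(self, capacity: int) -> None:
        self._capacity = capacity
        self._items = deque()
        self.overruns = 0

    def push(self, message: object) -> None:
        if len(self._items) >= self._capacity:
            # The queue is full: the newer message is dropped and counted.
            self.overruns += 1
            return
        self._items.append(message)

    def pop(self) -> object:
        # Messages that were accepted are read in arrival order.
        return self._items.popleft()
```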

make_client(dtype: Type[pycyphal.presentation.T], service_id: int, server_node_id: int) Client[pycyphal.presentation.T][source]

Creates a new client instance for the specified service-ID and the remote server node-ID. The number of such instances can be arbitrary. For example, different tasks may simultaneously create and use client instances invoking the same service on the same server node.

All clients created with a specific combination of service-ID and server node-ID share the same underlying implementation object which is hidden from the user. The implementation instance is reference counted and it is destroyed automatically along with its underlying transport level session instances when its last client is closed. The client instance will be closed automatically from its finalizer when garbage collected if the user did not bother to do that manually. This logic follows the RAII pattern.

See Client for further information about clients.

get_server(dtype: Type[pycyphal.presentation.T], service_id: int) Server[pycyphal.presentation.T][source]

Returns the server instance for the specified service-ID. If no such instance exists, it will be created. The instance should be used from one task only.

Observe that unlike other sessions, the server instance is returned as-is without any intermediate proxy objects, and this interface does NOT implement the RAII pattern. The server instance will not be garbage collected as long as its presentation layer controller exists, hence it is the responsibility of the user to close unwanted servers manually. However, when the parent presentation layer controller is closed (see close()), all of its session instances are also closed, servers are no exception, so the application does not really have to hunt down every server to terminate a Cyphal stack properly.

See Server for further information about servers.

make_publisher_with_fixed_subject_id(dtype: Type[pycyphal.presentation.T]) Publisher[pycyphal.presentation.T][source]

A wrapper for make_publisher() that uses the fixed subject-ID associated with this type. Raises a TypeError if the type has no fixed subject-ID.

make_subscriber_with_fixed_subject_id(dtype: Type[pycyphal.presentation.T], queue_capacity: int | None = None) Subscriber[pycyphal.presentation.T][source]

A wrapper for make_subscriber() that uses the fixed subject-ID associated with this type. Raises a TypeError if the type has no fixed subject-ID.

make_client_with_fixed_service_id(dtype: Type[pycyphal.presentation.T], server_node_id: int) Client[pycyphal.presentation.T][source]

A wrapper for make_client() that uses the fixed service-ID associated with this type. Raises a TypeError if the type has no fixed service-ID.

get_server_with_fixed_service_id(dtype: Type[pycyphal.presentation.T]) Server[pycyphal.presentation.T][source]

A wrapper for get_server() that uses the fixed service-ID associated with this type. Raises a TypeError if the type has no fixed service-ID.

close() None[source]

Closes the underlying transport instance and all existing session instances. I.e., the application is not required to close every session instance explicitly.

__repr__() str[source]
class pycyphal.presentation.Publisher(impl: pycyphal.presentation.PublisherImpl[pycyphal.presentation.T])[source]

Bases: MessagePort[T]

A task should request its own independent publisher instance from the presentation layer controller. Do not share the same publisher instance across different tasks. This class implements the RAII pattern.

Implementation info: all publishers sharing the same session specifier (i.e., subject-ID) also share the same underlying implementation object containing the transport session which is reference counted and destroyed automatically when the last publisher with that session specifier is closed; the user code cannot access it and generally shouldn’t care. None of the settings of a publisher instance, such as send timeout or priority, can affect other publishers; this does not apply to the transfer-ID counter objects though because they are transport-layer entities and therefore are shared per session specifier.

DEFAULT_SEND_TIMEOUT = 1.0

Default value for send_timeout. The value is an implementation detail, not required by Specification.

__init__(impl: pycyphal.presentation.PublisherImpl[pycyphal.presentation.T])[source]

Do not call this directly! Use Presentation.make_publisher().

property dtype: Type[pycyphal.presentation.T][source]
property transport_session: OutputSession[source]
property transfer_id_counter: OutgoingTransferIDCounter[source]

Allows the caller to reach the transfer-ID counter object of this session (shared per session specifier). This may be useful in certain special cases such as publication of time synchronization messages, where it may be necessary to override the transfer-ID manually.

property priority: Priority[source]

The priority level used for transfers published via this instance. This parameter is configured separately per proxy instance; i.e., it is not shared across different publisher instances under the same session specifier.

property send_timeout: float[source]

Every outgoing transfer initiated via this proxy instance will have to be sent in this amount of time. If the time is exceeded, the attempt is aborted and False is returned. Read the transport layer docs for in-depth information on send timeout handling. The default is DEFAULT_SEND_TIMEOUT. The publication logic is roughly as follows:

return transport_session.send(message_transfer, self.loop.time() + self.send_timeout)
async publish(message: pycyphal.presentation.T) bool[source]

Serializes and publishes the message object at the priority level selected earlier. Should not be used simultaneously with publish_soon() because that makes the message ordering undefined. Returns False if the publication could not be completed in send_timeout, True otherwise.

publish_soon(message: pycyphal.presentation.T) None[source]

Serializes and publishes the message object at the priority level selected earlier. Does so without blocking (observe that this method is not async). Should not be used simultaneously with publish() because that makes the message ordering undefined. The send timeout is still in effect here – if the operation cannot complete in the selected time, send will be cancelled and a low-severity log message will be emitted.

close() None[source]
__del__() None[source]
class pycyphal.presentation.Subscriber(impl: pycyphal.presentation.SubscriberImpl[pycyphal.presentation.T], queue_capacity: int | None)[source]

Bases: MessagePort[T]

A task should request its own independent subscriber instance from the presentation layer controller. Do not share the same subscriber instance across different tasks. This class implements the RAII pattern.

Whenever a message is received from a subject, it is deserialized once and the resulting object is passed by reference into each subscriber instance. If there is more than one subscriber instance for a subject, accidental mutation of the object by one consumer may affect other consumers. To avoid this, the application should either avoid mutating received message objects or clone them beforehand.
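The shared-reference hazard and the cloning remedy can be illustrated with a plain dictionary standing in for one deserialized message object. Whether `copy.deepcopy()` is the most suitable way to clone a particular message type is an assumption of this sketch.

```python
import copy

shared = {"text": "original"}  # Stand-in for one deserialized message object.

# Two consumers receive the same object by reference.
consumer_a = shared
consumer_b = shared

# Consumer A clones before mutating, so consumer B is unaffected.
private_copy = copy.deepcopy(consumer_a)
private_copy["text"] = "changed"
```

Had consumer A mutated `consumer_a` directly, consumer B would have observed the change as well.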

This class implements the async iterator protocol yielding received messages. Iteration stops shortly after the subscriber is closed. It can be used as follows:

async for message, transfer in subscriber:
    ...  # Handle the message.
# The loop will be stopped shortly after the subscriber is closed.

Implementation info: all subscribers sharing the same session specifier also share the same underlying implementation object containing the transport session which is reference counted and destroyed automatically when the last subscriber with that session specifier is closed; the user code cannot access it and generally shouldn’t care.

__init__(impl: pycyphal.presentation.SubscriberImpl[pycyphal.presentation.T], queue_capacity: int | None)[source]

Do not call this directly! Use Presentation.make_subscriber().

receive_in_background(handler: Callable[[pycyphal.presentation.T, TransferFrom], None] | Callable[[pycyphal.presentation.T, TransferFrom], Awaitable[None]]) None[source]

Configures the subscriber to invoke the specified handler whenever a message is received. The handler may be an async callable, or it may return an awaitable, or it may return None (the latter case is that of a regular synchronous function).

If the caller attempts to configure multiple handlers by invoking this method repeatedly, only the last configured handler will be active (the old ones will be forgotten). If the handler throws an exception, it will be suppressed and logged.

This method internally starts a new task. If the subscriber is closed while the task is running, the task will be silently cancelled automatically; the application need not get involved.

This method of handling messages should not be used with the plain async receive API; an attempt to do so may lead to unpredictable message distribution between consumers.
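The handler contract described above (a plain function, an async callable, or a callable returning an awaitable, with exceptions suppressed and logged) could be serviced by a dispatcher along these lines. The `dispatch()` helper is hypothetical and is not taken from the library.

```python
import asyncio
import inspect
import logging


async def dispatch(handler, message, transfer) -> None:
    """Invoke a sync or async handler; suppress and log any exception it raises."""
    try:
        result = handler(message, transfer)
        if inspect.isawaitable(result):
            await result
    except Exception:
        logging.getLogger(__name__).exception("Unhandled exception in handler")


async def demo() -> list:
    seen = []

    def sync_handler(message, transfer) -> None:
        seen.append(("sync", message))

    async def async_handler(message, transfer) -> None:
        seen.append(("async", message))

    def failing_handler(message, transfer) -> None:
        raise ValueError("boom")

    await dispatch(sync_handler, 1, None)
    await dispatch(async_handler, 2, None)
    await dispatch(failing_handler, 3, None)  # Logged, not propagated.
    return seen
```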

async receive(monotonic_deadline: float) tuple[pycyphal.presentation.T, TransferFrom] | None[source]

Blocks until either a valid message is received, in which case it is returned along with the transfer which delivered it; or until the specified deadline is reached, in which case None is returned. The deadline value is compared against asyncio.AbstractEventLoop.time().

The method will never return None unless the deadline has been exceeded or the session is closed; in other words, a spurious premature return cannot occur.

If the deadline is not in the future, the method will non-blockingly check if there is any data; if there is, it will be returned, otherwise None will be returned immediately. It is guaranteed that no context switch will occur in this case, as if the method was not async.

If an infinite deadline is desired, consider using __aiter__()/__anext__().

async receive_for(timeout: float) tuple[pycyphal.presentation.T, TransferFrom] | None[source]

This is like receive() but with a relative timeout instead of an absolute deadline.
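The relationship between the absolute-deadline and relative-timeout forms can be sketched with an `asyncio.Queue` standing in for the subscriber. Both functions below are illustrative stand-ins that mirror the documented behaviour, including the non-blocking check when the deadline is not in the future; they are not the library's implementation.

```python
import asyncio


async def receive(queue: asyncio.Queue, monotonic_deadline: float):
    """Stand-in: wait for an item until the absolute deadline, else return None."""
    loop = asyncio.get_running_loop()
    timeout = monotonic_deadline - loop.time()
    if timeout <= 0:
        # Deadline is not in the future: non-blocking check only.
        try:
            return queue.get_nowait()
        except asyncio.QueueEmpty:
            return None
    try:
        return await asyncio.wait_for(queue.get(), timeout)
    except asyncio.TimeoutError:
        return None


async def receive_for(queue: asyncio.Queue, timeout: float):
    """Relative-timeout wrapper: convert to a deadline on the event loop clock."""
    loop = asyncio.get_running_loop()
    return await receive(queue, loop.time() + timeout)
```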

async get(timeout: float = 0) pycyphal.presentation.T | None[source]

A convenience wrapper over receive_for() where the result does not contain the transfer metadata, and the default timeout is zero (which means check for new messages non-blockingly). This method approximates the standard Queue API.

__aiter__() Subscriber[pycyphal.presentation.T][source]

Iterator API support. Returns self unchanged.

async __anext__() tuple[pycyphal.presentation.T, TransferFrom][source]

This is like receive() with an infinite timeout, so it cannot return None.

property dtype: Type[pycyphal.presentation.T][source]
property transport_session: InputSession[source]
sample_statistics() SubscriberStatistics[source]

Returns the statistical counters of this subscriber, including the statistical metrics of the underlying transport session, which is shared across all subscribers with the same session specifier.

close() None[source]
__del__() None[source]
class pycyphal.presentation.Client(impl: pycyphal.presentation.ClientImpl[pycyphal.presentation.T])[source]

Bases: ServicePort[T]

A task should request its own client instance from the presentation layer controller. Do not share the same client instance across different tasks. This class implements the RAII pattern.

Implementation info: all client instances sharing the same session specifier also share the same underlying implementation object containing the transport sessions which is reference counted and destroyed automatically when the last client instance is closed; the user code cannot access it and generally shouldn’t care. None of the settings of a client instance, such as timeout or priority, can affect other client instances; this does not apply to the transfer-ID counter objects though because they are transport-layer entities and therefore are shared per session specifier.

__init__(impl: pycyphal.presentation.ClientImpl[pycyphal.presentation.T])[source]

Do not call this directly! Use Presentation.make_client().

async call(request: object) tuple[object, TransferFrom] | None[source]

Sends the request to the remote server using the pre-configured priority and response timeout parameters. Returns the response along with its transfer info in the case of successful completion. If the server did not provide a valid response on time, returns None.

On certain feature-limited transports (such as CAN) the call may raise pycyphal.presentation.RequestTransferIDVariabilityExhaustedError if there are too many concurrent requests.
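Why a limited transfer-ID range bounds the number of concurrent requests can be sketched as follows. `PendingRequests` and `TransferIDExhausted` are hypothetical names, and the bookkeeping is only one plausible scheme; the example assumes a modulo of 32, which corresponds to the 5-bit transfer-ID of Cyphal/CAN.

```python
class TransferIDExhausted(Exception):
    """Hypothetical stand-in for RequestTransferIDVariabilityExhaustedError."""


class PendingRequests:
    """Hypothetical tracker of requests that still await a response."""

    def __init__(self, modulo: int) -> None:
        self._modulo = modulo
        self._next = 0
        self._pending = set()

    def begin(self) -> int:
        transfer_id = self._next % self._modulo
        if transfer_id in self._pending:
            # The next transfer-ID would collide with a request that is
            # still awaiting its response.
            raise TransferIDExhausted(transfer_id)
        self._pending.add(transfer_id)
        self._next += 1
        return transfer_id

    def complete(self, transfer_id: int) -> None:
        # Called when the response arrives or the request times out.
        self._pending.discard(transfer_id)
```

With a modulo of 32, the 33rd concurrent request fails until an earlier one completes.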

async __call__(request: object) object | None[source]

This is a simpler wrapper over call() that only returns the response object without the metadata.

property response_timeout: float[source]

The response timeout value used for requests emitted via this proxy instance. This parameter is configured separately per proxy instance; i.e., it is not shared across different client instances under the same session specifier, so that, for example, different tasks invoking the same service on the same server node can have different timeout settings. The same value is also used as send timeout for the underlying call to pycyphal.transport.OutputSession.send(). The default value, DEFAULT_SERVICE_REQUEST_TIMEOUT, follows the recommendations provided in the Specification.

property priority: Priority[source]

The priority level used for requests emitted via this proxy instance. This parameter is configured separately per proxy instance; i.e., it is not shared across different client instances under the same session specifier.

property dtype: Type[pycyphal.presentation.T][source]
property transfer_id_counter: OutgoingTransferIDCounter[source]

Allows the caller to reach the transfer-ID counter instance. The instance is shared for clients under the same session. I.e., if there are two clients that use the same service-ID and same server node-ID, they will share the same transfer-ID counter.

property input_transport_session: InputSession[source]
property output_transport_session: OutputSession[source]

The transport session used for request transfers.

sample_statistics() ClientStatistics[source]

The statistics are counted at the hidden implementation instance. Clients that use the same session specifier will have the same set of statistical counters.

close() None[source]
__del__() None[source]
class pycyphal.presentation.Server(dtype: Type[pycyphal.presentation.T], input_transport_session: InputSession, output_transport_session_factory: Callable[[int], OutputSession], finalizer: Callable[[Sequence[Session]], None])[source]

Bases: ServicePort[T]

At most one task can use the server at any given time. The instance must be closed manually to stop the server.

__init__(dtype: Type[pycyphal.presentation.T], input_transport_session: InputSession, output_transport_session_factory: Callable[[int], OutputSession], finalizer: Callable[[Sequence[Session]], None])[source]

Do not call this directly! Use Presentation.get_server().

async serve(handler: Callable[[Any, ServiceRequestMetadata], Awaitable[Any | None]], monotonic_deadline: float | None = None) None[source]

This is like serve_for() except that it exits normally after the specified monotonic deadline is reached. The deadline value is compared against asyncio.AbstractEventLoop.time(). If no deadline is provided, it is assumed to be infinite.

async serve_for(handler: Callable[[Any, ServiceRequestMetadata], Awaitable[Any | None]], timeout: float) None[source]

Listen for requests for the specified time or until the instance is closed, then exit.

When a request is received, the supplied handler callable will be invoked with the request object and the associated metadata object (which contains auxiliary information such as the client’s node-ID). The handler shall return the response or None. If None is returned, the server will not send any response back (this practice is discouraged). If the handler throws an exception, it will be suppressed and logged.

serve_in_background(handler: Callable[[Any, ServiceRequestMetadata], Awaitable[Any | None]]) None[source]

Start a new task and use it to run the server in the background. The task will be stopped when the server is closed.

When a request is received, the supplied handler callable will be invoked with the request object and the associated metadata object (which contains auxiliary information such as the client’s node-ID). The handler shall return the response or None. If None is returned, the server will not send any response back (this practice is discouraged). If the handler throws an exception, it will be suppressed and logged.

If the background task is already running, it will be cancelled and a new one will be started instead. This method of serving requests shall not be used concurrently with other methods.

property send_timeout: float[source]

Every response transfer will have to be sent in this amount of time. If the time is exceeded, the attempt is aborted and a warning is logged. The default value is DEFAULT_SERVICE_REQUEST_TIMEOUT.

sample_statistics() ServerStatistics[source]

Returns the statistical counters of this server instance, including the statistical metrics of the underlying transport sessions.

property dtype: Type[pycyphal.presentation.T][source]
property input_transport_session: InputSession[source]
close() None[source]
class pycyphal.presentation.SubscriberStatistics(transport_session: 'pycyphal.transport.SessionStatistics', messages: 'int', overruns: 'int', deserialization_failures: 'int')[source]

Bases: object

transport_session: SessionStatistics

Shared per session specifier.

messages: int

Number of received messages, individual per subscriber.

overruns: int

Number of messages lost to queue overruns; individual per subscriber.

deserialization_failures: int

Number of messages lost to deserialization errors; shared per session specifier.

__eq__(other)[source]
__hash__ = None
__init__(transport_session: SessionStatistics, messages: int, overruns: int, deserialization_failures: int) None[source]
__match_args__ = ('transport_session', 'messages', 'overruns', 'deserialization_failures')
__repr__()[source]
class pycyphal.presentation.ClientStatistics(request_transport_session: SessionStatistics, response_transport_session: SessionStatistics, sent_requests: int, deserialization_failures: int, unexpected_responses: int)[source]

Bases: object

The counters are maintained at the hidden client instance which is not accessible to the user. As such, clients with the same session specifier will share the same set of statistical counters.

request_transport_session: SessionStatistics
response_transport_session: SessionStatistics
sent_requests: int
deserialization_failures: int

Response transfers that could not be deserialized into a response object.

unexpected_responses: int

Response transfers that could not be matched with a request state.

__eq__(other)[source]
__hash__ = None
__init__(request_transport_session: SessionStatistics, response_transport_session: SessionStatistics, sent_requests: int, deserialization_failures: int, unexpected_responses: int) None[source]
__match_args__ = ('request_transport_session', 'response_transport_session', 'sent_requests', 'deserialization_failures', 'unexpected_responses')
__repr__()[source]
class pycyphal.presentation.ServerStatistics(request_transport_session: 'pycyphal.transport.SessionStatistics', response_transport_sessions: 'typing.Dict[int, pycyphal.transport.SessionStatistics]', served_requests: 'int', deserialization_failures: 'int', malformed_requests: 'int')[source]

Bases: object

request_transport_session: SessionStatistics

There is only one input transport session per server.

response_transport_sessions: Dict[int, SessionStatistics]

This is a mapping keyed by the remote client node-ID value. One transport session per client.

served_requests: int
deserialization_failures: int

Requests that could not be received because of bad input transfers.

malformed_requests: int

Problems at the transport layer.

__eq__(other)[source]
__hash__ = None
__init__(request_transport_session: SessionStatistics, response_transport_sessions: Dict[int, SessionStatistics], served_requests: int, deserialization_failures: int, malformed_requests: int) None[source]
__match_args__ = ('request_transport_session', 'response_transport_sessions', 'served_requests', 'deserialization_failures', 'malformed_requests')
__repr__()[source]
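
As a rough illustration of how the per-client mapping in response_transport_sessions might be consumed, the sketch below aggregates counters across all response sessions. SessionStatsStandIn and its transfers/errors fields are hypothetical stand-ins chosen so that the snippet runs without pycyphal installed; they are assumptions and not the library's own types.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass
class SessionStatsStandIn:
    # Hypothetical stand-in for a transport-layer statistics object.
    transfers: int = 0
    errors: int = 0


def total_responses(response_sessions: Dict[int, SessionStatsStandIn]) -> int:
    # The mapping is keyed by the remote client node-ID, one entry per client.
    return sum(stats.transfers for stats in response_sessions.values())


def busiest_client(response_sessions: Dict[int, SessionStatsStandIn]) -> int:
    # Node-ID of the client whose response session carried the most transfers.
    return max(response_sessions, key=lambda node_id: response_sessions[node_id].transfers)


# Two remote clients with node-IDs 42 and 123 (arbitrary example values).
sessions = {
    42: SessionStatsStandIn(transfers=10),
    123: SessionStatsStandIn(transfers=3, errors=1),
}
```

Keeping the per-client breakdown makes it possible to tell whether one misbehaving client accounts for most of the traffic or errors seen by a server.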
class pycyphal.presentation.ServiceRequestMetadata(timestamp: Timestamp, priority: Priority, transfer_id: int, client_node_id: int)[source]

Bases: object

This structure is supplied with every received request for informational purposes. The application is not required to do anything with it.

timestamp: Timestamp

Timestamp of the first frame of the request transfer.

priority: Priority

The same priority will be used for the response (see Specification).

transfer_id: int

The same transfer-ID will be used for the response (see Specification).

client_node_id: int

The response will be sent back to this node.

__repr__() str[source]
__delattr__(name)[source]
__eq__(other)[source]
__hash__()[source]
__init__(timestamp: Timestamp, priority: Priority, transfer_id: int, client_node_id: int) None[source]
__match_args__ = ('timestamp', 'priority', 'transfer_id', 'client_node_id')
__setattr__(name, value)[source]
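
The generated __setattr__, __delattr__, and __hash__ members listed for this class are what Python emits for a frozen dataclass, whereas __hash__ = None on the statistics classes is what a regular mutable dataclass produces. The sketch below uses two hypothetical stand-in classes (assumptions for illustration, not the library's own definitions) to show the practical difference between the two kinds.

```python
import dataclasses


@dataclasses.dataclass(frozen=True)
class MetadataStandIn:
    # Hypothetical stand-in mirroring two of the documented fields.
    transfer_id: int
    client_node_id: int


@dataclasses.dataclass
class StatisticsStandIn:
    # Hypothetical mutable counter container.
    served_requests: int = 0


meta = MetadataStandIn(transfer_id=7, client_node_id=42)

# Frozen instances are hashable, so they can serve as dictionary keys.
pending = {meta: "awaiting response"}

# Attempting to modify a frozen instance raises FrozenInstanceError.
try:
    meta.transfer_id = 8  # type: ignore[misc]
    mutated = True
except dataclasses.FrozenInstanceError:
    mutated = False

# A mutable dataclass with eq=True has __hash__ set to None and is unhashable.
try:
    hash(StatisticsStandIn())
    hashable = True
except TypeError:
    hashable = False
```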
class pycyphal.presentation.Port[source]

Bases: pycyphal.presentation.Closable, Generic[T]

The base class for any presentation layer session such as publisher, subscriber, client, or server. The term “port” originates from the following forum discussion: <https://forum.opencyphal.org/t/a-generic-term-for-either-subject-or-service/182>.

abstract property dtype: Type[pycyphal.presentation.T][source]

The generated Python class modeling the corresponding DSDL data type.

abstract property port_id: int[source]

The immutable subject-/service-ID of the underlying transport session instance.

abstract __repr__() str[source]
class pycyphal.presentation.MessagePort[source]

Bases: Port[T]

The base class for publishers and subscribers.

abstract property transport_session: Session[source]

The underlying transport session instance. Input for subscribers, output for publishers. One instance per session specifier.

property port_id: int[source]
__repr__() str[source]
class pycyphal.presentation.ServicePort[source]

Bases: Port[T]

abstract property input_transport_session: InputSession[source]

The underlying transport session instance used for the input transfers (requests for servers, responses for clients). One instance per session specifier.

property port_id: int[source]
__repr__() str[source]
class pycyphal.presentation.OutgoingTransferIDCounter[source]

Bases: object

A member of the output transfer-ID map. Essentially this is just a boxed integer. The value is monotonically increasing starting from zero; transport-specific modulus is computed by the underlying transport(s).

__init__() None[source]

Initializes the counter to zero.

get_then_increment() int[source]

Samples the counter with post-increment; i.e., like i++.

override(value: int) None[source]

Assigns a new value. Raises a ValueError if the value is not a non-negative integer.

__repr__() str[source]
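
The documented semantics of the counter can be approximated with the minimal sketch below. TransferIDCounterSketch is a hypothetical re-implementation written from the descriptions above, not the library class itself, and the exact validation performed by the real override() may differ. The modulus of 32 is the Cyphal/CAN value and is applied here by hand only to mimic what a transport would do.

```python
class TransferIDCounterSketch:
    """Hypothetical boxed integer mimicking the documented behaviour."""

    def __init__(self) -> None:
        # The counter starts from zero.
        self._value = 0

    def get_then_increment(self) -> int:
        # Post-increment: return the current value, then advance it.
        out = self._value
        self._value += 1
        return out

    def override(self, value: int) -> None:
        # Reject anything that is not a non-negative integer.
        if not isinstance(value, int) or isinstance(value, bool) or value < 0:
            raise ValueError(f"Invalid transfer-ID counter value: {value!r}")
        self._value = value


CAN_TRANSFER_ID_MODULO = 32  # Cyphal/CAN transfer-ID modulus

counter = TransferIDCounterSketch()
first = counter.get_then_increment()
second = counter.get_then_increment()

# After an override, the next sampled value is the overridden one.
counter.override(33)
wire_value = counter.get_then_increment() % CAN_TRANSFER_ID_MODULO
```

Leaving the modulo reduction to the transport lets the same counter feed several transports with different transfer-ID ranges.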
exception pycyphal.presentation.PortClosedError[source]

Bases: ResourceClosedError

Raised when an attempt is made to use a presentation-layer session instance that has been closed. Observe that it is a specialization of the corresponding transport-layer error type. Double-close is NOT an error, so closing the same instance twice will not result in this exception being raised.
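
The close semantics described above can be sketched as follows. PortStandIn and ClosedErrorStandIn are hypothetical names introduced for illustration and do not come from the library; the point is only that close() is idempotent while any other use after closing raises.

```python
class ClosedErrorStandIn(Exception):
    """Hypothetical stand-in for the closed-port error."""


class PortStandIn:
    """Hypothetical port-like object with idempotent close()."""

    def __init__(self) -> None:
        self._closed = False

    def close(self) -> None:
        # Closing an already closed instance is a no-op, not an error.
        self._closed = True

    def use(self) -> str:
        # Any operation other than close() fails once the port is closed.
        if self._closed:
            raise ClosedErrorStandIn("the port has been closed")
        return "ok"


port = PortStandIn()
result_before_close = port.use()

port.close()
port.close()  # Double-close does not raise.

try:
    port.use()
    use_after_close_failed = False
except ClosedErrorStandIn:
    use_after_close_failed = True
```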

exception pycyphal.presentation.RequestTransferIDVariabilityExhaustedError[source]

Bases: TransportError

Raised when an attempt is made to invoke more concurrent requests than are supported by the transport layer. For CAN, the limit is 32; for some transports the number is effectively unlimited (technically, there is always a limit, but for some transports, such as the serial transport, it is unreachable in practice).
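
To see where such a limit comes from, consider the sketch below, which tracks pending requests by transfer-ID reduced modulo the transport's transfer-ID range. PendingRequestsSketch and ExhaustedStandIn are hypothetical names, and the bookkeeping is an assumption made for illustration rather than the library's actual implementation. With a modulo of 32, as for CAN, the 33rd concurrent request would reuse a transfer-ID that is still awaiting its response.

```python
class ExhaustedStandIn(Exception):
    """Hypothetical stand-in for the transfer-ID exhaustion error."""


class PendingRequestsSketch:
    """Hypothetical table of in-flight requests keyed by transfer-ID."""

    def __init__(self, transfer_id_modulo: int) -> None:
        self._modulo = transfer_id_modulo
        self._next = 0
        self._pending = set()

    def begin(self) -> int:
        # Reduce the monotonic counter to the on-wire transfer-ID range.
        transfer_id = self._next % self._modulo
        if transfer_id in self._pending:
            # A request with this transfer-ID is still awaiting its response.
            raise ExhaustedStandIn(f"transfer-ID {transfer_id} is still in use")
        self._next += 1
        self._pending.add(transfer_id)
        return transfer_id

    def complete(self, transfer_id: int) -> None:
        # The response arrived or timed out; the transfer-ID is free again.
        self._pending.discard(transfer_id)


table = PendingRequestsSketch(transfer_id_modulo=32)
issued = [table.begin() for _ in range(32)]

try:
    table.begin()
    exhausted = False
except ExhaustedStandIn:
    exhausted = True

# Completing the oldest request frees its transfer-ID for reuse.
table.complete(0)
reused = table.begin()
```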