Demo

This section demonstrates how to build Cyphal applications using PyCyphal. It has been tested on GNU/Linux and Windows and is expected to work with any other major OS. The document is arranged as follows:

  • In the first section we introduce a couple of custom data types to illustrate how they can be dealt with.

  • The second section shows a simple demo node that implements a temperature controller and provides a custom RPC-service.

  • The third section provides a hands-on illustration of the data distribution functionality of Cyphal with the help of Yakut — a command-line utility for diagnostics and debugging of Cyphal networks.

  • The fourth section adds a second node that simulates the plant whose temperature is controlled by the first one.

  • The last section explains how to perform orchestration and configuration management of Cyphal networks.

You are expected to be familiar with terms like Cyphal node, DSDL, subject-ID, RPC-service. If not, skim through the Cyphal Guide first.

If you want to follow along, install PyCyphal and switch to a new directory (~/pycyphal-demo) before continuing.

DSDL definitions

Every Cyphal application depends on the standard DSDL definitions located in the namespace uavcan. The standard namespace is part of the regulated namespaces maintained by the OpenCyphal project. Grab your copy from git:

git clone https://github.com/OpenCyphal/public_regulated_data_types

The demo relies on two vendor-specific data types located in the root namespace sirius_cyber_corp. The root namespace directory layout is as follows:

sirius_cyber_corp/                              # root namespace directory
    PerformLinearLeastSquaresFit.1.0.dsdl       # service type definition
    PointXY.1.0.dsdl                            # nested message type definition

Type sirius_cyber_corp.PerformLinearLeastSquaresFit.1.0, file sirius_cyber_corp/PerformLinearLeastSquaresFit.1.0.dsdl:

# This service accepts a list of 2D point coordinates and returns the best-fit linear function coefficients.
# If no solution exists, the returned coefficients are NaN.

PointXY.1.0[<64] points

@extent 1024 * 8

---

float64 slope
float64 y_intercept

@extent 64 * 8

Type sirius_cyber_corp.PointXY.1.0, file sirius_cyber_corp/PointXY.1.0.dsdl:

float16 x
float16 y
@sealed
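As a rough sanity check of the @extent values above, the largest possible serialized size of each object can be estimated by hand. The sketch below assumes the usual DSDL rule that a variable-length array carries an implicit length field whose width is the smallest of 8, 16, 32, or 64 bits that can hold the capacity; the helper names are made up for this illustration and are not part of any library.

```python
# Hypothetical helpers for illustration only; not part of PyCyphal or PyDSDL.

def length_prefix_bits(capacity: int) -> int:
    """Width of the implicit array length field, assuming it is the smallest of 8/16/32/64 bits that fits."""
    needed = max(1, capacity.bit_length())
    return next(width for width in (8, 16, 32, 64) if width >= needed)


POINT_XY_BITS = 2 * 16  # PointXY.1.0: float16 x + float16 y (sealed, so no extra header)
MAX_POINTS = 63         # "[<64]" means strictly fewer than 64 elements


def max_request_bits() -> int:
    """Largest serialized request: the length field plus 63 points."""
    return length_prefix_bits(MAX_POINTS) + MAX_POINTS * POINT_XY_BITS


RESPONSE_BITS = 2 * 64  # float64 slope + float64 y_intercept

print("request:  at most", max_request_bits() // 8, "bytes; extent is", 1024, "bytes")
print("response: exactly", RESPONSE_BITS // 8, "bytes; extent is", 64, "bytes")
```

Under these assumptions both objects fit well within their declared extents, which leaves room for adding fields in later minor versions.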

First node

Copy-paste the source code given below into a file named demo_app.py. For the sake of clarity, move the custom DSDL root namespace directory sirius_cyber_corp/ that we created above into custom_data_types/. You should end up with the following directory structure:

pycyphal-demo/
    custom_data_types/
        sirius_cyber_corp/                          # Created in the previous section
            PerformLinearLeastSquaresFit.1.0.dsdl
            PointXY.1.0.dsdl
    public_regulated_data_types/                    # Clone from git
        uavcan/                                     # The standard DSDL namespace
            ...
        ...
    demo_app.py                                     # The thermostat node script

The CYPHAL_PATH environment variable should contain the list of paths where the DSDL root namespace directories are to be found (be sure to modify the values to match your environment):

export CYPHAL_PATH="$HOME/pycyphal-demo/custom_data_types:$HOME/pycyphal-demo/public_regulated_data_types"

Here comes demo_app.py:

#!/usr/bin/env python3
# Distributed under CC0 1.0 Universal (CC0 1.0) Public Domain Dedication.
# pylint: disable=ungrouped-imports,wrong-import-position

import os
import sys
import asyncio
import logging
import pycyphal  # Importing PyCyphal will automatically install the import hook for DSDL compilation.

# DSDL files are automatically compiled by pycyphal import hook from sources pointed by CYPHAL_PATH env variable.
import sirius_cyber_corp  # This is our vendor-specific root namespace. Custom data types.
import pycyphal.application  # This module requires the root namespace "uavcan" to be transcompiled.

# Import other namespaces we're planning to use. Nested namespaces are not auto-imported, so in order to reach,
# say, "uavcan.node.Heartbeat", you have to "import uavcan.node".
import uavcan.node  # noqa
import uavcan.si.sample.temperature  # noqa
import uavcan.si.unit.temperature  # noqa
import uavcan.si.unit.voltage  # noqa


class DemoApp:
    REGISTER_FILE = "demo_app.db"
    """
    The register file stores configuration parameters of the local application/node. The registers can be modified
    at launch via environment variables and at runtime via RPC-service "uavcan.register.Access".
    The file will be created automatically if it doesn't exist.
    """

    def __init__(self) -> None:
        node_info = uavcan.node.GetInfo_1.Response(
            software_version=uavcan.node.Version_1(major=1, minor=0),
            name="org.opencyphal.pycyphal.demo.demo_app",
        )
        # The Node class is basically the central part of the library -- it is the bridge between the application and
        # the Cyphal network. Also, it implements certain standard application-layer functions, such as publishing
        # heartbeats and port introspection messages, responding to GetInfo, serving the register API, etc.
        # The register file stores the configuration parameters of our node (you can inspect it using SQLite Browser).
        self._node = pycyphal.application.make_node(node_info, DemoApp.REGISTER_FILE)

        # Published heartbeat fields can be configured as follows.
        self._node.heartbeat_publisher.mode = uavcan.node.Mode_1.OPERATIONAL  # type: ignore
        self._node.heartbeat_publisher.vendor_specific_status_code = os.getpid() % 100

        # Now we can create ports to interact with the network.
        # They can also be created or destroyed later at any point after initialization.
        # A port is created by specifying its data type and its name (similar to topic names in ROS or DDS).
        # The subject-ID is obtained from the standard register named "uavcan.sub.temperature_setpoint.id".
        # It can also be modified via environment variable "UAVCAN__SUB__TEMPERATURE_SETPOINT__ID".
        self._sub_t_sp = self._node.make_subscriber(uavcan.si.unit.temperature.Scalar_1, "temperature_setpoint")

        # As you may probably guess by looking at the port names, we are building a basic thermostat here.
        # We subscribe to the temperature setpoint, temperature measurement (process variable), and publish voltage.
        # The corresponding registers are "uavcan.sub.temperature_measurement.id" and "uavcan.pub.heater_voltage.id".
        self._sub_t_pv = self._node.make_subscriber(uavcan.si.sample.temperature.Scalar_1, "temperature_measurement")
        self._pub_v_cmd = self._node.make_publisher(uavcan.si.unit.voltage.Scalar_1, "heater_voltage")

        # Create an RPC-server. The service-ID is read from standard register "uavcan.srv.least_squares.id".
        # This service is optional: if the service-ID is not specified, we simply don't provide it.
        try:
            srv_least_sq = self._node.get_server(sirius_cyber_corp.PerformLinearLeastSquaresFit_1, "least_squares")
            srv_least_sq.serve_in_background(self._serve_linear_least_squares)
        except pycyphal.application.register.MissingRegisterError:
            logging.info("The least squares service is disabled by configuration")

        # Create another RPC-server using a standard service type for which a fixed service-ID is defined.
        # We don't specify the port name so the service-ID defaults to the fixed port-ID.
        # We could, of course, use it with a different service-ID as well, if needed.
        self._node.get_server(uavcan.node.ExecuteCommand_1).serve_in_background(self._serve_execute_command)

        self._node.start()  # Don't forget to start the node!

    @staticmethod
    async def _serve_linear_least_squares(
        request: sirius_cyber_corp.PerformLinearLeastSquaresFit_1.Request,
        metadata: pycyphal.presentation.ServiceRequestMetadata,
    ) -> sirius_cyber_corp.PerformLinearLeastSquaresFit_1.Response:
        logging.info("Least squares request %s from node %d", request, metadata.client_node_id)
        sum_x = sum(map(lambda p: p.x, request.points))  # type: ignore
        sum_y = sum(map(lambda p: p.y, request.points))  # type: ignore
        a = sum_x * sum_y - len(request.points) * sum(map(lambda p: p.x * p.y, request.points))  # type: ignore
        b = sum_x * sum_x - len(request.points) * sum(map(lambda p: p.x**2, request.points))  # type: ignore
        try:
            slope = a / b
            y_intercept = (sum_y - slope * sum_x) / len(request.points)
        except ZeroDivisionError:
            slope = float("nan")
            y_intercept = float("nan")
        return sirius_cyber_corp.PerformLinearLeastSquaresFit_1.Response(slope=slope, y_intercept=y_intercept)

    @staticmethod
    async def _serve_execute_command(
        request: uavcan.node.ExecuteCommand_1.Request,
        metadata: pycyphal.presentation.ServiceRequestMetadata,
    ) -> uavcan.node.ExecuteCommand_1.Response:
        logging.info("Execute command request %s from node %d", request, metadata.client_node_id)
        if request.command == uavcan.node.ExecuteCommand_1.Request.COMMAND_FACTORY_RESET:
            try:
                os.unlink(DemoApp.REGISTER_FILE)  # Reset to defaults by removing the register file.
            except OSError:  # Do nothing if already removed.
                pass
            return uavcan.node.ExecuteCommand_1.Response(uavcan.node.ExecuteCommand_1.Response.STATUS_SUCCESS)
        return uavcan.node.ExecuteCommand_1.Response(uavcan.node.ExecuteCommand_1.Response.STATUS_BAD_COMMAND)

    async def run(self) -> None:
        """
        The main method that runs the business logic. It is also possible to use the library in an IoC-style
        by using receive_in_background() for all subscriptions if desired.
        """
        temperature_setpoint = 0.0
        temperature_error = 0.0

        def on_setpoint(msg: uavcan.si.unit.temperature.Scalar_1, _: pycyphal.transport.TransferFrom) -> None:
            nonlocal temperature_setpoint
            temperature_setpoint = msg.kelvin

        self._sub_t_sp.receive_in_background(on_setpoint)  # IoC-style handler.

        # Expose internal states to external observers for diagnostic purposes. Here, we define read-only registers.
        # Since they are computed at every invocation, they are never stored in the register file.
        self._node.registry["thermostat.error"] = lambda: temperature_error
        self._node.registry["thermostat.setpoint"] = lambda: temperature_setpoint

        # Read application settings from the registry. The defaults will be used only if a new register file is created.
        gain_p, gain_i, gain_d = self._node.registry.setdefault("thermostat.pid.gains", [0.12, 0.18, 0.01]).floats

        logging.info("Application started with PID gains: %.3f %.3f %.3f", gain_p, gain_i, gain_d)
        print("Running. Press Ctrl+C to stop.", file=sys.stderr)

        # This loop will exit automatically when the node is close()d. It is also possible to use receive() instead.
        async for m, _metadata in self._sub_t_pv:
            assert isinstance(m, uavcan.si.sample.temperature.Scalar_1)
            temperature_error = temperature_setpoint - m.kelvin
            voltage_output = temperature_error * gain_p  # Suppose this is a basic P-controller.
            await self._pub_v_cmd.publish(uavcan.si.unit.voltage.Scalar_1(voltage_output))

    def close(self) -> None:
        """
        This will close all the underlying resources down to the transport interface and all publishers/servers/etc.
        All pending tasks such as serve_in_background()/receive_in_background() will notice this and exit automatically.
        """
        self._node.close()


async def main() -> None:
    logging.root.setLevel(logging.INFO)
    app = DemoApp()
    try:
        await app.run()
    except KeyboardInterrupt:
        pass
    finally:
        app.close()


if __name__ == "__main__":
    asyncio.run(main())

The following graph should give a rough visual overview of how the applications within the demo_app node are structured:

Node 42, org.opencyphal.pycyphal.demo.demo_app:
    heartbeat_publisher        publishes   uavcan.node.Heartbeat.1.0 on its fixed subject-ID
    temperature_setpoint       subscribes  2345:uavcan.si.unit.temperature.Scalar
    temperature_measurement    subscribes  2346:uavcan.si.sample.temperature.Scalar
    heater_voltage             publishes   2347:uavcan.si.unit.voltage.Scalar
    least_squares              serves      123:sirius_cyber_corp.PerformLinearLeastSquaresFit (request in, response out)

Legend: each port is a message publisher, a message subscriber, or a service, and is written as "subject/service id:type".

If you just run the script as-is, you will notice that it fails with an error referring to some missing registers.

As explained in the comments (and — in great detail — in the Cyphal Specification), registers are basically named values that keep various configuration parameters of the local Cyphal node (application). Some of these parameters are used by the business logic of the application (e.g., PID gains); others are used by the Cyphal stack (e.g., port-IDs, node-ID, transport configuration, logging, and so on). Registers of the latter category are all named with the same prefix uavcan., and their names and semantics are regulated by the Specification to ensure consistency across the ecosystem.

So the application fails with an error that says that it doesn’t know how to reach the Cyphal network it is supposed to be part of because there are no registers to read that information from. We can resolve this by passing the correct register values via environment variables:

export UAVCAN__NODE__ID=42                           # Set the local node-ID 42 (anonymous by default)
export UAVCAN__UDP__IFACE=127.0.0.1                  # Use Cyphal/UDP transport via localhost
export UAVCAN__SUB__TEMPERATURE_SETPOINT__ID=2345    # Subject "temperature_setpoint"    on ID 2345
export UAVCAN__SUB__TEMPERATURE_MEASUREMENT__ID=2346 # Subject "temperature_measurement" on ID 2346
export UAVCAN__PUB__HEATER_VOLTAGE__ID=2347          # Subject "heater_voltage"          on ID 2347
export UAVCAN__SRV__LEAST_SQUARES__ID=123            # Service "least_squares"           on ID 123
export UAVCAN__DIAGNOSTIC__SEVERITY=2                # This is optional to enable logging via Cyphal

python demo_app.py                                   # Run the application!

The snippet is valid for sh/bash/zsh; if you are using PowerShell on Windows, replace export with $env: and enclose the values in double quotes. Further snippets will not repeat this remark.

An environment variable UAVCAN__SUB__TEMPERATURE_SETPOINT__ID sets register uavcan.sub.temperature_setpoint.id, and so on.
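The naming rule can be written down in a couple of lines. This is only a sketch of the convention as it appears in the examples on this page (lower-case the name and turn each double underscore into a dot); the function name env_to_register is invented for the illustration and is not a PyCyphal API.

```python
def env_to_register(env_name: str) -> str:
    """Map an environment variable name to the register name it configures (illustrative helper)."""
    return env_name.lower().replace("__", ".")


# The variables exported in the snippet above and the registers they set:
for name in (
    "UAVCAN__NODE__ID",
    "UAVCAN__UDP__IFACE",
    "UAVCAN__SUB__TEMPERATURE_SETPOINT__ID",
    "UAVCAN__PUB__HEATER_VOLTAGE__ID",
    "UAVCAN__SRV__LEAST_SQUARES__ID",
):
    print(f"{name:40} -> {env_to_register(name)}")
```

Single underscores are left alone, which is why temperature_setpoint survives as one name component.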

Tip

Specifying the environment variables manually is inconvenient. A better option is to store a frequently used configuration in a shell file and source it into your active shell session when needed, for example source my_env.sh (this is similar to activating a Python virtualenv). See the Yakut user manual for practical examples.

In PyCyphal, registers are normally stored in the register file, which in our case is demo_app.db (the Cyphal Specification does not regulate how registers are stored; this is an implementation detail). Once you have started the application with a specific configuration, it stores the values in the register file, so the next time you can run it without passing any environment variables at all.
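The comments in demo_app.py mention that the register file can be inspected with an SQLite browser. If you prefer to peek at it from Python, the following sketch opens the file read-only and lists its tables. It deliberately makes no assumption about the internal schema, which is an implementation detail of PyCyphal; list_tables is a name made up for this example.

```python
import sqlite3
from pathlib import Path
from typing import List


def list_tables(register_file: str) -> List[str]:
    """Return the table names found in an SQLite file, opening it read-only so nothing is modified."""
    uri = Path(register_file).resolve().as_uri() + "?mode=ro"
    connection = sqlite3.connect(uri, uri=True)
    try:
        rows = connection.execute(
            "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name"
        ).fetchall()
    finally:
        connection.close()
    return [name for (name,) in rows]


if Path("demo_app.db").exists():  # Only meaningful after the demo has been started at least once.
    print(list_tables("demo_app.db"))
```

Opening the file in read-only mode avoids creating an empty database by accident if the path is mistyped; in that case sqlite3 raises an OperationalError instead.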

The registers of any Cyphal node are exposed to other network participants via the standard RPC-services defined in the standard DSDL namespace uavcan.register. This means that other nodes on the network can reconfigure our demo application via Cyphal directly, without the need to resort to any secondary management interfaces. This is equally true for software nodes like our demo application and deeply embedded hardware nodes.

When you execute the commands above, you should see the script running. Leave it running and move on to the next section.

Tip

Just-in-time vs. ahead-of-time DSDL compilation

The script will transpile the required DSDL namespaces just-in-time at launch. While this approach works for some applications, those that are built for redistribution at large (e.g., via PyPI) may benefit from compiling DSDL ahead-of-time (at build time) and including the compilation outputs into the redistributable package. Ahead-of-time DSDL compilation can be trivially implemented in setup.py:

#!/usr/bin/env python
# Distributed under CC0 1.0 Universal (CC0 1.0) Public Domain Dedication.
# type: ignore
"""
A simplified setup.py demo that shows how to distribute compiled DSDL definitions with Python packages.

To use precompiled DSDL files in app, the compilation output directory must be included in path:
    compiled_dsdl_dir = pathlib.Path(__file__).resolve().parent / ".demo_dsdl_compiled"
    sys.path.insert(0, str(compiled_dsdl_dir))
"""

import setuptools
import logging
import distutils.command.build_py
from pathlib import Path

NAME = "demo_app"


# noinspection PyUnresolvedReferences
class BuildPy(distutils.command.build_py.build_py):
    def run(self):
        import pycyphal

        pycyphal.dsdl.compile_all(
            [
                "public_regulated_data_types/uavcan",  # All Cyphal applications need the standard namespace, always.
                "custom_data_types/sirius_cyber_corp",
                # "public_regulated_data_types/reg",  # Many applications also need the non-standard regulated DSDL.
            ],
            output_directory=Path(self.build_lib, NAME, ".demo_dsdl_compiled"),  # Store in the build output archive.
        )
        super().run()


logging.basicConfig(level=logging.INFO, format="%(levelname)-3.3s %(name)s: %(message)s")

setuptools.setup(
    name=NAME,
    py_modules=["demo_app"],
    cmdclass={"build_py": BuildPy},
)

Poking the node using Yakut

The demo is running now so we can interact with it and see how it responds. We could write another script for that using PyCyphal, but in this section we will instead use Yakut — a simple CLI tool for diagnostics and management of Cyphal networks. You will need to open a couple of new terminal sessions now.

If you don’t have Yakut installed on your system yet, install it now by following its documentation.

Yakut also needs to know where the DSDL files are located. This is specified via the same CYPHAL_PATH environment variable (a standard variable that many Cyphal tools rely on):

export CYPHAL_PATH="$HOME/pycyphal-demo/custom_data_types:$HOME/pycyphal-demo/public_regulated_data_types"

The commands shown later need to operate on the same network as the demo. Earlier we configured the demo to use Cyphal/UDP via the localhost interface. So, for Yakut, we can export this configuration to let it run on the same network anonymously:

export UAVCAN__UDP__IFACE=127.0.0.1  # We don't export the node-ID, so it will remain anonymous.

To listen to the demo’s heartbeat and diagnostics, launch the following in a new terminal and leave it running (y is a convenience shortcut for yakut):

export CYPHAL_PATH="$HOME/pycyphal-demo/custom_data_types:$HOME/pycyphal-demo/public_regulated_data_types"
export UAVCAN__UDP__IFACE=127.0.0.1
y sub --with-metadata uavcan.node.heartbeat uavcan.diagnostic.record    # You should see heartbeats

Now let’s see how the simple thermostat node operates. Launch another subscriber to see the published voltage command (it is not going to print anything yet):

export CYPHAL_PATH="$HOME/pycyphal-demo/custom_data_types:$HOME/pycyphal-demo/public_regulated_data_types"
export UAVCAN__UDP__IFACE=127.0.0.1
y sub 2347:uavcan.si.unit.voltage.scalar --redraw       # Prints nothing.

And publish the setpoint along with the measurement (process variable):

export CYPHAL_PATH="$HOME/pycyphal-demo/custom_data_types:$HOME/pycyphal-demo/public_regulated_data_types"
export UAVCAN__UDP__IFACE=127.0.0.1
export UAVCAN__NODE__ID=111         # We need a node-ID to publish messages properly
y pub --count=10 2345:uavcan.si.unit.temperature.scalar   250 \
                 2346:uavcan.si.sample.temperature.scalar 'kelvin: 240'

You should see the voltage subscriber that we just started print something along these lines:

---
2347: {volt: 1.1999999284744263}
# And so on...

Okay, the thermostat is working. If you change the setpoint (via subject-ID 2345) or measurement (via subject-ID 2346), you will see the published command messages (subject-ID 2347) update accordingly.
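The published value follows directly from the control law in demo_app.py, which multiplies the temperature error by the proportional gain. Here is a stand-alone restatement of that arithmetic with the numbers used above (setpoint 250 K, measurement 240 K, default gain 0.12); the function name is invented for this sketch.

```python
def p_controller(setpoint_kelvin: float, measured_kelvin: float, gain_p: float) -> float:
    """Proportional control: the output is the temperature error scaled by the gain."""
    temperature_error = setpoint_kelvin - measured_kelvin
    return temperature_error * gain_p


voltage = p_controller(setpoint_kelvin=250.0, measured_kelvin=240.0, gain_p=0.12)
print(f"{voltage:.3f} V")  # An error of 10 K times a gain of 0.12 gives about 1.2 V.
```

The subscriber prints a value that is very slightly off from 1.2 because the volt field of uavcan.si.unit.voltage.Scalar is a 32-bit float. Note also that a measurement above the setpoint yields a negative command; this sketch, like the demo controller, does not clamp it.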

One important feature of the register interface is that it allows one to monitor internal states of the application, which is critical for debugging. In some way it is similar to performance counters or tracing probes:

y r 42 thermostat.error     # Read register

We will see the current value of the temperature error registered by the thermostat. If you run the last command with -dd (d for detailed), you will see the register metadata:

real64:
  value: [10.0]
_meta_: {mutable: false, persistent: false}

mutable: false says that this register cannot be modified and persistent: false says that it is not committed to any persistent storage (like a register file). Together they mean that the value is computed at runtime dynamically.

We can use the very same interface to query or modify the configuration parameters. For example, we can change the PID gains of the thermostat:

y r 42 thermostat.pid.gains       # read current values
y r 42 thermostat.pid.gains 2 0 0 # write new values

The second command returns [2.0, 0.0, 0.0], meaning that the new value was assigned successfully. Observe that the register server implicitly converts the value to the type specified by the application (our script). The Cyphal Specification does not require this behavior, though, so some simpler nodes (embedded systems in particular) may just reject mis-typed requests.

If you restart the application now, you will see it use the updated PID gains.

Now let’s try the linear regression service:

# The following commands do the same thing but differ in verbosity/explicitness:
y call 42 123:sirius_cyber_corp.PerformLinearLeastSquaresFit 'points: [{x: 10, y: 3}, {x: 20, y: 4}]'
y q 42 least_squares '[[10, 3], [20, 4]]'

The response should look like:

123: {slope: 0.1, y_intercept: 2.0}
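You can cross-check this response without a network. The sketch below repeats the arithmetic of _serve_linear_least_squares from demo_app.py on plain tuples instead of PointXY objects; the function name and the tuple representation are choices made for this illustration only.

```python
import math
from typing import Sequence, Tuple


def linear_least_squares(points: Sequence[Tuple[float, float]]) -> Tuple[float, float]:
    """Return (slope, y_intercept) of the best-fit line, or (NaN, NaN) if no solution exists."""
    n = len(points)
    sum_x = sum(x for x, _ in points)
    sum_y = sum(y for _, y in points)
    a = sum_x * sum_y - n * sum(x * y for x, y in points)
    b = sum_x * sum_x - n * sum(x * x for x, _ in points)
    try:
        slope = a / b
        y_intercept = (sum_y - slope * sum_x) / n
    except ZeroDivisionError:  # Fewer than two distinct x values: the fit is undefined.
        return math.nan, math.nan
    return slope, y_intercept


slope, y_intercept = linear_least_squares([(10, 3), (20, 4)])
print(f"slope={slope:.3f} y_intercept={y_intercept:.3f}")
```

For the two points sent above, the line rises by 1 over a run of 10 and passes through (10, 3), so the slope is 0.1 and the intercept is 2.0, in agreement with the response. An empty list or points that share the same x coordinate produce NaN, as the DSDL comment promises.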

The diagnostic subscriber we started at the beginning (type uavcan.diagnostic.Record) should also print a message.

Second node

To make this tutorial more hands-on, we are going to add another node and make it interoperate with the first one. As the first node implements a basic thermostat, the second one simulates the plant whose temperature is controlled by the thermostat. Put the following into plant.py in the same directory:

#!/usr/bin/env python3
# Distributed under CC0 1.0 Universal (CC0 1.0) Public Domain Dedication.
"""
This application simulates the plant controlled by the thermostat node: it takes a voltage command,
runs a crude thermodynamics simulation, and publishes the temperature (i.e., one subscription, one publication).
"""

import time
import asyncio
import pycyphal  # Importing PyCyphal will automatically install the import hook for DSDL compilation.

# Import DSDLs after pycyphal import hook is installed.
import uavcan.si.unit.voltage
import uavcan.si.sample.temperature
import uavcan.time
from pycyphal.application.heartbeat_publisher import Health
from pycyphal.application import make_node, NodeInfo, register


UPDATE_PERIOD = 0.5

heater_voltage = 0.0
saturation = False


def handle_command(msg: uavcan.si.unit.voltage.Scalar_1, _metadata: pycyphal.transport.TransferFrom) -> None:
    global heater_voltage, saturation
    if msg.volt < 0.0:
        heater_voltage = 0.0
        saturation = True
    elif msg.volt > 50.0:
        heater_voltage = 50.0
        saturation = True
    else:
        heater_voltage = msg.volt
        saturation = False


async def main() -> None:
    with make_node(NodeInfo(name="org.opencyphal.pycyphal.demo.plant"), "plant.db") as node:
        # Expose internal states for diagnostics.
        node.registry["status.saturation"] = lambda: saturation  # The register type will be deduced as "bit[1]".

        # Initialize values from the registry. The temperature is in kelvin because in Cyphal everything follows SI.
        # Here, we specify the type explicitly as "real32[1]". If we pass a native float, it would be "real64[1]".
        temp_environment = float(node.registry.setdefault("model.environment.temperature", register.Real32([292.15])))
        temp_plant = temp_environment

        # Set up the ports.
        pub_meas = node.make_publisher(uavcan.si.sample.temperature.Scalar_1, "temperature")
        pub_meas.priority = pycyphal.transport.Priority.HIGH
        sub_volt = node.make_subscriber(uavcan.si.unit.voltage.Scalar_1, "voltage")
        sub_volt.receive_in_background(handle_command)

        # Run the main loop forever.
        next_update_at = asyncio.get_running_loop().time()
        while True:
            # Publish new measurement and update node health.
            await pub_meas.publish(
                uavcan.si.sample.temperature.Scalar_1(
                    timestamp=uavcan.time.SynchronizedTimestamp_1(microsecond=int(time.time() * 1e6)),
                    kelvin=temp_plant,
                )
            )
            node.heartbeat_publisher.health = Health.ADVISORY if saturation else Health.NOMINAL

            # Sleep until the next iteration.
            next_update_at += UPDATE_PERIOD
            await asyncio.sleep(next_update_at - asyncio.get_running_loop().time())

            # Update the simulation.
            temp_plant += heater_voltage * 0.1 * UPDATE_PERIOD  # Energy input from the heater.
            temp_plant -= (temp_plant - temp_environment) * 0.05 * UPDATE_PERIOD  # Dissipation.


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass

In graph form, the new node looks as follows:

Node 43, org.opencyphal.pycyphal.demo.plant:
    heartbeat_publisher    publishes   uavcan.node.Heartbeat.1.0 on its fixed subject-ID
    voltage                subscribes  2347:uavcan.si.unit.voltage.Scalar
    temperature            publishes   2346:uavcan.si.sample.temperature.Scalar
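To get a feel for how the two nodes behave together before wiring them up, the control loop can be simulated offline. The sketch below combines the proportional law from demo_app.py with the clamping and the two model update lines from plant.py. It assumes one controller update per plant update and no transport delay, and the setpoint of 450 K is an arbitrary value chosen for the illustration; none of this code talks to a Cyphal network.

```python
def simulate(
    setpoint: float,
    gain_p: float,
    temp_environment: float = 300.0,
    update_period: float = 0.5,
    steps: int = 2000,
) -> float:
    """Run the thermostat and plant equations in lockstep and return the final plant temperature in kelvin."""
    temp_plant = temp_environment
    for _ in range(steps):
        # Thermostat: proportional control on the latest measurement.
        voltage_command = (setpoint - temp_plant) * gain_p
        # Plant input stage: clamp the command to 0..50 V, as handle_command() does.
        heater_voltage = min(max(voltage_command, 0.0), 50.0)
        # Plant model: the same two update lines as in plant.py.
        temp_plant += heater_voltage * 0.1 * update_period
        temp_plant -= (temp_plant - temp_environment) * 0.05 * update_period
    return temp_plant


final = simulate(setpoint=450.0, gain_p=0.1)
print(f"final temperature: {final:.1f} K")
```

With these numbers the plant settles at roughly 324.5 K, well below the 450 K setpoint. This is the expected steady-state offset of a purely proportional controller: the heater only produces power while an error remains, so the error never reaches zero. A setpoint below the environment temperature leaves the heater off, because the plant clamps negative commands to zero.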

You may launch it if you want, but you will notice that tinkering with registers by way of manual configuration gets old fast. The next section introduces a better way.

Orchestration

Attention

Yakut Orchestrator is in the alpha stage. Breaking changes may be introduced between minor versions until Yakut v1.0 is released. Freeze the minor version number to avoid unexpected changes.

Yakut Orchestrator does not support Windows at the moment.

Manual management of environment variables and node processes may work in simple setups, but it doesn’t really scale. Practical cyber-physical systems require a better way of managing Cyphal networks that may simultaneously include software nodes executed on the local or remote computers along with specialized bare-metal nodes running on dedicated hardware.

One solution to this is Yakut Orchestrator — an interpreter of a simple YAML-based domain-specific language that allows one to define process groups and conveniently manage them as a single entity. The language comes with a user-friendly syntax for managing Cyphal registers. Those familiar with ROS may find it somewhat similar to roslaunch.

The following orchestration file (orc-file) launch.orc.yaml launches the two applications (be sure to stop the first script if it is still running!) along with a couple of diagnostic processes that monitor the network. A setpoint publisher that will command the thermostat to drive the plant to the specified temperature is also started.

The orchestrator runs everything concurrently, but join statements are used to enforce sequential execution as needed. The first process to fail (that is, exit with a non-zero code) will bring down the entire composition. Predicate scripts ?= are allowed to fail though — this is used to implement conditional execution.

The syntax allows the developer to define regular environment variables along with register names. The latter are translated into environment variables when starting a process.
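As a mental model of that translation, consider the sketch below. It flattens a nested mapping into dotted register names and then derives the environment variable names by upper-casing and replacing dots with double underscores. This is not Yakut's implementation; the helper names are hypothetical, and the space-separated rendering of array values is an assumption of this sketch.

```python
from typing import Any, Dict, Iterator, Tuple


def flatten(tree: Dict[str, Any], prefix: str = "") -> Iterator[Tuple[str, Any]]:
    """Yield (dotted register name, value) pairs from a nested mapping, skipping "=" imperatives."""
    for key, value in tree.items():
        if "=" in key:  # Keys with "=" define imperatives, not registers.
            continue
        name = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            yield from flatten(value, name)
        else:
            yield name, value


def to_env(register_name: str, value: Any) -> Tuple[str, str]:
    """Derive the environment variable for a register (array elements joined by spaces: an assumption)."""
    if isinstance(value, (list, tuple)):
        text = " ".join(str(item) for item in value)
    else:
        text = str(value)
    return register_name.upper().replace(".", "__"), text


# A fragment shaped like the thermostat scope of an orc-file, written as a Python dict.
scope = {
    "$=": "python demo_app.py",
    "uavcan": {"node.id": 42, "sub.temperature_setpoint.id": 2345},
    "thermostat": {"pid.gains": [0.1, 0, 0]},
}
for register_name, value in flatten(scope):
    print("%s=%r" % to_env(register_name, value))
```

Nested keys and dotted keys are interchangeable in this model: "uavcan" containing "node.id" names the same register as a single key "uavcan.node.id".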

#!/usr/bin/env -S yakut --verbose orchestrate
# Read the docs about the orc-file syntax: yakut orchestrate --help

# Shared environment variables for all nodes/processes (can be overridden or selectively removed in local scopes).
CYPHAL_PATH: "./public_regulated_data_types;./custom_data_types"
PYCYPHAL_PATH: ".pycyphal_generated"  # This one is optional; the default is "~/.pycyphal".

# Shared registers for all nodes/processes (can be overridden or selectively removed in local scopes).
# See the docs for pycyphal.application.make_node() to see which registers can be used here.
uavcan:
  # Use Cyphal/UDP via localhost:
  udp.iface: 127.0.0.1
  # If you have Ncat or some other TCP broker, you can use Cyphal/serial tunneled over TCP (in a heterogeneous
  # redundant configuration with UDP or standalone). Ncat launch example: ncat --broker --listen --source-port 50905
  serial.iface: "" # socket://127.0.0.1:50905
  # It is recommended to explicitly assign unused transports to ensure that previously stored transport
  # configurations are not accidentally reused:
  can.iface: ""
  # Configure diagnostic publishing, too:
  diagnostic:
    severity: 2
    timestamp: true

# Keys with "=" define imperatives rather than registers or environment variables.
$=:
- $=:
  # Wait a bit to let the diagnostic subscriber get ready (it is launched below).
  - sleep 5
  - # An empty statement is a join statement -- wait for the previously launched processes to exit before continuing.

  # Launch the demo app that implements the thermostat.
  - $=: python demo_app.py
    uavcan:
      node.id: 42
      sub.temperature_setpoint.id:    2345
      sub.temperature_measurement.id: 2346
      pub.heater_voltage.id:          2347
      srv.least_squares.id:           0xFFFF    # We don't need this service. Disable by setting an invalid port-ID.
    thermostat:
      pid.gains: [0.1, 0, 0]

  # Launch the controlled plant simulator.
  - $=: python plant.py
    uavcan:
      node.id: 43
      sub.voltage.id:     2347
      pub.temperature.id: 2346
    model.environment.temperature: 300.0    # In UAVCAN everything follows SI, so this temperature is in kelvin.

  # Publish the setpoint a few times to show how the thermostat drives the plant to the correct temperature.
  # You can publish a different setpoint by running this command in a separate terminal to see how the system responds:
  #   yakut pub 2345 "kelvin: 200"
  - $=: |
      yakut pub 2345:uavcan.si.unit.temperature.scalar 450 -N3
    uavcan.node.id: 100

# Launch diagnostic subscribers to print messages in the terminal that runs the orchestrator.
- yakut sub --with-metadata uavcan.diagnostic.record 2346:uavcan.si.sample.temperature.scalar

# Exit automatically if STOP_AFTER is defined (frankly, this is just a testing aid, feel free to ignore).
- ?=: test -n "$STOP_AFTER"
  $=: sleep $STOP_AFTER && exit 111

Terminate the first node before continuing, since it is now managed by the orchestration script we just wrote. Ensure that the node script files are named demo_app.py and plant.py; otherwise, the orchestrator won’t find them.
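The orc-file above writes register names both in nested form (uavcan: followed by node.id: 42) and in flat form (uavcan.node.id: 100). The sketch below shows one way to picture how a nested scope could collapse into flat register names and then into environment variables. It is only an approximation of what the orchestrator does, written under three assumptions: nested keys are joined with dots, environment variable names are obtained by upper-casing and replacing dots with double underscores, and list values are passed as space-separated strings. The function names flatten, encode, and to_environment are hypothetical.

```python
from typing import Any, Dict


def flatten(tree: Dict[str, Any], prefix: str = "") -> Dict[str, Any]:
    """Join nested keys with dots: {"uavcan": {"node.id": 42}} -> {"uavcan.node.id": 42}."""
    flat: Dict[str, Any] = {}
    for key, value in tree.items():
        name = f"{prefix}.{key}" if prefix else str(key)
        if isinstance(value, dict):
            flat.update(flatten(value, name))
        else:
            flat[name] = value
    return flat


def encode(value: Any) -> str:
    """Assumed value encoding: list elements separated by spaces, scalars as plain strings."""
    if isinstance(value, (list, tuple)):
        return " ".join(str(item) for item in value)
    return str(value)


def to_environment(tree: Dict[str, Any]) -> Dict[str, str]:
    """Map flattened register names to the assumed environment variable names and values."""
    return {
        name.upper().replace(".", "__"): encode(value)
        for name, value in flatten(tree).items()
    }


# A subset of the registers given to the thermostat process in the orc-file.
thermostat_scope = {
    "uavcan": {"node.id": 42, "sub.temperature_setpoint.id": 2345},
    "thermostat": {"pid.gains": [0.1, 0, 0]},
}

for variable, text in to_environment(thermostat_scope).items():
    print(f"{variable}={text!r}")
```

Running the orchestrator with --verbose shows the variables that are actually passed to each process, which is the authoritative way to check these assumptions.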

The orc-file can be executed as yakut orc launch.orc.yaml, or simply ./launch.orc.yaml (use --verbose to see which environment variables are passed to each launched process). Having started it, you should see roughly the following output appear in the terminal, indicating that the thermostat is driving the plant towards the setpoint:

---
2346:
  _meta_: {ts_system: 1651773332.157150, ts_monotonic: 3368.421244, source_node_id: 43, transfer_id: 0, priority: high, dtype: uavcan.si.sample.temperature.Scalar.1.0}
  timestamp: {microsecond: 1651773332156343}
  kelvin: 300.0
---
8184:
  _meta_: {ts_system: 1651773332.162746, ts_monotonic: 3368.426840, source_node_id: 42, transfer_id: 0, priority: optional, dtype: uavcan.diagnostic.Record.1.1}
  timestamp: {microsecond: 1651773332159267}
  severity: {value: 2}
  text: 'root: Application started with PID gains: 0.100 0.000 0.000'
---
2346:
  _meta_: {ts_system: 1651773332.157150, ts_monotonic: 3368.421244, source_node_id: 43, transfer_id: 1, priority: high, dtype: uavcan.si.sample.temperature.Scalar.1.0}
  timestamp: {microsecond: 1651773332657040}
  kelvin: 300.0
---
2346:
  _meta_: {ts_system: 1651773332.657383, ts_monotonic: 3368.921476, source_node_id: 43, transfer_id: 2, priority: high, dtype: uavcan.si.sample.temperature.Scalar.1.0}
  timestamp: {microsecond: 1651773333157512}
  kelvin: 300.0
---
2346:
  _meta_: {ts_system: 1651773333.158257, ts_monotonic: 3369.422350, source_node_id: 43, transfer_id: 3, priority: high, dtype: uavcan.si.sample.temperature.Scalar.1.0}
  timestamp: {microsecond: 1651773333657428}
  kelvin: 300.73126220703125
---
2346:
  _meta_: {ts_system: 1651773333.657797, ts_monotonic: 3369.921891, source_node_id: 43, transfer_id: 4, priority: high, dtype: uavcan.si.sample.temperature.Scalar.1.0}
  timestamp: {microsecond: 1651773334157381}
  kelvin: 301.4406433105469
---
2346:
  _meta_: {ts_system: 1651773334.158120, ts_monotonic: 3370.422213, source_node_id: 43, transfer_id: 5, priority: high, dtype: uavcan.si.sample.temperature.Scalar.1.0}
  timestamp: {microsecond: 1651773334657390}
  kelvin: 302.1288757324219
# And so on. Notice how the temperature is rising slowly towards the setpoint at 450 K!
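With the gains set to [0.1, 0, 0], only the proportional term of the controller is active, which helps explain the slow, steady approach to the setpoint. The sketch below is a generic textbook PID step, not the code of demo_app.py; the function pid_step and its parameters are hypothetical. The update period of 0.5 s is an assumption based on the spacing of the timestamps in the output above.

```python
from typing import Tuple


def pid_step(
    gains: Tuple[float, float, float],
    setpoint: float,
    measurement: float,
    integral: float = 0.0,
    previous_error: float = 0.0,
    dt: float = 0.5,  # assumed update period in seconds
) -> Tuple[float, float, float]:
    """Compute one PID update; return (output, updated integral, current error)."""
    kp, ki, kd = gains
    error = setpoint - measurement
    integral += error * dt
    derivative = (error - previous_error) / dt
    output = kp * error + ki * integral + kd * derivative
    return output, integral, error


# First update in the demo scenario: setpoint 450 K, measured temperature 300 K.
voltage, _, error = pid_step((0.1, 0.0, 0.0), setpoint=450.0, measurement=300.0)
print(f"error = {error:.1f} K, controller output = {voltage:.3f}")
```

Because the integral and derivative gains are zero, the output is simply 0.1 times the temperature error, so it shrinks as the plant approaches the setpoint.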

You can run yakut monitor to see what is happening on the network. (Don’t forget to set UAVCAN__UDP__IFACE or similar depending on your transport.)

Tip

macOS

Monitoring the network with yakut monitor requires root privileges while preserving your environment variables:

sudo -E yakut monitor

As an exercise, consider this:

  • Run the same composition over CAN by changing the transport configuration registers at the top of the orc-file. The full set of transport-related registers is documented at pycyphal.application.make_transport().

  • Implement saturation management by publishing the saturation flag over a dedicated subject and subscribing to it from the thermostat node.

  • Use Wireshark (capture filter expression: (udp or igmp) and src net 127.9.0.0/16) or candump (like candump -decaxta any) to inspect the network exchange.