Tutorial#
When writing an application you often want to do multiple things at once. In standard Python there are a number of ways of doing this: multiprocessing, multithreading, and asyncio (plus other, more specialized systems, such as MPI). In Micropython there are fewer choices: multiprocessing and multithreading are either not available or are limited, so asyncio is commonly used, particularly when precise timing is not an issue.
Asyncio Basics#
The primary interface of the asyncio
module for both Python and
Micropython is an loop that schedules Task
instances
to run. The Task
instances can in turn choose to
pause their execution and pass control back to the event loop to allow
another Task
to be scheduled to run.
In this way a number of Task
instances can cooperate,
each being run in turn. This is well-suited to code which spends most of its
time waiting for something to happen (“I/O bound”), rather than heavy
computational code (“CPU bound”).
Tasks#
To create a task you need to create an async
function, which should at
one or more points await
another async function. For example, a task
which waits for a second and then prints something would be created as
follows:
import asyncio
async def slow_hello():
await asyncio.sleep(1.0)
print("Hello world, slowly.")
slow_task = asyncio.create_task(slow_hello())
while a task that waits for only 10 milliseconds, befoer printing would be created with:
async def quick_hello():
await asyncio.sleep(0.01)
print("Hello world, quickly.")
quick_task = asyncio.create_task(quick_hello())
At this point the tasks have been created, but they need to be run. This
is done by running asyncio.gather()
with the tasks:
asyncio.run(asyncio.gather(slow_task, quick_task))
which starts the event loop and waits for the tasks to complete (potentially running forever if they don’t ever return).
Async iterators#
Python and Micropython also have the notion of asynchronous iterables and
iterators: these are objects which can be used in a special async for
loop where they can pause between iterations of the loop. Internally
this is done by implementing the __aiter__()
and
__anext__()
“magic methods”:
class SlowIterator:
def __init__(self, n):
self.n = n
self.i = 0
def __aiter__(self):
return self
async def __anext__(self):
i = self.i
if i >= n:
raise StopAsyncIteration()
else:
await asyncio.sleep(1.0)
self.i += 1
return i
which can the be used as follows in an async
function:
async def use_iterator():
async for i in SlowIterator(10):
print(i)
which can in turn be used to create a Task
.
Python has a very nice way to create asynchronous iterators using asynchronous generator functions. The following is approximately equivalent to the previous example:
async def slow_iterator(n):
for i in range(n):
async yield i
However Micropython doesn’t support asynchronous generators as of this writing. This lack is a primary motivation for Ultimo as a library.
Hardware and Asyncio#
Asynchronous code can greatly simplify hardware access on microcontrollers. For example, the Raspberry Pi Pico has an on-board temperature sensor that can be accessed via the analog-digital converter. Many tutorials show you how to read from it using code that looks something like the following:
from machine import ADC
import time
def temperature():
adc = ADC(ADC.CORE_TEMP)
while True:
# poll the temperature every 10 seconds
time.sleep(10.0)
value = adc.read_u16()
t = 27 - (3.3 * value / 0xFFFF - 0.706) / 0.001721
print(t)
if __name__ == '__main__':
temperature()
but because this is synchronous code the microcontroller can’t do anything else while it is sleeping. For example, let’s say we also wanted to print the current time from the real-time clock. We’d need to interleave these inside the for loop:
from machine import ADC, RTC
import time
def temperature_and_time():
adc = ADC(ADC.CORE_TEMP)
rtc = RTC()
temperature_counter = 0
old_time = None
while True:
# poll the time every 0.1 seconds while waiting for time to change
time.sleep(0.1)
current_time = rtc.datetime()
# only print when time changes
if current_time != old_time:
print(current_time)
old_time = current_time
# check to see if want to print temperature as well
temperature_counter += 1
if temperature_counter == 10:
value = adc.read_u16()
t = 27 - (3.3 * value / 0xFFFF - 0.706) / 0.001721
print(t)
temperature_counter = 0
if __name__ == '__main__':
temperature_and_time()
This is not very pretty, and gets even more difficult to handle if you have more things going on.
We can solve this using asynchronous code:
from machine import ADC, RTC
import asyncio
async def temperature():
adc = ADC(ADC.CORE_TEMP)
while True:
# poll the temperature every second
asyncio.sleep(10.0)
value = adc.read_u16()
t = 27 - (3.3 * value / 0xFFFF - 0.706) / 0.001721
print(t)
async def clock():
rtc = RTC()
old_time = None
while True:
# poll the clock every 100 milliseconds
asyncio.sleep(0.1)
current_time = rtc.datetime()
# only print when time changes
if current_time != old_time:
print(current_time)
old_time = current_time
async def main():
temperature_task = asyncio.create_task(temperature())
clock_task = asyncio.create_task(clock())
await asyncio.gather(temperature_task, clock_task)
if __name__ == '__main__':
asyncio.run(main())
This is very nice, but if you put on your software architect hat, you will notice a lot of similarity between these methods: essentially they are looping forever while the generate a flow of values which are then processed.
Hardware Sources#
Asynchronous iterators provide a very nice way of processing a data flow coming from hardware. The primary thing which the Ultimo library provides is a collection of asynchronous iterators that interact with standard microcontroller hardware. In particular, Ultimo has classes for polling analog-digital converters and the real-time clock. Using these we get:
import asyncio
from ultimo_machine.gpio import PollADC
from ultimo_machine.time import PollRTC
async def temperature():
async for value in PollADC(ADC.CORE_TEMP, 10.0):
t = 27 - (3.3 * value / 0xFFFF - 0.706) / 0.001721
print(t)
async def clock():
old_time = None
async for current_time in PollRTC(0.1):
current_time = rtc.datetime()
if current_time != old_time:
print(current_time)
current_time = old_time
async def main():
temperature_task = asyncio.create_task(temperature())
clock_task = asyncio.create_task(clock())
await asyncio.gather(temperature_task, clock_task)
if __name__ == '__main__':
asyncio.run(main())
Ultimo calls these asynchronous iterators _sources_ and they all subclass
from the ASource
abstract base class. There are
additional sources which come from polling pins, from pin or timer interrupts,
and from streams such as standard input, files and sockets.
For hardware which is not currently wrapped, Ultimo provides a
poll
decorator that can be used to wrap a standard
Micropython function and poll it at a set frequency. For example:
from ultimo.poll import poll
@poll
def noise():
return random.uniform(0.0, 1.0)
async def print_noise():
# print a random value every second
async for value in noise(1.0):
print(value)
Pipelines#
If you look at the clock()
function in the previous example, you
will see that some of its complexity comes from the desire to print the
clock value only when the value changes: we want to de-duplicate consecutive
values.
Similarly, when running the code you may notice that the temperature values are somewhat noisy, and it would be nice to be able to smooth the readings over time.
In addition to the hardware sources, Ultimo has a mechanism to build processing pipelines with streams. Ultimo calls these _pipelines_ and provides a collection of commonly useful operations.
In particular, there is the Dedup
pipeline which
handles removing consecutive duplicates, so we can re-write the
clock()
function as:
from ultimo.pipelines import Dedup
from ultimo_machine.time import PollRTC
async def clock():
async for current_time in Dedup(PollRTC(0.1)):
print(current_time)
There is also the EWMA
pipeline which smooths
values using an exponentially-weighted moving average (which has the
advantage of being efficient to compute). With this we can re-write the
temperature()
function as:
async def temperature():
async for value in EWMA(0.2, PollADC(ADC.CORE_TEMP, 10.0)):
t = 27 - (3.3 * value / 0xFFFF - 0.706) / 0.001721
print(t)
Ultimo provides additional pipelines for filtering, debouncing, and simply applying a function to the data flow.
Pipeline Decorators#
For the cases of applying a function or filtering a flow, Ultimo provides function decorators to make creating a custom pipeline easy.
The computation of the temperature from the raw ADC values could be turned
into a custom filter using the pipe()
decorator:
from ultimo.pipeline import pipe
@pipe
def to_celcius(value):
return 27 - (3.3 * value / 0xFFFF - 0.706) / 0.001721
async def temperature():
async for value in to_celcius(EWMA(0.2, PollADC(ADC.CORE_TEMP, 10.0))):
t = 27 - (3.3 * value / 0xFFFF - 0.706) / 0.001721
print(t)
There is an analagous apipe()
decorator for async
functions. There are similar decorators filter()
and afilter()
that turn a function that produces
boolean values into a filter which supresses values which return False
.
Pipe Notation#
The standard functional notation for building pipelines can be confusing when there are many terms involved. Ultimo provides an alternative notation using the bitwise-or operator as a “pipe” symbol in a way that may be familiar to unix command-line users.
For example, the expression:
to_celcius(EWMA(0.2, PollADC(ADC.CORE_TEMP, 10.0)))
can be re-written as:
PollADC(ADC.CORE_TEMP, 10.0) | EWMA(0.2) | to_celcius()
Values move from left-to-right from the source through subsequent pipelines. This notation makes it clear which attributes belong to which parts of the overall pipeline.
In terms of behaviour, the two notations are equivalent, so which is used is a matter of preference.
Hardware Sinks#
Getting values from hardware is only half the story. We would also like to control hardware from our code, whether turning an LED on, or displaying text on a screen.
Let’s continue our example by assuming that we add a potentiometer to the setup and use it to control a LED’s brightness via pulse-width modulation.
Using an Ultimo hardware source, we would add the following code to our application:
from machine import PWM
# Raspberry Pi Pico pin numbers
ADC_PIN = 26
ONBOARD_LED_PIN = 25
async def led_brightness():
pwm = PWM(ONBOARD_LED_PIN, freq=1000, duty_u16=0)
async for value in PollADC(ADC_PIN, 0.1):
pwm.duty_u16(value)
async def main():
temperature_task = asyncio.create_task(temperature())
clock_task = asyncio.create_task(clock())
led_brightness_task = asyncio.create_task(led_brightness())
await asyncio.gather(temperature_task, clock_task, led_brightness)
Note
The above doesn’t work on the Pico W as the onboard LED isn’t accessible to the PWM hardware. Use a different pin wired to an LED and resistor between 50 and 330 ohms.
Again, if we put on our software architect’s hat we will realize that all tasks which set the pluse-width modulation duty cycle of pin will look very much the same:
async def set_pwm(...):
pwm = PWM(...)
async for value in ...:
pwm.duty_u16(value)
Ultimo provides a class which encapsulates this pattern:
PWMSink
. So rather than writing a dedicated async
function, the PWMSink
class can simply be appended
to the pipeline. Additionally it has a convenience method
create_task
:
async def main():
temperature_task = asyncio.create_task(temperature())
clock_task = asyncio.create_task(clock())
led_brightness = PollADC(ADC_PIN, 0.1) | PWMSink(ONBOARD_LED_PIN, 1000)
led_brightness_task = led_brightness.create_task()
await asyncio.gather(temperature_task, clock_task, led_brightness_task)
This sort of standardized pipeline-end is called a sink by Ultimo, and all
sinks subclass the ASink
abstract base class. In
addition to PWMSink
there are standard sinks
for output to GPIO pins, writeable streams (such as files, sockets and
standard output), and text displays.
Where Ultimo doesn’t yet provide a sink, the sink()
decorator allows you to wrap a standard Micropython function which takes an
input value and consumes it. For example, we could print nicely formatted
Celcius temperatures using:
@sink
def print_celcius(value):
print(f"{value:2.1f}°C")
async def main():
temperature = PollADC(ADC.CORE_TEMP, 10.0) | EWMA(0.2) | to_celcius() | print_celcius()
temperature_task = temperature.create_task()
...
Application State#
While you can get a lot done with data flows from sources to sinks, almost all real applications need to hold some state, whether something as simple as the location of a cursor up to the full engineering logic of a complex app. You may want hardware to do things depending on updates to that state. Often it may be enough to just use the current values of state stored as Micropython objects when updating for other reasons. But sometimes you want to react to changes in the current state.
Ultimo has a Value
source which holds a Python
object and emits a flow of values as that held object changes.
For example, an application which is producing audio might hold the output
volume in a Value
and then have one or more
streams which flow from it: perhaps one to set values on the sound system,
another to display a volume bar in on a screen, or another to set the
brightness of an LED:
@pipe
def text_bar(volume):
bar = ("=" * (volume >> 12))
return f"Vol: {bar:<16s}"
async def main():
# volume is an unsigned 16-bit int
volume = Value(0)
led_brightness = volume | PWMSink(ONBOARD_LED_PIN, 1000)
text_device = ...
volume_bar = volume | text_bar() | text_device.display_text(0, 0)
...
It’s also common for a Value
to be set at the end
of a pipeline, and for this the value provides a dedicated
sink()
method, but also can be used at the end of
a pipeline using the pipe syntax. For example, to control the volume with a
potentiometer, you could have code which looks like:
async def main():
# volume is an unsigned 16-bit int
volume = Value(0)
set_volume = ADCPoll(ADC_PIN, 0.1) | volume
led_brightness = volume | PWMSink(ONBOARD_LED_PIN, 1000)
...
In addition to the simple Value
class, there are
additional value subclasses which smooth value changes using easing functions
and another which holds a value for a set period of time before resetting to
a default.
Conclusion#
As you can see Ultimo provides you with the building-blocks for creating
interfaces which allow you to build applications which smoothly work together.
Since it is built on top of the standard Micropython asyncio
it
interoperates with other async code that you might write. If you need to it
is generally straightforward to write your own sources, sinks and pipelines
with a little understanding of Python and Micropython’s asyncio libraries.