Core Classes#
The core classes are the building-blocks for Ultimo. They defined the basic behaviour of the asynchronous iterators that define the application data flow.
ASource Classes#
Ultimo provides a slightly richer API for its iteratable objects. In addition
to being used in async for ...
similar iterator contexts, the iteratable
objects can be asynchronously called to generate an immediate value (eg. by
querying the underlying hardware, or returning a cached value). This protocol
is embodied in the ASource
base class.
The corresponding iterator objects are subclasses of AFlow
and
by default each ASource
has a particular AFlow
subclass
that it creates when iterating.
Ultimo has a number of builtin ASource
subclasses that handle common
cases.
Poll#
The simplest way to create a source is by polling a callable at an interval.
The Poll
class does this. For example:
from machine import RTC
rtc = RTC()
clock = Poll(rtc.datetime, interval=1)
or:
from machine import ADC, Pin
_adc = ADC(Pin(19))
adc = Poll(_adc.read_16, interval=0.01)
The Poll
object expects the callable to be called with no arguments, so more
complex function signatures may need to be wrapped in a helper function,
partial or callable class, as appropriate. The callable can be a regular
synchronous function or asynchronous coroutine, which should ideally avoid
setting any shared state (or carefully use locks if it must).
Poll
objects can also be called, which simply invokes to the underlying
callable.
EventSource#
The alternative to polling is to wait for an asynchronous event. The
EventSource
asbtract class provides the basic infrastructure for
users to subclass by providing an appropriate __call__()
method.
The ThreadSafeSource
class is a subclass that uses a
asyncio.ThreadSafeFlag
instead of an asyncio.Event
.
The event that the iterator waits for the event to be set by regular Python
code calling the fire()
method, or an
interrupt handler (carefully!) setting a asyncio.ThreadSafeFlag
.
For example, the following class provides a class for handling IRQs from a
Pin
:
class Interrupt(ThreadSafeSource):
def __init__(self, pin, trigger=Pin.IRQ_RISING | Pin.IRQ_FALLING):
super().__init__()
self.pin = pin
self.trigger = trigger
async def __aenter__(self):
set_flag = self.event.set
def isr(_):
set_flag()
self.pin.irq(isr, self.trigger)
return self
async def __aexit__(self, *args, **kwargs):
await self.close()
return False
async def __call__(self):
return self.pin()
async def close(self):
self.pin.irq()
As with all interrupt-based code in Micropython, care needs to be taken in
the interrupt handler and the iterator method so that the code is fast,
robust and reentrant. Also note that although interrupt handlers may be
fast, any EventFlow
instances watching the event will be dispatched by
the
Streams#
The ARead
source wraps a readable IO stream (sys.stdin
by default)
and when iterated over returns each byte (or char from a text file) from the
stream until the stream is closed. To help with clean-up, ARead
is also a
async context manager that will close the stream on exit.
Values#
A Value
is an EventSource
which holds a value as state
and fires an event every time the value is updated (either by calling the
instance with the new value, or calling the update()
method).
Iterating over a Value
asynchronously generates the values as they
are changed.
An EasedValue
is a value which when set is transitioned into its new
value over time by an easing formula. The intermediate values will be emitted
by the iterator.
ASink Classes#
Note
The ASink
class is very thin, and most behaviour can be achieved by an
async function with an async for loop. Currently the AWrite
class is
the most compelling reason for this to exist. It’s possible that this
concept may be removed in future versions.
A common pattern for the eventual result of a chain of iterators is a simple async for loop which looks something like:
try:
async for value in source:
await process(value)
except uasyncio.CancelError:
pass
This is common enough that Ultimo provides an ASink
base class which wraps
a source and has a run()
method that asynchronously consumes the source,
calling its process()
method on each generated value, until the source is
exhausted or the task cancelled.
While sinks can be generated with a source provided as an argument, they also
support a pipe-style syntax with the |
operator, so rather than writing:
source = MySource()
sink = MySink(source=source)
uasyncio.create_task(sink.run())
they can be connected using:
sink = MySource() | MySink()
uasyncio.create_task(sink.run())
Consumers#
In many cases, a sink just needs to asyncronously call a function. The
Consumer
class provides a simple ASink
which wraps an asynchronous
callable which is called with the source values as the first argument
by the process()
method:
async def display(value, lcd):
lcd.write_data(value)
lcd = ...
display_sink = Consumer(display, lcd)
There are also helper decorators sink()
and asink()
that wrap functions
and produce factories that produce consumers for the functions:
@sink
def display(value, lcd):
lcd.write_data(value)
lcd = ...
display_sink = display(lcd)
Streams#
The AWrite
source wraps a writable IO stream (sys.stdout
by
default) and consumes a stream of bytes
objects (or
str
for a text stream) which it writes to the stream until the
stream is closed. To help with clean-up, AWrite
is also a async context
manager that will close the stream on exit.
Pipelines#
Note
The APipeline
class exists primarily because Micropython doesn’t
currently support asynchronous generator functions.
Often you want to do some further processing on the raw output from a device.
For example, you may want to convert the data into a more useful format,
smooth a noisy signal, debounce a button press, or de-duplicate a repetitive
iterator. Ultimo provides the APipeline
base class for sources
which transform another source.
These include:
|
Pipeline that applies a callable to each value. |
|
Pipeline that stabilizes polled values emitted for a short time. |
|
Pipeline that ignores repeated values. |
|
Pipeline that smoothes values with an exponentially weighted moving average. |
|
Pipeline that filters values. |
For example, a raw ADC output could be converted to a voltage as follows:
async def voltage(raw_value):
return 3.3 * raw_value / 0xffff
and then this used to wrap the output of an ADC iterator:
adc_volts = Apply(adc, voltage)
More succinctly, the pipe()
decorator can be used
as follows:
@pipe
def voltage(raw_value, max_value=3.3):
return max_value * raw_value / 0xffff
adc_volts = voltage(adc)
There is a similar apipe()
method that accepts an
asynchronous function.
APipeline
subclasses both ASource
and ASink
, so it is both an iteratable and a
has a run()
method that can be used in a task. Just like other ASink
subclasses,
APipeline
classes can use the | operator to compose longer data flows:
display_voltage = ADCSource(26) | voltage() | EWMA(0.2) | display(lcd)