Examples#
The following examples show some of the ways Ultimo can be used.
Echo Input to Output#
This example shows how to use a stream to asynchronously read and write. It should work on any device that can connect a serial terminal to micropython standard input and output.
import uasyncio
from ultimo.stream import ARead, AWrite
async def main():
    """Read from standard input and echo to standard output.

    Pipes an ``ARead`` source into an ``AWrite`` sink and awaits the
    resulting pipeline task.
    """
    echo = ARead() | AWrite()
    # gather() returns only when the task completes, so main() stays alive
    # for as long as the pipeline is running.
    await uasyncio.gather(echo.create_task())
if __name__ == "__main__":
    # run forever
    uasyncio.run(main())
Download echo.py
Temperature#
This example shows how to smooth data from a source to produce a clean sequence of values. This was written for the Raspberry Pi Pico’s onboard temperature sensor.
This shows how to use the Poll, EWMA, the pipe decorator, and the stream writer.
import uasyncio
from machine import ADC
from ultimo.pipelines import EWMA, pipe
from ultimo.stream import AWrite
from ultimo_machine.gpio import PollADC
@pipe
def u16_to_celcius(value: int) -> float:
    """Convert raw uint16 values to temperatures.

    Args:
        value: raw ADC reading in the range 0..0xFFFF.

    Returns:
        The temperature in degrees Celsius.
    """
    # Scale the 16-bit reading to a voltage (full scale taken as 3.3 V).
    voltage = 3.3 * value / 0xFFFF
    # Sensor characteristic: 0.706 V at 27 degrees C, dropping by
    # 1.721 mV per degree (values as published for the RP2040 sensor).
    return 27 - (voltage - 0.706) / 0.001721
@pipe
def format(value: float) -> str:
    """Format a temperature for output.

    Returns the value with one decimal place, followed by "°C" and a
    newline.
    """
    # NOTE: this name shadows the builtin format() for the rest of the
    # module; it is kept because main() refers to it by this name.
    return f"{value:.1f}°C\n"
async def main():
    """Poll values from the temperature sensor and print values as they change."""
    # Source: poll the core temperature ADC channel, convert each reading to
    # Celsius and smooth it.
    # NOTE(review): the 1 is presumably the polling interval and 0.05 the
    # EWMA smoothing weight - confirm against the library documentation.
    temperature = PollADC(ADC.CORE_TEMP, 1) | u16_to_celcius() | EWMA(0.05)
    # Sink: format each smoothed value and write it to the output stream.
    write_temperature = temperature | format() | AWrite()
    await uasyncio.gather(write_temperature.create_task())
if __name__ == "__main__":
    # run forever
    uasyncio.run(main())
Download temperature.py
Simple Clock#
This example shows how to poll the real-time clock and how to use a Value as a source for multiple pipelines. Output is to stdout.
This should work with any hardware that supports machine.RTC.
import uasyncio
from machine import RTC
from ultimo.core import connect
from ultimo.pipelines import pipe
from ultimo.value import Value
from ultimo.stream import AWrite
from ultimo_machine.time import PollRTC
# Labels keyed by the datetime-tuple index they describe; get_str() uses the
# same index to look up both the label and the value.
fields = {
    4: "Hour",
    5: "Minute",
    6: "Second",
}
@pipe
def get_str(dt: tuple[int, ...], index: int):
    """Return one field of a datetime tuple as a labelled string.

    Args:
        dt: datetime tuple flowing through the pipeline.
        index: position in ``dt`` to display; must be a key of ``fields``.

    Returns:
        The field's label, a colon, and the value zero-padded to two digits.
    """
    return f"{fields[index]:s}: {dt[index]:02d}"
async def main():
    """Poll values from the real-time clock and print values as they change."""
    rtc = PollRTC()
    # Seed the shared value with one reading so it has a datetime from the start.
    clock = Value(await rtc())
    # A single writer is shared by all three display pipelines.
    output = AWrite()

    # One pipeline updates the value; three more read from it.
    update_clock = rtc | clock
    display_hour = clock | get_str(4) | output
    display_minute = clock | get_str(5) | output
    display_second = clock | get_str(6) | output

    # run forever
    await uasyncio.gather(
        update_clock.create_task(),
        display_hour.create_task(),
        display_minute.create_task(),
        display_second.create_task(),
    )
if __name__ == "__main__":
    # run forever
    uasyncio.run(main())
Download clock.py
Potentiometer-PWM LED#
This example shows how to take a noisy data source and produce a clean sequence of values, as well as using that stream to control a pulse-width modulation output. This was written for the Raspberry Pi Pico, which has a fairly noisy onboard ADC.
This example expects a potentiometer connected to pin 26, and uses the Raspberry Pi Pico on-board LED. Adjust appropriately for other set-ups.
import uasyncio
from machine import ADC, Pin
from ultimo.core import connect
from ultimo.pipelines import Dedup, pipe
from ultimo_machine.gpio import PollADC, PWMSink
@pipe
def denoise(value):
    """Denoise uint16 values to 6 significant bits."""
    # 0xFC00 keeps the top 6 bits of a 16-bit value and clears the low 10,
    # so small fluctuations in the reading collapse to the same output.
    return value & 0xFC00
async def main(potentiometer_pin, led_pin):
    """Poll from a potentiometer, print values and change brightness of LED.

    Args:
        potentiometer_pin: pin passed to ``PollADC`` as the analogue input.
        led_pin: pin passed to ``PWMSink`` as the PWM output.
    """
    # NOTE(review): 0.1 is presumably the polling interval - confirm.
    level = PollADC(potentiometer_pin, 0.1) | denoise() | Dedup()
    # The same de-duplicated source feeds two consumers: print() and the PWM sink.
    print_level = uasyncio.create_task(connect(level, print))
    # NOTE(review): 1000 and 0 are presumably the PWM frequency and the
    # initial duty - confirm against PWMSink's signature.
    led_brightness = level | PWMSink(led_pin, 1000, 0)
    await uasyncio.gather(print_level, led_brightness.create_task())
if __name__ == "__main__":
    # Raspberry Pi Pico pin numbers
    ADC_PIN = 26
    ONBOARD_LED_PIN = 25

    # run forever
    uasyncio.run(main(ADC_PIN, ONBOARD_LED_PIN))
Download potentiometer.py
Motion Sensor Interrupt#
This example shows how to use an IRQ to feed a ThreadSafeSource source, how to use the Hold source, how to connect to a value’s sink, and how to use the sink decorator.
This example expects an HC-SR501 or similar motion sensor connected to pin 22. Adjust appropriately for other set-ups.
import uasyncio
from machine import RTC, Pin
from ultimo.core import sink
from ultimo.value import Hold
from ultimo_machine.gpio import PinInterrupt
@sink
def report(value):
    """Print the received value together with the current RTC datetime."""
    print(value, RTC().datetime())
async def main(pin_id):
    """Wait for a motion sensor to trigger and print output.

    Args:
        pin_id: identifier of the pin the motion sensor is connected to.
    """
    # Everything runs inside the context manager so that the pin interrupt
    # remains set up for as long as the two tasks are running.
    async with PinInterrupt(pin_id, Pin.PULL_DOWN) as motion_pin:
        activity = Hold(False)
        # interrupt -> held value -> report
        update_activity = motion_pin | activity
        report_activity = activity | report()

        update_task = uasyncio.create_task(update_activity.run())
        report_task = uasyncio.create_task(report_activity.run())
        await uasyncio.gather(update_task, report_task)
if __name__ == "__main__":
    # run forever
    uasyncio.run(main(22))
Download motion_interrupt.py
16x2 LCD Clock#
This example shows how to poll the real-time clock and how to use a Value as a source for multiple pipelines, a custom subclass of ATextDevice, and how to write a simple async function that consumes a flow of values.
This example expects I2C to be connected with SDA on pin 4 and SCL on pin 5. Adjust appropriately for other set-ups.
import uasyncio
from machine import I2C, Pin
from ultimo.core import asink
from ultimo.pipelines import Dedup, apipe, pipe
from ultimo.value import Hold, Value
from ultimo_display.text_device import ATextDevice
from ultimo_machine.time import PollRTC
from devices.lcd1602 import LCD1602_RGB
class HD44780TextDevice(ATextDevice):
    """Text device for HD44780-style LCDs.

    Adapts a low-level LCD driver object to the ``ATextDevice`` coroutine
    interface by forwarding each operation to the wrapped ``device``.
    """

    # Display dimensions, copied from the wrapped driver's ``_size``.
    size: tuple[int, int]

    def __init__(self, device):
        """Wrap ``device`` and take the display size from ``device._size``."""
        self.size = device._size
        self.device = device

    async def display_at(self, text: str, position: tuple[int, int]):
        """Write ``text`` to the display's DDRAM starting at ``position``."""
        # need proper lookup table for Unicode -> JIS X 0201 Hitachi variant
        self.device.write_ddram(position, text.encode())

    async def erase(self, length: int, position: tuple[int, int]):
        """Blank ``length`` characters at ``position`` by writing spaces."""
        await self.display_at(" " * length, position)

    async def set_cursor(self, position: tuple[int, int]):
        """Move the cursor to ``position`` and switch it on."""
        # doesn't handle 4-line displays
        self.device.cursor = position
        self.device.cursor_on = True

    async def clear_cursor(self):
        """Switch the cursor off."""
        # NOTE(review): set_cursor() uses ``cursor_on``; confirm that the
        # driver really exposes a separate ``cursor_off`` attribute and that
        # this should not be ``cursor_on = False``.
        self.device.cursor_off = True

    async def clear(self):
        """Switch the cursor off and clear the whole display."""
        self.device.cursor_off = True
        self.device.clear()
@apipe
async def get_formatted(dt: tuple[int, ...], index: int):
    """Return field ``index`` of datetime tuple ``dt`` as a two-digit string."""
    return f"{dt[index]:02d}"
async def blink_colons(
    clock: Value, text_device: ATextDevice, positions: list[tuple[int, int]]
):
    """Blink a colon at each position every time the clock produces a value.

    Args:
        clock: value whose updates trigger a blink; the values themselves
            are not used.
        text_device: device to draw on.
        positions: display positions at which to show the colons.
    """
    async for _ in clock:
        # show the colons ...
        for position in positions:
            await text_device.display_at(":", position)
        # ... leave them visible for 0.8 s ...
        await uasyncio.sleep(0.8)
        # ... then blank them again until the next clock update
        for position in positions:
            await text_device.erase(1, position)
async def main(i2c):
    """Poll the real-time clock and show the time on the LCD as it changes.

    Args:
        i2c: I2C bus object the LCD module is attached to.
    """
    # Bring up the LCD module: initialise, white backlight, display on.
    rgb1602 = LCD1602_RGB(i2c)
    await rgb1602.ainit()
    rgb1602.led_white()
    rgb1602.lcd.display_on = True

    text_device = HD44780TextDevice(rgb1602.lcd)

    rtc = PollRTC()
    # Seed the shared value with one reading so it has a datetime from the start.
    clock = Value(await rtc())
    update_clock = rtc | clock
    # Dedup() passes a field on only when its formatted text changes.
    display_hours = clock | get_formatted(4) | Dedup() | text_device.display_text(0, 0)
    display_minutes = (
        clock | get_formatted(5) | Dedup() | text_device.display_text(0, 3)
    )
    display_seconds = (
        clock | get_formatted(6) | Dedup() | text_device.display_text(0, 6)
    )
    blink_display = blink_colons(clock, text_device, [(2, 0), (5, 0)])

    # run forever
    await uasyncio.gather(
        update_clock.create_task(),
        display_hours.create_task(),
        display_minutes.create_task(),
        display_seconds.create_task(),
        uasyncio.create_task(blink_display),
    )
if __name__ == "__main__":
    SDA = Pin(4)
    SCL = Pin(5)

    i2c = I2C(0, sda=SDA, scl=SCL, freq=400000)

    # run forever
    uasyncio.run(main(i2c))
Download lcd_clock.py
Download devices/lcd1602.py
Download devices/aip31068l.py
Download devices/pca9633.py
Download devices/hd44780.py
16x2 LCD Python Eval#
This example shows how to handle text input from a serial port and display it on a 16x2 LCD panel, and implements a simple Python eval-based calculator. This uses an async function to handle the state of editing a line, evaluating the expression on return, and displaying the result.
For best results, use a terminal emulator or mpremote, rather than Thonny or other line-based terminals.
This example expects I2C to be connected with SDA on pin 4 and SCL on pin 5. Adjust appropriately for other set-ups.
import uasyncio
from machine import I2C, Pin
from ultimo.core import asink
from ultimo.pipelines import Dedup, apipe, pipe
from ultimo.stream import ARead
from ultimo.value import Hold, Value
from ultimo_display.text_device import ATextDevice
from ultimo_machine.time import PollRTC
from devices.hd44780_text_device import HD44780TextDevice
from devices.lcd1602 import LCD1602_RGB
async def main(i2c):
    """Read characters from standard input and edit/evaluate them on the LCD.

    Args:
        i2c: I2C bus object the LCD module is attached to.
    """
    # Bring up the LCD module: initialise, white backlight, display on,
    # blinking enabled.
    rgb1602 = LCD1602_RGB(i2c)
    await rgb1602.ainit()
    rgb1602.led_white()
    rgb1602.lcd.display_on = True
    rgb1602.lcd.blink_on = True

    text_device = HD44780TextDevice(rgb1602.lcd)
    # named ``stream`` rather than ``input`` to avoid shadowing the builtin
    stream = ARead()

    # run forever
    await uasyncio.gather(uasyncio.create_task(display_lines(stream, text_device)))
async def display_line(display, text, cursor, line=1, width=16):
    """Display a single line, scrolled so that the cursor stays visible.

    Args:
        display: text device providing ``display_at`` and ``set_cursor``
            coroutines.
        text: full text of the line; at most ``width`` characters of it
            are shown.
        cursor: index of the cursor within ``text``; ``len(text)`` means
            the cursor sits just after the last character.
        line: display row to write to.
        width: number of visible columns.
    """
    # The cursor needs a cell of its own when it sits after the last
    # character, so the window may start no later than len(text) + 1 - width.
    # Without that spare cell the cursor would land on column ``width``,
    # which is off the visible area.
    last_start = max(0, len(text) + 1 - width)
    # Keep the cursor near the middle of the window, clamped to the text.
    start = min(max(0, cursor - width // 2), last_start)
    visible = text[start : start + width]
    # Pad to the full width so leftovers from a longer line are overwritten.
    padded = visible + " " * (width - len(visible))
    await display.display_at(padded, (0, line))
    await display.set_cursor((cursor - start, line))
async def handle_escape(input):
    """Very simplistic handler to catch ANSI cursor commands.

    Consumes characters from ``input`` until two have been collected.

    Args:
        input: asynchronous iterable of characters, positioned just after
            the escape character.

    Returns:
        The two collected characters as a string (e.g. "[D"), or None if
        the stream ends before two characters arrive.
    """
    escape = ""
    async for char in input:
        escape += char
        if len(escape) == 2:
            return escape
    # stream ended before a complete two-character sequence was read
    return None
async def display_lines(input, display):
    """Display result line and editing line in a display.

    Row 0 shows the result of the last evaluated line (initially
    "Python:") and row 1 shows the line being edited.  Characters are read
    from ``input`` one at a time:

    * newline evaluates the current line and shows the result, or the
      exception text, on row 0;
    * ``ESC [ D`` / ``ESC [ C`` move the cursor left / right;
    * ``ESC [ 3 ~`` deletes the character under the cursor;
    * DEL (0x7F) or BS (0x08) deletes the character before the cursor;
    * tab inserts four spaces at the cursor;
    * anything else is inserted at the cursor.

    Args:
        input: asynchronous iterable yielding the typed characters.
        display: text device passed on to ``display_line``.
    """
    last_line = "Python:"
    current_line = ""
    cursor = 0
    # Holds a prefix such as "[3" while its closing "~" is still expected,
    # so that a "~" typed on its own is treated as an ordinary character.
    pending_csi = None

    await display_line(display, last_line, 0, 0)
    await display_line(display, current_line, cursor, 1)

    async for char in input:
        csi, pending_csi = pending_csi, None
        if csi is not None and char == "~":
            # terminator of an "ESC [ <digit> ~" key; only "[3" (forward
            # delete) is acted on, other such keys are ignored
            if csi == "[3" and cursor < len(current_line):
                current_line = current_line[:cursor] + current_line[cursor + 1 :]
        elif char == "\n":
            try:
                # SECURITY: eval() runs whatever arrives on the input
                # stream.  That is the purpose of this calculator example,
                # but do not reuse it with input you do not trust.
                last_line = str(eval(current_line))
            except Exception as exc:
                # deliberately broad: any failure is shown to the user
                last_line = str(exc)
            current_line = ""
            cursor = 0
            await display_line(display, last_line, 0, 0)
        elif char == "\x1b":
            # escape sequence
            escape = await handle_escape(input)
            if escape == "[D":
                # cursor back
                if cursor > 0:
                    cursor -= 1
            elif escape == "[C":
                # cursor forward
                if cursor < len(current_line):
                    cursor += 1
            elif escape and escape[0] == "[" and escape[1:].isdigit():
                # a closing "~" follows; remember which key it belongs to
                pending_csi = escape
        elif char in ("\x7f", "\x08"):
            # backspace (terminals send either DEL or BS)
            if cursor > 0:
                current_line = current_line[: cursor - 1] + current_line[cursor:]
                cursor -= 1
        elif char == "\t":
            # tab (0x09): insert four spaces at the cursor
            current_line = current_line[:cursor] + " " * 4 + current_line[cursor:]
            cursor += 4
        else:
            current_line = current_line[:cursor] + char + current_line[cursor:]
            cursor += len(char)
        await display_line(display, current_line, cursor, 1)
if __name__ == "__main__":
    SDA = Pin(4)
    SCL = Pin(5)

    i2c = I2C(0, sda=SDA, scl=SCL, freq=400000)

    # run forever
    uasyncio.run(main(i2c))
Download lcd_input.py
Download devices/hd44780_text_device.py
Download devices/lcd1602.py
Download devices/aip31068l.py
Download devices/pca9633.py
Download devices/hd44780.py