Examples#

The following examples show some of the ways Ultimo can be used.

Echo Input to Output#

This example shows how to use a stream to asynchronously read and write. It should work on any device that can connect a serial terminal to micropython standard input and output.

import uasyncio

from ultimo.stream import ARead, AWrite


async def main():
    """Read from standard input and echo to standard output."""

    echo = ARead() | AWrite()
    await uasyncio.gather(echo.create_task())


if __name__ == "__main__":
    # run forever
    uasyncio.run(main())

Download echo.py

Temperature#

This example shows how to smooth data from a source to produce a clean sequence of values. This was written for the Raspberry Pi Pico’s onboard temperature sensor.

This shows how to use the Poll, EWMA, the pipe decorator, and the stream writer.

import uasyncio
from machine import ADC

from ultimo.pipelines import EWMA, pipe
from ultimo.stream import AWrite
from ultimo_machine.gpio import PollADC


@pipe
def u16_to_celcius(value: int) -> float:
    """Convert raw uint16 values to temperatures."""
    return 27 - (3.3 * value / 0xFFFF - 0.706) / 0.001721


@pipe
def format(value: float) -> str:
    """Format a temperature for output."""
    return f"{value:.1f}°C\n"


async def main():
    """Poll values from the temperature sensor and print values as they change."""
    temperature = PollADC(ADC.CORE_TEMP, 1) | u16_to_celcius() | EWMA(0.05)
    write_temperature = temperature | format() | AWrite()
    await uasyncio.gather(write_temperature.create_task())


if __name__ == "__main__":
    # run forever
    uasyncio.run(main())

Download temperature.py

Polled Button#

This example shows how to debounce a noisy digital I/O.

This example expects a button connected pin 19. Adjust appropritely for other set-ups.

import uasyncio
from machine import Pin

from ultimo.core import connect
from ultimo.pipelines import Debounce, Dedup
from ultimo_machine.gpio import PollSignal


async def main(pin_id):
    """Poll values from a button and send an event when the button is pressed."""
    pin_source = PollSignal(pin_id, Pin.PULL_UP, interval=0.1)
    level = pin_source | Debounce() | Dedup()
    task = uasyncio.create_task(connect(level, print))
    await uasyncio.gather(task)


if __name__ == "__main__":
    # run forever
    uasyncio.run(main(19))

Download polled_button.py

Simple Clock#

This example shows how to poll the real-time clock and how to use a Value as a source for multiple pipelines. Output is to stdout.

This should work with any hardware that supports machine.RTC.

import uasyncio
from machine import RTC

from ultimo.core import connect
from ultimo.pipelines import pipe
from ultimo.value import Value
from ultimo.stream import AWrite
from ultimo_machine.time import PollRTC

fields = {
    4: "Hour",
    5: "Minute",
    6: "Second",
}

@pipe
def get_str(dt: tuple[int, ...], index: int):
    return f"{fields[index]:s}: {dt[index]:02d}"


async def main():
    """Poll values from the real-time clock and print values as they change."""
    rtc = PollRTC()
    clock = Value(await rtc())
    output = AWrite()

    update_clock = rtc | clock

    display_hour = clock | get_str(4) | output
    display_minute = clock | get_str(5) | output
    display_second = clock | get_str(6) | output

    # run forever
    await uasyncio.gather(
        update_clock.create_task(),
        display_hour.create_task(),
        display_minute.create_task(),
        display_second.create_task(),
    )


if __name__ == "__main__":
    # run forever
    uasyncio.run(main())

Download clock.py

Potentiometer-PWM LED#

This example shows how to take a noisy data source and produce a clean sequence of values, as well as using that stream to control a pulse-width modulation output. This was written for the Raspberry Pi Pico, which has a fairly noisy onboard ADC.

This example expects a potentiometer connected pin 26, and uses the Raspberry Pi Pico on-board LED. Adjust appropritely for other set-ups.

import uasyncio
from machine import ADC, Pin

from ultimo.core import connect
from ultimo.pipelines import Dedup, pipe
from ultimo_machine.gpio import PollADC, PWMSink


@pipe
def denoise(value):
    """Denoise uint16 values to 6 significant bits."""
    return value & 0xFC00


async def main(potentiometer_pin, led_pin):
    """Poll from a potentiometer, print values and change brightness of LED."""
    level = PollADC(potentiometer_pin, 0.1) | denoise() | Dedup()
    print_level = uasyncio.create_task(connect(level, print))
    led_brightness = level | PWMSink(led_pin, 1000, 0)
    await uasyncio.gather(print_level, led_brightness.create_task())


if __name__ == "__main__":
    # Raspberry Pi Pico pin numbers
    ADC_PIN = 26
    ONBOARD_LED_PIN = 25

    # run forever
    uasyncio.run(main(ADC_PIN, ONBOARD_LED_PIN))

Download potentiometer.py

Motion Sensor Interrupt#

This example shows how to use an IRQ to feed a ThreadSafeSource source, the Hold source, how to connect to a value’s sink, and using the consumer decorator.

This example expects an HC-SR501 or similar motion sensor connected with to pin 22. Adjust appropritely for other set-ups.

import uasyncio
from machine import RTC, Pin

from ultimo.core import sink
from ultimo.value import Hold
from ultimo_machine.gpio import PinInterrupt


@sink
def report(value):
    print(value, RTC().datetime())


async def main(pin_id):
    """Wait for a motion sensor to trigger and print output."""
    async with PinInterrupt(pin_id, Pin.PULL_DOWN) as motion_pin:
        activity = Hold(False)
        update_activity = motion_pin | activity
        report_activity = activity | report()

        update_task = uasyncio.create_task(update_activity.run())
        report_task = uasyncio.create_task(report_activity.run())
        await uasyncio.gather(update_task, report_task)


if __name__ == "__main__":
    # run forever
    uasyncio.run(main(22))

Download motion_interrupt.py

16x2 LCD Clock#

This example shows how to poll the real-time clock and how to use a Value as a source for multiple pipelines, a custom subclass of ATextDevice, and how to write a simple async function that consumes a flow of values.

This example expects I2C to be connected with SDA on pin 4 and SCL on pin 5. Adjust appropritely for other set-ups.

import uasyncio
from machine import I2C, Pin

from ultimo.core import asink
from ultimo.pipelines import Dedup, apipe, pipe
from ultimo.value import Hold, Value
from ultimo_display.text_device import ATextDevice
from ultimo_machine.time import PollRTC

from devices.lcd1602 import LCD1602_RGB


class HD44780TextDevice(ATextDevice):
    """Text devive for HD44780-style lcds."""

    size: tuple[int, int]

    def __init__(self, device):
        self.size = device._size
        self.device = device

    async def display_at(self, text: str, position: tuple[int, int]):
        # need proper lookup table for Unicode -> JIS X 0201 Hitachi variant
        self.device.write_ddram(position, text.encode())

    async def erase(self, length: int, position: tuple[int, int]):
        await self.display_at(" " * length, position)

    async def set_cursor(self, position: tuple[int, int]):
        # doesn't handle 4-line displays
        self.device.cursor = position
        self.device.cursor_on = True

    async def clear_cursor(self):
        self.device.cursor_off = True

    async def clear(self):
        self.device.cursor_off = True
        self.device.clear()


@apipe
async def get_formatted(dt: tuple[int, ...], index: int):
    return f"{dt[index]:02d}"


async def blink_colons(
    clock: Value, text_device: ATextDevice, positions: list[tuple[int, int]]
):
    async for value in clock:
        for position in positions:
            await text_device.display_at(":", position)
        await uasyncio.sleep(0.8)
        for position in positions:
            await text_device.erase(1, position)


async def main(i2c):
    """Poll values from the real-time clock and print values as they change."""

    rgb1602 = LCD1602_RGB(i2c)
    await rgb1602.ainit()
    rgb1602.led_white()
    rgb1602.lcd.display_on = True

    text_device = HD44780TextDevice(rgb1602.lcd)

    rtc = PollRTC()
    clock = Value(await rtc())
    update_clock = rtc | clock
    display_hours = clock | get_formatted(4) | Dedup() | text_device.display_text(0, 0)
    display_minutes = (
        clock | get_formatted(5) | Dedup() | text_device.display_text(0, 3)
    )
    display_seconds = (
        clock | get_formatted(6) | Dedup() | text_device.display_text(0, 6)
    )
    blink_display = blink_colons(clock, text_device, [(2, 0), (5, 0)])

    # run forever
    await uasyncio.gather(
        update_clock.create_task(),
        display_hours.create_task(),
        display_minutes.create_task(),
        display_seconds.create_task(),
        uasyncio.create_task(blink_display),
    )


if __name__ == "__main__":
    SDA = Pin(4)
    SCL = Pin(5)

    i2c = I2C(0, sda=SDA, scl=SCL, freq=400000)

    # run forever
    uasyncio.run(main(i2c))

Download lcd_clock.py

Download devices/lcd1602.py

Download devices/aip31068l.py

Download devices/pca9633.py

Download devices/hd44780.py

16x2 LCD Python Eval#

This example shows how to handle text input from a serial port and display it on a 16x2 LCD panel, and implements a simple Python eval-based calculator. This uses an async function to handle the state of editing a line, evaluating the expression on return, and displaying the result.

For best results, use a terminal emulator or mpremote, rather than Thonny or other line-based terminals.

This example expects I2C to be connected with SDA on pin 4 and SCL on pin 5. Adjust appropritely for other set-ups.

import uasyncio
from machine import I2C, Pin

from ultimo.core import asink
from ultimo.pipelines import Dedup, apipe, pipe
from ultimo.stream import ARead
from ultimo.value import Hold, Value
from ultimo_display.text_device import ATextDevice
from ultimo_machine.time import PollRTC

from devices.hd44780_text_device import HD44780TextDevice
from devices.lcd1602 import LCD1602_RGB


async def main(i2c):
    """Poll values from the real-time clock and print values as they change."""

    rgb1602 = LCD1602_RGB(i2c)
    await rgb1602.ainit()
    rgb1602.led_white()
    rgb1602.lcd.display_on = True
    rgb1602.lcd.blink_on = True

    text_device = HD44780TextDevice(rgb1602.lcd)
    input = ARead()

    # run forever
    await uasyncio.gather(uasyncio.create_task(display_lines(input, text_device)))


async def display_line(display, text, cursor, line=1):
    """Display a single line."""
    if cursor < 8 or len(text) < 16:
        text = text[:16]
        cursor = cursor
    elif cursor > len(text) - 8:
        cursor = cursor - len(text) + 16
        text = text[-16:]
    else:
        text = text[cursor - 8 : cursor + 8]
        cursor = 8
    await display.display_at(f"{text:<16s}", (0, line))
    await display.set_cursor((cursor, line))


async def handle_escape(input):
    """Very simplistic handler to catch ANSI cursor commands."""
    escape = ""
    async for char in input:
        escape += char
        if len(escape) == 2:
            return escape


async def display_lines(input, display):
    """Display result line and editing line in a display."""
    last_line = "Python:"
    current_line = ""
    cursor = 0
    await display_line(display, last_line, 0, 0)
    await display_line(display, current_line, cursor, 1)
    async for char in input:
        if char == "\n":
            try:
                last_line = str(eval(current_line))
            except Exception as exc:
                last_line = str(exc)
            current_line = ""
            cursor = 0
            await display_line(display, last_line, 0, 0)
        elif ord(char) == 0x1B:
            # escape sequence
            escape = await handle_escape(input)
            if escape == "[D":
                # cursor back
                if cursor > 0:
                    cursor -= 1
            elif escape == "[C":
                # cursor forward
                if cursor < len(current_line):
                    cursor += 1
        elif ord(char) == 0x7E:
            # forward delete
            if cursor < len(current_line):
                current_line = current_line[:cursor] + current_line[cursor + 1 :]
        elif ord(char) == 0x7F:
            # backspace
            if cursor > 0:
                current_line = current_line[: cursor - 1] + current_line[cursor:]
                cursor -= 1
        elif ord(char) == 0x08:
            # tab
            current_line = current_line + " " * 4
            cursor += 4
        else:
            current_line = current_line[:cursor] + char + current_line[cursor:]
            cursor += 1
        await display_line(display, current_line, cursor, 1)


if __name__ == "__main__":
    SDA = Pin(4)
    SCL = Pin(5)

    i2c = I2C(0, sda=SDA, scl=SCL, freq=400000)

    # run forever
    uasyncio.run(main(i2c))

Download lcd_input.py

Download devices/hd44780_text_device.py

Download devices/lcd1602.py

Download devices/aip31068l.py

Download devices/pca9633.py

Download devices/hd44780.py