1# Proof-of-concept of a REPL over BLE UART.
2#
3# Tested with the Adafruit Bluefruit app on Android.
4# Set the EoL characters to \r\n.
5
6import bluetooth
7import io
8import os
9import micropython
10import machine
11
12from ble_uart_peripheral import BLEUART
13
# Stream ioctl request code that BLEUARTStream.ioctl() answers.
_MP_STREAM_POLL = const(3)
# Flag returned from BLEUARTStream.ioctl() when the UART has data waiting.
_MP_STREAM_POLL_RD = const(0x0001)

# Shared timer used by schedule_in(); None on ports without machine.Timer.
# TODO: Remove the fallback when STM32 gets machine.Timer.
_timer = machine.Timer(-1) if hasattr(machine, "Timer") else None
22
# Deferred-call helper; BLEUARTStream uses it to batch writes into 50ms intervals.
def schedule_in(handler, delay_ms):
    """Arrange for ``handler()`` to be called later with no arguments.

    When the module-level timer exists it is (re)initialised as a one-shot
    with a period of ``delay_ms`` milliseconds. Otherwise the call is queued
    through ``micropython.schedule`` and ``delay_ms`` is not used.
    """

    def _invoke(_unused):
        # The callback is handed one positional argument; it is ignored.
        handler()

    if not _timer:
        micropython.schedule(_invoke, None)
        return

    _timer.init(mode=machine.Timer.ONE_SHOT, period=delay_ms, callback=_invoke)
32
33
34# Simple buffering stream to support the dupterm requirements.
class BLEUARTStream(io.IOBase):
    """Buffering stream that adapts a BLE UART object for ``os.dupterm``.

    Reads go straight to the wrapped UART. Writes are appended to an internal
    buffer and handed to the UART in chunks from a deferred callback.
    """

    # Maximum number of buffered bytes passed to the UART per flush.
    _TX_CHUNK = 100
    # Delay, in milliseconds, requested before each flush.
    _FLUSH_MS = 50

    def __init__(self, uart):
        """Wrap ``uart`` and register ``_on_rx`` as its receive handler.

        ``uart`` must provide ``irq()``, ``read()``, ``any()`` and ``write()``.
        """
        self._uart = uart
        self._tx_buf = bytearray()
        self._uart.irq(self._on_rx)

    def _on_rx(self):
        """Tell dupterm that input may be available, on ports that need it."""
        # Needed for ESP32.
        if hasattr(os, "dupterm_notify"):
            os.dupterm_notify(None)

    def read(self, sz=None):
        """Return whatever the UART's ``read(sz)`` returns."""
        return self._uart.read(sz)

    def readinto(self, buf):
        """Copy received bytes into ``buf``.

        Returns the number of bytes stored, or ``None`` when the UART had
        nothing to deliver.
        """
        avail = self._uart.read(len(buf))
        if not avail:
            return None
        for i, value in enumerate(avail):
            buf[i] = value
        return len(avail)

    def ioctl(self, op, arg):
        """Answer a poll request with the readable flag when data is pending.

        Any other request, or a poll with no data waiting, yields 0.
        """
        if op == _MP_STREAM_POLL and self._uart.any():
            return _MP_STREAM_POLL_RD
        return 0

    def _flush(self):
        """Send the next chunk of buffered output; re-arm if more remains."""
        chunk = self._tx_buf[: self._TX_CHUNK]
        self._tx_buf = self._tx_buf[self._TX_CHUNK :]
        self._uart.write(chunk)
        if self._tx_buf:
            schedule_in(self._flush, self._FLUSH_MS)

    def write(self, buf):
        """Queue ``buf`` for transmission and return the number of bytes taken.

        A flush is scheduled only when the buffer was empty beforehand; a
        non-empty buffer means one is already pending.
        """
        was_empty = not self._tx_buf
        self._tx_buf += buf
        if was_empty:
            schedule_in(self._flush, self._FLUSH_MS)
        # Everything was buffered, so report the full length. Returning None
        # would be read by MicroPython's io.IOBase stream glue as "would
        # block, nothing written".
        return len(buf)
75
76
def start():
    """Attach a BLE UART stream to the REPL via ``os.dupterm``.

    Creates a ``BLEUART`` named "mpy-repl" on a new ``bluetooth.BLE``
    instance, wraps it in ``BLEUARTStream`` and passes that to ``os.dupterm``.
    """
    peripheral = BLEUART(bluetooth.BLE(), name="mpy-repl")
    os.dupterm(BLEUARTStream(peripheral))
83