1# Proof-of-concept of a REPL over BLE UART. 2# 3# Tested with the Adafruit Bluefruit app on Android. 4# Set the EoL characters to \r\n. 5 6import bluetooth 7import io 8import os 9import micropython 10import machine 11 12from ble_uart_peripheral import BLEUART 13 14_MP_STREAM_POLL = const(3) 15_MP_STREAM_POLL_RD = const(0x0001) 16 17# TODO: Remove this when STM32 gets machine.Timer. 18if hasattr(machine, "Timer"): 19 _timer = machine.Timer(-1) 20else: 21 _timer = None 22 23# Batch writes into 50ms intervals. 24def schedule_in(handler, delay_ms): 25 def _wrap(_arg): 26 handler() 27 28 if _timer: 29 _timer.init(mode=machine.Timer.ONE_SHOT, period=delay_ms, callback=_wrap) 30 else: 31 micropython.schedule(_wrap, None) 32 33 34# Simple buffering stream to support the dupterm requirements. 35class BLEUARTStream(io.IOBase): 36 def __init__(self, uart): 37 self._uart = uart 38 self._tx_buf = bytearray() 39 self._uart.irq(self._on_rx) 40 41 def _on_rx(self): 42 # Needed for ESP32. 43 if hasattr(os, "dupterm_notify"): 44 os.dupterm_notify(None) 45 46 def read(self, sz=None): 47 return self._uart.read(sz) 48 49 def readinto(self, buf): 50 avail = self._uart.read(len(buf)) 51 if not avail: 52 return None 53 for i in range(len(avail)): 54 buf[i] = avail[i] 55 return len(avail) 56 57 def ioctl(self, op, arg): 58 if op == _MP_STREAM_POLL: 59 if self._uart.any(): 60 return _MP_STREAM_POLL_RD 61 return 0 62 63 def _flush(self): 64 data = self._tx_buf[0:100] 65 self._tx_buf = self._tx_buf[100:] 66 self._uart.write(data) 67 if self._tx_buf: 68 schedule_in(self._flush, 50) 69 70 def write(self, buf): 71 empty = not self._tx_buf 72 self._tx_buf += buf 73 if empty: 74 schedule_in(self._flush, 50) 75 76 77def start(): 78 ble = bluetooth.BLE() 79 uart = BLEUART(ble, name="mpy-repl") 80 stream = BLEUARTStream(uart) 81 82 os.dupterm(stream) 83