# This example demonstrates a simple temperature sensor peripheral.
#
# The sensor's local value updates every second, and it will notify
# any connected central every 10 seconds.
#
# Work-in-progress demo of implementing bonding and passkey auth.

import bluetooth
import random
import struct
import time
import json
import binascii
from ble_advertising import advertising_payload

from micropython import const

# IRQ event codes handled by BLETemperature._irq.
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_INDICATE_DONE = const(20)

_IRQ_ENCRYPTION_UPDATE = const(28)
_IRQ_PASSKEY_ACTION = const(31)

_IRQ_GET_SECRET = const(29)
_IRQ_SET_SECRET = const(30)

# Characteristic property flags.
_FLAG_READ = const(0x0002)
_FLAG_NOTIFY = const(0x0010)
_FLAG_INDICATE = const(0x0020)

_FLAG_READ_ENCRYPTED = const(0x0200)

# org.bluetooth.service.environmental_sensing
_ENV_SENSE_UUID = bluetooth.UUID(0x181A)
# org.bluetooth.characteristic.temperature
_TEMP_CHAR = (
    bluetooth.UUID(0x2A6E),
    _FLAG_READ | _FLAG_NOTIFY | _FLAG_INDICATE | _FLAG_READ_ENCRYPTED,
)
_ENV_SENSE_SERVICE = (
    _ENV_SENSE_UUID,
    (_TEMP_CHAR,),
)

# org.bluetooth.characteristic.gap.appearance.xml
_ADV_APPEARANCE_GENERIC_THERMOMETER = const(768)

_IO_CAPABILITY_DISPLAY_ONLY = const(0)
_IO_CAPABILITY_DISPLAY_YESNO = const(1)
_IO_CAPABILITY_KEYBOARD_ONLY = const(2)
_IO_CAPABILITY_NO_INPUT_OUTPUT = const(3)
_IO_CAPABILITY_KEYBOARD_DISPLAY = const(4)

_PASSKEY_ACTION_INPUT = const(2)
_PASSKEY_ACTION_DISP = const(3)
_PASSKEY_ACTION_NUMCMP = const(4)


class BLETemperature:
    """Temperature sensor peripheral with bonding and passkey pairing.

    Registers the environmental-sensing service with a single temperature
    characteristic, advertises it, tracks connected centrals, and keeps the
    pairing secrets handed over by the BLE stack in ``secrets.json``.
    """

    def __init__(self, ble, name="mpy-temp"):
        """Configure ``ble``, register the service and start advertising.

        Args:
            ble: The BLE interface object (``bluetooth.BLE()`` in ``demo``).
            name: Device name placed in the advertising payload.
        """
        self._ble = ble
        # Secrets are loaded before the IRQ handler is installed so that the
        # get/set-secret events always find ``self._secrets`` in place.
        self._load_secrets()
        self._ble.irq(self._irq)
        self._ble.config(bond=True)
        self._ble.config(le_secure=True)
        self._ble.config(mitm=True)
        self._ble.config(io=_IO_CAPABILITY_DISPLAY_YESNO)
        self._ble.active(True)
        # NOTE(review): the meaning of addr_mode=2 is not visible in this
        # file -- confirm against the bluetooth module documentation.
        self._ble.config(addr_mode=2)
        ((self._handle,),) = self._ble.gatts_register_services((_ENV_SENSE_SERVICE,))
        self._connections = set()
        self._payload = advertising_payload(
            name=name, services=[_ENV_SENSE_UUID], appearance=_ADV_APPEARANCE_GENERIC_THERMOMETER
        )
        self._advertise()

    def _irq(self, event, data):
        """Dispatch a BLE stack event.

        Args:
            event: One of the ``_IRQ_*`` event codes.
            data: Event-specific tuple; its layout is unpacked per event below.

        Returns:
            For ``_IRQ_SET_SECRET`` a bool, for ``_IRQ_GET_SECRET`` the stored
            value or ``None``; ``None`` for every other event.
        """
        # Track connections so we can send notifications.
        if event == _IRQ_CENTRAL_CONNECT:
            conn_handle, _, _ = data
            self._connections.add(conn_handle)
        elif event == _IRQ_CENTRAL_DISCONNECT:
            conn_handle, _, _ = data
            # discard() rather than remove(): a disconnect for a handle we are
            # not tracking must not raise inside the IRQ handler.
            self._connections.discard(conn_handle)
            self._save_secrets()
            # Start advertising again to allow a new connection.
            self._advertise()
        elif event == _IRQ_ENCRYPTION_UPDATE:
            conn_handle, encrypted, authenticated, bonded, key_size = data
            print("encryption update", conn_handle, encrypted, authenticated, bonded, key_size)
        elif event == _IRQ_PASSKEY_ACTION:
            conn_handle, action, passkey = data
            self._on_passkey_action(conn_handle, action, passkey)
        elif event == _IRQ_GATTS_INDICATE_DONE:
            # Nothing to do; the unpack just documents the event's layout.
            conn_handle, value_handle, status = data
        elif event == _IRQ_SET_SECRET:
            sec_type, key, value = data
            return self._set_secret(sec_type, key, value)
        elif event == _IRQ_GET_SECRET:
            sec_type, index, key = data
            return self._get_secret(sec_type, index, key)

    def _on_passkey_action(self, conn_handle, action, passkey):
        """Answer a passkey request from the stack via ``gap_passkey``."""
        print("passkey action", conn_handle, action, passkey)
        # NOTE(review): input() blocks while running inside the IRQ callback;
        # acceptable for this work-in-progress demo only.
        if action == _PASSKEY_ACTION_NUMCMP:
            accept = int(input("accept? "))
            self._ble.gap_passkey(conn_handle, action, accept)
        elif action == _PASSKEY_ACTION_DISP:
            print("displaying 123456")
            self._ble.gap_passkey(conn_handle, action, 123456)
        elif action == _PASSKEY_ACTION_INPUT:
            print("prompting for passkey")
            passkey = int(input("passkey? "))
            self._ble.gap_passkey(conn_handle, action, passkey)
        else:
            print("unknown action")

    def _set_secret(self, sec_type, key, value):
        """Store or delete one secret.

        A falsy ``value`` requests deletion of the entry.

        Returns:
            ``True`` if the entry was stored or deleted, ``False`` if a
            deletion was requested for an entry that does not exist.
        """
        # Copy into bytes so the entry is hashable and owned by this object.
        key = sec_type, bytes(key)
        value = bytes(value) if value else None
        print("set secret:", key, value)
        if value is None:
            if key in self._secrets:
                del self._secrets[key]
                return True
            return False
        self._secrets[key] = value
        return True

    def _get_secret(self, sec_type, index, key):
        """Look up a secret by key, or by position when ``key`` is ``None``.

        With ``key`` set to ``None`` the ``index``-th stored entry of type
        ``sec_type`` (in dict iteration order) is returned.

        Returns:
            The stored value, or ``None`` when there is no match.
        """
        print("get secret:", sec_type, index, bytes(key) if key else None)
        if key is None:
            i = 0
            for (t, _key), value in self._secrets.items():
                if t == sec_type:
                    if i == index:
                        return value
                    i += 1
            return None
        key = sec_type, bytes(key)
        return self._secrets.get(key, None)

    def set_temperature(self, temp_deg_c, notify=False, indicate=False):
        """Write a new temperature and optionally push it to the centrals.

        Args:
            temp_deg_c: Temperature in degrees Celsius.
            notify: If true, send a notification to every tracked connection.
            indicate: If true, send an indication to every tracked connection.
        """
        # Data is sint16 in degrees Celsius with a resolution of 0.01 degrees Celsius.
        # Write the local value, ready for a central to read.
        self._ble.gatts_write(self._handle, struct.pack("<h", int(temp_deg_c * 100)))
        if notify or indicate:
            for conn_handle in self._connections:
                if notify:
                    # Notify connected centrals.
                    self._ble.gatts_notify(conn_handle, self._handle)
                if indicate:
                    # Indicate connected centrals.
                    self._ble.gatts_indicate(conn_handle, self._handle)

    def _advertise(self, interval_us=500000):
        """Start advertising the stored payload every ``interval_us`` microseconds."""
        self._ble.config(addr_mode=2)
        self._ble.gap_advertise(interval_us, adv_data=self._payload)

    def _load_secrets(self):
        """Populate ``self._secrets`` from ``secrets.json`` (best effort).

        The file holds a list of ``(sec_type, key, value)`` entries with key
        and value base64-encoded. On any failure the store is left empty.
        """
        self._secrets = {}
        try:
            with open("secrets.json", "r") as f:
                entries = json.load(f)
            # Decode into a scratch dict first so a corrupt entry cannot
            # leave a half-loaded secret store behind.
            loaded = {}
            for sec_type, key, value in entries:
                loaded[sec_type, binascii.a2b_base64(key)] = binascii.a2b_base64(value)
        except Exception:
            # Exception, not a bare except: Ctrl-C / SystemExit must propagate.
            print("no secrets available")
        else:
            self._secrets = loaded

    def _save_secrets(self):
        """Write ``self._secrets`` to ``secrets.json`` (best effort)."""
        try:
            with open("secrets.json", "w") as f:
                # NOTE(review): b2a_base64 returns bytes, so this relies on
                # json.dump accepting bytes objects -- confirm on the target.
                json_secrets = [
                    (sec_type, binascii.b2a_base64(key), binascii.b2a_base64(value))
                    for (sec_type, key), value in self._secrets.items()
                ]
                json.dump(json_secrets, f)
        except Exception:
            # Exception, not a bare except: Ctrl-C / SystemExit must propagate.
            print("failed to save secrets")


def demo():
    """Run the sensor forever, random-walking the reported temperature."""
    ble = bluetooth.BLE()
    temp = BLETemperature(ble)

    t = 25
    i = 0

    while True:
        # Write every second, notify every 10 seconds.
        i = (i + 1) % 10
        temp.set_temperature(t, notify=i == 0, indicate=False)
        # Random walk the temperature.
        t += random.uniform(-0.5, 0.5)
        time.sleep_ms(1000)


if __name__ == "__main__":
    demo()