1# This example demonstrates a simple temperature sensor peripheral.
2#
3# The sensor's local value updates every second, and it will notify
4# any connected central every 10 seconds.
5#
6# Work-in-progress demo of implementing bonding and passkey auth.
7
8import bluetooth
9import random
10import struct
11import time
12import json
13import binascii
14from ble_advertising import advertising_payload
15
16from micropython import const
17
18_IRQ_CENTRAL_CONNECT = const(1)
19_IRQ_CENTRAL_DISCONNECT = const(2)
20_IRQ_GATTS_INDICATE_DONE = const(20)
21
22_IRQ_ENCRYPTION_UPDATE = const(28)
23_IRQ_PASSKEY_ACTION = const(31)
24
25_IRQ_GET_SECRET = const(29)
26_IRQ_SET_SECRET = const(30)
27
28_FLAG_READ = const(0x0002)
29_FLAG_NOTIFY = const(0x0010)
30_FLAG_INDICATE = const(0x0020)
31
32_FLAG_READ_ENCRYPTED = const(0x0200)
33
34# org.bluetooth.service.environmental_sensing
35_ENV_SENSE_UUID = bluetooth.UUID(0x181A)
36# org.bluetooth.characteristic.temperature
37_TEMP_CHAR = (
38    bluetooth.UUID(0x2A6E),
39    _FLAG_READ | _FLAG_NOTIFY | _FLAG_INDICATE | _FLAG_READ_ENCRYPTED,
40)
41_ENV_SENSE_SERVICE = (
42    _ENV_SENSE_UUID,
43    (_TEMP_CHAR,),
44)
45
46# org.bluetooth.characteristic.gap.appearance.xml
47_ADV_APPEARANCE_GENERIC_THERMOMETER = const(768)
48
49_IO_CAPABILITY_DISPLAY_ONLY = const(0)
50_IO_CAPABILITY_DISPLAY_YESNO = const(1)
51_IO_CAPABILITY_KEYBOARD_ONLY = const(2)
52_IO_CAPABILITY_NO_INPUT_OUTPUT = const(3)
53_IO_CAPABILITY_KEYBOARD_DISPLAY = const(4)
54
55_PASSKEY_ACTION_INPUT = const(2)
56_PASSKEY_ACTION_DISP = const(3)
57_PASSKEY_ACTION_NUMCMP = const(4)
58
59
60class BLETemperature:
61    def __init__(self, ble, name="mpy-temp"):
62        self._ble = ble
63        self._load_secrets()
64        self._ble.irq(self._irq)
65        self._ble.config(bond=True)
66        self._ble.config(le_secure=True)
67        self._ble.config(mitm=True)
68        self._ble.config(io=_IO_CAPABILITY_DISPLAY_YESNO)
69        self._ble.active(True)
70        self._ble.config(addr_mode=2)
71        ((self._handle,),) = self._ble.gatts_register_services((_ENV_SENSE_SERVICE,))
72        self._connections = set()
73        self._payload = advertising_payload(
74            name=name, services=[_ENV_SENSE_UUID], appearance=_ADV_APPEARANCE_GENERIC_THERMOMETER
75        )
76        self._advertise()
77
78    def _irq(self, event, data):
79        # Track connections so we can send notifications.
80        if event == _IRQ_CENTRAL_CONNECT:
81            conn_handle, _, _ = data
82            self._connections.add(conn_handle)
83        elif event == _IRQ_CENTRAL_DISCONNECT:
84            conn_handle, _, _ = data
85            self._connections.remove(conn_handle)
86            self._save_secrets()
87            # Start advertising again to allow a new connection.
88            self._advertise()
89        elif event == _IRQ_ENCRYPTION_UPDATE:
90            conn_handle, encrypted, authenticated, bonded, key_size = data
91            print("encryption update", conn_handle, encrypted, authenticated, bonded, key_size)
92        elif event == _IRQ_PASSKEY_ACTION:
93            conn_handle, action, passkey = data
94            print("passkey action", conn_handle, action, passkey)
95            if action == _PASSKEY_ACTION_NUMCMP:
96                accept = int(input("accept? "))
97                self._ble.gap_passkey(conn_handle, action, accept)
98            elif action == _PASSKEY_ACTION_DISP:
99                print("displaying 123456")
100                self._ble.gap_passkey(conn_handle, action, 123456)
101            elif action == _PASSKEY_ACTION_INPUT:
102                print("prompting for passkey")
103                passkey = int(input("passkey? "))
104                self._ble.gap_passkey(conn_handle, action, passkey)
105            else:
106                print("unknown action")
107        elif event == _IRQ_GATTS_INDICATE_DONE:
108            conn_handle, value_handle, status = data
109        elif event == _IRQ_SET_SECRET:
110            sec_type, key, value = data
111            key = sec_type, bytes(key)
112            value = bytes(value) if value else None
113            print("set secret:", key, value)
114            if value is None:
115                if key in self._secrets:
116                    del self._secrets[key]
117                    return True
118                else:
119                    return False
120            else:
121                self._secrets[key] = value
122            return True
123        elif event == _IRQ_GET_SECRET:
124            sec_type, index, key = data
125            print("get secret:", sec_type, index, bytes(key) if key else None)
126            if key is None:
127                i = 0
128                for (t, _key), value in self._secrets.items():
129                    if t == sec_type:
130                        if i == index:
131                            return value
132                        i += 1
133                return None
134            else:
135                key = sec_type, bytes(key)
136                return self._secrets.get(key, None)
137
138    def set_temperature(self, temp_deg_c, notify=False, indicate=False):
139        # Data is sint16 in degrees Celsius with a resolution of 0.01 degrees Celsius.
140        # Write the local value, ready for a central to read.
141        self._ble.gatts_write(self._handle, struct.pack("<h", int(temp_deg_c * 100)))
142        if notify or indicate:
143            for conn_handle in self._connections:
144                if notify:
145                    # Notify connected centrals.
146                    self._ble.gatts_notify(conn_handle, self._handle)
147                if indicate:
148                    # Indicate connected centrals.
149                    self._ble.gatts_indicate(conn_handle, self._handle)
150
151    def _advertise(self, interval_us=500000):
152        self._ble.config(addr_mode=2)
153        self._ble.gap_advertise(interval_us, adv_data=self._payload)
154
155    def _load_secrets(self):
156        self._secrets = {}
157        try:
158            with open("secrets.json", "r") as f:
159                entries = json.load(f)
160                for sec_type, key, value in entries:
161                    self._secrets[sec_type, binascii.a2b_base64(key)] = binascii.a2b_base64(value)
162        except:
163            print("no secrets available")
164
165    def _save_secrets(self):
166        try:
167            with open("secrets.json", "w") as f:
168                json_secrets = [
169                    (sec_type, binascii.b2a_base64(key), binascii.b2a_base64(value))
170                    for (sec_type, key), value in self._secrets.items()
171                ]
172                json.dump(json_secrets, f)
173        except:
174            print("failed to save secrets")
175
176
177def demo():
178    ble = bluetooth.BLE()
179    temp = BLETemperature(ble)
180
181    t = 25
182    i = 0
183
184    while True:
185        # Write every second, notify every 10 seconds.
186        i = (i + 1) % 10
187        temp.set_temperature(t, notify=i == 0, indicate=False)
188        # Random walk the temperature.
189        t += random.uniform(-0.5, 0.5)
190        time.sleep_ms(1000)
191
192
193if __name__ == "__main__":
194    demo()
195