# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
import os

import numpy as np
import tvm
from tvm import te
from tvm.contrib import graph_runtime, util
from tvm import relay
import tvm.micro as micro
from tvm.micro import create_micro_mod

# Use real micro device - an STM32F746 discovery board
# SETUP:
# Be sure to have openocd installed and running
# Ex : openocd -f board/stm32f7discovery.cfg
# Be sure to have the ST CMSIS library downloaded and installed, and
# CMSIS_ST_PATH exported so that it points at it
# Ex : export CMSIS_ST_PATH="/home/yourid/st/STM32Cube_FW_F7_V1.16.0/Drivers/CMSIS"
# NOTE(review): both configs target the same server port, so session "A" and
# session "B" talk to the same device server - confirm this is intended.
DEV_CONFIG_A = micro.device.arm.stm32f746xx.generate_config("127.0.0.1", 6666)
DEV_CONFIG_B = micro.device.arm.stm32f746xx.generate_config("127.0.0.1", 6666)
TARGET = "micro_dev"


def relay_micro_build(func, dev_config, params=None):
    """Create a graph runtime module with a micro device context from a Relay function.

    Parameters
    ----------
    func : relay.Function
        function to compile

    dev_config : Dict[str, Any]
        MicroTVM config dict for the target device

    params : dict
        input parameters that do not change during inference

    Return
    ------
    mod : tvm.runtime.Module
        graph runtime module for the target device
    """
    # Build with operator fusion and vectorization turned off.
    with tvm.transform.PassContext(
        disabled_pass={"FuseOps"}, config={"tir.disable_vectorize": True}
    ):
        graph, c_mod, params = relay.build(func, target=TARGET, params=params)
    micro_mod = micro.create_micro_mod(c_mod, dev_config)
    ctx = tvm.micro_dev(0)
    mod = graph_runtime.create(graph, micro_mod, ctx)
    mod.set_input(**params)
    return mod


GDB_INIT_TEMPLATE = """
layout asm
target remote localhost:{gdb_port}
set $pc = UTVMInit
break UTVMDone
"""


def reset_gdbinit():
    """Rewrite the ``.gdbinit`` file used when debugging the device.

    Does nothing when ``DEV_CONFIG_A`` has no ``"server_port"`` entry or when
    the ``MICRO_GDB_INIT_DIR`` environment variable is not set. Otherwise the
    file ``$MICRO_GDB_INIT_DIR/.gdbinit`` is overwritten with
    ``GDB_INIT_TEMPLATE``.
    """
    if "server_port" not in DEV_CONFIG_A:
        return
    try:
        gdb_init_dir = os.environ["MICRO_GDB_INIT_DIR"]
    except KeyError:
        return
    # NOTE(review): presumably this maps the OpenOCD Tcl server port onto its
    # gdb port (6666 -> 3333 with OpenOCD defaults) - confirm.
    gdb_port = DEV_CONFIG_A["server_port"] - 3333
    with open(f"{gdb_init_dir}/.gdbinit", "w") as f:
        f.write(GDB_INIT_TEMPLATE.format(gdb_port=gdb_port))


def _make_add_const_func(shape, dtype):
    """Return a Relay function computing ``x + 1.0`` for a tensor ``x``."""
    x = relay.var("x", relay.TensorType(shape=shape, dtype=dtype))
    ret = relay.add(x, relay.const(1.0))
    return relay.Function([x], ret)


def _find_micro_func(micro_mod, base_name, max_suffix=100):
    """Look up ``base_name`` or ``base_name_<i>`` in a micro module.

    The plain name is tried first, then ``base_name_0`` up to
    ``base_name_<max_suffix - 1>``. Every candidate is actually looked up.

    Raises
    ------
    AssertionError
        if none of the candidate names resolves to a function.
    """
    candidates = [base_name] + [f"{base_name}_{i}" for i in range(max_suffix)]
    for candidate in candidates:
        try:
            return micro_mod[candidate]
        except tvm.TVMError:
            continue
    raise AssertionError(
        f"no function named {base_name!r} (or with a numeric suffix) in micro module"
    )


def test_alloc():
    """Test tensor allocation on the device."""
    if not tvm.runtime.enabled("micro_dev"):
        return
    shape = (1024,)
    dtype = "float32"
    with micro.Session(DEV_CONFIG_A):
        ctx = tvm.micro_dev(0)
        np_tensor = np.random.uniform(size=shape).astype(dtype)
        micro_tensor = tvm.nd.array(np_tensor, ctx)
        # Round trip: host -> device -> host must preserve the data.
        tvm.testing.assert_allclose(np_tensor, micro_tensor.asnumpy())


def test_add():
    """Test a module which performs addition."""
    if not tvm.runtime.enabled("micro_dev"):
        return
    shape = (1024,)
    dtype = "float32"

    reset_gdbinit()

    # Construct TVM expression.
    tvm_shape = tvm.runtime.convert(shape)
    A = te.placeholder(tvm_shape, name="A", dtype=dtype)
    B = te.placeholder(tvm_shape, name="B", dtype=dtype)
    C = te.compute(A.shape, lambda *i: A(*i) + B(*i), name="C")
    s = te.create_schedule(C.op)

    func_name = "fadd"
    c_mod = tvm.build(s, [A, B, C], target="c", name=func_name)

    with micro.Session(DEV_CONFIG_A):
        micro_mod = micro.create_micro_mod(c_mod, DEV_CONFIG_A)
        micro_func = micro_mod[func_name]
        ctx = tvm.micro_dev(0)

        a_np = np.random.uniform(size=shape).astype(dtype)
        a = tvm.nd.array(a_np, ctx)
        b_np = np.random.uniform(size=shape).astype(dtype)
        b = tvm.nd.array(b_np, ctx)
        c = tvm.nd.array(np.zeros(shape, dtype=dtype), ctx)
        micro_func(a, b, c)

        # ensure inputs weren't corrupted
        tvm.testing.assert_allclose(a.asnumpy(), a_np)
        tvm.testing.assert_allclose(b.asnumpy(), b_np)
        # ensure output is correct
        tvm.testing.assert_allclose(c.asnumpy(), a.asnumpy() + b.asnumpy())


def test_workspace_add():
    """Test a module which uses a workspace to compute an intermediate value."""
    if not tvm.runtime.enabled("micro_dev"):
        return
    shape = (1024,)
    dtype = "float32"

    reset_gdbinit()

    # Construct TVM expression.  `B` is an intermediate stage that is not
    # passed to `tvm.build` as an argument, only `A` and `C` are.
    tvm_shape = tvm.runtime.convert(shape)
    A = te.placeholder(tvm_shape, name="A", dtype=dtype)
    B = te.compute(A.shape, lambda *i: A(*i) + 1, name="B")
    C = te.compute(A.shape, lambda *i: B(*i) + 1, name="C")
    s = te.create_schedule(C.op)

    func_name = "fadd_two_workspace"
    c_mod = tvm.build(s, [A, C], target="c", name=func_name)

    with micro.Session(DEV_CONFIG_A):
        micro_mod = micro.create_micro_mod(c_mod, DEV_CONFIG_A)
        micro_func = micro_mod[func_name]
        ctx = tvm.micro_dev(0)
        a_np = np.random.uniform(size=shape).astype(dtype)
        a = tvm.nd.array(a_np, ctx)
        c = tvm.nd.array(np.zeros(shape, dtype=dtype), ctx)
        micro_func(a, c)

        # ensure input wasn't corrupted
        tvm.testing.assert_allclose(a.asnumpy(), a_np)
        # ensure output is correct
        tvm.testing.assert_allclose(c.asnumpy(), a.asnumpy() + 2.0)


def test_graph_runtime():
    """Test a program which uses the graph runtime."""
    if not tvm.runtime.enabled("micro_dev"):
        return
    shape = (1024,)
    dtype = "float32"

    # Construct Relay program.
    x = relay.var("x", relay.TensorType(shape=shape, dtype=dtype))
    xx = relay.multiply(x, x)
    z = relay.add(xx, relay.const(1.0))
    func = relay.Function([x], z)

    with micro.Session(DEV_CONFIG_A):
        mod = relay_micro_build(func, DEV_CONFIG_A)

        x_in = np.random.uniform(size=shape[0]).astype(dtype)
        mod.run(x=x_in)
        result = mod.get_output(0).asnumpy()

        # The input must be left untouched and the output must equal x*x + 1.
        tvm.testing.assert_allclose(mod.get_input(0).asnumpy(), x_in)
        tvm.testing.assert_allclose(result, x_in * x_in + 1.0)


def test_conv2d():
    """Test a conv2d operator on the device against the Relay debug executor."""
    if not tvm.runtime.enabled("micro_dev"):
        return

    from tvm.relay import create_executor
    from tvm.relay import transform

    dshape = (1, 4, 16, 16)
    dtype = "int8"
    func_name = "fused_nn_conv2d"

    reset_gdbinit()

    # Construct Relay program.
    x = relay.var("x", shape=dshape, dtype=dtype)
    conv_expr = relay.nn.conv2d(x, relay.var("w"), kernel_size=(3, 3), padding=(1, 1), channels=4)
    func = relay.Function(relay.analysis.free_vars(conv_expr), conv_expr)
    mod = tvm.IRModule.from_expr(func)
    mod = transform.InferType()(mod)

    # Read the concrete shapes back from the type-inferred module.
    main_func = mod["main"]
    x_shape = [dim.value for dim in main_func.params[0].checked_type.shape]
    w_shape = [dim.value for dim in main_func.params[1].checked_type.shape]
    out_shape = [dim.value for dim in main_func.ret_type.shape]

    with tvm.transform.PassContext(config={"tir.disable_vectorize": True}):
        _graph, c_mod, _params = relay.build(mod, target="c")

    with micro.Session(DEV_CONFIG_A):
        micro_mod = micro.create_micro_mod(c_mod, DEV_CONFIG_A)
        # The generated symbol may carry a numeric suffix, so probe for it.
        micro_func = _find_micro_func(micro_mod, func_name)
        ctx = tvm.micro_dev(0)

        # NOTE(review): uniform samples in [0, 1) cast to int8 truncate to 0,
        # so both inputs are all-zero and the comparison below is trivially
        # satisfied - consider integer-valued random inputs.
        x_data = tvm.nd.array(np.random.uniform(size=x_shape).astype(dtype), ctx)
        w_data = tvm.nd.array(np.random.uniform(size=w_shape).astype(dtype), ctx)
        result = tvm.nd.array(np.zeros(shape=out_shape, dtype=dtype), ctx)
        micro_func(x_data, w_data, result)

        intrp = create_executor("debug")
        expected_result = intrp.evaluate(mod["main"])(x_data, w_data)

        tvm.testing.assert_allclose(result.asnumpy(), expected_result.asnumpy())


def test_interleave_sessions():
    """Test closing and reopening sessions."""
    if not tvm.runtime.enabled("micro_dev"):
        return
    shape = (1024,)
    dtype = "float32"

    # Construct Relay add program.
    add_const_func = _make_add_const_func(shape, dtype)

    sess_a = micro.Session(DEV_CONFIG_A)
    sess_b = micro.Session(DEV_CONFIG_B)
    # Allocate a tensor in each session, then re-enter each session and use
    # the tensor that was allocated in it earlier.
    with sess_a:
        np_tensor_a = np.random.uniform(size=shape).astype(dtype)
        micro_tensor_a = tvm.nd.array(np_tensor_a, tvm.micro_dev(0))
    with sess_b:
        np_tensor_b = np.random.uniform(size=shape).astype(dtype)
        micro_tensor_b = tvm.nd.array(np_tensor_b, tvm.micro_dev(0))
    with sess_a:
        add_const_mod = relay_micro_build(add_const_func, DEV_CONFIG_A)
        add_const_mod.run(x=micro_tensor_a)
        add_result = add_const_mod.get_output(0).asnumpy()
        tvm.testing.assert_allclose(add_result, np_tensor_a + 1.0)
    with sess_b:
        add_const_mod = relay_micro_build(add_const_func, DEV_CONFIG_B)
        add_const_mod.run(x=micro_tensor_b)
        add_result = add_const_mod.get_output(0).asnumpy()
        tvm.testing.assert_allclose(add_result, np_tensor_b + 1.0)


def test_nested_sessions():
    """Test entering and exiting nested session contexts."""
    if not tvm.runtime.enabled("micro_dev"):
        return
    shape = (1024,)
    dtype = "float32"

    # Construct Relay add program.
    add_const_func = _make_add_const_func(shape, dtype)

    sess_a = micro.Session(DEV_CONFIG_A)
    sess_b = micro.Session(DEV_CONFIG_B)
    with sess_a:
        np_tensor_a = np.random.uniform(size=shape).astype(dtype)
        micro_tensor_a = tvm.nd.array(np_tensor_a, tvm.micro_dev(0))
        with sess_b:
            np_tensor_b = np.random.uniform(size=shape).astype(dtype)
            micro_tensor_b = tvm.nd.array(np_tensor_b, tvm.micro_dev(0))
        # Back in `sess_a` after the nested session has exited.
        add_const_mod = relay_micro_build(add_const_func, DEV_CONFIG_A)
        add_const_mod.run(x=micro_tensor_a)
        add_result = add_const_mod.get_output(0).asnumpy()
        tvm.testing.assert_allclose(add_result, np_tensor_a + 1.0)


def test_inactive_session_use():
    """Test the use of objects allocated in a session that is no longer active."""
    if not tvm.runtime.enabled("micro_dev"):
        return
    shape = (1024,)
    dtype = "float32"

    # Construct Relay add program.
    add_const_func = _make_add_const_func(shape, dtype)

    sess_a = micro.Session(DEV_CONFIG_A)
    sess_b = micro.Session(DEV_CONFIG_B)
    with sess_a:
        np_tensor_a = np.random.uniform(size=shape).astype(dtype)
        micro_tensor_a = tvm.nd.array(np_tensor_a, tvm.micro_dev(0))
        add_const_mod = relay_micro_build(add_const_func, DEV_CONFIG_A)

    with sess_b:
        # These objects belong to `sess_a`.
        add_const_mod.run(x=micro_tensor_a)
        add_result = add_const_mod.get_output(0).asnumpy()
        tvm.testing.assert_allclose(add_result, np_tensor_a + 1.0)


# TODO add workspace alloc/free stress test

if __name__ == "__main__":
    # (test function, label printed once the test has finished)
    interactive_tests = [
        (test_alloc, "alloc"),
        (test_add, "add"),
        (test_workspace_add, "workspace add"),
        (test_graph_runtime, "graph runtime"),
        (test_conv2d, "conv2d"),
        # test_interleave_sessions and test_nested_sessions are disabled for
        # now as these are currently broken
        (test_inactive_session_use, "use inactive session"),
    ]
    # Pause after every test so the board can be inspected between runs.
    for test_func, label in interactive_tests:
        test_func()
        print()
        print(f"finished {label} test")
        input("[press enter to continue]")