# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
import os

import numpy as np
import tvm
from tvm import te
from tvm.contrib import graph_runtime
from tvm import relay
import tvm.micro as micro

# Use a real micro device - an STM32F746 Discovery board.
# SETUP:
# Be sure to have OpenOCD installed and running,
# e.g.: openocd -f board/stm32f7discovery.cfg
# Be sure to have the ST CMSIS library downloaded and installed, and its
# path exported, e.g.:
# export CMSIS_ST_PATH="/home/yourid/st/STM32Cube_FW_F7_V1.16.0/Drivers/CMSIS"
DEV_CONFIG_A = micro.device.arm.stm32f746xx.generate_config("127.0.0.1", 6666)
DEV_CONFIG_B = micro.device.arm.stm32f746xx.generate_config("127.0.0.1", 6666)
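# NOTE: both configs point at the same OpenOCD server here, so the session
# tests below exercise two logical sessions against one physical board; a
# second board would typically get its own OpenOCD instance on its own port.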
TARGET = "micro_dev"


def relay_micro_build(func, dev_config, params=None):
    """Create a graph runtime module with a micro device context from a Relay function.

    Parameters
    ----------
    func : relay.Function
        function to compile

    dev_config : Dict[str, Any]
        MicroTVM config dict for the target device

    params : dict
        input parameters that do not change during inference

    Returns
    -------
    mod : tvm.runtime.Module
        graph runtime module for the target device
    """
    with tvm.transform.PassContext(
        disabled_pass={"FuseOps"}, config={"tir.disable_vectorize": True}
    ):
        graph, c_mod, params = relay.build(func, target=TARGET, params=params)
    micro_mod = micro.create_micro_mod(c_mod, dev_config)
    ctx = tvm.micro_dev(0)
    mod = graph_runtime.create(graph, micro_mod, ctx)
    mod.set_input(**params)
    return mod
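
# A minimal usage sketch for `relay_micro_build` (assumes an open
# micro.Session for the matching dev_config; `add_func` and `x_np` are
# hypothetical stand-ins):
#
#   with micro.Session(DEV_CONFIG_A):
#       mod = relay_micro_build(add_func, DEV_CONFIG_A)
#       mod.run(x=x_np)
#       out = mod.get_output(0).asnumpy()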


GDB_INIT_TEMPLATE = """
layout asm
target remote localhost:{gdb_port}
set $pc = UTVMInit
break UTVMDone
"""


def reset_gdbinit():
    if "server_port" not in DEV_CONFIG_A:
        return
    try:
        gdb_init_dir = os.environ["MICRO_GDB_INIT_DIR"]
    except KeyError:
        return
    with open(f"{gdb_init_dir}/.gdbinit", "w") as f:
        # Map the OpenOCD server port to the corresponding GDB port
        # (with OpenOCD's defaults, 6666 -> 3333).
        gdb_port = DEV_CONFIG_A["server_port"] - 3333
        f.write(GDB_INIT_TEMPLATE.format(gdb_port=gdb_port))
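
# With MICRO_GDB_INIT_DIR set and the default server_port of 6666, the
# generated .gdbinit reads (illustrative):
#
#   layout asm
#   target remote localhost:3333
#   set $pc = UTVMInit
#   break UTVMDone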


def test_alloc():
    """Test tensor allocation on the device."""
    if not tvm.runtime.enabled("micro_dev"):
        return
    shape = (1024,)
    dtype = "float32"
    with micro.Session(DEV_CONFIG_A):
        ctx = tvm.micro_dev(0)
        np_tensor = np.random.uniform(size=shape).astype(dtype)
        micro_tensor = tvm.nd.array(np_tensor, ctx)
        tvm.testing.assert_allclose(np_tensor, micro_tensor.asnumpy())


def test_add():
    """Test a module which performs addition."""
    if not tvm.runtime.enabled("micro_dev"):
        return
    shape = (1024,)
    dtype = "float32"

    reset_gdbinit()

    # Construct TVM expression.
    tvm_shape = tvm.runtime.convert(shape)
    A = te.placeholder(tvm_shape, name="A", dtype=dtype)
    B = te.placeholder(tvm_shape, name="B", dtype=dtype)
    C = te.compute(A.shape, lambda *i: A(*i) + B(*i), name="C")
    s = te.create_schedule(C.op)

    func_name = "fadd"
    c_mod = tvm.build(s, [A, B, C], target="c", name=func_name)

    with micro.Session(DEV_CONFIG_A):
        micro_mod = micro.create_micro_mod(c_mod, DEV_CONFIG_A)
        micro_func = micro_mod[func_name]
        ctx = tvm.micro_dev(0)

        a_np = np.random.uniform(size=shape).astype(dtype)
        a = tvm.nd.array(a_np, ctx)
        b_np = np.random.uniform(size=shape).astype(dtype)
        b = tvm.nd.array(b_np, ctx)
        c = tvm.nd.array(np.zeros(shape, dtype=dtype), ctx)
        micro_func(a, b, c)

        # ensure inputs weren't corrupted
        tvm.testing.assert_allclose(a.asnumpy(), a_np)
        tvm.testing.assert_allclose(b.asnumpy(), b_np)
        # ensure output is correct
        tvm.testing.assert_allclose(c.asnumpy(), a.asnumpy() + b.asnumpy())


def test_workspace_add():
    """Test a module which uses a workspace to compute an intermediate value."""
    if not tvm.runtime.enabled("micro_dev"):
        return
    shape = (1024,)
    dtype = "float32"

    reset_gdbinit()

    # Construct TVM expression.
    tvm_shape = tvm.runtime.convert(shape)
    A = te.placeholder(tvm_shape, name="A", dtype=dtype)
    # The intermediate tensor B is not a kernel argument, so the generated
    # code must service it from a workspace buffer.
    B = te.compute(A.shape, lambda *i: A(*i) + 1, name="B")
    C = te.compute(A.shape, lambda *i: B(*i) + 1, name="C")
    s = te.create_schedule(C.op)

    func_name = "fadd_two_workspace"
    c_mod = tvm.build(s, [A, C], target="c", name=func_name)

    with micro.Session(DEV_CONFIG_A):
        micro_mod = micro.create_micro_mod(c_mod, DEV_CONFIG_A)
        micro_func = micro_mod[func_name]
        ctx = tvm.micro_dev(0)
        a_np = np.random.uniform(size=shape).astype(dtype)
        a = tvm.nd.array(a_np, ctx)
        c = tvm.nd.array(np.zeros(shape, dtype=dtype), ctx)
        micro_func(a, c)

        # ensure input wasn't corrupted
        tvm.testing.assert_allclose(a.asnumpy(), a_np)
        # ensure output is correct
        tvm.testing.assert_allclose(c.asnumpy(), a.asnumpy() + 2.0)


def test_graph_runtime():
    """Test a program which uses the graph runtime."""
    if not tvm.runtime.enabled("micro_dev"):
        return
    shape = (1024,)
    dtype = "float32"

    # Construct Relay program.
    x = relay.var("x", relay.TensorType(shape=shape, dtype=dtype))
    xx = relay.multiply(x, x)
    z = relay.add(xx, relay.const(1.0))
    func = relay.Function([x], z)

    with micro.Session(DEV_CONFIG_A):
        mod = relay_micro_build(func, DEV_CONFIG_A)

        x_in = np.random.uniform(size=shape).astype(dtype)
        mod.run(x=x_in)
        result = mod.get_output(0).asnumpy()

        tvm.testing.assert_allclose(mod.get_input(0).asnumpy(), x_in)
        tvm.testing.assert_allclose(result, x_in * x_in + 1.0)


def test_conv2d():
    """Test a conv2d operator on the device."""
    if not tvm.runtime.enabled("micro_dev"):
        return

    from tvm.relay import create_executor
    from tvm.relay import transform

    dshape = (1, 4, 16, 16)
    dtype = "int8"
    func_name = "fused_nn_conv2d"

    reset_gdbinit()

    # Construct Relay program.
    x = relay.var("x", shape=dshape, dtype=dtype)
    conv_expr = relay.nn.conv2d(x, relay.var("w"), kernel_size=(3, 3), padding=(1, 1), channels=4)
    func = relay.Function(relay.analysis.free_vars(conv_expr), conv_expr)
    mod = tvm.IRModule.from_expr(func)
    mod = transform.InferType()(mod)

    x_shape = [dim.value for dim in mod["main"].params[0].checked_type.shape]
    w_shape = [dim.value for dim in mod["main"].params[1].checked_type.shape]
    out_shape = [dim.value for dim in mod["main"].ret_type.shape]

    with tvm.transform.PassContext(config={"tir.disable_vectorize": True}):
        graph, c_mod, params = relay.build(mod, target="c")

    with micro.Session(DEV_CONFIG_A):
        micro_mod = micro.create_micro_mod(c_mod, DEV_CONFIG_A)
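        # relay.build may emit the kernel under a suffixed name (e.g.
        # "fused_nn_conv2d_1"), so probe candidate names until one resolves.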
        candidate_func_name = func_name
        for i in range(100):
            try:
                micro_func = micro_mod[candidate_func_name]
                break
            except tvm.TVMError:
                candidate_func_name = f"{func_name}_{i}"
        else:
            assert False, f"could not find a function named like '{func_name}' in the module"
        ctx = tvm.micro_dev(0)

        x_data = tvm.nd.array(np.random.uniform(size=x_shape).astype(dtype), ctx)
        w_data = tvm.nd.array(np.random.uniform(size=w_shape).astype(dtype), ctx)
        result = tvm.nd.array(np.zeros(shape=out_shape, dtype=dtype), ctx)
        micro_func(x_data, w_data, result)

        # Check against the debug executor running on the host.
        intrp = create_executor("debug")
        expected_result = intrp.evaluate(mod["main"])(x_data, w_data)

        tvm.testing.assert_allclose(result.asnumpy(), expected_result.asnumpy())


def test_interleave_sessions():
    """Test closing and reopening sessions."""
    if not tvm.runtime.enabled("micro_dev"):
        return
    shape = (1024,)
    dtype = "float32"

    # Construct Relay add program.
    x = relay.var("x", relay.TensorType(shape=shape, dtype=dtype))
    ret = relay.add(x, relay.const(1.0))
    add_const_func = relay.Function([x], ret)

    sess_a = micro.Session(DEV_CONFIG_A)
    sess_b = micro.Session(DEV_CONFIG_B)
    with sess_a:
        np_tensor_a = np.random.uniform(size=shape).astype(dtype)
        micro_tensor_a = tvm.nd.array(np_tensor_a, tvm.micro_dev(0))
    with sess_b:
        np_tensor_b = np.random.uniform(size=shape).astype(dtype)
        micro_tensor_b = tvm.nd.array(np_tensor_b, tvm.micro_dev(0))
    with sess_a:
        add_const_mod = relay_micro_build(add_const_func, DEV_CONFIG_A)
        add_const_mod.run(x=micro_tensor_a)
        add_result = add_const_mod.get_output(0).asnumpy()
        tvm.testing.assert_allclose(add_result, np_tensor_a + 1.0)
    with sess_b:
        add_const_mod = relay_micro_build(add_const_func, DEV_CONFIG_B)
        add_const_mod.run(x=micro_tensor_b)
        add_result = add_const_mod.get_output(0).asnumpy()
        tvm.testing.assert_allclose(add_result, np_tensor_b + 1.0)


def test_nested_sessions():
    """Test entering and exiting nested session contexts."""
    if not tvm.runtime.enabled("micro_dev"):
        return
    shape = (1024,)
    dtype = "float32"

    # Construct Relay add program.
    x = relay.var("x", relay.TensorType(shape=shape, dtype=dtype))
    ret = relay.add(x, relay.const(1.0))
    add_const_func = relay.Function([x], ret)

    sess_a = micro.Session(DEV_CONFIG_A)
    sess_b = micro.Session(DEV_CONFIG_B)
    with sess_a:
        np_tensor_a = np.random.uniform(size=shape).astype(dtype)
        micro_tensor_a = tvm.nd.array(np_tensor_a, tvm.micro_dev(0))
        with sess_b:
            np_tensor_b = np.random.uniform(size=shape).astype(dtype)
            micro_tensor_b = tvm.nd.array(np_tensor_b, tvm.micro_dev(0))
        add_const_mod = relay_micro_build(add_const_func, DEV_CONFIG_A)
        add_const_mod.run(x=micro_tensor_a)
        add_result = add_const_mod.get_output(0).asnumpy()
        tvm.testing.assert_allclose(add_result, np_tensor_a + 1.0)


def test_inactive_session_use():
    """Test the use of objects allocated in a session that is no longer active."""
    if not tvm.runtime.enabled("micro_dev"):
        return
    shape = (1024,)
    dtype = "float32"

    # Construct Relay add program.
    x = relay.var("x", relay.TensorType(shape=shape, dtype=dtype))
    ret = relay.add(x, relay.const(1.0))
    add_const_func = relay.Function([x], ret)

    sess_a = micro.Session(DEV_CONFIG_A)
    sess_b = micro.Session(DEV_CONFIG_B)
    with sess_a:
        np_tensor_a = np.random.uniform(size=shape).astype(dtype)
        micro_tensor_a = tvm.nd.array(np_tensor_a, tvm.micro_dev(0))
        add_const_mod = relay_micro_build(add_const_func, DEV_CONFIG_A)

    with sess_b:
        # These objects belong to `sess_a`.
        add_const_mod.run(x=micro_tensor_a)
        add_result = add_const_mod.get_output(0).asnumpy()
        tvm.testing.assert_allclose(add_result, np_tensor_a + 1.0)


# TODO add workspace alloc/free stress test

if __name__ == "__main__":
    test_alloc()
    print()
    print("finished alloc test")
    input("[press enter to continue]")
    test_add()
    print()
    print("finished add test")
    input("[press enter to continue]")
    test_workspace_add()
    print()
    print("finished workspace add test")
    input("[press enter to continue]")
    test_graph_runtime()
    print()
    print("finished graph runtime test")
    input("[press enter to continue]")
    test_conv2d()
    print()
    print("finished conv2d test")
    input("[press enter to continue]")
    # disabled for now, as these tests are currently broken
    # test_interleave_sessions()
    # print()
    # print("finished interleaved sessions test")
    # input("[press enter to continue]")
    # test_nested_sessions()
    # print()
    # print("finished nested sessions test")
    # input("[press enter to continue]")
    test_inactive_session_use()
    print()
    print("finished use inactive session test")
    input("[press enter to continue]")