/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

/*!
 * \file thread_safety_test.cc
 * \brief test thread safety at the dependency engine level and cached op level
 */

#if MXNET_USE_CPP_PACKAGE == 1
#include <stdio.h>
#include <gtest/gtest.h>
#include <mxnet/ndarray.h>
#include <mxnet/op_attr_types.h>
#include <chrono>
#include <cstdlib>
#include <random>
#include <thread>
#include "../src/engine/engine_impl.h"
#include "../src/imperative/imperative_utils.h"
#include "../include/test_util.h"
#include "mxnet-cpp/MxNetCpp.h"

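/*
 * The thread-safe CachedOp flow exercised in the tests below is roughly:
 *   MXCreateCachedOpEX(sym_handle, num_flags, keys, vals, &hdl, true);      // thread_safe = true
 *   MXInvokeCachedOpEx(hdl, num_inputs, inputs, &num_out, &outs, &stypes);  // called from many threads
 *   MXFreeCachedOp(hdl);
 */
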
/*
 * Prepares input data for the ops/models used in this file
 */
void prepare_input_data(const mxnet::cpp::Shape& shape, const mxnet::cpp::Context& ctx,
                        int num_threads,
                        std::vector<mxnet::cpp::NDArray>* data_arr,
                        bool random_uniform = false) {
  for (size_t i = 0; i < num_threads; ++i) {
    data_arr->emplace_back(shape, ctx, false, 0);
    int begin = i * 100;
    int end = begin + 100;
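    // When requested, fill this thread's array from the distinct range
    // [begin, end) = [i*100, i*100 + 100) so the per-thread inputs differ.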
    if (random_uniform) {
      mxnet::cpp::Operator("_random_uniform")(begin, end).Invoke((*data_arr)[i]);
    }
    mxnet::cpp::NDArray::WaitAll();
  }
}

void prepare_output_data(const mxnet::cpp::Shape& shape, const mxnet::cpp::Context& ctx,
                         int num_threads,
                         std::vector<mxnet::cpp::NDArray>* output_arr) {
  for (size_t i = 0; i < num_threads; ++i) {
    output_arr->emplace_back(shape, ctx, false, 0);
    mxnet::cpp::NDArray::WaitAll();
  }
}

/*
 * Prepare backend ndarrays from cpp frontend ndarrays
 */
void prepare_backend_data(const std::vector<mxnet::cpp::NDArray> &input_cpp_arrs,
                          int num_threads,
                          std::vector<mxnet::NDArray *> *output_backend_arrs) {
  output_backend_arrs->resize(num_threads);
  for (size_t i = 0; i < num_threads; ++i) {
    (*output_backend_arrs)[i] = static_cast<NDArray *>(input_cpp_arrs[i].GetHandle());
  }
}

/*
 * Create a CachedOp and invoke it once per thread's inputs to produce
 * reference results for the threaded runs below
 */
void get_expected_results(const mxnet::cpp::Symbol &sym,
                          const std::vector<std::string> &flag_keys,
                          const std::vector<std::string> &flag_vals,
                          int num_threads,
                          std::vector<std::vector<NDArrayHandle>> *arr_handles,
                          std::vector<mxnet::NDArray*> *result_expected,
                          CachedOpHandle* hdl) {
  // prepare flag_keys and flag_vals
  std::vector<const char *> flag_key_cstrs, flag_val_cstrs;
  flag_key_cstrs.reserve(flag_keys.size());
  for (size_t i = 0; i < flag_keys.size(); ++i) {
    flag_key_cstrs.emplace_back(flag_keys[i].c_str());
  }
  for (size_t i = 0; i < flag_vals.size(); ++i) {
    flag_val_cstrs.emplace_back(flag_vals[i].c_str());
  }

  // Create CachedOp
  int ret1 = MXCreateCachedOpEx(sym.GetHandle(), flag_keys.size(),
                                flag_key_cstrs.data(), flag_val_cstrs.data(),
                                hdl);
  if (ret1 < 0) {
    LOG(FATAL) << MXGetLastError();
  }

  std::vector<NDArrayHandle *> nd_ptrs(num_threads);

  // Invoke CachedOp same number of times as number of threads
  for (size_t i = 0; i < num_threads; ++i) {
    int num_output = 0;
    const int *stypes;
    int ret4 = MXInvokeCachedOpEx(*hdl, (*arr_handles)[i].size(), (*arr_handles)[i].data(),
                                  &num_output, &nd_ptrs[i], &stypes);
    if (ret4 < 0) {
      LOG(FATAL) << MXGetLastError();
    }
    mxnet::cpp::NDArray::WaitAll();
    (*result_expected)[i] = static_cast<NDArray*>(*nd_ptrs[i]);
  }
}

/*
 * Create a CachedOp and invoke it serially, once per (inference, thread) pair,
 * to produce the reference results the multithreaded runs are checked against
 */
inline void get_expected_results_multiple(
    const mxnet::cpp::Symbol &sym,
    const std::vector<std::string> &flag_keys, const std::vector<std::string> &flag_vals,
    std::vector<std::vector<std::vector<NDArrayHandle>>> *arr_handles,
    int num_threads,
    std::vector<std::vector<mxnet::NDArray *>> *result_expected,
    CachedOpHandle *hdl) {
  // prepare flag_keys and flag_vals
  std::vector<const char *> flag_key_cstrs, flag_val_cstrs;
  flag_key_cstrs.reserve(flag_keys.size());
  flag_val_cstrs.reserve(flag_vals.size());
  for (size_t i = 0; i < flag_keys.size(); ++i) {
    flag_key_cstrs.emplace_back(flag_keys[i].c_str());
  }
  for (size_t i = 0; i < flag_vals.size(); ++i) {
    flag_val_cstrs.emplace_back(flag_vals[i].c_str());
  }

  // Create CachedOp
  int ret1 =
      MXCreateCachedOpEX(sym.GetHandle(), flag_keys.size(),
                         flag_key_cstrs.data(), flag_val_cstrs.data(), hdl, false);
  if (ret1 < 0) {
    LOG(FATAL) << MXGetLastError();
  }
  std::vector<std::vector<NDArrayHandle *>> nd_ptrs((*arr_handles).size());

  // Invoke the CachedOp once per (inference, thread) pair
  for (size_t i = 0; i < (*arr_handles).size(); ++i) {
    nd_ptrs[i].resize(num_threads);
    (*result_expected)[i].resize(num_threads);
    for (size_t j = 0; j < num_threads; ++j) {
      int num_output = 0;
      const int *stypes;
      int ret4 = MXInvokeCachedOpEx(*hdl, (*arr_handles)[i][j].size(),
                                    (*arr_handles)[i][j].data(), &num_output,
                                    &nd_ptrs[i][j], &stypes);
      if (ret4 < 0) {
        LOG(FATAL) << MXGetLastError();
      }
      mxnet::cpp::NDArray::WaitAll();
      (*result_expected)[i][j] = static_cast<NDArray *>(*nd_ptrs[i][j]);
    }
  }
}

void run_inference(const std::string& model,
                   int num_inf_per_thread = 1, bool random_sleep = false,
                   int num_threads = 1, bool static_alloc = false,
                   bool static_shape = false) {
  // Load model
  LOG(INFO) << "Running inference for " + model +
                   " num_threads: " + std::to_string(num_threads) +
                   " num_inf_per_thread: " + std::to_string(num_inf_per_thread) +
                   " random_sleep: " + std::to_string(random_sleep) +
                   " static_alloc: " + std::to_string(static_alloc) +
                   " static_shape: " + std::to_string(static_shape);
  auto out = mxnet::cpp::Symbol::Load(model + "-symbol.json");
  std::string static_alloc_str = static_alloc ? "true" : "false";
  std::string static_shape_str = static_shape ? "true" : "false";

  // Prepare context
#if MXNET_USE_CUDA == 1
  Context backend_ctx;
  mxnet::cpp::Context ctx = mxnet::cpp::Context::gpu(0);
  if (!mxnet::test::thread_safety_force_cpu) {
    backend_ctx = Context::GPU(0);
    ctx = mxnet::cpp::Context::gpu(0);
  } else {
    backend_ctx = Context::CPU();
    ctx = mxnet::cpp::Context::cpu();
  }
#else
  Context backend_ctx = Context::CPU(0);
  mxnet::cpp::Context ctx = mxnet::cpp::Context::cpu(0);
#endif

  // Prepare input data and parameters
  std::vector<std::vector<mxnet::cpp::NDArray>> data_arr(num_inf_per_thread);
  std::vector<std::vector<mxnet::cpp::NDArray>> softmax_arr(num_inf_per_thread);
  std::vector<mxnet::cpp::NDArray> params;
  mxnet::cpp::Shape data_shape = mxnet::cpp::Shape(1, 3, 224, 224);
  mxnet::cpp::Shape softmax_shape = mxnet::cpp::Shape(1);
  for (size_t i = 0; i < num_inf_per_thread; ++i) {
    prepare_input_data(data_shape, ctx, num_threads, &(data_arr[i]), true);
    prepare_input_data(softmax_shape, ctx, num_threads, &(softmax_arr[i]));
  }
  std::map<std::string, mxnet::cpp::NDArray> parameters;
  mxnet::cpp::NDArray::Load(model + "-0000.params", 0, &parameters);

  for (std::string name : out.ListInputs()) {
    if (name == "arg:data") {
      continue;
    }
    if (parameters.find("arg:" + name) != parameters.end()) {
      params.push_back(parameters["arg:" + name].Copy(ctx));
    } else if (parameters.find("aux:" + name) != parameters.end()) {
      params.push_back(parameters["aux:" + name].Copy(ctx));
    }
  }

  // Prepare data_indices, param_indices and get_expected_results
  std::vector<std::string> flag_keys{"data_indices", "param_indices",
                                     "static_alloc", "static_shape"};
  std::string param_indices = "[";
  std::vector<std::vector<mxnet::NDArray*>> result_expected(num_inf_per_thread);
  int num_inputs = out.ListInputs().size();
  for (size_t i = 1; i < num_inputs; ++i) {
    param_indices += std::to_string(i);
    param_indices += std::string(", ");
  }
  param_indices += "]";
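  // param_indices now looks like "[1, 2, ..., num_inputs-1, ]": index 0 is the
  // data input, all remaining inputs are treated as parameters.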
  std::vector<std::string> flag_vals{"[0]", param_indices, static_alloc_str, static_shape_str};
  std::vector<std::vector<std::vector<NDArrayHandle>>> arr_handles(num_inf_per_thread);
  for (size_t i = 0; i < num_inf_per_thread; ++i) {
    arr_handles[i].resize(num_threads);
    for (size_t j = 0; j < num_threads; ++j) {
      arr_handles[i][j].push_back(data_arr[i][j].GetHandle());
      for (size_t k = 1; k < num_inputs - 1; k++) {
        arr_handles[i][j].push_back(params[k - 1].GetHandle());
      }
      arr_handles[i][j].push_back(softmax_arr[i][j].GetHandle());
    }
  }
  CachedOpHandle hdl = CachedOpHandle();
  get_expected_results_multiple(out, flag_keys, flag_vals, &arr_handles,
                                num_threads, &result_expected, &hdl);

  // Create thread-safe cached op
  CachedOpHandle hdl2 = CachedOpHandle();
  std::vector<const char *> flag_key_cstrs, flag_val_cstrs;
  flag_key_cstrs.reserve(flag_keys.size());
  for (size_t i = 0; i < flag_keys.size(); ++i) {
    flag_key_cstrs.emplace_back(flag_keys[i].c_str());
  }
  for (size_t i = 0; i < flag_vals.size(); ++i) {
    flag_val_cstrs.emplace_back(flag_vals[i].c_str());
  }

  int ret1 = MXCreateCachedOpEX(out.GetHandle(), flag_keys.size(),
                                flag_key_cstrs.data(), flag_val_cstrs.data(),
                                &hdl2, true);
  if (ret1 < 0) {
    LOG(FATAL) << MXGetLastError();
  }

  // Prepare data structures and lambda to run in different threads
  std::vector<NDArrayHandle *> cached_op_handles(num_threads * num_inf_per_thread);
  std::vector<std::vector<std::vector<mx_float>>> temp(num_inf_per_thread);
  std::vector<std::vector<mxnet::NDArray*>> output_mx_arr(num_inf_per_thread);
  for (size_t i = 0; i < num_inf_per_thread; i++) {
    output_mx_arr[i].resize(num_threads);
    temp[i].resize(num_threads);
    for (size_t j = 0; j < num_threads; ++j) {
      temp[i][j].resize(1000);
    }
  }

  std::vector<std::vector<std::vector<NDArrayHandle>>> arr_handles2(num_inf_per_thread);
  for (size_t i = 0; i < num_inf_per_thread; ++i) {
    arr_handles2[i].resize(num_threads);
    for (size_t j = 0; j < num_threads; ++j) {
      arr_handles2[i][j].reserve(num_inputs);
      arr_handles2[i][j].emplace_back(data_arr[i][j].GetHandle());
      for (size_t k = 1; k < num_inputs - 1; ++k) {
        arr_handles2[i][j].emplace_back(params[k - 1].GetHandle());
      }
      arr_handles2[i][j].emplace_back(softmax_arr[i][j].GetHandle());
    }
  }
  std::vector<mxnet::NDArray> data(num_inf_per_thread * num_threads);
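  // Worker body: thread `num` runs num_inf_per_thread inferences on the
  // thread-safe cached op and stores each output at the flattened index
  // i * num_threads + num.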
  auto func = [&](int num) {
    unsigned next = num;
    for (size_t i = 0; i < num_inf_per_thread; ++i) {
      if (random_sleep) {
        static thread_local std::mt19937 generator;
        std::uniform_int_distribution<int> distribution(0, 5);
        int sleep_time = distribution(generator);
        std::this_thread::sleep_for(std::chrono::seconds(sleep_time));
      }
      int num_output = 0;
      const int *stypes;
      int ret = MXInvokeCachedOpEx(
          hdl2, arr_handles2[i][num].size(), arr_handles2[i][num].data(),
          &num_output, &(cached_op_handles[i * num_threads + num]), &stypes);
      if (ret < 0) {
        LOG(FATAL) << MXGetLastError();
      }
      output_mx_arr[i][num] = static_cast<mxnet::NDArray *>(
          *cached_op_handles[i * num_threads + num]);
    }
  };

  // Spawn multiple threads, join and wait for all threads to complete
  std::vector<std::thread> worker_threads(num_threads);
  int count = 0;
  for (auto &&i : worker_threads) {
    i = std::thread(func, count);
    count++;
  }

  for (auto &&i : worker_threads) {
    i.join();
  }

  mxnet::cpp::NDArray::WaitAll();
  for (size_t i = 0; i < num_inf_per_thread; i++) {
    mxnet::test::AssertEqual(output_mx_arr[i], result_expected[i], 1e-2, 1e-5);
  }
  mxnet::cpp::NDArray::WaitAll();
  int ret2 = MXFreeCachedOp(hdl);
  if (ret2 < 0) {
    LOG(FATAL) << MXGetLastError();
  }

  ret2 = MXFreeCachedOp(hdl2);
  if (ret2 < 0) {
    LOG(FATAL) << MXGetLastError();
  }
}

void run_inference_unsupported(const std::string& model,
                               int num_inf_per_thread = 1, bool random_sleep = false,
                               int num_threads = 1, bool static_alloc = false,
                               bool static_shape = false) {
  // Load model
  LOG(INFO) << "Running inference for " + model +
                   " num_threads: " + std::to_string(num_threads) +
                   " num_inf_per_thread: " + std::to_string(num_inf_per_thread) +
                   " random_sleep: " + std::to_string(random_sleep) +
                   " static_alloc: " + std::to_string(static_alloc) +
                   " static_shape: " + std::to_string(static_shape);
  auto out = mxnet::cpp::Symbol::Load(model + "-symbol.json");
  std::string static_alloc_str = static_alloc ? "true" : "false";
  std::string static_shape_str = static_shape ? "true" : "false";

  // Prepare context
#if MXNET_USE_CUDA == 1
  Context backend_ctx;
  mxnet::cpp::Context ctx = mxnet::cpp::Context::gpu(0);
  if (!mxnet::test::thread_safety_force_cpu) {
    backend_ctx = Context::GPU(0);
    ctx = mxnet::cpp::Context::gpu(0);
  } else {
    backend_ctx = Context::CPU();
    ctx = mxnet::cpp::Context::cpu();
  }
#else
  Context backend_ctx = Context::CPU(0);
  mxnet::cpp::Context ctx = mxnet::cpp::Context::cpu(0);
#endif

  // Prepare input data and parameters
  std::vector<std::vector<mxnet::cpp::NDArray>> data_arr(num_inf_per_thread);
  std::vector<std::vector<mxnet::cpp::NDArray>> softmax_arr(num_inf_per_thread);
  std::vector<mxnet::cpp::NDArray> params;
  mxnet::cpp::Shape data_shape = mxnet::cpp::Shape(1, 3, 224, 224);
  mxnet::cpp::Shape softmax_shape = mxnet::cpp::Shape(1);
  for (size_t i = 0; i < num_inf_per_thread; ++i) {
    prepare_input_data(data_shape, ctx, num_threads, &(data_arr[i]), true);
    prepare_input_data(softmax_shape, ctx, num_threads, &(softmax_arr[i]));
  }
  std::map<std::string, mxnet::cpp::NDArray> parameters;
  mxnet::cpp::NDArray::Load(model + "-0000.params", 0, &parameters);

  for (std::string name : out.ListInputs()) {
    if (name == "arg:data") {
      continue;
    }
    if (parameters.find("arg:" + name) != parameters.end()) {
      params.push_back(parameters["arg:" + name].Copy(ctx));
    } else if (parameters.find("aux:" + name) != parameters.end()) {
      params.push_back(parameters["aux:" + name].Copy(ctx));
    }
  }

  // Prepare data_indices, param_indices and get_expected_results
  std::vector<std::string> flag_keys{"data_indices", "param_indices",
                                     "static_alloc", "static_shape"};
  std::string param_indices = "[";
  std::vector<std::vector<mxnet::NDArray*>> result_expected(num_inf_per_thread);
  int num_inputs = out.ListInputs().size();
  for (size_t i = 1; i < num_inputs; ++i) {
    param_indices += std::to_string(i);
    param_indices += std::string(", ");
  }
  param_indices += "]";
  std::vector<std::string> flag_vals{"[0]", param_indices, static_alloc_str, static_shape_str};
  std::vector<std::vector<std::vector<NDArrayHandle>>> arr_handles(num_inf_per_thread);
  for (size_t i = 0; i < num_inf_per_thread; ++i) {
    arr_handles[i].resize(num_threads);
    for (size_t j = 0; j < num_threads; ++j) {
      arr_handles[i][j].push_back(data_arr[i][j].GetHandle());
      for (size_t k = 1; k < num_inputs - 1; k++) {
        arr_handles[i][j].push_back(params[k - 1].GetHandle());
      }
      arr_handles[i][j].push_back(softmax_arr[i][j].GetHandle());
    }
  }
  CachedOpHandle hdl = CachedOpHandle();
  get_expected_results_multiple(out, flag_keys, flag_vals, &arr_handles,
                                num_threads, &result_expected, &hdl);

  // Create thread-safe cached op
  CachedOpHandle hdl2 = CachedOpHandle();

  // Prepare data structures and lambda to run in different threads
  std::vector<NDArrayHandle *> cached_op_handles(num_threads * num_inf_per_thread);
  std::vector<std::vector<mxnet::NDArray*>> output_mx_arr(num_inf_per_thread);
  for (size_t i = 0; i < num_inf_per_thread; i++) {
    output_mx_arr[i].resize(num_threads);
  }

  std::vector<std::vector<std::vector<NDArrayHandle>>> arr_handles2(num_inf_per_thread);
  for (size_t i = 0; i < num_inf_per_thread; ++i) {
    arr_handles2[i].resize(num_threads);
    for (size_t j = 0; j < num_threads; ++j) {
      arr_handles2[i][j].reserve(num_inputs);
      arr_handles2[i][j].emplace_back(data_arr[i][j].GetHandle());
      for (size_t k = 1; k < num_inputs - 1; ++k) {
        arr_handles2[i][j].emplace_back(params[k - 1].GetHandle());
      }
      arr_handles2[i][j].emplace_back(softmax_arr[i][j].GetHandle());
    }
  }
  std::vector<mxnet::NDArray> data(num_inf_per_thread * num_threads);
  std::mutex mutex_;
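  // Unsupported pattern: each worker creates the thread-safe cached op lazily,
  // with no synchronization around hdl2; the commented-out lock_guard below is
  // the workaround for the resulting race.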
  auto func = [&](int num) {
    std::vector<const char *> flag_key_cstrs, flag_val_cstrs;
    flag_key_cstrs.reserve(flag_keys.size());
    for (size_t i = 0; i < flag_keys.size(); ++i) {
      flag_key_cstrs.emplace_back(flag_keys[i].c_str());
    }
    for (size_t i = 0; i < flag_vals.size(); ++i) {
      flag_val_cstrs.emplace_back(flag_vals[i].c_str());
    }

    {
      // Uncomment the lock below to work around the unsynchronized creation of hdl2
      /*
      std::lock_guard<std::mutex> lock{mutex_};
      */

      if (hdl2 == nullptr) {
        int ret1 = MXCreateCachedOpEX(out.GetHandle(), flag_keys.size(),
                                      flag_key_cstrs.data(),
                                      flag_val_cstrs.data(), &hdl2, true);
        if (ret1 < 0) {
          LOG(FATAL) << MXGetLastError();
        }
      }
    }

    unsigned next = num;
    for (size_t i = 0; i < num_inf_per_thread; ++i) {
      if (random_sleep) {
        static thread_local std::mt19937 generator;
        std::uniform_int_distribution<int> distribution(0, 5);
        int sleep_time = distribution(generator);
        std::this_thread::sleep_for(std::chrono::seconds(sleep_time));
      }
      int num_output = 0;
      const int *stypes;
      int ret = MXInvokeCachedOpEx(
          hdl2, arr_handles2[i][num].size(), arr_handles2[i][num].data(),
          &num_output, &(cached_op_handles[i * num_threads + num]), &stypes);
      if (ret < 0) {
        LOG(FATAL) << MXGetLastError();
      }
      mxnet::cpp::NDArray::WaitAll();
      output_mx_arr[i][num] = static_cast<mxnet::NDArray *>(
          *cached_op_handles[i * num_threads + num]);
    }
  };

  // Spawn multiple threads, join and wait for all threads to complete
  std::vector<std::thread> worker_threads(num_threads);
  int count = 0;
  for (auto &&i : worker_threads) {
    i = std::thread(func, count);
    count++;
  }

  for (auto &&i : worker_threads) {
    i.join();
  }

  mxnet::cpp::NDArray::WaitAll();
  for (size_t i = 0; i < num_inf_per_thread; i++) {
    mxnet::test::AssertEqual(output_mx_arr[i], result_expected[i], 1e-2, 1e-5);
  }
  mxnet::cpp::NDArray::WaitAll();
  int ret2 = MXFreeCachedOp(hdl);
  if (ret2 < 0) {
    LOG(FATAL) << MXGetLastError();
  }

  ret2 = MXFreeCachedOp(hdl2);
  if (ret2 < 0) {
    LOG(FATAL) << MXGetLastError();
  }
}

/**
 * Verifying engine thread safety by pushing ops from multiple threads to the
 * dependency engine
 */
TEST(ThreadSafety, Engine) {
  int num_threads = 20;
#if MXNET_USE_CUDA == 1
  Context backend_ctx;
  mxnet::cpp::Context ctx = mxnet::cpp::Context::gpu(0);
  DispatchMode dispatch_mode;
  if (!mxnet::test::thread_safety_force_cpu) {
    backend_ctx = Context::GPU(0);
    ctx = mxnet::cpp::Context::gpu(0);
    dispatch_mode = DispatchMode::kFCompute;
  } else {
    backend_ctx = Context::CPU();
    ctx = mxnet::cpp::Context::cpu();
    dispatch_mode = DispatchMode::kFComputeEx;
  }
#else
  Context backend_ctx = Context::CPU(0);
  mxnet::cpp::Context ctx = mxnet::cpp::Context::cpu(0);
  DispatchMode dispatch_mode = DispatchMode::kFComputeEx;
#endif
  // Prepare convolution op and parse attrs
  const nnvm::Op *op = Op::Get("Convolution");
  nnvm::NodeAttrs attrs;
  attrs.op = op;
  attrs.name = "conv_node1";
  std::unordered_map<std::string, std::string> params = {
      {"kernel", "(2,2)"}, {"no_bias", "0"}, {"dilate", "(1,1)"},
      {"num_group", "1"}, {"layout", "NCHW"}, {"stride", "(1,1)"},
      {"pad", "(0,0)"}, {"num_filter", "10"}};
  attrs.dict = params;
  op->attr_parser(&attrs);

  // Prepare input data
  std::vector<mxnet::cpp::NDArray> data_arr, weight_arr, bias_arr, output_arr;
  mxnet::cpp::Shape data_shape(2, 4, 10, 10);
  mxnet::cpp::Shape weight_shape(10, 4, 2, 2);
  mxnet::cpp::Shape bias_shape(10);
  mxnet::cpp::Shape output_shape(2, 10, 9, 9);

  prepare_input_data(data_shape, ctx, num_threads, &data_arr, true);
  prepare_input_data(weight_shape, ctx, num_threads, &weight_arr, true);
  prepare_input_data(bias_shape, ctx, num_threads, &bias_arr, true);
  prepare_output_data(output_shape, ctx, num_threads, &output_arr);

  // Prepare symbol
  mxnet::cpp::Symbol data = mxnet::cpp::Symbol::Variable("data");
  mxnet::cpp::Symbol weight = mxnet::cpp::Symbol::Variable("weight");
  mxnet::cpp::Symbol bias = mxnet::cpp::Symbol::Variable("bias");
  auto out = mxnet::cpp::Operator("Convolution")
                 .SetParam("kernel", mxnet::cpp::Shape(2, 2))
                 .SetParam("no_bias", false)
                 .SetParam("dilate", mxnet::cpp::Shape(1, 1))
                 .SetParam("num_group", 1)
                 .SetParam("layout", "NCHW")
                 .SetParam("stride", mxnet::cpp::Shape(1, 1))
                 .SetParam("pad", mxnet::cpp::Shape(0, 0))
                 .SetParam("num_filter", 10)
                 .SetInput("data", data)
                 .SetInput("weight", weight)
                 .SetInput("bias", bias)
                 .CreateSymbol("fwd");

  // Prepare data_indices, param_indices and get_expected_results
  std::vector<std::string> flag_keys{"data_indices", "param_indices"};
  std::vector<std::string> flag_vals{"[0]", "[1,2]"};
  std::vector<mxnet::NDArray*> result_expected(num_threads);

  std::vector<std::vector<NDArrayHandle>> arr_handles(num_threads);
  for (size_t i = 0; i < num_threads; ++i) {
    arr_handles[i].push_back(data_arr[i].GetHandle());
    arr_handles[i].push_back(weight_arr[i].GetHandle());
    arr_handles[i].push_back(bias_arr[i].GetHandle());
  }
  CachedOpHandle hdl = CachedOpHandle();
  get_expected_results(out, flag_keys, flag_vals, num_threads,
                       &arr_handles, &result_expected, &hdl);

  // Prepare backend NDArray inputs
  std::vector<mxnet::NDArray*> data_mx_arr, weight_mx_arr, bias_mx_arr, output_mx_arr;
  prepare_backend_data(data_arr, num_threads, &data_mx_arr);
  prepare_backend_data(weight_arr, num_threads, &weight_mx_arr);
  prepare_backend_data(bias_arr, num_threads, &bias_mx_arr);
  prepare_backend_data(output_arr, num_threads, &output_mx_arr);

  // Prepare func which Invokes op
  auto func = [&](int num) {
    std::vector<mxnet::NDArray *> tmp_inputs, tmp_outputs;
    tmp_inputs.emplace_back(data_mx_arr[num]);
    tmp_inputs.emplace_back(weight_mx_arr[num]);
    tmp_inputs.emplace_back(bias_mx_arr[num]);
    tmp_outputs.emplace_back(output_mx_arr[num]);
    std::vector<OpReqType> reqs;
    reqs.push_back(kWriteTo);
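    // Push the convolution onto the dependency engine directly from this thread.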
    Imperative::Get()->InvokeOp(backend_ctx, attrs, tmp_inputs, tmp_outputs,
                                reqs, dispatch_mode, OpStatePtr());
  };

  // Spawn multiple threads
  std::vector<std::thread> worker_threads(num_threads);
  int count = 0;
  for (auto &&i : worker_threads) {
    i = std::thread(func, count);
    count++;
  }

  for (auto &&i : worker_threads) {
    i.join();
  }

  mxnet::cpp::NDArray::WaitAll();
  mxnet::test::AssertEqual(output_mx_arr, result_expected, 1e-2, 1e-5);
  mxnet::cpp::NDArray::WaitAll();
}

TEST(ThreadSafety, CachedOpFullModel) {
  std::vector<std::string> models_list = {
      "imagenet1k-resnet-18", "imagenet1k-resnet-152", "imagenet1k-resnet-50"};
  if (mxnet::test::thread_safety_force_cpu) {
    models_list.push_back("imagenet1k-resnet-152-subgraph");
  }
  for (const auto &model : models_list) {
    run_inference(model, 1, true, 20);
    run_inference(model, 2, true, 20);
    run_inference(model, 4, true, 5);
    run_inference(model, 4, true, 20);
    run_inference(model, 4, false, 20);
    run_inference(model, 8, true, 20);
    // static_alloc = true
    run_inference(model, 2, true, 20, true);
    run_inference(model, 4, true, 5, true);
    run_inference(model, 4, true, 20, true);
    run_inference(model, 8, true, 20, true);
    // static_alloc = true, static_shape = true
    run_inference(model, 4, true, 20, true, true);
    run_inference(model, 8, true, 20, true, true);
    // the below line may hang
    // run_inference_unsupported(model, 32, false, 20);
  }
}
#endif