1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied.  See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 /*!
21  *  \file thread_safety_test.cc
22  *  \brief test thread safety at the dependency engine level and cached op level
23  */
24 
25 #if MXNET_USE_CPP_PACKAGE == 1
26 #include <stdio.h>
27 #include <gtest/gtest.h>
28 #include <mxnet/ndarray.h>
29 #include <mxnet/op_attr_types.h>
30 #include <chrono>
31 #include <cstdlib>
32 #include <random>
33 #include <thread>
34 #include "../src/engine/engine_impl.h"
35 #include "../src/imperative/imperative_utils.h"
36 #include "../include/test_util.h"
37 #include "mxnet-cpp/MxNetCpp.h"
38 
39 /*
40  * Prepares input data for the ops/models used in this file
41  */
prepare_input_data(const mxnet::cpp::Shape & shape,const mxnet::cpp::Context & ctx,int num_threads,std::vector<mxnet::cpp::NDArray> * data_arr,bool random_uniform=false)42 void prepare_input_data(const mxnet::cpp::Shape& shape, const mxnet::cpp::Context& ctx,
43                         int num_threads,
44                         std::vector<mxnet::cpp::NDArray>* data_arr,
45                         bool random_uniform = false) {
46   for (size_t i = 0; i < num_threads; ++i) {
47     data_arr->emplace_back(shape, ctx, false, 0);
48     int begin = i * 100;
49     int end = begin + 100;
50     if (random_uniform) {
51       mxnet::cpp::Operator("_random_uniform")(begin, end).Invoke((*data_arr)[i]);
52     }
53     mxnet::cpp::NDArray::WaitAll();
54   }
55 }
56 
prepare_output_data(const mxnet::cpp::Shape & shape,const mxnet::cpp::Context & ctx,int num_threads,std::vector<mxnet::cpp::NDArray> * output_arr)57 void prepare_output_data(const mxnet::cpp::Shape& shape, const mxnet::cpp::Context& ctx,
58                          int num_threads,
59                          std::vector<mxnet::cpp::NDArray>* output_arr) {
60     for (size_t i = 0; i < num_threads; ++i) {
61         output_arr->emplace_back(shape, ctx, false, 0);
62         mxnet::cpp::NDArray::WaitAll();
63     }
64 }
65 
66 /*
67  * Prepare backend ndarrays from cpp frontend ndarrays
68  */
prepare_backend_data(const std::vector<mxnet::cpp::NDArray> & input_cpp_arrs,int num_threads,std::vector<mxnet::NDArray * > * output_backend_arrs)69 void prepare_backend_data(const std::vector<mxnet::cpp::NDArray> &input_cpp_arrs,
70                           int num_threads,
71                           std::vector<mxnet::NDArray *> *output_backend_arrs) {
72   output_backend_arrs->resize(num_threads);
73   for (size_t i = 0; i < num_threads; ++i) {
74     (*output_backend_arrs)[i] = static_cast<NDArray *>(input_cpp_arrs[i].GetHandle());
75   }
76 }
77 
78 /*
79  * Create and Invoke CachedOp for given data
80  */
get_expected_results(const mxnet::cpp::Symbol & sym,const std::vector<std::string> & flag_keys,const std::vector<std::string> & flag_vals,int num_threads,std::vector<std::vector<NDArrayHandle>> * arr_handles,std::vector<mxnet::NDArray * > * result_expected,CachedOpHandle * hdl)81 void get_expected_results(const mxnet::cpp::Symbol &sym,
82                           const std::vector<std::string> &flag_keys,
83                           const std::vector<std::string> &flag_vals,
84                           int num_threads,
85                           std::vector<std::vector<NDArrayHandle>> *arr_handles,
86                           std::vector<mxnet::NDArray*> *result_expected,
87                           CachedOpHandle* hdl) {
88   // prepare flag_keys and flag_vals
89   std::vector<const char *> flag_key_cstrs, flag_val_cstrs;
90   flag_key_cstrs.reserve(flag_keys.size());
91   for (size_t i = 0; i < flag_keys.size(); ++i) {
92     flag_key_cstrs.emplace_back(flag_keys[i].c_str());
93   }
94   for (size_t i = 0; i < flag_vals.size(); ++i) {
95     flag_val_cstrs.emplace_back(flag_vals[i].c_str());
96   }
97 
98   // Create CachedOp
99   int ret1 = MXCreateCachedOpEx(sym.GetHandle(), flag_keys.size(),
100                                 flag_key_cstrs.data(), flag_val_cstrs.data(),
101                                 hdl);
102   if (ret1 < 0) {
103     LOG(FATAL) << MXGetLastError();
104   }
105 
106   std::vector<NDArrayHandle *> nd_ptrs(num_threads);
107 
108   // Invoke CachedOp same number of times as number of threads
109   for (size_t i = 0; i < num_threads; ++i) {
110     int num_output = 0;
111     const int *stypes;
112     int ret4 = MXInvokeCachedOpEx(*hdl, (*arr_handles)[i].size(), (*arr_handles)[i].data(),
113                                   &num_output, &nd_ptrs[i], &stypes);
114     if (ret4 < 0) {
115       LOG(FATAL) << MXGetLastError();
116     }
117     mxnet::cpp::NDArray::WaitAll();
118     (*result_expected)[i] = static_cast<NDArray*>(*nd_ptrs[i]);
119   }
120 }
121 
122 /*
123  * Create and Invoke CachedOp for multiple threads, each thread with multiple
124  * inferences
125  */
get_expected_results_multiple(const mxnet::cpp::Symbol & sym,const std::vector<std::string> & flag_keys,const std::vector<std::string> & flag_vals,std::vector<std::vector<std::vector<NDArrayHandle>>> * arr_handles,int num_threads,std::vector<std::vector<mxnet::NDArray * >> * result_expected,CachedOpHandle * hdl)126 inline void get_expected_results_multiple(
127     const mxnet::cpp::Symbol &sym,
128     const std::vector<std::string> &flag_keys, const std::vector<std::string> &flag_vals,
129     std::vector<std::vector<std::vector<NDArrayHandle>>> *arr_handles,
130     int num_threads,
131     std::vector<std::vector<mxnet::NDArray *>> *result_expected,
132     CachedOpHandle *hdl) {
133   // prepare flag_keys and flag_vals
134   std::vector<const char *> flag_key_cstrs, flag_val_cstrs;
135   flag_key_cstrs.reserve(flag_keys.size());
136   flag_val_cstrs.reserve(flag_vals.size());
137   for (size_t i = 0; i < flag_keys.size(); ++i) {
138     flag_key_cstrs.emplace_back(flag_keys[i].c_str());
139   }
140   for (size_t i = 0; i < flag_vals.size(); ++i) {
141     flag_val_cstrs.emplace_back(flag_vals[i].c_str());
142   }
143 
144   // Create CachedOp
145   int ret1 =
146       MXCreateCachedOpEX(sym.GetHandle(), flag_keys.size(),
147                          flag_key_cstrs.data(), flag_val_cstrs.data(), hdl, false);
148   if (ret1 < 0) {
149     LOG(FATAL) << MXGetLastError();
150   }
151   std::vector<std::vector<NDArrayHandle *>> nd_ptrs((*arr_handles).size());
152 
153   // Invoke CachedOp same number of times as number of threads
154   for (size_t i = 0; i < (*arr_handles).size(); ++i) {
155     nd_ptrs[i].resize(num_threads);
156     (*result_expected)[i].resize(num_threads);
157     for (size_t j = 0; j < num_threads; ++j) {
158       int num_output = 0;
159       const int *stypes;
160       int ret4 = MXInvokeCachedOpEx(*hdl, (*arr_handles)[i][j].size(),
161                                     (*arr_handles)[i][j].data(), &num_output,
162                                     &nd_ptrs[i][j], &stypes);
163       if (ret4 < 0) {
164         LOG(FATAL) << MXGetLastError();
165       }
166       mxnet::cpp::NDArray::WaitAll();
167       (*result_expected)[i][j] = static_cast<NDArray *>(*nd_ptrs[i][j]);
168     }
169   }
170 }
171 
run_inference(const std::string & model,int num_inf_per_thread=1,bool random_sleep=false,int num_threads=1,bool static_alloc=false,bool static_shape=false)172 void run_inference(const std::string& model,
173                    int num_inf_per_thread = 1, bool random_sleep = false,
174                    int num_threads = 1, bool static_alloc = false,
175                    bool static_shape = false) {
176     // Load model
177     LOG(INFO) << "Running inference for " + model +
178                  " num_threads: " + std::to_string(num_threads) +
179                  " num_inf_per_thread: " + std::to_string(num_inf_per_thread) +
180                  " random_sleep: " + std::to_string(random_sleep) +
181                  " static_alloc: " + std::to_string(static_alloc) +
182                  " static_shape: " + std::to_string(static_shape);
183     auto out = mxnet::cpp::Symbol::Load(model + "-symbol.json");
184     std::string static_alloc_str = static_alloc ? "true" : "false";
185     std::string static_shape_str = static_shape ? "true" : "false";
186 
187     // Prepare context
188 #if MXNET_USE_CUDA == 1
189     Context backend_ctx;
190     mxnet::cpp::Context ctx = mxnet::cpp::Context::gpu(0);
191     if (!mxnet::test::thread_safety_force_cpu) {
192       backend_ctx = Context::GPU(0);
193       ctx = mxnet::cpp::Context::gpu(0);
194     } else {
195       backend_ctx = Context::CPU();
196       ctx = mxnet::cpp::Context::cpu();
197     }
198 #else
199     Context backend_ctx = Context::CPU(0);
200     mxnet::cpp::Context ctx = mxnet::cpp::Context::cpu(0);
201 #endif
202 
203     // Prepare input data and parameters
204     std::vector<std::vector<mxnet::cpp::NDArray>> data_arr(num_inf_per_thread);
205     std::vector<std::vector<mxnet::cpp::NDArray>> softmax_arr(num_inf_per_thread);
206     std::vector<mxnet::cpp::NDArray> params;
207     mxnet::cpp::Shape data_shape = mxnet::cpp::Shape(1, 3, 224, 224);
208     mxnet::cpp::Shape softmax_shape = mxnet::cpp::Shape(1);
209     for (size_t i = 0; i < num_inf_per_thread; ++i) {
210      prepare_input_data(data_shape, ctx, num_threads, &(data_arr[i]), true);
211      prepare_input_data(softmax_shape, ctx, num_threads, &(softmax_arr[i]));
212     }
213     std::map<std::string, mxnet::cpp::NDArray> parameters;
214     mxnet::cpp::NDArray::Load(model + "-0000.params", 0, &parameters);
215 
216     for (std::string name : out.ListInputs()) {
217         if (name == "arg:data") {
218             continue;
219         }
220         if (parameters.find("arg:" + name) != parameters.end()) {
221             params.push_back(parameters["arg:" + name].Copy(ctx));
222         } else if (parameters.find("aux:" + name) != parameters.end()) {
223             params.push_back(parameters["aux:" + name].Copy(ctx));
224         }
225     }
226 
227     // Prepare data_indices, param_indices and get_expected_results
228     std::vector<std::string> flag_keys{"data_indices", "param_indices",
229                                        "static_alloc", "static_shape"};
230     std::string param_indices = "[";
231     std::vector<std::vector<mxnet::NDArray*>> result_expected(num_inf_per_thread);
232     int num_inputs = out.ListInputs().size();
233     for (size_t i = 1; i < num_inputs; ++i) {
234       param_indices += std::to_string(i);
235       param_indices += std::string(", ");
236     }
237     param_indices += "]";
238     std::vector<std::string> flag_vals{"[0]", param_indices, static_alloc_str, static_shape_str};
239     std::vector<std::vector<std::vector<NDArrayHandle>>> arr_handles(num_inf_per_thread);
240     for (size_t i = 0; i < num_inf_per_thread; ++i) {
241       arr_handles[i].resize(num_threads);
242       for (size_t j = 0; j < num_threads; ++j) {
243         arr_handles[i][j].push_back(data_arr[i][j].GetHandle());
244         for (size_t k = 1; k < num_inputs - 1; k++) {
245           arr_handles[i][j].push_back(params[k - 1].GetHandle());
246         }
247         arr_handles[i][j].push_back(softmax_arr[i][j].GetHandle());
248       }
249     }
250     CachedOpHandle hdl = CachedOpHandle();
251     get_expected_results_multiple(out, flag_keys, flag_vals, &arr_handles,
252                                   num_threads, &result_expected, &hdl);
253 
254 
255     // Create thread safe cahced op
256     CachedOpHandle hdl2 = CachedOpHandle();
257     std::vector<const char *> flag_key_cstrs, flag_val_cstrs;
258     flag_key_cstrs.reserve(flag_keys.size());
259     for (size_t i = 0; i < flag_keys.size(); ++i) {
260       flag_key_cstrs.emplace_back(flag_keys[i].c_str());
261     }
262     for (size_t i = 0; i < flag_vals.size(); ++i) {
263       flag_val_cstrs.emplace_back(flag_vals[i].c_str());
264     }
265 
266     int ret1 = MXCreateCachedOpEX(out.GetHandle(), flag_keys.size(),
267                                   flag_key_cstrs.data(), flag_val_cstrs.data(),
268                                   &hdl2, true);
269     if (ret1 < 0) {
270       LOG(FATAL) << MXGetLastError();
271     }
272 
273 
274     // Prepare data structures and lambda to run in different threads
275     std::vector<NDArrayHandle *> cached_op_handles(num_threads * num_inf_per_thread);
276     std::vector<std::vector<std::vector<mx_float>>> temp(num_inf_per_thread);
277     std::vector<std::vector<mxnet::NDArray*>> output_mx_arr(num_inf_per_thread);
278     for (size_t i = 0; i < num_inf_per_thread; i++) {
279         output_mx_arr[i].resize(num_threads);
280         temp[i].resize(num_threads);
281         for (size_t j = 0; j < num_threads; ++j) {
282             temp[i][j].resize(1000);
283         }
284     }
285 
286     std::vector<std::vector<std::vector<NDArrayHandle>>> arr_handles2(num_inf_per_thread);
287     for (size_t i = 0; i < num_inf_per_thread; ++i) {
288         arr_handles2[i].resize(num_threads);
289         for (size_t j = 0; j < num_threads; ++j) {
290             arr_handles2[i][j].reserve(num_inputs);
291             arr_handles2[i][j].emplace_back(data_arr[i][j].GetHandle());
292             for (size_t k = 1; k < num_inputs - 1; ++k) {
293                 arr_handles2[i][j].emplace_back(params[k - 1].GetHandle());
294             }
295             arr_handles2[i][j].emplace_back(softmax_arr[i][j].GetHandle());
296         }
297     }
298     std::vector<mxnet::NDArray> data(num_inf_per_thread * num_threads);
299     auto func = [&](int num) {
300       unsigned next = num;
301       for (size_t i = 0; i < num_inf_per_thread; ++i) {
302         if (random_sleep) {
303           static thread_local std::mt19937 generator;
304           std::uniform_int_distribution<int> distribution(0, 5);
305           int sleep_time = distribution(generator);
306           std::this_thread::sleep_for(std::chrono::seconds(sleep_time));
307         }
308         int num_output = 0;
309         const int *stypes;
310         int ret = MXInvokeCachedOpEx(
311             hdl2, arr_handles2[i][num].size(), arr_handles2[i][num].data(),
312             &num_output, &(cached_op_handles[i * num_threads + num]), &stypes);
313         if (ret < 0) {
314             LOG(FATAL) << MXGetLastError();
315         }
316         output_mx_arr[i][num] = static_cast<mxnet::NDArray *>(
317             *cached_op_handles[i * num_threads + num]);
318       }
319     };
320 
321     // Spawn multiple threads, join and wait for all threads to complete
322     std::vector<std::thread> worker_threads(num_threads);
323     int count = 0;
324     for (auto &&i : worker_threads) {
325       i = std::thread(func, count);
326       count++;
327     }
328 
329     for (auto &&i : worker_threads) {
330       i.join();
331     }
332 
333     mxnet::cpp::NDArray::WaitAll();
334     for (size_t i = 0; i < num_inf_per_thread; i++) {
335       mxnet::test::AssertEqual(output_mx_arr[i], result_expected[i], 1e-2, 1e-5);
336     }
337     mxnet::cpp::NDArray::WaitAll();
338     int ret2 = MXFreeCachedOp(hdl);
339     if (ret2 < 0) {
340       LOG(FATAL) << MXGetLastError();
341     }
342 
343     ret2 = MXFreeCachedOp(hdl2);
344     if (ret2 < 0) {
345       LOG(FATAL) << MXGetLastError();
346     }
347 }
348 
run_inference_unsupported(const std::string & model,int num_inf_per_thread=1,bool random_sleep=false,int num_threads=1,bool static_alloc=false,bool static_shape=false)349 void run_inference_unsupported(const std::string& model,
350                    int num_inf_per_thread = 1, bool random_sleep = false,
351                    int num_threads = 1, bool static_alloc = false,
352                    bool static_shape = false) {
353     // Load model
354     LOG(INFO) << "Running inference for " + model +
355                  " num_threads: " + std::to_string(num_threads) +
356                  " num_inf_per_thread: " + std::to_string(num_inf_per_thread) +
357                  " random_sleep: " + std::to_string(random_sleep) +
358                  " static_alloc: " + std::to_string(static_alloc) +
359                  " static_shape: " + std::to_string(static_shape);
360     auto out = mxnet::cpp::Symbol::Load(model + "-symbol.json");
361     std::string static_alloc_str = static_alloc ? "true" : "false";
362     std::string static_shape_str = static_shape ? "true" : "false";
363 
364     // Prepare context
365 #if MXNET_USE_CUDA == 1
366     Context backend_ctx;
367     mxnet::cpp::Context ctx = mxnet::cpp::Context::gpu(0);
368     if (!mxnet::test::thread_safety_force_cpu) {
369       backend_ctx = Context::GPU(0);
370       ctx = mxnet::cpp::Context::gpu(0);
371     } else {
372       backend_ctx = Context::CPU();
373       ctx = mxnet::cpp::Context::cpu();
374     }
375 #else
376     Context backend_ctx = Context::CPU(0);
377     mxnet::cpp::Context ctx = mxnet::cpp::Context::cpu(0);
378 #endif
379 
380     // Prepare input data and parameters
381     std::vector<std::vector<mxnet::cpp::NDArray>> data_arr(num_inf_per_thread);
382     std::vector<std::vector<mxnet::cpp::NDArray>> softmax_arr(num_inf_per_thread);
383     std::vector<mxnet::cpp::NDArray> params;
384     mxnet::cpp::Shape data_shape = mxnet::cpp::Shape(1, 3, 224, 224);
385     mxnet::cpp::Shape softmax_shape = mxnet::cpp::Shape(1);
386     for (size_t i = 0; i < num_inf_per_thread; ++i) {
387      prepare_input_data(data_shape, ctx, num_threads, &(data_arr[i]), true);
388      prepare_input_data(softmax_shape, ctx, num_threads, &(softmax_arr[i]));
389     }
390     std::map<std::string, mxnet::cpp::NDArray> parameters;
391     mxnet::cpp::NDArray::Load(model + "-0000.params", 0, &parameters);
392 
393     for (std::string name : out.ListInputs()) {
394         if (name == "arg:data") {
395             continue;
396         }
397         if (parameters.find("arg:" + name) != parameters.end()) {
398             params.push_back(parameters["arg:" + name].Copy(ctx));
399         } else if (parameters.find("aux:" + name) != parameters.end()) {
400             params.push_back(parameters["aux:" + name].Copy(ctx));
401         }
402     }
403 
404     // Prepare data_indices, param_indices and get_expected_results
405     std::vector<std::string> flag_keys{"data_indices", "param_indices",
406                                        "static_alloc", "static_shape"};
407     std::string param_indices = "[";
408     std::vector<std::vector<mxnet::NDArray*>> result_expected(num_inf_per_thread);
409     int num_inputs = out.ListInputs().size();
410     for (size_t i = 1; i < num_inputs; ++i) {
411       param_indices += std::to_string(i);
412       param_indices += std::string(", ");
413     }
414     param_indices += "]";
415     std::vector<std::string> flag_vals{"[0]", param_indices, static_alloc_str, static_shape_str};
416     std::vector<std::vector<std::vector<NDArrayHandle>>> arr_handles(num_inf_per_thread);
417     for (size_t i = 0; i < num_inf_per_thread; ++i) {
418       arr_handles[i].resize(num_threads);
419       for (size_t j = 0; j < num_threads; ++j) {
420         arr_handles[i][j].push_back(data_arr[i][j].GetHandle());
421         for (size_t k = 1; k < num_inputs - 1; k++) {
422           arr_handles[i][j].push_back(params[k - 1].GetHandle());
423         }
424         arr_handles[i][j].push_back(softmax_arr[i][j].GetHandle());
425       }
426     }
427     CachedOpHandle hdl = CachedOpHandle();
428     get_expected_results_multiple(out, flag_keys, flag_vals, &arr_handles,
429                                   num_threads, &result_expected, &hdl);
430 
431 
432     // Create thread safe cahced op
433     CachedOpHandle hdl2 = CachedOpHandle();
434 
435 
436     // Prepare data structures and lambda to run in different threads
437     std::vector<NDArrayHandle *> cached_op_handles(num_threads * num_inf_per_thread);
438     std::vector<std::vector<mxnet::NDArray*>> output_mx_arr(num_inf_per_thread);
439     for (size_t i = 0; i < num_inf_per_thread; i++) {
440         output_mx_arr[i].resize(num_threads);
441     }
442 
443     std::vector<std::vector<std::vector<NDArrayHandle>>> arr_handles2(num_inf_per_thread);
444     for (size_t i = 0; i < num_inf_per_thread; ++i) {
445         arr_handles2[i].resize(num_threads);
446         for (size_t j = 0; j < num_threads; ++j) {
447             arr_handles2[i][j].reserve(num_inputs);
448             arr_handles2[i][j].emplace_back(data_arr[i][j].GetHandle());
449             for (size_t k = 1; k < num_inputs - 1; ++k) {
450                 arr_handles2[i][j].emplace_back(params[k - 1].GetHandle());
451             }
452             arr_handles2[i][j].emplace_back(softmax_arr[i][j].GetHandle());
453         }
454     }
455     std::vector<mxnet::NDArray> data(num_inf_per_thread * num_threads);
456     std::mutex mutex_;
457     auto func = [&](int num) {
458       std::vector<const char *> flag_key_cstrs, flag_val_cstrs;
459       flag_key_cstrs.reserve(flag_keys.size());
460       for (size_t i = 0; i < flag_keys.size(); ++i) {
461         flag_key_cstrs.emplace_back(flag_keys[i].c_str());
462       }
463       for (size_t i = 0; i < flag_vals.size(); ++i) {
464         flag_val_cstrs.emplace_back(flag_vals[i].c_str());
465       }
466 
467       {
468       // Uncomment these lines for a workaround around the same
469       /*
470       std::lock_guard<std::mutex> lock{mutex_};
471       */
472 
473       if (hdl2 == nullptr) {
474         int ret1 = MXCreateCachedOpEX(out.GetHandle(), flag_keys.size(),
475                                       flag_key_cstrs.data(),
476                                       flag_val_cstrs.data(), &hdl2, true);
477         if (ret1 < 0) {
478           LOG(FATAL) << MXGetLastError();
479         }
480       }
481       }
482 
483       unsigned next = num;
484       for (size_t i = 0; i < num_inf_per_thread; ++i) {
485         if (random_sleep) {
486           static thread_local std::mt19937 generator;
487           std::uniform_int_distribution<int> distribution(0, 5);
488           int sleep_time = distribution(generator);
489           std::this_thread::sleep_for(std::chrono::seconds(sleep_time));
490         }
491         int num_output = 0;
492         const int *stypes;
493         int ret = MXInvokeCachedOpEx(
494             hdl2, arr_handles2[i][num].size(), arr_handles2[i][num].data(),
495             &num_output, &(cached_op_handles[i * num_threads + num]), &stypes);
496         if (ret < 0) {
497           LOG(FATAL) << MXGetLastError();
498         }
499         mxnet::cpp::NDArray::WaitAll();
500         output_mx_arr[i][num] = static_cast<mxnet::NDArray *>(
501             *cached_op_handles[i * num_threads + num]);
502       }
503     };
504 
505     // Spawn multiple threads, join and wait for all threads to complete
506     std::vector<std::thread> worker_threads(num_threads);
507     int count = 0;
508     for (auto &&i : worker_threads) {
509       i = std::thread(func, count);
510       count++;
511     }
512 
513     for (auto &&i : worker_threads) {
514       i.join();
515     }
516 
517     mxnet::cpp::NDArray::WaitAll();
518     for (size_t i = 0; i < num_inf_per_thread; i++) {
519       mxnet::test::AssertEqual(output_mx_arr[i], result_expected[i], 1e-2, 1e-5);
520     }
521     mxnet::cpp::NDArray::WaitAll();
522     int ret2 = MXFreeCachedOp(hdl);
523     if (ret2 < 0) {
524       LOG(FATAL) << MXGetLastError();
525     }
526 
527     ret2 = MXFreeCachedOp(hdl2);
528     if (ret2 < 0) {
529       LOG(FATAL) << MXGetLastError();
530     }
531 }
532 
533 /**
534  * Verifying engine thread safety by pushing ops from multiple threads to the
535  * dependency engine
536  */
TEST(ThreadSafety,Engine)537 TEST(ThreadSafety, Engine) {
538   int num_threads = 20;
539 #if MXNET_USE_CUDA == 1
540   Context backend_ctx;
541   mxnet::cpp::Context ctx = mxnet::cpp::Context::gpu(0);
542   DispatchMode dispatch_mode;
543   if (!mxnet::test::thread_safety_force_cpu) {
544     backend_ctx = Context::GPU(0);
545     ctx = mxnet::cpp::Context::gpu(0);
546     dispatch_mode = DispatchMode::kFCompute;
547   } else {
548     backend_ctx = Context::CPU();
549     ctx = mxnet::cpp::Context::cpu();
550     dispatch_mode = DispatchMode::kFComputeEx;
551   }
552 #else
553   Context backend_ctx = Context::CPU(0);
554   mxnet::cpp::Context ctx = mxnet::cpp::Context::cpu(0);
555   DispatchMode dispatch_mode = DispatchMode::kFComputeEx;
556 #endif
557   // Prepare convolution op and parse attrs
558   const nnvm::Op *op = Op::Get("Convolution");
559   nnvm::NodeAttrs attrs;
560   attrs.op = op;
561   attrs.name = "conv_node1";
562   std::unordered_map<std::string, std::string> params = {
563       {"kernel", "(2,2)"}, {"no_bias", "0"},    {"dilate", "(1,1)"},
564       {"num_group", "1"},  {"layout", "NCHW"},  {"stride", "(1,1)"},
565       {"pad", "(0,0)"},    {"num_filter", "10"}};
566   attrs.dict = params;
567   op->attr_parser(&attrs);
568 
569   // Prepare input data
570   std::vector<mxnet::cpp::NDArray> data_arr, weight_arr, bias_arr, output_arr;
571   mxnet::cpp::Shape data_shape(2, 4, 10, 10);
572   mxnet::cpp::Shape weight_shape(10, 4, 2, 2);
573   mxnet::cpp::Shape bias_shape(10);
574   mxnet::cpp::Shape output_shape(2, 10, 9, 9);
575 
576   prepare_input_data(data_shape, ctx, num_threads, &data_arr, true);
577   prepare_input_data(weight_shape, ctx, num_threads, &weight_arr, true);
578   prepare_input_data(bias_shape, ctx, num_threads, &bias_arr, true);
579   prepare_output_data(output_shape, ctx, num_threads, &output_arr);
580 
581   // Prepare symbol
582   mxnet::cpp::Symbol data = mxnet::cpp::Symbol::Variable("data");
583   mxnet::cpp::Symbol weight = mxnet::cpp::Symbol::Variable("weight");
584   mxnet::cpp::Symbol bias = mxnet::cpp::Symbol::Variable("bias");
585   auto out = mxnet::cpp::Operator("Convolution")
586       .SetParam("kernel", mxnet::cpp::Shape(2, 2))
587       .SetParam("no_bias", false)
588       .SetParam("dilate", mxnet::cpp::Shape(1, 1))
589       .SetParam("num_group", 1)
590       .SetParam("layout", "NCHW")
591       .SetParam("stride", mxnet::cpp::Shape(1, 1))
592       .SetParam("pad", mxnet::cpp::Shape(0, 0))
593       .SetParam("num_filter", 10)
594       .SetInput("data", data)
595       .SetInput("weight", weight)
596       .SetInput("bias", bias)
597       .CreateSymbol("fwd");
598 
599   // Prepare data_indices, param_indices and get_expected_results
600   std::vector<std::string> flag_keys{"data_indices", "param_indices"};
601   std::vector<std::string> flag_vals{"[0]", "[1,2]"};
602   std::vector<mxnet::NDArray*> result_expected(num_threads);
603 
604   std::vector<std::vector<NDArrayHandle>> arr_handles(num_threads);
605   for (size_t i = 0; i < num_threads; ++i) {
606       arr_handles[i].push_back(data_arr[i].GetHandle());
607       arr_handles[i].push_back(weight_arr[i].GetHandle());
608       arr_handles[i].push_back(bias_arr[i].GetHandle());
609   }
610   CachedOpHandle hdl = CachedOpHandle();
611   get_expected_results(out, flag_keys, flag_vals, num_threads,
612                        &arr_handles, &result_expected, &hdl);
613 
614   // Prepare backend NDArray inputs
615   std::vector<mxnet::NDArray*> data_mx_arr, weight_mx_arr, bias_mx_arr, output_mx_arr;
616   prepare_backend_data(data_arr, num_threads, &data_mx_arr);
617   prepare_backend_data(weight_arr, num_threads, &weight_mx_arr);
618   prepare_backend_data(bias_arr, num_threads, &bias_mx_arr);
619   prepare_backend_data(output_arr, num_threads, &output_mx_arr);
620 
621   // Prepare func which Invokes op
622   auto func = [&](int num) {
623     std::vector<mxnet::NDArray *> tmp_inputs, tmp_outputs;
624     tmp_inputs.emplace_back(data_mx_arr[num]);
625     tmp_inputs.emplace_back(weight_mx_arr[num]);
626     tmp_inputs.emplace_back(bias_mx_arr[num]);
627     tmp_outputs.emplace_back(output_mx_arr[num]);
628     std::vector<OpReqType> reqs;
629     reqs.push_back(kWriteTo);
630     Imperative::Get()->InvokeOp(backend_ctx, attrs, tmp_inputs, tmp_outputs,
631                                 reqs, dispatch_mode, OpStatePtr());
632   };
633 
634   // Spawn multiple threads
635   std::vector<std::thread> worker_threads(num_threads);
636   int count = 0;
637   for (auto &&i : worker_threads) {
638     i = std::thread(func, count);
639     count++;
640   }
641 
642   for (auto &&i : worker_threads) {
643     i.join();
644   }
645 
646   mxnet::cpp::NDArray::WaitAll();
647   mxnet::test::AssertEqual(output_mx_arr, result_expected, 1e-2, 1e-5);
648   mxnet::cpp::NDArray::WaitAll();
649 }
650 
/*
 * Runs run_inference for every model in the list under a fixed table of
 * (num_inf_per_thread, random_sleep, num_threads, static_alloc, static_shape)
 * configurations. The subgraph model is only added when the test is forced
 * onto the CPU.
 */
TEST(ThreadSafety, CachedOpFullModel) {
  std::vector<std::string> models_list = {
      "imagenet1k-resnet-18", "imagenet1k-resnet-152", "imagenet1k-resnet-50"};
  if (mxnet::test::thread_safety_force_cpu) {
    models_list.push_back("imagenet1k-resnet-152-subgraph");
  }

  struct InferenceConfig {
    int num_inf_per_thread;
    bool random_sleep;
    int num_threads;
    bool static_alloc;
    bool static_shape;
  };
  const InferenceConfig configs[] = {
      {1, true, 20, false, false},
      {2, true, 20, false, false},
      {4, true, 5, false, false},
      {4, true, 20, false, false},
      {4, false, 20, false, false},
      {8, true, 20, false, false},
      // static_alloc = true
      {2, true, 20, true, false},
      {4, true, 5, true, false},
      {4, true, 20, true, false},
      {8, true, 20, true, false},
      // static_alloc = true, static_shape = true
      {4, true, 20, true, true},
      {8, true, 20, true, true},
  };

  for (const std::string &model : models_list) {
    for (const InferenceConfig &cfg : configs) {
      run_inference(model, cfg.num_inf_per_thread, cfg.random_sleep,
                    cfg.num_threads, cfg.static_alloc, cfg.static_shape);
    }
    // the below line may hang
    // run_inference_unsupported(model, 32, false, 20);
  }
}
676 #endif
677