1 #include <dmlc/io.h>
2 #include <dmlc/memory_io.h>
3 #include <dmlc/blockingconcurrentqueue.h>
4 #include <dmlc/thread_group.h>
5 #include <gtest/gtest.h>
6 
7 #if (defined _WIN32)
8 
9 #define NOMINMAX
10 #include <Windows.h>
dmlc_usleep(__int64 usec)11 static inline void dmlc_usleep(__int64 usec) {
12   HANDLE timer;
13   LARGE_INTEGER ft;
14 
15   ft.QuadPart = -(10*usec); // Convert to 100 nanosecond interval, negative value indicates relative time
16 
17   timer = CreateWaitableTimer(NULL, TRUE, NULL);
18   SetWaitableTimer(timer, &ft, 0, NULL, NULL, 0);
19   WaitForSingleObject(timer, INFINITE);
20   CloseHandle(timer);
21 }
22 
23 #elif (defined DMLC_NANOSLEEP_PRESENT)
24 
25 #include <sys/types.h>  // for useconds_t, time_t
26 #include <time.h>  // for timespec, nanosleep
27 
/*!
 * \brief Sleep for \p useconds microseconds using nanosleep().
 * \param useconds delay in microseconds
 * \return whatever nanosleep() returns
 */
static inline int dmlc_usleep(useconds_t useconds) {
  // Split the delay into whole seconds and the sub-second remainder.
  const useconds_t whole_seconds = useconds / 1000000;
  const useconds_t leftover_usec = useconds % 1000000;

  timespec request;
  request.tv_sec = static_cast<time_t>(whole_seconds);
  request.tv_nsec = static_cast<long>(leftover_usec * 1000ul);  // microseconds -> nanoseconds
  return nanosleep(&request, NULL);
}
34 
35 #else
36 
37 #include <unistd.h>   // for usleep()
38 
/*!
 * \brief Sleep for \p useconds microseconds by forwarding to usleep().
 * \param useconds delay in microseconds
 * \return whatever usleep() returns
 */
static inline int dmlc_usleep(useconds_t useconds) {
  const int status = ::usleep(useconds);
  return status;
}
42 
43 #endif
44 
// Number of threads currently inside this_is_thread_func(); the launch tests
// expect it to be back at zero once their ThreadGroup has been released.
static std::atomic<int> thread_count{0};
46 
this_is_thread_func(std::string label,const bool with_delay)47 static int this_is_thread_func(std::string label, const bool with_delay) {
48   ++thread_count;
49   if(with_delay) {
50     dmlc_usleep(1e4);
51   }
52   --thread_count;
53   return 0;
54 }
55 
56 /*!
57  * \brief Generic Thread launch to standalone function, passing ThreadGroup owner
58  */
TEST(ThreadGroup,ThreadLaunchAutoRemove)59 TEST(ThreadGroup, ThreadLaunchAutoRemove) {
60   std::shared_ptr<dmlc::ThreadGroup> thread_group = std::make_shared<dmlc::ThreadGroup>();
61   for(int x = 0; x < 200; ++x) {
62     dmlc::ThreadGroup::Thread::SharedPtr thread =
63       std::make_shared<dmlc::ThreadGroup::Thread>(std::string("test_thread_ar ")
64                                                          + std::to_string(x), thread_group.get());
65     dmlc::ThreadGroup::Thread::launch(thread, true, this_is_thread_func, "Runner", false);
66   }
67   thread_group.reset();
68   CHECK_EQ(thread_count, 0);
69 }
70 
71 /*!
72  * \brief Generic Thread launch to standalone function, passing ThreadGroup owner
73  */
TEST(ThreadGroup,ThreadLaunchAutoRemoveWithDelay)74 TEST(ThreadGroup, ThreadLaunchAutoRemoveWithDelay) {
75   std::shared_ptr<dmlc::ThreadGroup> thread_group = std::make_shared<dmlc::ThreadGroup>();
76   for(int x = 0; x < 200; ++x) {
77     dmlc::ThreadGroup::Thread::SharedPtr thread =
78       std::make_shared<dmlc::ThreadGroup::Thread>(std::string("test_thread_rwd ")
79                                                          + std::to_string(x), thread_group.get());
80     dmlc::ThreadGroup::Thread::launch(thread, true, this_is_thread_func, "Runner", true);
81   }
82   thread_group.reset();
83   CHECK_EQ(thread_count, 0);
84 }
85 
86 /*!
87  * \brief Generic Thread launch to standalone function, passing ThreadGroup owner
88  */
TEST(ThreadGroup,ThreadLaunchNoAutoRemove)89 TEST(ThreadGroup, ThreadLaunchNoAutoRemove) {
90   std::shared_ptr<dmlc::ThreadGroup> thread_group = std::make_shared<dmlc::ThreadGroup>();
91   for(int x = 0; x < 200; ++x) {
92     dmlc::ThreadGroup::Thread::SharedPtr thread =
93       std::make_shared<dmlc::ThreadGroup::Thread>(std::string("test_thread_nao ")
94                                                          + std::to_string(x), thread_group.get());
95     dmlc::ThreadGroup::Thread::launch(thread, false, this_is_thread_func, "Runner", false);
96   }
97   thread_group.reset();
98   CHECK_EQ(thread_count, 0);
99 }
100 
101 /*!
102  * \brief Generic Thread launch to standalone function, passing ThreadGroup owner
103  */
TEST(ThreadGroup,ThreadLaunchNoAutoRemoveWithDelay)104 TEST(ThreadGroup, ThreadLaunchNoAutoRemoveWithDelay) {
105   std::shared_ptr<dmlc::ThreadGroup> thread_group = std::make_shared<dmlc::ThreadGroup>();
106   for(int x = 0; x < 200; ++x) {
107     dmlc::ThreadGroup::Thread::SharedPtr thread =
108       std::make_shared<dmlc::ThreadGroup::Thread>(std::string("test_thread_narwd ")
109                                                          + std::to_string(x), thread_group.get());
110     dmlc::ThreadGroup::Thread::launch(thread, false, this_is_thread_func, "Runner", true);
111   }
112   thread_group.reset();
113   CHECK_EQ(thread_count, 0);
114 }
115 
116 /*!
117  * \brief Test BlockingQueueThread
118  */
TEST(ThreadGroup,ThreadLaunchQueueThread)119 TEST(ThreadGroup, ThreadLaunchQueueThread) {
120   // Define the queue type for convenience
121   using BQ = dmlc::BlockingQueueThread<int, -1>;
122 
123   // Create the thread group
124   std::shared_ptr<dmlc::ThreadGroup> thread_group = std::make_shared<dmlc::ThreadGroup>();
125 
126   // Create the queue thread object
127   std::shared_ptr<BQ> queue_thread = std::make_shared<BQ>("BlockingQueueThread",
128                                                           thread_group.get());
129 
130   // Queue some stuff before the thread starts
131   queue_thread->enqueue(1);
132   queue_thread->enqueue(2);
133   queue_thread->enqueue(3);
134   queue_thread->enqueue(4);
135   CHECK_EQ(queue_thread->size_approx(), 4U);
136   // Launch the queue thread, passing queue item handler as lambda
137   BQ::launch_run(queue_thread,
138                  // Queue item handler
139                  [queue_thread](int item) -> int {
140                    std::cout << "ITEM: " << item
141                              << std::endl << std::flush;
142                    if(item >= 2 && item <= 3) {
143                      // Queue some more while thread is running
144                      queue_thread->enqueue(100 + item);
145                    }
146                    return 0;  // return 0 means continue
147                  });
148   // Trigger the queues to exit
149   thread_group->request_shutdown_all(false);
150   // Wait for all of the queue threads to exit
151   thread_group->join_all();
152   // Check that the queue is empty
153   CHECK_EQ(queue_thread->size_approx(), 0);
154 }
155 
/*! \brief Time point type used by the timing helpers in this file. */
using Tick = std::chrono::high_resolution_clock::time_point;

/*! \brief Current time as reported by std::chrono::high_resolution_clock. */
static inline Tick Now() { return std::chrono::high_resolution_clock::now(); }

/*!
 * \brief Elapsed time from \p t1 to \p t2, in nanoseconds.
 *
 * high_resolution_clock is not guaranteed to be steady, so \p t2 can end up
 * earlier than \p t1. A negative interval is reported as 0 rather than being
 * cast to an enormous unsigned value.
 *
 * \param t1 start of the interval
 * \param t2 end of the interval
 * \return max(t2 - t1, 0) in nanoseconds
 */
static inline uint64_t GetDurationInNanoseconds(const Tick &t1, const Tick &t2) {
  const auto elapsed_ns =
    std::chrono::duration_cast<std::chrono::nanoseconds>(t2 - t1).count();
  if (elapsed_ns <= 0) {
    return 0;
  }
  return static_cast<uint64_t>(elapsed_ns);
}

/*!
 * \brief Elapsed time from \p since until now, in nanoseconds.
 * \param since start of the interval
 */
static inline uint64_t GetDurationInNanoseconds(const Tick &since) {
  return GetDurationInNanoseconds(since, Now());
}
165 
// Timing parameters shared by the timer tests. SLEEP_DURATION and TIMER_PERIOD
// are wrapped in std::chrono::milliseconds where they are used.
constexpr size_t SLEEP_DURATION{500};            // how long the test body sleeps
constexpr size_t TIMER_PERIOD{10};               // timer interval; ideally 500 / 10 = 50 periods occur
constexpr size_t MIN_COUNT_WHILE_SLEEPING{10};   // fewest timer callbacks accepted
constexpr size_t MAX_COUNT_WHILE_SLEEPING{150};  // most timer callbacks accepted
170 
GetDurationInMilliseconds(const Tick & start_time)171 inline size_t GetDurationInMilliseconds(const Tick& start_time) {
172   return static_cast<size_t>(GetDurationInNanoseconds(start_time)/1000/1000);
173 }
174 
175 /*!
176  * \brief Test TimerThread
177  */
TEST(ThreadGroup,TimerThread)178 TEST(ThreadGroup, TimerThread) {
179   // Create the thread group
180   std::shared_ptr<dmlc::ThreadGroup> thread_group = std::make_shared<dmlc::ThreadGroup>();
181 
182   using Duration = std::chrono::milliseconds;
183   // Create the queue thread object
184   std::shared_ptr<dmlc::TimerThread<Duration>> timer_thread =
185     std::make_shared<dmlc::TimerThread<Duration>>("TimerThread", thread_group.get());
186   Tick start_time = Now();
187   size_t count = 0;
188   // Launch the queue thread, passing queue item handler as lambda
189   dmlc::TimerThread<Duration>::start(
190     timer_thread, Duration(TIMER_PERIOD), [timer_thread, start_time, &count]() -> int {
191       if ((count + 1) % 5 == 0) {
192         // output slows it down a bit, so print fewer times
193         std::cout << "[" << (count + 1) << "] TIME: "
194                   << GetDurationInMilliseconds(start_time)
195                   << std::endl << std::flush;
196       }
197       ++count;
198       return 0;  // return 0 means continue
199     });
200   std::this_thread::sleep_for(Duration(SLEEP_DURATION));
201   // Trigger the queues to exit
202   thread_group->request_shutdown_all(true);
203   // Wait for all of the queue threads to exit
204   thread_group->join_all();
205   GTEST_ASSERT_GE(count, MIN_COUNT_WHILE_SLEEPING);  // Should have at least done three
206   GTEST_ASSERT_LE(count, MAX_COUNT_WHILE_SLEEPING); // Should not have had time to do 20 of them
207 }
208 
209 /*!
210  * \brief Test TimerThread Simple
211  */
TEST(ThreadGroup,TimerThreadSimple)212 TEST(ThreadGroup, TimerThreadSimple) {
213   // Create the thread group
214   std::shared_ptr<dmlc::ThreadGroup> thread_group = std::make_shared<dmlc::ThreadGroup>();
215 
216   using Duration = std::chrono::milliseconds;
217   Tick start_time = Now();
218   size_t count = 0;
219   // Launch the queue thread, passing queue item handler as lambda
220   dmlc::CreateTimer("TimerThreadSimple",
221                     Duration(TIMER_PERIOD),
222                     thread_group.get(),
223                     [start_time, &count]() -> int {
224                       if ((count + 1) % 5 == 0) {
225                         // output slows it down a bit, so print fewer times
226                         std::cout << "[" << (count + 1) << "] TIME: "
227                                   << GetDurationInMilliseconds(start_time)
228                                   << std::endl << std::flush;
229                       }
230                       ++count;
231                       return 0;  // return 0 means continue
232                     });
233   std::this_thread::sleep_for(Duration(SLEEP_DURATION));
234   // Trigger the queues to exit
235   thread_group->request_shutdown_all();
236   // Wait for all of the queue threads to exit
237   thread_group->join_all();
238   GTEST_ASSERT_GE(count, MIN_COUNT_WHILE_SLEEPING);  // Should have at least done three
239   GTEST_ASSERT_LE(count, MAX_COUNT_WHILE_SLEEPING); // Should not have had time to do 20 of them
240 }
241