1 #include <dmlc/io.h>
2 #include <dmlc/memory_io.h>
3 #include <dmlc/blockingconcurrentqueue.h>
4 #include <dmlc/thread_group.h>
5 #include <gtest/gtest.h>
6
7 #if (defined _WIN32)
8
9 #define NOMINMAX
10 #include <Windows.h>
dmlc_usleep(__int64 usec)11 static inline void dmlc_usleep(__int64 usec) {
12 HANDLE timer;
13 LARGE_INTEGER ft;
14
15 ft.QuadPart = -(10*usec); // Convert to 100 nanosecond interval, negative value indicates relative time
16
17 timer = CreateWaitableTimer(NULL, TRUE, NULL);
18 SetWaitableTimer(timer, &ft, 0, NULL, NULL, 0);
19 WaitForSingleObject(timer, INFINITE);
20 CloseHandle(timer);
21 }
22
23 #elif (defined DMLC_NANOSLEEP_PRESENT)
24
25 #include <sys/types.h> // for useconds_t, time_t
26 #include <time.h> // for timespec, nanosleep
27
/*!
 * \brief Suspend the calling thread for \p useconds microseconds using nanosleep().
 * \param useconds delay in microseconds
 * \return whatever nanosleep() returns
 */
static inline int dmlc_usleep(useconds_t useconds) {
  const useconds_t kMicrosPerSecond = 1000000;
  timespec request;
  // Whole seconds go into tv_sec, the sub-second remainder (as nanoseconds) into tv_nsec.
  request.tv_sec = static_cast<time_t>(useconds / kMicrosPerSecond);
  request.tv_nsec = static_cast<long>(useconds % kMicrosPerSecond * 1000ul);
  return nanosleep(&request, nullptr);
}
34
35 #else
36
37 #include <unistd.h> // for usleep()
38
/*!
 * \brief Suspend the calling thread for \p useconds microseconds using usleep().
 * \param useconds delay in microseconds
 * \return whatever usleep() returns
 */
static inline int dmlc_usleep(useconds_t useconds) {
  const int status = usleep(useconds);
  return status;
}
42
43 #endif
44
/*! \brief Number of this_is_thread_func() invocations currently in progress */
static std::atomic<int> thread_count{0};
46
this_is_thread_func(std::string label,const bool with_delay)47 static int this_is_thread_func(std::string label, const bool with_delay) {
48 ++thread_count;
49 if(with_delay) {
50 dmlc_usleep(1e4);
51 }
52 --thread_count;
53 return 0;
54 }
55
56 /*!
57 * \brief Generic Thread launch to standalone function, passing ThreadGroup owner
58 */
TEST(ThreadGroup,ThreadLaunchAutoRemove)59 TEST(ThreadGroup, ThreadLaunchAutoRemove) {
60 std::shared_ptr<dmlc::ThreadGroup> thread_group = std::make_shared<dmlc::ThreadGroup>();
61 for(int x = 0; x < 200; ++x) {
62 dmlc::ThreadGroup::Thread::SharedPtr thread =
63 std::make_shared<dmlc::ThreadGroup::Thread>(std::string("test_thread_ar ")
64 + std::to_string(x), thread_group.get());
65 dmlc::ThreadGroup::Thread::launch(thread, true, this_is_thread_func, "Runner", false);
66 }
67 thread_group.reset();
68 CHECK_EQ(thread_count, 0);
69 }
70
71 /*!
72 * \brief Generic Thread launch to standalone function, passing ThreadGroup owner
73 */
TEST(ThreadGroup,ThreadLaunchAutoRemoveWithDelay)74 TEST(ThreadGroup, ThreadLaunchAutoRemoveWithDelay) {
75 std::shared_ptr<dmlc::ThreadGroup> thread_group = std::make_shared<dmlc::ThreadGroup>();
76 for(int x = 0; x < 200; ++x) {
77 dmlc::ThreadGroup::Thread::SharedPtr thread =
78 std::make_shared<dmlc::ThreadGroup::Thread>(std::string("test_thread_rwd ")
79 + std::to_string(x), thread_group.get());
80 dmlc::ThreadGroup::Thread::launch(thread, true, this_is_thread_func, "Runner", true);
81 }
82 thread_group.reset();
83 CHECK_EQ(thread_count, 0);
84 }
85
86 /*!
87 * \brief Generic Thread launch to standalone function, passing ThreadGroup owner
88 */
TEST(ThreadGroup,ThreadLaunchNoAutoRemove)89 TEST(ThreadGroup, ThreadLaunchNoAutoRemove) {
90 std::shared_ptr<dmlc::ThreadGroup> thread_group = std::make_shared<dmlc::ThreadGroup>();
91 for(int x = 0; x < 200; ++x) {
92 dmlc::ThreadGroup::Thread::SharedPtr thread =
93 std::make_shared<dmlc::ThreadGroup::Thread>(std::string("test_thread_nao ")
94 + std::to_string(x), thread_group.get());
95 dmlc::ThreadGroup::Thread::launch(thread, false, this_is_thread_func, "Runner", false);
96 }
97 thread_group.reset();
98 CHECK_EQ(thread_count, 0);
99 }
100
101 /*!
102 * \brief Generic Thread launch to standalone function, passing ThreadGroup owner
103 */
TEST(ThreadGroup,ThreadLaunchNoAutoRemoveWithDelay)104 TEST(ThreadGroup, ThreadLaunchNoAutoRemoveWithDelay) {
105 std::shared_ptr<dmlc::ThreadGroup> thread_group = std::make_shared<dmlc::ThreadGroup>();
106 for(int x = 0; x < 200; ++x) {
107 dmlc::ThreadGroup::Thread::SharedPtr thread =
108 std::make_shared<dmlc::ThreadGroup::Thread>(std::string("test_thread_narwd ")
109 + std::to_string(x), thread_group.get());
110 dmlc::ThreadGroup::Thread::launch(thread, false, this_is_thread_func, "Runner", true);
111 }
112 thread_group.reset();
113 CHECK_EQ(thread_count, 0);
114 }
115
116 /*!
117 * \brief Test BlockingQueueThread
118 */
TEST(ThreadGroup,ThreadLaunchQueueThread)119 TEST(ThreadGroup, ThreadLaunchQueueThread) {
120 // Define the queue type for convenience
121 using BQ = dmlc::BlockingQueueThread<int, -1>;
122
123 // Create the thread group
124 std::shared_ptr<dmlc::ThreadGroup> thread_group = std::make_shared<dmlc::ThreadGroup>();
125
126 // Create the queue thread object
127 std::shared_ptr<BQ> queue_thread = std::make_shared<BQ>("BlockingQueueThread",
128 thread_group.get());
129
130 // Queue some stuff before the thread starts
131 queue_thread->enqueue(1);
132 queue_thread->enqueue(2);
133 queue_thread->enqueue(3);
134 queue_thread->enqueue(4);
135 CHECK_EQ(queue_thread->size_approx(), 4U);
136 // Launch the queue thread, passing queue item handler as lambda
137 BQ::launch_run(queue_thread,
138 // Queue item handler
139 [queue_thread](int item) -> int {
140 std::cout << "ITEM: " << item
141 << std::endl << std::flush;
142 if(item >= 2 && item <= 3) {
143 // Queue some more while thread is running
144 queue_thread->enqueue(100 + item);
145 }
146 return 0; // return 0 means continue
147 });
148 // Trigger the queues to exit
149 thread_group->request_shutdown_all(false);
150 // Wait for all of the queue threads to exit
151 thread_group->join_all();
152 // Check that the queue is empty
153 CHECK_EQ(queue_thread->size_approx(), 0);
154 }
155
/*! \brief Time point type of std::chrono::high_resolution_clock */
using Tick = std::chrono::high_resolution_clock::time_point;

/*! \brief Current time according to the clock behind Tick */
static inline Tick Now() {
  return Tick::clock::now();
}

/*!
 * \brief Elapsed time from \p t1 to \p t2 in nanoseconds
 * \param t1 start of the interval
 * \param t2 end of the interval
 * \return (t2 - t1) converted to nanoseconds and cast to uint64_t
 */
static inline uint64_t GetDurationInNanoseconds(const Tick &t1, const Tick &t2) {
  const std::chrono::nanoseconds elapsed =
    std::chrono::duration_cast<std::chrono::nanoseconds>(t2 - t1);
  return static_cast<uint64_t>(elapsed.count());
}

/*!
 * \brief Elapsed time from \p since until now in nanoseconds
 * \param since start of the interval
 */
static inline uint64_t GetDurationInNanoseconds(const Tick &since) {
  const Tick current = Now();
  return GetDurationInNanoseconds(since, current);
}
165
/*! \brief Milliseconds the timer tests sleep while the timer is running */
constexpr size_t SLEEP_DURATION{500};
/*! \brief Timer period in milliseconds; ideally 500 / 10 = 50 periods elapse during the sleep */
constexpr size_t TIMER_PERIOD{10};
/*! \brief Fewest timer callbacks the timer tests accept */
constexpr size_t MIN_COUNT_WHILE_SLEEPING{10};
/*! \brief Most timer callbacks the timer tests accept */
constexpr size_t MAX_COUNT_WHILE_SLEEPING{150};
170
GetDurationInMilliseconds(const Tick & start_time)171 inline size_t GetDurationInMilliseconds(const Tick& start_time) {
172 return static_cast<size_t>(GetDurationInNanoseconds(start_time)/1000/1000);
173 }
174
175 /*!
176 * \brief Test TimerThread
177 */
TEST(ThreadGroup,TimerThread)178 TEST(ThreadGroup, TimerThread) {
179 // Create the thread group
180 std::shared_ptr<dmlc::ThreadGroup> thread_group = std::make_shared<dmlc::ThreadGroup>();
181
182 using Duration = std::chrono::milliseconds;
183 // Create the queue thread object
184 std::shared_ptr<dmlc::TimerThread<Duration>> timer_thread =
185 std::make_shared<dmlc::TimerThread<Duration>>("TimerThread", thread_group.get());
186 Tick start_time = Now();
187 size_t count = 0;
188 // Launch the queue thread, passing queue item handler as lambda
189 dmlc::TimerThread<Duration>::start(
190 timer_thread, Duration(TIMER_PERIOD), [timer_thread, start_time, &count]() -> int {
191 if ((count + 1) % 5 == 0) {
192 // output slows it down a bit, so print fewer times
193 std::cout << "[" << (count + 1) << "] TIME: "
194 << GetDurationInMilliseconds(start_time)
195 << std::endl << std::flush;
196 }
197 ++count;
198 return 0; // return 0 means continue
199 });
200 std::this_thread::sleep_for(Duration(SLEEP_DURATION));
201 // Trigger the queues to exit
202 thread_group->request_shutdown_all(true);
203 // Wait for all of the queue threads to exit
204 thread_group->join_all();
205 GTEST_ASSERT_GE(count, MIN_COUNT_WHILE_SLEEPING); // Should have at least done three
206 GTEST_ASSERT_LE(count, MAX_COUNT_WHILE_SLEEPING); // Should not have had time to do 20 of them
207 }
208
209 /*!
210 * \brief Test TimerThread Simple
211 */
TEST(ThreadGroup,TimerThreadSimple)212 TEST(ThreadGroup, TimerThreadSimple) {
213 // Create the thread group
214 std::shared_ptr<dmlc::ThreadGroup> thread_group = std::make_shared<dmlc::ThreadGroup>();
215
216 using Duration = std::chrono::milliseconds;
217 Tick start_time = Now();
218 size_t count = 0;
219 // Launch the queue thread, passing queue item handler as lambda
220 dmlc::CreateTimer("TimerThreadSimple",
221 Duration(TIMER_PERIOD),
222 thread_group.get(),
223 [start_time, &count]() -> int {
224 if ((count + 1) % 5 == 0) {
225 // output slows it down a bit, so print fewer times
226 std::cout << "[" << (count + 1) << "] TIME: "
227 << GetDurationInMilliseconds(start_time)
228 << std::endl << std::flush;
229 }
230 ++count;
231 return 0; // return 0 means continue
232 });
233 std::this_thread::sleep_for(Duration(SLEEP_DURATION));
234 // Trigger the queues to exit
235 thread_group->request_shutdown_all();
236 // Wait for all of the queue threads to exit
237 thread_group->join_all();
238 GTEST_ASSERT_GE(count, MIN_COUNT_WHILE_SLEEPING); // Should have at least done three
239 GTEST_ASSERT_LE(count, MAX_COUNT_WHILE_SLEEPING); // Should not have had time to do 20 of them
240 }
241