1 /*!
2 * Copyright (c) 2017 by Contributors
3 * \file thread_group.h
4 * \brief Thread and synchronization primitives and lifecycle management
5 */
6 #ifndef DMLC_THREAD_GROUP_H_
7 #define DMLC_THREAD_GROUP_H_
8
#include <dmlc/concurrentqueue.h>
#include <dmlc/blockingconcurrentqueue.h>
#include <dmlc/logging.h>
#include <atomic>
#include <cerrno>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <memory>
#include <mutex>
#include <set>
#if defined(DMLC_USE_CXX14) || __cplusplus > 201103L  /* C++14 */
#include <shared_mutex>
#endif
#include <string>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#ifdef __linux__
#include <unistd.h>
#include <sys/syscall.h>
#endif
28
29 namespace dmlc {
30
/*!
 * \brief Simple manual-reset event gate which remains open after being signaled
 *
 * Once signal() has been called, every wait() passes straight through until
 * reset() closes the gate again.
 */
class ManualEvent {
 public:
  /*! \brief Construct the event in the unsignaled (closed) state */
  ManualEvent() : signaled_(false) {}

  /*!
   * \brief Wait for the object to become signaled. If the object
   *        is already in the signaled state and reset() has not been called, then no wait will occur
   * \note The signaled state is re-checked after every wakeup, so a spurious wakeup of the
   *       condition variable does not release the caller early. As a consequence, a waiter
   *       that has not yet woken up when reset() is called keeps waiting for the next signal().
   */
  void wait() {
    std::unique_lock<std::mutex> lock(mutex_);
    // Predicate form loops internally until the gate is really open
    condition_variable_.wait(lock, [this] { return signaled_.load(); });
  }

  /*!
   * \brief Set this object's state to signaled (wait() will release or pass through)
   */
  void signal() {
    // The flag is changed under the mutex so that a waiter can never observe
    // "not signaled" and then miss the notification that follows.
    std::lock_guard<std::mutex> lock(mutex_);
    signaled_ = true;
    condition_variable_.notify_all();
  }

  /*!
   * \brief Manually reset this object's state to unsignaled (wait() will block)
   */
  void reset() {
    std::lock_guard<std::mutex> lock(mutex_);
    signaled_ = false;
  }

 private:
  /*! \brief Internal mutex to protect condition variable and signaled_ variable */
  std::mutex mutex_;
  /*! \brief Internal condition variable */
  std::condition_variable condition_variable_;
  /*! \brief Signal state; atomic so that it can also be read without holding mutex_ */
  std::atomic<bool> signaled_;
};
74
#if defined(DMLC_USE_CXX14) || __cplusplus > 201103L  /* C++14 */
/*! \brief Mutex which can be read-locked (shared) and write-locked (exclusive) */
using SharedMutex = std::shared_timed_mutex;
/*! \brief Write lock, disallows both reads and writes by other holders */
using WriteLock = std::unique_lock<SharedMutex>;
/*! \brief Read lock, allows concurrent data reads */
using ReadLock = std::shared_lock<SharedMutex>;
#else
/*!
 * \brief Fallback mutex for C++ < 14, where std::shared_timed_mutex is unavailable.
 *        A recursive mutex is used, so in this configuration the same thread may
 *        lock it again while already holding it.
 */
using SharedMutex = std::recursive_mutex;
/*! \brief Exclusive lock for C++ < 14 */
using WriteLock = std::unique_lock<SharedMutex>;
/*! \brief Also an exclusive lock for C++ < 14: readers are serialized in this configuration */
using ReadLock = std::unique_lock<SharedMutex>;
#endif
90
/*!
 * \brief Thread lifecycle management group
 * \note See gtest unit tests Syc.* for usage examples
 */
95 class ThreadGroup {
96 public:
97 /*!
98 * \brief Lifecycle-managed thread (used by ThreadGroup)
99 * \note See gtest unit tests Syc.* for a usage examples
100 */
101 class Thread {
102 public:
103 /*! \brief Shared pointer type for readability */
104 using SharedPtr = std::shared_ptr<Thread>;
105
106 /*!
107 * \brief Constructor
108 * \param threadName User-defined name of the thread. must be unique per ThreadGroup
109 * \param owner The ThreadGroup object managing the lifecycle of this thread
110 * \param thrd Optionally-assigned std::thread object associated with this Thread class
111 */
112 Thread(std::string threadName, ThreadGroup *owner, std::thread *thrd = nullptr)
name_(std::move (threadName))113 : name_(std::move(threadName))
114 , thread_(thrd)
115 , ready_event_(std::make_shared<ManualEvent>())
116 , start_event_(std::make_shared<ManualEvent>())
117 , owner_(owner)
118 , shutdown_requested_(false)
119 , auto_remove_(false) {
120 CHECK_NOTNULL(owner);
121 }
122
123 /*!
124 * \brief Destructor with cleanup
125 */
~Thread()126 virtual ~Thread() {
127 const bool self_delete = is_current_thread();
128 if (!self_delete) {
129 request_shutdown();
130 internal_join(true);
131 }
132 WriteLock guard(thread_mutex_);
133 if (thread_.load()) {
134 std::thread *thrd = thread_.load();
135 thread_ = nullptr;
136 if (self_delete) {
137 thrd->detach();
138 }
139 delete thrd;
140 }
141 }
142
143 /*!
144 * \brief Name of the thread
145 * \return Pointer to the thread name's string
146 * \note This shoul ndly be used as immediate for the sacope of the
147 * shared pointer pointing to this object
148 */
name()149 const char *name() const {
150 return name_.c_str();
151 }
152
153 /*!
154 * \brief Launch the given Thread object
155 * \tparam StartFunction Function type for the thread 'main' function
156 * \tparam Args Arguments to pass to the thread 'main' function
157 * \param pThis Shared pointer for the managed thread to launch
158 * \param autoRemove if true, automatically remove this Thread object from the
159 * ThreadGroup owner upon exit
160 * \param start_function The Thread's 'main' function
161 * \param args Arguments to pass to the Thread's 'main' function
162 * \return true if the thread was successfully created and added to the ThreadGroup
163 * If false is returned, the thread may have already been started, but if something
164 * went wrong (ie duplicte thread name for the ThreadGroup), then request_shutdown()
165 * will have been been called on the running thread
166 */
167 template<typename StartFunction, typename ...Args>
168 static bool launch(std::shared_ptr<Thread> pThis,
169 bool autoRemove,
170 StartFunction start_function,
171 Args ...args);
172
173 /*!
174 * \brief Check if this class represents the currently running thread (self)
175 * \return true if the current running thread belongs to this class
176 */
is_current_thread()177 bool is_current_thread() const {
178 ReadLock guard(thread_mutex_);
179 return thread_.load() ? (thread_.load()->get_id() == std::this_thread::get_id()) : false;
180 }
181
182 /*!
183 * \brief Signal to this thread that a thread shutdown/exit is requested.
184 * \note This is a candidate for overrise in a derived class which may trigger shutdown
185 * by means other than a boolean (ie condition variable, SimpleManualkEvent, etc).
186 */
request_shutdown()187 virtual void request_shutdown() {
188 shutdown_requested_ = true;
189 }
190
191 /*!
192 * \brief Check whether shutdown has been requested (request_shutdown() was called)
193 * \return true if shutdown was requested.
194 * \note This may be overriden to match an overriden to match an overriden 'request_shutdown()',
195 * for instance.
196 */
is_shutdown_requested()197 virtual bool is_shutdown_requested() const {
198 return shutdown_requested_.load();
199 }
200
201 /*!
202 * \brief Check whether the thread is set to auto-remove itself from the ThreadGroup owner
203 * when exiting
204 * \return true if the thread will auto-remove itself from the ThreadGroup owner
205 * when exiting
206 */
is_auto_remove()207 bool is_auto_remove() const {
208 return auto_remove_;
209 }
210
211 /*!
212 * \brief Make the thread joinable (by removing the auto_remove flag)
213 * \warning Care should be taken not to cause a race condition between this call
214 * and parallel execution of this thread auto-removing itself
215 */
make_joinable()216 void make_joinable() {
217 auto_remove_ = false;
218 }
219
220 /*!
221 * \brief Check whether the thread is joinable
222 * \return true if the thread is joinable
223 */
joinable()224 bool joinable() const {
225 if (thread_.load()) {
226 CHECK_EQ(auto_remove_, false);
227 // be checked by searching the group or exit event.
228 return thread_.load()->joinable();
229 }
230 return false;
231 }
232
233 /*!
234 * \brief Thread join
235 * \note join() may not be called on auto-remove threads
236 */
join()237 void join() {
238 internal_join(false);
239 }
240
241 /*!
242 * \brief Get this thread's id
243 * \return this thread's id
244 */
get_id()245 std::thread::id get_id() const {
246 return thread_.load()->get_id();
247 }
248
249 private:
250 /*!
251 * \brief Internal join function
252 * \param auto_remove_ok Whether to allow join on an auto-remove thread
253 */
internal_join(bool auto_remove_ok)254 void internal_join(bool auto_remove_ok) {
255 ReadLock guard(thread_mutex_);
256 // should be careful calling (or any function externally) this when in
257 // auto-remove mode
258 if (thread_.load() && thread_.load()->get_id() != std::thread::id()) {
259 std::thread::id someId;
260 if (!auto_remove_ok) {
261 CHECK_EQ(auto_remove_, false);
262 }
263 CHECK_NOTNULL(thread_.load());
264 if (thread_.load()->joinable()) {
265 thread_.load()->join();
266 } else {
267 LOG(WARNING) << "Thread " << name_ << " ( "
268 << thread_.load()->get_id() << " ) not joinable";
269 }
270 }
271 }
272
273 /*!
274 * \brief Thread bootstrapping and teardown wrapper
275 * \tparam StartFunction Thread's "main" function
276 * \tparam Args Argument types to be passed to the start_function
277 * \param pThis Shared pointer to the Thread object to operate upon
278 * \param start_function Thread's "main" function (i.e. passed to launch())
279 * \param args Arguments to be passed to the start_function
280 * \return The thread's return code
281 */
282 template <typename StartFunction, typename ...Args>
283 static int entry_and_exit_f(std::shared_ptr<Thread> pThis,
284 StartFunction start_function,
285 Args... args);
286 /*! \brief Thread name */
287 std::string name_;
288 /*! \brief Shared mutex for some thread operations */
289 mutable SharedMutex thread_mutex_;
290 /*! \brief Pointer to the stl thread object */
291 std::atomic<std::thread *> thread_;
292 /*! \brief Signaled when the thread is started and ready to execute user code */
293 std::shared_ptr<ManualEvent> ready_event_;
294 /*! \brief Thread will block after setting ready_event_ until start_event_ is signaled */
295 std::shared_ptr<ManualEvent> start_event_;
296 /*! \brief The ThreadGroup ownber managing this thread's lifecycle */
297 ThreadGroup *owner_;
298 /*! \brief Flag to determine if shutdown was requested. */
299 std::atomic<bool> shutdown_requested_;
300 /*!
301 * \brief Whether to automatically remove this thread's object from the ThreadGroup when the
302 * thread exists (perform its own cleanup)
303 */
304 std::atomic<bool> auto_remove_;
305 };
306
307 /*!
308 * \brief Constructor
309 */
ThreadGroup()310 inline ThreadGroup()
311 : evEmpty_(std::make_shared<ManualEvent>()) {
312 evEmpty_->signal(); // Starts out empty
313 }
314
315 /*!
316 * \brief Destructor, perform cleanup. All child threads will be exited when this
317 * destructor completes
318 */
~ThreadGroup()319 virtual ~ThreadGroup() {
320 request_shutdown_all();
321 join_all();
322 }
323
324 /*!
325 * \brief Check if the current thread a member if this ThreadGroup
326 * \return true if the current thread is a member of this thread group
327 * \note This lookup involved a linear search, so for a large number of threads,
328 * is it not advised to call this function in a performance-sensitive area
329 */
is_this_thread_in()330 inline bool is_this_thread_in() const {
331 std::thread::id id = std::this_thread::get_id();
332 ReadLock guard(m_);
333 for (auto it = threads_.begin(), end = threads_.end(); it != end; ++it) {
334 std::shared_ptr<Thread> thrd = *it;
335 if (thrd->get_id() == id)
336 return true;
337 }
338 return false;
339 }
340
341 /*!
342 * \brief Check if the current thread is a member of this ThreadGroup
343 * \param thrd The thread to search for
344 * \return true if the given thread is a member of this ThreadGroup
345 */
is_thread_in(std::shared_ptr<Thread> thrd)346 inline bool is_thread_in(std::shared_ptr<Thread> thrd) const {
347 if (thrd) {
348 std::thread::id id = thrd->get_id();
349 ReadLock guard(m_);
350 for (auto it = threads_.begin(), end = threads_.end(); it != end; ++it) {
351 std::shared_ptr<Thread> thrd = *it;
352 if (thrd->get_id() == id)
353 return true;
354 }
355 return false;
356 } else {
357 return false;
358 }
359 }
360
361 /*!
362 * \brief Add a Thread object to this thread group
363 * \param thrd The thread to add to this ThreadGroup object
364 * \return true if the given thread was added to this ThreadGroup
365 */
add_thread(std::shared_ptr<Thread> thrd)366 inline bool add_thread(std::shared_ptr<Thread> thrd) {
367 if (thrd) {
368 WriteLock guard(m_);
369 auto iter = name_to_thread_.find(thrd->name());
370 if (iter == name_to_thread_.end()) {
371 name_to_thread_.emplace(std::make_pair(thrd->name(), thrd));
372 CHECK_EQ(threads_.insert(thrd).second, true);
373 evEmpty_->reset();
374 return true;
375 }
376 }
377 return false;
378 }
379
380 /*!
381 * \brief Remove a Thread object from this thread group
382 * \param thrd The thread to remove from this ThreadGroup object
383 * \return true if the given thread was removed from this ThreadGroup
384 */
remove_thread(std::shared_ptr<Thread> thrd)385 inline bool remove_thread(std::shared_ptr<Thread> thrd) {
386 if (thrd) {
387 WriteLock guard(m_);
388 auto iter = threads_.find(thrd);
389 if (iter != threads_.end()) {
390 name_to_thread_.erase(thrd->name());
391 threads_.erase(iter);
392 if (threads_.empty()) {
393 evEmpty_->signal();
394 }
395 return true;
396 }
397 }
398 return false;
399 }
400
401 /*!
402 * \brief Join all threads in this ThreadGroup
403 * \note While it is not valid to call 'join' on an auto-remove thread, this function will
404 * wait for auto-remove threads to exit (waits for the ThreadGroup to become empty)
405 */
join_all()406 inline void join_all() {
407 CHECK_EQ(!is_this_thread_in(), true);
408 do {
409 std::unique_lock<std::mutex> lk(join_all_mtx_);
410 std::unordered_set<std::shared_ptr<Thread>> working_set;
411 {
412 ReadLock guard(m_);
413 for (auto iter = threads_.begin(), e_iter = threads_.end(); iter != e_iter; ++iter) {
414 if (!(*iter)->is_auto_remove()) {
415 working_set.emplace(*iter);
416 }
417 }
418 }
419 // Where possible, prefer to do a proper join rather than simply waiting for empty
420 // (easier to troubleshoot)
421 while (!working_set.empty()) {
422 std::shared_ptr<Thread> thrd;
423 thrd = *working_set.begin();
424 if (thrd->joinable()) {
425 thrd->join();
426 }
427 remove_thread(thrd);
428 working_set.erase(working_set.begin());
429 thrd.reset();
430 }
431 // Wait for auto-remove threads (if any) to complete
432 } while (0);
433 evEmpty_->wait();
434 CHECK_EQ(threads_.size(), 0);
435 }
436
437 /*!
438 * \brief Call request_shutdown() on all threads in this ThreadGroup
439 * \param make_all_joinable If true, remove all auto_remove flags from child threads
440 */
441 inline void request_shutdown_all(const bool make_all_joinable = true) {
442 std::unique_lock<std::mutex> lk(join_all_mtx_);
443 ReadLock guard(m_);
444 for (auto &thread : threads_) {
445 if (make_all_joinable) {
446 thread->make_joinable();
447 }
448 thread->request_shutdown();
449 }
450 }
451
452 /*!
453 * \brief Return the number of threads in this thread group
454 * \return Number of threads in this thread group
455 */
size()456 inline size_t size() const {
457 ReadLock guard(m_);
458 return threads_.size();
459 }
460
461 /*!
462 * \brief Check if the ThreadGroup is empty
463 * \return true if the ThreadGroup is empty
464 */
empty()465 inline bool empty() const {
466 ReadLock guard(m_);
467 return threads_.size() == 0;
468 }
469
470 /*!
471 * \brief Create and launch a new Thread object which will be owned by this ThreadGroup
472 * \tparam StartFunction Function type for the thread 'main' function
473 * \tparam ThreadType managedThreadclass type (in case it's derived, for instance)
474 * \tparam Args Arguments to pass to the thread 'main' function
475 * \param threadName Name if the thread. Must be unique for a ThreadGroup object
476 * \param auto_remove If true, automatically remove this Thread object from the
477 * ThreadGroup owner upon exit
478 * \param start_function The Thread's 'main' function
479 * \param args Arguments to pass to the Thread's 'main' function
480 * \return true if the thread was successfully created and added to the ThreadGroup
481 * If false is returned, the thread may have already been started, but if something
482 * went wrong (ie duplicte thread name for the ThreadGroup), then request_shutdown()
483 * will have been been called on the running thread
484 */
485 template<typename StartFunction, typename ThreadType = Thread, typename ...Args>
create(const std::string & threadName,bool auto_remove,StartFunction start_function,Args...args)486 inline bool create(const std::string &threadName,
487 bool auto_remove,
488 StartFunction start_function,
489 Args... args) {
490 typename ThreadType::SharedPtr newThread(new ThreadType(threadName, this));
491 return Thread::launch(newThread, auto_remove, start_function, args...);
492 }
493
494 /*!
495 * \brief Lookup Thread object by name
496 * \param name Name of the thread to look up
497 * \return A shared pointer to the Thread object
498 */
thread_by_name(const std::string & name)499 inline std::shared_ptr<Thread> thread_by_name(const std::string& name) {
500 ReadLock guard(m_);
501 auto iter = name_to_thread_.find(name);
502 if (iter != name_to_thread_.end()) {
503 return iter->second;
504 }
505 return nullptr;
506 }
507
 private:
  /*! \brief ThreadGroup synchronization mutex; guards threads_ and name_to_thread_ */
  mutable SharedMutex m_;
  /*! \brief Mutex held by join_all() and request_shutdown_all() so the two do not interleave */
  mutable std::mutex join_all_mtx_;
  /*! \brief Set of threads owned and managed by this ThreadGroup object */
  std::unordered_set<std::shared_ptr<Thread>> threads_;
  /*!
   * \brief Manual event which is signaled when the thread group is empty
   *        (signaled on construction and when the last thread is removed, reset on add)
   */
  std::shared_ptr<ManualEvent> evEmpty_;
  /*! \brief name->thread mapping, kept in step with threads_ by add_thread()/remove_thread() */
  std::unordered_map<std::string, std::shared_ptr<Thread>> name_to_thread_;
519 };
520
521 /*!
522 * \brief Blocking queue thread class
523 * \tparam ObjectType Object type to queue
524 * \tparam quit_item Object value to signify queue shutdown (ie nullptr for pointer type is common)
525 * \note See gtest unit test Syc.ManagedThreadLaunchQueueThread for a usage example
526 */
527 template<typename ObjectType, ObjectType quit_item>
528 class BlockingQueueThread : public ThreadGroup::Thread {
529 using BQT = BlockingQueueThread<ObjectType, quit_item>;
530
531 public:
532 /*!
533 * \brief Constructor
534 * \param name Name for the blockin g queue thread. Must be unique for a specific ThreadGroup
535 * \param owner ThreadGroup lifecycle manafger/owner
536 * \param thrd Optionally attach an existing stl thread object
537 */
538 BlockingQueueThread(const std::string& name,
539 dmlc::ThreadGroup *owner,
540 std::thread *thrd = nullptr)
Thread(std::move (name),owner,thrd)541 : ThreadGroup::Thread(std::move(name), owner, thrd)
542 , shutdown_in_progress_(false) {
543 }
544
545
546 /*!
547 * \brief Destructor
548 */
~BlockingQueueThread()549 ~BlockingQueueThread() override {
550 // Call to parent first because we don't want to wait for the queue to empty
551 ThreadGroup::Thread::request_shutdown();
552 request_shutdown();
553 }
554
555 /*!
556 * \brief Signal the thread that a shutdown is desired
557 * \note Since consumer doesn't necessarily get items in order, we must wait for
558 * the queue to empty.
559 * This is generally a shutdown procedure and should not be called from
560 * a performance-sensitive area
561 */
request_shutdown()562 void request_shutdown() override {
563 shutdown_in_progress_ = true;
564 while (queue_->size_approx() > 0 && !ThreadGroup::Thread::is_shutdown_requested()) {
565 std::this_thread::sleep_for(std::chrono::milliseconds(1));
566 }
567 ThreadGroup::Thread::request_shutdown();
568 queue_->enqueue(quit_item);
569 }
570
571 /*!
572 * \brief Enqueue and item
573 * \param item The item to enqueue
574 */
enqueue(const ObjectType & item)575 void enqueue(const ObjectType& item) {
576 if (!shutdown_in_progress_) {
577 queue_->enqueue(item);
578 }
579 }
580
581 /*!
582 * \brief Get the approximate size of the queue
583 * \return The approximate size of the queue
584 */
size_approx()585 size_t size_approx() const { return queue_->size_approx(); }
586
587 /*!
588 * \brief Launch to the 'run' function which will, in turn, call the class'
589 * 'run' function, passing it the given 'secondary_function'
590 * for it to call as needed
591 * \tparam SecondaryFunction Type of the secondary function for 'run' override
592 * to call as needed
593 * \param pThis Pointer to the managed thread to launch
594 * \param secondary_function secondary function for 'run' override to call as needed
595 * \return true if thread is launched successfully and added to the ThreadGroup
596 */
597 template<typename SecondaryFunction>
launch_run(std::shared_ptr<BQT> pThis,SecondaryFunction secondary_function)598 static bool launch_run(std::shared_ptr<BQT> pThis,
599 SecondaryFunction secondary_function) {
600 return ThreadGroup::Thread::launch(pThis, true, [](std::shared_ptr<BQT> pThis,
601 SecondaryFunction secondary_function) {
602 return pThis->run(secondary_function);
603 },
604 pThis, secondary_function);
605 }
606
607 /*!
608 * \brief Thread's main queue processing function
609 * \tparam OnItemFunction Function type to call when an item is dequeued
610 * \param on_item_function Function to call when an item is dequeued
611 * \return 0 if completed through a `quit_item`, nonzero if on_item_function requested an exit
612 */
613 template<typename OnItemFunction>
run(OnItemFunction on_item_function)614 inline int run(OnItemFunction on_item_function) {
615 int rc = 0;
616 do {
617 ObjectType item;
618 queue_->wait_dequeue(item);
619 if (item == quit_item) {
620 break;
621 }
622 rc = on_item_function(item);
623 if (rc) {
624 break;
625 }
626 } while (true);
627 return rc;
628 }
629
630 private:
631 /*! \brief The blocking queue associated with this thread */
632 std::shared_ptr<dmlc::moodycamel::BlockingConcurrentQueue<ObjectType>> queue_ =
633 std::make_shared<dmlc::moodycamel::BlockingConcurrentQueue<ObjectType>>();
634 /*! \brief Whether shutdown request is in progress */
635 std::atomic<bool> shutdown_in_progress_;
636 };
637
638 /*!
639 * \brief Managed timer thread
640 * \tparam Duration Duration type (ie seconds, microseconds, etc)
641 */
642 template<typename Duration>
643 class TimerThread : public ThreadGroup::Thread {
644 using ThreadGroup::Thread::is_shutdown_requested;
645
646 public:
647 /*!
648 * \brief Constructor
649 * \param name Name of the timer thread
650 * \param owner ThreadGroup owner if the timer thread
651 */
TimerThread(const std::string & name,ThreadGroup * owner)652 TimerThread(const std::string& name, ThreadGroup *owner)
653 : Thread(name, owner) {
654 }
655
656 /*!
657 * \brief Destructor
658 */
~TimerThread()659 ~TimerThread() override {
660 request_shutdown();
661 }
662
663 /*!
664 * \brief Launch to the 'run' function which will, in turn, call the class'
665 * 'run' function, passing it the given 'secondary_function'
666 * for it to call as needed
667 * \tparam SecondaryFunction Type of the secondary function for 'run' override
668 * to call as needed
669 * \param pThis Pointer to the managed thread to launch
670 * \param secondary_function secondary function for 'run' override to call as needed
671 * \return true if thread is launched successfully and added to the ThreadGroup
672 */
673 template<typename SecondaryFunction>
launch_run(std::shared_ptr<TimerThread<Duration>> pThis,SecondaryFunction secondary_function)674 static bool launch_run(std::shared_ptr<TimerThread<Duration>> pThis,
675 SecondaryFunction secondary_function) {
676 return ThreadGroup::Thread::launch(pThis, true, [](std::shared_ptr<TimerThread<Duration>> pThis,
677 SecondaryFunction secondary_function) {
678 return pThis->run(secondary_function);
679 },
680 pThis, secondary_function);
681 }
682
683 /*!
684 * \brief Start a given timer thread
685 * \tparam Function Type of the timer function
686 * \param timer_thread Thread object to perform the timer events
687 * \param duration Duration between the end end of the timer function and the next timer event
688 * \param function Function to call when the timer expires
689 * \note Calling shutdown_requested() will cause the thread to exit the next time that the timer
690 * expires.
691 */
692 template<typename Function>
start(std::shared_ptr<TimerThread> timer_thread,Duration duration,Function function)693 static void start(std::shared_ptr<TimerThread> timer_thread,
694 Duration duration,
695 Function function) {
696 timer_thread->duration_ = duration;
697 launch_run(timer_thread, function);
698 }
699
700 /*!
701 * \brief Internal timer execution function
702 * \tparam OnTimerFunction Type of function to call each time the timer expires
703 * \param on_timer_function Function to call each time the timer expires
704 * \return Exit code of the thread
705 */
706 template<typename OnTimerFunction>
run(OnTimerFunction on_timer_function)707 inline int run(OnTimerFunction on_timer_function) {
708 int rc = 0;
709 while (!is_shutdown_requested()) {
710 std::this_thread::sleep_for(duration_);
711 if (!is_shutdown_requested()) {
712 rc = on_timer_function();
713 }
714 }
715 return rc;
716 }
717
718 private:
719 Duration duration_;
720 };
721
722 /*
723 * Inline functions - see declarations for usage
724 */
725 template <typename StartFunction, typename ...Args>
entry_and_exit_f(std::shared_ptr<Thread> pThis,StartFunction start_function,Args...args)726 inline int ThreadGroup::Thread::entry_and_exit_f(std::shared_ptr<Thread> pThis,
727 StartFunction start_function,
728 Args... args) {
729 int rc;
730 if (pThis) {
731 // Signal launcher that we're up and running
732 pThis->ready_event_->signal();
733 // Wait for launcher to be ready for us to start
734 pThis->start_event_->wait();
735 // Reset start_event_ for possible reuse
736 pThis->start_event_->reset(); // Reset in case it needs to be reused
737 // If we haven't been requested to shut down prematurely, then run the desired function
738 if (!pThis->is_shutdown_requested()) {
739 rc = start_function(args...);
740 } else {
741 rc = -1;
742 }
743 // If we're set up as auto-remove, then remove this thread from the thread group
744 if (pThis->is_auto_remove()) {
745 pThis->owner_->remove_thread(pThis);
746 }
747 // Release this thread shared pinter. May or may not be the last reference.
748 pThis.reset();
749 } else {
750 LOG(ERROR) << "Null pThis thread pointer";
751 rc = EINVAL;
752 }
753 return rc;
754 }
755
756 template<typename StartFunction, typename ...Args>
launch(std::shared_ptr<Thread> pThis,bool autoRemove,StartFunction start_function,Args...args)757 inline bool ThreadGroup::Thread::launch(std::shared_ptr<Thread> pThis,
758 bool autoRemove,
759 StartFunction start_function,
760 Args ...args) {
761 WriteLock guard(pThis->thread_mutex_);
762 CHECK_EQ(!pThis->thread_.load(), true);
763 CHECK_NOTNULL(pThis->owner_);
764 // Set auto remove
765 pThis->auto_remove_ = autoRemove;
766 // Create the actual stl thread object
767 pThis->thread_ = new std::thread(Thread::template entry_and_exit_f<
768 StartFunction, Args...>,
769 pThis,
770 start_function,
771 args...);
772 // Attempt to add the thread to the thread group (after started, since in case
773 // something goes wrong, there's not a zombie thread in the thread group)
774 if (!pThis->owner_->add_thread(pThis)) {
775 pThis->request_shutdown();
776 LOG(ERROR) << "Duplicate thread name within the same thread group is not allowed";
777 }
778 // Wait for the thread to spin up
779 pThis->ready_event_->wait();
780 // Signal the thgread to continue (it will check its shutdown status)
781 pThis->start_event_->signal();
782 // Return if successful
783 return pThis->thread_.load() != nullptr;
784 }
785
786 /*!
787 * \brief Utility function to easily create a timer
788 * \tparam Duration Duration type (i.e. std::chrono::milliseconds)
789 * \tparam TimerFunction Function to call each time the timer expires
790 * \param timer_name Name of the timer. Must be unique per ThreadGroup object
791 * \param duration Duration of the timer between calls to timer_function
792 * \param owner ThreadGroup owner of the timer
793 * \param timer_function Function to call each time the timer expires
794 * \return true if the timer was successfully created
795 */
796 template<typename Duration, typename TimerFunction>
CreateTimer(const std::string & timer_name,const Duration & duration,ThreadGroup * owner,TimerFunction timer_function)797 inline bool CreateTimer(const std::string& timer_name,
798 const Duration& duration,
799 ThreadGroup *owner,
800 TimerFunction timer_function) {
801 std::shared_ptr<dmlc::TimerThread<Duration>> timer_thread =
802 std::make_shared<dmlc::TimerThread<Duration>>(timer_name, owner);
803 dmlc::TimerThread<Duration>::start(timer_thread, duration, timer_function);
804 return timer_thread != nullptr;
805 }
806 } // namespace dmlc
807
808 #endif // DMLC_THREAD_GROUP_H_
809