1 /*!
2  * Copyright (c) 2017 by Contributors
3  * \file thread_group.h
4  * \brief Thread and synchronization primitives and lifecycle management
5  */
6 #ifndef DMLC_THREAD_GROUP_H_
7 #define DMLC_THREAD_GROUP_H_
8 
#include <dmlc/concurrentqueue.h>
#include <dmlc/blockingconcurrentqueue.h>
#include <dmlc/logging.h>
#include <atomic>
#include <cerrno>
#include <chrono>
#include <cstdint>
#include <string>
#include <mutex>
#include <utility>
#include <memory>
#include <set>
#include <thread>
#include <unordered_set>
#include <unordered_map>
#if defined(DMLC_USE_CXX14) || __cplusplus > 201103L  /* C++14 */
#include <shared_mutex>
#endif
#include <condition_variable>
#ifdef __linux__
#include <unistd.h>
#include <sys/syscall.h>
#endif
28 
29 namespace dmlc {
30 
/*!
 * \brief Simple manual-reset event gate which remains open after being signaled
 *
 * After signal() is called, every current and future wait() passes through
 * until reset() is called.
 */
class ManualEvent {
 public:
  ManualEvent() : signaled_(false), signal_count_(0) {}

  /*!
   * \brief Wait for the object to become signaled.  If the object
   * is already in the signaled state and reset() has not been called, then no wait will occur
   */
  void wait() {
    std::unique_lock<std::mutex> lock(mutex_);
    // Remember which signal() generation was current when we started waiting.
    const uint64_t seen = signal_count_;
    // The predicate is re-evaluated after every wakeup, so a spurious wakeup
    // cannot release the waiter. A waiter is released either because the gate
    // is open, or because signal() was called while it was blocked (even if
    // reset() has since closed the gate again).
    condition_variable_.wait(lock, [this, seen] {
      return signaled_.load() || signal_count_ != seen;
    });
  }

  /*!
   * \brief Set this object's state to signaled (wait() will release or pass through)
   */
  void signal() {
    std::unique_lock<std::mutex> lk(mutex_);
    // State is changed under the mutex so it is ordered with the waiters' checks
    signaled_ = true;
    ++signal_count_;
    condition_variable_.notify_all();
  }

  /*!
   * \brief Manually reset this object's state to unsignaled (wait() will block)
   */
  void reset() {
    std::unique_lock<std::mutex> lk(mutex_);
    signaled_ = false;
  }

 private:
  /*! \brief Internal mutex to protect condition variable, signaled_ and signal_count_ */
  std::mutex mutex_;
  /*! \brief Internal condition variable */
  std::condition_variable condition_variable_;
  /*! \brief Signal state: true while the gate is open */
  std::atomic<bool> signaled_;
  /*! \brief Number of signal() calls so far; only accessed while holding mutex_ */
  uint64_t signal_count_;
};
74 
#if defined(DMLC_USE_CXX14) || __cplusplus > 201103L  /* C++14 */
/*! \brief Reader/writer mutex: can be locked shared (read) or exclusive (write) */
using SharedMutex = std::shared_timed_mutex;
/*! \brief Exclusive (write) lock: excludes both readers and other writers */
using WriteLock = std::unique_lock<SharedMutex>;
/*! \brief Shared (read) lock: several readers may hold it at the same time */
using ReadLock = std::shared_lock<SharedMutex>;
#else
/*! \brief Pre-C++14 fallback: a recursive mutex stands in for the shared mutex */
using SharedMutex = std::recursive_mutex;
/*! \brief Pre-C++14 fallback: exclusive lock on the recursive mutex */
using WriteLock = std::unique_lock<SharedMutex>;
/*! \brief Pre-C++14 fallback: read locks are exclusive too (same type as WriteLock) */
using ReadLock = WriteLock;
#endif
90 
91 /*!
92  * \brief Thread lifecycle management group
93  * \note See gtest unit tests Syc.* for a usage examples
94  */
95 class ThreadGroup {
96  public:
97   /*!
98    * \brief Lifecycle-managed thread (used by ThreadGroup)
99    * \note See gtest unit tests Syc.* for a usage examples
100    */
101   class Thread {
102    public:
103     /*! \brief Shared pointer type for readability */
104     using SharedPtr = std::shared_ptr<Thread>;
105 
106     /*!
107      * \brief Constructor
108      * \param threadName User-defined name of the thread. must be unique per ThreadGroup
109      * \param owner The ThreadGroup object managing the lifecycle of this thread
110      * \param thrd Optionally-assigned std::thread object associated with this Thread class
111      */
112     Thread(std::string threadName, ThreadGroup *owner, std::thread *thrd = nullptr)
name_(std::move (threadName))113       : name_(std::move(threadName))
114         , thread_(thrd)
115         , ready_event_(std::make_shared<ManualEvent>())
116         , start_event_(std::make_shared<ManualEvent>())
117         , owner_(owner)
118         , shutdown_requested_(false)
119         , auto_remove_(false) {
120       CHECK_NOTNULL(owner);
121     }
122 
123     /*!
124      * \brief Destructor with cleanup
125      */
~Thread()126     virtual ~Thread() {
127       const bool self_delete = is_current_thread();
128       if (!self_delete) {
129         request_shutdown();
130         internal_join(true);
131       }
132       WriteLock guard(thread_mutex_);
133       if (thread_.load()) {
134         std::thread *thrd = thread_.load();
135         thread_ = nullptr;
136         if (self_delete) {
137           thrd->detach();
138         }
139         delete thrd;
140       }
141     }
142 
143     /*!
144      * \brief Name of the thread
145      * \return Pointer to the thread name's string
146      * \note This shoul ndly be used as immediate for the sacope of the
147      *       shared pointer pointing to this object
148      */
name()149     const char *name() const {
150       return name_.c_str();
151     }
152 
153     /*!
154      * \brief Launch the given Thread object
155      * \tparam StartFunction Function type for the thread 'main' function
156      * \tparam Args Arguments to pass to the thread 'main' function
157      * \param pThis Shared pointer for the managed thread to launch
158      * \param autoRemove if true, automatically remove this Thread object from the
159      *                   ThreadGroup owner upon exit
160      * \param start_function The Thread's 'main' function
161      * \param args Arguments to pass to the Thread's 'main' function
162      * \return true if the thread was successfully created and added to the ThreadGroup
163      *              If false is returned, the thread may have already been started, but if something
164      *              went wrong (ie duplicte thread name for the ThreadGroup), then request_shutdown()
165      *              will have been been called on the running thread
166      */
167     template<typename StartFunction, typename ...Args>
168     static bool launch(std::shared_ptr<Thread> pThis,
169                        bool autoRemove,
170                        StartFunction start_function,
171                        Args ...args);
172 
173     /*!
174      * \brief Check if this class represents the currently running thread (self)
175      * \return true if the current running thread belongs to this class
176      */
is_current_thread()177     bool is_current_thread() const {
178       ReadLock guard(thread_mutex_);
179       return thread_.load() ? (thread_.load()->get_id() == std::this_thread::get_id()) : false;
180     }
181 
182     /*!
183      * \brief Signal to this thread that a thread shutdown/exit is requested.
184      * \note This is a candidate for overrise in a derived class which may trigger shutdown
185      *       by means other than a boolean (ie condition variable, SimpleManualkEvent, etc).
186      */
request_shutdown()187     virtual void request_shutdown() {
188       shutdown_requested_ = true;
189     }
190 
191     /*!
192      * \brief Check whether shutdown has been requested (request_shutdown() was called)
193      * \return true if shutdown was requested.
194      * \note This may be overriden to match an overriden to match an overriden 'request_shutdown()',
195      *       for instance.
196      */
is_shutdown_requested()197     virtual bool is_shutdown_requested() const {
198       return shutdown_requested_.load();
199     }
200 
201     /*!
202      * \brief Check whether the thread is set to auto-remove itself from the ThreadGroup owner
203      *        when exiting
204      * \return true if the thread will auto-remove itself from the ThreadGroup owner
205      *        when exiting
206      */
is_auto_remove()207     bool is_auto_remove() const {
208       return auto_remove_;
209     }
210 
211     /*!
212      * \brief Make the thread joinable (by removing the auto_remove flag)
213      * \warning Care should be taken not to cause a race condition between this call
214      *          and parallel execution of this thread auto-removing itself
215      */
make_joinable()216     void make_joinable() {
217       auto_remove_ = false;
218     }
219 
220     /*!
221      * \brief Check whether the thread is joinable
222      * \return true if the thread is joinable
223      */
joinable()224     bool joinable() const {
225       if (thread_.load()) {
226         CHECK_EQ(auto_remove_, false);
227         // be checked by searching the group or exit event.
228         return thread_.load()->joinable();
229       }
230       return false;
231     }
232 
233     /*!
234      * \brief Thread join
235      * \note join() may not be called on auto-remove threads
236      */
join()237     void join() {
238       internal_join(false);
239     }
240 
241     /*!
242      * \brief Get this thread's id
243      * \return this thread's id
244      */
get_id()245     std::thread::id get_id() const {
246       return thread_.load()->get_id();
247     }
248 
249    private:
250     /*!
251      * \brief Internal join function
252      * \param auto_remove_ok Whether to allow join on an auto-remove thread
253      */
internal_join(bool auto_remove_ok)254     void internal_join(bool auto_remove_ok) {
255       ReadLock guard(thread_mutex_);
256       // should be careful calling (or any function externally) this when in
257       // auto-remove mode
258       if (thread_.load() && thread_.load()->get_id() != std::thread::id()) {
259         std::thread::id someId;
260         if (!auto_remove_ok) {
261           CHECK_EQ(auto_remove_, false);
262         }
263         CHECK_NOTNULL(thread_.load());
264         if (thread_.load()->joinable()) {
265           thread_.load()->join();
266         } else {
267           LOG(WARNING) << "Thread " << name_ << " ( "
268                        << thread_.load()->get_id() << " ) not joinable";
269         }
270       }
271     }
272 
273     /*!
274      * \brief Thread bootstrapping and teardown wrapper
275      * \tparam StartFunction Thread's "main" function
276      * \tparam Args Argument types to be passed to the start_function
277      * \param pThis Shared pointer to the Thread object to operate upon
278      * \param start_function Thread's "main" function (i.e. passed to launch())
279      * \param args Arguments to be passed to the start_function
280      * \return The thread's return code
281      */
282     template <typename StartFunction, typename ...Args>
283     static int entry_and_exit_f(std::shared_ptr<Thread> pThis,
284                                 StartFunction start_function,
285                                 Args... args);
286     /*! \brief Thread name */
287     std::string name_;
288     /*! \brief Shared mutex for some thread operations */
289     mutable SharedMutex thread_mutex_;
290     /*! \brief Pointer to the stl thread object */
291     std::atomic<std::thread *> thread_;
292     /*! \brief Signaled when the thread is started and ready to execute user code */
293     std::shared_ptr<ManualEvent> ready_event_;
294     /*! \brief Thread will block after setting ready_event_ until start_event_ is signaled */
295     std::shared_ptr<ManualEvent> start_event_;
296     /*! \brief The ThreadGroup ownber managing this thread's lifecycle */
297     ThreadGroup *owner_;
298     /*! \brief Flag to determine if shutdown was requested. */
299     std::atomic<bool> shutdown_requested_;
300     /*!
301      * \brief Whether to automatically remove this thread's object from the ThreadGroup when the
302      *        thread exists (perform its own cleanup)
303      */
304     std::atomic<bool> auto_remove_;
305   };
306 
307   /*!
308    * \brief Constructor
309    */
ThreadGroup()310   inline ThreadGroup()
311     : evEmpty_(std::make_shared<ManualEvent>()) {
312     evEmpty_->signal();  // Starts out empty
313   }
314 
315   /*!
316    * \brief Destructor, perform cleanup. All child threads will be exited when this
317    *        destructor completes
318    */
~ThreadGroup()319   virtual ~ThreadGroup() {
320     request_shutdown_all();
321     join_all();
322   }
323 
324   /*!
325    * \brief Check if the current thread a member if this ThreadGroup
326    * \return true if the current thread is a member of this thread group
327    * \note This lookup involved a linear search, so for a large number of threads,
328    *       is it not advised to call this function in a performance-sensitive area
329    */
is_this_thread_in()330   inline bool is_this_thread_in() const {
331     std::thread::id id = std::this_thread::get_id();
332     ReadLock guard(m_);
333     for (auto it = threads_.begin(), end = threads_.end(); it != end; ++it) {
334       std::shared_ptr<Thread> thrd = *it;
335       if (thrd->get_id() == id)
336         return true;
337     }
338     return false;
339   }
340 
341   /*!
342    * \brief Check if the current thread is a member of this ThreadGroup
343    * \param thrd The thread to search for
344    * \return true if the given thread is a member of this ThreadGroup
345    */
is_thread_in(std::shared_ptr<Thread> thrd)346   inline bool is_thread_in(std::shared_ptr<Thread> thrd) const {
347     if (thrd) {
348       std::thread::id id = thrd->get_id();
349       ReadLock guard(m_);
350       for (auto it = threads_.begin(), end = threads_.end(); it != end; ++it) {
351         std::shared_ptr<Thread> thrd = *it;
352         if (thrd->get_id() == id)
353           return true;
354       }
355       return false;
356     } else {
357       return false;
358     }
359   }
360 
361   /*!
362    * \brief Add a Thread object to this thread group
363    * \param thrd The thread to add to this ThreadGroup object
364    * \return true if the given thread was added to this ThreadGroup
365    */
add_thread(std::shared_ptr<Thread> thrd)366   inline bool add_thread(std::shared_ptr<Thread> thrd) {
367     if (thrd) {
368       WriteLock guard(m_);
369       auto iter = name_to_thread_.find(thrd->name());
370       if (iter == name_to_thread_.end()) {
371         name_to_thread_.emplace(std::make_pair(thrd->name(), thrd));
372         CHECK_EQ(threads_.insert(thrd).second, true);
373         evEmpty_->reset();
374         return true;
375       }
376     }
377     return false;
378   }
379 
380   /*!
381    * \brief Remove a Thread object from this thread group
382    * \param thrd The thread to remove from this ThreadGroup object
383    * \return true if the given thread was removed from this ThreadGroup
384    */
remove_thread(std::shared_ptr<Thread> thrd)385   inline bool remove_thread(std::shared_ptr<Thread> thrd) {
386     if (thrd) {
387       WriteLock guard(m_);
388       auto iter = threads_.find(thrd);
389       if (iter != threads_.end()) {
390         name_to_thread_.erase(thrd->name());
391         threads_.erase(iter);
392         if (threads_.empty()) {
393           evEmpty_->signal();
394         }
395         return true;
396       }
397     }
398     return false;
399   }
400 
401   /*!
402    * \brief Join all threads in this ThreadGroup
403    * \note While it is not valid to call 'join' on an auto-remove thread, this function will
404    *       wait for auto-remove threads to exit (waits for the ThreadGroup to become empty)
405    */
join_all()406   inline void join_all() {
407     CHECK_EQ(!is_this_thread_in(), true);
408     do {
409       std::unique_lock<std::mutex> lk(join_all_mtx_);
410       std::unordered_set<std::shared_ptr<Thread>> working_set;
411       {
412         ReadLock guard(m_);
413         for (auto iter = threads_.begin(), e_iter = threads_.end(); iter != e_iter; ++iter) {
414           if (!(*iter)->is_auto_remove()) {
415             working_set.emplace(*iter);
416           }
417         }
418       }
419       // Where possible, prefer to do a proper join rather than simply waiting for empty
420       // (easier to troubleshoot)
421       while (!working_set.empty()) {
422         std::shared_ptr<Thread> thrd;
423         thrd = *working_set.begin();
424         if (thrd->joinable()) {
425           thrd->join();
426         }
427         remove_thread(thrd);
428         working_set.erase(working_set.begin());
429         thrd.reset();
430       }
431       // Wait for auto-remove threads (if any) to complete
432     } while (0);
433     evEmpty_->wait();
434     CHECK_EQ(threads_.size(), 0);
435   }
436 
437   /*!
438    * \brief Call request_shutdown() on all threads in this ThreadGroup
439    * \param make_all_joinable If true, remove all auto_remove flags from child threads
440    */
441   inline void request_shutdown_all(const bool make_all_joinable = true) {
442     std::unique_lock<std::mutex> lk(join_all_mtx_);
443     ReadLock guard(m_);
444     for (auto &thread : threads_) {
445       if (make_all_joinable) {
446         thread->make_joinable();
447       }
448       thread->request_shutdown();
449     }
450   }
451 
452   /*!
453    * \brief Return the number of threads in this thread group
454    * \return Number of threads in this thread group
455    */
size()456   inline size_t size() const {
457     ReadLock guard(m_);
458     return threads_.size();
459   }
460 
461   /*!
462    * \brief Check if the ThreadGroup is empty
463    * \return true if the ThreadGroup is empty
464    */
empty()465   inline bool empty() const {
466     ReadLock guard(m_);
467     return threads_.size() == 0;
468   }
469 
470   /*!
471    * \brief Create and launch a new Thread object which will be owned by this ThreadGroup
472    * \tparam StartFunction Function type for the thread 'main' function
473    * \tparam ThreadType managedThreadclass type (in case it's derived, for instance)
474    * \tparam Args Arguments to pass to the thread 'main' function
475    * \param threadName Name if the thread. Must be unique for a ThreadGroup object
476    * \param auto_remove If true, automatically remove this Thread object from the
477    *                    ThreadGroup owner upon exit
478    * \param start_function The Thread's 'main' function
479    * \param args Arguments to pass to the Thread's 'main' function
480    * \return true if the thread was successfully created and added to the ThreadGroup
481    *              If false is returned, the thread may have already been started, but if something
482    *              went wrong (ie duplicte thread name for the ThreadGroup), then request_shutdown()
483    *              will have been been called on the running thread
484    */
485   template<typename StartFunction, typename ThreadType = Thread, typename ...Args>
create(const std::string & threadName,bool auto_remove,StartFunction start_function,Args...args)486   inline bool create(const std::string &threadName,
487                      bool auto_remove,
488                      StartFunction start_function,
489                      Args... args) {
490     typename ThreadType::SharedPtr newThread(new ThreadType(threadName, this));
491     return Thread::launch(newThread, auto_remove, start_function, args...);
492   }
493 
494   /*!
495    * \brief Lookup Thread object by name
496    * \param name Name of the thread to look up
497    * \return A shared pointer to the Thread object
498    */
thread_by_name(const std::string & name)499   inline std::shared_ptr<Thread> thread_by_name(const std::string& name) {
500     ReadLock guard(m_);
501     auto iter = name_to_thread_.find(name);
502     if (iter != name_to_thread_.end()) {
503       return iter->second;
504     }
505     return nullptr;
506   }
507 
508  private:
509   /*! \brief ThreadGroup synchronization mutex */
510   mutable SharedMutex m_;
511   /*! \brief join_all/auto_remove synchronization mutex */
512   mutable std::mutex join_all_mtx_;
513   /*! \brief Set of threads owned and managed by this ThreadGroup object */
514   std::unordered_set<std::shared_ptr<Thread>> threads_;
515   /*! \brief Manual event which is signaled when the thread group is empty */
516   std::shared_ptr<ManualEvent> evEmpty_;
517   /*! \brief name->thread mapping */
518   std::unordered_map<std::string, std::shared_ptr<Thread>> name_to_thread_;
519 };
520 
521 /*!
522  * \brief Blocking queue thread class
523  * \tparam ObjectType Object type to queue
524  * \tparam quit_item Object value to signify queue shutdown (ie nullptr for pointer type is common)
525  * \note See gtest unit test Syc.ManagedThreadLaunchQueueThread for a usage example
526  */
527 template<typename ObjectType, ObjectType quit_item>
528 class BlockingQueueThread : public ThreadGroup::Thread {
529   using BQT = BlockingQueueThread<ObjectType, quit_item>;
530 
531  public:
532   /*!
533    * \brief Constructor
534    * \param name Name for the blockin g queue thread. Must be unique for a specific ThreadGroup
535    * \param owner ThreadGroup lifecycle manafger/owner
536    * \param thrd Optionally attach an existing stl thread object
537    */
538   BlockingQueueThread(const std::string& name,
539                       dmlc::ThreadGroup *owner,
540                       std::thread *thrd = nullptr)
Thread(std::move (name),owner,thrd)541     : ThreadGroup::Thread(std::move(name), owner, thrd)
542       , shutdown_in_progress_(false) {
543   }
544 
545 
546   /*!
547    * \brief Destructor
548    */
~BlockingQueueThread()549   ~BlockingQueueThread() override {
550     // Call to parent first because we don't want to wait for the queue to empty
551     ThreadGroup::Thread::request_shutdown();
552     request_shutdown();
553   }
554 
555   /*!
556    * \brief Signal the thread that a shutdown is desired
557    * \note Since consumer doesn't necessarily get items in order, we must wait for
558    *       the queue to empty.
559    *       This is generally a shutdown procedure and should not be called from
560    *       a performance-sensitive area
561    */
request_shutdown()562   void request_shutdown() override {
563     shutdown_in_progress_ = true;
564     while (queue_->size_approx() > 0 && !ThreadGroup::Thread::is_shutdown_requested()) {
565       std::this_thread::sleep_for(std::chrono::milliseconds(1));
566     }
567     ThreadGroup::Thread::request_shutdown();
568     queue_->enqueue(quit_item);
569   }
570 
571   /*!
572    * \brief Enqueue and item
573    * \param item The item to enqueue
574    */
enqueue(const ObjectType & item)575   void enqueue(const ObjectType& item) {
576     if (!shutdown_in_progress_) {
577       queue_->enqueue(item);
578     }
579   }
580 
581   /*!
582    * \brief Get the approximate size of the queue
583    * \return The approximate size of the queue
584    */
size_approx()585   size_t size_approx() const { return queue_->size_approx(); }
586 
587   /*!
588    * \brief Launch to the 'run' function which will, in turn, call the class'
589    *        'run' function, passing it the given 'secondary_function'
590    *        for it to call as needed
591    * \tparam SecondaryFunction Type of the secondary function for 'run' override
592    *         to call as needed
593    * \param pThis Pointer to the managed thread to launch
594    * \param secondary_function secondary function for 'run' override to call as needed
595    * \return true if thread is launched successfully and added to the ThreadGroup
596    */
597   template<typename SecondaryFunction>
launch_run(std::shared_ptr<BQT> pThis,SecondaryFunction secondary_function)598   static bool launch_run(std::shared_ptr<BQT> pThis,
599                          SecondaryFunction secondary_function) {
600     return ThreadGroup::Thread::launch(pThis, true, [](std::shared_ptr<BQT> pThis,
601                                                        SecondaryFunction secondary_function) {
602                                          return pThis->run(secondary_function);
603                                        },
604                                        pThis, secondary_function);
605   }
606 
607   /*!
608    * \brief Thread's main queue processing function
609    * \tparam OnItemFunction Function type to call when an item is dequeued
610    * \param on_item_function Function to call when an item is dequeued
611    * \return 0 if completed through a `quit_item`, nonzero if on_item_function requested an exit
612    */
613   template<typename OnItemFunction>
run(OnItemFunction on_item_function)614   inline int run(OnItemFunction on_item_function) {
615     int rc = 0;
616     do {
617       ObjectType item;
618       queue_->wait_dequeue(item);
619       if (item == quit_item) {
620         break;
621       }
622       rc = on_item_function(item);
623       if (rc) {
624         break;
625       }
626     } while (true);
627     return rc;
628   }
629 
630  private:
631   /*! \brief The blocking queue associated with this thread */
632   std::shared_ptr<dmlc::moodycamel::BlockingConcurrentQueue<ObjectType>> queue_ =
633     std::make_shared<dmlc::moodycamel::BlockingConcurrentQueue<ObjectType>>();
634   /*! \brief Whether shutdown request is in progress */
635   std::atomic<bool> shutdown_in_progress_;
636 };
637 
638 /*!
639  * \brief Managed timer thread
640  * \tparam Duration Duration type (ie seconds, microseconds, etc)
641  */
642 template<typename Duration>
643 class TimerThread : public ThreadGroup::Thread {
644   using ThreadGroup::Thread::is_shutdown_requested;
645 
646  public:
647   /*!
648    * \brief Constructor
649    * \param name Name of the timer thread
650    * \param owner ThreadGroup owner if the timer thread
651    */
TimerThread(const std::string & name,ThreadGroup * owner)652   TimerThread(const std::string& name, ThreadGroup *owner)
653     : Thread(name, owner) {
654   }
655 
656   /*!
657    * \brief Destructor
658    */
~TimerThread()659   ~TimerThread() override {
660     request_shutdown();
661   }
662 
663   /*!
664    * \brief Launch to the 'run' function which will, in turn, call the class'
665    *        'run' function, passing it the given 'secondary_function'
666    *        for it to call as needed
667    * \tparam SecondaryFunction Type of the secondary function for 'run' override
668    *         to call as needed
669    * \param pThis Pointer to the managed thread to launch
670    * \param secondary_function secondary function for 'run' override to call as needed
671    * \return true if thread is launched successfully and added to the ThreadGroup
672    */
673   template<typename SecondaryFunction>
launch_run(std::shared_ptr<TimerThread<Duration>> pThis,SecondaryFunction secondary_function)674   static bool launch_run(std::shared_ptr<TimerThread<Duration>> pThis,
675                          SecondaryFunction secondary_function) {
676     return ThreadGroup::Thread::launch(pThis, true, [](std::shared_ptr<TimerThread<Duration>> pThis,
677                                                        SecondaryFunction secondary_function) {
678                                          return pThis->run(secondary_function);
679                                        },
680                                        pThis, secondary_function);
681   }
682 
683   /*!
684    * \brief Start a given timer thread
685    * \tparam Function Type of the timer function
686    * \param timer_thread Thread object to perform the timer events
687    * \param duration Duration between the end end of the timer function and the next timer event
688    * \param function Function to call when the timer expires
689    * \note Calling shutdown_requested() will cause the thread to exit the next time that the timer
690    *       expires.
691    */
692   template<typename Function>
start(std::shared_ptr<TimerThread> timer_thread,Duration duration,Function function)693   static void start(std::shared_ptr<TimerThread> timer_thread,
694                     Duration duration,
695                     Function function) {
696     timer_thread->duration_ = duration;
697     launch_run(timer_thread, function);
698   }
699 
700   /*!
701    * \brief Internal timer execution function
702    * \tparam OnTimerFunction Type of function to call each time the timer expires
703    * \param on_timer_function Function to call each time the timer expires
704    * \return Exit code of the thread
705    */
706   template<typename OnTimerFunction>
run(OnTimerFunction on_timer_function)707   inline int run(OnTimerFunction on_timer_function) {
708     int rc = 0;
709     while (!is_shutdown_requested()) {
710       std::this_thread::sleep_for(duration_);
711       if (!is_shutdown_requested()) {
712         rc = on_timer_function();
713       }
714     }
715     return rc;
716   }
717 
718  private:
719   Duration duration_;
720 };
721 
722 /*
723  * Inline functions - see declarations for usage
724  */
725 template <typename StartFunction, typename ...Args>
entry_and_exit_f(std::shared_ptr<Thread> pThis,StartFunction start_function,Args...args)726 inline int ThreadGroup::Thread::entry_and_exit_f(std::shared_ptr<Thread> pThis,
727                                                  StartFunction start_function,
728                                                  Args... args) {
729   int rc;
730   if (pThis) {
731     // Signal launcher that we're up and running
732     pThis->ready_event_->signal();
733     // Wait for launcher to be ready for us to start
734     pThis->start_event_->wait();
735     // Reset start_event_ for possible reuse
736     pThis->start_event_->reset();  // Reset in case it needs to be reused
737     // If we haven't been requested to shut down prematurely, then run the desired function
738     if (!pThis->is_shutdown_requested()) {
739       rc = start_function(args...);
740     } else {
741       rc = -1;
742     }
743     // If we're set up as auto-remove, then remove this thread from the thread group
744     if (pThis->is_auto_remove()) {
745       pThis->owner_->remove_thread(pThis);
746     }
747     // Release this thread shared pinter. May or may not be the last reference.
748     pThis.reset();
749   } else {
750     LOG(ERROR) << "Null pThis thread pointer";
751     rc = EINVAL;
752   }
753   return rc;
754 }
755 
756 template<typename StartFunction, typename ...Args>
launch(std::shared_ptr<Thread> pThis,bool autoRemove,StartFunction start_function,Args...args)757 inline bool ThreadGroup::Thread::launch(std::shared_ptr<Thread> pThis,
758                                         bool autoRemove,
759                                         StartFunction start_function,
760                                         Args ...args) {
761   WriteLock guard(pThis->thread_mutex_);
762   CHECK_EQ(!pThis->thread_.load(), true);
763   CHECK_NOTNULL(pThis->owner_);
764   // Set auto remove
765   pThis->auto_remove_ = autoRemove;
766   // Create the actual stl thread object
767   pThis->thread_ = new std::thread(Thread::template entry_and_exit_f<
768                                      StartFunction, Args...>,
769                                    pThis,
770                                    start_function,
771                                    args...);
772   // Attempt to add the thread to the thread group (after started, since in case
773   // something goes wrong, there's not a zombie thread in the thread group)
774   if (!pThis->owner_->add_thread(pThis)) {
775     pThis->request_shutdown();
776     LOG(ERROR) << "Duplicate thread name within the same thread group is not allowed";
777   }
778   // Wait for the thread to spin up
779   pThis->ready_event_->wait();
780   // Signal the thgread to continue (it will check its shutdown status)
781   pThis->start_event_->signal();
782   // Return if successful
783   return pThis->thread_.load() != nullptr;
784 }
785 
786 /*!
787  * \brief Utility function to easily create a timer
788  * \tparam Duration Duration type (i.e. std::chrono::milliseconds)
789  * \tparam TimerFunction Function to call each time the timer expires
790  * \param timer_name Name of the timer. Must be unique per ThreadGroup object
791  * \param duration Duration of the timer between calls to timer_function
792  * \param owner ThreadGroup owner of the timer
793  * \param timer_function Function to call each time the timer expires
794  * \return true if the timer was successfully created
795  */
796 template<typename Duration, typename TimerFunction>
CreateTimer(const std::string & timer_name,const Duration & duration,ThreadGroup * owner,TimerFunction timer_function)797 inline bool CreateTimer(const std::string& timer_name,
798                         const Duration& duration,
799                         ThreadGroup *owner,
800                         TimerFunction timer_function) {
801   std::shared_ptr<dmlc::TimerThread<Duration>> timer_thread =
802     std::make_shared<dmlc::TimerThread<Duration>>(timer_name, owner);
803   dmlc::TimerThread<Duration>::start(timer_thread, duration, timer_function);
804   return timer_thread != nullptr;
805 }
806 }  // namespace dmlc
807 
808 #endif  // DMLC_THREAD_GROUP_H_
809