1 // Copyright(c) 2015-present, Gabi Melman & spdlog contributors.
2 // Distributed under the MIT License (http://opensource.org/licenses/MIT)
3 
4 #pragma once
5 
6 #ifndef SPDLOG_HEADER_ONLY
7 #include <spdlog/details/thread_pool.h>
8 #endif
9 
10 #include <spdlog/common.h>
11 #include <cassert>
12 
13 namespace spdlog {
14 namespace details {
15 
thread_pool(size_t q_max_items,size_t threads_n,std::function<void ()> on_thread_start)16 SPDLOG_INLINE thread_pool::thread_pool(size_t q_max_items, size_t threads_n, std::function<void()> on_thread_start)
17     : q_(q_max_items)
18 {
19     if (threads_n == 0 || threads_n > 1000)
20     {
21         throw_spdlog_ex("spdlog::thread_pool(): invalid threads_n param (valid "
22                         "range is 1-1000)");
23     }
24     for (size_t i = 0; i < threads_n; i++)
25     {
26         threads_.emplace_back([this, on_thread_start] {
27             on_thread_start();
28             this->thread_pool::worker_loop_();
29         });
30     }
31 }
32 
thread_pool(size_t q_max_items,size_t threads_n)33 SPDLOG_INLINE thread_pool::thread_pool(size_t q_max_items, size_t threads_n)
34     : thread_pool(q_max_items, threads_n, [] {})
35 {}
36 
37 // message all threads to terminate gracefully join them
~thread_pool()38 SPDLOG_INLINE thread_pool::~thread_pool()
39 {
40     SPDLOG_TRY
41     {
42         for (size_t i = 0; i < threads_.size(); i++)
43         {
44             post_async_msg_(async_msg(async_msg_type::terminate), async_overflow_policy::block);
45         }
46 
47         for (auto &t : threads_)
48         {
49             t.join();
50         }
51     }
52     SPDLOG_CATCH_ALL() {}
53 }
54 
post_log(async_logger_ptr && worker_ptr,const details::log_msg & msg,async_overflow_policy overflow_policy)55 void SPDLOG_INLINE thread_pool::post_log(async_logger_ptr &&worker_ptr, const details::log_msg &msg, async_overflow_policy overflow_policy)
56 {
57     async_msg async_m(std::move(worker_ptr), async_msg_type::log, msg);
58     post_async_msg_(std::move(async_m), overflow_policy);
59 }
60 
post_flush(async_logger_ptr && worker_ptr,async_overflow_policy overflow_policy)61 void SPDLOG_INLINE thread_pool::post_flush(async_logger_ptr &&worker_ptr, async_overflow_policy overflow_policy)
62 {
63     post_async_msg_(async_msg(std::move(worker_ptr), async_msg_type::flush), overflow_policy);
64 }
65 
overrun_counter()66 size_t SPDLOG_INLINE thread_pool::overrun_counter()
67 {
68     return q_.overrun_counter();
69 }
70 
post_async_msg_(async_msg && new_msg,async_overflow_policy overflow_policy)71 void SPDLOG_INLINE thread_pool::post_async_msg_(async_msg &&new_msg, async_overflow_policy overflow_policy)
72 {
73     if (overflow_policy == async_overflow_policy::block)
74     {
75         q_.enqueue(std::move(new_msg));
76     }
77     else
78     {
79         q_.enqueue_nowait(std::move(new_msg));
80     }
81 }
82 
worker_loop_()83 void SPDLOG_INLINE thread_pool::worker_loop_()
84 {
85     while (process_next_msg_()) {}
86 }
87 
88 // process next message in the queue
89 // return true if this thread should still be active (while no terminate msg
90 // was received)
process_next_msg_()91 bool SPDLOG_INLINE thread_pool::process_next_msg_()
92 {
93     async_msg incoming_async_msg;
94     bool dequeued = q_.dequeue_for(incoming_async_msg, std::chrono::seconds(10));
95     if (!dequeued)
96     {
97         return true;
98     }
99 
100     switch (incoming_async_msg.msg_type)
101     {
102     case async_msg_type::log: {
103         incoming_async_msg.worker_ptr->backend_sink_it_(incoming_async_msg);
104         return true;
105     }
106     case async_msg_type::flush: {
107         incoming_async_msg.worker_ptr->backend_flush_();
108         return true;
109     }
110 
111     case async_msg_type::terminate: {
112         return false;
113     }
114 
115     default: {
116         assert(false);
117     }
118     }
119 
120     return true;
121 }
122 
123 } // namespace details
124 } // namespace spdlog
125