// Copyright(c) 2015-present, Gabi Melman & spdlog contributors.
// Distributed under the MIT License (http://opensource.org/licenses/MIT)

#pragma once

#ifndef SPDLOG_HEADER_ONLY
#include <spdlog/details/thread_pool.h>
#endif

#include <spdlog/common.h>
#include <cassert>

namespace spdlog {
namespace details {
15
thread_pool(size_t q_max_items,size_t threads_n,std::function<void ()> on_thread_start)16 SPDLOG_INLINE thread_pool::thread_pool(size_t q_max_items, size_t threads_n, std::function<void()> on_thread_start)
17 : q_(q_max_items)
18 {
19 if (threads_n == 0 || threads_n > 1000)
20 {
21 throw_spdlog_ex("spdlog::thread_pool(): invalid threads_n param (valid "
22 "range is 1-1000)");
23 }
24 for (size_t i = 0; i < threads_n; i++)
25 {
26 threads_.emplace_back([this, on_thread_start] {
27 on_thread_start();
28 this->thread_pool::worker_loop_();
29 });
30 }
31 }
32
thread_pool(size_t q_max_items,size_t threads_n)33 SPDLOG_INLINE thread_pool::thread_pool(size_t q_max_items, size_t threads_n)
34 : thread_pool(q_max_items, threads_n, [] {})
35 {}
36
37 // message all threads to terminate gracefully join them
~thread_pool()38 SPDLOG_INLINE thread_pool::~thread_pool()
39 {
40 SPDLOG_TRY
41 {
42 for (size_t i = 0; i < threads_.size(); i++)
43 {
44 post_async_msg_(async_msg(async_msg_type::terminate), async_overflow_policy::block);
45 }
46
47 for (auto &t : threads_)
48 {
49 t.join();
50 }
51 }
52 SPDLOG_CATCH_ALL() {}
53 }
54
post_log(async_logger_ptr && worker_ptr,const details::log_msg & msg,async_overflow_policy overflow_policy)55 void SPDLOG_INLINE thread_pool::post_log(async_logger_ptr &&worker_ptr, const details::log_msg &msg, async_overflow_policy overflow_policy)
56 {
57 async_msg async_m(std::move(worker_ptr), async_msg_type::log, msg);
58 post_async_msg_(std::move(async_m), overflow_policy);
59 }
60
post_flush(async_logger_ptr && worker_ptr,async_overflow_policy overflow_policy)61 void SPDLOG_INLINE thread_pool::post_flush(async_logger_ptr &&worker_ptr, async_overflow_policy overflow_policy)
62 {
63 post_async_msg_(async_msg(std::move(worker_ptr), async_msg_type::flush), overflow_policy);
64 }
65
overrun_counter()66 size_t SPDLOG_INLINE thread_pool::overrun_counter()
67 {
68 return q_.overrun_counter();
69 }
70
post_async_msg_(async_msg && new_msg,async_overflow_policy overflow_policy)71 void SPDLOG_INLINE thread_pool::post_async_msg_(async_msg &&new_msg, async_overflow_policy overflow_policy)
72 {
73 if (overflow_policy == async_overflow_policy::block)
74 {
75 q_.enqueue(std::move(new_msg));
76 }
77 else
78 {
79 q_.enqueue_nowait(std::move(new_msg));
80 }
81 }
82
worker_loop_()83 void SPDLOG_INLINE thread_pool::worker_loop_()
84 {
85 while (process_next_msg_()) {}
86 }
87
88 // process next message in the queue
89 // return true if this thread should still be active (while no terminate msg
90 // was received)
process_next_msg_()91 bool SPDLOG_INLINE thread_pool::process_next_msg_()
92 {
93 async_msg incoming_async_msg;
94 bool dequeued = q_.dequeue_for(incoming_async_msg, std::chrono::seconds(10));
95 if (!dequeued)
96 {
97 return true;
98 }
99
100 switch (incoming_async_msg.msg_type)
101 {
102 case async_msg_type::log: {
103 incoming_async_msg.worker_ptr->backend_sink_it_(incoming_async_msg);
104 return true;
105 }
106 case async_msg_type::flush: {
107 incoming_async_msg.worker_ptr->backend_flush_();
108 return true;
109 }
110
111 case async_msg_type::terminate: {
112 return false;
113 }
114
115 default: {
116 assert(false);
117 }
118 }
119
120 return true;
121 }

} // namespace details
} // namespace spdlog