1 // Copyright (C) 2019-2021 Internet Systems Consortium, Inc. ("ISC")
2 //
3 // This Source Code Form is subject to the terms of the Mozilla Public
4 // License, v. 2.0. If a copy of the MPL was not distributed with this
5 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
6 
7 #include <config.h>
8 
9 #include <util/multi_threading_mgr.h>
10 
11 namespace isc {
12 namespace util {
13 
MultiThreadingMgr()14 MultiThreadingMgr::MultiThreadingMgr()
15     : enabled_(false), critical_section_count_(0), thread_pool_size_(0) {
16 }
17 
~MultiThreadingMgr()18 MultiThreadingMgr::~MultiThreadingMgr() {
19 }
20 
21 MultiThreadingMgr&
instance()22 MultiThreadingMgr::instance() {
23     static MultiThreadingMgr manager;
24     return (manager);
25 }
26 
27 bool
getMode() const28 MultiThreadingMgr::getMode() const {
29     return (enabled_);
30 }
31 
32 void
setMode(bool enabled)33 MultiThreadingMgr::setMode(bool enabled) {
34     enabled_ = enabled;
35 }
36 
37 void
enterCriticalSection()38 MultiThreadingMgr::enterCriticalSection() {
39     checkCallbacksPermissions();
40     bool inside = isInCriticalSection();
41     // Increment the counter to allow CS to be created in the registered
42     // callbacks (in which case the new CS would not call callbacks again).
43     // The counter must be updated regardless of the MT mode because the MT mode
44     // can change between the constructor call and the destructor call.
45     ++critical_section_count_;
46     if (getMode() && !inside) {
47         if (getThreadPoolSize()) {
48             thread_pool_.stop();
49         }
50         // Now it is safe to call callbacks which can also create other CSs.
51         callEntryCallbacks();
52     }
53 }
54 
55 void
exitCriticalSection()56 MultiThreadingMgr::exitCriticalSection() {
57     // The number of CS destructors should match the number of CS constructors.
58     // The case when counter is 0 is only possible if calling this function
59     // explicitly, which is a programming error.
60     if (!isInCriticalSection()) {
61         isc_throw(InvalidOperation, "invalid value for critical section count");
62     }
63     // Decrement the counter to allow the check for last CS destructor which
64     // would result in restarting the thread pool.
65     // The counter must be updated regardless of the MT mode because the MT mode
66     // can change between the constructor call and the destructor call.
67     --critical_section_count_;
68     if (getMode() && !isInCriticalSection()) {
69         if (getThreadPoolSize()) {
70             thread_pool_.start(getThreadPoolSize());
71         }
72         // Now it is safe to call callbacks which can also create other CSs.
73         callExitCallbacks();
74     }
75 }
76 
77 bool
isInCriticalSection()78 MultiThreadingMgr::isInCriticalSection() {
79     return (critical_section_count_ != 0);
80 }
81 
82 ThreadPool<std::function<void()>>&
getThreadPool()83 MultiThreadingMgr::getThreadPool() {
84     return thread_pool_;
85 }
86 
87 uint32_t
getThreadPoolSize() const88 MultiThreadingMgr::getThreadPoolSize() const {
89     return (thread_pool_size_);
90 }
91 
92 void
setThreadPoolSize(uint32_t size)93 MultiThreadingMgr::setThreadPoolSize(uint32_t size) {
94     thread_pool_size_ = size;
95 }
96 
97 uint32_t
getPacketQueueSize()98 MultiThreadingMgr::getPacketQueueSize() {
99     return (thread_pool_.getMaxQueueSize());
100 }
101 
102 void
setPacketQueueSize(uint32_t size)103 MultiThreadingMgr::setPacketQueueSize(uint32_t size) {
104     thread_pool_.setMaxQueueSize(size);
105 }
106 
107 uint32_t
detectThreadCount()108 MultiThreadingMgr::detectThreadCount() {
109     return (std::thread::hardware_concurrency());
110 }
111 
112 void
apply(bool enabled,uint32_t thread_count,uint32_t queue_size)113 MultiThreadingMgr::apply(bool enabled, uint32_t thread_count, uint32_t queue_size) {
114     // check the enabled flag
115     if (enabled) {
116         // check for auto scaling (enabled flag true but thread_count 0)
117         if (!thread_count) {
118             // might also return 0
119             thread_count = MultiThreadingMgr::detectThreadCount();
120         }
121     } else {
122         thread_count = 0;
123         queue_size = 0;
124     }
125     // check enabled flag and explicit number of threads or system supports
126     // hardware concurrency
127     if (thread_count) {
128         if (thread_pool_.size()) {
129             thread_pool_.stop();
130         }
131         setThreadPoolSize(thread_count);
132         setPacketQueueSize(queue_size);
133         setMode(true);
134         if (!isInCriticalSection()) {
135             thread_pool_.start(thread_count);
136         }
137     } else {
138         removeAllCriticalSectionCallbacks();
139         thread_pool_.reset();
140         setMode(false);
141         setThreadPoolSize(thread_count);
142         setPacketQueueSize(queue_size);
143     }
144 }
145 
146 void
checkCallbacksPermissions()147 MultiThreadingMgr::checkCallbacksPermissions() {
148     if (getMode()) {
149         for (const auto& cb : cs_callbacks_.getCallbackSets()) {
150             try {
151                 (cb.check_cb_)();
152             } catch (const isc::MultiThreadingInvalidOperation& ex) {
153                 // If any registered callback throws, the exception needs to be
154                 // propagated to the caller of the
155                 // @ref MultiThreadingCriticalSection constructor.
156                 // Because this function is called by the
157                 // @ref MultiThreadingCriticalSection constructor, throwing here
158                 // is safe.
159                 throw;
160             } catch (...) {
161                 // We can't log it and throwing could be chaos.
162                 // We'll swallow it and tell people their callbacks
163                 // must be exception-proof
164             }
165         }
166     }
167 }
168 
169 void
callEntryCallbacks()170 MultiThreadingMgr::callEntryCallbacks() {
171     if (getMode()) {
172         const auto& callbacks = cs_callbacks_.getCallbackSets();
173         for (auto cb_it = callbacks.begin(); cb_it != callbacks.end(); cb_it++) {
174             try {
175                 (cb_it->entry_cb_)();
176             } catch (...) {
177                 // We can't log it and throwing could be chaos.
178                 // We'll swallow it and tell people their callbacks
179                 // must be exception-proof
180             }
181         }
182     }
183 }
184 
185 void
callExitCallbacks()186 MultiThreadingMgr::callExitCallbacks() {
187     if (getMode()) {
188         const auto& callbacks = cs_callbacks_.getCallbackSets();
189         for (auto cb_it = callbacks.rbegin(); cb_it != callbacks.rend(); cb_it++) {
190             try {
191                 (cb_it->exit_cb_)();
192             } catch (...) {
193                 // We can't log it and throwing could be chaos.
194                 // We'll swallow it and tell people their callbacks
195                 // must be exception-proof
196                 // Because this function is called by the
197                 // @ref MultiThreadingCriticalSection destructor, throwing here
198                 // is not safe and will cause the process to crash.
199             }
200         }
201     }
202 }
203 
204 void
addCriticalSectionCallbacks(const std::string & name,const CSCallbackSet::Callback & check_cb,const CSCallbackSet::Callback & entry_cb,const CSCallbackSet::Callback & exit_cb)205 MultiThreadingMgr::addCriticalSectionCallbacks(const std::string& name,
206                                                const CSCallbackSet::Callback& check_cb,
207                                                const CSCallbackSet::Callback& entry_cb,
208                                                const CSCallbackSet::Callback& exit_cb) {
209     cs_callbacks_.addCallbackSet(name, check_cb, entry_cb, exit_cb);
210 }
211 
212 void
removeCriticalSectionCallbacks(const std::string & name)213 MultiThreadingMgr::removeCriticalSectionCallbacks(const std::string& name) {
214     cs_callbacks_.removeCallbackSet(name);
215 }
216 
217 void
removeAllCriticalSectionCallbacks()218 MultiThreadingMgr::removeAllCriticalSectionCallbacks() {
219     cs_callbacks_.removeAll();
220 }
221 
MultiThreadingCriticalSection()222 MultiThreadingCriticalSection::MultiThreadingCriticalSection() {
223     MultiThreadingMgr::instance().enterCriticalSection();
224 }
225 
~MultiThreadingCriticalSection()226 MultiThreadingCriticalSection::~MultiThreadingCriticalSection() {
227     MultiThreadingMgr::instance().exitCriticalSection();
228 }
229 
230 void
addCallbackSet(const std::string & name,const CSCallbackSet::Callback & check_cb,const CSCallbackSet::Callback & entry_cb,const CSCallbackSet::Callback & exit_cb)231 CSCallbackSetList::addCallbackSet(const std::string& name,
232                                   const CSCallbackSet::Callback& check_cb,
233                                   const CSCallbackSet::Callback& entry_cb,
234                                   const CSCallbackSet::Callback& exit_cb) {
235     if (name.empty()) {
236         isc_throw(BadValue, "CSCallbackSetList - name cannot be empty");
237     }
238 
239     if (!check_cb) {
240         isc_throw(BadValue, "CSCallbackSetList - check callback for " << name
241                   << " cannot be empty");
242     }
243 
244     if (!entry_cb) {
245         isc_throw(BadValue, "CSCallbackSetList - entry callback for " << name
246                   << " cannot be empty");
247     }
248 
249     if (!exit_cb) {
250         isc_throw(BadValue, "CSCallbackSetList - exit callback for " << name
251                   << " cannot be empty");
252     }
253 
254     for (auto const& callback : cb_sets_) {
255         if (callback.name_ == name) {
256             isc_throw(BadValue, "CSCallbackSetList - callbacks for " << name
257                       << " already exist");
258         }
259     }
260 
261     cb_sets_.push_back(CSCallbackSet(name, check_cb, entry_cb, exit_cb));
262 }
263 
264 void
removeCallbackSet(const std::string & name)265 CSCallbackSetList::removeCallbackSet(const std::string& name) {
266     for (auto it = cb_sets_.begin(); it != cb_sets_.end(); ++it) {
267         if ((*it).name_ == name) {
268             cb_sets_.erase(it);
269             break;
270         }
271     }
272 }
273 
274 void
removeAll()275 CSCallbackSetList::removeAll() {
276     cb_sets_.clear();
277 }
278 
279 const std::list<CSCallbackSet>&
getCallbackSets()280 CSCallbackSetList::getCallbackSets() {
281     return (cb_sets_);
282 }
283 
284 }  // namespace util
285 }  // namespace isc
286