1 #ifndef UTIL___THREAD_POOL_CTRL__HPP
2 #define UTIL___THREAD_POOL_CTRL__HPP
3 
4 /*  $Id: thread_pool_ctrl.hpp 132195 2008-06-26 12:58:47Z ivanovp $
5  * ===========================================================================
6  *
7  *                            PUBLIC DOMAIN NOTICE
8  *               National Center for Biotechnology Information
9  *
10  *  This software/database is a "United States Government Work" under the
11  *  terms of the United States Copyright Act.  It was written as part of
12  *  the author's official duties as a United States Government employee and
13  *  thus cannot be copyrighted.  This software/database is freely available
14  *  to the public for use. The National Library of Medicine and the U.S.
15  *  Government have not placed any restriction on its use or reproduction.
16  *
17  *  Although all reasonable efforts have been taken to ensure the accuracy
18  *  and reliability of the software and data, the NLM and the U.S.
19  *  Government do not and cannot warrant the performance or results that
20  *  may be obtained by using this software or data. The NLM and the U.S.
21  *  Government disclaim all warranties, express or implied, including
22  *  warranties of performance, merchantability or fitness for any particular
23  *  purpose.
24  *
25  *  Please cite the author in any work or product based on this material.
26  *
27  * ===========================================================================
28  *
29  * Author:  Pavel Ivanov
30  *
31  */
32 
/// @file thread_pool_ctrl.hpp
/// Implementations of controllers for ThreadPool
///
///  CThreadPool_Controller_PID -- default controller of pool of threads based
///     on Proportional-Integral-Derivative algorithm depending on number
///     of requests waiting for execution per thread in pool
39 
40 
41 #include <util/thread_pool.hpp>
42 
43 #include <deque>
44 
45 
46 /** @addtogroup ThreadedPools
47  *
48  * @{
49  */
50 
51 
52 BEGIN_NCBI_SCOPE
53 
/// Entry in the history of "error" changes.
/// Holds the "error" value observed at some point of time in the past;
/// entries of this type are kept by CThreadPool_Controller_PID.
struct SThreadPool_PID_ErrInfo
{
    /// Time of history entry
    double call_time;
    /// Value of the error
    double err;

    /// Create history entry
    /// @param time_
    ///   Time of the entry, stored in call_time
    /// @param err_
    ///   Value of the error, stored in err
    SThreadPool_PID_ErrInfo(double time_, double err_)
        : call_time(time_), err(err_)
    {}
};
68 
69 
70 
71 /// Default ThreadPool controller based on Proportional-Integral-Derivative
72 /// algorithm. Controller looks at number of tasks waiting in the queue
73 /// per each thread running and adjusts number of threads with respect to
74 /// all coefficients set in it.
75 /// Implementation of the class assumes that all coefficients are set before
76 /// pool begin to work and controller begins to be extencively used.
77 /// All changing of coefficients implemented in non-threadsafe manner and if
78 /// they will be changed at the same time when OnEvent() is executed
79 /// unpredictable consequences can happen.
80 class CThreadPool_Controller_PID : public CThreadPool_Controller
81 {
82 public:
83     /// Constructor
84     /// @param max_threads
85     ///   Maximum number of threads in pool
86     /// @param min_threads
87     ///   Minimum number of threads in pool
88     CThreadPool_Controller_PID(unsigned int  max_threads,
89                                unsigned int  min_threads);
90 
91     /// Set maximum number of tasks in queue per each thread
92     /// The meaning of parameter is only approximate. In fact it is the
93     /// coefficient in proportional part of the algorithm and adjustment for
94     /// all other coefficients.
95     /// By default parameter is set to 3.
96     void SetQueuedTasksThreshold(double threshold);
97 
98     /// Get maximum number of tasks in queue per each thread
99     ///
100     /// @sa SetQueuedTasksThreshold()
101     double GetQueuedTasksThreshold(void);
102 
103     /// Set maximum time (in seconds) that task can wait in queue for
104     /// processing until new thread will be launched.
105     /// The meaning of parameter is only approximate. In fact it is the
106     /// coefficient in integral part of the algorithm and effectively if only
107     /// one task will be considered then coefficient will be multiplied
108     /// by number of currently running threads and currently set threshold.
109     /// By default parameter is set to 0.2.
110     ///
111     /// @sa SetQueuedTasksThreshold()
112     void SetTaskMaxQueuedTime(double queued_time);
113 
114     /// Get maximum time that task can wait in queue for processing until
115     /// new thread will be launched.
116     ///
117     /// @sa SetTaskMaxQueuedTime()
118     double GetTaskMaxQueuedTime(void);
119 
120     /// Set the time period (in seconds) for which average speed of changing
121     /// of waiting tasks number is calculated.
122     /// Average speed is calculated by simple division of changing in waiting
123     /// tasks number during this time period per time period value (all
124     /// counts of tasks are calculated per each thread).
125     /// By default parameter is set to 0.3.
126     void SetChangeCalcTime(double calc_time);
127 
128     /// Get the time period for which average speed of changing of waiting
129     /// tasks number is calculated.
130     double GetChangeCalcTime(void);
131 
132     /// Set period of prediction of number of tasks in queue
133     /// The meaning of parameter is only approximate. In fact it is the
134     /// coefficient in derivative part of the algorithm. Meaning of the
135     /// coefficient is like this: take average speed of changing of tasks
136     /// count, multiply it by this prediction time, if the resulting value
137     /// is greater than threshold then new thread is needed.
138     /// By default parameter is set to 0.5.
139     ///
140     /// @sa SetQueuedTasksThreshold()
141     void SetChangePredictTime(double predict_time);
142 
143     /// Get period of prediction of number of tasks in queue
144     ///
145     /// @sa SetChangePredictTime()
146     double GetChangePredictTime(void);
147 
148     /// Get maximum timeout for which calls to method HandleEvent() can be
149     /// missing.
150     ///
151     /// @sa CThreadPool_Controller::GetSafeSleepTime()
152     virtual CTimeSpan GetSafeSleepTime(void) const;
153 
154 protected:
155     /// Main method for implementation of controlling algorithm
156     virtual void OnEvent(EEvent event);
157 
158 private:
159     /// Timer for measuring time periods
160     CStopWatch                      m_Timer;
161     /// History of changing of "error" value
162     /// "error" - number of tasks per thread waiting in queue. Controller
163     /// will try to tend this value to zero.
164     deque<SThreadPool_PID_ErrInfo>  m_ErrHistory;
165     /// Value of "error" integrated over all working time
166     double                          m_IntegrErr;
167     /// Threshold value
168     /// @sa SetQueuedTasksThreshold()
169     double                          m_Threshold;
170     /// Integral coefficient
171     /// @sa SetTaskMaxQueuedTime()
172     double                          m_IntegrCoeff;
173     /// Derivative coefficient
174     /// @sa SetChangePredictTime()
175     double                          m_DerivCoeff;
176     /// Period of taking average "error" change speed
177     /// @sa SetChangeCalcTime()
178     double                          m_DerivTime;
179 };
180 
181 
182 //////////////////////////////////////////////////////////////////////////
183 //  All inline methods
184 //////////////////////////////////////////////////////////////////////////
185 
186 inline
SetQueuedTasksThreshold(double threshold)187 void CThreadPool_Controller_PID::SetQueuedTasksThreshold(double threshold)
188 {
189     m_Threshold = threshold;
190 }
191 
192 inline
GetQueuedTasksThreshold(void)193 double CThreadPool_Controller_PID::GetQueuedTasksThreshold(void)
194 {
195     return m_Threshold;
196 }
197 
198 inline
SetTaskMaxQueuedTime(double queued_time)199 void CThreadPool_Controller_PID::SetTaskMaxQueuedTime(double queued_time)
200 {
201     m_IntegrCoeff = queued_time;
202 }
203 
204 inline
GetTaskMaxQueuedTime(void)205 double CThreadPool_Controller_PID::GetTaskMaxQueuedTime(void)
206 {
207     return m_IntegrCoeff;
208 }
209 
210 inline
SetChangeCalcTime(double calc_time)211 void CThreadPool_Controller_PID::SetChangeCalcTime(double calc_time)
212 {
213     m_DerivTime = calc_time;
214 }
215 
216 inline
GetChangeCalcTime(void)217 double CThreadPool_Controller_PID::GetChangeCalcTime(void)
218 {
219     return m_DerivTime;
220 }
221 
222 inline
SetChangePredictTime(double predict_time)223 void CThreadPool_Controller_PID::SetChangePredictTime(double predict_time)
224 {
225     m_DerivCoeff = predict_time;
226 }
227 
228 inline
GetChangePredictTime(void)229 double CThreadPool_Controller_PID::GetChangePredictTime(void)
230 {
231     return m_DerivCoeff;
232 }
233 
234 
235 END_NCBI_SCOPE
236 
237 
238 /* @} */
239 
240 #endif  /* UTIL___THREAD_POOL_CTRL__HPP */
241