1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <chrono>
20 #include <condition_variable>
21 #include <mutex>
22 #include <thread>
23 #include <unordered_map>
24 #include <vector>
25 
26 #include <folly/Function.h>
27 #include <folly/Range.h>
28 #include <folly/hash/Hash.h>
29 
30 namespace folly {
31 
32 /**
33  * Schedules any number of functions to run at various intervals. E.g.,
34  *
35  *   FunctionScheduler fs;
36  *
37  *   fs.addFunction([&] { LOG(INFO) << "tick..."; }, seconds(1), "ticker");
38  *   fs.addFunction(std::bind(&TestClass::doStuff, this), minutes(5), "stuff");
39  *   fs.start();
40  *   ........
41  *   fs.cancelFunction("ticker");
42  *   fs.addFunction([&] { LOG(INFO) << "tock..."; }, minutes(3), "tocker");
43  *   ........
44  *   fs.shutdown();
45  *
46  *
47  * Note: the class uses only one thread - if you want to use more than one
48  *       thread, either use multiple FunctionScheduler objects, or check out
49  *       ThreadedRepeatingFunctionRunner.h for a much simpler contract of
50  *       "run each function periodically in its own thread".
51  *
52  * start() schedules the functions, while shutdown() terminates further
53  * scheduling.
54  */
55 class FunctionScheduler {
56  public:
57   FunctionScheduler();
58   ~FunctionScheduler();
59 
60   /**
61    * By default steady is false, meaning schedules may lag behind overtime.
62    * This could be due to long running tasks or time drift because of randomness
63    * in thread wakeup time.
64    * By setting steady to true, FunctionScheduler will attempt to catch up.
65    * i.e. more like a cronjob
66    *
67    * NOTE: it's only safe to set this before calling start()
68    */
setSteady(bool steady)69   void setSteady(bool steady) { steady_ = steady; }
70 
71   /*
72    * Parameters to control the function interval.
73    *
74    * If isPoisson is true, then use std::poisson_distribution to pick the
75    * interval between each invocation of the function.
76    *
77    * If isPoisson is false, then always use the fixed interval specified to
78    * addFunction().
79    */
80   struct LatencyDistribution {
81     bool isPoisson;
82     std::chrono::microseconds poissonMean;
83 
LatencyDistributionLatencyDistribution84     LatencyDistribution(bool poisson, std::chrono::microseconds mean)
85         : isPoisson(poisson), poissonMean(mean) {}
86   };
87 
88   /**
89    * Adds a new function to the FunctionScheduler.
90    *
91    * Functions will not be run until start() is called.  When start() is
92    * called, each function will be run after its specified startDelay.
93    * Functions may also be added after start() has been called, in which case
94    * startDelay is still honored.
95    *
96    * Throws an exception on error.  In particular, each function must have a
97    * unique name--two functions cannot be added with the same name.
98    */
99   void addFunction(
100       Function<void()>&& cb,
101       std::chrono::microseconds interval,
102       StringPiece nameID = StringPiece(),
103       std::chrono::microseconds startDelay = std::chrono::microseconds(0));
104 
105   /*
106    * Add a new function to the FunctionScheduler with a specified
107    * LatencyDistribution
108    */
109   void addFunction(
110       Function<void()>&& cb,
111       std::chrono::microseconds interval,
112       const LatencyDistribution& latencyDistr,
113       StringPiece nameID = StringPiece(),
114       std::chrono::microseconds startDelay = std::chrono::microseconds(0));
115 
116   /**
117    * Adds a new function to the FunctionScheduler to run only once.
118    */
119   void addFunctionOnce(
120       Function<void()>&& cb,
121       StringPiece nameID = StringPiece(),
122       std::chrono::microseconds startDelay = std::chrono::microseconds(0));
123 
124   /**
125    * Add a new function to the FunctionScheduler with the time
126    * interval being distributed uniformly within the given interval
127    * [minInterval, maxInterval].
128    */
129   void addFunctionUniformDistribution(
130       Function<void()>&& cb,
131       std::chrono::microseconds minInterval,
132       std::chrono::microseconds maxInterval,
133       StringPiece nameID,
134       std::chrono::microseconds startDelay);
135 
136   /**
137    * Add a new function to the FunctionScheduler whose start times are attempted
138    * to be scheduled so that they are congruent modulo the interval.
139    * Note: The scheduling of the next run time happens right before the function
140    * invocation, so the first time a function takes more time than the interval,
141    * it will be reinvoked immediately.
142    */
143   void addFunctionConsistentDelay(
144       Function<void()>&& cb,
145       std::chrono::microseconds interval,
146       StringPiece nameID = StringPiece(),
147       std::chrono::microseconds startDelay = std::chrono::microseconds(0));
148 
149   /**
150    * A type alias for function that is called to determine the time
151    * interval for the next scheduled run.
152    */
153   using IntervalDistributionFunc = Function<std::chrono::microseconds()>;
154   /**
155    * A type alias for function that returns the next run time, given the current
156    * run time and the current start time.
157    */
158   using NextRunTimeFunc = Function<std::chrono::steady_clock::time_point(
159       std::chrono::steady_clock::time_point,
160       std::chrono::steady_clock::time_point)>;
161 
162   /**
163    * Add a new function to the FunctionScheduler. The scheduling interval
164    * is determined by the interval distribution functor, which is called
165    * every time the next function execution is scheduled. This allows
166    * for supporting custom interval distribution algorithms in addition
167    * to built in constant interval; and Poisson and jitter distributions
168    * (@see FunctionScheduler::addFunction and
169    * @see FunctionScheduler::addFunctionJitterInterval).
170    */
171   void addFunctionGenericDistribution(
172       Function<void()>&& cb,
173       IntervalDistributionFunc&& intervalFunc,
174       const std::string& nameID,
175       const std::string& intervalDescr,
176       std::chrono::microseconds startDelay);
177 
178   /**
179    * Like addFunctionGenericDistribution, adds a new function to the
180    * FunctionScheduler, but the next run time is determined directly by the
181    * given functor, rather than by adding an interval.
182    */
183   void addFunctionGenericNextRunTimeFunctor(
184       Function<void()>&& cb,
185       NextRunTimeFunc&& fn,
186       const std::string& nameID,
187       const std::string& intervalDescr,
188       std::chrono::microseconds startDelay);
189 
190   /**
191    * Cancels the function with the specified name, so it will no longer be run.
192    *
193    * Returns false if no function exists with the specified name.
194    */
195   bool cancelFunction(StringPiece nameID);
196   bool cancelFunctionAndWait(StringPiece nameID);
197 
198   /**
199    * All functions registered will be canceled.
200    */
201   void cancelAllFunctions();
202   void cancelAllFunctionsAndWait();
203 
204   /**
205    * Resets the specified function's timer.
206    * When resetFunctionTimer is called, the specified function's timer will
207    * be reset with the same parameters it was passed initially, including
208    * its startDelay. If the startDelay was 0, the function will be invoked
209    * immediately.
210    *
211    * Returns false if no function exists with the specified name.
212    */
213   bool resetFunctionTimer(StringPiece nameID);
214 
215   /**
216    * Starts the scheduler.
217    *
218    * Returns false if the scheduler was already running.
219    */
220   bool start();
221 
222   /**
223    * Stops the FunctionScheduler.
224    *
225    * It may be restarted later by calling start() again.
226    * Returns false if the scheduler was not running.
227    */
228   bool shutdown();
229 
230   /**
231    * Set the name of the worker thread.
232    */
233   void setThreadName(StringPiece threadName);
234 
235  private:
236   struct RepeatFunc {
237     Function<void()> cb;
238     NextRunTimeFunc nextRunTimeFunc;
239     std::chrono::steady_clock::time_point nextRunTime;
240     std::string name;
241     std::chrono::microseconds startDelay;
242     std::string intervalDescr;
243     bool runOnce;
244 
RepeatFuncRepeatFunc245     RepeatFunc(
246         Function<void()>&& cback,
247         IntervalDistributionFunc&& intervalFn,
248         const std::string& nameID,
249         const std::string& intervalDistDescription,
250         std::chrono::microseconds delay,
251         bool once)
252         : RepeatFunc(
253               std::move(cback),
254               getNextRunTimeFunc(std::move(intervalFn)),
255               nameID,
256               intervalDistDescription,
257               delay,
258               once) {}
259 
RepeatFuncRepeatFunc260     RepeatFunc(
261         Function<void()>&& cback,
262         NextRunTimeFunc&& nextRunTimeFn,
263         const std::string& nameID,
264         const std::string& intervalDistDescription,
265         std::chrono::microseconds delay,
266         bool once)
267         : cb(std::move(cback)),
268           nextRunTimeFunc(std::move(nextRunTimeFn)),
269           nextRunTime(),
270           name(nameID),
271           startDelay(delay),
272           intervalDescr(intervalDistDescription),
273           runOnce(once) {}
274 
getNextRunTimeFuncRepeatFunc275     static NextRunTimeFunc getNextRunTimeFunc(
276         IntervalDistributionFunc&& intervalFn) {
277       return [intervalFn = std::move(intervalFn)](
278                  std::chrono::steady_clock::time_point /* curNextRunTime */,
279                  std::chrono::steady_clock::time_point curTime) mutable {
280         return curTime + intervalFn();
281       };
282     }
283 
getNextRunTimeRepeatFunc284     std::chrono::steady_clock::time_point getNextRunTime() const {
285       return nextRunTime;
286     }
setNextRunTimeStrictRepeatFunc287     void setNextRunTimeStrict(std::chrono::steady_clock::time_point curTime) {
288       nextRunTime = nextRunTimeFunc(nextRunTime, curTime);
289     }
setNextRunTimeSteadyRepeatFunc290     void setNextRunTimeSteady() {
291       nextRunTime = nextRunTimeFunc(nextRunTime, nextRunTime);
292     }
resetNextRunTimeRepeatFunc293     void resetNextRunTime(std::chrono::steady_clock::time_point curTime) {
294       nextRunTime = curTime + startDelay;
295     }
cancelRepeatFunc296     void cancel() {
297       // Simply reset cb to an empty function.
298       cb = {};
299     }
isValidRepeatFunc300     bool isValid() const { return bool(cb); }
301   };
302 
303   struct RunTimeOrder {
operatorRunTimeOrder304     bool operator()(
305         const std::unique_ptr<RepeatFunc>& f1,
306         const std::unique_ptr<RepeatFunc>& f2) const {
307       return f1->getNextRunTime() > f2->getNextRunTime();
308     }
309   };
310 
311   typedef std::vector<std::unique_ptr<RepeatFunc>> FunctionHeap;
312   typedef std::unordered_map<StringPiece, RepeatFunc*, Hash> FunctionMap;
313 
314   void run();
315   void runOneFunction(
316       std::unique_lock<std::mutex>& lock,
317       std::chrono::steady_clock::time_point now);
318   void cancelFunction(const std::unique_lock<std::mutex>& lock, RepeatFunc* it);
319   void addFunctionToHeap(
320       const std::unique_lock<std::mutex>& lock,
321       std::unique_ptr<RepeatFunc> func);
322 
323   template <typename RepeatFuncNextRunTimeFunc>
324   void addFunctionToHeapChecked(
325       Function<void()>&& cb,
326       RepeatFuncNextRunTimeFunc&& fn,
327       const std::string& nameID,
328       const std::string& intervalDescr,
329       std::chrono::microseconds startDelay,
330       bool runOnce);
331 
332   void addFunctionInternal(
333       Function<void()>&& cb,
334       NextRunTimeFunc&& fn,
335       const std::string& nameID,
336       const std::string& intervalDescr,
337       std::chrono::microseconds startDelay,
338       bool runOnce);
339   void addFunctionInternal(
340       Function<void()>&& cb,
341       IntervalDistributionFunc&& fn,
342       const std::string& nameID,
343       const std::string& intervalDescr,
344       std::chrono::microseconds startDelay,
345       bool runOnce);
346 
347   // Return true if the current function is being canceled
348   bool cancelAllFunctionsWithLock(std::unique_lock<std::mutex>& lock);
349   bool cancelFunctionWithLock(
350       std::unique_lock<std::mutex>& lock, StringPiece nameID);
351 
352   std::thread thread_;
353 
354   // Mutex to protect our member variables.
355   std::mutex mutex_;
356   bool running_{false};
357 
358   // The functions to run.
359   // This is a heap, ordered by next run time.
360   FunctionHeap functions_;
361   FunctionMap functionsMap_;
362   RunTimeOrder fnCmp_;
363 
364   // The function currently being invoked by the running thread.
365   // This is null when the running thread is idle
366   RepeatFunc* currentFunction_{nullptr};
367 
368   // Condition variable that is signalled whenever a new function is added
369   // or when the FunctionScheduler is stopped.
370   std::condition_variable runningCondvar_;
371 
372   std::string threadName_;
373   bool steady_{false};
374   bool cancellingCurrentFunction_{false};
375 };
376 
377 } // namespace folly
378