1 /* 2 * Copyright (c) Facebook, Inc. and its affiliates. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #pragma once 18 19 #include <chrono> 20 #include <condition_variable> 21 #include <mutex> 22 #include <thread> 23 #include <unordered_map> 24 #include <vector> 25 26 #include <folly/Function.h> 27 #include <folly/Range.h> 28 #include <folly/hash/Hash.h> 29 30 namespace folly { 31 32 /** 33 * Schedules any number of functions to run at various intervals. E.g., 34 * 35 * FunctionScheduler fs; 36 * 37 * fs.addFunction([&] { LOG(INFO) << "tick..."; }, seconds(1), "ticker"); 38 * fs.addFunction(std::bind(&TestClass::doStuff, this), minutes(5), "stuff"); 39 * fs.start(); 40 * ........ 41 * fs.cancelFunction("ticker"); 42 * fs.addFunction([&] { LOG(INFO) << "tock..."; }, minutes(3), "tocker"); 43 * ........ 44 * fs.shutdown(); 45 * 46 * 47 * Note: the class uses only one thread - if you want to use more than one 48 * thread, either use multiple FunctionScheduler objects, or check out 49 * ThreadedRepeatingFunctionRunner.h for a much simpler contract of 50 * "run each function periodically in its own thread". 51 * 52 * start() schedules the functions, while shutdown() terminates further 53 * scheduling. 54 */ 55 class FunctionScheduler { 56 public: 57 FunctionScheduler(); 58 ~FunctionScheduler(); 59 60 /** 61 * By default steady is false, meaning schedules may lag behind overtime. 62 * This could be due to long running tasks or time drift because of randomness 63 * in thread wakeup time. 64 * By setting steady to true, FunctionScheduler will attempt to catch up. 65 * i.e. more like a cronjob 66 * 67 * NOTE: it's only safe to set this before calling start() 68 */ setSteady(bool steady)69 void setSteady(bool steady) { steady_ = steady; } 70 71 /* 72 * Parameters to control the function interval. 73 * 74 * If isPoisson is true, then use std::poisson_distribution to pick the 75 * interval between each invocation of the function. 76 * 77 * If isPoisson is false, then always use the fixed interval specified to 78 * addFunction(). 79 */ 80 struct LatencyDistribution { 81 bool isPoisson; 82 std::chrono::microseconds poissonMean; 83 LatencyDistributionLatencyDistribution84 LatencyDistribution(bool poisson, std::chrono::microseconds mean) 85 : isPoisson(poisson), poissonMean(mean) {} 86 }; 87 88 /** 89 * Adds a new function to the FunctionScheduler. 90 * 91 * Functions will not be run until start() is called. When start() is 92 * called, each function will be run after its specified startDelay. 93 * Functions may also be added after start() has been called, in which case 94 * startDelay is still honored. 95 * 96 * Throws an exception on error. In particular, each function must have a 97 * unique name--two functions cannot be added with the same name. 98 */ 99 void addFunction( 100 Function<void()>&& cb, 101 std::chrono::microseconds interval, 102 StringPiece nameID = StringPiece(), 103 std::chrono::microseconds startDelay = std::chrono::microseconds(0)); 104 105 /* 106 * Add a new function to the FunctionScheduler with a specified 107 * LatencyDistribution 108 */ 109 void addFunction( 110 Function<void()>&& cb, 111 std::chrono::microseconds interval, 112 const LatencyDistribution& latencyDistr, 113 StringPiece nameID = StringPiece(), 114 std::chrono::microseconds startDelay = std::chrono::microseconds(0)); 115 116 /** 117 * Adds a new function to the FunctionScheduler to run only once. 118 */ 119 void addFunctionOnce( 120 Function<void()>&& cb, 121 StringPiece nameID = StringPiece(), 122 std::chrono::microseconds startDelay = std::chrono::microseconds(0)); 123 124 /** 125 * Add a new function to the FunctionScheduler with the time 126 * interval being distributed uniformly within the given interval 127 * [minInterval, maxInterval]. 128 */ 129 void addFunctionUniformDistribution( 130 Function<void()>&& cb, 131 std::chrono::microseconds minInterval, 132 std::chrono::microseconds maxInterval, 133 StringPiece nameID, 134 std::chrono::microseconds startDelay); 135 136 /** 137 * Add a new function to the FunctionScheduler whose start times are attempted 138 * to be scheduled so that they are congruent modulo the interval. 139 * Note: The scheduling of the next run time happens right before the function 140 * invocation, so the first time a function takes more time than the interval, 141 * it will be reinvoked immediately. 142 */ 143 void addFunctionConsistentDelay( 144 Function<void()>&& cb, 145 std::chrono::microseconds interval, 146 StringPiece nameID = StringPiece(), 147 std::chrono::microseconds startDelay = std::chrono::microseconds(0)); 148 149 /** 150 * A type alias for function that is called to determine the time 151 * interval for the next scheduled run. 152 */ 153 using IntervalDistributionFunc = Function<std::chrono::microseconds()>; 154 /** 155 * A type alias for function that returns the next run time, given the current 156 * run time and the current start time. 157 */ 158 using NextRunTimeFunc = Function<std::chrono::steady_clock::time_point( 159 std::chrono::steady_clock::time_point, 160 std::chrono::steady_clock::time_point)>; 161 162 /** 163 * Add a new function to the FunctionScheduler. The scheduling interval 164 * is determined by the interval distribution functor, which is called 165 * every time the next function execution is scheduled. This allows 166 * for supporting custom interval distribution algorithms in addition 167 * to built in constant interval; and Poisson and jitter distributions 168 * (@see FunctionScheduler::addFunction and 169 * @see FunctionScheduler::addFunctionJitterInterval). 170 */ 171 void addFunctionGenericDistribution( 172 Function<void()>&& cb, 173 IntervalDistributionFunc&& intervalFunc, 174 const std::string& nameID, 175 const std::string& intervalDescr, 176 std::chrono::microseconds startDelay); 177 178 /** 179 * Like addFunctionGenericDistribution, adds a new function to the 180 * FunctionScheduler, but the next run time is determined directly by the 181 * given functor, rather than by adding an interval. 182 */ 183 void addFunctionGenericNextRunTimeFunctor( 184 Function<void()>&& cb, 185 NextRunTimeFunc&& fn, 186 const std::string& nameID, 187 const std::string& intervalDescr, 188 std::chrono::microseconds startDelay); 189 190 /** 191 * Cancels the function with the specified name, so it will no longer be run. 192 * 193 * Returns false if no function exists with the specified name. 194 */ 195 bool cancelFunction(StringPiece nameID); 196 bool cancelFunctionAndWait(StringPiece nameID); 197 198 /** 199 * All functions registered will be canceled. 200 */ 201 void cancelAllFunctions(); 202 void cancelAllFunctionsAndWait(); 203 204 /** 205 * Resets the specified function's timer. 206 * When resetFunctionTimer is called, the specified function's timer will 207 * be reset with the same parameters it was passed initially, including 208 * its startDelay. If the startDelay was 0, the function will be invoked 209 * immediately. 210 * 211 * Returns false if no function exists with the specified name. 212 */ 213 bool resetFunctionTimer(StringPiece nameID); 214 215 /** 216 * Starts the scheduler. 217 * 218 * Returns false if the scheduler was already running. 219 */ 220 bool start(); 221 222 /** 223 * Stops the FunctionScheduler. 224 * 225 * It may be restarted later by calling start() again. 226 * Returns false if the scheduler was not running. 227 */ 228 bool shutdown(); 229 230 /** 231 * Set the name of the worker thread. 232 */ 233 void setThreadName(StringPiece threadName); 234 235 private: 236 struct RepeatFunc { 237 Function<void()> cb; 238 NextRunTimeFunc nextRunTimeFunc; 239 std::chrono::steady_clock::time_point nextRunTime; 240 std::string name; 241 std::chrono::microseconds startDelay; 242 std::string intervalDescr; 243 bool runOnce; 244 RepeatFuncRepeatFunc245 RepeatFunc( 246 Function<void()>&& cback, 247 IntervalDistributionFunc&& intervalFn, 248 const std::string& nameID, 249 const std::string& intervalDistDescription, 250 std::chrono::microseconds delay, 251 bool once) 252 : RepeatFunc( 253 std::move(cback), 254 getNextRunTimeFunc(std::move(intervalFn)), 255 nameID, 256 intervalDistDescription, 257 delay, 258 once) {} 259 RepeatFuncRepeatFunc260 RepeatFunc( 261 Function<void()>&& cback, 262 NextRunTimeFunc&& nextRunTimeFn, 263 const std::string& nameID, 264 const std::string& intervalDistDescription, 265 std::chrono::microseconds delay, 266 bool once) 267 : cb(std::move(cback)), 268 nextRunTimeFunc(std::move(nextRunTimeFn)), 269 nextRunTime(), 270 name(nameID), 271 startDelay(delay), 272 intervalDescr(intervalDistDescription), 273 runOnce(once) {} 274 getNextRunTimeFuncRepeatFunc275 static NextRunTimeFunc getNextRunTimeFunc( 276 IntervalDistributionFunc&& intervalFn) { 277 return [intervalFn = std::move(intervalFn)]( 278 std::chrono::steady_clock::time_point /* curNextRunTime */, 279 std::chrono::steady_clock::time_point curTime) mutable { 280 return curTime + intervalFn(); 281 }; 282 } 283 getNextRunTimeRepeatFunc284 std::chrono::steady_clock::time_point getNextRunTime() const { 285 return nextRunTime; 286 } setNextRunTimeStrictRepeatFunc287 void setNextRunTimeStrict(std::chrono::steady_clock::time_point curTime) { 288 nextRunTime = nextRunTimeFunc(nextRunTime, curTime); 289 } setNextRunTimeSteadyRepeatFunc290 void setNextRunTimeSteady() { 291 nextRunTime = nextRunTimeFunc(nextRunTime, nextRunTime); 292 } resetNextRunTimeRepeatFunc293 void resetNextRunTime(std::chrono::steady_clock::time_point curTime) { 294 nextRunTime = curTime + startDelay; 295 } cancelRepeatFunc296 void cancel() { 297 // Simply reset cb to an empty function. 298 cb = {}; 299 } isValidRepeatFunc300 bool isValid() const { return bool(cb); } 301 }; 302 303 struct RunTimeOrder { operatorRunTimeOrder304 bool operator()( 305 const std::unique_ptr<RepeatFunc>& f1, 306 const std::unique_ptr<RepeatFunc>& f2) const { 307 return f1->getNextRunTime() > f2->getNextRunTime(); 308 } 309 }; 310 311 typedef std::vector<std::unique_ptr<RepeatFunc>> FunctionHeap; 312 typedef std::unordered_map<StringPiece, RepeatFunc*, Hash> FunctionMap; 313 314 void run(); 315 void runOneFunction( 316 std::unique_lock<std::mutex>& lock, 317 std::chrono::steady_clock::time_point now); 318 void cancelFunction(const std::unique_lock<std::mutex>& lock, RepeatFunc* it); 319 void addFunctionToHeap( 320 const std::unique_lock<std::mutex>& lock, 321 std::unique_ptr<RepeatFunc> func); 322 323 template <typename RepeatFuncNextRunTimeFunc> 324 void addFunctionToHeapChecked( 325 Function<void()>&& cb, 326 RepeatFuncNextRunTimeFunc&& fn, 327 const std::string& nameID, 328 const std::string& intervalDescr, 329 std::chrono::microseconds startDelay, 330 bool runOnce); 331 332 void addFunctionInternal( 333 Function<void()>&& cb, 334 NextRunTimeFunc&& fn, 335 const std::string& nameID, 336 const std::string& intervalDescr, 337 std::chrono::microseconds startDelay, 338 bool runOnce); 339 void addFunctionInternal( 340 Function<void()>&& cb, 341 IntervalDistributionFunc&& fn, 342 const std::string& nameID, 343 const std::string& intervalDescr, 344 std::chrono::microseconds startDelay, 345 bool runOnce); 346 347 // Return true if the current function is being canceled 348 bool cancelAllFunctionsWithLock(std::unique_lock<std::mutex>& lock); 349 bool cancelFunctionWithLock( 350 std::unique_lock<std::mutex>& lock, StringPiece nameID); 351 352 std::thread thread_; 353 354 // Mutex to protect our member variables. 355 std::mutex mutex_; 356 bool running_{false}; 357 358 // The functions to run. 359 // This is a heap, ordered by next run time. 360 FunctionHeap functions_; 361 FunctionMap functionsMap_; 362 RunTimeOrder fnCmp_; 363 364 // The function currently being invoked by the running thread. 365 // This is null when the running thread is idle 366 RepeatFunc* currentFunction_{nullptr}; 367 368 // Condition variable that is signalled whenever a new function is added 369 // or when the FunctionScheduler is stopped. 370 std::condition_variable runningCondvar_; 371 372 std::string threadName_; 373 bool steady_{false}; 374 bool cancellingCurrentFunction_{false}; 375 }; 376 377 } // namespace folly 378