1 /*
2 * Copyright (c) Facebook, Inc. and its affiliates.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <folly/experimental/FunctionScheduler.h>
18
19 #include <random>
20
21 #include <glog/logging.h>
22
23 #include <folly/Conv.h>
24 #include <folly/Random.h>
25 #include <folly/String.h>
26 #include <folly/system/ThreadName.h>
27
28 using std::chrono::microseconds;
29 using std::chrono::steady_clock;
30
31 namespace folly {
32
33 namespace {
34
/**
 * Next-run-time functor that advances the schedule in whole multiples of a
 * fixed interval.
 *
 * Given the previously scheduled run time and the current time, it returns
 * `curNextRunTime + (k + 1) * constInterval`, where `k` is the number of whole
 * intervals contained in `curTime - curNextRunTime` (integer division).
 */
struct ConsistentDelayFunctor {
  const microseconds constInterval;

  /**
   * @throws std::invalid_argument if `interval` is negative. A zero interval
   *         is accepted.
   */
  explicit ConsistentDelayFunctor(microseconds interval)
      : constInterval(interval) {
    if (interval < microseconds::zero()) {
      throw std::invalid_argument(
          "FunctionScheduler: "
          "time interval must be non-negative");
    }
  }

  /**
   * @param curNextRunTime the run time that was previously scheduled.
   * @param curTime        the current time.
   * @return the next run time.
   */
  steady_clock::time_point operator()(
      steady_clock::time_point curNextRunTime,
      steady_clock::time_point curTime) const {
    if (constInterval == microseconds::zero()) {
      // The constructor allows a zero interval, but dividing a duration by a
      // zero duration is an integer division by zero. With no spacing between
      // runs the next run is simply due at the current time.
      return curTime;
    }
    const auto intervalsPassed = (curTime - curNextRunTime) / constInterval;
    return (intervalsPassed + 1) * constInterval + curNextRunTime;
  }
};
54
// Interval functor that yields the same fixed interval on every call.
struct ConstIntervalFunctor {
  const microseconds constInterval;

  // Rejects negative intervals with std::invalid_argument; zero is allowed.
  explicit ConstIntervalFunctor(microseconds interval)
      : constInterval(interval) {
    const bool isNegative = interval < microseconds::zero();
    if (isNegative) {
      throw std::invalid_argument(
          "FunctionScheduler: time interval must be non-negative");
    }
  }

  // Returns the interval given at construction.
  microseconds operator()() const {
    return constInterval;
  }
};
69
/**
 * Interval functor that draws each interval (in microseconds) from a Poisson
 * distribution with the given mean.
 *
 * NOTE(review): `generator` is default-constructed and never seeded, so every
 * instance produces the same sequence of intervals. UniformDistributionFunctor
 * seeds its engine; confirm whether this one should as well.
 */
struct PoissonDistributionFunctor {
  std::default_random_engine generator;
  // Default-constructed here and only replaced with the real distribution
  // after the mean has been validated.
  std::poisson_distribution<microseconds::rep> poissonRandom;
  // True when the requested mean is zero; see operator().
  bool zeroMean = false;

  /**
   * @throws std::invalid_argument if `meanPoissonUsec` is negative.
   */
  explicit PoissonDistributionFunctor(microseconds meanPoissonUsec) {
    // Validate before building the distribution: std::poisson_distribution
    // requires a strictly positive mean, so it must never be handed a
    // negative (or zero) value.
    if (meanPoissonUsec.count() < 0) {
      throw std::invalid_argument(
          "FunctionScheduler: "
          "Poisson mean interval must be non-negative");
    }
    if (meanPoissonUsec.count() == 0) {
      zeroMean = true;
    } else {
      poissonRandom = std::poisson_distribution<microseconds::rep>(
          static_cast<double>(meanPoissonUsec.count()));
    }
  }

  /**
   * @return the next interval. A zero mean always yields a zero interval
   *         (a Poisson distribution with mean 0 is concentrated at 0).
   */
  microseconds operator()() {
    if (zeroMean) {
      return microseconds::zero();
    }
    return microseconds(poissonRandom(generator));
  }
};
85
86 struct UniformDistributionFunctor {
87 std::default_random_engine generator;
88 std::uniform_int_distribution<microseconds::rep> dist;
89
UniformDistributionFunctorfolly::__anond05026830111::UniformDistributionFunctor90 UniformDistributionFunctor(microseconds minInterval, microseconds maxInterval)
91 : generator(Random::rand32()),
92 dist(minInterval.count(), maxInterval.count()) {
93 if (minInterval > maxInterval) {
94 throw std::invalid_argument(
95 "FunctionScheduler: "
96 "min time interval must be less or equal than max interval");
97 }
98 if (minInterval < microseconds::zero()) {
99 throw std::invalid_argument(
100 "FunctionScheduler: "
101 "time interval must be non-negative");
102 }
103 }
104
operator ()folly::__anond05026830111::UniformDistributionFunctor105 microseconds operator()() { return microseconds(dist(generator)); }
106 };
107
108 } // namespace
109
110 FunctionScheduler::FunctionScheduler() = default;
111
~FunctionScheduler()112 FunctionScheduler::~FunctionScheduler() {
113 // make sure to stop the thread (if running)
114 shutdown();
115 }
116
addFunction(Function<void ()> && cb,microseconds interval,StringPiece nameID,microseconds startDelay)117 void FunctionScheduler::addFunction(
118 Function<void()>&& cb,
119 microseconds interval,
120 StringPiece nameID,
121 microseconds startDelay) {
122 addFunctionInternal(
123 std::move(cb),
124 ConstIntervalFunctor(interval),
125 nameID.str(),
126 to<std::string>(interval.count(), "us"),
127 startDelay,
128 false /*runOnce*/);
129 }
130
addFunction(Function<void ()> && cb,microseconds interval,const LatencyDistribution & latencyDistr,StringPiece nameID,microseconds startDelay)131 void FunctionScheduler::addFunction(
132 Function<void()>&& cb,
133 microseconds interval,
134 const LatencyDistribution& latencyDistr,
135 StringPiece nameID,
136 microseconds startDelay) {
137 if (latencyDistr.isPoisson) {
138 addFunctionInternal(
139 std::move(cb),
140 PoissonDistributionFunctor(latencyDistr.poissonMean),
141 nameID.str(),
142 to<std::string>(latencyDistr.poissonMean.count(), "us (Poisson mean)"),
143 startDelay,
144 false /*runOnce*/);
145 } else {
146 addFunction(std::move(cb), interval, nameID, startDelay);
147 }
148 }
149
addFunctionOnce(Function<void ()> && cb,StringPiece nameID,microseconds startDelay)150 void FunctionScheduler::addFunctionOnce(
151 Function<void()>&& cb, StringPiece nameID, microseconds startDelay) {
152 addFunctionInternal(
153 std::move(cb),
154 ConstIntervalFunctor(microseconds::zero()),
155 nameID.str(),
156 "once",
157 startDelay,
158 true /*runOnce*/);
159 }
160
addFunctionUniformDistribution(Function<void ()> && cb,microseconds minInterval,microseconds maxInterval,StringPiece nameID,microseconds startDelay)161 void FunctionScheduler::addFunctionUniformDistribution(
162 Function<void()>&& cb,
163 microseconds minInterval,
164 microseconds maxInterval,
165 StringPiece nameID,
166 microseconds startDelay) {
167 addFunctionInternal(
168 std::move(cb),
169 UniformDistributionFunctor(minInterval, maxInterval),
170 nameID.str(),
171 to<std::string>(
172 "[", minInterval.count(), " , ", maxInterval.count(), "] us"),
173 startDelay,
174 false /*runOnce*/);
175 }
176
addFunctionConsistentDelay(Function<void ()> && cb,microseconds interval,StringPiece nameID,microseconds startDelay)177 void FunctionScheduler::addFunctionConsistentDelay(
178 Function<void()>&& cb,
179 microseconds interval,
180 StringPiece nameID,
181 microseconds startDelay) {
182 addFunctionInternal(
183 std::move(cb),
184 ConsistentDelayFunctor(interval),
185 nameID.str(),
186 to<std::string>(interval.count(), "us"),
187 startDelay,
188 false /*runOnce*/);
189 }
190
addFunctionGenericDistribution(Function<void ()> && cb,IntervalDistributionFunc && intervalFunc,const std::string & nameID,const std::string & intervalDescr,microseconds startDelay)191 void FunctionScheduler::addFunctionGenericDistribution(
192 Function<void()>&& cb,
193 IntervalDistributionFunc&& intervalFunc,
194 const std::string& nameID,
195 const std::string& intervalDescr,
196 microseconds startDelay) {
197 addFunctionInternal(
198 std::move(cb),
199 std::move(intervalFunc),
200 nameID,
201 intervalDescr,
202 startDelay,
203 false /*runOnce*/);
204 }
205
addFunctionGenericNextRunTimeFunctor(Function<void ()> && cb,NextRunTimeFunc && fn,const std::string & nameID,const std::string & intervalDescr,microseconds startDelay)206 void FunctionScheduler::addFunctionGenericNextRunTimeFunctor(
207 Function<void()>&& cb,
208 NextRunTimeFunc&& fn,
209 const std::string& nameID,
210 const std::string& intervalDescr,
211 microseconds startDelay) {
212 addFunctionInternal(
213 std::move(cb),
214 std::move(fn),
215 nameID,
216 intervalDescr,
217 startDelay,
218 false /*runOnce*/);
219 }
220
221 template <typename RepeatFuncNextRunTimeFunc>
addFunctionToHeapChecked(Function<void ()> && cb,RepeatFuncNextRunTimeFunc && fn,const std::string & nameID,const std::string & intervalDescr,microseconds startDelay,bool runOnce)222 void FunctionScheduler::addFunctionToHeapChecked(
223 Function<void()>&& cb,
224 RepeatFuncNextRunTimeFunc&& fn,
225 const std::string& nameID,
226 const std::string& intervalDescr,
227 microseconds startDelay,
228 bool runOnce) {
229 if (!cb) {
230 throw std::invalid_argument(
231 "FunctionScheduler: Scheduled function must be set");
232 }
233 if (!fn) {
234 throw std::invalid_argument(
235 "FunctionScheduler: "
236 "interval distribution or next run time function must be set");
237 }
238 if (startDelay < microseconds::zero()) {
239 throw std::invalid_argument(
240 "FunctionScheduler: start delay must be non-negative");
241 }
242
243 std::unique_lock<std::mutex> l(mutex_);
244 auto it = functionsMap_.find(nameID);
245 // check if the nameID is unique
246 if (it != functionsMap_.end() && it->second->isValid()) {
247 throw std::invalid_argument(to<std::string>(
248 "FunctionScheduler: a function named \"", nameID, "\" already exists"));
249 }
250
251 if (currentFunction_ && currentFunction_->name == nameID) {
252 throw std::invalid_argument(to<std::string>(
253 "FunctionScheduler: a function named \"", nameID, "\" already exists"));
254 }
255
256 addFunctionToHeap(
257 l,
258 std::make_unique<RepeatFunc>(
259 std::move(cb),
260 std::forward<RepeatFuncNextRunTimeFunc>(fn),
261 nameID,
262 intervalDescr,
263 startDelay,
264 runOnce));
265 }
266
addFunctionInternal(Function<void ()> && cb,NextRunTimeFunc && fn,const std::string & nameID,const std::string & intervalDescr,microseconds startDelay,bool runOnce)267 void FunctionScheduler::addFunctionInternal(
268 Function<void()>&& cb,
269 NextRunTimeFunc&& fn,
270 const std::string& nameID,
271 const std::string& intervalDescr,
272 microseconds startDelay,
273 bool runOnce) {
274 return addFunctionToHeapChecked(
275 std::move(cb), std::move(fn), nameID, intervalDescr, startDelay, runOnce);
276 }
277
addFunctionInternal(Function<void ()> && cb,IntervalDistributionFunc && fn,const std::string & nameID,const std::string & intervalDescr,microseconds startDelay,bool runOnce)278 void FunctionScheduler::addFunctionInternal(
279 Function<void()>&& cb,
280 IntervalDistributionFunc&& fn,
281 const std::string& nameID,
282 const std::string& intervalDescr,
283 microseconds startDelay,
284 bool runOnce) {
285 return addFunctionToHeapChecked(
286 std::move(cb), std::move(fn), nameID, intervalDescr, startDelay, runOnce);
287 }
288
cancelFunctionWithLock(std::unique_lock<std::mutex> & lock,StringPiece nameID)289 bool FunctionScheduler::cancelFunctionWithLock(
290 std::unique_lock<std::mutex>& lock, StringPiece nameID) {
291 CHECK_EQ(lock.owns_lock(), true);
292 if (currentFunction_ && currentFunction_->name == nameID) {
293 functionsMap_.erase(currentFunction_->name);
294 // This function is currently being run. Clear currentFunction_
295 // The running thread will see this and won't reschedule the function.
296 currentFunction_ = nullptr;
297 cancellingCurrentFunction_ = true;
298 return true;
299 }
300 return false;
301 }
302
cancelFunction(StringPiece nameID)303 bool FunctionScheduler::cancelFunction(StringPiece nameID) {
304 std::unique_lock<std::mutex> l(mutex_);
305 if (cancelFunctionWithLock(l, nameID)) {
306 return true;
307 }
308 auto it = functionsMap_.find(nameID);
309 if (it != functionsMap_.end() && it->second->isValid()) {
310 cancelFunction(l, it->second);
311 return true;
312 }
313
314 return false;
315 }
316
cancelFunctionAndWait(StringPiece nameID)317 bool FunctionScheduler::cancelFunctionAndWait(StringPiece nameID) {
318 std::unique_lock<std::mutex> l(mutex_);
319
320 if (cancelFunctionWithLock(l, nameID)) {
321 runningCondvar_.wait(l, [this]() { return !cancellingCurrentFunction_; });
322 return true;
323 }
324
325 auto it = functionsMap_.find(nameID);
326 if (it != functionsMap_.end() && it->second->isValid()) {
327 cancelFunction(l, it->second);
328 return true;
329 }
330 return false;
331 }
332
cancelFunction(const std::unique_lock<std::mutex> & l,RepeatFunc * it)333 void FunctionScheduler::cancelFunction(
334 const std::unique_lock<std::mutex>& l, RepeatFunc* it) {
335 // This function should only be called with mutex_ already locked.
336 DCHECK(l.mutex() == &mutex_);
337 DCHECK(l.owns_lock());
338 functionsMap_.erase(it->name);
339 it->cancel();
340 }
341
cancelAllFunctionsWithLock(std::unique_lock<std::mutex> & lock)342 bool FunctionScheduler::cancelAllFunctionsWithLock(
343 std::unique_lock<std::mutex>& lock) {
344 CHECK_EQ(lock.owns_lock(), true);
345 functions_.clear();
346 functionsMap_.clear();
347 if (currentFunction_) {
348 cancellingCurrentFunction_ = true;
349 }
350 currentFunction_ = nullptr;
351 return cancellingCurrentFunction_;
352 }
353
cancelAllFunctions()354 void FunctionScheduler::cancelAllFunctions() {
355 std::unique_lock<std::mutex> l(mutex_);
356 cancelAllFunctionsWithLock(l);
357 }
358
cancelAllFunctionsAndWait()359 void FunctionScheduler::cancelAllFunctionsAndWait() {
360 std::unique_lock<std::mutex> l(mutex_);
361 if (cancelAllFunctionsWithLock(l)) {
362 runningCondvar_.wait(l, [this]() { return !cancellingCurrentFunction_; });
363 }
364 }
365
resetFunctionTimer(StringPiece nameID)366 bool FunctionScheduler::resetFunctionTimer(StringPiece nameID) {
367 std::unique_lock<std::mutex> l(mutex_);
368 if (currentFunction_ && currentFunction_->name == nameID) {
369 if (cancellingCurrentFunction_ || currentFunction_->runOnce) {
370 return false;
371 }
372 currentFunction_->resetNextRunTime(steady_clock::now());
373 return true;
374 }
375
376 // Since __adjust_heap() isn't a part of the standard API, there's no way to
377 // fix the heap ordering if we adjust the key (nextRunTime) for the existing
378 // RepeatFunc. Instead, we just cancel it and add an identical object.
379 auto it = functionsMap_.find(nameID);
380 if (it != functionsMap_.end() && it->second->isValid()) {
381 if (running_) {
382 it->second->resetNextRunTime(steady_clock::now());
383 std::make_heap(functions_.begin(), functions_.end(), fnCmp_);
384 runningCondvar_.notify_one();
385 }
386 return true;
387 }
388 return false;
389 }
390
start()391 bool FunctionScheduler::start() {
392 std::unique_lock<std::mutex> l(mutex_);
393 if (running_) {
394 return false;
395 }
396
397 VLOG(1) << "Starting FunctionScheduler with " << functions_.size()
398 << " functions.";
399 auto now = steady_clock::now();
400 // Reset the next run time. for all functions.
401 // note: this is needed since one can shutdown() and start() again
402 for (const auto& f : functions_) {
403 f->resetNextRunTime(now);
404 VLOG(1) << " - func: " << (f->name.empty() ? "(anon)" : f->name.c_str())
405 << ", period = " << f->intervalDescr
406 << ", delay = " << f->startDelay.count() << "ms";
407 }
408 std::make_heap(functions_.begin(), functions_.end(), fnCmp_);
409
410 thread_ = std::thread([&] { this->run(); });
411 running_ = true;
412
413 return true;
414 }
415
shutdown()416 bool FunctionScheduler::shutdown() {
417 {
418 std::lock_guard<std::mutex> g(mutex_);
419 if (!running_) {
420 return false;
421 }
422
423 running_ = false;
424 runningCondvar_.notify_one();
425 }
426 thread_.join();
427 return true;
428 }
429
run()430 void FunctionScheduler::run() {
431 std::unique_lock<std::mutex> lock(mutex_);
432
433 if (!threadName_.empty()) {
434 folly::setThreadName(threadName_);
435 }
436
437 while (running_) {
438 // If we have nothing to run, wait until a function is added or until we
439 // are stopped.
440 if (functions_.empty()) {
441 runningCondvar_.wait(lock);
442 continue;
443 }
444
445 auto now = steady_clock::now();
446
447 // Move the next function to run to the end of functions_
448 std::pop_heap(functions_.begin(), functions_.end(), fnCmp_);
449
450 // Check to see if the function was cancelled.
451 // If so, just remove it and continue around the loop.
452 if (!functions_.back()->isValid()) {
453 functions_.pop_back();
454 continue;
455 }
456
457 auto sleepTime = functions_.back()->getNextRunTime() - now;
458 if (sleepTime < microseconds::zero()) {
459 // We need to run this function now
460 runOneFunction(lock, now);
461 runningCondvar_.notify_all();
462 } else {
463 // Re-add the function to the heap, and wait until we actually
464 // need to run it.
465 std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
466 runningCondvar_.wait_for(lock, sleepTime);
467 }
468 }
469 }
470
runOneFunction(std::unique_lock<std::mutex> & lock,steady_clock::time_point now)471 void FunctionScheduler::runOneFunction(
472 std::unique_lock<std::mutex>& lock, steady_clock::time_point now) {
473 DCHECK(lock.mutex() == &mutex_);
474 DCHECK(lock.owns_lock());
475
476 // The function to run will be at the end of functions_ already.
477 //
478 // Fully remove it from functions_ now.
479 // We need to release mutex_ while we invoke this function, and we need to
480 // maintain the heap property on functions_ while mutex_ is unlocked.
481 auto func = std::move(functions_.back());
482 functions_.pop_back();
483 if (!func->cb) {
484 VLOG(5) << func->name << "function has been canceled while waiting";
485 return;
486 }
487 currentFunction_ = func.get();
488 // Update the function's next run time.
489 if (steady_) {
490 // This allows scheduler to catch up
491 func->setNextRunTimeSteady();
492 } else {
493 // Note that we set nextRunTime based on the current time where we started
494 // the function call, rather than the time when the function finishes.
495 // This ensures that we call the function once every time interval, as
496 // opposed to waiting time interval seconds between calls. (These can be
497 // different if the function takes a significant amount of time to run.)
498 func->setNextRunTimeStrict(now);
499 }
500
501 // Release the lock while we invoke the user's function
502 lock.unlock();
503
504 // Invoke the function
505 try {
506 VLOG(5) << "Now running " << func->name;
507 func->cb();
508 } catch (const std::exception& ex) {
509 LOG(ERROR) << "Error running the scheduled function <" << func->name
510 << ">: " << exceptionStr(ex);
511 }
512
513 // Re-acquire the lock
514 lock.lock();
515
516 if (!currentFunction_) {
517 // The function was cancelled while we were running it.
518 // We shouldn't reschedule it;
519 cancellingCurrentFunction_ = false;
520 return;
521 }
522 if (currentFunction_->runOnce) {
523 // Don't reschedule if the function only needed to run once.
524 functionsMap_.erase(currentFunction_->name);
525 currentFunction_ = nullptr;
526 return;
527 }
528
529 // Re-insert the function into our functions_ heap.
530 // We only maintain the heap property while running_ is set. (running_ may
531 // have been cleared while we were invoking the user's function.)
532 functions_.push_back(std::move(func));
533
534 // Clear currentFunction_
535 currentFunction_ = nullptr;
536
537 if (running_) {
538 std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
539 }
540 }
541
addFunctionToHeap(const std::unique_lock<std::mutex> & lock,std::unique_ptr<RepeatFunc> func)542 void FunctionScheduler::addFunctionToHeap(
543 const std::unique_lock<std::mutex>& lock,
544 std::unique_ptr<RepeatFunc> func) {
545 // This function should only be called with mutex_ already locked.
546 DCHECK(lock.mutex() == &mutex_);
547 DCHECK(lock.owns_lock());
548
549 functions_.push_back(std::move(func));
550 functionsMap_[functions_.back()->name] = functions_.back().get();
551 if (running_) {
552 functions_.back()->resetNextRunTime(steady_clock::now());
553 std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
554 // Signal the running thread to wake up and see if it needs to change
555 // its current scheduling decision.
556 runningCondvar_.notify_one();
557 }
558 }
559
setThreadName(StringPiece threadName)560 void FunctionScheduler::setThreadName(StringPiece threadName) {
561 std::unique_lock<std::mutex> l(mutex_);
562 threadName_ = threadName.str();
563 }
564
565 } // namespace folly
566