1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <folly/experimental/FunctionScheduler.h>
18 
19 #include <random>
20 
21 #include <glog/logging.h>
22 
23 #include <folly/Conv.h>
24 #include <folly/Random.h>
25 #include <folly/String.h>
26 #include <folly/system/ThreadName.h>
27 
28 using std::chrono::microseconds;
29 using std::chrono::steady_clock;
30 
31 namespace folly {
32 
33 namespace {
34 
// Next-run-time functor that keeps runs on a fixed grid: the returned time is
// always `curNextRunTime` plus a whole number of `constInterval` steps.
struct ConsistentDelayFunctor {
  const microseconds constInterval;

  // @param interval  spacing between grid points; zero is accepted.
  // @throws std::invalid_argument if `interval` is negative.
  explicit ConsistentDelayFunctor(microseconds interval)
      : constInterval(interval) {
    if (interval < microseconds::zero()) {
      throw std::invalid_argument(
          "FunctionScheduler: "
          "time interval must be non-negative");
    }
  }

  // @param curNextRunTime  the run time the grid is anchored at.
  // @param curTime         the time to schedule relative to.
  // @return `curNextRunTime + (k + 1) * constInterval`, where `k` is the
  //         (truncated) number of whole intervals between `curNextRunTime`
  //         and `curTime`. With a zero interval, the later of the two inputs.
  steady_clock::time_point operator()(
      steady_clock::time_point curNextRunTime,
      steady_clock::time_point curTime) const {
    if (constInterval == microseconds::zero()) {
      // The constructor allows a zero interval, but dividing a duration by a
      // zero duration is an integer division by zero. A zero-width grid means
      // "due again right away", so skip the arithmetic entirely.
      return curTime > curNextRunTime ? curTime : curNextRunTime;
    }
    auto intervalsPassed = (curTime - curNextRunTime) / constInterval;
    return (intervalsPassed + 1) * constInterval + curNextRunTime;
  }
};
54 
// Interval functor that hands back the same fixed delay on every call.
struct ConstIntervalFunctor {
  const microseconds constInterval;

  // @throws std::invalid_argument if `interval` is negative.
  explicit ConstIntervalFunctor(microseconds interval)
      : constInterval(interval) {
    const bool isNegative = interval < microseconds::zero();
    if (isNegative) {
      throw std::invalid_argument(
          "FunctionScheduler: "
          "time interval must be non-negative");
    }
  }

  // @return the interval given at construction.
  microseconds operator()() const {
    return constInterval;
  }
};
69 
// Interval functor that draws each delay (in microseconds) from a Poisson
// distribution with the given mean.
struct PoissonDistributionFunctor {
  std::default_random_engine generator;
  std::poisson_distribution<microseconds::rep> poissonRandom;
  // Set when the requested mean is zero; see operator().
  bool alwaysZero;

  // @param meanPoissonUsec  mean delay; zero is accepted.
  // @throws std::invalid_argument if `meanPoissonUsec` is negative.
  explicit PoissonDistributionFunctor(microseconds meanPoissonUsec)
      // Seed the engine: a default-constructed engine makes every functor
      // instance (in every process) produce the exact same delay sequence.
      : generator(std::random_device{}()),
        // The mean is validated *before* it reaches the distribution, whose
        // constructor requires a strictly positive mean.
        poissonRandom(checkedMean(meanPoissonUsec)),
        alwaysZero(meanPoissonUsec == microseconds::zero()) {}

  // @return the next randomly drawn delay; always zero for a zero mean.
  microseconds operator()() {
    if (alwaysZero) {
      // A Poisson distribution with mean zero is the constant zero, which
      // std::poisson_distribution cannot represent.
      return microseconds::zero();
    }
    return microseconds(poissonRandom(generator));
  }

 private:
  // Rejects a negative mean and maps the (allowed) zero mean onto a
  // placeholder the distribution can legally be constructed with.
  static double checkedMean(microseconds meanPoissonUsec) {
    if (meanPoissonUsec.count() < 0) {
      throw std::invalid_argument(
          "FunctionScheduler: "
          "Poisson mean interval must be non-negative");
    }
    return meanPoissonUsec.count() == 0
        ? 1.0
        : static_cast<double>(meanPoissonUsec.count());
  }
};
85 
86 struct UniformDistributionFunctor {
87   std::default_random_engine generator;
88   std::uniform_int_distribution<microseconds::rep> dist;
89 
UniformDistributionFunctorfolly::__anond05026830111::UniformDistributionFunctor90   UniformDistributionFunctor(microseconds minInterval, microseconds maxInterval)
91       : generator(Random::rand32()),
92         dist(minInterval.count(), maxInterval.count()) {
93     if (minInterval > maxInterval) {
94       throw std::invalid_argument(
95           "FunctionScheduler: "
96           "min time interval must be less or equal than max interval");
97     }
98     if (minInterval < microseconds::zero()) {
99       throw std::invalid_argument(
100           "FunctionScheduler: "
101           "time interval must be non-negative");
102     }
103   }
104 
operator ()folly::__anond05026830111::UniformDistributionFunctor105   microseconds operator()() { return microseconds(dist(generator)); }
106 };
107 
108 } // namespace
109 
110 FunctionScheduler::FunctionScheduler() = default;
111 
~FunctionScheduler()112 FunctionScheduler::~FunctionScheduler() {
113   // make sure to stop the thread (if running)
114   shutdown();
115 }
116 
addFunction(Function<void ()> && cb,microseconds interval,StringPiece nameID,microseconds startDelay)117 void FunctionScheduler::addFunction(
118     Function<void()>&& cb,
119     microseconds interval,
120     StringPiece nameID,
121     microseconds startDelay) {
122   addFunctionInternal(
123       std::move(cb),
124       ConstIntervalFunctor(interval),
125       nameID.str(),
126       to<std::string>(interval.count(), "us"),
127       startDelay,
128       false /*runOnce*/);
129 }
130 
addFunction(Function<void ()> && cb,microseconds interval,const LatencyDistribution & latencyDistr,StringPiece nameID,microseconds startDelay)131 void FunctionScheduler::addFunction(
132     Function<void()>&& cb,
133     microseconds interval,
134     const LatencyDistribution& latencyDistr,
135     StringPiece nameID,
136     microseconds startDelay) {
137   if (latencyDistr.isPoisson) {
138     addFunctionInternal(
139         std::move(cb),
140         PoissonDistributionFunctor(latencyDistr.poissonMean),
141         nameID.str(),
142         to<std::string>(latencyDistr.poissonMean.count(), "us (Poisson mean)"),
143         startDelay,
144         false /*runOnce*/);
145   } else {
146     addFunction(std::move(cb), interval, nameID, startDelay);
147   }
148 }
149 
addFunctionOnce(Function<void ()> && cb,StringPiece nameID,microseconds startDelay)150 void FunctionScheduler::addFunctionOnce(
151     Function<void()>&& cb, StringPiece nameID, microseconds startDelay) {
152   addFunctionInternal(
153       std::move(cb),
154       ConstIntervalFunctor(microseconds::zero()),
155       nameID.str(),
156       "once",
157       startDelay,
158       true /*runOnce*/);
159 }
160 
addFunctionUniformDistribution(Function<void ()> && cb,microseconds minInterval,microseconds maxInterval,StringPiece nameID,microseconds startDelay)161 void FunctionScheduler::addFunctionUniformDistribution(
162     Function<void()>&& cb,
163     microseconds minInterval,
164     microseconds maxInterval,
165     StringPiece nameID,
166     microseconds startDelay) {
167   addFunctionInternal(
168       std::move(cb),
169       UniformDistributionFunctor(minInterval, maxInterval),
170       nameID.str(),
171       to<std::string>(
172           "[", minInterval.count(), " , ", maxInterval.count(), "] us"),
173       startDelay,
174       false /*runOnce*/);
175 }
176 
addFunctionConsistentDelay(Function<void ()> && cb,microseconds interval,StringPiece nameID,microseconds startDelay)177 void FunctionScheduler::addFunctionConsistentDelay(
178     Function<void()>&& cb,
179     microseconds interval,
180     StringPiece nameID,
181     microseconds startDelay) {
182   addFunctionInternal(
183       std::move(cb),
184       ConsistentDelayFunctor(interval),
185       nameID.str(),
186       to<std::string>(interval.count(), "us"),
187       startDelay,
188       false /*runOnce*/);
189 }
190 
addFunctionGenericDistribution(Function<void ()> && cb,IntervalDistributionFunc && intervalFunc,const std::string & nameID,const std::string & intervalDescr,microseconds startDelay)191 void FunctionScheduler::addFunctionGenericDistribution(
192     Function<void()>&& cb,
193     IntervalDistributionFunc&& intervalFunc,
194     const std::string& nameID,
195     const std::string& intervalDescr,
196     microseconds startDelay) {
197   addFunctionInternal(
198       std::move(cb),
199       std::move(intervalFunc),
200       nameID,
201       intervalDescr,
202       startDelay,
203       false /*runOnce*/);
204 }
205 
addFunctionGenericNextRunTimeFunctor(Function<void ()> && cb,NextRunTimeFunc && fn,const std::string & nameID,const std::string & intervalDescr,microseconds startDelay)206 void FunctionScheduler::addFunctionGenericNextRunTimeFunctor(
207     Function<void()>&& cb,
208     NextRunTimeFunc&& fn,
209     const std::string& nameID,
210     const std::string& intervalDescr,
211     microseconds startDelay) {
212   addFunctionInternal(
213       std::move(cb),
214       std::move(fn),
215       nameID,
216       intervalDescr,
217       startDelay,
218       false /*runOnce*/);
219 }
220 
221 template <typename RepeatFuncNextRunTimeFunc>
addFunctionToHeapChecked(Function<void ()> && cb,RepeatFuncNextRunTimeFunc && fn,const std::string & nameID,const std::string & intervalDescr,microseconds startDelay,bool runOnce)222 void FunctionScheduler::addFunctionToHeapChecked(
223     Function<void()>&& cb,
224     RepeatFuncNextRunTimeFunc&& fn,
225     const std::string& nameID,
226     const std::string& intervalDescr,
227     microseconds startDelay,
228     bool runOnce) {
229   if (!cb) {
230     throw std::invalid_argument(
231         "FunctionScheduler: Scheduled function must be set");
232   }
233   if (!fn) {
234     throw std::invalid_argument(
235         "FunctionScheduler: "
236         "interval distribution or next run time function must be set");
237   }
238   if (startDelay < microseconds::zero()) {
239     throw std::invalid_argument(
240         "FunctionScheduler: start delay must be non-negative");
241   }
242 
243   std::unique_lock<std::mutex> l(mutex_);
244   auto it = functionsMap_.find(nameID);
245   // check if the nameID is unique
246   if (it != functionsMap_.end() && it->second->isValid()) {
247     throw std::invalid_argument(to<std::string>(
248         "FunctionScheduler: a function named \"", nameID, "\" already exists"));
249   }
250 
251   if (currentFunction_ && currentFunction_->name == nameID) {
252     throw std::invalid_argument(to<std::string>(
253         "FunctionScheduler: a function named \"", nameID, "\" already exists"));
254   }
255 
256   addFunctionToHeap(
257       l,
258       std::make_unique<RepeatFunc>(
259           std::move(cb),
260           std::forward<RepeatFuncNextRunTimeFunc>(fn),
261           nameID,
262           intervalDescr,
263           startDelay,
264           runOnce));
265 }
266 
addFunctionInternal(Function<void ()> && cb,NextRunTimeFunc && fn,const std::string & nameID,const std::string & intervalDescr,microseconds startDelay,bool runOnce)267 void FunctionScheduler::addFunctionInternal(
268     Function<void()>&& cb,
269     NextRunTimeFunc&& fn,
270     const std::string& nameID,
271     const std::string& intervalDescr,
272     microseconds startDelay,
273     bool runOnce) {
274   return addFunctionToHeapChecked(
275       std::move(cb), std::move(fn), nameID, intervalDescr, startDelay, runOnce);
276 }
277 
addFunctionInternal(Function<void ()> && cb,IntervalDistributionFunc && fn,const std::string & nameID,const std::string & intervalDescr,microseconds startDelay,bool runOnce)278 void FunctionScheduler::addFunctionInternal(
279     Function<void()>&& cb,
280     IntervalDistributionFunc&& fn,
281     const std::string& nameID,
282     const std::string& intervalDescr,
283     microseconds startDelay,
284     bool runOnce) {
285   return addFunctionToHeapChecked(
286       std::move(cb), std::move(fn), nameID, intervalDescr, startDelay, runOnce);
287 }
288 
cancelFunctionWithLock(std::unique_lock<std::mutex> & lock,StringPiece nameID)289 bool FunctionScheduler::cancelFunctionWithLock(
290     std::unique_lock<std::mutex>& lock, StringPiece nameID) {
291   CHECK_EQ(lock.owns_lock(), true);
292   if (currentFunction_ && currentFunction_->name == nameID) {
293     functionsMap_.erase(currentFunction_->name);
294     // This function is currently being run. Clear currentFunction_
295     // The running thread will see this and won't reschedule the function.
296     currentFunction_ = nullptr;
297     cancellingCurrentFunction_ = true;
298     return true;
299   }
300   return false;
301 }
302 
cancelFunction(StringPiece nameID)303 bool FunctionScheduler::cancelFunction(StringPiece nameID) {
304   std::unique_lock<std::mutex> l(mutex_);
305   if (cancelFunctionWithLock(l, nameID)) {
306     return true;
307   }
308   auto it = functionsMap_.find(nameID);
309   if (it != functionsMap_.end() && it->second->isValid()) {
310     cancelFunction(l, it->second);
311     return true;
312   }
313 
314   return false;
315 }
316 
cancelFunctionAndWait(StringPiece nameID)317 bool FunctionScheduler::cancelFunctionAndWait(StringPiece nameID) {
318   std::unique_lock<std::mutex> l(mutex_);
319 
320   if (cancelFunctionWithLock(l, nameID)) {
321     runningCondvar_.wait(l, [this]() { return !cancellingCurrentFunction_; });
322     return true;
323   }
324 
325   auto it = functionsMap_.find(nameID);
326   if (it != functionsMap_.end() && it->second->isValid()) {
327     cancelFunction(l, it->second);
328     return true;
329   }
330   return false;
331 }
332 
cancelFunction(const std::unique_lock<std::mutex> & l,RepeatFunc * it)333 void FunctionScheduler::cancelFunction(
334     const std::unique_lock<std::mutex>& l, RepeatFunc* it) {
335   // This function should only be called with mutex_ already locked.
336   DCHECK(l.mutex() == &mutex_);
337   DCHECK(l.owns_lock());
338   functionsMap_.erase(it->name);
339   it->cancel();
340 }
341 
cancelAllFunctionsWithLock(std::unique_lock<std::mutex> & lock)342 bool FunctionScheduler::cancelAllFunctionsWithLock(
343     std::unique_lock<std::mutex>& lock) {
344   CHECK_EQ(lock.owns_lock(), true);
345   functions_.clear();
346   functionsMap_.clear();
347   if (currentFunction_) {
348     cancellingCurrentFunction_ = true;
349   }
350   currentFunction_ = nullptr;
351   return cancellingCurrentFunction_;
352 }
353 
cancelAllFunctions()354 void FunctionScheduler::cancelAllFunctions() {
355   std::unique_lock<std::mutex> l(mutex_);
356   cancelAllFunctionsWithLock(l);
357 }
358 
cancelAllFunctionsAndWait()359 void FunctionScheduler::cancelAllFunctionsAndWait() {
360   std::unique_lock<std::mutex> l(mutex_);
361   if (cancelAllFunctionsWithLock(l)) {
362     runningCondvar_.wait(l, [this]() { return !cancellingCurrentFunction_; });
363   }
364 }
365 
resetFunctionTimer(StringPiece nameID)366 bool FunctionScheduler::resetFunctionTimer(StringPiece nameID) {
367   std::unique_lock<std::mutex> l(mutex_);
368   if (currentFunction_ && currentFunction_->name == nameID) {
369     if (cancellingCurrentFunction_ || currentFunction_->runOnce) {
370       return false;
371     }
372     currentFunction_->resetNextRunTime(steady_clock::now());
373     return true;
374   }
375 
376   // Since __adjust_heap() isn't a part of the standard API, there's no way to
377   // fix the heap ordering if we adjust the key (nextRunTime) for the existing
378   // RepeatFunc. Instead, we just cancel it and add an identical object.
379   auto it = functionsMap_.find(nameID);
380   if (it != functionsMap_.end() && it->second->isValid()) {
381     if (running_) {
382       it->second->resetNextRunTime(steady_clock::now());
383       std::make_heap(functions_.begin(), functions_.end(), fnCmp_);
384       runningCondvar_.notify_one();
385     }
386     return true;
387   }
388   return false;
389 }
390 
start()391 bool FunctionScheduler::start() {
392   std::unique_lock<std::mutex> l(mutex_);
393   if (running_) {
394     return false;
395   }
396 
397   VLOG(1) << "Starting FunctionScheduler with " << functions_.size()
398           << " functions.";
399   auto now = steady_clock::now();
400   // Reset the next run time. for all functions.
401   // note: this is needed since one can shutdown() and start() again
402   for (const auto& f : functions_) {
403     f->resetNextRunTime(now);
404     VLOG(1) << "   - func: " << (f->name.empty() ? "(anon)" : f->name.c_str())
405             << ", period = " << f->intervalDescr
406             << ", delay = " << f->startDelay.count() << "ms";
407   }
408   std::make_heap(functions_.begin(), functions_.end(), fnCmp_);
409 
410   thread_ = std::thread([&] { this->run(); });
411   running_ = true;
412 
413   return true;
414 }
415 
shutdown()416 bool FunctionScheduler::shutdown() {
417   {
418     std::lock_guard<std::mutex> g(mutex_);
419     if (!running_) {
420       return false;
421     }
422 
423     running_ = false;
424     runningCondvar_.notify_one();
425   }
426   thread_.join();
427   return true;
428 }
429 
run()430 void FunctionScheduler::run() {
431   std::unique_lock<std::mutex> lock(mutex_);
432 
433   if (!threadName_.empty()) {
434     folly::setThreadName(threadName_);
435   }
436 
437   while (running_) {
438     // If we have nothing to run, wait until a function is added or until we
439     // are stopped.
440     if (functions_.empty()) {
441       runningCondvar_.wait(lock);
442       continue;
443     }
444 
445     auto now = steady_clock::now();
446 
447     // Move the next function to run to the end of functions_
448     std::pop_heap(functions_.begin(), functions_.end(), fnCmp_);
449 
450     // Check to see if the function was cancelled.
451     // If so, just remove it and continue around the loop.
452     if (!functions_.back()->isValid()) {
453       functions_.pop_back();
454       continue;
455     }
456 
457     auto sleepTime = functions_.back()->getNextRunTime() - now;
458     if (sleepTime < microseconds::zero()) {
459       // We need to run this function now
460       runOneFunction(lock, now);
461       runningCondvar_.notify_all();
462     } else {
463       // Re-add the function to the heap, and wait until we actually
464       // need to run it.
465       std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
466       runningCondvar_.wait_for(lock, sleepTime);
467     }
468   }
469 }
470 
runOneFunction(std::unique_lock<std::mutex> & lock,steady_clock::time_point now)471 void FunctionScheduler::runOneFunction(
472     std::unique_lock<std::mutex>& lock, steady_clock::time_point now) {
473   DCHECK(lock.mutex() == &mutex_);
474   DCHECK(lock.owns_lock());
475 
476   // The function to run will be at the end of functions_ already.
477   //
478   // Fully remove it from functions_ now.
479   // We need to release mutex_ while we invoke this function, and we need to
480   // maintain the heap property on functions_ while mutex_ is unlocked.
481   auto func = std::move(functions_.back());
482   functions_.pop_back();
483   if (!func->cb) {
484     VLOG(5) << func->name << "function has been canceled while waiting";
485     return;
486   }
487   currentFunction_ = func.get();
488   // Update the function's next run time.
489   if (steady_) {
490     // This allows scheduler to catch up
491     func->setNextRunTimeSteady();
492   } else {
493     // Note that we set nextRunTime based on the current time where we started
494     // the function call, rather than the time when the function finishes.
495     // This ensures that we call the function once every time interval, as
496     // opposed to waiting time interval seconds between calls.  (These can be
497     // different if the function takes a significant amount of time to run.)
498     func->setNextRunTimeStrict(now);
499   }
500 
501   // Release the lock while we invoke the user's function
502   lock.unlock();
503 
504   // Invoke the function
505   try {
506     VLOG(5) << "Now running " << func->name;
507     func->cb();
508   } catch (const std::exception& ex) {
509     LOG(ERROR) << "Error running the scheduled function <" << func->name
510                << ">: " << exceptionStr(ex);
511   }
512 
513   // Re-acquire the lock
514   lock.lock();
515 
516   if (!currentFunction_) {
517     // The function was cancelled while we were running it.
518     // We shouldn't reschedule it;
519     cancellingCurrentFunction_ = false;
520     return;
521   }
522   if (currentFunction_->runOnce) {
523     // Don't reschedule if the function only needed to run once.
524     functionsMap_.erase(currentFunction_->name);
525     currentFunction_ = nullptr;
526     return;
527   }
528 
529   // Re-insert the function into our functions_ heap.
530   // We only maintain the heap property while running_ is set.  (running_ may
531   // have been cleared while we were invoking the user's function.)
532   functions_.push_back(std::move(func));
533 
534   // Clear currentFunction_
535   currentFunction_ = nullptr;
536 
537   if (running_) {
538     std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
539   }
540 }
541 
addFunctionToHeap(const std::unique_lock<std::mutex> & lock,std::unique_ptr<RepeatFunc> func)542 void FunctionScheduler::addFunctionToHeap(
543     const std::unique_lock<std::mutex>& lock,
544     std::unique_ptr<RepeatFunc> func) {
545   // This function should only be called with mutex_ already locked.
546   DCHECK(lock.mutex() == &mutex_);
547   DCHECK(lock.owns_lock());
548 
549   functions_.push_back(std::move(func));
550   functionsMap_[functions_.back()->name] = functions_.back().get();
551   if (running_) {
552     functions_.back()->resetNextRunTime(steady_clock::now());
553     std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
554     // Signal the running thread to wake up and see if it needs to change
555     // its current scheduling decision.
556     runningCondvar_.notify_one();
557   }
558 }
559 
setThreadName(StringPiece threadName)560 void FunctionScheduler::setThreadName(StringPiece threadName) {
561   std::unique_lock<std::mutex> l(mutex_);
562   threadName_ = threadName.str();
563 }
564 
565 } // namespace folly
566