1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <folly/executors/TimedDrivableExecutor.h>
18 
19 #include <cstring>
20 #include <ctime>
21 #include <string>
22 #include <tuple>
23 
24 namespace folly {
25 
// Default constructor: compiler-generated, defined out of line here. No
// custom set-up is performed beyond default-initializing the members.
TimedDrivableExecutor::TimedDrivableExecutor() = default;
27 
~TimedDrivableExecutor()28 TimedDrivableExecutor::~TimedDrivableExecutor() noexcept {
29   // Drain on destruction so that if work is added here during the collapse
30   // of a future train, it will propagate.
31   drain();
32 }
33 
add(Func callback)34 void TimedDrivableExecutor::add(Func callback) {
35   queue_.enqueue(std::move(callback));
36 }
37 
drive()38 void TimedDrivableExecutor::drive() noexcept {
39   wait();
40   run();
41 }
42 
try_drive()43 bool TimedDrivableExecutor::try_drive() noexcept {
44   return try_wait() && run() > 0;
45 }
46 
run()47 size_t TimedDrivableExecutor::run() noexcept {
48   size_t count = 0;
49   size_t n = queue_.size();
50 
51   // If we have waited already, then func_ may have a value
52   if (func_) {
53     auto f = std::move(func_);
54     f();
55     count = 1;
56   }
57 
58   while (count < n && queue_.try_dequeue(func_)) {
59     auto f = std::move(func_);
60     f();
61     ++count;
62   }
63 
64   return count;
65 }
66 
drain()67 size_t TimedDrivableExecutor::drain() noexcept {
68   size_t tasksRun = 0;
69   size_t tasksForSingleRun = 0;
70   while ((tasksForSingleRun = run()) != 0) {
71     tasksRun += tasksForSingleRun;
72   }
73   return tasksRun;
74 }
75 
wait()76 void TimedDrivableExecutor::wait() noexcept {
77   if (!func_) {
78     queue_.dequeue(func_);
79   }
80 }
81 
try_wait()82 bool TimedDrivableExecutor::try_wait() noexcept {
83   return func_ || queue_.try_dequeue(func_);
84 }
85 
86 } // namespace folly
87