1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <exception>
#include <functional>
#include <memory>
#include <queue>
#include <string>
#include <thread>
#include <tuple>
#include <type_traits>
#include <typeindex>
#include <unordered_set>
#include <utility>
#include <vector>
27 
28 #include <folly/AtomicIntrusiveLinkedList.h>
29 #include <folly/CPortability.h>
30 #include <folly/Executor.h>
31 #include <folly/IntrusiveList.h>
32 #include <folly/Likely.h>
33 #include <folly/Portability.h>
34 #include <folly/Try.h>
35 #include <folly/functional/Invoke.h>
36 #include <folly/io/async/HHWheelTimer.h>
37 #include <folly/io/async/Request.h>
38 
39 #include <folly/experimental/ExecutionObserver.h>
40 #include <folly/fibers/BoostContextCompatibility.h>
41 #include <folly/fibers/Fiber.h>
42 #include <folly/fibers/GuardPageAllocator.h>
43 #include <folly/fibers/LoopController.h>
44 #include <folly/fibers/traits.h>
45 
46 namespace folly {
47 
48 template <class T>
49 class Future;
50 
51 namespace fibers {
52 
53 class Baton;
54 class Fiber;
55 
56 struct TaskOptions;
57 
/**
 * Empty tag type. An instance of LocalType<T> is passed as the first argument
 * of the FiberManager constructor template to select T as the only type of
 * local that may be stored on fibers.
 */
template <typename T>
class LocalType {};
60 
61 class InlineFunctionRunner {
62  public:
~InlineFunctionRunner()63   virtual ~InlineFunctionRunner() {}
64 
65   /**
66    * func must be executed inline and only once.
67    */
68   virtual void run(folly::Function<void()> func) = 0;
69 };
70 
71 /**
72  * @class FiberManager
73  * @brief Single-threaded task execution engine.
74  *
75  * FiberManager allows semi-parallel task execution on the same thread. Each
76  * task can notify FiberManager that it is blocked on something (via await())
77  * call. This will pause execution of this task and it will be resumed only
78  * when it is unblocked (via setData()).
79  */
80 class FiberManager : public ::folly::Executor {
81  public:
82   struct Options {
83     static constexpr size_t kDefaultStackSize{16 * 1024};
84 
85     /**
86      * Maximum stack size for fibers which will be used for executing all the
87      * tasks.
88      */
89     size_t stackSize{kDefaultStackSize};
90 
91     /**
92      * Sanitizers need a lot of extra stack space. 16x is a conservative
93      * estimate, but 8x also worked with tests where it mattered. Similarly,
94      * debug builds need extra stack space due to reduced inlining.
95      *
96      * Note that over-allocating here does not necessarily increase RSS, since
97      * unused memory is pretty much free.
98      */
99     size_t stackSizeMultiplier{kIsSanitize ? 16 : (kIsDebug ? 2 : 1)};
100 
101     /**
102      * Record exact amount of stack used.
103      *
104      * This is fairly expensive: we fill each newly allocated stack
105      * with some known value and find the boundary of unused stack
106      * with linear search every time we surrender the stack back to fibersPool.
107      * 0 disables stack recording.
108      */
109     size_t recordStackEvery{0};
110 
111     /**
112      * Keep at most this many free fibers in the pool.
113      * This way the total number of fibers in the system is always bounded
114      * by the number of active fibers + maxFibersPoolSize.
115      */
116     size_t maxFibersPoolSize{1000};
117 
118     /**
119      * Protect a small number of fiber stacks with this many guard pages.
120      */
121     size_t guardPagesPerStack{1};
122 
123     /**
124      * Free unnecessary fibers in the fibers pool every fibersPoolResizePeriodMs
125      * milliseconds. If value is 0, periodic resizing of the fibers pool is
126      * disabled.
127      */
128     uint32_t fibersPoolResizePeriodMs{0};
129 
OptionsOptions130     constexpr Options() {}
131 
hashOptions132     auto hash() const {
133       return std::make_tuple(
134           stackSize,
135           stackSizeMultiplier,
136           recordStackEvery,
137           maxFibersPoolSize,
138           guardPagesPerStack,
139           fibersPoolResizePeriodMs);
140     }
141   };
142 
143   /**
144    * A (const) Options instance with a dedicated unique identifier,
145    * which is used as a key in FiberManagerMap.
146    * This is relevant if you want to run different FiberManager,
147    * with different Option, on the same EventBase.
148    */
149   struct FrozenOptions {
FrozenOptionsFrozenOptions150     explicit FrozenOptions(Options opts)
151         : options(std::move(opts)), token(create(options)) {}
152 
153     const Options options;
154     const ssize_t token;
155 
156    private:
157     static ssize_t create(const Options&);
158   };
159 
160   using ExceptionCallback =
161       folly::Function<void(std::exception_ptr, std::string)>;
162 
163   FiberManager(const FiberManager&) = delete;
164   FiberManager& operator=(const FiberManager&) = delete;
165 
166   /**
167    * Initializes, but doesn't start FiberManager loop
168    *
169    * @param loopController A LoopController object
170    * @param options FiberManager options
171    */
172   explicit FiberManager(
173       std::unique_ptr<LoopController> loopController,
174       Options options = Options());
175 
176   /**
177    * Initializes, but doesn't start FiberManager loop
178    *
179    * @param loopController A LoopController object
180    * @param options FiberManager options
181    * @tparam LocalT only local of this type may be stored on fibers.
182    *                Locals of other types will be considered thread-locals.
183    */
184   template <typename LocalT>
185   FiberManager(
186       LocalType<LocalT>,
187       std::unique_ptr<LoopController> loopController,
188       Options options = Options());
189 
190   ~FiberManager() override;
191 
192   /**
193    * Controller access.
194    */
195   LoopController& loopController();
196   const LoopController& loopController() const;
197 
198   /**
199    * Keeps running ready tasks until the list of ready tasks is empty.
200    */
201   void loopUntilNoReady();
202 
203   /**
204    * This should only be called by a LoopController.
205    */
206   void loopUntilNoReadyImpl();
207 
208   /**
209    * This should only be called by a LoopController.
210    */
211   void runEagerFiberImpl(Fiber*);
212 
213   /**
214    * This should only be called by a LoopController.
215    */
216   bool shouldRunLoopRemote();
217 
218   /**
219    * @return true if there are outstanding tasks.
220    */
221   bool hasTasks() const;
222   bool isRemoteScheduled() const;
223 
224   /**
225    * @return The number of currently active fibers (ready to run or blocked).
226    * Does not include the number of remotely enqueued tasks that have not been
227    * run yet.
228    */
numActiveTasks()229   size_t numActiveTasks() const noexcept { return fibersActive_; }
230 
231   /**
232    * @return true if there are tasks ready to run.
233    */
234   bool hasReadyTasks() const;
235 
236   /**
237    * Sets exception callback which will be called if any of the tasks throws an
238    * exception.
239    *
240    * @param ec An ExceptionCallback object.
241    */
242   void setExceptionCallback(ExceptionCallback ec);
243 
244   /**
245    * Add a new task to be executed. Must be called from FiberManager's thread.
246    *
247    * @param func Task functor; must have a signature of `void func()`.
248    *             The object will be destroyed once task execution is complete.
249    * @param taskOptions Task specific configs.
250    */
251   template <typename F>
252   void addTask(F&& func, TaskOptions taskOptions = TaskOptions());
253 
254   /**
255    * Add a new task to be executed and return a future that will be set on
256    * return from func. Must be called from FiberManager's thread.
257    *
   * @param func Task functor; must have a signature of `T func()` for some T.
259    *             The object will be destroyed once task execution is complete.
260    */
261   template <typename F>
262   auto addTaskFuture(F&& func)
263       -> folly::Future<folly::lift_unit_t<invoke_result_t<F>>>;
264 
265   /**
266    * Add a new task to be executed. Must be called from FiberManager's thread.
267    * The new task is run eagerly. addTaskEager will return only once the new
268    * task reaches its first suspension point or is completed.
269    *
270    * @param func Task functor; must have a signature of `void func()`.
271    *             The object will be destroyed once task execution is complete.
272    */
273   template <typename F>
274   void addTaskEager(F&& func);
275 
276   /**
277    * Add a new task to be executed and return a future that will be set on
278    * return from func. Must be called from FiberManager's thread.
   * The new task is run eagerly. addTaskEagerFuture will return only once the
   * new task reaches its first suspension point or is completed.
   *
   * @param func Task functor; must have a signature of `T func()` for some T.
283    *             The object will be destroyed once task execution is complete.
284    */
285   template <typename F>
286   auto addTaskEagerFuture(F&& func)
287       -> folly::Future<folly::lift_unit_t<invoke_result_t<F>>>;
288 
289   /**
290    * Add a new task to be executed. Safe to call from other threads.
291    *
292    * @param func Task function; must have a signature of `void func()`.
293    *             The object will be destroyed once task execution is complete.
294    */
295   template <typename F>
296   void addTaskRemote(F&& func);
297 
298   /**
299    * Add a new task to be executed and return a future that will be set on
300    * return from func. Safe to call from other threads.
301    *
   * @param func Task function; must have a signature of `T func()` for some T.
303    *             The object will be destroyed once task execution is complete.
304    */
305   template <typename F>
306   auto addTaskRemoteFuture(F&& func)
307       -> folly::Future<folly::lift_unit_t<invoke_result_t<F>>>;
308 
309   // Executor interface calls addTaskRemote
add(folly::Func f)310   void add(folly::Func f) override { addTaskRemote(std::move(f)); }
311 
312   /**
313    * Add a new task. When the task is complete, execute finally(Try<Result>&&)
314    * on the main context.
315    *
316    * @param func Task functor; must have a signature of `T func()` for some T.
317    * @param finally Finally functor; must have a signature of
318    *                `void finally(Try<T>&&)` and will be passed
319    *                the result of func() (including the exception if occurred).
320    */
321   template <typename F, typename G>
322   void addTaskFinally(F&& func, G&& finally);
323 
324   /**
325    * Add a new task. When the task is complete, execute finally(Try<Result>&&)
326    * on the main context.
   * The new task is run eagerly. addTaskFinallyEager will return only once the
   * new task reaches its first suspension point or is completed.
329    *
330    * @param func Task functor; must have a signature of `T func()` for some T.
331    * @param finally Finally functor; must have a signature of
332    *                `void finally(Try<T>&&)` and will be passed
333    *                the result of func() (including the exception if occurred).
334    */
335   template <typename F, typename G>
336   void addTaskFinallyEager(F&& func, G&& finally);
337 
338   /**
339    * If called from a fiber, immediately switches to the FiberManager's context
340    * and runs func(), going back to the Fiber's context after completion.
341    * Outside a fiber, just calls func() directly.
342    *
343    * @return value returned by func().
344    */
345   template <typename F>
346   invoke_result_t<F> runInMainContext(F&& func);
347 
348   /**
   * Returns a reference to a fiber-local context for given Fiber. Should be
350    * always called with the same T for each fiber. Fiber-local context is lazily
351    * default-constructed on first request.
352    * When new task is scheduled via addTask / addTaskRemote from a fiber its
353    * fiber-local context is copied into the new fiber.
354    */
355   template <typename T>
356   T& local();
357 
358   template <typename T>
359   FOLLY_EXPORT static T& localThread();
360 
361   /**
362    * @return How many fiber objects (and stacks) has this manager allocated.
363    */
364   size_t fibersAllocated() const;
365 
366   /**
367    * @return How many of the allocated fiber objects are currently
368    * in the free pool.
369    */
370   size_t fibersPoolSize() const;
371 
372   /**
   * @return true if activeFiber_ is not nullptr (i.e. not on main context).
374    */
375   bool hasActiveFiber() const;
376 
377   /**
   * @return How long has the currently running task on the fiber run, in
379    * terms of wallclock time. This excludes the time spent in preempted or
380    * waiting stages. This only works if TaskOptions.logRunningTime is true
381    * during addTask().
382    */
383   folly::Optional<std::chrono::nanoseconds> getCurrentTaskRunningTime() const;
384 
385   /**
386    * @return The currently running fiber or null if no fiber is executing.
387    */
currentFiber()388   Fiber* currentFiber() const { return currentFiber_; }
389 
390   /**
391    * @return What was the most observed fiber stack usage (in bytes).
392    */
393   size_t stackHighWatermark() const;
394 
395   /**
396    * Yield execution of the currently running fiber. Must only be called from a
397    * fiber executing on this FiberManager. The calling fiber will be scheduled
398    * when all other fibers have had a chance to run and the event loop is
399    * serviced.
400    */
401   void yield();
402 
403   /**
404    * Setup fibers execution observation/instrumentation. Fiber locals are
405    * available to observer.
406    *
407    * @param observer  Fiber's execution observer.
408    */
409   void setObserver(ExecutionObserver* observer);
410 
411   /**
412    * @return Current observer for this FiberManager. Returns nullptr
413    * if no observer has been set.
414    */
415   ExecutionObserver* getObserver();
416 
417   /**
418    * Setup fibers preempt runner.
419    */
420   void setPreemptRunner(InlineFunctionRunner* preemptRunner);
421 
422   /**
423    * Returns an estimate of the number of fibers which are waiting to run (does
424    * not include fibers or tasks scheduled remotely).
425    */
runQueueSize()426   size_t runQueueSize() const {
427     return readyFibers_.size() + (yieldedFibers_ ? yieldedFibers_->size() : 0);
428   }
429 
430   static FiberManager& getFiberManager();
431   static FiberManager* getFiberManagerUnsafe();
432 
getOptions()433   const Options& getOptions() const { return options_; }
434 
435  private:
436   friend class Baton;
437   friend class Fiber;
438   template <typename F>
439   struct AddTaskHelper;
440   template <typename F, typename G>
441   struct AddTaskFinallyHelper;
442 
443   struct RemoteTask {
444     template <typename F>
RemoteTaskRemoteTask445     explicit RemoteTask(F&& f)
446         : func(std::forward<F>(f)), rcontext(RequestContext::saveContext()) {}
447     template <typename F>
RemoteTaskRemoteTask448     RemoteTask(F&& f, const Fiber::LocalData& localData_)
449         : func(std::forward<F>(f)),
450           localData(std::make_unique<Fiber::LocalData>(localData_)),
451           rcontext(RequestContext::saveContext()) {}
452     folly::Function<void()> func;
453     std::unique_ptr<Fiber::LocalData> localData;
454     std::shared_ptr<RequestContext> rcontext;
455     AtomicIntrusiveLinkedListHook<RemoteTask> nextRemoteTask;
456   };
457 
458   template <typename F>
459   Fiber* createTask(F&& func, TaskOptions taskOptions);
460 
461   template <typename F, typename G>
462   Fiber* createTaskFinally(F&& func, G&& finally);
463 
464   void runEagerFiber(Fiber* fiber);
465 
466   void activateFiber(Fiber* fiber);
467   void deactivateFiber(Fiber* fiber);
468 
469   template <typename LoopFunc>
470   void runFibersHelper(LoopFunc&& loopFunc);
471 
472   size_t recordStackPosition(size_t position);
473 
474   typedef folly::IntrusiveList<Fiber, &Fiber::listHook_> FiberTailQueue;
475   typedef folly::IntrusiveList<Fiber, &Fiber::globalListHook_>
476       GlobalFiberTailQueue;
477 
478   Fiber* activeFiber_{nullptr}; /**< active fiber, nullptr on main context */
479   /**
480    * Same as active fiber, but also set for functions run from fiber on main
481    * context.
482    */
483   Fiber* currentFiber_{nullptr};
484 
485   FiberTailQueue readyFibers_; /**< queue of fibers ready to be executed */
486   FiberTailQueue* yieldedFibers_{nullptr}; /**< queue of fibers which have
487                                       yielded execution */
488   FiberTailQueue fibersPool_; /**< pool of uninitialized Fiber objects */
489 
490   GlobalFiberTailQueue allFibers_; /**< list of all Fiber objects owned */
491 
492   // total number of fibers allocated
493   std::atomic<size_t> fibersAllocated_{0};
494   // total number of fibers in the free pool
495   std::atomic<size_t> fibersPoolSize_{0};
496   size_t fibersActive_{0}; /**< number of running or blocked fibers */
497   size_t fiberId_{0}; /**< id of last fiber used */
498 
499   /**
500    * Maximum number of active fibers in the last period lasting
   * Options::fibersPoolResizePeriodMs milliseconds.
502    */
503   size_t maxFibersActiveLastPeriod_{0};
504 
505   std::unique_ptr<LoopController> loopController_;
506   bool isLoopScheduled_{false}; /**< was the ready loop scheduled to run? */
507 
508   /**
509    * When we are inside FiberManager loop this points to FiberManager. Otherwise
510    * it's nullptr
511    */
512   static FiberManager*& getCurrentFiberManager();
513 
514   /**
515    * Allocator used to allocate stack for Fibers in the pool.
516    * Allocates stack on the stack of the main context.
517    */
518   GuardPageAllocator stackAllocator_;
519 
520   const Options options_; /**< FiberManager options */
521 
522   /**
523    * Largest observed individual Fiber stack usage in bytes.
524    */
525   std::atomic<size_t> stackHighWatermark_{0};
526 
527   /**
528    * Schedules a loop with loopController (unless already scheduled before).
529    */
530   void ensureLoopScheduled();
531 
532   /**
533    * @return An initialized Fiber object from the pool
534    */
535   Fiber* getFiber();
536 
537   /**
538    * Sets local data for given fiber if all conditions are met.
539    */
540   void initLocalData(Fiber& fiber);
541 
542   /**
543    * Function passed to the await call.
544    */
545   folly::Function<void(Fiber&)> awaitFunc_;
546 
547   /**
548    * Function passed to the runInMainContext call.
549    */
550   folly::Function<void()> immediateFunc_;
551 
552   /**
553    * Preempt runner.
554    */
555   InlineFunctionRunner* preemptRunner_{nullptr};
556 
557   /**
558    * Fiber's execution observer.
559    */
560   ExecutionObserver* observer_{nullptr};
561 
562   ExceptionCallback exceptionCallback_; /**< task exception callback */
563 
564   folly::AtomicIntrusiveLinkedList<Fiber, &Fiber::nextRemoteReady_>
565       remoteReadyQueue_;
566 
567   folly::AtomicIntrusiveLinkedList<RemoteTask, &RemoteTask::nextRemoteTask>
568       remoteTaskQueue_;
569 
570   ssize_t remoteCount_{0};
571 
572   /**
573    * Number of uncaught exceptions when FiberManager loop was called.
574    */
575   ssize_t numUncaughtExceptions_{0};
576   /**
577    * Current exception when FiberManager loop was called.
578    */
579   std::exception_ptr currentException_;
580 
581   class FibersPoolResizer final : private HHWheelTimer::Callback {
582    public:
FibersPoolResizer(FiberManager & fm)583     explicit FibersPoolResizer(FiberManager& fm) : fiberManager_(fm) {}
584     void run();
585 
586    private:
587     FiberManager& fiberManager_;
timeoutExpired()588     void timeoutExpired() noexcept override { run(); }
callbackCanceled()589     void callbackCanceled() noexcept override {}
590   };
591 
592   FibersPoolResizer fibersPoolResizer_;
593   bool fibersPoolResizerScheduled_{false};
594 
595   void doFibersPoolResizing();
596 
597   /**
598    * Only local of this type will be available for fibers.
599    */
600   std::type_index localType_;
601 
602   void runReadyFiber(Fiber* fiber);
603   void remoteReadyInsert(Fiber* fiber);
604 
605   // These methods notify ASAN when a fiber is entered/exited so that ASAN can
606   // find the right stack extents when it needs to poison/unpoison the stack.
607   void registerStartSwitchStackWithAsan(
608       void** saveFakeStack, const void* stackBase, size_t stackSize);
609   void registerFinishSwitchStackWithAsan(
610       void* fakeStack, const void** saveStackBase, size_t* saveStackSize);
611   void freeFakeStack(void* fakeStack);
612   void unpoisonFiberStack(const Fiber* fiber);
613 
614   bool alternateSignalStackRegistered_{false};
615 
616   void maybeRegisterAlternateSignalStack();
617 };
618 
619 /**
620  * @return      true iff we are running in a fiber's context
621  */
onFiber()622 inline bool onFiber() {
623   auto fm = FiberManager::getFiberManagerUnsafe();
624   return fm ? fm->hasActiveFiber() : false;
625 }
626 
627 /**
628  * Add a new task to be executed.
629  *
630  * @param func Task functor; must have a signature of `void func()`.
631  *             The object will be destroyed once task execution is complete.
632  */
633 template <typename F>
addTask(F && func)634 inline void addTask(F&& func) {
635   return FiberManager::getFiberManager().addTask(std::forward<F>(func));
636 }
637 
638 template <typename F>
addTaskEager(F && func)639 inline void addTaskEager(F&& func) {
640   return FiberManager::getFiberManager().addTaskEager(std::forward<F>(func));
641 }
642 
643 /**
644  * Add a new task. When the task is complete, execute finally(Try<Result>&&)
645  * on the main context.
646  * Task functor is run and destroyed on the fiber context.
647  * Finally functor is run and destroyed on the main context.
648  *
649  * @param func Task functor; must have a signature of `T func()` for some T.
650  * @param finally Finally functor; must have a signature of
651  *                `void finally(Try<T>&&)` and will be passed
652  *                the result of func() (including the exception if occurred).
653  */
654 template <typename F, typename G>
addTaskFinally(F && func,G && finally)655 inline void addTaskFinally(F&& func, G&& finally) {
656   return FiberManager::getFiberManager().addTaskFinally(
657       std::forward<F>(func), std::forward<G>(finally));
658 }
659 
660 template <typename F, typename G>
addTaskFinallyEager(F && func,G && finally)661 inline void addTaskFinallyEager(F&& func, G&& finally) {
662   return FiberManager::getFiberManager().addTaskFinallyEager(
663       std::forward<F>(func), std::forward<G>(finally));
664 }
665 
666 /**
667  * Blocks task execution until given promise is fulfilled.
668  *
669  * Calls function passing in a Promise<T>, which has to be fulfilled.
670  *
671  * @return data which was used to fulfill the promise.
672  */
673 template <typename F>
674 typename FirstArgOf<F>::type::value_type inline await_async(F&& func);
675 #if !defined(_MSC_VER)
676 template <typename F>
await(F && func)677 FOLLY_ERASE typename FirstArgOf<F>::type::value_type await(F&& func) {
678   return await_async(static_cast<F&&>(func));
679 }
680 #endif
681 
682 /**
683  * If called from a fiber, immediately switches to the FiberManager's context
684  * and runs func(), going back to the Fiber's context after completion.
685  * Outside a fiber, just calls func() directly.
686  *
687  * @return value returned by func().
688  */
689 template <typename F>
690 invoke_result_t<F> inline runInMainContext(F&& func);
691 
692 /**
693  * Returns a refference to a fiber-local context for given Fiber. Should be
694  * always called with the same T for each fiber. Fiber-local context is lazily
695  * default-constructed on first request.
696  * When new task is scheduled via addTask / addTaskRemote from a fiber its
697  * fiber-local context is copied into the new fiber.
698  */
699 template <typename T>
local()700 T& local() {
701   auto fm = FiberManager::getFiberManagerUnsafe();
702   if (fm) {
703     return fm->local<T>();
704   }
705   return FiberManager::localThread<T>();
706 }
707 
yield()708 inline void yield() {
709   auto fm = FiberManager::getFiberManagerUnsafe();
710   if (fm) {
711     fm->yield();
712   } else {
713     std::this_thread::yield();
714   }
715 }
716 } // namespace fibers
717 } // namespace folly
718 
719 #include <folly/fibers/FiberManagerInternal-inl.h>
720