1 /*
2 * Copyright (c) Facebook, Inc. and its affiliates.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #pragma once
18
19 #include <functional>
20 #include <memory>
21 #include <queue>
22 #include <thread>
23 #include <type_traits>
24 #include <typeindex>
25 #include <unordered_set>
26 #include <vector>
27
28 #include <folly/AtomicIntrusiveLinkedList.h>
29 #include <folly/CPortability.h>
30 #include <folly/Executor.h>
31 #include <folly/IntrusiveList.h>
32 #include <folly/Likely.h>
33 #include <folly/Portability.h>
34 #include <folly/Try.h>
35 #include <folly/functional/Invoke.h>
36 #include <folly/io/async/HHWheelTimer.h>
37 #include <folly/io/async/Request.h>
38
39 #include <folly/experimental/ExecutionObserver.h>
40 #include <folly/fibers/BoostContextCompatibility.h>
41 #include <folly/fibers/Fiber.h>
42 #include <folly/fibers/GuardPageAllocator.h>
43 #include <folly/fibers/LoopController.h>
44 #include <folly/fibers/traits.h>
45
46 namespace folly {
47
48 template <class T>
49 class Future;
50
51 namespace fibers {
52
53 class Baton;
54 class Fiber;
55
56 struct TaskOptions;
57
/**
 * Empty tag type. Passing a LocalType<LocalT> value selects the FiberManager
 * constructor overload that is templated on the fiber-local type LocalT.
 */
template <typename T>
class LocalType {};
60
61 class InlineFunctionRunner {
62 public:
~InlineFunctionRunner()63 virtual ~InlineFunctionRunner() {}
64
65 /**
66 * func must be executed inline and only once.
67 */
68 virtual void run(folly::Function<void()> func) = 0;
69 };
70
71 /**
72 * @class FiberManager
73 * @brief Single-threaded task execution engine.
74 *
75 * FiberManager allows semi-parallel task execution on the same thread. Each
76 * task can notify FiberManager that it is blocked on something (via await())
77 * call. This will pause execution of this task and it will be resumed only
78 * when it is unblocked (via setData()).
79 */
80 class FiberManager : public ::folly::Executor {
81 public:
82 struct Options {
83 static constexpr size_t kDefaultStackSize{16 * 1024};
84
85 /**
86 * Maximum stack size for fibers which will be used for executing all the
87 * tasks.
88 */
89 size_t stackSize{kDefaultStackSize};
90
91 /**
92 * Sanitizers need a lot of extra stack space. 16x is a conservative
93 * estimate, but 8x also worked with tests where it mattered. Similarly,
94 * debug builds need extra stack space due to reduced inlining.
95 *
96 * Note that over-allocating here does not necessarily increase RSS, since
97 * unused memory is pretty much free.
98 */
99 size_t stackSizeMultiplier{kIsSanitize ? 16 : (kIsDebug ? 2 : 1)};
100
101 /**
102 * Record exact amount of stack used.
103 *
104 * This is fairly expensive: we fill each newly allocated stack
105 * with some known value and find the boundary of unused stack
106 * with linear search every time we surrender the stack back to fibersPool.
107 * 0 disables stack recording.
108 */
109 size_t recordStackEvery{0};
110
111 /**
112 * Keep at most this many free fibers in the pool.
113 * This way the total number of fibers in the system is always bounded
114 * by the number of active fibers + maxFibersPoolSize.
115 */
116 size_t maxFibersPoolSize{1000};
117
118 /**
119 * Protect a small number of fiber stacks with this many guard pages.
120 */
121 size_t guardPagesPerStack{1};
122
123 /**
124 * Free unnecessary fibers in the fibers pool every fibersPoolResizePeriodMs
125 * milliseconds. If value is 0, periodic resizing of the fibers pool is
126 * disabled.
127 */
128 uint32_t fibersPoolResizePeriodMs{0};
129
OptionsOptions130 constexpr Options() {}
131
hashOptions132 auto hash() const {
133 return std::make_tuple(
134 stackSize,
135 stackSizeMultiplier,
136 recordStackEvery,
137 maxFibersPoolSize,
138 guardPagesPerStack,
139 fibersPoolResizePeriodMs);
140 }
141 };
142
143 /**
144 * A (const) Options instance with a dedicated unique identifier,
145 * which is used as a key in FiberManagerMap.
   * This is relevant if you want to run different FiberManagers,
   * with different Options, on the same EventBase.
148 */
149 struct FrozenOptions {
FrozenOptionsFrozenOptions150 explicit FrozenOptions(Options opts)
151 : options(std::move(opts)), token(create(options)) {}
152
153 const Options options;
154 const ssize_t token;
155
156 private:
157 static ssize_t create(const Options&);
158 };
159
160 using ExceptionCallback =
161 folly::Function<void(std::exception_ptr, std::string)>;
162
163 FiberManager(const FiberManager&) = delete;
164 FiberManager& operator=(const FiberManager&) = delete;
165
166 /**
167 * Initializes, but doesn't start FiberManager loop
168 *
169 * @param loopController A LoopController object
170 * @param options FiberManager options
171 */
172 explicit FiberManager(
173 std::unique_ptr<LoopController> loopController,
174 Options options = Options());
175
176 /**
177 * Initializes, but doesn't start FiberManager loop
178 *
179 * @param loopController A LoopController object
180 * @param options FiberManager options
181 * @tparam LocalT only local of this type may be stored on fibers.
182 * Locals of other types will be considered thread-locals.
183 */
184 template <typename LocalT>
185 FiberManager(
186 LocalType<LocalT>,
187 std::unique_ptr<LoopController> loopController,
188 Options options = Options());
189
190 ~FiberManager() override;
191
192 /**
193 * Controller access.
194 */
195 LoopController& loopController();
196 const LoopController& loopController() const;
197
198 /**
199 * Keeps running ready tasks until the list of ready tasks is empty.
200 */
201 void loopUntilNoReady();
202
203 /**
204 * This should only be called by a LoopController.
205 */
206 void loopUntilNoReadyImpl();
207
208 /**
209 * This should only be called by a LoopController.
210 */
211 void runEagerFiberImpl(Fiber*);
212
213 /**
214 * This should only be called by a LoopController.
215 */
216 bool shouldRunLoopRemote();
217
218 /**
219 * @return true if there are outstanding tasks.
220 */
221 bool hasTasks() const;
222 bool isRemoteScheduled() const;
223
224 /**
225 * @return The number of currently active fibers (ready to run or blocked).
226 * Does not include the number of remotely enqueued tasks that have not been
227 * run yet.
228 */
numActiveTasks()229 size_t numActiveTasks() const noexcept { return fibersActive_; }
230
231 /**
232 * @return true if there are tasks ready to run.
233 */
234 bool hasReadyTasks() const;
235
236 /**
237 * Sets exception callback which will be called if any of the tasks throws an
238 * exception.
239 *
240 * @param ec An ExceptionCallback object.
241 */
242 void setExceptionCallback(ExceptionCallback ec);
243
244 /**
245 * Add a new task to be executed. Must be called from FiberManager's thread.
246 *
247 * @param func Task functor; must have a signature of `void func()`.
248 * The object will be destroyed once task execution is complete.
249 * @param taskOptions Task specific configs.
250 */
251 template <typename F>
252 void addTask(F&& func, TaskOptions taskOptions = TaskOptions());
253
254 /**
255 * Add a new task to be executed and return a future that will be set on
256 * return from func. Must be called from FiberManager's thread.
257 *
   * @param func Task functor; must have a signature of `T func()` for some T
   *             (possibly void); the returned future carries its result.
259 * The object will be destroyed once task execution is complete.
260 */
261 template <typename F>
262 auto addTaskFuture(F&& func)
263 -> folly::Future<folly::lift_unit_t<invoke_result_t<F>>>;
264
265 /**
266 * Add a new task to be executed. Must be called from FiberManager's thread.
267 * The new task is run eagerly. addTaskEager will return only once the new
268 * task reaches its first suspension point or is completed.
269 *
270 * @param func Task functor; must have a signature of `void func()`.
271 * The object will be destroyed once task execution is complete.
272 */
273 template <typename F>
274 void addTaskEager(F&& func);
275
276 /**
277 * Add a new task to be executed and return a future that will be set on
278 * return from func. Must be called from FiberManager's thread.
   * The new task is run eagerly. addTaskEagerFuture will return only once the
   * new task reaches its first suspension point or is completed.
   *
   * @param func Task functor; must have a signature of `T func()` for some T
   *             (possibly void); the returned future carries its result.
283 * The object will be destroyed once task execution is complete.
284 */
285 template <typename F>
286 auto addTaskEagerFuture(F&& func)
287 -> folly::Future<folly::lift_unit_t<invoke_result_t<F>>>;
288
289 /**
290 * Add a new task to be executed. Safe to call from other threads.
291 *
292 * @param func Task function; must have a signature of `void func()`.
293 * The object will be destroyed once task execution is complete.
294 */
295 template <typename F>
296 void addTaskRemote(F&& func);
297
298 /**
299 * Add a new task to be executed and return a future that will be set on
300 * return from func. Safe to call from other threads.
301 *
   * @param func Task function; must have a signature of `T func()` for some T
   *             (possibly void); the returned future carries its result.
303 * The object will be destroyed once task execution is complete.
304 */
305 template <typename F>
306 auto addTaskRemoteFuture(F&& func)
307 -> folly::Future<folly::lift_unit_t<invoke_result_t<F>>>;
308
309 // Executor interface calls addTaskRemote
add(folly::Func f)310 void add(folly::Func f) override { addTaskRemote(std::move(f)); }
311
312 /**
313 * Add a new task. When the task is complete, execute finally(Try<Result>&&)
314 * on the main context.
315 *
316 * @param func Task functor; must have a signature of `T func()` for some T.
317 * @param finally Finally functor; must have a signature of
318 * `void finally(Try<T>&&)` and will be passed
319 * the result of func() (including the exception if occurred).
320 */
321 template <typename F, typename G>
322 void addTaskFinally(F&& func, G&& finally);
323
324 /**
325 * Add a new task. When the task is complete, execute finally(Try<Result>&&)
326 * on the main context.
   * The new task is run eagerly. addTaskFinallyEager will return only once the
   * new task reaches its first suspension point or is completed.
329 *
330 * @param func Task functor; must have a signature of `T func()` for some T.
331 * @param finally Finally functor; must have a signature of
332 * `void finally(Try<T>&&)` and will be passed
333 * the result of func() (including the exception if occurred).
334 */
335 template <typename F, typename G>
336 void addTaskFinallyEager(F&& func, G&& finally);
337
338 /**
339 * If called from a fiber, immediately switches to the FiberManager's context
340 * and runs func(), going back to the Fiber's context after completion.
341 * Outside a fiber, just calls func() directly.
342 *
343 * @return value returned by func().
344 */
345 template <typename F>
346 invoke_result_t<F> runInMainContext(F&& func);
347
348 /**
   * Returns a reference to a fiber-local context for given Fiber. Should be
350 * always called with the same T for each fiber. Fiber-local context is lazily
351 * default-constructed on first request.
352 * When new task is scheduled via addTask / addTaskRemote from a fiber its
353 * fiber-local context is copied into the new fiber.
354 */
355 template <typename T>
356 T& local();
357
358 template <typename T>
359 FOLLY_EXPORT static T& localThread();
360
361 /**
362 * @return How many fiber objects (and stacks) has this manager allocated.
363 */
364 size_t fibersAllocated() const;
365
366 /**
367 * @return How many of the allocated fiber objects are currently
368 * in the free pool.
369 */
370 size_t fibersPoolSize() const;
371
372 /**
   * @return true if activeFiber_ is not nullptr, i.e. a fiber is currently
   * running (as opposed to the main context).
374 */
375 bool hasActiveFiber() const;
376
377 /**
378 * @return How long has the currently running task on the fiber ran, in
379 * terms of wallclock time. This excludes the time spent in preempted or
380 * waiting stages. This only works if TaskOptions.logRunningTime is true
381 * during addTask().
382 */
383 folly::Optional<std::chrono::nanoseconds> getCurrentTaskRunningTime() const;
384
385 /**
386 * @return The currently running fiber or null if no fiber is executing.
387 */
currentFiber()388 Fiber* currentFiber() const { return currentFiber_; }
389
390 /**
391 * @return What was the most observed fiber stack usage (in bytes).
392 */
393 size_t stackHighWatermark() const;
394
395 /**
396 * Yield execution of the currently running fiber. Must only be called from a
397 * fiber executing on this FiberManager. The calling fiber will be scheduled
398 * when all other fibers have had a chance to run and the event loop is
399 * serviced.
400 */
401 void yield();
402
403 /**
404 * Setup fibers execution observation/instrumentation. Fiber locals are
405 * available to observer.
406 *
407 * @param observer Fiber's execution observer.
408 */
409 void setObserver(ExecutionObserver* observer);
410
411 /**
412 * @return Current observer for this FiberManager. Returns nullptr
413 * if no observer has been set.
414 */
415 ExecutionObserver* getObserver();
416
417 /**
418 * Setup fibers preempt runner.
419 */
420 void setPreemptRunner(InlineFunctionRunner* preemptRunner);
421
422 /**
423 * Returns an estimate of the number of fibers which are waiting to run (does
424 * not include fibers or tasks scheduled remotely).
425 */
runQueueSize()426 size_t runQueueSize() const {
427 return readyFibers_.size() + (yieldedFibers_ ? yieldedFibers_->size() : 0);
428 }
429
430 static FiberManager& getFiberManager();
431 static FiberManager* getFiberManagerUnsafe();
432
getOptions()433 const Options& getOptions() const { return options_; }
434
435 private:
436 friend class Baton;
437 friend class Fiber;
438 template <typename F>
439 struct AddTaskHelper;
440 template <typename F, typename G>
441 struct AddTaskFinallyHelper;
442
443 struct RemoteTask {
444 template <typename F>
RemoteTaskRemoteTask445 explicit RemoteTask(F&& f)
446 : func(std::forward<F>(f)), rcontext(RequestContext::saveContext()) {}
447 template <typename F>
RemoteTaskRemoteTask448 RemoteTask(F&& f, const Fiber::LocalData& localData_)
449 : func(std::forward<F>(f)),
450 localData(std::make_unique<Fiber::LocalData>(localData_)),
451 rcontext(RequestContext::saveContext()) {}
452 folly::Function<void()> func;
453 std::unique_ptr<Fiber::LocalData> localData;
454 std::shared_ptr<RequestContext> rcontext;
455 AtomicIntrusiveLinkedListHook<RemoteTask> nextRemoteTask;
456 };
457
458 template <typename F>
459 Fiber* createTask(F&& func, TaskOptions taskOptions);
460
461 template <typename F, typename G>
462 Fiber* createTaskFinally(F&& func, G&& finally);
463
464 void runEagerFiber(Fiber* fiber);
465
466 void activateFiber(Fiber* fiber);
467 void deactivateFiber(Fiber* fiber);
468
469 template <typename LoopFunc>
470 void runFibersHelper(LoopFunc&& loopFunc);
471
472 size_t recordStackPosition(size_t position);
473
474 typedef folly::IntrusiveList<Fiber, &Fiber::listHook_> FiberTailQueue;
475 typedef folly::IntrusiveList<Fiber, &Fiber::globalListHook_>
476 GlobalFiberTailQueue;
477
478 Fiber* activeFiber_{nullptr}; /**< active fiber, nullptr on main context */
479 /**
480 * Same as active fiber, but also set for functions run from fiber on main
481 * context.
482 */
483 Fiber* currentFiber_{nullptr};
484
485 FiberTailQueue readyFibers_; /**< queue of fibers ready to be executed */
486 FiberTailQueue* yieldedFibers_{nullptr}; /**< queue of fibers which have
487 yielded execution */
488 FiberTailQueue fibersPool_; /**< pool of uninitialized Fiber objects */
489
490 GlobalFiberTailQueue allFibers_; /**< list of all Fiber objects owned */
491
492 // total number of fibers allocated
493 std::atomic<size_t> fibersAllocated_{0};
494 // total number of fibers in the free pool
495 std::atomic<size_t> fibersPoolSize_{0};
496 size_t fibersActive_{0}; /**< number of running or blocked fibers */
497 size_t fiberId_{0}; /**< id of last fiber used */
498
499 /**
   * Maximum number of active fibers in the last period lasting
   * Options::fibersPoolResizePeriodMs milliseconds.
502 */
503 size_t maxFibersActiveLastPeriod_{0};
504
505 std::unique_ptr<LoopController> loopController_;
506 bool isLoopScheduled_{false}; /**< was the ready loop scheduled to run? */
507
508 /**
509 * When we are inside FiberManager loop this points to FiberManager. Otherwise
510 * it's nullptr
511 */
512 static FiberManager*& getCurrentFiberManager();
513
514 /**
515 * Allocator used to allocate stack for Fibers in the pool.
516 * Allocates stack on the stack of the main context.
517 */
518 GuardPageAllocator stackAllocator_;
519
520 const Options options_; /**< FiberManager options */
521
522 /**
523 * Largest observed individual Fiber stack usage in bytes.
524 */
525 std::atomic<size_t> stackHighWatermark_{0};
526
527 /**
528 * Schedules a loop with loopController (unless already scheduled before).
529 */
530 void ensureLoopScheduled();
531
532 /**
533 * @return An initialized Fiber object from the pool
534 */
535 Fiber* getFiber();
536
537 /**
538 * Sets local data for given fiber if all conditions are met.
539 */
540 void initLocalData(Fiber& fiber);
541
542 /**
543 * Function passed to the await call.
544 */
545 folly::Function<void(Fiber&)> awaitFunc_;
546
547 /**
548 * Function passed to the runInMainContext call.
549 */
550 folly::Function<void()> immediateFunc_;
551
552 /**
553 * Preempt runner.
554 */
555 InlineFunctionRunner* preemptRunner_{nullptr};
556
557 /**
558 * Fiber's execution observer.
559 */
560 ExecutionObserver* observer_{nullptr};
561
562 ExceptionCallback exceptionCallback_; /**< task exception callback */
563
564 folly::AtomicIntrusiveLinkedList<Fiber, &Fiber::nextRemoteReady_>
565 remoteReadyQueue_;
566
567 folly::AtomicIntrusiveLinkedList<RemoteTask, &RemoteTask::nextRemoteTask>
568 remoteTaskQueue_;
569
570 ssize_t remoteCount_{0};
571
572 /**
573 * Number of uncaught exceptions when FiberManager loop was called.
574 */
575 ssize_t numUncaughtExceptions_{0};
576 /**
577 * Current exception when FiberManager loop was called.
578 */
579 std::exception_ptr currentException_;
580
581 class FibersPoolResizer final : private HHWheelTimer::Callback {
582 public:
FibersPoolResizer(FiberManager & fm)583 explicit FibersPoolResizer(FiberManager& fm) : fiberManager_(fm) {}
584 void run();
585
586 private:
587 FiberManager& fiberManager_;
timeoutExpired()588 void timeoutExpired() noexcept override { run(); }
callbackCanceled()589 void callbackCanceled() noexcept override {}
590 };
591
592 FibersPoolResizer fibersPoolResizer_;
593 bool fibersPoolResizerScheduled_{false};
594
595 void doFibersPoolResizing();
596
597 /**
598 * Only local of this type will be available for fibers.
599 */
600 std::type_index localType_;
601
602 void runReadyFiber(Fiber* fiber);
603 void remoteReadyInsert(Fiber* fiber);
604
605 // These methods notify ASAN when a fiber is entered/exited so that ASAN can
606 // find the right stack extents when it needs to poison/unpoison the stack.
607 void registerStartSwitchStackWithAsan(
608 void** saveFakeStack, const void* stackBase, size_t stackSize);
609 void registerFinishSwitchStackWithAsan(
610 void* fakeStack, const void** saveStackBase, size_t* saveStackSize);
611 void freeFakeStack(void* fakeStack);
612 void unpoisonFiberStack(const Fiber* fiber);
613
614 bool alternateSignalStackRegistered_{false};
615
616 void maybeRegisterAlternateSignalStack();
617 };
618
619 /**
620 * @return true iff we are running in a fiber's context
621 */
onFiber()622 inline bool onFiber() {
623 auto fm = FiberManager::getFiberManagerUnsafe();
624 return fm ? fm->hasActiveFiber() : false;
625 }
626
627 /**
628 * Add a new task to be executed.
629 *
630 * @param func Task functor; must have a signature of `void func()`.
631 * The object will be destroyed once task execution is complete.
632 */
633 template <typename F>
addTask(F && func)634 inline void addTask(F&& func) {
635 return FiberManager::getFiberManager().addTask(std::forward<F>(func));
636 }
637
638 template <typename F>
addTaskEager(F && func)639 inline void addTaskEager(F&& func) {
640 return FiberManager::getFiberManager().addTaskEager(std::forward<F>(func));
641 }
642
643 /**
644 * Add a new task. When the task is complete, execute finally(Try<Result>&&)
645 * on the main context.
646 * Task functor is run and destroyed on the fiber context.
647 * Finally functor is run and destroyed on the main context.
648 *
649 * @param func Task functor; must have a signature of `T func()` for some T.
650 * @param finally Finally functor; must have a signature of
651 * `void finally(Try<T>&&)` and will be passed
652 * the result of func() (including the exception if occurred).
653 */
654 template <typename F, typename G>
addTaskFinally(F && func,G && finally)655 inline void addTaskFinally(F&& func, G&& finally) {
656 return FiberManager::getFiberManager().addTaskFinally(
657 std::forward<F>(func), std::forward<G>(finally));
658 }
659
660 template <typename F, typename G>
addTaskFinallyEager(F && func,G && finally)661 inline void addTaskFinallyEager(F&& func, G&& finally) {
662 return FiberManager::getFiberManager().addTaskFinallyEager(
663 std::forward<F>(func), std::forward<G>(finally));
664 }
665
666 /**
667 * Blocks task execution until given promise is fulfilled.
668 *
669 * Calls function passing in a Promise<T>, which has to be fulfilled.
670 *
671 * @return data which was used to fulfill the promise.
672 */
673 template <typename F>
674 typename FirstArgOf<F>::type::value_type inline await_async(F&& func);
675 #if !defined(_MSC_VER)
676 template <typename F>
await(F && func)677 FOLLY_ERASE typename FirstArgOf<F>::type::value_type await(F&& func) {
678 return await_async(static_cast<F&&>(func));
679 }
680 #endif
681
682 /**
683 * If called from a fiber, immediately switches to the FiberManager's context
684 * and runs func(), going back to the Fiber's context after completion.
685 * Outside a fiber, just calls func() directly.
686 *
687 * @return value returned by func().
688 */
689 template <typename F>
690 invoke_result_t<F> inline runInMainContext(F&& func);
691
692 /**
 * Returns a reference to a fiber-local context for given Fiber. Should be
694 * always called with the same T for each fiber. Fiber-local context is lazily
695 * default-constructed on first request.
696 * When new task is scheduled via addTask / addTaskRemote from a fiber its
697 * fiber-local context is copied into the new fiber.
698 */
699 template <typename T>
local()700 T& local() {
701 auto fm = FiberManager::getFiberManagerUnsafe();
702 if (fm) {
703 return fm->local<T>();
704 }
705 return FiberManager::localThread<T>();
706 }
707
yield()708 inline void yield() {
709 auto fm = FiberManager::getFiberManagerUnsafe();
710 if (fm) {
711 fm->yield();
712 } else {
713 std::this_thread::yield();
714 }
715 }
716 } // namespace fibers
717 } // namespace folly
718
719 #include <folly/fibers/FiberManagerInternal-inl.h>
720