1 /*
2 * Copyright (c) Facebook, Inc. and its affiliates.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #pragma once
18
19 #include <atomic>
20 #include <cerrno>
21 #include <cmath>
22 #include <cstdlib>
23 #include <functional>
24 #include <list>
25 #include <memory>
26 #include <queue>
27 #include <set>
28 #include <stack>
29 #include <unordered_map>
30 #include <unordered_set>
31 #include <utility>
32
33 #include <boost/intrusive/list.hpp>
34 #include <glog/logging.h>
35
36 #include <folly/Executor.h>
37 #include <folly/Function.h>
38 #include <folly/Memory.h>
39 #include <folly/Portability.h>
40 #include <folly/ScopeGuard.h>
41 #include <folly/Synchronized.h>
42 #include <folly/executors/DrivableExecutor.h>
43 #include <folly/executors/IOExecutor.h>
44 #include <folly/executors/ScheduledExecutor.h>
45 #include <folly/executors/SequencedExecutor.h>
46 #include <folly/experimental/ExecutionObserver.h>
47 #include <folly/io/async/AsyncTimeout.h>
48 #include <folly/io/async/HHWheelTimer.h>
49 #include <folly/io/async/Request.h>
50 #include <folly/io/async/TimeoutManager.h>
51 #include <folly/portability/Event.h>
52 #include <folly/synchronization/CallOnce.h>
53
54 namespace folly {
55 class EventBaseBackendBase;
56
57 using Cob = Func; // defined in folly/Executor.h
58
59 template <typename Task, typename Consumer>
60 class EventBaseAtomicNotificationQueue;
61 template <typename MessageT>
62 class NotificationQueue;
63
64 namespace detail {
65 class EventBaseLocalBase;
66
67 } // namespace detail
68 template <typename T>
69 class EventBaseLocal;
70
/**
 * Observer interface that receives loop timing samples.
 *
 * NOTE(review): the code that calls this interface is not visible in this
 * header; presumably the EventBase loop consults getSampleRate() and then
 * reports through loopSample() -- confirm against the implementation.
 */
class EventBaseObserver {
 public:
  virtual ~EventBaseObserver() = default;

  // Sampling rate requested by this observer.
  // NOTE(review): presumably "sample once every N loop iterations" -- confirm.
  virtual uint32_t getSampleRate() const = 0;

  // Delivers one sample, split into busy time and idle time.
  // NOTE(review): units are not stated here (presumably microseconds).
  virtual void loopSample(int64_t busyTime, int64_t idleTime) = 0;
};
79
80 // Helper class that sets and retrieves the EventBase associated with a given
81 // request via RequestContext. See Request.h for that mechanism.
82 class RequestEventBase : public RequestData {
83 public:
get()84 static EventBase* get() {
85 auto data = dynamic_cast<RequestEventBase*>(
86 RequestContext::get()->getContextData(token()));
87 if (!data) {
88 return nullptr;
89 }
90 return data->eb_;
91 }
92
set(EventBase * eb)93 static void set(EventBase* eb) {
94 RequestContext::get()->setContextData(
95 token(), std::unique_ptr<RequestEventBase>(new RequestEventBase(eb)));
96 }
97
hasCallback()98 bool hasCallback() override { return false; }
99
100 private:
token()101 FOLLY_EXPORT static RequestToken const& token() {
102 static RequestToken const token(kContextDataName);
103 return token;
104 }
105
RequestEventBase(EventBase * eb)106 explicit RequestEventBase(EventBase* eb) : eb_(eb) {}
107 EventBase* eb_;
108 static constexpr const char* kContextDataName{"EventBase"};
109 };
110
111 class VirtualEventBase;
112
113 /**
114 * This class is a wrapper for all asynchronous I/O processing functionality
115 *
116 * EventBase provides a main loop that notifies EventHandler callback objects
117 * when I/O is ready on a file descriptor, and notifies AsyncTimeout objects
118 * when a specified timeout has expired. More complex, higher-level callback
119 * mechanisms can then be built on top of EventHandler and AsyncTimeout.
120 *
 * An EventBase object can only drive an event loop for a single thread. To
122 * take advantage of multiple CPU cores, most asynchronous I/O servers have one
123 * thread per CPU, and use a separate EventBase for each thread.
124 *
125 * In general, most EventBase methods may only be called from the thread
126 * running the EventBase's loop. There are a few exceptions to this rule, for
 * methods that are explicitly intended to allow communication with an
 * EventBase from other threads. When it is safe to call a method from
129 * another thread it is explicitly listed in the method comments.
130 */
131 class EventBase : public TimeoutManager,
132 public DrivableExecutor,
133 public IOExecutor,
134 public SequencedExecutor,
135 public ScheduledExecutor {
136 public:
137 friend class ScopedEventBaseThread;
138
139 using Func = folly::Function<void()>;
140
141 /**
142 * A callback interface to use with runInLoop()
143 *
144 * Derive from this class if you need to delay some code execution until the
145 * next iteration of the event loop. This allows you to schedule code to be
146 * invoked from the top-level of the loop, after your immediate callers have
147 * returned.
148 *
149 * If a LoopCallback object is destroyed while it is scheduled to be run in
150 * the next loop iteration, it will automatically be cancelled.
151 */
152 class LoopCallback
153 : public boost::intrusive::list_base_hook<
154 boost::intrusive::link_mode<boost::intrusive::auto_unlink>> {
155 public:
156 virtual ~LoopCallback() = default;
157
158 virtual void runLoopCallback() noexcept = 0;
cancelLoopCallback()159 void cancelLoopCallback() {
160 context_.reset();
161 unlink();
162 }
163
isLoopCallbackScheduled()164 bool isLoopCallbackScheduled() const { return is_linked(); }
165
166 private:
167 typedef boost::intrusive::
168 list<LoopCallback, boost::intrusive::constant_time_size<false>>
169 List;
170
171 // EventBase needs access to LoopCallbackList (and therefore to hook_)
172 friend class EventBase;
173 friend class VirtualEventBase;
174 std::shared_ptr<RequestContext> context_;
175 };
176
177 class FunctionLoopCallback : public LoopCallback {
178 public:
FunctionLoopCallback(Func && function)179 explicit FunctionLoopCallback(Func&& function)
180 : function_(std::move(function)) {}
181
runLoopCallback()182 void runLoopCallback() noexcept override {
183 function_();
184 delete this;
185 }
186
187 private:
188 Func function_;
189 };
190
191 // Like FunctionLoopCallback, but saves one allocation. Use with caution.
192 //
193 // The caller is responsible for maintaining the lifetime of this callback
194 // until after the point at which the contained function is called.
195 class StackFunctionLoopCallback : public LoopCallback {
196 public:
StackFunctionLoopCallback(Func && function)197 explicit StackFunctionLoopCallback(Func&& function)
198 : function_(std::move(function)) {}
runLoopCallback()199 void runLoopCallback() noexcept override { Func(std::move(function_))(); }
200
201 private:
202 Func function_;
203 };
204
205 // Base class for user callbacks to be run during EventBase destruction. As
206 // with LoopCallback, users may inherit from this class and provide a concrete
207 // implementation of onEventBaseDestruction(). (Alternatively, users may use
208 // the convenience method EventBase::runOnDestruction(Function<void()> f) to
209 // schedule a function f to be run on EventBase destruction.)
210 //
211 // The only thread-safety guarantees of OnDestructionCallback are as follows:
212 // - Users may call runOnDestruction() from any thread, provided the caller
213 // is the only user of the callback, i.e., the callback is not already
214 // scheduled and there are no concurrent calls to schedule or cancel the
215 // callback.
216 // - Users may safely cancel() from any thread. Multiple calls to cancel()
217 // may execute concurrently. The only caveat is that it is not safe to
218 // call cancel() within the onEventBaseDestruction() callback.
class OnDestructionCallback {
 public:
  OnDestructionCallback() = default;
  OnDestructionCallback(OnDestructionCallback&&) = default;
  OnDestructionCallback& operator=(OnDestructionCallback&&) = default;
  // Defined out of line.
  virtual ~OnDestructionCallback();

  // Attempt to cancel the callback. If the callback is running or has already
  // finished running, cancellation will fail. If the callback is running when
  // cancel() is called, cancel() will block until the callback completes.
  // NOTE(review): presumably returns true when cancellation succeeded --
  // confirm against the out-of-line definition.
  bool cancel();

  // Callback to be invoked during ~EventBase()
  virtual void onEventBaseDestruction() noexcept = 0;

 private:
  // Hook through which this object is linked into a List (see the
  // member_hook option of List below).
  boost::intrusive::list_member_hook<
      boost::intrusive::link_mode<boost::intrusive::normal_link>>
      listHook_;
  // NOTE(review): presumably set by schedule() and used to unlink this
  // callback from the list it was added to -- confirm in the implementation.
  Function<void(OnDestructionCallback&)> eraser_;
  // Lock-protected flag, initially false.
  // NOTE(review): presumably true while the callback is scheduled.
  Synchronized<bool> scheduled_{in_place, false};

  // Intrusive list of callbacks, linked through listHook_.
  using List = boost::intrusive::list<
      OnDestructionCallback,
      boost::intrusive::member_hook<
          OnDestructionCallback,
          decltype(listHook_),
          &OnDestructionCallback::listHook_>>;

  // Defined out of line.
  // NOTE(review): presumably uses linker to insert this callback into a
  // list and keeps eraser for later removal -- confirm.
  void schedule(
      FunctionRef<void(OnDestructionCallback&)> linker,
      Function<void(OnDestructionCallback&)> eraser);

  // EventBase and VirtualEventBase need access to List and schedule().
  friend class EventBase;
  friend class VirtualEventBase;

 protected:
  // Overridable hook around the callback invocation; defined out of line.
  virtual void runCallback() noexcept;
};
258
259 class FunctionOnDestructionCallback : public OnDestructionCallback {
260 public:
FunctionOnDestructionCallback(Function<void ()> f)261 explicit FunctionOnDestructionCallback(Function<void()> f)
262 : f_(std::move(f)) {}
263
onEventBaseDestruction()264 void onEventBaseDestruction() noexcept final { f_(); }
265
266 protected:
runCallback()267 void runCallback() noexcept override {
268 OnDestructionCallback::runCallback();
269 delete this;
270 }
271
272 private:
273 Function<void()> f_;
274 };
275
276 struct Options {
OptionsOptions277 Options() {}
278
279 /**
280 * Skip measuring event base loop durations.
281 *
282 * Disabling it would likely improve performance, but will disable some
283 * features that rely on time-measurement, including: observer, max latency
284 * and avg loop time.
285 */
286 bool skipTimeMeasurement{false};
287
setSkipTimeMeasurementOptions288 Options& setSkipTimeMeasurement(bool skip) {
289 skipTimeMeasurement = skip;
290 return *this;
291 }
292
293 /**
294 * Factory function for creating the backend.
295 */
296 using BackendFactory =
297 folly::Function<std::unique_ptr<folly::EventBaseBackendBase>()>;
298 BackendFactory::SharedProxy backendFactory{nullptr};
299
setBackendFactoryOptions300 Options& setBackendFactory(BackendFactory factoryFn) {
301 backendFactory = std::move(factoryFn).asSharedProxy();
302 return *this;
303 }
304
305 /**
306 * Granularity of the wheel timer in the EventBase.
307 */
308 std::chrono::milliseconds timerTickInterval{
309 HHWheelTimer::DEFAULT_TICK_INTERVAL};
310
setTimerTickIntervalOptions311 Options& setTimerTickInterval(std::chrono::milliseconds interval) {
312 timerTickInterval = interval;
313 return *this;
314 }
315 };
316
317 /**
318 * Create a new EventBase object.
319 *
320 * Same as EventBase(true), which constructs an EventBase that measures time,
321 * except that this also allows the timer granularity to be specified
322 */
323
324 explicit EventBase(std::chrono::milliseconds tickInterval);
325
326 /**
327 * Create a new EventBase object.
328 *
329 * Same as EventBase(true), which constructs an EventBase that measures time.
330 */
EventBase()331 EventBase() : EventBase(true) {}
332
333 /**
334 * Create a new EventBase object.
335 *
336 * @param enableTimeMeasurement Informs whether this event base should measure
337 * time. Disabling it would likely improve
338 * performance, but will disable some features
 * that rely on time-measurement, including:
340 * observer, max latency and avg loop time.
341 */
342 explicit EventBase(bool enableTimeMeasurement);
343
344 EventBase(const EventBase&) = delete;
345 EventBase& operator=(const EventBase&) = delete;
346
347 /**
348 * Create a new EventBase object that will use the specified libevent
349 * event_base object to drive the event loop.
350 *
351 * The EventBase will take ownership of this event_base, and will call
352 * event_base_free(evb) when the EventBase is destroyed.
353 *
354 * @param enableTimeMeasurement Informs whether this event base should measure
355 * time. Disabling it would likely improve
356 * performance, but will disable some features
 * that rely on time-measurement, including:
358 * observer, max latency and avg loop time.
359 */
360 explicit EventBase(event_base* evb, bool enableTimeMeasurement = true);
361
362 explicit EventBase(Options options);
363 ~EventBase() override;
364
365 /**
366 * Runs the event loop.
367 *
368 * loop() will loop waiting for I/O or timeouts and invoking EventHandler
369 * and AsyncTimeout callbacks as their events become ready. loop() will
370 * only return when there are no more events remaining to process, or after
371 * terminateLoopSoon() has been called.
372 *
373 * loop() may be called again to restart event processing after a previous
374 * call to loop() or loopForever() has returned.
375 *
376 * Returns true if the loop completed normally (if it processed all
377 * outstanding requests, or if terminateLoopSoon() was called). If an error
378 * occurs waiting for events, false will be returned.
379 */
380 bool loop();
381
382 /**
383 * Same as loop(), but doesn't wait for all keep-alive tokens to be released.
384 */
385 [[deprecated("This should only be used in legacy unit tests")]] bool
386 loopIgnoreKeepAlive();
387
388 /**
389 * Wait for some events to become active, run them, then return.
390 *
391 * When EVLOOP_NONBLOCK is set in flags, the loop won't block if there
392 * are not any events to process.
393 *
394 * This is useful for callers that want to run the loop manually.
395 *
396 * Returns the same result as loop().
397 */
398 bool loopOnce(int flags = 0);
399
400 /**
401 * Runs the event loop.
402 *
 * loopForever() behaves like loop(), except that it keeps running even
 * when there are no more user-supplied EventHandlers or AsyncTimeouts
405 * registered. It will only return after terminateLoopSoon() has been
406 * called.
407 *
408 * This is useful for callers that want to wait for other threads to call
409 * runInEventBaseThread(), even when there are no other scheduled events.
410 *
411 * loopForever() may be called again to restart event processing after a
412 * previous call to loop() or loopForever() has returned.
413 *
414 * Throws a std::system_error if an error occurs.
415 */
416 void loopForever();
417
418 /**
419 * Causes the event loop to exit soon.
420 *
421 * This will cause an existing call to loop() or loopForever() to stop event
422 * processing and return, even if there are still events remaining to be
423 * processed.
424 *
425 * It is safe to call terminateLoopSoon() from another thread to cause loop()
426 * to wake up and return in the EventBase loop thread. terminateLoopSoon()
 * may also be called from the loop thread itself (for example, an
 * EventHandler or AsyncTimeout callback may call terminateLoopSoon() to
429 * cause the loop to exit after the callback returns.) If the loop is not
430 * running, this will cause the next call to loop to terminate soon after
431 * starting. If a loop runs out of work (and so terminates on its own)
432 * concurrently with a call to terminateLoopSoon(), this may cause a race
433 * condition.
434 *
435 * Note that the caller is responsible for ensuring that cleanup of all event
436 * callbacks occurs properly. Since terminateLoopSoon() causes the loop to
437 * exit even when there are pending events present, there may be remaining
438 * callbacks present waiting to be invoked. If the loop is later restarted
439 * pending events will continue to be processed normally, however if the
440 * EventBase is destroyed after calling terminateLoopSoon() it is the
441 * caller's responsibility to ensure that cleanup happens properly even if
442 * some outstanding events are never processed.
443 */
444 void terminateLoopSoon();
445
446 /**
447 * Adds the given callback to a queue of things run after the current pass
448 * through the event loop completes. Note that if this callback calls
449 * runInLoop() the new callback won't be called until the main event loop
450 * has gone through a cycle.
451 *
452 * This method may only be called from the EventBase's thread. This
453 * essentially allows an event handler to schedule an additional callback to
454 * be invoked after it returns.
455 *
456 * Use runInEventBaseThread() to schedule functions from another thread.
457 *
458 * The thisIteration parameter makes this callback run in this loop
459 * iteration, instead of the next one, even if called from a
460 * runInLoop callback (normal io callbacks that call runInLoop will
461 * always run in this iteration). This was originally added to
462 * support detachEventBase, as a user callback may have called
463 * terminateLoopSoon(), but we want to make sure we detach. Also,
464 * detachEventBase almost always must be called from the base event
465 * loop to ensure the stack is unwound, since most users of
466 * EventBase are not thread safe.
467 *
468 * Ideally we would not need thisIteration, and instead just use
469 * runInLoop with loop() (instead of terminateLoopSoon).
470 */
471 void runInLoop(
472 LoopCallback* callback,
473 bool thisIteration = false,
474 std::shared_ptr<RequestContext> rctx = RequestContext::saveContext());
475
476 /**
477 * Convenience function to call runInLoop() with a folly::Function.
478 *
479 * This creates a LoopCallback object to wrap the folly::Function, and invoke
480 * the folly::Function when the loop callback fires. This is slightly more
481 * expensive than defining your own LoopCallback, but more convenient in
482 * areas that aren't too performance sensitive.
483 *
484 * This method may only be called from the EventBase's thread. This
485 * essentially allows an event handler to schedule an additional callback to
486 * be invoked after it returns.
487 *
488 * Use runInEventBaseThread() to schedule functions from another thread.
489 */
490 void runInLoop(Func c, bool thisIteration = false);
491
492 /**
493 * Adds the given callback to a queue of things run before destruction
494 * of current EventBase.
495 *
496 * This allows users of EventBase that run in it, but don't control it, to be
497 * notified before EventBase gets destructed.
498 *
499 * Note: will be called from the thread that invoked EventBase destructor,
500 * before the final run of loop callbacks.
501 */
502 void runOnDestruction(OnDestructionCallback& callback);
503
504 /**
505 * Convenience function that allows users to pass in a Function<void()> to be
506 * run on EventBase destruction.
507 */
508 void runOnDestruction(Func f);
509
510 /**
511 * Adds a callback that will run immediately *before* the event loop.
512 * This is very similar to runInLoop(), but will not cause the loop to break:
513 * For example, this callback could be used to get loop times.
514 */
515 void runBeforeLoop(LoopCallback* callback);
516
517 /**
518 * Run the specified function in the EventBase's thread.
519 *
520 * This method is thread-safe, and may be called from another thread.
521 *
522 * If runInEventBaseThread() is called when the EventBase loop is not
523 * running, the function call will be delayed until the next time the loop is
524 * started.
525 *
526 * If the loop is terminated (and never later restarted) before it has a
527 * chance to run the requested function, the function will be run upon the
528 * EventBase's destruction.
529 *
530 * If two calls to runInEventBaseThread() are made from the same thread, the
531 * functions will always be run in the order that they were scheduled.
532 * Ordering between functions scheduled from separate threads is not
533 * guaranteed.
534 *
535 * @param fn The function to run. The function must not throw any
536 * exceptions.
537 * @param arg An argument to pass to the function.
538 */
539 template <typename T>
540 void runInEventBaseThread(void (*fn)(T*), T* arg) noexcept;
541
542 /**
543 * Run the specified function in the EventBase's thread
544 *
545 * This version of runInEventBaseThread() takes a folly::Function object.
546 * Note that this may be less efficient than the version that takes a plain
547 * function pointer and void* argument, if moving the function is expensive
548 * (e.g., if it wraps a lambda which captures some values with expensive move
549 * constructors).
550 *
551 * If the loop is terminated (and never later restarted) before it has a
552 * chance to run the requested function, the function will be run upon the
553 * EventBase's destruction.
554 *
555 * The function must not throw any exceptions.
556 */
557 void runInEventBaseThread(Func fn) noexcept;
558
559 /**
560 * Run the specified function in the EventBase's thread.
561 *
562 * This method is thread-safe, and may be called from another thread.
563 *
564 * If runInEventBaseThreadAlwaysEnqueue() is called when the EventBase loop is
565 * not running, the function call will be delayed until the next time the loop
566 * is started.
567 *
568 * If the loop is terminated (and never later restarted) before it has a
569 * chance to run the requested function, the function will be run upon the
570 * EventBase's destruction.
571 *
572 * If two calls to runInEventBaseThreadAlwaysEnqueue() are made from the same
573 * thread, the functions will always be run in the order that they were
574 * scheduled. Ordering between functions scheduled from separate threads is
575 * not guaranteed. If a call is made from the EventBase thread, the function
576 * will not be executed inline and will be queued to the same queue as if the
577 * call would have been made from a different thread
578 *
579 * @param fn The function to run. The function must not throw any
580 * exceptions.
581 * @param arg An argument to pass to the function.
582 */
583 template <typename T>
584 void runInEventBaseThreadAlwaysEnqueue(void (*fn)(T*), T* arg) noexcept;
585
586 /**
587 * Run the specified function in the EventBase's thread
588 *
589 * This version of runInEventBaseThreadAlwaysEnqueue() takes a folly::Function
590 * object. Note that this may be less efficient than the version that takes a
591 * plain function pointer and void* argument, if moving the function is
592 * expensive (e.g., if it wraps a lambda which captures some values with
593 * expensive move constructors).
594 *
595 * If the loop is terminated (and never later restarted) before it has a
596 * chance to run the requested function, the function will be run upon the
597 * EventBase's destruction.
598 *
599 * The function must not throw any exceptions.
600 */
601 void runInEventBaseThreadAlwaysEnqueue(Func fn) noexcept;
602
603 /*
604 * Like runInEventBaseThread, but the caller waits for the callback to be
605 * executed.
606 */
607 template <typename T>
608 void runInEventBaseThreadAndWait(void (*fn)(T*), T* arg) noexcept;
609
610 /*
611 * Like runInEventBaseThread, but the caller waits for the callback to be
612 * executed.
613 */
614 void runInEventBaseThreadAndWait(Func fn) noexcept;
615
616 /*
617 * Like runInEventBaseThreadAndWait, except if the caller is already in the
618 * event base thread, the functor is simply run inline.
619 */
620 template <typename T>
621 void runImmediatelyOrRunInEventBaseThreadAndWait(
622 void (*fn)(T*), T* arg) noexcept;
623
624 /*
625 * Like runInEventBaseThreadAndWait, except if the caller is already in the
626 * event base thread, the functor is simply run inline.
627 */
628 void runImmediatelyOrRunInEventBaseThreadAndWait(Func fn) noexcept;
629
630 /**
631 * Set the maximum desired latency in us and provide a callback which will be
632 * called when that latency is exceeded.
633 * OBS: This functionality depends on time-measurement.
634 */
635 void setMaxLatency(
636 std::chrono::microseconds maxLatency,
637 Func maxLatencyCob,
638 bool dampen = true) {
639 assert(enableTimeMeasurement_);
640 maxLatency_ = maxLatency;
641 maxLatencyCob_ = std::move(maxLatencyCob);
642 dampenMaxLatency_ = dampen;
643 }
644
645 /**
646 * Set smoothing coefficient for loop load average; # of milliseconds
647 * for exp(-1) (1/2.71828...) decay.
648 */
649 void setLoadAvgMsec(std::chrono::milliseconds ms);
650
651 /**
652 * reset the load average to a desired value
653 */
654 void resetLoadAvg(double value = 0.0);
655
656 /**
 * Get the average loop time in microseconds (an exponentially-smoothed
 * average)
658 */
getAvgLoopTime()659 double getAvgLoopTime() const {
660 assert(enableTimeMeasurement_);
661 return avgLoopTime_.get();
662 }
663
664 /**
665 * Check if the event base loop is running.
666 *
667 * This may only be used as a sanity check mechanism; it cannot be used to
668 * make any decisions; for that, consider waitUntilRunning().
669 */
isRunning()670 bool isRunning() const {
671 return loopThread_.load(std::memory_order_relaxed) != std::thread::id();
672 }
673
674 /**
675 * wait until the event loop starts (after starting the event loop thread).
676 */
677 void waitUntilRunning();
678
679 size_t getNotificationQueueSize() const;
680
681 void setMaxReadAtOnce(uint32_t maxAtOnce);
682
683 /**
684 * Verify that current thread is the EventBase thread, if the EventBase is
685 * running.
686 */
isInEventBaseThread()687 bool isInEventBaseThread() const {
688 auto tid = loopThread_.load(std::memory_order_relaxed);
689 return tid == std::thread::id() || tid == std::this_thread::get_id();
690 }
691
inRunningEventBaseThread()692 bool inRunningEventBaseThread() const {
693 return loopThread_.load(std::memory_order_relaxed) ==
694 std::this_thread::get_id();
695 }
696
697 /**
698 * Equivalent to CHECK(isInEventBaseThread()) (and assert/DCHECK for
699 * dcheckIsInEventBaseThread), but it prints more information on
700 * failure.
701 */
702 void checkIsInEventBaseThread() const;
dcheckIsInEventBaseThread()703 void dcheckIsInEventBaseThread() const {
704 if (kIsDebug) {
705 checkIsInEventBaseThread();
706 }
707 }
708
timer()709 HHWheelTimer& timer() {
710 if (!wheelTimer_) {
711 wheelTimer_ = HHWheelTimer::newTimer(this, intervalDuration_);
712 }
713 return *wheelTimer_.get();
714 }
715
getBackend()716 EventBaseBackendBase* getBackend() { return evb_.get(); }
717 // --------- interface to underlying libevent base ------------
718 // Avoid using these functions if possible. These functions are not
719 // guaranteed to always be present if we ever provide alternative EventBase
720 // implementations that do not use libevent internally.
721 event_base* getLibeventBase() const;
722
723 static const char* getLibeventVersion();
724 const char* getLibeventMethod();
725
726 /**
727 * only EventHandler/AsyncTimeout subclasses and ourselves should
728 * ever call this.
729 *
730 * This is used to mark the beginning of a new loop cycle by the
731 * first handler fired within that cycle.
732 *
733 */
734 void bumpHandlingTime() final;
735
736 class SmoothLoopTime {
737 public:
SmoothLoopTime(std::chrono::microseconds timeInterval)738 explicit SmoothLoopTime(std::chrono::microseconds timeInterval)
739 : expCoeff_(-1.0 / static_cast<double>(timeInterval.count())),
740 value_(0.0) {
741 VLOG(11) << "expCoeff_ " << expCoeff_ << " " << __PRETTY_FUNCTION__;
742 }
743
744 void setTimeInterval(std::chrono::microseconds timeInterval);
745 void reset(double value = 0.0);
746
747 void addSample(
748 std::chrono::microseconds total, std::chrono::microseconds busy);
749
get()750 double get() const {
751 // Add the outstanding buffered times linearly, to avoid
752 // expensive exponentiation
753 auto lcoeff = static_cast<double>(buffer_time_.count()) * -expCoeff_;
754 return value_ * (1.0 - lcoeff) +
755 lcoeff * static_cast<double>(busy_buffer_.count());
756 }
757
dampen(double factor)758 void dampen(double factor) { value_ *= factor; }
759
760 private:
761 double expCoeff_;
762 double value_;
763 std::chrono::microseconds buffer_time_{0};
764 std::chrono::microseconds busy_buffer_{0};
765 std::size_t buffer_cnt_{0};
766 static constexpr std::chrono::milliseconds buffer_interval_{10};
767 };
768
setObserver(const std::shared_ptr<EventBaseObserver> & observer)769 void setObserver(const std::shared_ptr<EventBaseObserver>& observer) {
770 assert(enableTimeMeasurement_);
771 observer_ = observer;
772 }
773
getObserver()774 const std::shared_ptr<EventBaseObserver>& getObserver() { return observer_; }
775
776 /**
777 * Setup execution observation/instrumentation for every EventHandler
778 * executed in this EventBase.
779 *
780 * @param observer EventHandle's execution observer.
781 */
setExecutionObserver(ExecutionObserver * observer)782 void setExecutionObserver(ExecutionObserver* observer) {
783 executionObserver_ = observer;
784 }
785
786 /**
787 * Gets the execution observer associated with this EventBase.
788 */
getExecutionObserver()789 ExecutionObserver* getExecutionObserver() { return executionObserver_; }
790
791 /**
792 * Set the name of the thread that runs this event base.
793 */
794 void setName(const std::string& name);
795
796 /**
797 * Returns the name of the thread that runs this event base.
798 */
799 const std::string& getName();
800
801 /// Implements the Executor interface
add(Cob fn)802 void add(Cob fn) override { runInEventBaseThread(std::move(fn)); }
803
804 /// Implements the DrivableExecutor interface
drive()805 void drive() override {
806 ++loopKeepAliveCount_;
807 SCOPE_EXIT { --loopKeepAliveCount_; };
808 loopOnce();
809 }
810
811 // Implements the ScheduledExecutor interface
812 void scheduleAt(Func&& fn, TimePoint const& timeout) override;
813
814 // TimeoutManager
815 void attachTimeoutManager(
816 AsyncTimeout* obj, TimeoutManager::InternalEnum internal) final;
817
818 void detachTimeoutManager(AsyncTimeout* obj) final;
819
820 bool scheduleTimeout(
821 AsyncTimeout* obj, TimeoutManager::timeout_type timeout) final;
822
823 void cancelTimeout(AsyncTimeout* obj) final;
824
isInTimeoutManagerThread()825 bool isInTimeoutManagerThread() final { return isInEventBaseThread(); }
826
827 // Returns a VirtualEventBase attached to this EventBase. Can be used to
828 // pass to APIs which expect VirtualEventBase. This VirtualEventBase will be
829 // destroyed together with the EventBase.
830 //
831 // Any number of VirtualEventBases instances may be independently constructed,
832 // which are backed by this EventBase. This method should be only used if you
833 // don't need to manage the life time of the VirtualEventBase used.
834 folly::VirtualEventBase& getVirtualEventBase();
835
836 /// Implements the IOExecutor interface
837 EventBase* getEventBase() override;
838
839 static std::unique_ptr<EventBaseBackendBase> getDefaultBackend();
840
841 protected:
keepAliveAcquire()842 bool keepAliveAcquire() noexcept override {
843 if (inRunningEventBaseThread()) {
844 loopKeepAliveCount_++;
845 } else {
846 loopKeepAliveCountAtomic_.fetch_add(1, std::memory_order_relaxed);
847 }
848 return true;
849 }
850
keepAliveRelease()851 void keepAliveRelease() noexcept override {
852 if (!inRunningEventBaseThread()) {
853 return add([this] { loopKeepAliveCount_--; });
854 }
855 loopKeepAliveCount_--;
856 }
857
858 private:
859 class FuncRunner;
860
861 folly::VirtualEventBase* tryGetVirtualEventBase();
862
863 void applyLoopKeepAlive();
864
865 ssize_t loopKeepAliveCount();
866
867 /*
868 * Helper function that tells us whether we have already handled
869 * some event/timeout/callback in this loop iteration.
870 */
871 bool nothingHandledYet() const noexcept;
872
873 typedef LoopCallback::List LoopCallbackList;
874
875 bool loopBody(int flags = 0, bool ignoreKeepAlive = false);
876
877 void runLoopCallbacks(LoopCallbackList& currentCallbacks);
878
879 // executes any callbacks queued by runInLoop(); returns false if none found
880 bool runLoopCallbacks();
881
882 void initNotificationQueue();
883
884 // Tick granularity to wheelTimer_
885 std::chrono::milliseconds intervalDuration_{
886 HHWheelTimer::DEFAULT_TICK_INTERVAL};
887 // should only be accessed through public getter
888 HHWheelTimer::UniquePtr wheelTimer_;
889
890 LoopCallbackList loopCallbacks_;
891 LoopCallbackList runBeforeLoopCallbacks_;
892 Synchronized<OnDestructionCallback::List> onDestructionCallbacks_;
893
894 // This will be null most of the time, but point to currentCallbacks
895 // if we are in the middle of running loop callbacks, such that
896 // runInLoop(..., true) will always run in the current loop
897 // iteration.
898 LoopCallbackList* runOnceCallbacks_;
899
900 // stop_ is set by terminateLoopSoon() and is used by the main loop
901 // to determine if it should exit
902 std::atomic<bool> stop_;
903
904 // The ID of the thread running the main loop.
905 // std::thread::id{} if loop is not running.
906 std::atomic<std::thread::id> loopThread_;
907
908 // A notification queue for runInEventBaseThread() to use
909 // to send function requests to the EventBase thread.
910 std::unique_ptr<EventBaseAtomicNotificationQueue<Func, FuncRunner>> queue_;
911 ssize_t loopKeepAliveCount_{0};
912 std::atomic<ssize_t> loopKeepAliveCountAtomic_{0};
913 bool loopKeepAliveActive_{false};
914
915 // limit for latency in microseconds (0 disables)
916 std::chrono::microseconds maxLatency_;
917
918 // exponentially-smoothed average loop time for latency-limiting
919 SmoothLoopTime avgLoopTime_;
920
921 // smoothed loop time used to invoke latency callbacks; differs from
922 // avgLoopTime_ in that it's scaled down after triggering a callback
923 // to reduce spamminess
924 SmoothLoopTime maxLatencyLoopTime_;
925
926 // If true, max latency callbacks will use a dampened SmoothLoopTime, else
927 // they'll use the raw loop time.
928 bool dampenMaxLatency_ = true;
929
930 // callback called when latency limit is exceeded
931 Func maxLatencyCob_;
932
// Enables/disables time measurements in loopBody(). If disabled, the
// following functionality, which relies on time measurement, will not
// be supported: avg loop time, observer and max latency.
936 const bool enableTimeMeasurement_;
937
938 // Wrap-around loop counter to detect beginning of each loop
939 std::size_t nextLoopCnt_;
940 std::size_t latestLoopCnt_;
941 std::chrono::steady_clock::time_point startWork_;
942 // Prevent undefined behavior from invoking event_base_loop() reentrantly.
943 // This is needed since many projects use libevent-1.4, which lacks commit
944 // b557b175c00dc462c1fce25f6e7dd67121d2c001 from
945 // https://github.com/libevent/libevent/.
946 bool invokingLoop_{false};
947
948 // Observer to export counters
949 std::shared_ptr<EventBaseObserver> observer_;
950 uint32_t observerSampleCount_;
951
952 // EventHandler's execution observer.
953 ExecutionObserver* executionObserver_;
954
955 // Name of the thread running this EventBase
956 std::string name_;
957
958 // see EventBaseLocal
959 friend class detail::EventBaseLocalBase;
960 template <typename T>
961 friend class EventBaseLocal;
962 std::unordered_map<std::size_t, erased_unique_ptr> localStorage_;
963 folly::Synchronized<std::unordered_set<detail::EventBaseLocalBase*>>
964 localStorageToDtor_;
965 bool tryDeregister(detail::EventBaseLocalBase&);
966
967 folly::once_flag virtualEventBaseInitFlag_;
968 std::unique_ptr<VirtualEventBase> virtualEventBase_;
969
970 // pointer to underlying backend class doing the heavy lifting
971 std::unique_ptr<EventBaseBackendBase> evb_;
972 };
973
974 template <typename T>
runInEventBaseThread(void (* fn)(T *),T * arg)975 void EventBase::runInEventBaseThread(void (*fn)(T*), T* arg) noexcept {
976 return runInEventBaseThread([=] { fn(arg); });
977 }
978
979 template <typename T>
runInEventBaseThreadAlwaysEnqueue(void (* fn)(T *),T * arg)980 void EventBase::runInEventBaseThreadAlwaysEnqueue(
981 void (*fn)(T*), T* arg) noexcept {
982 return runInEventBaseThreadAlwaysEnqueue([=] { fn(arg); });
983 }
984
985 template <typename T>
runInEventBaseThreadAndWait(void (* fn)(T *),T * arg)986 void EventBase::runInEventBaseThreadAndWait(void (*fn)(T*), T* arg) noexcept {
987 return runInEventBaseThreadAndWait([=] { fn(arg); });
988 }
989
990 template <typename T>
runImmediatelyOrRunInEventBaseThreadAndWait(void (* fn)(T *),T * arg)991 void EventBase::runImmediatelyOrRunInEventBaseThreadAndWait(
992 void (*fn)(T*), T* arg) noexcept {
993 return runImmediatelyOrRunInEventBaseThreadAndWait([=] { fn(arg); });
994 }
995
996 } // namespace folly
997