1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <atomic>
20 #include <cerrno>
21 #include <cmath>
22 #include <cstdlib>
23 #include <functional>
24 #include <list>
25 #include <memory>
26 #include <queue>
27 #include <set>
28 #include <stack>
29 #include <unordered_map>
30 #include <unordered_set>
31 #include <utility>
32 
33 #include <boost/intrusive/list.hpp>
34 #include <glog/logging.h>
35 
36 #include <folly/Executor.h>
37 #include <folly/Function.h>
38 #include <folly/Memory.h>
39 #include <folly/Portability.h>
40 #include <folly/ScopeGuard.h>
41 #include <folly/Synchronized.h>
42 #include <folly/executors/DrivableExecutor.h>
43 #include <folly/executors/IOExecutor.h>
44 #include <folly/executors/ScheduledExecutor.h>
45 #include <folly/executors/SequencedExecutor.h>
46 #include <folly/experimental/ExecutionObserver.h>
47 #include <folly/io/async/AsyncTimeout.h>
48 #include <folly/io/async/HHWheelTimer.h>
49 #include <folly/io/async/Request.h>
50 #include <folly/io/async/TimeoutManager.h>
51 #include <folly/portability/Event.h>
52 #include <folly/synchronization/CallOnce.h>
53 
54 namespace folly {
55 class EventBaseBackendBase;
56 
57 using Cob = Func; // defined in folly/Executor.h
58 
59 template <typename Task, typename Consumer>
60 class EventBaseAtomicNotificationQueue;
61 template <typename MessageT>
62 class NotificationQueue;
63 
64 namespace detail {
65 class EventBaseLocalBase;
66 
67 } // namespace detail
68 template <typename T>
69 class EventBaseLocal;
70 
/**
 * Interface for objects that receive loop timing samples from an EventBase.
 *
 * An implementation is installed on an EventBase via EventBase::setObserver().
 */
class EventBaseObserver {
 public:
  virtual ~EventBaseObserver() = default;

  // Sample rate requested by the implementation.
  // NOTE(review): presumably loopSample() is invoked once per getSampleRate()
  // loop iterations -- confirm against the EventBase implementation.
  virtual uint32_t getSampleRate() const = 0;

  // Delivers one sample made of a busy time and an idle time.
  // NOTE(review): the time unit is not visible in this header (likely
  // microseconds) -- confirm against the caller.
  virtual void loopSample(int64_t busyTime, int64_t idleTime) = 0;
};
79 
80 // Helper class that sets and retrieves the EventBase associated with a given
81 // request via RequestContext. See Request.h for that mechanism.
82 class RequestEventBase : public RequestData {
83  public:
get()84   static EventBase* get() {
85     auto data = dynamic_cast<RequestEventBase*>(
86         RequestContext::get()->getContextData(token()));
87     if (!data) {
88       return nullptr;
89     }
90     return data->eb_;
91   }
92 
set(EventBase * eb)93   static void set(EventBase* eb) {
94     RequestContext::get()->setContextData(
95         token(), std::unique_ptr<RequestEventBase>(new RequestEventBase(eb)));
96   }
97 
hasCallback()98   bool hasCallback() override { return false; }
99 
100  private:
token()101   FOLLY_EXPORT static RequestToken const& token() {
102     static RequestToken const token(kContextDataName);
103     return token;
104   }
105 
RequestEventBase(EventBase * eb)106   explicit RequestEventBase(EventBase* eb) : eb_(eb) {}
107   EventBase* eb_;
108   static constexpr const char* kContextDataName{"EventBase"};
109 };
110 
111 class VirtualEventBase;
112 
113 /**
114  * This class is a wrapper for all asynchronous I/O processing functionality
115  *
116  * EventBase provides a main loop that notifies EventHandler callback objects
117  * when I/O is ready on a file descriptor, and notifies AsyncTimeout objects
118  * when a specified timeout has expired.  More complex, higher-level callback
119  * mechanisms can then be built on top of EventHandler and AsyncTimeout.
120  *
 * An EventBase object can only drive an event loop for a single thread.  To
122  * take advantage of multiple CPU cores, most asynchronous I/O servers have one
123  * thread per CPU, and use a separate EventBase for each thread.
124  *
125  * In general, most EventBase methods may only be called from the thread
126  * running the EventBase's loop.  There are a few exceptions to this rule, for
 * methods that are explicitly intended to allow communication with an
128  * EventBase from other threads.  When it is safe to call a method from
129  * another thread it is explicitly listed in the method comments.
130  */
131 class EventBase : public TimeoutManager,
132                   public DrivableExecutor,
133                   public IOExecutor,
134                   public SequencedExecutor,
135                   public ScheduledExecutor {
136  public:
137   friend class ScopedEventBaseThread;
138 
139   using Func = folly::Function<void()>;
140 
141   /**
142    * A callback interface to use with runInLoop()
143    *
144    * Derive from this class if you need to delay some code execution until the
145    * next iteration of the event loop.  This allows you to schedule code to be
146    * invoked from the top-level of the loop, after your immediate callers have
147    * returned.
148    *
149    * If a LoopCallback object is destroyed while it is scheduled to be run in
150    * the next loop iteration, it will automatically be cancelled.
151    */
152   class LoopCallback
153       : public boost::intrusive::list_base_hook<
154             boost::intrusive::link_mode<boost::intrusive::auto_unlink>> {
155    public:
156     virtual ~LoopCallback() = default;
157 
158     virtual void runLoopCallback() noexcept = 0;
cancelLoopCallback()159     void cancelLoopCallback() {
160       context_.reset();
161       unlink();
162     }
163 
isLoopCallbackScheduled()164     bool isLoopCallbackScheduled() const { return is_linked(); }
165 
166    private:
167     typedef boost::intrusive::
168         list<LoopCallback, boost::intrusive::constant_time_size<false>>
169             List;
170 
171     // EventBase needs access to LoopCallbackList (and therefore to hook_)
172     friend class EventBase;
173     friend class VirtualEventBase;
174     std::shared_ptr<RequestContext> context_;
175   };
176 
  // LoopCallback that owns a Func, invokes it when the loop callback fires,
  // and then deletes itself. Because runLoopCallback() ends with
  // `delete this`, instances must be allocated with `new` and must not be
  // used after the callback has run.
  class FunctionLoopCallback : public LoopCallback {
   public:
    // Takes over `function` by moving it into this object.
    explicit FunctionLoopCallback(Func&& function)
        : function_(std::move(function)) {}

    // Runs the stored function, then destroys this object.
    void runLoopCallback() noexcept override {
      function_();
      delete this;
    }

   private:
    Func function_;
  };
190 
191   // Like FunctionLoopCallback, but saves one allocation. Use with caution.
192   //
193   // The caller is responsible for maintaining the lifetime of this callback
194   // until after the point at which the contained function is called.
195   class StackFunctionLoopCallback : public LoopCallback {
196    public:
StackFunctionLoopCallback(Func && function)197     explicit StackFunctionLoopCallback(Func&& function)
198         : function_(std::move(function)) {}
runLoopCallback()199     void runLoopCallback() noexcept override { Func(std::move(function_))(); }
200 
201    private:
202     Func function_;
203   };
204 
  // Base class for user callbacks to be run during EventBase destruction. As
  // with LoopCallback, users may inherit from this class and provide a concrete
  // implementation of onEventBaseDestruction(). (Alternatively, users may use
  // the convenience method EventBase::runOnDestruction(Function<void()> f) to
  // schedule a function f to be run on EventBase destruction.)
  //
  // The only thread-safety guarantees of OnDestructionCallback are as follows:
  //   - Users may call runOnDestruction() from any thread, provided the caller
  //     is the only user of the callback, i.e., the callback is not already
  //     scheduled and there are no concurrent calls to schedule or cancel the
  //     callback.
  //   - Users may safely cancel() from any thread. Multiple calls to cancel()
  //     may execute concurrently. The only caveat is that it is not safe to
  //     call cancel() within the onEventBaseDestruction() callback.
  class OnDestructionCallback {
   public:
    OnDestructionCallback() = default;
    OnDestructionCallback(OnDestructionCallback&&) = default;
    OnDestructionCallback& operator=(OnDestructionCallback&&) = default;
    virtual ~OnDestructionCallback();

    // Attempt to cancel the callback. If the callback is running or has already
    // finished running, cancellation will fail. If the callback is running when
    // cancel() is called, cancel() will block until the callback completes.
    bool cancel();

    // Callback to be invoked during ~EventBase()
    virtual void onEventBaseDestruction() noexcept = 0;

   private:
    // Hook that links this object into the intrusive List declared below.
    boost::intrusive::list_member_hook<
        boost::intrusive::link_mode<boost::intrusive::normal_link>>
        listHook_;
    // NOTE(review): presumably removes this callback from the list it was
    // linked into by schedule() -- confirm against the definition.
    Function<void(OnDestructionCallback&)> eraser_;
    // Synchronized flag, initially false.
    // NOTE(review): presumably true while the callback is scheduled -- confirm.
    Synchronized<bool> scheduled_{in_place, false};

    // Intrusive list of callbacks, linked through listHook_.
    using List = boost::intrusive::list<
        OnDestructionCallback,
        boost::intrusive::member_hook<
            OnDestructionCallback,
            decltype(listHook_),
            &OnDestructionCallback::listHook_>>;

    // Defined out of line.
    // NOTE(review): presumably invokes `linker` to insert this callback and
    // stores `eraser` in eraser_ -- confirm against the definition.
    void schedule(
        FunctionRef<void(OnDestructionCallback&)> linker,
        Function<void(OnDestructionCallback&)> eraser);

    // EventBase and VirtualEventBase need access to List and schedule().
    friend class EventBase;
    friend class VirtualEventBase;

   protected:
    // Virtual so subclasses can wrap the call; FunctionOnDestructionCallback
    // overrides it to delete itself afterwards.
    virtual void runCallback() noexcept;
  };
258 
  // OnDestructionCallback that wraps a Function<void()> and deletes itself
  // after it has been run. Because runCallback() ends with `delete this`,
  // instances must be allocated with `new`.
  class FunctionOnDestructionCallback : public OnDestructionCallback {
   public:
    // Takes the function by value and moves it into this object.
    explicit FunctionOnDestructionCallback(Function<void()> f)
        : f_(std::move(f)) {}

    // Invokes the wrapped function.
    void onEventBaseDestruction() noexcept final { f_(); }

   protected:
    // Runs the base-class logic first, then destroys this object; nothing may
    // touch *this after the base call returns.
    void runCallback() noexcept override {
      OnDestructionCallback::runCallback();
      delete this;
    }

   private:
    Function<void()> f_;
  };
275 
276   struct Options {
OptionsOptions277     Options() {}
278 
279     /**
280      * Skip measuring event base loop durations.
281      *
282      * Disabling it would likely improve performance, but will disable some
283      * features that rely on time-measurement, including: observer, max latency
284      * and avg loop time.
285      */
286     bool skipTimeMeasurement{false};
287 
setSkipTimeMeasurementOptions288     Options& setSkipTimeMeasurement(bool skip) {
289       skipTimeMeasurement = skip;
290       return *this;
291     }
292 
293     /**
294      * Factory function for creating the backend.
295      */
296     using BackendFactory =
297         folly::Function<std::unique_ptr<folly::EventBaseBackendBase>()>;
298     BackendFactory::SharedProxy backendFactory{nullptr};
299 
setBackendFactoryOptions300     Options& setBackendFactory(BackendFactory factoryFn) {
301       backendFactory = std::move(factoryFn).asSharedProxy();
302       return *this;
303     }
304 
305     /**
306      * Granularity of the wheel timer in the EventBase.
307      */
308     std::chrono::milliseconds timerTickInterval{
309         HHWheelTimer::DEFAULT_TICK_INTERVAL};
310 
setTimerTickIntervalOptions311     Options& setTimerTickInterval(std::chrono::milliseconds interval) {
312       timerTickInterval = interval;
313       return *this;
314     }
315   };
316 
317   /**
318    * Create a new EventBase object.
319    *
320    * Same as EventBase(true), which constructs an EventBase that measures time,
321    * except that this also allows the timer granularity to be specified
322    */
323 
324   explicit EventBase(std::chrono::milliseconds tickInterval);
325 
326   /**
327    * Create a new EventBase object.
328    *
329    * Same as EventBase(true), which constructs an EventBase that measures time.
330    */
EventBase()331   EventBase() : EventBase(true) {}
332 
333   /**
334    * Create a new EventBase object.
335    *
336    * @param enableTimeMeasurement Informs whether this event base should measure
337    *                              time. Disabling it would likely improve
338    *                              performance, but will disable some features
   *                              that rely on time-measurement, including:
340    *                              observer, max latency and avg loop time.
341    */
342   explicit EventBase(bool enableTimeMeasurement);
343 
344   EventBase(const EventBase&) = delete;
345   EventBase& operator=(const EventBase&) = delete;
346 
347   /**
348    * Create a new EventBase object that will use the specified libevent
349    * event_base object to drive the event loop.
350    *
351    * The EventBase will take ownership of this event_base, and will call
352    * event_base_free(evb) when the EventBase is destroyed.
353    *
354    * @param enableTimeMeasurement Informs whether this event base should measure
355    *                              time. Disabling it would likely improve
356    *                              performance, but will disable some features
   *                              that rely on time-measurement, including:
358    *                              observer, max latency and avg loop time.
359    */
360   explicit EventBase(event_base* evb, bool enableTimeMeasurement = true);
361 
362   explicit EventBase(Options options);
363   ~EventBase() override;
364 
365   /**
366    * Runs the event loop.
367    *
368    * loop() will loop waiting for I/O or timeouts and invoking EventHandler
369    * and AsyncTimeout callbacks as their events become ready.  loop() will
370    * only return when there are no more events remaining to process, or after
371    * terminateLoopSoon() has been called.
372    *
373    * loop() may be called again to restart event processing after a previous
374    * call to loop() or loopForever() has returned.
375    *
376    * Returns true if the loop completed normally (if it processed all
377    * outstanding requests, or if terminateLoopSoon() was called).  If an error
378    * occurs waiting for events, false will be returned.
379    */
380   bool loop();
381 
382   /**
383    * Same as loop(), but doesn't wait for all keep-alive tokens to be released.
384    */
385   [[deprecated("This should only be used in legacy unit tests")]] bool
386   loopIgnoreKeepAlive();
387 
388   /**
389    * Wait for some events to become active, run them, then return.
390    *
391    * When EVLOOP_NONBLOCK is set in flags, the loop won't block if there
392    * are not any events to process.
393    *
394    * This is useful for callers that want to run the loop manually.
395    *
396    * Returns the same result as loop().
397    */
398   bool loopOnce(int flags = 0);
399 
400   /**
401    * Runs the event loop.
402    *
   * loopForever() behaves like loop(), except that it keeps running even
   * when there are no more user-supplied EventHandlers or AsyncTimeouts
405    * registered.  It will only return after terminateLoopSoon() has been
406    * called.
407    *
408    * This is useful for callers that want to wait for other threads to call
409    * runInEventBaseThread(), even when there are no other scheduled events.
410    *
411    * loopForever() may be called again to restart event processing after a
412    * previous call to loop() or loopForever() has returned.
413    *
414    * Throws a std::system_error if an error occurs.
415    */
416   void loopForever();
417 
418   /**
419    * Causes the event loop to exit soon.
420    *
421    * This will cause an existing call to loop() or loopForever() to stop event
422    * processing and return, even if there are still events remaining to be
423    * processed.
424    *
425    * It is safe to call terminateLoopSoon() from another thread to cause loop()
426    * to wake up and return in the EventBase loop thread.  terminateLoopSoon()
   * may also be called from the loop thread itself (for example, an
428    * EventHandler or AsyncTimeout callback may call terminateLoopSoon() to
429    * cause the loop to exit after the callback returns.)  If the loop is not
430    * running, this will cause the next call to loop to terminate soon after
431    * starting.  If a loop runs out of work (and so terminates on its own)
432    * concurrently with a call to terminateLoopSoon(), this may cause a race
433    * condition.
434    *
435    * Note that the caller is responsible for ensuring that cleanup of all event
436    * callbacks occurs properly.  Since terminateLoopSoon() causes the loop to
437    * exit even when there are pending events present, there may be remaining
438    * callbacks present waiting to be invoked.  If the loop is later restarted
439    * pending events will continue to be processed normally, however if the
440    * EventBase is destroyed after calling terminateLoopSoon() it is the
441    * caller's responsibility to ensure that cleanup happens properly even if
442    * some outstanding events are never processed.
443    */
444   void terminateLoopSoon();
445 
446   /**
447    * Adds the given callback to a queue of things run after the current pass
448    * through the event loop completes.  Note that if this callback calls
449    * runInLoop() the new callback won't be called until the main event loop
450    * has gone through a cycle.
451    *
452    * This method may only be called from the EventBase's thread.  This
453    * essentially allows an event handler to schedule an additional callback to
454    * be invoked after it returns.
455    *
456    * Use runInEventBaseThread() to schedule functions from another thread.
457    *
458    * The thisIteration parameter makes this callback run in this loop
459    * iteration, instead of the next one, even if called from a
460    * runInLoop callback (normal io callbacks that call runInLoop will
461    * always run in this iteration).  This was originally added to
462    * support detachEventBase, as a user callback may have called
463    * terminateLoopSoon(), but we want to make sure we detach.  Also,
464    * detachEventBase almost always must be called from the base event
465    * loop to ensure the stack is unwound, since most users of
466    * EventBase are not thread safe.
467    *
468    * Ideally we would not need thisIteration, and instead just use
469    * runInLoop with loop() (instead of terminateLoopSoon).
470    */
471   void runInLoop(
472       LoopCallback* callback,
473       bool thisIteration = false,
474       std::shared_ptr<RequestContext> rctx = RequestContext::saveContext());
475 
476   /**
477    * Convenience function to call runInLoop() with a folly::Function.
478    *
479    * This creates a LoopCallback object to wrap the folly::Function, and invoke
480    * the folly::Function when the loop callback fires.  This is slightly more
481    * expensive than defining your own LoopCallback, but more convenient in
482    * areas that aren't too performance sensitive.
483    *
484    * This method may only be called from the EventBase's thread.  This
485    * essentially allows an event handler to schedule an additional callback to
486    * be invoked after it returns.
487    *
488    * Use runInEventBaseThread() to schedule functions from another thread.
489    */
490   void runInLoop(Func c, bool thisIteration = false);
491 
492   /**
493    * Adds the given callback to a queue of things run before destruction
494    * of current EventBase.
495    *
496    * This allows users of EventBase that run in it, but don't control it, to be
497    * notified before EventBase gets destructed.
498    *
499    * Note: will be called from the thread that invoked EventBase destructor,
500    *       before the final run of loop callbacks.
501    */
502   void runOnDestruction(OnDestructionCallback& callback);
503 
504   /**
505    * Convenience function that allows users to pass in a Function<void()> to be
506    * run on EventBase destruction.
507    */
508   void runOnDestruction(Func f);
509 
510   /**
511    * Adds a callback that will run immediately *before* the event loop.
512    * This is very similar to runInLoop(), but will not cause the loop to break:
513    * For example, this callback could be used to get loop times.
514    */
515   void runBeforeLoop(LoopCallback* callback);
516 
517   /**
518    * Run the specified function in the EventBase's thread.
519    *
520    * This method is thread-safe, and may be called from another thread.
521    *
522    * If runInEventBaseThread() is called when the EventBase loop is not
523    * running, the function call will be delayed until the next time the loop is
524    * started.
525    *
526    * If the loop is terminated (and never later restarted) before it has a
527    * chance to run the requested function, the function will be run upon the
528    * EventBase's destruction.
529    *
530    * If two calls to runInEventBaseThread() are made from the same thread, the
531    * functions will always be run in the order that they were scheduled.
532    * Ordering between functions scheduled from separate threads is not
533    * guaranteed.
534    *
535    * @param fn  The function to run.  The function must not throw any
536    *     exceptions.
537    * @param arg An argument to pass to the function.
538    */
539   template <typename T>
540   void runInEventBaseThread(void (*fn)(T*), T* arg) noexcept;
541 
542   /**
543    * Run the specified function in the EventBase's thread
544    *
545    * This version of runInEventBaseThread() takes a folly::Function object.
546    * Note that this may be less efficient than the version that takes a plain
   * function pointer and T* argument, if moving the function is expensive
548    * (e.g., if it wraps a lambda which captures some values with expensive move
549    * constructors).
550    *
551    * If the loop is terminated (and never later restarted) before it has a
552    * chance to run the requested function, the function will be run upon the
553    * EventBase's destruction.
554    *
555    * The function must not throw any exceptions.
556    */
557   void runInEventBaseThread(Func fn) noexcept;
558 
559   /**
560    * Run the specified function in the EventBase's thread.
561    *
562    * This method is thread-safe, and may be called from another thread.
563    *
564    * If runInEventBaseThreadAlwaysEnqueue() is called when the EventBase loop is
565    * not running, the function call will be delayed until the next time the loop
566    * is started.
567    *
568    * If the loop is terminated (and never later restarted) before it has a
569    * chance to run the requested function, the function will be run upon the
570    * EventBase's destruction.
571    *
572    * If two calls to runInEventBaseThreadAlwaysEnqueue() are made from the same
573    * thread, the functions will always be run in the order that they were
574    * scheduled. Ordering between functions scheduled from separate threads is
   * not guaranteed. If a call is made from the EventBase thread, the function
   * will not be executed inline and will be queued to the same queue as if the
   * call had been made from a different thread.
578    *
579    * @param fn  The function to run.  The function must not throw any
580    *     exceptions.
581    * @param arg An argument to pass to the function.
582    */
583   template <typename T>
584   void runInEventBaseThreadAlwaysEnqueue(void (*fn)(T*), T* arg) noexcept;
585 
586   /**
587    * Run the specified function in the EventBase's thread
588    *
589    * This version of runInEventBaseThreadAlwaysEnqueue() takes a folly::Function
590    * object. Note that this may be less efficient than the version that takes a
   * plain function pointer and T* argument, if moving the function is
592    * expensive (e.g., if it wraps a lambda which captures some values with
593    * expensive move constructors).
594    *
595    * If the loop is terminated (and never later restarted) before it has a
596    * chance to run the requested function, the function will be run upon the
597    * EventBase's destruction.
598    *
599    * The function must not throw any exceptions.
600    */
601   void runInEventBaseThreadAlwaysEnqueue(Func fn) noexcept;
602 
603   /*
604    * Like runInEventBaseThread, but the caller waits for the callback to be
605    * executed.
606    */
607   template <typename T>
608   void runInEventBaseThreadAndWait(void (*fn)(T*), T* arg) noexcept;
609 
610   /*
611    * Like runInEventBaseThread, but the caller waits for the callback to be
612    * executed.
613    */
614   void runInEventBaseThreadAndWait(Func fn) noexcept;
615 
616   /*
617    * Like runInEventBaseThreadAndWait, except if the caller is already in the
618    * event base thread, the functor is simply run inline.
619    */
620   template <typename T>
621   void runImmediatelyOrRunInEventBaseThreadAndWait(
622       void (*fn)(T*), T* arg) noexcept;
623 
624   /*
625    * Like runInEventBaseThreadAndWait, except if the caller is already in the
626    * event base thread, the functor is simply run inline.
627    */
628   void runImmediatelyOrRunInEventBaseThreadAndWait(Func fn) noexcept;
629 
630   /**
631    * Set the maximum desired latency in us and provide a callback which will be
632    * called when that latency is exceeded.
633    * OBS: This functionality depends on time-measurement.
634    */
635   void setMaxLatency(
636       std::chrono::microseconds maxLatency,
637       Func maxLatencyCob,
638       bool dampen = true) {
639     assert(enableTimeMeasurement_);
640     maxLatency_ = maxLatency;
641     maxLatencyCob_ = std::move(maxLatencyCob);
642     dampenMaxLatency_ = dampen;
643   }
644 
645   /**
646    * Set smoothing coefficient for loop load average; # of milliseconds
647    * for exp(-1) (1/2.71828...) decay.
648    */
649   void setLoadAvgMsec(std::chrono::milliseconds ms);
650 
651   /**
652    * reset the load average to a desired value
653    */
654   void resetLoadAvg(double value = 0.0);
655 
656   /**
657    * Get the average loop time in microseconds (an exponentially-smoothed ave)
658    */
getAvgLoopTime()659   double getAvgLoopTime() const {
660     assert(enableTimeMeasurement_);
661     return avgLoopTime_.get();
662   }
663 
664   /**
665    * Check if the event base loop is running.
666    *
667    * This may only be used as a sanity check mechanism; it cannot be used to
668    * make any decisions; for that, consider waitUntilRunning().
669    */
isRunning()670   bool isRunning() const {
671     return loopThread_.load(std::memory_order_relaxed) != std::thread::id();
672   }
673 
674   /**
675    * wait until the event loop starts (after starting the event loop thread).
676    */
677   void waitUntilRunning();
678 
679   size_t getNotificationQueueSize() const;
680 
681   void setMaxReadAtOnce(uint32_t maxAtOnce);
682 
683   /**
684    * Verify that current thread is the EventBase thread, if the EventBase is
685    * running.
686    */
isInEventBaseThread()687   bool isInEventBaseThread() const {
688     auto tid = loopThread_.load(std::memory_order_relaxed);
689     return tid == std::thread::id() || tid == std::this_thread::get_id();
690   }
691 
inRunningEventBaseThread()692   bool inRunningEventBaseThread() const {
693     return loopThread_.load(std::memory_order_relaxed) ==
694         std::this_thread::get_id();
695   }
696 
697   /**
698    * Equivalent to CHECK(isInEventBaseThread()) (and assert/DCHECK for
699    * dcheckIsInEventBaseThread), but it prints more information on
700    * failure.
701    */
702   void checkIsInEventBaseThread() const;
dcheckIsInEventBaseThread()703   void dcheckIsInEventBaseThread() const {
704     if (kIsDebug) {
705       checkIsInEventBaseThread();
706     }
707   }
708 
timer()709   HHWheelTimer& timer() {
710     if (!wheelTimer_) {
711       wheelTimer_ = HHWheelTimer::newTimer(this, intervalDuration_);
712     }
713     return *wheelTimer_.get();
714   }
715 
  // Returns the raw backend pointer obtained from evb_.get().
  EventBaseBackendBase* getBackend() { return evb_.get(); }
717   // --------- interface to underlying libevent base ------------
718   // Avoid using these functions if possible.  These functions are not
719   // guaranteed to always be present if we ever provide alternative EventBase
720   // implementations that do not use libevent internally.
721   event_base* getLibeventBase() const;
722 
723   static const char* getLibeventVersion();
724   const char* getLibeventMethod();
725 
726   /**
727    * only EventHandler/AsyncTimeout subclasses and ourselves should
728    * ever call this.
729    *
730    * This is used to mark the beginning of a new loop cycle by the
731    * first handler fired within that cycle.
732    *
733    */
734   void bumpHandlingTime() final;
735 
736   class SmoothLoopTime {
737    public:
SmoothLoopTime(std::chrono::microseconds timeInterval)738     explicit SmoothLoopTime(std::chrono::microseconds timeInterval)
739         : expCoeff_(-1.0 / static_cast<double>(timeInterval.count())),
740           value_(0.0) {
741       VLOG(11) << "expCoeff_ " << expCoeff_ << " " << __PRETTY_FUNCTION__;
742     }
743 
744     void setTimeInterval(std::chrono::microseconds timeInterval);
745     void reset(double value = 0.0);
746 
747     void addSample(
748         std::chrono::microseconds total, std::chrono::microseconds busy);
749 
get()750     double get() const {
751       // Add the outstanding buffered times linearly, to avoid
752       // expensive exponentiation
753       auto lcoeff = static_cast<double>(buffer_time_.count()) * -expCoeff_;
754       return value_ * (1.0 - lcoeff) +
755           lcoeff * static_cast<double>(busy_buffer_.count());
756     }
757 
dampen(double factor)758     void dampen(double factor) { value_ *= factor; }
759 
760    private:
761     double expCoeff_;
762     double value_;
763     std::chrono::microseconds buffer_time_{0};
764     std::chrono::microseconds busy_buffer_{0};
765     std::size_t buffer_cnt_{0};
766     static constexpr std::chrono::milliseconds buffer_interval_{10};
767   };
768 
  // Installs the observer (a copy of the shared_ptr is kept in observer_).
  // Requires time measurement to be enabled; this is checked with assert().
  void setObserver(const std::shared_ptr<EventBaseObserver>& observer) {
    assert(enableTimeMeasurement_);
    observer_ = observer;
  }
773 
  // Returns a reference to the stored observer pointer.
  const std::shared_ptr<EventBaseObserver>& getObserver() { return observer_; }
775 
  /**
   * Setup execution observation/instrumentation for every EventHandler
   * executed in this EventBase.
   *
   * Only the raw pointer is stored; this call does not take ownership.
   *
   * @param observer EventHandler's execution observer.
   */
  void setExecutionObserver(ExecutionObserver* observer) {
    executionObserver_ = observer;
  }
785 
  /**
   * Gets the execution observer associated with this EventBase, i.e. the
   * pointer last passed to setExecutionObserver().
   */
  ExecutionObserver* getExecutionObserver() { return executionObserver_; }
790 
791   /**
792    * Set the name of the thread that runs this event base.
793    */
794   void setName(const std::string& name);
795 
796   /**
797    * Returns the name of the thread that runs this event base.
798    */
799   const std::string& getName();
800 
  /// Implements the Executor interface by forwarding fn to
  /// runInEventBaseThread().
  void add(Cob fn) override { runInEventBaseThread(std::move(fn)); }
803 
  /// Implements the DrivableExecutor interface
  ///
  /// Runs a single loopOnce() pass while holding loopKeepAliveCount_
  /// incremented by one.
  void drive() override {
    ++loopKeepAliveCount_;
    // Undo the increment on every exit path from this function.
    SCOPE_EXIT { --loopKeepAliveCount_; };
    loopOnce();
  }
810 
811   // Implements the ScheduledExecutor interface
812   void scheduleAt(Func&& fn, TimePoint const& timeout) override;
813 
814   // TimeoutManager
815   void attachTimeoutManager(
816       AsyncTimeout* obj, TimeoutManager::InternalEnum internal) final;
817 
818   void detachTimeoutManager(AsyncTimeout* obj) final;
819 
820   bool scheduleTimeout(
821       AsyncTimeout* obj, TimeoutManager::timeout_type timeout) final;
822 
823   void cancelTimeout(AsyncTimeout* obj) final;
824 
isInTimeoutManagerThread()825   bool isInTimeoutManagerThread() final { return isInEventBaseThread(); }
826 
827   // Returns a VirtualEventBase attached to this EventBase. Can be used to
828   // pass to APIs which expect VirtualEventBase. This VirtualEventBase will be
829   // destroyed together with the EventBase.
830   //
  // Any number of VirtualEventBase instances backed by this EventBase may be
  // constructed independently. Only use this method if you don't need to
  // manage the lifetime of the VirtualEventBase used.
834   folly::VirtualEventBase& getVirtualEventBase();
835 
836   /// Implements the IOExecutor interface
837   EventBase* getEventBase() override;
838 
839   static std::unique_ptr<EventBaseBackendBase> getDefaultBackend();
840 
841  protected:
keepAliveAcquire()842   bool keepAliveAcquire() noexcept override {
843     if (inRunningEventBaseThread()) {
844       loopKeepAliveCount_++;
845     } else {
846       loopKeepAliveCountAtomic_.fetch_add(1, std::memory_order_relaxed);
847     }
848     return true;
849   }
850 
keepAliveRelease()851   void keepAliveRelease() noexcept override {
852     if (!inRunningEventBaseThread()) {
853       return add([this] { loopKeepAliveCount_--; });
854     }
855     loopKeepAliveCount_--;
856   }
857 
858  private:
859   class FuncRunner;
860 
861   folly::VirtualEventBase* tryGetVirtualEventBase();
862 
863   void applyLoopKeepAlive();
864 
865   ssize_t loopKeepAliveCount();
866 
867   /*
868    * Helper function that tells us whether we have already handled
869    * some event/timeout/callback in this loop iteration.
870    */
871   bool nothingHandledYet() const noexcept;
872 
873   typedef LoopCallback::List LoopCallbackList;
874 
875   bool loopBody(int flags = 0, bool ignoreKeepAlive = false);
876 
877   void runLoopCallbacks(LoopCallbackList& currentCallbacks);
878 
879   // executes any callbacks queued by runInLoop(); returns false if none found
880   bool runLoopCallbacks();
881 
882   void initNotificationQueue();
883 
884   // Tick granularity to wheelTimer_
885   std::chrono::milliseconds intervalDuration_{
886       HHWheelTimer::DEFAULT_TICK_INTERVAL};
887   // should only be accessed through public getter
888   HHWheelTimer::UniquePtr wheelTimer_;
889 
890   LoopCallbackList loopCallbacks_;
891   LoopCallbackList runBeforeLoopCallbacks_;
892   Synchronized<OnDestructionCallback::List> onDestructionCallbacks_;
893 
894   // This will be null most of the time, but point to currentCallbacks
895   // if we are in the middle of running loop callbacks, such that
896   // runInLoop(..., true) will always run in the current loop
897   // iteration.
898   LoopCallbackList* runOnceCallbacks_;
899 
900   // stop_ is set by terminateLoopSoon() and is used by the main loop
901   // to determine if it should exit
902   std::atomic<bool> stop_;
903 
904   // The ID of the thread running the main loop.
905   // std::thread::id{} if loop is not running.
906   std::atomic<std::thread::id> loopThread_;
907 
908   // A notification queue for runInEventBaseThread() to use
909   // to send function requests to the EventBase thread.
910   std::unique_ptr<EventBaseAtomicNotificationQueue<Func, FuncRunner>> queue_;
911   ssize_t loopKeepAliveCount_{0};
912   std::atomic<ssize_t> loopKeepAliveCountAtomic_{0};
913   bool loopKeepAliveActive_{false};
914 
915   // limit for latency in microseconds (0 disables)
916   std::chrono::microseconds maxLatency_;
917 
918   // exponentially-smoothed average loop time for latency-limiting
919   SmoothLoopTime avgLoopTime_;
920 
921   // smoothed loop time used to invoke latency callbacks; differs from
922   // avgLoopTime_ in that it's scaled down after triggering a callback
923   // to reduce spamminess
924   SmoothLoopTime maxLatencyLoopTime_;
925 
926   // If true, max latency callbacks will use a dampened SmoothLoopTime, else
927   // they'll use the raw loop time.
928   bool dampenMaxLatency_ = true;
929 
930   // callback called when latency limit is exceeded
931   Func maxLatencyCob_;
932 
  // Enables/disables time measurements in loopBody(). If disabled, the
  // following functionality that relies on time measurement will not
  // be supported: avg loop time, observer and max latency.
936   const bool enableTimeMeasurement_;
937 
938   // Wrap-around loop counter to detect beginning of each loop
939   std::size_t nextLoopCnt_;
940   std::size_t latestLoopCnt_;
941   std::chrono::steady_clock::time_point startWork_;
942   // Prevent undefined behavior from invoking event_base_loop() reentrantly.
943   // This is needed since many projects use libevent-1.4, which lacks commit
944   // b557b175c00dc462c1fce25f6e7dd67121d2c001 from
945   // https://github.com/libevent/libevent/.
946   bool invokingLoop_{false};
947 
948   // Observer to export counters
949   std::shared_ptr<EventBaseObserver> observer_;
950   uint32_t observerSampleCount_;
951 
952   // EventHandler's execution observer.
953   ExecutionObserver* executionObserver_;
954 
955   // Name of the thread running this EventBase
956   std::string name_;
957 
958   // see EventBaseLocal
959   friend class detail::EventBaseLocalBase;
960   template <typename T>
961   friend class EventBaseLocal;
962   std::unordered_map<std::size_t, erased_unique_ptr> localStorage_;
963   folly::Synchronized<std::unordered_set<detail::EventBaseLocalBase*>>
964       localStorageToDtor_;
965   bool tryDeregister(detail::EventBaseLocalBase&);
966 
967   folly::once_flag virtualEventBaseInitFlag_;
968   std::unique_ptr<VirtualEventBase> virtualEventBase_;
969 
970   // pointer to underlying backend class doing the heavy lifting
971   std::unique_ptr<EventBaseBackendBase> evb_;
972 };
973 
974 template <typename T>
runInEventBaseThread(void (* fn)(T *),T * arg)975 void EventBase::runInEventBaseThread(void (*fn)(T*), T* arg) noexcept {
976   return runInEventBaseThread([=] { fn(arg); });
977 }
978 
979 template <typename T>
runInEventBaseThreadAlwaysEnqueue(void (* fn)(T *),T * arg)980 void EventBase::runInEventBaseThreadAlwaysEnqueue(
981     void (*fn)(T*), T* arg) noexcept {
982   return runInEventBaseThreadAlwaysEnqueue([=] { fn(arg); });
983 }
984 
985 template <typename T>
runInEventBaseThreadAndWait(void (* fn)(T *),T * arg)986 void EventBase::runInEventBaseThreadAndWait(void (*fn)(T*), T* arg) noexcept {
987   return runInEventBaseThreadAndWait([=] { fn(arg); });
988 }
989 
990 template <typename T>
runImmediatelyOrRunInEventBaseThreadAndWait(void (* fn)(T *),T * arg)991 void EventBase::runImmediatelyOrRunInEventBaseThreadAndWait(
992     void (*fn)(T*), T* arg) noexcept {
993   return runImmediatelyOrRunInEventBaseThreadAndWait([=] { fn(arg); });
994 }
995 
996 } // namespace folly
997