1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef __STDC_FORMAT_MACROS
18 #define __STDC_FORMAT_MACROS
19 #endif
20 
#include <folly/io/async/EventBase.h>

#include <fcntl.h>

#include <memory>
#include <mutex>
#include <stdexcept>
#include <thread>

#include <folly/ExceptionString.h>
#include <folly/Memory.h>
#include <folly/String.h>
#include <folly/io/async/EventBaseAtomicNotificationQueue.h>
#include <folly/io/async/EventBaseBackendBase.h>
#include <folly/io/async/EventBaseLocal.h>
#include <folly/io/async/VirtualEventBase.h>
#include <folly/portability/Unistd.h>
#include <folly/synchronization/Baton.h>
#include <folly/system/ThreadName.h>
39 
40 namespace {
41 
42 class EventBaseBackend : public folly::EventBaseBackendBase {
43  public:
44   EventBaseBackend();
45   explicit EventBaseBackend(event_base* evb);
46   ~EventBaseBackend() override;
47 
getEventBase()48   event_base* getEventBase() override { return evb_; }
49 
50   int eb_event_base_loop(int flags) override;
51   int eb_event_base_loopbreak() override;
52 
53   int eb_event_add(Event& event, const struct timeval* timeout) override;
54   int eb_event_del(EventBaseBackendBase::Event& event) override;
55 
56   bool eb_event_active(Event& event, int res) override;
57 
58  private:
59   event_base* evb_;
60 };
61 
EventBaseBackend()62 EventBaseBackend::EventBaseBackend() {
63   evb_ = event_base_new();
64 }
65 
EventBaseBackend(event_base * evb)66 EventBaseBackend::EventBaseBackend(event_base* evb) : evb_(evb) {
67   if (UNLIKELY(evb_ == nullptr)) {
68     LOG(ERROR) << "EventBase(): Pass nullptr as event base.";
69     throw std::invalid_argument("EventBase(): event base cannot be nullptr");
70   }
71 }
72 
eb_event_base_loop(int flags)73 int EventBaseBackend::eb_event_base_loop(int flags) {
74   return event_base_loop(evb_, flags);
75 }
76 
eb_event_base_loopbreak()77 int EventBaseBackend::eb_event_base_loopbreak() {
78   return event_base_loopbreak(evb_);
79 }
80 
eb_event_add(Event & event,const struct timeval * timeout)81 int EventBaseBackend::eb_event_add(
82     Event& event, const struct timeval* timeout) {
83   return event_add(event.getEvent(), timeout);
84 }
85 
eb_event_del(EventBaseBackendBase::Event & event)86 int EventBaseBackend::eb_event_del(EventBaseBackendBase::Event& event) {
87   return event_del(event.getEvent());
88 }
89 
eb_event_active(Event & event,int res)90 bool EventBaseBackend::eb_event_active(Event& event, int res) {
91   event_active(event.getEvent(), res, 1);
92   return true;
93 }
94 
~EventBaseBackend()95 EventBaseBackend::~EventBaseBackend() {
96   event_base_free(evb_);
97 }
98 
99 class ExecutionObserverScopeGuard {
100  public:
ExecutionObserverScopeGuard(folly::ExecutionObserver * observer,void * id)101   ExecutionObserverScopeGuard(folly::ExecutionObserver* observer, void* id)
102       : observer_(observer), id_{reinterpret_cast<uintptr_t>(id)} {
103     if (observer_) {
104       observer_->starting(id_);
105     }
106   }
107 
~ExecutionObserverScopeGuard()108   ~ExecutionObserverScopeGuard() {
109     if (observer_) {
110       observer_->stopped(id_);
111     }
112   }
113 
114  private:
115   folly::ExecutionObserver* observer_;
116   uintptr_t id_;
117 };
118 } // namespace
119 
120 namespace folly {
121 
122 class EventBase::FuncRunner {
123  public:
operator ()(Func func)124   void operator()(Func func) noexcept { func(); }
125 };
126 
127 /*
128  * EventBase methods
129  */
130 
EventBase(std::chrono::milliseconds tickInterval)131 EventBase::EventBase(std::chrono::milliseconds tickInterval)
132     : EventBase(Options().setTimerTickInterval(tickInterval)) {}
133 
EventBase(bool enableTimeMeasurement)134 EventBase::EventBase(bool enableTimeMeasurement)
135     : EventBase(Options().setSkipTimeMeasurement(!enableTimeMeasurement)) {}
136 
137 // takes ownership of the event_base
EventBase(event_base * evb,bool enableTimeMeasurement)138 EventBase::EventBase(event_base* evb, bool enableTimeMeasurement)
139     : EventBase(Options()
140                     .setBackendFactory([evb] {
141                       return std::make_unique<EventBaseBackend>(evb);
142                     })
143                     .setSkipTimeMeasurement(!enableTimeMeasurement)) {}
144 
EventBase(Options options)145 EventBase::EventBase(Options options)
146     : intervalDuration_(options.timerTickInterval),
147       runOnceCallbacks_(nullptr),
148       stop_(false),
149       loopThread_(),
150       queue_(nullptr),
151       maxLatency_(0),
152       avgLoopTime_(std::chrono::seconds(2)),
153       maxLatencyLoopTime_(avgLoopTime_),
154       enableTimeMeasurement_(!options.skipTimeMeasurement),
155       nextLoopCnt_(
156           std::size_t(-40)) // Early wrap-around so bugs will manifest soon
157       ,
158       latestLoopCnt_(nextLoopCnt_),
159       startWork_(),
160       observer_(nullptr),
161       observerSampleCount_(0),
162       executionObserver_(nullptr) {
163   evb_ =
164       options.backendFactory ? options.backendFactory() : getDefaultBackend();
165   initNotificationQueue();
166 }
167 
// Tears the EventBase down. The order of the steps below matters:
//   1. start destroying the VirtualEventBase (if one was created),
//   2. keep looping until no keep-alive handles remain,
//   3. wait for the VirtualEventBase destruction to finish,
//   4. run the on-destruction callbacks,
//   5. clear timeouts, run the remaining loop callbacks, drain and stop the
//      notification queue,
//   6. detach from every registered EventBaseLocal,
//   7. release the backend.
EventBase::~EventBase() {
  std::future<void> virtualEventBaseDestroyFuture;
  if (virtualEventBase_) {
    virtualEventBaseDestroyFuture = virtualEventBase_->destroy();
  }

  // Keep looping until all keep-alive handles are released. Each keep-alive
  // handle signals that some external code will still schedule some work on
  // this EventBase (so it's not safe to destroy it).
  while (loopKeepAliveCount() > 0) {
    applyLoopKeepAlive();
    loopOnce();
  }

  if (virtualEventBaseDestroyFuture.valid()) {
    virtualEventBaseDestroyFuture.get();
  }

  // Call all destruction callbacks, before we start cleaning up our state.
  // The outer loop re-checks the list after each batch, so callbacks that get
  // added while a batch is running are picked up by a later pass.
  while (!onDestructionCallbacks_.rlock()->empty()) {
    OnDestructionCallback::List callbacks;
    onDestructionCallbacks_.swap(callbacks);
    while (!callbacks.empty()) {
      // The element is unlinked from the list before it runs.
      auto& callback = callbacks.front();
      callbacks.pop_front();
      callback.runCallback();
    }
  }

  clearCobTimeouts();

  DCHECK_EQ(0u, runBeforeLoopCallbacks_.size());

  // Run whatever loop callbacks are still pending; the result is not needed.
  (void)runLoopCallbacks();

  queue_->drain();

  // Stop consumer before deleting NotificationQueue
  queue_->stopConsuming();

  // Remove self from all registered EventBaseLocal instances.
  // Notice that we could be racing with EventBaseLocal dtor similarly
  // deregistering itself from all registered EventBase instances. Because
  // both sides need to acquire two locks, but in inverse order, we retry if
  // inner lock acquisition fails to prevent lock inversion deadlock.
  while (true) {
    // `locked` is scoped to one iteration, so the write lock is dropped and
    // re-acquired on every retry.
    auto locked = localStorageToDtor_.wlock();
    if (locked->empty()) {
      break;
    }
    auto evbl = *locked->begin();
    if (evbl->tryDeregister(*this)) {
      locked->erase(evbl);
    }
  }
  localStorage_.clear();

  // Destroy the backend last.
  evb_.reset();

  VLOG(5) << "EventBase(): Destroyed.";
}
229 
tryDeregister(detail::EventBaseLocalBase & evbl)230 bool EventBase::tryDeregister(detail::EventBaseLocalBase& evbl) {
231   if (auto locked = localStorageToDtor_.tryWLock()) {
232     locked->erase(&evbl);
233     runInEventBaseThread([this, key = evbl.key_] { localStorage_.erase(key); });
234     return true;
235   }
236   return false;
237 }
238 
getDefaultBackend()239 std::unique_ptr<EventBaseBackendBase> EventBase::getDefaultBackend() {
240   return std::make_unique<EventBaseBackend>();
241 }
242 
getNotificationQueueSize() const243 size_t EventBase::getNotificationQueueSize() const {
244   return queue_->size();
245 }
246 
setMaxReadAtOnce(uint32_t maxAtOnce)247 void EventBase::setMaxReadAtOnce(uint32_t maxAtOnce) {
248   queue_->setMaxReadAtOnce(maxAtOnce);
249 }
250 
checkIsInEventBaseThread() const251 void EventBase::checkIsInEventBaseThread() const {
252   auto evbTid = loopThread_.load(std::memory_order_relaxed);
253   if (evbTid == std::thread::id()) {
254     return;
255   }
256 
257   // Using getThreadName(evbTid) instead of name_ will work also if
258   // the thread name is set outside of EventBase (and name_ is empty).
259   auto curTid = std::this_thread::get_id();
260   CHECK_EQ(evbTid, curTid)
261       << "This logic must be executed in the event base thread. "
262       << "Event base thread name: \""
263       << folly::getThreadName(evbTid).value_or("")
264       << "\", current thread name: \""
265       << folly::getThreadName(curTid).value_or("") << "\"";
266 }
267 
268 // Set smoothing coefficient for loop load average; input is # of milliseconds
269 // for exp(-1) decay.
setLoadAvgMsec(std::chrono::milliseconds ms)270 void EventBase::setLoadAvgMsec(std::chrono::milliseconds ms) {
271   assert(enableTimeMeasurement_);
272   std::chrono::microseconds us = std::chrono::milliseconds(ms);
273   if (ms > std::chrono::milliseconds::zero()) {
274     maxLatencyLoopTime_.setTimeInterval(us);
275     avgLoopTime_.setTimeInterval(us);
276   } else {
277     LOG(ERROR) << "non-positive arg to setLoadAvgMsec()";
278   }
279 }
280 
resetLoadAvg(double value)281 void EventBase::resetLoadAvg(double value) {
282   assert(enableTimeMeasurement_);
283   avgLoopTime_.reset(value);
284   maxLatencyLoopTime_.reset(value);
285 }
286 
// Returns the time elapsed since *prev, truncated to whole milliseconds, and
// advances *prev to the current time.
//
// The clock is sampled exactly once so that the value stored in *prev is the
// same instant the returned delta was measured against. Sampling it twice
// would silently drop the time between the two samples from every subsequent
// delta.
//
// prev: in/out; must not be null.
static std::chrono::milliseconds getTimeDelta(
    std::chrono::steady_clock::time_point* prev) {
  const auto now = std::chrono::steady_clock::now();
  const auto elapsed = now - *prev;
  *prev = now;

  return std::chrono::duration_cast<std::chrono::milliseconds>(elapsed);
}
294 
waitUntilRunning()295 void EventBase::waitUntilRunning() {
296   while (loopThread_.load(std::memory_order_acquire) == std::thread::id()) {
297     std::this_thread::yield();
298   }
299 }
300 
301 // enters the event_base loop -- will only exit when forced to
loop()302 bool EventBase::loop() {
303   // Enforce blocking tracking and if we have a name override any previous one
304   ExecutorBlockingGuard guard{ExecutorBlockingGuard::TrackTag{}, this, name_};
305   return loopBody();
306 }
307 
loopIgnoreKeepAlive()308 bool EventBase::loopIgnoreKeepAlive() {
309   if (loopKeepAliveActive_) {
310     // Make sure NotificationQueue is not counted as one of the readers
311     // (otherwise loopBody won't return until terminateLoopSoon is called).
312     queue_->stopConsuming();
313     queue_->startConsumingInternal(this);
314     loopKeepAliveActive_ = false;
315   }
316   return loopBody(0, true);
317 }
318 
loopOnce(int flags)319 bool EventBase::loopOnce(int flags) {
320   return loopBody(flags | EVLOOP_ONCE);
321 }
322 
// Core loop driver behind loop(), loopOnce() and loopIgnoreKeepAlive().
//
// flags: EVLOOP_NONBLOCK makes every backend call non-blocking; EVLOOP_ONCE
//        stops after a single iteration.
// ignoreKeepAlive: when true, applyLoopKeepAlive() is not called at the top
//        of each iteration.
//
// Returns false when the last backend result is negative or greater than 1,
// true otherwise.
//
// NOTE(review): the two `return false` paths near the end leave the function
// before loopThread_ is cleared, so loopThread_ keeps this thread's id after
// an error — confirm whether that is intended.
bool EventBase::loopBody(int flags, bool ignoreKeepAlive) {
  VLOG(5) << "EventBase(): Starting loop.";

  const char* message =
      "Your code just tried to loop over an event base from inside another "
      "event base loop. Since libevent is not reentrant, this leads to "
      "undefined behavior in opt builds. Please fix immediately. For the "
      "common case of an inner function that needs to do some synchronous "
      "computation on an event-base, replace getEventBase() by a new, "
      "stack-allocated EventBase.";

  LOG_IF(DFATAL, invokingLoop_) << message;

  // Re-entrancy marker; cleared again on every exit path.
  invokingLoop_ = true;
  SCOPE_EXIT { invokingLoop_ = false; };

  int res = 0;
  bool ranLoopCallbacks;
  bool blocking = !(flags & EVLOOP_NONBLOCK);
  bool once = (flags & EVLOOP_ONCE);

  // time-measurement variables.
  std::chrono::steady_clock::time_point prev;
  std::chrono::steady_clock::time_point idleStart = {};
  std::chrono::microseconds busy;
  std::chrono::microseconds idle;

  // Record this thread as the loop thread; any previously recorded thread id
  // means the EventBase is already being driven elsewhere.
  auto const prevLoopThread = loopThread_.exchange(
      std::this_thread::get_id(), std::memory_order_release);
  CHECK_EQ(std::thread::id(), prevLoopThread)
      << "Driving an EventBase in one thread (" << std::this_thread::get_id()
      << ") while it is already being driven in another thread ("
      << prevLoopThread << ") is forbidden.";

  if (!name_.empty()) {
    setThreadName(name_);
  }

  if (enableTimeMeasurement_) {
    prev = std::chrono::steady_clock::now();
    idleStart = prev;
  }

  while (!stop_.load(std::memory_order_relaxed)) {
    if (!ignoreKeepAlive) {
      applyLoopKeepAlive();
    }
    ++nextLoopCnt_;

    // Run the before loop callbacks
    LoopCallbackList callbacks;
    callbacks.swap(runBeforeLoopCallbacks_);

    runLoopCallbacks(callbacks);

    // nobody can add loop callbacks from within this thread if
    // we don't have to handle anything to start with...
    if (blocking && loopCallbacks_.empty()) {
      res = evb_->eb_event_base_loop(EVLOOP_ONCE);
    } else {
      res = evb_->eb_event_base_loop(EVLOOP_ONCE | EVLOOP_NONBLOCK);
    }

    ranLoopCallbacks = runLoopCallbacks();

    if (enableTimeMeasurement_) {
      // busy = time since startWork_; idle = time between the end of the
      // previous iteration's accounting (idleStart) and startWork_.
      auto now = std::chrono::steady_clock::now();
      busy = std::chrono::duration_cast<std::chrono::microseconds>(
          now - startWork_);
      idle = std::chrono::duration_cast<std::chrono::microseconds>(
          startWork_ - idleStart);
      auto loop_time = busy + idle;

      avgLoopTime_.addSample(loop_time, busy);
      maxLatencyLoopTime_.addSample(loop_time, busy);

      if (observer_) {
        // Report a sample once the counter reaches the observer's sample
        // rate, then start counting again.
        if (observerSampleCount_++ == observer_->getSampleRate()) {
          observerSampleCount_ = 0;
          observer_->loopSample(busy.count(), idle.count());
        }
      }

      VLOG(11) << "EventBase " << this << " did not timeout "
               << " loop time guess: " << loop_time.count()
               << " idle time: " << idle.count()
               << " busy time: " << busy.count()
               << " avgLoopTime: " << avgLoopTime_.get()
               << " maxLatencyLoopTime: " << maxLatencyLoopTime_.get()
               << " maxLatency_: " << maxLatency_.count() << "us"
               << " notificationQueueSize: " << getNotificationQueueSize()
               << " nothingHandledYet(): " << nothingHandledYet();

      if (maxLatency_ > std::chrono::microseconds::zero()) {
        // see if our average loop time has exceeded our limit
        if (dampenMaxLatency_ &&
            (maxLatencyLoopTime_.get() > double(maxLatency_.count()))) {
          maxLatencyCob_();
          // back off temporarily -- don't keep spamming maxLatencyCob_
          // if we're only a bit over the limit
          maxLatencyLoopTime_.dampen(0.9);
        } else if (!dampenMaxLatency_ && busy > maxLatency_) {
          // If no damping, we compare the raw busy time
          maxLatencyCob_();
        }
      }

      // Our loop run did real work; reset the idle timer
      idleStart = now;
    } else {
      VLOG(11) << "EventBase " << this << " did not timeout";
    }

    // Event loop indicated that there were no more events (NotificationQueue
    // was registered as an internal event and there were no other registered
    // events).
    if (res != 0) {
      // Since Notification Queue is marked 'internal' some events may not have
      // run.  Run them manually if so, and continue looping.
      //
      if (!queue_->empty()) {
        ExecutionObserverScopeGuard guard(executionObserver_, queue_.get());
        queue_->execute();
      } else if (!ranLoopCallbacks) {
        // If there were no more events and we also didn't have any loop
        // callbacks to run, there is nothing left to do.
        break;
      }
    }

    if (enableTimeMeasurement_) {
      VLOG(11) << "EventBase " << this
               << " loop time: " << getTimeDelta(&prev).count();
    }

    if (once) {
      break;
    }
  }
  // Reset stop_ so loop() can be called again
  stop_.store(false, std::memory_order_relaxed);

  // Interpret the last backend result: negative is an error, 1 means the
  // backend ran out of events, anything above 1 is unexpected.
  if (res < 0) {
    LOG(ERROR) << "EventBase: -- error in event loop, res = " << res;
    return false;
  } else if (res == 1) {
    VLOG(5) << "EventBase: ran out of events (exiting loop)!";
  } else if (res > 1) {
    LOG(ERROR) << "EventBase: unknown event loop result = " << res;
    return false;
  }

  // Mark the EventBase as no longer being driven by any thread.
  loopThread_.store({}, std::memory_order_release);

  VLOG(5) << "EventBase(): Done with loop.";
  return true;
}
480 
loopKeepAliveCount()481 ssize_t EventBase::loopKeepAliveCount() {
482   if (loopKeepAliveCountAtomic_.load(std::memory_order_relaxed)) {
483     loopKeepAliveCount_ +=
484         loopKeepAliveCountAtomic_.exchange(0, std::memory_order_relaxed);
485   }
486   DCHECK_GE(loopKeepAliveCount_, 0);
487 
488   return loopKeepAliveCount_;
489 }
490 
applyLoopKeepAlive()491 void EventBase::applyLoopKeepAlive() {
492   auto keepAliveCount = loopKeepAliveCount();
493   // Make sure default VirtualEventBase won't hold EventBase::loop() forever.
494   if (auto virtualEventBase = tryGetVirtualEventBase()) {
495     if (virtualEventBase->keepAliveCount() == 1) {
496       --keepAliveCount;
497     }
498   }
499 
500   if (loopKeepAliveActive_ && keepAliveCount == 0) {
501     // Restore the notification queue internal flag
502     queue_->stopConsuming();
503     queue_->startConsumingInternal(this);
504     loopKeepAliveActive_ = false;
505   } else if (!loopKeepAliveActive_ && keepAliveCount > 0) {
506     // Update the notification queue event to treat it as a normal
507     // (non-internal) event.  The notification queue event always remains
508     // installed, and the main loop won't exit with it installed.
509     queue_->stopConsuming();
510     queue_->startConsuming(this);
511     loopKeepAliveActive_ = true;
512   }
513 }
514 
loopForever()515 void EventBase::loopForever() {
516   bool ret;
517   {
518     SCOPE_EXIT { applyLoopKeepAlive(); };
519     // Make sure notification queue events are treated as normal events.
520     // We can't use loopKeepAlive() here since LoopKeepAlive token can only be
521     // released inside a loop.
522     ++loopKeepAliveCount_;
523     SCOPE_EXIT { --loopKeepAliveCount_; };
524     ret = loop();
525   }
526 
527   if (!ret) {
528     folly::throwSystemError("error in EventBase::loopForever()");
529   }
530 }
531 
bumpHandlingTime()532 void EventBase::bumpHandlingTime() {
533   if (!enableTimeMeasurement_) {
534     return;
535   }
536 
537   VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__
538            << " (loop) latest " << latestLoopCnt_ << " next " << nextLoopCnt_;
539   if (nothingHandledYet()) {
540     latestLoopCnt_ = nextLoopCnt_;
541     // set the time
542     startWork_ = std::chrono::steady_clock::now();
543 
544     VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__
545              << " (loop) startWork_ " << startWork_.time_since_epoch().count();
546   }
547 }
548 
terminateLoopSoon()549 void EventBase::terminateLoopSoon() {
550   VLOG(5) << "EventBase(): Received terminateLoopSoon() command.";
551 
552   auto keepAlive = getKeepAliveToken(this);
553 
554   // Set stop to true, so the event loop will know to exit.
555   stop_.store(true, std::memory_order_relaxed);
556 
557   // If terminateLoopSoon() is called from another thread,
558   // the EventBase thread might be stuck waiting for events.
559   // In this case, it won't wake up and notice that stop_ is set until it
560   // receives another event.  Send an empty frame to the notification queue
561   // so that the event loop will wake up even if there are no other events.
562   try {
563     queue_->putMessage([] {});
564   } catch (...) {
565     // putMessage() can only fail when the queue is draining in ~EventBase.
566   }
567 }
568 
runInLoop(LoopCallback * callback,bool thisIteration,std::shared_ptr<RequestContext> rctx)569 void EventBase::runInLoop(
570     LoopCallback* callback,
571     bool thisIteration,
572     std::shared_ptr<RequestContext> rctx) {
573   dcheckIsInEventBaseThread();
574   callback->cancelLoopCallback();
575   callback->context_ = std::move(rctx);
576   if (runOnceCallbacks_ != nullptr && thisIteration) {
577     runOnceCallbacks_->push_back(*callback);
578   } else {
579     loopCallbacks_.push_back(*callback);
580   }
581 }
582 
runInLoop(Func cob,bool thisIteration)583 void EventBase::runInLoop(Func cob, bool thisIteration) {
584   dcheckIsInEventBaseThread();
585   auto wrapper = new FunctionLoopCallback(std::move(cob));
586   wrapper->context_ = RequestContext::saveContext();
587   if (runOnceCallbacks_ != nullptr && thisIteration) {
588     runOnceCallbacks_->push_back(*wrapper);
589   } else {
590     loopCallbacks_.push_back(*wrapper);
591   }
592 }
593 
runOnDestruction(OnDestructionCallback & callback)594 void EventBase::runOnDestruction(OnDestructionCallback& callback) {
595   callback.schedule(
596       [this](auto& cb) { onDestructionCallbacks_.wlock()->push_back(cb); },
597       [this](auto& cb) {
598         onDestructionCallbacks_.withWLock(
599             [&](auto& list) { list.erase(list.iterator_to(cb)); });
600       });
601 }
602 
runOnDestruction(Func f)603 void EventBase::runOnDestruction(Func f) {
604   auto* callback = new FunctionOnDestructionCallback(std::move(f));
605   runOnDestruction(*callback);
606 }
607 
runBeforeLoop(LoopCallback * callback)608 void EventBase::runBeforeLoop(LoopCallback* callback) {
609   dcheckIsInEventBaseThread();
610   callback->cancelLoopCallback();
611   runBeforeLoopCallbacks_.push_back(*callback);
612 }
613 
runInEventBaseThread(Func fn)614 void EventBase::runInEventBaseThread(Func fn) noexcept {
615   // Send the message.
616   // It will be received by the FunctionRunner in the EventBase's thread.
617 
618   // We try not to schedule nullptr callbacks
619   if (!fn) {
620     DLOG(FATAL) << "EventBase " << this
621                 << ": Scheduling nullptr callbacks is not allowed";
622     return;
623   }
624 
625   // Short-circuit if we are already in our event base
626   if (inRunningEventBaseThread()) {
627     runInLoop(std::move(fn));
628     return;
629   }
630 
631   queue_->putMessage(std::move(fn));
632 }
633 
runInEventBaseThreadAlwaysEnqueue(Func fn)634 void EventBase::runInEventBaseThreadAlwaysEnqueue(Func fn) noexcept {
635   // Send the message.
636   // It will be received by the FunctionRunner in the EventBase's thread.
637 
638   // We try not to schedule nullptr callbacks
639   if (!fn) {
640     LOG(DFATAL) << "EventBase " << this
641                 << ": Scheduling nullptr callbacks is not allowed";
642     return;
643   }
644 
645   queue_->putMessage(std::move(fn));
646 }
647 
runInEventBaseThreadAndWait(Func fn)648 void EventBase::runInEventBaseThreadAndWait(Func fn) noexcept {
649   if (inRunningEventBaseThread()) {
650     LOG(DFATAL) << "EventBase " << this << ": Waiting in the event loop is not "
651                 << "allowed";
652     return;
653   }
654 
655   Baton<> ready;
656   runInEventBaseThread([&ready, fn = std::move(fn)]() mutable {
657     SCOPE_EXIT { ready.post(); };
658     // A trick to force the stored functor to be executed and then destructed
659     // before posting the baton and waking the waiting thread.
660     copy(std::move(fn))();
661   });
662   ready.wait(folly::Baton<>::wait_options().logging_enabled(false));
663 }
664 
runImmediatelyOrRunInEventBaseThreadAndWait(Func fn)665 void EventBase::runImmediatelyOrRunInEventBaseThreadAndWait(Func fn) noexcept {
666   if (isInEventBaseThread()) {
667     fn();
668   } else {
669     runInEventBaseThreadAndWait(std::move(fn));
670   }
671 }
672 
runLoopCallbacks(LoopCallbackList & currentCallbacks)673 void EventBase::runLoopCallbacks(LoopCallbackList& currentCallbacks) {
674   while (!currentCallbacks.empty()) {
675     LoopCallback* callback = &currentCallbacks.front();
676     currentCallbacks.pop_front();
677     folly::RequestContextScopeGuard rctx(std::move(callback->context_));
678     ExecutionObserverScopeGuard guard(executionObserver_, callback);
679     callback->runLoopCallback();
680   }
681 }
682 
runLoopCallbacks()683 bool EventBase::runLoopCallbacks() {
684   bumpHandlingTime();
685   if (!loopCallbacks_.empty()) {
686     // Swap the loopCallbacks_ list with a temporary list on our stack.
687     // This way we will only run callbacks scheduled at the time
688     // runLoopCallbacks() was invoked.
689     //
690     // If any of these callbacks in turn call runInLoop() to schedule more
691     // callbacks, those new callbacks won't be run until the next iteration
692     // around the event loop.  This prevents runInLoop() callbacks from being
693     // able to start file descriptor and timeout based events.
694     LoopCallbackList currentCallbacks;
695     currentCallbacks.swap(loopCallbacks_);
696     runOnceCallbacks_ = &currentCallbacks;
697 
698     runLoopCallbacks(currentCallbacks);
699 
700     runOnceCallbacks_ = nullptr;
701     return true;
702   }
703   return false;
704 }
705 
initNotificationQueue()706 void EventBase::initNotificationQueue() {
707   // Infinite size queue
708   queue_ =
709       std::make_unique<EventBaseAtomicNotificationQueue<Func, FuncRunner>>();
710 
711   // Mark this as an internal event, so event_base_loop() will return if
712   // there are no other events besides this one installed.
713   //
714   // Most callers don't care about the internal notification queue used by
715   // EventBase.  The queue is always installed, so if we did count the queue as
716   // an active event, loop() would never exit with no more events to process.
717   // Users can use loopForever() if they do care about the notification queue.
718   // (This is useful for EventBase threads that do nothing but process
719   // runInEventBaseThread() notifications.)
720   queue_->startConsumingInternal(this);
721 }
722 
setTimeInterval(std::chrono::microseconds timeInterval)723 void EventBase::SmoothLoopTime::setTimeInterval(
724     std::chrono::microseconds timeInterval) {
725   expCoeff_ = -1.0 / static_cast<double>(timeInterval.count());
726   VLOG(11) << "expCoeff_ " << expCoeff_ << " " << __PRETTY_FUNCTION__;
727 }
728 
reset(double value)729 void EventBase::SmoothLoopTime::reset(double value) {
730   value_ = value;
731 }
732 
addSample(std::chrono::microseconds total,std::chrono::microseconds busy)733 void EventBase::SmoothLoopTime::addSample(
734     std::chrono::microseconds total, std::chrono::microseconds busy) {
735   if ((buffer_time_ + total) > buffer_interval_ && buffer_cnt_ > 0) {
736     // See https://en.wikipedia.org/wiki/Exponential_smoothing for
737     // more info on this calculation.
738     double coeff = exp(static_cast<double>(buffer_time_.count()) * expCoeff_);
739     value_ = value_ * coeff +
740         (1.0 - coeff) *
741             (static_cast<double>(busy_buffer_.count()) / buffer_cnt_);
742     buffer_time_ = std::chrono::microseconds{0};
743     busy_buffer_ = std::chrono::microseconds{0};
744     buffer_cnt_ = 0;
745   }
746   buffer_time_ += total;
747   busy_buffer_ += busy;
748   buffer_cnt_++;
749 }
750 
nothingHandledYet() const751 bool EventBase::nothingHandledYet() const noexcept {
752   VLOG(11) << "latest " << latestLoopCnt_ << " next " << nextLoopCnt_;
753   return (nextLoopCnt_ != latestLoopCnt_);
754 }
755 
attachTimeoutManager(AsyncTimeout * obj,InternalEnum internal)756 void EventBase::attachTimeoutManager(AsyncTimeout* obj, InternalEnum internal) {
757   auto* ev = obj->getEvent();
758   assert(ev->eb_ev_base() == nullptr);
759 
760   ev->eb_event_base_set(this);
761   if (internal == AsyncTimeout::InternalEnum::INTERNAL) {
762     // Set the EVLIST_INTERNAL flag
763     event_ref_flags(ev->getEvent()) |= EVLIST_INTERNAL;
764   }
765 }
766 
detachTimeoutManager(AsyncTimeout * obj)767 void EventBase::detachTimeoutManager(AsyncTimeout* obj) {
768   cancelTimeout(obj);
769   auto* ev = obj->getEvent();
770   ev->eb_ev_base(nullptr);
771 }
772 
scheduleTimeout(AsyncTimeout * obj,TimeoutManager::timeout_type timeout)773 bool EventBase::scheduleTimeout(
774     AsyncTimeout* obj, TimeoutManager::timeout_type timeout) {
775   dcheckIsInEventBaseThread();
776   // Set up the timeval and add the event
777   struct timeval tv;
778   tv.tv_sec = long(timeout.count() / 1000LL);
779   tv.tv_usec = long((timeout.count() % 1000LL) * 1000LL);
780 
781   auto* ev = obj->getEvent();
782 
783   DCHECK(ev->eb_ev_base());
784 
785   if (ev->eb_event_add(&tv) < 0) {
786     LOG(ERROR) << "EventBase: failed to schedule timeout: " << errnoStr(errno);
787     return false;
788   }
789 
790   return true;
791 }
792 
cancelTimeout(AsyncTimeout * obj)793 void EventBase::cancelTimeout(AsyncTimeout* obj) {
794   dcheckIsInEventBaseThread();
795   auto* ev = obj->getEvent();
796   if (ev->isEventRegistered()) {
797     ev->eb_event_del();
798   }
799 }
800 
setName(const std::string & name)801 void EventBase::setName(const std::string& name) {
802   dcheckIsInEventBaseThread();
803   name_ = name;
804 
805   if (isRunning()) {
806     setThreadName(loopThread_.load(std::memory_order_relaxed), name_);
807   }
808 }
809 
getName()810 const std::string& EventBase::getName() {
811   dcheckIsInEventBaseThread();
812   return name_;
813 }
814 
scheduleAt(Func && fn,TimePoint const & timeout)815 void EventBase::scheduleAt(Func&& fn, TimePoint const& timeout) {
816   auto duration = timeout - now();
817   timer().scheduleTimeoutFn(
818       std::move(fn),
819       std::chrono::duration_cast<std::chrono::milliseconds>(duration));
820 }
821 
getLibeventBase() const822 event_base* EventBase::getLibeventBase() const {
823   return evb_ ? (evb_->getEventBase()) : nullptr;
824 }
825 
getLibeventVersion()826 const char* EventBase::getLibeventVersion() {
827   return event_get_version();
828 }
getLibeventMethod()829 const char* EventBase::getLibeventMethod() {
830   // event_base_method() would segv if there is no current_base so simulate it
831   struct op {
832     const char* name;
833   };
834   struct base {
835     const op* evsel;
836   };
837   auto b = reinterpret_cast<base*>(getLibeventBase());
838   return !b ? "" : b->evsel->name;
839 }
840 
getVirtualEventBase()841 VirtualEventBase& EventBase::getVirtualEventBase() {
842   folly::call_once(virtualEventBaseInitFlag_, [&] {
843     virtualEventBase_ = std::make_unique<VirtualEventBase>(*this);
844   });
845 
846   return *virtualEventBase_;
847 }
848 
tryGetVirtualEventBase()849 VirtualEventBase* EventBase::tryGetVirtualEventBase() {
850   if (folly::test_once(virtualEventBaseInitFlag_)) {
851     return virtualEventBase_.get();
852   }
853   return nullptr;
854 }
855 
getEventBase()856 EventBase* EventBase::getEventBase() {
857   return this;
858 }
859 
~OnDestructionCallback()860 EventBase::OnDestructionCallback::~OnDestructionCallback() {
861   if (*scheduled_.rlock()) {
862     LOG(FATAL)
863         << "OnDestructionCallback must be canceled if needed prior to destruction";
864   }
865 }
866 
runCallback()867 void EventBase::OnDestructionCallback::runCallback() noexcept {
868   scheduled_.withWLock([&](bool& scheduled) {
869     CHECK(scheduled);
870     scheduled = false;
871 
872     // run can only be called by EventBase and VirtualEventBase, and it's called
873     // after the callback has been popped off the list.
874     eraser_ = nullptr;
875 
876     // Note that the exclusive lock on shared state is held while the callback
877     // runs. This ensures concurrent callers to cancel() block until the
878     // callback finishes.
879     onEventBaseDestruction();
880   });
881 }
882 
schedule(FunctionRef<void (OnDestructionCallback &)> linker,Function<void (OnDestructionCallback &)> eraser)883 void EventBase::OnDestructionCallback::schedule(
884     FunctionRef<void(OnDestructionCallback&)> linker,
885     Function<void(OnDestructionCallback&)> eraser) {
886   eraser_ = std::move(eraser);
887   scheduled_.withWLock([](bool& scheduled) { scheduled = true; });
888   linker(*this);
889 }
890 
cancel()891 bool EventBase::OnDestructionCallback::cancel() {
892   return scheduled_.withWLock([this](bool& scheduled) {
893     const bool wasScheduled = std::exchange(scheduled, false);
894     if (wasScheduled) {
895       auto eraser = std::move(eraser_);
896       CHECK(eraser);
897       eraser(*this);
898     }
899     return wasScheduled;
900   });
901 }
902 
// Namespace-scope definition of the static constexpr member
// SmoothLoopTime::buffer_interval_ (the threshold addSample() compares the
// buffered time against).
// NOTE(review): presumably kept for pre-C++17 builds, where an odr-used
// static constexpr data member needs an out-of-class definition — confirm
// before removing.
constexpr std::chrono::milliseconds EventBase::SmoothLoopTime::buffer_interval_;
904 
905 } // namespace folly
906