1 /*
2 * Copyright (c) Facebook, Inc. and its affiliates.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #ifndef __STDC_FORMAT_MACROS
18 #define __STDC_FORMAT_MACROS
19 #endif
20
21 #include <folly/io/async/EventBase.h>
22
23 #include <fcntl.h>
24
25 #include <memory>
26 #include <mutex>
27 #include <thread>
28
29 #include <folly/ExceptionString.h>
30 #include <folly/Memory.h>
31 #include <folly/String.h>
32 #include <folly/io/async/EventBaseAtomicNotificationQueue.h>
33 #include <folly/io/async/EventBaseBackendBase.h>
34 #include <folly/io/async/EventBaseLocal.h>
35 #include <folly/io/async/VirtualEventBase.h>
36 #include <folly/portability/Unistd.h>
37 #include <folly/synchronization/Baton.h>
38 #include <folly/system/ThreadName.h>
39
40 namespace {
41
42 class EventBaseBackend : public folly::EventBaseBackendBase {
43 public:
44 EventBaseBackend();
45 explicit EventBaseBackend(event_base* evb);
46 ~EventBaseBackend() override;
47
getEventBase()48 event_base* getEventBase() override { return evb_; }
49
50 int eb_event_base_loop(int flags) override;
51 int eb_event_base_loopbreak() override;
52
53 int eb_event_add(Event& event, const struct timeval* timeout) override;
54 int eb_event_del(EventBaseBackendBase::Event& event) override;
55
56 bool eb_event_active(Event& event, int res) override;
57
58 private:
59 event_base* evb_;
60 };
61
EventBaseBackend()62 EventBaseBackend::EventBaseBackend() {
63 evb_ = event_base_new();
64 }
65
EventBaseBackend(event_base * evb)66 EventBaseBackend::EventBaseBackend(event_base* evb) : evb_(evb) {
67 if (UNLIKELY(evb_ == nullptr)) {
68 LOG(ERROR) << "EventBase(): Pass nullptr as event base.";
69 throw std::invalid_argument("EventBase(): event base cannot be nullptr");
70 }
71 }
72
eb_event_base_loop(int flags)73 int EventBaseBackend::eb_event_base_loop(int flags) {
74 return event_base_loop(evb_, flags);
75 }
76
eb_event_base_loopbreak()77 int EventBaseBackend::eb_event_base_loopbreak() {
78 return event_base_loopbreak(evb_);
79 }
80
eb_event_add(Event & event,const struct timeval * timeout)81 int EventBaseBackend::eb_event_add(
82 Event& event, const struct timeval* timeout) {
83 return event_add(event.getEvent(), timeout);
84 }
85
eb_event_del(EventBaseBackendBase::Event & event)86 int EventBaseBackend::eb_event_del(EventBaseBackendBase::Event& event) {
87 return event_del(event.getEvent());
88 }
89
eb_event_active(Event & event,int res)90 bool EventBaseBackend::eb_event_active(Event& event, int res) {
91 event_active(event.getEvent(), res, 1);
92 return true;
93 }
94
~EventBaseBackend()95 EventBaseBackend::~EventBaseBackend() {
96 event_base_free(evb_);
97 }
98
99 class ExecutionObserverScopeGuard {
100 public:
ExecutionObserverScopeGuard(folly::ExecutionObserver * observer,void * id)101 ExecutionObserverScopeGuard(folly::ExecutionObserver* observer, void* id)
102 : observer_(observer), id_{reinterpret_cast<uintptr_t>(id)} {
103 if (observer_) {
104 observer_->starting(id_);
105 }
106 }
107
~ExecutionObserverScopeGuard()108 ~ExecutionObserverScopeGuard() {
109 if (observer_) {
110 observer_->stopped(id_);
111 }
112 }
113
114 private:
115 folly::ExecutionObserver* observer_;
116 uintptr_t id_;
117 };
118 } // namespace
119
120 namespace folly {
121
122 class EventBase::FuncRunner {
123 public:
operator ()(Func func)124 void operator()(Func func) noexcept { func(); }
125 };
126
127 /*
128 * EventBase methods
129 */
130
EventBase(std::chrono::milliseconds tickInterval)131 EventBase::EventBase(std::chrono::milliseconds tickInterval)
132 : EventBase(Options().setTimerTickInterval(tickInterval)) {}
133
EventBase(bool enableTimeMeasurement)134 EventBase::EventBase(bool enableTimeMeasurement)
135 : EventBase(Options().setSkipTimeMeasurement(!enableTimeMeasurement)) {}
136
137 // takes ownership of the event_base
EventBase(event_base * evb,bool enableTimeMeasurement)138 EventBase::EventBase(event_base* evb, bool enableTimeMeasurement)
139 : EventBase(Options()
140 .setBackendFactory([evb] {
141 return std::make_unique<EventBaseBackend>(evb);
142 })
143 .setSkipTimeMeasurement(!enableTimeMeasurement)) {}
144
EventBase(Options options)145 EventBase::EventBase(Options options)
146 : intervalDuration_(options.timerTickInterval),
147 runOnceCallbacks_(nullptr),
148 stop_(false),
149 loopThread_(),
150 queue_(nullptr),
151 maxLatency_(0),
152 avgLoopTime_(std::chrono::seconds(2)),
153 maxLatencyLoopTime_(avgLoopTime_),
154 enableTimeMeasurement_(!options.skipTimeMeasurement),
155 nextLoopCnt_(
156 std::size_t(-40)) // Early wrap-around so bugs will manifest soon
157 ,
158 latestLoopCnt_(nextLoopCnt_),
159 startWork_(),
160 observer_(nullptr),
161 observerSampleCount_(0),
162 executionObserver_(nullptr) {
163 evb_ =
164 options.backendFactory ? options.backendFactory() : getDefaultBackend();
165 initNotificationQueue();
166 }
167
~EventBase()168 EventBase::~EventBase() {
169 std::future<void> virtualEventBaseDestroyFuture;
170 if (virtualEventBase_) {
171 virtualEventBaseDestroyFuture = virtualEventBase_->destroy();
172 }
173
174 // Keep looping until all keep-alive handles are released. Each keep-alive
175 // handle signals that some external code will still schedule some work on
176 // this EventBase (so it's not safe to destroy it).
177 while (loopKeepAliveCount() > 0) {
178 applyLoopKeepAlive();
179 loopOnce();
180 }
181
182 if (virtualEventBaseDestroyFuture.valid()) {
183 virtualEventBaseDestroyFuture.get();
184 }
185
186 // Call all destruction callbacks, before we start cleaning up our state.
187 while (!onDestructionCallbacks_.rlock()->empty()) {
188 OnDestructionCallback::List callbacks;
189 onDestructionCallbacks_.swap(callbacks);
190 while (!callbacks.empty()) {
191 auto& callback = callbacks.front();
192 callbacks.pop_front();
193 callback.runCallback();
194 }
195 }
196
197 clearCobTimeouts();
198
199 DCHECK_EQ(0u, runBeforeLoopCallbacks_.size());
200
201 (void)runLoopCallbacks();
202
203 queue_->drain();
204
205 // Stop consumer before deleting NotificationQueue
206 queue_->stopConsuming();
207
208 // Remove self from all registered EventBaseLocal instances.
209 // Notice that we could be racing with EventBaseLocal dtor similarly
210 // deregistering itself from all registered EventBase instances. Because
211 // both sides need to acquire two locks, but in inverse order, we retry if
212 // inner lock acquisition fails to prevent lock inversion deadlock.
213 while (true) {
214 auto locked = localStorageToDtor_.wlock();
215 if (locked->empty()) {
216 break;
217 }
218 auto evbl = *locked->begin();
219 if (evbl->tryDeregister(*this)) {
220 locked->erase(evbl);
221 }
222 }
223 localStorage_.clear();
224
225 evb_.reset();
226
227 VLOG(5) << "EventBase(): Destroyed.";
228 }
229
tryDeregister(detail::EventBaseLocalBase & evbl)230 bool EventBase::tryDeregister(detail::EventBaseLocalBase& evbl) {
231 if (auto locked = localStorageToDtor_.tryWLock()) {
232 locked->erase(&evbl);
233 runInEventBaseThread([this, key = evbl.key_] { localStorage_.erase(key); });
234 return true;
235 }
236 return false;
237 }
238
getDefaultBackend()239 std::unique_ptr<EventBaseBackendBase> EventBase::getDefaultBackend() {
240 return std::make_unique<EventBaseBackend>();
241 }
242
getNotificationQueueSize() const243 size_t EventBase::getNotificationQueueSize() const {
244 return queue_->size();
245 }
246
setMaxReadAtOnce(uint32_t maxAtOnce)247 void EventBase::setMaxReadAtOnce(uint32_t maxAtOnce) {
248 queue_->setMaxReadAtOnce(maxAtOnce);
249 }
250
checkIsInEventBaseThread() const251 void EventBase::checkIsInEventBaseThread() const {
252 auto evbTid = loopThread_.load(std::memory_order_relaxed);
253 if (evbTid == std::thread::id()) {
254 return;
255 }
256
257 // Using getThreadName(evbTid) instead of name_ will work also if
258 // the thread name is set outside of EventBase (and name_ is empty).
259 auto curTid = std::this_thread::get_id();
260 CHECK_EQ(evbTid, curTid)
261 << "This logic must be executed in the event base thread. "
262 << "Event base thread name: \""
263 << folly::getThreadName(evbTid).value_or("")
264 << "\", current thread name: \""
265 << folly::getThreadName(curTid).value_or("") << "\"";
266 }
267
268 // Set smoothing coefficient for loop load average; input is # of milliseconds
269 // for exp(-1) decay.
setLoadAvgMsec(std::chrono::milliseconds ms)270 void EventBase::setLoadAvgMsec(std::chrono::milliseconds ms) {
271 assert(enableTimeMeasurement_);
272 std::chrono::microseconds us = std::chrono::milliseconds(ms);
273 if (ms > std::chrono::milliseconds::zero()) {
274 maxLatencyLoopTime_.setTimeInterval(us);
275 avgLoopTime_.setTimeInterval(us);
276 } else {
277 LOG(ERROR) << "non-positive arg to setLoadAvgMsec()";
278 }
279 }
280
resetLoadAvg(double value)281 void EventBase::resetLoadAvg(double value) {
282 assert(enableTimeMeasurement_);
283 avgLoopTime_.reset(value);
284 maxLatencyLoopTime_.reset(value);
285 }
286
/**
 * Returns the time elapsed since `*prev`, truncated to milliseconds, and
 * advances `*prev` to the current time.
 *
 * The clock is read exactly once, so the stored timestamp is the same instant
 * the delta was measured against. Reading it twice would leave the gap
 * between the two reads out of every delta.
 *
 * @param prev  in/out: previous timestamp; overwritten with "now".
 */
static std::chrono::milliseconds getTimeDelta(
    std::chrono::steady_clock::time_point* prev) {
  const auto now = std::chrono::steady_clock::now();
  const auto elapsed = now - *prev;
  *prev = now;

  return std::chrono::duration_cast<std::chrono::milliseconds>(elapsed);
}
294
waitUntilRunning()295 void EventBase::waitUntilRunning() {
296 while (loopThread_.load(std::memory_order_acquire) == std::thread::id()) {
297 std::this_thread::yield();
298 }
299 }
300
301 // enters the event_base loop -- will only exit when forced to
loop()302 bool EventBase::loop() {
303 // Enforce blocking tracking and if we have a name override any previous one
304 ExecutorBlockingGuard guard{ExecutorBlockingGuard::TrackTag{}, this, name_};
305 return loopBody();
306 }
307
loopIgnoreKeepAlive()308 bool EventBase::loopIgnoreKeepAlive() {
309 if (loopKeepAliveActive_) {
310 // Make sure NotificationQueue is not counted as one of the readers
311 // (otherwise loopBody won't return until terminateLoopSoon is called).
312 queue_->stopConsuming();
313 queue_->startConsumingInternal(this);
314 loopKeepAliveActive_ = false;
315 }
316 return loopBody(0, true);
317 }
318
loopOnce(int flags)319 bool EventBase::loopOnce(int flags) {
320 return loopBody(flags | EVLOOP_ONCE);
321 }
322
loopBody(int flags,bool ignoreKeepAlive)323 bool EventBase::loopBody(int flags, bool ignoreKeepAlive) {
324 VLOG(5) << "EventBase(): Starting loop.";
325
326 const char* message =
327 "Your code just tried to loop over an event base from inside another "
328 "event base loop. Since libevent is not reentrant, this leads to "
329 "undefined behavior in opt builds. Please fix immediately. For the "
330 "common case of an inner function that needs to do some synchronous "
331 "computation on an event-base, replace getEventBase() by a new, "
332 "stack-allocated EventBase.";
333
334 LOG_IF(DFATAL, invokingLoop_) << message;
335
336 invokingLoop_ = true;
337 SCOPE_EXIT { invokingLoop_ = false; };
338
339 int res = 0;
340 bool ranLoopCallbacks;
341 bool blocking = !(flags & EVLOOP_NONBLOCK);
342 bool once = (flags & EVLOOP_ONCE);
343
344 // time-measurement variables.
345 std::chrono::steady_clock::time_point prev;
346 std::chrono::steady_clock::time_point idleStart = {};
347 std::chrono::microseconds busy;
348 std::chrono::microseconds idle;
349
350 auto const prevLoopThread = loopThread_.exchange(
351 std::this_thread::get_id(), std::memory_order_release);
352 CHECK_EQ(std::thread::id(), prevLoopThread)
353 << "Driving an EventBase in one thread (" << std::this_thread::get_id()
354 << ") while it is already being driven in another thread ("
355 << prevLoopThread << ") is forbidden.";
356
357 if (!name_.empty()) {
358 setThreadName(name_);
359 }
360
361 if (enableTimeMeasurement_) {
362 prev = std::chrono::steady_clock::now();
363 idleStart = prev;
364 }
365
366 while (!stop_.load(std::memory_order_relaxed)) {
367 if (!ignoreKeepAlive) {
368 applyLoopKeepAlive();
369 }
370 ++nextLoopCnt_;
371
372 // Run the before loop callbacks
373 LoopCallbackList callbacks;
374 callbacks.swap(runBeforeLoopCallbacks_);
375
376 runLoopCallbacks(callbacks);
377
378 // nobody can add loop callbacks from within this thread if
379 // we don't have to handle anything to start with...
380 if (blocking && loopCallbacks_.empty()) {
381 res = evb_->eb_event_base_loop(EVLOOP_ONCE);
382 } else {
383 res = evb_->eb_event_base_loop(EVLOOP_ONCE | EVLOOP_NONBLOCK);
384 }
385
386 ranLoopCallbacks = runLoopCallbacks();
387
388 if (enableTimeMeasurement_) {
389 auto now = std::chrono::steady_clock::now();
390 busy = std::chrono::duration_cast<std::chrono::microseconds>(
391 now - startWork_);
392 idle = std::chrono::duration_cast<std::chrono::microseconds>(
393 startWork_ - idleStart);
394 auto loop_time = busy + idle;
395
396 avgLoopTime_.addSample(loop_time, busy);
397 maxLatencyLoopTime_.addSample(loop_time, busy);
398
399 if (observer_) {
400 if (observerSampleCount_++ == observer_->getSampleRate()) {
401 observerSampleCount_ = 0;
402 observer_->loopSample(busy.count(), idle.count());
403 }
404 }
405
406 VLOG(11) << "EventBase " << this << " did not timeout "
407 << " loop time guess: " << loop_time.count()
408 << " idle time: " << idle.count()
409 << " busy time: " << busy.count()
410 << " avgLoopTime: " << avgLoopTime_.get()
411 << " maxLatencyLoopTime: " << maxLatencyLoopTime_.get()
412 << " maxLatency_: " << maxLatency_.count() << "us"
413 << " notificationQueueSize: " << getNotificationQueueSize()
414 << " nothingHandledYet(): " << nothingHandledYet();
415
416 if (maxLatency_ > std::chrono::microseconds::zero()) {
417 // see if our average loop time has exceeded our limit
418 if (dampenMaxLatency_ &&
419 (maxLatencyLoopTime_.get() > double(maxLatency_.count()))) {
420 maxLatencyCob_();
421 // back off temporarily -- don't keep spamming maxLatencyCob_
422 // if we're only a bit over the limit
423 maxLatencyLoopTime_.dampen(0.9);
424 } else if (!dampenMaxLatency_ && busy > maxLatency_) {
425 // If no damping, we compare the raw busy time
426 maxLatencyCob_();
427 }
428 }
429
430 // Our loop run did real work; reset the idle timer
431 idleStart = now;
432 } else {
433 VLOG(11) << "EventBase " << this << " did not timeout";
434 }
435
436 // Event loop indicated that there were no more events (NotificationQueue
437 // was registered as an internal event and there were no other registered
438 // events).
439 if (res != 0) {
440 // Since Notification Queue is marked 'internal' some events may not have
441 // run. Run them manually if so, and continue looping.
442 //
443 if (!queue_->empty()) {
444 ExecutionObserverScopeGuard guard(executionObserver_, queue_.get());
445 queue_->execute();
446 } else if (!ranLoopCallbacks) {
447 // If there were no more events and we also didn't have any loop
448 // callbacks to run, there is nothing left to do.
449 break;
450 }
451 }
452
453 if (enableTimeMeasurement_) {
454 VLOG(11) << "EventBase " << this
455 << " loop time: " << getTimeDelta(&prev).count();
456 }
457
458 if (once) {
459 break;
460 }
461 }
462 // Reset stop_ so loop() can be called again
463 stop_.store(false, std::memory_order_relaxed);
464
465 if (res < 0) {
466 LOG(ERROR) << "EventBase: -- error in event loop, res = " << res;
467 return false;
468 } else if (res == 1) {
469 VLOG(5) << "EventBase: ran out of events (exiting loop)!";
470 } else if (res > 1) {
471 LOG(ERROR) << "EventBase: unknown event loop result = " << res;
472 return false;
473 }
474
475 loopThread_.store({}, std::memory_order_release);
476
477 VLOG(5) << "EventBase(): Done with loop.";
478 return true;
479 }
480
loopKeepAliveCount()481 ssize_t EventBase::loopKeepAliveCount() {
482 if (loopKeepAliveCountAtomic_.load(std::memory_order_relaxed)) {
483 loopKeepAliveCount_ +=
484 loopKeepAliveCountAtomic_.exchange(0, std::memory_order_relaxed);
485 }
486 DCHECK_GE(loopKeepAliveCount_, 0);
487
488 return loopKeepAliveCount_;
489 }
490
applyLoopKeepAlive()491 void EventBase::applyLoopKeepAlive() {
492 auto keepAliveCount = loopKeepAliveCount();
493 // Make sure default VirtualEventBase won't hold EventBase::loop() forever.
494 if (auto virtualEventBase = tryGetVirtualEventBase()) {
495 if (virtualEventBase->keepAliveCount() == 1) {
496 --keepAliveCount;
497 }
498 }
499
500 if (loopKeepAliveActive_ && keepAliveCount == 0) {
501 // Restore the notification queue internal flag
502 queue_->stopConsuming();
503 queue_->startConsumingInternal(this);
504 loopKeepAliveActive_ = false;
505 } else if (!loopKeepAliveActive_ && keepAliveCount > 0) {
506 // Update the notification queue event to treat it as a normal
507 // (non-internal) event. The notification queue event always remains
508 // installed, and the main loop won't exit with it installed.
509 queue_->stopConsuming();
510 queue_->startConsuming(this);
511 loopKeepAliveActive_ = true;
512 }
513 }
514
loopForever()515 void EventBase::loopForever() {
516 bool ret;
517 {
518 SCOPE_EXIT { applyLoopKeepAlive(); };
519 // Make sure notification queue events are treated as normal events.
520 // We can't use loopKeepAlive() here since LoopKeepAlive token can only be
521 // released inside a loop.
522 ++loopKeepAliveCount_;
523 SCOPE_EXIT { --loopKeepAliveCount_; };
524 ret = loop();
525 }
526
527 if (!ret) {
528 folly::throwSystemError("error in EventBase::loopForever()");
529 }
530 }
531
bumpHandlingTime()532 void EventBase::bumpHandlingTime() {
533 if (!enableTimeMeasurement_) {
534 return;
535 }
536
537 VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__
538 << " (loop) latest " << latestLoopCnt_ << " next " << nextLoopCnt_;
539 if (nothingHandledYet()) {
540 latestLoopCnt_ = nextLoopCnt_;
541 // set the time
542 startWork_ = std::chrono::steady_clock::now();
543
544 VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__
545 << " (loop) startWork_ " << startWork_.time_since_epoch().count();
546 }
547 }
548
terminateLoopSoon()549 void EventBase::terminateLoopSoon() {
550 VLOG(5) << "EventBase(): Received terminateLoopSoon() command.";
551
552 auto keepAlive = getKeepAliveToken(this);
553
554 // Set stop to true, so the event loop will know to exit.
555 stop_.store(true, std::memory_order_relaxed);
556
557 // If terminateLoopSoon() is called from another thread,
558 // the EventBase thread might be stuck waiting for events.
559 // In this case, it won't wake up and notice that stop_ is set until it
560 // receives another event. Send an empty frame to the notification queue
561 // so that the event loop will wake up even if there are no other events.
562 try {
563 queue_->putMessage([] {});
564 } catch (...) {
565 // putMessage() can only fail when the queue is draining in ~EventBase.
566 }
567 }
568
runInLoop(LoopCallback * callback,bool thisIteration,std::shared_ptr<RequestContext> rctx)569 void EventBase::runInLoop(
570 LoopCallback* callback,
571 bool thisIteration,
572 std::shared_ptr<RequestContext> rctx) {
573 dcheckIsInEventBaseThread();
574 callback->cancelLoopCallback();
575 callback->context_ = std::move(rctx);
576 if (runOnceCallbacks_ != nullptr && thisIteration) {
577 runOnceCallbacks_->push_back(*callback);
578 } else {
579 loopCallbacks_.push_back(*callback);
580 }
581 }
582
runInLoop(Func cob,bool thisIteration)583 void EventBase::runInLoop(Func cob, bool thisIteration) {
584 dcheckIsInEventBaseThread();
585 auto wrapper = new FunctionLoopCallback(std::move(cob));
586 wrapper->context_ = RequestContext::saveContext();
587 if (runOnceCallbacks_ != nullptr && thisIteration) {
588 runOnceCallbacks_->push_back(*wrapper);
589 } else {
590 loopCallbacks_.push_back(*wrapper);
591 }
592 }
593
runOnDestruction(OnDestructionCallback & callback)594 void EventBase::runOnDestruction(OnDestructionCallback& callback) {
595 callback.schedule(
596 [this](auto& cb) { onDestructionCallbacks_.wlock()->push_back(cb); },
597 [this](auto& cb) {
598 onDestructionCallbacks_.withWLock(
599 [&](auto& list) { list.erase(list.iterator_to(cb)); });
600 });
601 }
602
runOnDestruction(Func f)603 void EventBase::runOnDestruction(Func f) {
604 auto* callback = new FunctionOnDestructionCallback(std::move(f));
605 runOnDestruction(*callback);
606 }
607
runBeforeLoop(LoopCallback * callback)608 void EventBase::runBeforeLoop(LoopCallback* callback) {
609 dcheckIsInEventBaseThread();
610 callback->cancelLoopCallback();
611 runBeforeLoopCallbacks_.push_back(*callback);
612 }
613
runInEventBaseThread(Func fn)614 void EventBase::runInEventBaseThread(Func fn) noexcept {
615 // Send the message.
616 // It will be received by the FunctionRunner in the EventBase's thread.
617
618 // We try not to schedule nullptr callbacks
619 if (!fn) {
620 DLOG(FATAL) << "EventBase " << this
621 << ": Scheduling nullptr callbacks is not allowed";
622 return;
623 }
624
625 // Short-circuit if we are already in our event base
626 if (inRunningEventBaseThread()) {
627 runInLoop(std::move(fn));
628 return;
629 }
630
631 queue_->putMessage(std::move(fn));
632 }
633
runInEventBaseThreadAlwaysEnqueue(Func fn)634 void EventBase::runInEventBaseThreadAlwaysEnqueue(Func fn) noexcept {
635 // Send the message.
636 // It will be received by the FunctionRunner in the EventBase's thread.
637
638 // We try not to schedule nullptr callbacks
639 if (!fn) {
640 LOG(DFATAL) << "EventBase " << this
641 << ": Scheduling nullptr callbacks is not allowed";
642 return;
643 }
644
645 queue_->putMessage(std::move(fn));
646 }
647
runInEventBaseThreadAndWait(Func fn)648 void EventBase::runInEventBaseThreadAndWait(Func fn) noexcept {
649 if (inRunningEventBaseThread()) {
650 LOG(DFATAL) << "EventBase " << this << ": Waiting in the event loop is not "
651 << "allowed";
652 return;
653 }
654
655 Baton<> ready;
656 runInEventBaseThread([&ready, fn = std::move(fn)]() mutable {
657 SCOPE_EXIT { ready.post(); };
658 // A trick to force the stored functor to be executed and then destructed
659 // before posting the baton and waking the waiting thread.
660 copy(std::move(fn))();
661 });
662 ready.wait(folly::Baton<>::wait_options().logging_enabled(false));
663 }
664
runImmediatelyOrRunInEventBaseThreadAndWait(Func fn)665 void EventBase::runImmediatelyOrRunInEventBaseThreadAndWait(Func fn) noexcept {
666 if (isInEventBaseThread()) {
667 fn();
668 } else {
669 runInEventBaseThreadAndWait(std::move(fn));
670 }
671 }
672
runLoopCallbacks(LoopCallbackList & currentCallbacks)673 void EventBase::runLoopCallbacks(LoopCallbackList& currentCallbacks) {
674 while (!currentCallbacks.empty()) {
675 LoopCallback* callback = ¤tCallbacks.front();
676 currentCallbacks.pop_front();
677 folly::RequestContextScopeGuard rctx(std::move(callback->context_));
678 ExecutionObserverScopeGuard guard(executionObserver_, callback);
679 callback->runLoopCallback();
680 }
681 }
682
runLoopCallbacks()683 bool EventBase::runLoopCallbacks() {
684 bumpHandlingTime();
685 if (!loopCallbacks_.empty()) {
686 // Swap the loopCallbacks_ list with a temporary list on our stack.
687 // This way we will only run callbacks scheduled at the time
688 // runLoopCallbacks() was invoked.
689 //
690 // If any of these callbacks in turn call runInLoop() to schedule more
691 // callbacks, those new callbacks won't be run until the next iteration
692 // around the event loop. This prevents runInLoop() callbacks from being
693 // able to start file descriptor and timeout based events.
694 LoopCallbackList currentCallbacks;
695 currentCallbacks.swap(loopCallbacks_);
696 runOnceCallbacks_ = ¤tCallbacks;
697
698 runLoopCallbacks(currentCallbacks);
699
700 runOnceCallbacks_ = nullptr;
701 return true;
702 }
703 return false;
704 }
705
initNotificationQueue()706 void EventBase::initNotificationQueue() {
707 // Infinite size queue
708 queue_ =
709 std::make_unique<EventBaseAtomicNotificationQueue<Func, FuncRunner>>();
710
711 // Mark this as an internal event, so event_base_loop() will return if
712 // there are no other events besides this one installed.
713 //
714 // Most callers don't care about the internal notification queue used by
715 // EventBase. The queue is always installed, so if we did count the queue as
716 // an active event, loop() would never exit with no more events to process.
717 // Users can use loopForever() if they do care about the notification queue.
718 // (This is useful for EventBase threads that do nothing but process
719 // runInEventBaseThread() notifications.)
720 queue_->startConsumingInternal(this);
721 }
722
setTimeInterval(std::chrono::microseconds timeInterval)723 void EventBase::SmoothLoopTime::setTimeInterval(
724 std::chrono::microseconds timeInterval) {
725 expCoeff_ = -1.0 / static_cast<double>(timeInterval.count());
726 VLOG(11) << "expCoeff_ " << expCoeff_ << " " << __PRETTY_FUNCTION__;
727 }
728
reset(double value)729 void EventBase::SmoothLoopTime::reset(double value) {
730 value_ = value;
731 }
732
addSample(std::chrono::microseconds total,std::chrono::microseconds busy)733 void EventBase::SmoothLoopTime::addSample(
734 std::chrono::microseconds total, std::chrono::microseconds busy) {
735 if ((buffer_time_ + total) > buffer_interval_ && buffer_cnt_ > 0) {
736 // See https://en.wikipedia.org/wiki/Exponential_smoothing for
737 // more info on this calculation.
738 double coeff = exp(static_cast<double>(buffer_time_.count()) * expCoeff_);
739 value_ = value_ * coeff +
740 (1.0 - coeff) *
741 (static_cast<double>(busy_buffer_.count()) / buffer_cnt_);
742 buffer_time_ = std::chrono::microseconds{0};
743 busy_buffer_ = std::chrono::microseconds{0};
744 buffer_cnt_ = 0;
745 }
746 buffer_time_ += total;
747 busy_buffer_ += busy;
748 buffer_cnt_++;
749 }
750
nothingHandledYet() const751 bool EventBase::nothingHandledYet() const noexcept {
752 VLOG(11) << "latest " << latestLoopCnt_ << " next " << nextLoopCnt_;
753 return (nextLoopCnt_ != latestLoopCnt_);
754 }
755
attachTimeoutManager(AsyncTimeout * obj,InternalEnum internal)756 void EventBase::attachTimeoutManager(AsyncTimeout* obj, InternalEnum internal) {
757 auto* ev = obj->getEvent();
758 assert(ev->eb_ev_base() == nullptr);
759
760 ev->eb_event_base_set(this);
761 if (internal == AsyncTimeout::InternalEnum::INTERNAL) {
762 // Set the EVLIST_INTERNAL flag
763 event_ref_flags(ev->getEvent()) |= EVLIST_INTERNAL;
764 }
765 }
766
detachTimeoutManager(AsyncTimeout * obj)767 void EventBase::detachTimeoutManager(AsyncTimeout* obj) {
768 cancelTimeout(obj);
769 auto* ev = obj->getEvent();
770 ev->eb_ev_base(nullptr);
771 }
772
scheduleTimeout(AsyncTimeout * obj,TimeoutManager::timeout_type timeout)773 bool EventBase::scheduleTimeout(
774 AsyncTimeout* obj, TimeoutManager::timeout_type timeout) {
775 dcheckIsInEventBaseThread();
776 // Set up the timeval and add the event
777 struct timeval tv;
778 tv.tv_sec = long(timeout.count() / 1000LL);
779 tv.tv_usec = long((timeout.count() % 1000LL) * 1000LL);
780
781 auto* ev = obj->getEvent();
782
783 DCHECK(ev->eb_ev_base());
784
785 if (ev->eb_event_add(&tv) < 0) {
786 LOG(ERROR) << "EventBase: failed to schedule timeout: " << errnoStr(errno);
787 return false;
788 }
789
790 return true;
791 }
792
cancelTimeout(AsyncTimeout * obj)793 void EventBase::cancelTimeout(AsyncTimeout* obj) {
794 dcheckIsInEventBaseThread();
795 auto* ev = obj->getEvent();
796 if (ev->isEventRegistered()) {
797 ev->eb_event_del();
798 }
799 }
800
setName(const std::string & name)801 void EventBase::setName(const std::string& name) {
802 dcheckIsInEventBaseThread();
803 name_ = name;
804
805 if (isRunning()) {
806 setThreadName(loopThread_.load(std::memory_order_relaxed), name_);
807 }
808 }
809
getName()810 const std::string& EventBase::getName() {
811 dcheckIsInEventBaseThread();
812 return name_;
813 }
814
scheduleAt(Func && fn,TimePoint const & timeout)815 void EventBase::scheduleAt(Func&& fn, TimePoint const& timeout) {
816 auto duration = timeout - now();
817 timer().scheduleTimeoutFn(
818 std::move(fn),
819 std::chrono::duration_cast<std::chrono::milliseconds>(duration));
820 }
821
getLibeventBase() const822 event_base* EventBase::getLibeventBase() const {
823 return evb_ ? (evb_->getEventBase()) : nullptr;
824 }
825
getLibeventVersion()826 const char* EventBase::getLibeventVersion() {
827 return event_get_version();
828 }
getLibeventMethod()829 const char* EventBase::getLibeventMethod() {
830 // event_base_method() would segv if there is no current_base so simulate it
831 struct op {
832 const char* name;
833 };
834 struct base {
835 const op* evsel;
836 };
837 auto b = reinterpret_cast<base*>(getLibeventBase());
838 return !b ? "" : b->evsel->name;
839 }
840
getVirtualEventBase()841 VirtualEventBase& EventBase::getVirtualEventBase() {
842 folly::call_once(virtualEventBaseInitFlag_, [&] {
843 virtualEventBase_ = std::make_unique<VirtualEventBase>(*this);
844 });
845
846 return *virtualEventBase_;
847 }
848
tryGetVirtualEventBase()849 VirtualEventBase* EventBase::tryGetVirtualEventBase() {
850 if (folly::test_once(virtualEventBaseInitFlag_)) {
851 return virtualEventBase_.get();
852 }
853 return nullptr;
854 }
855
getEventBase()856 EventBase* EventBase::getEventBase() {
857 return this;
858 }
859
~OnDestructionCallback()860 EventBase::OnDestructionCallback::~OnDestructionCallback() {
861 if (*scheduled_.rlock()) {
862 LOG(FATAL)
863 << "OnDestructionCallback must be canceled if needed prior to destruction";
864 }
865 }
866
runCallback()867 void EventBase::OnDestructionCallback::runCallback() noexcept {
868 scheduled_.withWLock([&](bool& scheduled) {
869 CHECK(scheduled);
870 scheduled = false;
871
872 // run can only be called by EventBase and VirtualEventBase, and it's called
873 // after the callback has been popped off the list.
874 eraser_ = nullptr;
875
876 // Note that the exclusive lock on shared state is held while the callback
877 // runs. This ensures concurrent callers to cancel() block until the
878 // callback finishes.
879 onEventBaseDestruction();
880 });
881 }
882
schedule(FunctionRef<void (OnDestructionCallback &)> linker,Function<void (OnDestructionCallback &)> eraser)883 void EventBase::OnDestructionCallback::schedule(
884 FunctionRef<void(OnDestructionCallback&)> linker,
885 Function<void(OnDestructionCallback&)> eraser) {
886 eraser_ = std::move(eraser);
887 scheduled_.withWLock([](bool& scheduled) { scheduled = true; });
888 linker(*this);
889 }
890
cancel()891 bool EventBase::OnDestructionCallback::cancel() {
892 return scheduled_.withWLock([this](bool& scheduled) {
893 const bool wasScheduled = std::exchange(scheduled, false);
894 if (wasScheduled) {
895 auto eraser = std::move(eraser_);
896 CHECK(eraser);
897 eraser(*this);
898 }
899 return wasScheduled;
900 });
901 }
902
// Namespace-scope definition of the constexpr static data member. Before
// C++17 such a definition is required when the member is ODR-used.
constexpr std::chrono::milliseconds EventBase::SmoothLoopTime::buffer_interval_;
904
905 } // namespace folly
906