1 /*
2 * Copyright (c) Facebook, Inc. and its affiliates.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #pragma once
18
19 #include <sys/types.h>
20
21 #include <algorithm>
22 #include <iterator>
23 #include <memory>
24 #include <stdexcept>
25 #include <utility>
26
27 #include <boost/intrusive/slist.hpp>
28 #include <glog/logging.h>
29
30 #include <folly/Exception.h>
31 #include <folly/FileUtil.h>
32 #include <folly/Likely.h>
33 #include <folly/ScopeGuard.h>
34 #include <folly/SpinLock.h>
35 #include <folly/io/async/DelayedDestruction.h>
36 #include <folly/io/async/EventBase.h>
37 #include <folly/io/async/EventHandler.h>
38 #include <folly/io/async/Request.h>
39 #include <folly/portability/Fcntl.h>
40 #include <folly/portability/Sockets.h>
41 #include <folly/portability/Unistd.h>
42 #include <folly/system/Pid.h>
43
44 #if __has_include(<sys/eventfd.h>)
45 #include <sys/eventfd.h>
46 #endif
47
48 namespace folly {
49
50 /**
51 * A producer-consumer queue for passing messages between EventBase threads.
52 *
53 * Messages can be added to the queue from any thread. Multiple consumers may
54 * listen to the queue from multiple EventBase threads.
55 *
56 * A NotificationQueue may not be destroyed while there are still consumers
57 * registered to receive events from the queue. It is the user's
58 * responsibility to ensure that all consumers are unregistered before the
59 * queue is destroyed.
60 *
61 * MessageT should be MoveConstructible (i.e., must support either a move
62 * constructor or a copy constructor, or both). Ideally it's move constructor
63 * (or copy constructor if no move constructor is provided) should never throw
64 * exceptions. If the constructor may throw, the consumers could end up
65 * spinning trying to move a message off the queue and failing, and then
66 * retrying.
67 */
68 template <typename MessageT>
69 class NotificationQueue {
70 struct Node : public boost::intrusive::slist_base_hook<
71 boost::intrusive::cache_last<true>> {
72 template <typename MessageTT>
NodeNode73 Node(MessageTT&& msg, std::shared_ptr<RequestContext> ctx)
74 : msg_(std::forward<MessageTT>(msg)), ctx_(std::move(ctx)) {}
75 MessageT msg_;
76 std::shared_ptr<RequestContext> ctx_;
77 };
78
79 public:
80 /**
81 * A callback interface for consuming messages from the queue as they arrive.
82 */
83 class Consumer : public DelayedDestruction, private EventHandler {
84 public:
85 using UniquePtr = DelayedDestructionUniquePtr<Consumer>;
86
87 enum : uint16_t { kDefaultMaxReadAtOnce = 10 };
88
Consumer()89 Consumer()
90 : queue_(nullptr),
91 destroyedFlagPtr_(nullptr),
92 maxReadAtOnce_(kDefaultMaxReadAtOnce) {}
93
94 // create a consumer in-place, without the need to build new class
95 template <typename TCallback>
96 static UniquePtr make(TCallback&& callback);
97
98 /**
99 * messageAvailable() will be invoked whenever a new
100 * message is available from the pipe.
101 */
102 virtual void messageAvailable(MessageT&& message) noexcept = 0;
103
104 /**
105 * Begin consuming messages from the specified queue.
106 *
107 * messageAvailable() will be called whenever a message is available. This
108 * consumer will continue to consume messages until stopConsuming() is
109 * called.
110 *
111 * A Consumer may only consume messages from a single NotificationQueue at
112 * a time. startConsuming() should not be called if this consumer is
113 * already consuming.
114 */
startConsuming(EventBase * eventBase,NotificationQueue * queue)115 void startConsuming(EventBase* eventBase, NotificationQueue* queue) {
116 init(eventBase, queue);
117 registerHandler(READ | PERSIST);
118 }
119
120 /**
121 * Same as above but registers this event handler as internal so that it
122 * doesn't count towards the pending reader count for the IOLoop.
123 */
startConsumingInternal(EventBase * eventBase,NotificationQueue * queue)124 void startConsumingInternal(
125 EventBase* eventBase, NotificationQueue* queue) {
126 init(eventBase, queue);
127 registerInternalHandler(READ | PERSIST);
128 }
129
130 /**
131 * Stop consuming messages.
132 *
133 * startConsuming() may be called again to resume consumption of messages
134 * at a later point in time.
135 */
136 void stopConsuming();
137
138 /**
139 * Consume messages off the queue until it is empty. No messages may be
140 * added to the queue while it is draining, so that the process is bounded.
141 * To that end, putMessage/tryPutMessage will throw an std::runtime_error,
142 * and tryPutMessageNoThrow will return false.
143 *
144 * @returns true if the queue was drained, false otherwise. In practice,
145 * this will only fail if someone else is already draining the queue.
146 */
147 bool consumeUntilDrained(size_t* numConsumed = nullptr) noexcept;
148
149 /**
150 * Get the NotificationQueue that this consumer is currently consuming
151 * messages from. Returns nullptr if the consumer is not currently
152 * consuming events from any queue.
153 */
getCurrentQueue()154 NotificationQueue* getCurrentQueue() const { return queue_; }
155
156 /**
157 * Set a limit on how many messages this consumer will read each iteration
158 * around the event loop.
159 *
160 * This helps rate-limit how much work the Consumer will do each event loop
161 * iteration, to prevent it from starving other event handlers.
162 *
163 * A limit of 0 means no limit will be enforced. If unset, the limit
164 * defaults to kDefaultMaxReadAtOnce (defined to 10 above).
165 */
setMaxReadAtOnce(uint32_t maxAtOnce)166 void setMaxReadAtOnce(uint32_t maxAtOnce) { maxReadAtOnce_ = maxAtOnce; }
getMaxReadAtOnce()167 uint32_t getMaxReadAtOnce() const { return maxReadAtOnce_; }
168
getEventBase()169 EventBase* getEventBase() { return base_; }
170
171 void handlerReady(uint16_t events) noexcept override;
172
173 protected:
174 void destroy() override;
175
~Consumer()176 ~Consumer() override {}
177
178 private:
179 /**
180 * Consume messages off the queue until
181 * - the queue is empty (1), or
182 * - until the consumer is destroyed, or
183 * - until the consumer is uninstalled, or
184 * - an exception is thrown in the course of dequeueing, or
185 * - unless isDrain is true, until the maxReadAtOnce_ limit is hit
186 *
187 * (1) Well, maybe. See logic/comments around "wasEmpty" in implementation.
188 */
189 void consumeMessages(bool isDrain, size_t* numConsumed = nullptr) noexcept;
190
191 void setActive(bool active, bool shouldLock = false) {
192 if (!queue_) {
193 active_ = active;
194 return;
195 }
196 if (shouldLock) {
197 queue_->spinlock_.lock();
198 }
199 if (!active_ && active) {
200 ++queue_->numActiveConsumers_;
201 } else if (active_ && !active) {
202 --queue_->numActiveConsumers_;
203 }
204 active_ = active;
205 if (shouldLock) {
206 queue_->spinlock_.unlock();
207 }
208 }
209 void init(EventBase* eventBase, NotificationQueue* queue);
210
211 NotificationQueue* queue_;
212 bool* destroyedFlagPtr_;
213 uint32_t maxReadAtOnce_;
214 EventBase* base_;
215 bool active_{false};
216 };
217
218 class SimpleConsumer {
219 public:
SimpleConsumer(NotificationQueue & queue)220 explicit SimpleConsumer(NotificationQueue& queue) : queue_(queue) {
221 ++queue_.numConsumers_;
222 }
223
~SimpleConsumer()224 ~SimpleConsumer() { --queue_.numConsumers_; }
225
getFd()226 int getFd() const {
227 return queue_.eventfd_ >= 0 ? queue_.eventfd_ : queue_.pipeFds_[0];
228 }
229
230 template <typename F>
231 void consume(F&& f);
232
233 private:
234 NotificationQueue& queue_;
235 };
236
237 enum class FdType {
238 PIPE = 1,
239 EVENTFD,
240 };
241
242 /**
243 * Create a new NotificationQueue.
244 *
245 * If the maxSize parameter is specified, this sets the maximum queue size
246 * that will be enforced by tryPutMessage(). (This size is advisory, and may
247 * be exceeded if producers explicitly use putMessage() instead of
248 * tryPutMessage().)
249 *
250 * The fdType parameter determines the type of file descriptor used
251 * internally to signal message availability. The default (eventfd) is
252 * preferable for performance and because it won't fail when the queue gets
253 * too long. It is not available on on older and non-linux kernels, however.
254 * In this case the code will fall back to using a pipe, the parameter is
255 * mostly for testing purposes.
256 */
257 explicit NotificationQueue(
258 uint32_t maxSize = 0, FdType fdType = FdType::EVENTFD)
259 : eventfd_(-1),
260 pipeFds_{-1, -1},
261 advisoryMaxQueueSize_(maxSize),
262 pid_(folly::get_cached_pid()) {
263 #if !__has_include(<sys/eventfd.h>)
264 if (fdType == FdType::EVENTFD) {
265 fdType = FdType::PIPE;
266 }
267 #endif
268
269 #if __has_include(<sys/eventfd.h>)
270 if (fdType == FdType::EVENTFD) {
271 eventfd_ = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
272 if (eventfd_ == -1) {
273 if (errno == ENOSYS || errno == EINVAL) {
274 // eventfd not availalble
275 LOG(ERROR) << "failed to create eventfd for NotificationQueue: "
276 << errno << ", falling back to pipe mode (is your kernel "
277 << "> 2.6.30?)";
278 fdType = FdType::PIPE;
279 } else {
280 // some other error
281 folly::throwSystemError(
282 "Failed to create eventfd for "
283 "NotificationQueue",
284 errno);
285 }
286 }
287 }
288 #endif
289
290 if (fdType == FdType::PIPE) {
291 if (pipe(pipeFds_)) {
292 folly::throwSystemError(
293 "Failed to create pipe for NotificationQueue", errno);
294 }
295 try {
296 // put both ends of the pipe into non-blocking mode
297 if (fcntl(pipeFds_[0], F_SETFL, O_RDONLY | O_NONBLOCK) != 0) {
298 folly::throwSystemError(
299 "failed to put NotificationQueue pipe read "
300 "endpoint into non-blocking mode",
301 errno);
302 }
303 if (fcntl(pipeFds_[1], F_SETFL, O_WRONLY | O_NONBLOCK) != 0) {
304 folly::throwSystemError(
305 "failed to put NotificationQueue pipe write "
306 "endpoint into non-blocking mode",
307 errno);
308 }
catch(...)309 } catch (...) {
310 ::close(pipeFds_[0]);
311 ::close(pipeFds_[1]);
312 throw;
313 }
314 }
315 }
316
~NotificationQueue()317 ~NotificationQueue() {
318 std::unique_ptr<Node> data;
319 while (!queue_.empty()) {
320 data.reset(&queue_.front());
321 queue_.pop_front();
322 }
323 if (eventfd_ >= 0) {
324 ::close(eventfd_);
325 eventfd_ = -1;
326 }
327 if (pipeFds_[0] >= 0) {
328 ::close(pipeFds_[0]);
329 pipeFds_[0] = -1;
330 }
331 if (pipeFds_[1] >= 0) {
332 ::close(pipeFds_[1]);
333 pipeFds_[1] = -1;
334 }
335 }
336
337 /**
338 * Set the advisory maximum queue size.
339 *
340 * This maximum queue size affects calls to tryPutMessage(). Message
341 * producers can still use the putMessage() call to unconditionally put a
342 * message on the queue, ignoring the configured maximum queue size. This
343 * can cause the queue size to exceed the configured maximum.
344 */
setMaxQueueSize(uint32_t max)345 void setMaxQueueSize(uint32_t max) { advisoryMaxQueueSize_ = max; }
346
347 /**
348 * Attempt to put a message on the queue if the queue is not already full.
349 *
350 * If the queue is full, a std::overflow_error will be thrown. The
351 * setMaxQueueSize() function controls the maximum queue size.
352 *
353 * If the queue is currently draining, an std::runtime_error will be thrown.
354 *
355 * This method may contend briefly on a spinlock if many threads are
356 * concurrently accessing the queue, but for all intents and purposes it will
357 * immediately place the message on the queue and return.
358 *
359 * tryPutMessage() may throw std::bad_alloc if memory allocation fails, and
360 * may throw any other exception thrown by the MessageT move/copy
361 * constructor.
362 */
363 template <typename MessageTT>
tryPutMessage(MessageTT && message)364 void tryPutMessage(MessageTT&& message) {
365 putMessageImpl(std::forward<MessageTT>(message), advisoryMaxQueueSize_);
366 }
367
368 /**
369 * No-throw versions of the above. Instead returns true on success, false on
370 * failure.
371 *
372 * Only std::overflow_error (the common exception case) and std::runtime_error
373 * (which indicates that the queue is being drained) are prevented from being
374 * thrown. User code must still catch std::bad_alloc errors.
375 */
376 template <typename MessageTT>
tryPutMessageNoThrow(MessageTT && message)377 bool tryPutMessageNoThrow(MessageTT&& message) {
378 return putMessageImpl(
379 std::forward<MessageTT>(message), advisoryMaxQueueSize_, false);
380 }
381
382 /**
383 * Unconditionally put a message on the queue.
384 *
385 * This method is like tryPutMessage(), but ignores the maximum queue size
386 * and always puts the message on the queue, even if the maximum queue size
387 * would be exceeded.
388 *
389 * putMessage() may throw
390 * - std::bad_alloc if memory allocation fails, and may
391 * - std::runtime_error if the queue is currently draining
392 * - any other exception thrown by the MessageT move/copy constructor.
393 */
394 template <typename MessageTT>
putMessage(MessageTT && message)395 void putMessage(MessageTT&& message) {
396 putMessageImpl(std::forward<MessageTT>(message), 0);
397 }
398
399 /**
400 * Put several messages on the queue.
401 */
402 template <typename InputIteratorT>
putMessages(InputIteratorT first,InputIteratorT last)403 void putMessages(InputIteratorT first, InputIteratorT last) {
404 typedef typename std::iterator_traits<InputIteratorT>::iterator_category
405 IterCategory;
406 putMessagesImpl(first, last, IterCategory());
407 }
408
409 /**
410 * Try to immediately pull a message off of the queue, without blocking.
411 *
412 * If a message is immediately available, the result parameter will be
413 * updated to contain the message contents and true will be returned.
414 *
415 * If no message is available, false will be returned and result will be left
416 * unmodified.
417 */
tryConsume(MessageT & result)418 bool tryConsume(MessageT& result) {
419 SCOPE_EXIT { syncSignalAndQueue(); };
420
421 checkPid();
422 std::unique_ptr<Node> data;
423
424 {
425 std::unique_lock<SpinLock> g(spinlock_);
426
427 if (UNLIKELY(queue_.empty())) {
428 return false;
429 }
430
431 data.reset(&queue_.front());
432 queue_.pop_front();
433 }
434
435 result = std::move(data->msg_);
436 RequestContext::setContext(std::move(data->ctx_));
437
438 return true;
439 }
440
size()441 size_t size() const {
442 std::unique_lock<SpinLock> g(spinlock_);
443 return queue_.size();
444 }
445
446 /**
447 * Check that the NotificationQueue is being used from the correct process.
448 *
449 * If you create a NotificationQueue in one process, then fork, and try to
450 * send messages to the queue from the child process, you're going to have a
451 * bad time. Unfortunately users have (accidentally) run into this.
452 *
453 * Because we use an eventfd/pipe, the child process can actually signal the
454 * parent process that an event is ready. However, it can't put anything on
455 * the parent's queue, so the parent wakes up and finds an empty queue. This
456 * check ensures that we catch the problem in the misbehaving child process
457 * code, and crash before signalling the parent process.
458 */
checkPid()459 void checkPid() const {
460 if (FOLLY_UNLIKELY(pid_ != folly::get_cached_pid())) {
461 checkPidFail();
462 }
463 }
464
465 private:
466 // Forbidden copy constructor and assignment operator
467 NotificationQueue(NotificationQueue const&) = delete;
468 NotificationQueue& operator=(NotificationQueue const&) = delete;
469
470 inline bool checkQueueSize(size_t maxSize, bool throws = true) const {
471 DCHECK(0 == spinlock_.try_lock());
472 if (maxSize > 0 && queue_.size() >= maxSize) {
473 if (throws) {
474 throw std::overflow_error(
475 "unable to add message to NotificationQueue: "
476 "queue is full");
477 }
478 return false;
479 }
480 return true;
481 }
482
483 inline bool checkDraining(bool throws = true) {
484 if (UNLIKELY(draining_ && throws)) {
485 throw std::runtime_error("queue is draining, cannot add message");
486 }
487 return draining_;
488 }
489
ensureSignalLocked()490 void ensureSignalLocked() const {
491 // semantics: empty fd == empty queue <=> !signal_
492 if (signal_) {
493 return;
494 }
495
496 ssize_t bytes_written = 0;
497 size_t bytes_expected = 0;
498
499 do {
500 if (eventfd_ >= 0) {
501 // eventfd(2) dictates that we must write a 64-bit integer
502 uint64_t signal = 1;
503 bytes_expected = sizeof(signal);
504 bytes_written = ::write(eventfd_, &signal, bytes_expected);
505 } else {
506 uint8_t signal = 1;
507 bytes_expected = sizeof(signal);
508 bytes_written = ::write(pipeFds_[1], &signal, bytes_expected);
509 }
510 } while (bytes_written == -1 && errno == EINTR);
511
512 if (bytes_written == ssize_t(bytes_expected)) {
513 signal_ = true;
514 } else {
515 folly::throwSystemError(
516 "failed to signal NotificationQueue after "
517 "write",
518 errno);
519 }
520 }
521
drainSignalsLocked()522 void drainSignalsLocked() {
523 ssize_t bytes_read = 0;
524 if (eventfd_ > 0) {
525 uint64_t message;
526 bytes_read = readNoInt(eventfd_, &message, sizeof(message));
527 CHECK(bytes_read != -1 || errno == EAGAIN);
528 } else {
529 // There should only be one byte in the pipe. To avoid potential leaks we
530 // still drain.
531 uint8_t message[32];
532 ssize_t result;
533 while ((result = readNoInt(pipeFds_[0], &message, sizeof(message))) !=
534 -1) {
535 bytes_read += result;
536 }
537 CHECK(result == -1 && errno == EAGAIN);
538 LOG_IF(ERROR, bytes_read > 1)
539 << "[NotificationQueue] Unexpected state while draining pipe: bytes_read="
540 << bytes_read << " bytes, expected <= 1";
541 }
542 LOG_IF(ERROR, (signal_ && bytes_read == 0) || (!signal_ && bytes_read > 0))
543 << "[NotificationQueue] Unexpected state while draining signals: signal_="
544 << signal_ << " bytes_read=" << bytes_read;
545
546 signal_ = false;
547 }
548
ensureSignal()549 void ensureSignal() const {
550 std::unique_lock<SpinLock> g(spinlock_);
551 ensureSignalLocked();
552 }
553
syncSignalAndQueue()554 void syncSignalAndQueue() {
555 std::unique_lock<SpinLock> g(spinlock_);
556
557 if (queue_.empty()) {
558 drainSignalsLocked();
559 } else {
560 ensureSignalLocked();
561 }
562 }
563
564 template <typename MessageTT>
565 bool putMessageImpl(MessageTT&& message, size_t maxSize, bool throws = true) {
566 checkPid();
567 bool signal = false;
568 {
569 auto data = std::make_unique<Node>(
570 std::forward<MessageTT>(message), RequestContext::saveContext());
571 std::unique_lock<SpinLock> g(spinlock_);
572 if (checkDraining(throws) || !checkQueueSize(maxSize, throws)) {
573 return false;
574 }
575 // We only need to signal an event if not all consumers are
576 // awake.
577 if (numActiveConsumers_ < numConsumers_) {
578 signal = true;
579 }
580 queue_.push_back(*data.release());
581 if (signal) {
582 ensureSignalLocked();
583 }
584 }
585 return true;
586 }
587
588 template <typename InputIteratorT>
putMessagesImpl(InputIteratorT first,InputIteratorT last,std::input_iterator_tag)589 void putMessagesImpl(
590 InputIteratorT first, InputIteratorT last, std::input_iterator_tag) {
591 checkPid();
592 bool signal = false;
593 boost::intrusive::slist<Node, boost::intrusive::cache_last<true>> q;
594 try {
595 while (first != last) {
596 auto data = std::make_unique<Node>(
597 std::move(*first), RequestContext::saveContext());
598 q.push_back(*data.release());
599 ++first;
600 }
601 std::unique_lock<SpinLock> g(spinlock_);
602 checkDraining();
603 queue_.splice(queue_.end(), q);
604 if (numActiveConsumers_ < numConsumers_) {
605 signal = true;
606 }
607 if (signal) {
608 ensureSignalLocked();
609 }
610 } catch (...) {
611 std::unique_ptr<Node> data;
612 while (!q.empty()) {
613 data.reset(&q.front());
614 q.pop_front();
615 }
616 throw;
617 }
618 }
619
checkPidFail()620 FOLLY_NOINLINE void checkPidFail() const {
621 folly::terminate_with<std::runtime_error>(
622 "Pid mismatch. Pid = " +
623 folly::to<std::string>(folly::get_cached_pid()) + ". Expecting " +
624 folly::to<std::string>(pid_));
625 }
626
627 mutable folly::SpinLock spinlock_;
628 mutable bool signal_{false};
629 int eventfd_;
630 int pipeFds_[2]; // to fallback to on older/non-linux systems
631 uint32_t advisoryMaxQueueSize_;
632 pid_t pid_;
633 boost::intrusive::slist<Node, boost::intrusive::cache_last<true>> queue_;
634 int numConsumers_{0};
635 std::atomic<int> numActiveConsumers_{0};
636 bool draining_{false};
637 };
638
639 template <typename MessageT>
destroy()640 void NotificationQueue<MessageT>::Consumer::destroy() {
641 // If we are in the middle of a call to handlerReady(), destroyedFlagPtr_
642 // will be non-nullptr. Mark the value that it points to, so that
643 // handlerReady() will know the callback is destroyed, and that it cannot
644 // access any member variables anymore.
645 if (destroyedFlagPtr_) {
646 *destroyedFlagPtr_ = true;
647 }
648 stopConsuming();
649 DelayedDestruction::destroy();
650 }
651
652 template <typename MessageT>
handlerReady(uint16_t)653 void NotificationQueue<MessageT>::Consumer::handlerReady(
654 uint16_t /*events*/) noexcept {
655 consumeMessages(false);
656 }
657
658 template <typename MessageT>
consumeMessages(bool isDrain,size_t * numConsumed)659 void NotificationQueue<MessageT>::Consumer::consumeMessages(
660 bool isDrain, size_t* numConsumed) noexcept {
661 DestructorGuard dg(this);
662 uint32_t numProcessed = 0;
663 setActive(true);
664 SCOPE_EXIT {
665 if (queue_) {
666 queue_->syncSignalAndQueue();
667 }
668 };
669 SCOPE_EXIT { setActive(false, /* shouldLock = */ true); };
670 SCOPE_EXIT {
671 if (numConsumed != nullptr) {
672 *numConsumed = numProcessed;
673 }
674 };
675 while (true) {
676 // Now pop the message off of the queue.
677 //
678 // We have to manually acquire and release the spinlock here, rather than
679 // using SpinLockHolder since the MessageT has to be constructed while
680 // holding the spinlock and available after we release it. SpinLockHolder
681 // unfortunately doesn't provide a release() method. (We can't construct
682 // MessageT first since we have no guarantee that MessageT has a default
683 // constructor.
684 queue_->spinlock_.lock();
685 bool locked = true;
686
687 try {
688 if (UNLIKELY(queue_->queue_.empty())) {
689 // If there is no message, we've reached the end of the queue, return.
690 setActive(false);
691 queue_->spinlock_.unlock();
692 return;
693 }
694
695 // Pull a message off the queue.
696 std::unique_ptr<Node> data;
697 data.reset(&queue_->queue_.front());
698 queue_->queue_.pop_front();
699
700 // Check to see if the queue is empty now.
701 // We use this as an optimization to see if we should bother trying to
702 // loop again and read another message after invoking this callback.
703 bool wasEmpty = queue_->queue_.empty();
704 if (wasEmpty) {
705 setActive(false);
706 }
707
708 // Now unlock the spinlock before we invoke the callback.
709 queue_->spinlock_.unlock();
710 RequestContextScopeGuard rctx(std::move(data->ctx_));
711
712 locked = false;
713
714 // Call the callback
715 bool callbackDestroyed = false;
716 CHECK(destroyedFlagPtr_ == nullptr);
717 destroyedFlagPtr_ = &callbackDestroyed;
718 messageAvailable(std::move(data->msg_));
719 destroyedFlagPtr_ = nullptr;
720
721 // Make sure message destructor is called with the correct RequestContext.
722 data.reset();
723
724 // If the callback was destroyed before it returned, we are done
725 if (callbackDestroyed) {
726 return;
727 }
728
729 // If the callback is no longer installed, we are done.
730 if (queue_ == nullptr) {
731 return;
732 }
733
734 // If we have hit maxReadAtOnce_, we are done.
735 ++numProcessed;
736 if (!isDrain && maxReadAtOnce_ > 0 && numProcessed >= maxReadAtOnce_) {
737 return;
738 }
739
740 // If the queue was empty before we invoked the callback, it's probable
741 // that it is still empty now. Just go ahead and return, rather than
742 // looping again and trying to re-read from the eventfd. (If a new
743 // message had in fact arrived while we were invoking the callback, we
744 // will simply be woken up the next time around the event loop and will
745 // process the message then.)
746 if (wasEmpty) {
747 return;
748 }
749 } catch (const std::exception&) {
750 // This catch block is really just to handle the case where the MessageT
751 // constructor throws. The messageAvailable() callback itself is
752 // declared as noexcept and should never throw.
753 //
754 // If the MessageT constructor does throw we try to handle it as best as
755 // we can, but we can't work miracles. We will just ignore the error for
756 // now and return. The next time around the event loop we will end up
757 // trying to read the message again. If MessageT continues to throw we
758 // will never make forward progress and will keep trying each time around
759 // the event loop.
760 if (locked) {
761 // Unlock the spinlock.
762 queue_->spinlock_.unlock();
763 }
764
765 return;
766 }
767 }
768 }
769
770 template <typename MessageT>
init(EventBase * eventBase,NotificationQueue * queue)771 void NotificationQueue<MessageT>::Consumer::init(
772 EventBase* eventBase, NotificationQueue* queue) {
773 eventBase->dcheckIsInEventBaseThread();
774 assert(queue_ == nullptr);
775 assert(!isHandlerRegistered());
776 queue->checkPid();
777
778 base_ = eventBase;
779
780 queue_ = queue;
781
782 {
783 std::unique_lock<SpinLock> g(queue_->spinlock_);
784 queue_->numConsumers_++;
785 }
786 queue_->ensureSignal();
787
788 if (queue_->eventfd_ >= 0) {
789 initHandler(eventBase, folly::NetworkSocket::fromFd(queue_->eventfd_));
790 } else {
791 initHandler(eventBase, folly::NetworkSocket::fromFd(queue_->pipeFds_[0]));
792 }
793 }
794
795 template <typename MessageT>
stopConsuming()796 void NotificationQueue<MessageT>::Consumer::stopConsuming() {
797 if (queue_ == nullptr) {
798 assert(!isHandlerRegistered());
799 return;
800 }
801
802 {
803 std::unique_lock<SpinLock> g(queue_->spinlock_);
804 queue_->numConsumers_--;
805 setActive(false);
806 }
807
808 assert(isHandlerRegistered());
809 unregisterHandler();
810 detachEventBase();
811 queue_ = nullptr;
812 }
813
814 template <typename MessageT>
consumeUntilDrained(size_t * numConsumed)815 bool NotificationQueue<MessageT>::Consumer::consumeUntilDrained(
816 size_t* numConsumed) noexcept {
817 DestructorGuard dg(this);
818 {
819 std::unique_lock<SpinLock> g(queue_->spinlock_);
820 if (queue_->draining_) {
821 return false;
822 }
823 queue_->draining_ = true;
824 }
825 consumeMessages(true, numConsumed);
826 {
827 std::unique_lock<SpinLock> g(queue_->spinlock_);
828 queue_->draining_ = false;
829 }
830 return true;
831 }
832
833 template <typename MessageT>
834 template <typename F>
consume(F && foreach)835 void NotificationQueue<MessageT>::SimpleConsumer::consume(F&& foreach) {
836 SCOPE_EXIT { queue_.syncSignalAndQueue(); };
837
838 queue_.checkPid();
839
840 std::unique_ptr<Node> data;
841 {
842 std::unique_lock<SpinLock> g(queue_.spinlock_);
843
844 if (UNLIKELY(queue_.queue_.empty())) {
845 return;
846 }
847
848 data.reset(&queue_.queue_.front());
849 queue_.queue_.pop_front();
850 }
851
852 RequestContextScopeGuard rctx(std::move(data->ctx_));
853 foreach(std::move(data->msg_));
854 // Make sure message destructor is called with the correct RequestContext.
855 data.reset();
856 }
857
858 /**
859 * Creates a NotificationQueue::Consumer wrapping a function object
860 * Modeled after AsyncTimeout::make
861 *
862 */
863
864 namespace detail {
865
866 template <typename MessageT, typename TCallback>
867 struct notification_queue_consumer_wrapper
868 : public NotificationQueue<MessageT>::Consumer {
869 template <typename UCallback>
notification_queue_consumer_wrappernotification_queue_consumer_wrapper870 explicit notification_queue_consumer_wrapper(UCallback&& callback)
871 : callback_(std::forward<UCallback>(callback)) {}
872
873 // we are being stricter here and requiring noexcept for callback
messageAvailablenotification_queue_consumer_wrapper874 void messageAvailable(MessageT&& message) noexcept override {
875 static_assert(
876 noexcept(std::declval<TCallback>()(std::forward<MessageT>(message))),
877 "callback must be declared noexcept, e.g.: `[]() noexcept {}`");
878
879 callback_(std::forward<MessageT>(message));
880 }
881
882 private:
883 TCallback callback_;
884 };
885
886 } // namespace detail
887
888 template <typename MessageT>
889 template <typename TCallback>
890 auto NotificationQueue<MessageT>::Consumer::make(TCallback&& callback)
891 -> UniquePtr {
892 using CB = typename std::decay<TCallback>::type;
893 using W = detail::notification_queue_consumer_wrapper<MessageT, CB>;
894 return makeDelayedDestructionUniquePtr<W>(std::forward<TCallback>(callback));
895 }
896
897 } // namespace folly
898