1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
#include <sys/types.h>

#include <algorithm>
#include <atomic>
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <cstdint>
#include <iterator>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <string>
#include <type_traits>
#include <utility>

#include <boost/intrusive/slist.hpp>
#include <glog/logging.h>

#include <folly/Exception.h>
#include <folly/FileUtil.h>
#include <folly/Likely.h>
#include <folly/ScopeGuard.h>
#include <folly/SpinLock.h>
#include <folly/io/async/DelayedDestruction.h>
#include <folly/io/async/EventBase.h>
#include <folly/io/async/EventHandler.h>
#include <folly/io/async/Request.h>
#include <folly/portability/Fcntl.h>
#include <folly/portability/Sockets.h>
#include <folly/portability/Unistd.h>
#include <folly/system/Pid.h>

#if __has_include(<sys/eventfd.h>)
#include <sys/eventfd.h>
#endif
47 
48 namespace folly {
49 
50 /**
51  * A producer-consumer queue for passing messages between EventBase threads.
52  *
53  * Messages can be added to the queue from any thread.  Multiple consumers may
54  * listen to the queue from multiple EventBase threads.
55  *
56  * A NotificationQueue may not be destroyed while there are still consumers
57  * registered to receive events from the queue.  It is the user's
58  * responsibility to ensure that all consumers are unregistered before the
59  * queue is destroyed.
60  *
 * MessageT should be MoveConstructible (i.e., must support either a move
 * constructor or a copy constructor, or both).  Ideally its move constructor
 * (or copy constructor if no move constructor is provided) should never throw
 * exceptions.  If the constructor may throw, the consumers could end up
 * spinning trying to move a message off the queue and failing, and then
 * retrying.
67  */
68 template <typename MessageT>
69 class NotificationQueue {
70   struct Node : public boost::intrusive::slist_base_hook<
71                     boost::intrusive::cache_last<true>> {
72     template <typename MessageTT>
NodeNode73     Node(MessageTT&& msg, std::shared_ptr<RequestContext> ctx)
74         : msg_(std::forward<MessageTT>(msg)), ctx_(std::move(ctx)) {}
75     MessageT msg_;
76     std::shared_ptr<RequestContext> ctx_;
77   };
78 
79  public:
80   /**
81    * A callback interface for consuming messages from the queue as they arrive.
82    */
83   class Consumer : public DelayedDestruction, private EventHandler {
84    public:
85     using UniquePtr = DelayedDestructionUniquePtr<Consumer>;
86 
87     enum : uint16_t { kDefaultMaxReadAtOnce = 10 };
88 
Consumer()89     Consumer()
90         : queue_(nullptr),
91           destroyedFlagPtr_(nullptr),
92           maxReadAtOnce_(kDefaultMaxReadAtOnce) {}
93 
94     // create a consumer in-place, without the need to build new class
95     template <typename TCallback>
96     static UniquePtr make(TCallback&& callback);
97 
98     /**
99      * messageAvailable() will be invoked whenever a new
100      * message is available from the pipe.
101      */
102     virtual void messageAvailable(MessageT&& message) noexcept = 0;
103 
104     /**
105      * Begin consuming messages from the specified queue.
106      *
107      * messageAvailable() will be called whenever a message is available.  This
108      * consumer will continue to consume messages until stopConsuming() is
109      * called.
110      *
111      * A Consumer may only consume messages from a single NotificationQueue at
112      * a time.  startConsuming() should not be called if this consumer is
113      * already consuming.
114      */
    void startConsuming(EventBase* eventBase, NotificationQueue* queue) {
      // init() must run first: it records the queue/event base, bumps the
      // queue's consumer count and binds the handler to the queue's fd.
      init(eventBase, queue);
      // Register for read events on that fd (PERSIST presumably keeps the
      // registration alive across wakeups -- see EventHandler).
      registerHandler(READ | PERSIST);
    }
119 
120     /**
121      * Same as above but registers this event handler as internal so that it
122      * doesn't count towards the pending reader count for the IOLoop.
123      */
    void startConsumingInternal(
        EventBase* eventBase, NotificationQueue* queue) {
      // Same attachment steps as startConsuming(); only the registration call
      // differs.
      init(eventBase, queue);
      registerInternalHandler(READ | PERSIST);
    }
129 
130     /**
131      * Stop consuming messages.
132      *
133      * startConsuming() may be called again to resume consumption of messages
134      * at a later point in time.
135      */
136     void stopConsuming();
137 
138     /**
139      * Consume messages off the queue until it is empty. No messages may be
140      * added to the queue while it is draining, so that the process is bounded.
141      * To that end, putMessage/tryPutMessage will throw an std::runtime_error,
142      * and tryPutMessageNoThrow will return false.
143      *
144      * @returns true if the queue was drained, false otherwise. In practice,
145      * this will only fail if someone else is already draining the queue.
146      */
147     bool consumeUntilDrained(size_t* numConsumed = nullptr) noexcept;
148 
149     /**
150      * Get the NotificationQueue that this consumer is currently consuming
151      * messages from.  Returns nullptr if the consumer is not currently
152      * consuming events from any queue.
153      */
    // Plain read of queue_, which init() sets and stopConsuming() resets to
    // nullptr.
    NotificationQueue* getCurrentQueue() const { return queue_; }
155 
156     /**
157      * Set a limit on how many messages this consumer will read each iteration
158      * around the event loop.
159      *
160      * This helps rate-limit how much work the Consumer will do each event loop
161      * iteration, to prevent it from starving other event handlers.
162      *
163      * A limit of 0 means no limit will be enforced.  If unset, the limit
164      * defaults to kDefaultMaxReadAtOnce (defined to 10 above).
165      */
    void setMaxReadAtOnce(uint32_t maxAtOnce) { maxReadAtOnce_ = maxAtOnce; }
    uint32_t getMaxReadAtOnce() const { return maxReadAtOnce_; }

    // Returns base_, which init() assigns from the EventBase passed to
    // startConsuming() / startConsumingInternal().
    EventBase* getEventBase() { return base_; }
170 
171     void handlerReady(uint16_t events) noexcept override;
172 
173    protected:
174     void destroy() override;
175 
~Consumer()176     ~Consumer() override {}
177 
178    private:
179     /**
180      * Consume messages off the queue until
181      *   - the queue is empty (1), or
182      *   - until the consumer is destroyed, or
183      *   - until the consumer is uninstalled, or
184      *   - an exception is thrown in the course of dequeueing, or
185      *   - unless isDrain is true, until the maxReadAtOnce_ limit is hit
186      *
187      * (1) Well, maybe. See logic/comments around "wasEmpty" in implementation.
188      */
189     void consumeMessages(bool isDrain, size_t* numConsumed = nullptr) noexcept;
190 
191     void setActive(bool active, bool shouldLock = false) {
192       if (!queue_) {
193         active_ = active;
194         return;
195       }
196       if (shouldLock) {
197         queue_->spinlock_.lock();
198       }
199       if (!active_ && active) {
200         ++queue_->numActiveConsumers_;
201       } else if (active_ && !active) {
202         --queue_->numActiveConsumers_;
203       }
204       active_ = active;
205       if (shouldLock) {
206         queue_->spinlock_.unlock();
207       }
208     }
209     void init(EventBase* eventBase, NotificationQueue* queue);
210 
    // Queue this consumer is attached to; set by init(), reset to nullptr by
    // stopConsuming().
    NotificationQueue* queue_;
    // Non-null only while consumeMessages() is inside messageAvailable();
    // destroy() sets the pointed-to flag so the loop knows to stop.
    bool* destroyedFlagPtr_;
    // Per-wakeup message limit used by consumeMessages(); 0 disables it.
    uint32_t maxReadAtOnce_;
    // EventBase passed to init(); returned by getEventBase().
    EventBase* base_;
    // Mirrors this consumer's contribution to queue_->numActiveConsumers_
    // (maintained by setActive()).
    bool active_{false};
216   };
217 
218   class SimpleConsumer {
219    public:
SimpleConsumer(NotificationQueue & queue)220     explicit SimpleConsumer(NotificationQueue& queue) : queue_(queue) {
221       ++queue_.numConsumers_;
222     }
223 
~SimpleConsumer()224     ~SimpleConsumer() { --queue_.numConsumers_; }
225 
getFd()226     int getFd() const {
227       return queue_.eventfd_ >= 0 ? queue_.eventfd_ : queue_.pipeFds_[0];
228     }
229 
230     template <typename F>
231     void consume(F&& f);
232 
233    private:
234     NotificationQueue& queue_;
235   };
236 
  // Kind of file descriptor used to wake consumers up.
  enum class FdType {
    // A pipe pair (pipeFds_): one byte is written to signal, reads drain it.
    PIPE = 1,
    // A Linux eventfd (eventfd_); the constructor falls back to PIPE when
    // eventfd is not available.
    EVENTFD,
  };
241 
242   /**
243    * Create a new NotificationQueue.
244    *
245    * If the maxSize parameter is specified, this sets the maximum queue size
246    * that will be enforced by tryPutMessage().  (This size is advisory, and may
247    * be exceeded if producers explicitly use putMessage() instead of
248    * tryPutMessage().)
249    *
   * The fdType parameter determines the type of file descriptor used
   * internally to signal message availability.  The default (eventfd) is
   * preferable for performance and because it won't fail when the queue gets
   * too long.  It is not available on older or non-Linux kernels, however;
   * in that case the code falls back to using a pipe.  The parameter exists
   * mostly for testing purposes.
256    */
257   explicit NotificationQueue(
258       uint32_t maxSize = 0, FdType fdType = FdType::EVENTFD)
259       : eventfd_(-1),
260         pipeFds_{-1, -1},
261         advisoryMaxQueueSize_(maxSize),
262         pid_(folly::get_cached_pid()) {
263 #if !__has_include(<sys/eventfd.h>)
264     if (fdType == FdType::EVENTFD) {
265       fdType = FdType::PIPE;
266     }
267 #endif
268 
269 #if __has_include(<sys/eventfd.h>)
270     if (fdType == FdType::EVENTFD) {
271       eventfd_ = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
272       if (eventfd_ == -1) {
273         if (errno == ENOSYS || errno == EINVAL) {
274           // eventfd not availalble
275           LOG(ERROR) << "failed to create eventfd for NotificationQueue: "
276                      << errno << ", falling back to pipe mode (is your kernel "
277                      << "> 2.6.30?)";
278           fdType = FdType::PIPE;
279         } else {
280           // some other error
281           folly::throwSystemError(
282               "Failed to create eventfd for "
283               "NotificationQueue",
284               errno);
285         }
286       }
287     }
288 #endif
289 
290     if (fdType == FdType::PIPE) {
291       if (pipe(pipeFds_)) {
292         folly::throwSystemError(
293             "Failed to create pipe for NotificationQueue", errno);
294       }
295       try {
296         // put both ends of the pipe into non-blocking mode
297         if (fcntl(pipeFds_[0], F_SETFL, O_RDONLY | O_NONBLOCK) != 0) {
298           folly::throwSystemError(
299               "failed to put NotificationQueue pipe read "
300               "endpoint into non-blocking mode",
301               errno);
302         }
303         if (fcntl(pipeFds_[1], F_SETFL, O_WRONLY | O_NONBLOCK) != 0) {
304           folly::throwSystemError(
305               "failed to put NotificationQueue pipe write "
306               "endpoint into non-blocking mode",
307               errno);
308         }
catch(...)309       } catch (...) {
310         ::close(pipeFds_[0]);
311         ::close(pipeFds_[1]);
312         throw;
313       }
314     }
315   }
316 
~NotificationQueue()317   ~NotificationQueue() {
318     std::unique_ptr<Node> data;
319     while (!queue_.empty()) {
320       data.reset(&queue_.front());
321       queue_.pop_front();
322     }
323     if (eventfd_ >= 0) {
324       ::close(eventfd_);
325       eventfd_ = -1;
326     }
327     if (pipeFds_[0] >= 0) {
328       ::close(pipeFds_[0]);
329       pipeFds_[0] = -1;
330     }
331     if (pipeFds_[1] >= 0) {
332       ::close(pipeFds_[1]);
333       pipeFds_[1] = -1;
334     }
335   }
336 
337   /**
338    * Set the advisory maximum queue size.
339    *
340    * This maximum queue size affects calls to tryPutMessage().  Message
341    * producers can still use the putMessage() call to unconditionally put a
342    * message on the queue, ignoring the configured maximum queue size.  This
343    * can cause the queue size to exceed the configured maximum.
344    */
  // Plain (unlocked) store.  A value of 0 disables the size check, because
  // checkQueueSize() only enforces limits greater than zero.
  void setMaxQueueSize(uint32_t max) { advisoryMaxQueueSize_ = max; }
346 
347   /**
348    * Attempt to put a message on the queue if the queue is not already full.
349    *
350    * If the queue is full, a std::overflow_error will be thrown.  The
351    * setMaxQueueSize() function controls the maximum queue size.
352    *
353    * If the queue is currently draining, an std::runtime_error will be thrown.
354    *
355    * This method may contend briefly on a spinlock if many threads are
356    * concurrently accessing the queue, but for all intents and purposes it will
357    * immediately place the message on the queue and return.
358    *
359    * tryPutMessage() may throw std::bad_alloc if memory allocation fails, and
360    * may throw any other exception thrown by the MessageT move/copy
361    * constructor.
362    */
  template <typename MessageTT>
  void tryPutMessage(MessageTT&& message) {
    // Uses the throwing mode of putMessageImpl() (throws defaults to true), so
    // the bool result carries no extra information and is dropped.
    putMessageImpl(std::forward<MessageTT>(message), advisoryMaxQueueSize_);
  }
367 
368   /**
369    * No-throw versions of the above.  Instead returns true on success, false on
370    * failure.
371    *
372    * Only std::overflow_error (the common exception case) and std::runtime_error
373    * (which indicates that the queue is being drained) are prevented from being
374    * thrown. User code must still catch std::bad_alloc errors.
375    */
  template <typename MessageTT>
  bool tryPutMessageNoThrow(MessageTT&& message) {
    // throws = false: a full or draining queue is reported as `false`.
    // Note that putMessageImpl() forwards `message` into a new node before it
    // performs those checks, so an rvalue argument may already have been
    // moved from when false is returned.
    return putMessageImpl(
        std::forward<MessageTT>(message), advisoryMaxQueueSize_, false);
  }
381 
382   /**
383    * Unconditionally put a message on the queue.
384    *
385    * This method is like tryPutMessage(), but ignores the maximum queue size
386    * and always puts the message on the queue, even if the maximum queue size
387    * would be exceeded.
388    *
   * putMessage() may throw
   *   - std::bad_alloc if memory allocation fails,
   *   - std::runtime_error if the queue is currently draining, or
   *   - any other exception thrown by the MessageT move/copy constructor.
393    */
  template <typename MessageTT>
  void putMessage(MessageTT&& message) {
    // A maxSize of 0 makes checkQueueSize() skip the limit entirely.
    putMessageImpl(std::forward<MessageTT>(message), 0);
  }
398 
399   /**
400    * Put several messages on the queue.
401    */
402   template <typename InputIteratorT>
putMessages(InputIteratorT first,InputIteratorT last)403   void putMessages(InputIteratorT first, InputIteratorT last) {
404     typedef typename std::iterator_traits<InputIteratorT>::iterator_category
405         IterCategory;
406     putMessagesImpl(first, last, IterCategory());
407   }
408 
409   /**
410    * Try to immediately pull a message off of the queue, without blocking.
411    *
412    * If a message is immediately available, the result parameter will be
413    * updated to contain the message contents and true will be returned.
414    *
415    * If no message is available, false will be returned and result will be left
416    * unmodified.
417    */
tryConsume(MessageT & result)418   bool tryConsume(MessageT& result) {
419     SCOPE_EXIT { syncSignalAndQueue(); };
420 
421     checkPid();
422     std::unique_ptr<Node> data;
423 
424     {
425       std::unique_lock<SpinLock> g(spinlock_);
426 
427       if (UNLIKELY(queue_.empty())) {
428         return false;
429       }
430 
431       data.reset(&queue_.front());
432       queue_.pop_front();
433     }
434 
435     result = std::move(data->msg_);
436     RequestContext::setContext(std::move(data->ctx_));
437 
438     return true;
439   }
440 
size()441   size_t size() const {
442     std::unique_lock<SpinLock> g(spinlock_);
443     return queue_.size();
444   }
445 
446   /**
447    * Check that the NotificationQueue is being used from the correct process.
448    *
449    * If you create a NotificationQueue in one process, then fork, and try to
450    * send messages to the queue from the child process, you're going to have a
451    * bad time.  Unfortunately users have (accidentally) run into this.
452    *
453    * Because we use an eventfd/pipe, the child process can actually signal the
454    * parent process that an event is ready.  However, it can't put anything on
455    * the parent's queue, so the parent wakes up and finds an empty queue.  This
456    * check ensures that we catch the problem in the misbehaving child process
457    * code, and crash before signalling the parent process.
458    */
checkPid()459   void checkPid() const {
460     if (FOLLY_UNLIKELY(pid_ != folly::get_cached_pid())) {
461       checkPidFail();
462     }
463   }
464 
465  private:
466   // Forbidden copy constructor and assignment operator
467   NotificationQueue(NotificationQueue const&) = delete;
468   NotificationQueue& operator=(NotificationQueue const&) = delete;
469 
470   inline bool checkQueueSize(size_t maxSize, bool throws = true) const {
471     DCHECK(0 == spinlock_.try_lock());
472     if (maxSize > 0 && queue_.size() >= maxSize) {
473       if (throws) {
474         throw std::overflow_error(
475             "unable to add message to NotificationQueue: "
476             "queue is full");
477       }
478       return false;
479     }
480     return true;
481   }
482 
483   inline bool checkDraining(bool throws = true) {
484     if (UNLIKELY(draining_ && throws)) {
485       throw std::runtime_error("queue is draining, cannot add message");
486     }
487     return draining_;
488   }
489 
ensureSignalLocked()490   void ensureSignalLocked() const {
491     // semantics: empty fd == empty queue <=> !signal_
492     if (signal_) {
493       return;
494     }
495 
496     ssize_t bytes_written = 0;
497     size_t bytes_expected = 0;
498 
499     do {
500       if (eventfd_ >= 0) {
501         // eventfd(2) dictates that we must write a 64-bit integer
502         uint64_t signal = 1;
503         bytes_expected = sizeof(signal);
504         bytes_written = ::write(eventfd_, &signal, bytes_expected);
505       } else {
506         uint8_t signal = 1;
507         bytes_expected = sizeof(signal);
508         bytes_written = ::write(pipeFds_[1], &signal, bytes_expected);
509       }
510     } while (bytes_written == -1 && errno == EINTR);
511 
512     if (bytes_written == ssize_t(bytes_expected)) {
513       signal_ = true;
514     } else {
515       folly::throwSystemError(
516           "failed to signal NotificationQueue after "
517           "write",
518           errno);
519     }
520   }
521 
drainSignalsLocked()522   void drainSignalsLocked() {
523     ssize_t bytes_read = 0;
524     if (eventfd_ > 0) {
525       uint64_t message;
526       bytes_read = readNoInt(eventfd_, &message, sizeof(message));
527       CHECK(bytes_read != -1 || errno == EAGAIN);
528     } else {
529       // There should only be one byte in the pipe. To avoid potential leaks we
530       // still drain.
531       uint8_t message[32];
532       ssize_t result;
533       while ((result = readNoInt(pipeFds_[0], &message, sizeof(message))) !=
534              -1) {
535         bytes_read += result;
536       }
537       CHECK(result == -1 && errno == EAGAIN);
538       LOG_IF(ERROR, bytes_read > 1)
539           << "[NotificationQueue] Unexpected state while draining pipe: bytes_read="
540           << bytes_read << " bytes, expected <= 1";
541     }
542     LOG_IF(ERROR, (signal_ && bytes_read == 0) || (!signal_ && bytes_read > 0))
543         << "[NotificationQueue] Unexpected state while draining signals: signal_="
544         << signal_ << " bytes_read=" << bytes_read;
545 
546     signal_ = false;
547   }
548 
ensureSignal()549   void ensureSignal() const {
550     std::unique_lock<SpinLock> g(spinlock_);
551     ensureSignalLocked();
552   }
553 
syncSignalAndQueue()554   void syncSignalAndQueue() {
555     std::unique_lock<SpinLock> g(spinlock_);
556 
557     if (queue_.empty()) {
558       drainSignalsLocked();
559     } else {
560       ensureSignalLocked();
561     }
562   }
563 
564   template <typename MessageTT>
565   bool putMessageImpl(MessageTT&& message, size_t maxSize, bool throws = true) {
566     checkPid();
567     bool signal = false;
568     {
569       auto data = std::make_unique<Node>(
570           std::forward<MessageTT>(message), RequestContext::saveContext());
571       std::unique_lock<SpinLock> g(spinlock_);
572       if (checkDraining(throws) || !checkQueueSize(maxSize, throws)) {
573         return false;
574       }
575       // We only need to signal an event if not all consumers are
576       // awake.
577       if (numActiveConsumers_ < numConsumers_) {
578         signal = true;
579       }
580       queue_.push_back(*data.release());
581       if (signal) {
582         ensureSignalLocked();
583       }
584     }
585     return true;
586   }
587 
588   template <typename InputIteratorT>
putMessagesImpl(InputIteratorT first,InputIteratorT last,std::input_iterator_tag)589   void putMessagesImpl(
590       InputIteratorT first, InputIteratorT last, std::input_iterator_tag) {
591     checkPid();
592     bool signal = false;
593     boost::intrusive::slist<Node, boost::intrusive::cache_last<true>> q;
594     try {
595       while (first != last) {
596         auto data = std::make_unique<Node>(
597             std::move(*first), RequestContext::saveContext());
598         q.push_back(*data.release());
599         ++first;
600       }
601       std::unique_lock<SpinLock> g(spinlock_);
602       checkDraining();
603       queue_.splice(queue_.end(), q);
604       if (numActiveConsumers_ < numConsumers_) {
605         signal = true;
606       }
607       if (signal) {
608         ensureSignalLocked();
609       }
610     } catch (...) {
611       std::unique_ptr<Node> data;
612       while (!q.empty()) {
613         data.reset(&q.front());
614         q.pop_front();
615       }
616       throw;
617     }
618   }
619 
checkPidFail()620   FOLLY_NOINLINE void checkPidFail() const {
621     folly::terminate_with<std::runtime_error>(
622         "Pid mismatch. Pid = " +
623         folly::to<std::string>(folly::get_cached_pid()) + ". Expecting " +
624         folly::to<std::string>(pid_));
625   }
626 
  // Guards queue_, signal_, draining_ and the consumer counts in the member
  // functions of this class; mutable so const members (size(),
  // ensureSignal()) can lock it.
  mutable folly::SpinLock spinlock_;
  // True once a wakeup token has been written to the fd (ensureSignalLocked)
  // and not yet drained (drainSignalsLocked).
  mutable bool signal_{false};
  // eventfd descriptor, or -1 when pipe mode is in use.
  int eventfd_;
  int pipeFds_[2]; // to fallback to on older/non-linux systems
  // Limit applied by tryPutMessage*(); 0 means unlimited.
  uint32_t advisoryMaxQueueSize_;
  // Pid recorded at construction; compared against the current pid by
  // checkPid().
  pid_t pid_;
  // Pending messages, oldest first (push_back to add, pop_front to consume).
  boost::intrusive::slist<Node, boost::intrusive::cache_last<true>> queue_;
  // Number of registered consumers (Consumer and SimpleConsumer).
  int numConsumers_{0};
  // Number of consumers currently marked active via Consumer::setActive().
  std::atomic<int> numActiveConsumers_{0};
  // Set while Consumer::consumeUntilDrained() runs; producers are rejected
  // while it is true (see checkDraining()).
  bool draining_{false};
637 };
638 
639 template <typename MessageT>
destroy()640 void NotificationQueue<MessageT>::Consumer::destroy() {
641   // If we are in the middle of a call to handlerReady(), destroyedFlagPtr_
642   // will be non-nullptr.  Mark the value that it points to, so that
643   // handlerReady() will know the callback is destroyed, and that it cannot
644   // access any member variables anymore.
645   if (destroyedFlagPtr_) {
646     *destroyedFlagPtr_ = true;
647   }
648   stopConsuming();
649   DelayedDestruction::destroy();
650 }
651 
// EventHandler callback for the queue's notification fd.  The event mask is
// ignored; every wakeup runs one bounded (non-drain) consumeMessages() pass.
template <typename MessageT>
void NotificationQueue<MessageT>::Consumer::handlerReady(
    uint16_t /*events*/) noexcept {
  consumeMessages(false);
}
657 
658 template <typename MessageT>
consumeMessages(bool isDrain,size_t * numConsumed)659 void NotificationQueue<MessageT>::Consumer::consumeMessages(
660     bool isDrain, size_t* numConsumed) noexcept {
661   DestructorGuard dg(this);
662   uint32_t numProcessed = 0;
663   setActive(true);
664   SCOPE_EXIT {
665     if (queue_) {
666       queue_->syncSignalAndQueue();
667     }
668   };
669   SCOPE_EXIT { setActive(false, /* shouldLock = */ true); };
670   SCOPE_EXIT {
671     if (numConsumed != nullptr) {
672       *numConsumed = numProcessed;
673     }
674   };
675   while (true) {
676     // Now pop the message off of the queue.
677     //
678     // We have to manually acquire and release the spinlock here, rather than
679     // using SpinLockHolder since the MessageT has to be constructed while
680     // holding the spinlock and available after we release it.  SpinLockHolder
681     // unfortunately doesn't provide a release() method.  (We can't construct
682     // MessageT first since we have no guarantee that MessageT has a default
683     // constructor.
684     queue_->spinlock_.lock();
685     bool locked = true;
686 
687     try {
688       if (UNLIKELY(queue_->queue_.empty())) {
689         // If there is no message, we've reached the end of the queue, return.
690         setActive(false);
691         queue_->spinlock_.unlock();
692         return;
693       }
694 
695       // Pull a message off the queue.
696       std::unique_ptr<Node> data;
697       data.reset(&queue_->queue_.front());
698       queue_->queue_.pop_front();
699 
700       // Check to see if the queue is empty now.
701       // We use this as an optimization to see if we should bother trying to
702       // loop again and read another message after invoking this callback.
703       bool wasEmpty = queue_->queue_.empty();
704       if (wasEmpty) {
705         setActive(false);
706       }
707 
708       // Now unlock the spinlock before we invoke the callback.
709       queue_->spinlock_.unlock();
710       RequestContextScopeGuard rctx(std::move(data->ctx_));
711 
712       locked = false;
713 
714       // Call the callback
715       bool callbackDestroyed = false;
716       CHECK(destroyedFlagPtr_ == nullptr);
717       destroyedFlagPtr_ = &callbackDestroyed;
718       messageAvailable(std::move(data->msg_));
719       destroyedFlagPtr_ = nullptr;
720 
721       // Make sure message destructor is called with the correct RequestContext.
722       data.reset();
723 
724       // If the callback was destroyed before it returned, we are done
725       if (callbackDestroyed) {
726         return;
727       }
728 
729       // If the callback is no longer installed, we are done.
730       if (queue_ == nullptr) {
731         return;
732       }
733 
734       // If we have hit maxReadAtOnce_, we are done.
735       ++numProcessed;
736       if (!isDrain && maxReadAtOnce_ > 0 && numProcessed >= maxReadAtOnce_) {
737         return;
738       }
739 
740       // If the queue was empty before we invoked the callback, it's probable
741       // that it is still empty now.  Just go ahead and return, rather than
742       // looping again and trying to re-read from the eventfd.  (If a new
743       // message had in fact arrived while we were invoking the callback, we
744       // will simply be woken up the next time around the event loop and will
745       // process the message then.)
746       if (wasEmpty) {
747         return;
748       }
749     } catch (const std::exception&) {
750       // This catch block is really just to handle the case where the MessageT
751       // constructor throws.  The messageAvailable() callback itself is
752       // declared as noexcept and should never throw.
753       //
754       // If the MessageT constructor does throw we try to handle it as best as
755       // we can, but we can't work miracles.  We will just ignore the error for
756       // now and return.  The next time around the event loop we will end up
757       // trying to read the message again.  If MessageT continues to throw we
758       // will never make forward progress and will keep trying each time around
759       // the event loop.
760       if (locked) {
761         // Unlock the spinlock.
762         queue_->spinlock_.unlock();
763       }
764 
765       return;
766     }
767   }
768 }
769 
770 template <typename MessageT>
init(EventBase * eventBase,NotificationQueue * queue)771 void NotificationQueue<MessageT>::Consumer::init(
772     EventBase* eventBase, NotificationQueue* queue) {
773   eventBase->dcheckIsInEventBaseThread();
774   assert(queue_ == nullptr);
775   assert(!isHandlerRegistered());
776   queue->checkPid();
777 
778   base_ = eventBase;
779 
780   queue_ = queue;
781 
782   {
783     std::unique_lock<SpinLock> g(queue_->spinlock_);
784     queue_->numConsumers_++;
785   }
786   queue_->ensureSignal();
787 
788   if (queue_->eventfd_ >= 0) {
789     initHandler(eventBase, folly::NetworkSocket::fromFd(queue_->eventfd_));
790   } else {
791     initHandler(eventBase, folly::NetworkSocket::fromFd(queue_->pipeFds_[0]));
792   }
793 }
794 
795 template <typename MessageT>
stopConsuming()796 void NotificationQueue<MessageT>::Consumer::stopConsuming() {
797   if (queue_ == nullptr) {
798     assert(!isHandlerRegistered());
799     return;
800   }
801 
802   {
803     std::unique_lock<SpinLock> g(queue_->spinlock_);
804     queue_->numConsumers_--;
805     setActive(false);
806   }
807 
808   assert(isHandlerRegistered());
809   unregisterHandler();
810   detachEventBase();
811   queue_ = nullptr;
812 }
813 
// Drain the attached queue: mark it as draining (so checkDraining() rejects
// producers), consume without the maxReadAtOnce_ limit, then clear the flag.
//
// Returns false without consuming anything if the queue is already marked as
// draining; returns true otherwise.  If numConsumed is non-null it receives
// the count reported by consumeMessages().
//
// Precondition: the consumer is attached (queue_ is dereferenced without a
// null check).
// NOTE(review): consumeMessages() returns early when the callback detaches or
// destroys this consumer, in which case queue_ is nullptr by the time the
// second locked section below dereferences it, and draining_ is never reset.
// Confirm whether callers can hit this, and how it should be handled.
template <typename MessageT>
bool NotificationQueue<MessageT>::Consumer::consumeUntilDrained(
    size_t* numConsumed) noexcept {
  DestructorGuard dg(this);
  {
    // Claim the drain; only one drainer at a time.
    std::unique_lock<SpinLock> g(queue_->spinlock_);
    if (queue_->draining_) {
      return false;
    }
    queue_->draining_ = true;
  }
  consumeMessages(true, numConsumed);
  {
    // Re-open the queue to producers.
    std::unique_lock<SpinLock> g(queue_->spinlock_);
    queue_->draining_ = false;
  }
  return true;
}
832 
833 template <typename MessageT>
834 template <typename F>
consume(F && foreach)835 void NotificationQueue<MessageT>::SimpleConsumer::consume(F&& foreach) {
836   SCOPE_EXIT { queue_.syncSignalAndQueue(); };
837 
838   queue_.checkPid();
839 
840   std::unique_ptr<Node> data;
841   {
842     std::unique_lock<SpinLock> g(queue_.spinlock_);
843 
844     if (UNLIKELY(queue_.queue_.empty())) {
845       return;
846     }
847 
848     data.reset(&queue_.queue_.front());
849     queue_.queue_.pop_front();
850   }
851 
852   RequestContextScopeGuard rctx(std::move(data->ctx_));
853   foreach(std::move(data->msg_));
854   // Make sure message destructor is called with the correct RequestContext.
855   data.reset();
856 }
857 
858 /**
859  * Creates a NotificationQueue::Consumer wrapping a function object
860  * Modeled after AsyncTimeout::make
861  *
862  */
863 
864 namespace detail {
865 
// Consumer subclass that stores a callable and forwards every message to it.
// Consumer::make() instantiates it with the decayed callback type.
template <typename MessageT, typename TCallback>
struct notification_queue_consumer_wrapper
    : public NotificationQueue<MessageT>::Consumer {
  // Stores the callable by perfect-forwarding it into callback_.
  template <typename UCallback>
  explicit notification_queue_consumer_wrapper(UCallback&& callback)
      : callback_(std::forward<UCallback>(callback)) {}

  // we are being stricter here and requiring noexcept for callback
  void messageAvailable(MessageT&& message) noexcept override {
    // Compile-time check: calling the callback with the message must itself
    // be a noexcept expression.
    static_assert(
        noexcept(std::declval<TCallback>()(std::forward<MessageT>(message))),
        "callback must be declared noexcept, e.g.: `[]() noexcept {}`");

    callback_(std::forward<MessageT>(message));
  }

 private:
  TCallback callback_;
};
885 
886 } // namespace detail
887 
888 template <typename MessageT>
889 template <typename TCallback>
890 auto NotificationQueue<MessageT>::Consumer::make(TCallback&& callback)
891     -> UniquePtr {
892   using CB = typename std::decay<TCallback>::type;
893   using W = detail::notification_queue_consumer_wrapper<MessageT, CB>;
894   return makeDelayedDestructionUniquePtr<W>(std::forward<TCallback>(callback));
895 }
896 
897 } // namespace folly
898