1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #pragma once
7 
8 #include <condition_variable>
9 #include <mutex>
10 #include <queue>
11 #include <utility>
12 
13 namespace ROCKSDB_NAMESPACE {
14 
15 template <class T>
16 class channel {
17  public:
channel()18   explicit channel() : eof_(false) {}
19 
20   channel(const channel&) = delete;
21   void operator=(const channel&) = delete;
22 
sendEof()23   void sendEof() {
24     std::lock_guard<std::mutex> lk(lock_);
25     eof_ = true;
26     cv_.notify_all();
27   }
28 
eof()29   bool eof() {
30     std::lock_guard<std::mutex> lk(lock_);
31     return buffer_.empty() && eof_;
32   }
33 
size()34   size_t size() const {
35     std::lock_guard<std::mutex> lk(lock_);
36     return buffer_.size();
37   }
38 
39   // writes elem to the queue
write(T && elem)40   void write(T&& elem) {
41     std::unique_lock<std::mutex> lk(lock_);
42     buffer_.emplace(std::forward<T>(elem));
43     cv_.notify_one();
44   }
45 
46   /// Moves a dequeued element onto elem, blocking until an element
47   /// is available.
48   // returns false if EOF
read(T & elem)49   bool read(T& elem) {
50     std::unique_lock<std::mutex> lk(lock_);
51     cv_.wait(lk, [&] { return eof_ || !buffer_.empty(); });
52     if (eof_ && buffer_.empty()) {
53       return false;
54     }
55     elem = std::move(buffer_.front());
56     buffer_.pop();
57     cv_.notify_one();
58     return true;
59   }
60 
61  private:
62   std::condition_variable cv_;
63   mutable std::mutex lock_;
64   std::queue<T> buffer_;
65   bool eof_;
66 };
67 }  // namespace ROCKSDB_NAMESPACE
68