1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 2 // This source code is licensed under both the GPLv2 (found in the 3 // COPYING file in the root directory) and Apache 2.0 License 4 // (found in the LICENSE.Apache file in the root directory). 5 6 #pragma once 7 8 #include <condition_variable> 9 #include <mutex> 10 #include <queue> 11 #include <utility> 12 13 namespace ROCKSDB_NAMESPACE { 14 15 template <class T> 16 class channel { 17 public: channel()18 explicit channel() : eof_(false) {} 19 20 channel(const channel&) = delete; 21 void operator=(const channel&) = delete; 22 sendEof()23 void sendEof() { 24 std::lock_guard<std::mutex> lk(lock_); 25 eof_ = true; 26 cv_.notify_all(); 27 } 28 eof()29 bool eof() { 30 std::lock_guard<std::mutex> lk(lock_); 31 return buffer_.empty() && eof_; 32 } 33 size()34 size_t size() const { 35 std::lock_guard<std::mutex> lk(lock_); 36 return buffer_.size(); 37 } 38 39 // writes elem to the queue write(T && elem)40 void write(T&& elem) { 41 std::unique_lock<std::mutex> lk(lock_); 42 buffer_.emplace(std::forward<T>(elem)); 43 cv_.notify_one(); 44 } 45 46 /// Moves a dequeued element onto elem, blocking until an element 47 /// is available. 48 // returns false if EOF read(T & elem)49 bool read(T& elem) { 50 std::unique_lock<std::mutex> lk(lock_); 51 cv_.wait(lk, [&] { return eof_ || !buffer_.empty(); }); 52 if (eof_ && buffer_.empty()) { 53 return false; 54 } 55 elem = std::move(buffer_.front()); 56 buffer_.pop(); 57 cv_.notify_one(); 58 return true; 59 } 60 61 private: 62 std::condition_variable cv_; 63 mutable std::mutex lock_; 64 std::queue<T> buffer_; 65 bool eof_; 66 }; 67 } // namespace ROCKSDB_NAMESPACE 68