1 /* 2 Copyright (c) DataStax, Inc. 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #ifndef DATASTAX_INTERNAL_STREAM_MANAGER_HPP 18 #define DATASTAX_INTERNAL_STREAM_MANAGER_HPP 19 20 #include "constants.hpp" 21 #include "dense_hash_map.hpp" 22 #include "macros.hpp" 23 #include "scoped_ptr.hpp" 24 25 #include <assert.h> 26 #include <stdint.h> 27 #include <string.h> 28 29 #if defined(_MSC_VER) 30 #include <intrin.h> 31 #endif 32 33 namespace datastax { namespace internal { namespace core { 34 35 struct StreamHash { operator ()datastax::internal::core::StreamHash36 std::size_t operator()(int stream) const { return ((stream & 0x3F) << 10) | (stream >> 6); } 37 }; 38 39 template <class T> 40 class StreamManager { 41 public: StreamManager()42 StreamManager() 43 : max_streams_(CASS_MAX_STREAMS) 44 , num_words_(max_streams_ / NUM_BITS_PER_WORD) 45 , offset_(0) 46 , words_(num_words_, ~static_cast<word_t>(0)) { 47 // Client request stream IDs are always positive values so it's 48 // safe to use negative values for the empty and deleted keys. 49 pending_.set_empty_key(-1); 50 pending_.set_deleted_key(-2); 51 pending_.max_load_factor(0.4f); 52 } 53 acquire(const T & item)54 int acquire(const T& item) { 55 int stream = acquire_stream(); 56 if (stream < 0) return -1; 57 pending_.insert(std::pair<int, T>(stream, item)); 58 return stream; 59 } 60 release(int stream)61 void release(int stream) { 62 assert(stream >= 0 && static_cast<size_t>(stream) < max_streams_); 63 assert(pending_.count(stream) > 0); 64 pending_.erase(stream); 65 release_stream(stream); 66 } 67 get(int stream,T & output)68 bool get(int stream, T& output) { 69 typename PendingMap::iterator i = pending_.find(stream); 70 if (i != pending_.end()) { 71 output = i->second; 72 return true; 73 } 74 return false; 75 } 76 available_streams() const77 size_t available_streams() const { return max_streams_ - pending_.size(); } pending_streams() const78 size_t pending_streams() const { return pending_.size(); } max_streams() const79 size_t max_streams() const { return max_streams_; } 80 81 private: 82 typedef DenseHashMap<int, T, StreamHash> PendingMap; 83 84 #if defined(_MSC_VER) && defined(_M_AMD64) 85 typedef __int64 word_t; 86 #else 87 typedef unsigned long word_t; 88 #endif 89 90 static const size_t NUM_BITS_PER_WORD = sizeof(word_t) * 8; 91 count_trailing_zeros(word_t word)92 static inline int count_trailing_zeros(word_t word) { 93 #if defined(__GNUC__) 94 return __builtin_ctzl(word); 95 #elif defined(_MSC_VER) 96 unsigned long index; 97 #if defined(_M_AMD64) 98 _BitScanForward64(&index, word); 99 100 #else 101 _BitScanForward(&index, word); 102 #endif 103 return static_cast<int>(index); 104 #else 105 #endif 106 } 107 108 private: acquire_stream()109 int acquire_stream() { 110 const size_t offset = offset_; 111 const size_t num_words = num_words_; 112 113 ++offset_; 114 115 for (size_t i = 0; i < num_words; ++i) { 116 size_t index = (i + offset) % num_words; 117 int bit = get_and_set_first_available_stream(index); 118 if (bit >= 0) { 119 return bit + (NUM_BITS_PER_WORD * index); 120 } 121 } 122 123 return -1; 124 } 125 release_stream(int stream)126 inline void release_stream(int stream) { 127 size_t index = stream / NUM_BITS_PER_WORD; 128 int bit = stream % NUM_BITS_PER_WORD; 129 assert((words_[index] & (static_cast<word_t>(1) << (bit))) == 0); 130 words_[index] |= (static_cast<word_t>(1) << (bit)); 131 } 132 get_and_set_first_available_stream(size_t index)133 inline int get_and_set_first_available_stream(size_t index) { 134 word_t word = words_[index]; 135 if (word == 0) return -1; 136 int stream = count_trailing_zeros(word); 137 words_[index] ^= (static_cast<word_t>(1) << stream); 138 return stream; 139 } 140 141 private: 142 const size_t max_streams_; 143 const size_t num_words_; 144 size_t offset_; 145 Vector<word_t> words_; 146 PendingMap pending_; 147 148 private: 149 DISALLOW_COPY_AND_ASSIGN(StreamManager); 150 }; 151 152 }}} // namespace datastax::internal::core 153 154 #endif 155