1 /*
2   Copyright (c) DataStax, Inc.
3 
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7 
8   http://www.apache.org/licenses/LICENSE-2.0
9 
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15 */
16 
17 #ifndef DATASTAX_INTERNAL_STREAM_MANAGER_HPP
18 #define DATASTAX_INTERNAL_STREAM_MANAGER_HPP
19 
20 #include "constants.hpp"
21 #include "dense_hash_map.hpp"
22 #include "macros.hpp"
23 #include "scoped_ptr.hpp"
24 
25 #include <assert.h>
26 #include <stdint.h>
27 #include <string.h>
28 
29 #if defined(_MSC_VER)
30 #include <intrin.h>
31 #endif
32 
33 namespace datastax { namespace internal { namespace core {
34 
35 struct StreamHash {
operator ()datastax::internal::core::StreamHash36   std::size_t operator()(int stream) const { return ((stream & 0x3F) << 10) | (stream >> 6); }
37 };
38 
39 template <class T>
40 class StreamManager {
41 public:
StreamManager()42   StreamManager()
43       : max_streams_(CASS_MAX_STREAMS)
44       , num_words_(max_streams_ / NUM_BITS_PER_WORD)
45       , offset_(0)
46       , words_(num_words_, ~static_cast<word_t>(0)) {
47     // Client request stream IDs are always positive values so it's
48     // safe to use negative values for the empty and deleted keys.
49     pending_.set_empty_key(-1);
50     pending_.set_deleted_key(-2);
51     pending_.max_load_factor(0.4f);
52   }
53 
acquire(const T & item)54   int acquire(const T& item) {
55     int stream = acquire_stream();
56     if (stream < 0) return -1;
57     pending_.insert(std::pair<int, T>(stream, item));
58     return stream;
59   }
60 
release(int stream)61   void release(int stream) {
62     assert(stream >= 0 && static_cast<size_t>(stream) < max_streams_);
63     assert(pending_.count(stream) > 0);
64     pending_.erase(stream);
65     release_stream(stream);
66   }
67 
get(int stream,T & output)68   bool get(int stream, T& output) {
69     typename PendingMap::iterator i = pending_.find(stream);
70     if (i != pending_.end()) {
71       output = i->second;
72       return true;
73     }
74     return false;
75   }
76 
available_streams() const77   size_t available_streams() const { return max_streams_ - pending_.size(); }
pending_streams() const78   size_t pending_streams() const { return pending_.size(); }
max_streams() const79   size_t max_streams() const { return max_streams_; }
80 
81 private:
82   typedef DenseHashMap<int, T, StreamHash> PendingMap;
83 
84 #if defined(_MSC_VER) && defined(_M_AMD64)
85   typedef __int64 word_t;
86 #else
87   typedef unsigned long word_t;
88 #endif
89 
90   static const size_t NUM_BITS_PER_WORD = sizeof(word_t) * 8;
91 
count_trailing_zeros(word_t word)92   static inline int count_trailing_zeros(word_t word) {
93 #if defined(__GNUC__)
94     return __builtin_ctzl(word);
95 #elif defined(_MSC_VER)
96     unsigned long index;
97 #if defined(_M_AMD64)
98     _BitScanForward64(&index, word);
99 
100 #else
101     _BitScanForward(&index, word);
102 #endif
103     return static_cast<int>(index);
104 #else
105 #endif
106   }
107 
108 private:
acquire_stream()109   int acquire_stream() {
110     const size_t offset = offset_;
111     const size_t num_words = num_words_;
112 
113     ++offset_;
114 
115     for (size_t i = 0; i < num_words; ++i) {
116       size_t index = (i + offset) % num_words;
117       int bit = get_and_set_first_available_stream(index);
118       if (bit >= 0) {
119         return bit + (NUM_BITS_PER_WORD * index);
120       }
121     }
122 
123     return -1;
124   }
125 
release_stream(int stream)126   inline void release_stream(int stream) {
127     size_t index = stream / NUM_BITS_PER_WORD;
128     int bit = stream % NUM_BITS_PER_WORD;
129     assert((words_[index] & (static_cast<word_t>(1) << (bit))) == 0);
130     words_[index] |= (static_cast<word_t>(1) << (bit));
131   }
132 
get_and_set_first_available_stream(size_t index)133   inline int get_and_set_first_available_stream(size_t index) {
134     word_t word = words_[index];
135     if (word == 0) return -1;
136     int stream = count_trailing_zeros(word);
137     words_[index] ^= (static_cast<word_t>(1) << stream);
138     return stream;
139   }
140 
141 private:
142   const size_t max_streams_;
143   const size_t num_words_;
144   size_t offset_;
145   Vector<word_t> words_;
146   PendingMap pending_;
147 
148 private:
149   DISALLOW_COPY_AND_ASSIGN(StreamManager);
150 };
151 
152 }}} // namespace datastax::internal::core
153 
154 #endif
155