1 /*
2   Copyright (c) DataStax, Inc.
3 
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7 
8   http://www.apache.org/licenses/LICENSE-2.0
9 
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15 */
16 
17 #ifndef DATASTAX_INTERNAL_HOST_HPP
18 #define DATASTAX_INTERNAL_HOST_HPP
19 
20 #include "address.hpp"
21 #include "allocated.hpp"
22 #include "atomic.hpp"
23 #include "copy_on_write_ptr.hpp"
24 #include "get_time.hpp"
25 #include "logger.hpp"
26 #include "macros.hpp"
27 #include "map.hpp"
28 #include "ref_counted.hpp"
29 #include "scoped_ptr.hpp"
30 #include "spin_lock.hpp"
31 #include "vector.hpp"
32 
33 #include <math.h>
34 #include <stdint.h>
35 
36 namespace datastax { namespace internal { namespace core {
37 
38 class Row;
39 
40 struct TimestampedAverage {
TimestampedAveragedatastax::internal::core::TimestampedAverage41   TimestampedAverage()
42       : average(-1)
43       , timestamp(0)
44       , num_measured(0) {}
45 
46   int64_t average;
47   uint64_t timestamp;
48   uint64_t num_measured;
49 };
50 
51 class VersionNumber {
52 public:
VersionNumber()53   VersionNumber()
54       : major_version_(0)
55       , minor_version_(0)
56       , patch_version_(0) {}
57 
VersionNumber(int major_version,int minor_version,int patch_version)58   VersionNumber(int major_version, int minor_version, int patch_version)
59       : major_version_(major_version)
60       , minor_version_(minor_version)
61       , patch_version_(patch_version) {}
62 
operator >=(const VersionNumber & other) const63   bool operator>=(const VersionNumber& other) const { return compare(other) >= 0; }
64 
operator <(const VersionNumber & other) const65   bool operator<(const VersionNumber& other) const { return compare(other) < 0; }
66 
compare(const VersionNumber & other) const67   int compare(const VersionNumber& other) const {
68     if (major_version_ < other.major_version_) return -1;
69     if (major_version_ > other.major_version_) return 1;
70 
71     if (minor_version_ < other.minor_version_) return -1;
72     if (minor_version_ > other.minor_version_) return 1;
73 
74     if (patch_version_ < other.patch_version_) return -1;
75     if (patch_version_ > other.patch_version_) return 1;
76 
77     return 0;
78   }
79 
80   bool parse(const String& version);
81 
major_version() const82   int major_version() const { return major_version_; }
minor_version() const83   int minor_version() const { return minor_version_; }
patch_version() const84   int patch_version() const { return patch_version_; }
85 
86 private:
87   int major_version_;
88   int minor_version_;
89   int patch_version_;
90 };
91 
92 class Host : public RefCounted<Host> {
93 public:
94   typedef SharedRefPtr<Host> Ptr;
95   typedef SharedRefPtr<const Host> ConstPtr;
96 
Host(const Address & address)97   Host(const Address& address)
98       : address_(address)
99       , rpc_address_(address)
100       , rack_id_(0)
101       , dc_id_(0)
102       , address_string_(address.to_string())
103       , connection_count_(0)
104       , inflight_request_count_(0) {}
105 
address() const106   const Address& address() const { return address_; }
address_string() const107   const String& address_string() const { return address_string_; }
108 
rpc_address() const109   const Address& rpc_address() const { return rpc_address_; }
110 
111   void set(const Row* row, bool use_tokens);
112 
rack() const113   const String& rack() const { return rack_; }
dc() const114   const String& dc() const { return dc_; }
set_rack_and_dc(const String & rack,const String & dc)115   void set_rack_and_dc(const String& rack, const String& dc) {
116     rack_ = rack;
117     dc_ = dc;
118   }
119 
rack_id() const120   uint32_t rack_id() const { return rack_id_; }
dc_id() const121   uint32_t dc_id() const { return dc_id_; }
set_rack_and_dc_ids(uint32_t rack_id,uint32_t dc_id)122   void set_rack_and_dc_ids(uint32_t rack_id, uint32_t dc_id) {
123     rack_id_ = rack_id;
124     dc_id_ = dc_id;
125   }
126 
partitioner() const127   const String& partitioner() const { return partitioner_; }
128 
tokens() const129   const Vector<String>& tokens() const { return tokens_; }
130 
server_version() const131   const VersionNumber& server_version() const { return server_version_; }
132 
dse_server_version() const133   const VersionNumber& dse_server_version() const { return dse_server_version_; }
134 
to_string() const135   String to_string() const {
136     OStringStream ss;
137     ss << address_string_;
138     if (!rack_.empty() || !dc_.empty()) {
139       ss << " [" << rack_ << ':' << dc_ << "]";
140     }
141     return ss.str();
142   }
143 
enable_latency_tracking(uint64_t scale,uint64_t min_measured)144   void enable_latency_tracking(uint64_t scale, uint64_t min_measured) {
145     if (!latency_tracker_) {
146       latency_tracker_.reset(new LatencyTracker(scale, (30LL * min_measured) / 100LL));
147     }
148   }
149 
update_latency(uint64_t latency_ns)150   void update_latency(uint64_t latency_ns) {
151     if (latency_tracker_) {
152       LOG_TRACE("Latency %f ms for %s", static_cast<double>(latency_ns) / 1e6, to_string().c_str());
153       latency_tracker_->update(latency_ns);
154     }
155   }
156 
get_current_average() const157   TimestampedAverage get_current_average() const {
158     if (latency_tracker_) {
159       return latency_tracker_->get();
160     }
161     return TimestampedAverage();
162   }
163 
increment_connection_count()164   void increment_connection_count() { connection_count_.fetch_add(1, MEMORY_ORDER_RELAXED); }
165 
decrement_connection_count()166   void decrement_connection_count() { connection_count_.fetch_sub(1, MEMORY_ORDER_RELAXED); }
167 
connection_count() const168   int32_t connection_count() const { return connection_count_.load(MEMORY_ORDER_RELAXED); }
169 
increment_inflight_requests()170   void increment_inflight_requests() { inflight_request_count_.fetch_add(1, MEMORY_ORDER_RELAXED); }
171 
decrement_inflight_requests()172   void decrement_inflight_requests() { inflight_request_count_.fetch_sub(1, MEMORY_ORDER_RELAXED); }
173 
inflight_request_count() const174   int32_t inflight_request_count() const {
175     return inflight_request_count_.load(MEMORY_ORDER_RELAXED);
176   }
177 
178 private:
179   class LatencyTracker : public Allocated {
180   public:
LatencyTracker(uint64_t scale_ns,uint64_t threshold_to_account)181     LatencyTracker(uint64_t scale_ns, uint64_t threshold_to_account)
182         : scale_ns_(scale_ns)
183         , threshold_to_account_(threshold_to_account) {}
184 
185     void update(uint64_t latency_ns);
186 
get() const187     TimestampedAverage get() const {
188       ScopedSpinlock l(SpinlockPool<LatencyTracker>::get_spinlock(this));
189       return current_;
190     }
191 
192   private:
193     uint64_t scale_ns_;
194     uint64_t threshold_to_account_;
195     TimestampedAverage current_;
196 
197   private:
198     DISALLOW_COPY_AND_ASSIGN(LatencyTracker);
199   };
200 
201 private:
202   Address address_;
203   Address rpc_address_;
204   uint32_t rack_id_;
205   uint32_t dc_id_;
206   String address_string_;
207   VersionNumber server_version_;
208   VersionNumber dse_server_version_;
209   String rack_;
210   String dc_;
211   String partitioner_;
212   Vector<String> tokens_;
213   Atomic<int32_t> connection_count_;
214   Atomic<int32_t> inflight_request_count_;
215 
216   ScopedPtr<LatencyTracker> latency_tracker_;
217 
218 private:
219   DISALLOW_COPY_AND_ASSIGN(Host);
220 };
221 
222 /**
223  * A listener that handles cluster topology and host status changes.
224  */
225 class HostListener {
226 public:
~HostListener()227   virtual ~HostListener() {}
228 
229   /**
230    * A callback that's called when a host is marked as being UP.
231    *
232    * @param host A fully populated host object.
233    */
234   virtual void on_host_up(const Host::Ptr& host) = 0;
235 
236   /**
237    * A callback that's called when a host is marked as being DOWN.
238    *
239    * @param host A fully populated host object.
240    */
241   virtual void on_host_down(const Host::Ptr& host) = 0;
242 
243   /**
244    * A callback that's called when a new host is added to the cluster.
245    *
246    * @param host A fully populated host object.
247    */
248   virtual void on_host_added(const Host::Ptr& host) = 0;
249 
250   /**
251    * A callback that's called when a host is removed from a cluster.
252    *
253    * @param address The address of the host.
254    */
255   virtual void on_host_removed(const Host::Ptr& host) = 0;
256 };
257 
258 class DefaultHostListener
259     : public HostListener
260     , public RefCounted<DefaultHostListener> {
261 public:
262   typedef SharedRefPtr<DefaultHostListener> Ptr;
263 
on_host_up(const Host::Ptr & host)264   virtual void on_host_up(const Host::Ptr& host) {}
on_host_down(const Host::Ptr & host)265   virtual void on_host_down(const Host::Ptr& host) {}
on_host_added(const Host::Ptr & host)266   virtual void on_host_added(const Host::Ptr& host) {}
on_host_removed(const Host::Ptr & host)267   virtual void on_host_removed(const Host::Ptr& host) {}
268 };
269 
270 class ExternalHostListener : public DefaultHostListener {
271 public:
272   typedef SharedRefPtr<ExternalHostListener> Ptr;
273   ExternalHostListener(const CassHostListenerCallback callback, void* data);
274 
275   virtual void on_host_up(const Host::Ptr& host);
276   virtual void on_host_down(const Host::Ptr& host);
277   virtual void on_host_added(const Host::Ptr& host);
278   virtual void on_host_removed(const Host::Ptr& host);
279 
280 private:
281   const CassHostListenerCallback callback_;
282   void* data_;
283 };
284 
285 typedef Map<Address, Host::Ptr> HostMap;
286 
287 struct GetAddress {
288   typedef std::pair<Address, Host::Ptr> Pair;
operator ()datastax::internal::core::GetAddress289   const Address& operator()(const Pair& pair) const { return pair.first; }
290 };
291 
292 struct GetHost {
293   typedef std::pair<Address, Host::Ptr> Pair;
operator ()datastax::internal::core::GetHost294   Host::Ptr operator()(const Pair& pair) const { return pair.second; }
295 };
296 
297 typedef std::pair<Address, Host::Ptr> HostPair;
298 typedef Vector<Host::Ptr> HostVec;
299 typedef CopyOnWritePtr<HostVec> CopyOnWriteHostVec;
300 
301 void add_host(CopyOnWriteHostVec& hosts, const Host::Ptr& host);
302 void remove_host(CopyOnWriteHostVec& hosts, const Host::Ptr& host);
303 bool remove_host(CopyOnWriteHostVec& hosts, const Address& address);
304 
305 }}} // namespace datastax::internal::core
306 
307 #endif
308