1 /* 2 Copyright (c) DataStax, Inc. 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #ifndef DATASTAX_INTERNAL_HOST_HPP 18 #define DATASTAX_INTERNAL_HOST_HPP 19 20 #include "address.hpp" 21 #include "allocated.hpp" 22 #include "atomic.hpp" 23 #include "copy_on_write_ptr.hpp" 24 #include "get_time.hpp" 25 #include "logger.hpp" 26 #include "macros.hpp" 27 #include "map.hpp" 28 #include "ref_counted.hpp" 29 #include "scoped_ptr.hpp" 30 #include "spin_lock.hpp" 31 #include "vector.hpp" 32 33 #include <math.h> 34 #include <stdint.h> 35 36 namespace datastax { namespace internal { namespace core { 37 38 class Row; 39 40 struct TimestampedAverage { TimestampedAveragedatastax::internal::core::TimestampedAverage41 TimestampedAverage() 42 : average(-1) 43 , timestamp(0) 44 , num_measured(0) {} 45 46 int64_t average; 47 uint64_t timestamp; 48 uint64_t num_measured; 49 }; 50 51 class VersionNumber { 52 public: VersionNumber()53 VersionNumber() 54 : major_version_(0) 55 , minor_version_(0) 56 , patch_version_(0) {} 57 VersionNumber(int major_version,int minor_version,int patch_version)58 VersionNumber(int major_version, int minor_version, int patch_version) 59 : major_version_(major_version) 60 , minor_version_(minor_version) 61 , patch_version_(patch_version) {} 62 operator >=(const VersionNumber & other) const63 bool operator>=(const VersionNumber& other) const { return compare(other) >= 0; } 64 operator <(const VersionNumber & other) const65 bool operator<(const VersionNumber& other) const { return compare(other) < 0; } 66 compare(const VersionNumber & other) const67 int compare(const VersionNumber& other) const { 68 if (major_version_ < other.major_version_) return -1; 69 if (major_version_ > other.major_version_) return 1; 70 71 if (minor_version_ < other.minor_version_) return -1; 72 if (minor_version_ > other.minor_version_) return 1; 73 74 if (patch_version_ < other.patch_version_) return -1; 75 if (patch_version_ > other.patch_version_) return 1; 76 77 return 0; 78 } 79 80 bool parse(const String& version); 81 major_version() const82 int major_version() const { return major_version_; } minor_version() const83 int minor_version() const { return minor_version_; } patch_version() const84 int patch_version() const { return patch_version_; } 85 86 private: 87 int major_version_; 88 int minor_version_; 89 int patch_version_; 90 }; 91 92 class Host : public RefCounted<Host> { 93 public: 94 typedef SharedRefPtr<Host> Ptr; 95 typedef SharedRefPtr<const Host> ConstPtr; 96 Host(const Address & address)97 Host(const Address& address) 98 : address_(address) 99 , rpc_address_(address) 100 , rack_id_(0) 101 , dc_id_(0) 102 , address_string_(address.to_string()) 103 , connection_count_(0) 104 , inflight_request_count_(0) {} 105 address() const106 const Address& address() const { return address_; } address_string() const107 const String& address_string() const { return address_string_; } 108 rpc_address() const109 const Address& rpc_address() const { return rpc_address_; } 110 111 void set(const Row* row, bool use_tokens); 112 rack() const113 const String& rack() const { return rack_; } dc() const114 const String& dc() const { return dc_; } set_rack_and_dc(const String & rack,const String & dc)115 void set_rack_and_dc(const String& rack, const String& dc) { 116 rack_ = rack; 117 dc_ = dc; 118 } 119 rack_id() const120 uint32_t rack_id() const { return rack_id_; } dc_id() const121 uint32_t dc_id() const { return dc_id_; } set_rack_and_dc_ids(uint32_t rack_id,uint32_t dc_id)122 void set_rack_and_dc_ids(uint32_t rack_id, uint32_t dc_id) { 123 rack_id_ = rack_id; 124 dc_id_ = dc_id; 125 } 126 partitioner() const127 const String& partitioner() const { return partitioner_; } 128 tokens() const129 const Vector<String>& tokens() const { return tokens_; } 130 server_version() const131 const VersionNumber& server_version() const { return server_version_; } 132 dse_server_version() const133 const VersionNumber& dse_server_version() const { return dse_server_version_; } 134 to_string() const135 String to_string() const { 136 OStringStream ss; 137 ss << address_string_; 138 if (!rack_.empty() || !dc_.empty()) { 139 ss << " [" << rack_ << ':' << dc_ << "]"; 140 } 141 return ss.str(); 142 } 143 enable_latency_tracking(uint64_t scale,uint64_t min_measured)144 void enable_latency_tracking(uint64_t scale, uint64_t min_measured) { 145 if (!latency_tracker_) { 146 latency_tracker_.reset(new LatencyTracker(scale, (30LL * min_measured) / 100LL)); 147 } 148 } 149 update_latency(uint64_t latency_ns)150 void update_latency(uint64_t latency_ns) { 151 if (latency_tracker_) { 152 LOG_TRACE("Latency %f ms for %s", static_cast<double>(latency_ns) / 1e6, to_string().c_str()); 153 latency_tracker_->update(latency_ns); 154 } 155 } 156 get_current_average() const157 TimestampedAverage get_current_average() const { 158 if (latency_tracker_) { 159 return latency_tracker_->get(); 160 } 161 return TimestampedAverage(); 162 } 163 increment_connection_count()164 void increment_connection_count() { connection_count_.fetch_add(1, MEMORY_ORDER_RELAXED); } 165 decrement_connection_count()166 void decrement_connection_count() { connection_count_.fetch_sub(1, MEMORY_ORDER_RELAXED); } 167 connection_count() const168 int32_t connection_count() const { return connection_count_.load(MEMORY_ORDER_RELAXED); } 169 increment_inflight_requests()170 void increment_inflight_requests() { inflight_request_count_.fetch_add(1, MEMORY_ORDER_RELAXED); } 171 decrement_inflight_requests()172 void decrement_inflight_requests() { inflight_request_count_.fetch_sub(1, MEMORY_ORDER_RELAXED); } 173 inflight_request_count() const174 int32_t inflight_request_count() const { 175 return inflight_request_count_.load(MEMORY_ORDER_RELAXED); 176 } 177 178 private: 179 class LatencyTracker : public Allocated { 180 public: LatencyTracker(uint64_t scale_ns,uint64_t threshold_to_account)181 LatencyTracker(uint64_t scale_ns, uint64_t threshold_to_account) 182 : scale_ns_(scale_ns) 183 , threshold_to_account_(threshold_to_account) {} 184 185 void update(uint64_t latency_ns); 186 get() const187 TimestampedAverage get() const { 188 ScopedSpinlock l(SpinlockPool<LatencyTracker>::get_spinlock(this)); 189 return current_; 190 } 191 192 private: 193 uint64_t scale_ns_; 194 uint64_t threshold_to_account_; 195 TimestampedAverage current_; 196 197 private: 198 DISALLOW_COPY_AND_ASSIGN(LatencyTracker); 199 }; 200 201 private: 202 Address address_; 203 Address rpc_address_; 204 uint32_t rack_id_; 205 uint32_t dc_id_; 206 String address_string_; 207 VersionNumber server_version_; 208 VersionNumber dse_server_version_; 209 String rack_; 210 String dc_; 211 String partitioner_; 212 Vector<String> tokens_; 213 Atomic<int32_t> connection_count_; 214 Atomic<int32_t> inflight_request_count_; 215 216 ScopedPtr<LatencyTracker> latency_tracker_; 217 218 private: 219 DISALLOW_COPY_AND_ASSIGN(Host); 220 }; 221 222 /** 223 * A listener that handles cluster topology and host status changes. 224 */ 225 class HostListener { 226 public: ~HostListener()227 virtual ~HostListener() {} 228 229 /** 230 * A callback that's called when a host is marked as being UP. 231 * 232 * @param host A fully populated host object. 233 */ 234 virtual void on_host_up(const Host::Ptr& host) = 0; 235 236 /** 237 * A callback that's called when a host is marked as being DOWN. 238 * 239 * @param host A fully populated host object. 240 */ 241 virtual void on_host_down(const Host::Ptr& host) = 0; 242 243 /** 244 * A callback that's called when a new host is added to the cluster. 245 * 246 * @param host A fully populated host object. 247 */ 248 virtual void on_host_added(const Host::Ptr& host) = 0; 249 250 /** 251 * A callback that's called when a host is removed from a cluster. 252 * 253 * @param address The address of the host. 254 */ 255 virtual void on_host_removed(const Host::Ptr& host) = 0; 256 }; 257 258 class DefaultHostListener 259 : public HostListener 260 , public RefCounted<DefaultHostListener> { 261 public: 262 typedef SharedRefPtr<DefaultHostListener> Ptr; 263 on_host_up(const Host::Ptr & host)264 virtual void on_host_up(const Host::Ptr& host) {} on_host_down(const Host::Ptr & host)265 virtual void on_host_down(const Host::Ptr& host) {} on_host_added(const Host::Ptr & host)266 virtual void on_host_added(const Host::Ptr& host) {} on_host_removed(const Host::Ptr & host)267 virtual void on_host_removed(const Host::Ptr& host) {} 268 }; 269 270 class ExternalHostListener : public DefaultHostListener { 271 public: 272 typedef SharedRefPtr<ExternalHostListener> Ptr; 273 ExternalHostListener(const CassHostListenerCallback callback, void* data); 274 275 virtual void on_host_up(const Host::Ptr& host); 276 virtual void on_host_down(const Host::Ptr& host); 277 virtual void on_host_added(const Host::Ptr& host); 278 virtual void on_host_removed(const Host::Ptr& host); 279 280 private: 281 const CassHostListenerCallback callback_; 282 void* data_; 283 }; 284 285 typedef Map<Address, Host::Ptr> HostMap; 286 287 struct GetAddress { 288 typedef std::pair<Address, Host::Ptr> Pair; operator ()datastax::internal::core::GetAddress289 const Address& operator()(const Pair& pair) const { return pair.first; } 290 }; 291 292 struct GetHost { 293 typedef std::pair<Address, Host::Ptr> Pair; operator ()datastax::internal::core::GetHost294 Host::Ptr operator()(const Pair& pair) const { return pair.second; } 295 }; 296 297 typedef std::pair<Address, Host::Ptr> HostPair; 298 typedef Vector<Host::Ptr> HostVec; 299 typedef CopyOnWritePtr<HostVec> CopyOnWriteHostVec; 300 301 void add_host(CopyOnWriteHostVec& hosts, const Host::Ptr& host); 302 void remove_host(CopyOnWriteHostVec& hosts, const Host::Ptr& host); 303 bool remove_host(CopyOnWriteHostVec& hosts, const Address& address); 304 305 }}} // namespace datastax::internal::core 306 307 #endif 308