/*
  Copyright (c) DataStax, Inc.

  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
*/

// Based on implementations of metrics (especially Meter) from the Java library
// com.codahale.metrics (https://github.com/dropwizard/metrics)

#ifndef DATASTAX_INTERNAL_METRICS_HPP
#define DATASTAX_INTERNAL_METRICS_HPP

#include "allocated.hpp"
#include "atomic.hpp"
#include "constants.hpp"
#include "scoped_lock.hpp"
#include "scoped_ptr.hpp"
#include "utils.hpp"

#include "third_party/hdr_histogram/hdr_histogram.hpp"

#include <assert.h>
#include <stdlib.h>
#include <uv.h>

#include <math.h>

namespace datastax { namespace internal { namespace core {

class Metrics : public Allocated {
public:
  class ThreadState {
  public:
    ThreadState(size_t max_threads)
        : max_threads_(max_threads)
        , thread_count_(1) {
      uv_key_create(&thread_id_key_);
    }

    ~ThreadState() { uv_key_delete(&thread_id_key_); }

    size_t max_threads() const { return max_threads_; }

    // Lazily assigns each calling thread a stable, zero-based id via
    // thread-local storage. The id indexes the per-thread slots below.
    size_t current_thread_id() {
      void* id = uv_key_get(&thread_id_key_);
      if (id == NULL) {
        size_t thread_id = thread_count_.fetch_add(1);
        assert(thread_id <= max_threads_);
        id = reinterpret_cast<void*>(thread_id);
        uv_key_set(&thread_id_key_, id);
      }
      return reinterpret_cast<size_t>(id) - 1;
    }

  private:
    const size_t max_threads_;
    Atomic<size_t> thread_count_;
    uv_key_t thread_id_key_;
  };

  // A striped counter: each thread increments its own cacheline-padded
  // slot, so writes never contend; reads sum across all slots.
  class Counter {
  public:
    Counter(ThreadState* thread_state)
        : thread_state_(thread_state)
        , counters_(new PerThreadCounter[thread_state->max_threads()]) {}

    void inc() { counters_[thread_state_->current_thread_id()].add(1LL); }

    void dec() { counters_[thread_state_->current_thread_id()].sub(1LL); }

    int64_t sum() const {
      int64_t sum = 0;
      for (size_t i = 0; i < thread_state_->max_threads(); ++i) {
        sum += counters_[i].get();
      }
      return sum;
    }

    int64_t sum_and_reset() {
      int64_t sum = 0;
      for (size_t i = 0; i < thread_state_->max_threads(); ++i) {
        sum += counters_[i].get_and_reset();
      }
      return sum;
    }

  private:
    class PerThreadCounter : public Allocated {
    public:
      PerThreadCounter()
          : value_(0) {}

      void add(int64_t n) { value_.fetch_add(n, MEMORY_ORDER_RELEASE); }

      void sub(int64_t n) { value_.fetch_sub(n, MEMORY_ORDER_RELEASE); }

      int64_t get() const { return value_.load(MEMORY_ORDER_ACQUIRE); }

      int64_t get_and_reset() { return value_.exchange(0, MEMORY_ORDER_RELEASE); }

    private:
      Atomic<int64_t> value_;

      // Pad to a cache line to avoid false sharing between threads' slots.
      static const size_t cacheline_size = 64;
      char pad__[cacheline_size];
      void no_unused_private_warning__() { pad__[0] = 0; }
    };

  private:
    ThreadState* thread_state_;
    ScopedArray<PerThreadCounter> counters_;

  private:
    DISALLOW_COPY_AND_ASSIGN(Counter);
  };

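  // An exponentially weighted moving average ticked on a fixed 5-second
  // interval, in the style of Dropwizard's Meter and UNIX load averages.
  // Each tick folds the interval's instantaneous rate into the running rate:
  //
  //   rate += alpha * (instant_rate - rate)
  //
  // For an m-minute window the Meter below passes alpha = 1 - exp(-5 / 60 / m);
  // for the one-minute rate that is roughly 0.08, so each 5-second tick
  // contributes about 8% of the new value.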
  class ExponentiallyWeightedMovingAverage {
  public:
    static const uint64_t INTERVAL = 5; // Seconds

    ExponentiallyWeightedMovingAverage(double alpha, ThreadState* thread_state)
        : alpha_(alpha)
        , uncounted_(thread_state)
        , is_initialized_(false)
        , rate_(0.0) {}

    double rate() const { return rate_.load(MEMORY_ORDER_ACQUIRE); }

    void update() { uncounted_.inc(); }

    void tick() {
      const int64_t count = uncounted_.sum_and_reset();
      double instant_rate = static_cast<double>(count) / INTERVAL;

      if (is_initialized_.load(MEMORY_ORDER_ACQUIRE)) {
        double rate = rate_.load(MEMORY_ORDER_ACQUIRE);
        rate_.store(rate + (alpha_ * (instant_rate - rate)), MEMORY_ORDER_RELEASE);
      } else {
        // The first tick seeds the average with the instantaneous rate.
        rate_.store(instant_rate, MEMORY_ORDER_RELEASE);
        is_initialized_.store(true, MEMORY_ORDER_RELEASE);
      }
    }

  private:
    const double alpha_;
    Counter uncounted_;
    Atomic<bool> is_initialized_;
    Atomic<double> rate_;
  };

  class Meter {
  public:
    Meter(ThreadState* thread_state)
        : one_minute_rate_(
              1.0 - exp(-static_cast<double>(ExponentiallyWeightedMovingAverage::INTERVAL) / 60.0 /
                        1),
              thread_state)
        , five_minute_rate_(
              1.0 - exp(-static_cast<double>(ExponentiallyWeightedMovingAverage::INTERVAL) / 60.0 /
                        5),
              thread_state)
        , fifteen_minute_rate_(
              1.0 - exp(-static_cast<double>(ExponentiallyWeightedMovingAverage::INTERVAL) / 60.0 /
                        15),
              thread_state)
        , count_(thread_state)
        , speculative_request_count_(thread_state)
        , start_time_(uv_hrtime())
        , last_tick_(start_time_) {}

    void mark() {
      tick_if_necessary();
      count_.inc();
      one_minute_rate_.update();
      five_minute_rate_.update();
      fifteen_minute_rate_.update();
    }

    void mark_speculative() { speculative_request_count_.inc(); }

    double one_minute_rate() const { return one_minute_rate_.rate(); }
    double five_minute_rate() const { return five_minute_rate_.rate(); }
    double fifteen_minute_rate() const { return fifteen_minute_rate_.rate(); }

    double mean_rate() const {
      if (count() == 0) {
        return 0.0;
      } else {
        double elapsed = static_cast<double>(uv_hrtime() - start_time_) / 1e9;
        return count() / elapsed;
      }
    }

    uint64_t count() const { return count_.sum(); }

    uint64_t speculative_request_count() const { return speculative_request_count_.sum(); }

    double speculative_request_percent() const {
      // count() gives us the number of requests that we successfully handled.
      //
      // speculative_request_count() gives us the number of requests sent on
      // the wire but aborted after we received a good response.

      uint64_t spec_count = speculative_request_count();
      uint64_t total_requests = spec_count + count();

      // Guard against division by zero.
      return total_requests ? static_cast<double>(spec_count) / total_requests * 100 : 0;
    }

  private:
    static const uint64_t TICK_INTERVAL =
        ExponentiallyWeightedMovingAverage::INTERVAL * 1000LL * 1000LL * 1000LL; // Nanoseconds

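    // Rates only advance when a request is marked, so an idle meter
    // accumulates missed intervals. When more than TICK_INTERVAL has
    // elapsed, one thread wins the compare-exchange on last_tick_ and
    // replays one tick() per missed interval, letting the averages decay
    // as if they had been ticked on schedule.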
    void tick_if_necessary() {
      uint64_t old_tick = last_tick_.load();
      uint64_t new_tick = uv_hrtime();
      uint64_t elapsed = new_tick - old_tick;

      if (elapsed > TICK_INTERVAL) {
        // Round the new tick down to the start of the current interval.
        uint64_t new_interval_start_tick = new_tick - elapsed % TICK_INTERVAL;
        if (last_tick_.compare_exchange_strong(old_tick, new_interval_start_tick)) {
          uint64_t required_ticks = elapsed / TICK_INTERVAL;
          for (uint64_t i = 0; i < required_ticks; ++i) {
            one_minute_rate_.tick();
            five_minute_rate_.tick();
            fifteen_minute_rate_.tick();
          }
        }
      }
    }

    ExponentiallyWeightedMovingAverage one_minute_rate_;
    ExponentiallyWeightedMovingAverage five_minute_rate_;
    ExponentiallyWeightedMovingAverage fifteen_minute_rate_;
    Counter count_;
    Counter speculative_request_count_;
    const uint64_t start_time_;
    Atomic<uint64_t> last_tick_;

  private:
    DISALLOW_COPY_AND_ASSIGN(Meter);
  };

  class Histogram {
  public:
    // One hour in microseconds, the unit used for recorded latencies.
    static const int64_t HIGHEST_TRACKABLE_VALUE = 3600LL * 1000LL * 1000LL;

    struct Snapshot {
      int64_t min;
      int64_t max;
      int64_t mean;
      int64_t stddev;
      int64_t median;
      int64_t percentile_75th;
      int64_t percentile_95th;
      int64_t percentile_98th;
      int64_t percentile_99th;
      int64_t percentile_999th;
    };

    Histogram(ThreadState* thread_state)
        : thread_state_(thread_state)
        , histograms_(new PerThreadHistogram[thread_state->max_threads()]) {
      hdr_init(1LL, HIGHEST_TRACKABLE_VALUE, 3, &histogram_);
      uv_mutex_init(&mutex_);
    }

    ~Histogram() {
      free(histogram_);
      uv_mutex_destroy(&mutex_);
    }

    void record_value(int64_t value) {
      histograms_[thread_state_->current_thread_id()].record_value(value);
    }

    void get_snapshot(Snapshot* snapshot) const {
      ScopedMutex l(&mutex_);
      hdr_histogram* h = histogram_;
      // Fold each thread's recorded values into the shared aggregate.
      for (size_t i = 0; i < thread_state_->max_threads(); ++i) {
        histograms_[i].add(h);
      }

      if (h->total_count == 0) {
        // There is no data; default to 0 for the stats.
        snapshot->max = 0;
        snapshot->min = 0;
        snapshot->mean = 0;
        snapshot->stddev = 0;
        snapshot->median = 0;
        snapshot->percentile_75th = 0;
        snapshot->percentile_95th = 0;
        snapshot->percentile_98th = 0;
        snapshot->percentile_99th = 0;
        snapshot->percentile_999th = 0;
      } else {
        snapshot->max = hdr_max(h);
        snapshot->min = hdr_min(h);
        snapshot->mean = static_cast<int64_t>(hdr_mean(h));
        snapshot->stddev = static_cast<int64_t>(hdr_stddev(h));
        snapshot->median = hdr_value_at_percentile(h, 50.0);
        snapshot->percentile_75th = hdr_value_at_percentile(h, 75.0);
        snapshot->percentile_95th = hdr_value_at_percentile(h, 95.0);
        snapshot->percentile_98th = hdr_value_at_percentile(h, 98.0);
        snapshot->percentile_99th = hdr_value_at_percentile(h, 99.0);
        snapshot->percentile_999th = hdr_value_at_percentile(h, 99.9);
      }
    }

  private:
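    // Coordinates recording threads (writers) against the snapshot reader
    // without blocking the writers, following the writer-reader phaser
    // scheme from HdrHistogram. Writers bump start_epoch_ on entry and, on
    // exit, the end epoch matching the sign of the value they saw at entry;
    // flip_phase() resets the next phase's epochs and spins until every
    // writer that entered during the previous phase has exited. Combined
    // with the double-buffered PerThreadHistogram below, this lets a
    // snapshot read a quiescent inactive histogram while new values land in
    // the active one.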
    class WriterReaderPhaser {
    public:
      WriterReaderPhaser()
          : start_epoch_(0)
          , even_end_epoch_(0)
          , odd_end_epoch_(CASS_INT64_MIN) {}

      int64_t writer_critical_section_enter() { return start_epoch_.fetch_add(1); }

      void writer_critical_section_end(int64_t critical_value_enter) {
        if (critical_value_enter < 0) {
          odd_end_epoch_.fetch_add(1);
        } else {
          even_end_epoch_.fetch_add(1);
        }
      }

      // The reader is protected by a mutex in Histogram.
      void flip_phase() {
        bool is_next_phase_even = (start_epoch_.load() < 0);

        int64_t initial_start_value;

        if (is_next_phase_even) {
          initial_start_value = 0;
          even_end_epoch_.store(initial_start_value, MEMORY_ORDER_RELAXED);
        } else {
          initial_start_value = CASS_INT64_MIN;
          odd_end_epoch_.store(initial_start_value, MEMORY_ORDER_RELAXED);
        }

        int64_t start_value_at_flip = start_epoch_.exchange(initial_start_value);

        // Wait for writers still in the previous phase to drain out.
        bool is_caught_up = false;
        do {
          if (is_next_phase_even) {
            is_caught_up = (odd_end_epoch_.load() == start_value_at_flip);
          } else {
            is_caught_up = (even_end_epoch_.load() == start_value_at_flip);
          }
          if (!is_caught_up) {
            thread_yield();
          }
        } while (!is_caught_up);
      }

    private:
      Atomic<int64_t> start_epoch_;
      Atomic<int64_t> even_end_epoch_;
      Atomic<int64_t> odd_end_epoch_;
    };

    class PerThreadHistogram : public Allocated {
    public:
      PerThreadHistogram()
          : active_index_(0) {
        hdr_init(1LL, HIGHEST_TRACKABLE_VALUE, 3, &histograms_[0]);
        hdr_init(1LL, HIGHEST_TRACKABLE_VALUE, 3, &histograms_[1]);
      }

      ~PerThreadHistogram() {
        free(histograms_[0]);
        free(histograms_[1]);
      }

      void record_value(int64_t value) {
        int64_t critical_value_enter = phaser_.writer_critical_section_enter();
        hdr_histogram* h = histograms_[active_index_.load()];
        hdr_record_value(h, value);
        phaser_.writer_critical_section_end(critical_value_enter);
      }

      void add(hdr_histogram* to) const {
        // Swap the active and inactive histograms, wait for in-flight
        // writers, then merge and reset the now-quiescent inactive copy.
        int inactive_index = active_index_.exchange(!active_index_.load());
        hdr_histogram* from = histograms_[inactive_index];
        phaser_.flip_phase();
        hdr_add(to, from);
        hdr_reset(from);
      }

    private:
      hdr_histogram* histograms_[2];
      mutable Atomic<int> active_index_;
      mutable WriterReaderPhaser phaser_;
    };

    ThreadState* thread_state_;
    ScopedArray<PerThreadHistogram> histograms_;
    hdr_histogram* histogram_;
    mutable uv_mutex_t mutex_;

  private:
    DISALLOW_COPY_AND_ASSIGN(Histogram);
  };

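  // Metrics bundles the session-level instruments: latency histograms
  // (values stored in microseconds), a request-rate meter, and plain
  // counters. A minimal usage sketch (the surrounding driver normally
  // drives these calls; the thread count here is illustrative):
  //
  //   Metrics metrics(4); // up to four recording threads
  //   uint64_t start = uv_hrtime();
  //   /* ... issue a request ... */
  //   metrics.record_request(uv_hrtime() - start); // latency in nanoseconds
  //
  //   Metrics::Histogram::Snapshot snapshot;
  //   metrics.request_latencies.get_snapshot(&snapshot);
  //   int64_t p99_us = snapshot.percentile_99th;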
  Metrics(size_t max_threads)
      : thread_state_(max_threads)
      , request_latencies(&thread_state_)
      , speculative_request_latencies(&thread_state_)
      , request_rates(&thread_state_)
      , total_connections(&thread_state_)
      , connection_timeouts(&thread_state_)
      , request_timeouts(&thread_state_) {}

  void record_request(uint64_t latency_ns) {
    // Final measurement is in microseconds
    request_latencies.record_value(latency_ns / 1000);
    request_rates.mark();
  }

  void record_speculative_request(uint64_t latency_ns) {
    // Final measurement is in microseconds
    speculative_request_latencies.record_value(latency_ns / 1000);
    request_rates.mark_speculative();
  }

private:
  ThreadState thread_state_;

public:
  Histogram request_latencies;
  Histogram speculative_request_latencies;
  Meter request_rates;

  Counter total_connections;

  Counter connection_timeouts;
  Counter request_timeouts;

private:
  DISALLOW_COPY_AND_ASSIGN(Metrics);
};

}}} // namespace datastax::internal::core

#endif