1 /*
2   Copyright (c) DataStax, Inc.
3 
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7 
8   http://www.apache.org/licenses/LICENSE-2.0
9 
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15 */
16 
17 // Based on implemenations of metrics (especially Meter) from Java library
18 // com.codehale.Metrics (https://github.com/dropwizard/metrics)
19 
20 #ifndef DATASTAX_INTERNAL_METRICS_HPP
21 #define DATASTAX_INTERNAL_METRICS_HPP
22 
23 #include "allocated.hpp"
24 #include "atomic.hpp"
25 #include "constants.hpp"
26 #include "scoped_lock.hpp"
27 #include "scoped_ptr.hpp"
28 #include "utils.hpp"
29 
30 #include "third_party/hdr_histogram/hdr_histogram.hpp"
31 
32 #include <stdlib.h>
33 #include <uv.h>
34 
35 #include <math.h>
36 
37 namespace datastax { namespace internal { namespace core {
38 
39 class Metrics : public Allocated {
40 public:
41   class ThreadState {
42   public:
ThreadState(size_t max_threads)43     ThreadState(size_t max_threads)
44         : max_threads_(max_threads)
45         , thread_count_(1) {
46       uv_key_create(&thread_id_key_);
47     }
48 
~ThreadState()49     ~ThreadState() { uv_key_delete(&thread_id_key_); }
50 
max_threads() const51     size_t max_threads() const { return max_threads_; }
52 
current_thread_id()53     size_t current_thread_id() {
54       void* id = uv_key_get(&thread_id_key_);
55       if (id == NULL) {
56         size_t thread_id = thread_count_.fetch_add(1);
57         assert(thread_id <= max_threads_);
58         id = reinterpret_cast<void*>(thread_id);
59         uv_key_set(&thread_id_key_, id);
60       }
61       return reinterpret_cast<size_t>(id) - 1;
62     }
63 
64   private:
65     const size_t max_threads_;
66     Atomic<size_t> thread_count_;
67     uv_key_t thread_id_key_;
68   };
69 
70   class Counter {
71   public:
Counter(ThreadState * thread_state)72     Counter(ThreadState* thread_state)
73         : thread_state_(thread_state)
74         , counters_(new PerThreadCounter[thread_state->max_threads()]) {}
75 
inc()76     void inc() { counters_[thread_state_->current_thread_id()].add(1LL); }
77 
dec()78     void dec() { counters_[thread_state_->current_thread_id()].sub(1LL); }
79 
sum() const80     int64_t sum() const {
81       int64_t sum = 0;
82       for (size_t i = 0; i < thread_state_->max_threads(); ++i) {
83         sum += counters_[i].get();
84       }
85       return sum;
86     }
87 
sum_and_reset()88     int64_t sum_and_reset() {
89       int64_t sum = 0;
90       for (size_t i = 0; i < thread_state_->max_threads(); ++i) {
91         sum += counters_[i].get_and_reset();
92       }
93       return sum;
94     }
95 
96   private:
97     class PerThreadCounter : public Allocated {
98     public:
PerThreadCounter()99       PerThreadCounter()
100           : value_(0) {}
101 
add(int64_t n)102       void add(int64_t n) { value_.fetch_add(n, MEMORY_ORDER_RELEASE); }
103 
sub(int64_t n)104       void sub(int64_t n) { value_.fetch_sub(n, MEMORY_ORDER_RELEASE); }
105 
get() const106       int64_t get() const { return value_.load(MEMORY_ORDER_ACQUIRE); }
107 
get_and_reset()108       int64_t get_and_reset() { return value_.exchange(0, MEMORY_ORDER_RELEASE); }
109 
110     private:
111       Atomic<int64_t> value_;
112 
113       static const size_t cacheline_size = 64;
114       char pad__[cacheline_size];
no_unused_private_warning__()115       void no_unused_private_warning__() { pad__[0] = 0; }
116     };
117 
118   private:
119     ThreadState* thread_state_;
120     ScopedArray<PerThreadCounter> counters_;
121 
122   private:
123     DISALLOW_COPY_AND_ASSIGN(Counter);
124   };
125 
126   class ExponentiallyWeightedMovingAverage {
127   public:
128     static const uint64_t INTERVAL = 5;
129 
ExponentiallyWeightedMovingAverage(double alpha,ThreadState * thread_state)130     ExponentiallyWeightedMovingAverage(double alpha, ThreadState* thread_state)
131         : alpha_(alpha)
132         , uncounted_(thread_state)
133         , is_initialized_(false)
134         , rate_(0.0) {}
135 
rate() const136     double rate() const { return rate_.load(MEMORY_ORDER_ACQUIRE); }
137 
update()138     void update() { uncounted_.inc(); }
139 
tick()140     void tick() {
141       const int64_t count = uncounted_.sum_and_reset();
142       double instant_rate = static_cast<double>(count) / INTERVAL;
143 
144       if (is_initialized_.load(MEMORY_ORDER_ACQUIRE)) {
145         double rate = rate_.load(MEMORY_ORDER_ACQUIRE);
146         rate_.store(rate + (alpha_ * (instant_rate - rate)), MEMORY_ORDER_RELEASE);
147       } else {
148         rate_.store(instant_rate, MEMORY_ORDER_RELEASE);
149         is_initialized_.store(true, MEMORY_ORDER_RELEASE);
150       }
151     }
152 
153   private:
154     const double alpha_;
155     Counter uncounted_;
156     Atomic<bool> is_initialized_;
157     Atomic<double> rate_;
158   };
159 
160   class Meter {
161   public:
Meter(ThreadState * thread_state)162     Meter(ThreadState* thread_state)
163         : one_minute_rate_(
164               1.0 - exp(-static_cast<double>(ExponentiallyWeightedMovingAverage::INTERVAL) / 60.0 /
165                         1),
166               thread_state)
167         , five_minute_rate_(
168               1.0 - exp(-static_cast<double>(ExponentiallyWeightedMovingAverage::INTERVAL) / 60.0 /
169                         5),
170               thread_state)
171         , fifteen_minute_rate_(
172               1.0 - exp(-static_cast<double>(ExponentiallyWeightedMovingAverage::INTERVAL) / 60.0 /
173                         15),
174               thread_state)
175         , count_(thread_state)
176         , speculative_request_count_(thread_state)
177         , start_time_(uv_hrtime())
178         , last_tick_(start_time_) {}
179 
mark()180     void mark() {
181       tick_if_necessary();
182       count_.inc();
183       one_minute_rate_.update();
184       five_minute_rate_.update();
185       fifteen_minute_rate_.update();
186     }
187 
mark_speculative()188     void mark_speculative() { speculative_request_count_.inc(); }
189 
one_minute_rate() const190     double one_minute_rate() const { return one_minute_rate_.rate(); }
five_minute_rate() const191     double five_minute_rate() const { return five_minute_rate_.rate(); }
fifteen_minute_rate() const192     double fifteen_minute_rate() const { return fifteen_minute_rate_.rate(); }
193 
mean_rate() const194     double mean_rate() const {
195       if (count() == 0) {
196         return 0.0;
197       } else {
198         double elapsed = static_cast<double>(uv_hrtime() - start_time_) / 1e9;
199         return count() / elapsed;
200       }
201     }
202 
count() const203     uint64_t count() const { return count_.sum(); }
204 
speculative_request_count() const205     uint64_t speculative_request_count() const { return speculative_request_count_.sum(); }
206 
speculative_request_percent() const207     double speculative_request_percent() const {
208       // count() gives us the number of requests that we successfully handled.
209       //
210       // speculative_request_count() give us the number of requests sent on
211       // the wire but were aborted after we received a good response.
212 
213       uint64_t spec_count = speculative_request_count();
214       uint64_t total_requests = spec_count + count();
215 
216       // Be wary of div by 0.
217       return total_requests ? static_cast<double>(spec_count) / total_requests * 100 : 0;
218     }
219 
220   private:
221     static const uint64_t TICK_INTERVAL =
222         ExponentiallyWeightedMovingAverage::INTERVAL * 1000LL * 1000LL * 1000LL;
223 
tick_if_necessary()224     void tick_if_necessary() {
225       uint64_t old_tick = last_tick_.load();
226       uint64_t new_tick = uv_hrtime();
227       uint64_t elapsed = new_tick - old_tick;
228 
229       if (elapsed > TICK_INTERVAL) {
230         uint64_t new_interval_start_tick = new_tick - elapsed % TICK_INTERVAL;
231         if (last_tick_.compare_exchange_strong(old_tick, new_interval_start_tick)) {
232           uint64_t required_ticks = elapsed / TICK_INTERVAL;
233           for (uint64_t i = 0; i < required_ticks; ++i) {
234             one_minute_rate_.tick();
235             five_minute_rate_.tick();
236             fifteen_minute_rate_.tick();
237           }
238         }
239       }
240     }
241 
242     ExponentiallyWeightedMovingAverage one_minute_rate_;
243     ExponentiallyWeightedMovingAverage five_minute_rate_;
244     ExponentiallyWeightedMovingAverage fifteen_minute_rate_;
245     Counter count_;
246     Counter speculative_request_count_;
247     const uint64_t start_time_;
248     Atomic<uint64_t> last_tick_;
249 
250   private:
251     DISALLOW_COPY_AND_ASSIGN(Meter);
252   };
253 
  // Latency histogram built on HdrHistogram: values tracked to 3
  // significant digits from 1 up to HIGHEST_TRACKABLE_VALUE (one hour,
  // when values are recorded in microseconds as Metrics::record_request()
  // does). Writers record into lock-free per-thread double-buffered
  // histograms; the snapshot reader merges those into the cumulative
  // histogram_ under mutex_.
  class Histogram {
  public:
    static const int64_t HIGHEST_TRACKABLE_VALUE = 3600LL * 1000LL * 1000LL;

    // Summary statistics filled in by get_snapshot(); all values are in
    // the unit the histogram records (microseconds for this driver).
    struct Snapshot {
      int64_t min;
      int64_t max;
      int64_t mean;
      int64_t stddev;
      int64_t median;
      int64_t percentile_75th;
      int64_t percentile_95th;
      int64_t percentile_98th;
      int64_t percentile_99th;
      int64_t percentile_999th;
    };

    Histogram(ThreadState* thread_state)
        : thread_state_(thread_state)
        , histograms_(new PerThreadHistogram[thread_state->max_threads()]) {
      hdr_init(1LL, HIGHEST_TRACKABLE_VALUE, 3, &histogram_);
      uv_mutex_init(&mutex_);
    }

    ~Histogram() {
      // NOTE(review): released with free() — assumes the bundled
      // hdr_init() performs a single allocation; confirm against
      // third_party/hdr_histogram/hdr_histogram.hpp.
      free(histogram_);
      uv_mutex_destroy(&mutex_);
    }

    // Record one value into the calling thread's histogram. Does not block
    // other writers or concurrent get_snapshot() calls.
    void record_value(int64_t value) {
      histograms_[thread_state_->current_thread_id()].record_value(value);
    }

    // Drain every thread's pending values into the cumulative histogram_
    // and compute *snapshot from it; serialized by mutex_. histogram_ is
    // never reset, so the statistics cover the histogram's whole lifetime,
    // not just the interval since the previous snapshot.
    void get_snapshot(Snapshot* snapshot) const {
      ScopedMutex l(&mutex_);
      hdr_histogram* h = histogram_;
      for (size_t i = 0; i < thread_state_->max_threads(); ++i) {
        histograms_[i].add(h);
      }

      if (h->total_count == 0) {
        // There is no data; default to 0 for the stats.
        snapshot->max = 0;
        snapshot->min = 0;
        snapshot->mean = 0;
        snapshot->stddev = 0;
        snapshot->median = 0;
        snapshot->percentile_75th = 0;
        snapshot->percentile_95th = 0;
        snapshot->percentile_98th = 0;
        snapshot->percentile_99th = 0;
        snapshot->percentile_999th = 0;
      } else {
        snapshot->max = hdr_max(h);
        snapshot->min = hdr_min(h);
        snapshot->mean = static_cast<int64_t>(hdr_mean(h));
        snapshot->stddev = static_cast<int64_t>(hdr_stddev(h));
        snapshot->median = hdr_value_at_percentile(h, 50.0);
        snapshot->percentile_75th = hdr_value_at_percentile(h, 75.0);
        snapshot->percentile_95th = hdr_value_at_percentile(h, 95.0);
        snapshot->percentile_98th = hdr_value_at_percentile(h, 98.0);
        snapshot->percentile_99th = hdr_value_at_percentile(h, 99.0);
        snapshot->percentile_999th = hdr_value_at_percentile(h, 99.9);
      }
    }

  private:
    // Writer/reader phase flipper (cf. HdrHistogram's WriterReaderPhaser):
    // writers enter/exit critical sections with atomic increments only;
    // the single reader flips the phase and spins until every writer that
    // entered in the old phase has exited, so the buffer it is about to
    // read is quiescent. The phase is encoded in the sign of start_epoch_
    // (negative = odd phase).
    class WriterReaderPhaser {
    public:
      WriterReaderPhaser()
          : start_epoch_(0)
          , even_end_epoch_(0)
          , odd_end_epoch_(CASS_INT64_MIN) {}

      // Writer entry; the returned value must be passed to the matching
      // writer_critical_section_end(). Its sign identifies the phase the
      // writer entered in.
      int64_t writer_critical_section_enter() { return start_epoch_.fetch_add(1); }

      // Writer exit: bump the end-epoch of whichever phase was active at
      // entry.
      void writer_critical_section_end(int64_t critical_value_enter) {
        if (critical_value_enter < 0) {
          odd_end_epoch_.fetch_add(1);
        } else {
          even_end_epoch_.fetch_add(1);
        }
      }

      // The reader is protected by a mutex in Histogram
      void flip_phase() {
        bool is_next_phase_even = (start_epoch_.load() < 0);

        int64_t initial_start_value;

        // Reset the end-epoch of the phase we are about to switch to.
        if (is_next_phase_even) {
          initial_start_value = 0;
          even_end_epoch_.store(initial_start_value, MEMORY_ORDER_RELAXED);
        } else {
          initial_start_value = CASS_INT64_MIN;
          odd_end_epoch_.store(initial_start_value, MEMORY_ORDER_RELAXED);
        }

        // Route all subsequent writers to the new phase; remember how many
        // writers entered during the old phase.
        int64_t start_value_at_flip = start_epoch_.exchange(initial_start_value);

        // Spin (yielding) until every old-phase writer has exited.
        bool is_caught_up = false;
        do {
          if (is_next_phase_even) {
            is_caught_up = (odd_end_epoch_.load() == start_value_at_flip);
          } else {
            is_caught_up = (even_end_epoch_.load() == start_value_at_flip);
          }
          if (!is_caught_up) {
            thread_yield();
          }
        } while (!is_caught_up);
      }

    private:
      Atomic<int64_t> start_epoch_;
      Atomic<int64_t> even_end_epoch_;
      Atomic<int64_t> odd_end_epoch_;
    };

    // Double-buffered per-thread histogram: the owning thread records into
    // the active buffer while the snapshot reader drains the inactive one.
    class PerThreadHistogram : public Allocated {
    public:
      PerThreadHistogram()
          : active_index_(0) {
        hdr_init(1LL, HIGHEST_TRACKABLE_VALUE, 3, &histograms_[0]);
        hdr_init(1LL, HIGHEST_TRACKABLE_VALUE, 3, &histograms_[1]);
      }

      ~PerThreadHistogram() {
        // NOTE(review): see ~Histogram() — assumes single-allocation
        // hdr_init().
        free(histograms_[0]);
        free(histograms_[1]);
      }

      // Writer path: record into the currently active buffer inside a
      // phaser critical section so add() can wait for us to finish.
      void record_value(int64_t value) {
        int64_t critical_value_enter = phaser_.writer_critical_section_enter();
        hdr_histogram* h = histograms_[active_index_.load()];
        hdr_record_value(h, value);
        phaser_.writer_critical_section_end(critical_value_enter);
      }

      // Reader path: swap active/inactive buffers, wait for in-flight
      // writers via flip_phase(), then merge and reset the now quiescent
      // inactive buffer. Declared const (with mutable members) so it can
      // be called from Histogram::get_snapshot() const.
      void add(hdr_histogram* to) const {
        int inactive_index = active_index_.exchange(!active_index_.load());
        hdr_histogram* from = histograms_[inactive_index];
        phaser_.flip_phase();
        hdr_add(to, from);
        hdr_reset(from);
      }

    private:
      hdr_histogram* histograms_[2];
      mutable Atomic<int> active_index_; // 0 or 1; flipped by the reader in add()
      mutable WriterReaderPhaser phaser_;
    };

    ThreadState* thread_state_;
    ScopedArray<PerThreadHistogram> histograms_;
    hdr_histogram* histogram_; // Cumulative merge target for snapshots
    mutable uv_mutex_t mutex_; // Serializes get_snapshot()

  private:
    DISALLOW_COPY_AND_ASSIGN(Histogram);
  };
415 
  // All metrics share a single ThreadState so every per-thread structure
  // is sized and indexed consistently; max_threads must cover every thread
  // that will ever record a metric.
  Metrics(size_t max_threads)
      : thread_state_(max_threads)
      , request_latencies(&thread_state_)
      , speculative_request_latencies(&thread_state_)
      , request_rates(&thread_state_)
      , total_connections(&thread_state_)
      , connection_timeouts(&thread_state_)
      , request_timeouts(&thread_state_) {}
424 
record_request(uint64_t latency_ns)425   void record_request(uint64_t latency_ns) {
426     // Final measurement is in microseconds
427     request_latencies.record_value(latency_ns / 1000);
428     request_rates.mark();
429   }
430 
record_speculative_request(uint64_t latency_ns)431   void record_speculative_request(uint64_t latency_ns) {
432     // Final measurement is in microseconds
433     speculative_request_latencies.record_value(latency_ns / 1000);
434     request_rates.mark_speculative();
435   }
436 
private:
  // Shared thread-id registry used by every metric below.
  ThreadState thread_state_;

public:
  // Metrics are exposed as public members (no accessors).
  Histogram request_latencies;             // Microseconds, completed requests
  Histogram speculative_request_latencies; // Microseconds, speculative requests
  Meter request_rates;                     // Throughput (total + speculative marks)

  Counter total_connections;

  Counter connection_timeouts;
  Counter request_timeouts;

private:
  DISALLOW_COPY_AND_ASSIGN(Metrics);
452 };
453 
454 }}} // namespace datastax::internal::core
455 
456 #endif
457