1 /*
2 * Copyright (c) 2020, Oracle and/or its affiliates. All rights reserved.
3 * Copyright (c) 2020, Datadog, Inc. All rights reserved.
4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5 *
6 * This code is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License version 2 only, as
8 * published by the Free Software Foundation.
9 *
10 * This code is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13 * version 2 for more details (a copy is included in the LICENSE file that
14 * accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License version
17 * 2 along with this work; if not, write to the Free Software Foundation,
18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19 *
20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21 * or visit www.oracle.com if you need additional information or have any
22 * questions.
23 *
24 */
25 
26 #include "precompiled.hpp"
27 #include "jfr/support/jfrAdaptiveSampler.hpp"
28 #include "jfr/utilities/jfrRandom.inline.hpp"
29 #include "jfr/utilities/jfrSpinlockHelper.hpp"
30 #include "jfr/utilities/jfrTime.hpp"
31 #include "jfr/utilities/jfrTimeConverter.hpp"
32 #include "jfr/utilities/jfrTryLock.hpp"
33 #include "logging/log.hpp"
34 #include "runtime/atomic.hpp"
35 #include "utilities/globalDefinitions.hpp"
36 #include <cmath>
37 
JfrSamplerWindow()38 JfrSamplerWindow::JfrSamplerWindow() :
39   _params(),
40   _end_ticks(0),
41   _sampling_interval(1),
42   _projected_population_size(0),
43   _measured_population_size(0) {}
44 
JfrAdaptiveSampler()45 JfrAdaptiveSampler::JfrAdaptiveSampler() :
46   _prng(this),
47   _window_0(NULL),
48   _window_1(NULL),
49   _active_window(NULL),
50   _avg_population_size(0),
51   _ewma_population_size_alpha(0),
52   _acc_debt_carry_limit(0),
53   _acc_debt_carry_count(0),
54   _lock(0) {}
55 
~JfrAdaptiveSampler()56 JfrAdaptiveSampler::~JfrAdaptiveSampler() {
57   delete _window_0;
58   delete _window_1;
59 }
60 
initialize()61 bool JfrAdaptiveSampler::initialize() {
62   assert(_window_0 == NULL, "invariant");
63   _window_0 = new JfrSamplerWindow();
64   if (_window_0 == NULL) {
65     return false;
66   }
67   assert(_window_1 == NULL, "invariant");
68   _window_1 = new JfrSamplerWindow();
69   if (_window_1 == NULL) {
70     return false;
71   }
72   _active_window = _window_0;
73   return true;
74 }
75 
76 /*
77  * The entry point to the sampler.
78  */
sample(int64_t timestamp)79 bool JfrAdaptiveSampler::sample(int64_t timestamp) {
80   bool expired_window;
81   const bool result = active_window()->sample(timestamp, &expired_window);
82   if (expired_window) {
83     JfrTryLock mutex(&_lock);
84     if (mutex.acquired()) {
85       rotate_window(timestamp);
86     }
87   }
88   return result;
89 }
90 
active_window() const91 inline const JfrSamplerWindow* JfrAdaptiveSampler::active_window() const {
92   return Atomic::load_acquire(&_active_window);
93 }
94 
now()95 inline int64_t now() {
96   return JfrTicks::now().value();
97 }
98 
is_expired(int64_t timestamp) const99 inline bool JfrSamplerWindow::is_expired(int64_t timestamp) const {
100   const int64_t end_ticks = Atomic::load(&_end_ticks);
101   return timestamp == 0 ? now() >= end_ticks : timestamp >= end_ticks;
102 }
103 
sample(int64_t timestamp,bool * expired_window) const104 bool JfrSamplerWindow::sample(int64_t timestamp, bool* expired_window) const {
105   assert(expired_window != NULL, "invariant");
106   *expired_window = is_expired(timestamp);
107   return *expired_window ? false : sample();
108 }
109 
sample() const110 inline bool JfrSamplerWindow::sample() const {
111   const size_t ordinal = Atomic::add(&_measured_population_size, static_cast<size_t>(1));
112   return ordinal <= _projected_population_size && ordinal % _sampling_interval == 0;
113 }
114 
115 // Called exclusively by the holder of the lock when a window is determined to have expired.
rotate_window(int64_t timestamp)116 void JfrAdaptiveSampler::rotate_window(int64_t timestamp) {
117   assert(_lock, "invariant");
118   const JfrSamplerWindow* const current = active_window();
119   assert(current != NULL, "invariant");
120   if (!current->is_expired(timestamp)) {
121     // Someone took care of it.
122     return;
123   }
124   rotate(current);
125 }
126 
127 // Subclasses can call this to immediately trigger a reconfiguration of the sampler.
128 // There is no need to await the expiration of the current active window.
reconfigure()129 void JfrAdaptiveSampler::reconfigure() {
130   assert(_lock, "invariant");
131   rotate(active_window());
132 }
133 
134 // Call next_window_param() to report the expired window and to retreive params for the next window.
rotate(const JfrSamplerWindow * expired)135 void JfrAdaptiveSampler::rotate(const JfrSamplerWindow* expired) {
136   assert(expired == active_window(), "invariant");
137   install(configure(next_window_params(expired), expired));
138 }
139 
install(const JfrSamplerWindow * next)140 inline void JfrAdaptiveSampler::install(const JfrSamplerWindow* next) {
141   assert(next != active_window(), "invariant");
142   Atomic::release_store(&_active_window, next);
143 }
144 
// Prepares the successor of 'expired' for use and returns it.
// When 'params' requests a reconfiguration, the params are first copied into
// both windows and the sampler-level state is reset via configure(params).
// The lock must be held by the caller.
const JfrSamplerWindow* JfrAdaptiveSampler::configure(const JfrSamplerParams& params, const JfrSamplerWindow* expired) {
  assert(_lock, "invariant");
  if (params.reconfigure) {
    // Store updated params once to both windows.
    JfrSamplerWindow* const expired_window = const_cast<JfrSamplerWindow*>(expired);
    JfrSamplerWindow* const successor = next_window(expired);
    expired_window->_params = params;
    successor->_params = params;
    configure(params);
  }
  JfrSamplerWindow* const upcoming = set_rate(params, expired);
  upcoming->initialize(params);
  return upcoming;
}
157 
/*
 * Exponentially Weighted Moving Average (EWMA):
 *
 * Y     is the new datapoint (at time t)
 * S     is the current EWMA (at time t-1)
 * alpha is the constant smoothing factor between 0 and 1; it represents the
 *       degree of weighting decrease.
 *
 * A higher alpha discounts older observations faster.
 * Returns the new EWMA, i.e. alpha * Y + (1 - alpha) * S.
 */
inline double exponentially_weighted_moving_average(double Y, double alpha, double S) {
  // Kept as a single expression so the floating point result is unchanged.
  return alpha * Y + (1.0 - alpha) * S;
}
172 
// Derives the EWMA smoothing factor from the number of windows to look back:
// 1 / lookback_count, or 1 when the lookback count is 0 or 1.
inline double compute_ewma_alpha_coefficient(size_t lookback_count) {
  if (lookback_count <= 1) {
    return 1.0;
  }
  return 1.0 / static_cast<double>(lookback_count);
}
176 
compute_accumulated_debt_carry_limit(const JfrSamplerParams & params)177 inline size_t compute_accumulated_debt_carry_limit(const JfrSamplerParams& params) {
178   if (params.window_duration_ms == 0 || params.window_duration_ms >= MILLIUNITS) {
179     return 1;
180   }
181   return MILLIUNITS / params.window_duration_ms;
182 }
183 
configure(const JfrSamplerParams & params)184 void JfrAdaptiveSampler::configure(const JfrSamplerParams& params) {
185   assert(params.reconfigure, "invariant");
186   _avg_population_size = 0;
187   _ewma_population_size_alpha = compute_ewma_alpha_coefficient(params.window_lookback_count);
188   _acc_debt_carry_limit = compute_accumulated_debt_carry_limit(params);
189   _acc_debt_carry_count = _acc_debt_carry_limit;
190   params.reconfigure = false;
191 }
192 
millis_to_countertime(int64_t millis)193 inline int64_t millis_to_countertime(int64_t millis) {
194   return JfrTimeConverter::nanos_to_countertime(millis * NANOSECS_PER_MILLISEC);
195 }
196 
initialize(const JfrSamplerParams & params)197 void JfrSamplerWindow::initialize(const JfrSamplerParams& params) {
198   assert(_sampling_interval >= 1, "invariant");
199   if (params.window_duration_ms == 0) {
200     Atomic::store(&_end_ticks, static_cast<int64_t>(0));
201     return;
202   }
203   Atomic::store(&_measured_population_size, static_cast<size_t>(0));
204   const int64_t end_ticks = now() + millis_to_countertime(params.window_duration_ms);
205   Atomic::store(&_end_ticks, end_ticks);
206 }
207 
208 /*
209  * Based on what it has learned from the past, the sampler creates a future 'projection',
210  * a speculation, or model, of what the situation will be like during the next window.
211  * This projection / model is used to derive values for the parameters, which are estimates for
212  * collecting a sample set that, should the model hold, is as close as possible to the target,
213  * i.e. the set point, which is a function of the number of sample_points_per_window + amortization.
214  * The model is a geometric distribution over the number of trials / selections required until success.
215  * For each window, the sampling interval is a random variable from this geometric distribution.
216  */
set_rate(const JfrSamplerParams & params,const JfrSamplerWindow * expired)217 JfrSamplerWindow* JfrAdaptiveSampler::set_rate(const JfrSamplerParams& params, const JfrSamplerWindow* expired) {
218   JfrSamplerWindow* const next = next_window(expired);
219   assert(next != expired, "invariant");
220   const size_t sample_size = project_sample_size(params, expired);
221   if (sample_size == 0) {
222     next->_projected_population_size = 0;
223     return next;
224   }
225   next->_sampling_interval = derive_sampling_interval(sample_size, expired);
226   assert(next->_sampling_interval >= 1, "invariant");
227   next->_projected_population_size = sample_size * next->_sampling_interval;
228   return next;
229 }
230 
// The two windows alternate: returns the one that is not 'expired'.
inline JfrSamplerWindow* JfrAdaptiveSampler::next_window(const JfrSamplerWindow* expired) const {
  assert(expired != NULL, "invariant");
  if (expired == _window_0) {
    return _window_1;
  }
  return _window_0;
}
235 
project_sample_size(const JfrSamplerParams & params,const JfrSamplerWindow * expired)236 size_t JfrAdaptiveSampler::project_sample_size(const JfrSamplerParams& params, const JfrSamplerWindow* expired) {
237   return params.sample_points_per_window + amortize_debt(expired);
238 }
239 
240 /*
241  * When the sampler is configured to maintain a rate, is employs the concepts
242  * of 'debt' and 'accumulated debt'. 'Accumulated debt' can be thought of as
243  * a cumulative error term, and is indicative for how much the sampler is
244  * deviating from a set point, i.e. the ideal target rate. Debt accumulates naturally
245  * as a function of undersampled windows, caused by system fluctuations,
246  * i.e. too small populations.
247  *
248  * A specified rate is implicitly a _maximal_ rate, so the sampler must ensure
249  * to respect this 'limit'. Rates are normalized as per-second ratios, hence the
250  * limit to respect is on a per second basis. During this second, the sampler
251  * has freedom to dynamically re-adjust, and it does so by 'amortizing'
252  * accumulated debt over a certain number of windows that fall within the second.
253  *
254  * Intuitively, accumulated debt 'carry over' from the predecessor to the successor
255  * window if within the allowable time frame (determined in # of 'windows' given by
256  * _acc_debt_carry_limit). The successor window will sample more points to make amends,
257  * or 'amortize' debt accumulated by its predecessor(s).
258  */
amortize_debt(const JfrSamplerWindow * expired)259 size_t JfrAdaptiveSampler::amortize_debt(const JfrSamplerWindow* expired) {
260   assert(expired != NULL, "invariant");
261   const intptr_t accumulated_debt = expired->accumulated_debt();
262   assert(accumulated_debt <= 0, "invariant");
263   if (_acc_debt_carry_count == _acc_debt_carry_limit) {
264     _acc_debt_carry_count = 1;
265     return 0;
266   }
267   ++_acc_debt_carry_count;
268   return -accumulated_debt; // negation
269 }
270 
max_sample_size() const271 inline size_t JfrSamplerWindow::max_sample_size() const {
272   return _projected_population_size / _sampling_interval;
273 }
274 
275 // The sample size is derived from the measured population size.
sample_size() const276 size_t JfrSamplerWindow::sample_size() const {
277   const size_t size = population_size();
278   return size > _projected_population_size ? max_sample_size() : size / _sampling_interval;
279 }
280 
population_size() const281 size_t JfrSamplerWindow::population_size() const {
282   return Atomic::load(&_measured_population_size);
283 }
284 
accumulated_debt() const285 intptr_t JfrSamplerWindow::accumulated_debt() const {
286   return _projected_population_size == 0 ? 0 : static_cast<intptr_t>(_params.sample_points_per_window - max_sample_size()) + debt();
287 }
288 
debt() const289 intptr_t JfrSamplerWindow::debt() const {
290   return _projected_population_size == 0 ? 0 : static_cast<intptr_t>(sample_size() - _params.sample_points_per_window);
291 }
292 
293 /*
294  * Inverse transform sampling from a uniform to a geometric distribution.
295  *
296  * PMF: f(x)  = P(X=x) = ((1-p)^x-1)p
297  *
298  * CDF: F(x)  = P(X<=x) = 1 - (1-p)^x
299  *
300  * Inv
301  * CDF: F'(u) = ceil( ln(1-u) / ln(1-p) ) // u = random uniform, 0.0 < u < 1.0
302  *
303  */
next_geometric(double p,double u)304 inline size_t next_geometric(double p, double u) {
305   assert(u >= 0.0, "invariant");
306   assert(u <= 1.0, "invariant");
307   if (u == 0.0) {
308     u = 0.01;
309   } else if (u == 1.0) {
310     u = 0.99;
311   }
312   // Inverse CDF for the geometric distribution.
313   return ceil(log(1.0 - u) / log(1.0 - p));
314 }
315 
derive_sampling_interval(double sample_size,const JfrSamplerWindow * expired)316 size_t JfrAdaptiveSampler::derive_sampling_interval(double sample_size, const JfrSamplerWindow* expired) {
317   assert(sample_size > 0, "invariant");
318   const size_t population_size = project_population_size(expired);
319   if (population_size <= sample_size) {
320     return 1;
321   }
322   assert(population_size > 0, "invariant");
323   const double projected_probability = sample_size / population_size;
324   return next_geometric(projected_probability, _prng.next_uniform());
325 }
326 
327 // The projected population size is an exponentially weighted moving average, a function of the window_lookback_count.
project_population_size(const JfrSamplerWindow * expired)328 inline size_t JfrAdaptiveSampler::project_population_size(const JfrSamplerWindow* expired) {
329   assert(expired != NULL, "invariant");
330   _avg_population_size = exponentially_weighted_moving_average(expired->population_size(), _ewma_population_size_alpha, _avg_population_size);
331   return _avg_population_size;
332 }
333 
334 /* GTEST support */
JfrGTestFixedRateSampler(size_t sample_points_per_window,size_t window_duration_ms,size_t lookback_count)335 JfrGTestFixedRateSampler::JfrGTestFixedRateSampler(size_t sample_points_per_window, size_t window_duration_ms, size_t lookback_count) : JfrAdaptiveSampler(), _params() {
336   _sample_size_ewma = 0.0;
337   _params.sample_points_per_window = sample_points_per_window;
338   _params.window_duration_ms = window_duration_ms;
339   _params.window_lookback_count = lookback_count;
340   _params.reconfigure = true;
341 }
342 
initialize()343 bool JfrGTestFixedRateSampler::initialize() {
344   const bool result = JfrAdaptiveSampler::initialize();
345   JfrSpinlockHelper mutex(&_lock);
346   reconfigure();
347   return result;
348 }
349 
350 /*
351  * To start debugging the sampler: -Xlog:jfr+system+throttle=debug
352  * It will log details of each expired window together with an average sample size.
353  *
354  * Excerpt:
355  *
356  * "JfrGTestFixedRateSampler: avg.sample size: 19.8377, window set point: 20 ..."
357  *
358  * Monitoring the relation of average sample size to the window set point, i.e the target,
359  * is a good indicator of how the sampler is performing over time.
360  *
361  */
log(const JfrSamplerWindow * expired,double * sample_size_ewma)362 static void log(const JfrSamplerWindow* expired, double* sample_size_ewma) {
363   assert(sample_size_ewma != NULL, "invariant");
364   if (log_is_enabled(Debug, jfr, system, throttle)) {
365     *sample_size_ewma = exponentially_weighted_moving_average(expired->sample_size(), compute_ewma_alpha_coefficient(expired->params().window_lookback_count), *sample_size_ewma);
366     log_debug(jfr, system, throttle)("JfrGTestFixedRateSampler: avg.sample size: %0.4f, window set point: %zu, sample size: %zu, population size: %zu, ratio: %.4f, window duration: %zu ms\n",
367       *sample_size_ewma, expired->params().sample_points_per_window, expired->sample_size(), expired->population_size(),
368       expired->population_size() == 0 ? 0 : (double)expired->sample_size() / (double)expired->population_size(),
369       expired->params().window_duration_ms);
370   }
371 }
372 
373 /*
374  * This is the feedback control loop.
375  *
376  * The JfrAdaptiveSampler engine calls this when a sampler window has expired, providing
377  * us with an opportunity to perform some analysis.To reciprocate, we returns a set of
378  * parameters, possibly updated, for the engine to apply to the next window.
379  */
next_window_params(const JfrSamplerWindow * expired)380 const JfrSamplerParams& JfrGTestFixedRateSampler::next_window_params(const JfrSamplerWindow* expired) {
381   assert(expired != NULL, "invariant");
382   assert(_lock, "invariant");
383   log(expired, &_sample_size_ewma);
384   return _params;
385 }
386