1 /*
2 * Copyright (c) 2020, Oracle and/or its affiliates. All rights reserved.
3 * Copyright (c) 2020, Datadog, Inc. All rights reserved.
4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5 *
6 * This code is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License version 2 only, as
8 * published by the Free Software Foundation.
9 *
10 * This code is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * version 2 for more details (a copy is included in the LICENSE file that
14 * accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License version
17 * 2 along with this work; if not, write to the Free Software Foundation,
18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19 *
20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21 * or visit www.oracle.com if you need additional information or have any
22 * questions.
23 *
24 */
25
26 #include "precompiled.hpp"
27 #include "jfr/support/jfrAdaptiveSampler.hpp"
28 #include "jfr/utilities/jfrRandom.inline.hpp"
29 #include "jfr/utilities/jfrSpinlockHelper.hpp"
30 #include "jfr/utilities/jfrTime.hpp"
31 #include "jfr/utilities/jfrTimeConverter.hpp"
32 #include "jfr/utilities/jfrTryLock.hpp"
33 #include "logging/log.hpp"
34 #include "runtime/atomic.hpp"
35 #include "utilities/globalDefinitions.hpp"
36 #include <cmath>
37
JfrSamplerWindow()38 JfrSamplerWindow::JfrSamplerWindow() :
39 _params(),
40 _end_ticks(0),
41 _sampling_interval(1),
42 _projected_population_size(0),
43 _measured_population_size(0) {}
44
JfrAdaptiveSampler()45 JfrAdaptiveSampler::JfrAdaptiveSampler() :
46 _prng(this),
47 _window_0(NULL),
48 _window_1(NULL),
49 _active_window(NULL),
50 _avg_population_size(0),
51 _ewma_population_size_alpha(0),
52 _acc_debt_carry_limit(0),
53 _acc_debt_carry_count(0),
54 _lock(0) {}
55
~JfrAdaptiveSampler()56 JfrAdaptiveSampler::~JfrAdaptiveSampler() {
57 delete _window_0;
58 delete _window_1;
59 }
60
initialize()61 bool JfrAdaptiveSampler::initialize() {
62 assert(_window_0 == NULL, "invariant");
63 _window_0 = new JfrSamplerWindow();
64 if (_window_0 == NULL) {
65 return false;
66 }
67 assert(_window_1 == NULL, "invariant");
68 _window_1 = new JfrSamplerWindow();
69 if (_window_1 == NULL) {
70 return false;
71 }
72 _active_window = _window_0;
73 return true;
74 }
75
76 /*
77 * The entry point to the sampler.
78 */
sample(int64_t timestamp)79 bool JfrAdaptiveSampler::sample(int64_t timestamp) {
80 bool expired_window;
81 const bool result = active_window()->sample(timestamp, &expired_window);
82 if (expired_window) {
83 JfrTryLock mutex(&_lock);
84 if (mutex.acquired()) {
85 rotate_window(timestamp);
86 }
87 }
88 return result;
89 }
90
active_window() const91 inline const JfrSamplerWindow* JfrAdaptiveSampler::active_window() const {
92 return Atomic::load_acquire(&_active_window);
93 }
94
now()95 inline int64_t now() {
96 return JfrTicks::now().value();
97 }
98
is_expired(int64_t timestamp) const99 inline bool JfrSamplerWindow::is_expired(int64_t timestamp) const {
100 const int64_t end_ticks = Atomic::load(&_end_ticks);
101 return timestamp == 0 ? now() >= end_ticks : timestamp >= end_ticks;
102 }
103
sample(int64_t timestamp,bool * expired_window) const104 bool JfrSamplerWindow::sample(int64_t timestamp, bool* expired_window) const {
105 assert(expired_window != NULL, "invariant");
106 *expired_window = is_expired(timestamp);
107 return *expired_window ? false : sample();
108 }
109
sample() const110 inline bool JfrSamplerWindow::sample() const {
111 const size_t ordinal = Atomic::add(&_measured_population_size, static_cast<size_t>(1));
112 return ordinal <= _projected_population_size && ordinal % _sampling_interval == 0;
113 }
114
115 // Called exclusively by the holder of the lock when a window is determined to have expired.
rotate_window(int64_t timestamp)116 void JfrAdaptiveSampler::rotate_window(int64_t timestamp) {
117 assert(_lock, "invariant");
118 const JfrSamplerWindow* const current = active_window();
119 assert(current != NULL, "invariant");
120 if (!current->is_expired(timestamp)) {
121 // Someone took care of it.
122 return;
123 }
124 rotate(current);
125 }
126
127 // Subclasses can call this to immediately trigger a reconfiguration of the sampler.
128 // There is no need to await the expiration of the current active window.
reconfigure()129 void JfrAdaptiveSampler::reconfigure() {
130 assert(_lock, "invariant");
131 rotate(active_window());
132 }
133
134 // Call next_window_param() to report the expired window and to retreive params for the next window.
rotate(const JfrSamplerWindow * expired)135 void JfrAdaptiveSampler::rotate(const JfrSamplerWindow* expired) {
136 assert(expired == active_window(), "invariant");
137 install(configure(next_window_params(expired), expired));
138 }
139
install(const JfrSamplerWindow * next)140 inline void JfrAdaptiveSampler::install(const JfrSamplerWindow* next) {
141 assert(next != active_window(), "invariant");
142 Atomic::release_store(&_active_window, next);
143 }
144
// Prepares the successor of the expired window and returns it.
// If the params are flagged for reconfiguration they are first copied into both
// windows and the sampler-level state is re-derived from them. In all cases the
// successor then gets its rate set and is initialized for a new period.
const JfrSamplerWindow* JfrAdaptiveSampler::configure(const JfrSamplerParams& params, const JfrSamplerWindow* expired) {
  assert(_lock, "invariant");
  if (params.reconfigure) {
    // Store updated params once to both windows.
    JfrSamplerWindow* const expired_window = const_cast<JfrSamplerWindow*>(expired);
    expired_window->_params = params;
    JfrSamplerWindow* const other_window = next_window(expired);
    other_window->_params = params;
    configure(params);
  }
  JfrSamplerWindow* const upcoming = set_rate(params, expired);
  upcoming->initialize(params);
  return upcoming;
}
157
158 /*
159 * Exponentially Weighted Moving Average (EWMA):
160 *
161 * Y is a datapoint (at time t)
 * S is the current EWMA (at time t-1)
163 * alpha represents the degree of weighting decrease, a constant smoothing factor between 0 and 1.
164 *
165 * A higher alpha discounts older observations faster.
166 * Returns the new EWMA for S
167 */
168
inline double exponentially_weighted_moving_average(double Y, double alpha, double S) {
  // The new datapoint Y is weighted by alpha, the previous average S by the remainder.
  const double history_weight = 1 - alpha;
  return alpha * Y + history_weight * S;
}
172
// Derives the EWMA smoothing factor from the number of windows to look back over.
// A lookback of zero or one yields 1, otherwise the factor is 1 / lookback_count.
inline double compute_ewma_alpha_coefficient(size_t lookback_count) {
  if (lookback_count <= 1) {
    return 1.0;
  }
  return 1.0 / static_cast<double>(lookback_count);
}
176
compute_accumulated_debt_carry_limit(const JfrSamplerParams & params)177 inline size_t compute_accumulated_debt_carry_limit(const JfrSamplerParams& params) {
178 if (params.window_duration_ms == 0 || params.window_duration_ms >= MILLIUNITS) {
179 return 1;
180 }
181 return MILLIUNITS / params.window_duration_ms;
182 }
183
configure(const JfrSamplerParams & params)184 void JfrAdaptiveSampler::configure(const JfrSamplerParams& params) {
185 assert(params.reconfigure, "invariant");
186 _avg_population_size = 0;
187 _ewma_population_size_alpha = compute_ewma_alpha_coefficient(params.window_lookback_count);
188 _acc_debt_carry_limit = compute_accumulated_debt_carry_limit(params);
189 _acc_debt_carry_count = _acc_debt_carry_limit;
190 params.reconfigure = false;
191 }
192
millis_to_countertime(int64_t millis)193 inline int64_t millis_to_countertime(int64_t millis) {
194 return JfrTimeConverter::nanos_to_countertime(millis * NANOSECS_PER_MILLISEC);
195 }
196
initialize(const JfrSamplerParams & params)197 void JfrSamplerWindow::initialize(const JfrSamplerParams& params) {
198 assert(_sampling_interval >= 1, "invariant");
199 if (params.window_duration_ms == 0) {
200 Atomic::store(&_end_ticks, static_cast<int64_t>(0));
201 return;
202 }
203 Atomic::store(&_measured_population_size, static_cast<size_t>(0));
204 const int64_t end_ticks = now() + millis_to_countertime(params.window_duration_ms);
205 Atomic::store(&_end_ticks, end_ticks);
206 }
207
208 /*
209 * Based on what it has learned from the past, the sampler creates a future 'projection',
210 * a speculation, or model, of what the situation will be like during the next window.
211 * This projection / model is used to derive values for the parameters, which are estimates for
212 * collecting a sample set that, should the model hold, is as close as possible to the target,
213 * i.e. the set point, which is a function of the number of sample_points_per_window + amortization.
214 * The model is a geometric distribution over the number of trials / selections required until success.
215 * For each window, the sampling interval is a random variable from this geometric distribution.
216 */
JfrSamplerWindow* JfrAdaptiveSampler::set_rate(const JfrSamplerParams& params, const JfrSamplerWindow* expired) {
  JfrSamplerWindow* const upcoming = next_window(expired);
  assert(upcoming != expired, "invariant");
  // The set point for the upcoming window, amortized debt included.
  const size_t target_sample_size = project_sample_size(params, expired);
  if (target_sample_size != 0) {
    upcoming->_sampling_interval = derive_sampling_interval(target_sample_size, expired);
    assert(upcoming->_sampling_interval >= 1, "invariant");
    // Only this many elements are candidates for selection in the upcoming window.
    upcoming->_projected_population_size = target_sample_size * upcoming->_sampling_interval;
  } else {
    // Nothing to sample: with a projected population of zero, sample() selects nothing.
    // The sampling interval is left untouched.
    upcoming->_projected_population_size = 0;
  }
  return upcoming;
}
230
// The sampler alternates between two windows: returns the one that is not 'expired'.
inline JfrSamplerWindow* JfrAdaptiveSampler::next_window(const JfrSamplerWindow* expired) const {
  assert(expired != NULL, "invariant");
  if (expired == _window_0) {
    return _window_1;
  }
  return _window_0;
}
235
project_sample_size(const JfrSamplerParams & params,const JfrSamplerWindow * expired)236 size_t JfrAdaptiveSampler::project_sample_size(const JfrSamplerParams& params, const JfrSamplerWindow* expired) {
237 return params.sample_points_per_window + amortize_debt(expired);
238 }
239
240 /*
 * When the sampler is configured to maintain a rate, it employs the concepts
242 * of 'debt' and 'accumulated debt'. 'Accumulated debt' can be thought of as
243 * a cumulative error term, and is indicative for how much the sampler is
244 * deviating from a set point, i.e. the ideal target rate. Debt accumulates naturally
245 * as a function of undersampled windows, caused by system fluctuations,
246 * i.e. too small populations.
247 *
248 * A specified rate is implicitly a _maximal_ rate, so the sampler must ensure
249 * to respect this 'limit'. Rates are normalized as per-second ratios, hence the
250 * limit to respect is on a per second basis. During this second, the sampler
251 * has freedom to dynamically re-adjust, and it does so by 'amortizing'
252 * accumulated debt over a certain number of windows that fall within the second.
253 *
 * Intuitively, accumulated debt 'carries over' from the predecessor to the successor
255 * window if within the allowable time frame (determined in # of 'windows' given by
256 * _acc_debt_carry_limit). The successor window will sample more points to make amends,
257 * or 'amortize' debt accumulated by its predecessor(s).
258 */
amortize_debt(const JfrSamplerWindow * expired)259 size_t JfrAdaptiveSampler::amortize_debt(const JfrSamplerWindow* expired) {
260 assert(expired != NULL, "invariant");
261 const intptr_t accumulated_debt = expired->accumulated_debt();
262 assert(accumulated_debt <= 0, "invariant");
263 if (_acc_debt_carry_count == _acc_debt_carry_limit) {
264 _acc_debt_carry_count = 1;
265 return 0;
266 }
267 ++_acc_debt_carry_count;
268 return -accumulated_debt; // negation
269 }
270
max_sample_size() const271 inline size_t JfrSamplerWindow::max_sample_size() const {
272 return _projected_population_size / _sampling_interval;
273 }
274
275 // The sample size is derived from the measured population size.
sample_size() const276 size_t JfrSamplerWindow::sample_size() const {
277 const size_t size = population_size();
278 return size > _projected_population_size ? max_sample_size() : size / _sampling_interval;
279 }
280
population_size() const281 size_t JfrSamplerWindow::population_size() const {
282 return Atomic::load(&_measured_population_size);
283 }
284
accumulated_debt() const285 intptr_t JfrSamplerWindow::accumulated_debt() const {
286 return _projected_population_size == 0 ? 0 : static_cast<intptr_t>(_params.sample_points_per_window - max_sample_size()) + debt();
287 }
288
debt() const289 intptr_t JfrSamplerWindow::debt() const {
290 return _projected_population_size == 0 ? 0 : static_cast<intptr_t>(sample_size() - _params.sample_points_per_window);
291 }
292
293 /*
294 * Inverse transform sampling from a uniform to a geometric distribution.
295 *
 * PMF: f(x) = P(X=x) = ((1-p)^(x-1))p
297 *
298 * CDF: F(x) = P(X<=x) = 1 - (1-p)^x
299 *
300 * Inv
301 * CDF: F'(u) = ceil( ln(1-u) / ln(1-p) ) // u = random uniform, 0.0 < u < 1.0
302 *
303 */
next_geometric(double p,double u)304 inline size_t next_geometric(double p, double u) {
305 assert(u >= 0.0, "invariant");
306 assert(u <= 1.0, "invariant");
307 if (u == 0.0) {
308 u = 0.01;
309 } else if (u == 1.0) {
310 u = 0.99;
311 }
312 // Inverse CDF for the geometric distribution.
313 return ceil(log(1.0 - u) / log(1.0 - p));
314 }
315
derive_sampling_interval(double sample_size,const JfrSamplerWindow * expired)316 size_t JfrAdaptiveSampler::derive_sampling_interval(double sample_size, const JfrSamplerWindow* expired) {
317 assert(sample_size > 0, "invariant");
318 const size_t population_size = project_population_size(expired);
319 if (population_size <= sample_size) {
320 return 1;
321 }
322 assert(population_size > 0, "invariant");
323 const double projected_probability = sample_size / population_size;
324 return next_geometric(projected_probability, _prng.next_uniform());
325 }
326
327 // The projected population size is an exponentially weighted moving average, a function of the window_lookback_count.
project_population_size(const JfrSamplerWindow * expired)328 inline size_t JfrAdaptiveSampler::project_population_size(const JfrSamplerWindow* expired) {
329 assert(expired != NULL, "invariant");
330 _avg_population_size = exponentially_weighted_moving_average(expired->population_size(), _ewma_population_size_alpha, _avg_population_size);
331 return _avg_population_size;
332 }
333
334 /* GTEST support */
JfrGTestFixedRateSampler(size_t sample_points_per_window,size_t window_duration_ms,size_t lookback_count)335 JfrGTestFixedRateSampler::JfrGTestFixedRateSampler(size_t sample_points_per_window, size_t window_duration_ms, size_t lookback_count) : JfrAdaptiveSampler(), _params() {
336 _sample_size_ewma = 0.0;
337 _params.sample_points_per_window = sample_points_per_window;
338 _params.window_duration_ms = window_duration_ms;
339 _params.window_lookback_count = lookback_count;
340 _params.reconfigure = true;
341 }
342
initialize()343 bool JfrGTestFixedRateSampler::initialize() {
344 const bool result = JfrAdaptiveSampler::initialize();
345 JfrSpinlockHelper mutex(&_lock);
346 reconfigure();
347 return result;
348 }
349
350 /*
351 * To start debugging the sampler: -Xlog:jfr+system+throttle=debug
352 * It will log details of each expired window together with an average sample size.
353 *
354 * Excerpt:
355 *
356 * "JfrGTestFixedRateSampler: avg.sample size: 19.8377, window set point: 20 ..."
357 *
 * Monitoring the relation of average sample size to the window set point, i.e. the target,
359 * is a good indicator of how the sampler is performing over time.
360 *
361 */
log(const JfrSamplerWindow * expired,double * sample_size_ewma)362 static void log(const JfrSamplerWindow* expired, double* sample_size_ewma) {
363 assert(sample_size_ewma != NULL, "invariant");
364 if (log_is_enabled(Debug, jfr, system, throttle)) {
365 *sample_size_ewma = exponentially_weighted_moving_average(expired->sample_size(), compute_ewma_alpha_coefficient(expired->params().window_lookback_count), *sample_size_ewma);
366 log_debug(jfr, system, throttle)("JfrGTestFixedRateSampler: avg.sample size: %0.4f, window set point: %zu, sample size: %zu, population size: %zu, ratio: %.4f, window duration: %zu ms\n",
367 *sample_size_ewma, expired->params().sample_points_per_window, expired->sample_size(), expired->population_size(),
368 expired->population_size() == 0 ? 0 : (double)expired->sample_size() / (double)expired->population_size(),
369 expired->params().window_duration_ms);
370 }
371 }
372
373 /*
374 * This is the feedback control loop.
375 *
376 * The JfrAdaptiveSampler engine calls this when a sampler window has expired, providing
 * us with an opportunity to perform some analysis. To reciprocate, we return a set of
 * parameters, possibly updated, for the engine to apply to the next window.
379 */
next_window_params(const JfrSamplerWindow * expired)380 const JfrSamplerParams& JfrGTestFixedRateSampler::next_window_params(const JfrSamplerWindow* expired) {
381 assert(expired != NULL, "invariant");
382 assert(_lock, "invariant");
383 log(expired, &_sample_size_ewma);
384 return _params;
385 }
386