1 /*
2 * Copyright (c) 2014, 2017 Machine Zone, Inc.
3 *
4 * Original author: Lev Walkin <lwalkin@machinezone.com>
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 * 1. Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14
15 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
16 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
19 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25 * SUCH DAMAGE.
26 */
27 /*
28 * Moving average allows to capture number of events per period of time
29 * (hits/second, kbps)
30 * averaged over the specified time period.
31 * http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
32 */
33 #ifndef TCPKALI_MAVG_H
34 #define TCPKALI_MAVG_H
35
36 #include <string.h>
37 #include <math.h>
38 #include <assert.h>
39
40 typedef struct {
41 const double decay_factor;
42 double accumulator;
43 } exp_moving_average;
44
45 static void __attribute__((unused))
exp_moving_average_init(exp_moving_average * ema,double decay)46 exp_moving_average_init(exp_moving_average *ema, double decay) {
47 assert(decay > 0 && decay <= 1);
48 *(double *)&ema->decay_factor = decay;
49 ema->accumulator = 0 / 0.0;
50 }
51
52 static void __attribute__((unused))
exp_moving_average_add(exp_moving_average * ema,double x)53 exp_moving_average_add(exp_moving_average *ema, double x) {
54 if(isfinite(ema->accumulator) == 0)
55 ema->accumulator = x;
56 else
57 ema->accumulator =
58 ema->decay_factor * x + (1 - ema->decay_factor) * ema->accumulator;
59 }
60
61 typedef struct {
62 const double aggregate_window; /* Time window to aggregate data in. */
63 double events; /* Aggregated events for rate calculation. */
64 double right_edge_ts; /* Time to aggregate a new rate point. */
65 double update_ts; /* Last time we got an update. */
66 exp_moving_average storage; /* Aggregated measurement results. */
67 } mavg;
68
69 /*
70 * Initialize the empty moving average structure. Keep the smoothing window
71 * within the structure.
72 * Typical size of the aggregate_window is (1/8.0) seconds.
73 * Typical size of the smoothing window is 3.0 seconds.
74 */
75 static void __attribute__((unused))
mavg_init(mavg * m,double start_time,double aggregate_window,double smoothing_window)76 mavg_init(mavg *m, double start_time, double aggregate_window,
77 double smoothing_window) {
78 assert(aggregate_window <= smoothing_window);
79 exp_moving_average_init(&m->storage,
80 aggregate_window / smoothing_window);
81 m->events = 0;
82 *(double *)&m->aggregate_window = aggregate_window;
83 m->right_edge_ts = start_time + aggregate_window;
84 m->update_ts = start_time;
85 }
86
87 static double __attribute__((unused))
mavg_smoothing_window_s(const mavg * m)88 mavg_smoothing_window_s(const mavg *m) {
89 return m->aggregate_window / m->storage.decay_factor;
90 }
91
92 /*
93 * Bump the moving average with the specified number of events.
94 */
95 static void __attribute__((unused))
mavg_add(mavg * m,double now,double new_events)96 mavg_add(mavg *m, double now, double new_events) {
97
98 if(new_events == 0.0 && m->events == 0.0)
99 return;
100
101 assert(!(new_events < 0));
102
103 if(m->right_edge_ts > now) {
104 /*
105 * We are still aggregating results and has not reached
106 * the edge of the update window
107 */
108 m->events += new_events;
109 m->update_ts = now;
110 return;
111 }
112
113 if(m->right_edge_ts + mavg_smoothing_window_s(m) < now) {
114 m->right_edge_ts = now + m->aggregate_window;
115 m->update_ts = now;
116 m->events = 0;
117 m->storage.accumulator = new_events / m->aggregate_window;
118 return;
119 }
120
121 /*
122 * Using a model where events arrive at the same rate,
123 * distribute them between timeframe to measure/store,
124 * and future events.
125 */
126 double old_events = new_events * (m->right_edge_ts - m->update_ts)
127 / (now - m->update_ts);
128 exp_moving_average_add(&m->storage,
129 (m->events + old_events) / m->aggregate_window);
130 m->update_ts = m->right_edge_ts;
131 m->right_edge_ts += m->aggregate_window;
132 m->events = 0;
133
134 /* If last event was long ago - we want to make a set of updates */
135 mavg_add(m, now, new_events - old_events);
136 }
137
mavg_per_second(const mavg * m,double now)138 static double __attribute__((unused)) mavg_per_second(const mavg *m, double now) {
139 double elapsed = now - m->update_ts;
140 if(elapsed > mavg_smoothing_window_s(m)) {
141 /*
142 * If we stopped for too long, we report zero.
143 * Otherwise we'll never reach zero (traffic, hits/s),
144 * just asymptotically approach it.
145 * This is confusing for humans. So we report zero.
146 */
147 return 0.0;
148 } else if(isfinite(m->storage.accumulator)) {
149 return m->storage.accumulator
150 * pow(1 - m->storage.decay_factor,
151 (now - m->update_ts) / m->aggregate_window);
152 } else {
153 return 0.0;
154 }
155 }
156
157
158 #endif /* TCPKALI_MAVG_H */
159