1 /*
2  * Copyright (c) 2014, 2017  Machine Zone, Inc.
3  *
4  * Original author: Lev Walkin <lwalkin@machinezone.com>
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  * 1. Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14 
15  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
16  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
19  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25  * SUCH DAMAGE.
26  */
27 /*
28  * Moving average allows to capture number of events per period of time
29  * (hits/second, kbps)
30  * averaged over the specified time period.
31  * http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
32  */
33 #ifndef TCPKALI_MAVG_H
34 #define TCPKALI_MAVG_H
35 
36 #include <string.h>
37 #include <math.h>
38 #include <assert.h>
39 
40 typedef struct {
41     const double decay_factor;
42     double accumulator;
43 } exp_moving_average;
44 
45 static void __attribute__((unused))
exp_moving_average_init(exp_moving_average * ema,double decay)46 exp_moving_average_init(exp_moving_average *ema, double decay) {
47     assert(decay > 0 && decay <= 1);
48     *(double *)&ema->decay_factor = decay;
49     ema->accumulator = 0 / 0.0;
50 }
51 
52 static void __attribute__((unused))
exp_moving_average_add(exp_moving_average * ema,double x)53 exp_moving_average_add(exp_moving_average *ema, double x) {
54     if(isfinite(ema->accumulator) == 0)
55         ema->accumulator = x;
56     else
57         ema->accumulator =
58             ema->decay_factor * x + (1 - ema->decay_factor) * ema->accumulator;
59 }
60 
61 typedef struct {
62     const double aggregate_window; /* Time window to aggregate data in. */
63     double events;              /* Aggregated events for rate calculation. */
64     double right_edge_ts;       /* Time to aggregate a new rate point. */
65     double update_ts;           /* Last time we got an update. */
66     exp_moving_average storage; /* Aggregated measurement results. */
67 } mavg;
68 
69 /*
70  * Initialize the empty moving average structure. Keep the smoothing window
71  * within the structure.
72  * Typical size of the aggregate_window is (1/8.0) seconds.
73  * Typical size of the smoothing window is 3.0 seconds.
74  */
75 static void __attribute__((unused))
mavg_init(mavg * m,double start_time,double aggregate_window,double smoothing_window)76 mavg_init(mavg *m, double start_time, double aggregate_window,
77                   double smoothing_window) {
78     assert(aggregate_window <= smoothing_window);
79     exp_moving_average_init(&m->storage,
80                             aggregate_window / smoothing_window);
81     m->events = 0;
82     *(double *)&m->aggregate_window = aggregate_window;
83     m->right_edge_ts = start_time + aggregate_window;
84     m->update_ts = start_time;
85 }
86 
87 static double __attribute__((unused))
mavg_smoothing_window_s(const mavg * m)88 mavg_smoothing_window_s(const mavg *m) {
89     return m->aggregate_window / m->storage.decay_factor;
90 }
91 
92 /*
93  * Bump the moving average with the specified number of events.
94  */
95 static void __attribute__((unused))
mavg_add(mavg * m,double now,double new_events)96 mavg_add(mavg *m, double now, double new_events) {
97 
98     if(new_events == 0.0 && m->events == 0.0)
99         return;
100 
101     assert(!(new_events < 0));
102 
103     if(m->right_edge_ts > now) {
104         /*
105          * We are still aggregating results and has not reached
106          * the edge of the update window
107          */
108         m->events += new_events;
109         m->update_ts = now;
110         return;
111     }
112 
113     if(m->right_edge_ts + mavg_smoothing_window_s(m) < now) {
114         m->right_edge_ts = now + m->aggregate_window;
115         m->update_ts = now;
116         m->events = 0;
117         m->storage.accumulator = new_events / m->aggregate_window;
118         return;
119     }
120 
121     /*
122      * Using a model where events arrive at the same rate,
123      * distribute them between timeframe to measure/store,
124      * and future events.
125      */
126     double old_events = new_events * (m->right_edge_ts - m->update_ts)
127                         / (now - m->update_ts);
128     exp_moving_average_add(&m->storage,
129                           (m->events + old_events) / m->aggregate_window);
130     m->update_ts = m->right_edge_ts;
131     m->right_edge_ts += m->aggregate_window;
132     m->events = 0;
133 
134     /* If last event was long ago - we want to make a set of updates */
135     mavg_add(m, now, new_events - old_events);
136 }
137 
mavg_per_second(const mavg * m,double now)138 static double __attribute__((unused)) mavg_per_second(const mavg *m, double now) {
139     double elapsed = now - m->update_ts;
140     if(elapsed > mavg_smoothing_window_s(m)) {
141         /*
142          * If we stopped for too long, we report zero.
143          * Otherwise we'll never reach zero (traffic, hits/s),
144          * just asymptotically approach it.
145          * This is confusing for humans. So we report zero.
146          */
147         return 0.0;
148     } else if(isfinite(m->storage.accumulator)) {
149         return m->storage.accumulator
150                * pow(1 - m->storage.decay_factor,
151                      (now - m->update_ts) / m->aggregate_window);
152     } else {
153         return 0.0;
154     }
155 }
156 
157 
158 #endif /* TCPKALI_MAVG_H */
159