1 /*
2  * Copyright (C) 2010 Codership Oy <info@codership.com>
3  *
4  * $Id$
5  */
6 
7 /*! @file This unit contains Flow Control parts deemed worthy to be
8  *        taken out of gcs.c */
9 
10 #include "gcs_fc.hpp"
11 
12 #include <galerautils.h>
13 #include <string.h>
14 
/*! Correction factor that allows for some overhead.
 *  NOTE(review): not referenced in this unit; presumably applied to the
 *  configured hard limit by the caller - confirm. */
const double gcs_fc_hard_limit_fix = 0.9;

/*! Minimum sleep period (s): gcs_fc_process() skips any shorter sleep. */
static const double min_sleep = 0.001;
18 
19 /*! Initializes operational constants before opening connection to group
20  * @return -EINVAL if wrong values are submitted */
21 int
gcs_fc_init(gcs_fc_t * fc,ssize_t hard_limit,double soft_limit,double max_throttle)22 gcs_fc_init (gcs_fc_t* fc,
23              ssize_t   hard_limit,   // slave queue hard limit
24              double    soft_limit,   // soft limit as a fraction of hard limit
25              double    max_throttle)
26 {
27     assert (fc);
28 
29     if (hard_limit < 0) {
30         gu_error ("Bad value for slave queue hard limit: %zd (should be > 0)",
31                   hard_limit);
32         return -EINVAL;
33     }
34 
35     if (soft_limit < 0.0 || soft_limit >= 1.0) {
36         gu_error ("Bad value for slave queue soft limit: %f "
37                   "(should belong to [0.0,1.0) )", soft_limit);
38         return -EINVAL;
39     }
40 
41     if (max_throttle < 0.0 || max_throttle >= 1.0) {
42         gu_error ("Bad value for max throttle: %f "
43                   "(should belong to [0.0,1.0) )", max_throttle);
44         return -EINVAL;
45     }
46 
47     memset (fc, 0, sizeof(*fc));
48 
49     fc->hard_limit = hard_limit;
50     fc->soft_limit = fc->hard_limit * soft_limit;
51     fc->max_throttle = max_throttle;
52 
53     return 0;
54 }
55 
56 /*! Reinitializes object at the beginning of state transfer */
57 void
gcs_fc_reset(gcs_fc_t * const fc,ssize_t const queue_size)58 gcs_fc_reset (gcs_fc_t* const fc, ssize_t const queue_size)
59 {
60     assert (fc != NULL);
61     assert (queue_size >= 0);
62 
63     fc->init_size  = queue_size;
64     fc->size       = fc->init_size;
65     fc->start      = gu_time_monotonic();
66     fc->last_sleep = 0;
67     fc->act_count  = 0;
68     fc->max_rate   = -1.0;
69     fc->scale      =  0.0;
70     fc->offset     =  0.0;
71     fc->sleep_count= 0;
72     fc->sleeps     = 0.0;
73 }
74 
75 /*
76  * The idea here is that there is no flow control up until slave queue size
77  * reaches soft limit.
78  * After that flow control gradually slows down replication rate by emitting FC
79  * events in order to buy more time for state transfer.
 * Replication rate goes linearly from normal rate at soft limit to max_throttle
 * fraction at hard limit, at which point -ENOMEM is returned as replication
 * becomes prohibitively slow (or GU_TIME_ETERNITY, i.e. a complete stop, if
 * max_throttle is 0.0).
83  *
84  * replication
85  *    speed
86  *      ^
87  *      |--------.           <- normal replication rate
88  *      |        .\
89  *      |        . \
90  *      |        .  \
91  *      |        .   \      speed = fc->size * fc->scale + fc->offset
92  *      |        .    \
93  *      |        .     \
94  *      |        .      \  |
95  *      |        .       \ |
96  *      |        .        \|
97  *      |        .         + <- throttle limit
98  *      |        .         |
99  *      |        .         |
100  *      +--------+---------+----> slave queue size
101  *             soft       hard
102  *            limit       limit
103  */
104 
105 /*! Processes a new action added to a slave queue.
106  *  @return length of sleep in nanoseconds or negative error code
107  *          or GU_TIME_ETERNITY for complete stop */
108 long long
gcs_fc_process(gcs_fc_t * fc,ssize_t act_size)109 gcs_fc_process (gcs_fc_t* fc, ssize_t act_size)
110 {
111     fc->size += act_size;
112     fc->act_count++;
113 
114     if (fc->size <= fc->soft_limit) {
115         /* normal operation */
116         if (gu_unlikely(fc->debug > 0 && !(fc->act_count % fc->debug))) {
117             gu_info ("FC: queue size: %zdb (%4.1f%% of soft limit)",
118                      fc->size, ((double)fc->size)/fc->soft_limit*100.0);
119         }
120         return 0;
121     }
122     else if (fc->size >= fc->hard_limit) {
123         if (0.0 == fc->max_throttle) {
124             /* we can accept total service outage */
125             return GU_TIME_ETERNITY;
126         }
127         else {
128             gu_error ("Recv queue hard limit exceeded. Can't continue.");
129             return -ENOMEM;
130         }
131     }
132 //    else if (!(fc->act_count & 7)) { // do this for every 8th action
133     else {
134         long long end   = gu_time_monotonic();
135         double interval = ((end - fc->start) * 1.0e-9);
136 
137         if (gu_unlikely (0 == fc->last_sleep)) {
138             /* just tripped the soft limit, preparing constants for throttle */
139 
140             fc->max_rate = (double)(fc->size - fc->init_size) / interval;
141 
142             double s = (1.0 - fc->max_throttle)/(fc->soft_limit-fc->hard_limit);
143             assert (s < 0.0);
144 
145             fc->scale  = s * fc->max_rate;
146             fc->offset = (1.0 - s*fc->soft_limit) * fc->max_rate;
147 
148             // calculate time interval from the soft limit
149             interval = interval * (double)(fc->size - fc->soft_limit) /
150                 (fc->size - fc->init_size);
151             assert (interval >= 0.0);
152 
153             // Move reference point to soft limit
154             fc->last_sleep = fc->soft_limit;
155             fc->start      = end - interval * 1000000000;
156 
157             gu_warn("Soft recv queue limit exceeded, starting replication "
158                     "throttle. Measured avg. rate: %f bytes/sec; "
159                     "Throttle parameters: scale=%f, offset=%f",
160                     fc->max_rate, fc->scale, fc->offset);
161         }
162 
163         /* throttling operation */
164         double desired_rate = fc->size * fc->scale + fc->offset; // linear decay
165         //double desired_rate = fc->max_rate * fc->max_throttle; // square wave
166         assert (desired_rate <= fc->max_rate);
167 
168         double sleep = (double)(fc->size - fc->last_sleep) / desired_rate
169             - interval;
170 
171         if (gu_unlikely(fc->debug > 0 && !(fc->act_count % fc->debug))) {
172             gu_info ("FC: queue size: %zdb, length: %zd, "
173                      "measured rate: %fb/s, desired rate: %fb/s, "
174                      "interval: %5.3fs, sleep: %5.4fs. "
175                      "Sleeps initiated: %zd, for a total of %6.3fs",
176                      fc->size, fc->act_count,
177                      ((double)(fc->size - fc->last_sleep))/interval,
178                      desired_rate, interval, sleep, fc->sleep_count,
179                      fc->sleeps);
180             fc->sleep_count = 0;
181             fc->sleeps = 0.0;
182         }
183 
184         if (gu_likely(sleep < min_sleep)) {
185 #if 0
186             gu_info ("Skipping sleep: desired_rate = %f, sleep = %f (%f), "
187                      "interval = %f, fc->scale = %f, fc->offset = %f, "
188                      "fc->size = %zd",
189                      desired_rate, sleep, min_sleep, interval,
190                      fc->scale, fc->offset, fc->size);
191 #endif
192             return 0;
193         }
194 
195         fc->last_sleep = fc->size;
196         fc->start      = end;
197         fc->sleep_count++;
198         fc->sleeps += sleep;
199 
200         return (1000000000LL * sleep);
201     }
202 
203     return 0;
204 }
205 
/*! Sets the debug level of the flow control object.
 *
 *  When the level is positive, gcs_fc_process() logs its state on every
 *  debug_level-th action; otherwise it logs no debug information.
 *
 *  @param fc          flow control object
 *  @param debug_level how often to log, in actions
 */
void
gcs_fc_debug (gcs_fc_t* fc, long debug_level)
{
    fc->debug = debug_level;
}
207