1 /*
2 * Copyright (C) 2010 Codership Oy <info@codership.com>
3 *
4 * $Id$
5 */
6
7 /*! @file This unit contains Flow Control parts deemed worthy to be
8 * taken out of gcs.c */
9
10 #include "gcs_fc.hpp"
11
12 #include <galerautils.h>
13 #include <string.h>
14
/*! Correction factor for the hard limit: allows for some overhead.
 *  Exported; not referenced by the functions in this unit. */
const double gcs_fc_hard_limit_fix = 0.9;

/*! Minimum sleep period in seconds: gcs_fc_process() requests no sleep
 *  for anything shorter than this. */
static const double min_sleep = 0.001;
18
19 /*! Initializes operational constants before opening connection to group
20 * @return -EINVAL if wrong values are submitted */
21 int
gcs_fc_init(gcs_fc_t * fc,ssize_t hard_limit,double soft_limit,double max_throttle)22 gcs_fc_init (gcs_fc_t* fc,
23 ssize_t hard_limit, // slave queue hard limit
24 double soft_limit, // soft limit as a fraction of hard limit
25 double max_throttle)
26 {
27 assert (fc);
28
29 if (hard_limit < 0) {
30 gu_error ("Bad value for slave queue hard limit: %zd (should be > 0)",
31 hard_limit);
32 return -EINVAL;
33 }
34
35 if (soft_limit < 0.0 || soft_limit >= 1.0) {
36 gu_error ("Bad value for slave queue soft limit: %f "
37 "(should belong to [0.0,1.0) )", soft_limit);
38 return -EINVAL;
39 }
40
41 if (max_throttle < 0.0 || max_throttle >= 1.0) {
42 gu_error ("Bad value for max throttle: %f "
43 "(should belong to [0.0,1.0) )", max_throttle);
44 return -EINVAL;
45 }
46
47 memset (fc, 0, sizeof(*fc));
48
49 fc->hard_limit = hard_limit;
50 fc->soft_limit = fc->hard_limit * soft_limit;
51 fc->max_throttle = max_throttle;
52
53 return 0;
54 }
55
56 /*! Reinitializes object at the beginning of state transfer */
57 void
gcs_fc_reset(gcs_fc_t * const fc,ssize_t const queue_size)58 gcs_fc_reset (gcs_fc_t* const fc, ssize_t const queue_size)
59 {
60 assert (fc != NULL);
61 assert (queue_size >= 0);
62
63 fc->init_size = queue_size;
64 fc->size = fc->init_size;
65 fc->start = gu_time_monotonic();
66 fc->last_sleep = 0;
67 fc->act_count = 0;
68 fc->max_rate = -1.0;
69 fc->scale = 0.0;
70 fc->offset = 0.0;
71 fc->sleep_count= 0;
72 fc->sleeps = 0.0;
73 }
74
/*
 * The idea here is that there is no flow control up until slave queue size
 * reaches the soft limit.
 * After that flow control gradually slows down replication rate by emitting FC
 * events in order to buy more time for state transfer.
 * Replication rate goes linearly from normal rate at the soft limit to
 * max_throttle fraction at the hard limit, at which point -ENOMEM is returned
 * as replication becomes prohibitively slow (or GU_TIME_ETERNITY is returned
 * if max_throttle is 0.0, i.e. a complete stop is acceptable).
 *
 * replication
 *    speed
 *      ^
 *      |--------.           <- normal replication rate
 *      |        .\
 *      |        . \
 *      |        .  \
 *      |        .   \     speed = fc->size * fc->scale + fc->offset
 *      |        .    \
 *      |        .     \
 *      |        .      \  |
 *      |        .       \ |
 *      |        .        \|
 *      |        .         + <- throttle limit
 *      |        .         |
 *      |        .         |
 *      +--------+---------+----> slave queue size
 *             soft      hard
 *            limit     limit
 */
104
105 /*! Processes a new action added to a slave queue.
106 * @return length of sleep in nanoseconds or negative error code
107 * or GU_TIME_ETERNITY for complete stop */
108 long long
gcs_fc_process(gcs_fc_t * fc,ssize_t act_size)109 gcs_fc_process (gcs_fc_t* fc, ssize_t act_size)
110 {
111 fc->size += act_size;
112 fc->act_count++;
113
114 if (fc->size <= fc->soft_limit) {
115 /* normal operation */
116 if (gu_unlikely(fc->debug > 0 && !(fc->act_count % fc->debug))) {
117 gu_info ("FC: queue size: %zdb (%4.1f%% of soft limit)",
118 fc->size, ((double)fc->size)/fc->soft_limit*100.0);
119 }
120 return 0;
121 }
122 else if (fc->size >= fc->hard_limit) {
123 if (0.0 == fc->max_throttle) {
124 /* we can accept total service outage */
125 return GU_TIME_ETERNITY;
126 }
127 else {
128 gu_error ("Recv queue hard limit exceeded. Can't continue.");
129 return -ENOMEM;
130 }
131 }
132 // else if (!(fc->act_count & 7)) { // do this for every 8th action
133 else {
134 long long end = gu_time_monotonic();
135 double interval = ((end - fc->start) * 1.0e-9);
136
137 if (gu_unlikely (0 == fc->last_sleep)) {
138 /* just tripped the soft limit, preparing constants for throttle */
139
140 fc->max_rate = (double)(fc->size - fc->init_size) / interval;
141
142 double s = (1.0 - fc->max_throttle)/(fc->soft_limit-fc->hard_limit);
143 assert (s < 0.0);
144
145 fc->scale = s * fc->max_rate;
146 fc->offset = (1.0 - s*fc->soft_limit) * fc->max_rate;
147
148 // calculate time interval from the soft limit
149 interval = interval * (double)(fc->size - fc->soft_limit) /
150 (fc->size - fc->init_size);
151 assert (interval >= 0.0);
152
153 // Move reference point to soft limit
154 fc->last_sleep = fc->soft_limit;
155 fc->start = end - interval * 1000000000;
156
157 gu_warn("Soft recv queue limit exceeded, starting replication "
158 "throttle. Measured avg. rate: %f bytes/sec; "
159 "Throttle parameters: scale=%f, offset=%f",
160 fc->max_rate, fc->scale, fc->offset);
161 }
162
163 /* throttling operation */
164 double desired_rate = fc->size * fc->scale + fc->offset; // linear decay
165 //double desired_rate = fc->max_rate * fc->max_throttle; // square wave
166 assert (desired_rate <= fc->max_rate);
167
168 double sleep = (double)(fc->size - fc->last_sleep) / desired_rate
169 - interval;
170
171 if (gu_unlikely(fc->debug > 0 && !(fc->act_count % fc->debug))) {
172 gu_info ("FC: queue size: %zdb, length: %zd, "
173 "measured rate: %fb/s, desired rate: %fb/s, "
174 "interval: %5.3fs, sleep: %5.4fs. "
175 "Sleeps initiated: %zd, for a total of %6.3fs",
176 fc->size, fc->act_count,
177 ((double)(fc->size - fc->last_sleep))/interval,
178 desired_rate, interval, sleep, fc->sleep_count,
179 fc->sleeps);
180 fc->sleep_count = 0;
181 fc->sleeps = 0.0;
182 }
183
184 if (gu_likely(sleep < min_sleep)) {
185 #if 0
186 gu_info ("Skipping sleep: desired_rate = %f, sleep = %f (%f), "
187 "interval = %f, fc->scale = %f, fc->offset = %f, "
188 "fc->size = %zd",
189 desired_rate, sleep, min_sleep, interval,
190 fc->scale, fc->offset, fc->size);
191 #endif
192 return 0;
193 }
194
195 fc->last_sleep = fc->size;
196 fc->start = end;
197 fc->sleep_count++;
198 fc->sleeps += sleep;
199
200 return (1000000000LL * sleep);
201 }
202
203 return 0;
204 }
205
/*! Sets the debug reporting period.
 *
 * gcs_fc_process() prints a debug message on every debug_level-th action;
 * a value of 0 or less turns the messages off.
 *
 * @param fc          flow control object
 * @param debug_level value stored in fc->debug */
void
gcs_fc_debug (gcs_fc_t* fc, long debug_level)
{
    fc->debug = debug_level;
}
207