1 #define _POSIX_C_SOURCE 200112L
2 #define _XOPEN_SOURCE 600
3
4 #include <stdio.h>
5 #include <stdlib.h>
6 #include <float.h>
7 #include <assert.h>
8 #include <math.h>
9 #include <sys/time.h>
10
11 #include "libflow.h"
12 #include "utils.h"
13
move_to_inc_at_start(struct flow * fw)14 static inline void move_to_inc_at_start(struct flow *fw)
15 {
16 fw->step = 1;
17 fw->state = FW_INC;
18 }
19
/*
 * Initialize @fw for a new run.
 *
 * @total_size: total number of bytes expected to be processed.
 * @max_process_rate: rate limit; it is multiplied by 1024 before being
 *	stored, and a value <= 0 is stored as DBL_MAX (effectively no limit).
 * @progress: non-zero to have progress printed during measurements.
 * @func_flush_chunk: optional callback invoked through flush_chunk();
 *	may be NULL.
 *
 * The flow starts in the FW_INC state at one 512-byte block per
 * 1000 ms delay.
 */
void init_flow(struct flow *fw, uint64_t total_size,
	long max_process_rate, int progress,
	flow_func_flush_chunk_t func_flush_chunk)
{
	/* Settings supplied by the caller. */
	fw->total_size = total_size;
	fw->progress = progress;
	fw->func_flush_chunk = func_flush_chunk;
	if (max_process_rate > 0)
		fw->max_process_rate = max_process_rate * 1024.;
	else
		fw->max_process_rate = DBL_MAX;

	/* Initial pace: 512B/s. */
	fw->block_size = 512;		/* Bytes */
	fw->blocks_per_delay = 1;
	fw->delay_ms = 1000;		/* 1s */
	assert(fw->block_size > 0);
	assert(fw->block_size % SECTOR_SIZE == 0);

	/* Counters and accumulators all start from zero. */
	fw->total_processed = 0;
	fw->processed_blocks = 0;
	fw->measured_blocks = 0;
	fw->measured_time_ms = 0;
	fw->acc_delay_us = 0;
	fw->erase = 0;

	move_to_inc_at_start(fw);
}
43
/* Print @ch to stdout @count times; nothing is printed when @count <= 0. */
static inline void repeat_ch(char ch, int count)
{
	int i;

	for (i = 0; i < count; i++)
		printf("%c", ch);
}
51
/*
 * Wipe the last @count characters printed on the current line:
 * back up over them, overwrite them with spaces, and back up again so the
 * next output starts where the erased text began.
 *
 * A @count <= 0 is a no-op.
 */
static void erase(int count)
{
	if (count > 0) {
		repeat_ch('\b', count);
		repeat_ch(' ', count);
		repeat_ch('\b', count);
	}
}
60
pr_time(double sec)61 static int pr_time(double sec)
62 {
63 int has_h, has_m;
64 int c, tot;
65
66 tot = printf(" -- ");
67 assert(tot > 0);
68
69 has_h = sec >= 3600;
70 if (has_h) {
71 double h = floor(sec / 3600);
72 c = printf("%i:", (int)h);
73 assert(c > 0);
74 tot += c;
75 sec -= h * 3600;
76 }
77
78 has_m = has_h || sec >= 60;
79 if (has_m) {
80 double m = floor(sec / 60);
81 if (has_h)
82 c = printf("%02i:", (int)m);
83 else
84 c = printf("%i:", (int)m);
85 assert(c > 0);
86 tot += c;
87 sec -= m * 60;
88 }
89
90 if (has_m)
91 c = printf("%02i", (int)round(sec));
92 else
93 c = printf("%is", (int)round(sec));
94 assert(c > 0);
95 return tot + c;
96 }
97
report_progress(struct flow * fw,double inst_speed)98 static void report_progress(struct flow *fw, double inst_speed)
99 {
100 const char *unit = adjust_unit(&inst_speed);
101 double percent;
102 /* The following shouldn't be necessary, but sometimes
103 * the initial free space isn't exactly reported
104 * by the kernel; this issue has been seen on Macs.
105 */
106 if (fw->total_size < fw->total_processed)
107 fw->total_size = fw->total_processed;
108 percent = (double)fw->total_processed * 100 / fw->total_size;
109 erase(fw->erase);
110 fw->erase = printf("%.2f%% -- %.2f %s/s",
111 percent, inst_speed, unit);
112 assert(fw->erase > 0);
113 if (has_enough_measurements(fw))
114 fw->erase += pr_time(
115 (fw->total_size - fw->total_processed) /
116 get_avg_speed(fw));
117 fflush(stdout);
118 }
119
__start_measurement(struct flow * fw)120 static inline void __start_measurement(struct flow *fw)
121 {
122 assert(!gettimeofday(&fw->t1, NULL));
123 }
124
start_measurement(struct flow * fw)125 void start_measurement(struct flow *fw)
126 {
127 /*
128 * The report below is especially useful when a single measurement spans
129 * multiple files; this happens when a drive is faster than 1GB/s.
130 */
131 if (fw->progress)
132 report_progress(fw, fw->blocks_per_delay * fw->block_size *
133 1000.0 / fw->delay_ms);
134 __start_measurement(fw);
135 }
136
move_to_steady(struct flow * fw)137 static inline void move_to_steady(struct flow *fw)
138 {
139 fw->state = FW_STEADY;
140 }
141
move_to_search(struct flow * fw,int64_t bpd1,int64_t bpd2)142 static void move_to_search(struct flow *fw, int64_t bpd1, int64_t bpd2)
143 {
144 assert(bpd1 > 0);
145 assert(bpd2 >= bpd1);
146
147 fw->blocks_per_delay = (bpd1 + bpd2) / 2;
148 if (bpd2 - bpd1 <= 3) {
149 move_to_steady(fw);
150 return;
151 }
152
153 fw->bpd1 = bpd1;
154 fw->bpd2 = bpd2;
155 fw->state = FW_SEARCH;
156 }
157
dec_step(struct flow * fw)158 static inline void dec_step(struct flow *fw)
159 {
160 if (fw->blocks_per_delay - fw->step > 0) {
161 fw->blocks_per_delay -= fw->step;
162 fw->step *= 2;
163 } else
164 move_to_search(fw, 1, fw->blocks_per_delay + fw->step / 2);
165 }
166
inc_step(struct flow * fw)167 static inline void inc_step(struct flow *fw)
168 {
169 fw->blocks_per_delay += fw->step;
170 fw->step *= 2;
171 }
172
/*
 * Enter the FW_INC state with the step reset to 1 and immediately take
 * the first increasing step.
 */
static inline void move_to_inc(struct flow *fw)
{
	move_to_inc_at_start(fw);
	inc_step(fw);
}
178
move_to_dec(struct flow * fw)179 static inline void move_to_dec(struct flow *fw)
180 {
181 fw->step = 1;
182 fw->state = FW_DEC;
183 dec_step(fw);
184 }
185
is_rate_above(const struct flow * fw,long delay,double inst_speed)186 static inline int is_rate_above(const struct flow *fw,
187 long delay, double inst_speed)
188 {
189 /* We use logical or here to enforce the lowest limit. */
190 return delay > fw->delay_ms || inst_speed > fw->max_process_rate;
191 }
192
is_rate_below(const struct flow * fw,long delay,double inst_speed)193 static inline int is_rate_below(const struct flow *fw,
194 long delay, double inst_speed)
195 {
196 /* We use logical and here to enforce both limist. */
197 return delay <= fw->delay_ms && inst_speed < fw->max_process_rate;
198 }
199
flush_chunk(const struct flow * fw,int fd)200 static inline int flush_chunk(const struct flow *fw, int fd)
201 {
202 if (fw->func_flush_chunk)
203 return fw->func_flush_chunk(fw, fd);
204 return 0;
205 }
206
/* XXX Avoid duplicating this function, which was copied from libutils.h. */
/*
 * Return the time elapsed from @t1 to @t2 in microseconds.
 *
 * The arithmetic is carried out in unsigned 64-bit, so the result is only
 * meaningful when @t2 is not earlier than @t1.
 */
static inline uint64_t diff_timeval_us(const struct timeval *t1,
	const struct timeval *t2)
{
	const uint64_t whole_secs_us =
		(t2->tv_sec - t1->tv_sec) * 1000000ULL;

	return whole_secs_us + t2->tv_usec - t1->tv_usec;
}
214
measure(int fd,struct flow * fw,long processed)215 int measure(int fd, struct flow *fw, long processed)
216 {
217 ldiv_t result = ldiv(processed, fw->block_size);
218 struct timeval t2;
219 int64_t delay;
220 double bytes_k, inst_speed;
221
222 assert(result.rem == 0);
223 fw->processed_blocks += result.quot;
224 fw->total_processed += processed;
225
226 if (fw->processed_blocks < fw->blocks_per_delay)
227 return 0;
228 assert(fw->processed_blocks == fw->blocks_per_delay);
229
230 if (flush_chunk(fw, fd) < 0)
231 return -1; /* Caller can read errno(3). */
232
233 assert(!gettimeofday(&t2, NULL));
234 delay = (diff_timeval_us(&fw->t1, &t2) + fw->acc_delay_us) / 1000;
235
236 /* Instantaneous speed in bytes per second. */
237 bytes_k = fw->blocks_per_delay * fw->block_size * 1000.0;
238 inst_speed = bytes_k / delay;
239
240 if (delay < fw->delay_ms && inst_speed > fw->max_process_rate) {
241 /* Wait until inst_speed == fw->max_process_rate
242 * (if possible).
243 */
244 double wait_ms = round((bytes_k - delay * fw->max_process_rate)
245 / fw->max_process_rate);
246
247 if (wait_ms < 0) {
248 /* Wait what is possible. */
249 wait_ms = fw->delay_ms - delay;
250 } else if (delay + wait_ms < fw->delay_ms) {
251 /* wait_ms is not the largest possible value, so
252 * force the flow algorithm to keep increasing it.
253 * Otherwise, the delay to print progress may be
254 * too small.
255 */
256 wait_ms++;
257 }
258
259 if (wait_ms > 0) {
260 /* Slow down. */
261 msleep(wait_ms);
262
263 /* Adjust measurements. */
264 delay += wait_ms;
265 inst_speed = bytes_k / delay;
266 }
267 }
268
269 /* Update mean. */
270 fw->measured_blocks += fw->processed_blocks;
271 fw->measured_time_ms += delay;
272
273 switch (fw->state) {
274 case FW_INC:
275 if (is_rate_above(fw, delay, inst_speed)) {
276 move_to_search(fw,
277 fw->blocks_per_delay - fw->step / 2,
278 fw->blocks_per_delay);
279 } else if (is_rate_below(fw, delay, inst_speed)) {
280 inc_step(fw);
281 } else
282 move_to_steady(fw);
283 break;
284
285 case FW_DEC:
286 if (is_rate_above(fw, delay, inst_speed)) {
287 dec_step(fw);
288 } else if (is_rate_below(fw, delay, inst_speed)) {
289 move_to_search(fw, fw->blocks_per_delay,
290 fw->blocks_per_delay + fw->step / 2);
291 } else
292 move_to_steady(fw);
293 break;
294
295 case FW_SEARCH:
296 if (fw->bpd2 - fw->bpd1 <= 3) {
297 move_to_steady(fw);
298 break;
299 }
300
301 if (is_rate_above(fw, delay, inst_speed)) {
302 fw->bpd2 = fw->blocks_per_delay;
303 fw->blocks_per_delay = (fw->bpd1 + fw->bpd2) / 2;
304 } else if (is_rate_below(fw, delay, inst_speed)) {
305 fw->bpd1 = fw->blocks_per_delay;
306 fw->blocks_per_delay = (fw->bpd1 + fw->bpd2) / 2;
307 } else
308 move_to_steady(fw);
309 break;
310
311 case FW_STEADY: {
312 if (delay <= fw->delay_ms) {
313 if (inst_speed < fw->max_process_rate) {
314 move_to_inc(fw);
315 } else if (inst_speed > fw->max_process_rate) {
316 move_to_dec(fw);
317 }
318 } else if (fw->blocks_per_delay > 1) {
319 move_to_dec(fw);
320 }
321 break;
322 }
323
324 default:
325 assert(0);
326 }
327
328 if (fw->progress)
329 report_progress(fw, inst_speed);
330
331 /* Reset accumulators. */
332 fw->processed_blocks = 0;
333 fw->acc_delay_us = 0;
334 __start_measurement(fw);
335 return 0;
336 }
337
end_measurement(int fd,struct flow * fw)338 int end_measurement(int fd, struct flow *fw)
339 {
340 struct timeval t2;
341 int saved_errno;
342 int ret = 0;
343
344 if (fw->processed_blocks <= 0)
345 goto out;
346
347 if (flush_chunk(fw, fd) < 0) {
348 saved_errno = errno;
349 ret = -1;
350 goto out;
351 }
352
353 /* Save time in between closing ongoing file and creating a new file. */
354 assert(!gettimeofday(&t2, NULL));
355 fw->acc_delay_us += diff_timeval_us(&fw->t1, &t2);
356
357 out:
358 /* Erase progress information. */
359 erase(fw->erase);
360 fw->erase = 0;
361 fflush(stdout);
362
363 if (ret < 0) {
364 /* Propagate errno(3) to caller. */
365 errno = saved_errno;
366 }
367 return ret;
368 }
369