1 #define _POSIX_C_SOURCE 200112L
2 #define _XOPEN_SOURCE 600
3 
4 #include <stdio.h>
5 #include <stdlib.h>
6 #include <float.h>
7 #include <assert.h>
8 #include <math.h>
9 #include <sys/time.h>
10 
11 #include "libflow.h"
12 #include "utils.h"
13 
move_to_inc_at_start(struct flow * fw)14 static inline void move_to_inc_at_start(struct flow *fw)
15 {
16 	fw->step = 1;
17 	fw->state = FW_INC;
18 }
19 
init_flow(struct flow * fw,uint64_t total_size,long max_process_rate,int progress,flow_func_flush_chunk_t func_flush_chunk)20 void init_flow(struct flow *fw, uint64_t total_size,
21 	long max_process_rate, int progress,
22 	flow_func_flush_chunk_t func_flush_chunk)
23 {
24 	fw->total_size		= total_size;
25 	fw->total_processed	= 0;
26 	fw->progress		= progress;
27 	fw->block_size		= 512;	/* Bytes	*/
28 	fw->blocks_per_delay	= 1;	/* 512B/s	*/
29 	fw->delay_ms		= 1000;	/* 1s		*/
30 	fw->max_process_rate	= max_process_rate <= 0
31 		? DBL_MAX : max_process_rate * 1024.;
32 	fw->measured_blocks	= 0;
33 	fw->measured_time_ms	= 0;
34 	fw->erase		= 0;
35 	fw->func_flush_chunk	= func_flush_chunk;
36 	fw->processed_blocks	= 0;
37 	fw->acc_delay_us	= 0;
38 	assert(fw->block_size > 0);
39 	assert(fw->block_size % SECTOR_SIZE == 0);
40 
41 	move_to_inc_at_start(fw);
42 }
43 
repeat_ch(char ch,int count)44 static inline void repeat_ch(char ch, int count)
45 {
46 	while (count > 0) {
47 		printf("%c", ch);
48 		count--;
49 	}
50 }
51 
erase(int count)52 static void erase(int count)
53 {
54 	if (count <= 0)
55 		return;
56 	repeat_ch('\b',	count);
57 	repeat_ch(' ',	count);
58 	repeat_ch('\b',	count);
59 }
60 
pr_time(double sec)61 static int pr_time(double sec)
62 {
63 	int has_h, has_m;
64 	int c, tot;
65 
66 	tot = printf(" -- ");
67 	assert(tot > 0);
68 
69 	has_h = sec >= 3600;
70 	if (has_h) {
71 		double h = floor(sec / 3600);
72 		c = printf("%i:", (int)h);
73 		assert(c > 0);
74 		tot += c;
75 		sec -= h * 3600;
76 	}
77 
78 	has_m = has_h || sec >= 60;
79 	if (has_m) {
80 		double m = floor(sec / 60);
81 		if (has_h)
82 			c = printf("%02i:", (int)m);
83 		else
84 			c = printf("%i:", (int)m);
85 		assert(c > 0);
86 		tot += c;
87 		sec -= m * 60;
88 	}
89 
90 	if (has_m)
91 		c = printf("%02i", (int)round(sec));
92 	else
93 		c = printf("%is", (int)round(sec));
94 	assert(c > 0);
95 	return tot + c;
96 }
97 
report_progress(struct flow * fw,double inst_speed)98 static void report_progress(struct flow *fw, double inst_speed)
99 {
100 	const char *unit = adjust_unit(&inst_speed);
101 	double percent;
102 	/* The following shouldn't be necessary, but sometimes
103 	 * the initial free space isn't exactly reported
104 	 * by the kernel; this issue has been seen on Macs.
105 	 */
106 	if (fw->total_size < fw->total_processed)
107 		fw->total_size = fw->total_processed;
108 	percent = (double)fw->total_processed * 100 / fw->total_size;
109 	erase(fw->erase);
110 	fw->erase = printf("%.2f%% -- %.2f %s/s",
111 		percent, inst_speed, unit);
112 	assert(fw->erase > 0);
113 	if (has_enough_measurements(fw))
114 		fw->erase += pr_time(
115 			(fw->total_size - fw->total_processed) /
116 			get_avg_speed(fw));
117 	fflush(stdout);
118 }
119 
__start_measurement(struct flow * fw)120 static inline void __start_measurement(struct flow *fw)
121 {
122 	assert(!gettimeofday(&fw->t1, NULL));
123 }
124 
start_measurement(struct flow * fw)125 void start_measurement(struct flow *fw)
126 {
127 	/*
128 	 * The report below is especially useful when a single measurement spans
129 	 * multiple files; this happens when a drive is faster than 1GB/s.
130 	 */
131 	if (fw->progress)
132 		report_progress(fw, fw->blocks_per_delay * fw->block_size *
133 			1000.0 / fw->delay_ms);
134 	__start_measurement(fw);
135 }
136 
move_to_steady(struct flow * fw)137 static inline void move_to_steady(struct flow *fw)
138 {
139 	fw->state = FW_STEADY;
140 }
141 
move_to_search(struct flow * fw,int64_t bpd1,int64_t bpd2)142 static void move_to_search(struct flow *fw, int64_t bpd1, int64_t bpd2)
143 {
144 	assert(bpd1 > 0);
145 	assert(bpd2 >= bpd1);
146 
147 	fw->blocks_per_delay = (bpd1 + bpd2) / 2;
148 	if (bpd2 - bpd1 <= 3) {
149 		move_to_steady(fw);
150 		return;
151 	}
152 
153 	fw->bpd1 = bpd1;
154 	fw->bpd2 = bpd2;
155 	fw->state = FW_SEARCH;
156 }
157 
dec_step(struct flow * fw)158 static inline void dec_step(struct flow *fw)
159 {
160 	if (fw->blocks_per_delay - fw->step > 0) {
161 		fw->blocks_per_delay -= fw->step;
162 		fw->step *= 2;
163 	} else
164 		move_to_search(fw, 1, fw->blocks_per_delay + fw->step / 2);
165 }
166 
inc_step(struct flow * fw)167 static inline void inc_step(struct flow *fw)
168 {
169 	fw->blocks_per_delay += fw->step;
170 	fw->step *= 2;
171 }
172 
move_to_inc(struct flow * fw)173 static inline void move_to_inc(struct flow *fw)
174 {
175 	move_to_inc_at_start(fw);
176 	inc_step(fw);
177 }
178 
move_to_dec(struct flow * fw)179 static inline void move_to_dec(struct flow *fw)
180 {
181 	fw->step = 1;
182 	fw->state = FW_DEC;
183 	dec_step(fw);
184 }
185 
is_rate_above(const struct flow * fw,long delay,double inst_speed)186 static inline int is_rate_above(const struct flow *fw,
187 	long delay, double inst_speed)
188 {
189 	/* We use logical or here to enforce the lowest limit. */
190 	return delay > fw->delay_ms || inst_speed > fw->max_process_rate;
191 }
192 
is_rate_below(const struct flow * fw,long delay,double inst_speed)193 static inline int is_rate_below(const struct flow *fw,
194 	long delay, double inst_speed)
195 {
196 	/* We use logical and here to enforce both limist. */
197 	return delay <= fw->delay_ms && inst_speed < fw->max_process_rate;
198 }
199 
flush_chunk(const struct flow * fw,int fd)200 static inline int flush_chunk(const struct flow *fw, int fd)
201 {
202 	if (fw->func_flush_chunk)
203 		return fw->func_flush_chunk(fw, fd);
204 	return 0;
205 }
206 
207 /* XXX Avoid duplicate this function, which was copied from libutils.h. */
diff_timeval_us(const struct timeval * t1,const struct timeval * t2)208 static inline uint64_t diff_timeval_us(const struct timeval *t1,
209 	const struct timeval *t2)
210 {
211 	return (t2->tv_sec - t1->tv_sec) * 1000000ULL +
212 		t2->tv_usec - t1->tv_usec;
213 }
214 
measure(int fd,struct flow * fw,long processed)215 int measure(int fd, struct flow *fw, long processed)
216 {
217 	ldiv_t result = ldiv(processed, fw->block_size);
218 	struct timeval t2;
219 	int64_t delay;
220 	double bytes_k, inst_speed;
221 
222 	assert(result.rem == 0);
223 	fw->processed_blocks += result.quot;
224 	fw->total_processed += processed;
225 
226 	if (fw->processed_blocks < fw->blocks_per_delay)
227 		return 0;
228 	assert(fw->processed_blocks == fw->blocks_per_delay);
229 
230 	if (flush_chunk(fw, fd) < 0)
231 		return -1; /* Caller can read errno(3). */
232 
233 	assert(!gettimeofday(&t2, NULL));
234 	delay = (diff_timeval_us(&fw->t1, &t2) + fw->acc_delay_us) / 1000;
235 
236 	/* Instantaneous speed in bytes per second. */
237 	bytes_k = fw->blocks_per_delay * fw->block_size * 1000.0;
238 	inst_speed = bytes_k / delay;
239 
240 	if (delay < fw->delay_ms && inst_speed > fw->max_process_rate) {
241 		/* Wait until inst_speed == fw->max_process_rate
242 		 * (if possible).
243 		 */
244 		double wait_ms = round((bytes_k - delay * fw->max_process_rate)
245 			/ fw->max_process_rate);
246 
247 		 if (wait_ms < 0) {
248 			/* Wait what is possible. */
249 			wait_ms = fw->delay_ms - delay;
250 		} else if (delay + wait_ms < fw->delay_ms) {
251 			/* wait_ms is not the largest possible value, so
252 			 * force the flow algorithm to keep increasing it.
253 			 * Otherwise, the delay to print progress may be
254 			 * too small.
255 			 */
256 			wait_ms++;
257 		}
258 
259 		if (wait_ms > 0) {
260 			/* Slow down. */
261 			msleep(wait_ms);
262 
263 			/* Adjust measurements. */
264 			delay += wait_ms;
265 			inst_speed = bytes_k / delay;
266 		}
267 	}
268 
269 	/* Update mean. */
270 	fw->measured_blocks += fw->processed_blocks;
271 	fw->measured_time_ms += delay;
272 
273 	switch (fw->state) {
274 	case FW_INC:
275 		if (is_rate_above(fw, delay, inst_speed)) {
276 			move_to_search(fw,
277 				fw->blocks_per_delay - fw->step / 2,
278 				fw->blocks_per_delay);
279 		} else if (is_rate_below(fw, delay, inst_speed)) {
280 			inc_step(fw);
281 		} else
282 			move_to_steady(fw);
283 		break;
284 
285 	case FW_DEC:
286 		if (is_rate_above(fw, delay, inst_speed)) {
287 			dec_step(fw);
288 		} else if (is_rate_below(fw, delay, inst_speed)) {
289 			move_to_search(fw, fw->blocks_per_delay,
290 				fw->blocks_per_delay + fw->step / 2);
291 		} else
292 			move_to_steady(fw);
293 		break;
294 
295 	case FW_SEARCH:
296 		if (fw->bpd2 - fw->bpd1 <= 3) {
297 			move_to_steady(fw);
298 			break;
299 		}
300 
301 		if (is_rate_above(fw, delay, inst_speed)) {
302 			fw->bpd2 = fw->blocks_per_delay;
303 			fw->blocks_per_delay = (fw->bpd1 + fw->bpd2) / 2;
304 		} else if (is_rate_below(fw, delay, inst_speed)) {
305 			fw->bpd1 = fw->blocks_per_delay;
306 			fw->blocks_per_delay = (fw->bpd1 + fw->bpd2) / 2;
307 		} else
308 			move_to_steady(fw);
309 		break;
310 
311 	case FW_STEADY: {
312 		if (delay <= fw->delay_ms) {
313 			if (inst_speed < fw->max_process_rate) {
314 				move_to_inc(fw);
315 			} else if (inst_speed > fw->max_process_rate) {
316 				move_to_dec(fw);
317 			}
318 		} else if (fw->blocks_per_delay > 1) {
319 			move_to_dec(fw);
320 		}
321 		break;
322 	}
323 
324 	default:
325 		assert(0);
326 	}
327 
328 	if (fw->progress)
329 		report_progress(fw, inst_speed);
330 
331 	/* Reset accumulators. */
332 	fw->processed_blocks = 0;
333 	fw->acc_delay_us = 0;
334 	__start_measurement(fw);
335 	return 0;
336 }
337 
end_measurement(int fd,struct flow * fw)338 int end_measurement(int fd, struct flow *fw)
339 {
340 	struct timeval t2;
341 	int saved_errno;
342 	int ret = 0;
343 
344 	if (fw->processed_blocks <= 0)
345 		goto out;
346 
347 	if (flush_chunk(fw, fd) < 0) {
348 		saved_errno = errno;
349 		ret = -1;
350 		goto out;
351 	}
352 
353 	/* Save time in between closing ongoing file and creating a new file. */
354 	assert(!gettimeofday(&t2, NULL));
355 	fw->acc_delay_us += diff_timeval_us(&fw->t1, &t2);
356 
357 out:
358 	/* Erase progress information. */
359 	erase(fw->erase);
360 	fw->erase = 0;
361 	fflush(stdout);
362 
363 	if (ret < 0) {
364 		/* Propagate errno(3) to caller. */
365 		errno = saved_errno;
366 	}
367 	return ret;
368 }
369