1 /*
2  * Functions for transferring between file descriptors.
3  */
4 
5 #include "pv-internal.h"
6 
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include <string.h>
10 #include <errno.h>
11 #include <time.h>
12 #include <unistd.h>
13 #include <sys/file.h>
14 #include <sys/stat.h>
15 #include <fcntl.h>
16 #include <signal.h>
17 #include <sys/time.h>
18 
19 
20 /*
21  * Read up to "count" bytes from file descriptor "fd" into the buffer "buf",
22  * and return the number of bytes read, like read().
23  *
24  * Unlike read(), if we have read less than "count" bytes, we check to see
25  * if there's any more to read, and keep trying, to make sure we fill the
26  * buffer as full as we can.
27  *
28  * We stop retrying if the time elapsed since this function was entered
29  * reaches TRANSFER_READ_TIMEOUT microseconds.
30  */
pv__transfer_read_repeated(int fd,void * buf,size_t count)31 static ssize_t pv__transfer_read_repeated(int fd, void *buf, size_t count)
32 {
33 	struct timeval start_time;
34 	ssize_t total_read;
35 
36 	gettimeofday(&start_time, NULL);
37 
38 	total_read = 0;
39 
40 	while (count > 0) {
41 		ssize_t nread;
42 		struct timeval now;
43 		long elapsed_usec;
44 
45 		nread =
46 		    read(fd, buf,
47 			 count >
48 			 MAX_READ_AT_ONCE ? MAX_READ_AT_ONCE : count);
49 		if (nread < 0)
50 			return nread;
51 
52 		total_read += nread;
53 		buf += nread;
54 		count -= nread;
55 
56 		if (0 == nread)
57 			return total_read;
58 
59 		gettimeofday(&now, NULL);
60 		elapsed_usec =
61 		    1000000 * (now.tv_sec - start_time.tv_sec) +
62 		    (now.tv_usec - start_time.tv_usec);
63 		if (elapsed_usec > TRANSFER_READ_TIMEOUT) {
64 			debug("%s %d: %s (%ld %s)", "fd", fd,
65 			      "stopping read - timer expired",
66 			      elapsed_usec, "usec elapsed");
67 			return total_read;
68 		}
69 
70 		if (count > 0) {
71 			fd_set readfds;
72 			struct timeval tv;
73 
74 			tv.tv_sec = 0;
75 			tv.tv_usec = 0;
76 			FD_ZERO(&readfds);
77 			FD_SET(fd, &readfds);
78 
79 			debug("%s %d: %s (%ld %s, %ld %s)", "fd", fd,
80 			      "trying another read after partial buffer fill",
81 			      nread, "read", count, "remaining");
82 
83 			if (select(fd + 1, &readfds, NULL, NULL, &tv) < 1)
84 				break;
85 		}
86 	}
87 
88 	return total_read;
89 }
90 
91 
92 /*
93  * Write up to "count" bytes to file descriptor "fd" from the buffer "buf",
94  * and return the number of bytes written, like write().
95  *
96  * Unlike write(), if we have written less than "count" bytes, we check to
97  * see if we can write any more, and keep trying, to make sure we empty the
98  * buffer as much as we can.
99  *
100  * We stop retrying if the time elapsed since this function was entered
101  * reaches TRANSFER_WRITE_TIMEOUT microseconds.
102  */
pv__transfer_write_repeated(int fd,void * buf,size_t count)103 static ssize_t pv__transfer_write_repeated(int fd, void *buf, size_t count)
104 {
105 	struct timeval start_time;
106 	ssize_t total_written;
107 
108 	gettimeofday(&start_time, NULL);
109 
110 	total_written = 0;
111 
112 	while (count > 0) {
113 		ssize_t nwritten;
114 		struct timeval now;
115 		long elapsed_usec;
116 		size_t asked_to_write;
117 
118 		asked_to_write = count >
119 		    MAX_WRITE_AT_ONCE ? MAX_WRITE_AT_ONCE : count;
120 
121 		nwritten = write(fd, buf, asked_to_write);
122 		if (nwritten < 0) {
123 			if ((EINTR == errno) || (EAGAIN == errno)) {
124 				/*
125 				 * Interrupted by a signal - probably our
126 				 * alarm() - so just return what we've
127 				 * written so far.
128 				 */
129 				return total_written;
130 			} else {
131 				/*
132 				 * Legitimate error - return negative.
133 				 */
134 				return nwritten;
135 			}
136 		}
137 
138 		total_written += nwritten;
139 		buf += nwritten;
140 		count -= nwritten;
141 
142 		if (0 == nwritten)
143 			return total_written;
144 
145 		gettimeofday(&now, NULL);
146 		elapsed_usec =
147 		    1000000 * (now.tv_sec - start_time.tv_sec) +
148 		    (now.tv_usec - start_time.tv_usec);
149 		if (elapsed_usec > TRANSFER_WRITE_TIMEOUT) {
150 			debug("%s %d: %s (%ld %s)", "fd", fd,
151 			      "stopping write - timer expired",
152 			      elapsed_usec, "usec elapsed");
153 			return total_written;
154 		}
155 
156 		/*
157 		 * Running the select() here seems to make PV eat a lot of
158 		 * CPU in some cases, so instead we just go round the loop
159 		 * again and rely on our alarm() to interrupt us if we run
160 		 * out of time - also on our gettimeofday() check.
161 		 */
162 		if (count > 0) {
163 # if 0					    /* disabled after 1.6.0 - see comment above */
164 			fd_set writefds;
165 			struct timeval tv;
166 # endif
167 
168 			debug("%s %d: %s (%ld %s, %ld %s)", "fd", fd,
169 			      "trying another write after partial buffer flush",
170 			      nwritten, "written", count, "remaining");
171 
172 # if 0					    /* disabled after 1.6.0 - see comment above */
173 			tv.tv_sec = 0;
174 			tv.tv_usec = 0;
175 			FD_ZERO(&writefds);
176 			FD_SET(fd, &writefds);
177 			if (select(fd + 1, NULL, &writefds, NULL, &tv) < 1)
178 				break;
179 # endif
180 		}
181 	}
182 
183 	return total_written;
184 }
185 
186 
187 /*
188  * Read some data from the given file descriptor. Returns zero if there was
189  * a transient error and we need to return 0 from pv_transfer, otherwise
190  * returns 1.
191  *
192  * At most, the number of bytes read will be the number of bytes remaining
193  * in the input buffer.  If state->rate_limit is >0, and/or "allowed" is >0,
194  * then the maximum number of bytes read will be the number remaining unused
195  * in the input buffer or the value of "allowed", whichever is smaller.
196  *
197  * If splice() was successfully used, sets state->splice_used to 1; if it
198  * failed, then state->splice_failed_fd is updated to the current fd so
199  * splice() won't be tried again until the next input file.
200  *
201  * Updates state->read_position by the number of bytes read, unless splice()
202  * was used, in which case it does not since there's nothing in the buffer
203  * (and it also adds the bytes to state->written since they've been written
204  * to the output).
205  *
206  * On read error, updates state->exit_status, and if allowed by
207  * state->skip_errors, tries to skip past the problem.
208  *
209  * If the end of the input file is reached or the error is unrecoverable,
210  * sets *eof_in to 1.  If all data in the buffer has been written at this
211  * point, then also sets *eof_out.
212  */
pv__transfer_read(pvstate_t state,int fd,int * eof_in,int * eof_out,unsigned long long allowed)213 static int pv__transfer_read(pvstate_t state, int fd,
214 			     int *eof_in, int *eof_out,
215 			     unsigned long long allowed)
216 {
217 	unsigned long bytes_can_read;
218 	unsigned long amount_to_skip;
219 	long amount_skipped;
220 	long orig_offset;
221 	long skip_offset;
222 	ssize_t nread;
223 #ifdef HAVE_SPLICE
224 	size_t bytes_to_splice;
225 #endif				/* HAVE_SPLICE */
226 
227 	bytes_can_read = state->buffer_size - state->read_position;
228 
229 #ifdef HAVE_SPLICE
230 	state->splice_used = 0;
231 	if ((!state->linemode) && (!state->no_splice)
232 	    && (fd != state->splice_failed_fd)
233 	    && (0 == state->to_write)) {
234 		if (state->rate_limit || allowed != 0)
235 			bytes_to_splice = allowed;
236 		else
237 			bytes_to_splice = bytes_can_read;
238 
239 		nread = splice(fd, NULL, STDOUT_FILENO, NULL,
240 			       bytes_to_splice, SPLICE_F_MORE);
241 
242 		state->splice_used = 1;
243 		if ((nread < 0) && (EINVAL == errno)) {
244 			debug("%s %d: %s", "fd", fd,
245 			      "splice failed with EINVAL - disabling");
246 			state->splice_failed_fd = fd;
247 			state->splice_used = 0;
248 			/*
249 			 * Fall through to read() below.
250 			 */
251 		} else if (nread > 0) {
252 			state->written = nread;
253 		} else if ((-1 == nread) && (EAGAIN == errno)) {
254 			/* nothing read yet - do nothing */
255 		} else {
256 			/* EOF might not really be EOF, it seems */
257 			state->splice_used = 0;
258 		}
259 	}
260 	if (0 == state->splice_used) {
261 		nread =
262 		    pv__transfer_read_repeated(fd,
263 					       state->transfer_buffer +
264 					       state->read_position,
265 					       bytes_can_read);
266 	}
267 #else
268 	nread =
269 	    pv__transfer_read_repeated(fd,
270 				       state->transfer_buffer +
271 				       state->read_position,
272 				       bytes_can_read);
273 #endif				/* HAVE_SPLICE */
274 
275 
276 	if (0 == nread) {
277 		/*
278 		 * If read returned 0, we've reached the end of this input
279 		 * file.  If we've also written all the data in the transfer
280 		 * buffer, we set eof_out as well, so that the main loop can
281 		 * move on to the next input file.
282 		 */
283 		*eof_in = 1;
284 		if (state->write_position >= state->read_position)
285 			*eof_out = 1;
286 		return 1;
287 	} else if (nread > 0) {
288 		/*
289 		 * Read returned >0, so we successfully read data - clear
290 		 * the error counter and update our record of how much data
291 		 * we've got in the buffer.
292 		 */
293 		state->read_errors_in_a_row = 0;
294 #ifdef HAVE_SPLICE
295 		/*
296 		 * If we used splice(), there isn't any more data in the
297 		 * buffer than there was before.
298 		 */
299 		if (0 == state->splice_used)
300 			state->read_position += nread;
301 #else
302 		state->read_position += nread;
303 #endif				/* HAVE_SPLICE */
304 		return 1;
305 	}
306 
307 	/*
308 	 * If we reach this point, nread<0, so there was an error.
309 	 */
310 
311 	/*
312 	 * If a read error occurred but it was EINTR or EAGAIN, just wait a
313 	 * bit and then return zero, since this was a transient error.
314 	 */
315 	if ((EINTR == errno) || (EAGAIN == errno)) {
316 		struct timeval tv;
317 		debug("%s %d: %s: %s", "fd", fd,
318 		      "transient error - waiting briefly",
319 		      strerror(errno));
320 		tv.tv_sec = 0;
321 		tv.tv_usec = 10000;
322 		select(0, NULL, NULL, NULL, &tv);
323 		return 0;
324 	}
325 
326 	/*
327 	 * The read error is not transient, so update the program's final
328 	 * exit status, regardless of whether we're skipping errors, and
329 	 * increment the error counter.
330 	 */
331 	state->exit_status |= 16;
332 	state->read_errors_in_a_row++;
333 
334 	/*
335 	 * If we aren't skipping errors, show the error and pretend we
336 	 * reached the end of this file.
337 	 */
338 	if (0 == state->skip_errors) {
339 		pv_error(state, "%s: %s: %s",
340 			 state->current_file,
341 			 _("read failed"), strerror(errno));
342 		*eof_in = 1;
343 		if (state->write_position >= state->read_position) {
344 			*eof_out = 1;
345 		}
346 		return 1;
347 	}
348 
349 	/*
350 	 * Try to skip past the error.
351 	 */
352 
353 	amount_skipped = -1;
354 
355 	if (!state->read_error_warning_shown) {
356 		pv_error(state, "%s: %s: %s",
357 			 state->current_file,
358 			 _
359 			 ("warning: read errors detected"),
360 			 strerror(errno));
361 		state->read_error_warning_shown = 1;
362 	}
363 
364 	orig_offset = lseek64(fd, 0, SEEK_CUR);
365 
366 	/*
367 	 * If the file is not seekable, we can't skip past the error, so we
368 	 * will have to abandon the attempt and pretend we reached the end
369 	 * of the file.
370 	 */
371 	if (0 > orig_offset) {
372 		pv_error(state, "%s: %s: %s",
373 			 state->current_file,
374 			 _("file is not seekable"), strerror(errno));
375 		*eof_in = 1;
376 		if (state->write_position >= state->read_position) {
377 			*eof_out = 1;
378 		}
379 		return 1;
380 	}
381 
382 	if (state->read_errors_in_a_row < 10) {
383 		amount_to_skip = state->read_errors_in_a_row < 5 ? 1 : 2;
384 	} else if (state->read_errors_in_a_row < 20) {
385 		amount_to_skip = 1 << (state->read_errors_in_a_row - 10);
386 	} else {
387 		amount_to_skip = 512;
388 	}
389 
390 	/*
391 	 * Round the skip amount down to the start of the next block of the
392 	 * skip amount size.  For instance if the skip amount is 512, but
393 	 * our file offset is 257, we'll jump to 512 instead of 769.
394 	 */
395 	if (amount_to_skip > 1) {
396 		skip_offset = orig_offset + amount_to_skip;
397 		skip_offset -= (skip_offset % amount_to_skip);
398 		if (skip_offset > orig_offset) {
399 			amount_to_skip = skip_offset - orig_offset;
400 		}
401 	}
402 
403 	/*
404 	 * Trim the skip amount so we wouldn't read too much.
405 	 */
406 	if (amount_to_skip > bytes_can_read)
407 		amount_to_skip = bytes_can_read;
408 
409 	skip_offset = lseek64(fd, orig_offset + amount_to_skip, SEEK_SET);
410 
411 	/*
412 	 * If the skip we just tried didn't work, try only skipping 1 byte
413 	 * in case we were trying to go past the end of the input file.
414 	 */
415 	if (skip_offset < 0) {
416 		amount_to_skip = 1;
417 		skip_offset =
418 		    lseek64(fd, orig_offset + amount_to_skip, SEEK_SET);
419 	}
420 
421 	if (skip_offset < 0) {
422 		/*
423 		 * Failed to skip - lseek() returned an error, so mark the
424 		 * file as having ended.
425 		 */
426 		*eof_in = 1;
427 		/*
428 		 * EINVAL means the file has ended since we've tried to go
429 		 * past the end of it, so we don't bother with a warning
430 		 * since it just means we've reached the end anyway.
431 		 */
432 		if (EINVAL != errno) {
433 			pv_error(state,
434 				 "%s: %s: %s",
435 				 state->current_file,
436 				 _
437 				 ("failed to seek past error"),
438 				 strerror(errno));
439 		}
440 	} else {
441 		amount_skipped = skip_offset - orig_offset;
442 	}
443 
444 	/*
445 	 * If we succeeded in skipping some bytes, zero the equivalent part
446 	 * of the transfer buffer, and update the buffer position.
447 	 */
448 	if (amount_skipped > 0) {
449 		memset(state->transfer_buffer +
450 		       state->read_position, 0, amount_skipped);
451 		state->read_position += amount_skipped;
452 		if (state->skip_errors < 2) {
453 			pv_error(state, "%s: %s: %ld - %ld (%ld %s)",
454 				 state->current_file,
455 				 _
456 				 ("skipped past read error"),
457 				 orig_offset,
458 				 skip_offset, amount_skipped, _("B"));
459 		}
460 	} else {
461 		/*
462 		 * Failed to skip - mark file as ended.
463 		 */
464 		*eof_in = 1;
465 		if (state->write_position >= state->read_position) {
466 			*eof_out = 1;
467 		}
468 	}
469 
470 	return 1;
471 }
472 
473 
474 /*
475  * Write state->to_write bytes of data from the transfer buffer to stdout.
476  * Returns zero if there was a transient error and we need to return 0 from
477  * pv_transfer, otherwise returns 1.
478  *
479  * Updates state->write_position by moving it on by the number of bytes
480  * written; adds the number of bytes written to state->written; sets
481  * *eof_out on stdout EOF or when the write position catches up with the
482  * read position AND *eof_in is 1 (meaning we've reached the end of data).
483  *
484  * On error, sets *eof_out to 1, sets state->written to -1, and updates
485  * state->exit_status.
486  */
pv__transfer_write(pvstate_t state,int * eof_in,int * eof_out,long * lineswritten)487 static int pv__transfer_write(pvstate_t state,
488 			      int *eof_in, int *eof_out,
489 			      long *lineswritten)
490 {
491 	ssize_t nwritten;
492 
493 	signal(SIGALRM, SIG_IGN);
494 	alarm(1);
495 
496 	nwritten = pv__transfer_write_repeated(STDOUT_FILENO,
497 					       state->transfer_buffer +
498 					       state->write_position,
499 					       state->to_write);
500 
501 	alarm(0);
502 
503 	if (0 == nwritten) {
504 		/*
505 		 * Write returned 0 - EOF on stdout.
506 		 */
507 		*eof_out = 1;
508 		return 1;
509 	} else if (nwritten > 0) {
510 		/*
511 		 * Write returned >0 - data successfully written.
512 		 */
513 		if ((state->linemode) && (lineswritten != NULL)) {
514 			/*
515 			 * Guillaume Marcais: use strchr to count \n
516 			 */
517 			unsigned char save;
518 			char *ptr;
519 			long lines = 0;
520 
521 			save =
522 			    state->transfer_buffer[state->write_position +
523 						   nwritten];
524 			state->transfer_buffer[state->write_position +
525 					       nwritten] = 0;
526 			ptr =
527 			    (char *) (state->transfer_buffer +
528 				      state->write_position - 1);
529 
530 			if (state->null) {
531 				for (ptr++;
532 				     ptr -
533 				     (char *) state->transfer_buffer -
534 				     state->write_position <
535 				     (size_t) nwritten; ptr++) {
536 					if (*ptr == '\0')
537 						++lines;
538 				}
539 			} else {
540 				while ((ptr =
541 					strchr((char *) (ptr + 1), '\n')))
542 					++lines;
543 			}
544 
545 			*lineswritten += lines;
546 			state->transfer_buffer[state->write_position +
547 					       nwritten] = save;
548 		}
549 
550 		state->write_position += nwritten;
551 		state->written += nwritten;
552 
553 		/*
554 		 * If we're monitoring the output, update our copy of the
555 		 * last few bytes we've written.
556 		 */
557 		if (((state->components_used & PV_DISPLAY_OUTPUTBUF) != 0)
558 		    && (nwritten > 0)) {
559 			long new_portion_length, old_portion_length;
560 
561 			new_portion_length = nwritten;
562 			if (new_portion_length > state->lastoutput_length)
563 				new_portion_length =
564 				    state->lastoutput_length;
565 
566 			old_portion_length =
567 			    state->lastoutput_length - new_portion_length;
568 
569 			/*
570 			 * Make room for the new portion.
571 			 */
572 			if (old_portion_length > 0) {
573 				memmove(state->lastoutput_buffer,
574 					state->lastoutput_buffer +
575 					new_portion_length,
576 					old_portion_length);
577 			}
578 
579 			/*
580 			 * Copy the new data in.
581 			 */
582 			memcpy(state->lastoutput_buffer +
583 			       old_portion_length,
584 			       state->transfer_buffer +
585 			       state->write_position - new_portion_length,
586 			       new_portion_length);
587 		}
588 
589 		/*
590 		 * If we've written all the data in the buffer, reset the
591 		 * read pointer to the start, and if the input file is at
592 		 * EOF, set eof_out as well to indicate that we've written
593 		 * everything for this input file.
594 		 */
595 		if (state->write_position >= state->read_position) {
596 			state->write_position = 0;
597 			state->read_position = 0;
598 			if (*eof_in)
599 				*eof_out = 1;
600 		}
601 
602 		return 1;
603 	}
604 
605 	/*
606 	 * If we reach this point, nwritten<0, so there was an error.
607 	 */
608 
609 	/*
610 	 * If a write error occurred but it was EINTR or EAGAIN, just wait a
611 	 * bit and then return zero, since this was a transient error.
612 	 */
613 	if ((EINTR == errno) || (EAGAIN == errno)) {
614 		struct timeval tv;
615 		tv.tv_sec = 0;
616 		tv.tv_usec = 10000;
617 		select(0, NULL, NULL, NULL, &tv);
618 		return 0;
619 	}
620 
621 	/*
622 	 * SIGPIPE means we've finished. Don't output an error because it's
623 	 * not really our error to report.
624 	 */
625 	if (EPIPE == errno) {
626 		*eof_in = 1;
627 		*eof_out = 1;
628 		return 0;
629 	}
630 
631 	pv_error(state, "%s: %s", _("write failed"), strerror(errno));
632 	state->exit_status |= 16;
633 	*eof_out = 1;
634 	state->written = -1;
635 
636 	return 1;
637 }
638 
639 
640 /*
641  * Transfer some data from "fd" to standard output, timing out after 9/100
642  * of a second.  If state->rate_limit is >0, and/or "allowed" is >0, only up
643  * to "allowed" bytes can be written.  The variables that "eof_in" and
644  * "eof_out" point to are used to flag that we've finished reading and
645  * writing respectively.
646  *
647  * Returns the number of bytes written, or negative on error (in which case
648  * state->exit_status is updated). In line mode, the number of lines written
649  * will be put into *lineswritten.
650  */
pv_transfer(pvstate_t state,int fd,int * eof_in,int * eof_out,unsigned long long allowed,long * lineswritten)651 long pv_transfer(pvstate_t state, int fd, int *eof_in, int *eof_out,
652 		 unsigned long long allowed, long *lineswritten)
653 {
654 	struct timeval tv;
655 	fd_set readfds;
656 	fd_set writefds;
657 	int max_fd;
658 	int n;
659 
660 	if (NULL == state)
661 		return 0;
662 
663 	/*
664 	 * Reinitialise the error skipping variables if the file descriptor
665 	 * has changed since the last time we were called.
666 	 */
667 	if (fd != state->last_read_skip_fd) {
668 		state->last_read_skip_fd = fd;
669 		state->read_errors_in_a_row = 0;
670 		state->read_error_warning_shown = 0;
671 	}
672 
673 	if (NULL == state->transfer_buffer) {
674 		state->buffer_size = state->target_buffer_size;
675 		state->transfer_buffer =
676 		    (unsigned char *) malloc(state->buffer_size + 32);
677 		if (NULL == state->transfer_buffer) {
678 			pv_error(state, "%s: %s",
679 				 _("buffer allocation failed"),
680 				 strerror(errno));
681 			state->exit_status |= 64;
682 			return -1;
683 		}
684 	}
685 
686 	/*
687 	 * Reallocate the buffer if the buffer size has changed mid-transfer.
688 	 */
689 	if (state->buffer_size < state->target_buffer_size) {
690 		unsigned char *newptr;
691 		newptr =
692 		    realloc(state->transfer_buffer,
693 			    state->target_buffer_size + 32);
694 		if (NULL == newptr) {
695 			/*
696 			 * Reset target if realloc failed so we don't keep
697 			 * trying to realloc over and over.
698 			 */
699 			debug("realloc: %s", strerror(errno));
700 			state->target_buffer_size = state->buffer_size;
701 		} else {
702 			debug("%s: %ld", "buffer resized",
703 			      state->buffer_size);
704 			state->transfer_buffer = newptr;
705 			state->buffer_size = state->target_buffer_size;
706 		}
707 	}
708 
709 	if ((state->linemode) && (lineswritten != NULL))
710 		*lineswritten = 0;
711 
712 	if ((*eof_in) && (*eof_out))
713 		return 0;
714 
715 	tv.tv_sec = 0;
716 	tv.tv_usec = 90000;
717 
718 	FD_ZERO(&readfds);
719 	FD_ZERO(&writefds);
720 
721 	max_fd = 0;
722 
723 	/*
724 	 * If the input file is not at EOF and there's room in the buffer,
725 	 * look for incoming data from it.
726 	 */
727 	if ((!(*eof_in)) && (state->read_position < state->buffer_size)) {
728 		FD_SET(fd, &readfds);
729 		if (fd > max_fd)
730 			max_fd = fd;
731 	}
732 
733 	/*
734 	 * Work out how much we're allowed to write, based on the amount of
735 	 * data left in the buffer.  If rate limiting is active or "allowed"
736 	 * is >0, then this puts an upper limit on how much we're allowed to
737 	 * write.
738 	 */
739 	state->to_write = state->read_position - state->write_position;
740 	if ((state->rate_limit > 0) || (allowed > 0)) {
741 		if ((unsigned long long) (state->to_write) > allowed) {
742 			state->to_write = allowed;
743 		}
744 	}
745 
746 	/*
747 	 * If we don't think we've finished writing and there's anything
748 	 * we're allowed to write, look for the stdout becoming writable.
749 	 */
750 	if ((!(*eof_out)) && (state->to_write > 0)) {
751 		FD_SET(STDOUT_FILENO, &writefds);
752 		if (STDOUT_FILENO > max_fd)
753 			max_fd = STDOUT_FILENO;
754 	}
755 
756 	n = select(max_fd + 1, &readfds, &writefds, NULL, &tv);
757 
758 	if (n < 0) {
759 		/*
760 		 * Ignore transient errors by returning 0 immediately.
761 		 */
762 		if (EINTR == errno)
763 			return 0;
764 
765 		/*
766 		 * Any other error is a problem and we must report back.
767 		 */
768 		pv_error(state, "%s: %s: %d: %s",
769 			 state->current_file,
770 			 _("select call failed"), n, strerror(errno));
771 
772 		state->exit_status |= 16;
773 
774 		return -1;
775 	}
776 
777 	state->written = 0;
778 
779 	/*
780 	 * If there is data to read, try to read some in. Return early if
781 	 * there was a transient read error.
782 	 *
783 	 * NB this can update state->written because of splice().
784 	 */
785 	if (FD_ISSET(fd, &readfds)) {
786 		if (pv__transfer_read
787 		    (state, fd, eof_in, eof_out, allowed) == 0)
788 			return 0;
789 	}
790 
791 	/*
792 	 * In line mode, only write up to and including the last newline,
793 	 * so that we're writing output line-by-line.
794 	 */
795 	if ((state->to_write > 0) && (state->linemode) && !(state->null)) {
796 		/*
797 		 * Guillaume Marcais: use strrchr to find last \n
798 		 */
799 		unsigned char save;
800 		char *start;
801 		char *end;
802 
803 		save =
804 		    state->transfer_buffer[state->write_position +
805 					   state->to_write];
806 		state->transfer_buffer[state->write_position +
807 				       state->to_write] = 0;
808 
809 		start =
810 		    (char *) (state->transfer_buffer +
811 			      state->write_position);
812 		end = strrchr(start, '\n');
813 		state->transfer_buffer[state->write_position +
814 				       state->to_write] = save;
815 
816 		if (end != NULL) {
817 			state->to_write = (end - start) + 1;
818 		}
819 	}
820 
821 	/*
822 	 * If there is data to write, and stdout is ready to receive it, and
823 	 * we didn't use splice() this time, write some data.  Return early
824 	 * if there was a transient write error.
825 	 */
826 	if (FD_ISSET(STDOUT_FILENO, &writefds)
827 #ifdef HAVE_SPLICE
828 	    && (0 == state->splice_used)
829 #endif				/* HAVE_SPLICE */
830 	    && (state->read_position > state->write_position)
831 	    && (state->to_write > 0)) {
832 		if (pv__transfer_write
833 		    (state, eof_in, eof_out, lineswritten) == 0)
834 			return 0;
835 	}
836 #ifdef MAXIMISE_BUFFER_FILL
837 	/*
838 	 * Rotate the written bytes out of the buffer so that it can be
839 	 * filled up completely by the next read.
840 	 */
841 	if (state->write_position > 0) {
842 		if (state->write_position < state->read_position) {
843 			memmove(state->transfer_buffer,
844 				state->transfer_buffer +
845 				state->write_position,
846 				state->read_position -
847 				state->write_position);
848 			state->read_position -= state->write_position;
849 			state->write_position = 0;
850 		} else {
851 			state->write_position = 0;
852 			state->read_position = 0;
853 		}
854 	}
855 #endif				/* MAXIMISE_BUFFER_FILL */
856 
857 	return state->written;
858 }
859 
860 /* EOF */
861