1 /*
2 * Functions for transferring between file descriptors.
3 */
4
5 #include "pv-internal.h"
6
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include <string.h>
10 #include <errno.h>
11 #include <time.h>
12 #include <unistd.h>
13 #include <sys/file.h>
14 #include <sys/stat.h>
15 #include <fcntl.h>
16 #include <signal.h>
17 #include <sys/time.h>
18
19
20 /*
21 * Read up to "count" bytes from file descriptor "fd" into the buffer "buf",
22 * and return the number of bytes read, like read().
23 *
24 * Unlike read(), if we have read less than "count" bytes, we check to see
25 * if there's any more to read, and keep trying, to make sure we fill the
26 * buffer as full as we can.
27 *
28 * We stop retrying if the time elapsed since this function was entered
29 * reaches TRANSFER_READ_TIMEOUT microseconds.
30 */
pv__transfer_read_repeated(int fd,void * buf,size_t count)31 static ssize_t pv__transfer_read_repeated(int fd, void *buf, size_t count)
32 {
33 struct timeval start_time;
34 ssize_t total_read;
35
36 gettimeofday(&start_time, NULL);
37
38 total_read = 0;
39
40 while (count > 0) {
41 ssize_t nread;
42 struct timeval now;
43 long elapsed_usec;
44
45 nread =
46 read(fd, buf,
47 count >
48 MAX_READ_AT_ONCE ? MAX_READ_AT_ONCE : count);
49 if (nread < 0)
50 return nread;
51
52 total_read += nread;
53 buf += nread;
54 count -= nread;
55
56 if (0 == nread)
57 return total_read;
58
59 gettimeofday(&now, NULL);
60 elapsed_usec =
61 1000000 * (now.tv_sec - start_time.tv_sec) +
62 (now.tv_usec - start_time.tv_usec);
63 if (elapsed_usec > TRANSFER_READ_TIMEOUT) {
64 debug("%s %d: %s (%ld %s)", "fd", fd,
65 "stopping read - timer expired",
66 elapsed_usec, "usec elapsed");
67 return total_read;
68 }
69
70 if (count > 0) {
71 fd_set readfds;
72 struct timeval tv;
73
74 tv.tv_sec = 0;
75 tv.tv_usec = 0;
76 FD_ZERO(&readfds);
77 FD_SET(fd, &readfds);
78
79 debug("%s %d: %s (%ld %s, %ld %s)", "fd", fd,
80 "trying another read after partial buffer fill",
81 nread, "read", count, "remaining");
82
83 if (select(fd + 1, &readfds, NULL, NULL, &tv) < 1)
84 break;
85 }
86 }
87
88 return total_read;
89 }
90
91
92 /*
93 * Write up to "count" bytes to file descriptor "fd" from the buffer "buf",
94 * and return the number of bytes written, like write().
95 *
96 * Unlike write(), if we have written less than "count" bytes, we check to
97 * see if we can write any more, and keep trying, to make sure we empty the
98 * buffer as much as we can.
99 *
100 * We stop retrying if the time elapsed since this function was entered
101 * reaches TRANSFER_WRITE_TIMEOUT microseconds.
102 */
pv__transfer_write_repeated(int fd,void * buf,size_t count)103 static ssize_t pv__transfer_write_repeated(int fd, void *buf, size_t count)
104 {
105 struct timeval start_time;
106 ssize_t total_written;
107
108 gettimeofday(&start_time, NULL);
109
110 total_written = 0;
111
112 while (count > 0) {
113 ssize_t nwritten;
114 struct timeval now;
115 long elapsed_usec;
116 size_t asked_to_write;
117
118 asked_to_write = count >
119 MAX_WRITE_AT_ONCE ? MAX_WRITE_AT_ONCE : count;
120
121 nwritten = write(fd, buf, asked_to_write);
122 if (nwritten < 0) {
123 if ((EINTR == errno) || (EAGAIN == errno)) {
124 /*
125 * Interrupted by a signal - probably our
126 * alarm() - so just return what we've
127 * written so far.
128 */
129 return total_written;
130 } else {
131 /*
132 * Legitimate error - return negative.
133 */
134 return nwritten;
135 }
136 }
137
138 total_written += nwritten;
139 buf += nwritten;
140 count -= nwritten;
141
142 if (0 == nwritten)
143 return total_written;
144
145 gettimeofday(&now, NULL);
146 elapsed_usec =
147 1000000 * (now.tv_sec - start_time.tv_sec) +
148 (now.tv_usec - start_time.tv_usec);
149 if (elapsed_usec > TRANSFER_WRITE_TIMEOUT) {
150 debug("%s %d: %s (%ld %s)", "fd", fd,
151 "stopping write - timer expired",
152 elapsed_usec, "usec elapsed");
153 return total_written;
154 }
155
156 /*
157 * Running the select() here seems to make PV eat a lot of
158 * CPU in some cases, so instead we just go round the loop
159 * again and rely on our alarm() to interrupt us if we run
160 * out of time - also on our gettimeofday() check.
161 */
162 if (count > 0) {
163 # if 0 /* disabled after 1.6.0 - see comment above */
164 fd_set writefds;
165 struct timeval tv;
166 # endif
167
168 debug("%s %d: %s (%ld %s, %ld %s)", "fd", fd,
169 "trying another write after partial buffer flush",
170 nwritten, "written", count, "remaining");
171
172 # if 0 /* disabled after 1.6.0 - see comment above */
173 tv.tv_sec = 0;
174 tv.tv_usec = 0;
175 FD_ZERO(&writefds);
176 FD_SET(fd, &writefds);
177 if (select(fd + 1, NULL, &writefds, NULL, &tv) < 1)
178 break;
179 # endif
180 }
181 }
182
183 return total_written;
184 }
185
186
187 /*
188 * Read some data from the given file descriptor. Returns zero if there was
189 * a transient error and we need to return 0 from pv_transfer, otherwise
190 * returns 1.
191 *
192 * At most, the number of bytes read will be the number of bytes remaining
193 * in the input buffer. If state->rate_limit is >0, and/or "allowed" is >0,
194 * then the maximum number of bytes read will be the number remaining unused
195 * in the input buffer or the value of "allowed", whichever is smaller.
196 *
197 * If splice() was successfully used, sets state->splice_used to 1; if it
198 * failed, then state->splice_failed_fd is updated to the current fd so
199 * splice() won't be tried again until the next input file.
200 *
201 * Updates state->read_position by the number of bytes read, unless splice()
202 * was used, in which case it does not since there's nothing in the buffer
203 * (and it also adds the bytes to state->written since they've been written
204 * to the output).
205 *
206 * On read error, updates state->exit_status, and if allowed by
207 * state->skip_errors, tries to skip past the problem.
208 *
209 * If the end of the input file is reached or the error is unrecoverable,
210 * sets *eof_in to 1. If all data in the buffer has been written at this
211 * point, then also sets *eof_out.
212 */
pv__transfer_read(pvstate_t state,int fd,int * eof_in,int * eof_out,unsigned long long allowed)213 static int pv__transfer_read(pvstate_t state, int fd,
214 int *eof_in, int *eof_out,
215 unsigned long long allowed)
216 {
217 unsigned long bytes_can_read;
218 unsigned long amount_to_skip;
219 long amount_skipped;
220 long orig_offset;
221 long skip_offset;
222 ssize_t nread;
223 #ifdef HAVE_SPLICE
224 size_t bytes_to_splice;
225 #endif /* HAVE_SPLICE */
226
227 bytes_can_read = state->buffer_size - state->read_position;
228
229 #ifdef HAVE_SPLICE
230 state->splice_used = 0;
231 if ((!state->linemode) && (!state->no_splice)
232 && (fd != state->splice_failed_fd)
233 && (0 == state->to_write)) {
234 if (state->rate_limit || allowed != 0)
235 bytes_to_splice = allowed;
236 else
237 bytes_to_splice = bytes_can_read;
238
239 nread = splice(fd, NULL, STDOUT_FILENO, NULL,
240 bytes_to_splice, SPLICE_F_MORE);
241
242 state->splice_used = 1;
243 if ((nread < 0) && (EINVAL == errno)) {
244 debug("%s %d: %s", "fd", fd,
245 "splice failed with EINVAL - disabling");
246 state->splice_failed_fd = fd;
247 state->splice_used = 0;
248 /*
249 * Fall through to read() below.
250 */
251 } else if (nread > 0) {
252 state->written = nread;
253 } else if ((-1 == nread) && (EAGAIN == errno)) {
254 /* nothing read yet - do nothing */
255 } else {
256 /* EOF might not really be EOF, it seems */
257 state->splice_used = 0;
258 }
259 }
260 if (0 == state->splice_used) {
261 nread =
262 pv__transfer_read_repeated(fd,
263 state->transfer_buffer +
264 state->read_position,
265 bytes_can_read);
266 }
267 #else
268 nread =
269 pv__transfer_read_repeated(fd,
270 state->transfer_buffer +
271 state->read_position,
272 bytes_can_read);
273 #endif /* HAVE_SPLICE */
274
275
276 if (0 == nread) {
277 /*
278 * If read returned 0, we've reached the end of this input
279 * file. If we've also written all the data in the transfer
280 * buffer, we set eof_out as well, so that the main loop can
281 * move on to the next input file.
282 */
283 *eof_in = 1;
284 if (state->write_position >= state->read_position)
285 *eof_out = 1;
286 return 1;
287 } else if (nread > 0) {
288 /*
289 * Read returned >0, so we successfully read data - clear
290 * the error counter and update our record of how much data
291 * we've got in the buffer.
292 */
293 state->read_errors_in_a_row = 0;
294 #ifdef HAVE_SPLICE
295 /*
296 * If we used splice(), there isn't any more data in the
297 * buffer than there was before.
298 */
299 if (0 == state->splice_used)
300 state->read_position += nread;
301 #else
302 state->read_position += nread;
303 #endif /* HAVE_SPLICE */
304 return 1;
305 }
306
307 /*
308 * If we reach this point, nread<0, so there was an error.
309 */
310
311 /*
312 * If a read error occurred but it was EINTR or EAGAIN, just wait a
313 * bit and then return zero, since this was a transient error.
314 */
315 if ((EINTR == errno) || (EAGAIN == errno)) {
316 struct timeval tv;
317 debug("%s %d: %s: %s", "fd", fd,
318 "transient error - waiting briefly",
319 strerror(errno));
320 tv.tv_sec = 0;
321 tv.tv_usec = 10000;
322 select(0, NULL, NULL, NULL, &tv);
323 return 0;
324 }
325
326 /*
327 * The read error is not transient, so update the program's final
328 * exit status, regardless of whether we're skipping errors, and
329 * increment the error counter.
330 */
331 state->exit_status |= 16;
332 state->read_errors_in_a_row++;
333
334 /*
335 * If we aren't skipping errors, show the error and pretend we
336 * reached the end of this file.
337 */
338 if (0 == state->skip_errors) {
339 pv_error(state, "%s: %s: %s",
340 state->current_file,
341 _("read failed"), strerror(errno));
342 *eof_in = 1;
343 if (state->write_position >= state->read_position) {
344 *eof_out = 1;
345 }
346 return 1;
347 }
348
349 /*
350 * Try to skip past the error.
351 */
352
353 amount_skipped = -1;
354
355 if (!state->read_error_warning_shown) {
356 pv_error(state, "%s: %s: %s",
357 state->current_file,
358 _
359 ("warning: read errors detected"),
360 strerror(errno));
361 state->read_error_warning_shown = 1;
362 }
363
364 orig_offset = lseek64(fd, 0, SEEK_CUR);
365
366 /*
367 * If the file is not seekable, we can't skip past the error, so we
368 * will have to abandon the attempt and pretend we reached the end
369 * of the file.
370 */
371 if (0 > orig_offset) {
372 pv_error(state, "%s: %s: %s",
373 state->current_file,
374 _("file is not seekable"), strerror(errno));
375 *eof_in = 1;
376 if (state->write_position >= state->read_position) {
377 *eof_out = 1;
378 }
379 return 1;
380 }
381
382 if (state->read_errors_in_a_row < 10) {
383 amount_to_skip = state->read_errors_in_a_row < 5 ? 1 : 2;
384 } else if (state->read_errors_in_a_row < 20) {
385 amount_to_skip = 1 << (state->read_errors_in_a_row - 10);
386 } else {
387 amount_to_skip = 512;
388 }
389
390 /*
391 * Round the skip amount down to the start of the next block of the
392 * skip amount size. For instance if the skip amount is 512, but
393 * our file offset is 257, we'll jump to 512 instead of 769.
394 */
395 if (amount_to_skip > 1) {
396 skip_offset = orig_offset + amount_to_skip;
397 skip_offset -= (skip_offset % amount_to_skip);
398 if (skip_offset > orig_offset) {
399 amount_to_skip = skip_offset - orig_offset;
400 }
401 }
402
403 /*
404 * Trim the skip amount so we wouldn't read too much.
405 */
406 if (amount_to_skip > bytes_can_read)
407 amount_to_skip = bytes_can_read;
408
409 skip_offset = lseek64(fd, orig_offset + amount_to_skip, SEEK_SET);
410
411 /*
412 * If the skip we just tried didn't work, try only skipping 1 byte
413 * in case we were trying to go past the end of the input file.
414 */
415 if (skip_offset < 0) {
416 amount_to_skip = 1;
417 skip_offset =
418 lseek64(fd, orig_offset + amount_to_skip, SEEK_SET);
419 }
420
421 if (skip_offset < 0) {
422 /*
423 * Failed to skip - lseek() returned an error, so mark the
424 * file as having ended.
425 */
426 *eof_in = 1;
427 /*
428 * EINVAL means the file has ended since we've tried to go
429 * past the end of it, so we don't bother with a warning
430 * since it just means we've reached the end anyway.
431 */
432 if (EINVAL != errno) {
433 pv_error(state,
434 "%s: %s: %s",
435 state->current_file,
436 _
437 ("failed to seek past error"),
438 strerror(errno));
439 }
440 } else {
441 amount_skipped = skip_offset - orig_offset;
442 }
443
444 /*
445 * If we succeeded in skipping some bytes, zero the equivalent part
446 * of the transfer buffer, and update the buffer position.
447 */
448 if (amount_skipped > 0) {
449 memset(state->transfer_buffer +
450 state->read_position, 0, amount_skipped);
451 state->read_position += amount_skipped;
452 if (state->skip_errors < 2) {
453 pv_error(state, "%s: %s: %ld - %ld (%ld %s)",
454 state->current_file,
455 _
456 ("skipped past read error"),
457 orig_offset,
458 skip_offset, amount_skipped, _("B"));
459 }
460 } else {
461 /*
462 * Failed to skip - mark file as ended.
463 */
464 *eof_in = 1;
465 if (state->write_position >= state->read_position) {
466 *eof_out = 1;
467 }
468 }
469
470 return 1;
471 }
472
473
474 /*
475 * Write state->to_write bytes of data from the transfer buffer to stdout.
476 * Returns zero if there was a transient error and we need to return 0 from
477 * pv_transfer, otherwise returns 1.
478 *
479 * Updates state->write_position by moving it on by the number of bytes
480 * written; adds the number of bytes written to state->written; sets
481 * *eof_out on stdout EOF or when the write position catches up with the
482 * read position AND *eof_in is 1 (meaning we've reached the end of data).
483 *
484 * On error, sets *eof_out to 1, sets state->written to -1, and updates
485 * state->exit_status.
486 */
pv__transfer_write(pvstate_t state,int * eof_in,int * eof_out,long * lineswritten)487 static int pv__transfer_write(pvstate_t state,
488 int *eof_in, int *eof_out,
489 long *lineswritten)
490 {
491 ssize_t nwritten;
492
493 signal(SIGALRM, SIG_IGN);
494 alarm(1);
495
496 nwritten = pv__transfer_write_repeated(STDOUT_FILENO,
497 state->transfer_buffer +
498 state->write_position,
499 state->to_write);
500
501 alarm(0);
502
503 if (0 == nwritten) {
504 /*
505 * Write returned 0 - EOF on stdout.
506 */
507 *eof_out = 1;
508 return 1;
509 } else if (nwritten > 0) {
510 /*
511 * Write returned >0 - data successfully written.
512 */
513 if ((state->linemode) && (lineswritten != NULL)) {
514 /*
515 * Guillaume Marcais: use strchr to count \n
516 */
517 unsigned char save;
518 char *ptr;
519 long lines = 0;
520
521 save =
522 state->transfer_buffer[state->write_position +
523 nwritten];
524 state->transfer_buffer[state->write_position +
525 nwritten] = 0;
526 ptr =
527 (char *) (state->transfer_buffer +
528 state->write_position - 1);
529
530 if (state->null) {
531 for (ptr++;
532 ptr -
533 (char *) state->transfer_buffer -
534 state->write_position <
535 (size_t) nwritten; ptr++) {
536 if (*ptr == '\0')
537 ++lines;
538 }
539 } else {
540 while ((ptr =
541 strchr((char *) (ptr + 1), '\n')))
542 ++lines;
543 }
544
545 *lineswritten += lines;
546 state->transfer_buffer[state->write_position +
547 nwritten] = save;
548 }
549
550 state->write_position += nwritten;
551 state->written += nwritten;
552
553 /*
554 * If we're monitoring the output, update our copy of the
555 * last few bytes we've written.
556 */
557 if (((state->components_used & PV_DISPLAY_OUTPUTBUF) != 0)
558 && (nwritten > 0)) {
559 long new_portion_length, old_portion_length;
560
561 new_portion_length = nwritten;
562 if (new_portion_length > state->lastoutput_length)
563 new_portion_length =
564 state->lastoutput_length;
565
566 old_portion_length =
567 state->lastoutput_length - new_portion_length;
568
569 /*
570 * Make room for the new portion.
571 */
572 if (old_portion_length > 0) {
573 memmove(state->lastoutput_buffer,
574 state->lastoutput_buffer +
575 new_portion_length,
576 old_portion_length);
577 }
578
579 /*
580 * Copy the new data in.
581 */
582 memcpy(state->lastoutput_buffer +
583 old_portion_length,
584 state->transfer_buffer +
585 state->write_position - new_portion_length,
586 new_portion_length);
587 }
588
589 /*
590 * If we've written all the data in the buffer, reset the
591 * read pointer to the start, and if the input file is at
592 * EOF, set eof_out as well to indicate that we've written
593 * everything for this input file.
594 */
595 if (state->write_position >= state->read_position) {
596 state->write_position = 0;
597 state->read_position = 0;
598 if (*eof_in)
599 *eof_out = 1;
600 }
601
602 return 1;
603 }
604
605 /*
606 * If we reach this point, nwritten<0, so there was an error.
607 */
608
609 /*
610 * If a write error occurred but it was EINTR or EAGAIN, just wait a
611 * bit and then return zero, since this was a transient error.
612 */
613 if ((EINTR == errno) || (EAGAIN == errno)) {
614 struct timeval tv;
615 tv.tv_sec = 0;
616 tv.tv_usec = 10000;
617 select(0, NULL, NULL, NULL, &tv);
618 return 0;
619 }
620
621 /*
622 * SIGPIPE means we've finished. Don't output an error because it's
623 * not really our error to report.
624 */
625 if (EPIPE == errno) {
626 *eof_in = 1;
627 *eof_out = 1;
628 return 0;
629 }
630
631 pv_error(state, "%s: %s", _("write failed"), strerror(errno));
632 state->exit_status |= 16;
633 *eof_out = 1;
634 state->written = -1;
635
636 return 1;
637 }
638
639
640 /*
641 * Transfer some data from "fd" to standard output, timing out after 9/100
642 * of a second. If state->rate_limit is >0, and/or "allowed" is >0, only up
643 * to "allowed" bytes can be written. The variables that "eof_in" and
644 * "eof_out" point to are used to flag that we've finished reading and
645 * writing respectively.
646 *
647 * Returns the number of bytes written, or negative on error (in which case
648 * state->exit_status is updated). In line mode, the number of lines written
649 * will be put into *lineswritten.
650 */
pv_transfer(pvstate_t state,int fd,int * eof_in,int * eof_out,unsigned long long allowed,long * lineswritten)651 long pv_transfer(pvstate_t state, int fd, int *eof_in, int *eof_out,
652 unsigned long long allowed, long *lineswritten)
653 {
654 struct timeval tv;
655 fd_set readfds;
656 fd_set writefds;
657 int max_fd;
658 int n;
659
660 if (NULL == state)
661 return 0;
662
663 /*
664 * Reinitialise the error skipping variables if the file descriptor
665 * has changed since the last time we were called.
666 */
667 if (fd != state->last_read_skip_fd) {
668 state->last_read_skip_fd = fd;
669 state->read_errors_in_a_row = 0;
670 state->read_error_warning_shown = 0;
671 }
672
673 if (NULL == state->transfer_buffer) {
674 state->buffer_size = state->target_buffer_size;
675 state->transfer_buffer =
676 (unsigned char *) malloc(state->buffer_size + 32);
677 if (NULL == state->transfer_buffer) {
678 pv_error(state, "%s: %s",
679 _("buffer allocation failed"),
680 strerror(errno));
681 state->exit_status |= 64;
682 return -1;
683 }
684 }
685
686 /*
687 * Reallocate the buffer if the buffer size has changed mid-transfer.
688 */
689 if (state->buffer_size < state->target_buffer_size) {
690 unsigned char *newptr;
691 newptr =
692 realloc(state->transfer_buffer,
693 state->target_buffer_size + 32);
694 if (NULL == newptr) {
695 /*
696 * Reset target if realloc failed so we don't keep
697 * trying to realloc over and over.
698 */
699 debug("realloc: %s", strerror(errno));
700 state->target_buffer_size = state->buffer_size;
701 } else {
702 debug("%s: %ld", "buffer resized",
703 state->buffer_size);
704 state->transfer_buffer = newptr;
705 state->buffer_size = state->target_buffer_size;
706 }
707 }
708
709 if ((state->linemode) && (lineswritten != NULL))
710 *lineswritten = 0;
711
712 if ((*eof_in) && (*eof_out))
713 return 0;
714
715 tv.tv_sec = 0;
716 tv.tv_usec = 90000;
717
718 FD_ZERO(&readfds);
719 FD_ZERO(&writefds);
720
721 max_fd = 0;
722
723 /*
724 * If the input file is not at EOF and there's room in the buffer,
725 * look for incoming data from it.
726 */
727 if ((!(*eof_in)) && (state->read_position < state->buffer_size)) {
728 FD_SET(fd, &readfds);
729 if (fd > max_fd)
730 max_fd = fd;
731 }
732
733 /*
734 * Work out how much we're allowed to write, based on the amount of
735 * data left in the buffer. If rate limiting is active or "allowed"
736 * is >0, then this puts an upper limit on how much we're allowed to
737 * write.
738 */
739 state->to_write = state->read_position - state->write_position;
740 if ((state->rate_limit > 0) || (allowed > 0)) {
741 if ((unsigned long long) (state->to_write) > allowed) {
742 state->to_write = allowed;
743 }
744 }
745
746 /*
747 * If we don't think we've finished writing and there's anything
748 * we're allowed to write, look for the stdout becoming writable.
749 */
750 if ((!(*eof_out)) && (state->to_write > 0)) {
751 FD_SET(STDOUT_FILENO, &writefds);
752 if (STDOUT_FILENO > max_fd)
753 max_fd = STDOUT_FILENO;
754 }
755
756 n = select(max_fd + 1, &readfds, &writefds, NULL, &tv);
757
758 if (n < 0) {
759 /*
760 * Ignore transient errors by returning 0 immediately.
761 */
762 if (EINTR == errno)
763 return 0;
764
765 /*
766 * Any other error is a problem and we must report back.
767 */
768 pv_error(state, "%s: %s: %d: %s",
769 state->current_file,
770 _("select call failed"), n, strerror(errno));
771
772 state->exit_status |= 16;
773
774 return -1;
775 }
776
777 state->written = 0;
778
779 /*
780 * If there is data to read, try to read some in. Return early if
781 * there was a transient read error.
782 *
783 * NB this can update state->written because of splice().
784 */
785 if (FD_ISSET(fd, &readfds)) {
786 if (pv__transfer_read
787 (state, fd, eof_in, eof_out, allowed) == 0)
788 return 0;
789 }
790
791 /*
792 * In line mode, only write up to and including the last newline,
793 * so that we're writing output line-by-line.
794 */
795 if ((state->to_write > 0) && (state->linemode) && !(state->null)) {
796 /*
797 * Guillaume Marcais: use strrchr to find last \n
798 */
799 unsigned char save;
800 char *start;
801 char *end;
802
803 save =
804 state->transfer_buffer[state->write_position +
805 state->to_write];
806 state->transfer_buffer[state->write_position +
807 state->to_write] = 0;
808
809 start =
810 (char *) (state->transfer_buffer +
811 state->write_position);
812 end = strrchr(start, '\n');
813 state->transfer_buffer[state->write_position +
814 state->to_write] = save;
815
816 if (end != NULL) {
817 state->to_write = (end - start) + 1;
818 }
819 }
820
821 /*
822 * If there is data to write, and stdout is ready to receive it, and
823 * we didn't use splice() this time, write some data. Return early
824 * if there was a transient write error.
825 */
826 if (FD_ISSET(STDOUT_FILENO, &writefds)
827 #ifdef HAVE_SPLICE
828 && (0 == state->splice_used)
829 #endif /* HAVE_SPLICE */
830 && (state->read_position > state->write_position)
831 && (state->to_write > 0)) {
832 if (pv__transfer_write
833 (state, eof_in, eof_out, lineswritten) == 0)
834 return 0;
835 }
836 #ifdef MAXIMISE_BUFFER_FILL
837 /*
838 * Rotate the written bytes out of the buffer so that it can be
839 * filled up completely by the next read.
840 */
841 if (state->write_position > 0) {
842 if (state->write_position < state->read_position) {
843 memmove(state->transfer_buffer,
844 state->transfer_buffer +
845 state->write_position,
846 state->read_position -
847 state->write_position);
848 state->read_position -= state->write_position;
849 state->write_position = 0;
850 } else {
851 state->write_position = 0;
852 state->read_position = 0;
853 }
854 }
855 #endif /* MAXIMISE_BUFFER_FILL */
856
857 return state->written;
858 }
859
860 /* EOF */
861