1 /* Copyright (c) 2002-2018 Dovecot authors, see the included COPYING file */
2
3 /* @UNSAFE: whole file */
4
5 #include "lib.h"
6 #include "ioloop.h"
7 #include "write-full.h"
8 #include "net.h"
9 #include "sendfile-util.h"
10 #include "istream.h"
11 #include "istream-private.h"
12 #include "ostream-file-private.h"
13
14 #include <unistd.h>
15 #include <sys/stat.h>
16 #ifdef HAVE_SYS_UIO_H
17 # include <sys/uio.h>
18 #endif
19 #include <fcntl.h>
20
21 /* try to keep the buffer size within 4k..128k. ReiserFS may actually return
22 128k as optimal size. */
23 #define DEFAULT_OPTIMAL_BLOCK_SIZE IO_BLOCK_SIZE
24 #define MAX_OPTIMAL_BLOCK_SIZE (128*1024)
25
26 #define IS_STREAM_EMPTY(fstream) \
27 ((fstream)->head == (fstream)->tail && !(fstream)->full)
28
29 #define MAX_SSIZE_T(size) \
30 ((size) < SSIZE_T_MAX ? (size_t)(size) : SSIZE_T_MAX)
31
32 static void stream_send_io(struct file_ostream *fstream);
33 static struct ostream * o_stream_create_fd_common(int fd,
34 size_t max_buffer_size, bool autoclose_fd);
35
stream_closed(struct file_ostream * fstream)36 static void stream_closed(struct file_ostream *fstream)
37 {
38 io_remove(&fstream->io);
39
40 if (fstream->autoclose_fd && fstream->fd != -1) {
41 /* Ignore ECONNRESET because we don't really care about it here,
42 as we are closing the socket down in any case. There might be
43 unsent data but nothing we can do about that. */
44 if (unlikely(close(fstream->fd) < 0 && errno != ECONNRESET)) {
45 i_error("file_ostream.close(%s) failed: %m",
46 o_stream_get_name(&fstream->ostream.ostream));
47 }
48 }
49 fstream->fd = -1;
50
51 fstream->ostream.ostream.closed = TRUE;
52 }
53
/* iostream close handler: resolves the owning file_ostream and closes it.
   The close_parent flag is unused here. */
void o_stream_file_close(struct iostream_private *stream,
			 bool close_parent ATTR_UNUSED)
{
	struct file_ostream *file_stream;

	file_stream = container_of(stream, struct file_ostream,
				   ostream.iostream);
	stream_closed(file_stream);
}
62
o_stream_file_destroy(struct iostream_private * stream)63 static void o_stream_file_destroy(struct iostream_private *stream)
64 {
65 struct file_ostream *fstream =
66 container_of(stream, struct file_ostream, ostream.iostream);
67
68 i_free(fstream->buffer);
69 }
70
file_buffer_get_used_size(struct file_ostream * fstream)71 static size_t file_buffer_get_used_size(struct file_ostream *fstream)
72 {
73 if (fstream->head == fstream->tail)
74 return fstream->full ? fstream->buffer_size : 0;
75 else if (fstream->head < fstream->tail) {
76 /* ...HXXXT... */
77 return fstream->tail - fstream->head;
78 } else {
79 /* XXXT...HXXX */
80 return fstream->tail +
81 (fstream->buffer_size - fstream->head);
82 }
83 }
84
update_buffer(struct file_ostream * fstream,size_t size)85 static void update_buffer(struct file_ostream *fstream, size_t size)
86 {
87 size_t used;
88
89 if (IS_STREAM_EMPTY(fstream) || size == 0)
90 return;
91
92 if (fstream->head < fstream->tail) {
93 /* ...HXXXT... */
94 used = fstream->tail - fstream->head;
95 i_assert(size <= used);
96 fstream->head += size;
97 } else {
98 /* XXXT...HXXX */
99 used = fstream->buffer_size - fstream->head;
100 if (size > used) {
101 size -= used;
102 i_assert(size <= fstream->tail);
103 fstream->head = size;
104 } else {
105 fstream->head += size;
106 }
107
108 fstream->full = FALSE;
109 }
110
111 if (fstream->head == fstream->tail)
112 fstream->head = fstream->tail = 0;
113
114 if (fstream->head == fstream->buffer_size)
115 fstream->head = 0;
116 }
117
o_stream_socket_cork(struct file_ostream * fstream)118 static void o_stream_socket_cork(struct file_ostream *fstream)
119 {
120 if (fstream->ostream.corked && !fstream->socket_cork_set) {
121 if (!fstream->no_socket_cork) {
122 if (net_set_cork(fstream->fd, TRUE) < 0)
123 fstream->no_socket_cork = TRUE;
124 else
125 fstream->socket_cork_set = TRUE;
126 }
127 }
128 }
129
o_stream_lseek(struct file_ostream * fstream)130 static int o_stream_lseek(struct file_ostream *fstream)
131 {
132 off_t ret;
133
134 if (fstream->real_offset == fstream->buffer_offset)
135 return 0;
136
137 ret = lseek(fstream->fd, (off_t)fstream->buffer_offset, SEEK_SET);
138 if (ret < 0) {
139 io_stream_set_error(&fstream->ostream.iostream,
140 "lseek() failed: %m");
141 fstream->ostream.ostream.stream_errno = errno;
142 return -1;
143 }
144
145 if (ret != (off_t)fstream->buffer_offset) {
146 io_stream_set_error(&fstream->ostream.iostream,
147 "lseek() returned wrong value");
148 fstream->ostream.ostream.stream_errno = EINVAL;
149 return -1;
150 }
151 fstream->real_offset = fstream->buffer_offset;
152 return 0;
153 }
154
o_stream_file_writev(struct file_ostream * fstream,const struct const_iovec * iov,unsigned int iov_count)155 ssize_t o_stream_file_writev(struct file_ostream *fstream,
156 const struct const_iovec *iov,
157 unsigned int iov_count)
158 {
159 ssize_t ret;
160 size_t size, sent;
161 unsigned int i;
162
163 if (iov_count == 1) {
164 i_assert(iov->iov_len > 0);
165
166 if (!fstream->file ||
167 fstream->real_offset == fstream->buffer_offset) {
168 ret = write(fstream->fd, iov->iov_base, iov->iov_len);
169 if (ret > 0)
170 fstream->real_offset += ret;
171 } else {
172 ret = pwrite(fstream->fd, iov->iov_base, iov->iov_len,
173 fstream->buffer_offset);
174 }
175 } else {
176 if (o_stream_lseek(fstream) < 0)
177 return -1;
178
179 sent = 0;
180 while (iov_count > IOV_MAX) {
181 size = 0;
182 for (i = 0; i < IOV_MAX; i++)
183 size += iov[i].iov_len;
184
185 ret = writev(fstream->fd, (const struct iovec *)iov,
186 IOV_MAX);
187 if (ret != (ssize_t)size) {
188 break;
189 }
190
191 fstream->real_offset += ret;
192 fstream->buffer_offset += ret;
193 sent += ret;
194 iov += IOV_MAX;
195 iov_count -= IOV_MAX;
196 }
197
198 if (iov_count <= IOV_MAX) {
199 size = 0;
200 for (i = 0; i < iov_count; i++)
201 size += iov[i].iov_len;
202
203 ret = writev(fstream->fd, (const struct iovec *)iov,
204 iov_count);
205 }
206 if (ret > 0) {
207 fstream->real_offset += ret;
208 ret += sent;
209 } else if (!fstream->file && sent > 0) {
210 /* return what we managed to get sent */
211 ret = sent;
212 }
213 }
214 return ret;
215 }
216
/* Write the given iovecs through fstream->writev(), handling retries and
   partial writes.

   Returns the number of bytes written, 0 when a non-file fd reported
   EAGAIN/EINTR, or -1 on error. On error stream_errno is set and the
   stream is closed. For file streams a non-negative return always equals
   the total size of the iovecs (asserted at the end): partial writes are
   completed by calling this function again for the remainder, and a
   0-byte write is treated as ENOSPC.

   buffer_offset is advanced by the number of bytes written. */
static ssize_t
o_stream_file_writev_full(struct file_ostream *fstream,
			  const struct const_iovec *iov,
			  unsigned int iov_count)
{
	ssize_t ret, ret2;
	size_t size, total_size;
	bool partial;
	unsigned int i;

	for (i = 0, total_size = 0; i < iov_count; i++)
		total_size += iov[i].iov_len;

	o_stream_socket_cork(fstream);
	ret = fstream->writev(fstream, iov, iov_count);
	partial = ret != (ssize_t)total_size;

	if (ret < 0) {
		if (fstream->file) {
			if (errno == EINTR) {
				/* automatically retry */
				return o_stream_file_writev_full(fstream, iov, iov_count);
			}
		} else if (errno == EAGAIN || errno == EINTR) {
			/* try again later */
			return 0;
		}
		/* any other error is fatal for the stream */
		fstream->ostream.ostream.stream_errno = errno;
		stream_closed(fstream);
		return -1;
	}
	if (unlikely(ret == 0 && fstream->file)) {
		/* assume out of disk space */
		fstream->ostream.ostream.stream_errno = ENOSPC;
		stream_closed(fstream);
		return -1;
	}
	fstream->buffer_offset += ret;
	if (partial && fstream->file) {
		/* we failed to write everything to a file. either we ran out
		   of disk space or we're writing to NFS. try to write the
		   rest to resolve this. */
		size = ret;
		/* skip over the iovecs that were written completely; after
		   the loop `size` is the number of bytes already written
		   from the first remaining iovec */
		while (iov_count > 0 && size >= iov->iov_len) {
			size -= iov->iov_len;
			iov++;
			iov_count--;
		}
		i_assert(iov_count > 0);
		if (size == 0)
			ret2 = o_stream_file_writev_full(fstream, iov, iov_count);
		else {
			/* write the first iov separately */
			struct const_iovec new_iov;

			new_iov.iov_base =
				CONST_PTR_OFFSET(iov->iov_base, size);
			new_iov.iov_len = iov->iov_len - size;
			ret2 = o_stream_file_writev_full(fstream, &new_iov, 1);
			if (ret2 > 0) {
				i_assert((size_t)ret2 == new_iov.iov_len);
				/* write the rest */
				if (iov_count > 1) {
					/* fold the first iov's remainder
					   into ret before ret2 is reused
					   for the remaining iovecs */
					ret += ret2;
					ret2 = o_stream_file_writev_full(fstream, iov + 1,
									 iov_count - 1);
				}
			}
		}
		/* for files the recursive call either fails or writes
		   something */
		i_assert(ret2 != 0);
		if (ret2 < 0)
			ret = ret2;
		else
			ret += ret2;
	}
	i_assert(ret < 0 || !fstream->file ||
		 (size_t)ret == total_size);
	return ret;
}
296
297 /* returns how much of vector was used */
o_stream_fill_iovec(struct file_ostream * fstream,struct const_iovec iov[2])298 static int o_stream_fill_iovec(struct file_ostream *fstream,
299 struct const_iovec iov[2])
300 {
301 if (IS_STREAM_EMPTY(fstream))
302 return 0;
303
304 if (fstream->head < fstream->tail) {
305 iov[0].iov_base = fstream->buffer + fstream->head;
306 iov[0].iov_len = fstream->tail - fstream->head;
307 return 1;
308 } else {
309 iov[0].iov_base = fstream->buffer + fstream->head;
310 iov[0].iov_len = fstream->buffer_size - fstream->head;
311 if (fstream->tail == 0)
312 return 1;
313 else {
314 iov[1].iov_base = fstream->buffer;
315 iov[1].iov_len = fstream->tail;
316 return 2;
317 }
318 }
319 }
320
buffer_flush(struct file_ostream * fstream)321 static int buffer_flush(struct file_ostream *fstream)
322 {
323 struct const_iovec iov[2];
324 int iov_len;
325 ssize_t ret;
326
327 iov_len = o_stream_fill_iovec(fstream, iov);
328 if (iov_len > 0) {
329 ret = o_stream_file_writev_full(fstream, iov, iov_len);
330 if (ret < 0)
331 return -1;
332
333 update_buffer(fstream, ret);
334 }
335
336 return IS_STREAM_EMPTY(fstream) ? 1 : 0;
337 }
338
o_stream_tcp_flush_via_nodelay(struct file_ostream * fstream)339 static void o_stream_tcp_flush_via_nodelay(struct file_ostream *fstream)
340 {
341 if (net_set_tcp_nodelay(fstream->fd, TRUE) < 0) {
342 /* Don't bother logging errors. There are quite a lot of
343 different errors that need to be ignored, and it differs
344 between OSes. At least:
345 Linux: ENOTSUP, ENOTSOCK, ENOPROTOOPT
346 FreeBSD: EINVAL, ECONNRESET */
347 fstream->no_socket_nodelay = TRUE;
348 } else if (net_set_tcp_nodelay(fstream->fd, FALSE) < 0) {
349 /* We already successfully enabled TCP_NODELAY, so there
350 shouldn't really be errors. Except ECONNRESET can possibly
351 still happen between these two calls, so again don't log
352 errors. */
353 fstream->no_socket_nodelay = TRUE;
354 }
355 }
356
/* ostream cork handler. Corking removes the write watcher. Uncorking
   flushes the buffer, re-adds the write watcher if output is still
   pending, removes the socket-level cork and applies the
   TCP_NODELAY/TCP_QUICKACK tweaks. Does nothing if the cork state doesn't
   change or the stream is closed. */
static void o_stream_file_cork(struct ostream_private *stream, bool set)
{
	struct file_ostream *fstream =
		container_of(stream, struct file_ostream, ostream);
	int flush_ret;

	if (stream->corked == set || stream->ostream.closed)
		return;

	if (!set) {
		/* buffer flushing might close the stream */
		flush_ret = buffer_flush(fstream);
		stream->last_errors_not_checked = TRUE;
		if (fstream->io == NULL &&
		    (flush_ret == 0 || fstream->flush_pending) &&
		    !stream->ostream.closed) {
			fstream->io = io_add_to(
				io_stream_get_ioloop(&fstream->ostream.iostream),
				fstream->fd, IO_WRITE,
				stream_send_io, fstream);
		}
	} else if (fstream->io != NULL) {
		io_remove(&fstream->io);
	}

	if (stream->ostream.closed) {
		/* flushing may have closed the stream already */
		return;
	}

	if (fstream->socket_cork_set) {
		i_assert(!set);
		if (net_set_cork(fstream->fd, FALSE) < 0)
			fstream->no_socket_cork = TRUE;
		fstream->socket_cork_set = FALSE;
	}
	if (!set) {
		if (!fstream->no_socket_nodelay) {
			/* Uncorking - send all the pending data
			   immediately. Remove nodelay immediately
			   afterwards, so if any output is sent outside
			   corking it may get delayed. */
			o_stream_tcp_flush_via_nodelay(fstream);
		}
		if (!fstream->no_socket_quickack) {
			/* Uncorking - disable delayed ACKs to reduce
			   latency. Note that this needs to be set
			   repeatedly. */
			if (net_set_tcp_quickack(fstream->fd, TRUE) < 0)
				fstream->no_socket_quickack = TRUE;
		}
	}
	stream->corked = set;
}
406
o_stream_file_flush(struct ostream_private * stream)407 static int o_stream_file_flush(struct ostream_private *stream)
408 {
409 struct file_ostream *fstream =
410 container_of(stream, struct file_ostream, ostream);
411
412 return buffer_flush(fstream);
413 }
414
415 static void
o_stream_file_flush_pending(struct ostream_private * stream,bool set)416 o_stream_file_flush_pending(struct ostream_private *stream, bool set)
417 {
418 struct file_ostream *fstream =
419 container_of(stream, struct file_ostream, ostream);
420 struct iostream_private *iostream = &fstream->ostream.iostream;
421
422 fstream->flush_pending = set;
423 if (set && !stream->corked && fstream->io == NULL) {
424 fstream->io = io_add_to(io_stream_get_ioloop(iostream),
425 fstream->fd, IO_WRITE,
426 stream_send_io, fstream);
427 }
428 }
429
get_unused_space(const struct file_ostream * fstream)430 static size_t get_unused_space(const struct file_ostream *fstream)
431 {
432 if (fstream->head > fstream->tail) {
433 /* XXXT...HXXX */
434 return fstream->head - fstream->tail;
435 } else if (fstream->head < fstream->tail) {
436 /* ...HXXXT... */
437 return (fstream->buffer_size - fstream->tail) + fstream->head;
438 } else {
439 /* either fully unused or fully used */
440 return fstream->full ? 0 : fstream->buffer_size;
441 }
442 }
443
444 static size_t
o_stream_file_get_buffer_used_size(const struct ostream_private * stream)445 o_stream_file_get_buffer_used_size(const struct ostream_private *stream)
446 {
447 const struct file_ostream *fstream =
448 container_of(stream, const struct file_ostream, ostream);
449
450 return fstream->buffer_size - get_unused_space(fstream);
451 }
452
/* ostream seek handler. Flushes the buffered data and then moves both the
   stream offset and the buffer offset to `offset`. Returns 1 on success,
   -1 on failure: EINVAL for an offset above OFF_T_MAX, ESPIPE when the
   stream isn't a file, or a flush error. */
static int o_stream_file_seek(struct ostream_private *stream, uoff_t offset)
{
	struct file_ostream *file_stream =
		container_of(stream, struct file_ostream, ostream);
	int seek_errno = 0;

	/* the offset check takes precedence over the file check */
	if (offset > OFF_T_MAX)
		seek_errno = EINVAL;
	else if (!file_stream->file)
		seek_errno = ESPIPE;

	if (seek_errno != 0) {
		stream->ostream.stream_errno = seek_errno;
		return -1;
	}

	if (buffer_flush(file_stream) < 0)
		return -1;

	file_stream->buffer_offset = offset;
	stream->ostream.offset = offset;
	return 1;
}
474
o_stream_grow_buffer(struct file_ostream * fstream,size_t bytes)475 static void o_stream_grow_buffer(struct file_ostream *fstream, size_t bytes)
476 {
477 size_t size, new_size, end_size;
478
479 size = nearest_power(fstream->buffer_size + bytes);
480 if (size > fstream->ostream.max_buffer_size) {
481 /* limit the size */
482 size = fstream->ostream.max_buffer_size;
483 } else if (fstream->ostream.corked) {
484 /* try to use optimal buffer size with corking */
485 new_size = I_MIN(fstream->optimal_block_size,
486 fstream->ostream.max_buffer_size);
487 if (new_size > size)
488 size = new_size;
489 }
490
491 if (size <= fstream->buffer_size)
492 return;
493
494 fstream->buffer = i_realloc(fstream->buffer,
495 fstream->buffer_size, size);
496
497 if (fstream->tail <= fstream->head && !IS_STREAM_EMPTY(fstream)) {
498 /* move head forward to end of buffer */
499 end_size = fstream->buffer_size - fstream->head;
500 memmove(fstream->buffer + size - end_size,
501 fstream->buffer + fstream->head, end_size);
502 fstream->head = size - end_size;
503 }
504
505 fstream->full = FALSE;
506 fstream->buffer_size = size;
507 }
508
/* IO_WRITE callback for the stream's fd. Runs the ostream's flush
   callback if one is set (otherwise flushes the buffer directly), corking
   the stream around the call unless it was already corked. Afterwards the
   write watcher is removed when nothing is pending and the buffer is
   empty, or (re)added when the stream is still open and more output is
   expected. */
static void stream_send_io(struct file_ostream *fstream)
{
	struct ostream *ostream = &fstream->ostream.ostream;
	struct iostream_private *iostream = &fstream->ostream.iostream;
	/* cork here only if the stream isn't corked already */
	bool use_cork = !fstream->ostream.corked;
	int ret;

	/* Set flush_pending = FALSE first before calling the flush callback,
	   and change it to TRUE only if callback returns 0. That way the
	   callback can call o_stream_set_flush_pending() again and we don't
	   forget it even if flush callback returns 1. */
	fstream->flush_pending = FALSE;

	/* hold a reference until the end of this function; presumably this
	   keeps the stream alive even if the callback unreferences it */
	o_stream_ref(ostream);
	if (use_cork)
		o_stream_cork(ostream);
	if (fstream->ostream.callback != NULL)
		ret = fstream->ostream.callback(fstream->ostream.context);
	else
		ret = o_stream_file_flush(&fstream->ostream);
	if (use_cork)
		o_stream_uncork(ostream);

	if (ret == 0)
		fstream->flush_pending = TRUE;

	if (!fstream->flush_pending && IS_STREAM_EMPTY(fstream)) {
		/* nothing left to send */
		io_remove(&fstream->io);
	} else if (!fstream->ostream.ostream.closed) {
		/* Add the IO handler if it's not there already. Callback
		   might have just returned 0 without there being any data
		   to be sent. */
		if (fstream->io == NULL) {
			fstream->io = io_add_to(io_stream_get_ioloop(iostream),
						fstream->fd, IO_WRITE,
						stream_send_io, fstream);
		}
	}

	o_stream_unref(&ostream);
}
550
o_stream_add(struct file_ostream * fstream,const void * data,size_t size)551 static size_t o_stream_add(struct file_ostream *fstream,
552 const void *data, size_t size)
553 {
554 struct iostream_private *iostream = &fstream->ostream.iostream;
555 size_t unused, sent;
556 int i;
557
558 unused = get_unused_space(fstream);
559 if (unused < size)
560 o_stream_grow_buffer(fstream, size-unused);
561
562 sent = 0;
563 for (i = 0; i < 2 && sent < size && !fstream->full; i++) {
564 unused = fstream->tail >= fstream->head ?
565 fstream->buffer_size - fstream->tail :
566 fstream->head - fstream->tail;
567
568 if (unused > size-sent)
569 unused = size-sent;
570 memcpy(fstream->buffer + fstream->tail,
571 CONST_PTR_OFFSET(data, sent), unused);
572 sent += unused;
573
574 fstream->tail += unused;
575 if (fstream->tail == fstream->buffer_size)
576 fstream->tail = 0;
577
578 if (fstream->head == fstream->tail &&
579 fstream->buffer_size > 0)
580 fstream->full = TRUE;
581 }
582
583 if (sent != 0 && fstream->io == NULL &&
584 !fstream->ostream.corked && !fstream->file) {
585 fstream->io = io_add_to(io_stream_get_ioloop(iostream),
586 fstream->fd, IO_WRITE, stream_send_io,
587 fstream);
588 }
589
590 return sent;
591 }
592
/* ostream sendv handler. Sends the iovecs, writing directly to the fd when
   the buffer is empty and the stream is either uncorked or the data is at
   least the optimal block size; whatever wasn't written is buffered as
   far as it fits.

   Returns the number of bytes accepted (written + buffered), or -1 on
   error. The stream offset is advanced by the returned amount. For file
   streams everything is always accepted (asserted at the end). */
ssize_t o_stream_file_sendv(struct ostream_private *stream,
			    const struct const_iovec *iov,
			    unsigned int iov_count)
{
	struct file_ostream *fstream =
		container_of(stream, struct file_ostream, ostream);
	size_t size, total_size, added, optimal_size;
	unsigned int i;
	ssize_t ret = 0;

	for (i = 0, size = 0; i < iov_count; i++)
		size += iov[i].iov_len;
	total_size = size;

	/* the new data doesn't fit into the buffer: try to make room by
	   flushing what is already buffered */
	if (size > get_unused_space(fstream) && !IS_STREAM_EMPTY(fstream)) {
		if (o_stream_file_flush(stream) < 0)
			return -1;
	}

	optimal_size = I_MIN(fstream->optimal_block_size,
			     fstream->ostream.max_buffer_size);
	if (IS_STREAM_EMPTY(fstream) &&
	    (!stream->corked || size >= optimal_size)) {
		/* send immediately */
		ret = o_stream_file_writev_full(fstream, iov, iov_count);
		if (ret < 0)
			return -1;

		/* skip the iovecs that were fully written; afterwards
		   `size` is the number of bytes already written from
		   iov[0] */
		size = ret;
		while (size > 0 && iov_count > 0 && size >= iov[0].iov_len) {
			size -= iov[0].iov_len;
			iov++;
			iov_count--;
		}

		if (iov_count == 0)
			i_assert(size == 0);
		else {
			/* buffer the unwritten remainder of the first
			   remaining iovec */
			added = o_stream_add(fstream,
					CONST_PTR_OFFSET(iov[0].iov_base, size),
					iov[0].iov_len - size);
			ret += added;

			if (added != iov[0].iov_len - size) {
				/* buffer full */
				stream->ostream.offset += ret;
				return ret;
			}

			iov++;
			iov_count--;
		}
	}

	/* buffer it, at least partly */
	for (i = 0; i < iov_count; i++) {
		added = o_stream_add(fstream, iov[i].iov_base, iov[i].iov_len);
		ret += added;
		if (added != iov[i].iov_len)
			break;
	}
	stream->ostream.offset += ret;
	i_assert((size_t)ret <= total_size);
	i_assert((size_t)ret == total_size || !fstream->file);
	return ret;
}
659
660 static size_t
o_stream_file_update_buffer(struct file_ostream * fstream,const void * data,size_t size,size_t pos)661 o_stream_file_update_buffer(struct file_ostream *fstream,
662 const void *data, size_t size, size_t pos)
663 {
664 size_t avail, copy_size;
665
666 if (fstream->head < fstream->tail) {
667 /* ...HXXXT... */
668 i_assert(pos < fstream->tail);
669 avail = fstream->tail - pos;
670 } else {
671 /* XXXT...HXXX */
672 avail = fstream->buffer_size - pos;
673 }
674 copy_size = I_MIN(size, avail);
675 memcpy(fstream->buffer + pos, data, copy_size);
676 data = CONST_PTR_OFFSET(data, copy_size);
677 size -= copy_size;
678
679 if (size > 0 && fstream->head >= fstream->tail) {
680 /* wraps to beginning of the buffer */
681 copy_size = I_MIN(size, fstream->tail);
682 memcpy(fstream->buffer, data, copy_size);
683 size -= copy_size;
684 }
685 return size;
686 }
687
/* ostream write_at handler: write data at the given file offset. This
   function itself does not modify the stream's offset field.

   If the written range overlaps data that is still in the buffer, the
   buffered copy is updated in place. Whatever falls outside the buffered
   range is written with pwrite_full() after flushing the buffer.

   Returns 0 on success, -1 on failure (on pwrite failure stream_errno is
   set and the stream is closed). */
static int
o_stream_file_write_at(struct ostream_private *stream,
		       const void *data, size_t size, uoff_t offset)
{
	struct file_ostream *fstream =
		container_of(stream, struct file_ostream, ostream);
	size_t used, pos, skip, left;

	/* update buffer if the write overlaps it */
	used = file_buffer_get_used_size(fstream);
	if (used > 0 &&
	    fstream->buffer_offset < offset + size &&
	    fstream->buffer_offset + used > offset) {
		/* skip = number of bytes at the start of data that lie
		   before the buffered range */
		if (fstream->buffer_offset <= offset) {
			/* updating from the beginning */
			skip = 0;
		} else {
			skip = fstream->buffer_offset - offset;
		}
		/* buffer index corresponding to file offset
		   (offset + skip) */
		pos = (fstream->head + offset + skip - fstream->buffer_offset) %
			fstream->buffer_size;
		/* left = number of trailing bytes that extend past the
		   buffered data */
		left = o_stream_file_update_buffer(fstream,
				CONST_PTR_OFFSET(data, skip), size - skip, pos);
		if (left > 0) {
			/* didn't write all of it */
			if (skip > 0) {
				/* we also have to write a prefix. don't
				   bother with two syscalls, just write all
				   of it in one pwrite(). */
			} else {
				/* write only the suffix */
				size_t update_count = size - left;

				data = CONST_PTR_OFFSET(data, update_count);
				size -= update_count;
				offset += update_count;
			}
		} else if (skip == 0) {
			/* everything done */
			return 0;
		} else {
			/* still have to write prefix */
			size = skip;
		}
	}

	/* we couldn't write everything to the buffer. flush the buffer
	   and pwrite() the rest. */
	if (o_stream_file_flush(stream) < 0)
		return -1;

	if (pwrite_full(fstream->fd, data, size, offset) < 0) {
		stream->ostream.stream_errno = errno;
		stream_closed(fstream);
		return -1;
	}
	return 0;
}
746
/* Try to copy the rest of instream to outstream with safe_sendfile().

   Returns TRUE when *res_r has been set to the final result. Returns
   FALSE when sendfile can't be used, i.e. the input size is unknown or
   safe_sendfile() failed with EINVAL; *res_r is not set in that case.

   The buffered output is flushed first; if it can't be fully flushed the
   result is WAIT_OUTPUT. On progress the input stream position and the
   output offsets are advanced by the number of bytes sent. */
static bool
io_stream_sendfile(struct ostream_private *outstream,
		   struct istream *instream, int in_fd,
		   enum ostream_send_istream_result *res_r)
{
	struct file_ostream *foutstream =
		container_of(outstream, struct file_ostream, ostream);
	uoff_t in_size, offset, send_size, v_offset, abs_start_offset;
	ssize_t ret;
	bool sendfile_not_supported = FALSE;

	if ((ret = i_stream_get_size(instream, TRUE, &in_size)) < 0) {
		*res_r = OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT;
		return TRUE;
	}
	if (ret == 0) {
		/* size unknown. we can't use sendfile(). */
		return FALSE;
	}

	o_stream_socket_cork(foutstream);

	/* flush out any data in buffer */
	if ((ret = buffer_flush(foutstream)) < 0) {
		*res_r = OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT;
		return TRUE;
	} else if (ret == 0) {
		*res_r = OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT;
		return TRUE;
	}

	/* make sure the fd's offset is where the output should continue */
	if (o_stream_lseek(foutstream) < 0) {
		*res_r = OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT;
		return TRUE;
	}

	v_offset = instream->v_offset;
	/* absolute offset that corresponds to v_offset 0 */
	abs_start_offset = i_stream_get_absolute_offset(instream) - v_offset;
	while (v_offset < in_size) {
		offset = abs_start_offset + v_offset;
		send_size = in_size - v_offset;

		ret = safe_sendfile(foutstream->fd, in_fd, &offset,
				    MAX_SSIZE_T(send_size));
		if (ret <= 0) {
			if (ret == 0) {
				/* Unexpectedly early EOF at input */
				i_stream_seek(instream, v_offset);
				instream->eof = TRUE;
				*res_r = OSTREAM_SEND_ISTREAM_RESULT_FINISHED;
				return TRUE;
			}
			if (foutstream->file) {
				if (errno == EINTR) {
					/* automatically retry */
					continue;
				}
			} else {
				if (errno == EINTR || errno == EAGAIN) {
					/* output not writable right now;
					   reported as WAIT_OUTPUT below */
					ret = 0;
					break;
				}
			}
			if (errno == EINVAL)
				sendfile_not_supported = TRUE;
			else {
				io_stream_set_error(&outstream->iostream,
						    "sendfile() failed: %m");
				outstream->ostream.stream_errno = errno;
				/* close only if error wasn't because
				   sendfile() isn't supported */
				stream_closed(foutstream);
			}
			break;
		}

		v_offset += ret;
		foutstream->real_offset += ret;
		foutstream->buffer_offset += ret;
		outstream->ostream.offset += ret;
	}

	/* leave the input positioned after the data that was sent */
	i_stream_seek(instream, v_offset);
	if (v_offset == in_size) {
		instream->eof = TRUE;
		*res_r = OSTREAM_SEND_ISTREAM_RESULT_FINISHED;
		return TRUE;
	}
	i_assert(ret <= 0);
	if (sendfile_not_supported)
		return FALSE;
	if (ret < 0)
		*res_r = OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT;
	else
		*res_r = OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT;
	return TRUE;
}
844
845 static enum ostream_send_istream_result
io_stream_copy_backwards(struct ostream_private * outstream,struct istream * instream,uoff_t in_size)846 io_stream_copy_backwards(struct ostream_private *outstream,
847 struct istream *instream, uoff_t in_size)
848 {
849 struct file_ostream *foutstream =
850 container_of(outstream, struct file_ostream, ostream);
851 uoff_t in_start_offset, in_offset, in_limit, out_offset;
852 const unsigned char *data;
853 size_t buffer_size, size, read_size;
854 ssize_t ret;
855
856 i_assert(IS_STREAM_EMPTY(foutstream));
857
858 /* figure out optimal buffer size */
859 buffer_size = instream->real_stream->buffer_size;
860 if (buffer_size == 0 || buffer_size > foutstream->buffer_size) {
861 if (foutstream->optimal_block_size > foutstream->buffer_size) {
862 o_stream_grow_buffer(foutstream,
863 foutstream->optimal_block_size -
864 foutstream->buffer_size);
865 }
866
867 buffer_size = foutstream->buffer_size;
868 }
869
870 in_start_offset = instream->v_offset;
871 in_offset = in_limit = in_size;
872 out_offset = outstream->ostream.offset + (in_offset - in_start_offset);
873
874 while (in_offset > in_start_offset) {
875 if (in_offset - in_start_offset <= buffer_size)
876 read_size = in_offset - in_start_offset;
877 else
878 read_size = buffer_size;
879 in_offset -= read_size;
880 out_offset -= read_size;
881
882 for (;;) {
883 i_assert(in_offset <= in_limit);
884
885 i_stream_seek(instream, in_offset);
886 read_size = in_limit - in_offset;
887
888 /* FIXME: something's wrong here */
889 if (i_stream_read_bytes(instream, &data, &size,
890 read_size) == 0)
891 i_unreached();
892 if (size >= read_size) {
893 size = read_size;
894 if (instream->mmaped) {
895 /* we'll have to write it through
896 buffer or the file gets corrupted */
897 i_assert(size <=
898 foutstream->buffer_size);
899 memcpy(foutstream->buffer, data, size);
900 data = foutstream->buffer;
901 }
902 break;
903 }
904
905 /* buffer too large probably, try with smaller */
906 read_size -= size;
907 in_offset += read_size;
908 out_offset += read_size;
909 buffer_size -= read_size;
910 }
911 in_limit -= size;
912
913 ret = pwrite_full(foutstream->fd, data, size, out_offset);
914 if (ret < 0) {
915 /* error */
916 outstream->ostream.stream_errno = errno;
917 return OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT;
918 }
919 i_stream_skip(instream, size);
920 }
921 /* make it visible that we're at instream's EOF */
922 i_stream_seek(instream, in_size);
923 instream->eof = TRUE;
924
925 outstream->ostream.offset += in_size - in_start_offset;
926 return OSTREAM_SEND_ISTREAM_RESULT_FINISHED;
927 }
928
929 static enum ostream_send_istream_result
io_stream_copy_same_stream(struct ostream_private * outstream,struct istream * instream)930 io_stream_copy_same_stream(struct ostream_private *outstream,
931 struct istream *instream)
932 {
933 uoff_t in_size;
934 off_t in_abs_offset, ret = 0;
935
936 /* copying data within same fd. we'll have to be careful with
937 seeks and overlapping writes. */
938 if ((ret = i_stream_get_size(instream, TRUE, &in_size)) < 0)
939 return OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT;
940 if (ret == 0) {
941 /* if we couldn't find out the size, it means that instream
942 isn't a regular file_istream. we can be reasonably sure that
943 we can copy it safely the regular way. (there's really no
944 other possibility, other than failing completely.) */
945 return io_stream_copy(&outstream->ostream, instream);
946 }
947 i_assert(instream->v_offset <= in_size);
948
949 in_abs_offset = i_stream_get_absolute_offset(instream);
950 ret = (off_t)outstream->ostream.offset - in_abs_offset;
951 if (ret == 0) {
952 /* copying data over itself. we don't really
953 need to do that, just fake it. */
954 return OSTREAM_SEND_ISTREAM_RESULT_FINISHED;
955 }
956 if (ret > 0 && in_size > (uoff_t)ret) {
957 /* overlapping */
958 i_assert(instream->seekable);
959 return io_stream_copy_backwards(outstream, instream, in_size);
960 } else {
961 /* non-overlapping */
962 return io_stream_copy(&outstream->ostream, instream);
963 }
964 }
965
966 static enum ostream_send_istream_result
o_stream_file_send_istream(struct ostream_private * outstream,struct istream * instream)967 o_stream_file_send_istream(struct ostream_private *outstream,
968 struct istream *instream)
969 {
970 struct file_ostream *foutstream =
971 container_of(outstream, struct file_ostream, ostream);
972 bool same_stream;
973 int in_fd;
974 enum ostream_send_istream_result res;
975
976 in_fd = !instream->readable_fd ? -1 : i_stream_get_fd(instream);
977 if (!foutstream->no_sendfile && in_fd != -1 &&
978 in_fd != foutstream->fd && instream->seekable) {
979 if (io_stream_sendfile(outstream, instream, in_fd, &res))
980 return res;
981
982 /* sendfile() not supported (with this fd), fallback to
983 regular sending. */
984 foutstream->no_sendfile = TRUE;
985 }
986
987 same_stream = i_stream_get_fd(instream) == foutstream->fd &&
988 foutstream->fd != -1;
989 if (!same_stream)
990 return io_stream_copy(&outstream->ostream, instream);
991 return io_stream_copy_same_stream(outstream, instream);
992 }
993
o_stream_file_switch_ioloop_to(struct ostream_private * stream,struct ioloop * ioloop)994 static void o_stream_file_switch_ioloop_to(struct ostream_private *stream,
995 struct ioloop *ioloop)
996 {
997 struct file_ostream *fstream =
998 container_of(stream, struct file_ostream, ostream);
999
1000 if (fstream->io != NULL)
1001 fstream->io = io_loop_move_io_to(ioloop, &fstream->io);
1002 }
1003
1004 struct ostream *
o_stream_create_file_common(struct file_ostream * fstream,int fd,size_t max_buffer_size,bool autoclose_fd)1005 o_stream_create_file_common(struct file_ostream *fstream,
1006 int fd, size_t max_buffer_size, bool autoclose_fd)
1007 {
1008 struct ostream *ostream;
1009
1010 fstream->fd = fd;
1011 fstream->autoclose_fd = autoclose_fd;
1012 fstream->optimal_block_size = DEFAULT_OPTIMAL_BLOCK_SIZE;
1013
1014 fstream->ostream.iostream.close = o_stream_file_close;
1015 fstream->ostream.iostream.destroy = o_stream_file_destroy;
1016
1017 fstream->ostream.cork = o_stream_file_cork;
1018 fstream->ostream.flush = o_stream_file_flush;
1019 fstream->ostream.flush_pending = o_stream_file_flush_pending;
1020 fstream->ostream.get_buffer_used_size =
1021 o_stream_file_get_buffer_used_size;
1022 fstream->ostream.seek = o_stream_file_seek;
1023 fstream->ostream.sendv = o_stream_file_sendv;
1024 fstream->ostream.write_at = o_stream_file_write_at;
1025 fstream->ostream.send_istream = o_stream_file_send_istream;
1026 fstream->ostream.switch_ioloop_to = o_stream_file_switch_ioloop_to;
1027
1028 fstream->writev = o_stream_file_writev;
1029
1030 fstream->ostream.max_buffer_size = max_buffer_size;
1031 ostream = o_stream_create(&fstream->ostream, NULL, fd);
1032
1033 if (max_buffer_size == 0)
1034 fstream->ostream.max_buffer_size = fstream->optimal_block_size;
1035
1036 return ostream;
1037 }
1038
fstream_init_file(struct file_ostream * fstream)1039 static void fstream_init_file(struct file_ostream *fstream)
1040 {
1041 struct stat st;
1042
1043 fstream->no_sendfile = TRUE;
1044 if (fstat(fstream->fd, &st) < 0)
1045 return;
1046
1047 if ((uoff_t)st.st_blksize > fstream->optimal_block_size) {
1048 /* use the optimal block size, but with a reasonable limit */
1049 fstream->optimal_block_size =
1050 I_MIN(st.st_blksize, MAX_OPTIMAL_BLOCK_SIZE);
1051 }
1052
1053 if (S_ISREG(st.st_mode)) {
1054 fstream->no_socket_cork = TRUE;
1055 fstream->no_socket_nodelay = TRUE;
1056 fstream->no_socket_quickack = TRUE;
1057 fstream->file = TRUE;
1058 }
1059 }
1060
1061 static
o_stream_create_fd_common(int fd,size_t max_buffer_size,bool autoclose_fd)1062 struct ostream * o_stream_create_fd_common(int fd, size_t max_buffer_size,
1063 bool autoclose_fd)
1064 {
1065 struct file_ostream *fstream;
1066 struct ostream *ostream;
1067 off_t offset;
1068
1069 fstream = i_new(struct file_ostream, 1);
1070 ostream = o_stream_create_file_common
1071 (fstream, fd, max_buffer_size, autoclose_fd);
1072
1073 offset = lseek(fd, 0, SEEK_CUR);
1074 if (offset >= 0) {
1075 ostream->offset = offset;
1076 fstream->real_offset = offset;
1077 fstream->buffer_offset = offset;
1078 fstream_init_file(fstream);
1079 } else {
1080 struct ip_addr local_ip;
1081
1082 if (net_getsockname(fd, &local_ip, NULL) < 0) {
1083 /* not a socket */
1084 fstream->no_sendfile = TRUE;
1085 fstream->no_socket_cork = TRUE;
1086 fstream->no_socket_nodelay = TRUE;
1087 fstream->no_socket_quickack = TRUE;
1088 } else if (local_ip.family == 0) {
1089 /* UNIX domain socket */
1090 fstream->no_socket_cork = TRUE;
1091 fstream->no_socket_nodelay = TRUE;
1092 fstream->no_socket_quickack = TRUE;
1093 }
1094 }
1095
1096 return ostream;
1097 }
1098
1099 struct ostream *
o_stream_create_fd(int fd,size_t max_buffer_size)1100 o_stream_create_fd(int fd, size_t max_buffer_size)
1101 {
1102 return o_stream_create_fd_common(fd, max_buffer_size, FALSE);
1103 }
1104
1105 struct ostream *
o_stream_create_fd_autoclose(int * fd,size_t max_buffer_size)1106 o_stream_create_fd_autoclose(int *fd, size_t max_buffer_size)
1107 {
1108 struct ostream *ostream = o_stream_create_fd_common(*fd,
1109 max_buffer_size, TRUE);
1110 *fd = -1;
1111 return ostream;
1112 }
1113
1114 struct ostream *
o_stream_create_fd_file(int fd,uoff_t offset,bool autoclose_fd)1115 o_stream_create_fd_file(int fd, uoff_t offset, bool autoclose_fd)
1116 {
1117 struct file_ostream *fstream;
1118 struct ostream *ostream;
1119
1120 if (offset == UOFF_T_MAX)
1121 offset = lseek(fd, 0, SEEK_CUR);
1122
1123 fstream = i_new(struct file_ostream, 1);
1124 ostream = o_stream_create_file_common(fstream, fd, 0, autoclose_fd);
1125 fstream_init_file(fstream);
1126 fstream->real_offset = offset;
1127 fstream->buffer_offset = offset;
1128 ostream->blocking = fstream->file;
1129 ostream->offset = offset;
1130 return ostream;
1131 }
1132
/* Same as o_stream_create_fd_file() with autoclose_fd set to TRUE.
   *fd is set to -1 before returning. */
struct ostream *o_stream_create_fd_file_autoclose(int *fd, uoff_t offset)
{
	struct ostream *ostream = o_stream_create_fd_file(*fd, offset, TRUE);

	/* the caller's copy of the fd is invalidated */
	*fd = -1;
	return ostream;
}
1141
/* Open path for writing and create an autoclosing file ostream for it.

   The file is opened with O_WRONLY|O_CREAT using the given mode. With
   OSTREAM_CREATE_FILE_FLAG_APPEND in flags O_APPEND is added, otherwise
   O_TRUNC. If open() fails, the result of o_stream_create_error(errno) is
   returned. Otherwise the fd and offset are passed on to
   o_stream_create_fd_file_autoclose(). */
struct ostream *o_stream_create_file(const char *path, uoff_t offset, mode_t mode,
				     enum ostream_create_file_flags flags)
{
	int open_flags = O_WRONLY|O_CREAT;
	int fd;

	open_flags |= HAS_ANY_BITS(flags, OSTREAM_CREATE_FILE_FLAG_APPEND) ?
		O_APPEND : O_TRUNC;

	fd = open(path, open_flags, mode);
	if (fd < 0)
		return o_stream_create_error(errno);
	return o_stream_create_fd_file_autoclose(&fd, offset);
}
1155