1 /* Copyright (c) 2002-2018 Dovecot authors, see the included COPYING file */
2 
3 /* @UNSAFE: whole file */
4 
5 #include "lib.h"
6 #include "ioloop.h"
7 #include "write-full.h"
8 #include "net.h"
9 #include "sendfile-util.h"
10 #include "istream.h"
11 #include "istream-private.h"
12 #include "ostream-file-private.h"
13 
14 #include <unistd.h>
15 #include <sys/stat.h>
16 #ifdef HAVE_SYS_UIO_H
17 #  include <sys/uio.h>
18 #endif
19 #include <fcntl.h>
20 
/* try to keep the buffer size within 4k..128k. ReiserFS may actually return
   128k as optimal size. */
/* Initial value of fstream->optimal_block_size
   (set in o_stream_create_file_common()). */
#define DEFAULT_OPTIMAL_BLOCK_SIZE IO_BLOCK_SIZE
/* Upper limit for the optimal block size. NOTE(review): not referenced in
   this part of the file; presumably used to clamp an fd-derived block size
   elsewhere in the file - confirm. */
#define MAX_OPTIMAL_BLOCK_SIZE (128*1024)

/* TRUE if the ring buffer holds no data. head == tail alone is ambiguous
   (empty or completely full), so the "full" flag disambiguates. */
#define IS_STREAM_EMPTY(fstream) \
	((fstream)->head == (fstream)->tail && !(fstream)->full)

/* Clamp an unsigned size to SSIZE_T_MAX; used to limit the size requested
   from sendfile(). Note: evaluates "size" more than once. */
#define MAX_SSIZE_T(size) \
	((size) < SSIZE_T_MAX ? (size_t)(size) : SSIZE_T_MAX)

/* Forward declarations for functions defined further down. */
static void stream_send_io(struct file_ostream *fstream);
static struct ostream * o_stream_create_fd_common(int fd,
		size_t max_buffer_size, bool autoclose_fd);
35 
stream_closed(struct file_ostream * fstream)36 static void stream_closed(struct file_ostream *fstream)
37 {
38 	io_remove(&fstream->io);
39 
40 	if (fstream->autoclose_fd && fstream->fd != -1) {
41 		/* Ignore ECONNRESET because we don't really care about it here,
42 		   as we are closing the socket down in any case. There might be
43 		   unsent data but nothing we can do about that. */
44 		if (unlikely(close(fstream->fd) < 0 && errno != ECONNRESET)) {
45 			i_error("file_ostream.close(%s) failed: %m",
46 				o_stream_get_name(&fstream->ostream.ostream));
47 		}
48 	}
49 	fstream->fd = -1;
50 
51 	fstream->ostream.ostream.closed = TRUE;
52 }
53 
/* iostream close() implementation. Closes the fd if this stream owns it and
   marks the stream closed. close_parent is not used here. */
void o_stream_file_close(struct iostream_private *stream,
			 bool close_parent ATTR_UNUSED)
{
	stream_closed(container_of(stream, struct file_ostream,
				   ostream.iostream));
}
62 
o_stream_file_destroy(struct iostream_private * stream)63 static void o_stream_file_destroy(struct iostream_private *stream)
64 {
65 	struct file_ostream *fstream =
66 		container_of(stream, struct file_ostream, ostream.iostream);
67 
68 	i_free(fstream->buffer);
69 }
70 
file_buffer_get_used_size(struct file_ostream * fstream)71 static size_t file_buffer_get_used_size(struct file_ostream *fstream)
72 {
73 	if (fstream->head == fstream->tail)
74 		return fstream->full ? fstream->buffer_size : 0;
75 	else if (fstream->head < fstream->tail) {
76 		/* ...HXXXT... */
77 		return fstream->tail - fstream->head;
78 	} else {
79 		/* XXXT...HXXX */
80 		return fstream->tail +
81 			(fstream->buffer_size - fstream->head);
82 	}
83 }
84 
/* Drop "size" bytes from the front (head) of the ring buffer after they
   have been written to the fd. Does nothing if the buffer is empty or size
   is 0. Asserts that no more than the buffered amount is consumed. If the
   buffer becomes empty, head and tail are both reset to 0. */
static void update_buffer(struct file_ostream *fstream, size_t size)
{
	size_t used;

	if (IS_STREAM_EMPTY(fstream) || size == 0)
		return;

	if (fstream->head < fstream->tail) {
		/* ...HXXXT... */
		used = fstream->tail - fstream->head;
		i_assert(size <= used);
		fstream->head += size;
	} else {
		/* XXXT...HXXX (also covers the full head == tail case) */
		used = fstream->buffer_size - fstream->head;
		if (size > used) {
			/* consumed past the end of the buffer: the rest
			   comes from the wrapped part at the beginning */
			size -= used;
			i_assert(size <= fstream->tail);
			fstream->head = size;
		} else {
			fstream->head += size;
		}

		/* size > 0 bytes were consumed, so it can't be full */
		fstream->full = FALSE;
	}

	/* empty: rewind both indexes to the start */
	if (fstream->head == fstream->tail)
		fstream->head = fstream->tail = 0;

	/* head consumed exactly up to the end: wrap it */
	if (fstream->head == fstream->buffer_size)
		fstream->head = 0;
}
117 
o_stream_socket_cork(struct file_ostream * fstream)118 static void o_stream_socket_cork(struct file_ostream *fstream)
119 {
120 	if (fstream->ostream.corked && !fstream->socket_cork_set) {
121 		if (!fstream->no_socket_cork) {
122 			if (net_set_cork(fstream->fd, TRUE) < 0)
123 				fstream->no_socket_cork = TRUE;
124 			else
125 				fstream->socket_cork_set = TRUE;
126 		}
127 	}
128 }
129 
o_stream_lseek(struct file_ostream * fstream)130 static int o_stream_lseek(struct file_ostream *fstream)
131 {
132 	off_t ret;
133 
134 	if (fstream->real_offset == fstream->buffer_offset)
135 		return 0;
136 
137 	ret = lseek(fstream->fd, (off_t)fstream->buffer_offset, SEEK_SET);
138 	if (ret < 0) {
139 		io_stream_set_error(&fstream->ostream.iostream,
140 				    "lseek() failed: %m");
141 		fstream->ostream.ostream.stream_errno = errno;
142 		return -1;
143 	}
144 
145 	if (ret != (off_t)fstream->buffer_offset) {
146 		io_stream_set_error(&fstream->ostream.iostream,
147 				    "lseek() returned wrong value");
148 		fstream->ostream.ostream.stream_errno = EINVAL;
149 		return -1;
150 	}
151 	fstream->real_offset = fstream->buffer_offset;
152 	return 0;
153 }
154 
o_stream_file_writev(struct file_ostream * fstream,const struct const_iovec * iov,unsigned int iov_count)155 ssize_t o_stream_file_writev(struct file_ostream *fstream,
156 				   const struct const_iovec *iov,
157 				   unsigned int iov_count)
158 {
159 	ssize_t ret;
160 	size_t size, sent;
161 	unsigned int i;
162 
163 	if (iov_count == 1) {
164 		i_assert(iov->iov_len > 0);
165 
166 		if (!fstream->file ||
167 		    fstream->real_offset == fstream->buffer_offset) {
168 			ret = write(fstream->fd, iov->iov_base, iov->iov_len);
169 			if (ret > 0)
170 				fstream->real_offset += ret;
171 		} else {
172 			ret = pwrite(fstream->fd, iov->iov_base, iov->iov_len,
173 				     fstream->buffer_offset);
174 		}
175 	} else {
176 		if (o_stream_lseek(fstream) < 0)
177 			return -1;
178 
179 		sent = 0;
180 		while (iov_count > IOV_MAX) {
181 			size = 0;
182 			for (i = 0; i < IOV_MAX; i++)
183 				size += iov[i].iov_len;
184 
185 			ret = writev(fstream->fd, (const struct iovec *)iov,
186 				     IOV_MAX);
187 			if (ret != (ssize_t)size) {
188 				break;
189 			}
190 
191 			fstream->real_offset += ret;
192 			fstream->buffer_offset += ret;
193 			sent += ret;
194 			iov += IOV_MAX;
195 			iov_count -= IOV_MAX;
196 		}
197 
198 		if (iov_count <= IOV_MAX) {
199 			size = 0;
200 			for (i = 0; i < iov_count; i++)
201 				size += iov[i].iov_len;
202 
203 			ret = writev(fstream->fd, (const struct iovec *)iov,
204 				     iov_count);
205 		}
206 		if (ret > 0) {
207 			fstream->real_offset += ret;
208 			ret += sent;
209 		} else if (!fstream->file && sent > 0) {
210 			/* return what we managed to get sent */
211 			ret = sent;
212 		}
213 	}
214 	return ret;
215 }
216 
/* Write the given iovecs using fstream->writev(). If the ostream is corked,
   the socket is corked first.

   On success buffer_offset is advanced by the number of bytes written.
   - Files: EINTR is retried automatically. A partial write is continued
     recursively until everything is written, so on success the full
     total_size is returned. A write of 0 bytes is treated as ENOSPC.
   - Non-files: EAGAIN/EINTR return 0 ("try again later"), and a partial
     byte count may be returned.
   On any other error stream_errno is set, the stream is closed, and -1 is
   returned. */
static ssize_t
o_stream_file_writev_full(struct file_ostream *fstream,
				   const struct const_iovec *iov,
				   unsigned int iov_count)
{
	ssize_t ret, ret2;
	size_t size, total_size;
	bool partial;
	unsigned int i;

	for (i = 0, total_size = 0; i < iov_count; i++)
		total_size += iov[i].iov_len;

	o_stream_socket_cork(fstream);
	ret = fstream->writev(fstream, iov, iov_count);
	partial = ret != (ssize_t)total_size;

	if (ret < 0) {
		if (fstream->file) {
			if (errno == EINTR) {
				/* automatically retry */
				return o_stream_file_writev_full(fstream, iov, iov_count);
			}
		} else if (errno == EAGAIN || errno == EINTR) {
			/* try again later */
			return 0;
		}
		fstream->ostream.ostream.stream_errno = errno;
		stream_closed(fstream);
		return -1;
	}
	if (unlikely(ret == 0 && fstream->file)) {
		/* assume out of disk space */
		fstream->ostream.ostream.stream_errno = ENOSPC;
		stream_closed(fstream);
		return -1;
	}
	fstream->buffer_offset += ret;
	if (partial && fstream->file) {
		/* we failed to write everything to a file. either we ran out
		   of disk space or we're writing to NFS. try to write the
		   rest to resolve this. */
		size = ret;
		/* skip the iovecs that were written completely */
		while (iov_count > 0 && size >= iov->iov_len) {
			size -= iov->iov_len;
			iov++;
			iov_count--;
		}
		i_assert(iov_count > 0);
		if (size == 0)
			ret2 = o_stream_file_writev_full(fstream, iov, iov_count);
		else {
			/* write the first iov separately */
			struct const_iovec new_iov;

			new_iov.iov_base =
				CONST_PTR_OFFSET(iov->iov_base, size);
			new_iov.iov_len = iov->iov_len - size;
			ret2 = o_stream_file_writev_full(fstream, &new_iov, 1);
			if (ret2 > 0) {
				i_assert((size_t)ret2 == new_iov.iov_len);
				/* write the rest */
				if (iov_count > 1) {
					ret += ret2;
					ret2 = o_stream_file_writev_full(fstream, iov + 1,
							       iov_count - 1);
				}
			}
		}
		/* for files the recursive calls return either -1 or the
		   full remaining size, never 0 (0 becomes ENOSPC above) */
		i_assert(ret2 != 0);
		if (ret2 < 0)
			ret = ret2;
		else
			ret += ret2;
	}
	i_assert(ret < 0 || !fstream->file ||
		 (size_t)ret == total_size);
	return ret;
}
296 
297 /* returns how much of vector was used */
o_stream_fill_iovec(struct file_ostream * fstream,struct const_iovec iov[2])298 static int o_stream_fill_iovec(struct file_ostream *fstream,
299 			       struct const_iovec iov[2])
300 {
301 	if (IS_STREAM_EMPTY(fstream))
302 		return 0;
303 
304 	if (fstream->head < fstream->tail) {
305 		iov[0].iov_base = fstream->buffer + fstream->head;
306 		iov[0].iov_len = fstream->tail - fstream->head;
307 		return 1;
308 	} else {
309 		iov[0].iov_base = fstream->buffer + fstream->head;
310 		iov[0].iov_len = fstream->buffer_size - fstream->head;
311 		if (fstream->tail == 0)
312 			return 1;
313 		else {
314 			iov[1].iov_base = fstream->buffer;
315 			iov[1].iov_len = fstream->tail;
316 			return 2;
317 		}
318 	}
319 }
320 
buffer_flush(struct file_ostream * fstream)321 static int buffer_flush(struct file_ostream *fstream)
322 {
323 	struct const_iovec iov[2];
324 	int iov_len;
325 	ssize_t ret;
326 
327 	iov_len = o_stream_fill_iovec(fstream, iov);
328 	if (iov_len > 0) {
329 		ret = o_stream_file_writev_full(fstream, iov, iov_len);
330 		if (ret < 0)
331 			return -1;
332 
333 		update_buffer(fstream, ret);
334 	}
335 
336 	return IS_STREAM_EMPTY(fstream) ? 1 : 0;
337 }
338 
o_stream_tcp_flush_via_nodelay(struct file_ostream * fstream)339 static void o_stream_tcp_flush_via_nodelay(struct file_ostream *fstream)
340 {
341 	if (net_set_tcp_nodelay(fstream->fd, TRUE) < 0) {
342 		/* Don't bother logging errors. There are quite a lot of
343 		   different errors that need to be ignored, and it differs
344 		   between OSes. At least:
345 		   Linux: ENOTSUP, ENOTSOCK, ENOPROTOOPT
346 		   FreeBSD: EINVAL, ECONNRESET */
347 		fstream->no_socket_nodelay = TRUE;
348 	} else if (net_set_tcp_nodelay(fstream->fd, FALSE) < 0) {
349 		/* We already successfully enabled TCP_NODELAY, so there
350 		   shouldn't really be errors. Except ECONNRESET can possibly
351 		   still happen between these two calls, so again don't log
352 		   errors. */
353 		fstream->no_socket_nodelay = TRUE;
354 	}
355 }
356 
/* ostream cork() implementation. Does nothing if the cork state doesn't
   change or the stream is already closed.

   Corking removes the write IO handler.
   Uncorking:
   - flushes the buffer and re-adds the write IO if data remains or a flush
     is pending,
   - clears the socket cork if o_stream_socket_cork() set it,
   - toggles TCP_NODELAY to push out pending data,
   - sets TCP_QUICKACK to disable delayed ACKs.
   Failures of these socket options set the matching no_socket_* flag so
   they aren't retried. */
static void o_stream_file_cork(struct ostream_private *stream, bool set)
{
	struct file_ostream *fstream =
		container_of(stream, struct file_ostream, ostream);
	struct iostream_private *iostream = &fstream->ostream.iostream;
	int ret;

	if (stream->corked != set && !stream->ostream.closed) {
		if (set && fstream->io != NULL)
			io_remove(&fstream->io);
		else if (!set) {
			/* buffer flushing might close the stream */
			ret = buffer_flush(fstream);
			stream->last_errors_not_checked = TRUE;
			/* keep waiting for writability if data is left in
			   the buffer or a flush callback was requested */
			if (fstream->io == NULL &&
			    (ret == 0 || fstream->flush_pending) &&
			    !stream->ostream.closed) {
				fstream->io = io_add_to(
					io_stream_get_ioloop(iostream),
					fstream->fd, IO_WRITE,
					stream_send_io, fstream);
			}
		}
		if (stream->ostream.closed) {
			/* flushing may have closed the stream already */
			return;
		}

		if (fstream->socket_cork_set) {
			/* socket cork is only set while corked, so we must be
			   uncorking here */
			i_assert(!set);
			if (net_set_cork(fstream->fd, FALSE) < 0)
				fstream->no_socket_cork = TRUE;
			fstream->socket_cork_set = FALSE;
		}
		if (!set && !fstream->no_socket_nodelay) {
			/* Uncorking - send all the pending data immediately.
			   Remove nodelay immediately afterwards, so if any
			   output is sent outside corking it may get delayed. */
			o_stream_tcp_flush_via_nodelay(fstream);
		}
		if (!set && !fstream->no_socket_quickack) {
			/* Uncorking - disable delayed ACKs to reduce latency.
			   Note that this needs to be set repeatedly. */
			if (net_set_tcp_quickack(fstream->fd, TRUE) < 0)
				fstream->no_socket_quickack = TRUE;
		}
		stream->corked = set;
	}
}
406 
o_stream_file_flush(struct ostream_private * stream)407 static int o_stream_file_flush(struct ostream_private *stream)
408 {
409 	struct file_ostream *fstream =
410 		container_of(stream, struct file_ostream, ostream);
411 
412 	return buffer_flush(fstream);
413 }
414 
415 static void
o_stream_file_flush_pending(struct ostream_private * stream,bool set)416 o_stream_file_flush_pending(struct ostream_private *stream, bool set)
417 {
418 	struct file_ostream *fstream =
419 		container_of(stream, struct file_ostream, ostream);
420 	struct iostream_private *iostream = &fstream->ostream.iostream;
421 
422 	fstream->flush_pending = set;
423 	if (set && !stream->corked && fstream->io == NULL) {
424 		fstream->io = io_add_to(io_stream_get_ioloop(iostream),
425 					fstream->fd, IO_WRITE,
426 					stream_send_io, fstream);
427 	}
428 }
429 
get_unused_space(const struct file_ostream * fstream)430 static size_t get_unused_space(const struct file_ostream *fstream)
431 {
432 	if (fstream->head > fstream->tail) {
433 		/* XXXT...HXXX */
434 		return fstream->head - fstream->tail;
435 	} else if (fstream->head < fstream->tail) {
436 		/* ...HXXXT... */
437 		return (fstream->buffer_size - fstream->tail) + fstream->head;
438 	} else {
439 		/* either fully unused or fully used */
440 		return fstream->full ? 0 : fstream->buffer_size;
441 	}
442 }
443 
444 static size_t
o_stream_file_get_buffer_used_size(const struct ostream_private * stream)445 o_stream_file_get_buffer_used_size(const struct ostream_private *stream)
446 {
447 	const struct file_ostream *fstream =
448 		container_of(stream, const struct file_ostream, ostream);
449 
450 	return fstream->buffer_size - get_unused_space(fstream);
451 }
452 
/* ostream seek() implementation. Only files can be seeked. Any buffered
   data is flushed before the new offset takes effect. Returns 1 on success.
   Returns -1 on failure: EINVAL if offset exceeds OFF_T_MAX, ESPIPE for
   non-files, or the flush error. */
static int o_stream_file_seek(struct ostream_private *stream, uoff_t offset)
{
	struct file_ostream *fstream =
		container_of(stream, struct file_ostream, ostream);
	int error = 0;

	if (offset > OFF_T_MAX)
		error = EINVAL;
	else if (!fstream->file)
		error = ESPIPE;

	if (error != 0) {
		stream->ostream.stream_errno = error;
		return -1;
	}

	if (buffer_flush(fstream) < 0)
		return -1;

	fstream->buffer_offset = offset;
	stream->ostream.offset = offset;
	return 1;
}
474 
o_stream_grow_buffer(struct file_ostream * fstream,size_t bytes)475 static void o_stream_grow_buffer(struct file_ostream *fstream, size_t bytes)
476 {
477 	size_t size, new_size, end_size;
478 
479 	size = nearest_power(fstream->buffer_size + bytes);
480 	if (size > fstream->ostream.max_buffer_size) {
481 		/* limit the size */
482 		size = fstream->ostream.max_buffer_size;
483 	} else if (fstream->ostream.corked) {
484 		/* try to use optimal buffer size with corking */
485 		new_size = I_MIN(fstream->optimal_block_size,
486 				 fstream->ostream.max_buffer_size);
487 		if (new_size > size)
488 			size = new_size;
489 	}
490 
491 	if (size <= fstream->buffer_size)
492 		return;
493 
494 	fstream->buffer = i_realloc(fstream->buffer,
495 				    fstream->buffer_size, size);
496 
497 	if (fstream->tail <= fstream->head && !IS_STREAM_EMPTY(fstream)) {
498 		/* move head forward to end of buffer */
499 		end_size = fstream->buffer_size - fstream->head;
500 		memmove(fstream->buffer + size - end_size,
501 			fstream->buffer + fstream->head, end_size);
502 		fstream->head = size - end_size;
503 	}
504 
505 	fstream->full = FALSE;
506 	fstream->buffer_size = size;
507 }
508 
/* IO_WRITE handler, called when the fd is writable.

   Calls the ostream's flush callback, or simply flushes the buffer if there
   is no callback. If the stream wasn't already corked, it is corked for the
   duration of that call. Afterwards the IO handler is removed if no flush
   is pending and the buffer is empty. Otherwise it is kept, or re-added if
   missing, unless the stream was closed. A reference is held across the
   call, presumably so the callback can't free the stream underneath us. */
static void stream_send_io(struct file_ostream *fstream)
{
	struct ostream *ostream = &fstream->ostream.ostream;
	struct iostream_private *iostream = &fstream->ostream.iostream;
	bool use_cork = !fstream->ostream.corked;
	int ret;

	/* Set flush_pending = FALSE first before calling the flush callback,
	   and change it to TRUE only if callback returns 0. That way the
	   callback can call o_stream_set_flush_pending() again and we don't
	   forget it even if flush callback returns 1. */
	fstream->flush_pending = FALSE;

	o_stream_ref(ostream);
	if (use_cork)
		o_stream_cork(ostream);
	if (fstream->ostream.callback != NULL)
		ret = fstream->ostream.callback(fstream->ostream.context);
	else
		ret = o_stream_file_flush(&fstream->ostream);
	if (use_cork)
		o_stream_uncork(ostream);

	if (ret == 0)
		fstream->flush_pending = TRUE;

	if (!fstream->flush_pending && IS_STREAM_EMPTY(fstream)) {
		io_remove(&fstream->io);
	} else if (!fstream->ostream.ostream.closed) {
		/* Add the IO handler if it's not there already. Callback
		   might have just returned 0 without there being any data
		   to be sent. */
		if (fstream->io == NULL) {
			fstream->io = io_add_to(io_stream_get_ioloop(iostream),
						fstream->fd, IO_WRITE,
						stream_send_io, fstream);
		}
	}

	o_stream_unref(&ostream);
}
550 
/* Append up to "size" bytes of data to the ring buffer. If there isn't
   enough free space, the buffer is grown first; growth is limited by
   max_buffer_size in o_stream_grow_buffer(). The copy is done in at most
   two parts: from tail towards the end of the buffer (or up to head), then
   wrapping to the beginning.

   For uncorked non-file fds, the write IO is added when any data was
   buffered, so the data gets flushed once the fd becomes writable.

   Returns the number of bytes actually buffered. This can be less than
   size if the buffer filled up. */
static size_t o_stream_add(struct file_ostream *fstream,
			   const void *data, size_t size)
{
	struct iostream_private *iostream = &fstream->ostream.iostream;
	size_t unused, sent;
	int i;

	unused = get_unused_space(fstream);
	if (unused < size)
		o_stream_grow_buffer(fstream, size-unused);

	sent = 0;
	for (i = 0; i < 2 && sent < size && !fstream->full; i++) {
		/* contiguous free space starting at tail: up to the end of
		   the buffer if tail >= head, otherwise up to head */
		unused = fstream->tail >= fstream->head ?
			fstream->buffer_size - fstream->tail :
			fstream->head - fstream->tail;

		if (unused > size-sent)
			unused = size-sent;
		memcpy(fstream->buffer + fstream->tail,
		       CONST_PTR_OFFSET(data, sent), unused);
		sent += unused;

		fstream->tail += unused;
		if (fstream->tail == fstream->buffer_size)
			fstream->tail = 0;

		/* tail caught up with head: no free space left */
		if (fstream->head == fstream->tail &&
		    fstream->buffer_size > 0)
			fstream->full = TRUE;
	}

	if (sent != 0 && fstream->io == NULL &&
	    !fstream->ostream.corked && !fstream->file) {
		fstream->io = io_add_to(io_stream_get_ioloop(iostream),
					fstream->fd, IO_WRITE, stream_send_io,
				     	fstream);
	}

	return sent;
}
592 
/* ostream sendv() implementation.

   If the data doesn't fit into the free buffer space, the buffer is flushed
   first. If the buffer is then empty and the stream is either uncorked or
   the data is at least the optimal block size, the data is written
   directly, and whatever couldn't be written is buffered. Otherwise the
   data is appended to the buffer.

   Returns the number of bytes accepted (written or buffered) and advances
   the ostream offset by that amount, or returns -1 on error. For files
   everything is always accepted (see the final assert). For other fds a
   short count means the buffer became full. */
ssize_t o_stream_file_sendv(struct ostream_private *stream,
				   const struct const_iovec *iov,
				   unsigned int iov_count)
{
	struct file_ostream *fstream =
		container_of(stream, struct file_ostream, ostream);
	size_t size, total_size, added, optimal_size;
	unsigned int i;
	ssize_t ret = 0;

	for (i = 0, size = 0; i < iov_count; i++)
		size += iov[i].iov_len;
	total_size = size;

	if (size > get_unused_space(fstream) && !IS_STREAM_EMPTY(fstream)) {
		if (o_stream_file_flush(stream) < 0)
			return -1;
	}

	optimal_size = I_MIN(fstream->optimal_block_size,
			     fstream->ostream.max_buffer_size);
	if (IS_STREAM_EMPTY(fstream) &&
	    (!stream->corked || size >= optimal_size)) {
		/* send immediately */
		ret = o_stream_file_writev_full(fstream, iov, iov_count);
		if (ret < 0)
			return -1;

		/* skip the iovecs that were written completely */
		size = ret;
		while (size > 0 && iov_count > 0 && size >= iov[0].iov_len) {
			size -= iov[0].iov_len;
			iov++;
			iov_count--;
		}

		if (iov_count == 0)
			i_assert(size == 0);
		else {
			/* buffer the unwritten tail of the partially
			   written iovec */
			added = o_stream_add(fstream,
					CONST_PTR_OFFSET(iov[0].iov_base, size),
					iov[0].iov_len - size);
			ret += added;

			if (added != iov[0].iov_len - size) {
				/* buffer full */
				stream->ostream.offset += ret;
				return ret;
			}

			iov++;
			iov_count--;
		}
	}

	/* buffer it, at least partly */
	for (i = 0; i < iov_count; i++) {
		added = o_stream_add(fstream, iov[i].iov_base, iov[i].iov_len);
		ret += added;
		if (added != iov[i].iov_len)
			break;
	}
	stream->ostream.offset += ret;
	i_assert((size_t)ret <= total_size);
	i_assert((size_t)ret == total_size || !fstream->file);
	return ret;
}
659 
660 static size_t
o_stream_file_update_buffer(struct file_ostream * fstream,const void * data,size_t size,size_t pos)661 o_stream_file_update_buffer(struct file_ostream *fstream,
662 			    const void *data, size_t size, size_t pos)
663 {
664 	size_t avail, copy_size;
665 
666 	if (fstream->head < fstream->tail) {
667 		/* ...HXXXT... */
668 		i_assert(pos < fstream->tail);
669 		avail = fstream->tail - pos;
670 	} else {
671 		/* XXXT...HXXX */
672 		avail = fstream->buffer_size - pos;
673 	}
674 	copy_size = I_MIN(size, avail);
675 	memcpy(fstream->buffer + pos, data, copy_size);
676 	data = CONST_PTR_OFFSET(data, copy_size);
677 	size -= copy_size;
678 
679 	if (size > 0 && fstream->head >= fstream->tail) {
680 		/* wraps to beginning of the buffer */
681 		copy_size = I_MIN(size, fstream->tail);
682 		memcpy(fstream->buffer, data, copy_size);
683 		size -= copy_size;
684 	}
685 	return size;
686 }
687 
/* ostream write_at() implementation: write data at an absolute file offset
   without changing the stream's current offset.

   If the range overlaps data that is still buffered, the buffered copy is
   updated in place. Whatever that doesn't cover (a prefix before the
   buffered range and/or a suffix after it) is written with pwrite_full()
   after flushing the buffer. If both a prefix and an unbuffered suffix
   remain, the whole range is written with a single pwrite().

   Returns 0 on success. Returns -1 on error with stream_errno set; the
   stream is also closed if pwrite() fails. */
static int
o_stream_file_write_at(struct ostream_private *stream,
		       const void *data, size_t size, uoff_t offset)
{
	struct file_ostream *fstream =
		container_of(stream, struct file_ostream, ostream);
	size_t used, pos, skip, left;

	/* update buffer if the write overlaps it */
	used = file_buffer_get_used_size(fstream);
	if (used > 0 &&
	    fstream->buffer_offset < offset + size &&
	    fstream->buffer_offset + used > offset) {
		if (fstream->buffer_offset <= offset) {
			/* updating from the beginning */
			skip = 0;
		} else {
			/* the first "skip" bytes of data come before the
			   buffered range */
			skip = fstream->buffer_offset - offset;
		}
		/* ring index corresponding to file offset offset + skip */
		pos = (fstream->head + offset + skip - fstream->buffer_offset) %
			fstream->buffer_size;
		left = o_stream_file_update_buffer(fstream,
				CONST_PTR_OFFSET(data, skip), size - skip, pos);
		if (left > 0) {
			/* didn't write all of it */
			if (skip > 0) {
				/* we also have to write a prefix. don't
				   bother with two syscalls, just write all
				   of it in one pwrite(). */
			} else {
				/* write only the suffix */
				size_t update_count = size - left;

				data = CONST_PTR_OFFSET(data, update_count);
				size -= update_count;
				offset += update_count;
			}
		} else if (skip == 0) {
			/* everything done */
			return 0;
		} else {
			/* still have to write prefix */
			size = skip;
		}
	}

	/* we couldn't write everything to the buffer. flush the buffer
	   and pwrite() the rest. */
	if (o_stream_file_flush(stream) < 0)
		return -1;

	if (pwrite_full(fstream->fd, data, size, offset) < 0) {
		stream->ostream.stream_errno = errno;
		stream_closed(fstream);
		return -1;
	}
	return 0;
}
746 
/* Try to copy instream (from in_fd) to the output fd using sendfile().

   Returns FALSE if sendfile() can't be used, and the caller then falls back
   to a regular copy. This happens when instream's size is unknown or when
   sendfile() fails with EINVAL, which is treated as "not supported for this
   fd". Otherwise returns TRUE and sets *res_r:
   - ERROR_INPUT: getting the input size failed,
   - ERROR_OUTPUT: buffer flush, lseek() or sendfile() failed; on a
     sendfile() error the stream is closed,
   - WAIT_OUTPUT: the buffer couldn't be fully flushed, or a non-file fd
     would block,
   - FINISHED: everything up to instream's size was sent, or the input
     reached EOF unexpectedly early.
   The output offsets and instream's offset are advanced by the bytes
   sent. */
static bool
io_stream_sendfile(struct ostream_private *outstream,
		   struct istream *instream, int in_fd,
		   enum ostream_send_istream_result *res_r)
{
	struct file_ostream *foutstream =
		container_of(outstream, struct file_ostream, ostream);
	uoff_t in_size, offset, send_size, v_offset, abs_start_offset;
	ssize_t ret;
	bool sendfile_not_supported = FALSE;

	if ((ret = i_stream_get_size(instream, TRUE, &in_size)) < 0) {
		*res_r = OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT;
		return TRUE;
	}
	if (ret == 0) {
		/* size unknown. we can't use sendfile(). */
		return FALSE;
	}

	o_stream_socket_cork(foutstream);

	/* flush out any data in buffer */
	if ((ret = buffer_flush(foutstream)) < 0) {
		*res_r = OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT;
		return TRUE;
	} else if (ret == 0) {
		*res_r = OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT;
		return TRUE;
	}

	/* sendfile() writes at the fd's current file position */
	if (o_stream_lseek(foutstream) < 0) {
		*res_r = OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT;
		return TRUE;
	}

	/* abs_start_offset maps instream's v_offset 0 to an absolute offset
	   in in_fd */
	v_offset = instream->v_offset;
	abs_start_offset = i_stream_get_absolute_offset(instream) - v_offset;
	while (v_offset < in_size) {
		offset = abs_start_offset + v_offset;
		send_size = in_size - v_offset;

		ret = safe_sendfile(foutstream->fd, in_fd, &offset,
				    MAX_SSIZE_T(send_size));
		if (ret <= 0) {
			if (ret == 0) {
				/* Unexpectedly early EOF at input */
				i_stream_seek(instream, v_offset);
				instream->eof = TRUE;
				*res_r = OSTREAM_SEND_ISTREAM_RESULT_FINISHED;
				return TRUE;
			}
			if (foutstream->file) {
				if (errno == EINTR) {
					/* automatically retry */
					continue;
				}
			} else {
				if (errno == EINTR || errno == EAGAIN) {
					ret = 0;
					break;
				}
			}
			if (errno == EINVAL)
				sendfile_not_supported = TRUE;
			else {
				io_stream_set_error(&outstream->iostream,
						    "sendfile() failed: %m");
				outstream->ostream.stream_errno = errno;
				/* close only if error wasn't because
				   sendfile() isn't supported */
				stream_closed(foutstream);
			}
			break;
		}

		v_offset += ret;
		foutstream->real_offset += ret;
		foutstream->buffer_offset += ret;
		outstream->ostream.offset += ret;
	}

	i_stream_seek(instream, v_offset);
	if (v_offset == in_size) {
		instream->eof = TRUE;
		*res_r = OSTREAM_SEND_ISTREAM_RESULT_FINISHED;
		return TRUE;
	}
	i_assert(ret <= 0);
	if (sendfile_not_supported)
		return FALSE;
	if (ret < 0)
		*res_r = OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT;
	else
		*res_r = OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT;
	return TRUE;
}
844 
845 static enum ostream_send_istream_result
io_stream_copy_backwards(struct ostream_private * outstream,struct istream * instream,uoff_t in_size)846 io_stream_copy_backwards(struct ostream_private *outstream,
847 			 struct istream *instream, uoff_t in_size)
848 {
849 	struct file_ostream *foutstream =
850 		container_of(outstream, struct file_ostream, ostream);
851 	uoff_t in_start_offset, in_offset, in_limit, out_offset;
852 	const unsigned char *data;
853 	size_t buffer_size, size, read_size;
854 	ssize_t ret;
855 
856 	i_assert(IS_STREAM_EMPTY(foutstream));
857 
858 	/* figure out optimal buffer size */
859 	buffer_size = instream->real_stream->buffer_size;
860 	if (buffer_size == 0 || buffer_size > foutstream->buffer_size) {
861 		if (foutstream->optimal_block_size > foutstream->buffer_size) {
862 			o_stream_grow_buffer(foutstream,
863 					     foutstream->optimal_block_size -
864 					     foutstream->buffer_size);
865 		}
866 
867 		buffer_size = foutstream->buffer_size;
868 	}
869 
870 	in_start_offset = instream->v_offset;
871 	in_offset = in_limit = in_size;
872 	out_offset = outstream->ostream.offset + (in_offset - in_start_offset);
873 
874 	while (in_offset > in_start_offset) {
875 		if (in_offset - in_start_offset <= buffer_size)
876 			read_size = in_offset - in_start_offset;
877 		else
878 			read_size = buffer_size;
879 		in_offset -= read_size;
880 		out_offset -= read_size;
881 
882 		for (;;) {
883 			i_assert(in_offset <= in_limit);
884 
885 			i_stream_seek(instream, in_offset);
886 			read_size = in_limit - in_offset;
887 
888 			/* FIXME: something's wrong here */
889 			if (i_stream_read_bytes(instream, &data, &size,
890 						read_size) == 0)
891 				i_unreached();
892 			if (size >= read_size) {
893 				size = read_size;
894 				if (instream->mmaped) {
895 					/* we'll have to write it through
896 					   buffer or the file gets corrupted */
897 					i_assert(size <=
898 						 foutstream->buffer_size);
899 					memcpy(foutstream->buffer, data, size);
900 					data = foutstream->buffer;
901 				}
902 				break;
903 			}
904 
905 			/* buffer too large probably, try with smaller */
906 			read_size -= size;
907 			in_offset += read_size;
908 			out_offset += read_size;
909 			buffer_size -= read_size;
910 		}
911 		in_limit -= size;
912 
913 		ret = pwrite_full(foutstream->fd, data, size, out_offset);
914 		if (ret < 0) {
915 			/* error */
916 			outstream->ostream.stream_errno = errno;
917 			return OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT;
918 		}
919 		i_stream_skip(instream, size);
920 	}
921 	/* make it visible that we're at instream's EOF */
922 	i_stream_seek(instream, in_size);
923 	instream->eof = TRUE;
924 
925 	outstream->ostream.offset += in_size - in_start_offset;
926 	return OSTREAM_SEND_ISTREAM_RESULT_FINISHED;
927 }
928 
929 static enum ostream_send_istream_result
io_stream_copy_same_stream(struct ostream_private * outstream,struct istream * instream)930 io_stream_copy_same_stream(struct ostream_private *outstream,
931 			   struct istream *instream)
932 {
933 	uoff_t in_size;
934 	off_t in_abs_offset, ret = 0;
935 
936 	/* copying data within same fd. we'll have to be careful with
937 	   seeks and overlapping writes. */
938 	if ((ret = i_stream_get_size(instream, TRUE, &in_size)) < 0)
939 		return OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT;
940 	if (ret == 0) {
941 		/* if we couldn't find out the size, it means that instream
942 		   isn't a regular file_istream. we can be reasonably sure that
943 		   we can copy it safely the regular way. (there's really no
944 		   other possibility, other than failing completely.) */
945 		return io_stream_copy(&outstream->ostream, instream);
946 	}
947 	i_assert(instream->v_offset <= in_size);
948 
949 	in_abs_offset = i_stream_get_absolute_offset(instream);
950 	ret = (off_t)outstream->ostream.offset - in_abs_offset;
951 	if (ret == 0) {
952 		/* copying data over itself. we don't really
953 		   need to do that, just fake it. */
954 		return OSTREAM_SEND_ISTREAM_RESULT_FINISHED;
955 	}
956 	if (ret > 0 && in_size > (uoff_t)ret) {
957 		/* overlapping */
958 		i_assert(instream->seekable);
959 		return io_stream_copy_backwards(outstream, instream, in_size);
960 	} else {
961 		/* non-overlapping */
962 		return io_stream_copy(&outstream->ostream, instream);
963 	}
964 }
965 
966 static enum ostream_send_istream_result
o_stream_file_send_istream(struct ostream_private * outstream,struct istream * instream)967 o_stream_file_send_istream(struct ostream_private *outstream,
968 			   struct istream *instream)
969 {
970 	struct file_ostream *foutstream =
971 		container_of(outstream, struct file_ostream, ostream);
972 	bool same_stream;
973 	int in_fd;
974 	enum ostream_send_istream_result res;
975 
976 	in_fd = !instream->readable_fd ? -1 : i_stream_get_fd(instream);
977 	if (!foutstream->no_sendfile && in_fd != -1 &&
978 	    in_fd != foutstream->fd && instream->seekable) {
979 		if (io_stream_sendfile(outstream, instream, in_fd, &res))
980 			return res;
981 
982 		/* sendfile() not supported (with this fd), fallback to
983 		   regular sending. */
984 		foutstream->no_sendfile = TRUE;
985 	}
986 
987 	same_stream = i_stream_get_fd(instream) == foutstream->fd &&
988 		foutstream->fd != -1;
989 	if (!same_stream)
990 		return io_stream_copy(&outstream->ostream, instream);
991 	return io_stream_copy_same_stream(outstream, instream);
992 }
993 
o_stream_file_switch_ioloop_to(struct ostream_private * stream,struct ioloop * ioloop)994 static void o_stream_file_switch_ioloop_to(struct ostream_private *stream,
995 					   struct ioloop *ioloop)
996 {
997 	struct file_ostream *fstream =
998 		container_of(stream, struct file_ostream, ostream);
999 
1000 	if (fstream->io != NULL)
1001 		fstream->io = io_loop_move_io_to(ioloop, &fstream->io);
1002 }
1003 
1004 struct ostream *
o_stream_create_file_common(struct file_ostream * fstream,int fd,size_t max_buffer_size,bool autoclose_fd)1005 o_stream_create_file_common(struct file_ostream *fstream,
1006 	int fd, size_t max_buffer_size, bool autoclose_fd)
1007 {
1008 	struct ostream *ostream;
1009 
1010 	fstream->fd = fd;
1011 	fstream->autoclose_fd = autoclose_fd;
1012 	fstream->optimal_block_size = DEFAULT_OPTIMAL_BLOCK_SIZE;
1013 
1014 	fstream->ostream.iostream.close = o_stream_file_close;
1015 	fstream->ostream.iostream.destroy = o_stream_file_destroy;
1016 
1017 	fstream->ostream.cork = o_stream_file_cork;
1018 	fstream->ostream.flush = o_stream_file_flush;
1019 	fstream->ostream.flush_pending = o_stream_file_flush_pending;
1020 	fstream->ostream.get_buffer_used_size =
1021 		o_stream_file_get_buffer_used_size;
1022 	fstream->ostream.seek = o_stream_file_seek;
1023 	fstream->ostream.sendv = o_stream_file_sendv;
1024 	fstream->ostream.write_at = o_stream_file_write_at;
1025 	fstream->ostream.send_istream = o_stream_file_send_istream;
1026 	fstream->ostream.switch_ioloop_to = o_stream_file_switch_ioloop_to;
1027 
1028 	fstream->writev = o_stream_file_writev;
1029 
1030 	fstream->ostream.max_buffer_size = max_buffer_size;
1031 	ostream = o_stream_create(&fstream->ostream, NULL, fd);
1032 
1033 	if (max_buffer_size == 0)
1034 		fstream->ostream.max_buffer_size = fstream->optimal_block_size;
1035 
1036 	return ostream;
1037 }
1038 
fstream_init_file(struct file_ostream * fstream)1039 static void fstream_init_file(struct file_ostream *fstream)
1040 {
1041 	struct stat st;
1042 
1043 	fstream->no_sendfile = TRUE;
1044 	if (fstat(fstream->fd, &st) < 0)
1045 		return;
1046 
1047 	if ((uoff_t)st.st_blksize > fstream->optimal_block_size) {
1048 		/* use the optimal block size, but with a reasonable limit */
1049 		fstream->optimal_block_size =
1050 			I_MIN(st.st_blksize, MAX_OPTIMAL_BLOCK_SIZE);
1051 	}
1052 
1053 	if (S_ISREG(st.st_mode)) {
1054 		fstream->no_socket_cork = TRUE;
1055 		fstream->no_socket_nodelay = TRUE;
1056 		fstream->no_socket_quickack = TRUE;
1057 		fstream->file = TRUE;
1058 	}
1059 }
1060 
1061 static
o_stream_create_fd_common(int fd,size_t max_buffer_size,bool autoclose_fd)1062 struct ostream * o_stream_create_fd_common(int fd, size_t max_buffer_size,
1063 		bool autoclose_fd)
1064 {
1065 	struct file_ostream *fstream;
1066 	struct ostream *ostream;
1067 	off_t offset;
1068 
1069 	fstream = i_new(struct file_ostream, 1);
1070 	ostream = o_stream_create_file_common
1071 		(fstream, fd, max_buffer_size, autoclose_fd);
1072 
1073 	offset = lseek(fd, 0, SEEK_CUR);
1074 	if (offset >= 0) {
1075 		ostream->offset = offset;
1076 		fstream->real_offset = offset;
1077 		fstream->buffer_offset = offset;
1078 		fstream_init_file(fstream);
1079 	} else {
1080 		struct ip_addr local_ip;
1081 
1082 		if (net_getsockname(fd, &local_ip, NULL) < 0) {
1083 			/* not a socket */
1084 			fstream->no_sendfile = TRUE;
1085 			fstream->no_socket_cork = TRUE;
1086 			fstream->no_socket_nodelay = TRUE;
1087 			fstream->no_socket_quickack = TRUE;
1088 		} else if (local_ip.family == 0) {
1089 			/* UNIX domain socket */
1090 			fstream->no_socket_cork = TRUE;
1091 			fstream->no_socket_nodelay = TRUE;
1092 			fstream->no_socket_quickack = TRUE;
1093 		}
1094 	}
1095 
1096 	return ostream;
1097 }
1098 
1099 struct ostream *
o_stream_create_fd(int fd,size_t max_buffer_size)1100 o_stream_create_fd(int fd, size_t max_buffer_size)
1101 {
1102 	return o_stream_create_fd_common(fd, max_buffer_size, FALSE);
1103 }
1104 
1105 struct ostream *
o_stream_create_fd_autoclose(int * fd,size_t max_buffer_size)1106 o_stream_create_fd_autoclose(int *fd, size_t max_buffer_size)
1107 {
1108 	struct ostream *ostream = o_stream_create_fd_common(*fd,
1109 			max_buffer_size, TRUE);
1110 	*fd = -1;
1111 	return ostream;
1112 }
1113 
1114 struct ostream *
o_stream_create_fd_file(int fd,uoff_t offset,bool autoclose_fd)1115 o_stream_create_fd_file(int fd, uoff_t offset, bool autoclose_fd)
1116 {
1117 	struct file_ostream *fstream;
1118 	struct ostream *ostream;
1119 
1120 	if (offset == UOFF_T_MAX)
1121 		offset = lseek(fd, 0, SEEK_CUR);
1122 
1123 	fstream = i_new(struct file_ostream, 1);
1124 	ostream = o_stream_create_file_common(fstream, fd, 0, autoclose_fd);
1125 	fstream_init_file(fstream);
1126 	fstream->real_offset = offset;
1127 	fstream->buffer_offset = offset;
1128 	ostream->blocking = fstream->file;
1129 	ostream->offset = offset;
1130 	return ostream;
1131 }
1132 
/* Same as o_stream_create_fd_file(), but the stream takes ownership of
   *fd (it is closed with the stream) and *fd is reset to -1. */
struct ostream *o_stream_create_fd_file_autoclose(int *fd, uoff_t offset)
{
	struct ostream *output = o_stream_create_fd_file(*fd, offset, TRUE);

	*fd = -1;
	return output;
}
1141 
/* Open path write-only, creating it with mode if it doesn't exist, and
   return an autoclosing file ostream starting at offset.

   With OSTREAM_CREATE_FILE_FLAG_APPEND the file is opened with O_APPEND;
   otherwise it is truncated with O_TRUNC. If open() fails, the stream
   returned by o_stream_create_error(errno) is returned instead. */
struct ostream *o_stream_create_file(const char *path, uoff_t offset, mode_t mode,
				     enum ostream_create_file_flags flags)
{
	const int open_flags = O_WRONLY | O_CREAT |
		(HAS_ANY_BITS(flags, OSTREAM_CREATE_FILE_FLAG_APPEND) ?
		 O_APPEND : O_TRUNC);
	int fd;

	fd = open(path, open_flags, mode);
	if (fd < 0)
		return o_stream_create_error(errno);
	return o_stream_create_fd_file_autoclose(&fd, offset);
}
1155