1 /* Copyright (c) 2002-2018 Dovecot authors, see the included COPYING file */
2 
3 #include "lib.h"
4 #include "ioloop.h"
5 #include "array.h"
6 #include "str.h"
7 #include "memarea.h"
8 #include "istream-private.h"
9 
/* Forward declaration: the definition is further down in this file, but
   i_stream_read() and i_stream_read_memarea() already use it above that. */
static bool i_stream_is_buffer_invalid(const struct istream_private *stream);
11 
i_stream_set_name(struct istream * stream,const char * name)12 void i_stream_set_name(struct istream *stream, const char *name)
13 {
14 	i_free(stream->real_stream->iostream.name);
15 	stream->real_stream->iostream.name = i_strdup(name);
16 }
17 
i_stream_get_name(struct istream * stream)18 const char *i_stream_get_name(struct istream *stream)
19 {
20 	while (stream->real_stream->iostream.name == NULL) {
21 		stream = stream->real_stream->parent;
22 		if (stream == NULL)
23 			return "";
24 	}
25 	return stream->real_stream->iostream.name;
26 }
27 
i_stream_close_full(struct istream * stream,bool close_parents)28 static void i_stream_close_full(struct istream *stream, bool close_parents)
29 {
30 	io_stream_close(&stream->real_stream->iostream, close_parents);
31 	stream->closed = TRUE;
32 
33 	if (stream->stream_errno == 0)
34 		stream->stream_errno = EPIPE;
35 }
36 
i_stream_destroy(struct istream ** stream)37 void i_stream_destroy(struct istream **stream)
38 {
39 	if (*stream == NULL)
40 		return;
41 
42 	i_stream_close_full(*stream, FALSE);
43 	i_stream_unref(stream);
44 }
45 
i_stream_ref(struct istream * stream)46 void i_stream_ref(struct istream *stream)
47 {
48 	io_stream_ref(&stream->real_stream->iostream);
49 }
50 
/* Drop one reference from *stream and set *stream to NULL. A NULL *stream
   is a no-op.

   While the refcount is above 1 only the reference is dropped. Otherwise
   the stream is torn down: its pending snapshot, line string, reference to
   the parent stream and finally the iostream itself are released. */
void i_stream_unref(struct istream **stream)
{
	struct istream_private *_stream;

	if (*stream == NULL)
		return;

	_stream = (*stream)->real_stream;

	if (_stream->iostream.refcount > 1) {
		/* Other references remain, so io_stream_unref() is expected
		   to return TRUE here. */
		if (!io_stream_unref(&_stream->iostream))
			i_unreached();
	} else {
		/* The snapshot may contain pointers to the parent istreams.
		   Free it before io_stream_unref() frees the parents. */
		i_stream_snapshot_free(&_stream->prev_snapshot);

		/* The refcount was not above 1, so io_stream_unref() is
		   expected to return FALSE here. */
		if (io_stream_unref(&_stream->iostream))
			i_unreached();
		str_free(&_stream->line_str);
		i_stream_unref(&_stream->parent);
		io_stream_free(&_stream->iostream);
	}
	*stream = NULL;
}
76 
77 #undef i_stream_add_destroy_callback
i_stream_add_destroy_callback(struct istream * stream,istream_callback_t * callback,void * context)78 void i_stream_add_destroy_callback(struct istream *stream,
79 				   istream_callback_t *callback, void *context)
80 {
81 	io_stream_add_destroy_callback(&stream->real_stream->iostream,
82 				       callback, context);
83 }
84 
i_stream_remove_destroy_callback(struct istream * stream,void (* callback)())85 void i_stream_remove_destroy_callback(struct istream *stream,
86 				      void (*callback)())
87 {
88 	io_stream_remove_destroy_callback(&stream->real_stream->iostream,
89 					  callback);
90 }
91 
i_stream_get_fd(struct istream * stream)92 int i_stream_get_fd(struct istream *stream)
93 {
94 	struct istream_private *_stream = stream->real_stream;
95 
96 	return _stream->fd;
97 }
98 
i_stream_copy_fd(struct istream * dest,struct istream * source)99 void i_stream_copy_fd(struct istream *dest, struct istream *source)
100 {
101 	int fd = i_stream_get_fd(source);
102 
103 	i_assert(fd != -1);
104 	i_assert(dest->real_stream->fd == -1);
105 	dest->real_stream->fd = fd;
106 	dest->readable_fd = source->readable_fd;
107 }
108 
i_stream_get_error(struct istream * stream)109 const char *i_stream_get_error(struct istream *stream)
110 {
111 	struct istream *s;
112 
113 	/* we'll only return errors for streams that have stream_errno set or
114 	   that have reached EOF. we might be returning unintended error
115 	   otherwise. */
116 	if (stream->stream_errno == 0)
117 		return stream->eof ? "EOF" : "<no error>";
118 
119 	for (s = stream; s != NULL; s = s->real_stream->parent) {
120 		if (s->stream_errno == 0)
121 			break;
122 		if (s->real_stream->iostream.error != NULL)
123 			return s->real_stream->iostream.error;
124 	}
125 	return strerror(stream->stream_errno);
126 }
127 
/* Return the disconnect reason for this input stream. Delegates to
   io_stream_get_disconnect_reason() with no output stream (NULL). */
const char *i_stream_get_disconnect_reason(struct istream *stream)
{
	return io_stream_get_disconnect_reason(stream, NULL);
}
132 
i_stream_close(struct istream * stream)133 void i_stream_close(struct istream *stream)
134 {
135 	if (stream != NULL)
136 		i_stream_close_full(stream, TRUE);
137 }
138 
i_stream_set_init_buffer_size(struct istream * stream,size_t size)139 void i_stream_set_init_buffer_size(struct istream *stream, size_t size)
140 {
141 	stream->real_stream->init_buffer_size = size;
142 }
143 
i_stream_set_max_buffer_size(struct istream * stream,size_t max_size)144 void i_stream_set_max_buffer_size(struct istream *stream, size_t max_size)
145 {
146 	io_stream_set_max_buffer_size(&stream->real_stream->iostream, max_size);
147 }
148 
i_stream_get_max_buffer_size(struct istream * stream)149 size_t i_stream_get_max_buffer_size(struct istream *stream)
150 {
151 	size_t max_size = 0;
152 
153 	do {
154 		if (max_size < stream->real_stream->max_buffer_size)
155 			max_size = stream->real_stream->max_buffer_size;
156 		stream = stream->real_stream->parent;
157 	} while (stream != NULL);
158 	return max_size;
159 }
160 
i_stream_set_return_partial_line(struct istream * stream,bool set)161 void i_stream_set_return_partial_line(struct istream *stream, bool set)
162 {
163 	stream->real_stream->return_nolf_line = set;
164 }
165 
i_stream_set_persistent_buffers(struct istream * stream,bool set)166 void i_stream_set_persistent_buffers(struct istream *stream, bool set)
167 {
168 	do {
169 		stream->real_stream->nonpersistent_buffers = !set;
170 		stream = stream->real_stream->parent;
171 	} while (stream != NULL);
172 }
173 
i_stream_set_blocking(struct istream * stream,bool blocking)174 void i_stream_set_blocking(struct istream *stream, bool blocking)
175 {
176 	int prev_fd = -1;
177 
178 	do {
179 		stream->blocking = blocking;
180 		if (stream->real_stream->fd != -1 &&
181 		    stream->real_stream->fd != prev_fd) {
182 			fd_set_nonblock(stream->real_stream->fd, !blocking);
183 			prev_fd = stream->real_stream->fd;
184 		}
185 		stream = stream->real_stream->parent;
186 	} while (stream != NULL);
187 }
188 
i_stream_update(struct istream_private * stream)189 static void i_stream_update(struct istream_private *stream)
190 {
191 	if (stream->parent == NULL)
192 		stream->access_counter++;
193 	else {
194 		stream->access_counter =
195 			stream->parent->real_stream->access_counter;
196 		stream->parent_expected_offset = stream->parent->v_offset;
197 	}
198 }
199 
snapshot_has_memarea(struct istream_snapshot * snapshot,struct memarea * memarea)200 static bool snapshot_has_memarea(struct istream_snapshot *snapshot,
201 				 struct memarea *memarea)
202 {
203 	if (snapshot->old_memarea == memarea)
204 		return TRUE;
205 	if (snapshot->prev_snapshot != NULL)
206 		return snapshot_has_memarea(snapshot->prev_snapshot, memarea);
207 	return FALSE;
208 }
209 
210 struct istream_snapshot *
i_stream_default_snapshot(struct istream_private * stream,struct istream_snapshot * prev_snapshot)211 i_stream_default_snapshot(struct istream_private *stream,
212 			  struct istream_snapshot *prev_snapshot)
213 {
214 	struct istream_snapshot *snapshot;
215 
216 	if (stream->memarea != NULL) {
217 		if (prev_snapshot != NULL) {
218 			if (snapshot_has_memarea(prev_snapshot, stream->memarea))
219 				return prev_snapshot;
220 		}
221 		/* This stream has a memarea. Reference it, so we can later on
222 		   rollback if needed. */
223 		snapshot = i_new(struct istream_snapshot, 1);
224 		snapshot->old_memarea = stream->memarea;
225 		snapshot->prev_snapshot = prev_snapshot;
226 		memarea_ref(snapshot->old_memarea);
227 		return snapshot;
228 	}
229 	if (stream->parent == NULL) {
230 		if (stream->nonpersistent_buffers) {
231 			/* Assume that memarea would be used normally, but
232 			   now it's NULL because the buffer is empty and
233 			   empty buffers are freed. */
234 			i_assert(stream->skip == stream->pos);
235 			return prev_snapshot;
236 		}
237 		i_panic("%s is missing istream.snapshot() implementation",
238 			i_stream_get_name(&stream->istream));
239 	}
240 	struct istream_private *_parent_stream =
241 		stream->parent->real_stream;
242 	return _parent_stream->snapshot(_parent_stream, prev_snapshot);
243 }
244 
i_stream_snapshot_free(struct istream_snapshot ** _snapshot)245 void i_stream_snapshot_free(struct istream_snapshot **_snapshot)
246 {
247 	struct istream_snapshot *snapshot = *_snapshot;
248 
249 	if (*_snapshot == NULL)
250 		return;
251 	*_snapshot = NULL;
252 
253 	i_stream_snapshot_free(&snapshot->prev_snapshot);
254 	if (snapshot->free != NULL)
255 		snapshot->free(snapshot);
256 	else {
257 		if (snapshot->old_memarea != NULL)
258 			memarea_unref(&snapshot->old_memarea);
259 		i_free(snapshot);
260 	}
261 }
262 
/* snapshot() implementation that takes no snapshot at all: the stream is
   ignored and the previous snapshot is returned unchanged. */
static struct istream_snapshot *
i_stream_noop_snapshot(struct istream_private *stream ATTR_UNUSED,
		       struct istream_snapshot *prev_snapshot)
{
	return prev_snapshot;
}
269 
/* Read more data into the stream and return i_stream_read_memarea()'s
   result unchanged.

   If unread data is buffered, or an earlier snapshot is still held, the
   stream's snapshot() callback is invoked before the read. The snapshot is
   released once a read returns > 0.

   With DEBUG defined this additionally verifies that a read which did not
   return > 0 left the previously buffered data unchanged. */
ssize_t i_stream_read(struct istream *stream)
{
	struct istream_private *_stream = stream->real_stream;
	ssize_t ret;
#ifdef DEBUG
	/* Remember a sample of the unread data: all of it if it is at most
	   4 bytes, otherwise its first 2 and last 2 bytes. */
	unsigned char prev_buf[4];
	const unsigned char *prev_data = _stream->buffer;
	size_t prev_skip = _stream->skip, prev_pos = _stream->pos;
	bool invalid = i_stream_is_buffer_invalid(_stream);

	i_assert(prev_skip <= prev_pos);
	if (invalid)
		;
	else if (prev_pos - prev_skip <= 4)
		memcpy(prev_buf, prev_data + prev_skip, prev_pos - prev_skip);
	else {
		memcpy(prev_buf, prev_data + prev_skip, 2);
		memcpy(prev_buf+2, prev_data + prev_pos - 2, 2);
	}
#endif

	if (_stream->skip != _stream->pos || _stream->prev_snapshot != NULL) {
		_stream->prev_snapshot =
			_stream->snapshot(_stream, _stream->prev_snapshot);
	}
	ret = i_stream_read_memarea(stream);
	if (ret > 0)
		i_stream_snapshot_free(&_stream->prev_snapshot);
#ifdef DEBUG
	else if (!invalid) {
		/* Nothing new was read: the amount of unread data must be
		   unchanged (unless there was none) and the sampled bytes
		   must still be found at the old location. */
		i_assert((_stream->pos - _stream->skip) == (prev_pos - prev_skip) ||
			 prev_pos == prev_skip);
		if (prev_pos - prev_skip <= 4)
			i_assert(memcmp(prev_buf, prev_data + prev_skip, prev_pos - prev_skip) == 0);
		else {
			i_assert(memcmp(prev_buf, prev_data + prev_skip, 2) == 0);
			i_assert(memcmp(prev_buf+2, prev_data + prev_pos - 2, 2) == 0);
		}
	}
#endif
	return ret;
}
312 
/* Perform a single read on the stream without any snapshot handling.

   A closed or already failed stream is marked as EOF, errno is set to
   stream_errno and -1 is returned. Otherwise the parent (if any) is first
   seeked to the offset this stream expects, and then either the data hidden
   by an earlier backwards seek within the buffer is re-exposed or the
   stream's read() callback is called.

   Return values, as enforced by the assertions below:
     > 0  that many new bytes are available
     0    nothing was read; only allowed for non-blocking streams
     -1   EOF, or an error (stream_errno is set and copied to errno)
     -2   no more data could be added although the buffer is non-empty
          (presumably because the buffer is full - confirm against the
          read() implementations) */
ssize_t i_stream_read_memarea(struct istream *stream)
{
	struct istream_private *_stream = stream->real_stream;
	size_t old_size;
	ssize_t ret;

	if (unlikely(stream->closed || stream->stream_errno != 0)) {
		stream->eof = TRUE;
		errno = stream->stream_errno;
		return -1;
	}

	stream->eof = FALSE;

	if (_stream->parent != NULL)
		i_stream_seek(_stream->parent, _stream->parent_expected_offset);

	old_size = _stream->pos - _stream->skip;
	if (_stream->pos < _stream->high_pos) {
		/* we're here because we seeked back within the read buffer. */
		ret = _stream->high_pos - _stream->pos;
		_stream->pos = _stream->high_pos;
		_stream->high_pos = 0;
	} else {
		_stream->high_pos = 0;
		ret = _stream->read(_stream);
	}
	/* a read must never shrink the amount of unread data */
	i_assert(old_size <= _stream->pos - _stream->skip);
	switch (ret) {
	case -2:
		i_assert(_stream->skip != _stream->pos);
		break;
	case -1:
		if (stream->stream_errno != 0) {
			/* error handling should be easier if we now just
			   assume the stream is now at EOF */
			stream->eof = TRUE;
			errno = stream->stream_errno;
		} else {
			i_assert(stream->eof);
			i_assert(old_size == _stream->pos - _stream->skip);
		}
		break;
	case 0:
		i_assert(!stream->blocking);
		break;
	default:
		/* the returned byte count must match the buffer growth */
		i_assert(ret > 0);
		i_assert(_stream->skip < _stream->pos);
		i_assert((size_t)ret+old_size == _stream->pos - _stream->skip);
		_stream->last_read_timeval = ioloop_timeval;
		break;
	}

	if (stream->stream_errno != 0) {
		/* error handling should be easier if we now just
		   assume the stream is now at EOF. Note that we could get here
		   even if read() didn't return -1, although that's a little
		   bit sloppy istream implementation. */
		stream->eof = TRUE;
	}

	i_stream_update(_stream);
	/* verify that parents' access_counters are valid. the parent's
	   i_stream_read() should guarantee this. */
	i_assert(!i_stream_is_buffer_invalid(_stream));
	return ret;
}
381 
/* Return the currently buffered data, reading more only if nothing is
   buffered. Returns 1 when data was already available, otherwise the result
   of i_stream_read_memarea(). *data_r and *size_r always describe the
   buffer contents on return. */
int i_stream_read_more_memarea(struct istream *stream,
			       const unsigned char **data_r, size_t *size_r)
{
	int ret;

	*data_r = i_stream_get_data(stream, size_r);
	if (*size_r == 0) {
		ret = i_stream_read_memarea(stream);
		*data_r = i_stream_get_data(stream, size_r);
		return ret;
	}
	return 1;
}
393 
i_stream_get_last_read_time(struct istream * stream,struct timeval * tv_r)394 void i_stream_get_last_read_time(struct istream *stream, struct timeval *tv_r)
395 {
396 	*tv_r = stream->real_stream->last_read_timeval;
397 }
398 
/* read() helper for a stream that uses its parent's buffer directly as its
   own buffer.

   The stream's positions are first rebased so that skip is 0. Then the
   parent is read until it holds more data than this stream has already
   exposed (stream->pos), or until the parent's read stops returning > 0.
   The parent's stream_errno and eof are copied to this stream after each
   parent read.

   Returns the number of newly exposed bytes, 0 if nothing new was read and
   the parent's read returned 0, -1 otherwise (asserted to mean EOF or
   error), or -2 when the parent's read returned -2. */
ssize_t i_stream_read_copy_from_parent(struct istream *istream)
{
	struct istream_private *stream = istream->real_stream;
	size_t pos;
	ssize_t ret;

	stream->pos -= stream->skip;
	stream->skip = 0;

	stream->buffer = i_stream_get_data(stream->parent, &pos);
	if (pos > stream->pos)
		/* the parent already has data that we haven't exposed yet */
		ret = 0;
	else do {
		ret = i_stream_read_memarea(stream->parent);
		stream->istream.stream_errno = stream->parent->stream_errno;
		stream->istream.eof = stream->parent->eof;
		stream->buffer = i_stream_get_data(stream->parent, &pos);
		/* check again, in case the parent stream had been seeked
		   backwards and the previous read() didn't get us far
		   enough. */
	} while (pos <= stream->pos && ret > 0);
	if (ret == -2) {
		i_stream_update(stream);
		return -2;
	}

	/* prefer reporting newly available bytes over the parent's result */
	ret = pos > stream->pos ? (ssize_t)(pos - stream->pos) :
		(ret == 0 ? 0 : -1);
	stream->pos = pos;
	i_assert(ret != -1 || stream->istream.eof ||
		 stream->istream.stream_errno != 0);
	i_stream_update(stream);
	return ret;
}
433 
i_stream_free_buffer(struct istream_private * stream)434 void i_stream_free_buffer(struct istream_private *stream)
435 {
436 	if (stream->memarea != NULL) {
437 		memarea_unref(&stream->memarea);
438 		stream->w_buffer = NULL;
439 	} else if (stream->w_buffer != NULL) {
440 		i_free_and_null(stream->w_buffer);
441 	} else {
442 		/* don't know how to free it */
443 		return;
444 	}
445 	stream->buffer_size = 0;
446 }
447 
/* Skip forward `count` bytes.

   A skip within the buffered data only advances skip and v_offset; for
   nonpersistent-buffer streams the buffer is freed once it becomes empty.
   A larger skip consumes the whole buffer and asks the stream's seek()
   callback to move to the target offset, unless the stream is closed or
   has failed. */
void i_stream_skip(struct istream *stream, uoff_t count)
{
	struct istream_private *priv = stream->real_stream;
	size_t buffered = priv->pos - priv->skip;

	if (count > buffered) {
		/* have to seek forward */
		priv->skip = priv->pos;
		stream->v_offset += buffered;

		if (unlikely(stream->closed || stream->stream_errno != 0))
			return;

		priv->seek(priv, stream->v_offset + (count - buffered), FALSE);
		return;
	}

	/* within buffer */
	stream->v_offset += count;
	priv->skip += count;
	if (priv->nonpersistent_buffers && priv->skip == priv->pos) {
		priv->skip = priv->pos = 0;
		i_stream_free_buffer(priv);
	}
}
476 
i_stream_can_optimize_seek(struct istream_private * stream)477 static bool i_stream_can_optimize_seek(struct istream_private *stream)
478 {
479 	if (stream->parent == NULL)
480 		return TRUE;
481 
482 	/* use the fast route only if the parent stream hasn't been changed */
483 	if (stream->access_counter !=
484 	    stream->parent->real_stream->access_counter)
485 		return FALSE;
486 
487 	return i_stream_can_optimize_seek(stream->parent->real_stream);
488 }
489 
/* Seek to v_offset. A forward seek is done with i_stream_skip() when the
   parent chain is unchanged; otherwise the stream's seek() callback is
   used. On a closed or failed stream the slow path only sets eof. */
void i_stream_seek(struct istream *stream, uoff_t v_offset)
{
	struct istream_private *priv = stream->real_stream;
	bool use_skip = v_offset >= stream->v_offset &&
		i_stream_can_optimize_seek(priv);

	if (use_skip) {
		i_stream_skip(stream, v_offset - stream->v_offset);
	} else if (unlikely(stream->closed || stream->stream_errno != 0)) {
		stream->eof = TRUE;
		return;
	} else {
		stream->eof = FALSE;
		priv->seek(priv, v_offset, FALSE);
	}
	i_stream_update(priv);
}
507 
/* Seek to v_offset through the stream's seek() callback with its `mark`
   argument set to TRUE. Does nothing on a closed or failed stream. */
void i_stream_seek_mark(struct istream *stream, uoff_t v_offset)
{
	struct istream_private *priv;

	if (unlikely(stream->closed || stream->stream_errno != 0))
		return;

	priv = stream->real_stream;
	stream->eof = FALSE;
	priv->seek(priv, v_offset, TRUE);
	i_stream_update(priv);
}
519 
i_stream_sync(struct istream * stream)520 void i_stream_sync(struct istream *stream)
521 {
522 	struct istream_private *_stream = stream->real_stream;
523 
524 	if (unlikely(stream->closed || stream->stream_errno != 0))
525 		return;
526 
527 	if (_stream->sync != NULL) {
528 		_stream->sync(_stream);
529 		i_stream_update(_stream);
530 	}
531 }
532 
i_stream_stat(struct istream * stream,bool exact,const struct stat ** st_r)533 int i_stream_stat(struct istream *stream, bool exact, const struct stat **st_r)
534 {
535 	struct istream_private *_stream = stream->real_stream;
536 
537 	if (unlikely(stream->closed || stream->stream_errno != 0))
538 		return -1;
539 
540 	if (_stream->stat(_stream, exact) < 0) {
541 		stream->eof = TRUE;
542 		return -1;
543 	}
544 	*st_r = &_stream->statbuf;
545 	return 0;
546 }
547 
/* Get the stream's size through its get_size() callback and return the
   callback's result. Returns -1 without calling it if the stream is closed
   or failed. A negative callback result also sets eof. */
int i_stream_get_size(struct istream *stream, bool exact, uoff_t *size_r)
{
	struct istream_private *priv;
	int ret;

	if (unlikely(stream->closed || stream->stream_errno != 0))
		return -1;

	priv = stream->real_stream;
	ret = priv->get_size(priv, exact, size_r);
	if (ret < 0)
		stream->eof = TRUE;
	return ret;
}
560 
i_stream_have_bytes_left(struct istream * stream)561 bool i_stream_have_bytes_left(struct istream *stream)
562 {
563 	return i_stream_get_data_size(stream) > 0 || !stream->eof;
564 }
565 
i_stream_read_eof(struct istream * stream)566 bool i_stream_read_eof(struct istream *stream)
567 {
568 	if (i_stream_get_data_size(stream) == 0)
569 		(void)i_stream_read(stream);
570 	return !i_stream_have_bytes_left(stream);
571 }
572 
i_stream_get_absolute_offset(struct istream * stream)573 uoff_t i_stream_get_absolute_offset(struct istream *stream)
574 {
575 	uoff_t abs_offset = stream->v_offset;
576 	while (stream != NULL) {
577 		abs_offset += stream->real_stream->start_offset;
578 		stream = stream->real_stream->parent;
579 	}
580 	return abs_offset;
581 }
582 
/* Finish returning the line that starts at stream->skip and ends just
   before buffer index `i`. The callers in this file pass either the index
   of the line's LF or stream->pos for a final line without LF.

   A CR directly before `i` is excluded from the line and remembered in
   line_crlf. The returned string is NUL-terminated: either in place in the
   stream's own writable buffer, or as a copy in stream->line_str. The line
   (and the LF, when `i` is inside the buffered data) is then skipped over
   and v_offset is advanced accordingly. */
static char *i_stream_next_line_finish(struct istream_private *stream, size_t i)
{
	char *ret;
	size_t end;

	if (i > stream->skip && stream->buffer[i-1] == '\r') {
		end = i - 1;
		stream->line_crlf = TRUE;
	} else {
		end = i;
		stream->line_crlf = FALSE;
	}

	if (stream->buffer == stream->w_buffer &&
	    end < stream->buffer_size) {
		/* modify the buffer directly */
		stream->w_buffer[end] = '\0';
		ret = (char *)stream->w_buffer + stream->skip;
	} else {
		/* use a temporary string to return it */
		if (stream->line_str == NULL)
			stream->line_str = str_new(default_pool, 256);
		str_truncate(stream->line_str, 0);
		if (stream->skip < end)
			str_append_data(stream->line_str, stream->buffer + stream->skip,
					end - stream->skip);
		ret = str_c_modifiable(stream->line_str);
	}

	/* step over the LF too, unless the line ended at the end of the
	   buffered data */
	if (i < stream->pos)
		i++;
	stream->istream.v_offset += i - stream->skip;
	stream->skip = i;
	return ret;
}
618 
i_stream_last_line(struct istream_private * _stream)619 static char *i_stream_last_line(struct istream_private *_stream)
620 {
621 	if (_stream->istream.eof && _stream->skip != _stream->pos &&
622 	    _stream->return_nolf_line) {
623 		/* the last line is missing LF and we want to return it. */
624 		return i_stream_next_line_finish(_stream, _stream->pos);
625 	}
626 	return NULL;
627 }
628 
i_stream_next_line(struct istream * stream)629 char *i_stream_next_line(struct istream *stream)
630 {
631 	struct istream_private *_stream = stream->real_stream;
632 	const unsigned char *pos;
633 
634 	if (_stream->skip >= _stream->pos)
635 		return NULL;
636 
637 	pos = memchr(_stream->buffer + _stream->skip, '\n',
638 		     _stream->pos - _stream->skip);
639 	if (pos != NULL) {
640 		return i_stream_next_line_finish(_stream,
641 						 pos - _stream->buffer);
642 	} else {
643 		return i_stream_last_line(_stream);
644 	}
645 }
646 
i_stream_read_next_line(struct istream * stream)647 char *i_stream_read_next_line(struct istream *stream)
648 {
649 	char *line;
650 
651 	for (;;) {
652 		line = i_stream_next_line(stream);
653 		if (line != NULL)
654 			break;
655 
656 		switch (i_stream_read(stream)) {
657 		case -2:
658 			io_stream_set_error(&stream->real_stream->iostream,
659 				"Line is too long (over %zu"
660 				" bytes at offset %"PRIuUOFF_T")",
661 				i_stream_get_data_size(stream), stream->v_offset);
662 			stream->stream_errno = errno = ENOBUFS;
663 			stream->eof = TRUE;
664 			return NULL;
665 		case -1:
666 			return i_stream_last_line(stream->real_stream);
667 		case 0:
668 			return NULL;
669 		}
670 	}
671 	return line;
672 }
673 
i_stream_last_line_crlf(struct istream * stream)674 bool i_stream_last_line_crlf(struct istream *stream)
675 {
676 	return stream->real_stream->line_crlf;
677 }
678 
i_stream_is_buffer_invalid(const struct istream_private * stream)679 static bool i_stream_is_buffer_invalid(const struct istream_private *stream)
680 {
681 	if (stream->parent == NULL) {
682 		/* the buffer can't point to parent, because it doesn't exist */
683 		return FALSE;
684 	}
685 	if (stream->w_buffer != NULL) {
686 		/* we can pretty safely assume that the stream is using its
687 		   own private buffer, so it can never become invalid. */
688 		return FALSE;
689 	}
690 	if (stream->access_counter !=
691 	    stream->parent->real_stream->access_counter) {
692 		/* parent has been modified behind this stream, we can't trust
693 		   that our buffer is valid */
694 		return TRUE;
695 	}
696 	return i_stream_is_buffer_invalid(stream->parent->real_stream);
697 }
698 
699 const unsigned char *
i_stream_get_data(struct istream * stream,size_t * size_r)700 i_stream_get_data(struct istream *stream, size_t *size_r)
701 {
702 	struct istream_private *_stream = stream->real_stream;
703 
704 	if (_stream->skip >= _stream->pos) {
705 		*size_r = 0;
706 		return uchar_empty_ptr;
707 	}
708 
709 	if (unlikely(i_stream_is_buffer_invalid(_stream))) {
710 		/* This stream may be using parent's buffer directly as
711 		   _stream->buffer, but the parent stream has already been
712 		   modified indirectly. This means that the buffer might no
713 		   longer point to where we assume it points to. So we'll
714 		   just return the stream as empty until it's read again.
715 
716 		   It's a bit ugly to suddenly drop data from the stream that
717 		   was already read, but since this happens only with shared
718 		   parent istreams the caller is hopefully aware enough that
719 		   something like this might happen. The other solutions would
720 		   be to a) try to automatically read the data back (but we
721 		   can't handle errors..) or b) always copy data to stream's
722 		   own buffer instead of pointing to parent's buffer (but this
723 		   causes data copying that is nearly always unnecessary). */
724 		*size_r = 0;
725 		/* if we had already read until EOF, mark the stream again as
726 		   not being at the end of file. */
727 		if (stream->stream_errno == 0) {
728 			_stream->skip = _stream->pos = 0;
729 			stream->eof = FALSE;
730 		}
731 		return uchar_empty_ptr;
732 	}
733 
734         *size_r = _stream->pos - _stream->skip;
735         return _stream->buffer + _stream->skip;
736 }
737 
/* Return the number of unread bytes as reported by i_stream_get_data(). */
size_t i_stream_get_data_size(struct istream *stream)
{
	size_t avail;

	(void)i_stream_get_data(stream, &avail);
	return avail;
}
745 
i_stream_get_modifiable_data(struct istream * stream,size_t * size_r)746 unsigned char *i_stream_get_modifiable_data(struct istream *stream,
747 					    size_t *size_r)
748 {
749 	struct istream_private *_stream = stream->real_stream;
750 
751 	if (_stream->skip >= _stream->pos || _stream->w_buffer == NULL) {
752 		*size_r = 0;
753 		return NULL;
754 	}
755 
756         *size_r = _stream->pos - _stream->skip;
757         return _stream->w_buffer + _stream->skip;
758 }
759 
i_stream_read_data(struct istream * stream,const unsigned char ** data_r,size_t * size_r,size_t threshold)760 int i_stream_read_data(struct istream *stream, const unsigned char **data_r,
761 		       size_t *size_r, size_t threshold)
762 {
763 	ssize_t ret = 0;
764 	bool read_more = FALSE;
765 
766 	do {
767 		*data_r = i_stream_get_data(stream, size_r);
768 		if (*size_r > threshold)
769 			return 1;
770 
771 		/* we need more data */
772 		ret = i_stream_read(stream);
773 		if (ret > 0)
774 			read_more = TRUE;
775 	} while (ret > 0);
776 
777 	*data_r = i_stream_get_data(stream, size_r);
778 	if (ret == -2)
779 		return -2;
780 
781 	if (ret == 0) {
782 		/* need to read more */
783 		i_assert(!stream->blocking);
784 		return 0;
785 	}
786 	if (stream->eof) {
787 		if (read_more) {
788 			/* we read at least some new data */
789 			return 0;
790 		}
791 	} else {
792 		i_assert(stream->stream_errno != 0);
793 	}
794 	return -1;
795 }
796 
i_stream_compress(struct istream_private * stream)797 void i_stream_compress(struct istream_private *stream)
798 {
799 	i_assert(stream->memarea == NULL ||
800 		 memarea_get_refcount(stream->memarea) == 1);
801 
802 	if (stream->skip != stream->pos) {
803 		memmove(stream->w_buffer, stream->w_buffer + stream->skip,
804 			stream->pos - stream->skip);
805 	}
806 	stream->pos -= stream->skip;
807 
808 	stream->skip = 0;
809 }
810 
/* Free callback handed to memarea_init() by i_stream_w_buffer_realloc():
   releases the write buffer given as its context. */
static void i_stream_w_buffer_free(void *buf)
{
	i_free(buf);
}
815 
/* Give the stream a write buffer of stream->buffer_size bytes, keeping the
   first old_size bytes of the current one, and wrap the new buffer in a
   fresh memarea. The caller must have stored the new size in
   stream->buffer_size before calling this. */
static void
i_stream_w_buffer_realloc(struct istream_private *stream, size_t old_size)
{
	void *new_buffer;

	if (stream->memarea != NULL &&
	    memarea_get_refcount(stream->memarea) == 1) {
		/* Nobody else is referencing the memarea.
		   We can just reallocate it. */
		/* NOTE(review): going by its name, this drops the memarea
		   without running its free callback, which would leave
		   w_buffer alive for the i_realloc() below - confirm. */
		memarea_free_without_callback(&stream->memarea);
		new_buffer = i_realloc(stream->w_buffer, old_size,
				       stream->buffer_size);
	} else {
		/* No memarea yet, or someone else still references it:
		   copy into a new buffer and only drop our own reference. */
		new_buffer = i_malloc(stream->buffer_size);
		if (old_size > 0) {
			i_assert(stream->w_buffer != NULL);
			memcpy(new_buffer, stream->w_buffer, old_size);
		}
		if (stream->memarea != NULL)
			memarea_unref(&stream->memarea);
	}

	stream->w_buffer = new_buffer;
	stream->buffer = new_buffer;

	stream->memarea = memarea_init(stream->w_buffer, stream->buffer_size,
				       i_stream_w_buffer_free, new_buffer);
}
844 
i_stream_grow_buffer(struct istream_private * stream,size_t bytes)845 void i_stream_grow_buffer(struct istream_private *stream, size_t bytes)
846 {
847 	size_t old_size, max_size;
848 
849 	old_size = stream->buffer_size;
850 
851 	stream->buffer_size = stream->pos + bytes;
852 	if (stream->buffer_size <= stream->init_buffer_size)
853 		stream->buffer_size = stream->init_buffer_size;
854 	else
855 		stream->buffer_size = nearest_power(stream->buffer_size);
856 
857 	max_size = i_stream_get_max_buffer_size(&stream->istream);
858 	i_assert(max_size > 0);
859 	if (stream->buffer_size > max_size)
860 		stream->buffer_size = max_size;
861 
862 	if (stream->buffer_size <= old_size)
863 		stream->buffer_size = old_size;
864 	else
865 		i_stream_w_buffer_realloc(stream, old_size);
866 }
867 
/* Try to make room for wanted_size more bytes after stream->pos.

   If the free space at the end of the buffer is too small, then either the
   already skipped bytes are removed from the beginning of the buffer, or -
   when nothing has been skipped and the buffer is below the maximum size -
   the buffer is grown.

   *size_r is set to the space available after stream->pos, capped to
   try_alloc_limit when that is non-zero. It may be smaller than
   wanted_size. Returns TRUE if at least one byte is available. */
bool i_stream_try_alloc(struct istream_private *stream,
			size_t wanted_size, size_t *size_r)
{
	i_assert(wanted_size > 0);

	if (wanted_size > stream->buffer_size - stream->pos) {
		if (stream->skip > 0) {
			/* remove the unused bytes from beginning of buffer */
			if (stream->memarea != NULL &&
			    memarea_get_refcount(stream->memarea) > 1) {
				/* The memarea is still referenced. We can't
				   overwrite data until extra references are
				   gone. */
				/* reallocating at the same size gives this
				   stream a private copy to compress */
				i_stream_w_buffer_realloc(stream, stream->buffer_size);
			}
			i_stream_compress(stream);
		} else if (stream->buffer_size < i_stream_get_max_buffer_size(&stream->istream)) {
			/* buffer is full - grow it */
			i_stream_grow_buffer(stream, I_STREAM_MIN_SIZE);
		}
	}

	*size_r = stream->buffer_size - stream->pos;
	if (stream->try_alloc_limit > 0 &&
	    *size_r > stream->try_alloc_limit)
		*size_r = stream->try_alloc_limit;
	return *size_r > 0;
}
896 
897 bool ATTR_NOWARN_UNUSED_RESULT
i_stream_try_alloc_avoid_compress(struct istream_private * stream,size_t wanted_size,size_t * size_r)898 i_stream_try_alloc_avoid_compress(struct istream_private *stream,
899 				  size_t wanted_size, size_t *size_r)
900 {
901 	size_t old_skip = stream->skip;
902 
903 	/* try first with skip=0, so no compression is done */
904 	stream->skip = 0;
905 	bool ret = i_stream_try_alloc(stream, wanted_size, size_r);
906 	stream->skip = old_skip;
907 	if (ret || old_skip == 0)
908 		return ret;
909 	/* it's full. try with compression. */
910 	return i_stream_try_alloc(stream, wanted_size, size_r);
911 }
912 
i_stream_alloc(struct istream_private * stream,size_t size)913 void *i_stream_alloc(struct istream_private *stream, size_t size)
914 {
915 	size_t old_size, avail_size;
916 
917 	(void)i_stream_try_alloc(stream, size, &avail_size);
918 	if (avail_size < size) {
919 		old_size = stream->buffer_size;
920 		stream->buffer_size = nearest_power(stream->pos + size);
921 		i_stream_w_buffer_realloc(stream, old_size);
922 
923 		(void)i_stream_try_alloc(stream, size, &avail_size);
924 		i_assert(avail_size >= size);
925 	}
926 	return stream->w_buffer + stream->pos;
927 }
928 
i_stream_add_data(struct istream * _stream,const unsigned char * data,size_t size)929 bool i_stream_add_data(struct istream *_stream, const unsigned char *data,
930 		       size_t size)
931 {
932 	struct istream_private *stream = _stream->real_stream;
933 	size_t size2;
934 
935 	(void)i_stream_try_alloc(stream, size, &size2);
936 	if (size > size2)
937 		return FALSE;
938 
939 	memcpy(stream->w_buffer + stream->pos, data, size);
940 	stream->pos += size;
941 	return TRUE;
942 }
943 
i_stream_get_root_io(struct istream * stream)944 struct istream *i_stream_get_root_io(struct istream *stream)
945 {
946 	while (stream->real_stream->parent != NULL) {
947 		i_assert(stream->real_stream->io == NULL);
948 		stream = stream->real_stream->parent;
949 	}
950 	return stream;
951 }
952 
i_stream_set_input_pending(struct istream * stream,bool pending)953 void i_stream_set_input_pending(struct istream *stream, bool pending)
954 {
955 	if (!pending)
956 		return;
957 
958 	stream = i_stream_get_root_io(stream);
959 	if (stream->real_stream->io != NULL)
960 		io_set_pending(stream->real_stream->io);
961 	else
962 		stream->real_stream->io_pending = TRUE;
963 }
964 
i_stream_switch_ioloop_to(struct istream * stream,struct ioloop * ioloop)965 void i_stream_switch_ioloop_to(struct istream *stream, struct ioloop *ioloop)
966 {
967 	io_stream_switch_ioloop_to(&stream->real_stream->iostream, ioloop);
968 
969 	do {
970 		if (stream->real_stream->switch_ioloop_to != NULL) {
971 			stream->real_stream->switch_ioloop_to(
972 				stream->real_stream, ioloop);
973 		}
974 		stream = stream->real_stream->parent;
975 	} while (stream != NULL);
976 }
977 
i_stream_switch_ioloop(struct istream * stream)978 void i_stream_switch_ioloop(struct istream *stream)
979 {
980 	i_stream_switch_ioloop_to(stream, current_ioloop);
981 }
982 
i_stream_set_io(struct istream * stream,struct io * io)983 void i_stream_set_io(struct istream *stream, struct io *io)
984 {
985 	stream = i_stream_get_root_io(stream);
986 
987 	i_assert(stream->real_stream->io == NULL);
988 	stream->real_stream->io = io;
989 	if (stream->real_stream->io_pending) {
990 		io_set_pending(io);
991 		stream->real_stream->io_pending = FALSE;
992 	}
993 }
994 
i_stream_unset_io(struct istream * stream,struct io * io)995 void i_stream_unset_io(struct istream *stream, struct io *io)
996 {
997 	stream = i_stream_get_root_io(stream);
998 
999 	i_assert(stream->real_stream->io == io);
1000 	if (io_is_pending(io))
1001 		stream->real_stream->io_pending = TRUE;
1002 	stream->real_stream->io = NULL;
1003 }
1004 
1005 static void
i_stream_default_set_max_buffer_size(struct iostream_private * stream,size_t max_size)1006 i_stream_default_set_max_buffer_size(struct iostream_private *stream,
1007 				     size_t max_size)
1008 {
1009 	struct istream_private *_stream =
1010 		container_of(stream, struct istream_private, iostream);
1011 
1012 	_stream->max_buffer_size = max_size;
1013 	if (_stream->parent != NULL)
1014 		i_stream_set_max_buffer_size(_stream->parent, max_size);
1015 }
1016 
i_stream_default_close(struct iostream_private * stream,bool close_parent)1017 static void i_stream_default_close(struct iostream_private *stream,
1018 				   bool close_parent)
1019 {
1020 	struct istream_private *_stream =
1021 		container_of(stream, struct istream_private, iostream);
1022 
1023 	if (close_parent)
1024 		i_stream_close(_stream->parent);
1025 }
1026 
i_stream_default_destroy(struct iostream_private * stream)1027 static void i_stream_default_destroy(struct iostream_private *stream)
1028 {
1029 	struct istream_private *_stream =
1030 		container_of(stream, struct istream_private, iostream);
1031 
1032 	i_stream_free_buffer(_stream);
1033 	i_stream_unref(&_stream->parent);
1034 }
1035 
1036 static void
i_stream_default_seek_seekable(struct istream_private * stream,uoff_t v_offset,bool mark ATTR_UNUSED)1037 i_stream_default_seek_seekable(struct istream_private *stream,
1038 			       uoff_t v_offset, bool mark ATTR_UNUSED)
1039 {
1040 	stream->istream.v_offset = v_offset;
1041 	stream->skip = stream->pos = 0;
1042 }
1043 
/* Default seek() for non-seekable streams: emulate a forward seek by
   reading and skipping data until the stream's v_offset reaches the
   requested offset.

   Seeking backwards is not supported and panics. If a read yields no
   data before the target is reached, the function returns early: when
   the read itself set stream_errno it is left as is, otherwise an error
   message is set and stream_errno becomes ESPIPE. The mark parameter is
   unused. */
void i_stream_default_seek_nonseekable(struct istream_private *stream,
				       uoff_t v_offset, bool mark ATTR_UNUSED)
{
	struct istream *input = &stream->istream;
	size_t buffered;
	uoff_t remaining;

	if (input->v_offset > v_offset) {
		i_panic("stream %s doesn't support seeking backwards",
			i_stream_get_name(input));
	}

	while (input->v_offset < v_offset) {
		(void)i_stream_read(input);

		buffered = stream->pos - stream->skip;
		if (buffered > 0) {
			/* consume the buffered data, but never go past
			   the requested offset */
			remaining = v_offset - input->v_offset;
			i_stream_skip(input, buffered <= remaining ?
				      buffered : remaining);
			continue;
		}

		if (input->stream_errno != 0) {
			/* read failed */
			return;
		}
		/* no more data, and the target wasn't reached */
		io_stream_set_error(&stream->iostream,
			"Can't seek to offset %"PRIuUOFF_T
			", because we have data only up to offset %"
			PRIuUOFF_T" (eof=%d)", v_offset,
			input->v_offset, input->eof ? 1 : 0);
		input->stream_errno = ESPIPE;
		return;
	}
}
1078 
/* Try to seek a non-seekable stream to v_offset using the data that is
   already buffered.

   Returns FALSE if the target lies before the start of the buffer. In
   that case the parent is seeked back to parent_start_offset and this
   stream's state (skip, pos, v_offset, high_pos) is reset to the
   beginning; presumably the caller is then expected to read forward to
   v_offset itself.

   Returns TRUE otherwise: either the position was moved within the
   buffered data, or the stream was read forward with
   i_stream_default_seek_nonseekable(). */
bool i_stream_nonseekable_try_seek(struct istream_private *stream,
				   uoff_t v_offset)
{
	/* virtual offset corresponding to the first byte of the buffer */
	const uoff_t buf_start_offset =
		stream->istream.v_offset - stream->skip;

	if (v_offset < buf_start_offset) {
		/* have to seek backwards */
		i_stream_seek(stream->parent, stream->parent_start_offset);
		stream->parent_expected_offset = stream->parent_start_offset;
		stream->skip = stream->pos = 0;
		stream->istream.v_offset = 0;
		stream->high_pos = 0;
		return FALSE;
	}

	if (v_offset > buf_start_offset + stream->pos) {
		/* beyond the buffered data - read forward */
		i_stream_default_seek_nonseekable(stream, v_offset, FALSE);
		return TRUE;
	}

	/* the target is within what's already buffered: move skip/pos there,
	   remembering in high_pos how far the buffer had been filled
	   (unless a previous seek already recorded it) */
	stream->skip = v_offset - buf_start_offset;
	stream->istream.v_offset = v_offset;
	if (stream->high_pos == 0)
		stream->high_pos = stream->pos;
	stream->pos = stream->skip;
	return TRUE;
}
1107 
1108 static int
seekable_i_stream_get_size(struct istream_private * stream)1109 seekable_i_stream_get_size(struct istream_private *stream)
1110 {
1111 	if (stream->cached_stream_size == UOFF_T_MAX) {
1112 		uoff_t old_offset = stream->istream.v_offset;
1113 		ssize_t ret;
1114 
1115 		do {
1116 			i_stream_skip(&stream->istream,
1117 				i_stream_get_data_size(&stream->istream));
1118 		} while ((ret = i_stream_read(&stream->istream)) > 0);
1119 		i_assert(ret == -1);
1120 		if (stream->istream.stream_errno != 0)
1121 			return -1;
1122 
1123 		stream->cached_stream_size = stream->istream.v_offset;
1124 		i_stream_seek(&stream->istream, old_offset);
1125 	}
1126 	stream->statbuf.st_size = stream->cached_stream_size;
1127 	return 0;
1128 }
1129 
1130 static int
i_stream_default_stat(struct istream_private * stream,bool exact)1131 i_stream_default_stat(struct istream_private *stream, bool exact)
1132 {
1133 	const struct stat *st;
1134 
1135 	if (stream->parent == NULL)
1136 		return stream->istream.stream_errno == 0 ? 0 : -1;
1137 
1138 	if (i_stream_stat(stream->parent, exact, &st) < 0) {
1139 		stream->istream.stream_errno = stream->parent->stream_errno;
1140 		return -1;
1141 	}
1142 	stream->statbuf = *st;
1143 	if (exact && !stream->stream_size_passthrough) {
1144 		/* exact size is not known, even if parent returned something */
1145 		stream->statbuf.st_size = -1;
1146 		if (stream->istream.seekable) {
1147 			if (seekable_i_stream_get_size(stream) < 0)
1148 				return -1;
1149 		}
1150 	} else {
1151 		/* When exact=FALSE always return the parent stat's size, even
1152 		   if we know the exact value. This is necessary because
1153 		   otherwise e.g. mbox code can see two different values and
1154 		   think that the mbox file keeps changing. */
1155 	}
1156 	return 0;
1157 }
1158 
1159 static int
i_stream_default_get_size(struct istream_private * stream,bool exact,uoff_t * size_r)1160 i_stream_default_get_size(struct istream_private *stream,
1161 			  bool exact, uoff_t *size_r)
1162 {
1163 	if (stream->stat(stream, exact) < 0)
1164 		return -1;
1165 	if (stream->statbuf.st_size == -1)
1166 		return 0;
1167 
1168 	*size_r = stream->statbuf.st_size;
1169 	return 1;
1170 }
1171 
i_stream_init_parent(struct istream_private * _stream,struct istream * parent)1172 void i_stream_init_parent(struct istream_private *_stream,
1173 			  struct istream *parent)
1174 {
1175 	_stream->access_counter = parent->real_stream->access_counter;
1176 	_stream->parent = parent;
1177 	_stream->parent_start_offset = parent->v_offset;
1178 	_stream->parent_expected_offset = parent->v_offset;
1179 	_stream->start_offset = parent->v_offset;
1180 	/* if parent stream is an istream-error, copy the error */
1181 	_stream->istream.stream_errno = parent->stream_errno;
1182 	_stream->istream.eof = parent->eof;
1183 	i_stream_ref(parent);
1184 }
1185 
1186 struct istream *
i_stream_create(struct istream_private * _stream,struct istream * parent,int fd,enum istream_create_flag flags)1187 i_stream_create(struct istream_private *_stream, struct istream *parent, int fd,
1188 		enum istream_create_flag flags)
1189 {
1190 	bool noop_snapshot = (flags & ISTREAM_CREATE_FLAG_NOOP_SNAPSHOT) != 0;
1191 
1192 	_stream->fd = fd;
1193 	if (parent != NULL)
1194 		i_stream_init_parent(_stream, parent);
1195 	else if (_stream->memarea == NULL && !noop_snapshot) {
1196 		/* The stream has no parent and no memarea yet. We'll assume
1197 		   that it wants to be using memareas for the reads. */
1198 		_stream->memarea = memarea_init_empty();
1199 	}
1200 	_stream->istream.real_stream = _stream;
1201 
1202 	if (_stream->iostream.close == NULL)
1203 		_stream->iostream.close = i_stream_default_close;
1204 	if (_stream->iostream.destroy == NULL)
1205 		_stream->iostream.destroy = i_stream_default_destroy;
1206 	if (_stream->seek == NULL) {
1207 		_stream->seek = _stream->istream.seekable ?
1208 			i_stream_default_seek_seekable :
1209 			i_stream_default_seek_nonseekable;
1210 	}
1211 	if (_stream->stat == NULL)
1212 		_stream->stat = i_stream_default_stat;
1213 	if (_stream->get_size == NULL)
1214 		_stream->get_size = i_stream_default_get_size;
1215 	if (_stream->snapshot == NULL) {
1216 		_stream->snapshot = noop_snapshot ?
1217 			i_stream_noop_snapshot :
1218 			i_stream_default_snapshot;
1219 	}
1220 	if (_stream->iostream.set_max_buffer_size == NULL) {
1221 		_stream->iostream.set_max_buffer_size =
1222 			i_stream_default_set_max_buffer_size;
1223 	}
1224 	if (_stream->init_buffer_size == 0)
1225 		_stream->init_buffer_size = I_STREAM_MIN_SIZE;
1226 
1227 	i_zero(&_stream->statbuf);
1228 	_stream->statbuf.st_size = -1;
1229 	_stream->statbuf.st_atime =
1230 		_stream->statbuf.st_mtime =
1231 		_stream->statbuf.st_ctime = ioloop_time;
1232 	_stream->cached_stream_size = UOFF_T_MAX;
1233 
1234 	io_stream_init(&_stream->iostream);
1235 
1236 	if (_stream->istream.stream_errno != 0)
1237 		_stream->istream.eof = TRUE;
1238 
1239 	return &_stream->istream;
1240 }
1241 
i_stream_create_error(int stream_errno)1242 struct istream *i_stream_create_error(int stream_errno)
1243 {
1244 	struct istream_private *stream;
1245 
1246 	stream = i_new(struct istream_private, 1);
1247 	stream->istream.closed = TRUE;
1248 	stream->istream.readable_fd = FALSE;
1249 	stream->istream.blocking = TRUE;
1250 	stream->istream.seekable = TRUE;
1251 	stream->istream.eof = TRUE;
1252 	stream->istream.stream_errno = stream_errno;
1253 	/* Nothing can ever actually be read from this stream, but set a
1254 	   reasonable max_buffer_size anyway since some filter istreams don't
1255 	   behave properly otherwise. */
1256 	stream->max_buffer_size = IO_BLOCK_SIZE;
1257 	i_stream_create(stream, NULL, -1, 0);
1258 	i_stream_set_name(&stream->istream, "(error)");
1259 	return &stream->istream;
1260 }
1261 
1262 struct istream *
i_stream_create_error_str(int stream_errno,const char * fmt,...)1263 i_stream_create_error_str(int stream_errno, const char *fmt, ...)
1264 {
1265 	struct istream *input;
1266 	va_list args;
1267 
1268 	va_start(args, fmt);
1269 	input = i_stream_create_error(stream_errno);
1270 	io_stream_set_verror(&input->real_stream->iostream, fmt, args);
1271 	va_end(args);
1272 	return input;
1273 }
1274