1 /* Copyright (c) 2002-2018 Dovecot authors, see the included COPYING file */
2 
3 #include "lib.h"
4 #include "istream.h"
5 #include "ostream-private.h"
6 
o_stream_set_name(struct ostream * stream,const char * name)7 void o_stream_set_name(struct ostream *stream, const char *name)
8 {
9 	i_free(stream->real_stream->iostream.name);
10 	stream->real_stream->iostream.name = i_strdup(name);
11 }
12 
o_stream_get_name(struct ostream * stream)13 const char *o_stream_get_name(struct ostream *stream)
14 {
15 	while (stream->real_stream->iostream.name == NULL) {
16 		stream = stream->real_stream->parent;
17 		if (stream == NULL)
18 			return "";
19 	}
20 	return stream->real_stream->iostream.name;
21 }
22 
o_stream_get_fd(struct ostream * stream)23 int o_stream_get_fd(struct ostream *stream)
24 {
25 	return stream->real_stream->fd;
26 }
27 
o_stream_get_error(struct ostream * stream)28 const char *o_stream_get_error(struct ostream *stream)
29 {
30 	struct ostream *s;
31 
32 	/* we'll only return errors for streams that have stream_errno set.
33 	   we might be returning unintended error otherwise. */
34 	if (stream->stream_errno == 0)
35 		return "<no error>";
36 
37 	for (s = stream; s != NULL; s = s->real_stream->parent) {
38 		if (s->stream_errno == 0)
39 			break;
40 		if (s->real_stream->iostream.error != NULL)
41 			return s->real_stream->iostream.error;
42 	}
43 	return strerror(stream->stream_errno);
44 }
45 
o_stream_get_disconnect_reason(struct ostream * stream)46 const char *o_stream_get_disconnect_reason(struct ostream *stream)
47 {
48 	return io_stream_get_disconnect_reason(NULL, stream);
49 }
50 
o_stream_close_full(struct ostream * stream,bool close_parents)51 static void o_stream_close_full(struct ostream *stream, bool close_parents)
52 {
53 	/* Ideally o_stream_finish() would be called for all non-failed
54 	   ostreams, but strictly requiring it would cause unnecessary
55 	   complexity for many callers. Just require that at this point
56 	   after flushing there isn't anything in the output buffer or that
57 	   we're ignoring all errors. */
58 	if (o_stream_flush(stream) == 0)
59 		i_assert(stream->real_stream->error_handling_disabled);
60 
61 	if (!stream->closed && !stream->real_stream->closing) {
62 		/* first mark the stream as being closed so the
63 		   o_stream_copy_error_from_parent() won't recurse us back
64 		   here. but don't immediately mark the stream closed, because
65 		   we may still want to write something to it. */
66 		stream->real_stream->closing = TRUE;
67 		io_stream_close(&stream->real_stream->iostream, close_parents);
68 		stream->closed = TRUE;
69 	}
70 
71 	if (stream->stream_errno == 0)
72 		stream->stream_errno = EPIPE;
73 }
74 
o_stream_destroy(struct ostream ** _stream)75 void o_stream_destroy(struct ostream **_stream)
76 {
77 	struct ostream *stream = *_stream;
78 
79 	if (stream == NULL)
80 		return;
81 
82 	*_stream = NULL;
83 	o_stream_close_full(stream, FALSE);
84 	o_stream_unref(&stream);
85 }
86 
o_stream_ref(struct ostream * stream)87 void o_stream_ref(struct ostream *stream)
88 {
89 	io_stream_ref(&stream->real_stream->iostream);
90 }
91 
o_stream_unref(struct ostream ** _stream)92 void o_stream_unref(struct ostream **_stream)
93 {
94 	struct ostream *stream;
95 
96 	if (*_stream == NULL)
97 		return;
98 
99 	stream = *_stream;
100 
101 	if (stream->real_stream->last_errors_not_checked &&
102 	    !stream->real_stream->error_handling_disabled &&
103 	    stream->real_stream->iostream.refcount == 1) {
104 		i_panic("output stream %s is missing error handling",
105 			o_stream_get_name(stream));
106 	}
107 
108 	if (!io_stream_unref(&stream->real_stream->iostream))
109 		io_stream_free(&stream->real_stream->iostream);
110 	*_stream = NULL;
111 }
112 
113 #undef o_stream_add_destroy_callback
o_stream_add_destroy_callback(struct ostream * stream,ostream_callback_t * callback,void * context)114 void o_stream_add_destroy_callback(struct ostream *stream,
115 				   ostream_callback_t *callback, void *context)
116 {
117 	io_stream_add_destroy_callback(&stream->real_stream->iostream,
118 				       callback, context);
119 }
120 
o_stream_remove_destroy_callback(struct ostream * stream,void (* callback)())121 void o_stream_remove_destroy_callback(struct ostream *stream,
122 				      void (*callback)())
123 {
124 	io_stream_remove_destroy_callback(&stream->real_stream->iostream,
125 					  callback);
126 }
127 
o_stream_close(struct ostream * stream)128 void o_stream_close(struct ostream *stream)
129 {
130 	if (stream != NULL)
131 		o_stream_close_full(stream, TRUE);
132 }
133 
134 #undef o_stream_set_flush_callback
o_stream_set_flush_callback(struct ostream * stream,stream_flush_callback_t * callback,void * context)135 void o_stream_set_flush_callback(struct ostream *stream,
136 				 stream_flush_callback_t *callback,
137 				 void *context)
138 {
139 	struct ostream_private *_stream = stream->real_stream;
140 
141 	_stream->set_flush_callback(_stream, callback, context);
142 }
143 
o_stream_unset_flush_callback(struct ostream * stream)144 void o_stream_unset_flush_callback(struct ostream *stream)
145 {
146 	struct ostream_private *_stream = stream->real_stream;
147 
148 	_stream->set_flush_callback(_stream, NULL, NULL);
149 }
150 
o_stream_set_max_buffer_size(struct ostream * stream,size_t max_size)151 void o_stream_set_max_buffer_size(struct ostream *stream, size_t max_size)
152 {
153 	io_stream_set_max_buffer_size(&stream->real_stream->iostream, max_size);
154 }
155 
o_stream_get_max_buffer_size(struct ostream * stream)156 size_t o_stream_get_max_buffer_size(struct ostream *stream)
157 {
158 	return stream->real_stream->max_buffer_size;
159 }
160 
o_stream_cork(struct ostream * stream)161 void o_stream_cork(struct ostream *stream)
162 {
163 	struct ostream_private *_stream = stream->real_stream;
164 
165 	if (unlikely(stream->closed || stream->stream_errno != 0))
166 		return;
167 
168 	_stream->cork(_stream, TRUE);
169 }
170 
o_stream_uncork(struct ostream * stream)171 void o_stream_uncork(struct ostream *stream)
172 {
173 	struct ostream_private *_stream = stream->real_stream;
174 
175 	if (unlikely(stream->closed || stream->stream_errno != 0))
176 		return;
177 
178 	_stream->cork(_stream, FALSE);
179 }
180 
o_stream_is_corked(struct ostream * stream)181 bool o_stream_is_corked(struct ostream *stream)
182 {
183 	struct ostream_private *_stream = stream->real_stream;
184 
185 	return _stream->corked;
186 }
187 
o_stream_flush(struct ostream * stream)188 int o_stream_flush(struct ostream *stream)
189 {
190 	struct ostream_private *_stream = stream->real_stream;
191 	int ret = 1;
192 
193 	o_stream_ignore_last_errors(stream);
194 
195 	if (unlikely(stream->closed || stream->stream_errno != 0)) {
196 		errno = stream->stream_errno;
197 		return -1;
198 	}
199 
200 	if (unlikely(_stream->noverflow)) {
201 		io_stream_set_error(&_stream->iostream,
202 			"Output stream buffer was full (%zu bytes)",
203 			o_stream_get_max_buffer_size(stream));
204 		errno = stream->stream_errno = ENOBUFS;
205 		return -1;
206 	}
207 
208 	if (unlikely((ret = _stream->flush(_stream)) < 0)) {
209 		i_assert(stream->stream_errno != 0);
210 		errno = stream->stream_errno;
211 	}
212 	return ret;
213 }
214 
o_stream_set_flush_pending(struct ostream * stream,bool set)215 void o_stream_set_flush_pending(struct ostream *stream, bool set)
216 {
217 	struct ostream_private *_stream = stream->real_stream;
218 
219 	if (unlikely(stream->closed || stream->stream_errno != 0))
220 		return;
221 
222 	_stream->flush_pending(_stream, set);
223 }
224 
o_stream_get_buffer_used_size(const struct ostream * stream)225 size_t o_stream_get_buffer_used_size(const struct ostream *stream)
226 {
227 	const struct ostream_private *_stream = stream->real_stream;
228 
229 	return _stream->get_buffer_used_size(_stream);
230 }
231 
o_stream_get_buffer_avail_size(const struct ostream * stream)232 size_t o_stream_get_buffer_avail_size(const struct ostream *stream)
233 {
234 	const struct ostream_private *_stream = stream->real_stream;
235 
236 	return _stream->get_buffer_avail_size(_stream);
237 }
238 
o_stream_seek(struct ostream * stream,uoff_t offset)239 int o_stream_seek(struct ostream *stream, uoff_t offset)
240 {
241 	struct ostream_private *_stream = stream->real_stream;
242 
243 	if (unlikely(stream->closed || stream->stream_errno != 0)) {
244 		errno = stream->stream_errno;
245 		return -1;
246 	}
247 
248 	if (unlikely(_stream->seek(_stream, offset) < 0)) {
249 		i_assert(stream->stream_errno != 0);
250 		errno = stream->stream_errno;
251 		return -1;
252 	}
253 	return 1;
254 }
255 
o_stream_send(struct ostream * stream,const void * data,size_t size)256 ssize_t o_stream_send(struct ostream *stream, const void *data, size_t size)
257 {
258 	struct const_iovec iov;
259 
260 	i_zero(&iov);
261 	iov.iov_base = data;
262 	iov.iov_len = size;
263 
264  	return o_stream_sendv(stream, &iov, 1);
265 }
266 
267 static ssize_t
o_stream_sendv_int(struct ostream * stream,const struct const_iovec * iov,unsigned int iov_count,bool * overflow_r)268 o_stream_sendv_int(struct ostream *stream, const struct const_iovec *iov,
269 		   unsigned int iov_count, bool *overflow_r)
270 {
271 	struct ostream_private *_stream = stream->real_stream;
272 	unsigned int i;
273 	size_t total_size;
274 	ssize_t ret;
275 
276 	*overflow_r = FALSE;
277 
278 	for (i = 0, total_size = 0; i < iov_count; i++)
279 		total_size += iov[i].iov_len;
280 	if (total_size == 0)
281 		return 0;
282 
283 	i_assert(!_stream->finished);
284 	ret = _stream->sendv(_stream, iov, iov_count);
285 	if (ret > 0)
286 		stream->real_stream->last_write_timeval = ioloop_timeval;
287 	if (unlikely(ret != (ssize_t)total_size)) {
288 		if (ret < 0) {
289 			i_assert(stream->stream_errno != 0);
290 			errno = stream->stream_errno;
291 		} else {
292 			i_assert(!stream->blocking);
293 			stream->overflow = TRUE;
294 			*overflow_r = TRUE;
295 		}
296 	}
297 	return ret;
298 }
299 
o_stream_sendv(struct ostream * stream,const struct const_iovec * iov,unsigned int iov_count)300 ssize_t o_stream_sendv(struct ostream *stream, const struct const_iovec *iov,
301 		       unsigned int iov_count)
302 {
303 	bool overflow;
304 
305 	if (unlikely(stream->closed || stream->stream_errno != 0)) {
306 		errno = stream->stream_errno;
307 		return -1;
308 	}
309 	return o_stream_sendv_int(stream, iov, iov_count, &overflow);
310 }
311 
o_stream_send_str(struct ostream * stream,const char * str)312 ssize_t o_stream_send_str(struct ostream *stream, const char *str)
313 {
314 	return o_stream_send(stream, str, strlen(str));
315 }
316 
o_stream_nsend(struct ostream * stream,const void * data,size_t size)317 void o_stream_nsend(struct ostream *stream, const void *data, size_t size)
318 {
319 	struct const_iovec iov;
320 
321 	i_zero(&iov);
322 	iov.iov_base = data;
323 	iov.iov_len = size;
324 
325 	o_stream_nsendv(stream, &iov, 1);
326 }
327 
o_stream_nsendv(struct ostream * stream,const struct const_iovec * iov,unsigned int iov_count)328 void o_stream_nsendv(struct ostream *stream, const struct const_iovec *iov,
329 		     unsigned int iov_count)
330 {
331 	bool overflow;
332 
333 	if (unlikely(stream->closed || stream->stream_errno != 0 ||
334 		     stream->real_stream->noverflow))
335 		return;
336 	(void)o_stream_sendv_int(stream, iov, iov_count, &overflow);
337 	if (overflow)
338 		stream->real_stream->noverflow = TRUE;
339 	stream->real_stream->last_errors_not_checked = TRUE;
340 }
341 
o_stream_nsend_str(struct ostream * stream,const char * str)342 void o_stream_nsend_str(struct ostream *stream, const char *str)
343 {
344 	o_stream_nsend(stream, str, strlen(str));
345 }
346 
o_stream_finish(struct ostream * stream)347 int o_stream_finish(struct ostream *stream)
348 {
349 	stream->real_stream->finished = TRUE;
350 	return o_stream_flush(stream);
351 }
352 
o_stream_set_finish_also_parent(struct ostream * stream,bool set)353 void o_stream_set_finish_also_parent(struct ostream *stream, bool set)
354 {
355 	stream->real_stream->finish_also_parent = set;
356 }
357 
o_stream_set_finish_via_child(struct ostream * stream,bool set)358 void o_stream_set_finish_via_child(struct ostream *stream, bool set)
359 {
360 	stream->real_stream->finish_via_child = set;
361 }
362 
o_stream_ignore_last_errors(struct ostream * stream)363 void o_stream_ignore_last_errors(struct ostream *stream)
364 {
365 	while (stream != NULL) {
366 		stream->real_stream->last_errors_not_checked = FALSE;
367 		stream = stream->real_stream->parent;
368 	}
369 }
370 
o_stream_abort(struct ostream * stream)371 void o_stream_abort(struct ostream *stream)
372 {
373 	o_stream_ignore_last_errors(stream);
374 	if (stream->stream_errno != 0)
375 		return;
376 	io_stream_set_error(&stream->real_stream->iostream, "aborted writing");
377 	stream->stream_errno = EPIPE;
378 }
379 
o_stream_set_no_error_handling(struct ostream * stream,bool set)380 void o_stream_set_no_error_handling(struct ostream *stream, bool set)
381 {
382 	stream->real_stream->error_handling_disabled = set;
383 }
384 
385 enum ostream_send_istream_result
o_stream_send_istream(struct ostream * outstream,struct istream * instream)386 o_stream_send_istream(struct ostream *outstream, struct istream *instream)
387 {
388 	struct ostream_private *_outstream = outstream->real_stream;
389 	uoff_t old_outstream_offset = outstream->offset;
390 	uoff_t old_instream_offset = instream->v_offset;
391 	enum ostream_send_istream_result res;
392 
393 	if (unlikely(instream->closed || instream->stream_errno != 0)) {
394 		errno = instream->stream_errno;
395 		return OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT;
396 	}
397 	if (unlikely(outstream->closed || outstream->stream_errno != 0)) {
398 		errno = outstream->stream_errno;
399 		return OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT;
400 	}
401 
402 	i_assert(!_outstream->finished);
403 	res = _outstream->send_istream(_outstream, instream);
404 	switch (res) {
405 	case OSTREAM_SEND_ISTREAM_RESULT_FINISHED:
406 		i_assert(instream->stream_errno == 0);
407 		i_assert(outstream->stream_errno == 0);
408 		i_assert(!i_stream_have_bytes_left(instream));
409 		break;
410 	case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT:
411 		i_assert(!instream->blocking);
412 		break;
413 	case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT:
414 		i_assert(!outstream->blocking);
415 		o_stream_set_flush_pending(outstream, TRUE);
416 		break;
417 	case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT:
418 		i_assert(instream->stream_errno != 0);
419 		return res;
420 	case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT:
421 		i_assert(outstream->stream_errno != 0);
422 		return res;
423 	}
424 	/* non-failure - make sure stream offsets match */
425 	i_assert((outstream->offset - old_outstream_offset) ==
426 		 (instream->v_offset - old_instream_offset));
427 
428 	if (outstream->offset != old_outstream_offset)
429 		outstream->real_stream->last_write_timeval = ioloop_timeval;
430 	return res;
431 }
432 
o_stream_nsend_istream(struct ostream * outstream,struct istream * instream)433 void o_stream_nsend_istream(struct ostream *outstream, struct istream *instream)
434 {
435 	i_assert(instream->blocking);
436 
437 	switch (o_stream_send_istream(outstream, instream)) {
438 	case OSTREAM_SEND_ISTREAM_RESULT_FINISHED:
439 		break;
440 	case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT:
441 		i_unreached();
442 	case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT:
443 		outstream->real_stream->noverflow = TRUE;
444 		break;
445 	case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT:
446 		outstream->stream_errno = instream->stream_errno;
447 		io_stream_set_error(&outstream->real_stream->iostream,
448 			"nsend-istream: read(%s) failed: %s",
449 			i_stream_get_name(instream),
450 			i_stream_get_error(instream));
451 		break;
452 	case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT:
453 		break;
454 	}
455 	outstream->real_stream->last_errors_not_checked = TRUE;
456 }
457 
o_stream_pwrite(struct ostream * stream,const void * data,size_t size,uoff_t offset)458 int o_stream_pwrite(struct ostream *stream, const void *data, size_t size,
459 		    uoff_t offset)
460 {
461 	int ret;
462 
463 	if (unlikely(stream->closed || stream->stream_errno != 0)) {
464 		errno = stream->stream_errno;
465 		return -1;
466 	}
467 
468 	i_assert(!stream->real_stream->finished);
469 	ret = stream->real_stream->write_at(stream->real_stream,
470 					    data, size, offset);
471 	if (ret > 0)
472 		stream->real_stream->last_write_timeval = ioloop_timeval;
473 	else if (unlikely(ret < 0)) {
474 		i_assert(stream->stream_errno != 0);
475 		errno = stream->stream_errno;
476 	}
477 
478 	return ret;
479 }
480 
o_stream_get_last_write_time(struct ostream * stream,struct timeval * tv_r)481 void o_stream_get_last_write_time(struct ostream *stream, struct timeval *tv_r)
482 {
483 	*tv_r = stream->real_stream->last_write_timeval;
484 }
485 
486 enum ostream_send_istream_result
io_stream_copy(struct ostream * outstream,struct istream * instream)487 io_stream_copy(struct ostream *outstream, struct istream *instream)
488 {
489 	struct const_iovec iov;
490 	const unsigned char *data;
491 	ssize_t ret;
492 
493 	while (i_stream_read_more(instream, &data, &iov.iov_len) > 0) {
494 		iov.iov_base = data;
495 		if ((ret = o_stream_sendv(outstream, &iov, 1)) < 0)
496 			return OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT;
497 		else if (ret == 0)
498 			return OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT;
499 		i_stream_skip(instream, ret);
500 	}
501 
502 	if (instream->stream_errno != 0)
503 		return OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT;
504 	if (i_stream_have_bytes_left(instream))
505 		return OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT;
506 	return OSTREAM_SEND_ISTREAM_RESULT_FINISHED;
507 }
508 
o_stream_switch_ioloop_to(struct ostream * stream,struct ioloop * ioloop)509 void o_stream_switch_ioloop_to(struct ostream *stream, struct ioloop *ioloop)
510 {
511 	struct ostream_private *_stream = stream->real_stream;
512 
513 	io_stream_switch_ioloop_to(&_stream->iostream, ioloop);
514 
515 	_stream->switch_ioloop_to(_stream, ioloop);
516 }
517 
o_stream_switch_ioloop(struct ostream * stream)518 void o_stream_switch_ioloop(struct ostream *stream)
519 {
520 	o_stream_switch_ioloop_to(stream, current_ioloop);
521 }
522 
o_stream_default_close(struct iostream_private * stream,bool close_parent)523 static void o_stream_default_close(struct iostream_private *stream,
524 				   bool close_parent)
525 {
526 	struct ostream_private *_stream =
527 		container_of(stream, struct ostream_private, iostream);
528 
529 	(void)o_stream_flush(&_stream->ostream);
530 	if (close_parent)
531 		o_stream_close(_stream->parent);
532 }
533 
o_stream_default_destroy(struct iostream_private * stream)534 static void o_stream_default_destroy(struct iostream_private *stream)
535 {
536 	struct ostream_private *_stream =
537 		container_of(stream, struct ostream_private, iostream);
538 
539 	o_stream_unref(&_stream->parent);
540 }
541 
542 static void
o_stream_default_set_max_buffer_size(struct iostream_private * stream,size_t max_size)543 o_stream_default_set_max_buffer_size(struct iostream_private *stream,
544 				     size_t max_size)
545 {
546 	struct ostream_private *_stream =
547 		container_of(stream, struct ostream_private, iostream);
548 
549 	if (_stream->parent != NULL)
550 		o_stream_set_max_buffer_size(_stream->parent, max_size);
551 	_stream->max_buffer_size = max_size;
552 }
553 
o_stream_default_cork(struct ostream_private * _stream,bool set)554 static void o_stream_default_cork(struct ostream_private *_stream, bool set)
555 {
556 	_stream->corked = set;
557 	if (set) {
558 		if (_stream->parent != NULL)
559 			o_stream_cork(_stream->parent);
560 	} else {
561 		(void)o_stream_flush(&_stream->ostream);
562 		_stream->last_errors_not_checked = TRUE;
563 
564 		if (_stream->parent != NULL)
565 			o_stream_uncork(_stream->parent);
566 	}
567 }
568 
o_stream_copy_error_from_parent(struct ostream_private * _stream)569 void o_stream_copy_error_from_parent(struct ostream_private *_stream)
570 {
571 	struct ostream *src = _stream->parent;
572 	struct ostream *dest = &_stream->ostream;
573 
574 	dest->stream_errno = src->stream_errno;
575 	dest->overflow = src->overflow;
576 	if (src->closed)
577 		o_stream_close(dest);
578 }
579 
o_stream_flush_parent_if_needed(struct ostream_private * _stream)580 int o_stream_flush_parent_if_needed(struct ostream_private *_stream)
581 {
582 	if (o_stream_get_buffer_used_size(_stream->parent) >= IO_BLOCK_SIZE) {
583 		/* we already have quite a lot of data in parent stream.
584 		   unless we can flush it, don't add any more to it or we
585 		   could keep wasting memory by just increasing the buffer
586 		   size all the time. */
587 		if (o_stream_flush(_stream->parent) < 0) {
588 			o_stream_copy_error_from_parent(_stream);
589 			return -1;
590 		}
591 		if (o_stream_get_buffer_used_size(_stream->parent) >= IO_BLOCK_SIZE)
592 			return 0;
593 	}
594 	return 1;
595 }
596 
o_stream_flush_parent(struct ostream_private * _stream)597 int o_stream_flush_parent(struct ostream_private *_stream)
598 {
599 	int ret;
600 
601 	i_assert(_stream->parent != NULL);
602 
603 	if (!_stream->finished || !_stream->finish_also_parent ||
604 	    !_stream->parent->real_stream->finish_via_child)
605 		ret = o_stream_flush(_stream->parent);
606 	else
607 		ret = o_stream_finish(_stream->parent);
608 	if (ret < 0)
609 		o_stream_copy_error_from_parent(_stream);
610 	return ret;
611 }
612 
o_stream_default_flush(struct ostream_private * _stream)613 static int o_stream_default_flush(struct ostream_private *_stream)
614 {
615 	if (_stream->parent == NULL)
616 		return 1;
617 
618 	return o_stream_flush_parent(_stream);
619 }
620 
621 static void
o_stream_default_set_flush_callback(struct ostream_private * _stream,stream_flush_callback_t * callback,void * context)622 o_stream_default_set_flush_callback(struct ostream_private *_stream,
623 				    stream_flush_callback_t *callback,
624 				    void *context)
625 {
626 	if (_stream->parent != NULL)
627 		o_stream_set_flush_callback(_stream->parent, callback, context);
628 
629 	_stream->callback = callback;
630 	_stream->context = context;
631 }
632 
633 static void
o_stream_default_set_flush_pending(struct ostream_private * _stream,bool set)634 o_stream_default_set_flush_pending(struct ostream_private *_stream, bool set)
635 {
636 	if (_stream->parent != NULL)
637 		o_stream_set_flush_pending(_stream->parent, set);
638 }
639 
640 static size_t
o_stream_default_get_buffer_used_size(const struct ostream_private * _stream)641 o_stream_default_get_buffer_used_size(const struct ostream_private *_stream)
642 {
643 	if (_stream->parent == NULL)
644 		return 0;
645 	else
646 		return o_stream_get_buffer_used_size(_stream->parent);
647 }
648 
649 static size_t
o_stream_default_get_buffer_avail_size(const struct ostream_private * _stream)650 o_stream_default_get_buffer_avail_size(const struct ostream_private *_stream)
651 {
652 	/* This default implementation assumes that the returned buffer size is
653 	   between 0..max_buffer_size. There's no assert though, in case the
654 	   max_buffer_size changes. */
655 	size_t used = o_stream_get_buffer_used_size(&_stream->ostream);
656 
657 	return _stream->max_buffer_size <= used ? 0 :
658 		_stream->max_buffer_size - used;
659 }
660 
661 static int
o_stream_default_seek(struct ostream_private * _stream,uoff_t offset ATTR_UNUSED)662 o_stream_default_seek(struct ostream_private *_stream,
663 		      uoff_t offset ATTR_UNUSED)
664 {
665 	_stream->ostream.stream_errno = ESPIPE;
666 	return -1;
667 }
668 
669 static ssize_t
o_stream_default_sendv(struct ostream_private * stream,const struct const_iovec * iov,unsigned int iov_count)670 o_stream_default_sendv(struct ostream_private *stream,
671 		       const struct const_iovec *iov, unsigned int iov_count)
672 {
673 	ssize_t ret;
674 
675 	if ((ret = o_stream_sendv(stream->parent, iov, iov_count)) < 0) {
676 		o_stream_copy_error_from_parent(stream);
677 		return -1;
678 	}
679 	stream->ostream.offset += ret;
680 	return ret;
681 }
682 
683 static int
o_stream_default_write_at(struct ostream_private * _stream,const void * data ATTR_UNUSED,size_t size ATTR_UNUSED,uoff_t offset ATTR_UNUSED)684 o_stream_default_write_at(struct ostream_private *_stream,
685 			  const void *data ATTR_UNUSED,
686 			  size_t size ATTR_UNUSED, uoff_t offset ATTR_UNUSED)
687 {
688 	_stream->ostream.stream_errno = ESPIPE;
689 	return -1;
690 }
691 
692 static enum ostream_send_istream_result
o_stream_default_send_istream(struct ostream_private * outstream,struct istream * instream)693 o_stream_default_send_istream(struct ostream_private *outstream,
694 			      struct istream *instream)
695 {
696 	return io_stream_copy(&outstream->ostream, instream);
697 }
698 
699 static void
o_stream_default_switch_ioloop_to(struct ostream_private * _stream,struct ioloop * ioloop)700 o_stream_default_switch_ioloop_to(struct ostream_private *_stream,
701 				  struct ioloop *ioloop)
702 {
703 	if (_stream->parent != NULL)
704 		o_stream_switch_ioloop_to(_stream->parent, ioloop);
705 }
706 
707 struct ostream *
o_stream_create(struct ostream_private * _stream,struct ostream * parent,int fd)708 o_stream_create(struct ostream_private *_stream, struct ostream *parent, int fd)
709 {
710 	_stream->finish_also_parent = TRUE;
711 	_stream->finish_via_child = TRUE;
712 	_stream->fd = fd;
713 	_stream->ostream.real_stream = _stream;
714 	if (parent != NULL) {
715 		_stream->ostream.blocking = parent->blocking;
716 		_stream->parent = parent;
717 		o_stream_ref(parent);
718 
719 		_stream->callback = parent->real_stream->callback;
720 		_stream->context = parent->real_stream->context;
721 		_stream->max_buffer_size = parent->real_stream->max_buffer_size;
722 		_stream->error_handling_disabled =
723 			parent->real_stream->error_handling_disabled;
724 	}
725 
726 	if (_stream->iostream.close == NULL)
727 		_stream->iostream.close = o_stream_default_close;
728 	if (_stream->iostream.destroy == NULL)
729 		_stream->iostream.destroy = o_stream_default_destroy;
730 	if (_stream->iostream.set_max_buffer_size == NULL) {
731 		_stream->iostream.set_max_buffer_size =
732 			o_stream_default_set_max_buffer_size;
733 	}
734 
735 	if (_stream->cork == NULL)
736 		_stream->cork = o_stream_default_cork;
737 	if (_stream->flush == NULL)
738 		_stream->flush = o_stream_default_flush;
739 	if (_stream->set_flush_callback == NULL) {
740 		_stream->set_flush_callback =
741 			o_stream_default_set_flush_callback;
742 	}
743 	if (_stream->flush_pending == NULL)
744 		_stream->flush_pending = o_stream_default_set_flush_pending;
745 	if (_stream->get_buffer_used_size == NULL)
746 		_stream->get_buffer_used_size =
747 			o_stream_default_get_buffer_used_size;
748 	if (_stream->get_buffer_avail_size == NULL) {
749 		_stream->get_buffer_avail_size =
750 			o_stream_default_get_buffer_avail_size;
751 	}
752 	if (_stream->seek == NULL)
753 		_stream->seek = o_stream_default_seek;
754 	if (_stream->sendv == NULL)
755 		_stream->sendv = o_stream_default_sendv;
756 	if (_stream->write_at == NULL)
757 		_stream->write_at = o_stream_default_write_at;
758 	if (_stream->send_istream == NULL)
759 		_stream->send_istream = o_stream_default_send_istream;
760 	if (_stream->switch_ioloop_to == NULL)
761 		_stream->switch_ioloop_to = o_stream_default_switch_ioloop_to;
762 
763 	io_stream_init(&_stream->iostream);
764 	return &_stream->ostream;
765 }
766 
o_stream_create_error(int stream_errno)767 struct ostream *o_stream_create_error(int stream_errno)
768 {
769 	struct ostream_private *stream;
770 	struct ostream *output;
771 
772 	stream = i_new(struct ostream_private, 1);
773 	stream->ostream.blocking = TRUE;
774 	stream->ostream.closed = TRUE;
775 	stream->ostream.stream_errno = stream_errno;
776 
777 	output = o_stream_create(stream, NULL, -1);
778 	o_stream_set_no_error_handling(output, TRUE);
779 	o_stream_set_name(output, "(error)");
780 	return output;
781 }
782 
783 struct ostream *
o_stream_create_error_str(int stream_errno,const char * fmt,...)784 o_stream_create_error_str(int stream_errno, const char *fmt, ...)
785 {
786 	struct ostream *output;
787 	va_list args;
788 
789 	va_start(args, fmt);
790 	output = o_stream_create_error(stream_errno);
791 	io_stream_set_verror(&output->real_stream->iostream, fmt, args);
792 	va_end(args);
793 	return output;
794 }
795 
o_stream_create_passthrough(struct ostream * output)796 struct ostream *o_stream_create_passthrough(struct ostream *output)
797 {
798 	struct ostream_private *stream;
799 
800 	stream = i_new(struct ostream_private, 1);
801 	return o_stream_create(stream, output, o_stream_get_fd(output));
802 }
803