1 /* Copyright (c) 2002-2018 Dovecot authors, see the included COPYING file */
2
3 #include "lib.h"
4 #include "istream.h"
5 #include "ostream-private.h"
6
o_stream_set_name(struct ostream * stream,const char * name)7 void o_stream_set_name(struct ostream *stream, const char *name)
8 {
9 i_free(stream->real_stream->iostream.name);
10 stream->real_stream->iostream.name = i_strdup(name);
11 }
12
o_stream_get_name(struct ostream * stream)13 const char *o_stream_get_name(struct ostream *stream)
14 {
15 while (stream->real_stream->iostream.name == NULL) {
16 stream = stream->real_stream->parent;
17 if (stream == NULL)
18 return "";
19 }
20 return stream->real_stream->iostream.name;
21 }
22
o_stream_get_fd(struct ostream * stream)23 int o_stream_get_fd(struct ostream *stream)
24 {
25 return stream->real_stream->fd;
26 }
27
o_stream_get_error(struct ostream * stream)28 const char *o_stream_get_error(struct ostream *stream)
29 {
30 struct ostream *s;
31
32 /* we'll only return errors for streams that have stream_errno set.
33 we might be returning unintended error otherwise. */
34 if (stream->stream_errno == 0)
35 return "<no error>";
36
37 for (s = stream; s != NULL; s = s->real_stream->parent) {
38 if (s->stream_errno == 0)
39 break;
40 if (s->real_stream->iostream.error != NULL)
41 return s->real_stream->iostream.error;
42 }
43 return strerror(stream->stream_errno);
44 }
45
o_stream_get_disconnect_reason(struct ostream * stream)46 const char *o_stream_get_disconnect_reason(struct ostream *stream)
47 {
48 return io_stream_get_disconnect_reason(NULL, stream);
49 }
50
o_stream_close_full(struct ostream * stream,bool close_parents)51 static void o_stream_close_full(struct ostream *stream, bool close_parents)
52 {
53 /* Ideally o_stream_finish() would be called for all non-failed
54 ostreams, but strictly requiring it would cause unnecessary
55 complexity for many callers. Just require that at this point
56 after flushing there isn't anything in the output buffer or that
57 we're ignoring all errors. */
58 if (o_stream_flush(stream) == 0)
59 i_assert(stream->real_stream->error_handling_disabled);
60
61 if (!stream->closed && !stream->real_stream->closing) {
62 /* first mark the stream as being closed so the
63 o_stream_copy_error_from_parent() won't recurse us back
64 here. but don't immediately mark the stream closed, because
65 we may still want to write something to it. */
66 stream->real_stream->closing = TRUE;
67 io_stream_close(&stream->real_stream->iostream, close_parents);
68 stream->closed = TRUE;
69 }
70
71 if (stream->stream_errno == 0)
72 stream->stream_errno = EPIPE;
73 }
74
o_stream_destroy(struct ostream ** _stream)75 void o_stream_destroy(struct ostream **_stream)
76 {
77 struct ostream *stream = *_stream;
78
79 if (stream == NULL)
80 return;
81
82 *_stream = NULL;
83 o_stream_close_full(stream, FALSE);
84 o_stream_unref(&stream);
85 }
86
o_stream_ref(struct ostream * stream)87 void o_stream_ref(struct ostream *stream)
88 {
89 io_stream_ref(&stream->real_stream->iostream);
90 }
91
o_stream_unref(struct ostream ** _stream)92 void o_stream_unref(struct ostream **_stream)
93 {
94 struct ostream *stream;
95
96 if (*_stream == NULL)
97 return;
98
99 stream = *_stream;
100
101 if (stream->real_stream->last_errors_not_checked &&
102 !stream->real_stream->error_handling_disabled &&
103 stream->real_stream->iostream.refcount == 1) {
104 i_panic("output stream %s is missing error handling",
105 o_stream_get_name(stream));
106 }
107
108 if (!io_stream_unref(&stream->real_stream->iostream))
109 io_stream_free(&stream->real_stream->iostream);
110 *_stream = NULL;
111 }
112
113 #undef o_stream_add_destroy_callback
o_stream_add_destroy_callback(struct ostream * stream,ostream_callback_t * callback,void * context)114 void o_stream_add_destroy_callback(struct ostream *stream,
115 ostream_callback_t *callback, void *context)
116 {
117 io_stream_add_destroy_callback(&stream->real_stream->iostream,
118 callback, context);
119 }
120
o_stream_remove_destroy_callback(struct ostream * stream,void (* callback)())121 void o_stream_remove_destroy_callback(struct ostream *stream,
122 void (*callback)())
123 {
124 io_stream_remove_destroy_callback(&stream->real_stream->iostream,
125 callback);
126 }
127
o_stream_close(struct ostream * stream)128 void o_stream_close(struct ostream *stream)
129 {
130 if (stream != NULL)
131 o_stream_close_full(stream, TRUE);
132 }
133
134 #undef o_stream_set_flush_callback
o_stream_set_flush_callback(struct ostream * stream,stream_flush_callback_t * callback,void * context)135 void o_stream_set_flush_callback(struct ostream *stream,
136 stream_flush_callback_t *callback,
137 void *context)
138 {
139 struct ostream_private *_stream = stream->real_stream;
140
141 _stream->set_flush_callback(_stream, callback, context);
142 }
143
o_stream_unset_flush_callback(struct ostream * stream)144 void o_stream_unset_flush_callback(struct ostream *stream)
145 {
146 struct ostream_private *_stream = stream->real_stream;
147
148 _stream->set_flush_callback(_stream, NULL, NULL);
149 }
150
o_stream_set_max_buffer_size(struct ostream * stream,size_t max_size)151 void o_stream_set_max_buffer_size(struct ostream *stream, size_t max_size)
152 {
153 io_stream_set_max_buffer_size(&stream->real_stream->iostream, max_size);
154 }
155
o_stream_get_max_buffer_size(struct ostream * stream)156 size_t o_stream_get_max_buffer_size(struct ostream *stream)
157 {
158 return stream->real_stream->max_buffer_size;
159 }
160
o_stream_cork(struct ostream * stream)161 void o_stream_cork(struct ostream *stream)
162 {
163 struct ostream_private *_stream = stream->real_stream;
164
165 if (unlikely(stream->closed || stream->stream_errno != 0))
166 return;
167
168 _stream->cork(_stream, TRUE);
169 }
170
o_stream_uncork(struct ostream * stream)171 void o_stream_uncork(struct ostream *stream)
172 {
173 struct ostream_private *_stream = stream->real_stream;
174
175 if (unlikely(stream->closed || stream->stream_errno != 0))
176 return;
177
178 _stream->cork(_stream, FALSE);
179 }
180
o_stream_is_corked(struct ostream * stream)181 bool o_stream_is_corked(struct ostream *stream)
182 {
183 struct ostream_private *_stream = stream->real_stream;
184
185 return _stream->corked;
186 }
187
o_stream_flush(struct ostream * stream)188 int o_stream_flush(struct ostream *stream)
189 {
190 struct ostream_private *_stream = stream->real_stream;
191 int ret = 1;
192
193 o_stream_ignore_last_errors(stream);
194
195 if (unlikely(stream->closed || stream->stream_errno != 0)) {
196 errno = stream->stream_errno;
197 return -1;
198 }
199
200 if (unlikely(_stream->noverflow)) {
201 io_stream_set_error(&_stream->iostream,
202 "Output stream buffer was full (%zu bytes)",
203 o_stream_get_max_buffer_size(stream));
204 errno = stream->stream_errno = ENOBUFS;
205 return -1;
206 }
207
208 if (unlikely((ret = _stream->flush(_stream)) < 0)) {
209 i_assert(stream->stream_errno != 0);
210 errno = stream->stream_errno;
211 }
212 return ret;
213 }
214
o_stream_set_flush_pending(struct ostream * stream,bool set)215 void o_stream_set_flush_pending(struct ostream *stream, bool set)
216 {
217 struct ostream_private *_stream = stream->real_stream;
218
219 if (unlikely(stream->closed || stream->stream_errno != 0))
220 return;
221
222 _stream->flush_pending(_stream, set);
223 }
224
o_stream_get_buffer_used_size(const struct ostream * stream)225 size_t o_stream_get_buffer_used_size(const struct ostream *stream)
226 {
227 const struct ostream_private *_stream = stream->real_stream;
228
229 return _stream->get_buffer_used_size(_stream);
230 }
231
o_stream_get_buffer_avail_size(const struct ostream * stream)232 size_t o_stream_get_buffer_avail_size(const struct ostream *stream)
233 {
234 const struct ostream_private *_stream = stream->real_stream;
235
236 return _stream->get_buffer_avail_size(_stream);
237 }
238
/* Seek the stream to the given offset through its seek() method.
   Returns 1 on success. Returns -1 with errno set if the stream is
   closed, has already failed, or the seek fails. */
int o_stream_seek(struct ostream *stream, uoff_t offset)
{
	struct ostream_private *priv = stream->real_stream;
	int ret;

	if (unlikely(stream->closed || stream->stream_errno != 0)) {
		errno = stream->stream_errno;
		return -1;
	}

	ret = priv->seek(priv, offset);
	if (unlikely(ret < 0)) {
		/* a failing seek() must have set stream_errno */
		i_assert(stream->stream_errno != 0);
		errno = stream->stream_errno;
		return -1;
	}
	return 1;
}
255
o_stream_send(struct ostream * stream,const void * data,size_t size)256 ssize_t o_stream_send(struct ostream *stream, const void *data, size_t size)
257 {
258 struct const_iovec iov;
259
260 i_zero(&iov);
261 iov.iov_base = data;
262 iov.iov_len = size;
263
264 return o_stream_sendv(stream, &iov, 1);
265 }
266
267 static ssize_t
o_stream_sendv_int(struct ostream * stream,const struct const_iovec * iov,unsigned int iov_count,bool * overflow_r)268 o_stream_sendv_int(struct ostream *stream, const struct const_iovec *iov,
269 unsigned int iov_count, bool *overflow_r)
270 {
271 struct ostream_private *_stream = stream->real_stream;
272 unsigned int i;
273 size_t total_size;
274 ssize_t ret;
275
276 *overflow_r = FALSE;
277
278 for (i = 0, total_size = 0; i < iov_count; i++)
279 total_size += iov[i].iov_len;
280 if (total_size == 0)
281 return 0;
282
283 i_assert(!_stream->finished);
284 ret = _stream->sendv(_stream, iov, iov_count);
285 if (ret > 0)
286 stream->real_stream->last_write_timeval = ioloop_timeval;
287 if (unlikely(ret != (ssize_t)total_size)) {
288 if (ret < 0) {
289 i_assert(stream->stream_errno != 0);
290 errno = stream->stream_errno;
291 } else {
292 i_assert(!stream->blocking);
293 stream->overflow = TRUE;
294 *overflow_r = TRUE;
295 }
296 }
297 return ret;
298 }
299
o_stream_sendv(struct ostream * stream,const struct const_iovec * iov,unsigned int iov_count)300 ssize_t o_stream_sendv(struct ostream *stream, const struct const_iovec *iov,
301 unsigned int iov_count)
302 {
303 bool overflow;
304
305 if (unlikely(stream->closed || stream->stream_errno != 0)) {
306 errno = stream->stream_errno;
307 return -1;
308 }
309 return o_stream_sendv_int(stream, iov, iov_count, &overflow);
310 }
311
/* Send a NUL-terminated string (without the NUL) via o_stream_send(). */
ssize_t o_stream_send_str(struct ostream *stream, const char *str)
{
	size_t len = strlen(str);

	return o_stream_send(stream, str, len);
}
316
o_stream_nsend(struct ostream * stream,const void * data,size_t size)317 void o_stream_nsend(struct ostream *stream, const void *data, size_t size)
318 {
319 struct const_iovec iov;
320
321 i_zero(&iov);
322 iov.iov_base = data;
323 iov.iov_len = size;
324
325 o_stream_nsendv(stream, &iov, 1);
326 }
327
o_stream_nsendv(struct ostream * stream,const struct const_iovec * iov,unsigned int iov_count)328 void o_stream_nsendv(struct ostream *stream, const struct const_iovec *iov,
329 unsigned int iov_count)
330 {
331 bool overflow;
332
333 if (unlikely(stream->closed || stream->stream_errno != 0 ||
334 stream->real_stream->noverflow))
335 return;
336 (void)o_stream_sendv_int(stream, iov, iov_count, &overflow);
337 if (overflow)
338 stream->real_stream->noverflow = TRUE;
339 stream->real_stream->last_errors_not_checked = TRUE;
340 }
341
/* Send a NUL-terminated string (without the NUL) via o_stream_nsend(). */
void o_stream_nsend_str(struct ostream *stream, const char *str)
{
	size_t len = strlen(str);

	o_stream_nsend(stream, str, len);
}
346
o_stream_finish(struct ostream * stream)347 int o_stream_finish(struct ostream *stream)
348 {
349 stream->real_stream->finished = TRUE;
350 return o_stream_flush(stream);
351 }
352
o_stream_set_finish_also_parent(struct ostream * stream,bool set)353 void o_stream_set_finish_also_parent(struct ostream *stream, bool set)
354 {
355 stream->real_stream->finish_also_parent = set;
356 }
357
o_stream_set_finish_via_child(struct ostream * stream,bool set)358 void o_stream_set_finish_via_child(struct ostream *stream, bool set)
359 {
360 stream->real_stream->finish_via_child = set;
361 }
362
o_stream_ignore_last_errors(struct ostream * stream)363 void o_stream_ignore_last_errors(struct ostream *stream)
364 {
365 while (stream != NULL) {
366 stream->real_stream->last_errors_not_checked = FALSE;
367 stream = stream->real_stream->parent;
368 }
369 }
370
o_stream_abort(struct ostream * stream)371 void o_stream_abort(struct ostream *stream)
372 {
373 o_stream_ignore_last_errors(stream);
374 if (stream->stream_errno != 0)
375 return;
376 io_stream_set_error(&stream->real_stream->iostream, "aborted writing");
377 stream->stream_errno = EPIPE;
378 }
379
o_stream_set_no_error_handling(struct ostream * stream,bool set)380 void o_stream_set_no_error_handling(struct ostream *stream, bool set)
381 {
382 stream->real_stream->error_handling_disabled = set;
383 }
384
385 enum ostream_send_istream_result
o_stream_send_istream(struct ostream * outstream,struct istream * instream)386 o_stream_send_istream(struct ostream *outstream, struct istream *instream)
387 {
388 struct ostream_private *_outstream = outstream->real_stream;
389 uoff_t old_outstream_offset = outstream->offset;
390 uoff_t old_instream_offset = instream->v_offset;
391 enum ostream_send_istream_result res;
392
393 if (unlikely(instream->closed || instream->stream_errno != 0)) {
394 errno = instream->stream_errno;
395 return OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT;
396 }
397 if (unlikely(outstream->closed || outstream->stream_errno != 0)) {
398 errno = outstream->stream_errno;
399 return OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT;
400 }
401
402 i_assert(!_outstream->finished);
403 res = _outstream->send_istream(_outstream, instream);
404 switch (res) {
405 case OSTREAM_SEND_ISTREAM_RESULT_FINISHED:
406 i_assert(instream->stream_errno == 0);
407 i_assert(outstream->stream_errno == 0);
408 i_assert(!i_stream_have_bytes_left(instream));
409 break;
410 case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT:
411 i_assert(!instream->blocking);
412 break;
413 case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT:
414 i_assert(!outstream->blocking);
415 o_stream_set_flush_pending(outstream, TRUE);
416 break;
417 case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT:
418 i_assert(instream->stream_errno != 0);
419 return res;
420 case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT:
421 i_assert(outstream->stream_errno != 0);
422 return res;
423 }
424 /* non-failure - make sure stream offsets match */
425 i_assert((outstream->offset - old_outstream_offset) ==
426 (instream->v_offset - old_instream_offset));
427
428 if (outstream->offset != old_outstream_offset)
429 outstream->real_stream->last_write_timeval = ioloop_timeval;
430 return res;
431 }
432
o_stream_nsend_istream(struct ostream * outstream,struct istream * instream)433 void o_stream_nsend_istream(struct ostream *outstream, struct istream *instream)
434 {
435 i_assert(instream->blocking);
436
437 switch (o_stream_send_istream(outstream, instream)) {
438 case OSTREAM_SEND_ISTREAM_RESULT_FINISHED:
439 break;
440 case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT:
441 i_unreached();
442 case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT:
443 outstream->real_stream->noverflow = TRUE;
444 break;
445 case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT:
446 outstream->stream_errno = instream->stream_errno;
447 io_stream_set_error(&outstream->real_stream->iostream,
448 "nsend-istream: read(%s) failed: %s",
449 i_stream_get_name(instream),
450 i_stream_get_error(instream));
451 break;
452 case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT:
453 break;
454 }
455 outstream->real_stream->last_errors_not_checked = TRUE;
456 }
457
/* Write data at the given offset through the stream's write_at() method
   and return its result. Returns -1 with errno set if the stream is
   closed, has already failed, or the write fails. The stream must not
   have been finished. */
int o_stream_pwrite(struct ostream *stream, const void *data, size_t size,
		    uoff_t offset)
{
	struct ostream_private *priv = stream->real_stream;
	int ret;

	if (unlikely(stream->closed || stream->stream_errno != 0)) {
		errno = stream->stream_errno;
		return -1;
	}

	i_assert(!stream->real_stream->finished);
	ret = priv->write_at(priv, data, size, offset);
	if (unlikely(ret < 0)) {
		/* a failing write_at() must have set stream_errno */
		i_assert(stream->stream_errno != 0);
		errno = stream->stream_errno;
	} else if (ret > 0) {
		priv->last_write_timeval = ioloop_timeval;
	}

	return ret;
}
480
o_stream_get_last_write_time(struct ostream * stream,struct timeval * tv_r)481 void o_stream_get_last_write_time(struct ostream *stream, struct timeval *tv_r)
482 {
483 *tv_r = stream->real_stream->last_write_timeval;
484 }
485
486 enum ostream_send_istream_result
io_stream_copy(struct ostream * outstream,struct istream * instream)487 io_stream_copy(struct ostream *outstream, struct istream *instream)
488 {
489 struct const_iovec iov;
490 const unsigned char *data;
491 ssize_t ret;
492
493 while (i_stream_read_more(instream, &data, &iov.iov_len) > 0) {
494 iov.iov_base = data;
495 if ((ret = o_stream_sendv(outstream, &iov, 1)) < 0)
496 return OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT;
497 else if (ret == 0)
498 return OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT;
499 i_stream_skip(instream, ret);
500 }
501
502 if (instream->stream_errno != 0)
503 return OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT;
504 if (i_stream_have_bytes_left(instream))
505 return OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT;
506 return OSTREAM_SEND_ISTREAM_RESULT_FINISHED;
507 }
508
o_stream_switch_ioloop_to(struct ostream * stream,struct ioloop * ioloop)509 void o_stream_switch_ioloop_to(struct ostream *stream, struct ioloop *ioloop)
510 {
511 struct ostream_private *_stream = stream->real_stream;
512
513 io_stream_switch_ioloop_to(&_stream->iostream, ioloop);
514
515 _stream->switch_ioloop_to(_stream, ioloop);
516 }
517
o_stream_switch_ioloop(struct ostream * stream)518 void o_stream_switch_ioloop(struct ostream *stream)
519 {
520 o_stream_switch_ioloop_to(stream, current_ioloop);
521 }
522
o_stream_default_close(struct iostream_private * stream,bool close_parent)523 static void o_stream_default_close(struct iostream_private *stream,
524 bool close_parent)
525 {
526 struct ostream_private *_stream =
527 container_of(stream, struct ostream_private, iostream);
528
529 (void)o_stream_flush(&_stream->ostream);
530 if (close_parent)
531 o_stream_close(_stream->parent);
532 }
533
o_stream_default_destroy(struct iostream_private * stream)534 static void o_stream_default_destroy(struct iostream_private *stream)
535 {
536 struct ostream_private *_stream =
537 container_of(stream, struct ostream_private, iostream);
538
539 o_stream_unref(&_stream->parent);
540 }
541
542 static void
o_stream_default_set_max_buffer_size(struct iostream_private * stream,size_t max_size)543 o_stream_default_set_max_buffer_size(struct iostream_private *stream,
544 size_t max_size)
545 {
546 struct ostream_private *_stream =
547 container_of(stream, struct ostream_private, iostream);
548
549 if (_stream->parent != NULL)
550 o_stream_set_max_buffer_size(_stream->parent, max_size);
551 _stream->max_buffer_size = max_size;
552 }
553
o_stream_default_cork(struct ostream_private * _stream,bool set)554 static void o_stream_default_cork(struct ostream_private *_stream, bool set)
555 {
556 _stream->corked = set;
557 if (set) {
558 if (_stream->parent != NULL)
559 o_stream_cork(_stream->parent);
560 } else {
561 (void)o_stream_flush(&_stream->ostream);
562 _stream->last_errors_not_checked = TRUE;
563
564 if (_stream->parent != NULL)
565 o_stream_uncork(_stream->parent);
566 }
567 }
568
o_stream_copy_error_from_parent(struct ostream_private * _stream)569 void o_stream_copy_error_from_parent(struct ostream_private *_stream)
570 {
571 struct ostream *src = _stream->parent;
572 struct ostream *dest = &_stream->ostream;
573
574 dest->stream_errno = src->stream_errno;
575 dest->overflow = src->overflow;
576 if (src->closed)
577 o_stream_close(dest);
578 }
579
o_stream_flush_parent_if_needed(struct ostream_private * _stream)580 int o_stream_flush_parent_if_needed(struct ostream_private *_stream)
581 {
582 if (o_stream_get_buffer_used_size(_stream->parent) >= IO_BLOCK_SIZE) {
583 /* we already have quite a lot of data in parent stream.
584 unless we can flush it, don't add any more to it or we
585 could keep wasting memory by just increasing the buffer
586 size all the time. */
587 if (o_stream_flush(_stream->parent) < 0) {
588 o_stream_copy_error_from_parent(_stream);
589 return -1;
590 }
591 if (o_stream_get_buffer_used_size(_stream->parent) >= IO_BLOCK_SIZE)
592 return 0;
593 }
594 return 1;
595 }
596
o_stream_flush_parent(struct ostream_private * _stream)597 int o_stream_flush_parent(struct ostream_private *_stream)
598 {
599 int ret;
600
601 i_assert(_stream->parent != NULL);
602
603 if (!_stream->finished || !_stream->finish_also_parent ||
604 !_stream->parent->real_stream->finish_via_child)
605 ret = o_stream_flush(_stream->parent);
606 else
607 ret = o_stream_finish(_stream->parent);
608 if (ret < 0)
609 o_stream_copy_error_from_parent(_stream);
610 return ret;
611 }
612
o_stream_default_flush(struct ostream_private * _stream)613 static int o_stream_default_flush(struct ostream_private *_stream)
614 {
615 if (_stream->parent == NULL)
616 return 1;
617
618 return o_stream_flush_parent(_stream);
619 }
620
621 static void
o_stream_default_set_flush_callback(struct ostream_private * _stream,stream_flush_callback_t * callback,void * context)622 o_stream_default_set_flush_callback(struct ostream_private *_stream,
623 stream_flush_callback_t *callback,
624 void *context)
625 {
626 if (_stream->parent != NULL)
627 o_stream_set_flush_callback(_stream->parent, callback, context);
628
629 _stream->callback = callback;
630 _stream->context = context;
631 }
632
633 static void
o_stream_default_set_flush_pending(struct ostream_private * _stream,bool set)634 o_stream_default_set_flush_pending(struct ostream_private *_stream, bool set)
635 {
636 if (_stream->parent != NULL)
637 o_stream_set_flush_pending(_stream->parent, set);
638 }
639
640 static size_t
o_stream_default_get_buffer_used_size(const struct ostream_private * _stream)641 o_stream_default_get_buffer_used_size(const struct ostream_private *_stream)
642 {
643 if (_stream->parent == NULL)
644 return 0;
645 else
646 return o_stream_get_buffer_used_size(_stream->parent);
647 }
648
649 static size_t
o_stream_default_get_buffer_avail_size(const struct ostream_private * _stream)650 o_stream_default_get_buffer_avail_size(const struct ostream_private *_stream)
651 {
652 /* This default implementation assumes that the returned buffer size is
653 between 0..max_buffer_size. There's no assert though, in case the
654 max_buffer_size changes. */
655 size_t used = o_stream_get_buffer_used_size(&_stream->ostream);
656
657 return _stream->max_buffer_size <= used ? 0 :
658 _stream->max_buffer_size - used;
659 }
660
661 static int
o_stream_default_seek(struct ostream_private * _stream,uoff_t offset ATTR_UNUSED)662 o_stream_default_seek(struct ostream_private *_stream,
663 uoff_t offset ATTR_UNUSED)
664 {
665 _stream->ostream.stream_errno = ESPIPE;
666 return -1;
667 }
668
669 static ssize_t
o_stream_default_sendv(struct ostream_private * stream,const struct const_iovec * iov,unsigned int iov_count)670 o_stream_default_sendv(struct ostream_private *stream,
671 const struct const_iovec *iov, unsigned int iov_count)
672 {
673 ssize_t ret;
674
675 if ((ret = o_stream_sendv(stream->parent, iov, iov_count)) < 0) {
676 o_stream_copy_error_from_parent(stream);
677 return -1;
678 }
679 stream->ostream.offset += ret;
680 return ret;
681 }
682
683 static int
o_stream_default_write_at(struct ostream_private * _stream,const void * data ATTR_UNUSED,size_t size ATTR_UNUSED,uoff_t offset ATTR_UNUSED)684 o_stream_default_write_at(struct ostream_private *_stream,
685 const void *data ATTR_UNUSED,
686 size_t size ATTR_UNUSED, uoff_t offset ATTR_UNUSED)
687 {
688 _stream->ostream.stream_errno = ESPIPE;
689 return -1;
690 }
691
692 static enum ostream_send_istream_result
o_stream_default_send_istream(struct ostream_private * outstream,struct istream * instream)693 o_stream_default_send_istream(struct ostream_private *outstream,
694 struct istream *instream)
695 {
696 return io_stream_copy(&outstream->ostream, instream);
697 }
698
699 static void
o_stream_default_switch_ioloop_to(struct ostream_private * _stream,struct ioloop * ioloop)700 o_stream_default_switch_ioloop_to(struct ostream_private *_stream,
701 struct ioloop *ioloop)
702 {
703 if (_stream->parent != NULL)
704 o_stream_switch_ioloop_to(_stream->parent, ioloop);
705 }
706
707 struct ostream *
o_stream_create(struct ostream_private * _stream,struct ostream * parent,int fd)708 o_stream_create(struct ostream_private *_stream, struct ostream *parent, int fd)
709 {
710 _stream->finish_also_parent = TRUE;
711 _stream->finish_via_child = TRUE;
712 _stream->fd = fd;
713 _stream->ostream.real_stream = _stream;
714 if (parent != NULL) {
715 _stream->ostream.blocking = parent->blocking;
716 _stream->parent = parent;
717 o_stream_ref(parent);
718
719 _stream->callback = parent->real_stream->callback;
720 _stream->context = parent->real_stream->context;
721 _stream->max_buffer_size = parent->real_stream->max_buffer_size;
722 _stream->error_handling_disabled =
723 parent->real_stream->error_handling_disabled;
724 }
725
726 if (_stream->iostream.close == NULL)
727 _stream->iostream.close = o_stream_default_close;
728 if (_stream->iostream.destroy == NULL)
729 _stream->iostream.destroy = o_stream_default_destroy;
730 if (_stream->iostream.set_max_buffer_size == NULL) {
731 _stream->iostream.set_max_buffer_size =
732 o_stream_default_set_max_buffer_size;
733 }
734
735 if (_stream->cork == NULL)
736 _stream->cork = o_stream_default_cork;
737 if (_stream->flush == NULL)
738 _stream->flush = o_stream_default_flush;
739 if (_stream->set_flush_callback == NULL) {
740 _stream->set_flush_callback =
741 o_stream_default_set_flush_callback;
742 }
743 if (_stream->flush_pending == NULL)
744 _stream->flush_pending = o_stream_default_set_flush_pending;
745 if (_stream->get_buffer_used_size == NULL)
746 _stream->get_buffer_used_size =
747 o_stream_default_get_buffer_used_size;
748 if (_stream->get_buffer_avail_size == NULL) {
749 _stream->get_buffer_avail_size =
750 o_stream_default_get_buffer_avail_size;
751 }
752 if (_stream->seek == NULL)
753 _stream->seek = o_stream_default_seek;
754 if (_stream->sendv == NULL)
755 _stream->sendv = o_stream_default_sendv;
756 if (_stream->write_at == NULL)
757 _stream->write_at = o_stream_default_write_at;
758 if (_stream->send_istream == NULL)
759 _stream->send_istream = o_stream_default_send_istream;
760 if (_stream->switch_ioloop_to == NULL)
761 _stream->switch_ioloop_to = o_stream_default_switch_ioloop_to;
762
763 io_stream_init(&_stream->iostream);
764 return &_stream->ostream;
765 }
766
o_stream_create_error(int stream_errno)767 struct ostream *o_stream_create_error(int stream_errno)
768 {
769 struct ostream_private *stream;
770 struct ostream *output;
771
772 stream = i_new(struct ostream_private, 1);
773 stream->ostream.blocking = TRUE;
774 stream->ostream.closed = TRUE;
775 stream->ostream.stream_errno = stream_errno;
776
777 output = o_stream_create(stream, NULL, -1);
778 o_stream_set_no_error_handling(output, TRUE);
779 o_stream_set_name(output, "(error)");
780 return output;
781 }
782
783 struct ostream *
o_stream_create_error_str(int stream_errno,const char * fmt,...)784 o_stream_create_error_str(int stream_errno, const char *fmt, ...)
785 {
786 struct ostream *output;
787 va_list args;
788
789 va_start(args, fmt);
790 output = o_stream_create_error(stream_errno);
791 io_stream_set_verror(&output->real_stream->iostream, fmt, args);
792 va_end(args);
793 return output;
794 }
795
o_stream_create_passthrough(struct ostream * output)796 struct ostream *o_stream_create_passthrough(struct ostream *output)
797 {
798 struct ostream_private *stream;
799
800 stream = i_new(struct ostream_private, 1);
801 return o_stream_create(stream, output, o_stream_get_fd(output));
802 }
803