1 /* Copyright (c) 2002-2018 Dovecot authors, see the included COPYING file */
2
3 #include "lib.h"
4 #include "ioloop.h"
5 #include "array.h"
6 #include "str.h"
7 #include "memarea.h"
8 #include "istream-private.h"
9
10 static bool i_stream_is_buffer_invalid(const struct istream_private *stream);
11
i_stream_set_name(struct istream * stream,const char * name)12 void i_stream_set_name(struct istream *stream, const char *name)
13 {
14 i_free(stream->real_stream->iostream.name);
15 stream->real_stream->iostream.name = i_strdup(name);
16 }
17
i_stream_get_name(struct istream * stream)18 const char *i_stream_get_name(struct istream *stream)
19 {
20 while (stream->real_stream->iostream.name == NULL) {
21 stream = stream->real_stream->parent;
22 if (stream == NULL)
23 return "";
24 }
25 return stream->real_stream->iostream.name;
26 }
27
i_stream_close_full(struct istream * stream,bool close_parents)28 static void i_stream_close_full(struct istream *stream, bool close_parents)
29 {
30 io_stream_close(&stream->real_stream->iostream, close_parents);
31 stream->closed = TRUE;
32
33 if (stream->stream_errno == 0)
34 stream->stream_errno = EPIPE;
35 }
36
i_stream_destroy(struct istream ** stream)37 void i_stream_destroy(struct istream **stream)
38 {
39 if (*stream == NULL)
40 return;
41
42 i_stream_close_full(*stream, FALSE);
43 i_stream_unref(stream);
44 }
45
i_stream_ref(struct istream * stream)46 void i_stream_ref(struct istream *stream)
47 {
48 io_stream_ref(&stream->real_stream->iostream);
49 }
50
i_stream_unref(struct istream ** stream)51 void i_stream_unref(struct istream **stream)
52 {
53 struct istream_private *_stream;
54
55 if (*stream == NULL)
56 return;
57
58 _stream = (*stream)->real_stream;
59
60 if (_stream->iostream.refcount > 1) {
61 if (!io_stream_unref(&_stream->iostream))
62 i_unreached();
63 } else {
64 /* The snapshot may contain pointers to the parent istreams.
65 Free it before io_stream_unref() frees the parents. */
66 i_stream_snapshot_free(&_stream->prev_snapshot);
67
68 if (io_stream_unref(&_stream->iostream))
69 i_unreached();
70 str_free(&_stream->line_str);
71 i_stream_unref(&_stream->parent);
72 io_stream_free(&_stream->iostream);
73 }
74 *stream = NULL;
75 }
76
77 #undef i_stream_add_destroy_callback
i_stream_add_destroy_callback(struct istream * stream,istream_callback_t * callback,void * context)78 void i_stream_add_destroy_callback(struct istream *stream,
79 istream_callback_t *callback, void *context)
80 {
81 io_stream_add_destroy_callback(&stream->real_stream->iostream,
82 callback, context);
83 }
84
i_stream_remove_destroy_callback(struct istream * stream,void (* callback)())85 void i_stream_remove_destroy_callback(struct istream *stream,
86 void (*callback)())
87 {
88 io_stream_remove_destroy_callback(&stream->real_stream->iostream,
89 callback);
90 }
91
i_stream_get_fd(struct istream * stream)92 int i_stream_get_fd(struct istream *stream)
93 {
94 struct istream_private *_stream = stream->real_stream;
95
96 return _stream->fd;
97 }
98
i_stream_copy_fd(struct istream * dest,struct istream * source)99 void i_stream_copy_fd(struct istream *dest, struct istream *source)
100 {
101 int fd = i_stream_get_fd(source);
102
103 i_assert(fd != -1);
104 i_assert(dest->real_stream->fd == -1);
105 dest->real_stream->fd = fd;
106 dest->readable_fd = source->readable_fd;
107 }
108
i_stream_get_error(struct istream * stream)109 const char *i_stream_get_error(struct istream *stream)
110 {
111 struct istream *s;
112
113 /* we'll only return errors for streams that have stream_errno set or
114 that have reached EOF. we might be returning unintended error
115 otherwise. */
116 if (stream->stream_errno == 0)
117 return stream->eof ? "EOF" : "<no error>";
118
119 for (s = stream; s != NULL; s = s->real_stream->parent) {
120 if (s->stream_errno == 0)
121 break;
122 if (s->real_stream->iostream.error != NULL)
123 return s->real_stream->iostream.error;
124 }
125 return strerror(stream->stream_errno);
126 }
127
/* Return io_stream_get_disconnect_reason() for this input stream, with no
   output stream given. */
const char *i_stream_get_disconnect_reason(struct istream *stream)
{
	const char *reason = io_stream_get_disconnect_reason(stream, NULL);

	return reason;
}
132
i_stream_close(struct istream * stream)133 void i_stream_close(struct istream *stream)
134 {
135 if (stream != NULL)
136 i_stream_close_full(stream, TRUE);
137 }
138
i_stream_set_init_buffer_size(struct istream * stream,size_t size)139 void i_stream_set_init_buffer_size(struct istream *stream, size_t size)
140 {
141 stream->real_stream->init_buffer_size = size;
142 }
143
i_stream_set_max_buffer_size(struct istream * stream,size_t max_size)144 void i_stream_set_max_buffer_size(struct istream *stream, size_t max_size)
145 {
146 io_stream_set_max_buffer_size(&stream->real_stream->iostream, max_size);
147 }
148
i_stream_get_max_buffer_size(struct istream * stream)149 size_t i_stream_get_max_buffer_size(struct istream *stream)
150 {
151 size_t max_size = 0;
152
153 do {
154 if (max_size < stream->real_stream->max_buffer_size)
155 max_size = stream->real_stream->max_buffer_size;
156 stream = stream->real_stream->parent;
157 } while (stream != NULL);
158 return max_size;
159 }
160
i_stream_set_return_partial_line(struct istream * stream,bool set)161 void i_stream_set_return_partial_line(struct istream *stream, bool set)
162 {
163 stream->real_stream->return_nolf_line = set;
164 }
165
i_stream_set_persistent_buffers(struct istream * stream,bool set)166 void i_stream_set_persistent_buffers(struct istream *stream, bool set)
167 {
168 do {
169 stream->real_stream->nonpersistent_buffers = !set;
170 stream = stream->real_stream->parent;
171 } while (stream != NULL);
172 }
173
i_stream_set_blocking(struct istream * stream,bool blocking)174 void i_stream_set_blocking(struct istream *stream, bool blocking)
175 {
176 int prev_fd = -1;
177
178 do {
179 stream->blocking = blocking;
180 if (stream->real_stream->fd != -1 &&
181 stream->real_stream->fd != prev_fd) {
182 fd_set_nonblock(stream->real_stream->fd, !blocking);
183 prev_fd = stream->real_stream->fd;
184 }
185 stream = stream->real_stream->parent;
186 } while (stream != NULL);
187 }
188
i_stream_update(struct istream_private * stream)189 static void i_stream_update(struct istream_private *stream)
190 {
191 if (stream->parent == NULL)
192 stream->access_counter++;
193 else {
194 stream->access_counter =
195 stream->parent->real_stream->access_counter;
196 stream->parent_expected_offset = stream->parent->v_offset;
197 }
198 }
199
snapshot_has_memarea(struct istream_snapshot * snapshot,struct memarea * memarea)200 static bool snapshot_has_memarea(struct istream_snapshot *snapshot,
201 struct memarea *memarea)
202 {
203 if (snapshot->old_memarea == memarea)
204 return TRUE;
205 if (snapshot->prev_snapshot != NULL)
206 return snapshot_has_memarea(snapshot->prev_snapshot, memarea);
207 return FALSE;
208 }
209
210 struct istream_snapshot *
i_stream_default_snapshot(struct istream_private * stream,struct istream_snapshot * prev_snapshot)211 i_stream_default_snapshot(struct istream_private *stream,
212 struct istream_snapshot *prev_snapshot)
213 {
214 struct istream_snapshot *snapshot;
215
216 if (stream->memarea != NULL) {
217 if (prev_snapshot != NULL) {
218 if (snapshot_has_memarea(prev_snapshot, stream->memarea))
219 return prev_snapshot;
220 }
221 /* This stream has a memarea. Reference it, so we can later on
222 rollback if needed. */
223 snapshot = i_new(struct istream_snapshot, 1);
224 snapshot->old_memarea = stream->memarea;
225 snapshot->prev_snapshot = prev_snapshot;
226 memarea_ref(snapshot->old_memarea);
227 return snapshot;
228 }
229 if (stream->parent == NULL) {
230 if (stream->nonpersistent_buffers) {
231 /* Assume that memarea would be used normally, but
232 now it's NULL because the buffer is empty and
233 empty buffers are freed. */
234 i_assert(stream->skip == stream->pos);
235 return prev_snapshot;
236 }
237 i_panic("%s is missing istream.snapshot() implementation",
238 i_stream_get_name(&stream->istream));
239 }
240 struct istream_private *_parent_stream =
241 stream->parent->real_stream;
242 return _parent_stream->snapshot(_parent_stream, prev_snapshot);
243 }
244
i_stream_snapshot_free(struct istream_snapshot ** _snapshot)245 void i_stream_snapshot_free(struct istream_snapshot **_snapshot)
246 {
247 struct istream_snapshot *snapshot = *_snapshot;
248
249 if (*_snapshot == NULL)
250 return;
251 *_snapshot = NULL;
252
253 i_stream_snapshot_free(&snapshot->prev_snapshot);
254 if (snapshot->free != NULL)
255 snapshot->free(snapshot);
256 else {
257 if (snapshot->old_memarea != NULL)
258 memarea_unref(&snapshot->old_memarea);
259 i_free(snapshot);
260 }
261 }
262
263 static struct istream_snapshot *
i_stream_noop_snapshot(struct istream_private * stream ATTR_UNUSED,struct istream_snapshot * prev_snapshot)264 i_stream_noop_snapshot(struct istream_private *stream ATTR_UNUSED,
265 struct istream_snapshot *prev_snapshot)
266 {
267 return prev_snapshot;
268 }
269
i_stream_read(struct istream * stream)270 ssize_t i_stream_read(struct istream *stream)
271 {
272 struct istream_private *_stream = stream->real_stream;
273 ssize_t ret;
274 #ifdef DEBUG
275 unsigned char prev_buf[4];
276 const unsigned char *prev_data = _stream->buffer;
277 size_t prev_skip = _stream->skip, prev_pos = _stream->pos;
278 bool invalid = i_stream_is_buffer_invalid(_stream);
279
280 i_assert(prev_skip <= prev_pos);
281 if (invalid)
282 ;
283 else if (prev_pos - prev_skip <= 4)
284 memcpy(prev_buf, prev_data + prev_skip, prev_pos - prev_skip);
285 else {
286 memcpy(prev_buf, prev_data + prev_skip, 2);
287 memcpy(prev_buf+2, prev_data + prev_pos - 2, 2);
288 }
289 #endif
290
291 if (_stream->skip != _stream->pos || _stream->prev_snapshot != NULL) {
292 _stream->prev_snapshot =
293 _stream->snapshot(_stream, _stream->prev_snapshot);
294 }
295 ret = i_stream_read_memarea(stream);
296 if (ret > 0)
297 i_stream_snapshot_free(&_stream->prev_snapshot);
298 #ifdef DEBUG
299 else if (!invalid) {
300 i_assert((_stream->pos - _stream->skip) == (prev_pos - prev_skip) ||
301 prev_pos == prev_skip);
302 if (prev_pos - prev_skip <= 4)
303 i_assert(memcmp(prev_buf, prev_data + prev_skip, prev_pos - prev_skip) == 0);
304 else {
305 i_assert(memcmp(prev_buf, prev_data + prev_skip, 2) == 0);
306 i_assert(memcmp(prev_buf+2, prev_data + prev_pos - 2, 2) == 0);
307 }
308 }
309 #endif
310 return ret;
311 }
312
i_stream_read_memarea(struct istream * stream)313 ssize_t i_stream_read_memarea(struct istream *stream)
314 {
315 struct istream_private *_stream = stream->real_stream;
316 size_t old_size;
317 ssize_t ret;
318
319 if (unlikely(stream->closed || stream->stream_errno != 0)) {
320 stream->eof = TRUE;
321 errno = stream->stream_errno;
322 return -1;
323 }
324
325 stream->eof = FALSE;
326
327 if (_stream->parent != NULL)
328 i_stream_seek(_stream->parent, _stream->parent_expected_offset);
329
330 old_size = _stream->pos - _stream->skip;
331 if (_stream->pos < _stream->high_pos) {
332 /* we're here because we seeked back within the read buffer. */
333 ret = _stream->high_pos - _stream->pos;
334 _stream->pos = _stream->high_pos;
335 _stream->high_pos = 0;
336 } else {
337 _stream->high_pos = 0;
338 ret = _stream->read(_stream);
339 }
340 i_assert(old_size <= _stream->pos - _stream->skip);
341 switch (ret) {
342 case -2:
343 i_assert(_stream->skip != _stream->pos);
344 break;
345 case -1:
346 if (stream->stream_errno != 0) {
347 /* error handling should be easier if we now just
348 assume the stream is now at EOF */
349 stream->eof = TRUE;
350 errno = stream->stream_errno;
351 } else {
352 i_assert(stream->eof);
353 i_assert(old_size == _stream->pos - _stream->skip);
354 }
355 break;
356 case 0:
357 i_assert(!stream->blocking);
358 break;
359 default:
360 i_assert(ret > 0);
361 i_assert(_stream->skip < _stream->pos);
362 i_assert((size_t)ret+old_size == _stream->pos - _stream->skip);
363 _stream->last_read_timeval = ioloop_timeval;
364 break;
365 }
366
367 if (stream->stream_errno != 0) {
368 /* error handling should be easier if we now just
369 assume the stream is now at EOF. Note that we could get here
370 even if read() didn't return -1, although that's a little
371 bit sloppy istream implementation. */
372 stream->eof = TRUE;
373 }
374
375 i_stream_update(_stream);
376 /* verify that parents' access_counters are valid. the parent's
377 i_stream_read() should guarantee this. */
378 i_assert(!i_stream_is_buffer_invalid(_stream));
379 return ret;
380 }
381
/* Return the currently buffered data in data_r/size_r. If the buffer isn't
   empty, returns 1 without reading. Otherwise i_stream_read_memarea() is
   called once, the data pointers are refreshed and the read's return value
   is returned. */
int i_stream_read_more_memarea(struct istream *stream,
			       const unsigned char **data_r, size_t *size_r)
{
	int ret;

	*data_r = i_stream_get_data(stream, size_r);
	if (*size_r == 0) {
		ret = i_stream_read_memarea(stream);
		*data_r = i_stream_get_data(stream, size_r);
		return ret;
	}
	return 1;
}
393
i_stream_get_last_read_time(struct istream * stream,struct timeval * tv_r)394 void i_stream_get_last_read_time(struct istream *stream, struct timeval *tv_r)
395 {
396 *tv_r = stream->real_stream->last_read_timeval;
397 }
398
i_stream_read_copy_from_parent(struct istream * istream)399 ssize_t i_stream_read_copy_from_parent(struct istream *istream)
400 {
401 struct istream_private *stream = istream->real_stream;
402 size_t pos;
403 ssize_t ret;
404
405 stream->pos -= stream->skip;
406 stream->skip = 0;
407
408 stream->buffer = i_stream_get_data(stream->parent, &pos);
409 if (pos > stream->pos)
410 ret = 0;
411 else do {
412 ret = i_stream_read_memarea(stream->parent);
413 stream->istream.stream_errno = stream->parent->stream_errno;
414 stream->istream.eof = stream->parent->eof;
415 stream->buffer = i_stream_get_data(stream->parent, &pos);
416 /* check again, in case the parent stream had been seeked
417 backwards and the previous read() didn't get us far
418 enough. */
419 } while (pos <= stream->pos && ret > 0);
420 if (ret == -2) {
421 i_stream_update(stream);
422 return -2;
423 }
424
425 ret = pos > stream->pos ? (ssize_t)(pos - stream->pos) :
426 (ret == 0 ? 0 : -1);
427 stream->pos = pos;
428 i_assert(ret != -1 || stream->istream.eof ||
429 stream->istream.stream_errno != 0);
430 i_stream_update(stream);
431 return ret;
432 }
433
i_stream_free_buffer(struct istream_private * stream)434 void i_stream_free_buffer(struct istream_private *stream)
435 {
436 if (stream->memarea != NULL) {
437 memarea_unref(&stream->memarea);
438 stream->w_buffer = NULL;
439 } else if (stream->w_buffer != NULL) {
440 i_free_and_null(stream->w_buffer);
441 } else {
442 /* don't know how to free it */
443 return;
444 }
445 stream->buffer_size = 0;
446 }
447
/* Skip forward `count` bytes.

   When the skip stays within the buffered data, only skip and v_offset are
   advanced; with nonpersistent buffers a fully consumed buffer is also
   freed. Otherwise all buffered data is consumed and the stream's seek() is
   called for the remaining distance, unless the stream is closed or has
   failed. */
void i_stream_skip(struct istream *stream, uoff_t count)
{
	struct istream_private *_stream = stream->real_stream;
	size_t data_size = _stream->pos - _stream->skip;

	if (count > data_size) {
		/* have to seek forward */
		count -= data_size;
		_stream->skip = _stream->pos;
		stream->v_offset += data_size;

		if (unlikely(stream->closed || stream->stream_errno != 0))
			return;

		_stream->seek(_stream, stream->v_offset + count, FALSE);
		return;
	}

	/* within buffer */
	stream->v_offset += count;
	_stream->skip += count;
	if (_stream->nonpersistent_buffers && _stream->skip == _stream->pos) {
		_stream->skip = _stream->pos = 0;
		i_stream_free_buffer(_stream);
	}
}
476
i_stream_can_optimize_seek(struct istream_private * stream)477 static bool i_stream_can_optimize_seek(struct istream_private *stream)
478 {
479 if (stream->parent == NULL)
480 return TRUE;
481
482 /* use the fast route only if the parent stream hasn't been changed */
483 if (stream->access_counter !=
484 stream->parent->real_stream->access_counter)
485 return FALSE;
486
487 return i_stream_can_optimize_seek(stream->parent->real_stream);
488 }
489
/* Seek to the given virtual offset.

   A forward seek on a stream whose parents are unchanged is done through
   i_stream_skip(). Any other seek calls the stream's seek() after clearing
   eof. If such a seek is attempted on a closed or failed stream, eof is set
   and the function returns without updating the access bookkeeping. */
void i_stream_seek(struct istream *stream, uoff_t v_offset)
{
	struct istream_private *_stream = stream->real_stream;

	if (v_offset >= stream->v_offset &&
	    i_stream_can_optimize_seek(_stream)) {
		i_stream_skip(stream, v_offset - stream->v_offset);
	} else if (unlikely(stream->closed || stream->stream_errno != 0)) {
		stream->eof = TRUE;
		return;
	} else {
		stream->eof = FALSE;
		_stream->seek(_stream, v_offset, FALSE);
	}
	i_stream_update(_stream);
}
507
/* Seek to the given virtual offset by calling the stream's seek() with
   mark=TRUE. Does nothing for a closed or failed stream. */
void i_stream_seek_mark(struct istream *stream, uoff_t v_offset)
{
	struct istream_private *_stream;

	if (unlikely(stream->closed || stream->stream_errno != 0))
		return;

	_stream = stream->real_stream;
	stream->eof = FALSE;
	_stream->seek(_stream, v_offset, TRUE);
	i_stream_update(_stream);
}
519
i_stream_sync(struct istream * stream)520 void i_stream_sync(struct istream *stream)
521 {
522 struct istream_private *_stream = stream->real_stream;
523
524 if (unlikely(stream->closed || stream->stream_errno != 0))
525 return;
526
527 if (_stream->sync != NULL) {
528 _stream->sync(_stream);
529 i_stream_update(_stream);
530 }
531 }
532
i_stream_stat(struct istream * stream,bool exact,const struct stat ** st_r)533 int i_stream_stat(struct istream *stream, bool exact, const struct stat **st_r)
534 {
535 struct istream_private *_stream = stream->real_stream;
536
537 if (unlikely(stream->closed || stream->stream_errno != 0))
538 return -1;
539
540 if (_stream->stat(_stream, exact) < 0) {
541 stream->eof = TRUE;
542 return -1;
543 }
544 *st_r = &_stream->statbuf;
545 return 0;
546 }
547
/* Run the stream's get_size() handler and return its result, with the size
   stored in *size_r by the handler. Returns -1 for a closed or failed
   stream. A negative handler result also sets eof. */
int i_stream_get_size(struct istream *stream, bool exact, uoff_t *size_r)
{
	struct istream_private *_stream = stream->real_stream;
	int ret;

	if (unlikely(stream->closed || stream->stream_errno != 0))
		return -1;

	ret = _stream->get_size(_stream, exact, size_r);
	if (ret < 0)
		stream->eof = TRUE;
	return ret;
}
560
i_stream_have_bytes_left(struct istream * stream)561 bool i_stream_have_bytes_left(struct istream *stream)
562 {
563 return i_stream_get_data_size(stream) > 0 || !stream->eof;
564 }
565
/* Return TRUE if the stream has no bytes left. If the buffer is empty, one
   read is attempted first and its return value is ignored. */
bool i_stream_read_eof(struct istream *stream)
{
	size_t buffered = i_stream_get_data_size(stream);

	if (buffered == 0)
		(void)i_stream_read(stream);
	return !i_stream_have_bytes_left(stream);
}
572
i_stream_get_absolute_offset(struct istream * stream)573 uoff_t i_stream_get_absolute_offset(struct istream *stream)
574 {
575 uoff_t abs_offset = stream->v_offset;
576 while (stream != NULL) {
577 abs_offset += stream->real_stream->start_offset;
578 stream = stream->real_stream->parent;
579 }
580 return abs_offset;
581 }
582
i_stream_next_line_finish(struct istream_private * stream,size_t i)583 static char *i_stream_next_line_finish(struct istream_private *stream, size_t i)
584 {
585 char *ret;
586 size_t end;
587
588 if (i > stream->skip && stream->buffer[i-1] == '\r') {
589 end = i - 1;
590 stream->line_crlf = TRUE;
591 } else {
592 end = i;
593 stream->line_crlf = FALSE;
594 }
595
596 if (stream->buffer == stream->w_buffer &&
597 end < stream->buffer_size) {
598 /* modify the buffer directly */
599 stream->w_buffer[end] = '\0';
600 ret = (char *)stream->w_buffer + stream->skip;
601 } else {
602 /* use a temporary string to return it */
603 if (stream->line_str == NULL)
604 stream->line_str = str_new(default_pool, 256);
605 str_truncate(stream->line_str, 0);
606 if (stream->skip < end)
607 str_append_data(stream->line_str, stream->buffer + stream->skip,
608 end - stream->skip);
609 ret = str_c_modifiable(stream->line_str);
610 }
611
612 if (i < stream->pos)
613 i++;
614 stream->istream.v_offset += i - stream->skip;
615 stream->skip = i;
616 return ret;
617 }
618
i_stream_last_line(struct istream_private * _stream)619 static char *i_stream_last_line(struct istream_private *_stream)
620 {
621 if (_stream->istream.eof && _stream->skip != _stream->pos &&
622 _stream->return_nolf_line) {
623 /* the last line is missing LF and we want to return it. */
624 return i_stream_next_line_finish(_stream, _stream->pos);
625 }
626 return NULL;
627 }
628
i_stream_next_line(struct istream * stream)629 char *i_stream_next_line(struct istream *stream)
630 {
631 struct istream_private *_stream = stream->real_stream;
632 const unsigned char *pos;
633
634 if (_stream->skip >= _stream->pos)
635 return NULL;
636
637 pos = memchr(_stream->buffer + _stream->skip, '\n',
638 _stream->pos - _stream->skip);
639 if (pos != NULL) {
640 return i_stream_next_line_finish(_stream,
641 pos - _stream->buffer);
642 } else {
643 return i_stream_last_line(_stream);
644 }
645 }
646
i_stream_read_next_line(struct istream * stream)647 char *i_stream_read_next_line(struct istream *stream)
648 {
649 char *line;
650
651 for (;;) {
652 line = i_stream_next_line(stream);
653 if (line != NULL)
654 break;
655
656 switch (i_stream_read(stream)) {
657 case -2:
658 io_stream_set_error(&stream->real_stream->iostream,
659 "Line is too long (over %zu"
660 " bytes at offset %"PRIuUOFF_T")",
661 i_stream_get_data_size(stream), stream->v_offset);
662 stream->stream_errno = errno = ENOBUFS;
663 stream->eof = TRUE;
664 return NULL;
665 case -1:
666 return i_stream_last_line(stream->real_stream);
667 case 0:
668 return NULL;
669 }
670 }
671 return line;
672 }
673
i_stream_last_line_crlf(struct istream * stream)674 bool i_stream_last_line_crlf(struct istream *stream)
675 {
676 return stream->real_stream->line_crlf;
677 }
678
i_stream_is_buffer_invalid(const struct istream_private * stream)679 static bool i_stream_is_buffer_invalid(const struct istream_private *stream)
680 {
681 if (stream->parent == NULL) {
682 /* the buffer can't point to parent, because it doesn't exist */
683 return FALSE;
684 }
685 if (stream->w_buffer != NULL) {
686 /* we can pretty safely assume that the stream is using its
687 own private buffer, so it can never become invalid. */
688 return FALSE;
689 }
690 if (stream->access_counter !=
691 stream->parent->real_stream->access_counter) {
692 /* parent has been modified behind this stream, we can't trust
693 that our buffer is valid */
694 return TRUE;
695 }
696 return i_stream_is_buffer_invalid(stream->parent->real_stream);
697 }
698
699 const unsigned char *
i_stream_get_data(struct istream * stream,size_t * size_r)700 i_stream_get_data(struct istream *stream, size_t *size_r)
701 {
702 struct istream_private *_stream = stream->real_stream;
703
704 if (_stream->skip >= _stream->pos) {
705 *size_r = 0;
706 return uchar_empty_ptr;
707 }
708
709 if (unlikely(i_stream_is_buffer_invalid(_stream))) {
710 /* This stream may be using parent's buffer directly as
711 _stream->buffer, but the parent stream has already been
712 modified indirectly. This means that the buffer might no
713 longer point to where we assume it points to. So we'll
714 just return the stream as empty until it's read again.
715
716 It's a bit ugly to suddenly drop data from the stream that
717 was already read, but since this happens only with shared
718 parent istreams the caller is hopefully aware enough that
719 something like this might happen. The other solutions would
720 be to a) try to automatically read the data back (but we
721 can't handle errors..) or b) always copy data to stream's
722 own buffer instead of pointing to parent's buffer (but this
723 causes data copying that is nearly always unnecessary). */
724 *size_r = 0;
725 /* if we had already read until EOF, mark the stream again as
726 not being at the end of file. */
727 if (stream->stream_errno == 0) {
728 _stream->skip = _stream->pos = 0;
729 stream->eof = FALSE;
730 }
731 return uchar_empty_ptr;
732 }
733
734 *size_r = _stream->pos - _stream->skip;
735 return _stream->buffer + _stream->skip;
736 }
737
/* Return the size reported by i_stream_get_data(), ignoring the data
   pointer. */
size_t i_stream_get_data_size(struct istream *stream)
{
	size_t avail;

	(void)i_stream_get_data(stream, &avail);
	return avail;
}
745
i_stream_get_modifiable_data(struct istream * stream,size_t * size_r)746 unsigned char *i_stream_get_modifiable_data(struct istream *stream,
747 size_t *size_r)
748 {
749 struct istream_private *_stream = stream->real_stream;
750
751 if (_stream->skip >= _stream->pos || _stream->w_buffer == NULL) {
752 *size_r = 0;
753 return NULL;
754 }
755
756 *size_r = _stream->pos - _stream->skip;
757 return _stream->w_buffer + _stream->skip;
758 }
759
i_stream_read_data(struct istream * stream,const unsigned char ** data_r,size_t * size_r,size_t threshold)760 int i_stream_read_data(struct istream *stream, const unsigned char **data_r,
761 size_t *size_r, size_t threshold)
762 {
763 ssize_t ret = 0;
764 bool read_more = FALSE;
765
766 do {
767 *data_r = i_stream_get_data(stream, size_r);
768 if (*size_r > threshold)
769 return 1;
770
771 /* we need more data */
772 ret = i_stream_read(stream);
773 if (ret > 0)
774 read_more = TRUE;
775 } while (ret > 0);
776
777 *data_r = i_stream_get_data(stream, size_r);
778 if (ret == -2)
779 return -2;
780
781 if (ret == 0) {
782 /* need to read more */
783 i_assert(!stream->blocking);
784 return 0;
785 }
786 if (stream->eof) {
787 if (read_more) {
788 /* we read at least some new data */
789 return 0;
790 }
791 } else {
792 i_assert(stream->stream_errno != 0);
793 }
794 return -1;
795 }
796
i_stream_compress(struct istream_private * stream)797 void i_stream_compress(struct istream_private *stream)
798 {
799 i_assert(stream->memarea == NULL ||
800 memarea_get_refcount(stream->memarea) == 1);
801
802 if (stream->skip != stream->pos) {
803 memmove(stream->w_buffer, stream->w_buffer + stream->skip,
804 stream->pos - stream->skip);
805 }
806 stream->pos -= stream->skip;
807
808 stream->skip = 0;
809 }
810
/* Callback given to memarea_init() by i_stream_w_buffer_realloc(); frees
   the buffer it is handed. */
static void i_stream_w_buffer_free(void *buf)
{
	i_free(buf);
}
815
816 static void
i_stream_w_buffer_realloc(struct istream_private * stream,size_t old_size)817 i_stream_w_buffer_realloc(struct istream_private *stream, size_t old_size)
818 {
819 void *new_buffer;
820
821 if (stream->memarea != NULL &&
822 memarea_get_refcount(stream->memarea) == 1) {
823 /* Nobody else is referencing the memarea.
824 We can just reallocate it. */
825 memarea_free_without_callback(&stream->memarea);
826 new_buffer = i_realloc(stream->w_buffer, old_size,
827 stream->buffer_size);
828 } else {
829 new_buffer = i_malloc(stream->buffer_size);
830 if (old_size > 0) {
831 i_assert(stream->w_buffer != NULL);
832 memcpy(new_buffer, stream->w_buffer, old_size);
833 }
834 if (stream->memarea != NULL)
835 memarea_unref(&stream->memarea);
836 }
837
838 stream->w_buffer = new_buffer;
839 stream->buffer = new_buffer;
840
841 stream->memarea = memarea_init(stream->w_buffer, stream->buffer_size,
842 i_stream_w_buffer_free, new_buffer);
843 }
844
i_stream_grow_buffer(struct istream_private * stream,size_t bytes)845 void i_stream_grow_buffer(struct istream_private *stream, size_t bytes)
846 {
847 size_t old_size, max_size;
848
849 old_size = stream->buffer_size;
850
851 stream->buffer_size = stream->pos + bytes;
852 if (stream->buffer_size <= stream->init_buffer_size)
853 stream->buffer_size = stream->init_buffer_size;
854 else
855 stream->buffer_size = nearest_power(stream->buffer_size);
856
857 max_size = i_stream_get_max_buffer_size(&stream->istream);
858 i_assert(max_size > 0);
859 if (stream->buffer_size > max_size)
860 stream->buffer_size = max_size;
861
862 if (stream->buffer_size <= old_size)
863 stream->buffer_size = old_size;
864 else
865 i_stream_w_buffer_realloc(stream, old_size);
866 }
867
i_stream_try_alloc(struct istream_private * stream,size_t wanted_size,size_t * size_r)868 bool i_stream_try_alloc(struct istream_private *stream,
869 size_t wanted_size, size_t *size_r)
870 {
871 i_assert(wanted_size > 0);
872
873 if (wanted_size > stream->buffer_size - stream->pos) {
874 if (stream->skip > 0) {
875 /* remove the unused bytes from beginning of buffer */
876 if (stream->memarea != NULL &&
877 memarea_get_refcount(stream->memarea) > 1) {
878 /* The memarea is still referenced. We can't
879 overwrite data until extra references are
880 gone. */
881 i_stream_w_buffer_realloc(stream, stream->buffer_size);
882 }
883 i_stream_compress(stream);
884 } else if (stream->buffer_size < i_stream_get_max_buffer_size(&stream->istream)) {
885 /* buffer is full - grow it */
886 i_stream_grow_buffer(stream, I_STREAM_MIN_SIZE);
887 }
888 }
889
890 *size_r = stream->buffer_size - stream->pos;
891 if (stream->try_alloc_limit > 0 &&
892 *size_r > stream->try_alloc_limit)
893 *size_r = stream->try_alloc_limit;
894 return *size_r > 0;
895 }
896
897 bool ATTR_NOWARN_UNUSED_RESULT
i_stream_try_alloc_avoid_compress(struct istream_private * stream,size_t wanted_size,size_t * size_r)898 i_stream_try_alloc_avoid_compress(struct istream_private *stream,
899 size_t wanted_size, size_t *size_r)
900 {
901 size_t old_skip = stream->skip;
902
903 /* try first with skip=0, so no compression is done */
904 stream->skip = 0;
905 bool ret = i_stream_try_alloc(stream, wanted_size, size_r);
906 stream->skip = old_skip;
907 if (ret || old_skip == 0)
908 return ret;
909 /* it's full. try with compression. */
910 return i_stream_try_alloc(stream, wanted_size, size_r);
911 }
912
i_stream_alloc(struct istream_private * stream,size_t size)913 void *i_stream_alloc(struct istream_private *stream, size_t size)
914 {
915 size_t old_size, avail_size;
916
917 (void)i_stream_try_alloc(stream, size, &avail_size);
918 if (avail_size < size) {
919 old_size = stream->buffer_size;
920 stream->buffer_size = nearest_power(stream->pos + size);
921 i_stream_w_buffer_realloc(stream, old_size);
922
923 (void)i_stream_try_alloc(stream, size, &avail_size);
924 i_assert(avail_size >= size);
925 }
926 return stream->w_buffer + stream->pos;
927 }
928
i_stream_add_data(struct istream * _stream,const unsigned char * data,size_t size)929 bool i_stream_add_data(struct istream *_stream, const unsigned char *data,
930 size_t size)
931 {
932 struct istream_private *stream = _stream->real_stream;
933 size_t size2;
934
935 (void)i_stream_try_alloc(stream, size, &size2);
936 if (size > size2)
937 return FALSE;
938
939 memcpy(stream->w_buffer + stream->pos, data, size);
940 stream->pos += size;
941 return TRUE;
942 }
943
i_stream_get_root_io(struct istream * stream)944 struct istream *i_stream_get_root_io(struct istream *stream)
945 {
946 while (stream->real_stream->parent != NULL) {
947 i_assert(stream->real_stream->io == NULL);
948 stream = stream->real_stream->parent;
949 }
950 return stream;
951 }
952
i_stream_set_input_pending(struct istream * stream,bool pending)953 void i_stream_set_input_pending(struct istream *stream, bool pending)
954 {
955 if (!pending)
956 return;
957
958 stream = i_stream_get_root_io(stream);
959 if (stream->real_stream->io != NULL)
960 io_set_pending(stream->real_stream->io);
961 else
962 stream->real_stream->io_pending = TRUE;
963 }
964
i_stream_switch_ioloop_to(struct istream * stream,struct ioloop * ioloop)965 void i_stream_switch_ioloop_to(struct istream *stream, struct ioloop *ioloop)
966 {
967 io_stream_switch_ioloop_to(&stream->real_stream->iostream, ioloop);
968
969 do {
970 if (stream->real_stream->switch_ioloop_to != NULL) {
971 stream->real_stream->switch_ioloop_to(
972 stream->real_stream, ioloop);
973 }
974 stream = stream->real_stream->parent;
975 } while (stream != NULL);
976 }
977
i_stream_switch_ioloop(struct istream * stream)978 void i_stream_switch_ioloop(struct istream *stream)
979 {
980 i_stream_switch_ioloop_to(stream, current_ioloop);
981 }
982
i_stream_set_io(struct istream * stream,struct io * io)983 void i_stream_set_io(struct istream *stream, struct io *io)
984 {
985 stream = i_stream_get_root_io(stream);
986
987 i_assert(stream->real_stream->io == NULL);
988 stream->real_stream->io = io;
989 if (stream->real_stream->io_pending) {
990 io_set_pending(io);
991 stream->real_stream->io_pending = FALSE;
992 }
993 }
994
i_stream_unset_io(struct istream * stream,struct io * io)995 void i_stream_unset_io(struct istream *stream, struct io *io)
996 {
997 stream = i_stream_get_root_io(stream);
998
999 i_assert(stream->real_stream->io == io);
1000 if (io_is_pending(io))
1001 stream->real_stream->io_pending = TRUE;
1002 stream->real_stream->io = NULL;
1003 }
1004
1005 static void
i_stream_default_set_max_buffer_size(struct iostream_private * stream,size_t max_size)1006 i_stream_default_set_max_buffer_size(struct iostream_private *stream,
1007 size_t max_size)
1008 {
1009 struct istream_private *_stream =
1010 container_of(stream, struct istream_private, iostream);
1011
1012 _stream->max_buffer_size = max_size;
1013 if (_stream->parent != NULL)
1014 i_stream_set_max_buffer_size(_stream->parent, max_size);
1015 }
1016
/* Default close handler (installed by i_stream_create() when the stream
   doesn't provide one): the only work done is closing the parent stream,
   and only when close_parent is set.

   NOTE(review): parent is passed on without a NULL check, so this relies on
   i_stream_close() accepting NULL - confirm. */
static void i_stream_default_close(struct iostream_private *stream,
				   bool close_parent)
{
	struct istream_private *priv;

	if (!close_parent)
		return;

	priv = container_of(stream, struct istream_private, iostream);
	i_stream_close(priv->parent);
}
1026
i_stream_default_destroy(struct iostream_private * stream)1027 static void i_stream_default_destroy(struct iostream_private *stream)
1028 {
1029 struct istream_private *_stream =
1030 container_of(stream, struct istream_private, iostream);
1031
1032 i_stream_free_buffer(_stream);
1033 i_stream_unref(&_stream->parent);
1034 }
1035
1036 static void
i_stream_default_seek_seekable(struct istream_private * stream,uoff_t v_offset,bool mark ATTR_UNUSED)1037 i_stream_default_seek_seekable(struct istream_private *stream,
1038 uoff_t v_offset, bool mark ATTR_UNUSED)
1039 {
1040 stream->istream.v_offset = v_offset;
1041 stream->skip = stream->pos = 0;
1042 }
1043
/* Default seek handler for streams that aren't seekable: reach v_offset by
   reading and skipping forward.

   Panics if v_offset is before the current offset. If a read leaves no
   buffered data before v_offset is reached, the function returns early:
   when stream_errno is already set it is left as is, otherwise an error
   message is stored and stream_errno is set to ESPIPE. The mark parameter
   is unused. */
void i_stream_default_seek_nonseekable(struct istream_private *stream,
				       uoff_t v_offset, bool mark ATTR_UNUSED)
{
	struct istream *input = &stream->istream;
	size_t buffered;
	uoff_t remaining, count;

	if (input->v_offset > v_offset) {
		i_panic("stream %s doesn't support seeking backwards",
			i_stream_get_name(input));
	}

	while (input->v_offset < v_offset) {
		(void)i_stream_read(input);

		buffered = stream->pos - stream->skip;
		if (buffered > 0) {
			/* skip whatever is buffered, but never past the
			   requested offset */
			remaining = v_offset - input->v_offset;
			count = remaining;
			if (buffered <= remaining)
				count = buffered;
			i_stream_skip(input, count);
			continue;
		}

		/* nothing to skip - we can't get any further */
		if (input->stream_errno == 0) {
			io_stream_set_error(&stream->iostream,
				"Can't seek to offset %"PRIuUOFF_T
				", because we have data only up to offset %"
				PRIuUOFF_T" (eof=%d)", v_offset,
				input->v_offset, input->eof ? 1 : 0);
			input->stream_errno = ESPIPE;
		}
		/* else: the read itself failed and already set the error */
		return;
	}
}
1078
i_stream_nonseekable_try_seek(struct istream_private * stream,uoff_t v_offset)1079 bool i_stream_nonseekable_try_seek(struct istream_private *stream,
1080 uoff_t v_offset)
1081 {
1082 uoff_t start_offset = stream->istream.v_offset - stream->skip;
1083
1084 if (v_offset < start_offset) {
1085 /* have to seek backwards */
1086 i_stream_seek(stream->parent, stream->parent_start_offset);
1087 stream->parent_expected_offset = stream->parent_start_offset;
1088 stream->skip = stream->pos = 0;
1089 stream->istream.v_offset = 0;
1090 stream->high_pos = 0;
1091 return FALSE;
1092 }
1093
1094 if (v_offset <= start_offset + stream->pos) {
1095 /* seeking backwards within what's already cached */
1096 stream->skip = v_offset - start_offset;
1097 stream->istream.v_offset = v_offset;
1098 if (stream->high_pos == 0)
1099 stream->high_pos = stream->pos;
1100 stream->pos = stream->skip;
1101 } else {
1102 /* read forward */
1103 i_stream_default_seek_nonseekable(stream, v_offset, FALSE);
1104 }
1105 return TRUE;
1106 }
1107
1108 static int
seekable_i_stream_get_size(struct istream_private * stream)1109 seekable_i_stream_get_size(struct istream_private *stream)
1110 {
1111 if (stream->cached_stream_size == UOFF_T_MAX) {
1112 uoff_t old_offset = stream->istream.v_offset;
1113 ssize_t ret;
1114
1115 do {
1116 i_stream_skip(&stream->istream,
1117 i_stream_get_data_size(&stream->istream));
1118 } while ((ret = i_stream_read(&stream->istream)) > 0);
1119 i_assert(ret == -1);
1120 if (stream->istream.stream_errno != 0)
1121 return -1;
1122
1123 stream->cached_stream_size = stream->istream.v_offset;
1124 i_stream_seek(&stream->istream, old_offset);
1125 }
1126 stream->statbuf.st_size = stream->cached_stream_size;
1127 return 0;
1128 }
1129
1130 static int
i_stream_default_stat(struct istream_private * stream,bool exact)1131 i_stream_default_stat(struct istream_private *stream, bool exact)
1132 {
1133 const struct stat *st;
1134
1135 if (stream->parent == NULL)
1136 return stream->istream.stream_errno == 0 ? 0 : -1;
1137
1138 if (i_stream_stat(stream->parent, exact, &st) < 0) {
1139 stream->istream.stream_errno = stream->parent->stream_errno;
1140 return -1;
1141 }
1142 stream->statbuf = *st;
1143 if (exact && !stream->stream_size_passthrough) {
1144 /* exact size is not known, even if parent returned something */
1145 stream->statbuf.st_size = -1;
1146 if (stream->istream.seekable) {
1147 if (seekable_i_stream_get_size(stream) < 0)
1148 return -1;
1149 }
1150 } else {
1151 /* When exact=FALSE always return the parent stat's size, even
1152 if we know the exact value. This is necessary because
1153 otherwise e.g. mbox code can see two different values and
1154 think that the mbox file keeps changing. */
1155 }
1156 return 0;
1157 }
1158
1159 static int
i_stream_default_get_size(struct istream_private * stream,bool exact,uoff_t * size_r)1160 i_stream_default_get_size(struct istream_private *stream,
1161 bool exact, uoff_t *size_r)
1162 {
1163 if (stream->stat(stream, exact) < 0)
1164 return -1;
1165 if (stream->statbuf.st_size == -1)
1166 return 0;
1167
1168 *size_r = stream->statbuf.st_size;
1169 return 1;
1170 }
1171
i_stream_init_parent(struct istream_private * _stream,struct istream * parent)1172 void i_stream_init_parent(struct istream_private *_stream,
1173 struct istream *parent)
1174 {
1175 _stream->access_counter = parent->real_stream->access_counter;
1176 _stream->parent = parent;
1177 _stream->parent_start_offset = parent->v_offset;
1178 _stream->parent_expected_offset = parent->v_offset;
1179 _stream->start_offset = parent->v_offset;
1180 /* if parent stream is an istream-error, copy the error */
1181 _stream->istream.stream_errno = parent->stream_errno;
1182 _stream->istream.eof = parent->eof;
1183 i_stream_ref(parent);
1184 }
1185
1186 struct istream *
i_stream_create(struct istream_private * _stream,struct istream * parent,int fd,enum istream_create_flag flags)1187 i_stream_create(struct istream_private *_stream, struct istream *parent, int fd,
1188 enum istream_create_flag flags)
1189 {
1190 bool noop_snapshot = (flags & ISTREAM_CREATE_FLAG_NOOP_SNAPSHOT) != 0;
1191
1192 _stream->fd = fd;
1193 if (parent != NULL)
1194 i_stream_init_parent(_stream, parent);
1195 else if (_stream->memarea == NULL && !noop_snapshot) {
1196 /* The stream has no parent and no memarea yet. We'll assume
1197 that it wants to be using memareas for the reads. */
1198 _stream->memarea = memarea_init_empty();
1199 }
1200 _stream->istream.real_stream = _stream;
1201
1202 if (_stream->iostream.close == NULL)
1203 _stream->iostream.close = i_stream_default_close;
1204 if (_stream->iostream.destroy == NULL)
1205 _stream->iostream.destroy = i_stream_default_destroy;
1206 if (_stream->seek == NULL) {
1207 _stream->seek = _stream->istream.seekable ?
1208 i_stream_default_seek_seekable :
1209 i_stream_default_seek_nonseekable;
1210 }
1211 if (_stream->stat == NULL)
1212 _stream->stat = i_stream_default_stat;
1213 if (_stream->get_size == NULL)
1214 _stream->get_size = i_stream_default_get_size;
1215 if (_stream->snapshot == NULL) {
1216 _stream->snapshot = noop_snapshot ?
1217 i_stream_noop_snapshot :
1218 i_stream_default_snapshot;
1219 }
1220 if (_stream->iostream.set_max_buffer_size == NULL) {
1221 _stream->iostream.set_max_buffer_size =
1222 i_stream_default_set_max_buffer_size;
1223 }
1224 if (_stream->init_buffer_size == 0)
1225 _stream->init_buffer_size = I_STREAM_MIN_SIZE;
1226
1227 i_zero(&_stream->statbuf);
1228 _stream->statbuf.st_size = -1;
1229 _stream->statbuf.st_atime =
1230 _stream->statbuf.st_mtime =
1231 _stream->statbuf.st_ctime = ioloop_time;
1232 _stream->cached_stream_size = UOFF_T_MAX;
1233
1234 io_stream_init(&_stream->iostream);
1235
1236 if (_stream->istream.stream_errno != 0)
1237 _stream->istream.eof = TRUE;
1238
1239 return &_stream->istream;
1240 }
1241
i_stream_create_error(int stream_errno)1242 struct istream *i_stream_create_error(int stream_errno)
1243 {
1244 struct istream_private *stream;
1245
1246 stream = i_new(struct istream_private, 1);
1247 stream->istream.closed = TRUE;
1248 stream->istream.readable_fd = FALSE;
1249 stream->istream.blocking = TRUE;
1250 stream->istream.seekable = TRUE;
1251 stream->istream.eof = TRUE;
1252 stream->istream.stream_errno = stream_errno;
1253 /* Nothing can ever actually be read from this stream, but set a
1254 reasonable max_buffer_size anyway since some filter istreams don't
1255 behave properly otherwise. */
1256 stream->max_buffer_size = IO_BLOCK_SIZE;
1257 i_stream_create(stream, NULL, -1, 0);
1258 i_stream_set_name(&stream->istream, "(error)");
1259 return &stream->istream;
1260 }
1261
1262 struct istream *
i_stream_create_error_str(int stream_errno,const char * fmt,...)1263 i_stream_create_error_str(int stream_errno, const char *fmt, ...)
1264 {
1265 struct istream *input;
1266 va_list args;
1267
1268 va_start(args, fmt);
1269 input = i_stream_create_error(stream_errno);
1270 io_stream_set_verror(&input->real_stream->iostream, fmt, args);
1271 va_end(args);
1272 return input;
1273 }
1274