/* Copyright (c) 2006-2018 Dovecot authors, see the included COPYING file */
2
3 #include "lib.h"
4 #include "istream-private.h"
5 #include "istream-tee.h"
6
7 struct tee_istream {
8 struct istream *input;
9 struct tee_child_istream *children;
10
11 uoff_t max_read_offset;
12 };
13
14 struct tee_child_istream {
15 struct istream_private istream;
16
17 struct tee_istream *tee;
18 struct tee_child_istream *next;
19
20 bool last_read_waiting:1;
21 };
22
tee_streams_update_buffer(struct tee_istream * tee)23 static void tee_streams_update_buffer(struct tee_istream *tee)
24 {
25 struct tee_child_istream *tstream = tee->children;
26 const unsigned char *data;
27 size_t size, old_used;
28
29 data = i_stream_get_data(tee->input, &size);
30 for (; tstream != NULL; tstream = tstream->next) {
31 if (tstream->istream.istream.closed) {
32 tstream->istream.skip = tstream->istream.pos = 0;
33 continue;
34 }
35 old_used = tstream->istream.pos - tstream->istream.skip;
36
37 tstream->istream.buffer = data;
38 i_assert(tstream->istream.istream.v_offset >= tee->input->v_offset);
39 tstream->istream.skip = tstream->istream.istream.v_offset -
40 tee->input->v_offset;
41 i_assert(tstream->istream.skip + old_used <= size);
42 tstream->istream.pos = tstream->istream.skip + old_used;
43
44 tstream->istream.parent_expected_offset =
45 tee->input->v_offset;
46 tstream->istream.access_counter =
47 tee->input->real_stream->access_counter;
48 }
49 }
50
tee_streams_skip(struct tee_istream * tee)51 static void tee_streams_skip(struct tee_istream *tee)
52 {
53 struct tee_child_istream *tstream = tee->children;
54 size_t min_skip;
55
56 min_skip = SIZE_MAX;
57 for (; tstream != NULL; tstream = tstream->next) {
58 if (tstream->istream.skip < min_skip &&
59 !tstream->istream.istream.closed)
60 min_skip = tstream->istream.skip;
61 }
62
63 if (min_skip > 0 && min_skip != SIZE_MAX) {
64 i_stream_skip(tee->input, min_skip);
65 tee_streams_update_buffer(tee);
66 }
67 }
68
/* iostream close callback for a child stream. It only recalculates how far
   the parent can be skipped; close_parent is ignored. */
static void i_stream_tee_close(struct iostream_private *stream,
			       bool close_parent ATTR_UNUSED)
{
	struct tee_child_istream *child =
		container_of(stream, struct tee_child_istream,
			     istream.iostream);
	struct tee_istream *tee = child->tee;

	tee_streams_skip(tee);
}
78
i_stream_tee_destroy(struct iostream_private * stream)79 static void i_stream_tee_destroy(struct iostream_private *stream)
80 {
81 struct tee_child_istream *tstream =
82 container_of(stream, struct tee_child_istream,
83 istream.iostream);
84 struct tee_istream *tee = tstream->tee;
85 struct tee_child_istream **p;
86
87 if (tstream->istream.istream.v_offset > tee->max_read_offset)
88 tee->max_read_offset = tstream->istream.istream.v_offset;
89
90 for (p = &tee->children; *p != NULL; p = &(*p)->next) {
91 if (*p == tstream) {
92 *p = tstream->next;
93 break;
94 }
95 }
96
97 if (tee->children == NULL) {
98 /* last child. the tee is now destroyed */
99 i_assert(tee->input->v_offset <= tee->max_read_offset);
100 i_stream_skip(tee->input,
101 tee->max_read_offset - tee->input->v_offset);
102
103 i_stream_unref(&tee->input);
104 i_free(tee);
105 } else {
106 tee_streams_skip(tstream->tee);
107 }
108 /* i_stream_unref() shouldn't unref the parent */
109 tstream->istream.parent = NULL;
110 }
111
112 static void
i_stream_tee_set_max_buffer_size(struct iostream_private * stream,size_t max_size)113 i_stream_tee_set_max_buffer_size(struct iostream_private *stream,
114 size_t max_size)
115 {
116 struct tee_child_istream *tstream =
117 container_of(stream, struct tee_child_istream,
118 istream.iostream);
119
120 tstream->istream.max_buffer_size = max_size;
121 i_stream_set_max_buffer_size(tstream->tee->input, max_size);
122 }
123
i_stream_tee_read(struct istream_private * stream)124 static ssize_t i_stream_tee_read(struct istream_private *stream)
125 {
126 struct tee_child_istream *tstream =
127 container_of(stream, struct tee_child_istream, istream);
128 struct istream *input = tstream->tee->input;
129 const unsigned char *data;
130 size_t size;
131 uoff_t last_high_offset;
132 ssize_t ret;
133
134 tstream->last_read_waiting = FALSE;
135 if (stream->buffer == NULL) {
136 /* initial read */
137 tee_streams_update_buffer(tstream->tee);
138 }
139 data = i_stream_get_data(input, &size);
140
141 /* last_high_offset contains how far we have read this child tee stream
142 so far. input->v_offset + size contains how much is available in
143 the parent stream without having to read more. */
144 last_high_offset = stream->istream.v_offset +
145 (stream->pos - stream->skip);
146 if (stream->pos == size) {
147 /* we've read everything, need to read more */
148 i_assert(last_high_offset == input->v_offset + size);
149 tee_streams_skip(tstream->tee);
150 ret = i_stream_read(input);
151 if (ret <= 0) {
152 size = i_stream_get_data_size(input);
153 if (ret == -2 && stream->skip != 0) {
154 /* someone else is holding the data,
155 wait for it */
156 tstream->last_read_waiting = TRUE;
157 return 0;
158 }
159 stream->istream.stream_errno = input->stream_errno;
160 stream->istream.eof = input->eof;
161 return ret;
162 }
163 tee_streams_update_buffer(tstream->tee);
164 data = i_stream_get_data(input, &size);
165 } else {
166 /* there's still some data available from parent */
167 i_assert(last_high_offset < input->v_offset + size);
168 tee_streams_update_buffer(tstream->tee);
169 i_assert(stream->pos < size);
170 }
171
172 i_assert(stream->buffer == data);
173 ret = size - stream->pos;
174 i_assert(ret > 0);
175 stream->pos = size;
176
177 i_assert(stream->istream.v_offset + (stream->pos - stream->skip) ==
178 input->v_offset + size);
179 return ret;
180 }
181
182 static int
i_stream_tee_stat(struct istream_private * stream,bool exact)183 i_stream_tee_stat(struct istream_private *stream, bool exact)
184 {
185 struct tee_child_istream *tstream =
186 container_of(stream, struct tee_child_istream, istream);
187 const struct stat *st;
188
189 if (i_stream_stat(tstream->tee->input, exact, &st) < 0)
190 return -1;
191 stream->statbuf = *st;
192 return 0;
193 }
194
i_stream_tee_sync(struct istream_private * stream)195 static void i_stream_tee_sync(struct istream_private *stream)
196 {
197 struct tee_child_istream *tstream =
198 container_of(stream, struct tee_child_istream, istream);
199
200 tee_streams_skip(tstream->tee);
201 if (i_stream_get_data_size(tstream->tee->input) != 0) {
202 i_panic("tee-istream: i_stream_sync() called "
203 "with data still buffered");
204 }
205 i_stream_sync(tstream->tee->input);
206 }
207
tee_i_stream_create(struct istream * input)208 struct tee_istream *tee_i_stream_create(struct istream *input)
209 {
210 struct tee_istream *tee;
211
212 tee = i_new(struct tee_istream, 1);
213 if (input->v_offset == 0) {
214 i_stream_ref(input);
215 tee->input = input;
216 } else {
217 tee->input = i_stream_create_limit(input, UOFF_T_MAX);
218 }
219 return tee;
220 }
221
tee_i_stream_create_child(struct tee_istream * tee)222 struct istream *tee_i_stream_create_child(struct tee_istream *tee)
223 {
224 struct tee_child_istream *tstream;
225 struct istream *ret, *input = tee->input;
226
227 tstream = i_new(struct tee_child_istream, 1);
228 tstream->tee = tee;
229
230 tstream->istream.max_buffer_size = input->real_stream->max_buffer_size;
231 tstream->istream.iostream.close = i_stream_tee_close;
232 tstream->istream.iostream.destroy = i_stream_tee_destroy;
233 tstream->istream.iostream.set_max_buffer_size =
234 i_stream_tee_set_max_buffer_size;
235
236 tstream->istream.read = i_stream_tee_read;
237 tstream->istream.stat = i_stream_tee_stat;
238 tstream->istream.sync = i_stream_tee_sync;
239
240 tstream->next = tee->children;
241 tee->children = tstream;
242
243 ret = i_stream_create(&tstream->istream, input, i_stream_get_fd(input),
244 ISTREAM_CREATE_FLAG_NOOP_SNAPSHOT);
245 i_stream_set_name(&tstream->istream.istream, i_stream_get_name(input));
246 /* we keep the reference in tee stream, no need for extra references */
247 i_stream_unref(&input);
248 return ret;
249 }
250
tee_i_stream_child_is_waiting(struct istream * input)251 bool tee_i_stream_child_is_waiting(struct istream *input)
252 {
253 struct tee_child_istream *tstream =
254 container_of(input->real_stream,
255 struct tee_child_istream, istream);
256
257 return tstream->last_read_waiting;
258 }
259