1 /* Copyright (c) 2006-2018 Dovecot authors, see the included COPYING file */
2 
3 #include "lib.h"
4 #include "istream-private.h"
5 #include "istream-tee.h"
6 
7 struct tee_istream {
8 	struct istream *input;
9 	struct tee_child_istream *children;
10 
11 	uoff_t max_read_offset;
12 };
13 
14 struct tee_child_istream {
15 	struct istream_private istream;
16 
17 	struct tee_istream *tee;
18 	struct tee_child_istream *next;
19 
20 	bool last_read_waiting:1;
21 };
22 
tee_streams_update_buffer(struct tee_istream * tee)23 static void tee_streams_update_buffer(struct tee_istream *tee)
24 {
25 	struct tee_child_istream *tstream = tee->children;
26 	const unsigned char *data;
27 	size_t size, old_used;
28 
29 	data = i_stream_get_data(tee->input, &size);
30 	for (; tstream != NULL; tstream = tstream->next) {
31 		if (tstream->istream.istream.closed) {
32 			tstream->istream.skip = tstream->istream.pos = 0;
33 			continue;
34 		}
35 		old_used = tstream->istream.pos - tstream->istream.skip;
36 
37 		tstream->istream.buffer = data;
38 		i_assert(tstream->istream.istream.v_offset >= tee->input->v_offset);
39 		tstream->istream.skip = tstream->istream.istream.v_offset -
40 			tee->input->v_offset;
41 		i_assert(tstream->istream.skip + old_used <= size);
42 		tstream->istream.pos = tstream->istream.skip + old_used;
43 
44 		tstream->istream.parent_expected_offset =
45 			tee->input->v_offset;
46 		tstream->istream.access_counter =
47 			tee->input->real_stream->access_counter;
48 	}
49 }
50 
tee_streams_skip(struct tee_istream * tee)51 static void tee_streams_skip(struct tee_istream *tee)
52 {
53 	struct tee_child_istream *tstream = tee->children;
54 	size_t min_skip;
55 
56 	min_skip = SIZE_MAX;
57 	for (; tstream != NULL; tstream = tstream->next) {
58 		if (tstream->istream.skip < min_skip &&
59 		    !tstream->istream.istream.closed)
60 			min_skip = tstream->istream.skip;
61 	}
62 
63 	if (min_skip > 0 && min_skip != SIZE_MAX) {
64 		i_stream_skip(tee->input, min_skip);
65 		tee_streams_update_buffer(tee);
66 	}
67 }
68 
i_stream_tee_close(struct iostream_private * stream,bool close_parent ATTR_UNUSED)69 static void i_stream_tee_close(struct iostream_private *stream,
70 			       bool close_parent ATTR_UNUSED)
71 {
72 	struct tee_child_istream *tstream =
73 		container_of(stream, struct tee_child_istream,
74 			     istream.iostream);
75 
76 	tee_streams_skip(tstream->tee);
77 }
78 
i_stream_tee_destroy(struct iostream_private * stream)79 static void i_stream_tee_destroy(struct iostream_private *stream)
80 {
81 	struct tee_child_istream *tstream =
82 		container_of(stream, struct tee_child_istream,
83 			     istream.iostream);
84 	struct tee_istream *tee = tstream->tee;
85 	struct tee_child_istream **p;
86 
87 	if (tstream->istream.istream.v_offset > tee->max_read_offset)
88 		tee->max_read_offset = tstream->istream.istream.v_offset;
89 
90 	for (p = &tee->children; *p != NULL; p = &(*p)->next) {
91 		if (*p == tstream) {
92 			*p = tstream->next;
93 			break;
94 		}
95 	}
96 
97 	if (tee->children == NULL) {
98 		/* last child. the tee is now destroyed */
99 		i_assert(tee->input->v_offset <= tee->max_read_offset);
100 		i_stream_skip(tee->input,
101 			      tee->max_read_offset - tee->input->v_offset);
102 
103 		i_stream_unref(&tee->input);
104 		i_free(tee);
105 	} else {
106 		tee_streams_skip(tstream->tee);
107 	}
108 	/* i_stream_unref() shouldn't unref the parent */
109 	tstream->istream.parent = NULL;
110 }
111 
112 static void
i_stream_tee_set_max_buffer_size(struct iostream_private * stream,size_t max_size)113 i_stream_tee_set_max_buffer_size(struct iostream_private *stream,
114 				 size_t max_size)
115 {
116 	struct tee_child_istream *tstream =
117 		container_of(stream, struct tee_child_istream,
118 			     istream.iostream);
119 
120 	tstream->istream.max_buffer_size = max_size;
121 	i_stream_set_max_buffer_size(tstream->tee->input, max_size);
122 }
123 
i_stream_tee_read(struct istream_private * stream)124 static ssize_t i_stream_tee_read(struct istream_private *stream)
125 {
126 	struct tee_child_istream *tstream =
127 		container_of(stream, struct tee_child_istream, istream);
128 	struct istream *input = tstream->tee->input;
129 	const unsigned char *data;
130 	size_t size;
131 	uoff_t last_high_offset;
132 	ssize_t ret;
133 
134 	tstream->last_read_waiting = FALSE;
135 	if (stream->buffer == NULL) {
136 		/* initial read */
137 		tee_streams_update_buffer(tstream->tee);
138 	}
139 	data = i_stream_get_data(input, &size);
140 
141 	/* last_high_offset contains how far we have read this child tee stream
142 	   so far. input->v_offset + size contains how much is available in
143 	   the parent stream without having to read more. */
144 	last_high_offset = stream->istream.v_offset +
145 		(stream->pos - stream->skip);
146 	if (stream->pos == size) {
147 		/* we've read everything, need to read more */
148 		i_assert(last_high_offset == input->v_offset + size);
149 		tee_streams_skip(tstream->tee);
150 		ret = i_stream_read(input);
151 		if (ret <= 0) {
152 			size = i_stream_get_data_size(input);
153 			if (ret == -2 && stream->skip != 0) {
154 				/* someone else is holding the data,
155 				   wait for it */
156 				tstream->last_read_waiting = TRUE;
157 				return 0;
158 			}
159 			stream->istream.stream_errno = input->stream_errno;
160 			stream->istream.eof = input->eof;
161 			return ret;
162 		}
163 		tee_streams_update_buffer(tstream->tee);
164 		data = i_stream_get_data(input, &size);
165 	} else {
166 		/* there's still some data available from parent */
167 		i_assert(last_high_offset < input->v_offset + size);
168 		tee_streams_update_buffer(tstream->tee);
169 		i_assert(stream->pos < size);
170 	}
171 
172 	i_assert(stream->buffer == data);
173 	ret = size - stream->pos;
174 	i_assert(ret > 0);
175 	stream->pos = size;
176 
177 	i_assert(stream->istream.v_offset + (stream->pos - stream->skip) ==
178 		 input->v_offset + size);
179 	return ret;
180 }
181 
182 static int
i_stream_tee_stat(struct istream_private * stream,bool exact)183 i_stream_tee_stat(struct istream_private *stream, bool exact)
184 {
185 	struct tee_child_istream *tstream =
186 		container_of(stream, struct tee_child_istream, istream);
187 	const struct stat *st;
188 
189 	if (i_stream_stat(tstream->tee->input, exact, &st) < 0)
190 		return -1;
191 	stream->statbuf = *st;
192 	return 0;
193 }
194 
i_stream_tee_sync(struct istream_private * stream)195 static void i_stream_tee_sync(struct istream_private *stream)
196 {
197 	struct tee_child_istream *tstream =
198 		container_of(stream, struct tee_child_istream, istream);
199 
200 	tee_streams_skip(tstream->tee);
201 	if (i_stream_get_data_size(tstream->tee->input) != 0) {
202 		i_panic("tee-istream: i_stream_sync() called "
203 			"with data still buffered");
204 	}
205 	i_stream_sync(tstream->tee->input);
206 }
207 
tee_i_stream_create(struct istream * input)208 struct tee_istream *tee_i_stream_create(struct istream *input)
209 {
210 	struct tee_istream *tee;
211 
212 	tee = i_new(struct tee_istream, 1);
213 	if (input->v_offset == 0) {
214 		i_stream_ref(input);
215 		tee->input = input;
216 	} else {
217 		tee->input = i_stream_create_limit(input, UOFF_T_MAX);
218 	}
219 	return tee;
220 }
221 
tee_i_stream_create_child(struct tee_istream * tee)222 struct istream *tee_i_stream_create_child(struct tee_istream *tee)
223 {
224 	struct tee_child_istream *tstream;
225 	struct istream *ret, *input = tee->input;
226 
227 	tstream = i_new(struct tee_child_istream, 1);
228 	tstream->tee = tee;
229 
230 	tstream->istream.max_buffer_size = input->real_stream->max_buffer_size;
231 	tstream->istream.iostream.close = i_stream_tee_close;
232 	tstream->istream.iostream.destroy = i_stream_tee_destroy;
233 	tstream->istream.iostream.set_max_buffer_size =
234 		i_stream_tee_set_max_buffer_size;
235 
236 	tstream->istream.read = i_stream_tee_read;
237 	tstream->istream.stat = i_stream_tee_stat;
238 	tstream->istream.sync = i_stream_tee_sync;
239 
240 	tstream->next = tee->children;
241 	tee->children = tstream;
242 
243 	ret = i_stream_create(&tstream->istream, input, i_stream_get_fd(input),
244 			      ISTREAM_CREATE_FLAG_NOOP_SNAPSHOT);
245 	i_stream_set_name(&tstream->istream.istream, i_stream_get_name(input));
246 	/* we keep the reference in tee stream, no need for extra references */
247 	i_stream_unref(&input);
248 	return ret;
249 }
250 
tee_i_stream_child_is_waiting(struct istream * input)251 bool tee_i_stream_child_is_waiting(struct istream *input)
252 {
253 	struct tee_child_istream *tstream =
254 		container_of(input->real_stream,
255 			     struct tee_child_istream, istream);
256 
257 	return tstream->last_read_waiting;
258 }
259