1 /* Copyright (c) 2013-2018 Dovecot authors, see the included COPYING file */
2 
3 #include "lib.h"
4 #include "buffer.h"
5 #include "str.h"
6 #include "safe-mkstemp.h"
7 #include "write-full.h"
8 #include "istream-private.h"
9 #include "ostream-private.h"
10 #include "iostream-temp.h"
11 
12 #include <unistd.h>
13 
/* Default in-memory limit (128 kB) that iostream_temp_create_named() passes
   on as max_mem_size. */
#define IOSTREAM_TEMP_MAX_BUF_SIZE_DEFAULT (128 * 1024)
15 
16 struct temp_ostream {
17 	struct ostream_private ostream;
18 
19 	char *temp_path_prefix;
20 	enum iostream_temp_flags flags;
21 	size_t max_mem_size;
22 
23 	struct istream *dupstream;
24 	uoff_t dupstream_offset, dupstream_start_offset;
25 	char *name;
26 
27 	buffer_t *buf;
28 	int fd;
29 	bool fd_tried;
30 	uoff_t fd_size;
31 };
32 
/* Forward declaration: o_stream_temp_sendv() needs this before its
   definition further below. */
static bool
o_stream_temp_dup_cancel(struct temp_ostream *tstream,
			 enum ostream_send_istream_result *res_r);
35 
36 static void
o_stream_temp_close(struct iostream_private * stream,bool close_parent ATTR_UNUSED)37 o_stream_temp_close(struct iostream_private *stream,
38 		    bool close_parent ATTR_UNUSED)
39 {
40 	struct temp_ostream *tstream =
41 		container_of(stream, struct temp_ostream, ostream.iostream);
42 
43 	i_close_fd(&tstream->fd);
44 	buffer_free(&tstream->buf);
45 	i_free(tstream->temp_path_prefix);
46 	i_free(tstream->name);
47 }
48 
o_stream_temp_move_to_fd(struct temp_ostream * tstream)49 static int o_stream_temp_move_to_fd(struct temp_ostream *tstream)
50 {
51 	string_t *path;
52 
53 	if (tstream->fd_tried)
54 		return -1;
55 	tstream->fd_tried = TRUE;
56 
57 	path = t_str_new(128);
58 	str_append(path, tstream->temp_path_prefix);
59 	tstream->fd = safe_mkstemp_hostpid(path, 0600, (uid_t)-1, (gid_t)-1);
60 	if (tstream->fd == -1) {
61 		i_error("safe_mkstemp(%s) failed: %m", str_c(path));
62 		return -1;
63 	}
64 	if (i_unlink(str_c(path)) < 0) {
65 		i_close_fd(&tstream->fd);
66 		return -1;
67 	}
68 	if (write_full(tstream->fd, tstream->buf->data, tstream->buf->used) < 0) {
69 		i_error("write(%s) failed: %m", str_c(path));
70 		i_close_fd(&tstream->fd);
71 		return -1;
72 	}
73 	/* make the fd available also to o_stream_get_fd(),
74 	   e.g. for unit tests */
75 	tstream->ostream.fd = tstream->fd;
76 	tstream->fd_size = tstream->buf->used;
77 	buffer_free(&tstream->buf);
78 	return 0;
79 }
80 
o_stream_temp_move_to_memory(struct ostream * output)81 int o_stream_temp_move_to_memory(struct ostream *output)
82 {
83 	struct temp_ostream *tstream =
84 		container_of(output->real_stream, struct temp_ostream, ostream);
85 	unsigned char buf[IO_BLOCK_SIZE];
86 	uoff_t offset = 0;
87 	ssize_t ret = 0;
88 
89 	i_assert(tstream->buf == NULL);
90 	tstream->buf = buffer_create_dynamic(default_pool, 8192);
91 	while (offset < tstream->ostream.ostream.offset &&
92 	       (ret = pread(tstream->fd, buf, sizeof(buf), offset)) > 0) {
93 		if ((size_t)ret > tstream->ostream.ostream.offset - offset)
94 			ret = tstream->ostream.ostream.offset - offset;
95 		buffer_append(tstream->buf, buf, ret);
96 		offset += ret;
97 	}
98 	if (ret < 0) {
99 		/* not really expecting this to happen */
100 		i_error("iostream-temp %s: read(%s*) failed: %m",
101 			o_stream_get_name(&tstream->ostream.ostream),
102 			tstream->temp_path_prefix);
103 		tstream->ostream.ostream.stream_errno = EIO;
104 		return -1;
105 	}
106 	i_close_fd(&tstream->fd);
107 	tstream->ostream.fd = -1;
108 	return 0;
109 }
110 
111 static ssize_t
o_stream_temp_fd_sendv(struct temp_ostream * tstream,const struct const_iovec * iov,unsigned int iov_count)112 o_stream_temp_fd_sendv(struct temp_ostream *tstream,
113 		       const struct const_iovec *iov, unsigned int iov_count)
114 {
115 	size_t bytes = 0;
116 	unsigned int i;
117 
118 	for (i = 0; i < iov_count; i++) {
119 		if (write_full(tstream->fd, iov[i].iov_base, iov[i].iov_len) < 0) {
120 			i_error("iostream-temp %s: write(%s*) failed: %m - moving to memory",
121 				o_stream_get_name(&tstream->ostream.ostream),
122 				tstream->temp_path_prefix);
123 			if (o_stream_temp_move_to_memory(&tstream->ostream.ostream) < 0)
124 				return -1;
125 			for (; i < iov_count; i++) {
126 				buffer_append(tstream->buf, iov[i].iov_base, iov[i].iov_len);
127 				bytes += iov[i].iov_len;
128 				tstream->ostream.ostream.offset += iov[i].iov_len;
129 			}
130 			i_assert(tstream->fd_tried);
131 			return bytes;
132 		}
133 		bytes += iov[i].iov_len;
134 		tstream->ostream.ostream.offset += iov[i].iov_len;
135 	}
136 	tstream->fd_size += bytes;
137 	return bytes;
138 }
139 
140 static ssize_t
o_stream_temp_sendv(struct ostream_private * stream,const struct const_iovec * iov,unsigned int iov_count)141 o_stream_temp_sendv(struct ostream_private *stream,
142 		    const struct const_iovec *iov, unsigned int iov_count)
143 {
144 	struct temp_ostream *tstream =
145 		container_of(stream, struct temp_ostream, ostream);
146 	ssize_t ret = 0;
147 	unsigned int i;
148 	enum ostream_send_istream_result res;
149 
150 
151 	tstream->flags &= ENUM_NEGATE(IOSTREAM_TEMP_FLAG_TRY_FD_DUP);
152 	if (tstream->dupstream != NULL) {
153 		if (o_stream_temp_dup_cancel(tstream, &res))
154 			return -1;
155 	}
156 
157 	if (tstream->fd != -1)
158 		return o_stream_temp_fd_sendv(tstream, iov, iov_count);
159 
160 	for (i = 0; i < iov_count; i++) {
161 		if (tstream->buf->used + iov[i].iov_len > tstream->max_mem_size) {
162 			if (o_stream_temp_move_to_fd(tstream) == 0) {
163 				i_assert(tstream->fd != -1);
164 				return o_stream_temp_fd_sendv(tstream, iov+i,
165 							      iov_count-i);
166 			}
167 			/* failed to move to temp fd, just keep it in memory */
168 		}
169 		buffer_append(tstream->buf, iov[i].iov_base, iov[i].iov_len);
170 		ret += iov[i].iov_len;
171 		stream->ostream.offset += iov[i].iov_len;
172 	}
173 	return ret;
174 }
175 
o_stream_temp_dup_cancel(struct temp_ostream * tstream,enum ostream_send_istream_result * res_r)176 static bool o_stream_temp_dup_cancel(struct temp_ostream *tstream,
177 				     enum ostream_send_istream_result *res_r)
178 {
179 	struct istream *input;
180 	uoff_t size = tstream->dupstream_offset -
181 		tstream->dupstream_start_offset;
182 	bool ret = TRUE; /* use res_r to return error */
183 
184 	i_stream_seek(tstream->dupstream, tstream->dupstream_start_offset);
185 	tstream->ostream.ostream.offset = 0;
186 
187 	input = i_stream_create_limit(tstream->dupstream, size);
188 	i_stream_unref(&tstream->dupstream);
189 
190 	*res_r = io_stream_copy(&tstream->ostream.ostream, input);
191 	switch (*res_r) {
192 	case OSTREAM_SEND_ISTREAM_RESULT_FINISHED:
193 		/* everything copied */
194 		ret = FALSE;
195 		break;
196 	case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT:
197 	case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT:
198 		i_unreached();
199 	case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT:
200 		tstream->ostream.ostream.stream_errno = input->stream_errno;
201 		io_stream_set_error(&tstream->ostream.iostream,
202 			"iostream-temp: read(%s) failed: %s",
203 			i_stream_get_name(input),
204 			i_stream_get_error(input));
205 		break;
206 	case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT:
207 		break;
208 	}
209 	i_stream_destroy(&input);
210 	return ret;
211 }
212 
213 static bool
o_stream_temp_dup_istream(struct temp_ostream * outstream,struct istream * instream,enum ostream_send_istream_result * res_r)214 o_stream_temp_dup_istream(struct temp_ostream *outstream,
215 			  struct istream *instream,
216 			  enum ostream_send_istream_result *res_r)
217 {
218 	uoff_t in_size;
219 
220 	if (!instream->readable_fd || i_stream_get_fd(instream) == -1)
221 		return FALSE;
222 
223 	if (i_stream_get_size(instream, TRUE, &in_size) <= 0) {
224 		if (outstream->dupstream != NULL)
225 			return o_stream_temp_dup_cancel(outstream, res_r);
226 		return FALSE;
227 	}
228 	i_assert(instream->v_offset <= in_size);
229 
230 	if (outstream->dupstream == NULL) {
231 		outstream->dupstream = instream;
232 		outstream->dupstream_start_offset = instream->v_offset;
233 		i_stream_ref(outstream->dupstream);
234 	} else {
235 		if (outstream->dupstream != instream ||
236 		    outstream->dupstream_offset != instream->v_offset ||
237 		    outstream->dupstream_offset > in_size)
238 			return o_stream_temp_dup_cancel(outstream, res_r);
239 	}
240 	i_stream_seek(instream, in_size);
241 	/* we should be at EOF now. o_stream_send_istream() asserts if
242 	   eof isn't set. */
243 	instream->eof = TRUE;
244 	outstream->dupstream_offset = instream->v_offset;
245 	outstream->ostream.ostream.offset =
246 		outstream->dupstream_offset - outstream->dupstream_start_offset;
247 	*res_r = OSTREAM_SEND_ISTREAM_RESULT_FINISHED;
248 	return TRUE;
249 }
250 
251 static enum ostream_send_istream_result
o_stream_temp_send_istream(struct ostream_private * _outstream,struct istream * instream)252 o_stream_temp_send_istream(struct ostream_private *_outstream,
253 			   struct istream *instream)
254 {
255 	struct temp_ostream *outstream =
256 		container_of(_outstream, struct temp_ostream, ostream);
257 	enum ostream_send_istream_result res;
258 
259 	if ((outstream->flags & IOSTREAM_TEMP_FLAG_TRY_FD_DUP) != 0) {
260 		if (o_stream_temp_dup_istream(outstream, instream, &res))
261 			return res;
262 		outstream->flags &= ENUM_NEGATE(IOSTREAM_TEMP_FLAG_TRY_FD_DUP);
263 	}
264 	return io_stream_copy(&outstream->ostream.ostream, instream);
265 }
266 
267 static int
o_stream_temp_write_at(struct ostream_private * stream,const void * data,size_t size,uoff_t offset)268 o_stream_temp_write_at(struct ostream_private *stream,
269 		       const void *data, size_t size, uoff_t offset)
270 {
271 	struct temp_ostream *tstream =
272 		container_of(stream, struct temp_ostream, ostream);
273 
274 	if (tstream->fd == -1) {
275 		i_assert(stream->ostream.offset == tstream->buf->used);
276 		buffer_write(tstream->buf, offset, data, size);
277 		stream->ostream.offset = tstream->buf->used;
278 	} else {
279 		if (pwrite_full(tstream->fd, data, size, offset) < 0) {
280 			stream->ostream.stream_errno = errno;
281 			i_close_fd(&tstream->fd);
282 			return -1;
283 		}
284 		if (tstream->fd_size < offset + size)
285 			tstream->fd_size = offset + size;
286 	}
287 	return 0;
288 }
289 
/* ostream seek callback: only records the new offset in the stream.
   Always returns 0. */
static int o_stream_temp_seek(struct ostream_private *_stream, uoff_t offset)
{
	struct ostream *ostream = &_stream->ostream;

	ostream->offset = offset;
	return 0;
}
295 
iostream_temp_create(const char * temp_path_prefix,enum iostream_temp_flags flags)296 struct ostream *iostream_temp_create(const char *temp_path_prefix,
297 				     enum iostream_temp_flags flags)
298 {
299 	return iostream_temp_create_named(temp_path_prefix, flags, "");
300 }
301 
iostream_temp_create_named(const char * temp_path_prefix,enum iostream_temp_flags flags,const char * name)302 struct ostream *iostream_temp_create_named(const char *temp_path_prefix,
303 					   enum iostream_temp_flags flags,
304 					   const char *name)
305 {
306 	return iostream_temp_create_sized(temp_path_prefix, flags, name,
307 					  IOSTREAM_TEMP_MAX_BUF_SIZE_DEFAULT);
308 }
309 
iostream_temp_create_sized(const char * temp_path_prefix,enum iostream_temp_flags flags,const char * name,size_t max_mem_size)310 struct ostream *iostream_temp_create_sized(const char *temp_path_prefix,
311 					   enum iostream_temp_flags flags,
312 					   const char *name,
313 					   size_t max_mem_size)
314 {
315 	struct temp_ostream *tstream;
316 	struct ostream *output;
317 
318 	tstream = i_new(struct temp_ostream, 1);
319 	tstream->ostream.ostream.blocking = TRUE;
320 	tstream->ostream.sendv = o_stream_temp_sendv;
321 	tstream->ostream.send_istream = o_stream_temp_send_istream;
322 	tstream->ostream.write_at = o_stream_temp_write_at;
323 	tstream->ostream.seek = o_stream_temp_seek;
324 	tstream->ostream.iostream.close = o_stream_temp_close;
325 	tstream->temp_path_prefix = i_strdup(temp_path_prefix);
326 	tstream->flags = flags;
327 	tstream->max_mem_size = max_mem_size;
328 	tstream->buf = buffer_create_dynamic(default_pool, 8192);
329 	tstream->fd = -1;
330 
331 	output = o_stream_create(&tstream->ostream, NULL, -1);
332 	tstream->name = i_strdup(name);
333 	if (name[0] == '\0') {
334 		o_stream_set_name(output, t_strdup_printf(
335 			"(temp iostream in %s)", temp_path_prefix));
336 	} else {
337 		o_stream_set_name(output, t_strdup_printf(
338 			"(temp iostream in %s for %s)", temp_path_prefix, name));
339 	}
340 	return output;
341 }
342 
/* istream destroy callback registered by iostream_temp_finish(): frees the
   buffer that backed the returned data istream. */
static void iostream_temp_buf_destroyed(buffer_t *buf)
{
	buffer_t *owned = buf;

	buffer_free(&owned);
}
347 
iostream_temp_finish(struct ostream ** output,size_t max_buffer_size)348 struct istream *iostream_temp_finish(struct ostream **output,
349 				     size_t max_buffer_size)
350 {
351 	struct temp_ostream *tstream =
352 		container_of((*output)->real_stream, struct temp_ostream,
353 			     ostream);
354 	struct istream *input, *input2;
355 	uoff_t abs_offset, size;
356 	const char *for_path;
357 	int fd;
358 
359 	if (tstream->name[0] == '\0')
360 		for_path = "";
361 	else
362 		for_path = t_strdup_printf(" for %s", tstream->name);
363 
364 	if (tstream->dupstream != NULL && !tstream->dupstream->closed) {
365 		abs_offset = i_stream_get_absolute_offset(tstream->dupstream) -
366 			tstream->dupstream->v_offset +
367 			tstream->dupstream_start_offset;
368 		size = tstream->dupstream_offset -
369 			tstream->dupstream_start_offset;
370 		fd = dup(i_stream_get_fd(tstream->dupstream));
371 		if (fd == -1)
372 			input = i_stream_create_error_str(errno, "dup() failed: %m");
373 		else {
374 			input2 = i_stream_create_fd_autoclose(&fd, max_buffer_size);
375 			i_stream_seek(input2, abs_offset);
376 			input = i_stream_create_limit(input2, size);
377 			i_stream_unref(&input2);
378 		}
379 		i_stream_set_name(input, t_strdup_printf(
380 			"(Temp file in %s%s, from %s)", tstream->temp_path_prefix,
381 			for_path, i_stream_get_name(tstream->dupstream)));
382 		i_stream_unref(&tstream->dupstream);
383 	} else if (tstream->dupstream != NULL) {
384 		/* return the original failed stream. */
385 		input = tstream->dupstream;
386 	} else if (tstream->fd != -1) {
387 		int fd = tstream->fd;
388 		input = i_stream_create_fd_autoclose(&tstream->fd, max_buffer_size);
389 		i_stream_set_name(input, t_strdup_printf(
390 			"(Temp file fd %d in %s%s, %"PRIuUOFF_T" bytes)",
391 			fd, tstream->temp_path_prefix, for_path, tstream->fd_size));
392 	} else {
393 		input = i_stream_create_from_data(tstream->buf->data,
394 						  tstream->buf->used);
395 		i_stream_set_name(input, t_strdup_printf(
396 			"(Temp buffer in %s%s, %zu bytes)",
397 			tstream->temp_path_prefix, for_path, tstream->buf->used));
398 		i_stream_add_destroy_callback(input, iostream_temp_buf_destroyed,
399 					      tstream->buf);
400 		tstream->buf = NULL;
401 	}
402 	o_stream_destroy(output);
403 	return input;
404 }
405