1 /* Copyright (c) 2014-2018 Dovecot authors, see the included COPYING file */
2 
3 #include "lib.h"
4 #include "buffer.h"
5 #include "istream-private.h"
6 #include "istream-callback.h"
7 
/* Input stream whose data is supplied by a caller-provided callback and/or
   appended directly to an internal buffer. */
struct callback_istream {
	/* Base stream state. container_of() is used on this member (and on
	   istream.iostream) to get back to the callback_istream. */
	struct istream_private istream;
	/* Called by the read handler to add more data to buf. Reset to NULL
	   after it has returned false (EOF / error), so it's never called
	   again. */
	istream_callback_read_t *callback;
	/* Opaque pointer handed to callback together with buf. */
	void *context;

	/* Data of the stream. The stream's buffer pointer is set to this
	   buffer's data by the read handler. */
	buffer_t *buf;
	/* Value of buf->used when the read handler last returned data,
	   reduced whenever already-skipped bytes are deleted from buf.
	   Anything in buf beyond this offset hasn't been returned yet. */
	size_t prev_pos;
};
16 
i_stream_callback_destroy(struct iostream_private * stream)17 static void i_stream_callback_destroy(struct iostream_private *stream)
18 {
19 	struct callback_istream *cstream =
20 		container_of(stream, struct callback_istream, istream.iostream);
21 
22 	buffer_free(&cstream->buf);
23 }
24 
i_stream_callback_read(struct istream_private * stream)25 static ssize_t i_stream_callback_read(struct istream_private *stream)
26 {
27 	struct callback_istream *cstream =
28 		container_of(stream, struct callback_istream, istream);
29 	size_t pos;
30 
31 	if (cstream->callback == NULL) {
32 		/* already returned EOF / error */
33 		stream->istream.eof = TRUE;
34 		return -1;
35 	}
36 
37 	if (stream->skip > 0) {
38 		buffer_delete(cstream->buf, 0, stream->skip);
39 		stream->pos -= stream->skip;
40 		cstream->prev_pos -= stream->skip;
41 		stream->skip = 0;
42 	}
43 	i_assert(cstream->buf->used >= cstream->prev_pos);
44 	pos = cstream->prev_pos;
45 	if (cstream->buf->used > pos) {
46 		/* data was added outside the callback */
47 	} else if (!cstream->callback(cstream->buf, cstream->context)) {
48 		/* EOF / error */
49 		stream->istream.eof = TRUE;
50 		cstream->callback = NULL;
51 		if (cstream->buf->used == pos ||
52 		    stream->istream.stream_errno != 0)
53 			return -1;
54 		/* EOF was returned with some data still added to the buffer.
55 		   return the buffer first and EOF only on the next call. */
56 	} else if (cstream->buf->used == pos) {
57 		/* buffer full */
58 		i_assert(cstream->buf->used > 0);
59 		return -2;
60 	}
61 	i_assert(cstream->buf->used > pos);
62 	stream->buffer = cstream->buf->data;
63 	cstream->prev_pos = stream->pos = cstream->buf->used;
64 	return cstream->buf->used - pos;
65 }
66 
67 #undef i_stream_create_callback
68 struct istream *
i_stream_create_callback(istream_callback_read_t * callback,void * context)69 i_stream_create_callback(istream_callback_read_t *callback, void *context)
70 {
71 	struct callback_istream *cstream;
72 	struct istream *istream;
73 
74 	i_assert(callback != NULL);
75 
76 	cstream = i_new(struct callback_istream, 1);
77 	cstream->callback = callback;
78 	cstream->context = context;
79 	cstream->buf = buffer_create_dynamic(default_pool, 1024);
80 
81 	cstream->istream.iostream.destroy = i_stream_callback_destroy;
82 	cstream->istream.read = i_stream_callback_read;
83 
84 	istream = i_stream_create(&cstream->istream, NULL, -1, 0);
85 	istream->blocking = TRUE;
86 	return istream;
87 }
88 
i_stream_callback_append(struct istream * input,const void * data,size_t size)89 void i_stream_callback_append(struct istream *input,
90 			      const void *data, size_t size)
91 {
92 	struct callback_istream *cstream =
93 		container_of(input->real_stream,
94 			     struct callback_istream, istream);
95 
96 	buffer_append(cstream->buf, data, size);
97 }
98 
/* Appends the NUL-terminated string str (without the terminating NUL) to
   the callback stream's buffer. */
void i_stream_callback_append_str(struct istream *input, const char *str)
{
	size_t len = strlen(str);

	i_stream_callback_append(input, str, len);
}
103 
i_stream_callback_get_buffer(struct istream * input)104 buffer_t *i_stream_callback_get_buffer(struct istream *input)
105 {
106 	struct callback_istream *cstream =
107 		container_of(input->real_stream,
108 			     struct callback_istream, istream);
109 
110 	return cstream->buf;
111 }
112 
i_stream_callback_set_error(struct istream * input,int stream_errno,const char * error)113 void i_stream_callback_set_error(struct istream *input, int stream_errno,
114 				 const char *error)
115 {
116 	input->stream_errno = stream_errno;
117 	io_stream_set_error(&input->real_stream->iostream, "%s", error);
118 }
119