1 /* Copyright (c) 2014-2018 Dovecot authors, see the included COPYING file */
2
3 #include "lib.h"
4 #include "buffer.h"
5 #include "istream-private.h"
6 #include "istream-callback.h"
7
8 struct callback_istream {
9 struct istream_private istream;
10 istream_callback_read_t *callback;
11 void *context;
12
13 buffer_t *buf;
14 size_t prev_pos;
15 };
16
i_stream_callback_destroy(struct iostream_private * stream)17 static void i_stream_callback_destroy(struct iostream_private *stream)
18 {
19 struct callback_istream *cstream =
20 container_of(stream, struct callback_istream, istream.iostream);
21
22 buffer_free(&cstream->buf);
23 }
24
i_stream_callback_read(struct istream_private * stream)25 static ssize_t i_stream_callback_read(struct istream_private *stream)
26 {
27 struct callback_istream *cstream =
28 container_of(stream, struct callback_istream, istream);
29 size_t pos;
30
31 if (cstream->callback == NULL) {
32 /* already returned EOF / error */
33 stream->istream.eof = TRUE;
34 return -1;
35 }
36
37 if (stream->skip > 0) {
38 buffer_delete(cstream->buf, 0, stream->skip);
39 stream->pos -= stream->skip;
40 cstream->prev_pos -= stream->skip;
41 stream->skip = 0;
42 }
43 i_assert(cstream->buf->used >= cstream->prev_pos);
44 pos = cstream->prev_pos;
45 if (cstream->buf->used > pos) {
46 /* data was added outside the callback */
47 } else if (!cstream->callback(cstream->buf, cstream->context)) {
48 /* EOF / error */
49 stream->istream.eof = TRUE;
50 cstream->callback = NULL;
51 if (cstream->buf->used == pos ||
52 stream->istream.stream_errno != 0)
53 return -1;
54 /* EOF was returned with some data still added to the buffer.
55 return the buffer first and EOF only on the next call. */
56 } else if (cstream->buf->used == pos) {
57 /* buffer full */
58 i_assert(cstream->buf->used > 0);
59 return -2;
60 }
61 i_assert(cstream->buf->used > pos);
62 stream->buffer = cstream->buf->data;
63 cstream->prev_pos = stream->pos = cstream->buf->used;
64 return cstream->buf->used - pos;
65 }
66
67 #undef i_stream_create_callback
68 struct istream *
i_stream_create_callback(istream_callback_read_t * callback,void * context)69 i_stream_create_callback(istream_callback_read_t *callback, void *context)
70 {
71 struct callback_istream *cstream;
72 struct istream *istream;
73
74 i_assert(callback != NULL);
75
76 cstream = i_new(struct callback_istream, 1);
77 cstream->callback = callback;
78 cstream->context = context;
79 cstream->buf = buffer_create_dynamic(default_pool, 1024);
80
81 cstream->istream.iostream.destroy = i_stream_callback_destroy;
82 cstream->istream.read = i_stream_callback_read;
83
84 istream = i_stream_create(&cstream->istream, NULL, -1, 0);
85 istream->blocking = TRUE;
86 return istream;
87 }
88
i_stream_callback_append(struct istream * input,const void * data,size_t size)89 void i_stream_callback_append(struct istream *input,
90 const void *data, size_t size)
91 {
92 struct callback_istream *cstream =
93 container_of(input->real_stream,
94 struct callback_istream, istream);
95
96 buffer_append(cstream->buf, data, size);
97 }
98
i_stream_callback_append_str(struct istream * input,const char * str)99 void i_stream_callback_append_str(struct istream *input, const char *str)
100 {
101 i_stream_callback_append(input, str, strlen(str));
102 }
103
i_stream_callback_get_buffer(struct istream * input)104 buffer_t *i_stream_callback_get_buffer(struct istream *input)
105 {
106 struct callback_istream *cstream =
107 container_of(input->real_stream,
108 struct callback_istream, istream);
109
110 return cstream->buf;
111 }
112
i_stream_callback_set_error(struct istream * input,int stream_errno,const char * error)113 void i_stream_callback_set_error(struct istream *input, int stream_errno,
114 const char *error)
115 {
116 input->stream_errno = stream_errno;
117 io_stream_set_error(&input->real_stream->iostream, "%s", error);
118 }
119