1 /* Copyright (c) 2002-2018 Dovecot authors, see the included COPYING file */
2
3 #include "lib.h"
4 #include "buffer.h"
5 #include "str.h"
6 #include "iostream-pump.h"
7 #include "istream.h"
8 #include "ostream.h"
9 #include <unistd.h>
10
11 #undef iostream_pump_set_completion_callback
12
13 struct iostream_pump {
14 int refcount;
15
16 struct istream *input;
17 struct ostream *output;
18
19 struct io *io;
20
21 iostream_pump_callback_t *callback;
22 void *context;
23
24 bool waiting_output;
25 bool completed;
26 };
27
iostream_pump_copy(struct iostream_pump * pump)28 static void iostream_pump_copy(struct iostream_pump *pump)
29 {
30 enum ostream_send_istream_result res;
31 size_t old_size;
32
33 o_stream_cork(pump->output);
34 old_size = o_stream_get_max_buffer_size(pump->output);
35 o_stream_set_max_buffer_size(pump->output,
36 I_MIN(IO_BLOCK_SIZE,
37 o_stream_get_max_buffer_size(pump->output)));
38 res = o_stream_send_istream(pump->output, pump->input);
39 o_stream_set_max_buffer_size(pump->output, old_size);
40 o_stream_uncork(pump->output);
41
42 switch(res) {
43 case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT:
44 io_remove(&pump->io);
45 pump->callback(IOSTREAM_PUMP_STATUS_INPUT_ERROR,
46 pump->context);
47 return;
48 case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT:
49 io_remove(&pump->io);
50 pump->callback(IOSTREAM_PUMP_STATUS_OUTPUT_ERROR,
51 pump->context);
52 return;
53 case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT:
54 i_assert(!pump->output->blocking);
55 pump->waiting_output = TRUE;
56 io_remove(&pump->io);
57 return;
58 case OSTREAM_SEND_ISTREAM_RESULT_FINISHED:
59 pump->waiting_output = FALSE;
60 io_remove(&pump->io);
61 /* flush it */
62 switch (o_stream_flush(pump->output)) {
63 case -1:
64 pump->callback(IOSTREAM_PUMP_STATUS_OUTPUT_ERROR,
65 pump->context);
66 break;
67 case 0:
68 pump->waiting_output = TRUE;
69 pump->completed = TRUE;
70 break;
71 default:
72 pump->callback(IOSTREAM_PUMP_STATUS_INPUT_EOF,
73 pump->context);
74 break;
75 }
76 return;
77 case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT:
78 i_assert(!pump->input->blocking);
79 pump->waiting_output = FALSE;
80 return;
81 }
82 i_unreached();
83 }
84
iostream_pump_flush(struct iostream_pump * pump)85 static int iostream_pump_flush(struct iostream_pump *pump)
86 {
87 int ret;
88
89 if ((ret = o_stream_flush(pump->output)) <= 0) {
90 if (ret < 0) {
91 pump->callback(IOSTREAM_PUMP_STATUS_OUTPUT_ERROR,
92 pump->context);
93 }
94 return ret;
95 }
96 pump->waiting_output = FALSE;
97 if (pump->completed) {
98 pump->callback(IOSTREAM_PUMP_STATUS_INPUT_EOF, pump->context);
99 return 1;
100 }
101
102 if (pump->input->blocking)
103 iostream_pump_copy(pump);
104 else if (pump->io == NULL) {
105 pump->io = io_add_istream(pump->input,
106 iostream_pump_copy, pump);
107 io_set_pending(pump->io);
108 }
109 return ret;
110 }
111
112 struct iostream_pump *
iostream_pump_create(struct istream * input,struct ostream * output)113 iostream_pump_create(struct istream *input, struct ostream *output)
114 {
115 struct iostream_pump *pump;
116
117 i_assert(input != NULL &&
118 output != NULL);
119 i_assert(!input->blocking || !output->blocking);
120
121 /* ref streams */
122 i_stream_ref(input);
123 o_stream_ref(output);
124
125 /* create pump */
126 pump = i_new(struct iostream_pump, 1);
127 pump->refcount = 1;
128 pump->input = input;
129 pump->output = output;
130
131 return pump;
132 }
133
iostream_pump_start(struct iostream_pump * pump)134 void iostream_pump_start(struct iostream_pump *pump)
135 {
136 i_assert(pump != NULL);
137 i_assert(pump->callback != NULL);
138
139 /* add flush handler */
140 if (!pump->output->blocking) {
141 o_stream_set_flush_callback(pump->output,
142 iostream_pump_flush, pump);
143 }
144
145 /* make IO objects */
146 if (pump->input->blocking) {
147 i_assert(!pump->output->blocking);
148 o_stream_set_flush_pending(pump->output, TRUE);
149 } else {
150 pump->io = io_add_istream(pump->input,
151 iostream_pump_copy, pump);
152 io_set_pending(pump->io);
153 }
154 }
155
iostream_pump_get_input(struct iostream_pump * pump)156 struct istream *iostream_pump_get_input(struct iostream_pump *pump)
157 {
158 i_assert(pump != NULL);
159 return pump->input;
160 }
161
iostream_pump_get_output(struct iostream_pump * pump)162 struct ostream *iostream_pump_get_output(struct iostream_pump *pump)
163 {
164 i_assert(pump != NULL);
165 return pump->output;
166 }
167
iostream_pump_set_completion_callback(struct iostream_pump * pump,iostream_pump_callback_t * callback,void * context)168 void iostream_pump_set_completion_callback(struct iostream_pump *pump,
169 iostream_pump_callback_t *callback,
170 void *context)
171 {
172 i_assert(pump != NULL);
173 pump->callback = callback;
174 pump->context = context;
175 }
176
iostream_pump_ref(struct iostream_pump * pump)177 void iostream_pump_ref(struct iostream_pump *pump)
178 {
179 i_assert(pump != NULL);
180 i_assert(pump->refcount > 0);
181 pump->refcount++;
182 }
183
iostream_pump_unref(struct iostream_pump ** _pump)184 void iostream_pump_unref(struct iostream_pump **_pump)
185 {
186 i_assert(_pump != NULL);
187 struct iostream_pump *pump = *_pump;
188
189 if (pump == NULL)
190 return;
191
192 i_assert(pump->refcount > 0);
193
194 *_pump = NULL;
195
196 if (--pump->refcount > 0)
197 return;
198
199 iostream_pump_stop(pump);
200
201 o_stream_unref(&pump->output);
202 i_stream_unref(&pump->input);
203 i_free(pump);
204 }
205
iostream_pump_destroy(struct iostream_pump ** _pump)206 void iostream_pump_destroy(struct iostream_pump **_pump)
207 {
208 i_assert(_pump != NULL);
209 struct iostream_pump *pump = *_pump;
210
211 if (pump == NULL)
212 return;
213
214 *_pump = NULL;
215
216 iostream_pump_stop(pump);
217 o_stream_unref(&pump->output);
218 i_stream_unref(&pump->input);
219
220 iostream_pump_unref(&pump);
221 }
222
iostream_pump_stop(struct iostream_pump * pump)223 void iostream_pump_stop(struct iostream_pump *pump)
224 {
225 i_assert(pump != NULL);
226
227 if (pump->output != NULL)
228 o_stream_unset_flush_callback(pump->output);
229
230 io_remove(&pump->io);
231 }
232
iostream_pump_is_waiting_output(struct iostream_pump * pump)233 bool iostream_pump_is_waiting_output(struct iostream_pump *pump)
234 {
235 return pump->waiting_output;
236 }
237
iostream_pump_switch_ioloop_to(struct iostream_pump * pump,struct ioloop * ioloop)238 void iostream_pump_switch_ioloop_to(struct iostream_pump *pump,
239 struct ioloop *ioloop)
240 {
241 i_assert(pump != NULL);
242 if (pump->io != NULL)
243 pump->io = io_loop_move_io_to(ioloop, &pump->io);
244 o_stream_switch_ioloop_to(pump->output, ioloop);
245 i_stream_switch_ioloop_to(pump->input, ioloop);
246 }
247
iostream_pump_switch_ioloop(struct iostream_pump * pump)248 void iostream_pump_switch_ioloop(struct iostream_pump *pump)
249 {
250 iostream_pump_switch_ioloop_to(pump, current_ioloop);
251 }
252