1 /* Copyright (c) 2002-2018 Dovecot authors, see the included COPYING file */
2 
3 #include "lib.h"
4 #include "buffer.h"
5 #include "str.h"
6 #include "iostream-pump.h"
7 #include "istream.h"
8 #include "ostream.h"
9 #include <unistd.h>
10 
11 #undef iostream_pump_set_completion_callback
12 
13 struct iostream_pump {
14 	int refcount;
15 
16 	struct istream *input;
17 	struct ostream *output;
18 
19 	struct io *io;
20 
21 	iostream_pump_callback_t *callback;
22 	void *context;
23 
24 	bool waiting_output;
25 	bool completed;
26 };
27 
iostream_pump_copy(struct iostream_pump * pump)28 static void iostream_pump_copy(struct iostream_pump *pump)
29 {
30 	enum ostream_send_istream_result res;
31 	size_t old_size;
32 
33 	o_stream_cork(pump->output);
34 	old_size = o_stream_get_max_buffer_size(pump->output);
35 	o_stream_set_max_buffer_size(pump->output,
36 		I_MIN(IO_BLOCK_SIZE,
37 		      o_stream_get_max_buffer_size(pump->output)));
38 	res = o_stream_send_istream(pump->output, pump->input);
39 	o_stream_set_max_buffer_size(pump->output, old_size);
40 	o_stream_uncork(pump->output);
41 
42 	switch(res) {
43 	case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT:
44 		io_remove(&pump->io);
45 		pump->callback(IOSTREAM_PUMP_STATUS_INPUT_ERROR,
46 			       pump->context);
47 		return;
48 	case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT:
49 		io_remove(&pump->io);
50 		pump->callback(IOSTREAM_PUMP_STATUS_OUTPUT_ERROR,
51 			       pump->context);
52 		return;
53 	case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT:
54 		i_assert(!pump->output->blocking);
55 		pump->waiting_output = TRUE;
56 		io_remove(&pump->io);
57 		return;
58 	case OSTREAM_SEND_ISTREAM_RESULT_FINISHED:
59 		pump->waiting_output = FALSE;
60 		io_remove(&pump->io);
61 		/* flush it */
62 		switch (o_stream_flush(pump->output)) {
63 		case -1:
64 			pump->callback(IOSTREAM_PUMP_STATUS_OUTPUT_ERROR,
65 				       pump->context);
66 			break;
67 		case 0:
68 			pump->waiting_output = TRUE;
69 			pump->completed = TRUE;
70 			break;
71 		default:
72 			pump->callback(IOSTREAM_PUMP_STATUS_INPUT_EOF,
73 				       pump->context);
74 			break;
75 		}
76 		return;
77 	case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT:
78 		i_assert(!pump->input->blocking);
79 		pump->waiting_output = FALSE;
80 		return;
81 	}
82 	i_unreached();
83 }
84 
iostream_pump_flush(struct iostream_pump * pump)85 static int iostream_pump_flush(struct iostream_pump *pump)
86 {
87 	int ret;
88 
89 	if ((ret = o_stream_flush(pump->output)) <= 0) {
90 		if (ret < 0) {
91 			pump->callback(IOSTREAM_PUMP_STATUS_OUTPUT_ERROR,
92 				       pump->context);
93 		}
94 		return ret;
95 	}
96 	pump->waiting_output = FALSE;
97 	if (pump->completed) {
98 		pump->callback(IOSTREAM_PUMP_STATUS_INPUT_EOF, pump->context);
99 		return 1;
100 	}
101 
102 	if (pump->input->blocking)
103 		iostream_pump_copy(pump);
104 	else if (pump->io == NULL) {
105 		pump->io = io_add_istream(pump->input,
106 					  iostream_pump_copy, pump);
107 		io_set_pending(pump->io);
108 	}
109 	return ret;
110 }
111 
112 struct iostream_pump *
iostream_pump_create(struct istream * input,struct ostream * output)113 iostream_pump_create(struct istream *input, struct ostream *output)
114 {
115 	struct iostream_pump *pump;
116 
117 	i_assert(input != NULL &&
118 		 output != NULL);
119 	i_assert(!input->blocking || !output->blocking);
120 
121 	/* ref streams */
122 	i_stream_ref(input);
123 	o_stream_ref(output);
124 
125 	/* create pump */
126 	pump = i_new(struct iostream_pump, 1);
127 	pump->refcount = 1;
128 	pump->input = input;
129 	pump->output = output;
130 
131 	return pump;
132 }
133 
iostream_pump_start(struct iostream_pump * pump)134 void iostream_pump_start(struct iostream_pump *pump)
135 {
136 	i_assert(pump != NULL);
137 	i_assert(pump->callback != NULL);
138 
139 	/* add flush handler */
140 	if (!pump->output->blocking) {
141 		o_stream_set_flush_callback(pump->output,
142 					    iostream_pump_flush, pump);
143 	}
144 
145 	/* make IO objects */
146 	if (pump->input->blocking) {
147 		i_assert(!pump->output->blocking);
148 		o_stream_set_flush_pending(pump->output, TRUE);
149 	} else {
150 		pump->io = io_add_istream(pump->input,
151 					  iostream_pump_copy, pump);
152 		io_set_pending(pump->io);
153 	}
154 }
155 
iostream_pump_get_input(struct iostream_pump * pump)156 struct istream *iostream_pump_get_input(struct iostream_pump *pump)
157 {
158 	i_assert(pump != NULL);
159 	return pump->input;
160 }
161 
iostream_pump_get_output(struct iostream_pump * pump)162 struct ostream *iostream_pump_get_output(struct iostream_pump *pump)
163 {
164 	i_assert(pump != NULL);
165 	return pump->output;
166 }
167 
iostream_pump_set_completion_callback(struct iostream_pump * pump,iostream_pump_callback_t * callback,void * context)168 void iostream_pump_set_completion_callback(struct iostream_pump *pump,
169 					   iostream_pump_callback_t *callback,
170 					   void *context)
171 {
172 	i_assert(pump != NULL);
173 	pump->callback = callback;
174 	pump->context = context;
175 }
176 
iostream_pump_ref(struct iostream_pump * pump)177 void iostream_pump_ref(struct iostream_pump *pump)
178 {
179 	i_assert(pump != NULL);
180 	i_assert(pump->refcount > 0);
181 	pump->refcount++;
182 }
183 
iostream_pump_unref(struct iostream_pump ** _pump)184 void iostream_pump_unref(struct iostream_pump **_pump)
185 {
186 	i_assert(_pump != NULL);
187 	struct iostream_pump *pump = *_pump;
188 
189 	if (pump == NULL)
190 		return;
191 
192 	i_assert(pump->refcount > 0);
193 
194 	*_pump = NULL;
195 
196 	if (--pump->refcount > 0)
197 		return;
198 
199 	iostream_pump_stop(pump);
200 
201 	o_stream_unref(&pump->output);
202 	i_stream_unref(&pump->input);
203 	i_free(pump);
204 }
205 
iostream_pump_destroy(struct iostream_pump ** _pump)206 void iostream_pump_destroy(struct iostream_pump **_pump)
207 {
208 	i_assert(_pump != NULL);
209 	struct iostream_pump *pump = *_pump;
210 
211 	if (pump == NULL)
212 		return;
213 
214 	*_pump = NULL;
215 
216 	iostream_pump_stop(pump);
217 	o_stream_unref(&pump->output);
218 	i_stream_unref(&pump->input);
219 
220 	iostream_pump_unref(&pump);
221 }
222 
iostream_pump_stop(struct iostream_pump * pump)223 void iostream_pump_stop(struct iostream_pump *pump)
224 {
225 	i_assert(pump != NULL);
226 
227 	if (pump->output != NULL)
228 		o_stream_unset_flush_callback(pump->output);
229 
230 	io_remove(&pump->io);
231 }
232 
iostream_pump_is_waiting_output(struct iostream_pump * pump)233 bool iostream_pump_is_waiting_output(struct iostream_pump *pump)
234 {
235 	return pump->waiting_output;
236 }
237 
iostream_pump_switch_ioloop_to(struct iostream_pump * pump,struct ioloop * ioloop)238 void iostream_pump_switch_ioloop_to(struct iostream_pump *pump,
239 				    struct ioloop *ioloop)
240 {
241 	i_assert(pump != NULL);
242 	if (pump->io != NULL)
243 		pump->io = io_loop_move_io_to(ioloop, &pump->io);
244 	o_stream_switch_ioloop_to(pump->output, ioloop);
245 	i_stream_switch_ioloop_to(pump->input, ioloop);
246 }
247 
iostream_pump_switch_ioloop(struct iostream_pump * pump)248 void iostream_pump_switch_ioloop(struct iostream_pump *pump)
249 {
250 	iostream_pump_switch_ioloop_to(pump, current_ioloop);
251 }
252