1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3 /* Monkey HTTP Server
4 * ==================
5 * Copyright 2001-2017 Eduardo Silva <eduardo@monkey.io>
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 */
19
20 #include <monkey/monkey.h>
21 #include <monkey/mk_stream.h>
22 #include <assert.h>
23
24 /* Create a new channel */
mk_channel_new(int type,int fd)25 struct mk_channel *mk_channel_new(int type, int fd)
26 {
27 struct mk_channel *channel;
28
29 channel = mk_mem_alloc(sizeof(struct mk_channel));
30 channel->type = type;
31 channel->fd = fd;
32 channel->status = MK_CHANNEL_OK;
33 mk_list_init(&channel->streams);
34
35 return channel;
36 }
37
channel_write_in_file(struct mk_channel * channel,struct mk_stream_input * in)38 static inline size_t channel_write_in_file(struct mk_channel *channel,
39 struct mk_stream_input *in)
40 {
41 ssize_t bytes = 0;
42
43 MK_TRACE("[CH %i] STREAM_FILE [fd=%i], bytes=%lu",
44 channel->fd, in->fd, in->bytes_total);
45
46 /* Direct write */
47 bytes = mk_sched_conn_sendfile(channel,
48 in->fd,
49 &in->bytes_offset,
50 in->bytes_total
51 );
52 MK_TRACE("[CH=%d] [FD=%i] WRITE STREAM FILE: %lu bytes",
53 channel->fd, in->fd, bytes);
54
55 return bytes;
56 }
57
mk_stream_size(struct mk_stream * stream)58 size_t mk_stream_size(struct mk_stream *stream)
59 {
60 return (stream->bytes_total - stream->bytes_offset);
61 }
62
63 /*
64 * It 'intent' to write a few streams over the channel and alter the
65 * channel notification side if required: READ -> WRITE.
66 */
mk_channel_flush(struct mk_channel * channel)67 int mk_channel_flush(struct mk_channel *channel)
68 {
69 int ret = 0;
70 size_t count = 0;
71 size_t total = 0;
72 uint32_t stop = (MK_CHANNEL_DONE | MK_CHANNEL_ERROR | MK_CHANNEL_EMPTY);
73
74 do {
75 ret = mk_channel_write(channel, &count);
76 total += count;
77
78 #ifdef MK_HAVE_TRACE
79 MK_TRACE("Channel flush: %d bytes", count);
80 if (ret & MK_CHANNEL_DONE) {
81 MK_TRACE("Channel was empty");
82 }
83 if (ret & MK_CHANNEL_ERROR) {
84 MK_TRACE("Channel error");
85 }
86 if (ret & MK_CHANNEL_EMPTY) {
87 MK_TRACE("Channel empty");
88 }
89 #endif
90 } while (total <= 4096 && ((ret & stop) == 0));
91
92 if (ret == MK_CHANNEL_DONE) {
93 MK_TRACE("Channel done");
94 return ret;
95 }
96 else if (ret & (MK_CHANNEL_FLUSH | MK_CHANNEL_BUSY)) {
97 MK_TRACE("Channel FLUSH | BUSY");
98 if ((channel->event->mask & MK_EVENT_WRITE) == 0) {
99 mk_event_add(mk_sched_loop(),
100 channel->fd,
101 MK_EVENT_CONNECTION,
102 MK_EVENT_WRITE,
103 channel->event);
104 }
105 }
106
107 return ret;
108 }
109
mk_stream_in_release(struct mk_stream_input * in)110 int mk_stream_in_release(struct mk_stream_input *in)
111 {
112 if (in->cb_finished) {
113 in->cb_finished(in);
114 }
115
116 mk_stream_input_unlink(in);
117 if (in->dynamic == MK_TRUE) {
118 mk_mem_free(in);
119 }
120
121 return 0;
122 }
123
mk_channel_stream_write(struct mk_stream * stream,size_t * count)124 int mk_channel_stream_write(struct mk_stream *stream, size_t *count)
125 {
126 ssize_t bytes = 0;
127 struct mk_iov *iov;
128 struct mk_list *tmp;
129 struct mk_list *head;
130 struct mk_channel *channel;
131 struct mk_stream_input *input;
132
133 channel = stream->channel;
134
135 /* Validate channel status */
136 if (channel->status != MK_CHANNEL_OK) {
137 return -MK_CHANNEL_ERROR;
138 }
139
140 /* Iterate inputs and process stream */
141 mk_list_foreach_safe(head, tmp, &stream->inputs) {
142 input = mk_list_entry(head, struct mk_stream_input, _head);
143 if (input->type == MK_STREAM_FILE) {
144 bytes = channel_write_in_file(channel, input);
145 }
146 else if (input->type == MK_STREAM_IOV) {
147 iov = input->buffer;
148 if (!iov) {
149 return MK_CHANNEL_EMPTY;
150 }
151
152 bytes = mk_sched_conn_writev(channel, iov);
153
154 MK_TRACE("[CH %i] STREAM_IOV, wrote %d bytes",
155 channel->fd, bytes);
156 if (bytes > 0) {
157 /* Perform the adjustment on mk_iov */
158 mk_iov_consume(iov, bytes);
159 }
160 }
161 else if (input->type == MK_STREAM_RAW) {
162 bytes = mk_sched_conn_write(channel,
163 input->buffer, input->bytes_total);
164 MK_TRACE("[CH %i] STREAM_RAW, bytes=%lu/%lu\n",
165 channel->fd, bytes, input->bytes_total);
166 }
167
168 if (bytes > 0) {
169 *count = bytes;
170 mk_stream_input_consume(input, bytes);
171
172 /* notification callback, optional */
173 if (stream->cb_bytes_consumed) {
174 stream->cb_bytes_consumed(stream, bytes);
175 }
176
177 if (input->cb_consumed) {
178 input->cb_consumed(input, bytes);
179 }
180
181 if (input->bytes_total == 0) {
182 MK_TRACE("Input done, unlinking (channel=%p)", channel);
183 mk_stream_in_release(input);
184 }
185 MK_TRACE("[CH %i] CHANNEL_FLUSH", channel->fd);
186 }
187 else if (bytes < 0) {
188 mk_stream_in_release(input);
189 return -MK_CHANNEL_ERROR;
190 }
191 else if (bytes == 0) {
192 mk_stream_in_release(input);
193 return -MK_CHANNEL_ERROR;
194 }
195 }
196
197 return bytes;
198 }
199
200 /* It perform a direct stream I/O write through the network layer */
mk_channel_write(struct mk_channel * channel,size_t * count)201 int mk_channel_write(struct mk_channel *channel, size_t *count)
202 {
203 ssize_t bytes = -1;
204 struct mk_iov *iov;
205 struct mk_stream *stream = NULL;
206 struct mk_stream_input *input;
207
208 errno = 0;
209
210 if (mk_list_is_empty(&channel->streams) == 0) {
211 MK_TRACE("[CH %i] CHANNEL_EMPTY", channel->fd);
212 return MK_CHANNEL_EMPTY;
213 }
214
215 /* Get the input source */
216 stream = mk_list_entry_first(&channel->streams, struct mk_stream, _head);
217 if (mk_list_is_empty(&stream->inputs) == 0) {
218 return MK_CHANNEL_EMPTY;
219 }
220 input = mk_list_entry_first(&stream->inputs, struct mk_stream_input, _head);
221
222 /*
223 * Based on the Stream Input type we consume on that way, not all inputs
224 * requires to read from buffer, e.g: Static File, Pipes.
225 */
226 if (channel->type == MK_CHANNEL_SOCKET) {
227 if (input->type == MK_STREAM_FILE) {
228 bytes = channel_write_in_file(channel, input);
229 }
230 else if (input->type == MK_STREAM_IOV) {
231 iov = input->buffer;
232 if (!iov) {
233 return MK_CHANNEL_EMPTY;
234 }
235
236 bytes = mk_sched_conn_writev(channel, iov);
237
238 MK_TRACE("[CH %i] STREAM_IOV, wrote %d bytes",
239 channel->fd, bytes);
240 if (bytes > 0) {
241 /* Perform the adjustment on mk_iov */
242 mk_iov_consume(iov, bytes);
243 }
244 }
245 else if (input->type == MK_STREAM_RAW) {
246 bytes = mk_sched_conn_write(channel,
247 input->buffer, input->bytes_total);
248 MK_TRACE("[CH %i] STREAM_RAW, bytes=%lu/%lu",
249 channel->fd, bytes, input->bytes_total);
250 if (bytes > 0) {
251 /* DEPRECATED: consume_raw(input, bytes); */
252 }
253 }
254
255 if (bytes > 0) {
256 *count = bytes;
257 mk_stream_input_consume(input, bytes);
258
259 /* notification callback, optional */
260 if (stream->cb_bytes_consumed) {
261 stream->cb_bytes_consumed(stream, bytes);
262 }
263
264 if (input->cb_consumed) {
265 input->cb_consumed(input, bytes);
266 }
267
268 if (input->bytes_total == 0) {
269 MK_TRACE("Input done, unlinking (channel=%p)", channel);
270 mk_stream_in_release(input);
271 }
272
273 if (mk_list_is_empty(&stream->inputs) == 0) {
274 /* Everytime the stream is empty, we notify the trigger the cb */
275 if (stream->cb_finished) {
276 stream->cb_finished(stream);
277 }
278
279 if (mk_channel_is_empty(channel) == 0) {
280 MK_TRACE("[CH %i] CHANNEL_DONE", channel->fd);
281 return MK_CHANNEL_DONE;
282 }
283 else {
284 MK_TRACE("[CH %i] CHANNEL_FLUSH", channel->fd);
285 return MK_CHANNEL_FLUSH;
286 }
287 }
288
289 MK_TRACE("[CH %i] CHANNEL_FLUSH", channel->fd);
290 return MK_CHANNEL_FLUSH;
291 }
292 else if (bytes < 0) {
293 if (errno == EAGAIN) {
294 return MK_CHANNEL_BUSY;
295 }
296
297 mk_stream_in_release(input);
298 return MK_CHANNEL_ERROR;
299 }
300 else if (bytes == 0) {
301 mk_stream_in_release(input);
302 return MK_CHANNEL_ERROR;
303 }
304 }
305
306 return MK_CHANNEL_ERROR;
307 }
308
309 /* Remove any dynamic memory associated */
mk_channel_clean(struct mk_channel * channel)310 int mk_channel_clean(struct mk_channel *channel)
311 {
312 struct mk_list *tmp;
313 struct mk_list *tmp_in;
314 struct mk_list *head;
315 struct mk_list *head_in;
316 struct mk_stream *stream;
317 struct mk_stream_input *in;
318
319 mk_list_foreach_safe(head, tmp, &channel->streams) {
320 stream = mk_list_entry(head, struct mk_stream, _head);
321 mk_list_foreach_safe(head_in, tmp_in, &stream->inputs) {
322 in = mk_list_entry(head_in, struct mk_stream_input, _head);
323 mk_stream_in_release(in);
324 }
325 mk_stream_release(stream);
326 }
327
328 return 0;
329 }
330