1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 
3 /*  Monkey HTTP Server
4  *  ==================
5  *  Copyright 2001-2017 Eduardo Silva <eduardo@monkey.io>
6  *
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  *  Unless required by applicable law or agreed to in writing, software
14  *  distributed under the License is distributed on an "AS IS" BASIS,
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *  See the License for the specific language governing permissions and
17  *  limitations under the License.
18  */
19 
20 #include <monkey/monkey.h>
21 #include <monkey/mk_stream.h>
22 #include <assert.h>
23 
24 /* Create a new channel */
mk_channel_new(int type,int fd)25 struct mk_channel *mk_channel_new(int type, int fd)
26 {
27     struct mk_channel *channel;
28 
29     channel = mk_mem_alloc(sizeof(struct mk_channel));
30     channel->type   = type;
31     channel->fd     = fd;
32     channel->status = MK_CHANNEL_OK;
33     mk_list_init(&channel->streams);
34 
35     return channel;
36 }
37 
channel_write_in_file(struct mk_channel * channel,struct mk_stream_input * in)38 static inline size_t channel_write_in_file(struct mk_channel *channel,
39                                            struct mk_stream_input *in)
40 {
41     ssize_t bytes = 0;
42 
43     MK_TRACE("[CH %i] STREAM_FILE [fd=%i], bytes=%lu",
44              channel->fd, in->fd, in->bytes_total);
45 
46     /* Direct write */
47     bytes = mk_sched_conn_sendfile(channel,
48                                    in->fd,
49                                    &in->bytes_offset,
50                                    in->bytes_total
51                                    );
52     MK_TRACE("[CH=%d] [FD=%i] WRITE STREAM FILE: %lu bytes",
53              channel->fd, in->fd, bytes);
54 
55     return bytes;
56 }
57 
mk_stream_size(struct mk_stream * stream)58 size_t mk_stream_size(struct mk_stream *stream)
59 {
60     return (stream->bytes_total - stream->bytes_offset);
61 }
62 
63 /*
64  * It 'intent' to write a few streams over the channel and alter the
65  * channel notification side if required: READ -> WRITE.
66  */
mk_channel_flush(struct mk_channel * channel)67 int mk_channel_flush(struct mk_channel *channel)
68 {
69     int ret = 0;
70     size_t count = 0;
71     size_t total = 0;
72     uint32_t stop = (MK_CHANNEL_DONE | MK_CHANNEL_ERROR | MK_CHANNEL_EMPTY);
73 
74     do {
75         ret = mk_channel_write(channel, &count);
76         total += count;
77 
78 #ifdef MK_HAVE_TRACE
79         MK_TRACE("Channel flush: %d bytes", count);
80         if (ret & MK_CHANNEL_DONE) {
81             MK_TRACE("Channel was empty");
82         }
83         if (ret & MK_CHANNEL_ERROR) {
84             MK_TRACE("Channel error");
85         }
86         if (ret & MK_CHANNEL_EMPTY) {
87             MK_TRACE("Channel empty");
88         }
89 #endif
90     } while (total <= 4096 && ((ret & stop) == 0));
91 
92     if (ret == MK_CHANNEL_DONE) {
93         MK_TRACE("Channel done");
94         return ret;
95     }
96     else if (ret & (MK_CHANNEL_FLUSH | MK_CHANNEL_BUSY)) {
97         MK_TRACE("Channel FLUSH | BUSY");
98         if ((channel->event->mask & MK_EVENT_WRITE) == 0) {
99             mk_event_add(mk_sched_loop(),
100                          channel->fd,
101                          MK_EVENT_CONNECTION,
102                          MK_EVENT_WRITE,
103                          channel->event);
104         }
105     }
106 
107     return ret;
108 }
109 
mk_stream_in_release(struct mk_stream_input * in)110 int mk_stream_in_release(struct mk_stream_input *in)
111 {
112     if (in->cb_finished) {
113         in->cb_finished(in);
114     }
115 
116     mk_stream_input_unlink(in);
117     if (in->dynamic == MK_TRUE) {
118         mk_mem_free(in);
119     }
120 
121     return 0;
122 }
123 
mk_channel_stream_write(struct mk_stream * stream,size_t * count)124 int mk_channel_stream_write(struct mk_stream *stream, size_t *count)
125 {
126     ssize_t bytes = 0;
127     struct mk_iov *iov;
128     struct mk_list *tmp;
129     struct mk_list *head;
130     struct mk_channel *channel;
131     struct mk_stream_input *input;
132 
133     channel = stream->channel;
134 
135     /* Validate channel status */
136     if (channel->status != MK_CHANNEL_OK) {
137         return -MK_CHANNEL_ERROR;
138     }
139 
140     /* Iterate inputs and process stream */
141     mk_list_foreach_safe(head, tmp, &stream->inputs) {
142         input = mk_list_entry(head, struct mk_stream_input, _head);
143         if (input->type == MK_STREAM_FILE) {
144             bytes = channel_write_in_file(channel, input);
145         }
146         else if (input->type == MK_STREAM_IOV) {
147             iov = input->buffer;
148             if (!iov) {
149                 return MK_CHANNEL_EMPTY;
150             }
151 
152             bytes = mk_sched_conn_writev(channel, iov);
153 
154             MK_TRACE("[CH %i] STREAM_IOV, wrote %d bytes",
155                      channel->fd, bytes);
156             if (bytes > 0) {
157                 /* Perform the adjustment on mk_iov */
158                 mk_iov_consume(iov, bytes);
159             }
160         }
161         else if (input->type == MK_STREAM_RAW) {
162             bytes = mk_sched_conn_write(channel,
163                                         input->buffer, input->bytes_total);
164             MK_TRACE("[CH %i] STREAM_RAW, bytes=%lu/%lu\n",
165                      channel->fd, bytes, input->bytes_total);
166         }
167 
168         if (bytes > 0) {
169             *count = bytes;
170             mk_stream_input_consume(input, bytes);
171 
172             /* notification callback, optional */
173             if (stream->cb_bytes_consumed) {
174                 stream->cb_bytes_consumed(stream, bytes);
175             }
176 
177             if (input->cb_consumed) {
178                 input->cb_consumed(input, bytes);
179             }
180 
181             if (input->bytes_total == 0) {
182                 MK_TRACE("Input done, unlinking (channel=%p)", channel);
183                 mk_stream_in_release(input);
184             }
185             MK_TRACE("[CH %i] CHANNEL_FLUSH", channel->fd);
186         }
187         else if (bytes < 0) {
188             mk_stream_in_release(input);
189             return -MK_CHANNEL_ERROR;
190         }
191         else if (bytes == 0) {
192             mk_stream_in_release(input);
193             return -MK_CHANNEL_ERROR;
194         }
195     }
196 
197     return bytes;
198 }
199 
200 /* It perform a direct stream I/O write through the network layer */
mk_channel_write(struct mk_channel * channel,size_t * count)201 int mk_channel_write(struct mk_channel *channel, size_t *count)
202 {
203     ssize_t bytes = -1;
204     struct mk_iov *iov;
205     struct mk_stream *stream = NULL;
206     struct mk_stream_input *input;
207 
208     errno = 0;
209 
210     if (mk_list_is_empty(&channel->streams) == 0) {
211         MK_TRACE("[CH %i] CHANNEL_EMPTY", channel->fd);
212         return MK_CHANNEL_EMPTY;
213     }
214 
215     /* Get the input source */
216     stream = mk_list_entry_first(&channel->streams, struct mk_stream, _head);
217     if (mk_list_is_empty(&stream->inputs) == 0) {
218         return MK_CHANNEL_EMPTY;
219     }
220     input = mk_list_entry_first(&stream->inputs, struct mk_stream_input, _head);
221 
222     /*
223      * Based on the Stream Input type we consume on that way, not all inputs
224      * requires to read from buffer, e.g: Static File, Pipes.
225      */
226     if (channel->type == MK_CHANNEL_SOCKET) {
227         if (input->type == MK_STREAM_FILE) {
228             bytes = channel_write_in_file(channel, input);
229         }
230         else if (input->type == MK_STREAM_IOV) {
231             iov   = input->buffer;
232             if (!iov) {
233                 return MK_CHANNEL_EMPTY;
234             }
235 
236             bytes = mk_sched_conn_writev(channel, iov);
237 
238             MK_TRACE("[CH %i] STREAM_IOV, wrote %d bytes",
239                      channel->fd, bytes);
240             if (bytes > 0) {
241                 /* Perform the adjustment on mk_iov */
242                 mk_iov_consume(iov, bytes);
243             }
244         }
245         else if (input->type == MK_STREAM_RAW) {
246             bytes = mk_sched_conn_write(channel,
247                                         input->buffer, input->bytes_total);
248             MK_TRACE("[CH %i] STREAM_RAW, bytes=%lu/%lu",
249                      channel->fd, bytes, input->bytes_total);
250             if (bytes > 0) {
251                 /* DEPRECATED: consume_raw(input, bytes); */
252             }
253         }
254 
255         if (bytes > 0) {
256             *count = bytes;
257             mk_stream_input_consume(input, bytes);
258 
259             /* notification callback, optional */
260             if (stream->cb_bytes_consumed) {
261                 stream->cb_bytes_consumed(stream, bytes);
262             }
263 
264             if (input->cb_consumed) {
265                 input->cb_consumed(input, bytes);
266             }
267 
268             if (input->bytes_total == 0) {
269                 MK_TRACE("Input done, unlinking (channel=%p)", channel);
270                 mk_stream_in_release(input);
271             }
272 
273             if (mk_list_is_empty(&stream->inputs) == 0) {
274                 /* Everytime the stream is empty, we notify the trigger the cb */
275                 if (stream->cb_finished) {
276                     stream->cb_finished(stream);
277                 }
278 
279                 if (mk_channel_is_empty(channel) == 0) {
280                     MK_TRACE("[CH %i] CHANNEL_DONE", channel->fd);
281                     return MK_CHANNEL_DONE;
282                 }
283                 else {
284                     MK_TRACE("[CH %i] CHANNEL_FLUSH", channel->fd);
285                     return MK_CHANNEL_FLUSH;
286                 }
287             }
288 
289             MK_TRACE("[CH %i] CHANNEL_FLUSH", channel->fd);
290             return MK_CHANNEL_FLUSH;
291         }
292         else if (bytes < 0) {
293             if (errno == EAGAIN) {
294                 return MK_CHANNEL_BUSY;
295             }
296 
297             mk_stream_in_release(input);
298             return MK_CHANNEL_ERROR;
299         }
300         else if (bytes == 0) {
301             mk_stream_in_release(input);
302             return MK_CHANNEL_ERROR;
303         }
304     }
305 
306     return MK_CHANNEL_ERROR;
307 }
308 
309 /* Remove any dynamic memory associated */
mk_channel_clean(struct mk_channel * channel)310 int mk_channel_clean(struct mk_channel *channel)
311 {
312     struct mk_list *tmp;
313     struct mk_list *tmp_in;
314     struct mk_list *head;
315     struct mk_list *head_in;
316     struct mk_stream *stream;
317     struct mk_stream_input *in;
318 
319     mk_list_foreach_safe(head, tmp, &channel->streams) {
320         stream = mk_list_entry(head, struct mk_stream, _head);
321         mk_list_foreach_safe(head_in, tmp_in, &stream->inputs) {
322             in = mk_list_entry(head_in, struct mk_stream_input, _head);
323             mk_stream_in_release(in);
324         }
325         mk_stream_release(stream);
326     }
327 
328     return 0;
329 }
330