1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 
3 /*  Monkey HTTP Server
4  *  ==================
5  *  Copyright 2001-2017 Eduardo Silva <eduardo@monkey.io>
6  *
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  *  Unless required by applicable law or agreed to in writing, software
14  *  distributed under the License is distributed on an "AS IS" BASIS,
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *  See the License for the specific language governing permissions and
17  *  limitations under the License.
18  */
19 
20 #ifndef MK_STREAM_H
21 #define MK_STREAM_H
22 
23 #include <monkey/mk_core.h>
24 #include <monkey/mk_plugin_net.h>
25 
26 /*
27  * Stream types: each stream can have a different
28  * source of information and for hence it handler
29  * may need to be different for each cases.
30  */
31 #define MK_STREAM_RAW       0  /* raw data from buffer */
32 #define MK_STREAM_IOV       1  /* mk_iov struct        */
33 #define MK_STREAM_FILE      2  /* opened file          */
34 #define MK_STREAM_SOCKET    3  /* socket, scared..     */
35 
36 /* Channel return values for write event */
37 #define MK_CHANNEL_OK       0  /* channel is ok (channel->status) */
38 #define MK_CHANNEL_DONE     1  /* channel consumed all streams */
39 #define MK_CHANNEL_ERROR    2  /* exception when flusing data  */
40 #define MK_CHANNEL_FLUSH    4  /* channel flushed some data    */
41 #define MK_CHANNEL_EMPTY    8  /* no streams available         */
42 #define MK_CHANNEL_BUSY    16  /* cannot write, busy (EAGAIN)  */
43 #define MK_CHANNEL_UNKNOWN 32  /* unhandled                    */
44 
45 /* Channel status */
46 #define MK_CHANNEL_DISABLED 0 /* channel is sleeping */
47 #define MK_CHANNEL_ENABLED  1 /* channel enabled, have some data */
48 
49 /*
50  * Channel types: by default the only channel supported
51  * is a direct write to the network layer.
52  */
53 #define MK_CHANNEL_SOCKET 0
54 
55 /*
56  * A channel represents an end-point of a stream, for short
57  * where the stream data consumed and is send to. The channel
58  * knows how to read/write to a TCP connection through a
59  * defined network plugin.
60  */
61 struct mk_channel {
62     int type;
63     int fd;
64     int status;
65 
66     struct mk_event *event;
67     struct mk_plugin_network *io;
68     struct mk_list streams;
69     void *thread;
70 };
71 
72 /* Stream input source */
73 struct mk_stream_input {
74     int type;              /* input type                      */
75     int fd;                /* file descriptor (files)         */
76     int dynamic;
77 
78     size_t bytes_total;    /* Total of data from the input    */
79     off_t  bytes_offset;   /* Data already sent               */
80 
81     /*
82      * Based on the stream input type, 'data' could reference a RAW buffer
83      * or a mk_iov struct.
84      */
85     void *buffer;
86     void *context;
87 
88     /* callbacks */
89     void (*cb_consumed)(struct mk_stream_input *, long);
90     void (*cb_finished)(struct mk_stream_input *);
91 
92     struct mk_stream *stream; /* reference to parent stream */
93     struct mk_list _head;     /* link to inputs stream list */
94 };
95 
96 /*
97  * A stream holds a queue of components that refers to different
98  * data sources such as: static file, raw buffer, etc.
99  */
100 struct mk_stream {
101     int preserve;          /* preserve stream? (do not unlink) */
102     int encoding;          /* some output encoding ?           */
103     int dynamic;           /* dynamic allocated ?              */
104 
105     size_t bytes_total;    /* Total of data from stream_input  */
106     off_t  bytes_offset;   /* Data already sent                */
107 
108     /* the outgoing channel, we do this for all streams */
109     struct mk_channel *channel;
110 
111     /* Context the caller may want to reference with the stream (optional) */
112     void *context;
113 
114     /* callbacks */
115     void (*cb_finished) (struct mk_stream *);
116     void (*cb_bytes_consumed) (struct mk_stream *, long);
117     void (*cb_exception) (struct mk_stream *, int);
118 
119     /* Head of stream_input nodes */
120     struct mk_list inputs;
121 
122     /* Link to the Channel parent */
123     struct mk_list _head;
124 };
125 
126 int mk_stream_in_release(struct mk_stream_input *in);
127 
128 
mk_channel_is_empty(struct mk_channel * channel)129 static inline int mk_channel_is_empty(struct mk_channel *channel)
130 {
131     return mk_list_is_empty(&channel->streams);
132 }
133 
mk_channel_append_stream(struct mk_channel * channel,struct mk_stream * stream)134 static inline void mk_channel_append_stream(struct mk_channel *channel,
135                                             struct mk_stream *stream)
136 {
137     mk_list_add(&stream->_head, &channel->streams);
138 }
139 
mk_stream_append(struct mk_stream_input * in,struct mk_stream * stream)140 static inline void mk_stream_append(struct mk_stream_input *in,
141                                     struct mk_stream *stream)
142 {
143     mk_list_add(&in->_head, &stream->inputs);
144 }
145 
mk_stream_input(struct mk_stream * stream,struct mk_stream_input * in,int type,int fd,void * buffer,size_t size,off_t offset,void (* cb_consumed)(struct mk_stream_input *,long),void (* cb_finished)(struct mk_stream_input *))146 static inline int mk_stream_input(struct mk_stream *stream,
147                                   struct mk_stream_input *in,
148                                   int type,
149                                   int fd,
150                                   void *buffer, size_t size,
151                                   off_t offset,
152                                   void (*cb_consumed) (struct mk_stream_input *, long),
153                                   void (*cb_finished)(struct mk_stream_input *))
154 
155 {
156     struct mk_iov *iov;
157 
158     if (!in) {
159         in = mk_mem_alloc(sizeof(struct mk_stream_input));
160         if (!in) {
161             return -1;
162         }
163         in->dynamic  = MK_TRUE;
164     }
165     else {
166         in->dynamic  = MK_FALSE;
167     }
168 
169     in->fd           = fd;
170     in->type         = type;
171     in->bytes_offset = offset;
172     in->buffer       = buffer;
173     in->cb_consumed  = cb_consumed;
174     in->cb_finished  = cb_finished;
175     in->stream       = stream;
176 
177     if (type == MK_STREAM_IOV) {
178         iov = buffer;
179         in->bytes_total = iov->total_len;
180     }
181     else {
182         in->bytes_total = size;
183     }
184 
185     mk_list_add(&in->_head, &stream->inputs);
186     return 0;
187 }
188 
mk_stream_in_file(struct mk_stream * stream,struct mk_stream_input * in,int fd,size_t total_bytes,off_t offset,void (* cb_consumed)(struct mk_stream_input *,long),void (* cb_finished)(struct mk_stream_input *))189 static inline int mk_stream_in_file(struct mk_stream *stream,
190                                     struct mk_stream_input *in, int fd,
191                                     size_t total_bytes,
192                                     off_t offset,
193                                     void (*cb_consumed)(struct mk_stream_input *, long),
194                                     void (*cb_finished)(struct mk_stream_input *))
195 
196 {
197     return mk_stream_input(stream,
198                            in,
199                            MK_STREAM_FILE,
200                            fd,
201                            NULL, total_bytes, offset,
202                            cb_consumed, cb_finished);
203 }
204 
mk_stream_in_iov(struct mk_stream * stream,struct mk_stream_input * in,struct mk_iov * iov,void (* cb_consumed)(struct mk_stream_input *,long),void (* cb_finished)(struct mk_stream_input *))205 static inline int mk_stream_in_iov(struct mk_stream *stream,
206                                    struct mk_stream_input *in,
207                                    struct mk_iov *iov,
208                                    void (*cb_consumed)(struct mk_stream_input *, long),
209                                    void (*cb_finished)(struct mk_stream_input *))
210 
211 {
212     return mk_stream_input(stream,
213                            in,
214                            MK_STREAM_IOV,
215                            0,
216                            iov, 0, 0,
217                            cb_consumed, cb_finished);
218 }
219 
mk_stream_in_raw(struct mk_stream * stream,struct mk_stream_input * in,char * buf,size_t length,void (* cb_consumed)(struct mk_stream_input *,long),void (* cb_finished)(struct mk_stream_input *))220 static inline int mk_stream_in_raw(struct mk_stream *stream,
221                                    struct mk_stream_input *in,
222                                    char *buf, size_t length,
223                                    void (*cb_consumed)(struct mk_stream_input *, long),
224                                    void (*cb_finished)(struct mk_stream_input *))
225 {
226     return mk_stream_input(stream,
227                            in,
228                            MK_STREAM_RAW,
229                            -1,
230                            buf, length,
231                            0,
232                            cb_consumed, cb_finished);
233 }
234 
235 
mk_stream_release(struct mk_stream * stream)236 static inline void mk_stream_release(struct mk_stream *stream)
237 {
238     struct mk_list *tmp;
239     struct mk_list *head;
240     struct mk_stream_input *in;
241 
242     /* Release any pending input */
243     mk_list_foreach_safe(head, tmp, &stream->inputs) {
244         in = mk_list_entry(head, struct mk_stream_input, _head);
245         mk_stream_in_release(in);
246     }
247 
248     if (stream->cb_finished) {
249         stream->cb_finished(stream);
250     }
251 
252     stream->channel = NULL;
253     mk_list_del(&stream->_head);
254     if (stream->dynamic == MK_TRUE) {
255         mk_mem_free(stream);
256     }
257 }
258 
259 static inline
mk_stream_set(struct mk_stream * stream,struct mk_channel * channel,void * data,void (* cb_finished)(struct mk_stream *),void (* cb_bytes_consumed)(struct mk_stream *,long),void (* cb_exception)(struct mk_stream *,int))260 struct mk_stream *mk_stream_set(struct mk_stream *stream,
261                                 struct mk_channel *channel,
262                                 void *data,
263                                 void (*cb_finished) (struct mk_stream *),
264                                 void (*cb_bytes_consumed) (struct mk_stream *, long),
265                                 void (*cb_exception) (struct mk_stream *, int))
266 {
267     /*
268      * The copybuf stream type it's a lazy stream mechanism on which the
269      * stream it self and the buffer are allocated dynamically. It just
270      * exists as an optional interface that do not care too much about
271      * performance and aim to make things easier. The COPYBUF type is not
272      * used by Monkey core, at the moment the only caller is the CGI plugin.
273      */
274     if (!stream) {
275         stream = mk_mem_alloc(sizeof(struct mk_stream));
276         if (!stream) {
277             return NULL;
278         }
279         stream->dynamic = MK_TRUE;
280     }
281     else {
282         stream->dynamic = MK_FALSE;
283     }
284 
285     stream->channel      = channel;
286     stream->bytes_offset = 0;
287     stream->context      = data;
288     stream->preserve     = MK_FALSE;
289 
290     /* callbacks */
291     stream->cb_finished       = cb_finished;
292     stream->cb_bytes_consumed = cb_bytes_consumed;
293     stream->cb_exception      = cb_exception;
294 
295     mk_list_init(&stream->inputs);
296     mk_list_add(&stream->_head, &channel->streams);
297 
298     return stream;
299 }
300 
mk_stream_input_unlink(struct mk_stream_input * in)301 static inline void mk_stream_input_unlink(struct mk_stream_input *in)
302 {
303     mk_list_del(&in->_head);
304 }
305 
306 /* Mark a specific number of bytes served (just on successfull flush) */
mk_stream_input_consume(struct mk_stream_input * in,long bytes)307 static inline void mk_stream_input_consume(struct mk_stream_input *in, long bytes)
308 {
309 #ifdef TRACE
310     char *fmt = NULL;
311 
312     if (in->type == MK_STREAM_RAW) {
313         fmt = "[INPUT_RAW %p] bytes consumed %lu/%lu";
314     }
315     else if (in->type == MK_STREAM_IOV) {
316         fmt = "[INPUT_IOV %p] bytes consumed %lu/%lu";
317     }
318     else if (in->type == MK_STREAM_FILE) {
319         fmt = "[INPUT_FILE %p] bytes consumed %lu/%lu";
320     }
321     else if (in->type == MK_STREAM_SOCKET) {
322         fmt = "[INPUT_SOCK %p] bytes consumed %lu/%lu";
323     }
324     else if (in->type == MK_STREAM_COPYBUF) {
325         fmt = "[INPUT_CBUF %p] bytes consumed %lu/%lu";
326     }
327     else {
328         fmt = "[INPUT_UNKW %p] bytes consumed %lu/%lu";
329     }
330     MK_TRACE(fmt, in, bytes, in->bytes_total);
331 #endif
332 
333     in->bytes_total -= bytes;
334 }
335 
336 #ifdef TRACE
mk_channel_debug(struct mk_channel * channel)337 static inline void mk_channel_debug(struct mk_channel *channel)
338 {
339     int i = 0;
340     int i_input;
341     struct mk_list *head;
342     struct mk_list *h_inputs;
343     struct mk_stream *stream;
344     struct mk_stream_input *in;
345 
346     printf("\n*** Channel ***\n");
347     mk_list_foreach(head, &channel->streams) {
348         stream = mk_list_entry(head, struct mk_stream, _head);
349         i_input = 0;
350 
351         mk_list_foreach(h_inputs, &stream->inputs) {
352             in = mk_list_entry(h_inputs, struct mk_stream_input, _head);
353             switch (in->type) {
354             case MK_STREAM_RAW:
355                 printf("     in.%i] %p RAW    : ", i_input, in);
356                 break;
357             case MK_STREAM_IOV:
358                 printf("     in.%i] %p IOV    : ", i_input, in);
359                 break;
360             case MK_STREAM_FILE:
361                 printf("     in.%i] %p FILE   : ", i_input, in);
362                 break;
363             case MK_STREAM_SOCKET:
364                 printf("     in.%i] %p SOCKET : ", i_input, in);
365                 break;
366             case MK_STREAM_COPYBUF:
367                 printf("     in.%i] %p COPYBUF: ", i_input, in);
368                 break;
369             case MK_STREAM_EOF:
370                 printf("%i) [%p] STREAM EOF    : ", i, stream);
371                 break;
372             }
373 #if defined(__APPLE__)
374             printf("bytes=%lld/%lu\n", in->bytes_offset, in->bytes_total);
375 #else
376             printf("bytes=%ld/%zu\n", in->bytes_offset, in->bytes_total);
377 #endif
378             i_input++;
379         }
380     }
381 }
382 #endif
383 
384 struct mk_stream *mk_stream_new(int type, struct mk_channel *channel,
385                                 void *buffer, size_t size, void *data,
386                                 void (*cb_finished) (struct mk_stream *),
387                                 void (*cb_bytes_consumed) (struct mk_stream *, long),
388                                 void (*cb_exception) (struct mk_stream *, int));
389 
390 int mk_channel_stream_write(struct mk_stream *stream, size_t *count);
391 
392 struct mk_channel *mk_channel_new(int type, int fd);
393 
394 int mk_channel_flush(struct mk_channel *channel);
395 int mk_channel_write(struct mk_channel *channel, size_t *count);
396 int mk_channel_clean(struct mk_channel *channel);
397 #endif
398