1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3 /* Monkey HTTP Server
4 * ==================
5 * Copyright 2001-2017 Eduardo Silva <eduardo@monkey.io>
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 */
19
20 #ifndef MK_STREAM_H
21 #define MK_STREAM_H
22
23 #include <monkey/mk_core.h>
24 #include <monkey/mk_plugin_net.h>
25
/*
 * Stream types: each stream input can have a different source of
 * information, and hence its handler may need to be different
 * for each case.
 */
#define MK_STREAM_RAW 0 /* raw data from buffer */
#define MK_STREAM_IOV 1 /* mk_iov struct */
#define MK_STREAM_FILE 2 /* opened file */
#define MK_STREAM_SOCKET 3 /* socket, scared.. */
35
/* Channel return values for write event */
#define MK_CHANNEL_OK 0 /* channel is ok (channel->status) */
#define MK_CHANNEL_DONE 1 /* channel consumed all streams */
#define MK_CHANNEL_ERROR 2 /* exception when flushing data */
#define MK_CHANNEL_FLUSH 4 /* channel flushed some data */
#define MK_CHANNEL_EMPTY 8 /* no streams available */
#define MK_CHANNEL_BUSY 16 /* cannot write, busy (EAGAIN) */
#define MK_CHANNEL_UNKNOWN 32 /* unhandled */
44
/*
 * Channel status.
 * NOTE(review): presumably the values stored in mk_channel.status;
 * nothing in this header assigns them, confirm against the callers.
 */
#define MK_CHANNEL_DISABLED 0 /* channel is sleeping */
#define MK_CHANNEL_ENABLED 1 /* channel enabled, have some data */
48
/*
 * Channel types: by default the only channel supported
 * is a direct write to the network layer.
 * NOTE(review): presumably the value expected in mk_channel.type and
 * in the 'type' argument of mk_channel_new() - confirm.
 */
#define MK_CHANNEL_SOCKET 0
54
/*
 * A channel represents the end-point of a stream: in short, it is
 * where the stream data is consumed and sent to. The channel
 * knows how to read/write to a TCP connection through a
 * defined network plugin.
 */
struct mk_channel {
    int type;                       /* channel type (see MK_CHANNEL_SOCKET) */
    int fd;                         /* file descriptor of the channel */
    int status;                     /* presumably MK_CHANNEL_DISABLED/ENABLED - confirm */

    struct mk_event *event;         /* event associated with this channel */
    struct mk_plugin_network *io;   /* network plugin used for I/O */
    struct mk_list streams;         /* head of the list of mk_stream nodes */
    void *thread;                   /* opaque reference; not used in this header */
};
71
/* Stream input source */
struct mk_stream_input {
    int type;                       /* input type (MK_STREAM_*) */
    int fd;                         /* file descriptor (files) */
    int dynamic;                    /* MK_TRUE when allocated by mk_stream_input() */

    /*
     * Total of data from the input. Note that mk_stream_input_consume()
     * subtracts the consumed bytes from this counter.
     */
    size_t bytes_total;
    off_t bytes_offset;             /* Data already sent */

    /*
     * Based on the stream input type, 'buffer' could reference a RAW buffer
     * or a mk_iov struct.
     */
    void *buffer;
    void *context;                  /* opaque pointer; not set by the helpers in this header */

    /* callbacks */
    void (*cb_consumed)(struct mk_stream_input *, long);
    void (*cb_finished)(struct mk_stream_input *);

    struct mk_stream *stream;       /* reference to parent stream */
    struct mk_list _head;           /* link to inputs stream list */
};
95
/*
 * A stream holds a queue of components (stream inputs) that refer to
 * different data sources such as: static file, raw buffer, etc.
 */
struct mk_stream {
    int preserve;                   /* preserve stream? (do not unlink) */
    int encoding;                   /* some output encoding ? */
    int dynamic;                    /* dynamically allocated? (set by mk_stream_set()) */

    size_t bytes_total;             /* Total of data from stream_input */
    off_t bytes_offset;             /* Data already sent */

    /* the outgoing channel, we do this for all streams */
    struct mk_channel *channel;

    /* Context the caller may want to reference with the stream (optional) */
    void *context;

    /* callbacks */
    void (*cb_finished) (struct mk_stream *);               /* invoked by mk_stream_release() */
    void (*cb_bytes_consumed) (struct mk_stream *, long);
    void (*cb_exception) (struct mk_stream *, int);

    /* Head of stream_input nodes */
    struct mk_list inputs;

    /* Link to the Channel parent */
    struct mk_list _head;
};
125
/*
 * Release a stream input. Defined outside of this header; it is invoked
 * by mk_stream_release() on every input still linked to a stream.
 */
int mk_stream_in_release(struct mk_stream_input *in);
127
128
mk_channel_is_empty(struct mk_channel * channel)129 static inline int mk_channel_is_empty(struct mk_channel *channel)
130 {
131 return mk_list_is_empty(&channel->streams);
132 }
133
mk_channel_append_stream(struct mk_channel * channel,struct mk_stream * stream)134 static inline void mk_channel_append_stream(struct mk_channel *channel,
135 struct mk_stream *stream)
136 {
137 mk_list_add(&stream->_head, &channel->streams);
138 }
139
mk_stream_append(struct mk_stream_input * in,struct mk_stream * stream)140 static inline void mk_stream_append(struct mk_stream_input *in,
141 struct mk_stream *stream)
142 {
143 mk_list_add(&in->_head, &stream->inputs);
144 }
145
mk_stream_input(struct mk_stream * stream,struct mk_stream_input * in,int type,int fd,void * buffer,size_t size,off_t offset,void (* cb_consumed)(struct mk_stream_input *,long),void (* cb_finished)(struct mk_stream_input *))146 static inline int mk_stream_input(struct mk_stream *stream,
147 struct mk_stream_input *in,
148 int type,
149 int fd,
150 void *buffer, size_t size,
151 off_t offset,
152 void (*cb_consumed) (struct mk_stream_input *, long),
153 void (*cb_finished)(struct mk_stream_input *))
154
155 {
156 struct mk_iov *iov;
157
158 if (!in) {
159 in = mk_mem_alloc(sizeof(struct mk_stream_input));
160 if (!in) {
161 return -1;
162 }
163 in->dynamic = MK_TRUE;
164 }
165 else {
166 in->dynamic = MK_FALSE;
167 }
168
169 in->fd = fd;
170 in->type = type;
171 in->bytes_offset = offset;
172 in->buffer = buffer;
173 in->cb_consumed = cb_consumed;
174 in->cb_finished = cb_finished;
175 in->stream = stream;
176
177 if (type == MK_STREAM_IOV) {
178 iov = buffer;
179 in->bytes_total = iov->total_len;
180 }
181 else {
182 in->bytes_total = size;
183 }
184
185 mk_list_add(&in->_head, &stream->inputs);
186 return 0;
187 }
188
mk_stream_in_file(struct mk_stream * stream,struct mk_stream_input * in,int fd,size_t total_bytes,off_t offset,void (* cb_consumed)(struct mk_stream_input *,long),void (* cb_finished)(struct mk_stream_input *))189 static inline int mk_stream_in_file(struct mk_stream *stream,
190 struct mk_stream_input *in, int fd,
191 size_t total_bytes,
192 off_t offset,
193 void (*cb_consumed)(struct mk_stream_input *, long),
194 void (*cb_finished)(struct mk_stream_input *))
195
196 {
197 return mk_stream_input(stream,
198 in,
199 MK_STREAM_FILE,
200 fd,
201 NULL, total_bytes, offset,
202 cb_consumed, cb_finished);
203 }
204
mk_stream_in_iov(struct mk_stream * stream,struct mk_stream_input * in,struct mk_iov * iov,void (* cb_consumed)(struct mk_stream_input *,long),void (* cb_finished)(struct mk_stream_input *))205 static inline int mk_stream_in_iov(struct mk_stream *stream,
206 struct mk_stream_input *in,
207 struct mk_iov *iov,
208 void (*cb_consumed)(struct mk_stream_input *, long),
209 void (*cb_finished)(struct mk_stream_input *))
210
211 {
212 return mk_stream_input(stream,
213 in,
214 MK_STREAM_IOV,
215 0,
216 iov, 0, 0,
217 cb_consumed, cb_finished);
218 }
219
mk_stream_in_raw(struct mk_stream * stream,struct mk_stream_input * in,char * buf,size_t length,void (* cb_consumed)(struct mk_stream_input *,long),void (* cb_finished)(struct mk_stream_input *))220 static inline int mk_stream_in_raw(struct mk_stream *stream,
221 struct mk_stream_input *in,
222 char *buf, size_t length,
223 void (*cb_consumed)(struct mk_stream_input *, long),
224 void (*cb_finished)(struct mk_stream_input *))
225 {
226 return mk_stream_input(stream,
227 in,
228 MK_STREAM_RAW,
229 -1,
230 buf, length,
231 0,
232 cb_consumed, cb_finished);
233 }
234
235
mk_stream_release(struct mk_stream * stream)236 static inline void mk_stream_release(struct mk_stream *stream)
237 {
238 struct mk_list *tmp;
239 struct mk_list *head;
240 struct mk_stream_input *in;
241
242 /* Release any pending input */
243 mk_list_foreach_safe(head, tmp, &stream->inputs) {
244 in = mk_list_entry(head, struct mk_stream_input, _head);
245 mk_stream_in_release(in);
246 }
247
248 if (stream->cb_finished) {
249 stream->cb_finished(stream);
250 }
251
252 stream->channel = NULL;
253 mk_list_del(&stream->_head);
254 if (stream->dynamic == MK_TRUE) {
255 mk_mem_free(stream);
256 }
257 }
258
259 static inline
mk_stream_set(struct mk_stream * stream,struct mk_channel * channel,void * data,void (* cb_finished)(struct mk_stream *),void (* cb_bytes_consumed)(struct mk_stream *,long),void (* cb_exception)(struct mk_stream *,int))260 struct mk_stream *mk_stream_set(struct mk_stream *stream,
261 struct mk_channel *channel,
262 void *data,
263 void (*cb_finished) (struct mk_stream *),
264 void (*cb_bytes_consumed) (struct mk_stream *, long),
265 void (*cb_exception) (struct mk_stream *, int))
266 {
267 /*
268 * The copybuf stream type it's a lazy stream mechanism on which the
269 * stream it self and the buffer are allocated dynamically. It just
270 * exists as an optional interface that do not care too much about
271 * performance and aim to make things easier. The COPYBUF type is not
272 * used by Monkey core, at the moment the only caller is the CGI plugin.
273 */
274 if (!stream) {
275 stream = mk_mem_alloc(sizeof(struct mk_stream));
276 if (!stream) {
277 return NULL;
278 }
279 stream->dynamic = MK_TRUE;
280 }
281 else {
282 stream->dynamic = MK_FALSE;
283 }
284
285 stream->channel = channel;
286 stream->bytes_offset = 0;
287 stream->context = data;
288 stream->preserve = MK_FALSE;
289
290 /* callbacks */
291 stream->cb_finished = cb_finished;
292 stream->cb_bytes_consumed = cb_bytes_consumed;
293 stream->cb_exception = cb_exception;
294
295 mk_list_init(&stream->inputs);
296 mk_list_add(&stream->_head, &channel->streams);
297
298 return stream;
299 }
300
mk_stream_input_unlink(struct mk_stream_input * in)301 static inline void mk_stream_input_unlink(struct mk_stream_input *in)
302 {
303 mk_list_del(&in->_head);
304 }
305
306 /* Mark a specific number of bytes served (just on successfull flush) */
mk_stream_input_consume(struct mk_stream_input * in,long bytes)307 static inline void mk_stream_input_consume(struct mk_stream_input *in, long bytes)
308 {
309 #ifdef TRACE
310 char *fmt = NULL;
311
312 if (in->type == MK_STREAM_RAW) {
313 fmt = "[INPUT_RAW %p] bytes consumed %lu/%lu";
314 }
315 else if (in->type == MK_STREAM_IOV) {
316 fmt = "[INPUT_IOV %p] bytes consumed %lu/%lu";
317 }
318 else if (in->type == MK_STREAM_FILE) {
319 fmt = "[INPUT_FILE %p] bytes consumed %lu/%lu";
320 }
321 else if (in->type == MK_STREAM_SOCKET) {
322 fmt = "[INPUT_SOCK %p] bytes consumed %lu/%lu";
323 }
324 else if (in->type == MK_STREAM_COPYBUF) {
325 fmt = "[INPUT_CBUF %p] bytes consumed %lu/%lu";
326 }
327 else {
328 fmt = "[INPUT_UNKW %p] bytes consumed %lu/%lu";
329 }
330 MK_TRACE(fmt, in, bytes, in->bytes_total);
331 #endif
332
333 in->bytes_total -= bytes;
334 }
335
#ifdef TRACE
/*
 * Debug helper, only available on TRACE builds: walk every stream queued
 * in the channel and print to stdout one line per stream input with its
 * position inside the stream, its address, its type and its
 * bytes_offset/bytes_total counters.
 */
static inline void mk_channel_debug(struct mk_channel *channel)
{
    int i = 0;      /* position of the stream inside the channel */
    int i_input;    /* position of the input inside the stream */
    struct mk_list *head;
    struct mk_list *h_inputs;
    struct mk_stream *stream;
    struct mk_stream_input *in;

    printf("\n*** Channel ***\n");
    mk_list_foreach(head, &channel->streams) {
        stream = mk_list_entry(head, struct mk_stream, _head);
        i_input = 0;

        mk_list_foreach(h_inputs, &stream->inputs) {
            in = mk_list_entry(h_inputs, struct mk_stream_input, _head);

            /* %p expects a pointer to void, hence the casts */
            switch (in->type) {
            case MK_STREAM_RAW:
                printf(" in.%i] %p RAW : ", i_input, (void *) in);
                break;
            case MK_STREAM_IOV:
                printf(" in.%i] %p IOV : ", i_input, (void *) in);
                break;
            case MK_STREAM_FILE:
                printf(" in.%i] %p FILE : ", i_input, (void *) in);
                break;
            case MK_STREAM_SOCKET:
                printf(" in.%i] %p SOCKET : ", i_input, (void *) in);
                break;
            /*
             * NOTE(review): MK_STREAM_COPYBUF and MK_STREAM_EOF are not
             * defined in this header - confirm they still exist.
             */
            case MK_STREAM_COPYBUF:
                printf(" in.%i] %p COPYBUF: ", i_input, (void *) in);
                break;
            case MK_STREAM_EOF:
                printf("%i) [%p] STREAM EOF : ", i, (void *) stream);
                break;
            default:
                /* unknown type: only the byte counters are printed */
                break;
            }

            /*
             * The width of off_t depends on the platform: widen it to
             * long long so a single format is valid everywhere.
             */
            printf("bytes=%lld/%zu\n",
                   (long long) in->bytes_offset, in->bytes_total);
            i_input++;
        }

        /* keep the stream position in sync with the outer loop */
        i++;
    }
}
#endif
383
/*
 * Functions implemented outside of this header.
 * NOTE(review): the descriptions below are inferred from the names and
 * signatures only - confirm against the implementation.
 */

/* Presumably creates a new stream of the given type on the channel */
struct mk_stream *mk_stream_new(int type, struct mk_channel *channel,
                                void *buffer, size_t size, void *data,
                                void (*cb_finished) (struct mk_stream *),
                                void (*cb_bytes_consumed) (struct mk_stream *, long),
                                void (*cb_exception) (struct mk_stream *, int));

/* Presumably writes a single stream; 'count' looks like an output byte counter */
int mk_channel_stream_write(struct mk_stream *stream, size_t *count);

/* Presumably allocates a channel of the given type for the file descriptor */
struct mk_channel *mk_channel_new(int type, int fd);

/* Channel operations; presumably they return the MK_CHANNEL_* values - confirm */
int mk_channel_flush(struct mk_channel *channel);
int mk_channel_write(struct mk_channel *channel, size_t *count);
int mk_channel_clean(struct mk_channel *channel);
397 #endif
398