1 /* VNC Reflector
2 * Copyright (C) 2001-2003 HorizonLive.com, Inc. All rights reserved.
3 *
4 * This software is released under the terms specified in the file LICENSE,
5 * included. HorizonLive provides e-Learning and collaborative synchronous
6 * presentation solutions in a totally Web-based environment. For more
7 * information about HorizonLive, please see our website at
8 * http://www.horizonlive.com.
9 *
10 * This software was authored by Constantin Kaplinsky <const@ce.cctpu.edu.ru>
11 * and sponsored by HorizonLive.com, Inc.
12 *
13 * $Id: async_io.c,v 1.26 2003/04/26 19:47:23 const Exp $
14 * Asynchronous file/socket I/O
15 */
16
17 #include <stdio.h>
18 #include <stdlib.h>
19 #include <string.h>
20 #include <unistd.h>
21 #include <sys/time.h>
22 #include <sys/types.h>
23 #include <sys/socket.h>
24 #include <netinet/in.h>
25 #include <arpa/inet.h>
26 #include <fcntl.h>
27 #include <signal.h>
28 #include <errno.h>
29
30 #ifdef USE_POLL
31 #include <sys/poll.h>
32 #define FD_ARRAY_MAXSIZE 10000
33 #endif
34
35 #include "async_io.h"
36
/*
 * Global variables
 */

/*
 * The slot that the running callback operates on. Functions in this file
 * point it at the relevant slot before invoking init, read, write-done
 * and close hooks; aio_setread(), aio_write(), aio_write_nocopy(),
 * aio_setclose() and aio_close() all act on whatever it points to.
 */
AIO_SLOT *cur_slot;
42
43 /*
44 * Static variables
45 */
46
47 struct in_addr s_bind_address;
48
49 #ifdef USE_POLL
50 static struct pollfd s_fd_array[FD_ARRAY_MAXSIZE];
51 static unsigned int s_fd_array_size;
52 #else
53 static fd_set s_fdset_read;
54 static fd_set s_fdset_write;
55 static int s_max_fd;
56 #endif
57
58 static AIO_FUNCPTR s_idle_func;
59 static AIO_SLOT *s_first_slot;
60 static AIO_SLOT *s_last_slot;
61
62 static volatile int s_sig_func_set;
63 static AIO_FUNCPTR s_sig_func[10];
64
65 static int s_close_f;
66
/*
 * Prototypes for static functions
 */

/* Allocate a slot for fd and append it to the slot list */
static AIO_SLOT *aio_new_slot(int fd, char *name, size_t slot_size);
/* Read pending data for a slot and run its read handler when complete */
static void aio_process_input(AIO_SLOT *slot);
/* Write the head of a slot's output queue */
static void aio_process_output(AIO_SLOT *slot);
/* Run functions deferred through aio_call_func() */
static void aio_process_func_list(void);
/* Accept a connection on a listening slot and create a slot for it */
static void aio_accept_connection(AIO_SLOT *slot);
/* Destroy every slot that has been marked for closing */
static void aio_process_closed(void);
/* Release one slot; fatal != 0 skips list/descriptor-set bookkeeping */
static void aio_destroy_slot(AIO_SLOT *slot, int fatal);

/* SIGTERM/SIGINT handler */
static void sh_interrupt(int signo);
80
81
82 /*
83 * Implementation
84 */
85
86 /*
87 * Initialize I/O sybsystem. This function should be called prior to
88 * any other function herein and should NOT be called from within
89 * event loop, from callback functions.
90 */
91
aio_init(void)92 void aio_init(void)
93 {
94 int i;
95
96 #ifdef USE_POLL
97 s_fd_array_size = 0;
98 #else
99 FD_ZERO(&s_fdset_read);
100 FD_ZERO(&s_fdset_write);
101 s_max_fd = 0;
102 #endif
103 s_idle_func = NULL;
104 s_first_slot = NULL;
105 s_last_slot = NULL;
106 s_close_f = 0;
107
108 s_sig_func_set = 0;
109 for (i = 0; i < 10; i++)
110 s_sig_func[i] = NULL;
111
112 s_bind_address.s_addr = htonl(INADDR_ANY);
113 }
114
115 /*
116 * Bind all listening sockets to specific interface specified by an IP
117 * adress.
118 */
119
aio_set_bind_address(char * bind_ip)120 int aio_set_bind_address(char *bind_ip)
121 {
122 struct in_addr tmp_bind_address;
123
124 if (!inet_aton(bind_ip, &tmp_bind_address))
125 return 0;
126
127 s_bind_address = tmp_bind_address;
128 return 1;
129 }
130
131 /*
132 * Create I/O slot for existing connection (open file). After new slot
133 * has been created, initfunc would be called with cur_slot pointing
134 * to that slot. To allow reading from provided descriptor, initfunc
135 * should set some input handler using aio_setread() function.
136 */
137
aio_add_slot(int fd,char * name,AIO_FUNCPTR initfunc,size_t slot_size)138 int aio_add_slot(int fd, char *name, AIO_FUNCPTR initfunc, size_t slot_size)
139 {
140 AIO_SLOT *slot, *saved_slot;
141
142 if (initfunc == NULL)
143 return 0;
144
145 slot = aio_new_slot(fd, name, slot_size);
146 if (slot == NULL)
147 return 0;
148
149 /* Saving cur_slot value, calling initfunc with different cur_slot */
150 saved_slot = cur_slot;
151 cur_slot = slot;
152 (*initfunc)();
153 cur_slot = saved_slot;
154
155 return 1;
156 }
157
158 /*
159 * Create listening socket. All connections would be accepted
160 * automatically. initfunc would be called after listening I/O slot is
161 * created, and acceptfunc would be called for each new slot created
162 * on newly accepted connection.
163 */
164
aio_listen(int port,AIO_FUNCPTR initfunc,AIO_FUNCPTR acceptfunc,size_t slot_size)165 int aio_listen(int port, AIO_FUNCPTR initfunc, AIO_FUNCPTR acceptfunc,
166 size_t slot_size)
167 {
168 AIO_SLOT *slot, *saved_slot;
169 int listen_fd;
170 struct sockaddr_in listen_addr;
171 int optval = 1;
172
173 errno = 0;
174
175 /* initfunc is optional but acceptfunc should be provided. */
176 if (acceptfunc == NULL)
177 return 0;
178
179 listen_fd = socket(AF_INET, SOCK_STREAM, 0);
180 if (listen_fd < 0)
181 return 0;
182
183 if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR,
184 &optval, sizeof(int)) != 0) {
185 close(listen_fd);
186 return 0;
187 }
188
189 listen_addr.sin_family = AF_INET;
190 listen_addr.sin_addr.s_addr = s_bind_address.s_addr;
191 listen_addr.sin_port = htons((unsigned short)port);
192
193 if ( bind(listen_fd, (struct sockaddr *)&listen_addr,
194 sizeof(listen_addr)) != 0 ||
195 fcntl(listen_fd, F_SETFL, O_NONBLOCK) != 0 ||
196 listen(listen_fd, 5) != 0 ) {
197 close(listen_fd);
198 return 0;
199 }
200
201 slot = aio_new_slot(listen_fd, "[listening slot]", sizeof(AIO_SLOT));
202 if (slot == NULL)
203 return 0;
204
205 slot->listening_f = 1;
206 slot->bytes_to_read = slot_size;
207 slot->readfunc = acceptfunc;
208
209 if (initfunc != NULL) {
210 saved_slot = cur_slot;
211 cur_slot = slot;
212 (*initfunc)();
213 cur_slot = saved_slot;
214 }
215
216 return 1;
217 }
218
219 /*
220 * Iterate over a list of connection slots with specified type.
221 * Returns number of matching slots.
222 */
223
aio_walk_slots(AIO_FUNCPTR fn,int type)224 int aio_walk_slots(AIO_FUNCPTR fn, int type)
225 {
226 AIO_SLOT *slot, *next_slot;
227 int count = 0;
228
229 slot = s_first_slot;
230 while (slot != NULL && !s_close_f) {
231 next_slot = slot->next;
232 if (slot->type == type && !slot->close_f) {
233 (*fn)(slot);
234 count++;
235 }
236 slot = next_slot;
237 }
238
239 return count;
240 }
241
242 /*
243 * This function should be called if we have to execute a function
244 * when I/O state is consistent, but currently we are not sure if it's
245 * safe (e.g. to be called from signal handlers). fn_type should be a
246 * number in the range of 0..9 and if there are two or more functions
247 * of the same fn_type set, only one of them would be called
248 * (probably, the latest set).
249 */
250
aio_call_func(AIO_FUNCPTR fn,int fn_type)251 void aio_call_func(AIO_FUNCPTR fn, int fn_type)
252 {
253 if (fn_type >= 0 && fn_type < 10) {
254 s_sig_func[fn_type] = fn;
255 s_sig_func_set = 1;
256 }
257 }
258
259 /*
260 * Function to close connection slot. Operates on *cur_slot.
261 * If fatal_f is not 0 then close all other slots and quit
262 * event loop. Note that a slot would not be destroyed right
263 * on this function call, this would be done later, at the end
264 * of main loop cycle (however, listening sockets stop listening
265 * immediately if fatal_f is 0).
266 */
267
aio_close(int fatal_f)268 void aio_close(int fatal_f)
269 {
270 aio_close_other(cur_slot, fatal_f);
271 }
272
273 /*
274 * A function similar to aio_close, but operates on specified slot.
275 */
276
aio_close_other(AIO_SLOT * slot,int fatal_f)277 void aio_close_other(AIO_SLOT *slot, int fatal_f)
278 {
279 slot->close_f = 1;
280
281 if (fatal_f) {
282 s_close_f = 1;
283 } else if (slot->listening_f) {
284 close(slot->fd);
285 slot->fd_closed_f = 1;
286 #ifndef USE_POLL
287 FD_CLR(slot->fd, &s_fdset_read);
288 if (slot->fd == s_max_fd) {
289 /* NOTE: Better way is to find _existing_ max fd */
290 s_max_fd--;
291 }
292 #endif
293 }
294 }
295
296 /*
297 * Main event loop. It watches for possibility to perform I/O
298 * operations on descriptors and dispatches results to custom
299 * callback functions.
300 *
301 * Here are two versions, one uses poll(2) syscall, another uses
302 * select(2) instead. Note that select(2) is more portable while
303 * poll(2) is less limited.
304 */
305
306 /* FIXME: Implement configurable network timeout. */
307
308 #ifdef USE_POLL
309
/*
 * poll(2) flavour of the main event loop. Runs until s_close_f becomes
 * non-zero, then destroys every remaining slot and returns.
 */
void aio_mainloop(void)
{
  AIO_SLOT *slot, *next_slot;

  /* Ignore SIGPIPE; SIGTERM and SIGINT are routed to sh_interrupt() */
  signal(SIGPIPE, SIG_IGN);
  signal(SIGTERM, sh_interrupt);
  signal(SIGINT, sh_interrupt);

  /* Run anything deferred via aio_call_func() before the loop started */
  if (s_sig_func_set)
    aio_process_func_list();

  while (!s_close_f) {
    /* Wait up to 1000 ms for activity on any registered descriptor */
    if (poll(s_fd_array, s_fd_array_size, 1000) > 0) {
      slot = s_first_slot;
      while (slot != NULL && !s_close_f) {
        /* Remember the successor before any callback runs */
        next_slot = slot->next;
        if (s_fd_array[slot->idx].revents & (POLLERR | POLLHUP | POLLNVAL)) {
          /* Error, hangup or invalid descriptor: mark the slot for closing */
          slot->errio_f = 1;
          slot->close_f = 1;
        } else {
          /* Output is handled before input */
          if (s_fd_array[slot->idx].revents & POLLOUT)
            aio_process_output(slot);
          /* Skip input if the output step marked the slot for closing */
          if ((s_fd_array[slot->idx].revents & POLLIN) && !slot->close_f) {
            if (slot->listening_f)
              aio_accept_connection(slot);
            else
              aio_process_input(slot);
          }
        }
        slot = next_slot;
      }
      /* Destroy slots marked for closing during this pass */
      aio_process_closed();
      if (s_sig_func_set && !s_close_f)
        aio_process_func_list();
    } else {
      /* poll() returned 0 (timeout) or -1: deferred functions, else idle hook */
      if (s_sig_func_set)
        aio_process_func_list();
      else if (s_idle_func != NULL)
        (*s_idle_func)();       /* Do something in idle periods */
    }
  }
  /* Close all slots and exit */
  slot = s_first_slot;
  while(slot != NULL) {
    next_slot = slot->next;
    aio_destroy_slot(slot, 1);
    slot = next_slot;
  }
}
359
360 #else
361
/*
 * select(2) flavour of the main event loop. Runs until s_close_f becomes
 * non-zero, then destroys every remaining slot and returns.
 */
void aio_mainloop(void)
{
  fd_set fdset_r, fdset_w;
  struct timeval timeout;
  AIO_SLOT *slot, *next_slot;

  /* Ignore SIGPIPE; SIGTERM and SIGINT are routed to sh_interrupt() */
  signal(SIGPIPE, SIG_IGN);
  signal(SIGTERM, sh_interrupt);
  signal(SIGINT, sh_interrupt);

  /* Run anything deferred via aio_call_func() before the loop started */
  if (s_sig_func_set)
    aio_process_func_list();

  while (!s_close_f) {
    /* select() overwrites its sets, so work on copies of the master sets */
    memcpy(&fdset_r, &s_fdset_read, sizeof(fd_set));
    memcpy(&fdset_w, &s_fdset_write, sizeof(fd_set));
    timeout.tv_sec = 1;         /* One second timeout */
    timeout.tv_usec = 0;
    if (select(s_max_fd + 1, &fdset_r, &fdset_w, NULL, &timeout) > 0) {
      slot = s_first_slot;
      while (slot != NULL && !s_close_f) {
        /* Remember the successor before any callback runs */
        next_slot = slot->next;
        /* Output is handled before input */
        if (FD_ISSET(slot->fd, &fdset_w))
          aio_process_output(slot);
        /* Skip input if the output step marked the slot for closing */
        if (FD_ISSET(slot->fd, &fdset_r) && !slot->close_f) {
          if (slot->listening_f)
            aio_accept_connection(slot);
          else
            aio_process_input(slot);
        }
        slot = next_slot;
      }
      /* Destroy slots marked for closing during this pass */
      aio_process_closed();
      if (s_sig_func_set && !s_close_f)
        aio_process_func_list();
    } else {
      /* select() returned 0 (timeout) or -1: deferred functions, else idle hook */
      if (s_sig_func_set)
        aio_process_func_list();
      else if (s_idle_func != NULL)
        (*s_idle_func)();       /* Do something in idle periods */
    }
  }
  /* Stop listening, close all slots and exit */
  slot = s_first_slot;
  while(slot != NULL) {
    next_slot = slot->next;
    aio_destroy_slot(slot, 1);
    slot = next_slot;
  }
}
412
413 #endif /* USE_POLL */
414
/*
 * Arrange for fn to be called once bytes_to_read bytes have been read
 * into the current slot's input buffer. Operates on *cur_slot.
 *
 * If inbuf is not NULL the data goes there. Otherwise the slot's built-in
 * buf256 is used when it is large enough, and a heap buffer is allocated
 * when it is not; a heap buffer from an earlier call is freed first. If
 * that allocation fails the slot is marked for closing.
 *
 * NOTE: fn must be a real function, not NULL.
 * FIXME: Check for close_f before the work?
 */
void aio_setread(AIO_FUNCPTR fn, void *inbuf, int bytes_to_read)
{
  AIO_SLOT *s = cur_slot;

  /* Release the buffer we allocated on a previous call, if any */
  if (s->alloc_f) {
    free(s->readbuf);
    s->alloc_f = 0;
  }

  s->readfunc = fn;
  s->bytes_to_read = bytes_to_read;
  s->bytes_ready = 0;

  /* Caller-supplied buffer */
  if (inbuf != NULL) {
    s->readbuf = inbuf;
    return;
  }

  /* Small request: the buffer embedded in the slot is enough */
  if (bytes_to_read <= sizeof(s->buf256)) {
    s->readbuf = s->buf256;
    return;
  }

  /* Large request: allocate, and give up on the slot if we cannot */
  s->readbuf = malloc(bytes_to_read);
  if (s->readbuf == NULL)
    s->close_f = 1;
  else
    s->alloc_f = 1;
}
444
/*
 * Queue a copy of bytes_to_write bytes from outbuf for sending on the
 * current slot (*cur_slot). fn, which may be NULL, is called after the
 * block has been written out.
 *
 * A negative bytes_to_write, or a NULL outbuf with a non-zero length, is
 * ignored. If the block cannot be allocated the slot is marked for
 * closing, as aio_setread() does on allocation failure: silently
 * dropping part of the outgoing stream would leave the peer with
 * corrupted data.
 */
void aio_write(AIO_FUNCPTR fn, void *outbuf, int bytes_to_write)
{
  AIO_BLOCK *block;

  /* FIXME: Join small blocks together? */
  /* FIXME: Support small static buffer as in reading? */

  /* A negative length would turn into a huge size_t in malloc()/memcpy() */
  if (bytes_to_write < 0 || (outbuf == NULL && bytes_to_write > 0))
    return;

  block = malloc(sizeof(AIO_BLOCK) + (size_t)bytes_to_write);
  if (block == NULL) {
    cur_slot->close_f = 1;
    return;
  }

  block->data_size = bytes_to_write;
  if (bytes_to_write > 0)
    memcpy(block->data, outbuf, (size_t)bytes_to_write);
  aio_write_nocopy(fn, block);
}
459
/*
 * Append an already prepared block to the current slot's output queue
 * without copying its data. fn, which may be NULL, is stored in the block
 * as the function to call once the block has been sent. A NULL block is
 * ignored.
 *
 * If the queue was empty, the slot's descriptor also starts being watched
 * for writability.
 */
void aio_write_nocopy(AIO_FUNCPTR fn, AIO_BLOCK *block)
{
  AIO_SLOT *s = cur_slot;

  if (block == NULL)
    return;

  block->func = fn;

  if (s->outqueue != NULL) {
    /* Something is already queued: chain behind the current tail */
    s->outqueue_last->next = block;
  } else {
    /* First block: it becomes the head and we start watching for output */
    s->outqueue = block;
    s->bytes_written = 0;
#ifdef USE_POLL
    s_fd_array[s->idx].events |= POLLOUT;
#else
    FD_SET(s->fd, &s_fdset_write);
#endif
  }

  /* Either way the new block is now the tail */
  s->outqueue_last = block;
  block->next = NULL;
}
484
/*
 * Install closefunc as the on-close hook of the current slot (*cur_slot).
 * The hook is called when the slot is destroyed.
 */
void aio_setclose(AIO_FUNCPTR closefunc)
{
  AIO_SLOT *s = cur_slot;

  s->closefunc = closefunc;
}
489
490 /***************************
491 * Static functions follow
492 */
493
aio_new_slot(int fd,char * name,size_t slot_size)494 AIO_SLOT *aio_new_slot(int fd, char *name, size_t slot_size)
495 {
496 size_t size;
497 AIO_SLOT *slot;
498
499 /* Allocate memory make sure all fields are zeroed (very important). */
500 size = (slot_size > sizeof(AIO_SLOT)) ? slot_size : sizeof(AIO_SLOT);
501 slot = calloc(1, size);
502
503 if (slot) {
504 slot->fd = fd;
505 if (name != NULL) {
506 slot->name = strdup(name);
507 } else {
508 slot->name = strdup("[unknown]");
509 }
510
511 if (s_last_slot == NULL) {
512 /* This is the first slot */
513 s_first_slot = slot;
514 slot->prev = NULL;
515 } else {
516 /* Other slots exist */
517 s_last_slot->next = slot;
518 slot->prev = s_last_slot;
519 }
520 s_last_slot = slot;
521
522 /* Put fd into non-blocking mode */
523 /* FIXME: check return value? */
524 fcntl(fd, F_SETFL, O_NONBLOCK);
525
526 #ifdef USE_POLL
527 /* FIXME: do something better if s_fd_array_size exceeds max size? */
528 if (s_fd_array_size < FD_ARRAY_MAXSIZE) {
529 slot->idx = s_fd_array_size++;
530 s_fd_array[slot->idx].fd = fd;
531 s_fd_array[slot->idx].events = POLLIN;
532 s_fd_array[slot->idx].revents = 0;
533 }
534 #else
535 FD_SET(fd, &s_fdset_read);
536 if (fd > s_max_fd)
537 s_max_fd = fd;
538 #endif
539 }
540 return slot;
541 }
542
/*
 * Read pending input for a slot. Once bytes_to_read bytes have been
 * collected in readbuf, cur_slot is pointed at the slot and its read
 * handler is called. A slot whose bytes_to_read is 0 has its handler
 * called without reading anything.
 *
 * End of file or a hard read error marks the slot for closing and
 * records the error flags and errno in the slot. EAGAIN, EWOULDBLOCK and
 * EINTR are transient: the slot is left alone and the read is retried on
 * the next readiness notification.
 *
 * A slot that has no read handler is marked for closing rather than
 * having a NULL pointer called.
 */
static void aio_process_input(AIO_SLOT *slot)
{
  int bytes = 0;

  if (slot->close_f)
    return;

  /* Nobody can consume this data; get rid of the slot */
  if (slot->readfunc == NULL) {
    slot->close_f = 1;
    return;
  }

  errno = 0;
  if (slot->bytes_to_read - slot->bytes_ready > 0) {
    bytes = (int)read(slot->fd, slot->readbuf + slot->bytes_ready,
                      slot->bytes_to_read - slot->bytes_ready);
  }

  if (bytes > 0 || slot->bytes_to_read == 0) {
    slot->bytes_ready += bytes;
    if (slot->bytes_ready == slot->bytes_to_read) {
      cur_slot = slot;
      (*slot->readfunc)();
    }
  } else if (bytes == 0 ||
             (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR)) {
    /* bytes == 0 is end of file; otherwise a non-transient error */
    slot->close_f = 1;
    slot->errio_f = 1;
    slot->errread_f = 1;
    slot->io_errno = errno;
  }
}
571
/*
 * Write as much as possible of the block at the head of a slot's output
 * queue. When a block has been sent completely, its hook (if any) is
 * called with cur_slot pointing at the slot, and the block is freed.
 * When the last block is done the descriptor stops being watched for
 * writability.
 *
 * A zero-length write result or a hard write error marks the slot for
 * closing and records the error flags and errno in the slot. EAGAIN,
 * EWOULDBLOCK and EINTR are transient: the block stays queued and the
 * write is retried on the next readiness notification.
 */
static void aio_process_output(AIO_SLOT *slot)
{
  int bytes = 0;
  AIO_BLOCK *next;

  /* FIXME: Maybe write all blocks in a loop. */

  if (slot->close_f)
    return;

  errno = 0;
  if (slot->outqueue->data_size - slot->bytes_written > 0) {
    bytes = (int)write(slot->fd, slot->outqueue->data + slot->bytes_written,
                       slot->outqueue->data_size - slot->bytes_written);
  }

  if (bytes > 0 || slot->outqueue->data_size == 0) {
    slot->bytes_written += bytes;
    if (slot->bytes_written == slot->outqueue->data_size) {
      /* Block sent, call hook function if set */
      if (slot->outqueue->func != NULL) {
        cur_slot = slot;
        (*slot->outqueue->func)();
      }
      /* Read the link only now: the hook may have queued more data */
      next = slot->outqueue->next;
      free(slot->outqueue);
      if (next != NULL) {
        /* There are other blocks to send */
        slot->outqueue = next;
        slot->bytes_written = 0;
      } else {
        /* Last block sent */
        slot->outqueue = NULL;
#ifdef USE_POLL
        s_fd_array[slot->idx].events &= (short)~POLLOUT;
#else
        FD_CLR(slot->fd, &s_fdset_write);
#endif
      }
    }
  } else if (bytes == 0 ||
             (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR)) {
    /* Nothing could be written, or a non-transient error occurred */
    slot->close_f = 1;
    slot->errio_f = 1;
    slot->errwrite_f = 1;
    slot->io_errno = errno;
  }
}
618
aio_process_func_list(void)619 static void aio_process_func_list(void)
620 {
621 int i;
622
623 s_sig_func_set = 0;
624 for (i = 0; i < 10; i++) {
625 if (s_sig_func[i] != NULL) {
626 (*s_sig_func[i])();
627 s_sig_func[i] = NULL;
628 }
629 }
630
631 aio_process_closed();
632 }
633
/*
 * Accept one pending connection on a listening slot and create a slot
 * for it, named after the peer's IP address and sized according to the
 * listening slot's bytes_to_read field. The listening slot's read
 * handler (the accept function given to aio_listen()) is then called
 * with cur_slot pointing at the new slot; cur_slot is restored
 * afterwards.
 *
 * Does nothing if the listening slot is marked for closing or accept()
 * fails. If the new slot cannot be created, the accepted connection is
 * closed and the handler is not called.
 */
static void aio_accept_connection(AIO_SLOT *slot)
{
  struct sockaddr_in client_addr;
  socklen_t len;
  int fd;
  AIO_SLOT *new_slot, *saved_slot;

  if (slot->close_f)
    return;

  len = sizeof(client_addr);
  fd = accept(slot->fd, (struct sockaddr *) &client_addr, &len);
  if (fd < 0)
    return;

  new_slot = aio_new_slot(fd, inet_ntoa(client_addr.sin_addr),
                          slot->bytes_to_read);
  if (new_slot == NULL) {
    /* No slot owns the descriptor, so drop the connection here */
    close(fd);
    return;
  }

  saved_slot = cur_slot;
  cur_slot = new_slot;
  (*slot->readfunc)();
  cur_slot = saved_slot;
}
656
aio_process_closed(void)657 static void aio_process_closed(void)
658 {
659 AIO_SLOT *slot, *next_slot;
660
661 slot = s_first_slot;
662 while (slot != NULL && !s_close_f) {
663 next_slot = slot->next;
664 if (slot->close_f)
665 aio_destroy_slot(slot, 0);
666 slot = next_slot;
667 }
668 }
669
670 /*
671 * Destroy a slot, free all its memory etc. If fatal != 0, assume all
672 * slots would be removed one after another so do not care about such
673 * things as correctness of slot list links, setfd_* masks etc.
674 */
675
676 /* FIXME: Dangerous. Changes slot list while we might iterate over it. */
677
aio_destroy_slot(AIO_SLOT * slot,int fatal)678 static void aio_destroy_slot(AIO_SLOT *slot, int fatal)
679 {
680 AIO_BLOCK *block, *next_block;
681 #ifdef USE_POLL
682 AIO_SLOT *h_slot;
683 #endif
684
685 /* Call on-close hook */
686 if (slot->closefunc != NULL) {
687 cur_slot = slot;
688 (*slot->closefunc)();
689 }
690
691 if (!fatal) {
692 /* Remove from the slot list */
693 if (slot->prev == NULL)
694 s_first_slot = slot->next;
695 else
696 slot->prev->next = slot->next;
697 if (slot->next == NULL)
698 s_last_slot = slot->prev;
699 else
700 slot->next->prev = slot->prev;
701
702 /* Remove references to descriptor */
703 #ifdef USE_POLL
704 if (s_fd_array_size - 1 > slot->idx) {
705 memmove(&s_fd_array[slot->idx],
706 &s_fd_array[slot->idx + 1],
707 (s_fd_array_size - slot->idx - 1) * sizeof(struct pollfd));
708 for (h_slot = slot->next; h_slot != NULL; h_slot = h_slot->next)
709 h_slot->idx--;
710 }
711 s_fd_array_size--;
712 #else
713 if (!slot->fd_closed_f) {
714 FD_CLR(slot->fd, &s_fdset_read);
715 FD_CLR(slot->fd, &s_fdset_write);
716 if (slot->fd == s_max_fd) {
717 /* NOTE: Better way is to find _existing_ max fd */
718 s_max_fd--;
719 }
720 }
721 #endif
722 }
723
724 /* Free all memory in slave structures */
725 block = slot->outqueue;
726 while (block != NULL) {
727 next_block = block->next;
728 free(block);
729 block = next_block;
730 }
731 free(slot->name);
732 if (slot->alloc_f)
733 free(slot->readbuf);
734
735 /* Close the file and free the slot itself */
736 if (!slot->fd_closed_f)
737 close(slot->fd);
738
739 free(slot);
740 }
741
742 /*
743 * Signal handler catching SIGTERM and SIGINT signals
744 */
745
sh_interrupt(int signo)746 static void sh_interrupt(int signo)
747 {
748 s_close_f = 1;
749 signal(signo, sh_interrupt);
750 }
751
752