1 /* $Id: endpoint.c 9895 2015-06-14 10:08:21Z iulius $
2 **
3 ** The implementation of the innfeed EndPoint object class.
4 **
5 ** Written by James Brister <brister@vix.com>
6 **
** The EndPoint class is what gives the illusion (sort of, kind of) of
** threading. Basically it controls a select loop and a set of EndPoint
** objects. Each EndPoint has a file descriptor it is interested in. The
** users of the EndPoint tell the EndPoints to notify them when a read or
** write has been completed (or simply when the file descriptor is read or
** write ready).
13 */
14
15 #include "innfeed.h"
16 #include "config.h"
17 #include "clibrary.h"
18 #include "portable/socket.h"
19
20 #include <assert.h>
21 #include <errno.h>
22 #include <fcntl.h>
23 #include <signal.h>
24 #include <sys/stat.h>
25 #include <sys/uio.h>
26 #include <syslog.h>
27
28 #ifdef HAVE_LIMITS_H
29 # include <limits.h>
30 #endif
31
32 #ifdef HAVE_SYS_SELECT_H
33 # include <sys/select.h>
34 #endif
35
36 #ifdef HAVE_SYS_TIME_H
37 # include <sys/time.h>
38 #endif
39 #include <time.h>
40
41 #include "inn/innconf.h"
42 #include "inn/messages.h"
43 #include "inn/libinn.h"
44
45 #include "buffer.h"
46 #include "configfile.h"
47 #include "endpoint.h"
48 #include "host.h"
49
/* Labels for the innfeed timers.  Run() hands this table to TMRsummary()
   when it reports the accumulated timer statistics. */
static const char *const timer_name[] = {
  "idle", "blstats", "stsfile", "newart", "readart", "prepart", "read",
  "write", "cb"
};
54
/* Fallback value, used only when none of the included headers defines
   NSIG. */
#if ! defined (NSIG)
#define NSIG 32
#endif
58
59
/* This is the structure that is the EndPoint.  One exists per file
   descriptor being managed; it records at most one outstanding read request
   and one outstanding write request, plus an optional work callback. */
struct endpoint_s
{
    /* fields for managing multiple reads into the inBuffer. */
    Buffer *inBuffer ;          /* list of buffers to read into */
    unsigned int inBufferIdx ;  /* where in the list we are at. */
    size_t inIndex ;            /* where in current read we're at */
    size_t inMinLen ;           /* minimum amount to read */
    size_t inAmtRead ;          /* amount read so far */
    EndpRWCB inCbk ;            /* callback for when read complete */
    void *inClientData ;        /* callback data */

    /* fields for managing multiple writes from the outBuffer */
    Buffer *outBuffer ;         /* list of buffers to write */
    unsigned int outBufferIdx ; /* index into buffer list to start write */
    size_t outIndex ;           /* where in current buffer we write from */
    size_t outSize ;            /* total of all the buffers */
    size_t outAmtWritten ;      /* amount written so far */
    EndpRWCB outProgressCbk ;   /* callback when a write made progress but
                                   is not finished yet (IoProgress) */
    EndpRWCB outDoneCbk ;       /* callback when done */
    void *outClientData ;       /* callback data */

    EndpWorkCbk workCbk ;       /* callback to run if no I/O to do */
    void *workData ;            /* data for callback */

    int myFd ;                  /* the file descriptor we're handling */
    int myErrno ;               /* the errno when I/O fails */

    double selectHits ;         /* indicates how often it's ready; Run()
                                   decays it by 0.9 on every visit and adds
                                   1.0 when the fd was serviced */
};
90
91
92
/* A private structure. These hold the information on the timer callbacks.
   Elements are chained through `next', either on the pending queue
   (timeoutQueue) or on the free list (timeoutPool). */
typedef struct timerqelem_s
{
    TimeoutId id ;              /* the id we gave out */
    time_t when ;               /* The time the timer should go off */
    EndpTCB func ;              /* the function to call */
    void *data ;                /* the client callback data */
    struct timerqelem_s *next ; /* next in the queue */
} *TimerElem, TimerElemStruct ;
102
103
104
/* set to 1 elsewhere if you want stderr to get what's also being written
   in doWrite. */
int debugWrites ;

/* Defined elsewhere.  While it is NULL, prepareRead() and Run() also watch
   reading descriptors for exceptional conditions. */
extern const char *InputFile ;

/* The endpoint registered through setMainEndPoint().  hitCompare() always
   sorts it first and Run() polls it periodically between other endpoints. */
static EndPoint mainEndPoint ;
/* True when fstat() reported the main endpoint's fd as a regular file; Run()
   skips the periodic extra poll in that case. */
static bool mainEpIsReg = false ;
/* newEndPoint() tries to dup descriptors at or below this value to a higher
   number, to leave low descriptors free for stdio. */
unsigned int stdioFdMax = MAX_STDIO_FD ;

/* NOTE(review): not referenced in the visible part of this file; presumably
   the cached "current time" behind theTime()/timePasses() -- confirm. */
time_t PrivateTime;


/* Signature of a signal handler function. */
typedef void (*sigfn) (int) ;
/* NOTE(review): presumably the per-signal user handlers dispatched by
   handleSignals() -- its use is outside the visible source. */
static sigfn *sigHandlers ;

/* Indexed by signal number; signalHandler() sets the entry to 1 when the
   signal arrives. */
static volatile sig_atomic_t *sigFlags ;
122
123
124
/* private functions */

/* I/O workers called from Run() when a descriptor is ready. */
static IoStatus doRead (EndPoint endp) ;
static IoStatus doWrite (EndPoint endp) ;
static IoStatus doExcept (EndPoint endp) ;

/* Signal plumbing. */
static void pipeHandler (int s) ;
static void signalHandler (int s) ;

/* Priority list maintenance. */
static int hitCompare (const void *v1, const void *v2) ;
static void reorderPriorityList (void) ;

/* Timer queue helpers. */
static TimerElem newTimerElem (TimeoutId i, time_t w, EndpTCB f, void *d) ;
static TimeoutId timerElemAdd (time_t when, EndpTCB func, void *data) ;
static struct timeval *getTimeout (struct timeval *tout) ;
static void doTimeout (void) ;
static void handleSignals (void) ;

/* Disabled helpers; kept out of the build. */
#if 0
static int ff_set (fd_set *set, unsigned int start) ;
static int ff_free (fd_set *set, unsigned int start) ;
#endif
/* Registered with atexit() by the first call to newEndPoint(). */
static void endpointCleanup (void) ;
144
145
/* Private data */

/* Number of slots allocated in both endPoints[] and priorityList[]. */
static size_t maxEndPoints ;

static EndPoint *endPoints ;    /* endpoints indexed on fd */
static EndPoint *priorityList ; /* endpoints indexed on priority */

static int absHighestFd = 0 ;   /* never goes down */
static int highestFd = -1 ;     /* highest fd that currently has an endpoint;
                                   select() is called with highestFd + 1 */
static unsigned int endPointCount = 0 ;  /* number of live endpoints */
static unsigned int priorityCount = 0 ;  /* slots in use in priorityList[],
                                            including NULL holes left by
                                            delEndPoint() */

/* Master descriptor sets; Run() copies them before each select(). */
static fd_set rdSet ;
static fd_set wrSet ;
static fd_set exSet ;

/* Run() loops while this is non-zero; stopRun() clears it. */
static int keepSelecting ;

static TimerElem timeoutQueue ;   /* pending timers, ordered by `when' */
static TimerElem timeoutPool ;    /* free list of unused timer elements */
static TimeoutId nextId ;         /* last timeout id handed out */
static int timeoutQueueLength ;   /* decremented whenever a timer leaves
                                     the queue */
167
168
169
170
171 /* Create a new EndPoint and hook it to the give file descriptor. All
172 fields are initialized to appropriate values. On the first time this
173 function is called the global data structs that manages lists of
174 endpoints are intialized. */
175 static bool inited = false ;
176
newEndPoint(int fd)177 EndPoint newEndPoint (int fd)
178 {
179 EndPoint ep ;
180
181 if (!inited)
182 {
183 inited = true ;
184 atexit (endpointCleanup) ;
185 }
186
187 if (fd < 0)
188 return NULL ;
189
190 /* try to dup the fd to a larger number to leave lower values free for
191 broken stdio implementations. */
192 if (stdioFdMax > 0 && ((unsigned int) fd) <= stdioFdMax)
193 {
194 int newfd = fcntl(fd, F_DUPFD, stdioFdMax + 1);
195 if (newfd >= 0)
196 {
197 d_printf (1,"Dupped fd %d to %d\n",fd,newfd) ;
198 if (close (fd) != 0)
199 syswarn ("ME oserr close (%d)", fd) ;
200 }
201 else
202 {
203 d_printf (1,"Couldn't dup fd %d to above %d\n",fd,stdioFdMax) ;
204 newfd = fd ;
205 }
206
207 fd = newfd ;
208 }
209
210 if ((unsigned int) fd >= maxEndPoints)
211 {
212 unsigned int i = maxEndPoints ;
213
214 maxEndPoints = (((fd + 256) / 256) * 256); /* round up to nearest 256 */
215 if (endPoints == NULL)
216 {
217 endPoints = xmalloc (sizeof(EndPoint) * maxEndPoints) ;
218 priorityList = xmalloc (sizeof(EndPoint) * maxEndPoints) ;
219 }
220 else
221 {
222 endPoints = xrealloc (endPoints,sizeof(EndPoint) * maxEndPoints) ;
223 priorityList = xrealloc (priorityList,
224 sizeof(EndPoint) * maxEndPoints) ;
225 }
226
227 for ( ; i < maxEndPoints ; i++)
228 endPoints [i] = priorityList [i] = NULL ;
229 }
230
231 ASSERT (endPoints [fd] == NULL) ;
232
233 if (fd > absHighestFd)
234 {
235
236 #if defined (FD_SETSIZE)
237 if ((unsigned int) fd >= FD_SETSIZE)
238 {
239 warn ("ME fd (%d) looks too big (%d -- FD_SETSIZE)", fd,
240 FD_SETSIZE) ;
241 return NULL ;
242 }
243 #else
244 if (fd > (sizeof (fd_set) * CHAR_BIT))
245 {
246 warn ("ME fd (%d) looks too big (%d -- sizeof (fd_set) * CHAR_BIT)",
247 fd, (sizeof (fd_set) * CHAR_BIT)) ;
248 return NULL ;
249 }
250 #endif
251
252 absHighestFd = fd ;
253 }
254
255 ep = xcalloc (1, sizeof(struct endpoint_s)) ;
256
257 ep->inBuffer = NULL ;
258 ep->inBufferIdx = 0 ;
259 ep->inIndex = 0 ;
260 ep->inMinLen = 0 ;
261 ep->inAmtRead = 0 ;
262 ep->inCbk = NULL ;
263 ep->inClientData = NULL ;
264
265 ep->outBuffer = 0 ;
266 ep->outBufferIdx = 0 ;
267 ep->outIndex = 0 ;
268 ep->outSize = 0 ;
269 ep->outProgressCbk = NULL ;
270 ep->outDoneCbk = NULL ;
271 ep->outClientData = NULL ;
272 ep->outAmtWritten = 0 ;
273
274 ep->workCbk = NULL ;
275 ep->workData = NULL ;
276
277 ep->myFd = fd ;
278 ep->myErrno = 0 ;
279
280 ep->selectHits = 0.0 ;
281
282 endPoints [fd] = ep ;
283 priorityList [priorityCount++] = ep ;
284 endPointCount++ ;
285
286 highestFd = (fd > highestFd ? fd : highestFd) ;
287
288 return ep ;
289 }
290
291
292
293 /* Delete the given endpoint. The files descriptor is closed and the two
294 Buffer arrays are released. */
295
delEndPoint(EndPoint ep)296 void delEndPoint (EndPoint ep)
297 {
298 unsigned int idx ;
299
300 if (ep == NULL)
301 return ;
302
303 ASSERT (endPoints [ep->myFd] == ep) ;
304
305 if (mainEndPoint == ep)
306 mainEndPoint = NULL ;
307
308 if (ep->inBuffer != NULL)
309 freeBufferArray (ep->inBuffer) ;
310
311 if (ep->outBuffer != NULL)
312 freeBufferArray (ep->outBuffer) ;
313
314 close (ep->myFd) ;
315
316 /* remove from selectable bits */
317 FD_CLR (ep->myFd,&rdSet) ;
318 FD_CLR (ep->myFd,&wrSet) ;
319 FD_CLR (ep->myFd,&exSet) ;
320
321 /* Adjust the global arrays to account for deleted endpoint. */
322 endPoints [ep->myFd] = NULL ;
323 if (ep->myFd == highestFd)
324 while (highestFd >= 0 && endPoints [highestFd] == NULL)
325 highestFd-- ;
326
327 for (idx = 0 ; idx < priorityCount ; idx++)
328 if (priorityList [idx] == ep)
329 break ;
330
331 ASSERT (idx < priorityCount) ; /* i.e. was found */
332 ASSERT (priorityList [idx] == ep) ; /* redundant */
333
334 /* this hole will removed in the reorder routine */
335 priorityList [idx] = NULL ;
336
337 endPointCount-- ;
338
339 free (ep) ;
340 }
341
/* Return the file descriptor managed by ENDP, which must not be NULL. */
int endPointFd (EndPoint endp)
{
  int fd ;

  ASSERT (endp != NULL) ;

  fd = endp->myFd ;
  return fd ;
}
348
349
350
351
352 /* Request a read to be done next time there's data. The endpoint ENDP
353 * is what will do the read. BUFFERS is the array of Buffers the data
354 * should go into. FUNC is the callback function to call when the read
355 * is complete. CLIENTDATA is the client data to pass back into the
356 * callback function. MINLEN is the minimum amount of data to
357 * read. If MINLEN is 0 then then BUFFERS must be filled, otherwise at
358 * least MINLEN bytes must be read.
359 *
360 * BUFFERS can be NULL, in which case no read is actually done, but the
361 * callback function will be called still. This is useful for
362 * listening sockets.
363 *
364 * Returns 0 if the read couldn't be prepared (for example if a read
365 * is already outstanding).
366 */
367
prepareRead(EndPoint endp,Buffer * buffers,EndpRWCB func,void * clientData,int minlen)368 int prepareRead (EndPoint endp,
369 Buffer *buffers,
370 EndpRWCB func,
371 void *clientData,
372 int minlen)
373 {
374 int bufferSizeTotal = 0 ;
375 int idx ;
376
377 ASSERT (endp != NULL) ;
378
379 if (endp->inBuffer != NULL || FD_ISSET (endp->myFd,&rdSet))
380 return 0 ; /* something already there */
381
382 for (idx = 0 ; buffers != NULL && buffers [idx] != NULL ; idx++)
383 {
384 size_t bs = bufferSize (buffers [idx]) ;
385 size_t bds = bufferDataSize (buffers [idx]) ;
386
387 bufferSizeTotal += (bs - bds) ;
388 }
389
390 endp->inBuffer = buffers ;
391 endp->inBufferIdx = 0 ;
392 endp->inIndex = 0 ;
393 endp->inMinLen = (minlen > 0 ? minlen : bufferSizeTotal) ;
394 endp->inCbk = func ;
395 endp->inAmtRead = 0 ;
396 endp->inClientData = clientData ;
397
398 FD_SET (endp->myFd, &rdSet) ;
399 if ( InputFile == NULL )
400 FD_SET (endp->myFd, &exSet) ;
401
402 return 1 ;
403 }
404
405
406
407 /* Request a write to be done at a later point. ENDP is the EndPoint to
408 * do the write. BUFFERS is the array of Buffers to write from. FUNC is
409 * the function to call when the write is complete. CLIENTDATA is some
410 * data to hand back to the callback function.
411 *
412 * If BUFFERS is NULL, then no write will actually by done, but the
413 * callback function will still be called. This is useful for
414 * connecting sockets.
415 *
416 * Returns 0 if the write couldn't be prepared (like if a write is
417 * still in process.
418 */
prepareWrite(EndPoint endp,Buffer * buffers,EndpRWCB progress,EndpRWCB done,void * clientData)419 int prepareWrite (EndPoint endp,
420 Buffer *buffers,
421 EndpRWCB progress,
422 EndpRWCB done,
423 void *clientData)
424 {
425 int bufferSizeTotal = 0 ;
426 int idx ;
427
428 ASSERT (endp != NULL) ;
429
430 if (endp->outBuffer != NULL || FD_ISSET (endp->myFd,&wrSet))
431 return 0 ; /* something already there */
432
433 for (idx = 0 ; buffers != NULL && buffers [idx] != NULL ; idx++)
434 bufferSizeTotal += bufferDataSize (buffers [idx]) ;
435
436 endp->outBuffer = buffers ;
437 endp->outBufferIdx = 0 ;
438 endp->outIndex = 0 ;
439 endp->outProgressCbk = progress ;
440 endp->outDoneCbk = done ;
441 endp->outClientData = clientData ;
442 endp->outSize = bufferSizeTotal ;
443 endp->outAmtWritten = 0 ;
444
445 FD_SET (endp->myFd, &wrSet) ;
446 FD_SET (endp->myFd, &exSet) ;
447
448 return 1 ;
449 }
450
451
452 /* Cancel the pending read. */
cancelRead(EndPoint endp)453 void cancelRead (EndPoint endp)
454 {
455 FD_CLR (endp->myFd,&rdSet) ;
456 if (!FD_ISSET (endp->myFd, &wrSet))
457 FD_CLR (endp->myFd,&exSet) ;
458
459 freeBufferArray (endp->inBuffer) ;
460
461 endp->inBuffer = NULL ;
462 endp->inBufferIdx = 0 ;
463 endp->inIndex = 0 ;
464 endp->inMinLen = 0 ;
465 endp->inAmtRead = 0 ;
466 endp->inCbk = NULL ;
467 endp->inClientData = NULL ;
468 }
469
470
471 /* cancel all pending writes. The first len bytes of the queued write are
472 copied to buffer. The number of bytes copied (if it is less than *len) is
473 copied to len. If no write was outstanding then len will have 0 stored in
474 it. */
cancelWrite(EndPoint endp,char * buffer UNUSED,size_t * len UNUSED)475 void cancelWrite (EndPoint endp, char *buffer UNUSED, size_t *len UNUSED)
476 {
477 FD_CLR (endp->myFd, &wrSet) ;
478 if (!FD_ISSET (endp->myFd, &rdSet))
479 FD_CLR (endp->myFd, &exSet) ;
480
481 #if 0
482 #error XXX need to copy data to buffer and *len
483 #endif
484
485 freeBufferArray (endp->outBuffer) ;
486
487 endp->outBuffer = NULL ;
488 endp->outBufferIdx = 0 ;
489 endp->outIndex = 0 ;
490 endp->outProgressCbk = NULL ;
491 endp->outDoneCbk = NULL ;
492 endp->outClientData = NULL ;
493 endp->outSize = 0 ;
494 endp->outAmtWritten = 0 ;
495 }
496
497 /* queue up a new timeout request. to go off at a specific time. */
prepareWake(EndpTCB func,time_t timeToWake,void * clientData)498 TimeoutId prepareWake (EndpTCB func, time_t timeToWake, void *clientData)
499 {
500 TimeoutId id ;
501 time_t now = theTime() ;
502
503 if (now > timeToWake)
504 return 0 ;
505
506 id = timerElemAdd (timeToWake,func,clientData) ;
507
508 #if 0
509 d_printf (1, "Preparing wake %d at date %ld for %ld seconds\n",
510 (int) id, (long) now, (long) (timeToWake - now)) ;
511 #endif
512
513 return id ;
514 }
515
516
517 /* queue up a new timeout request to off TIMETOSLEEP seconds from now */
prepareSleep(EndpTCB func,int timeToSleep,void * clientData)518 TimeoutId prepareSleep (EndpTCB func, int timeToSleep, void *clientData)
519 {
520 time_t now = theTime() ;
521 TimeoutId id ;
522
523 id = timerElemAdd (now + timeToSleep,func,clientData) ;
524
525 #if 0
526 d_printf (1, "Preparing sleep %d at date %ld for %ld seconds\n",
527 (int) id, (long) now, (long) timeToSleep) ;
528 #endif
529
530 return id ;
531 }
532
533
534 /* Updates an existing timeout request to go off TIMETOSLEEP seconds from
535 now, or queues a new request. Always returns a new ID. */
updateSleep(TimeoutId tid,EndpTCB func,int timeToSleep,void * clientData)536 TimeoutId updateSleep (TimeoutId tid, EndpTCB func, int timeToSleep,
537 void *clientData)
538 {
539 if (tid == 0)
540 return prepareSleep (func, timeToSleep, clientData) ;
541 else
542 {
543 /* XXX - quick and dirty but CPU wasteful implementation */
544 removeTimeout (tid) ;
545 return prepareSleep (func, timeToSleep, clientData) ;
546 }
547 }
548
549
550 /* Remove a timeout that was previously prepared. 0 is a legal value that
551 is just ignored. */
removeTimeout(TimeoutId tid)552 bool removeTimeout (TimeoutId tid)
553 {
554 TimerElem n = timeoutQueue ;
555 TimerElem p = NULL ;
556
557 if (tid == 0)
558 return true ;
559
560 while (n != NULL && n->id != tid)
561 {
562 p = n ;
563 n = n->next ;
564 }
565
566 if (n == NULL)
567 return false ;
568
569 if (p == NULL) /* at the head. */
570 timeoutQueue = n->next ;
571 else
572 p->next = n->next ;
573
574 n->next = timeoutPool ;
575 timeoutPool = n ;
576
577 timeoutQueueLength-- ;
578
579 return true ;
580 }
581
582
583 /* The main routine. This is a near-infinite loop that drives the whole
584 program. */
Run(void)585 void Run (void)
586 {
587 fd_set rSet ;
588 fd_set wSet ;
589 fd_set eSet ;
590
591 keepSelecting = 1 ;
592 xsignal (SIGPIPE, pipeHandler) ;
593
594 while (keepSelecting)
595 {
596 struct timeval timeout ;
597 struct timeval *twait ;
598 int sval ;
599 unsigned int idx ;
600 bool modifiedTime = false ;
601
602 twait = getTimeout (&timeout) ;
603
604 memcpy (&rSet,&rdSet,sizeof (rdSet)) ;
605 memcpy (&wSet,&wrSet,sizeof (wrSet)) ;
606 memcpy (&eSet,&exSet,sizeof (exSet)) ;
607
608 if (highestFd < 0 && twait == NULL) /* no fds and no timeout */
609 break ;
610 else if (twait != NULL && (twait->tv_sec != 0 || twait->tv_usec != 0))
611 {
612 /* if we have any workprocs registered we poll rather than
613 block on the fds */
614 for (idx = 0 ; idx < priorityCount ; idx++)
615 if (priorityList [idx] != NULL &&
616 priorityList [idx]->workCbk != NULL)
617 {
618 modifiedTime = true ;
619 twait->tv_sec = 0 ;
620 twait->tv_usec = 0 ;
621
622 break ;
623 }
624 }
625
626 /* calculate host backlog statistics */
627 TMRstart(TMR_BACKLOGSTATS);
628 gCalcHostBlStat ();
629 TMRstop(TMR_BACKLOGSTATS);
630
631 TMRstart(TMR_IDLE);
632 sval = select (highestFd + 1, &rSet, &wSet, &eSet, twait) ;
633 TMRstop(TMR_IDLE);
634
635 timePasses () ;
636 if (innconf->timer != 0 && TMRnow() > innconf->timer * 1000) {
637 TMRsummary ("ME", timer_name);
638 }
639
640 if (sval == 0 && twait == NULL)
641 die ("No fd's ready and no timeouts") ;
642 else if (sval < 0 && errno == EINTR)
643 {
644 handleSignals () ;
645 }
646 else if (sval < 0)
647 {
648 syswarn ("ME exception: select failed: %d", sval) ;
649 stopRun () ;
650 }
651 else if (sval > 0)
652 {
653 IoStatus rval ;
654 int readyCount = sval ;
655 int endpointsServiced = 1 ;
656
657 handleSignals() ;
658
659 for (idx = 0 ; idx < priorityCount ; idx++)
660 {
661 EndPoint ep = priorityList [idx] ;
662 bool specialCheck = false ;
663
664 if (readyCount > 0 && ep != NULL)
665 {
666 int fd = ep->myFd ;
667 int selectHit = 0, readMiss = 0, writeMiss = 0 ;
668
669 /* Every SELECT_RATIO times we service an endpoint in this
670 loop we check to see if the mainEndPoint fd is ready to
671 read or write. If so we process it and do the current
672 endpoint next time around. */
673 if (((endpointsServiced % (SELECT_RATIO + 1)) == 0) &&
674 ep != mainEndPoint && mainEndPoint != NULL &&
675 !mainEpIsReg)
676 {
677 fd_set trSet, twSet ;
678 struct timeval tw ;
679 int checkRead = FD_ISSET (mainEndPoint->myFd,&rdSet) ;
680 int checkWrite = FD_ISSET (mainEndPoint->myFd,&wrSet) ;
681
682 endpointsServiced++;
683
684 if (checkRead || checkWrite)
685 {
686 fd = mainEndPoint->myFd ;
687
688 tw.tv_sec = tw.tv_usec = 0 ;
689 memset (&trSet,0,sizeof (trSet)) ;
690 memset (&twSet,0,sizeof (twSet)) ;
691
692 if (checkRead)
693 FD_SET (fd,&trSet) ;
694 if (checkWrite)
695 FD_SET (fd,&twSet) ;
696
697 sval = select (fd + 1,&trSet,&twSet,0,&tw) ;
698
699 if (sval > 0)
700 {
701 idx-- ;
702 ep = mainEndPoint ;
703 specialCheck = true ;
704 if (checkRead && FD_ISSET (fd,&trSet))
705 {
706 FD_SET (fd,&rSet) ;
707 readyCount++ ;
708 }
709 if (checkWrite && FD_ISSET (fd,&twSet))
710 {
711 FD_SET (fd,&wSet) ;
712 readyCount++ ;
713 }
714 }
715 else if (sval < 0)
716 {
717 syswarn ("ME exception: select failed: %d",
718 sval) ;
719 stopRun () ;
720 return ;
721 }
722 else
723 fd = ep->myFd ; /* back to original fd. */
724 }
725 }
726
727 FD_CLR (fd, &exSet) ;
728
729 if (FD_ISSET (fd,&rSet))
730 {
731 readyCount-- ;
732 endpointsServiced++ ;
733 selectHit = 1 ;
734
735 if ((rval = doRead (ep)) != IoIncomplete)
736 {
737 Buffer *buff = ep->inBuffer ;
738
739 FD_CLR (fd, &rdSet) ;
740
741 /* incase callback wants to issue read */
742 ep->inBuffer = NULL ;
743
744 if (ep->inCbk != NULL)
745 (*ep->inCbk) (ep,rval,buff,ep->inClientData) ;
746 else
747 freeBufferArray (buff) ;
748 }
749 else
750 {
751 if ( InputFile == NULL )
752 FD_SET (ep->myFd, &exSet) ;
753 }
754 }
755 else if (FD_ISSET(fd,&rdSet))
756 readMiss = 1;
757
758 /* get it again as the read callback may have deleted the */
759 /* endpoint */
760 if (specialCheck)
761 ep = mainEndPoint ;
762 else
763 ep = priorityList [idx] ;
764
765 if (readyCount > 0 && ep != NULL && FD_ISSET (fd,&wSet))
766 {
767 readyCount-- ;
768 endpointsServiced++ ;
769 selectHit = 1 ;
770
771 if ((rval = doWrite (ep)) != IoIncomplete &&
772 rval != IoProgress)
773 {
774 Buffer *buff = ep->outBuffer ;
775
776 FD_CLR (fd, &wrSet) ;
777
778 /* incase callback wants to issue a write */
779 ep->outBuffer = NULL ;
780
781 if (ep->outDoneCbk != NULL)
782 (*ep->outDoneCbk) (ep,rval,buff,ep->outClientData) ;
783 else
784 freeBufferArray (buff) ;
785 }
786 else if (rval == IoProgress)
787 {
788 Buffer *buff = ep->outBuffer ;
789
790 if (ep->outProgressCbk != NULL)
791 (*ep->outProgressCbk) (ep,rval,buff,ep->outClientData) ;
792 }
793 else
794 {
795 FD_SET (ep->myFd, &exSet) ;
796 }
797 }
798 else if (FD_ISSET(fd,&wrSet))
799 writeMiss = 1;
800
801 /* get it again as the write callback may have deleted the */
802 /* endpoint */
803 if (specialCheck)
804 ep = mainEndPoint ;
805 else
806 ep = priorityList [idx] ;
807
808 if (ep != NULL)
809 {
810 ep->selectHits *= 0.9 ;
811 if (selectHit)
812 ep->selectHits += 1.0 ;
813 else if (readMiss && writeMiss)
814 ep->selectHits -= 1.0 ;
815 }
816
817 if (readyCount > 0 && ep != NULL && FD_ISSET (fd,&eSet))
818 doExcept (ep) ;
819 }
820 }
821
822 reorderPriorityList () ;
823 }
824 else if (sval == 0 && !modifiedTime)
825 doTimeout () ;
826
827 /* now we're done processing all read fds and/or the
828 timeout(s). Next we do the work callbacks for all the endpoints
829 whose fds weren't ready. */
830 for (idx = 0 ; idx < priorityCount ; idx++)
831 {
832 EndPoint ep = priorityList [idx] ;
833
834 if (ep != NULL)
835 {
836 int fd = ep->myFd ;
837
838 if ( !FD_ISSET (fd,&rSet) && !FD_ISSET (fd,&wSet) )
839 if (ep->workCbk != NULL)
840 {
841 EndpWorkCbk func = ep->workCbk ;
842 void *data = ep->workData ;
843
844 ep->workCbk = NULL ;
845 ep->workData = NULL ;
846 TMRstart(TMR_CALLBACK);
847 func (ep,data) ;
848 TMRstop(TMR_CALLBACK);
849 }
850
851 }
852 }
853 }
854 }
855
/* Register CBK, with DATA as its argument, as the work callback of ENDP,
   replacing any callback registered before. Returns the data pointer of the
   callback that was replaced. */
void *addWorkCallback (EndPoint endp, EndpWorkCbk cbk, void *data)
{
  void *previousData ;

  previousData = endp->workData ;

  endp->workData = data ;
  endp->workCbk = cbk ;

  return previousData ;
}
865
866 /* Tell the Run routine to stop next time around. */
stopRun(void)867 void stopRun (void)
868 {
869 keepSelecting = 0 ;
870 }
871
872
/* Return the errno value recorded by the last failed read or write on
   ENDP. */
int endPointErrno (EndPoint endp)
{
  int savedErrno = endp->myErrno ;

  return savedErrno ;
}
877
/* True if ENDP currently has a read Buffer array attached. */
bool readIsPending (EndPoint endp)
{
  if (endp->inBuffer == NULL)
    return false ;

  return true ;
}
882
/* True if ENDP currently has a write Buffer array attached. */
bool writeIsPending (EndPoint endp)
{
  if (endp->outBuffer == NULL)
    return false ;

  return true ;
}
887
/* Make ENDP the main endpoint and record whether its descriptor refers to a
   regular file. If the descriptor is negative or cannot be fstat'ed the
   problem is logged and the regular-file flag is left as it was. */
void setMainEndPoint (EndPoint endp)
{
  struct stat buf ;
  int fd ;

  mainEndPoint = endp ;
  fd = endp->myFd ;

  if (fd < 0)
    {
      syslog (LOG_ERR,"Negative fd for mainEndPoint???") ;
      return ;
    }

  if (fstat (fd,&buf) < 0)
    {
      syslog (LOG_ERR,"Can't fstat mainEndPoint fd (%d): %m", endp->myFd) ;
      return ;
    }

  if (S_ISREG(buf.st_mode))
    mainEpIsReg = true ;
  else
    mainEpIsReg = false ;
}
900
getMainEndPointFd(void)901 int getMainEndPointFd (void)
902 {
903 return(mainEndPoint->myFd) ;
904 }
905
freeTimeoutQueue(void)906 void freeTimeoutQueue (void)
907 {
908 TimerElem p, n ;
909
910 p = timeoutQueue ;
911 while (p)
912 {
913 n = p->next ;
914 p->next = timeoutPool ;
915 timeoutPool = p;
916 p = n ;
917 timeoutQueueLength-- ;
918 }
919 }
920
921
922 /***********************************************************************/
923 /* STATIC FUNCTIONS BELOW HERE */
924 /***********************************************************************/
925
926
927 /*
928 * called when the file descriptor on this endpoint is read ready.
929 */
doRead(EndPoint endp)930 static IoStatus doRead (EndPoint endp)
931 {
932 int i = 0 ;
933 unsigned int idx ;
934 unsigned int bCount = 0 ;
935 struct iovec *vp = NULL ;
936 Buffer *buffers = endp->inBuffer ;
937 unsigned int currIdx = endp->inBufferIdx ;
938 size_t amt = 0 ;
939 IoStatus rval = IoIncomplete ;
940
941 TMRstart(TMR_READ);
942 for (i = currIdx ; buffers && buffers [i] != NULL ; i++)
943 bCount++ ;
944
945 bCount = (bCount > IOV_MAX ? IOV_MAX : bCount) ;
946
947 /* now set up the iovecs for the readv */
948 if (bCount > 0)
949 {
950 char *bbase ;
951 size_t bds, bs ;
952
953 vp = xcalloc (bCount, sizeof(struct iovec)) ;
954
955 bbase = bufferBase (buffers[currIdx]) ;
956 bds = bufferDataSize (buffers[currIdx]) ;
957 bs = bufferSize (buffers [currIdx]) ;
958
959 /* inIndex is an index in the virtual array of the read, not directly
960 into the buffer. */
961 vp[0].iov_base = bbase + bds + endp->inIndex ;
962 vp[0].iov_len = bs - bds - endp->inIndex ;
963
964 amt = vp[0].iov_len ;
965
966 for (idx = currIdx + 1 ; idx < bCount ; idx++)
967 {
968 bbase = bufferBase (buffers[idx]) ;
969 bds = bufferDataSize (buffers[idx]) ;
970 bs = bufferSize (buffers [idx]) ;
971
972 vp [idx].iov_base = bbase ;
973 vp [idx].iov_len = bs - bds ;
974 amt += (bs - bds) ;
975 }
976
977 i = readv (endp->myFd,vp,(int) bCount) ;
978
979 if (i > 0)
980 {
981 size_t readAmt = (size_t) i ;
982
983 endp->inAmtRead += readAmt ;
984
985 /* check if we filled the first buffer */
986 if (readAmt >= (size_t) vp[0].iov_len)
987 { /* we did */
988 bufferIncrDataSize (buffers[currIdx], vp[0].iov_len) ;
989 readAmt -= vp [0].iov_len ;
990 endp->inBufferIdx++ ;
991 }
992 else
993 {
994 endp->inIndex += readAmt ;
995 bufferIncrDataSize (buffers[currIdx], readAmt) ;
996 readAmt = 0 ;
997 }
998
999 /* now check the rest of the buffers */
1000 for (idx = 1 ; readAmt > 0 ; idx++)
1001 {
1002 ASSERT (idx < bCount) ;
1003
1004 bs = bufferSize (buffers [currIdx + idx]) ;
1005 bbase = bufferBase (buffers [currIdx + idx]) ;
1006 bds = bufferDataSize (buffers [currIdx + idx]) ;
1007
1008 if (readAmt >= (bs - bds))
1009 {
1010 bufferSetDataSize (buffers [currIdx + idx],bs) ;
1011 readAmt -= bs ;
1012 endp->inBufferIdx++ ;
1013 }
1014 else
1015 {
1016 endp->inIndex = readAmt ;
1017 bufferIncrDataSize (buffers [currIdx + idx], readAmt) ;
1018 memset (bbase + bds + readAmt, 0, bs - bds - readAmt) ;
1019 readAmt = 0 ;
1020 }
1021 }
1022
1023 if (endp->inAmtRead >= endp->inMinLen)
1024 {
1025 endp->inIndex = 0 ;
1026 rval = IoDone ;
1027 }
1028 }
1029 else if (i < 0 && errno != EINTR && errno != EAGAIN)
1030 {
1031 endp->myErrno = errno ;
1032 rval = IoFailed ;
1033 }
1034 else if (i < 0 && errno == EINTR)
1035 {
1036 handleSignals () ;
1037 }
1038 else if (i == 0)
1039 rval = IoEOF ;
1040 else /* i < 0 && errno == EAGAIN */
1041 rval = IoIncomplete ;
1042
1043 free (vp) ;
1044 }
1045 else
1046 rval = IoDone ;
1047 TMRstop(TMR_READ);
1048 return rval ;
1049 }
1050
1051 /* called when the file descriptor on the endpoint is write ready. */
doWrite(EndPoint endp)1052 static IoStatus doWrite (EndPoint endp)
1053 {
1054 unsigned int idx ;
1055 int i = 0 ;
1056 size_t amt = 0 ;
1057 unsigned int bCount = 0 ;
1058 struct iovec *vp = NULL ;
1059 Buffer *buffers = endp->outBuffer ;
1060 unsigned int currIdx = endp->outBufferIdx ;
1061 IoStatus rval = IoIncomplete ;
1062
1063 TMRstart(TMR_WRITE);
1064 for (i = currIdx ; buffers && buffers [i] != NULL ; i++)
1065 bCount++ ;
1066
1067 bCount = (bCount > IOV_MAX ? IOV_MAX : bCount) ;
1068
1069 if (bCount > 0)
1070 {
1071 vp = xcalloc (bCount, sizeof(struct iovec)) ;
1072
1073 vp[0].iov_base = bufferBase (buffers [currIdx]) ;
1074 vp[0].iov_base = (char *) vp[0].iov_base + endp->outIndex ;
1075 vp[0].iov_len = bufferDataSize (buffers [currIdx]) - endp->outIndex ;
1076
1077 amt = vp[0].iov_len ;
1078
1079 for (idx = 1 ; idx < bCount ; idx++)
1080 {
1081 vp [idx].iov_base = bufferBase (buffers [idx + currIdx]) ;
1082 vp [idx].iov_len = bufferDataSize (buffers [idx + currIdx]) ;
1083 amt += vp[idx].iov_len ;
1084 }
1085
1086 #if 1
1087 if (debugWrites)
1088 {
1089 /* nasty mixing, but stderr is unbuffered usually. It's debugging only */
1090 d_printf (5,"About to write this:================================\n") ;
1091 writev (2,vp,bCount) ;
1092 d_printf (5,"end=================================================\n") ;
1093 }
1094
1095 #endif
1096
1097 ASSERT (endp->myFd >= 0) ;
1098 ASSERT (vp != 0) ;
1099 ASSERT (bCount > 0) ;
1100
1101 i = writev (endp->myFd,vp,(int) bCount) ;
1102
1103 if (i > 0)
1104 {
1105 size_t writeAmt = (size_t) i ;
1106
1107 endp->outAmtWritten += writeAmt ;
1108
1109 /* now figure out which buffers got completely written */
1110 for (idx = 0 ; writeAmt > 0 ; idx++)
1111 {
1112 if (writeAmt >= (size_t) vp[idx].iov_len)
1113 {
1114 endp->outBufferIdx++ ;
1115 endp->outIndex = 0 ;
1116 writeAmt -= vp [idx].iov_len ;
1117 }
1118 else
1119 {
1120 /* this buffer was not completly written */
1121 endp->outIndex += writeAmt ;
1122 writeAmt = 0 ;
1123 }
1124 }
1125
1126 if (endp->outAmtWritten == endp->outSize)
1127 rval = IoDone ;
1128 else
1129 rval = IoProgress ;
1130 }
1131 else if (i < 0 && errno == EINTR)
1132 {
1133 handleSignals () ;
1134 }
1135 else if (i < 0 && errno == EAGAIN)
1136 {
1137 rval = IoIncomplete ;
1138 }
1139 else if (i < 0)
1140 {
1141 endp->myErrno = errno ;
1142 rval = IoFailed ;
1143 }
1144 else
1145 d_printf (1,"Wrote 0 bytes in doWrite()?\n") ;
1146
1147 free (vp) ;
1148 }
1149 else
1150 rval = IoDone ;
1151
1152 TMRstop(TMR_WRITE);
1153 return rval ;
1154 }
1155
1156
doExcept(EndPoint endp)1157 static IoStatus doExcept (EndPoint endp)
1158 {
1159 int optval;
1160 socklen_t size ;
1161 int fd = endPointFd (endp) ;
1162
1163 if (getsockopt (fd, SOL_SOCKET, SO_ERROR,
1164 (char *) &optval, &size) != 0)
1165 syswarn ("ME exception: getsockopt (%d)", fd) ;
1166 else if (optval != 0)
1167 {
1168 errno = optval ;
1169 syswarn ("ME exception: fd %d", fd) ;
1170 }
1171 else
1172 syswarn ("ME exception: fd %d: Unknown error", fd) ;
1173
1174 #if 0
1175 sleep (5) ;
1176 abort () ;
1177 #endif
1178
1179 /* Not reached */
1180 return IoFailed ;
1181 }
1182
1183 #if 0
1184 static void endPointPrint (EndPoint ep, FILE *fp)
1185 {
1186 fprintf (fp,"EndPoint [%p]: fd [%d]\n",(void *) ep, ep->myFd) ;
1187 }
1188 #endif
1189
signalHandler(int s)1190 static void signalHandler (int s)
1191 {
1192 sigFlags[s] = 1 ;
1193 #ifndef HAVE_SIGACTION
1194 xsignal (s, signalHandler) ;
1195 #endif
1196 }
1197
1198
pipeHandler(int s)1199 static void pipeHandler (int s)
1200 {
1201 xsignal (s, pipeHandler) ;
1202 }
1203
1204
1205 /* compare the hit ratio of two endpoint for qsort. We're sorting the
1206 endpoints on their relative activity */
hitCompare(const void * v1,const void * v2)1207 static int hitCompare (const void *v1, const void *v2)
1208 {
1209 const struct endpoint_s *e1 = *((const struct endpoint_s * const *) v1) ;
1210 const struct endpoint_s *e2 = *((const struct endpoint_s * const *) v2) ;
1211 double e1Hit = e1->selectHits ;
1212 double e2Hit = e2->selectHits ;
1213
1214 if (e1 == mainEndPoint)
1215 return -1 ;
1216 else if (e2 == mainEndPoint)
1217 return 1 ;
1218 else if (e1Hit < e2Hit)
1219 return 1 ;
1220 else if (e1Hit > e2Hit)
1221 return -1 ;
1222
1223 return 0 ;
1224 }
1225
1226
1227
1228 /* We maintain the endpoints in order of the percent times they're ready
1229 for read/write when they've been selected. This helps us favour the more
1230 active endpoints. */
reorderPriorityList(void)1231 static void reorderPriorityList (void)
1232 {
1233 unsigned int i, j ;
1234 static int thisTime = 4;
1235
1236 /* only sort every 4th time since it's so expensive */
1237 if (--thisTime > 0)
1238 return ;
1239
1240 thisTime = 4;
1241
1242 for (i = j = 0; i < priorityCount; i++)
1243 if (priorityList [i] != NULL)
1244 {
1245 if (i != j)
1246 priorityList [j] = priorityList [i] ;
1247 j++ ;
1248 }
1249
1250 for (i = j; i < priorityCount; i++)
1251 priorityList [ i ] = NULL;
1252
1253 priorityCount = j;
1254
1255 qsort (priorityList, (size_t)priorityCount, sizeof (EndPoint), &hitCompare);
1256 }
1257
1258
1259 #define TIMEOUT_POOL_SIZE ((4096 - 2 * (sizeof (void *))) / (sizeof (TimerElemStruct)))
1260
1261 /* create a new timeout data structure properly initialized. */
newTimerElem(TimeoutId i,time_t w,EndpTCB f,void * d)1262 static TimerElem newTimerElem (TimeoutId i, time_t w, EndpTCB f, void *d)
1263 {
1264 TimerElem p ;
1265
1266 if (timeoutPool == NULL)
1267 {
1268 unsigned int j ;
1269
1270 timeoutPool = xmalloc (sizeof(TimerElemStruct) * TIMEOUT_POOL_SIZE) ;
1271
1272 for (j = 0; j < TIMEOUT_POOL_SIZE - 1; j++)
1273 timeoutPool[j] . next = &(timeoutPool [j + 1]) ;
1274 timeoutPool [TIMEOUT_POOL_SIZE-1] . next = NULL ;
1275 }
1276
1277 p = timeoutPool ;
1278 timeoutPool = timeoutPool->next ;
1279
1280 ASSERT (p != NULL) ;
1281
1282 p->id = i ;
1283 p->when = w ;
1284 p->func = f ;
1285 p->data = d ;
1286 p->next = NULL ;
1287
1288 return p ;
1289 }
1290
1291
1292
1293 /* add a new timeout structure to the global list. */
timerElemAdd(time_t when,EndpTCB func,void * data)1294 static TimeoutId timerElemAdd (time_t when, EndpTCB func, void *data)
1295 {
1296 TimerElem p = newTimerElem (++nextId ? nextId : ++nextId,when,func,data) ;
1297 TimerElem n = timeoutQueue ;
1298 TimerElem q = NULL ;
1299
1300 while (n != NULL && n->when <= when)
1301 {
1302 q = n ;
1303 n = n->next ;
1304 }
1305
1306 if (n == NULL && q == NULL) /* empty list so put at head */
1307 timeoutQueue = p ;
1308 else if (q == NULL) /* put at head of list */
1309 {
1310 p->next = timeoutQueue ;
1311 timeoutQueue = p ;
1312 }
1313 else if (n == NULL) /* put at end of list */
1314 q->next = p ;
1315 else /* in middle of list */
1316 {
1317 p->next = q->next ;
1318 q->next = p ;
1319 }
1320
1321 timeoutQueueLength++ ;
1322
1323 return p->id ;
1324 }
1325
1326
1327 /* Fills in TOUT with the timeout to use on the next call to
1328 * select. Returns TOUT. If there is no timeout, then returns NULL. If the
1329 * timeout has already passed, then it calls the timeout handling routine
1330 * first.
1331 */
getTimeout(struct timeval * tout)1332 static struct timeval *getTimeout (struct timeval *tout)
1333 {
1334 struct timeval *rval = NULL ;
1335
1336 if (timeoutQueue != NULL)
1337 {
1338 time_t now = theTime() ;
1339
1340 while (timeoutQueue && now > timeoutQueue->when)
1341 doTimeout () ;
1342
1343 if (timeoutQueue != NULL && now == timeoutQueue->when)
1344 {
1345 tout->tv_sec = 0 ;
1346 tout->tv_usec = 0 ;
1347 rval = tout ;
1348 }
1349 else if (timeoutQueue != NULL)
1350 {
1351 tout->tv_sec = timeoutQueue->when - now ;
1352 tout->tv_usec = 0 ;
1353 rval = tout ;
1354 }
1355 }
1356
1357 return rval ;
1358 }
1359
1360
1361
1362
1363
1364
doTimeout(void)1365 static void doTimeout (void)
1366 {
1367 EndpTCB cbk = timeoutQueue->func ;
1368 void *data = timeoutQueue->data ;
1369 TimerElem p = timeoutQueue ;
1370 TimeoutId tid = timeoutQueue->id ;
1371
1372 timeoutQueue = timeoutQueue->next ;
1373
1374 p->next = timeoutPool ;
1375 timeoutPool = p ;
1376
1377 timeoutQueueLength-- ;
1378
1379 if (cbk)
1380 (*cbk) (tid, data) ; /* call the callback function */
1381 }
1382
1383
1384
1385
1386 #if defined (WANT_MAIN)
1387
1388 #define BUFF_SIZE 100
1389
/* Write-completion callback of the test program.  Reports whether the
   write on EP succeeded (on failure the endpoint's saved errno is printed
   with perror(), leaving the global errno as it was) and then deletes
   every buffer in the NULL-terminated array BUFFERS. */
static void lineIsWritten (EndPoint ep, IoStatus status, Buffer *buffers, void *d UNUSED)
{
  if (status != IoDone)
    {
      int savedErrno = errno ;

      errno = endPointErrno (ep) ;
      perror ("write failed") ;
      errno = savedErrno ;
    }
  else
    d_printf (1,"LINE was written\n") ;

  if (buffers != NULL)
    {
      Buffer *cur ;

      for (cur = buffers ; *cur != NULL ; cur++)
        delBuffer (*cur) ;
    }
}
1409
/* Read-completion callback of the test program.

   On a failed read the endpoint's error is reported and nothing else is
   done; on EOF the endpoint is deleted.  Otherwise the data in
   buffers[0] is treated as one line: up to two trailing CR/LF characters
   are stripped, the line is logged, and it is echoed back to the peer
   wrapped in a short message.  A fresh read is then queued on MYEP.

   Fixes compared with the previous version:
     - the second end-of-line check used to look at the SAME byte as the
       first, so a line ending in a lone "\n" lost its last real
       character; the length is now re-evaluated after each strip;
     - the line used to be NUL-terminated at its UNTRIMMED length, which
       left the newline in the logged text and could store one byte past
       the data when the buffer was full.  The line is now logged with an
       explicit length and the buffer is not written to at all. */
static void lineIsRead (EndPoint myEp, IoStatus status, Buffer *buffers, void *d UNUSED)
{
  Buffer *writeBuffers, *readBuffers ;
  Buffer newBuff1, newBuff2 ;
  Buffer newInputBuffer ;
  char *data, *p ;
  size_t len ;
  int stripped ;

  if (status == IoFailed)
    {
      int oldErrno = errno ;

      errno = endPointErrno (myEp) ;
      perror ("read failed") ;
      errno = oldErrno ;

      return ;
    }
  else if (status == IoEOF)
    {
      d_printf (1,"EOF on endpoint.\n") ;
      delEndPoint (myEp) ;

      return ;
    }

  data = bufferBase (buffers[0]);
  len = bufferDataSize (buffers[0]);

  /* drop at most two trailing end-of-line characters ("\r\n", "\n", ...) */
  for (stripped = 0 ; stripped < 2 && len > 0 ; stripped++)
    {
      if (data [len - 1] != '\r' && data [len - 1] != '\n')
        break ;

      bufferDecrDataSize (buffers[0], 1);
      len-- ;
    }

  /* the data is not NUL-terminated, so print it with an explicit length */
  d_printf (1,"Got a line: %.*s\n", (int) len, data) ;

  newBuff1 = newBuffer (len + 50) ;
  newBuff2 = newBuffer (len + 50) ;
  newInputBuffer = newBuffer (BUFF_SIZE) ;

  p = bufferBase (newBuff1) ;
  strlcpy (p, "Thanks for that \"", bufferSize (newBuff1)) ;
  bufferSetDataSize (newBuff1,strlen (p)) ;

  p = bufferBase (newBuff2) ;
  strlcpy (p,"\" very tasty\n", bufferSize (newBuff2)) ;
  bufferSetDataSize (newBuff2,strlen (p)) ;

  /* reply is: prefix, the (trimmed) line just read, suffix */
  writeBuffers = makeBufferArray (newBuff1, buffers[0], newBuff2, NULL);
  readBuffers = makeBufferArray (newInputBuffer,NULL) ;

  prepareWrite(myEp, writeBuffers, NULL, lineIsWritten, NULL);
  prepareRead(myEp, readBuffers, lineIsRead, NULL, 1);
}
1467
1468
/* Timeout callback of the test program: logs the id TID together with
   the current time.  If no other timeout is queued at that point it
   re-arms itself to run again after a random delay of 1 to 10 seconds,
   passing DATA along unchanged. */
static void printDate (TimeoutId tid, void *data)
{
  const time_t t = theTime() ;
  char dateString[30];

  timeToString (t, dateString, sizeof (dateString)) ;
  d_printf (1,"Timeout (%d) time now is %ld %s\n",
            (int) tid, (long) t, dateString) ;

  if (timeoutQueue != NULL)
    return ;

  prepareSleep (printDate,(rand () % 10) + 1,data) ;
}
1485
1486 TimeoutId rm ;
1487
Timeout(TimeoutId tid,void * data)1488 static void Timeout (TimeoutId tid, void *data)
1489 {
1490 static int seeded ;
1491 static int howMany ;
1492 static int i ;
1493 char dateString[30];
1494 const time_t t = theTime() ;
1495
1496 if ( !seeded )
1497 {
1498 srand (t) ;
1499 seeded = 1 ;
1500 }
1501
1502 timeToString (t, dateString, sizeof (dateString)) ;
1503 d_printf (1,"Timeout (%d) time now is %ld %s\n",
1504 (int) tid, (long) t, dateString) ;
1505
1506 if (timeoutQueue != NULL && timeoutQueue->next != NULL)
1507 d_printf (1,"%s timeout id %d\n",
1508 (removeTimeout (rm) ? "REMOVED" : "FAILED TO REMOVE"), rm) ;
1509 rm = 0 ;
1510
1511 howMany = (rand() % 10) + (timeoutQueue == NULL ? 1 : 0) ;
1512
1513 for (i = 0 ; i < howMany ; i++ )
1514 {
1515 TimeoutId id ;
1516 int count = (rand () % 30) + 1 ;
1517
1518 id = (i % 2 == 0 ? prepareSleep (Timeout,count,data)
1519 : prepareWake (Timeout,t + count,data)) ;
1520
1521 if (rm == 0)
1522 rm = id ;
1523 }
1524 }
1525
1526
/* Read callback on the listening endpoint EP of the test program: accepts
   one pending connection, wraps it in a new endpoint, re-arms the
   listening endpoint and queues the first line read on the new
   connection.

   The input buffer is only allocated once accept() has succeeded;
   previously it was allocated up front and leaked whenever accept()
   failed. */
static void newConn (EndPoint ep, IoStatus status UNUSED, Buffer *buffers UNUSED, void *d UNUSED)
{
  EndPoint newEp ;
  struct sockaddr_in in ;
  Buffer *readBuffers ;
  Buffer newBuff ;
  socklen_t len = sizeof (in);
  int fd ;

  memset (&in, 0, sizeof (in)) ;

  fd = accept (ep->myFd, (struct sockaddr *) &in, &len) ;

  if (fd < 0)
    {
      perror ("::accept") ;
      return ;
    }

  newEp = newEndPoint (fd) ;

  /* keep listening for further connections */
  prepareRead(ep, NULL, newConn, NULL, 0);

  newBuff = newBuffer (BUFF_SIZE) ;
  readBuffers = makeBufferArray (newBuff,NULL) ;

  prepareRead(newEp, readBuffers, lineIsRead, NULL, 1);

  d_printf (1,"Set up a new connection\n");
}
1556
1557
main(int argc,char ** argv)1558 int main (int argc, char **argv)
1559 {
1560 EndPoint accConn ;
1561 struct sockaddr_in accNet ;
1562 int accFd = socket (AF_INET,SOCK_STREAM,0) ;
1563 unsigned short port = atoi (argc > 1 ? argv[1] : "10000") ;
1564 time_t t = theTime() ;
1565
1566
1567 char * program = strrchr (argv[0],'/') ;
1568
1569 if (!program)
1570 program = argv [0] ;
1571 else
1572 program++ ;
1573
1574 ASSERT (accFd >= 0) ;
1575
1576 memset (&accNet,0,sizeof (accNet)) ;
1577 accNet.sin_family = AF_INET ;
1578 accNet.sin_addr.s_addr = htonl (INADDR_ANY) ;
1579 accNet.sin_port = htons (port) ;
1580
1581 #ifdef LOG_PERROR
1582 openlog (program, LOG_PERROR | LOG_PID, LOG_NEWS) ;
1583 #else
1584 openlog (program, LOG_PID, LOG_NEWS) ;
1585 #endif
1586
1587 if (bind (accFd, (struct sockaddr *) &accNet, sizeof (accNet)) < 0)
1588 {
1589 perror ("bind: %m") ;
1590 exit (1) ;
1591 }
1592
1593 listen (accFd,5) ;
1594
1595 accConn = newEndPoint (accFd) ;
1596
1597 prepareRead(accConn, NULL, newConn, NULL, 0);
1598
1599 prepareSleep (Timeout,5,(void *) 0x10) ;
1600
1601 t = theTime() ;
1602 d_printf (1,"Time now is %s",ctime(&t)) ;
1603
1604 prepareWake (printDate,t + 16,NULL) ;
1605
1606 Run () ;
1607
1608 return 0;
1609 }
1610 #endif /* WANT_MAIN */
1611
1612 /* Probably doesn't do the right thing for SIGCHLD */
setSigHandler(int signum,void (* ptr)(int))1613 void setSigHandler (int signum, void (*ptr)(int))
1614 {
1615 unsigned int i ;
1616
1617 if (sigHandlers == NULL)
1618 {
1619 sigHandlers = xmalloc (sizeof(sigfn) * NSIG) ;
1620 sigFlags = xmalloc (sizeof(sig_atomic_t) * NSIG) ;
1621 for (i = 0 ; i < NSIG ; i++)
1622 {
1623 sigHandlers [i] = NULL ;
1624 sigFlags [i] = 0 ;
1625 }
1626 }
1627
1628 if (signum >= NSIG)
1629 {
1630 syslog (LOG_ERR,"ME signal number bigger than NSIG: %d vs %d",
1631 signum,NSIG) ;
1632 return ;
1633 }
1634
1635 if (xsignal (signum, signalHandler) == SIG_ERR)
1636 die ("signal failed: %s", strerror(errno)) ;
1637
1638 sigHandlers[signum] = ptr ;
1639 }
1640
handleSignals(void)1641 static void handleSignals (void)
1642 {
1643 int i ;
1644
1645 for (i = 1; i < NSIG; i++)
1646 {
1647 if (sigFlags[i])
1648 {
1649 #if defined(HAVE_SIGACTION)
1650 sigset_t set, oset ;
1651
1652 if (sigemptyset (&set) != 0 || sigaddset (&set, i) != 0)
1653 die ("sigemptyset or sigaddset failed") ;
1654 if (sigprocmask (SIG_BLOCK, &set, &oset) != 0)
1655 die ("sigprocmask failed: %s", strerror(errno)) ;
1656 #else
1657 /* hope for the best */
1658 #endif
1659
1660 sigFlags[i] = 0;
1661
1662 if (sigHandlers[i] != NULL &&
1663 sigHandlers[i] != SIG_IGN &&
1664 sigHandlers[i] != SIG_DFL)
1665 (sigHandlers[i])(i) ;
1666
1667 #if defined(HAVE_SIGACTION)
1668 if (sigprocmask (SIG_SETMASK, &oset, (sigset_t *)NULL) != 0)
1669 die ("sigprocmask failed: %s", strerror(errno)) ;
1670 #else
1671 /* hope for the best */
1672 #endif
1673 }
1674 }
1675 }
1676
1677
endpointConfigLoadCbk(void * data)1678 int endpointConfigLoadCbk (void *data)
1679 {
1680 FILE *fp = (FILE *) data ;
1681 long ival ;
1682 int rval = 1 ;
1683
1684 if (getInteger (topScope,"stdio-fdmax",&ival,NO_INHERIT))
1685 {
1686 stdioFdMax = ival ;
1687
1688 #if ! defined (FD_SETSIZE)
1689
1690 if (stdioFdMax > 0)
1691 {
1692 logOrPrint (LOG_ERR,fp,NO_STDIO_FDMAX) ;
1693 stdioFdMax = 0 ;
1694 rval = 0 ;
1695 }
1696
1697 #else
1698
1699 if (stdioFdMax > FD_SETSIZE)
1700 {
1701 logOrPrint (LOG_ERR,fp,
1702 "ME config: value of %s (%ld) in %s is higher"
1703 " than maximum of %ld. Using %ld","stdio-fdmax",
1704 ival,"global scope",
1705 (long) FD_SETSIZE, (long) FD_SETSIZE) ;
1706 stdioFdMax = FD_SETSIZE ;
1707 rval = 0 ;
1708 }
1709
1710 #endif
1711
1712 }
1713 else
1714 stdioFdMax = 0 ;
1715
1716 return rval ;
1717 }
1718
1719
1720
1721 #if 0
/* Find the lowest descriptor at or above START that is a member of SET.
   Returns that descriptor, or -1 if there is none below FD_SETSIZE.
   Definitely not the fastest way to find the first set bit in a mask,
   but the most portable one. */
static int ff_set (fd_set *set,unsigned int start)
{
  unsigned int fd = start ;

  while (fd < FD_SETSIZE)
    {
      if (FD_ISSET (fd,set))
        return (int) fd ;
      fd++ ;
    }

  return -1 ;
}
1734
1735
/* Find the lowest descriptor at or above START that is NOT a member of
   SET.  Returns that descriptor, or -1 if every descriptor from START up
   to FD_SETSIZE - 1 is in the set. */
static int ff_free (fd_set *set, unsigned int start)
{
  unsigned int fd = start ;

  while (fd < FD_SETSIZE)
    {
      if (!FD_ISSET (fd,set))
        return (int) fd ;
      fd++ ;
    }

  return -1 ;
}
1747 #endif
1748
1749
endpointCleanup(void)1750 static void endpointCleanup (void)
1751 {
1752 free (endPoints) ;
1753 free (priorityList) ;
1754 free (sigHandlers) ;
1755 endPoints = NULL ;
1756 priorityList = NULL ;
1757 sigHandlers = NULL ;
1758 }
1759