1 /*  $Id: endpoint.c 9895 2015-06-14 10:08:21Z iulius $
2 **
3 **  The implementation of the innfeed EndPoint object class.
4 **
5 **  Written by James Brister <brister@vix.com>
6 **
7 **  The EndPoint class is what gives the illusion (sort of, kind of) of
8 **  threading.  Basically it controls a select loop and a set of EndPoint
9 **  objects.  Each EndPoint has a file descriptor it is interested in.  The
**  users of the EndPoint tell the EndPoints to notify them when a read or
**  write has been completed (or simply when the file descriptor is read or
**  write ready).
13 */
14 
15 #include "innfeed.h"
16 #include "config.h"
17 #include "clibrary.h"
18 #include "portable/socket.h"
19 
20 #include <assert.h>
21 #include <errno.h>
22 #include <fcntl.h>
23 #include <signal.h>
24 #include <sys/stat.h>
25 #include <sys/uio.h>
26 #include <syslog.h>
27 
28 #ifdef HAVE_LIMITS_H
29 # include <limits.h>
30 #endif
31 
32 #ifdef HAVE_SYS_SELECT_H
33 # include <sys/select.h>
34 #endif
35 
36 #ifdef HAVE_SYS_TIME_H
37 # include <sys/time.h>
38 #endif
39 #include <time.h>
40 
41 #include "inn/innconf.h"
42 #include "inn/messages.h"
43 #include "inn/libinn.h"
44 
45 #include "buffer.h"
46 #include "configfile.h"
47 #include "endpoint.h"
48 #include "host.h"
49 
/* Labels for the timers; this table is handed to TMRsummary() by Run(). */
static const char *const timer_name[] = {
    "idle",
    "blstats",
    "stsfile",
    "newart",
    "readart",
    "prepart",
    "read",
    "write",
    "cb"
};
54 
55 #if ! defined (NSIG)
56 #define NSIG 32
57 #endif
58 
59 
60   /* This is the structure that is the EndPoint */
61 struct endpoint_s
62 {
63       /* fields for managing multiple reads into the inBuffer. */
64     Buffer *inBuffer ;          /* list of buffers to read into */
65     unsigned int inBufferIdx ;         /* where is list we're at. */
66     size_t inIndex ;            /* where in current read we're at */
67     size_t inMinLen ;           /* minimum amount to read */
68     size_t inAmtRead ;          /* amount read so far */
69     EndpRWCB inCbk ;            /* callback for when read complete */
70     void *inClientData ;        /* callback data */
71 
72       /* fields for managing multiple writes from the outBuffer */
73     Buffer *outBuffer ;         /* list of buffers to write */
74     unsigned int outBufferIdx ;        /* index into buffer list to start write */
75     size_t outIndex ;           /* where in current buffer we write from */
76     size_t outSize ;            /* total of all the buffers */
77     size_t outAmtWritten ;      /* amount written so far */
78     EndpRWCB outProgressCbk ;   /* callback when done */
79     EndpRWCB outDoneCbk ;       /* callback when done */
80     void *outClientData ;       /* callback data */
81 
82     EndpWorkCbk workCbk ;       /* callback to run if no I/O to do */
83     void *workData ;            /* data for callback */
84 
85     int myFd ;                  /* the file descriptor we're handling */
86     int myErrno ;               /* the errno when I/O fails */
87 
88     double selectHits ;		/* indicates how often it's ready */
89 };
90 
91 
92 
93   /* A private structure. These hold the information on the timer callbacks. */
94 typedef struct timerqelem_s
95 {
96     TimeoutId id ;              /* the id we gave out */
97     time_t when ;               /* The time the timer should go off */
98     EndpTCB func ;              /* the function to call */
99     void *data ;                /* the client callback data */
100     struct timerqelem_s *next ; /* next in the queue */
101 } *TimerElem, TimerElemStruct ;
102 
103 
104 
105   /* set to 1 elsewhere if you want stderr to get what's also being written
106      in doWrite. */
107 int debugWrites ;
108 
109 extern const char *InputFile ;
110 
111 static EndPoint mainEndPoint ;
112 static bool mainEpIsReg = false ;
113 unsigned int stdioFdMax = MAX_STDIO_FD ;
114 
115 time_t  PrivateTime;
116 
117 
118 typedef void (*sigfn) (int) ;
119 static sigfn *sigHandlers ;
120 
121 static volatile sig_atomic_t *sigFlags ;
122 
123 
124 
125   /* private functions */
126 static IoStatus doRead (EndPoint endp) ;
127 static IoStatus doWrite (EndPoint endp) ;
128 static IoStatus doExcept (EndPoint endp) ;
129 static void pipeHandler (int s) ;
130 static void signalHandler (int s) ;
131 static int hitCompare (const void *v1, const void *v2) ;
132 static void reorderPriorityList (void) ;
133 static TimerElem newTimerElem (TimeoutId i, time_t w, EndpTCB f, void *d) ;
134 static TimeoutId timerElemAdd (time_t when, EndpTCB func, void *data) ;
135 static struct timeval *getTimeout (struct timeval *tout) ;
136 static void doTimeout (void) ;
137 static void handleSignals (void) ;
138 
139 #if 0
140 static int ff_set (fd_set *set, unsigned int start) ;
141 static int ff_free (fd_set *set, unsigned int start) ;
142 #endif
143 static void endpointCleanup (void) ;
144 
145 
146   /* Private data */
147 static size_t maxEndPoints ;
148 
149 static EndPoint *endPoints ;    /* endpoints indexed on fd */
150 static EndPoint *priorityList ; /* endpoints indexed on priority */
151 
152 static int absHighestFd = 0 ;       /* never goes down */
153 static int highestFd = -1 ;
154 static unsigned int endPointCount = 0 ;
155 static unsigned int priorityCount = 0 ;
156 
157 static fd_set rdSet ;
158 static fd_set wrSet ;
159 static fd_set exSet ;
160 
161 static int keepSelecting ;
162 
163 static TimerElem timeoutQueue ;
164 static TimerElem timeoutPool ;
165 static TimeoutId nextId ;
166 static int timeoutQueueLength ;
167 
168 
169 
170 
  /* Create a new EndPoint and hook it to the given file descriptor. All
     fields are initialized to appropriate values.  The first time this
     function is called, the global data structures that manage the lists
     of endpoints are initialized. */
175 static bool inited = false ;
176 
newEndPoint(int fd)177 EndPoint newEndPoint (int fd)
178 {
179   EndPoint ep ;
180 
181   if (!inited)
182     {
183       inited = true ;
184       atexit (endpointCleanup) ;
185     }
186 
187   if (fd < 0)
188     return NULL ;
189 
190   /* try to dup the fd to a larger number to leave lower values free for
191      broken stdio implementations. */
192   if (stdioFdMax > 0 && ((unsigned int) fd) <= stdioFdMax)
193     {
194       int newfd = fcntl(fd, F_DUPFD, stdioFdMax + 1);
195       if (newfd >= 0)
196         {
197           d_printf (1,"Dupped fd %d to %d\n",fd,newfd) ;
198           if (close (fd) != 0)
199             syswarn ("ME oserr close (%d)", fd) ;
200         }
201       else
202         {
203           d_printf (1,"Couldn't dup fd %d to above %d\n",fd,stdioFdMax) ;
204           newfd = fd ;
205         }
206 
207       fd = newfd ;
208     }
209 
210   if ((unsigned int) fd >= maxEndPoints)
211     {
212       unsigned int i = maxEndPoints ;
213 
214       maxEndPoints = (((fd + 256) / 256) * 256); /* round up to nearest 256 */
215       if (endPoints == NULL)
216         {
217           endPoints = xmalloc (sizeof(EndPoint) * maxEndPoints) ;
218           priorityList = xmalloc (sizeof(EndPoint) * maxEndPoints) ;
219         }
220       else
221         {
222           endPoints = xrealloc (endPoints,sizeof(EndPoint) * maxEndPoints) ;
223           priorityList = xrealloc (priorityList,
224                                    sizeof(EndPoint) * maxEndPoints) ;
225         }
226 
227       for ( ; i < maxEndPoints ; i++)
228         endPoints [i] = priorityList [i] = NULL ;
229     }
230 
231   ASSERT (endPoints [fd] == NULL) ;
232 
233   if (fd > absHighestFd)
234     {
235 
236 #if defined (FD_SETSIZE)
237       if ((unsigned int) fd >= FD_SETSIZE)
238         {
239           warn ("ME fd (%d) looks too big (%d -- FD_SETSIZE)", fd,
240                 FD_SETSIZE) ;
241           return NULL ;
242         }
243 #else
244       if (fd > (sizeof (fd_set) * CHAR_BIT))
245         {
246           warn ("ME fd (%d) looks too big (%d -- sizeof (fd_set) * CHAR_BIT)",
247                 fd, (sizeof (fd_set) * CHAR_BIT)) ;
248           return NULL ;
249         }
250 #endif
251 
252       absHighestFd = fd ;
253     }
254 
255   ep = xcalloc (1, sizeof(struct endpoint_s)) ;
256 
257   ep->inBuffer = NULL ;
258   ep->inBufferIdx = 0 ;
259   ep->inIndex = 0 ;
260   ep->inMinLen = 0 ;
261   ep->inAmtRead = 0 ;
262   ep->inCbk = NULL ;
263   ep->inClientData = NULL ;
264 
265   ep->outBuffer = 0 ;
266   ep->outBufferIdx = 0 ;
267   ep->outIndex = 0 ;
268   ep->outSize = 0 ;
269   ep->outProgressCbk = NULL ;
270   ep->outDoneCbk = NULL ;
271   ep->outClientData = NULL ;
272   ep->outAmtWritten = 0 ;
273 
274   ep->workCbk = NULL ;
275   ep->workData = NULL ;
276 
277   ep->myFd = fd ;
278   ep->myErrno = 0 ;
279 
280   ep->selectHits = 0.0 ;
281 
282   endPoints [fd] = ep ;
283   priorityList [priorityCount++] = ep ;
284   endPointCount++ ;
285 
286   highestFd = (fd > highestFd ? fd : highestFd) ;
287 
288   return ep ;
289 }
290 
291 
292 
293 /* Delete the given endpoint. The files descriptor is closed and the two
294    Buffer arrays are released. */
295 
delEndPoint(EndPoint ep)296 void delEndPoint (EndPoint ep)
297 {
298   unsigned int idx ;
299 
300   if (ep == NULL)
301     return ;
302 
303   ASSERT (endPoints [ep->myFd] == ep) ;
304 
305   if (mainEndPoint == ep)
306     mainEndPoint = NULL ;
307 
308   if (ep->inBuffer != NULL)
309     freeBufferArray (ep->inBuffer) ;
310 
311   if (ep->outBuffer != NULL)
312     freeBufferArray (ep->outBuffer) ;
313 
314   close (ep->myFd) ;
315 
316   /* remove from selectable bits */
317   FD_CLR (ep->myFd,&rdSet) ;
318   FD_CLR (ep->myFd,&wrSet) ;
319   FD_CLR (ep->myFd,&exSet) ;
320 
321   /* Adjust the global arrays to account for deleted endpoint. */
322   endPoints [ep->myFd] = NULL ;
323   if (ep->myFd == highestFd)
324     while (highestFd >= 0 && endPoints [highestFd] == NULL)
325       highestFd-- ;
326 
327   for (idx = 0 ; idx < priorityCount ; idx++)
328     if (priorityList [idx] == ep)
329       break ;
330 
331   ASSERT (idx < priorityCount) ; /* i.e. was found */
332   ASSERT (priorityList [idx] == ep) ; /* redundant */
333 
334   /* this hole will removed in the reorder routine */
335   priorityList [idx] = NULL ;
336 
337   endPointCount-- ;
338 
339   free (ep) ;
340 }
341 
/* Return the file descriptor ENDP is managing.  ENDP must not be NULL. */
int endPointFd(EndPoint endp)
{
    int fd;

    ASSERT(endp != NULL);

    fd = endp->myFd;
    return fd;
}
348 
349 
350 
351 
352 /* Request a read to be done next time there's data. The endpoint ENDP
353  * is what will do the read. BUFFERS is the array of Buffers the data
354  * should go into. FUNC is the callback function to call when the read
355  * is complete. CLIENTDATA is the client data to pass back into the
356  * callback function. MINLEN is the minimum amount of data to
357  * read. If MINLEN is 0 then then BUFFERS must be filled, otherwise at
358  * least MINLEN bytes must be read.
359  *
360  * BUFFERS can be NULL, in which case no read is actually done, but the
361  * callback function will be called still. This is useful for
362  * listening sockets.
363  *
364  * Returns 0 if the read couldn't be prepared (for example if a read
365  * is already outstanding).
366  */
367 
prepareRead(EndPoint endp,Buffer * buffers,EndpRWCB func,void * clientData,int minlen)368 int prepareRead (EndPoint endp,
369                  Buffer *buffers,
370                  EndpRWCB func,
371                  void *clientData,
372                  int minlen)
373 {
374   int bufferSizeTotal = 0 ;
375   int idx ;
376 
377   ASSERT (endp != NULL) ;
378 
379   if (endp->inBuffer != NULL || FD_ISSET (endp->myFd,&rdSet))
380     return 0 ;                  /* something already there */
381 
382   for (idx = 0 ; buffers != NULL && buffers [idx] != NULL ; idx++)
383     {
384       size_t bs = bufferSize (buffers [idx]) ;
385       size_t bds = bufferDataSize (buffers [idx]) ;
386 
387       bufferSizeTotal += (bs - bds) ;
388     }
389 
390   endp->inBuffer = buffers ;
391   endp->inBufferIdx = 0 ;
392   endp->inIndex = 0 ;
393   endp->inMinLen = (minlen > 0 ? minlen : bufferSizeTotal) ;
394   endp->inCbk = func ;
395   endp->inAmtRead = 0 ;
396   endp->inClientData = clientData ;
397 
398   FD_SET (endp->myFd, &rdSet) ;
399   if ( InputFile == NULL )
400     FD_SET (endp->myFd, &exSet) ;
401 
402   return 1 ;
403 }
404 
405 
406 
407 /* Request a write to be done at a later point. ENDP is the EndPoint to
408  * do the write. BUFFERS is the array of Buffers to write from. FUNC is
409  * the function to call when the write is complete. CLIENTDATA is some
410  * data to hand back to the callback function.
411  *
412  * If BUFFERS is NULL, then no write will actually by done, but the
413  * callback function will still be called. This is useful for
414  * connecting sockets.
415  *
416  * Returns 0 if the write couldn't be prepared (like if a write is
417  * still in process.
418  */
prepareWrite(EndPoint endp,Buffer * buffers,EndpRWCB progress,EndpRWCB done,void * clientData)419 int prepareWrite (EndPoint endp,
420                   Buffer *buffers,
421                   EndpRWCB progress,
422                   EndpRWCB done,
423                   void *clientData)
424 {
425   int bufferSizeTotal = 0 ;
426   int idx ;
427 
428   ASSERT (endp != NULL) ;
429 
430   if (endp->outBuffer != NULL || FD_ISSET (endp->myFd,&wrSet))
431     return 0 ;                  /* something already there */
432 
433   for (idx = 0 ; buffers != NULL && buffers [idx] != NULL ; idx++)
434     bufferSizeTotal += bufferDataSize (buffers [idx]) ;
435 
436   endp->outBuffer = buffers ;
437   endp->outBufferIdx = 0 ;
438   endp->outIndex = 0 ;
439   endp->outProgressCbk = progress ;
440   endp->outDoneCbk = done ;
441   endp->outClientData = clientData ;
442   endp->outSize = bufferSizeTotal ;
443   endp->outAmtWritten = 0 ;
444 
445   FD_SET (endp->myFd, &wrSet) ;
446   FD_SET (endp->myFd, &exSet) ;
447 
448   return 1 ;
449 }
450 
451 
452 /* Cancel the pending read. */
cancelRead(EndPoint endp)453 void cancelRead (EndPoint endp)
454 {
455   FD_CLR (endp->myFd,&rdSet) ;
456   if (!FD_ISSET (endp->myFd, &wrSet))
457     FD_CLR (endp->myFd,&exSet) ;
458 
459   freeBufferArray (endp->inBuffer) ;
460 
461   endp->inBuffer = NULL ;
462   endp->inBufferIdx = 0 ;
463   endp->inIndex = 0 ;
464   endp->inMinLen = 0 ;
465   endp->inAmtRead = 0 ;
466   endp->inCbk = NULL ;
467   endp->inClientData = NULL ;
468 }
469 
470 
471 /* cancel all pending writes. The first len bytes of the queued write are
472   copied to buffer. The number of bytes copied (if it is less than *len) is
473   copied to len. If no write was outstanding then len will have 0 stored in
474   it. */
cancelWrite(EndPoint endp,char * buffer UNUSED,size_t * len UNUSED)475 void cancelWrite (EndPoint endp, char *buffer UNUSED, size_t *len UNUSED)
476 {
477   FD_CLR (endp->myFd, &wrSet) ;
478   if (!FD_ISSET (endp->myFd, &rdSet))
479     FD_CLR (endp->myFd, &exSet) ;
480 
481 #if 0
482 #error XXX need to copy data to buffer and *len
483 #endif
484 
485   freeBufferArray (endp->outBuffer) ;
486 
487   endp->outBuffer = NULL ;
488   endp->outBufferIdx = 0 ;
489   endp->outIndex = 0 ;
490   endp->outProgressCbk = NULL ;
491   endp->outDoneCbk = NULL ;
492   endp->outClientData = NULL ;
493   endp->outSize = 0 ;
494   endp->outAmtWritten = 0 ;
495 }
496 
497 /* queue up a new timeout request. to go off at a specific time. */
prepareWake(EndpTCB func,time_t timeToWake,void * clientData)498 TimeoutId prepareWake (EndpTCB func, time_t timeToWake, void *clientData)
499 {
500   TimeoutId id ;
501   time_t now = theTime() ;
502 
503   if (now > timeToWake)
504     return 0 ;
505 
506   id = timerElemAdd (timeToWake,func,clientData) ;
507 
508 #if 0
509   d_printf (1, "Preparing wake %d at date %ld for %ld seconds\n",
510            (int) id, (long) now, (long) (timeToWake - now)) ;
511 #endif
512 
513   return id ;
514 }
515 
516 
517 /* queue up a new timeout request to off TIMETOSLEEP seconds from now */
prepareSleep(EndpTCB func,int timeToSleep,void * clientData)518 TimeoutId prepareSleep (EndpTCB func, int timeToSleep, void *clientData)
519 {
520   time_t now = theTime() ;
521   TimeoutId id ;
522 
523   id = timerElemAdd (now + timeToSleep,func,clientData) ;
524 
525 #if 0
526   d_printf (1, "Preparing sleep %d at date %ld for %ld seconds\n",
527            (int) id, (long) now, (long) timeToSleep) ;
528 #endif
529 
530   return id ;
531 }
532 
533 
534 /* Updates an existing timeout request to go off TIMETOSLEEP seconds from
535    now, or queues a new request.  Always returns a new ID. */
updateSleep(TimeoutId tid,EndpTCB func,int timeToSleep,void * clientData)536 TimeoutId updateSleep (TimeoutId tid, EndpTCB func, int timeToSleep,
537                        void *clientData)
538 {
539   if (tid == 0)
540     return prepareSleep (func, timeToSleep, clientData) ;
541   else
542     {
543       /* XXX - quick and dirty but CPU wasteful implementation */
544       removeTimeout (tid) ;
545       return prepareSleep (func, timeToSleep, clientData) ;
546     }
547 }
548 
549 
550 /* Remove a timeout that was previously prepared. 0 is a legal value that
551    is just ignored. */
removeTimeout(TimeoutId tid)552 bool removeTimeout (TimeoutId tid)
553 {
554   TimerElem n = timeoutQueue ;
555   TimerElem p = NULL ;
556 
557   if (tid == 0)
558     return true ;
559 
560   while (n != NULL && n->id != tid)
561     {
562       p = n ;
563       n = n->next ;
564     }
565 
566   if (n == NULL)
567     return false ;
568 
569   if (p == NULL)                /* at the head. */
570     timeoutQueue = n->next ;
571   else
572     p->next = n->next ;
573 
574   n->next = timeoutPool ;
575   timeoutPool = n ;
576 
577   timeoutQueueLength-- ;
578 
579   return true ;
580 }
581 
582 
583 /* The main routine. This is a near-infinite loop that drives the whole
584    program. */
Run(void)585 void Run (void)
586 {
587   fd_set rSet ;
588   fd_set wSet ;
589   fd_set eSet ;
590 
591   keepSelecting = 1 ;
592   xsignal (SIGPIPE, pipeHandler) ;
593 
594   while (keepSelecting)
595     {
596       struct timeval timeout ;
597       struct timeval *twait ;
598       int sval ;
599       unsigned int idx ;
600       bool modifiedTime = false ;
601 
602       twait = getTimeout (&timeout) ;
603 
604       memcpy (&rSet,&rdSet,sizeof (rdSet)) ;
605       memcpy (&wSet,&wrSet,sizeof (wrSet)) ;
606       memcpy (&eSet,&exSet,sizeof (exSet)) ;
607 
608       if (highestFd < 0 && twait == NULL) /* no fds and no timeout */
609         break ;
610       else if (twait != NULL && (twait->tv_sec != 0 || twait->tv_usec != 0))
611         {
612             /* if we have any workprocs registered we poll rather than
613                block on the fds */
614           for (idx = 0 ; idx < priorityCount ; idx++)
615             if (priorityList [idx] != NULL &&
616                 priorityList [idx]->workCbk != NULL)
617               {
618                 modifiedTime = true ;
619                 twait->tv_sec = 0 ;
620                 twait->tv_usec = 0 ;
621 
622                 break ;
623               }
624         }
625 
626       /* calculate host backlog statistics */
627       TMRstart(TMR_BACKLOGSTATS);
628       gCalcHostBlStat ();
629       TMRstop(TMR_BACKLOGSTATS);
630 
631       TMRstart(TMR_IDLE);
632       sval = select (highestFd + 1, &rSet, &wSet, &eSet, twait) ;
633       TMRstop(TMR_IDLE);
634 
635       timePasses () ;
636       if (innconf->timer != 0 && TMRnow() > innconf->timer * 1000) {
637           TMRsummary ("ME", timer_name);
638       }
639 
640       if (sval == 0 && twait == NULL)
641         die ("No fd's ready and no timeouts") ;
642       else if (sval < 0 && errno == EINTR)
643         {
644 	  handleSignals () ;
645         }
646       else if (sval < 0)
647         {
648           syswarn ("ME exception: select failed: %d", sval) ;
649           stopRun () ;
650         }
651       else if (sval > 0)
652         {
653           IoStatus rval ;
654           int readyCount = sval ;
655           int endpointsServiced = 1 ;
656 
657           handleSignals() ;
658 
659           for (idx = 0 ; idx < priorityCount ; idx++)
660             {
661               EndPoint ep = priorityList [idx] ;
662               bool specialCheck = false ;
663 
664               if (readyCount > 0 && ep != NULL)
665                 {
666                   int fd = ep->myFd ;
667                   int selectHit = 0, readMiss = 0, writeMiss = 0 ;
668 
669                   /* Every SELECT_RATIO times we service an endpoint in this
670                      loop we check to see if the mainEndPoint fd is ready to
671                      read or write. If so we process it and do the current
672                      endpoint next time around. */
673                   if (((endpointsServiced % (SELECT_RATIO + 1)) == 0) &&
674                       ep != mainEndPoint && mainEndPoint != NULL &&
675                       !mainEpIsReg)
676                     {
677                       fd_set trSet, twSet ;
678                       struct timeval tw ;
679                       int checkRead = FD_ISSET (mainEndPoint->myFd,&rdSet) ;
680                       int checkWrite = FD_ISSET (mainEndPoint->myFd,&wrSet) ;
681 
682                       endpointsServiced++;
683 
684                       if (checkRead || checkWrite)
685                         {
686                           fd = mainEndPoint->myFd ;
687 
688                           tw.tv_sec = tw.tv_usec = 0 ;
689                           memset (&trSet,0,sizeof (trSet)) ;
690                           memset (&twSet,0,sizeof (twSet)) ;
691 
692                           if (checkRead)
693                             FD_SET (fd,&trSet) ;
694                           if (checkWrite)
695                             FD_SET (fd,&twSet) ;
696 
697                           sval = select (fd + 1,&trSet,&twSet,0,&tw) ;
698 
699                           if (sval > 0)
700                             {
701                               idx-- ;
702                               ep = mainEndPoint ;
703                               specialCheck = true ;
704                               if (checkRead && FD_ISSET (fd,&trSet))
705                                 {
706                                   FD_SET (fd,&rSet) ;
707                                   readyCount++ ;
708                                 }
709                               if (checkWrite && FD_ISSET (fd,&twSet))
710                                 {
711                                   FD_SET (fd,&wSet) ;
712                                   readyCount++ ;
713                                 }
714                             }
715                           else if (sval < 0)
716                             {
717                               syswarn ("ME exception: select failed: %d",
718                                        sval) ;
719                               stopRun () ;
720                               return ;
721                             }
722                           else
723                             fd = ep->myFd ; /* back to original fd. */
724                         }
725                     }
726 
727                   FD_CLR (fd, &exSet) ;
728 
729                   if (FD_ISSET (fd,&rSet))
730                     {
731                       readyCount-- ;
732                       endpointsServiced++ ;
733                       selectHit = 1 ;
734 
735                       if ((rval = doRead (ep)) != IoIncomplete)
736                         {
737                           Buffer *buff = ep->inBuffer ;
738 
739                           FD_CLR (fd, &rdSet) ;
740 
741                           /* incase callback wants to issue read */
742                           ep->inBuffer = NULL ;
743 
744                           if (ep->inCbk != NULL)
745                             (*ep->inCbk) (ep,rval,buff,ep->inClientData) ;
746                           else
747                             freeBufferArray (buff) ;
748                         }
749                       else
750                         {
751                           if ( InputFile == NULL )
752                             FD_SET (ep->myFd, &exSet) ;
753                         }
754                     }
755                   else if (FD_ISSET(fd,&rdSet))
756                     readMiss = 1;
757 
758                   /* get it again as the read callback may have deleted the */
759                   /* endpoint */
760                   if (specialCheck)
761                     ep = mainEndPoint ;
762                   else
763                     ep = priorityList [idx] ;
764 
765                   if (readyCount > 0 && ep != NULL && FD_ISSET (fd,&wSet))
766                     {
767                       readyCount-- ;
768                       endpointsServiced++ ;
769                       selectHit = 1 ;
770 
771                       if ((rval = doWrite (ep)) != IoIncomplete &&
772 			  rval != IoProgress)
773                         {
774                           Buffer *buff = ep->outBuffer ;
775 
776                           FD_CLR (fd, &wrSet) ;
777 
778                           /* incase callback wants to issue a write */
779                           ep->outBuffer = NULL ;
780 
781                           if (ep->outDoneCbk != NULL)
782                             (*ep->outDoneCbk) (ep,rval,buff,ep->outClientData) ;
783                           else
784                             freeBufferArray (buff) ;
785                         }
786                       else if (rval == IoProgress)
787                         {
788                           Buffer *buff = ep->outBuffer ;
789 
790                           if (ep->outProgressCbk != NULL)
791                             (*ep->outProgressCbk) (ep,rval,buff,ep->outClientData) ;
792                         }
793                       else
794                         {
795                           FD_SET (ep->myFd, &exSet) ;
796                         }
797                     }
798                   else if (FD_ISSET(fd,&wrSet))
799                     writeMiss = 1;
800 
801                   /* get it again as the write callback may have deleted the */
802                   /* endpoint */
803                   if (specialCheck)
804                     ep = mainEndPoint ;
805                   else
806                     ep = priorityList [idx] ;
807 
808                   if (ep != NULL)
809                     {
810                       ep->selectHits *= 0.9 ;
811                       if (selectHit)
812                         ep->selectHits += 1.0 ;
813                       else if (readMiss && writeMiss)
814                         ep->selectHits -= 1.0 ;
815                     }
816 
817                   if (readyCount > 0 && ep != NULL && FD_ISSET (fd,&eSet))
818                     doExcept (ep) ;
819                 }
820             }
821 
822           reorderPriorityList () ;
823         }
824       else if (sval == 0 && !modifiedTime)
825         doTimeout () ;
826 
827         /* now we're done processing all read fds and/or the
828            timeout(s). Next we do the work callbacks for all the endpoints
829            whose fds weren't ready. */
830       for (idx = 0 ; idx < priorityCount ; idx++)
831         {
832           EndPoint ep = priorityList [idx] ;
833 
834           if (ep != NULL)
835             {
836               int fd = ep->myFd ;
837 
838               if ( !FD_ISSET (fd,&rSet) && !FD_ISSET (fd,&wSet) )
839                 if (ep->workCbk != NULL)
840                   {
841                     EndpWorkCbk func = ep->workCbk ;
842                     void *data = ep->workData ;
843 
844                     ep->workCbk = NULL ;
845                     ep->workData = NULL ;
846                     TMRstart(TMR_CALLBACK);
847                     func (ep,data) ;
848                     TMRstop(TMR_CALLBACK);
849                   }
850 
851             }
852         }
853     }
854 }
855 
/* Register CBK, with client data DATA, as the work callback of ENDP,
   replacing any previous registration.  Returns the client data that was
   registered before the call. */
void *addWorkCallback(EndPoint endp, EndpWorkCbk cbk, void *data)
{
    void *previousData;

    previousData = endp->workData;

    endp->workData = data;
    endp->workCbk = cbk;

    return previousData;
}
865 
866 /* Tell the Run routine to stop next time around. */
stopRun(void)867 void stopRun (void)
868 {
869   keepSelecting = 0 ;
870 }
871 
872 
/* Return the errno value stored in ENDP. */
int endPointErrno(EndPoint endp)
{
    int savedErrno = endp->myErrno;

    return savedErrno;
}
877 
/* Return true if ENDP currently has a read buffer array attached. */
bool readIsPending(EndPoint endp)
{
    if (endp->inBuffer != NULL)
        return true;

    return false;
}
882 
/* Return true if ENDP currently has a write buffer array attached. */
bool writeIsPending(EndPoint endp)
{
    if (endp->outBuffer != NULL)
        return true;

    return false;
}
887 
/* Make ENDP the main endpoint and record whether its descriptor refers to
   a regular file.  If the descriptor is negative or cannot be fstat'ed, an
   error is logged and the regular-file flag is left as it was. */
void setMainEndPoint(EndPoint endp)
{
    struct stat sb;
    int fd;

    mainEndPoint = endp;
    fd = endp->myFd;

    if (fd < 0) {
        syslog(LOG_ERR, "Negative fd for mainEndPoint???");
        return;
    }

    if (fstat(fd, &sb) < 0) {
        syslog(LOG_ERR, "Can't fstat mainEndPoint fd (%d): %m", fd);
        return;
    }

    mainEpIsReg = (S_ISREG(sb.st_mode) ? true : false);
}
900 
getMainEndPointFd(void)901 int getMainEndPointFd (void)
902 {
903   return(mainEndPoint->myFd) ;
904 }
905 
freeTimeoutQueue(void)906 void freeTimeoutQueue (void)
907 {
908   TimerElem p, n ;
909 
910   p = timeoutQueue ;
911   while (p)
912     {
913       n = p->next ;
914       p->next = timeoutPool ;
915       timeoutPool = p;
916       p = n ;
917       timeoutQueueLength-- ;
918     }
919 }
920 
921 
922 /***********************************************************************/
923 /*                      STATIC FUNCTIONS BELOW HERE                    */
924 /***********************************************************************/
925 
926 
927 /*
928  * called when the file descriptor on this endpoint is read ready.
929  */
doRead(EndPoint endp)930 static IoStatus doRead (EndPoint endp)
931 {
932   int i = 0 ;
933   unsigned int idx ;
934   unsigned int bCount = 0 ;
935   struct iovec *vp = NULL ;
936   Buffer *buffers = endp->inBuffer ;
937   unsigned int currIdx = endp->inBufferIdx ;
938   size_t amt = 0 ;
939   IoStatus rval = IoIncomplete ;
940 
941   TMRstart(TMR_READ);
942   for (i = currIdx ; buffers && buffers [i] != NULL ; i++)
943     bCount++ ;
944 
945   bCount = (bCount > IOV_MAX ? IOV_MAX : bCount) ;
946 
947   /* now set up the iovecs for the readv */
948   if (bCount > 0)
949     {
950       char *bbase ;
951       size_t bds, bs ;
952 
953       vp = xcalloc (bCount, sizeof(struct iovec)) ;
954 
955       bbase = bufferBase (buffers[currIdx]) ;
956       bds = bufferDataSize (buffers[currIdx]) ;
957       bs = bufferSize (buffers [currIdx]) ;
958 
959       /* inIndex is an index in the virtual array of the read, not directly
960          into the buffer. */
961       vp[0].iov_base = bbase + bds + endp->inIndex ;
962       vp[0].iov_len = bs - bds - endp->inIndex ;
963 
964       amt = vp[0].iov_len ;
965 
966       for (idx = currIdx + 1 ; idx < bCount ; idx++)
967         {
968           bbase = bufferBase (buffers[idx]) ;
969           bds = bufferDataSize (buffers[idx]) ;
970           bs = bufferSize (buffers [idx]) ;
971 
972           vp [idx].iov_base = bbase ;
973           vp [idx].iov_len = bs - bds ;
974           amt += (bs - bds) ;
975         }
976 
977       i = readv (endp->myFd,vp,(int) bCount) ;
978 
979       if (i > 0)
980         {
981           size_t readAmt = (size_t) i ;
982 
983           endp->inAmtRead += readAmt ;
984 
985           /* check if we filled the first buffer */
986           if (readAmt >= (size_t) vp[0].iov_len)
987             {                   /* we did */
988               bufferIncrDataSize (buffers[currIdx], vp[0].iov_len) ;
989               readAmt -= vp [0].iov_len ;
990               endp->inBufferIdx++ ;
991             }
992           else
993             {
994               endp->inIndex += readAmt ;
995               bufferIncrDataSize (buffers[currIdx], readAmt) ;
996               readAmt = 0 ;
997             }
998 
999           /* now check the rest of the buffers */
1000           for (idx = 1 ; readAmt > 0 ; idx++)
1001             {
1002               ASSERT (idx < bCount) ;
1003 
1004               bs = bufferSize (buffers [currIdx + idx]) ;
1005               bbase = bufferBase (buffers [currIdx + idx]) ;
1006               bds = bufferDataSize (buffers [currIdx + idx]) ;
1007 
1008               if (readAmt >= (bs - bds))
1009                 {
1010                   bufferSetDataSize (buffers [currIdx + idx],bs) ;
1011                   readAmt -= bs ;
1012                   endp->inBufferIdx++ ;
1013                 }
1014               else
1015                 {
1016                   endp->inIndex = readAmt ;
1017                   bufferIncrDataSize (buffers [currIdx + idx], readAmt) ;
1018                   memset (bbase + bds + readAmt, 0, bs - bds - readAmt) ;
1019                   readAmt = 0 ;
1020                 }
1021             }
1022 
1023           if (endp->inAmtRead >= endp->inMinLen)
1024             {
1025               endp->inIndex = 0 ;
1026               rval = IoDone ;
1027             }
1028         }
1029       else if (i < 0 && errno != EINTR && errno != EAGAIN)
1030         {
1031           endp->myErrno = errno ;
1032           rval = IoFailed ;
1033         }
1034       else if (i < 0 && errno == EINTR)
1035         {
1036 	  handleSignals () ;
1037         }
1038       else if (i == 0)
1039         rval = IoEOF ;
1040       else                   /* i < 0 && errno == EAGAIN */
1041         rval = IoIncomplete ;
1042 
1043       free (vp) ;
1044     }
1045   else
1046     rval = IoDone ;
1047   TMRstop(TMR_READ);
1048   return rval ;
1049 }
1050 
1051 /* called when the file descriptor on the endpoint is write ready. */
doWrite(EndPoint endp)1052 static IoStatus doWrite (EndPoint endp)
1053 {
1054   unsigned int idx ;
1055   int i = 0 ;
1056   size_t amt = 0 ;
1057   unsigned int bCount = 0 ;
1058   struct iovec *vp = NULL ;
1059   Buffer *buffers = endp->outBuffer ;
1060   unsigned int currIdx = endp->outBufferIdx ;
1061   IoStatus rval = IoIncomplete ;
1062 
1063   TMRstart(TMR_WRITE);
1064   for (i = currIdx ; buffers && buffers [i] != NULL ; i++)
1065     bCount++ ;
1066 
1067   bCount = (bCount > IOV_MAX ? IOV_MAX : bCount) ;
1068 
1069   if (bCount > 0)
1070     {
1071       vp = xcalloc (bCount, sizeof(struct iovec)) ;
1072 
1073       vp[0].iov_base = bufferBase (buffers [currIdx]) ;
1074       vp[0].iov_base = (char *) vp[0].iov_base + endp->outIndex ;
1075       vp[0].iov_len = bufferDataSize (buffers [currIdx]) - endp->outIndex ;
1076 
1077       amt = vp[0].iov_len ;
1078 
1079       for (idx = 1 ; idx < bCount ; idx++)
1080         {
1081           vp [idx].iov_base = bufferBase (buffers [idx + currIdx]) ;
1082           vp [idx].iov_len = bufferDataSize (buffers [idx + currIdx]) ;
1083           amt += vp[idx].iov_len ;
1084         }
1085 
1086 #if 1
1087       if (debugWrites)
1088         {
1089           /* nasty mixing, but stderr is unbuffered usually. It's debugging only */
1090           d_printf (5,"About to write this:================================\n") ;
1091           writev (2,vp,bCount) ;
1092           d_printf (5,"end=================================================\n") ;
1093         }
1094 
1095 #endif
1096 
1097       ASSERT (endp->myFd >= 0) ;
1098       ASSERT (vp != 0) ;
1099       ASSERT (bCount > 0) ;
1100 
1101       i = writev (endp->myFd,vp,(int) bCount) ;
1102 
1103       if (i > 0)
1104         {
1105           size_t writeAmt = (size_t) i ;
1106 
1107           endp->outAmtWritten += writeAmt ;
1108 
1109           /* now figure out which buffers got completely written */
1110           for (idx = 0 ; writeAmt > 0 ; idx++)
1111             {
1112               if (writeAmt >= (size_t) vp[idx].iov_len)
1113                 {
1114                   endp->outBufferIdx++ ;
1115                   endp->outIndex = 0 ;
1116                   writeAmt -= vp [idx].iov_len ;
1117                 }
1118               else
1119                 {
1120                   /* this buffer was not completly written */
1121                   endp->outIndex += writeAmt ;
1122                   writeAmt = 0 ;
1123                 }
1124             }
1125 
1126           if (endp->outAmtWritten == endp->outSize)
1127             rval = IoDone ;
1128 	  else
1129             rval = IoProgress ;
1130         }
1131       else if (i < 0 && errno == EINTR)
1132         {
1133 	  handleSignals () ;
1134         }
1135       else if (i < 0 && errno == EAGAIN)
1136         {
1137           rval = IoIncomplete ;
1138         }
1139       else if (i < 0)
1140         {
1141           endp->myErrno = errno ;
1142           rval = IoFailed ;
1143         }
1144       else
1145         d_printf (1,"Wrote 0 bytes in doWrite()?\n") ;
1146 
1147       free (vp) ;
1148     }
1149   else
1150     rval = IoDone ;
1151 
1152   TMRstop(TMR_WRITE);
1153   return rval ;
1154 }
1155 
1156 
doExcept(EndPoint endp)1157 static IoStatus doExcept (EndPoint endp)
1158 {
1159   int optval;
1160   socklen_t size ;
1161   int fd = endPointFd (endp) ;
1162 
1163   if (getsockopt (fd, SOL_SOCKET, SO_ERROR,
1164                   (char *) &optval, &size) != 0)
1165     syswarn ("ME exception: getsockopt (%d)", fd) ;
1166   else if (optval != 0)
1167     {
1168       errno = optval ;
1169       syswarn ("ME exception: fd %d", fd) ;
1170     }
1171   else
1172     syswarn ("ME exception: fd %d: Unknown error", fd) ;
1173 
1174 #if 0
1175   sleep (5) ;
1176   abort () ;
1177 #endif
1178 
1179   /* Not reached */
1180   return IoFailed ;
1181 }
1182 
#if 0
/* Debugging helper, currently compiled out: prints the address of EP and
   the file descriptor it holds to FP. */
static void endPointPrint (EndPoint ep, FILE *fp)
{
  fprintf (fp,"EndPoint [%p]: fd [%d]\n",(void *) ep, ep->myFd) ;
}
#endif
1189 
signalHandler(int s)1190 static void signalHandler (int s)
1191 {
1192   sigFlags[s] = 1 ;
1193 #ifndef HAVE_SIGACTION
1194   xsignal (s, signalHandler) ;
1195 #endif
1196 }
1197 
1198 
/* Signal handler that does nothing except install itself again for
   signal S. */
static void pipeHandler (int s)
{
  xsignal (s, pipeHandler) ;
}
1203 
1204 
1205 /* compare the hit ratio of two endpoint for qsort. We're sorting the
1206    endpoints on their relative activity */
hitCompare(const void * v1,const void * v2)1207 static int hitCompare (const void *v1, const void *v2)
1208 {
1209   const struct endpoint_s *e1 = *((const struct endpoint_s * const *) v1) ;
1210   const struct endpoint_s *e2 = *((const struct endpoint_s * const *) v2) ;
1211   double e1Hit = e1->selectHits ;
1212   double e2Hit = e2->selectHits ;
1213 
1214   if (e1 == mainEndPoint)
1215     return -1 ;
1216   else if (e2 == mainEndPoint)
1217     return 1 ;
1218   else if (e1Hit < e2Hit)
1219     return 1 ;
1220   else if (e1Hit > e2Hit)
1221     return -1 ;
1222 
1223   return 0 ;
1224 }
1225 
1226 
1227 
1228 /* We maintain the endpoints in order of the percent times they're ready
1229    for read/write when they've been selected. This helps us favour the more
1230    active endpoints. */
reorderPriorityList(void)1231 static void reorderPriorityList (void)
1232 {
1233   unsigned int i, j ;
1234   static int thisTime = 4;
1235 
1236   /* only sort every 4th time since it's so expensive */
1237   if (--thisTime > 0)
1238     return ;
1239 
1240   thisTime = 4;
1241 
1242   for (i = j = 0; i < priorityCount; i++)
1243     if (priorityList [i] != NULL)
1244       {
1245         if (i != j)
1246           priorityList [j] = priorityList [i] ;
1247         j++ ;
1248       }
1249 
1250   for (i = j; i < priorityCount; i++)
1251     priorityList [ i ] = NULL;
1252 
1253   priorityCount = j;
1254 
1255   qsort (priorityList, (size_t)priorityCount, sizeof (EndPoint), &hitCompare);
1256 }
1257 
1258 
1259 #define TIMEOUT_POOL_SIZE ((4096 - 2 * (sizeof (void *))) / (sizeof (TimerElemStruct)))
1260 
1261 /* create a new timeout data structure properly initialized. */
newTimerElem(TimeoutId i,time_t w,EndpTCB f,void * d)1262 static TimerElem newTimerElem (TimeoutId i, time_t w, EndpTCB f, void *d)
1263 {
1264   TimerElem p ;
1265 
1266   if (timeoutPool == NULL)
1267     {
1268       unsigned int j ;
1269 
1270       timeoutPool = xmalloc (sizeof(TimerElemStruct) * TIMEOUT_POOL_SIZE) ;
1271 
1272       for (j = 0; j < TIMEOUT_POOL_SIZE - 1; j++)
1273         timeoutPool[j] . next = &(timeoutPool [j + 1]) ;
1274       timeoutPool [TIMEOUT_POOL_SIZE-1] . next = NULL ;
1275     }
1276 
1277   p = timeoutPool ;
1278   timeoutPool = timeoutPool->next ;
1279 
1280   ASSERT (p != NULL) ;
1281 
1282   p->id = i ;
1283   p->when = w ;
1284   p->func = f ;
1285   p->data = d ;
1286   p->next = NULL ;
1287 
1288   return p ;
1289 }
1290 
1291 
1292 
1293 /* add a new timeout structure to the global list. */
timerElemAdd(time_t when,EndpTCB func,void * data)1294 static TimeoutId timerElemAdd (time_t when, EndpTCB func, void *data)
1295 {
1296   TimerElem p = newTimerElem (++nextId ? nextId : ++nextId,when,func,data) ;
1297   TimerElem n = timeoutQueue ;
1298   TimerElem q = NULL ;
1299 
1300   while (n != NULL && n->when <= when)
1301     {
1302       q = n ;
1303       n = n->next ;
1304     }
1305 
1306   if (n == NULL && q == NULL)   /* empty list so put at head */
1307     timeoutQueue = p ;
1308   else if (q == NULL)           /* put at head of list */
1309     {
1310       p->next = timeoutQueue ;
1311       timeoutQueue = p ;
1312     }
1313   else if (n == NULL)           /* put at end of list */
1314     q->next = p ;
1315   else                          /* in middle of list */
1316     {
1317       p->next = q->next ;
1318       q->next = p ;
1319     }
1320 
1321   timeoutQueueLength++ ;
1322 
1323   return p->id ;
1324 }
1325 
1326 
1327 /* Fills in TOUT with the timeout to use on the next call to
1328  * select. Returns TOUT. If there is no timeout, then returns NULL.  If the
1329  * timeout has already passed, then it calls the timeout handling routine
1330  * first.
1331  */
getTimeout(struct timeval * tout)1332 static struct timeval *getTimeout (struct timeval *tout)
1333 {
1334   struct timeval *rval = NULL ;
1335 
1336   if (timeoutQueue != NULL)
1337     {
1338       time_t now = theTime() ;
1339 
1340       while (timeoutQueue && now > timeoutQueue->when)
1341         doTimeout () ;
1342 
1343       if (timeoutQueue != NULL && now == timeoutQueue->when)
1344         {
1345           tout->tv_sec = 0 ;
1346           tout->tv_usec = 0 ;
1347           rval = tout ;
1348         }
1349       else if (timeoutQueue != NULL)
1350         {
1351           tout->tv_sec = timeoutQueue->when - now ;
1352           tout->tv_usec = 0 ;
1353           rval = tout ;
1354         }
1355     }
1356 
1357   return rval ;
1358 }
1359 
1360 
1361 
1362 
1363 
1364 
doTimeout(void)1365 static void doTimeout (void)
1366 {
1367   EndpTCB cbk = timeoutQueue->func ;
1368   void *data = timeoutQueue->data ;
1369   TimerElem p = timeoutQueue ;
1370   TimeoutId tid = timeoutQueue->id ;
1371 
1372   timeoutQueue = timeoutQueue->next ;
1373 
1374   p->next = timeoutPool ;
1375   timeoutPool = p ;
1376 
1377   timeoutQueueLength-- ;
1378 
1379   if (cbk)
1380     (*cbk) (tid, data) ;        /* call the callback function */
1381 }
1382 
1383 
1384 
1385 
1386 #if defined (WANT_MAIN)
1387 
1388 #define BUFF_SIZE 100
1389 
/* Write-completion callback of the WANT_MAIN test program.  Reports
   whether the write succeeded and then deletes every buffer in the
   NULL-terminated BUFFERS array. */
static void lineIsWritten (EndPoint ep, IoStatus status, Buffer *buffers, void *d UNUSED)
{
  Buffer *bp ;

  if (status != IoDone)
    {
      /* show the endpoint's error without disturbing the caller's errno */
      int savedErrno = errno ;

      errno = endPointErrno (ep) ;
      perror ("write failed") ;
      errno = savedErrno ;
    }
  else
    d_printf (1,"LINE was written\n") ;

  if (buffers != NULL)
    for (bp = buffers ; *bp ; bp++)
      delBuffer (*bp) ;
}
1409 
/* Read-completion callback of the WANT_MAIN test program.

   On IoFailed the endpoint's error is printed; on IoEOF the endpoint is
   deleted.  Otherwise the data in buffers[0] has its trailing line ending
   (at most one CR/LF pair) removed, is echoed back wrapped between two
   fixed strings, and a new read is queued on the endpoint. */
static void lineIsRead (EndPoint myEp, IoStatus status, Buffer *buffers, void *d UNUSED)
{
  Buffer *writeBuffers, *readBuffers ;
  Buffer newBuff1, newBuff2 ;
  Buffer newInputBuffer ;
  char *data, *p ;
  size_t len ;
  int stripped ;

  if (status == IoFailed)
    {
      int oldErrno = errno ;

      errno = endPointErrno (myEp) ;
      perror ("read failed") ;
      errno = oldErrno ;

      return ;
    }
  else if (status == IoEOF)
    {
      d_printf (1,"EOF on endpoint.\n") ;
      delEndPoint (myEp) ;

      return ;
    }


  data = bufferBase (buffers[0]);
  len = bufferDataSize (buffers[0]);

  /* Drop up to two trailing line-ending characters.  len follows the
     shrinking data so that each pass looks at the new last character and
     an empty buffer is never indexed. */
  for (stripped = 0 ;
       stripped < 2 && len > 0
         && (data [len - 1] == '\r' || data [len - 1] == '\n') ;
       stripped++)
    {
      bufferDecrDataSize (buffers[0], 1);
      len-- ;
    }

  /* print by length rather than writing a terminator past the data */
  d_printf (1,"Got a line: %.*s\n", (int) len, data) ;

  newBuff1 = newBuffer (len + 50) ;
  newBuff2 = newBuffer (len + 50) ;
  newInputBuffer = newBuffer (BUFF_SIZE) ;

  p = bufferBase (newBuff1) ;
  strlcpy (p, "Thanks for that \"", bufferSize (newBuff1)) ;
  bufferSetDataSize (newBuff1,strlen (p)) ;

  p = bufferBase (newBuff2) ;
  strlcpy (p,"\" very tasty\n", bufferSize (newBuff2)) ;
  bufferSetDataSize (newBuff2,strlen (p)) ;

  writeBuffers = makeBufferArray (newBuff1, buffers[0], newBuff2, NULL);
  readBuffers = makeBufferArray (newInputBuffer,NULL) ;

  prepareWrite(myEp, writeBuffers, NULL, lineIsWritten, NULL);
  prepareRead(myEp, readBuffers, lineIsRead, NULL, 1);
}
1467 
1468 
/* Timeout callback of the WANT_MAIN test program: prints the timeout id
   and the current time.  When no other timeout is queued it re-arms
   itself to fire again after a random 1 to 10 second sleep, passing DATA
   along. */
static void printDate (TimeoutId tid, void *data)
{
  const time_t now = theTime() ;
  char stamp[30];

  timeToString (now, stamp, sizeof (stamp)) ;
  d_printf (1,"Timeout (%d) time now is %ld %s\n",
            (int) tid, (long) now, stamp) ;

  if (timeoutQueue != NULL)
    return ;

  prepareSleep (printDate, (rand () % 10) + 1, data) ;
}
1485 
1486 TimeoutId rm ;
1487 
Timeout(TimeoutId tid,void * data)1488 static void Timeout (TimeoutId tid, void *data)
1489 {
1490   static int seeded ;
1491   static int howMany ;
1492   static int i ;
1493   char dateString[30];
1494   const time_t t = theTime() ;
1495 
1496   if ( !seeded )
1497     {
1498       srand (t) ;
1499       seeded = 1 ;
1500     }
1501 
1502   timeToString (t, dateString, sizeof (dateString)) ;
1503   d_printf (1,"Timeout (%d) time now is %ld %s\n",
1504             (int) tid, (long) t, dateString) ;
1505 
1506   if (timeoutQueue != NULL && timeoutQueue->next != NULL)
1507     d_printf (1,"%s timeout id %d\n",
1508              (removeTimeout (rm) ? "REMOVED" : "FAILED TO REMOVE"), rm) ;
1509   rm = 0 ;
1510 
1511   howMany = (rand() % 10) + (timeoutQueue == NULL ? 1 : 0) ;
1512 
1513   for (i = 0 ; i < howMany ; i++ )
1514     {
1515       TimeoutId id ;
1516       int count = (rand () % 30) + 1 ;
1517 
1518       id = (i % 2 == 0 ? prepareSleep (Timeout,count,data)
1519             : prepareWake (Timeout,t + count,data)) ;
1520 
1521       if (rm == 0)
1522         rm = id ;
1523     }
1524 }
1525 
1526 
/* Read callback on the listening endpoint of the WANT_MAIN test program.
   Accepts one connection, re-arms the listener, and starts a read on a
   new endpoint wrapping the accepted descriptor.  If accept() fails the
   error is printed and nothing else is done. */
static void newConn (EndPoint ep, IoStatus status UNUSED, Buffer *buffers UNUSED, void *d UNUSED)
{
  EndPoint newEp ;
  struct sockaddr_in in ;
  Buffer *readBuffers ;
  Buffer newBuff ;
  socklen_t len = sizeof (in);
  int fd ;

  memset (&in, 0, sizeof (in)) ;

  fd = accept (ep->myFd, (struct sockaddr *) &in, &len) ;

  if (fd < 0)
    {
      perror ("::accept") ;
      return ;
    }

  newEp = newEndPoint (fd) ;

  prepareRead(ep, NULL, newConn, NULL, 0);

  /* the input buffer is only created once there is a connection to use
     it, so a failed accept() does not leak it */
  newBuff = newBuffer (BUFF_SIZE) ;
  readBuffers = makeBufferArray (newBuff,NULL) ;

  prepareRead(newEp, readBuffers, lineIsRead, NULL, 1);

  d_printf (1,"Set up a new connection\n");
}
1556 
1557 
main(int argc,char ** argv)1558 int main (int argc, char **argv)
1559 {
1560   EndPoint accConn ;
1561   struct sockaddr_in accNet ;
1562   int accFd = socket (AF_INET,SOCK_STREAM,0) ;
1563   unsigned short port = atoi (argc > 1 ? argv[1] : "10000") ;
1564   time_t t = theTime() ;
1565 
1566 
1567   char * program = strrchr (argv[0],'/') ;
1568 
1569   if (!program)
1570     program = argv [0] ;
1571   else
1572     program++ ;
1573 
1574   ASSERT (accFd >= 0) ;
1575 
1576   memset (&accNet,0,sizeof (accNet)) ;
1577   accNet.sin_family = AF_INET ;
1578   accNet.sin_addr.s_addr = htonl (INADDR_ANY) ;
1579   accNet.sin_port = htons (port) ;
1580 
1581 #ifdef LOG_PERROR
1582   openlog (program, LOG_PERROR | LOG_PID, LOG_NEWS) ;
1583 #else
1584   openlog (program, LOG_PID, LOG_NEWS) ;
1585 #endif
1586 
1587   if (bind (accFd, (struct sockaddr *) &accNet, sizeof (accNet)) < 0)
1588     {
1589       perror ("bind: %m") ;
1590       exit (1) ;
1591     }
1592 
1593   listen (accFd,5) ;
1594 
1595   accConn = newEndPoint (accFd) ;
1596 
1597   prepareRead(accConn, NULL, newConn, NULL, 0);
1598 
1599   prepareSleep (Timeout,5,(void *) 0x10) ;
1600 
1601   t = theTime() ;
1602   d_printf (1,"Time now is %s",ctime(&t)) ;
1603 
1604   prepareWake (printDate,t + 16,NULL) ;
1605 
1606   Run () ;
1607 
1608   return 0;
1609 }
1610 #endif /* WANT_MAIN */
1611 
1612 /* Probably doesn't do the right thing for SIGCHLD */
setSigHandler(int signum,void (* ptr)(int))1613 void setSigHandler (int signum, void (*ptr)(int))
1614 {
1615   unsigned int i ;
1616 
1617   if (sigHandlers == NULL)
1618     {
1619       sigHandlers = xmalloc (sizeof(sigfn) * NSIG) ;
1620       sigFlags = xmalloc (sizeof(sig_atomic_t) * NSIG) ;
1621       for (i = 0 ; i < NSIG ; i++)
1622         {
1623           sigHandlers [i] = NULL ;
1624           sigFlags [i] = 0 ;
1625         }
1626     }
1627 
1628   if (signum >= NSIG)
1629     {
1630       syslog (LOG_ERR,"ME signal number bigger than NSIG: %d vs %d",
1631               signum,NSIG) ;
1632       return ;
1633     }
1634 
1635   if (xsignal (signum, signalHandler) == SIG_ERR)
1636     die ("signal failed: %s", strerror(errno)) ;
1637 
1638   sigHandlers[signum] = ptr ;
1639 }
1640 
handleSignals(void)1641 static void handleSignals (void)
1642 {
1643   int i ;
1644 
1645   for (i = 1; i < NSIG; i++)
1646     {
1647       if (sigFlags[i])
1648         {
1649 #if defined(HAVE_SIGACTION)
1650           sigset_t set, oset ;
1651 
1652           if (sigemptyset (&set) != 0 || sigaddset (&set, i) != 0)
1653             die ("sigemptyset or sigaddset failed") ;
1654           if (sigprocmask (SIG_BLOCK, &set, &oset) != 0)
1655             die ("sigprocmask failed: %s", strerror(errno)) ;
1656 #else
1657       /* hope for the best */
1658 #endif
1659 
1660           sigFlags[i] = 0;
1661 
1662           if (sigHandlers[i] != NULL &&
1663               sigHandlers[i] != SIG_IGN &&
1664               sigHandlers[i] != SIG_DFL)
1665             (sigHandlers[i])(i) ;
1666 
1667 #if defined(HAVE_SIGACTION)
1668           if (sigprocmask (SIG_SETMASK, &oset, (sigset_t *)NULL) != 0)
1669             die ("sigprocmask failed: %s", strerror(errno)) ;
1670 #else
1671           /* hope for the best */
1672 #endif
1673         }
1674     }
1675 }
1676 
1677 
endpointConfigLoadCbk(void * data)1678 int endpointConfigLoadCbk (void *data)
1679 {
1680   FILE *fp = (FILE *) data ;
1681   long ival ;
1682   int rval = 1 ;
1683 
1684   if (getInteger (topScope,"stdio-fdmax",&ival,NO_INHERIT))
1685     {
1686       stdioFdMax = ival ;
1687 
1688 #if ! defined (FD_SETSIZE)
1689 
1690       if (stdioFdMax > 0)
1691         {
1692           logOrPrint (LOG_ERR,fp,NO_STDIO_FDMAX) ;
1693           stdioFdMax = 0 ;
1694           rval = 0 ;
1695         }
1696 
1697 #else
1698 
1699       if (stdioFdMax > FD_SETSIZE)
1700         {
1701           logOrPrint (LOG_ERR,fp,
1702                       "ME config: value of %s (%ld) in %s is higher"
1703                       " than maximum of %ld. Using %ld","stdio-fdmax",
1704                       ival,"global scope",
1705                       (long) FD_SETSIZE, (long) FD_SETSIZE) ;
1706           stdioFdMax = FD_SETSIZE ;
1707           rval = 0 ;
1708         }
1709 
1710 #endif
1711 
1712     }
1713   else
1714     stdioFdMax = 0 ;
1715 
1716   return rval ;
1717 }
1718 
1719 
1720 
#if 0
/* definitely not the fastest, but the most portable way to find the first
  set bit in a mask.  Scans SET from descriptor START upwards and returns
  the first descriptor that is set, or -1 if none is.  (Compiled out.)  */
static int ff_set (fd_set *set,unsigned int start)
{
  unsigned int i ;

  for (i = start ; i < FD_SETSIZE ; i++)
    if (FD_ISSET (i,set))
      return (int) i ;

  return -1 ;
}


/* Counterpart of ff_set: returns the first descriptor at or above START
   that is NOT set in SET, or -1 if every one is set.  (Compiled out.)  */
static int ff_free (fd_set *set, unsigned int start)
{
  unsigned int i ;

  for (i = start ; i < FD_SETSIZE ; i++)
    if (!FD_ISSET (i,set))
      return i ;


  return -1 ;
}
#endif
1748 
1749 
endpointCleanup(void)1750 static void endpointCleanup (void)
1751 {
1752   free (endPoints) ;
1753   free (priorityList) ;
1754   free (sigHandlers) ;
1755   endPoints = NULL ;
1756   priorityList = NULL ;
1757   sigHandlers = NULL ;
1758 }
1759