1 /*  $Id: connection.c 10283 2018-05-14 12:43:05Z iulius $
2 **
3 **  The implementation of the innfeed Connection class.
4 **
5 **  Written by James Brister <brister@vix.com>
6 **
7 **  The Connection object is what manages the NNTP protocol. If the remote
**  doesn't do streaming, then the standard IHAVE lock-step protocol is
9 **  performed. In the streaming situation we have two cases. One where we must
10 **  send CHECK commands, and the other where we can directly send TAKETHIS
11 **  commands without a prior CHECK.
12 **
13 **  The Connection object maintains four article queues. The first one is
14 **  where new articles are put if they need to have an IHAVE or CHECK command
15 **  sent for them. The second queue is where the articles move from the first
16 **  after their IHAVE/CHECK command is sent, but the reply has not yet been
17 **  seen. The third queue is where articles go after the IHAVE/CHECK reply has
18 **  been seen (and the reply says to send the article). It is articles in the
19 **  third queue that have the TAKETHIS command sent, or the body of an IHAVE.
20 **  The third queue is also where new articles go if the connection is running
21 **  in no-CHECK mode. The fourth queue is where the articles move to from the
22 **  third queue after their IHAVE-body or TAKETHIS command has been sent. When
23 **  the response to the IHAVE-body or TAKETHIS is received the articles are
24 **  removed from the fourth queue and the Host object controlling this
25 **  Connection is notified of the success or failure of the transfer.
26 **
27 **  The whole system is event-driven by the EndPoint class and the Host via
28 **  calls to prepareRead() and prepareWrite() and prepareSleep().
29 **
30 **
31 **  We should probably store the results of gethostbyname in the connection so
32 **  we can rotate through the address when one fails for connecting. Perhaps
33 **  the gethostbyname should be done in the Host and the connection should
34 **  just be given the address to use.
35 **
36 **  Should we worry about articles being stuck on a queue for ever if the
37 **  remote forgets to send a response to a CHECK?
38 **
**  Perhaps, instead of killing the connection on some of the simpler
**  errors, we should try to flush the input and keep going.
41 **
42 **  Worry about counter overflow.
43 **
44 **  Worry about stats gathering when switch to no-check mode.
45 **
46 **  XXX if issueQUIT() has a problem and the state goes to cxnDeadS this is
47 **  not handled properly everywhere yet.
48 */
49 
50 #include "innfeed.h"
51 #include "config.h"
52 #include "clibrary.h"
53 #include "portable/socket.h"
54 
55 #include <assert.h>
56 #include <errno.h>
57 #include <fcntl.h>
58 #include <netdb.h>
59 #include <signal.h>
60 #include <syslog.h>
61 
62 #ifdef HAVE_SYS_TIME_H
63 # include <sys/time.h>
64 #endif
65 #include <time.h>
66 
67 #if defined (__FreeBSD__)
68 # include <sys/ioctl.h>
69 #endif
70 
71 #include "inn/fdflag.h"
72 #include "inn/innconf.h"
73 #include "inn/messages.h"
74 #include "inn/network.h"
75 #include "inn/libinn.h"
76 
77 #include "article.h"
78 #include "buffer.h"
79 #include "configfile.h"
80 #include "connection.h"
81 #include "endpoint.h"
82 #include "host.h"
83 
/* Consistency checking of a Connection: forwards to validateConnection()
   in normal builds and compiles to nothing when NDEBUG is defined. */
#ifndef NDEBUG
# define VALIDATE_CONNECTION(x) validateConnection (x)
#else
# define VALIDATE_CONNECTION(x) ((void) 0)
#endif

/* External globals referenced by this file. */
extern char **PointersFreedOnExit ;
extern const char *pidFile ;
92 
93 /*
94  * Private types.
95  */
96 
97 /* We keep a linked list of articles the connection is trying to transmit */
98 typedef struct art_holder_s
99 {
100     Article article ;
101     struct art_holder_s *next ;
102 } *ArtHolder ;
103 
104 
/* The states a Connection moves through.  The enumerator order is kept
   unchanged because the numeric values follow from it. */
typedef enum {
  /* the connection's start state. */
  cxnStartingS,
  /* not connected. Waiting for an article. */
  cxnWaitingS,
  /* in the middle of connecting */
  cxnConnectingS,
  /* open and ready to feed, has empty queues */
  cxnIdleS,
  /* timed out in the idle state */
  cxnIdleTimeoutS,
  /* in the process of feeding articles */
  cxnFeedingS,
  /* blocked on reestablishment timer */
  cxnSleepingS,
  /* waiting for queues to drain so the connection can be bounced */
  cxnFlushingS,
  /* told to close down permanently once the queues have drained */
  cxnClosingS,
  /* connection is dead. */
  cxnDeadS
} CxnState ;
117 
118 /* The Connection class */
119 struct connection_s
120 {
121     Host myHost ;               /* the host who owns the connection */
122     EndPoint myEp ;             /* the endpoint the connection talks through */
123     unsigned int ident ;               /* an identifier for syslogging. */
124     CxnState state ;            /* the state the connection is in */
125 
126 
127     /*
128      * The Connection maintains 4 queue of articles.
129      */
130     ArtHolder checkHead ;       /* head of article list to do CHECK/IHAVE */
131     ArtHolder checkRespHead ;   /* head of list waiting on CHECK/IHAVE
132                                    response */
133     ArtHolder takeHead ;        /* head of list of articles to send
134                                    TAKETHIS/IHAVE-body */
135     ArtHolder takeRespHead ;    /* list of articles waiting on
136                                    TAKETHIS/IHAVE-body response */
137     unsigned int articleQTotal ;       /* number of articles in all four queues */
138     ArtHolder missing ;         /* head of missing list */
139 
140 
141     Buffer respBuffer ;         /* buffer all responses are read into */
142 
143     char *ipName ;              /* the ip name (possibly quad) of the remote */
144 
145     unsigned int maxCheck ;            /* the max number of CHECKs to send */
146     unsigned short port ;              /* the port number to use */
147 
148     /*
149      * Timeout values and their callback IDs
150      */
151 
152     /* Timer for max amount of time between receiving articles from the
153        Host */
154     unsigned int articleReceiptTimeout ;
155     TimeoutId artReceiptTimerId ;
156 
157     /* Timer for the max amount of time to wait for a response from the
158        remote */
159     unsigned int readTimeout ;
160     TimeoutId readBlockedTimerId ;
161 
162     /* Timer for the max amount of time to wait for a any amount of data
163        to be written to the remote */
164     unsigned int writeTimeout ;
165     TimeoutId writeBlockedTimerId ;
166 
167     /* Timer for the max number of seconds to keep the network connection
168        up (long lasting connections give older nntp servers problems). */
169     unsigned int flushTimeout ;
170     TimeoutId flushTimerId ;
171 
172     /* Timer for the number of seconds to sleep before attempting a
173        reconnect. */
174     unsigned int sleepTimeout ;
175     TimeoutId sleepTimerId ;
176 
177 
178     bool loggedNoCr ;           /* true if we logged the NOCR_MSG */
179     bool immedRecon ;           /* true if we recon immediately after flushing. */
180     bool doesStreaming ;        /* true if remote will handle streaming */
181     bool authenticated ;        /* true if remote authenticated */
182     bool quitWasIssued ;          /* true if QUIT command was sent. */
183     bool needsChecks ;          /* true if we issue CHECK commands in
184                                    streaming mode (rather than just sending
185                                    TAKETHIS commands) */
186 
187     time_t timeCon;             /* the time the connect happened (including
188                                    the MODE STREAM command) */
189     time_t timeCon_checkpoint;
190 
191     /*
192      * STATISTICS
193      */
194     unsigned int artsTaken;           /* the number of articles INN gave this cxn */
195 
196     unsigned int checksIssued;        /* the number of CHECKs/IHAVEs we
197                                           sent.  Note that if we're running in
198                                           no-CHECK mode, then we add in the
199                                           TAKETHIS commands too */
200     unsigned int checksIssued_checkpoint;
201 
202     unsigned int checksRefused;       /* the number of response 435/438 */
203     unsigned int checksRefused_checkpoint;
204     unsigned int takesRejected;       /* the number of response 437/439 received */
205     unsigned int takesRejected_checkpoint;
206     unsigned int takesOkayed;         /* the number of response 235/239 received */
207     unsigned int takesOkayed_checkpoint;
208 
209     double takesSizeRejected;
210     double takesSizeRejected_checkpoint;
211     double takesSizeOkayed;
212     double takesSizeOkayed_checkpoint;
213 
214     double onThreshold ;        /* for no-CHECK mode */
215     double offThreshold ;       /* for no-CHECK mode */
216     double filterValue ;        /* current value of IIR filter */
217     double lowPassFilter ;      /* time constant for IIR filter */
218 
219     Connection next ;           /* for global list */
220 };
221 
222 static Connection gCxnList = NULL ;
223 static unsigned int gCxnCount = 0 ;
224 unsigned int max_reconnect_period = MAX_RECON_PER ;
225 unsigned int init_reconnect_period = INIT_RECON_PER ;
226 #if 0
227 static bool inited = false ;
228 #endif
229 static Buffer dotFirstBuffer ;
230 static Buffer dotBuffer ;
231 static Buffer crlfBuffer ;
232 
233 
234 /***************************************************
235  *
236  * Private function declarations.
237  *
238  ***************************************************/
239 
240 
241 /* I/O Callbacks */
242 static void connectionDone (EndPoint e, IoStatus i, Buffer *b, void *d) ;
243 static void getBanner (EndPoint e, IoStatus i, Buffer *b, void *d) ;
244 static void getAuthUserResponse (EndPoint e, IoStatus i, Buffer *b, void *d) ;
245 static void getAuthPassResponse (EndPoint e, IoStatus i, Buffer *b, void *d) ;
246 static void getModeResponse (EndPoint e, IoStatus i, Buffer *b, void *d) ;
247 static void responseIsRead (EndPoint e, IoStatus i, Buffer *b, void *d) ;
248 static void quitWritten (EndPoint e, IoStatus i, Buffer *b, void *d) ;
249 static void ihaveBodyDone (EndPoint e, IoStatus i, Buffer *b, void *d) ;
250 static void commandWriteDone (EndPoint e, IoStatus i, Buffer *b, void *d) ;
251 static void modeCmdIssued (EndPoint e, IoStatus i, Buffer *b, void *d) ;
252 static void authUserIssued (EndPoint e, IoStatus i, Buffer *b, void *d) ;
253 static void authPassIssued (EndPoint e, IoStatus i, Buffer *b, void *d) ;
254 static void writeProgress (EndPoint e, IoStatus i, Buffer *b, void *d) ;
255 
256 
257 /* Timer callbacks */
258 static void responseTimeoutCbk (TimeoutId id, void *data) ;
259 static void writeTimeoutCbk (TimeoutId id, void *data) ;
260 static void reopenTimeoutCbk (TimeoutId id, void *data) ;
261 static void flushCxnCbk (TimeoutId, void *data) ;
262 static void articleTimeoutCbk (TimeoutId id, void *data) ;
263 
264 /* Work callbacks */
265 static void cxnWorkProc (EndPoint ep, void *data) ;
266 
267 
268 static void cxnSleepOrDie (Connection cxn) ;
269 
270 /* Response processing. */
271 static void processResponse205 (Connection cxn, char *response) ;
272 static void processResponse238 (Connection cxn, char *response) ;
273 static void processResponse431 (Connection cxn, char *response) ;
274 static void processResponse438 (Connection cxn, char *response) ;
275 static void processResponse239 (Connection cxn, char *response) ;
276 static void processResponse439 (Connection cxn, char *response) ;
277 static void processResponse235 (Connection cxn, char *response) ;
278 static void processResponse335 (Connection cxn, char *response) ;
279 static void processResponse400 (Connection cxn, char *response) ;
280 static void processResponse435 (Connection cxn, char *response) ;
281 static void processResponse436 (Connection cxn, char *response) ;
282 static void processResponse437 (Connection cxn, char *response) ;
283 static void processResponse480 (Connection cxn, char *response) ;
284 static void processResponse503 (Connection cxn, char *response) ;
285 
286 
287 /* Misc functions */
288 static void cxnSleep (Connection cxn) ;
289 static void cxnDead (Connection cxn) ;
290 static void cxnIdle (Connection cxn) ;
291 static void noSuchMessageId (Connection cxn, unsigned int responseCode,
292                            const char *msgid, const char *response) ;
293 static void abortConnection (Connection cxn) ;
294 static void resetConnection (Connection cxn) ;
295 static void deferAllArticles (Connection cxn) ;
296 static void deferQueuedArticles (Connection cxn) ;
297 static void doSomeWrites (Connection cxn) ;
298 static bool issueIHAVE (Connection cxn) ;
299 static void issueIHAVEBody (Connection cxn) ;
300 static bool issueStreamingCommands (Connection cxn) ;
301 static Buffer buildCheckBuffer (Connection cxn) ;
302 static Buffer *buildTakethisBuffers (Connection cxn, Buffer checkBuffer) ;
303 static void issueQUIT (Connection cxn) ;
304 static void initReadBlockedTimeout (Connection cxn) ;
305 static int prepareWriteWithTimeout (EndPoint endp, Buffer *buffers,
306                                     EndpRWCB done, Connection cxn) ;
307 static void delConnection (Connection cxn) ;
308 static void incrFilter (Connection cxn) ;
309 static void decrFilter (Connection cxn) ;
310 static bool writesNeeded (Connection cxn) ;
311 static void validateConnection (Connection cxn) ;
312 static const char *stateToString (CxnState state) ;
313 
314 static void issueModeStream (EndPoint e, Connection cxn) ;
315 static void issueAuthUser (EndPoint e, Connection cxn) ;
316 static void issueAuthPass (EndPoint e, Connection cxn) ;
317 
318 static void prepareReopenCbk (Connection cxn) ;
319 
320 
321 /* Article queue management routines. */
322 static ArtHolder newArtHolder (Article art) ;
323 static void delArtHolder (ArtHolder artH) ;
324 static bool remArtHolder (ArtHolder art, ArtHolder *head, unsigned int *count) ;
325 static void appendArtHolder (ArtHolder artH, ArtHolder *head, unsigned int *count) ;
326 static ArtHolder artHolderByMsgId (const char *msgid, ArtHolder head) ;
327 
328 static int fudgeFactor (int initVal) ;
329 
330 
331 
332 
333 /***************************************************
334  *
335  * Public functions implementation.
336  *
337  ***************************************************/
338 
339 
cxnConfigLoadCbk(void * data UNUSED)340 int cxnConfigLoadCbk (void *data UNUSED)
341 {
342   long iv ;
343   int rval = 1 ;
344   FILE *fp = (FILE *) data ;
345 
346   if (getInteger (topScope,"max-reconnect-time",&iv,NO_INHERIT))
347     {
348       if (iv < 1)
349         {
350           rval = 0 ;
351           logOrPrint (LOG_ERR,fp,
352                       "ME config: value of %s (%ld) in %s cannot be less"
353                       " than 1. Using %ld", "max-reconnect-time",
354                       iv,"global scope",(long) MAX_RECON_PER);
355           iv = MAX_RECON_PER ;
356         }
357     }
358   else
359     iv = MAX_RECON_PER ;
360   max_reconnect_period = (unsigned int) iv ;
361 
362   if (getInteger (topScope,"initial-reconnect-time",&iv,NO_INHERIT))
363     {
364       if (iv < 1)
365         {
366           rval = 0 ;
367           logOrPrint (LOG_ERR,fp,
368                       "ME config: value of %s (%ld) in %s cannot be less"
369                       " than 1. Using %ld", "initial-reconnect-time",
370                       iv,"global scope",(long)INIT_RECON_PER);
371           iv = INIT_RECON_PER ;
372         }
373     }
374   else
375     iv = INIT_RECON_PER ;
376   init_reconnect_period = (unsigned int) iv ;
377 
378   return rval ;
379 }
380 
381 
382 
383 
384 
385 /*
386  * Create a new Connection object and return it. All fields are
387  * initialized to reasonable values.
388  */
newConnection(Host host,unsigned int id,const char * ipname,unsigned int articleReceiptTimeout,unsigned int portNum,unsigned int respTimeout,unsigned int flushTimeout,double lowPassLow,double lowPassHigh,double lowPassFilter)389 Connection newConnection (Host host,
390                           unsigned int id,
391                           const char *ipname,
392                           unsigned int articleReceiptTimeout,
393                           unsigned int portNum,
394                           unsigned int respTimeout,
395                           unsigned int flushTimeout,
396                           double lowPassLow,
397                           double lowPassHigh,
398 			  double lowPassFilter)
399 {
400   Connection cxn ;
401   bool croak = false ;
402 
403   if (ipname == NULL)
404     {
405       d_printf (1,"NULL ipname in newConnection\n") ;
406       croak = true ;
407     }
408 
409   if (ipname && strlen (ipname) == 0)
410     {
411       d_printf (1,"Empty ipname in newConnection\n") ;
412       croak = true ;
413     }
414 
415   if (croak)
416     return NULL ;
417 
418   cxn = xcalloc (1, sizeof(struct connection_s));
419 
420   cxn->myHost = host ;
421   cxn->myEp = NULL ;
422   cxn->ident = id ;
423 
424   cxn->checkHead = NULL ;
425   cxn->checkRespHead = NULL ;
426   cxn->takeHead = NULL ;
427   cxn->takeRespHead = NULL ;
428 
429   cxn->articleQTotal = 0 ;
430   cxn->missing = NULL ;
431 
432   cxn->respBuffer = newBuffer (BUFFER_SIZE) ;
433   ASSERT (cxn->respBuffer != NULL) ;
434 
435   cxn->ipName = xstrdup (ipname) ;
436   cxn->port = portNum ;
437 
438   /* Time out the higher numbered connections faster */
439   cxn->articleReceiptTimeout = articleReceiptTimeout * 10.0 / (10.0 + id) ;
440   cxn->artReceiptTimerId = 0 ;
441 
442   cxn->readTimeout = respTimeout ;
443   cxn->readBlockedTimerId = 0 ;
444 
445   cxn->writeTimeout = respTimeout ; /* XXX should be a separate value */
446   cxn->writeBlockedTimerId = 0 ;
447 
448   cxn->flushTimeout = fudgeFactor (flushTimeout) ;
449   cxn->flushTimerId = 0 ;
450 
451   cxn->onThreshold = lowPassHigh * lowPassFilter / 100.0 ;
452   cxn->offThreshold = lowPassLow * lowPassFilter / 100.0 ;
453   cxn->lowPassFilter = lowPassFilter;
454 
455   cxn->sleepTimerId = 0 ;
456   cxn->sleepTimeout = init_reconnect_period ;
457 
458   resetConnection (cxn) ;
459 
460   cxn->next = gCxnList ;
461   gCxnList = cxn ;
462   gCxnCount++ ;
463 
464   cxn->state = cxnStartingS ;
465 
466   return cxn ;
467 }
468 
469 
470 
471 
472 
473 /* Create a new endpoint hooked to a non-blocking socket that is trying to
474  * connect to the host info stored in the Connection. On fast machines
475  * connecting locally the connect() may have already succeeded when this
476  * returns, but typically the connect will still be running and when it
477  * completes. The Connection will be notified via a write callback setup by
478  * prepareWrite below. If nothing goes wrong then this will return true
479  * (even if the connect() has not yet completed). If something fails
480  * (hostname lookup etc.) then it returns false (and the Connection is left
481  * in the sleeping state)..
482  *
483  * Pre-state		Reason cxnConnect called
484  * ---------		------------------------
485  * cxnStartingS		Connection owner issued call.
486  * cxnWaitingS		side effect of cxnTakeArticle() call
487  * cxnConnecting	side effect of cxnFlush() call
488  * cxnSleepingS		side effect of reopenTimeoutCbk() call.
489  */
cxnConnect(Connection cxn)490 bool cxnConnect (Connection cxn)
491 {
492   struct sockaddr *cxnAddr;
493   socklen_t len;
494   int fd, rval;
495   const char *src;
496   const char *peerName = hostPeerName (cxn->myHost) ;
497 
498   ASSERT (cxn->myEp == NULL) ;
499 
500   if (!(cxn->state == cxnStartingS ||
501         cxn->state == cxnWaitingS ||
502         cxn->state == cxnFlushingS ||
503         cxn->state == cxnSleepingS))
504     {
505       warn ("%s:%d cxnsleep connection in bad state: %s",
506             hostPeerName (cxn->myHost), cxn->ident,
507             stateToString (cxn->state)) ;
508       cxnSleepOrDie (cxn) ;
509       return false;
510     }
511 
512   if (cxn->state == cxnWaitingS)
513     ASSERT (cxn->articleQTotal == 1) ;
514   else
515     ASSERT (cxn->articleQTotal == 0) ;
516 
517   cxn->state = cxnConnectingS ;
518 
519   cxnAddr = hostIpAddr (cxn->myHost) ;
520 
521   if (cxnAddr == NULL)
522     {
523       cxnSleepOrDie (cxn) ;
524       return false ;
525     }
526 
527   if (cxnAddr->sa_family == AF_INET)
528     {
529       src = hostBindAddr(cxn->myHost) ;
530       len = sizeof(struct sockaddr_in) ;
531     }
532   else
533     {
534       src = hostBindAddr6(cxn->myHost) ;
535 #if HAVE_INET6
536       len = sizeof(struct sockaddr_in6) ;
537 #else
538       /* This should never happen, but the compiler doesn't know that. */
539       len = sizeof(struct sockaddr) ;
540 #endif
541     }
542   if (src && strcmp(src, "none") == 0)
543     src = NULL;
544 
545   fd = network_client_create (cxnAddr->sa_family, SOCK_STREAM, src);
546   if (fd < 0)
547     {
548       syswarn ("%s:%d cxnsleep can't create socket", peerName, cxn->ident) ;
549       d_printf (1,"Can't get a socket: %s\n", strerror (errno)) ;
550 
551       hostIpFailed (cxn->myHost) ;
552       cxnSleepOrDie (cxn) ;
553 
554       return false ;
555     }
556 
557   if (!fdflag_nonblocking (fd, true))
558     {
559       syswarn ("%s:%d cxnsleep can't set socket non-blocking", peerName,
560                cxn->ident) ;
561       close (fd) ;
562 
563       cxnSleepOrDie (cxn) ;
564 
565       return false ;
566     }
567 
568   rval = connect (fd, cxnAddr, len) ;
569   if (rval < 0 && errno != EINPROGRESS)
570     {
571       syswarn ("%s:%d connect", peerName, cxn->ident) ;
572       hostIpFailed (cxn->myHost) ;
573       close (fd) ;
574 
575       cxnSleepOrDie (cxn) ;
576 
577       return false ;
578     }
579 
580   if ((cxn->myEp = newEndPoint (fd)) == NULL)
581     {
582       /* If this happens, then fd was bigger than what select could handle,
583          so endpoint.c refused to create the new object. */
584       close (fd) ;
585       cxnSleepOrDie (cxn) ;
586       return false ;
587     }
588 
589   if (rval < 0)
590     /* when the write callback gets done the connection went through */
591     prepareWrite (cxn->myEp, NULL, NULL, connectionDone, cxn) ;
592   else
593     connectionDone (cxn->myEp, IoDone, NULL, cxn) ;
594 
595   /* connectionDone() could set state to sleeping */
596   return (cxn->state == cxnConnectingS ? true : false) ;
597 }
598 
599 
600 
601 
602 
603 /* Put the Connection into the wait state.
604  *
605  * Pre-state		Reason cxnWait called
606  * ---------		------------------------
607  * cxnStartingS		- Connection owner called cxnWait()
608  * cxnSleepingS		- side effect of cxnFlush() call.
609  * cxnConnectingS	- side effect of cxnFlush() call.
610  * cxnFlushingS		- side effect of receiving response 205
611  * 			  and Connection had no articles when
612  * 			  cxnFlush() was issued.
613  * 			- prepareRead failed.
614  * 			- I/O failed.
615  *
616  */
cxnWait(Connection cxn)617 void cxnWait (Connection cxn)
618 {
619   ASSERT (cxn->state == cxnStartingS ||
620           cxn->state == cxnSleepingS ||
621           cxn->state == cxnConnectingS ||
622           cxn->state == cxnFeedingS ||
623           cxn->state == cxnFlushingS) ;
624   VALIDATE_CONNECTION (cxn) ;
625 
626   abortConnection (cxn) ;
627 
628   cxn->state = cxnWaitingS ;
629 
630   hostCxnWaiting (cxn->myHost,cxn) ;   /* tell our Host we're waiting */
631 }
632 
633 
634 
635 
636 
637 /* Tells the Connection to flush itself (i.e. push out all articles,
638  * issue a QUIT and drop the network connection. If necessary a
639  * reconnect will be done immediately after. Called by the Host, or
640  * by the timer callback.
641  *
642  * Pre-state		Reason cxnFlush called
643  * ---------		------------------------
644  * ALL (except cxnDeadS	- Connection owner called cxnFlush()
645  *  and cxnStartingS)
646  * cxnFeedingS		- side effect of flushCxnCbk() call.
647  */
cxnFlush(Connection cxn)648 void cxnFlush (Connection cxn)
649 {
650   ASSERT (cxn != NULL) ;
651   ASSERT (cxn->state != cxnStartingS) ;
652   ASSERT (cxn->state != cxnDeadS) ;
653   VALIDATE_CONNECTION (cxn) ;
654 
655   switch (cxn->state)
656     {
657       case cxnSleepingS:
658         cxnWait (cxn) ;
659         break ;
660 
661       case cxnConnectingS:
662         cxnWait (cxn) ;
663         cxnConnect (cxn) ;
664         break ;
665 
666       case cxnIdleTimeoutS:
667       case cxnIdleS:
668         ASSERT (cxn->articleQTotal == 0) ;
669         if (cxn->state != cxnIdleTimeoutS)
670           clearTimer (cxn->artReceiptTimerId) ;
671         clearTimer (cxn->flushTimerId) ;
672         cxn->state = cxnFlushingS ;
673         issueQUIT (cxn) ;
674         break ;
675 
676       case cxnClosingS:
677       case cxnFlushingS:
678       case cxnWaitingS:
679         if (cxn->articleQTotal == 0 && !writeIsPending (cxn->myEp))
680           issueQUIT (cxn) ;
681         break ;
682 
683       case cxnFeedingS:
684         /* we only reconnect immediately if we're not idle when cxnFlush()
685            is called. */
686         if (!cxn->immedRecon)
687           {
688             cxn->immedRecon = (cxn->articleQTotal > 0 ? true : false) ;
689             d_printf (1,"%s:%d immediate reconnect for a cxnFlush()\n",
690                      hostPeerName (cxn->myHost), cxn->ident) ;
691           }
692 
693         clearTimer (cxn->flushTimerId) ;
694 
695         cxn->state = cxnFlushingS ;
696 
697         if (cxn->articleQTotal == 0 && !writeIsPending (cxn->myEp))
698           issueQUIT (cxn) ;
699         break ;
700 
701       default:
702         die ("Bad connection state: %s\n",stateToString (cxn->state)) ;
703     }
704 }
705 
706 
707 
708 /*
709  * Tells the Connection to dump all articles that are queued and to issue a
710  * QUIT as quickly as possible. Much like cxnClose, except queued articles
711  * are not sent, but are given back to the Host.
712  */
cxnTerminate(Connection cxn)713 void cxnTerminate (Connection cxn)
714 {
715   ASSERT (cxn != NULL) ;
716   ASSERT (cxn->state != cxnDeadS) ;
717   ASSERT (cxn->state != cxnStartingS) ;
718   VALIDATE_CONNECTION (cxn) ;
719 
720   switch (cxn->state)
721     {
722       case cxnFeedingS:
723         d_printf (1,"%s:%d Issuing terminate\n",
724                  hostPeerName (cxn->myHost), cxn->ident) ;
725 
726         clearTimer (cxn->flushTimerId) ;
727 
728         cxn->state = cxnClosingS ;
729 
730         deferQueuedArticles (cxn) ;
731         if (cxn->articleQTotal == 0)
732           issueQUIT (cxn) ; /* send out the QUIT if we can */
733         break ;
734 
735       case cxnIdleTimeoutS:
736       case cxnIdleS:
737         ASSERT (cxn->articleQTotal == 0) ;
738         if (cxn->state != cxnIdleTimeoutS)
739           clearTimer (cxn->artReceiptTimerId) ;
740         clearTimer (cxn->flushTimerId) ;
741         cxn->state = cxnClosingS ;
742         issueQUIT (cxn) ;
743         break ;
744 
745       case cxnFlushingS: /* we are in the middle of a periodic close. */
746         d_printf (1,"%s:%d Connection already being flushed\n",
747                  hostPeerName (cxn->myHost),cxn->ident);
748         cxn->state = cxnClosingS ;
749         if (cxn->articleQTotal == 0)
750           issueQUIT (cxn) ; /* send out the QUIT if we can */
751         break ;
752 
753       case cxnClosingS:
754         d_printf (1,"%s:%d Connection already closing\n",
755                  hostPeerName (cxn->myHost),cxn->ident) ;
756         break ;
757 
758       case cxnWaitingS:
759       case cxnConnectingS:
760       case cxnSleepingS:
761         cxnDead (cxn) ;
762         break ;
763 
764       default:
765         die ("Bad connection state: %s\n",stateToString (cxn->state)) ;
766     }
767 
768   VALIDATE_CONNECTION (cxn) ;
769 
770   if (cxn->state == cxnDeadS)
771     {
772       d_printf (1,"%s:%d Deleting connection\n",hostPeerName (cxn->myHost),
773                cxn->ident) ;
774 
775       delConnection (cxn) ;
776     }
777 }
778 
779 
780 
781 /* Tells the Connection to do a disconnect and then when it is
782  * disconnected to delete itself.
783  *
784  * Pre-state		Reason cxnClose called
785  * ---------		------------------------
786  * ALL (except cxnDeadS	- Connecton owner called directly.
787  * and cxnStartingS).
788  */
cxnClose(Connection cxn)789 void cxnClose (Connection cxn)
790 {
791   ASSERT (cxn != NULL) ;
792   ASSERT (cxn->state != cxnDeadS) ;
793   ASSERT (cxn->state != cxnStartingS) ;
794   VALIDATE_CONNECTION (cxn) ;
795 
796   switch (cxn->state)
797     {
798       case cxnFeedingS:
799         d_printf (1,"%s:%d Issuing disconnect\n",
800                  hostPeerName (cxn->myHost), cxn->ident) ;
801 
802         clearTimer (cxn->flushTimerId) ;
803 
804         cxn->state = cxnClosingS ;
805 
806         if (cxn->articleQTotal == 0)
807           issueQUIT (cxn) ; /* send out the QUIT if we can */
808         break ;
809 
810       case cxnIdleS:
811       case cxnIdleTimeoutS:
812         ASSERT (cxn->articleQTotal == 0) ;
813         if (cxn->state != cxnIdleTimeoutS)
814           clearTimer (cxn->artReceiptTimerId) ;
815         clearTimer (cxn->flushTimerId) ;
816         cxn->state = cxnClosingS ;
817         issueQUIT (cxn) ;
818         break ;
819 
820       case cxnFlushingS: /* we are in the middle of a periodic close. */
821         d_printf (1,"%s:%d Connection already being flushed\n",
822                  hostPeerName (cxn->myHost),cxn->ident);
823         cxn->state = cxnClosingS ;
824         if (cxn->articleQTotal == 0)
825           issueQUIT (cxn) ; /* send out the QUIT if we can */
826         break ;
827 
828       case cxnClosingS:
829         d_printf (1,"%s:%d Connection already closing\n",
830                  hostPeerName (cxn->myHost),cxn->ident) ;
831         break ;
832 
833       case cxnWaitingS:
834       case cxnConnectingS:
835       case cxnSleepingS:
836         cxnDead (cxn) ;
837         break ;
838 
839       default:
840         die ("Bad connection state: %s\n",stateToString (cxn->state)) ;
841     }
842 
843   VALIDATE_CONNECTION (cxn) ;
844 
845   if (cxn->state == cxnDeadS)
846     {
847       d_printf (1,"%s:%d Deleting connection\n",hostPeerName (cxn->myHost),
848                cxn->ident) ;
849 
850       delConnection (cxn) ;
851     }
852 }
853 
854 
855 
856 
857 
858 /* This is what the Host calls to get us to tranfer an article. If
859  * we're running the IHAVE sequence, then we can't take it if we've
860  * got an article already. If we're running the CHECK/TAKETHIS
861  * sequence, then we'll take as many as we can (up to our MAXCHECK
862  * limit).
863  */
cxnTakeArticle(Connection cxn,Article art)864 bool cxnTakeArticle (Connection cxn, Article art)
865 {
866   bool rval = true ;
867 
868   ASSERT (cxn != NULL) ;
869   VALIDATE_CONNECTION (cxn) ;
870 
871   if ( !cxnQueueArticle (cxn,art) ) /* might change cxnIdleS to cxnFeedingS */
872     return false ;
873 
874   if (!(cxn->state == cxnConnectingS ||
875         cxn->state == cxnFeedingS ||
876         cxn->state == cxnWaitingS))
877     {
878       warn ("%s:%d cxnsleep connection in bad state: %s",
879             hostPeerName (cxn->myHost), cxn->ident,
880             stateToString (cxn->state)) ;
881       cxnSleepOrDie (cxn) ;
882       return false ;
883     }
884 
885   if (cxn->state != cxnWaitingS) /* because articleQTotal == 1 */
886     VALIDATE_CONNECTION (cxn) ;
887   else
888     ASSERT (cxn->articleQTotal == 1) ;
889 
890   switch (cxn->state)
891     {
892       case cxnWaitingS:
893         cxnConnect (cxn) ;
894         break ;
895 
896       case cxnFeedingS:
897         doSomeWrites (cxn) ;
898         break ;
899 
900       case cxnConnectingS:
901         break ;
902 
903       default:
904         die ("Bad connection state: %s\n",stateToString (cxn->state)) ;
905     }
906 
907   return rval ;
908 }
909 
910 
911 
912 
913 
914 /* if there's room in the Connection then stick the article on the
915  * queue, otherwise return false.
916  */
cxnQueueArticle(Connection cxn,Article art)917 bool cxnQueueArticle (Connection cxn, Article art)
918 {
919   ArtHolder newArt ;
920   bool rval = false ;
921 
922   ASSERT (cxn != NULL) ;
923   ASSERT (cxn->state != cxnStartingS) ;
924   ASSERT (cxn->state != cxnDeadS) ;
925   VALIDATE_CONNECTION (cxn) ;
926 
927   switch (cxn->state)
928     {
929       case cxnClosingS:
930         d_printf (5,"%s:%d Refusing article due to closing\n",
931                  hostPeerName (cxn->myHost),cxn->ident) ;
932         break ;
933 
934       case cxnFlushingS:
935         d_printf (5,"%s:%d Refusing article due to flushing\n",
936                  hostPeerName (cxn->myHost),cxn->ident) ;
937         break ;
938 
939       case cxnSleepingS:
940         d_printf (5,"%s:%d Refusing article due to sleeping\n",
941                  hostPeerName (cxn->myHost),cxn->ident) ;
942         break ;
943 
944       case cxnWaitingS:
945         rval = true ;
946         newArt = newArtHolder (art) ;
947         appendArtHolder (newArt, &cxn->checkHead, &cxn->articleQTotal) ;
948         break ;
949 
950       case cxnConnectingS:
951         if (cxn->articleQTotal != 0)
952           break ;
953         rval = true ;
954         newArt = newArtHolder (art) ;
955         appendArtHolder (newArt, &cxn->checkHead, &cxn->articleQTotal) ;
956         break ;
957 
958       case cxnIdleS:
959       case cxnFeedingS:
960         if (cxn->articleQTotal >= cxn->maxCheck)
961           d_printf (5, "%s:%d Refusing article due to articleQTotal >= maxCheck (%d > %d)\n",
962                    hostPeerName (cxn->myHost), cxn->ident,
963                    cxn->articleQTotal, cxn->maxCheck) ;
964         else
965           {
966             rval = true ;
967             newArt = newArtHolder (art) ;
968             if (cxn->needsChecks)
969               appendArtHolder (newArt, &cxn->checkHead, &cxn->articleQTotal) ;
970             else
971               appendArtHolder (newArt, &cxn->takeHead, &cxn->articleQTotal) ;
972             if (cxn->state == cxnIdleS)
973               {
974                 cxn->state = cxnFeedingS ;
975                 clearTimer (cxn->artReceiptTimerId) ;
976               }
977           }
978         break ;
979 
980       default:
981         die ("Invalid state: %s\n", stateToString (cxn->state)) ;
982     }
983 
984   if (rval)
985     {
986       d_printf (5,"%s:%d accepting article %s\n",hostPeerName (cxn->myHost),
987                cxn->ident,artMsgId (art)) ;
988 
989       cxn->artsTaken++ ;
990     }
991 
992   return rval ;
993 }
994 
995 
996 
997 
998 
999 /*
1000  * Generate a log message for activity.  Usually called by the Connection's
1001  * owner.
1002  */
cxnLogStats(Connection cxn,bool final)1003 void cxnLogStats (Connection cxn, bool final)
1004 {
1005   const char *peerName ;
1006   time_t now = theTime() ;
1007 
1008   ASSERT (cxn != NULL) ;
1009 
1010   /* Only log stats when in one of these three states. */
1011   switch (cxn->state)
1012     {
1013       case cxnFeedingS:
1014       case cxnFlushingS:
1015       case cxnClosingS:
1016         break ;
1017 
1018       default:
1019         return ;
1020     }
1021 
1022   peerName = hostPeerName (cxn->myHost) ;
1023 
1024   /* Log a checkpoint in any case. */
1025   notice("%s:%d checkpoint seconds %ld offered %d accepted %d refused %d"
1026          " rejected %d accsize %.0f rejsize %.0f",
1027          peerName, cxn->ident,
1028          (long) (now - cxn->timeCon_checkpoint),
1029          cxn->checksIssued - cxn->checksIssued_checkpoint,
1030          cxn->takesOkayed - cxn->takesOkayed_checkpoint,
1031          cxn->checksRefused - cxn->checksRefused_checkpoint,
1032          cxn->takesRejected - cxn->takesRejected_checkpoint,
1033          cxn->takesSizeOkayed - cxn->takesSizeOkayed_checkpoint,
1034          cxn->takesSizeRejected - cxn->takesSizeRejected_checkpoint);
1035 
1036   if (final) {
1037     notice("%s:%d final seconds %ld offered %d accepted %d refused %d"
1038            " rejected %d accsize %.0f rejsize %.0f",
1039            peerName, cxn->ident, (long) (now - cxn->timeCon),
1040            cxn->checksIssued, cxn->takesOkayed, cxn->checksRefused,
1041            cxn->takesRejected, cxn->takesSizeOkayed, cxn->takesSizeRejected);
1042 
1043     cxn->artsTaken = 0;
1044     cxn->checksIssued = 0;
1045     cxn->checksRefused = 0;
1046     cxn->takesRejected = 0;
1047     cxn->takesOkayed = 0;
1048     cxn->takesSizeRejected = 0;
1049     cxn->takesSizeOkayed = 0;
1050 
1051       if (cxn->timeCon > 0) {
1052         cxn->timeCon = theTime();
1053       }
1054   }
1055 
1056   cxn->timeCon_checkpoint = now;
1057   cxn->checksIssued_checkpoint = cxn->checksIssued;
1058   cxn->takesOkayed_checkpoint = cxn->takesOkayed;
1059   cxn->checksRefused_checkpoint = cxn->checksRefused;
1060   cxn->takesRejected_checkpoint = cxn->takesRejected;
1061   cxn->takesSizeOkayed_checkpoint = cxn->takesSizeOkayed;
1062   cxn->takesSizeRejected_checkpoint = cxn->takesSizeRejected;
1063 }
1064 
1065 
1066 
1067 
1068 
1069 /*
1070  * return the number of articles the connection will accept.
1071  */
cxnQueueSpace(Connection cxn)1072 size_t cxnQueueSpace (Connection cxn)
1073 {
1074   int rval = 0 ;
1075 
1076   ASSERT (cxn != NULL) ;
1077 
1078   if (cxn->state == cxnFeedingS ||
1079       cxn->state == cxnIdleS ||
1080       cxn->state == cxnConnectingS ||
1081       cxn->state == cxnWaitingS)
1082     rval = cxn->maxCheck - cxn->articleQTotal ;
1083 
1084   return rval ;
1085 }
1086 
1087 
1088 
1089 
1090 
1091 /*
1092  * Print info on all the connections that currently exist.
1093  */
gPrintCxnInfo(FILE * fp,unsigned int indentAmt)1094 void gPrintCxnInfo (FILE *fp, unsigned int indentAmt)
1095 {
1096   char indent [INDENT_BUFFER_SIZE] ;
1097   unsigned int i ;
1098   Connection cxn ;
1099 
1100   for (i = 0 ; i < MIN(INDENT_BUFFER_SIZE - 1,indentAmt) ; i++)
1101     indent [i] = ' ' ;
1102   indent [i] = '\0' ;
1103 
1104   fprintf (fp,"%sGlobal Connection list : (count %u) {\n",
1105            indent,gCxnCount) ;
1106   for (cxn = gCxnList ; cxn != NULL ; cxn = cxn->next)
1107     printCxnInfo (cxn,fp,indentAmt + INDENT_INCR) ;
1108   fprintf (fp,"%s}\n",indent) ;
1109 }
1110 
1111 
1112 
1113 
1114 
1115 /*
1116  * Print the info about the given connection.
1117  */
printCxnInfo(Connection cxn,FILE * fp,unsigned int indentAmt)1118 void printCxnInfo (Connection cxn, FILE *fp, unsigned int indentAmt)
1119 {
1120   char indent [INDENT_BUFFER_SIZE] ;
1121   unsigned int i ;
1122   ArtHolder artH ;
1123 
1124   for (i = 0 ; i < MIN(INDENT_BUFFER_SIZE - 1,indentAmt) ; i++)
1125     indent [i] = ' ' ;
1126   indent [i] = '\0' ;
1127 
1128   fprintf (fp,"%sConnection : %p {\n",indent, (void *) cxn) ;
1129   fprintf (fp,"%s    host : %p\n",indent, (void *) cxn->myHost) ;
1130   fprintf (fp,"%s    endpoint : %p\n",indent, (void *) cxn->myEp) ;
1131   fprintf (fp,"%s    state : %s\n",indent, stateToString (cxn->state)) ;
1132   fprintf (fp,"%s    ident : %u\n",indent,cxn->ident) ;
1133   fprintf (fp,"%s    ip-name : %s\n", indent, cxn->ipName) ;
1134   fprintf (fp,"%s    port-number : %u\n",indent,cxn->port) ;
1135   fprintf (fp,"%s    max-checks : %u\n",indent,cxn->maxCheck) ;
1136   fprintf (fp,"%s    does-streaming : %s\n",indent,
1137            boolToString (cxn->doesStreaming)) ;
1138   fprintf (fp,"%s    authenticated : %s\n",indent,
1139            boolToString (cxn->authenticated)) ;
1140   fprintf (fp,"%s    quitWasIssued : %s\n",indent,
1141            boolToString (cxn->quitWasIssued)) ;
1142   fprintf (fp,"%s    needs-checks : %s\n",indent,
1143            boolToString (cxn->needsChecks)) ;
1144 
1145   fprintf (fp,"%s    time-connected : %ld\n",indent,(long) cxn->timeCon) ;
1146   fprintf (fp,"%s    articles from INN : %u\n",indent,cxn->artsTaken) ;
1147   fprintf (fp,"%s    articles offered : %u\n",indent,
1148            cxn->checksIssued) ;
1149   fprintf (fp,"%s    articles refused : %u\n",indent,
1150            cxn->checksRefused) ;
1151   fprintf (fp,"%s    articles rejected : %u\n",indent,
1152            cxn->takesRejected) ;
1153   fprintf (fp,"%s    articles accepted : %u\n",indent,
1154            cxn->takesOkayed) ;
1155   fprintf (fp,"%s    low-pass upper limit : %0.6f\n", indent,
1156            cxn->onThreshold) ;
1157   fprintf (fp,"%s    low-pass lower limit : %0.6f\n", indent,
1158            cxn->offThreshold) ;
1159   fprintf (fp,"%s    low-pass filter tc : %0.6f\n", indent,
1160            cxn->lowPassFilter) ;
1161   fprintf (fp,"%s    low-pass filter : %0.6f\n", indent,
1162            cxn->filterValue) ;
1163 
1164   fprintf (fp,"%s    article-timeout : %u\n",indent,cxn->articleReceiptTimeout) ;
1165   fprintf (fp,"%s    article-callback : %d\n",indent,cxn->artReceiptTimerId) ;
1166 
1167   fprintf (fp,"%s    response-timeout : %u\n",indent,cxn->readTimeout) ;
1168   fprintf (fp,"%s    response-callback : %d\n",indent,cxn->readBlockedTimerId) ;
1169 
1170   fprintf (fp,"%s    write-timeout : %u\n",indent,cxn->writeTimeout) ;
1171   fprintf (fp,"%s    write-callback : %d\n",indent,cxn->writeBlockedTimerId) ;
1172 
1173   fprintf (fp,"%s    flushTimeout : %u\n",indent,cxn->flushTimeout) ;
1174   fprintf (fp,"%s    flushTimerId : %d\n",indent,cxn->flushTimerId) ;
1175 
1176   fprintf (fp,"%s    reopen wait : %u\n",indent,cxn->sleepTimeout) ;
1177   fprintf (fp,"%s    reopen id : %d\n",indent,cxn->sleepTimerId) ;
1178 
1179   fprintf (fp,"%s    CHECK queue {\n",indent) ;
1180   for (artH = cxn->checkHead ; artH != NULL ; artH = artH->next)
1181     printArticleInfo (artH->article,fp,indentAmt + INDENT_INCR) ;
1182   fprintf (fp,"%s    }\n",indent) ;
1183 
1184   fprintf (fp,"%s    CHECK Response queue {\n",indent) ;
1185   for (artH = cxn->checkRespHead ; artH != NULL ; artH = artH->next)
1186     printArticleInfo (artH->article,fp,indentAmt + INDENT_INCR) ;
1187   fprintf (fp,"%s    }\n",indent) ;
1188 
1189   fprintf (fp,"%s    TAKE queue {\n",indent) ;
1190   for (artH = cxn->takeHead ; artH != NULL ; artH = artH->next)
1191     printArticleInfo (artH->article,fp,indentAmt + INDENT_INCR) ;
1192   fprintf (fp,"%s    }\n",indent) ;
1193 
1194   fprintf (fp,"%s    TAKE response queue {\n",indent) ;
1195   for (artH = cxn->takeRespHead ; artH != NULL ; artH = artH->next)
1196     printArticleInfo (artH->article,fp,indentAmt + INDENT_INCR) ;
1197   fprintf (fp,"%s    }\n",indent) ;
1198 
1199   fprintf (fp,"%s    response buffer {\n",indent) ;
1200   printBufferInfo (cxn->respBuffer,fp,indentAmt + INDENT_INCR) ;
1201   fprintf (fp,"%s    }\n",indent) ;
1202 
1203   fprintf (fp,"%s}\n",indent) ;
1204 }
1205 
1206 
1207 
1208 
1209 
1210 /*
1211  * Return whether the connection will accept articles.
1212  */
cxnCheckstate(Connection cxn)1213 bool cxnCheckstate (Connection cxn)
1214 {
1215   bool rval = false ;
1216 
1217   ASSERT (cxn != NULL) ;
1218 
1219   if (cxn->state == cxnFeedingS ||
1220       cxn->state == cxnIdleS ||
1221       cxn->state == cxnConnectingS)
1222     rval = true ;
1223 
1224   return rval ;
1225 }
1226 
1227 
1228 
1229 
1230 
1231 /**********************************************************************/
1232 /**                       STATIC PRIVATE FUNCTIONS                   **/
1233 /**********************************************************************/
1234 
1235 
1236 /*
1237  * ENDPOINT CALLBACK AREA.
1238  *
 * All the functions in this next section are callbacks fired by the
 * EndPoint objects/class (either timers or I/O completion callbacks).
1241  */
1242 
1243 
1244 /*
1245  * this is the first stage of the NNTP FSM. This function is called
1246  * when the tcp/ip network connection is setup and we should get
1247  * ready to read the banner message. When this function returns the
1248  * state of the Connection will still be cxnConnectingS unless
1249  * something broken, in which case it probably went into the
1250  * cxnSleepingS state.
1251  */
connectionDone(EndPoint e,IoStatus i,Buffer * b,void * d)1252 static void connectionDone (EndPoint e, IoStatus i, Buffer *b, void *d)
1253 {
1254   Buffer *readBuffers ;
1255   Connection cxn = (Connection) d ;
1256   const char *peerName ;
1257   int optval;
1258   socklen_t size ;
1259 
1260   ASSERT (b == NULL) ;
1261   ASSERT (cxn->state == cxnConnectingS) ;
1262   ASSERT (!writeIsPending (cxn->myEp)) ;
1263 
1264   size = sizeof (optval) ;
1265   peerName = hostPeerName (cxn->myHost) ;
1266 
1267   if (i != IoDone)
1268     {
1269       errno = endPointErrno (e) ;
1270       syswarn ("%s:%d cxnsleep i/o failed", peerName, cxn->ident) ;
1271 
1272       cxnSleepOrDie (cxn) ;
1273     }
1274   else if (getsockopt (endPointFd (e), SOL_SOCKET, SO_ERROR,
1275                        (char *) &optval, &size) != 0)
1276     {
1277       /* This is bad. Can't even get the SO_ERROR value out of the socket */
1278       syswarn ("%s:%d cxnsleep internal getsockopt", peerName, cxn->ident) ;
1279 
1280       cxnSleepOrDie (cxn) ;
1281     }
1282   else if (optval != 0)
1283     {
1284       /* if the connect failed then the only way to know is by getting
1285          the SO_ERROR value out of the socket. */
1286       errno = optval ;
1287       syswarn ("%s:%d cxnsleep connect", peerName, cxn->ident) ;
1288       hostIpFailed (cxn->myHost) ;
1289 
1290       cxnSleepOrDie (cxn) ;
1291     }
1292   else
1293     {
1294       readBuffers = makeBufferArray (bufferTakeRef (cxn->respBuffer), NULL) ;
1295 
1296       if ( !prepareRead (e, readBuffers, getBanner, cxn, 1) )
1297         {
1298           warn ("%s:%d cxnsleep prepare read failed", peerName, cxn->ident) ;
1299 
1300           cxnSleepOrDie (cxn) ;
1301         }
1302       else
1303         {
1304           initReadBlockedTimeout (cxn) ;
1305 
1306           /* set up the callback for closing down the connection at regular
1307              intervals (due to problems with long running nntpd). */
1308           if (cxn->flushTimeout > 0)
1309             cxn->flushTimerId = prepareSleep (flushCxnCbk,
1310                                               cxn->flushTimeout,cxn) ;
1311 
1312           /* The state doesn't change yet until we've read the banner and
1313              tried the MODE STREAM command. */
1314         }
1315     }
1316   VALIDATE_CONNECTION (cxn) ;
1317 }
1318 
1319 
1320 /*
1321  * This is called when we are so far in the connection setup that
1322  * we're confident it'll work.  If the connection is IPv6, remove
1323  * the IPv4 addresses from the address list.
1324  */
connectionIfIpv6DeleteIpv4Addr(Connection cxn)1325 static void connectionIfIpv6DeleteIpv4Addr (Connection cxn)
1326 {
1327   union {
1328     struct sockaddr sa;
1329     struct sockaddr_storage ss;
1330   } u;
1331   socklen_t len = sizeof(u);
1332 
1333   if (getpeername (endPointFd (cxn->myEp), &u.sa, &len) < 0)
1334     return;
1335   if (u.sa.sa_family == AF_INET)
1336     return;
1337 
1338   hostDeleteIpv4Addr (cxn->myHost);
1339 }
1340 
1341 
1342 /*
1343  * Called when the banner message has been read off the wire and is
1344  * in the buffer(s). When this function returns the state of the
1345  * Connection will still be cxnConnectingS unless something broken,
1346  * in which case it probably went into the cxnSleepiongS state.
1347  */
getBanner(EndPoint e,IoStatus i,Buffer * b,void * d)1348 static void getBanner (EndPoint e, IoStatus i, Buffer *b, void *d)
1349 {
1350   Buffer *readBuffers ;
1351   Connection cxn = (Connection) d ;
1352   char *p = bufferBase (b[0]) ;
1353   int code ;
1354   bool isOk = false ;
1355   const char *peerName ;
1356   char *rest ;
1357 
1358   ASSERT (e == cxn->myEp) ;
1359   ASSERT (b[0] == cxn->respBuffer) ;
1360   ASSERT (b[1] == NULL) ;
1361   ASSERT (cxn->state == cxnConnectingS) ;
1362   ASSERT (!writeIsPending (cxn->myEp));
1363 
1364 
1365   peerName = hostPeerName (cxn->myHost) ;
1366 
1367   bufferAddNullByte (b[0]) ;
1368 
1369   if (i != IoDone)
1370     {
1371       errno = endPointErrno (cxn->myEp) ;
1372       syswarn ("%s:%d cxnsleep can't read banner", peerName, cxn->ident) ;
1373       hostIpFailed (cxn->myHost) ;
1374 
1375       cxnSleepOrDie (cxn) ;
1376     }
1377   else if (strchr (p, '\n') == NULL)
1378     {                           /* partial read. expand buffer and retry */
1379       expandBuffer (b[0], BUFFER_EXPAND_AMOUNT) ;
1380       readBuffers = makeBufferArray (bufferTakeRef (b[0]), NULL) ;
1381 
1382       if ( !prepareRead (e, readBuffers, getBanner, cxn, 1) )
1383         {
1384           warn ("%s:%d cxnsleep prepare read failed", peerName, cxn->ident) ;
1385 
1386           cxnSleepOrDie (cxn) ;
1387         }
1388     }
1389   else if ( !getNntpResponse (p, &code, &rest) )
1390     {
1391       trim_ws (p) ;
1392 
1393       warn ("%s:%d cxnsleep response format: %s", peerName, cxn->ident, p) ;
1394 
1395       cxnSleepOrDie (cxn) ;
1396     }
1397   else
1398     {
1399       trim_ws (p) ;
1400 
1401       switch (code)
1402         {
1403           case 200:             /* normal */
1404           case 201:             /* can transfer but not post -- old nntpd */
1405             isOk = true ;
1406             break ;
1407 
1408           case 400:
1409             cxnSleepOrDie (cxn) ;
1410             hostIpFailed (cxn->myHost) ;
1411             hostCxnBlocked (cxn->myHost, cxn, rest) ;
1412             break ;
1413 
1414           case 502:
1415             warn ("%s:%d cxnsleep no permission to talk: %s", peerName,
1416                   cxn->ident, p) ;
1417             cxnSleepOrDie (cxn) ;
1418             hostIpFailed (cxn->myHost) ;
1419             hostCxnBlocked (cxn->myHost, cxn, rest) ;
1420             break ;
1421 
1422           default:
1423             warn ("%s:%d cxnsleep response unknown banner: %d %s", peerName,
1424                   cxn->ident, code, p) ;
1425             d_printf (1,"%s:%d Unknown response code: %d: %s\n",
1426                      hostPeerName (cxn->myHost),cxn->ident, code, p) ;
1427             cxnSleepOrDie (cxn) ;
1428             hostIpFailed (cxn->myHost) ;
1429             hostCxnBlocked (cxn->myHost, cxn, rest) ;
1430             break ;
1431         }
1432 
1433       if ( isOk )
1434 	{
1435           /* If we got this far and the connection is IPv6, remove
1436              the IPv4 addresses from the address list. */
1437           connectionIfIpv6DeleteIpv4Addr (cxn);
1438 
1439 	  if (hostUsername (cxn->myHost) != NULL
1440 	      && hostPassword (cxn->myHost) != NULL)
1441 	    issueAuthUser (e,cxn);
1442 	  else
1443 	    issueModeStream (e,cxn);
1444 	}
1445     }
1446   freeBufferArray (b) ;
1447 }
1448 
1449 
1450 
1451 
1452 
/*
 * Send the "AUTHINFO USER <username>" command for the connection's host
 * and set up a read for the reply, which getAuthUserResponse handles.
 *
 * The host's user name must not be NULL.  Failing to queue the write is
 * fatal (die); failing to queue the read logs a warning, releases the
 * read buffer array and calls cxnSleepOrDie().
 */
static void issueAuthUser (EndPoint e, Connection cxn)
{
  const char *user = hostUsername (cxn->myHost) ;
  Buffer authUserBuffer ;
  Buffer *authUserCmdBuffers, *readBuffers ;
  size_t lenBuff ;
  char *t ;

  /* sizeof counts the literal's terminating NUL, so this is exactly the
     room needed for the fixed text, the user name and the terminator. */
  lenBuff = sizeof ("AUTHINFO USER \r\n") + strlen (user) ;
  authUserBuffer = newBuffer (lenBuff) ;
  t = bufferBase (authUserBuffer) ;

  /* Bounded by the size requested above, so the write cannot overrun. */
  snprintf (t, lenBuff, "AUTHINFO USER %s\r\n", user) ;
  bufferSetDataSize (authUserBuffer, strlen (t)) ;

  authUserCmdBuffers = makeBufferArray (authUserBuffer, NULL) ;

  if ( !prepareWriteWithTimeout (e, authUserCmdBuffers, authUserIssued,
                                 cxn) )
    {
      die ("%s:%d fatal prepare write for authinfo user failed",
           hostPeerName (cxn->myHost), cxn->ident) ;
    }

  /* Discard whatever is left in the response buffer before reusing it. */
  bufferSetDataSize (cxn->respBuffer, 0) ;

  readBuffers = makeBufferArray (bufferTakeRef(cxn->respBuffer),NULL);

  if ( !prepareRead (e, readBuffers, getAuthUserResponse, cxn, 1) )
    {
      warn ("%s:%d cxnsleep prepare read failed", hostPeerName (cxn->myHost),
            cxn->ident) ;
      freeBufferArray (readBuffers) ;
      cxnSleepOrDie (cxn) ;
    }

}
1490 
1491 
1492 
1493 
1494 
1495 
/*
 * Send the "AUTHINFO PASS <password>" command for the connection's host
 * and set up a read for the reply, which getAuthPassResponse handles.
 *
 * The host's password must not be NULL.  Failing to queue the write is
 * fatal (die); failing to queue the read logs a warning, releases the
 * read buffer array and calls cxnSleepOrDie().
 */
static void issueAuthPass (EndPoint e, Connection cxn)
{
  const char *pass = hostPassword (cxn->myHost) ;
  Buffer authPassBuffer ;
  Buffer *authPassCmdBuffers, *readBuffers ;
  size_t lenBuff ;
  char *t ;

  /* sizeof counts the literal's terminating NUL, so this is exactly the
     room needed for the fixed text, the password and the terminator. */
  lenBuff = sizeof ("AUTHINFO PASS \r\n") + strlen (pass) ;
  authPassBuffer = newBuffer (lenBuff) ;
  t = bufferBase (authPassBuffer) ;

  /* Bounded by the size requested above, so the write cannot overrun. */
  snprintf (t, lenBuff, "AUTHINFO PASS %s\r\n", pass) ;
  bufferSetDataSize (authPassBuffer, strlen (t)) ;

  authPassCmdBuffers = makeBufferArray (authPassBuffer, NULL) ;

  if ( !prepareWriteWithTimeout (e, authPassCmdBuffers, authPassIssued,
                                 cxn) )
    {
      die ("%s:%d fatal prepare write for authinfo pass failed",
           hostPeerName (cxn->myHost), cxn->ident) ;
    }

  /* Discard whatever is left in the response buffer before reusing it. */
  bufferSetDataSize (cxn->respBuffer, 0) ;

  readBuffers = makeBufferArray (bufferTakeRef(cxn->respBuffer),NULL);

  if ( !prepareRead (e, readBuffers, getAuthPassResponse, cxn, 1) )
    {
      warn ("%s:%d cxnsleep prepare read failed", hostPeerName (cxn->myHost),
            cxn->ident) ;
      freeBufferArray (readBuffers) ;
      cxnSleepOrDie (cxn) ;
    }

}
1533 
1534 
1535 
1536 
1537 
1538 
/*
 * Send the "MODE STREAM" command on the connection and set up a read for
 * the reply, which getModeResponse handles.
 *
 * Failing to queue the write is fatal (die); failing to queue the read
 * logs a warning, releases the read buffer array and calls
 * cxnSleepOrDie().
 */
static void issueModeStream (EndPoint e, Connection cxn)
{
  Buffer cmdBuffer ;
  Buffer *cmdArray ;
  Buffer *replyArray ;
  char *cmdText ;

#define  MODE_CMD "MODE STREAM\r\n"

  /* Room for the command text plus its terminating NUL. */
  cmdBuffer = newBuffer (strlen (MODE_CMD) + 1) ;
  cmdText = bufferBase (cmdBuffer) ;

  /* now issue the MODE STREAM command */
  d_printf (1, "%s:%d Issuing the streaming command\n",
            hostPeerName (cxn->myHost), cxn->ident) ;

  strlcpy (cmdText, MODE_CMD, bufferSize (cmdBuffer)) ;
  bufferSetDataSize (cmdBuffer, strlen (cmdText)) ;

  cmdArray = makeBufferArray (cmdBuffer, NULL) ;
  if ( !prepareWriteWithTimeout (e, cmdArray, modeCmdIssued, cxn) )
    die ("%s:%d fatal prepare write for mode stream failed",
         hostPeerName (cxn->myHost), cxn->ident) ;

  /* Empty the response buffer and wait for the reply in it. */
  bufferSetDataSize (cxn->respBuffer, 0) ;
  replyArray = makeBufferArray (bufferTakeRef(cxn->respBuffer),NULL);

  if (prepareRead (e, replyArray, getModeResponse, cxn, 1))
    return ;

  warn ("%s:%d cxnsleep prepare read failed", hostPeerName (cxn->myHost),
        cxn->ident) ;
  freeBufferArray (replyArray) ;
  cxnSleepOrDie (cxn) ;
}
1579 
1580 
1581 
1582 
1583 
1584 /*
1585  *
1586  */
getAuthUserResponse(EndPoint e,IoStatus i,Buffer * b,void * d)1587 static void getAuthUserResponse (EndPoint e, IoStatus i, Buffer *b, void *d)
1588 {
1589   Connection cxn = (Connection) d ;
1590   int code ;
1591   char *p = bufferBase (b[0]) ;
1592   Buffer *buffers ;
1593   const char *peerName ;
1594 
1595   ASSERT (e == cxn->myEp) ;
1596   ASSERT (b [0] == cxn->respBuffer) ;
1597   ASSERT (b [1] == NULL) ;      /* only ever one buffer on this read */
1598   ASSERT (cxn->state == cxnConnectingS) ;
1599   VALIDATE_CONNECTION (cxn) ;
1600 
1601   peerName = hostPeerName (cxn->myHost) ;
1602 
1603   bufferAddNullByte (b[0]) ;
1604 
1605   d_printf (1,"%s:%d Processing authinfo user response: %s", /* no NL */
1606 	    hostPeerName (cxn->myHost), cxn->ident, p) ;
1607 
1608   if (i == IoDone && writeIsPending (cxn->myEp))
1609     {
1610       /* badness. should never happen */
1611       warn ("%s:%d cxnsleep authinfo command still pending", peerName,
1612             cxn->ident) ;
1613 
1614       cxnSleepOrDie (cxn) ;
1615     }
1616   else if (i != IoDone)
1617     {
1618       if (i != IoEOF)
1619 	{
1620 	  errno = endPointErrno (e) ;
1621           syswarn ("%s:%d cxnsleep can't read response", peerName, cxn->ident);
1622 	}
1623       cxnSleepOrDie (cxn) ;
1624     }
1625   else if (strchr (p, '\n') == NULL)
1626     {
1627       /* partial read */
1628       expandBuffer (b [0], BUFFER_EXPAND_AMOUNT) ;
1629 
1630       buffers = makeBufferArray (bufferTakeRef (b [0]), NULL) ;
1631       if ( !prepareRead (e, buffers, getAuthUserResponse, cxn, 1) )
1632 	{
1633           warn ("%s:%d cxnsleep prepare read failed", peerName, cxn->ident) ;
1634 	  freeBufferArray (buffers) ;
1635 	  cxnSleepOrDie (cxn) ;
1636 	}
1637     }
1638   else
1639     {
1640       clearTimer (cxn->readBlockedTimerId) ;
1641 
1642       if ( !getNntpResponse (p, &code, NULL) )
1643 	{
1644           warn ("%s:%d cxnsleep response to AUTHINFO USER: %s", peerName,
1645                 cxn->ident, p) ;
1646 
1647 	  cxnSleepOrDie (cxn) ;
1648 	}
1649       else
1650 	{
1651           notice ("%s:%d connected", peerName, cxn->ident) ;
1652 
1653 	  switch (code)
1654 	    {
1655 	    case 381:
1656 	      issueAuthPass (e,cxn);
1657 	      break ;
1658 
1659 	    default:
1660               warn ("%s:%d cxnsleep response to AUTHINFO USER: %s", peerName,
1661                     cxn->ident, p) ;
1662 	      cxn->authenticated = true;
1663 	      issueModeStream (e,cxn);
1664 	      break ;
1665 	    }
1666 
1667 	}
1668     }
1669 }
1670 
1671 
1672 
1673 
1674 
1675 /*
1676  *
1677  */
getAuthPassResponse(EndPoint e,IoStatus i,Buffer * b,void * d)1678 static void getAuthPassResponse (EndPoint e, IoStatus i, Buffer *b, void *d)
1679 {
1680   Connection cxn = (Connection) d ;
1681   int code ;
1682   char *p = bufferBase (b[0]) ;
1683   Buffer *buffers ;
1684   const char *peerName ;
1685 
1686   ASSERT (e == cxn->myEp) ;
1687   ASSERT (b [0] == cxn->respBuffer) ;
1688   ASSERT (b [1] == NULL) ;      /* only ever one buffer on this read */
1689   ASSERT (cxn->state == cxnConnectingS) ;
1690   VALIDATE_CONNECTION (cxn) ;
1691 
1692   peerName = hostPeerName (cxn->myHost) ;
1693 
1694   bufferAddNullByte (b[0]) ;
1695 
1696   d_printf (1,"%s:%d Processing authinfo pass response: %s", /* no NL */
1697 	    hostPeerName (cxn->myHost), cxn->ident, p) ;
1698 
1699   if (i == IoDone && writeIsPending (cxn->myEp))
1700     {
1701       /* badness. should never happen */
1702       warn ("%s:%d cxnsleep authinfo command still pending", peerName,
1703             cxn->ident) ;
1704 
1705       cxnSleepOrDie (cxn) ;
1706     }
1707   else if (i != IoDone)
1708     {
1709       if (i != IoEOF)
1710 	{
1711 	  errno = endPointErrno (e) ;
1712           syswarn ("%s:%d cxnsleep can't read response", peerName, cxn->ident);
1713 	}
1714       cxnSleepOrDie (cxn) ;
1715     }
1716   else if (strchr (p, '\n') == NULL)
1717     {
1718       /* partial read */
1719       expandBuffer (b [0], BUFFER_EXPAND_AMOUNT) ;
1720 
1721       buffers = makeBufferArray (bufferTakeRef (b [0]), NULL) ;
1722       if ( !prepareRead (e, buffers, getAuthPassResponse, cxn, 1) )
1723 	{
1724           warn ("%s:%d cxnsleep prepare read failed", peerName, cxn->ident) ;
1725 	  freeBufferArray (buffers) ;
1726 	  cxnSleepOrDie (cxn) ;
1727 	}
1728     }
1729   else
1730     {
1731       clearTimer (cxn->readBlockedTimerId) ;
1732 
1733       if ( !getNntpResponse (p, &code, NULL) )
1734 	{
1735           warn ("%s:%d cxnsleep response to AUTHINFO PASS: %s", peerName,
1736                 cxn->ident, p) ;
1737 
1738 	  cxnSleepOrDie (cxn) ;
1739 	}
1740       else
1741 	{
1742 	  switch (code)
1743 	    {
1744 	    case 281:
1745               notice ("%s:%d authenticated", peerName, cxn->ident) ;
1746 	      cxn->authenticated = true ;
1747 	      issueModeStream (e,cxn);
1748 	      break ;
1749 
1750 	    default:
1751               warn ("%s:%d cxnsleep response to AUTHINFO PASS: %s", peerName,
1752                     cxn->ident, p) ;
1753 	      cxnSleepOrDie (cxn) ;
1754 	      break ;
1755 	    }
1756 
1757 	}
1758     }
1759 }
1760 
1761 
1762 
1763 
1764 
/*
 * Process the remote's response to our MODE STREAM command. This is where
 * the Connection moves into the cxnFeedingS state. If the remote has given
 * us a good welcome banner, but then immediately dropped the connection,
 * we'll arrive here with the MODE STREAM command still queued up.
 *
 * Parameters (EndPoint callback signature):
 *   e - the Connection's EndPoint
 *   i - the i/o status of the read
 *   b - NULL-terminated buffer array holding just the response buffer;
 *       it is released with freeBufferArray() before returning
 *   d - the Connection, passed as client data
 *
 * On a parsable response the connection is considered fully connected:
 * the streaming settings and maxCheck are chosen from the response code,
 * the connection times are recorded, the state becomes idle (no articles
 * queued) or cxnFeedingS, and a read handled by responseIsRead is set up
 * on the emptied response buffer.
 */
static void getModeResponse (EndPoint e, IoStatus i, Buffer *b, void *d)
{
  Connection cxn = (Connection) d ;
  int code ;
  char *p = bufferBase (b[0]) ;
  Buffer *buffers ;
  const char *peerName ;

  ASSERT (e == cxn->myEp) ;
  ASSERT (b [0] == cxn->respBuffer) ;
  ASSERT (b [1] == NULL) ;      /* only ever one buffer on this read */
  ASSERT (cxn->state == cxnConnectingS) ;
  VALIDATE_CONNECTION (cxn) ;

  peerName = hostPeerName (cxn->myHost) ;

  /* Terminate the data so it can be handled as a C string below. */
  bufferAddNullByte (b[0]) ;

  d_printf (1,"%s:%d Processing mode response: %s", /* no NL */
           hostPeerName (cxn->myHost), cxn->ident, p) ;

  if (i == IoDone && writeIsPending (cxn->myEp))
    {                           /* badness. should never happen */
      warn ("%s:%d cxnsleep mode stream command still pending", peerName,
            cxn->ident) ;

      cxnSleepOrDie (cxn) ;
    }
  else if (i != IoDone)
    {
      /* An EOF is not worth a warning; any other failure is logged. */
      if (i != IoEOF)
        {
          errno = endPointErrno (e) ;
          syswarn ("%s:%d cxnsleep can't read response", peerName, cxn->ident);
        }
      cxnSleepOrDie (cxn) ;
    }
  else if (strchr (p, '\n') == NULL)
    {                           /* partial read */
      expandBuffer (b [0], BUFFER_EXPAND_AMOUNT) ;

      /* Queue another read into the same, now larger, buffer. */
      buffers = makeBufferArray (bufferTakeRef (b [0]), NULL) ;
      if ( !prepareRead (e, buffers, getModeResponse, cxn, 1) )
        {
          warn ("%s:%d cxnsleep prepare read failed", peerName, cxn->ident) ;
          freeBufferArray (buffers) ;
          cxnSleepOrDie (cxn) ;
        }
    }
  else
    {
      /* A complete line has arrived, so the read is no longer blocked. */
      clearTimer (cxn->readBlockedTimerId) ;

      if ( !getNntpResponse (p, &code, NULL) )
        {
          warn ("%s:%d cxnsleep response to MODE STREAM: %s", peerName,
                cxn->ident, p) ;

          cxnSleepOrDie (cxn) ;
        }
      else
        {
          /* Skipped when authenticated is already set (the AUTHINFO
             handlers set it before MODE STREAM is issued). */
	  if (!cxn->authenticated)
            notice ("%s:%d connected", peerName, cxn->ident) ;

          switch (code)
            {
              case 203:             /* will do streaming */
                hostRemoteStreams (cxn->myHost, cxn, true) ;

                /* Only stream if our own host configuration wants to. */
                if (hostWantsStreaming (cxn->myHost))
                  {
                    cxn->doesStreaming = true ;
                    cxn->maxCheck = hostMaxChecks (cxn->myHost) ;
                  }
                else
                  cxn->maxCheck = 1 ;

                break ;

              default:                      /* won't do it */
                hostRemoteStreams (cxn->myHost, cxn, false) ;
                cxn->maxCheck = 1 ;
                break ;
            }

          /* now we consider ourselves completely connected. */
          cxn->timeCon = theTime();
          cxn->timeCon_checkpoint = theTime();

          /* Go idle if nothing is queued, otherwise start feeding. */
          if (cxn->articleQTotal == 0)
            cxnIdle (cxn) ;
          else
            cxn->state = cxnFeedingS ;

              /* one for the connection and one for the buffer array */
          ASSERT (cxn->authenticated || bufferRefCount (cxn->respBuffer) == 2) ;

          /* there was only one line in there, right? */
          bufferSetDataSize (cxn->respBuffer, 0) ;
          buffers = makeBufferArray (bufferTakeRef (cxn->respBuffer), NULL) ;

              /* sleepTimeout gets changed at each failed attempt, so reset. */
          cxn->sleepTimeout = init_reconnect_period ;

          if ( !prepareRead (cxn->myEp, buffers, responseIsRead, cxn, 1) )
            {
              freeBufferArray (buffers) ;

              cxnSleepOrDie (cxn) ;
            }
          else
            {
              /* now we wait for articles from our Host, or we have some
                 articles already. On infrequently used connections, the
                 network link is torn down and rebuilt as needed. So we may
                 be rebuilding the connection here in which case we have an
                 article to send. */
              if (writesNeeded (cxn) || hostGimmeArticle (cxn->myHost,cxn))
                doSomeWrites (cxn) ;
            }
        }
    }

  /* The array handed to this callback is ours to release. */
  freeBufferArray (b) ;
}
1897 
1898 
1899 
1900 
1901 
1902 /*
1903  * called when a response has been read from the socket. This is
1904  * where the bulk of the processing starts.
1905  */
responseIsRead(EndPoint e,IoStatus i,Buffer * b,void * d)1906 static void responseIsRead (EndPoint e, IoStatus i, Buffer *b, void *d)
1907 {
1908   Connection cxn = (Connection) d ;
1909   char *response ;
1910   char *endr ;
1911   char *bufBase ;
1912   unsigned int respSize ;
1913   int code ;
1914   char *rest = NULL ;
1915   Buffer buf ;
1916   Buffer *bArr ;
1917   const char *peerName ;
1918 
1919   ASSERT (e == cxn->myEp) ;
1920   ASSERT (b != NULL) ;
1921   ASSERT (b [1] == NULL) ;
1922   ASSERT (b [0] == cxn->respBuffer) ;
1923   ASSERT (cxn->state == cxnFeedingS ||
1924           cxn->state == cxnIdleS    ||
1925           cxn->state == cxnClosingS ||
1926           cxn->state == cxnFlushingS) ;
1927   VALIDATE_CONNECTION (cxn) ;
1928 
1929   bufferAddNullByte (b [0]) ;
1930 
1931   peerName = hostPeerName (cxn->myHost) ;
1932 
1933   if (i != IoDone)
1934     {                           /* uh oh. */
1935       if (i != IoEOF)
1936         {
1937           errno = endPointErrno (e) ;
1938           syswarn ("%s:%d cxnsleep can't read response", peerName, cxn->ident);
1939         }
1940       freeBufferArray (b) ;
1941 
1942       cxnLogStats (cxn,true) ;
1943 
1944       if (cxn->state == cxnClosingS)
1945         {
1946           cxnDead (cxn) ;
1947           delConnection (cxn) ;
1948         }
1949       else
1950         cxnSleep (cxn) ;
1951 
1952       return ;
1953     }
1954 
1955   buf = b [0] ;
1956   bufBase = bufferBase (buf) ;
1957 
1958   /* check that we have (at least) a full line response. If not expand
1959      the buffer and resubmit the read. */
1960   if (strchr (bufBase, '\n') == 0)
1961     {
1962       if (!expandBuffer (buf, BUFFER_EXPAND_AMOUNT))
1963         {
1964           warn ("%s:%d cxnsleep can't expand input buffer", peerName,
1965                 cxn->ident) ;
1966           freeBufferArray (b) ;
1967 
1968           cxnSleepOrDie (cxn) ;
1969         }
1970       else if ( !prepareRead (cxn->myEp, b, responseIsRead, cxn, 1))
1971         {
1972           warn ("%s:%d cxnsleep prepare read failed", peerName, cxn->ident) ;
1973           freeBufferArray (b) ;
1974 
1975           cxnSleepOrDie (cxn) ;
1976         }
1977 
1978       return ;
1979     }
1980 
1981 
1982   freeBufferArray (b) ; /* connection still has reference to buffer */
1983 
1984 
1985   /*
1986    * Now process all the full responses that we have.
1987    */
1988   response = bufBase ;
1989   respSize = bufferDataSize (cxn->respBuffer) ;
1990 
1991   while ((endr = strchr (response, '\n')) != NULL)
1992     {
1993       char *next = endr + 1 ;
1994 
1995       if (*next == '\r')
1996         next++ ;
1997 
1998       endr-- ;
1999       if (*endr != '\r')
2000         endr++ ;
2001 
2002       if (next - endr != 2 && !cxn->loggedNoCr)
2003         {
2004           /* only a newline there. we'll live with it */
2005           warn ("%s:%d remote not giving out CR characters", peerName,
2006                 cxn->ident) ;
2007           cxn->loggedNoCr = true ;
2008         }
2009 
2010       *endr = '\0' ;
2011 
2012       if ( !getNntpResponse (response, &code, &rest) )
2013         {
2014           warn ("%s:%d cxnsleep response format: %s", peerName, cxn->ident,
2015                 response) ;
2016           cxnSleepOrDie (cxn) ;
2017 
2018           return ;
2019         }
2020 
2021       d_printf (5,"%s:%d Response %d: %s\n", peerName, cxn->ident, code, response) ;
2022 
2023       /* now handle the response code. I'm not using symbolic names on
2024          purpose--the numbers are all you see in the RFC's. */
2025       switch (code)
2026         {
2027           case 205:             /* OK response to QUIT. */
2028             processResponse205 (cxn, response) ;
2029             break ;
2030 
2031 
2032 
2033             /* These three are from the CHECK command */
2034           case 238:             /* no such article found */
2035 	    /* Do not incrFilter (cxn) now, wait till after
2036 	       subsequent TAKETHIS */
2037             processResponse238 (cxn, response) ;
2038             break ;
2039 
2040           case 431:             /* try again later (also for TAKETHIS) */
2041             decrFilter (cxn) ;
2042             if (hostDropDeferred (cxn->myHost))
2043                 processResponse438 (cxn, response) ;
2044             else
2045                 processResponse431 (cxn, response) ;
2046             break ;
2047 
2048           case 438:             /* already have it */
2049             decrFilter (cxn) ;
2050             processResponse438 (cxn, response) ;
2051             break ;
2052 
2053 
2054 
2055             /* These are from the TAKETHIS command */
2056           case 239:             /* article transferred OK */
2057             incrFilter (cxn) ;
2058             processResponse239 (cxn, response) ;
2059             break ;
2060 
2061           case 439:             /* article rejected */
2062             decrFilter (cxn) ;
2063             processResponse439 (cxn, response) ;
2064             break ;
2065 
2066 
2067 
2068             /* These are from the IHAVE command */
2069           case 335:             /* send article */
2070             processResponse335 (cxn, response) ;
2071             break ;
2072 
2073           case 435:             /* article not wanted */
2074             processResponse435 (cxn, response) ;
2075             break ;
2076 
2077           case 436:             /* transfer failed try again later */
2078             if (cxn->takeRespHead == NULL && hostDropDeferred (cxn->myHost))
2079                 processResponse435 (cxn, response) ;
2080             else
2081                 processResponse436 (cxn, response) ;
2082             break ;
2083 
2084           case 437:             /* article rejected */
2085             processResponse437 (cxn, response) ;
2086             break ;
2087 
2088           case 400:             /* has stopped accepting articles */
2089             processResponse400 (cxn, response) ;
2090             break ;
2091 
2092 
2093 
2094           case 235:             /* article transfered OK (IHAVE-body) */
2095             processResponse235 (cxn, response) ;
2096             break ;
2097 
2098 
2099           case 480:             /* Transfer permission denied. */
2100             processResponse480  (cxn,response) ;
2101             break ;
2102 
2103           case 503:             /* remote timeout. */
2104             processResponse503  (cxn,response) ;
2105             break ;
2106 
2107           default:
2108             warn ("%s:%d cxnsleep response unknown: %d %s", peerName,
2109                   cxn->ident, code, response) ;
2110             cxnSleepOrDie (cxn) ;
2111             break ;
2112         }
2113 
2114       VALIDATE_CONNECTION (cxn) ;
2115 
2116       if (cxn->state != cxnFeedingS && cxn->state != cxnClosingS &&
2117           cxn->state != cxnFlushingS && cxn->state != cxnIdleS /* XXX */)
2118         break ;                 /* connection is terminated */
2119 
2120       response = next ;
2121     }
2122 
2123   d_printf (5,"%s:%d done with responses\n",hostPeerName (cxn->myHost),
2124            cxn->ident) ;
2125 
2126   switch (cxn->state)
2127     {
2128       case cxnIdleS:
2129       case cxnFeedingS:
2130       case cxnClosingS:
2131       case cxnFlushingS:
2132         /* see if we need to drop in to or out of no-CHECK mode */
2133         if (cxn->state == cxnFeedingS && cxn->doesStreaming)
2134           {
2135             if ((cxn->filterValue > cxn->onThreshold) && cxn->needsChecks) {
2136 	      cxn->needsChecks = false;
2137               hostLogNoCheckMode (cxn->myHost, true,
2138 				  cxn->offThreshold/cxn->lowPassFilter,
2139 				  cxn->filterValue/cxn->lowPassFilter,
2140 				  cxn->onThreshold/cxn->lowPassFilter) ;
2141 	      /* on and log */
2142             } else if ((cxn->filterValue < cxn->offThreshold) &&
2143                      !cxn->needsChecks) {
2144 	      cxn->needsChecks = true;
2145               hostLogNoCheckMode (cxn->myHost, false,
2146 				  cxn->offThreshold/cxn->lowPassFilter,
2147 				  cxn->filterValue/cxn->lowPassFilter,
2148 				  cxn->onThreshold/cxn->lowPassFilter) ;
2149 	      /* off and log */
2150 	    }
2151           }
2152 
2153         /* Now handle possible remaining partial response and set up for
2154            next read. */
2155         if (*response != '\0')
2156           {                       /* partial response */
2157             unsigned int leftAmt = respSize - (response - bufBase) ;
2158 
2159             d_printf (2,"%s:%d handling a partial response\n",
2160                      hostPeerName (cxn->myHost),cxn->ident) ;
2161 
2162             /* first we shift what's left in the buffer down to the
2163                bottom, if needed, or just expand the buffer */
2164             if (response != bufBase)
2165               {
2166                 /* so next read appends */
2167                 memmove (bufBase, response, leftAmt) ;
2168                 bufferSetDataSize (cxn->respBuffer, leftAmt) ;
2169               }
2170             else if (!expandBuffer (cxn->respBuffer, BUFFER_EXPAND_AMOUNT))
2171               die ("%s:%d cxnsleep can't expand input buffer", peerName,
2172                    cxn->ident) ;
2173           }
2174         else
2175           bufferSetDataSize (cxn->respBuffer, 0) ;
2176 
2177         bArr = makeBufferArray (bufferTakeRef (cxn->respBuffer), NULL) ;
2178 
2179         if ( !prepareRead (e, bArr, responseIsRead, cxn, 1) )
2180           {
2181             warn ("%s:%d cxnsleep prepare read failed", peerName, cxn->ident) ;
2182             freeBufferArray (bArr) ;
2183             cxnWait (cxn) ;
2184             return ;
2185           }
2186         else
2187           {
2188             /* only setup the timer if we're still waiting for a response
2189                to something. There's not necessarily a 1-to-1 mapping
2190                between reads and writes in streaming mode. May have been
2191                set already above (that would be unlikely I think). */
2192             VALIDATE_CONNECTION (cxn) ;
2193 
2194             d_printf (5,"%s:%d about to do some writes\n",
2195                      hostPeerName (cxn->myHost),cxn->ident) ;
2196 
2197             doSomeWrites (cxn) ;
2198 
2199             /* If the read timer is (still) running, update it to give
2200                those terminally slow hosts that take forever to drain
2201                the network buffers and just dribble out responses the
2202                benefit of the doubt.  XXX - maybe should just increase
2203 	       timeout for these! */
2204             if (cxn->readBlockedTimerId)
2205               cxn->readBlockedTimerId = updateSleep (cxn->readBlockedTimerId,
2206                                                      responseTimeoutCbk,
2207                                                      cxn->readTimeout,
2208                                                      cxn) ;
2209           }
2210         VALIDATE_CONNECTION (cxn) ;
2211         break ;
2212 
2213       case cxnWaitingS:         /* presumably after a code 205 or 400 */
2214       case cxnConnectingS:      /* presumably after a code 205 or 400 */
2215       case cxnSleepingS:        /* probably after a 480 */
2216         break ;
2217 
2218       case cxnDeadS:
2219         delConnection (cxn) ;
2220         break ;
2221 
2222       case cxnStartingS:
2223       default:
2224         die ("Bad connection state: %s\n",stateToString (cxn->state)) ;
2225     }
2226 }
2227 
2228 
2229 
2230 
2231 
2232 /*
2233  * called when the write of the QUIT command has completed.
2234  */
quitWritten(EndPoint e,IoStatus i,Buffer * b,void * d)2235 static void quitWritten (EndPoint e, IoStatus i, Buffer *b, void *d)
2236 {
2237   Connection cxn = (Connection) d ;
2238   const char *peerName ;
2239 
2240   peerName = hostPeerName (cxn->myHost) ;
2241 
2242   clearTimer (cxn->writeBlockedTimerId) ;
2243 
2244   ASSERT (cxn->myEp == e) ;
2245   VALIDATE_CONNECTION (cxn) ;
2246 
2247   if (i != IoDone)
2248     {
2249       errno = endPointErrno (e) ;
2250       syswarn ("%s:%d cxnsleep can't write QUIT", peerName, cxn->ident) ;
2251       if (cxn->state == cxnClosingS)
2252         {
2253           cxnDead (cxn) ;
2254           delConnection (cxn) ;
2255         }
2256       else
2257         cxnWait (cxn) ;
2258     }
2259   else
2260     /* The QUIT command has been sent, so start the response timer. */
2261     initReadBlockedTimeout (cxn) ;
2262 
2263   freeBufferArray (b) ;
2264 }
2265 
2266 
2267 
2268 
2269 
2270 /*
2271  * called when the write of the IHAVE-body data is finished
2272  */
ihaveBodyDone(EndPoint e,IoStatus i,Buffer * b,void * d)2273 static void ihaveBodyDone (EndPoint e, IoStatus i, Buffer *b, void *d)
2274 {
2275   Connection cxn = (Connection) d ;
2276 
2277   ASSERT (e == cxn->myEp) ;
2278 
2279   clearTimer (cxn->writeBlockedTimerId) ;
2280 
2281   if (i != IoDone)
2282     {
2283       errno = endPointErrno (e) ;
2284       syswarn ("%s:%d cxnsleep can't write IHAVE body",
2285                hostPeerName (cxn->myHost), cxn->ident) ;
2286 
2287       cxnLogStats (cxn,true) ;
2288 
2289       if (cxn->state == cxnClosingS)
2290         {
2291           cxnDead (cxn) ;
2292           delConnection (cxn) ;
2293         }
2294       else
2295         cxnSleep (cxn) ;
2296     }
2297   else
2298     {
2299       /* Some hosts return a response even before we're done sending, so don't
2300          go idle until here. */
2301       if (cxn->state == cxnFeedingS && cxn->articleQTotal == 0)
2302         cxnIdle (cxn) ;
2303       else
2304         /* The command set has been sent, so start the response timer. */
2305         initReadBlockedTimeout (cxn) ;
2306     }
2307 
2308   freeBufferArray (b) ;
2309 
2310   return ;
2311 }
2312 
2313 
2314 
2315 
2316 
2317 /*
2318  * Called when a command set (IHAVE, CHECK, TAKETHIS) has been
2319  * written to the remote.
2320  */
commandWriteDone(EndPoint e,IoStatus i,Buffer * b,void * d)2321 static void commandWriteDone (EndPoint e, IoStatus i, Buffer *b, void *d)
2322 {
2323   Connection cxn = (Connection) d ;
2324   const char *peerName ;
2325 
2326   ASSERT (e == cxn->myEp) ;
2327 
2328   peerName = hostPeerName (cxn->myHost) ;
2329 
2330   freeBufferArray (b) ;
2331 
2332   clearTimer (cxn->writeBlockedTimerId) ;
2333 
2334   if (i != IoDone)
2335     {
2336       errno = endPointErrno (e) ;
2337       syswarn ("%s:%d cxnsleep can't write command", peerName, cxn->ident) ;
2338 
2339       cxnLogStats (cxn,true) ;
2340 
2341       if (cxn->state == cxnClosingS)
2342         {
2343           cxnDead (cxn) ;
2344           delConnection (cxn) ;
2345         }
2346       else
2347         {
2348 	  /* XXX - so cxnSleep() doesn't die in VALIDATE_CONNECTION () */
2349           deferAllArticles (cxn) ;
2350           cxnIdle (cxn) ;
2351 
2352           cxnSleep (cxn) ;
2353         }
2354     }
2355   else
2356     {
2357       /* Some hosts return a response even before we're done sending, so don't
2358          go idle until here */
2359       if (cxn->state == cxnFeedingS && cxn->articleQTotal == 0)
2360         cxnIdle (cxn) ;
2361       else
2362         /* The command set has been sent, so start the response timer.
2363            XXX - we'd like finer grained control */
2364         initReadBlockedTimeout (cxn) ;
2365 
2366       if ( cxn->doesStreaming )
2367         doSomeWrites (cxn) ;        /* pump data as fast as possible */
2368                                     /* XXX - will clear the read timeout */
2369     }
2370 }
2371 
2372 
2373 
2374 
2375 
2376 /*
2377  * Called when the MODE STREAM command has been written down the pipe.
2378  */
modeCmdIssued(EndPoint e,IoStatus i,Buffer * b,void * d)2379 static void modeCmdIssued (EndPoint e, IoStatus i, Buffer *b, void *d)
2380 {
2381   Connection cxn = (Connection) d ;
2382 
2383   ASSERT (e == cxn->myEp) ;
2384 
2385   clearTimer (cxn->writeBlockedTimerId) ;
2386 
2387   /* The mode command has been sent, so start the response timer */
2388   initReadBlockedTimeout (cxn) ;
2389 
2390   if (i != IoDone)
2391     {
2392       d_printf (1,"%s:%d MODE STREAM command failed to write\n",
2393                hostPeerName (cxn->myHost), cxn->ident) ;
2394 
2395       syswarn ("%s:%d cxnsleep can't write MODE STREAM",
2396                hostPeerName (cxn->myHost), cxn->ident) ;
2397 
2398       cxnSleepOrDie (cxn) ;
2399     }
2400 
2401   freeBufferArray (b) ;
2402 }
2403 
2404 
2405 
2406 
2407 
2408 /*
2409  * Called when the AUTHINFO USER command has been written down the pipe.
2410  */
authUserIssued(EndPoint e,IoStatus i,Buffer * b,void * d)2411 static void authUserIssued (EndPoint e, IoStatus i, Buffer *b, void *d)
2412 {
2413   Connection cxn = (Connection) d ;
2414 
2415   ASSERT (e == cxn->myEp) ;
2416 
2417   clearTimer (cxn->writeBlockedTimerId) ;
2418 
2419   /* The authinfo user command has been sent, so start the response timer */
2420   initReadBlockedTimeout (cxn) ;
2421 
2422   if (i != IoDone)
2423     {
2424       d_printf (1,"%s:%d AUTHINFO USER command failed to write\n",
2425                hostPeerName (cxn->myHost), cxn->ident) ;
2426 
2427       syswarn ("%s:%d cxnsleep can't write AUTHINFO USER",
2428                hostPeerName (cxn->myHost), cxn->ident) ;
2429 
2430       cxnSleepOrDie (cxn) ;
2431     }
2432 
2433   freeBufferArray (b) ;
2434 }
2435 
2436 
2437 
2438 
2439 
2440 
2441 /*
2442  * Called when the AUTHINFO USER command has been written down the pipe.
2443  */
authPassIssued(EndPoint e,IoStatus i,Buffer * b,void * d)2444 static void authPassIssued (EndPoint e, IoStatus i, Buffer *b, void *d)
2445 {
2446   Connection cxn = (Connection) d ;
2447 
2448   ASSERT (e == cxn->myEp) ;
2449 
2450   clearTimer (cxn->writeBlockedTimerId) ;
2451 
2452   /* The authinfo pass command has been sent, so start the response timer */
2453   initReadBlockedTimeout (cxn) ;
2454 
2455   if (i != IoDone)
2456     {
2457       d_printf (1,"%s:%d AUTHINFO PASS command failed to write\n",
2458                hostPeerName (cxn->myHost), cxn->ident) ;
2459 
2460       syswarn ("%s:%d cxnsleep can't write AUTHINFO PASS",
2461                hostPeerName (cxn->myHost), cxn->ident) ;
2462 
2463       cxnSleepOrDie (cxn) ;
2464     }
2465 
2466   freeBufferArray (b) ;
2467 }
2468 
2469 
2470 
2471 
2472 
2473 
2474 /*
2475  * Called whenever some amount of data has been written to the pipe but
2476  * more data remains to be written
2477  */
writeProgress(EndPoint e UNUSED,IoStatus i,Buffer * b UNUSED,void * d)2478 static void writeProgress (EndPoint e UNUSED, IoStatus i, Buffer *b UNUSED,
2479                            void *d)
2480 {
2481   Connection cxn = (Connection) d ;
2482 
2483   ASSERT (i == IoProgress) ;
2484 
2485   if (cxn->writeTimeout > 0)
2486     cxn->writeBlockedTimerId = updateSleep (cxn->writeBlockedTimerId,
2487                                             writeTimeoutCbk, cxn->writeTimeout,
2488                                             cxn) ;
2489 }
2490 
2491 
2492 
2493 
2494 
2495 /*
2496  * Timers.
2497  */
2498 
2499 /*
2500  * This is called when the timeout for the response from the remote
2501  * goes off. We tear down the connection and notify our host.
2502  */
responseTimeoutCbk(TimeoutId id,void * data)2503 static void responseTimeoutCbk (TimeoutId id, void *data)
2504 {
2505   Connection cxn = (Connection) data ;
2506   const char *peerName ;
2507 
2508   ASSERT (id == cxn->readBlockedTimerId) ;
2509   ASSERT (cxn->state == cxnConnectingS ||
2510           cxn->state == cxnFeedingS ||
2511           cxn->state == cxnFlushingS ||
2512           cxn->state == cxnClosingS) ;
2513   VALIDATE_CONNECTION (cxn) ;
2514 
2515   /* XXX - let abortConnection clear readBlockedTimerId, otherwise
2516      VALIDATE_CONNECTION() will croak */
2517 
2518   peerName = hostPeerName (cxn->myHost) ;
2519 
2520   warn ("%s:%d cxnsleep non-responsive connection", peerName, cxn->ident) ;
2521   d_printf (1,"%s:%d shutting down non-responsive connection\n",
2522            hostPeerName (cxn->myHost), cxn->ident) ;
2523 
2524   cxnLogStats (cxn,true) ;
2525 
2526   if (cxn->state == cxnClosingS)
2527     {
2528       abortConnection (cxn) ;
2529       delConnection (cxn) ;
2530     }
2531   else
2532     cxnSleep (cxn) ;              /* will notify the Host */
2533 }
2534 
2535 
2536 
2537 
2538 
2539 /*
2540  * This is called when the data write timeout for the remote
2541  * goes off. We tear down the connection and notify our host.
2542  */
writeTimeoutCbk(TimeoutId id,void * data)2543 static void writeTimeoutCbk (TimeoutId id, void *data)
2544 {
2545   Connection cxn = (Connection) data ;
2546   const char *peerName ;
2547 
2548   ASSERT (id == cxn->writeBlockedTimerId) ;
2549   ASSERT (cxn->state == cxnConnectingS ||
2550           cxn->state == cxnFeedingS ||
2551           cxn->state == cxnFlushingS ||
2552           cxn->state == cxnClosingS) ;
2553   VALIDATE_CONNECTION (cxn) ;
2554 
2555   /* XXX - let abortConnection clear writeBlockedTimerId, otherwise
2556      VALIDATE_CONNECTION() will croak */
2557 
2558   peerName = hostPeerName (cxn->myHost) ;
2559 
2560   warn ("%s:%d cxnsleep write timeout", peerName, cxn->ident) ;
2561   d_printf (1,"%s:%d shutting down non-responsive connection\n",
2562            hostPeerName (cxn->myHost), cxn->ident) ;
2563 
2564   cxnLogStats (cxn,true) ;
2565 
2566   if (cxn->state == cxnClosingS)
2567     {
2568       abortConnection (cxn) ;
2569       delConnection (cxn) ;
2570     }
2571   else
2572     cxnSleep (cxn) ;              /* will notify the Host */
2573 }
2574 
2575 
2576 
2577 
2578 
2579 /*
2580  * Called by the EndPoint class when the timer goes off
2581  */
reopenTimeoutCbk(TimeoutId id,void * data)2582 static void reopenTimeoutCbk (TimeoutId id, void *data)
2583 {
2584   Connection cxn = (Connection) data ;
2585 
2586   ASSERT (id == cxn->sleepTimerId) ;
2587 
2588   cxn->sleepTimerId = 0 ;
2589 
2590   if (cxn->state != cxnSleepingS)
2591     {
2592       warn ("%s:%d cxnsleep connection in bad state: %s",
2593             hostPeerName (cxn->myHost), cxn->ident,
2594             stateToString (cxn->state)) ;
2595       cxnSleepOrDie (cxn) ;
2596     }
2597   else
2598     cxnConnect (cxn) ;
2599 }
2600 
2601 
2602 
2603 
2604 
2605 /*
2606  * timeout callback to close down long running connection.
2607  */
flushCxnCbk(TimeoutId id,void * data)2608 static void flushCxnCbk (TimeoutId id, void *data)
2609 {
2610   Connection cxn = (Connection) data ;
2611 
2612   ASSERT (id == cxn->flushTimerId) ;
2613   VALIDATE_CONNECTION (cxn) ;
2614 
2615   cxn->flushTimerId = 0 ;
2616 
2617   if (!(cxn->state == cxnFeedingS || cxn->state == cxnConnectingS ||
2618         cxn->state == cxnIdleS))
2619     {
2620       warn ("%s:%d cxnsleep connection in bad state: %s",
2621             hostPeerName (cxn->myHost), cxn->ident,
2622             stateToString (cxn->state)) ;
2623       cxnSleepOrDie (cxn) ;
2624     }
2625   else
2626     {
2627       d_printf (1,"%s:%d Handling periodic connection close.\n",
2628                hostPeerName (cxn->myHost), cxn->ident) ;
2629 
2630       notice ("%s:%d periodic close", hostPeerName (cxn->myHost), cxn->ident) ;
2631 
2632       cxnFlush (cxn) ;
2633     }
2634 }
2635 
2636 
2637 
2638 
2639 
2640 /*
2641  * Timer callback for when the connection has not received an
2642  * article from INN. When that happens we tear down the network
2643  * connection to help recycle fds
2644  */
articleTimeoutCbk(TimeoutId id,void * data)2645 static void articleTimeoutCbk (TimeoutId id, void *data)
2646 {
2647   Connection cxn = (Connection) data ;
2648   const char *peerName = hostPeerName (cxn->myHost) ;
2649 
2650   ASSERT (cxn->artReceiptTimerId == id) ;
2651   VALIDATE_CONNECTION (cxn) ;
2652 
2653   cxn->artReceiptTimerId = 0 ;
2654 
2655   if (cxn->state != cxnIdleS)
2656     {
2657       warn ("%s:%d cxnsleep connection in bad state: %s",
2658             hostPeerName (cxn->myHost), cxn->ident,
2659             stateToString (cxn->state)) ;
2660       cxnSleepOrDie (cxn) ;
2661 
2662       return ;
2663     }
2664 
2665   /* it's doubtful (right?) that this timer could go off and there'd
2666      still be articles in the queue. */
2667   if (cxn->articleQTotal > 0)
2668     {
2669       warn ("%s:%d idle connection still has articles", peerName, cxn->ident) ;
2670     }
2671   else
2672     {
2673       notice ("%s:%d idle tearing down connection", peerName, cxn->ident) ;
2674       cxn->state = cxnIdleTimeoutS ;
2675       cxnFlush (cxn) ;
2676     }
2677 }
2678 
2679 
2680 
2681 
2682 
2683 /*
2684  * function to be called when the fd is not ready for reading, but there is
2685  * an article on tape or in the queue to be done. Things are done this way
2686  * so that a Connection doesn't hog time trying to find the next good
2687  * article for writing. With a large backlog of expired articles that would
2688  * take a long time. Instead the Connection just tries its next article on
2689  * tape or queue, and if that's no good then it registers this callback so
2690  * that other Connections have a chance of being serviced.
2691  *
2692  * This function is also put on the callback queue if we called doSomeWrites
2693  * but there was already a write pending, mostly so that we don't deadlock the
2694  * connection when we got a response to the body of an IHAVE command before we
2695  * finished sending the body.
2696  */
cxnWorkProc(EndPoint ep UNUSED,void * data)2697 static void cxnWorkProc (EndPoint ep UNUSED, void *data)
2698 {
2699   Connection cxn = (Connection) data ;
2700 
2701   d_printf (2,"%s:%d calling work proc\n",
2702            hostPeerName (cxn->myHost),cxn->ident) ;
2703 
2704   if (writesNeeded (cxn))
2705     doSomeWrites (cxn) ;        /* may re-register the work proc... */
2706   else if (cxn->state == cxnFlushingS || cxn->state == cxnClosingS)
2707     {
2708       if (cxn->articleQTotal == 0)
2709         issueQUIT (cxn) ;
2710     }
2711   else
2712     d_printf (2,"%s:%d no writes were needed...\n",
2713              hostPeerName (cxn->myHost), cxn->ident) ;
2714 }
2715 
2716 
2717 
2718 /****************************************************************************
2719  *
2720  * END EndPoint callback area.
2721  *
2722  ****************************************************************************/
2723 
2724 
2725 
2726 
2727 
2728 /****************************************************************************
2729  *
2730  * RESPONSE CODE PROCESSING.
2731  *
2732  ***************************************************************************/
2733 
2734 
2735 /*
2736  * A connection needs to sleep, but if it's closing it needs to die instead.
2737  */
cxnSleepOrDie(Connection cxn)2738 static void cxnSleepOrDie (Connection cxn)
2739 {
2740   if (cxn->state == cxnClosingS)
2741     cxnDead (cxn) ;
2742   else
2743     cxnSleep (cxn) ;
2744 }
2745 
2746 
2747 /*
2748  * Handle the response 205 to our QUIT command, which means the
2749  * remote is going away and we can happily cleanup
2750  */
processResponse205(Connection cxn,char * response UNUSED)2751 static void processResponse205 (Connection cxn, char *response UNUSED)
2752 {
2753   bool immedRecon ;
2754 
2755   VALIDATE_CONNECTION (cxn) ;
2756 
2757   if (!(cxn->state == cxnFeedingS ||
2758         cxn->state == cxnIdleS ||
2759         cxn->state == cxnFlushingS ||
2760         cxn->state == cxnClosingS))
2761     {
2762       warn ("%s:%d cxnsleep connection in bad state: %s",
2763             hostPeerName (cxn->myHost), cxn->ident,
2764             stateToString (cxn->state)) ;
2765       cxnSleepOrDie (cxn) ;
2766       return ;
2767     }
2768 
2769   switch (cxn->state)
2770     {
2771       case cxnFlushingS:
2772       case cxnClosingS:
2773         ASSERT (cxn->articleQTotal == 0) ;
2774 
2775         cxnLogStats (cxn,true) ;
2776 
2777         immedRecon = cxn->immedRecon ;
2778 
2779         hostCxnDead (cxn->myHost,cxn) ;
2780 
2781         if (cxn->state == cxnFlushingS && immedRecon)
2782           {
2783             abortConnection (cxn) ;
2784             if (!cxnConnect (cxn))
2785               notice ("%s:%d flush re-connect failed",
2786                       hostPeerName (cxn->myHost), cxn->ident) ;
2787           }
2788         else if (cxn->state == cxnFlushingS)
2789           cxnWait (cxn) ;
2790         else
2791           cxnDead (cxn) ;
2792         break ;
2793 
2794       case cxnIdleS:
2795       case cxnFeedingS:
2796         /* this shouldn't ever happen... */
2797         warn ("%s:%d cxnsleep response unexpected: %d",
2798               hostPeerName (cxn->myHost), cxn->ident, 205) ;
2799         cxnSleepOrDie (cxn) ;
2800         break ;
2801 
2802       default:
2803         die ("Bad connection state: %s\n",stateToString (cxn->state)) ;
2804     }
2805 }
2806 
2807 
2808 
2809 
2810 
2811 /*
2812  * Handle a response code of 238 which is the "no such article"
2813  * reply to the CHECK command (i.e. remote wants it).
2814  */
processResponse238(Connection cxn,char * response)2815 static void processResponse238 (Connection cxn, char *response)
2816 {
2817   char *msgid ;
2818   ArtHolder artHolder ;
2819 
2820   if (!cxn->doesStreaming)
2821     {
2822       warn ("%s:%d cxnsleep unexpected streaming response for non-streaming"
2823             " connection: %s", hostPeerName (cxn->myHost), cxn->ident,
2824             response) ;
2825       cxnSleepOrDie (cxn) ;
2826       return ;
2827     }
2828 
2829   if (!(cxn->state == cxnFlushingS ||
2830         cxn->state == cxnFeedingS ||
2831         cxn->state == cxnClosingS))
2832     {
2833       warn ("%s:%d cxnsleep connection in bad state: %s",
2834             hostPeerName (cxn->myHost), cxn->ident,
2835             stateToString (cxn->state)) ;
2836       cxnSleepOrDie (cxn) ;
2837       return ;
2838     }
2839 
2840   VALIDATE_CONNECTION (cxn) ;
2841 
2842   msgid = getMsgId (response) ;
2843 
2844   if (cxn->checkRespHead == NULL) /* peer is confused */
2845     {
2846       warn ("%s:%d cxnsleep response unexpected: %d",
2847             hostPeerName (cxn->myHost),cxn->ident,238) ;
2848       cxnSleepOrDie (cxn) ;
2849     }
2850   else if (msgid == NULL || strlen (msgid) == 0 ||
2851            (artHolder = artHolderByMsgId (msgid, cxn->checkRespHead)) == NULL)
2852     noSuchMessageId (cxn,238,msgid,response) ;
2853   else
2854     {
2855       /* now remove the article from the check queue and move it onto the
2856          transmit queue. Another function wil take care of transmitting */
2857       remArtHolder (artHolder, &cxn->checkRespHead, &cxn->articleQTotal) ;
2858       if (cxn->state != cxnClosingS)
2859         appendArtHolder (artHolder, &cxn->takeHead, &cxn->articleQTotal) ;
2860       else
2861         {
2862           hostTakeBackArticle (cxn->myHost, cxn, artHolder->article) ;
2863           delArtHolder (artHolder) ;
2864         }
2865     }
2866 
2867   if (msgid != NULL)
2868     free (msgid) ;
2869 }
2870 
2871 
2872 
2873 
2874 
2875 /*
2876  * process the response "try again later" to the CHECK command If this
2877  * returns true then the connection is still usable.
2878  */
processResponse431(Connection cxn,char * response)2879 static void processResponse431 (Connection cxn, char *response)
2880 {
2881   char *msgid ;
2882   ArtHolder artHolder ;
2883 
2884   if (!cxn->doesStreaming)
2885     {
2886       warn ("%s:%d cxnsleep unexpected streaming response for non-streaming"
2887             " connection: %s", hostPeerName (cxn->myHost), cxn->ident,
2888             response) ;
2889       cxnSleepOrDie (cxn) ;
2890       return ;
2891     }
2892 
2893   if (!(cxn->state == cxnFlushingS ||
2894         cxn->state == cxnFeedingS ||
2895         cxn->state == cxnClosingS))
2896     {
2897       warn ("%s:%d cxnsleep connection in bad state: %s",
2898             hostPeerName (cxn->myHost), cxn->ident,
2899             stateToString (cxn->state)) ;
2900       cxnSleepOrDie (cxn) ;
2901       return ;
2902     }
2903 
2904   VALIDATE_CONNECTION (cxn) ;
2905 
2906   msgid = getMsgId (response) ;
2907 
2908   if (cxn->checkRespHead == NULL) /* peer is confused */
2909     {
2910       warn ("%s:%d cxnsleep response unexpected: %d",
2911             hostPeerName (cxn->myHost),cxn->ident,431) ;
2912       cxnSleepOrDie (cxn) ;
2913     }
2914   else if (msgid == NULL || strlen (msgid) == 0 ||
2915            (artHolder = artHolderByMsgId (msgid, cxn->checkRespHead)) == NULL)
2916     noSuchMessageId (cxn,431,msgid,response) ;
2917   else
2918     {
2919       remArtHolder (artHolder, &cxn->checkRespHead, &cxn->articleQTotal) ;
2920       if (cxn->articleQTotal == 0 && !writeIsPending (cxn->myEp))
2921         cxnIdle (cxn) ;
2922       hostArticleDeferred (cxn->myHost, cxn, artHolder->article) ;
2923       delArtHolder (artHolder) ;
2924     }
2925 
2926   if (msgid != NULL)
2927     free (msgid) ;
2928 }
2929 
2930 
2931 
2932 
2933 
2934 /*
2935  * process the "already have it" response to the CHECK command.  If this
2936  * returns true then the connection is still usable.
2937  */
processResponse438(Connection cxn,char * response)2938 static void processResponse438 (Connection cxn, char *response)
2939 {
2940   char *msgid ;
2941   ArtHolder artHolder ;
2942 
2943   if (!cxn->doesStreaming)
2944     {
2945       warn ("%s:%d cxnsleep unexpected streaming response for non-streaming"
2946             " connection: %s", hostPeerName (cxn->myHost), cxn->ident,
2947             response) ;
2948       cxnSleepOrDie (cxn) ;
2949       return ;
2950     }
2951 
2952   if (!(cxn->state == cxnFlushingS ||
2953         cxn->state == cxnFeedingS ||
2954         cxn->state == cxnClosingS))
2955     {
2956       warn ("%s:%d cxnsleep connection in bad state: %s",
2957             hostPeerName (cxn->myHost), cxn->ident,
2958             stateToString (cxn->state)) ;
2959       cxnSleepOrDie (cxn) ;
2960       return ;
2961     }
2962 
2963   VALIDATE_CONNECTION (cxn) ;
2964 
2965   msgid = getMsgId (response) ;
2966 
2967   if (cxn->checkRespHead == NULL) /* peer is confused */
2968     {
2969       warn ("%s:%d cxnsleep response unexpected: %d",
2970             hostPeerName (cxn->myHost),cxn->ident,438) ;
2971       cxnSleepOrDie (cxn) ;
2972     }
2973   else if (msgid == NULL || strlen (msgid) == 0 ||
2974            (artHolder = artHolderByMsgId (msgid, cxn->checkRespHead)) == NULL)
2975     noSuchMessageId (cxn,438,msgid,response) ;
2976   else
2977     {
2978       cxn->checksRefused++ ;
2979 
2980       remArtHolder (artHolder, &cxn->checkRespHead, &cxn->articleQTotal) ;
2981       if (cxn->articleQTotal == 0 && !writeIsPending (cxn->myEp))
2982         cxnIdle (cxn) ;
2983       hostArticleNotWanted (cxn->myHost, cxn, artHolder->article);
2984       delArtHolder (artHolder) ;
2985     }
2986 
2987   if (msgid != NULL)
2988     free (msgid) ;
2989 }
2990 
2991 
2992 
2993 
2994 
2995 /*
2996  * process the "article transferred ok" response to the TAKETHIS.
2997  */
processResponse239(Connection cxn,char * response)2998 static void processResponse239 (Connection cxn, char *response)
2999 {
3000   char *msgid ;
3001   ArtHolder artHolder ;
3002 
3003   if (!cxn->doesStreaming)
3004     {
3005       warn ("%s:%d cxnsleep unexpected streaming response for non-streaming"
3006             " connection: %s", hostPeerName (cxn->myHost), cxn->ident,
3007             response) ;
3008       cxnSleepOrDie (cxn) ;
3009       return ;
3010     }
3011 
3012   if (!(cxn->state == cxnFlushingS ||
3013         cxn->state == cxnFeedingS ||
3014         cxn->state == cxnClosingS))
3015     {
3016       warn ("%s:%d cxnsleep connection in bad state: %s",
3017             hostPeerName (cxn->myHost), cxn->ident,
3018             stateToString (cxn->state)) ;
3019       cxnSleepOrDie (cxn) ;
3020       return ;
3021     }
3022 
3023   VALIDATE_CONNECTION (cxn) ;
3024 
3025   msgid = getMsgId (response) ;
3026 
3027   if (cxn->takeRespHead == NULL) /* peer is confused */
3028     {
3029       warn ("%s:%d cxnsleep response unexpected: %d",
3030             hostPeerName (cxn->myHost),cxn->ident,239) ;
3031       cxnSleepOrDie (cxn) ;
3032     }
3033   else if (msgid == NULL || strlen (msgid) == 0 ||
3034            (artHolder = artHolderByMsgId (msgid, cxn->takeRespHead)) == NULL)
3035     noSuchMessageId (cxn,239,msgid,response) ;
3036   else
3037     {
3038       cxn->takesOkayed++ ;
3039       cxn->takesSizeOkayed += artSize(artHolder->article);
3040 
3041       remArtHolder (artHolder, &cxn->takeRespHead, &cxn->articleQTotal) ;
3042       if (cxn->articleQTotal == 0 && !writeIsPending (cxn->myEp))
3043         cxnIdle (cxn) ;
3044       hostArticleAccepted (cxn->myHost, cxn, artHolder->article) ;
3045       delArtHolder (artHolder) ;
3046     }
3047 
3048   if (msgid != NULL)
3049     free (msgid) ;
3050 }
3051 
3052 
3053 
3054 /*
3055  *  Set the thresholds for no-CHECK mode; negative means leave existing value
3056  */
3057 
cxnSetCheckThresholds(Connection cxn,double lowFilter,double highFilter,double lowPassFilter)3058 void cxnSetCheckThresholds (Connection cxn,
3059 			    double lowFilter, double highFilter,
3060 			    double lowPassFilter)
3061 {
3062   /* Adjust current value for new scaling */
3063   if (cxn->lowPassFilter > 0.0)
3064     cxn->filterValue = cxn->filterValue / cxn->lowPassFilter * lowPassFilter;
3065 
3066   /* Stick in new values */
3067   if (highFilter >= 0)
3068     cxn->onThreshold = highFilter * lowPassFilter / 100.0;
3069   if (lowFilter >= 0)
3070     cxn->offThreshold = lowFilter * lowPassFilter / 100.0;
3071   cxn->lowPassFilter = lowPassFilter;
3072 }
3073 
3074 
3075 /*
3076  *  Blow away the connection gracelessly and immedately clean up
3077  */
cxnNuke(Connection cxn)3078 void cxnNuke (Connection cxn)
3079 {
3080   abortConnection (cxn) ;
3081   hostCxnDead (cxn->myHost,cxn) ;
3082   delConnection(cxn) ;
3083 }
3084 
3085 
3086 /*
3087  * process a "article rejected do not try again" response to the
3088  * TAKETHIS.
3089  */
processResponse439(Connection cxn,char * response)3090 static void processResponse439 (Connection cxn, char *response)
3091 {
3092   char *msgid ;
3093   ArtHolder artHolder ;
3094 
3095   if (!cxn->doesStreaming)
3096     {
3097       warn ("%s:%d cxnsleep unexpected streaming response for non-streaming"
3098             " connection: %s", hostPeerName (cxn->myHost), cxn->ident,
3099             response) ;
3100       cxnSleepOrDie (cxn) ;
3101       return ;
3102     }
3103 
3104   if (!(cxn->state == cxnFlushingS ||
3105         cxn->state == cxnFeedingS ||
3106         cxn->state == cxnClosingS))
3107     {
3108       warn ("%s:%d cxnsleep connection in bad state: %s",
3109             hostPeerName (cxn->myHost), cxn->ident,
3110             stateToString (cxn->state)) ;
3111       cxnSleepOrDie (cxn) ;
3112       return ;
3113     }
3114 
3115   VALIDATE_CONNECTION (cxn) ;
3116 
3117   msgid = getMsgId (response) ;
3118 
3119   if (cxn->takeRespHead == NULL) /* peer is confused */
3120     {
3121       /* NNTPRelay return 439 for check <messid> if messid is bad */
3122       if (cxn->checkRespHead == NULL) /* peer is confused */
3123         {
3124           warn ("%s:%d cxnsleep response unexpected: %d",
3125                 hostPeerName (cxn->myHost),cxn->ident,439) ;
3126           cxnSleepOrDie (cxn) ;
3127         }
3128       else
3129         {
3130           if ((artHolder = artHolderByMsgId (msgid, cxn->checkRespHead)) == NULL)
3131             noSuchMessageId (cxn,439,msgid,response) ;
3132           else
3133             {
3134               cxn->checksRefused++ ;
3135               remArtHolder (artHolder, &cxn->checkRespHead, &cxn->articleQTotal) ;
3136               if (cxn->articleQTotal == 0 && !writeIsPending (cxn->myEp))
3137                 cxnIdle (cxn) ;
3138               hostArticleNotWanted (cxn->myHost, cxn, artHolder->article);
3139               delArtHolder (artHolder) ;
3140             }
3141         }
3142     }
3143   else if (msgid == NULL || strlen (msgid) == 0 ||
3144            (artHolder = artHolderByMsgId (msgid, cxn->takeRespHead)) == NULL)
3145     noSuchMessageId (cxn,439,msgid,response) ;
3146   else
3147     {
3148       cxn->takesRejected++ ;
3149       cxn->takesSizeRejected += artSize(artHolder->article);
3150 
3151       remArtHolder (artHolder, &cxn->takeRespHead, &cxn->articleQTotal) ;
3152       /* Some hosts return the 439 response even before we're done sending */
3153       if (cxn->articleQTotal == 0 && !writeIsPending(cxn->myEp))
3154         cxnIdle (cxn) ;
3155       hostArticleRejected (cxn->myHost, cxn, artHolder->article) ;
3156       delArtHolder (artHolder) ;
3157     }
3158 
3159   if (msgid != NULL)
3160     free (msgid) ;
3161 }
3162 
3163 
3164 
3165 
3166 
3167 
3168 /*
3169  * process the "article transferred ok" response to the IHAVE-body.
3170  */
processResponse235(Connection cxn,char * response UNUSED)3171 static void processResponse235 (Connection cxn, char *response UNUSED)
3172 {
3173   ArtHolder artHolder ;
3174 
3175   if (cxn->doesStreaming)
3176     {
3177       warn ("%s:%d cxnsleep unexpected non-streaming response for"
3178             " streaming connection: %s", hostPeerName (cxn->myHost),
3179             cxn->ident,response) ;
3180       cxnSleepOrDie (cxn) ;
3181       return ;
3182     }
3183 
3184   if (!(cxn->state == cxnFlushingS ||
3185         cxn->state == cxnFeedingS ||
3186         cxn->state == cxnClosingS))
3187     {
3188       warn ("%s:%d cxnsleep connection in bad state: %s",
3189             hostPeerName (cxn->myHost), cxn->ident,
3190             stateToString (cxn->state)) ;
3191       cxnSleepOrDie (cxn) ;
3192       return ;
3193     }
3194 
3195   ASSERT (cxn->articleQTotal == 1) ;
3196   ASSERT (cxn->takeRespHead != NULL) ;
3197   VALIDATE_CONNECTION (cxn) ;
3198 
3199   if (cxn->takeRespHead == NULL) /* peer is confused */
3200     {
3201       warn ("%s:%d cxnsleep response unexpected: %d",
3202             hostPeerName (cxn->myHost),cxn->ident,235) ;
3203       cxnSleepOrDie (cxn) ;
3204     }
3205   else
3206     {
3207       /* now remove the article from the queue and tell the Host to forget
3208          about it. */
3209       artHolder = cxn->takeRespHead ;
3210 
3211       cxn->takeRespHead = NULL ;
3212       cxn->articleQTotal = 0 ;
3213       cxn->takesOkayed++ ;
3214       cxn->takesSizeOkayed += artSize(artHolder->article);
3215 
3216       if (cxn->articleQTotal == 0 && !writeIsPending (cxn->myEp))
3217         cxnIdle (cxn) ;
3218 
3219       hostArticleAccepted (cxn->myHost, cxn, artHolder->article) ;
3220       delArtHolder (artHolder) ;
3221     }
3222 }
3223 
3224 
3225 
3226 
3227 
3228 /*
3229  * process the "send article to be transfered" response to the IHAVE.
3230  */
processResponse335(Connection cxn,char * response UNUSED)3231 static void processResponse335 (Connection cxn, char *response UNUSED)
3232 {
3233   if (cxn->doesStreaming)
3234     {
3235       warn ("%s:%d cxnsleep unexpected non-streaming response for"
3236             " streaming connection: %s", hostPeerName (cxn->myHost),
3237             cxn->ident,response) ;
3238       cxnSleepOrDie (cxn) ;
3239       return ;
3240     }
3241 
3242   if (!(cxn->state == cxnFlushingS ||
3243         cxn->state == cxnFeedingS ||
3244         cxn->state == cxnClosingS))
3245     {
3246       warn ("%s:%d cxnsleep connection in bad state: %s",
3247             hostPeerName (cxn->myHost), cxn->ident,
3248             stateToString (cxn->state)) ;
3249       cxnSleepOrDie (cxn) ;
3250       return ;
3251     }
3252 
3253   if (cxn->checkRespHead == NULL)
3254     {
3255       warn ("%s:%d cxnsleep response unexpected: %d",
3256             hostPeerName (cxn->myHost),cxn->ident,335) ;
3257       cxnSleepOrDie (cxn) ;
3258     }
3259   else
3260     {
3261       VALIDATE_CONNECTION (cxn) ;
3262       /* now move the article into the third queue */
3263       cxn->takeHead = cxn->checkRespHead ;
3264       cxn->checkRespHead = NULL ;
3265 
3266       issueIHAVEBody (cxn) ;
3267     }
3268 }
3269 
3270 
3271 
3272 
3273 
3274 /*
3275  * process the "not accepting articles" response. This could be to any of
3276  * the IHAVE/CHECK/TAKETHIS command, but not the banner--that's handled
3277  * elsewhere.
3278  */
processResponse400(Connection cxn,char * response)3279 static void processResponse400 (Connection cxn, char *response)
3280 {
3281   if (!(cxn->state == cxnFlushingS ||
3282         cxn->state == cxnFeedingS ||
3283         cxn->state == cxnIdleS ||
3284         cxn->state == cxnClosingS))
3285     {
3286       warn ("%s:%d cxnsleep connection in bad state: %s",
3287             hostPeerName (cxn->myHost), cxn->ident,
3288             stateToString (cxn->state)) ;
3289       cxnSleepOrDie (cxn) ;
3290       return ;
3291     }
3292 
3293   VALIDATE_CONNECTION (cxn) ;
3294 
3295   /* We may get a response 400 multiple times when in streaming mode. */
3296   notice ("%s:%d remote cannot accept articles: %s",
3297           hostPeerName(cxn->myHost), cxn->ident, response) ;
3298 
3299   /* right here there may still be data queued to write and so we'll fail
3300      trying to issue the quit ('cause a write will be pending). Furthermore,
3301      the data pending may be half way through an command, and so just
3302      tossing the buffer is nt sufficient. But figuring out where we are and
3303      doing a tidy job is hard */
3304   if (writeIsPending (cxn->myEp))
3305     cxnSleepOrDie (cxn) ;
3306   else
3307     {
3308       if (cxn->articleQTotal > 0)
3309         {
3310           /* Defer the articles here so that cxnFlush() doesn't set up an
3311              immediate reconnect. */
3312           deferAllArticles (cxn) ;
3313           clearTimer (cxn->readBlockedTimerId) ;
3314 	  /* XXX - so cxnSleep() doesn't die when it validates the connection */
3315           cxnIdle (cxn) ;
3316         }
3317       /* XXX - it would be nice if we QUIT first, but we'd have to go
3318          into a state where we just search for the 205 response, and
3319          only go into the sleep state at that point */
3320       cxnSleepOrDie (cxn) ;
3321     }
3322 }
3323 
3324 
3325 
3326 
3327 
3328 /*
3329  * process the "not wanted" response to the IHAVE.
3330  */
processResponse435(Connection cxn,char * response UNUSED)3331 static void processResponse435 (Connection cxn, char *response UNUSED)
3332 {
3333   ArtHolder artHolder ;
3334 
3335   if (cxn->doesStreaming)
3336     {
3337       warn ("%s:%d cxnsleep unexpected non-streaming response for"
3338             " streaming connection: %s", hostPeerName (cxn->myHost),
3339             cxn->ident,response) ;
3340       cxnSleepOrDie (cxn) ;
3341       return ;
3342     }
3343 
3344   if (!(cxn->state == cxnFlushingS ||
3345         cxn->state == cxnFeedingS ||
3346         cxn->state == cxnClosingS))
3347     {
3348       warn ("%s:%d cxnsleep connection in bad state: %s",
3349             hostPeerName (cxn->myHost), cxn->ident,
3350             stateToString (cxn->state)) ;
3351       cxnSleepOrDie (cxn) ;
3352       return ;
3353     }
3354 
3355   /* Some servers, such as early versions of Diablo, had a bug where they'd
3356      respond with a 435 code (which should only be used for refusing an
3357      article before it was offered) after an article has been sent. */
3358   if (cxn->checkRespHead == NULL)
3359     {
3360       warn ("%s:%d cxnsleep response unexpected: %d",
3361             hostPeerName (cxn->myHost), cxn->ident, 435) ;
3362       cxnSleepOrDie (cxn) ;
3363       return ;
3364     }
3365 
3366   ASSERT (cxn->articleQTotal == 1) ;
3367   VALIDATE_CONNECTION (cxn) ;
3368 
3369   cxn->articleQTotal-- ;
3370   cxn->checksRefused++ ;
3371 
3372   artHolder = cxn->checkRespHead ;
3373   cxn->checkRespHead = NULL ;
3374 
3375   if (cxn->articleQTotal == 0 && !writeIsPending(cxn->myEp))
3376     cxnIdle (cxn) ;
3377 
3378   hostArticleNotWanted (cxn->myHost, cxn, artHolder->article) ;
3379   delArtHolder (artHolder) ;
3380 
3381 #if 0
3382   d_printf (1,"%s:%d On exiting 435 article queue total is %d (%d %d %d %d)\n",
3383            hostPeerName (cxn->myHost), cxn->ident,
3384            cxn->articleQTotal,
3385            (int) (cxn->checkHead != NULL),
3386            (int) (cxn->checkRespHead != NULL),
3387            (int) (cxn->takeHead != NULL),
3388            (int) (cxn->takeRespHead != NULL));
3389 #endif
3390 }
3391 
3392 
3393 
3394 
3395 
3396 /*
3397  * process the "transfer failed" response to the IHAVE-body, (seems this
3398  * can come from the IHAVE too).
3399  */
processResponse436(Connection cxn,char * response UNUSED)3400 static void processResponse436 (Connection cxn, char *response UNUSED)
3401 {
3402   ArtHolder artHolder ;
3403 
3404   if (cxn->doesStreaming)
3405     {
3406       warn ("%s:%d cxnsleep unexpected non-streaming response for"
3407             " streaming connection: %s", hostPeerName (cxn->myHost),
3408             cxn->ident,response) ;
3409       cxnSleepOrDie (cxn) ;
3410       return ;
3411     }
3412 
3413   if (!(cxn->state == cxnFlushingS ||
3414         cxn->state == cxnFeedingS ||
3415         cxn->state == cxnClosingS))
3416     {
3417       warn ("%s:%d cxnsleep connection in bad state: %s",
3418             hostPeerName (cxn->myHost), cxn->ident,
3419             stateToString (cxn->state)) ;
3420       cxnSleepOrDie (cxn) ;
3421       return ;
3422     }
3423 
3424   ASSERT (cxn->articleQTotal == 1) ;
3425   ASSERT (cxn->takeRespHead != NULL || cxn->checkRespHead != NULL) ;
3426   VALIDATE_CONNECTION (cxn) ;
3427 
3428   cxn->articleQTotal-- ;
3429 
3430   if (cxn->takeRespHead != NULL) /* IHAVE-body command barfed */
3431     {
3432       artHolder = cxn->takeRespHead ;
3433       cxn->takeRespHead = NULL ;
3434     }
3435   else                          /* IHAVE command barfed */
3436     {
3437       artHolder = cxn->checkRespHead ;
3438       cxn->checkRespHead = NULL ;
3439     }
3440 
3441   if (cxn->articleQTotal == 0 && !writeIsPending(cxn->myEp))
3442     cxnIdle (cxn) ;
3443 
3444   hostArticleDeferred (cxn->myHost, cxn, artHolder->article) ;
3445   delArtHolder (artHolder) ;
3446 }
3447 
3448 
3449 
3450 
3451 
3452 /*
3453  * Process the "article rejected do not try again" response to the
3454  * IHAVE-body.
3455  */
processResponse437(Connection cxn,char * response UNUSED)3456 static void processResponse437 (Connection cxn, char *response UNUSED)
3457 {
3458   ArtHolder artHolder ;
3459 
3460   if (cxn->doesStreaming)
3461     {
3462       warn ("%s:%d cxnsleep unexpected non-streaming response for"
3463             " streaming connection: %s", hostPeerName (cxn->myHost),
3464             cxn->ident,response) ;
3465       cxnSleepOrDie (cxn) ;
3466       return ;
3467     }
3468 
3469   if (!(cxn->state == cxnFlushingS ||
3470         cxn->state == cxnFeedingS ||
3471         cxn->state == cxnClosingS))
3472     {
3473       warn ("%s:%d cxnsleep connection in bad state: %s",
3474             hostPeerName (cxn->myHost), cxn->ident,
3475             stateToString (cxn->state)) ;
3476       cxnSleepOrDie (cxn) ;
3477       return ;
3478     }
3479 
3480   ASSERT (cxn->articleQTotal == 1) ;
3481   ASSERT (cxn->takeRespHead != NULL) ;
3482   VALIDATE_CONNECTION (cxn) ;
3483 
3484   cxn->articleQTotal-- ;
3485   cxn->takesRejected++ ;
3486 
3487   artHolder = cxn->takeRespHead ;
3488   cxn->takeRespHead = NULL ;
3489   cxn->takesSizeRejected += artSize(artHolder->article);
3490 
3491   /* Some servers return the 437 response before we're done sending. */
3492   if (cxn->articleQTotal == 0 && !writeIsPending (cxn->myEp))
3493     cxnIdle (cxn) ;
3494 
3495   hostArticleRejected (cxn->myHost, cxn, artHolder->article) ;
3496   delArtHolder (artHolder) ;
3497 }
3498 
3499 
3500 /* Process the response 480 Transfer permission defined. We're probably
3501    talking to a remote nnrpd on a system that forgot to put us in
3502    the hosts.nntp */
processResponse480(Connection cxn,char * response UNUSED)3503 static void processResponse480 (Connection cxn, char *response UNUSED)
3504 {
3505   if (cxn->doesStreaming)
3506     {
3507       warn ("%s:%d cxnsleep unexpected non-streaming response for"
3508             " streaming connection: %s", hostPeerName (cxn->myHost),
3509             cxn->ident,response) ;
3510       cxnSleepOrDie (cxn) ;
3511       return ;
3512     }
3513 
3514   if (!(cxn->state == cxnFlushingS ||
3515         cxn->state == cxnFeedingS ||
3516         cxn->state == cxnClosingS))
3517     {
3518       warn ("%s:%d cxnsleep connection in bad state: %s",
3519             hostPeerName (cxn->myHost), cxn->ident,
3520             stateToString (cxn->state)) ;
3521       cxnSleepOrDie (cxn) ;
3522       return ;
3523     }
3524 
3525   VALIDATE_CONNECTION (cxn) ;
3526 
3527   warn ("%s:%d cxnsleep transfer permission denied",
3528         hostPeerName (cxn->myHost), cxn->ident) ;
3529 
3530   if (cxn->state == cxnClosingS)
3531     cxnDead (cxn) ;
3532   else
3533     cxnSleep (cxn) ;
3534 }
3535 
3536 
3537 
3538 
3539 
3540 /*
3541  * Handle the response 503, which means the timeout of nnrpd.
3542  */
processResponse503(Connection cxn,char * response UNUSED)3543 static void processResponse503 (Connection cxn, char *response UNUSED)
3544 {
3545   bool immedRecon ;
3546 
3547   VALIDATE_CONNECTION (cxn) ;
3548 
3549   if (!(cxn->state == cxnFeedingS ||
3550 	cxn->state == cxnIdleS ||
3551 	cxn->state == cxnFlushingS ||
3552 	cxn->state == cxnClosingS))
3553     {
3554       warn ("%s:%d cxnsleep connection in bad state: %s",
3555             hostPeerName (cxn->myHost), cxn->ident,
3556             stateToString (cxn->state)) ;
3557       cxnSleepOrDie (cxn) ;
3558       return ;
3559     }
3560 
3561   if (cxn->articleQTotal != 0)
3562     notice ("%s:%d flush re-connect failed", hostPeerName (cxn->myHost),
3563             cxn->ident) ;
3564 
3565   cxnLogStats (cxn,true) ;
3566 
3567   immedRecon = cxn->immedRecon ;
3568 
3569   hostCxnDead (cxn->myHost,cxn) ;
3570 
3571   if (cxn->state == cxnFlushingS && immedRecon)
3572     {
3573       abortConnection (cxn) ;
3574       if (!cxnConnect (cxn))
3575         notice ("%s:%d flush re-connect failed", hostPeerName (cxn->myHost),
3576                 cxn->ident) ;
3577     }
3578   else if (cxn->state == cxnFlushingS)
3579     cxnWait (cxn) ;
3580   else
3581     cxnDead (cxn) ;
3582 
3583 }
3584 
3585 
3586 
3587 
3588 
3589 /****************************************************************************
3590  *
3591  * END RESPONSE CODE PROCESSING.
3592  *
3593  ***************************************************************************/
3594 
3595 
3596 
3597 
3598 
3599 /*
3600  * puts the Connection into the sleep state.
3601  */
cxnSleep(Connection cxn)3602 static void cxnSleep (Connection cxn)
3603 {
3604   ASSERT (cxn != NULL) ;
3605   ASSERT (cxn->state == cxnFlushingS ||
3606           cxn->state == cxnIdleS ||
3607           cxn->state == cxnFeedingS ||
3608           cxn->state == cxnConnectingS) ;
3609   VALIDATE_CONNECTION (cxn) ;
3610 
3611   abortConnection (cxn) ;
3612 
3613   prepareReopenCbk (cxn) ;  /* XXX - we don't want to reopen if idle */
3614   cxn->state = cxnSleepingS ;
3615 
3616   /* tell our Host we're asleep so it doesn't try to give us articles */
3617   hostCxnSleeping (cxn->myHost,cxn) ;
3618 }
3619 
3620 
3621 
/*
 * Aborts the connection and marks the Connection as dead (cxnDeadS).
 * No reopen callback is scheduled here.
 */
static void cxnDead (Connection cxn)
{
  ASSERT (cxn != NULL) ;
  VALIDATE_CONNECTION (cxn) ;

  /* abortConnection() looks at the current state, so it runs before the
     state is overwritten */
  abortConnection (cxn) ;

  cxn->state = cxnDeadS ;
}
3630 
3631 
3632 
3633 /*
3634  * Sets the idle timer. If no articles arrive before the timer expires, the
3635  * connection will be closed.
3636  */
cxnIdle(Connection cxn)3637 static void cxnIdle (Connection cxn)
3638 {
3639   ASSERT (cxn != NULL) ;
3640   ASSERT (cxn->state == cxnFeedingS || cxn->state == cxnConnectingS ||
3641           cxn->state == cxnFlushingS || cxn->state == cxnClosingS) ;
3642   ASSERT (cxn->articleQTotal == 0) ;
3643   ASSERT (cxn->writeBlockedTimerId == 0) ;
3644   ASSERT (!writeIsPending (cxn->myEp)) ;
3645   ASSERT (cxn->sleepTimerId == 0) ;
3646 
3647   if (cxn->state == cxnFeedingS || cxn->state == cxnConnectingS)
3648     {
3649       if (cxn->articleReceiptTimeout > 0)
3650         {
3651           clearTimer (cxn->artReceiptTimerId) ;
3652           cxn->artReceiptTimerId = prepareSleep (articleTimeoutCbk,
3653                                                  cxn->articleReceiptTimeout,
3654                                                  cxn) ;
3655         }
3656 
3657       if (cxn->readTimeout > 0 && cxn->state == cxnFeedingS)
3658         clearTimer (cxn->readBlockedTimerId) ;
3659 
3660       cxn->state = cxnIdleS ;
3661 ASSERT (cxn->readBlockedTimerId == 0) ;
3662     }
3663 }
3664 
3665 
3666 
3667 
3668 
3669 /*
3670  * Called when a response from the remote refers to a non-existant
3671  * message-id. The network connection is aborted and the Connection
3672  * object goes into sleep mode.
3673  */
noSuchMessageId(Connection cxn,unsigned int responseCode,const char * msgid,const char * response)3674 static void noSuchMessageId (Connection cxn, unsigned int responseCode,
3675                              const char *msgid, const char *response)
3676 {
3677   const char *peerName = hostPeerName (cxn->myHost) ;
3678 
3679   if (msgid == NULL || strlen (msgid) == 0)
3680     warn ("%s:%d cxnsleep message-id missing in response code %d: %s",
3681           peerName, cxn->ident, responseCode, response) ;
3682   else
3683     warn ("%s:%d cxnsleep message-id invalid message-id in response code"
3684           " %d: %s", peerName, cxn->ident, responseCode, msgid) ;
3685 
3686   cxnLogStats (cxn,true) ;
3687 
3688   if (cxn->state != cxnClosingS)
3689     cxnSleep (cxn) ;
3690   else
3691     cxnDead (cxn) ;
3692 }
3693 
3694 
3695 
3696 
3697 
3698 /*
3699  * a processing error has occured (for example in parsing a response), or
3700  * we're at the end of the FSM and we're cleaning up.
3701  */
abortConnection(Connection cxn)3702 static void abortConnection (Connection cxn)
3703 {
3704   ASSERT (cxn != NULL) ;
3705   VALIDATE_CONNECTION (cxn) ;
3706 
3707   /* cxn->myEp could be NULL if we get here during cxnConnect (via
3708      cxnWait()) */
3709   if (cxn->myEp != NULL)
3710     {
3711 
3712       delEndPoint (cxn->myEp) ;
3713       cxn->myEp = NULL ;
3714     }
3715 
3716   clearTimer (cxn->sleepTimerId) ;
3717   clearTimer (cxn->artReceiptTimerId) ;
3718   clearTimer (cxn->readBlockedTimerId) ;
3719   clearTimer (cxn->writeBlockedTimerId) ;
3720   clearTimer (cxn->flushTimerId) ;
3721 
3722   deferAllArticles (cxn) ;      /* give any articles back to Host */
3723 
3724   bufferSetDataSize (cxn->respBuffer,0) ;
3725 
3726   resetConnection (cxn) ;
3727 
3728   if (cxn->state == cxnFeedingS ||
3729       cxn->state == cxnIdleS ||
3730       cxn->state == cxnFlushingS ||
3731       cxn->state == cxnClosingS)
3732     hostCxnDead (cxn->myHost,cxn) ;
3733 }
3734 
3735 
3736 
3737 
3738 /*
3739  * Set up the callback used when the Connection is sleeping (i.e. will try
3740  * to reopen the connection).
3741  */
prepareReopenCbk(Connection cxn)3742 static void prepareReopenCbk (Connection cxn)
3743 {
3744   ASSERT (cxn->sleepTimerId == 0) ;
3745 
3746   if (!(cxn->state == cxnConnectingS ||
3747         cxn->state == cxnIdleS ||
3748         cxn->state == cxnFeedingS ||
3749         cxn->state == cxnFlushingS ||
3750         cxn->state == cxnStartingS))
3751     {
3752       warn ("%s:%d cxnsleep connection in bad state: %s",
3753             hostPeerName (cxn->myHost), cxn->ident,
3754             stateToString (cxn->state)) ;
3755       cxnSleepOrDie (cxn) ;
3756       return ;
3757     }
3758 
3759   d_printf (1,"%s:%d Setting up a reopen callback\n",
3760            hostPeerName (cxn->myHost), cxn->ident) ;
3761 
3762   cxn->sleepTimerId = prepareSleep (reopenTimeoutCbk, cxn->sleepTimeout, cxn) ;
3763 
3764   /* bump the sleep timer amount each time to wait longer and longer. Gets
3765      reset in resetConnection() */
3766   cxn->sleepTimeout *= 2 ;
3767   if (cxn->sleepTimeout > max_reconnect_period)
3768     cxn->sleepTimeout = max_reconnect_period ;
3769 }
3770 
3771 
3772 
3773 
3774 
3775 /*
3776  * (re)set all state variables to inital condition.
3777  */
resetConnection(Connection cxn)3778 static void resetConnection (Connection cxn)
3779 {
3780   ASSERT (cxn != NULL) ;
3781 
3782   bufferSetDataSize (cxn->respBuffer,0) ;
3783 
3784   cxn->loggedNoCr = false ;
3785   cxn->maxCheck = 1 ;
3786   cxn->immedRecon = false ;
3787   cxn->doesStreaming = false ;  /* who knows, next time around maybe... */
3788   cxn->authenticated = false ;
3789   cxn->quitWasIssued = false ;
3790   cxn->needsChecks = true ;
3791   cxn->timeCon = 0 ;
3792 
3793   cxn->artsTaken = 0 ;
3794   cxn->checksIssued = 0 ;
3795   cxn->checksRefused = 0 ;
3796   cxn->takesRejected = 0 ;
3797   cxn->takesOkayed = 0 ;
3798   cxn->takesSizeRejected = 0 ;
3799   cxn->takesSizeOkayed = 0 ;
3800 
3801   cxn->timeCon_checkpoint = 0;
3802   cxn->checksIssued_checkpoint = 0;
3803   cxn->checksRefused_checkpoint = 0;
3804   cxn->takesRejected_checkpoint = 0;
3805   cxn->takesOkayed_checkpoint = 0;
3806   cxn->takesSizeRejected_checkpoint = 0;
3807   cxn->takesSizeOkayed_checkpoint = 0;
3808 
3809   cxn->filterValue = 0.0 ;
3810 }
3811 
3812 
3813 
3814 /*
3815  * Give back all articles that are queued, but not actually in progress.
3816  * XXX merge come of this with deferAllArticles
3817  */
deferQueuedArticles(Connection cxn)3818 static void deferQueuedArticles (Connection cxn)
3819 {
3820   ArtHolder p, q ;
3821 
3822   for (q = NULL, p = cxn->checkHead ; p != NULL ; p = q)
3823     {
3824       q = p->next ;
3825       hostTakeBackArticle (cxn->myHost, cxn, p->article) ;
3826       delArtHolder (p) ;
3827       cxn->articleQTotal-- ;
3828     }
3829   cxn->checkHead = NULL ;
3830 
3831   for (q = NULL, p = cxn->takeHead ; cxn->doesStreaming && p != NULL ; p = q)
3832     {
3833       q = p->next ;
3834       hostTakeBackArticle (cxn->myHost, cxn, p->article) ;
3835       delArtHolder (p) ;
3836       cxn->articleQTotal-- ;
3837     }
3838   cxn->takeHead = NULL ;
3839 }
3840 
3841 
3842 
3843 /*
3844  * Give back any articles we have to the Host for later retrys.
3845  */
deferAllArticles(Connection cxn)3846 static void deferAllArticles (Connection cxn)
3847 {
3848   ArtHolder p, q ;
3849 
3850   for (q = NULL, p = cxn->checkHead ; p != NULL ; p = q)
3851     {
3852       q = p->next ;
3853       hostTakeBackArticle (cxn->myHost, cxn, p->article) ;
3854       delArtHolder (p) ;
3855       cxn->articleQTotal-- ;
3856     }
3857   cxn->checkHead = NULL ;
3858 
3859   for (q = NULL, p = cxn->checkRespHead ; p != NULL ; p = q)
3860     {
3861       q = p->next ;
3862       hostTakeBackArticle (cxn->myHost, cxn, p->article) ;
3863       delArtHolder (p) ;
3864       cxn->articleQTotal-- ;
3865     }
3866   cxn->checkRespHead = NULL ;
3867 
3868   for (q = NULL, p = cxn->takeHead ; p != NULL ; p = q)
3869     {
3870       q = p->next ;
3871       hostTakeBackArticle (cxn->myHost, cxn, p->article) ;
3872       delArtHolder (p) ;
3873       cxn->articleQTotal-- ;
3874     }
3875   cxn->takeHead = NULL ;
3876 
3877   for (q = NULL, p = cxn->takeRespHead ; p != NULL ; p = q)
3878     {
3879       q = p->next ;
3880       hostTakeBackArticle (cxn->myHost, cxn, p->article) ;
3881       delArtHolder (p) ;
3882       cxn->articleQTotal-- ;
3883     }
3884   cxn->takeRespHead = NULL ;
3885 
3886   ASSERT (cxn->articleQTotal == 0) ;
3887 }
3888 
3889 
3890 
3891 
3892 
3893 /*
3894  * Called when there's an article to be pushed out to the remote. Even if
3895  * the Connection has an article it's possible that nothing will be written
3896  * (e.g. if the article on the queue doesn't exist any more)
3897  */
doSomeWrites(Connection cxn)3898 static void doSomeWrites (Connection cxn)
3899 {
3900   bool doneSome = false ;
3901 
3902   /* If there's a write pending we can't do anything now.
3903    * No addWorkCallback here (otherwise innfeed consumes too much CPU). */
3904   if ( writeIsPending (cxn->myEp) )
3905     {
3906       return ;
3907     }
3908   else if ( writesNeeded (cxn) ) /* something on a queue. */
3909     {
3910       if (cxn->doesStreaming)
3911         doneSome = issueStreamingCommands (cxn) ;
3912       else
3913         doneSome = issueIHAVE (cxn) ;
3914 
3915       /* doneSome will be false if article(s) were gone, but if the Host
3916          has something available, then it would have been put on the queue
3917          for next time around. */
3918       if (!doneSome)
3919         {
3920           if (writesNeeded (cxn)) /* Host gave us something */
3921             addWorkCallback (cxn->myEp,cxnWorkProc,cxn) ; /* for next time. */
3922           else if (cxn->articleQTotal == 0)
3923             {
3924               /* if we were in cxnFeedingS, then issueStreamingCommands
3925                  already called cxnIdle(). */
3926               if (cxn->state == cxnClosingS || cxn->state == cxnFlushingS)
3927                 issueQUIT (cxn) ; /* and nothing to wait for... */
3928             }
3929         }
3930     }
3931   else if (cxn->state == cxnClosingS || cxn->state == cxnFlushingS)
3932     {                           /* nothing to do... */
3933       if (cxn->articleQTotal == 0)
3934         issueQUIT (cxn) ;       /* and nothing to wait for before closing */
3935     }
3936 }
3937 
3938 
3939 
3940 
3941 
3942 /* Queue up a buffer with the IHAVE command in it for the article at
3943  * the head of the transmisson queue.
3944  *
3945  * If the article is missing, then the Host will be notified and
3946  * another article may be put on the Connections queue. This new
3947  * article is ignored for now, but a work callback is registered so
3948  * that it can be looked at later.
3949  */
issueIHAVE(Connection cxn)3950 static bool issueIHAVE (Connection cxn)
3951 {
3952   Buffer ihaveBuff, *writeArr ;
3953   ArtHolder artH ;
3954   Article article ;
3955   const char *msgid ;
3956   char *p ;
3957   unsigned int tmp ;
3958   size_t bufLen = 256 ;
3959   bool rval = false ;
3960 
3961   ASSERT (!cxn->doesStreaming) ;
3962   ASSERT (cxn->state == cxnFlushingS ||
3963           cxn->state == cxnFeedingS ||
3964           cxn->state == cxnClosingS) ;
3965   ASSERT (cxn->articleQTotal == 1) ;
3966   ASSERT (cxn->checkHead != NULL) ;
3967   ASSERT (writeIsPending (cxn->myEp) == false) ;
3968   VALIDATE_CONNECTION (cxn) ;
3969 
3970   artH = cxn->checkHead ;
3971   article = cxn->checkHead->article ;
3972   msgid = artMsgId (artH->article) ;
3973 
3974   ASSERT (msgid != NULL) ;
3975   ASSERT (article != NULL) ;
3976 
3977       if ((tmp = (strlen (msgid) + 10)) > bufLen)
3978         bufLen = tmp ;
3979 
3980       ihaveBuff = newBuffer (bufLen) ;
3981 
3982       ASSERT (ihaveBuff != NULL) ;
3983 
3984       p = bufferBase (ihaveBuff) ;
3985       sprintf (p, "IHAVE %s\r\n", msgid) ;
3986       bufferSetDataSize (ihaveBuff, strlen (p)) ;
3987 
3988       d_printf (5,"%s:%d Command IHAVE %s\n",
3989                hostPeerName (cxn->myHost),cxn->ident,msgid) ;
3990 
3991       writeArr = makeBufferArray (ihaveBuff, NULL) ;
3992       if ( !prepareWriteWithTimeout (cxn->myEp, writeArr, commandWriteDone,
3993                                      cxn) )
3994         {
3995           die ("%s:%d fatal prepare write for IHAVE failed",
3996                hostPeerName (cxn->myHost), cxn->ident) ;
3997         }
3998 
3999       /* now move the article to the second queue */
4000       cxn->checkRespHead = cxn->checkHead ;
4001       cxn->checkHead = NULL ;
4002 
4003       cxn->checksIssued++ ;
4004       hostArticleOffered (cxn->myHost, cxn) ;
4005 
4006       rval = true ;
4007 
4008   return rval ;
4009 }
4010 
4011 
4012 
4013 
4014 
4015 /*
4016  * Do a prepare write with the article body as the body portion of the
4017  * IHAVE command
4018  */
issueIHAVEBody(Connection cxn)4019 static void issueIHAVEBody (Connection cxn)
4020 {
4021   Buffer *writeArray ;
4022   Article article ;
4023 
4024   ASSERT (cxn != NULL) ;
4025   ASSERT (!cxn->doesStreaming) ;
4026   ASSERT (cxn->state == cxnFlushingS ||
4027           cxn->state == cxnFeedingS ||
4028           cxn->state == cxnClosingS) ;
4029   ASSERT (cxn->articleQTotal == 1) ;
4030   ASSERT (cxn->takeHead != NULL) ;
4031   VALIDATE_CONNECTION (cxn) ;
4032 
4033   article = cxn->takeHead->article ;
4034   ASSERT (article != NULL) ;
4035 
4036   if (cxn->state != cxnClosingS)
4037     writeArray = artGetNntpBuffers (article) ;
4038   else
4039     writeArray = NULL ;
4040 
4041   if (writeArray == NULL)
4042     {
4043       /* missing article (expired for example) will get us here. */
4044       if (dotBuffer == NULL)
4045         {
4046           dotBuffer = newBufferByCharP (".\r\n",3,3) ;
4047           dotFirstBuffer = newBufferByCharP ("\r\n.",3,3) ;
4048           crlfBuffer = newBufferByCharP ("\r\n",2,2) ;
4049         }
4050 
4051       /* we'll just write the empty buffer and the remote will complain
4052          with 437 */
4053       writeArray = makeBufferArray  (bufferTakeRef (dotBuffer),NULL) ;
4054     }
4055 
4056 
4057   if ( !prepareWriteWithTimeout (cxn->myEp, writeArray, ihaveBodyDone, cxn) )
4058     {
4059       die ("%s:%d fatal prepare write failed in issueIHAVEBody",
4060            hostPeerName (cxn->myHost), cxn->ident) ;
4061     }
4062   else
4063     {
4064       d_printf (5,"%s:%d prepared write for IHAVE body.\n",
4065                hostPeerName (cxn->myHost),cxn->ident) ;
4066     }
4067 
4068   /* now move the article to the last queue */
4069   cxn->takeRespHead = cxn->takeHead ;
4070   cxn->takeHead = NULL ;
4071 
4072   return ;
4073 }
4074 
4075 
4076 
4077 
4078 
4079 /* Process the two command queues. Slaps all the CHECKs together and
4080  * then does the TAKETHIS commands.
4081  *
4082  * If no articles on the queue(s) are valid, then the Host is
4083  * notified. It may queue up new articles on the Connection, but
4084  * these are ignored for now. A work proc is registered so the
4085  * articles can be processed later.
4086  */
issueStreamingCommands(Connection cxn)4087 static bool issueStreamingCommands (Connection cxn)
4088 {
4089   Buffer checkBuffer = NULL ;   /* the buffer with the CHECK commands in it. */
4090   Buffer *writeArray = NULL ;
4091   ArtHolder p, q ;
4092   bool rval = false ;
4093 
4094   ASSERT (cxn != NULL) ;
4095   ASSERT (cxn->myEp != NULL) ;
4096   ASSERT (cxn->doesStreaming) ;
4097   VALIDATE_CONNECTION (cxn) ;
4098 
4099   checkBuffer = buildCheckBuffer (cxn) ; /* may be null if none to issue */
4100 
4101   if (checkBuffer != NULL)
4102     {
4103       /* Now shift the articles to their new queue. */
4104       for (p = cxn->checkRespHead ; p != NULL && p->next != NULL ; p = p->next)
4105         /* nada--finding end of queue*/ ;
4106 
4107       if (p == NULL)
4108         cxn->checkRespHead = cxn->checkHead ;
4109       else
4110         p->next = cxn->checkHead ;
4111 
4112       cxn->checkHead = NULL ;
4113     }
4114 
4115 
4116   writeArray = buildTakethisBuffers (cxn,checkBuffer) ; /* may be null */
4117 
4118   /* If not null, then writeArray will have checkBuffer (if it wasn't NULL)
4119      in the first spot and the takethis buffers after that. */
4120   if (writeArray)
4121     {
4122       if ( !prepareWriteWithTimeout (cxn->myEp, writeArray,
4123                                      commandWriteDone, cxn) )
4124         {
4125           die ("%s:%d fatal prepare write for STREAMING commands failed",
4126                hostPeerName (cxn->myHost), cxn->ident) ;
4127         }
4128 
4129       rval = true ;
4130 
4131       /* now shift articles over to their new queue. */
4132       for (p = cxn->takeRespHead ; p != NULL && p->next != NULL ; p = p->next)
4133         /* nada--finding end of queue */ ;
4134 
4135       if (p == NULL)
4136         cxn->takeRespHead = cxn->takeHead ;
4137       else
4138         p->next = cxn->takeHead ;
4139 
4140       cxn->takeHead = NULL ;
4141     }
4142 
4143   /* we defer the missing article notification to here because if there
4144      was a big backlog of missing articles *and* we're running in
4145      no-CHECK mode, then the Host would be putting bad articles on the
4146      queue we're taking them off of. */
4147   if (cxn->missing && cxn->articleQTotal == 0 && !writeIsPending (cxn->myEp))
4148     cxnIdle (cxn) ;
4149   for (p = cxn->missing ; p != NULL ; p = q)
4150     {
4151       hostArticleIsMissing (cxn->myHost, cxn, p->article) ;
4152       q = p->next ;
4153       delArtHolder (p) ;
4154     }
4155   cxn->missing = NULL ;
4156 
4157   return rval ;
4158 }
4159 
4160 
4161 
4162 
4163 
4164 /*
4165  * build up the buffer of all the CHECK commands.
4166  */
buildCheckBuffer(Connection cxn)4167 static Buffer buildCheckBuffer (Connection cxn)
4168 {
4169   ArtHolder p ;
4170   size_t lenBuff = 0 ;
4171   Buffer checkBuffer = NULL ;
4172   const char *peerName = hostPeerName (cxn->myHost) ;
4173 
4174   p = cxn->checkHead ;
4175   while (p != NULL)
4176     {
4177       Article article = p->article ;
4178       const char *msgid ;
4179 
4180       msgid = artMsgId (article) ;
4181 
4182       lenBuff += (8 + strlen (msgid)) ; /* 8 == strlen("CHECK \r\n") */
4183       p = p->next ;
4184     }
4185 
4186   if (lenBuff > 0)
4187     lenBuff++ ;                 /* for the null byte */
4188 
4189   /* now build up the single buffer that contains all the CHECK commands */
4190   if (lenBuff > 0)
4191     {
4192       char *t ;
4193       size_t tlen = 0 ;
4194 
4195       checkBuffer = newBuffer (lenBuff) ;
4196       t = bufferBase (checkBuffer) ;
4197 
4198       p = cxn->checkHead ;
4199       while (p != NULL)
4200         {
4201           const char *msgid = artMsgId (p->article) ;
4202 
4203           sprintf (t,"CHECK %s\r\n", msgid) ;
4204           d_printf (5,"%s:%d Command %s\n", peerName, cxn->ident, t) ;
4205 
4206           tlen += strlen (t) ;
4207 
4208           while ( *t ) t++ ;
4209 
4210           cxn->checksIssued++ ;
4211           hostArticleOffered (cxn->myHost,cxn) ;
4212 
4213           p = p->next ;
4214         }
4215 
4216       ASSERT (tlen + 1 == lenBuff) ;
4217 
4218       bufferSetDataSize (checkBuffer, tlen) ;
4219     }
4220 
4221   return checkBuffer ;
4222 }
4223 
4224 
4225 
4226 
4227 
4228 
4229 /*
4230  * Construct and array of TAKETHIS commands and the command bodies. Any
4231  * articles on the queue that are missing will be removed and the Host will
4232  * be informed.
4233  */
buildTakethisBuffers(Connection cxn,Buffer checkBuffer)4234 static Buffer *buildTakethisBuffers (Connection cxn, Buffer checkBuffer)
4235 {
4236   size_t lenArray = 0 ;
4237   ArtHolder p, q ;
4238   Buffer *rval = NULL ;
4239   const char *peerName = hostPeerName (cxn->myHost) ;
4240 
4241   if (checkBuffer != NULL)
4242     lenArray++ ;
4243 
4244   if (cxn->takeHead != NULL)    /* some TAKETHIS commands to be done. */
4245     {
4246       Buffer takeBuffer ;
4247       unsigned int takeBuffLen  ;
4248       unsigned int writeIdx = 0 ;
4249 
4250       /* count up all the buffers we'll be writing. One extra each time for
4251          the TAKETHIS command buffer*/
4252       for (p = cxn->takeHead ; p != NULL ; p = p->next)
4253         if (artContentsOk (p->article))
4254             lenArray += (1 + artNntpBufferCount (p->article)) ;
4255 
4256       /* now allocate the array for the buffers and put them all in it */
4257       /* 1 for the terminator */
4258       rval = xmalloc (sizeof(Buffer) * (lenArray + 1)) ;
4259 
4260       if (checkBuffer != NULL)
4261         rval [writeIdx++] = checkBuffer ;
4262 
4263       q = NULL ;
4264       p = cxn->takeHead ;
4265       while (p != NULL)
4266         {
4267           char *t ;
4268           const char *msgid ;
4269           Article article ;
4270           Buffer *articleBuffers ;
4271           int i, nntpLen ;
4272 
4273           article = p->article ;
4274           nntpLen = artNntpBufferCount (article) ;
4275           msgid = artMsgId (article) ;
4276 
4277           if (nntpLen == 0)
4278             {                   /* file no longer valid so drop from queue */
4279               ArtHolder ta = p ;
4280 
4281               if (q == NULL)    /* it's the first in the queue */
4282                 cxn->takeHead = p->next ;
4283               else
4284                 q->next = p->next ;
4285 
4286               p = p->next ;
4287               ASSERT (cxn->articleQTotal > 0) ;
4288               cxn->articleQTotal-- ;
4289 
4290               ta->next = cxn->missing ;
4291               cxn->missing = ta ;
4292             }
4293           else
4294             {
4295               articleBuffers = artGetNntpBuffers (article) ;
4296 
4297               /* set up the buffer with the TAKETHIS command in it.
4298                  12 == strlen ("TAKETHIS \n\r") */
4299               takeBuffLen = 12 + strlen (msgid) ;
4300               takeBuffer = newBuffer (takeBuffLen) ;
4301               t = bufferBase (takeBuffer) ;
4302 
4303               sprintf (t, "TAKETHIS %s\r\n", msgid) ;
4304               bufferSetDataSize (takeBuffer, strlen (t)) ;
4305 
4306               d_printf (5,"%s:%d Command %s\n", peerName, cxn->ident, t) ;
4307 
4308               ASSERT (writeIdx <= lenArray) ;
4309               rval [writeIdx++] = takeBuffer ;
4310 
4311               /* now add all the buffers that make up the body of the TAKETHIS
4312                  command  */
4313               for (i = 0 ; i < nntpLen ; i++)
4314                 {
4315                   ASSERT (writeIdx <= lenArray) ;
4316                   rval [writeIdx++] = bufferTakeRef (articleBuffers [i]) ;
4317                 }
4318 
4319               freeBufferArray (articleBuffers) ;
4320 
4321               if ( !cxn->needsChecks )
4322                 {
4323                   /* this isn't quite right. An article may be counted
4324                      twice if we switch to no-CHECK mode after its
4325                      CHECK was issued, but before its TAKETHIS was done
4326                      just now. I'm not going to worry unless someone
4327                      complains. */
4328 
4329                   cxn->checksIssued++ ;
4330                   hostArticleOffered (cxn->myHost,cxn) ;
4331                 }
4332 
4333               q = p ;
4334               p = p->next ;
4335             }
4336         }
4337 
4338       if (writeIdx > 0)
4339         rval [writeIdx] = NULL ;
4340       else
4341         {                       /* all articles were missing and no CHECKS */
4342           free (rval) ;
4343           rval = NULL ;
4344         }
4345     }
4346   else if (checkBuffer != NULL) /* no TAKETHIS to do, but some CHECKS */
4347     rval = makeBufferArray (checkBuffer, NULL) ;
4348 
4349   return rval ;
4350 }
4351 
4352 
4353 
4354 
4355 
4356 /*
4357  * for one reason or another we need to disconnect gracefully. We send a
4358  * QUIT command.
4359  */
issueQUIT(Connection cxn)4360 static void issueQUIT (Connection cxn)
4361 {
4362   Buffer quitBuffer, *writeArray ;
4363   const char *peerName = hostPeerName (cxn->myHost) ;
4364 
4365   ASSERT (cxn->takeHead == NULL) ;
4366   ASSERT (cxn->checkHead == NULL) ;
4367   VALIDATE_CONNECTION (cxn) ;
4368 
4369   if (cxn->quitWasIssued)
4370     return ;
4371 
4372   if (writeIsPending (cxn->myEp))
4373     {
4374       warn ("%s:%d internal QUIT while write pending", peerName,
4375             cxn->ident) ;
4376 
4377       if (cxn->state == cxnClosingS)
4378         cxnDead (cxn) ;
4379       else
4380         cxnWait (cxn) ;
4381     }
4382   else
4383     {
4384       quitBuffer = newBuffer (7) ;
4385       strlcpy (bufferBase (quitBuffer), "QUIT\r\n", 7) ;
4386       bufferSetDataSize (quitBuffer, 6) ;
4387 
4388       writeArray = makeBufferArray (quitBuffer, NULL) ;
4389 
4390       d_printf (1,"%s:%d Sending a quit command\n",
4391                hostPeerName (cxn->myHost),cxn->ident) ;
4392 
4393       cxn->quitWasIssued = true ; /* not exactly true, but good enough */
4394 
4395       if ( !prepareWriteWithTimeout (cxn->myEp, writeArray, quitWritten,
4396                                      cxn) )
4397         {
4398           die ("%s:%d fatal prepare write for QUIT command failed", peerName,
4399                cxn->ident) ;
4400         }
4401     }
4402 }
4403 
4404 
4405 
4406 
4407 
4408 /*
4409  * Set up the timer for the blocked reads
4410  */
initReadBlockedTimeout(Connection cxn)4411 static void initReadBlockedTimeout (Connection cxn)
4412 {
4413   ASSERT (cxn != NULL) ;
4414 ASSERT (cxn->state != cxnIdleS ) ;
4415 
4416   /* set up the response timer. */
4417   clearTimer (cxn->readBlockedTimerId) ;
4418 
4419   if (cxn->readTimeout > 0)
4420     cxn->readBlockedTimerId = prepareSleep (responseTimeoutCbk, cxn->readTimeout, cxn) ;
4421 }
4422 
4423 
4424 
4425 
4426 
4427 /*
4428  * Set up the timer for the blocked reads
4429  */
prepareWriteWithTimeout(EndPoint endp,Buffer * buffers,EndpRWCB done,Connection cxn)4430 static int prepareWriteWithTimeout (EndPoint endp,
4431                                     Buffer *buffers,
4432                                     EndpRWCB done,
4433                                     Connection cxn)
4434 {
4435   /* Clear the read timer, since we can't expect a response until everything
4436      is sent.
4437      XXX - would be nice to have a timeout for responses if we're sending a
4438      string of commands. */
4439   clearTimer (cxn->readBlockedTimerId) ;
4440 
4441   /* set up the write timer. */
4442   clearTimer (cxn->writeBlockedTimerId) ;
4443 
4444   if (cxn->writeTimeout > 0)
4445     cxn->writeBlockedTimerId = prepareSleep (writeTimeoutCbk, cxn->writeTimeout,
4446                                              cxn) ;
4447 
4448   /* set up the write. */
4449   return prepareWrite (endp, buffers, writeProgress, done, cxn) ;
4450 }
4451 
4452 
4453 
4454 
4455 
4456 /*
4457  * Does the actual deletion of a connection and all its private data.
4458  */
delConnection(Connection cxn)4459 static void delConnection (Connection cxn)
4460 {
4461   bool shutDown;
4462   Connection c, q;
4463 
4464   if (cxn == NULL)
4465     return ;
4466 
4467   d_printf (1,"Deleting connection: %s:%d\n",
4468            hostPeerName (cxn->myHost),cxn->ident) ;
4469 
4470   for (c = gCxnList, q = NULL ; c != NULL ; q = c, c = c->next)
4471     if (c == cxn)
4472       {
4473         if (gCxnList == c)
4474           gCxnList = gCxnList->next ;
4475         else
4476           q->next = c->next ;
4477         break ;
4478       }
4479 
4480   ASSERT (c != NULL) ;
4481 
4482   if (cxn->myEp != NULL)
4483     delEndPoint (cxn->myEp) ;
4484 
4485   ASSERT (cxn->checkHead == NULL) ;
4486   ASSERT (cxn->checkRespHead == NULL) ;
4487   ASSERT (cxn->takeHead == NULL) ;
4488   ASSERT (cxn->takeRespHead == NULL) ;
4489 
4490   delBuffer (cxn->respBuffer) ;
4491 
4492   /* tell the Host we're outta here. */
4493   shutDown = hostCxnGone (cxn->myHost, cxn) ;
4494 
4495   cxn->ident = 0 ;
4496   cxn->timeCon = 0 ;
4497 
4498   free (cxn->ipName) ;
4499 
4500   clearTimer (cxn->artReceiptTimerId) ;
4501   clearTimer (cxn->readBlockedTimerId) ;
4502   clearTimer (cxn->writeBlockedTimerId) ;
4503   clearTimer (cxn->flushTimerId) ;
4504 
4505   free (cxn) ;
4506 
4507   if (shutDown)
4508     {
4509       /* exit program if that was the last connexion for the last host */
4510       /* XXX what about if there are ever multiple listeners?
4511 	 XXX    this will be executed if all hosts on only one of the
4512 	 XXX    listeners have gone */
4513       time_t now = theTime () ;
4514       char dateString [30] ;
4515       char **p = PointersFreedOnExit ;
4516 
4517       /* finish out all outstanding memory */
4518       while (*p)
4519 	free (*p++) ;
4520       free (PointersFreedOnExit) ;
4521       freeTimeoutQueue () ;
4522 
4523       timeToString (now, dateString, sizeof(dateString)) ;
4524       notice ("ME finishing at %s", dateString) ;
4525 
4526       exit (0) ;
4527     }
4528 }
4529 
4530 
4531 
4532 
4533 
4534 /*
4535  * Bump up the value of the low pass filter on the connection.
4536  */
incrFilter(Connection cxn)4537 static void incrFilter (Connection cxn)
4538 {
4539   cxn->filterValue *= (1.0 - (1.0 / cxn->lowPassFilter)) ;
4540   cxn->filterValue += 1.0 ;
4541 }
4542 
4543 
4544 
4545 
4546 
4547 /*
4548  * decrement the value of the low pass filter on the connection.
4549  */
decrFilter(Connection cxn)4550 static void decrFilter (Connection cxn)
4551 {
4552   cxn->filterValue *= (1.0 - (1.0 / cxn->lowPassFilter)) ;
4553 }
4554 
4555 
4556 
4557 
4558 
4559 /*
4560  * return true if we have articles we need to issue commands for.
4561  */
writesNeeded(Connection cxn)4562 static bool writesNeeded (Connection cxn)
4563 {
4564   return (cxn->checkHead != NULL || cxn->takeHead != NULL ? true : false) ;
4565 }
4566 
4567 
4568 
4569 
4570 
4571 /*
4572  * do some simple tests to make sure it's OK.
4573  */
validateConnection(Connection cxn)4574 static void validateConnection (Connection cxn)
4575 {
4576   unsigned int i ;
4577   unsigned int old ;
4578   ArtHolder p ;
4579 
4580   i = 0 ;
4581 
4582   /* count up the articles the Connection has and make sure that matches. */
4583   for (p = cxn->takeHead ; p != NULL ; p = p->next)
4584     i++ ;
4585   d_printf (4,"TAKE queue: %d\n",i) ;
4586   old = i ;
4587 
4588   for (p = cxn->takeRespHead ; p != NULL ; p = p->next)
4589     i++ ;
4590   d_printf (4,"TAKE response queue: %d\n",i - old) ;
4591   old = i ;
4592 
4593   for (p = cxn->checkHead ; p != NULL ; p = p->next)
4594     i++ ;
4595   d_printf (4,"CHECK queue: %d\n",i - old) ;
4596   old = i ;
4597 
4598   for (p = cxn->checkRespHead ; p != NULL ; p = p->next)
4599     i++ ;
4600   d_printf (4,"CHECK response queue: %d\n",i - old) ;
4601 
4602   ASSERT (i == cxn->articleQTotal) ;
4603 
4604   switch (cxn->state)
4605     {
4606       case cxnConnectingS:
4607         ASSERT (cxn->doesStreaming == false) ;
4608         ASSERT (cxn->articleQTotal <= 1) ;
4609         ASSERT (cxn->artReceiptTimerId == 0) ;
4610         ASSERT (cxn->sleepTimerId == 0) ;
4611         /* ASSERT (cxn->timeCon == 0) ; */
4612         break ;
4613 
4614       case cxnWaitingS:
4615         ASSERT (cxn->articleQTotal == 0) ;
4616         ASSERT (cxn->myEp == NULL) ;
4617         ASSERT (cxn->artReceiptTimerId == 0) ;
4618         ASSERT (cxn->readBlockedTimerId == 0) ;
4619         ASSERT (cxn->writeBlockedTimerId == 0) ;
4620         ASSERT (cxn->flushTimerId == 0) ;
4621         ASSERT (cxn->sleepTimerId == 0) ;
4622         ASSERT (cxn->timeCon == 0) ;
4623         break ;
4624 
4625       case cxnFlushingS:
4626       case cxnClosingS:
4627         if (!cxn->doesStreaming)
4628           ASSERT (cxn->articleQTotal <= 1) ;
4629         ASSERT (cxn->artReceiptTimerId == 0) ;
4630         ASSERT (cxn->flushTimerId == 0) ;
4631         ASSERT (cxn->sleepTimerId == 0) ;
4632         ASSERT (cxn->timeCon != 0) ;
4633         ASSERT (cxn->doesStreaming || cxn->maxCheck == 1) ;
4634         break ;
4635 
4636       case cxnFeedingS:
4637         if (cxn->doesStreaming)
4638           /* Some(?) hosts return the 439 response even before we're done
4639              sending, so don't go idle until here */
4640           ASSERT (cxn->articleQTotal > 0 || writeIsPending (cxn->myEp)) ;
4641         else
4642           ASSERT (cxn->articleQTotal == 1) ;
4643         if (cxn->readTimeout > 0 && !writeIsPending (cxn->myEp) &&
4644 	    cxn->checkRespHead != NULL && cxn->takeRespHead != NULL)
4645           ASSERT (cxn->readBlockedTimerId != 0) ;
4646         if (cxn->writeTimeout > 0 && writeIsPending (cxn->myEp))
4647           ASSERT (cxn->writeBlockedTimerId != 0) ;
4648         ASSERT (cxn->sleepTimerId == 0) ;
4649         ASSERT (cxn->timeCon != 0) ;
4650         ASSERT (cxn->doesStreaming || cxn->maxCheck == 1) ;
4651         break;
4652 
4653       case cxnIdleS:
4654         ASSERT (cxn->articleQTotal == 0) ;
4655         if (cxn->articleReceiptTimeout > 0)
4656           ASSERT (cxn->artReceiptTimerId != 0) ;
4657         ASSERT (cxn->readBlockedTimerId == 0) ;
4658         ASSERT (cxn->writeBlockedTimerId == 0) ;
4659         ASSERT (cxn->sleepTimerId == 0) ;
4660         ASSERT (cxn->timeCon != 0) ;
4661         ASSERT (!writeIsPending (cxn->myEp)) ;
4662         break ;
4663 
4664       case cxnIdleTimeoutS:
4665         ASSERT (cxn->articleQTotal == 0) ;
4666         ASSERT (cxn->artReceiptTimerId == 0) ;
4667         ASSERT (cxn->writeBlockedTimerId == 0) ;
4668         ASSERT (cxn->sleepTimerId == 0) ;
4669         ASSERT (cxn->timeCon != 0) ;
4670         ASSERT (!writeIsPending (cxn->myEp)) ;
4671         break ;
4672 
4673       case cxnSleepingS:
4674         ASSERT (cxn->articleQTotal == 0) ;
4675         ASSERT (cxn->myEp == NULL) ;
4676         ASSERT (cxn->artReceiptTimerId == 0) ;
4677         ASSERT (cxn->readBlockedTimerId == 0) ;
4678         ASSERT (cxn->writeBlockedTimerId == 0) ;
4679         ASSERT (cxn->flushTimerId == 0) ;
4680         ASSERT (cxn->timeCon == 0) ;
4681         break ;
4682 
4683       case cxnStartingS:
4684         ASSERT (cxn->articleQTotal == 0) ;
4685         ASSERT (cxn->myEp == NULL) ;
4686         ASSERT (cxn->artReceiptTimerId == 0) ;
4687         ASSERT (cxn->readBlockedTimerId == 0) ;
4688         ASSERT (cxn->writeBlockedTimerId == 0) ;
4689         ASSERT (cxn->flushTimerId == 0) ;
4690         ASSERT (cxn->sleepTimerId == 0) ;
4691         ASSERT (cxn->timeCon == 0) ;
4692         break ;
4693 
4694       case cxnDeadS:
4695         break ;
4696     }
4697 }
4698 
4699 
4700 
4701 
4702 
4703 /*
4704  * Generate a printable string of the parameter.
4705  */
stateToString(CxnState state)4706 static const char *stateToString (CxnState state)
4707 {
4708   static char rval [64] ;
4709 
4710   switch (state)
4711     {
4712       case cxnStartingS:
4713         strlcpy (rval,"cxnStartingS", sizeof(rval)) ;
4714         break ;
4715 
4716       case cxnWaitingS:
4717         strlcpy (rval,"cxnWaitingS", sizeof(rval)) ;
4718         break ;
4719 
4720       case cxnConnectingS:
4721         strlcpy (rval,"cxnConnectingS", sizeof(rval)) ;
4722         break ;
4723 
4724       case cxnIdleS:
4725         strlcpy (rval,"cxnIdleS", sizeof(rval)) ;
4726         break ;
4727 
4728       case cxnIdleTimeoutS:
4729         strlcpy (rval,"cxnIdleTimeoutS", sizeof(rval)) ;
4730         break ;
4731 
4732       case cxnFeedingS:
4733         strlcpy (rval,"cxnFeedingS", sizeof(rval)) ;
4734         break ;
4735 
4736       case cxnSleepingS:
4737         strlcpy (rval,"cxnSleepingS", sizeof(rval)) ;
4738         break ;
4739 
4740       case cxnFlushingS:
4741         strlcpy (rval,"cxnFlushingS", sizeof(rval)) ;
4742         break ;
4743 
4744       case cxnClosingS:
4745         strlcpy (rval,"cxnClosingS", sizeof(rval)) ;
4746         break ;
4747 
4748       case cxnDeadS:
4749         strlcpy (rval,"cxnDeadS", sizeof(rval)) ;
4750         break ;
4751 
4752       default:
4753         snprintf (rval,sizeof(rval),"UNKNOWN STATE: %d",state) ;
4754         break ;
4755     }
4756 
4757   return rval ;
4758 }
4759 
4760 
4761 
4762 
4763 
4764 /****************************************************************************
4765  *
4766  * Functions for managing the internal queue of Articles on each Connection.
4767  *
4768  ****************************************************************************/
4769 
newArtHolder(Article article)4770 static ArtHolder newArtHolder (Article article)
4771 {
4772   ArtHolder a = xmalloc (sizeof(struct art_holder_s)) ;
4773 
4774   a->article = article ;
4775   a->next = NULL ;
4776 
4777   return a ;
4778 }
4779 
4780 
4781 
4782 
4783 
4784 /*
4785  * Deletes the article holder
4786  */
delArtHolder(ArtHolder artH)4787 static void delArtHolder (ArtHolder artH)
4788 {
4789   if (artH != NULL)
4790     free (artH) ;
4791 }
4792 
4793 
4794 
4795 
4796 
4797 /*
4798  * remove the article holder from the queue. Adjust the count and if nxtPtr
4799  * points at the element then adjust that too.
4800  */
remArtHolder(ArtHolder artH,ArtHolder * head,unsigned int * count)4801 static bool remArtHolder (ArtHolder artH, ArtHolder *head, unsigned int *count)
4802 {
4803   ArtHolder h, i ;
4804 
4805   ASSERT (head != NULL) ;
4806   ASSERT (count != NULL) ;
4807 
4808   h = *head ;
4809   i = NULL ;
4810   while (h != NULL && h != artH)
4811     {
4812       i = h ;
4813       h = h->next ;
4814     }
4815 
4816   if (h == NULL)
4817     return false ;
4818 
4819   if (i == NULL)
4820     *head = (*head)->next ;
4821   else
4822     i->next = artH->next ;
4823 
4824   (*count)-- ;
4825 
4826   return true ;
4827 }
4828 
4829 
4830 
4831 
4832 
4833 /*
4834  * append the ArticleHolder to the queue
4835  */
appendArtHolder(ArtHolder artH,ArtHolder * head,unsigned int * count)4836 static void appendArtHolder (ArtHolder artH, ArtHolder *head, unsigned int *count)
4837 {
4838   ArtHolder p ;
4839 
4840   ASSERT (head != NULL) ;
4841   ASSERT (count != NULL) ;
4842 
4843   for (p = *head ; p != NULL && p->next != NULL ; p = p->next)
4844     /* nada */ ;
4845 
4846   if (p == NULL)
4847     *head = artH ;
4848   else
4849     p->next = artH ;
4850 
4851   artH->next = NULL ;
4852   (*count)++ ;
4853 }
4854 
4855 
4856 
4857 
4858 
4859 /*
4860  * find the article holder on the queue by comparing the message-id.
4861  */
artHolderByMsgId(const char * msgid,ArtHolder head)4862 static ArtHolder artHolderByMsgId (const char *msgid, ArtHolder head)
4863 {
4864   while (head != NULL)
4865     {
4866       if (strcmp (msgid, artMsgId (head->article)) == 0)
4867         return head ;
4868 
4869       head = head->next ;
4870     }
4871 
4872   return NULL ;
4873 }
4874 
4875 
4876 
/*
 * Randomize a number by roughly ten percent either way: the result is
 * initVal + initVal/10 minus a random amount in [0, initVal/5).
 *
 * Values so small that initVal/5 is zero are returned unchanged. There
 * is nothing to vary for them, and taking rand() modulo zero is
 * undefined.
 *
 * The random number generator is seeded from the current time on the
 * first call.
 */

static int fudgeFactor (int initVal)
{
  static bool seeded ;
  int spread ;

  if ( !seeded )
    {
      /* this may have been done already in endpoint.c. Is that a problem??? */
      srand ((unsigned int) theTime ()) ;
      seeded = true ;
    }

  spread = initVal / 5 ;
  if (spread == 0)
    return initVal ;            /* too small to be fudged */

  return initVal + (initVal / 10 - (rand () % spread)) ;
}
4899