1 /* $Id: connection.c 10283 2018-05-14 12:43:05Z iulius $
2 **
3 ** The implementation of the innfeed Connection class.
4 **
5 ** Written by James Brister <brister@vix.com>
6 **
7 ** The Connection object is what manages the NNTP protocol. If the remote
** doesn't do streaming, then the standard IHAVE lock-step protocol is
9 ** performed. In the streaming situation we have two cases. One where we must
10 ** send CHECK commands, and the other where we can directly send TAKETHIS
11 ** commands without a prior CHECK.
12 **
13 ** The Connection object maintains four article queues. The first one is
14 ** where new articles are put if they need to have an IHAVE or CHECK command
15 ** sent for them. The second queue is where the articles move from the first
16 ** after their IHAVE/CHECK command is sent, but the reply has not yet been
17 ** seen. The third queue is where articles go after the IHAVE/CHECK reply has
18 ** been seen (and the reply says to send the article). It is articles in the
19 ** third queue that have the TAKETHIS command sent, or the body of an IHAVE.
20 ** The third queue is also where new articles go if the connection is running
21 ** in no-CHECK mode. The fourth queue is where the articles move to from the
22 ** third queue after their IHAVE-body or TAKETHIS command has been sent. When
23 ** the response to the IHAVE-body or TAKETHIS is received the articles are
24 ** removed from the fourth queue and the Host object controlling this
25 ** Connection is notified of the success or failure of the transfer.
26 **
27 ** The whole system is event-driven by the EndPoint class and the Host via
28 ** calls to prepareRead() and prepareWrite() and prepareSleep().
29 **
30 **
31 ** We should probably store the results of gethostbyname in the connection so
32 ** we can rotate through the address when one fails for connecting. Perhaps
33 ** the gethostbyname should be done in the Host and the connection should
34 ** just be given the address to use.
35 **
36 ** Should we worry about articles being stuck on a queue for ever if the
37 ** remote forgets to send a response to a CHECK?
38 **
** Perhaps, instead of killing the connection on some of the simpler
** errors, we should try to flush the input and keep going.
41 **
42 ** Worry about counter overflow.
43 **
44 ** Worry about stats gathering when switch to no-check mode.
45 **
46 ** XXX if issueQUIT() has a problem and the state goes to cxnDeadS this is
47 ** not handled properly everywhere yet.
48 */
49
50 #include "innfeed.h"
51 #include "config.h"
52 #include "clibrary.h"
53 #include "portable/socket.h"
54
55 #include <assert.h>
56 #include <errno.h>
57 #include <fcntl.h>
58 #include <netdb.h>
59 #include <signal.h>
60 #include <syslog.h>
61
62 #ifdef HAVE_SYS_TIME_H
63 # include <sys/time.h>
64 #endif
65 #include <time.h>
66
67 #if defined (__FreeBSD__)
68 # include <sys/ioctl.h>
69 #endif
70
71 #include "inn/fdflag.h"
72 #include "inn/innconf.h"
73 #include "inn/messages.h"
74 #include "inn/network.h"
75 #include "inn/libinn.h"
76
77 #include "article.h"
78 #include "buffer.h"
79 #include "configfile.h"
80 #include "connection.h"
81 #include "endpoint.h"
82 #include "host.h"
83
/* VALIDATE_CONNECTION() expands to a call to validateConnection() unless
   NDEBUG is defined, in which case it expands to a no-op expression. */
#ifndef NDEBUG
# define VALIDATE_CONNECTION(x) validateConnection (x)
#else
# define VALIDATE_CONNECTION(x) ((void) 0)
#endif

/* Objects defined outside this file. */
extern char **PointersFreedOnExit;
extern const char *pidFile;
92
93 /*
94 * Private types.
95 */
96
97 /* We keep a linked list of articles the connection is trying to transmit */
98 typedef struct art_holder_s
99 {
100 Article article ;
101 struct art_holder_s *next ;
102 } *ArtHolder ;
103
104
/* The states a Connection can be in. */
typedef enum {
    cxnStartingS,     /* the connection's start state */
    cxnWaitingS,      /* not connected; waiting for an article */
    cxnConnectingS,   /* in the middle of connecting */
    cxnIdleS,         /* open and ready to feed, with empty queues */
    cxnIdleTimeoutS,  /* timed out while in the idle state */
    cxnFeedingS,      /* in the process of feeding articles */
    cxnSleepingS,     /* blocked on the reestablishment timer */
    cxnFlushingS,     /* waiting for the queues to drain so the connection
                         can be bounced */
    cxnClosingS,      /* told to close down permanently once the queues
                         have drained */
    cxnDeadS          /* connection is dead */
} CxnState;
117
118 /* The Connection class */
119 struct connection_s
120 {
121 Host myHost ; /* the host who owns the connection */
122 EndPoint myEp ; /* the endpoint the connection talks through */
123 unsigned int ident ; /* an identifier for syslogging. */
124 CxnState state ; /* the state the connection is in */
125
126
127 /*
128 * The Connection maintains 4 queue of articles.
129 */
130 ArtHolder checkHead ; /* head of article list to do CHECK/IHAVE */
131 ArtHolder checkRespHead ; /* head of list waiting on CHECK/IHAVE
132 response */
133 ArtHolder takeHead ; /* head of list of articles to send
134 TAKETHIS/IHAVE-body */
135 ArtHolder takeRespHead ; /* list of articles waiting on
136 TAKETHIS/IHAVE-body response */
137 unsigned int articleQTotal ; /* number of articles in all four queues */
138 ArtHolder missing ; /* head of missing list */
139
140
141 Buffer respBuffer ; /* buffer all responses are read into */
142
143 char *ipName ; /* the ip name (possibly quad) of the remote */
144
145 unsigned int maxCheck ; /* the max number of CHECKs to send */
146 unsigned short port ; /* the port number to use */
147
148 /*
149 * Timeout values and their callback IDs
150 */
151
152 /* Timer for max amount of time between receiving articles from the
153 Host */
154 unsigned int articleReceiptTimeout ;
155 TimeoutId artReceiptTimerId ;
156
157 /* Timer for the max amount of time to wait for a response from the
158 remote */
159 unsigned int readTimeout ;
160 TimeoutId readBlockedTimerId ;
161
162 /* Timer for the max amount of time to wait for a any amount of data
163 to be written to the remote */
164 unsigned int writeTimeout ;
165 TimeoutId writeBlockedTimerId ;
166
167 /* Timer for the max number of seconds to keep the network connection
168 up (long lasting connections give older nntp servers problems). */
169 unsigned int flushTimeout ;
170 TimeoutId flushTimerId ;
171
172 /* Timer for the number of seconds to sleep before attempting a
173 reconnect. */
174 unsigned int sleepTimeout ;
175 TimeoutId sleepTimerId ;
176
177
178 bool loggedNoCr ; /* true if we logged the NOCR_MSG */
179 bool immedRecon ; /* true if we recon immediately after flushing. */
180 bool doesStreaming ; /* true if remote will handle streaming */
181 bool authenticated ; /* true if remote authenticated */
182 bool quitWasIssued ; /* true if QUIT command was sent. */
183 bool needsChecks ; /* true if we issue CHECK commands in
184 streaming mode (rather than just sending
185 TAKETHIS commands) */
186
187 time_t timeCon; /* the time the connect happened (including
188 the MODE STREAM command) */
189 time_t timeCon_checkpoint;
190
191 /*
192 * STATISTICS
193 */
194 unsigned int artsTaken; /* the number of articles INN gave this cxn */
195
196 unsigned int checksIssued; /* the number of CHECKs/IHAVEs we
197 sent. Note that if we're running in
198 no-CHECK mode, then we add in the
199 TAKETHIS commands too */
200 unsigned int checksIssued_checkpoint;
201
202 unsigned int checksRefused; /* the number of response 435/438 */
203 unsigned int checksRefused_checkpoint;
204 unsigned int takesRejected; /* the number of response 437/439 received */
205 unsigned int takesRejected_checkpoint;
206 unsigned int takesOkayed; /* the number of response 235/239 received */
207 unsigned int takesOkayed_checkpoint;
208
209 double takesSizeRejected;
210 double takesSizeRejected_checkpoint;
211 double takesSizeOkayed;
212 double takesSizeOkayed_checkpoint;
213
214 double onThreshold ; /* for no-CHECK mode */
215 double offThreshold ; /* for no-CHECK mode */
216 double filterValue ; /* current value of IIR filter */
217 double lowPassFilter ; /* time constant for IIR filter */
218
219 Connection next ; /* for global list */
220 };
221
222 static Connection gCxnList = NULL ;
223 static unsigned int gCxnCount = 0 ;
224 unsigned int max_reconnect_period = MAX_RECON_PER ;
225 unsigned int init_reconnect_period = INIT_RECON_PER ;
226 #if 0
227 static bool inited = false ;
228 #endif
229 static Buffer dotFirstBuffer ;
230 static Buffer dotBuffer ;
231 static Buffer crlfBuffer ;
232
233
234 /***************************************************
235 *
236 * Private function declarations.
237 *
238 ***************************************************/
239
240
241 /* I/O Callbacks */
242 static void connectionDone (EndPoint e, IoStatus i, Buffer *b, void *d) ;
243 static void getBanner (EndPoint e, IoStatus i, Buffer *b, void *d) ;
244 static void getAuthUserResponse (EndPoint e, IoStatus i, Buffer *b, void *d) ;
245 static void getAuthPassResponse (EndPoint e, IoStatus i, Buffer *b, void *d) ;
246 static void getModeResponse (EndPoint e, IoStatus i, Buffer *b, void *d) ;
247 static void responseIsRead (EndPoint e, IoStatus i, Buffer *b, void *d) ;
248 static void quitWritten (EndPoint e, IoStatus i, Buffer *b, void *d) ;
249 static void ihaveBodyDone (EndPoint e, IoStatus i, Buffer *b, void *d) ;
250 static void commandWriteDone (EndPoint e, IoStatus i, Buffer *b, void *d) ;
251 static void modeCmdIssued (EndPoint e, IoStatus i, Buffer *b, void *d) ;
252 static void authUserIssued (EndPoint e, IoStatus i, Buffer *b, void *d) ;
253 static void authPassIssued (EndPoint e, IoStatus i, Buffer *b, void *d) ;
254 static void writeProgress (EndPoint e, IoStatus i, Buffer *b, void *d) ;
255
256
257 /* Timer callbacks */
258 static void responseTimeoutCbk (TimeoutId id, void *data) ;
259 static void writeTimeoutCbk (TimeoutId id, void *data) ;
260 static void reopenTimeoutCbk (TimeoutId id, void *data) ;
261 static void flushCxnCbk (TimeoutId, void *data) ;
262 static void articleTimeoutCbk (TimeoutId id, void *data) ;
263
264 /* Work callbacks */
265 static void cxnWorkProc (EndPoint ep, void *data) ;
266
267
268 static void cxnSleepOrDie (Connection cxn) ;
269
270 /* Response processing. */
271 static void processResponse205 (Connection cxn, char *response) ;
272 static void processResponse238 (Connection cxn, char *response) ;
273 static void processResponse431 (Connection cxn, char *response) ;
274 static void processResponse438 (Connection cxn, char *response) ;
275 static void processResponse239 (Connection cxn, char *response) ;
276 static void processResponse439 (Connection cxn, char *response) ;
277 static void processResponse235 (Connection cxn, char *response) ;
278 static void processResponse335 (Connection cxn, char *response) ;
279 static void processResponse400 (Connection cxn, char *response) ;
280 static void processResponse435 (Connection cxn, char *response) ;
281 static void processResponse436 (Connection cxn, char *response) ;
282 static void processResponse437 (Connection cxn, char *response) ;
283 static void processResponse480 (Connection cxn, char *response) ;
284 static void processResponse503 (Connection cxn, char *response) ;
285
286
287 /* Misc functions */
288 static void cxnSleep (Connection cxn) ;
289 static void cxnDead (Connection cxn) ;
290 static void cxnIdle (Connection cxn) ;
291 static void noSuchMessageId (Connection cxn, unsigned int responseCode,
292 const char *msgid, const char *response) ;
293 static void abortConnection (Connection cxn) ;
294 static void resetConnection (Connection cxn) ;
295 static void deferAllArticles (Connection cxn) ;
296 static void deferQueuedArticles (Connection cxn) ;
297 static void doSomeWrites (Connection cxn) ;
298 static bool issueIHAVE (Connection cxn) ;
299 static void issueIHAVEBody (Connection cxn) ;
300 static bool issueStreamingCommands (Connection cxn) ;
301 static Buffer buildCheckBuffer (Connection cxn) ;
302 static Buffer *buildTakethisBuffers (Connection cxn, Buffer checkBuffer) ;
303 static void issueQUIT (Connection cxn) ;
304 static void initReadBlockedTimeout (Connection cxn) ;
305 static int prepareWriteWithTimeout (EndPoint endp, Buffer *buffers,
306 EndpRWCB done, Connection cxn) ;
307 static void delConnection (Connection cxn) ;
308 static void incrFilter (Connection cxn) ;
309 static void decrFilter (Connection cxn) ;
310 static bool writesNeeded (Connection cxn) ;
311 static void validateConnection (Connection cxn) ;
312 static const char *stateToString (CxnState state) ;
313
314 static void issueModeStream (EndPoint e, Connection cxn) ;
315 static void issueAuthUser (EndPoint e, Connection cxn) ;
316 static void issueAuthPass (EndPoint e, Connection cxn) ;
317
318 static void prepareReopenCbk (Connection cxn) ;
319
320
321 /* Article queue management routines. */
322 static ArtHolder newArtHolder (Article art) ;
323 static void delArtHolder (ArtHolder artH) ;
324 static bool remArtHolder (ArtHolder art, ArtHolder *head, unsigned int *count) ;
325 static void appendArtHolder (ArtHolder artH, ArtHolder *head, unsigned int *count) ;
326 static ArtHolder artHolderByMsgId (const char *msgid, ArtHolder head) ;
327
328 static int fudgeFactor (int initVal) ;
329
330
331
332
333 /***************************************************
334 *
335 * Public functions implementation.
336 *
337 ***************************************************/
338
339
cxnConfigLoadCbk(void * data UNUSED)340 int cxnConfigLoadCbk (void *data UNUSED)
341 {
342 long iv ;
343 int rval = 1 ;
344 FILE *fp = (FILE *) data ;
345
346 if (getInteger (topScope,"max-reconnect-time",&iv,NO_INHERIT))
347 {
348 if (iv < 1)
349 {
350 rval = 0 ;
351 logOrPrint (LOG_ERR,fp,
352 "ME config: value of %s (%ld) in %s cannot be less"
353 " than 1. Using %ld", "max-reconnect-time",
354 iv,"global scope",(long) MAX_RECON_PER);
355 iv = MAX_RECON_PER ;
356 }
357 }
358 else
359 iv = MAX_RECON_PER ;
360 max_reconnect_period = (unsigned int) iv ;
361
362 if (getInteger (topScope,"initial-reconnect-time",&iv,NO_INHERIT))
363 {
364 if (iv < 1)
365 {
366 rval = 0 ;
367 logOrPrint (LOG_ERR,fp,
368 "ME config: value of %s (%ld) in %s cannot be less"
369 " than 1. Using %ld", "initial-reconnect-time",
370 iv,"global scope",(long)INIT_RECON_PER);
371 iv = INIT_RECON_PER ;
372 }
373 }
374 else
375 iv = INIT_RECON_PER ;
376 init_reconnect_period = (unsigned int) iv ;
377
378 return rval ;
379 }
380
381
382
383
384
385 /*
386 * Create a new Connection object and return it. All fields are
387 * initialized to reasonable values.
388 */
newConnection(Host host,unsigned int id,const char * ipname,unsigned int articleReceiptTimeout,unsigned int portNum,unsigned int respTimeout,unsigned int flushTimeout,double lowPassLow,double lowPassHigh,double lowPassFilter)389 Connection newConnection (Host host,
390 unsigned int id,
391 const char *ipname,
392 unsigned int articleReceiptTimeout,
393 unsigned int portNum,
394 unsigned int respTimeout,
395 unsigned int flushTimeout,
396 double lowPassLow,
397 double lowPassHigh,
398 double lowPassFilter)
399 {
400 Connection cxn ;
401 bool croak = false ;
402
403 if (ipname == NULL)
404 {
405 d_printf (1,"NULL ipname in newConnection\n") ;
406 croak = true ;
407 }
408
409 if (ipname && strlen (ipname) == 0)
410 {
411 d_printf (1,"Empty ipname in newConnection\n") ;
412 croak = true ;
413 }
414
415 if (croak)
416 return NULL ;
417
418 cxn = xcalloc (1, sizeof(struct connection_s));
419
420 cxn->myHost = host ;
421 cxn->myEp = NULL ;
422 cxn->ident = id ;
423
424 cxn->checkHead = NULL ;
425 cxn->checkRespHead = NULL ;
426 cxn->takeHead = NULL ;
427 cxn->takeRespHead = NULL ;
428
429 cxn->articleQTotal = 0 ;
430 cxn->missing = NULL ;
431
432 cxn->respBuffer = newBuffer (BUFFER_SIZE) ;
433 ASSERT (cxn->respBuffer != NULL) ;
434
435 cxn->ipName = xstrdup (ipname) ;
436 cxn->port = portNum ;
437
438 /* Time out the higher numbered connections faster */
439 cxn->articleReceiptTimeout = articleReceiptTimeout * 10.0 / (10.0 + id) ;
440 cxn->artReceiptTimerId = 0 ;
441
442 cxn->readTimeout = respTimeout ;
443 cxn->readBlockedTimerId = 0 ;
444
445 cxn->writeTimeout = respTimeout ; /* XXX should be a separate value */
446 cxn->writeBlockedTimerId = 0 ;
447
448 cxn->flushTimeout = fudgeFactor (flushTimeout) ;
449 cxn->flushTimerId = 0 ;
450
451 cxn->onThreshold = lowPassHigh * lowPassFilter / 100.0 ;
452 cxn->offThreshold = lowPassLow * lowPassFilter / 100.0 ;
453 cxn->lowPassFilter = lowPassFilter;
454
455 cxn->sleepTimerId = 0 ;
456 cxn->sleepTimeout = init_reconnect_period ;
457
458 resetConnection (cxn) ;
459
460 cxn->next = gCxnList ;
461 gCxnList = cxn ;
462 gCxnCount++ ;
463
464 cxn->state = cxnStartingS ;
465
466 return cxn ;
467 }
468
469
470
471
472
473 /* Create a new endpoint hooked to a non-blocking socket that is trying to
474 * connect to the host info stored in the Connection. On fast machines
475 * connecting locally the connect() may have already succeeded when this
476 * returns, but typically the connect will still be running and when it
477 * completes. The Connection will be notified via a write callback setup by
478 * prepareWrite below. If nothing goes wrong then this will return true
479 * (even if the connect() has not yet completed). If something fails
480 * (hostname lookup etc.) then it returns false (and the Connection is left
481 * in the sleeping state)..
482 *
483 * Pre-state Reason cxnConnect called
484 * --------- ------------------------
485 * cxnStartingS Connection owner issued call.
486 * cxnWaitingS side effect of cxnTakeArticle() call
487 * cxnConnecting side effect of cxnFlush() call
488 * cxnSleepingS side effect of reopenTimeoutCbk() call.
489 */
cxnConnect(Connection cxn)490 bool cxnConnect (Connection cxn)
491 {
492 struct sockaddr *cxnAddr;
493 socklen_t len;
494 int fd, rval;
495 const char *src;
496 const char *peerName = hostPeerName (cxn->myHost) ;
497
498 ASSERT (cxn->myEp == NULL) ;
499
500 if (!(cxn->state == cxnStartingS ||
501 cxn->state == cxnWaitingS ||
502 cxn->state == cxnFlushingS ||
503 cxn->state == cxnSleepingS))
504 {
505 warn ("%s:%d cxnsleep connection in bad state: %s",
506 hostPeerName (cxn->myHost), cxn->ident,
507 stateToString (cxn->state)) ;
508 cxnSleepOrDie (cxn) ;
509 return false;
510 }
511
512 if (cxn->state == cxnWaitingS)
513 ASSERT (cxn->articleQTotal == 1) ;
514 else
515 ASSERT (cxn->articleQTotal == 0) ;
516
517 cxn->state = cxnConnectingS ;
518
519 cxnAddr = hostIpAddr (cxn->myHost) ;
520
521 if (cxnAddr == NULL)
522 {
523 cxnSleepOrDie (cxn) ;
524 return false ;
525 }
526
527 if (cxnAddr->sa_family == AF_INET)
528 {
529 src = hostBindAddr(cxn->myHost) ;
530 len = sizeof(struct sockaddr_in) ;
531 }
532 else
533 {
534 src = hostBindAddr6(cxn->myHost) ;
535 #if HAVE_INET6
536 len = sizeof(struct sockaddr_in6) ;
537 #else
538 /* This should never happen, but the compiler doesn't know that. */
539 len = sizeof(struct sockaddr) ;
540 #endif
541 }
542 if (src && strcmp(src, "none") == 0)
543 src = NULL;
544
545 fd = network_client_create (cxnAddr->sa_family, SOCK_STREAM, src);
546 if (fd < 0)
547 {
548 syswarn ("%s:%d cxnsleep can't create socket", peerName, cxn->ident) ;
549 d_printf (1,"Can't get a socket: %s\n", strerror (errno)) ;
550
551 hostIpFailed (cxn->myHost) ;
552 cxnSleepOrDie (cxn) ;
553
554 return false ;
555 }
556
557 if (!fdflag_nonblocking (fd, true))
558 {
559 syswarn ("%s:%d cxnsleep can't set socket non-blocking", peerName,
560 cxn->ident) ;
561 close (fd) ;
562
563 cxnSleepOrDie (cxn) ;
564
565 return false ;
566 }
567
568 rval = connect (fd, cxnAddr, len) ;
569 if (rval < 0 && errno != EINPROGRESS)
570 {
571 syswarn ("%s:%d connect", peerName, cxn->ident) ;
572 hostIpFailed (cxn->myHost) ;
573 close (fd) ;
574
575 cxnSleepOrDie (cxn) ;
576
577 return false ;
578 }
579
580 if ((cxn->myEp = newEndPoint (fd)) == NULL)
581 {
582 /* If this happens, then fd was bigger than what select could handle,
583 so endpoint.c refused to create the new object. */
584 close (fd) ;
585 cxnSleepOrDie (cxn) ;
586 return false ;
587 }
588
589 if (rval < 0)
590 /* when the write callback gets done the connection went through */
591 prepareWrite (cxn->myEp, NULL, NULL, connectionDone, cxn) ;
592 else
593 connectionDone (cxn->myEp, IoDone, NULL, cxn) ;
594
595 /* connectionDone() could set state to sleeping */
596 return (cxn->state == cxnConnectingS ? true : false) ;
597 }
598
599
600
601
602
603 /* Put the Connection into the wait state.
604 *
605 * Pre-state Reason cxnWait called
606 * --------- ------------------------
607 * cxnStartingS - Connection owner called cxnWait()
608 * cxnSleepingS - side effect of cxnFlush() call.
609 * cxnConnectingS - side effect of cxnFlush() call.
610 * cxnFlushingS - side effect of receiving response 205
611 * and Connection had no articles when
612 * cxnFlush() was issued.
613 * - prepareRead failed.
614 * - I/O failed.
615 *
616 */
cxnWait(Connection cxn)617 void cxnWait (Connection cxn)
618 {
619 ASSERT (cxn->state == cxnStartingS ||
620 cxn->state == cxnSleepingS ||
621 cxn->state == cxnConnectingS ||
622 cxn->state == cxnFeedingS ||
623 cxn->state == cxnFlushingS) ;
624 VALIDATE_CONNECTION (cxn) ;
625
626 abortConnection (cxn) ;
627
628 cxn->state = cxnWaitingS ;
629
630 hostCxnWaiting (cxn->myHost,cxn) ; /* tell our Host we're waiting */
631 }
632
633
634
635
636
637 /* Tells the Connection to flush itself (i.e. push out all articles,
638 * issue a QUIT and drop the network connection. If necessary a
639 * reconnect will be done immediately after. Called by the Host, or
640 * by the timer callback.
641 *
642 * Pre-state Reason cxnFlush called
643 * --------- ------------------------
644 * ALL (except cxnDeadS - Connection owner called cxnFlush()
645 * and cxnStartingS)
646 * cxnFeedingS - side effect of flushCxnCbk() call.
647 */
cxnFlush(Connection cxn)648 void cxnFlush (Connection cxn)
649 {
650 ASSERT (cxn != NULL) ;
651 ASSERT (cxn->state != cxnStartingS) ;
652 ASSERT (cxn->state != cxnDeadS) ;
653 VALIDATE_CONNECTION (cxn) ;
654
655 switch (cxn->state)
656 {
657 case cxnSleepingS:
658 cxnWait (cxn) ;
659 break ;
660
661 case cxnConnectingS:
662 cxnWait (cxn) ;
663 cxnConnect (cxn) ;
664 break ;
665
666 case cxnIdleTimeoutS:
667 case cxnIdleS:
668 ASSERT (cxn->articleQTotal == 0) ;
669 if (cxn->state != cxnIdleTimeoutS)
670 clearTimer (cxn->artReceiptTimerId) ;
671 clearTimer (cxn->flushTimerId) ;
672 cxn->state = cxnFlushingS ;
673 issueQUIT (cxn) ;
674 break ;
675
676 case cxnClosingS:
677 case cxnFlushingS:
678 case cxnWaitingS:
679 if (cxn->articleQTotal == 0 && !writeIsPending (cxn->myEp))
680 issueQUIT (cxn) ;
681 break ;
682
683 case cxnFeedingS:
684 /* we only reconnect immediately if we're not idle when cxnFlush()
685 is called. */
686 if (!cxn->immedRecon)
687 {
688 cxn->immedRecon = (cxn->articleQTotal > 0 ? true : false) ;
689 d_printf (1,"%s:%d immediate reconnect for a cxnFlush()\n",
690 hostPeerName (cxn->myHost), cxn->ident) ;
691 }
692
693 clearTimer (cxn->flushTimerId) ;
694
695 cxn->state = cxnFlushingS ;
696
697 if (cxn->articleQTotal == 0 && !writeIsPending (cxn->myEp))
698 issueQUIT (cxn) ;
699 break ;
700
701 default:
702 die ("Bad connection state: %s\n",stateToString (cxn->state)) ;
703 }
704 }
705
706
707
708 /*
709 * Tells the Connection to dump all articles that are queued and to issue a
710 * QUIT as quickly as possible. Much like cxnClose, except queued articles
711 * are not sent, but are given back to the Host.
712 */
cxnTerminate(Connection cxn)713 void cxnTerminate (Connection cxn)
714 {
715 ASSERT (cxn != NULL) ;
716 ASSERT (cxn->state != cxnDeadS) ;
717 ASSERT (cxn->state != cxnStartingS) ;
718 VALIDATE_CONNECTION (cxn) ;
719
720 switch (cxn->state)
721 {
722 case cxnFeedingS:
723 d_printf (1,"%s:%d Issuing terminate\n",
724 hostPeerName (cxn->myHost), cxn->ident) ;
725
726 clearTimer (cxn->flushTimerId) ;
727
728 cxn->state = cxnClosingS ;
729
730 deferQueuedArticles (cxn) ;
731 if (cxn->articleQTotal == 0)
732 issueQUIT (cxn) ; /* send out the QUIT if we can */
733 break ;
734
735 case cxnIdleTimeoutS:
736 case cxnIdleS:
737 ASSERT (cxn->articleQTotal == 0) ;
738 if (cxn->state != cxnIdleTimeoutS)
739 clearTimer (cxn->artReceiptTimerId) ;
740 clearTimer (cxn->flushTimerId) ;
741 cxn->state = cxnClosingS ;
742 issueQUIT (cxn) ;
743 break ;
744
745 case cxnFlushingS: /* we are in the middle of a periodic close. */
746 d_printf (1,"%s:%d Connection already being flushed\n",
747 hostPeerName (cxn->myHost),cxn->ident);
748 cxn->state = cxnClosingS ;
749 if (cxn->articleQTotal == 0)
750 issueQUIT (cxn) ; /* send out the QUIT if we can */
751 break ;
752
753 case cxnClosingS:
754 d_printf (1,"%s:%d Connection already closing\n",
755 hostPeerName (cxn->myHost),cxn->ident) ;
756 break ;
757
758 case cxnWaitingS:
759 case cxnConnectingS:
760 case cxnSleepingS:
761 cxnDead (cxn) ;
762 break ;
763
764 default:
765 die ("Bad connection state: %s\n",stateToString (cxn->state)) ;
766 }
767
768 VALIDATE_CONNECTION (cxn) ;
769
770 if (cxn->state == cxnDeadS)
771 {
772 d_printf (1,"%s:%d Deleting connection\n",hostPeerName (cxn->myHost),
773 cxn->ident) ;
774
775 delConnection (cxn) ;
776 }
777 }
778
779
780
781 /* Tells the Connection to do a disconnect and then when it is
782 * disconnected to delete itself.
783 *
784 * Pre-state Reason cxnClose called
785 * --------- ------------------------
786 * ALL (except cxnDeadS - Connecton owner called directly.
787 * and cxnStartingS).
788 */
cxnClose(Connection cxn)789 void cxnClose (Connection cxn)
790 {
791 ASSERT (cxn != NULL) ;
792 ASSERT (cxn->state != cxnDeadS) ;
793 ASSERT (cxn->state != cxnStartingS) ;
794 VALIDATE_CONNECTION (cxn) ;
795
796 switch (cxn->state)
797 {
798 case cxnFeedingS:
799 d_printf (1,"%s:%d Issuing disconnect\n",
800 hostPeerName (cxn->myHost), cxn->ident) ;
801
802 clearTimer (cxn->flushTimerId) ;
803
804 cxn->state = cxnClosingS ;
805
806 if (cxn->articleQTotal == 0)
807 issueQUIT (cxn) ; /* send out the QUIT if we can */
808 break ;
809
810 case cxnIdleS:
811 case cxnIdleTimeoutS:
812 ASSERT (cxn->articleQTotal == 0) ;
813 if (cxn->state != cxnIdleTimeoutS)
814 clearTimer (cxn->artReceiptTimerId) ;
815 clearTimer (cxn->flushTimerId) ;
816 cxn->state = cxnClosingS ;
817 issueQUIT (cxn) ;
818 break ;
819
820 case cxnFlushingS: /* we are in the middle of a periodic close. */
821 d_printf (1,"%s:%d Connection already being flushed\n",
822 hostPeerName (cxn->myHost),cxn->ident);
823 cxn->state = cxnClosingS ;
824 if (cxn->articleQTotal == 0)
825 issueQUIT (cxn) ; /* send out the QUIT if we can */
826 break ;
827
828 case cxnClosingS:
829 d_printf (1,"%s:%d Connection already closing\n",
830 hostPeerName (cxn->myHost),cxn->ident) ;
831 break ;
832
833 case cxnWaitingS:
834 case cxnConnectingS:
835 case cxnSleepingS:
836 cxnDead (cxn) ;
837 break ;
838
839 default:
840 die ("Bad connection state: %s\n",stateToString (cxn->state)) ;
841 }
842
843 VALIDATE_CONNECTION (cxn) ;
844
845 if (cxn->state == cxnDeadS)
846 {
847 d_printf (1,"%s:%d Deleting connection\n",hostPeerName (cxn->myHost),
848 cxn->ident) ;
849
850 delConnection (cxn) ;
851 }
852 }
853
854
855
856
857
858 /* This is what the Host calls to get us to tranfer an article. If
859 * we're running the IHAVE sequence, then we can't take it if we've
860 * got an article already. If we're running the CHECK/TAKETHIS
861 * sequence, then we'll take as many as we can (up to our MAXCHECK
862 * limit).
863 */
cxnTakeArticle(Connection cxn,Article art)864 bool cxnTakeArticle (Connection cxn, Article art)
865 {
866 bool rval = true ;
867
868 ASSERT (cxn != NULL) ;
869 VALIDATE_CONNECTION (cxn) ;
870
871 if ( !cxnQueueArticle (cxn,art) ) /* might change cxnIdleS to cxnFeedingS */
872 return false ;
873
874 if (!(cxn->state == cxnConnectingS ||
875 cxn->state == cxnFeedingS ||
876 cxn->state == cxnWaitingS))
877 {
878 warn ("%s:%d cxnsleep connection in bad state: %s",
879 hostPeerName (cxn->myHost), cxn->ident,
880 stateToString (cxn->state)) ;
881 cxnSleepOrDie (cxn) ;
882 return false ;
883 }
884
885 if (cxn->state != cxnWaitingS) /* because articleQTotal == 1 */
886 VALIDATE_CONNECTION (cxn) ;
887 else
888 ASSERT (cxn->articleQTotal == 1) ;
889
890 switch (cxn->state)
891 {
892 case cxnWaitingS:
893 cxnConnect (cxn) ;
894 break ;
895
896 case cxnFeedingS:
897 doSomeWrites (cxn) ;
898 break ;
899
900 case cxnConnectingS:
901 break ;
902
903 default:
904 die ("Bad connection state: %s\n",stateToString (cxn->state)) ;
905 }
906
907 return rval ;
908 }
909
910
911
912
913
914 /* if there's room in the Connection then stick the article on the
915 * queue, otherwise return false.
916 */
cxnQueueArticle(Connection cxn,Article art)917 bool cxnQueueArticle (Connection cxn, Article art)
918 {
919 ArtHolder newArt ;
920 bool rval = false ;
921
922 ASSERT (cxn != NULL) ;
923 ASSERT (cxn->state != cxnStartingS) ;
924 ASSERT (cxn->state != cxnDeadS) ;
925 VALIDATE_CONNECTION (cxn) ;
926
927 switch (cxn->state)
928 {
929 case cxnClosingS:
930 d_printf (5,"%s:%d Refusing article due to closing\n",
931 hostPeerName (cxn->myHost),cxn->ident) ;
932 break ;
933
934 case cxnFlushingS:
935 d_printf (5,"%s:%d Refusing article due to flushing\n",
936 hostPeerName (cxn->myHost),cxn->ident) ;
937 break ;
938
939 case cxnSleepingS:
940 d_printf (5,"%s:%d Refusing article due to sleeping\n",
941 hostPeerName (cxn->myHost),cxn->ident) ;
942 break ;
943
944 case cxnWaitingS:
945 rval = true ;
946 newArt = newArtHolder (art) ;
947 appendArtHolder (newArt, &cxn->checkHead, &cxn->articleQTotal) ;
948 break ;
949
950 case cxnConnectingS:
951 if (cxn->articleQTotal != 0)
952 break ;
953 rval = true ;
954 newArt = newArtHolder (art) ;
955 appendArtHolder (newArt, &cxn->checkHead, &cxn->articleQTotal) ;
956 break ;
957
958 case cxnIdleS:
959 case cxnFeedingS:
960 if (cxn->articleQTotal >= cxn->maxCheck)
961 d_printf (5, "%s:%d Refusing article due to articleQTotal >= maxCheck (%d > %d)\n",
962 hostPeerName (cxn->myHost), cxn->ident,
963 cxn->articleQTotal, cxn->maxCheck) ;
964 else
965 {
966 rval = true ;
967 newArt = newArtHolder (art) ;
968 if (cxn->needsChecks)
969 appendArtHolder (newArt, &cxn->checkHead, &cxn->articleQTotal) ;
970 else
971 appendArtHolder (newArt, &cxn->takeHead, &cxn->articleQTotal) ;
972 if (cxn->state == cxnIdleS)
973 {
974 cxn->state = cxnFeedingS ;
975 clearTimer (cxn->artReceiptTimerId) ;
976 }
977 }
978 break ;
979
980 default:
981 die ("Invalid state: %s\n", stateToString (cxn->state)) ;
982 }
983
984 if (rval)
985 {
986 d_printf (5,"%s:%d accepting article %s\n",hostPeerName (cxn->myHost),
987 cxn->ident,artMsgId (art)) ;
988
989 cxn->artsTaken++ ;
990 }
991
992 return rval ;
993 }
994
995
996
997
998
999 /*
1000 * Generate a log message for activity. Usually called by the Connection's
1001 * owner.
1002 */
cxnLogStats(Connection cxn,bool final)1003 void cxnLogStats (Connection cxn, bool final)
1004 {
1005 const char *peerName ;
1006 time_t now = theTime() ;
1007
1008 ASSERT (cxn != NULL) ;
1009
1010 /* Only log stats when in one of these three states. */
1011 switch (cxn->state)
1012 {
1013 case cxnFeedingS:
1014 case cxnFlushingS:
1015 case cxnClosingS:
1016 break ;
1017
1018 default:
1019 return ;
1020 }
1021
1022 peerName = hostPeerName (cxn->myHost) ;
1023
1024 /* Log a checkpoint in any case. */
1025 notice("%s:%d checkpoint seconds %ld offered %d accepted %d refused %d"
1026 " rejected %d accsize %.0f rejsize %.0f",
1027 peerName, cxn->ident,
1028 (long) (now - cxn->timeCon_checkpoint),
1029 cxn->checksIssued - cxn->checksIssued_checkpoint,
1030 cxn->takesOkayed - cxn->takesOkayed_checkpoint,
1031 cxn->checksRefused - cxn->checksRefused_checkpoint,
1032 cxn->takesRejected - cxn->takesRejected_checkpoint,
1033 cxn->takesSizeOkayed - cxn->takesSizeOkayed_checkpoint,
1034 cxn->takesSizeRejected - cxn->takesSizeRejected_checkpoint);
1035
1036 if (final) {
1037 notice("%s:%d final seconds %ld offered %d accepted %d refused %d"
1038 " rejected %d accsize %.0f rejsize %.0f",
1039 peerName, cxn->ident, (long) (now - cxn->timeCon),
1040 cxn->checksIssued, cxn->takesOkayed, cxn->checksRefused,
1041 cxn->takesRejected, cxn->takesSizeOkayed, cxn->takesSizeRejected);
1042
1043 cxn->artsTaken = 0;
1044 cxn->checksIssued = 0;
1045 cxn->checksRefused = 0;
1046 cxn->takesRejected = 0;
1047 cxn->takesOkayed = 0;
1048 cxn->takesSizeRejected = 0;
1049 cxn->takesSizeOkayed = 0;
1050
1051 if (cxn->timeCon > 0) {
1052 cxn->timeCon = theTime();
1053 }
1054 }
1055
1056 cxn->timeCon_checkpoint = now;
1057 cxn->checksIssued_checkpoint = cxn->checksIssued;
1058 cxn->takesOkayed_checkpoint = cxn->takesOkayed;
1059 cxn->checksRefused_checkpoint = cxn->checksRefused;
1060 cxn->takesRejected_checkpoint = cxn->takesRejected;
1061 cxn->takesSizeOkayed_checkpoint = cxn->takesSizeOkayed;
1062 cxn->takesSizeRejected_checkpoint = cxn->takesSizeRejected;
1063 }
1064
1065
1066
1067
1068
1069 /*
1070 * return the number of articles the connection will accept.
1071 */
cxnQueueSpace(Connection cxn)1072 size_t cxnQueueSpace (Connection cxn)
1073 {
1074 int rval = 0 ;
1075
1076 ASSERT (cxn != NULL) ;
1077
1078 if (cxn->state == cxnFeedingS ||
1079 cxn->state == cxnIdleS ||
1080 cxn->state == cxnConnectingS ||
1081 cxn->state == cxnWaitingS)
1082 rval = cxn->maxCheck - cxn->articleQTotal ;
1083
1084 return rval ;
1085 }
1086
1087
1088
1089
1090
1091 /*
1092 * Print info on all the connections that currently exist.
1093 */
gPrintCxnInfo(FILE * fp,unsigned int indentAmt)1094 void gPrintCxnInfo (FILE *fp, unsigned int indentAmt)
1095 {
1096 char indent [INDENT_BUFFER_SIZE] ;
1097 unsigned int i ;
1098 Connection cxn ;
1099
1100 for (i = 0 ; i < MIN(INDENT_BUFFER_SIZE - 1,indentAmt) ; i++)
1101 indent [i] = ' ' ;
1102 indent [i] = '\0' ;
1103
1104 fprintf (fp,"%sGlobal Connection list : (count %u) {\n",
1105 indent,gCxnCount) ;
1106 for (cxn = gCxnList ; cxn != NULL ; cxn = cxn->next)
1107 printCxnInfo (cxn,fp,indentAmt + INDENT_INCR) ;
1108 fprintf (fp,"%s}\n",indent) ;
1109 }
1110
1111
1112
1113
1114
1115 /*
1116 * Print the info about the given connection.
1117 */
printCxnInfo(Connection cxn,FILE * fp,unsigned int indentAmt)1118 void printCxnInfo (Connection cxn, FILE *fp, unsigned int indentAmt)
1119 {
1120 char indent [INDENT_BUFFER_SIZE] ;
1121 unsigned int i ;
1122 ArtHolder artH ;
1123
1124 for (i = 0 ; i < MIN(INDENT_BUFFER_SIZE - 1,indentAmt) ; i++)
1125 indent [i] = ' ' ;
1126 indent [i] = '\0' ;
1127
1128 fprintf (fp,"%sConnection : %p {\n",indent, (void *) cxn) ;
1129 fprintf (fp,"%s host : %p\n",indent, (void *) cxn->myHost) ;
1130 fprintf (fp,"%s endpoint : %p\n",indent, (void *) cxn->myEp) ;
1131 fprintf (fp,"%s state : %s\n",indent, stateToString (cxn->state)) ;
1132 fprintf (fp,"%s ident : %u\n",indent,cxn->ident) ;
1133 fprintf (fp,"%s ip-name : %s\n", indent, cxn->ipName) ;
1134 fprintf (fp,"%s port-number : %u\n",indent,cxn->port) ;
1135 fprintf (fp,"%s max-checks : %u\n",indent,cxn->maxCheck) ;
1136 fprintf (fp,"%s does-streaming : %s\n",indent,
1137 boolToString (cxn->doesStreaming)) ;
1138 fprintf (fp,"%s authenticated : %s\n",indent,
1139 boolToString (cxn->authenticated)) ;
1140 fprintf (fp,"%s quitWasIssued : %s\n",indent,
1141 boolToString (cxn->quitWasIssued)) ;
1142 fprintf (fp,"%s needs-checks : %s\n",indent,
1143 boolToString (cxn->needsChecks)) ;
1144
1145 fprintf (fp,"%s time-connected : %ld\n",indent,(long) cxn->timeCon) ;
1146 fprintf (fp,"%s articles from INN : %u\n",indent,cxn->artsTaken) ;
1147 fprintf (fp,"%s articles offered : %u\n",indent,
1148 cxn->checksIssued) ;
1149 fprintf (fp,"%s articles refused : %u\n",indent,
1150 cxn->checksRefused) ;
1151 fprintf (fp,"%s articles rejected : %u\n",indent,
1152 cxn->takesRejected) ;
1153 fprintf (fp,"%s articles accepted : %u\n",indent,
1154 cxn->takesOkayed) ;
1155 fprintf (fp,"%s low-pass upper limit : %0.6f\n", indent,
1156 cxn->onThreshold) ;
1157 fprintf (fp,"%s low-pass lower limit : %0.6f\n", indent,
1158 cxn->offThreshold) ;
1159 fprintf (fp,"%s low-pass filter tc : %0.6f\n", indent,
1160 cxn->lowPassFilter) ;
1161 fprintf (fp,"%s low-pass filter : %0.6f\n", indent,
1162 cxn->filterValue) ;
1163
1164 fprintf (fp,"%s article-timeout : %u\n",indent,cxn->articleReceiptTimeout) ;
1165 fprintf (fp,"%s article-callback : %d\n",indent,cxn->artReceiptTimerId) ;
1166
1167 fprintf (fp,"%s response-timeout : %u\n",indent,cxn->readTimeout) ;
1168 fprintf (fp,"%s response-callback : %d\n",indent,cxn->readBlockedTimerId) ;
1169
1170 fprintf (fp,"%s write-timeout : %u\n",indent,cxn->writeTimeout) ;
1171 fprintf (fp,"%s write-callback : %d\n",indent,cxn->writeBlockedTimerId) ;
1172
1173 fprintf (fp,"%s flushTimeout : %u\n",indent,cxn->flushTimeout) ;
1174 fprintf (fp,"%s flushTimerId : %d\n",indent,cxn->flushTimerId) ;
1175
1176 fprintf (fp,"%s reopen wait : %u\n",indent,cxn->sleepTimeout) ;
1177 fprintf (fp,"%s reopen id : %d\n",indent,cxn->sleepTimerId) ;
1178
1179 fprintf (fp,"%s CHECK queue {\n",indent) ;
1180 for (artH = cxn->checkHead ; artH != NULL ; artH = artH->next)
1181 printArticleInfo (artH->article,fp,indentAmt + INDENT_INCR) ;
1182 fprintf (fp,"%s }\n",indent) ;
1183
1184 fprintf (fp,"%s CHECK Response queue {\n",indent) ;
1185 for (artH = cxn->checkRespHead ; artH != NULL ; artH = artH->next)
1186 printArticleInfo (artH->article,fp,indentAmt + INDENT_INCR) ;
1187 fprintf (fp,"%s }\n",indent) ;
1188
1189 fprintf (fp,"%s TAKE queue {\n",indent) ;
1190 for (artH = cxn->takeHead ; artH != NULL ; artH = artH->next)
1191 printArticleInfo (artH->article,fp,indentAmt + INDENT_INCR) ;
1192 fprintf (fp,"%s }\n",indent) ;
1193
1194 fprintf (fp,"%s TAKE response queue {\n",indent) ;
1195 for (artH = cxn->takeRespHead ; artH != NULL ; artH = artH->next)
1196 printArticleInfo (artH->article,fp,indentAmt + INDENT_INCR) ;
1197 fprintf (fp,"%s }\n",indent) ;
1198
1199 fprintf (fp,"%s response buffer {\n",indent) ;
1200 printBufferInfo (cxn->respBuffer,fp,indentAmt + INDENT_INCR) ;
1201 fprintf (fp,"%s }\n",indent) ;
1202
1203 fprintf (fp,"%s}\n",indent) ;
1204 }
1205
1206
1207
1208
1209
1210 /*
1211 * Return whether the connection will accept articles.
1212 */
cxnCheckstate(Connection cxn)1213 bool cxnCheckstate (Connection cxn)
1214 {
1215 bool rval = false ;
1216
1217 ASSERT (cxn != NULL) ;
1218
1219 if (cxn->state == cxnFeedingS ||
1220 cxn->state == cxnIdleS ||
1221 cxn->state == cxnConnectingS)
1222 rval = true ;
1223
1224 return rval ;
1225 }
1226
1227
1228
1229
1230
1231 /**********************************************************************/
1232 /** STATIC PRIVATE FUNCTIONS **/
1233 /**********************************************************************/
1234
1235
/*
 * ENDPOINT CALLBACK AREA.
 *
 * All the functions in this next section are callbacks fired by the
 * EndPoint objects/class (either timers or i/o completion callbacks).
 */
1242
1243
1244 /*
1245 * this is the first stage of the NNTP FSM. This function is called
1246 * when the tcp/ip network connection is setup and we should get
1247 * ready to read the banner message. When this function returns the
1248 * state of the Connection will still be cxnConnectingS unless
1249 * something broken, in which case it probably went into the
1250 * cxnSleepingS state.
1251 */
connectionDone(EndPoint e,IoStatus i,Buffer * b,void * d)1252 static void connectionDone (EndPoint e, IoStatus i, Buffer *b, void *d)
1253 {
1254 Buffer *readBuffers ;
1255 Connection cxn = (Connection) d ;
1256 const char *peerName ;
1257 int optval;
1258 socklen_t size ;
1259
1260 ASSERT (b == NULL) ;
1261 ASSERT (cxn->state == cxnConnectingS) ;
1262 ASSERT (!writeIsPending (cxn->myEp)) ;
1263
1264 size = sizeof (optval) ;
1265 peerName = hostPeerName (cxn->myHost) ;
1266
1267 if (i != IoDone)
1268 {
1269 errno = endPointErrno (e) ;
1270 syswarn ("%s:%d cxnsleep i/o failed", peerName, cxn->ident) ;
1271
1272 cxnSleepOrDie (cxn) ;
1273 }
1274 else if (getsockopt (endPointFd (e), SOL_SOCKET, SO_ERROR,
1275 (char *) &optval, &size) != 0)
1276 {
1277 /* This is bad. Can't even get the SO_ERROR value out of the socket */
1278 syswarn ("%s:%d cxnsleep internal getsockopt", peerName, cxn->ident) ;
1279
1280 cxnSleepOrDie (cxn) ;
1281 }
1282 else if (optval != 0)
1283 {
1284 /* if the connect failed then the only way to know is by getting
1285 the SO_ERROR value out of the socket. */
1286 errno = optval ;
1287 syswarn ("%s:%d cxnsleep connect", peerName, cxn->ident) ;
1288 hostIpFailed (cxn->myHost) ;
1289
1290 cxnSleepOrDie (cxn) ;
1291 }
1292 else
1293 {
1294 readBuffers = makeBufferArray (bufferTakeRef (cxn->respBuffer), NULL) ;
1295
1296 if ( !prepareRead (e, readBuffers, getBanner, cxn, 1) )
1297 {
1298 warn ("%s:%d cxnsleep prepare read failed", peerName, cxn->ident) ;
1299
1300 cxnSleepOrDie (cxn) ;
1301 }
1302 else
1303 {
1304 initReadBlockedTimeout (cxn) ;
1305
1306 /* set up the callback for closing down the connection at regular
1307 intervals (due to problems with long running nntpd). */
1308 if (cxn->flushTimeout > 0)
1309 cxn->flushTimerId = prepareSleep (flushCxnCbk,
1310 cxn->flushTimeout,cxn) ;
1311
1312 /* The state doesn't change yet until we've read the banner and
1313 tried the MODE STREAM command. */
1314 }
1315 }
1316 VALIDATE_CONNECTION (cxn) ;
1317 }
1318
1319
1320 /*
1321 * This is called when we are so far in the connection setup that
1322 * we're confident it'll work. If the connection is IPv6, remove
1323 * the IPv4 addresses from the address list.
1324 */
connectionIfIpv6DeleteIpv4Addr(Connection cxn)1325 static void connectionIfIpv6DeleteIpv4Addr (Connection cxn)
1326 {
1327 union {
1328 struct sockaddr sa;
1329 struct sockaddr_storage ss;
1330 } u;
1331 socklen_t len = sizeof(u);
1332
1333 if (getpeername (endPointFd (cxn->myEp), &u.sa, &len) < 0)
1334 return;
1335 if (u.sa.sa_family == AF_INET)
1336 return;
1337
1338 hostDeleteIpv4Addr (cxn->myHost);
1339 }
1340
1341
1342 /*
1343 * Called when the banner message has been read off the wire and is
1344 * in the buffer(s). When this function returns the state of the
1345 * Connection will still be cxnConnectingS unless something broken,
1346 * in which case it probably went into the cxnSleepiongS state.
1347 */
getBanner(EndPoint e,IoStatus i,Buffer * b,void * d)1348 static void getBanner (EndPoint e, IoStatus i, Buffer *b, void *d)
1349 {
1350 Buffer *readBuffers ;
1351 Connection cxn = (Connection) d ;
1352 char *p = bufferBase (b[0]) ;
1353 int code ;
1354 bool isOk = false ;
1355 const char *peerName ;
1356 char *rest ;
1357
1358 ASSERT (e == cxn->myEp) ;
1359 ASSERT (b[0] == cxn->respBuffer) ;
1360 ASSERT (b[1] == NULL) ;
1361 ASSERT (cxn->state == cxnConnectingS) ;
1362 ASSERT (!writeIsPending (cxn->myEp));
1363
1364
1365 peerName = hostPeerName (cxn->myHost) ;
1366
1367 bufferAddNullByte (b[0]) ;
1368
1369 if (i != IoDone)
1370 {
1371 errno = endPointErrno (cxn->myEp) ;
1372 syswarn ("%s:%d cxnsleep can't read banner", peerName, cxn->ident) ;
1373 hostIpFailed (cxn->myHost) ;
1374
1375 cxnSleepOrDie (cxn) ;
1376 }
1377 else if (strchr (p, '\n') == NULL)
1378 { /* partial read. expand buffer and retry */
1379 expandBuffer (b[0], BUFFER_EXPAND_AMOUNT) ;
1380 readBuffers = makeBufferArray (bufferTakeRef (b[0]), NULL) ;
1381
1382 if ( !prepareRead (e, readBuffers, getBanner, cxn, 1) )
1383 {
1384 warn ("%s:%d cxnsleep prepare read failed", peerName, cxn->ident) ;
1385
1386 cxnSleepOrDie (cxn) ;
1387 }
1388 }
1389 else if ( !getNntpResponse (p, &code, &rest) )
1390 {
1391 trim_ws (p) ;
1392
1393 warn ("%s:%d cxnsleep response format: %s", peerName, cxn->ident, p) ;
1394
1395 cxnSleepOrDie (cxn) ;
1396 }
1397 else
1398 {
1399 trim_ws (p) ;
1400
1401 switch (code)
1402 {
1403 case 200: /* normal */
1404 case 201: /* can transfer but not post -- old nntpd */
1405 isOk = true ;
1406 break ;
1407
1408 case 400:
1409 cxnSleepOrDie (cxn) ;
1410 hostIpFailed (cxn->myHost) ;
1411 hostCxnBlocked (cxn->myHost, cxn, rest) ;
1412 break ;
1413
1414 case 502:
1415 warn ("%s:%d cxnsleep no permission to talk: %s", peerName,
1416 cxn->ident, p) ;
1417 cxnSleepOrDie (cxn) ;
1418 hostIpFailed (cxn->myHost) ;
1419 hostCxnBlocked (cxn->myHost, cxn, rest) ;
1420 break ;
1421
1422 default:
1423 warn ("%s:%d cxnsleep response unknown banner: %d %s", peerName,
1424 cxn->ident, code, p) ;
1425 d_printf (1,"%s:%d Unknown response code: %d: %s\n",
1426 hostPeerName (cxn->myHost),cxn->ident, code, p) ;
1427 cxnSleepOrDie (cxn) ;
1428 hostIpFailed (cxn->myHost) ;
1429 hostCxnBlocked (cxn->myHost, cxn, rest) ;
1430 break ;
1431 }
1432
1433 if ( isOk )
1434 {
1435 /* If we got this far and the connection is IPv6, remove
1436 the IPv4 addresses from the address list. */
1437 connectionIfIpv6DeleteIpv4Addr (cxn);
1438
1439 if (hostUsername (cxn->myHost) != NULL
1440 && hostPassword (cxn->myHost) != NULL)
1441 issueAuthUser (e,cxn);
1442 else
1443 issueModeStream (e,cxn);
1444 }
1445 }
1446 freeBufferArray (b) ;
1447 }
1448
1449
1450
1451
1452
/*
 * Queue an "AUTHINFO USER <username>" command for the remote, using the
 * username configured on the connection's host, and queue a read of the
 * reply, which will be handed to getAuthUserResponse().
 *
 * Failing to queue the write is fatal to the program; failing to queue the
 * read puts the connection to sleep (or kills it) via cxnSleepOrDie().
 */
static void issueAuthUser (EndPoint e, Connection cxn)
{
  Buffer cmdBuf;
  Buffer *cmdArr;
  Buffer *respArr;
  char *text;
  size_t needed;

  /* Room for the fixed part of the command, including the CRLF and the
     terminating NUL (17 bytes), plus the username itself. */
  needed = sizeof ("AUTHINFO USER \r\n") + strlen (hostUsername (cxn->myHost));

  cmdBuf = newBuffer (needed);
  text = bufferBase (cmdBuf);

  sprintf (text, "AUTHINFO USER %s\r\n", hostUsername (cxn->myHost));
  bufferSetDataSize (cmdBuf, strlen (text));

  cmdArr = makeBufferArray (cmdBuf, NULL);

  if ( !prepareWriteWithTimeout (e, cmdArr, authUserIssued, cxn) )
    die ("%s:%d fatal prepare write for authinfo user failed",
         hostPeerName (cxn->myHost), cxn->ident);

  /* Discard whatever is in the response buffer before reading the reply. */
  bufferSetDataSize (cxn->respBuffer, 0);

  respArr = makeBufferArray (bufferTakeRef (cxn->respBuffer), NULL);

  if ( !prepareRead (e, respArr, getAuthUserResponse, cxn, 1) )
    {
      warn ("%s:%d cxnsleep prepare read failed", hostPeerName (cxn->myHost),
            cxn->ident);
      freeBufferArray (respArr);
      cxnSleepOrDie (cxn);
    }
}
1490
1491
1492
1493
1494
1495
/*
 * Queue an "AUTHINFO PASS <password>" command for the remote, using the
 * password configured on the connection's host, and queue a read of the
 * reply, which will be handed to getAuthPassResponse().
 *
 * Failing to queue the write is fatal to the program; failing to queue the
 * read puts the connection to sleep (or kills it) via cxnSleepOrDie().
 */
static void issueAuthPass (EndPoint e, Connection cxn)
{
  Buffer cmdBuf;
  Buffer *cmdArr;
  Buffer *respArr;
  char *text;
  size_t needed;

  /* Room for the fixed part of the command, including the CRLF and the
     terminating NUL (17 bytes), plus the password itself. */
  needed = sizeof ("AUTHINFO PASS \r\n") + strlen (hostPassword (cxn->myHost));

  cmdBuf = newBuffer (needed);
  text = bufferBase (cmdBuf);

  sprintf (text, "AUTHINFO PASS %s\r\n", hostPassword (cxn->myHost));
  bufferSetDataSize (cmdBuf, strlen (text));

  cmdArr = makeBufferArray (cmdBuf, NULL);

  if ( !prepareWriteWithTimeout (e, cmdArr, authPassIssued, cxn) )
    die ("%s:%d fatal prepare write for authinfo pass failed",
         hostPeerName (cxn->myHost), cxn->ident);

  /* Discard whatever is in the response buffer before reading the reply. */
  bufferSetDataSize (cxn->respBuffer, 0);

  respArr = makeBufferArray (bufferTakeRef (cxn->respBuffer), NULL);

  if ( !prepareRead (e, respArr, getAuthPassResponse, cxn, 1) )
    {
      warn ("%s:%d cxnsleep prepare read failed", hostPeerName (cxn->myHost),
            cxn->ident);
      freeBufferArray (respArr);
      cxnSleepOrDie (cxn);
    }
}
1533
1534
1535
1536
1537
1538
/*
 * Queue a "MODE STREAM" command for the remote and queue a read of the
 * reply, which will be handed to getModeResponse().
 *
 * Failing to queue the write is fatal to the program; failing to queue the
 * read puts the connection to sleep (or kills it) via cxnSleepOrDie().
 */
static void issueModeStream (EndPoint e, Connection cxn)
{
  Buffer cmdBuf;
  Buffer *cmdArr;
  Buffer *respArr;
  char *text;

#define MODE_CMD "MODE STREAM\r\n"

  /* One extra byte for the terminating NUL. */
  cmdBuf = newBuffer (strlen (MODE_CMD) + 1);
  text = bufferBase (cmdBuf);

  /* now issue the MODE STREAM command */
  d_printf (1, "%s:%d Issuing the streaming command\n",
            hostPeerName (cxn->myHost), cxn->ident);

  strlcpy (text, MODE_CMD, bufferSize (cmdBuf));
  bufferSetDataSize (cmdBuf, strlen (text));

  cmdArr = makeBufferArray (cmdBuf, NULL);

  if ( !prepareWriteWithTimeout (e, cmdArr, modeCmdIssued, cxn) )
    die ("%s:%d fatal prepare write for mode stream failed",
         hostPeerName (cxn->myHost), cxn->ident);

  /* Discard whatever is in the response buffer before reading the reply. */
  bufferSetDataSize (cxn->respBuffer, 0);

  respArr = makeBufferArray (bufferTakeRef (cxn->respBuffer), NULL);

  if ( !prepareRead (e, respArr, getModeResponse, cxn, 1) )
    {
      warn ("%s:%d cxnsleep prepare read failed", hostPeerName (cxn->myHost),
            cxn->ident);
      freeBufferArray (respArr);
      cxnSleepOrDie (cxn);
    }
}
1579
1580
1581
1582
1583
1584 /*
1585 *
1586 */
getAuthUserResponse(EndPoint e,IoStatus i,Buffer * b,void * d)1587 static void getAuthUserResponse (EndPoint e, IoStatus i, Buffer *b, void *d)
1588 {
1589 Connection cxn = (Connection) d ;
1590 int code ;
1591 char *p = bufferBase (b[0]) ;
1592 Buffer *buffers ;
1593 const char *peerName ;
1594
1595 ASSERT (e == cxn->myEp) ;
1596 ASSERT (b [0] == cxn->respBuffer) ;
1597 ASSERT (b [1] == NULL) ; /* only ever one buffer on this read */
1598 ASSERT (cxn->state == cxnConnectingS) ;
1599 VALIDATE_CONNECTION (cxn) ;
1600
1601 peerName = hostPeerName (cxn->myHost) ;
1602
1603 bufferAddNullByte (b[0]) ;
1604
1605 d_printf (1,"%s:%d Processing authinfo user response: %s", /* no NL */
1606 hostPeerName (cxn->myHost), cxn->ident, p) ;
1607
1608 if (i == IoDone && writeIsPending (cxn->myEp))
1609 {
1610 /* badness. should never happen */
1611 warn ("%s:%d cxnsleep authinfo command still pending", peerName,
1612 cxn->ident) ;
1613
1614 cxnSleepOrDie (cxn) ;
1615 }
1616 else if (i != IoDone)
1617 {
1618 if (i != IoEOF)
1619 {
1620 errno = endPointErrno (e) ;
1621 syswarn ("%s:%d cxnsleep can't read response", peerName, cxn->ident);
1622 }
1623 cxnSleepOrDie (cxn) ;
1624 }
1625 else if (strchr (p, '\n') == NULL)
1626 {
1627 /* partial read */
1628 expandBuffer (b [0], BUFFER_EXPAND_AMOUNT) ;
1629
1630 buffers = makeBufferArray (bufferTakeRef (b [0]), NULL) ;
1631 if ( !prepareRead (e, buffers, getAuthUserResponse, cxn, 1) )
1632 {
1633 warn ("%s:%d cxnsleep prepare read failed", peerName, cxn->ident) ;
1634 freeBufferArray (buffers) ;
1635 cxnSleepOrDie (cxn) ;
1636 }
1637 }
1638 else
1639 {
1640 clearTimer (cxn->readBlockedTimerId) ;
1641
1642 if ( !getNntpResponse (p, &code, NULL) )
1643 {
1644 warn ("%s:%d cxnsleep response to AUTHINFO USER: %s", peerName,
1645 cxn->ident, p) ;
1646
1647 cxnSleepOrDie (cxn) ;
1648 }
1649 else
1650 {
1651 notice ("%s:%d connected", peerName, cxn->ident) ;
1652
1653 switch (code)
1654 {
1655 case 381:
1656 issueAuthPass (e,cxn);
1657 break ;
1658
1659 default:
1660 warn ("%s:%d cxnsleep response to AUTHINFO USER: %s", peerName,
1661 cxn->ident, p) ;
1662 cxn->authenticated = true;
1663 issueModeStream (e,cxn);
1664 break ;
1665 }
1666
1667 }
1668 }
1669 }
1670
1671
1672
1673
1674
1675 /*
1676 *
1677 */
getAuthPassResponse(EndPoint e,IoStatus i,Buffer * b,void * d)1678 static void getAuthPassResponse (EndPoint e, IoStatus i, Buffer *b, void *d)
1679 {
1680 Connection cxn = (Connection) d ;
1681 int code ;
1682 char *p = bufferBase (b[0]) ;
1683 Buffer *buffers ;
1684 const char *peerName ;
1685
1686 ASSERT (e == cxn->myEp) ;
1687 ASSERT (b [0] == cxn->respBuffer) ;
1688 ASSERT (b [1] == NULL) ; /* only ever one buffer on this read */
1689 ASSERT (cxn->state == cxnConnectingS) ;
1690 VALIDATE_CONNECTION (cxn) ;
1691
1692 peerName = hostPeerName (cxn->myHost) ;
1693
1694 bufferAddNullByte (b[0]) ;
1695
1696 d_printf (1,"%s:%d Processing authinfo pass response: %s", /* no NL */
1697 hostPeerName (cxn->myHost), cxn->ident, p) ;
1698
1699 if (i == IoDone && writeIsPending (cxn->myEp))
1700 {
1701 /* badness. should never happen */
1702 warn ("%s:%d cxnsleep authinfo command still pending", peerName,
1703 cxn->ident) ;
1704
1705 cxnSleepOrDie (cxn) ;
1706 }
1707 else if (i != IoDone)
1708 {
1709 if (i != IoEOF)
1710 {
1711 errno = endPointErrno (e) ;
1712 syswarn ("%s:%d cxnsleep can't read response", peerName, cxn->ident);
1713 }
1714 cxnSleepOrDie (cxn) ;
1715 }
1716 else if (strchr (p, '\n') == NULL)
1717 {
1718 /* partial read */
1719 expandBuffer (b [0], BUFFER_EXPAND_AMOUNT) ;
1720
1721 buffers = makeBufferArray (bufferTakeRef (b [0]), NULL) ;
1722 if ( !prepareRead (e, buffers, getAuthPassResponse, cxn, 1) )
1723 {
1724 warn ("%s:%d cxnsleep prepare read failed", peerName, cxn->ident) ;
1725 freeBufferArray (buffers) ;
1726 cxnSleepOrDie (cxn) ;
1727 }
1728 }
1729 else
1730 {
1731 clearTimer (cxn->readBlockedTimerId) ;
1732
1733 if ( !getNntpResponse (p, &code, NULL) )
1734 {
1735 warn ("%s:%d cxnsleep response to AUTHINFO PASS: %s", peerName,
1736 cxn->ident, p) ;
1737
1738 cxnSleepOrDie (cxn) ;
1739 }
1740 else
1741 {
1742 switch (code)
1743 {
1744 case 281:
1745 notice ("%s:%d authenticated", peerName, cxn->ident) ;
1746 cxn->authenticated = true ;
1747 issueModeStream (e,cxn);
1748 break ;
1749
1750 default:
1751 warn ("%s:%d cxnsleep response to AUTHINFO PASS: %s", peerName,
1752 cxn->ident, p) ;
1753 cxnSleepOrDie (cxn) ;
1754 break ;
1755 }
1756
1757 }
1758 }
1759 }
1760
1761
1762
1763
1764
1765 /*
1766 * Process the remote's response to our MODE STREAM command. This is where
1767 * the Connection moves into the cxnFeedingS state. If the remote has given
1768 * us a good welcome banner, but then immediately dropped the connection,
1769 * we'll arrive here with the MODE STREAM command still queued up.
1770 */
getModeResponse(EndPoint e,IoStatus i,Buffer * b,void * d)1771 static void getModeResponse (EndPoint e, IoStatus i, Buffer *b, void *d)
1772 {
1773 Connection cxn = (Connection) d ;
1774 int code ;
1775 char *p = bufferBase (b[0]) ;
1776 Buffer *buffers ;
1777 const char *peerName ;
1778
1779 ASSERT (e == cxn->myEp) ;
1780 ASSERT (b [0] == cxn->respBuffer) ;
1781 ASSERT (b [1] == NULL) ; /* only ever one buffer on this read */
1782 ASSERT (cxn->state == cxnConnectingS) ;
1783 VALIDATE_CONNECTION (cxn) ;
1784
1785 peerName = hostPeerName (cxn->myHost) ;
1786
1787 bufferAddNullByte (b[0]) ;
1788
1789 d_printf (1,"%s:%d Processing mode response: %s", /* no NL */
1790 hostPeerName (cxn->myHost), cxn->ident, p) ;
1791
1792 if (i == IoDone && writeIsPending (cxn->myEp))
1793 { /* badness. should never happen */
1794 warn ("%s:%d cxnsleep mode stream command still pending", peerName,
1795 cxn->ident) ;
1796
1797 cxnSleepOrDie (cxn) ;
1798 }
1799 else if (i != IoDone)
1800 {
1801 if (i != IoEOF)
1802 {
1803 errno = endPointErrno (e) ;
1804 syswarn ("%s:%d cxnsleep can't read response", peerName, cxn->ident);
1805 }
1806 cxnSleepOrDie (cxn) ;
1807 }
1808 else if (strchr (p, '\n') == NULL)
1809 { /* partial read */
1810 expandBuffer (b [0], BUFFER_EXPAND_AMOUNT) ;
1811
1812 buffers = makeBufferArray (bufferTakeRef (b [0]), NULL) ;
1813 if ( !prepareRead (e, buffers, getModeResponse, cxn, 1) )
1814 {
1815 warn ("%s:%d cxnsleep prepare read failed", peerName, cxn->ident) ;
1816 freeBufferArray (buffers) ;
1817 cxnSleepOrDie (cxn) ;
1818 }
1819 }
1820 else
1821 {
1822 clearTimer (cxn->readBlockedTimerId) ;
1823
1824 if ( !getNntpResponse (p, &code, NULL) )
1825 {
1826 warn ("%s:%d cxnsleep response to MODE STREAM: %s", peerName,
1827 cxn->ident, p) ;
1828
1829 cxnSleepOrDie (cxn) ;
1830 }
1831 else
1832 {
1833 if (!cxn->authenticated)
1834 notice ("%s:%d connected", peerName, cxn->ident) ;
1835
1836 switch (code)
1837 {
1838 case 203: /* will do streaming */
1839 hostRemoteStreams (cxn->myHost, cxn, true) ;
1840
1841 if (hostWantsStreaming (cxn->myHost))
1842 {
1843 cxn->doesStreaming = true ;
1844 cxn->maxCheck = hostMaxChecks (cxn->myHost) ;
1845 }
1846 else
1847 cxn->maxCheck = 1 ;
1848
1849 break ;
1850
1851 default: /* won't do it */
1852 hostRemoteStreams (cxn->myHost, cxn, false) ;
1853 cxn->maxCheck = 1 ;
1854 break ;
1855 }
1856
1857 /* now we consider ourselves completly connected. */
1858 cxn->timeCon = theTime();
1859 cxn->timeCon_checkpoint = theTime();
1860
1861 if (cxn->articleQTotal == 0)
1862 cxnIdle (cxn) ;
1863 else
1864 cxn->state = cxnFeedingS ;
1865
1866 /* one for the connection and one for the buffer array */
1867 ASSERT (cxn->authenticated || bufferRefCount (cxn->respBuffer) == 2) ;
1868
1869 /* there was only one line in there, right? */
1870 bufferSetDataSize (cxn->respBuffer, 0) ;
1871 buffers = makeBufferArray (bufferTakeRef (cxn->respBuffer), NULL) ;
1872
1873 /* sleepTimeout get changed at each failed attempt, so reset. */
1874 cxn->sleepTimeout = init_reconnect_period ;
1875
1876 if ( !prepareRead (cxn->myEp, buffers, responseIsRead, cxn, 1) )
1877 {
1878 freeBufferArray (buffers) ;
1879
1880 cxnSleepOrDie (cxn) ;
1881 }
1882 else
1883 {
1884 /* now we wait for articles from our Host, or we have some
1885 articles already. On infrequently used connections, the
1886 network link is torn down and rebuilt as needed. So we may
1887 be rebuilding the connection here in which case we have an
1888 article to send. */
1889 if (writesNeeded (cxn) || hostGimmeArticle (cxn->myHost,cxn))
1890 doSomeWrites (cxn) ;
1891 }
1892 }
1893 }
1894
1895 freeBufferArray (b) ;
1896 }
1897
1898
1899
1900
1901
1902 /*
1903 * called when a response has been read from the socket. This is
1904 * where the bulk of the processing starts.
1905 */
responseIsRead(EndPoint e,IoStatus i,Buffer * b,void * d)1906 static void responseIsRead (EndPoint e, IoStatus i, Buffer *b, void *d)
1907 {
1908 Connection cxn = (Connection) d ;
1909 char *response ;
1910 char *endr ;
1911 char *bufBase ;
1912 unsigned int respSize ;
1913 int code ;
1914 char *rest = NULL ;
1915 Buffer buf ;
1916 Buffer *bArr ;
1917 const char *peerName ;
1918
1919 ASSERT (e == cxn->myEp) ;
1920 ASSERT (b != NULL) ;
1921 ASSERT (b [1] == NULL) ;
1922 ASSERT (b [0] == cxn->respBuffer) ;
1923 ASSERT (cxn->state == cxnFeedingS ||
1924 cxn->state == cxnIdleS ||
1925 cxn->state == cxnClosingS ||
1926 cxn->state == cxnFlushingS) ;
1927 VALIDATE_CONNECTION (cxn) ;
1928
1929 bufferAddNullByte (b [0]) ;
1930
1931 peerName = hostPeerName (cxn->myHost) ;
1932
1933 if (i != IoDone)
1934 { /* uh oh. */
1935 if (i != IoEOF)
1936 {
1937 errno = endPointErrno (e) ;
1938 syswarn ("%s:%d cxnsleep can't read response", peerName, cxn->ident);
1939 }
1940 freeBufferArray (b) ;
1941
1942 cxnLogStats (cxn,true) ;
1943
1944 if (cxn->state == cxnClosingS)
1945 {
1946 cxnDead (cxn) ;
1947 delConnection (cxn) ;
1948 }
1949 else
1950 cxnSleep (cxn) ;
1951
1952 return ;
1953 }
1954
1955 buf = b [0] ;
1956 bufBase = bufferBase (buf) ;
1957
1958 /* check that we have (at least) a full line response. If not expand
1959 the buffer and resubmit the read. */
1960 if (strchr (bufBase, '\n') == 0)
1961 {
1962 if (!expandBuffer (buf, BUFFER_EXPAND_AMOUNT))
1963 {
1964 warn ("%s:%d cxnsleep can't expand input buffer", peerName,
1965 cxn->ident) ;
1966 freeBufferArray (b) ;
1967
1968 cxnSleepOrDie (cxn) ;
1969 }
1970 else if ( !prepareRead (cxn->myEp, b, responseIsRead, cxn, 1))
1971 {
1972 warn ("%s:%d cxnsleep prepare read failed", peerName, cxn->ident) ;
1973 freeBufferArray (b) ;
1974
1975 cxnSleepOrDie (cxn) ;
1976 }
1977
1978 return ;
1979 }
1980
1981
1982 freeBufferArray (b) ; /* connection still has reference to buffer */
1983
1984
1985 /*
1986 * Now process all the full responses that we have.
1987 */
1988 response = bufBase ;
1989 respSize = bufferDataSize (cxn->respBuffer) ;
1990
1991 while ((endr = strchr (response, '\n')) != NULL)
1992 {
1993 char *next = endr + 1 ;
1994
1995 if (*next == '\r')
1996 next++ ;
1997
1998 endr-- ;
1999 if (*endr != '\r')
2000 endr++ ;
2001
2002 if (next - endr != 2 && !cxn->loggedNoCr)
2003 {
2004 /* only a newline there. we'll live with it */
2005 warn ("%s:%d remote not giving out CR characters", peerName,
2006 cxn->ident) ;
2007 cxn->loggedNoCr = true ;
2008 }
2009
2010 *endr = '\0' ;
2011
2012 if ( !getNntpResponse (response, &code, &rest) )
2013 {
2014 warn ("%s:%d cxnsleep response format: %s", peerName, cxn->ident,
2015 response) ;
2016 cxnSleepOrDie (cxn) ;
2017
2018 return ;
2019 }
2020
2021 d_printf (5,"%s:%d Response %d: %s\n", peerName, cxn->ident, code, response) ;
2022
2023 /* now handle the response code. I'm not using symbolic names on
2024 purpose--the numbers are all you see in the RFC's. */
2025 switch (code)
2026 {
2027 case 205: /* OK response to QUIT. */
2028 processResponse205 (cxn, response) ;
2029 break ;
2030
2031
2032
2033 /* These three are from the CHECK command */
2034 case 238: /* no such article found */
2035 /* Do not incrFilter (cxn) now, wait till after
2036 subsequent TAKETHIS */
2037 processResponse238 (cxn, response) ;
2038 break ;
2039
2040 case 431: /* try again later (also for TAKETHIS) */
2041 decrFilter (cxn) ;
2042 if (hostDropDeferred (cxn->myHost))
2043 processResponse438 (cxn, response) ;
2044 else
2045 processResponse431 (cxn, response) ;
2046 break ;
2047
2048 case 438: /* already have it */
2049 decrFilter (cxn) ;
2050 processResponse438 (cxn, response) ;
2051 break ;
2052
2053
2054
2055 /* These are from the TAKETHIS command */
2056 case 239: /* article transferred OK */
2057 incrFilter (cxn) ;
2058 processResponse239 (cxn, response) ;
2059 break ;
2060
2061 case 439: /* article rejected */
2062 decrFilter (cxn) ;
2063 processResponse439 (cxn, response) ;
2064 break ;
2065
2066
2067
2068 /* These are from the IHAVE command */
2069 case 335: /* send article */
2070 processResponse335 (cxn, response) ;
2071 break ;
2072
2073 case 435: /* article not wanted */
2074 processResponse435 (cxn, response) ;
2075 break ;
2076
2077 case 436: /* transfer failed try again later */
2078 if (cxn->takeRespHead == NULL && hostDropDeferred (cxn->myHost))
2079 processResponse435 (cxn, response) ;
2080 else
2081 processResponse436 (cxn, response) ;
2082 break ;
2083
2084 case 437: /* article rejected */
2085 processResponse437 (cxn, response) ;
2086 break ;
2087
2088 case 400: /* has stopped accepting articles */
2089 processResponse400 (cxn, response) ;
2090 break ;
2091
2092
2093
2094 case 235: /* article transfered OK (IHAVE-body) */
2095 processResponse235 (cxn, response) ;
2096 break ;
2097
2098
2099 case 480: /* Transfer permission denied. */
2100 processResponse480 (cxn,response) ;
2101 break ;
2102
2103 case 503: /* remote timeout. */
2104 processResponse503 (cxn,response) ;
2105 break ;
2106
2107 default:
2108 warn ("%s:%d cxnsleep response unknown: %d %s", peerName,
2109 cxn->ident, code, response) ;
2110 cxnSleepOrDie (cxn) ;
2111 break ;
2112 }
2113
2114 VALIDATE_CONNECTION (cxn) ;
2115
2116 if (cxn->state != cxnFeedingS && cxn->state != cxnClosingS &&
2117 cxn->state != cxnFlushingS && cxn->state != cxnIdleS /* XXX */)
2118 break ; /* connection is terminated */
2119
2120 response = next ;
2121 }
2122
2123 d_printf (5,"%s:%d done with responses\n",hostPeerName (cxn->myHost),
2124 cxn->ident) ;
2125
2126 switch (cxn->state)
2127 {
2128 case cxnIdleS:
2129 case cxnFeedingS:
2130 case cxnClosingS:
2131 case cxnFlushingS:
2132 /* see if we need to drop in to or out of no-CHECK mode */
2133 if (cxn->state == cxnFeedingS && cxn->doesStreaming)
2134 {
2135 if ((cxn->filterValue > cxn->onThreshold) && cxn->needsChecks) {
2136 cxn->needsChecks = false;
2137 hostLogNoCheckMode (cxn->myHost, true,
2138 cxn->offThreshold/cxn->lowPassFilter,
2139 cxn->filterValue/cxn->lowPassFilter,
2140 cxn->onThreshold/cxn->lowPassFilter) ;
2141 /* on and log */
2142 } else if ((cxn->filterValue < cxn->offThreshold) &&
2143 !cxn->needsChecks) {
2144 cxn->needsChecks = true;
2145 hostLogNoCheckMode (cxn->myHost, false,
2146 cxn->offThreshold/cxn->lowPassFilter,
2147 cxn->filterValue/cxn->lowPassFilter,
2148 cxn->onThreshold/cxn->lowPassFilter) ;
2149 /* off and log */
2150 }
2151 }
2152
2153 /* Now handle possible remaining partial response and set up for
2154 next read. */
2155 if (*response != '\0')
2156 { /* partial response */
2157 unsigned int leftAmt = respSize - (response - bufBase) ;
2158
2159 d_printf (2,"%s:%d handling a partial response\n",
2160 hostPeerName (cxn->myHost),cxn->ident) ;
2161
2162 /* first we shift what's left in the buffer down to the
2163 bottom, if needed, or just expand the buffer */
2164 if (response != bufBase)
2165 {
2166 /* so next read appends */
2167 memmove (bufBase, response, leftAmt) ;
2168 bufferSetDataSize (cxn->respBuffer, leftAmt) ;
2169 }
2170 else if (!expandBuffer (cxn->respBuffer, BUFFER_EXPAND_AMOUNT))
2171 die ("%s:%d cxnsleep can't expand input buffer", peerName,
2172 cxn->ident) ;
2173 }
2174 else
2175 bufferSetDataSize (cxn->respBuffer, 0) ;
2176
2177 bArr = makeBufferArray (bufferTakeRef (cxn->respBuffer), NULL) ;
2178
2179 if ( !prepareRead (e, bArr, responseIsRead, cxn, 1) )
2180 {
2181 warn ("%s:%d cxnsleep prepare read failed", peerName, cxn->ident) ;
2182 freeBufferArray (bArr) ;
2183 cxnWait (cxn) ;
2184 return ;
2185 }
2186 else
2187 {
2188 /* only setup the timer if we're still waiting for a response
2189 to something. There's not necessarily a 1-to-1 mapping
2190 between reads and writes in streaming mode. May have been
2191 set already above (that would be unlikely I think). */
2192 VALIDATE_CONNECTION (cxn) ;
2193
2194 d_printf (5,"%s:%d about to do some writes\n",
2195 hostPeerName (cxn->myHost),cxn->ident) ;
2196
2197 doSomeWrites (cxn) ;
2198
2199 /* If the read timer is (still) running, update it to give
2200 those terminally slow hosts that take forever to drain
2201 the network buffers and just dribble out responses the
2202 benefit of the doubt. XXX - maybe should just increase
2203 timeout for these! */
2204 if (cxn->readBlockedTimerId)
2205 cxn->readBlockedTimerId = updateSleep (cxn->readBlockedTimerId,
2206 responseTimeoutCbk,
2207 cxn->readTimeout,
2208 cxn) ;
2209 }
2210 VALIDATE_CONNECTION (cxn) ;
2211 break ;
2212
2213 case cxnWaitingS: /* presumably after a code 205 or 400 */
2214 case cxnConnectingS: /* presumably after a code 205 or 400 */
2215 case cxnSleepingS: /* probably after a 480 */
2216 break ;
2217
2218 case cxnDeadS:
2219 delConnection (cxn) ;
2220 break ;
2221
2222 case cxnStartingS:
2223 default:
2224 die ("Bad connection state: %s\n",stateToString (cxn->state)) ;
2225 }
2226 }
2227
2228
2229
2230
2231
2232 /*
2233 * called when the write of the QUIT command has completed.
2234 */
quitWritten(EndPoint e,IoStatus i,Buffer * b,void * d)2235 static void quitWritten (EndPoint e, IoStatus i, Buffer *b, void *d)
2236 {
2237 Connection cxn = (Connection) d ;
2238 const char *peerName ;
2239
2240 peerName = hostPeerName (cxn->myHost) ;
2241
2242 clearTimer (cxn->writeBlockedTimerId) ;
2243
2244 ASSERT (cxn->myEp == e) ;
2245 VALIDATE_CONNECTION (cxn) ;
2246
2247 if (i != IoDone)
2248 {
2249 errno = endPointErrno (e) ;
2250 syswarn ("%s:%d cxnsleep can't write QUIT", peerName, cxn->ident) ;
2251 if (cxn->state == cxnClosingS)
2252 {
2253 cxnDead (cxn) ;
2254 delConnection (cxn) ;
2255 }
2256 else
2257 cxnWait (cxn) ;
2258 }
2259 else
2260 /* The QUIT command has been sent, so start the response timer. */
2261 initReadBlockedTimeout (cxn) ;
2262
2263 freeBufferArray (b) ;
2264 }
2265
2266
2267
2268
2269
2270 /*
2271 * called when the write of the IHAVE-body data is finished
2272 */
ihaveBodyDone(EndPoint e,IoStatus i,Buffer * b,void * d)2273 static void ihaveBodyDone (EndPoint e, IoStatus i, Buffer *b, void *d)
2274 {
2275 Connection cxn = (Connection) d ;
2276
2277 ASSERT (e == cxn->myEp) ;
2278
2279 clearTimer (cxn->writeBlockedTimerId) ;
2280
2281 if (i != IoDone)
2282 {
2283 errno = endPointErrno (e) ;
2284 syswarn ("%s:%d cxnsleep can't write IHAVE body",
2285 hostPeerName (cxn->myHost), cxn->ident) ;
2286
2287 cxnLogStats (cxn,true) ;
2288
2289 if (cxn->state == cxnClosingS)
2290 {
2291 cxnDead (cxn) ;
2292 delConnection (cxn) ;
2293 }
2294 else
2295 cxnSleep (cxn) ;
2296 }
2297 else
2298 {
2299 /* Some hosts return a response even before we're done sending, so don't
2300 go idle until here. */
2301 if (cxn->state == cxnFeedingS && cxn->articleQTotal == 0)
2302 cxnIdle (cxn) ;
2303 else
2304 /* The command set has been sent, so start the response timer. */
2305 initReadBlockedTimeout (cxn) ;
2306 }
2307
2308 freeBufferArray (b) ;
2309
2310 return ;
2311 }
2312
2313
2314
2315
2316
2317 /*
2318 * Called when a command set (IHAVE, CHECK, TAKETHIS) has been
2319 * written to the remote.
2320 */
commandWriteDone(EndPoint e,IoStatus i,Buffer * b,void * d)2321 static void commandWriteDone (EndPoint e, IoStatus i, Buffer *b, void *d)
2322 {
2323 Connection cxn = (Connection) d ;
2324 const char *peerName ;
2325
2326 ASSERT (e == cxn->myEp) ;
2327
2328 peerName = hostPeerName (cxn->myHost) ;
2329
2330 freeBufferArray (b) ;
2331
2332 clearTimer (cxn->writeBlockedTimerId) ;
2333
2334 if (i != IoDone)
2335 {
2336 errno = endPointErrno (e) ;
2337 syswarn ("%s:%d cxnsleep can't write command", peerName, cxn->ident) ;
2338
2339 cxnLogStats (cxn,true) ;
2340
2341 if (cxn->state == cxnClosingS)
2342 {
2343 cxnDead (cxn) ;
2344 delConnection (cxn) ;
2345 }
2346 else
2347 {
2348 /* XXX - so cxnSleep() doesn't die in VALIDATE_CONNECTION () */
2349 deferAllArticles (cxn) ;
2350 cxnIdle (cxn) ;
2351
2352 cxnSleep (cxn) ;
2353 }
2354 }
2355 else
2356 {
2357 /* Some hosts return a response even before we're done sending, so don't
2358 go idle until here */
2359 if (cxn->state == cxnFeedingS && cxn->articleQTotal == 0)
2360 cxnIdle (cxn) ;
2361 else
2362 /* The command set has been sent, so start the response timer.
2363 XXX - we'd like finer grained control */
2364 initReadBlockedTimeout (cxn) ;
2365
2366 if ( cxn->doesStreaming )
2367 doSomeWrites (cxn) ; /* pump data as fast as possible */
2368 /* XXX - will clear the read timeout */
2369 }
2370 }
2371
2372
2373
2374
2375
2376 /*
2377 * Called when the MODE STREAM command has been written down the pipe.
2378 */
modeCmdIssued(EndPoint e,IoStatus i,Buffer * b,void * d)2379 static void modeCmdIssued (EndPoint e, IoStatus i, Buffer *b, void *d)
2380 {
2381 Connection cxn = (Connection) d ;
2382
2383 ASSERT (e == cxn->myEp) ;
2384
2385 clearTimer (cxn->writeBlockedTimerId) ;
2386
2387 /* The mode command has been sent, so start the response timer */
2388 initReadBlockedTimeout (cxn) ;
2389
2390 if (i != IoDone)
2391 {
2392 d_printf (1,"%s:%d MODE STREAM command failed to write\n",
2393 hostPeerName (cxn->myHost), cxn->ident) ;
2394
2395 syswarn ("%s:%d cxnsleep can't write MODE STREAM",
2396 hostPeerName (cxn->myHost), cxn->ident) ;
2397
2398 cxnSleepOrDie (cxn) ;
2399 }
2400
2401 freeBufferArray (b) ;
2402 }
2403
2404
2405
2406
2407
2408 /*
2409 * Called when the AUTHINFO USER command has been written down the pipe.
2410 */
authUserIssued(EndPoint e,IoStatus i,Buffer * b,void * d)2411 static void authUserIssued (EndPoint e, IoStatus i, Buffer *b, void *d)
2412 {
2413 Connection cxn = (Connection) d ;
2414
2415 ASSERT (e == cxn->myEp) ;
2416
2417 clearTimer (cxn->writeBlockedTimerId) ;
2418
2419 /* The authinfo user command has been sent, so start the response timer */
2420 initReadBlockedTimeout (cxn) ;
2421
2422 if (i != IoDone)
2423 {
2424 d_printf (1,"%s:%d AUTHINFO USER command failed to write\n",
2425 hostPeerName (cxn->myHost), cxn->ident) ;
2426
2427 syswarn ("%s:%d cxnsleep can't write AUTHINFO USER",
2428 hostPeerName (cxn->myHost), cxn->ident) ;
2429
2430 cxnSleepOrDie (cxn) ;
2431 }
2432
2433 freeBufferArray (b) ;
2434 }
2435
2436
2437
2438
2439
2440
2441 /*
2442 * Called when the AUTHINFO USER command has been written down the pipe.
2443 */
authPassIssued(EndPoint e,IoStatus i,Buffer * b,void * d)2444 static void authPassIssued (EndPoint e, IoStatus i, Buffer *b, void *d)
2445 {
2446 Connection cxn = (Connection) d ;
2447
2448 ASSERT (e == cxn->myEp) ;
2449
2450 clearTimer (cxn->writeBlockedTimerId) ;
2451
2452 /* The authinfo pass command has been sent, so start the response timer */
2453 initReadBlockedTimeout (cxn) ;
2454
2455 if (i != IoDone)
2456 {
2457 d_printf (1,"%s:%d AUTHINFO PASS command failed to write\n",
2458 hostPeerName (cxn->myHost), cxn->ident) ;
2459
2460 syswarn ("%s:%d cxnsleep can't write AUTHINFO PASS",
2461 hostPeerName (cxn->myHost), cxn->ident) ;
2462
2463 cxnSleepOrDie (cxn) ;
2464 }
2465
2466 freeBufferArray (b) ;
2467 }
2468
2469
2470
2471
2472
2473
2474 /*
2475 * Called whenever some amount of data has been written to the pipe but
2476 * more data remains to be written
2477 */
writeProgress(EndPoint e UNUSED,IoStatus i,Buffer * b UNUSED,void * d)2478 static void writeProgress (EndPoint e UNUSED, IoStatus i, Buffer *b UNUSED,
2479 void *d)
2480 {
2481 Connection cxn = (Connection) d ;
2482
2483 ASSERT (i == IoProgress) ;
2484
2485 if (cxn->writeTimeout > 0)
2486 cxn->writeBlockedTimerId = updateSleep (cxn->writeBlockedTimerId,
2487 writeTimeoutCbk, cxn->writeTimeout,
2488 cxn) ;
2489 }
2490
2491
2492
2493
2494
2495 /*
2496 * Timers.
2497 */
2498
2499 /*
2500 * This is called when the timeout for the response from the remote
2501 * goes off. We tear down the connection and notify our host.
2502 */
responseTimeoutCbk(TimeoutId id,void * data)2503 static void responseTimeoutCbk (TimeoutId id, void *data)
2504 {
2505 Connection cxn = (Connection) data ;
2506 const char *peerName ;
2507
2508 ASSERT (id == cxn->readBlockedTimerId) ;
2509 ASSERT (cxn->state == cxnConnectingS ||
2510 cxn->state == cxnFeedingS ||
2511 cxn->state == cxnFlushingS ||
2512 cxn->state == cxnClosingS) ;
2513 VALIDATE_CONNECTION (cxn) ;
2514
2515 /* XXX - let abortConnection clear readBlockedTimerId, otherwise
2516 VALIDATE_CONNECTION() will croak */
2517
2518 peerName = hostPeerName (cxn->myHost) ;
2519
2520 warn ("%s:%d cxnsleep non-responsive connection", peerName, cxn->ident) ;
2521 d_printf (1,"%s:%d shutting down non-responsive connection\n",
2522 hostPeerName (cxn->myHost), cxn->ident) ;
2523
2524 cxnLogStats (cxn,true) ;
2525
2526 if (cxn->state == cxnClosingS)
2527 {
2528 abortConnection (cxn) ;
2529 delConnection (cxn) ;
2530 }
2531 else
2532 cxnSleep (cxn) ; /* will notify the Host */
2533 }
2534
2535
2536
2537
2538
2539 /*
2540 * This is called when the data write timeout for the remote
2541 * goes off. We tear down the connection and notify our host.
2542 */
writeTimeoutCbk(TimeoutId id,void * data)2543 static void writeTimeoutCbk (TimeoutId id, void *data)
2544 {
2545 Connection cxn = (Connection) data ;
2546 const char *peerName ;
2547
2548 ASSERT (id == cxn->writeBlockedTimerId) ;
2549 ASSERT (cxn->state == cxnConnectingS ||
2550 cxn->state == cxnFeedingS ||
2551 cxn->state == cxnFlushingS ||
2552 cxn->state == cxnClosingS) ;
2553 VALIDATE_CONNECTION (cxn) ;
2554
2555 /* XXX - let abortConnection clear writeBlockedTimerId, otherwise
2556 VALIDATE_CONNECTION() will croak */
2557
2558 peerName = hostPeerName (cxn->myHost) ;
2559
2560 warn ("%s:%d cxnsleep write timeout", peerName, cxn->ident) ;
2561 d_printf (1,"%s:%d shutting down non-responsive connection\n",
2562 hostPeerName (cxn->myHost), cxn->ident) ;
2563
2564 cxnLogStats (cxn,true) ;
2565
2566 if (cxn->state == cxnClosingS)
2567 {
2568 abortConnection (cxn) ;
2569 delConnection (cxn) ;
2570 }
2571 else
2572 cxnSleep (cxn) ; /* will notify the Host */
2573 }
2574
2575
2576
2577
2578
2579 /*
2580 * Called by the EndPoint class when the timer goes off
2581 */
reopenTimeoutCbk(TimeoutId id,void * data)2582 static void reopenTimeoutCbk (TimeoutId id, void *data)
2583 {
2584 Connection cxn = (Connection) data ;
2585
2586 ASSERT (id == cxn->sleepTimerId) ;
2587
2588 cxn->sleepTimerId = 0 ;
2589
2590 if (cxn->state != cxnSleepingS)
2591 {
2592 warn ("%s:%d cxnsleep connection in bad state: %s",
2593 hostPeerName (cxn->myHost), cxn->ident,
2594 stateToString (cxn->state)) ;
2595 cxnSleepOrDie (cxn) ;
2596 }
2597 else
2598 cxnConnect (cxn) ;
2599 }
2600
2601
2602
2603
2604
2605 /*
2606 * timeout callback to close down long running connection.
2607 */
flushCxnCbk(TimeoutId id,void * data)2608 static void flushCxnCbk (TimeoutId id, void *data)
2609 {
2610 Connection cxn = (Connection) data ;
2611
2612 ASSERT (id == cxn->flushTimerId) ;
2613 VALIDATE_CONNECTION (cxn) ;
2614
2615 cxn->flushTimerId = 0 ;
2616
2617 if (!(cxn->state == cxnFeedingS || cxn->state == cxnConnectingS ||
2618 cxn->state == cxnIdleS))
2619 {
2620 warn ("%s:%d cxnsleep connection in bad state: %s",
2621 hostPeerName (cxn->myHost), cxn->ident,
2622 stateToString (cxn->state)) ;
2623 cxnSleepOrDie (cxn) ;
2624 }
2625 else
2626 {
2627 d_printf (1,"%s:%d Handling periodic connection close.\n",
2628 hostPeerName (cxn->myHost), cxn->ident) ;
2629
2630 notice ("%s:%d periodic close", hostPeerName (cxn->myHost), cxn->ident) ;
2631
2632 cxnFlush (cxn) ;
2633 }
2634 }
2635
2636
2637
2638
2639
2640 /*
2641 * Timer callback for when the connection has not received an
2642 * article from INN. When that happens we tear down the network
2643 * connection to help recycle fds
2644 */
articleTimeoutCbk(TimeoutId id,void * data)2645 static void articleTimeoutCbk (TimeoutId id, void *data)
2646 {
2647 Connection cxn = (Connection) data ;
2648 const char *peerName = hostPeerName (cxn->myHost) ;
2649
2650 ASSERT (cxn->artReceiptTimerId == id) ;
2651 VALIDATE_CONNECTION (cxn) ;
2652
2653 cxn->artReceiptTimerId = 0 ;
2654
2655 if (cxn->state != cxnIdleS)
2656 {
2657 warn ("%s:%d cxnsleep connection in bad state: %s",
2658 hostPeerName (cxn->myHost), cxn->ident,
2659 stateToString (cxn->state)) ;
2660 cxnSleepOrDie (cxn) ;
2661
2662 return ;
2663 }
2664
2665 /* it's doubtful (right?) that this timer could go off and there'd
2666 still be articles in the queue. */
2667 if (cxn->articleQTotal > 0)
2668 {
2669 warn ("%s:%d idle connection still has articles", peerName, cxn->ident) ;
2670 }
2671 else
2672 {
2673 notice ("%s:%d idle tearing down connection", peerName, cxn->ident) ;
2674 cxn->state = cxnIdleTimeoutS ;
2675 cxnFlush (cxn) ;
2676 }
2677 }
2678
2679
2680
2681
2682
2683 /*
2684 * function to be called when the fd is not ready for reading, but there is
2685 * an article on tape or in the queue to be done. Things are done this way
2686 * so that a Connection doesn't hog time trying to find the next good
2687 * article for writing. With a large backlog of expired articles that would
2688 * take a long time. Instead the Connection just tries its next article on
2689 * tape or queue, and if that's no good then it registers this callback so
2690 * that other Connections have a chance of being serviced.
2691 *
2692 * This function is also put on the callback queue if we called doSomeWrites
2693 * but there was already a write pending, mostly so that we don't deadlock the
2694 * connection when we got a response to the body of an IHAVE command before we
2695 * finished sending the body.
2696 */
cxnWorkProc(EndPoint ep UNUSED,void * data)2697 static void cxnWorkProc (EndPoint ep UNUSED, void *data)
2698 {
2699 Connection cxn = (Connection) data ;
2700
2701 d_printf (2,"%s:%d calling work proc\n",
2702 hostPeerName (cxn->myHost),cxn->ident) ;
2703
2704 if (writesNeeded (cxn))
2705 doSomeWrites (cxn) ; /* may re-register the work proc... */
2706 else if (cxn->state == cxnFlushingS || cxn->state == cxnClosingS)
2707 {
2708 if (cxn->articleQTotal == 0)
2709 issueQUIT (cxn) ;
2710 }
2711 else
2712 d_printf (2,"%s:%d no writes were needed...\n",
2713 hostPeerName (cxn->myHost), cxn->ident) ;
2714 }
2715
2716
2717
2718 /****************************************************************************
2719 *
2720 * END EndPoint callback area.
2721 *
2722 ****************************************************************************/
2723
2724
2725
2726
2727
2728 /****************************************************************************
2729 *
2730 * RESPONSE CODE PROCESSING.
2731 *
2732 ***************************************************************************/
2733
2734
2735 /*
2736 * A connection needs to sleep, but if it's closing it needs to die instead.
2737 */
cxnSleepOrDie(Connection cxn)2738 static void cxnSleepOrDie (Connection cxn)
2739 {
2740 if (cxn->state == cxnClosingS)
2741 cxnDead (cxn) ;
2742 else
2743 cxnSleep (cxn) ;
2744 }
2745
2746
2747 /*
2748 * Handle the response 205 to our QUIT command, which means the
2749 * remote is going away and we can happily cleanup
2750 */
processResponse205(Connection cxn,char * response UNUSED)2751 static void processResponse205 (Connection cxn, char *response UNUSED)
2752 {
2753 bool immedRecon ;
2754
2755 VALIDATE_CONNECTION (cxn) ;
2756
2757 if (!(cxn->state == cxnFeedingS ||
2758 cxn->state == cxnIdleS ||
2759 cxn->state == cxnFlushingS ||
2760 cxn->state == cxnClosingS))
2761 {
2762 warn ("%s:%d cxnsleep connection in bad state: %s",
2763 hostPeerName (cxn->myHost), cxn->ident,
2764 stateToString (cxn->state)) ;
2765 cxnSleepOrDie (cxn) ;
2766 return ;
2767 }
2768
2769 switch (cxn->state)
2770 {
2771 case cxnFlushingS:
2772 case cxnClosingS:
2773 ASSERT (cxn->articleQTotal == 0) ;
2774
2775 cxnLogStats (cxn,true) ;
2776
2777 immedRecon = cxn->immedRecon ;
2778
2779 hostCxnDead (cxn->myHost,cxn) ;
2780
2781 if (cxn->state == cxnFlushingS && immedRecon)
2782 {
2783 abortConnection (cxn) ;
2784 if (!cxnConnect (cxn))
2785 notice ("%s:%d flush re-connect failed",
2786 hostPeerName (cxn->myHost), cxn->ident) ;
2787 }
2788 else if (cxn->state == cxnFlushingS)
2789 cxnWait (cxn) ;
2790 else
2791 cxnDead (cxn) ;
2792 break ;
2793
2794 case cxnIdleS:
2795 case cxnFeedingS:
2796 /* this shouldn't ever happen... */
2797 warn ("%s:%d cxnsleep response unexpected: %d",
2798 hostPeerName (cxn->myHost), cxn->ident, 205) ;
2799 cxnSleepOrDie (cxn) ;
2800 break ;
2801
2802 default:
2803 die ("Bad connection state: %s\n",stateToString (cxn->state)) ;
2804 }
2805 }
2806
2807
2808
2809
2810
2811 /*
2812 * Handle a response code of 238 which is the "no such article"
2813 * reply to the CHECK command (i.e. remote wants it).
2814 */
processResponse238(Connection cxn,char * response)2815 static void processResponse238 (Connection cxn, char *response)
2816 {
2817 char *msgid ;
2818 ArtHolder artHolder ;
2819
2820 if (!cxn->doesStreaming)
2821 {
2822 warn ("%s:%d cxnsleep unexpected streaming response for non-streaming"
2823 " connection: %s", hostPeerName (cxn->myHost), cxn->ident,
2824 response) ;
2825 cxnSleepOrDie (cxn) ;
2826 return ;
2827 }
2828
2829 if (!(cxn->state == cxnFlushingS ||
2830 cxn->state == cxnFeedingS ||
2831 cxn->state == cxnClosingS))
2832 {
2833 warn ("%s:%d cxnsleep connection in bad state: %s",
2834 hostPeerName (cxn->myHost), cxn->ident,
2835 stateToString (cxn->state)) ;
2836 cxnSleepOrDie (cxn) ;
2837 return ;
2838 }
2839
2840 VALIDATE_CONNECTION (cxn) ;
2841
2842 msgid = getMsgId (response) ;
2843
2844 if (cxn->checkRespHead == NULL) /* peer is confused */
2845 {
2846 warn ("%s:%d cxnsleep response unexpected: %d",
2847 hostPeerName (cxn->myHost),cxn->ident,238) ;
2848 cxnSleepOrDie (cxn) ;
2849 }
2850 else if (msgid == NULL || strlen (msgid) == 0 ||
2851 (artHolder = artHolderByMsgId (msgid, cxn->checkRespHead)) == NULL)
2852 noSuchMessageId (cxn,238,msgid,response) ;
2853 else
2854 {
2855 /* now remove the article from the check queue and move it onto the
2856 transmit queue. Another function wil take care of transmitting */
2857 remArtHolder (artHolder, &cxn->checkRespHead, &cxn->articleQTotal) ;
2858 if (cxn->state != cxnClosingS)
2859 appendArtHolder (artHolder, &cxn->takeHead, &cxn->articleQTotal) ;
2860 else
2861 {
2862 hostTakeBackArticle (cxn->myHost, cxn, artHolder->article) ;
2863 delArtHolder (artHolder) ;
2864 }
2865 }
2866
2867 if (msgid != NULL)
2868 free (msgid) ;
2869 }
2870
2871
2872
2873
2874
2875 /*
2876 * process the response "try again later" to the CHECK command If this
2877 * returns true then the connection is still usable.
2878 */
processResponse431(Connection cxn,char * response)2879 static void processResponse431 (Connection cxn, char *response)
2880 {
2881 char *msgid ;
2882 ArtHolder artHolder ;
2883
2884 if (!cxn->doesStreaming)
2885 {
2886 warn ("%s:%d cxnsleep unexpected streaming response for non-streaming"
2887 " connection: %s", hostPeerName (cxn->myHost), cxn->ident,
2888 response) ;
2889 cxnSleepOrDie (cxn) ;
2890 return ;
2891 }
2892
2893 if (!(cxn->state == cxnFlushingS ||
2894 cxn->state == cxnFeedingS ||
2895 cxn->state == cxnClosingS))
2896 {
2897 warn ("%s:%d cxnsleep connection in bad state: %s",
2898 hostPeerName (cxn->myHost), cxn->ident,
2899 stateToString (cxn->state)) ;
2900 cxnSleepOrDie (cxn) ;
2901 return ;
2902 }
2903
2904 VALIDATE_CONNECTION (cxn) ;
2905
2906 msgid = getMsgId (response) ;
2907
2908 if (cxn->checkRespHead == NULL) /* peer is confused */
2909 {
2910 warn ("%s:%d cxnsleep response unexpected: %d",
2911 hostPeerName (cxn->myHost),cxn->ident,431) ;
2912 cxnSleepOrDie (cxn) ;
2913 }
2914 else if (msgid == NULL || strlen (msgid) == 0 ||
2915 (artHolder = artHolderByMsgId (msgid, cxn->checkRespHead)) == NULL)
2916 noSuchMessageId (cxn,431,msgid,response) ;
2917 else
2918 {
2919 remArtHolder (artHolder, &cxn->checkRespHead, &cxn->articleQTotal) ;
2920 if (cxn->articleQTotal == 0 && !writeIsPending (cxn->myEp))
2921 cxnIdle (cxn) ;
2922 hostArticleDeferred (cxn->myHost, cxn, artHolder->article) ;
2923 delArtHolder (artHolder) ;
2924 }
2925
2926 if (msgid != NULL)
2927 free (msgid) ;
2928 }
2929
2930
2931
2932
2933
2934 /*
2935 * process the "already have it" response to the CHECK command. If this
2936 * returns true then the connection is still usable.
2937 */
processResponse438(Connection cxn,char * response)2938 static void processResponse438 (Connection cxn, char *response)
2939 {
2940 char *msgid ;
2941 ArtHolder artHolder ;
2942
2943 if (!cxn->doesStreaming)
2944 {
2945 warn ("%s:%d cxnsleep unexpected streaming response for non-streaming"
2946 " connection: %s", hostPeerName (cxn->myHost), cxn->ident,
2947 response) ;
2948 cxnSleepOrDie (cxn) ;
2949 return ;
2950 }
2951
2952 if (!(cxn->state == cxnFlushingS ||
2953 cxn->state == cxnFeedingS ||
2954 cxn->state == cxnClosingS))
2955 {
2956 warn ("%s:%d cxnsleep connection in bad state: %s",
2957 hostPeerName (cxn->myHost), cxn->ident,
2958 stateToString (cxn->state)) ;
2959 cxnSleepOrDie (cxn) ;
2960 return ;
2961 }
2962
2963 VALIDATE_CONNECTION (cxn) ;
2964
2965 msgid = getMsgId (response) ;
2966
2967 if (cxn->checkRespHead == NULL) /* peer is confused */
2968 {
2969 warn ("%s:%d cxnsleep response unexpected: %d",
2970 hostPeerName (cxn->myHost),cxn->ident,438) ;
2971 cxnSleepOrDie (cxn) ;
2972 }
2973 else if (msgid == NULL || strlen (msgid) == 0 ||
2974 (artHolder = artHolderByMsgId (msgid, cxn->checkRespHead)) == NULL)
2975 noSuchMessageId (cxn,438,msgid,response) ;
2976 else
2977 {
2978 cxn->checksRefused++ ;
2979
2980 remArtHolder (artHolder, &cxn->checkRespHead, &cxn->articleQTotal) ;
2981 if (cxn->articleQTotal == 0 && !writeIsPending (cxn->myEp))
2982 cxnIdle (cxn) ;
2983 hostArticleNotWanted (cxn->myHost, cxn, artHolder->article);
2984 delArtHolder (artHolder) ;
2985 }
2986
2987 if (msgid != NULL)
2988 free (msgid) ;
2989 }
2990
2991
2992
2993
2994
2995 /*
2996 * process the "article transferred ok" response to the TAKETHIS.
2997 */
processResponse239(Connection cxn,char * response)2998 static void processResponse239 (Connection cxn, char *response)
2999 {
3000 char *msgid ;
3001 ArtHolder artHolder ;
3002
3003 if (!cxn->doesStreaming)
3004 {
3005 warn ("%s:%d cxnsleep unexpected streaming response for non-streaming"
3006 " connection: %s", hostPeerName (cxn->myHost), cxn->ident,
3007 response) ;
3008 cxnSleepOrDie (cxn) ;
3009 return ;
3010 }
3011
3012 if (!(cxn->state == cxnFlushingS ||
3013 cxn->state == cxnFeedingS ||
3014 cxn->state == cxnClosingS))
3015 {
3016 warn ("%s:%d cxnsleep connection in bad state: %s",
3017 hostPeerName (cxn->myHost), cxn->ident,
3018 stateToString (cxn->state)) ;
3019 cxnSleepOrDie (cxn) ;
3020 return ;
3021 }
3022
3023 VALIDATE_CONNECTION (cxn) ;
3024
3025 msgid = getMsgId (response) ;
3026
3027 if (cxn->takeRespHead == NULL) /* peer is confused */
3028 {
3029 warn ("%s:%d cxnsleep response unexpected: %d",
3030 hostPeerName (cxn->myHost),cxn->ident,239) ;
3031 cxnSleepOrDie (cxn) ;
3032 }
3033 else if (msgid == NULL || strlen (msgid) == 0 ||
3034 (artHolder = artHolderByMsgId (msgid, cxn->takeRespHead)) == NULL)
3035 noSuchMessageId (cxn,239,msgid,response) ;
3036 else
3037 {
3038 cxn->takesOkayed++ ;
3039 cxn->takesSizeOkayed += artSize(artHolder->article);
3040
3041 remArtHolder (artHolder, &cxn->takeRespHead, &cxn->articleQTotal) ;
3042 if (cxn->articleQTotal == 0 && !writeIsPending (cxn->myEp))
3043 cxnIdle (cxn) ;
3044 hostArticleAccepted (cxn->myHost, cxn, artHolder->article) ;
3045 delArtHolder (artHolder) ;
3046 }
3047
3048 if (msgid != NULL)
3049 free (msgid) ;
3050 }
3051
3052
3053
3054 /*
3055 * Set the thresholds for no-CHECK mode; negative means leave existing value
3056 */
3057
cxnSetCheckThresholds(Connection cxn,double lowFilter,double highFilter,double lowPassFilter)3058 void cxnSetCheckThresholds (Connection cxn,
3059 double lowFilter, double highFilter,
3060 double lowPassFilter)
3061 {
3062 /* Adjust current value for new scaling */
3063 if (cxn->lowPassFilter > 0.0)
3064 cxn->filterValue = cxn->filterValue / cxn->lowPassFilter * lowPassFilter;
3065
3066 /* Stick in new values */
3067 if (highFilter >= 0)
3068 cxn->onThreshold = highFilter * lowPassFilter / 100.0;
3069 if (lowFilter >= 0)
3070 cxn->offThreshold = lowFilter * lowPassFilter / 100.0;
3071 cxn->lowPassFilter = lowPassFilter;
3072 }
3073
3074
3075 /*
3076 * Blow away the connection gracelessly and immedately clean up
3077 */
cxnNuke(Connection cxn)3078 void cxnNuke (Connection cxn)
3079 {
3080 abortConnection (cxn) ;
3081 hostCxnDead (cxn->myHost,cxn) ;
3082 delConnection(cxn) ;
3083 }
3084
3085
3086 /*
3087 * process a "article rejected do not try again" response to the
3088 * TAKETHIS.
3089 */
processResponse439(Connection cxn,char * response)3090 static void processResponse439 (Connection cxn, char *response)
3091 {
3092 char *msgid ;
3093 ArtHolder artHolder ;
3094
3095 if (!cxn->doesStreaming)
3096 {
3097 warn ("%s:%d cxnsleep unexpected streaming response for non-streaming"
3098 " connection: %s", hostPeerName (cxn->myHost), cxn->ident,
3099 response) ;
3100 cxnSleepOrDie (cxn) ;
3101 return ;
3102 }
3103
3104 if (!(cxn->state == cxnFlushingS ||
3105 cxn->state == cxnFeedingS ||
3106 cxn->state == cxnClosingS))
3107 {
3108 warn ("%s:%d cxnsleep connection in bad state: %s",
3109 hostPeerName (cxn->myHost), cxn->ident,
3110 stateToString (cxn->state)) ;
3111 cxnSleepOrDie (cxn) ;
3112 return ;
3113 }
3114
3115 VALIDATE_CONNECTION (cxn) ;
3116
3117 msgid = getMsgId (response) ;
3118
3119 if (cxn->takeRespHead == NULL) /* peer is confused */
3120 {
3121 /* NNTPRelay return 439 for check <messid> if messid is bad */
3122 if (cxn->checkRespHead == NULL) /* peer is confused */
3123 {
3124 warn ("%s:%d cxnsleep response unexpected: %d",
3125 hostPeerName (cxn->myHost),cxn->ident,439) ;
3126 cxnSleepOrDie (cxn) ;
3127 }
3128 else
3129 {
3130 if ((artHolder = artHolderByMsgId (msgid, cxn->checkRespHead)) == NULL)
3131 noSuchMessageId (cxn,439,msgid,response) ;
3132 else
3133 {
3134 cxn->checksRefused++ ;
3135 remArtHolder (artHolder, &cxn->checkRespHead, &cxn->articleQTotal) ;
3136 if (cxn->articleQTotal == 0 && !writeIsPending (cxn->myEp))
3137 cxnIdle (cxn) ;
3138 hostArticleNotWanted (cxn->myHost, cxn, artHolder->article);
3139 delArtHolder (artHolder) ;
3140 }
3141 }
3142 }
3143 else if (msgid == NULL || strlen (msgid) == 0 ||
3144 (artHolder = artHolderByMsgId (msgid, cxn->takeRespHead)) == NULL)
3145 noSuchMessageId (cxn,439,msgid,response) ;
3146 else
3147 {
3148 cxn->takesRejected++ ;
3149 cxn->takesSizeRejected += artSize(artHolder->article);
3150
3151 remArtHolder (artHolder, &cxn->takeRespHead, &cxn->articleQTotal) ;
3152 /* Some hosts return the 439 response even before we're done sending */
3153 if (cxn->articleQTotal == 0 && !writeIsPending(cxn->myEp))
3154 cxnIdle (cxn) ;
3155 hostArticleRejected (cxn->myHost, cxn, artHolder->article) ;
3156 delArtHolder (artHolder) ;
3157 }
3158
3159 if (msgid != NULL)
3160 free (msgid) ;
3161 }
3162
3163
3164
3165
3166
3167
3168 /*
3169 * process the "article transferred ok" response to the IHAVE-body.
3170 */
processResponse235(Connection cxn,char * response UNUSED)3171 static void processResponse235 (Connection cxn, char *response UNUSED)
3172 {
3173 ArtHolder artHolder ;
3174
3175 if (cxn->doesStreaming)
3176 {
3177 warn ("%s:%d cxnsleep unexpected non-streaming response for"
3178 " streaming connection: %s", hostPeerName (cxn->myHost),
3179 cxn->ident,response) ;
3180 cxnSleepOrDie (cxn) ;
3181 return ;
3182 }
3183
3184 if (!(cxn->state == cxnFlushingS ||
3185 cxn->state == cxnFeedingS ||
3186 cxn->state == cxnClosingS))
3187 {
3188 warn ("%s:%d cxnsleep connection in bad state: %s",
3189 hostPeerName (cxn->myHost), cxn->ident,
3190 stateToString (cxn->state)) ;
3191 cxnSleepOrDie (cxn) ;
3192 return ;
3193 }
3194
3195 ASSERT (cxn->articleQTotal == 1) ;
3196 ASSERT (cxn->takeRespHead != NULL) ;
3197 VALIDATE_CONNECTION (cxn) ;
3198
3199 if (cxn->takeRespHead == NULL) /* peer is confused */
3200 {
3201 warn ("%s:%d cxnsleep response unexpected: %d",
3202 hostPeerName (cxn->myHost),cxn->ident,235) ;
3203 cxnSleepOrDie (cxn) ;
3204 }
3205 else
3206 {
3207 /* now remove the article from the queue and tell the Host to forget
3208 about it. */
3209 artHolder = cxn->takeRespHead ;
3210
3211 cxn->takeRespHead = NULL ;
3212 cxn->articleQTotal = 0 ;
3213 cxn->takesOkayed++ ;
3214 cxn->takesSizeOkayed += artSize(artHolder->article);
3215
3216 if (cxn->articleQTotal == 0 && !writeIsPending (cxn->myEp))
3217 cxnIdle (cxn) ;
3218
3219 hostArticleAccepted (cxn->myHost, cxn, artHolder->article) ;
3220 delArtHolder (artHolder) ;
3221 }
3222 }
3223
3224
3225
3226
3227
3228 /*
3229 * process the "send article to be transfered" response to the IHAVE.
3230 */
processResponse335(Connection cxn,char * response UNUSED)3231 static void processResponse335 (Connection cxn, char *response UNUSED)
3232 {
3233 if (cxn->doesStreaming)
3234 {
3235 warn ("%s:%d cxnsleep unexpected non-streaming response for"
3236 " streaming connection: %s", hostPeerName (cxn->myHost),
3237 cxn->ident,response) ;
3238 cxnSleepOrDie (cxn) ;
3239 return ;
3240 }
3241
3242 if (!(cxn->state == cxnFlushingS ||
3243 cxn->state == cxnFeedingS ||
3244 cxn->state == cxnClosingS))
3245 {
3246 warn ("%s:%d cxnsleep connection in bad state: %s",
3247 hostPeerName (cxn->myHost), cxn->ident,
3248 stateToString (cxn->state)) ;
3249 cxnSleepOrDie (cxn) ;
3250 return ;
3251 }
3252
3253 if (cxn->checkRespHead == NULL)
3254 {
3255 warn ("%s:%d cxnsleep response unexpected: %d",
3256 hostPeerName (cxn->myHost),cxn->ident,335) ;
3257 cxnSleepOrDie (cxn) ;
3258 }
3259 else
3260 {
3261 VALIDATE_CONNECTION (cxn) ;
3262 /* now move the article into the third queue */
3263 cxn->takeHead = cxn->checkRespHead ;
3264 cxn->checkRespHead = NULL ;
3265
3266 issueIHAVEBody (cxn) ;
3267 }
3268 }
3269
3270
3271
3272
3273
3274 /*
3275 * process the "not accepting articles" response. This could be to any of
3276 * the IHAVE/CHECK/TAKETHIS command, but not the banner--that's handled
3277 * elsewhere.
3278 */
processResponse400(Connection cxn,char * response)3279 static void processResponse400 (Connection cxn, char *response)
3280 {
3281 if (!(cxn->state == cxnFlushingS ||
3282 cxn->state == cxnFeedingS ||
3283 cxn->state == cxnIdleS ||
3284 cxn->state == cxnClosingS))
3285 {
3286 warn ("%s:%d cxnsleep connection in bad state: %s",
3287 hostPeerName (cxn->myHost), cxn->ident,
3288 stateToString (cxn->state)) ;
3289 cxnSleepOrDie (cxn) ;
3290 return ;
3291 }
3292
3293 VALIDATE_CONNECTION (cxn) ;
3294
3295 /* We may get a response 400 multiple times when in streaming mode. */
3296 notice ("%s:%d remote cannot accept articles: %s",
3297 hostPeerName(cxn->myHost), cxn->ident, response) ;
3298
3299 /* right here there may still be data queued to write and so we'll fail
3300 trying to issue the quit ('cause a write will be pending). Furthermore,
3301 the data pending may be half way through an command, and so just
3302 tossing the buffer is nt sufficient. But figuring out where we are and
3303 doing a tidy job is hard */
3304 if (writeIsPending (cxn->myEp))
3305 cxnSleepOrDie (cxn) ;
3306 else
3307 {
3308 if (cxn->articleQTotal > 0)
3309 {
3310 /* Defer the articles here so that cxnFlush() doesn't set up an
3311 immediate reconnect. */
3312 deferAllArticles (cxn) ;
3313 clearTimer (cxn->readBlockedTimerId) ;
3314 /* XXX - so cxnSleep() doesn't die when it validates the connection */
3315 cxnIdle (cxn) ;
3316 }
3317 /* XXX - it would be nice if we QUIT first, but we'd have to go
3318 into a state where we just search for the 205 response, and
3319 only go into the sleep state at that point */
3320 cxnSleepOrDie (cxn) ;
3321 }
3322 }
3323
3324
3325
3326
3327
3328 /*
3329 * process the "not wanted" response to the IHAVE.
3330 */
processResponse435(Connection cxn,char * response UNUSED)3331 static void processResponse435 (Connection cxn, char *response UNUSED)
3332 {
3333 ArtHolder artHolder ;
3334
3335 if (cxn->doesStreaming)
3336 {
3337 warn ("%s:%d cxnsleep unexpected non-streaming response for"
3338 " streaming connection: %s", hostPeerName (cxn->myHost),
3339 cxn->ident,response) ;
3340 cxnSleepOrDie (cxn) ;
3341 return ;
3342 }
3343
3344 if (!(cxn->state == cxnFlushingS ||
3345 cxn->state == cxnFeedingS ||
3346 cxn->state == cxnClosingS))
3347 {
3348 warn ("%s:%d cxnsleep connection in bad state: %s",
3349 hostPeerName (cxn->myHost), cxn->ident,
3350 stateToString (cxn->state)) ;
3351 cxnSleepOrDie (cxn) ;
3352 return ;
3353 }
3354
3355 /* Some servers, such as early versions of Diablo, had a bug where they'd
3356 respond with a 435 code (which should only be used for refusing an
3357 article before it was offered) after an article has been sent. */
3358 if (cxn->checkRespHead == NULL)
3359 {
3360 warn ("%s:%d cxnsleep response unexpected: %d",
3361 hostPeerName (cxn->myHost), cxn->ident, 435) ;
3362 cxnSleepOrDie (cxn) ;
3363 return ;
3364 }
3365
3366 ASSERT (cxn->articleQTotal == 1) ;
3367 VALIDATE_CONNECTION (cxn) ;
3368
3369 cxn->articleQTotal-- ;
3370 cxn->checksRefused++ ;
3371
3372 artHolder = cxn->checkRespHead ;
3373 cxn->checkRespHead = NULL ;
3374
3375 if (cxn->articleQTotal == 0 && !writeIsPending(cxn->myEp))
3376 cxnIdle (cxn) ;
3377
3378 hostArticleNotWanted (cxn->myHost, cxn, artHolder->article) ;
3379 delArtHolder (artHolder) ;
3380
3381 #if 0
3382 d_printf (1,"%s:%d On exiting 435 article queue total is %d (%d %d %d %d)\n",
3383 hostPeerName (cxn->myHost), cxn->ident,
3384 cxn->articleQTotal,
3385 (int) (cxn->checkHead != NULL),
3386 (int) (cxn->checkRespHead != NULL),
3387 (int) (cxn->takeHead != NULL),
3388 (int) (cxn->takeRespHead != NULL));
3389 #endif
3390 }
3391
3392
3393
3394
3395
3396 /*
3397 * process the "transfer failed" response to the IHAVE-body, (seems this
3398 * can come from the IHAVE too).
3399 */
processResponse436(Connection cxn,char * response UNUSED)3400 static void processResponse436 (Connection cxn, char *response UNUSED)
3401 {
3402 ArtHolder artHolder ;
3403
3404 if (cxn->doesStreaming)
3405 {
3406 warn ("%s:%d cxnsleep unexpected non-streaming response for"
3407 " streaming connection: %s", hostPeerName (cxn->myHost),
3408 cxn->ident,response) ;
3409 cxnSleepOrDie (cxn) ;
3410 return ;
3411 }
3412
3413 if (!(cxn->state == cxnFlushingS ||
3414 cxn->state == cxnFeedingS ||
3415 cxn->state == cxnClosingS))
3416 {
3417 warn ("%s:%d cxnsleep connection in bad state: %s",
3418 hostPeerName (cxn->myHost), cxn->ident,
3419 stateToString (cxn->state)) ;
3420 cxnSleepOrDie (cxn) ;
3421 return ;
3422 }
3423
3424 ASSERT (cxn->articleQTotal == 1) ;
3425 ASSERT (cxn->takeRespHead != NULL || cxn->checkRespHead != NULL) ;
3426 VALIDATE_CONNECTION (cxn) ;
3427
3428 cxn->articleQTotal-- ;
3429
3430 if (cxn->takeRespHead != NULL) /* IHAVE-body command barfed */
3431 {
3432 artHolder = cxn->takeRespHead ;
3433 cxn->takeRespHead = NULL ;
3434 }
3435 else /* IHAVE command barfed */
3436 {
3437 artHolder = cxn->checkRespHead ;
3438 cxn->checkRespHead = NULL ;
3439 }
3440
3441 if (cxn->articleQTotal == 0 && !writeIsPending(cxn->myEp))
3442 cxnIdle (cxn) ;
3443
3444 hostArticleDeferred (cxn->myHost, cxn, artHolder->article) ;
3445 delArtHolder (artHolder) ;
3446 }
3447
3448
3449
3450
3451
3452 /*
3453 * Process the "article rejected do not try again" response to the
3454 * IHAVE-body.
3455 */
processResponse437(Connection cxn,char * response UNUSED)3456 static void processResponse437 (Connection cxn, char *response UNUSED)
3457 {
3458 ArtHolder artHolder ;
3459
3460 if (cxn->doesStreaming)
3461 {
3462 warn ("%s:%d cxnsleep unexpected non-streaming response for"
3463 " streaming connection: %s", hostPeerName (cxn->myHost),
3464 cxn->ident,response) ;
3465 cxnSleepOrDie (cxn) ;
3466 return ;
3467 }
3468
3469 if (!(cxn->state == cxnFlushingS ||
3470 cxn->state == cxnFeedingS ||
3471 cxn->state == cxnClosingS))
3472 {
3473 warn ("%s:%d cxnsleep connection in bad state: %s",
3474 hostPeerName (cxn->myHost), cxn->ident,
3475 stateToString (cxn->state)) ;
3476 cxnSleepOrDie (cxn) ;
3477 return ;
3478 }
3479
3480 ASSERT (cxn->articleQTotal == 1) ;
3481 ASSERT (cxn->takeRespHead != NULL) ;
3482 VALIDATE_CONNECTION (cxn) ;
3483
3484 cxn->articleQTotal-- ;
3485 cxn->takesRejected++ ;
3486
3487 artHolder = cxn->takeRespHead ;
3488 cxn->takeRespHead = NULL ;
3489 cxn->takesSizeRejected += artSize(artHolder->article);
3490
3491 /* Some servers return the 437 response before we're done sending. */
3492 if (cxn->articleQTotal == 0 && !writeIsPending (cxn->myEp))
3493 cxnIdle (cxn) ;
3494
3495 hostArticleRejected (cxn->myHost, cxn, artHolder->article) ;
3496 delArtHolder (artHolder) ;
3497 }
3498
3499
3500 /* Process the response 480 Transfer permission defined. We're probably
3501 talking to a remote nnrpd on a system that forgot to put us in
3502 the hosts.nntp */
processResponse480(Connection cxn,char * response UNUSED)3503 static void processResponse480 (Connection cxn, char *response UNUSED)
3504 {
3505 if (cxn->doesStreaming)
3506 {
3507 warn ("%s:%d cxnsleep unexpected non-streaming response for"
3508 " streaming connection: %s", hostPeerName (cxn->myHost),
3509 cxn->ident,response) ;
3510 cxnSleepOrDie (cxn) ;
3511 return ;
3512 }
3513
3514 if (!(cxn->state == cxnFlushingS ||
3515 cxn->state == cxnFeedingS ||
3516 cxn->state == cxnClosingS))
3517 {
3518 warn ("%s:%d cxnsleep connection in bad state: %s",
3519 hostPeerName (cxn->myHost), cxn->ident,
3520 stateToString (cxn->state)) ;
3521 cxnSleepOrDie (cxn) ;
3522 return ;
3523 }
3524
3525 VALIDATE_CONNECTION (cxn) ;
3526
3527 warn ("%s:%d cxnsleep transfer permission denied",
3528 hostPeerName (cxn->myHost), cxn->ident) ;
3529
3530 if (cxn->state == cxnClosingS)
3531 cxnDead (cxn) ;
3532 else
3533 cxnSleep (cxn) ;
3534 }
3535
3536
3537
3538
3539
3540 /*
3541 * Handle the response 503, which means the timeout of nnrpd.
3542 */
processResponse503(Connection cxn,char * response UNUSED)3543 static void processResponse503 (Connection cxn, char *response UNUSED)
3544 {
3545 bool immedRecon ;
3546
3547 VALIDATE_CONNECTION (cxn) ;
3548
3549 if (!(cxn->state == cxnFeedingS ||
3550 cxn->state == cxnIdleS ||
3551 cxn->state == cxnFlushingS ||
3552 cxn->state == cxnClosingS))
3553 {
3554 warn ("%s:%d cxnsleep connection in bad state: %s",
3555 hostPeerName (cxn->myHost), cxn->ident,
3556 stateToString (cxn->state)) ;
3557 cxnSleepOrDie (cxn) ;
3558 return ;
3559 }
3560
3561 if (cxn->articleQTotal != 0)
3562 notice ("%s:%d flush re-connect failed", hostPeerName (cxn->myHost),
3563 cxn->ident) ;
3564
3565 cxnLogStats (cxn,true) ;
3566
3567 immedRecon = cxn->immedRecon ;
3568
3569 hostCxnDead (cxn->myHost,cxn) ;
3570
3571 if (cxn->state == cxnFlushingS && immedRecon)
3572 {
3573 abortConnection (cxn) ;
3574 if (!cxnConnect (cxn))
3575 notice ("%s:%d flush re-connect failed", hostPeerName (cxn->myHost),
3576 cxn->ident) ;
3577 }
3578 else if (cxn->state == cxnFlushingS)
3579 cxnWait (cxn) ;
3580 else
3581 cxnDead (cxn) ;
3582
3583 }
3584
3585
3586
3587
3588
3589 /****************************************************************************
3590 *
3591 * END RESPONSE CODE PROCESSING.
3592 *
3593 ***************************************************************************/
3594
3595
3596
3597
3598
3599 /*
3600 * puts the Connection into the sleep state.
3601 */
cxnSleep(Connection cxn)3602 static void cxnSleep (Connection cxn)
3603 {
3604 ASSERT (cxn != NULL) ;
3605 ASSERT (cxn->state == cxnFlushingS ||
3606 cxn->state == cxnIdleS ||
3607 cxn->state == cxnFeedingS ||
3608 cxn->state == cxnConnectingS) ;
3609 VALIDATE_CONNECTION (cxn) ;
3610
3611 abortConnection (cxn) ;
3612
3613 prepareReopenCbk (cxn) ; /* XXX - we don't want to reopen if idle */
3614 cxn->state = cxnSleepingS ;
3615
3616 /* tell our Host we're asleep so it doesn't try to give us articles */
3617 hostCxnSleeping (cxn->myHost,cxn) ;
3618 }
3619
3620
3621
/*
 * Abort the network connection and mark the Connection as dead.
 */
static void cxnDead(Connection cxn)
{
    ASSERT(cxn != NULL);
    VALIDATE_CONNECTION(cxn);

    abortConnection(cxn);

    cxn->state = cxnDeadS;
}
3630
3631
3632
3633 /*
3634 * Sets the idle timer. If no articles arrive before the timer expires, the
3635 * connection will be closed.
3636 */
cxnIdle(Connection cxn)3637 static void cxnIdle (Connection cxn)
3638 {
3639 ASSERT (cxn != NULL) ;
3640 ASSERT (cxn->state == cxnFeedingS || cxn->state == cxnConnectingS ||
3641 cxn->state == cxnFlushingS || cxn->state == cxnClosingS) ;
3642 ASSERT (cxn->articleQTotal == 0) ;
3643 ASSERT (cxn->writeBlockedTimerId == 0) ;
3644 ASSERT (!writeIsPending (cxn->myEp)) ;
3645 ASSERT (cxn->sleepTimerId == 0) ;
3646
3647 if (cxn->state == cxnFeedingS || cxn->state == cxnConnectingS)
3648 {
3649 if (cxn->articleReceiptTimeout > 0)
3650 {
3651 clearTimer (cxn->artReceiptTimerId) ;
3652 cxn->artReceiptTimerId = prepareSleep (articleTimeoutCbk,
3653 cxn->articleReceiptTimeout,
3654 cxn) ;
3655 }
3656
3657 if (cxn->readTimeout > 0 && cxn->state == cxnFeedingS)
3658 clearTimer (cxn->readBlockedTimerId) ;
3659
3660 cxn->state = cxnIdleS ;
3661 ASSERT (cxn->readBlockedTimerId == 0) ;
3662 }
3663 }
3664
3665
3666
3667
3668
3669 /*
3670 * Called when a response from the remote refers to a non-existant
3671 * message-id. The network connection is aborted and the Connection
3672 * object goes into sleep mode.
3673 */
noSuchMessageId(Connection cxn,unsigned int responseCode,const char * msgid,const char * response)3674 static void noSuchMessageId (Connection cxn, unsigned int responseCode,
3675 const char *msgid, const char *response)
3676 {
3677 const char *peerName = hostPeerName (cxn->myHost) ;
3678
3679 if (msgid == NULL || strlen (msgid) == 0)
3680 warn ("%s:%d cxnsleep message-id missing in response code %d: %s",
3681 peerName, cxn->ident, responseCode, response) ;
3682 else
3683 warn ("%s:%d cxnsleep message-id invalid message-id in response code"
3684 " %d: %s", peerName, cxn->ident, responseCode, msgid) ;
3685
3686 cxnLogStats (cxn,true) ;
3687
3688 if (cxn->state != cxnClosingS)
3689 cxnSleep (cxn) ;
3690 else
3691 cxnDead (cxn) ;
3692 }
3693
3694
3695
3696
3697
3698 /*
3699 * a processing error has occured (for example in parsing a response), or
3700 * we're at the end of the FSM and we're cleaning up.
3701 */
abortConnection(Connection cxn)3702 static void abortConnection (Connection cxn)
3703 {
3704 ASSERT (cxn != NULL) ;
3705 VALIDATE_CONNECTION (cxn) ;
3706
3707 /* cxn->myEp could be NULL if we get here during cxnConnect (via
3708 cxnWait()) */
3709 if (cxn->myEp != NULL)
3710 {
3711
3712 delEndPoint (cxn->myEp) ;
3713 cxn->myEp = NULL ;
3714 }
3715
3716 clearTimer (cxn->sleepTimerId) ;
3717 clearTimer (cxn->artReceiptTimerId) ;
3718 clearTimer (cxn->readBlockedTimerId) ;
3719 clearTimer (cxn->writeBlockedTimerId) ;
3720 clearTimer (cxn->flushTimerId) ;
3721
3722 deferAllArticles (cxn) ; /* give any articles back to Host */
3723
3724 bufferSetDataSize (cxn->respBuffer,0) ;
3725
3726 resetConnection (cxn) ;
3727
3728 if (cxn->state == cxnFeedingS ||
3729 cxn->state == cxnIdleS ||
3730 cxn->state == cxnFlushingS ||
3731 cxn->state == cxnClosingS)
3732 hostCxnDead (cxn->myHost,cxn) ;
3733 }
3734
3735
3736
3737
3738 /*
3739 * Set up the callback used when the Connection is sleeping (i.e. will try
3740 * to reopen the connection).
3741 */
prepareReopenCbk(Connection cxn)3742 static void prepareReopenCbk (Connection cxn)
3743 {
3744 ASSERT (cxn->sleepTimerId == 0) ;
3745
3746 if (!(cxn->state == cxnConnectingS ||
3747 cxn->state == cxnIdleS ||
3748 cxn->state == cxnFeedingS ||
3749 cxn->state == cxnFlushingS ||
3750 cxn->state == cxnStartingS))
3751 {
3752 warn ("%s:%d cxnsleep connection in bad state: %s",
3753 hostPeerName (cxn->myHost), cxn->ident,
3754 stateToString (cxn->state)) ;
3755 cxnSleepOrDie (cxn) ;
3756 return ;
3757 }
3758
3759 d_printf (1,"%s:%d Setting up a reopen callback\n",
3760 hostPeerName (cxn->myHost), cxn->ident) ;
3761
3762 cxn->sleepTimerId = prepareSleep (reopenTimeoutCbk, cxn->sleepTimeout, cxn) ;
3763
3764 /* bump the sleep timer amount each time to wait longer and longer. Gets
3765 reset in resetConnection() */
3766 cxn->sleepTimeout *= 2 ;
3767 if (cxn->sleepTimeout > max_reconnect_period)
3768 cxn->sleepTimeout = max_reconnect_period ;
3769 }
3770
3771
3772
3773
3774
3775 /*
3776 * (re)set all state variables to inital condition.
3777 */
resetConnection(Connection cxn)3778 static void resetConnection (Connection cxn)
3779 {
3780 ASSERT (cxn != NULL) ;
3781
3782 bufferSetDataSize (cxn->respBuffer,0) ;
3783
3784 cxn->loggedNoCr = false ;
3785 cxn->maxCheck = 1 ;
3786 cxn->immedRecon = false ;
3787 cxn->doesStreaming = false ; /* who knows, next time around maybe... */
3788 cxn->authenticated = false ;
3789 cxn->quitWasIssued = false ;
3790 cxn->needsChecks = true ;
3791 cxn->timeCon = 0 ;
3792
3793 cxn->artsTaken = 0 ;
3794 cxn->checksIssued = 0 ;
3795 cxn->checksRefused = 0 ;
3796 cxn->takesRejected = 0 ;
3797 cxn->takesOkayed = 0 ;
3798 cxn->takesSizeRejected = 0 ;
3799 cxn->takesSizeOkayed = 0 ;
3800
3801 cxn->timeCon_checkpoint = 0;
3802 cxn->checksIssued_checkpoint = 0;
3803 cxn->checksRefused_checkpoint = 0;
3804 cxn->takesRejected_checkpoint = 0;
3805 cxn->takesOkayed_checkpoint = 0;
3806 cxn->takesSizeRejected_checkpoint = 0;
3807 cxn->takesSizeOkayed_checkpoint = 0;
3808
3809 cxn->filterValue = 0.0 ;
3810 }
3811
3812
3813
3814 /*
3815 * Give back all articles that are queued, but not actually in progress.
3816 * XXX merge come of this with deferAllArticles
3817 */
deferQueuedArticles(Connection cxn)3818 static void deferQueuedArticles (Connection cxn)
3819 {
3820 ArtHolder p, q ;
3821
3822 for (q = NULL, p = cxn->checkHead ; p != NULL ; p = q)
3823 {
3824 q = p->next ;
3825 hostTakeBackArticle (cxn->myHost, cxn, p->article) ;
3826 delArtHolder (p) ;
3827 cxn->articleQTotal-- ;
3828 }
3829 cxn->checkHead = NULL ;
3830
3831 for (q = NULL, p = cxn->takeHead ; cxn->doesStreaming && p != NULL ; p = q)
3832 {
3833 q = p->next ;
3834 hostTakeBackArticle (cxn->myHost, cxn, p->article) ;
3835 delArtHolder (p) ;
3836 cxn->articleQTotal-- ;
3837 }
3838 cxn->takeHead = NULL ;
3839 }
3840
3841
3842
3843 /*
3844 * Give back any articles we have to the Host for later retrys.
3845 */
deferAllArticles(Connection cxn)3846 static void deferAllArticles (Connection cxn)
3847 {
3848 ArtHolder p, q ;
3849
3850 for (q = NULL, p = cxn->checkHead ; p != NULL ; p = q)
3851 {
3852 q = p->next ;
3853 hostTakeBackArticle (cxn->myHost, cxn, p->article) ;
3854 delArtHolder (p) ;
3855 cxn->articleQTotal-- ;
3856 }
3857 cxn->checkHead = NULL ;
3858
3859 for (q = NULL, p = cxn->checkRespHead ; p != NULL ; p = q)
3860 {
3861 q = p->next ;
3862 hostTakeBackArticle (cxn->myHost, cxn, p->article) ;
3863 delArtHolder (p) ;
3864 cxn->articleQTotal-- ;
3865 }
3866 cxn->checkRespHead = NULL ;
3867
3868 for (q = NULL, p = cxn->takeHead ; p != NULL ; p = q)
3869 {
3870 q = p->next ;
3871 hostTakeBackArticle (cxn->myHost, cxn, p->article) ;
3872 delArtHolder (p) ;
3873 cxn->articleQTotal-- ;
3874 }
3875 cxn->takeHead = NULL ;
3876
3877 for (q = NULL, p = cxn->takeRespHead ; p != NULL ; p = q)
3878 {
3879 q = p->next ;
3880 hostTakeBackArticle (cxn->myHost, cxn, p->article) ;
3881 delArtHolder (p) ;
3882 cxn->articleQTotal-- ;
3883 }
3884 cxn->takeRespHead = NULL ;
3885
3886 ASSERT (cxn->articleQTotal == 0) ;
3887 }
3888
3889
3890
3891
3892
3893 /*
3894 * Called when there's an article to be pushed out to the remote. Even if
3895 * the Connection has an article it's possible that nothing will be written
3896 * (e.g. if the article on the queue doesn't exist any more)
3897 */
doSomeWrites(Connection cxn)3898 static void doSomeWrites (Connection cxn)
3899 {
3900 bool doneSome = false ;
3901
3902 /* If there's a write pending we can't do anything now.
3903 * No addWorkCallback here (otherwise innfeed consumes too much CPU). */
3904 if ( writeIsPending (cxn->myEp) )
3905 {
3906 return ;
3907 }
3908 else if ( writesNeeded (cxn) ) /* something on a queue. */
3909 {
3910 if (cxn->doesStreaming)
3911 doneSome = issueStreamingCommands (cxn) ;
3912 else
3913 doneSome = issueIHAVE (cxn) ;
3914
3915 /* doneSome will be false if article(s) were gone, but if the Host
3916 has something available, then it would have been put on the queue
3917 for next time around. */
3918 if (!doneSome)
3919 {
3920 if (writesNeeded (cxn)) /* Host gave us something */
3921 addWorkCallback (cxn->myEp,cxnWorkProc,cxn) ; /* for next time. */
3922 else if (cxn->articleQTotal == 0)
3923 {
3924 /* if we were in cxnFeedingS, then issueStreamingCommands
3925 already called cxnIdle(). */
3926 if (cxn->state == cxnClosingS || cxn->state == cxnFlushingS)
3927 issueQUIT (cxn) ; /* and nothing to wait for... */
3928 }
3929 }
3930 }
3931 else if (cxn->state == cxnClosingS || cxn->state == cxnFlushingS)
3932 { /* nothing to do... */
3933 if (cxn->articleQTotal == 0)
3934 issueQUIT (cxn) ; /* and nothing to wait for before closing */
3935 }
3936 }
3937
3938
3939
3940
3941
3942 /* Queue up a buffer with the IHAVE command in it for the article at
3943 * the head of the transmisson queue.
3944 *
3945 * If the article is missing, then the Host will be notified and
3946 * another article may be put on the Connections queue. This new
3947 * article is ignored for now, but a work callback is registered so
3948 * that it can be looked at later.
3949 */
issueIHAVE(Connection cxn)3950 static bool issueIHAVE (Connection cxn)
3951 {
3952 Buffer ihaveBuff, *writeArr ;
3953 ArtHolder artH ;
3954 Article article ;
3955 const char *msgid ;
3956 char *p ;
3957 unsigned int tmp ;
3958 size_t bufLen = 256 ;
3959 bool rval = false ;
3960
3961 ASSERT (!cxn->doesStreaming) ;
3962 ASSERT (cxn->state == cxnFlushingS ||
3963 cxn->state == cxnFeedingS ||
3964 cxn->state == cxnClosingS) ;
3965 ASSERT (cxn->articleQTotal == 1) ;
3966 ASSERT (cxn->checkHead != NULL) ;
3967 ASSERT (writeIsPending (cxn->myEp) == false) ;
3968 VALIDATE_CONNECTION (cxn) ;
3969
3970 artH = cxn->checkHead ;
3971 article = cxn->checkHead->article ;
3972 msgid = artMsgId (artH->article) ;
3973
3974 ASSERT (msgid != NULL) ;
3975 ASSERT (article != NULL) ;
3976
3977 if ((tmp = (strlen (msgid) + 10)) > bufLen)
3978 bufLen = tmp ;
3979
3980 ihaveBuff = newBuffer (bufLen) ;
3981
3982 ASSERT (ihaveBuff != NULL) ;
3983
3984 p = bufferBase (ihaveBuff) ;
3985 sprintf (p, "IHAVE %s\r\n", msgid) ;
3986 bufferSetDataSize (ihaveBuff, strlen (p)) ;
3987
3988 d_printf (5,"%s:%d Command IHAVE %s\n",
3989 hostPeerName (cxn->myHost),cxn->ident,msgid) ;
3990
3991 writeArr = makeBufferArray (ihaveBuff, NULL) ;
3992 if ( !prepareWriteWithTimeout (cxn->myEp, writeArr, commandWriteDone,
3993 cxn) )
3994 {
3995 die ("%s:%d fatal prepare write for IHAVE failed",
3996 hostPeerName (cxn->myHost), cxn->ident) ;
3997 }
3998
3999 /* now move the article to the second queue */
4000 cxn->checkRespHead = cxn->checkHead ;
4001 cxn->checkHead = NULL ;
4002
4003 cxn->checksIssued++ ;
4004 hostArticleOffered (cxn->myHost, cxn) ;
4005
4006 rval = true ;
4007
4008 return rval ;
4009 }
4010
4011
4012
4013
4014
4015 /*
4016 * Do a prepare write with the article body as the body portion of the
4017 * IHAVE command
4018 */
issueIHAVEBody(Connection cxn)4019 static void issueIHAVEBody (Connection cxn)
4020 {
4021 Buffer *writeArray ;
4022 Article article ;
4023
4024 ASSERT (cxn != NULL) ;
4025 ASSERT (!cxn->doesStreaming) ;
4026 ASSERT (cxn->state == cxnFlushingS ||
4027 cxn->state == cxnFeedingS ||
4028 cxn->state == cxnClosingS) ;
4029 ASSERT (cxn->articleQTotal == 1) ;
4030 ASSERT (cxn->takeHead != NULL) ;
4031 VALIDATE_CONNECTION (cxn) ;
4032
4033 article = cxn->takeHead->article ;
4034 ASSERT (article != NULL) ;
4035
4036 if (cxn->state != cxnClosingS)
4037 writeArray = artGetNntpBuffers (article) ;
4038 else
4039 writeArray = NULL ;
4040
4041 if (writeArray == NULL)
4042 {
4043 /* missing article (expired for example) will get us here. */
4044 if (dotBuffer == NULL)
4045 {
4046 dotBuffer = newBufferByCharP (".\r\n",3,3) ;
4047 dotFirstBuffer = newBufferByCharP ("\r\n.",3,3) ;
4048 crlfBuffer = newBufferByCharP ("\r\n",2,2) ;
4049 }
4050
4051 /* we'll just write the empty buffer and the remote will complain
4052 with 437 */
4053 writeArray = makeBufferArray (bufferTakeRef (dotBuffer),NULL) ;
4054 }
4055
4056
4057 if ( !prepareWriteWithTimeout (cxn->myEp, writeArray, ihaveBodyDone, cxn) )
4058 {
4059 die ("%s:%d fatal prepare write failed in issueIHAVEBody",
4060 hostPeerName (cxn->myHost), cxn->ident) ;
4061 }
4062 else
4063 {
4064 d_printf (5,"%s:%d prepared write for IHAVE body.\n",
4065 hostPeerName (cxn->myHost),cxn->ident) ;
4066 }
4067
4068 /* now move the article to the last queue */
4069 cxn->takeRespHead = cxn->takeHead ;
4070 cxn->takeHead = NULL ;
4071
4072 return ;
4073 }
4074
4075
4076
4077
4078
4079 /* Process the two command queues. Slaps all the CHECKs together and
4080 * then does the TAKETHIS commands.
4081 *
4082 * If no articles on the queue(s) are valid, then the Host is
4083 * notified. It may queue up new articles on the Connection, but
4084 * these are ignored for now. A work proc is registered so the
4085 * articles can be processed later.
4086 */
issueStreamingCommands(Connection cxn)4087 static bool issueStreamingCommands (Connection cxn)
4088 {
4089 Buffer checkBuffer = NULL ; /* the buffer with the CHECK commands in it. */
4090 Buffer *writeArray = NULL ;
4091 ArtHolder p, q ;
4092 bool rval = false ;
4093
4094 ASSERT (cxn != NULL) ;
4095 ASSERT (cxn->myEp != NULL) ;
4096 ASSERT (cxn->doesStreaming) ;
4097 VALIDATE_CONNECTION (cxn) ;
4098
4099 checkBuffer = buildCheckBuffer (cxn) ; /* may be null if none to issue */
4100
4101 if (checkBuffer != NULL)
4102 {
4103 /* Now shift the articles to their new queue. */
4104 for (p = cxn->checkRespHead ; p != NULL && p->next != NULL ; p = p->next)
4105 /* nada--finding end of queue*/ ;
4106
4107 if (p == NULL)
4108 cxn->checkRespHead = cxn->checkHead ;
4109 else
4110 p->next = cxn->checkHead ;
4111
4112 cxn->checkHead = NULL ;
4113 }
4114
4115
4116 writeArray = buildTakethisBuffers (cxn,checkBuffer) ; /* may be null */
4117
4118 /* If not null, then writeArray will have checkBuffer (if it wasn't NULL)
4119 in the first spot and the takethis buffers after that. */
4120 if (writeArray)
4121 {
4122 if ( !prepareWriteWithTimeout (cxn->myEp, writeArray,
4123 commandWriteDone, cxn) )
4124 {
4125 die ("%s:%d fatal prepare write for STREAMING commands failed",
4126 hostPeerName (cxn->myHost), cxn->ident) ;
4127 }
4128
4129 rval = true ;
4130
4131 /* now shift articles over to their new queue. */
4132 for (p = cxn->takeRespHead ; p != NULL && p->next != NULL ; p = p->next)
4133 /* nada--finding end of queue */ ;
4134
4135 if (p == NULL)
4136 cxn->takeRespHead = cxn->takeHead ;
4137 else
4138 p->next = cxn->takeHead ;
4139
4140 cxn->takeHead = NULL ;
4141 }
4142
4143 /* we defer the missing article notification to here because if there
4144 was a big backlog of missing articles *and* we're running in
4145 no-CHECK mode, then the Host would be putting bad articles on the
4146 queue we're taking them off of. */
4147 if (cxn->missing && cxn->articleQTotal == 0 && !writeIsPending (cxn->myEp))
4148 cxnIdle (cxn) ;
4149 for (p = cxn->missing ; p != NULL ; p = q)
4150 {
4151 hostArticleIsMissing (cxn->myHost, cxn, p->article) ;
4152 q = p->next ;
4153 delArtHolder (p) ;
4154 }
4155 cxn->missing = NULL ;
4156
4157 return rval ;
4158 }
4159
4160
4161
4162
4163
4164 /*
4165 * build up the buffer of all the CHECK commands.
4166 */
buildCheckBuffer(Connection cxn)4167 static Buffer buildCheckBuffer (Connection cxn)
4168 {
4169 ArtHolder p ;
4170 size_t lenBuff = 0 ;
4171 Buffer checkBuffer = NULL ;
4172 const char *peerName = hostPeerName (cxn->myHost) ;
4173
4174 p = cxn->checkHead ;
4175 while (p != NULL)
4176 {
4177 Article article = p->article ;
4178 const char *msgid ;
4179
4180 msgid = artMsgId (article) ;
4181
4182 lenBuff += (8 + strlen (msgid)) ; /* 8 == strlen("CHECK \r\n") */
4183 p = p->next ;
4184 }
4185
4186 if (lenBuff > 0)
4187 lenBuff++ ; /* for the null byte */
4188
4189 /* now build up the single buffer that contains all the CHECK commands */
4190 if (lenBuff > 0)
4191 {
4192 char *t ;
4193 size_t tlen = 0 ;
4194
4195 checkBuffer = newBuffer (lenBuff) ;
4196 t = bufferBase (checkBuffer) ;
4197
4198 p = cxn->checkHead ;
4199 while (p != NULL)
4200 {
4201 const char *msgid = artMsgId (p->article) ;
4202
4203 sprintf (t,"CHECK %s\r\n", msgid) ;
4204 d_printf (5,"%s:%d Command %s\n", peerName, cxn->ident, t) ;
4205
4206 tlen += strlen (t) ;
4207
4208 while ( *t ) t++ ;
4209
4210 cxn->checksIssued++ ;
4211 hostArticleOffered (cxn->myHost,cxn) ;
4212
4213 p = p->next ;
4214 }
4215
4216 ASSERT (tlen + 1 == lenBuff) ;
4217
4218 bufferSetDataSize (checkBuffer, tlen) ;
4219 }
4220
4221 return checkBuffer ;
4222 }
4223
4224
4225
4226
4227
4228
4229 /*
4230 * Construct and array of TAKETHIS commands and the command bodies. Any
4231 * articles on the queue that are missing will be removed and the Host will
4232 * be informed.
4233 */
buildTakethisBuffers(Connection cxn,Buffer checkBuffer)4234 static Buffer *buildTakethisBuffers (Connection cxn, Buffer checkBuffer)
4235 {
4236 size_t lenArray = 0 ;
4237 ArtHolder p, q ;
4238 Buffer *rval = NULL ;
4239 const char *peerName = hostPeerName (cxn->myHost) ;
4240
4241 if (checkBuffer != NULL)
4242 lenArray++ ;
4243
4244 if (cxn->takeHead != NULL) /* some TAKETHIS commands to be done. */
4245 {
4246 Buffer takeBuffer ;
4247 unsigned int takeBuffLen ;
4248 unsigned int writeIdx = 0 ;
4249
4250 /* count up all the buffers we'll be writing. One extra each time for
4251 the TAKETHIS command buffer*/
4252 for (p = cxn->takeHead ; p != NULL ; p = p->next)
4253 if (artContentsOk (p->article))
4254 lenArray += (1 + artNntpBufferCount (p->article)) ;
4255
4256 /* now allocate the array for the buffers and put them all in it */
4257 /* 1 for the terminator */
4258 rval = xmalloc (sizeof(Buffer) * (lenArray + 1)) ;
4259
4260 if (checkBuffer != NULL)
4261 rval [writeIdx++] = checkBuffer ;
4262
4263 q = NULL ;
4264 p = cxn->takeHead ;
4265 while (p != NULL)
4266 {
4267 char *t ;
4268 const char *msgid ;
4269 Article article ;
4270 Buffer *articleBuffers ;
4271 int i, nntpLen ;
4272
4273 article = p->article ;
4274 nntpLen = artNntpBufferCount (article) ;
4275 msgid = artMsgId (article) ;
4276
4277 if (nntpLen == 0)
4278 { /* file no longer valid so drop from queue */
4279 ArtHolder ta = p ;
4280
4281 if (q == NULL) /* it's the first in the queue */
4282 cxn->takeHead = p->next ;
4283 else
4284 q->next = p->next ;
4285
4286 p = p->next ;
4287 ASSERT (cxn->articleQTotal > 0) ;
4288 cxn->articleQTotal-- ;
4289
4290 ta->next = cxn->missing ;
4291 cxn->missing = ta ;
4292 }
4293 else
4294 {
4295 articleBuffers = artGetNntpBuffers (article) ;
4296
4297 /* set up the buffer with the TAKETHIS command in it.
4298 12 == strlen ("TAKETHIS \n\r") */
4299 takeBuffLen = 12 + strlen (msgid) ;
4300 takeBuffer = newBuffer (takeBuffLen) ;
4301 t = bufferBase (takeBuffer) ;
4302
4303 sprintf (t, "TAKETHIS %s\r\n", msgid) ;
4304 bufferSetDataSize (takeBuffer, strlen (t)) ;
4305
4306 d_printf (5,"%s:%d Command %s\n", peerName, cxn->ident, t) ;
4307
4308 ASSERT (writeIdx <= lenArray) ;
4309 rval [writeIdx++] = takeBuffer ;
4310
4311 /* now add all the buffers that make up the body of the TAKETHIS
4312 command */
4313 for (i = 0 ; i < nntpLen ; i++)
4314 {
4315 ASSERT (writeIdx <= lenArray) ;
4316 rval [writeIdx++] = bufferTakeRef (articleBuffers [i]) ;
4317 }
4318
4319 freeBufferArray (articleBuffers) ;
4320
4321 if ( !cxn->needsChecks )
4322 {
4323 /* this isn't quite right. An article may be counted
4324 twice if we switch to no-CHECK mode after its
4325 CHECK was issued, but before its TAKETHIS was done
4326 just now. I'm not going to worry unless someone
4327 complains. */
4328
4329 cxn->checksIssued++ ;
4330 hostArticleOffered (cxn->myHost,cxn) ;
4331 }
4332
4333 q = p ;
4334 p = p->next ;
4335 }
4336 }
4337
4338 if (writeIdx > 0)
4339 rval [writeIdx] = NULL ;
4340 else
4341 { /* all articles were missing and no CHECKS */
4342 free (rval) ;
4343 rval = NULL ;
4344 }
4345 }
4346 else if (checkBuffer != NULL) /* no TAKETHIS to do, but some CHECKS */
4347 rval = makeBufferArray (checkBuffer, NULL) ;
4348
4349 return rval ;
4350 }
4351
4352
4353
4354
4355
4356 /*
4357 * for one reason or another we need to disconnect gracefully. We send a
4358 * QUIT command.
4359 */
issueQUIT(Connection cxn)4360 static void issueQUIT (Connection cxn)
4361 {
4362 Buffer quitBuffer, *writeArray ;
4363 const char *peerName = hostPeerName (cxn->myHost) ;
4364
4365 ASSERT (cxn->takeHead == NULL) ;
4366 ASSERT (cxn->checkHead == NULL) ;
4367 VALIDATE_CONNECTION (cxn) ;
4368
4369 if (cxn->quitWasIssued)
4370 return ;
4371
4372 if (writeIsPending (cxn->myEp))
4373 {
4374 warn ("%s:%d internal QUIT while write pending", peerName,
4375 cxn->ident) ;
4376
4377 if (cxn->state == cxnClosingS)
4378 cxnDead (cxn) ;
4379 else
4380 cxnWait (cxn) ;
4381 }
4382 else
4383 {
4384 quitBuffer = newBuffer (7) ;
4385 strlcpy (bufferBase (quitBuffer), "QUIT\r\n", 7) ;
4386 bufferSetDataSize (quitBuffer, 6) ;
4387
4388 writeArray = makeBufferArray (quitBuffer, NULL) ;
4389
4390 d_printf (1,"%s:%d Sending a quit command\n",
4391 hostPeerName (cxn->myHost),cxn->ident) ;
4392
4393 cxn->quitWasIssued = true ; /* not exactly true, but good enough */
4394
4395 if ( !prepareWriteWithTimeout (cxn->myEp, writeArray, quitWritten,
4396 cxn) )
4397 {
4398 die ("%s:%d fatal prepare write for QUIT command failed", peerName,
4399 cxn->ident) ;
4400 }
4401 }
4402 }
4403
4404
4405
4406
4407
4408 /*
4409 * Set up the timer for the blocked reads
4410 */
initReadBlockedTimeout(Connection cxn)4411 static void initReadBlockedTimeout (Connection cxn)
4412 {
4413 ASSERT (cxn != NULL) ;
4414 ASSERT (cxn->state != cxnIdleS ) ;
4415
4416 /* set up the response timer. */
4417 clearTimer (cxn->readBlockedTimerId) ;
4418
4419 if (cxn->readTimeout > 0)
4420 cxn->readBlockedTimerId = prepareSleep (responseTimeoutCbk, cxn->readTimeout, cxn) ;
4421 }
4422
4423
4424
4425
4426
4427 /*
4428 * Set up the timer for the blocked reads
4429 */
prepareWriteWithTimeout(EndPoint endp,Buffer * buffers,EndpRWCB done,Connection cxn)4430 static int prepareWriteWithTimeout (EndPoint endp,
4431 Buffer *buffers,
4432 EndpRWCB done,
4433 Connection cxn)
4434 {
4435 /* Clear the read timer, since we can't expect a response until everything
4436 is sent.
4437 XXX - would be nice to have a timeout for responses if we're sending a
4438 string of commands. */
4439 clearTimer (cxn->readBlockedTimerId) ;
4440
4441 /* set up the write timer. */
4442 clearTimer (cxn->writeBlockedTimerId) ;
4443
4444 if (cxn->writeTimeout > 0)
4445 cxn->writeBlockedTimerId = prepareSleep (writeTimeoutCbk, cxn->writeTimeout,
4446 cxn) ;
4447
4448 /* set up the write. */
4449 return prepareWrite (endp, buffers, writeProgress, done, cxn) ;
4450 }
4451
4452
4453
4454
4455
4456 /*
4457 * Does the actual deletion of a connection and all its private data.
4458 */
delConnection(Connection cxn)4459 static void delConnection (Connection cxn)
4460 {
4461 bool shutDown;
4462 Connection c, q;
4463
4464 if (cxn == NULL)
4465 return ;
4466
4467 d_printf (1,"Deleting connection: %s:%d\n",
4468 hostPeerName (cxn->myHost),cxn->ident) ;
4469
4470 for (c = gCxnList, q = NULL ; c != NULL ; q = c, c = c->next)
4471 if (c == cxn)
4472 {
4473 if (gCxnList == c)
4474 gCxnList = gCxnList->next ;
4475 else
4476 q->next = c->next ;
4477 break ;
4478 }
4479
4480 ASSERT (c != NULL) ;
4481
4482 if (cxn->myEp != NULL)
4483 delEndPoint (cxn->myEp) ;
4484
4485 ASSERT (cxn->checkHead == NULL) ;
4486 ASSERT (cxn->checkRespHead == NULL) ;
4487 ASSERT (cxn->takeHead == NULL) ;
4488 ASSERT (cxn->takeRespHead == NULL) ;
4489
4490 delBuffer (cxn->respBuffer) ;
4491
4492 /* tell the Host we're outta here. */
4493 shutDown = hostCxnGone (cxn->myHost, cxn) ;
4494
4495 cxn->ident = 0 ;
4496 cxn->timeCon = 0 ;
4497
4498 free (cxn->ipName) ;
4499
4500 clearTimer (cxn->artReceiptTimerId) ;
4501 clearTimer (cxn->readBlockedTimerId) ;
4502 clearTimer (cxn->writeBlockedTimerId) ;
4503 clearTimer (cxn->flushTimerId) ;
4504
4505 free (cxn) ;
4506
4507 if (shutDown)
4508 {
4509 /* exit program if that was the last connexion for the last host */
4510 /* XXX what about if there are ever multiple listeners?
4511 XXX this will be executed if all hosts on only one of the
4512 XXX listeners have gone */
4513 time_t now = theTime () ;
4514 char dateString [30] ;
4515 char **p = PointersFreedOnExit ;
4516
4517 /* finish out all outstanding memory */
4518 while (*p)
4519 free (*p++) ;
4520 free (PointersFreedOnExit) ;
4521 freeTimeoutQueue () ;
4522
4523 timeToString (now, dateString, sizeof(dateString)) ;
4524 notice ("ME finishing at %s", dateString) ;
4525
4526 exit (0) ;
4527 }
4528 }
4529
4530
4531
4532
4533
4534 /*
4535 * Bump up the value of the low pass filter on the connection.
4536 */
incrFilter(Connection cxn)4537 static void incrFilter (Connection cxn)
4538 {
4539 cxn->filterValue *= (1.0 - (1.0 / cxn->lowPassFilter)) ;
4540 cxn->filterValue += 1.0 ;
4541 }
4542
4543
4544
4545
4546
4547 /*
4548 * decrement the value of the low pass filter on the connection.
4549 */
decrFilter(Connection cxn)4550 static void decrFilter (Connection cxn)
4551 {
4552 cxn->filterValue *= (1.0 - (1.0 / cxn->lowPassFilter)) ;
4553 }
4554
4555
4556
4557
4558
4559 /*
4560 * return true if we have articles we need to issue commands for.
4561 */
writesNeeded(Connection cxn)4562 static bool writesNeeded (Connection cxn)
4563 {
4564 return (cxn->checkHead != NULL || cxn->takeHead != NULL ? true : false) ;
4565 }
4566
4567
4568
4569
4570
4571 /*
4572 * do some simple tests to make sure it's OK.
4573 */
validateConnection(Connection cxn)4574 static void validateConnection (Connection cxn)
4575 {
4576 unsigned int i ;
4577 unsigned int old ;
4578 ArtHolder p ;
4579
4580 i = 0 ;
4581
4582 /* count up the articles the Connection has and make sure that matches. */
4583 for (p = cxn->takeHead ; p != NULL ; p = p->next)
4584 i++ ;
4585 d_printf (4,"TAKE queue: %d\n",i) ;
4586 old = i ;
4587
4588 for (p = cxn->takeRespHead ; p != NULL ; p = p->next)
4589 i++ ;
4590 d_printf (4,"TAKE response queue: %d\n",i - old) ;
4591 old = i ;
4592
4593 for (p = cxn->checkHead ; p != NULL ; p = p->next)
4594 i++ ;
4595 d_printf (4,"CHECK queue: %d\n",i - old) ;
4596 old = i ;
4597
4598 for (p = cxn->checkRespHead ; p != NULL ; p = p->next)
4599 i++ ;
4600 d_printf (4,"CHECK response queue: %d\n",i - old) ;
4601
4602 ASSERT (i == cxn->articleQTotal) ;
4603
4604 switch (cxn->state)
4605 {
4606 case cxnConnectingS:
4607 ASSERT (cxn->doesStreaming == false) ;
4608 ASSERT (cxn->articleQTotal <= 1) ;
4609 ASSERT (cxn->artReceiptTimerId == 0) ;
4610 ASSERT (cxn->sleepTimerId == 0) ;
4611 /* ASSERT (cxn->timeCon == 0) ; */
4612 break ;
4613
4614 case cxnWaitingS:
4615 ASSERT (cxn->articleQTotal == 0) ;
4616 ASSERT (cxn->myEp == NULL) ;
4617 ASSERT (cxn->artReceiptTimerId == 0) ;
4618 ASSERT (cxn->readBlockedTimerId == 0) ;
4619 ASSERT (cxn->writeBlockedTimerId == 0) ;
4620 ASSERT (cxn->flushTimerId == 0) ;
4621 ASSERT (cxn->sleepTimerId == 0) ;
4622 ASSERT (cxn->timeCon == 0) ;
4623 break ;
4624
4625 case cxnFlushingS:
4626 case cxnClosingS:
4627 if (!cxn->doesStreaming)
4628 ASSERT (cxn->articleQTotal <= 1) ;
4629 ASSERT (cxn->artReceiptTimerId == 0) ;
4630 ASSERT (cxn->flushTimerId == 0) ;
4631 ASSERT (cxn->sleepTimerId == 0) ;
4632 ASSERT (cxn->timeCon != 0) ;
4633 ASSERT (cxn->doesStreaming || cxn->maxCheck == 1) ;
4634 break ;
4635
4636 case cxnFeedingS:
4637 if (cxn->doesStreaming)
4638 /* Some(?) hosts return the 439 response even before we're done
4639 sending, so don't go idle until here */
4640 ASSERT (cxn->articleQTotal > 0 || writeIsPending (cxn->myEp)) ;
4641 else
4642 ASSERT (cxn->articleQTotal == 1) ;
4643 if (cxn->readTimeout > 0 && !writeIsPending (cxn->myEp) &&
4644 cxn->checkRespHead != NULL && cxn->takeRespHead != NULL)
4645 ASSERT (cxn->readBlockedTimerId != 0) ;
4646 if (cxn->writeTimeout > 0 && writeIsPending (cxn->myEp))
4647 ASSERT (cxn->writeBlockedTimerId != 0) ;
4648 ASSERT (cxn->sleepTimerId == 0) ;
4649 ASSERT (cxn->timeCon != 0) ;
4650 ASSERT (cxn->doesStreaming || cxn->maxCheck == 1) ;
4651 break;
4652
4653 case cxnIdleS:
4654 ASSERT (cxn->articleQTotal == 0) ;
4655 if (cxn->articleReceiptTimeout > 0)
4656 ASSERT (cxn->artReceiptTimerId != 0) ;
4657 ASSERT (cxn->readBlockedTimerId == 0) ;
4658 ASSERT (cxn->writeBlockedTimerId == 0) ;
4659 ASSERT (cxn->sleepTimerId == 0) ;
4660 ASSERT (cxn->timeCon != 0) ;
4661 ASSERT (!writeIsPending (cxn->myEp)) ;
4662 break ;
4663
4664 case cxnIdleTimeoutS:
4665 ASSERT (cxn->articleQTotal == 0) ;
4666 ASSERT (cxn->artReceiptTimerId == 0) ;
4667 ASSERT (cxn->writeBlockedTimerId == 0) ;
4668 ASSERT (cxn->sleepTimerId == 0) ;
4669 ASSERT (cxn->timeCon != 0) ;
4670 ASSERT (!writeIsPending (cxn->myEp)) ;
4671 break ;
4672
4673 case cxnSleepingS:
4674 ASSERT (cxn->articleQTotal == 0) ;
4675 ASSERT (cxn->myEp == NULL) ;
4676 ASSERT (cxn->artReceiptTimerId == 0) ;
4677 ASSERT (cxn->readBlockedTimerId == 0) ;
4678 ASSERT (cxn->writeBlockedTimerId == 0) ;
4679 ASSERT (cxn->flushTimerId == 0) ;
4680 ASSERT (cxn->timeCon == 0) ;
4681 break ;
4682
4683 case cxnStartingS:
4684 ASSERT (cxn->articleQTotal == 0) ;
4685 ASSERT (cxn->myEp == NULL) ;
4686 ASSERT (cxn->artReceiptTimerId == 0) ;
4687 ASSERT (cxn->readBlockedTimerId == 0) ;
4688 ASSERT (cxn->writeBlockedTimerId == 0) ;
4689 ASSERT (cxn->flushTimerId == 0) ;
4690 ASSERT (cxn->sleepTimerId == 0) ;
4691 ASSERT (cxn->timeCon == 0) ;
4692 break ;
4693
4694 case cxnDeadS:
4695 break ;
4696 }
4697 }
4698
4699
4700
4701
4702
4703 /*
4704 * Generate a printable string of the parameter.
4705 */
stateToString(CxnState state)4706 static const char *stateToString (CxnState state)
4707 {
4708 static char rval [64] ;
4709
4710 switch (state)
4711 {
4712 case cxnStartingS:
4713 strlcpy (rval,"cxnStartingS", sizeof(rval)) ;
4714 break ;
4715
4716 case cxnWaitingS:
4717 strlcpy (rval,"cxnWaitingS", sizeof(rval)) ;
4718 break ;
4719
4720 case cxnConnectingS:
4721 strlcpy (rval,"cxnConnectingS", sizeof(rval)) ;
4722 break ;
4723
4724 case cxnIdleS:
4725 strlcpy (rval,"cxnIdleS", sizeof(rval)) ;
4726 break ;
4727
4728 case cxnIdleTimeoutS:
4729 strlcpy (rval,"cxnIdleTimeoutS", sizeof(rval)) ;
4730 break ;
4731
4732 case cxnFeedingS:
4733 strlcpy (rval,"cxnFeedingS", sizeof(rval)) ;
4734 break ;
4735
4736 case cxnSleepingS:
4737 strlcpy (rval,"cxnSleepingS", sizeof(rval)) ;
4738 break ;
4739
4740 case cxnFlushingS:
4741 strlcpy (rval,"cxnFlushingS", sizeof(rval)) ;
4742 break ;
4743
4744 case cxnClosingS:
4745 strlcpy (rval,"cxnClosingS", sizeof(rval)) ;
4746 break ;
4747
4748 case cxnDeadS:
4749 strlcpy (rval,"cxnDeadS", sizeof(rval)) ;
4750 break ;
4751
4752 default:
4753 snprintf (rval,sizeof(rval),"UNKNOWN STATE: %d",state) ;
4754 break ;
4755 }
4756
4757 return rval ;
4758 }
4759
4760
4761
4762
4763
4764 /****************************************************************************
4765 *
4766 * Functions for managing the internal queue of Articles on each Connection.
4767 *
4768 ****************************************************************************/
4769
newArtHolder(Article article)4770 static ArtHolder newArtHolder (Article article)
4771 {
4772 ArtHolder a = xmalloc (sizeof(struct art_holder_s)) ;
4773
4774 a->article = article ;
4775 a->next = NULL ;
4776
4777 return a ;
4778 }
4779
4780
4781
4782
4783
4784 /*
4785 * Deletes the article holder
4786 */
delArtHolder(ArtHolder artH)4787 static void delArtHolder (ArtHolder artH)
4788 {
4789 if (artH != NULL)
4790 free (artH) ;
4791 }
4792
4793
4794
4795
4796
4797 /*
4798 * remove the article holder from the queue. Adjust the count and if nxtPtr
4799 * points at the element then adjust that too.
4800 */
remArtHolder(ArtHolder artH,ArtHolder * head,unsigned int * count)4801 static bool remArtHolder (ArtHolder artH, ArtHolder *head, unsigned int *count)
4802 {
4803 ArtHolder h, i ;
4804
4805 ASSERT (head != NULL) ;
4806 ASSERT (count != NULL) ;
4807
4808 h = *head ;
4809 i = NULL ;
4810 while (h != NULL && h != artH)
4811 {
4812 i = h ;
4813 h = h->next ;
4814 }
4815
4816 if (h == NULL)
4817 return false ;
4818
4819 if (i == NULL)
4820 *head = (*head)->next ;
4821 else
4822 i->next = artH->next ;
4823
4824 (*count)-- ;
4825
4826 return true ;
4827 }
4828
4829
4830
4831
4832
4833 /*
4834 * append the ArticleHolder to the queue
4835 */
appendArtHolder(ArtHolder artH,ArtHolder * head,unsigned int * count)4836 static void appendArtHolder (ArtHolder artH, ArtHolder *head, unsigned int *count)
4837 {
4838 ArtHolder p ;
4839
4840 ASSERT (head != NULL) ;
4841 ASSERT (count != NULL) ;
4842
4843 for (p = *head ; p != NULL && p->next != NULL ; p = p->next)
4844 /* nada */ ;
4845
4846 if (p == NULL)
4847 *head = artH ;
4848 else
4849 p->next = artH ;
4850
4851 artH->next = NULL ;
4852 (*count)++ ;
4853 }
4854
4855
4856
4857
4858
4859 /*
4860 * find the article holder on the queue by comparing the message-id.
4861 */
artHolderByMsgId(const char * msgid,ArtHolder head)4862 static ArtHolder artHolderByMsgId (const char *msgid, ArtHolder head)
4863 {
4864 while (head != NULL)
4865 {
4866 if (strcmp (msgid, artMsgId (head->article)) == 0)
4867 return head ;
4868
4869 head = head->next ;
4870 }
4871
4872 return NULL ;
4873 }
4874
4875
4876
/*
 * Randomize a number by roughly +/- 10%.
 *
 * The result is initVal + initVal/10 - r, where r is a pseudo-random value
 * taken from rand() modulo initVal/5 (integer arithmetic throughout).  When
 * initVal is so small that initVal/5 is zero (|initVal| < 5) there is no
 * room to vary it and initVal is returned unchanged; this also avoids a
 * remainder by zero, which is undefined behaviour.
 *
 * The random number generator is seeded from theTime() on the first call.
 */

static int fudgeFactor(int initVal)
{
    static bool seeded;
    int spread;

    if (!seeded) {
        time_t t = theTime();

        /* this may have been done already in endpoint.c. Is that a problem??? */
        srand((unsigned int) t);
        seeded = true;
    }

    spread = initVal / 5;
    if (spread == 0)
        return initVal; /* too small to randomize; never compute x % 0 */

    return initVal + (initVal / 10 - (rand() % spread));
}
4899