1 /*
2  * Copyright (C) 1996-2021 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 /* DEBUG: section 84    Helper process maintenance */
10 
11 #include "squid.h"
12 #include "base/AsyncCbdataCalls.h"
13 #include "base/Packable.h"
14 #include "comm.h"
15 #include "comm/Connection.h"
16 #include "comm/Read.h"
17 #include "comm/Write.h"
18 #include "fd.h"
19 #include "fde.h"
20 #include "format/Quoting.h"
21 #include "helper.h"
22 #include "helper/Reply.h"
23 #include "helper/Request.h"
24 #include "MemBuf.h"
25 #include "SquidConfig.h"
26 #include "SquidIpc.h"
27 #include "SquidMath.h"
28 #include "SquidTime.h"
29 #include "Store.h"
30 #include "wordlist.h"
31 
// helper_stateful_server::data uses explicit alloc()/freeOne()
33 #include "mem/Pool.h"
34 
35 #define HELPER_MAX_ARGS 64
36 
37 /// The maximum allowed request retries.
38 #define MAX_RETRIES 2
39 
40 /// Helpers input buffer size.
41 const size_t ReadBufSize(32*1024);
42 
43 static IOCB helperHandleRead;
44 static IOCB helperStatefulHandleRead;
45 static void helperServerFree(helper_server *srv);
46 static void helperStatefulServerFree(helper_stateful_server *srv);
47 static void Enqueue(helper * hlp, Helper::Xaction *);
48 static helper_server *GetFirstAvailable(const helper * hlp);
49 static helper_stateful_server *StatefulGetFirstAvailable(const statefulhelper * hlp);
50 static void helperDispatch(helper_server * srv, Helper::Xaction * r);
51 static void helperStatefulDispatch(helper_stateful_server * srv, Helper::Xaction * r);
52 static void helperKickQueue(helper * hlp);
53 static void helperStatefulKickQueue(statefulhelper * hlp);
54 static void helperStatefulServerDone(helper_stateful_server * srv);
55 static void StatefulEnqueue(statefulhelper * hlp, Helper::Xaction * r);
56 
57 CBDATA_CLASS_INIT(helper);
58 CBDATA_CLASS_INIT(helper_server);
59 CBDATA_CLASS_INIT(statefulhelper);
60 CBDATA_CLASS_INIT(helper_stateful_server);
61 
62 InstanceIdDefinitions(HelperServerBase, "Hlpr");
63 
64 void
initStats()65 HelperServerBase::initStats()
66 {
67     stats.uses=0;
68     stats.replies=0;
69     stats.pending=0;
70     stats.releases=0;
71     stats.timedout = 0;
72 }
73 
74 void
closePipesSafely(const char * id_name)75 HelperServerBase::closePipesSafely(const char *id_name)
76 {
77 #if _SQUID_WINDOWS_
78     shutdown(writePipe->fd, SD_BOTH);
79 #endif
80 
81     flags.closing = true;
82     if (readPipe->fd == writePipe->fd)
83         readPipe->fd = -1;
84     else
85         readPipe->close();
86     writePipe->close();
87 
88 #if _SQUID_WINDOWS_
89     if (hIpc) {
90         if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
91             getCurrentTime();
92             debugs(84, DBG_IMPORTANT, "WARNING: " << id_name <<
93                    " #" << index << " (PID " << (long int)pid << ") didn't exit in 5 seconds");
94         }
95         CloseHandle(hIpc);
96     }
97 #endif
98 }
99 
100 void
closeWritePipeSafely(const char * id_name)101 HelperServerBase::closeWritePipeSafely(const char *id_name)
102 {
103 #if _SQUID_WINDOWS_
104     shutdown(writePipe->fd, (readPipe->fd == writePipe->fd ? SD_BOTH : SD_SEND));
105 #endif
106 
107     flags.closing = true;
108     if (readPipe->fd == writePipe->fd)
109         readPipe->fd = -1;
110     writePipe->close();
111 
112 #if _SQUID_WINDOWS_
113     if (hIpc) {
114         if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
115             getCurrentTime();
116             debugs(84, DBG_IMPORTANT, "WARNING: " << id_name <<
117                    " #" << index << " (PID " << (long int)pid << ") didn't exit in 5 seconds");
118         }
119         CloseHandle(hIpc);
120     }
121 #endif
122 }
123 
124 void
helperOpenServers(helper * hlp)125 helperOpenServers(helper * hlp)
126 {
127     char *s;
128     char *progname;
129     char *shortname;
130     char *procname;
131     const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
132     char fd_note_buf[FD_DESC_SZ];
133     helper_server *srv;
134     int nargs = 0;
135     int k;
136     pid_t pid;
137     int rfd;
138     int wfd;
139     void * hIpc;
140     wordlist *w;
141 
142     if (hlp->cmdline == NULL)
143         return;
144 
145     progname = hlp->cmdline->key;
146 
147     if ((s = strrchr(progname, '/')))
148         shortname = xstrdup(s + 1);
149     else
150         shortname = xstrdup(progname);
151 
152     /* figure out how many new child are actually needed. */
153     int need_new = hlp->childs.needNew();
154 
155     debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
156 
157     if (need_new < 1) {
158         debugs(84, DBG_IMPORTANT, "helperOpenServers: No '" << shortname << "' processes needed.");
159     }
160 
161     procname = (char *)xmalloc(strlen(shortname) + 3);
162 
163     snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
164 
165     args[nargs] = procname;
166     ++nargs;
167 
168     for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
169         args[nargs] = w->key;
170         ++nargs;
171     }
172 
173     args[nargs] = NULL;
174     ++nargs;
175 
176     assert(nargs <= HELPER_MAX_ARGS);
177 
178     for (k = 0; k < need_new; ++k) {
179         getCurrentTime();
180         rfd = wfd = -1;
181         pid = ipcCreate(hlp->ipc_type,
182                         progname,
183                         args,
184                         shortname,
185                         hlp->addr,
186                         &rfd,
187                         &wfd,
188                         &hIpc);
189 
190         if (pid < 0) {
191             debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
192             continue;
193         }
194 
195         ++ hlp->childs.n_running;
196         ++ hlp->childs.n_active;
197         srv = new helper_server;
198         srv->hIpc = hIpc;
199         srv->pid = pid;
200         srv->initStats();
201         srv->addr = hlp->addr;
202         srv->readPipe = new Comm::Connection;
203         srv->readPipe->fd = rfd;
204         srv->writePipe = new Comm::Connection;
205         srv->writePipe->fd = wfd;
206         srv->rbuf = (char *)memAllocBuf(ReadBufSize, &srv->rbuf_sz);
207         srv->wqueue = new MemBuf;
208         srv->roffset = 0;
209         srv->nextRequestId = 0;
210         srv->replyXaction = NULL;
211         srv->ignoreToEom = false;
212         srv->parent = cbdataReference(hlp);
213         dlinkAddTail(srv, &srv->link, &hlp->servers);
214 
215         if (rfd == wfd) {
216             snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
217             fd_note(rfd, fd_note_buf);
218         } else {
219             snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
220             fd_note(rfd, fd_note_buf);
221             snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
222             fd_note(wfd, fd_note_buf);
223         }
224 
225         commSetNonBlocking(rfd);
226 
227         if (wfd != rfd)
228             commSetNonBlocking(wfd);
229 
230         AsyncCall::Pointer closeCall = asyncCall(5,4, "helperServerFree", cbdataDialer(helperServerFree, srv));
231         comm_add_close_handler(rfd, closeCall);
232 
233         if (hlp->timeout && hlp->childs.concurrency) {
234             AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "helper_server::requestTimeout",
235                                              CommTimeoutCbPtrFun(helper_server::requestTimeout, srv));
236             commSetConnTimeout(srv->readPipe, hlp->timeout, timeoutCall);
237         }
238 
239         AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
240                                              CommIoCbPtrFun(helperHandleRead, srv));
241         comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
242     }
243 
244     hlp->last_restart = squid_curtime;
245     safe_free(shortname);
246     safe_free(procname);
247     helperKickQueue(hlp);
248 }
249 
250 /**
251  * DPW 2007-05-08
252  *
253  * helperStatefulOpenServers: create the stateful child helper processes
254  */
255 void
helperStatefulOpenServers(statefulhelper * hlp)256 helperStatefulOpenServers(statefulhelper * hlp)
257 {
258     char *shortname;
259     const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
260     char fd_note_buf[FD_DESC_SZ];
261     int nargs = 0;
262 
263     if (hlp->cmdline == NULL)
264         return;
265 
266     if (hlp->childs.concurrency)
267         debugs(84, DBG_CRITICAL, "ERROR: concurrency= is not yet supported for stateful helpers ('" << hlp->cmdline << "')");
268 
269     char *progname = hlp->cmdline->key;
270 
271     char *s;
272     if ((s = strrchr(progname, '/')))
273         shortname = xstrdup(s + 1);
274     else
275         shortname = xstrdup(progname);
276 
277     /* figure out haw mant new helpers are needed. */
278     int need_new = hlp->childs.needNew();
279 
280     debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
281 
282     if (need_new < 1) {
283         debugs(84, DBG_IMPORTANT, "helperStatefulOpenServers: No '" << shortname << "' processes needed.");
284     }
285 
286     char *procname = (char *)xmalloc(strlen(shortname) + 3);
287 
288     snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
289 
290     args[nargs] = procname;
291     ++nargs;
292 
293     for (wordlist *w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
294         args[nargs] = w->key;
295         ++nargs;
296     }
297 
298     args[nargs] = NULL;
299     ++nargs;
300 
301     assert(nargs <= HELPER_MAX_ARGS);
302 
303     for (int k = 0; k < need_new; ++k) {
304         getCurrentTime();
305         int rfd = -1;
306         int wfd = -1;
307         void * hIpc;
308         pid_t pid = ipcCreate(hlp->ipc_type,
309                               progname,
310                               args,
311                               shortname,
312                               hlp->addr,
313                               &rfd,
314                               &wfd,
315                               &hIpc);
316 
317         if (pid < 0) {
318             debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
319             continue;
320         }
321 
322         ++ hlp->childs.n_running;
323         ++ hlp->childs.n_active;
324         helper_stateful_server *srv = new helper_stateful_server;
325         srv->hIpc = hIpc;
326         srv->pid = pid;
327         srv->flags.reserved = false;
328         srv->initStats();
329         srv->addr = hlp->addr;
330         srv->readPipe = new Comm::Connection;
331         srv->readPipe->fd = rfd;
332         srv->writePipe = new Comm::Connection;
333         srv->writePipe->fd = wfd;
334         srv->rbuf = (char *)memAllocBuf(ReadBufSize, &srv->rbuf_sz);
335         srv->roffset = 0;
336         srv->parent = cbdataReference(hlp);
337 
338         if (hlp->datapool != NULL)
339             srv->data = hlp->datapool->alloc();
340 
341         dlinkAddTail(srv, &srv->link, &hlp->servers);
342 
343         if (rfd == wfd) {
344             snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
345             fd_note(rfd, fd_note_buf);
346         } else {
347             snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
348             fd_note(rfd, fd_note_buf);
349             snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
350             fd_note(wfd, fd_note_buf);
351         }
352 
353         commSetNonBlocking(rfd);
354 
355         if (wfd != rfd)
356             commSetNonBlocking(wfd);
357 
358         AsyncCall::Pointer closeCall = asyncCall(5,4, "helperStatefulServerFree", cbdataDialer(helperStatefulServerFree, srv));
359         comm_add_close_handler(rfd, closeCall);
360 
361         AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
362                                              CommIoCbPtrFun(helperStatefulHandleRead, srv));
363         comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
364     }
365 
366     hlp->last_restart = squid_curtime;
367     safe_free(shortname);
368     safe_free(procname);
369     helperStatefulKickQueue(hlp);
370 }
371 
372 void
submitRequest(Helper::Xaction * r)373 helper::submitRequest(Helper::Xaction *r)
374 {
375     helper_server *srv;
376 
377     if ((srv = GetFirstAvailable(this)))
378         helperDispatch(srv, r);
379     else
380         Enqueue(this, r);
381 
382     syncQueueStats();
383 }
384 
385 /// handles helperSubmit() and helperStatefulSubmit() failures
386 static void
SubmissionFailure(helper * hlp,HLPCB * callback,void * data)387 SubmissionFailure(helper *hlp, HLPCB *callback, void *data)
388 {
389     auto result = Helper::Error;
390     if (!hlp) {
391         debugs(84, 3, "no helper");
392         result = Helper::Unknown;
393     }
394     // else pretend the helper has responded with ERR
395 
396     callback(data, Helper::Reply(result));
397 }
398 
399 void
helperSubmit(helper * hlp,const char * buf,HLPCB * callback,void * data)400 helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data)
401 {
402     if (!hlp || !hlp->trySubmit(buf, callback, data))
403         SubmissionFailure(hlp, callback, data);
404 }
405 
406 /// whether queuing an additional request would overload the helper
407 bool
queueFull() const408 helper::queueFull() const {
409     return stats.queue_size >= static_cast<int>(childs.queue_size);
410 }
411 
412 bool
overloaded() const413 helper::overloaded() const {
414     return stats.queue_size > static_cast<int>(childs.queue_size);
415 }
416 
417 /// synchronizes queue-dependent measurements with the current queue state
418 void
syncQueueStats()419 helper::syncQueueStats()
420 {
421     if (overloaded()) {
422         if (overloadStart) {
423             debugs(84, 5, id_name << " still overloaded; dropped " << droppedRequests);
424         } else {
425             overloadStart = squid_curtime;
426             debugs(84, 3, id_name << " became overloaded");
427         }
428     } else {
429         if (overloadStart) {
430             debugs(84, 5, id_name << " is no longer overloaded");
431             if (droppedRequests) {
432                 debugs(84, DBG_IMPORTANT, "helper " << id_name <<
433                        " is no longer overloaded after dropping " << droppedRequests <<
434                        " requests in " << (squid_curtime - overloadStart) << " seconds");
435                 droppedRequests = 0;
436             }
437             overloadStart = 0;
438         }
439     }
440 }
441 
442 /// prepares the helper for request submission
443 /// returns true if and only if the submission should proceed
444 /// may kill Squid if the helper remains overloaded for too long
445 bool
prepSubmit()446 helper::prepSubmit()
447 {
448     // re-sync for the configuration may have changed since the last submission
449     syncQueueStats();
450 
451     // Nothing special to do if the new request does not overload (i.e., the
452     // queue is not even full yet) or only _starts_ overloading this helper
453     // (i.e., the queue is currently at its limit).
454     if (!overloaded())
455         return true;
456 
457     if (squid_curtime - overloadStart <= 180)
458         return true; // also OK: overload has not persisted long enough to panic
459 
460     if (childs.onPersistentOverload == Helper::ChildConfig::actDie)
461         fatalf("Too many queued %s requests; see on-persistent-overload.", id_name);
462 
463     if (!droppedRequests) {
464         debugs(84, DBG_IMPORTANT, "WARNING: dropping requests to overloaded " <<
465                id_name << " helper configured with on-persistent-overload=err");
466     }
467     ++droppedRequests;
468     debugs(84, 3, "failed to send " << droppedRequests << " helper requests to " << id_name);
469     return false;
470 }
471 
472 bool
trySubmit(const char * buf,HLPCB * callback,void * data)473 helper::trySubmit(const char *buf, HLPCB * callback, void *data)
474 {
475     if (!prepSubmit())
476         return false; // request was dropped
477 
478     submit(buf, callback, data); // will send or queue
479     return true; // request submitted or queued
480 }
481 
482 /// dispatches or enqueues a helper requests; does not enforce queue limits
483 void
submit(const char * buf,HLPCB * callback,void * data)484 helper::submit(const char *buf, HLPCB * callback, void *data)
485 {
486     Helper::Xaction *r = new Helper::Xaction(callback, data, buf);
487     submitRequest(r);
488     debugs(84, DBG_DATA, Raw("buf", buf, strlen(buf)));
489 }
490 
491 /// lastserver = "server last used as part of a reserved request sequence"
492 void
helperStatefulSubmit(statefulhelper * hlp,const char * buf,HLPCB * callback,void * data,helper_stateful_server * lastserver)493 helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPCB * callback, void *data, helper_stateful_server * lastserver)
494 {
495     if (!hlp || !hlp->trySubmit(buf, callback, data, lastserver))
496         SubmissionFailure(hlp, callback, data);
497 }
498 
499 /// If possible, submit request. Otherwise, either kill Squid or return false.
500 bool
trySubmit(const char * buf,HLPCB * callback,void * data,helper_stateful_server * lastserver)501 statefulhelper::trySubmit(const char *buf, HLPCB * callback, void *data, helper_stateful_server *lastserver)
502 {
503     if (!prepSubmit())
504         return false; // request was dropped
505 
506     submit(buf, callback, data, lastserver); // will send or queue
507     return true; // request submitted or queued
508 }
509 
submit(const char * buf,HLPCB * callback,void * data,helper_stateful_server * lastserver)510 void statefulhelper::submit(const char *buf, HLPCB * callback, void *data, helper_stateful_server * lastserver)
511 {
512     Helper::Xaction *r = new Helper::Xaction(callback, data, buf);
513 
514     if ((buf != NULL) && lastserver) {
515         debugs(84, 5, "StatefulSubmit with lastserver " << lastserver);
516         assert(lastserver->flags.reserved);
517         assert(!lastserver->requests.size());
518 
519         debugs(84, 5, "StatefulSubmit dispatching");
520         helperStatefulDispatch(lastserver, r);
521     } else {
522         helper_stateful_server *srv;
523         if ((srv = StatefulGetFirstAvailable(this))) {
524             helperStatefulDispatch(srv, r);
525         } else
526             StatefulEnqueue(this, r);
527     }
528 
529     debugs(84, DBG_DATA, "placeholder: '" << r->request.placeholder <<
530            "', " << Raw("buf", buf, (!buf?0:strlen(buf))));
531 
532     syncQueueStats();
533 }
534 
535 /**
536  * DPW 2007-05-08
537  *
538  * helperStatefulReleaseServer tells the helper that whoever was
539  * using it no longer needs its services.
540  */
541 void
helperStatefulReleaseServer(helper_stateful_server * srv)542 helperStatefulReleaseServer(helper_stateful_server * srv)
543 {
544     debugs(84, 3, HERE << "srv-" << srv->index << " flags.reserved = " << srv->flags.reserved);
545     if (!srv->flags.reserved)
546         return;
547 
548     ++ srv->stats.releases;
549 
550     srv->flags.reserved = false;
551 
552     helperStatefulServerDone(srv);
553 }
554 
555 /** return a pointer to the stateful routines data area */
556 void *
helperStatefulServerGetData(helper_stateful_server * srv)557 helperStatefulServerGetData(helper_stateful_server * srv)
558 {
559     return srv->data;
560 }
561 
562 void
packStatsInto(Packable * p,const char * label) const563 helper::packStatsInto(Packable *p, const char *label) const
564 {
565     if (label)
566         p->appendf("%s:\n", label);
567 
568     p->appendf("  program: %s\n", cmdline->key);
569     p->appendf("  number active: %d of %d (%d shutting down)\n", childs.n_active, childs.n_max, (childs.n_running - childs.n_active));
570     p->appendf("  requests sent: %d\n", stats.requests);
571     p->appendf("  replies received: %d\n", stats.replies);
572     p->appendf("  requests timedout: %d\n", stats.timedout);
573     p->appendf("  queue length: %d\n", stats.queue_size);
574     p->appendf("  avg service time: %d msec\n", stats.avg_svc_time);
575     p->append("\n",1);
576     p->appendf("%7s\t%7s\t%7s\t%11s\t%11s\t%11s\t%6s\t%7s\t%7s\t%7s\n",
577                "ID #",
578                "FD",
579                "PID",
580                "# Requests",
581                "# Replies",
582                "# Timed-out",
583                "Flags",
584                "Time",
585                "Offset",
586                "Request");
587 
588     for (dlink_node *link = servers.head; link; link = link->next) {
589         HelperServerBase *srv = static_cast<HelperServerBase *>(link->data);
590         assert(srv);
591         Helper::Xaction *xaction = srv->requests.empty() ? NULL : srv->requests.front();
592         double tt = 0.001 * (xaction ? tvSubMsec(xaction->request.dispatch_time, current_time) : tvSubMsec(srv->dispatch_time, srv->answer_time));
593         p->appendf("%7u\t%7d\t%7d\t%11" PRIu64 "\t%11" PRIu64 "\t%11" PRIu64 "\t%c%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
594                    srv->index.value,
595                    srv->readPipe->fd,
596                    srv->pid,
597                    srv->stats.uses,
598                    srv->stats.replies,
599                    srv->stats.timedout,
600                    srv->stats.pending ? 'B' : ' ',
601                    srv->flags.writing ? 'W' : ' ',
602                    srv->flags.closing ? 'C' : ' ',
603                    srv->flags.reserved ? 'R' : ' ',
604                    srv->flags.shutdown ? 'S' : ' ',
605                    xaction && xaction->request.placeholder ? 'P' : ' ',
606                    tt < 0.0 ? 0.0 : tt,
607                    (int) srv->roffset,
608                    xaction ? Format::QuoteMimeBlob(xaction->request.buf) : "(none)");
609     }
610 
611     p->append("\nFlags key:\n"
612               "   B\tBUSY\n"
613               "   W\tWRITING\n"
614               "   C\tCLOSING\n"
615               "   R\tRESERVED\n"
616               "   S\tSHUTDOWN PENDING\n"
617               "   P\tPLACEHOLDER\n", 101);
618 }
619 
620 bool
willOverload() const621 helper::willOverload() const {
622     return queueFull() && !(childs.needNew() || GetFirstAvailable(this));
623 }
624 
625 void
helperShutdown(helper * hlp)626 helperShutdown(helper * hlp)
627 {
628     dlink_node *link = hlp->servers.head;
629 
630     while (link) {
631         helper_server *srv;
632         srv = (helper_server *)link->data;
633         link = link->next;
634 
635         if (srv->flags.shutdown) {
636             debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
637             continue;
638         }
639 
640         assert(hlp->childs.n_active > 0);
641         -- hlp->childs.n_active;
642         srv->flags.shutdown = true; /* request it to shut itself down */
643 
644         if (srv->flags.closing) {
645             debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
646             continue;
647         }
648 
649         if (srv->stats.pending) {
650             debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
651             continue;
652         }
653 
654         debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
655         /* the rest of the details is dealt with in the helperServerFree
656          * close handler
657          */
658         srv->closePipesSafely(hlp->id_name);
659     }
660 }
661 
662 void
helperStatefulShutdown(statefulhelper * hlp)663 helperStatefulShutdown(statefulhelper * hlp)
664 {
665     dlink_node *link = hlp->servers.head;
666     helper_stateful_server *srv;
667 
668     while (link) {
669         srv = (helper_stateful_server *)link->data;
670         link = link->next;
671 
672         if (srv->flags.shutdown) {
673             debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
674             continue;
675         }
676 
677         assert(hlp->childs.n_active > 0);
678         -- hlp->childs.n_active;
679         srv->flags.shutdown = true; /* request it to shut itself down */
680 
681         if (srv->stats.pending) {
682             debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
683             continue;
684         }
685 
686         if (srv->flags.closing) {
687             debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
688             continue;
689         }
690 
691         if (srv->flags.reserved) {
692             if (shutting_down) {
693                 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Closing anyway.");
694             } else {
695                 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Not Shutting Down Yet.");
696                 continue;
697             }
698         }
699 
700         debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
701 
702         /* the rest of the details is dealt with in the helperStatefulServerFree
703          * close handler
704          */
705         srv->closePipesSafely(hlp->id_name);
706     }
707 }
708 
~helper()709 helper::~helper()
710 {
711     /* note, don't free id_name, it probably points to static memory */
712 
713     // TODO: if the queue is not empty it will leak Helper::Request's
714     if (!queue.empty())
715         debugs(84, DBG_CRITICAL, "WARNING: freeing " << id_name << " helper with " << stats.queue_size << " requests queued");
716 }
717 
718 /* ====================================================================== */
719 /* LOCAL FUNCTIONS */
720 /* ====================================================================== */
721 
722 static void
helperServerFree(helper_server * srv)723 helperServerFree(helper_server *srv)
724 {
725     helper *hlp = srv->parent;
726     int concurrency = hlp->childs.concurrency;
727 
728     if (!concurrency)
729         concurrency = 1;
730 
731     if (srv->rbuf) {
732         memFreeBuf(srv->rbuf_sz, srv->rbuf);
733         srv->rbuf = NULL;
734     }
735 
736     srv->wqueue->clean();
737     delete srv->wqueue;
738 
739     if (srv->writebuf) {
740         srv->writebuf->clean();
741         delete srv->writebuf;
742         srv->writebuf = NULL;
743     }
744 
745     if (Comm::IsConnOpen(srv->writePipe))
746         srv->closeWritePipeSafely(hlp->id_name);
747 
748     dlinkDelete(&srv->link, &hlp->servers);
749 
750     assert(hlp->childs.n_running > 0);
751     -- hlp->childs.n_running;
752 
753     if (!srv->flags.shutdown) {
754         assert(hlp->childs.n_active > 0);
755         -- hlp->childs.n_active;
756         debugs(84, DBG_CRITICAL, "WARNING: " << hlp->id_name << " #" << srv->index << " exited");
757 
758         if (hlp->childs.needNew() > 0) {
759             debugs(80, DBG_IMPORTANT, "Too few " << hlp->id_name << " processes are running (need " << hlp->childs.needNew() << "/" << hlp->childs.n_max << ")");
760 
761             if (hlp->childs.n_active < hlp->childs.n_startup && hlp->last_restart > squid_curtime - 30) {
762                 if (srv->stats.replies < 1)
763                     fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
764                 else
765                     debugs(80, DBG_CRITICAL, "ERROR: The " << hlp->id_name << " helpers are crashing too rapidly, need help!");
766             }
767 
768             debugs(80, DBG_IMPORTANT, "Starting new helpers");
769             helperOpenServers(hlp);
770         }
771     }
772 
773     while (!srv->requests.empty()) {
774         // XXX: re-schedule these on another helper?
775         Helper::Xaction *r = srv->requests.front();
776         srv->requests.pop_front();
777         void *cbdata;
778 
779         if (cbdataReferenceValidDone(r->request.data, &cbdata)) {
780             r->reply.result = Helper::Unknown;
781             r->request.callback(cbdata, r->reply);
782         }
783 
784         delete r;
785     }
786     srv->requestsIndex.clear();
787 
788     cbdataReferenceDone(srv->parent);
789     delete srv;
790 }
791 
792 static void
helperStatefulServerFree(helper_stateful_server * srv)793 helperStatefulServerFree(helper_stateful_server *srv)
794 {
795     statefulhelper *hlp = srv->parent;
796 
797     if (srv->rbuf) {
798         memFreeBuf(srv->rbuf_sz, srv->rbuf);
799         srv->rbuf = NULL;
800     }
801 
802 #if 0
803     srv->wqueue->clean();
804 
805     delete srv->wqueue;
806 
807 #endif
808 
809     /* TODO: walk the local queue of requests and carry them all out */
810     if (Comm::IsConnOpen(srv->writePipe))
811         srv->closeWritePipeSafely(hlp->id_name);
812 
813     dlinkDelete(&srv->link, &hlp->servers);
814 
815     assert(hlp->childs.n_running > 0);
816     -- hlp->childs.n_running;
817 
818     if (!srv->flags.shutdown) {
819         assert( hlp->childs.n_active > 0);
820         -- hlp->childs.n_active;
821         debugs(84, DBG_CRITICAL, "WARNING: " << hlp->id_name << " #" << srv->index << " exited");
822 
823         if (hlp->childs.needNew() > 0) {
824             debugs(80, DBG_IMPORTANT, "Too few " << hlp->id_name << " processes are running (need " << hlp->childs.needNew() << "/" << hlp->childs.n_max << ")");
825 
826             if (hlp->childs.n_active < hlp->childs.n_startup && hlp->last_restart > squid_curtime - 30) {
827                 if (srv->stats.replies < 1)
828                     fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
829                 else
830                     debugs(80, DBG_CRITICAL, "ERROR: The " << hlp->id_name << " helpers are crashing too rapidly, need help!");
831             }
832 
833             debugs(80, DBG_IMPORTANT, "Starting new helpers");
834             helperStatefulOpenServers(hlp);
835         }
836     }
837 
838     while (!srv->requests.empty()) {
839         // XXX: re-schedule these on another helper?
840         Helper::Xaction *r = srv->requests.front();
841         srv->requests.pop_front();
842         void *cbdata;
843 
844         if (cbdataReferenceValidDone(r->request.data, &cbdata)) {
845             r->reply.result = Helper::Unknown;
846             r->request.callback(cbdata, r->reply);
847         }
848 
849         delete r;
850     }
851 
852     if (srv->data != NULL)
853         hlp->datapool->freeOne(srv->data);
854 
855     cbdataReferenceDone(srv->parent);
856 
857     delete srv;
858 }
859 
860 Helper::Xaction *
popRequest(int request_number)861 helper_server::popRequest(int request_number)
862 {
863     Helper::Xaction *r = nullptr;
864     helper_server::RequestIndex::iterator it;
865     if (parent->childs.concurrency) {
866         // If concurency supported retrieve request from ID
867         it = requestsIndex.find(request_number);
868         if (it != requestsIndex.end()) {
869             r = *(it->second);
870             requests.erase(it->second);
871             requestsIndex.erase(it);
872         }
873     } else if(!requests.empty()) {
874         // Else get the first request from queue, if any
875         r = requests.front();
876         requests.pop_front();
877     }
878 
879     return r;
880 }
881 
882 /// Calls back with a pointer to the buffer with the helper output
883 static void
helperReturnBuffer(helper_server * srv,helper * hlp,char * msg,size_t msgSize,char * msgEnd)884 helperReturnBuffer(helper_server * srv, helper * hlp, char * msg, size_t msgSize, char * msgEnd)
885 {
886     if (Helper::Xaction *r = srv->replyXaction) {
887         const bool hasSpace = r->reply.accumulate(msg, msgSize);
888         if (!hasSpace) {
889             debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
890                    "helper that overflowed " << srv->rbuf_sz << "-byte " <<
891                    "Squid input buffer: " << hlp->id_name << " #" << srv->index);
892             srv->closePipesSafely(hlp->id_name);
893             return;
894         }
895 
896         if (!msgEnd)
897             return; // We are waiting for more data.
898 
899         bool retry = false;
900         if (cbdataReferenceValid(r->request.data)) {
901             r->reply.finalize();
902             if (r->reply.result == Helper::BrokenHelper && r->request.retries < MAX_RETRIES) {
903                 debugs(84, DBG_IMPORTANT, "ERROR: helper: " << r->reply << ", attempt #" << (r->request.retries + 1) << " of 2");
904                 retry = true;
905             } else {
906                 HLPCB *callback = r->request.callback;
907                 r->request.callback = nullptr;
908                 void *cbdata = nullptr;
909                 if (cbdataReferenceValidDone(r->request.data, &cbdata))
910                     callback(cbdata, r->reply);
911             }
912         }
913 
914         -- srv->stats.pending;
915         ++ srv->stats.replies;
916 
917         ++ hlp->stats.replies;
918 
919         srv->answer_time = current_time;
920 
921         srv->dispatch_time = r->request.dispatch_time;
922 
923         hlp->stats.avg_svc_time =
924             Math::intAverage(hlp->stats.avg_svc_time,
925                              tvSubMsec(r->request.dispatch_time, current_time),
926                              hlp->stats.replies, REDIRECT_AV_FACTOR);
927 
928         // release or re-submit parsedRequestXaction object
929         srv->replyXaction = nullptr;
930         if (retry) {
931             ++r->request.retries;
932             hlp->submitRequest(r);
933         } else
934             delete r;
935     }
936 
937     if (hlp->timeout && hlp->childs.concurrency)
938         srv->checkForTimedOutRequests(hlp->retryTimedOut);
939 
940     if (!srv->flags.shutdown) {
941         helperKickQueue(hlp);
942     } else if (!srv->flags.closing && !srv->stats.pending) {
943         srv->flags.closing=true;
944         srv->writePipe->close();
945     }
946 }
947 
948 static void
helperHandleRead(const Comm::ConnectionPointer & conn,char *,size_t len,Comm::Flag flag,int,void * data)949 helperHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
950 {
951     helper_server *srv = (helper_server *)data;
952     helper *hlp = srv->parent;
953     assert(cbdataReferenceValid(data));
954 
955     /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
956 
957     if (flag == Comm::ERR_CLOSING) {
958         return;
959     }
960 
961     assert(conn->fd == srv->readPipe->fd);
962 
963     debugs(84, 5, "helperHandleRead: " << len << " bytes from " << hlp->id_name << " #" << srv->index);
964 
965     if (flag != Comm::OK || len == 0) {
966         srv->closePipesSafely(hlp->id_name);
967         return;
968     }
969 
970     srv->roffset += len;
971     srv->rbuf[srv->roffset] = '\0';
972     debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
973 
974     if (!srv->stats.pending && !srv->stats.timedout) {
975         /* someone spoke without being spoken to */
976         debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected read from " <<
977                hlp->id_name << " #" << srv->index << ", " << (int)len <<
978                " bytes '" << srv->rbuf << "'");
979 
980         srv->roffset = 0;
981         srv->rbuf[0] = '\0';
982     }
983 
984     bool needsMore = false;
985     char *msg = srv->rbuf;
986     while (*msg && !needsMore) {
987         int skip = 0;
988         char *eom = strchr(msg, hlp->eom);
989         if (eom) {
990             skip = 1;
991             debugs(84, 3, "helperHandleRead: end of reply found");
992             if (eom > msg && eom[-1] == '\r' && hlp->eom == '\n') {
993                 *eom = '\0';
994                 // rewind to the \r octet which is the real terminal now
995                 // and remember that we have to skip forward 2 places now.
996                 skip = 2;
997                 --eom;
998             }
999             *eom = '\0';
1000         }
1001 
1002         if (!srv->ignoreToEom && !srv->replyXaction) {
1003             int i = 0;
1004             if (hlp->childs.concurrency) {
1005                 char *e = NULL;
1006                 i = strtol(msg, &e, 10);
1007                 // Do we need to check for e == msg? Means wrong response from helper.
1008                 // Will be droped as "unexpected reply on channel 0"
1009                 needsMore = !(xisspace(*e) || (eom && e == eom));
1010                 if (!needsMore) {
1011                     msg = e;
1012                     while (*msg && xisspace(*msg))
1013                         ++msg;
1014                 } // else not enough data to compute request number
1015             }
1016             if (!(srv->replyXaction = srv->popRequest(i))) {
1017                 if (srv->stats.timedout) {
1018                     debugs(84, 3, "Timedout reply received for request-ID: " << i << " , ignore");
1019                 } else {
1020                     debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected reply on channel " <<
1021                            i << " from " << hlp->id_name << " #" << srv->index <<
1022                            " '" << srv->rbuf << "'");
1023                 }
1024                 srv->ignoreToEom = true;
1025             }
1026         } // else we need to just append reply data to the current Xaction
1027 
1028         if (!needsMore) {
1029             size_t msgSize  = eom ? eom - msg : (srv->roffset - (msg - srv->rbuf));
1030             assert(msgSize <= srv->rbuf_sz);
1031             helperReturnBuffer(srv, hlp, msg, msgSize, eom);
1032             msg += msgSize + skip;
1033             assert(static_cast<size_t>(msg - srv->rbuf) <= srv->rbuf_sz);
1034 
1035             // The next message should not ignored.
1036             if (eom && srv->ignoreToEom)
1037                 srv->ignoreToEom = false;
1038         } else
1039             assert(skip == 0 && eom == NULL);
1040     }
1041 
1042     if (needsMore) {
1043         size_t msgSize = (srv->roffset - (msg - srv->rbuf));
1044         assert(msgSize <= srv->rbuf_sz);
1045         memmove(srv->rbuf, msg, msgSize);
1046         srv->roffset = msgSize;
1047         srv->rbuf[srv->roffset] = '\0';
1048     } else {
1049         // All of the responses parsed and msg points at the end of read data
1050         assert(static_cast<size_t>(msg - srv->rbuf) == srv->roffset);
1051         srv->roffset = 0;
1052     }
1053 
1054     if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
1055         int spaceSize = srv->rbuf_sz - srv->roffset - 1;
1056         assert(spaceSize >= 0);
1057 
1058         AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
1059                                              CommIoCbPtrFun(helperHandleRead, srv));
1060         comm_read(srv->readPipe, srv->rbuf + srv->roffset, spaceSize, call);
1061     }
1062 }
1063 
1064 static void
helperStatefulHandleRead(const Comm::ConnectionPointer & conn,char *,size_t len,Comm::Flag flag,int,void * data)1065 helperStatefulHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
1066 {
1067     char *t = NULL;
1068     helper_stateful_server *srv = (helper_stateful_server *)data;
1069     statefulhelper *hlp = srv->parent;
1070     assert(cbdataReferenceValid(data));
1071 
1072     /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
1073 
1074     if (flag == Comm::ERR_CLOSING) {
1075         return;
1076     }
1077 
1078     assert(conn->fd == srv->readPipe->fd);
1079 
1080     debugs(84, 5, "helperStatefulHandleRead: " << len << " bytes from " <<
1081            hlp->id_name << " #" << srv->index);
1082 
1083     if (flag != Comm::OK || len == 0) {
1084         srv->closePipesSafely(hlp->id_name);
1085         return;
1086     }
1087 
1088     srv->roffset += len;
1089     srv->rbuf[srv->roffset] = '\0';
1090     Helper::Xaction *r = srv->requests.front();
1091     debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
1092 
1093     if (r == NULL) {
1094         /* someone spoke without being spoken to */
1095         debugs(84, DBG_IMPORTANT, "helperStatefulHandleRead: unexpected read from " <<
1096                hlp->id_name << " #" << srv->index << ", " << (int)len <<
1097                " bytes '" << srv->rbuf << "'");
1098 
1099         srv->roffset = 0;
1100     }
1101 
1102     if ((t = strchr(srv->rbuf, hlp->eom))) {
1103         debugs(84, 3, "helperStatefulHandleRead: end of reply found");
1104 
1105         if (t > srv->rbuf && t[-1] == '\r' && hlp->eom == '\n') {
1106             *t = '\0';
1107             // rewind to the \r octet which is the real terminal now
1108             --t;
1109         }
1110 
1111         *t = '\0';
1112     }
1113 
1114     if (r && !r->reply.accumulate(srv->rbuf, t ? (t - srv->rbuf) : srv->roffset)) {
1115         debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
1116                "helper that overflowed " << srv->rbuf_sz << "-byte " <<
1117                "Squid input buffer: " << hlp->id_name << " #" << srv->index);
1118         srv->closePipesSafely(hlp->id_name);
1119         return;
1120     }
1121     /**
1122      * BUG: the below assumes that only one response per read() was received and discards any octets remaining.
1123      *      Doing this prohibits concurrency support with multiple replies per read().
1124      * TODO: check that read() setup on these buffers pays attention to roffest!=0
1125      * TODO: check that replies bigger than the buffer are discarded and do not to affect future replies
1126      */
1127     srv->roffset = 0;
1128 
1129     if (t) {
1130         /* end of reply found */
1131         srv->requests.pop_front(); // we already have it in 'r'
1132         int called = 1;
1133 
1134         if (r && cbdataReferenceValid(r->request.data)) {
1135             r->reply.finalize();
1136             r->reply.whichServer = srv;
1137             r->request.callback(r->request.data, r->reply);
1138         } else {
1139             debugs(84, DBG_IMPORTANT, "StatefulHandleRead: no callback data registered");
1140             called = 0;
1141         }
1142 
1143         delete r;
1144 
1145         -- srv->stats.pending;
1146         ++ srv->stats.replies;
1147 
1148         ++ hlp->stats.replies;
1149         srv->answer_time = current_time;
1150         hlp->stats.avg_svc_time =
1151             Math::intAverage(hlp->stats.avg_svc_time,
1152                              tvSubMsec(srv->dispatch_time, current_time),
1153                              hlp->stats.replies, REDIRECT_AV_FACTOR);
1154 
1155         if (called)
1156             helperStatefulServerDone(srv);
1157         else
1158             helperStatefulReleaseServer(srv);
1159     }
1160 
1161     if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
1162         int spaceSize = srv->rbuf_sz - 1;
1163 
1164         AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
1165                                              CommIoCbPtrFun(helperStatefulHandleRead, srv));
1166         comm_read(srv->readPipe, srv->rbuf, spaceSize, call);
1167     }
1168 }
1169 
1170 /// Handles a request when all running helpers, if any, are busy.
1171 static void
Enqueue(helper * hlp,Helper::Xaction * r)1172 Enqueue(helper * hlp, Helper::Xaction * r)
1173 {
1174     hlp->queue.push(r);
1175     ++ hlp->stats.queue_size;
1176 
1177     /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1178     if (hlp->childs.needNew() > 0) {
1179         debugs(84, DBG_CRITICAL, "Starting new " << hlp->id_name << " helpers...");
1180         helperOpenServers(hlp);
1181         return;
1182     }
1183 
1184     if (hlp->stats.queue_size < (int)hlp->childs.queue_size)
1185         return;
1186 
1187     if (squid_curtime - hlp->last_queue_warn < 600)
1188         return;
1189 
1190     if (shutting_down || reconfiguring)
1191         return;
1192 
1193     hlp->last_queue_warn = squid_curtime;
1194 
1195     debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1196     debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1197     debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1198 }
1199 
1200 static void
StatefulEnqueue(statefulhelper * hlp,Helper::Xaction * r)1201 StatefulEnqueue(statefulhelper * hlp, Helper::Xaction * r)
1202 {
1203     hlp->queue.push(r);
1204     ++ hlp->stats.queue_size;
1205 
1206     /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1207     if (hlp->childs.needNew() > 0) {
1208         debugs(84, DBG_CRITICAL, "Starting new " << hlp->id_name << " helpers...");
1209         helperStatefulOpenServers(hlp);
1210         return;
1211     }
1212 
1213     if (hlp->stats.queue_size < (int)hlp->childs.queue_size)
1214         return;
1215 
1216     if (squid_curtime - hlp->last_queue_warn < 600)
1217         return;
1218 
1219     if (shutting_down || reconfiguring)
1220         return;
1221 
1222     hlp->last_queue_warn = squid_curtime;
1223 
1224     debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1225     debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1226     debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1227 }
1228 
1229 Helper::Xaction *
nextRequest()1230 helper::nextRequest()
1231 {
1232     if (queue.empty())
1233         return nullptr;
1234 
1235     auto *r = queue.front();
1236     queue.pop();
1237     --stats.queue_size;
1238     return r;
1239 }
1240 
1241 static helper_server *
GetFirstAvailable(const helper * hlp)1242 GetFirstAvailable(const helper * hlp)
1243 {
1244     dlink_node *n;
1245     helper_server *srv;
1246     helper_server *selected = NULL;
1247     debugs(84, 5, "GetFirstAvailable: Running servers " << hlp->childs.n_running);
1248 
1249     if (hlp->childs.n_running == 0)
1250         return NULL;
1251 
1252     /* Find "least" loaded helper (approx) */
1253     for (n = hlp->servers.head; n != NULL; n = n->next) {
1254         srv = (helper_server *)n->data;
1255 
1256         if (selected && selected->stats.pending <= srv->stats.pending)
1257             continue;
1258 
1259         if (srv->flags.shutdown)
1260             continue;
1261 
1262         if (!srv->stats.pending)
1263             return srv;
1264 
1265         if (selected) {
1266             selected = srv;
1267             break;
1268         }
1269 
1270         selected = srv;
1271     }
1272 
1273     if (!selected) {
1274         debugs(84, 5, "GetFirstAvailable: None available.");
1275         return NULL;
1276     }
1277 
1278     if (selected->stats.pending >= (hlp->childs.concurrency ? hlp->childs.concurrency : 1)) {
1279         debugs(84, 3, "GetFirstAvailable: Least-loaded helper is fully loaded!");
1280         return NULL;
1281     }
1282 
1283     debugs(84, 5, "GetFirstAvailable: returning srv-" << selected->index);
1284     return selected;
1285 }
1286 
1287 static helper_stateful_server *
StatefulGetFirstAvailable(const statefulhelper * hlp)1288 StatefulGetFirstAvailable(const statefulhelper * hlp)
1289 {
1290     dlink_node *n;
1291     helper_stateful_server *srv = NULL;
1292     debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp->childs.n_running);
1293 
1294     if (hlp->childs.n_running == 0)
1295         return NULL;
1296 
1297     for (n = hlp->servers.head; n != NULL; n = n->next) {
1298         srv = (helper_stateful_server *)n->data;
1299 
1300         if (srv->stats.pending)
1301             continue;
1302 
1303         if (srv->flags.reserved)
1304             continue;
1305 
1306         if (srv->flags.shutdown)
1307             continue;
1308 
1309         debugs(84, 5, "StatefulGetFirstAvailable: returning srv-" << srv->index);
1310         return srv;
1311     }
1312 
1313     debugs(84, 5, "StatefulGetFirstAvailable: None available.");
1314     return NULL;
1315 }
1316 
1317 static void
helperDispatchWriteDone(const Comm::ConnectionPointer &,char *,size_t,Comm::Flag flag,int,void * data)1318 helperDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag flag, int, void *data)
1319 {
1320     helper_server *srv = (helper_server *)data;
1321 
1322     srv->writebuf->clean();
1323     delete srv->writebuf;
1324     srv->writebuf = NULL;
1325     srv->flags.writing = false;
1326 
1327     if (flag != Comm::OK) {
1328         /* Helper server has crashed */
1329         debugs(84, DBG_CRITICAL, "helperDispatch: Helper " << srv->parent->id_name << " #" << srv->index << " has crashed");
1330         return;
1331     }
1332 
1333     if (!srv->wqueue->isNull()) {
1334         srv->writebuf = srv->wqueue;
1335         srv->wqueue = new MemBuf;
1336         srv->flags.writing = true;
1337         AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1338                                              CommIoCbPtrFun(helperDispatchWriteDone, srv));
1339         Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
1340     }
1341 }
1342 
1343 static void
helperDispatch(helper_server * srv,Helper::Xaction * r)1344 helperDispatch(helper_server * srv, Helper::Xaction * r)
1345 {
1346     helper *hlp = srv->parent;
1347     const uint64_t reqId = ++srv->nextRequestId;
1348 
1349     if (!cbdataReferenceValid(r->request.data)) {
1350         debugs(84, DBG_IMPORTANT, "helperDispatch: invalid callback data");
1351         delete r;
1352         return;
1353     }
1354 
1355     r->request.Id = reqId;
1356     helper_server::Requests::iterator it = srv->requests.insert(srv->requests.end(), r);
1357     r->request.dispatch_time = current_time;
1358 
1359     if (srv->wqueue->isNull())
1360         srv->wqueue->init();
1361 
1362     if (hlp->childs.concurrency) {
1363         srv->requestsIndex.insert(helper_server::RequestIndex::value_type(reqId, it));
1364         assert(srv->requestsIndex.size() == srv->requests.size());
1365         srv->wqueue->appendf("%" PRIu64 " %s", reqId, r->request.buf);
1366     } else
1367         srv->wqueue->append(r->request.buf, strlen(r->request.buf));
1368 
1369     if (!srv->flags.writing) {
1370         assert(NULL == srv->writebuf);
1371         srv->writebuf = srv->wqueue;
1372         srv->wqueue = new MemBuf;
1373         srv->flags.writing = true;
1374         AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1375                                              CommIoCbPtrFun(helperDispatchWriteDone, srv));
1376         Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
1377     }
1378 
1379     debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index << ", " << strlen(r->request.buf) << " bytes");
1380 
1381     ++ srv->stats.uses;
1382     ++ srv->stats.pending;
1383     ++ hlp->stats.requests;
1384 }
1385 
1386 static void
helperStatefulDispatchWriteDone(const Comm::ConnectionPointer &,char *,size_t,Comm::Flag,int,void *)1387 helperStatefulDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag, int, void *)
1388 {}
1389 
1390 static void
helperStatefulDispatch(helper_stateful_server * srv,Helper::Xaction * r)1391 helperStatefulDispatch(helper_stateful_server * srv, Helper::Xaction * r)
1392 {
1393     statefulhelper *hlp = srv->parent;
1394 
1395     if (!cbdataReferenceValid(r->request.data)) {
1396         debugs(84, DBG_IMPORTANT, "helperStatefulDispatch: invalid callback data");
1397         delete r;
1398         helperStatefulReleaseServer(srv);
1399         return;
1400     }
1401 
1402     debugs(84, 9, "helperStatefulDispatch busying helper " << hlp->id_name << " #" << srv->index);
1403 
1404     if (r->request.placeholder == 1) {
1405         /* a callback is needed before this request can _use_ a helper. */
1406         /* we don't care about releasing this helper. The request NEVER
1407          * gets to the helper. So we throw away the return code */
1408         r->reply.result = Helper::Unknown;
1409         r->reply.whichServer = srv;
1410         r->request.callback(r->request.data, r->reply);
1411         /* throw away the placeholder */
1412         delete r;
1413         /* and push the queue. Note that the callback may have submitted a new
1414          * request to the helper which is why we test for the request */
1415 
1416         if (!srv->requests.size())
1417             helperStatefulServerDone(srv);
1418 
1419         return;
1420     }
1421 
1422     srv->flags.reserved = true;
1423     srv->requests.push_back(r);
1424     srv->dispatch_time = current_time;
1425     AsyncCall::Pointer call = commCbCall(5,5, "helperStatefulDispatchWriteDone",
1426                                          CommIoCbPtrFun(helperStatefulDispatchWriteDone, hlp));
1427     Comm::Write(srv->writePipe, r->request.buf, strlen(r->request.buf), call, NULL);
1428     debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
1429            hlp->id_name << " #" << srv->index << ", " <<
1430            (int) strlen(r->request.buf) << " bytes");
1431 
1432     ++ srv->stats.uses;
1433     ++ srv->stats.pending;
1434     ++ hlp->stats.requests;
1435 }
1436 
1437 static void
helperKickQueue(helper * hlp)1438 helperKickQueue(helper * hlp)
1439 {
1440     Helper::Xaction *r;
1441     helper_server *srv;
1442 
1443     while ((srv = GetFirstAvailable(hlp)) && (r = hlp->nextRequest()))
1444         helperDispatch(srv, r);
1445 }
1446 
1447 static void
helperStatefulKickQueue(statefulhelper * hlp)1448 helperStatefulKickQueue(statefulhelper * hlp)
1449 {
1450     Helper::Xaction *r;
1451     helper_stateful_server *srv;
1452 
1453     while ((srv = StatefulGetFirstAvailable(hlp)) && (r = hlp->nextRequest()))
1454         helperStatefulDispatch(srv, r);
1455 }
1456 
1457 static void
helperStatefulServerDone(helper_stateful_server * srv)1458 helperStatefulServerDone(helper_stateful_server * srv)
1459 {
1460     if (!srv->flags.shutdown) {
1461         helperStatefulKickQueue(srv->parent);
1462     } else if (!srv->flags.closing && !srv->flags.reserved && !srv->stats.pending) {
1463         srv->closeWritePipeSafely(srv->parent->id_name);
1464         return;
1465     }
1466 }
1467 
1468 void
checkForTimedOutRequests(bool const retry)1469 helper_server::checkForTimedOutRequests(bool const retry)
1470 {
1471     assert(parent->childs.concurrency);
1472     while(!requests.empty() && requests.front()->request.timedOut(parent->timeout)) {
1473         Helper::Xaction *r = requests.front();
1474         RequestIndex::iterator it;
1475         it = requestsIndex.find(r->request.Id);
1476         assert(it != requestsIndex.end());
1477         requestsIndex.erase(it);
1478         requests.pop_front();
1479         debugs(84, 2, "Request " << r->request.Id << " timed-out, remove it from queue");
1480         void *cbdata;
1481         bool retried = false;
1482         if (retry && r->request.retries < MAX_RETRIES && cbdataReferenceValid(r->request.data)) {
1483             debugs(84, 2, "Retry request " << r->request.Id);
1484             ++r->request.retries;
1485             parent->submitRequest(r);
1486             retried = true;
1487         } else if (cbdataReferenceValidDone(r->request.data, &cbdata)) {
1488             if (!parent->onTimedOutResponse.isEmpty()) {
1489                 if (r->reply.accumulate(parent->onTimedOutResponse.rawContent(), parent->onTimedOutResponse.length()))
1490                     r->reply.finalize();
1491                 else
1492                     r->reply.result = Helper::TimedOut;
1493                 r->request.callback(cbdata, r->reply);
1494             } else {
1495                 r->reply.result = Helper::TimedOut;
1496                 r->request.callback(cbdata, r->reply);
1497             }
1498         }
1499         --stats.pending;
1500         ++stats.timedout;
1501         ++parent->stats.timedout;
1502         if (!retried)
1503             delete r;
1504     }
1505 }
1506 
1507 void
requestTimeout(const CommTimeoutCbParams & io)1508 helper_server::requestTimeout(const CommTimeoutCbParams &io)
1509 {
1510     debugs(26, 3, HERE << io.conn);
1511     helper_server *srv = static_cast<helper_server *>(io.data);
1512 
1513     if (!cbdataReferenceValid(srv))
1514         return;
1515 
1516     srv->checkForTimedOutRequests(srv->parent->retryTimedOut);
1517 
1518     debugs(84, 3, HERE << io.conn << " establish new helper_server::requestTimeout");
1519     AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "helper_server::requestTimeout",
1520                                      CommTimeoutCbPtrFun(helper_server::requestTimeout, srv));
1521 
1522     const int timeSpent = srv->requests.empty() ? 0 : (squid_curtime - srv->requests.front()->request.dispatch_time.tv_sec);
1523     const int timeLeft = max(1, (static_cast<int>(srv->parent->timeout) - timeSpent));
1524 
1525     commSetConnTimeout(io.conn, timeLeft, timeoutCall);
1526 }
1527 
1528