1 /*
2 * Copyright (C) 1996-2021 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 /* DEBUG: section 84 Helper process maintenance */
10
11 #include "squid.h"
12 #include "base/AsyncCbdataCalls.h"
13 #include "base/Packable.h"
14 #include "comm.h"
15 #include "comm/Connection.h"
16 #include "comm/Read.h"
17 #include "comm/Write.h"
18 #include "fd.h"
19 #include "fde.h"
20 #include "format/Quoting.h"
21 #include "helper.h"
22 #include "helper/Reply.h"
23 #include "helper/Request.h"
24 #include "MemBuf.h"
25 #include "SquidConfig.h"
26 #include "SquidIpc.h"
27 #include "SquidMath.h"
28 #include "SquidTime.h"
29 #include "Store.h"
30 #include "wordlist.h"
31
// helper_stateful_server::data uses explicit alloc()/freeOne()
33 #include "mem/Pool.h"
34
/// Number of argv slots (not counting the extra slot reserved for the
/// terminating NULL) in the argument arrays built for helper processes.
#define HELPER_MAX_ARGS 64

/// The maximum allowed request retries.
#define MAX_RETRIES 2

/// Helpers input buffer size.
const size_t ReadBufSize(32*1024);

// Forward declarations of file-local (static) functions.
// The two IOCB functions are the comm_read() completion callbacks for
// stateless and stateful helper servers respectively.
static IOCB helperHandleRead;
static IOCB helperStatefulHandleRead;
// close handlers registered on each server's read descriptor
static void helperServerFree(helper_server *srv);
static void helperStatefulServerFree(helper_stateful_server *srv);
static void Enqueue(helper * hlp, Helper::Xaction *);
static helper_server *GetFirstAvailable(const helper * hlp);
static helper_stateful_server *StatefulGetFirstAvailable(const statefulhelper * hlp);
static void helperDispatch(helper_server * srv, Helper::Xaction * r);
static void helperStatefulDispatch(helper_stateful_server * srv, Helper::Xaction * r);
static void helperKickQueue(helper * hlp);
static void helperStatefulKickQueue(statefulhelper * hlp);
static void helperStatefulServerDone(helper_stateful_server * srv);
static void StatefulEnqueue(statefulhelper * hlp, Helper::Xaction * r);

// cbdata registration for the classes used as callback data in this file
CBDATA_CLASS_INIT(helper);
CBDATA_CLASS_INIT(helper_server);
CBDATA_CLASS_INIT(statefulhelper);
CBDATA_CLASS_INIT(helper_stateful_server);

InstanceIdDefinitions(HelperServerBase, "Hlpr");
63
64 void
initStats()65 HelperServerBase::initStats()
66 {
67 stats.uses=0;
68 stats.replies=0;
69 stats.pending=0;
70 stats.releases=0;
71 stats.timedout = 0;
72 }
73
74 void
closePipesSafely(const char * id_name)75 HelperServerBase::closePipesSafely(const char *id_name)
76 {
77 #if _SQUID_WINDOWS_
78 shutdown(writePipe->fd, SD_BOTH);
79 #endif
80
81 flags.closing = true;
82 if (readPipe->fd == writePipe->fd)
83 readPipe->fd = -1;
84 else
85 readPipe->close();
86 writePipe->close();
87
88 #if _SQUID_WINDOWS_
89 if (hIpc) {
90 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
91 getCurrentTime();
92 debugs(84, DBG_IMPORTANT, "WARNING: " << id_name <<
93 " #" << index << " (PID " << (long int)pid << ") didn't exit in 5 seconds");
94 }
95 CloseHandle(hIpc);
96 }
97 #endif
98 }
99
100 void
closeWritePipeSafely(const char * id_name)101 HelperServerBase::closeWritePipeSafely(const char *id_name)
102 {
103 #if _SQUID_WINDOWS_
104 shutdown(writePipe->fd, (readPipe->fd == writePipe->fd ? SD_BOTH : SD_SEND));
105 #endif
106
107 flags.closing = true;
108 if (readPipe->fd == writePipe->fd)
109 readPipe->fd = -1;
110 writePipe->close();
111
112 #if _SQUID_WINDOWS_
113 if (hIpc) {
114 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
115 getCurrentTime();
116 debugs(84, DBG_IMPORTANT, "WARNING: " << id_name <<
117 " #" << index << " (PID " << (long int)pid << ") didn't exit in 5 seconds");
118 }
119 CloseHandle(hIpc);
120 }
121 #endif
122 }
123
124 void
helperOpenServers(helper * hlp)125 helperOpenServers(helper * hlp)
126 {
127 char *s;
128 char *progname;
129 char *shortname;
130 char *procname;
131 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
132 char fd_note_buf[FD_DESC_SZ];
133 helper_server *srv;
134 int nargs = 0;
135 int k;
136 pid_t pid;
137 int rfd;
138 int wfd;
139 void * hIpc;
140 wordlist *w;
141
142 if (hlp->cmdline == NULL)
143 return;
144
145 progname = hlp->cmdline->key;
146
147 if ((s = strrchr(progname, '/')))
148 shortname = xstrdup(s + 1);
149 else
150 shortname = xstrdup(progname);
151
152 /* figure out how many new child are actually needed. */
153 int need_new = hlp->childs.needNew();
154
155 debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
156
157 if (need_new < 1) {
158 debugs(84, DBG_IMPORTANT, "helperOpenServers: No '" << shortname << "' processes needed.");
159 }
160
161 procname = (char *)xmalloc(strlen(shortname) + 3);
162
163 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
164
165 args[nargs] = procname;
166 ++nargs;
167
168 for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
169 args[nargs] = w->key;
170 ++nargs;
171 }
172
173 args[nargs] = NULL;
174 ++nargs;
175
176 assert(nargs <= HELPER_MAX_ARGS);
177
178 for (k = 0; k < need_new; ++k) {
179 getCurrentTime();
180 rfd = wfd = -1;
181 pid = ipcCreate(hlp->ipc_type,
182 progname,
183 args,
184 shortname,
185 hlp->addr,
186 &rfd,
187 &wfd,
188 &hIpc);
189
190 if (pid < 0) {
191 debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
192 continue;
193 }
194
195 ++ hlp->childs.n_running;
196 ++ hlp->childs.n_active;
197 srv = new helper_server;
198 srv->hIpc = hIpc;
199 srv->pid = pid;
200 srv->initStats();
201 srv->addr = hlp->addr;
202 srv->readPipe = new Comm::Connection;
203 srv->readPipe->fd = rfd;
204 srv->writePipe = new Comm::Connection;
205 srv->writePipe->fd = wfd;
206 srv->rbuf = (char *)memAllocBuf(ReadBufSize, &srv->rbuf_sz);
207 srv->wqueue = new MemBuf;
208 srv->roffset = 0;
209 srv->nextRequestId = 0;
210 srv->replyXaction = NULL;
211 srv->ignoreToEom = false;
212 srv->parent = cbdataReference(hlp);
213 dlinkAddTail(srv, &srv->link, &hlp->servers);
214
215 if (rfd == wfd) {
216 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
217 fd_note(rfd, fd_note_buf);
218 } else {
219 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
220 fd_note(rfd, fd_note_buf);
221 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
222 fd_note(wfd, fd_note_buf);
223 }
224
225 commSetNonBlocking(rfd);
226
227 if (wfd != rfd)
228 commSetNonBlocking(wfd);
229
230 AsyncCall::Pointer closeCall = asyncCall(5,4, "helperServerFree", cbdataDialer(helperServerFree, srv));
231 comm_add_close_handler(rfd, closeCall);
232
233 if (hlp->timeout && hlp->childs.concurrency) {
234 AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "helper_server::requestTimeout",
235 CommTimeoutCbPtrFun(helper_server::requestTimeout, srv));
236 commSetConnTimeout(srv->readPipe, hlp->timeout, timeoutCall);
237 }
238
239 AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
240 CommIoCbPtrFun(helperHandleRead, srv));
241 comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
242 }
243
244 hlp->last_restart = squid_curtime;
245 safe_free(shortname);
246 safe_free(procname);
247 helperKickQueue(hlp);
248 }
249
250 /**
251 * DPW 2007-05-08
252 *
253 * helperStatefulOpenServers: create the stateful child helper processes
254 */
255 void
helperStatefulOpenServers(statefulhelper * hlp)256 helperStatefulOpenServers(statefulhelper * hlp)
257 {
258 char *shortname;
259 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
260 char fd_note_buf[FD_DESC_SZ];
261 int nargs = 0;
262
263 if (hlp->cmdline == NULL)
264 return;
265
266 if (hlp->childs.concurrency)
267 debugs(84, DBG_CRITICAL, "ERROR: concurrency= is not yet supported for stateful helpers ('" << hlp->cmdline << "')");
268
269 char *progname = hlp->cmdline->key;
270
271 char *s;
272 if ((s = strrchr(progname, '/')))
273 shortname = xstrdup(s + 1);
274 else
275 shortname = xstrdup(progname);
276
277 /* figure out haw mant new helpers are needed. */
278 int need_new = hlp->childs.needNew();
279
280 debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
281
282 if (need_new < 1) {
283 debugs(84, DBG_IMPORTANT, "helperStatefulOpenServers: No '" << shortname << "' processes needed.");
284 }
285
286 char *procname = (char *)xmalloc(strlen(shortname) + 3);
287
288 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
289
290 args[nargs] = procname;
291 ++nargs;
292
293 for (wordlist *w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
294 args[nargs] = w->key;
295 ++nargs;
296 }
297
298 args[nargs] = NULL;
299 ++nargs;
300
301 assert(nargs <= HELPER_MAX_ARGS);
302
303 for (int k = 0; k < need_new; ++k) {
304 getCurrentTime();
305 int rfd = -1;
306 int wfd = -1;
307 void * hIpc;
308 pid_t pid = ipcCreate(hlp->ipc_type,
309 progname,
310 args,
311 shortname,
312 hlp->addr,
313 &rfd,
314 &wfd,
315 &hIpc);
316
317 if (pid < 0) {
318 debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
319 continue;
320 }
321
322 ++ hlp->childs.n_running;
323 ++ hlp->childs.n_active;
324 helper_stateful_server *srv = new helper_stateful_server;
325 srv->hIpc = hIpc;
326 srv->pid = pid;
327 srv->flags.reserved = false;
328 srv->initStats();
329 srv->addr = hlp->addr;
330 srv->readPipe = new Comm::Connection;
331 srv->readPipe->fd = rfd;
332 srv->writePipe = new Comm::Connection;
333 srv->writePipe->fd = wfd;
334 srv->rbuf = (char *)memAllocBuf(ReadBufSize, &srv->rbuf_sz);
335 srv->roffset = 0;
336 srv->parent = cbdataReference(hlp);
337
338 if (hlp->datapool != NULL)
339 srv->data = hlp->datapool->alloc();
340
341 dlinkAddTail(srv, &srv->link, &hlp->servers);
342
343 if (rfd == wfd) {
344 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
345 fd_note(rfd, fd_note_buf);
346 } else {
347 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
348 fd_note(rfd, fd_note_buf);
349 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
350 fd_note(wfd, fd_note_buf);
351 }
352
353 commSetNonBlocking(rfd);
354
355 if (wfd != rfd)
356 commSetNonBlocking(wfd);
357
358 AsyncCall::Pointer closeCall = asyncCall(5,4, "helperStatefulServerFree", cbdataDialer(helperStatefulServerFree, srv));
359 comm_add_close_handler(rfd, closeCall);
360
361 AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
362 CommIoCbPtrFun(helperStatefulHandleRead, srv));
363 comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
364 }
365
366 hlp->last_restart = squid_curtime;
367 safe_free(shortname);
368 safe_free(procname);
369 helperStatefulKickQueue(hlp);
370 }
371
372 void
submitRequest(Helper::Xaction * r)373 helper::submitRequest(Helper::Xaction *r)
374 {
375 helper_server *srv;
376
377 if ((srv = GetFirstAvailable(this)))
378 helperDispatch(srv, r);
379 else
380 Enqueue(this, r);
381
382 syncQueueStats();
383 }
384
385 /// handles helperSubmit() and helperStatefulSubmit() failures
386 static void
SubmissionFailure(helper * hlp,HLPCB * callback,void * data)387 SubmissionFailure(helper *hlp, HLPCB *callback, void *data)
388 {
389 auto result = Helper::Error;
390 if (!hlp) {
391 debugs(84, 3, "no helper");
392 result = Helper::Unknown;
393 }
394 // else pretend the helper has responded with ERR
395
396 callback(data, Helper::Reply(result));
397 }
398
399 void
helperSubmit(helper * hlp,const char * buf,HLPCB * callback,void * data)400 helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data)
401 {
402 if (!hlp || !hlp->trySubmit(buf, callback, data))
403 SubmissionFailure(hlp, callback, data);
404 }
405
406 /// whether queuing an additional request would overload the helper
407 bool
queueFull() const408 helper::queueFull() const {
409 return stats.queue_size >= static_cast<int>(childs.queue_size);
410 }
411
412 bool
overloaded() const413 helper::overloaded() const {
414 return stats.queue_size > static_cast<int>(childs.queue_size);
415 }
416
417 /// synchronizes queue-dependent measurements with the current queue state
418 void
syncQueueStats()419 helper::syncQueueStats()
420 {
421 if (overloaded()) {
422 if (overloadStart) {
423 debugs(84, 5, id_name << " still overloaded; dropped " << droppedRequests);
424 } else {
425 overloadStart = squid_curtime;
426 debugs(84, 3, id_name << " became overloaded");
427 }
428 } else {
429 if (overloadStart) {
430 debugs(84, 5, id_name << " is no longer overloaded");
431 if (droppedRequests) {
432 debugs(84, DBG_IMPORTANT, "helper " << id_name <<
433 " is no longer overloaded after dropping " << droppedRequests <<
434 " requests in " << (squid_curtime - overloadStart) << " seconds");
435 droppedRequests = 0;
436 }
437 overloadStart = 0;
438 }
439 }
440 }
441
442 /// prepares the helper for request submission
443 /// returns true if and only if the submission should proceed
444 /// may kill Squid if the helper remains overloaded for too long
445 bool
prepSubmit()446 helper::prepSubmit()
447 {
448 // re-sync for the configuration may have changed since the last submission
449 syncQueueStats();
450
451 // Nothing special to do if the new request does not overload (i.e., the
452 // queue is not even full yet) or only _starts_ overloading this helper
453 // (i.e., the queue is currently at its limit).
454 if (!overloaded())
455 return true;
456
457 if (squid_curtime - overloadStart <= 180)
458 return true; // also OK: overload has not persisted long enough to panic
459
460 if (childs.onPersistentOverload == Helper::ChildConfig::actDie)
461 fatalf("Too many queued %s requests; see on-persistent-overload.", id_name);
462
463 if (!droppedRequests) {
464 debugs(84, DBG_IMPORTANT, "WARNING: dropping requests to overloaded " <<
465 id_name << " helper configured with on-persistent-overload=err");
466 }
467 ++droppedRequests;
468 debugs(84, 3, "failed to send " << droppedRequests << " helper requests to " << id_name);
469 return false;
470 }
471
472 bool
trySubmit(const char * buf,HLPCB * callback,void * data)473 helper::trySubmit(const char *buf, HLPCB * callback, void *data)
474 {
475 if (!prepSubmit())
476 return false; // request was dropped
477
478 submit(buf, callback, data); // will send or queue
479 return true; // request submitted or queued
480 }
481
482 /// dispatches or enqueues a helper requests; does not enforce queue limits
483 void
submit(const char * buf,HLPCB * callback,void * data)484 helper::submit(const char *buf, HLPCB * callback, void *data)
485 {
486 Helper::Xaction *r = new Helper::Xaction(callback, data, buf);
487 submitRequest(r);
488 debugs(84, DBG_DATA, Raw("buf", buf, strlen(buf)));
489 }
490
491 /// lastserver = "server last used as part of a reserved request sequence"
492 void
helperStatefulSubmit(statefulhelper * hlp,const char * buf,HLPCB * callback,void * data,helper_stateful_server * lastserver)493 helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPCB * callback, void *data, helper_stateful_server * lastserver)
494 {
495 if (!hlp || !hlp->trySubmit(buf, callback, data, lastserver))
496 SubmissionFailure(hlp, callback, data);
497 }
498
499 /// If possible, submit request. Otherwise, either kill Squid or return false.
500 bool
trySubmit(const char * buf,HLPCB * callback,void * data,helper_stateful_server * lastserver)501 statefulhelper::trySubmit(const char *buf, HLPCB * callback, void *data, helper_stateful_server *lastserver)
502 {
503 if (!prepSubmit())
504 return false; // request was dropped
505
506 submit(buf, callback, data, lastserver); // will send or queue
507 return true; // request submitted or queued
508 }
509
submit(const char * buf,HLPCB * callback,void * data,helper_stateful_server * lastserver)510 void statefulhelper::submit(const char *buf, HLPCB * callback, void *data, helper_stateful_server * lastserver)
511 {
512 Helper::Xaction *r = new Helper::Xaction(callback, data, buf);
513
514 if ((buf != NULL) && lastserver) {
515 debugs(84, 5, "StatefulSubmit with lastserver " << lastserver);
516 assert(lastserver->flags.reserved);
517 assert(!lastserver->requests.size());
518
519 debugs(84, 5, "StatefulSubmit dispatching");
520 helperStatefulDispatch(lastserver, r);
521 } else {
522 helper_stateful_server *srv;
523 if ((srv = StatefulGetFirstAvailable(this))) {
524 helperStatefulDispatch(srv, r);
525 } else
526 StatefulEnqueue(this, r);
527 }
528
529 debugs(84, DBG_DATA, "placeholder: '" << r->request.placeholder <<
530 "', " << Raw("buf", buf, (!buf?0:strlen(buf))));
531
532 syncQueueStats();
533 }
534
535 /**
536 * DPW 2007-05-08
537 *
538 * helperStatefulReleaseServer tells the helper that whoever was
539 * using it no longer needs its services.
540 */
541 void
helperStatefulReleaseServer(helper_stateful_server * srv)542 helperStatefulReleaseServer(helper_stateful_server * srv)
543 {
544 debugs(84, 3, HERE << "srv-" << srv->index << " flags.reserved = " << srv->flags.reserved);
545 if (!srv->flags.reserved)
546 return;
547
548 ++ srv->stats.releases;
549
550 srv->flags.reserved = false;
551
552 helperStatefulServerDone(srv);
553 }
554
555 /** return a pointer to the stateful routines data area */
556 void *
helperStatefulServerGetData(helper_stateful_server * srv)557 helperStatefulServerGetData(helper_stateful_server * srv)
558 {
559 return srv->data;
560 }
561
562 void
packStatsInto(Packable * p,const char * label) const563 helper::packStatsInto(Packable *p, const char *label) const
564 {
565 if (label)
566 p->appendf("%s:\n", label);
567
568 p->appendf(" program: %s\n", cmdline->key);
569 p->appendf(" number active: %d of %d (%d shutting down)\n", childs.n_active, childs.n_max, (childs.n_running - childs.n_active));
570 p->appendf(" requests sent: %d\n", stats.requests);
571 p->appendf(" replies received: %d\n", stats.replies);
572 p->appendf(" requests timedout: %d\n", stats.timedout);
573 p->appendf(" queue length: %d\n", stats.queue_size);
574 p->appendf(" avg service time: %d msec\n", stats.avg_svc_time);
575 p->append("\n",1);
576 p->appendf("%7s\t%7s\t%7s\t%11s\t%11s\t%11s\t%6s\t%7s\t%7s\t%7s\n",
577 "ID #",
578 "FD",
579 "PID",
580 "# Requests",
581 "# Replies",
582 "# Timed-out",
583 "Flags",
584 "Time",
585 "Offset",
586 "Request");
587
588 for (dlink_node *link = servers.head; link; link = link->next) {
589 HelperServerBase *srv = static_cast<HelperServerBase *>(link->data);
590 assert(srv);
591 Helper::Xaction *xaction = srv->requests.empty() ? NULL : srv->requests.front();
592 double tt = 0.001 * (xaction ? tvSubMsec(xaction->request.dispatch_time, current_time) : tvSubMsec(srv->dispatch_time, srv->answer_time));
593 p->appendf("%7u\t%7d\t%7d\t%11" PRIu64 "\t%11" PRIu64 "\t%11" PRIu64 "\t%c%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
594 srv->index.value,
595 srv->readPipe->fd,
596 srv->pid,
597 srv->stats.uses,
598 srv->stats.replies,
599 srv->stats.timedout,
600 srv->stats.pending ? 'B' : ' ',
601 srv->flags.writing ? 'W' : ' ',
602 srv->flags.closing ? 'C' : ' ',
603 srv->flags.reserved ? 'R' : ' ',
604 srv->flags.shutdown ? 'S' : ' ',
605 xaction && xaction->request.placeholder ? 'P' : ' ',
606 tt < 0.0 ? 0.0 : tt,
607 (int) srv->roffset,
608 xaction ? Format::QuoteMimeBlob(xaction->request.buf) : "(none)");
609 }
610
611 p->append("\nFlags key:\n"
612 " B\tBUSY\n"
613 " W\tWRITING\n"
614 " C\tCLOSING\n"
615 " R\tRESERVED\n"
616 " S\tSHUTDOWN PENDING\n"
617 " P\tPLACEHOLDER\n", 101);
618 }
619
620 bool
willOverload() const621 helper::willOverload() const {
622 return queueFull() && !(childs.needNew() || GetFirstAvailable(this));
623 }
624
625 void
helperShutdown(helper * hlp)626 helperShutdown(helper * hlp)
627 {
628 dlink_node *link = hlp->servers.head;
629
630 while (link) {
631 helper_server *srv;
632 srv = (helper_server *)link->data;
633 link = link->next;
634
635 if (srv->flags.shutdown) {
636 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
637 continue;
638 }
639
640 assert(hlp->childs.n_active > 0);
641 -- hlp->childs.n_active;
642 srv->flags.shutdown = true; /* request it to shut itself down */
643
644 if (srv->flags.closing) {
645 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
646 continue;
647 }
648
649 if (srv->stats.pending) {
650 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
651 continue;
652 }
653
654 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
655 /* the rest of the details is dealt with in the helperServerFree
656 * close handler
657 */
658 srv->closePipesSafely(hlp->id_name);
659 }
660 }
661
662 void
helperStatefulShutdown(statefulhelper * hlp)663 helperStatefulShutdown(statefulhelper * hlp)
664 {
665 dlink_node *link = hlp->servers.head;
666 helper_stateful_server *srv;
667
668 while (link) {
669 srv = (helper_stateful_server *)link->data;
670 link = link->next;
671
672 if (srv->flags.shutdown) {
673 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
674 continue;
675 }
676
677 assert(hlp->childs.n_active > 0);
678 -- hlp->childs.n_active;
679 srv->flags.shutdown = true; /* request it to shut itself down */
680
681 if (srv->stats.pending) {
682 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
683 continue;
684 }
685
686 if (srv->flags.closing) {
687 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
688 continue;
689 }
690
691 if (srv->flags.reserved) {
692 if (shutting_down) {
693 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Closing anyway.");
694 } else {
695 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Not Shutting Down Yet.");
696 continue;
697 }
698 }
699
700 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
701
702 /* the rest of the details is dealt with in the helperStatefulServerFree
703 * close handler
704 */
705 srv->closePipesSafely(hlp->id_name);
706 }
707 }
708
~helper()709 helper::~helper()
710 {
711 /* note, don't free id_name, it probably points to static memory */
712
713 // TODO: if the queue is not empty it will leak Helper::Request's
714 if (!queue.empty())
715 debugs(84, DBG_CRITICAL, "WARNING: freeing " << id_name << " helper with " << stats.queue_size << " requests queued");
716 }
717
718 /* ====================================================================== */
719 /* LOCAL FUNCTIONS */
720 /* ====================================================================== */
721
722 static void
helperServerFree(helper_server * srv)723 helperServerFree(helper_server *srv)
724 {
725 helper *hlp = srv->parent;
726 int concurrency = hlp->childs.concurrency;
727
728 if (!concurrency)
729 concurrency = 1;
730
731 if (srv->rbuf) {
732 memFreeBuf(srv->rbuf_sz, srv->rbuf);
733 srv->rbuf = NULL;
734 }
735
736 srv->wqueue->clean();
737 delete srv->wqueue;
738
739 if (srv->writebuf) {
740 srv->writebuf->clean();
741 delete srv->writebuf;
742 srv->writebuf = NULL;
743 }
744
745 if (Comm::IsConnOpen(srv->writePipe))
746 srv->closeWritePipeSafely(hlp->id_name);
747
748 dlinkDelete(&srv->link, &hlp->servers);
749
750 assert(hlp->childs.n_running > 0);
751 -- hlp->childs.n_running;
752
753 if (!srv->flags.shutdown) {
754 assert(hlp->childs.n_active > 0);
755 -- hlp->childs.n_active;
756 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->id_name << " #" << srv->index << " exited");
757
758 if (hlp->childs.needNew() > 0) {
759 debugs(80, DBG_IMPORTANT, "Too few " << hlp->id_name << " processes are running (need " << hlp->childs.needNew() << "/" << hlp->childs.n_max << ")");
760
761 if (hlp->childs.n_active < hlp->childs.n_startup && hlp->last_restart > squid_curtime - 30) {
762 if (srv->stats.replies < 1)
763 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
764 else
765 debugs(80, DBG_CRITICAL, "ERROR: The " << hlp->id_name << " helpers are crashing too rapidly, need help!");
766 }
767
768 debugs(80, DBG_IMPORTANT, "Starting new helpers");
769 helperOpenServers(hlp);
770 }
771 }
772
773 while (!srv->requests.empty()) {
774 // XXX: re-schedule these on another helper?
775 Helper::Xaction *r = srv->requests.front();
776 srv->requests.pop_front();
777 void *cbdata;
778
779 if (cbdataReferenceValidDone(r->request.data, &cbdata)) {
780 r->reply.result = Helper::Unknown;
781 r->request.callback(cbdata, r->reply);
782 }
783
784 delete r;
785 }
786 srv->requestsIndex.clear();
787
788 cbdataReferenceDone(srv->parent);
789 delete srv;
790 }
791
792 static void
helperStatefulServerFree(helper_stateful_server * srv)793 helperStatefulServerFree(helper_stateful_server *srv)
794 {
795 statefulhelper *hlp = srv->parent;
796
797 if (srv->rbuf) {
798 memFreeBuf(srv->rbuf_sz, srv->rbuf);
799 srv->rbuf = NULL;
800 }
801
802 #if 0
803 srv->wqueue->clean();
804
805 delete srv->wqueue;
806
807 #endif
808
809 /* TODO: walk the local queue of requests and carry them all out */
810 if (Comm::IsConnOpen(srv->writePipe))
811 srv->closeWritePipeSafely(hlp->id_name);
812
813 dlinkDelete(&srv->link, &hlp->servers);
814
815 assert(hlp->childs.n_running > 0);
816 -- hlp->childs.n_running;
817
818 if (!srv->flags.shutdown) {
819 assert( hlp->childs.n_active > 0);
820 -- hlp->childs.n_active;
821 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->id_name << " #" << srv->index << " exited");
822
823 if (hlp->childs.needNew() > 0) {
824 debugs(80, DBG_IMPORTANT, "Too few " << hlp->id_name << " processes are running (need " << hlp->childs.needNew() << "/" << hlp->childs.n_max << ")");
825
826 if (hlp->childs.n_active < hlp->childs.n_startup && hlp->last_restart > squid_curtime - 30) {
827 if (srv->stats.replies < 1)
828 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
829 else
830 debugs(80, DBG_CRITICAL, "ERROR: The " << hlp->id_name << " helpers are crashing too rapidly, need help!");
831 }
832
833 debugs(80, DBG_IMPORTANT, "Starting new helpers");
834 helperStatefulOpenServers(hlp);
835 }
836 }
837
838 while (!srv->requests.empty()) {
839 // XXX: re-schedule these on another helper?
840 Helper::Xaction *r = srv->requests.front();
841 srv->requests.pop_front();
842 void *cbdata;
843
844 if (cbdataReferenceValidDone(r->request.data, &cbdata)) {
845 r->reply.result = Helper::Unknown;
846 r->request.callback(cbdata, r->reply);
847 }
848
849 delete r;
850 }
851
852 if (srv->data != NULL)
853 hlp->datapool->freeOne(srv->data);
854
855 cbdataReferenceDone(srv->parent);
856
857 delete srv;
858 }
859
860 Helper::Xaction *
popRequest(int request_number)861 helper_server::popRequest(int request_number)
862 {
863 Helper::Xaction *r = nullptr;
864 helper_server::RequestIndex::iterator it;
865 if (parent->childs.concurrency) {
866 // If concurency supported retrieve request from ID
867 it = requestsIndex.find(request_number);
868 if (it != requestsIndex.end()) {
869 r = *(it->second);
870 requests.erase(it->second);
871 requestsIndex.erase(it);
872 }
873 } else if(!requests.empty()) {
874 // Else get the first request from queue, if any
875 r = requests.front();
876 requests.pop_front();
877 }
878
879 return r;
880 }
881
882 /// Calls back with a pointer to the buffer with the helper output
883 static void
helperReturnBuffer(helper_server * srv,helper * hlp,char * msg,size_t msgSize,char * msgEnd)884 helperReturnBuffer(helper_server * srv, helper * hlp, char * msg, size_t msgSize, char * msgEnd)
885 {
886 if (Helper::Xaction *r = srv->replyXaction) {
887 const bool hasSpace = r->reply.accumulate(msg, msgSize);
888 if (!hasSpace) {
889 debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
890 "helper that overflowed " << srv->rbuf_sz << "-byte " <<
891 "Squid input buffer: " << hlp->id_name << " #" << srv->index);
892 srv->closePipesSafely(hlp->id_name);
893 return;
894 }
895
896 if (!msgEnd)
897 return; // We are waiting for more data.
898
899 bool retry = false;
900 if (cbdataReferenceValid(r->request.data)) {
901 r->reply.finalize();
902 if (r->reply.result == Helper::BrokenHelper && r->request.retries < MAX_RETRIES) {
903 debugs(84, DBG_IMPORTANT, "ERROR: helper: " << r->reply << ", attempt #" << (r->request.retries + 1) << " of 2");
904 retry = true;
905 } else {
906 HLPCB *callback = r->request.callback;
907 r->request.callback = nullptr;
908 void *cbdata = nullptr;
909 if (cbdataReferenceValidDone(r->request.data, &cbdata))
910 callback(cbdata, r->reply);
911 }
912 }
913
914 -- srv->stats.pending;
915 ++ srv->stats.replies;
916
917 ++ hlp->stats.replies;
918
919 srv->answer_time = current_time;
920
921 srv->dispatch_time = r->request.dispatch_time;
922
923 hlp->stats.avg_svc_time =
924 Math::intAverage(hlp->stats.avg_svc_time,
925 tvSubMsec(r->request.dispatch_time, current_time),
926 hlp->stats.replies, REDIRECT_AV_FACTOR);
927
928 // release or re-submit parsedRequestXaction object
929 srv->replyXaction = nullptr;
930 if (retry) {
931 ++r->request.retries;
932 hlp->submitRequest(r);
933 } else
934 delete r;
935 }
936
937 if (hlp->timeout && hlp->childs.concurrency)
938 srv->checkForTimedOutRequests(hlp->retryTimedOut);
939
940 if (!srv->flags.shutdown) {
941 helperKickQueue(hlp);
942 } else if (!srv->flags.closing && !srv->stats.pending) {
943 srv->flags.closing=true;
944 srv->writePipe->close();
945 }
946 }
947
948 static void
helperHandleRead(const Comm::ConnectionPointer & conn,char *,size_t len,Comm::Flag flag,int,void * data)949 helperHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
950 {
951 helper_server *srv = (helper_server *)data;
952 helper *hlp = srv->parent;
953 assert(cbdataReferenceValid(data));
954
955 /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
956
957 if (flag == Comm::ERR_CLOSING) {
958 return;
959 }
960
961 assert(conn->fd == srv->readPipe->fd);
962
963 debugs(84, 5, "helperHandleRead: " << len << " bytes from " << hlp->id_name << " #" << srv->index);
964
965 if (flag != Comm::OK || len == 0) {
966 srv->closePipesSafely(hlp->id_name);
967 return;
968 }
969
970 srv->roffset += len;
971 srv->rbuf[srv->roffset] = '\0';
972 debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
973
974 if (!srv->stats.pending && !srv->stats.timedout) {
975 /* someone spoke without being spoken to */
976 debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected read from " <<
977 hlp->id_name << " #" << srv->index << ", " << (int)len <<
978 " bytes '" << srv->rbuf << "'");
979
980 srv->roffset = 0;
981 srv->rbuf[0] = '\0';
982 }
983
984 bool needsMore = false;
985 char *msg = srv->rbuf;
986 while (*msg && !needsMore) {
987 int skip = 0;
988 char *eom = strchr(msg, hlp->eom);
989 if (eom) {
990 skip = 1;
991 debugs(84, 3, "helperHandleRead: end of reply found");
992 if (eom > msg && eom[-1] == '\r' && hlp->eom == '\n') {
993 *eom = '\0';
994 // rewind to the \r octet which is the real terminal now
995 // and remember that we have to skip forward 2 places now.
996 skip = 2;
997 --eom;
998 }
999 *eom = '\0';
1000 }
1001
1002 if (!srv->ignoreToEom && !srv->replyXaction) {
1003 int i = 0;
1004 if (hlp->childs.concurrency) {
1005 char *e = NULL;
1006 i = strtol(msg, &e, 10);
1007 // Do we need to check for e == msg? Means wrong response from helper.
1008 // Will be droped as "unexpected reply on channel 0"
1009 needsMore = !(xisspace(*e) || (eom && e == eom));
1010 if (!needsMore) {
1011 msg = e;
1012 while (*msg && xisspace(*msg))
1013 ++msg;
1014 } // else not enough data to compute request number
1015 }
1016 if (!(srv->replyXaction = srv->popRequest(i))) {
1017 if (srv->stats.timedout) {
1018 debugs(84, 3, "Timedout reply received for request-ID: " << i << " , ignore");
1019 } else {
1020 debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected reply on channel " <<
1021 i << " from " << hlp->id_name << " #" << srv->index <<
1022 " '" << srv->rbuf << "'");
1023 }
1024 srv->ignoreToEom = true;
1025 }
1026 } // else we need to just append reply data to the current Xaction
1027
1028 if (!needsMore) {
1029 size_t msgSize = eom ? eom - msg : (srv->roffset - (msg - srv->rbuf));
1030 assert(msgSize <= srv->rbuf_sz);
1031 helperReturnBuffer(srv, hlp, msg, msgSize, eom);
1032 msg += msgSize + skip;
1033 assert(static_cast<size_t>(msg - srv->rbuf) <= srv->rbuf_sz);
1034
1035 // The next message should not ignored.
1036 if (eom && srv->ignoreToEom)
1037 srv->ignoreToEom = false;
1038 } else
1039 assert(skip == 0 && eom == NULL);
1040 }
1041
1042 if (needsMore) {
1043 size_t msgSize = (srv->roffset - (msg - srv->rbuf));
1044 assert(msgSize <= srv->rbuf_sz);
1045 memmove(srv->rbuf, msg, msgSize);
1046 srv->roffset = msgSize;
1047 srv->rbuf[srv->roffset] = '\0';
1048 } else {
1049 // All of the responses parsed and msg points at the end of read data
1050 assert(static_cast<size_t>(msg - srv->rbuf) == srv->roffset);
1051 srv->roffset = 0;
1052 }
1053
1054 if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
1055 int spaceSize = srv->rbuf_sz - srv->roffset - 1;
1056 assert(spaceSize >= 0);
1057
1058 AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
1059 CommIoCbPtrFun(helperHandleRead, srv));
1060 comm_read(srv->readPipe, srv->rbuf + srv->roffset, spaceSize, call);
1061 }
1062 }
1063
1064 static void
helperStatefulHandleRead(const Comm::ConnectionPointer & conn,char *,size_t len,Comm::Flag flag,int,void * data)1065 helperStatefulHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
1066 {
1067 char *t = NULL;
1068 helper_stateful_server *srv = (helper_stateful_server *)data;
1069 statefulhelper *hlp = srv->parent;
1070 assert(cbdataReferenceValid(data));
1071
1072 /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
1073
1074 if (flag == Comm::ERR_CLOSING) {
1075 return;
1076 }
1077
1078 assert(conn->fd == srv->readPipe->fd);
1079
1080 debugs(84, 5, "helperStatefulHandleRead: " << len << " bytes from " <<
1081 hlp->id_name << " #" << srv->index);
1082
1083 if (flag != Comm::OK || len == 0) {
1084 srv->closePipesSafely(hlp->id_name);
1085 return;
1086 }
1087
1088 srv->roffset += len;
1089 srv->rbuf[srv->roffset] = '\0';
1090 Helper::Xaction *r = srv->requests.front();
1091 debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
1092
1093 if (r == NULL) {
1094 /* someone spoke without being spoken to */
1095 debugs(84, DBG_IMPORTANT, "helperStatefulHandleRead: unexpected read from " <<
1096 hlp->id_name << " #" << srv->index << ", " << (int)len <<
1097 " bytes '" << srv->rbuf << "'");
1098
1099 srv->roffset = 0;
1100 }
1101
1102 if ((t = strchr(srv->rbuf, hlp->eom))) {
1103 debugs(84, 3, "helperStatefulHandleRead: end of reply found");
1104
1105 if (t > srv->rbuf && t[-1] == '\r' && hlp->eom == '\n') {
1106 *t = '\0';
1107 // rewind to the \r octet which is the real terminal now
1108 --t;
1109 }
1110
1111 *t = '\0';
1112 }
1113
1114 if (r && !r->reply.accumulate(srv->rbuf, t ? (t - srv->rbuf) : srv->roffset)) {
1115 debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
1116 "helper that overflowed " << srv->rbuf_sz << "-byte " <<
1117 "Squid input buffer: " << hlp->id_name << " #" << srv->index);
1118 srv->closePipesSafely(hlp->id_name);
1119 return;
1120 }
1121 /**
1122 * BUG: the below assumes that only one response per read() was received and discards any octets remaining.
1123 * Doing this prohibits concurrency support with multiple replies per read().
1124 * TODO: check that read() setup on these buffers pays attention to roffest!=0
1125 * TODO: check that replies bigger than the buffer are discarded and do not to affect future replies
1126 */
1127 srv->roffset = 0;
1128
1129 if (t) {
1130 /* end of reply found */
1131 srv->requests.pop_front(); // we already have it in 'r'
1132 int called = 1;
1133
1134 if (r && cbdataReferenceValid(r->request.data)) {
1135 r->reply.finalize();
1136 r->reply.whichServer = srv;
1137 r->request.callback(r->request.data, r->reply);
1138 } else {
1139 debugs(84, DBG_IMPORTANT, "StatefulHandleRead: no callback data registered");
1140 called = 0;
1141 }
1142
1143 delete r;
1144
1145 -- srv->stats.pending;
1146 ++ srv->stats.replies;
1147
1148 ++ hlp->stats.replies;
1149 srv->answer_time = current_time;
1150 hlp->stats.avg_svc_time =
1151 Math::intAverage(hlp->stats.avg_svc_time,
1152 tvSubMsec(srv->dispatch_time, current_time),
1153 hlp->stats.replies, REDIRECT_AV_FACTOR);
1154
1155 if (called)
1156 helperStatefulServerDone(srv);
1157 else
1158 helperStatefulReleaseServer(srv);
1159 }
1160
1161 if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
1162 int spaceSize = srv->rbuf_sz - 1;
1163
1164 AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
1165 CommIoCbPtrFun(helperStatefulHandleRead, srv));
1166 comm_read(srv->readPipe, srv->rbuf, spaceSize, call);
1167 }
1168 }
1169
1170 /// Handles a request when all running helpers, if any, are busy.
1171 static void
Enqueue(helper * hlp,Helper::Xaction * r)1172 Enqueue(helper * hlp, Helper::Xaction * r)
1173 {
1174 hlp->queue.push(r);
1175 ++ hlp->stats.queue_size;
1176
1177 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1178 if (hlp->childs.needNew() > 0) {
1179 debugs(84, DBG_CRITICAL, "Starting new " << hlp->id_name << " helpers...");
1180 helperOpenServers(hlp);
1181 return;
1182 }
1183
1184 if (hlp->stats.queue_size < (int)hlp->childs.queue_size)
1185 return;
1186
1187 if (squid_curtime - hlp->last_queue_warn < 600)
1188 return;
1189
1190 if (shutting_down || reconfiguring)
1191 return;
1192
1193 hlp->last_queue_warn = squid_curtime;
1194
1195 debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1196 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1197 debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1198 }
1199
1200 static void
StatefulEnqueue(statefulhelper * hlp,Helper::Xaction * r)1201 StatefulEnqueue(statefulhelper * hlp, Helper::Xaction * r)
1202 {
1203 hlp->queue.push(r);
1204 ++ hlp->stats.queue_size;
1205
1206 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1207 if (hlp->childs.needNew() > 0) {
1208 debugs(84, DBG_CRITICAL, "Starting new " << hlp->id_name << " helpers...");
1209 helperStatefulOpenServers(hlp);
1210 return;
1211 }
1212
1213 if (hlp->stats.queue_size < (int)hlp->childs.queue_size)
1214 return;
1215
1216 if (squid_curtime - hlp->last_queue_warn < 600)
1217 return;
1218
1219 if (shutting_down || reconfiguring)
1220 return;
1221
1222 hlp->last_queue_warn = squid_curtime;
1223
1224 debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1225 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1226 debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1227 }
1228
1229 Helper::Xaction *
nextRequest()1230 helper::nextRequest()
1231 {
1232 if (queue.empty())
1233 return nullptr;
1234
1235 auto *r = queue.front();
1236 queue.pop();
1237 --stats.queue_size;
1238 return r;
1239 }
1240
1241 static helper_server *
GetFirstAvailable(const helper * hlp)1242 GetFirstAvailable(const helper * hlp)
1243 {
1244 dlink_node *n;
1245 helper_server *srv;
1246 helper_server *selected = NULL;
1247 debugs(84, 5, "GetFirstAvailable: Running servers " << hlp->childs.n_running);
1248
1249 if (hlp->childs.n_running == 0)
1250 return NULL;
1251
1252 /* Find "least" loaded helper (approx) */
1253 for (n = hlp->servers.head; n != NULL; n = n->next) {
1254 srv = (helper_server *)n->data;
1255
1256 if (selected && selected->stats.pending <= srv->stats.pending)
1257 continue;
1258
1259 if (srv->flags.shutdown)
1260 continue;
1261
1262 if (!srv->stats.pending)
1263 return srv;
1264
1265 if (selected) {
1266 selected = srv;
1267 break;
1268 }
1269
1270 selected = srv;
1271 }
1272
1273 if (!selected) {
1274 debugs(84, 5, "GetFirstAvailable: None available.");
1275 return NULL;
1276 }
1277
1278 if (selected->stats.pending >= (hlp->childs.concurrency ? hlp->childs.concurrency : 1)) {
1279 debugs(84, 3, "GetFirstAvailable: Least-loaded helper is fully loaded!");
1280 return NULL;
1281 }
1282
1283 debugs(84, 5, "GetFirstAvailable: returning srv-" << selected->index);
1284 return selected;
1285 }
1286
1287 static helper_stateful_server *
StatefulGetFirstAvailable(const statefulhelper * hlp)1288 StatefulGetFirstAvailable(const statefulhelper * hlp)
1289 {
1290 dlink_node *n;
1291 helper_stateful_server *srv = NULL;
1292 debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp->childs.n_running);
1293
1294 if (hlp->childs.n_running == 0)
1295 return NULL;
1296
1297 for (n = hlp->servers.head; n != NULL; n = n->next) {
1298 srv = (helper_stateful_server *)n->data;
1299
1300 if (srv->stats.pending)
1301 continue;
1302
1303 if (srv->flags.reserved)
1304 continue;
1305
1306 if (srv->flags.shutdown)
1307 continue;
1308
1309 debugs(84, 5, "StatefulGetFirstAvailable: returning srv-" << srv->index);
1310 return srv;
1311 }
1312
1313 debugs(84, 5, "StatefulGetFirstAvailable: None available.");
1314 return NULL;
1315 }
1316
1317 static void
helperDispatchWriteDone(const Comm::ConnectionPointer &,char *,size_t,Comm::Flag flag,int,void * data)1318 helperDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag flag, int, void *data)
1319 {
1320 helper_server *srv = (helper_server *)data;
1321
1322 srv->writebuf->clean();
1323 delete srv->writebuf;
1324 srv->writebuf = NULL;
1325 srv->flags.writing = false;
1326
1327 if (flag != Comm::OK) {
1328 /* Helper server has crashed */
1329 debugs(84, DBG_CRITICAL, "helperDispatch: Helper " << srv->parent->id_name << " #" << srv->index << " has crashed");
1330 return;
1331 }
1332
1333 if (!srv->wqueue->isNull()) {
1334 srv->writebuf = srv->wqueue;
1335 srv->wqueue = new MemBuf;
1336 srv->flags.writing = true;
1337 AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1338 CommIoCbPtrFun(helperDispatchWriteDone, srv));
1339 Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
1340 }
1341 }
1342
1343 static void
helperDispatch(helper_server * srv,Helper::Xaction * r)1344 helperDispatch(helper_server * srv, Helper::Xaction * r)
1345 {
1346 helper *hlp = srv->parent;
1347 const uint64_t reqId = ++srv->nextRequestId;
1348
1349 if (!cbdataReferenceValid(r->request.data)) {
1350 debugs(84, DBG_IMPORTANT, "helperDispatch: invalid callback data");
1351 delete r;
1352 return;
1353 }
1354
1355 r->request.Id = reqId;
1356 helper_server::Requests::iterator it = srv->requests.insert(srv->requests.end(), r);
1357 r->request.dispatch_time = current_time;
1358
1359 if (srv->wqueue->isNull())
1360 srv->wqueue->init();
1361
1362 if (hlp->childs.concurrency) {
1363 srv->requestsIndex.insert(helper_server::RequestIndex::value_type(reqId, it));
1364 assert(srv->requestsIndex.size() == srv->requests.size());
1365 srv->wqueue->appendf("%" PRIu64 " %s", reqId, r->request.buf);
1366 } else
1367 srv->wqueue->append(r->request.buf, strlen(r->request.buf));
1368
1369 if (!srv->flags.writing) {
1370 assert(NULL == srv->writebuf);
1371 srv->writebuf = srv->wqueue;
1372 srv->wqueue = new MemBuf;
1373 srv->flags.writing = true;
1374 AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1375 CommIoCbPtrFun(helperDispatchWriteDone, srv));
1376 Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
1377 }
1378
1379 debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index << ", " << strlen(r->request.buf) << " bytes");
1380
1381 ++ srv->stats.uses;
1382 ++ srv->stats.pending;
1383 ++ hlp->stats.requests;
1384 }
1385
1386 static void
helperStatefulDispatchWriteDone(const Comm::ConnectionPointer &,char *,size_t,Comm::Flag,int,void *)1387 helperStatefulDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag, int, void *)
1388 {}
1389
1390 static void
helperStatefulDispatch(helper_stateful_server * srv,Helper::Xaction * r)1391 helperStatefulDispatch(helper_stateful_server * srv, Helper::Xaction * r)
1392 {
1393 statefulhelper *hlp = srv->parent;
1394
1395 if (!cbdataReferenceValid(r->request.data)) {
1396 debugs(84, DBG_IMPORTANT, "helperStatefulDispatch: invalid callback data");
1397 delete r;
1398 helperStatefulReleaseServer(srv);
1399 return;
1400 }
1401
1402 debugs(84, 9, "helperStatefulDispatch busying helper " << hlp->id_name << " #" << srv->index);
1403
1404 if (r->request.placeholder == 1) {
1405 /* a callback is needed before this request can _use_ a helper. */
1406 /* we don't care about releasing this helper. The request NEVER
1407 * gets to the helper. So we throw away the return code */
1408 r->reply.result = Helper::Unknown;
1409 r->reply.whichServer = srv;
1410 r->request.callback(r->request.data, r->reply);
1411 /* throw away the placeholder */
1412 delete r;
1413 /* and push the queue. Note that the callback may have submitted a new
1414 * request to the helper which is why we test for the request */
1415
1416 if (!srv->requests.size())
1417 helperStatefulServerDone(srv);
1418
1419 return;
1420 }
1421
1422 srv->flags.reserved = true;
1423 srv->requests.push_back(r);
1424 srv->dispatch_time = current_time;
1425 AsyncCall::Pointer call = commCbCall(5,5, "helperStatefulDispatchWriteDone",
1426 CommIoCbPtrFun(helperStatefulDispatchWriteDone, hlp));
1427 Comm::Write(srv->writePipe, r->request.buf, strlen(r->request.buf), call, NULL);
1428 debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
1429 hlp->id_name << " #" << srv->index << ", " <<
1430 (int) strlen(r->request.buf) << " bytes");
1431
1432 ++ srv->stats.uses;
1433 ++ srv->stats.pending;
1434 ++ hlp->stats.requests;
1435 }
1436
1437 static void
helperKickQueue(helper * hlp)1438 helperKickQueue(helper * hlp)
1439 {
1440 Helper::Xaction *r;
1441 helper_server *srv;
1442
1443 while ((srv = GetFirstAvailable(hlp)) && (r = hlp->nextRequest()))
1444 helperDispatch(srv, r);
1445 }
1446
1447 static void
helperStatefulKickQueue(statefulhelper * hlp)1448 helperStatefulKickQueue(statefulhelper * hlp)
1449 {
1450 Helper::Xaction *r;
1451 helper_stateful_server *srv;
1452
1453 while ((srv = StatefulGetFirstAvailable(hlp)) && (r = hlp->nextRequest()))
1454 helperStatefulDispatch(srv, r);
1455 }
1456
1457 static void
helperStatefulServerDone(helper_stateful_server * srv)1458 helperStatefulServerDone(helper_stateful_server * srv)
1459 {
1460 if (!srv->flags.shutdown) {
1461 helperStatefulKickQueue(srv->parent);
1462 } else if (!srv->flags.closing && !srv->flags.reserved && !srv->stats.pending) {
1463 srv->closeWritePipeSafely(srv->parent->id_name);
1464 return;
1465 }
1466 }
1467
1468 void
checkForTimedOutRequests(bool const retry)1469 helper_server::checkForTimedOutRequests(bool const retry)
1470 {
1471 assert(parent->childs.concurrency);
1472 while(!requests.empty() && requests.front()->request.timedOut(parent->timeout)) {
1473 Helper::Xaction *r = requests.front();
1474 RequestIndex::iterator it;
1475 it = requestsIndex.find(r->request.Id);
1476 assert(it != requestsIndex.end());
1477 requestsIndex.erase(it);
1478 requests.pop_front();
1479 debugs(84, 2, "Request " << r->request.Id << " timed-out, remove it from queue");
1480 void *cbdata;
1481 bool retried = false;
1482 if (retry && r->request.retries < MAX_RETRIES && cbdataReferenceValid(r->request.data)) {
1483 debugs(84, 2, "Retry request " << r->request.Id);
1484 ++r->request.retries;
1485 parent->submitRequest(r);
1486 retried = true;
1487 } else if (cbdataReferenceValidDone(r->request.data, &cbdata)) {
1488 if (!parent->onTimedOutResponse.isEmpty()) {
1489 if (r->reply.accumulate(parent->onTimedOutResponse.rawContent(), parent->onTimedOutResponse.length()))
1490 r->reply.finalize();
1491 else
1492 r->reply.result = Helper::TimedOut;
1493 r->request.callback(cbdata, r->reply);
1494 } else {
1495 r->reply.result = Helper::TimedOut;
1496 r->request.callback(cbdata, r->reply);
1497 }
1498 }
1499 --stats.pending;
1500 ++stats.timedout;
1501 ++parent->stats.timedout;
1502 if (!retried)
1503 delete r;
1504 }
1505 }
1506
1507 void
requestTimeout(const CommTimeoutCbParams & io)1508 helper_server::requestTimeout(const CommTimeoutCbParams &io)
1509 {
1510 debugs(26, 3, HERE << io.conn);
1511 helper_server *srv = static_cast<helper_server *>(io.data);
1512
1513 if (!cbdataReferenceValid(srv))
1514 return;
1515
1516 srv->checkForTimedOutRequests(srv->parent->retryTimedOut);
1517
1518 debugs(84, 3, HERE << io.conn << " establish new helper_server::requestTimeout");
1519 AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "helper_server::requestTimeout",
1520 CommTimeoutCbPtrFun(helper_server::requestTimeout, srv));
1521
1522 const int timeSpent = srv->requests.empty() ? 0 : (squid_curtime - srv->requests.front()->request.dispatch_time.tv_sec);
1523 const int timeLeft = max(1, (static_cast<int>(srv->parent->timeout) - timeSpent));
1524
1525 commSetConnTimeout(io.conn, timeLeft, timeoutCall);
1526 }
1527
1528