1 /*
2  * Copyright (C) 1996-2021 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 #include "squid.h"
10 #include "acl/FilledChecklist.h"
11 #include "acl/Gadgets.h"
12 #include "base/TextException.h"
13 #include "clients/Client.h"
14 #include "comm/Connection.h"
15 #include "comm/forward.h"
16 #include "comm/Write.h"
17 #include "err_detail_type.h"
18 #include "errorpage.h"
19 #include "fd.h"
20 #include "HttpHdrContRange.h"
21 #include "HttpReply.h"
22 #include "HttpRequest.h"
23 #include "SquidConfig.h"
24 #include "SquidTime.h"
25 #include "StatCounters.h"
26 #include "Store.h"
27 #include "tools.h"
28 
29 #if USE_ADAPTATION
30 #include "adaptation/AccessCheck.h"
31 #include "adaptation/Answer.h"
32 #include "adaptation/Iterator.h"
33 #include "base/AsyncCall.h"
34 #endif
35 
36 // implemented in client_side_reply.cc until sides have a common parent
37 void purgeEntriesByUrl(HttpRequest * req, const char *url);
38 
Client(FwdState * theFwdState)39 Client::Client(FwdState *theFwdState): AsyncJob("Client"),
40     completed(false),
41     currentOffset(0),
42     responseBodyBuffer(NULL),
43     fwd(theFwdState),
44     requestSender(NULL),
45 #if USE_ADAPTATION
46     adaptedHeadSource(NULL),
47     adaptationAccessCheckPending(false),
48     startedAdaptation(false),
49 #endif
50     receivedWholeRequestBody(false),
51     doneWithFwd(nullptr),
52     theVirginReply(NULL),
53     theFinalReply(NULL)
54 {
55     entry = fwd->entry;
56     entry->lock("Client");
57 
58     request = fwd->request;
59     HTTPMSGLOCK(request);
60 }
61 
~Client()62 Client::~Client()
63 {
64     // paranoid: check that swanSong has been called
65     assert(!requestBodySource);
66 #if USE_ADAPTATION
67     assert(!virginBodyDestination);
68     assert(!adaptedBodySource);
69 #endif
70 
71     entry->unlock("Client");
72 
73     HTTPMSGUNLOCK(request);
74     HTTPMSGUNLOCK(theVirginReply);
75     HTTPMSGUNLOCK(theFinalReply);
76 
77     if (responseBodyBuffer != NULL) {
78         delete responseBodyBuffer;
79         responseBodyBuffer = NULL;
80     }
81 }
82 
83 void
swanSong()84 Client::swanSong()
85 {
86     // get rid of our piping obligations
87     if (requestBodySource != NULL)
88         stopConsumingFrom(requestBodySource);
89 
90 #if USE_ADAPTATION
91     cleanAdaptation();
92 #endif
93 
94     if (!doneWithServer())
95         closeServer();
96 
97     if (!doneWithFwd) {
98         doneWithFwd = "swanSong()";
99         fwd->handleUnregisteredServerEnd();
100     }
101 
102     BodyConsumer::swanSong();
103 #if USE_ADAPTATION
104     Initiator::swanSong();
105     BodyProducer::swanSong();
106 #endif
107 
108     // paranoid: check that swanSong has been called
109     // extra paranoid: yeah, I really mean it. they MUST pass here.
110     assert(!requestBodySource);
111 #if USE_ADAPTATION
112     assert(!virginBodyDestination);
113     assert(!adaptedBodySource);
114 #endif
115 }
116 
117 HttpReply *
virginReply()118 Client::virginReply()
119 {
120     assert(theVirginReply);
121     return theVirginReply;
122 }
123 
124 const HttpReply *
virginReply() const125 Client::virginReply() const
126 {
127     assert(theVirginReply);
128     return theVirginReply;
129 }
130 
131 HttpReply *
setVirginReply(HttpReply * rep)132 Client::setVirginReply(HttpReply *rep)
133 {
134     debugs(11,5, HERE << this << " setting virgin reply to " << rep);
135     assert(!theVirginReply);
136     assert(rep);
137     theVirginReply = rep;
138     HTTPMSGLOCK(theVirginReply);
139     return theVirginReply;
140 }
141 
142 HttpReply *
finalReply()143 Client::finalReply()
144 {
145     assert(theFinalReply);
146     return theFinalReply;
147 }
148 
149 HttpReply *
setFinalReply(HttpReply * rep)150 Client::setFinalReply(HttpReply *rep)
151 {
152     debugs(11,5, HERE << this << " setting final reply to " << rep);
153 
154     assert(!theFinalReply);
155     assert(rep);
156     theFinalReply = rep;
157     HTTPMSGLOCK(theFinalReply);
158 
159     // give entry the reply because haveParsedReplyHeaders() expects it there
160     entry->replaceHttpReply(theFinalReply, false); // but do not write yet
161     haveParsedReplyHeaders(); // update the entry/reply (e.g., set timestamps)
162     if (!EBIT_TEST(entry->flags, RELEASE_REQUEST) && blockCaching())
163         entry->release();
164     entry->startWriting(); // write the updated entry to store
165 
166     return theFinalReply;
167 }
168 
169 // called when no more server communication is expected; may quit
170 void
serverComplete()171 Client::serverComplete()
172 {
173     debugs(11,5,HERE << "serverComplete " << this);
174 
175     if (!doneWithServer()) {
176         closeServer();
177         assert(doneWithServer());
178     }
179 
180     completed = true;
181 
182     HttpRequest *r = originalRequest();
183     r->hier.stopPeerClock(true);
184 
185     if (requestBodySource != NULL)
186         stopConsumingFrom(requestBodySource);
187 
188     if (responseBodyBuffer != NULL)
189         return;
190 
191     serverComplete2();
192 }
193 
194 void
serverComplete2()195 Client::serverComplete2()
196 {
197     debugs(11,5,HERE << "serverComplete2 " << this);
198 
199 #if USE_ADAPTATION
200     if (virginBodyDestination != NULL)
201         stopProducingFor(virginBodyDestination, true);
202 
203     if (!doneWithAdaptation())
204         return;
205 #endif
206 
207     completeForwarding();
208 }
209 
doneAll() const210 bool Client::doneAll() const
211 {
212     return  doneWithServer() &&
213 #if USE_ADAPTATION
214             doneWithAdaptation() &&
215             Adaptation::Initiator::doneAll() &&
216             BodyProducer::doneAll() &&
217 #endif
218             BodyConsumer::doneAll();
219 }
220 
221 // FTP side overloads this to work around multiple calls to fwd->complete
222 void
completeForwarding()223 Client::completeForwarding()
224 {
225     debugs(11,5, HERE << "completing forwarding for "  << fwd);
226     assert(fwd != NULL);
227     doneWithFwd = "completeForwarding()";
228     fwd->complete();
229 }
230 
231 // Register to receive request body
startRequestBodyFlow()232 bool Client::startRequestBodyFlow()
233 {
234     HttpRequest *r = originalRequest();
235     assert(r->body_pipe != NULL);
236     requestBodySource = r->body_pipe;
237     if (requestBodySource->setConsumerIfNotLate(this)) {
238         debugs(11,3, HERE << "expecting request body from " <<
239                requestBodySource->status());
240         return true;
241     }
242 
243     debugs(11,3, HERE << "aborting on partially consumed request body: " <<
244            requestBodySource->status());
245     requestBodySource = NULL;
246     return false;
247 }
248 
249 // Entry-dependent callbacks use this check to quit if the entry went bad
250 bool
abortOnBadEntry(const char * abortReason)251 Client::abortOnBadEntry(const char *abortReason)
252 {
253     if (entry->isAccepting())
254         return false;
255 
256     debugs(11,5, HERE << "entry is not Accepting!");
257     abortOnData(abortReason);
258     return true;
259 }
260 
261 // more request or adapted response body is available
262 void
noteMoreBodyDataAvailable(BodyPipe::Pointer bp)263 Client::noteMoreBodyDataAvailable(BodyPipe::Pointer bp)
264 {
265 #if USE_ADAPTATION
266     if (adaptedBodySource == bp) {
267         handleMoreAdaptedBodyAvailable();
268         return;
269     }
270 #endif
271     if (requestBodySource == bp)
272         handleMoreRequestBodyAvailable();
273 }
274 
275 // the entire request or adapted response body was provided, successfully
276 void
noteBodyProductionEnded(BodyPipe::Pointer bp)277 Client::noteBodyProductionEnded(BodyPipe::Pointer bp)
278 {
279 #if USE_ADAPTATION
280     if (adaptedBodySource == bp) {
281         handleAdaptedBodyProductionEnded();
282         return;
283     }
284 #endif
285     if (requestBodySource == bp)
286         handleRequestBodyProductionEnded();
287 }
288 
289 // premature end of the request or adapted response body production
290 void
noteBodyProducerAborted(BodyPipe::Pointer bp)291 Client::noteBodyProducerAborted(BodyPipe::Pointer bp)
292 {
293 #if USE_ADAPTATION
294     if (adaptedBodySource == bp) {
295         handleAdaptedBodyProducerAborted();
296         return;
297     }
298 #endif
299     if (requestBodySource == bp)
300         handleRequestBodyProducerAborted();
301 }
302 
303 bool
abortOnData(const char * reason)304 Client::abortOnData(const char *reason)
305 {
306     abortAll(reason);
307     return true;
308 }
309 
310 // more origin request body data is available
311 void
handleMoreRequestBodyAvailable()312 Client::handleMoreRequestBodyAvailable()
313 {
314     if (!requestSender)
315         sendMoreRequestBody();
316     else
317         debugs(9,3, HERE << "waiting for request body write to complete");
318 }
319 
320 // there will be no more handleMoreRequestBodyAvailable calls
321 void
handleRequestBodyProductionEnded()322 Client::handleRequestBodyProductionEnded()
323 {
324     receivedWholeRequestBody = true;
325     if (!requestSender)
326         doneSendingRequestBody();
327     else
328         debugs(9,3, HERE << "waiting for request body write to complete");
329 }
330 
331 // called when we are done sending request body; kids extend this
332 void
doneSendingRequestBody()333 Client::doneSendingRequestBody()
334 {
335     debugs(9,3, HERE << "done sending request body");
336     assert(requestBodySource != NULL);
337     stopConsumingFrom(requestBodySource);
338 
339     // kids extend this
340 }
341 
342 // called when body producers aborts; kids extend this
343 void
handleRequestBodyProducerAborted()344 Client::handleRequestBodyProducerAborted()
345 {
346     if (requestSender != NULL)
347         debugs(9,3, HERE << "fyi: request body aborted while we were sending");
348 
349     fwd->dontRetry(true); // the problem is not with the server
350     stopConsumingFrom(requestBodySource); // requestSender, if any, will notice
351 
352     // kids extend this
353 }
354 
355 // called when we wrote request headers(!) or a part of the body
356 void
sentRequestBody(const CommIoCbParams & io)357 Client::sentRequestBody(const CommIoCbParams &io)
358 {
359     debugs(11, 5, "sentRequestBody: FD " << io.fd << ": size " << io.size << ": errflag " << io.flag << ".");
360     debugs(32,3,HERE << "sentRequestBody called");
361 
362     requestSender = NULL;
363 
364     if (io.size > 0) {
365         fd_bytes(io.fd, io.size, FD_WRITE);
366         statCounter.server.all.kbytes_out += io.size;
367         // kids should increment their counters
368     }
369 
370     if (io.flag == Comm::ERR_CLOSING)
371         return;
372 
373     if (!requestBodySource) {
374         debugs(9,3, HERE << "detected while-we-were-sending abort");
375         return; // do nothing;
376     }
377 
378     // both successful and failed writes affect response times
379     request->hier.notePeerWrite();
380 
381     if (io.flag) {
382         debugs(11, DBG_IMPORTANT, "sentRequestBody error: FD " << io.fd << ": " << xstrerr(io.xerrno));
383         ErrorState *err;
384         err = new ErrorState(ERR_WRITE_ERROR, Http::scBadGateway, fwd->request);
385         err->xerrno = io.xerrno;
386         fwd->fail(err);
387         abortOnData("I/O error while sending request body");
388         return;
389     }
390 
391     if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) {
392         abortOnData("store entry aborted while sending request body");
393         return;
394     }
395 
396     if (!requestBodySource->exhausted())
397         sendMoreRequestBody();
398     else if (receivedWholeRequestBody)
399         doneSendingRequestBody();
400     else
401         debugs(9,3, HERE << "waiting for body production end or abort");
402 }
403 
404 void
sendMoreRequestBody()405 Client::sendMoreRequestBody()
406 {
407     assert(requestBodySource != NULL);
408     assert(!requestSender);
409 
410     const Comm::ConnectionPointer conn = dataConnection();
411 
412     if (!Comm::IsConnOpen(conn)) {
413         debugs(9,3, HERE << "cannot send request body to closing " << conn);
414         return; // wait for the kid's close handler; TODO: assert(closer);
415     }
416 
417     MemBuf buf;
418     if (getMoreRequestBody(buf) && buf.contentSize() > 0) {
419         debugs(9,3, HERE << "will write " << buf.contentSize() << " request body bytes");
420         typedef CommCbMemFunT<Client, CommIoCbParams> Dialer;
421         requestSender = JobCallback(93,3, Dialer, this, Client::sentRequestBody);
422         Comm::Write(conn, &buf, requestSender);
423     } else {
424         debugs(9,3, HERE << "will wait for more request body bytes or eof");
425         requestSender = NULL;
426     }
427 }
428 
429 /// either fill buf with available [encoded] request body bytes or return false
430 bool
getMoreRequestBody(MemBuf & buf)431 Client::getMoreRequestBody(MemBuf &buf)
432 {
433     // default implementation does not encode request body content
434     Must(requestBodySource != NULL);
435     return requestBodySource->getMoreData(buf);
436 }
437 
// Compares hosts in urls, returns false if different, no scheme, or no host.
// The "host" is whatever follows the first ':' and any scheme slashes, up to
// (but excluding) the next '/' or the end of the URL. Both of those
// terminators are treated alike, so a URL without a path (e.g., "http://h")
// has the same host as a URL with one (e.g., "http://h/p").
// The comparison is an exact, case-sensitive byte match.
static bool
sameUrlHosts(const char *url1, const char *url2)
{
    // XXX: Want AnyP::Uri::parse() here, but it uses static storage and copying
    const char *host1 = strchr(url1, ':');
    const char *host2 = strchr(url2, ':');

    if (!host1 || !host2)
        return false; // no URL scheme

    // skip the scheme delimiter and then scheme slashes (in lockstep)
    do {
        ++host1;
        ++host2;
    } while (*host1 == '/' && *host2 == '/');

    // the host ends where the path starts or where the URL ends
    const auto atHostEnd = [](const char c) {
        return c == '\0' || c == '/';
    };

    if (atHostEnd(*host1) || atHostEnd(*host2))
        return false; // no host

    // advance while the hosts are the same, until the end of either host
    while (!atHostEnd(*host1) && *host1 == *host2) {
        ++host1;
        ++host2;
    }

    // the hosts are the same only if both ended at the same position
    return atHostEnd(*host1) && atHostEnd(*host2);
}
466 
467 // purges entries that match the value of a given HTTP [response] header
468 static void
purgeEntriesByHeader(HttpRequest * req,const char * reqUrl,HttpMsg * rep,Http::HdrType hdr)469 purgeEntriesByHeader(HttpRequest *req, const char *reqUrl, HttpMsg *rep, Http::HdrType hdr)
470 {
471     const auto hdrUrl = rep->header.getStr(hdr);
472     if (!hdrUrl)
473         return;
474 
475     /*
476      * If the URL is relative, make it absolute so we can find it.
477      * If it's absolute, make sure the host parts match to avoid DOS attacks
478      * as per RFC 2616 13.10.
479      */
480     SBuf absUrlMaker;
481     const char *absUrl = nullptr;
482     if (urlIsRelative(hdrUrl)) {
483         if (req->method.id() == Http::METHOD_CONNECT)
484             absUrl = hdrUrl; // TODO: merge authority-uri and hdrUrl
485         else if (req->url.getScheme() == AnyP::PROTO_URN)
486             absUrl = req->url.absolute().c_str();
487         else {
488             AnyP::Uri tmpUrl = req->url;
489             if (*hdrUrl == '/') {
490                 // RFC 3986 section 4.2: absolute-path reference
491                 // for this logic replace the entire request-target URI path
492                 tmpUrl.path(hdrUrl);
493             } else {
494                 tmpUrl.addRelativePath(reqUrl);
495             }
496             absUrlMaker = tmpUrl.absolute();
497             absUrl = absUrlMaker.c_str();
498         }
499     } else if (!sameUrlHosts(reqUrl, hdrUrl)) {
500         return;
501     } else
502         absUrl = hdrUrl;
503 
504     purgeEntriesByUrl(req, absUrl);
505 }
506 
507 // some HTTP methods should purge matching cache entries
508 void
maybePurgeOthers()509 Client::maybePurgeOthers()
510 {
511     // only some HTTP methods should purge matching cache entries
512     if (!request->method.purgesOthers())
513         return;
514 
515     // and probably only if the response was successful
516     if (theFinalReply->sline.status() >= 400)
517         return;
518 
519     // XXX: should we use originalRequest() here?
520     SBuf tmp(request->effectiveRequestUri());
521     const char *reqUrl = tmp.c_str();
522     debugs(88, 5, "maybe purging due to " << request->method << ' ' << tmp);
523     purgeEntriesByUrl(request, reqUrl);
524     purgeEntriesByHeader(request, reqUrl, theFinalReply, Http::HdrType::LOCATION);
525     purgeEntriesByHeader(request, reqUrl, theFinalReply, Http::HdrType::CONTENT_LOCATION);
526 }
527 
528 /// called when we have final (possibly adapted) reply headers; kids extend
529 void
haveParsedReplyHeaders()530 Client::haveParsedReplyHeaders()
531 {
532     Must(theFinalReply);
533     maybePurgeOthers();
534 
535     // adaptation may overwrite old offset computed using the virgin response
536     currentOffset = 0;
537     if (const auto cr = theFinalReply->contentRange()) {
538         if (cr->spec.offset != HttpHdrRangeSpec::UnknownPosition)
539             currentOffset = cr->spec.offset;
540     }
541 }
542 
543 /// whether to prevent caching of an otherwise cachable response
544 bool
blockCaching()545 Client::blockCaching()
546 {
547     if (const Acl::Tree *acl = Config.accessList.storeMiss) {
548         // This relatively expensive check is not in StoreEntry::checkCachable:
549         // That method lacks HttpRequest and may be called too many times.
550         ACLFilledChecklist ch(acl, originalRequest(), NULL);
551         ch.reply = const_cast<HttpReply*>(entry->getReply()); // ACLFilledChecklist API bug
552         HTTPMSGLOCK(ch.reply);
553         if (!ch.fastCheck().allowed()) { // when in doubt, block
554             debugs(20, 3, "store_miss prohibits caching");
555             return true;
556         }
557     }
558     return false;
559 }
560 
561 HttpRequest *
originalRequest()562 Client::originalRequest()
563 {
564     return request;
565 }
566 
567 #if USE_ADAPTATION
568 /// Initiate an asynchronous adaptation transaction which will call us back.
569 void
startAdaptation(const Adaptation::ServiceGroupPointer & group,HttpRequest * cause)570 Client::startAdaptation(const Adaptation::ServiceGroupPointer &group, HttpRequest *cause)
571 {
572     debugs(11, 5, "Client::startAdaptation() called");
573     // check whether we should be sending a body as well
574     // start body pipe to feed ICAP transaction if needed
575     assert(!virginBodyDestination);
576     HttpReply *vrep = virginReply();
577     assert(!vrep->body_pipe);
578     int64_t size = 0;
579     if (vrep->expectingBody(cause->method, size) && size) {
580         virginBodyDestination = new BodyPipe(this);
581         vrep->body_pipe = virginBodyDestination;
582         debugs(93, 6, HERE << "will send virgin reply body to " <<
583                virginBodyDestination << "; size: " << size);
584         if (size > 0)
585             virginBodyDestination->setBodySize(size);
586     }
587 
588     adaptedHeadSource = initiateAdaptation(
589                             new Adaptation::Iterator(vrep, cause, fwd->al, group));
590     startedAdaptation = initiated(adaptedHeadSource);
591     Must(startedAdaptation);
592 }
593 
594 // properly cleans up ICAP-related state
595 // may be called multiple times
cleanAdaptation()596 void Client::cleanAdaptation()
597 {
598     debugs(11,5, HERE << "cleaning ICAP; ACL: " << adaptationAccessCheckPending);
599 
600     if (virginBodyDestination != NULL)
601         stopProducingFor(virginBodyDestination, false);
602 
603     announceInitiatorAbort(adaptedHeadSource);
604 
605     if (adaptedBodySource != NULL)
606         stopConsumingFrom(adaptedBodySource);
607 
608     if (!adaptationAccessCheckPending) // we cannot cancel a pending callback
609         assert(doneWithAdaptation()); // make sure the two methods are in sync
610 }
611 
612 bool
doneWithAdaptation() const613 Client::doneWithAdaptation() const
614 {
615     return !adaptationAccessCheckPending &&
616            !virginBodyDestination && !adaptedHeadSource && !adaptedBodySource;
617 }
618 
619 // sends virgin reply body to ICAP, buffering excesses if needed
620 void
adaptVirginReplyBody(const char * data,ssize_t len)621 Client::adaptVirginReplyBody(const char *data, ssize_t len)
622 {
623     assert(startedAdaptation);
624 
625     if (!virginBodyDestination) {
626         debugs(11,3, HERE << "ICAP does not want more virgin body");
627         return;
628     }
629 
630     // grow overflow area if already overflowed
631     if (responseBodyBuffer) {
632         responseBodyBuffer->append(data, len);
633         data = responseBodyBuffer->content();
634         len = responseBodyBuffer->contentSize();
635     }
636 
637     const ssize_t putSize = virginBodyDestination->putMoreData(data, len);
638     data += putSize;
639     len -= putSize;
640 
641     // if we had overflow area, shrink it as necessary
642     if (responseBodyBuffer) {
643         if (putSize == responseBodyBuffer->contentSize()) {
644             delete responseBodyBuffer;
645             responseBodyBuffer = NULL;
646         } else {
647             responseBodyBuffer->consume(putSize);
648         }
649         return;
650     }
651 
652     // if we did not have an overflow area, create it as needed
653     if (len > 0) {
654         assert(!responseBodyBuffer);
655         responseBodyBuffer = new MemBuf;
656         responseBodyBuffer->init(4096, SQUID_TCP_SO_RCVBUF * 10);
657         responseBodyBuffer->append(data, len);
658     }
659 }
660 
661 // can supply more virgin response body data
662 void
noteMoreBodySpaceAvailable(BodyPipe::Pointer)663 Client::noteMoreBodySpaceAvailable(BodyPipe::Pointer)
664 {
665     if (responseBodyBuffer) {
666         addVirginReplyBody(NULL, 0); // kick the buffered fragment alive again
667         if (completed && !responseBodyBuffer) {
668             serverComplete2();
669             return;
670         }
671     }
672     maybeReadVirginBody();
673 }
674 
675 // the consumer of our virgin response body aborted
676 void
noteBodyConsumerAborted(BodyPipe::Pointer)677 Client::noteBodyConsumerAborted(BodyPipe::Pointer)
678 {
679     stopProducingFor(virginBodyDestination, false);
680 
681     // do not force closeServer here in case we need to bypass AdaptationQueryAbort
682 
683     if (doneWithAdaptation()) // we may still be receiving adapted response
684         handleAdaptationCompleted();
685 }
686 
687 // received adapted response headers (body may follow)
688 void
noteAdaptationAnswer(const Adaptation::Answer & answer)689 Client::noteAdaptationAnswer(const Adaptation::Answer &answer)
690 {
691     clearAdaptation(adaptedHeadSource); // we do not expect more messages
692 
693     switch (answer.kind) {
694     case Adaptation::Answer::akForward:
695         handleAdaptedHeader(const_cast<HttpMsg*>(answer.message.getRaw()));
696         break;
697 
698     case Adaptation::Answer::akBlock:
699         handleAdaptationBlocked(answer);
700         break;
701 
702     case Adaptation::Answer::akError:
703         handleAdaptationAborted(!answer.final);
704         break;
705     }
706 }
707 
708 void
handleAdaptedHeader(HttpMsg * msg)709 Client::handleAdaptedHeader(HttpMsg *msg)
710 {
711     if (abortOnBadEntry("entry went bad while waiting for adapted headers")) {
712         // If the adapted response has a body, the ICAP side needs to know
713         // that nobody will consume that body. We will be destroyed upon
714         // return. Tell the ICAP side that it is on its own.
715         HttpReply *rep = dynamic_cast<HttpReply*>(msg);
716         assert(rep);
717         if (rep->body_pipe != NULL)
718             rep->body_pipe->expectNoConsumption();
719 
720         return;
721     }
722 
723     HttpReply *rep = dynamic_cast<HttpReply*>(msg);
724     assert(rep);
725     debugs(11,5, HERE << this << " setting adapted reply to " << rep);
726     setFinalReply(rep);
727 
728     assert(!adaptedBodySource);
729     if (rep->body_pipe != NULL) {
730         // subscribe to receive adapted body
731         adaptedBodySource = rep->body_pipe;
732         // assume that ICAP does not auto-consume on failures
733         const bool result = adaptedBodySource->setConsumerIfNotLate(this);
734         assert(result);
735     } else {
736         // no body
737         if (doneWithAdaptation()) // we may still be sending virgin response
738             handleAdaptationCompleted();
739     }
740 }
741 
742 void
resumeBodyStorage()743 Client::resumeBodyStorage()
744 {
745     if (abortOnBadEntry("store entry aborted while kick producer callback"))
746         return;
747 
748     if (!adaptedBodySource)
749         return;
750 
751     handleMoreAdaptedBodyAvailable();
752 
753     if (adaptedBodySource != NULL && adaptedBodySource->exhausted())
754         endAdaptedBodyConsumption();
755 }
756 
757 // more adapted response body is available
758 void
handleMoreAdaptedBodyAvailable()759 Client::handleMoreAdaptedBodyAvailable()
760 {
761     if (abortOnBadEntry("entry refuses adapted body"))
762         return;
763 
764     assert(entry);
765 
766     size_t contentSize = adaptedBodySource->buf().contentSize();
767 
768     if (!contentSize)
769         return; // XXX: bytesWanted asserts on zero-size ranges
770 
771     const size_t spaceAvailable = entry->bytesWanted(Range<size_t>(0, contentSize), true);
772 
773     if (spaceAvailable < contentSize ) {
774         // No or partial body data consuming
775         typedef NullaryMemFunT<Client> Dialer;
776         AsyncCall::Pointer call = asyncCall(93, 5, "Client::resumeBodyStorage",
777                                             Dialer(this, &Client::resumeBodyStorage));
778         entry->deferProducer(call);
779     }
780 
781     if (!spaceAvailable)  {
782         debugs(11, 5, HERE << "NOT storing " << contentSize << " bytes of adapted " <<
783                "response body at offset " << adaptedBodySource->consumedSize());
784         return;
785     }
786 
787     if (spaceAvailable < contentSize ) {
788         debugs(11, 5, HERE << "postponing storage of " <<
789                (contentSize - spaceAvailable) << " body bytes");
790         contentSize = spaceAvailable;
791     }
792 
793     debugs(11,5, HERE << "storing " << contentSize << " bytes of adapted " <<
794            "response body at offset " << adaptedBodySource->consumedSize());
795 
796     BodyPipeCheckout bpc(*adaptedBodySource);
797     const StoreIOBuffer ioBuf(&bpc.buf, currentOffset, contentSize);
798     currentOffset += ioBuf.length;
799     entry->write(ioBuf);
800     bpc.buf.consume(contentSize);
801     bpc.checkIn();
802 }
803 
804 // the entire adapted response body was produced, successfully
805 void
handleAdaptedBodyProductionEnded()806 Client::handleAdaptedBodyProductionEnded()
807 {
808     if (abortOnBadEntry("entry went bad while waiting for adapted body eof"))
809         return;
810 
811     // end consumption if we consumed everything
812     if (adaptedBodySource != NULL && adaptedBodySource->exhausted())
813         endAdaptedBodyConsumption();
814     // else resumeBodyStorage() will eventually consume the rest
815 }
816 
817 void
endAdaptedBodyConsumption()818 Client::endAdaptedBodyConsumption()
819 {
820     stopConsumingFrom(adaptedBodySource);
821     handleAdaptationCompleted();
822 }
823 
824 // premature end of the adapted response body
handleAdaptedBodyProducerAborted()825 void Client::handleAdaptedBodyProducerAborted()
826 {
827     if (abortOnBadEntry("entry went bad while waiting for the now-aborted adapted body"))
828         return;
829 
830     Must(adaptedBodySource != nullptr);
831     if (!adaptedBodySource->exhausted()) {
832         debugs(11,5, "waiting to consume the remainder of the aborted adapted body");
833         return; // resumeBodyStorage() should eventually consume the rest
834     }
835 
836     stopConsumingFrom(adaptedBodySource);
837 
838     if (handledEarlyAdaptationAbort())
839         return;
840 
841     entry->lengthWentBad("body adaptation aborted");
842     handleAdaptationCompleted(); // the user should get a truncated response
843 }
844 
845 // common part of noteAdaptationAnswer and handleAdaptedBodyProductionEnded
846 void
handleAdaptationCompleted()847 Client::handleAdaptationCompleted()
848 {
849     debugs(11,5, HERE << "handleAdaptationCompleted");
850     cleanAdaptation();
851 
852     // We stop reading origin response because we have no place to put it(*) and
853     // cannot use it. If some origin servers do not like that or if we want to
854     // reuse more pconns, we can add code to discard unneeded origin responses.
855     // (*) TODO: Is it possible that the adaptation xaction is still running?
856     if (mayReadVirginReplyBody()) {
857         debugs(11,3, HERE << "closing origin conn due to ICAP completion");
858         closeServer();
859     }
860 
861     completeForwarding();
862 }
863 
864 // common part of noteAdaptation*Aborted and noteBodyConsumerAborted methods
865 void
handleAdaptationAborted(bool bypassable)866 Client::handleAdaptationAborted(bool bypassable)
867 {
868     debugs(11,5, HERE << "handleAdaptationAborted; bypassable: " << bypassable <<
869            ", entry empty: " << entry->isEmpty());
870 
871     if (abortOnBadEntry("entry went bad while ICAP aborted"))
872         return;
873 
874     // TODO: bypass if possible
875     if (!handledEarlyAdaptationAbort())
876         abortAll("adaptation failure with a filled entry");
877 }
878 
879 /// If the store entry is still empty, fully handles adaptation abort, returning
880 /// true. Otherwise just updates the request error detail and returns false.
881 bool
handledEarlyAdaptationAbort()882 Client::handledEarlyAdaptationAbort()
883 {
884     if (entry->isEmpty()) {
885         debugs(11,8, "adaptation failure with an empty entry: " << *entry);
886         ErrorState *err = new ErrorState(ERR_ICAP_FAILURE, Http::scInternalServerError, request);
887         err->detailError(ERR_DETAIL_ICAP_RESPMOD_EARLY);
888         fwd->fail(err);
889         fwd->dontRetry(true);
890         abortAll("adaptation failure with an empty entry");
891         return true; // handled
892     }
893 
894     if (request) // update logged info directly
895         request->detailError(ERR_ICAP_FAILURE, ERR_DETAIL_ICAP_RESPMOD_LATE);
896 
897     return false; // the caller must handle
898 }
899 
900 // adaptation service wants us to deny HTTP client access to this response
901 void
handleAdaptationBlocked(const Adaptation::Answer & answer)902 Client::handleAdaptationBlocked(const Adaptation::Answer &answer)
903 {
904     debugs(11,5, HERE << answer.ruleId);
905 
906     if (abortOnBadEntry("entry went bad while ICAP aborted"))
907         return;
908 
909     if (!entry->isEmpty()) { // too late to block (should not really happen)
910         if (request)
911             request->detailError(ERR_ICAP_FAILURE, ERR_DETAIL_RESPMOD_BLOCK_LATE);
912         abortAll("late adaptation block");
913         return;
914     }
915 
916     debugs(11,7, HERE << "creating adaptation block response");
917 
918     err_type page_id =
919         aclGetDenyInfoPage(&Config.denyInfoList, answer.ruleId.termedBuf(), 1);
920     if (page_id == ERR_NONE)
921         page_id = ERR_ACCESS_DENIED;
922 
923     ErrorState *err = new ErrorState(page_id, Http::scForbidden, request);
924     err->detailError(ERR_DETAIL_RESPMOD_BLOCK_EARLY);
925     fwd->fail(err);
926     fwd->dontRetry(true);
927 
928     abortOnData("timely adaptation block");
929 }
930 
931 void
noteAdaptationAclCheckDone(Adaptation::ServiceGroupPointer group)932 Client::noteAdaptationAclCheckDone(Adaptation::ServiceGroupPointer group)
933 {
934     adaptationAccessCheckPending = false;
935 
936     if (abortOnBadEntry("entry went bad while waiting for ICAP ACL check"))
937         return;
938 
939     // TODO: Should non-ICAP and ICAP REPMOD pre-cache paths check this?
940     // That check now only happens on REQMOD pre-cache and REPMOD post-cache, in processReplyAccess().
941     if (virginReply()->expectedBodyTooLarge(*request)) {
942         sendBodyIsTooLargeError();
943         return;
944     }
945     // TODO: Should we check receivedBodyTooLarge as well?
946 
947     if (!group) {
948         debugs(11,3, HERE << "no adapation needed");
949         setFinalReply(virginReply());
950         processReplyBody();
951         return;
952     }
953 
954     startAdaptation(group, originalRequest());
955     processReplyBody();
956 }
957 #endif
958 
959 void
sendBodyIsTooLargeError()960 Client::sendBodyIsTooLargeError()
961 {
962     ErrorState *err = new ErrorState(ERR_TOO_BIG, Http::scForbidden, request);
963     fwd->fail(err);
964     fwd->dontRetry(true);
965     abortOnData("Virgin body too large.");
966 }
967 
968 // TODO: when HttpStateData sends all errors to ICAP,
969 // we should be able to move this at the end of setVirginReply().
970 void
adaptOrFinalizeReply()971 Client::adaptOrFinalizeReply()
972 {
973 #if USE_ADAPTATION
974     // TODO: merge with client side and return void to hide the on/off logic?
975     // The callback can be called with a NULL service if adaptation is off.
976     adaptationAccessCheckPending = Adaptation::AccessCheck::Start(
977                                        Adaptation::methodRespmod, Adaptation::pointPreCache,
978                                        originalRequest(), virginReply(), fwd->al, this);
979     debugs(11,5, HERE << "adaptationAccessCheckPending=" << adaptationAccessCheckPending);
980     if (adaptationAccessCheckPending)
981         return;
982 #endif
983 
984     setFinalReply(virginReply());
985 }
986 
987 /// initializes bodyBytesRead stats if needed and applies delta
988 void
adjustBodyBytesRead(const int64_t delta)989 Client::adjustBodyBytesRead(const int64_t delta)
990 {
991     int64_t &bodyBytesRead = originalRequest()->hier.bodyBytesRead;
992 
993     // if we got here, do not log a dash even if we got nothing from the server
994     if (bodyBytesRead < 0)
995         bodyBytesRead = 0;
996 
997     bodyBytesRead += delta; // supports negative and zero deltas
998 
999     // check for overflows ("infinite" response?) and undeflows (a bug)
1000     Must(bodyBytesRead >= 0);
1001 }
1002 
1003 void
addVirginReplyBody(const char * data,ssize_t len)1004 Client::addVirginReplyBody(const char *data, ssize_t len)
1005 {
1006     adjustBodyBytesRead(len);
1007 
1008 #if USE_ADAPTATION
1009     assert(!adaptationAccessCheckPending); // or would need to buffer while waiting
1010     if (startedAdaptation) {
1011         adaptVirginReplyBody(data, len);
1012         return;
1013     }
1014 #endif
1015     storeReplyBody(data, len);
1016 }
1017 
1018 // writes virgin or adapted reply body to store
1019 void
storeReplyBody(const char * data,ssize_t len)1020 Client::storeReplyBody(const char *data, ssize_t len)
1021 {
1022     // write even if len is zero to push headers towards the client side
1023     entry->write (StoreIOBuffer(len, currentOffset, (char*)data));
1024 
1025     currentOffset += len;
1026 }
1027 
1028 size_t
calcBufferSpaceToReserve(size_t space,const size_t wantSpace) const1029 Client::calcBufferSpaceToReserve(size_t space, const size_t wantSpace) const
1030 {
1031     if (space < wantSpace) {
1032         const size_t maxSpace = SBuf::maxSize; // absolute best
1033         space = min(wantSpace, maxSpace); // do not promise more than asked
1034     }
1035 
1036 #if USE_ADAPTATION
1037     if (responseBodyBuffer) {
1038         return 0;   // Stop reading if already overflowed waiting for ICAP to catch up
1039     }
1040 
1041     if (virginBodyDestination != NULL) {
1042         /*
1043          * BodyPipe buffer has a finite size limit.  We
1044          * should not read more data from the network than will fit
1045          * into the pipe buffer or we _lose_ what did not fit if
1046          * the response ends sooner that BodyPipe frees up space:
1047          * There is no code to keep pumping data into the pipe once
1048          * response ends and serverComplete() is called.
1049          */
1050         const size_t adaptor_space = virginBodyDestination->buf().potentialSpaceSize();
1051 
1052         debugs(11,9, "Client may read up to min(" <<
1053                adaptor_space << ", " << space << ") bytes");
1054 
1055         if (adaptor_space < space)
1056             space = adaptor_space;
1057     }
1058 #endif
1059 
1060     return space;
1061 }
1062 
1063 size_t
replyBodySpace(const MemBuf & readBuf,const size_t minSpace) const1064 Client::replyBodySpace(const MemBuf &readBuf, const size_t minSpace) const
1065 {
1066     size_t space = readBuf.spaceSize(); // available space w/o heroic measures
1067     if (space < minSpace) {
1068         const size_t maxSpace = readBuf.potentialSpaceSize(); // absolute best
1069         space = min(minSpace, maxSpace); // do not promise more than asked
1070     }
1071 
1072 #if USE_ADAPTATION
1073     if (responseBodyBuffer) {
1074         return 0;   // Stop reading if already overflowed waiting for ICAP to catch up
1075     }
1076 
1077     if (virginBodyDestination != NULL) {
1078         /*
1079          * BodyPipe buffer has a finite size limit.  We
1080          * should not read more data from the network than will fit
1081          * into the pipe buffer or we _lose_ what did not fit if
1082          * the response ends sooner that BodyPipe frees up space:
1083          * There is no code to keep pumping data into the pipe once
1084          * response ends and serverComplete() is called.
1085          *
1086          * If the pipe is totally full, don't register the read handler.
1087          * The BodyPipe will call our noteMoreBodySpaceAvailable() method
1088          * when it has free space again.
1089          */
1090         size_t adaptation_space =
1091             virginBodyDestination->buf().potentialSpaceSize();
1092 
1093         debugs(11,9, "Client may read up to min(" <<
1094                adaptation_space << ", " << space << ") bytes");
1095 
1096         if (adaptation_space < space)
1097             space = adaptation_space;
1098     }
1099 #endif
1100 
1101     return space;
1102 }
1103 
1104