1 /*
2  * Copyright (C) 1996-2021 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 /* DEBUG: section 90    Storage Manager Client-Side Interface */
10 
11 #include "squid.h"
12 #include "event.h"
13 #include "globals.h"
14 #include "HttpReply.h"
15 #include "HttpRequest.h"
16 #include "MemBuf.h"
17 #include "MemObject.h"
18 #include "mime_header.h"
19 #include "profiler/Profiler.h"
20 #include "SquidConfig.h"
21 #include "StatCounters.h"
22 #include "Store.h"
23 #include "store_swapin.h"
24 #include "StoreClient.h"
25 #include "StoreMeta.h"
26 #include "StoreMetaUnpacker.h"
27 #if USE_DELAY_POOLS
28 #include "DelayPools.h"
29 #endif
30 
31 /*
32  * NOTE: 'Header' refers to the swapfile metadata header.
33  *   'OBJHeader' refers to the object header, with cannonical
34  *   processed object headers (which may derive from FTP/HTTP etc
35  *   upstream protocols
36  *       'Body' refers to the swapfile body, which is the full
37  *        HTTP reply (including HTTP headers and body).
38  */
39 static StoreIOState::STRCB storeClientReadBody;
40 static StoreIOState::STRCB storeClientReadHeader;
41 static void storeClientCopy2(StoreEntry * e, store_client * sc);
42 static EVH storeClientCopyEvent;
43 static bool CheckQuickAbortIsReasonable(StoreEntry * entry);
44 
45 CBDATA_CLASS_INIT(store_client);
46 
47 bool
memReaderHasLowerOffset(int64_t anOffset) const48 store_client::memReaderHasLowerOffset(int64_t anOffset) const
49 {
50     return getType() == STORE_MEM_CLIENT && copyInto.offset < anOffset;
51 }
52 
53 int
getType() const54 store_client::getType() const
55 {
56     return type;
57 }
58 
59 #if STORE_CLIENT_LIST_DEBUG
60 static store_client *
storeClientListSearch(const MemObject * mem,void * data)61 storeClientListSearch(const MemObject * mem, void *data)
62 {
63     dlink_node *node;
64     store_client *sc = NULL;
65 
66     for (node = mem->clients.head; node; node = node->next) {
67         sc = node->data;
68 
69         if (sc->owner == data)
70             return sc;
71     }
72 
73     return NULL;
74 }
75 
76 int
storeClientIsThisAClient(store_client * sc,void * someClient)77 storeClientIsThisAClient(store_client * sc, void *someClient)
78 {
79     return sc->owner == someClient;
80 }
81 
82 #endif
83 #include "HttpRequest.h"
84 
85 /* add client with fd to client list */
86 store_client *
storeClientListAdd(StoreEntry * e,void * data)87 storeClientListAdd(StoreEntry * e, void *data)
88 {
89     MemObject *mem = e->mem_obj;
90     store_client *sc;
91     assert(mem);
92 #if STORE_CLIENT_LIST_DEBUG
93 
94     if (storeClientListSearch(mem, data) != NULL)
95         /* XXX die! */
96         assert(1 == 0);
97 
98 #endif
99 
100     sc = new store_client (e);
101 
102     mem->addClient(sc);
103 
104     return sc;
105 }
106 
107 void
callback(ssize_t sz,bool error)108 store_client::callback(ssize_t sz, bool error)
109 {
110     size_t bSz = 0;
111 
112     if (sz >= 0 && !error)
113         bSz = sz;
114 
115     StoreIOBuffer result(bSz, 0 ,copyInto.data);
116 
117     if (sz < 0 || error)
118         result.flags.error = 1;
119 
120     result.offset = cmp_offset;
121     assert(_callback.pending());
122     cmp_offset = copyInto.offset + bSz;
123     STCB *temphandler = _callback.callback_handler;
124     void *cbdata = _callback.callback_data;
125     _callback = Callback(NULL, NULL);
126     copyInto.data = NULL;
127 
128     if (cbdataReferenceValid(cbdata))
129         temphandler(cbdata, result);
130 
131     cbdataReferenceDone(cbdata);
132 }
133 
134 static void
storeClientCopyEvent(void * data)135 storeClientCopyEvent(void *data)
136 {
137     store_client *sc = (store_client *)data;
138     debugs(90, 3, "storeClientCopyEvent: Running");
139     assert (sc->flags.copy_event_pending);
140     sc->flags.copy_event_pending = false;
141 
142     if (!sc->_callback.pending())
143         return;
144 
145     storeClientCopy2(sc->entry, sc);
146 }
147 
store_client(StoreEntry * e)148 store_client::store_client(StoreEntry *e) :
149     cmp_offset(0),
150 #if STORE_CLIENT_LIST_DEBUG
151     owner(cbdataReference(data)),
152 #endif
153     entry(e),
154     type(e->storeClientType()),
155     object_ok(true)
156 {
157     flags.disk_io_pending = false;
158     flags.store_copying = false;
159     flags.copy_event_pending = false;
160     ++ entry->refcount;
161 
162     if (getType() == STORE_DISK_CLIENT) {
163         /* assert we'll be able to get the data we want */
164         /* maybe we should open swapin_sio here */
165         assert(entry->hasDisk() && !entry->swapoutFailed());
166     }
167 }
168 
~store_client()169 store_client::~store_client()
170 {}
171 
172 /* copy bytes requested by the client */
173 void
storeClientCopy(store_client * sc,StoreEntry * e,StoreIOBuffer copyInto,STCB * callback,void * data)174 storeClientCopy(store_client * sc,
175                 StoreEntry * e,
176                 StoreIOBuffer copyInto,
177                 STCB * callback,
178                 void *data)
179 {
180     assert (sc != NULL);
181     sc->copy(e, copyInto,callback,data);
182 }
183 
184 void
copy(StoreEntry * anEntry,StoreIOBuffer copyRequest,STCB * callback_fn,void * data)185 store_client::copy(StoreEntry * anEntry,
186                    StoreIOBuffer copyRequest,
187                    STCB * callback_fn,
188                    void *data)
189 {
190     assert (anEntry == entry);
191     assert (callback_fn);
192     assert (data);
193     assert(!EBIT_TEST(entry->flags, ENTRY_ABORTED));
194     debugs(90, 3, "store_client::copy: " << entry->getMD5Text() << ", from " <<
195            copyRequest.offset << ", for length " <<
196            (int) copyRequest.length << ", cb " << callback_fn << ", cbdata " <<
197            data);
198 
199 #if STORE_CLIENT_LIST_DEBUG
200 
201     assert(this == storeClientListSearch(entry->mem_obj, data));
202 #endif
203 
204     assert(!_callback.pending());
205 #if ONLYCONTIGUOUSREQUESTS
206 
207     assert(cmp_offset == copyRequest.offset);
208 #endif
209     /* range requests will skip into the body */
210     cmp_offset = copyRequest.offset;
211     _callback = Callback (callback_fn, cbdataReference(data));
212     copyInto.data = copyRequest.data;
213     copyInto.length = copyRequest.length;
214     copyInto.offset = copyRequest.offset;
215 
216     static bool copying (false);
217     assert (!copying);
218     copying = true;
219     PROF_start(storeClient_kickReads);
220     /* we might be blocking comm reads due to readahead limits
221      * now we have a new offset, trigger those reads...
222      */
223     entry->mem_obj->kickReads();
224     PROF_stop(storeClient_kickReads);
225     copying = false;
226 
227     anEntry->lock("store_client::copy"); // see deletion note below
228 
229     storeClientCopy2(entry, this);
230 
231     // Bug 3480: This store_client object may be deleted now if, for example,
232     // the client rejects the hit response copied above. Use on-stack pointers!
233 
234 #if USE_ADAPTATION
235     anEntry->kickProducer();
236 #endif
237     anEntry->unlock("store_client::copy");
238 
239     // Add no code here. This object may no longer exist.
240 }
241 
242 /// Whether there is (or will be) more entry data for us.
243 bool
moreToSend() const244 store_client::moreToSend() const
245 {
246     if (entry->store_status == STORE_PENDING)
247         return true; // there may be more coming
248 
249     /* STORE_OK, including aborted entries: no more data is coming */
250 
251     const int64_t len = entry->objectLen();
252 
253     // If we do not know the entry length, then we have to open the swap file.
254     const bool canSwapIn = entry->hasDisk();
255     if (len < 0)
256         return canSwapIn;
257 
258     if (copyInto.offset >= len)
259         return false; // sent everything there is
260 
261     if (canSwapIn)
262         return true; // if we lack prefix, we can swap it in
263 
264     // If we cannot swap in, make sure we have what we want in RAM. Otherwise,
265     // scheduleRead calls scheduleDiskRead which asserts without a swap file.
266     const MemObject *mem = entry->mem_obj;
267     return mem &&
268            mem->inmem_lo <= copyInto.offset && copyInto.offset < mem->endOffset();
269 }
270 
271 static void
storeClientCopy2(StoreEntry * e,store_client * sc)272 storeClientCopy2(StoreEntry * e, store_client * sc)
273 {
274     /* reentrancy not allowed  - note this could lead to
275      * dropped events
276      */
277 
278     if (sc->flags.copy_event_pending) {
279         return;
280     }
281 
282     if (sc->flags.store_copying) {
283         sc->flags.copy_event_pending = true;
284         debugs(90, 3, "storeClientCopy2: Queueing storeClientCopyEvent()");
285         eventAdd("storeClientCopyEvent", storeClientCopyEvent, sc, 0.0, 0);
286         return;
287     }
288 
289     debugs(90, 3, "storeClientCopy2: " << e->getMD5Text());
290     assert(sc->_callback.pending());
291     /*
292      * We used to check for ENTRY_ABORTED here.  But there were some
293      * problems.  For example, we might have a slow client (or two) and
294      * the peer server is reading far ahead and swapping to disk.  Even
295      * if the peer aborts, we want to give the client(s)
296      * everything we got before the abort condition occurred.
297      */
298     /* Warning: doCopy may indirectly free itself in callbacks,
299      * hence the lock to keep it active for the duration of
300      * this function
301      * XXX: Locking does not prevent calling sc destructor (it only prevents
302      * freeing sc memory) so sc may become invalid from C++ p.o.v.
303      */
304     CbcPointer<store_client> tmpLock = sc;
305     assert (!sc->flags.store_copying);
306     sc->doCopy(e);
307     assert(!sc->flags.store_copying);
308 }
309 
310 void
doCopy(StoreEntry * anEntry)311 store_client::doCopy(StoreEntry *anEntry)
312 {
313     assert (anEntry == entry);
314     flags.store_copying = true;
315     MemObject *mem = entry->mem_obj;
316 
317     debugs(33, 5, "store_client::doCopy: co: " <<
318            copyInto.offset << ", hi: " <<
319            mem->endOffset());
320 
321     if (!moreToSend()) {
322         /* There is no more to send! */
323         debugs(33, 3, HERE << "There is no more to send!");
324         callback(0);
325         flags.store_copying = false;
326         return;
327     }
328 
329     /* Check that we actually have data */
330     if (anEntry->store_status == STORE_PENDING && copyInto.offset >= mem->endOffset()) {
331         debugs(90, 3, "store_client::doCopy: Waiting for more");
332         flags.store_copying = false;
333         return;
334     }
335 
336     /*
337      * Slight weirdness here.  We open a swapin file for any
338      * STORE_DISK_CLIENT, even if we can copy the requested chunk
339      * from memory in the next block.  We must try to open the
340      * swapin file before sending any data to the client side.  If
341      * we postpone the open, and then can not open the file later
342      * on, the client loses big time.  Its transfer just gets cut
343      * off.  Better to open it early (while the client side handler
344      * is clientCacheHit) so that we can fall back to a cache miss
345      * if needed.
346      */
347 
348     if (STORE_DISK_CLIENT == getType() && swapin_sio == NULL) {
349         if (!startSwapin())
350             return; // failure
351     }
352     scheduleRead();
353 }
354 
355 /// opens the swapin "file" if possible; otherwise, fail()s and returns false
356 bool
startSwapin()357 store_client::startSwapin()
358 {
359     debugs(90, 3, "store_client::doCopy: Need to open swap in file");
360     /* gotta open the swapin file */
361 
362     if (storeTooManyDiskFilesOpen()) {
363         /* yuck -- this causes a TCP_SWAPFAIL_MISS on the client side */
364         fail();
365         flags.store_copying = false;
366         return false;
367     } else if (!flags.disk_io_pending) {
368         /* Don't set store_io_pending here */
369         storeSwapInStart(this);
370 
371         if (swapin_sio == NULL) {
372             fail();
373             flags.store_copying = false;
374             return false;
375         }
376 
377         return true;
378     } else {
379         debugs(90, DBG_IMPORTANT, "WARNING: Averted multiple fd operation (1)");
380         flags.store_copying = false;
381         return false;
382     }
383 }
384 
385 void
scheduleRead()386 store_client::scheduleRead()
387 {
388     MemObject *mem = entry->mem_obj;
389 
390     if (copyInto.offset >= mem->inmem_lo && copyInto.offset < mem->endOffset())
391         scheduleMemRead();
392     else
393         scheduleDiskRead();
394 }
395 
396 void
scheduleDiskRead()397 store_client::scheduleDiskRead()
398 {
399     /* What the client wants is not in memory. Schedule a disk read */
400     if (getType() == STORE_DISK_CLIENT) {
401         // we should have called startSwapin() already
402         assert(swapin_sio != NULL);
403     } else if (!swapin_sio && !startSwapin()) {
404         debugs(90, 3, "bailing after swapin start failure for " << *entry);
405         assert(!flags.store_copying);
406         return;
407     }
408 
409     assert(!flags.disk_io_pending);
410 
411     debugs(90, 3, "reading " << *entry << " from disk");
412 
413     fileRead();
414 
415     flags.store_copying = false;
416 }
417 
418 void
scheduleMemRead()419 store_client::scheduleMemRead()
420 {
421     /* What the client wants is in memory */
422     /* Old style */
423     debugs(90, 3, "store_client::doCopy: Copying normal from memory");
424     size_t sz = entry->mem_obj->data_hdr.copy(copyInto);
425     callback(sz);
426     flags.store_copying = false;
427 }
428 
429 void
fileRead()430 store_client::fileRead()
431 {
432     MemObject *mem = entry->mem_obj;
433 
434     assert(_callback.pending());
435     assert(!flags.disk_io_pending);
436     flags.disk_io_pending = true;
437 
438     if (mem->swap_hdr_sz != 0)
439         if (entry->swappingOut())
440             assert(mem->swapout.sio->offset() > copyInto.offset + (int64_t)mem->swap_hdr_sz);
441 
442     storeRead(swapin_sio,
443               copyInto.data,
444               copyInto.length,
445               copyInto.offset + mem->swap_hdr_sz,
446               mem->swap_hdr_sz == 0 ? storeClientReadHeader
447               : storeClientReadBody,
448               this);
449 }
450 
451 void
readBody(const char *,ssize_t len)452 store_client::readBody(const char *, ssize_t len)
453 {
454     int parsed_header = 0;
455 
456     // Don't assert disk_io_pending here.. may be called by read_header
457     flags.disk_io_pending = false;
458     assert(_callback.pending());
459     debugs(90, 3, "storeClientReadBody: len " << len << "");
460 
461     if (len < 0)
462         return fail();
463 
464     if (copyInto.offset == 0 && len > 0 && entry->getReply()->sline.status() == Http::scNone) {
465         /* Our structure ! */
466         HttpReply *rep = (HttpReply *) entry->getReply(); // bypass const
467 
468         if (!rep->parseCharBuf(copyInto.data, headersEnd(copyInto.data, len))) {
469             debugs(90, DBG_CRITICAL, "Could not parse headers from on disk object");
470         } else {
471             parsed_header = 1;
472         }
473     }
474 
475     const HttpReply *rep = entry->getReply();
476     if (len > 0 && rep && entry->mem_obj->inmem_lo == 0 && entry->objectLen() <= (int64_t)Config.Store.maxInMemObjSize && Config.onoff.memory_cache_disk) {
477         storeGetMemSpace(len);
478         // The above may start to free our object so we need to check again
479         if (entry->mem_obj->inmem_lo == 0) {
480             /* Copy read data back into memory.
481              * copyInto.offset includes headers, which is what mem cache needs
482              */
483             int64_t mem_offset = entry->mem_obj->endOffset();
484             if ((copyInto.offset == mem_offset) || (parsed_header && mem_offset == rep->hdr_sz)) {
485                 entry->mem_obj->write(StoreIOBuffer(len, copyInto.offset, copyInto.data));
486             }
487         }
488     }
489 
490     callback(len);
491 }
492 
493 void
fail()494 store_client::fail()
495 {
496     object_ok = false;
497     /* synchronous open failures callback from the store,
498      * before startSwapin detects the failure.
499      * TODO: fix this inconsistent behaviour - probably by
500      * having storeSwapInStart become a callback functions,
501      * not synchronous
502      */
503 
504     if (_callback.pending())
505         callback(0, true);
506 }
507 
508 static void
storeClientReadHeader(void * data,const char * buf,ssize_t len,StoreIOState::Pointer)509 storeClientReadHeader(void *data, const char *buf, ssize_t len, StoreIOState::Pointer)
510 {
511     store_client *sc = (store_client *)data;
512     sc->readHeader(buf, len);
513 }
514 
515 static void
storeClientReadBody(void * data,const char * buf,ssize_t len,StoreIOState::Pointer)516 storeClientReadBody(void *data, const char *buf, ssize_t len, StoreIOState::Pointer)
517 {
518     store_client *sc = (store_client *)data;
519     sc->readBody(buf, len);
520 }
521 
522 bool
unpackHeader(char const * buf,ssize_t len)523 store_client::unpackHeader(char const *buf, ssize_t len)
524 {
525     debugs(90, 3, "store_client::unpackHeader: len " << len << "");
526     assert(len >= 0);
527 
528     int swap_hdr_sz = 0;
529     tlv *tlv_list = nullptr;
530     try {
531         StoreMetaUnpacker aBuilder(buf, len, &swap_hdr_sz);
532         tlv_list = aBuilder.createStoreMeta();
533     } catch (const std::exception &e) {
534         debugs(90, DBG_IMPORTANT, "WARNING: failed to unpack metadata because " << e.what());
535         return false;
536     }
537     assert(tlv_list);
538 
539     /*
540      * Check the meta data and make sure we got the right object.
541      */
542     for (tlv *t = tlv_list; t; t = t->next) {
543         if (!t->checkConsistency(entry)) {
544             storeSwapTLVFree(tlv_list);
545             return false;
546         }
547     }
548 
549     storeSwapTLVFree(tlv_list);
550 
551     assert(swap_hdr_sz >= 0);
552     entry->mem_obj->swap_hdr_sz = swap_hdr_sz;
553     if (entry->swap_file_sz > 0) { // collapsed hits may not know swap_file_sz
554         assert(entry->swap_file_sz >= static_cast<uint64_t>(swap_hdr_sz));
555         entry->mem_obj->object_sz = entry->swap_file_sz - swap_hdr_sz;
556     }
557     debugs(90, 5, "store_client::unpackHeader: swap_file_sz=" <<
558            entry->swap_file_sz << "( " << swap_hdr_sz << " + " <<
559            entry->mem_obj->object_sz << ")");
560     return true;
561 }
562 
563 void
readHeader(char const * buf,ssize_t len)564 store_client::readHeader(char const *buf, ssize_t len)
565 {
566     MemObject *const mem = entry->mem_obj;
567 
568     assert(flags.disk_io_pending);
569     flags.disk_io_pending = false;
570     assert(_callback.pending());
571 
572     // abort if we fail()'d earlier
573     if (!object_ok)
574         return;
575 
576     if (len < 0)
577         return fail();
578 
579     if (!unpackHeader(buf, len)) {
580         fail();
581         return;
582     }
583 
584     /*
585      * If our last read got some data the client wants, then give
586      * it to them, otherwise schedule another read.
587      */
588     size_t body_sz = len - mem->swap_hdr_sz;
589 
590     if (copyInto.offset < static_cast<int64_t>(body_sz)) {
591         /*
592          * we have (part of) what they want
593          */
594         size_t copy_sz = min(copyInto.length, body_sz);
595         debugs(90, 3, "storeClientReadHeader: copying " << copy_sz << " bytes of body");
596         memmove(copyInto.data, copyInto.data + mem->swap_hdr_sz, copy_sz);
597 
598         readBody(copyInto.data, copy_sz);
599 
600         return;
601     }
602 
603     /*
604      * we don't have what the client wants, but at least we now
605      * know the swap header size.
606      */
607     fileRead();
608 }
609 
610 int
storeClientCopyPending(store_client * sc,StoreEntry * e,void * data)611 storeClientCopyPending(store_client * sc, StoreEntry * e, void *data)
612 {
613 #if STORE_CLIENT_LIST_DEBUG
614     assert(sc == storeClientListSearch(e->mem_obj, data));
615 #endif
616 #ifndef SILLY_CODE
617 
618     assert(sc);
619 #endif
620 
621     assert(sc->entry == e);
622 #if SILLY_CODE
623 
624     if (sc == NULL)
625         return 0;
626 
627 #endif
628 
629     if (!sc->_callback.pending())
630         return 0;
631 
632     return 1;
633 }
634 
635 /*
636  * This routine hasn't been optimised to take advantage of the
637  * passed sc. Yet.
638  */
639 int
storeUnregister(store_client * sc,StoreEntry * e,void * data)640 storeUnregister(store_client * sc, StoreEntry * e, void *data)
641 {
642     MemObject *mem = e->mem_obj;
643 #if STORE_CLIENT_LIST_DEBUG
644 
645     assert(sc == storeClientListSearch(e->mem_obj, data));
646 #endif
647 
648     if (mem == NULL)
649         return 0;
650 
651     debugs(90, 3, "storeUnregister: called for '" << e->getMD5Text() << "'");
652 
653     if (sc == NULL) {
654         debugs(90, 3, "storeUnregister: No matching client for '" << e->getMD5Text() << "'");
655         return 0;
656     }
657 
658     if (mem->clientCount() == 0) {
659         debugs(90, 3, "storeUnregister: Consistency failure - store client being unregistered is not in the mem object's list for '" << e->getMD5Text() << "'");
660         return 0;
661     }
662 
663     dlinkDelete(&sc->node, &mem->clients);
664     -- mem->nclients;
665 
666     const auto swapoutFinished = e->swappedOut() || e->swapoutFailed();
667     if (e->store_status == STORE_OK && !swapoutFinished)
668         e->swapOut();
669 
670     if (sc->swapin_sio != NULL) {
671         storeClose(sc->swapin_sio, StoreIOState::readerDone);
672         sc->swapin_sio = NULL;
673         ++statCounter.swap.ins;
674     }
675 
676     if (sc->_callback.pending()) {
677         /* callback with ssize = -1 to indicate unexpected termination */
678         debugs(90, 3, "store_client for " << *e << " has a callback");
679         sc->fail();
680     }
681 
682 #if STORE_CLIENT_LIST_DEBUG
683     cbdataReferenceDone(sc->owner);
684 
685 #endif
686 
687     delete sc;
688 
689     assert(e->locked());
690     // An entry locked by others may be unlocked (and destructed) by others, so
691     // we must lock again to safely dereference e after CheckQuickAbortIsReasonable().
692     e->lock("storeUnregister");
693 
694     if (CheckQuickAbortIsReasonable(e))
695         e->abort();
696     else
697         mem->kickReads();
698 
699 #if USE_ADAPTATION
700     e->kickProducer();
701 #endif
702 
703     e->unlock("storeUnregister");
704     return 1;
705 }
706 
707 /* Call handlers waiting for  data to be appended to E. */
708 void
invokeHandlers()709 StoreEntry::invokeHandlers()
710 {
711     if (EBIT_TEST(flags, DELAY_SENDING)) {
712         debugs(90, 3, "DELAY_SENDING is on, exiting " << *this);
713         return;
714     }
715     if (EBIT_TEST(flags, ENTRY_FWD_HDR_WAIT)) {
716         debugs(90, 3, "ENTRY_FWD_HDR_WAIT is on, exiting " << *this);
717         return;
718     }
719 
720     /* Commit what we can to disk, if appropriate */
721     swapOut();
722     int i = 0;
723     store_client *sc;
724     dlink_node *nx = NULL;
725     dlink_node *node;
726 
727     PROF_start(InvokeHandlers);
728 
729     debugs(90, 3, "InvokeHandlers: " << getMD5Text()  );
730     /* walk the entire list looking for valid callbacks */
731 
732     for (node = mem_obj->clients.head; node; node = nx) {
733         sc = (store_client *)node->data;
734         nx = node->next;
735         debugs(90, 3, "StoreEntry::InvokeHandlers: checking client #" << i  );
736         ++i;
737 
738         if (!sc->_callback.pending())
739             continue;
740 
741         if (sc->flags.disk_io_pending)
742             continue;
743 
744         storeClientCopy2(this, sc);
745     }
746     PROF_stop(InvokeHandlers);
747 }
748 
749 // Does not account for remote readers/clients.
750 int
storePendingNClients(const StoreEntry * e)751 storePendingNClients(const StoreEntry * e)
752 {
753     MemObject *mem = e->mem_obj;
754     int npend = NULL == mem ? 0 : mem->nclients;
755     debugs(90, 3, "storePendingNClients: returning " << npend);
756     return npend;
757 }
758 
759 /* return true if the request should be aborted */
760 static bool
CheckQuickAbortIsReasonable(StoreEntry * entry)761 CheckQuickAbortIsReasonable(StoreEntry * entry)
762 {
763     assert(entry);
764     debugs(90, 3, "entry=" << *entry);
765 
766     if (storePendingNClients(entry) > 0) {
767         debugs(90, 3, "quick-abort? NO storePendingNClients() > 0");
768         return false;
769     }
770 
771     if (!shutting_down && Store::Root().transientReaders(*entry)) {
772         debugs(90, 3, "quick-abort? NO still have one or more transient readers");
773         return false;
774     }
775 
776     if (entry->store_status != STORE_PENDING) {
777         debugs(90, 3, "quick-abort? NO store_status != STORE_PENDING");
778         return false;
779     }
780 
781     if (EBIT_TEST(entry->flags, ENTRY_SPECIAL)) {
782         debugs(90, 3, "quick-abort? NO ENTRY_SPECIAL");
783         return false;
784     }
785 
786     MemObject * const mem = entry->mem_obj;
787     assert(mem);
788     debugs(90, 3, "mem=" << mem);
789 
790     if (mem->request && !mem->request->flags.cachable) {
791         debugs(90, 3, "quick-abort? YES !mem->request->flags.cachable");
792         return true;
793     }
794 
795     if (EBIT_TEST(entry->flags, KEY_PRIVATE)) {
796         debugs(90, 3, "quick-abort? YES KEY_PRIVATE");
797         return true;
798     }
799 
800     int64_t expectlen = entry->getReply()->content_length + entry->getReply()->hdr_sz;
801 
802     if (expectlen < 0) {
803         /* expectlen is < 0 if *no* information about the object has been received */
804         debugs(90, 3, "quick-abort? YES no object data received yet");
805         return true;
806     }
807 
808     int64_t curlen =  mem->endOffset();
809 
810     if (Config.quickAbort.min < 0) {
811         debugs(90, 3, "quick-abort? NO disabled");
812         return false;
813     }
814 
815     if (mem->request && mem->request->range && mem->request->getRangeOffsetLimit() < 0) {
816         /* Don't abort if the admin has configured range_ofset -1 to download fully for caching. */
817         debugs(90, 3, "quick-abort? NO admin configured range replies to full-download");
818         return false;
819     }
820 
821     if (curlen > expectlen) {
822         debugs(90, 3, "quick-abort? YES bad content length (" << curlen << " of " << expectlen << " bytes received)");
823         return true;
824     }
825 
826     if ((expectlen - curlen) < (Config.quickAbort.min << 10)) {
827         debugs(90, 3, "quick-abort? NO only a little more object left to receive");
828         return false;
829     }
830 
831     if ((expectlen - curlen) > (Config.quickAbort.max << 10)) {
832         debugs(90, 3, "quick-abort? YES too much left to go");
833         return true;
834     }
835 
836     if (expectlen < 100) {
837         debugs(90, 3, "quick-abort? NO avoid FPE");
838         return false;
839     }
840 
841     if ((curlen / (expectlen / 100)) > (Config.quickAbort.pct)) {
842         debugs(90, 3, "quick-abort? NO past point of no return");
843         return false;
844     }
845 
846     debugs(90, 3, "quick-abort? YES default");
847     return true;
848 }
849 
850 void
dumpStats(MemBuf * output,int clientNumber) const851 store_client::dumpStats(MemBuf * output, int clientNumber) const
852 {
853     if (_callback.pending())
854         return;
855 
856     output->appendf("\tClient #%d, %p\n", clientNumber, _callback.callback_data);
857     output->appendf("\t\tcopy_offset: %" PRId64 "\n", copyInto.offset);
858     output->appendf("\t\tcopy_size: %" PRIuSIZE "\n", copyInto.length);
859     output->append("\t\tflags:", 8);
860 
861     if (flags.disk_io_pending)
862         output->append(" disk_io_pending", 16);
863 
864     if (flags.store_copying)
865         output->append(" store_copying", 14);
866 
867     if (flags.copy_event_pending)
868         output->append(" copy_event_pending", 19);
869 
870     output->append("\n",1);
871 }
872 
873 bool
pending() const874 store_client::Callback::pending() const
875 {
876     return callback_handler && callback_data;
877 }
878 
Callback(STCB * function,void * data)879 store_client::Callback::Callback(STCB *function, void *data) : callback_handler(function), callback_data (data) {}
880 
881 #if USE_DELAY_POOLS
882 void
setDelayId(DelayId delay_id)883 store_client::setDelayId(DelayId delay_id)
884 {
885     delayId = delay_id;
886 }
887 #endif
888 
889