/** @file

  Cache read logic: Cache::open_read and the CacheVC read-side state machine.

  @section license License

  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements.  See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership.  The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
 */

#include "P_Cache.h"

#include "HttpCacheSM.h" // included for the HttpCacheSM object used in the read-while-writer path

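// Start a cache read for a non-HTTP object: probe the directory for the key
// and hand the read off to a new CacheVC, scheduling a lock retry if the
// volume lock is missed, or failing with ECACHE_NO_DOC on a miss.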
Action *
Cache::open_read(Continuation *cont, const CacheKey *key, CacheFragType type, const char *hostname, int host_len)
{
  if (!CacheProcessor::IsCacheReady(type)) {
    cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *)-ECACHE_NOT_READY);
    return ACTION_RESULT_DONE;
  }
  ink_assert(caches[type] == this);

  Vol *vol = key_to_vol(key, hostname, host_len);
  Dir result, *last_collision = nullptr;
  ProxyMutex *mutex = cont->mutex.get();
  OpenDirEntry *od  = nullptr;
  CacheVC *c        = nullptr;
  {
    CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
    if (!lock.is_locked() || (od = vol->open_read(key)) || dir_probe(key, vol, &result, &last_collision)) {
      c = new_CacheVC(cont);
      SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
      c->vio.op    = VIO::READ;
      c->base_stat = cache_read_active_stat;
      CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
      c->first_key = c->key = c->earliest_key = *key;
      c->vol                                  = vol;
      c->frag_type                            = type;
      c->od                                   = od;
    }
    if (!c) {
      goto Lmiss;
    }
    if (!lock.is_locked()) {
      CONT_SCHED_LOCK_RETRY(c);
      return &c->_action;
    }
    if (c->od) {
      goto Lwriter;
    }
    c->dir            = result;
    c->last_collision = last_collision;
    switch (c->do_read_call(&c->key)) {
    case EVENT_DONE:
      return ACTION_RESULT_DONE;
    case EVENT_RETURN:
      goto Lcallreturn;
    default:
      return &c->_action;
    }
  }
Lmiss:
  CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
  cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *)-ECACHE_NO_DOC);
  return ACTION_RESULT_DONE;
Lwriter:
  SET_CONTINUATION_HANDLER(c, &CacheVC::openReadFromWriter);
  if (c->handleEvent(EVENT_IMMEDIATE, nullptr) == EVENT_DONE) {
    return ACTION_RESULT_DONE;
  }
  return &c->_action;
Lcallreturn:
  if (c->handleEvent(AIO_EVENT_DONE, nullptr) == EVENT_DONE) {
    return ACTION_RESULT_DONE;
  }
  return &c->_action;
}

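// Start a cache read for an HTTP object. Same flow as the variant above, but
// carries the client request and config so an alternate can be selected, and
// flags the HttpCacheSM when the read will be served from a concurrent writer.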
Action *
Cache::open_read(Continuation *cont, const CacheKey *key, CacheHTTPHdr *request, const OverridableHttpConfigParams *params,
                 CacheFragType type, const char *hostname, int host_len)
{
  if (!CacheProcessor::IsCacheReady(type)) {
    cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *)-ECACHE_NOT_READY);
    return ACTION_RESULT_DONE;
  }
  ink_assert(caches[type] == this);

  Vol *vol = key_to_vol(key, hostname, host_len);
  Dir result, *last_collision = nullptr;
  ProxyMutex *mutex = cont->mutex.get();
  OpenDirEntry *od  = nullptr;
  CacheVC *c        = nullptr;

  {
    CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
    if (!lock.is_locked() || (od = vol->open_read(key)) || dir_probe(key, vol, &result, &last_collision)) {
      c            = new_CacheVC(cont);
      c->first_key = c->key = c->earliest_key = *key;
      c->vol                                  = vol;
      c->vio.op                               = VIO::READ;
      c->base_stat                            = cache_read_active_stat;
      CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
      c->request.copy_shallow(request);
      c->frag_type = CACHE_FRAG_TYPE_HTTP;
      c->params    = params;
      c->od        = od;
    }
    if (!lock.is_locked()) {
      SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
      CONT_SCHED_LOCK_RETRY(c);
      return &c->_action;
    }
    if (!c) {
      goto Lmiss;
    }
    if (c->od) {
      goto Lwriter;
    }
    // hit
    c->dir = c->first_dir = result;
    c->last_collision     = last_collision;
    SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
    switch (c->do_read_call(&c->key)) {
    case EVENT_DONE:
      return ACTION_RESULT_DONE;
    case EVENT_RETURN:
      goto Lcallreturn;
    default:
      return &c->_action;
    }
  }
Lmiss:
  CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
  cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *)-ECACHE_NO_DOC);
  return ACTION_RESULT_DONE;
Lwriter:
  // this is a horrible violation of the interface and should be fixed (FIXME)
  ((HttpCacheSM *)cont)->set_readwhilewrite_inprogress(true);
  SET_CONTINUATION_HANDLER(c, &CacheVC::openReadFromWriter);
  if (c->handleEvent(EVENT_IMMEDIATE, nullptr) == EVENT_DONE) {
    return ACTION_RESULT_DONE;
  }
  return &c->_action;
Lcallreturn:
  if (c->handleEvent(AIO_EVENT_DONE, nullptr) == EVENT_DONE) {
    return ACTION_RESULT_DONE;
  }
  return &c->_action;
}

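// Unmarshal the alternate vector from a Doc's header. For documents written
// by an older cache version (and not served from the ram cache, which is
// already fixed up), recompute the MIME accelerators and presence bits.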
uint32_t
CacheVC::load_http_info(CacheHTTPInfoVector *info, Doc *doc, RefCountObj *block_ptr)
{
  uint32_t zret = info->get_handles(doc->hdr(), doc->hlen, block_ptr);
  if (!this->f.doc_from_ram_cache && // ram cache is always already fixed up.
                                     // If this is an old object, the object version will be old or 0, in either case this is
                                     // correct. Forget the 4.2 compatibility, always update older versioned objects.
      ts::VersionNumber(doc->v_major, doc->v_minor) < CACHE_DB_VERSION) {
    for (int i = info->xcount - 1; i >= 0; --i) {
      info->data(i).alternate.m_alt->m_response_hdr.m_mime->recompute_accelerators_and_presence_bits();
      info->data(i).alternate.m_alt->m_request_hdr.m_mime->recompute_accelerators_and_presence_bits();
    }
  }
  return zret;
}

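// Terminal handler for a failed read-from-writer attempt: detach from the
// open directory entry, bump the failure stats, notify the caller, and free
// this CacheVC.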
int
CacheVC::openReadFromWriterFailure(int event, Event *e)
{
  od = nullptr;
  vector.clear(false);
  CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
  CACHE_INCREMENT_DYN_STAT(cache_read_busy_failure_stat);
  _action.continuation->handleEvent(event, e);
  free_CacheVC(this);
  return EVENT_DONE;
}

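// Choose a writer CacheVC to read from. Non-HTTP objects have a single
// writer; for HTTP objects the alternate vector is rebuilt from all eligible
// writers and an alternate is selected from it. Returns EVENT_NONE on
// success, EVENT_RETURN to fall back to a plain cache read, EVENT_CONT to
// retry later, or a negative ECACHE error code.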
int
CacheVC::openReadChooseWriter(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
  intptr_t err = ECACHE_DOC_BUSY;
  CacheVC *w   = nullptr;

  ink_assert(vol->mutex->thread_holding == mutex->thread_holding && write_vc == nullptr);

  if (!od) {
    return EVENT_RETURN;
  }

  if (frag_type != CACHE_FRAG_TYPE_HTTP) {
    ink_assert(od->num_writers == 1);
    w = od->writers.head;
    if (w->start_time > start_time || w->closed < 0) {
      od = nullptr;
      return EVENT_RETURN;
    }
    if (!w->closed) {
      return -err;
    }
    write_vc = w;
  } else {
    write_vector      = &od->vector;
    int write_vec_cnt = write_vector->count();
    for (int c = 0; c < write_vec_cnt; c++) {
      vector.insert(write_vector->get(c));
    }
    // check if all the writers who came before this reader have
    // set the http_info.
    for (w = static_cast<CacheVC *>(od->writers.head); w; w = static_cast<CacheVC *>(w->opendir_link.next)) {
      if (w->start_time > start_time || w->closed < 0) {
        continue;
      }
      if (!w->closed && !cache_config_read_while_writer) {
        return -err;
      }
      if (w->alternate_index != CACHE_ALT_INDEX_DEFAULT) {
        continue;
      }

      if (!w->closed && !w->alternate.valid()) {
        od = nullptr;
        ink_assert(!write_vc);
        vector.clear(false);
        return EVENT_CONT;
      }
      // construct the vector from the writers.
      int alt_ndx = CACHE_ALT_INDEX_DEFAULT;
      if (w->f.update) {
        // all Update cases. Need to get the alternate index.
        alt_ndx = get_alternate_index(&vector, w->update_key);
        // if it's an alternate delete
        if (!w->alternate.valid()) {
          if (alt_ndx >= 0) {
            vector.remove(alt_ndx, false);
          }
          continue;
        }
      }
      if (w->alternate.valid()) {
        vector.insert(&w->alternate, alt_ndx);
      }
    }

    if (!vector.count()) {
      if (od->reading_vec) {
        // the writer(s) are reading the vector, so there is probably
        // an old vector. Since this reader came before any of the
        // current writers, we should return the old data
        od = nullptr;
        return EVENT_RETURN;
      }
      return -ECACHE_NO_DOC;
    }
    if (cache_config_select_alternate) {
      alternate_index = HttpTransactCache::SelectFromAlternates(&vector, &request, params);
      if (alternate_index < 0) {
        return -ECACHE_ALT_MISS;
      }
    } else {
      alternate_index = 0;
    }
    CacheHTTPInfo *obj = vector.get(alternate_index);
    for (w = static_cast<CacheVC *>(od->writers.head); w; w = static_cast<CacheVC *>(w->opendir_link.next)) {
      if (obj->m_alt == w->alternate.m_alt) {
        write_vc = w;
        break;
      }
    }
    vector.clear(false);
    if (!write_vc) {
      DDebug("cache_read_agg", "%p: key: %X writer alternate different: %d", this, first_key.slice32(1), alternate_index);
      od = nullptr;
      return EVENT_RETURN;
    }

    DDebug("cache_read_agg", "%p: key: %X eKey: %d # alts: %d, ndx: %d, # writers: %d writer: %p", this, first_key.slice32(1),
           write_vc->earliest_key.slice32(1), vector.count(), alternate_index, od->num_writers, write_vc);
  }
  return EVENT_NONE;
}

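// Read a document that is still being written (read-while-writer). Rechecks
// the open directory entry, binds to a writer if none is chosen yet, and
// then serves data from the writer's buffers or falls back to a disk read.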
int
CacheVC::openReadFromWriter(int event, Event *e)
{
  if (!f.read_from_writer_called) {
    // The assignment to last_collision as nullptr was
    // made conditional after INKqa08411
    last_collision = nullptr;
    // Restart the clock from here, the first time a reader gets into
    // this state. It's possible that the open_read was called before the
    // open_write, but the reader could not get the volume lock. If we
    // don't reset the clock here, we won't choose any writer and hence
    // fail the read request.
    start_time                = Thread::get_hrtime();
    f.read_from_writer_called = 1;
  }
  cancel_trigger();
  intptr_t err = ECACHE_DOC_BUSY;
  DDebug("cache_read_agg", "%p: key: %X In openReadFromWriter", this, first_key.slice32(1));
  if (_action.cancelled) {
    od = nullptr; // only open for read so no need to close
    return free_CacheVC(this);
  }
  CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
  if (!lock.is_locked()) {
    VC_SCHED_LOCK_RETRY();
  }
  od = vol->open_read(&first_key); // recheck in case the lock failed
  if (!od) {
    MUTEX_RELEASE(lock);
    write_vc = nullptr;
    SET_HANDLER(&CacheVC::openReadStartHead);
    return openReadStartHead(event, e);
  } else {
    ink_assert(od == vol->open_read(&first_key));
  }
  if (!write_vc) {
    int ret = openReadChooseWriter(event, e);
    if (ret < 0) {
      MUTEX_RELEASE(lock);
      SET_HANDLER(&CacheVC::openReadFromWriterFailure);
      return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, reinterpret_cast<Event *>(ret));
    } else if (ret == EVENT_RETURN) {
      MUTEX_RELEASE(lock);
      SET_HANDLER(&CacheVC::openReadStartHead);
      return openReadStartHead(event, e);
    } else if (ret == EVENT_CONT) {
      ink_assert(!write_vc);
      if (writer_lock_retry < cache_config_read_while_writer_max_retries) {
        VC_SCHED_WRITER_RETRY();
      } else {
        return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, (Event *)-err);
      }
    } else {
      ink_assert(write_vc);
    }
  } else {
    if (writer_done()) {
      MUTEX_RELEASE(lock);
      DDebug("cache_read_agg", "%p: key: %X writer %p has left, continuing as normal read", this, first_key.slice32(1), write_vc);
      od       = nullptr;
      write_vc = nullptr;
      SET_HANDLER(&CacheVC::openReadStartHead);
      return openReadStartHead(event, e);
    }
  }
  OpenDirEntry *cod = od;
  od                = nullptr;
  // someone is currently writing the document
  if (write_vc->closed < 0) {
    MUTEX_RELEASE(lock);
    write_vc = nullptr;
    // writer aborted, continue as if there is no writer
    SET_HANDLER(&CacheVC::openReadStartHead);
    return openReadStartHead(EVENT_IMMEDIATE, nullptr);
  }
  // allow reading from unclosed writer for http requests only.
  ink_assert(frag_type == CACHE_FRAG_TYPE_HTTP || write_vc->closed);
  if (!write_vc->closed && !write_vc->fragment) {
    if (!cache_config_read_while_writer || frag_type != CACHE_FRAG_TYPE_HTTP ||
        writer_lock_retry >= cache_config_read_while_writer_max_retries) {
      MUTEX_RELEASE(lock);
      return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, (Event *)-err);
    }
    DDebug("cache_read_agg", "%p: key: %X writer: closed:%d, fragment:%d, retry: %d", this, first_key.slice32(1), write_vc->closed,
           write_vc->fragment, writer_lock_retry);
    VC_SCHED_WRITER_RETRY();
  }

  CACHE_TRY_LOCK(writer_lock, write_vc->mutex, mutex->thread_holding);
  if (!writer_lock.is_locked()) {
    DDebug("cache_read_agg", "%p: key: %X lock miss", this, first_key.slice32(1));
    VC_SCHED_LOCK_RETRY();
  }
  MUTEX_RELEASE(lock);

  if (!write_vc->io.ok()) {
    return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, (Event *)-err);
  }
  if (frag_type == CACHE_FRAG_TYPE_HTTP) {
    DDebug("cache_read_agg", "%p: key: %X http passed stage 1, closed: %d, frag: %d", this, first_key.slice32(1), write_vc->closed,
           write_vc->fragment);
    if (!write_vc->alternate.valid()) {
      return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, (Event *)-err);
    }
    alternate.copy(&write_vc->alternate);
    vector.insert(&alternate);
    alternate.object_key_get(&key);
    write_vc->f.readers = 1;
    if (!(write_vc->f.update && write_vc->total_len == 0)) {
      key = write_vc->earliest_key;
      if (!write_vc->closed) {
        alternate.object_size_set(write_vc->vio.nbytes);
      } else {
        alternate.object_size_set(write_vc->total_len);
      }
    } else {
      key = write_vc->update_key;
      ink_assert(write_vc->closed);
      DDebug("cache_read_agg", "%p: key: %X writer header update", this, first_key.slice32(1));
      // Update case (b) : grab doc_len from the writer's alternate
      doc_len = alternate.object_size_get();
      if (write_vc->update_key == cod->single_doc_key && (cod->move_resident_alt || write_vc->f.rewrite_resident_alt) &&
          write_vc->first_buf.get()) {
        // the resident alternate is being updated and it's a
        // header-only update. The first_buf of the writer has the
        // document body.
        Doc *doc   = reinterpret_cast<Doc *>(write_vc->first_buf->data());
        writer_buf = new_IOBufferBlock(write_vc->first_buf, doc->data_len(), doc->prefix_len());
        MUTEX_RELEASE(writer_lock);
        ink_assert(doc_len == doc->data_len());
        length            = doc_len;
        f.single_fragment = 1;
        doc_pos           = 0;
        earliest_key      = key;
        dir_clean(&first_dir);
        dir_clean(&earliest_dir);
        SET_HANDLER(&CacheVC::openReadFromWriterMain);
        CACHE_INCREMENT_DYN_STAT(cache_read_busy_success_stat);
        return callcont(CACHE_EVENT_OPEN_READ);
      }
      // want to snarf the new headers from the writer
      // and then continue as if nothing happened
      last_collision = nullptr;
      MUTEX_RELEASE(writer_lock);
      SET_HANDLER(&CacheVC::openReadStartEarliest);
      return openReadStartEarliest(event, e);
    }
  } else {
    DDebug("cache_read_agg", "%p: key: %X non-http passed stage 1", this, first_key.slice32(1));
    key = write_vc->earliest_key;
  }
  if (write_vc->fragment) {
    doc_len        = write_vc->vio.nbytes;
    last_collision = nullptr;
    DDebug("cache_read_agg", "%p: key: %X closed: %d, fragment: %d, len: %d starting first fragment", this, first_key.slice32(1),
           write_vc->closed, write_vc->fragment, (int)doc_len);
    MUTEX_RELEASE(writer_lock);
    // either a header + body update or a new document
    SET_HANDLER(&CacheVC::openReadStartEarliest);
    return openReadStartEarliest(event, e);
  }
  writer_buf    = write_vc->blocks;
  writer_offset = write_vc->offset;
  length        = write_vc->length;
  // copy the vector
  f.single_fragment = !write_vc->fragment; // single fragment doc
  doc_pos           = 0;
  earliest_key      = write_vc->earliest_key;
  ink_assert(earliest_key == key);
  doc_len = write_vc->total_len;
  dir_clean(&first_dir);
  dir_clean(&earliest_dir);
  DDebug("cache_read_agg", "%p: key: %X %X: single fragment read", this, first_key.slice32(1), key.slice32(0));
  MUTEX_RELEASE(writer_lock);
  SET_HANDLER(&CacheVC::openReadFromWriterMain);
  CACHE_INCREMENT_DYN_STAT(cache_read_busy_success_stat);
  return callcont(CACHE_EVENT_OPEN_READ);
}

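// Data pump for read-while-writer: clone blocks from the writer's buffer
// chain into the reader's VIO until the document has been fully delivered.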
int
CacheVC::openReadFromWriterMain(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
  cancel_trigger();
  if (seek_to) {
    vio.ndone = seek_to;
    seek_to   = 0;
  }
  IOBufferBlock *b = nullptr;
  int64_t ntodo    = vio.ntodo();
  if (ntodo <= 0) {
    return EVENT_CONT;
  }
  if (length < (static_cast<int64_t>(doc_len)) - vio.ndone) {
    DDebug("cache_read_agg", "truncation %X", first_key.slice32(1));
    if (is_action_tag_set("cache")) {
      ink_release_assert(false);
    }
    Warning("Document %X truncated at %d of %d, reading from writer", first_key.slice32(1), (int)vio.ndone, (int)doc_len);
    return calluser(VC_EVENT_ERROR);
  }
  /* it's possible that the user did a do_io_close before
     openWriteWriteDone was called. */
  if (length > (static_cast<int64_t>(doc_len)) - vio.ndone) {
    int64_t skip_bytes = length - (doc_len - vio.ndone);
    iobufferblock_skip(writer_buf.get(), &writer_offset, &length, skip_bytes);
  }
  int64_t bytes = length;
  if (bytes > vio.ntodo()) {
    bytes = vio.ntodo();
  }
  if (vio.ndone >= static_cast<int64_t>(doc_len)) {
    ink_assert(bytes <= 0);
    // reached the end of the document and the user still wants more
    return calluser(VC_EVENT_EOS);
  }
  b          = iobufferblock_clone(writer_buf.get(), writer_offset, bytes);
  writer_buf = iobufferblock_skip(writer_buf.get(), &writer_offset, &length, bytes);
  vio.buffer.writer()->append_block(b);
  vio.ndone += bytes;
  if (vio.ntodo() <= 0) {
    return calluser(VC_EVENT_READ_COMPLETE);
  } else {
    return calluser(VC_EVENT_READ_READY);
  }
}

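// Close out a read: if hit evacuation was flagged, schedule evacuation of
// the document's head (and earliest fragment) so it is rewritten near the
// write cursor, then release the volume read lock and free this CacheVC.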
int
CacheVC::openReadClose(int event, Event * /* e ATS_UNUSED */)
{
  cancel_trigger();
  if (is_io_in_progress()) {
    if (event != AIO_EVENT_DONE) {
      return EVENT_CONT;
    }
    set_io_not_in_progress();
  }
  CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
  if (!lock.is_locked()) {
    VC_SCHED_LOCK_RETRY();
  }
  if (f.hit_evacuate && dir_valid(vol, &first_dir) && closed > 0) {
    if (f.single_fragment) {
      vol->force_evacuate_head(&first_dir, dir_pinned(&first_dir));
    } else if (dir_valid(vol, &earliest_dir)) {
      vol->force_evacuate_head(&first_dir, dir_pinned(&first_dir));
      vol->force_evacuate_head(&earliest_dir, dir_pinned(&earliest_dir));
    }
  }
  vol->close_read(this);
  return free_CacheVC(this);
}

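// AIO completion handler for a fragment read. Validates the Doc that came
// off disk, re-probes the directory on a collision, waits on the writer when
// reading ahead of it, and treats anything else as a truncated document.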
int
CacheVC::openReadReadDone(int event, Event *e)
{
  Doc *doc = nullptr;

  cancel_trigger();
  if (event == EVENT_IMMEDIATE) {
    return EVENT_CONT;
  }
  set_io_not_in_progress();
  {
    CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
    if (!lock.is_locked()) {
      VC_SCHED_LOCK_RETRY();
    }
    if (event == AIO_EVENT_DONE && !io.ok()) {
      goto Lerror;
    }
    if (last_collision &&     // no missed lock
        dir_valid(vol, &dir)) // object still valid
    {
      doc = reinterpret_cast<Doc *>(buf->data());
      if (doc->magic != DOC_MAGIC) {
        char tmpstring[CRYPTO_HEX_SIZE];
        if (doc->magic == DOC_CORRUPT) {
          Warning("Middle: Doc checksum does not match for %s", key.toHexStr(tmpstring));
        } else {
          Warning("Middle: Doc magic does not match for %s", key.toHexStr(tmpstring));
        }
        goto Lerror;
      }
      if (doc->key == key) {
        goto LreadMain;
      }
    }
    if (last_collision && dir_offset(&dir) != dir_offset(last_collision)) {
      last_collision = nullptr; // object has been/is being overwritten
    }
    if (dir_probe(&key, vol, &dir, &last_collision)) {
      int ret = do_read_call(&key);
      if (ret == EVENT_RETURN) {
        goto Lcallreturn;
      }
      return EVENT_CONT;
    } else if (write_vc) {
      if (writer_done()) {
        last_collision = nullptr;
        while (dir_probe(&earliest_key, vol, &dir, &last_collision)) {
          if (dir_offset(&dir) == dir_offset(&earliest_dir)) {
            DDebug("cache_read_agg", "%p: key: %X ReadRead complete: %d", this, first_key.slice32(1), (int)vio.ndone);
            doc_len = vio.ndone;
            goto Ldone;
          }
        }
        DDebug("cache_read_agg", "%p: key: %X ReadRead writer aborted: %d", this, first_key.slice32(1), (int)vio.ndone);
        goto Lerror;
      }
      if (writer_lock_retry < cache_config_read_while_writer_max_retries) {
        DDebug("cache_read_agg", "%p: key: %X ReadRead retrying: %d", this, first_key.slice32(1), (int)vio.ndone);
        VC_SCHED_WRITER_RETRY(); // wait for writer
      } else {
        DDebug("cache_read_agg", "%p: key: %X ReadRead retries exhausted, bailing..: %d", this, first_key.slice32(1),
               (int)vio.ndone);
        goto Ldone;
      }
    }
    // fall through for truncated documents
  }
Lerror : {
  char tmpstring[CRYPTO_HEX_SIZE];
  if (request.valid()) {
    int url_length;
    const char *url_text = request.url_get()->string_get_ref(&url_length);
    Warning("Document %s truncated, url[%.*s] .. clearing", earliest_key.toHexStr(tmpstring), url_length, url_text);
  } else {
    Warning("Document %s truncated .. clearing", earliest_key.toHexStr(tmpstring));
  }
  dir_delete(&earliest_key, vol, &earliest_dir);
  return calluser(VC_EVENT_ERROR);
}
Ldone:
  return calluser(VC_EVENT_EOS);
Lcallreturn:
  return handleEvent(AIO_EVENT_DONE, nullptr);
LreadMain:
  fragment++;
  doc_pos = doc->prefix_len();
  next_CacheKey(&key, &key);
  SET_HANDLER(&CacheVC::openReadMain);
  return openReadMain(event, e);
}

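// Main per-fragment read handler: resolves do_io_pread seeks through the
// fragment offset table, appends decoded bytes to the user's buffer, and
// issues the read for the next fragment when more data is needed.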
int
CacheVC::openReadMain(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
  cancel_trigger();
  Doc *doc         = reinterpret_cast<Doc *>(buf->data());
  int64_t ntodo    = vio.ntodo();
  int64_t bytes    = doc->len - doc_pos;
  IOBufferBlock *b = nullptr;
  if (seek_to) { // handle do_io_pread
    if (seek_to >= doc_len) {
      vio.ndone = doc_len;
      return calluser(VC_EVENT_EOS);
    }
    HTTPInfo::FragOffset *frags = alternate.get_frag_table();
    if (is_debug_tag_set("cache_seek")) {
      char b[CRYPTO_HEX_SIZE], c[CRYPTO_HEX_SIZE];
      Debug("cache_seek", "Seek @ %" PRId64 " in %s from #%d @ %" PRId64 "/%d:%s", seek_to, first_key.toHexStr(b), fragment,
            doc_pos, doc->len, doc->key.toHexStr(c));
    }
    /* Because single fragment objects can migrate to hang off an alt vector
       they can appear to the VC as multi-fragment when they are not really.
       The essential difference is the existence of a fragment table.
    */
    if (frags) {
      int target                    = 0;
      HTTPInfo::FragOffset next_off = frags[target];
      int lfi                       = static_cast<int>(alternate.get_frag_offset_count()) - 1;
      ink_assert(lfi >= 0); // because it's not a single frag doc.

      /* Note: frag[i].offset is the offset of the first byte past the
         i'th fragment. So frag[0].offset is the offset of the first
         byte of fragment 1. In addition the # of fragments is one
         more than the fragment table length, the start of the last
         fragment being the last offset in the table.
      */
      if (fragment == 0 || seek_to < frags[fragment - 1] || (fragment <= lfi && frags[fragment] <= seek_to)) {
        // search from frag 0 on to find the proper frag
        while (seek_to >= next_off && target < lfi) {
          next_off = frags[++target];
        }
        if (target == lfi && seek_to >= next_off) {
          ++target;
        }
      } else { // shortcut if we are in the fragment already
        target = fragment;
      }
      if (target != fragment) {
        // Lread will read the next fragment always, so if that
        // is the one we want, we don't need to do anything
        int cfi = fragment;
        --target;
        while (target > fragment) {
          next_CacheKey(&key, &key);
          ++fragment;
        }
        while (target < fragment) {
          prev_CacheKey(&key, &key);
          --fragment;
        }

        if (is_debug_tag_set("cache_seek")) {
          char target_key_str[CRYPTO_HEX_SIZE];
          key.toHexStr(target_key_str);
          Debug("cache_seek", "Seek #%d @ %" PRId64 " -> #%d @ %" PRId64 ":%s", cfi, doc_pos, target, seek_to, target_key_str);
        }
        goto Lread;
      }
    }
    doc_pos = doc->prefix_len() + seek_to;
    if (fragment && frags) {
      doc_pos -= static_cast<int64_t>(frags[fragment - 1]);
    }
    vio.ndone = 0;
    seek_to   = 0;
    ntodo     = vio.ntodo();
    bytes     = doc->len - doc_pos;
    if (is_debug_tag_set("cache_seek")) {
      char target_key_str[CRYPTO_HEX_SIZE];
      Debug("cache_seek", "Read # %d @ %" PRId64 "/%d for %" PRId64 " %s", fragment, doc_pos, doc->len, bytes,
            key.toHexStr(target_key_str));
    }

    // This shouldn't happen for HTTP assets but it does
    // occasionally in production. This is a temporary fix
    // to clean up broken objects until the root cause can
    // be found. It must be the case that either the fragment
    // offsets are incorrect or a fragment table isn't being
    // created when it should be.
    if (frag_type == CACHE_FRAG_TYPE_HTTP && bytes < 0) {
      char xt[CRYPTO_HEX_SIZE];
      char yt[CRYPTO_HEX_SIZE];

      int url_length       = 0;
      char const *url_text = nullptr;
      if (request.valid()) {
        url_text = request.url_get()->string_get_ref(&url_length);
      }

      int64_t prev_frag_size = 0;
      if (fragment && frags) {
        prev_frag_size = static_cast<int64_t>(frags[fragment - 1]);
      }

      Warning("cache_seek range request bug: read %s targ %s - %s frag # %d (prev_frag %" PRId64 ") @ %" PRId64 "/%d for %" PRId64
              " tot %" PRId64 " url '%.*s'",
              doc->key.toHexStr(xt), key.toHexStr(yt), f.single_fragment ? "single" : "multi", fragment, prev_frag_size, doc_pos,
              doc->len, bytes, doc->total_len, url_length, url_text);

      doc->magic = DOC_CORRUPT;

      CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
      if (!lock.is_locked()) {
        SET_HANDLER(&CacheVC::openReadDirDelete);
        VC_SCHED_LOCK_RETRY();
      }

      dir_delete(&earliest_key, vol, &earliest_dir);
      goto Lerror;
    }
  }
  if (ntodo <= 0) {
    return EVENT_CONT;
  }
  if (vio.buffer.writer()->max_read_avail() > vio.buffer.writer()->water_mark && vio.ndone) { // initiate read of first block
    return EVENT_CONT;
  }
  if ((bytes <= 0) && vio.ntodo() >= 0) {
    goto Lread;
  }
  if (bytes > vio.ntodo()) {
    bytes = vio.ntodo();
  }
  b           = new_IOBufferBlock(buf, bytes, doc_pos);
  b->_buf_end = b->_end;
  vio.buffer.writer()->append_block(b);
  vio.ndone += bytes;
  doc_pos += bytes;
  if (vio.ntodo() <= 0) {
    return calluser(VC_EVENT_READ_COMPLETE);
  } else {
    if (calluser(VC_EVENT_READ_READY) == EVENT_DONE) {
      return EVENT_DONE;
    }
    // we have to keep reading until we give the user all the
    // bytes it wanted or we hit the watermark.
    if (vio.ntodo() > 0 && !vio.buffer.writer()->high_water()) {
      goto Lread;
    }
    return EVENT_CONT;
  }
Lread : {
  if (vio.ndone >= static_cast<int64_t>(doc_len)) {
    // reached the end of the document and the user still wants more
    return calluser(VC_EVENT_EOS);
  }
  last_collision    = nullptr;
  writer_lock_retry = 0;
  // if the state machine calls reenable on the callback from the cache,
  // we set up a schedule_imm event. The openReadReadDone discards
  // EVENT_IMMEDIATE events. So, we have to cancel that trigger and set
  // a new EVENT_INTERVAL event.
  cancel_trigger();
  CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
  if (!lock.is_locked()) {
    SET_HANDLER(&CacheVC::openReadMain);
    VC_SCHED_LOCK_RETRY();
  }
  if (dir_probe(&key, vol, &dir, &last_collision)) {
    SET_HANDLER(&CacheVC::openReadReadDone);
    int ret = do_read_call(&key);
    if (ret == EVENT_RETURN) {
      goto Lcallreturn;
    }
    return EVENT_CONT;
  } else if (write_vc) {
    if (writer_done()) {
      last_collision = nullptr;
      while (dir_probe(&earliest_key, vol, &dir, &last_collision)) {
        if (dir_offset(&dir) == dir_offset(&earliest_dir)) {
          DDebug("cache_read_agg", "%p: key: %X ReadMain complete: %d", this, first_key.slice32(1), (int)vio.ndone);
          doc_len = vio.ndone;
          goto Leos;
        }
      }
      DDebug("cache_read_agg", "%p: key: %X ReadMain writer aborted: %d", this, first_key.slice32(1), (int)vio.ndone);
      goto Lerror;
    }
    DDebug("cache_read_agg", "%p: key: %X ReadMain retrying: %d", this, first_key.slice32(1), (int)vio.ndone);
    SET_HANDLER(&CacheVC::openReadMain);
    VC_SCHED_WRITER_RETRY();
  }
  if (is_action_tag_set("cache")) {
    ink_release_assert(false);
  }
  Warning("Document %X truncated at %d of %d, missing fragment %X", first_key.slice32(1), (int)vio.ndone, (int)doc_len,
          key.slice32(1));
  // remove the directory entry
  dir_delete(&earliest_key, vol, &earliest_dir);
}
Lerror:
  return calluser(VC_EVENT_ERROR);
Leos:
  return calluser(VC_EVENT_EOS);
Lcallreturn:
  return handleEvent(AIO_EVENT_DONE, nullptr);
}

/*
  This code follows CacheVC::openReadStartHead closely,
  if you change this you might have to change that.
*/
int
CacheVC::openReadStartEarliest(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
  int ret  = 0;
  Doc *doc = nullptr;
  cancel_trigger();
  set_io_not_in_progress();
  if (_action.cancelled) {
    return free_CacheVC(this);
  }
  {
    CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
    if (!lock.is_locked()) {
      VC_SCHED_LOCK_RETRY();
    }
    if (!buf) {
      goto Lread;
    }
    if (!io.ok()) {
      goto Ldone;
    }
    // an object needs to be outside the aggregation window in order to
    // be evacuated as it is read
    if (!dir_agg_valid(vol, &dir)) {
      // a directory entry which is no longer valid may have been overwritten
      if (!dir_valid(vol, &dir)) {
        last_collision = nullptr;
      }
      goto Lread;
    }
    doc = reinterpret_cast<Doc *>(buf->data());
    if (doc->magic != DOC_MAGIC) {
      char tmpstring[CRYPTO_HEX_SIZE];
      if (is_action_tag_set("cache")) {
        ink_release_assert(false);
      }
      if (doc->magic == DOC_CORRUPT) {
        Warning("Earliest: Doc checksum does not match for %s", key.toHexStr(tmpstring));
      } else {
        Warning("Earliest : Doc magic does not match for %s", key.toHexStr(tmpstring));
      }
      // remove the dir entry
      dir_delete(&key, vol, &dir);
      // try going through the directory entries again
      // in case the dir entry we deleted doesn't correspond
      // to the key we are looking for. This is possible
      // because of directory collisions
      last_collision = nullptr;
      goto Lread;
    }
    if (!(doc->key == key)) { // collision
      goto Lread;
    }
    // success
    earliest_key = key;
    doc_pos      = doc->prefix_len();
    next_CacheKey(&key, &doc->key);
    vol->begin_read(this);
    if (vol->within_hit_evacuate_window(&earliest_dir) &&
        (!cache_config_hit_evacuate_size_limit || doc_len <= static_cast<uint64_t>(cache_config_hit_evacuate_size_limit))) {
      DDebug("cache_hit_evac", "dir: %" PRId64 ", write: %" PRId64 ", phase: %d", dir_offset(&earliest_dir),
             vol->offset_to_vol_offset(vol->header->write_pos), vol->header->phase);
      f.hit_evacuate = 1;
    }
    goto Lsuccess;
  Lread:
    if (dir_probe(&key, vol, &earliest_dir, &last_collision) || dir_lookaside_probe(&key, vol, &earliest_dir, nullptr)) {
      dir = earliest_dir;
      if ((ret = do_read_call(&key)) == EVENT_RETURN) {
        goto Lcallreturn;
      }
      return ret;
    }
    // read has detected that alternate does not exist in the cache.
    // rewrite the vector.
    if (!f.read_from_writer_called && frag_type == CACHE_FRAG_TYPE_HTTP) {
      // don't want any writers while we are evacuating the vector
      if (!vol->open_write(this, false, 1)) {
        Doc *doc1    = reinterpret_cast<Doc *>(first_buf->data());
        uint32_t len = this->load_http_info(write_vector, doc1);
        ink_assert(len == doc1->hlen && write_vector->count() > 0);
        write_vector->remove(alternate_index, true);
        // if the vector had one alternate, delete its directory entry
        if (len != doc1->hlen || !write_vector->count()) {
          // sometimes the delete fails when there is a race and another read
          // finds that the directory entry has been overwritten
          // (cannot assert on the return value)
          dir_delete(&first_key, vol, &first_dir);
        } else {
          buf             = nullptr;
          last_collision  = nullptr;
          write_len       = 0;
          header_len      = write_vector->marshal_length();
          f.evac_vector   = 1;
          f.use_first_key = 1;
          key             = first_key;
          // always use od->first_dir to overwrite a directory.
          // If an evacuation happens while a vector is being updated
          // the evacuator changes the od->first_dir to the new directory
          // that it inserted
          od->first_dir   = first_dir;
          od->writing_vec = true;
          earliest_key    = zero_key;

          // set up this VC as an alternate-delete write_vc
          vio.op          = VIO::WRITE;
          total_len       = 0;
          f.update        = 1;
          alternate_index = CACHE_ALT_REMOVED;
          /////////////////////////////////////////////////////////////////
          // change to create a directory entry for a resident alternate //
          // when another alternate does not exist.                      //
          /////////////////////////////////////////////////////////////////
          if (doc1->total_len > 0) {
            od->move_resident_alt = true;
            od->single_doc_key    = doc1->key;
            dir_assign(&od->single_doc_dir, &dir);
            dir_set_tag(&od->single_doc_dir, od->single_doc_key.slice32(2));
          }
          SET_HANDLER(&CacheVC::openReadVecWrite);
          if ((ret = do_write_call()) == EVENT_RETURN) {
            goto Lcallreturn;
          }
          return ret;
        }
      }
    }
  // open write failure - another writer, so don't modify the vector
  Ldone:
    if (od) {
      vol->close_write(this);
    }
  }
  CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
  _action.continuation->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *)-ECACHE_NO_DOC);
  return free_CacheVC(this);
Lcallreturn:
  return handleEvent(AIO_EVENT_DONE, nullptr); // hopefully a tail call
Lsuccess:
  if (write_vc) {
    CACHE_INCREMENT_DYN_STAT(cache_read_busy_success_stat);
  }
  SET_HANDLER(&CacheVC::openReadMain);
  return callcont(CACHE_EVENT_OPEN_READ);
}

// create the directory entry after the vector has been evacuated
// the volume lock has been taken when this function is called
int
CacheVC::openReadVecWrite(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
  cancel_trigger();
  set_io_not_in_progress();
  ink_assert(od);
  od->writing_vec = false;
  if (_action.cancelled) {
    return openWriteCloseDir(EVENT_IMMEDIATE, nullptr);
  }
  {
    CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
    if (!lock.is_locked()) {
      VC_SCHED_LOCK_RETRY();
    }
    if (io.ok()) {
      ink_assert(f.evac_vector);
      ink_assert(frag_type == CACHE_FRAG_TYPE_HTTP);
      ink_assert(!buf);
      f.evac_vector   = false;
      last_collision  = nullptr;
      f.update        = 0;
      alternate_index = CACHE_ALT_INDEX_DEFAULT;
      f.use_first_key = 0;
      vio.op          = VIO::READ;
      dir_overwrite(&first_key, vol, &dir, &od->first_dir);
      if (od->move_resident_alt) {
        dir_insert(&od->single_doc_key, vol, &od->single_doc_dir);
      }
      int alt_ndx = HttpTransactCache::SelectFromAlternates(write_vector, &request, params);
      vol->close_write(this);
      if (alt_ndx >= 0) {
        vector.clear();
        // we don't need to start all over again, since we already
        // have the vector in memory. But this is simpler and this
        // case is rare.
        goto Lrestart;
      }
    } else {
      vol->close_write(this);
    }
  }

  CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
  _action.continuation->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *)-ECACHE_ALT_MISS);
  return free_CacheVC(this);
Lrestart:
  SET_HANDLER(&CacheVC::openReadStartHead);
  return openReadStartHead(EVENT_IMMEDIATE, nullptr);
}

/*
  This code follows CacheVC::openReadStartEarliest closely,
  if you change this you might have to change that.
*/
int
CacheVC::openReadStartHead(int event, Event *e)
{
  intptr_t err = ECACHE_NO_DOC;
  Doc *doc     = nullptr;
  cancel_trigger();
  set_io_not_in_progress();
  if (_action.cancelled) {
    return free_CacheVC(this);
  }
  {
    CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
    if (!lock.is_locked()) {
      VC_SCHED_LOCK_RETRY();
    }
    if (!buf) {
      goto Lread;
    }
    if (!io.ok()) {
      goto Ldone;
    }
    // an object needs to be outside the aggregation window in order to
    // be evacuated as it is read
    if (!dir_agg_valid(vol, &dir)) {
      // a directory entry which is no longer valid may have been overwritten
      if (!dir_valid(vol, &dir)) {
        last_collision = nullptr;
      }
      goto Lread;
    }
    doc = reinterpret_cast<Doc *>(buf->data());
    if (doc->magic != DOC_MAGIC) {
      char tmpstring[CRYPTO_HEX_SIZE];
      if (is_action_tag_set("cache")) {
        ink_release_assert(false);
      }
      if (doc->magic == DOC_CORRUPT) {
        Warning("Head: Doc checksum does not match for %s", key.toHexStr(tmpstring));
      } else {
        Warning("Head : Doc magic does not match for %s", key.toHexStr(tmpstring));
      }
      // remove the dir entry
      dir_delete(&key, vol, &dir);
      // try going through the directory entries again
      // in case the dir entry we deleted doesn't correspond
      // to the key we are looking for. This is possible
      // because of directory collisions
      last_collision = nullptr;
      goto Lread;
    }
    if (!(doc->first_key == key)) {
      goto Lread;
    }
    if (f.lookup) {
      goto Lookup;
    }
    earliest_dir = dir;
    CacheHTTPInfo *alternate_tmp;
    if (frag_type == CACHE_FRAG_TYPE_HTTP) {
      uint32_t uml;
      ink_assert(doc->hlen);
      if (!doc->hlen) {
        goto Ldone;
      }
      if ((uml = this->load_http_info(&vector, doc)) != doc->hlen) {
        if (buf) {
          HTTPCacheAlt *alt  = reinterpret_cast<HTTPCacheAlt *>(doc->hdr());
          int32_t alt_length = 0;
          // count should be reasonable, as vector is initialized and unlikely to be too corrupted
          // by bad disk data - count should be the number of successfully unmarshalled alts.
          for (int32_t i = 0; i < vector.count(); ++i) {
            CacheHTTPInfo *info = vector.get(i);
            if (info && info->m_alt) {
              alt_length += info->m_alt->m_unmarshal_len;
            }
          }
          Note("OpenReadHead failed for cachekey %X : vector inconsistency - "
               "unmarshalled %d expecting %d in %d (base=%zu, ver=%d:%d) "
               "- vector n=%d size=%d "
               "first alt=%d[%s]",
               key.slice32(0), uml, doc->hlen, doc->len, sizeof(Doc), doc->v_major, doc->v_minor, vector.count(), alt_length,
               alt->m_magic,
               (CACHE_ALT_MAGIC_ALIVE == alt->m_magic ?
                  "alive" :
                  CACHE_ALT_MAGIC_MARSHALED == alt->m_magic ? "serial" : CACHE_ALT_MAGIC_DEAD == alt->m_magic ? "dead" : "bogus"));
          dir_delete(&key, vol, &dir);
        }
        err = ECACHE_BAD_META_DATA;
        goto Ldone;
      }
      if (cache_config_select_alternate) {
        alternate_index = HttpTransactCache::SelectFromAlternates(&vector, &request, params);
        if (alternate_index < 0) {
          err = ECACHE_ALT_MISS;
          goto Ldone;
        }
      } else {
        alternate_index = 0;
      }
      alternate_tmp = vector.get(alternate_index);
      if (!alternate_tmp->valid()) {
        if (buf) {
          Note("OpenReadHead failed for cachekey %X : alternate inconsistency", key.slice32(0));
          dir_delete(&key, vol, &dir);
        }
        goto Ldone;
      }

      alternate.copy_shallow(alternate_tmp);
      alternate.object_key_get(&key);
      doc_len = alternate.object_size_get();
      if (key == doc->key) { // is this my data?
        f.single_fragment = doc->single_fragment();
        ink_assert(f.single_fragment); // otherwise need to read earliest
        ink_assert(doc->hlen);
        doc_pos = doc->prefix_len();
        next_CacheKey(&key, &doc->key);
      } else {
        f.single_fragment = false;
      }
    } else {
      next_CacheKey(&key, &doc->key);
      f.single_fragment = doc->single_fragment();
      doc_pos           = doc->prefix_len();
      doc_len           = doc->total_len;
    }

    if (is_debug_tag_set("cache_read")) { // amc debug
      char xt[CRYPTO_HEX_SIZE], yt[CRYPTO_HEX_SIZE];
      Debug("cache_read", "CacheReadStartHead - read %s target %s - %s %d of %" PRId64 " bytes, %d fragments",
            doc->key.toHexStr(xt), key.toHexStr(yt), f.single_fragment ? "single" : "multi", doc->len, doc->total_len,
            alternate.get_frag_offset_count());
    }
    // the first fragment might have been gc'ed. Make sure the first
    // fragment is there before returning CACHE_EVENT_OPEN_READ
    if (!f.single_fragment) {
      goto Learliest;
    }

    if (vol->within_hit_evacuate_window(&dir) &&
        (!cache_config_hit_evacuate_size_limit || doc_len <= static_cast<uint64_t>(cache_config_hit_evacuate_size_limit))) {
      DDebug("cache_hit_evac", "dir: %" PRId64 ", write: %" PRId64 ", phase: %d", dir_offset(&dir),
             vol->offset_to_vol_offset(vol->header->write_pos), vol->header->phase);
      f.hit_evacuate = 1;
    }

    first_buf = buf;
    vol->begin_read(this);

    goto Lsuccess;

  Lread:
    // check for collision
    // INKqa07684 - Cache::lookup returns CACHE_EVENT_OPEN_READ_FAILED.
    // don't want to go through this BS of reading from a writer if
    // it's a lookup. In this case lookup will fail while the document is
    // being written to the cache.
    OpenDirEntry *cod = vol->open_read(&key);
    if (cod && !f.read_from_writer_called) {
      if (f.lookup) {
        err = ECACHE_DOC_BUSY;
        goto Ldone;
      }
      od = cod;
      MUTEX_RELEASE(lock);
      SET_HANDLER(&CacheVC::openReadFromWriter);
      return handleEvent(EVENT_IMMEDIATE, nullptr);
    }
    if (dir_probe(&key, vol, &dir, &last_collision)) {
      first_dir = dir;
      int ret   = do_read_call(&key);
      if (ret == EVENT_RETURN) {
        goto Lcallreturn;
      }
      return ret;
    }
  }
Ldone:
  if (!f.lookup) {
    CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
    _action.continuation->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *)-err);
  } else {
    CACHE_INCREMENT_DYN_STAT(cache_lookup_failure_stat);
    _action.continuation->handleEvent(CACHE_EVENT_LOOKUP_FAILED, (void *)-err);
  }
  return free_CacheVC(this);
Lcallreturn:
  return handleEvent(AIO_EVENT_DONE, nullptr); // hopefully a tail call
Lsuccess:
  SET_HANDLER(&CacheVC::openReadMain);
  return callcont(CACHE_EVENT_OPEN_READ);
Lookup:
  CACHE_INCREMENT_DYN_STAT(cache_lookup_success_stat);
  _action.continuation->handleEvent(CACHE_EVENT_LOOKUP, nullptr);
  return free_CacheVC(this);
Learliest:
  first_buf      = buf;
  buf            = nullptr;
  earliest_key   = key;
  last_collision = nullptr;
  SET_HANDLER(&CacheVC::openReadStartEarliest);
  return openReadStartEarliest(event, e);
}

/*
   Handle a directory delete event in case of some detected corruption.
*/
int
CacheVC::openReadDirDelete(int event, Event *e)
{
  MUTEX_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
  if (!lock.is_locked()) {
    VC_SCHED_LOCK_RETRY();
  }

  dir_delete(&earliest_key, vol, &earliest_dir);
  return calluser(VC_EVENT_ERROR);
}