/** @file

  A brief file description

  @section license License

  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements. See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership. The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License. You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
 */

#include "P_Cache.h"

#include "HttpCacheSM.h" // included for HttpCacheSM, used by the read-while-writer path below

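/*
  Start a cache read for `key`. If the volume lock is available and the
  directory probe misses, the miss is reported to `cont` inline; otherwise a
  CacheVC is created to retry the lock, read from a concurrent writer, or
  issue the disk read for the first doc.
*/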
Action *
Cache::open_read(Continuation *cont, const CacheKey *key, CacheFragType type, const char *hostname, int host_len)
{
  if (!CacheProcessor::IsCacheReady(type)) {
    cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *)-ECACHE_NOT_READY);
    return ACTION_RESULT_DONE;
  }
  ink_assert(caches[type] == this);

  Vol *vol = key_to_vol(key, hostname, host_len);
  Dir result, *last_collision = nullptr;
  ProxyMutex *mutex = cont->mutex.get();
  OpenDirEntry *od = nullptr;
  CacheVC *c = nullptr;
  {
    CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
    if (!lock.is_locked() || (od = vol->open_read(key)) || dir_probe(key, vol, &result, &last_collision)) {
      c = new_CacheVC(cont);
      SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
      c->vio.op = VIO::READ;
      c->base_stat = cache_read_active_stat;
      CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
      c->first_key = c->key = c->earliest_key = *key;
      c->vol = vol;
      c->frag_type = type;
      c->od = od;
    }
    if (!c) {
      goto Lmiss;
    }
    if (!lock.is_locked()) {
      CONT_SCHED_LOCK_RETRY(c);
      return &c->_action;
    }
    if (c->od) {
      goto Lwriter;
    }
    c->dir = result;
    c->last_collision = last_collision;
    switch (c->do_read_call(&c->key)) {
    case EVENT_DONE:
      return ACTION_RESULT_DONE;
    case EVENT_RETURN:
      goto Lcallreturn;
    default:
      return &c->_action;
    }
  }
Lmiss:
  CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
  cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *)-ECACHE_NO_DOC);
  return ACTION_RESULT_DONE;
Lwriter:
  SET_CONTINUATION_HANDLER(c, &CacheVC::openReadFromWriter);
  if (c->handleEvent(EVENT_IMMEDIATE, nullptr) == EVENT_DONE) {
    return ACTION_RESULT_DONE;
  }
  return &c->_action;
Lcallreturn:
  if (c->handleEvent(AIO_EVENT_DONE, nullptr) == EVENT_DONE) {
    return ACTION_RESULT_DONE;
  }
  return &c->_action;
}

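/*
  HTTP variant of open_read(). Same control flow as the variant above, but it
  also carries the client request and the overridable configuration so that
  alternate selection can run, and it flags read-while-write in progress on
  the HttpCacheSM when the read is served from a concurrent writer.
*/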
Action *
Cache::open_read(Continuation *cont, const CacheKey *key, CacheHTTPHdr *request, const OverridableHttpConfigParams *params,
                 CacheFragType type, const char *hostname, int host_len)
{
  if (!CacheProcessor::IsCacheReady(type)) {
    cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *)-ECACHE_NOT_READY);
    return ACTION_RESULT_DONE;
  }
  ink_assert(caches[type] == this);

  Vol *vol = key_to_vol(key, hostname, host_len);
  Dir result, *last_collision = nullptr;
  ProxyMutex *mutex = cont->mutex.get();
  OpenDirEntry *od = nullptr;
  CacheVC *c = nullptr;

  {
    CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
    if (!lock.is_locked() || (od = vol->open_read(key)) || dir_probe(key, vol, &result, &last_collision)) {
      c = new_CacheVC(cont);
      c->first_key = c->key = c->earliest_key = *key;
      c->vol = vol;
      c->vio.op = VIO::READ;
      c->base_stat = cache_read_active_stat;
      CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
      c->request.copy_shallow(request);
      c->frag_type = CACHE_FRAG_TYPE_HTTP;
      c->params = params;
      c->od = od;
    }
    if (!lock.is_locked()) {
      SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
      CONT_SCHED_LOCK_RETRY(c);
      return &c->_action;
    }
    if (!c) {
      goto Lmiss;
    }
    if (c->od) {
      goto Lwriter;
    }
    // hit
    c->dir = c->first_dir = result;
    c->last_collision = last_collision;
    SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
    switch (c->do_read_call(&c->key)) {
    case EVENT_DONE:
      return ACTION_RESULT_DONE;
    case EVENT_RETURN:
      goto Lcallreturn;
    default:
      return &c->_action;
    }
  }
Lmiss:
  CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
  cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *)-ECACHE_NO_DOC);
  return ACTION_RESULT_DONE;
Lwriter:
  // this is a horrible violation of the interface and should be fixed (FIXME)
  ((HttpCacheSM *)cont)->set_readwhilewrite_inprogress(true);
  SET_CONTINUATION_HANDLER(c, &CacheVC::openReadFromWriter);
  if (c->handleEvent(EVENT_IMMEDIATE, nullptr) == EVENT_DONE) {
    return ACTION_RESULT_DONE;
  }
  return &c->_action;
Lcallreturn:
  if (c->handleEvent(AIO_EVENT_DONE, nullptr) == EVENT_DONE) {
    return ACTION_RESULT_DONE;
  }
  return &c->_action;
}

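/*
  Unmarshal the alternate vector from the header section of `doc`. For
  objects written by an older cache version (and not served from the RAM
  cache, which is always already fixed up), the MIME accelerators and
  presence bits of each alternate's headers are recomputed so the in-memory
  state matches the current version.
*/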
uint32_t
CacheVC::load_http_info(CacheHTTPInfoVector *info, Doc *doc, RefCountObj *block_ptr)
{
  uint32_t zret = info->get_handles(doc->hdr(), doc->hlen, block_ptr);
  if (!this->f.doc_from_ram_cache && // ram cache is always already fixed up.
      // If this is an old object, the object version will be old or 0; in either case this is
      // correct. Forget the 4.2 compatibility, always update older versioned objects.
      ts::VersionNumber(doc->v_major, doc->v_minor) < CACHE_DB_VERSION) {
    for (int i = info->xcount - 1; i >= 0; --i) {
      info->data(i).alternate.m_alt->m_response_hdr.m_mime->recompute_accelerators_and_presence_bits();
      info->data(i).alternate.m_alt->m_request_hdr.m_mime->recompute_accelerators_and_presence_bits();
    }
  }
  return zret;
}

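/*
  Terminal state for a failed read-from-writer attempt: drop the open
  directory entry, bump the failure stats, forward the failure event to the
  user's continuation, and free this VC.
*/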
int
CacheVC::openReadFromWriterFailure(int event, Event *e)
{
  od = nullptr;
  vector.clear(false);
  CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
  CACHE_INCREMENT_DYN_STAT(cache_read_busy_failure_stat);
  _action.continuation->handleEvent(event, e);
  free_CacheVC(this);
  return EVENT_DONE;
}

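/*
  Pick a writer to read from while the document is still being written. For
  non-HTTP reads there is exactly one writer, which is either taken or given
  up on. For HTTP reads, a candidate alternate vector is assembled from the
  open writers, alternate selection runs on it, and write_vc is set to the
  writer producing the chosen alternate. Returns EVENT_RETURN to fall back to
  a normal disk read, EVENT_CONT to retry later, EVENT_NONE on success, or a
  negative error code.
*/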
int
CacheVC::openReadChooseWriter(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
  intptr_t err = ECACHE_DOC_BUSY;
  CacheVC *w = nullptr;

  ink_assert(vol->mutex->thread_holding == mutex->thread_holding && write_vc == nullptr);

  if (!od) {
    return EVENT_RETURN;
  }

  if (frag_type != CACHE_FRAG_TYPE_HTTP) {
    ink_assert(od->num_writers == 1);
    w = od->writers.head;
    if (w->start_time > start_time || w->closed < 0) {
      od = nullptr;
      return EVENT_RETURN;
    }
    if (!w->closed) {
      return -err;
    }
    write_vc = w;
  } else {
    write_vector = &od->vector;
    int write_vec_cnt = write_vector->count();
    for (int c = 0; c < write_vec_cnt; c++) {
      vector.insert(write_vector->get(c));
    }
    // check if all the writers who came before this reader have
    // set the http_info.
    for (w = static_cast<CacheVC *>(od->writers.head); w; w = static_cast<CacheVC *>(w->opendir_link.next)) {
      if (w->start_time > start_time || w->closed < 0) {
        continue;
      }
      if (!w->closed && !cache_config_read_while_writer) {
        return -err;
      }
      if (w->alternate_index != CACHE_ALT_INDEX_DEFAULT) {
        continue;
      }

      if (!w->closed && !w->alternate.valid()) {
        od = nullptr;
        ink_assert(!write_vc);
        vector.clear(false);
        return EVENT_CONT;
      }
      // construct the vector from the writers.
      int alt_ndx = CACHE_ALT_INDEX_DEFAULT;
      if (w->f.update) {
        // all Update cases. Need to get the alternate index.
        alt_ndx = get_alternate_index(&vector, w->update_key);
        // if it's an alternate delete
        if (!w->alternate.valid()) {
          if (alt_ndx >= 0) {
            vector.remove(alt_ndx, false);
          }
          continue;
        }
      }
      if (w->alternate.valid()) {
        vector.insert(&w->alternate, alt_ndx);
      }
    }

    if (!vector.count()) {
      if (od->reading_vec) {
        // the writer(s) are reading the vector, so there is probably
        // an old vector. Since this reader came before any of the
        // current writers, we should return the old data.
        od = nullptr;
        return EVENT_RETURN;
      }
      return -ECACHE_NO_DOC;
    }
    if (cache_config_select_alternate) {
      alternate_index = HttpTransactCache::SelectFromAlternates(&vector, &request, params);
      if (alternate_index < 0) {
        return -ECACHE_ALT_MISS;
      }
    } else {
      alternate_index = 0;
    }
    CacheHTTPInfo *obj = vector.get(alternate_index);
    for (w = static_cast<CacheVC *>(od->writers.head); w; w = static_cast<CacheVC *>(w->opendir_link.next)) {
      if (obj->m_alt == w->alternate.m_alt) {
        write_vc = w;
        break;
      }
    }
    vector.clear(false);
    if (!write_vc) {
      DDebug("cache_read_agg", "%p: key: %X writer alternate different: %d", this, first_key.slice32(1), alternate_index);
      od = nullptr;
      return EVENT_RETURN;
    }

    DDebug("cache_read_agg", "%p: key: %X eKey: %d # alts: %d, ndx: %d, # writers: %d writer: %p", this, first_key.slice32(1),
           write_vc->earliest_key.slice32(1), vector.count(), alternate_index, od->num_writers, write_vc);
  }
  return EVENT_NONE;
}

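/*
  Entry state when the document is being written concurrently. Re-acquire the
  volume lock, recheck the open directory entry, choose a writer if none is
  set yet, and then either serve data directly from the writer's buffers,
  restart as a normal read, or schedule a retry.
*/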
int
CacheVC::openReadFromWriter(int event, Event *e)
{
  if (!f.read_from_writer_called) {
    // The assignment to last_collision as nullptr was
    // made conditional after INKqa08411.
    last_collision = nullptr;
    // Let's restart the clock from here - the first time a reader
    // gets into this state. It's possible that the open_read was called
    // before the open_write, but the reader could not get the volume
    // lock. If we don't reset the clock here, we won't choose any writer
    // and hence fail the read request.
    start_time = Thread::get_hrtime();
    f.read_from_writer_called = 1;
  }
  cancel_trigger();
  intptr_t err = ECACHE_DOC_BUSY;
  DDebug("cache_read_agg", "%p: key: %X In openReadFromWriter", this, first_key.slice32(1));
  if (_action.cancelled) {
    od = nullptr; // only open for read so no need to close
    return free_CacheVC(this);
  }
  CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
  if (!lock.is_locked()) {
    VC_SCHED_LOCK_RETRY();
  }
  od = vol->open_read(&first_key); // recheck in case the lock failed
  if (!od) {
    MUTEX_RELEASE(lock);
    write_vc = nullptr;
    SET_HANDLER(&CacheVC::openReadStartHead);
    return openReadStartHead(event, e);
  } else {
    ink_assert(od == vol->open_read(&first_key));
  }
  if (!write_vc) {
    int ret = openReadChooseWriter(event, e);
    if (ret < 0) {
      MUTEX_RELEASE(lock);
      SET_HANDLER(&CacheVC::openReadFromWriterFailure);
      return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, reinterpret_cast<Event *>(ret));
    } else if (ret == EVENT_RETURN) {
      MUTEX_RELEASE(lock);
      SET_HANDLER(&CacheVC::openReadStartHead);
      return openReadStartHead(event, e);
    } else if (ret == EVENT_CONT) {
      ink_assert(!write_vc);
      if (writer_lock_retry < cache_config_read_while_writer_max_retries) {
        VC_SCHED_WRITER_RETRY();
      } else {
        return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, (Event *)-err);
      }
    } else {
      ink_assert(write_vc);
    }
  } else {
    if (writer_done()) {
      MUTEX_RELEASE(lock);
      DDebug("cache_read_agg", "%p: key: %X writer %p has left, continuing as normal read", this, first_key.slice32(1), write_vc);
      od = nullptr;
      write_vc = nullptr;
      SET_HANDLER(&CacheVC::openReadStartHead);
      return openReadStartHead(event, e);
    }
  }
  OpenDirEntry *cod = od;
  od = nullptr;
  // someone is currently writing the document
  if (write_vc->closed < 0) {
    MUTEX_RELEASE(lock);
    write_vc = nullptr;
    // writer aborted, continue as if there is no writer
    SET_HANDLER(&CacheVC::openReadStartHead);
    return openReadStartHead(EVENT_IMMEDIATE, nullptr);
  }
  // allow reading from an unclosed writer for http requests only.
  ink_assert(frag_type == CACHE_FRAG_TYPE_HTTP || write_vc->closed);
  if (!write_vc->closed && !write_vc->fragment) {
    if (!cache_config_read_while_writer || frag_type != CACHE_FRAG_TYPE_HTTP ||
        writer_lock_retry >= cache_config_read_while_writer_max_retries) {
      MUTEX_RELEASE(lock);
      return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, (Event *)-err);
    }
    DDebug("cache_read_agg", "%p: key: %X writer: closed:%d, fragment:%d, retry: %d", this, first_key.slice32(1), write_vc->closed,
           write_vc->fragment, writer_lock_retry);
    VC_SCHED_WRITER_RETRY();
  }

  CACHE_TRY_LOCK(writer_lock, write_vc->mutex, mutex->thread_holding);
  if (!writer_lock.is_locked()) {
    DDebug("cache_read_agg", "%p: key: %X lock miss", this, first_key.slice32(1));
    VC_SCHED_LOCK_RETRY();
  }
  MUTEX_RELEASE(lock);

  if (!write_vc->io.ok()) {
    return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, (Event *)-err);
  }
  if (frag_type == CACHE_FRAG_TYPE_HTTP) {
    DDebug("cache_read_agg", "%p: key: %X http passed stage 1, closed: %d, frag: %d", this, first_key.slice32(1), write_vc->closed,
           write_vc->fragment);
    if (!write_vc->alternate.valid()) {
      return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, (Event *)-err);
    }
    alternate.copy(&write_vc->alternate);
    vector.insert(&alternate);
    alternate.object_key_get(&key);
    write_vc->f.readers = 1;
    if (!(write_vc->f.update && write_vc->total_len == 0)) {
      key = write_vc->earliest_key;
      if (!write_vc->closed) {
        alternate.object_size_set(write_vc->vio.nbytes);
      } else {
        alternate.object_size_set(write_vc->total_len);
      }
    } else {
      key = write_vc->update_key;
      ink_assert(write_vc->closed);
      DDebug("cache_read_agg", "%p: key: %X writer header update", this, first_key.slice32(1));
      // Update case (b): grab doc_len from the writer's alternate.
      doc_len = alternate.object_size_get();
      if (write_vc->update_key == cod->single_doc_key && (cod->move_resident_alt || write_vc->f.rewrite_resident_alt) &&
          write_vc->first_buf.get()) {
        // the resident alternate is being updated and it's a
        // header-only update. The first_buf of the writer has the
        // document body.
        Doc *doc = reinterpret_cast<Doc *>(write_vc->first_buf->data());
        writer_buf = new_IOBufferBlock(write_vc->first_buf, doc->data_len(), doc->prefix_len());
        MUTEX_RELEASE(writer_lock);
        ink_assert(doc_len == doc->data_len());
        length = doc_len;
        f.single_fragment = 1;
        doc_pos = 0;
        earliest_key = key;
        dir_clean(&first_dir);
        dir_clean(&earliest_dir);
        SET_HANDLER(&CacheVC::openReadFromWriterMain);
        CACHE_INCREMENT_DYN_STAT(cache_read_busy_success_stat);
        return callcont(CACHE_EVENT_OPEN_READ);
      }
      // want to snarf the new headers from the writer
      // and then continue as if nothing happened.
      last_collision = nullptr;
      MUTEX_RELEASE(writer_lock);
      SET_HANDLER(&CacheVC::openReadStartEarliest);
      return openReadStartEarliest(event, e);
    }
  } else {
    DDebug("cache_read_agg", "%p: key: %X non-http passed stage 1", this, first_key.slice32(1));
    key = write_vc->earliest_key;
  }
  if (write_vc->fragment) {
    doc_len = write_vc->vio.nbytes;
    last_collision = nullptr;
    DDebug("cache_read_agg", "%p: key: %X closed: %d, fragment: %d, len: %d starting first fragment", this, first_key.slice32(1),
           write_vc->closed, write_vc->fragment, (int)doc_len);
    MUTEX_RELEASE(writer_lock);
    // either a header + body update or a new document
    SET_HANDLER(&CacheVC::openReadStartEarliest);
    return openReadStartEarliest(event, e);
  }
  writer_buf = write_vc->blocks;
  writer_offset = write_vc->offset;
  length = write_vc->length;
  // copy the vector
  f.single_fragment = !write_vc->fragment; // single fragment doc
  doc_pos = 0;
  earliest_key = write_vc->earliest_key;
  ink_assert(earliest_key == key);
  doc_len = write_vc->total_len;
  dir_clean(&first_dir);
  dir_clean(&earliest_dir);
  DDebug("cache_read_agg", "%p: key: %X %X: single fragment read", this, first_key.slice32(1), key.slice32(0));
  MUTEX_RELEASE(writer_lock);
  SET_HANDLER(&CacheVC::openReadFromWriterMain);
  CACHE_INCREMENT_DYN_STAT(cache_read_busy_success_stat);
  return callcont(CACHE_EVENT_OPEN_READ);
}

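/*
  Data-pump state while serving directly out of the writer's IOBufferBlock
  chain: clone the available blocks into the reader's VIO, honor a pending
  seek_to offset, and report an error if the writer cannot supply doc_len
  bytes.
*/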
int
CacheVC::openReadFromWriterMain(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
  cancel_trigger();
  if (seek_to) {
    vio.ndone = seek_to;
    seek_to = 0;
  }
  IOBufferBlock *b = nullptr;
  int64_t ntodo = vio.ntodo();
  if (ntodo <= 0) {
    return EVENT_CONT;
  }
  if (length < (static_cast<int64_t>(doc_len)) - vio.ndone) {
    DDebug("cache_read_agg", "truncation %X", first_key.slice32(1));
    if (is_action_tag_set("cache")) {
      ink_release_assert(false);
    }
    Warning("Document %X truncated at %d of %d, reading from writer", first_key.slice32(1), (int)vio.ndone, (int)doc_len);
    return calluser(VC_EVENT_ERROR);
  }
  /* It's possible that the user did a do_io_close before
     openWriteWriteDone was called. */
  if (length > (static_cast<int64_t>(doc_len)) - vio.ndone) {
    int64_t skip_bytes = length - (doc_len - vio.ndone);
    iobufferblock_skip(writer_buf.get(), &writer_offset, &length, skip_bytes);
  }
  int64_t bytes = length;
  if (bytes > vio.ntodo()) {
    bytes = vio.ntodo();
  }
  if (vio.ndone >= static_cast<int64_t>(doc_len)) {
    ink_assert(bytes <= 0);
    // reached the end of the document and the user still wants more
    return calluser(VC_EVENT_EOS);
  }
  b = iobufferblock_clone(writer_buf.get(), writer_offset, bytes);
  writer_buf = iobufferblock_skip(writer_buf.get(), &writer_offset, &length, bytes);
  vio.buffer.writer()->append_block(b);
  vio.ndone += bytes;
  if (vio.ntodo() <= 0) {
    return calluser(VC_EVENT_READ_COMPLETE);
  } else {
    return calluser(VC_EVENT_READ_READY);
  }
}

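/*
  Final state for a read VC: wait for any in-flight AIO to complete, then
  optionally force-evacuate the doc heads for hit evacuation, close the read
  on the volume, and free the VC.
*/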
int
CacheVC::openReadClose(int event, Event * /* e ATS_UNUSED */)
{
  cancel_trigger();
  if (is_io_in_progress()) {
    if (event != AIO_EVENT_DONE) {
      return EVENT_CONT;
    }
    set_io_not_in_progress();
  }
  CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
  if (!lock.is_locked()) {
    VC_SCHED_LOCK_RETRY();
  }
  if (f.hit_evacuate && dir_valid(vol, &first_dir) && closed > 0) {
    if (f.single_fragment) {
      vol->force_evacuate_head(&first_dir, dir_pinned(&first_dir));
    } else if (dir_valid(vol, &earliest_dir)) {
      vol->force_evacuate_head(&first_dir, dir_pinned(&first_dir));
      vol->force_evacuate_head(&earliest_dir, dir_pinned(&earliest_dir));
    }
  }
  vol->close_read(this);
  return free_CacheVC(this);
}

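/*
  AIO completion handler for a middle-fragment read: validate the doc that
  came off disk (magic and key), chase directory collisions, and fall back to
  the writer or report truncation when the fragment can no longer be found.
*/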
int
CacheVC::openReadReadDone(int event, Event *e)
{
  Doc *doc = nullptr;

  cancel_trigger();
  if (event == EVENT_IMMEDIATE) {
    return EVENT_CONT;
  }
  set_io_not_in_progress();
  {
    CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
    if (!lock.is_locked()) {
      VC_SCHED_LOCK_RETRY();
    }
    if (event == AIO_EVENT_DONE && !io.ok()) {
      goto Lerror;
    }
    if (last_collision &&     // no missed lock
        dir_valid(vol, &dir)) // object still valid
    {
      doc = reinterpret_cast<Doc *>(buf->data());
      if (doc->magic != DOC_MAGIC) {
        char tmpstring[CRYPTO_HEX_SIZE];
        if (doc->magic == DOC_CORRUPT) {
          Warning("Middle: Doc checksum does not match for %s", key.toHexStr(tmpstring));
        } else {
          Warning("Middle: Doc magic does not match for %s", key.toHexStr(tmpstring));
        }
        goto Lerror;
      }
      if (doc->key == key) {
        goto LreadMain;
      }
    }
    if (last_collision && dir_offset(&dir) != dir_offset(last_collision)) {
      last_collision = nullptr; // object has been/is being overwritten
    }
    if (dir_probe(&key, vol, &dir, &last_collision)) {
      int ret = do_read_call(&key);
      if (ret == EVENT_RETURN) {
        goto Lcallreturn;
      }
      return EVENT_CONT;
    } else if (write_vc) {
      if (writer_done()) {
        last_collision = nullptr;
        while (dir_probe(&earliest_key, vol, &dir, &last_collision)) {
          if (dir_offset(&dir) == dir_offset(&earliest_dir)) {
            DDebug("cache_read_agg", "%p: key: %X ReadRead complete: %d", this, first_key.slice32(1), (int)vio.ndone);
            doc_len = vio.ndone;
            goto Ldone;
          }
        }
        DDebug("cache_read_agg", "%p: key: %X ReadRead writer aborted: %d", this, first_key.slice32(1), (int)vio.ndone);
        goto Lerror;
      }
      if (writer_lock_retry < cache_config_read_while_writer_max_retries) {
        DDebug("cache_read_agg", "%p: key: %X ReadRead retrying: %d", this, first_key.slice32(1), (int)vio.ndone);
        VC_SCHED_WRITER_RETRY(); // wait for writer
      } else {
        DDebug("cache_read_agg", "%p: key: %X ReadRead retries exhausted, bailing: %d", this, first_key.slice32(1),
               (int)vio.ndone);
        goto Ldone;
      }
    }
    // fall through for truncated documents
  }
Lerror : {
  char tmpstring[CRYPTO_HEX_SIZE];
  if (request.valid()) {
    int url_length;
    const char *url_text = request.url_get()->string_get_ref(&url_length);
    Warning("Document %s truncated, url[%.*s] .. clearing", earliest_key.toHexStr(tmpstring), url_length, url_text);
  } else {
    Warning("Document %s truncated .. clearing", earliest_key.toHexStr(tmpstring));
  }
  dir_delete(&earliest_key, vol, &earliest_dir);
  return calluser(VC_EVENT_ERROR);
}
Ldone:
  return calluser(VC_EVENT_EOS);
Lcallreturn:
  return handleEvent(AIO_EVENT_DONE, nullptr);
LreadMain:
  fragment++;
  doc_pos = doc->prefix_len();
  next_CacheKey(&key, &key);
  SET_HANDLER(&CacheVC::openReadMain);
  return openReadMain(event, e);
}

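/*
  Main read loop: copy bytes from the current in-memory fragment into the
  user's VIO, handle do_io_pread seeks via the fragment offset table, and
  kick off the disk read of the next fragment once the buffered data drains.
*/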
int
CacheVC::openReadMain(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
  cancel_trigger();
  Doc *doc = reinterpret_cast<Doc *>(buf->data());
  int64_t ntodo = vio.ntodo();
  int64_t bytes = doc->len - doc_pos;
  IOBufferBlock *b = nullptr;
  if (seek_to) { // handle do_io_pread
    if (seek_to >= doc_len) {
      vio.ndone = doc_len;
      return calluser(VC_EVENT_EOS);
    }
    HTTPInfo::FragOffset *frags = alternate.get_frag_table();
    if (is_debug_tag_set("cache_seek")) {
      char b[CRYPTO_HEX_SIZE], c[CRYPTO_HEX_SIZE];
      Debug("cache_seek", "Seek @ %" PRId64 " in %s from #%d @ %" PRId64 "/%d:%s", seek_to, first_key.toHexStr(b), fragment,
            doc_pos, doc->len, doc->key.toHexStr(c));
    }
    /* Because single fragment objects can migrate to hang off an alt vector
       they can appear to the VC as multi-fragment when they are not really.
       The essential difference is the existence of a fragment table.
    */
    if (frags) {
      int target = 0;
      HTTPInfo::FragOffset next_off = frags[target];
      int lfi = static_cast<int>(alternate.get_frag_offset_count()) - 1;
      ink_assert(lfi >= 0); // because it's not a single frag doc.

      /* Note: frag[i].offset is the offset of the first byte past the
         i'th fragment. So frag[0].offset is the offset of the first
         byte of fragment 1. In addition the # of fragments is one
         more than the fragment table length, the start of the last
         fragment being the last offset in the table.
      */
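      /* Worked example (values illustrative, not taken from a real cache):
         a 3 MB document stored as three 1 MB fragments has a two-entry
         table with frags[0] == 1 MB and frags[1] == 2 MB, so lfi == 1.
         Seeking to offset 1.5 MB, the search loop below advances target
         to 1 and stops because seek_to < frags[1]; byte 1.5 MB therefore
         lives in fragment 1, the middle fragment.
      */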
      if (fragment == 0 || seek_to < frags[fragment - 1] || (fragment <= lfi && frags[fragment] <= seek_to)) {
        // search from frag 0 on to find the proper frag
        while (seek_to >= next_off && target < lfi) {
          next_off = frags[++target];
        }
        if (target == lfi && seek_to >= next_off) {
          ++target;
        }
      } else { // shortcut if we are in the fragment already
        target = fragment;
      }
      if (target != fragment) {
        // Lread will always read the next fragment, so if that
        // is the one we want, we don't need to do anything
        int cfi = fragment;
        --target;
        while (target > fragment) {
          next_CacheKey(&key, &key);
          ++fragment;
        }
        while (target < fragment) {
          prev_CacheKey(&key, &key);
          --fragment;
        }

        if (is_debug_tag_set("cache_seek")) {
          char target_key_str[CRYPTO_HEX_SIZE];
          key.toHexStr(target_key_str);
          Debug("cache_seek", "Seek #%d @ %" PRId64 " -> #%d @ %" PRId64 ":%s", cfi, doc_pos, target, seek_to, target_key_str);
        }
        goto Lread;
      }
    }
    doc_pos = doc->prefix_len() + seek_to;
    if (fragment && frags) {
      doc_pos -= static_cast<int64_t>(frags[fragment - 1]);
    }
    vio.ndone = 0;
    seek_to = 0;
    ntodo = vio.ntodo();
    bytes = doc->len - doc_pos;
    if (is_debug_tag_set("cache_seek")) {
      char target_key_str[CRYPTO_HEX_SIZE];
      Debug("cache_seek", "Read # %d @ %" PRId64 "/%d for %" PRId64 " %s", fragment, doc_pos, doc->len, bytes,
            key.toHexStr(target_key_str));
    }

    // This shouldn't happen for HTTP assets, but it does
    // occasionally in production. This is a temporary fix
    // to clean up broken objects until the root cause can
    // be found. It must be the case that either the fragment
    // offsets are incorrect or a fragment table isn't being
    // created when it should be.
    if (frag_type == CACHE_FRAG_TYPE_HTTP && bytes < 0) {
      char xt[CRYPTO_HEX_SIZE];
      char yt[CRYPTO_HEX_SIZE];

      int url_length = 0;
      char const *url_text = nullptr;
      if (request.valid()) {
        url_text = request.url_get()->string_get_ref(&url_length);
      }

      int64_t prev_frag_size = 0;
      if (fragment && frags) {
        prev_frag_size = static_cast<int64_t>(frags[fragment - 1]);
      }

      Warning("cache_seek range request bug: read %s targ %s - %s frag # %d (prev_frag %" PRId64 ") @ %" PRId64 "/%d for %" PRId64
              " tot %" PRId64 " url '%.*s'",
              doc->key.toHexStr(xt), key.toHexStr(yt), f.single_fragment ? "single" : "multi", fragment, prev_frag_size, doc_pos,
              doc->len, bytes, doc->total_len, url_length, url_text);

      doc->magic = DOC_CORRUPT;

      CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
      if (!lock.is_locked()) {
        SET_HANDLER(&CacheVC::openReadDirDelete);
        VC_SCHED_LOCK_RETRY();
      }

      dir_delete(&earliest_key, vol, &earliest_dir);
      goto Lerror;
    }
  }
  if (ntodo <= 0) {
    return EVENT_CONT;
  }
  if (vio.buffer.writer()->max_read_avail() > vio.buffer.writer()->water_mark && vio.ndone) { // initiate read of first block
    return EVENT_CONT;
  }
  if ((bytes <= 0) && vio.ntodo() >= 0) {
    goto Lread;
  }
  if (bytes > vio.ntodo()) {
    bytes = vio.ntodo();
  }
  b = new_IOBufferBlock(buf, bytes, doc_pos);
  b->_buf_end = b->_end;
  vio.buffer.writer()->append_block(b);
  vio.ndone += bytes;
  doc_pos += bytes;
  if (vio.ntodo() <= 0) {
    return calluser(VC_EVENT_READ_COMPLETE);
  } else {
    if (calluser(VC_EVENT_READ_READY) == EVENT_DONE) {
      return EVENT_DONE;
    }
    // we have to keep reading until we give the user all the
    // bytes they wanted or we hit the watermark.
    if (vio.ntodo() > 0 && !vio.buffer.writer()->high_water()) {
      goto Lread;
    }
    return EVENT_CONT;
  }
Lread : {
  if (vio.ndone >= static_cast<int64_t>(doc_len)) {
    // reached the end of the document and the user still wants more
    return calluser(VC_EVENT_EOS);
  }
  last_collision = nullptr;
  writer_lock_retry = 0;
  // if the state machine calls reenable on the callback from the cache,
  // we set up a schedule_imm event. The openReadReadDone discards
  // EVENT_IMMEDIATE events. So, we have to cancel that trigger and set
  // a new EVENT_INTERVAL event.
  cancel_trigger();
  CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
  if (!lock.is_locked()) {
    SET_HANDLER(&CacheVC::openReadMain);
    VC_SCHED_LOCK_RETRY();
  }
  if (dir_probe(&key, vol, &dir, &last_collision)) {
    SET_HANDLER(&CacheVC::openReadReadDone);
    int ret = do_read_call(&key);
    if (ret == EVENT_RETURN) {
      goto Lcallreturn;
    }
    return EVENT_CONT;
  } else if (write_vc) {
    if (writer_done()) {
      last_collision = nullptr;
      while (dir_probe(&earliest_key, vol, &dir, &last_collision)) {
        if (dir_offset(&dir) == dir_offset(&earliest_dir)) {
          DDebug("cache_read_agg", "%p: key: %X ReadMain complete: %d", this, first_key.slice32(1), (int)vio.ndone);
          doc_len = vio.ndone;
          goto Leos;
        }
      }
      DDebug("cache_read_agg", "%p: key: %X ReadMain writer aborted: %d", this, first_key.slice32(1), (int)vio.ndone);
      goto Lerror;
    }
    DDebug("cache_read_agg", "%p: key: %X ReadMain retrying: %d", this, first_key.slice32(1), (int)vio.ndone);
    SET_HANDLER(&CacheVC::openReadMain);
    VC_SCHED_WRITER_RETRY();
  }
  if (is_action_tag_set("cache")) {
    ink_release_assert(false);
  }
  Warning("Document %X truncated at %d of %d, missing fragment %X", first_key.slice32(1), (int)vio.ndone, (int)doc_len,
          key.slice32(1));
  // remove the directory entry
  dir_delete(&earliest_key, vol, &earliest_dir);
}
Lerror:
  return calluser(VC_EVENT_ERROR);
Leos:
  return calluser(VC_EVENT_EOS);
Lcallreturn:
  return handleEvent(AIO_EVENT_DONE, nullptr);
}

/*
  This code follows CacheVC::openReadStartHead closely;
  if you change this, you might have to change that.
*/
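/*
  Read the earliest doc (the start of the object body) once the first doc has
  identified the alternate. If the alternate's body has vanished, the vector
  is rewritten to drop the dead alternate, re-inserting a directory entry for
  a resident alternate when one exists.
*/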
int
CacheVC::openReadStartEarliest(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
  int ret = 0;
  Doc *doc = nullptr;
  cancel_trigger();
  set_io_not_in_progress();
  if (_action.cancelled) {
    return free_CacheVC(this);
  }
  {
    CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
    if (!lock.is_locked()) {
      VC_SCHED_LOCK_RETRY();
    }
    if (!buf) {
      goto Lread;
    }
    if (!io.ok()) {
      goto Ldone;
    }
    // an object needs to be outside the aggregation window in order to
    // be evacuated as it is read
    if (!dir_agg_valid(vol, &dir)) {
      // a directory entry which is no longer valid may have been overwritten
      if (!dir_valid(vol, &dir)) {
        last_collision = nullptr;
      }
      goto Lread;
    }
    doc = reinterpret_cast<Doc *>(buf->data());
    if (doc->magic != DOC_MAGIC) {
      char tmpstring[CRYPTO_HEX_SIZE];
      if (is_action_tag_set("cache")) {
        ink_release_assert(false);
      }
      if (doc->magic == DOC_CORRUPT) {
        Warning("Earliest: Doc checksum does not match for %s", key.toHexStr(tmpstring));
      } else {
        Warning("Earliest: Doc magic does not match for %s", key.toHexStr(tmpstring));
      }
      // remove the dir entry
      dir_delete(&key, vol, &dir);
      // try going through the directory entries again
      // in case the dir entry we deleted doesn't correspond
      // to the key we are looking for. This is possible
      // because of directory collisions.
      last_collision = nullptr;
      goto Lread;
    }
    if (!(doc->key == key)) { // collision
      goto Lread;
    }
    // success
    earliest_key = key;
    doc_pos = doc->prefix_len();
    next_CacheKey(&key, &doc->key);
    vol->begin_read(this);
    if (vol->within_hit_evacuate_window(&earliest_dir) &&
        (!cache_config_hit_evacuate_size_limit || doc_len <= static_cast<uint64_t>(cache_config_hit_evacuate_size_limit))) {
      DDebug("cache_hit_evac", "dir: %" PRId64 ", write: %" PRId64 ", phase: %d", dir_offset(&earliest_dir),
             vol->offset_to_vol_offset(vol->header->write_pos), vol->header->phase);
      f.hit_evacuate = 1;
    }
    goto Lsuccess;
  Lread:
    if (dir_probe(&key, vol, &earliest_dir, &last_collision) || dir_lookaside_probe(&key, vol, &earliest_dir, nullptr)) {
      dir = earliest_dir;
      if ((ret = do_read_call(&key)) == EVENT_RETURN) {
        goto Lcallreturn;
      }
      return ret;
    }
    // the read has detected that the alternate does not exist in the cache.
    // rewrite the vector.
    if (!f.read_from_writer_called && frag_type == CACHE_FRAG_TYPE_HTTP) {
      // don't want any writers while we are evacuating the vector
      if (!vol->open_write(this, false, 1)) {
        Doc *doc1 = reinterpret_cast<Doc *>(first_buf->data());
        uint32_t len = this->load_http_info(write_vector, doc1);
        ink_assert(len == doc1->hlen && write_vector->count() > 0);
        write_vector->remove(alternate_index, true);
        // if the vector had one alternate, delete its directory entry
        if (len != doc1->hlen || !write_vector->count()) {
          // sometimes the delete fails when there is a race and another read
          // finds that the directory entry has been overwritten
          // (cannot assert on the return value)
          dir_delete(&first_key, vol, &first_dir);
        } else {
          buf = nullptr;
          last_collision = nullptr;
          write_len = 0;
          header_len = write_vector->marshal_length();
          f.evac_vector = 1;
          f.use_first_key = 1;
          key = first_key;
          // always use od->first_dir to overwrite a directory.
          // If an evacuation happens while a vector is being updated,
          // the evacuator changes od->first_dir to the new directory
          // that it inserted.
          od->first_dir = first_dir;
          od->writing_vec = true;
          earliest_key = zero_key;

          // set up this VC as an alternate-delete write_vc
          vio.op = VIO::WRITE;
          total_len = 0;
          f.update = 1;
          alternate_index = CACHE_ALT_REMOVED;
          /////////////////////////////////////////////////////////////////
          // change to create a directory entry for a resident alternate //
          // when another alternate does not exist.                      //
          /////////////////////////////////////////////////////////////////
          if (doc1->total_len > 0) {
            od->move_resident_alt = true;
            od->single_doc_key = doc1->key;
            dir_assign(&od->single_doc_dir, &dir);
            dir_set_tag(&od->single_doc_dir, od->single_doc_key.slice32(2));
          }
          SET_HANDLER(&CacheVC::openReadVecWrite);
          if ((ret = do_write_call()) == EVENT_RETURN) {
            goto Lcallreturn;
          }
          return ret;
        }
      }
    }
    // open write failure - another writer, so don't modify the vector
  Ldone:
    if (od) {
      vol->close_write(this);
    }
  }
  CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
  _action.continuation->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *)-ECACHE_NO_DOC);
  return free_CacheVC(this);
Lcallreturn:
  return handleEvent(AIO_EVENT_DONE, nullptr); // hopefully a tail call
Lsuccess:
  if (write_vc) {
    CACHE_INCREMENT_DYN_STAT(cache_read_busy_success_stat);
  }
  SET_HANDLER(&CacheVC::openReadMain);
  return callcont(CACHE_EVENT_OPEN_READ);
}

// create the directory entry after the vector has been evacuated.
// The volume lock has been taken when this function is called.
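// On I/O success the rewritten vector is committed with dir_overwrite(); if
// the request still matches a surviving alternate the read restarts from
// openReadStartHead, otherwise it fails with ECACHE_ALT_MISS.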
int
CacheVC::openReadVecWrite(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
  cancel_trigger();
  set_io_not_in_progress();
  ink_assert(od);
  od->writing_vec = false;
  if (_action.cancelled) {
    return openWriteCloseDir(EVENT_IMMEDIATE, nullptr);
  }
  {
    CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
    if (!lock.is_locked()) {
      VC_SCHED_LOCK_RETRY();
    }
    if (io.ok()) {
      ink_assert(f.evac_vector);
      ink_assert(frag_type == CACHE_FRAG_TYPE_HTTP);
      ink_assert(!buf);
      f.evac_vector = false;
      last_collision = nullptr;
      f.update = 0;
      alternate_index = CACHE_ALT_INDEX_DEFAULT;
      f.use_first_key = 0;
      vio.op = VIO::READ;
      dir_overwrite(&first_key, vol, &dir, &od->first_dir);
      if (od->move_resident_alt) {
        dir_insert(&od->single_doc_key, vol, &od->single_doc_dir);
      }
      int alt_ndx = HttpTransactCache::SelectFromAlternates(write_vector, &request, params);
      vol->close_write(this);
      if (alt_ndx >= 0) {
        vector.clear();
        // we don't need to start all over again, since we already
        // have the vector in memory. But this is simpler and this
        // case is rare.
        goto Lrestart;
      }
    } else {
      vol->close_write(this);
    }
  }

  CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
  _action.continuation->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *)-ECACHE_ALT_MISS);
  return free_CacheVC(this);
Lrestart:
  SET_HANDLER(&CacheVC::openReadStartHead);
  return openReadStartHead(EVENT_IMMEDIATE, nullptr);
}

/*
  This code follows CacheVC::openReadStartEarliest closely;
  if you change this, you might have to change that.
*/
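/*
  First state of a read: probe for the first doc (keyed by first_key and, for
  HTTP objects, holding the alternate vector), select the alternate, and then
  either serve a single-fragment doc directly, chain to openReadStartEarliest
  for the body, or divert to a concurrent writer.
*/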
int
CacheVC::openReadStartHead(int event, Event *e)
{
  intptr_t err = ECACHE_NO_DOC;
  Doc *doc = nullptr;
  cancel_trigger();
  set_io_not_in_progress();
  if (_action.cancelled) {
    return free_CacheVC(this);
  }
  {
    CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
    if (!lock.is_locked()) {
      VC_SCHED_LOCK_RETRY();
    }
    if (!buf) {
      goto Lread;
    }
    if (!io.ok()) {
      goto Ldone;
    }
    // an object needs to be outside the aggregation window in order to
    // be evacuated as it is read
    if (!dir_agg_valid(vol, &dir)) {
      // a directory entry which is no longer valid may have been overwritten
      if (!dir_valid(vol, &dir)) {
        last_collision = nullptr;
      }
      goto Lread;
    }
    doc = reinterpret_cast<Doc *>(buf->data());
    if (doc->magic != DOC_MAGIC) {
      char tmpstring[CRYPTO_HEX_SIZE];
      if (is_action_tag_set("cache")) {
        ink_release_assert(false);
      }
      if (doc->magic == DOC_CORRUPT) {
        Warning("Head: Doc checksum does not match for %s", key.toHexStr(tmpstring));
      } else {
        Warning("Head: Doc magic does not match for %s", key.toHexStr(tmpstring));
      }
      // remove the dir entry
      dir_delete(&key, vol, &dir);
      // try going through the directory entries again
      // in case the dir entry we deleted doesn't correspond
      // to the key we are looking for. This is possible
      // because of directory collisions.
      last_collision = nullptr;
      goto Lread;
    }
    if (!(doc->first_key == key)) {
      goto Lread;
    }
    if (f.lookup) {
      goto Lookup;
    }
    earliest_dir = dir;
    CacheHTTPInfo *alternate_tmp;
    if (frag_type == CACHE_FRAG_TYPE_HTTP) {
      uint32_t uml;
      ink_assert(doc->hlen);
      if (!doc->hlen) {
        goto Ldone;
      }
      if ((uml = this->load_http_info(&vector, doc)) != doc->hlen) {
        if (buf) {
          HTTPCacheAlt *alt = reinterpret_cast<HTTPCacheAlt *>(doc->hdr());
          int32_t alt_length = 0;
          // count should be reasonable, as the vector is initialized and unlikely to be too corrupted
          // by bad disk data - count should be the number of successfully unmarshalled alts.
          for (int32_t i = 0; i < vector.count(); ++i) {
            CacheHTTPInfo *info = vector.get(i);
            if (info && info->m_alt) {
              alt_length += info->m_alt->m_unmarshal_len;
            }
          }
          Note("OpenReadHead failed for cachekey %X : vector inconsistency - "
               "unmarshalled %d expecting %d in %d (base=%zu, ver=%d:%d) "
               "- vector n=%d size=%d "
               "first alt=%d[%s]",
               key.slice32(0), uml, doc->hlen, doc->len, sizeof(Doc), doc->v_major, doc->v_minor, vector.count(), alt_length,
               alt->m_magic,
               (CACHE_ALT_MAGIC_ALIVE == alt->m_magic ?
                  "alive" :
                  CACHE_ALT_MAGIC_MARSHALED == alt->m_magic ? "serial" : CACHE_ALT_MAGIC_DEAD == alt->m_magic ? "dead" : "bogus"));
          dir_delete(&key, vol, &dir);
        }
        err = ECACHE_BAD_META_DATA;
        goto Ldone;
      }
      if (cache_config_select_alternate) {
        alternate_index = HttpTransactCache::SelectFromAlternates(&vector, &request, params);
        if (alternate_index < 0) {
          err = ECACHE_ALT_MISS;
          goto Ldone;
        }
      } else {
        alternate_index = 0;
      }
      alternate_tmp = vector.get(alternate_index);
      if (!alternate_tmp->valid()) {
        if (buf) {
          Note("OpenReadHead failed for cachekey %X : alternate inconsistency", key.slice32(0));
          dir_delete(&key, vol, &dir);
        }
        goto Ldone;
      }

      alternate.copy_shallow(alternate_tmp);
      alternate.object_key_get(&key);
      doc_len = alternate.object_size_get();
      if (key == doc->key) { // is this my data?
        f.single_fragment = doc->single_fragment();
        ink_assert(f.single_fragment); // otherwise we need to read the earliest doc
        ink_assert(doc->hlen);
        doc_pos = doc->prefix_len();
        next_CacheKey(&key, &doc->key);
      } else {
        f.single_fragment = false;
      }
    } else {
      next_CacheKey(&key, &doc->key);
      f.single_fragment = doc->single_fragment();
      doc_pos = doc->prefix_len();
      doc_len = doc->total_len;
    }

    if (is_debug_tag_set("cache_read")) { // amc debug
      char xt[CRYPTO_HEX_SIZE], yt[CRYPTO_HEX_SIZE];
      Debug("cache_read", "CacheReadStartHead - read %s target %s - %s %d of %" PRId64 " bytes, %d fragments",
            doc->key.toHexStr(xt), key.toHexStr(yt), f.single_fragment ? "single" : "multi", doc->len, doc->total_len,
            alternate.get_frag_offset_count());
    }
    // the first fragment might have been gc'ed. Make sure the first
    // fragment is there before returning CACHE_EVENT_OPEN_READ
    if (!f.single_fragment) {
      goto Learliest;
    }

    if (vol->within_hit_evacuate_window(&dir) &&
        (!cache_config_hit_evacuate_size_limit || doc_len <= static_cast<uint64_t>(cache_config_hit_evacuate_size_limit))) {
      DDebug("cache_hit_evac", "dir: %" PRId64 ", write: %" PRId64 ", phase: %d", dir_offset(&dir),
             vol->offset_to_vol_offset(vol->header->write_pos), vol->header->phase);
      f.hit_evacuate = 1;
    }

    first_buf = buf;
    vol->begin_read(this);

    goto Lsuccess;

  Lread:
    // check for a collision
    // INKqa07684 - Cache::lookup returns CACHE_EVENT_OPEN_READ_FAILED.
    // don't want to go through this BS of reading from a writer if
    // it's a lookup. In this case the lookup will fail while the document
    // is being written to the cache.
    OpenDirEntry *cod = vol->open_read(&key);
    if (cod && !f.read_from_writer_called) {
      if (f.lookup) {
        err = ECACHE_DOC_BUSY;
        goto Ldone;
      }
      od = cod;
      MUTEX_RELEASE(lock);
      SET_HANDLER(&CacheVC::openReadFromWriter);
      return handleEvent(EVENT_IMMEDIATE, nullptr);
    }
    if (dir_probe(&key, vol, &dir, &last_collision)) {
      first_dir = dir;
      int ret = do_read_call(&key);
      if (ret == EVENT_RETURN) {
        goto Lcallreturn;
      }
      return ret;
    }
  }
Ldone:
  if (!f.lookup) {
    CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
    _action.continuation->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *)-err);
  } else {
    CACHE_INCREMENT_DYN_STAT(cache_lookup_failure_stat);
    _action.continuation->handleEvent(CACHE_EVENT_LOOKUP_FAILED, (void *)-err);
  }
  return free_CacheVC(this);
Lcallreturn:
  return handleEvent(AIO_EVENT_DONE, nullptr); // hopefully a tail call
Lsuccess:
  SET_HANDLER(&CacheVC::openReadMain);
  return callcont(CACHE_EVENT_OPEN_READ);
Lookup:
  CACHE_INCREMENT_DYN_STAT(cache_lookup_success_stat);
  _action.continuation->handleEvent(CACHE_EVENT_LOOKUP, nullptr);
  return free_CacheVC(this);
Learliest:
  first_buf = buf;
  buf = nullptr;
  earliest_key = key;
  last_collision = nullptr;
  SET_HANDLER(&CacheVC::openReadStartEarliest);
  return openReadStartEarliest(event, e);
}

/*
  Handle a directory delete event in case of some detected corruption.
*/
int
CacheVC::openReadDirDelete(int event, Event *e)
{
  MUTEX_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
  if (!lock.is_locked()) {
    VC_SCHED_LOCK_RETRY();
  }

  dir_delete(&earliest_key, vol, &earliest_dir);
  return calluser(VC_EVENT_ERROR);
}