/** @file

  A brief file description

  @section license License

  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements. See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership. The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License. You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
 */

#include "P_Cache.h"

#define UINT_WRAP_LTE(_x, _y) (((_y) - (_x)) < INT_MAX) // exploit overflow
#define UINT_WRAP_GTE(_x, _y) (((_x) - (_y)) < INT_MAX) // exploit overflow
#define UINT_WRAP_LT(_x, _y) (((_x) - (_y)) >= INT_MAX) // exploit overflow

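// These macros compare unsigned serial numbers that may wrap around, relying
// on unsigned subtraction being defined modulo 2^32. A sketch with
// illustrative values (not taken from this file): for uint32_t x = 0xFFFFFFFE
// and y = 0x00000001, y is "later" than x even though y < x numerically, and
// UINT_WRAP_LTE(x, y) holds because (y - x) == 3 < INT_MAX. The comparison is
// meaningful only while the two serials stay within INT_MAX of each other.
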
// Given a key, finds the index of the matching alternate.
// Used to get the alternate which is actually present in the document.
int
get_alternate_index(CacheHTTPInfoVector *cache_vector, CacheKey key)
{
  int alt_count = cache_vector->count();
  CacheHTTPInfo *obj;
  if (!alt_count) {
    return -1;
  }
  for (int i = 0; i < alt_count; i++) {
    obj = cache_vector->get(i);
    if (obj->compare_object_key(&key)) {
      // Debug("cache_key", "Resident alternate key %X", key.slice32(0));
      return i;
    }
  }
  return -1;
}

// Adds/Deletes alternate to the od->vector (write_vector). If the vector
// is empty, deletes the directory entry pointing to the vector. Each
// CacheVC must write the vector down to disk after making changes. If we
// wait till the last writer, that writer will have the responsibility
// of writing the vector even if the http state machine aborts. This
// makes it easier to handle situations where writers abort.
int
CacheVC::updateVector(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
  cancel_trigger();
  if (od->reading_vec || od->writing_vec) {
    VC_SCHED_LOCK_RETRY();
  }
  int ret = 0;
  {
    CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
    if (!lock.is_locked() || od->writing_vec) {
      VC_SCHED_LOCK_RETRY();
    }

    int vec = alternate.valid();
    if (f.update) {
      // all Update cases. Need to get the alternate index.
      alternate_index = get_alternate_index(write_vector, update_key);
      Debug("cache_update", "updating alternate index %d frags %d", alternate_index,
            alternate_index >= 0 ? write_vector->get(alternate_index)->get_frag_offset_count() : -1);
      // if it's an alternate delete
      if (!vec) {
        ink_assert(!total_len);
        if (alternate_index >= 0) {
          write_vector->remove(alternate_index, true);
          alternate_index = CACHE_ALT_REMOVED;
          if (!write_vector->count()) {
            dir_delete(&first_key, vol, &od->first_dir);
          }
        }
        // the alternate is not there any more. somebody might have
        // deleted it. Just close this writer.
        if (alternate_index != CACHE_ALT_REMOVED || !write_vector->count()) {
          SET_HANDLER(&CacheVC::openWriteCloseDir);
          return openWriteCloseDir(EVENT_IMMEDIATE, nullptr);
        }
      }
      if (update_key == od->single_doc_key && (total_len || f.allow_empty_doc || !vec)) {
        od->move_resident_alt = false;
      }
    }
    if (cache_config_http_max_alts > 1 && write_vector->count() >= cache_config_http_max_alts && alternate_index < 0) {
      if (od->move_resident_alt && get_alternate_index(write_vector, od->single_doc_key) == 0) {
        od->move_resident_alt = false;
      }
      write_vector->remove(0, true);
    }
    if (vec) {
      /* preserve fragment offset data from old info. This method is
         called iff the update is a header-only update so the fragment
         data should remain valid.
      */
      // If we are not in the header-only update case, don't copy fragments.
      if (alternate_index >= 0 &&
          ((total_len == 0 && alternate.get_frag_offset_count() == 0) && !(f.allow_empty_doc && this->vio.nbytes == 0))) {
        alternate.copy_frag_offsets_from(write_vector->get(alternate_index));
      }
      alternate_index = write_vector->insert(&alternate, alternate_index);
    }

    if (od->move_resident_alt && first_buf.get() && !od->has_multiple_writers()) {
      Doc *doc = reinterpret_cast<Doc *>(first_buf->data());
      int small_doc = static_cast<int64_t>(doc->data_len()) < static_cast<int64_t>(cache_config_alt_rewrite_max_size);
      int have_res_alt = doc->key == od->single_doc_key;
      // if the new alternate is not written with the vector
      // then move the old one with the vector.
      // if it's a header-only update, move the resident alternate
      // with the vector.
      // We are sure that the body of the resident alternate that we are
      // rewriting has not changed and the alternate is not being deleted,
      // since we set od->move_resident_alt to 0 in that case
      // (in updateVector)
      if (small_doc && have_res_alt && (fragment || (f.update && !total_len))) {
        // for a multiple fragment document, we must have done
        // CacheVC::openWriteCloseDataDone
        ink_assert(!fragment || f.data_done);
        od->move_resident_alt = false;
        f.rewrite_resident_alt = 1;
        write_len = doc->data_len();
        Debug("cache_update_alt", "rewriting resident alt size: %d key: %X, first_key: %X", write_len, doc->key.slice32(0),
              first_key.slice32(0));
      }
    }
    header_len = write_vector->marshal_length();
    od->writing_vec = true;
    f.use_first_key = 1;
    SET_HANDLER(&CacheVC::openWriteCloseHeadDone);
    ret = do_write_call();
  }
  if (ret == EVENT_RETURN) {
    return handleEvent(AIO_EVENT_DONE, nullptr);
  }
  return ret;
}
/*
  The following fields of the CacheVC are used when writing down a fragment.
  Make sure that each of the fields is set to a valid value before calling
  this function:
  - frag_type. Checked to see if a vector needs to be marshalled.
  - f.use_first_key. To decide if the vector should be marshalled and to set
    the doc->key to the appropriate key (first_key or earliest_key).
  - f.evac_vector. If set, the writer is pushed to the beginning of the
    agg queue. And if !f.evac_vector && !f.update, the alternate->object_size
    is set to vc->total_len.
  - f.readers. If set, assumes that this is an evacuation, so the write
    is not aborted even if vol->agg_todo_size > agg_write_backlog.
  - f.evacuator. If this is an evacuation.
  - f.rewrite_resident_alt. The resident alternate is rewritten.
  - f.update. Used only if the write_vector needs to be written to disk.
    Used to set the length of the alternate to total_len.
  - write_vector. Used only if frag_type == CACHE_FRAG_TYPE_HTTP &&
    (f.use_first_key || f.evac_vector) is set. write_vector is written to disk.
  - alternate_index. Used only if write_vector needs to be written to disk.
    Used to find the VC's alternate in the write_vector and set its
    length to total_len.
  - write_len. The number of bytes for this fragment.
  - total_len. The total number of bytes for the document so far.
    Doc->total_len and the alternate's total length are set to this value.
  - first_key. Doc's first_key is set to this value.
  - pin_in_cache. Doc's pinned value is set to this + Thread::get_hrtime().
  - earliest_key. If f.use_first_key, Doc's key is set to this value.
  - key. If !f.use_first_key, Doc's key is set to this value.
  - blocks. Used only if write_len is set. Data to be written.
  - offset. Used only if write_len is set. Offset into the block to copy
    the data from.
  - buf. Used only if f.evacuator is set. Should point to the old document.
  The function sets the length, offset, pinned, head and phase of vc->dir.
*/

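// A typical (simplified) caller sequence for this contract, as seen in the
// openWrite* handlers later in this file: pick the fragment size, point the
// completion handler at the done-callback, and enter the write path (which
// ends up in handleWrite below):
//
//   write_len = length > MAX_FRAG_SIZE ? MAX_FRAG_SIZE : length;
//   SET_HANDLER(&CacheVC::openWriteWriteDone);
//   return do_write_lock_call();
//
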
int
CacheVC::handleWrite(int event, Event * /* e ATS_UNUSED */)
{
  // plain write case
  ink_assert(!trigger);
  frag_len = 0;

  set_agg_write_in_progress();
  POP_HANDLER;
  agg_len = vol->round_to_approx_size(write_len + header_len + frag_len + sizeof(Doc));
  vol->agg_todo_size += agg_len;
  bool agg_error = (agg_len > AGG_SIZE || header_len + sizeof(Doc) > MAX_FRAG_SIZE ||
                    (!f.readers && (vol->agg_todo_size > cache_config_agg_write_backlog + AGG_SIZE) && write_len));
#ifdef CACHE_AGG_FAIL_RATE
  agg_error = agg_error || ((uint32_t)mutex->thread_holding->generator.random() < (uint32_t)(UINT_MAX * CACHE_AGG_FAIL_RATE));
#endif
  bool max_doc_error = (cache_config_max_doc_size && (cache_config_max_doc_size < vio.ndone ||
                                                      (vio.nbytes != INT64_MAX && (cache_config_max_doc_size < vio.nbytes))));

  if (agg_error || max_doc_error) {
    CACHE_INCREMENT_DYN_STAT(cache_write_backlog_failure_stat);
    CACHE_INCREMENT_DYN_STAT(base_stat + CACHE_STAT_FAILURE);
    vol->agg_todo_size -= agg_len;
    io.aio_result = AIO_SOFT_FAILURE;
    if (event == EVENT_CALL) {
      return EVENT_RETURN;
    }
    return handleEvent(AIO_EVENT_DONE, nullptr);
  }
  ink_assert(agg_len <= AGG_SIZE);
  if (f.evac_vector) {
    vol->agg.push(this);
  } else {
    vol->agg.enqueue(this);
  }
  if (!vol->is_io_in_progress()) {
    return vol->aggWrite(event, this);
  }
  return EVENT_CONT;
}

// Copy up to `len` bytes into `p` from the IOBufferBlock chain starting at
// `ab`, skipping the first `offset` bytes of the chain. Returns the advanced
// destination pointer.
static char *
iobufferblock_memcpy(char *p, int len, IOBufferBlock *ab, int offset)
{
  IOBufferBlock *b = ab;
  while (b && len >= 0) {
    char *start = b->_start;
    char *end = b->_end;
    int max_bytes = end - start;
    max_bytes -= offset;
    if (max_bytes <= 0) {
      offset = -max_bytes;
      b = b->next.get();
      continue;
    }
    int bytes = len;
    if (bytes >= max_bytes) {
      bytes = max_bytes;
    }
    ::memcpy(p, start + offset, bytes);
    p += bytes;
    len -= bytes;
    b = b->next.get();
    offset = 0;
  }
  return p;
}

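// Usage sketch (mirrors the call in agg_copy below): copy a VC's pending
// write data from its block chain into the in-memory Doc image, honoring the
// reader's offset within the first block:
//
//   iobufferblock_memcpy(doc->data(), vc->write_len, vc->blocks.get(), vc->offset);
//
// The offset is consumed while walking the chain; once a block containing
// data past the offset is found, subsequent blocks are copied from their
// start.
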
EvacuationBlock *
Vol::force_evacuate_head(Dir *evac_dir, int pinned)
{
  // build an evacuation block for the object
  EvacuationBlock *b = evacuation_block_exists(evac_dir, this);
  // if we have already started evacuating this document, it's too late
  // to evacuate the head...bad luck
  if (b && b->f.done) {
    return b;
  }

  if (!b) {
    b = new_EvacuationBlock(mutex->thread_holding);
    b->dir = *evac_dir;
    DDebug("cache_evac", "force: %d, %d", (int)dir_offset(evac_dir), (int)dir_phase(evac_dir));
    evacuate[dir_evac_bucket(evac_dir)].push(b);
  }
  b->f.pinned = pinned;
  b->f.evacuate_head = 1;
  b->evac_frags.key = zero_key; // ensure that the block gets
                                // evacuated no matter what
  b->readers = 0;               // ensure that the block does not disappear
  return b;
}

void
Vol::scan_for_pinned_documents()
{
  if (cache_config_permit_pinning) {
    // we can't evacuate anything between header->write_pos and
    // header->write_pos + AGG_SIZE.
    int ps = this->offset_to_vol_offset(header->write_pos + AGG_SIZE);
    int pe = this->offset_to_vol_offset(header->write_pos + 2 * EVACUATION_SIZE + (len / PIN_SCAN_EVERY));
    int vol_end_offset = this->offset_to_vol_offset(len + skip);
    int before_end_of_vol = pe < vol_end_offset;
    DDebug("cache_evac", "scan %d %d", ps, pe);
    for (int i = 0; i < this->direntries(); i++) {
      // is it a valid pinned object?
      if (!dir_is_empty(&dir[i]) && dir_pinned(&dir[i]) && dir_head(&dir[i])) {
        // select objects only within this PIN_SCAN region
        int o = dir_offset(&dir[i]);
        if (dir_phase(&dir[i]) == header->phase) {
          if (before_end_of_vol || o >= (pe - vol_end_offset)) {
            continue;
          }
        } else {
          if (o < ps || o >= pe) {
            continue;
          }
        }
        force_evacuate_head(&dir[i], 1);
        // DDebug("cache_evac", "scan pinned at offset %d %d %d %d %d %d",
        //        (int)dir_offset(&b->dir), ps, o, pe, i, (int)b->f.done);
      }
    }
  }
}

/* NOTE: This state can be called by an AIO thread, so DON'T DON'T
   DON'T schedule any events on this thread using VC_SCHED_XXX or
   mutex->thread_holding->schedule_xxx_local(). ALWAYS use
   eventProcessor.schedule_xxx().
*/
int
Vol::aggWriteDone(int event, Event *e)
{
  cancel_trigger();

  // ensure we have the cacheDirSync lock if we intend to call it later
  // retaking the current mutex recursively is a NOOP
  CACHE_TRY_LOCK(lock, dir_sync_waiting ? cacheDirSync->mutex : mutex, mutex->thread_holding);
  if (!lock.is_locked()) {
    eventProcessor.schedule_in(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay));
    return EVENT_CONT;
  }
  if (io.ok()) {
    header->last_write_pos = header->write_pos;
    header->write_pos += io.aiocb.aio_nbytes;
    ink_assert(header->write_pos >= start);
    DDebug("cache_agg", "Dir %s, Write: %" PRIu64 ", last Write: %" PRIu64 "", hash_text.get(), header->write_pos,
           header->last_write_pos);
    ink_assert(header->write_pos == header->agg_pos);
    if (header->write_pos + EVACUATION_SIZE > scan_pos) {
      periodic_scan();
    }
    agg_buf_pos = 0;
    header->write_serial++;
  } else {
    // delete all the directory entries that we inserted
    // for fragments in this aggregation buffer
    Debug("cache_disk_error", "Write error on disk %s\n \
            write range : [%" PRIu64 " - %" PRIu64 " bytes]  [%" PRIu64 " - %" PRIu64 " blocks] \n",
          hash_text.get(), (uint64_t)io.aiocb.aio_offset, (uint64_t)io.aiocb.aio_offset + io.aiocb.aio_nbytes,
          (uint64_t)io.aiocb.aio_offset / CACHE_BLOCK_SIZE,
          (uint64_t)(io.aiocb.aio_offset + io.aiocb.aio_nbytes) / CACHE_BLOCK_SIZE);
    Dir del_dir;
    dir_clear(&del_dir);
    for (int done = 0; done < agg_buf_pos;) {
      Doc *doc = reinterpret_cast<Doc *>(agg_buffer + done);
      dir_set_offset(&del_dir, header->write_pos + done);
      dir_delete(&doc->key, this, &del_dir);
      done += round_to_approx_size(doc->len);
    }
    agg_buf_pos = 0;
  }
  set_io_not_in_progress();
  // callback ready sync CacheVCs
  CacheVC *c = nullptr;
  while ((c = sync.dequeue())) {
    if (UINT_WRAP_LTE(c->write_serial + 2, header->write_serial)) {
      eventProcessor.schedule_imm(c, ET_CALL, AIO_EVENT_DONE);
    } else {
      sync.push(c); // put it back on the front
      break;
    }
  }
  if (dir_sync_waiting) {
    dir_sync_waiting = false;
    cacheDirSync->handleEvent(EVENT_IMMEDIATE, nullptr);
  }
  if (agg.head || sync.head) {
    return aggWrite(event, e);
  }
  return EVENT_CONT;
}

CacheVC *
new_DocEvacuator(int nbytes, Vol *vol)
{
  CacheVC *c = new_CacheVC(vol);
  ProxyMutex *mutex = vol->mutex.get();
  c->base_stat = cache_evacuate_active_stat;
  CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
  c->buf = new_IOBufferData(iobuffer_size_to_index(nbytes, MAX_BUFFER_SIZE_INDEX), MEMALIGNED);
  c->vol = vol;
  c->f.evacuator = 1;
  c->earliest_key = zero_key;
  SET_CONTINUATION_HANDLER(c, &CacheVC::evacuateDocDone);
  return c;
}

int
CacheVC::evacuateReadHead(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
  // The evacuator vc shares the lock with the volume mutex
  ink_assert(vol->mutex->thread_holding == this_ethread());
  cancel_trigger();
  Doc *doc = reinterpret_cast<Doc *>(buf->data());
  CacheHTTPInfo *alternate_tmp = nullptr;
  if (!io.ok()) {
    goto Ldone;
  }
  // a directory entry which is no longer valid may have been overwritten
  if (!dir_valid(vol, &dir)) {
    last_collision = nullptr;
    goto Lcollision;
  }
  if (doc->magic != DOC_MAGIC || !(doc->first_key == first_key)) {
    goto Lcollision;
  }
  alternate_tmp = nullptr;
  if (doc->doc_type == CACHE_FRAG_TYPE_HTTP && doc->hlen) {
    // it's an HTTP document
    if (this->load_http_info(&vector, doc) != doc->hlen) {
      Note("bad vector detected during evacuation");
      goto Ldone;
    }
    alternate_index = get_alternate_index(&vector, earliest_key);
    if (alternate_index < 0) {
      goto Ldone;
    }
    alternate_tmp = vector.get(alternate_index);
    doc_len = alternate_tmp->object_size_get();
    Debug("cache_evac", "evacuateReadHead http earliest %X first: %X len: %" PRId64, first_key.slice32(0), earliest_key.slice32(0),
          doc_len);
  } else {
    // non-http document
    CacheKey next_key;
    next_CacheKey(&next_key, &doc->key);
    if (!(next_key == earliest_key)) {
      goto Ldone;
    }
    doc_len = doc->total_len;
    DDebug("cache_evac", "evacuateReadHead non-http earliest %X first: %X len: %" PRId64, first_key.slice32(0),
           earliest_key.slice32(0), doc_len);
  }
  if (doc_len == total_len) {
    // the whole document has been evacuated. Insert the directory
    // entry in the directory.
    dir_lookaside_fixup(&earliest_key, vol);
    return free_CacheVC(this);
  }
  return EVENT_CONT;
Lcollision:
  if (dir_probe(&first_key, vol, &dir, &last_collision)) {
    int ret = do_read_call(&first_key);
    if (ret == EVENT_RETURN) {
      return handleEvent(AIO_EVENT_DONE, nullptr);
    }
    return ret;
  }
Ldone:
  dir_lookaside_remove(&earliest_key, vol);
  return free_CacheVC(this);
}

int
CacheVC::evacuateDocDone(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
  ink_assert(vol->mutex->thread_holding == this_ethread());
  Doc *doc = reinterpret_cast<Doc *>(buf->data());
  DDebug("cache_evac", "evacuateDocDone %X o %d p %d new_o %d new_p %d", (int)key.slice32(0), (int)dir_offset(&overwrite_dir),
         (int)dir_phase(&overwrite_dir), (int)dir_offset(&dir), (int)dir_phase(&dir));
  int i = dir_evac_bucket(&overwrite_dir);
  // nasty beeping race condition, need to have the EvacuationBlock here
  EvacuationBlock *b = vol->evacuate[i].head;
  for (; b; b = b->link.next) {
    if (dir_offset(&b->dir) == dir_offset(&overwrite_dir)) {
      // If the document is single fragment (although not tied to the vector),
      // then we don't have to put the directory entry in the lookaside
      // buffer. But, we have no way of finding out if the document is
      // single fragment. doc->single_fragment() can be true for a multiple
      // fragment document since total_len and doc->len could be equal at
      // the time we write the fragment down. To be on the safe side, we
      // only overwrite the entry in the directory if it's not a head.
      if (!dir_head(&overwrite_dir)) {
        // find the earliest key
        EvacuationKey *evac = &b->evac_frags;
        for (; evac && !(evac->key == doc->key); evac = evac->link.next) {
          ;
        }
        ink_assert(evac);
        if (!evac) {
          break;
        }
        if (evac->earliest_key.fold()) {
          DDebug("cache_evac", "evacdocdone: evacuating key %X earliest %X", evac->key.slice32(0), evac->earliest_key.slice32(0));
          EvacuationBlock *eblock = nullptr;
          Dir dir_tmp;
          dir_lookaside_probe(&evac->earliest_key, vol, &dir_tmp, &eblock);
          if (eblock) {
            CacheVC *earliest_evac = eblock->earliest_evacuator;
            earliest_evac->total_len += doc->data_len();
            if (earliest_evac->total_len == earliest_evac->doc_len) {
              dir_lookaside_fixup(&evac->earliest_key, vol);
              free_CacheVC(earliest_evac);
            }
          }
        }
        dir_overwrite(&doc->key, vol, &dir, &overwrite_dir);
      }
      // if the tag in the overwrite_dir matches the first_key in the
      // document, then it has to be the vector. We guarantee that
      // the first_key and the earliest_key will never collide (see
      // Cache::open_write). Once we know it's the vector, we can
      // safely overwrite the first_key in the directory.
      if (dir_head(&overwrite_dir) && b->f.evacuate_head) {
        DDebug("cache_evac", "evacuateDocDone evacuate_head %X %X hlen %d offset %d", (int)key.slice32(0), (int)doc->key.slice32(0),
               doc->hlen, (int)dir_offset(&overwrite_dir));

        if (dir_compare_tag(&overwrite_dir, &doc->first_key)) {
          OpenDirEntry *cod;
          DDebug("cache_evac", "evacuating vector: %X %d", (int)doc->first_key.slice32(0), (int)dir_offset(&overwrite_dir));
          if ((cod = vol->open_read(&doc->first_key))) {
            // writer exists
            DDebug("cache_evac", "overwriting the open directory %X %d %d", (int)doc->first_key.slice32(0),
                   (int)dir_offset(&cod->first_dir), (int)dir_offset(&dir));
            cod->first_dir = dir;
          }
          if (dir_overwrite(&doc->first_key, vol, &dir, &overwrite_dir)) {
            int64_t o = dir_offset(&overwrite_dir), n = dir_offset(&dir);
            vol->ram_cache->fixup(&doc->first_key, static_cast<uint32_t>(o >> 32), static_cast<uint32_t>(o),
                                  static_cast<uint32_t>(n >> 32), static_cast<uint32_t>(n));
          }
        } else {
          DDebug("cache_evac", "evacuating earliest: %X %d", (int)doc->key.slice32(0), (int)dir_offset(&overwrite_dir));
          ink_assert(dir_compare_tag(&overwrite_dir, &doc->key));
          ink_assert(b->earliest_evacuator == this);
          total_len += doc->data_len();
          first_key = doc->first_key;
          earliest_dir = dir;
          if (dir_probe(&first_key, vol, &dir, &last_collision) > 0) {
            dir_lookaside_insert(b, vol, &earliest_dir);
            // read the vector
            SET_HANDLER(&CacheVC::evacuateReadHead);
            int ret = do_read_call(&first_key);
            if (ret == EVENT_RETURN) {
              return handleEvent(AIO_EVENT_DONE, nullptr);
            }
            return ret;
          }
        }
      }
      break;
    }
  }
  return free_CacheVC(this);
}

static int
evacuate_fragments(CacheKey *key, CacheKey *earliest_key, int force, Vol *vol)
{
  Dir dir, *last_collision = nullptr;
  int i = 0;
  while (dir_probe(key, vol, &dir, &last_collision)) {
    // next fragment cannot be a head...if it is, it must have been a
    // directory collision.
    if (dir_head(&dir)) {
      continue;
    }
    EvacuationBlock *b = evacuation_block_exists(&dir, vol);
    if (!b) {
      b = new_EvacuationBlock(vol->mutex->thread_holding);
      b->dir = dir;
      b->evac_frags.key = *key;
      b->evac_frags.earliest_key = *earliest_key;
      vol->evacuate[dir_evac_bucket(&dir)].push(b);
      i++;
    } else {
      ink_assert(dir_offset(&dir) == dir_offset(&b->dir));
      ink_assert(dir_phase(&dir) == dir_phase(&b->dir));
      EvacuationKey *evac_frag = evacuationKeyAllocator.alloc();
      evac_frag->key = *key;
      evac_frag->earliest_key = *earliest_key;
      evac_frag->link.next = b->evac_frags.link.next;
      b->evac_frags.link.next = evac_frag;
    }
    if (force) {
      b->readers = 0;
    }
    DDebug("cache_evac", "next fragment %X Earliest: %X offset %d phase %d force %d", (int)key->slice32(0),
           (int)earliest_key->slice32(0), (int)dir_offset(&dir), (int)dir_phase(&dir), force);
  }
  return i;
}

int
Vol::evacuateWrite(CacheVC *evacuator, int event, Event *e)
{
  // push to front of aggregation write list, so it is written first

  evacuator->agg_len = round_to_approx_size((reinterpret_cast<Doc *>(evacuator->buf->data()))->len);
  agg_todo_size += evacuator->agg_len;
  /* insert the evacuator after all the other evacuators */
  CacheVC *cur = static_cast<CacheVC *>(agg.head);
  CacheVC *after = nullptr;
  for (; cur && cur->f.evacuator; cur = (CacheVC *)cur->link.next) {
    after = cur;
  }
  ink_assert(evacuator->agg_len <= AGG_SIZE);
  agg.insert(evacuator, after);
  return aggWrite(event, e);
}

int
Vol::evacuateDocReadDone(int event, Event *e)
{
  cancel_trigger();
  if (event != AIO_EVENT_DONE) {
    return EVENT_DONE;
  }
  ink_assert(is_io_in_progress());
  set_io_not_in_progress();
  ink_assert(mutex->thread_holding == this_ethread());
  Doc *doc = reinterpret_cast<Doc *>(doc_evacuator->buf->data());
  CacheKey next_key;
  EvacuationBlock *b = nullptr;
  if (doc->magic != DOC_MAGIC) {
    Debug("cache_evac", "DOC magic: %X %d", (int)dir_tag(&doc_evacuator->overwrite_dir),
          (int)dir_offset(&doc_evacuator->overwrite_dir));
    ink_assert(doc->magic == DOC_MAGIC);
    goto Ldone;
  }
  DDebug("cache_evac", "evacuateDocReadDone %X offset %d", (int)doc->key.slice32(0),
         (int)dir_offset(&doc_evacuator->overwrite_dir));

  b = evacuate[dir_evac_bucket(&doc_evacuator->overwrite_dir)].head;
  while (b) {
    if (dir_offset(&b->dir) == dir_offset(&doc_evacuator->overwrite_dir)) {
      break;
    }
    b = b->link.next;
  }
  if (!b) {
    goto Ldone;
  }
  if ((b->f.pinned && !b->readers) && doc->pinned < static_cast<uint32_t>(Thread::get_hrtime() / HRTIME_SECOND)) {
    goto Ldone;
  }

  if (dir_head(&b->dir) && b->f.evacuate_head) {
    ink_assert(!b->evac_frags.key.fold());
    // if it's a head (vector), evacuation is really simple...we just
    // need to write this vector down and overwrite the directory entry.
    if (dir_compare_tag(&b->dir, &doc->first_key)) {
      doc_evacuator->key = doc->first_key;
      b->evac_frags.key = doc->first_key;
      DDebug("cache_evac", "evacuating vector %X offset %d", (int)doc->first_key.slice32(0),
             (int)dir_offset(&doc_evacuator->overwrite_dir));
      b->f.unused = 57;
    } else {
      // if it's an earliest fragment (alternate) evacuation, things get
      // a little tricky. We have to propagate the earliest key to the next
      // fragments for this alternate. The last fragment to be evacuated
      // fixes up the lookaside buffer.
      doc_evacuator->key = doc->key;
      doc_evacuator->earliest_key = doc->key;
      b->evac_frags.key = doc->key;
      b->evac_frags.earliest_key = doc->key;
      b->earliest_evacuator = doc_evacuator;
      DDebug("cache_evac", "evacuating earliest %X %X evac: %p offset: %d", (int)b->evac_frags.key.slice32(0),
             (int)doc->key.slice32(0), doc_evacuator, (int)dir_offset(&doc_evacuator->overwrite_dir));
      b->f.unused = 67;
    }
  } else {
    // find which key matches the document
    EvacuationKey *ek = &b->evac_frags;
    for (; ek && !(ek->key == doc->key); ek = ek->link.next) {
      ;
    }
    if (!ek) {
      b->f.unused = 77;
      goto Ldone;
    }
    doc_evacuator->key = ek->key;
    doc_evacuator->earliest_key = ek->earliest_key;
    DDebug("cache_evac", "evacuateDocReadDone key: %X earliest: %X", (int)ek->key.slice32(0), (int)ek->earliest_key.slice32(0));
    b->f.unused = 87;
  }
  // if the tag in b->dir does not match the first_key in the
  // document, then it has to be the earliest fragment. We guarantee that
  // the first_key and the earliest_key will never collide (see
  // Cache::open_write).
  if (!dir_head(&b->dir) || !dir_compare_tag(&b->dir, &doc->first_key)) {
    next_CacheKey(&next_key, &doc->key);
    evacuate_fragments(&next_key, &doc_evacuator->earliest_key, !b->readers, this);
  }
  return evacuateWrite(doc_evacuator, event, e);
Ldone:
  free_CacheVC(doc_evacuator);
  doc_evacuator = nullptr;
  return aggWrite(event, e);
}

int
Vol::evac_range(off_t low, off_t high, int evac_phase)
{
  off_t s = this->offset_to_vol_offset(low);
  off_t e = this->offset_to_vol_offset(high);
  int si = dir_offset_evac_bucket(s);
  int ei = dir_offset_evac_bucket(e);

  for (int i = si; i <= ei; i++) {
    EvacuationBlock *b = evacuate[i].head;
    EvacuationBlock *first = nullptr;
    int64_t first_offset = INT64_MAX;
    for (; b; b = b->link.next) {
      int64_t offset = dir_offset(&b->dir);
      int phase = dir_phase(&b->dir);
      if (offset >= s && offset < e && !b->f.done && phase == evac_phase) {
        if (offset < first_offset) {
          first = b;
          first_offset = offset;
        }
      }
    }
    if (first) {
      first->f.done = 1;
      io.aiocb.aio_fildes = fd;
      io.aiocb.aio_nbytes = dir_approx_size(&first->dir);
      io.aiocb.aio_offset = this->vol_offset(&first->dir);
      if (static_cast<off_t>(io.aiocb.aio_offset + io.aiocb.aio_nbytes) > static_cast<off_t>(skip + len)) {
        io.aiocb.aio_nbytes = skip + len - io.aiocb.aio_offset;
      }
      doc_evacuator = new_DocEvacuator(io.aiocb.aio_nbytes, this);
      doc_evacuator->overwrite_dir = first->dir;

      io.aiocb.aio_buf = doc_evacuator->buf->data();
      io.action = this;
      io.thread = AIO_CALLBACK_THREAD_ANY;
      DDebug("cache_evac", "evac_range evacuating %X %d", (int)dir_tag(&first->dir), (int)dir_offset(&first->dir));
      SET_HANDLER(&Vol::evacuateDocReadDone);
      ink_assert(ink_aio_read(&io) >= 0);
      return -1;
    }
  }
  return 0;
}

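// Usage sketch (mirrors the calls in aggWrite below): before an aggregation
// write is issued, the region it will overwrite, plus EVACUATION_SIZE of
// headroom, is evacuated first; a negative return means a document read for
// evacuation is now in flight and the write must wait:
//
//   off_t end = header->write_pos + agg_buf_pos + EVACUATION_SIZE;
//   if (evac_range(header->write_pos, end, !header->phase) < 0)
//     ... // wait for evacuateDocReadDone before writing
//
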
static int
agg_copy(char *p, CacheVC *vc)
{
  Vol *vol = vc->vol;
  off_t o = vol->header->write_pos + vol->agg_buf_pos;

  if (!vc->f.evacuator) {
    Doc *doc = reinterpret_cast<Doc *>(p);
    IOBufferBlock *res_alt_blk = nullptr;

    uint32_t len = vc->write_len + vc->header_len + vc->frag_len + sizeof(Doc);
    ink_assert(vc->frag_type != CACHE_FRAG_TYPE_HTTP || len != sizeof(Doc));
    ink_assert(vol->round_to_approx_size(len) == vc->agg_len);
    // update copy of directory entry for this document
    dir_set_approx_size(&vc->dir, vc->agg_len);
    dir_set_offset(&vc->dir, vol->offset_to_vol_offset(o));
    ink_assert(vol->vol_offset(&vc->dir) < (vol->skip + vol->len));
    dir_set_phase(&vc->dir, vol->header->phase);

    // fill in document header
    doc->magic = DOC_MAGIC;
    doc->len = len;
    doc->hlen = vc->header_len;
    doc->doc_type = vc->frag_type;
    doc->v_major = CACHE_DB_MAJOR_VERSION;
    doc->v_minor = CACHE_DB_MINOR_VERSION;
    doc->unused = 0; // force this for forward compatibility.
    doc->total_len = vc->total_len;
    doc->first_key = vc->first_key;
    doc->sync_serial = vol->header->sync_serial;
    vc->write_serial = doc->write_serial = vol->header->write_serial;
    doc->checksum = DOC_NO_CHECKSUM;
    if (vc->pin_in_cache) {
      dir_set_pinned(&vc->dir, 1);
      doc->pinned = static_cast<uint32_t>(Thread::get_hrtime() / HRTIME_SECOND) + vc->pin_in_cache;
    } else {
      dir_set_pinned(&vc->dir, 0);
      doc->pinned = 0;
    }

    if (vc->f.use_first_key) {
      if (doc->data_len() || vc->f.allow_empty_doc) {
        doc->key = vc->earliest_key;
      } else { // the vector is being written by itself
        if (vc->earliest_key == zero_key) {
          do {
            rand_CacheKey(&doc->key, vc->vol->mutex);
          } while (DIR_MASK_TAG(doc->key.slice32(2)) == DIR_MASK_TAG(vc->first_key.slice32(2)));
        } else {
          prev_CacheKey(&doc->key, &vc->earliest_key);
        }
      }
      dir_set_head(&vc->dir, true);
    } else {
      doc->key = vc->key;
      dir_set_head(&vc->dir, !vc->fragment);
    }

    if (vc->f.rewrite_resident_alt) {
      ink_assert(vc->f.use_first_key);
      Doc *res_doc = reinterpret_cast<Doc *>(vc->first_buf->data());
      res_alt_blk = new_IOBufferBlock(vc->first_buf, res_doc->data_len(), sizeof(Doc) + res_doc->hlen);
      doc->key = res_doc->key;
      doc->total_len = res_doc->data_len();
    }
    // update the new_info object_key, and total_len and dirinfo
    if (vc->header_len) {
      ink_assert(vc->f.use_first_key);
      if (vc->frag_type == CACHE_FRAG_TYPE_HTTP) {
        ink_assert(vc->write_vector->count() > 0);
        if (!vc->f.update && !vc->f.evac_vector) {
          ink_assert(!(vc->first_key == zero_key));
          CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index);
          http_info->object_size_set(vc->total_len);
        }
        // update + data_written => Update case (b)
        // need to change the old alternate's object length
        if (vc->f.update && vc->total_len) {
          CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index);
          http_info->object_size_set(vc->total_len);
        }
        ink_assert(!(((uintptr_t)&doc->hdr()[0]) & HDR_PTR_ALIGNMENT_MASK));
        ink_assert(vc->header_len == vc->write_vector->marshal(doc->hdr(), vc->header_len));
      } else {
        memcpy(doc->hdr(), vc->header_to_write, vc->header_len);
      }
      // the single fragment flag is not used in the write call.
      // putting it in for completeness.
      vc->f.single_fragment = doc->single_fragment();
    }
    // move data
    if (vc->write_len) {
      {
        ProxyMutex *mutex ATS_UNUSED = vc->vol->mutex.get();
        ink_assert(mutex->thread_holding == this_ethread());
        CACHE_DEBUG_SUM_DYN_STAT(cache_write_bytes_stat, vc->write_len);
      }
      if (vc->f.rewrite_resident_alt) {
        iobufferblock_memcpy(doc->data(), vc->write_len, res_alt_blk, 0);
      } else {
        iobufferblock_memcpy(doc->data(), vc->write_len, vc->blocks.get(), vc->offset);
      }
#ifdef VERIFY_JTEST_DATA
      if (f.use_first_key && header_len) {
        int ib = 0, xd = 0;
        char xx[500];
        new_info.request_get().url_get().print(xx, 500, &ib, &xd);
        char *x = xx;
        for (int q = 0; q < 3; q++)
          x = strchr(x + 1, '/');
        ink_assert(!memcmp(doc->hdr(), x, ib - (x - xx)));
      }
#endif
    }
    if (cache_config_enable_checksum) {
      // simple byte-sum checksum over the marshalled header and body
      doc->checksum = 0;
      for (char *b = doc->hdr(); b < reinterpret_cast<char *>(doc) + doc->len; b++) {
        doc->checksum += *b;
      }
    }
    if (vc->frag_type == CACHE_FRAG_TYPE_HTTP && vc->f.single_fragment) {
      ink_assert(doc->hlen);
    }

    if (res_alt_blk) {
      res_alt_blk->free();
    }

    return vc->agg_len;
  } else {
    // for evacuated documents, copy the data, and update directory
    Doc *doc = reinterpret_cast<Doc *>(vc->buf->data());
    int l = vc->vol->round_to_approx_size(doc->len);
    {
      ProxyMutex *mutex ATS_UNUSED = vc->vol->mutex.get();
      ink_assert(mutex->thread_holding == this_ethread());
      CACHE_DEBUG_INCREMENT_DYN_STAT(cache_gc_frags_evacuated_stat);
      CACHE_DEBUG_SUM_DYN_STAT(cache_gc_bytes_evacuated_stat, l);
    }

    doc->sync_serial = vc->vol->header->sync_serial;
    doc->write_serial = vc->vol->header->write_serial;

    memcpy(p, doc, doc->len);

    vc->dir = vc->overwrite_dir;
    dir_set_offset(&vc->dir, vc->vol->offset_to_vol_offset(o));
    dir_set_phase(&vc->dir, vc->vol->header->phase);
    return l;
  }
}

inline void
Vol::evacuate_cleanup_blocks(int i)
{
  EvacuationBlock *b = evacuate[i].head;
  while (b) {
    if (b->f.done && ((header->phase != dir_phase(&b->dir) && header->write_pos > this->vol_offset(&b->dir)) ||
                      (header->phase == dir_phase(&b->dir) && header->write_pos <= this->vol_offset(&b->dir)))) {
      EvacuationBlock *x = b;
      DDebug("cache_evac", "evacuate cleanup free %X offset %d", (int)b->evac_frags.key.slice32(0), (int)dir_offset(&b->dir));
      b = b->link.next;
      evacuate[i].remove(x);
      free_EvacuationBlock(x, mutex->thread_holding);
      continue;
    }
    b = b->link.next;
  }
}

void
Vol::evacuate_cleanup()
{
  int64_t eo = ((header->write_pos - start) / CACHE_BLOCK_SIZE) + 1;
  int64_t e = dir_offset_evac_bucket(eo);
  int64_t sx = e - (evacuate_size / PIN_SCAN_EVERY) - 1;
  int64_t s = sx;
  int i;

  if (e > evacuate_size) {
    e = evacuate_size;
  }
  if (sx < 0) {
    s = 0;
  }
  for (i = s; i < e; i++) {
    evacuate_cleanup_blocks(i);
  }

  // if we have wrapped, handle the end bit
  if (sx <= 0) {
    s = evacuate_size + sx - 2;
    if (s < 0) {
      s = 0;
    }
    for (i = s; i < evacuate_size; i++) {
      evacuate_cleanup_blocks(i);
    }
  }
}

void
Vol::periodic_scan()
{
  evacuate_cleanup();
  scan_for_pinned_documents();
  if (header->write_pos == start) {
    scan_pos = start;
  }
  scan_pos += len / PIN_SCAN_EVERY;
}

void
Vol::agg_wrap()
{
  header->write_pos = start;
  header->phase = !header->phase;

  header->cycle++;
  header->agg_pos = header->write_pos;
  dir_lookaside_cleanup(this);
  dir_clean_vol(this);
  {
    Vol *vol = this;
    CACHE_INCREMENT_DYN_STAT(cache_directory_wrap_stat);
    Note("Cache volume %d on disk '%s' wraps around", vol->cache_vol->vol_number, vol->hash_text.get());
  }
  periodic_scan();
}

/* NOTE: This state can be called by an AIO thread, so DON'T DON'T
   DON'T schedule any events on this thread using VC_SCHED_XXX or
   mutex->thread_holding->schedule_xxx_local(). ALWAYS use
   eventProcessor.schedule_xxx().
   Also, make sure that any functions called by this also use
   the eventProcessor to schedule events
*/
int
Vol::aggWrite(int event, void * /* e ATS_UNUSED */)
{
  ink_assert(!is_io_in_progress());

  Que(CacheVC, link) tocall;
  CacheVC *c;

  cancel_trigger();

Lagain:
  // calculate length of aggregated write
  for (c = static_cast<CacheVC *>(agg.head); c;) {
    int writelen = c->agg_len;
    // [amc] this is checked in multiple places; only here was it strictly less.
    ink_assert(writelen <= AGG_SIZE);
    if (agg_buf_pos + writelen > AGG_SIZE || header->write_pos + agg_buf_pos + writelen > (skip + len)) {
      break;
    }
    DDebug("agg_read", "copying: %d, %" PRIu64 ", key: %d", agg_buf_pos, header->write_pos + agg_buf_pos, c->first_key.slice32(0));
    int wrotelen = agg_copy(agg_buffer + agg_buf_pos, c);
    ink_assert(writelen == wrotelen);
    agg_todo_size -= writelen;
    agg_buf_pos += writelen;
    CacheVC *n = (CacheVC *)c->link.next;
    agg.dequeue();
    if (c->f.sync && c->f.use_first_key) {
      CacheVC *last = sync.tail;
      while (last && UINT_WRAP_LT(c->write_serial, last->write_serial)) {
        last = (CacheVC *)last->link.prev;
      }
      sync.insert(c, last);
    } else if (c->f.evacuator) {
      c->handleEvent(AIO_EVENT_DONE, nullptr);
    } else {
      tocall.enqueue(c);
    }
    c = n;
  }

  // if we got nothing...
  if (!agg_buf_pos) {
    if (!agg.head && !sync.head) { // nothing to get
      return EVENT_CONT;
    }
    if (header->write_pos == start) {
      // write aggregation too long, bad bad, punt on everything.
      Note("write aggregation exceeds vol size");
      ink_assert(!tocall.head);
      ink_assert(false);
      while ((c = agg.dequeue())) {
        agg_todo_size -= c->agg_len;
        eventProcessor.schedule_imm(c, ET_CALL, AIO_EVENT_DONE);
      }
      return EVENT_CONT;
    }
    // start back
    if (agg.head) {
      agg_wrap();
      goto Lagain;
    }
  }

  // evacuate space
  off_t end = header->write_pos + agg_buf_pos + EVACUATION_SIZE;
  if (evac_range(header->write_pos, end, !header->phase) < 0) {
    goto Lwait;
  }
  if (end > skip + len) {
    if (evac_range(start, start + (end - (skip + len)), header->phase) < 0) {
      goto Lwait;
    }
  }

  // if agg.head, then we are near the end of the disk, so
  // write down the aggregation in whatever size it is.
  if (agg_buf_pos < AGG_HIGH_WATER && !agg.head && !sync.head && !dir_sync_waiting) {
    goto Lwait;
  }

  // write sync marker
  if (!agg_buf_pos) {
    ink_assert(sync.head);
    int l = round_to_approx_size(sizeof(Doc));
    agg_buf_pos = l;
    Doc *d = reinterpret_cast<Doc *>(agg_buffer);
    memset(static_cast<void *>(d), 0, sizeof(Doc));
    d->magic = DOC_MAGIC;
    d->len = l;
    d->sync_serial = header->sync_serial;
    d->write_serial = header->write_serial;
  }

  // set write limit
  header->agg_pos = header->write_pos + agg_buf_pos;

  io.aiocb.aio_fildes = fd;
  io.aiocb.aio_offset = header->write_pos;
  io.aiocb.aio_buf = agg_buffer;
  io.aiocb.aio_nbytes = agg_buf_pos;
  io.action = this;
  /*
    Callback on AIO thread so that we can issue a new write ASAP
    as all writes are serialized in the volume. This is not necessary
    for reads, which proceed independently.
  */
  io.thread = AIO_CALLBACK_THREAD_AIO;
  SET_HANDLER(&Vol::aggWriteDone);
  ink_aio_write(&io);

Lwait:
  int ret = EVENT_CONT;
  while ((c = tocall.dequeue())) {
    if (event == EVENT_CALL && c->mutex->thread_holding == mutex->thread_holding) {
      ret = EVENT_RETURN;
    } else {
      eventProcessor.schedule_imm(c, ET_CALL, AIO_EVENT_DONE);
    }
  }
  return ret;
}

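// A note on the EVENT_CALL/EVENT_RETURN convention used above: when aggWrite
// is entered synchronously (event == EVENT_CALL) by a CacheVC holding the
// same mutex as the volume, that VC's completion is signalled by returning
// EVENT_RETURN and the caller re-invokes itself with AIO_EVENT_DONE (see
// handleWrite); any other ready VC must be scheduled through eventProcessor,
// which is also the only safe option when this code runs on an AIO thread.
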
int
CacheVC::openWriteCloseDir(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
  cancel_trigger();
  {
    CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
    if (!lock.is_locked()) {
      SET_HANDLER(&CacheVC::openWriteCloseDir);
      ink_assert(!is_io_in_progress());
      VC_SCHED_LOCK_RETRY();
    }
    vol->close_write(this);
    if (closed < 0 && fragment) {
      dir_delete(&earliest_key, vol, &earliest_dir);
    }
  }
  if (is_debug_tag_set("cache_update")) {
    if (f.update && closed > 0) {
      if (!total_len && !f.allow_empty_doc && alternate_index != CACHE_ALT_REMOVED) {
        Debug("cache_update", "header only %d (%" PRIu64 ", %" PRIu64 ")", DIR_MASK_TAG(first_key.slice32(2)), update_key.b[0],
              update_key.b[1]);

      } else if ((total_len || f.allow_empty_doc) && alternate_index != CACHE_ALT_REMOVED) {
        Debug("cache_update", "header body, %d, (%" PRIu64 ", %" PRIu64 "), (%" PRIu64 ", %" PRIu64 ")",
              DIR_MASK_TAG(first_key.slice32(2)), update_key.b[0], update_key.b[1], earliest_key.b[0], earliest_key.b[1]);
      } else if (!total_len && alternate_index == CACHE_ALT_REMOVED) {
        Debug("cache_update", "alt delete, %d, (%" PRIu64 ", %" PRIu64 ")", DIR_MASK_TAG(first_key.slice32(2)), update_key.b[0],
              update_key.b[1]);
      }
    }
  }
  // update the appropriate stat variable
  // These variables may not give the current number of documents with
  // one, two and three or more fragments. This is because for
  // updates we don't decrement the variable corresponding to the old
  // size of the document
  if ((closed == 1) && (total_len > 0 || f.allow_empty_doc)) {
    DDebug("cache_stats", "Fragment = %d", fragment);
    switch (fragment) {
    case 0:
      CACHE_INCREMENT_DYN_STAT(cache_single_fragment_document_count_stat);
      break;
    case 1:
      CACHE_INCREMENT_DYN_STAT(cache_two_fragment_document_count_stat);
      break;
    default:
      CACHE_INCREMENT_DYN_STAT(cache_three_plus_plus_fragment_document_count_stat);
      break;
    }
  }
  if (f.close_complete) {
    recursive++;
    ink_assert(!vol || this_ethread() != vol->mutex->thread_holding);
    vio.cont->handleEvent(VC_EVENT_WRITE_COMPLETE, (void *)&vio);
    recursive--;
  }
  return free_CacheVC(this);
}

int
CacheVC::openWriteCloseHeadDone(int event, Event *e)
{
  if (event == AIO_EVENT_DONE) {
    set_io_not_in_progress();
  } else if (is_io_in_progress()) {
    return EVENT_CONT;
  }
  {
    CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
    if (!lock.is_locked()) {
      VC_LOCK_RETRY_EVENT();
    }
    od->writing_vec = false;
    if (!io.ok()) {
      goto Lclose;
    }
    ink_assert(f.use_first_key);
    if (!od->dont_update_directory) {
      if (dir_is_empty(&od->first_dir)) {
        dir_insert(&first_key, vol, &dir);
      } else {
        // multiple fragment vector write
        dir_overwrite(&first_key, vol, &dir, &od->first_dir, false);
        // insert moved resident alternate
        if (od->move_resident_alt) {
          if (dir_valid(vol, &od->single_doc_dir)) {
            dir_insert(&od->single_doc_key, vol, &od->single_doc_dir);
          }
          od->move_resident_alt = false;
        }
      }
      od->first_dir = dir;
      if (frag_type == CACHE_FRAG_TYPE_HTTP && f.single_fragment) {
        // fragment is tied to the vector
        od->move_resident_alt = true;
        if (!f.rewrite_resident_alt) {
          od->single_doc_key = earliest_key;
        }
        dir_assign(&od->single_doc_dir, &dir);
        dir_set_tag(&od->single_doc_dir, od->single_doc_key.slice32(2));
      }
    }
  }
Lclose:
  return openWriteCloseDir(event, e);
}

int
CacheVC::openWriteCloseHead(int event, Event *e)
{
  cancel_trigger();
  f.use_first_key = 1;
  if (io.ok()) {
    ink_assert(fragment || (length == (int64_t)total_len));
  } else {
    return openWriteCloseDir(event, e);
  }
  if (f.data_done) {
    write_len = 0;
  } else {
    write_len = length;
  }
  if (frag_type == CACHE_FRAG_TYPE_HTTP) {
    SET_HANDLER(&CacheVC::updateVector);
    return updateVector(EVENT_IMMEDIATE, nullptr);
  } else {
    header_len = header_to_write_len;
    SET_HANDLER(&CacheVC::openWriteCloseHeadDone);
    return do_write_lock();
  }
}

int
CacheVC::openWriteCloseDataDone(int event, Event *e)
{
  int ret = 0;
  cancel_trigger();

  if (event == AIO_EVENT_DONE) {
    set_io_not_in_progress();
  } else if (is_io_in_progress()) {
    return EVENT_CONT;
  }
  if (!io.ok()) {
    return openWriteCloseDir(event, e);
  }
  {
    CACHE_TRY_LOCK(lock, vol->mutex, this_ethread());
    if (!lock.is_locked()) {
      VC_LOCK_RETRY_EVENT();
    }
    if (!fragment) {
      ink_assert(key == earliest_key);
      earliest_dir = dir;
    } else {
      // Store the offset only if there is a table.
      // Currently there is no alt (and thence no table) for non-HTTP.
      if (alternate.valid()) {
        alternate.push_frag_offset(write_pos);
      }
    }
    fragment++;
    write_pos += write_len;
    dir_insert(&key, vol, &dir);
    blocks = iobufferblock_skip(blocks.get(), &offset, &length, write_len);
    next_CacheKey(&key, &key);
    if (length) {
      write_len = length;
      if (write_len > MAX_FRAG_SIZE) {
        write_len = MAX_FRAG_SIZE;
      }
      if ((ret = do_write_call()) == EVENT_RETURN) {
        goto Lcallreturn;
      }
      return ret;
    }
    f.data_done = 1;
    return openWriteCloseHead(event, e); // must be called under vol lock from here
  }
Lcallreturn:
  return handleEvent(AIO_EVENT_DONE, nullptr);
}

int
CacheVC::openWriteClose(int event, Event *e)
{
  cancel_trigger();
  if (is_io_in_progress()) {
    if (event != AIO_EVENT_DONE) {
      return EVENT_CONT;
    }
    set_io_not_in_progress();
    if (!io.ok()) {
      return openWriteCloseDir(event, e);
    }
  }
  if (closed > 0 || f.allow_empty_doc) {
    if (total_len == 0) {
      if (f.update || f.allow_empty_doc) {
        return updateVector(event, e);
      } else {
        // If we've been CLOSE'd but nothing has been written then
        // this close is transformed into an abort.
        closed = -1;
        return openWriteCloseDir(event, e);
      }
    }
    if (length && (fragment || length > static_cast<int>(MAX_FRAG_SIZE))) {
      SET_HANDLER(&CacheVC::openWriteCloseDataDone);
      write_len = length;
      if (write_len > MAX_FRAG_SIZE) {
        write_len = MAX_FRAG_SIZE;
      }
      return do_write_lock_call();
    } else {
      return openWriteCloseHead(event, e);
    }
  } else {
    return openWriteCloseDir(event, e);
  }
}

int
CacheVC::openWriteWriteDone(int event, Event *e)
{
  cancel_trigger();
  if (event == AIO_EVENT_DONE) {
    set_io_not_in_progress();
  } else if (is_io_in_progress()) {
    return EVENT_CONT;
  }
  // In the event of VC_EVENT_ERROR, the cont must do an io_close
  if (!io.ok()) {
    if (closed) {
      closed = -1;
      return die();
    }
    SET_HANDLER(&CacheVC::openWriteMain);
    return calluser(VC_EVENT_ERROR);
  }
  {
    CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
    if (!lock.is_locked()) {
      VC_LOCK_RETRY_EVENT();
    }
    // store the earliest directory. Need to remove the earliest dir
    // in case the writer aborts.
    if (!fragment) {
      ink_assert(key == earliest_key);
      earliest_dir = dir;
    } else {
      // Store the offset only if there is a table.
      // Currently there is no alt (and thence no table) for non-HTTP.
      if (alternate.valid()) {
        alternate.push_frag_offset(write_pos);
      }
    }
    ++fragment;
    write_pos += write_len;
    dir_insert(&key, vol, &dir);
    DDebug("cache_insert", "WriteDone: %X, %X, %d", key.slice32(0), first_key.slice32(0), write_len);
    blocks = iobufferblock_skip(blocks.get(), &offset, &length, write_len);
    next_CacheKey(&key, &key);
  }
  if (closed) {
    return die();
  }
  SET_HANDLER(&CacheVC::openWriteMain);
  return openWriteMain(event, e);
}

static inline int
target_fragment_size()
{
  uint64_t value = cache_config_target_fragment_size - sizeof(Doc);
  ink_release_assert(value <= MAX_FRAG_SIZE);
  return value;
}

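// For example (illustrative numbers, assuming the common default of
// proxy.config.cache.target_fragment_size = 1048576): the usable fragment
// payload is 1048576 - sizeof(Doc), so a full fragment plus its Doc header
// lands exactly on the configured target size.
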
int
CacheVC::openWriteMain(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
  cancel_trigger();
  int called_user = 0;
  ink_assert(!is_io_in_progress());
Lagain:
  if (!vio.buffer.writer()) {
    if (calluser(VC_EVENT_WRITE_READY) == EVENT_DONE) {
      return EVENT_DONE;
    }
    if (!vio.buffer.writer()) {
      return EVENT_CONT;
    }
  }
  if (vio.ntodo() <= 0) {
    called_user = 1;
    if (calluser(VC_EVENT_WRITE_COMPLETE) == EVENT_DONE) {
      return EVENT_DONE;
    }
    ink_assert(!f.close_complete || !"close expected after write COMPLETE");
    if (vio.ntodo() <= 0) {
      return EVENT_CONT;
    }
  }
  int64_t ntodo = static_cast<int64_t>(vio.ntodo() + length);
  int64_t total_avail = vio.buffer.reader()->read_avail();
  int64_t avail = total_avail;
  int64_t towrite = avail + length;
  if (towrite > ntodo) {
    avail -= (towrite - ntodo);
    towrite = ntodo;
  }
  if (towrite > static_cast<int>(MAX_FRAG_SIZE)) {
    avail -= (towrite - MAX_FRAG_SIZE);
    towrite = MAX_FRAG_SIZE;
  }
  if (!blocks && towrite) {
    blocks = vio.buffer.reader()->block;
    offset = vio.buffer.reader()->start_offset;
  }
  if (avail > 0) {
    vio.buffer.reader()->consume(avail);
    vio.ndone += avail;
    total_len += avail;
  }
  length = static_cast<uint64_t>(towrite);
  if (length > target_fragment_size() && (length < target_fragment_size() + target_fragment_size() / 4)) {
    write_len = target_fragment_size();
  } else {
    write_len = length;
  }
  bool not_writing = towrite != ntodo && towrite < target_fragment_size();
  if (!called_user) {
    if (not_writing) {
      called_user = 1;
      if (calluser(VC_EVENT_WRITE_READY) == EVENT_DONE) {
        return EVENT_DONE;
      }
      goto Lagain;
    } else if (vio.ntodo() <= 0) {
      goto Lagain;
    }
  }
  if (not_writing) {
    return EVENT_CONT;
  }
  if (towrite == ntodo && f.close_complete) {
    closed = 1;
    SET_HANDLER(&CacheVC::openWriteClose);
    return openWriteClose(EVENT_NONE, nullptr);
  }
  SET_HANDLER(&CacheVC::openWriteWriteDone);
  return do_write_lock_call();
}

// begin overwrite
int
CacheVC::openWriteOverwrite(int event, Event *e)
{
  cancel_trigger();
  if (event != AIO_EVENT_DONE) {
    if (event == EVENT_IMMEDIATE) {
      last_collision = nullptr;
    }
  } else {
    Doc *doc = nullptr;
    set_io_not_in_progress();
    if (_action.cancelled) {
      return openWriteCloseDir(event, e);
    }
    if (!io.ok()) {
      goto Ldone;
    }
    doc = reinterpret_cast<Doc *>(buf->data());
    if (!(doc->first_key == first_key)) {
      goto Lcollision;
    }
    od->first_dir = dir;
    first_buf = buf;
    goto Ldone;
  }
Lcollision : {
  CACHE_TRY_LOCK(lock, vol->mutex, this_ethread());
  if (!lock.is_locked()) {
    VC_LOCK_RETRY_EVENT();
  }
  int res = dir_probe(&first_key, vol, &dir, &last_collision);
  if (res > 0) {
    if ((res = do_read_call(&first_key)) == EVENT_RETURN) {
      goto Lcallreturn;
    }
    return res;
  }
}
Ldone:
  SET_HANDLER(&CacheVC::openWriteMain);
  return callcont(CACHE_EVENT_OPEN_WRITE);
Lcallreturn:
  return handleEvent(AIO_EVENT_DONE, nullptr); // hopefully a tail call
}

// openWriteStartDone handles vector read (addition of alternates)
// and lock misses
int
CacheVC::openWriteStartDone(int event, Event *e)
{
  intptr_t err = ECACHE_NO_DOC;
  cancel_trigger();
  if (is_io_in_progress()) {
    if (event != AIO_EVENT_DONE) {
      return EVENT_CONT;
    }
    set_io_not_in_progress();
  }
  {
    CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
    if (!lock.is_locked()) {
      VC_LOCK_RETRY_EVENT();
    }

    if (_action.cancelled && (!od || !od->has_multiple_writers())) {
      goto Lcancel;
    }

    if (event == AIO_EVENT_DONE) { // vector read done
      Doc *doc = reinterpret_cast<Doc *>(buf->data());
      if (!io.ok()) {
        err = ECACHE_READ_FAIL;
        goto Lfailure;
      }

      /* INKqa07123.
         A directory entry which is no longer valid may have been overwritten.
         We need to start afresh from the beginning by setting last_collision
         to nullptr.
      */
      if (!dir_valid(vol, &dir)) {
        DDebug("cache_write", "openWriteStartDone: Dir not valid: Write Head: %" PRId64 ", Dir: %" PRId64,
               (int64_t)vol->offset_to_vol_offset(vol->header->write_pos), dir_offset(&dir));
        last_collision = nullptr;
        goto Lcollision;
      }
      if (!(doc->first_key == first_key)) {
        goto Lcollision;
      }

      if (doc->magic != DOC_MAGIC || !doc->hlen || this->load_http_info(write_vector, doc, buf.object()) != doc->hlen) {
        err = ECACHE_BAD_META_DATA;
        goto Lfailure;
      }
      ink_assert(write_vector->count() > 0);
      od->first_dir = dir;
      first_dir = dir;
      if (doc->single_fragment()) {
        // fragment is tied to the vector
        od->move_resident_alt = true;
        od->single_doc_key = doc->key;
        dir_assign(&od->single_doc_dir, &dir);
        dir_set_tag(&od->single_doc_dir, od->single_doc_key.slice32(2));
      }
      first_buf = buf;
      goto Lsuccess;
    }

  Lcollision:
    int if_writers = ((uintptr_t)info == CACHE_ALLOW_MULTIPLE_WRITES);
    if (!od) {
      if ((err = vol->open_write(this, if_writers, cache_config_http_max_alts > 1 ? cache_config_http_max_alts : 0)) > 0) {
        goto Lfailure;
      }
      if (od->has_multiple_writers()) {
        MUTEX_RELEASE(lock);
        SET_HANDLER(&CacheVC::openWriteMain);
        return callcont(CACHE_EVENT_OPEN_WRITE);
      }
    }
    // check for collision
    if (dir_probe(&first_key, vol, &dir, &last_collision)) {
      od->reading_vec = true;
      int ret = do_read_call(&first_key);
      if (ret == EVENT_RETURN) {
        goto Lcallreturn;
      }
      return ret;
    }
    if (f.update) {
      // fail update because vector has been GC'd
      goto Lfailure;
    }
  }
Lsuccess:
  od->reading_vec = false;
  if (_action.cancelled) {
    goto Lcancel;
  }
  SET_HANDLER(&CacheVC::openWriteMain);
  return callcont(CACHE_EVENT_OPEN_WRITE);

Lfailure:
  CACHE_INCREMENT_DYN_STAT(base_stat + CACHE_STAT_FAILURE);
  _action.continuation->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *)-err);
Lcancel:
  if (od) {
    od->reading_vec = false;
    return openWriteCloseDir(event, e);
  } else {
    return free_CacheVC(this);
  }
Lcallreturn:
  return handleEvent(AIO_EVENT_DONE, nullptr); // hopefully a tail call
}

1613 // handle lock failures from main Cache::open_write entry points below
1614 int
openWriteStartBegin(int,Event *)1615 CacheVC::openWriteStartBegin(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
1616 {
1617 intptr_t err;
1618 cancel_trigger();
1619 if (_action.cancelled) {
1620 return free_CacheVC(this);
1621 }
  if (((err = vol->open_write_lock(this, false, 1)) > 0)) {
    CACHE_INCREMENT_DYN_STAT(base_stat + CACHE_STAT_FAILURE);
    free_CacheVC(this);
    _action.continuation->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *)-err);
    return EVENT_DONE;
  }
  if (err < 0) {
    VC_SCHED_LOCK_RETRY();
  }
  if (f.overwrite) {
    SET_HANDLER(&CacheVC::openWriteOverwrite);
    return openWriteOverwrite(EVENT_IMMEDIATE, nullptr);
  } else {
    // write by key
    SET_HANDLER(&CacheVC::openWriteMain);
    return callcont(CACHE_EVENT_OPEN_WRITE);
  }
}

// main entry point for writing non-HTTP documents
Action *
Cache::open_write(Continuation *cont, const CacheKey *key, CacheFragType frag_type, int options, time_t apin_in_cache,
                  const char *hostname, int host_len)
{
  if (!CacheProcessor::IsCacheReady(frag_type)) {
    cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *)-ECACHE_NOT_READY);
    return ACTION_RESULT_DONE;
  }

  ink_assert(caches[frag_type] == this);

  intptr_t res = 0;
  CacheVC *c = new_CacheVC(cont);
  ProxyMutex *mutex = cont->mutex.get();
  SCOPED_MUTEX_LOCK(lock, c->mutex, this_ethread());
  c->vio.op = VIO::WRITE;
  c->base_stat = cache_write_active_stat;
  c->vol = key_to_vol(key, hostname, host_len);
  Vol *vol = c->vol;
  CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
  c->first_key = c->key = *key;
  c->frag_type = frag_type;
  /*
    The transition from a single-fragment document to a multi-fragment
    document would cause a problem if the key and the first_key collide. In
    case of a collision, old vector data could be served to HTTP. Need to
    avoid that. Also, when evacuating a fragment, we have to decide whether
    it is the first_key or the earliest_key based on the dir_tag.
  */
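  // The loop below re-rolls the fragment key until its directory tag differs
  // from first_key's tag, so the two remain distinguishable even if their
  // directory entries collide.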
  do {
    rand_CacheKey(&c->key, cont->mutex);
  } while (DIR_MASK_TAG(c->key.slice32(2)) == DIR_MASK_TAG(c->first_key.slice32(2)));
  c->earliest_key = c->key;
  c->info = nullptr;
  c->f.overwrite = (options & CACHE_WRITE_OPT_OVERWRITE) != 0;
  c->f.close_complete = (options & CACHE_WRITE_OPT_CLOSE_COMPLETE) != 0;
  c->f.sync = (options & CACHE_WRITE_OPT_SYNC) == CACHE_WRITE_OPT_SYNC;
  c->pin_in_cache = static_cast<uint32_t>(apin_in_cache);

  if ((res = c->vol->open_write_lock(c, false, 1)) > 0) {
    // document currently being written, abort
    CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_FAILURE);
    cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *)-res);
    free_CacheVC(c);
    return ACTION_RESULT_DONE;
  }
  if (res < 0) {
    SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteStartBegin);
    c->trigger = CONT_SCHED_LOCK_RETRY(c);
    return &c->_action;
  }
  if (!c->f.overwrite) {
    SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteMain);
    c->callcont(CACHE_EVENT_OPEN_WRITE);
    return ACTION_RESULT_DONE;
  } else {
    SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteOverwrite);
    if (c->openWriteOverwrite(EVENT_IMMEDIATE, nullptr) == EVENT_DONE) {
      return ACTION_RESULT_DONE;
    } else {
      return &c->_action;
    }
  }
}
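
/*
  A minimal caller sketch for the entry point above -- illustrative only.
  `MyWriter` and its members are hypothetical and not part of this file; the
  event names, options, and the open_write() signature are the ones used here.

    struct MyWriter : public Continuation {
      CacheKey key;
      int
      handle_cache(int event, void *data)
      {
        if (event == CACHE_EVENT_OPEN_WRITE) {
          // `data` is the cache VC: drive it with do_io_write() and
          // finish with do_io_close() once the VIO completes.
        } else if (event == CACHE_EVENT_OPEN_WRITE_FAILED) {
          // -(intptr_t)data holds the ECACHE_* error code.
        }
        return EVENT_DONE;
      }
    };

  and the call itself:

    cache->open_write(writer, &writer->key, CACHE_FRAG_TYPE_NONE,
                      CACHE_WRITE_OPT_OVERWRITE, 0, hostname, host_len);
*/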

// main entry point for writing HTTP documents
Action *
Cache::open_write(Continuation *cont, const CacheKey *key, CacheHTTPInfo *info, time_t apin_in_cache,
                  const CacheKey * /* key1 ATS_UNUSED */, CacheFragType type, const char *hostname, int host_len)
{
  if (!CacheProcessor::IsCacheReady(type)) {
    cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *)-ECACHE_NOT_READY);
    return ACTION_RESULT_DONE;
  }

  ink_assert(caches[type] == this);
  intptr_t err = 0;
  int if_writers = (uintptr_t)info == CACHE_ALLOW_MULTIPLE_WRITES;
  CacheVC *c = new_CacheVC(cont);
  ProxyMutex *mutex = cont->mutex.get();
  c->vio.op = VIO::WRITE;
  c->first_key = *key;
  /*
    The transition from a single-fragment document to a multi-fragment
    document would cause a problem if the key and the first_key collide. In
    case of a collision, old vector data could be served to HTTP. Need to
    avoid that. Also, when evacuating a fragment, we have to decide whether
    it is the first_key or the earliest_key based on the dir_tag.
  */
  do {
    rand_CacheKey(&c->key, cont->mutex);
  } while (DIR_MASK_TAG(c->key.slice32(2)) == DIR_MASK_TAG(c->first_key.slice32(2)));
  c->earliest_key = c->key;
  c->frag_type = CACHE_FRAG_TYPE_HTTP;
  c->vol = key_to_vol(key, hostname, host_len);
  Vol *vol = c->vol;
  c->info = info;
  if (c->info && (uintptr_t)info != CACHE_ALLOW_MULTIPLE_WRITES) {
    /*
      Update has the following code paths:
      a) Update alternate header only:
         In this case the vector has to be rewritten. The content
         length (update_len) and the key for the document are set in the
         new_info in the set_http_info call.
         HTTP OPERATIONS
           open_write with info set
           set_http_info new_info
           (total_len == 0)
           close
      b) Update alternate and data:
         In this case both the vector and the data need to be rewritten.
         This case is similar to the standard write of a document, except
         that the new_info is inserted into the vector at the alternate_index
         (overwriting the old alternate) rather than at the end of the vector.
         HTTP OPERATIONS
           open_write with info set
           set_http_info new_info
           do_io_write => (total_len > 0)
           close
      c) Delete an alternate:
         The vector may need to be deleted (if there was only one alternate)
         or rewritten (if there was more than one). The deletion of the
         vector is done in openWriteRemoveVector.
         HTTP OPERATIONS
           open_write with info set
           close
    */
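    // A caller-side sketch of case (a), header-only update (illustrative;
    // `sm`, `old_info`, and `new_info` are hypothetical names):
    //
    //   cache->open_write(sm, &key, old_info, pin, nullptr,
    //                     CACHE_FRAG_TYPE_HTTP, hostname, host_len);
    //   // on CACHE_EVENT_OPEN_WRITE:
    //   vc->set_http_info(&new_info); // new headers only, no do_io_write()
    //   vc->do_io_close();            // total_len == 0 => vector-only rewrite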
    c->f.update = 1;
    c->base_stat = cache_update_active_stat;
    DDebug("cache_update", "Update called");
    info->object_key_get(&c->update_key);
    ink_assert(!(c->update_key == zero_key));
    c->update_len = info->object_size_get();
  } else {
    c->base_stat = cache_write_active_stat;
  }
  CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
  c->pin_in_cache = static_cast<uint32_t>(apin_in_cache);

  {
    CACHE_TRY_LOCK(lock, c->vol->mutex, cont->mutex->thread_holding);
    if (lock.is_locked()) {
      if ((err = c->vol->open_write(c, if_writers, cache_config_http_max_alts > 1 ? cache_config_http_max_alts : 0)) > 0) {
        goto Lfailure;
      }
      // If there are multiple writers, then this one cannot be an update.
      // Only the first writer can do an update. If that's the case, we can
      // return success to the state machine now.
      if (c->od->has_multiple_writers()) {
        goto Lmiss;
      }
      if (!dir_probe(key, c->vol, &c->dir, &c->last_collision)) {
        if (c->f.update) {
          // fail update because vector has been GC'd
          // This situation can also arise in openWriteStartDone
          err = ECACHE_NO_DOC;
          goto Lfailure;
        }
        // document doesn't exist, begin write
        goto Lmiss;
      } else {
        c->od->reading_vec = true;
        // document exists, read vector
        SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteStartDone);
        switch (c->do_read_call(&c->first_key)) {
        case EVENT_DONE:
          return ACTION_RESULT_DONE;
        case EVENT_RETURN:
          goto Lcallreturn;
        default:
          return &c->_action;
        }
      }
    }
    // missed lock
    SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteStartDone);
    CONT_SCHED_LOCK_RETRY(c);
    return &c->_action;
  }

Lmiss:
  SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteMain);
  c->callcont(CACHE_EVENT_OPEN_WRITE);
  return ACTION_RESULT_DONE;

Lfailure:
  CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_FAILURE);
  cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *)-err);
  if (c->od) {
    c->openWriteCloseDir(EVENT_IMMEDIATE, nullptr);
    return ACTION_RESULT_DONE;
  }
  free_CacheVC(c);
  return ACTION_RESULT_DONE;

Lcallreturn:
  if (c->handleEvent(AIO_EVENT_DONE, nullptr) == EVENT_DONE) {
    return ACTION_RESULT_DONE;
  }
  return &c->_action;
}
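
/*
  Both open_write() entry points above resolve in one of two ways: either the
  caller is re-entered synchronously (ACTION_RESULT_DONE is returned and the
  continuation has already received CACHE_EVENT_OPEN_WRITE or
  CACHE_EVENT_OPEN_WRITE_FAILED), or the operation has gone asynchronous and
  the returned Action can be cancelled until one of those events arrives.
*/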