/** @file

  A brief file description

  @section license License

  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements.  See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership.  The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
 */

#include "P_Cache.h"

#define UINT_WRAP_LTE(_x, _y) (((_y) - (_x)) < INT_MAX) // exploit overflow
#define UINT_WRAP_GTE(_x, _y) (((_x) - (_y)) < INT_MAX) // exploit overflow
#define UINT_WRAP_LT(_x, _y) (((_x) - (_y)) >= INT_MAX) // exploit overflow
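
// Added commentary: these macros compare serial numbers that may wrap around
// 2^32 by treating the unsigned difference as a signed distance. A worked
// example with 32-bit arithmetic:
//   UINT_WRAP_LTE(0xFFFFFFFEu, 1u): (1 - 0xFFFFFFFE) wraps to 3, and
//     3 < INT_MAX, so 0xFFFFFFFE is considered "<=" 1 across the wrap.
//   UINT_WRAP_LTE(5u, 1u): (1 - 5) wraps to 0xFFFFFFFC, which is not
//     < INT_MAX, so 5 is (correctly) not "<=" 1.
// In other words, x "<=" y holds whenever y is no more than INT_MAX ahead
// of x modulo 2^32.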

// Given a key, finds the index of the alternate which matches it.
// Used to get the alternate which is actually present in the document.
int
get_alternate_index(CacheHTTPInfoVector *cache_vector, CacheKey key)
{
  int alt_count = cache_vector->count();
  CacheHTTPInfo *obj;
  if (!alt_count) {
    return -1;
  }
  for (int i = 0; i < alt_count; i++) {
    obj = cache_vector->get(i);
    if (obj->compare_object_key(&key)) {
      // Debug("cache_key", "Resident alternate key  %X", key.slice32(0));
      return i;
    }
  }
  return -1;
}

// Adds/Deletes an alternate in the od->vector (write_vector). If the vector
// becomes empty, deletes the directory entry pointing to the vector. Each
// CacheVC must write the vector down to disk after making changes. If we
// wait till the last writer, that writer will have the responsibility of
// writing the vector even if the http state machine aborts. This
// makes it easier to handle situations where writers abort.
int
CacheVC::updateVector(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
  cancel_trigger();
  if (od->reading_vec || od->writing_vec) {
    VC_SCHED_LOCK_RETRY();
  }
  int ret = 0;
  {
    CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
    if (!lock.is_locked() || od->writing_vec) {
      VC_SCHED_LOCK_RETRY();
    }

    int vec = alternate.valid();
    if (f.update) {
      // all Update cases. Need to get the alternate index.
      alternate_index = get_alternate_index(write_vector, update_key);
      Debug("cache_update", "updating alternate index %d frags %d", alternate_index,
            alternate_index >= 0 ? write_vector->get(alternate_index)->get_frag_offset_count() : -1);
      // if it's an alternate delete
      if (!vec) {
        ink_assert(!total_len);
        if (alternate_index >= 0) {
          write_vector->remove(alternate_index, true);
          alternate_index = CACHE_ALT_REMOVED;
          if (!write_vector->count()) {
            dir_delete(&first_key, vol, &od->first_dir);
          }
        }
        // the alternate is not there any more. somebody might have
        // deleted it. Just close this writer
        if (alternate_index != CACHE_ALT_REMOVED || !write_vector->count()) {
          SET_HANDLER(&CacheVC::openWriteCloseDir);
          return openWriteCloseDir(EVENT_IMMEDIATE, nullptr);
        }
      }
      if (update_key == od->single_doc_key && (total_len || f.allow_empty_doc || !vec)) {
        od->move_resident_alt = false;
      }
    }
    if (cache_config_http_max_alts > 1 && write_vector->count() >= cache_config_http_max_alts && alternate_index < 0) {
      if (od->move_resident_alt && get_alternate_index(write_vector, od->single_doc_key) == 0) {
        od->move_resident_alt = false;
      }
      write_vector->remove(0, true);
    }
    if (vec) {
      /* preserve fragment offset data from the old info. This path is
         taken iff the update is a header-only update, so the fragment
         data should remain valid.
      */
      // If this is not a header-only update, don't copy fragments.
      if (alternate_index >= 0 &&
          ((total_len == 0 && alternate.get_frag_offset_count() == 0) && !(f.allow_empty_doc && this->vio.nbytes == 0))) {
        alternate.copy_frag_offsets_from(write_vector->get(alternate_index));
      }
      alternate_index = write_vector->insert(&alternate, alternate_index);
    }

    if (od->move_resident_alt && first_buf.get() && !od->has_multiple_writers()) {
      Doc *doc         = reinterpret_cast<Doc *>(first_buf->data());
      int small_doc    = static_cast<int64_t>(doc->data_len()) < static_cast<int64_t>(cache_config_alt_rewrite_max_size);
      int have_res_alt = doc->key == od->single_doc_key;
      // If the new alternate is not written with the vector,
      // then move the old one with the vector. If it's a header-only
      // update, move the resident alternate with the vector.
      // We are sure that the body of the resident alternate that we are
      // rewriting has not changed and the alternate is not being deleted,
      // since we set od->move_resident_alt to false in that case
      // (above in updateVector).
      if (small_doc && have_res_alt && (fragment || (f.update && !total_len))) {
        // for a multiple-fragment document, we must have gone through
        // CacheVC::openWriteCloseDataDone
        ink_assert(!fragment || f.data_done);
        od->move_resident_alt  = false;
        f.rewrite_resident_alt = 1;
        write_len              = doc->data_len();
        Debug("cache_update_alt", "rewriting resident alt size: %d key: %X, first_key: %X", write_len, doc->key.slice32(0),
              first_key.slice32(0));
      }
    }
    header_len      = write_vector->marshal_length();
    od->writing_vec = true;
    f.use_first_key = 1;
    SET_HANDLER(&CacheVC::openWriteCloseHeadDone);
    ret = do_write_call();
  }
  if (ret == EVENT_RETURN) {
    return handleEvent(AIO_EVENT_DONE, nullptr);
  }
  return ret;
}
/*
   The following fields of the CacheVC are used when writing down a fragment.
   Make sure that each of the fields is set to a valid value before calling
   this function:
   - frag_type. Checked to see if a vector needs to be marshalled.
   - f.use_first_key. To decide if the vector should be marshalled and to set
     the doc->key to the appropriate key (first_key or earliest_key).
   - f.evac_vector. If set, the writer is pushed to the beginning of the
     agg queue. And if !f.evac_vector && !f.update, the alternate->object_size
     is set to vc->total_len.
   - f.readers. If set, assumes that this is an evacuation, so the write
     is not aborted even if vol->agg_todo_size > agg_write_backlog.
   - f.evacuator. If this is an evacuation.
   - f.rewrite_resident_alt. The resident alternate is rewritten.
   - f.update. Used only if the write_vector needs to be written to disk.
     Used to set the length of the alternate to total_len.
   - write_vector. Used only if frag_type == CACHE_FRAG_TYPE_HTTP &&
     (f.use_first_key || f.evac_vector) is set. write_vector is written to disk.
   - alternate_index. Used only if write_vector needs to be written to disk.
     Used to find the VC's alternate in the write_vector and set its
     length to total_len.
   - write_len. The number of bytes for this fragment.
   - total_len. The total number of bytes for the document so far.
     Doc->total_len and the alternate's total length are set to this value.
   - first_key. Doc's first_key is set to this value.
   - pin_in_cache. Doc's pinned value is set to this + Thread::get_hrtime().
   - earliest_key. If f.use_first_key, Doc's key is set to this value.
   - key. If !f.use_first_key, Doc's key is set to this value.
   - blocks. Used only if write_len is set. Data to be written.
   - offset. Used only if write_len is set. Offset into the block to copy
     the data from.
   - buf. Used only if f.evacuator is set. Should point to the old document.
   This function sets the length, offset, pinned, head and phase of vc->dir.
   */
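
// Condensed sketch (added commentary, mirroring agg_copy() below) of how
// these flags pick the key stored in the Doc header:
//
//   if (vc->f.use_first_key)         // head fragment, carries the vector:
//     doc->key = vc->earliest_key;   //   (or a synthetic key when the
//                                    //   vector is written by itself)
//   else
//     doc->key = vc->key;            // data fragment: the per-fragment key
//
// The directory "head" bit follows the same split: always set when
// f.use_first_key is set, and otherwise set only for fragment 0.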

int
CacheVC::handleWrite(int event, Event * /* e ATS_UNUSED */)
{
  // plain write case
  ink_assert(!trigger);
  frag_len = 0;

  set_agg_write_in_progress();
  POP_HANDLER;
  agg_len = vol->round_to_approx_size(write_len + header_len + frag_len + sizeof(Doc));
  vol->agg_todo_size += agg_len;
  bool agg_error = (agg_len > AGG_SIZE || header_len + sizeof(Doc) > MAX_FRAG_SIZE ||
                    (!f.readers && (vol->agg_todo_size > cache_config_agg_write_backlog + AGG_SIZE) && write_len));
#ifdef CACHE_AGG_FAIL_RATE
  agg_error = agg_error || ((uint32_t)mutex->thread_holding->generator.random() < (uint32_t)(UINT_MAX * CACHE_AGG_FAIL_RATE));
#endif
  bool max_doc_error = (cache_config_max_doc_size && (cache_config_max_doc_size < vio.ndone ||
                                                      (vio.nbytes != INT64_MAX && (cache_config_max_doc_size < vio.nbytes))));

  if (agg_error || max_doc_error) {
    CACHE_INCREMENT_DYN_STAT(cache_write_backlog_failure_stat);
    CACHE_INCREMENT_DYN_STAT(base_stat + CACHE_STAT_FAILURE);
    vol->agg_todo_size -= agg_len;
    io.aio_result = AIO_SOFT_FAILURE;
    if (event == EVENT_CALL) {
      return EVENT_RETURN;
    }
    return handleEvent(AIO_EVENT_DONE, nullptr);
  }
  ink_assert(agg_len <= AGG_SIZE);
  if (f.evac_vector) {
    vol->agg.push(this);
  } else {
    vol->agg.enqueue(this);
  }
  if (!vol->is_io_in_progress()) {
    return vol->aggWrite(event, this);
  }
  return EVENT_CONT;
}

static char *
iobufferblock_memcpy(char *p, int len, IOBufferBlock *ab, int offset)
{
  IOBufferBlock *b = ab;
  while (b && len >= 0) {
    char *start   = b->_start;
    char *end     = b->_end;
    int max_bytes = end - start;
    max_bytes -= offset;
    if (max_bytes <= 0) {
      offset = -max_bytes;
      b      = b->next.get();
      continue;
    }
    int bytes = len;
    if (bytes >= max_bytes) {
      bytes = max_bytes;
    }
    ::memcpy(p, start + offset, bytes);
    p += bytes;
    len -= bytes;
    b      = b->next.get();
    offset = 0;
  }
  return p;
}
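
// Added illustration (not in the original source): given a chain of two
// blocks holding 4 and 8 readable bytes and a call with offset 6, the loop
// above first computes max_bytes = 4 - 6 = -2, so it resets offset to 2 and
// advances to the second block, then copies starting at byte 2 of that
// block. The returned pointer is one past the last byte written, which lets
// callers chain successive copies into the aggregation buffer.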

EvacuationBlock *
Vol::force_evacuate_head(Dir *evac_dir, int pinned)
{
  // build an evacuation block for the object
  EvacuationBlock *b = evacuation_block_exists(evac_dir, this);
  // if we have already started evacuating this document, it's too late
  // to evacuate the head...bad luck
  if (b && b->f.done) {
    return b;
  }

  if (!b) {
    b      = new_EvacuationBlock(mutex->thread_holding);
    b->dir = *evac_dir;
    DDebug("cache_evac", "force: %d, %d", (int)dir_offset(evac_dir), (int)dir_phase(evac_dir));
    evacuate[dir_evac_bucket(evac_dir)].push(b);
  }
  b->f.pinned        = pinned;
  b->f.evacuate_head = 1;
  b->evac_frags.key  = zero_key; // ensure that the block gets
  // evacuated no matter what
  b->readers = 0; // ensure that the block does not disappear
  return b;
}

void
Vol::scan_for_pinned_documents()
{
  if (cache_config_permit_pinning) {
    // we can't evacuate anything between header->write_pos and
    // header->write_pos + AGG_SIZE.
    int ps                = this->offset_to_vol_offset(header->write_pos + AGG_SIZE);
    int pe                = this->offset_to_vol_offset(header->write_pos + 2 * EVACUATION_SIZE + (len / PIN_SCAN_EVERY));
    int vol_end_offset    = this->offset_to_vol_offset(len + skip);
    int before_end_of_vol = pe < vol_end_offset;
    DDebug("cache_evac", "scan %d %d", ps, pe);
    for (int i = 0; i < this->direntries(); i++) {
      // is it a valid pinned object?
      if (!dir_is_empty(&dir[i]) && dir_pinned(&dir[i]) && dir_head(&dir[i])) {
        // select objects only within this PIN_SCAN region
        int o = dir_offset(&dir[i]);
        if (dir_phase(&dir[i]) == header->phase) {
          if (before_end_of_vol || o >= (pe - vol_end_offset)) {
            continue;
          }
        } else {
          if (o < ps || o >= pe) {
            continue;
          }
        }
        force_evacuate_head(&dir[i], 1);
        //      DDebug("cache_evac", "scan pinned at offset %d %d %d %d %d %d",
        //            (int)dir_offset(&b->dir), ps, o , pe, i, (int)b->f.done);
      }
    }
  }
}

/* NOTE: This state can be called by an AIO thread, so DON'T DON'T
   DON'T schedule any events on this thread using VC_SCHED_XXX or
   mutex->thread_holding->schedule_xxx_local(). ALWAYS use
   eventProcessor.schedule_xxx().
   */
int
Vol::aggWriteDone(int event, Event *e)
{
  cancel_trigger();

  // ensure we have the cacheDirSync lock if we intend to call it later
  // retaking the current mutex recursively is a NOOP
  CACHE_TRY_LOCK(lock, dir_sync_waiting ? cacheDirSync->mutex : mutex, mutex->thread_holding);
  if (!lock.is_locked()) {
    eventProcessor.schedule_in(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay));
    return EVENT_CONT;
  }
  if (io.ok()) {
    header->last_write_pos = header->write_pos;
    header->write_pos += io.aiocb.aio_nbytes;
    ink_assert(header->write_pos >= start);
    DDebug("cache_agg", "Dir %s, Write: %" PRIu64 ", last Write: %" PRIu64 "", hash_text.get(), header->write_pos,
           header->last_write_pos);
    ink_assert(header->write_pos == header->agg_pos);
    if (header->write_pos + EVACUATION_SIZE > scan_pos) {
      periodic_scan();
    }
    agg_buf_pos = 0;
    header->write_serial++;
  } else {
    // delete all the directory entries that we inserted
    // for fragments in this aggregation buffer
    Debug("cache_disk_error", "Write error on disk %s\n \
              write range : [%" PRIu64 " - %" PRIu64 " bytes]  [%" PRIu64 " - %" PRIu64 " blocks] \n",
          hash_text.get(), (uint64_t)io.aiocb.aio_offset, (uint64_t)io.aiocb.aio_offset + io.aiocb.aio_nbytes,
          (uint64_t)io.aiocb.aio_offset / CACHE_BLOCK_SIZE,
          (uint64_t)(io.aiocb.aio_offset + io.aiocb.aio_nbytes) / CACHE_BLOCK_SIZE);
    Dir del_dir;
    dir_clear(&del_dir);
    for (int done = 0; done < agg_buf_pos;) {
      Doc *doc = reinterpret_cast<Doc *>(agg_buffer + done);
      dir_set_offset(&del_dir, header->write_pos + done);
      dir_delete(&doc->key, this, &del_dir);
      done += round_to_approx_size(doc->len);
    }
    agg_buf_pos = 0;
  }
  set_io_not_in_progress();
  // callback ready sync CacheVCs
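  // Added note: a CacheVC parked on the sync queue is released only once
  // header->write_serial is at least two ahead of the serial it was written
  // under, presumably so that both the aggregation write carrying its data
  // and a following write have completed before the sync is reported done.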
  CacheVC *c = nullptr;
  while ((c = sync.dequeue())) {
    if (UINT_WRAP_LTE(c->write_serial + 2, header->write_serial)) {
      eventProcessor.schedule_imm(c, ET_CALL, AIO_EVENT_DONE);
    } else {
      sync.push(c); // put it back on the front
      break;
    }
  }
  if (dir_sync_waiting) {
    dir_sync_waiting = false;
    cacheDirSync->handleEvent(EVENT_IMMEDIATE, nullptr);
  }
  if (agg.head || sync.head) {
    return aggWrite(event, e);
  }
  return EVENT_CONT;
}

CacheVC *
new_DocEvacuator(int nbytes, Vol *vol)
{
  CacheVC *c        = new_CacheVC(vol);
  ProxyMutex *mutex = vol->mutex.get();
  c->base_stat      = cache_evacuate_active_stat;
  CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
  c->buf          = new_IOBufferData(iobuffer_size_to_index(nbytes, MAX_BUFFER_SIZE_INDEX), MEMALIGNED);
  c->vol          = vol;
  c->f.evacuator  = 1;
  c->earliest_key = zero_key;
  SET_CONTINUATION_HANDLER(c, &CacheVC::evacuateDocDone);
  return c;
}

int
CacheVC::evacuateReadHead(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
  // The evacuator vc shares the lock with the volume mutex
  ink_assert(vol->mutex->thread_holding == this_ethread());
  cancel_trigger();
  Doc *doc                     = reinterpret_cast<Doc *>(buf->data());
  CacheHTTPInfo *alternate_tmp = nullptr;
  if (!io.ok()) {
    goto Ldone;
  }
  // a directory entry which is no longer valid may have been overwritten
  if (!dir_valid(vol, &dir)) {
    last_collision = nullptr;
    goto Lcollision;
  }
  if (doc->magic != DOC_MAGIC || !(doc->first_key == first_key)) {
    goto Lcollision;
  }
  alternate_tmp = nullptr;
  if (doc->doc_type == CACHE_FRAG_TYPE_HTTP && doc->hlen) {
    // it's an HTTP document
    if (this->load_http_info(&vector, doc) != doc->hlen) {
      Note("bad vector detected during evacuation");
      goto Ldone;
    }
    alternate_index = get_alternate_index(&vector, earliest_key);
    if (alternate_index < 0) {
      goto Ldone;
    }
    alternate_tmp = vector.get(alternate_index);
    doc_len       = alternate_tmp->object_size_get();
    Debug("cache_evac", "evacuateReadHead http earliest %X first: %X len: %" PRId64, first_key.slice32(0), earliest_key.slice32(0),
          doc_len);
  } else {
    // non-HTTP document
    CacheKey next_key;
    next_CacheKey(&next_key, &doc->key);
    if (!(next_key == earliest_key)) {
      goto Ldone;
    }
    doc_len = doc->total_len;
    DDebug("cache_evac", "evacuateReadHead non-http earliest %X first: %X len: %" PRId64, first_key.slice32(0),
           earliest_key.slice32(0), doc_len);
  }
  if (doc_len == total_len) {
    // the whole document has been evacuated. Insert the directory
    // entry in the directory.
    dir_lookaside_fixup(&earliest_key, vol);
    return free_CacheVC(this);
  }
  return EVENT_CONT;
Lcollision:
  if (dir_probe(&first_key, vol, &dir, &last_collision)) {
    int ret = do_read_call(&first_key);
    if (ret == EVENT_RETURN) {
      return handleEvent(AIO_EVENT_DONE, nullptr);
    }
    return ret;
  }
Ldone:
  dir_lookaside_remove(&earliest_key, vol);
  return free_CacheVC(this);
}

int
CacheVC::evacuateDocDone(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
  ink_assert(vol->mutex->thread_holding == this_ethread());
  Doc *doc = reinterpret_cast<Doc *>(buf->data());
  DDebug("cache_evac", "evacuateDocDone %X o %d p %d new_o %d new_p %d", (int)key.slice32(0), (int)dir_offset(&overwrite_dir),
         (int)dir_phase(&overwrite_dir), (int)dir_offset(&dir), (int)dir_phase(&dir));
  int i = dir_evac_bucket(&overwrite_dir);
  // nasty beeping race condition, need to have the EvacuationBlock here
  EvacuationBlock *b = vol->evacuate[i].head;
  for (; b; b = b->link.next) {
    if (dir_offset(&b->dir) == dir_offset(&overwrite_dir)) {
      // If the document is single fragment (although not tied to the vector),
      // then we don't have to put the directory entry in the lookaside
      // buffer. But, we have no way of finding out if the document is
      // single fragment. doc->single_fragment() can be true for a multiple
      // fragment document since total_len and doc->len could be equal at
      // the time we write the fragment down. To be on the safe side, we
      // only overwrite the entry in the directory if it's not a head.
      if (!dir_head(&overwrite_dir)) {
        // find the earliest key
        EvacuationKey *evac = &b->evac_frags;
        for (; evac && !(evac->key == doc->key); evac = evac->link.next) {
          ;
        }
        ink_assert(evac);
        if (!evac) {
          break;
        }
        if (evac->earliest_key.fold()) {
          DDebug("cache_evac", "evacdocdone: evacuating key %X earliest %X", evac->key.slice32(0), evac->earliest_key.slice32(0));
          EvacuationBlock *eblock = nullptr;
          Dir dir_tmp;
          dir_lookaside_probe(&evac->earliest_key, vol, &dir_tmp, &eblock);
          if (eblock) {
            CacheVC *earliest_evac = eblock->earliest_evacuator;
            earliest_evac->total_len += doc->data_len();
            if (earliest_evac->total_len == earliest_evac->doc_len) {
              dir_lookaside_fixup(&evac->earliest_key, vol);
              free_CacheVC(earliest_evac);
            }
          }
        }
        dir_overwrite(&doc->key, vol, &dir, &overwrite_dir);
      }
      // if the tag in the overwrite_dir matches the first_key in the
      // document, then it has to be the vector. We guarantee that
      // the first_key and the earliest_key will never collide (see
      // Cache::open_write). Once we know it's the vector, we can
      // safely overwrite the first_key in the directory.
      if (dir_head(&overwrite_dir) && b->f.evacuate_head) {
        DDebug("cache_evac", "evacuateDocDone evacuate_head %X %X hlen %d offset %d", (int)key.slice32(0), (int)doc->key.slice32(0),
               doc->hlen, (int)dir_offset(&overwrite_dir));

        if (dir_compare_tag(&overwrite_dir, &doc->first_key)) {
          OpenDirEntry *cod;
          DDebug("cache_evac", "evacuating vector: %X %d", (int)doc->first_key.slice32(0), (int)dir_offset(&overwrite_dir));
          if ((cod = vol->open_read(&doc->first_key))) {
            // writer exists
            DDebug("cache_evac", "overwriting the open directory %X %d %d", (int)doc->first_key.slice32(0),
                   (int)dir_offset(&cod->first_dir), (int)dir_offset(&dir));
            cod->first_dir = dir;
          }
          if (dir_overwrite(&doc->first_key, vol, &dir, &overwrite_dir)) {
            int64_t o = dir_offset(&overwrite_dir), n = dir_offset(&dir);
            vol->ram_cache->fixup(&doc->first_key, static_cast<uint32_t>(o >> 32), static_cast<uint32_t>(o),
                                  static_cast<uint32_t>(n >> 32), static_cast<uint32_t>(n));
          }
        } else {
          DDebug("cache_evac", "evacuating earliest: %X %d", (int)doc->key.slice32(0), (int)dir_offset(&overwrite_dir));
          ink_assert(dir_compare_tag(&overwrite_dir, &doc->key));
          ink_assert(b->earliest_evacuator == this);
          total_len += doc->data_len();
          first_key    = doc->first_key;
          earliest_dir = dir;
          if (dir_probe(&first_key, vol, &dir, &last_collision) > 0) {
            dir_lookaside_insert(b, vol, &earliest_dir);
            // read the vector
            SET_HANDLER(&CacheVC::evacuateReadHead);
            int ret = do_read_call(&first_key);
            if (ret == EVENT_RETURN) {
              return handleEvent(AIO_EVENT_DONE, nullptr);
            }
            return ret;
          }
        }
      }
      break;
    }
  }
  return free_CacheVC(this);
}

static int
evacuate_fragments(CacheKey *key, CacheKey *earliest_key, int force, Vol *vol)
{
  Dir dir, *last_collision = nullptr;
  int i = 0;
  while (dir_probe(key, vol, &dir, &last_collision)) {
    // next fragment cannot be a head...if it is, it must have been a
    // directory collision.
    if (dir_head(&dir)) {
      continue;
    }
    EvacuationBlock *b = evacuation_block_exists(&dir, vol);
    if (!b) {
      b                          = new_EvacuationBlock(vol->mutex->thread_holding);
      b->dir                     = dir;
      b->evac_frags.key          = *key;
      b->evac_frags.earliest_key = *earliest_key;
      vol->evacuate[dir_evac_bucket(&dir)].push(b);
      i++;
    } else {
      ink_assert(dir_offset(&dir) == dir_offset(&b->dir));
      ink_assert(dir_phase(&dir) == dir_phase(&b->dir));
      EvacuationKey *evac_frag = evacuationKeyAllocator.alloc();
      evac_frag->key           = *key;
      evac_frag->earliest_key  = *earliest_key;
      evac_frag->link.next     = b->evac_frags.link.next;
      b->evac_frags.link.next  = evac_frag;
    }
    if (force) {
      b->readers = 0;
    }
    DDebug("cache_evac", "next fragment %X Earliest: %X offset %d phase %d force %d", (int)key->slice32(0),
           (int)earliest_key->slice32(0), (int)dir_offset(&dir), (int)dir_phase(&dir), force);
  }
  return i;
}

int
Vol::evacuateWrite(CacheVC *evacuator, int event, Event *e)
{
  // push to front of aggregation write list, so it is written first

  evacuator->agg_len = round_to_approx_size((reinterpret_cast<Doc *>(evacuator->buf->data()))->len);
  agg_todo_size += evacuator->agg_len;
  /* insert the evacuator after all the other evacuators */
  CacheVC *cur   = static_cast<CacheVC *>(agg.head);
  CacheVC *after = nullptr;
  for (; cur && cur->f.evacuator; cur = (CacheVC *)cur->link.next) {
    after = cur;
  }
  ink_assert(evacuator->agg_len <= AGG_SIZE);
  agg.insert(evacuator, after);
  return aggWrite(event, e);
}

int
Vol::evacuateDocReadDone(int event, Event *e)
{
  cancel_trigger();
  if (event != AIO_EVENT_DONE) {
    return EVENT_DONE;
  }
  ink_assert(is_io_in_progress());
  set_io_not_in_progress();
  ink_assert(mutex->thread_holding == this_ethread());
  Doc *doc = reinterpret_cast<Doc *>(doc_evacuator->buf->data());
  CacheKey next_key;
  EvacuationBlock *b = nullptr;
  if (doc->magic != DOC_MAGIC) {
    Debug("cache_evac", "DOC magic: %X %d", (int)dir_tag(&doc_evacuator->overwrite_dir),
          (int)dir_offset(&doc_evacuator->overwrite_dir));
    ink_assert(doc->magic == DOC_MAGIC);
    goto Ldone;
  }
  DDebug("cache_evac", "evacuateDocReadDone %X offset %d", (int)doc->key.slice32(0),
         (int)dir_offset(&doc_evacuator->overwrite_dir));

  b = evacuate[dir_evac_bucket(&doc_evacuator->overwrite_dir)].head;
  while (b) {
    if (dir_offset(&b->dir) == dir_offset(&doc_evacuator->overwrite_dir)) {
      break;
    }
    b = b->link.next;
  }
  if (!b) {
    goto Ldone;
  }
  if ((b->f.pinned && !b->readers) && doc->pinned < static_cast<uint32_t>(Thread::get_hrtime() / HRTIME_SECOND)) {
    goto Ldone;
  }

  if (dir_head(&b->dir) && b->f.evacuate_head) {
    ink_assert(!b->evac_frags.key.fold());
    // if it's a head (vector), evacuation is real simple...we just
    // need to write this vector down and overwrite the directory entry.
    if (dir_compare_tag(&b->dir, &doc->first_key)) {
      doc_evacuator->key = doc->first_key;
      b->evac_frags.key  = doc->first_key;
      DDebug("cache_evac", "evacuating vector %X offset %d", (int)doc->first_key.slice32(0),
             (int)dir_offset(&doc_evacuator->overwrite_dir));
      b->f.unused = 57;
    } else {
      // if it's an earliest fragment (alternate) evacuation, things get
      // a little tricky. We have to propagate the earliest key to the next
      // fragments for this alternate. The last fragment to be evacuated
      // fixes up the lookaside buffer.
      doc_evacuator->key          = doc->key;
      doc_evacuator->earliest_key = doc->key;
      b->evac_frags.key           = doc->key;
      b->evac_frags.earliest_key  = doc->key;
      b->earliest_evacuator       = doc_evacuator;
      DDebug("cache_evac", "evacuating earliest %X %X evac: %p offset: %d", (int)b->evac_frags.key.slice32(0),
             (int)doc->key.slice32(0), doc_evacuator, (int)dir_offset(&doc_evacuator->overwrite_dir));
      b->f.unused = 67;
    }
  } else {
    // find which key matches the document
    EvacuationKey *ek = &b->evac_frags;
    for (; ek && !(ek->key == doc->key); ek = ek->link.next) {
      ;
    }
    if (!ek) {
      b->f.unused = 77;
      goto Ldone;
    }
    doc_evacuator->key          = ek->key;
    doc_evacuator->earliest_key = ek->earliest_key;
    DDebug("cache_evac", "evacuateDocReadDone key: %X earliest: %X", (int)ek->key.slice32(0), (int)ek->earliest_key.slice32(0));
    b->f.unused = 87;
  }
  // if the tag in the b->dir does not match the first_key in the
  // document, then it has to be the earliest fragment. We guarantee that
  // the first_key and the earliest_key will never collide (see
  // Cache::open_write).
  if (!dir_head(&b->dir) || !dir_compare_tag(&b->dir, &doc->first_key)) {
    next_CacheKey(&next_key, &doc->key);
    evacuate_fragments(&next_key, &doc_evacuator->earliest_key, !b->readers, this);
  }
  return evacuateWrite(doc_evacuator, event, e);
Ldone:
  free_CacheVC(doc_evacuator);
  doc_evacuator = nullptr;
  return aggWrite(event, e);
}

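// Added descriptive comment: scan the evacuation buckets covering
// [low, high) and issue an AIO read for the lowest-offset block in the
// given phase that has not been handled yet. Returns -1 when a read has
// been started (completion arrives in evacuateDocReadDone) and 0 when
// there is nothing left to evacuate in the range; aggWrite() treats a
// negative return as "wait for the evacuation read".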
int
Vol::evac_range(off_t low, off_t high, int evac_phase)
{
  off_t s = this->offset_to_vol_offset(low);
  off_t e = this->offset_to_vol_offset(high);
  int si  = dir_offset_evac_bucket(s);
  int ei  = dir_offset_evac_bucket(e);

  for (int i = si; i <= ei; i++) {
    EvacuationBlock *b     = evacuate[i].head;
    EvacuationBlock *first = nullptr;
    int64_t first_offset   = INT64_MAX;
    for (; b; b = b->link.next) {
      int64_t offset = dir_offset(&b->dir);
      int phase      = dir_phase(&b->dir);
      if (offset >= s && offset < e && !b->f.done && phase == evac_phase) {
        if (offset < first_offset) {
          first        = b;
          first_offset = offset;
        }
      }
    }
    if (first) {
      first->f.done       = 1;
      io.aiocb.aio_fildes = fd;
      io.aiocb.aio_nbytes = dir_approx_size(&first->dir);
      io.aiocb.aio_offset = this->vol_offset(&first->dir);
      if (static_cast<off_t>(io.aiocb.aio_offset + io.aiocb.aio_nbytes) > static_cast<off_t>(skip + len)) {
        io.aiocb.aio_nbytes = skip + len - io.aiocb.aio_offset;
      }
      doc_evacuator                = new_DocEvacuator(io.aiocb.aio_nbytes, this);
      doc_evacuator->overwrite_dir = first->dir;

      io.aiocb.aio_buf = doc_evacuator->buf->data();
      io.action        = this;
      io.thread        = AIO_CALLBACK_THREAD_ANY;
      DDebug("cache_evac", "evac_range evacuating %X %d", (int)dir_tag(&first->dir), (int)dir_offset(&first->dir));
      SET_HANDLER(&Vol::evacuateDocReadDone);
      ink_assert(ink_aio_read(&io) >= 0);
      return -1;
    }
  }
  return 0;
}

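// Added descriptive comment: copy one queued CacheVC's pending write into
// the aggregation buffer at p, filling in the Doc header and this writer's
// in-memory directory entry. Returns the number of bytes consumed in the
// buffer: vc->agg_len for ordinary writes, or the rounded document length
// for evacuated documents.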
static int
agg_copy(char *p, CacheVC *vc)
{
  Vol *vol = vc->vol;
  off_t o  = vol->header->write_pos + vol->agg_buf_pos;

  if (!vc->f.evacuator) {
    Doc *doc                   = reinterpret_cast<Doc *>(p);
    IOBufferBlock *res_alt_blk = nullptr;

    uint32_t len = vc->write_len + vc->header_len + vc->frag_len + sizeof(Doc);
    ink_assert(vc->frag_type != CACHE_FRAG_TYPE_HTTP || len != sizeof(Doc));
    ink_assert(vol->round_to_approx_size(len) == vc->agg_len);
    // update copy of directory entry for this document
    dir_set_approx_size(&vc->dir, vc->agg_len);
    dir_set_offset(&vc->dir, vol->offset_to_vol_offset(o));
    ink_assert(vol->vol_offset(&vc->dir) < (vol->skip + vol->len));
    dir_set_phase(&vc->dir, vol->header->phase);

    // fill in document header
    doc->magic       = DOC_MAGIC;
    doc->len         = len;
    doc->hlen        = vc->header_len;
    doc->doc_type    = vc->frag_type;
    doc->v_major     = CACHE_DB_MAJOR_VERSION;
    doc->v_minor     = CACHE_DB_MINOR_VERSION;
    doc->unused      = 0; // force this for forward compatibility.
    doc->total_len   = vc->total_len;
    doc->first_key   = vc->first_key;
    doc->sync_serial = vol->header->sync_serial;
    vc->write_serial = doc->write_serial = vol->header->write_serial;
    doc->checksum                        = DOC_NO_CHECKSUM;
    if (vc->pin_in_cache) {
      dir_set_pinned(&vc->dir, 1);
      doc->pinned = static_cast<uint32_t>(Thread::get_hrtime() / HRTIME_SECOND) + vc->pin_in_cache;
    } else {
      dir_set_pinned(&vc->dir, 0);
      doc->pinned = 0;
    }

    if (vc->f.use_first_key) {
      if (doc->data_len() || vc->f.allow_empty_doc) {
        doc->key = vc->earliest_key;
      } else { // the vector is being written by itself
        if (vc->earliest_key == zero_key) {
          do {
            rand_CacheKey(&doc->key, vc->vol->mutex);
          } while (DIR_MASK_TAG(doc->key.slice32(2)) == DIR_MASK_TAG(vc->first_key.slice32(2)));
        } else {
          prev_CacheKey(&doc->key, &vc->earliest_key);
        }
      }
      dir_set_head(&vc->dir, true);
    } else {
      doc->key = vc->key;
      dir_set_head(&vc->dir, !vc->fragment);
    }

    if (vc->f.rewrite_resident_alt) {
      ink_assert(vc->f.use_first_key);
      Doc *res_doc   = reinterpret_cast<Doc *>(vc->first_buf->data());
      res_alt_blk    = new_IOBufferBlock(vc->first_buf, res_doc->data_len(), sizeof(Doc) + res_doc->hlen);
      doc->key       = res_doc->key;
      doc->total_len = res_doc->data_len();
    }
    // update the new_info object_key, total_len and dirinfo
    if (vc->header_len) {
      ink_assert(vc->f.use_first_key);
      if (vc->frag_type == CACHE_FRAG_TYPE_HTTP) {
        ink_assert(vc->write_vector->count() > 0);
        if (!vc->f.update && !vc->f.evac_vector) {
          ink_assert(!(vc->first_key == zero_key));
          CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index);
          http_info->object_size_set(vc->total_len);
        }
        // update + data_written => Update case (b)
        // need to change the old alternate's object length
        if (vc->f.update && vc->total_len) {
          CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index);
          http_info->object_size_set(vc->total_len);
        }
        ink_assert(!(((uintptr_t)&doc->hdr()[0]) & HDR_PTR_ALIGNMENT_MASK));
        ink_assert(vc->header_len == vc->write_vector->marshal(doc->hdr(), vc->header_len));
      } else {
        memcpy(doc->hdr(), vc->header_to_write, vc->header_len);
      }
      // the single fragment flag is not used in the write call;
      // putting it in for completeness.
      vc->f.single_fragment = doc->single_fragment();
    }
    // move data
    if (vc->write_len) {
      {
        ProxyMutex *mutex ATS_UNUSED = vc->vol->mutex.get();
        ink_assert(mutex->thread_holding == this_ethread());
        CACHE_DEBUG_SUM_DYN_STAT(cache_write_bytes_stat, vc->write_len);
      }
      if (vc->f.rewrite_resident_alt) {
        iobufferblock_memcpy(doc->data(), vc->write_len, res_alt_blk, 0);
      } else {
        iobufferblock_memcpy(doc->data(), vc->write_len, vc->blocks.get(), vc->offset);
      }
#ifdef VERIFY_JTEST_DATA
      if (f.use_first_key && header_len) {
        int ib = 0, xd = 0;
        char xx[500];
        new_info.request_get().url_get().print(xx, 500, &ib, &xd);
        char *x = xx;
        for (int q = 0; q < 3; q++)
          x = strchr(x + 1, '/');
        ink_assert(!memcmp(doc->hdr(), x, ib - (x - xx)));
      }
#endif
    }
    if (cache_config_enable_checksum) {
      doc->checksum = 0;
      for (char *b = doc->hdr(); b < reinterpret_cast<char *>(doc) + doc->len; b++) {
        doc->checksum += *b;
      }
    }
    if (vc->frag_type == CACHE_FRAG_TYPE_HTTP && vc->f.single_fragment) {
      ink_assert(doc->hlen);
    }

    if (res_alt_blk) {
      res_alt_blk->free();
    }

    return vc->agg_len;
  } else {
    // for evacuated documents, copy the data, and update directory
    Doc *doc = reinterpret_cast<Doc *>(vc->buf->data());
    int l    = vc->vol->round_to_approx_size(doc->len);
    {
      ProxyMutex *mutex ATS_UNUSED = vc->vol->mutex.get();
      ink_assert(mutex->thread_holding == this_ethread());
      CACHE_DEBUG_INCREMENT_DYN_STAT(cache_gc_frags_evacuated_stat);
      CACHE_DEBUG_SUM_DYN_STAT(cache_gc_bytes_evacuated_stat, l);
    }

    doc->sync_serial  = vc->vol->header->sync_serial;
    doc->write_serial = vc->vol->header->write_serial;

    memcpy(p, doc, doc->len);

    vc->dir = vc->overwrite_dir;
    dir_set_offset(&vc->dir, vc->vol->offset_to_vol_offset(o));
    dir_set_phase(&vc->dir, vc->vol->header->phase);
    return l;
  }
}

inline void
Vol::evacuate_cleanup_blocks(int i)
{
  EvacuationBlock *b = evacuate[i].head;
  while (b) {
    if (b->f.done && ((header->phase != dir_phase(&b->dir) && header->write_pos > this->vol_offset(&b->dir)) ||
                      (header->phase == dir_phase(&b->dir) && header->write_pos <= this->vol_offset(&b->dir)))) {
      EvacuationBlock *x = b;
      DDebug("cache_evac", "evacuate cleanup free %X offset %d", (int)b->evac_frags.key.slice32(0), (int)dir_offset(&b->dir));
      b = b->link.next;
      evacuate[i].remove(x);
      free_EvacuationBlock(x, mutex->thread_holding);
      continue;
    }
    b = b->link.next;
  }
}

void
Vol::evacuate_cleanup()
{
  int64_t eo = ((header->write_pos - start) / CACHE_BLOCK_SIZE) + 1;
  int64_t e  = dir_offset_evac_bucket(eo);
  int64_t sx = e - (evacuate_size / PIN_SCAN_EVERY) - 1;
  int64_t s  = sx;
  int i;

  if (e > evacuate_size) {
    e = evacuate_size;
  }
  if (sx < 0) {
    s = 0;
  }
  for (i = s; i < e; i++) {
    evacuate_cleanup_blocks(i);
  }

  // if we have wrapped, handle the end bit
  if (sx <= 0) {
    s = evacuate_size + sx - 2;
    if (s < 0) {
      s = 0;
    }
    for (i = s; i < evacuate_size; i++) {
      evacuate_cleanup_blocks(i);
    }
  }
}

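// Added note: periodic_scan() is driven from aggWriteDone() and agg_wrap(),
// which call it whenever the write position has advanced to within
// EVACUATION_SIZE of scan_pos. Each call frees finished evacuation blocks
// and re-scans for pinned documents, then advances scan_pos by
// len / PIN_SCAN_EVERY, so the pin scan sweeps the whole volume over
// PIN_SCAN_EVERY calls.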
void
Vol::periodic_scan()
{
  evacuate_cleanup();
  scan_for_pinned_documents();
  if (header->write_pos == start) {
    scan_pos = start;
  }
  scan_pos += len / PIN_SCAN_EVERY;
}

void
Vol::agg_wrap()
{
  header->write_pos = start;
  header->phase     = !header->phase;

  header->cycle++;
  header->agg_pos = header->write_pos;
  dir_lookaside_cleanup(this);
  dir_clean_vol(this);
  {
    Vol *vol = this;
    CACHE_INCREMENT_DYN_STAT(cache_directory_wrap_stat);
    Note("Cache volume %d on disk '%s' wraps around", vol->cache_vol->vol_number, vol->hash_text.get());
  }
  periodic_scan();
}

/* NOTE: This state can be called by an AIO thread, so DON'T DON'T
   DON'T schedule any events on this thread using VC_SCHED_XXX or
   mutex->thread_holding->schedule_xxx_local(). ALWAYS use
   eventProcessor.schedule_xxx().
   Also, make sure that any functions called by this also use
   the eventProcessor to schedule events
*/
int
Vol::aggWrite(int event, void * /* e ATS_UNUSED */)
{
  ink_assert(!is_io_in_progress());

  Que(CacheVC, link) tocall;
  CacheVC *c;

  cancel_trigger();

Lagain:
  // calculate length of aggregated write
  for (c = static_cast<CacheVC *>(agg.head); c;) {
    int writelen = c->agg_len;
    // [amc] this is checked in multiple places; only here was it strictly less.
    ink_assert(writelen <= AGG_SIZE);
    if (agg_buf_pos + writelen > AGG_SIZE || header->write_pos + agg_buf_pos + writelen > (skip + len)) {
      break;
    }
    DDebug("agg_read", "copying: %d, %" PRIu64 ", key: %d", agg_buf_pos, header->write_pos + agg_buf_pos, c->first_key.slice32(0));
    int wrotelen = agg_copy(agg_buffer + agg_buf_pos, c);
    ink_assert(writelen == wrotelen);
    agg_todo_size -= writelen;
    agg_buf_pos += writelen;
    CacheVC *n = (CacheVC *)c->link.next;
    agg.dequeue();
    if (c->f.sync && c->f.use_first_key) {
      CacheVC *last = sync.tail;
      while (last && UINT_WRAP_LT(c->write_serial, last->write_serial)) {
        last = (CacheVC *)last->link.prev;
      }
      sync.insert(c, last);
    } else if (c->f.evacuator) {
      c->handleEvent(AIO_EVENT_DONE, nullptr);
    } else {
      tocall.enqueue(c);
    }
    c = n;
  }

  // if we got nothing...
  if (!agg_buf_pos) {
    if (!agg.head && !sync.head) { // nothing to get
      return EVENT_CONT;
    }
    if (header->write_pos == start) {
      // write aggregation too long, bad bad, punt on everything.
      Note("write aggregation exceeds vol size");
      ink_assert(!tocall.head);
      ink_assert(false);
      while ((c = agg.dequeue())) {
        agg_todo_size -= c->agg_len;
        eventProcessor.schedule_imm(c, ET_CALL, AIO_EVENT_DONE);
      }
      return EVENT_CONT;
    }
    // start back
    if (agg.head) {
      agg_wrap();
      goto Lagain;
    }
  }

  // evacuate space
  off_t end = header->write_pos + agg_buf_pos + EVACUATION_SIZE;
  if (evac_range(header->write_pos, end, !header->phase) < 0) {
    goto Lwait;
  }
  if (end > skip + len) {
    if (evac_range(start, start + (end - (skip + len)), header->phase) < 0) {
      goto Lwait;
    }
  }

  // if agg.head, then we are near the end of the disk, so
  // write down the aggregation in whatever size it is.
  if (agg_buf_pos < AGG_HIGH_WATER && !agg.head && !sync.head && !dir_sync_waiting) {
    goto Lwait;
  }

  // write sync marker
  if (!agg_buf_pos) {
    ink_assert(sync.head);
    int l       = round_to_approx_size(sizeof(Doc));
    agg_buf_pos = l;
    Doc *d      = reinterpret_cast<Doc *>(agg_buffer);
    memset(static_cast<void *>(d), 0, sizeof(Doc));
    d->magic        = DOC_MAGIC;
    d->len          = l;
    d->sync_serial  = header->sync_serial;
    d->write_serial = header->write_serial;
  }

  // set write limit
  header->agg_pos = header->write_pos + agg_buf_pos;

  io.aiocb.aio_fildes = fd;
  io.aiocb.aio_offset = header->write_pos;
  io.aiocb.aio_buf    = agg_buffer;
  io.aiocb.aio_nbytes = agg_buf_pos;
  io.action           = this;
  /*
    Callback on the AIO thread so that we can issue a new write ASAP,
    as all writes are serialized in the volume. This is not necessary
    for reads, which proceed independently.
   */
  io.thread = AIO_CALLBACK_THREAD_AIO;
  SET_HANDLER(&Vol::aggWriteDone);
  ink_aio_write(&io);

Lwait:
  int ret = EVENT_CONT;
  while ((c = tocall.dequeue())) {
    if (event == EVENT_CALL && c->mutex->thread_holding == mutex->thread_holding) {
      ret = EVENT_RETURN;
    } else {
      eventProcessor.schedule_imm(c, ET_CALL, AIO_EVENT_DONE);
    }
  }
  return ret;
}

int
CacheVC::openWriteCloseDir(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
  cancel_trigger();
  {
    CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
    if (!lock.is_locked()) {
      SET_HANDLER(&CacheVC::openWriteCloseDir);
      ink_assert(!is_io_in_progress());
      VC_SCHED_LOCK_RETRY();
    }
    vol->close_write(this);
    if (closed < 0 && fragment) {
      dir_delete(&earliest_key, vol, &earliest_dir);
    }
  }
  if (is_debug_tag_set("cache_update")) {
    if (f.update && closed > 0) {
      if (!total_len && !f.allow_empty_doc && alternate_index != CACHE_ALT_REMOVED) {
        Debug("cache_update", "header only %d (%" PRIu64 ", %" PRIu64 ")", DIR_MASK_TAG(first_key.slice32(2)), update_key.b[0],
              update_key.b[1]);

      } else if ((total_len || f.allow_empty_doc) && alternate_index != CACHE_ALT_REMOVED) {
        Debug("cache_update", "header body, %d, (%" PRIu64 ", %" PRIu64 "), (%" PRIu64 ", %" PRIu64 ")",
              DIR_MASK_TAG(first_key.slice32(2)), update_key.b[0], update_key.b[1], earliest_key.b[0], earliest_key.b[1]);
      } else if (!total_len && alternate_index == CACHE_ALT_REMOVED) {
        Debug("cache_update", "alt delete, %d, (%" PRIu64 ", %" PRIu64 ")", DIR_MASK_TAG(first_key.slice32(2)), update_key.b[0],
              update_key.b[1]);
      }
    }
  }
  // update the appropriate stat variable
  // These variables may not give the current number of documents with
  // one, two and three or more fragments. This is because for
  // updates we don't decrement the variable corresponding to the old
  // size of the document.
  if ((closed == 1) && (total_len > 0 || f.allow_empty_doc)) {
    DDebug("cache_stats", "Fragment = %d", fragment);
    switch (fragment) {
    case 0:
      CACHE_INCREMENT_DYN_STAT(cache_single_fragment_document_count_stat);
      break;
    case 1:
      CACHE_INCREMENT_DYN_STAT(cache_two_fragment_document_count_stat);
      break;
    default:
      CACHE_INCREMENT_DYN_STAT(cache_three_plus_plus_fragment_document_count_stat);
      break;
    }
  }
  if (f.close_complete) {
    recursive++;
    ink_assert(!vol || this_ethread() != vol->mutex->thread_holding);
    vio.cont->handleEvent(VC_EVENT_WRITE_COMPLETE, (void *)&vio);
    recursive--;
  }
  return free_CacheVC(this);
}

int
CacheVC::openWriteCloseHeadDone(int event, Event *e)
{
  if (event == AIO_EVENT_DONE) {
    set_io_not_in_progress();
  } else if (is_io_in_progress()) {
    return EVENT_CONT;
  }
  {
    CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
    if (!lock.is_locked()) {
      VC_LOCK_RETRY_EVENT();
    }
    od->writing_vec = false;
    if (!io.ok()) {
      goto Lclose;
    }
    ink_assert(f.use_first_key);
    if (!od->dont_update_directory) {
      if (dir_is_empty(&od->first_dir)) {
        dir_insert(&first_key, vol, &dir);
      } else {
        // multiple fragment vector write
        dir_overwrite(&first_key, vol, &dir, &od->first_dir, false);
        // insert moved resident alternate
        if (od->move_resident_alt) {
          if (dir_valid(vol, &od->single_doc_dir)) {
            dir_insert(&od->single_doc_key, vol, &od->single_doc_dir);
          }
          od->move_resident_alt = false;
        }
      }
      od->first_dir = dir;
      if (frag_type == CACHE_FRAG_TYPE_HTTP && f.single_fragment) {
        // fragment is tied to the vector
        od->move_resident_alt = true;
        if (!f.rewrite_resident_alt) {
          od->single_doc_key = earliest_key;
        }
        dir_assign(&od->single_doc_dir, &dir);
        dir_set_tag(&od->single_doc_dir, od->single_doc_key.slice32(2));
      }
    }
  }
Lclose:
  return openWriteCloseDir(event, e);
}

int
CacheVC::openWriteCloseHead(int event, Event *e)
{
  cancel_trigger();
  f.use_first_key = 1;
  if (io.ok()) {
    ink_assert(fragment || (length == (int64_t)total_len));
  } else {
    return openWriteCloseDir(event, e);
  }
  if (f.data_done) {
    write_len = 0;
  } else {
    write_len = length;
  }
  if (frag_type == CACHE_FRAG_TYPE_HTTP) {
    SET_HANDLER(&CacheVC::updateVector);
    return updateVector(EVENT_IMMEDIATE, nullptr);
  } else {
    header_len = header_to_write_len;
    SET_HANDLER(&CacheVC::openWriteCloseHeadDone);
    return do_write_lock();
  }
}

int
CacheVC::openWriteCloseDataDone(int event, Event *e)
{
  int ret = 0;
  cancel_trigger();

  if (event == AIO_EVENT_DONE) {
    set_io_not_in_progress();
  } else if (is_io_in_progress()) {
    return EVENT_CONT;
  }
  if (!io.ok()) {
    return openWriteCloseDir(event, e);
  }
  {
    CACHE_TRY_LOCK(lock, vol->mutex, this_ethread());
    if (!lock.is_locked()) {
      VC_LOCK_RETRY_EVENT();
    }
    if (!fragment) {
      ink_assert(key == earliest_key);
      earliest_dir = dir;
    } else {
      // Store the offset only if there is a table.
      // Currently there is no alt (and thence no table) for non-HTTP.
      if (alternate.valid()) {
        alternate.push_frag_offset(write_pos);
      }
    }
    fragment++;
    write_pos += write_len;
    dir_insert(&key, vol, &dir);
    blocks = iobufferblock_skip(blocks.get(), &offset, &length, write_len);
    next_CacheKey(&key, &key);
    if (length) {
      write_len = length;
      if (write_len > MAX_FRAG_SIZE) {
        write_len = MAX_FRAG_SIZE;
      }
      if ((ret = do_write_call()) == EVENT_RETURN) {
        goto Lcallreturn;
      }
      return ret;
    }
    f.data_done = 1;
    return openWriteCloseHead(event, e); // must be called under vol lock from here
  }
Lcallreturn:
  return handleEvent(AIO_EVENT_DONE, nullptr);
}

int
CacheVC::openWriteClose(int event, Event *e)
{
  cancel_trigger();
  if (is_io_in_progress()) {
    if (event != AIO_EVENT_DONE) {
      return EVENT_CONT;
    }
    set_io_not_in_progress();
    if (!io.ok()) {
      return openWriteCloseDir(event, e);
    }
  }
  if (closed > 0 || f.allow_empty_doc) {
    if (total_len == 0) {
      if (f.update || f.allow_empty_doc) {
        return updateVector(event, e);
      } else {
        // If we've been CLOSE'd but nothing has been written then
        // this close is transformed into an abort.
        closed = -1;
        return openWriteCloseDir(event, e);
      }
    }
    if (length && (fragment || length > static_cast<int>(MAX_FRAG_SIZE))) {
      SET_HANDLER(&CacheVC::openWriteCloseDataDone);
      write_len = length;
      if (write_len > MAX_FRAG_SIZE) {
        write_len = MAX_FRAG_SIZE;
      }
      return do_write_lock_call();
    } else {
      return openWriteCloseHead(event, e);
    }
  } else {
    return openWriteCloseDir(event, e);
  }
}

int
CacheVC::openWriteWriteDone(int event, Event *e)
{
  cancel_trigger();
  if (event == AIO_EVENT_DONE) {
    set_io_not_in_progress();
  } else if (is_io_in_progress()) {
    return EVENT_CONT;
  }
  // In the event of VC_EVENT_ERROR, the cont must do an io_close
  if (!io.ok()) {
    if (closed) {
      closed = -1;
      return die();
    }
    SET_HANDLER(&CacheVC::openWriteMain);
    return calluser(VC_EVENT_ERROR);
  }
  {
    CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
    if (!lock.is_locked()) {
      VC_LOCK_RETRY_EVENT();
    }
    // store the earliest directory. Need to remove the earliest dir
    // in case the writer aborts.
    if (!fragment) {
      ink_assert(key == earliest_key);
      earliest_dir = dir;
    } else {
      // Store the offset only if there is a table.
      // Currently there is no alt (and thence no table) for non-HTTP.
      if (alternate.valid()) {
        alternate.push_frag_offset(write_pos);
      }
    }
    ++fragment;
    write_pos += write_len;
    dir_insert(&key, vol, &dir);
    DDebug("cache_insert", "WriteDone: %X, %X, %d", key.slice32(0), first_key.slice32(0), write_len);
    blocks = iobufferblock_skip(blocks.get(), &offset, &length, write_len);
    next_CacheKey(&key, &key);
  }
  if (closed) {
    return die();
  }
  SET_HANDLER(&CacheVC::openWriteMain);
  return openWriteMain(event, e);
}

static inline int
target_fragment_size()
{
  uint64_t value = cache_config_target_fragment_size - sizeof(Doc);
  ink_release_assert(value <= MAX_FRAG_SIZE);
  return value;
}
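
// Illustrative note (added; assumes the stock 1 MiB setting of
// proxy.config.cache.target_fragment_size): the configured target counts
// the on-disk Doc header, so the usable payload per fragment is
// 1048576 - sizeof(Doc) bytes. The release assert above rejects any
// configured target whose payload would exceed MAX_FRAG_SIZE.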
1379 
1380 int
openWriteMain(int,Event *)1381 CacheVC::openWriteMain(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
1382 {
1383   cancel_trigger();
1384   int called_user = 0;
1385   ink_assert(!is_io_in_progress());
1386 Lagain:
1387   if (!vio.buffer.writer()) {
1388     if (calluser(VC_EVENT_WRITE_READY) == EVENT_DONE) {
1389       return EVENT_DONE;
1390     }
1391     if (!vio.buffer.writer()) {
1392       return EVENT_CONT;
1393     }
1394   }
1395   if (vio.ntodo() <= 0) {
1396     called_user = 1;
1397     if (calluser(VC_EVENT_WRITE_COMPLETE) == EVENT_DONE) {
1398       return EVENT_DONE;
1399     }
1400     ink_assert(!f.close_complete || !"close expected after write COMPLETE");
1401     if (vio.ntodo() <= 0) {
1402       return EVENT_CONT;
1403     }
1404   }
  int64_t ntodo       = static_cast<int64_t>(vio.ntodo() + length);
  int64_t total_avail = vio.buffer.reader()->read_avail();
  int64_t avail       = total_avail;
  int64_t towrite     = avail + length;
  if (towrite > ntodo) {
    avail -= (towrite - ntodo);
    towrite = ntodo;
  }
  if (towrite > static_cast<int>(MAX_FRAG_SIZE)) {
    avail -= (towrite - MAX_FRAG_SIZE);
    towrite = MAX_FRAG_SIZE;
  }
  if (!blocks && towrite) {
    blocks = vio.buffer.reader()->block;
    offset = vio.buffer.reader()->start_offset;
  }
  if (avail > 0) {
    vio.buffer.reader()->consume(avail);
    vio.ndone += avail;
    total_len += avail;
  }
  length = static_cast<uint64_t>(towrite);
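  // Fragment-cut heuristic: when the staged data only slightly overshoots the
  // target (by less than 25%), cut the write at exactly the target size and
  // keep the small remainder buffered for the next fragment; otherwise write
  // everything staged (already capped at MAX_FRAG_SIZE above). Illustrative:
  // with a 1 MB target, 1.1 MB staged is cut at 1 MB, while 1.5 MB is written
  // whole.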
  if (length > target_fragment_size() && (length < target_fragment_size() + target_fragment_size() / 4)) {
    write_len = target_fragment_size();
  } else {
    write_len = length;
  }
  bool not_writing = towrite != ntodo && towrite < target_fragment_size();
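  // Hold off on disk I/O while this is neither the final piece of the VIO
  // (towrite != ntodo) nor yet a full target fragment; waiting lets small
  // writes coalesce into fuller fragments.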
  if (!called_user) {
    if (not_writing) {
      called_user = 1;
      if (calluser(VC_EVENT_WRITE_READY) == EVENT_DONE) {
        return EVENT_DONE;
      }
      goto Lagain;
    } else if (vio.ntodo() <= 0) {
      goto Lagain;
    }
  }
  if (not_writing) {
    return EVENT_CONT;
  }
  if (towrite == ntodo && f.close_complete) {
    closed = 1;
    SET_HANDLER(&CacheVC::openWriteClose);
    return openWriteClose(EVENT_NONE, nullptr);
  }
  SET_HANDLER(&CacheVC::openWriteWriteDone);
  return do_write_lock_call();
}

// begin overwrite
int
CacheVC::openWriteOverwrite(int event, Event *e)
{
  cancel_trigger();
  if (event != AIO_EVENT_DONE) {
    if (event == EVENT_IMMEDIATE) {
      last_collision = nullptr;
    }
  } else {
    Doc *doc = nullptr;
    set_io_not_in_progress();
    if (_action.cancelled) {
      return openWriteCloseDir(event, e);
    }
    if (!io.ok()) {
      goto Ldone;
    }
    doc = reinterpret_cast<Doc *>(buf->data());
    if (!(doc->first_key == first_key)) {
      goto Lcollision;
    }
    od->first_dir = dir;
    first_buf     = buf;
    goto Ldone;
  }
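// A probe hit whose Doc header carries a different first_key is a hash
// collision: keep walking the directory collision chain (last_collision
// tracks the position) and re-reading until the matching document is found
// or the chain is exhausted.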
Lcollision : {
  CACHE_TRY_LOCK(lock, vol->mutex, this_ethread());
  if (!lock.is_locked()) {
    VC_LOCK_RETRY_EVENT();
  }
  int res = dir_probe(&first_key, vol, &dir, &last_collision);
  if (res > 0) {
    if ((res = do_read_call(&first_key)) == EVENT_RETURN) {
      goto Lcallreturn;
    }
    return res;
  }
}
Ldone:
  SET_HANDLER(&CacheVC::openWriteMain);
  return callcont(CACHE_EVENT_OPEN_WRITE);
Lcallreturn:
  return handleEvent(AIO_EVENT_DONE, nullptr); // hopefully a tail call
}

// openWriteStartDone handles vector read (addition of alternates)
// and lock misses
int
CacheVC::openWriteStartDone(int event, Event *e)
{
  intptr_t err = ECACHE_NO_DOC;
  cancel_trigger();
  if (is_io_in_progress()) {
    if (event != AIO_EVENT_DONE) {
      return EVENT_CONT;
    }
    set_io_not_in_progress();
  }
  {
    CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
    if (!lock.is_locked()) {
      VC_LOCK_RETRY_EVENT();
    }

    if (_action.cancelled && (!od || !od->has_multiple_writers())) {
      goto Lcancel;
    }

    if (event == AIO_EVENT_DONE) { // vector read done
      Doc *doc = reinterpret_cast<Doc *>(buf->data());
      if (!io.ok()) {
        err = ECACHE_READ_FAIL;
        goto Lfailure;
      }

      /* INKqa07123.
         A directory entry which is no longer valid may have been overwritten.
         We need to start afresh from the beginning by setting last_collision
         to nullptr.
       */
      if (!dir_valid(vol, &dir)) {
        DDebug("cache_write", "openWriteStartDone: Dir not valid: Write Head: %" PRId64 ", Dir: %" PRId64,
               (int64_t)vol->offset_to_vol_offset(vol->header->write_pos), dir_offset(&dir));
        last_collision = nullptr;
        goto Lcollision;
      }
      if (!(doc->first_key == first_key)) {
        goto Lcollision;
      }

      if (doc->magic != DOC_MAGIC || !doc->hlen || this->load_http_info(write_vector, doc, buf.object()) != doc->hlen) {
        err = ECACHE_BAD_META_DATA;
        goto Lfailure;
      }
      ink_assert(write_vector->count() > 0);
      od->first_dir = dir;
      first_dir     = dir;
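      // For a single-fragment document the body is stored inline with the
      // vector, so remember its key and directory entry: when the vector is
      // rewritten, the resident alternate has to be moved along with it.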
      if (doc->single_fragment()) {
        // fragment is tied to the vector
        od->move_resident_alt = true;
        od->single_doc_key    = doc->key;
        dir_assign(&od->single_doc_dir, &dir);
        dir_set_tag(&od->single_doc_dir, od->single_doc_key.slice32(2));
      }
      first_buf = buf;
      goto Lsuccess;
    }

  Lcollision:
    int if_writers = ((uintptr_t)info == CACHE_ALLOW_MULTIPLE_WRITES);
    if (!od) {
      if ((err = vol->open_write(this, if_writers, cache_config_http_max_alts > 1 ? cache_config_http_max_alts : 0)) > 0) {
        goto Lfailure;
      }
      if (od->has_multiple_writers()) {
        MUTEX_RELEASE(lock);
        SET_HANDLER(&CacheVC::openWriteMain);
        return callcont(CACHE_EVENT_OPEN_WRITE);
      }
    }
    // check for collision
    if (dir_probe(&first_key, vol, &dir, &last_collision)) {
      od->reading_vec = true;
      int ret         = do_read_call(&first_key);
      if (ret == EVENT_RETURN) {
        goto Lcallreturn;
      }
      return ret;
    }
    if (f.update) {
      // fail update because vector has been GC'd
      goto Lfailure;
    }
  }
Lsuccess:
  od->reading_vec = false;
  if (_action.cancelled) {
    goto Lcancel;
  }
  SET_HANDLER(&CacheVC::openWriteMain);
  return callcont(CACHE_EVENT_OPEN_WRITE);

Lfailure:
  CACHE_INCREMENT_DYN_STAT(base_stat + CACHE_STAT_FAILURE);
  _action.continuation->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *)-err);
Lcancel:
  if (od) {
    od->reading_vec = false;
    return openWriteCloseDir(event, e);
  } else {
    return free_CacheVC(this);
  }
Lcallreturn:
  return handleEvent(AIO_EVENT_DONE, nullptr); // hopefully a tail call
}

// handle lock failures from main Cache::open_write entry points below
int
CacheVC::openWriteStartBegin(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
  intptr_t err;
  cancel_trigger();
  if (_action.cancelled) {
    return free_CacheVC(this);
  }
  if (((err = vol->open_write_lock(this, false, 1)) > 0)) {
    CACHE_INCREMENT_DYN_STAT(base_stat + CACHE_STAT_FAILURE);
    free_CacheVC(this);
    _action.continuation->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *)-err);
    return EVENT_DONE;
  }
  if (err < 0) {
    VC_SCHED_LOCK_RETRY();
  }
  if (f.overwrite) {
    SET_HANDLER(&CacheVC::openWriteOverwrite);
    return openWriteOverwrite(EVENT_IMMEDIATE, nullptr);
  } else {
    // write by key
    SET_HANDLER(&CacheVC::openWriteMain);
    return callcont(CACHE_EVENT_OPEN_WRITE);
  }
}

// main entry point for writing of non-HTTP documents
Action *
Cache::open_write(Continuation *cont, const CacheKey *key, CacheFragType frag_type, int options, time_t apin_in_cache,
                  const char *hostname, int host_len)
{
  if (!CacheProcessor::IsCacheReady(frag_type)) {
    cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *)-ECACHE_NOT_READY);
    return ACTION_RESULT_DONE;
  }

  ink_assert(caches[frag_type] == this);

  intptr_t res      = 0;
  CacheVC *c        = new_CacheVC(cont);
  ProxyMutex *mutex = cont->mutex.get();
  SCOPED_MUTEX_LOCK(lock, c->mutex, this_ethread());
  c->vio.op    = VIO::WRITE;
  c->base_stat = cache_write_active_stat;
  c->vol       = key_to_vol(key, hostname, host_len);
  Vol *vol     = c->vol;
  CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
  c->first_key = c->key = *key;
  c->frag_type          = frag_type;
  /*
     The transition from a single-fragment document to a multi-fragment
     document would cause a problem if the key and the first_key collide,
     because old vector data could be served to HTTP. Need to avoid that.
     Also, when evacuating a fragment, we have to decide whether it is the
     first_key or the earliest_key based on the dir_tag.
   */
  do {
    rand_CacheKey(&c->key, cont->mutex);
  } while (DIR_MASK_TAG(c->key.slice32(2)) == DIR_MASK_TAG(c->first_key.slice32(2)));
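  // Only the masked tag bits of slice32(2) are stored in the directory entry,
  // so regenerate the random key until its tag differs from first_key's tag;
  // that way a directory lookup can never confuse the two key chains.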
  c->earliest_key     = c->key;
  c->info             = nullptr;
  c->f.overwrite      = (options & CACHE_WRITE_OPT_OVERWRITE) != 0;
  c->f.close_complete = (options & CACHE_WRITE_OPT_CLOSE_COMPLETE) != 0;
  c->f.sync           = (options & CACHE_WRITE_OPT_SYNC) == CACHE_WRITE_OPT_SYNC;
  c->pin_in_cache     = static_cast<uint32_t>(apin_in_cache);

  if ((res = c->vol->open_write_lock(c, false, 1)) > 0) {
    // document currently being written, abort
    CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_FAILURE);
    cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *)-res);
    free_CacheVC(c);
    return ACTION_RESULT_DONE;
  }
  if (res < 0) {
    SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteStartBegin);
    c->trigger = CONT_SCHED_LOCK_RETRY(c);
    return &c->_action;
  }
  if (!c->f.overwrite) {
    SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteMain);
    c->callcont(CACHE_EVENT_OPEN_WRITE);
    return ACTION_RESULT_DONE;
  } else {
    SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteOverwrite);
    if (c->openWriteOverwrite(EVENT_IMMEDIATE, nullptr) == EVENT_DONE) {
      return ACTION_RESULT_DONE;
    } else {
      return &c->_action;
    }
  }
}

// main entry point for writing of HTTP documents
Action *
Cache::open_write(Continuation *cont, const CacheKey *key, CacheHTTPInfo *info, time_t apin_in_cache,
                  const CacheKey * /* key1 ATS_UNUSED */, CacheFragType type, const char *hostname, int host_len)
{
  if (!CacheProcessor::IsCacheReady(type)) {
    cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *)-ECACHE_NOT_READY);
    return ACTION_RESULT_DONE;
  }

  ink_assert(caches[type] == this);
  intptr_t err      = 0;
  int if_writers    = (uintptr_t)info == CACHE_ALLOW_MULTIPLE_WRITES;
  CacheVC *c        = new_CacheVC(cont);
  ProxyMutex *mutex = cont->mutex.get();
  c->vio.op         = VIO::WRITE;
  c->first_key      = *key;
  /*
     The transition from a single-fragment document to a multi-fragment
     document would cause a problem if the key and the first_key collide,
     because old vector data could be served to HTTP. Need to avoid that.
     Also, when evacuating a fragment, we have to decide whether it is the
     first_key or the earliest_key based on the dir_tag.
   */
  do {
    rand_CacheKey(&c->key, cont->mutex);
  } while (DIR_MASK_TAG(c->key.slice32(2)) == DIR_MASK_TAG(c->first_key.slice32(2)));
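  // (same directory-tag collision avoidance as the non-HTTP open_write above)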
  c->earliest_key = c->key;
  c->frag_type    = CACHE_FRAG_TYPE_HTTP;
  c->vol          = key_to_vol(key, hostname, host_len);
  Vol *vol        = c->vol;
  c->info         = info;
  if (c->info && (uintptr_t)info != CACHE_ALLOW_MULTIPLE_WRITES) {
    /*
       Update has the following code paths:
       a) Update alternate header only:
       In this case the vector has to be rewritten. The content
       length (update_len) and the key for the document are set in the
       new_info in the set_http_info call.
       HTTP OPERATIONS
       open_write with info set
       set_http_info new_info
       (total_len == 0)
       close
       b) Update alternate and data:
       In this case both the vector and the data need to be rewritten.
       This case is similar to the standard write of a document, except
       that the new_info is inserted into the vector at the alternate_index
       (overwriting the old alternate) rather than at the end of the vector.
       HTTP OPERATIONS
       open_write with info set
       set_http_info new_info
       do_io_write =>  (total_len > 0)
       close
       c) Delete an alternate:
       The vector may need to be deleted (if there was only one alternate) or
       rewritten (if there was more than one alternate). The deletion of the
       vector is done in openWriteRemoveVector.
       HTTP OPERATIONS
       open_write with info set
       close
     */
    c->f.update  = 1;
    c->base_stat = cache_update_active_stat;
    DDebug("cache_update", "Update called");
    info->object_key_get(&c->update_key);
    ink_assert(!(c->update_key == zero_key));
    c->update_len = info->object_size_get();
  } else {
    c->base_stat = cache_write_active_stat;
  }
  CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
  c->pin_in_cache = static_cast<uint32_t>(apin_in_cache);

  {
    CACHE_TRY_LOCK(lock, c->vol->mutex, cont->mutex->thread_holding);
    if (lock.is_locked()) {
      if ((err = c->vol->open_write(c, if_writers, cache_config_http_max_alts > 1 ? cache_config_http_max_alts : 0)) > 0) {
        goto Lfailure;
      }
      // If there are multiple writers, then this one cannot be an update.
      // Only the first writer can do an update. If that's the case, we can
      // return success to the state machine now.
      if (c->od->has_multiple_writers()) {
        goto Lmiss;
      }
      if (!dir_probe(key, c->vol, &c->dir, &c->last_collision)) {
        if (c->f.update) {
          // fail the update because the vector has been GC'd.
          // This situation can also arise in openWriteStartDone.
          err = ECACHE_NO_DOC;
          goto Lfailure;
        }
        // document doesn't exist, begin write
        goto Lmiss;
      } else {
        c->od->reading_vec = true;
        // document exists, read vector
        SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteStartDone);
        switch (c->do_read_call(&c->first_key)) {
        case EVENT_DONE:
          return ACTION_RESULT_DONE;
        case EVENT_RETURN:
          goto Lcallreturn;
        default:
          return &c->_action;
        }
      }
    }
    // missed lock
    SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteStartDone);
    CONT_SCHED_LOCK_RETRY(c);
    return &c->_action;
  }

Lmiss:
  SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteMain);
  c->callcont(CACHE_EVENT_OPEN_WRITE);
  return ACTION_RESULT_DONE;

Lfailure:
  CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_FAILURE);
  cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *)-err);
  if (c->od) {
    c->openWriteCloseDir(EVENT_IMMEDIATE, nullptr);
    return ACTION_RESULT_DONE;
  }
  free_CacheVC(c);
  return ACTION_RESULT_DONE;

Lcallreturn:
  if (c->handleEvent(AIO_EVENT_DONE, nullptr) == EVENT_DONE) {
    return ACTION_RESULT_DONE;
  }
  return &c->_action;
}