1 /** @file
2 
3   Cache processor startup, CacheVC I/O entry points, and cache volume (stripe) directory initialization and recovery.
4 
5   @section license License
6 
7   Licensed to the Apache Software Foundation (ASF) under one
8   or more contributor license agreements.  See the NOTICE file
9   distributed with this work for additional information
10   regarding copyright ownership.  The ASF licenses this file
11   to you under the Apache License, Version 2.0 (the
12   "License"); you may not use this file except in compliance
13   with the License.  You may obtain a copy of the License at
14 
15       http://www.apache.org/licenses/LICENSE-2.0
16 
17   Unless required by applicable law or agreed to in writing, software
18   distributed under the License is distributed on an "AS IS" BASIS,
19   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20   See the License for the specific language governing permissions and
21   limitations under the License.
22  */
23 
24 #include "P_Cache.h"
25 
26 // Cache Inspector and State Pages
27 #include "P_CacheTest.h"
28 #include "StatPages.h"
29 
30 #include "tscore/I_Layout.h"
31 #include "tscore/Filenames.h"
32 
33 #include "HttpTransactCache.h"
34 #include "HttpSM.h"
35 #include "HttpCacheSM.h"
36 #include "InkAPIInternal.h"
37 
38 #include "tscore/hugepages.h"
39 
40 #include <atomic>
41 
42 constexpr ts::VersionNumber CACHE_DB_VERSION(CACHE_DB_MAJOR_VERSION, CACHE_DB_MINOR_VERSION);
43 
44 // Compilation Options
45 #define USELESS_REENABLES // allow them for now
46 // #define VERIFY_JTEST_DATA
47 
48 static size_t DEFAULT_RAM_CACHE_MULTIPLIER = 10; // I.e. 10x 1MB per 1GB of disk.
49 
50 // This is the oldest version number that is still usable.
51 static short int const CACHE_DB_MAJOR_VERSION_COMPATIBLE = 21;
52 
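// Reset both the sum and the count of a raw stat back to zero.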
53 #define DOCACHE_CLEAR_DYN_STAT(x)  \
54   do {                             \
55     RecSetRawStatSum(rsb, x, 0);   \
56     RecSetRawStatCount(rsb, x, 0); \
57   } while (0);
58 
59 // Configuration
60 
61 int64_t cache_config_ram_cache_size            = AUTO_SIZE_RAM_CACHE;
62 int cache_config_ram_cache_algorithm           = 1;
63 int cache_config_ram_cache_compress            = 0;
64 int cache_config_ram_cache_compress_percent    = 90;
65 int cache_config_ram_cache_use_seen_filter     = 1;
66 int cache_config_http_max_alts                 = 3;
67 int cache_config_dir_sync_frequency            = 60;
68 int cache_config_permit_pinning                = 0;
69 int cache_config_select_alternate              = 1;
70 int cache_config_max_doc_size                  = 0;
71 int cache_config_min_average_object_size       = ESTIMATED_OBJECT_SIZE;
72 int64_t cache_config_ram_cache_cutoff          = AGG_SIZE;
73 int cache_config_max_disk_errors               = 5;
74 int cache_config_hit_evacuate_percent          = 10;
75 int cache_config_hit_evacuate_size_limit       = 0;
76 int cache_config_force_sector_size             = 0;
77 int cache_config_target_fragment_size          = DEFAULT_TARGET_FRAGMENT_SIZE;
78 int cache_config_agg_write_backlog             = AGG_SIZE * 2;
79 int cache_config_enable_checksum               = 0;
80 int cache_config_alt_rewrite_max_size          = 4096;
81 int cache_config_read_while_writer             = 0;
82 int cache_config_mutex_retry_delay             = 2;
83 int cache_read_while_writer_retry_delay        = 50;
84 int cache_config_read_while_writer_max_retries = 10;
85 
86 // Globals
87 
88 RecRawStatBlock *cache_rsb          = nullptr;
89 Cache *theCache                     = nullptr;
90 CacheDisk **gdisks                  = nullptr;
91 int gndisks                         = 0;
92 std::atomic<int> initialize_disk    = 0;
93 Cache *caches[NUM_CACHE_FRAG_TYPES] = {nullptr};
94 CacheSync *cacheDirSync             = nullptr;
95 Store theCacheStore;
96 int CacheProcessor::initialized          = CACHE_INITIALIZING;
97 uint32_t CacheProcessor::cache_ready     = 0;
98 int CacheProcessor::start_done           = 0;
99 bool CacheProcessor::clear               = false;
100 bool CacheProcessor::fix                 = false;
101 bool CacheProcessor::check               = false;
102 int CacheProcessor::start_internal_flags = 0;
103 int CacheProcessor::auto_clear_flag      = 0;
104 CacheProcessor cacheProcessor;
105 Vol **gvol             = nullptr;
106 std::atomic<int> gnvol = 0;
107 ClassAllocator<CacheVC> cacheVConnectionAllocator("cacheVConnection");
108 ClassAllocator<EvacuationBlock> evacuationBlockAllocator("evacuationBlock");
109 ClassAllocator<CacheRemoveCont> cacheRemoveContAllocator("cacheRemoveCont");
110 ClassAllocator<EvacuationKey> evacuationKeyAllocator("evacuationKey");
111 int CacheVC::size_to_init = -1;
112 CacheKey zero_key;
113 
114 struct VolInitInfo {
115   off_t recover_pos;
116   AIOCallbackInternal vol_aio[4];
117   char *vol_h_f;
118 
119   VolInitInfo()
120   {
121     recover_pos = 0;
122     vol_h_f     = static_cast<char *>(ats_memalign(ats_pagesize(), 4 * STORE_BLOCK_SIZE));
123     memset(vol_h_f, 0, 4 * STORE_BLOCK_SIZE);
124   }
125 
126   ~VolInitInfo()
127   {
128     for (auto &i : vol_aio) {
129       i.action = nullptr;
130       i.mutex.clear();
131     }
132     free(vol_h_f);
133   }
134 };
135 
136 #if AIO_MODE == AIO_MODE_NATIVE
137 struct VolInit : public Continuation {
138   Vol *vol;
139   char *path;
140   off_t blocks;
141   int64_t offset;
142   bool vol_clear;
143 
144   int
145   mainEvent(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
146   {
147     vol->init(path, blocks, offset, vol_clear);
148     mutex.clear();
149     delete this;
150     return EVENT_DONE;
151   }
152 
153   VolInit(Vol *v, char *p, off_t b, int64_t o, bool c) : Continuation(v->mutex), vol(v), path(p), blocks(b), offset(o), vol_clear(c)
154   {
155     SET_HANDLER(&VolInit::mainEvent);
156   }
157 };
158 
159 struct DiskInit : public Continuation {
160   CacheDisk *disk;
161   char *s;
162   off_t blocks;
163   off_t askip;
164   int ahw_sector_size;
165   int fildes;
166   bool clear;
167 
168   int
169   mainEvent(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
170   {
171     disk->open(s, blocks, askip, ahw_sector_size, fildes, clear);
172     ats_free(s);
173     mutex.clear();
174     delete this;
175     return EVENT_DONE;
176   }
177 
178   DiskInit(CacheDisk *d, char *str, off_t b, off_t skip, int sector, int f, bool c)
179     : Continuation(d->mutex), disk(d), s(ats_strdup(str)), blocks(b), askip(skip), ahw_sector_size(sector), fildes(f), clear(c)
180   {
181     SET_HANDLER(&DiskInit::mainEvent);
182   }
183 };
184 #endif
185 void cplist_init();
186 static void cplist_update();
187 int cplist_reconfigure();
188 static int create_volume(int volume_number, off_t size_in_blocks, int scheme, CacheVol *cp);
189 static void rebuild_host_table(Cache *cache);
190 void register_cache_stats(RecRawStatBlock *rsb, const char *prefix);
191 
192 // Global list of the volumes created
193 Queue<CacheVol> cp_list;
194 int cp_list_len = 0;
195 ConfigVolumes config_volumes;
196 
197 #if TS_HAS_TESTS
198 void
199 force_link_CacheTestCaller()
200 {
201   force_link_CacheTest();
202 }
203 #endif
204 
205 int64_t
206 cache_bytes_used(int volume)
207 {
208   uint64_t used = 0;
209 
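  // Until the write cursor wraps for the first time (header->cycle == 0), usage is the
  // distance written past the start of the stripe; after a wrap the stripe is effectively
  // full except for the directory and the evacuation reserve.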
210   for (int i = 0; i < gnvol; i++) {
211     if (!DISK_BAD(gvol[i]->disk) && (volume == -1 || gvol[i]->cache_vol->vol_number == volume)) {
212       if (!gvol[i]->header->cycle) {
213         used += gvol[i]->header->write_pos - gvol[i]->start;
214       } else {
215         used += gvol[i]->len - gvol[i]->dirlen() - EVACUATION_SIZE;
216       }
217     }
218   }
219 
220   return used;
221 }
222 
223 int
224 cache_stats_bytes_used_cb(const char *name, RecDataT data_type, RecData *data, RecRawStatBlock *rsb, int id)
225 {
226   int volume = -1;
227   char *p;
228 
229   // Well, there's no way to pass along the volume ID, so we extract it from the stat name.
230   p = strstr(const_cast<char *>(name), "volume_");
231   if (p != nullptr) {
232     // I'm counting on the compiler to optimize out strlen("volume_").
233     volume = strtol(p + strlen("volume_"), nullptr, 10);
234   }
235 
236   if (cacheProcessor.initialized == CACHE_INITIALIZED) {
237     int64_t used, total = 0;
238     float percent_full;
239 
240     used = cache_bytes_used(volume);
241     RecSetGlobalRawStatSum(rsb, id, used);
242     RecRawStatSyncSum(name, data_type, data, rsb, id);
243     RecGetGlobalRawStatSum(rsb, static_cast<int>(cache_bytes_total_stat), &total);
244     percent_full = static_cast<float>(used) / static_cast<float>(total) * 100;
245     // The percent_full float below gets rounded down
246     RecSetGlobalRawStatSum(rsb, static_cast<int>(cache_percent_full_stat), static_cast<int64_t>(percent_full));
247   }
248 
249   return 1;
250 }
251 
252 static int
253 validate_rww(int new_value)
254 {
255   if (new_value) {
256     float http_bg_fill;
257 
258     REC_ReadConfigFloat(http_bg_fill, "proxy.config.http.background_fill_completed_threshold");
259     if (http_bg_fill > 0.0) {
260       Note("to enable reading while writing a document, %s should be 0.0: read while writing disabled",
261            "proxy.config.http.background_fill_completed_threshold");
262       return 0;
263     }
264     if (cache_config_max_doc_size > 0) {
265       Note("to enable reading while writing a document, %s should be 0: read while writing disabled",
266            "proxy.config.cache.max_doc_size");
267       return 0;
268     }
269     return new_value;
270   }
271   return 0;
272 }
273 
274 static int
275 update_cache_config(const char * /* name ATS_UNUSED */, RecDataT /* data_type ATS_UNUSED */, RecData data,
276                     void * /* cookie ATS_UNUSED */)
277 {
278   int new_value                  = validate_rww(data.rec_int);
279   cache_config_read_while_writer = new_value;
280 
281   return 0;
282 }
283 
284 CacheVC::CacheVC()
285 {
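  // size_to_init spans from the 'vio' member to the end of the object, so the memset
  // below zeroes only the trailing POD state and leaves the vtable pointer and
  // base-class members untouched.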
286   size_to_init = sizeof(CacheVC) - (size_t) & ((CacheVC *)nullptr)->vio;
287   memset((void *)&vio, 0, size_to_init);
288 }
289 
290 HTTPInfo::FragOffset *
291 CacheVC::get_frag_table()
292 {
293   ink_assert(alternate.valid());
294   return alternate.valid() ? alternate.get_frag_table() : nullptr;
295 }
296 
297 VIO *
298 CacheVC::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *abuf)
299 {
300   ink_assert(vio.op == VIO::READ);
301   vio.buffer.writer_for(abuf);
302   vio.set_continuation(c);
303   vio.ndone     = 0;
304   vio.nbytes    = nbytes;
305   vio.vc_server = this;
306 #ifdef DEBUG
307   ink_assert(!c || c->mutex->thread_holding);
308 #endif
309   if (c && !trigger && !recursive) {
310     trigger = c->mutex->thread_holding->schedule_imm_local(this);
311   }
312   return &vio;
313 }
314 
315 VIO *
316 CacheVC::do_io_pread(Continuation *c, int64_t nbytes, MIOBuffer *abuf, int64_t offset)
317 {
318   ink_assert(vio.op == VIO::READ);
319   vio.buffer.writer_for(abuf);
320   vio.set_continuation(c);
321   vio.ndone     = 0;
322   vio.nbytes    = nbytes;
323   vio.vc_server = this;
324   seek_to       = offset;
325 #ifdef DEBUG
326   ink_assert(c->mutex->thread_holding);
327 #endif
328   if (!trigger && !recursive) {
329     trigger = c->mutex->thread_holding->schedule_imm_local(this);
330   }
331   return &vio;
332 }
333 
334 VIO *
335 CacheVC::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *abuf, bool owner)
336 {
337   ink_assert(vio.op == VIO::WRITE);
338   ink_assert(!owner);
339   vio.buffer.reader_for(abuf);
340   vio.set_continuation(c);
341   vio.ndone     = 0;
342   vio.nbytes    = nbytes;
343   vio.vc_server = this;
344 #ifdef DEBUG
345   ink_assert(!c || c->mutex->thread_holding);
346 #endif
347   if (c && !trigger && !recursive) {
348     trigger = c->mutex->thread_holding->schedule_imm_local(this);
349   }
350   return &vio;
351 }
352 
353 void
354 CacheVC::do_io_close(int alerrno)
355 {
356   ink_assert(mutex->thread_holding == this_ethread());
357   int previous_closed = closed;
358   closed              = (alerrno == -1) ? 1 : -1; // Stupid default arguments
359   DDebug("cache_close", "do_io_close %p %d %d", this, alerrno, closed);
360   if (!previous_closed && !recursive) {
361     die();
362   }
363 }
364 
365 void
366 CacheVC::reenable(VIO *avio)
367 {
368   DDebug("cache_reenable", "reenable %p", this);
369   (void)avio;
370 #ifdef DEBUG
371   ink_assert(avio->mutex->thread_holding);
372 #endif
373   if (!trigger) {
374 #ifndef USELESS_REENABLES
375     if (vio.op == VIO::READ) {
376       if (vio.buffer.mbuf->max_read_avail() > vio.buffer.writer()->water_mark)
377         ink_assert(!"useless reenable of cache read");
378     } else if (!vio.buffer.reader()->read_avail())
379       ink_assert(!"useless reenable of cache write");
380 #endif
381     trigger = avio->mutex->thread_holding->schedule_imm_local(this);
382   }
383 }
384 
385 void
386 CacheVC::reenable_re(VIO *avio)
387 {
388   DDebug("cache_reenable", "reenable_re %p", this);
389   (void)avio;
390 #ifdef DEBUG
391   ink_assert(avio->mutex->thread_holding);
392 #endif
393   if (!trigger) {
394     if (!is_io_in_progress() && !recursive) {
395       handleEvent(EVENT_NONE, (void *)nullptr);
396     } else {
397       trigger = avio->mutex->thread_holding->schedule_imm_local(this);
398     }
399   }
400 }
401 
402 bool
403 CacheVC::get_data(int i, void *data)
404 {
405   switch (i) {
406   case CACHE_DATA_HTTP_INFO:
407     *(static_cast<CacheHTTPInfo **>(data)) = &alternate;
408     return true;
409   case CACHE_DATA_RAM_CACHE_HIT_FLAG:
410     *(static_cast<int *>(data)) = !f.not_from_ram_cache;
411     return true;
412   default:
413     break;
414   }
415   return false;
416 }
417 
418 int64_t
419 CacheVC::get_object_size()
420 {
421   return (this)->doc_len;
422 }
423 
424 bool
425 CacheVC::set_data(int /* i ATS_UNUSED */, void * /* data */)
426 {
427   ink_assert(!"CacheVC::set_data should not be called!");
428   return true;
429 }
430 
431 void
432 CacheVC::get_http_info(CacheHTTPInfo **ainfo)
433 {
434   *ainfo = &(this)->alternate;
435 }
436 
437 // set_http_info must be called before do_io_write
438 // cluster vc does an optimization where it calls do_io_write() before
439 // calling set_http_info(), but it guarantees that the info will
440 // be set before transferring any bytes
441 void
442 CacheVC::set_http_info(CacheHTTPInfo *ainfo)
443 {
444   ink_assert(!total_len);
445   if (f.update) {
446     ainfo->object_key_set(update_key);
447     ainfo->object_size_set(update_len);
448   } else {
449     ainfo->object_key_set(earliest_key);
450     // don't know the total len yet
451   }
452 
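  // A response carrying "Content-Length: 0" legitimately has no body; note that so the
  // write path will accept a zero-length document.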
453   MIMEField *field = ainfo->m_alt->m_response_hdr.field_find(MIME_FIELD_CONTENT_LENGTH, MIME_LEN_CONTENT_LENGTH);
454   if (field && !field->value_get_int64()) {
455     f.allow_empty_doc = 1;
456   } else {
457     f.allow_empty_doc = 0;
458   }
459 
460   alternate.copy_shallow(ainfo);
461   ainfo->clear();
462 }
463 
464 bool
465 CacheVC::set_pin_in_cache(time_t time_pin)
466 {
467   if (total_len) {
468     ink_assert(!"should Pin the document before writing");
469     return false;
470   }
471   if (vio.op != VIO::WRITE) {
472     ink_assert(!"Pinning only allowed while writing objects to the cache");
473     return false;
474   }
475   pin_in_cache = time_pin;
476   return true;
477 }
478 
479 time_t
480 CacheVC::get_pin_in_cache()
481 {
482   return pin_in_cache;
483 }
484 
485 int
486 Vol::begin_read(CacheVC *cont)
487 {
488   ink_assert(cont->mutex->thread_holding == this_ethread());
489   ink_assert(mutex->thread_holding == this_ethread());
490 #ifdef CACHE_STAT_PAGES
491   ink_assert(!cont->stat_link.next && !cont->stat_link.prev);
492   stat_cache_vcs.enqueue(cont, cont->stat_link);
493 #endif
494   // no need for evacuation as the entire document is already in memory
495   if (cont->f.single_fragment) {
496     return 0;
497   }
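  // If an evacuation block already covers the earliest fragment's directory offset,
  // just register this reader on it; otherwise create a new block below so the region
  // is not overwritten while the read is in progress.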
498   int i = dir_evac_bucket(&cont->earliest_dir);
499   EvacuationBlock *b;
500   for (b = evacuate[i].head; b; b = b->link.next) {
501     if (dir_offset(&b->dir) != dir_offset(&cont->earliest_dir)) {
502       continue;
503     }
504     if (b->readers) {
505       b->readers = b->readers + 1;
506     }
507     return 0;
508   }
509   // we don't actually need to preserve this block as it is already in
510   // memory, but this is easier, and evacuations are rare
511   EThread *t        = cont->mutex->thread_holding;
512   b                 = new_EvacuationBlock(t);
513   b->readers        = 1;
514   b->dir            = cont->earliest_dir;
515   b->evac_frags.key = cont->earliest_key;
516   evacuate[i].push(b);
517   return 1;
518 }
519 
520 int
521 Vol::close_read(CacheVC *cont)
522 {
523   EThread *t = cont->mutex->thread_holding;
524   ink_assert(t == this_ethread());
525   ink_assert(t == mutex->thread_holding);
526   if (dir_is_empty(&cont->earliest_dir)) {
527     return 1;
528   }
529   int i = dir_evac_bucket(&cont->earliest_dir);
530   EvacuationBlock *b;
531   for (b = evacuate[i].head; b;) {
532     EvacuationBlock *next = b->link.next;
533     if (dir_offset(&b->dir) != dir_offset(&cont->earliest_dir)) {
534       b = next;
535       continue;
536     }
537     if (b->readers && !--b->readers) {
538       evacuate[i].remove(b);
539       free_EvacuationBlock(b, t);
540       break;
541     }
542     b = next;
543   }
544 #ifdef CACHE_STAT_PAGES
545   stat_cache_vcs.remove(cont, cont->stat_link);
546   ink_assert(!cont->stat_link.next && !cont->stat_link.prev);
547 #endif
548   return 1;
549 }
550 
551 // Cache Processor
552 
553 int
554 CacheProcessor::start(int, size_t)
555 {
556   return start_internal(0);
557 }
558 
559 static const int DEFAULT_CACHE_OPTIONS = (O_RDWR);
560 
561 int
562 CacheProcessor::start_internal(int flags)
563 {
564   ink_assert((int)TS_EVENT_CACHE_OPEN_READ == (int)CACHE_EVENT_OPEN_READ);
565   ink_assert((int)TS_EVENT_CACHE_OPEN_READ_FAILED == (int)CACHE_EVENT_OPEN_READ_FAILED);
566   ink_assert((int)TS_EVENT_CACHE_OPEN_WRITE == (int)CACHE_EVENT_OPEN_WRITE);
567   ink_assert((int)TS_EVENT_CACHE_OPEN_WRITE_FAILED == (int)CACHE_EVENT_OPEN_WRITE_FAILED);
568   ink_assert((int)TS_EVENT_CACHE_REMOVE == (int)CACHE_EVENT_REMOVE);
569   ink_assert((int)TS_EVENT_CACHE_REMOVE_FAILED == (int)CACHE_EVENT_REMOVE_FAILED);
570   ink_assert((int)TS_EVENT_CACHE_SCAN == (int)CACHE_EVENT_SCAN);
571   ink_assert((int)TS_EVENT_CACHE_SCAN_FAILED == (int)CACHE_EVENT_SCAN_FAILED);
572   ink_assert((int)TS_EVENT_CACHE_SCAN_OBJECT == (int)CACHE_EVENT_SCAN_OBJECT);
573   ink_assert((int)TS_EVENT_CACHE_SCAN_OPERATION_BLOCKED == (int)CACHE_EVENT_SCAN_OPERATION_BLOCKED);
574   ink_assert((int)TS_EVENT_CACHE_SCAN_OPERATION_FAILED == (int)CACHE_EVENT_SCAN_OPERATION_FAILED);
575   ink_assert((int)TS_EVENT_CACHE_SCAN_DONE == (int)CACHE_EVENT_SCAN_DONE);
576 
577 #if AIO_MODE == AIO_MODE_NATIVE
578   int etype            = ET_NET;
579   int n_netthreads     = eventProcessor.n_threads_for_type[etype];
580   EThread **netthreads = eventProcessor.eventthread[etype];
581   for (int i = 0; i < n_netthreads; ++i) {
582     netthreads[i]->diskHandler = new DiskHandler();
583     netthreads[i]->schedule_imm(netthreads[i]->diskHandler);
584   }
585 #endif
586 
587   start_internal_flags = flags;
588   clear                = !!(flags & PROCESSOR_RECONFIGURE) || auto_clear_flag;
589   fix                  = !!(flags & PROCESSOR_FIX);
590   check                = (flags & PROCESSOR_CHECK) != 0;
591   start_done           = 0;
592 
593   /* read the config file and create the data structures corresponding
594      to the file */
595   gndisks = theCacheStore.n_disks;
596   gdisks  = static_cast<CacheDisk **>(ats_malloc(gndisks * sizeof(CacheDisk *)));
597 
598   // Temporaries to carry values between loops
599   char **paths = static_cast<char **>(alloca(sizeof(char *) * gndisks));
600   memset(paths, 0, sizeof(char *) * gndisks);
601   int *fds = static_cast<int *>(alloca(sizeof(int) * gndisks));
602   memset(fds, 0, sizeof(int) * gndisks);
603   int *sector_sizes = static_cast<int *>(alloca(sizeof(int) * gndisks));
604   memset(sector_sizes, 0, sizeof(int) * gndisks);
605   Span **sds = static_cast<Span **>(alloca(sizeof(Span *) * gndisks));
606   memset(sds, 0, sizeof(Span *) * gndisks);
607 
608   gndisks = 0;
609   ink_aio_set_callback(new AIO_Callback_handler());
610 
611   config_volumes.read_config_file();
612 
613   /*
614    create CacheDisk objects for each span in the configuration file and store in gdisks
615    */
616   for (unsigned i = 0; i < theCacheStore.n_disks; i++) {
617     Span *sd = theCacheStore.disk[i];
618     int opts = DEFAULT_CACHE_OPTIONS;
619 
620     if (!paths[gndisks]) {
621       paths[gndisks] = static_cast<char *>(alloca(PATH_NAME_MAX));
622     }
623     ink_strlcpy(paths[gndisks], sd->pathname, PATH_NAME_MAX);
624     if (!sd->file_pathname) {
625       ink_strlcat(paths[gndisks], "/cache.db", PATH_NAME_MAX);
626       opts |= O_CREAT;
627     }
628 
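    // Prefer direct, synchronous I/O where the platform supports it; in check mode the
    // span is opened read-only and never created.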
629 #ifdef O_DIRECT
630     opts |= O_DIRECT;
631 #endif
632 #ifdef O_DSYNC
633     opts |= O_DSYNC;
634 #endif
635     if (check) {
636       opts &= ~O_CREAT;
637       opts |= O_RDONLY;
638     }
639 
640     int fd         = open(paths[gndisks], opts, 0644);
641     int64_t blocks = sd->blocks;
642 
643     if (fd < 0 && (opts & O_CREAT)) { // Try without O_DIRECT if this is a file on filesystem, e.g. tmpfs.
644       fd = open(paths[gndisks], DEFAULT_CACHE_OPTIONS | O_CREAT, 0644);
645     }
646 
647     if (fd >= 0) {
648       bool diskok = true;
649       if (!sd->file_pathname) {
650         if (!check) {
651           if (ftruncate(fd, blocks * STORE_BLOCK_SIZE) < 0) {
652             Warning("unable to truncate cache file '%s' to %" PRId64 " blocks", paths[gndisks], blocks);
653             diskok = false;
654           }
655         } else { // read-only mode checks
656           struct stat sbuf;
657           if (-1 == fstat(fd, &sbuf)) {
658             fprintf(stderr, "Failed to stat cache file for directory %s\n", paths[gndisks]);
659             diskok = false;
660           } else if (blocks != sbuf.st_size / STORE_BLOCK_SIZE) {
661             fprintf(stderr, "Cache file for directory %s is %" PRId64 " bytes, expected %" PRId64 "\n", paths[gndisks],
662                     sbuf.st_size, blocks * static_cast<int64_t>(STORE_BLOCK_SIZE));
663             diskok = false;
664           }
665         }
666       }
667       if (diskok) {
668         int sector_size = sd->hw_sector_size;
669 
670         gdisks[gndisks] = new CacheDisk();
671         if (check) {
672           gdisks[gndisks]->read_only_p = true;
673         }
674         gdisks[gndisks]->forced_volume_num = sd->forced_volume_num;
675         if (sd->hash_base_string) {
676           gdisks[gndisks]->hash_base_string = ats_strdup(sd->hash_base_string);
677         }
678 
679         if (sector_size < cache_config_force_sector_size) {
680           sector_size = cache_config_force_sector_size;
681         }
682 
683         // It's actually common that the hardware I/O size is larger than the store block size as
684         // storage systems increasingly want larger I/Os. For example, on macOS, the filesystem block
685         // size is always reported as 1MB.
686         if (sd->hw_sector_size <= 0 || sector_size > STORE_BLOCK_SIZE) {
687           Note("resetting hardware sector size from %d to %d", sector_size, STORE_BLOCK_SIZE);
688           sector_size = STORE_BLOCK_SIZE;
689         }
690         sector_sizes[gndisks] = sector_size;
691         fds[gndisks]          = fd;
692         sds[gndisks]          = sd;
693         fd                    = -1;
694         gndisks++;
695       }
696     } else {
697       if (errno == EINVAL) {
698         Warning("cache unable to open '%s': It must be placed on a file system that supports direct I/O.", paths[gndisks]);
699       } else {
700         Warning("cache unable to open '%s': %s", paths[gndisks], strerror(errno));
701       }
702     }
703     if (fd >= 0) {
704       close(fd);
705     }
706   }
707 
708   // Before we kick off asynchronous operations, make sure sufficient disks are available so we don't simply shut down later.
709   // Exiting with background threads still in operation would likely cause a segfault.
710   start_done = 1;
711 
712   if (gndisks == 0) {
713     CacheProcessor::initialized = CACHE_INIT_FAILED;
714     // Have to do this here because no IO events were scheduled and so @c diskInitialized() won't be called.
715     if (cb_after_init) {
716       cb_after_init();
717     }
718 
719     if (this->waitForCache() > 1) {
720       Emergency("Cache initialization failed - no disks available but cache required");
721     } else {
722       Warning("unable to open cache disk(s): Cache Disabled\n");
723       return -1; // pointless, AFAICT this is ignored.
724     }
725   } else if (this->waitForCache() == 3 && static_cast<unsigned int>(gndisks) < theCacheStore.n_disks_in_config) {
726     CacheProcessor::initialized = CACHE_INIT_FAILED;
727     if (cb_after_init) {
728       cb_after_init();
729     }
730     Emergency("Cache initialization failed - only %d out of %d disks were valid and all were required.", gndisks,
731               theCacheStore.n_disks_in_config);
732   } else if (this->waitForCache() == 2 && static_cast<unsigned int>(gndisks) < theCacheStore.n_disks_in_config) {
733     Warning("Cache initialization incomplete - only %d out of %d disks were valid.", gndisks, theCacheStore.n_disks_in_config);
734   }
735 
736   // If we got here, we have enough disks to proceed
737   for (int j = 0; j < gndisks; j++) {
738     Span *sd = sds[j];
739     ink_release_assert(sds[j] != nullptr); // Defeat clang-analyzer
740     off_t skip     = ROUND_TO_STORE_BLOCK((sd->offset < START_POS ? START_POS + sd->alignment : sd->offset));
741     int64_t blocks = sd->blocks - (skip >> STORE_BLOCK_SHIFT);
742 #if AIO_MODE == AIO_MODE_NATIVE
743     eventProcessor.schedule_imm(new DiskInit(gdisks[j], paths[j], blocks, skip, sector_sizes[j], fds[j], clear));
744 #else
745     gdisks[j]->open(paths[j], blocks, skip, sector_sizes[j], fds[j], clear);
746 #endif
747 
748     Debug("cache_hosting", "Disk: %d:%s, blocks: %" PRId64 "", gndisks, paths[j], blocks);
749   }
750 
751   return 0;
752 }
753 
754 void
755 CacheProcessor::diskInitialized()
756 {
757   int n_init    = initialize_disk++;
758   int bad_disks = 0;
759   int res       = 0;
760   int i;
761 
762   // Wait until all the cache disks have been initialized; only the last disk's callback proceeds.
763   if (n_init != gndisks - 1) {
764     return;
765   }
766 
767   // Check and remove bad disks from gdisks[]
768   for (i = 0; i < gndisks; i++) {
769     if (DISK_BAD(gdisks[i])) {
770       delete gdisks[i];
771       gdisks[i] = nullptr;
772       bad_disks++;
773     } else if (bad_disks > 0) {
774       gdisks[i - bad_disks] = gdisks[i];
775       gdisks[i]             = nullptr;
776     }
777   }
778   if (bad_disks > 0) {
779     // Update the number of available cache disks
780     gndisks -= bad_disks;
781     // Check if this is a fatal error
782     if (this->waitForCache() == 3 || (0 == gndisks && this->waitForCache() == 2)) {
783       // This could be passed off to @c cacheInitialized (as with volume config problems) but I think
784       // the more specific error message here is worth the extra code.
785       CacheProcessor::initialized = CACHE_INIT_FAILED;
786       if (cb_after_init) {
787         cb_after_init();
788       }
789       Emergency("Cache initialization failed - only %d of %d disks were available.", gndisks, theCacheStore.n_disks_in_config);
790     } else if (this->waitForCache() == 2) {
791       Warning("Cache initialization incomplete - only %d of %d disks were available.", gndisks, theCacheStore.n_disks_in_config);
792     }
793   }
794 
795   /* Practically just took all bad_disks offline so update the stats. */
796   RecSetGlobalRawStatSum(cache_rsb, cache_span_offline_stat, bad_disks);
797   RecIncrGlobalRawStat(cache_rsb, cache_span_failing_stat, -bad_disks);
798   RecSetGlobalRawStatSum(cache_rsb, cache_span_online_stat, gndisks);
799 
800   /* create the cachevol list only if num volumes are greater than 0. */
801   if (config_volumes.num_volumes == 0) {
802     /* if no volumes, default to just an http cache */
803     res = cplist_reconfigure();
804   } else {
805     // else
806     /* create the cachevol list. */
807     cplist_init();
808     /* now change the cachevol list based on the config file */
809     res = cplist_reconfigure();
810   }
811 
812   if (res == -1) {
813     /* problems initializing the volume.config. Punt */
814     gnvol = 0;
815     cacheInitialized();
816     return;
817   } else {
818     CacheVol *cp = cp_list.head;
819     for (; cp; cp = cp->link.next) {
820       cp->vol_rsb = RecAllocateRawStatBlock(static_cast<int>(cache_stat_count));
821       char vol_stat_str_prefix[256];
822       snprintf(vol_stat_str_prefix, sizeof(vol_stat_str_prefix), "proxy.process.cache.volume_%d", cp->vol_number);
823       register_cache_stats(cp->vol_rsb, vol_stat_str_prefix);
824     }
825   }
826 
827   gvol = static_cast<Vol **>(ats_malloc(gnvol * sizeof(Vol *)));
828   memset(gvol, 0, gnvol * sizeof(Vol *));
829   gnvol = 0;
830   for (i = 0; i < gndisks; i++) {
831     CacheDisk *d = gdisks[i];
832     if (is_debug_tag_set("cache_hosting")) {
833       int j;
834       Debug("cache_hosting", "Disk: %d:%s: Vol Blocks: %u: Free space: %" PRIu64, i, d->path, d->header->num_diskvol_blks,
835             d->free_space);
836       for (j = 0; j < static_cast<int>(d->header->num_volumes); j++) {
837         Debug("cache_hosting", "\tVol: %d Size: %" PRIu64, d->disk_vols[j]->vol_number, d->disk_vols[j]->size);
838       }
839       for (j = 0; j < static_cast<int>(d->header->num_diskvol_blks); j++) {
840         Debug("cache_hosting", "\tBlock No: %d Size: %" PRIu64 " Free: %u", d->header->vol_info[j].number,
841               d->header->vol_info[j].len, d->header->vol_info[j].free);
842       }
843     }
844     if (!check) {
845       d->sync();
846     }
847   }
848   if (config_volumes.num_volumes == 0) {
849     theCache         = new Cache();
850     theCache->scheme = CACHE_HTTP_TYPE;
851     theCache->open(clear, fix);
852     return;
853   }
854   if (config_volumes.num_http_volumes != 0) {
855     theCache         = new Cache();
856     theCache->scheme = CACHE_HTTP_TYPE;
857     theCache->open(clear, fix);
858   }
859 }
860 
861 void
862 CacheProcessor::cacheInitialized()
863 {
864   int i;
865 
866   if (theCache && (theCache->ready == CACHE_INITIALIZING)) {
867     return;
868   }
869 
870   int caches_ready  = 0;
871   int cache_init_ok = 0;
872   /* allocate ram size in proportion to the disk space the
873      volume occupies */
874   int64_t total_size             = 0; // count in HTTP & MIXT
875   uint64_t total_cache_bytes     = 0; // bytes that can be used in total_size
876   uint64_t total_direntries      = 0; // all the direntries in the cache
877   uint64_t used_direntries       = 0; //   and used
878   uint64_t vol_total_cache_bytes = 0;
879   uint64_t vol_total_direntries  = 0;
880   uint64_t vol_used_direntries   = 0;
881   Vol *vol;
882 
883   ProxyMutex *mutex = this_ethread()->mutex.get();
884 
885   if (theCache) {
886     total_size += theCache->cache_size;
887     Debug("cache_init", "CacheProcessor::cacheInitialized - theCache, total_size = %" PRId64 " = %" PRId64 " MB", total_size,
888           total_size / ((1024 * 1024) / STORE_BLOCK_SIZE));
889     if (theCache->ready == CACHE_INIT_FAILED) {
890       Debug("cache_init", "CacheProcessor::cacheInitialized - failed to initialize the cache for http: cache disabled");
891       Warning("failed to initialize the cache for http: cache disabled\n");
892     } else {
893       caches_ready                 = caches_ready | (1 << CACHE_FRAG_TYPE_HTTP);
894       caches_ready                 = caches_ready | (1 << CACHE_FRAG_TYPE_NONE);
895       caches[CACHE_FRAG_TYPE_HTTP] = theCache;
896       caches[CACHE_FRAG_TYPE_NONE] = theCache;
897     }
898   }
899 
900   // Update stripe version data.
901   if (gnvol) { // start with whatever the first stripe is.
902     cacheProcessor.min_stripe_version = cacheProcessor.max_stripe_version = gvol[0]->header->version;
903   }
904   // scan the rest of the stripes.
905   for (i = 1; i < gnvol; i++) {
906     Vol *v = gvol[i];
907     if (v->header->version < cacheProcessor.min_stripe_version) {
908       cacheProcessor.min_stripe_version = v->header->version;
909     }
910     if (cacheProcessor.max_stripe_version < v->header->version) {
911       cacheProcessor.max_stripe_version = v->header->version;
912     }
913   }
914 
915   if (caches_ready) {
916     Debug("cache_init", "CacheProcessor::cacheInitialized - caches_ready=0x%0X, gnvol=%d", (unsigned int)caches_ready,
917           gnvol.load());
918 
919     int64_t ram_cache_bytes = 0;
920 
921     if (gnvol) {
922       // new ram_caches, with algorithm from the config
923       for (i = 0; i < gnvol; i++) {
924         switch (cache_config_ram_cache_algorithm) {
925         default:
926         case RAM_CACHE_ALGORITHM_CLFUS:
927           gvol[i]->ram_cache = new_RamCacheCLFUS();
928           break;
929         case RAM_CACHE_ALGORITHM_LRU:
930           gvol[i]->ram_cache = new_RamCacheLRU();
931           break;
932         }
933       }
934       // now calculate the RAM cache size
935       if (cache_config_ram_cache_size == AUTO_SIZE_RAM_CACHE) {
936         Debug("cache_init", "CacheProcessor::cacheInitialized - cache_config_ram_cache_size == AUTO_SIZE_RAM_CACHE");
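        // Auto-sizing gives each stripe a RAM cache of DEFAULT_RAM_CACHE_MULTIPLIER times
        // its directory size, so RAM usage scales with the stripe's disk footprint.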
937         for (i = 0; i < gnvol; i++) {
938           vol = gvol[i];
939 
940           if (gvol[i]->cache_vol->ramcache_enabled) {
941             gvol[i]->ram_cache->init(vol->dirlen() * DEFAULT_RAM_CACHE_MULTIPLIER, vol);
942             ram_cache_bytes += gvol[i]->dirlen();
943             Debug("cache_init", "CacheProcessor::cacheInitialized - ram_cache_bytes = %" PRId64 " = %" PRId64 "Mb", ram_cache_bytes,
944                   ram_cache_bytes / (1024 * 1024));
945             CACHE_VOL_SUM_DYN_STAT(cache_ram_cache_bytes_total_stat, (int64_t)gvol[i]->dirlen());
946           }
947           vol_total_cache_bytes = gvol[i]->len - gvol[i]->dirlen();
948           total_cache_bytes += vol_total_cache_bytes;
949           Debug("cache_init", "CacheProcessor::cacheInitialized - total_cache_bytes = %" PRId64 " = %" PRId64 "Mb",
950                 total_cache_bytes, total_cache_bytes / (1024 * 1024));
951 
952           CACHE_VOL_SUM_DYN_STAT(cache_bytes_total_stat, vol_total_cache_bytes);
953 
954           vol_total_direntries = gvol[i]->buckets * gvol[i]->segments * DIR_DEPTH;
955           total_direntries += vol_total_direntries;
956           CACHE_VOL_SUM_DYN_STAT(cache_direntries_total_stat, vol_total_direntries);
957 
958           vol_used_direntries = dir_entries_used(gvol[i]);
959           CACHE_VOL_SUM_DYN_STAT(cache_direntries_used_stat, vol_used_direntries);
960           used_direntries += vol_used_direntries;
961         }
962 
963       } else {
964         // we have an explicitly configured RAM cache size
965         // TODO: should we check the available system memory? Otherwise we may
966         //   OOM or swap, which is not a good situation for the server
967         Debug("cache_init", "CacheProcessor::cacheInitialized - %" PRId64 " != AUTO_SIZE_RAM_CACHE", cache_config_ram_cache_size);
968         int64_t http_ram_cache_size =
969           (theCache) ?
970             static_cast<int64_t>((static_cast<double>(theCache->cache_size) / total_size) * cache_config_ram_cache_size) :
971             0;
972         Debug("cache_init", "CacheProcessor::cacheInitialized - http_ram_cache_size = %" PRId64 " = %" PRId64 "Mb",
973               http_ram_cache_size, http_ram_cache_size / (1024 * 1024));
974         int64_t stream_ram_cache_size = cache_config_ram_cache_size - http_ram_cache_size;
975         Debug("cache_init", "CacheProcessor::cacheInitialized - stream_ram_cache_size = %" PRId64 " = %" PRId64 "Mb",
976               stream_ram_cache_size, stream_ram_cache_size / (1024 * 1024));
977 
978         // Dump some ram_cache size information in debug mode.
979         Debug("ram_cache", "config: size = %" PRId64 ", cutoff = %" PRId64 "", cache_config_ram_cache_size,
980               cache_config_ram_cache_cutoff);
981 
982         for (i = 0; i < gnvol; i++) {
983           vol = gvol[i];
984           double factor;
985           if (gvol[i]->cache == theCache && gvol[i]->cache_vol->ramcache_enabled) {
986             ink_assert(gvol[i]->cache != nullptr);
987             factor = static_cast<double>(static_cast<int64_t>(gvol[i]->len >> STORE_BLOCK_SHIFT)) / theCache->cache_size;
988             Debug("cache_init", "CacheProcessor::cacheInitialized - factor = %f", factor);
989             gvol[i]->ram_cache->init(static_cast<int64_t>(http_ram_cache_size * factor), vol);
990             ram_cache_bytes += static_cast<int64_t>(http_ram_cache_size * factor);
991             CACHE_VOL_SUM_DYN_STAT(cache_ram_cache_bytes_total_stat, (int64_t)(http_ram_cache_size * factor));
992           } else if (gvol[i]->cache_vol->ramcache_enabled) {
993             ink_release_assert(!"Unexpected non-HTTP cache volume");
994           }
995           Debug("cache_init", "CacheProcessor::cacheInitialized[%d] - ram_cache_bytes = %" PRId64 " = %" PRId64 "Mb", i,
996                 ram_cache_bytes, ram_cache_bytes / (1024 * 1024));
997           vol_total_cache_bytes = gvol[i]->len - gvol[i]->dirlen();
998           total_cache_bytes += vol_total_cache_bytes;
999           CACHE_VOL_SUM_DYN_STAT(cache_bytes_total_stat, vol_total_cache_bytes);
1000           Debug("cache_init", "CacheProcessor::cacheInitialized - total_cache_bytes = %" PRId64 " = %" PRId64 "Mb",
1001                 total_cache_bytes, total_cache_bytes / (1024 * 1024));
1002 
1003           vol_total_direntries = gvol[i]->buckets * gvol[i]->segments * DIR_DEPTH;
1004           total_direntries += vol_total_direntries;
1005           CACHE_VOL_SUM_DYN_STAT(cache_direntries_total_stat, vol_total_direntries);
1006 
1007           vol_used_direntries = dir_entries_used(gvol[i]);
1008           CACHE_VOL_SUM_DYN_STAT(cache_direntries_used_stat, vol_used_direntries);
1009           used_direntries += vol_used_direntries;
1010         }
1011       }
1012       switch (cache_config_ram_cache_compress) {
1013       default:
1014         Fatal("unknown RAM cache compression type: %d", cache_config_ram_cache_compress);
1015       case CACHE_COMPRESSION_NONE:
1016       case CACHE_COMPRESSION_FASTLZ:
1017         break;
1018       case CACHE_COMPRESSION_LIBZ:
1019 #ifndef HAVE_ZLIB_H
1020         Fatal("libz not available for RAM cache compression");
1021 #endif
1022         break;
1023       case CACHE_COMPRESSION_LIBLZMA:
1024 #ifndef HAVE_LZMA_H
1025         Fatal("lzma not available for RAM cache compression");
1026 #endif
1027         break;
1028       }
1029 
1030       GLOBAL_CACHE_SET_DYN_STAT(cache_ram_cache_bytes_total_stat, ram_cache_bytes);
1031       GLOBAL_CACHE_SET_DYN_STAT(cache_bytes_total_stat, total_cache_bytes);
1032       GLOBAL_CACHE_SET_DYN_STAT(cache_direntries_total_stat, total_direntries);
1033       GLOBAL_CACHE_SET_DYN_STAT(cache_direntries_used_stat, used_direntries);
1034       if (!check) {
1035         dir_sync_init();
1036       }
1037       cache_init_ok = 1;
1038     } else {
1039       Warning("cache unable to open any vols, disabled");
1040     }
1041   }
1042   if (cache_init_ok) {
1043     // Initialize virtual cache
1044     CacheProcessor::initialized = CACHE_INITIALIZED;
1045     CacheProcessor::cache_ready = caches_ready;
1046     Note("cache enabled");
1047   } else {
1048     CacheProcessor::initialized = CACHE_INIT_FAILED;
1049     Note("cache disabled");
1050   }
1051 
1052   // Fire callback to signal initialization finished.
1053   if (cb_after_init) {
1054     cb_after_init();
1055   }
1056 
1057   // TS-3848
1058   if (CACHE_INIT_FAILED == CacheProcessor::initialized && cacheProcessor.waitForCache() > 1) {
1059     Emergency("Cache initialization failed with cache required, exiting.");
1060   }
1061 }
1062 
1063 void
1064 CacheProcessor::stop()
1065 {
1066 }
1067 
1068 int
1069 CacheProcessor::dir_check(bool afix)
1070 {
1071   for (int i = 0; i < gnvol; i++) {
1072     gvol[i]->dir_check(afix);
1073   }
1074   return 0;
1075 }
1076 
1077 int
1078 CacheProcessor::db_check(bool afix)
1079 {
1080   for (int i = 0; i < gnvol; i++) {
1081     gvol[i]->db_check(afix);
1082   }
1083   return 0;
1084 }
1085 
1086 Action *
1087 CacheProcessor::lookup(Continuation *cont, const CacheKey *key, CacheFragType frag_type, const char *hostname, int host_len)
1088 {
1089   return caches[frag_type]->lookup(cont, key, frag_type, hostname, host_len);
1090 }
1091 
1092 inkcoreapi Action *
1093 CacheProcessor::open_read(Continuation *cont, const CacheKey *key, CacheFragType frag_type, const char *hostname, int hostlen)
1094 {
1095   return caches[frag_type]->open_read(cont, key, frag_type, hostname, hostlen);
1096 }
1097 
1098 inkcoreapi Action *
1099 CacheProcessor::open_write(Continuation *cont, CacheKey *key, CacheFragType frag_type, int expected_size ATS_UNUSED, int options,
1100                            time_t pin_in_cache, char *hostname, int host_len)
1101 {
1102   return caches[frag_type]->open_write(cont, key, frag_type, options, pin_in_cache, hostname, host_len);
1103 }
1104 
1105 Action *
1106 CacheProcessor::remove(Continuation *cont, const CacheKey *key, CacheFragType frag_type, const char *hostname, int host_len)
1107 {
1108   Debug("cache_remove", "[CacheProcessor::remove] Issuing cache delete for %u", cache_hash(*key));
1109   return caches[frag_type]->remove(cont, key, frag_type, hostname, host_len);
1110 }
1111 
1112 Action *
1113 CacheProcessor::lookup(Continuation *cont, const HttpCacheKey *key, CacheFragType frag_type)
1114 {
1115   return lookup(cont, &key->hash, frag_type, key->hostname, key->hostlen);
1116 }
1117 
1118 Action *
1119 CacheProcessor::scan(Continuation *cont, char *hostname, int host_len, int KB_per_second)
1120 {
1121   return caches[CACHE_FRAG_TYPE_HTTP]->scan(cont, hostname, host_len, KB_per_second);
1122 }
1123 
1124 int
1125 CacheProcessor::IsCacheEnabled()
1126 {
1127   return CacheProcessor::initialized;
1128 }
1129 
1130 bool
1131 CacheProcessor::IsCacheReady(CacheFragType type)
1132 {
1133   if (IsCacheEnabled() != CACHE_INITIALIZED) {
1134     return false;
1135   }
1136   return static_cast<bool>(cache_ready & (1 << type));
1137 }
1138 
1139 int
1140 Vol::db_check(bool /* fix ATS_UNUSED */)
1141 {
1142   char tt[256];
1143   printf("    Data for [%s]\n", hash_text.get());
1144   printf("        Length:          %" PRIu64 "\n", static_cast<uint64_t>(len));
1145   printf("        Write Position:  %" PRIu64 "\n", static_cast<uint64_t>(header->write_pos - skip));
1146   printf("        Phase:           %d\n", static_cast<int>(!!header->phase));
1147   ink_ctime_r(&header->create_time, tt);
1148   tt[strlen(tt) - 1] = 0;
1149   printf("        Create Time:     %s\n", tt);
1150   printf("        Sync Serial:     %u\n", static_cast<unsigned int>(header->sync_serial));
1151   printf("        Write Serial:    %u\n", static_cast<unsigned int>(header->write_serial));
1152   printf("\n");
1153 
1154   return 0;
1155 }
1156 
1157 static void
1158 vol_init_data_internal(Vol *d)
1159 {
1160   // step1: calculate the number of entries.
1161   off_t total_entries = (d->len - (d->start - d->skip)) / cache_config_min_average_object_size;
1162   // step2: calculate the number of buckets
1163   off_t total_buckets = total_entries / DIR_DEPTH;
1164   // step3: calculate the number of segments, no segment has more than 16384 buckets
1165   d->segments = (total_buckets + (((1 << 16) - 1) / DIR_DEPTH)) / ((1 << 16) / DIR_DEPTH);
1166   // step4: divide total_buckets into segments on average.
1167   d->buckets = (total_buckets + d->segments - 1) / d->segments;
1168   // step5: set the start pointer.
1169   d->start = d->skip + 2 * d->dirlen();
1170 }
1171 
1172 static void
1173 vol_init_data(Vol *d)
1174 {
1175   // iteratively calculate start + buckets
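  // 'start' depends on the directory size, which depends on 'buckets' and 'segments',
  // which in turn depend on the usable length after 'start'; a few fixed-point
  // iterations are enough for the values to settle.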
1176   vol_init_data_internal(d);
1177   vol_init_data_internal(d);
1178   vol_init_data_internal(d);
1179 }
1180 
1181 void
1182 vol_init_dir(Vol *d)
1183 {
1184   int b, s, l;
1185 
1186   for (s = 0; s < d->segments; s++) {
1187     d->header->freelist[s] = 0;
1188     Dir *seg               = d->dir_segment(s);
1189     for (l = 1; l < DIR_DEPTH; l++) {
1190       for (b = 0; b < d->buckets; b++) {
1191         Dir *bucket = dir_bucket(b, seg);
1192         dir_free_entry(dir_bucket_row(bucket, l), s, d);
1193       }
1194     }
1195   }
1196 }
1197 
1198 void
1199 vol_clear_init(Vol *d)
1200 {
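  // Rebuild a pristine in-memory directory and header/footer; callers such as
  // vol_dir_clear() and Vol::clear_dir() are responsible for writing it to disk.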
1201   size_t dir_len = d->dirlen();
1202   memset(d->raw_dir, 0, dir_len);
1203   vol_init_dir(d);
1204   d->header->magic          = VOL_MAGIC;
1205   d->header->version._major = CACHE_DB_MAJOR_VERSION;
1206   d->header->version._minor = CACHE_DB_MINOR_VERSION;
1207   d->scan_pos = d->header->agg_pos = d->header->write_pos = d->start;
1208   d->header->last_write_pos                               = d->header->write_pos;
1209   d->header->phase                                        = 0;
1210   d->header->cycle                                        = 0;
1211   d->header->create_time                                  = time(nullptr);
1212   d->header->dirty                                        = 0;
1213   d->sector_size = d->header->sector_size = d->disk->hw_sector_size;
1214   *d->footer                              = *d->header;
1215 }
1216 
1217 int
1218 vol_dir_clear(Vol *d)
1219 {
1220   size_t dir_len = d->dirlen();
1221   vol_clear_init(d);
1222 
1223   if (pwrite(d->fd, d->raw_dir, dir_len, d->skip) < 0) {
1224     Warning("unable to clear cache directory '%s'", d->hash_text.get());
1225     return -1;
1226   }
1227   return 0;
1228 }
1229 
1230 int
1231 Vol::clear_dir()
1232 {
1233   size_t dir_len = this->dirlen();
1234   vol_clear_init(this);
1235 
1236   SET_HANDLER(&Vol::handle_dir_clear);
1237 
1238   io.aiocb.aio_fildes = fd;
1239   io.aiocb.aio_buf    = raw_dir;
1240   io.aiocb.aio_nbytes = dir_len;
1241   io.aiocb.aio_offset = skip;
1242   io.action           = this;
1243   io.thread           = AIO_CALLBACK_THREAD_ANY;
1244   io.then             = nullptr;
1245   ink_assert(ink_aio_write(&io));
1246   return 0;
1247 }
1248 
1249 int
1250 Vol::init(char *s, off_t blocks, off_t dir_skip, bool clear)
1251 {
1252   char *seed_str              = disk->hash_base_string ? disk->hash_base_string : s;
1253   const size_t hash_seed_size = strlen(seed_str);
1254   const size_t hash_text_size = hash_seed_size + 32;
1255 
1256   hash_text = static_cast<char *>(ats_malloc(hash_text_size));
1257   ink_strlcpy(hash_text, seed_str, hash_text_size);
1258   snprintf(hash_text + hash_seed_size, (hash_text_size - hash_seed_size), " %" PRIu64 ":%" PRIu64 "",
1259            static_cast<uint64_t>(dir_skip), static_cast<uint64_t>(blocks));
1260   CryptoContext().hash_immediate(hash_id, hash_text, strlen(hash_text));
1261 
1262   dir_skip = ROUND_TO_STORE_BLOCK((dir_skip < START_POS ? START_POS : dir_skip));
1263   path     = ats_strdup(s);
1264   len      = blocks * STORE_BLOCK_SIZE;
1265   ink_assert(len <= MAX_VOL_SIZE);
1266   skip             = dir_skip;
1267   prev_recover_pos = 0;
1268 
1269   // successive approximation, directory/meta data eats up some storage
1270   start = dir_skip;
1271   vol_init_data(this);
1272   data_blocks         = (len - (start - skip)) / STORE_BLOCK_SIZE;
1273   hit_evacuate_window = (data_blocks * cache_config_hit_evacuate_percent) / 100;
1274 
1275   evacuate_size = static_cast<int>(len / EVACUATION_BUCKET_SIZE) + 2;
1276   int evac_len  = evacuate_size * sizeof(DLL<EvacuationBlock>);
1277   evacuate      = static_cast<DLL<EvacuationBlock> *>(ats_malloc(evac_len));
1278   memset(static_cast<void *>(evacuate), 0, evac_len);
1279 
1280   Debug("cache_init", "Vol %s: allocating %zu directory bytes for a %lld byte volume (%lf%%)", hash_text.get(), dirlen(),
1281         (long long)this->len, (double)dirlen() / (double)this->len * 100.0);
1282 
1283   raw_dir = nullptr;
1284   if (ats_hugepage_enabled()) {
1285     raw_dir = static_cast<char *>(ats_alloc_hugepage(this->dirlen()));
1286   }
1287   if (raw_dir == nullptr) {
1288     raw_dir = static_cast<char *>(ats_memalign(ats_pagesize(), this->dirlen()));
1289   }
1290 
1291   dir    = reinterpret_cast<Dir *>(raw_dir + this->headerlen());
1292   header = reinterpret_cast<VolHeaderFooter *>(raw_dir);
1293   footer = reinterpret_cast<VolHeaderFooter *>(raw_dir + this->dirlen() - ROUND_TO_STORE_BLOCK(sizeof(VolHeaderFooter)));
1294 
1295   if (clear) {
1296     Note("clearing cache directory '%s'", hash_text.get());
1297     return clear_dir();
1298   }
1299 
1300   init_info           = new VolInitInfo();
1301   int footerlen       = ROUND_TO_STORE_BLOCK(sizeof(VolHeaderFooter));
1302   off_t footer_offset = this->dirlen() - footerlen;
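  // The stripe keeps two copies of the directory (A and B), each bracketed by a header
  // and footer; read all four so handle_header_read() can decide which copy to trust.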
1303   // try A
1304   off_t as = skip;
1305 
1306   Debug("cache_init", "reading directory '%s'", hash_text.get());
1307   SET_HANDLER(&Vol::handle_header_read);
1308   init_info->vol_aio[0].aiocb.aio_offset = as;
1309   init_info->vol_aio[1].aiocb.aio_offset = as + footer_offset;
1310   off_t bs                               = skip + this->dirlen();
1311   init_info->vol_aio[2].aiocb.aio_offset = bs;
1312   init_info->vol_aio[3].aiocb.aio_offset = bs + footer_offset;
1313 
1314   for (unsigned i = 0; i < countof(init_info->vol_aio); i++) {
1315     AIOCallback *aio      = &(init_info->vol_aio[i]);
1316     aio->aiocb.aio_fildes = fd;
1317     aio->aiocb.aio_buf    = &(init_info->vol_h_f[i * STORE_BLOCK_SIZE]);
1318     aio->aiocb.aio_nbytes = footerlen;
1319     aio->action           = this;
1320     aio->thread           = AIO_CALLBACK_THREAD_ANY;
1321     aio->then             = (i < 3) ? &(init_info->vol_aio[i + 1]) : nullptr;
1322   }
1323 #if AIO_MODE == AIO_MODE_NATIVE
1324   ink_assert(ink_aio_readv(init_info->vol_aio));
1325 #else
1326   ink_assert(ink_aio_read(init_info->vol_aio));
1327 #endif
1328   return 0;
1329 }
1330 
1331 int
1332 Vol::handle_dir_clear(int event, void *data)
1333 {
1334   size_t dir_len = this->dirlen();
1335   AIOCallback *op;
1336 
1337   if (event == AIO_EVENT_DONE) {
1338     op = static_cast<AIOCallback *>(data);
1339     if (static_cast<size_t>(op->aio_result) != op->aiocb.aio_nbytes) {
1340       Warning("unable to clear cache directory '%s'", hash_text.get());
1341       disk->incrErrors(op);
1342       fd = -1;
1343     }
1344 
1345     if (op->aiocb.aio_nbytes == dir_len) {
1346       /* clear the header for directory B. We don't need to clear the
1347          whole of directory B. The header for directory B starts at
1348          skip + dir_len */
1349       op->aiocb.aio_nbytes = ROUND_TO_STORE_BLOCK(sizeof(VolHeaderFooter));
1350       op->aiocb.aio_offset = skip + dir_len;
1351       ink_assert(ink_aio_write(op));
1352       return EVENT_DONE;
1353     }
1354     set_io_not_in_progress();
1355     SET_HANDLER(&Vol::dir_init_done);
1356     dir_init_done(EVENT_IMMEDIATE, nullptr);
1357     /* mark the volume as bad */
1358   }
1359   return EVENT_DONE;
1360 }
1361 
1362 int
1363 Vol::handle_dir_read(int event, void *data)
1364 {
1365   AIOCallback *op = static_cast<AIOCallback *>(data);
1366 
1367   if (event == AIO_EVENT_DONE) {
1368     if (static_cast<size_t>(op->aio_result) != op->aiocb.aio_nbytes) {
1369       Note("Directory read failed: clearing cache directory %s", this->hash_text.get());
1370       clear_dir();
1371       return EVENT_DONE;
1372     }
1373   }
1374 
1375   if (!(header->magic == VOL_MAGIC && footer->magic == VOL_MAGIC && CACHE_DB_MAJOR_VERSION_COMPATIBLE <= header->version._major &&
1376         header->version._major <= CACHE_DB_MAJOR_VERSION)) {
1377     Warning("bad footer in cache directory for '%s', clearing", hash_text.get());
1378     Note("VOL_MAGIC %d\n header magic: %d\n footer_magic %d\n CACHE_DB_MAJOR_VERSION_COMPATIBLE %d\n major version %d\n"
1379          "CACHE_DB_MAJOR_VERSION %d\n",
1380          VOL_MAGIC, header->magic, footer->magic, CACHE_DB_MAJOR_VERSION_COMPATIBLE, header->version._major,
1381          CACHE_DB_MAJOR_VERSION);
1382     Note("clearing cache directory '%s'", hash_text.get());
1383     clear_dir();
1384     return EVENT_DONE;
1385   }
1386   CHECK_DIR(this);
1387 
1388   sector_size = header->sector_size;
1389 
1390   return this->recover_data();
1391 }
1392 
1393 int
1394 Vol::recover_data()
1395 {
1396   SET_HANDLER(&Vol::handle_recover_from_data);
1397   return handle_recover_from_data(EVENT_IMMEDIATE, nullptr);
1398 }
1399 
1400 /*
1401    Philosophy:  The idea is to find the region of disk that could be
1402    inconsistent and remove all directory entries pointing to that potentially
1403    inconsistent region.
1404    Start from a consistent position (the write_pos of the last directory
1405    synced to disk) and scan forward. The invariants for docs that were
1406    written to the disk after the directory was synced:
1407 
1408    1. doc->magic == DOC_MAGIC
1409 
1410    The following two cases happen only when the previous generation
1411    documents are aligned with the current ones.
1412 
1413    2. All the docs written to the disk
1414    after the directory was synced will have their sync_serial <=
1415    header->sync_serial + 1,  because the write aggregation can take
1416    indeterminate amount of time to sync. The doc->sync_serial can be
1417    equal to header->sync_serial + 1, because we increment the sync_serial
1418    before we sync the directory to disk.
1419 
1420    3. The doc->sync_serial will always increase. If doc->sync_serial
1421    decreases, the document was written in the previous phase
1422 
1423    If any of these conditions fails and we are not too close to the end
1424    (see the next comment), then we're done
1425 
1426    We actually start from header->last_write_pos instead of header->write_pos
1427    to make sure that we haven't wrapped around the whole disk without
1428    syncing the directory. Since the directory sync interval is 60 seconds, it is
1429    entirely possible to write through the whole cache without
1430    once syncing the directory. In this case, we need to clear the
1431    cache. The documents written right before we synced the
1432    directory to disk should have the write_serial <= header->sync_serial.
1433 
1434       */
1435 
1436 int
1437 Vol::handle_recover_from_data(int event, void * /* data ATS_UNUSED */)
1438 {
1439   uint32_t got_len         = 0;
1440   uint32_t max_sync_serial = header->sync_serial;
1441   char *s, *e;
1442   if (event == EVENT_IMMEDIATE) {
1443     if (header->sync_serial == 0) {
1444       io.aiocb.aio_buf = nullptr;
1445       SET_HANDLER(&Vol::handle_recover_write_dir);
1446       return handle_recover_write_dir(EVENT_IMMEDIATE, nullptr);
1447     }
1448     // initialize
1449     recover_wrapped   = false;
1450     last_sync_serial  = 0;
1451     last_write_serial = 0;
1452     recover_pos       = header->last_write_pos;
1453     if (recover_pos >= skip + len) {
1454       recover_wrapped = true;
1455       recover_pos     = start;
1456     }
1457     io.aiocb.aio_buf    = static_cast<char *>(ats_memalign(ats_pagesize(), RECOVERY_SIZE));
1458     io.aiocb.aio_nbytes = RECOVERY_SIZE;
1459     if (static_cast<off_t>(recover_pos + io.aiocb.aio_nbytes) > static_cast<off_t>(skip + len)) {
1460       io.aiocb.aio_nbytes = (skip + len) - recover_pos;
1461     }
1462   } else if (event == AIO_EVENT_DONE) {
1463     if (io.aiocb.aio_nbytes != static_cast<size_t>(io.aio_result)) {
1464       Warning("disk read error on recover '%s', clearing", hash_text.get());
1465       disk->incrErrors(&io);
1466       goto Lclear;
1467     }
1468     if (io.aiocb.aio_offset == header->last_write_pos) {
      /* Check that we haven't wrapped around without syncing the directory.
         Start from header->last_write_pos (the position the documents were
         written to just before the directory was synced) and make sure that
         all documents have write_serial <= header->write_serial.
       */
1474       uint32_t to_check = header->write_pos - header->last_write_pos;
1475       ink_assert(to_check && to_check < (uint32_t)io.aiocb.aio_nbytes);
1476       uint32_t done = 0;
1477       s             = static_cast<char *>(io.aiocb.aio_buf);
1478       while (done < to_check) {
1479         Doc *doc = reinterpret_cast<Doc *>(s + done);
1480         if (doc->magic != DOC_MAGIC || doc->write_serial > header->write_serial) {
1481           Warning("no valid directory found while recovering '%s', clearing", hash_text.get());
1482           goto Lclear;
1483         }
1484         done += round_to_approx_size(doc->len);
1485         if (doc->sync_serial > last_write_serial) {
1486           last_sync_serial = doc->sync_serial;
1487         }
1488       }
1489       ink_assert(done == to_check);
1490 
1491       got_len = io.aiocb.aio_nbytes - done;
1492       recover_pos += io.aiocb.aio_nbytes;
1493       s = static_cast<char *>(io.aiocb.aio_buf) + done;
1494       e = s + got_len;
1495     } else {
1496       got_len = io.aiocb.aio_nbytes;
1497       recover_pos += io.aiocb.aio_nbytes;
1498       s = static_cast<char *>(io.aiocb.aio_buf);
1499       e = s + got_len;
1500     }
1501   }
1502   // examine what we got
1503   if (got_len) {
1504     Doc *doc = nullptr;
1505 
1506     if (recover_wrapped && start == io.aiocb.aio_offset) {
1507       doc = reinterpret_cast<Doc *>(s);
1508       if (doc->magic != DOC_MAGIC || doc->write_serial < last_write_serial) {
1509         recover_pos = skip + len - EVACUATION_SIZE;
1510         goto Ldone;
1511       }
1512     }
1513 
1514     // If execution reaches here, then @c got_len > 0 and e == s + got_len therefore s < e
1515     // clang analyzer can't figure this out, so be explicit.
1516     ink_assert(s < e);
1517     while (s < e) {
1518       doc = reinterpret_cast<Doc *>(s);
1519 
1520       if (doc->magic != DOC_MAGIC || doc->sync_serial != last_sync_serial) {
1521         if (doc->magic == DOC_MAGIC) {
1522           if (doc->sync_serial > header->sync_serial) {
1523             max_sync_serial = doc->sync_serial;
1524           }
1525 
          /*
             doc->magic == DOC_MAGIC, but doc->sync_serial != last_sync_serial.
             This can happen in the following situations:

             1. We are starting off recovery. In this case
             last_sync_serial == header->sync_serial, but doc->sync_serial
             can be anywhere in the range (0, header->sync_serial + 1].
             If so, update last_sync_serial and continue.

             2. A dir sync started while documents were being written to the
             aggregation buffer, so the doc->sync_serial went up. If
             doc->sync_serial is greater than the last sync serial and less
             than (header->sync_serial + 2), then continue.

             3. If the position we are recovering from is within AGG_SIZE of
             the end of the disk, then we can't trust this document. The
             aggregation buffer might have been larger than the remaining
             space at the end and we may have decided to wrap around instead
             of writing anything at that point. In this case, wrap around and
             start from the beginning.

             If none of these 3 cases applies, then we are indeed done.
           */
1550 
1551           // case 1
1552           // case 2
1553           if (doc->sync_serial > last_sync_serial && doc->sync_serial <= header->sync_serial + 1) {
1554             last_sync_serial = doc->sync_serial;
1555             s += round_to_approx_size(doc->len);
1556             continue;
1557           }
1558           // case 3 - we have already recovered some data and
1559           // (doc->sync_serial < last_sync_serial) ||
1560           // (doc->sync_serial > header->sync_serial + 1).
1561           // if we are too close to the end, wrap around
1562           else if (recover_pos - (e - s) > (skip + len) - AGG_SIZE) {
1563             recover_wrapped     = true;
1564             recover_pos         = start;
1565             io.aiocb.aio_nbytes = RECOVERY_SIZE;
1566 
1567             break;
1568           }
1569           // we are done. This doc was written in the earlier phase
1570           recover_pos -= e - s;
1571           goto Ldone;
1572         } else {
1573           // doc->magic != DOC_MAGIC
1574           // If we are in the danger zone - recover_pos is within AGG_SIZE
1575           // from the end, then wrap around
1576           recover_pos -= e - s;
1577           if (recover_pos > (skip + len) - AGG_SIZE) {
1578             recover_wrapped     = true;
1579             recover_pos         = start;
1580             io.aiocb.aio_nbytes = RECOVERY_SIZE;
1581 
1582             break;
1583           }
          // we are not in the danger zone
1585           goto Ldone;
1586         }
1587       }
1588       // doc->magic == DOC_MAGIC && doc->sync_serial == last_sync_serial
1589       last_write_serial = doc->write_serial;
1590       s += round_to_approx_size(doc->len);
1591     }
1592 
    /* if (s > e) then we have gone through RECOVERY_SIZE; we need to
       read more data off disk and continue recovering */
1595     if (s >= e) {
1596       /* In the last iteration, we increment s by doc->len...need to undo
1597          that change */
1598       if (s > e) {
1599         s -= round_to_approx_size(doc->len);
1600       }
1601       recover_pos -= e - s;
1602       if (recover_pos >= skip + len) {
1603         recover_wrapped = true;
1604         recover_pos     = start;
1605       }
1606       io.aiocb.aio_nbytes = RECOVERY_SIZE;
1607       if (static_cast<off_t>(recover_pos + io.aiocb.aio_nbytes) > static_cast<off_t>(skip + len)) {
1608         io.aiocb.aio_nbytes = (skip + len) - recover_pos;
1609       }
1610     }
1611   }
  if (recover_pos == prev_recover_pos) { // this should never happen, but if it does, break the loop
1613     goto Lclear;
1614   }
1615   prev_recover_pos    = recover_pos;
1616   io.aiocb.aio_offset = recover_pos;
1617   ink_assert(ink_aio_read(&io));
1618   return EVENT_CONT;
1619 
1620 Ldone : {
1621   /* if we come back to the starting position, then we don't have to recover anything */
1622   if (recover_pos == header->write_pos && recover_wrapped) {
1623     SET_HANDLER(&Vol::handle_recover_write_dir);
1624     if (is_debug_tag_set("cache_init")) {
1625       Note("recovery wrapped around. nothing to clear\n");
1626     }
1627     return handle_recover_write_dir(EVENT_IMMEDIATE, nullptr);
1628   }
1629 
1630   recover_pos += EVACUATION_SIZE; // safely cover the max write size
1631   if (recover_pos < header->write_pos && (recover_pos + EVACUATION_SIZE >= header->write_pos)) {
1632     Debug("cache_init", "Head Pos: %" PRIu64 ", Rec Pos: %" PRIu64 ", Wrapped:%d", header->write_pos, recover_pos, recover_wrapped);
1633     Warning("no valid directory found while recovering '%s', clearing", hash_text.get());
1634     goto Lclear;
1635   }
1636 
1637   if (recover_pos > skip + len) {
1638     recover_pos -= skip + len;
1639   }
  // bump the sync number so it is different from that in the Doc structs
  uint32_t next_sync_serial = max_sync_serial + 1;
  // make sure that the next sync does not overwrite our good copy!
  if (!(header->sync_serial & 1) == !(next_sync_serial & 1)) {
    next_sync_serial++;
  }
  // clear the affected portion of the cache
1647   off_t clear_start = this->offset_to_vol_offset(header->write_pos);
1648   off_t clear_end   = this->offset_to_vol_offset(recover_pos);
1649   if (clear_start <= clear_end) {
1650     dir_clear_range(clear_start, clear_end, this);
1651   } else {
1652     dir_clear_range(clear_start, DIR_OFFSET_MAX, this);
1653     dir_clear_range(1, clear_end, this);
1654   }
1655 
1656   Note("recovery clearing offsets of Vol %s : [%" PRIu64 ", %" PRIu64 "] sync_serial %d next %d\n", hash_text.get(),
1657        header->write_pos, recover_pos, header->sync_serial, next_sync_serial);
1658 
1659   footer->sync_serial = header->sync_serial = next_sync_serial;
1660 
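  // Queue three chained AIO writes to push the repaired directory back to disk:
  // the header, the directory body, and the footer of whichever copy (A or B)
  // the parity of the new sync_serial selects.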
1661   for (int i = 0; i < 3; i++) {
1662     AIOCallback *aio      = &(init_info->vol_aio[i]);
1663     aio->aiocb.aio_fildes = fd;
1664     aio->action           = this;
1665     aio->thread           = AIO_CALLBACK_THREAD_ANY;
1666     aio->then             = (i < 2) ? &(init_info->vol_aio[i + 1]) : nullptr;
1667   }
1668   int footerlen = ROUND_TO_STORE_BLOCK(sizeof(VolHeaderFooter));
1669   size_t dirlen = this->dirlen();
1670   int B         = header->sync_serial & 1;
1671   off_t ss      = skip + (B ? dirlen : 0);
1672 
1673   init_info->vol_aio[0].aiocb.aio_buf    = raw_dir;
1674   init_info->vol_aio[0].aiocb.aio_nbytes = footerlen;
1675   init_info->vol_aio[0].aiocb.aio_offset = ss;
1676   init_info->vol_aio[1].aiocb.aio_buf    = raw_dir + footerlen;
1677   init_info->vol_aio[1].aiocb.aio_nbytes = dirlen - 2 * footerlen;
1678   init_info->vol_aio[1].aiocb.aio_offset = ss + footerlen;
1679   init_info->vol_aio[2].aiocb.aio_buf    = raw_dir + dirlen - footerlen;
1680   init_info->vol_aio[2].aiocb.aio_nbytes = footerlen;
1681   init_info->vol_aio[2].aiocb.aio_offset = ss + dirlen - footerlen;
1682 
1683   SET_HANDLER(&Vol::handle_recover_write_dir);
1684 #if AIO_MODE == AIO_MODE_NATIVE
1685   ink_assert(ink_aio_writev(init_info->vol_aio));
1686 #else
1687   ink_assert(ink_aio_write(init_info->vol_aio));
1688 #endif
1689   return EVENT_CONT;
1690 }
1691 
1692 Lclear:
1693   free(static_cast<char *>(io.aiocb.aio_buf));
1694   delete init_info;
1695   init_info = nullptr;
1696   clear_dir();
1697   return EVENT_CONT;
1698 }
1699 
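// The repaired directory has been written out (or there was nothing to write):
// release the recovery buffer, reset I/O state, start the periodic scan from
// the current write position, and move on to dir_init_done.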
1700 int
Vol::handle_recover_write_dir(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED */)
1702 {
1703   if (io.aiocb.aio_buf) {
1704     free(static_cast<char *>(io.aiocb.aio_buf));
1705   }
1706   delete init_info;
1707   init_info = nullptr;
1708   set_io_not_in_progress();
1709   scan_pos = header->write_pos;
1710   periodic_scan();
1711   SET_HANDLER(&Vol::dir_init_done);
1712   return dir_init_done(EVENT_IMMEDIATE, nullptr);
1713 }
1714 
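// All four header/footer copies (A header, A footer, B header, B footer) have
// been read. Pick the directory copy whose header and footer agree on
// sync_serial, preferring the more recently synced one, and read it in; if
// neither copy is consistent, the directory is cleared.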
1715 int
Vol::handle_header_read(int event, void *data)
1717 {
1718   AIOCallback *op;
1719   VolHeaderFooter *hf[4];
1720   switch (event) {
1721   case AIO_EVENT_DONE:
1722     op = static_cast<AIOCallback *>(data);
1723     for (auto &i : hf) {
1724       ink_assert(op != nullptr);
1725       i = static_cast<VolHeaderFooter *>(op->aiocb.aio_buf);
1726       if (static_cast<size_t>(op->aio_result) != op->aiocb.aio_nbytes) {
1727         Note("Header read failed: clearing cache directory %s", this->hash_text.get());
1728         clear_dir();
1729         return EVENT_DONE;
1730       }
1731       op = op->then;
1732     }
1733 
1734     io.aiocb.aio_fildes = fd;
1735     io.aiocb.aio_nbytes = this->dirlen();
1736     io.aiocb.aio_buf    = raw_dir;
1737     io.action           = this;
1738     io.thread           = AIO_CALLBACK_THREAD_ANY;
1739     io.then             = nullptr;
1740 
1741     if (hf[0]->sync_serial == hf[1]->sync_serial &&
1742         (hf[0]->sync_serial >= hf[2]->sync_serial || hf[2]->sync_serial != hf[3]->sync_serial)) {
1743       SET_HANDLER(&Vol::handle_dir_read);
1744       if (is_debug_tag_set("cache_init")) {
1745         Note("using directory A for '%s'", hash_text.get());
1746       }
1747       io.aiocb.aio_offset = skip;
1748       ink_assert(ink_aio_read(&io));
1749     }
1750     // try B
1751     else if (hf[2]->sync_serial == hf[3]->sync_serial) {
1752       SET_HANDLER(&Vol::handle_dir_read);
1753       if (is_debug_tag_set("cache_init")) {
1754         Note("using directory B for '%s'", hash_text.get());
1755       }
1756       io.aiocb.aio_offset = skip + this->dirlen();
1757       ink_assert(ink_aio_read(&io));
1758     } else {
1759       Note("no good directory, clearing '%s' since sync_serials on both A and B copies are invalid", hash_text.get());
1760       Note("Header A: %d\nFooter A: %d\n Header B: %d\n Footer B %d\n", hf[0]->sync_serial, hf[1]->sync_serial, hf[2]->sync_serial,
1761            hf[3]->sync_serial);
1762       clear_dir();
1763       delete init_info;
1764       init_info = nullptr;
1765     }
1766     return EVENT_DONE;
1767   default:
1768     ink_assert(!"not reach here");
1769   }
1770   return EVENT_DONE;
1771 }
1772 
1773 int
Vol::dir_init_done(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED */)
1775 {
1776   if (!cache->cache_read_done) {
1777     eventProcessor.schedule_in(this, HRTIME_MSECONDS(5), ET_CALL);
1778     return EVENT_CONT;
1779   } else {
1780     int vol_no = gnvol++;
1781     ink_assert(!gvol[vol_no]);
1782     gvol[vol_no] = this;
1783     SET_HANDLER(&Vol::aggWrite);
1784     if (fd == -1) {
1785       cache->vol_initialized(false);
1786     } else {
1787       cache->vol_initialized(true);
1788     }
1789     return EVENT_DONE;
1790   }
1791 }
1792 
1793 // explicit pair for random table in build_vol_hash_table
1794 struct rtable_pair {
1795   unsigned int rval; ///< relative value, used to sort.
1796   unsigned int idx;  ///< volume mapping table index.
1797 };
1798 
1799 // comparison operator for random table in build_vol_hash_table
1800 // sorts based on the randomly assigned rval
1801 static int
cmprtable(const void *aa, const void *bb)
1803 {
1804   rtable_pair *a = (rtable_pair *)aa;
1805   rtable_pair *b = (rtable_pair *)bb;
1806   if (a->rval < b->rval) {
1807     return -1;
1808   }
1809   if (a->rval > b->rval) {
1810     return 1;
1811   }
1812   return 0;
1813 }
1814 
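// Build the host record's volume hash table. Each usable (non-bad) volume is
// given a share of the VOL_HASH_TABLE_SIZE buckets roughly proportional to its
// size, using a pseudo-random sequence seeded from the volume's hash id so the
// bucket assignment is deterministic for a given set of volumes.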
1815 void
build_vol_hash_table(CacheHostRecord *cp)
1817 {
1818   int num_vols          = cp->num_vols;
1819   unsigned int *mapping = static_cast<unsigned int *>(ats_malloc(sizeof(unsigned int) * num_vols));
1820   Vol **p               = static_cast<Vol **>(ats_malloc(sizeof(Vol *) * num_vols));
1821 
1822   memset(mapping, 0, num_vols * sizeof(unsigned int));
1823   memset(p, 0, num_vols * sizeof(Vol *));
1824   uint64_t total = 0;
1825   int bad_vols   = 0;
1826   int map        = 0;
1827   uint64_t used  = 0;
1828   // initialize number of elements per vol
1829   for (int i = 0; i < num_vols; i++) {
1830     if (DISK_BAD(cp->vols[i]->disk)) {
1831       bad_vols++;
1832       continue;
1833     }
1834     mapping[map] = i;
1835     p[map++]     = cp->vols[i];
1836     total += (cp->vols[i]->len >> STORE_BLOCK_SHIFT);
1837   }
1838 
1839   num_vols -= bad_vols;
1840 
1841   if (!num_vols || !total) {
    // all the disks are corrupt; no usable volume remains, so drop the hash table
1843     if (cp->vol_hash_table) {
1844       new_Freer(cp->vol_hash_table, CACHE_MEM_FREE_TIMEOUT);
1845     }
1846     cp->vol_hash_table = nullptr;
1847     ats_free(mapping);
1848     ats_free(p);
1849     return;
1850   }
1851 
1852   unsigned int *forvol   = static_cast<unsigned int *>(ats_malloc(sizeof(unsigned int) * num_vols));
1853   unsigned int *gotvol   = static_cast<unsigned int *>(ats_malloc(sizeof(unsigned int) * num_vols));
1854   unsigned int *rnd      = static_cast<unsigned int *>(ats_malloc(sizeof(unsigned int) * num_vols));
1855   unsigned short *ttable = static_cast<unsigned short *>(ats_malloc(sizeof(unsigned short) * VOL_HASH_TABLE_SIZE));
1856   unsigned short *old_table;
1857   unsigned int *rtable_entries = static_cast<unsigned int *>(ats_malloc(sizeof(unsigned int) * num_vols));
1858   unsigned int rtable_size     = 0;
1859 
1860   // estimate allocation
1861   for (int i = 0; i < num_vols; i++) {
1862     forvol[i] = (VOL_HASH_TABLE_SIZE * (p[i]->len >> STORE_BLOCK_SHIFT)) / total;
1863     used += forvol[i];
1864     rtable_entries[i] = p[i]->len / VOL_HASH_ALLOC_SIZE;
1865     rtable_size += rtable_entries[i];
1866     gotvol[i] = 0;
1867   }
1868   // spread around the excess
1869   int extra = VOL_HASH_TABLE_SIZE - used;
1870   for (int i = 0; i < extra; i++) {
1871     forvol[i % num_vols]++;
1872   }
1873   // seed random number generator
1874   for (int i = 0; i < num_vols; i++) {
1875     uint64_t x = p[i]->hash_id.fold();
1876     rnd[i]     = static_cast<unsigned int>(x);
1877   }
1878   // initialize table to "empty"
1879   for (int i = 0; i < VOL_HASH_TABLE_SIZE; i++) {
1880     ttable[i] = VOL_HASH_EMPTY;
1881   }
1882   // generate random numbers proportional to allocation
1883   rtable_pair *rtable = static_cast<rtable_pair *>(ats_malloc(sizeof(rtable_pair) * rtable_size));
1884   int rindex          = 0;
1885   for (int i = 0; i < num_vols; i++) {
1886     for (int j = 0; j < static_cast<int>(rtable_entries[i]); j++) {
1887       rtable[rindex].rval = next_rand(&rnd[i]);
1888       rtable[rindex].idx  = i;
1889       rindex++;
1890     }
1891   }
1892   ink_assert(rindex == (int)rtable_size);
  // sort the (rand #, vol #) pairs
1894   qsort(rtable, rtable_size, sizeof(rtable_pair), cmprtable);
1895   unsigned int width = (1LL << 32) / VOL_HASH_TABLE_SIZE;
1896   unsigned int pos; // target position to allocate
1897   // select vol with closest random number for each bucket
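  // Illustrative only: if VOL_HASH_TABLE_SIZE were 32768, width would be
  // 2^32 / 32768 = 131072, and bucket j would go to the volume owning the
  // random value nearest to width / 2 + j * width; volumes with more rtable
  // entries (i.e. more space) therefore win proportionally more buckets.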
1898   int i = 0; // index moving through the random numbers
1899   for (int j = 0; j < VOL_HASH_TABLE_SIZE; j++) {
1900     pos = width / 2 + j * width; // position to select closest to
1901     while (pos > rtable[i].rval && i < static_cast<int>(rtable_size) - 1) {
1902       i++;
1903     }
1904     ttable[j] = mapping[rtable[i].idx];
1905     gotvol[rtable[i].idx]++;
1906   }
1907   for (int i = 0; i < num_vols; i++) {
1908     Debug("cache_init", "build_vol_hash_table index %d mapped to %d requested %d got %d", i, mapping[i], forvol[i], gotvol[i]);
1909   }
1910   // install new table
1911   if (nullptr != (old_table = ink_atomic_swap(&(cp->vol_hash_table), ttable))) {
1912     new_Freer(old_table, CACHE_MEM_FREE_TIMEOUT);
1913   }
1914   ats_free(mapping);
1915   ats_free(p);
1916   ats_free(forvol);
1917   ats_free(gotvol);
1918   ats_free(rnd);
1919   ats_free(rtable_entries);
1920   ats_free(rtable);
1921 }
1922 
1923 void
Cache::vol_initialized(bool result)
1925 {
1926   if (result) {
1927     ink_atomic_increment(&total_good_nvol, 1);
1928   }
1929   if (total_nvol == ink_atomic_increment(&total_initialized_vol, 1) + 1) {
1930     open_done();
1931   }
1932 }
1933 
1934 /** Set the state of a disk programmatically.
1935  */
1936 bool
CacheProcessor::mark_storage_offline(CacheDisk *d, ///< Target disk
1938                                      bool admin)
1939 {
1940   bool zret; // indicates whether there's any online storage left.
1941   int p;
1942   uint64_t total_bytes_delete = 0;
1943   uint64_t total_dir_delete   = 0;
1944   uint64_t used_dir_delete    = 0;
1945 
1946   /* Don't mark it again, it will invalidate the stats! */
1947   if (!d->online) {
1948     return this->has_online_storage();
1949   }
1950 
1951   d->online = false;
1952 
1953   if (!DISK_BAD(d)) {
1954     SET_DISK_BAD(d);
1955   }
1956 
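  // Deduct the capacity and directory-entry statistics contributed by every
  // volume that lives on the disk being taken offline.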
1957   for (p = 0; p < gnvol; p++) {
1958     if (d->fd == gvol[p]->fd) {
1959       total_dir_delete += gvol[p]->buckets * gvol[p]->segments * DIR_DEPTH;
1960       used_dir_delete += dir_entries_used(gvol[p]);
1961       total_bytes_delete += gvol[p]->len - gvol[p]->dirlen();
1962     }
1963   }
1964 
1965   RecIncrGlobalRawStat(cache_rsb, cache_bytes_total_stat, -total_bytes_delete);
1966   RecIncrGlobalRawStat(cache_rsb, cache_direntries_total_stat, -total_dir_delete);
1967   RecIncrGlobalRawStat(cache_rsb, cache_direntries_used_stat, -used_dir_delete);
1968 
1969   /* Update the span metrics, if failing then move the span from "failing" to "offline" bucket
1970    * if operator took it offline, move it from "online" to "offline" bucket */
1971   RecIncrGlobalRawStat(cache_rsb, admin ? cache_span_online_stat : cache_span_failing_stat, -1);
1972   RecIncrGlobalRawStat(cache_rsb, cache_span_offline_stat, 1);
1973 
1974   if (theCache) {
1975     rebuild_host_table(theCache);
1976   }
1977 
1978   zret = this->has_online_storage();
1979   if (!zret) {
1980     Warning("All storage devices offline, cache disabled");
1981     CacheProcessor::cache_ready = 0;
1982   } else { // check cache types specifically
1983     if (theCache && !theCache->hosttable->gen_host_rec.vol_hash_table) {
1984       unsigned int caches_ready = 0;
1985       caches_ready              = caches_ready | (1 << CACHE_FRAG_TYPE_HTTP);
1986       caches_ready              = caches_ready | (1 << CACHE_FRAG_TYPE_NONE);
1987       caches_ready              = ~caches_ready;
1988       CacheProcessor::cache_ready &= caches_ready;
1989       Warning("all volumes for http cache are corrupt, http cache disabled");
1990     }
1991   }
1992 
1993   return zret;
1994 }
1995 
1996 bool
CacheProcessor::has_online_storage() const
1998 {
1999   CacheDisk **dptr = gdisks;
2000   for (int disk_no = 0; disk_no < gndisks; ++disk_no, ++dptr) {
2001     if (!DISK_BAD(*dptr) && (*dptr)->online) {
2002       return true;
2003     }
2004   }
2005   return false;
2006 }
2007 
2008 int
AIO_Callback_handler::handle_disk_failure(int /* event ATS_UNUSED */, void *data)
2010 {
2011   /* search for the matching file descriptor */
2012   if (!CacheProcessor::cache_ready) {
2013     return EVENT_DONE;
2014   }
2015   int disk_no     = 0;
2016   AIOCallback *cb = static_cast<AIOCallback *>(data);
2017 
2018   for (; disk_no < gndisks; disk_no++) {
2019     CacheDisk *d = gdisks[disk_no];
2020 
2021     if (d->fd == cb->aiocb.aio_fildes) {
2022       char message[256];
2023       d->incrErrors(cb);
2024 
2025       if (!DISK_BAD(d)) {
2026         snprintf(message, sizeof(message), "Error accessing Disk %s [%d/%d]", d->path, d->num_errors, cache_config_max_disk_errors);
2027         Warning("%s", message);
2028         RecSignalManager(REC_SIGNAL_CACHE_WARNING, message);
2029       } else if (!DISK_BAD_SIGNALLED(d)) {
2030         snprintf(message, sizeof(message), "too many errors accessing disk %s [%d/%d]: declaring disk bad", d->path, d->num_errors,
2031                  cache_config_max_disk_errors);
2032         Warning("%s", message);
2033         RecSignalManager(REC_SIGNAL_CACHE_ERROR, message);
2034         cacheProcessor.mark_storage_offline(d); // take it out of service
2035       }
2036       break;
2037     }
2038   }
2039 
2040   delete cb;
2041   return EVENT_DONE;
2042 }
2043 
2044 int
Cache::open_done()
2046 {
2047   Action *register_ShowCache(Continuation * c, HTTPHdr * h);
2048   Action *register_ShowCacheInternal(Continuation * c, HTTPHdr * h);
2049   statPagesManager.register_http("cache", register_ShowCache);
2050   statPagesManager.register_http("cache-internal", register_ShowCacheInternal);
2051 
2052   if (total_good_nvol == 0) {
2053     ready = CACHE_INIT_FAILED;
2054     cacheProcessor.cacheInitialized();
2055     return 0;
2056   }
2057 
2058   hosttable = new CacheHostTable(this, scheme);
2059   hosttable->register_config_callback(&hosttable);
2060 
2061   if (hosttable->gen_host_rec.num_cachevols == 0) {
2062     ready = CACHE_INIT_FAILED;
2063   } else {
2064     ready = CACHE_INITIALIZED;
2065   }
2066 
2067   // TS-3848
2068   if (ready == CACHE_INIT_FAILED && cacheProcessor.waitForCache() >= 2) {
2069     Emergency("Failed to initialize cache host table");
2070   }
2071 
2072   cacheProcessor.cacheInitialized();
2073 
2074   return 0;
2075 }
2076 
2077 int
Cache::open(bool clear, bool /* fix ATS_UNUSED */)
2079 {
2080   int i;
2081   off_t blocks          = 0;
2082   cache_read_done       = 0;
2083   total_initialized_vol = 0;
2084   total_nvol            = 0;
2085   total_good_nvol       = 0;
2086 
2087   REC_EstablishStaticConfigInt32(cache_config_min_average_object_size, "proxy.config.cache.min_average_object_size");
2088   Debug("cache_init", "Cache::open - proxy.config.cache.min_average_object_size = %d", (int)cache_config_min_average_object_size);
2089 
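  // Walk the cache volume list and create a Vol for every disk volume block of
  // this scheme on every good disk. With native AIO the Vol is initialized via
  // a scheduled VolInit continuation; otherwise it is initialized inline.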
2090   CacheVol *cp = cp_list.head;
2091   for (; cp; cp = cp->link.next) {
2092     if (cp->scheme == scheme) {
2093       cp->vols   = static_cast<Vol **>(ats_malloc(cp->num_vols * sizeof(Vol *)));
2094       int vol_no = 0;
2095       for (i = 0; i < gndisks; i++) {
2096         if (cp->disk_vols[i] && !DISK_BAD(cp->disk_vols[i]->disk)) {
2097           DiskVolBlockQueue *q = cp->disk_vols[i]->dpb_queue.head;
2098           for (; q; q = q->link.next) {
2099             cp->vols[vol_no]            = new Vol();
2100             CacheDisk *d                = cp->disk_vols[i]->disk;
2101             cp->vols[vol_no]->disk      = d;
2102             cp->vols[vol_no]->fd        = d->fd;
2103             cp->vols[vol_no]->cache     = this;
2104             cp->vols[vol_no]->cache_vol = cp;
2105             blocks                      = q->b->len;
2106 
2107             bool vol_clear = clear || d->cleared || q->new_block;
2108 #if AIO_MODE == AIO_MODE_NATIVE
2109             eventProcessor.schedule_imm(new VolInit(cp->vols[vol_no], d->path, blocks, q->b->offset, vol_clear));
2110 #else
2111             cp->vols[vol_no]->init(d->path, blocks, q->b->offset, vol_clear);
2112 #endif
2113             vol_no++;
2114             cache_size += blocks;
2115           }
2116         }
2117       }
2118       total_nvol += vol_no;
2119     }
2120   }
2121   if (total_nvol == 0) {
2122     return open_done();
2123   }
2124   cache_read_done = 1;
2125   return 0;
2126 }
2127 
2128 int
Cache::close()
2130 {
2131   return -1;
2132 }
2133 
2134 int
CacheVC::dead(int /* event ATS_UNUSED */, Event * /*e ATS_UNUSED */)
2136 {
2137   ink_assert(0);
2138   return EVENT_DONE;
2139 }
2140 
2141 bool
CacheVC::is_pread_capable()
2143 {
2144   return !f.read_from_writer_called;
2145 }
2146 
2147 #define STORE_COLLISION 1
2148 
2149 static void
unmarshal_helper(Doc *doc, Ptr<IOBufferData> &buf, int &okay)
2151 {
2152   using UnmarshalFunc           = int(char *buf, int len, RefCountObj *block_ref);
2153   UnmarshalFunc *unmarshal_func = &HTTPInfo::unmarshal;
2154   ts::VersionNumber version(doc->v_major, doc->v_minor);
2155 
2156   // introduced by https://github.com/apache/trafficserver/pull/4874, this is used to distinguish the doc version
2157   // before and after #4847
2158   if (version < CACHE_DB_VERSION) {
2159     unmarshal_func = &HTTPInfo::unmarshal_v24_1;
2160   }
2161 
2162   char *tmp = doc->hdr();
2163   int len   = doc->hlen;
2164   while (len > 0) {
2165     int r = unmarshal_func(tmp, len, buf.get());
2166     if (r < 0) {
2167       ink_assert(!"CacheVC::handleReadDone unmarshal failed");
2168       okay = 0;
2169       break;
2170     }
2171     len -= r;
2172     tmp += r;
2173   }
2174 }
2175 
// [amc] I think this is where all disk reads from the cache funnel through.
2177 int
CacheVC::handleReadDone(int event, Event *e)
2179 {
2180   cancel_trigger();
2181   ink_assert(this_ethread() == mutex->thread_holding);
2182 
2183   Doc *doc = nullptr;
2184   if (event == AIO_EVENT_DONE) {
2185     set_io_not_in_progress();
2186   } else if (is_io_in_progress()) {
2187     return EVENT_CONT;
2188   }
2189   {
2190     MUTEX_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
2191     if (!lock.is_locked()) {
2192       VC_SCHED_LOCK_RETRY();
2193     }
2194     if ((!dir_valid(vol, &dir)) || (!io.ok())) {
2195       if (!io.ok()) {
2196         Debug("cache_disk_error", "Read error on disk %s\n \
2197 	    read range : [%" PRIu64 " - %" PRIu64 " bytes]  [%" PRIu64 " - %" PRIu64 " blocks] \n",
2198               vol->hash_text.get(), (uint64_t)io.aiocb.aio_offset, (uint64_t)io.aiocb.aio_offset + io.aiocb.aio_nbytes,
2199               (uint64_t)io.aiocb.aio_offset / 512, (uint64_t)(io.aiocb.aio_offset + io.aiocb.aio_nbytes) / 512);
2200       }
2201       goto Ldone;
2202     }
2203 
2204     doc = reinterpret_cast<Doc *>(buf->data());
2205     ink_assert(vol->mutex->nthread_holding < 1000);
2206     ink_assert(doc->magic == DOC_MAGIC);
2207 
2208     if (ts::VersionNumber(doc->v_major, doc->v_minor) > CACHE_DB_VERSION) {
2209       // future version, count as corrupted
2210       doc->magic = DOC_CORRUPT;
2211       Debug("cache_bc", "Object is future version %d:%d - disk %s - doc id = %" PRIx64 ":%" PRIx64 "", doc->v_major, doc->v_minor,
2212             vol->hash_text.get(), read_key->slice64(0), read_key->slice64(1));
2213       goto Ldone;
2214     }
2215 
2216 #ifdef VERIFY_JTEST_DATA
2217     char xx[500];
2218     if (read_key && *read_key == doc->key && request.valid() && !dir_head(&dir) && !vio.ndone) {
2219       int ib = 0, xd = 0;
2220       request.url_get()->print(xx, 500, &ib, &xd);
2221       char *x = xx;
2222       for (int q = 0; q < 3; q++)
2223         x = strchr(x + 1, '/');
2224       ink_assert(!memcmp(doc->data(), x, ib - (x - xx)));
2225     }
2226 #endif
2227 
2228     if (is_debug_tag_set("cache_read")) {
2229       char xt[CRYPTO_HEX_SIZE];
2230       Debug("cache_read", "Read complete on fragment %s. Length: data payload=%d this fragment=%d total doc=%" PRId64 " prefix=%d",
2231             doc->key.toHexStr(xt), doc->data_len(), doc->len, doc->total_len, doc->prefix_len());
2232     }
2233 
2234     // put into ram cache?
2235     if (io.ok() && ((doc->first_key == *read_key) || (doc->key == *read_key) || STORE_COLLISION) && doc->magic == DOC_MAGIC) {
2236       int okay = 1;
2237       if (!f.doc_from_ram_cache) {
2238         f.not_from_ram_cache = 1;
2239       }
2240       if (cache_config_enable_checksum && doc->checksum != DOC_NO_CHECKSUM) {
2241         // verify that the checksum matches
2242         uint32_t checksum = 0;
2243         for (char *b = doc->hdr(); b < reinterpret_cast<char *>(doc) + doc->len; b++) {
2244           checksum += *b;
2245         }
2246         ink_assert(checksum == doc->checksum);
2247         if (checksum != doc->checksum) {
2248           Note("cache: checksum error for [%" PRIu64 " %" PRIu64 "] len %d, hlen %d, disk %s, offset %" PRIu64 " size %zu",
2249                doc->first_key.b[0], doc->first_key.b[1], doc->len, doc->hlen, vol->path, (uint64_t)io.aiocb.aio_offset,
2250                (size_t)io.aiocb.aio_nbytes);
2251           doc->magic = DOC_CORRUPT;
2252           okay       = 0;
2253         }
2254       }
2255       (void)e; // Avoid compiler warnings
2256       bool http_copy_hdr = false;
2257       http_copy_hdr =
2258         cache_config_ram_cache_compress && !f.doc_from_ram_cache && doc->doc_type == CACHE_FRAG_TYPE_HTTP && doc->hlen;
2259       // If http doc we need to unmarshal the headers before putting in the ram cache
2260       // unless it could be compressed
2261       if (!http_copy_hdr && doc->doc_type == CACHE_FRAG_TYPE_HTTP && doc->hlen && okay) {
2262         unmarshal_helper(doc, buf, okay);
2263       }
      // Put the request in the ram cache only if it's an open_read or lookup
2265       if (vio.op == VIO::READ && okay) {
2266         bool cutoff_check;
        // cutoff_check:
        // doc_len == 0 for the first fragment (it is set from the vector),
        //                so the decision on the first fragment is based on
        //                doc->total_len.
        // After that, the decision is based on doc_len (doc_len != 0).
        // (cache_config_ram_cache_cutoff == 0) means no cutoff at all.
2273         cutoff_check =
2274           ((!doc_len && static_cast<int64_t>(doc->total_len) < cache_config_ram_cache_cutoff) ||
2275            (doc_len && static_cast<int64_t>(doc_len) < cache_config_ram_cache_cutoff) || !cache_config_ram_cache_cutoff);
2276         if (cutoff_check && !f.doc_from_ram_cache) {
2277           uint64_t o = dir_offset(&dir);
2278           vol->ram_cache->put(read_key, buf.get(), doc->len, http_copy_hdr, static_cast<uint32_t>(o >> 32),
2279                               static_cast<uint32_t>(o));
2280         }
2281         if (!doc_len) {
          // Keep a pointer to it. In case the state machine decides to
          // update this document, we don't have to read it back into
          // memory again.
2285           vol->first_fragment_key    = *read_key;
2286           vol->first_fragment_offset = dir_offset(&dir);
2287           vol->first_fragment_data   = buf;
2288         }
2289       } // end VIO::READ check
2290       // If it could be compressed, unmarshal after
2291       if (http_copy_hdr && doc->doc_type == CACHE_FRAG_TYPE_HTTP && doc->hlen && okay) {
2292         unmarshal_helper(doc, buf, okay);
2293       }
2294     } // end io.ok() check
2295   }
2296 Ldone:
2297   POP_HANDLER;
2298   return handleEvent(AIO_EVENT_DONE, nullptr);
2299 }
2300 
2301 int
CacheVC::handleRead(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
2303 {
2304   cancel_trigger();
2305 
2306   f.doc_from_ram_cache = false;
2307 
2308   // check ram cache
2309   ink_assert(vol->mutex->thread_holding == this_ethread());
2310   int64_t o           = dir_offset(&dir);
2311   int ram_hit_state   = vol->ram_cache->get(read_key, &buf, static_cast<uint32_t>(o >> 32), static_cast<uint32_t>(o));
2312   f.compressed_in_ram = (ram_hit_state > RAM_HIT_COMPRESS_NONE) ? 1 : 0;
2313   if (ram_hit_state >= RAM_HIT_COMPRESS_NONE) {
2314     goto LramHit;
2315   }
2316 
2317   // check if it was read in the last open_read call
2318   if (*read_key == vol->first_fragment_key && dir_offset(&dir) == vol->first_fragment_offset) {
2319     buf = vol->first_fragment_data;
2320     goto LmemHit;
2321   }
  // see if it's in the aggregation buffer
2323   if (dir_agg_buf_valid(vol, &dir)) {
2324     int agg_offset = vol->vol_offset(&dir) - vol->header->write_pos;
2325     buf            = new_IOBufferData(iobuffer_size_to_index(io.aiocb.aio_nbytes, MAX_BUFFER_SIZE_INDEX), MEMALIGNED);
2326     ink_assert((agg_offset + io.aiocb.aio_nbytes) <= (unsigned)vol->agg_buf_pos);
2327     char *doc = buf->data();
2328     char *agg = vol->agg_buffer + agg_offset;
2329     memcpy(doc, agg, io.aiocb.aio_nbytes);
2330     io.aio_result = io.aiocb.aio_nbytes;
2331     SET_HANDLER(&CacheVC::handleReadDone);
2332     return EVENT_RETURN;
2333   }
2334 
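  // Not available from the RAM cache or the aggregation buffer: issue an
  // asynchronous disk read for the fragment.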
2335   io.aiocb.aio_fildes = vol->fd;
2336   io.aiocb.aio_offset = vol->vol_offset(&dir);
2337   if (static_cast<off_t>(io.aiocb.aio_offset + io.aiocb.aio_nbytes) > static_cast<off_t>(vol->skip + vol->len)) {
2338     io.aiocb.aio_nbytes = vol->skip + vol->len - io.aiocb.aio_offset;
2339   }
2340   buf              = new_IOBufferData(iobuffer_size_to_index(io.aiocb.aio_nbytes, MAX_BUFFER_SIZE_INDEX), MEMALIGNED);
2341   io.aiocb.aio_buf = buf->data();
2342   io.action        = this;
2343   io.thread        = mutex->thread_holding->tt == DEDICATED ? AIO_CALLBACK_THREAD_ANY : mutex->thread_holding;
2344   SET_HANDLER(&CacheVC::handleReadDone);
2345   ink_assert(ink_aio_read(&io) >= 0);
2346   CACHE_DEBUG_INCREMENT_DYN_STAT(cache_pread_count_stat);
2347   return EVENT_CONT;
2348 
2349 LramHit : {
2350   f.doc_from_ram_cache = true;
2351   io.aio_result        = io.aiocb.aio_nbytes;
2352   Doc *doc             = reinterpret_cast<Doc *>(buf->data());
2353   if (cache_config_ram_cache_compress && doc->doc_type == CACHE_FRAG_TYPE_HTTP && doc->hlen) {
2354     SET_HANDLER(&CacheVC::handleReadDone);
2355     return EVENT_RETURN;
2356   }
2357 }
2358 LmemHit:
2359   f.doc_from_ram_cache = true;
2360   io.aio_result        = io.aiocb.aio_nbytes;
2361   POP_HANDLER;
2362   return EVENT_RETURN; // allow the caller to release the volume lock
2363 }
2364 
2365 Action *
Cache::lookup(Continuation *cont, const CacheKey *key, CacheFragType type, const char *hostname, int host_len)
2367 {
2368   if (!CacheProcessor::IsCacheReady(type)) {
2369     cont->handleEvent(CACHE_EVENT_LOOKUP_FAILED, nullptr);
2370     return ACTION_RESULT_DONE;
2371   }
2372 
2373   Vol *vol          = key_to_vol(key, hostname, host_len);
2374   ProxyMutex *mutex = cont->mutex.get();
2375   CacheVC *c        = new_CacheVC(cont);
2376   SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
2377   c->vio.op    = VIO::READ;
2378   c->base_stat = cache_lookup_active_stat;
2379   CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
2380   c->first_key = c->key = *key;
2381   c->frag_type          = type;
2382   c->f.lookup           = 1;
2383   c->vol                = vol;
2384   c->last_collision     = nullptr;
2385 
2386   if (c->handleEvent(EVENT_INTERVAL, nullptr) == EVENT_CONT) {
2387     return &c->_action;
2388   } else {
2389     return ACTION_RESULT_DONE;
2390   }
2391 }
2392 
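// Remove a document: probe the directory for the key, read each candidate
// fragment to confirm it really is the requested doc, and delete its directory
// entry. If no matching entry is found, the caller gets CACHE_EVENT_REMOVE_FAILED
// with -ECACHE_NO_DOC.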
2393 int
CacheVC::removeEvent(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
2395 {
2396   cancel_trigger();
2397   set_io_not_in_progress();
2398   {
2399     MUTEX_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
2400     if (!lock.is_locked()) {
2401       VC_SCHED_LOCK_RETRY();
2402     }
2403     if (_action.cancelled) {
2404       if (od) {
2405         vol->close_write(this);
2406         od = nullptr;
2407       }
2408       goto Lfree;
2409     }
2410     if (!f.remove_aborted_writers) {
2411       if (vol->open_write(this, true, 1)) {
        // writer exists
2413         od = vol->open_read(&key);
2414         ink_release_assert(od);
2415         od->dont_update_directory = true;
2416         od                        = nullptr;
2417       } else {
2418         od->dont_update_directory = true;
2419       }
2420       f.remove_aborted_writers = 1;
2421     }
2422   Lread:
2423     SET_HANDLER(&CacheVC::removeEvent);
2424     if (!buf) {
2425       goto Lcollision;
2426     }
2427     if (!dir_valid(vol, &dir)) {
2428       last_collision = nullptr;
2429       goto Lcollision;
2430     }
    // check that the read completed correctly. FIXME: remove bad vols
2432     if (static_cast<size_t>(io.aio_result) != io.aiocb.aio_nbytes) {
2433       goto Ldone;
2434     }
2435     {
2436       // verify that this is our document
2437       Doc *doc = reinterpret_cast<Doc *>(buf->data());
2438       /* should be first_key not key..right?? */
2439       if (doc->first_key == key) {
2440         ink_assert(doc->magic == DOC_MAGIC);
2441         if (dir_delete(&key, vol, &dir) > 0) {
2442           if (od) {
2443             vol->close_write(this);
2444           }
2445           od = nullptr;
2446           goto Lremoved;
2447         }
2448         goto Ldone;
2449       }
2450     }
2451   Lcollision:
2452     // check for collision
2453     if (dir_probe(&key, vol, &dir, &last_collision) > 0) {
2454       int ret = do_read_call(&key);
2455       if (ret == EVENT_RETURN) {
2456         goto Lread;
2457       }
2458       return ret;
2459     }
2460   Ldone:
2461     CACHE_INCREMENT_DYN_STAT(cache_remove_failure_stat);
2462     if (od) {
2463       vol->close_write(this);
2464     }
2465   }
2466   ink_assert(!vol || this_ethread() != vol->mutex->thread_holding);
2467   _action.continuation->handleEvent(CACHE_EVENT_REMOVE_FAILED, (void *)-ECACHE_NO_DOC);
2468   goto Lfree;
2469 Lremoved:
2470   _action.continuation->handleEvent(CACHE_EVENT_REMOVE, nullptr);
2471 Lfree:
2472   return free_CacheVC(this);
2473 }
2474 
2475 Action *
Cache::remove(Continuation *cont, const CacheKey *key, CacheFragType type, const char *hostname, int host_len)
2477 {
2478   if (!CacheProcessor::IsCacheReady(type)) {
2479     if (cont) {
2480       cont->handleEvent(CACHE_EVENT_REMOVE_FAILED, nullptr);
2481     }
2482     return ACTION_RESULT_DONE;
2483   }
2484 
2485   Ptr<ProxyMutex> mutex;
2486   if (!cont) {
2487     cont = new_CacheRemoveCont();
2488   }
2489 
2490   CACHE_TRY_LOCK(lock, cont->mutex, this_ethread());
2491   ink_assert(lock.is_locked());
2492   Vol *vol = key_to_vol(key, hostname, host_len);
2493   // coverity[var_decl]
2494   Dir result;
2495   dir_clear(&result); // initialized here, set result empty so we can recognize missed lock
2496   mutex = cont->mutex;
2497 
2498   CacheVC *c   = new_CacheVC(cont);
2499   c->vio.op    = VIO::NONE;
2500   c->frag_type = type;
2501   c->base_stat = cache_remove_active_stat;
2502   CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
2503   c->first_key = c->key = *key;
2504   c->vol                = vol;
2505   c->dir                = result;
2506   c->f.remove           = 1;
2507 
2508   SET_CONTINUATION_HANDLER(c, &CacheVC::removeEvent);
2509   int ret = c->removeEvent(EVENT_IMMEDIATE, nullptr);
2510   if (ret == EVENT_DONE) {
2511     return ACTION_RESULT_DONE;
2512   } else {
2513     return &c->_action;
2514   }
2515 }
2516 // CacheVConnection
2517 
CacheVConnection::CacheVConnection() : VConnection(nullptr) {}
2519 
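// Build cp_list from the on-disk volume headers: DiskVols that share a volume
// number, possibly spread across several disks, are aggregated into a single
// CacheVol entry.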
2520 void
cplist_init()
2522 {
2523   cp_list_len = 0;
2524   for (int i = 0; i < gndisks; i++) {
2525     CacheDisk *d = gdisks[i];
2526     DiskVol **dp = d->disk_vols;
2527     for (unsigned int j = 0; j < d->header->num_volumes; j++) {
2528       ink_assert(dp[j]->dpb_queue.head);
2529       CacheVol *p = cp_list.head;
2530       while (p) {
2531         if (p->vol_number == dp[j]->vol_number) {
2532           ink_assert(p->scheme == (int)dp[j]->dpb_queue.head->b->type);
2533           p->size += dp[j]->size;
2534           p->num_vols += dp[j]->num_volblocks;
2535           p->disk_vols[i] = dp[j];
2536           break;
2537         }
2538         p = p->link.next;
2539       }
2540       if (!p) {
2541         // did not find a volume in the cache vol list...create
2542         // a new one
2543         CacheVol *new_p   = new CacheVol();
2544         new_p->vol_number = dp[j]->vol_number;
2545         new_p->num_vols   = dp[j]->num_volblocks;
2546         new_p->size       = dp[j]->size;
2547         new_p->scheme     = dp[j]->dpb_queue.head->b->type;
2548         new_p->disk_vols  = static_cast<DiskVol **>(ats_malloc(gndisks * sizeof(DiskVol *)));
2549         memset(new_p->disk_vols, 0, gndisks * sizeof(DiskVol *));
2550         new_p->disk_vols[i] = dp[j];
2551         cp_list.enqueue(new_p);
2552         cp_list_len++;
2553       }
2554     }
2555   }
2556 }
2557 
2558 static int fillExclusiveDisks(CacheVol *cp);
2559 
2560 void
cplist_update()
2562 {
2563   /* go through cplist and delete volumes that are not in the volume.config */
2564   CacheVol *cp = cp_list.head;
2565   ConfigVol *config_vol;
2566 
2567   while (cp) {
2568     for (config_vol = config_volumes.cp_queue.head; config_vol; config_vol = config_vol->link.next) {
2569       if (config_vol->number == cp->vol_number) {
2570         if (cp->scheme == config_vol->scheme) {
2571           cp->ramcache_enabled = config_vol->ramcache_enabled;
2572           config_vol->cachep   = cp;
2573         } else {
2574           /* delete this volume from all the disks */
2575           int d_no;
2576           int clearCV = 1;
2577 
2578           for (d_no = 0; d_no < gndisks; d_no++) {
2579             if (cp->disk_vols[d_no]) {
2580               if (cp->disk_vols[d_no]->disk->forced_volume_num == cp->vol_number) {
2581                 clearCV            = 0;
2582                 config_vol->cachep = cp;
2583               } else {
2584                 cp->disk_vols[d_no]->disk->delete_volume(cp->vol_number);
2585                 cp->disk_vols[d_no] = nullptr;
2586               }
2587             }
2588           }
2589           if (clearCV) {
2590             config_vol = nullptr;
2591           }
2592         }
2593         break;
2594       }
2595     }
2596 
2597     if (!config_vol) {
2598       // did not find a matching volume in the config file.
2599       // Delete the volume from the cache vol list
2600       int d_no;
2601       for (d_no = 0; d_no < gndisks; d_no++) {
2602         if (cp->disk_vols[d_no]) {
2603           cp->disk_vols[d_no]->disk->delete_volume(cp->vol_number);
2604         }
2605       }
2606       CacheVol *temp_cp = cp;
2607       cp                = cp->link.next;
2608       cp_list.remove(temp_cp);
2609       cp_list_len--;
2610       delete temp_cp;
2611       continue;
2612     } else {
2613       cp = cp->link.next;
2614     }
2615   }
2616 
2617   // Look for (exclusive) spans forced to a specific volume but not yet referenced by any volumes in cp_list,
2618   // if found then create a new volume. This also makes sure new exclusive disk volumes are created first
2619   // before any other new volumes to assure proper span free space calculation and proper volume block distribution.
2620   for (config_vol = config_volumes.cp_queue.head; config_vol; config_vol = config_vol->link.next) {
2621     if (nullptr == config_vol->cachep) {
2622       // Find out if this is a forced volume assigned exclusively to a span which was cleared (hence not referenced in cp_list).
2623       // Note: non-exclusive cleared spans are not handled here, only the "exclusive"
2624       bool forced_volume = false;
2625       for (int d_no = 0; d_no < gndisks; d_no++) {
2626         if (gdisks[d_no]->forced_volume_num == config_vol->number) {
2627           forced_volume = true;
2628         }
2629       }
2630 
2631       if (forced_volume) {
2632         CacheVol *new_cp = new CacheVol();
2633         if (nullptr != new_cp) {
2634           new_cp->disk_vols = static_cast<decltype(new_cp->disk_vols)>(ats_malloc(gndisks * sizeof(DiskVol *)));
2635           if (nullptr != new_cp->disk_vols) {
2636             memset(new_cp->disk_vols, 0, gndisks * sizeof(DiskVol *));
2637             new_cp->vol_number = config_vol->number;
2638             new_cp->scheme     = config_vol->scheme;
2639             config_vol->cachep = new_cp;
2640             fillExclusiveDisks(config_vol->cachep);
2641             cp_list.enqueue(new_cp);
2642           } else {
2643             delete new_cp;
2644           }
2645         }
2646       }
2647     } else {
2648       // Fill if this is exclusive disk.
2649       fillExclusiveDisks(config_vol->cachep);
2650     }
2651   }
2652 }
2653 
2654 static int
fillExclusiveDisks(CacheVol *cp)
2656 {
2657   int diskCount     = 0;
2658   int volume_number = cp->vol_number;
2659 
2660   Debug("cache_init", "volume %d", volume_number);
2661   for (int i = 0; i < gndisks; i++) {
2662     if (gdisks[i]->forced_volume_num != volume_number) {
2663       continue;
2664     }
2665 
2666     /* OK, this should be an "exclusive" disk (span). */
2667     diskCount++;
2668 
2669     /* There should be a single "forced" volume and no other volumes should exist on this "exclusive" disk (span) */
2670     bool found_nonforced_volumes = false;
2671     for (int j = 0; j < static_cast<int>(gdisks[i]->header->num_volumes); j++) {
2672       if (volume_number != gdisks[i]->disk_vols[j]->vol_number) {
2673         found_nonforced_volumes = true;
2674         break;
2675       }
2676     }
2677 
2678     if (found_nonforced_volumes) {
2679       /* The user had created several volumes before - clear the disk and create one volume for http */
2680       Note("Clearing Disk: %s", gdisks[i]->path);
2681       gdisks[i]->delete_all_volumes();
2682     } else if (1 == gdisks[i]->header->num_volumes) {
2683       /* "Forced" volumes take the whole disk (span) hence nothing more to do for this span. */
2684       continue;
2685     }
2686 
2687     /* Now, volumes have been either deleted or did not exist to begin with so we need to create them. */
2688 
2689     int64_t size_diff = gdisks[i]->num_usable_blocks;
2690     DiskVolBlock *dpb;
2691     do {
2692       dpb = gdisks[i]->create_volume(volume_number, size_diff, cp->scheme);
2693       if (dpb) {
2694         if (!cp->disk_vols[i]) {
2695           cp->disk_vols[i] = gdisks[i]->get_diskvol(volume_number);
2696         }
2697         size_diff -= dpb->len;
2698         cp->size += dpb->len;
2699         cp->num_vols++;
2700       } else {
2701         Debug("cache_init", "create_volume failed");
2702         break;
2703       }
2704     } while ((size_diff > 0));
2705   }
2706 
2707   /* Report back the number of disks (spans) that were assigned to volume specified by volume_number. */
2708   return diskCount;
2709 }
2710 
2711 int
cplist_reconfigure()
2713 {
2714   int64_t size;
2715   int volume_number;
2716   off_t size_in_blocks;
2717   ConfigVol *config_vol;
2718 
2719   gnvol = 0;
2720   if (config_volumes.num_volumes == 0) {
2721     /* only the http cache */
2722     CacheVol *cp   = new CacheVol();
2723     cp->vol_number = 0;
2724     cp->scheme     = CACHE_HTTP_TYPE;
2725     cp->disk_vols  = static_cast<DiskVol **>(ats_malloc(gndisks * sizeof(DiskVol *)));
2726     memset(cp->disk_vols, 0, gndisks * sizeof(DiskVol *));
2727     cp_list.enqueue(cp);
2728     cp_list_len++;
2729     for (int i = 0; i < gndisks; i++) {
2730       if (gdisks[i]->header->num_volumes != 1 || gdisks[i]->disk_vols[0]->vol_number != 0) {
2731         /* The user had created several volumes before - clear the disk
2732            and create one volume for http */
2733         Note("Clearing Disk: %s", gdisks[i]->path);
2734         gdisks[i]->delete_all_volumes();
2735       }
2736       if (gdisks[i]->cleared) {
2737         uint64_t free_space = gdisks[i]->free_space * STORE_BLOCK_SIZE;
2738         int vols            = (free_space / MAX_VOL_SIZE) + 1;
2739         for (int p = 0; p < vols; p++) {
2740           off_t b = gdisks[i]->free_space / (vols - p);
2741           Debug("cache_hosting", "blocks = %" PRId64, (int64_t)b);
2742           DiskVolBlock *dpb = gdisks[i]->create_volume(0, b, CACHE_HTTP_TYPE);
2743           ink_assert(dpb && dpb->len == (uint64_t)b);
2744         }
2745         ink_assert(gdisks[i]->free_space == 0);
2746       }
2747 
2748       ink_assert(gdisks[i]->header->num_volumes == 1);
2749       DiskVol **dp = gdisks[i]->disk_vols;
2750       gnvol += dp[0]->num_volblocks;
2751       cp->size += dp[0]->size;
2752       cp->num_vols += dp[0]->num_volblocks;
2753       cp->disk_vols[i] = dp[0];
2754     }
2755 
2756   } else {
2757     for (int i = 0; i < gndisks; i++) {
2758       if (gdisks[i]->header->num_volumes == 1 && gdisks[i]->disk_vols[0]->vol_number == 0) {
2759         /* The user had created several volumes before - clear the disk
2760            and create one volume for http */
2761         Note("Clearing Disk: %s", gdisks[i]->path);
2762         gdisks[i]->delete_all_volumes();
2763       }
2764     }
2765 
2766     /* change percentages in the config partitions to absolute value */
2767     off_t tot_space_in_blks = 0;
2768     off_t blocks_per_vol    = VOL_BLOCK_SIZE / STORE_BLOCK_SIZE;
2769     /* sum up the total space available on all the disks.
2770        round down the space to 128 megabytes */
2771     for (int i = 0; i < gndisks; i++) {
2772       // Exclude exclusive disks (with forced volumes) from the following total space calculation,
2773       // in such a way forced volumes will not impact volume percentage calculations.
2774       if (-1 == gdisks[i]->forced_volume_num) {
2775         tot_space_in_blks += (gdisks[i]->num_usable_blocks / blocks_per_vol) * blocks_per_vol;
2776       }
2777     }
2778 
2779     double percent_remaining = 100.00;
2780     for (config_vol = config_volumes.cp_queue.head; config_vol; config_vol = config_vol->link.next) {
2781       if (config_vol->in_percent) {
2782         if (config_vol->percent > percent_remaining) {
2783           Warning("total volume sizes added up to more than 100%%!");
2784           Warning("no volumes created");
2785           return -1;
2786         }
2787 
2788         // Find if the volume is forced and if it is then calculate the total forced volume size.
2789         // Forced volumes take the whole span (disk) also sum all disk space this volume is forced to.
2790         int64_t tot_forced_space_in_blks = 0;
2791         for (int i = 0; i < gndisks; i++) {
2792           if (config_vol->number == gdisks[i]->forced_volume_num) {
2793             tot_forced_space_in_blks += (gdisks[i]->num_usable_blocks / blocks_per_vol) * blocks_per_vol;
2794           }
2795         }
2796 
2797         int64_t space_in_blks = 0;
2798         if (0 == tot_forced_space_in_blks) {
2799           // Calculate the space as percentage of total space in blocks.
2800           space_in_blks = static_cast<int64_t>(((config_vol->percent / percent_remaining)) * tot_space_in_blks);
2801         } else {
2802           // Forced volumes take all disk space, so no percentage calculations here.
2803           space_in_blks = tot_forced_space_in_blks;
2804         }
2805 
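        // Convert store blocks to megabytes: a store block is
        // 1 << STORE_BLOCK_SHIFT bytes, so shifting right by the remaining
        // (20 - STORE_BLOCK_SHIFT) bits yields a size in MB.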
2806         space_in_blks = space_in_blks >> (20 - STORE_BLOCK_SHIFT);
2807         /* round down to 128 megabyte multiple */
2808         space_in_blks    = (space_in_blks >> 7) << 7;
2809         config_vol->size = space_in_blks;
2810 
2811         if (0 == tot_forced_space_in_blks) {
2812           tot_space_in_blks -= space_in_blks << (20 - STORE_BLOCK_SHIFT);
2813           percent_remaining -= (config_vol->size < 128) ? 0 : config_vol->percent;
2814         }
2815       }
2816       if (config_vol->size < 128) {
2817         Warning("the size of volume %d (%" PRId64 ") is less than the minimum required volume size %d", config_vol->number,
2818                 (int64_t)config_vol->size, 128);
2819         Warning("volume %d is not created", config_vol->number);
2820       }
2821       Debug("cache_hosting", "Volume: %d Size: %" PRId64 " Ramcache: %d", config_vol->number, (int64_t)config_vol->size,
2822             config_vol->ramcache_enabled);
2823     }
2824     cplist_update();
2825 
    /* go through volume config and grow and create volumes */
    for (config_vol = config_volumes.cp_queue.head; config_vol; config_vol = config_vol->link.next) {
      size = config_vol->size;
      if (size < 128) {
        continue;
      }

      volume_number = config_vol->number;

      size_in_blocks = (static_cast<off_t>(size) * 1024 * 1024) / STORE_BLOCK_SIZE;

      if (config_vol->cachep && config_vol->cachep->num_vols > 0) {
        gnvol += config_vol->cachep->num_vols;
        continue;
      }

      if (!config_vol->cachep) {
        // we did not find a corresponding entry in cache vol...create one

        CacheVol *new_cp  = new CacheVol();
        new_cp->disk_vols = static_cast<DiskVol **>(ats_malloc(gndisks * sizeof(DiskVol *)));
        memset(new_cp->disk_vols, 0, gndisks * sizeof(DiskVol *));
        if (create_volume(config_vol->number, size_in_blocks, config_vol->scheme, new_cp)) {
          ats_free(new_cp->disk_vols);
          new_cp->disk_vols = nullptr;
          delete new_cp;
          return -1;
        }
        cp_list.enqueue(new_cp);
        cp_list_len++;
        config_vol->cachep = new_cp;
        gnvol += new_cp->num_vols;
        continue;
      }
      //    else
      CacheVol *cp = config_vol->cachep;
      ink_assert(cp->size <= size_in_blocks);
      if (cp->size == size_in_blocks) {
        gnvol += cp->num_vols;
        continue;
      }
      // else the size is greater...
      /* search the cp_list */
      int *sorted_vols = new int[gndisks];
      for (int i = 0; i < gndisks; i++) {
        sorted_vols[i] = i;
      }
      for (int i = 0; i < gndisks - 1; i++) {
        int smallest     = sorted_vols[i];
        int smallest_ndx = i;
        for (int j = i + 1; j < gndisks; j++) {
          int curr      = sorted_vols[j];
          DiskVol *dvol = cp->disk_vols[curr];
          if (gdisks[curr]->cleared) {
            ink_assert(!dvol);
            // disks that are cleared should be filled first
            smallest     = curr;
            smallest_ndx = j;
          } else if (!dvol && cp->disk_vols[smallest]) {
            smallest     = curr;
            smallest_ndx = j;
          } else if (dvol && cp->disk_vols[smallest] && (dvol->size < cp->disk_vols[smallest]->size)) {
            smallest     = curr;
            smallest_ndx = j;
          }
        }
        sorted_vols[smallest_ndx] = sorted_vols[i];
        sorted_vols[i]            = smallest;
      }

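      // Fill the disks in that order, growing the smaller pieces toward the largest one, until
      // the requested size is reached, the pieces are already balanced (size_diff == 0), or all
      // disks run out of space.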
      int64_t size_to_alloc = size_in_blocks - cp->size;
      int disk_full         = 0;
      for (int i = 0; (i < gndisks) && size_to_alloc; i++) {
        int disk_no = sorted_vols[i];
        ink_assert(cp->disk_vols[sorted_vols[gndisks - 1]]);
        int largest_vol = cp->disk_vols[sorted_vols[gndisks - 1]]->size;

        /* allocate storage on new disk. Find the difference
           between the biggest volume on any disk and
           the volume on this disk and try to make
           them equal */
        int64_t size_diff = (cp->disk_vols[disk_no]) ? largest_vol - cp->disk_vols[disk_no]->size : largest_vol;
        size_diff         = (size_diff < size_to_alloc) ? size_diff : size_to_alloc;
        /* if size_diff == 0, then the disks hold volumes of the
           same sizes, so we don't need to balance the disks */
        if (size_diff == 0) {
          break;
        }

        DiskVolBlock *dpb;
        do {
          dpb = gdisks[disk_no]->create_volume(volume_number, size_diff, cp->scheme);
          if (dpb) {
            if (!cp->disk_vols[disk_no]) {
              cp->disk_vols[disk_no] = gdisks[disk_no]->get_diskvol(volume_number);
            }
            size_diff -= dpb->len;
            cp->size += dpb->len;
            cp->num_vols++;
          } else {
            break;
          }
        } while ((size_diff > 0));

        if (!dpb) {
          disk_full++;
        }

        size_to_alloc = size_in_blocks - cp->size;
      }

      delete[] sorted_vols;

      if (size_to_alloc) {
        if (create_volume(volume_number, size_to_alloc, cp->scheme, cp)) {
          return -1;
        }
      }
      gnvol += cp->num_vols;
    }
  }
  return 0;
}

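// create_volume() allocates size_in_blocks of raw storage for cache volume volume_number.
// Unless the volume is forced onto specific disks (handled by fillExclusiveDisks()), it hands
// out VOL_BLOCK_SIZE chunks round-robin across gdisks, resuming at the disk where the previous
// call stopped (static curr_vol). Returns 0 on success, -1 if the spans are out of space.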
// This is some really bad code, and needs to be rewritten!
int
create_volume(int volume_number, off_t size_in_blocks, int scheme, CacheVol *cp)
{
  static int curr_vol  = 0; // FIXME: this will not reinitialize correctly
  off_t to_create      = size_in_blocks;
  off_t blocks_per_vol = VOL_BLOCK_SIZE >> STORE_BLOCK_SHIFT;
  int full_disks       = 0;

  cp->vol_number = volume_number;
  cp->scheme     = scheme;
  if (fillExclusiveDisks(cp)) {
    Debug("cache_init", "volume successfully filled from forced disks: volume_number=%d", volume_number);
    return 0;
  }

  int *sp = new int[gndisks];
  memset(sp, 0, gndisks * sizeof(int));

  int i = curr_vol;
  while (size_in_blocks > 0) {
    if (gdisks[i]->free_space >= (sp[i] + blocks_per_vol)) {
      sp[i] += blocks_per_vol;
      size_in_blocks -= blocks_per_vol;
      full_disks = 0;
    } else {
      full_disks += 1;
      if (full_disks == gndisks) {
        char config_file[PATH_NAME_MAX];
        REC_ReadConfigString(config_file, "proxy.config.cache.volume_filename", PATH_NAME_MAX);
        if (cp->size) {
          Warning("not enough space to increase volume: [%d] to size: [%" PRId64 "]", volume_number,
                  (int64_t)((to_create + cp->size) >> (20 - STORE_BLOCK_SHIFT)));
        } else {
          Warning("not enough space to create volume: [%d], size: [%" PRId64 "]", volume_number,
                  (int64_t)(to_create >> (20 - STORE_BLOCK_SHIFT)));
        }

        Note("edit the %s file and restart traffic_server", config_file);
        delete[] sp;
        return -1;
      }
    }
    i = (i + 1) % gndisks;
  }
  cp->vol_number = volume_number;
  cp->scheme     = scheme;
  curr_vol       = i;
  for (i = 0; i < gndisks; i++) {
    if (sp[i] > 0) {
      while (sp[i] > 0) {
        DiskVolBlock *p = gdisks[i]->create_volume(volume_number, sp[i], scheme);
        ink_assert(p && (p->len >= (unsigned int)blocks_per_vol));
        sp[i] -= p->len;
        cp->num_vols++;
        cp->size += p->len;
      }
      if (!cp->disk_vols[i]) {
        cp->disk_vols[i] = gdisks[i]->get_diskvol(volume_number);
      }
    }
  }
  delete[] sp;
  return 0;
}

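// Rebuild the stripe assignment hash tables: once for the generic host record and once for
// every explicit record in the cache host table.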
void
rebuild_host_table(Cache *cache)
{
  build_vol_hash_table(&cache->hosttable->gen_host_rec);
  if (cache->hosttable->m_numEntries != 0) {
    CacheHostMatcher *hm   = cache->hosttable->getHostMatcher();
    CacheHostRecord *h_rec = hm->getDataArray();
    int h_rec_len          = hm->getNumElements();
    int i;
    for (i = 0; i < h_rec_len; i++) {
      build_vol_hash_table(&h_rec[i]);
    }
  }
}

// if generic_host_rec.vols == nullptr, what do we do???
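// Map a cache key (and optional hostname) to the stripe (Vol) that owns it. A matching host
// record with a populated hash table takes precedence; otherwise the generic record's hash
// table is used, falling back to its first stripe if no table exists.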
Vol *
Cache::key_to_vol(const CacheKey *key, const char *hostname, int host_len)
{
  uint32_t h                 = (key->slice32(2) >> DIR_TAG_WIDTH) % VOL_HASH_TABLE_SIZE;
  unsigned short *hash_table = hosttable->gen_host_rec.vol_hash_table;
  CacheHostRecord *host_rec  = &hosttable->gen_host_rec;

  if (hosttable->m_numEntries > 0 && host_len) {
    CacheHostResult res;
    hosttable->Match(hostname, host_len, &res);
    if (res.record) {
      unsigned short *host_hash_table = res.record->vol_hash_table;
      if (host_hash_table) {
        if (is_debug_tag_set("cache_hosting")) {
          char format_str[50];
          snprintf(format_str, sizeof(format_str), "Volume: %%xd for host: %%.%ds", host_len);
          Debug("cache_hosting", format_str, res.record, hostname);
        }
        return res.record->vols[host_hash_table[h]];
      }
    }
  }
  if (hash_table) {
    if (is_debug_tag_set("cache_hosting")) {
      char format_str[50];
      snprintf(format_str, sizeof(format_str), "Generic volume: %%xd for host: %%.%ds", host_len);
      Debug("cache_hosting", format_str, host_rec, hostname);
    }
    return host_rec->vols[hash_table[h]];
  } else {
    return host_rec->vols[0];
  }
}

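// Register a single integer cache statistic under "<prefix>.<str>" (non-persistent, summed by
// default) and reset its raw sum and count to zero.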
static void
reg_int(const char *str, int stat, RecRawStatBlock *rsb, const char *prefix, RecRawStatSyncCb sync_cb = RecRawStatSyncSum)
{
  char stat_str[256];
  snprintf(stat_str, sizeof(stat_str), "%s.%s", prefix, str);
  RecRegisterRawStat(rsb, RECT_PROCESS, stat_str, RECD_INT, RECP_NON_PERSISTENT, stat, sync_cb);
  DOCACHE_CLEAR_DYN_STAT(stat)
}
#define REG_INT(_str, _stat) reg_int(_str, (int)_stat, rsb, prefix)

// Register Stats
void
register_cache_stats(RecRawStatBlock *rsb, const char *prefix)
{
  // Special case for this sucker, since it uses its own aggregator.
  reg_int("bytes_used", cache_bytes_used_stat, rsb, prefix, cache_stats_bytes_used_cb);

  REG_INT("bytes_total", cache_bytes_total_stat);
  REG_INT("ram_cache.total_bytes", cache_ram_cache_bytes_total_stat);
  REG_INT("ram_cache.bytes_used", cache_ram_cache_bytes_stat);
  REG_INT("ram_cache.hits", cache_ram_cache_hits_stat);
  REG_INT("ram_cache.misses", cache_ram_cache_misses_stat);
  REG_INT("pread_count", cache_pread_count_stat);
  REG_INT("percent_full", cache_percent_full_stat);
  REG_INT("lookup.active", cache_lookup_active_stat);
  REG_INT("lookup.success", cache_lookup_success_stat);
  REG_INT("lookup.failure", cache_lookup_failure_stat);
  REG_INT("read.active", cache_read_active_stat);
  REG_INT("read.success", cache_read_success_stat);
  REG_INT("read.failure", cache_read_failure_stat);
  REG_INT("write.active", cache_write_active_stat);
  REG_INT("write.success", cache_write_success_stat);
  REG_INT("write.failure", cache_write_failure_stat);
  REG_INT("write.backlog.failure", cache_write_backlog_failure_stat);
  REG_INT("update.active", cache_update_active_stat);
  REG_INT("update.success", cache_update_success_stat);
  REG_INT("update.failure", cache_update_failure_stat);
  REG_INT("remove.active", cache_remove_active_stat);
  REG_INT("remove.success", cache_remove_success_stat);
  REG_INT("remove.failure", cache_remove_failure_stat);
  REG_INT("evacuate.active", cache_evacuate_active_stat);
  REG_INT("evacuate.success", cache_evacuate_success_stat);
  REG_INT("evacuate.failure", cache_evacuate_failure_stat);
  REG_INT("scan.active", cache_scan_active_stat);
  REG_INT("scan.success", cache_scan_success_stat);
  REG_INT("scan.failure", cache_scan_failure_stat);
  REG_INT("direntries.total", cache_direntries_total_stat);
  REG_INT("direntries.used", cache_direntries_used_stat);
  REG_INT("directory_collision", cache_directory_collision_count_stat);
  REG_INT("frags_per_doc.1", cache_single_fragment_document_count_stat);
  REG_INT("frags_per_doc.2", cache_two_fragment_document_count_stat);
  REG_INT("frags_per_doc.3+", cache_three_plus_plus_fragment_document_count_stat);
  REG_INT("read_busy.success", cache_read_busy_success_stat);
  REG_INT("read_busy.failure", cache_read_busy_failure_stat);
  REG_INT("write_bytes_stat", cache_write_bytes_stat);
  REG_INT("vector_marshals", cache_hdr_vector_marshal_stat);
  REG_INT("hdr_marshals", cache_hdr_marshal_stat);
  REG_INT("hdr_marshal_bytes", cache_hdr_marshal_bytes_stat);
  REG_INT("gc_bytes_evacuated", cache_gc_bytes_evacuated_stat);
  REG_INT("gc_frags_evacuated", cache_gc_frags_evacuated_stat);
  REG_INT("wrap_count", cache_directory_wrap_stat);
  REG_INT("sync.count", cache_directory_sync_count_stat);
  REG_INT("sync.bytes", cache_directory_sync_bytes_stat);
  REG_INT("sync.time", cache_directory_sync_time_stat);
  REG_INT("span.errors.read", cache_span_errors_read_stat);
  REG_INT("span.errors.write", cache_span_errors_write_stat);
  REG_INT("span.failing", cache_span_failing_stat);
  REG_INT("span.offline", cache_span_offline_stat);
  REG_INT("span.online", cache_span_online_stat);
}

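// records.config callback for proxy.config.cache.target_fragment_size: values that leave no
// room for the Doc header, or whose payload would exceed MAX_FRAG_SIZE, are ignored; anything
// else becomes the new target fragment size.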
int
FragmentSizeUpdateCb(const char * /* name ATS_UNUSED */, RecDataT /* data_type ATS_UNUSED */, RecData data, void *cookie)
{
  if (sizeof(Doc) >= static_cast<size_t>(data.rec_int) || static_cast<size_t>(data.rec_int) - sizeof(Doc) > MAX_FRAG_SIZE) {
    Warning("The fragment size exceeds the allowed limits, ignoring: %" PRId64 " (current: %d)", data.rec_int,
            cache_config_target_fragment_size);
    return 0;
  }

  cache_config_target_fragment_size = data.rec_int;
  return 0;
}

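// One-time cache subsystem initialization: allocate the raw stat block, load the cache related
// records.config values into the cache_config_* globals, register the dynamic update callbacks,
// register the cache statistics, and read the storage configuration.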
void
ink_cache_init(ts::ModuleVersion v)
{
  ink_release_assert(v.check(CACHE_MODULE_VERSION));

  cache_rsb = RecAllocateRawStatBlock(static_cast<int>(cache_stat_count));

  REC_EstablishStaticConfigInteger(cache_config_ram_cache_size, "proxy.config.cache.ram_cache.size");
  Debug("cache_init", "proxy.config.cache.ram_cache.size = %" PRId64 " = %" PRId64 "Mb", cache_config_ram_cache_size,
        cache_config_ram_cache_size / (1024 * 1024));

  REC_EstablishStaticConfigInt32(cache_config_ram_cache_algorithm, "proxy.config.cache.ram_cache.algorithm");
  REC_EstablishStaticConfigInt32(cache_config_ram_cache_compress, "proxy.config.cache.ram_cache.compress");
  REC_EstablishStaticConfigInt32(cache_config_ram_cache_compress_percent, "proxy.config.cache.ram_cache.compress_percent");
  REC_ReadConfigInt32(cache_config_ram_cache_use_seen_filter, "proxy.config.cache.ram_cache.use_seen_filter");

  REC_EstablishStaticConfigInt32(cache_config_http_max_alts, "proxy.config.cache.limits.http.max_alts");
  Debug("cache_init", "proxy.config.cache.limits.http.max_alts = %d", cache_config_http_max_alts);

  REC_EstablishStaticConfigInteger(cache_config_ram_cache_cutoff, "proxy.config.cache.ram_cache_cutoff");
  Debug("cache_init", "cache_config_ram_cache_cutoff = %" PRId64 " = %" PRId64 "Mb", cache_config_ram_cache_cutoff,
        cache_config_ram_cache_cutoff / (1024 * 1024));

  REC_EstablishStaticConfigInt32(cache_config_permit_pinning, "proxy.config.cache.permit.pinning");
  Debug("cache_init", "proxy.config.cache.permit.pinning = %d", cache_config_permit_pinning);

  REC_EstablishStaticConfigInt32(cache_config_dir_sync_frequency, "proxy.config.cache.dir.sync_frequency");
  Debug("cache_init", "proxy.config.cache.dir.sync_frequency = %d", cache_config_dir_sync_frequency);

  REC_EstablishStaticConfigInt32(cache_config_select_alternate, "proxy.config.cache.select_alternate");
  Debug("cache_init", "proxy.config.cache.select_alternate = %d", cache_config_select_alternate);

  REC_EstablishStaticConfigInt32(cache_config_max_doc_size, "proxy.config.cache.max_doc_size");
  Debug("cache_init", "proxy.config.cache.max_doc_size = %d = %dMb", cache_config_max_doc_size,
        cache_config_max_doc_size / (1024 * 1024));

  REC_EstablishStaticConfigInt32(cache_config_mutex_retry_delay, "proxy.config.cache.mutex_retry_delay");
  Debug("cache_init", "proxy.config.cache.mutex_retry_delay = %dms", cache_config_mutex_retry_delay);

  REC_EstablishStaticConfigInt32(cache_config_read_while_writer_max_retries, "proxy.config.cache.read_while_writer.max_retries");
  Debug("cache_init", "proxy.config.cache.read_while_writer.max_retries = %d", cache_config_read_while_writer_max_retries);

  REC_EstablishStaticConfigInt32(cache_read_while_writer_retry_delay, "proxy.config.cache.read_while_writer_retry.delay");
  Debug("cache_init", "proxy.config.cache.read_while_writer_retry.delay = %dms", cache_read_while_writer_retry_delay);

  REC_EstablishStaticConfigInt32(cache_config_hit_evacuate_percent, "proxy.config.cache.hit_evacuate_percent");
  Debug("cache_init", "proxy.config.cache.hit_evacuate_percent = %d", cache_config_hit_evacuate_percent);

  REC_EstablishStaticConfigInt32(cache_config_hit_evacuate_size_limit, "proxy.config.cache.hit_evacuate_size_limit");
  Debug("cache_init", "proxy.config.cache.hit_evacuate_size_limit = %d", cache_config_hit_evacuate_size_limit);

  REC_EstablishStaticConfigInt32(cache_config_force_sector_size, "proxy.config.cache.force_sector_size");

  ink_assert(REC_RegisterConfigUpdateFunc("proxy.config.cache.target_fragment_size", FragmentSizeUpdateCb, nullptr) !=
             REC_ERR_FAIL);
  REC_ReadConfigInt32(cache_config_target_fragment_size, "proxy.config.cache.target_fragment_size");

  if (cache_config_target_fragment_size == 0 || cache_config_target_fragment_size - sizeof(Doc) > MAX_FRAG_SIZE) {
    cache_config_target_fragment_size = DEFAULT_TARGET_FRAGMENT_SIZE;
  }

  REC_EstablishStaticConfigInt32(cache_config_max_disk_errors, "proxy.config.cache.max_disk_errors");
  Debug("cache_init", "proxy.config.cache.max_disk_errors = %d", cache_config_max_disk_errors);

  REC_EstablishStaticConfigInt32(cache_config_agg_write_backlog, "proxy.config.cache.agg_write_backlog");
  Debug("cache_init", "proxy.config.cache.agg_write_backlog = %d", cache_config_agg_write_backlog);

  REC_EstablishStaticConfigInt32(cache_config_enable_checksum, "proxy.config.cache.enable_checksum");
  Debug("cache_init", "proxy.config.cache.enable_checksum = %d", cache_config_enable_checksum);

  REC_EstablishStaticConfigInt32(cache_config_alt_rewrite_max_size, "proxy.config.cache.alt_rewrite_max_size");
  Debug("cache_init", "proxy.config.cache.alt_rewrite_max_size = %d", cache_config_alt_rewrite_max_size);

  REC_EstablishStaticConfigInt32(cache_config_read_while_writer, "proxy.config.cache.enable_read_while_writer");
  cache_config_read_while_writer = validate_rww(cache_config_read_while_writer);
  REC_RegisterConfigUpdateFunc("proxy.config.cache.enable_read_while_writer", update_cache_config, nullptr);
  Debug("cache_init", "proxy.config.cache.enable_read_while_writer = %d", cache_config_read_while_writer);

  register_cache_stats(cache_rsb, "proxy.process.cache");

  REC_ReadConfigInteger(cacheProcessor.wait_for_cache, "proxy.config.http.wait_for_cache");

  Result result = theCacheStore.read_config();
  if (result.failed()) {
    Fatal("Failed to read cache configuration %s: %s", ts::filename::STORAGE, result.message());
  }
}

//----------------------------------------------------------------------------
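// The CacheProcessor entry points below are thin wrappers: they select the Cache instance for
// the fragment type and forward the pieces of the HTTP cache key (hash, hostname, host length).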
Action *
CacheProcessor::open_read(Continuation *cont, const HttpCacheKey *key, CacheHTTPHdr *request,
                          const OverridableHttpConfigParams *params, time_t pin_in_cache, CacheFragType type)
{
  return caches[type]->open_read(cont, &key->hash, request, params, type, key->hostname, key->hostlen);
}

//----------------------------------------------------------------------------
Action *
CacheProcessor::open_write(Continuation *cont, int expected_size, const HttpCacheKey *key, CacheHTTPHdr *request,
                           CacheHTTPInfo *old_info, time_t pin_in_cache, CacheFragType type)
{
  return caches[type]->open_write(cont, &key->hash, old_info, pin_in_cache, nullptr /* key1 */, type, key->hostname, key->hostlen);
}

//----------------------------------------------------------------------------
// Note: this should not be called from the cluster processor, or bad
// recursion could occur. This is merely a convenience wrapper.
Action *
CacheProcessor::remove(Continuation *cont, const HttpCacheKey *key, CacheFragType frag_type)
{
  return caches[frag_type]->remove(cont, &key->hash, frag_type, key->hostname, key->hostlen);
}

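// Find the CacheDisk whose span path begins with the given path. A non-positive len means the
// path is assumed to be null terminated. Returns nullptr if the cache is not initialized or no
// span matches.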
CacheDisk *
CacheProcessor::find_by_path(const char *path, int len)
{
  if (CACHE_INITIALIZED == initialized) {
    // If no length is passed in, assume it's null terminated.
    if (0 >= len && 0 != *path) {
      len = strlen(path);
    }

    for (int i = 0; i < gndisks; ++i) {
      if (0 == strncmp(path, gdisks[i]->path, len)) {
        return gdisks[i];
      }
    }
  }

  return nullptr;
}