1 /** @file
2
3 Cache implementation: CacheProcessor start-up, Vol (stripe) initialization and recovery, and CacheVC I/O entry points.
4
5 @section license License
6
7 Licensed to the Apache Software Foundation (ASF) under one
8 or more contributor license agreements. See the NOTICE file
9 distributed with this work for additional information
10 regarding copyright ownership. The ASF licenses this file
11 to you under the Apache License, Version 2.0 (the
12 "License"); you may not use this file except in compliance
13 with the License. You may obtain a copy of the License at
14
15 http://www.apache.org/licenses/LICENSE-2.0
16
17 Unless required by applicable law or agreed to in writing, software
18 distributed under the License is distributed on an "AS IS" BASIS,
19 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 See the License for the specific language governing permissions and
21 limitations under the License.
22 */
23
24 #include "P_Cache.h"
25
26 // Cache Inspector and State Pages
27 #include "P_CacheTest.h"
28 #include "StatPages.h"
29
30 #include "tscore/I_Layout.h"
31 #include "tscore/Filenames.h"
32
33 #include "HttpTransactCache.h"
34 #include "HttpSM.h"
35 #include "HttpCacheSM.h"
36 #include "InkAPIInternal.h"
37
38 #include "tscore/hugepages.h"
39
40 #include <atomic>
41
42 constexpr ts::VersionNumber CACHE_DB_VERSION(CACHE_DB_MAJOR_VERSION, CACHE_DB_MINOR_VERSION);
43
44 // Compilation Options
45 #define USELESS_REENABLES // allow them for now
46 // #define VERIFY_JTEST_DATA
47
48 static size_t DEFAULT_RAM_CACHE_MULTIPLIER = 10; // I.e. 10x 1MB per 1GB of disk.
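// Rough worked example (a sketch, not a guarantee): a stripe's directory is on the order of
// 1 MB per 1 GB of disk, so with the default multiplier a 500 GB stripe would auto-size its
// RAM cache to roughly 10 * 500 MB ≈ 5 GB. This path applies only when
// proxy.config.cache.ram_cache.size is left at AUTO_SIZE_RAM_CACHE; see
// CacheProcessor::cacheInitialized().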
49
50 // This is the oldest version number that is still usable.
51 static short int const CACHE_DB_MAJOR_VERSION_COMPATIBLE = 21;
52
53 #define DOCACHE_CLEAR_DYN_STAT(x) \
54 do { \
55 RecSetRawStatSum(rsb, x, 0); \
56 RecSetRawStatCount(rsb, x, 0); \
57 } while (0);
58
59 // Configuration
60
61 int64_t cache_config_ram_cache_size = AUTO_SIZE_RAM_CACHE;
62 int cache_config_ram_cache_algorithm = 1;
63 int cache_config_ram_cache_compress = 0;
64 int cache_config_ram_cache_compress_percent = 90;
65 int cache_config_ram_cache_use_seen_filter = 1;
66 int cache_config_http_max_alts = 3;
67 int cache_config_dir_sync_frequency = 60;
68 int cache_config_permit_pinning = 0;
69 int cache_config_select_alternate = 1;
70 int cache_config_max_doc_size = 0;
71 int cache_config_min_average_object_size = ESTIMATED_OBJECT_SIZE;
72 int64_t cache_config_ram_cache_cutoff = AGG_SIZE;
73 int cache_config_max_disk_errors = 5;
74 int cache_config_hit_evacuate_percent = 10;
75 int cache_config_hit_evacuate_size_limit = 0;
76 int cache_config_force_sector_size = 0;
77 int cache_config_target_fragment_size = DEFAULT_TARGET_FRAGMENT_SIZE;
78 int cache_config_agg_write_backlog = AGG_SIZE * 2;
79 int cache_config_enable_checksum = 0;
80 int cache_config_alt_rewrite_max_size = 4096;
81 int cache_config_read_while_writer = 0;
82 int cache_config_mutex_retry_delay = 2;
83 int cache_read_while_writer_retry_delay = 50;
84 int cache_config_read_while_writer_max_retries = 10;
85
86 // Globals
87
88 RecRawStatBlock *cache_rsb = nullptr;
89 Cache *theCache = nullptr;
90 CacheDisk **gdisks = nullptr;
91 int gndisks = 0;
92 std::atomic<int> initialize_disk = 0;
93 Cache *caches[NUM_CACHE_FRAG_TYPES] = {nullptr};
94 CacheSync *cacheDirSync = nullptr;
95 Store theCacheStore;
96 int CacheProcessor::initialized = CACHE_INITIALIZING;
97 uint32_t CacheProcessor::cache_ready = 0;
98 int CacheProcessor::start_done = 0;
99 bool CacheProcessor::clear = false;
100 bool CacheProcessor::fix = false;
101 bool CacheProcessor::check = false;
102 int CacheProcessor::start_internal_flags = 0;
103 int CacheProcessor::auto_clear_flag = 0;
104 CacheProcessor cacheProcessor;
105 Vol **gvol = nullptr;
106 std::atomic<int> gnvol = 0;
107 ClassAllocator<CacheVC> cacheVConnectionAllocator("cacheVConnection");
108 ClassAllocator<EvacuationBlock> evacuationBlockAllocator("evacuationBlock");
109 ClassAllocator<CacheRemoveCont> cacheRemoveContAllocator("cacheRemoveCont");
110 ClassAllocator<EvacuationKey> evacuationKeyAllocator("evacuationKey");
111 int CacheVC::size_to_init = -1;
112 CacheKey zero_key;
113
114 struct VolInitInfo {
115 off_t recover_pos;
116 AIOCallbackInternal vol_aio[4];
117 char *vol_h_f;
118
119 VolInitInfo()
120 {
121 recover_pos = 0;
122 vol_h_f = static_cast<char *>(ats_memalign(ats_pagesize(), 4 * STORE_BLOCK_SIZE));
123 memset(vol_h_f, 0, 4 * STORE_BLOCK_SIZE);
124 }
125
126 ~VolInitInfo()
127 {
128 for (auto &i : vol_aio) {
129 i.action = nullptr;
130 i.mutex.clear();
131 }
132 free(vol_h_f);
133 }
134 };
135
136 #if AIO_MODE == AIO_MODE_NATIVE
137 struct VolInit : public Continuation {
138 Vol *vol;
139 char *path;
140 off_t blocks;
141 int64_t offset;
142 bool vol_clear;
143
144 int
145 mainEvent(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
146 {
147 vol->init(path, blocks, offset, vol_clear);
148 mutex.clear();
149 delete this;
150 return EVENT_DONE;
151 }
152
153 VolInit(Vol *v, char *p, off_t b, int64_t o, bool c) : Continuation(v->mutex), vol(v), path(p), blocks(b), offset(o), vol_clear(c)
154 {
155 SET_HANDLER(&VolInit::mainEvent);
156 }
157 };
158
159 struct DiskInit : public Continuation {
160 CacheDisk *disk;
161 char *s;
162 off_t blocks;
163 off_t askip;
164 int ahw_sector_size;
165 int fildes;
166 bool clear;
167
168 int
169 mainEvent(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
170 {
171 disk->open(s, blocks, askip, ahw_sector_size, fildes, clear);
172 ats_free(s);
173 mutex.clear();
174 delete this;
175 return EVENT_DONE;
176 }
177
178 DiskInit(CacheDisk *d, char *str, off_t b, off_t skip, int sector, int f, bool c)
179 : Continuation(d->mutex), disk(d), s(ats_strdup(str)), blocks(b), askip(skip), ahw_sector_size(sector), fildes(f), clear(c)
180 {
181 SET_HANDLER(&DiskInit::mainEvent);
182 }
183 };
184 #endif
185 void cplist_init();
186 static void cplist_update();
187 int cplist_reconfigure();
188 static int create_volume(int volume_number, off_t size_in_blocks, int scheme, CacheVol *cp);
189 static void rebuild_host_table(Cache *cache);
190 void register_cache_stats(RecRawStatBlock *rsb, const char *prefix);
191
192 // Global list of the volumes created
193 Queue<CacheVol> cp_list;
194 int cp_list_len = 0;
195 ConfigVolumes config_volumes;
196
197 #if TS_HAS_TESTS
198 void
199 force_link_CacheTestCaller()
200 {
201 force_link_CacheTest();
202 }
203 #endif
204
205 int64_t
206 cache_bytes_used(int volume)
207 {
208 uint64_t used = 0;
209
210 for (int i = 0; i < gnvol; i++) {
211 if (!DISK_BAD(gvol[i]->disk) && (volume == -1 || gvol[i]->cache_vol->vol_number == volume)) {
212 if (!gvol[i]->header->cycle) {
213 used += gvol[i]->header->write_pos - gvol[i]->start;
214 } else {
215 used += gvol[i]->len - gvol[i]->dirlen() - EVACUATION_SIZE;
216 }
217 }
218 }
219
220 return used;
221 }
222
223 int
224 cache_stats_bytes_used_cb(const char *name, RecDataT data_type, RecData *data, RecRawStatBlock *rsb, int id)
225 {
226 int volume = -1;
227 char *p;
228
229 // There's no way to pass along the volume ID, so extract it from the stat name.
230 p = strstr(const_cast<char *>(name), "volume_");
231 if (p != nullptr) {
232 // I'm counting on the compiler to optimize out strlen("volume_").
233 volume = strtol(p + strlen("volume_"), nullptr, 10);
234 }
235
236 if (cacheProcessor.initialized == CACHE_INITIALIZED) {
237 int64_t used, total = 0;
238 float percent_full;
239
240 used = cache_bytes_used(volume);
241 RecSetGlobalRawStatSum(rsb, id, used);
242 RecRawStatSyncSum(name, data_type, data, rsb, id);
243 RecGetGlobalRawStatSum(rsb, static_cast<int>(cache_bytes_total_stat), &total);
244 percent_full = static_cast<float>(used) / static_cast<float>(total) * 100;
245 // The percent_full float below gets rounded down
246 RecSetGlobalRawStatSum(rsb, static_cast<int>(cache_percent_full_stat), static_cast<int64_t>(percent_full));
247 }
248
249 return 1;
250 }
251
252 static int
253 validate_rww(int new_value)
254 {
255 if (new_value) {
256 float http_bg_fill;
257
258 REC_ReadConfigFloat(http_bg_fill, "proxy.config.http.background_fill_completed_threshold");
259 if (http_bg_fill > 0.0) {
260 Note("to enable reading while writing a document, %s should be 0.0: read while writing disabled",
261 "proxy.config.http.background_fill_completed_threshold");
262 return 0;
263 }
264 if (cache_config_max_doc_size > 0) {
265 Note("to enable reading while writing a document, %s should be 0: read while writing disabled",
266 "proxy.config.cache.max_doc_size");
267 return 0;
268 }
269 return new_value;
270 }
271 return 0;
272 }
273
274 static int
275 update_cache_config(const char * /* name ATS_UNUSED */, RecDataT /* data_type ATS_UNUSED */, RecData data,
276 void * /* cookie ATS_UNUSED */)
277 {
278 int new_value = validate_rww(data.rec_int);
279 cache_config_read_while_writer = new_value;
280
281 return 0;
282 }
283
284 CacheVC::CacheVC()
285 {
286 size_to_init = sizeof(CacheVC) - (size_t) & ((CacheVC *)nullptr)->vio;
287 memset((void *)&vio, 0, size_to_init);
288 }
289
290 HTTPInfo::FragOffset *
291 CacheVC::get_frag_table()
292 {
293 ink_assert(alternate.valid());
294 return alternate.valid() ? alternate.get_frag_table() : nullptr;
295 }
296
297 VIO *
298 CacheVC::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *abuf)
299 {
300 ink_assert(vio.op == VIO::READ);
301 vio.buffer.writer_for(abuf);
302 vio.set_continuation(c);
303 vio.ndone = 0;
304 vio.nbytes = nbytes;
305 vio.vc_server = this;
306 #ifdef DEBUG
307 ink_assert(!c || c->mutex->thread_holding);
308 #endif
309 if (c && !trigger && !recursive) {
310 trigger = c->mutex->thread_holding->schedule_imm_local(this);
311 }
312 return &vio;
313 }
314
315 VIO *
316 CacheVC::do_io_pread(Continuation *c, int64_t nbytes, MIOBuffer *abuf, int64_t offset)
317 {
318 ink_assert(vio.op == VIO::READ);
319 vio.buffer.writer_for(abuf);
320 vio.set_continuation(c);
321 vio.ndone = 0;
322 vio.nbytes = nbytes;
323 vio.vc_server = this;
324 seek_to = offset;
325 #ifdef DEBUG
326 ink_assert(c->mutex->thread_holding);
327 #endif
328 if (!trigger && !recursive) {
329 trigger = c->mutex->thread_holding->schedule_imm_local(this);
330 }
331 return &vio;
332 }
333
334 VIO *
335 CacheVC::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *abuf, bool owner)
336 {
337 ink_assert(vio.op == VIO::WRITE);
338 ink_assert(!owner);
339 vio.buffer.reader_for(abuf);
340 vio.set_continuation(c);
341 vio.ndone = 0;
342 vio.nbytes = nbytes;
343 vio.vc_server = this;
344 #ifdef DEBUG
345 ink_assert(!c || c->mutex->thread_holding);
346 #endif
347 if (c && !trigger && !recursive) {
348 trigger = c->mutex->thread_holding->schedule_imm_local(this);
349 }
350 return &vio;
351 }
352
353 void
354 CacheVC::do_io_close(int alerrno)
355 {
356 ink_assert(mutex->thread_holding == this_ethread());
357 int previous_closed = closed;
358 closed = (alerrno == -1) ? 1 : -1; // Stupid default arguments
359 DDebug("cache_close", "do_io_close %p %d %d", this, alerrno, closed);
360 if (!previous_closed && !recursive) {
361 die();
362 }
363 }
364
365 void
366 CacheVC::reenable(VIO *avio)
367 {
368 DDebug("cache_reenable", "reenable %p", this);
369 (void)avio;
370 #ifdef DEBUG
371 ink_assert(avio->mutex->thread_holding);
372 #endif
373 if (!trigger) {
374 #ifndef USELESS_REENABLES
375 if (vio.op == VIO::READ) {
376 if (vio.buffer.mbuf->max_read_avail() > vio.buffer.writer()->water_mark)
377 ink_assert(!"useless reenable of cache read");
378 } else if (!vio.buffer.reader()->read_avail())
379 ink_assert(!"useless reenable of cache write");
380 #endif
381 trigger = avio->mutex->thread_holding->schedule_imm_local(this);
382 }
383 }
384
385 void
386 CacheVC::reenable_re(VIO *avio)
387 {
388 DDebug("cache_reenable", "reenable_re %p", this);
389 (void)avio;
390 #ifdef DEBUG
391 ink_assert(avio->mutex->thread_holding);
392 #endif
393 if (!trigger) {
394 if (!is_io_in_progress() && !recursive) {
395 handleEvent(EVENT_NONE, (void *)nullptr);
396 } else {
397 trigger = avio->mutex->thread_holding->schedule_imm_local(this);
398 }
399 }
400 }
401
402 bool
403 CacheVC::get_data(int i, void *data)
404 {
405 switch (i) {
406 case CACHE_DATA_HTTP_INFO:
407 *(static_cast<CacheHTTPInfo **>(data)) = &alternate;
408 return true;
409 case CACHE_DATA_RAM_CACHE_HIT_FLAG:
410 *(static_cast<int *>(data)) = !f.not_from_ram_cache;
411 return true;
412 default:
413 break;
414 }
415 return false;
416 }
417
418 int64_t
419 CacheVC::get_object_size()
420 {
421 return (this)->doc_len;
422 }
423
424 bool
425 CacheVC::set_data(int /* i ATS_UNUSED */, void * /* data */)
426 {
427 ink_assert(!"CacheVC::set_data should not be called!");
428 return true;
429 }
430
431 void
432 CacheVC::get_http_info(CacheHTTPInfo **ainfo)
433 {
434 *ainfo = &(this)->alternate;
435 }
436
437 // set_http_info must be called before do_io_write
438 // cluster vc does an optimization where it calls do_io_write() before
439 // calling set_http_info(), but it guarantees that the info will
440 // be set before transferring any bytes
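// A minimal writer-side sketch of the expected ordering (illustrative only; the names and
// arguments here are assumptions, not taken from this file):
//
//   CacheVC *vc = ...;                       // obtained from a successful open_write()
//   vc->set_http_info(&info);                // attach the alternate first
//   vc->do_io_write(cont, doc_len, reader);  // then start writing the body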
441 void
442 CacheVC::set_http_info(CacheHTTPInfo *ainfo)
443 {
444 ink_assert(!total_len);
445 if (f.update) {
446 ainfo->object_key_set(update_key);
447 ainfo->object_size_set(update_len);
448 } else {
449 ainfo->object_key_set(earliest_key);
450 // don't know the total len yet
451 }
452
453 MIMEField *field = ainfo->m_alt->m_response_hdr.field_find(MIME_FIELD_CONTENT_LENGTH, MIME_LEN_CONTENT_LENGTH);
454 if (field && !field->value_get_int64()) {
455 f.allow_empty_doc = 1;
456 } else {
457 f.allow_empty_doc = 0;
458 }
459
460 alternate.copy_shallow(ainfo);
461 ainfo->clear();
462 }
463
464 bool
465 CacheVC::set_pin_in_cache(time_t time_pin)
466 {
467 if (total_len) {
468 ink_assert(!"should Pin the document before writing");
469 return false;
470 }
471 if (vio.op != VIO::WRITE) {
472 ink_assert(!"Pinning only allowed while writing objects to the cache");
473 return false;
474 }
475 pin_in_cache = time_pin;
476 return true;
477 }
478
479 time_t
480 CacheVC::get_pin_in_cache()
481 {
482 return pin_in_cache;
483 }
484
485 int
486 Vol::begin_read(CacheVC *cont)
487 {
488 ink_assert(cont->mutex->thread_holding == this_ethread());
489 ink_assert(mutex->thread_holding == this_ethread());
490 #ifdef CACHE_STAT_PAGES
491 ink_assert(!cont->stat_link.next && !cont->stat_link.prev);
492 stat_cache_vcs.enqueue(cont, cont->stat_link);
493 #endif
494 // no need for evacuation as the entire document is already in memory
495 if (cont->f.single_fragment) {
496 return 0;
497 }
498 int i = dir_evac_bucket(&cont->earliest_dir);
499 EvacuationBlock *b;
500 for (b = evacuate[i].head; b; b = b->link.next) {
501 if (dir_offset(&b->dir) != dir_offset(&cont->earliest_dir)) {
502 continue;
503 }
504 if (b->readers) {
505 b->readers = b->readers + 1;
506 }
507 return 0;
508 }
509 // we don't actually need to preserve this block as it is already in
510 // memory, but this is easier, and evacuations are rare
511 EThread *t = cont->mutex->thread_holding;
512 b = new_EvacuationBlock(t);
513 b->readers = 1;
514 b->dir = cont->earliest_dir;
515 b->evac_frags.key = cont->earliest_key;
516 evacuate[i].push(b);
517 return 1;
518 }
519
520 int
521 Vol::close_read(CacheVC *cont)
522 {
523 EThread *t = cont->mutex->thread_holding;
524 ink_assert(t == this_ethread());
525 ink_assert(t == mutex->thread_holding);
526 if (dir_is_empty(&cont->earliest_dir)) {
527 return 1;
528 }
529 int i = dir_evac_bucket(&cont->earliest_dir);
530 EvacuationBlock *b;
531 for (b = evacuate[i].head; b;) {
532 EvacuationBlock *next = b->link.next;
533 if (dir_offset(&b->dir) != dir_offset(&cont->earliest_dir)) {
534 b = next;
535 continue;
536 }
537 if (b->readers && !--b->readers) {
538 evacuate[i].remove(b);
539 free_EvacuationBlock(b, t);
540 break;
541 }
542 b = next;
543 }
544 #ifdef CACHE_STAT_PAGES
545 stat_cache_vcs.remove(cont, cont->stat_link);
546 ink_assert(!cont->stat_link.next && !cont->stat_link.prev);
547 #endif
548 return 1;
549 }
550
551 // Cache Processor
552
553 int
554 CacheProcessor::start(int, size_t)
555 {
556 return start_internal(0);
557 }
558
559 static const int DEFAULT_CACHE_OPTIONS = (O_RDWR);
560
561 int
562 CacheProcessor::start_internal(int flags)
563 {
564 ink_assert((int)TS_EVENT_CACHE_OPEN_READ == (int)CACHE_EVENT_OPEN_READ);
565 ink_assert((int)TS_EVENT_CACHE_OPEN_READ_FAILED == (int)CACHE_EVENT_OPEN_READ_FAILED);
566 ink_assert((int)TS_EVENT_CACHE_OPEN_WRITE == (int)CACHE_EVENT_OPEN_WRITE);
567 ink_assert((int)TS_EVENT_CACHE_OPEN_WRITE_FAILED == (int)CACHE_EVENT_OPEN_WRITE_FAILED);
568 ink_assert((int)TS_EVENT_CACHE_REMOVE == (int)CACHE_EVENT_REMOVE);
569 ink_assert((int)TS_EVENT_CACHE_REMOVE_FAILED == (int)CACHE_EVENT_REMOVE_FAILED);
570 ink_assert((int)TS_EVENT_CACHE_SCAN == (int)CACHE_EVENT_SCAN);
571 ink_assert((int)TS_EVENT_CACHE_SCAN_FAILED == (int)CACHE_EVENT_SCAN_FAILED);
572 ink_assert((int)TS_EVENT_CACHE_SCAN_OBJECT == (int)CACHE_EVENT_SCAN_OBJECT);
573 ink_assert((int)TS_EVENT_CACHE_SCAN_OPERATION_BLOCKED == (int)CACHE_EVENT_SCAN_OPERATION_BLOCKED);
574 ink_assert((int)TS_EVENT_CACHE_SCAN_OPERATION_FAILED == (int)CACHE_EVENT_SCAN_OPERATION_FAILED);
575 ink_assert((int)TS_EVENT_CACHE_SCAN_DONE == (int)CACHE_EVENT_SCAN_DONE);
576
577 #if AIO_MODE == AIO_MODE_NATIVE
578 int etype = ET_NET;
579 int n_netthreads = eventProcessor.n_threads_for_type[etype];
580 EThread **netthreads = eventProcessor.eventthread[etype];
581 for (int i = 0; i < n_netthreads; ++i) {
582 netthreads[i]->diskHandler = new DiskHandler();
583 netthreads[i]->schedule_imm(netthreads[i]->diskHandler);
584 }
585 #endif
586
587 start_internal_flags = flags;
588 clear = !!(flags & PROCESSOR_RECONFIGURE) || auto_clear_flag;
589 fix = !!(flags & PROCESSOR_FIX);
590 check = (flags & PROCESSOR_CHECK) != 0;
591 start_done = 0;
592
593 /* read the config file and create the data structures corresponding
594 to the file */
595 gndisks = theCacheStore.n_disks;
596 gdisks = static_cast<CacheDisk **>(ats_malloc(gndisks * sizeof(CacheDisk *)));
597
598 // Temporaries to carry values between loops
599 char **paths = static_cast<char **>(alloca(sizeof(char *) * gndisks));
600 memset(paths, 0, sizeof(char *) * gndisks);
601 int *fds = static_cast<int *>(alloca(sizeof(int) * gndisks));
602 memset(fds, 0, sizeof(int) * gndisks);
603 int *sector_sizes = static_cast<int *>(alloca(sizeof(int) * gndisks));
604 memset(sector_sizes, 0, sizeof(int) * gndisks);
605 Span **sds = static_cast<Span **>(alloca(sizeof(Span *) * gndisks));
606 memset(sds, 0, sizeof(Span *) * gndisks);
607
608 gndisks = 0;
609 ink_aio_set_callback(new AIO_Callback_handler());
610
611 config_volumes.read_config_file();
612
613 /*
614 create CacheDisk objects for each span in the configuration file and store in gdisks
615 */
616 for (unsigned i = 0; i < theCacheStore.n_disks; i++) {
617 Span *sd = theCacheStore.disk[i];
618 int opts = DEFAULT_CACHE_OPTIONS;
619
620 if (!paths[gndisks]) {
621 paths[gndisks] = static_cast<char *>(alloca(PATH_NAME_MAX));
622 }
623 ink_strlcpy(paths[gndisks], sd->pathname, PATH_NAME_MAX);
624 if (!sd->file_pathname) {
625 ink_strlcat(paths[gndisks], "/cache.db", PATH_NAME_MAX);
626 opts |= O_CREAT;
627 }
628
629 #ifdef O_DIRECT
630 opts |= O_DIRECT;
631 #endif
632 #ifdef O_DSYNC
633 opts |= O_DSYNC;
634 #endif
635 if (check) {
636 opts &= ~O_CREAT;
637 opts |= O_RDONLY;
638 }
639
640 int fd = open(paths[gndisks], opts, 0644);
641 int64_t blocks = sd->blocks;
642
643 if (fd < 0 && (opts & O_CREAT)) { // Try without O_DIRECT if this is a file on a filesystem, e.g. tmpfs.
644 fd = open(paths[gndisks], DEFAULT_CACHE_OPTIONS | O_CREAT, 0644);
645 }
646
647 if (fd >= 0) {
648 bool diskok = true;
649 if (!sd->file_pathname) {
650 if (!check) {
651 if (ftruncate(fd, blocks * STORE_BLOCK_SIZE) < 0) {
652 Warning("unable to truncate cache file '%s' to %" PRId64 " blocks", paths[gndisks], blocks);
653 diskok = false;
654 }
655 } else { // read-only mode checks
656 struct stat sbuf;
657 if (-1 == fstat(fd, &sbuf)) {
658 fprintf(stderr, "Failed to stat cache file for directory %s\n", paths[gndisks]);
659 diskok = false;
660 } else if (blocks != sbuf.st_size / STORE_BLOCK_SIZE) {
661 fprintf(stderr, "Cache file for directory %s is %" PRId64 " bytes, expected %" PRId64 "\n", paths[gndisks],
662 sbuf.st_size, blocks * static_cast<int64_t>(STORE_BLOCK_SIZE));
663 diskok = false;
664 }
665 }
666 }
667 if (diskok) {
668 int sector_size = sd->hw_sector_size;
669
670 gdisks[gndisks] = new CacheDisk();
671 if (check) {
672 gdisks[gndisks]->read_only_p = true;
673 }
674 gdisks[gndisks]->forced_volume_num = sd->forced_volume_num;
675 if (sd->hash_base_string) {
676 gdisks[gndisks]->hash_base_string = ats_strdup(sd->hash_base_string);
677 }
678
679 if (sector_size < cache_config_force_sector_size) {
680 sector_size = cache_config_force_sector_size;
681 }
682
683 // It's actually common that the hardware I/O size is larger than the store block size as
684 // storage systems increasingly want larger I/Os. For example, on macOS, the filesystem block
685 // size is always reported as 1MB.
686 if (sd->hw_sector_size <= 0 || sector_size > STORE_BLOCK_SIZE) {
687 Note("resetting hardware sector size from %d to %d", sector_size, STORE_BLOCK_SIZE);
688 sector_size = STORE_BLOCK_SIZE;
689 }
690 sector_sizes[gndisks] = sector_size;
691 fds[gndisks] = fd;
692 sds[gndisks] = sd;
693 fd = -1;
694 gndisks++;
695 }
696 } else {
697 if (errno == EINVAL) {
698 Warning("cache unable to open '%s': It must be placed on a file system that supports direct I/O.", paths[gndisks]);
699 } else {
700 Warning("cache unable to open '%s': %s", paths[gndisks], strerror(errno));
701 }
702 }
703 if (fd >= 0) {
704 close(fd);
705 }
706 }
707
708 // Before we kick off asynchronous operations, make sure sufficient disks are available so that we don't just shut down.
709 // Exiting with background threads still in operation would likely cause a segfault.
710 start_done = 1;
711
712 if (gndisks == 0) {
713 CacheProcessor::initialized = CACHE_INIT_FAILED;
714 // Have to do this here because no IO events were scheduled and so @c diskInitialized() won't be called.
715 if (cb_after_init) {
716 cb_after_init();
717 }
718
719 if (this->waitForCache() > 1) {
720 Emergency("Cache initialization failed - no disks available but cache required");
721 } else {
722 Warning("unable to open cache disk(s): Cache Disabled\n");
723 return -1; // pointless, AFAICT this is ignored.
724 }
725 } else if (this->waitForCache() == 3 && static_cast<unsigned int>(gndisks) < theCacheStore.n_disks_in_config) {
726 CacheProcessor::initialized = CACHE_INIT_FAILED;
727 if (cb_after_init) {
728 cb_after_init();
729 }
730 Emergency("Cache initialization failed - only %d out of %d disks were valid and all were required.", gndisks,
731 theCacheStore.n_disks_in_config);
732 } else if (this->waitForCache() == 2 && static_cast<unsigned int>(gndisks) < theCacheStore.n_disks_in_config) {
733 Warning("Cache initialization incomplete - only %d out of %d disks were valid.", gndisks, theCacheStore.n_disks_in_config);
734 }
735
736 // If we got here, we have enough disks to proceed
737 for (int j = 0; j < gndisks; j++) {
738 Span *sd = sds[j];
739 ink_release_assert(sds[j] != nullptr); // Defeat clang-analyzer
740 off_t skip = ROUND_TO_STORE_BLOCK((sd->offset < START_POS ? START_POS + sd->alignment : sd->offset));
741 int64_t blocks = sd->blocks - (skip >> STORE_BLOCK_SHIFT);
742 #if AIO_MODE == AIO_MODE_NATIVE
743 eventProcessor.schedule_imm(new DiskInit(gdisks[j], paths[j], blocks, skip, sector_sizes[j], fds[j], clear));
744 #else
745 gdisks[j]->open(paths[j], blocks, skip, sector_sizes[j], fds[j], clear);
746 #endif
747
748 Debug("cache_hosting", "Disk: %d:%s, blocks: %" PRId64 "", gndisks, paths[j], blocks);
749 }
750
751 return 0;
752 }
753
754 void
755 CacheProcessor::diskInitialized()
756 {
757 int n_init = initialize_disk++;
758 int bad_disks = 0;
759 int res = 0;
760 int i;
761
762 // Wait until all the cache disks are initialized
763 if (n_init != gndisks - 1) {
764 return;
765 }
766
767 // Check and remove bad disks from gdisks[]
768 for (i = 0; i < gndisks; i++) {
769 if (DISK_BAD(gdisks[i])) {
770 delete gdisks[i];
771 gdisks[i] = nullptr;
772 bad_disks++;
773 } else if (bad_disks > 0) {
774 gdisks[i - bad_disks] = gdisks[i];
775 gdisks[i] = nullptr;
776 }
777 }
778 if (bad_disks > 0) {
779 // Update the number of available cache disks
780 gndisks -= bad_disks;
781 // Check if this is a fatal error
782 if (this->waitForCache() == 3 || (0 == gndisks && this->waitForCache() == 2)) {
783 // This could be passed off to @c cacheInitialized (as with volume config problems) but I think
784 // the more specific error message here is worth the extra code.
785 CacheProcessor::initialized = CACHE_INIT_FAILED;
786 if (cb_after_init) {
787 cb_after_init();
788 }
789 Emergency("Cache initialization failed - only %d of %d disks were available.", gndisks, theCacheStore.n_disks_in_config);
790 } else if (this->waitForCache() == 2) {
791 Warning("Cache initialization incomplete - only %d of %d disks were available.", gndisks, theCacheStore.n_disks_in_config);
792 }
793 }
794
795 /* Practically just took all bad_disks offline so update the stats. */
796 RecSetGlobalRawStatSum(cache_rsb, cache_span_offline_stat, bad_disks);
797 RecIncrGlobalRawStat(cache_rsb, cache_span_failing_stat, -bad_disks);
798 RecSetGlobalRawStatSum(cache_rsb, cache_span_online_stat, gndisks);
799
800 /* create the cachevol list only if the number of volumes is greater than 0. */
801 if (config_volumes.num_volumes == 0) {
802 /* if no volumes, default to just an http cache */
803 res = cplist_reconfigure();
804 } else {
805 // else
806 /* create the cachevol list. */
807 cplist_init();
808 /* now change the cachevol list based on the config file */
809 res = cplist_reconfigure();
810 }
811
812 if (res == -1) {
813 /* problems initializing the volume.config. Punt */
814 gnvol = 0;
815 cacheInitialized();
816 return;
817 } else {
818 CacheVol *cp = cp_list.head;
819 for (; cp; cp = cp->link.next) {
820 cp->vol_rsb = RecAllocateRawStatBlock(static_cast<int>(cache_stat_count));
821 char vol_stat_str_prefix[256];
822 snprintf(vol_stat_str_prefix, sizeof(vol_stat_str_prefix), "proxy.process.cache.volume_%d", cp->vol_number);
823 register_cache_stats(cp->vol_rsb, vol_stat_str_prefix);
824 }
825 }
826
827 gvol = static_cast<Vol **>(ats_malloc(gnvol * sizeof(Vol *)));
828 memset(gvol, 0, gnvol * sizeof(Vol *));
829 gnvol = 0;
830 for (i = 0; i < gndisks; i++) {
831 CacheDisk *d = gdisks[i];
832 if (is_debug_tag_set("cache_hosting")) {
833 int j;
834 Debug("cache_hosting", "Disk: %d:%s: Vol Blocks: %u: Free space: %" PRIu64, i, d->path, d->header->num_diskvol_blks,
835 d->free_space);
836 for (j = 0; j < static_cast<int>(d->header->num_volumes); j++) {
837 Debug("cache_hosting", "\tVol: %d Size: %" PRIu64, d->disk_vols[j]->vol_number, d->disk_vols[j]->size);
838 }
839 for (j = 0; j < static_cast<int>(d->header->num_diskvol_blks); j++) {
840 Debug("cache_hosting", "\tBlock No: %d Size: %" PRIu64 " Free: %u", d->header->vol_info[j].number,
841 d->header->vol_info[j].len, d->header->vol_info[j].free);
842 }
843 }
844 if (!check) {
845 d->sync();
846 }
847 }
848 if (config_volumes.num_volumes == 0) {
849 theCache = new Cache();
850 theCache->scheme = CACHE_HTTP_TYPE;
851 theCache->open(clear, fix);
852 return;
853 }
854 if (config_volumes.num_http_volumes != 0) {
855 theCache = new Cache();
856 theCache->scheme = CACHE_HTTP_TYPE;
857 theCache->open(clear, fix);
858 }
859 }
860
861 void
862 CacheProcessor::cacheInitialized()
863 {
864 int i;
865
866 if (theCache && (theCache->ready == CACHE_INITIALIZING)) {
867 return;
868 }
869
870 int caches_ready = 0;
871 int cache_init_ok = 0;
872 /* allocate ram size in proportion to the disk space the
873 volume occupies */
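/* Worked example (a sketch, assuming only HTTP volumes and a fixed
   proxy.config.cache.ram_cache.size of 4 GB): with two stripes of 100 GB and
   300 GB, the per-stripe factors are 0.25 and 0.75, so they receive roughly
   1 GB and 3 GB of RAM cache respectively. */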
874 int64_t total_size = 0; // count in HTTP & MIXT
875 uint64_t total_cache_bytes = 0; // bytes that can be used in total_size
876 uint64_t total_direntries = 0; // all the direntries in the cache
877 uint64_t used_direntries = 0; // and used
878 uint64_t vol_total_cache_bytes = 0;
879 uint64_t vol_total_direntries = 0;
880 uint64_t vol_used_direntries = 0;
881 Vol *vol;
882
883 ProxyMutex *mutex = this_ethread()->mutex.get();
884
885 if (theCache) {
886 total_size += theCache->cache_size;
887 Debug("cache_init", "CacheProcessor::cacheInitialized - theCache, total_size = %" PRId64 " = %" PRId64 " MB", total_size,
888 total_size / ((1024 * 1024) / STORE_BLOCK_SIZE));
889 if (theCache->ready == CACHE_INIT_FAILED) {
890 Debug("cache_init", "CacheProcessor::cacheInitialized - failed to initialize the cache for http: cache disabled");
891 Warning("failed to initialize the cache for http: cache disabled\n");
892 } else {
893 caches_ready = caches_ready | (1 << CACHE_FRAG_TYPE_HTTP);
894 caches_ready = caches_ready | (1 << CACHE_FRAG_TYPE_NONE);
895 caches[CACHE_FRAG_TYPE_HTTP] = theCache;
896 caches[CACHE_FRAG_TYPE_NONE] = theCache;
897 }
898 }
899
900 // Update stripe version data.
901 if (gnvol) { // start with whatever the first stripe is.
902 cacheProcessor.min_stripe_version = cacheProcessor.max_stripe_version = gvol[0]->header->version;
903 }
904 // scan the rest of the stripes.
905 for (i = 1; i < gnvol; i++) {
906 Vol *v = gvol[i];
907 if (v->header->version < cacheProcessor.min_stripe_version) {
908 cacheProcessor.min_stripe_version = v->header->version;
909 }
910 if (cacheProcessor.max_stripe_version < v->header->version) {
911 cacheProcessor.max_stripe_version = v->header->version;
912 }
913 }
914
915 if (caches_ready) {
916 Debug("cache_init", "CacheProcessor::cacheInitialized - caches_ready=0x%0X, gnvol=%d", (unsigned int)caches_ready,
917 gnvol.load());
918
919 int64_t ram_cache_bytes = 0;
920
921 if (gnvol) {
922 // new ram_caches, with algorithm from the config
923 for (i = 0; i < gnvol; i++) {
924 switch (cache_config_ram_cache_algorithm) {
925 default:
926 case RAM_CACHE_ALGORITHM_CLFUS:
927 gvol[i]->ram_cache = new_RamCacheCLFUS();
928 break;
929 case RAM_CACHE_ALGORITHM_LRU:
930 gvol[i]->ram_cache = new_RamCacheLRU();
931 break;
932 }
933 }
934 // now calculate the size
935 if (cache_config_ram_cache_size == AUTO_SIZE_RAM_CACHE) {
936 Debug("cache_init", "CacheProcessor::cacheInitialized - cache_config_ram_cache_size == AUTO_SIZE_RAM_CACHE");
937 for (i = 0; i < gnvol; i++) {
938 vol = gvol[i];
939
940 if (gvol[i]->cache_vol->ramcache_enabled) {
941 gvol[i]->ram_cache->init(vol->dirlen() * DEFAULT_RAM_CACHE_MULTIPLIER, vol);
942 ram_cache_bytes += gvol[i]->dirlen();
943 Debug("cache_init", "CacheProcessor::cacheInitialized - ram_cache_bytes = %" PRId64 " = %" PRId64 "Mb", ram_cache_bytes,
944 ram_cache_bytes / (1024 * 1024));
945 CACHE_VOL_SUM_DYN_STAT(cache_ram_cache_bytes_total_stat, (int64_t)gvol[i]->dirlen());
946 }
947 vol_total_cache_bytes = gvol[i]->len - gvol[i]->dirlen();
948 total_cache_bytes += vol_total_cache_bytes;
949 Debug("cache_init", "CacheProcessor::cacheInitialized - total_cache_bytes = %" PRId64 " = %" PRId64 "Mb",
950 total_cache_bytes, total_cache_bytes / (1024 * 1024));
951
952 CACHE_VOL_SUM_DYN_STAT(cache_bytes_total_stat, vol_total_cache_bytes);
953
954 vol_total_direntries = gvol[i]->buckets * gvol[i]->segments * DIR_DEPTH;
955 total_direntries += vol_total_direntries;
956 CACHE_VOL_SUM_DYN_STAT(cache_direntries_total_stat, vol_total_direntries);
957
958 vol_used_direntries = dir_entries_used(gvol[i]);
959 CACHE_VOL_SUM_DYN_STAT(cache_direntries_used_stat, vol_used_direntries);
960 used_direntries += vol_used_direntries;
961 }
962
963 } else {
964 // a fixed RAM cache size was configured
965 // TODO: should we check the available system memory? Otherwise we may
966 // OOM or swap, which is not a good situation for the server
967 Debug("cache_init", "CacheProcessor::cacheInitialized - %" PRId64 " != AUTO_SIZE_RAM_CACHE", cache_config_ram_cache_size);
968 int64_t http_ram_cache_size =
969 (theCache) ?
970 static_cast<int64_t>((static_cast<double>(theCache->cache_size) / total_size) * cache_config_ram_cache_size) :
971 0;
972 Debug("cache_init", "CacheProcessor::cacheInitialized - http_ram_cache_size = %" PRId64 " = %" PRId64 "Mb",
973 http_ram_cache_size, http_ram_cache_size / (1024 * 1024));
974 int64_t stream_ram_cache_size = cache_config_ram_cache_size - http_ram_cache_size;
975 Debug("cache_init", "CacheProcessor::cacheInitialized - stream_ram_cache_size = %" PRId64 " = %" PRId64 "Mb",
976 stream_ram_cache_size, stream_ram_cache_size / (1024 * 1024));
977
978 // Dump some ram_cache size information in debug mode.
979 Debug("ram_cache", "config: size = %" PRId64 ", cutoff = %" PRId64 "", cache_config_ram_cache_size,
980 cache_config_ram_cache_cutoff);
981
982 for (i = 0; i < gnvol; i++) {
983 vol = gvol[i];
984 double factor;
985 if (gvol[i]->cache == theCache && gvol[i]->cache_vol->ramcache_enabled) {
986 ink_assert(gvol[i]->cache != nullptr);
987 factor = static_cast<double>(static_cast<int64_t>(gvol[i]->len >> STORE_BLOCK_SHIFT)) / theCache->cache_size;
988 Debug("cache_init", "CacheProcessor::cacheInitialized - factor = %f", factor);
989 gvol[i]->ram_cache->init(static_cast<int64_t>(http_ram_cache_size * factor), vol);
990 ram_cache_bytes += static_cast<int64_t>(http_ram_cache_size * factor);
991 CACHE_VOL_SUM_DYN_STAT(cache_ram_cache_bytes_total_stat, (int64_t)(http_ram_cache_size * factor));
992 } else if (gvol[i]->cache_vol->ramcache_enabled) {
993 ink_release_assert(!"Unexpected non-HTTP cache volume");
994 }
995 Debug("cache_init", "CacheProcessor::cacheInitialized[%d] - ram_cache_bytes = %" PRId64 " = %" PRId64 "Mb", i,
996 ram_cache_bytes, ram_cache_bytes / (1024 * 1024));
997 vol_total_cache_bytes = gvol[i]->len - gvol[i]->dirlen();
998 total_cache_bytes += vol_total_cache_bytes;
999 CACHE_VOL_SUM_DYN_STAT(cache_bytes_total_stat, vol_total_cache_bytes);
1000 Debug("cache_init", "CacheProcessor::cacheInitialized - total_cache_bytes = %" PRId64 " = %" PRId64 "Mb",
1001 total_cache_bytes, total_cache_bytes / (1024 * 1024));
1002
1003 vol_total_direntries = gvol[i]->buckets * gvol[i]->segments * DIR_DEPTH;
1004 total_direntries += vol_total_direntries;
1005 CACHE_VOL_SUM_DYN_STAT(cache_direntries_total_stat, vol_total_direntries);
1006
1007 vol_used_direntries = dir_entries_used(gvol[i]);
1008 CACHE_VOL_SUM_DYN_STAT(cache_direntries_used_stat, vol_used_direntries);
1009 used_direntries += vol_used_direntries;
1010 }
1011 }
1012 switch (cache_config_ram_cache_compress) {
1013 default:
1014 Fatal("unknown RAM cache compression type: %d", cache_config_ram_cache_compress);
1015 case CACHE_COMPRESSION_NONE:
1016 case CACHE_COMPRESSION_FASTLZ:
1017 break;
1018 case CACHE_COMPRESSION_LIBZ:
1019 #ifndef HAVE_ZLIB_H
1020 Fatal("libz not available for RAM cache compression");
1021 #endif
1022 break;
1023 case CACHE_COMPRESSION_LIBLZMA:
1024 #ifndef HAVE_LZMA_H
1025 Fatal("lzma not available for RAM cache compression");
1026 #endif
1027 break;
1028 }
1029
1030 GLOBAL_CACHE_SET_DYN_STAT(cache_ram_cache_bytes_total_stat, ram_cache_bytes);
1031 GLOBAL_CACHE_SET_DYN_STAT(cache_bytes_total_stat, total_cache_bytes);
1032 GLOBAL_CACHE_SET_DYN_STAT(cache_direntries_total_stat, total_direntries);
1033 GLOBAL_CACHE_SET_DYN_STAT(cache_direntries_used_stat, used_direntries);
1034 if (!check) {
1035 dir_sync_init();
1036 }
1037 cache_init_ok = 1;
1038 } else {
1039 Warning("cache unable to open any vols, disabled");
1040 }
1041 }
1042 if (cache_init_ok) {
1043 // Initialize virtual cache
1044 CacheProcessor::initialized = CACHE_INITIALIZED;
1045 CacheProcessor::cache_ready = caches_ready;
1046 Note("cache enabled");
1047 } else {
1048 CacheProcessor::initialized = CACHE_INIT_FAILED;
1049 Note("cache disabled");
1050 }
1051
1052 // Fire callback to signal initialization finished.
1053 if (cb_after_init) {
1054 cb_after_init();
1055 }
1056
1057 // TS-3848
1058 if (CACHE_INIT_FAILED == CacheProcessor::initialized && cacheProcessor.waitForCache() > 1) {
1059 Emergency("Cache initialization failed with cache required, exiting.");
1060 }
1061 }
1062
1063 void
1064 CacheProcessor::stop()
1065 {
1066 }
1067
1068 int
1069 CacheProcessor::dir_check(bool afix)
1070 {
1071 for (int i = 0; i < gnvol; i++) {
1072 gvol[i]->dir_check(afix);
1073 }
1074 return 0;
1075 }
1076
1077 int
1078 CacheProcessor::db_check(bool afix)
1079 {
1080 for (int i = 0; i < gnvol; i++) {
1081 gvol[i]->db_check(afix);
1082 }
1083 return 0;
1084 }
1085
1086 Action *
1087 CacheProcessor::lookup(Continuation *cont, const CacheKey *key, CacheFragType frag_type, const char *hostname, int host_len)
1088 {
1089 return caches[frag_type]->lookup(cont, key, frag_type, hostname, host_len);
1090 }
1091
1092 inkcoreapi Action *
1093 CacheProcessor::open_read(Continuation *cont, const CacheKey *key, CacheFragType frag_type, const char *hostname, int hostlen)
1094 {
1095 return caches[frag_type]->open_read(cont, key, frag_type, hostname, hostlen);
1096 }
1097
1098 inkcoreapi Action *
1099 CacheProcessor::open_write(Continuation *cont, CacheKey *key, CacheFragType frag_type, int expected_size ATS_UNUSED, int options,
1100 time_t pin_in_cache, char *hostname, int host_len)
1101 {
1102 return caches[frag_type]->open_write(cont, key, frag_type, options, pin_in_cache, hostname, host_len);
1103 }
1104
1105 Action *
1106 CacheProcessor::remove(Continuation *cont, const CacheKey *key, CacheFragType frag_type, const char *hostname, int host_len)
1107 {
1108 Debug("cache_remove", "[CacheProcessor::remove] Issuing cache delete for %u", cache_hash(*key));
1109 return caches[frag_type]->remove(cont, key, frag_type, hostname, host_len);
1110 }
1111
1112 Action *
1113 CacheProcessor::lookup(Continuation *cont, const HttpCacheKey *key, CacheFragType frag_type)
1114 {
1115 return lookup(cont, &key->hash, frag_type, key->hostname, key->hostlen);
1116 }
1117
1118 Action *
1119 CacheProcessor::scan(Continuation *cont, char *hostname, int host_len, int KB_per_second)
1120 {
1121 return caches[CACHE_FRAG_TYPE_HTTP]->scan(cont, hostname, host_len, KB_per_second);
1122 }
1123
1124 int
1125 CacheProcessor::IsCacheEnabled()
1126 {
1127 return CacheProcessor::initialized;
1128 }
1129
1130 bool
1131 CacheProcessor::IsCacheReady(CacheFragType type)
1132 {
1133 if (IsCacheEnabled() != CACHE_INITIALIZED) {
1134 return false;
1135 }
1136 return static_cast<bool>(cache_ready & (1 << type));
1137 }
1138
1139 int
1140 Vol::db_check(bool /* fix ATS_UNUSED */)
1141 {
1142 char tt[256];
1143 printf(" Data for [%s]\n", hash_text.get());
1144 printf(" Length: %" PRIu64 "\n", static_cast<uint64_t>(len));
1145 printf(" Write Position: %" PRIu64 "\n", static_cast<uint64_t>(header->write_pos - skip));
1146 printf(" Phase: %d\n", static_cast<int>(!!header->phase));
1147 ink_ctime_r(&header->create_time, tt);
1148 tt[strlen(tt) - 1] = 0;
1149 printf(" Create Time: %s\n", tt);
1150 printf(" Sync Serial: %u\n", static_cast<unsigned int>(header->sync_serial));
1151 printf(" Write Serial: %u\n", static_cast<unsigned int>(header->write_serial));
1152 printf("\n");
1153
1154 return 0;
1155 }
1156
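// Worked example for the sizing below (a sketch, assuming DIR_DEPTH == 4 and the default
// ~8 KB minimum average object size): a 100 GB stripe yields roughly 13.4 million directory
// entries (100 GB / 8 KB), hence ~3.4 million buckets; capping each segment at 16384 buckets
// gives ~205 segments of roughly 16K buckets each. The calculation is run three times (see
// vol_init_data()) because dirlen() depends on buckets/segments and changes the usable length.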
1157 static void
1158 vol_init_data_internal(Vol *d)
1159 {
1160 // step1: calculate the number of entries.
1161 off_t total_entries = (d->len - (d->start - d->skip)) / cache_config_min_average_object_size;
1162 // step2: calculate the number of buckets
1163 off_t total_buckets = total_entries / DIR_DEPTH;
1164 // step3: calculate the number of segments, no segment has more than 16384 buckets
1165 d->segments = (total_buckets + (((1 << 16) - 1) / DIR_DEPTH)) / ((1 << 16) / DIR_DEPTH);
1166 // step4: divide total_buckets into segments on average.
1167 d->buckets = (total_buckets + d->segments - 1) / d->segments;
1168 // step5: set the start pointer.
1169 d->start = d->skip + 2 * d->dirlen();
1170 }
1171
1172 static void
1173 vol_init_data(Vol *d)
1174 {
1175 // iteratively calculate start + buckets
1176 vol_init_data_internal(d);
1177 vol_init_data_internal(d);
1178 vol_init_data_internal(d);
1179 }
1180
1181 void
1182 vol_init_dir(Vol *d)
1183 {
1184 int b, s, l;
1185
1186 for (s = 0; s < d->segments; s++) {
1187 d->header->freelist[s] = 0;
1188 Dir *seg = d->dir_segment(s);
1189 for (l = 1; l < DIR_DEPTH; l++) {
1190 for (b = 0; b < d->buckets; b++) {
1191 Dir *bucket = dir_bucket(b, seg);
1192 dir_free_entry(dir_bucket_row(bucket, l), s, d);
1193 }
1194 }
1195 }
1196 }
1197
1198 void
1199 vol_clear_init(Vol *d)
1200 {
1201 size_t dir_len = d->dirlen();
1202 memset(d->raw_dir, 0, dir_len);
1203 vol_init_dir(d);
1204 d->header->magic = VOL_MAGIC;
1205 d->header->version._major = CACHE_DB_MAJOR_VERSION;
1206 d->header->version._minor = CACHE_DB_MINOR_VERSION;
1207 d->scan_pos = d->header->agg_pos = d->header->write_pos = d->start;
1208 d->header->last_write_pos = d->header->write_pos;
1209 d->header->phase = 0;
1210 d->header->cycle = 0;
1211 d->header->create_time = time(nullptr);
1212 d->header->dirty = 0;
1213 d->sector_size = d->header->sector_size = d->disk->hw_sector_size;
1214 *d->footer = *d->header;
1215 }
1216
1217 int
1218 vol_dir_clear(Vol *d)
1219 {
1220 size_t dir_len = d->dirlen();
1221 vol_clear_init(d);
1222
1223 if (pwrite(d->fd, d->raw_dir, dir_len, d->skip) < 0) {
1224 Warning("unable to clear cache directory '%s'", d->hash_text.get());
1225 return -1;
1226 }
1227 return 0;
1228 }
1229
1230 int
1231 Vol::clear_dir()
1232 {
1233 size_t dir_len = this->dirlen();
1234 vol_clear_init(this);
1235
1236 SET_HANDLER(&Vol::handle_dir_clear);
1237
1238 io.aiocb.aio_fildes = fd;
1239 io.aiocb.aio_buf = raw_dir;
1240 io.aiocb.aio_nbytes = dir_len;
1241 io.aiocb.aio_offset = skip;
1242 io.action = this;
1243 io.thread = AIO_CALLBACK_THREAD_ANY;
1244 io.then = nullptr;
1245 ink_assert(ink_aio_write(&io));
1246 return 0;
1247 }
1248
1249 int
1250 Vol::init(char *s, off_t blocks, off_t dir_skip, bool clear)
1251 {
1252 char *seed_str = disk->hash_base_string ? disk->hash_base_string : s;
1253 const size_t hash_seed_size = strlen(seed_str);
1254 const size_t hash_text_size = hash_seed_size + 32;
1255
1256 hash_text = static_cast<char *>(ats_malloc(hash_text_size));
1257 ink_strlcpy(hash_text, seed_str, hash_text_size);
1258 snprintf(hash_text + hash_seed_size, (hash_text_size - hash_seed_size), " %" PRIu64 ":%" PRIu64 "",
1259 static_cast<uint64_t>(dir_skip), static_cast<uint64_t>(blocks));
1260 CryptoContext().hash_immediate(hash_id, hash_text, strlen(hash_text));
1261
1262 dir_skip = ROUND_TO_STORE_BLOCK((dir_skip < START_POS ? START_POS : dir_skip));
1263 path = ats_strdup(s);
1264 len = blocks * STORE_BLOCK_SIZE;
1265 ink_assert(len <= MAX_VOL_SIZE);
1266 skip = dir_skip;
1267 prev_recover_pos = 0;
1268
1269 // successive approximation, directory/meta data eats up some storage
1270 start = dir_skip;
1271 vol_init_data(this);
1272 data_blocks = (len - (start - skip)) / STORE_BLOCK_SIZE;
1273 hit_evacuate_window = (data_blocks * cache_config_hit_evacuate_percent) / 100;
1274
1275 evacuate_size = static_cast<int>(len / EVACUATION_BUCKET_SIZE) + 2;
1276 int evac_len = evacuate_size * sizeof(DLL<EvacuationBlock>);
1277 evacuate = static_cast<DLL<EvacuationBlock> *>(ats_malloc(evac_len));
1278 memset(static_cast<void *>(evacuate), 0, evac_len);
1279
1280 Debug("cache_init", "Vol %s: allocating %zu directory bytes for a %lld byte volume (%lf%%)", hash_text.get(), dirlen(),
1281 (long long)this->len, (double)dirlen() / (double)this->len * 100.0);
1282
1283 raw_dir = nullptr;
1284 if (ats_hugepage_enabled()) {
1285 raw_dir = static_cast<char *>(ats_alloc_hugepage(this->dirlen()));
1286 }
1287 if (raw_dir == nullptr) {
1288 raw_dir = static_cast<char *>(ats_memalign(ats_pagesize(), this->dirlen()));
1289 }
1290
1291 dir = reinterpret_cast<Dir *>(raw_dir + this->headerlen());
1292 header = reinterpret_cast<VolHeaderFooter *>(raw_dir);
1293 footer = reinterpret_cast<VolHeaderFooter *>(raw_dir + this->dirlen() - ROUND_TO_STORE_BLOCK(sizeof(VolHeaderFooter)));
1294
1295 if (clear) {
1296 Note("clearing cache directory '%s'", hash_text.get());
1297 return clear_dir();
1298 }
1299
1300 init_info = new VolInitInfo();
1301 int footerlen = ROUND_TO_STORE_BLOCK(sizeof(VolHeaderFooter));
1302 off_t footer_offset = this->dirlen() - footerlen;
1303 // try A
1304 off_t as = skip;
1305
1306 Debug("cache_init", "reading directory '%s'", hash_text.get());
1307 SET_HANDLER(&Vol::handle_header_read);
1308 init_info->vol_aio[0].aiocb.aio_offset = as;
1309 init_info->vol_aio[1].aiocb.aio_offset = as + footer_offset;
1310 off_t bs = skip + this->dirlen();
1311 init_info->vol_aio[2].aiocb.aio_offset = bs;
1312 init_info->vol_aio[3].aiocb.aio_offset = bs + footer_offset;
1313
1314 for (unsigned i = 0; i < countof(init_info->vol_aio); i++) {
1315 AIOCallback *aio = &(init_info->vol_aio[i]);
1316 aio->aiocb.aio_fildes = fd;
1317 aio->aiocb.aio_buf = &(init_info->vol_h_f[i * STORE_BLOCK_SIZE]);
1318 aio->aiocb.aio_nbytes = footerlen;
1319 aio->action = this;
1320 aio->thread = AIO_CALLBACK_THREAD_ANY;
1321 aio->then = (i < 3) ? &(init_info->vol_aio[i + 1]) : nullptr;
1322 }
1323 #if AIO_MODE == AIO_MODE_NATIVE
1324 ink_assert(ink_aio_readv(init_info->vol_aio));
1325 #else
1326 ink_assert(ink_aio_read(init_info->vol_aio));
1327 #endif
1328 return 0;
1329 }
1330
1331 int
1332 Vol::handle_dir_clear(int event, void *data)
1333 {
1334 size_t dir_len = this->dirlen();
1335 AIOCallback *op;
1336
1337 if (event == AIO_EVENT_DONE) {
1338 op = static_cast<AIOCallback *>(data);
1339 if (static_cast<size_t>(op->aio_result) != op->aiocb.aio_nbytes) {
1340 Warning("unable to clear cache directory '%s'", hash_text.get());
1341 disk->incrErrors(op);
1342 fd = -1;
1343 }
1344
1345 if (op->aiocb.aio_nbytes == dir_len) {
1346 /* clear the header for directory B. We don't need to clear the
1347 whole of directory B. The header for directory B starts at
1348 skip + dirlen() */
1349 op->aiocb.aio_nbytes = ROUND_TO_STORE_BLOCK(sizeof(VolHeaderFooter));
1350 op->aiocb.aio_offset = skip + dir_len;
1351 ink_assert(ink_aio_write(op));
1352 return EVENT_DONE;
1353 }
1354 set_io_not_in_progress();
1355 SET_HANDLER(&Vol::dir_init_done);
1356 dir_init_done(EVENT_IMMEDIATE, nullptr);
1357 /* mark the volume as bad */
1358 }
1359 return EVENT_DONE;
1360 }
1361
1362 int
1363 Vol::handle_dir_read(int event, void *data)
1364 {
1365 AIOCallback *op = static_cast<AIOCallback *>(data);
1366
1367 if (event == AIO_EVENT_DONE) {
1368 if (static_cast<size_t>(op->aio_result) != op->aiocb.aio_nbytes) {
1369 Note("Directory read failed: clearing cache directory %s", this->hash_text.get());
1370 clear_dir();
1371 return EVENT_DONE;
1372 }
1373 }
1374
1375 if (!(header->magic == VOL_MAGIC && footer->magic == VOL_MAGIC && CACHE_DB_MAJOR_VERSION_COMPATIBLE <= header->version._major &&
1376 header->version._major <= CACHE_DB_MAJOR_VERSION)) {
1377 Warning("bad footer in cache directory for '%s', clearing", hash_text.get());
1378 Note("VOL_MAGIC %d\n header magic: %d\n footer_magic %d\n CACHE_DB_MAJOR_VERSION_COMPATIBLE %d\n major version %d\n"
1379 "CACHE_DB_MAJOR_VERSION %d\n",
1380 VOL_MAGIC, header->magic, footer->magic, CACHE_DB_MAJOR_VERSION_COMPATIBLE, header->version._major,
1381 CACHE_DB_MAJOR_VERSION);
1382 Note("clearing cache directory '%s'", hash_text.get());
1383 clear_dir();
1384 return EVENT_DONE;
1385 }
1386 CHECK_DIR(this);
1387
1388 sector_size = header->sector_size;
1389
1390 return this->recover_data();
1391 }
1392
1393 int
1394 Vol::recover_data()
1395 {
1396 SET_HANDLER(&Vol::handle_recover_from_data);
1397 return handle_recover_from_data(EVENT_IMMEDIATE, nullptr);
1398 }
1399
1400 /*
1401 Philosophy: The idea is to find the region of disk that could be
1402 inconsistent and remove all directory entries pointing to that potentially
1403 inconsistent region.
1404 Start from a consistent position (the write_pos of the last directory
1405 synced to disk) and scan forward. The following invariants hold for docs that
1406 were written to the disk after the directory was synced:
1407
1408 1. doc->magic == DOC_MAGIC
1409
1410 (Invariants 2 and 3 hold only when the previous generation of documents is
1411 aligned with the current one.)
1412
1413 2. All the docs written to the disk
1414 after the directory was synced will have their sync_serial <=
1415 header->sync_serial + 1, because the write aggregation can take
1416 an indeterminate amount of time to sync. The doc->sync_serial can be
1417 equal to header->sync_serial + 1, because we increment the sync_serial
1418 before we sync the directory to disk.
1419
1420 3. The doc->sync_serial will always increase. If doc->sync_serial
1421 decreases, the document was written in the previous phase
1422
1423 If any of these conditions fails and we are not too close to the end
1424 (see the next comment), then we're done.
1425
1426 We actually start from header->last_write_pos instead of header->write_pos
1427 to make sure that we haven't wrapped around the whole disk without
1428 syncing the directory. Since the directory sync interval is 60 seconds, it is
1429 entirely possible to write through the whole cache without
1430 once syncing the directory. In this case, we need to clear the
1431 cache. The documents written right before we synced the
1432 directory to disk should have the write_serial <= header->sync_serial.
1433
1434 */
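/*
  A concrete, purely illustrative example of invariant 2: if header->sync_serial is 7 when
  recovery starts, every document written after that sync should carry sync_serial 7 or 8
  (8 is possible because the serial is incremented before the directory itself is written).
  A document with sync_serial 9, or one whose sync_serial is lower than a previously scanned
  document's, marks the start of the potentially inconsistent region.
*/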
1435
1436 int
1437 Vol::handle_recover_from_data(int event, void * /* data ATS_UNUSED */)
1438 {
1439 uint32_t got_len = 0;
1440 uint32_t max_sync_serial = header->sync_serial;
1441 char *s, *e;
1442 if (event == EVENT_IMMEDIATE) {
1443 if (header->sync_serial == 0) {
1444 io.aiocb.aio_buf = nullptr;
1445 SET_HANDLER(&Vol::handle_recover_write_dir);
1446 return handle_recover_write_dir(EVENT_IMMEDIATE, nullptr);
1447 }
1448 // initialize
1449 recover_wrapped = false;
1450 last_sync_serial = 0;
1451 last_write_serial = 0;
1452 recover_pos = header->last_write_pos;
1453 if (recover_pos >= skip + len) {
1454 recover_wrapped = true;
1455 recover_pos = start;
1456 }
1457 io.aiocb.aio_buf = static_cast<char *>(ats_memalign(ats_pagesize(), RECOVERY_SIZE));
1458 io.aiocb.aio_nbytes = RECOVERY_SIZE;
1459 if (static_cast<off_t>(recover_pos + io.aiocb.aio_nbytes) > static_cast<off_t>(skip + len)) {
1460 io.aiocb.aio_nbytes = (skip + len) - recover_pos;
1461 }
1462 } else if (event == AIO_EVENT_DONE) {
1463 if (io.aiocb.aio_nbytes != static_cast<size_t>(io.aio_result)) {
1464 Warning("disk read error on recover '%s', clearing", hash_text.get());
1465 disk->incrErrors(&io);
1466 goto Lclear;
1467 }
1468 if (io.aiocb.aio_offset == header->last_write_pos) {
1469 /* check that we haven't wrapped around without syncing
1470 the directory. Start from header->last_write_pos (the position the documents
1471 were written to just before syncing the directory) and make sure
1472 that all documents have write_serial <= header->write_serial.
1473 */
1474 uint32_t to_check = header->write_pos - header->last_write_pos;
1475 ink_assert(to_check && to_check < (uint32_t)io.aiocb.aio_nbytes);
1476 uint32_t done = 0;
1477 s = static_cast<char *>(io.aiocb.aio_buf);
1478 while (done < to_check) {
1479 Doc *doc = reinterpret_cast<Doc *>(s + done);
1480 if (doc->magic != DOC_MAGIC || doc->write_serial > header->write_serial) {
1481 Warning("no valid directory found while recovering '%s', clearing", hash_text.get());
1482 goto Lclear;
1483 }
1484 done += round_to_approx_size(doc->len);
1485 if (doc->sync_serial > last_write_serial) {
1486 last_sync_serial = doc->sync_serial;
1487 }
1488 }
1489 ink_assert(done == to_check);
1490
1491 got_len = io.aiocb.aio_nbytes - done;
1492 recover_pos += io.aiocb.aio_nbytes;
1493 s = static_cast<char *>(io.aiocb.aio_buf) + done;
1494 e = s + got_len;
1495 } else {
1496 got_len = io.aiocb.aio_nbytes;
1497 recover_pos += io.aiocb.aio_nbytes;
1498 s = static_cast<char *>(io.aiocb.aio_buf);
1499 e = s + got_len;
1500 }
1501 }
1502 // examine what we got
1503 if (got_len) {
1504 Doc *doc = nullptr;
1505
1506 if (recover_wrapped && start == io.aiocb.aio_offset) {
1507 doc = reinterpret_cast<Doc *>(s);
1508 if (doc->magic != DOC_MAGIC || doc->write_serial < last_write_serial) {
1509 recover_pos = skip + len - EVACUATION_SIZE;
1510 goto Ldone;
1511 }
1512 }
1513
1514 // If execution reaches here, then @c got_len > 0 and e == s + got_len therefore s < e
1515 // clang analyzer can't figure this out, so be explicit.
1516 ink_assert(s < e);
1517 while (s < e) {
1518 doc = reinterpret_cast<Doc *>(s);
1519
1520 if (doc->magic != DOC_MAGIC || doc->sync_serial != last_sync_serial) {
1521 if (doc->magic == DOC_MAGIC) {
1522 if (doc->sync_serial > header->sync_serial) {
1523 max_sync_serial = doc->sync_serial;
1524 }
1525
1526 /*
1527 doc->magic == DOC_MAGIC, but doc->sync_serial != last_sync_serial
1528 This might happen in the following situations
1529 1. We are starting off recovery. In this case the
1530 last_sync_serial == header->sync_serial, but the doc->sync_serial
1531 can be anywhere in the range (0, header->sync_serial + 1]
1532 If this is the case, update last_sync_serial and continue;
1533
1534 2. A dir sync started between writing documents to the
1535 aggregation buffer and hence the doc->sync_serial went up.
1536 If the doc->sync_serial is greater than the last
1537 sync serial and less than (header->sync_serial + 2) then
1538 continue;
1539
1540 3. If the position we are recovering from is within AGG_SIZE
1541 from the disk end, then we can't trust this document. The
1542 aggregation buffer might have been larger than the remaining space
1543 at the end and we decided to wrap around instead of writing
1544 anything at that point. In this case, wrap around and start
1545 from the beginning.
1546
1547 If none of these 3 cases applies, then we are indeed done.
1548
1549 */
1550
1551 // case 1
1552 // case 2
1553 if (doc->sync_serial > last_sync_serial && doc->sync_serial <= header->sync_serial + 1) {
1554 last_sync_serial = doc->sync_serial;
1555 s += round_to_approx_size(doc->len);
1556 continue;
1557 }
1558 // case 3 - we have already recovered some data and
1559 // (doc->sync_serial < last_sync_serial) ||
1560 // (doc->sync_serial > header->sync_serial + 1).
1561 // if we are too close to the end, wrap around
1562 else if (recover_pos - (e - s) > (skip + len) - AGG_SIZE) {
1563 recover_wrapped = true;
1564 recover_pos = start;
1565 io.aiocb.aio_nbytes = RECOVERY_SIZE;
1566
1567 break;
1568 }
1569 // we are done. This doc was written in the earlier phase
1570 recover_pos -= e - s;
1571 goto Ldone;
1572 } else {
1573 // doc->magic != DOC_MAGIC
1574 // If we are in the danger zone - recover_pos is within AGG_SIZE
1575 // from the end, then wrap around
1576 recover_pos -= e - s;
1577 if (recover_pos > (skip + len) - AGG_SIZE) {
1578 recover_wrapped = true;
1579 recover_pos = start;
1580 io.aiocb.aio_nbytes = RECOVERY_SIZE;
1581
1582 break;
1583 }
1584 // we are not in the danger zone
1585 goto Ldone;
1586 }
1587 }
1588 // doc->magic == DOC_MAGIC && doc->sync_serial == last_sync_serial
1589 last_write_serial = doc->write_serial;
1590 s += round_to_approx_size(doc->len);
1591 }
1592
1593 /* if (s > e) then we have gone through RECOVERY_SIZE; we need to
1594 read more data off disk and continue recovering */
1595 if (s >= e) {
1596 /* In the last iteration, we increment s by doc->len...need to undo
1597 that change */
1598 if (s > e) {
1599 s -= round_to_approx_size(doc->len);
1600 }
1601 recover_pos -= e - s;
1602 if (recover_pos >= skip + len) {
1603 recover_wrapped = true;
1604 recover_pos = start;
1605 }
1606 io.aiocb.aio_nbytes = RECOVERY_SIZE;
1607 if (static_cast<off_t>(recover_pos + io.aiocb.aio_nbytes) > static_cast<off_t>(skip + len)) {
1608 io.aiocb.aio_nbytes = (skip + len) - recover_pos;
1609 }
1610 }
1611 }
1612 if (recover_pos == prev_recover_pos) { // this should never happen, but if it does break the loop
1613 goto Lclear;
1614 }
1615 prev_recover_pos = recover_pos;
1616 io.aiocb.aio_offset = recover_pos;
1617 ink_assert(ink_aio_read(&io));
1618 return EVENT_CONT;
1619
1620 Ldone : {
1621 /* if we come back to the starting position, then we don't have to recover anything */
1622 if (recover_pos == header->write_pos && recover_wrapped) {
1623 SET_HANDLER(&Vol::handle_recover_write_dir);
1624 if (is_debug_tag_set("cache_init")) {
1625 Note("recovery wrapped around. nothing to clear\n");
1626 }
1627 return handle_recover_write_dir(EVENT_IMMEDIATE, nullptr);
1628 }
1629
1630 recover_pos += EVACUATION_SIZE; // safely cover the max write size
1631 if (recover_pos < header->write_pos && (recover_pos + EVACUATION_SIZE >= header->write_pos)) {
1632 Debug("cache_init", "Head Pos: %" PRIu64 ", Rec Pos: %" PRIu64 ", Wrapped:%d", header->write_pos, recover_pos, recover_wrapped);
1633 Warning("no valid directory found while recovering '%s', clearing", hash_text.get());
1634 goto Lclear;
1635 }
1636
1637 if (recover_pos > skip + len) {
1638 recover_pos -= skip + len;
1639 }
1640 // bump sync number so it is different from that in the Doc structs
1641 uint32_t next_sync_serial = max_sync_serial + 1;
1642 // make sure that the next sync does not overwrite our good copy!
1643 if (!(header->sync_serial & 1) == !(next_sync_serial & 1)) {
1644 next_sync_serial++;
1645 }
1646 // clear the affected portion of the cache
1647 off_t clear_start = this->offset_to_vol_offset(header->write_pos);
1648 off_t clear_end = this->offset_to_vol_offset(recover_pos);
1649 if (clear_start <= clear_end) {
1650 dir_clear_range(clear_start, clear_end, this);
1651 } else {
1652 dir_clear_range(clear_start, DIR_OFFSET_MAX, this);
1653 dir_clear_range(1, clear_end, this);
1654 }
1655
1656 Note("recovery clearing offsets of Vol %s : [%" PRIu64 ", %" PRIu64 "] sync_serial %d next %d\n", hash_text.get(),
1657 header->write_pos, recover_pos, header->sync_serial, next_sync_serial);
1658
1659 footer->sync_serial = header->sync_serial = next_sync_serial;
1660
1661 for (int i = 0; i < 3; i++) {
1662 AIOCallback *aio = &(init_info->vol_aio[i]);
1663 aio->aiocb.aio_fildes = fd;
1664 aio->action = this;
1665 aio->thread = AIO_CALLBACK_THREAD_ANY;
1666 aio->then = (i < 2) ? &(init_info->vol_aio[i + 1]) : nullptr;
1667 }
1668 int footerlen = ROUND_TO_STORE_BLOCK(sizeof(VolHeaderFooter));
1669 size_t dirlen = this->dirlen();
1670 int B = header->sync_serial & 1;
1671 off_t ss = skip + (B ? dirlen : 0);
1672
1673 init_info->vol_aio[0].aiocb.aio_buf = raw_dir;
1674 init_info->vol_aio[0].aiocb.aio_nbytes = footerlen;
1675 init_info->vol_aio[0].aiocb.aio_offset = ss;
1676 init_info->vol_aio[1].aiocb.aio_buf = raw_dir + footerlen;
1677 init_info->vol_aio[1].aiocb.aio_nbytes = dirlen - 2 * footerlen;
1678 init_info->vol_aio[1].aiocb.aio_offset = ss + footerlen;
1679 init_info->vol_aio[2].aiocb.aio_buf = raw_dir + dirlen - footerlen;
1680 init_info->vol_aio[2].aiocb.aio_nbytes = footerlen;
1681 init_info->vol_aio[2].aiocb.aio_offset = ss + dirlen - footerlen;
1682
1683 SET_HANDLER(&Vol::handle_recover_write_dir);
1684 #if AIO_MODE == AIO_MODE_NATIVE
1685 ink_assert(ink_aio_writev(init_info->vol_aio));
1686 #else
1687 ink_assert(ink_aio_write(init_info->vol_aio));
1688 #endif
1689 return EVENT_CONT;
1690 }
1691
1692 Lclear:
1693 free(static_cast<char *>(io.aiocb.aio_buf));
1694 delete init_info;
1695 init_info = nullptr;
1696 clear_dir();
1697 return EVENT_CONT;
1698 }
1699
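// Runs after the repaired directory has been queued for write-back (or immediately when recovery
// wrapped around with nothing to clear): releases the recovery I/O buffer, resets the scan position
// to the write head, performs a periodic scan, and finishes initialization via dir_init_done().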
1700 int
1701 Vol::handle_recover_write_dir(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED */)
1702 {
1703 if (io.aiocb.aio_buf) {
1704 free(static_cast<char *>(io.aiocb.aio_buf));
1705 }
1706 delete init_info;
1707 init_info = nullptr;
1708 set_io_not_in_progress();
1709 scan_pos = header->write_pos;
1710 periodic_scan();
1711 SET_HANDLER(&Vol::dir_init_done);
1712 return dir_init_done(EVENT_IMMEDIATE, nullptr);
1713 }
1714
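// Invoked once the four header/footer blocks (A header, A footer, B header, B footer) have been read.
// Directory copy A is used if its header and footer sync serials agree and it is at least as recent as
// copy B (or B is inconsistent); otherwise copy B is used if consistent. If neither copy is usable,
// the directory is cleared.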
1715 int
1716 Vol::handle_header_read(int event, void *data)
1717 {
1718 AIOCallback *op;
1719 VolHeaderFooter *hf[4];
1720 switch (event) {
1721 case AIO_EVENT_DONE:
1722 op = static_cast<AIOCallback *>(data);
1723 for (auto &i : hf) {
1724 ink_assert(op != nullptr);
1725 i = static_cast<VolHeaderFooter *>(op->aiocb.aio_buf);
1726 if (static_cast<size_t>(op->aio_result) != op->aiocb.aio_nbytes) {
1727 Note("Header read failed: clearing cache directory %s", this->hash_text.get());
1728 clear_dir();
1729 return EVENT_DONE;
1730 }
1731 op = op->then;
1732 }
1733
1734 io.aiocb.aio_fildes = fd;
1735 io.aiocb.aio_nbytes = this->dirlen();
1736 io.aiocb.aio_buf = raw_dir;
1737 io.action = this;
1738 io.thread = AIO_CALLBACK_THREAD_ANY;
1739 io.then = nullptr;
1740
1741 if (hf[0]->sync_serial == hf[1]->sync_serial &&
1742 (hf[0]->sync_serial >= hf[2]->sync_serial || hf[2]->sync_serial != hf[3]->sync_serial)) {
1743 SET_HANDLER(&Vol::handle_dir_read);
1744 if (is_debug_tag_set("cache_init")) {
1745 Note("using directory A for '%s'", hash_text.get());
1746 }
1747 io.aiocb.aio_offset = skip;
1748 ink_assert(ink_aio_read(&io));
1749 }
1750 // try B
1751 else if (hf[2]->sync_serial == hf[3]->sync_serial) {
1752 SET_HANDLER(&Vol::handle_dir_read);
1753 if (is_debug_tag_set("cache_init")) {
1754 Note("using directory B for '%s'", hash_text.get());
1755 }
1756 io.aiocb.aio_offset = skip + this->dirlen();
1757 ink_assert(ink_aio_read(&io));
1758 } else {
1759 Note("no good directory, clearing '%s' since sync_serials on both A and B copies are invalid", hash_text.get());
1760 Note("Header A: %d\nFooter A: %d\nHeader B: %d\nFooter B: %d\n", hf[0]->sync_serial, hf[1]->sync_serial, hf[2]->sync_serial,
1761 hf[3]->sync_serial);
1762 clear_dir();
1763 delete init_info;
1764 init_info = nullptr;
1765 }
1766 return EVENT_DONE;
1767 default:
1768 ink_assert(!"should not reach here");
1769 }
1770 return EVENT_DONE;
1771 }
1772
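// Final step of volume initialization: waits (retrying every 5ms) until Cache::open() has finished
// creating all volumes, then registers this Vol in the global gvol table, installs aggWrite as the
// handler, and reports success or failure to the owning Cache based on the file descriptor state.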
1773 int
1774 Vol::dir_init_done(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED */)
1775 {
1776 if (!cache->cache_read_done) {
1777 eventProcessor.schedule_in(this, HRTIME_MSECONDS(5), ET_CALL);
1778 return EVENT_CONT;
1779 } else {
1780 int vol_no = gnvol++;
1781 ink_assert(!gvol[vol_no]);
1782 gvol[vol_no] = this;
1783 SET_HANDLER(&Vol::aggWrite);
1784 if (fd == -1) {
1785 cache->vol_initialized(false);
1786 } else {
1787 cache->vol_initialized(true);
1788 }
1789 return EVENT_DONE;
1790 }
1791 }
1792
1793 // explicit pair for random table in build_vol_hash_table
1794 struct rtable_pair {
1795 unsigned int rval; ///< relative value, used to sort.
1796 unsigned int idx; ///< volume mapping table index.
1797 };
1798
1799 // comparison operator for random table in build_vol_hash_table
1800 // sorts based on the randomly assigned rval
1801 static int
1802 cmprtable(const void *aa, const void *bb)
1803 {
1804 rtable_pair *a = (rtable_pair *)aa;
1805 rtable_pair *b = (rtable_pair *)bb;
1806 if (a->rval < b->rval) {
1807 return -1;
1808 }
1809 if (a->rval > b->rval) {
1810 return 1;
1811 }
1812 return 0;
1813 }
1814
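// Build the bucket-to-volume mapping for a host record. Each of the VOL_HASH_TABLE_SIZE buckets is
// assigned to one usable (non-bad) volume, with buckets allocated roughly in proportion to volume
// size. The assignment uses per-volume pseudo-random sequences seeded from each volume's hash_id,
// so the resulting table is reproducible across restarts. Any previously installed table is freed
// only after CACHE_MEM_FREE_TIMEOUT, presumably so in-flight readers can finish with it.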
1815 void
1816 build_vol_hash_table(CacheHostRecord *cp)
1817 {
1818 int num_vols = cp->num_vols;
1819 unsigned int *mapping = static_cast<unsigned int *>(ats_malloc(sizeof(unsigned int) * num_vols));
1820 Vol **p = static_cast<Vol **>(ats_malloc(sizeof(Vol *) * num_vols));
1821
1822 memset(mapping, 0, num_vols * sizeof(unsigned int));
1823 memset(p, 0, num_vols * sizeof(Vol *));
1824 uint64_t total = 0;
1825 int bad_vols = 0;
1826 int map = 0;
1827 uint64_t used = 0;
1828 // initialize number of elements per vol
1829 for (int i = 0; i < num_vols; i++) {
1830 if (DISK_BAD(cp->vols[i]->disk)) {
1831 bad_vols++;
1832 continue;
1833 }
1834 mapping[map] = i;
1835 p[map++] = cp->vols[i];
1836 total += (cp->vols[i]->len >> STORE_BLOCK_SHIFT);
1837 }
1838
1839 num_vols -= bad_vols;
1840
1841 if (!num_vols || !total) {
1842 // all the disks are corrupt; drop any existing hash table
1843 if (cp->vol_hash_table) {
1844 new_Freer(cp->vol_hash_table, CACHE_MEM_FREE_TIMEOUT);
1845 }
1846 cp->vol_hash_table = nullptr;
1847 ats_free(mapping);
1848 ats_free(p);
1849 return;
1850 }
1851
1852 unsigned int *forvol = static_cast<unsigned int *>(ats_malloc(sizeof(unsigned int) * num_vols));
1853 unsigned int *gotvol = static_cast<unsigned int *>(ats_malloc(sizeof(unsigned int) * num_vols));
1854 unsigned int *rnd = static_cast<unsigned int *>(ats_malloc(sizeof(unsigned int) * num_vols));
1855 unsigned short *ttable = static_cast<unsigned short *>(ats_malloc(sizeof(unsigned short) * VOL_HASH_TABLE_SIZE));
1856 unsigned short *old_table;
1857 unsigned int *rtable_entries = static_cast<unsigned int *>(ats_malloc(sizeof(unsigned int) * num_vols));
1858 unsigned int rtable_size = 0;
1859
1860 // estimate allocation
1861 for (int i = 0; i < num_vols; i++) {
1862 forvol[i] = (VOL_HASH_TABLE_SIZE * (p[i]->len >> STORE_BLOCK_SHIFT)) / total;
1863 used += forvol[i];
1864 rtable_entries[i] = p[i]->len / VOL_HASH_ALLOC_SIZE;
1865 rtable_size += rtable_entries[i];
1866 gotvol[i] = 0;
1867 }
1868 // spread around the excess
1869 int extra = VOL_HASH_TABLE_SIZE - used;
1870 for (int i = 0; i < extra; i++) {
1871 forvol[i % num_vols]++;
1872 }
1873 // seed random number generator
1874 for (int i = 0; i < num_vols; i++) {
1875 uint64_t x = p[i]->hash_id.fold();
1876 rnd[i] = static_cast<unsigned int>(x);
1877 }
1878 // initialize table to "empty"
1879 for (int i = 0; i < VOL_HASH_TABLE_SIZE; i++) {
1880 ttable[i] = VOL_HASH_EMPTY;
1881 }
1882 // generate random numbers proportional to allocation
1883 rtable_pair *rtable = static_cast<rtable_pair *>(ats_malloc(sizeof(rtable_pair) * rtable_size));
1884 int rindex = 0;
1885 for (int i = 0; i < num_vols; i++) {
1886 for (int j = 0; j < static_cast<int>(rtable_entries[i]); j++) {
1887 rtable[rindex].rval = next_rand(&rnd[i]);
1888 rtable[rindex].idx = i;
1889 rindex++;
1890 }
1891 }
1892 ink_assert(rindex == (int)rtable_size);
1893 // sort the (rand #, vol #) pairs
1894 qsort(rtable, rtable_size, sizeof(rtable_pair), cmprtable);
1895 unsigned int width = (1LL << 32) / VOL_HASH_TABLE_SIZE;
1896 unsigned int pos; // target position to allocate
1897 // select vol with closest random number for each bucket
1898 int i = 0; // index moving through the random numbers
1899 for (int j = 0; j < VOL_HASH_TABLE_SIZE; j++) {
1900 pos = width / 2 + j * width; // position to select closest to
1901 while (pos > rtable[i].rval && i < static_cast<int>(rtable_size) - 1) {
1902 i++;
1903 }
1904 ttable[j] = mapping[rtable[i].idx];
1905 gotvol[rtable[i].idx]++;
1906 }
1907 for (int i = 0; i < num_vols; i++) {
1908 Debug("cache_init", "build_vol_hash_table index %d mapped to %d requested %d got %d", i, mapping[i], forvol[i], gotvol[i]);
1909 }
1910 // install new table
1911 if (nullptr != (old_table = ink_atomic_swap(&(cp->vol_hash_table), ttable))) {
1912 new_Freer(old_table, CACHE_MEM_FREE_TIMEOUT);
1913 }
1914 ats_free(mapping);
1915 ats_free(p);
1916 ats_free(forvol);
1917 ats_free(gotvol);
1918 ats_free(rnd);
1919 ats_free(rtable_entries);
1920 ats_free(rtable);
1921 }
1922
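// Called by each Vol when its initialization completes; once the last volume has reported in,
// open_done() is invoked to finish bringing the cache online.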
1923 void
1924 Cache::vol_initialized(bool result)
1925 {
1926 if (result) {
1927 ink_atomic_increment(&total_good_nvol, 1);
1928 }
1929 if (total_nvol == ink_atomic_increment(&total_initialized_vol, 1) + 1) {
1930 open_done();
1931 }
1932 }
1933
1934 /** Set the state of a disk programmatically.
1935 */
1936 bool
1937 CacheProcessor::mark_storage_offline(CacheDisk *d, ///< Target disk
1938 bool admin)
1939 {
1940 bool zret; // indicates whether there's any online storage left.
1941 int p;
1942 uint64_t total_bytes_delete = 0;
1943 uint64_t total_dir_delete = 0;
1944 uint64_t used_dir_delete = 0;
1945
1946 /* Don't mark it again, it will invalidate the stats! */
1947 if (!d->online) {
1948 return this->has_online_storage();
1949 }
1950
1951 d->online = false;
1952
1953 if (!DISK_BAD(d)) {
1954 SET_DISK_BAD(d);
1955 }
1956
1957 for (p = 0; p < gnvol; p++) {
1958 if (d->fd == gvol[p]->fd) {
1959 total_dir_delete += gvol[p]->buckets * gvol[p]->segments * DIR_DEPTH;
1960 used_dir_delete += dir_entries_used(gvol[p]);
1961 total_bytes_delete += gvol[p]->len - gvol[p]->dirlen();
1962 }
1963 }
1964
1965 RecIncrGlobalRawStat(cache_rsb, cache_bytes_total_stat, -total_bytes_delete);
1966 RecIncrGlobalRawStat(cache_rsb, cache_direntries_total_stat, -total_dir_delete);
1967 RecIncrGlobalRawStat(cache_rsb, cache_direntries_used_stat, -used_dir_delete);
1968
1969 /* Update the span metrics, if failing then move the span from "failing" to "offline" bucket
1970 * if operator took it offline, move it from "online" to "offline" bucket */
1971 RecIncrGlobalRawStat(cache_rsb, admin ? cache_span_online_stat : cache_span_failing_stat, -1);
1972 RecIncrGlobalRawStat(cache_rsb, cache_span_offline_stat, 1);
1973
1974 if (theCache) {
1975 rebuild_host_table(theCache);
1976 }
1977
1978 zret = this->has_online_storage();
1979 if (!zret) {
1980 Warning("All storage devices offline, cache disabled");
1981 CacheProcessor::cache_ready = 0;
1982 } else { // check cache types specifically
1983 if (theCache && !theCache->hosttable->gen_host_rec.vol_hash_table) {
1984 unsigned int caches_ready = 0;
1985 caches_ready = caches_ready | (1 << CACHE_FRAG_TYPE_HTTP);
1986 caches_ready = caches_ready | (1 << CACHE_FRAG_TYPE_NONE);
1987 caches_ready = ~caches_ready;
1988 CacheProcessor::cache_ready &= caches_ready;
1989 Warning("all volumes for http cache are corrupt, http cache disabled");
1990 }
1991 }
1992
1993 return zret;
1994 }
1995
1996 bool
1997 CacheProcessor::has_online_storage() const
1998 {
1999 CacheDisk **dptr = gdisks;
2000 for (int disk_no = 0; disk_no < gndisks; ++disk_no, ++dptr) {
2001 if (!DISK_BAD(*dptr) && (*dptr)->online) {
2002 return true;
2003 }
2004 }
2005 return false;
2006 }
2007
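// AIO error callback: finds the disk that owns the failing file descriptor, records the error, and
// once proxy.config.cache.max_disk_errors is exceeded marks the disk bad and takes it out of service.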
2008 int
2009 AIO_Callback_handler::handle_disk_failure(int /* event ATS_UNUSED */, void *data)
2010 {
2011 /* search for the matching file descriptor */
2012 if (!CacheProcessor::cache_ready) {
2013 return EVENT_DONE;
2014 }
2015 int disk_no = 0;
2016 AIOCallback *cb = static_cast<AIOCallback *>(data);
2017
2018 for (; disk_no < gndisks; disk_no++) {
2019 CacheDisk *d = gdisks[disk_no];
2020
2021 if (d->fd == cb->aiocb.aio_fildes) {
2022 char message[256];
2023 d->incrErrors(cb);
2024
2025 if (!DISK_BAD(d)) {
2026 snprintf(message, sizeof(message), "Error accessing Disk %s [%d/%d]", d->path, d->num_errors, cache_config_max_disk_errors);
2027 Warning("%s", message);
2028 RecSignalManager(REC_SIGNAL_CACHE_WARNING, message);
2029 } else if (!DISK_BAD_SIGNALLED(d)) {
2030 snprintf(message, sizeof(message), "too many errors accessing disk %s [%d/%d]: declaring disk bad", d->path, d->num_errors,
2031 cache_config_max_disk_errors);
2032 Warning("%s", message);
2033 RecSignalManager(REC_SIGNAL_CACHE_ERROR, message);
2034 cacheProcessor.mark_storage_offline(d); // take it out of service
2035 }
2036 break;
2037 }
2038 }
2039
2040 delete cb;
2041 return EVENT_DONE;
2042 }
2043
2044 int
2045 Cache::open_done()
2046 {
2047 Action *register_ShowCache(Continuation * c, HTTPHdr * h);
2048 Action *register_ShowCacheInternal(Continuation * c, HTTPHdr * h);
2049 statPagesManager.register_http("cache", register_ShowCache);
2050 statPagesManager.register_http("cache-internal", register_ShowCacheInternal);
2051
2052 if (total_good_nvol == 0) {
2053 ready = CACHE_INIT_FAILED;
2054 cacheProcessor.cacheInitialized();
2055 return 0;
2056 }
2057
2058 hosttable = new CacheHostTable(this, scheme);
2059 hosttable->register_config_callback(&hosttable);
2060
2061 if (hosttable->gen_host_rec.num_cachevols == 0) {
2062 ready = CACHE_INIT_FAILED;
2063 } else {
2064 ready = CACHE_INITIALIZED;
2065 }
2066
2067 // TS-3848
2068 if (ready == CACHE_INIT_FAILED && cacheProcessor.waitForCache() >= 2) {
2069 Emergency("Failed to initialize cache host table");
2070 }
2071
2072 cacheProcessor.cacheInitialized();
2073
2074 return 0;
2075 }
2076
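// Create a Vol object for every volume block of this cache's scheme on every good disk and start
// their (possibly asynchronous) initialization; open_done() runs once every Vol has reported back
// through vol_initialized().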
2077 int
2078 Cache::open(bool clear, bool /* fix ATS_UNUSED */)
2079 {
2080 int i;
2081 off_t blocks = 0;
2082 cache_read_done = 0;
2083 total_initialized_vol = 0;
2084 total_nvol = 0;
2085 total_good_nvol = 0;
2086
2087 REC_EstablishStaticConfigInt32(cache_config_min_average_object_size, "proxy.config.cache.min_average_object_size");
2088 Debug("cache_init", "Cache::open - proxy.config.cache.min_average_object_size = %d", (int)cache_config_min_average_object_size);
2089
2090 CacheVol *cp = cp_list.head;
2091 for (; cp; cp = cp->link.next) {
2092 if (cp->scheme == scheme) {
2093 cp->vols = static_cast<Vol **>(ats_malloc(cp->num_vols * sizeof(Vol *)));
2094 int vol_no = 0;
2095 for (i = 0; i < gndisks; i++) {
2096 if (cp->disk_vols[i] && !DISK_BAD(cp->disk_vols[i]->disk)) {
2097 DiskVolBlockQueue *q = cp->disk_vols[i]->dpb_queue.head;
2098 for (; q; q = q->link.next) {
2099 cp->vols[vol_no] = new Vol();
2100 CacheDisk *d = cp->disk_vols[i]->disk;
2101 cp->vols[vol_no]->disk = d;
2102 cp->vols[vol_no]->fd = d->fd;
2103 cp->vols[vol_no]->cache = this;
2104 cp->vols[vol_no]->cache_vol = cp;
2105 blocks = q->b->len;
2106
2107 bool vol_clear = clear || d->cleared || q->new_block;
2108 #if AIO_MODE == AIO_MODE_NATIVE
2109 eventProcessor.schedule_imm(new VolInit(cp->vols[vol_no], d->path, blocks, q->b->offset, vol_clear));
2110 #else
2111 cp->vols[vol_no]->init(d->path, blocks, q->b->offset, vol_clear);
2112 #endif
2113 vol_no++;
2114 cache_size += blocks;
2115 }
2116 }
2117 }
2118 total_nvol += vol_no;
2119 }
2120 }
2121 if (total_nvol == 0) {
2122 return open_done();
2123 }
2124 cache_read_done = 1;
2125 return 0;
2126 }
2127
2128 int
2129 Cache::close()
2130 {
2131 return -1;
2132 }
2133
2134 int
2135 CacheVC::dead(int /* event ATS_UNUSED */, Event * /*e ATS_UNUSED */)
2136 {
2137 ink_assert(0);
2138 return EVENT_DONE;
2139 }
2140
2141 bool
2142 CacheVC::is_pread_capable()
2143 {
2144 return !f.read_from_writer_called;
2145 }
2146
2147 #define STORE_COLLISION 1
2148
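// Unmarshal the serialized HTTP headers held in the Doc, in place. Documents written with an older
// cache DB version are routed to the legacy HTTPInfo::unmarshal_v24_1 routine; on any failure 'okay'
// is cleared so the caller treats the fragment as unusable.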
2149 static void
2150 unmarshal_helper(Doc *doc, Ptr<IOBufferData> &buf, int &okay)
2151 {
2152 using UnmarshalFunc = int(char *buf, int len, RefCountObj *block_ref);
2153 UnmarshalFunc *unmarshal_func = &HTTPInfo::unmarshal;
2154 ts::VersionNumber version(doc->v_major, doc->v_minor);
2155
2156 // introduced by https://github.com/apache/trafficserver/pull/4874, this is used to distinguish the doc version
2157 // before and after #4847
2158 if (version < CACHE_DB_VERSION) {
2159 unmarshal_func = &HTTPInfo::unmarshal_v24_1;
2160 }
2161
2162 char *tmp = doc->hdr();
2163 int len = doc->hlen;
2164 while (len > 0) {
2165 int r = unmarshal_func(tmp, len, buf.get());
2166 if (r < 0) {
2167 ink_assert(!"CacheVC::handleReadDone unmarshal failed");
2168 okay = 0;
2169 break;
2170 }
2171 len -= r;
2172 tmp += r;
2173 }
2174 }
2175
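// Completion handler for a fragment read (from disk, the aggregation buffer, or the ram cache).
// It validates the directory entry and document magic, rejects documents written by a newer cache
// DB version, optionally verifies the checksum, unmarshals HTTP headers, and inserts qualifying
// fragments into the ram cache.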
2176 // [amc] I think this is where all disk reads from the cache funnel through.
2177 int
2178 CacheVC::handleReadDone(int event, Event *e)
2179 {
2180 cancel_trigger();
2181 ink_assert(this_ethread() == mutex->thread_holding);
2182
2183 Doc *doc = nullptr;
2184 if (event == AIO_EVENT_DONE) {
2185 set_io_not_in_progress();
2186 } else if (is_io_in_progress()) {
2187 return EVENT_CONT;
2188 }
2189 {
2190 MUTEX_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
2191 if (!lock.is_locked()) {
2192 VC_SCHED_LOCK_RETRY();
2193 }
2194 if ((!dir_valid(vol, &dir)) || (!io.ok())) {
2195 if (!io.ok()) {
2196 Debug("cache_disk_error", "Read error on disk %s\n \
2197 read range : [%" PRIu64 " - %" PRIu64 " bytes] [%" PRIu64 " - %" PRIu64 " blocks] \n",
2198 vol->hash_text.get(), (uint64_t)io.aiocb.aio_offset, (uint64_t)io.aiocb.aio_offset + io.aiocb.aio_nbytes,
2199 (uint64_t)io.aiocb.aio_offset / 512, (uint64_t)(io.aiocb.aio_offset + io.aiocb.aio_nbytes) / 512);
2200 }
2201 goto Ldone;
2202 }
2203
2204 doc = reinterpret_cast<Doc *>(buf->data());
2205 ink_assert(vol->mutex->nthread_holding < 1000);
2206 ink_assert(doc->magic == DOC_MAGIC);
2207
2208 if (ts::VersionNumber(doc->v_major, doc->v_minor) > CACHE_DB_VERSION) {
2209 // future version, count as corrupted
2210 doc->magic = DOC_CORRUPT;
2211 Debug("cache_bc", "Object is future version %d:%d - disk %s - doc id = %" PRIx64 ":%" PRIx64 "", doc->v_major, doc->v_minor,
2212 vol->hash_text.get(), read_key->slice64(0), read_key->slice64(1));
2213 goto Ldone;
2214 }
2215
2216 #ifdef VERIFY_JTEST_DATA
2217 char xx[500];
2218 if (read_key && *read_key == doc->key && request.valid() && !dir_head(&dir) && !vio.ndone) {
2219 int ib = 0, xd = 0;
2220 request.url_get()->print(xx, 500, &ib, &xd);
2221 char *x = xx;
2222 for (int q = 0; q < 3; q++)
2223 x = strchr(x + 1, '/');
2224 ink_assert(!memcmp(doc->data(), x, ib - (x - xx)));
2225 }
2226 #endif
2227
2228 if (is_debug_tag_set("cache_read")) {
2229 char xt[CRYPTO_HEX_SIZE];
2230 Debug("cache_read", "Read complete on fragment %s. Length: data payload=%d this fragment=%d total doc=%" PRId64 " prefix=%d",
2231 doc->key.toHexStr(xt), doc->data_len(), doc->len, doc->total_len, doc->prefix_len());
2232 }
2233
2234 // put into ram cache?
2235 if (io.ok() && ((doc->first_key == *read_key) || (doc->key == *read_key) || STORE_COLLISION) && doc->magic == DOC_MAGIC) {
2236 int okay = 1;
2237 if (!f.doc_from_ram_cache) {
2238 f.not_from_ram_cache = 1;
2239 }
2240 if (cache_config_enable_checksum && doc->checksum != DOC_NO_CHECKSUM) {
2241 // verify that the checksum matches
2242 uint32_t checksum = 0;
2243 for (char *b = doc->hdr(); b < reinterpret_cast<char *>(doc) + doc->len; b++) {
2244 checksum += *b;
2245 }
2246 ink_assert(checksum == doc->checksum);
2247 if (checksum != doc->checksum) {
2248 Note("cache: checksum error for [%" PRIu64 " %" PRIu64 "] len %d, hlen %d, disk %s, offset %" PRIu64 " size %zu",
2249 doc->first_key.b[0], doc->first_key.b[1], doc->len, doc->hlen, vol->path, (uint64_t)io.aiocb.aio_offset,
2250 (size_t)io.aiocb.aio_nbytes);
2251 doc->magic = DOC_CORRUPT;
2252 okay = 0;
2253 }
2254 }
2255 (void)e; // Avoid compiler warnings
2256 bool http_copy_hdr = false;
2257 http_copy_hdr =
2258 cache_config_ram_cache_compress && !f.doc_from_ram_cache && doc->doc_type == CACHE_FRAG_TYPE_HTTP && doc->hlen;
2259 // If it's an HTTP doc, we need to unmarshal the headers before putting it in the ram cache
2260 // unless it could be compressed
2261 if (!http_copy_hdr && doc->doc_type == CACHE_FRAG_TYPE_HTTP && doc->hlen && okay) {
2262 unmarshal_helper(doc, buf, okay);
2263 }
2264 // Put the request in the ram cache only if it's an open_read or lookup
2265 if (vio.op == VIO::READ && okay) {
2266 bool cutoff_check;
2267 // cutoff_check :
2268 // doc_len == 0 for the first fragment (it is set from the vector)
2269 // The decision on the first fragment is based on
2270 // doc->total_len
2271 // After that, the decision is based on doc_len (doc_len != 0)
2272 // (cache_config_ram_cache_cutoff == 0) : no cutoffs
2273 cutoff_check =
2274 ((!doc_len && static_cast<int64_t>(doc->total_len) < cache_config_ram_cache_cutoff) ||
2275 (doc_len && static_cast<int64_t>(doc_len) < cache_config_ram_cache_cutoff) || !cache_config_ram_cache_cutoff);
2276 if (cutoff_check && !f.doc_from_ram_cache) {
2277 uint64_t o = dir_offset(&dir);
2278 vol->ram_cache->put(read_key, buf.get(), doc->len, http_copy_hdr, static_cast<uint32_t>(o >> 32),
2279 static_cast<uint32_t>(o));
2280 }
2281 if (!doc_len) {
2282 // keep a pointer to it. In case the state machine decides to
2283 // update this document, we don't have to read it back into memory
2284 // again
2285 vol->first_fragment_key = *read_key;
2286 vol->first_fragment_offset = dir_offset(&dir);
2287 vol->first_fragment_data = buf;
2288 }
2289 } // end VIO::READ check
2290 // If it could be compressed, unmarshal after
2291 if (http_copy_hdr && doc->doc_type == CACHE_FRAG_TYPE_HTTP && doc->hlen && okay) {
2292 unmarshal_helper(doc, buf, okay);
2293 }
2294 } // end io.ok() check
2295 }
2296 Ldone:
2297 POP_HANDLER;
2298 return handleEvent(AIO_EVENT_DONE, nullptr);
2299 }
2300
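// Start a fragment read, trying the cheap paths first: the ram cache, the most recently read first
// fragment cached on the Vol, and the in-memory aggregation buffer. Only if all of those miss is an
// AIO read issued against the disk.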
2301 int
2302 CacheVC::handleRead(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
2303 {
2304 cancel_trigger();
2305
2306 f.doc_from_ram_cache = false;
2307
2308 // check ram cache
2309 ink_assert(vol->mutex->thread_holding == this_ethread());
2310 int64_t o = dir_offset(&dir);
2311 int ram_hit_state = vol->ram_cache->get(read_key, &buf, static_cast<uint32_t>(o >> 32), static_cast<uint32_t>(o));
2312 f.compressed_in_ram = (ram_hit_state > RAM_HIT_COMPRESS_NONE) ? 1 : 0;
2313 if (ram_hit_state >= RAM_HIT_COMPRESS_NONE) {
2314 goto LramHit;
2315 }
2316
2317 // check if it was read in the last open_read call
2318 if (*read_key == vol->first_fragment_key && dir_offset(&dir) == vol->first_fragment_offset) {
2319 buf = vol->first_fragment_data;
2320 goto LmemHit;
2321 }
2322 // see if it's in the aggregation buffer
2323 if (dir_agg_buf_valid(vol, &dir)) {
2324 int agg_offset = vol->vol_offset(&dir) - vol->header->write_pos;
2325 buf = new_IOBufferData(iobuffer_size_to_index(io.aiocb.aio_nbytes, MAX_BUFFER_SIZE_INDEX), MEMALIGNED);
2326 ink_assert((agg_offset + io.aiocb.aio_nbytes) <= (unsigned)vol->agg_buf_pos);
2327 char *doc = buf->data();
2328 char *agg = vol->agg_buffer + agg_offset;
2329 memcpy(doc, agg, io.aiocb.aio_nbytes);
2330 io.aio_result = io.aiocb.aio_nbytes;
2331 SET_HANDLER(&CacheVC::handleReadDone);
2332 return EVENT_RETURN;
2333 }
2334
2335 io.aiocb.aio_fildes = vol->fd;
2336 io.aiocb.aio_offset = vol->vol_offset(&dir);
2337 if (static_cast<off_t>(io.aiocb.aio_offset + io.aiocb.aio_nbytes) > static_cast<off_t>(vol->skip + vol->len)) {
2338 io.aiocb.aio_nbytes = vol->skip + vol->len - io.aiocb.aio_offset;
2339 }
2340 buf = new_IOBufferData(iobuffer_size_to_index(io.aiocb.aio_nbytes, MAX_BUFFER_SIZE_INDEX), MEMALIGNED);
2341 io.aiocb.aio_buf = buf->data();
2342 io.action = this;
2343 io.thread = mutex->thread_holding->tt == DEDICATED ? AIO_CALLBACK_THREAD_ANY : mutex->thread_holding;
2344 SET_HANDLER(&CacheVC::handleReadDone);
2345 ink_assert(ink_aio_read(&io) >= 0);
2346 CACHE_DEBUG_INCREMENT_DYN_STAT(cache_pread_count_stat);
2347 return EVENT_CONT;
2348
2349 LramHit : {
2350 f.doc_from_ram_cache = true;
2351 io.aio_result = io.aiocb.aio_nbytes;
2352 Doc *doc = reinterpret_cast<Doc *>(buf->data());
2353 if (cache_config_ram_cache_compress && doc->doc_type == CACHE_FRAG_TYPE_HTTP && doc->hlen) {
2354 SET_HANDLER(&CacheVC::handleReadDone);
2355 return EVENT_RETURN;
2356 }
2357 }
2358 LmemHit:
2359 f.doc_from_ram_cache = true;
2360 io.aio_result = io.aiocb.aio_nbytes;
2361 POP_HANDLER;
2362 return EVENT_RETURN; // allow the caller to release the volume lock
2363 }
2364
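// Begin a cache lookup: select the Vol for the key, create a CacheVC in lookup mode, and drive it
// through openReadStartHead. Returns ACTION_RESULT_DONE if the lookup completes inline, otherwise
// an Action the caller may cancel.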
2365 Action *
2366 Cache::lookup(Continuation *cont, const CacheKey *key, CacheFragType type, const char *hostname, int host_len)
2367 {
2368 if (!CacheProcessor::IsCacheReady(type)) {
2369 cont->handleEvent(CACHE_EVENT_LOOKUP_FAILED, nullptr);
2370 return ACTION_RESULT_DONE;
2371 }
2372
2373 Vol *vol = key_to_vol(key, hostname, host_len);
2374 ProxyMutex *mutex = cont->mutex.get();
2375 CacheVC *c = new_CacheVC(cont);
2376 SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
2377 c->vio.op = VIO::READ;
2378 c->base_stat = cache_lookup_active_stat;
2379 CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
2380 c->first_key = c->key = *key;
2381 c->frag_type = type;
2382 c->f.lookup = 1;
2383 c->vol = vol;
2384 c->last_collision = nullptr;
2385
2386 if (c->handleEvent(EVENT_INTERVAL, nullptr) == EVENT_CONT) {
2387 return &c->_action;
2388 } else {
2389 return ACTION_RESULT_DONE;
2390 }
2391 }
2392
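// State machine for removing a document: under the volume lock it marks any active writer so the
// directory is not updated behind us, probes the directory for the key (reading fragments to verify
// the first_key), deletes the matching directory entry, and signals CACHE_EVENT_REMOVE or
// CACHE_EVENT_REMOVE_FAILED to the caller before freeing the CacheVC.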
2393 int
2394 CacheVC::removeEvent(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
2395 {
2396 cancel_trigger();
2397 set_io_not_in_progress();
2398 {
2399 MUTEX_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
2400 if (!lock.is_locked()) {
2401 VC_SCHED_LOCK_RETRY();
2402 }
2403 if (_action.cancelled) {
2404 if (od) {
2405 vol->close_write(this);
2406 od = nullptr;
2407 }
2408 goto Lfree;
2409 }
2410 if (!f.remove_aborted_writers) {
2411 if (vol->open_write(this, true, 1)) {
2412 // writer exists
2413 od = vol->open_read(&key);
2414 ink_release_assert(od);
2415 od->dont_update_directory = true;
2416 od = nullptr;
2417 } else {
2418 od->dont_update_directory = true;
2419 }
2420 f.remove_aborted_writers = 1;
2421 }
2422 Lread:
2423 SET_HANDLER(&CacheVC::removeEvent);
2424 if (!buf) {
2425 goto Lcollision;
2426 }
2427 if (!dir_valid(vol, &dir)) {
2428 last_collision = nullptr;
2429 goto Lcollision;
2430 }
2431 // check that the read completed correctly. FIXME: remove bad vols
2432 if (static_cast<size_t>(io.aio_result) != io.aiocb.aio_nbytes) {
2433 goto Ldone;
2434 }
2435 {
2436 // verify that this is our document
2437 Doc *doc = reinterpret_cast<Doc *>(buf->data());
2438 /* should be first_key not key..right?? */
2439 if (doc->first_key == key) {
2440 ink_assert(doc->magic == DOC_MAGIC);
2441 if (dir_delete(&key, vol, &dir) > 0) {
2442 if (od) {
2443 vol->close_write(this);
2444 }
2445 od = nullptr;
2446 goto Lremoved;
2447 }
2448 goto Ldone;
2449 }
2450 }
2451 Lcollision:
2452 // check for collision
2453 if (dir_probe(&key, vol, &dir, &last_collision) > 0) {
2454 int ret = do_read_call(&key);
2455 if (ret == EVENT_RETURN) {
2456 goto Lread;
2457 }
2458 return ret;
2459 }
2460 Ldone:
2461 CACHE_INCREMENT_DYN_STAT(cache_remove_failure_stat);
2462 if (od) {
2463 vol->close_write(this);
2464 }
2465 }
2466 ink_assert(!vol || this_ethread() != vol->mutex->thread_holding);
2467 _action.continuation->handleEvent(CACHE_EVENT_REMOVE_FAILED, (void *)-ECACHE_NO_DOC);
2468 goto Lfree;
2469 Lremoved:
2470 _action.continuation->handleEvent(CACHE_EVENT_REMOVE, nullptr);
2471 Lfree:
2472 return free_CacheVC(this);
2473 }
2474
2475 Action *
2476 Cache::remove(Continuation *cont, const CacheKey *key, CacheFragType type, const char *hostname, int host_len)
2477 {
2478 if (!CacheProcessor::IsCacheReady(type)) {
2479 if (cont) {
2480 cont->handleEvent(CACHE_EVENT_REMOVE_FAILED, nullptr);
2481 }
2482 return ACTION_RESULT_DONE;
2483 }
2484
2485 Ptr<ProxyMutex> mutex;
2486 if (!cont) {
2487 cont = new_CacheRemoveCont();
2488 }
2489
2490 CACHE_TRY_LOCK(lock, cont->mutex, this_ethread());
2491 ink_assert(lock.is_locked());
2492 Vol *vol = key_to_vol(key, hostname, host_len);
2493 // coverity[var_decl]
2494 Dir result;
2495 dir_clear(&result); // initialized here, set result empty so we can recognize missed lock
2496 mutex = cont->mutex;
2497
2498 CacheVC *c = new_CacheVC(cont);
2499 c->vio.op = VIO::NONE;
2500 c->frag_type = type;
2501 c->base_stat = cache_remove_active_stat;
2502 CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
2503 c->first_key = c->key = *key;
2504 c->vol = vol;
2505 c->dir = result;
2506 c->f.remove = 1;
2507
2508 SET_CONTINUATION_HANDLER(c, &CacheVC::removeEvent);
2509 int ret = c->removeEvent(EVENT_IMMEDIATE, nullptr);
2510 if (ret == EVENT_DONE) {
2511 return ACTION_RESULT_DONE;
2512 } else {
2513 return &c->_action;
2514 }
2515 }
2516 // CacheVConnection
2517
2518 CacheVConnection::CacheVConnection() : VConnection(nullptr) {}
2519
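// Build cp_list from the volume headers already on disk: every DiskVol found on every disk is either
// merged into the existing CacheVol with the same volume number or recorded as a new CacheVol.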
2520 void
2521 cplist_init()
2522 {
2523 cp_list_len = 0;
2524 for (int i = 0; i < gndisks; i++) {
2525 CacheDisk *d = gdisks[i];
2526 DiskVol **dp = d->disk_vols;
2527 for (unsigned int j = 0; j < d->header->num_volumes; j++) {
2528 ink_assert(dp[j]->dpb_queue.head);
2529 CacheVol *p = cp_list.head;
2530 while (p) {
2531 if (p->vol_number == dp[j]->vol_number) {
2532 ink_assert(p->scheme == (int)dp[j]->dpb_queue.head->b->type);
2533 p->size += dp[j]->size;
2534 p->num_vols += dp[j]->num_volblocks;
2535 p->disk_vols[i] = dp[j];
2536 break;
2537 }
2538 p = p->link.next;
2539 }
2540 if (!p) {
2541 // did not find a volume in the cache vol list...create
2542 // a new one
2543 CacheVol *new_p = new CacheVol();
2544 new_p->vol_number = dp[j]->vol_number;
2545 new_p->num_vols = dp[j]->num_volblocks;
2546 new_p->size = dp[j]->size;
2547 new_p->scheme = dp[j]->dpb_queue.head->b->type;
2548 new_p->disk_vols = static_cast<DiskVol **>(ats_malloc(gndisks * sizeof(DiskVol *)));
2549 memset(new_p->disk_vols, 0, gndisks * sizeof(DiskVol *));
2550 new_p->disk_vols[i] = dp[j];
2551 cp_list.enqueue(new_p);
2552 cp_list_len++;
2553 }
2554 }
2555 }
2556 }
2557
2558 static int fillExclusiveDisks(CacheVol *cp);
2559
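// Reconcile cp_list with volume.config: volumes that are no longer configured (or whose scheme
// changed) are deleted from their disks and dropped from the list, while forced ("exclusive") spans
// whose configured volume is missing get a fresh CacheVol created and filled here.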
2560 void
2561 cplist_update()
2562 {
2563 /* go through cplist and delete volumes that are not in the volume.config */
2564 CacheVol *cp = cp_list.head;
2565 ConfigVol *config_vol;
2566
2567 while (cp) {
2568 for (config_vol = config_volumes.cp_queue.head; config_vol; config_vol = config_vol->link.next) {
2569 if (config_vol->number == cp->vol_number) {
2570 if (cp->scheme == config_vol->scheme) {
2571 cp->ramcache_enabled = config_vol->ramcache_enabled;
2572 config_vol->cachep = cp;
2573 } else {
2574 /* delete this volume from all the disks */
2575 int d_no;
2576 int clearCV = 1;
2577
2578 for (d_no = 0; d_no < gndisks; d_no++) {
2579 if (cp->disk_vols[d_no]) {
2580 if (cp->disk_vols[d_no]->disk->forced_volume_num == cp->vol_number) {
2581 clearCV = 0;
2582 config_vol->cachep = cp;
2583 } else {
2584 cp->disk_vols[d_no]->disk->delete_volume(cp->vol_number);
2585 cp->disk_vols[d_no] = nullptr;
2586 }
2587 }
2588 }
2589 if (clearCV) {
2590 config_vol = nullptr;
2591 }
2592 }
2593 break;
2594 }
2595 }
2596
2597 if (!config_vol) {
2598 // did not find a matching volume in the config file.
2599 // Delete the volume from the cache vol list
2600 int d_no;
2601 for (d_no = 0; d_no < gndisks; d_no++) {
2602 if (cp->disk_vols[d_no]) {
2603 cp->disk_vols[d_no]->disk->delete_volume(cp->vol_number);
2604 }
2605 }
2606 CacheVol *temp_cp = cp;
2607 cp = cp->link.next;
2608 cp_list.remove(temp_cp);
2609 cp_list_len--;
2610 delete temp_cp;
2611 continue;
2612 } else {
2613 cp = cp->link.next;
2614 }
2615 }
2616
2617 // Look for (exclusive) spans forced to a specific volume but not yet referenced by any volume in cp_list;
2618 // if found, create a new volume. This also makes sure new exclusive disk volumes are created before any
2619 // other new volumes, to ensure proper span free space calculation and proper volume block distribution.
2620 for (config_vol = config_volumes.cp_queue.head; config_vol; config_vol = config_vol->link.next) {
2621 if (nullptr == config_vol->cachep) {
2622 // Find out if this is a forced volume assigned exclusively to a span which was cleared (hence not referenced in cp_list).
2623 // Note: non-exclusive cleared spans are not handled here, only the "exclusive"
2624 bool forced_volume = false;
2625 for (int d_no = 0; d_no < gndisks; d_no++) {
2626 if (gdisks[d_no]->forced_volume_num == config_vol->number) {
2627 forced_volume = true;
2628 }
2629 }
2630
2631 if (forced_volume) {
2632 CacheVol *new_cp = new CacheVol();
2633 if (nullptr != new_cp) {
2634 new_cp->disk_vols = static_cast<decltype(new_cp->disk_vols)>(ats_malloc(gndisks * sizeof(DiskVol *)));
2635 if (nullptr != new_cp->disk_vols) {
2636 memset(new_cp->disk_vols, 0, gndisks * sizeof(DiskVol *));
2637 new_cp->vol_number = config_vol->number;
2638 new_cp->scheme = config_vol->scheme;
2639 config_vol->cachep = new_cp;
2640 fillExclusiveDisks(config_vol->cachep);
2641 cp_list.enqueue(new_cp);
2642 } else {
2643 delete new_cp;
2644 }
2645 }
2646 }
2647 } else {
2648 // Fill if this is exclusive disk.
2649 fillExclusiveDisks(config_vol->cachep);
2650 }
2651 }
2652 }
2653
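// Claim every span that is forced to cp's volume number exclusively for that volume, clearing the
// span first if other volumes already live on it, and return the number of spans claimed.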
2654 static int
2655 fillExclusiveDisks(CacheVol *cp)
2656 {
2657 int diskCount = 0;
2658 int volume_number = cp->vol_number;
2659
2660 Debug("cache_init", "volume %d", volume_number);
2661 for (int i = 0; i < gndisks; i++) {
2662 if (gdisks[i]->forced_volume_num != volume_number) {
2663 continue;
2664 }
2665
2666 /* OK, this should be an "exclusive" disk (span). */
2667 diskCount++;
2668
2669 /* There should be a single "forced" volume and no other volumes should exist on this "exclusive" disk (span) */
2670 bool found_nonforced_volumes = false;
2671 for (int j = 0; j < static_cast<int>(gdisks[i]->header->num_volumes); j++) {
2672 if (volume_number != gdisks[i]->disk_vols[j]->vol_number) {
2673 found_nonforced_volumes = true;
2674 break;
2675 }
2676 }
2677
2678 if (found_nonforced_volumes) {
2679 /* The user had created several volumes before - clear the disk and create one volume for http */
2680 Note("Clearing Disk: %s", gdisks[i]->path);
2681 gdisks[i]->delete_all_volumes();
2682 } else if (1 == gdisks[i]->header->num_volumes) {
2683 /* "Forced" volumes take the whole disk (span) hence nothing more to do for this span. */
2684 continue;
2685 }
2686
2687 /* Now, volumes have been either deleted or did not exist to begin with so we need to create them. */
2688
2689 int64_t size_diff = gdisks[i]->num_usable_blocks;
2690 DiskVolBlock *dpb;
2691 do {
2692 dpb = gdisks[i]->create_volume(volume_number, size_diff, cp->scheme);
2693 if (dpb) {
2694 if (!cp->disk_vols[i]) {
2695 cp->disk_vols[i] = gdisks[i]->get_diskvol(volume_number);
2696 }
2697 size_diff -= dpb->len;
2698 cp->size += dpb->len;
2699 cp->num_vols++;
2700 } else {
2701 Debug("cache_init", "create_volume failed");
2702 break;
2703 }
2704 } while ((size_diff > 0));
2705 }
2706
2707 /* Report back the number of disks (spans) that were assigned to volume specified by volume_number. */
2708 return diskCount;
2709 }
2710
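// Apply the volume.config sizing rules. With no configured volumes the entire store becomes a single
// HTTP volume (volume 0); otherwise percentage sizes are converted to absolute 128MB-aligned sizes,
// existing volumes are grown by adding blocks to the disks that currently hold the least of that
// volume, and missing volumes are created. Returns 0 on success and -1 on misconfiguration or lack of space.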
2711 int
2712 cplist_reconfigure()
2713 {
2714 int64_t size;
2715 int volume_number;
2716 off_t size_in_blocks;
2717 ConfigVol *config_vol;
2718
2719 gnvol = 0;
2720 if (config_volumes.num_volumes == 0) {
2721 /* only the http cache */
2722 CacheVol *cp = new CacheVol();
2723 cp->vol_number = 0;
2724 cp->scheme = CACHE_HTTP_TYPE;
2725 cp->disk_vols = static_cast<DiskVol **>(ats_malloc(gndisks * sizeof(DiskVol *)));
2726 memset(cp->disk_vols, 0, gndisks * sizeof(DiskVol *));
2727 cp_list.enqueue(cp);
2728 cp_list_len++;
2729 for (int i = 0; i < gndisks; i++) {
2730 if (gdisks[i]->header->num_volumes != 1 || gdisks[i]->disk_vols[0]->vol_number != 0) {
2731 /* The user had created several volumes before - clear the disk
2732 and create one volume for http */
2733 Note("Clearing Disk: %s", gdisks[i]->path);
2734 gdisks[i]->delete_all_volumes();
2735 }
2736 if (gdisks[i]->cleared) {
2737 uint64_t free_space = gdisks[i]->free_space * STORE_BLOCK_SIZE;
2738 int vols = (free_space / MAX_VOL_SIZE) + 1;
2739 for (int p = 0; p < vols; p++) {
2740 off_t b = gdisks[i]->free_space / (vols - p);
2741 Debug("cache_hosting", "blocks = %" PRId64, (int64_t)b);
2742 DiskVolBlock *dpb = gdisks[i]->create_volume(0, b, CACHE_HTTP_TYPE);
2743 ink_assert(dpb && dpb->len == (uint64_t)b);
2744 }
2745 ink_assert(gdisks[i]->free_space == 0);
2746 }
2747
2748 ink_assert(gdisks[i]->header->num_volumes == 1);
2749 DiskVol **dp = gdisks[i]->disk_vols;
2750 gnvol += dp[0]->num_volblocks;
2751 cp->size += dp[0]->size;
2752 cp->num_vols += dp[0]->num_volblocks;
2753 cp->disk_vols[i] = dp[0];
2754 }
2755
2756 } else {
2757 for (int i = 0; i < gndisks; i++) {
2758 if (gdisks[i]->header->num_volumes == 1 && gdisks[i]->disk_vols[0]->vol_number == 0) {
2759 /* The user had created several volumes before - clear the disk
2760 and create one volume for http */
2761 Note("Clearing Disk: %s", gdisks[i]->path);
2762 gdisks[i]->delete_all_volumes();
2763 }
2764 }
2765
2766 /* change percentages in the config partitions to absolute value */
2767 off_t tot_space_in_blks = 0;
2768 off_t blocks_per_vol = VOL_BLOCK_SIZE / STORE_BLOCK_SIZE;
2769 /* sum up the total space available on all the disks.
2770 round down the space to 128 megabytes */
2771 for (int i = 0; i < gndisks; i++) {
2772 // Exclude exclusive disks (with forced volumes) from the following total space calculation,
2773 // in such a way forced volumes will not impact volume percentage calculations.
2774 if (-1 == gdisks[i]->forced_volume_num) {
2775 tot_space_in_blks += (gdisks[i]->num_usable_blocks / blocks_per_vol) * blocks_per_vol;
2776 }
2777 }
2778
2779 double percent_remaining = 100.00;
2780 for (config_vol = config_volumes.cp_queue.head; config_vol; config_vol = config_vol->link.next) {
2781 if (config_vol->in_percent) {
2782 if (config_vol->percent > percent_remaining) {
2783 Warning("total volume sizes added up to more than 100%%!");
2784 Warning("no volumes created");
2785 return -1;
2786 }
2787
2788 // Find if the volume is forced and if it is then calculate the total forced volume size.
2789 // Forced volumes take the whole span (disk), so also sum up all the disk space this volume is forced onto.
2790 int64_t tot_forced_space_in_blks = 0;
2791 for (int i = 0; i < gndisks; i++) {
2792 if (config_vol->number == gdisks[i]->forced_volume_num) {
2793 tot_forced_space_in_blks += (gdisks[i]->num_usable_blocks / blocks_per_vol) * blocks_per_vol;
2794 }
2795 }
2796
2797 int64_t space_in_blks = 0;
2798 if (0 == tot_forced_space_in_blks) {
2799 // Calculate the space as percentage of total space in blocks.
2800 space_in_blks = static_cast<int64_t>(((config_vol->percent / percent_remaining)) * tot_space_in_blks);
2801 } else {
2802 // Forced volumes take all disk space, so no percentage calculations here.
2803 space_in_blks = tot_forced_space_in_blks;
2804 }
2805
2806 space_in_blks = space_in_blks >> (20 - STORE_BLOCK_SHIFT);
2807 /* round down to 128 megabyte multiple */
2808 space_in_blks = (space_in_blks >> 7) << 7;
2809 config_vol->size = space_in_blks;
2810
2811 if (0 == tot_forced_space_in_blks) {
2812 tot_space_in_blks -= space_in_blks << (20 - STORE_BLOCK_SHIFT);
2813 percent_remaining -= (config_vol->size < 128) ? 0 : config_vol->percent;
2814 }
2815 }
2816 if (config_vol->size < 128) {
2817 Warning("the size of volume %d (%" PRId64 ") is less than the minimum required volume size %d", config_vol->number,
2818 (int64_t)config_vol->size, 128);
2819 Warning("volume %d is not created", config_vol->number);
2820 }
2821 Debug("cache_hosting", "Volume: %d Size: %" PRId64 " Ramcache: %d", config_vol->number, (int64_t)config_vol->size,
2822 config_vol->ramcache_enabled);
2823 }
2824 cplist_update();
2825
2826 /* go through volume config and grow and create volumes */
2827 for (config_vol = config_volumes.cp_queue.head; config_vol; config_vol = config_vol->link.next) {
2828 size = config_vol->size;
2829 if (size < 128) {
2830 continue;
2831 }
2832
2833 volume_number = config_vol->number;
2834
2835 size_in_blocks = (static_cast<off_t>(size) * 1024 * 1024) / STORE_BLOCK_SIZE;
2836
2837 if (config_vol->cachep && config_vol->cachep->num_vols > 0) {
2838 gnvol += config_vol->cachep->num_vols;
2839 continue;
2840 }
2841
2842 if (!config_vol->cachep) {
2843 // we did not find a corresponding entry in cache vol...create one
2844
2845 CacheVol *new_cp = new CacheVol();
2846 new_cp->disk_vols = static_cast<DiskVol **>(ats_malloc(gndisks * sizeof(DiskVol *)));
2847 memset(new_cp->disk_vols, 0, gndisks * sizeof(DiskVol *));
2848 if (create_volume(config_vol->number, size_in_blocks, config_vol->scheme, new_cp)) {
2849 ats_free(new_cp->disk_vols);
2850 new_cp->disk_vols = nullptr;
2851 delete new_cp;
2852 return -1;
2853 }
2854 cp_list.enqueue(new_cp);
2855 cp_list_len++;
2856 config_vol->cachep = new_cp;
2857 gnvol += new_cp->num_vols;
2858 continue;
2859 }
2860 // else
2861 CacheVol *cp = config_vol->cachep;
2862 ink_assert(cp->size <= size_in_blocks);
2863 if (cp->size == size_in_blocks) {
2864 gnvol += cp->num_vols;
2865 continue;
2866 }
2867 // else the size is greater...
2868 /* search the cp_list */
2869
2870 int *sorted_vols = new int[gndisks];
2871 for (int i = 0; i < gndisks; i++) {
2872 sorted_vols[i] = i;
2873 }
2874 for (int i = 0; i < gndisks - 1; i++) {
2875 int smallest = sorted_vols[i];
2876 int smallest_ndx = i;
2877 for (int j = i + 1; j < gndisks; j++) {
2878 int curr = sorted_vols[j];
2879 DiskVol *dvol = cp->disk_vols[curr];
2880 if (gdisks[curr]->cleared) {
2881 ink_assert(!dvol);
2882 // disks that are cleared should be filled first
2883 smallest = curr;
2884 smallest_ndx = j;
2885 } else if (!dvol && cp->disk_vols[smallest]) {
2886 smallest = curr;
2887 smallest_ndx = j;
2888 } else if (dvol && cp->disk_vols[smallest] && (dvol->size < cp->disk_vols[smallest]->size)) {
2889 smallest = curr;
2890 smallest_ndx = j;
2891 }
2892 }
2893 sorted_vols[smallest_ndx] = sorted_vols[i];
2894 sorted_vols[i] = smallest;
2895 }
2896
2897 int64_t size_to_alloc = size_in_blocks - cp->size;
2898 int disk_full = 0;
2899 for (int i = 0; (i < gndisks) && size_to_alloc; i++) {
2900 int disk_no = sorted_vols[i];
2901 ink_assert(cp->disk_vols[sorted_vols[gndisks - 1]]);
2902 int largest_vol = cp->disk_vols[sorted_vols[gndisks - 1]]->size;
2903
2904 /* allocate storage on new disk. Find the difference
2905 between the biggest volume on any disk and
2906 the volume on this disk and try to make
2907 them equal */
2908 int64_t size_diff = (cp->disk_vols[disk_no]) ? largest_vol - cp->disk_vols[disk_no]->size : largest_vol;
2909 size_diff = (size_diff < size_to_alloc) ? size_diff : size_to_alloc;
2910 /* if size_diff == 0, then the disks have volumes of the
2911 same sizes, so we don't need to balance the disks */
2912 if (size_diff == 0) {
2913 break;
2914 }
2915
2916 DiskVolBlock *dpb;
2917 do {
2918 dpb = gdisks[disk_no]->create_volume(volume_number, size_diff, cp->scheme);
2919 if (dpb) {
2920 if (!cp->disk_vols[disk_no]) {
2921 cp->disk_vols[disk_no] = gdisks[disk_no]->get_diskvol(volume_number);
2922 }
2923 size_diff -= dpb->len;
2924 cp->size += dpb->len;
2925 cp->num_vols++;
2926 } else {
2927 break;
2928 }
2929 } while ((size_diff > 0));
2930
2931 if (!dpb) {
2932 disk_full++;
2933 }
2934
2935 size_to_alloc = size_in_blocks - cp->size;
2936 }
2937
2938 delete[] sorted_vols;
2939
2940 if (size_to_alloc) {
2941 if (create_volume(volume_number, size_to_alloc, cp->scheme, cp)) {
2942 return -1;
2943 }
2944 }
2945 gnvol += cp->num_vols;
2946 }
2947 }
2948 return 0;
2949 }
2950
2951 // This is some really bad code, and needs to be rewritten!
2952 int
2953 create_volume(int volume_number, off_t size_in_blocks, int scheme, CacheVol *cp)
2954 {
2955 static int curr_vol = 0; // FIXME: this will not reinitialize correctly
2956 off_t to_create = size_in_blocks;
2957 off_t blocks_per_vol = VOL_BLOCK_SIZE >> STORE_BLOCK_SHIFT;
2958 int full_disks = 0;
2959
2960 cp->vol_number = volume_number;
2961 cp->scheme = scheme;
2962 if (fillExclusiveDisks(cp)) {
2963 Debug("cache_init", "volume successfully filled from forced disks: volume_number=%d", volume_number);
2964 return 0;
2965 }
2966
2967 int *sp = new int[gndisks];
2968 memset(sp, 0, gndisks * sizeof(int));
2969
2970 int i = curr_vol;
2971 while (size_in_blocks > 0) {
2972 if (gdisks[i]->free_space >= (sp[i] + blocks_per_vol)) {
2973 sp[i] += blocks_per_vol;
2974 size_in_blocks -= blocks_per_vol;
2975 full_disks = 0;
2976 } else {
2977 full_disks += 1;
2978 if (full_disks == gndisks) {
2979 char config_file[PATH_NAME_MAX];
2980 REC_ReadConfigString(config_file, "proxy.config.cache.volume_filename", PATH_NAME_MAX);
2981 if (cp->size) {
2982 Warning("not enough space to increase volume: [%d] to size: [%" PRId64 "]", volume_number,
2983 (int64_t)((to_create + cp->size) >> (20 - STORE_BLOCK_SHIFT)));
2984 } else {
2985 Warning("not enough space to create volume: [%d], size: [%" PRId64 "]", volume_number,
2986 (int64_t)(to_create >> (20 - STORE_BLOCK_SHIFT)));
2987 }
2988
2989 Note("edit the %s file and restart traffic_server", config_file);
2990 delete[] sp;
2991 return -1;
2992 }
2993 }
2994 i = (i + 1) % gndisks;
2995 }
2996 cp->vol_number = volume_number;
2997 cp->scheme = scheme;
2998 curr_vol = i;
2999 for (i = 0; i < gndisks; i++) {
3000 if (sp[i] > 0) {
3001 while (sp[i] > 0) {
3002 DiskVolBlock *p = gdisks[i]->create_volume(volume_number, sp[i], scheme);
3003 ink_assert(p && (p->len >= (unsigned int)blocks_per_vol));
3004 sp[i] -= p->len;
3005 cp->num_vols++;
3006 cp->size += p->len;
3007 }
3008 if (!cp->disk_vols[i]) {
3009 cp->disk_vols[i] = gdisks[i]->get_diskvol(volume_number);
3010 }
3011 }
3012 }
3013 delete[] sp;
3014 return 0;
3015 }
3016
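// Rebuild the volume hash tables for the generic host record and for every entry in the host
// matcher; called, for example, after a span is taken offline so keys no longer map to its volumes.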
3017 void
3018 rebuild_host_table(Cache *cache)
3019 {
3020 build_vol_hash_table(&cache->hosttable->gen_host_rec);
3021 if (cache->hosttable->m_numEntries != 0) {
3022 CacheHostMatcher *hm = cache->hosttable->getHostMatcher();
3023 CacheHostRecord *h_rec = hm->getDataArray();
3024 int h_rec_len = hm->getNumElements();
3025 int i;
3026 for (i = 0; i < h_rec_len; i++) {
3027 build_vol_hash_table(&h_rec[i]);
3028 }
3029 }
3030 }
3031
3032 // if generic_host_rec.vols == nullptr, what do we do???
3033 Vol *
3034 Cache::key_to_vol(const CacheKey *key, const char *hostname, int host_len)
3035 {
3036 uint32_t h = (key->slice32(2) >> DIR_TAG_WIDTH) % VOL_HASH_TABLE_SIZE;
3037 unsigned short *hash_table = hosttable->gen_host_rec.vol_hash_table;
3038 CacheHostRecord *host_rec = &hosttable->gen_host_rec;
3039
3040 if (hosttable->m_numEntries > 0 && host_len) {
3041 CacheHostResult res;
3042 hosttable->Match(hostname, host_len, &res);
3043 if (res.record) {
3044 unsigned short *host_hash_table = res.record->vol_hash_table;
3045 if (host_hash_table) {
3046 if (is_debug_tag_set("cache_hosting")) {
3047 char format_str[50];
3048 snprintf(format_str, sizeof(format_str), "Volume: %%xd for host: %%.%ds", host_len);
3049 Debug("cache_hosting", format_str, res.record, hostname);
3050 }
3051 return res.record->vols[host_hash_table[h]];
3052 }
3053 }
3054 }
3055 if (hash_table) {
3056 if (is_debug_tag_set("cache_hosting")) {
3057 char format_str[50];
3058 snprintf(format_str, sizeof(format_str), "Generic volume: %%xd for host: %%.%ds", host_len);
3059 Debug("cache_hosting", format_str, host_rec, hostname);
3060 }
3061 return host_rec->vols[hash_table[h]];
3062 } else {
3063 return host_rec->vols[0];
3064 }
3065 }
3066
3067 static void
3068 reg_int(const char *str, int stat, RecRawStatBlock *rsb, const char *prefix, RecRawStatSyncCb sync_cb = RecRawStatSyncSum)
3069 {
3070 char stat_str[256];
3071 snprintf(stat_str, sizeof(stat_str), "%s.%s", prefix, str);
3072 RecRegisterRawStat(rsb, RECT_PROCESS, stat_str, RECD_INT, RECP_NON_PERSISTENT, stat, sync_cb);
3073 DOCACHE_CLEAR_DYN_STAT(stat)
3074 }
3075 #define REG_INT(_str, _stat) reg_int(_str, (int)_stat, rsb, prefix)
3076
3077 // Register Stats
3078 void
3079 register_cache_stats(RecRawStatBlock *rsb, const char *prefix)
3080 {
3081 // Special case for this sucker, since it uses its own aggregator.
3082 reg_int("bytes_used", cache_bytes_used_stat, rsb, prefix, cache_stats_bytes_used_cb);
3083
3084 REG_INT("bytes_total", cache_bytes_total_stat);
3085 REG_INT("ram_cache.total_bytes", cache_ram_cache_bytes_total_stat);
3086 REG_INT("ram_cache.bytes_used", cache_ram_cache_bytes_stat);
3087 REG_INT("ram_cache.hits", cache_ram_cache_hits_stat);
3088 REG_INT("ram_cache.misses", cache_ram_cache_misses_stat);
3089 REG_INT("pread_count", cache_pread_count_stat);
3090 REG_INT("percent_full", cache_percent_full_stat);
3091 REG_INT("lookup.active", cache_lookup_active_stat);
3092 REG_INT("lookup.success", cache_lookup_success_stat);
3093 REG_INT("lookup.failure", cache_lookup_failure_stat);
3094 REG_INT("read.active", cache_read_active_stat);
3095 REG_INT("read.success", cache_read_success_stat);
3096 REG_INT("read.failure", cache_read_failure_stat);
3097 REG_INT("write.active", cache_write_active_stat);
3098 REG_INT("write.success", cache_write_success_stat);
3099 REG_INT("write.failure", cache_write_failure_stat);
3100 REG_INT("write.backlog.failure", cache_write_backlog_failure_stat);
3101 REG_INT("update.active", cache_update_active_stat);
3102 REG_INT("update.success", cache_update_success_stat);
3103 REG_INT("update.failure", cache_update_failure_stat);
3104 REG_INT("remove.active", cache_remove_active_stat);
3105 REG_INT("remove.success", cache_remove_success_stat);
3106 REG_INT("remove.failure", cache_remove_failure_stat);
3107 REG_INT("evacuate.active", cache_evacuate_active_stat);
3108 REG_INT("evacuate.success", cache_evacuate_success_stat);
3109 REG_INT("evacuate.failure", cache_evacuate_failure_stat);
3110 REG_INT("scan.active", cache_scan_active_stat);
3111 REG_INT("scan.success", cache_scan_success_stat);
3112 REG_INT("scan.failure", cache_scan_failure_stat);
3113 REG_INT("direntries.total", cache_direntries_total_stat);
3114 REG_INT("direntries.used", cache_direntries_used_stat);
3115 REG_INT("directory_collision", cache_directory_collision_count_stat);
3116 REG_INT("frags_per_doc.1", cache_single_fragment_document_count_stat);
3117 REG_INT("frags_per_doc.2", cache_two_fragment_document_count_stat);
3118 REG_INT("frags_per_doc.3+", cache_three_plus_plus_fragment_document_count_stat);
3119 REG_INT("read_busy.success", cache_read_busy_success_stat);
3120 REG_INT("read_busy.failure", cache_read_busy_failure_stat);
3121 REG_INT("write_bytes_stat", cache_write_bytes_stat);
3122 REG_INT("vector_marshals", cache_hdr_vector_marshal_stat);
3123 REG_INT("hdr_marshals", cache_hdr_marshal_stat);
3124 REG_INT("hdr_marshal_bytes", cache_hdr_marshal_bytes_stat);
3125 REG_INT("gc_bytes_evacuated", cache_gc_bytes_evacuated_stat);
3126 REG_INT("gc_frags_evacuated", cache_gc_frags_evacuated_stat);
3127 REG_INT("wrap_count", cache_directory_wrap_stat);
3128 REG_INT("sync.count", cache_directory_sync_count_stat);
3129 REG_INT("sync.bytes", cache_directory_sync_bytes_stat);
3130 REG_INT("sync.time", cache_directory_sync_time_stat);
3131 REG_INT("span.errors.read", cache_span_errors_read_stat);
3132 REG_INT("span.errors.write", cache_span_errors_write_stat);
3133 REG_INT("span.failing", cache_span_failing_stat);
3134 REG_INT("span.offline", cache_span_offline_stat);
3135 REG_INT("span.online", cache_span_online_stat);
3136 }
3137
3138 int
3139 FragmentSizeUpdateCb(const char * /* name ATS_UNUSED */, RecDataT /* data_type ATS_UNUSED */, RecData data, void *cookie)
3140 {
3141 if (sizeof(Doc) >= static_cast<size_t>(data.rec_int) || static_cast<size_t>(data.rec_int) - sizeof(Doc) > MAX_FRAG_SIZE) {
3142 Warning("The fragment size exceeds the limit, ignoring: %" PRId64 ", %d", data.rec_int, cache_config_target_fragment_size);
3143 return 0;
3144 }
3145
3146 cache_config_target_fragment_size = data.rec_int;
3147 return 0;
3148 }
3149
3150 void
3151 ink_cache_init(ts::ModuleVersion v)
3152 {
3153 ink_release_assert(v.check(CACHE_MODULE_VERSION));
3154
3155 cache_rsb = RecAllocateRawStatBlock(static_cast<int>(cache_stat_count));
3156
3157 REC_EstablishStaticConfigInteger(cache_config_ram_cache_size, "proxy.config.cache.ram_cache.size");
3158 Debug("cache_init", "proxy.config.cache.ram_cache.size = %" PRId64 " = %" PRId64 "Mb", cache_config_ram_cache_size,
3159 cache_config_ram_cache_size / (1024 * 1024));
3160
3161 REC_EstablishStaticConfigInt32(cache_config_ram_cache_algorithm, "proxy.config.cache.ram_cache.algorithm");
3162 REC_EstablishStaticConfigInt32(cache_config_ram_cache_compress, "proxy.config.cache.ram_cache.compress");
3163 REC_EstablishStaticConfigInt32(cache_config_ram_cache_compress_percent, "proxy.config.cache.ram_cache.compress_percent");
3164 REC_ReadConfigInt32(cache_config_ram_cache_use_seen_filter, "proxy.config.cache.ram_cache.use_seen_filter");
3165
3166 REC_EstablishStaticConfigInt32(cache_config_http_max_alts, "proxy.config.cache.limits.http.max_alts");
3167 Debug("cache_init", "proxy.config.cache.limits.http.max_alts = %d", cache_config_http_max_alts);
3168
3169 REC_EstablishStaticConfigInteger(cache_config_ram_cache_cutoff, "proxy.config.cache.ram_cache_cutoff");
3170 Debug("cache_init", "cache_config_ram_cache_cutoff = %" PRId64 " = %" PRId64 "Mb", cache_config_ram_cache_cutoff,
3171 cache_config_ram_cache_cutoff / (1024 * 1024));
3172
3173 REC_EstablishStaticConfigInt32(cache_config_permit_pinning, "proxy.config.cache.permit.pinning");
3174 Debug("cache_init", "proxy.config.cache.permit.pinning = %d", cache_config_permit_pinning);
3175
3176 REC_EstablishStaticConfigInt32(cache_config_dir_sync_frequency, "proxy.config.cache.dir.sync_frequency");
3177 Debug("cache_init", "proxy.config.cache.dir.sync_frequency = %d", cache_config_dir_sync_frequency);
3178
3179 REC_EstablishStaticConfigInt32(cache_config_select_alternate, "proxy.config.cache.select_alternate");
3180 Debug("cache_init", "proxy.config.cache.select_alternate = %d", cache_config_select_alternate);
3181
3182 REC_EstablishStaticConfigInt32(cache_config_max_doc_size, "proxy.config.cache.max_doc_size");
3183 Debug("cache_init", "proxy.config.cache.max_doc_size = %d = %dMb", cache_config_max_doc_size,
3184 cache_config_max_doc_size / (1024 * 1024));
3185
3186 REC_EstablishStaticConfigInt32(cache_config_mutex_retry_delay, "proxy.config.cache.mutex_retry_delay");
3187 Debug("cache_init", "proxy.config.cache.mutex_retry_delay = %dms", cache_config_mutex_retry_delay);
3188
3189 REC_EstablishStaticConfigInt32(cache_config_read_while_writer_max_retries, "proxy.config.cache.read_while_writer.max_retries");
3190 Debug("cache_init", "proxy.config.cache.read_while_writer.max_retries = %d", cache_config_read_while_writer_max_retries);
3191
3192 REC_EstablishStaticConfigInt32(cache_read_while_writer_retry_delay, "proxy.config.cache.read_while_writer_retry.delay");
3193 Debug("cache_init", "proxy.config.cache.read_while_writer_retry.delay = %dms", cache_read_while_writer_retry_delay);
3194
3195 REC_EstablishStaticConfigInt32(cache_config_hit_evacuate_percent, "proxy.config.cache.hit_evacuate_percent");
3196 Debug("cache_init", "proxy.config.cache.hit_evacuate_percent = %d", cache_config_hit_evacuate_percent);
3197
3198 REC_EstablishStaticConfigInt32(cache_config_hit_evacuate_size_limit, "proxy.config.cache.hit_evacuate_size_limit");
3199 Debug("cache_init", "proxy.config.cache.hit_evacuate_size_limit = %d", cache_config_hit_evacuate_size_limit);
3200
3201 REC_EstablishStaticConfigInt32(cache_config_force_sector_size, "proxy.config.cache.force_sector_size");
3202
3203 ink_assert(REC_RegisterConfigUpdateFunc("proxy.config.cache.target_fragment_size", FragmentSizeUpdateCb, nullptr) !=
3204 REC_ERR_FAIL);
3205 REC_ReadConfigInt32(cache_config_target_fragment_size, "proxy.config.cache.target_fragment_size");
3206
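  // Fall back to the compiled-in default if the configured fragment size is unset or its
  // payload (value minus sizeof(Doc)) would exceed MAX_FRAG_SIZE, mirroring the check in
  // FragmentSizeUpdateCb.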
  if (cache_config_target_fragment_size == 0 || cache_config_target_fragment_size - sizeof(Doc) > MAX_FRAG_SIZE) {
    cache_config_target_fragment_size = DEFAULT_TARGET_FRAGMENT_SIZE;
  }

  REC_EstablishStaticConfigInt32(cache_config_max_disk_errors, "proxy.config.cache.max_disk_errors");
  Debug("cache_init", "proxy.config.cache.max_disk_errors = %d", cache_config_max_disk_errors);

  REC_EstablishStaticConfigInt32(cache_config_agg_write_backlog, "proxy.config.cache.agg_write_backlog");
  Debug("cache_init", "proxy.config.cache.agg_write_backlog = %d", cache_config_agg_write_backlog);

  REC_EstablishStaticConfigInt32(cache_config_enable_checksum, "proxy.config.cache.enable_checksum");
  Debug("cache_init", "proxy.config.cache.enable_checksum = %d", cache_config_enable_checksum);

  REC_EstablishStaticConfigInt32(cache_config_alt_rewrite_max_size, "proxy.config.cache.alt_rewrite_max_size");
  Debug("cache_init", "proxy.config.cache.alt_rewrite_max_size = %d", cache_config_alt_rewrite_max_size);

  REC_EstablishStaticConfigInt32(cache_config_read_while_writer, "proxy.config.cache.enable_read_while_writer");
  cache_config_read_while_writer = validate_rww(cache_config_read_while_writer);
  REC_RegisterConfigUpdateFunc("proxy.config.cache.enable_read_while_writer", update_cache_config, nullptr);
  Debug("cache_init", "proxy.config.cache.enable_read_while_writer = %d", cache_config_read_while_writer);

  register_cache_stats(cache_rsb, "proxy.process.cache");

  REC_ReadConfigInteger(cacheProcessor.wait_for_cache, "proxy.config.http.wait_for_cache");

  Result result = theCacheStore.read_config();
  if (result.failed()) {
    Fatal("Failed to read cache configuration %s: %s", ts::filename::STORAGE, result.message());
  }
}

//----------------------------------------------------------------------------
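// Convenience wrapper that dispatches an HTTP cache read to the Cache instance for the
// given fragment type. Note that pin_in_cache is accepted for interface symmetry but is
// not forwarded to Cache::open_read.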
Action *
CacheProcessor::open_read(Continuation *cont, const HttpCacheKey *key, CacheHTTPHdr *request,
                          const OverridableHttpConfigParams *params, time_t pin_in_cache, CacheFragType type)
{
  return caches[type]->open_read(cont, &key->hash, request, params, type, key->hostname, key->hostlen);
}

//----------------------------------------------------------------------------
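// Convenience wrapper that dispatches an HTTP cache write to the Cache instance for the
// given fragment type. expected_size and request are accepted for interface symmetry but
// are not forwarded to Cache::open_write.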
Action *
CacheProcessor::open_write(Continuation *cont, int expected_size, const HttpCacheKey *key, CacheHTTPHdr *request,
                           CacheHTTPInfo *old_info, time_t pin_in_cache, CacheFragType type)
{
  return caches[type]->open_write(cont, &key->hash, old_info, pin_in_cache, nullptr /* key1 */, type, key->hostname, key->hostlen);
}

//----------------------------------------------------------------------------
// Note: this should not be called from the cluster processor, or bad
// recursion could occur. This is merely a convenience wrapper.
Action *
CacheProcessor::remove(Continuation *cont, const HttpCacheKey *key, CacheFragType frag_type)
{
  return caches[frag_type]->remove(cont, &key->hash, frag_type, key->hostname, key->hostlen);
}

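// Look up the CacheDisk whose span path matches the given path. Only meaningful once the
// cache has fully initialized; returns nullptr otherwise or when no span matches. If len
// is not positive, the path is assumed to be null terminated. For illustration (with a
// hypothetical span path), cacheProcessor.find_by_path("/dev/sda3", 0) would return the
// CacheDisk configured on /dev/sda3, if any.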
CacheDisk *
CacheProcessor::find_by_path(const char *path, int len)
{
  if (CACHE_INITIALIZED == initialized) {
    // If no length is passed in, assume it's null terminated.
    if (0 >= len && 0 != *path) {
      len = strlen(path);
    }

    for (int i = 0; i < gndisks; ++i) {
      if (0 == strncmp(path, gdisks[i]->path, len)) {
        return gdisks[i];
      }
    }
  }

  return nullptr;
}
