/** @file

  Internal declarations for the disk cache: compilation options, cache
  statistics, cache configuration variables, the CacheVC virtual
  connection, and related inline helpers.

  @section license License

  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements. See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership. The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License. You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
 */

#pragma once

#include "tscore/ink_platform.h"
#include "tscore/InkErrno.h"

#include "HTTP.h"
#include "P_CacheHttp.h"

struct EvacuationBlock;

// Compilation Options

#define ALTERNATES 1
// #define CACHE_LOCK_FAIL_RATE 0.001
// #define CACHE_AGG_FAIL_RATE 0.005
// #define CACHE_INSPECTOR_PAGES
#define MAX_CACHE_VCS_PER_THREAD 500

#define INTEGRAL_FRAGS 4

#ifdef CACHE_INSPECTOR_PAGES
#ifdef DEBUG
#define CACHE_STAT_PAGES
#endif
#endif

#ifdef DEBUG
#define DDebug(tag, fmt, ...) Debug(tag, fmt, ##__VA_ARGS__)
#else
#define DDebug(tag, fmt, ...)
#endif
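
// DDebug compiles to a normal Debug() call in debug builds and to nothing in
// release builds. Illustrative use (tag and arguments here are hypothetical):
//
//   DDebug("cache_read", "doc_pos=%" PRId64, doc_pos);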

#define AIO_SOFT_FAILURE -100000

#ifndef CACHE_LOCK_FAIL_RATE
#define CACHE_TRY_LOCK(_l, _m, _t) MUTEX_TRY_LOCK(_l, _m, _t)
#else
#define CACHE_TRY_LOCK(_l, _m, _t)                                                    \
  MUTEX_TRY_LOCK(_l, _m, _t);                                                         \
  if ((uint32_t)_t->generator.random() < (uint32_t)(UINT_MAX * CACHE_LOCK_FAIL_RATE)) \
    CACHE_MUTEX_RELEASE(_l)
#endif
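
// When CACHE_LOCK_FAIL_RATE is defined (see the compilation options above),
// CACHE_TRY_LOCK randomly releases a successfully acquired lock so that the
// lock-retry paths below are exercised under test.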

#define VC_LOCK_RETRY_EVENT()                                                                                           \
  do {                                                                                                                  \
    trigger = mutex->thread_holding->schedule_in_local(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay), event);   \
    return EVENT_CONT;                                                                                                  \
  } while (0)

#define VC_SCHED_LOCK_RETRY()                                                                                  \
  do {                                                                                                         \
    trigger = mutex->thread_holding->schedule_in_local(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay)); \
    return EVENT_CONT;                                                                                         \
  } while (0)

#define CONT_SCHED_LOCK_RETRY_RET(_c)                                                                  \
  do {                                                                                                 \
    _c->mutex->thread_holding->schedule_in_local(_c, HRTIME_MSECONDS(cache_config_mutex_retry_delay)); \
    return EVENT_CONT;                                                                                 \
  } while (0)

#define CONT_SCHED_LOCK_RETRY(_c) _c->mutex->thread_holding->schedule_in_local(_c, HRTIME_MSECONDS(cache_config_mutex_retry_delay))

#define VC_SCHED_WRITER_RETRY()                                           \
  do {                                                                    \
    ink_assert(!trigger);                                                 \
    writer_lock_retry++;                                                  \
    ink_hrtime _t = HRTIME_MSECONDS(cache_read_while_writer_retry_delay); \
    if (writer_lock_retry > 2)                                            \
      _t = HRTIME_MSECONDS(cache_read_while_writer_retry_delay) * 2;      \
    trigger = mutex->thread_holding->schedule_in_local(this, _t);         \
    return EVENT_CONT;                                                    \
  } while (0)
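
// Usage sketch: a CacheVC state handler that fails to take a lock typically
// reschedules itself and returns, e.g.
//
//   if (!lock.is_locked()) {
//     VC_SCHED_LOCK_RETRY();
//   }
//
// VC_SCHED_WRITER_RETRY() additionally backs off: after the first two
// retries the delay doubles.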

// cache stats definitions
enum {
  cache_bytes_used_stat,
  cache_bytes_total_stat,
  cache_ram_cache_bytes_stat,
  cache_ram_cache_bytes_total_stat,
  cache_direntries_total_stat,
  cache_direntries_used_stat,
  cache_ram_cache_hits_stat,
  cache_ram_cache_misses_stat,
  cache_pread_count_stat,
  cache_percent_full_stat,
  cache_lookup_active_stat,
  cache_lookup_success_stat,
  cache_lookup_failure_stat,
  cache_read_active_stat,
  cache_read_success_stat,
  cache_read_failure_stat,
  cache_write_active_stat,
  cache_write_success_stat,
  cache_write_failure_stat,
  cache_write_backlog_failure_stat,
  cache_update_active_stat,
  cache_update_success_stat,
  cache_update_failure_stat,
  cache_remove_active_stat,
  cache_remove_success_stat,
  cache_remove_failure_stat,
  cache_evacuate_active_stat,
  cache_evacuate_success_stat,
  cache_evacuate_failure_stat,
  cache_scan_active_stat,
  cache_scan_success_stat,
  cache_scan_failure_stat,
  cache_directory_collision_count_stat,
  cache_single_fragment_document_count_stat,
  cache_two_fragment_document_count_stat,
  cache_three_plus_plus_fragment_document_count_stat,
  cache_read_busy_success_stat,
  cache_read_busy_failure_stat,
  cache_gc_bytes_evacuated_stat,
  cache_gc_frags_evacuated_stat,
  cache_write_bytes_stat,
  cache_hdr_vector_marshal_stat,
  cache_hdr_marshal_stat,
  cache_hdr_marshal_bytes_stat,
  cache_directory_wrap_stat,
  cache_directory_sync_count_stat,
  cache_directory_sync_time_stat,
  cache_directory_sync_bytes_stat,
  /* AIO read/write error counters */
  cache_span_errors_read_stat,
  cache_span_errors_write_stat,
  /* Span related gauges. A span "moves" from "online" (errors == 0)
   * to "failing" (0 < errors < proxy.config.cache.max_disk_errors)
   * to "offline" (errors >= proxy.config.cache.max_disk_errors).
   * "failing" + "offline" + "online" = total number of spans. */
  cache_span_offline_stat,
  cache_span_online_stat,
  cache_span_failing_stat,
  cache_stat_count
};

extern RecRawStatBlock *cache_rsb;

#define GLOBAL_CACHE_SET_DYN_STAT(x, y) RecSetGlobalRawStatSum(cache_rsb, (x), (y))

#define CACHE_SET_DYN_STAT(x, y)                               \
  do {                                                         \
    RecSetGlobalRawStatSum(cache_rsb, (x), (y));               \
    RecSetGlobalRawStatSum(vol->cache_vol->vol_rsb, (x), (y)); \
  } while (0)

#define CACHE_INCREMENT_DYN_STAT(x)                                              \
  do {                                                                           \
    RecIncrRawStat(cache_rsb, mutex->thread_holding, (int)(x), 1);               \
    RecIncrRawStat(vol->cache_vol->vol_rsb, mutex->thread_holding, (int)(x), 1); \
  } while (0)

#define CACHE_DECREMENT_DYN_STAT(x)                                               \
  do {                                                                            \
    RecIncrRawStat(cache_rsb, mutex->thread_holding, (int)(x), -1);               \
    RecIncrRawStat(vol->cache_vol->vol_rsb, mutex->thread_holding, (int)(x), -1); \
  } while (0)

#define CACHE_VOL_SUM_DYN_STAT(x, y) RecIncrRawStat(vol->cache_vol->vol_rsb, mutex->thread_holding, (int)(x), (int64_t)(y))

#define CACHE_SUM_DYN_STAT(x, y)                                                            \
  do {                                                                                      \
    RecIncrRawStat(cache_rsb, mutex->thread_holding, (int)(x), (int64_t)(y));               \
    RecIncrRawStat(vol->cache_vol->vol_rsb, mutex->thread_holding, (int)(x), (int64_t)(y)); \
  } while (0)

#define CACHE_SUM_DYN_STAT_THREAD(x, y)                                              \
  do {                                                                               \
    RecIncrRawStat(cache_rsb, this_ethread(), (int)(x), (int64_t)(y));               \
    RecIncrRawStat(vol->cache_vol->vol_rsb, this_ethread(), (int)(x), (int64_t)(y)); \
  } while (0)

#define GLOBAL_CACHE_SUM_GLOBAL_DYN_STAT(x, y) RecIncrGlobalRawStatSum(cache_rsb, (x), (y))

#define CACHE_SUM_GLOBAL_DYN_STAT(x, y)                         \
  do {                                                          \
    RecIncrGlobalRawStatSum(cache_rsb, (x), (y));               \
    RecIncrGlobalRawStatSum(vol->cache_vol->vol_rsb, (x), (y)); \
  } while (0)

#define CACHE_CLEAR_DYN_STAT(x)                          \
  do {                                                   \
    RecSetRawStatSum(cache_rsb, (x), 0);                 \
    RecSetRawStatCount(cache_rsb, (x), 0);               \
    RecSetRawStatSum(vol->cache_vol->vol_rsb, (x), 0);   \
    RecSetRawStatCount(vol->cache_vol->vol_rsb, (x), 0); \
  } while (0)
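
// Usage sketch: the per-volume variants assume a CacheVC-style scope with a
// 'vol' pointer and a 'mutex' held by the current thread, e.g.
//
//   CACHE_INCREMENT_DYN_STAT(cache_read_active_stat);
//   CACHE_SUM_DYN_STAT(cache_write_bytes_stat, nbytes); // nbytes illustrative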

// Configuration
extern int cache_config_dir_sync_frequency;
extern int cache_config_http_max_alts;
extern int cache_config_permit_pinning;
extern int cache_config_select_alternate;
extern int cache_config_max_doc_size;
extern int cache_config_min_average_object_size;
extern int cache_config_agg_write_backlog;
extern int cache_config_enable_checksum;
extern int cache_config_alt_rewrite_max_size;
extern int cache_config_read_while_writer;
extern int cache_config_ram_cache_compress;
extern int cache_config_ram_cache_compress_percent;
extern int cache_config_ram_cache_use_seen_filter;
extern int cache_config_hit_evacuate_percent;
extern int cache_config_hit_evacuate_size_limit;
extern int cache_config_force_sector_size;
extern int cache_config_target_fragment_size;
extern int cache_config_mutex_retry_delay;
extern int cache_read_while_writer_retry_delay;
extern int cache_config_read_while_writer_max_retries;

// CacheVC
struct CacheVC : public CacheVConnection {
  CacheVC();

  VIO *do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf) override;
  VIO *do_io_pread(Continuation *c, int64_t nbytes, MIOBuffer *buf, int64_t offset) override;
  VIO *do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *buf, bool owner = false) override;
  void do_io_close(int lerrno = -1) override;
  void reenable(VIO *avio) override;
  void reenable_re(VIO *avio) override;
  bool get_data(int i, void *data) override;
  bool set_data(int i, void *data) override;

  bool
  is_ram_cache_hit() const override
  {
    ink_assert(vio.op == VIO::READ);
    return !f.not_from_ram_cache;
  }

  int
  get_header(void **ptr, int *len) override
  {
    if (first_buf) {
      Doc *doc = (Doc *)first_buf->data();
      *ptr     = doc->hdr();
      *len     = doc->hlen;
      return 0;
    }

    return -1;
  }

  int
  set_header(void *ptr, int len) override
  {
    header_to_write     = ptr;
    header_to_write_len = len;
    return 0;
  }

  int
  get_single_data(void **ptr, int *len) override
  {
    if (first_buf) {
      Doc *doc = (Doc *)first_buf->data();
      if (doc->data_len() == doc->total_len) {
        *ptr = doc->data();
        *len = doc->data_len();
        return 0;
      }
    }

    return -1;
  }

  int
  get_volume_number() const override
  {
    if (vol && vol->cache_vol) {
      return vol->cache_vol->vol_number;
    }

    return -1;
  }

  const char *
  get_disk_path() const override
  {
    if (vol && vol->disk) {
      return vol->disk->path;
    }

    return nullptr;
  }

  bool
  is_compressed_in_ram() const override
  {
    ink_assert(vio.op == VIO::READ);
    return f.compressed_in_ram;
  }

  bool writer_done();
  int calluser(int event);
  int callcont(int event);
  int die();
  int dead(int event, Event *e);

  int handleReadDone(int event, Event *e);
  int handleRead(int event, Event *e);
  int do_read_call(CacheKey *akey);
  int handleWrite(int event, Event *e);
  int handleWriteLock(int event, Event *e);
  int do_write_call();
  int do_write_lock();
  int do_write_lock_call();
  int do_sync(uint32_t target_write_serial);

  int openReadClose(int event, Event *e);
  int openReadReadDone(int event, Event *e);
  int openReadMain(int event, Event *e);
  int openReadStartEarliest(int event, Event *e);
  int openReadVecWrite(int event, Event *e);
  int openReadStartHead(int event, Event *e);
  int openReadFromWriter(int event, Event *e);
  int openReadFromWriterMain(int event, Event *e);
  int openReadFromWriterFailure(int event, Event *);
  int openReadChooseWriter(int event, Event *e);
  int openReadDirDelete(int event, Event *e);

  int openWriteCloseDir(int event, Event *e);
  int openWriteCloseHeadDone(int event, Event *e);
  int openWriteCloseHead(int event, Event *e);
  int openWriteCloseDataDone(int event, Event *e);
  int openWriteClose(int event, Event *e);
  int openWriteRemoveVector(int event, Event *e);
  int openWriteWriteDone(int event, Event *e);
  int openWriteOverwrite(int event, Event *e);
  int openWriteMain(int event, Event *e);
  int openWriteStartDone(int event, Event *e);
  int openWriteStartBegin(int event, Event *e);

  int updateVector(int event, Event *e);
  int updateReadDone(int event, Event *e);
  int updateVecWrite(int event, Event *e);

  int removeEvent(int event, Event *e);

  int linkWrite(int event, Event *e);
  int derefRead(int event, Event *e);

  int scanVol(int event, Event *e);
  int scanObject(int event, Event *e);
  int scanUpdateDone(int event, Event *e);
  int scanOpenWrite(int event, Event *e);
  int scanRemoveDone(int event, Event *e);

  int
  is_io_in_progress()
  {
    return io.aiocb.aio_fildes != AIO_NOT_IN_PROGRESS;
  }
  void
  set_io_not_in_progress()
  {
    io.aiocb.aio_fildes = AIO_NOT_IN_PROGRESS;
  }
  void
  set_agg_write_in_progress()
  {
    io.aiocb.aio_fildes = AIO_AGG_WRITE_IN_PROGRESS;
  }
  int evacuateDocDone(int event, Event *e);
  int evacuateReadHead(int event, Event *e);

  void cancel_trigger();
  int64_t get_object_size() override;
  void set_http_info(CacheHTTPInfo *info) override;
  void get_http_info(CacheHTTPInfo **info) override;
  /** Get the fragment table.
      @return The address of the start of the fragment table,
      or @c nullptr if there is no fragment table.
  */
  virtual HTTPInfo::FragOffset *get_frag_table();
  /** Load alt pointers and do fixups if needed.
      @return Length of header data used for alternates.
  */
  virtual uint32_t load_http_info(CacheHTTPInfoVector *info, struct Doc *doc, RefCountObj *block_ptr = nullptr);
  bool is_pread_capable() override;
  bool set_pin_in_cache(time_t time_pin) override;
  time_t get_pin_in_cache() override;

// offsets from the base stat
#define CACHE_STAT_ACTIVE 0
#define CACHE_STAT_SUCCESS 1
#define CACHE_STAT_FAILURE 2
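
// A CacheVC records the "active" stat for its operation class in base_stat;
// the matching success/failure counters are then addressed as offsets, e.g.
// (see free_CacheVC below):
//
//   CACHE_INCREMENT_DYN_STAT(base_stat + CACHE_STAT_SUCCESS);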

  // number of bytes to memset to 0 in the CacheVC when we free
  // it. All member variables starting from vio are memset to 0.
  // This variable is initialized in the CacheVC constructor.
  static int size_to_init;

  // Start Region A
  // This set of variables are not reset when the cacheVC is freed.
  // A CacheVC must set these to the correct values whenever needed.
  // These are variables that are always set to the correct values
  // before being used by the CacheVC.
  CacheKey key, first_key, earliest_key, update_key;
  Dir dir, earliest_dir, overwrite_dir, first_dir;
  // end Region A

  // Start Region B
  // These variables are individually cleared or reset when the
  // CacheVC is freed. All these variables must be reset/cleared
  // in free_CacheVC.
  Action _action;
  CacheHTTPHdr request;
  CacheHTTPInfoVector vector;
  CacheHTTPInfo alternate;
  Ptr<IOBufferData> buf;
  Ptr<IOBufferData> first_buf;
  Ptr<IOBufferBlock> blocks; // data available to write
  Ptr<IOBufferBlock> writer_buf;

  OpenDirEntry *od = nullptr;
  AIOCallbackInternal io;
  int alternate_index = CACHE_ALT_INDEX_DEFAULT; // preferred position in vector
  LINK(CacheVC, opendir_link);
#ifdef CACHE_STAT_PAGES
  LINK(CacheVC, stat_link);
#endif
  // end Region B

  // Start Region C
  // These variables are memset to 0 when the structure is freed.
  // The size of this region is size_to_init, which is initialized
  // in the CacheVC constructor. It assumes that vio is the start
  // of this region.
  // NOTE: NOTE: NOTE: If vio is NOT the start, then CHANGE the
  // size_to_init initialization.
  VIO vio;
  CacheFragType frag_type;
  CacheHTTPInfo *info;
  CacheHTTPInfoVector *write_vector;
  const OverridableHttpConfigParams *params;
  int header_len;        // for communicating with agg_copy
  int frag_len;          // for communicating with agg_copy
  uint32_t write_len;    // for communicating with agg_copy
  uint32_t agg_len;      // for communicating with aggWrite
  uint32_t write_serial; // serial of the final write for SYNC
  Vol *vol;
  Dir *last_collision;
  Event *trigger;
  CacheKey *read_key;
  ContinuationHandler save_handler;
  uint32_t pin_in_cache;
  ink_hrtime start_time;
  int base_stat;
  int recursive;
  int closed;
  uint64_t seek_to;      // pread offset
  int64_t offset;        // offset into 'blocks' of data to write
  int64_t writer_offset; // offset of the writer for reading from a writer
  int64_t length;        // length of data available to write
  int64_t doc_pos;       // read position in 'buf'
  uint64_t write_pos;    // length written
  uint64_t total_len;    // total length written and available to write
  uint64_t doc_len;      // total_length (of the selected alternate for HTTP)
  uint64_t update_len;
  int fragment;
  int scan_msec_delay;
  CacheVC *write_vc;
  char *hostname;
  int host_len;
  int header_to_write_len;
  void *header_to_write;
  short writer_lock_retry;
  union {
    uint32_t flags;
    struct {
      unsigned int use_first_key : 1;
      unsigned int overwrite : 1;      // overwrite first_key Dir if it exists
      unsigned int close_complete : 1; // WRITE_COMPLETE is final
      unsigned int sync : 1;           // write to be committed to durable storage before WRITE_COMPLETE
      unsigned int evacuator : 1;
      unsigned int single_fragment : 1;
      unsigned int evac_vector : 1;
      unsigned int lookup : 1;
      unsigned int update : 1;
      unsigned int remove : 1;
      unsigned int remove_aborted_writers : 1;
      unsigned int open_read_timeout : 1; // UNUSED
      unsigned int data_done : 1;
      unsigned int read_from_writer_called : 1;
      unsigned int not_from_ram_cache : 1; // set if the object was not served entirely from the ram cache
      unsigned int rewrite_resident_alt : 1;
      unsigned int readers : 1;
      unsigned int doc_from_ram_cache : 1;
      unsigned int hit_evacuate : 1;
      unsigned int compressed_in_ram : 1; // compressed state in ram cache
      unsigned int allow_empty_doc : 1;   // used for caching empty HTTP documents
    } f;
  };
  // BTF optimization used to skip reading stuff in cache partition that doesn't contain any
  // dir entries.
  char *scan_vol_map;
  // BTF fix to handle objects that overlapped over two different reads;
  // this is how much we need to back up the buffer to get the start of the overlapping object.
  off_t scan_fix_buffer_offset;
  // end Region C
};

#define PUSH_HANDLER(_x)                                          \
  do {                                                            \
    ink_assert(handler != (ContinuationHandler)(&CacheVC::dead)); \
    save_handler = handler;                                       \
    handler      = (ContinuationHandler)(_x);                     \
  } while (0)

#define POP_HANDLER                                               \
  do {                                                            \
    handler = save_handler;                                       \
    ink_assert(handler != (ContinuationHandler)(&CacheVC::dead)); \
  } while (0)
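
// Usage sketch: do_read_call() and do_write_call() below push a sub-state
// handler and invoke it immediately; the handler runs POP_HANDLER when its
// sub-operation completes, restoring control to the saved handler.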

struct CacheRemoveCont : public Continuation {
  int event_handler(int event, void *data);

  CacheRemoveCont() : Continuation(nullptr) {}
};

// Global Data
extern ClassAllocator<CacheVC> cacheVConnectionAllocator;
extern CacheKey zero_key;
extern CacheSync *cacheDirSync;
// Function Prototypes
int cache_write(CacheVC *, CacheHTTPInfoVector *);
int get_alternate_index(CacheHTTPInfoVector *cache_vector, CacheKey key);
CacheVC *new_DocEvacuator(int nbytes, Vol *d);

// inline Functions

TS_INLINE CacheVC *
new_CacheVC(Continuation *cont)
{
  EThread *t          = cont->mutex->thread_holding;
  CacheVC *c          = THREAD_ALLOC(cacheVConnectionAllocator, t);
  c->vector.data.data = &c->vector.data.fast_data[0];
  c->_action          = cont;
  c->mutex            = cont->mutex;
  c->start_time       = Thread::get_hrtime();
  c->setThreadAffinity(t);
  ink_assert(c->trigger == nullptr);
  Debug("cache_new", "new %p", c);
#ifdef CACHE_STAT_PAGES
  ink_assert(!c->stat_link.next);
  ink_assert(!c->stat_link.prev);
#endif
  dir_clear(&c->dir);
  return c;
}
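
// The new CacheVC shares the caller's mutex, so the caller's lock also
// covers the VC; free_CacheVC() below asserts that it runs on the thread
// currently holding that mutex.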

TS_INLINE int
free_CacheVC(CacheVC *cont)
{
  Debug("cache_free", "free %p", cont);
  ProxyMutex *mutex = cont->mutex.get();
  Vol *vol          = cont->vol;
  if (vol) {
    CACHE_DECREMENT_DYN_STAT(cont->base_stat + CACHE_STAT_ACTIVE);
    if (cont->closed > 0) {
      CACHE_INCREMENT_DYN_STAT(cont->base_stat + CACHE_STAT_SUCCESS);
    } // else abort, cancel
  }
  ink_assert(mutex->thread_holding == this_ethread());
  if (cont->trigger) {
    cont->trigger->cancel();
  }
  ink_assert(!cont->is_io_in_progress());
  ink_assert(!cont->od);
  /* Assigning cont->io.action = nullptr causes a compile problem on the 2.6
     Solaris release build (weird), so for now null out the continuation and
     mutex of the action separately. */
  cont->io.action.continuation = nullptr;
  cont->io.action.mutex        = nullptr;
  cont->io.mutex.clear();
  cont->io.aio_result       = 0;
  cont->io.aiocb.aio_nbytes = 0;
  cont->request.reset();
  cont->vector.clear();
  cont->vio.buffer.clear();
  cont->vio.mutex.clear();
  if (cont->vio.op == VIO::WRITE && cont->alternate_index == CACHE_ALT_INDEX_DEFAULT) {
    cont->alternate.destroy();
  } else {
    cont->alternate.clear();
  }
  cont->_action.cancelled = 0;
  cont->_action.mutex.clear();
  cont->mutex.clear();
  cont->buf.clear();
  cont->first_buf.clear();
  cont->blocks.clear();
  cont->writer_buf.clear();
  cont->alternate_index = CACHE_ALT_INDEX_DEFAULT;
  if (cont->scan_vol_map) {
    ats_free(cont->scan_vol_map);
  }
  memset((char *)&cont->vio, 0, cont->size_to_init);
#ifdef CACHE_STAT_PAGES
  ink_assert(!cont->stat_link.next && !cont->stat_link.prev);
#endif
#ifdef DEBUG
  SET_CONTINUATION_HANDLER(cont, &CacheVC::dead);
#endif
  THREAD_FREE(cont, cacheVConnectionAllocator, this_thread());
  return EVENT_DONE;
}

TS_INLINE int
CacheVC::calluser(int event)
{
  recursive++;
  ink_assert(!vol || this_ethread() != vol->mutex->thread_holding);
  vio.cont->handleEvent(event, (void *)&vio);
  recursive--;
  if (closed) {
    die();
    return EVENT_DONE;
  }
  return EVENT_CONT;
}

TS_INLINE int
CacheVC::callcont(int event)
{
  recursive++;
  ink_assert(!vol || this_ethread() != vol->mutex->thread_holding);
  _action.continuation->handleEvent(event, this);
  recursive--;
  if (closed) {
    die();
  } else if (vio.vc_server) {
    handleEvent(EVENT_IMMEDIATE, nullptr);
  }
  return EVENT_DONE;
}

TS_INLINE int
CacheVC::do_read_call(CacheKey *akey)
{
  doc_pos             = 0;
  read_key            = akey;
  io.aiocb.aio_nbytes = dir_approx_size(&dir);
  PUSH_HANDLER(&CacheVC::handleRead);
  return handleRead(EVENT_CALL, nullptr);
}

TS_INLINE int
CacheVC::do_write_call()
{
  PUSH_HANDLER(&CacheVC::handleWrite);
  return handleWrite(EVENT_CALL, nullptr);
}

TS_INLINE void
CacheVC::cancel_trigger()
{
  if (trigger) {
    trigger->cancel_action();
    trigger = nullptr;
  }
}

TS_INLINE int
CacheVC::die()
{
  if (vio.op == VIO::WRITE) {
    if (f.update && total_len) {
      alternate.object_key_set(earliest_key);
    }
    if (!is_io_in_progress()) {
      SET_HANDLER(&CacheVC::openWriteClose);
      if (!recursive) {
        openWriteClose(EVENT_NONE, nullptr);
      }
    } // else catch it at the end of openWriteWriteDone
    return EVENT_CONT;
  } else {
    if (is_io_in_progress()) {
      save_handler = (ContinuationHandler)&CacheVC::openReadClose;
    } else {
      SET_HANDLER(&CacheVC::openReadClose);
      if (!recursive) {
        openReadClose(EVENT_NONE, nullptr);
      }
    }
    return EVENT_CONT;
  }
}

TS_INLINE int
CacheVC::handleWriteLock(int /* event ATS_UNUSED */, Event *e)
{
  cancel_trigger();
  int ret = 0;
  {
    CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
    if (!lock.is_locked()) {
      set_agg_write_in_progress();
      trigger = mutex->thread_holding->schedule_in_local(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay));
      return EVENT_CONT;
    }
    ret = handleWrite(EVENT_CALL, e);
  }
  if (ret == EVENT_RETURN) {
    return handleEvent(AIO_EVENT_DONE, nullptr);
  }
  return EVENT_CONT;
}

TS_INLINE int
CacheVC::do_write_lock()
{
  PUSH_HANDLER(&CacheVC::handleWriteLock);
  return handleWriteLock(EVENT_NONE, nullptr);
}

TS_INLINE int
CacheVC::do_write_lock_call()
{
  PUSH_HANDLER(&CacheVC::handleWriteLock);
  return handleWriteLock(EVENT_CALL, nullptr);
}

TS_INLINE bool
CacheVC::writer_done()
{
  OpenDirEntry *cod = od;
  if (!cod) {
    cod = vol->open_read(&first_key);
  }
  CacheVC *w = (cod) ? cod->writers.head : nullptr;
  // If the write VC started after the reader, then it's not the
  // original writer, since we never choose a writer that started
  // after the reader. The original writer was deallocated and then
  // reallocated for the same first_key.
  for (; w && (w != write_vc || w->start_time > start_time); w = (CacheVC *)w->opendir_link.next) {
    ;
  }
  if (!w) {
    return true;
  }
  return false;
}

TS_INLINE int
Vol::close_write(CacheVC *cont)
{
#ifdef CACHE_STAT_PAGES
  ink_assert(stat_cache_vcs.head);
  stat_cache_vcs.remove(cont, cont->stat_link);
  ink_assert(!cont->stat_link.next && !cont->stat_link.prev);
#endif
  return open_dir.close_write(cont);
}

// Returns 0 on success or a positive error code on failure
TS_INLINE int
Vol::open_write(CacheVC *cont, int allow_if_writers, int max_writers)
{
  Vol *vol       = this;
  bool agg_error = false;
  if (!cont->f.remove) {
    agg_error = (!cont->f.update && agg_todo_size > cache_config_agg_write_backlog);
#ifdef CACHE_AGG_FAIL_RATE
    agg_error = agg_error || ((uint32_t)mutex->thread_holding->generator.random() < (uint32_t)(UINT_MAX * CACHE_AGG_FAIL_RATE));
#endif
  }
  if (agg_error) {
    CACHE_INCREMENT_DYN_STAT(cache_write_backlog_failure_stat);
    return ECACHE_WRITE_FAIL;
  }
  if (open_dir.open_write(cont, allow_if_writers, max_writers)) {
#ifdef CACHE_STAT_PAGES
    ink_assert(cont->mutex->thread_holding == this_ethread());
    ink_assert(!cont->stat_link.next && !cont->stat_link.prev);
    stat_cache_vcs.enqueue(cont, cont->stat_link);
#endif
    return 0;
  }
  return ECACHE_DOC_BUSY;
}

TS_INLINE int
Vol::close_write_lock(CacheVC *cont)
{
  EThread *t = cont->mutex->thread_holding;
  CACHE_TRY_LOCK(lock, mutex, t);
  if (!lock.is_locked()) {
    return -1;
  }
  return close_write(cont);
}

TS_INLINE int
Vol::open_write_lock(CacheVC *cont, int allow_if_writers, int max_writers)
{
  EThread *t = cont->mutex->thread_holding;
  CACHE_TRY_LOCK(lock, mutex, t);
  if (!lock.is_locked()) {
    return -1;
  }
  return open_write(cont, allow_if_writers, max_writers);
}

TS_INLINE OpenDirEntry *
Vol::open_read_lock(CryptoHash *key, EThread *t)
{
  CACHE_TRY_LOCK(lock, mutex, t);
  if (!lock.is_locked()) {
    return nullptr;
  }
  return open_dir.open_read(key);
}

TS_INLINE int
Vol::begin_read_lock(CacheVC *cont)
{
  // no need for evacuation as the entire document is already in memory
#ifndef CACHE_STAT_PAGES
  if (cont->f.single_fragment) {
    return 0;
  }
#endif
  // VC is enqueued in stat_cache_vcs in the begin_read call
  EThread *t = cont->mutex->thread_holding;
  CACHE_TRY_LOCK(lock, mutex, t);
  if (!lock.is_locked()) {
    return -1;
  }
  return begin_read(cont);
}

TS_INLINE int
Vol::close_read_lock(CacheVC *cont)
{
  EThread *t = cont->mutex->thread_holding;
  CACHE_TRY_LOCK(lock, mutex, t);
  if (!lock.is_locked()) {
    return -1;
  }
  return close_read(cont);
}

TS_INLINE int
dir_delete_lock(CacheKey *key, Vol *d, ProxyMutex *m, Dir *del)
{
  EThread *thread = m->thread_holding;
  CACHE_TRY_LOCK(lock, d->mutex, thread);
  if (!lock.is_locked()) {
    return -1;
  }
  return dir_delete(key, d, del);
}

TS_INLINE int
dir_insert_lock(CacheKey *key, Vol *d, Dir *to_part, ProxyMutex *m)
{
  EThread *thread = m->thread_holding;
  CACHE_TRY_LOCK(lock, d->mutex, thread);
  if (!lock.is_locked()) {
    return -1;
  }
  return dir_insert(key, d, to_part);
}

TS_INLINE int
dir_overwrite_lock(CacheKey *key, Vol *d, Dir *to_part, ProxyMutex *m, Dir *overwrite, bool must_overwrite = true)
{
  EThread *thread = m->thread_holding;
  CACHE_TRY_LOCK(lock, d->mutex, thread);
  if (!lock.is_locked()) {
    return -1;
  }
  return dir_overwrite(key, d, to_part, overwrite, must_overwrite);
}

TS_INLINE void
rand_CacheKey(CacheKey *next_key, Ptr<ProxyMutex> &mutex)
{
  next_key->b[0] = mutex->thread_holding->generator.random();
  next_key->b[1] = mutex->thread_holding->generator.random();
}

extern uint8_t CacheKey_next_table[];
TS_INLINE void
next_CacheKey(CacheKey *next_key, CacheKey *key)
{
  uint8_t *b = (uint8_t *)next_key;
  uint8_t *k = (uint8_t *)key;
  b[0]       = CacheKey_next_table[k[0]];
  for (int i = 1; i < 16; i++) {
    b[i] = CacheKey_next_table[(b[i - 1] + k[i]) & 0xFF];
  }
}

extern uint8_t CacheKey_prev_table[];
TS_INLINE void
prev_CacheKey(CacheKey *prev_key, CacheKey *key)
{
  uint8_t *b = (uint8_t *)prev_key;
  uint8_t *k = (uint8_t *)key;
  for (int i = 15; i > 0; i--) {
    b[i] = 256 + CacheKey_prev_table[k[i]] - k[i - 1];
  }
  b[0] = CacheKey_prev_table[k[0]];
}
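
// next_CacheKey derives the key of the following fragment from the current
// one; prev_CacheKey is intended (by construction of the paired lookup
// tables) as its inverse, recovering the preceding key in the chain.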

TS_INLINE unsigned int
next_rand(unsigned int *p)
{
  unsigned int seed = *p;
  seed              = 1103515145 * seed + 12345;
  *p                = seed;
  return seed;
}
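
// next_rand is a small linear congruential generator (seed' = a * seed + c,
// modulo 2^32 via unsigned wraparound); the caller keeps the state in *p.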

extern ClassAllocator<CacheRemoveCont> cacheRemoveContAllocator;

TS_INLINE CacheRemoveCont *
new_CacheRemoveCont()
{
  CacheRemoveCont *cache_rm = cacheRemoveContAllocator.alloc();

  cache_rm->mutex = new_ProxyMutex();
  SET_CONTINUATION_HANDLER(cache_rm, &CacheRemoveCont::event_handler);
  return cache_rm;
}

TS_INLINE void
free_CacheRemoveCont(CacheRemoveCont *cache_rm)
{
  cache_rm->mutex = nullptr;
  cacheRemoveContAllocator.free(cache_rm);
}

TS_INLINE int
CacheRemoveCont::event_handler(int event, void *data)
{
  (void)event;
  (void)data;
  free_CacheRemoveCont(this);
  return EVENT_DONE;
}

int64_t cache_bytes_used();
int64_t cache_bytes_total();

#ifdef DEBUG
#define CACHE_DEBUG_INCREMENT_DYN_STAT(_x) CACHE_INCREMENT_DYN_STAT(_x)
#define CACHE_DEBUG_SUM_DYN_STAT(_x, _y) CACHE_SUM_DYN_STAT(_x, _y)
#else
#define CACHE_DEBUG_INCREMENT_DYN_STAT(_x)
#define CACHE_DEBUG_SUM_DYN_STAT(_x, _y)
#endif

struct CacheHostRecord;
struct Vol;
class CacheHostTable;

struct Cache {
  int cache_read_done       = 0;
  int total_good_nvol       = 0;
  int total_nvol            = 0;
  int ready                 = CACHE_INITIALIZING;
  int64_t cache_size        = 0; // in store block size
  CacheHostTable *hosttable = nullptr;
  int total_initialized_vol = 0;
  CacheType scheme          = CACHE_NONE_TYPE;

  int open(bool reconfigure, bool fix);
  int close();

  Action *lookup(Continuation *cont, const CacheKey *key, CacheFragType type, const char *hostname, int host_len);
  inkcoreapi Action *open_read(Continuation *cont, const CacheKey *key, CacheFragType type, const char *hostname, int len);
  inkcoreapi Action *open_write(Continuation *cont, const CacheKey *key, CacheFragType frag_type, int options = 0,
                                time_t pin_in_cache = (time_t)0, const char *hostname = nullptr, int host_len = 0);
  inkcoreapi Action *remove(Continuation *cont, const CacheKey *key, CacheFragType type = CACHE_FRAG_TYPE_HTTP,
                            const char *hostname = nullptr, int host_len = 0);
  Action *scan(Continuation *cont, const char *hostname = nullptr, int host_len = 0, int KB_per_second = 2500);

  Action *open_read(Continuation *cont, const CacheKey *key, CacheHTTPHdr *request, const OverridableHttpConfigParams *params,
                    CacheFragType type, const char *hostname, int host_len);
  Action *open_write(Continuation *cont, const CacheKey *key, CacheHTTPInfo *old_info, time_t pin_in_cache = (time_t)0,
                     const CacheKey *key1 = nullptr, CacheFragType type = CACHE_FRAG_TYPE_HTTP, const char *hostname = nullptr,
                     int host_len = 0);
  static void generate_key(CryptoHash *hash, CacheURL *url);
  static void generate_key(HttpCacheKey *hash, CacheURL *url, cache_generation_t generation = -1);

  Action *link(Continuation *cont, const CacheKey *from, const CacheKey *to, CacheFragType type, const char *hostname,
               int host_len);
  Action *deref(Continuation *cont, const CacheKey *key, CacheFragType type, const char *hostname, int host_len);

  void vol_initialized(bool result);

  int open_done();

  Vol *key_to_vol(const CacheKey *key, const char *hostname, int host_len);

  Cache() {}
};

extern Cache *theCache;
inkcoreapi extern Cache *caches[NUM_CACHE_FRAG_TYPES];

TS_INLINE void
Cache::generate_key(CryptoHash *hash, CacheURL *url)
{
  url->hash_get(hash);
}

TS_INLINE void
Cache::generate_key(HttpCacheKey *key, CacheURL *url, cache_generation_t generation)
{
  key->hostname = url->host_get(&key->hostlen);
  url->hash_get(&key->hash, generation);
}

TS_INLINE unsigned int
cache_hash(const CryptoHash &hash)
{
  uint64_t f         = hash.fold();
  unsigned int mhash = (unsigned int)(f >> 32);
  return mhash;
}
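
// cache_hash folds the 128-bit CryptoHash down to 64 bits and returns the
// upper 32 bits as a compact hash of the key.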

LINK_DEFINITION(CacheVC, opendir_link)