/** @file

  A brief file description

  @section license License

  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements.  See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership.  The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
 */

#pragma once

#include "tscore/ink_platform.h"
#include "tscore/InkErrno.h"

#include "HTTP.h"
#include "P_CacheHttp.h"

struct EvacuationBlock;

// Compilation Options

#define ALTERNATES 1
// #define CACHE_LOCK_FAIL_RATE         0.001
// #define CACHE_AGG_FAIL_RATE          0.005
// #define CACHE_INSPECTOR_PAGES
#define MAX_CACHE_VCS_PER_THREAD 500

#define INTEGRAL_FRAGS 4

#ifdef CACHE_INSPECTOR_PAGES
#ifdef DEBUG
#define CACHE_STAT_PAGES
#endif
#endif

#ifdef DEBUG
#define DDebug(tag, fmt, ...) Debug(tag, fmt, ##__VA_ARGS__)
#else
#define DDebug(tag, fmt, ...)
#endif
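
// Illustrative usage (arguments are hypothetical): DDebug() compiles to a
// Debug() call in DEBUG builds and to nothing in release builds, so it is
// safe on hot paths.
//
//   DDebug("cache_read", "vc=%p doc_len=%" PRIu64, this, doc_len);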

#define AIO_SOFT_FAILURE -100000

#ifndef CACHE_LOCK_FAIL_RATE
#define CACHE_TRY_LOCK(_l, _m, _t) MUTEX_TRY_LOCK(_l, _m, _t)
#else
#define CACHE_TRY_LOCK(_l, _m, _t)                                                    \
  MUTEX_TRY_LOCK(_l, _m, _t);                                                         \
  if ((uint32_t)_t->generator.random() < (uint32_t)(UINT_MAX * CACHE_LOCK_FAIL_RATE)) \
  CACHE_MUTEX_RELEASE(_l)
#endif
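
// When CACHE_LOCK_FAIL_RATE is defined (see "Compilation Options" above),
// CACHE_TRY_LOCK randomly drops a lock it just acquired, forcing the retry
// paths to be exercised. A minimal sketch of the pattern it supports,
// mirroring handleWriteLock() below:
//
//   CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
//   if (!lock.is_locked()) {
//     // could not get (or randomly lost) the lock; reschedule and retry
//   }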

#define VC_LOCK_RETRY_EVENT()                                                                                         \
  do {                                                                                                                \
    trigger = mutex->thread_holding->schedule_in_local(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay), event); \
    return EVENT_CONT;                                                                                                \
  } while (0)

#define VC_SCHED_LOCK_RETRY()                                                                                  \
  do {                                                                                                         \
    trigger = mutex->thread_holding->schedule_in_local(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay)); \
    return EVENT_CONT;                                                                                         \
  } while (0)

#define CONT_SCHED_LOCK_RETRY_RET(_c)                                                                  \
  do {                                                                                                 \
    _c->mutex->thread_holding->schedule_in_local(_c, HRTIME_MSECONDS(cache_config_mutex_retry_delay)); \
    return EVENT_CONT;                                                                                 \
  } while (0)

#define CONT_SCHED_LOCK_RETRY(_c) _c->mutex->thread_holding->schedule_in_local(_c, HRTIME_MSECONDS(cache_config_mutex_retry_delay))

#define VC_SCHED_WRITER_RETRY()                                           \
  do {                                                                    \
    ink_assert(!trigger);                                                 \
    writer_lock_retry++;                                                  \
    ink_hrtime _t = HRTIME_MSECONDS(cache_read_while_writer_retry_delay); \
    if (writer_lock_retry > 2)                                            \
      _t = HRTIME_MSECONDS(cache_read_while_writer_retry_delay) * 2;      \
    trigger = mutex->thread_holding->schedule_in_local(this, _t);         \
    return EVENT_CONT;                                                    \
  } while (0)
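
// These macros contain a `return EVENT_CONT;` and therefore may only appear
// inside a CacheVC event handler. A minimal sketch; the handler name is
// hypothetical:
//
//   int
//   CacheVC::someStateHandler(int event, Event *e)
//   {
//     CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
//     if (!lock.is_locked()) {
//       VC_SCHED_LOCK_RETRY(); // reschedules `this` after cache_config_mutex_retry_delay ms
//     }
//     // ... proceed with the volume lock held ...
//     return EVENT_DONE;
//   }
//
// VC_SCHED_WRITER_RETRY() is the read-while-writer variant: after two retries
// against the same writer it doubles the retry delay.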

// cache stats definitions
enum {
  cache_bytes_used_stat,
  cache_bytes_total_stat,
  cache_ram_cache_bytes_stat,
  cache_ram_cache_bytes_total_stat,
  cache_direntries_total_stat,
  cache_direntries_used_stat,
  cache_ram_cache_hits_stat,
  cache_ram_cache_misses_stat,
  cache_pread_count_stat,
  cache_percent_full_stat,
  cache_lookup_active_stat,
  cache_lookup_success_stat,
  cache_lookup_failure_stat,
  cache_read_active_stat,
  cache_read_success_stat,
  cache_read_failure_stat,
  cache_write_active_stat,
  cache_write_success_stat,
  cache_write_failure_stat,
  cache_write_backlog_failure_stat,
  cache_update_active_stat,
  cache_update_success_stat,
  cache_update_failure_stat,
  cache_remove_active_stat,
  cache_remove_success_stat,
  cache_remove_failure_stat,
  cache_evacuate_active_stat,
  cache_evacuate_success_stat,
  cache_evacuate_failure_stat,
  cache_scan_active_stat,
  cache_scan_success_stat,
  cache_scan_failure_stat,
  cache_directory_collision_count_stat,
  cache_single_fragment_document_count_stat,
  cache_two_fragment_document_count_stat,
  cache_three_plus_plus_fragment_document_count_stat,
  cache_read_busy_success_stat,
  cache_read_busy_failure_stat,
  cache_gc_bytes_evacuated_stat,
  cache_gc_frags_evacuated_stat,
  cache_write_bytes_stat,
  cache_hdr_vector_marshal_stat,
  cache_hdr_marshal_stat,
  cache_hdr_marshal_bytes_stat,
  cache_directory_wrap_stat,
  cache_directory_sync_count_stat,
  cache_directory_sync_time_stat,
  cache_directory_sync_bytes_stat,
  /* AIO read/write error counters */
  cache_span_errors_read_stat,
  cache_span_errors_write_stat,
  /* Span-related gauges. A span "moves" from "online" (errors == 0)
   * to "failing" (errors > 0 && errors < proxy.config.cache.max_disk_errors)
   * to "offline" (errors >= proxy.config.cache.max_disk_errors).
   * "failing" + "offline" + "online" = total number of spans.
   * See the illustrative sketch following this enum. */
  cache_span_offline_stat,
  cache_span_online_stat,
  cache_span_failing_stat,
  cache_stat_count
};
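
// Illustrative only (the helper below is hypothetical; the real accounting
// lives in the span/AIO code): how the three span gauges partition spans by
// error count.
//
//   int
//   span_gauge_for(int errors, int max_disk_errors)
//   {
//     if (errors == 0)
//       return cache_span_online_stat;
//     return errors >= max_disk_errors ? cache_span_offline_stat : cache_span_failing_stat;
//   }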

extern RecRawStatBlock *cache_rsb;

#define GLOBAL_CACHE_SET_DYN_STAT(x, y) RecSetGlobalRawStatSum(cache_rsb, (x), (y))

#define CACHE_SET_DYN_STAT(x, y)                               \
  do {                                                         \
    RecSetGlobalRawStatSum(cache_rsb, (x), (y));               \
    RecSetGlobalRawStatSum(vol->cache_vol->vol_rsb, (x), (y)); \
  } while (0)

#define CACHE_INCREMENT_DYN_STAT(x)                                              \
  do {                                                                           \
    RecIncrRawStat(cache_rsb, mutex->thread_holding, (int)(x), 1);               \
    RecIncrRawStat(vol->cache_vol->vol_rsb, mutex->thread_holding, (int)(x), 1); \
  } while (0)

#define CACHE_DECREMENT_DYN_STAT(x)                                               \
  do {                                                                            \
    RecIncrRawStat(cache_rsb, mutex->thread_holding, (int)(x), -1);               \
    RecIncrRawStat(vol->cache_vol->vol_rsb, mutex->thread_holding, (int)(x), -1); \
  } while (0)

#define CACHE_VOL_SUM_DYN_STAT(x, y) RecIncrRawStat(vol->cache_vol->vol_rsb, mutex->thread_holding, (int)(x), (int64_t)(y))

#define CACHE_SUM_DYN_STAT(x, y)                                                            \
  do {                                                                                      \
    RecIncrRawStat(cache_rsb, mutex->thread_holding, (int)(x), (int64_t)(y));               \
    RecIncrRawStat(vol->cache_vol->vol_rsb, mutex->thread_holding, (int)(x), (int64_t)(y)); \
  } while (0)

#define CACHE_SUM_DYN_STAT_THREAD(x, y)                                              \
  do {                                                                               \
    RecIncrRawStat(cache_rsb, this_ethread(), (int)(x), (int64_t)(y));               \
    RecIncrRawStat(vol->cache_vol->vol_rsb, this_ethread(), (int)(x), (int64_t)(y)); \
  } while (0)

#define GLOBAL_CACHE_SUM_GLOBAL_DYN_STAT(x, y) RecIncrGlobalRawStatSum(cache_rsb, (x), (y))

#define CACHE_SUM_GLOBAL_DYN_STAT(x, y)                         \
  do {                                                          \
    RecIncrGlobalRawStatSum(cache_rsb, (x), (y));               \
    RecIncrGlobalRawStatSum(vol->cache_vol->vol_rsb, (x), (y)); \
  } while (0)

#define CACHE_CLEAR_DYN_STAT(x)                          \
  do {                                                   \
    RecSetRawStatSum(cache_rsb, (x), 0);                 \
    RecSetRawStatCount(cache_rsb, (x), 0);               \
    RecSetRawStatSum(vol->cache_vol->vol_rsb, (x), 0);   \
    RecSetRawStatCount(vol->cache_vol->vol_rsb, (x), 0); \
  } while (0)

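// Usage sketch (illustrative): the per-volume variants assume `vol` and
// `mutex` are in scope, as they are inside CacheVC state handlers.
// `bytes_written` is a hypothetical local.
//
//   CACHE_INCREMENT_DYN_STAT(cache_read_active_stat);          // bump a counter
//   CACHE_SUM_DYN_STAT(cache_write_bytes_stat, bytes_written); // add a delta
//   CACHE_DECREMENT_DYN_STAT(cache_read_active_stat);
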
// Configuration
extern int cache_config_dir_sync_frequency;
extern int cache_config_http_max_alts;
extern int cache_config_permit_pinning;
extern int cache_config_select_alternate;
extern int cache_config_max_doc_size;
extern int cache_config_min_average_object_size;
extern int cache_config_agg_write_backlog;
extern int cache_config_enable_checksum;
extern int cache_config_alt_rewrite_max_size;
extern int cache_config_read_while_writer;
extern int cache_config_ram_cache_compress;
extern int cache_config_ram_cache_compress_percent;
extern int cache_config_ram_cache_use_seen_filter;
extern int cache_config_hit_evacuate_percent;
extern int cache_config_hit_evacuate_size_limit;
extern int cache_config_force_sector_size;
extern int cache_config_target_fragment_size;
extern int cache_config_mutex_retry_delay;
extern int cache_read_while_writer_retry_delay;
extern int cache_config_read_while_writer_max_retries;

// CacheVC
struct CacheVC : public CacheVConnection {
  CacheVC();

  VIO *do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf) override;
  VIO *do_io_pread(Continuation *c, int64_t nbytes, MIOBuffer *buf, int64_t offset) override;
  VIO *do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *buf, bool owner = false) override;
  void do_io_close(int lerrno = -1) override;
  void reenable(VIO *avio) override;
  void reenable_re(VIO *avio) override;
  bool get_data(int i, void *data) override;
  bool set_data(int i, void *data) override;

  bool
  is_ram_cache_hit() const override
  {
    ink_assert(vio.op == VIO::READ);
    return !f.not_from_ram_cache;
  }

  int
  get_header(void **ptr, int *len) override
  {
    if (first_buf) {
      Doc *doc = (Doc *)first_buf->data();
      *ptr     = doc->hdr();
      *len     = doc->hlen;
      return 0;
    }

    return -1;
  }

  int
  set_header(void *ptr, int len) override
  {
    header_to_write     = ptr;
    header_to_write_len = len;
    return 0;
  }

  int
  get_single_data(void **ptr, int *len) override
  {
    if (first_buf) {
      Doc *doc = (Doc *)first_buf->data();
      if (doc->data_len() == doc->total_len) {
        *ptr = doc->data();
        *len = doc->data_len();
        return 0;
      }
    }

    return -1;
  }

  int
  get_volume_number() const override
  {
    if (vol && vol->cache_vol) {
      return vol->cache_vol->vol_number;
    }

    return -1;
  }

  const char *
  get_disk_path() const override
  {
    if (vol && vol->disk) {
      return vol->disk->path;
    }

    return nullptr;
  }

  bool
  is_compressed_in_ram() const override
  {
    ink_assert(vio.op == VIO::READ);
    return f.compressed_in_ram;
  }

  bool writer_done();
  int calluser(int event);
  int callcont(int event);
  int die();
  int dead(int event, Event *e);

  int handleReadDone(int event, Event *e);
  int handleRead(int event, Event *e);
  int do_read_call(CacheKey *akey);
  int handleWrite(int event, Event *e);
  int handleWriteLock(int event, Event *e);
  int do_write_call();
  int do_write_lock();
  int do_write_lock_call();
  int do_sync(uint32_t target_write_serial);

  int openReadClose(int event, Event *e);
  int openReadReadDone(int event, Event *e);
  int openReadMain(int event, Event *e);
  int openReadStartEarliest(int event, Event *e);
  int openReadVecWrite(int event, Event *e);
  int openReadStartHead(int event, Event *e);
  int openReadFromWriter(int event, Event *e);
  int openReadFromWriterMain(int event, Event *e);
  int openReadFromWriterFailure(int event, Event *);
  int openReadChooseWriter(int event, Event *e);
  int openReadDirDelete(int event, Event *e);

  int openWriteCloseDir(int event, Event *e);
  int openWriteCloseHeadDone(int event, Event *e);
  int openWriteCloseHead(int event, Event *e);
  int openWriteCloseDataDone(int event, Event *e);
  int openWriteClose(int event, Event *e);
  int openWriteRemoveVector(int event, Event *e);
  int openWriteWriteDone(int event, Event *e);
  int openWriteOverwrite(int event, Event *e);
  int openWriteMain(int event, Event *e);
  int openWriteStartDone(int event, Event *e);
  int openWriteStartBegin(int event, Event *e);

  int updateVector(int event, Event *e);
  int updateReadDone(int event, Event *e);
  int updateVecWrite(int event, Event *e);

  int removeEvent(int event, Event *e);

  int linkWrite(int event, Event *e);
  int derefRead(int event, Event *e);

  int scanVol(int event, Event *e);
  int scanObject(int event, Event *e);
  int scanUpdateDone(int event, Event *e);
  int scanOpenWrite(int event, Event *e);
  int scanRemoveDone(int event, Event *e);

  int
  is_io_in_progress()
  {
    return io.aiocb.aio_fildes != AIO_NOT_IN_PROGRESS;
  }
  void
  set_io_not_in_progress()
  {
    io.aiocb.aio_fildes = AIO_NOT_IN_PROGRESS;
  }
  void
  set_agg_write_in_progress()
  {
    io.aiocb.aio_fildes = AIO_AGG_WRITE_IN_PROGRESS;
  }
  int evacuateDocDone(int event, Event *e);
  int evacuateReadHead(int event, Event *e);

  void cancel_trigger();
  int64_t get_object_size() override;
  void set_http_info(CacheHTTPInfo *info) override;
  void get_http_info(CacheHTTPInfo **info) override;
  /** Get the fragment table.
      @return The address of the start of the fragment table,
      or @c nullptr if there is no fragment table.
  */
  virtual HTTPInfo::FragOffset *get_frag_table();
  /** Load alt pointers and do fixups if needed.
      @return Length of header data used for alternates.
   */
  virtual uint32_t load_http_info(CacheHTTPInfoVector *info, struct Doc *doc, RefCountObj *block_ptr = nullptr);
  bool is_pread_capable() override;
  bool set_pin_in_cache(time_t time_pin) override;
  time_t get_pin_in_cache() override;

// offsets from the base stat
#define CACHE_STAT_ACTIVE 0
#define CACHE_STAT_SUCCESS 1
#define CACHE_STAT_FAILURE 2

  // Number of bytes to memset to 0 when the CacheVC is freed. All member
  // variables starting from vio are memset to 0. This variable is
  // initialized in the CacheVC constructor.
  static int size_to_init;
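
  // A minimal sketch of how size_to_init can be derived, mirroring the
  // description above (the actual initialization lives in the CacheVC
  // constructor in the .cc file):
  //
  //   CacheVC::CacheVC()
  //   {
  //     size_to_init = sizeof(CacheVC) - (size_t) & ((CacheVC *)nullptr)->vio;
  //     memset((void *)&vio, 0, size_to_init);
  //   }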

  // Start Region A
  // These variables are not reset when the CacheVC is freed; a CacheVC must
  // set them to the correct values before each use.
  CacheKey key, first_key, earliest_key, update_key;
  Dir dir, earliest_dir, overwrite_dir, first_dir;
  // end Region A

  // Start Region B
  // These variables are individually cleared or reset when the CacheVC is
  // freed. All of them must be reset/cleared in free_CacheVC.
  Action _action;
  CacheHTTPHdr request;
  CacheHTTPInfoVector vector;
  CacheHTTPInfo alternate;
  Ptr<IOBufferData> buf;
  Ptr<IOBufferData> first_buf;
  Ptr<IOBufferBlock> blocks; // data available to write
  Ptr<IOBufferBlock> writer_buf;

  OpenDirEntry *od = nullptr;
  AIOCallbackInternal io;
  int alternate_index = CACHE_ALT_INDEX_DEFAULT; // preferred position in vector
  LINK(CacheVC, opendir_link);
#ifdef CACHE_STAT_PAGES
  LINK(CacheVC, stat_link);
#endif
  // end Region B

  // Start Region C
  // These variables are memset to 0 when the structure is freed. The size of
  // this region is size_to_init, which is initialized in the CacheVC
  // constructor. It assumes that vio is the start of this region.
  // NOTE: NOTE: NOTE: If vio is NOT the start, then CHANGE the
  // size_to_init initialization.
  VIO vio;
  CacheFragType frag_type;
  CacheHTTPInfo *info;
  CacheHTTPInfoVector *write_vector;
  const OverridableHttpConfigParams *params;
  int header_len;        // for communicating with agg_copy
  int frag_len;          // for communicating with agg_copy
  uint32_t write_len;    // for communicating with agg_copy
  uint32_t agg_len;      // for communicating with aggWrite
  uint32_t write_serial; // serial of the final write for SYNC
  Vol *vol;
  Dir *last_collision;
  Event *trigger;
  CacheKey *read_key;
  ContinuationHandler save_handler;
  uint32_t pin_in_cache;
  ink_hrtime start_time;
  int base_stat;
  int recursive;
  int closed;
  uint64_t seek_to;      // pread offset
  int64_t offset;        // offset into 'blocks' of data to write
  int64_t writer_offset; // offset of the writer for reading from a writer
  int64_t length;        // length of data available to write
  int64_t doc_pos;       // read position in 'buf'
  uint64_t write_pos;    // length written
  uint64_t total_len;    // total length written and available to write
  uint64_t doc_len;      // total length (of the selected alternate for HTTP)
  uint64_t update_len;
  int fragment;
  int scan_msec_delay;
  CacheVC *write_vc;
  char *hostname;
  int host_len;
  int header_to_write_len;
  void *header_to_write;
  short writer_lock_retry;
  union {
    uint32_t flags;
    struct {
      unsigned int use_first_key : 1;
      unsigned int overwrite : 1;      // overwrite first_key Dir if it exists
      unsigned int close_complete : 1; // WRITE_COMPLETE is final
      unsigned int sync : 1;           // write must be committed to durable storage before WRITE_COMPLETE
      unsigned int evacuator : 1;
      unsigned int single_fragment : 1;
      unsigned int evac_vector : 1;
      unsigned int lookup : 1;
      unsigned int update : 1;
      unsigned int remove : 1;
      unsigned int remove_aborted_writers : 1;
      unsigned int open_read_timeout : 1; // UNUSED
      unsigned int data_done : 1;
      unsigned int read_from_writer_called : 1;
      unsigned int not_from_ram_cache : 1; // object was not served from the ram cache
      unsigned int rewrite_resident_alt : 1;
      unsigned int readers : 1;
      unsigned int doc_from_ram_cache : 1;
      unsigned int hit_evacuate : 1;
      unsigned int compressed_in_ram : 1; // compressed state in ram cache
      unsigned int allow_empty_doc : 1;   // used to cache empty HTTP documents
    } f;
  };
  // BTF optimization: skip reading regions of the cache partition that do not
  // contain any dir entries.
  char *scan_vol_map;
  // BTF fix to handle objects that overlap two different reads; this is how
  // far we need to back up the buffer to get to the start of the overlapping
  // object.
  off_t scan_fix_buffer_offset;
  // end Region C
};

#define PUSH_HANDLER(_x)                                          \
  do {                                                            \
    ink_assert(handler != (ContinuationHandler)(&CacheVC::dead)); \
    save_handler = handler;                                       \
    handler      = (ContinuationHandler)(_x);                     \
  } while (0)

#define POP_HANDLER                                               \
  do {                                                            \
    handler = save_handler;                                       \
    ink_assert(handler != (ContinuationHandler)(&CacheVC::dead)); \
  } while (0)
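
// Usage sketch (illustrative): PUSH_HANDLER/POP_HANDLER treat save_handler as
// a one-slot handler stack, as in do_read_call()/do_write_call() below.
//
//   PUSH_HANDLER(&CacheVC::handleRead);     // divert events to handleRead
//   return handleRead(EVENT_CALL, nullptr); // the read path later restores
//                                           // the saved handler via POP_HANDLER
//
// Note the "stack" is one slot deep: a second PUSH_HANDLER before the matching
// POP_HANDLER would overwrite save_handler.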

struct CacheRemoveCont : public Continuation {
  int event_handler(int event, void *data);

  CacheRemoveCont() : Continuation(nullptr) {}
};

// Global Data
extern ClassAllocator<CacheVC> cacheVConnectionAllocator;
extern CacheKey zero_key;
extern CacheSync *cacheDirSync;
// Function Prototypes
int cache_write(CacheVC *, CacheHTTPInfoVector *);
int get_alternate_index(CacheHTTPInfoVector *cache_vector, CacheKey key);
CacheVC *new_DocEvacuator(int nbytes, Vol *d);

// inline Functions

TS_INLINE CacheVC *
new_CacheVC(Continuation *cont)
{
  EThread *t          = cont->mutex->thread_holding;
  CacheVC *c          = THREAD_ALLOC(cacheVConnectionAllocator, t);
  c->vector.data.data = &c->vector.data.fast_data[0];
  c->_action          = cont;
  c->mutex            = cont->mutex;
  c->start_time       = Thread::get_hrtime();
  c->setThreadAffinity(t);
  ink_assert(c->trigger == nullptr);
  Debug("cache_new", "new %p", c);
#ifdef CACHE_STAT_PAGES
  ink_assert(!c->stat_link.next);
  ink_assert(!c->stat_link.prev);
#endif
  dir_clear(&c->dir);
  return c;
}

TS_INLINE int
free_CacheVC(CacheVC *cont)
{
  Debug("cache_free", "free %p", cont);
  ProxyMutex *mutex = cont->mutex.get();
  Vol *vol          = cont->vol;
  if (vol) {
    CACHE_DECREMENT_DYN_STAT(cont->base_stat + CACHE_STAT_ACTIVE);
    if (cont->closed > 0) {
      CACHE_INCREMENT_DYN_STAT(cont->base_stat + CACHE_STAT_SUCCESS);
    } // else abort, cancel
  }
  ink_assert(mutex->thread_holding == this_ethread());
  if (cont->trigger) {
    cont->trigger->cancel();
  }
  ink_assert(!cont->is_io_in_progress());
  ink_assert(!cont->od);
  /* Setting cont->io.action = nullptr causes a compile problem on the Solaris
     2.6 release build (reason unknown). For now, null out the continuation
     and mutex of the action separately. */
  cont->io.action.continuation = nullptr;
  cont->io.action.mutex        = nullptr;
  cont->io.mutex.clear();
  cont->io.aio_result       = 0;
  cont->io.aiocb.aio_nbytes = 0;
  cont->request.reset();
  cont->vector.clear();
  cont->vio.buffer.clear();
  cont->vio.mutex.clear();
  if (cont->vio.op == VIO::WRITE && cont->alternate_index == CACHE_ALT_INDEX_DEFAULT) {
    cont->alternate.destroy();
  } else {
    cont->alternate.clear();
  }
  cont->_action.cancelled = 0;
  cont->_action.mutex.clear();
  cont->mutex.clear();
  cont->buf.clear();
  cont->first_buf.clear();
  cont->blocks.clear();
  cont->writer_buf.clear();
  cont->alternate_index = CACHE_ALT_INDEX_DEFAULT;
  if (cont->scan_vol_map) {
    ats_free(cont->scan_vol_map);
  }
  memset((char *)&cont->vio, 0, cont->size_to_init);
#ifdef CACHE_STAT_PAGES
  ink_assert(!cont->stat_link.next && !cont->stat_link.prev);
#endif
#ifdef DEBUG
  SET_CONTINUATION_HANDLER(cont, &CacheVC::dead);
#endif
  THREAD_FREE(cont, cacheVConnectionAllocator, this_thread());
  return EVENT_DONE;
}
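
// Lifecycle sketch (illustrative): CacheVCs are pooled through
// cacheVConnectionAllocator. new_CacheVC() inherits the caller's mutex;
// free_CacheVC() must run on the thread holding that mutex, cancels any
// pending trigger, releases all references, and memsets Region C.
//
//   CacheVC *vc = new_CacheVC(cont); // cont is the initiating Continuation
//   // ... drive vc through its state handlers ...
//   free_CacheVC(vc);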

TS_INLINE int
CacheVC::calluser(int event)
{
  recursive++;
  ink_assert(!vol || this_ethread() != vol->mutex->thread_holding);
  vio.cont->handleEvent(event, (void *)&vio);
  recursive--;
  if (closed) {
    die();
    return EVENT_DONE;
  }
  return EVENT_CONT;
}

TS_INLINE int
CacheVC::callcont(int event)
{
  recursive++;
  ink_assert(!vol || this_ethread() != vol->mutex->thread_holding);
  _action.continuation->handleEvent(event, this);
  recursive--;
  if (closed) {
    die();
  } else if (vio.vc_server) {
    handleEvent(EVENT_IMMEDIATE, nullptr);
  }
  return EVENT_DONE;
}

TS_INLINE int
CacheVC::do_read_call(CacheKey *akey)
{
  doc_pos             = 0;
  read_key            = akey;
  io.aiocb.aio_nbytes = dir_approx_size(&dir);
  PUSH_HANDLER(&CacheVC::handleRead);
  return handleRead(EVENT_CALL, nullptr);
}

TS_INLINE int
CacheVC::do_write_call()
{
  PUSH_HANDLER(&CacheVC::handleWrite);
  return handleWrite(EVENT_CALL, nullptr);
}

TS_INLINE void
CacheVC::cancel_trigger()
{
  if (trigger) {
    trigger->cancel_action();
    trigger = nullptr;
  }
}

TS_INLINE int
CacheVC::die()
{
  if (vio.op == VIO::WRITE) {
    if (f.update && total_len) {
      alternate.object_key_set(earliest_key);
    }
    if (!is_io_in_progress()) {
      SET_HANDLER(&CacheVC::openWriteClose);
      if (!recursive) {
        openWriteClose(EVENT_NONE, nullptr);
      }
    } // else catch it at the end of openWriteWriteDone
    return EVENT_CONT;
  } else {
    if (is_io_in_progress()) {
      save_handler = (ContinuationHandler)&CacheVC::openReadClose;
    } else {
      SET_HANDLER(&CacheVC::openReadClose);
      if (!recursive) {
        openReadClose(EVENT_NONE, nullptr);
      }
    }
    return EVENT_CONT;
  }
}

TS_INLINE int
CacheVC::handleWriteLock(int /* event ATS_UNUSED */, Event *e)
{
  cancel_trigger();
  int ret = 0;
  {
    CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
    if (!lock.is_locked()) {
      set_agg_write_in_progress();
      trigger = mutex->thread_holding->schedule_in_local(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay));
      return EVENT_CONT;
    }
    ret = handleWrite(EVENT_CALL, e);
  }
  if (ret == EVENT_RETURN) {
    return handleEvent(AIO_EVENT_DONE, nullptr);
  }
  return EVENT_CONT;
}

TS_INLINE int
CacheVC::do_write_lock()
{
  PUSH_HANDLER(&CacheVC::handleWriteLock);
  return handleWriteLock(EVENT_NONE, nullptr);
}

TS_INLINE int
CacheVC::do_write_lock_call()
{
  PUSH_HANDLER(&CacheVC::handleWriteLock);
  return handleWriteLock(EVENT_CALL, nullptr);
}

TS_INLINE bool
CacheVC::writer_done()
{
  OpenDirEntry *cod = od;
  if (!cod) {
    cod = vol->open_read(&first_key);
  }
  CacheVC *w = (cod) ? cod->writers.head : nullptr;
  // If the write VC started after the reader, then it is not the original
  // writer, since we never choose a writer that started after the reader.
  // The original writer was deallocated and then reallocated for the same
  // first_key.
  for (; w && (w != write_vc || w->start_time > start_time); w = (CacheVC *)w->opendir_link.next) {
    ;
  }
  if (!w) {
    return true;
  }
  return false;
}

TS_INLINE int
Vol::close_write(CacheVC *cont)
{
#ifdef CACHE_STAT_PAGES
  ink_assert(stat_cache_vcs.head);
  stat_cache_vcs.remove(cont, cont->stat_link);
  ink_assert(!cont->stat_link.next && !cont->stat_link.prev);
#endif
  return open_dir.close_write(cont);
}

// Returns 0 on success or a positive error code on failure
TS_INLINE int
Vol::open_write(CacheVC *cont, int allow_if_writers, int max_writers)
{
  Vol *vol       = this;
  bool agg_error = false;
  if (!cont->f.remove) {
    agg_error = (!cont->f.update && agg_todo_size > cache_config_agg_write_backlog);
#ifdef CACHE_AGG_FAIL_RATE
    agg_error = agg_error || ((uint32_t)mutex->thread_holding->generator.random() < (uint32_t)(UINT_MAX * CACHE_AGG_FAIL_RATE));
#endif
  }
  if (agg_error) {
    CACHE_INCREMENT_DYN_STAT(cache_write_backlog_failure_stat);
    return ECACHE_WRITE_FAIL;
  }
  if (open_dir.open_write(cont, allow_if_writers, max_writers)) {
#ifdef CACHE_STAT_PAGES
    ink_assert(cont->mutex->thread_holding == this_ethread());
    ink_assert(!cont->stat_link.next && !cont->stat_link.prev);
    stat_cache_vcs.enqueue(cont, cont->stat_link);
#endif
    return 0;
  }
  return ECACHE_DOC_BUSY;
}

TS_INLINE int
Vol::close_write_lock(CacheVC *cont)
{
  EThread *t = cont->mutex->thread_holding;
  CACHE_TRY_LOCK(lock, mutex, t);
  if (!lock.is_locked()) {
    return -1;
  }
  return close_write(cont);
}

TS_INLINE int
Vol::open_write_lock(CacheVC *cont, int allow_if_writers, int max_writers)
{
  EThread *t = cont->mutex->thread_holding;
  CACHE_TRY_LOCK(lock, mutex, t);
  if (!lock.is_locked()) {
    return -1;
  }
  return open_write(cont, allow_if_writers, max_writers);
}

TS_INLINE OpenDirEntry *
Vol::open_read_lock(CryptoHash *key, EThread *t)
{
  CACHE_TRY_LOCK(lock, mutex, t);
  if (!lock.is_locked()) {
    return nullptr;
  }
  return open_dir.open_read(key);
}

TS_INLINE int
Vol::begin_read_lock(CacheVC *cont)
{
// no need for evacuation as the entire document is already in memory
#ifndef CACHE_STAT_PAGES
  if (cont->f.single_fragment) {
    return 0;
  }
#endif
  // VC is enqueued in stat_cache_vcs in the begin_read call
  EThread *t = cont->mutex->thread_holding;
  CACHE_TRY_LOCK(lock, mutex, t);
  if (!lock.is_locked()) {
    return -1;
  }
  return begin_read(cont);
}

TS_INLINE int
Vol::close_read_lock(CacheVC *cont)
{
  EThread *t = cont->mutex->thread_holding;
  CACHE_TRY_LOCK(lock, mutex, t);
  if (!lock.is_locked()) {
    return -1;
  }
  return close_read(cont);
}

TS_INLINE int
dir_delete_lock(CacheKey *key, Vol *d, ProxyMutex *m, Dir *del)
{
  EThread *thread = m->thread_holding;
  CACHE_TRY_LOCK(lock, d->mutex, thread);
  if (!lock.is_locked()) {
    return -1;
  }
  return dir_delete(key, d, del);
}

TS_INLINE int
dir_insert_lock(CacheKey *key, Vol *d, Dir *to_part, ProxyMutex *m)
{
  EThread *thread = m->thread_holding;
  CACHE_TRY_LOCK(lock, d->mutex, thread);
  if (!lock.is_locked()) {
    return -1;
  }
  return dir_insert(key, d, to_part);
}

TS_INLINE int
dir_overwrite_lock(CacheKey *key, Vol *d, Dir *to_part, ProxyMutex *m, Dir *overwrite, bool must_overwrite = true)
{
  EThread *thread = m->thread_holding;
  CACHE_TRY_LOCK(lock, d->mutex, thread);
  if (!lock.is_locked()) {
    return -1;
  }
  return dir_overwrite(key, d, to_part, overwrite, must_overwrite);
}

TS_INLINE void
rand_CacheKey(CacheKey *next_key, Ptr<ProxyMutex> &mutex)
{
  next_key->b[0] = mutex->thread_holding->generator.random();
  next_key->b[1] = mutex->thread_holding->generator.random();
}

extern uint8_t CacheKey_next_table[];
TS_INLINE void
next_CacheKey(CacheKey *next_key, CacheKey *key)
{
  uint8_t *b = (uint8_t *)next_key;
  uint8_t *k = (uint8_t *)key;
  b[0]       = CacheKey_next_table[k[0]];
  for (int i = 1; i < 16; i++) {
    b[i] = CacheKey_next_table[(b[i - 1] + k[i]) & 0xFF];
  }
}

extern uint8_t CacheKey_prev_table[];
TS_INLINE void
prev_CacheKey(CacheKey *prev_key, CacheKey *key)
{
  uint8_t *b = (uint8_t *)prev_key;
  uint8_t *k = (uint8_t *)key;
  for (int i = 15; i > 0; i--) {
    b[i] = 256 + CacheKey_prev_table[k[i]] - k[i - 1];
  }
  b[0] = CacheKey_prev_table[k[0]];
}
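
// Illustrative property (not part of the header): assuming CacheKey_next_table
// and CacheKey_prev_table are inverse permutations, prev_CacheKey() undoes
// next_CacheKey(), so a fragment key chain can be walked in either direction.
//
//   CacheKey k2, k1_again;
//   next_CacheKey(&k2, &k1);       // key of the following fragment
//   prev_CacheKey(&k1_again, &k2); // recovers k1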

TS_INLINE unsigned int
next_rand(unsigned int *p)
{
  unsigned int seed = *p;
  seed              = 1103515145 * seed + 12345;
  *p                = seed;
  return seed;
}
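
// next_rand() is a simple linear congruential generator stepped in place; the
// caller owns the state, so it needs no locking. Illustrative use (the seed
// variable is hypothetical):
//
//   unsigned int seed = 12345;
//   unsigned int r    = next_rand(&seed); // advances seed and returns it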

extern ClassAllocator<CacheRemoveCont> cacheRemoveContAllocator;

TS_INLINE CacheRemoveCont *
new_CacheRemoveCont()
{
  CacheRemoveCont *cache_rm = cacheRemoveContAllocator.alloc();

  cache_rm->mutex = new_ProxyMutex();
  SET_CONTINUATION_HANDLER(cache_rm, &CacheRemoveCont::event_handler);
  return cache_rm;
}

TS_INLINE void
free_CacheRemoveCont(CacheRemoveCont *cache_rm)
{
  cache_rm->mutex = nullptr;
  cacheRemoveContAllocator.free(cache_rm);
}

TS_INLINE int
CacheRemoveCont::event_handler(int event, void *data)
{
  (void)event;
  (void)data;
  free_CacheRemoveCont(this);
  return EVENT_DONE;
}

int64_t cache_bytes_used();
int64_t cache_bytes_total();

#ifdef DEBUG
#define CACHE_DEBUG_INCREMENT_DYN_STAT(_x) CACHE_INCREMENT_DYN_STAT(_x)
#define CACHE_DEBUG_SUM_DYN_STAT(_x, _y) CACHE_SUM_DYN_STAT(_x, _y)
#else
#define CACHE_DEBUG_INCREMENT_DYN_STAT(_x)
#define CACHE_DEBUG_SUM_DYN_STAT(_x, _y)
#endif

struct CacheHostRecord;
struct Vol;
class CacheHostTable;

struct Cache {
  int cache_read_done       = 0;
  int total_good_nvol       = 0;
  int total_nvol            = 0;
  int ready                 = CACHE_INITIALIZING;
  int64_t cache_size        = 0; // in store block size
  CacheHostTable *hosttable = nullptr;
  int total_initialized_vol = 0;
  CacheType scheme          = CACHE_NONE_TYPE;

  int open(bool reconfigure, bool fix);
  int close();

  Action *lookup(Continuation *cont, const CacheKey *key, CacheFragType type, const char *hostname, int host_len);
  inkcoreapi Action *open_read(Continuation *cont, const CacheKey *key, CacheFragType type, const char *hostname, int len);
  inkcoreapi Action *open_write(Continuation *cont, const CacheKey *key, CacheFragType frag_type, int options = 0,
                                time_t pin_in_cache = (time_t)0, const char *hostname = nullptr, int host_len = 0);
  inkcoreapi Action *remove(Continuation *cont, const CacheKey *key, CacheFragType type = CACHE_FRAG_TYPE_HTTP,
                            const char *hostname = nullptr, int host_len = 0);
  Action *scan(Continuation *cont, const char *hostname = nullptr, int host_len = 0, int KB_per_second = 2500);

  Action *open_read(Continuation *cont, const CacheKey *key, CacheHTTPHdr *request, const OverridableHttpConfigParams *params,
                    CacheFragType type, const char *hostname, int host_len);
  Action *open_write(Continuation *cont, const CacheKey *key, CacheHTTPInfo *old_info, time_t pin_in_cache = (time_t)0,
                     const CacheKey *key1 = nullptr, CacheFragType type = CACHE_FRAG_TYPE_HTTP, const char *hostname = nullptr,
                     int host_len = 0);
  static void generate_key(CryptoHash *hash, CacheURL *url);
  static void generate_key(HttpCacheKey *hash, CacheURL *url, cache_generation_t generation = -1);

  Action *link(Continuation *cont, const CacheKey *from, const CacheKey *to, CacheFragType type, const char *hostname,
               int host_len);
  Action *deref(Continuation *cont, const CacheKey *key, CacheFragType type, const char *hostname, int host_len);

  void vol_initialized(bool result);

  int open_done();

  Vol *key_to_vol(const CacheKey *key, const char *hostname, int host_len);

  Cache() {}
};

extern Cache *theCache;
inkcoreapi extern Cache *caches[NUM_CACHE_FRAG_TYPES];

TS_INLINE void
Cache::generate_key(CryptoHash *hash, CacheURL *url)
{
  url->hash_get(hash);
}

TS_INLINE void
Cache::generate_key(HttpCacheKey *key, CacheURL *url, cache_generation_t generation)
{
  key->hostname = url->host_get(&key->hostlen);
  url->hash_get(&key->hash, generation);
}

TS_INLINE unsigned int
cache_hash(const CryptoHash &hash)
{
  uint64_t f         = hash.fold();
  unsigned int mhash = (unsigned int)(f >> 32);
  return mhash;
}
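
// Illustrative: cache_hash() folds the 128-bit CryptoHash to 64 bits and keeps
// the upper 32, e.g. for bucketing (n_buckets is hypothetical):
//
//   unsigned int bucket = cache_hash(key) % n_buckets;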

LINK_DEFINITION(CacheVC, opendir_link)