1 /** @file
2 
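  Cache volume declarations: sizing macros, the volume header/footer record,
  evacuation bookkeeping types, the Vol continuation, and the Doc fragment header.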
4 
5   @section license License
6 
7   Licensed to the Apache Software Foundation (ASF) under one
8   or more contributor license agreements.  See the NOTICE file
9   distributed with this work for additional information
10   regarding copyright ownership.  The ASF licenses this file
11   to you under the Apache License, Version 2.0 (the
12   "License"); you may not use this file except in compliance
13   with the License.  You may obtain a copy of the License at
14 
15       http://www.apache.org/licenses/LICENSE-2.0
16 
17   Unless required by applicable law or agreed to in writing, software
18   distributed under the License is distributed on an "AS IS" BASIS,
19   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20   See the License for the specific language governing permissions and
21   limitations under the License.
22  */
23 
24 #pragma once
25 
26 #include <atomic>
27 
// Cache block geometry and rounding helpers.
// All ROUND_TO_* macros delegate to INK_ALIGN (defined elsewhere); every macro
// parameter is parenthesized so that compound argument expressions expand safely.
#define CACHE_BLOCK_SHIFT 9
#define CACHE_BLOCK_SIZE (1 << CACHE_BLOCK_SHIFT) // 512, smallest sector size
#define ROUND_TO_STORE_BLOCK(_x) INK_ALIGN((_x), STORE_BLOCK_SIZE)
#define ROUND_TO_CACHE_BLOCK(_x) INK_ALIGN((_x), CACHE_BLOCK_SIZE)
// _p is a pointer to an object exposing a `sector_size` member (e.g. a Vol).
#define ROUND_TO_SECTOR(_p, _x) INK_ALIGN((_x), (_p)->sector_size)
#define ROUND_TO(_x, _y) INK_ALIGN((_x), (_y))
34 
35 // Vol (volumes)
// Sizing and sentinel constants for volumes. Sizes are in bytes unless the
// name says BLOCKS, in which case they count CACHE_BLOCK_SIZE-byte blocks.
#define VOL_MAGIC 0xF1D0F00D
#define START_BLOCKS 16 // 8k, STORE_BLOCK_SIZE
#define START_POS ((off_t)START_BLOCKS * CACHE_BLOCK_SIZE)
#define AGG_SIZE (4 * 1024 * 1024)     // 4MB
#define AGG_HIGH_WATER (AGG_SIZE / 2)  // 2MB
#define EVACUATION_SIZE (2 * AGG_SIZE) // 8MB
#define MAX_VOL_SIZE ((off_t)512 * 1024 * 1024 * 1024 * 1024) // 512TB (512 * 2^40)
#define STORE_BLOCKS_PER_CACHE_BLOCK (STORE_BLOCK_SIZE / CACHE_BLOCK_SIZE)
#define MAX_VOL_BLOCKS (MAX_VOL_SIZE / CACHE_BLOCK_SIZE)
#define MAX_FRAG_SIZE (AGG_SIZE - sizeof(Doc)) // true max
#define LEAVE_FREE DEFAULT_MAX_BUFFER_SIZE
#define PIN_SCAN_EVERY 16 // scan every 1/16 of disk
#define VOL_HASH_TABLE_SIZE 32707
#define VOL_HASH_EMPTY 0xFFFF
#define VOL_HASH_ALLOC_SIZE (8 * 1024 * 1024) // one chance per this unit
#define LOOKASIDE_SIZE 256 // number of entries in Vol::lookaside
#define EVACUATION_BUCKET_SIZE (2 * EVACUATION_SIZE) // 16MB
#define RECOVERY_SIZE EVACUATION_SIZE                // 8MB
// Values compared against / stored into io.aiocb.aio_fildes by Vol to track AIO state.
#define AIO_NOT_IN_PROGRESS 0
#define AIO_AGG_WRITE_IN_PROGRESS -1
#define AUTO_SIZE_RAM_CACHE -1                               // 1-1 with directory size
#define DEFAULT_TARGET_FRAGMENT_SIZE (1048576 - sizeof(Doc)) // 1MB
// Evacuation bucket lookup helpers.
// A bucket covers EVACUATION_BUCKET_SIZE bytes, i.e.
// (EVACUATION_BUCKET_SIZE / CACHE_BLOCK_SIZE) cache blocks.
//
// dir_offset_evac_bucket(_o): bucket index for a directory offset counted in cache blocks.
// dir_evac_bucket(_e):        bucket index for the directory entry _e.
// offset_evac_bucket(_d, _o): bucket index for position _o, converted through
//                             _d->offset_to_vol_offset(); _d is a pointer.
// Arguments are parenthesized so compound expressions (a + b, p + 1) expand correctly.
#define dir_offset_evac_bucket(_o) ((_o) / (EVACUATION_BUCKET_SIZE / CACHE_BLOCK_SIZE))
#define dir_evac_bucket(_e) dir_offset_evac_bucket(dir_offset(_e))
#define offset_evac_bucket(_d, _o) \
  dir_offset_evac_bucket((_d)->offset_to_vol_offset(_o))
63 
64 // Documents
65 
// 32-bit marker values for Doc::magic (see the field comment in struct Doc).
#define DOC_MAGIC (static_cast<uint32_t>(0x5F129B13))
#define DOC_CORRUPT (static_cast<uint32_t>(0xDEADBABE))
#define DOC_NO_CHECKSUM (static_cast<uint32_t>(0xA0B0C0D0))
69 
// Forward declarations for types that this header only refers to by pointer
// or that are defined further down in this file.
struct Cache;
struct Vol;
struct CacheDisk;
struct VolInitInfo;
struct DiskVol;
struct CacheVol;
76 
// Header/footer record of a volume. Vol keeps one pointer to a header and one
// to a footer instance.
// NOTE(review): presumably this is the on-disk layout, so field order and
// sizes must not change — confirm before editing.
struct VolHeaderFooter {
  unsigned int magic; // presumably VOL_MAGIC when valid — confirm against the writer
  ts::VersionNumber version;
  time_t create_time;
  off_t write_pos;
  off_t last_write_pos;
  off_t agg_pos;
  uint32_t generation; // token generation (vary), this cannot be 0
  uint32_t phase;
  uint32_t cycle;
  uint32_t sync_serial;
  uint32_t write_serial;
  uint32_t dirty;
  uint32_t sector_size;
  uint32_t unused; // pad out to 8 byte boundary
  // Declared with one element, but Vol::headerlen() sizes the header as
  // sizeof(VolHeaderFooter) + sizeof(uint16_t) * (segments - 1), i.e. one
  // freelist entry per segment follows in memory.
  uint16_t freelist[1];
};
94 
95 // Key and Earliest key for each fragment that needs to be evacuated
// Singly linked node holding the key pair of one fragment to evacuate.
// EvacuationBlock embeds the first node (evac_frags); further nodes hang off `link`.
struct EvacuationKey {
  SLink<EvacuationKey> link; // next key in the chain, or nullptr
  CryptoHash key;
  CryptoHash earliest_key;
};
101 
// Bookkeeping for one block scheduled for evacuation. Instances are linked
// into DLL<EvacuationBlock> lists (see Vol::evacuate and Vol::lookaside).
struct EvacuationBlock {
  union {
    unsigned int init; // whole-word view; assigning 0 clears every flag in `f`
    struct {
      unsigned int done : 1;          // has been evacuated
      unsigned int pinned : 1;        // check pinning timeout
      // NOTE(review): the original comment here duplicated "check pinning timeout";
      // presumably this flag requests evacuation of the head fragment — confirm.
      unsigned int evacuate_head : 1;
      unsigned int unused : 29;
    } f;
  };

  int readers;
  Dir dir;
  Dir new_dir;
  // we need to have a list of evacuationkeys because of collision.
  EvacuationKey evac_frags;
  CacheVC *earliest_evacuator;
  LINK(EvacuationBlock, link);
};
121 
122 struct Vol : public Continuation {
123   char *path = nullptr;
124   ats_scoped_str hash_text;
125   CryptoHash hash_id;
126   int fd = -1;
127 
128   char *raw_dir           = nullptr;
129   Dir *dir                = nullptr;
130   VolHeaderFooter *header = nullptr;
131   VolHeaderFooter *footer = nullptr;
132   int segments            = 0;
133   off_t buckets           = 0;
134   off_t recover_pos       = 0;
135   off_t prev_recover_pos  = 0;
136   off_t scan_pos          = 0;
137   off_t skip              = 0; // start of headers
138   off_t start             = 0; // start of data
139   off_t len               = 0;
140   off_t data_blocks       = 0;
141   int hit_evacuate_window = 0;
142   AIOCallbackInternal io;
143 
144   Queue<CacheVC, Continuation::Link_link> agg;
145   Queue<CacheVC, Continuation::Link_link> stat_cache_vcs;
146   Queue<CacheVC, Continuation::Link_link> sync;
147   char *agg_buffer  = nullptr;
148   int agg_todo_size = 0;
149   int agg_buf_pos   = 0;
150 
151   Event *trigger = nullptr;
152 
153   OpenDir open_dir;
154   RamCache *ram_cache            = nullptr;
155   int evacuate_size              = 0;
156   DLL<EvacuationBlock> *evacuate = nullptr;
157   DLL<EvacuationBlock> lookaside[LOOKASIDE_SIZE];
158   CacheVC *doc_evacuator = nullptr;
159 
160   VolInitInfo *init_info = nullptr;
161 
162   CacheDisk *disk            = nullptr;
163   Cache *cache               = nullptr;
164   CacheVol *cache_vol        = nullptr;
165   uint32_t last_sync_serial  = 0;
166   uint32_t last_write_serial = 0;
167   uint32_t sector_size       = 0;
168   bool recover_wrapped       = false;
169   bool dir_sync_waiting      = false;
170   bool dir_sync_in_progress  = false;
171   bool writing_end_marker    = false;
172 
173   CacheKey first_fragment_key;
174   int64_t first_fragment_offset = 0;
175   Ptr<IOBufferData> first_fragment_data;
176 
177   void cancel_trigger();
178 
179   int recover_data();
180 
181   int open_write(CacheVC *cont, int allow_if_writers, int max_writers);
182   int open_write_lock(CacheVC *cont, int allow_if_writers, int max_writers);
183   int close_write(CacheVC *cont);
184   int close_write_lock(CacheVC *cont);
185   int begin_read(CacheVC *cont);
186   int begin_read_lock(CacheVC *cont);
187   // unused read-write interlock code
188   // currently http handles a write-lock failure by retrying the read
189   OpenDirEntry *open_read(const CryptoHash *key);
190   OpenDirEntry *open_read_lock(CryptoHash *key, EThread *t);
191   int close_read(CacheVC *cont);
192   int close_read_lock(CacheVC *cont);
193 
194   int clear_dir();
195 
196   int init(char *s, off_t blocks, off_t dir_skip, bool clear);
197 
198   int handle_dir_clear(int event, void *data);
199   int handle_dir_read(int event, void *data);
200   int handle_recover_from_data(int event, void *data);
201   int handle_recover_write_dir(int event, void *data);
202   int handle_header_read(int event, void *data);
203 
204   int dir_init_done(int event, void *data);
205 
206   int dir_check(bool fix);
207   int db_check(bool fix);
208 
209   int
is_io_in_progressVol210   is_io_in_progress()
211   {
212     return io.aiocb.aio_fildes != AIO_NOT_IN_PROGRESS;
213   }
214   int
increment_generationVol215   increment_generation()
216   {
217     // this is stored in the offset field of the directory (!=0)
218     ink_assert(mutex->thread_holding == this_ethread());
219     header->generation++;
220     if (!header->generation)
221       header->generation++;
222     return header->generation;
223   }
224   void
set_io_not_in_progressVol225   set_io_not_in_progress()
226   {
227     io.aiocb.aio_fildes = AIO_NOT_IN_PROGRESS;
228   }
229 
230   int aggWriteDone(int event, Event *e);
231   int aggWrite(int event, void *e);
232   void agg_wrap();
233 
234   int evacuateWrite(CacheVC *evacuator, int event, Event *e);
235   int evacuateDocReadDone(int event, Event *e);
236   int evacuateDoc(int event, Event *e);
237 
238   int evac_range(off_t start, off_t end, int evac_phase);
239   void periodic_scan();
240   void scan_for_pinned_documents();
241   void evacuate_cleanup_blocks(int i);
242   void evacuate_cleanup();
243   EvacuationBlock *force_evacuate_head(Dir *dir, int pinned);
244   int within_hit_evacuate_window(Dir *dir);
245   uint32_t round_to_approx_size(uint32_t l);
246 
247   // inline functions
248   int headerlen();         // calculates the total length of the vol header and the freelist
249   int direntries();        // total number of dir entries
250   Dir *dir_segment(int s); // returns the first dir in the segment s
251   size_t dirlen();         // calculates the total length of header, directories and footer
252   int vol_out_of_phase_valid(Dir *e);
253 
254   int vol_out_of_phase_agg_valid(Dir *e);
255   int vol_out_of_phase_write_valid(Dir *e);
256   int vol_in_phase_valid(Dir *e);
257   int vol_in_phase_agg_buf_valid(Dir *e);
258 
259   off_t vol_offset(Dir *e);
260   off_t offset_to_vol_offset(off_t pos);
261   off_t vol_offset_to_offset(off_t pos);
262   off_t vol_relative_length(off_t start_offset);
263 
VolVol264   Vol() : Continuation(new_ProxyMutex())
265   {
266     open_dir.mutex = mutex;
267     agg_buffer     = (char *)ats_memalign(ats_pagesize(), AGG_SIZE);
268     memset(agg_buffer, 0, AGG_SIZE);
269     SET_HANDLER(&Vol::aggWrite);
270   }
271 
~VolVol272   ~Vol() override { ats_free(agg_buffer); }
273 };
274 
275 struct AIO_Callback_handler : public Continuation {
276   int handle_disk_failure(int event, void *data);
277 
AIO_Callback_handlerAIO_Callback_handler278   AIO_Callback_handler() : Continuation(new_ProxyMutex()) { SET_HANDLER(&AIO_Callback_handler::handle_disk_failure); }
279 };
280 
// Describes one cache volume: its number, scheme, size and the arrays of
// Vol / DiskVol pointers associated with it. Instances can be chained through `link`.
struct CacheVol {
  int vol_number        = -1; // -1 until assigned
  int scheme            = 0;
  off_t size            = 0;
  int num_vols          = 0; // presumably the number of entries in `vols` — confirm
  bool ramcache_enabled = true;
  Vol **vols            = nullptr;
  DiskVol **disk_vols   = nullptr;
  LINK(CacheVol, link);
  // per volume stats
  RecRawStatBlock *vol_rsb = nullptr;

  CacheVol() {}
};
295 
296 // Note : hdr() needs to be 8 byte aligned.
// Fixed-size header placed in front of every stored fragment. The fragment's
// variable-length header (hlen bytes) starts right after this struct, and the
// payload follows it (see hdr() and data()).
// NOTE(review): presumably an on-disk layout — do not reorder fields without confirming.
struct Doc {
  uint32_t magic;     // DOC_MAGIC
  uint32_t len;       // length of this fragment (including hlen & sizeof(Doc), unrounded)
  uint64_t total_len; // total length of document
#if TS_ENABLE_FIPS == 1
  // For FIPS CryptoHash is 256 bits vs. 128, and the 'first_key' must be checked first, so
  // ensure that the new 'first_key' overlaps the old 'first_key' and that the rest of the data layout
  // is the same by putting 'key' at the end.
  CryptoHash first_key; ///< first key in object.
#else
  CryptoHash first_key; ///< first key in object.
  CryptoHash key;       ///< Key for this doc.
#endif
  uint32_t hlen;         ///< Length of this header.
  uint32_t doc_type : 8; ///< Doc type - indicates the format of this structure and its content.
  uint32_t v_major : 8;  ///< Major version number.
  uint32_t v_minor : 8;  ///< Minor version number.
  uint32_t unused : 8;   ///< Unused, forced to zero.
  uint32_t sync_serial;
  uint32_t write_serial;
  uint32_t pinned; // pinned until
  uint32_t checksum;
#if TS_ENABLE_FIPS == 1
  CryptoHash key; ///< Key for this doc.
#endif

  uint32_t data_len();      // len minus sizeof(Doc) and hlen
  uint32_t prefix_len();    // sizeof(Doc) plus hlen
  int single_fragment();    // nonzero when data_len() equals total_len
  int no_data_in_fragment();
  char *hdr();              // first byte after this struct
  char *data();             // hdr() advanced by hlen bytes
};
330 
331 // Global Data
332 
// Process-wide volume table and allocators; all are defined in another translation unit.
extern Vol **gvol;
extern std::atomic<int> gnvol; // presumably the number of entries in gvol — confirm
extern ClassAllocator<OpenDirEntry> openDirEntryAllocator;
extern ClassAllocator<EvacuationBlock> evacuationBlockAllocator;
extern ClassAllocator<EvacuationKey> evacuationKeyAllocator;
extern unsigned short *vol_hash_table;
339 
340 // inline Functions
341 
342 TS_INLINE int
headerlen()343 Vol::headerlen()
344 {
345   return ROUND_TO_STORE_BLOCK(sizeof(VolHeaderFooter) + sizeof(uint16_t) * (this->segments - 1));
346 }
347 
348 TS_INLINE Dir *
dir_segment(int s)349 Vol::dir_segment(int s)
350 {
351   return (Dir *)(((char *)this->dir) + (s * this->buckets) * DIR_DEPTH * SIZEOF_DIR);
352 }
353 
354 TS_INLINE size_t
dirlen()355 Vol::dirlen()
356 {
357   return this->headerlen() + ROUND_TO_STORE_BLOCK(((size_t)this->buckets) * DIR_DEPTH * this->segments * SIZEOF_DIR) +
358          ROUND_TO_STORE_BLOCK(sizeof(VolHeaderFooter));
359 }
360 
361 TS_INLINE int
direntries()362 Vol::direntries()
363 {
364   return this->buckets * DIR_DEPTH * this->segments;
365 }
366 
367 TS_INLINE int
vol_out_of_phase_valid(Dir * e)368 Vol::vol_out_of_phase_valid(Dir *e)
369 {
370   return (dir_offset(e) - 1 >= ((this->header->agg_pos - this->start) / CACHE_BLOCK_SIZE));
371 }
372 
373 TS_INLINE int
vol_out_of_phase_agg_valid(Dir * e)374 Vol::vol_out_of_phase_agg_valid(Dir *e)
375 {
376   return (dir_offset(e) - 1 >= ((this->header->agg_pos - this->start + AGG_SIZE) / CACHE_BLOCK_SIZE));
377 }
378 
379 TS_INLINE int
vol_out_of_phase_write_valid(Dir * e)380 Vol::vol_out_of_phase_write_valid(Dir *e)
381 {
382   return (dir_offset(e) - 1 >= ((this->header->write_pos - this->start) / CACHE_BLOCK_SIZE));
383 }
384 
385 TS_INLINE int
vol_in_phase_valid(Dir * e)386 Vol::vol_in_phase_valid(Dir *e)
387 {
388   return (dir_offset(e) - 1 < ((this->header->write_pos + this->agg_buf_pos - this->start) / CACHE_BLOCK_SIZE));
389 }
390 
391 TS_INLINE off_t
vol_offset(Dir * e)392 Vol::vol_offset(Dir *e)
393 {
394   return this->start + (off_t)dir_offset(e) * CACHE_BLOCK_SIZE - CACHE_BLOCK_SIZE;
395 }
396 
397 TS_INLINE off_t
offset_to_vol_offset(off_t pos)398 Vol::offset_to_vol_offset(off_t pos)
399 {
400   return ((pos - this->start + CACHE_BLOCK_SIZE) / CACHE_BLOCK_SIZE);
401 }
402 
403 TS_INLINE off_t
vol_offset_to_offset(off_t pos)404 Vol::vol_offset_to_offset(off_t pos)
405 {
406   return this->start + pos * CACHE_BLOCK_SIZE - CACHE_BLOCK_SIZE;
407 }
408 
409 TS_INLINE int
vol_in_phase_agg_buf_valid(Dir * e)410 Vol::vol_in_phase_agg_buf_valid(Dir *e)
411 {
412   return (this->vol_offset(e) >= this->header->write_pos && this->vol_offset(e) < (this->header->write_pos + this->agg_buf_pos));
413 }
414 // length of the partition not including the offset of location 0.
415 TS_INLINE off_t
vol_relative_length(off_t start_offset)416 Vol::vol_relative_length(off_t start_offset)
417 {
418   return (this->len + this->skip) - start_offset;
419 }
420 
421 TS_INLINE uint32_t
prefix_len()422 Doc::prefix_len()
423 {
424   return sizeof(Doc) + hlen;
425 }
426 
427 TS_INLINE uint32_t
data_len()428 Doc::data_len()
429 {
430   return len - sizeof(Doc) - hlen;
431 }
432 
433 TS_INLINE int
single_fragment()434 Doc::single_fragment()
435 {
436   return data_len() == total_len;
437 }
438 
439 TS_INLINE char *
hdr()440 Doc::hdr()
441 {
442   return reinterpret_cast<char *>(this) + sizeof(Doc);
443 }
444 
445 TS_INLINE char *
data()446 Doc::data()
447 {
448   return this->hdr() + hlen;
449 }
450 
// Free-function entry points taking the volume explicitly; defined elsewhere.
// NOTE(review): the parameter list of vol_init parallels Vol::init(s, blocks, dir_skip, clear)
// — presumably a thin wrapper; confirm in the implementation file.
int vol_dir_clear(Vol *d);
int vol_init(Vol *d, char *s, off_t blocks, off_t skip, bool clear);
453 
454 // inline Functions
455 
456 TS_INLINE EvacuationBlock *
evacuation_block_exists(Dir * dir,Vol * p)457 evacuation_block_exists(Dir *dir, Vol *p)
458 {
459   EvacuationBlock *b = p->evacuate[dir_evac_bucket(dir)].head;
460   for (; b; b = b->link.next)
461     if (dir_offset(&b->dir) == dir_offset(dir))
462       return b;
463   return nullptr;
464 }
465 
466 TS_INLINE void
cancel_trigger()467 Vol::cancel_trigger()
468 {
469   if (trigger) {
470     trigger->cancel_action();
471     trigger = nullptr;
472   }
473 }
474 
475 TS_INLINE EvacuationBlock *
new_EvacuationBlock(EThread * t)476 new_EvacuationBlock(EThread *t)
477 {
478   EvacuationBlock *b      = THREAD_ALLOC(evacuationBlockAllocator, t);
479   b->init                 = 0;
480   b->readers              = 0;
481   b->earliest_evacuator   = nullptr;
482   b->evac_frags.link.next = nullptr;
483   return b;
484 }
485 
486 TS_INLINE void
free_EvacuationBlock(EvacuationBlock * b,EThread * t)487 free_EvacuationBlock(EvacuationBlock *b, EThread *t)
488 {
489   EvacuationKey *e = b->evac_frags.link.next;
490   while (e) {
491     EvacuationKey *n = e->link.next;
492     evacuationKeyAllocator.free(e);
493     e = n;
494   }
495   THREAD_FREE(b, evacuationBlockAllocator, t);
496 }
497 
498 TS_INLINE OpenDirEntry *
open_read(const CryptoHash * key)499 Vol::open_read(const CryptoHash *key)
500 {
501   return open_dir.open_read(key);
502 }
503 
504 TS_INLINE int
within_hit_evacuate_window(Dir * xdir)505 Vol::within_hit_evacuate_window(Dir *xdir)
506 {
507   off_t oft       = dir_offset(xdir) - 1;
508   off_t write_off = (header->write_pos + AGG_SIZE - start) / CACHE_BLOCK_SIZE;
509   off_t delta     = oft - write_off;
510   if (delta >= 0)
511     return delta < hit_evacuate_window;
512   else
513     return -delta > (data_blocks - hit_evacuate_window) && -delta < data_blocks;
514 }
515 
516 TS_INLINE uint32_t
round_to_approx_size(uint32_t l)517 Vol::round_to_approx_size(uint32_t l)
518 {
519   uint32_t ll = round_to_approx_dir_size(l);
520   return ROUND_TO_SECTOR(this, ll);
521 }
522