/** @file

  Cache volume (Vol) definitions: sizing constants, the on-disk volume header/footer,
  the document (Doc) header, evacuation bookkeeping structures and the inline helpers
  that operate on them.

  @section license License

  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements.  See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership.  The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
 */
23
24 #pragma once
25
26 #include <atomic>
27
// Cache block geometry: a cache block is (1 << CACHE_BLOCK_SHIFT) bytes.
#define CACHE_BLOCK_SHIFT 9
#define CACHE_BLOCK_SIZE (1 << CACHE_BLOCK_SHIFT) // 512, smallest sector size

// Rounding helpers. Each one forwards to INK_ALIGN(value, boundary), which is
// defined outside this header.
#define ROUND_TO_STORE_BLOCK(_x) INK_ALIGN((_x), STORE_BLOCK_SIZE)
#define ROUND_TO_CACHE_BLOCK(_x) INK_ALIGN((_x), CACHE_BLOCK_SIZE)
// _p is any pointer expression to an object with a `sector_size` member; it is
// parenthesized so that arguments such as `p + 1` expand correctly.
#define ROUND_TO_SECTOR(_p, _x) INK_ALIGN((_x), (_p)->sector_size)
#define ROUND_TO(_x, _y) INK_ALIGN((_x), (_y))
34
// Vol (volumes)
// Sizing and tuning constants for a cache volume. Sizes are in bytes unless a
// name says BLOCKS, in which case the unit is CACHE_BLOCK_SIZE.
#define VOL_MAGIC 0xF1D0F00D
#define START_BLOCKS 16 // 8k, STORE_BLOCK_SIZE
#define START_POS ((off_t)START_BLOCKS * CACHE_BLOCK_SIZE) // START_BLOCKS expressed in bytes
#define AGG_SIZE (4 * 1024 * 1024)     // 4MB
#define AGG_HIGH_WATER (AGG_SIZE / 2)  // 2MB
#define EVACUATION_SIZE (2 * AGG_SIZE) // 8MB
#define MAX_VOL_SIZE ((off_t)512 * 1024 * 1024 * 1024 * 1024) // 512 * 2^40 bytes
#define STORE_BLOCKS_PER_CACHE_BLOCK (STORE_BLOCK_SIZE / CACHE_BLOCK_SIZE)
#define MAX_VOL_BLOCKS (MAX_VOL_SIZE / CACHE_BLOCK_SIZE) // MAX_VOL_SIZE in cache blocks
#define MAX_FRAG_SIZE (AGG_SIZE - sizeof(Doc)) // true max
#define LEAVE_FREE DEFAULT_MAX_BUFFER_SIZE
#define PIN_SCAN_EVERY 16 // scan every 1/16 of disk
#define VOL_HASH_TABLE_SIZE 32707
#define VOL_HASH_EMPTY 0xFFFF
#define VOL_HASH_ALLOC_SIZE (8 * 1024 * 1024) // one chance per this unit
#define LOOKASIDE_SIZE 256                    // number of entries in Vol::lookaside
#define EVACUATION_BUCKET_SIZE (2 * EVACUATION_SIZE) // 16MB
#define RECOVERY_SIZE EVACUATION_SIZE                // 8MB
// Vol::is_io_in_progress() compares io.aiocb.aio_fildes against AIO_NOT_IN_PROGRESS.
#define AIO_NOT_IN_PROGRESS 0
// NOTE(review): presumably also stored in io.aiocb.aio_fildes while an aggregation
// write is outstanding -- confirm against the users of this constant.
#define AIO_AGG_WRITE_IN_PROGRESS -1
#define AUTO_SIZE_RAM_CACHE -1                               // 1-1 with directory size
#define DEFAULT_TARGET_FRAGMENT_SIZE (1048576 - sizeof(Doc)) // 1MB
58
// Evacuation bucket helpers. One bucket covers EVACUATION_BUCKET_SIZE bytes, i.e.
// (EVACUATION_BUCKET_SIZE / CACHE_BLOCK_SIZE) cache blocks of directory offset.
//
//   dir_offset_evac_bucket(_o) : bucket index for a directory offset (in cache blocks).
//   dir_evac_bucket(_e)        : bucket index for the directory entry _e.
//   offset_evac_bucket(_d, _o) : bucket index for byte offset _o, converted through
//                                _d->offset_to_vol_offset().
//
// Arguments are parenthesized so compound expressions expand with the intended
// precedence, and offset_evac_bucket now has balanced parentheses.
#define dir_offset_evac_bucket(_o) ((_o) / (EVACUATION_BUCKET_SIZE / CACHE_BLOCK_SIZE))
#define dir_evac_bucket(_e) dir_offset_evac_bucket(dir_offset(_e))
#define offset_evac_bucket(_d, _o) dir_offset_evac_bucket((_d)->offset_to_vol_offset(_o))
63
// Documents

// 32-bit marker values for document headers. Doc::magic is annotated as holding
// DOC_MAGIC.
// NOTE(review): DOC_CORRUPT and DOC_NO_CHECKSUM are presumably stored in Doc::magic
// and Doc::checksum respectively -- confirm against their users.
#define DOC_MAGIC ((uint32_t)0x5F129B13)
#define DOC_CORRUPT ((uint32_t)0xDEADBABE)
#define DOC_NO_CHECKSUM ((uint32_t)0xA0B0C0D0)
69
// Forward declarations. Vol and CacheVol are defined later in this header; the
// remaining types are only used through pointers here.
struct Cache;
struct Vol;
struct CacheDisk;
struct VolInitInfo;
struct DiskVol;
struct CacheVol;
76
/// Header/footer record of a volume (Vol::header and Vol::footer point at instances).
///
/// The record ends with a variable-length freelist: the struct declares one slot and
/// Vol::headerlen() accounts for (segments - 1) additional uint16_t slots.
struct VolHeaderFooter {
  unsigned int magic; // NOTE(review): presumably VOL_MAGIC for a valid record -- confirm
  ts::VersionNumber version;
  time_t create_time;
  off_t write_pos;
  off_t last_write_pos;
  off_t agg_pos;
  uint32_t generation; // token generation (vary), this cannot be 0
  uint32_t phase;
  uint32_t cycle;
  uint32_t sync_serial;
  uint32_t write_serial;
  uint32_t dirty;
  uint32_t sector_size;
  uint32_t unused; // pad out to 8 byte boundary
  uint16_t freelist[1]; // first freelist slot; further slots follow the struct in memory
};
94
// Key and Earliest key for each fragment that needs to be evacuated
// Instances form a singly linked list through `link` (see EvacuationBlock::evac_frags).
struct EvacuationKey {
  SLink<EvacuationKey> link; // next key in the list
  CryptoHash key;
  CryptoHash earliest_key;
};
101
/// Bookkeeping for one directory entry that is scheduled for evacuation.
struct EvacuationBlock {
  // Flag word. `init` overlays the bitfield so all flags can be cleared with a single
  // assignment (new_EvacuationBlock() sets init = 0).
  union {
    unsigned int init;
    struct {
      unsigned int done : 1;          // has been evacuated
      unsigned int pinned : 1;        // check pinning timeout
      unsigned int evacuate_head : 1; // NOTE(review): presumably "evacuate the head fragment" (cf. Vol::force_evacuate_head) -- confirm
      unsigned int unused : 29;
    } f;
  };

  int readers;
  Dir dir;
  Dir new_dir;
  // we need to have a list of evacuationkeys because of collision.
  // The first key is embedded; further keys hang off evac_frags.link.next and are
  // released by free_EvacuationBlock().
  EvacuationKey evac_frags;
  CacheVC *earliest_evacuator;
  LINK(EvacuationBlock, link);
};
121
/// A cache volume: directory, aggregation write buffer, evacuation state and the
/// continuation handlers that drive them.
///
/// The constructor creates a fresh ProxyMutex for the continuation, shares it with
/// `open_dir`, allocates and zeroes the AGG_SIZE aggregation buffer and installs
/// aggWrite() as the handler. The destructor releases the aggregation buffer.
struct Vol : public Continuation {
  char *path = nullptr;
  ats_scoped_str hash_text;
  CryptoHash hash_id;
  int fd = -1;

  // Directory storage and the header/footer records.
  char *raw_dir = nullptr;
  Dir *dir = nullptr;
  VolHeaderFooter *header = nullptr;
  VolHeaderFooter *footer = nullptr;
  int segments = 0;
  off_t buckets = 0;
  off_t recover_pos = 0;
  off_t prev_recover_pos = 0;
  off_t scan_pos = 0;
  off_t skip = 0;  // start of headers
  off_t start = 0; // start of data
  off_t len = 0;
  off_t data_blocks = 0;
  int hit_evacuate_window = 0;
  AIOCallbackInternal io; // io.aiocb.aio_fildes doubles as the "I/O in progress" marker

  // Aggregation (write) state.
  Queue<CacheVC, Continuation::Link_link> agg;
  Queue<CacheVC, Continuation::Link_link> stat_cache_vcs;
  Queue<CacheVC, Continuation::Link_link> sync;
  char *agg_buffer = nullptr; // AGG_SIZE bytes, allocated in the constructor
  int agg_todo_size = 0;
  int agg_buf_pos = 0;

  Event *trigger = nullptr; // cancelled and cleared by cancel_trigger()

  OpenDir open_dir;
  RamCache *ram_cache = nullptr;
  int evacuate_size = 0;
  DLL<EvacuationBlock> *evacuate = nullptr; // indexed by dir_evac_bucket()
  DLL<EvacuationBlock> lookaside[LOOKASIDE_SIZE];
  CacheVC *doc_evacuator = nullptr;

  VolInitInfo *init_info = nullptr;

  CacheDisk *disk = nullptr;
  Cache *cache = nullptr;
  CacheVol *cache_vol = nullptr;
  uint32_t last_sync_serial = 0;
  uint32_t last_write_serial = 0;
  uint32_t sector_size = 0; // rounding unit used by ROUND_TO_SECTOR
  bool recover_wrapped = false;
  bool dir_sync_waiting = false;
  bool dir_sync_in_progress = false;
  bool writing_end_marker = false;

  CacheKey first_fragment_key;
  int64_t first_fragment_offset = 0;
  Ptr<IOBufferData> first_fragment_data;

  void cancel_trigger();

  int recover_data();

  int open_write(CacheVC *cont, int allow_if_writers, int max_writers);
  int open_write_lock(CacheVC *cont, int allow_if_writers, int max_writers);
  int close_write(CacheVC *cont);
  int close_write_lock(CacheVC *cont);
  int begin_read(CacheVC *cont);
  int begin_read_lock(CacheVC *cont);
  // unused read-write interlock code
  // currently http handles a write-lock failure by retrying the read
  OpenDirEntry *open_read(const CryptoHash *key);
  OpenDirEntry *open_read_lock(CryptoHash *key, EThread *t);
  int close_read(CacheVC *cont);
  int close_read_lock(CacheVC *cont);

  int clear_dir();

  int init(char *s, off_t blocks, off_t dir_skip, bool clear);

  // Continuation handlers used during initialization and recovery.
  int handle_dir_clear(int event, void *data);
  int handle_dir_read(int event, void *data);
  int handle_recover_from_data(int event, void *data);
  int handle_recover_write_dir(int event, void *data);
  int handle_header_read(int event, void *data);

  int dir_init_done(int event, void *data);

  int dir_check(bool fix);
  int db_check(bool fix);

  /// @return non-zero when io.aiocb.aio_fildes is not AIO_NOT_IN_PROGRESS.
  int
  is_io_in_progress()
  {
    return io.aiocb.aio_fildes != AIO_NOT_IN_PROGRESS;
  }
  /// Advance header->generation, skipping 0 when the counter wraps.
  /// Asserts that the calling thread holds this continuation's mutex.
  /// @return the new generation value.
  int
  increment_generation()
  {
    // this is stored in the offset field of the directory (!=0)
    ink_assert(mutex->thread_holding == this_ethread());
    header->generation++;
    if (!header->generation)
      header->generation++;
    return header->generation;
  }
  /// Reset the in-progress marker so is_io_in_progress() reports 0.
  void
  set_io_not_in_progress()
  {
    io.aiocb.aio_fildes = AIO_NOT_IN_PROGRESS;
  }

  int aggWriteDone(int event, Event *e);
  int aggWrite(int event, void *e);
  void agg_wrap();

  int evacuateWrite(CacheVC *evacuator, int event, Event *e);
  int evacuateDocReadDone(int event, Event *e);
  int evacuateDoc(int event, Event *e);

  int evac_range(off_t start, off_t end, int evac_phase);
  void periodic_scan();
  void scan_for_pinned_documents();
  void evacuate_cleanup_blocks(int i);
  void evacuate_cleanup();
  EvacuationBlock *force_evacuate_head(Dir *dir, int pinned);
  int within_hit_evacuate_window(Dir *dir);
  uint32_t round_to_approx_size(uint32_t l);

  // inline functions
  int headerlen();         // calculates the total length of the vol header and the freelist
  int direntries();        // total number of dir entries
  Dir *dir_segment(int s); // returns the first dir in the segment s
  size_t dirlen();         // calculates the total length of header, directories and footer
  int vol_out_of_phase_valid(Dir *e);

  int vol_out_of_phase_agg_valid(Dir *e);
  int vol_out_of_phase_write_valid(Dir *e);
  int vol_in_phase_valid(Dir *e);
  int vol_in_phase_agg_buf_valid(Dir *e);

  // Conversions between byte positions and directory offsets (cache blocks, 1-based).
  off_t vol_offset(Dir *e);
  off_t offset_to_vol_offset(off_t pos);
  off_t vol_offset_to_offset(off_t pos);
  off_t vol_relative_length(off_t start_offset);

  Vol() : Continuation(new_ProxyMutex())
  {
    open_dir.mutex = mutex; // open_dir shares this continuation's mutex
    agg_buffer = (char *)ats_memalign(ats_pagesize(), AGG_SIZE);
    memset(agg_buffer, 0, AGG_SIZE);
    SET_HANDLER(&Vol::aggWrite);
  }

  ~Vol() override { ats_free(agg_buffer); }
};
274
275 struct AIO_Callback_handler : public Continuation {
276 int handle_disk_failure(int event, void *data);
277
AIO_Callback_handlerAIO_Callback_handler278 AIO_Callback_handler() : Continuation(new_ProxyMutex()) { SET_HANDLER(&AIO_Callback_handler::handle_disk_failure); }
279 };
280
/// Description of one cache volume: its number, scheme, size and the Vol / DiskVol
/// objects that belong to it.
struct CacheVol {
  int vol_number = -1;
  int scheme = 0;
  off_t size = 0;
  int num_vols = 0; // NOTE(review): presumably the number of entries in `vols` -- confirm
  bool ramcache_enabled = true;
  Vol **vols = nullptr;
  DiskVol **disk_vols = nullptr;
  LINK(CacheVol, link);
  // per volume stats
  RecRawStatBlock *vol_rsb = nullptr;

  CacheVol() {}
};
295
// Note : hdr() needs to be 8 byte aligned.
/// Fixed-size header that precedes every fragment. It is followed in memory by
/// `hlen` bytes of header data (hdr()) and then the fragment payload (data()).
struct Doc {
  uint32_t magic;     // DOC_MAGIC
  uint32_t len;       // length of this fragment (including hlen & sizeof(Doc), unrounded)
  uint64_t total_len; // total length of document
#if TS_ENABLE_FIPS == 1
  // For FIPS CryptoHash is 256 bits vs. 128, and the 'first_key' must be checked first, so
  // ensure that the new 'first_key' overlaps the old 'first_key' and that the rest of the data layout
  // is the same by putting 'key' at the end.
  CryptoHash first_key; ///< first key in object.
#else
  CryptoHash first_key; ///< first key in object.
  CryptoHash key;       ///< Key for this doc.
#endif
  uint32_t hlen;         ///< Length of this header.
  uint32_t doc_type : 8; ///< Doc type - indicates the format of this structure and its content.
  uint32_t v_major : 8;  ///< Major version number.
  uint32_t v_minor : 8;  ///< Minor version number.
  uint32_t unused : 8;   ///< Unused, forced to zero.
  uint32_t sync_serial;
  uint32_t write_serial;
  uint32_t pinned; // pinned until
  uint32_t checksum;
#if TS_ENABLE_FIPS == 1
  CryptoHash key; ///< Key for this doc.
#endif

  uint32_t data_len();   // len minus sizeof(Doc) and hlen
  uint32_t prefix_len(); // sizeof(Doc) plus hlen
  int single_fragment(); // non-zero when data_len() equals total_len
  int no_data_in_fragment();
  char *hdr();  // first byte after this struct
  char *data(); // hdr() advanced by hlen
};
330
// Global Data

// Declarations only; the objects are defined in another translation unit.
extern Vol **gvol;
extern std::atomic<int> gnvol; // NOTE(review): presumably the number of entries in gvol -- confirm
extern ClassAllocator<OpenDirEntry> openDirEntryAllocator;
extern ClassAllocator<EvacuationBlock> evacuationBlockAllocator; // used by new_/free_EvacuationBlock()
extern ClassAllocator<EvacuationKey> evacuationKeyAllocator;     // used by free_EvacuationBlock()
extern unsigned short *vol_hash_table;
339
340 // inline Functions
341
342 TS_INLINE int
headerlen()343 Vol::headerlen()
344 {
345 return ROUND_TO_STORE_BLOCK(sizeof(VolHeaderFooter) + sizeof(uint16_t) * (this->segments - 1));
346 }
347
348 TS_INLINE Dir *
dir_segment(int s)349 Vol::dir_segment(int s)
350 {
351 return (Dir *)(((char *)this->dir) + (s * this->buckets) * DIR_DEPTH * SIZEOF_DIR);
352 }
353
354 TS_INLINE size_t
dirlen()355 Vol::dirlen()
356 {
357 return this->headerlen() + ROUND_TO_STORE_BLOCK(((size_t)this->buckets) * DIR_DEPTH * this->segments * SIZEOF_DIR) +
358 ROUND_TO_STORE_BLOCK(sizeof(VolHeaderFooter));
359 }
360
361 TS_INLINE int
direntries()362 Vol::direntries()
363 {
364 return this->buckets * DIR_DEPTH * this->segments;
365 }
366
367 TS_INLINE int
vol_out_of_phase_valid(Dir * e)368 Vol::vol_out_of_phase_valid(Dir *e)
369 {
370 return (dir_offset(e) - 1 >= ((this->header->agg_pos - this->start) / CACHE_BLOCK_SIZE));
371 }
372
373 TS_INLINE int
vol_out_of_phase_agg_valid(Dir * e)374 Vol::vol_out_of_phase_agg_valid(Dir *e)
375 {
376 return (dir_offset(e) - 1 >= ((this->header->agg_pos - this->start + AGG_SIZE) / CACHE_BLOCK_SIZE));
377 }
378
379 TS_INLINE int
vol_out_of_phase_write_valid(Dir * e)380 Vol::vol_out_of_phase_write_valid(Dir *e)
381 {
382 return (dir_offset(e) - 1 >= ((this->header->write_pos - this->start) / CACHE_BLOCK_SIZE));
383 }
384
385 TS_INLINE int
vol_in_phase_valid(Dir * e)386 Vol::vol_in_phase_valid(Dir *e)
387 {
388 return (dir_offset(e) - 1 < ((this->header->write_pos + this->agg_buf_pos - this->start) / CACHE_BLOCK_SIZE));
389 }
390
391 TS_INLINE off_t
vol_offset(Dir * e)392 Vol::vol_offset(Dir *e)
393 {
394 return this->start + (off_t)dir_offset(e) * CACHE_BLOCK_SIZE - CACHE_BLOCK_SIZE;
395 }
396
397 TS_INLINE off_t
offset_to_vol_offset(off_t pos)398 Vol::offset_to_vol_offset(off_t pos)
399 {
400 return ((pos - this->start + CACHE_BLOCK_SIZE) / CACHE_BLOCK_SIZE);
401 }
402
403 TS_INLINE off_t
vol_offset_to_offset(off_t pos)404 Vol::vol_offset_to_offset(off_t pos)
405 {
406 return this->start + pos * CACHE_BLOCK_SIZE - CACHE_BLOCK_SIZE;
407 }
408
409 TS_INLINE int
vol_in_phase_agg_buf_valid(Dir * e)410 Vol::vol_in_phase_agg_buf_valid(Dir *e)
411 {
412 return (this->vol_offset(e) >= this->header->write_pos && this->vol_offset(e) < (this->header->write_pos + this->agg_buf_pos));
413 }
414 // length of the partition not including the offset of location 0.
415 TS_INLINE off_t
vol_relative_length(off_t start_offset)416 Vol::vol_relative_length(off_t start_offset)
417 {
418 return (this->len + this->skip) - start_offset;
419 }
420
421 TS_INLINE uint32_t
prefix_len()422 Doc::prefix_len()
423 {
424 return sizeof(Doc) + hlen;
425 }
426
427 TS_INLINE uint32_t
data_len()428 Doc::data_len()
429 {
430 return len - sizeof(Doc) - hlen;
431 }
432
433 TS_INLINE int
single_fragment()434 Doc::single_fragment()
435 {
436 return data_len() == total_len;
437 }
438
439 TS_INLINE char *
hdr()440 Doc::hdr()
441 {
442 return reinterpret_cast<char *>(this) + sizeof(Doc);
443 }
444
445 TS_INLINE char *
data()446 Doc::data()
447 {
448 return this->hdr() + hlen;
449 }
450
// Free-function entry points, defined in another translation unit.
// NOTE(review): the parameters of vol_init mirror Vol::init(s, blocks, dir_skip, clear);
// presumably it forwards to it -- confirm in the implementation file.
int vol_dir_clear(Vol *d);
int vol_init(Vol *d, char *s, off_t blocks, off_t skip, bool clear);
453
454 // inline Functions
455
456 TS_INLINE EvacuationBlock *
evacuation_block_exists(Dir * dir,Vol * p)457 evacuation_block_exists(Dir *dir, Vol *p)
458 {
459 EvacuationBlock *b = p->evacuate[dir_evac_bucket(dir)].head;
460 for (; b; b = b->link.next)
461 if (dir_offset(&b->dir) == dir_offset(dir))
462 return b;
463 return nullptr;
464 }
465
466 TS_INLINE void
cancel_trigger()467 Vol::cancel_trigger()
468 {
469 if (trigger) {
470 trigger->cancel_action();
471 trigger = nullptr;
472 }
473 }
474
475 TS_INLINE EvacuationBlock *
new_EvacuationBlock(EThread * t)476 new_EvacuationBlock(EThread *t)
477 {
478 EvacuationBlock *b = THREAD_ALLOC(evacuationBlockAllocator, t);
479 b->init = 0;
480 b->readers = 0;
481 b->earliest_evacuator = nullptr;
482 b->evac_frags.link.next = nullptr;
483 return b;
484 }
485
486 TS_INLINE void
free_EvacuationBlock(EvacuationBlock * b,EThread * t)487 free_EvacuationBlock(EvacuationBlock *b, EThread *t)
488 {
489 EvacuationKey *e = b->evac_frags.link.next;
490 while (e) {
491 EvacuationKey *n = e->link.next;
492 evacuationKeyAllocator.free(e);
493 e = n;
494 }
495 THREAD_FREE(b, evacuationBlockAllocator, t);
496 }
497
498 TS_INLINE OpenDirEntry *
open_read(const CryptoHash * key)499 Vol::open_read(const CryptoHash *key)
500 {
501 return open_dir.open_read(key);
502 }
503
504 TS_INLINE int
within_hit_evacuate_window(Dir * xdir)505 Vol::within_hit_evacuate_window(Dir *xdir)
506 {
507 off_t oft = dir_offset(xdir) - 1;
508 off_t write_off = (header->write_pos + AGG_SIZE - start) / CACHE_BLOCK_SIZE;
509 off_t delta = oft - write_off;
510 if (delta >= 0)
511 return delta < hit_evacuate_window;
512 else
513 return -delta > (data_blocks - hit_evacuate_window) && -delta < data_blocks;
514 }
515
516 TS_INLINE uint32_t
round_to_approx_size(uint32_t l)517 Vol::round_to_approx_size(uint32_t l)
518 {
519 uint32_t ll = round_to_approx_dir_size(l);
520 return ROUND_TO_SECTOR(this, ll);
521 }
522