1 /** @file
2
3 Main program file for Cache Tool.
4
5 @section license License
6
7 Licensed to the Apache Software Foundation (ASF) under one
8 or more contributor license agreements. See the NOTICE file
9 distributed with this work for additional information
10 regarding copyright ownership. The ASF licenses this file
11 to you under the Apache License, Version 2.0 (the
12 "License"); you may not use this file except in compliance
13 with the License. You may obtain a copy of the License at
14
15 http://www.apache.org/licenses/LICENSE-2.0
16
17 Unless required by applicable law or agreed to in writing, software
18 distributed under the License is distributed on an "AS IS" BASIS,
19 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 See the License for the specific language governing permissions and
21 limitations under the License.
22 */
23
24 #include "CacheDefs.h"
25 #include <iostream>
26 #include <fcntl.h>
27
28 using namespace std;
29 using namespace ts;
30
31 using ts::Errata;
32 namespace ts
33 {
34 std::ostream &
operator <<(std::ostream & s,Bytes const & n)35 operator<<(std::ostream &s, Bytes const &n)
36 {
37 return s << n.count() << " bytes";
38 }
39 std::ostream &
operator <<(std::ostream & s,Kilobytes const & n)40 operator<<(std::ostream &s, Kilobytes const &n)
41 {
42 return s << n.count() << " KB";
43 }
44 std::ostream &
operator <<(std::ostream & s,Megabytes const & n)45 operator<<(std::ostream &s, Megabytes const &n)
46 {
47 return s << n.count() << " MB";
48 }
49 std::ostream &
operator <<(std::ostream & s,Gigabytes const & n)50 operator<<(std::ostream &s, Gigabytes const &n)
51 {
52 return s << n.count() << " GB";
53 }
54 std::ostream &
operator <<(std::ostream & s,Terabytes const & n)55 operator<<(std::ostream &s, Terabytes const &n)
56 {
57 return s << n.count() << " TB";
58 }
59
60 std::ostream &
operator <<(std::ostream & s,CacheStripeBlocks const & n)61 operator<<(std::ostream &s, CacheStripeBlocks const &n)
62 {
63 return s << n.count() << " stripe blocks";
64 }
65 std::ostream &
operator <<(std::ostream & s,CacheStoreBlocks const & n)66 operator<<(std::ostream &s, CacheStoreBlocks const &n)
67 {
68 return s << n.count() << " store blocks";
69 }
70 std::ostream &
operator <<(std::ostream & s,CacheDataBlocks const & n)71 operator<<(std::ostream &s, CacheDataBlocks const &n)
72 {
73 return s << n.count() << " data blocks";
74 }
75
76 Errata
parseURL(TextView URI)77 URLparser::parseURL(TextView URI)
78 {
79 Errata zret;
80 static const TextView HTTP("http");
81 static const TextView HTTPS("https");
82 TextView scheme = URI.take_prefix_at(':');
83 if ((strcasecmp(scheme, HTTP) == 0) || (strcasecmp(scheme, HTTPS) == 0)) {
84 TextView hostname = URI.take_prefix_at(':');
85 if (!hostname) // i.e. port not present
86 {
87 }
88 }
89
90 return zret;
91 }
92
93 int
getPort(std::string & fullURL,int & port_ptr,int & port_len)94 URLparser::getPort(std::string &fullURL, int &port_ptr, int &port_len)
95 {
96 url_matcher matcher;
97 int n_port = -1;
98 int u_pos = -1;
99
100 if (fullURL.find("https") == 0) {
101 u_pos = 8;
102 n_port = 443;
103 } else if (fullURL.find("http") == 0) {
104 u_pos = 7;
105 n_port = 80;
106 }
107 if (u_pos != -1) {
108 fullURL.insert(u_pos, ":@");
109 TextView url(fullURL.data(), static_cast<int>(fullURL.size()));
110
111 url += 9;
112
113 TextView hostPort = url.take_prefix_at(':');
114 if (!hostPort.empty()) // i.e. port is present
115 {
116 TextView port = url.take_prefix_at('/');
117 if (port.empty()) { // i.e. backslash is not present, then the rest of the url must be just port
118 port = url;
119 }
120 if (matcher.portmatch(port.data(), port.size())) {
121 TextView text;
122 n_port = svtoi(port, &text);
123 if (text == port) {
124 port_ptr = fullURL.find(':', 9);
125 port_len = port.size();
126 return n_port;
127 }
128 }
129 }
130 return n_port;
131 } else {
132 std::cout << "No scheme provided for: " << fullURL << std::endl;
133 return -1;
134 }
135 }
136
137 uint32_t
prefix_len()138 Doc::prefix_len()
139 {
140 return sizeof(Doc) + hlen;
141 }
142
143 uint32_t
data_len()144 Doc::data_len()
145 {
146 return len - sizeof(Doc) - hlen;
147 }
148
149 int
single_fragment()150 Doc::single_fragment()
151 {
152 return data_len() == total_len;
153 }
154
155 char *
hdr()156 Doc::hdr()
157 {
158 return reinterpret_cast<char *>(this) + sizeof(Doc);
159 }
160
161 char *
data()162 Doc::data()
163 {
164 return this->hdr() + hlen;
165 }
166 } // end namespace ts
167
168 int cache_config_min_average_object_size = ESTIMATED_OBJECT_SIZE;
169 CacheStoreBlocks Vol_hash_alloc_size(1024);
170 // Default this to read only, only enable write if specifically required.
171 int OPEN_RW_FLAG = O_RDONLY;
172
173 namespace ct
174 {
175 bool
validate_sync_serial()176 Stripe::validate_sync_serial()
177 {
178 // check if A sync_serials match and A is at least as updated as B
179 return (_meta[0][0].sync_serial == _meta[0][1].sync_serial &&
180 (_meta[0][0].sync_serial >= _meta[1][0].sync_serial ||
181 _meta[1][0].sync_serial != _meta[1][1].sync_serial)) || // OR check if B's sync_serials match
182 (_meta[1][0].sync_serial == _meta[1][1].sync_serial);
183 }
184
185 Errata
clear()186 Stripe::clear()
187 {
188 Errata zret;
189 alignas(512) static char zero[CacheStoreBlocks::SCALE]; // should be all zero, it's static.
190 for (auto i : {A, B}) {
191 for (auto j : {HEAD, FOOT}) {
192 ssize_t n = pwrite(_span->_fd, zero, CacheStoreBlocks::SCALE, this->_meta_pos[i][j]);
193 if (n < CacheStoreBlocks::SCALE) {
194 std::cout << "Failed to clear stripe header" << std::endl;
195 }
196 }
197 }
198
199 return zret;
200 }
~Chunk()201 Stripe::Chunk::~Chunk()
202 {
203 this->clear();
204 }
205 void
append(MemSpan<void> m)206 Stripe::Chunk::append(MemSpan<void> m)
207 {
208 _chain.push_back(m);
209 }
210 void
clear()211 Stripe::Chunk::clear()
212 {
213 for (auto &m : _chain) {
214 free(const_cast<void *>(m.data()));
215 }
216 _chain.clear();
217 }
218
Stripe(Span * span,const Bytes & start,const CacheStoreBlocks & len)219 Stripe::Stripe(Span *span, const Bytes &start, const CacheStoreBlocks &len) : _span(span), _start(start), _len(len)
220 {
221 ts::bwprint(hashText, "{} {}:{}", span->_path.view(), _start.count(), _len.count());
222 CryptoContext().hash_immediate(hash_id, hashText.data(), static_cast<int>(hashText.size()));
223 printf("hash id of stripe is hash of %.*s\n", static_cast<int>(hashText.size()), hashText.data());
224 }
225
226 bool
isFree() const227 Stripe::isFree() const
228 {
229 return 0 == _vol_idx;
230 }
231
232 // TODO: Implement the whole logic
233 Errata
InitializeMeta()234 Stripe::InitializeMeta()
235 {
236 Errata zret;
237 // memset(this->raw_dir, 0, dir_len);
238 for (auto &i : _meta) {
239 for (auto &j : i) {
240 j.magic = StripeMeta::MAGIC;
241 j.version._major = ts::CACHE_DB_MAJOR_VERSION;
242 j.version._minor = ts::CACHE_DB_MINOR_VERSION;
243 j.agg_pos = j.last_write_pos = j.write_pos = this->_content;
244 j.phase = j.cycle = j.sync_serial = j.write_serial = j.dirty = 0;
245 j.create_time = time(nullptr);
246 j.sector_size = DEFAULT_HW_SECTOR_SIZE;
247 }
248 }
249 if (!freelist) // freelist is not allocated yet
250 {
251 freelist = static_cast<uint16_t *>(malloc(_segments * sizeof(uint16_t))); // segments has already been calculated
252 }
253 if (!dir) // for new spans, this will likely be nullptr as we don't need to read the stripe meta from disk
254 {
255 char *raw_dir = static_cast<char *>(ats_memalign(ats_pagesize(), this->vol_dirlen()));
256 dir = reinterpret_cast<CacheDirEntry *>(raw_dir + this->vol_headerlen());
257 }
258 init_dir();
259 return zret;
260 }
261
262 // Need to be bit more robust at some point.
263 bool
validateMeta(StripeMeta const * meta)264 Stripe::validateMeta(StripeMeta const *meta)
265 {
266 // Need to be bit more robust at some point.
267 return StripeMeta::MAGIC == meta->magic && meta->version._major <= ts::CACHE_DB_MAJOR_VERSION &&
268 meta->version._minor <= 2 // This may have always been zero, actually.
269 ;
270 }
271
272 bool
probeMeta(MemSpan<void> & mem,StripeMeta const * base_meta)273 Stripe::probeMeta(MemSpan<void> &mem, StripeMeta const *base_meta)
274 {
275 while (mem.size() >= sizeof(StripeMeta)) {
276 StripeMeta const *meta = static_cast<StripeMeta *>(mem.data());
277 if (this->validateMeta(meta) && (base_meta == nullptr || // no base version to check against.
278 (meta->version == base_meta->version) // need more checks here I think.
279 )) {
280 return true;
281 }
282 // The meta data is stored aligned on a stripe block boundary, so only need to check there.
283 mem.remove_prefix(CacheStoreBlocks::SCALE);
284 }
285 return false;
286 }
287
288 Errata
updateHeaderFooter()289 Stripe::updateHeaderFooter()
290 {
291 Errata zret;
292 this->vol_init_data();
293
294 int64_t hdr_size = this->vol_headerlen();
295 int64_t dir_size = this->vol_dirlen();
296 Bytes footer_offset = Bytes(dir_size - ROUND_TO_STORE_BLOCK(sizeof(StripeMeta)));
297 _meta_pos[A][HEAD] = round_down(_start);
298 _meta_pos[A][FOOT] = round_down(_start + footer_offset);
299 _meta_pos[B][HEAD] = round_down(this->_start + Bytes(dir_size));
300 _meta_pos[B][FOOT] = round_down(this->_start + Bytes(dir_size) + footer_offset);
301 std::cout << "updating header " << _meta_pos[0][0] << std::endl;
302 std::cout << "updating header " << _meta_pos[0][1] << std::endl;
303 std::cout << "updating header " << _meta_pos[1][0] << std::endl;
304 std::cout << "updating header " << _meta_pos[1][1] << std::endl;
305 InitializeMeta();
306
307 if (!OPEN_RW_FLAG) {
308 zret.push(0, 1, "Writing Not Enabled.. Please use --write to enable writing to disk");
309 return zret;
310 }
311
312 char *meta_t = static_cast<char *>(ats_memalign(ats_pagesize(), dir_size));
313 // copy headers
314 for (auto i : {A, B}) {
315 // copy header
316 memcpy(meta_t, &_meta[i][HEAD], sizeof(StripeMeta));
317 // copy freelist
318 memcpy(meta_t + sizeof(StripeMeta) - sizeof(uint16_t), this->freelist, this->_segments * sizeof(uint16_t));
319
320 ssize_t n = pwrite(_span->_fd, meta_t, hdr_size, _meta_pos[i][HEAD]);
321 if (n < hdr_size) {
322 std::cout << "problem writing header to disk: " << strerror(errno) << ":"
323 << " " << n << "<" << hdr_size << std::endl;
324 zret = Errata::Message(0, errno, "Failed to write stripe header ");
325 ats_free(meta_t);
326 return zret;
327 }
328 // copy dir entries
329 dir_size = dir_size - hdr_size - ROUND_TO_STORE_BLOCK(sizeof(StripeMeta));
330 memcpy(meta_t, (char *)dir, dir_size);
331 n = pwrite(_span->_fd, meta_t, dir_size, _meta_pos[i][HEAD] + hdr_size); //
332 if (n < dir_size) {
333 std::cout << "problem writing dir to disk: " << strerror(errno) << ":"
334 << " " << n << "<" << dir_size << std::endl;
335 zret = Errata::Message(0, errno, "Failed to write stripe header ");
336 ats_free(meta_t);
337 return zret;
338 }
339
340 // copy footer,
341 memcpy(meta_t, &_meta[i][FOOT], sizeof(StripeMeta));
342
343 int64_t footer_size = ROUND_TO_STORE_BLOCK(sizeof(StripeMeta));
344 n = pwrite(_span->_fd, meta_t, footer_size, _meta_pos[i][FOOT]);
345 if (n < footer_size) {
346 std::cout << "problem writing footer to disk: " << strerror(errno) << ":"
347 << " " << n << "<" << footer_size << std::endl;
348 zret = Errata::Message(0, errno, "Failed to write stripe header ");
349 ats_free(meta_t);
350 return zret;
351 }
352 }
353 ats_free(meta_t);
354 return zret;
355 }
356
357 size_t
vol_dirlen()358 Stripe::vol_dirlen()
359 {
360 return vol_headerlen() + ROUND_TO_STORE_BLOCK(((size_t)this->_buckets) * DIR_DEPTH * this->_segments * SIZEOF_DIR) +
361 ROUND_TO_STORE_BLOCK(sizeof(StripeMeta));
362 }
363
364 void
vol_init_data_internal()365 Stripe::vol_init_data_internal()
366 {
367 this->_buckets =
368 ((this->_len.count() * 8192 - (this->_content - this->_start)) / cache_config_min_average_object_size) / DIR_DEPTH;
369 this->_segments = (this->_buckets + (((1 << 16) - 1) / DIR_DEPTH)) / ((1 << 16) / DIR_DEPTH);
370 this->_buckets = (this->_buckets + this->_segments - 1) / this->_segments;
371 this->_content = this->_start + Bytes(2 * vol_dirlen());
372 }
373
374 void
vol_init_data()375 Stripe::vol_init_data()
376 {
377 // iteratively calculate start + buckets
378 this->vol_init_data_internal();
379 this->vol_init_data_internal();
380 this->vol_init_data_internal();
381 }
382
383 void
updateLiveData(enum Copy c)384 Stripe::updateLiveData(enum Copy c)
385 {
386 // CacheStoreBlocks delta{_meta_pos[c][FOOT] - _meta_pos[c][HEAD]};
387 CacheStoreBlocks header_len(0);
388 // int64_t n_buckets;
389 // int64_t n_segments;
390
391 /*
392 * COMMENTING THIS SECTION FOR NOW TO USE THE EXACT LOGIN USED IN ATS TO CALCULATE THE NUMBER OF SEGMENTS AND BUCKETS
393 // Past the header is the segment free list heads which if sufficiently long (> ~4K) can take
394 // more than 1 store block. Start with a guess of 1 and adjust upwards as needed. A 2TB stripe
395 // with an AOS of 8000 has roughly 3700 segments meaning that for even 10TB drives this loop
396 // should only be a few iterations.
397 do {
398 ++header_len;
399 n_buckets = Bytes(delta - header_len) / (sizeof(CacheDirEntry) * ts::ENTRIES_PER_BUCKET);
400 n_segments = n_buckets / ts::MAX_BUCKETS_PER_SEGMENT;
401 // This should never be more than one loop, usually none.
402 while ((n_buckets / n_segments) > ts::MAX_BUCKETS_PER_SEGMENT)
403 ++n_segments;
404 } while ((sizeof(StripeMeta) + sizeof(uint16_t) * n_segments) > static_cast<size_t>(header_len));
405
406 _buckets = n_buckets / n_segments;
407 _segments = n_segments;
408 */
409 _directory._skip = header_len;
410 }
411
412 bool
dir_compare_tag(const CacheDirEntry * e,const CryptoHash * key)413 dir_compare_tag(const CacheDirEntry *e, const CryptoHash *key)
414 {
415 return (dir_tag(e) == DIR_MASK_TAG(key->slice32(2)));
416 }
417
418 int
vol_in_phase_valid(Stripe * d,CacheDirEntry * e)419 vol_in_phase_valid(Stripe *d, CacheDirEntry *e)
420 {
421 return (dir_offset(e) - 1 < ((d->_meta[0][0].write_pos + d->agg_buf_pos - d->_start) / CACHE_BLOCK_SIZE));
422 }
423
424 int
vol_out_of_phase_valid(Stripe * d,CacheDirEntry * e)425 vol_out_of_phase_valid(Stripe *d, CacheDirEntry *e)
426 {
427 return (dir_offset(e) - 1 >= ((d->_meta[0][0].agg_pos - d->_start) / CACHE_BLOCK_SIZE));
428 }
429
430 bool
dir_valid(CacheDirEntry * _e)431 Stripe::dir_valid(CacheDirEntry *_e)
432 {
433 return (this->_meta[0][0].phase == dir_phase(_e) ? vol_in_phase_valid(this, _e) : vol_out_of_phase_valid(this, _e));
434 }
435
436 Bytes
stripe_offset(CacheDirEntry * e)437 Stripe::stripe_offset(CacheDirEntry *e)
438 {
439 return this->_content + Bytes((dir_offset(e) * CACHE_BLOCK_SIZE) - CACHE_BLOCK_SIZE);
440 }
441
442 int
dir_probe(CryptoHash * key,CacheDirEntry * result,CacheDirEntry ** last_collision)443 Stripe::dir_probe(CryptoHash *key, CacheDirEntry *result, CacheDirEntry **last_collision)
444 {
445 int segment = key->slice32(0) % this->_segments;
446 int bucket = key->slice32(1) % this->_buckets;
447
448 CacheDirEntry *seg = this->dir_segment(segment);
449 CacheDirEntry *e = nullptr;
450 e = dir_bucket(bucket, seg);
451 char *stripe_buff2 = nullptr;
452 Doc *doc = nullptr;
453 // TODO: collision craft is pending.. look at the main ATS code. Assuming no collision for now
454 if (dir_offset(e)) {
455 do {
456 if (dir_compare_tag(e, key)) {
457 if (dir_valid(e)) {
458 stripe_buff2 = static_cast<char *>(ats_memalign(ats_pagesize(), dir_approx_size(e)));
459 std::cout << "dir_probe hit: found seg: " << segment << " bucket: " << bucket << " offset: " << dir_offset(e)
460 << "size: " << dir_approx_size(e) << std::endl;
461 break;
462 } else {
463 // let's skip deleting for now
464 // e = dir_delete_entry(e, p ,segment);
465 // continue;
466 }
467 }
468 e = next_dir(e, seg);
469
470 } while (e);
471 if (e == nullptr) {
472 std::cout << "No directory entry found matching the URL key" << std::endl;
473 return 0;
474 }
475 int fd = _span->_fd;
476 Bytes offset = stripe_offset(e);
477 int64_t size = dir_approx_size(e);
478 ssize_t n = pread(fd, stripe_buff2, size, offset);
479 if (n < size) {
480 std::cout << "Failed to read content from the Stripe:" << strerror(errno) << std::endl;
481 }
482
483 doc = reinterpret_cast<Doc *>(stripe_buff2);
484 std::string hdr(doc->hdr(), doc->hlen);
485
486 std::string data_(doc->data(), doc->data_len());
487 std::cout << "DATA\n" << data_ << std::endl;
488 } else {
489 std::cout << "Not found in the Cache" << std::endl;
490 }
491 free(stripe_buff2);
492 return 0; // Why does this have a non-void return?
493 }
494
495 CacheDirEntry *
dir_delete_entry(CacheDirEntry * e,CacheDirEntry * p,int s)496 Stripe::dir_delete_entry(CacheDirEntry *e, CacheDirEntry *p, int s)
497 {
498 CacheDirEntry *seg = this->dir_segment(s);
499 int no = dir_next(e);
500 this->_meta[0][0].dirty = 1;
501 if (p) {
502 unsigned int fo = this->freelist[s];
503 unsigned int eo = dir_to_offset(e, seg);
504 dir_clear(e);
505 dir_set_next(p, no);
506 dir_set_next(e, fo);
507 if (fo) {
508 dir_set_prev(dir_from_offset(fo, seg), eo);
509 }
510 this->freelist[s] = eo;
511 } else {
512 CacheDirEntry *n = next_dir(e, seg);
513 if (n) {
514 dir_assign(e, n);
515 dir_delete_entry(n, e, s);
516 return e;
517 } else {
518 dir_clear(e);
519 return nullptr;
520 }
521 }
522 return dir_from_offset(no, seg);
523 }
524
525 void
walk_all_buckets()526 Stripe::walk_all_buckets()
527 {
528 for (int s = 0; s < this->_segments; s++) {
529 if (walk_bucket_chain(s)) {
530 std::cout << "Loop present in Segment " << s << std::endl;
531 }
532 }
533 }
534
535 bool
walk_bucket_chain(int s)536 Stripe::walk_bucket_chain(int s)
537 {
538 CacheDirEntry *seg = this->dir_segment(s);
539 std::bitset<65536> b_bitset;
540 b_bitset.reset();
541 for (int b = 0; b < this->_buckets; b++) {
542 CacheDirEntry *p = nullptr;
543 auto *dir_b = dir_bucket(b, seg);
544 CacheDirEntry *e = dir_b;
545 int len = 0;
546
547 while (e) {
548 len++;
549 int i = dir_to_offset(e, seg);
550 if (b_bitset.test(i)) {
551 std::cout << "bit already set in "
552 << "seg " << s << " bucket " << b << std::endl;
553 }
554 if (i > 0) { // i.e., not the first dir in the segment
555 b_bitset[i] = true;
556 }
557
558 #if 1
559 if (!dir_valid(e) || !dir_offset(e)) {
560 // std::cout<<"dir_clean in segment "<<s<<" =>cleaning "<<e<<" tag"<<dir_tag(e)<<" boffset"<< dir_offset(e)<< " bucket:
561 // "<<dir_b<< " bucket len: "<<dir_bucket_length(dir_b, s)<<std::endl;
562 e = dir_delete_entry(e, p, s);
563 continue;
564 }
565 #endif
566 p = e;
567 e = next_dir(e, seg);
568 }
569 // std::cout<<"dir len in this bucket "<<len<<std::endl;
570 }
571 return false;
572 }
573
574 void
dir_free_entry(CacheDirEntry * e,int s)575 Stripe::dir_free_entry(CacheDirEntry *e, int s)
576 {
577 CacheDirEntry *seg = this->dir_segment(s);
578 unsigned int fo = this->freelist[s];
579 unsigned int eo = dir_to_offset(e, seg);
580 dir_set_next(e, fo);
581 if (fo) {
582 dir_set_prev(dir_from_offset(fo, seg), eo);
583 }
584 this->freelist[s] = eo;
585 }
586
587 // adds all the directory entries
588 // in a segment to the segment freelist
589 void
dir_init_segment(int s)590 Stripe::dir_init_segment(int s)
591 {
592 this->freelist[s] = 0;
593 CacheDirEntry *seg = this->dir_segment(s);
594 int l, b;
595 memset(seg, 0, SIZEOF_DIR * DIR_DEPTH * this->_buckets);
596 for (l = 1; l < DIR_DEPTH; l++) {
597 for (b = 0; b < this->_buckets; b++) {
598 CacheDirEntry *bucket = dir_bucket(b, seg);
599 this->dir_free_entry(dir_bucket_row(bucket, l), s);
600 }
601 }
602 }
603
604 void
init_dir()605 Stripe::init_dir()
606 {
607 for (int s = 0; s < this->_segments; s++) {
608 this->freelist[s] = 0;
609 CacheDirEntry *seg = this->dir_segment(s);
610 int l, b;
611 for (l = 1; l < DIR_DEPTH; l++) {
612 for (b = 0; b < this->_buckets; b++) {
613 CacheDirEntry *bucket = dir_bucket(b, seg);
614 this->dir_free_entry(dir_bucket_row(bucket, l), s);
615 // std::cout<<"freelist"<<this->freelist[s]<<std::endl;
616 }
617 }
618 }
619 }
620
621 Errata
loadDir()622 Stripe::loadDir()
623 {
624 Errata zret;
625 int64_t dirlen = this->vol_dirlen();
626 char *raw_dir = static_cast<char *>(ats_memalign(ats_pagesize(), dirlen));
627 dir = reinterpret_cast<CacheDirEntry *>(raw_dir + this->vol_headerlen());
628 // read directory
629 ssize_t n = pread(this->_span->_fd, raw_dir, dirlen, this->_start);
630 if (n < dirlen) {
631 std::cout << "Failed to read Dir from stripe @" << this->hashText;
632 }
633 return zret;
634 }
635 //
636 // Cache Directory
637 //
638
#if 0
// Disabled. Two-pointer walk over the chain starting at start_dir.
// return value 1 means no loop
// zero indicates loop
int
dir_bucket_loop_check(CacheDirEntry *start_dir, CacheDirEntry *seg)
{
  if (start_dir == nullptr) {
    return 1;
  }

  CacheDirEntry *slow = start_dir; // moves by one entry per iteration
  CacheDirEntry *fast = start_dir; // moves by two entries per iteration

  while (fast != nullptr) {
    assert(slow);
    slow = next_dir(slow, seg);

    fast = next_dir(fast, seg);
    if (fast == nullptr) {
      return 1;
    }
    fast = next_dir(fast, seg);

    if (fast == slow) {
      return 0; // we have a loop
    }
  }
  return 1;
}
#endif
671
672 int
dir_freelist_length(int s)673 Stripe::dir_freelist_length(int s)
674 {
675 int free = 0;
676 CacheDirEntry *seg = this->dir_segment(s);
677 CacheDirEntry *e = dir_from_offset(this->freelist[s], seg);
678 if (this->check_loop(s)) {
679 return (DIR_DEPTH - 1) * this->_buckets;
680 }
681 while (e) {
682 free++;
683 e = next_dir(e, seg);
684 }
685 return free;
686 }
687
688 int
check_loop(int s)689 Stripe::check_loop(int s)
690 {
691 // look for loop in the segment
692 // rewrite the freelist if loop is present
693 CacheDirEntry *seg = this->dir_segment(s);
694 CacheDirEntry *e = dir_from_offset(this->freelist[s], seg);
695 std::bitset<65536> f_bitset;
696 f_bitset.reset();
697 while (e) {
698 int i = dir_next(e);
699 if (f_bitset.test(i)) {
700 // bit was set in a previous round so a loop is present
701 std::cout << "<check_loop> Loop present in Span" << this->_span->_path.string() << " Stripe: " << this->hashText
702 << "Segment: " << s << std::endl;
703 this->dir_init_segment(s);
704 return 1;
705 }
706 f_bitset[i] = true;
707 e = dir_from_offset(i, seg);
708 }
709
710 return 0;
711 }
712
/// qsort-style comparator for <tt>unsigned short</tt> values.
/// @return The first value minus the second: negative, zero or positive.
int
compare_ushort(void const *a, void const *b)
{
  auto const lhs = *static_cast<unsigned short const *>(a);
  auto const rhs = *static_cast<unsigned short const *>(b);
  return lhs - rhs;
}
718
719 void
dir_check()720 Stripe::dir_check()
721 {
722 static int const SEGMENT_HISTOGRAM_WIDTH = 16;
723 int hist[SEGMENT_HISTOGRAM_WIDTH + 1] = {0};
724 unsigned short chain_tag[MAX_ENTRIES_PER_SEGMENT];
725 int32_t chain_mark[MAX_ENTRIES_PER_SEGMENT];
726
727 this->loadMeta();
728 this->loadDir();
729 // uint64_t total_buckets = _segments * _buckets;
730 // uint64_t total_entries = total_buckets * DIR_DEPTH;
731 int frag_demographics[1 << DIR_SIZE_WIDTH][DIR_BLOCK_SIZES];
732 int j;
733 int stale = 0, in_use = 0, empty = 0;
734 int free = 0, head = 0, buckets_in_use = 0;
735
736 int max_chain_length = 0;
737 int64_t bytes_in_use = 0;
738 std::cout << "Stripe '[" << hashText << "]'" << std::endl;
739 std::cout << " Directory Bytes: " << _segments * _buckets * SIZEOF_DIR << std::endl;
740 std::cout << " Segments: " << _segments << std::endl;
741 std::cout << " Buckets per segment: " << _buckets << std::endl;
742 std::cout << " Entries: " << _segments * _buckets * DIR_DEPTH << std::endl;
743 for (int s = 0; s < _segments; s++) {
744 CacheDirEntry *seg = this->dir_segment(s);
745 int seg_chain_max = 0;
746 int seg_empty = 0;
747 int seg_in_use = 0;
748 int seg_stale = 0;
749 int seg_bytes_in_use = 0;
750 int seg_dups = 0;
751 int seg_buckets_in_use = 0;
752
753 ink_zero(chain_tag);
754 memset(chain_mark, -1, sizeof(chain_mark));
755 for (int b = 0; b < _buckets; b++) {
756 CacheDirEntry *root = dir_bucket(b, seg);
757 int h = 0;
758 int chain_idx = 0;
759 int mark = 0;
760 ++seg_buckets_in_use;
761 // walking through the directories
762 for (CacheDirEntry *e = root; e; e = next_dir(e, seg)) {
763 if (!dir_offset(e)) {
764 ++seg_empty;
765 --seg_buckets_in_use;
766 // this should only happen on the first dir in a bucket
767 assert(nullptr == next_dir(e, seg));
768 break;
769 } else {
770 int e_idx = e - seg;
771 ++h;
772 chain_tag[chain_idx++] = dir_tag(e);
773 if (chain_mark[e_idx] == mark) {
774 printf(" - Cycle of length %d detected for bucket %d\n", h, b);
775 } else if (chain_mark[e_idx] >= 0) {
776 printf(" - Entry %d is in chain %d and %d", e_idx, chain_mark[e_idx], mark);
777 } else {
778 chain_mark[e_idx] = mark;
779 }
780
781 if (!dir_valid(e)) {
782 ++seg_stale;
783 } else {
784 uint64_t size = dir_approx_size(e);
785 if (dir_head(e)) {
786 ++head;
787 }
788 ++seg_in_use;
789 seg_bytes_in_use += size;
790 ++frag_demographics[dir_size(e)][dir_big(e)];
791 }
792 }
793 e = next_dir(e, seg);
794 if (!e) {
795 break;
796 }
797 }
798
799 // Check for duplicates (identical tags in the same bucket).
800 if (h > 1) {
801 unsigned short last;
802 qsort(chain_tag, h, sizeof(chain_tag[0]), &compare_ushort);
803 last = chain_tag[0];
804 for (int k = 1; k < h; ++k) {
805 if (last == chain_tag[k]) {
806 ++seg_dups;
807 }
808 last = chain_tag[k];
809 }
810 }
811 ++hist[std::min(h, SEGMENT_HISTOGRAM_WIDTH)];
812 seg_chain_max = std::max(seg_chain_max, h);
813 }
814 int fl_size = dir_freelist_length(s);
815 in_use += seg_in_use;
816 empty += seg_empty;
817 stale += seg_stale;
818 free += fl_size;
819 buckets_in_use += seg_buckets_in_use;
820 max_chain_length = std::max(max_chain_length, seg_chain_max);
821 bytes_in_use += seg_bytes_in_use;
822
823 printf(" - Segment-%d | Entries: used=%d stale=%d free=%d disk-bytes=%d Buckets: used=%d empty=%d max=%d avg=%.2f dups=%d\n",
824 s, seg_in_use, seg_stale, fl_size, seg_bytes_in_use, seg_buckets_in_use, seg_empty, seg_chain_max,
825 seg_buckets_in_use ? static_cast<float>(seg_in_use + seg_stale) / seg_buckets_in_use : 0.0, seg_dups);
826 }
827 //////////////////
828
829 printf(" - Stripe | Entries: in-use=%d stale=%d free=%d Buckets: empty=%d max=%d avg=%.2f\n", in_use, stale, free, empty,
830 max_chain_length, buckets_in_use ? static_cast<float>(in_use + stale) / buckets_in_use : 0);
831
832 printf(" Chain lengths: ");
833 for (j = 0; j < SEGMENT_HISTOGRAM_WIDTH; ++j) {
834 printf(" %d=%d ", j, hist[j]);
835 }
836 printf(" %d>=%d\n", SEGMENT_HISTOGRAM_WIDTH, hist[SEGMENT_HISTOGRAM_WIDTH]);
837
838 char tt[256];
839 printf(" Total Size: %" PRIu64 "\n", static_cast<uint64_t>(_len.count()));
840 printf(" Bytes in Use: %" PRIu64 " [%0.2f%%]\n", bytes_in_use, 100.0 * (static_cast<float>(bytes_in_use) / _len.count()));
841 printf(" Objects: %d\n", head);
842 printf(" Average Size: %" PRIu64 "\n", head ? (bytes_in_use / head) : 0);
843 printf(" Average Frags: %.2f\n", head ? static_cast<float>(in_use) / head : 0);
844 printf(" Write Position: %" PRIu64 "\n", _meta[0][0].write_pos - _content.count());
845 printf(" Wrap Count: %d\n", _meta[0][0].cycle);
846 printf(" Phase: %s\n", _meta[0][0].phase ? "true" : "false");
847 ctime_r(&_meta[0][0].create_time, tt);
848 tt[strlen(tt) - 1] = 0;
849 printf(" Sync Serial: %u\n", _meta[0][0].sync_serial);
850 printf(" Write Serial: %u\n", _meta[0][0].write_serial);
851 printf(" Create Time: %s\n", tt);
852 printf("\n");
853 printf(" Fragment size demographics\n");
854 for (int b = 0; b < DIR_BLOCK_SIZES; ++b) {
855 int block_size = DIR_BLOCK_SIZE(b);
856 int s = 0;
857 while (s < 1 << DIR_SIZE_WIDTH) {
858 for (int j = 0; j < 8; ++j, ++s) {
859 // The size markings are redundant. Low values (less than DIR_SHIFT_WIDTH) for larger
860 // base block sizes should never be used. Such entries should use the next smaller base block size.
861 if (b > 0 && s < 1 << DIR_BLOCK_SHIFT(1)) {
862 assert(frag_demographics[s][b] == 0);
863 continue;
864 }
865 printf(" %8d[%2d:%1d]:%06d", (s + 1) * block_size, s, b, frag_demographics[s][b]);
866 }
867 printf("\n");
868 }
869 }
870 printf("\n");
871 ////////////////
872 }
873
874 Errata
loadMeta()875 Stripe::loadMeta()
876 {
877 // Read from disk in chunks of this size. This needs to be a multiple of both the
878 // store block size and the directory entry size so neither goes across read boundaries.
879 // Beyond that the value should be in the ~10MB range for what I guess is best performance
880 // vs. blocking production disk I/O on a live system.
881 constexpr static int64_t N = (1 << 8) * CacheStoreBlocks::SCALE * sizeof(CacheDirEntry);
882
883 Errata zret;
884
885 int fd = _span->_fd;
886 Bytes n;
887 bool found;
888 MemSpan<void> data; // The current view of the read buffer.
889 Bytes delta;
890 Bytes pos = _start;
891 // Avoid searching the entire span, because some of it must be content. Assume that AOS is more than 160
892 // which means at most 10/160 (1/16) of the span can be directory/header.
893 Bytes limit = pos + _len / 16;
894 size_t io_align = _span->_geometry.blocksz;
895 StripeMeta const *meta;
896
897 std::unique_ptr<char> bulk_buff; // Buffer for bulk reads.
898 static const size_t SBSIZE = CacheStoreBlocks::SCALE; // save some typing.
899 alignas(SBSIZE) char stripe_buff[SBSIZE]; // Use when reading a single stripe block.
900 alignas(SBSIZE) char stripe_buff2[SBSIZE]; // use to save the stripe freelist
901 if (io_align > SBSIZE) {
902 return Errata::Message(0, 1, "Cannot load stripe ", _idx, " on span ", _span->_path.string(),
903 " because the I/O block alignment ", io_align, " is larger than the buffer alignment ", SBSIZE);
904 }
905
906 _directory._start = pos;
907 // Header A must be at the start of the stripe block.
908 // Todo: really need to check pread() for failure.
909 ssize_t headerbyteCount = pread(fd, stripe_buff2, SBSIZE, pos);
910 n.assign(headerbyteCount);
911 data.assign(stripe_buff2, n);
912 meta = static_cast<StripeMeta *>(data.data());
913 // TODO:: We need to read more data at this point to populate dir
914 if (this->validateMeta(meta)) {
915 delta = Bytes(data.rebind<char>().data() - stripe_buff2);
916 _meta[A][HEAD] = *meta;
917 _meta_pos[A][HEAD] = round_down(pos + Bytes(delta));
918 pos += round_up(SBSIZE);
919 _directory._skip = Bytes(SBSIZE); // first guess, updated in @c updateLiveData when the header length is computed.
920 // Search for Footer A. Nothing for it except to grub through the disk.
921 // The searched data is cached so it's available for directory parsing later if needed.
922 while (pos < limit) {
923 char *buff = static_cast<char *>(ats_memalign(io_align, N));
924 bulk_buff.reset(buff);
925 n.assign(pread(fd, buff, N, pos));
926 data.assign(buff, n);
927 found = this->probeMeta(data, &_meta[A][HEAD]);
928 if (found) {
929 ptrdiff_t diff = data.rebind<char>().data() - buff;
930 _meta[A][FOOT] = static_cast<StripeMeta *>(data.data())[0];
931 _meta_pos[A][FOOT] = round_down(pos + Bytes(diff));
932 // don't bother attaching block if the footer is at the start
933 if (diff > 0) {
934 _directory._clip = Bytes(N - diff);
935 _directory.append({bulk_buff.release(), N});
936 }
937 data.remove_prefix(SBSIZE); // skip footer for checking on B copy.
938 break;
939 } else {
940 _directory.append({bulk_buff.release(), N});
941 pos += round_up(N);
942 }
943 }
944 } else {
945 zret.push(0, 1, "Header A not found");
946 }
947 pos = _meta_pos[A][FOOT];
948 // Technically if Copy A is valid, Copy B is not needed. But at this point it's cheap to retrieve
949 // (as the exact offset is computable).
950 if (_meta_pos[A][FOOT] > 0) {
951 delta = _meta_pos[A][FOOT] - _meta_pos[A][HEAD];
952 // Header B should be immediately after Footer A. If at the end of the last read,
953 // do another read.
954 pos = this->_start + Bytes(vol_dirlen());
955 meta = static_cast<StripeMeta *>(data.data());
956 if (this->validateMeta(meta)) {
957 _meta[B][HEAD] = *meta;
958 _meta_pos[B][HEAD] = round_down(pos);
959
960 // Footer B must be at the same relative offset to Header B as Footer A -> Header A.
961 pos += delta;
962 n = Bytes(pread(fd, stripe_buff, ts::CacheStoreBlocks::SCALE, pos));
963 data.assign(stripe_buff, n);
964 meta = static_cast<StripeMeta *>(data.data());
965 if (this->validateMeta(meta)) {
966 _meta[B][FOOT] = *meta;
967 _meta_pos[B][FOOT] = round_down(pos);
968 }
969 }
970 }
971
972 if (_meta_pos[A][FOOT] > 0) {
973 if (_meta[A][HEAD].sync_serial == _meta[A][FOOT].sync_serial &&
974 (0 == _meta_pos[B][FOOT] || _meta[B][HEAD].sync_serial != _meta[B][FOOT].sync_serial ||
975 _meta[A][HEAD].sync_serial >= _meta[B][HEAD].sync_serial)) {
976 this->updateLiveData(A);
977 } else if (_meta_pos[B][FOOT] > 0 && _meta[B][HEAD].sync_serial == _meta[B][FOOT].sync_serial) {
978 this->updateLiveData(B);
979 } else {
980 zret.push(0, 1, "Invalid stripe data - candidates found but sync serial data not valid. ", _meta[A][HEAD].sync_serial, ":",
981 _meta[A][FOOT].sync_serial, ":", _meta[B][HEAD].sync_serial, ":", _meta[B][FOOT].sync_serial);
982 }
983 }
984
985 n.assign(headerbyteCount);
986 data.assign(stripe_buff2, n);
987 meta = static_cast<StripeMeta *>(data.data());
988 // copy freelist
989 freelist = static_cast<uint16_t *>(malloc(_segments * sizeof(uint16_t)));
990 for (int i = 0; i < _segments; i++) {
991 freelist[i] = meta->freelist[i];
992 }
993
994 if (!zret) {
995 _directory.clear();
996 }
997 return zret;
998 }
999
1000 } // namespace ct
1001