1 /** @file
2 
3     Main program file for Cache Tool.
4 
5     @section license License
6 
7     Licensed to the Apache Software Foundation (ASF) under one
8     or more contributor license agreements.  See the NOTICE file
9     distributed with this work for additional information
10     regarding copyright ownership.  The ASF licenses this file
11     to you under the Apache License, Version 2.0 (the
12     "License"); you may not use this file except in compliance
13     with the License.  You may obtain a copy of the License at
14 
15     http://www.apache.org/licenses/LICENSE-2.0
16 
17     Unless required by applicable law or agreed to in writing, software
18     distributed under the License is distributed on an "AS IS" BASIS,
19     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20     See the License for the specific language governing permissions and
21     limitations under the License.
22  */
23 
24 #include "CacheDefs.h"
25 #include <iostream>
26 #include <fcntl.h>
27 
28 using namespace std;
29 using namespace ts;
30 
31 using ts::Errata;
32 namespace ts
33 {
34 std::ostream &
operator <<(std::ostream & s,Bytes const & n)35 operator<<(std::ostream &s, Bytes const &n)
36 {
37   return s << n.count() << " bytes";
38 }
39 std::ostream &
operator <<(std::ostream & s,Kilobytes const & n)40 operator<<(std::ostream &s, Kilobytes const &n)
41 {
42   return s << n.count() << " KB";
43 }
44 std::ostream &
operator <<(std::ostream & s,Megabytes const & n)45 operator<<(std::ostream &s, Megabytes const &n)
46 {
47   return s << n.count() << " MB";
48 }
49 std::ostream &
operator <<(std::ostream & s,Gigabytes const & n)50 operator<<(std::ostream &s, Gigabytes const &n)
51 {
52   return s << n.count() << " GB";
53 }
54 std::ostream &
operator <<(std::ostream & s,Terabytes const & n)55 operator<<(std::ostream &s, Terabytes const &n)
56 {
57   return s << n.count() << " TB";
58 }
59 
60 std::ostream &
operator <<(std::ostream & s,CacheStripeBlocks const & n)61 operator<<(std::ostream &s, CacheStripeBlocks const &n)
62 {
63   return s << n.count() << " stripe blocks";
64 }
65 std::ostream &
operator <<(std::ostream & s,CacheStoreBlocks const & n)66 operator<<(std::ostream &s, CacheStoreBlocks const &n)
67 {
68   return s << n.count() << " store blocks";
69 }
70 std::ostream &
operator <<(std::ostream & s,CacheDataBlocks const & n)71 operator<<(std::ostream &s, CacheDataBlocks const &n)
72 {
73   return s << n.count() << " data blocks";
74 }
75 
76 Errata
parseURL(TextView URI)77 URLparser::parseURL(TextView URI)
78 {
79   Errata zret;
80   static const TextView HTTP("http");
81   static const TextView HTTPS("https");
82   TextView scheme = URI.take_prefix_at(':');
83   if ((strcasecmp(scheme, HTTP) == 0) || (strcasecmp(scheme, HTTPS) == 0)) {
84     TextView hostname = URI.take_prefix_at(':');
85     if (!hostname) // i.e. port not present
86     {
87     }
88   }
89 
90   return zret;
91 }
92 
93 int
getPort(std::string & fullURL,int & port_ptr,int & port_len)94 URLparser::getPort(std::string &fullURL, int &port_ptr, int &port_len)
95 {
96   url_matcher matcher;
97   int n_port = -1;
98   int u_pos  = -1;
99 
100   if (fullURL.find("https") == 0) {
101     u_pos  = 8;
102     n_port = 443;
103   } else if (fullURL.find("http") == 0) {
104     u_pos  = 7;
105     n_port = 80;
106   }
107   if (u_pos != -1) {
108     fullURL.insert(u_pos, ":@");
109     TextView url(fullURL.data(), static_cast<int>(fullURL.size()));
110 
111     url += 9;
112 
113     TextView hostPort = url.take_prefix_at(':');
114     if (!hostPort.empty()) // i.e. port is present
115     {
116       TextView port = url.take_prefix_at('/');
117       if (port.empty()) { // i.e. backslash is not present, then the rest of the url must be just port
118         port = url;
119       }
120       if (matcher.portmatch(port.data(), port.size())) {
121         TextView text;
122         n_port = svtoi(port, &text);
123         if (text == port) {
124           port_ptr = fullURL.find(':', 9);
125           port_len = port.size();
126           return n_port;
127         }
128       }
129     }
130     return n_port;
131   } else {
132     std::cout << "No scheme provided for: " << fullURL << std::endl;
133     return -1;
134   }
135 }
136 
137 uint32_t
prefix_len()138 Doc::prefix_len()
139 {
140   return sizeof(Doc) + hlen;
141 }
142 
143 uint32_t
data_len()144 Doc::data_len()
145 {
146   return len - sizeof(Doc) - hlen;
147 }
148 
149 int
single_fragment()150 Doc::single_fragment()
151 {
152   return data_len() == total_len;
153 }
154 
155 char *
hdr()156 Doc::hdr()
157 {
158   return reinterpret_cast<char *>(this) + sizeof(Doc);
159 }
160 
161 char *
data()162 Doc::data()
163 {
164   return this->hdr() + hlen;
165 }
166 } // end namespace ts
167 
168 int cache_config_min_average_object_size = ESTIMATED_OBJECT_SIZE;
169 CacheStoreBlocks Vol_hash_alloc_size(1024);
170 // Default this to read only, only enable write if specifically required.
171 int OPEN_RW_FLAG = O_RDONLY;
172 
173 namespace ct
174 {
175 bool
validate_sync_serial()176 Stripe::validate_sync_serial()
177 {
178   // check if A sync_serials match and A is at least as updated as B
179   return (_meta[0][0].sync_serial == _meta[0][1].sync_serial &&
180           (_meta[0][0].sync_serial >= _meta[1][0].sync_serial ||
181            _meta[1][0].sync_serial != _meta[1][1].sync_serial)) || // OR check if B's sync_serials match
182          (_meta[1][0].sync_serial == _meta[1][1].sync_serial);
183 }
184 
185 Errata
clear()186 Stripe::clear()
187 {
188   Errata zret;
189   alignas(512) static char zero[CacheStoreBlocks::SCALE]; // should be all zero, it's static.
190   for (auto i : {A, B}) {
191     for (auto j : {HEAD, FOOT}) {
192       ssize_t n = pwrite(_span->_fd, zero, CacheStoreBlocks::SCALE, this->_meta_pos[i][j]);
193       if (n < CacheStoreBlocks::SCALE) {
194         std::cout << "Failed to clear stripe header" << std::endl;
195       }
196     }
197   }
198 
199   return zret;
200 }
~Chunk()201 Stripe::Chunk::~Chunk()
202 {
203   this->clear();
204 }
205 void
append(MemSpan<void> m)206 Stripe::Chunk::append(MemSpan<void> m)
207 {
208   _chain.push_back(m);
209 }
210 void
clear()211 Stripe::Chunk::clear()
212 {
213   for (auto &m : _chain) {
214     free(const_cast<void *>(m.data()));
215   }
216   _chain.clear();
217 }
218 
Stripe(Span * span,const Bytes & start,const CacheStoreBlocks & len)219 Stripe::Stripe(Span *span, const Bytes &start, const CacheStoreBlocks &len) : _span(span), _start(start), _len(len)
220 {
221   ts::bwprint(hashText, "{} {}:{}", span->_path.view(), _start.count(), _len.count());
222   CryptoContext().hash_immediate(hash_id, hashText.data(), static_cast<int>(hashText.size()));
223   printf("hash id of stripe is hash of %.*s\n", static_cast<int>(hashText.size()), hashText.data());
224 }
225 
226 bool
isFree() const227 Stripe::isFree() const
228 {
229   return 0 == _vol_idx;
230 }
231 
232 // TODO: Implement the whole logic
233 Errata
InitializeMeta()234 Stripe::InitializeMeta()
235 {
236   Errata zret;
237   // memset(this->raw_dir, 0, dir_len);
238   for (auto &i : _meta) {
239     for (auto &j : i) {
240       j.magic          = StripeMeta::MAGIC;
241       j.version._major = ts::CACHE_DB_MAJOR_VERSION;
242       j.version._minor = ts::CACHE_DB_MINOR_VERSION;
243       j.agg_pos = j.last_write_pos = j.write_pos = this->_content;
244       j.phase = j.cycle = j.sync_serial = j.write_serial = j.dirty = 0;
245       j.create_time                                                = time(nullptr);
246       j.sector_size                                                = DEFAULT_HW_SECTOR_SIZE;
247     }
248   }
249   if (!freelist) // freelist is not allocated yet
250   {
251     freelist = static_cast<uint16_t *>(malloc(_segments * sizeof(uint16_t))); // segments has already been calculated
252   }
253   if (!dir) // for new spans, this will likely be nullptr as we don't need to read the stripe meta from disk
254   {
255     char *raw_dir = static_cast<char *>(ats_memalign(ats_pagesize(), this->vol_dirlen()));
256     dir           = reinterpret_cast<CacheDirEntry *>(raw_dir + this->vol_headerlen());
257   }
258   init_dir();
259   return zret;
260 }
261 
262 // Need to be bit more robust at some point.
263 bool
validateMeta(StripeMeta const * meta)264 Stripe::validateMeta(StripeMeta const *meta)
265 {
266   // Need to be bit more robust at some point.
267   return StripeMeta::MAGIC == meta->magic && meta->version._major <= ts::CACHE_DB_MAJOR_VERSION &&
268          meta->version._minor <= 2 // This may have always been zero, actually.
269     ;
270 }
271 
272 bool
probeMeta(MemSpan<void> & mem,StripeMeta const * base_meta)273 Stripe::probeMeta(MemSpan<void> &mem, StripeMeta const *base_meta)
274 {
275   while (mem.size() >= sizeof(StripeMeta)) {
276     StripeMeta const *meta = static_cast<StripeMeta *>(mem.data());
277     if (this->validateMeta(meta) && (base_meta == nullptr ||               // no base version to check against.
278                                      (meta->version == base_meta->version) // need more checks here I think.
279                                      )) {
280       return true;
281     }
282     // The meta data is stored aligned on a stripe block boundary, so only need to check there.
283     mem.remove_prefix(CacheStoreBlocks::SCALE);
284   }
285   return false;
286 }
287 
288 Errata
updateHeaderFooter()289 Stripe::updateHeaderFooter()
290 {
291   Errata zret;
292   this->vol_init_data();
293 
294   int64_t hdr_size    = this->vol_headerlen();
295   int64_t dir_size    = this->vol_dirlen();
296   Bytes footer_offset = Bytes(dir_size - ROUND_TO_STORE_BLOCK(sizeof(StripeMeta)));
297   _meta_pos[A][HEAD]  = round_down(_start);
298   _meta_pos[A][FOOT]  = round_down(_start + footer_offset);
299   _meta_pos[B][HEAD]  = round_down(this->_start + Bytes(dir_size));
300   _meta_pos[B][FOOT]  = round_down(this->_start + Bytes(dir_size) + footer_offset);
301   std::cout << "updating header " << _meta_pos[0][0] << std::endl;
302   std::cout << "updating header " << _meta_pos[0][1] << std::endl;
303   std::cout << "updating header " << _meta_pos[1][0] << std::endl;
304   std::cout << "updating header " << _meta_pos[1][1] << std::endl;
305   InitializeMeta();
306 
307   if (!OPEN_RW_FLAG) {
308     zret.push(0, 1, "Writing Not Enabled.. Please use --write to enable writing to disk");
309     return zret;
310   }
311 
312   char *meta_t = static_cast<char *>(ats_memalign(ats_pagesize(), dir_size));
313   // copy headers
314   for (auto i : {A, B}) {
315     // copy header
316     memcpy(meta_t, &_meta[i][HEAD], sizeof(StripeMeta));
317     // copy freelist
318     memcpy(meta_t + sizeof(StripeMeta) - sizeof(uint16_t), this->freelist, this->_segments * sizeof(uint16_t));
319 
320     ssize_t n = pwrite(_span->_fd, meta_t, hdr_size, _meta_pos[i][HEAD]);
321     if (n < hdr_size) {
322       std::cout << "problem writing header to disk: " << strerror(errno) << ":"
323                 << " " << n << "<" << hdr_size << std::endl;
324       zret = Errata::Message(0, errno, "Failed to write stripe header ");
325       ats_free(meta_t);
326       return zret;
327     }
328     // copy dir entries
329     dir_size = dir_size - hdr_size - ROUND_TO_STORE_BLOCK(sizeof(StripeMeta));
330     memcpy(meta_t, (char *)dir, dir_size);
331     n = pwrite(_span->_fd, meta_t, dir_size, _meta_pos[i][HEAD] + hdr_size); //
332     if (n < dir_size) {
333       std::cout << "problem writing dir to disk: " << strerror(errno) << ":"
334                 << " " << n << "<" << dir_size << std::endl;
335       zret = Errata::Message(0, errno, "Failed to write stripe header ");
336       ats_free(meta_t);
337       return zret;
338     }
339 
340     // copy footer,
341     memcpy(meta_t, &_meta[i][FOOT], sizeof(StripeMeta));
342 
343     int64_t footer_size = ROUND_TO_STORE_BLOCK(sizeof(StripeMeta));
344     n                   = pwrite(_span->_fd, meta_t, footer_size, _meta_pos[i][FOOT]);
345     if (n < footer_size) {
346       std::cout << "problem writing footer to disk: " << strerror(errno) << ":"
347                 << " " << n << "<" << footer_size << std::endl;
348       zret = Errata::Message(0, errno, "Failed to write stripe header ");
349       ats_free(meta_t);
350       return zret;
351     }
352   }
353   ats_free(meta_t);
354   return zret;
355 }
356 
357 size_t
vol_dirlen()358 Stripe::vol_dirlen()
359 {
360   return vol_headerlen() + ROUND_TO_STORE_BLOCK(((size_t)this->_buckets) * DIR_DEPTH * this->_segments * SIZEOF_DIR) +
361          ROUND_TO_STORE_BLOCK(sizeof(StripeMeta));
362 }
363 
364 void
vol_init_data_internal()365 Stripe::vol_init_data_internal()
366 {
367   this->_buckets =
368     ((this->_len.count() * 8192 - (this->_content - this->_start)) / cache_config_min_average_object_size) / DIR_DEPTH;
369   this->_segments = (this->_buckets + (((1 << 16) - 1) / DIR_DEPTH)) / ((1 << 16) / DIR_DEPTH);
370   this->_buckets  = (this->_buckets + this->_segments - 1) / this->_segments;
371   this->_content  = this->_start + Bytes(2 * vol_dirlen());
372 }
373 
374 void
vol_init_data()375 Stripe::vol_init_data()
376 {
377   // iteratively calculate start + buckets
378   this->vol_init_data_internal();
379   this->vol_init_data_internal();
380   this->vol_init_data_internal();
381 }
382 
383 void
updateLiveData(enum Copy c)384 Stripe::updateLiveData(enum Copy c)
385 {
386   //  CacheStoreBlocks delta{_meta_pos[c][FOOT] - _meta_pos[c][HEAD]};
387   CacheStoreBlocks header_len(0);
388   //  int64_t n_buckets;
389   //  int64_t n_segments;
390 
391   /*
392    * COMMENTING THIS SECTION FOR NOW TO USE THE EXACT LOGIN USED IN ATS TO CALCULATE THE NUMBER OF SEGMENTS AND BUCKETS
393   // Past the header is the segment free list heads which if sufficiently long (> ~4K) can take
394   // more than 1 store block. Start with a guess of 1 and adjust upwards as needed. A 2TB stripe
395   // with an AOS of 8000 has roughly 3700 segments meaning that for even 10TB drives this loop
396   // should only be a few iterations.
397   do {
398     ++header_len;
399     n_buckets  = Bytes(delta - header_len) / (sizeof(CacheDirEntry) * ts::ENTRIES_PER_BUCKET);
400     n_segments = n_buckets / ts::MAX_BUCKETS_PER_SEGMENT;
401     // This should never be more than one loop, usually none.
402     while ((n_buckets / n_segments) > ts::MAX_BUCKETS_PER_SEGMENT)
403       ++n_segments;
404   } while ((sizeof(StripeMeta) + sizeof(uint16_t) * n_segments) > static_cast<size_t>(header_len));
405 
406   _buckets         = n_buckets / n_segments;
407   _segments        = n_segments;
408   */
409   _directory._skip = header_len;
410 }
411 
412 bool
dir_compare_tag(const CacheDirEntry * e,const CryptoHash * key)413 dir_compare_tag(const CacheDirEntry *e, const CryptoHash *key)
414 {
415   return (dir_tag(e) == DIR_MASK_TAG(key->slice32(2)));
416 }
417 
418 int
vol_in_phase_valid(Stripe * d,CacheDirEntry * e)419 vol_in_phase_valid(Stripe *d, CacheDirEntry *e)
420 {
421   return (dir_offset(e) - 1 < ((d->_meta[0][0].write_pos + d->agg_buf_pos - d->_start) / CACHE_BLOCK_SIZE));
422 }
423 
424 int
vol_out_of_phase_valid(Stripe * d,CacheDirEntry * e)425 vol_out_of_phase_valid(Stripe *d, CacheDirEntry *e)
426 {
427   return (dir_offset(e) - 1 >= ((d->_meta[0][0].agg_pos - d->_start) / CACHE_BLOCK_SIZE));
428 }
429 
430 bool
dir_valid(CacheDirEntry * _e)431 Stripe::dir_valid(CacheDirEntry *_e)
432 {
433   return (this->_meta[0][0].phase == dir_phase(_e) ? vol_in_phase_valid(this, _e) : vol_out_of_phase_valid(this, _e));
434 }
435 
436 Bytes
stripe_offset(CacheDirEntry * e)437 Stripe::stripe_offset(CacheDirEntry *e)
438 {
439   return this->_content + Bytes((dir_offset(e) * CACHE_BLOCK_SIZE) - CACHE_BLOCK_SIZE);
440 }
441 
442 int
dir_probe(CryptoHash * key,CacheDirEntry * result,CacheDirEntry ** last_collision)443 Stripe::dir_probe(CryptoHash *key, CacheDirEntry *result, CacheDirEntry **last_collision)
444 {
445   int segment = key->slice32(0) % this->_segments;
446   int bucket  = key->slice32(1) % this->_buckets;
447 
448   CacheDirEntry *seg = this->dir_segment(segment);
449   CacheDirEntry *e   = nullptr;
450   e                  = dir_bucket(bucket, seg);
451   char *stripe_buff2 = nullptr;
452   Doc *doc           = nullptr;
453   // TODO: collision craft is pending.. look at the main ATS code. Assuming no collision for now
454   if (dir_offset(e)) {
455     do {
456       if (dir_compare_tag(e, key)) {
457         if (dir_valid(e)) {
458           stripe_buff2 = static_cast<char *>(ats_memalign(ats_pagesize(), dir_approx_size(e)));
459           std::cout << "dir_probe hit: found seg: " << segment << " bucket: " << bucket << " offset: " << dir_offset(e)
460                     << "size: " << dir_approx_size(e) << std::endl;
461           break;
462         } else {
463           // let's skip deleting for now
464           // e = dir_delete_entry(e, p ,segment);
465           // continue;
466         }
467       }
468       e = next_dir(e, seg);
469 
470     } while (e);
471     if (e == nullptr) {
472       std::cout << "No directory entry found matching the URL key" << std::endl;
473       return 0;
474     }
475     int fd       = _span->_fd;
476     Bytes offset = stripe_offset(e);
477     int64_t size = dir_approx_size(e);
478     ssize_t n    = pread(fd, stripe_buff2, size, offset);
479     if (n < size) {
480       std::cout << "Failed to read content from the Stripe:" << strerror(errno) << std::endl;
481     }
482 
483     doc = reinterpret_cast<Doc *>(stripe_buff2);
484     std::string hdr(doc->hdr(), doc->hlen);
485 
486     std::string data_(doc->data(), doc->data_len());
487     std::cout << "DATA\n" << data_ << std::endl;
488   } else {
489     std::cout << "Not found in the Cache" << std::endl;
490   }
491   free(stripe_buff2);
492   return 0; // Why does this have a non-void return?
493 }
494 
495 CacheDirEntry *
dir_delete_entry(CacheDirEntry * e,CacheDirEntry * p,int s)496 Stripe::dir_delete_entry(CacheDirEntry *e, CacheDirEntry *p, int s)
497 {
498   CacheDirEntry *seg      = this->dir_segment(s);
499   int no                  = dir_next(e);
500   this->_meta[0][0].dirty = 1;
501   if (p) {
502     unsigned int fo = this->freelist[s];
503     unsigned int eo = dir_to_offset(e, seg);
504     dir_clear(e);
505     dir_set_next(p, no);
506     dir_set_next(e, fo);
507     if (fo) {
508       dir_set_prev(dir_from_offset(fo, seg), eo);
509     }
510     this->freelist[s] = eo;
511   } else {
512     CacheDirEntry *n = next_dir(e, seg);
513     if (n) {
514       dir_assign(e, n);
515       dir_delete_entry(n, e, s);
516       return e;
517     } else {
518       dir_clear(e);
519       return nullptr;
520     }
521   }
522   return dir_from_offset(no, seg);
523 }
524 
525 void
walk_all_buckets()526 Stripe::walk_all_buckets()
527 {
528   for (int s = 0; s < this->_segments; s++) {
529     if (walk_bucket_chain(s)) {
530       std::cout << "Loop present in Segment " << s << std::endl;
531     }
532   }
533 }
534 
535 bool
walk_bucket_chain(int s)536 Stripe::walk_bucket_chain(int s)
537 {
538   CacheDirEntry *seg = this->dir_segment(s);
539   std::bitset<65536> b_bitset;
540   b_bitset.reset();
541   for (int b = 0; b < this->_buckets; b++) {
542     CacheDirEntry *p = nullptr;
543     auto *dir_b      = dir_bucket(b, seg);
544     CacheDirEntry *e = dir_b;
545     int len          = 0;
546 
547     while (e) {
548       len++;
549       int i = dir_to_offset(e, seg);
550       if (b_bitset.test(i)) {
551         std::cout << "bit already set in "
552                   << "seg " << s << " bucket " << b << std::endl;
553       }
554       if (i > 0) { // i.e., not the first dir in the segment
555         b_bitset[i] = true;
556       }
557 
558 #if 1
559       if (!dir_valid(e) || !dir_offset(e)) {
560         // std::cout<<"dir_clean in segment "<<s<<" =>cleaning "<<e<<" tag"<<dir_tag(e)<<" boffset"<< dir_offset(e)<< " bucket:
561         // "<<dir_b<< " bucket len: "<<dir_bucket_length(dir_b, s)<<std::endl;
562         e = dir_delete_entry(e, p, s);
563         continue;
564       }
565 #endif
566       p = e;
567       e = next_dir(e, seg);
568     }
569     //    std::cout<<"dir len in this bucket "<<len<<std::endl;
570   }
571   return false;
572 }
573 
574 void
dir_free_entry(CacheDirEntry * e,int s)575 Stripe::dir_free_entry(CacheDirEntry *e, int s)
576 {
577   CacheDirEntry *seg = this->dir_segment(s);
578   unsigned int fo    = this->freelist[s];
579   unsigned int eo    = dir_to_offset(e, seg);
580   dir_set_next(e, fo);
581   if (fo) {
582     dir_set_prev(dir_from_offset(fo, seg), eo);
583   }
584   this->freelist[s] = eo;
585 }
586 
587 // adds all the directory entries
588 // in a segment to the segment freelist
589 void
dir_init_segment(int s)590 Stripe::dir_init_segment(int s)
591 {
592   this->freelist[s]  = 0;
593   CacheDirEntry *seg = this->dir_segment(s);
594   int l, b;
595   memset(seg, 0, SIZEOF_DIR * DIR_DEPTH * this->_buckets);
596   for (l = 1; l < DIR_DEPTH; l++) {
597     for (b = 0; b < this->_buckets; b++) {
598       CacheDirEntry *bucket = dir_bucket(b, seg);
599       this->dir_free_entry(dir_bucket_row(bucket, l), s);
600     }
601   }
602 }
603 
604 void
init_dir()605 Stripe::init_dir()
606 {
607   for (int s = 0; s < this->_segments; s++) {
608     this->freelist[s]  = 0;
609     CacheDirEntry *seg = this->dir_segment(s);
610     int l, b;
611     for (l = 1; l < DIR_DEPTH; l++) {
612       for (b = 0; b < this->_buckets; b++) {
613         CacheDirEntry *bucket = dir_bucket(b, seg);
614         this->dir_free_entry(dir_bucket_row(bucket, l), s);
615         // std::cout<<"freelist"<<this->freelist[s]<<std::endl;
616       }
617     }
618   }
619 }
620 
621 Errata
loadDir()622 Stripe::loadDir()
623 {
624   Errata zret;
625   int64_t dirlen = this->vol_dirlen();
626   char *raw_dir  = static_cast<char *>(ats_memalign(ats_pagesize(), dirlen));
627   dir            = reinterpret_cast<CacheDirEntry *>(raw_dir + this->vol_headerlen());
628   // read directory
629   ssize_t n = pread(this->_span->_fd, raw_dir, dirlen, this->_start);
630   if (n < dirlen) {
631     std::cout << "Failed to read Dir from stripe @" << this->hashText;
632   }
633   return zret;
634 }
635 //
636 // Cache Directory
637 //
638 
639 #if 0
640 // return value 1 means no loop
641 // zero indicates loop
642 int
643 dir_bucket_loop_check(CacheDirEntry *start_dir, CacheDirEntry *seg)
644 {
645   if (start_dir == nullptr) {
646     return 1;
647   }
648 
649   CacheDirEntry *p1 = start_dir;
650   CacheDirEntry *p2 = start_dir;
651 
652   while (p2) {
653     // p1 moves by one entry per iteration
654     assert(p1);
655     p1 = next_dir(p1, seg);
656     // p2 moves by two entries per iteration
657     p2 = next_dir(p2, seg);
658     if (p2) {
659       p2 = next_dir(p2, seg);
660     } else {
661       return 1;
662     }
663 
664     if (p2 == p1) {
665       return 0; // we have a loop
666     }
667   }
668   return 1;
669 }
670 #endif
671 
672 int
dir_freelist_length(int s)673 Stripe::dir_freelist_length(int s)
674 {
675   int free           = 0;
676   CacheDirEntry *seg = this->dir_segment(s);
677   CacheDirEntry *e   = dir_from_offset(this->freelist[s], seg);
678   if (this->check_loop(s)) {
679     return (DIR_DEPTH - 1) * this->_buckets;
680   }
681   while (e) {
682     free++;
683     e = next_dir(e, seg);
684   }
685   return free;
686 }
687 
688 int
check_loop(int s)689 Stripe::check_loop(int s)
690 {
691   // look for loop in the segment
692   // rewrite the freelist if loop is present
693   CacheDirEntry *seg = this->dir_segment(s);
694   CacheDirEntry *e   = dir_from_offset(this->freelist[s], seg);
695   std::bitset<65536> f_bitset;
696   f_bitset.reset();
697   while (e) {
698     int i = dir_next(e);
699     if (f_bitset.test(i)) {
700       // bit was set in a previous round so a loop is present
701       std::cout << "<check_loop> Loop present in Span" << this->_span->_path.string() << " Stripe: " << this->hashText
702                 << "Segment: " << s << std::endl;
703       this->dir_init_segment(s);
704       return 1;
705     }
706     f_bitset[i] = true;
707     e           = dir_from_offset(i, seg);
708   }
709 
710   return 0;
711 }
712 
713 int
compare_ushort(void const * a,void const * b)714 compare_ushort(void const *a, void const *b)
715 {
716   return *static_cast<unsigned short const *>(a) - *static_cast<unsigned short const *>(b);
717 }
718 
719 void
dir_check()720 Stripe::dir_check()
721 {
722   static int const SEGMENT_HISTOGRAM_WIDTH = 16;
723   int hist[SEGMENT_HISTOGRAM_WIDTH + 1]    = {0};
724   unsigned short chain_tag[MAX_ENTRIES_PER_SEGMENT];
725   int32_t chain_mark[MAX_ENTRIES_PER_SEGMENT];
726 
727   this->loadMeta();
728   this->loadDir();
729   //  uint64_t total_buckets = _segments * _buckets;
730   //  uint64_t total_entries = total_buckets * DIR_DEPTH;
731   int frag_demographics[1 << DIR_SIZE_WIDTH][DIR_BLOCK_SIZES];
732   int j;
733   int stale = 0, in_use = 0, empty = 0;
734   int free = 0, head = 0, buckets_in_use = 0;
735 
736   int max_chain_length = 0;
737   int64_t bytes_in_use = 0;
738   std::cout << "Stripe '[" << hashText << "]'" << std::endl;
739   std::cout << "  Directory Bytes: " << _segments * _buckets * SIZEOF_DIR << std::endl;
740   std::cout << "  Segments:  " << _segments << std::endl;
741   std::cout << "  Buckets per segment:  " << _buckets << std::endl;
742   std::cout << "  Entries:  " << _segments * _buckets * DIR_DEPTH << std::endl;
743   for (int s = 0; s < _segments; s++) {
744     CacheDirEntry *seg     = this->dir_segment(s);
745     int seg_chain_max      = 0;
746     int seg_empty          = 0;
747     int seg_in_use         = 0;
748     int seg_stale          = 0;
749     int seg_bytes_in_use   = 0;
750     int seg_dups           = 0;
751     int seg_buckets_in_use = 0;
752 
753     ink_zero(chain_tag);
754     memset(chain_mark, -1, sizeof(chain_mark));
755     for (int b = 0; b < _buckets; b++) {
756       CacheDirEntry *root = dir_bucket(b, seg);
757       int h               = 0;
758       int chain_idx       = 0;
759       int mark            = 0;
760       ++seg_buckets_in_use;
761       // walking through the directories
762       for (CacheDirEntry *e = root; e; e = next_dir(e, seg)) {
763         if (!dir_offset(e)) {
764           ++seg_empty;
765           --seg_buckets_in_use;
766           // this should only happen on the first dir in a bucket
767           assert(nullptr == next_dir(e, seg));
768           break;
769         } else {
770           int e_idx = e - seg;
771           ++h;
772           chain_tag[chain_idx++] = dir_tag(e);
773           if (chain_mark[e_idx] == mark) {
774             printf("    - Cycle of length %d detected for bucket %d\n", h, b);
775           } else if (chain_mark[e_idx] >= 0) {
776             printf("    - Entry %d is in chain %d and %d", e_idx, chain_mark[e_idx], mark);
777           } else {
778             chain_mark[e_idx] = mark;
779           }
780 
781           if (!dir_valid(e)) {
782             ++seg_stale;
783           } else {
784             uint64_t size = dir_approx_size(e);
785             if (dir_head(e)) {
786               ++head;
787             }
788             ++seg_in_use;
789             seg_bytes_in_use += size;
790             ++frag_demographics[dir_size(e)][dir_big(e)];
791           }
792         }
793         e = next_dir(e, seg);
794         if (!e) {
795           break;
796         }
797       }
798 
799       // Check for duplicates (identical tags in the same bucket).
800       if (h > 1) {
801         unsigned short last;
802         qsort(chain_tag, h, sizeof(chain_tag[0]), &compare_ushort);
803         last = chain_tag[0];
804         for (int k = 1; k < h; ++k) {
805           if (last == chain_tag[k]) {
806             ++seg_dups;
807           }
808           last = chain_tag[k];
809         }
810       }
811       ++hist[std::min(h, SEGMENT_HISTOGRAM_WIDTH)];
812       seg_chain_max = std::max(seg_chain_max, h);
813     }
814     int fl_size = dir_freelist_length(s);
815     in_use += seg_in_use;
816     empty += seg_empty;
817     stale += seg_stale;
818     free += fl_size;
819     buckets_in_use += seg_buckets_in_use;
820     max_chain_length = std::max(max_chain_length, seg_chain_max);
821     bytes_in_use += seg_bytes_in_use;
822 
823     printf("  - Segment-%d | Entries: used=%d stale=%d free=%d disk-bytes=%d Buckets: used=%d empty=%d max=%d avg=%.2f dups=%d\n",
824            s, seg_in_use, seg_stale, fl_size, seg_bytes_in_use, seg_buckets_in_use, seg_empty, seg_chain_max,
825            seg_buckets_in_use ? static_cast<float>(seg_in_use + seg_stale) / seg_buckets_in_use : 0.0, seg_dups);
826   }
827   //////////////////
828 
829   printf("  - Stripe | Entries: in-use=%d stale=%d free=%d Buckets: empty=%d max=%d avg=%.2f\n", in_use, stale, free, empty,
830          max_chain_length, buckets_in_use ? static_cast<float>(in_use + stale) / buckets_in_use : 0);
831 
832   printf("    Chain lengths:  ");
833   for (j = 0; j < SEGMENT_HISTOGRAM_WIDTH; ++j) {
834     printf(" %d=%d ", j, hist[j]);
835   }
836   printf(" %d>=%d\n", SEGMENT_HISTOGRAM_WIDTH, hist[SEGMENT_HISTOGRAM_WIDTH]);
837 
838   char tt[256];
839   printf("    Total Size:      %" PRIu64 "\n", static_cast<uint64_t>(_len.count()));
840   printf("    Bytes in Use:    %" PRIu64 " [%0.2f%%]\n", bytes_in_use, 100.0 * (static_cast<float>(bytes_in_use) / _len.count()));
841   printf("    Objects:         %d\n", head);
842   printf("    Average Size:    %" PRIu64 "\n", head ? (bytes_in_use / head) : 0);
843   printf("    Average Frags:   %.2f\n", head ? static_cast<float>(in_use) / head : 0);
844   printf("    Write Position:  %" PRIu64 "\n", _meta[0][0].write_pos - _content.count());
845   printf("    Wrap Count:      %d\n", _meta[0][0].cycle);
846   printf("    Phase:           %s\n", _meta[0][0].phase ? "true" : "false");
847   ctime_r(&_meta[0][0].create_time, tt);
848   tt[strlen(tt) - 1] = 0;
849   printf("    Sync Serial:     %u\n", _meta[0][0].sync_serial);
850   printf("    Write Serial:    %u\n", _meta[0][0].write_serial);
851   printf("    Create Time:     %s\n", tt);
852   printf("\n");
853   printf("  Fragment size demographics\n");
854   for (int b = 0; b < DIR_BLOCK_SIZES; ++b) {
855     int block_size = DIR_BLOCK_SIZE(b);
856     int s          = 0;
857     while (s < 1 << DIR_SIZE_WIDTH) {
858       for (int j = 0; j < 8; ++j, ++s) {
859         // The size markings are redundant. Low values (less than DIR_SHIFT_WIDTH) for larger
860         // base block sizes should never be used. Such entries should use the next smaller base block size.
861         if (b > 0 && s < 1 << DIR_BLOCK_SHIFT(1)) {
862           assert(frag_demographics[s][b] == 0);
863           continue;
864         }
865         printf(" %8d[%2d:%1d]:%06d", (s + 1) * block_size, s, b, frag_demographics[s][b]);
866       }
867       printf("\n");
868     }
869   }
870   printf("\n");
871   ////////////////
872 }
873 
874 Errata
loadMeta()875 Stripe::loadMeta()
876 {
877   // Read from disk in chunks of this size. This needs to be a multiple of both the
878   // store block size and the directory entry size so neither goes across read boundaries.
879   // Beyond that the value should be in the ~10MB range for what I guess is best performance
880   // vs. blocking production disk I/O on a live system.
881   constexpr static int64_t N = (1 << 8) * CacheStoreBlocks::SCALE * sizeof(CacheDirEntry);
882 
883   Errata zret;
884 
885   int fd = _span->_fd;
886   Bytes n;
887   bool found;
888   MemSpan<void> data; // The current view of the read buffer.
889   Bytes delta;
890   Bytes pos = _start;
891   // Avoid searching the entire span, because some of it must be content. Assume that AOS is more than 160
892   // which means at most 10/160 (1/16) of the span can be directory/header.
893   Bytes limit     = pos + _len / 16;
894   size_t io_align = _span->_geometry.blocksz;
895   StripeMeta const *meta;
896 
897   std::unique_ptr<char> bulk_buff;                      // Buffer for bulk reads.
898   static const size_t SBSIZE = CacheStoreBlocks::SCALE; // save some typing.
899   alignas(SBSIZE) char stripe_buff[SBSIZE];             // Use when reading a single stripe block.
900   alignas(SBSIZE) char stripe_buff2[SBSIZE];            // use to save the stripe freelist
901   if (io_align > SBSIZE) {
902     return Errata::Message(0, 1, "Cannot load stripe ", _idx, " on span ", _span->_path.string(),
903                            " because the I/O block alignment ", io_align, " is larger than the buffer alignment ", SBSIZE);
904   }
905 
906   _directory._start = pos;
907   // Header A must be at the start of the stripe block.
908   // Todo: really need to check pread() for failure.
909   ssize_t headerbyteCount = pread(fd, stripe_buff2, SBSIZE, pos);
910   n.assign(headerbyteCount);
911   data.assign(stripe_buff2, n);
912   meta = static_cast<StripeMeta *>(data.data());
913   // TODO:: We need to read more data at this point  to populate dir
914   if (this->validateMeta(meta)) {
915     delta              = Bytes(data.rebind<char>().data() - stripe_buff2);
916     _meta[A][HEAD]     = *meta;
917     _meta_pos[A][HEAD] = round_down(pos + Bytes(delta));
918     pos += round_up(SBSIZE);
919     _directory._skip = Bytes(SBSIZE); // first guess, updated in @c updateLiveData when the header length is computed.
920     // Search for Footer A. Nothing for it except to grub through the disk.
921     // The searched data is cached so it's available for directory parsing later if needed.
922     while (pos < limit) {
923       char *buff = static_cast<char *>(ats_memalign(io_align, N));
924       bulk_buff.reset(buff);
925       n.assign(pread(fd, buff, N, pos));
926       data.assign(buff, n);
927       found = this->probeMeta(data, &_meta[A][HEAD]);
928       if (found) {
929         ptrdiff_t diff     = data.rebind<char>().data() - buff;
930         _meta[A][FOOT]     = static_cast<StripeMeta *>(data.data())[0];
931         _meta_pos[A][FOOT] = round_down(pos + Bytes(diff));
932         // don't bother attaching block if the footer is at the start
933         if (diff > 0) {
934           _directory._clip = Bytes(N - diff);
935           _directory.append({bulk_buff.release(), N});
936         }
937         data.remove_prefix(SBSIZE); // skip footer for checking on B copy.
938         break;
939       } else {
940         _directory.append({bulk_buff.release(), N});
941         pos += round_up(N);
942       }
943     }
944   } else {
945     zret.push(0, 1, "Header A not found");
946   }
947   pos = _meta_pos[A][FOOT];
948   // Technically if Copy A is valid, Copy B is not needed. But at this point it's cheap to retrieve
949   // (as the exact offset is computable).
950   if (_meta_pos[A][FOOT] > 0) {
951     delta = _meta_pos[A][FOOT] - _meta_pos[A][HEAD];
952     // Header B should be immediately after Footer A. If at the end of the last read,
953     // do another read.
954     pos  = this->_start + Bytes(vol_dirlen());
955     meta = static_cast<StripeMeta *>(data.data());
956     if (this->validateMeta(meta)) {
957       _meta[B][HEAD]     = *meta;
958       _meta_pos[B][HEAD] = round_down(pos);
959 
960       // Footer B must be at the same relative offset to Header B as Footer A -> Header A.
961       pos += delta;
962       n = Bytes(pread(fd, stripe_buff, ts::CacheStoreBlocks::SCALE, pos));
963       data.assign(stripe_buff, n);
964       meta = static_cast<StripeMeta *>(data.data());
965       if (this->validateMeta(meta)) {
966         _meta[B][FOOT]     = *meta;
967         _meta_pos[B][FOOT] = round_down(pos);
968       }
969     }
970   }
971 
972   if (_meta_pos[A][FOOT] > 0) {
973     if (_meta[A][HEAD].sync_serial == _meta[A][FOOT].sync_serial &&
974         (0 == _meta_pos[B][FOOT] || _meta[B][HEAD].sync_serial != _meta[B][FOOT].sync_serial ||
975          _meta[A][HEAD].sync_serial >= _meta[B][HEAD].sync_serial)) {
976       this->updateLiveData(A);
977     } else if (_meta_pos[B][FOOT] > 0 && _meta[B][HEAD].sync_serial == _meta[B][FOOT].sync_serial) {
978       this->updateLiveData(B);
979     } else {
980       zret.push(0, 1, "Invalid stripe data - candidates found but sync serial data not valid. ", _meta[A][HEAD].sync_serial, ":",
981                 _meta[A][FOOT].sync_serial, ":", _meta[B][HEAD].sync_serial, ":", _meta[B][FOOT].sync_serial);
982     }
983   }
984 
985   n.assign(headerbyteCount);
986   data.assign(stripe_buff2, n);
987   meta = static_cast<StripeMeta *>(data.data());
988   // copy freelist
989   freelist = static_cast<uint16_t *>(malloc(_segments * sizeof(uint16_t)));
990   for (int i = 0; i < _segments; i++) {
991     freelist[i] = meta->freelist[i];
992   }
993 
994   if (!zret) {
995     _directory.clear();
996   }
997   return zret;
998 }
999 
1000 } // namespace ct
1001