//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
#ifndef ROCKSDB_LITE

#include "utilities/persistent_cache/block_cache_tier_file.h"

#ifndef OS_WIN
#include <unistd.h>
#endif
#include <functional>
#include <memory>
#include <vector>

#include "env/composite_env_wrapper.h"
#include "logging/logging.h"
#include "port/port.h"
#include "util/crc32c.h"

namespace ROCKSDB_NAMESPACE {

//
// File creation factories
//
Status NewWritableCacheFile(Env* const env, const std::string& filepath,
                            std::unique_ptr<WritableFile>* file,
                            const bool use_direct_writes = false) {
  EnvOptions opt;
  opt.use_direct_writes = use_direct_writes;
  Status s = env->NewWritableFile(filepath, file, opt);
  return s;
}

Status NewRandomAccessCacheFile(Env* const env, const std::string& filepath,
                                std::unique_ptr<RandomAccessFile>* file,
                                const bool use_direct_reads = true) {
  assert(env);

  EnvOptions opt;
  opt.use_direct_reads = use_direct_reads;
  Status s = env->NewRandomAccessFile(filepath, file, opt);
  return s;
}

//
// BlockCacheFile
//
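// Remove the backing file from disk, first recording its size in *size.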
Status BlockCacheFile::Delete(uint64_t* size) {
  assert(env_);

  Status status = env_->GetFileSize(Path(), size);
  if (!status.ok()) {
    return status;
  }
  return env_->DeleteFile(Path());
}

//
// CacheRecord
//
// Cache record represents the record on disk
//
// +--------+---------+----------+------------+---------------+-------------+
// | magic  | crc     | key size | value size | key data      | value data  |
// +--------+---------+----------+------------+---------------+-------------+
// <-- 4 --><-- 4  --><-- 4   --><-- 4     --><-- key size  --><-- v-size -->
//
struct CacheRecordHeader {
  CacheRecordHeader()
      : magic_(0), crc_(0), key_size_(0), val_size_(0) {}
  CacheRecordHeader(const uint32_t magic, const uint32_t key_size,
                    const uint32_t val_size)
      : magic_(magic), crc_(0), key_size_(key_size), val_size_(val_size) {}

  uint32_t magic_;
  uint32_t crc_;
  uint32_t key_size_;
  uint32_t val_size_;
};

struct CacheRecord {
  CacheRecord() {}
  CacheRecord(const Slice& key, const Slice& val)
      : hdr_(MAGIC, static_cast<uint32_t>(key.size()),
             static_cast<uint32_t>(val.size())),
        key_(key),
        val_(val) {
    hdr_.crc_ = ComputeCRC();
  }

  uint32_t ComputeCRC() const;
  bool Serialize(std::vector<CacheWriteBuffer*>* bufs, size_t* woff);
  bool Deserialize(const Slice& buf);

  static uint32_t CalcSize(const Slice& key, const Slice& val) {
    return static_cast<uint32_t>(sizeof(CacheRecordHeader) + key.size() +
                                 val.size());
  }

  static const uint32_t MAGIC = 0xfefa;

  bool Append(std::vector<CacheWriteBuffer*>* bufs, size_t* woff,
              const char* data, const size_t size);

  CacheRecordHeader hdr_;
  Slice key_;
  Slice val_;
};

static_assert(sizeof(CacheRecordHeader) == 16,
              "CacheRecordHeader is not aligned");

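// The CRC covers the header (with its crc_ field zeroed out), followed by
// the key and value bytes.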
uint32_t CacheRecord::ComputeCRC() const {
  uint32_t crc = 0;
  CacheRecordHeader tmp = hdr_;
  tmp.crc_ = 0;
  crc = crc32c::Extend(crc, reinterpret_cast<const char*>(&tmp), sizeof(tmp));
  crc = crc32c::Extend(crc, reinterpret_cast<const char*>(key_.data()),
                       key_.size());
  crc = crc32c::Extend(crc, reinterpret_cast<const char*>(val_.data()),
                       val_.size());
  return crc;
}

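// Serialize the record (header, key, value) into the write buffers, starting
// at buffer index *woff. Returns false if the buffers run out of space.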
bool CacheRecord::Serialize(std::vector<CacheWriteBuffer*>* bufs,
                            size_t* woff) {
  assert(bufs->size());
  return Append(bufs, woff, reinterpret_cast<const char*>(&hdr_),
                sizeof(hdr_)) &&
         Append(bufs, woff, reinterpret_cast<const char*>(key_.data()),
                key_.size()) &&
         Append(bufs, woff, reinterpret_cast<const char*>(val_.data()),
                val_.size());
}

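// Copy `data` into the write buffers, spilling across buffer boundaries as
// needed and advancing *woff past every buffer that fills up.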
bool CacheRecord::Append(std::vector<CacheWriteBuffer*>* bufs, size_t* woff,
                         const char* data, const size_t data_size) {
  assert(*woff < bufs->size());

  const char* p = data;
  size_t size = data_size;

  while (size && *woff < bufs->size()) {
    CacheWriteBuffer* buf = (*bufs)[*woff];
    const size_t free = buf->Free();
    if (size <= free) {
      buf->Append(p, size);
      size = 0;
    } else {
      buf->Append(p, free);
      p += free;
      size -= free;
      assert(!buf->Free());
      assert(buf->Used() == buf->Capacity());
    }

    if (!buf->Free()) {
      *woff += 1;
    }
  }

  assert(!size);

  return !size;
}

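// Reconstruct a record from a contiguous byte range, validating the sizes,
// the magic number and the checksum. key_/val_ point into `data` on success.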
bool CacheRecord::Deserialize(const Slice& data) {
  assert(data.size() >= sizeof(CacheRecordHeader));
  if (data.size() < sizeof(CacheRecordHeader)) {
    return false;
  }

  memcpy(&hdr_, data.data(), sizeof(hdr_));

  assert(hdr_.key_size_ + hdr_.val_size_ + sizeof(hdr_) == data.size());
  if (hdr_.key_size_ + hdr_.val_size_ + sizeof(hdr_) != data.size()) {
    return false;
  }

  key_ = Slice(data.data_ + sizeof(hdr_), hdr_.key_size_);
  val_ = Slice(key_.data_ + hdr_.key_size_, hdr_.val_size_);

  if (!(hdr_.magic_ == MAGIC && ComputeCRC() == hdr_.crc_)) {
    fprintf(stderr, "** magic %d ** \n", hdr_.magic_);
    fprintf(stderr, "** key_size %d ** \n", hdr_.key_size_);
    fprintf(stderr, "** val_size %d ** \n", hdr_.val_size_);
    fprintf(stderr, "** key %s ** \n", key_.ToString().c_str());
    fprintf(stderr, "** val %s ** \n", val_.ToString().c_str());
    for (size_t i = 0; i < hdr_.val_size_; ++i) {
      fprintf(stderr, "%d.", (uint8_t)val_.data()[i]);
    }
    fprintf(stderr, "\n** cksum %d != %d **", hdr_.crc_, ComputeCRC());
  }

  assert(hdr_.magic_ == MAGIC && ComputeCRC() == hdr_.crc_);
  return hdr_.magic_ == MAGIC && ComputeCRC() == hdr_.crc_;
}

//
// RandomAccessFile
//

bool RandomAccessCacheFile::Open(const bool enable_direct_reads) {
  WriteLock _(&rwlock_);
  return OpenImpl(enable_direct_reads);
}

bool RandomAccessCacheFile::OpenImpl(const bool enable_direct_reads) {
  rwlock_.AssertHeld();

  ROCKS_LOG_DEBUG(log_, "Opening cache file %s", Path().c_str());

  std::unique_ptr<RandomAccessFile> file;
  Status status =
      NewRandomAccessCacheFile(env_, Path(), &file, enable_direct_reads);
  if (!status.ok()) {
    Error(log_, "Error opening random access file %s. %s", Path().c_str(),
          status.ToString().c_str());
    return false;
  }
  freader_.reset(new RandomAccessFileReader(
      NewLegacyRandomAccessFileWrapper(file), Path(), env_));

  return true;
}

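// Read the record stored at `lba` into `scratch` and parse it; on success,
// *key and *val point into `scratch`.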
bool RandomAccessCacheFile::Read(const LBA& lba, Slice* key, Slice* val,
                                 char* scratch) {
  ReadLock _(&rwlock_);

  assert(lba.cache_id_ == cache_id_);

  if (!freader_) {
    return false;
  }

  Slice result;
  Status s = freader_->Read(lba.off_, lba.size_, &result, scratch);
  if (!s.ok()) {
    Error(log_, "Error reading from file %s. %s", Path().c_str(),
          s.ToString().c_str());
    return false;
  }

  assert(result.data() == scratch);

  return ParseRec(lba, key, val, scratch);
}

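// Deserialize the raw bytes in `scratch` into key/value slices that point
// back into `scratch`.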
bool RandomAccessCacheFile::ParseRec(const LBA& lba, Slice* key, Slice* val,
                                     char* scratch) {
  Slice data(scratch, lba.size_);

  CacheRecord rec;
  if (!rec.Deserialize(data)) {
    assert(!"Error deserializing data");
    Error(log_, "Error de-serializing record from file %s off %d",
          Path().c_str(), lba.off_);
    return false;
  }

  *key = Slice(rec.key_);
  *val = Slice(rec.val_);

  return true;
}

//
// WriteableCacheFile
//

WriteableCacheFile::~WriteableCacheFile() {
  WriteLock _(&rwlock_);
  if (!eof_) {
    // This file was never flushed. We give priority to shutdown since this is
    // a cache.
    // TODO(krad): Figure out a way to flush the pending data
    if (file_) {
      assert(refs_ == 1);
      --refs_;
    }
  }
  assert(!refs_);
  ClearBuffers();
}

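// Create the cache file for writing. An already-existing file at this path is
// only logged as a warning before the file is (re)created.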
bool WriteableCacheFile::Create(const bool /*enable_direct_writes*/,
                                const bool enable_direct_reads) {
  WriteLock _(&rwlock_);

  enable_direct_reads_ = enable_direct_reads;

  ROCKS_LOG_DEBUG(log_, "Creating new cache %s (max size is %d B)",
                  Path().c_str(), max_size_);

  assert(env_);

  Status s = env_->FileExists(Path());
  if (s.ok()) {
    ROCKS_LOG_WARN(log_, "File %s already exists. %s", Path().c_str(),
                   s.ToString().c_str());
  }

  s = NewWritableCacheFile(env_, Path(), &file_);
  if (!s.ok()) {
    ROCKS_LOG_WARN(log_, "Unable to create file %s. %s", Path().c_str(),
                   s.ToString().c_str());
    return false;
  }

  assert(!refs_);
  ++refs_;

  return true;
}

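// Serialize (key, val) into the in-memory write buffers and report the
// record's future on-disk location through *lba. Filled buffers are handed
// off for asynchronous flushing.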
bool WriteableCacheFile::Append(const Slice& key, const Slice& val, LBA* lba) {
  WriteLock _(&rwlock_);

  if (eof_) {
    // We can't append since the file is full
    return false;
  }

  // compute the space required to store the (key, val)
  uint32_t rec_size = CacheRecord::CalcSize(key, val);

  if (!ExpandBuffer(rec_size)) {
    // unable to expand the buffer
    ROCKS_LOG_DEBUG(log_, "Error expanding buffers. size=%d", rec_size);
    return false;
  }

  lba->cache_id_ = cache_id_;
  lba->off_ = disk_woff_;
  lba->size_ = rec_size;

  CacheRecord rec(key, val);
  if (!rec.Serialize(&bufs_, &buf_woff_)) {
    // unexpected error: unable to serialize the data
    assert(!"Error serializing record");
    return false;
  }

  disk_woff_ += rec_size;
  eof_ = disk_woff_ >= max_size_;

  // dispatch buffer for flush
  DispatchBuffer();

  return true;
}

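// Make sure the write buffers have at least `size` bytes of free space,
// allocating additional buffers from the allocator when they do not.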
bool WriteableCacheFile::ExpandBuffer(const size_t size) {
  rwlock_.AssertHeld();
  assert(!eof_);

  // determine if there is enough space
  size_t free = 0;  // compute the free space left in buffer
  for (size_t i = buf_woff_; i < bufs_.size(); ++i) {
    free += bufs_[i]->Free();
    if (size <= free) {
      // we have enough space in the buffer
      return true;
    }
  }

  // expand the buffer until there is enough space to write `size` bytes
  assert(free < size);
  assert(alloc_);

  while (free < size) {
    CacheWriteBuffer* const buf = alloc_->Allocate();
    if (!buf) {
      ROCKS_LOG_DEBUG(log_, "Unable to allocate buffers");
      return false;
    }

    size_ += static_cast<uint32_t>(buf->Free());
    free += buf->Free();
    bufs_.push_back(buf);
  }

  assert(free >= size);
  return true;
}

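// Hand the next full buffer (or, at EOF, the final partially filled buffer)
// to the writer for an asynchronous flush. Only one IO is kept in flight at
// a time; BufferWriteDone() chains the next dispatch.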
void WriteableCacheFile::DispatchBuffer() {
  rwlock_.AssertHeld();

  assert(bufs_.size());
  assert(buf_doff_ <= buf_woff_);
  assert(buf_woff_ <= bufs_.size());

  if (pending_ios_) {
    return;
  }

  if (!eof_ && buf_doff_ == buf_woff_) {
    // dispatch buffer is pointing to write buffer and we haven't hit eof
    return;
  }

  assert(eof_ || buf_doff_ < buf_woff_);
  assert(buf_doff_ < bufs_.size());
  assert(file_);
  assert(alloc_);

  auto* buf = bufs_[buf_doff_];
  const uint64_t file_off = buf_doff_ * alloc_->BufferSize();

  assert(!buf->Free() ||
         (eof_ && buf_doff_ == buf_woff_ && buf_woff_ < bufs_.size()));
  // We have reached the end of the file, and there may be space left in the
  // last buffer; pad it with zeros for direct IO.
  buf->FillTrailingZeros();

  assert(buf->Used() % kFileAlignmentSize == 0);

  writer_->Write(file_.get(), buf, file_off,
                 std::bind(&WriteableCacheFile::BufferWriteDone, this));
  pending_ios_++;
  buf_doff_++;
}

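// Completion callback invoked by the writer once a buffer has been flushed.
// It chains the next dispatch and, once everything is on disk, switches the
// file over to read mode.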
void WriteableCacheFile::BufferWriteDone() {
  WriteLock _(&rwlock_);

  assert(bufs_.size());

  pending_ios_--;

  if (buf_doff_ < bufs_.size()) {
    DispatchBuffer();
  }

  if (eof_ && buf_doff_ >= bufs_.size() && !pending_ios_) {
    // end-of-file reached, move to read mode
    CloseAndOpenForReading();
  }
}

void WriteableCacheFile::CloseAndOpenForReading() {
  // Our env abstraction does not allow reading from a file opened for
  // appending. We need to close the file and re-open it for reading.
  Close();
  RandomAccessCacheFile::OpenImpl(enable_direct_reads_);
}

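// Serve a read for data that has not been flushed to disk yet by copying it
// out of the in-memory write buffers.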
bool WriteableCacheFile::ReadBuffer(const LBA& lba, Slice* key, Slice* block,
                                    char* scratch) {
  rwlock_.AssertHeld();

  if (!ReadBuffer(lba, scratch)) {
    Error(log_, "Error reading from buffer. cache=%d off=%d", cache_id_,
          lba.off_);
    return false;
  }

  return ParseRec(lba, key, block, scratch);
}

bool WriteableCacheFile::ReadBuffer(const LBA& lba, char* data) {
  rwlock_.AssertHeld();

  assert(lba.off_ < disk_woff_);
  assert(alloc_);

  // We read from the buffers as if reading from a flat file. The list of
  // buffers is treated as a contiguous stream of data.

  char* tmp = data;
  size_t pending_nbytes = lba.size_;
  // start buffer
  size_t start_idx = lba.off_ / alloc_->BufferSize();
  // offset into the start buffer
  size_t start_off = lba.off_ % alloc_->BufferSize();

  assert(start_idx <= buf_woff_);

  for (size_t i = start_idx; pending_nbytes && i < bufs_.size(); ++i) {
    assert(i <= buf_woff_);
    auto* buf = bufs_[i];
    assert(i == buf_woff_ || !buf->Free());
    // bytes to copy from this buffer
    size_t nbytes = pending_nbytes > (buf->Used() - start_off)
                        ? (buf->Used() - start_off)
                        : pending_nbytes;
    memcpy(tmp, buf->Data() + start_off, nbytes);

    // bytes left to be copied
    pending_nbytes -= nbytes;
    start_off = 0;
    tmp += nbytes;
  }

  assert(!pending_nbytes);
  if (pending_nbytes) {
    return false;
  }

  assert(tmp == data + lba.size_);
  return true;
}

void WriteableCacheFile::Close() {
  rwlock_.AssertHeld();

  assert(size_ >= max_size_);
  assert(disk_woff_ >= max_size_);
  assert(buf_doff_ == bufs_.size());
  assert(bufs_.size() - buf_woff_ <= 1);
  assert(!pending_ios_);

  Info(log_, "Closing file %s. size=%d written=%d", Path().c_str(), size_,
       disk_woff_);

  ClearBuffers();
  file_.reset();

  assert(refs_);
  --refs_;
}

void WriteableCacheFile::ClearBuffers() {
  assert(alloc_);

  for (size_t i = 0; i < bufs_.size(); ++i) {
    alloc_->Deallocate(bufs_[i]);
  }

  bufs_.clear();
}

//
// ThreadedWriter implementation
//
ThreadedWriter::ThreadedWriter(PersistentCacheTier* const cache,
                               const size_t qdepth, const size_t io_size)
    : Writer(cache), io_size_(io_size) {
  for (size_t i = 0; i < qdepth; ++i) {
    port::Thread th(&ThreadedWriter::ThreadMain, this);
    threads_.push_back(std::move(th));
  }
}

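// Push one exit marker per worker thread, then join them all.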
void ThreadedWriter::Stop() {
  // notify all threads to exit
  for (size_t i = 0; i < threads_.size(); ++i) {
    q_.Push(IO(/*signal=*/true));
  }

  // wait for all threads to exit
  for (auto& th : threads_) {
    th.join();
    assert(!th.joinable());
  }
  threads_.clear();
}

void ThreadedWriter::Write(WritableFile* const file, CacheWriteBuffer* buf,
                           const uint64_t file_off,
                           const std::function<void()> callback) {
  q_.Push(IO(file, buf, file_off, callback));
}

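// Worker loop: pop the next IO, reserve space in the cache (retrying while
// the cache is full), write the buffer to the file, and then invoke the
// completion callback.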
void ThreadedWriter::ThreadMain() {
  while (true) {
    // Fetch the IO to process
    IO io(q_.Pop());
    if (io.signal_) {
      // this is the signal to exit
      break;
    }

    // Reserve space for writing the buffer
    while (!cache_->Reserve(io.buf_->Used())) {
      // We can fail to reserve space if every file in the system
      // is currently being accessed
      /* sleep override */
      Env::Default()->SleepForMicroseconds(1000000);
    }

    DispatchIO(io);

    io.callback_();
  }
}

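// Write the buffer to the file in io_size_ sized chunks. Note this assumes
// buf->Used() is a multiple of io_size_ (buffers are zero-padded to the
// alignment size before dispatch); otherwise the final Slice would read past
// the end of the buffer.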
void ThreadedWriter::DispatchIO(const IO& io) {
  size_t written = 0;
  while (written < io.buf_->Used()) {
    Slice data(io.buf_->Data() + written, io_size_);
    Status s = io.file_->Append(data);
    assert(s.ok());
    if (!s.ok()) {
      // That is a definite IO error to the device. There is not much we can
      // do but ignore the failure. This can lead to corruption of data on
      // disk, but the cache will skip the corrupted record while reading.
      fprintf(stderr, "Error writing data to file. %s\n", s.ToString().c_str());
    }
    written += io_size_;
  }
}

}  // namespace ROCKSDB_NAMESPACE

#endif  // ifndef ROCKSDB_LITE