// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#ifndef ROCKSDB_LITE

#include "utilities/persistent_cache/block_cache_tier_file.h"

#ifndef OS_WIN
#include <unistd.h>
#endif
#include <cstring>
#include <functional>
#include <memory>
#include <vector>

#include "env/composite_env_wrapper.h"
#include "logging/logging.h"
#include "port/port.h"
#include "util/crc32c.h"

namespace ROCKSDB_NAMESPACE {

//
// File creation factories
//
Status NewWritableCacheFile(Env* const env, const std::string& filepath,
                            std::unique_ptr<WritableFile>* file,
                            const bool use_direct_writes = false) {
  EnvOptions opt;
  opt.use_direct_writes = use_direct_writes;
  Status s = env->NewWritableFile(filepath, file, opt);
  return s;
}

Status NewRandomAccessCacheFile(Env* const env, const std::string& filepath,
                                std::unique_ptr<RandomAccessFile>* file,
                                const bool use_direct_reads = true) {
  assert(env);

  EnvOptions opt;
  opt.use_direct_reads = use_direct_reads;
  Status s = env->NewRandomAccessFile(filepath, file, opt);
  return s;
}

//
// BlockCacheFile
//
Status BlockCacheFile::Delete(uint64_t* size) {
  assert(env_);

  Status status = env_->GetFileSize(Path(), size);
  if (!status.ok()) {
    return status;
  }
  return env_->DeleteFile(Path());
}

//
// CacheRecord
//
// Cache record represents the record on disk
//
// +--------+---------+----------+------------+---------------+-------------+
// | magic  | crc     | key size | value size | key data      | value data  |
// +--------+---------+----------+------------+---------------+-------------+
// <-- 4 --><--  4  --><--  4  --><--   4   --><-- key size --><-- v-size -->
//
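// For example (illustrative sizes): a record holding a 12-byte key and a
// 4096-byte value occupies 16 (header) + 12 + 4096 = 4124 bytes on disk.
//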
struct CacheRecordHeader {
  CacheRecordHeader()
      : magic_(0), crc_(0), key_size_(0), val_size_(0) {}
  CacheRecordHeader(const uint32_t magic, const uint32_t key_size,
                    const uint32_t val_size)
      : magic_(magic), crc_(0), key_size_(key_size), val_size_(val_size) {}

  uint32_t magic_;
  uint32_t crc_;
  uint32_t key_size_;
  uint32_t val_size_;
};

struct CacheRecord {
  CacheRecord() {}
  CacheRecord(const Slice& key, const Slice& val)
      : hdr_(MAGIC, static_cast<uint32_t>(key.size()),
             static_cast<uint32_t>(val.size())),
        key_(key),
        val_(val) {
    hdr_.crc_ = ComputeCRC();
  }

  uint32_t ComputeCRC() const;
  bool Serialize(std::vector<CacheWriteBuffer*>* bufs, size_t* woff);
  bool Deserialize(const Slice& buf);

  static uint32_t CalcSize(const Slice& key, const Slice& val) {
    return static_cast<uint32_t>(sizeof(CacheRecordHeader) + key.size() +
                                 val.size());
  }

  static const uint32_t MAGIC = 0xfefa;

  bool Append(std::vector<CacheWriteBuffer*>* bufs, size_t* woff,
              const char* data, const size_t size);

  CacheRecordHeader hdr_;
  Slice key_;
  Slice val_;
};

static_assert(sizeof(CacheRecordHeader) == 16,
              "CacheRecordHeader is not aligned");

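// The CRC covers the header with its crc_ field zeroed, followed by the key
// bytes and then the value bytes, so the stored checksum protects every field
// of the on-disk record except itself.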
uint32_t CacheRecord::ComputeCRC() const {
  uint32_t crc = 0;
  CacheRecordHeader tmp = hdr_;
  tmp.crc_ = 0;
  crc = crc32c::Extend(crc, reinterpret_cast<const char*>(&tmp), sizeof(tmp));
  crc = crc32c::Extend(crc, reinterpret_cast<const char*>(key_.data()),
                       key_.size());
  crc = crc32c::Extend(crc, reinterpret_cast<const char*>(val_.data()),
                       val_.size());
  return crc;
}

bool CacheRecord::Serialize(std::vector<CacheWriteBuffer*>* bufs,
                            size_t* woff) {
  assert(bufs->size());
  return Append(bufs, woff, reinterpret_cast<const char*>(&hdr_),
                sizeof(hdr_)) &&
         Append(bufs, woff, reinterpret_cast<const char*>(key_.data()),
                key_.size()) &&
         Append(bufs, woff, reinterpret_cast<const char*>(val_.data()),
                val_.size());
}

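// Append copies `size` bytes into the write buffers starting at index *woff,
// spilling into subsequent buffers as each one fills and advancing *woff past
// every buffer it exhausts. It returns false only if the buffers run out
// before all bytes are copied.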
bool CacheRecord::Append(std::vector<CacheWriteBuffer*>* bufs, size_t* woff,
                         const char* data, const size_t data_size) {
  assert(*woff < bufs->size());

  const char* p = data;
  size_t size = data_size;

  while (size && *woff < bufs->size()) {
    CacheWriteBuffer* buf = (*bufs)[*woff];
    const size_t free = buf->Free();
    if (size <= free) {
      buf->Append(p, size);
      size = 0;
    } else {
      buf->Append(p, free);
      p += free;
      size -= free;
      assert(!buf->Free());
      assert(buf->Used() == buf->Capacity());
    }

    if (!buf->Free()) {
      *woff += 1;
    }
  }

  assert(!size);

  return !size;
}

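// Deserialize points key_ and val_ directly into `data` instead of copying,
// so the source buffer must outlive any use of the parsed record.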
bool CacheRecord::Deserialize(const Slice& data) {
  assert(data.size() >= sizeof(CacheRecordHeader));
  if (data.size() < sizeof(CacheRecordHeader)) {
    return false;
  }

  memcpy(&hdr_, data.data(), sizeof(hdr_));

  assert(hdr_.key_size_ + hdr_.val_size_ + sizeof(hdr_) == data.size());
  if (hdr_.key_size_ + hdr_.val_size_ + sizeof(hdr_) != data.size()) {
    return false;
  }

  key_ = Slice(data.data_ + sizeof(hdr_), hdr_.key_size_);
  val_ = Slice(key_.data_ + hdr_.key_size_, hdr_.val_size_);

  if (!(hdr_.magic_ == MAGIC && ComputeCRC() == hdr_.crc_)) {
    fprintf(stderr, "** magic %d ** \n", hdr_.magic_);
    fprintf(stderr, "** key_size %d ** \n", hdr_.key_size_);
    fprintf(stderr, "** val_size %d ** \n", hdr_.val_size_);
    fprintf(stderr, "** key %s ** \n", key_.ToString().c_str());
    fprintf(stderr, "** val %s ** \n", val_.ToString().c_str());
    for (size_t i = 0; i < hdr_.val_size_; ++i) {
      fprintf(stderr, "%d.", (uint8_t)val_.data()[i]);
    }
    fprintf(stderr, "\n** cksum %d != %d **", hdr_.crc_, ComputeCRC());
  }

  assert(hdr_.magic_ == MAGIC && ComputeCRC() == hdr_.crc_);
  return hdr_.magic_ == MAGIC && ComputeCRC() == hdr_.crc_;
}

//
// RandomAccessCacheFile
//

bool RandomAccessCacheFile::Open(const bool enable_direct_reads) {
  WriteLock _(&rwlock_);
  return OpenImpl(enable_direct_reads);
}

bool RandomAccessCacheFile::OpenImpl(const bool enable_direct_reads) {
  rwlock_.AssertHeld();

  ROCKS_LOG_DEBUG(log_, "Opening cache file %s", Path().c_str());

  std::unique_ptr<RandomAccessFile> file;
  Status status =
      NewRandomAccessCacheFile(env_, Path(), &file, enable_direct_reads);
  if (!status.ok()) {
    Error(log_, "Error opening random access file %s. %s", Path().c_str(),
          status.ToString().c_str());
    return false;
  }
  freader_.reset(new RandomAccessFileReader(
      NewLegacyRandomAccessFileWrapper(file), Path(), env_));

  return true;
}

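// Read fills `scratch` (which the caller must size to at least lba.size_
// bytes) with the raw record, then points *key and *val into that buffer.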
bool RandomAccessCacheFile::Read(const LBA& lba, Slice* key, Slice* val,
                                 char* scratch) {
  ReadLock _(&rwlock_);

  assert(lba.cache_id_ == cache_id_);

  if (!freader_) {
    return false;
  }

  Slice result;
  Status s = freader_->Read(lba.off_, lba.size_, &result, scratch);
  if (!s.ok()) {
    Error(log_, "Error reading from file %s. %s", Path().c_str(),
          s.ToString().c_str());
    return false;
  }

  assert(result.data() == scratch);

  return ParseRec(lba, key, val, scratch);
}

bool RandomAccessCacheFile::ParseRec(const LBA& lba, Slice* key, Slice* val,
                                     char* scratch) {
  Slice data(scratch, lba.size_);

  CacheRecord rec;
  if (!rec.Deserialize(data)) {
    assert(!"Error deserializing data");
    Error(log_, "Error de-serializing record from file %s off %d",
          Path().c_str(), lba.off_);
    return false;
  }

  *key = Slice(rec.key_);
  *val = Slice(rec.val_);

  return true;
}

//
// WriteableCacheFile
//

WriteableCacheFile::~WriteableCacheFile() {
  WriteLock _(&rwlock_);
  if (!eof_) {
    // This file was never flushed. We give priority to shutdown since this is
    // a cache.
    // TODO(krad): Figure out a way to flush the pending data
    if (file_) {
      assert(refs_ == 1);
      --refs_;
    }
  }
  assert(!refs_);
  ClearBuffers();
}

bool WriteableCacheFile::Create(const bool /*enable_direct_writes*/,
                                const bool enable_direct_reads) {
  WriteLock _(&rwlock_);

  enable_direct_reads_ = enable_direct_reads;

  ROCKS_LOG_DEBUG(log_, "Creating new cache %s (max size is %d B)",
                  Path().c_str(), max_size_);

  assert(env_);

  Status s = env_->FileExists(Path());
  if (s.ok()) {
    ROCKS_LOG_WARN(log_, "File %s already exists. %s", Path().c_str(),
                   s.ToString().c_str());
  }

  s = NewWritableCacheFile(env_, Path(), &file_);
  if (!s.ok()) {
    ROCKS_LOG_WARN(log_, "Unable to create file %s. %s", Path().c_str(),
                   s.ToString().c_str());
    return false;
  }

  assert(!refs_);
  ++refs_;

  return true;
}

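// Append reserves an LBA for the record, serializes it into the in-memory
// write buffers, and dispatches any filled buffers for asynchronous flush.
// It fails once the file has reached max_size_ (eof_) or when no more
// buffers can be allocated.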
bool WriteableCacheFile::Append(const Slice& key, const Slice& val, LBA* lba) {
  WriteLock _(&rwlock_);

  if (eof_) {
    // We can't append since the file is full
    return false;
  }

  // estimate the space required to store the (key, val)
  uint32_t rec_size = CacheRecord::CalcSize(key, val);

  if (!ExpandBuffer(rec_size)) {
    // unable to expand the buffer
    ROCKS_LOG_DEBUG(log_, "Error expanding buffers. size=%d", rec_size);
    return false;
  }

  lba->cache_id_ = cache_id_;
  lba->off_ = disk_woff_;
  lba->size_ = rec_size;

  CacheRecord rec(key, val);
  if (!rec.Serialize(&bufs_, &buf_woff_)) {
    // unexpected error: unable to serialize the data
    assert(!"Error serializing record");
    return false;
  }

  disk_woff_ += rec_size;
  eof_ = disk_woff_ >= max_size_;

  // dispatch buffer for flush
  DispatchBuffer();

  return true;
}

bool WriteableCacheFile::ExpandBuffer(const size_t size) {
  rwlock_.AssertHeld();
  assert(!eof_);

  // determine if there is enough space
  size_t free = 0;  // compute the free space left in buffer
  for (size_t i = buf_woff_; i < bufs_.size(); ++i) {
    free += bufs_[i]->Free();
    if (size <= free) {
      // we have enough space in the buffer
      return true;
    }
  }

  // expand the buffer until there is enough space to write `size` bytes
  assert(free < size);
  assert(alloc_);

  while (free < size) {
    CacheWriteBuffer* const buf = alloc_->Allocate();
    if (!buf) {
      ROCKS_LOG_DEBUG(log_, "Unable to allocate buffers");
      return false;
    }

    size_ += static_cast<uint32_t>(buf->Free());
    free += buf->Free();
    bufs_.push_back(buf);
  }

  assert(free >= size);
  return true;
}

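// DispatchBuffer hands at most one buffer at a time to the writer:
// pending_ios_ gates re-entry, and BufferWriteDone() calls back in to keep
// draining until buf_doff_ catches up with buf_woff_ (or end of file).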
void WriteableCacheFile::DispatchBuffer() {
  rwlock_.AssertHeld();

  assert(bufs_.size());
  assert(buf_doff_ <= buf_woff_);
  assert(buf_woff_ <= bufs_.size());

  if (pending_ios_) {
    return;
  }

  if (!eof_ && buf_doff_ == buf_woff_) {
    // dispatch buffer is pointing to write buffer and we haven't hit eof
    return;
  }

  assert(eof_ || buf_doff_ < buf_woff_);
  assert(buf_doff_ < bufs_.size());
  assert(file_);
  assert(alloc_);

  auto* buf = bufs_[buf_doff_];
  const uint64_t file_off = buf_doff_ * alloc_->BufferSize();

  assert(!buf->Free() ||
         (eof_ && buf_doff_ == buf_woff_ && buf_woff_ < bufs_.size()));
  // If we have reached end of file and there is space left in the last
  // buffer, pad it with zeros for direct IO
  buf->FillTrailingZeros();

  assert(buf->Used() % kFileAlignmentSize == 0);

  writer_->Write(file_.get(), buf, file_off,
                 std::bind(&WriteableCacheFile::BufferWriteDone, this));
  pending_ios_++;
  buf_doff_++;
}

void WriteableCacheFile::BufferWriteDone() {
  WriteLock _(&rwlock_);

  assert(bufs_.size());

  pending_ios_--;

  if (buf_doff_ < bufs_.size()) {
    DispatchBuffer();
  }

  if (eof_ && buf_doff_ >= bufs_.size() && !pending_ios_) {
    // end-of-file reached, move to read mode
    CloseAndOpenForReading();
  }
}

void WriteableCacheFile::CloseAndOpenForReading() {
  // Our env abstraction does not allow reading from a file opened for
  // appending. We need to close the file and re-open it for reading.
  Close();
  RandomAccessCacheFile::OpenImpl(enable_direct_reads_);
}

bool WriteableCacheFile::ReadBuffer(const LBA& lba, Slice* key, Slice* block,
                                    char* scratch) {
  rwlock_.AssertHeld();

  if (!ReadBuffer(lba, scratch)) {
    Error(log_, "Error reading from buffer. cache=%d off=%d", cache_id_,
          lba.off_);
    return false;
  }

  return ParseRec(lba, key, block, scratch);
}

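// ReadBuffer copies lba.size_ bytes out of the in-memory write buffers,
// treating them as one contiguous stream. For example (illustrative size):
// with a 1 MB BufferSize(), an LBA offset of 2.5 MB maps to start_idx = 2
// and start_off = 512 KB.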
bool WriteableCacheFile::ReadBuffer(const LBA& lba, char* data) {
  rwlock_.AssertHeld();

  assert(lba.off_ < disk_woff_);
  assert(alloc_);

  // We read from the buffers as if reading from a flat file. The list of
  // buffers is treated as a contiguous stream of data.

  char* tmp = data;
  size_t pending_nbytes = lba.size_;
  // start buffer
  size_t start_idx = lba.off_ / alloc_->BufferSize();
  // offset into the start buffer
  size_t start_off = lba.off_ % alloc_->BufferSize();

  assert(start_idx <= buf_woff_);

  for (size_t i = start_idx; pending_nbytes && i < bufs_.size(); ++i) {
    assert(i <= buf_woff_);
    auto* buf = bufs_[i];
    assert(i == buf_woff_ || !buf->Free());
    // bytes to copy from this buffer
    size_t nbytes = pending_nbytes > (buf->Used() - start_off)
                        ? (buf->Used() - start_off)
                        : pending_nbytes;
    memcpy(tmp, buf->Data() + start_off, nbytes);

    // left over to be copied
    pending_nbytes -= nbytes;
    start_off = 0;
    tmp += nbytes;
  }

  assert(!pending_nbytes);
  if (pending_nbytes) {
    return false;
  }

  assert(tmp == data + lba.size_);
  return true;
}

void WriteableCacheFile::Close() {
  rwlock_.AssertHeld();

  assert(size_ >= max_size_);
  assert(disk_woff_ >= max_size_);
  assert(buf_doff_ == bufs_.size());
  assert(bufs_.size() - buf_woff_ <= 1);
  assert(!pending_ios_);

  Info(log_, "Closing file %s. size=%d written=%d", Path().c_str(), size_,
       disk_woff_);

  ClearBuffers();
  file_.reset();

  assert(refs_);
  --refs_;
}

void WriteableCacheFile::ClearBuffers() {
  assert(alloc_);

  for (size_t i = 0; i < bufs_.size(); ++i) {
    alloc_->Deallocate(bufs_[i]);
  }

  bufs_.clear();
}

//
// ThreadedWriter implementation
//
ThreadedWriter::ThreadedWriter(PersistentCacheTier* const cache,
                               const size_t qdepth, const size_t io_size)
    : Writer(cache), io_size_(io_size) {
  for (size_t i = 0; i < qdepth; ++i) {
    port::Thread th(&ThreadedWriter::ThreadMain, this);
    threads_.push_back(std::move(th));
  }
}

void ThreadedWriter::Stop() {
  // notify all threads to exit
  for (size_t i = 0; i < threads_.size(); ++i) {
    q_.Push(IO(/*signal=*/true));
  }

  // wait for all threads to exit
  for (auto& th : threads_) {
    th.join();
    assert(!th.joinable());
  }
  threads_.clear();
}

void ThreadedWriter::Write(WritableFile* const file, CacheWriteBuffer* buf,
                           const uint64_t file_off,
                           const std::function<void()> callback) {
  q_.Push(IO(file, buf, file_off, callback));
}

void ThreadedWriter::ThreadMain() {
  while (true) {
    // Fetch the IO to process
    IO io(q_.Pop());
    if (io.signal_) {
      // that's the secret signal to exit
      break;
    }

    // Reserve space for writing the buffer
    while (!cache_->Reserve(io.buf_->Used())) {
      // We can fail to reserve space if every file in the system
      // is currently being accessed
      /* sleep override */
      Env::Default()->SleepForMicroseconds(1000000);
    }

    DispatchIO(io);

    io.callback_();
  }
}

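// DispatchIO appends the buffer to the file in io_size_ chunks. The final
// Slice still spans io_size_ bytes, so Used() is expected to be a multiple
// of io_size_ (the write path pads buffers via FillTrailingZeros()).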
void ThreadedWriter::DispatchIO(const IO& io) {
  size_t written = 0;
  while (written < io.buf_->Used()) {
    Slice data(io.buf_->Data() + written, io_size_);
    Status s = io.file_->Append(data);
    assert(s.ok());
    if (!s.ok()) {
      // That is a definite IO error to the device. There is not much we can
      // do but ignore the failure. This can lead to corruption of data on
      // disk, but the cache will skip the corrupted record while reading.
      fprintf(stderr, "Error writing data to file. %s\n", s.ToString().c_str());
    }
    written += io_size_;
  }
}

}  // namespace ROCKSDB_NAMESPACE

#endif  // ROCKSDB_LITE