//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
#ifndef ROCKSDB_LITE

#include "utilities/persistent_cache/block_cache_tier.h"

#include <regex>
#include <utility>
#include <vector>

#include "logging/logging.h"
#include "port/port.h"
#include "test_util/sync_point.h"
#include "util/stop_watch.h"
#include "utilities/persistent_cache/block_cache_tier_file.h"

namespace ROCKSDB_NAMESPACE {

//
// BlockCacheTier
//
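// Open the cache tier: validate the options, create (or clean up) the cache
// directory on disk, create the first cache file, and, if pipelined writes
// are enabled, start the background insert thread.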
Status BlockCacheTier::Open() {
  Status status;

  WriteLock _(&lock_);

  assert(!size_);

  // Check the validity of the options
  status = opt_.ValidateSettings();
  assert(status.ok());
  if (!status.ok()) {
    Error(opt_.log, "Invalid block cache options");
    return status;
  }

  // Create the base directory or clean up the existing directory
  status = opt_.env->CreateDirIfMissing(opt_.path);
  if (!status.ok()) {
    Error(opt_.log, "Error creating directory %s. %s", opt_.path.c_str(),
          status.ToString().c_str());
    return status;
  }

  // Create the base/<cache dir> directory
  status = opt_.env->CreateDir(GetCachePath());
  if (!status.ok()) {
    // directory already exists, clean it up
    status = CleanupCacheFolder(GetCachePath());
    assert(status.ok());
    if (!status.ok()) {
      Error(opt_.log, "Error creating directory %s. %s", opt_.path.c_str(),
            status.ToString().c_str());
      return status;
    }
  }

  // create a new cache file
  assert(!cache_file_);
  status = NewCacheFile();
  if (!status.ok()) {
    Error(opt_.log, "Error creating new file %s. %s", opt_.path.c_str(),
          status.ToString().c_str());
    return status;
  }

  assert(cache_file_);

  if (opt_.pipeline_writes) {
    assert(!insert_th_.joinable());
    insert_th_ = port::Thread(&BlockCacheTier::InsertMain, this);
  }

  return Status::OK();
}

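// Returns true if the given file name looks like a cache file (".rc" suffix).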
bool IsCacheFile(const std::string& file) {
  // check if the file has the .rc suffix
  // Unfortunately regex support is uneven across compilers, so we use simple
  // string parsing
  size_t pos = file.find(".");
  if (pos == std::string::npos) {
    return false;
  }

  std::string suffix = file.substr(pos);
  return suffix == ".rc";
}

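// Remove any leftover cache files from a previous run so the folder can be
// reused for a fresh cache.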
Status BlockCacheTier::CleanupCacheFolder(const std::string& folder) {
  std::vector<std::string> files;
  Status status = opt_.env->GetChildren(folder, &files);
  if (!status.ok()) {
    Error(opt_.log, "Error getting files for %s. %s", folder.c_str(),
          status.ToString().c_str());
    return status;
  }

  // clean up files matching the cache file pattern <digits>.rc
  for (const auto& file : files) {
    if (IsCacheFile(file)) {
      // cache file
      Info(opt_.log, "Removing file %s.", file.c_str());
      status = opt_.env->DeleteFile(folder + "/" + file);
      if (!status.ok()) {
        Error(opt_.log, "Error deleting file %s. %s", file.c_str(),
              status.ToString().c_str());
        return status;
      }
    } else {
      ROCKS_LOG_DEBUG(opt_.log, "Skipping file %s", file.c_str());
    }
  }
  return Status::OK();
}

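// Shut down the tier: drain and stop the background insert thread, stop the
// file writer, and clear the in-memory metadata. On-disk cache files are left
// in place.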
Status BlockCacheTier::Close() {
  // stop the insert thread
  if (opt_.pipeline_writes && insert_th_.joinable()) {
    InsertOp op(/*quit=*/true);
    insert_ops_.Push(std::move(op));
    insert_th_.join();
  }

  // stop the writer before clearing the metadata
  writer_.Stop();

  // clear all metadata
  WriteLock _(&lock_);
  metadata_.Clear();
  return Status::OK();
}

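// Helper that records a single counter in the stats map as a double.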
template <class T>
void Add(std::map<std::string, double>* stats, const std::string& key,
         const T& t) {
  stats->insert({key, static_cast<double>(t)});
}

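// Collect this tier's counters into a map and append it to the stats gathered
// by PersistentCacheTier::Stats().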
PersistentCache::StatsType BlockCacheTier::Stats() {
  std::map<std::string, double> stats;
  Add(&stats, "persistentcache.blockcachetier.bytes_piplined",
      stats_.bytes_pipelined_.Average());
  Add(&stats, "persistentcache.blockcachetier.bytes_written",
      stats_.bytes_written_.Average());
  Add(&stats, "persistentcache.blockcachetier.bytes_read",
      stats_.bytes_read_.Average());
  Add(&stats, "persistentcache.blockcachetier.insert_dropped",
      stats_.insert_dropped_);
  Add(&stats, "persistentcache.blockcachetier.cache_hits",
      stats_.cache_hits_);
  Add(&stats, "persistentcache.blockcachetier.cache_misses",
      stats_.cache_misses_);
  Add(&stats, "persistentcache.blockcachetier.cache_errors",
      stats_.cache_errors_);
  Add(&stats, "persistentcache.blockcachetier.cache_hits_pct",
      stats_.CacheHitPct());
  Add(&stats, "persistentcache.blockcachetier.cache_misses_pct",
      stats_.CacheMissPct());
  Add(&stats, "persistentcache.blockcachetier.read_hit_latency",
      stats_.read_hit_latency_.Average());
  Add(&stats, "persistentcache.blockcachetier.read_miss_latency",
      stats_.read_miss_latency_.Average());
  Add(&stats, "persistentcache.blockcachetier.write_latency",
      stats_.write_latency_.Average());

  auto out = PersistentCacheTier::Stats();
  out.push_back(stats);
  return out;
}

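// Insert a block. In pipelined mode the write is queued for the background
// insert thread; otherwise it is performed synchronously on the caller's
// thread.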
Status BlockCacheTier::Insert(const Slice& key, const char* data,
                              const size_t size) {
  // update stats
  stats_.bytes_pipelined_.Add(size);

  if (opt_.pipeline_writes) {
    // offload the write to the background insert thread
    insert_ops_.Push(InsertOp(key.ToString(), std::string(data, size)));
    return Status::OK();
  }

  assert(!opt_.pipeline_writes);
  return InsertImpl(key, Slice(data, size));
}

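// Main loop of the background insert thread: pop queued inserts, retry
// InsertImpl() up to kMaxRetry times while the write buffers are full, and
// count the insert as dropped if it still cannot be written.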
void BlockCacheTier::InsertMain() {
  while (true) {
    InsertOp op(insert_ops_.Pop());

    if (op.signal_) {
      // this is the signal to exit
      break;
    }

    size_t retry = 0;
    Status s;
    while ((s = InsertImpl(Slice(op.key_), Slice(op.data_))).IsTryAgain()) {
      if (retry > kMaxRetry) {
        break;
      }

      // This can happen when the buffers are full; wait until some buffers
      // are freed. We do not wait inside InsertImpl() itself because it has
      // to support both the pipelined and the non-pipelined mode.
      buffer_allocator_.WaitUntilUsable();
      retry++;
    }

    if (!s.ok()) {
      stats_.insert_dropped_++;
    }
  }
}

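// Write a single key/value pair to the current cache file. If the file is
// full, roll over to a new cache file; if the append cannot be serviced (for
// example, when the write buffers are full), return Status::TryAgain() so the
// caller can retry.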
Status BlockCacheTier::InsertImpl(const Slice& key, const Slice& data) {
  // pre-condition
  assert(key.size());
  assert(data.size());
  assert(cache_file_);

  StopWatchNano timer(opt_.env, /*auto_start=*/true);

  WriteLock _(&lock_);

  LBA lba;
  if (metadata_.Lookup(key, &lba)) {
    // the key already exists, this is a duplicate insert
    return Status::OK();
  }

  while (!cache_file_->Append(key, data, &lba)) {
    if (!cache_file_->Eof()) {
      ROCKS_LOG_DEBUG(opt_.log, "Error inserting to cache file %d",
                      cache_file_->cacheid());
      stats_.write_latency_.Add(timer.ElapsedNanos() / 1000);
      return Status::TryAgain();
    }

    assert(cache_file_->Eof());
    Status status = NewCacheFile();
    if (!status.ok()) {
      return status;
    }
  }

  // Insert into the lookup index
  BlockInfo* info = metadata_.Insert(key, lba);
  assert(info);
  if (!info) {
    return Status::IOError("Unexpected error inserting to index");
  }

  // insert into the cache file's reverse mapping
  cache_file_->Add(info);

  // update stats
  stats_.bytes_written_.Add(data.size());
  stats_.write_latency_.Add(timer.ElapsedNanos() / 1000);
  return Status::OK();
}

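// Look up a key: find its LBA in the block index, find the owning cache file,
// read the block from disk, and copy the value into the caller's buffer.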
Status BlockCacheTier::Lookup(const Slice& key, std::unique_ptr<char[]>* val,
                              size_t* size) {
  StopWatchNano timer(opt_.env, /*auto_start=*/true);

  LBA lba;
  bool status = metadata_.Lookup(key, &lba);
  if (!status) {
    stats_.cache_misses_++;
    stats_.read_miss_latency_.Add(timer.ElapsedNanos() / 1000);
    return Status::NotFound("blockcache: key not found");
  }

  BlockCacheFile* const file = metadata_.Lookup(lba.cache_id_);
  if (!file) {
    // this can happen because the block index and cache file index are
    // different, and the cache file might be removed between the two lookups
    stats_.cache_misses_++;
    stats_.read_miss_latency_.Add(timer.ElapsedNanos() / 1000);
    return Status::NotFound("blockcache: cache file not found");
  }

  assert(file->refs_);

  std::unique_ptr<char[]> scratch(new char[lba.size_]);
  Slice blk_key;
  Slice blk_val;

  status = file->Read(lba, &blk_key, &blk_val, scratch.get());
  --file->refs_;
  if (!status) {
    stats_.cache_misses_++;
    stats_.cache_errors_++;
    stats_.read_miss_latency_.Add(timer.ElapsedNanos() / 1000);
    return Status::NotFound("blockcache: error reading data");
  }

  assert(blk_key == key);

  val->reset(new char[blk_val.size()]);
  memcpy(val->get(), blk_val.data(), blk_val.size());
  *size = blk_val.size();

  stats_.bytes_read_.Add(*size);
  stats_.cache_hits_++;
  stats_.read_hit_latency_.Add(timer.ElapsedNanos() / 1000);

  return Status::OK();
}

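// Remove the key from the lookup index. The block itself stays in its cache
// file on disk until that file is evicted.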
bool BlockCacheTier::Erase(const Slice& key) {
  WriteLock _(&lock_);
  BlockInfo* info = metadata_.Remove(key);
  assert(info);
  delete info;
  return true;
}

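// Allocate a new writeable cache file, make it the current write target and
// register it in the cache file index.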
Status BlockCacheTier::NewCacheFile() {
  lock_.AssertHeld();

  TEST_SYNC_POINT_CALLBACK("BlockCacheTier::NewCacheFile:DeleteDir",
                           (void*)(GetCachePath().c_str()));

  std::unique_ptr<WriteableCacheFile> f(
      new WriteableCacheFile(opt_.env, &buffer_allocator_, &writer_,
                             GetCachePath(), writer_cache_id_,
                             opt_.cache_file_size, opt_.log));

  bool status = f->Create(opt_.enable_direct_writes, opt_.enable_direct_reads);
  if (!status) {
    return Status::IOError("Error creating file");
  }

  Info(opt_.log, "Created cache file %d", writer_cache_id_);

  writer_cache_id_++;
  cache_file_ = f.release();

  // insert to cache files tree
  status = metadata_.Insert(cache_file_);
  assert(status);
  if (!status) {
    Error(opt_.log, "Error inserting to metadata");
    return Status::IOError("Error inserting to metadata");
  }

  return Status::OK();
}

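// Reserve `size` bytes of cache space. If the cache is full, evict whole
// cache files until usage would fall below the retain threshold of
// (100 - kEvictPct) percent of the configured cache size.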
bool BlockCacheTier::Reserve(const size_t size) {
  WriteLock _(&lock_);
  assert(size_ <= opt_.cache_size);

  if (size + size_ <= opt_.cache_size) {
    // there is enough space to write
    size_ += size;
    return true;
  }

  assert(size + size_ >= opt_.cache_size);
  // there is not enough space to fit the requested data
  // we can clear some space by evicting cold data

  const double retain_fac = (100 - kEvictPct) / static_cast<double>(100);
  while (size + size_ > opt_.cache_size * retain_fac) {
    std::unique_ptr<BlockCacheFile> f(metadata_.Evict());
    if (!f) {
      // nothing is evictable
      return false;
    }
    assert(!f->refs_);
    uint64_t file_size;
    if (!f->Delete(&file_size).ok()) {
      // unable to delete file
      return false;
    }

    assert(file_size <= size_);
    size_ -= file_size;
  }

  size_ += size;
  assert(size_ <= opt_.cache_size * 0.9);
  return true;
}

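// Factory helper that builds a BlockCacheTier-backed persistent cache rooted
// at `path`. With `optimized_for_nvm` set, direct writes and a deeper writer
// queue are enabled.
//
// A minimal usage sketch (the path, size, and null logger below are purely
// illustrative):
//
//   std::shared_ptr<PersistentCache> pcache;
//   Status s = NewPersistentCache(Env::Default(), "/tmp/block_cache",
//                                 /*size=*/1024 * 1024 * 1024,
//                                 /*log=*/nullptr,
//                                 /*optimized_for_nvm=*/false, &pcache);
//   // On success, `pcache` can be plugged into
//   // BlockBasedTableOptions::persistent_cache.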
Status NewPersistentCache(Env* const env, const std::string& path,
                          const uint64_t size,
                          const std::shared_ptr<Logger>& log,
                          const bool optimized_for_nvm,
                          std::shared_ptr<PersistentCache>* cache) {
  if (!cache) {
    return Status::IOError("invalid argument cache");
  }

  auto opt = PersistentCacheConfig(env, path, size, log);
  if (optimized_for_nvm) {
    // The default settings are optimized for SSD. NVM devices are better
    // accessed with 4K direct IO and written with parallelism.
    opt.enable_direct_writes = true;
    opt.writer_qdepth = 4;
    opt.writer_dispatch_size = 4 * 1024;
  }

  auto pcache = std::make_shared<BlockCacheTier>(opt);
  Status s = pcache->Open();

  if (!s.ok()) {
    return s;
  }

  *cache = pcache;
  return s;
}

}  // namespace ROCKSDB_NAMESPACE

#endif  // ifndef ROCKSDB_LITE