1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 // Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
#include "rocksdb/utilities/env_librados.h"

#include <sys/syscall.h>
#include <unistd.h>

#include <algorithm>
#include <atomic>
#include <cerrno>
#include <chrono>
#include <cinttypes>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <iostream>
#include <limits>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <thread>
#include <vector>

#include "util/random.h"
8 
9 namespace ROCKSDB_NAMESPACE {
/* GLOBAL DEFINE */
// Uncomment to enable verbose tracing to stdout.
// #define DEBUG
#ifdef DEBUG
// The headers this macro needs (<cstdio>, <sys/syscall.h>, <unistd.h>) are
// included at file scope: system headers must never be included from inside
// a namespace, as the previous version did here.
#define LOG_DEBUG(...)                                                  \
  do {                                                                  \
    printf("[%ld:%s:%i:%s]", syscall(SYS_gettid), __FILE__, __LINE__,  \
           __FUNCTION__);                                               \
    printf(__VA_ARGS__);                                                \
  } while (0)
#else
// Expands to an empty statement; the arguments are not evaluated.
#define LOG_DEBUG(...) \
  do {                 \
  } while (0)
#endif
23 
/* GLOBAL CONSTANT */

// Database name used when none is supplied.
const char *default_db_name = "default_envlibrados_db";

// Pool name used when none is supplied.
const char *default_pool_name = "default_envlibrados_pool";

// Name of the environment variable that holds the ceph config file path.
const char *default_config_path = "CEPH_CONFIG_PATH";

// Maximum number of dir/file entries the fs is expected to hold; also passed
// as the per-request limit when listing a directory's metadata entries.
const int MAX_ITEMS_IN_FS = 1 << 30;

// Metadata key of the root directory.
const std::string ROOT_DIR_KEY = "/";

// Metadata value that marks a key as a directory instead of a file.
const std::string DIR_ID_VALUE = "<DIR>";
33 
34 /**
35  * @brief convert error code to status
36  * @details Convert internal linux error code to Status
37  *
38  * @param r [description]
39  * @return [description]
40  */
err_to_status(int r)41 Status err_to_status(int r)
42 {
43   switch (r) {
44   case 0:
45     return Status::OK();
46   case -ENOENT:
47     return Status::IOError();
48   case -ENODATA:
49   case -ENOTDIR:
50     return Status::NotFound(Status::kNone);
51   case -EINVAL:
52     return Status::InvalidArgument(Status::kNone);
53   case -EIO:
54     return Status::IOError(Status::kNone);
55   default:
56     // FIXME :(
57     assert(0 == "unrecognized error code");
58     return Status::NotSupported(Status::kNone);
59   }
60 }
61 
62 /**
63  * @brief split file path into dir path and file name
64  * @details
65  * Because rocksdb only need a 2-level structure (dir/file), all input path will be shortened to dir/file format
66  *  For example:
67  *    b/c => dir '/b', file 'c'
68  *    /a/b/c => dir '/b', file 'c'
69  *
70  * @param fn [description]
71  * @param dir [description]
72  * @param file [description]
73  */
split(const std::string & fn,std::string * dir,std::string * file)74 void split(const std::string &fn, std::string *dir, std::string *file) {
75   LOG_DEBUG("[IN]%s\n", fn.c_str());
76   int pos = fn.size() - 1;
77   while ('/' == fn[pos]) --pos;
78   size_t fstart = fn.rfind('/', pos);
79   *file = fn.substr(fstart + 1, pos - fstart);
80 
81   pos = fstart;
82   while (pos >= 0 && '/' == fn[pos]) --pos;
83 
84   if (pos < 0) {
85     *dir = "/";
86   } else {
87     size_t dstart = fn.rfind('/', pos);
88     *dir = fn.substr(dstart + 1, pos - dstart);
89     *dir = std::string("/") + *dir;
90   }
91 
92   LOG_DEBUG("[OUT]%s | %s\n", dir->c_str(), file->c_str());
93 }
94 
95 // A file abstraction for reading sequentially through a file
96 class LibradosSequentialFile : public SequentialFile {
97   librados::IoCtx * _io_ctx;
98   std::string _fid;
99   std::string _hint;
100   int _offset;
101 public:
LibradosSequentialFile(librados::IoCtx * io_ctx,std::string fid,std::string hint)102   LibradosSequentialFile(librados::IoCtx * io_ctx, std::string fid, std::string hint):
103     _io_ctx(io_ctx), _fid(fid), _hint(hint), _offset(0) {}
104 
~LibradosSequentialFile()105   ~LibradosSequentialFile() {}
106 
107   /**
108    * @brief read file
109    * @details
110    *  Read up to "n" bytes from the file.  "scratch[0..n-1]" may be
111    *  written by this routine.  Sets "*result" to the data that was
112    *  read (including if fewer than "n" bytes were successfully read).
113    *  May set "*result" to point at data in "scratch[0..n-1]", so
114    *  "scratch[0..n-1]" must be live when "*result" is used.
115    *  If an error was encountered, returns a non-OK status.
116    *
117    *  REQUIRES: External synchronization
118    *
119    * @param n [description]
120    * @param result [description]
121    * @param scratch [description]
122    * @return [description]
123    */
Read(size_t n,Slice * result,char * scratch)124   Status Read(size_t n, Slice* result, char* scratch) {
125     LOG_DEBUG("[IN]%i\n", (int)n);
126     librados::bufferlist buffer;
127     Status s;
128     int r = _io_ctx->read(_fid, buffer, n, _offset);
129     if (r >= 0) {
130       buffer.begin().copy(r, scratch);
131       *result = Slice(scratch, r);
132       _offset += r;
133       s = Status::OK();
134     } else {
135       s = err_to_status(r);
136       if (s == Status::IOError()) {
137         *result = Slice();
138         s = Status::OK();
139       }
140     }
141     LOG_DEBUG("[OUT]%s, %i, %s\n", s.ToString().c_str(), (int)r, buffer.c_str());
142     return s;
143   }
144 
145   /**
146    * @brief skip "n" bytes from the file
147    * @details
148    *  Skip "n" bytes from the file. This is guaranteed to be no
149    *  slower that reading the same data, but may be faster.
150    *
151    *  If end of file is reached, skipping will stop at the end of the
152    *  file, and Skip will return OK.
153    *
154    *  REQUIRES: External synchronization
155    *
156    * @param n [description]
157    * @return [description]
158    */
Skip(uint64_t n)159   Status Skip(uint64_t n) {
160     _offset += n;
161     return Status::OK();
162   }
163 
164   /**
165    * @brief noop
166    * @details
167    *  rocksdb has it's own caching capabilities that we should be able to use,
168    *  without relying on a cache here. This can safely be a no-op.
169    *
170    * @param offset [description]
171    * @param length [description]
172    *
173    * @return [description]
174    */
InvalidateCache(size_t offset,size_t length)175   Status InvalidateCache(size_t offset, size_t length) {
176     return Status::OK();
177   }
178 };
179 
180 // A file abstraction for randomly reading the contents of a file.
181 class LibradosRandomAccessFile : public RandomAccessFile {
182   librados::IoCtx * _io_ctx;
183   std::string _fid;
184   std::string _hint;
185 public:
LibradosRandomAccessFile(librados::IoCtx * io_ctx,std::string fid,std::string hint)186   LibradosRandomAccessFile(librados::IoCtx * io_ctx, std::string fid, std::string hint):
187     _io_ctx(io_ctx), _fid(fid), _hint(hint) {}
188 
~LibradosRandomAccessFile()189   ~LibradosRandomAccessFile() {}
190 
191   /**
192    * @brief read file
193    * @details similar to LibradosSequentialFile::Read
194    *
195    * @param offset [description]
196    * @param n [description]
197    * @param result [description]
198    * @param scratch [description]
199    * @return [description]
200    */
Read(uint64_t offset,size_t n,Slice * result,char * scratch) const201   Status Read(uint64_t offset, size_t n, Slice* result,
202               char* scratch) const {
203     LOG_DEBUG("[IN]%i\n", (int)n);
204     librados::bufferlist buffer;
205     Status s;
206     int r = _io_ctx->read(_fid, buffer, n, offset);
207     if (r >= 0) {
208       buffer.begin().copy(r, scratch);
209       *result = Slice(scratch, r);
210       s = Status::OK();
211     } else {
212       s = err_to_status(r);
213       if (s == Status::IOError()) {
214         *result = Slice();
215         s = Status::OK();
216       }
217     }
218     LOG_DEBUG("[OUT]%s, %i, %s\n", s.ToString().c_str(), (int)r, buffer.c_str());
219     return s;
220   }
221 
222   /**
223    * @brief [brief description]
224    * @details Get unique id for each file and guarantee this id is different for each file
225    *
226    * @param id [description]
227    * @param max_size max size of id, it shoud be larger than 16
228    *
229    * @return [description]
230    */
GetUniqueId(char * id,size_t max_size) const231   size_t GetUniqueId(char* id, size_t max_size) const {
232     // All fid has the same db_id prefix, so we need to ignore db_id prefix
233     size_t s = std::min(max_size, _fid.size());
234     strncpy(id, _fid.c_str() + (_fid.size() - s), s);
235     id[s - 1] = '\0';
236     return s;
237   };
238 
239   //enum AccessPattern { NORMAL, RANDOM, SEQUENTIAL, WILLNEED, DONTNEED };
Hint(AccessPattern pattern)240   void Hint(AccessPattern pattern) {
241     /* Do nothing */
242   }
243 
244   /**
245    * @brief noop
246    * @details [long description]
247    *
248    * @param offset [description]
249    * @param length [description]
250    *
251    * @return [description]
252    */
InvalidateCache(size_t offset,size_t length)253   Status InvalidateCache(size_t offset, size_t length) {
254     return Status::OK();
255   }
256 };
257 
258 
259 // A file abstraction for sequential writing.  The implementation
260 // must provide buffering since callers may append small fragments
261 // at a time to the file.
262 class LibradosWritableFile : public WritableFile {
263   librados::IoCtx * _io_ctx;
264   std::string _fid;
265   std::string _hint;
266   const EnvLibrados * const _env;
267 
268   std::mutex _mutex;                 // used to protect modification of all following variables
269   librados::bufferlist _buffer;      // write buffer
270   uint64_t _buffer_size;             // write buffer size
271   uint64_t _file_size;               // this file size doesn't include buffer size
272 
273   /**
274    * @brief assuming caller holds lock
275    * @details [long description]
276    * @return [description]
277    */
_SyncLocked()278   int _SyncLocked() {
279     // 1. sync append data to RADOS
280     int r = _io_ctx->append(_fid, _buffer, _buffer_size);
281     assert(r >= 0);
282 
283     // 2. update local variables
284     if (0 == r) {
285       _buffer.clear();
286       _file_size += _buffer_size;
287       _buffer_size = 0;
288     }
289 
290     return r;
291   }
292 
293  public:
LibradosWritableFile(librados::IoCtx * io_ctx,std::string fid,std::string hint,const EnvLibrados * const env,const EnvOptions & options)294   LibradosWritableFile(librados::IoCtx* io_ctx, std::string fid,
295                        std::string hint, const EnvLibrados* const env,
296                        const EnvOptions& options)
297       : WritableFile(options),
298         _io_ctx(io_ctx),
299         _fid(fid),
300         _hint(hint),
301         _env(env),
302         _buffer(),
303         _buffer_size(0),
304         _file_size(0) {
305     int ret = _io_ctx->stat(_fid, &_file_size, nullptr);
306 
307     // if file not exist
308     if (ret < 0) {
309       _file_size = 0;
310     }
311   }
312 
~LibradosWritableFile()313   ~LibradosWritableFile() {
314     // sync before closeing writable file
315     Sync();
316   }
317 
318   /**
319    * @brief append data to file
320    * @details
321    *  Append will save all written data in buffer util buffer size
322    *  reaches buffer max size. Then, it will write buffer into rados
323    *
324    * @param data [description]
325    * @return [description]
326    */
Append(const Slice & data)327   Status Append(const Slice& data) {
328     // append buffer
329     LOG_DEBUG("[IN] %i | %s\n", (int)data.size(), data.data());
330     int r = 0;
331 
332     std::lock_guard<std::mutex> lock(_mutex);
333     _buffer.append(data.data(), data.size());
334     _buffer_size += data.size();
335 
336     if (_buffer_size > _env->_write_buffer_size) {
337       r = _SyncLocked();
338     }
339 
340     LOG_DEBUG("[OUT] %i\n", r);
341     return err_to_status(r);
342   }
343 
344   /**
345    * @brief not supported
346    * @details [long description]
347    * @return [description]
348    */
PositionedAppend(const Slice &,uint64_t)349   Status PositionedAppend(
350     const Slice& /* data */,
351     uint64_t /* offset */) {
352     return Status::NotSupported();
353   }
354 
355   /**
356    * @brief truncate file to assigned size
357    * @details [long description]
358    *
359    * @param size [description]
360    * @return [description]
361    */
Truncate(uint64_t size)362   Status Truncate(uint64_t size) {
363     LOG_DEBUG("[IN]%lld|%lld|%lld\n", (long long)size, (long long)_file_size, (long long)_buffer_size);
364     int r = 0;
365 
366     std::lock_guard<std::mutex> lock(_mutex);
367     if (_file_size > size) {
368       r = _io_ctx->trunc(_fid, size);
369 
370       if (r == 0) {
371         _buffer.clear();
372         _buffer_size = 0;
373         _file_size = size;
374       }
375     } else if (_file_size == size) {
376       _buffer.clear();
377       _buffer_size = 0;
378     } else {
379       librados::bufferlist tmp;
380       tmp.claim(_buffer);
381       _buffer.substr_of(tmp, 0, size - _file_size);
382       _buffer_size = size - _file_size;
383     }
384 
385     LOG_DEBUG("[OUT] %i\n", r);
386     return err_to_status(r);
387   }
388 
389   /**
390    * @brief close file
391    * @details [long description]
392    * @return [description]
393    */
Close()394   Status Close() {
395     LOG_DEBUG("%s | %lld | %lld\n", _hint.c_str(), (long long)_buffer_size, (long long)_file_size);
396     return Sync();
397   }
398 
399   /**
400    * @brief flush file,
401    * @details initiate an aio write and not wait
402    *
403    * @return [description]
404    */
Flush()405   Status Flush() {
406     librados::AioCompletion *write_completion = librados::Rados::aio_create_completion();
407     int r = 0;
408 
409     std::lock_guard<std::mutex> lock(_mutex);
410     r = _io_ctx->aio_append(_fid, write_completion, _buffer, _buffer_size);
411 
412     if (0 == r) {
413       _file_size += _buffer_size;
414       _buffer.clear();
415       _buffer_size = 0;
416     }
417 
418     write_completion->release();
419 
420     return err_to_status(r);
421   }
422 
423   /**
424    * @brief write buffer data to rados
425    * @details initiate an aio write and wait for result
426    * @return [description]
427    */
Sync()428   Status Sync() { // sync data
429     int r = 0;
430 
431     std::lock_guard<std::mutex> lock(_mutex);
432     if (_buffer_size > 0) {
433       r = _SyncLocked();
434     }
435 
436     return err_to_status(r);
437   }
438 
439   /**
440    * @brief [brief description]
441    * @details [long description]
442    * @return true if Sync() and Fsync() are safe to call concurrently with Append()and Flush().
443    */
IsSyncThreadSafe() const444   bool IsSyncThreadSafe() const {
445     return true;
446   }
447 
448   /**
449    * @brief Indicates the upper layers if the current WritableFile implementation uses direct IO.
450    * @details [long description]
451    * @return [description]
452    */
use_direct_io() const453   bool use_direct_io() const {
454     return false;
455   }
456 
457   /**
458    * @brief Get file size
459    * @details
460    *  This API will use cached file_size.
461    * @return [description]
462    */
GetFileSize()463   uint64_t GetFileSize() {
464     LOG_DEBUG("%lld|%lld\n", (long long)_buffer_size, (long long)_file_size);
465 
466     std::lock_guard<std::mutex> lock(_mutex);
467     int file_size = _file_size + _buffer_size;
468 
469     return file_size;
470   }
471 
472   /**
473    * @brief For documentation, refer to RandomAccessFile::GetUniqueId()
474    * @details [long description]
475    *
476    * @param id [description]
477    * @param max_size [description]
478    *
479    * @return [description]
480    */
GetUniqueId(char * id,size_t max_size) const481   size_t GetUniqueId(char* id, size_t max_size) const {
482     // All fid has the same db_id prefix, so we need to ignore db_id prefix
483     size_t s = std::min(max_size, _fid.size());
484     strncpy(id, _fid.c_str() + (_fid.size() - s), s);
485     id[s - 1] = '\0';
486     return s;
487   }
488 
489   /**
490    * @brief noop
491    * @details [long description]
492    *
493    * @param offset [description]
494    * @param length [description]
495    *
496    * @return [description]
497    */
InvalidateCache(size_t offset,size_t length)498   Status InvalidateCache(size_t offset, size_t length) {
499     return Status::OK();
500   }
501 
502   using WritableFile::RangeSync;
503   /**
504    * @brief No RangeSync support, just call Sync()
505    * @details [long description]
506    *
507    * @param offset [description]
508    * @param nbytes [description]
509    *
510    * @return [description]
511    */
RangeSync(off_t offset,off_t nbytes)512   Status RangeSync(off_t offset, off_t nbytes) {
513     return Sync();
514   }
515 
516 protected:
517   using WritableFile::Allocate;
518   /**
519    * @brief noop
520    * @details [long description]
521    *
522    * @param offset [description]
523    * @param len [description]
524    *
525    * @return [description]
526    */
Allocate(off_t offset,off_t len)527   Status Allocate(off_t offset, off_t len) {
528     return Status::OK();
529   }
530 };
531 
532 
533 // Directory object represents collection of files and implements
534 // filesystem operations that can be executed on directories.
535 class LibradosDirectory : public Directory {
536   librados::IoCtx * _io_ctx;
537   std::string _fid;
538 public:
LibradosDirectory(librados::IoCtx * io_ctx,std::string fid)539   explicit LibradosDirectory(librados::IoCtx * io_ctx, std::string fid):
540     _io_ctx(io_ctx), _fid(fid) {}
541 
542   // Fsync directory. Can be called concurrently from multiple threads.
Fsync()543   Status Fsync() {
544     return Status::OK();
545   }
546 };
547 
548 // Identifies a locked file.
549 // This is exclusive lock and can't nested lock by same thread
550 class LibradosFileLock : public FileLock {
551   librados::IoCtx * _io_ctx;
552   const std::string _obj_name;
553   const std::string _lock_name;
554   const std::string _cookie;
555   int lock_state;
556 public:
LibradosFileLock(librados::IoCtx * io_ctx,const std::string obj_name)557   LibradosFileLock(
558     librados::IoCtx * io_ctx,
559     const std::string obj_name):
560     _io_ctx(io_ctx),
561     _obj_name(obj_name),
562     _lock_name("lock_name"),
563     _cookie("cookie") {
564 
565     // TODO: the lock will never expire. It may cause problem if the process crash or abnormally exit.
566     while (!_io_ctx->lock_exclusive(
567              _obj_name,
568              _lock_name,
569              _cookie,
570              "description", nullptr, 0));
571   }
572 
~LibradosFileLock()573   ~LibradosFileLock() {
574     _io_ctx->unlock(_obj_name, _lock_name, _cookie);
575   }
576 };
577 
578 
579 // --------------------
580 // --- EnvLibrados ----
581 // --------------------
582 /**
583  * @brief EnvLibrados ctor
584  * @details [long description]
585  *
586  * @param db_name unique database name
587  * @param config_path the configure file path for rados
588  */
EnvLibrados(const std::string & db_name,const std::string & config_path,const std::string & db_pool)589 EnvLibrados::EnvLibrados(const std::string& db_name,
590                          const std::string& config_path,
591                          const std::string& db_pool)
592   : EnvLibrados("client.admin",
593                 "ceph",
594                 0,
595                 db_name,
596                 config_path,
597                 db_pool,
598                 "/wal",
599                 db_pool,
600                 1 << 20) {}
601 
602 /**
603  * @brief EnvLibrados ctor
604  * @details [long description]
605  *
606  * @param client_name       first 3 parameters is for RADOS client init
607  * @param cluster_name
608  * @param flags
609  * @param db_name           unique database name, used as db_id key
610  * @param config_path the   configure file path for rados
611  * @param db_pool the pool  for db data
612  * @param wal_pool the pool for WAL data
613  * @param write_buffer_size WritableFile buffer max size
614  */
EnvLibrados(const std::string & client_name,const std::string & cluster_name,const uint64_t flags,const std::string & db_name,const std::string & config_path,const std::string & db_pool,const std::string & wal_dir,const std::string & wal_pool,const uint64_t write_buffer_size)615 EnvLibrados::EnvLibrados(const std::string& client_name,
616                          const std::string& cluster_name,
617                          const uint64_t flags,
618                          const std::string& db_name,
619                          const std::string& config_path,
620                          const std::string& db_pool,
621                          const std::string& wal_dir,
622                          const std::string& wal_pool,
623                          const uint64_t write_buffer_size)
624   : EnvWrapper(Env::Default()),
625     _client_name(client_name),
626     _cluster_name(cluster_name),
627     _flags(flags),
628     _db_name(db_name),
629     _config_path(config_path),
630     _db_pool_name(db_pool),
631     _wal_dir(wal_dir),
632     _wal_pool_name(wal_pool),
633     _write_buffer_size(write_buffer_size) {
634   int ret = 0;
635 
636   // 1. create a Rados object and initialize it
637   ret = _rados.init2(_client_name.c_str(), _cluster_name.c_str(), _flags); // just use the client.admin keyring
638   if (ret < 0) { // let's handle any error that might have come back
639     std::cerr << "couldn't initialize rados! error " << ret << std::endl;
640     ret = EXIT_FAILURE;
641     goto out;
642   }
643 
644   // 2. read configure file
645   ret = _rados.conf_read_file(_config_path.c_str());
646   if (ret < 0) {
647     // This could fail if the config file is malformed, but it'd be hard.
648     std::cerr << "failed to parse config file " << _config_path
649               << "! error" << ret << std::endl;
650     ret = EXIT_FAILURE;
651     goto out;
652   }
653 
654   // 3. we actually connect to the cluster
655   ret = _rados.connect();
656   if (ret < 0) {
657     std::cerr << "couldn't connect to cluster! error " << ret << std::endl;
658     ret = EXIT_FAILURE;
659     goto out;
660   }
661 
662   // 4. create db_pool if not exist
663   ret = _rados.pool_create(_db_pool_name.c_str());
664   if (ret < 0 && ret != -EEXIST && ret !=  -EPERM) {
665     std::cerr << "couldn't create pool! error " << ret << std::endl;
666     goto out;
667   }
668 
669   // 5. create db_pool_ioctx
670   ret = _rados.ioctx_create(_db_pool_name.c_str(), _db_pool_ioctx);
671   if (ret < 0) {
672     std::cerr << "couldn't set up ioctx! error " << ret << std::endl;
673     ret = EXIT_FAILURE;
674     goto out;
675   }
676 
677   // 6. create wal_pool if not exist
678   ret = _rados.pool_create(_wal_pool_name.c_str());
679   if (ret < 0 && ret != -EEXIST && ret !=  -EPERM) {
680     std::cerr << "couldn't create pool! error " << ret << std::endl;
681     goto out;
682   }
683 
684   // 7. create wal_pool_ioctx
685   ret = _rados.ioctx_create(_wal_pool_name.c_str(), _wal_pool_ioctx);
686   if (ret < 0) {
687     std::cerr << "couldn't set up ioctx! error " << ret << std::endl;
688     ret = EXIT_FAILURE;
689     goto out;
690   }
691 
692   // 8. add root dir
693   _AddFid(ROOT_DIR_KEY, DIR_ID_VALUE);
694 
695 out:
696   LOG_DEBUG("rados connect result code : %i\n", ret);
697 }
698 
699 /****************************************************
700   private functions to handle fid operation.
701   Dir also have fid, but the value is DIR_ID_VALUE
702 ****************************************************/
703 
704 /**
705  * @brief generate a new fid
706  * @details [long description]
707  * @return [description]
708  */
_CreateFid()709 std::string EnvLibrados::_CreateFid() {
710   return _db_name + "." + GenerateUniqueId();
711 }
712 
713 /**
714  * @brief get fid
715  * @details [long description]
716  *
717  * @param fname [description]
718  * @param fid [description]
719  *
720  * @return
721  *  Status::OK()
722  *  Status::NotFound()
723  */
_GetFid(const std::string & fname,std::string & fid)724 Status EnvLibrados::_GetFid(
725   const std::string &fname,
726   std::string& fid) {
727   std::set<std::string> keys;
728   std::map<std::string, librados::bufferlist> kvs;
729   keys.insert(fname);
730   int r = _db_pool_ioctx.omap_get_vals_by_keys(_db_name, keys, &kvs);
731 
732   if (0 == r && 0 == kvs.size()) {
733     return Status::NotFound();
734   } else if (0 == r && 0 != kvs.size()) {
735     fid.assign(kvs[fname].c_str(), kvs[fname].length());
736     return Status::OK();
737   } else {
738     return err_to_status(r);
739   }
740 }
741 
742 /**
743  * @brief rename fid
744  * @details Only modify object in rados once,
745  * so this rename operation is atomic in term of rados
746  *
747  * @param old_fname [description]
748  * @param new_fname [description]
749  *
750  * @return [description]
751  */
_RenameFid(const std::string & old_fname,const std::string & new_fname)752 Status EnvLibrados::_RenameFid(const std::string& old_fname,
753                                const std::string& new_fname) {
754   std::string fid;
755   Status s = _GetFid(old_fname, fid);
756 
757   if (Status::OK() != s) {
758     return s;
759   }
760 
761   librados::bufferlist bl;
762   std::set<std::string> keys;
763   std::map<std::string, librados::bufferlist> kvs;
764   librados::ObjectWriteOperation o;
765   bl.append(fid);
766   keys.insert(old_fname);
767   kvs[new_fname] = bl;
768   o.omap_rm_keys(keys);
769   o.omap_set(kvs);
770   int r = _db_pool_ioctx.operate(_db_name, &o);
771   return err_to_status(r);
772 }
773 
774 /**
775  * @brief add <file path, fid> to metadata object. It may overwrite exist key.
776  * @details [long description]
777  *
778  * @param fname [description]
779  * @param fid [description]
780  *
781  * @return [description]
782  */
_AddFid(const std::string & fname,const std::string & fid)783 Status EnvLibrados::_AddFid(
784   const std::string& fname,
785   const std::string& fid) {
786   std::map<std::string, librados::bufferlist> kvs;
787   librados::bufferlist value;
788   value.append(fid);
789   kvs[fname] = value;
790   int r = _db_pool_ioctx.omap_set(_db_name, kvs);
791   return err_to_status(r);
792 }
793 
794 /**
795  * @brief return subfile names of dir.
796  * @details
797  *  RocksDB has a 2-level structure, so all keys
798  *  that have dir as prefix are subfiles of dir.
799  *  So we can just return these files' name.
800  *
801  * @param dir [description]
802  * @param result [description]
803  *
804  * @return [description]
805  */
_GetSubFnames(const std::string & dir,std::vector<std::string> * result)806 Status EnvLibrados::_GetSubFnames(
807   const std::string& dir,
808   std::vector<std::string> * result
809 ) {
810   std::string start_after(dir);
811   std::string filter_prefix(dir);
812   std::map<std::string, librados::bufferlist> kvs;
813   _db_pool_ioctx.omap_get_vals(_db_name,
814                                start_after, filter_prefix,
815                                MAX_ITEMS_IN_FS, &kvs);
816 
817   result->clear();
818   for (auto i = kvs.begin(); i != kvs.end(); i++) {
819     result->push_back(i->first.substr(dir.size() + 1));
820   }
821   return Status::OK();
822 }
823 
824 /**
825  * @brief delete key fname from metadata object
826  * @details [long description]
827  *
828  * @param fname [description]
829  * @return [description]
830  */
_DelFid(const std::string & fname)831 Status EnvLibrados::_DelFid(
832   const std::string& fname) {
833   std::set<std::string> keys;
834   keys.insert(fname);
835   int r = _db_pool_ioctx.omap_rm_keys(_db_name, keys);
836   return err_to_status(r);
837 }
838 
839 /**
840  * @brief get match IoCtx from _prefix_pool_map
841  * @details [long description]
842  *
843  * @param prefix [description]
844  * @return [description]
845  *
846  */
_GetIoctx(const std::string & fpath)847 librados::IoCtx* EnvLibrados::_GetIoctx(const std::string& fpath) {
848   auto is_prefix = [](const std::string & s1, const std::string & s2) {
849     auto it1 = s1.begin(), it2 = s2.begin();
850     while (it1 != s1.end() && it2 != s2.end() && *it1 == *it2) ++it1, ++it2;
851     return it1 == s1.end();
852   };
853 
854   if (is_prefix(_wal_dir, fpath)) {
855     return &_wal_pool_ioctx;
856   } else {
857     return &_db_pool_ioctx;
858   }
859 }
860 
861 /************************************************************
862                 public functions
863 ************************************************************/
864 /**
865  * @brief generate unique id
866  * @details Combine system time and random number.
867  * @return [description]
868  */
GenerateUniqueId()869 std::string EnvLibrados::GenerateUniqueId() {
870   Random64 r(time(nullptr));
871   uint64_t random_uuid_portion =
872     r.Uniform(std::numeric_limits<uint64_t>::max());
873   uint64_t nanos_uuid_portion = NowNanos();
874   char uuid2[200];
875   snprintf(uuid2,
876            200,
877            "%16lx-%16lx",
878            (unsigned long)nanos_uuid_portion,
879            (unsigned long)random_uuid_portion);
880   return uuid2;
881 }
882 
883 /**
884  * @brief create a new sequential read file handler
885  * @details it will check the existence of fname
886  *
887  * @param fname [description]
888  * @param result [description]
889  * @param options [description]
890  * @return [description]
891  */
NewSequentialFile(const std::string & fname,std::unique_ptr<SequentialFile> * result,const EnvOptions & options)892 Status EnvLibrados::NewSequentialFile(
893   const std::string& fname,
894   std::unique_ptr<SequentialFile>* result,
895   const EnvOptions& options)
896 {
897   LOG_DEBUG("[IN]%s\n", fname.c_str());
898   std::string dir, file, fid;
899   split(fname, &dir, &file);
900   Status s;
901   std::string fpath = dir + "/" + file;
902   do {
903     s = _GetFid(dir, fid);
904 
905     if (!s.ok() || fid != DIR_ID_VALUE) {
906       if (fid != DIR_ID_VALUE) s = Status::IOError();
907       break;
908     }
909 
910     s = _GetFid(fpath, fid);
911 
912     if (Status::NotFound() == s) {
913       s = Status::IOError();
914       errno = ENOENT;
915       break;
916     }
917 
918     result->reset(new LibradosSequentialFile(_GetIoctx(fpath), fid, fpath));
919     s = Status::OK();
920   } while (0);
921 
922   LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
923   return s;
924 }
925 
926 /**
927  * @brief create a new random access file handler
928  * @details it will check the existence of fname
929  *
930  * @param fname [description]
931  * @param result [description]
932  * @param options [description]
933  * @return [description]
934  */
NewRandomAccessFile(const std::string & fname,std::unique_ptr<RandomAccessFile> * result,const EnvOptions & options)935 Status EnvLibrados::NewRandomAccessFile(
936   const std::string& fname,
937   std::unique_ptr<RandomAccessFile>* result,
938   const EnvOptions& options)
939 {
940   LOG_DEBUG("[IN]%s\n", fname.c_str());
941   std::string dir, file, fid;
942   split(fname, &dir, &file);
943   Status s;
944   std::string fpath = dir + "/" + file;
945   do {
946     s = _GetFid(dir, fid);
947 
948     if (!s.ok() || fid != DIR_ID_VALUE) {
949       s = Status::IOError();
950       break;
951     }
952 
953     s = _GetFid(fpath, fid);
954 
955     if (Status::NotFound() == s) {
956       s = Status::IOError();
957       errno = ENOENT;
958       break;
959     }
960 
961     result->reset(new LibradosRandomAccessFile(_GetIoctx(fpath), fid, fpath));
962     s = Status::OK();
963   } while (0);
964 
965   LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
966   return s;
967 }
968 
969 /**
970  * @brief create a new write file handler
971  * @details it will check the existence of fname
972  *
973  * @param fname [description]
974  * @param result [description]
975  * @param options [description]
976  * @return [description]
977  */
NewWritableFile(const std::string & fname,std::unique_ptr<WritableFile> * result,const EnvOptions & options)978 Status EnvLibrados::NewWritableFile(
979   const std::string& fname,
980   std::unique_ptr<WritableFile>* result,
981   const EnvOptions& options)
982 {
983   LOG_DEBUG("[IN]%s\n", fname.c_str());
984   std::string dir, file, fid;
985   split(fname, &dir, &file);
986   Status s;
987   std::string fpath = dir + "/" + file;
988 
989   do {
990     // 1. check if dir exist
991     s = _GetFid(dir, fid);
992     if (!s.ok()) {
993       break;
994     }
995 
996     if (fid != DIR_ID_VALUE) {
997       s = Status::IOError();
998       break;
999     }
1000 
1001     // 2. check if file exist.
1002     // 2.1 exist, use it
1003     // 2.2 not exist, create it
1004     s = _GetFid(fpath, fid);
1005     if (Status::NotFound() == s) {
1006       fid = _CreateFid();
1007       _AddFid(fpath, fid);
1008     }
1009 
1010     result->reset(
1011         new LibradosWritableFile(_GetIoctx(fpath), fid, fpath, this, options));
1012     s = Status::OK();
1013   } while (0);
1014 
1015   LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
1016   return s;
1017 }
1018 
1019 /**
1020  * @brief reuse write file handler
1021  * @details
1022  *  This function will rename old_fname to new_fname,
1023  *  then return the handler of new_fname
1024  *
1025  * @param new_fname [description]
1026  * @param old_fname [description]
1027  * @param result [description]
1028  * @param options [description]
1029  * @return [description]
1030  */
ReuseWritableFile(const std::string & new_fname,const std::string & old_fname,std::unique_ptr<WritableFile> * result,const EnvOptions & options)1031 Status EnvLibrados::ReuseWritableFile(
1032   const std::string& new_fname,
1033   const std::string& old_fname,
1034   std::unique_ptr<WritableFile>* result,
1035   const EnvOptions& options)
1036 {
1037   LOG_DEBUG("[IN]%s => %s\n", old_fname.c_str(), new_fname.c_str());
1038   std::string src_fid, tmp_fid, src_dir, src_file, dst_dir, dst_file;
1039   split(old_fname, &src_dir, &src_file);
1040   split(new_fname, &dst_dir, &dst_file);
1041 
1042   std::string src_fpath = src_dir + "/" + src_file;
1043   std::string dst_fpath = dst_dir + "/" + dst_file;
1044   Status r = Status::OK();
1045   do {
1046     r = _RenameFid(src_fpath,
1047                    dst_fpath);
1048     if (!r.ok()) {
1049       break;
1050     }
1051 
1052     result->reset(new LibradosWritableFile(_GetIoctx(dst_fpath), src_fid,
1053                                            dst_fpath, this, options));
1054   } while (0);
1055 
1056   LOG_DEBUG("[OUT]%s\n", r.ToString().c_str());
1057   return r;
1058 }
1059 
1060 /**
1061  * @brief create a new directory handler
1062  * @details [long description]
1063  *
1064  * @param name [description]
1065  * @param result [description]
1066  *
1067  * @return [description]
1068  */
NewDirectory(const std::string & name,std::unique_ptr<Directory> * result)1069 Status EnvLibrados::NewDirectory(
1070   const std::string& name,
1071   std::unique_ptr<Directory>* result)
1072 {
1073   LOG_DEBUG("[IN]%s\n", name.c_str());
1074   std::string fid, dir, file;
1075   /* just want to get dir name */
1076   split(name + "/tmp", &dir, &file);
1077   Status s;
1078 
1079   do {
1080     s = _GetFid(dir, fid);
1081 
1082     if (!s.ok() || DIR_ID_VALUE != fid) {
1083       s = Status::IOError(name, strerror(-ENOENT));
1084       break;
1085     }
1086 
1087     if (Status::NotFound() == s) {
1088       s = _AddFid(dir, DIR_ID_VALUE);
1089       if (!s.ok()) break;
1090     } else if (!s.ok()) {
1091       break;
1092     }
1093 
1094     result->reset(new LibradosDirectory(_GetIoctx(dir), dir));
1095     s = Status::OK();
1096   } while (0);
1097 
1098   LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
1099   return s;
1100 }
1101 
1102 /**
1103  * @brief check if fname is exist
1104  * @details [long description]
1105  *
1106  * @param fname [description]
1107  * @return [description]
1108  */
FileExists(const std::string & fname)1109 Status EnvLibrados::FileExists(const std::string& fname)
1110 {
1111   LOG_DEBUG("[IN]%s\n", fname.c_str());
1112   std::string fid, dir, file;
1113   split(fname, &dir, &file);
1114   Status s = _GetFid(dir + "/" + file, fid);
1115 
1116   if (s.ok() && fid != DIR_ID_VALUE) {
1117     s = Status::OK();
1118   }
1119 
1120   LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
1121   return s;
1122 }
1123 
1124 /**
1125  * @brief get subfile name of dir_in
1126  * @details [long description]
1127  *
1128  * @param dir_in [description]
1129  * @param result [description]
1130  *
1131  * @return [description]
1132  */
GetChildren(const std::string & dir_in,std::vector<std::string> * result)1133 Status EnvLibrados::GetChildren(
1134   const std::string& dir_in,
1135   std::vector<std::string>* result)
1136 {
1137   LOG_DEBUG("[IN]%s\n", dir_in.c_str());
1138   std::string fid, dir, file;
1139   split(dir_in + "/temp", &dir, &file);
1140   Status s;
1141 
1142   do {
1143     s = _GetFid(dir, fid);
1144     if (!s.ok()) {
1145       break;
1146     }
1147 
1148     if (fid != DIR_ID_VALUE) {
1149       s = Status::IOError();
1150       break;
1151     }
1152 
1153     s = _GetSubFnames(dir, result);
1154   } while (0);
1155 
1156   LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
1157   return s;
1158 }
1159 
1160 /**
1161  * @brief delete fname
1162  * @details [long description]
1163  *
1164  * @param fname [description]
1165  * @return [description]
1166  */
DeleteFile(const std::string & fname)1167 Status EnvLibrados::DeleteFile(const std::string& fname)
1168 {
1169   LOG_DEBUG("[IN]%s\n", fname.c_str());
1170   std::string fid, dir, file;
1171   split(fname, &dir, &file);
1172   Status s = _GetFid(dir + "/" + file, fid);
1173 
1174   if (s.ok() && DIR_ID_VALUE != fid) {
1175     s = _DelFid(dir + "/" + file);
1176   } else {
1177     s = Status::NotFound();
1178   }
1179   LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
1180   return s;
1181 }
1182 
1183 /**
1184  * @brief create new dir
1185  * @details [long description]
1186  *
1187  * @param dirname [description]
1188  * @return [description]
1189  */
CreateDir(const std::string & dirname)1190 Status EnvLibrados::CreateDir(const std::string& dirname)
1191 {
1192   LOG_DEBUG("[IN]%s\n", dirname.c_str());
1193   std::string fid, dir, file;
1194   split(dirname + "/temp", &dir, &file);
1195   Status s = _GetFid(dir + "/" + file, fid);
1196 
1197   do {
1198     if (Status::NotFound() != s && fid != DIR_ID_VALUE) {
1199       break;
1200     } else if (Status::OK() == s && fid == DIR_ID_VALUE) {
1201       break;
1202     }
1203 
1204     s = _AddFid(dir, DIR_ID_VALUE);
1205   } while (0);
1206 
1207   LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
1208   return s;
1209 }
1210 
1211 /**
1212  * @brief create dir if missing
1213  * @details [long description]
1214  *
1215  * @param dirname [description]
1216  * @return [description]
1217  */
CreateDirIfMissing(const std::string & dirname)1218 Status EnvLibrados::CreateDirIfMissing(const std::string& dirname)
1219 {
1220   LOG_DEBUG("[IN]%s\n", dirname.c_str());
1221   std::string fid, dir, file;
1222   split(dirname + "/temp", &dir, &file);
1223   Status s = Status::OK();
1224 
1225   do {
1226     s = _GetFid(dir, fid);
1227     if (Status::NotFound() != s) {
1228       break;
1229     }
1230 
1231     s = _AddFid(dir, DIR_ID_VALUE);
1232   } while (0);
1233 
1234   LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
1235   return s;
1236 }
1237 
1238 /**
1239  * @brief delete dir
1240  * @details
1241  *
1242  * @param dirname [description]
1243  * @return [description]
1244  */
DeleteDir(const std::string & dirname)1245 Status EnvLibrados::DeleteDir(const std::string& dirname)
1246 {
1247   LOG_DEBUG("[IN]%s\n", dirname.c_str());
1248   std::string fid, dir, file;
1249   split(dirname + "/temp", &dir, &file);
1250   Status s = Status::OK();
1251 
1252   s = _GetFid(dir, fid);
1253 
1254   if (s.ok() && DIR_ID_VALUE == fid) {
1255     std::vector<std::string> subs;
1256     s = _GetSubFnames(dir, &subs);
1257     // if subfiles exist, can't delete dir
1258     if (subs.size() > 0) {
1259       s = Status::IOError();
1260     } else {
1261       s = _DelFid(dir);
1262     }
1263   } else {
1264     s = Status::NotFound();
1265   }
1266 
1267   LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
1268   return s;
1269 }
1270 
1271 /**
1272  * @brief return file size
1273  * @details [long description]
1274  *
1275  * @param fname [description]
1276  * @param file_size [description]
1277  *
1278  * @return [description]
1279  */
GetFileSize(const std::string & fname,uint64_t * file_size)1280 Status EnvLibrados::GetFileSize(
1281   const std::string& fname,
1282   uint64_t* file_size)
1283 {
1284   LOG_DEBUG("[IN]%s\n", fname.c_str());
1285   std::string fid, dir, file;
1286   split(fname, &dir, &file);
1287   time_t mtime;
1288   Status s;
1289 
1290   do {
1291     std::string fpath = dir + "/" + file;
1292     s = _GetFid(fpath, fid);
1293 
1294     if (!s.ok()) {
1295       break;
1296     }
1297 
1298     int ret = _GetIoctx(fpath)->stat(fid, file_size, &mtime);
1299     if (ret < 0) {
1300       LOG_DEBUG("%i\n", ret);
1301       if (-ENOENT == ret) {
1302         *file_size = 0;
1303         s = Status::OK();
1304       } else {
1305         s = err_to_status(ret);
1306       }
1307     } else {
1308       s = Status::OK();
1309     }
1310   } while (0);
1311 
1312   LOG_DEBUG("[OUT]%s|%lld\n", s.ToString().c_str(), (long long)*file_size);
1313   return s;
1314 }
1315 
1316 /**
1317  * @brief get file modification time
1318  * @details [long description]
1319  *
1320  * @param fname [description]
1321  * @param file_mtime [description]
1322  *
1323  * @return [description]
1324  */
GetFileModificationTime(const std::string & fname,uint64_t * file_mtime)1325 Status EnvLibrados::GetFileModificationTime(const std::string& fname,
1326     uint64_t* file_mtime)
1327 {
1328   LOG_DEBUG("[IN]%s\n", fname.c_str());
1329   std::string fid, dir, file;
1330   split(fname, &dir, &file);
1331   time_t mtime;
1332   uint64_t file_size;
1333   Status s = Status::OK();
1334   do {
1335     std::string fpath = dir + "/" + file;
1336     s = _GetFid(dir + "/" + file, fid);
1337 
1338     if (!s.ok()) {
1339       break;
1340     }
1341 
1342     int ret = _GetIoctx(fpath)->stat(fid, &file_size, &mtime);
1343     if (ret < 0) {
1344       if (Status::NotFound() == err_to_status(ret)) {
1345         *file_mtime = static_cast<uint64_t>(mtime);
1346         s = Status::OK();
1347       } else {
1348         s = err_to_status(ret);
1349       }
1350     } else {
1351       s = Status::OK();
1352     }
1353   } while (0);
1354 
1355   LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
1356   return s;
1357 }
1358 
1359 /**
1360  * @brief rename file
1361  * @details
1362  *
1363  * @param src [description]
1364  * @param target_in [description]
1365  *
1366  * @return [description]
1367  */
RenameFile(const std::string & src,const std::string & target_in)1368 Status EnvLibrados::RenameFile(
1369   const std::string& src,
1370   const std::string& target_in)
1371 {
1372   LOG_DEBUG("[IN]%s => %s\n", src.c_str(), target_in.c_str());
1373   std::string src_fid, tmp_fid, src_dir, src_file, dst_dir, dst_file;
1374   split(src, &src_dir, &src_file);
1375   split(target_in, &dst_dir, &dst_file);
1376 
1377   auto s = _RenameFid(src_dir + "/" + src_file,
1378                       dst_dir + "/" + dst_file);
1379   LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
1380   return s;
1381 }
1382 
1383 /**
1384  * @brief not support
1385  * @details [long description]
1386  *
1387  * @param src [description]
1388  * @param target_in [description]
1389  *
1390  * @return [description]
1391  */
LinkFile(const std::string & src,const std::string & target_in)1392 Status EnvLibrados::LinkFile(
1393   const std::string& src,
1394   const std::string& target_in)
1395 {
1396   LOG_DEBUG("[IO]%s => %s\n", src.c_str(), target_in.c_str());
1397   return Status::NotSupported();
1398 }
1399 
1400 /**
1401  * @brief lock file. create if missing.
1402  * @details [long description]
1403  *
1404  * It seems that LockFile is used for preventing other instance of RocksDB
1405  * from opening up the database at the same time. From RocksDB source code,
1406  * the invokes of LockFile are at following locations:
1407  *
1408  *  ./db/db_impl.cc:1159:    s = env_->LockFile(LockFileName(dbname_), &db_lock_);    // DBImpl::Recover
1409  *  ./db/db_impl.cc:5839:  Status result = env->LockFile(lockname, &lock);            // Status DestroyDB
1410  *
1411  * When db recovery and db destroy, RocksDB will call LockFile
1412  *
1413  * @param fname [description]
1414  * @param lock [description]
1415  *
1416  * @return [description]
1417  */
LockFile(const std::string & fname,FileLock ** lock)1418 Status EnvLibrados::LockFile(
1419   const std::string& fname,
1420   FileLock** lock)
1421 {
1422   LOG_DEBUG("[IN]%s\n", fname.c_str());
1423   std::string fid, dir, file;
1424   split(fname, &dir, &file);
1425   Status s = Status::OK();
1426 
1427   do {
1428     std::string fpath = dir + "/" + file;
1429     s = _GetFid(fpath, fid);
1430 
1431     if (Status::OK() != s &&
1432         Status::NotFound() != s) {
1433       break;
1434     } else if (Status::NotFound() == s) {
1435       s = _AddFid(fpath, _CreateFid());
1436       if (!s.ok()) {
1437         break;
1438       }
1439     } else if (Status::OK() == s && DIR_ID_VALUE == fid) {
1440       s = Status::IOError();
1441       break;
1442     }
1443 
1444     *lock = new LibradosFileLock(_GetIoctx(fpath), fpath);
1445   } while (0);
1446 
1447   LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
1448   return s;
1449 }
1450 
1451 /**
1452  * @brief unlock file
1453  * @details [long description]
1454  *
1455  * @param lock [description]
1456  * @return [description]
1457  */
UnlockFile(FileLock * lock)1458 Status EnvLibrados::UnlockFile(FileLock* lock)
1459 {
1460   LOG_DEBUG("[IO]%p\n", lock);
1461   if (nullptr != lock) {
1462     delete lock;
1463   }
1464   return Status::OK();
1465 }
1466 
1467 
1468 /**
1469  * @brief not support
1470  * @details [long description]
1471  *
1472  * @param db_path [description]
1473  * @param output_path [description]
1474  *
1475  * @return [description]
1476  */
GetAbsolutePath(const std::string & db_path,std::string * output_path)1477 Status EnvLibrados::GetAbsolutePath(
1478   const std::string& db_path,
1479   std::string* output_path)
1480 {
1481   LOG_DEBUG("[IO]%s\n", db_path.c_str());
1482   return Status::NotSupported();
1483 }
1484 
1485 /**
1486  * @brief Get default EnvLibrados
1487  * @details [long description]
1488  * @return [description]
1489  */
Default()1490 EnvLibrados* EnvLibrados::Default() {
1491   static EnvLibrados default_env(default_db_name,
1492                                  std::getenv(default_config_path),
1493                                  default_pool_name);
1494   return &default_env;
1495 }
1496 
1497 }  // namespace ROCKSDB_NAMESPACE
1498