1 // Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
2 //  Copyright (c) 2016, Red Hat, Inc.  All rights reserved.
3 //  This source code is licensed under both the GPLv2 (found in the
4 //  COPYING file in the root directory) and Apache 2.0 License
5 //  (found in the LICENSE.Apache file in the root directory).
6 
7 #ifndef ROCKSDB_LITE
8 
#include "rocksdb/utilities/env_librados.h"

#include <rados/librados.hpp>

#include <cassert>
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <ctime>
#include <iostream>
#include <limits>
#include <memory>
#include <ostream>
#include <string>
#include <vector>

#include "env/mock_env.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "rocksdb/utilities/transaction_db.h"
#include "test_util/testharness.h"
#include "util/random.h"
21 
// Minimal stopwatch used by the benchmark-style tests in this file to report
// how long a workload took, in whole milliseconds.
//
// Intervals are measured with std::chrono::steady_clock: it is monotonic, so
// wall-clock adjustments cannot yield negative or inflated readings.
// std::chrono::high_resolution_clock gives no such guarantee.
class Timer {
  using Clock = std::chrono::steady_clock;
  using milliseconds = std::chrono::milliseconds;

 public:
  // Starts timing immediately when `run` is true. Otherwise the start point
  // stays at the clock's epoch until Reset() is called.
  explicit Timer(bool run = false) {
    if (run) {
      Reset();
    }
  }

  // Restarts the measurement from the current instant.
  void Reset() { start_ = Clock::now(); }

  // Returns the time elapsed since the start point, truncated to
  // milliseconds.
  milliseconds Elapsed() const {
    return std::chrono::duration_cast<milliseconds>(Clock::now() - start_);
  }

  // Streams the elapsed millisecond count (the number only, no unit suffix).
  template <typename T, typename Traits>
  friend std::basic_ostream<T, Traits>& operator<<(
      std::basic_ostream<T, Traits>& out, const Timer& timer) {
    return out << timer.Elapsed().count();
  }

 private:
  Clock::time_point start_;
};
48 
49 namespace ROCKSDB_NAMESPACE {
50 
51 class EnvLibradosTest : public testing::Test {
52 public:
53   // we will use all of these below
54   const std::string db_name = "env_librados_test_db";
55   const std::string db_pool = db_name + "_pool";
56   const char *keyring = "admin";
57   const char *config = "../ceph/src/ceph.conf";
58 
59   EnvLibrados* env_;
60   const EnvOptions soptions_;
61 
EnvLibradosTest()62   EnvLibradosTest()
63     : env_(new EnvLibrados(db_name, config, db_pool)) {
64   }
~EnvLibradosTest()65   ~EnvLibradosTest() {
66     delete env_;
67     librados::Rados rados;
68     int ret = 0;
69     do {
70       ret = rados.init("admin"); // just use the client.admin keyring
71       if (ret < 0) { // let's handle any error that might have come back
72         std::cerr << "couldn't initialize rados! error " << ret << std::endl;
73         ret = EXIT_FAILURE;
74         break;
75       }
76 
77       ret = rados.conf_read_file(config);
78       if (ret < 0) {
79         // This could fail if the config file is malformed, but it'd be hard.
80         std::cerr << "failed to parse config file " << config
81                   << "! error" << ret << std::endl;
82         ret = EXIT_FAILURE;
83         break;
84       }
85 
86       /*
87        * next, we actually connect to the cluster
88        */
89 
90       ret = rados.connect();
91       if (ret < 0) {
92         std::cerr << "couldn't connect to cluster! error " << ret << std::endl;
93         ret = EXIT_FAILURE;
94         break;
95       }
96 
97       /*
98        * And now we're done, so let's remove our pool and then
99        * shut down the connection gracefully.
100        */
101       int delete_ret = rados.pool_delete(db_pool.c_str());
102       if (delete_ret < 0) {
103         // be careful not to
104         std::cerr << "We failed to delete our test pool!" << db_pool << delete_ret << std::endl;
105         ret = EXIT_FAILURE;
106       }
107     } while (0);
108   }
109 };
110 
TEST_F(EnvLibradosTest,Basics)111 TEST_F(EnvLibradosTest, Basics) {
112   uint64_t file_size;
113   std::unique_ptr<WritableFile> writable_file;
114   std::vector<std::string> children;
115 
116   ASSERT_OK(env_->CreateDir("/dir"));
117   // Check that the directory is empty.
118   ASSERT_EQ(Status::NotFound(), env_->FileExists("/dir/non_existent"));
119   ASSERT_TRUE(!env_->GetFileSize("/dir/non_existent", &file_size).ok());
120   ASSERT_OK(env_->GetChildren("/dir", &children));
121   ASSERT_EQ(0U, children.size());
122 
123   // Create a file.
124   ASSERT_OK(env_->NewWritableFile("/dir/f", &writable_file, soptions_));
125   writable_file.reset();
126 
127   // Check that the file exists.
128   ASSERT_OK(env_->FileExists("/dir/f"));
129   ASSERT_OK(env_->GetFileSize("/dir/f", &file_size));
130   ASSERT_EQ(0U, file_size);
131   ASSERT_OK(env_->GetChildren("/dir", &children));
132   ASSERT_EQ(1U, children.size());
133   ASSERT_EQ("f", children[0]);
134 
135   // Write to the file.
136   ASSERT_OK(env_->NewWritableFile("/dir/f", &writable_file, soptions_));
137   ASSERT_OK(writable_file->Append("abc"));
138   writable_file.reset();
139 
140 
141   // Check for expected size.
142   ASSERT_OK(env_->GetFileSize("/dir/f", &file_size));
143   ASSERT_EQ(3U, file_size);
144 
145 
146   // Check that renaming works.
147   ASSERT_TRUE(!env_->RenameFile("/dir/non_existent", "/dir/g").ok());
148   ASSERT_OK(env_->RenameFile("/dir/f", "/dir/g"));
149   ASSERT_EQ(Status::NotFound(), env_->FileExists("/dir/f"));
150   ASSERT_OK(env_->FileExists("/dir/g"));
151   ASSERT_OK(env_->GetFileSize("/dir/g", &file_size));
152   ASSERT_EQ(3U, file_size);
153 
154   // Check that opening non-existent file fails.
155   std::unique_ptr<SequentialFile> seq_file;
156   std::unique_ptr<RandomAccessFile> rand_file;
157   ASSERT_TRUE(
158     !env_->NewSequentialFile("/dir/non_existent", &seq_file, soptions_).ok());
159   ASSERT_TRUE(!seq_file);
160   ASSERT_TRUE(!env_->NewRandomAccessFile("/dir/non_existent", &rand_file,
161                                          soptions_).ok());
162   ASSERT_TRUE(!rand_file);
163 
164   // Check that deleting works.
165   ASSERT_TRUE(!env_->DeleteFile("/dir/non_existent").ok());
166   ASSERT_OK(env_->DeleteFile("/dir/g"));
167   ASSERT_EQ(Status::NotFound(), env_->FileExists("/dir/g"));
168   ASSERT_OK(env_->GetChildren("/dir", &children));
169   ASSERT_EQ(0U, children.size());
170   ASSERT_OK(env_->DeleteDir("/dir"));
171 }
172 
// Writes "hello world" to a file and reads it back both sequentially and by
// random access, including reads and skips that run past end of file.
TEST_F(EnvLibradosTest, ReadWrite) {
  // Largest read size requested below. The Read() contract in rocksdb/env.h
  // allows scratch[0..n-1] to be written, so the scratch buffer must be at
  // least this large even though the file itself is tiny.
  const size_t kMaxRead = 1000;

  std::unique_ptr<WritableFile> writable_file;
  std::unique_ptr<SequentialFile> seq_file;
  std::unique_ptr<RandomAccessFile> rand_file;
  Slice result;
  char scratch[kMaxRead];

  ASSERT_OK(env_->CreateDir("/dir"));

  ASSERT_OK(env_->NewWritableFile("/dir/f", &writable_file, soptions_));
  ASSERT_OK(writable_file->Append("hello "));
  ASSERT_OK(writable_file->Append("world"));
  writable_file.reset();

  // Read sequentially.
  ASSERT_OK(env_->NewSequentialFile("/dir/f", &seq_file, soptions_));
  ASSERT_OK(seq_file->Read(5, &result, scratch));  // Read "hello".
  ASSERT_EQ(0, result.compare("hello"));
  ASSERT_OK(seq_file->Skip(1));
  ASSERT_OK(seq_file->Read(kMaxRead, &result, scratch));  // Read "world".
  ASSERT_EQ(0, result.compare("world"));
  ASSERT_OK(seq_file->Read(kMaxRead, &result, scratch));  // Read past EOF.
  ASSERT_EQ(0U, result.size());
  ASSERT_OK(seq_file->Skip(100));  // Try to skip past end of file.
  ASSERT_OK(seq_file->Read(kMaxRead, &result, scratch));
  ASSERT_EQ(0U, result.size());

  // Random reads.
  ASSERT_OK(env_->NewRandomAccessFile("/dir/f", &rand_file, soptions_));
  ASSERT_OK(rand_file->Read(6, 5, &result, scratch));  // Read "world".
  ASSERT_EQ(0, result.compare("world"));
  ASSERT_OK(rand_file->Read(0, 5, &result, scratch));  // Read "hello".
  ASSERT_EQ(0, result.compare("hello"));
  ASSERT_OK(rand_file->Read(10, 100, &result, scratch));  // Read "d".
  ASSERT_EQ(0, result.compare("d"));

  // Too high offset.
  ASSERT_OK(rand_file->Read(1000, 5, &result, scratch));
}
212 
// Checks that locking and then unlocking succeeds, both for a path that was
// never created and for a file that exists.
TEST_F(EnvLibradosTest, Locks) {
  FileLock* held = nullptr;
  std::unique_ptr<WritableFile> file;

  ASSERT_OK(env_->CreateDir("/dir"));
  ASSERT_OK(env_->NewWritableFile("/dir/f", &file, soptions_));

  // One lock/unlock round trip per path; each call must report success.
  for (const char* path : {"some file", "/dir/f"}) {
    ASSERT_OK(env_->LockFile(path, &held));
    ASSERT_OK(env_->UnlockFile(held));
  }
}
228 
// Miscellaneous checks: the test directory is non-empty, creating a file
// under the never-created directory "/a" fails, and Sync/Flush/Close on a
// fresh file succeed.
TEST_F(EnvLibradosTest, Misc) {
  std::string dir_for_tests;
  ASSERT_OK(env_->GetTestDirectory(&dir_for_tests));
  ASSERT_FALSE(dir_for_tests.empty());

  std::unique_ptr<WritableFile> file;

  // "/a" was never created as a directory, so "/a/b" cannot be opened.
  ASSERT_FALSE(env_->NewWritableFile("/a/b", &file, soptions_).ok());

  ASSERT_OK(env_->NewWritableFile("/a", &file, soptions_));
  // Nothing was written; each of these is still expected to report success.
  ASSERT_OK(file->Sync());
  ASSERT_OK(file->Flush());
  ASSERT_OK(file->Close());
  file.reset();
}
244 
// Writes "foo" followed by one 300 KiB append, then reads everything back
// sequentially and checks that the payload round-trips unchanged.
TEST_F(EnvLibradosTest, LargeWrite) {
  const size_t kWriteSize = 300 * 1024;
  // Owned by a smart pointer so the buffer is released even when an ASSERT_*
  // below returns from the test early.
  std::unique_ptr<char[]> scratch(new char[kWriteSize * 2]);

  const std::string write_data(kWriteSize, 'h');

  std::unique_ptr<WritableFile> writable_file;
  ASSERT_OK(env_->CreateDir("/dir"));
  ASSERT_OK(env_->NewWritableFile("/dir/g", &writable_file, soptions_));
  ASSERT_OK(writable_file->Append("foo"));
  ASSERT_OK(writable_file->Append(write_data));
  writable_file.reset();

  std::unique_ptr<SequentialFile> seq_file;
  Slice result;
  ASSERT_OK(env_->NewSequentialFile("/dir/g", &seq_file, soptions_));
  ASSERT_OK(seq_file->Read(3, &result, scratch.get()));  // Read "foo".
  ASSERT_EQ(0, result.compare("foo"));

  size_t read = 0;
  std::string read_data;
  while (read < kWriteSize) {
    ASSERT_OK(seq_file->Read(kWriteSize - read, &result, scratch.get()));
    // An empty read means EOF arrived before all data was seen; fail here
    // rather than loop forever.
    ASSERT_GT(result.size(), 0U) << "premature EOF after " << read << " bytes";
    read_data.append(result.data(), result.size());
    read += result.size();
  }
  ASSERT_TRUE(write_data == read_data);
}
277 
// Writes "foo" followed by 1024 single-byte appends, then reads everything
// back sequentially and checks that the bytes arrive complete and in order.
TEST_F(EnvLibradosTest, FrequentlySmallWrite) {
  const size_t kWriteSize = 1 << 10;
  // Owned by a smart pointer so the buffer is released even when an ASSERT_*
  // below returns from the test early.
  std::unique_ptr<char[]> scratch(new char[kWriteSize * 2]);

  // What the file is expected to contain after the "foo" prefix.
  const std::string write_data(kWriteSize, 'h');

  std::unique_ptr<WritableFile> writable_file;
  ASSERT_OK(env_->CreateDir("/dir"));
  ASSERT_OK(env_->NewWritableFile("/dir/g", &writable_file, soptions_));
  ASSERT_OK(writable_file->Append("foo"));

  for (size_t i = 0; i < kWriteSize; ++i) {
    ASSERT_OK(writable_file->Append("h"));
  }
  writable_file.reset();

  std::unique_ptr<SequentialFile> seq_file;
  Slice result;
  ASSERT_OK(env_->NewSequentialFile("/dir/g", &seq_file, soptions_));
  ASSERT_OK(seq_file->Read(3, &result, scratch.get()));  // Read "foo".
  ASSERT_EQ(0, result.compare("foo"));

  size_t read = 0;
  std::string read_data;
  while (read < kWriteSize) {
    ASSERT_OK(seq_file->Read(kWriteSize - read, &result, scratch.get()));
    // An empty read means EOF arrived before all data was seen; fail here
    // rather than loop forever.
    ASSERT_GT(result.size(), 0U) << "premature EOF after " << read << " bytes";
    read_data.append(result.data(), result.size());
    read += result.size();
  }
  ASSERT_TRUE(write_data == read_data);
}
313 
// Appends 300 KiB to a file, truncates it to 1 KiB, and checks the size the
// writable file reports after each step.
TEST_F(EnvLibradosTest, Truncate) {
  const size_t kWriteSize = 300 * 1024;
  const size_t truncSize = 1024;
  const std::string payload(kWriteSize, 'h');

  ASSERT_OK(env_->CreateDir("/dir"));

  std::unique_ptr<WritableFile> file;
  ASSERT_OK(env_->NewWritableFile("/dir/g", &file, soptions_));

  // Size after the append equals the number of bytes written...
  ASSERT_OK(file->Append(payload));
  ASSERT_EQ(file->GetFileSize(), kWriteSize);

  // ...and drops to the requested length once truncated.
  ASSERT_OK(file->Truncate(truncSize));
  ASSERT_EQ(file->GetFileSize(), truncSize);

  file.reset();
}
331 
// Opens a DB on top of EnvLibrados and exercises Put, Get and an atomic
// WriteBatch.
//
// Every status goes through a gtest assertion, not assert(), so the checks
// still run when the test is built with NDEBUG.
TEST_F(EnvLibradosTest, DBBasics) {
  const std::string kDBPath = "/tmp/DBBasics";
  Options options;
  // Optimize RocksDB. This is the easiest way to get RocksDB to perform well
  options.IncreaseParallelism();
  options.OptimizeLevelStyleCompaction();
  // create the DB if it's not already present
  options.create_if_missing = true;
  options.env = env_;

  // open DB
  DB* raw_db = nullptr;
  ASSERT_OK(DB::Open(options, kDBPath, &raw_db));
  // Closes the DB on every exit path, including early ASSERT_* returns.
  std::unique_ptr<DB> db(raw_db);

  // Put key-value
  ASSERT_OK(db->Put(WriteOptions(), "key1", "value"));
  std::string value;
  // get value
  ASSERT_OK(db->Get(ReadOptions(), "key1", &value));
  ASSERT_EQ("value", value);

  // atomically apply a set of updates
  {
    WriteBatch batch;
    ASSERT_OK(batch.Delete("key1"));
    ASSERT_OK(batch.Put("key2", value));
    ASSERT_OK(db->Write(WriteOptions(), &batch));
  }

  // key1 was deleted by the batch, key2 now carries its former value.
  ASSERT_TRUE(db->Get(ReadOptions(), "key1", &value).IsNotFound());

  ASSERT_OK(db->Get(ReadOptions(), "key2", &value));
  ASSERT_EQ("value", value);
}
372 
TEST_F(EnvLibradosTest,DBLoadKeysInRandomOrder)373 TEST_F(EnvLibradosTest, DBLoadKeysInRandomOrder) {
374   char key[20] = {0}, value[20] = {0};
375   int max_loop = 1 << 10;
376   Timer timer(false);
377   std::cout << "Test size : loop(" << max_loop << ")" << std::endl;
378   /**********************************
379             use default env
380   ***********************************/
381   std::string kDBPath1 = "/tmp/DBLoadKeysInRandomOrder1";
382   DB* db1;
383   Options options1;
384   // Optimize Rocksdb. This is the easiest way to get RocksDB to perform well
385   options1.IncreaseParallelism();
386   options1.OptimizeLevelStyleCompaction();
387   // create the DB if it's not already present
388   options1.create_if_missing = true;
389 
390   // open DB
391   Status s1 = DB::Open(options1, kDBPath1, &db1);
392   assert(s1.ok());
393 
394   ROCKSDB_NAMESPACE::Random64 r1(time(nullptr));
395 
396   timer.Reset();
397   for (int i = 0; i < max_loop; ++i) {
398     snprintf(key,
399              20,
400              "%16lx",
401              (unsigned long)r1.Uniform(std::numeric_limits<uint64_t>::max()));
402     snprintf(value,
403              20,
404              "%16lx",
405              (unsigned long)r1.Uniform(std::numeric_limits<uint64_t>::max()));
406     // Put key-value
407     s1 = db1->Put(WriteOptions(), key, value);
408     assert(s1.ok());
409   }
410   std::cout << "Time by default : " << timer << "ms" << std::endl;
411   delete db1;
412 
413   /**********************************
414             use librados env
415   ***********************************/
416   std::string kDBPath2 = "/tmp/DBLoadKeysInRandomOrder2";
417   DB* db2;
418   Options options2;
419   // Optimize RocksDB. This is the easiest way to get RocksDB to perform well
420   options2.IncreaseParallelism();
421   options2.OptimizeLevelStyleCompaction();
422   // create the DB if it's not already present
423   options2.create_if_missing = true;
424   options2.env = env_;
425 
426   // open DB
427   Status s2 = DB::Open(options2, kDBPath2, &db2);
428   assert(s2.ok());
429 
430   ROCKSDB_NAMESPACE::Random64 r2(time(nullptr));
431 
432   timer.Reset();
433   for (int i = 0; i < max_loop; ++i) {
434     snprintf(key,
435              20,
436              "%16lx",
437              (unsigned long)r2.Uniform(std::numeric_limits<uint64_t>::max()));
438     snprintf(value,
439              20,
440              "%16lx",
441              (unsigned long)r2.Uniform(std::numeric_limits<uint64_t>::max()));
442     // Put key-value
443     s2 = db2->Put(WriteOptions(), key, value);
444     assert(s2.ok());
445   }
446   std::cout << "Time by librados : " << timer << "ms" << std::endl;
447   delete db2;
448 }
449 
TEST_F(EnvLibradosTest,DBBulkLoadKeysInRandomOrder)450 TEST_F(EnvLibradosTest, DBBulkLoadKeysInRandomOrder) {
451   char key[20] = {0}, value[20] = {0};
452   int max_loop = 1 << 6;
453   int bulk_size = 1 << 15;
454   Timer timer(false);
455   std::cout << "Test size : loop(" << max_loop << "); bulk_size(" << bulk_size << ")" << std::endl;
456   /**********************************
457             use default env
458   ***********************************/
459   std::string kDBPath1 = "/tmp/DBBulkLoadKeysInRandomOrder1";
460   DB* db1;
461   Options options1;
462   // Optimize Rocksdb. This is the easiest way to get RocksDB to perform well
463   options1.IncreaseParallelism();
464   options1.OptimizeLevelStyleCompaction();
465   // create the DB if it's not already present
466   options1.create_if_missing = true;
467 
468   // open DB
469   Status s1 = DB::Open(options1, kDBPath1, &db1);
470   assert(s1.ok());
471 
472   ROCKSDB_NAMESPACE::Random64 r1(time(nullptr));
473 
474   timer.Reset();
475   for (int i = 0; i < max_loop; ++i) {
476     WriteBatch batch;
477     for (int j = 0; j < bulk_size; ++j) {
478       snprintf(key,
479                20,
480                "%16lx",
481                (unsigned long)r1.Uniform(std::numeric_limits<uint64_t>::max()));
482       snprintf(value,
483                20,
484                "%16lx",
485                (unsigned long)r1.Uniform(std::numeric_limits<uint64_t>::max()));
486       batch.Put(key, value);
487     }
488     s1 = db1->Write(WriteOptions(), &batch);
489     assert(s1.ok());
490   }
491   std::cout << "Time by default : " << timer << "ms" << std::endl;
492   delete db1;
493 
494   /**********************************
495             use librados env
496   ***********************************/
497   std::string kDBPath2 = "/tmp/DBBulkLoadKeysInRandomOrder2";
498   DB* db2;
499   Options options2;
500   // Optimize RocksDB. This is the easiest way to get RocksDB to perform well
501   options2.IncreaseParallelism();
502   options2.OptimizeLevelStyleCompaction();
503   // create the DB if it's not already present
504   options2.create_if_missing = true;
505   options2.env = env_;
506 
507   // open DB
508   Status s2 = DB::Open(options2, kDBPath2, &db2);
509   assert(s2.ok());
510 
511   ROCKSDB_NAMESPACE::Random64 r2(time(nullptr));
512 
513   timer.Reset();
514   for (int i = 0; i < max_loop; ++i) {
515     WriteBatch batch;
516     for (int j = 0; j < bulk_size; ++j) {
517       snprintf(key,
518                20,
519                "%16lx",
520                (unsigned long)r2.Uniform(std::numeric_limits<uint64_t>::max()));
521       snprintf(value,
522                20,
523                "%16lx",
524                (unsigned long)r2.Uniform(std::numeric_limits<uint64_t>::max()));
525       batch.Put(key, value);
526     }
527     s2 = db2->Write(WriteOptions(), &batch);
528     assert(s2.ok());
529   }
530   std::cout << "Time by librados : " << timer << "ms" << std::endl;
531   delete db2;
532 }
533 
TEST_F(EnvLibradosTest,DBBulkLoadKeysInSequentialOrder)534 TEST_F(EnvLibradosTest, DBBulkLoadKeysInSequentialOrder) {
535   char key[20] = {0}, value[20] = {0};
536   int max_loop = 1 << 6;
537   int bulk_size = 1 << 15;
538   Timer timer(false);
539   std::cout << "Test size : loop(" << max_loop << "); bulk_size(" << bulk_size << ")" << std::endl;
540   /**********************************
541             use default env
542   ***********************************/
543   std::string kDBPath1 = "/tmp/DBBulkLoadKeysInSequentialOrder1";
544   DB* db1;
545   Options options1;
546   // Optimize Rocksdb. This is the easiest way to get RocksDB to perform well
547   options1.IncreaseParallelism();
548   options1.OptimizeLevelStyleCompaction();
549   // create the DB if it's not already present
550   options1.create_if_missing = true;
551 
552   // open DB
553   Status s1 = DB::Open(options1, kDBPath1, &db1);
554   assert(s1.ok());
555 
556   ROCKSDB_NAMESPACE::Random64 r1(time(nullptr));
557 
558   timer.Reset();
559   for (int i = 0; i < max_loop; ++i) {
560     WriteBatch batch;
561     for (int j = 0; j < bulk_size; ++j) {
562       snprintf(key,
563                20,
564                "%019lld",
565                (long long)(i * bulk_size + j));
566       snprintf(value,
567                20,
568                "%16lx",
569                (unsigned long)r1.Uniform(std::numeric_limits<uint64_t>::max()));
570       batch.Put(key, value);
571     }
572     s1 = db1->Write(WriteOptions(), &batch);
573     assert(s1.ok());
574   }
575   std::cout << "Time by default : " << timer << "ms" << std::endl;
576   delete db1;
577 
578   /**********************************
579             use librados env
580   ***********************************/
581   std::string kDBPath2 = "/tmp/DBBulkLoadKeysInSequentialOrder2";
582   DB* db2;
583   Options options2;
584   // Optimize RocksDB. This is the easiest way to get RocksDB to perform well
585   options2.IncreaseParallelism();
586   options2.OptimizeLevelStyleCompaction();
587   // create the DB if it's not already present
588   options2.create_if_missing = true;
589   options2.env = env_;
590 
591   // open DB
592   Status s2 = DB::Open(options2, kDBPath2, &db2);
593   assert(s2.ok());
594 
595   ROCKSDB_NAMESPACE::Random64 r2(time(nullptr));
596 
597   timer.Reset();
598   for (int i = 0; i < max_loop; ++i) {
599     WriteBatch batch;
600     for (int j = 0; j < bulk_size; ++j) {
601       snprintf(key,
602                20,
603                "%16lx",
604                (unsigned long)r2.Uniform(std::numeric_limits<uint64_t>::max()));
605       snprintf(value,
606                20,
607                "%16lx",
608                (unsigned long)r2.Uniform(std::numeric_limits<uint64_t>::max()));
609       batch.Put(key, value);
610     }
611     s2 = db2->Write(WriteOptions(), &batch);
612     assert(s2.ok());
613   }
614   std::cout << "Time by librados : " << timer << "ms" << std::endl;
615   delete db2;
616 }
617 
TEST_F(EnvLibradosTest,DBRandomRead)618 TEST_F(EnvLibradosTest, DBRandomRead) {
619   char key[20] = {0}, value[20] = {0};
620   int max_loop = 1 << 6;
621   int bulk_size = 1 << 10;
622   int read_loop = 1 << 20;
623   Timer timer(false);
624   std::cout << "Test size : keys_num(" << max_loop << ", " << bulk_size << "); read_loop(" << read_loop << ")" << std::endl;
625   /**********************************
626             use default env
627   ***********************************/
628   std::string kDBPath1 = "/tmp/DBRandomRead1";
629   DB* db1;
630   Options options1;
631   // Optimize Rocksdb. This is the easiest way to get RocksDB to perform well
632   options1.IncreaseParallelism();
633   options1.OptimizeLevelStyleCompaction();
634   // create the DB if it's not already present
635   options1.create_if_missing = true;
636 
637   // open DB
638   Status s1 = DB::Open(options1, kDBPath1, &db1);
639   assert(s1.ok());
640 
641   ROCKSDB_NAMESPACE::Random64 r1(time(nullptr));
642 
643   for (int i = 0; i < max_loop; ++i) {
644     WriteBatch batch;
645     for (int j = 0; j < bulk_size; ++j) {
646       snprintf(key,
647                20,
648                "%019lld",
649                (long long)(i * bulk_size + j));
650       snprintf(value,
651                20,
652                "%16lx",
653                (unsigned long)r1.Uniform(std::numeric_limits<uint64_t>::max()));
654       batch.Put(key, value);
655     }
656     s1 = db1->Write(WriteOptions(), &batch);
657     assert(s1.ok());
658   }
659   timer.Reset();
660   int base1 = 0, offset1 = 0;
661   for (int i = 0; i < read_loop; ++i) {
662     base1 = r1.Uniform(max_loop);
663     offset1 = r1.Uniform(bulk_size);
664     std::string value1;
665     snprintf(key,
666              20,
667              "%019lld",
668              (long long)(base1 * bulk_size + offset1));
669     s1 = db1->Get(ReadOptions(), key, &value1);
670     assert(s1.ok());
671   }
672   std::cout << "Time by default : " << timer << "ms" << std::endl;
673   delete db1;
674 
675   /**********************************
676             use librados env
677   ***********************************/
678   std::string kDBPath2 = "/tmp/DBRandomRead2";
679   DB* db2;
680   Options options2;
681   // Optimize RocksDB. This is the easiest way to get RocksDB to perform well
682   options2.IncreaseParallelism();
683   options2.OptimizeLevelStyleCompaction();
684   // create the DB if it's not already present
685   options2.create_if_missing = true;
686   options2.env = env_;
687 
688   // open DB
689   Status s2 = DB::Open(options2, kDBPath2, &db2);
690   assert(s2.ok());
691 
692   ROCKSDB_NAMESPACE::Random64 r2(time(nullptr));
693 
694   for (int i = 0; i < max_loop; ++i) {
695     WriteBatch batch;
696     for (int j = 0; j < bulk_size; ++j) {
697       snprintf(key,
698                20,
699                "%019lld",
700                (long long)(i * bulk_size + j));
701       snprintf(value,
702                20,
703                "%16lx",
704                (unsigned long)r2.Uniform(std::numeric_limits<uint64_t>::max()));
705       batch.Put(key, value);
706     }
707     s2 = db2->Write(WriteOptions(), &batch);
708     assert(s2.ok());
709   }
710 
711   timer.Reset();
712   int base2 = 0, offset2 = 0;
713   for (int i = 0; i < read_loop; ++i) {
714     base2 = r2.Uniform(max_loop);
715     offset2 = r2.Uniform(bulk_size);
716     std::string value2;
717     snprintf(key,
718              20,
719              "%019lld",
720              (long long)(base2 * bulk_size + offset2));
721     s2 = db2->Get(ReadOptions(), key, &value2);
722     if (!s2.ok()) {
723       std::cout << s2.ToString() << std::endl;
724     }
725     assert(s2.ok());
726   }
727   std::cout << "Time by librados : " << timer << "ms" << std::endl;
728   delete db2;
729 }
730 
731 class EnvLibradosMutipoolTest : public testing::Test {
732 public:
733   // we will use all of these below
734   const std::string client_name = "client.admin";
735   const std::string cluster_name = "ceph";
736   const uint64_t flags = 0;
737   const std::string db_name = "env_librados_test_db";
738   const std::string db_pool = db_name + "_pool";
739   const std::string wal_dir = "/wal";
740   const std::string wal_pool = db_name + "_wal_pool";
741   const size_t write_buffer_size = 1 << 20;
742   const char *keyring = "admin";
743   const char *config = "../ceph/src/ceph.conf";
744 
745   EnvLibrados* env_;
746   const EnvOptions soptions_;
747 
EnvLibradosMutipoolTest()748   EnvLibradosMutipoolTest() {
749     env_ = new EnvLibrados(client_name, cluster_name, flags, db_name, config, db_pool, wal_dir, wal_pool, write_buffer_size);
750   }
~EnvLibradosMutipoolTest()751   ~EnvLibradosMutipoolTest() {
752     delete env_;
753     librados::Rados rados;
754     int ret = 0;
755     do {
756       ret = rados.init("admin"); // just use the client.admin keyring
757       if (ret < 0) { // let's handle any error that might have come back
758         std::cerr << "couldn't initialize rados! error " << ret << std::endl;
759         ret = EXIT_FAILURE;
760         break;
761       }
762 
763       ret = rados.conf_read_file(config);
764       if (ret < 0) {
765         // This could fail if the config file is malformed, but it'd be hard.
766         std::cerr << "failed to parse config file " << config
767                   << "! error" << ret << std::endl;
768         ret = EXIT_FAILURE;
769         break;
770       }
771 
772       /*
773        * next, we actually connect to the cluster
774        */
775 
776       ret = rados.connect();
777       if (ret < 0) {
778         std::cerr << "couldn't connect to cluster! error " << ret << std::endl;
779         ret = EXIT_FAILURE;
780         break;
781       }
782 
783       /*
784        * And now we're done, so let's remove our pool and then
785        * shut down the connection gracefully.
786        */
787       int delete_ret = rados.pool_delete(db_pool.c_str());
788       if (delete_ret < 0) {
789         // be careful not to
790         std::cerr << "We failed to delete our test pool!" << db_pool << delete_ret << std::endl;
791         ret = EXIT_FAILURE;
792       }
793       delete_ret = rados.pool_delete(wal_pool.c_str());
794       if (delete_ret < 0) {
795         // be careful not to
796         std::cerr << "We failed to delete our test pool!" << wal_pool << delete_ret << std::endl;
797         ret = EXIT_FAILURE;
798       }
799     } while (0);
800   }
801 };
802 
TEST_F(EnvLibradosMutipoolTest,Basics)803 TEST_F(EnvLibradosMutipoolTest, Basics) {
804   uint64_t file_size;
805   std::unique_ptr<WritableFile> writable_file;
806   std::vector<std::string> children;
807   std::vector<std::string> v = {"/tmp/dir1", "/tmp/dir2", "/tmp/dir3", "/tmp/dir4", "dir"};
808 
809   for (size_t i = 0; i < v.size(); ++i) {
810     std::string dir = v[i];
811     std::string dir_non_existent = dir + "/non_existent";
812     std::string dir_f = dir + "/f";
813     std::string dir_g = dir + "/g";
814 
815     ASSERT_OK(env_->CreateDir(dir.c_str()));
816     // Check that the directory is empty.
817     ASSERT_EQ(Status::NotFound(), env_->FileExists(dir_non_existent.c_str()));
818     ASSERT_TRUE(!env_->GetFileSize(dir_non_existent.c_str(), &file_size).ok());
819     ASSERT_OK(env_->GetChildren(dir.c_str(), &children));
820     ASSERT_EQ(0U, children.size());
821 
822     // Create a file.
823     ASSERT_OK(env_->NewWritableFile(dir_f.c_str(), &writable_file, soptions_));
824     writable_file.reset();
825 
826     // Check that the file exists.
827     ASSERT_OK(env_->FileExists(dir_f.c_str()));
828     ASSERT_OK(env_->GetFileSize(dir_f.c_str(), &file_size));
829     ASSERT_EQ(0U, file_size);
830     ASSERT_OK(env_->GetChildren(dir.c_str(), &children));
831     ASSERT_EQ(1U, children.size());
832     ASSERT_EQ("f", children[0]);
833 
834     // Write to the file.
835     ASSERT_OK(env_->NewWritableFile(dir_f.c_str(), &writable_file, soptions_));
836     ASSERT_OK(writable_file->Append("abc"));
837     writable_file.reset();
838 
839 
840     // Check for expected size.
841     ASSERT_OK(env_->GetFileSize(dir_f.c_str(), &file_size));
842     ASSERT_EQ(3U, file_size);
843 
844 
845     // Check that renaming works.
846     ASSERT_TRUE(!env_->RenameFile(dir_non_existent.c_str(), dir_g.c_str()).ok());
847     ASSERT_OK(env_->RenameFile(dir_f.c_str(), dir_g.c_str()));
848     ASSERT_EQ(Status::NotFound(), env_->FileExists(dir_f.c_str()));
849     ASSERT_OK(env_->FileExists(dir_g.c_str()));
850     ASSERT_OK(env_->GetFileSize(dir_g.c_str(), &file_size));
851     ASSERT_EQ(3U, file_size);
852 
853     // Check that opening non-existent file fails.
854     std::unique_ptr<SequentialFile> seq_file;
855     std::unique_ptr<RandomAccessFile> rand_file;
856     ASSERT_TRUE(
857       !env_->NewSequentialFile(dir_non_existent.c_str(), &seq_file, soptions_).ok());
858     ASSERT_TRUE(!seq_file);
859     ASSERT_TRUE(!env_->NewRandomAccessFile(dir_non_existent.c_str(), &rand_file,
860                                            soptions_).ok());
861     ASSERT_TRUE(!rand_file);
862 
863     // Check that deleting works.
864     ASSERT_TRUE(!env_->DeleteFile(dir_non_existent.c_str()).ok());
865     ASSERT_OK(env_->DeleteFile(dir_g.c_str()));
866     ASSERT_EQ(Status::NotFound(), env_->FileExists(dir_g.c_str()));
867     ASSERT_OK(env_->GetChildren(dir.c_str(), &children));
868     ASSERT_EQ(0U, children.size());
869     ASSERT_OK(env_->DeleteDir(dir.c_str()));
870   }
871 }
872 
// Opens a DB on top of the multi-pool EnvLibrados with a dedicated WAL
// directory and exercises Put, Get and an atomic WriteBatch.
//
// Every status goes through a gtest assertion, not assert(), so the checks
// still run when the test is built with NDEBUG.
TEST_F(EnvLibradosMutipoolTest, DBBasics) {
  const std::string kDBPath = "/tmp/DBBasics";
  const std::string walPath = "/tmp/wal";
  Options options;
  // Optimize RocksDB. This is the easiest way to get RocksDB to perform well
  options.IncreaseParallelism();
  options.OptimizeLevelStyleCompaction();
  // create the DB if it's not already present
  options.create_if_missing = true;
  options.env = env_;
  options.wal_dir = walPath;

  // open DB
  DB* raw_db = nullptr;
  ASSERT_OK(DB::Open(options, kDBPath, &raw_db));
  // Closes the DB on every exit path, including early ASSERT_* returns.
  std::unique_ptr<DB> db(raw_db);

  // Put key-value
  ASSERT_OK(db->Put(WriteOptions(), "key1", "value"));
  std::string value;
  // get value
  ASSERT_OK(db->Get(ReadOptions(), "key1", &value));
  ASSERT_EQ("value", value);

  // atomically apply a set of updates
  {
    WriteBatch batch;
    ASSERT_OK(batch.Delete("key1"));
    ASSERT_OK(batch.Put("key2", value));
    ASSERT_OK(db->Write(WriteOptions(), &batch));
  }

  // key1 was deleted by the batch, key2 now carries its former value.
  ASSERT_TRUE(db->Get(ReadOptions(), "key1", &value).IsNotFound());

  ASSERT_OK(db->Get(ReadOptions(), "key2", &value));
  ASSERT_EQ("value", value);
}
915 
TEST_F(EnvLibradosMutipoolTest,DBBulkLoadKeysInRandomOrder)916 TEST_F(EnvLibradosMutipoolTest, DBBulkLoadKeysInRandomOrder) {
917   char key[20] = {0}, value[20] = {0};
918   int max_loop = 1 << 6;
919   int bulk_size = 1 << 15;
920   Timer timer(false);
921   std::cout << "Test size : loop(" << max_loop << "); bulk_size(" << bulk_size << ")" << std::endl;
922   /**********************************
923             use default env
924   ***********************************/
925   std::string kDBPath1 = "/tmp/DBBulkLoadKeysInRandomOrder1";
926   std::string walPath = "/tmp/wal";
927   DB* db1;
928   Options options1;
929   // Optimize Rocksdb. This is the easiest way to get RocksDB to perform well
930   options1.IncreaseParallelism();
931   options1.OptimizeLevelStyleCompaction();
932   // create the DB if it's not already present
933   options1.create_if_missing = true;
934 
935   // open DB
936   Status s1 = DB::Open(options1, kDBPath1, &db1);
937   assert(s1.ok());
938 
939   ROCKSDB_NAMESPACE::Random64 r1(time(nullptr));
940 
941   timer.Reset();
942   for (int i = 0; i < max_loop; ++i) {
943     WriteBatch batch;
944     for (int j = 0; j < bulk_size; ++j) {
945       snprintf(key,
946                20,
947                "%16lx",
948                (unsigned long)r1.Uniform(std::numeric_limits<uint64_t>::max()));
949       snprintf(value,
950                20,
951                "%16lx",
952                (unsigned long)r1.Uniform(std::numeric_limits<uint64_t>::max()));
953       batch.Put(key, value);
954     }
955     s1 = db1->Write(WriteOptions(), &batch);
956     assert(s1.ok());
957   }
958   std::cout << "Time by default : " << timer << "ms" << std::endl;
959   delete db1;
960 
961   /**********************************
962             use librados env
963   ***********************************/
964   std::string kDBPath2 = "/tmp/DBBulkLoadKeysInRandomOrder2";
965   DB* db2;
966   Options options2;
967   // Optimize RocksDB. This is the easiest way to get RocksDB to perform well
968   options2.IncreaseParallelism();
969   options2.OptimizeLevelStyleCompaction();
970   // create the DB if it's not already present
971   options2.create_if_missing = true;
972   options2.env = env_;
973   options2.wal_dir = walPath;
974 
975   // open DB
976   Status s2 = DB::Open(options2, kDBPath2, &db2);
977   if (!s2.ok()) {
978     std::cerr << s2.ToString() << std::endl;
979   }
980   assert(s2.ok());
981 
982   ROCKSDB_NAMESPACE::Random64 r2(time(nullptr));
983 
984   timer.Reset();
985   for (int i = 0; i < max_loop; ++i) {
986     WriteBatch batch;
987     for (int j = 0; j < bulk_size; ++j) {
988       snprintf(key,
989                20,
990                "%16lx",
991                (unsigned long)r2.Uniform(std::numeric_limits<uint64_t>::max()));
992       snprintf(value,
993                20,
994                "%16lx",
995                (unsigned long)r2.Uniform(std::numeric_limits<uint64_t>::max()));
996       batch.Put(key, value);
997     }
998     s2 = db2->Write(WriteOptions(), &batch);
999     assert(s2.ok());
1000   }
1001   std::cout << "Time by librados : " << timer << "ms" << std::endl;
1002   delete db2;
1003 }
1004 
TEST_F(EnvLibradosMutipoolTest,DBTransactionDB)1005 TEST_F(EnvLibradosMutipoolTest, DBTransactionDB) {
1006   std::string kDBPath = "/tmp/DBTransactionDB";
1007   // open DB
1008   Options options;
1009   TransactionDBOptions txn_db_options;
1010   options.create_if_missing = true;
1011   options.env = env_;
1012   TransactionDB* txn_db;
1013 
1014   Status s = TransactionDB::Open(options, txn_db_options, kDBPath, &txn_db);
1015   assert(s.ok());
1016 
1017   WriteOptions write_options;
1018   ReadOptions read_options;
1019   TransactionOptions txn_options;
1020   std::string value;
1021 
1022   ////////////////////////////////////////////////////////
1023   //
1024   // Simple OptimisticTransaction Example ("Read Committed")
1025   //
1026   ////////////////////////////////////////////////////////
1027 
1028   // Start a transaction
1029   Transaction* txn = txn_db->BeginTransaction(write_options);
1030   assert(txn);
1031 
1032   // Read a key in this transaction
1033   s = txn->Get(read_options, "abc", &value);
1034   assert(s.IsNotFound());
1035 
1036   // Write a key in this transaction
1037   s = txn->Put("abc", "def");
1038   assert(s.ok());
1039 
1040   // Read a key OUTSIDE this transaction. Does not affect txn.
1041   s = txn_db->Get(read_options, "abc", &value);
1042 
1043   // Write a key OUTSIDE of this transaction.
1044   // Does not affect txn since this is an unrelated key.  If we wrote key 'abc'
1045   // here, the transaction would fail to commit.
1046   s = txn_db->Put(write_options, "xyz", "zzz");
1047 
1048   // Commit transaction
1049   s = txn->Commit();
1050   assert(s.ok());
1051   delete txn;
1052 
1053   ////////////////////////////////////////////////////////
1054   //
1055   // "Repeatable Read" (Snapshot Isolation) Example
1056   //   -- Using a single Snapshot
1057   //
1058   ////////////////////////////////////////////////////////
1059 
1060   // Set a snapshot at start of transaction by setting set_snapshot=true
1061   txn_options.set_snapshot = true;
1062   txn = txn_db->BeginTransaction(write_options, txn_options);
1063 
1064   const Snapshot* snapshot = txn->GetSnapshot();
1065 
1066   // Write a key OUTSIDE of transaction
1067   s = txn_db->Put(write_options, "abc", "xyz");
1068   assert(s.ok());
1069 
1070   // Attempt to read a key using the snapshot.  This will fail since
1071   // the previous write outside this txn conflicts with this read.
1072   read_options.snapshot = snapshot;
1073   s = txn->GetForUpdate(read_options, "abc", &value);
1074   assert(s.IsBusy());
1075 
1076   txn->Rollback();
1077 
1078   delete txn;
1079   // Clear snapshot from read options since it is no longer valid
1080   read_options.snapshot = nullptr;
1081   snapshot = nullptr;
1082 
1083   ////////////////////////////////////////////////////////
1084   //
1085   // "Read Committed" (Monotonic Atomic Views) Example
1086   //   --Using multiple Snapshots
1087   //
1088   ////////////////////////////////////////////////////////
1089 
1090   // In this example, we set the snapshot multiple times.  This is probably
1091   // only necessary if you have very strict isolation requirements to
1092   // implement.
1093 
1094   // Set a snapshot at start of transaction
1095   txn_options.set_snapshot = true;
1096   txn = txn_db->BeginTransaction(write_options, txn_options);
1097 
1098   // Do some reads and writes to key "x"
1099   read_options.snapshot = txn_db->GetSnapshot();
1100   s = txn->Get(read_options, "x", &value);
1101   txn->Put("x", "x");
1102 
1103   // Do a write outside of the transaction to key "y"
1104   s = txn_db->Put(write_options, "y", "y");
1105 
1106   // Set a new snapshot in the transaction
1107   txn->SetSnapshot();
1108   txn->SetSavePoint();
1109   read_options.snapshot = txn_db->GetSnapshot();
1110 
1111   // Do some reads and writes to key "y"
1112   // Since the snapshot was advanced, the write done outside of the
1113   // transaction does not conflict.
1114   s = txn->GetForUpdate(read_options, "y", &value);
1115   txn->Put("y", "y");
1116 
1117   // Decide we want to revert the last write from this transaction.
1118   txn->RollbackToSavePoint();
1119 
1120   // Commit.
1121   s = txn->Commit();
1122   assert(s.ok());
1123   delete txn;
1124   // Clear snapshot from read options since it is no longer valid
1125   read_options.snapshot = nullptr;
1126 
1127   // Cleanup
1128   delete txn_db;
1129   DestroyDB(kDBPath, options);
1130 }
1131 
1132 }  // namespace ROCKSDB_NAMESPACE
1133 
main(int argc,char ** argv)1134 int main(int argc, char** argv) {
1135   ::testing::InitGoogleTest(&argc, argv);
1136   if (getenv("CIRCLECI")) {
1137     fprintf(stderr,
1138             "TODO: get env_librados_test working in CI. Skipping for now.\n");
1139     return 0;
1140   }
1141   return RUN_ALL_TESTS();
1142 }
1143 
1144 #else
1145 #include <stdio.h>
1146 
// Entry point used when ROCKSDB_LITE is defined: no tests are built, so just
// report on stderr that the suite was skipped and exit successfully.
int main(int argc, char** argv) {
  fputs("SKIPPED as EnvLibrados is not supported in ROCKSDB_LITE\n", stderr);
  return 0;
}
1151 
1152 #endif  // !ROCKSDB_LITE
1153