1 // Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
2 // Copyright (c) 2016, Red Hat, Inc. All rights reserved.
3 // This source code is licensed under both the GPLv2 (found in the
4 // COPYING file in the root directory) and Apache 2.0 License
5 // (found in the LICENSE.Apache file in the root directory).
6
7 #ifndef ROCKSDB_LITE
8
9 #include "rocksdb/utilities/env_librados.h"
10 #include <rados/librados.hpp>
11 #include "env/mock_env.h"
12 #include "test_util/testharness.h"
13
14 #include "rocksdb/db.h"
15 #include "rocksdb/slice.h"
16 #include "rocksdb/options.h"
17 #include "util/random.h"
18 #include <chrono>
19 #include <ostream>
20 #include "rocksdb/utilities/transaction_db.h"
21
// Minimal stopwatch used by the benchmark-style tests below to report elapsed
// time in milliseconds.
class Timer {
  // Elapsed-time measurements need a monotonic clock; high_resolution_clock
  // may be an alias of the (adjustable) system clock on some standard
  // libraries, so steady_clock is used instead.
  using clock = std::chrono::steady_clock;
  using milliseconds = std::chrono::milliseconds;

 public:
  // Creates a timer. When `run` is true the measurement starts immediately;
  // otherwise the start point is the clock's epoch until Reset() is called.
  explicit Timer(bool run = false) {
    if (run) {
      Reset();
    }
  }

  // Restarts the measurement from the current instant.
  void Reset() { start_ = clock::now(); }

  // Returns the time elapsed since the start point, truncated to
  // milliseconds.
  milliseconds Elapsed() const {
    return std::chrono::duration_cast<milliseconds>(clock::now() - start_);
  }

  // Streams the elapsed millisecond count (the number only, no unit suffix).
  template <typename T, typename Traits>
  friend std::basic_ostream<T, Traits>& operator<<(
      std::basic_ostream<T, Traits>& out, const Timer& timer) {
    return out << timer.Elapsed().count();
  }

 private:
  clock::time_point start_;
};
48
49 namespace ROCKSDB_NAMESPACE {
50
51 class EnvLibradosTest : public testing::Test {
52 public:
53 // we will use all of these below
54 const std::string db_name = "env_librados_test_db";
55 const std::string db_pool = db_name + "_pool";
56 const char *keyring = "admin";
57 const char *config = "../ceph/src/ceph.conf";
58
59 EnvLibrados* env_;
60 const EnvOptions soptions_;
61
EnvLibradosTest()62 EnvLibradosTest()
63 : env_(new EnvLibrados(db_name, config, db_pool)) {
64 }
~EnvLibradosTest()65 ~EnvLibradosTest() {
66 delete env_;
67 librados::Rados rados;
68 int ret = 0;
69 do {
70 ret = rados.init("admin"); // just use the client.admin keyring
71 if (ret < 0) { // let's handle any error that might have come back
72 std::cerr << "couldn't initialize rados! error " << ret << std::endl;
73 ret = EXIT_FAILURE;
74 break;
75 }
76
77 ret = rados.conf_read_file(config);
78 if (ret < 0) {
79 // This could fail if the config file is malformed, but it'd be hard.
80 std::cerr << "failed to parse config file " << config
81 << "! error" << ret << std::endl;
82 ret = EXIT_FAILURE;
83 break;
84 }
85
86 /*
87 * next, we actually connect to the cluster
88 */
89
90 ret = rados.connect();
91 if (ret < 0) {
92 std::cerr << "couldn't connect to cluster! error " << ret << std::endl;
93 ret = EXIT_FAILURE;
94 break;
95 }
96
97 /*
98 * And now we're done, so let's remove our pool and then
99 * shut down the connection gracefully.
100 */
101 int delete_ret = rados.pool_delete(db_pool.c_str());
102 if (delete_ret < 0) {
103 // be careful not to
104 std::cerr << "We failed to delete our test pool!" << db_pool << delete_ret << std::endl;
105 ret = EXIT_FAILURE;
106 }
107 } while (0);
108 }
109 };
110
// Exercises directory and file bookkeeping: create, stat, list, rename and
// delete, including the failure paths for a file that does not exist.
TEST_F(EnvLibradosTest, Basics) {
  const std::string kDir = "/dir";
  const std::string kMissing = "/dir/non_existent";
  const std::string kFileF = "/dir/f";
  const std::string kFileG = "/dir/g";

  uint64_t size = 0;
  std::unique_ptr<WritableFile> writer;
  std::vector<std::string> entries;

  ASSERT_OK(env_->CreateDir(kDir));

  // A freshly created directory has no children.
  ASSERT_EQ(Status::NotFound(), env_->FileExists(kMissing));
  ASSERT_FALSE(env_->GetFileSize(kMissing, &size).ok());
  ASSERT_OK(env_->GetChildren(kDir, &entries));
  ASSERT_TRUE(entries.empty());

  // Creating a writable file and dropping it leaves an empty file behind.
  ASSERT_OK(env_->NewWritableFile(kFileF, &writer, soptions_));
  writer.reset();

  ASSERT_OK(env_->FileExists(kFileF));
  ASSERT_OK(env_->GetFileSize(kFileF, &size));
  ASSERT_EQ(0U, size);
  ASSERT_OK(env_->GetChildren(kDir, &entries));
  ASSERT_EQ(1U, entries.size());
  ASSERT_EQ("f", entries[0]);

  // Re-open the file and append three bytes.
  ASSERT_OK(env_->NewWritableFile(kFileF, &writer, soptions_));
  ASSERT_OK(writer->Append("abc"));
  writer.reset();

  ASSERT_OK(env_->GetFileSize(kFileF, &size));
  ASSERT_EQ(3U, size);

  // Renaming: a missing source fails, an existing one moves with its data.
  ASSERT_FALSE(env_->RenameFile(kMissing, kFileG).ok());
  ASSERT_OK(env_->RenameFile(kFileF, kFileG));
  ASSERT_EQ(Status::NotFound(), env_->FileExists(kFileF));
  ASSERT_OK(env_->FileExists(kFileG));
  ASSERT_OK(env_->GetFileSize(kFileG, &size));
  ASSERT_EQ(3U, size);

  // Opening a missing file for reading fails and leaves the handle empty.
  std::unique_ptr<SequentialFile> seq_reader;
  std::unique_ptr<RandomAccessFile> rand_reader;
  ASSERT_FALSE(env_->NewSequentialFile(kMissing, &seq_reader, soptions_).ok());
  ASSERT_TRUE(seq_reader == nullptr);
  ASSERT_FALSE(
      env_->NewRandomAccessFile(kMissing, &rand_reader, soptions_).ok());
  ASSERT_TRUE(rand_reader == nullptr);

  // Deleting: a missing file fails, the real one disappears from the listing.
  ASSERT_FALSE(env_->DeleteFile(kMissing).ok());
  ASSERT_OK(env_->DeleteFile(kFileG));
  ASSERT_EQ(Status::NotFound(), env_->FileExists(kFileG));
  ASSERT_OK(env_->GetChildren(kDir, &entries));
  ASSERT_TRUE(entries.empty());
  ASSERT_OK(env_->DeleteDir(kDir));
}
172
// Writes "hello world" to a file and reads it back both sequentially and at
// random offsets, including reads and skips that run past the end of file.
TEST_F(EnvLibradosTest, ReadWrite) {
  // The scratch buffer must be at least as large as the biggest read request
  // issued below; a smaller buffer would only be safe by luck of the file
  // being short.
  constexpr size_t kMaxRead = 1000;
  char scratch[kMaxRead];

  std::unique_ptr<WritableFile> writable_file;
  std::unique_ptr<SequentialFile> seq_file;
  std::unique_ptr<RandomAccessFile> rand_file;
  Slice result;

  ASSERT_OK(env_->CreateDir("/dir"));

  ASSERT_OK(env_->NewWritableFile("/dir/f", &writable_file, soptions_));
  ASSERT_OK(writable_file->Append("hello "));
  ASSERT_OK(writable_file->Append("world"));
  writable_file.reset();

  // Read sequentially.
  ASSERT_OK(env_->NewSequentialFile("/dir/f", &seq_file, soptions_));
  ASSERT_OK(seq_file->Read(5, &result, scratch));  // Read "hello".
  ASSERT_EQ(0, result.compare("hello"));
  ASSERT_OK(seq_file->Skip(1));
  ASSERT_OK(seq_file->Read(kMaxRead, &result, scratch));  // Read "world".
  ASSERT_EQ(0, result.compare("world"));
  ASSERT_OK(seq_file->Read(kMaxRead, &result, scratch));  // Read past EOF.
  ASSERT_EQ(0U, result.size());
  ASSERT_OK(seq_file->Skip(100));  // Try to skip past end of file.
  ASSERT_OK(seq_file->Read(kMaxRead, &result, scratch));
  ASSERT_EQ(0U, result.size());

  // Random reads.
  ASSERT_OK(env_->NewRandomAccessFile("/dir/f", &rand_file, soptions_));
  ASSERT_OK(rand_file->Read(6, 5, &result, scratch));  // Read "world".
  ASSERT_EQ(0, result.compare("world"));
  ASSERT_OK(rand_file->Read(0, 5, &result, scratch));  // Read "hello".
  ASSERT_EQ(0, result.compare("hello"));
  ASSERT_OK(rand_file->Read(10, 100, &result, scratch));  // Read "d".
  ASSERT_EQ(0, result.compare("d"));

  // Too high offset.
  ASSERT_OK(rand_file->Read(1000, 5, &result, scratch));
}
212
// Checks that locking and unlocking succeed both for an arbitrary name and
// for a file that was actually created.
TEST_F(EnvLibradosTest, Locks) {
  ASSERT_OK(env_->CreateDir("/dir"));

  std::unique_ptr<WritableFile> writer;
  ASSERT_OK(env_->NewWritableFile("/dir/f", &writer, soptions_));

  FileLock* held = nullptr;

  // These are no-ops, but we test they return success.
  ASSERT_OK(env_->LockFile("some file", &held));
  ASSERT_OK(env_->UnlockFile(held));

  ASSERT_OK(env_->LockFile("/dir/f", &held));
  ASSERT_OK(env_->UnlockFile(held));
}
228
// Miscellaneous checks: the test directory is reported, a file under a
// missing directory cannot be created, and Sync/Flush/Close report success.
TEST_F(EnvLibradosTest, Misc) {
  std::string scratch_dir;
  ASSERT_OK(env_->GetTestDirectory(&scratch_dir));
  ASSERT_FALSE(scratch_dir.empty());

  std::unique_ptr<WritableFile> writer;
  // "/a" has not been created as a directory, so "/a/b" must be rejected.
  ASSERT_FALSE(env_->NewWritableFile("/a/b", &writer, soptions_).ok());

  ASSERT_OK(env_->NewWritableFile("/a", &writer, soptions_));
  // These are no-ops, but we test they return success.
  ASSERT_OK(writer->Sync());
  ASSERT_OK(writer->Flush());
  ASSERT_OK(writer->Close());
  writer.reset();
}
244
// Appends a 3-byte prefix followed by a 300 KiB payload in one call, then
// reads everything back sequentially and compares the payload.
TEST_F(EnvLibradosTest, LargeWrite) {
  const size_t kWriteSize = 300 * 1024;
  // Owned by a vector so an early ASSERT_* return cannot leak the buffer.
  std::vector<char> scratch(kWriteSize * 2);

  const std::string write_data(kWriteSize, 'h');

  std::unique_ptr<WritableFile> writable_file;
  ASSERT_OK(env_->CreateDir("/dir"));
  ASSERT_OK(env_->NewWritableFile("/dir/g", &writable_file, soptions_));
  ASSERT_OK(writable_file->Append("foo"));
  ASSERT_OK(writable_file->Append(write_data));
  writable_file.reset();

  std::unique_ptr<SequentialFile> seq_file;
  Slice result;
  ASSERT_OK(env_->NewSequentialFile("/dir/g", &seq_file, soptions_));
  ASSERT_OK(seq_file->Read(3, &result, scratch.data()));  // Read "foo".
  ASSERT_EQ(0, result.compare("foo"));

  size_t read = 0;
  std::string read_data;
  read_data.reserve(kWriteSize);
  while (read < kWriteSize) {
    ASSERT_OK(seq_file->Read(kWriteSize - read, &result, scratch.data()));
    // A zero-length read means the file is shorter than what was written;
    // fail instead of spinning forever.
    ASSERT_GT(result.size(), 0U);
    read_data.append(result.data(), result.size());
    read += result.size();
  }
  // Compared via ASSERT_TRUE so a mismatch does not dump 300 KiB of payload.
  ASSERT_TRUE(write_data == read_data);
}
277
// Appends a 3-byte prefix followed by 1024 single-byte appends, then reads
// everything back sequentially and compares the payload.
TEST_F(EnvLibradosTest, FrequentlySmallWrite) {
  const size_t kWriteSize = 1 << 10;
  // Owned by a vector so an early ASSERT_* return cannot leak the buffer.
  std::vector<char> scratch(kWriteSize * 2);

  // The payload the one-byte appends below are expected to add up to.
  const std::string write_data(kWriteSize, 'h');

  std::unique_ptr<WritableFile> writable_file;
  ASSERT_OK(env_->CreateDir("/dir"));
  ASSERT_OK(env_->NewWritableFile("/dir/g", &writable_file, soptions_));
  ASSERT_OK(writable_file->Append("foo"));

  for (size_t i = 0; i < kWriteSize; ++i) {
    ASSERT_OK(writable_file->Append("h"));
  }
  writable_file.reset();

  std::unique_ptr<SequentialFile> seq_file;
  Slice result;
  ASSERT_OK(env_->NewSequentialFile("/dir/g", &seq_file, soptions_));
  ASSERT_OK(seq_file->Read(3, &result, scratch.data()));  // Read "foo".
  ASSERT_EQ(0, result.compare("foo"));

  size_t read = 0;
  std::string read_data;
  read_data.reserve(kWriteSize);
  while (read < kWriteSize) {
    ASSERT_OK(seq_file->Read(kWriteSize - read, &result, scratch.data()));
    // A zero-length read means the file is shorter than what was written;
    // fail instead of spinning forever.
    ASSERT_GT(result.size(), 0U);
    read_data.append(result.data(), result.size());
    read += result.size();
  }
  ASSERT_TRUE(write_data == read_data);
}
313
// Writes 300 KiB to a file and checks that the size reported by the open
// handle follows a truncation down to 1 KiB.
TEST_F(EnvLibradosTest, Truncate) {
  const size_t kWriteSize = 300 * 1024;
  const size_t truncSize = 1024;
  const std::string payload(kWriteSize, 'h');

  ASSERT_OK(env_->CreateDir("/dir"));

  std::unique_ptr<WritableFile> writer;
  ASSERT_OK(env_->NewWritableFile("/dir/g", &writer, soptions_));
  ASSERT_OK(writer->Append(payload));
  ASSERT_EQ(writer->GetFileSize(), kWriteSize);

  ASSERT_OK(writer->Truncate(truncSize));
  ASSERT_EQ(writer->GetFileSize(), truncSize);
  writer.reset();
}
331
// Opens a DB on top of the librados env and runs a Put/Get/WriteBatch round
// trip. Every Status is checked with gtest assertions so the test still
// verifies something when assert() is compiled out (NDEBUG).
TEST_F(EnvLibradosTest, DBBasics) {
  const std::string kDBPath = "/tmp/DBBasics";
  Options options;
  // Optimize RocksDB. This is the easiest way to get RocksDB to perform well
  options.IncreaseParallelism();
  options.OptimizeLevelStyleCompaction();
  // create the DB if it's not already present
  options.create_if_missing = true;
  options.env = env_;

  // open DB; the unique_ptr closes it even when an assertion returns early
  DB* raw_db = nullptr;
  ASSERT_OK(DB::Open(options, kDBPath, &raw_db));
  std::unique_ptr<DB> db(raw_db);

  // Put key-value
  ASSERT_OK(db->Put(WriteOptions(), "key1", "value"));
  std::string value;
  // get value
  ASSERT_OK(db->Get(ReadOptions(), "key1", &value));
  ASSERT_EQ("value", value);

  // atomically apply a set of updates
  {
    WriteBatch batch;
    ASSERT_OK(batch.Delete("key1"));
    ASSERT_OK(batch.Put("key2", value));
    ASSERT_OK(db->Write(WriteOptions(), &batch));
  }

  const Status s = db->Get(ReadOptions(), "key1", &value);
  ASSERT_TRUE(s.IsNotFound());

  ASSERT_OK(db->Get(ReadOptions(), "key2", &value));
  ASSERT_EQ("value", value);
}
372
// Loads 1024 random key/value pairs one Put at a time, first through the
// default env and then through the librados env, and prints the time each
// run took. Statuses are checked with gtest assertions so failures are
// reported even when assert() is compiled out.
TEST_F(EnvLibradosTest, DBLoadKeysInRandomOrder) {
  char key[20] = {0};
  char value[20] = {0};
  const int max_loop = 1 << 10;
  Timer timer(false);
  std::cout << "Test size : loop(" << max_loop << ")" << std::endl;

  // Formats a random 64-bit value as hex, space-padded to 16 columns.
  auto fill_random_hex = [](char* buf, size_t len,
                            ROCKSDB_NAMESPACE::Random64* rnd) {
    snprintf(buf, len, "%16lx",
             static_cast<unsigned long>(
                 rnd->Uniform(std::numeric_limits<uint64_t>::max())));
  };

  /**********************************
            use default env
  ***********************************/
  const std::string kDBPath1 = "/tmp/DBLoadKeysInRandomOrder1";
  Options options1;
  // Optimize Rocksdb. This is the easiest way to get RocksDB to perform well
  options1.IncreaseParallelism();
  options1.OptimizeLevelStyleCompaction();
  // create the DB if it's not already present
  options1.create_if_missing = true;

  // open DB; the unique_ptr closes it even when an assertion returns early
  DB* raw_db1 = nullptr;
  ASSERT_OK(DB::Open(options1, kDBPath1, &raw_db1));
  std::unique_ptr<DB> db1(raw_db1);

  ROCKSDB_NAMESPACE::Random64 r1(time(nullptr));

  timer.Reset();
  for (int i = 0; i < max_loop; ++i) {
    fill_random_hex(key, sizeof(key), &r1);
    fill_random_hex(value, sizeof(value), &r1);
    // Put key-value
    ASSERT_OK(db1->Put(WriteOptions(), key, value));
  }
  std::cout << "Time by default : " << timer << "ms" << std::endl;
  db1.reset();

  /**********************************
            use librados env
  ***********************************/
  const std::string kDBPath2 = "/tmp/DBLoadKeysInRandomOrder2";
  Options options2;
  // Optimize RocksDB. This is the easiest way to get RocksDB to perform well
  options2.IncreaseParallelism();
  options2.OptimizeLevelStyleCompaction();
  // create the DB if it's not already present
  options2.create_if_missing = true;
  options2.env = env_;

  // open DB
  DB* raw_db2 = nullptr;
  ASSERT_OK(DB::Open(options2, kDBPath2, &raw_db2));
  std::unique_ptr<DB> db2(raw_db2);

  ROCKSDB_NAMESPACE::Random64 r2(time(nullptr));

  timer.Reset();
  for (int i = 0; i < max_loop; ++i) {
    fill_random_hex(key, sizeof(key), &r2);
    fill_random_hex(value, sizeof(value), &r2);
    // Put key-value
    ASSERT_OK(db2->Put(WriteOptions(), key, value));
  }
  std::cout << "Time by librados : " << timer << "ms" << std::endl;
  db2.reset();
}
449
// Loads 64 write batches of 32768 random key/value pairs each, first through
// the default env and then through the librados env, and prints the time
// each run took. Statuses are checked with gtest assertions so failures are
// reported even when assert() is compiled out.
TEST_F(EnvLibradosTest, DBBulkLoadKeysInRandomOrder) {
  char key[20] = {0};
  char value[20] = {0};
  const int max_loop = 1 << 6;
  const int bulk_size = 1 << 15;
  Timer timer(false);
  std::cout << "Test size : loop(" << max_loop << "); bulk_size(" << bulk_size
            << ")" << std::endl;

  // Formats a random 64-bit value as hex, space-padded to 16 columns.
  auto fill_random_hex = [](char* buf, size_t len,
                            ROCKSDB_NAMESPACE::Random64* rnd) {
    snprintf(buf, len, "%16lx",
             static_cast<unsigned long>(
                 rnd->Uniform(std::numeric_limits<uint64_t>::max())));
  };

  /**********************************
            use default env
  ***********************************/
  const std::string kDBPath1 = "/tmp/DBBulkLoadKeysInRandomOrder1";
  Options options1;
  // Optimize Rocksdb. This is the easiest way to get RocksDB to perform well
  options1.IncreaseParallelism();
  options1.OptimizeLevelStyleCompaction();
  // create the DB if it's not already present
  options1.create_if_missing = true;

  // open DB; the unique_ptr closes it even when an assertion returns early
  DB* raw_db1 = nullptr;
  ASSERT_OK(DB::Open(options1, kDBPath1, &raw_db1));
  std::unique_ptr<DB> db1(raw_db1);

  ROCKSDB_NAMESPACE::Random64 r1(time(nullptr));

  timer.Reset();
  for (int i = 0; i < max_loop; ++i) {
    WriteBatch batch;
    for (int j = 0; j < bulk_size; ++j) {
      fill_random_hex(key, sizeof(key), &r1);
      fill_random_hex(value, sizeof(value), &r1);
      ASSERT_OK(batch.Put(key, value));
    }
    ASSERT_OK(db1->Write(WriteOptions(), &batch));
  }
  std::cout << "Time by default : " << timer << "ms" << std::endl;
  db1.reset();

  /**********************************
            use librados env
  ***********************************/
  const std::string kDBPath2 = "/tmp/DBBulkLoadKeysInRandomOrder2";
  Options options2;
  // Optimize RocksDB. This is the easiest way to get RocksDB to perform well
  options2.IncreaseParallelism();
  options2.OptimizeLevelStyleCompaction();
  // create the DB if it's not already present
  options2.create_if_missing = true;
  options2.env = env_;

  // open DB
  DB* raw_db2 = nullptr;
  ASSERT_OK(DB::Open(options2, kDBPath2, &raw_db2));
  std::unique_ptr<DB> db2(raw_db2);

  ROCKSDB_NAMESPACE::Random64 r2(time(nullptr));

  timer.Reset();
  for (int i = 0; i < max_loop; ++i) {
    WriteBatch batch;
    for (int j = 0; j < bulk_size; ++j) {
      fill_random_hex(key, sizeof(key), &r2);
      fill_random_hex(value, sizeof(value), &r2);
      ASSERT_OK(batch.Put(key, value));
    }
    ASSERT_OK(db2->Write(WriteOptions(), &batch));
  }
  std::cout << "Time by librados : " << timer << "ms" << std::endl;
  db2.reset();
}
533
// Loads 64 write batches of 32768 pairs each with sequentially increasing
// keys and random values, first through the default env and then through the
// librados env, and prints the time each run took.
//
// Both runs use sequential keys. The librados run previously generated
// random keys (copy-paste from the random-order test), so the two timings
// did not measure the same workload.
TEST_F(EnvLibradosTest, DBBulkLoadKeysInSequentialOrder) {
  char key[20] = {0};
  char value[20] = {0};
  const int max_loop = 1 << 6;
  const int bulk_size = 1 << 15;
  Timer timer(false);
  std::cout << "Test size : loop(" << max_loop << "); bulk_size(" << bulk_size
            << ")" << std::endl;

  // Formats `n` as a zero-padded 19-digit decimal key.
  auto fill_sequential = [](char* buf, size_t len, long long n) {
    snprintf(buf, len, "%019lld", n);
  };
  // Formats a random 64-bit value as hex, space-padded to 16 columns.
  auto fill_random_hex = [](char* buf, size_t len,
                            ROCKSDB_NAMESPACE::Random64* rnd) {
    snprintf(buf, len, "%16lx",
             static_cast<unsigned long>(
                 rnd->Uniform(std::numeric_limits<uint64_t>::max())));
  };

  /**********************************
            use default env
  ***********************************/
  const std::string kDBPath1 = "/tmp/DBBulkLoadKeysInSequentialOrder1";
  Options options1;
  // Optimize Rocksdb. This is the easiest way to get RocksDB to perform well
  options1.IncreaseParallelism();
  options1.OptimizeLevelStyleCompaction();
  // create the DB if it's not already present
  options1.create_if_missing = true;

  // open DB; the unique_ptr closes it even when an assertion returns early
  DB* raw_db1 = nullptr;
  ASSERT_OK(DB::Open(options1, kDBPath1, &raw_db1));
  std::unique_ptr<DB> db1(raw_db1);

  ROCKSDB_NAMESPACE::Random64 r1(time(nullptr));

  timer.Reset();
  for (int i = 0; i < max_loop; ++i) {
    WriteBatch batch;
    for (int j = 0; j < bulk_size; ++j) {
      fill_sequential(key, sizeof(key),
                      static_cast<long long>(i) * bulk_size + j);
      fill_random_hex(value, sizeof(value), &r1);
      ASSERT_OK(batch.Put(key, value));
    }
    ASSERT_OK(db1->Write(WriteOptions(), &batch));
  }
  std::cout << "Time by default : " << timer << "ms" << std::endl;
  db1.reset();

  /**********************************
            use librados env
  ***********************************/
  const std::string kDBPath2 = "/tmp/DBBulkLoadKeysInSequentialOrder2";
  Options options2;
  // Optimize RocksDB. This is the easiest way to get RocksDB to perform well
  options2.IncreaseParallelism();
  options2.OptimizeLevelStyleCompaction();
  // create the DB if it's not already present
  options2.create_if_missing = true;
  options2.env = env_;

  // open DB
  DB* raw_db2 = nullptr;
  ASSERT_OK(DB::Open(options2, kDBPath2, &raw_db2));
  std::unique_ptr<DB> db2(raw_db2);

  ROCKSDB_NAMESPACE::Random64 r2(time(nullptr));

  timer.Reset();
  for (int i = 0; i < max_loop; ++i) {
    WriteBatch batch;
    for (int j = 0; j < bulk_size; ++j) {
      fill_sequential(key, sizeof(key),
                      static_cast<long long>(i) * bulk_size + j);
      fill_random_hex(value, sizeof(value), &r2);
      ASSERT_OK(batch.Put(key, value));
    }
    ASSERT_OK(db2->Write(WriteOptions(), &batch));
  }
  std::cout << "Time by librados : " << timer << "ms" << std::endl;
  db2.reset();
}
617
// Populates a DB with 64 * 1024 sequentially keyed entries, then issues 2^20
// point lookups on randomly chosen existing keys and prints how long the
// lookups took. Runs once on the default env and once on the librados env.
// Statuses are checked with gtest assertions so failures are reported even
// when assert() is compiled out.
TEST_F(EnvLibradosTest, DBRandomRead) {
  char key[20] = {0};
  char value[20] = {0};
  const int max_loop = 1 << 6;
  const int bulk_size = 1 << 10;
  const int read_loop = 1 << 20;
  Timer timer(false);
  std::cout << "Test size : keys_num(" << max_loop << ", " << bulk_size
            << "); read_loop(" << read_loop << ")" << std::endl;

  // Formats `n` as a zero-padded 19-digit decimal key.
  auto fill_sequential = [](char* buf, size_t len, long long n) {
    snprintf(buf, len, "%019lld", n);
  };
  // Formats a random 64-bit value as hex, space-padded to 16 columns.
  auto fill_random_hex = [](char* buf, size_t len,
                            ROCKSDB_NAMESPACE::Random64* rnd) {
    snprintf(buf, len, "%16lx",
             static_cast<unsigned long>(
                 rnd->Uniform(std::numeric_limits<uint64_t>::max())));
  };

  /**********************************
            use default env
  ***********************************/
  const std::string kDBPath1 = "/tmp/DBRandomRead1";
  Options options1;
  // Optimize Rocksdb. This is the easiest way to get RocksDB to perform well
  options1.IncreaseParallelism();
  options1.OptimizeLevelStyleCompaction();
  // create the DB if it's not already present
  options1.create_if_missing = true;

  // open DB; the unique_ptr closes it even when an assertion returns early
  DB* raw_db1 = nullptr;
  ASSERT_OK(DB::Open(options1, kDBPath1, &raw_db1));
  std::unique_ptr<DB> db1(raw_db1);

  ROCKSDB_NAMESPACE::Random64 r1(time(nullptr));

  for (int i = 0; i < max_loop; ++i) {
    WriteBatch batch;
    for (int j = 0; j < bulk_size; ++j) {
      fill_sequential(key, sizeof(key),
                      static_cast<long long>(i) * bulk_size + j);
      fill_random_hex(value, sizeof(value), &r1);
      ASSERT_OK(batch.Put(key, value));
    }
    ASSERT_OK(db1->Write(WriteOptions(), &batch));
  }

  // Only the lookups are timed, not the population above.
  timer.Reset();
  for (int i = 0; i < read_loop; ++i) {
    const int base1 = static_cast<int>(r1.Uniform(max_loop));
    const int offset1 = static_cast<int>(r1.Uniform(bulk_size));
    std::string value1;
    fill_sequential(key, sizeof(key),
                    static_cast<long long>(base1) * bulk_size + offset1);
    ASSERT_OK(db1->Get(ReadOptions(), key, &value1));
  }
  std::cout << "Time by default : " << timer << "ms" << std::endl;
  db1.reset();

  /**********************************
            use librados env
  ***********************************/
  const std::string kDBPath2 = "/tmp/DBRandomRead2";
  Options options2;
  // Optimize RocksDB. This is the easiest way to get RocksDB to perform well
  options2.IncreaseParallelism();
  options2.OptimizeLevelStyleCompaction();
  // create the DB if it's not already present
  options2.create_if_missing = true;
  options2.env = env_;

  // open DB
  DB* raw_db2 = nullptr;
  ASSERT_OK(DB::Open(options2, kDBPath2, &raw_db2));
  std::unique_ptr<DB> db2(raw_db2);

  ROCKSDB_NAMESPACE::Random64 r2(time(nullptr));

  for (int i = 0; i < max_loop; ++i) {
    WriteBatch batch;
    for (int j = 0; j < bulk_size; ++j) {
      fill_sequential(key, sizeof(key),
                      static_cast<long long>(i) * bulk_size + j);
      fill_random_hex(value, sizeof(value), &r2);
      ASSERT_OK(batch.Put(key, value));
    }
    ASSERT_OK(db2->Write(WriteOptions(), &batch));
  }

  timer.Reset();
  for (int i = 0; i < read_loop; ++i) {
    const int base2 = static_cast<int>(r2.Uniform(max_loop));
    const int offset2 = static_cast<int>(r2.Uniform(bulk_size));
    std::string value2;
    fill_sequential(key, sizeof(key),
                    static_cast<long long>(base2) * bulk_size + offset2);
    // ASSERT_OK reports the failing Status itself, so no manual print.
    ASSERT_OK(db2->Get(ReadOptions(), key, &value2));
  }
  std::cout << "Time by librados : " << timer << "ms" << std::endl;
  db2.reset();
}
730
731 class EnvLibradosMutipoolTest : public testing::Test {
732 public:
733 // we will use all of these below
734 const std::string client_name = "client.admin";
735 const std::string cluster_name = "ceph";
736 const uint64_t flags = 0;
737 const std::string db_name = "env_librados_test_db";
738 const std::string db_pool = db_name + "_pool";
739 const std::string wal_dir = "/wal";
740 const std::string wal_pool = db_name + "_wal_pool";
741 const size_t write_buffer_size = 1 << 20;
742 const char *keyring = "admin";
743 const char *config = "../ceph/src/ceph.conf";
744
745 EnvLibrados* env_;
746 const EnvOptions soptions_;
747
EnvLibradosMutipoolTest()748 EnvLibradosMutipoolTest() {
749 env_ = new EnvLibrados(client_name, cluster_name, flags, db_name, config, db_pool, wal_dir, wal_pool, write_buffer_size);
750 }
~EnvLibradosMutipoolTest()751 ~EnvLibradosMutipoolTest() {
752 delete env_;
753 librados::Rados rados;
754 int ret = 0;
755 do {
756 ret = rados.init("admin"); // just use the client.admin keyring
757 if (ret < 0) { // let's handle any error that might have come back
758 std::cerr << "couldn't initialize rados! error " << ret << std::endl;
759 ret = EXIT_FAILURE;
760 break;
761 }
762
763 ret = rados.conf_read_file(config);
764 if (ret < 0) {
765 // This could fail if the config file is malformed, but it'd be hard.
766 std::cerr << "failed to parse config file " << config
767 << "! error" << ret << std::endl;
768 ret = EXIT_FAILURE;
769 break;
770 }
771
772 /*
773 * next, we actually connect to the cluster
774 */
775
776 ret = rados.connect();
777 if (ret < 0) {
778 std::cerr << "couldn't connect to cluster! error " << ret << std::endl;
779 ret = EXIT_FAILURE;
780 break;
781 }
782
783 /*
784 * And now we're done, so let's remove our pool and then
785 * shut down the connection gracefully.
786 */
787 int delete_ret = rados.pool_delete(db_pool.c_str());
788 if (delete_ret < 0) {
789 // be careful not to
790 std::cerr << "We failed to delete our test pool!" << db_pool << delete_ret << std::endl;
791 ret = EXIT_FAILURE;
792 }
793 delete_ret = rados.pool_delete(wal_pool.c_str());
794 if (delete_ret < 0) {
795 // be careful not to
796 std::cerr << "We failed to delete our test pool!" << wal_pool << delete_ret << std::endl;
797 ret = EXIT_FAILURE;
798 }
799 } while (0);
800 }
801 };
802
// Runs the create/stat/list/rename/delete checks once for each of several
// directory names.
TEST_F(EnvLibradosMutipoolTest, Basics) {
  uint64_t size = 0;
  std::unique_ptr<WritableFile> writer;
  std::vector<std::string> entries;
  const std::vector<std::string> dirs = {"/tmp/dir1", "/tmp/dir2", "/tmp/dir3",
                                         "/tmp/dir4", "dir"};

  for (const std::string& dir : dirs) {
    const std::string missing = dir + "/non_existent";
    const std::string file_f = dir + "/f";
    const std::string file_g = dir + "/g";

    ASSERT_OK(env_->CreateDir(dir));

    // A freshly created directory has no children.
    ASSERT_EQ(Status::NotFound(), env_->FileExists(missing));
    ASSERT_FALSE(env_->GetFileSize(missing, &size).ok());
    ASSERT_OK(env_->GetChildren(dir, &entries));
    ASSERT_TRUE(entries.empty());

    // Creating a writable file and dropping it leaves an empty file behind.
    ASSERT_OK(env_->NewWritableFile(file_f, &writer, soptions_));
    writer.reset();

    ASSERT_OK(env_->FileExists(file_f));
    ASSERT_OK(env_->GetFileSize(file_f, &size));
    ASSERT_EQ(0U, size);
    ASSERT_OK(env_->GetChildren(dir, &entries));
    ASSERT_EQ(1U, entries.size());
    ASSERT_EQ("f", entries[0]);

    // Re-open the file and append three bytes.
    ASSERT_OK(env_->NewWritableFile(file_f, &writer, soptions_));
    ASSERT_OK(writer->Append("abc"));
    writer.reset();

    ASSERT_OK(env_->GetFileSize(file_f, &size));
    ASSERT_EQ(3U, size);

    // Renaming: a missing source fails, an existing one moves with its data.
    ASSERT_FALSE(env_->RenameFile(missing, file_g).ok());
    ASSERT_OK(env_->RenameFile(file_f, file_g));
    ASSERT_EQ(Status::NotFound(), env_->FileExists(file_f));
    ASSERT_OK(env_->FileExists(file_g));
    ASSERT_OK(env_->GetFileSize(file_g, &size));
    ASSERT_EQ(3U, size);

    // Opening a missing file for reading fails and leaves the handle empty.
    std::unique_ptr<SequentialFile> seq_reader;
    std::unique_ptr<RandomAccessFile> rand_reader;
    ASSERT_FALSE(
        env_->NewSequentialFile(missing, &seq_reader, soptions_).ok());
    ASSERT_TRUE(seq_reader == nullptr);
    ASSERT_FALSE(
        env_->NewRandomAccessFile(missing, &rand_reader, soptions_).ok());
    ASSERT_TRUE(rand_reader == nullptr);

    // Deleting: a missing file fails, the real one disappears from the
    // listing, and the now-empty directory can be removed.
    ASSERT_FALSE(env_->DeleteFile(missing).ok());
    ASSERT_OK(env_->DeleteFile(file_g));
    ASSERT_EQ(Status::NotFound(), env_->FileExists(file_g));
    ASSERT_OK(env_->GetChildren(dir, &entries));
    ASSERT_TRUE(entries.empty());
    ASSERT_OK(env_->DeleteDir(dir));
  }
}
872
// Opens a DB with a separate WAL directory on top of the multi-pool librados
// env and runs a Put/Get/WriteBatch round trip. Every Status is checked with
// gtest assertions so the test still verifies something when assert() is
// compiled out (NDEBUG).
TEST_F(EnvLibradosMutipoolTest, DBBasics) {
  const std::string kDBPath = "/tmp/DBBasics";
  const std::string walPath = "/tmp/wal";
  Options options;
  // Optimize RocksDB. This is the easiest way to get RocksDB to perform well
  options.IncreaseParallelism();
  options.OptimizeLevelStyleCompaction();
  // create the DB if it's not already present
  options.create_if_missing = true;
  options.env = env_;
  options.wal_dir = walPath;

  // open DB; the unique_ptr closes it even when an assertion returns early
  DB* raw_db = nullptr;
  ASSERT_OK(DB::Open(options, kDBPath, &raw_db));
  std::unique_ptr<DB> db(raw_db);

  // Put key-value
  ASSERT_OK(db->Put(WriteOptions(), "key1", "value"));
  std::string value;
  // get value
  ASSERT_OK(db->Get(ReadOptions(), "key1", &value));
  ASSERT_EQ("value", value);

  // atomically apply a set of updates
  {
    WriteBatch batch;
    ASSERT_OK(batch.Delete("key1"));
    ASSERT_OK(batch.Put("key2", value));
    ASSERT_OK(db->Write(WriteOptions(), &batch));
  }

  const Status s = db->Get(ReadOptions(), "key1", &value);
  ASSERT_TRUE(s.IsNotFound());

  ASSERT_OK(db->Get(ReadOptions(), "key2", &value));
  ASSERT_EQ("value", value);
}
915
// Loads 64 write batches of 32768 random key/value pairs each, first through
// the default env and then through the multi-pool librados env (with a
// separate WAL directory), and prints the time each run took. Statuses are
// checked with gtest assertions so failures are reported even when assert()
// is compiled out.
TEST_F(EnvLibradosMutipoolTest, DBBulkLoadKeysInRandomOrder) {
  char key[20] = {0};
  char value[20] = {0};
  const int max_loop = 1 << 6;
  const int bulk_size = 1 << 15;
  Timer timer(false);
  std::cout << "Test size : loop(" << max_loop << "); bulk_size(" << bulk_size
            << ")" << std::endl;

  // Formats a random 64-bit value as hex, space-padded to 16 columns.
  auto fill_random_hex = [](char* buf, size_t len,
                            ROCKSDB_NAMESPACE::Random64* rnd) {
    snprintf(buf, len, "%16lx",
             static_cast<unsigned long>(
                 rnd->Uniform(std::numeric_limits<uint64_t>::max())));
  };

  /**********************************
            use default env
  ***********************************/
  const std::string kDBPath1 = "/tmp/DBBulkLoadKeysInRandomOrder1";
  const std::string walPath = "/tmp/wal";
  Options options1;
  // Optimize Rocksdb. This is the easiest way to get RocksDB to perform well
  options1.IncreaseParallelism();
  options1.OptimizeLevelStyleCompaction();
  // create the DB if it's not already present
  options1.create_if_missing = true;

  // open DB; the unique_ptr closes it even when an assertion returns early
  DB* raw_db1 = nullptr;
  ASSERT_OK(DB::Open(options1, kDBPath1, &raw_db1));
  std::unique_ptr<DB> db1(raw_db1);

  ROCKSDB_NAMESPACE::Random64 r1(time(nullptr));

  timer.Reset();
  for (int i = 0; i < max_loop; ++i) {
    WriteBatch batch;
    for (int j = 0; j < bulk_size; ++j) {
      fill_random_hex(key, sizeof(key), &r1);
      fill_random_hex(value, sizeof(value), &r1);
      ASSERT_OK(batch.Put(key, value));
    }
    ASSERT_OK(db1->Write(WriteOptions(), &batch));
  }
  std::cout << "Time by default : " << timer << "ms" << std::endl;
  db1.reset();

  /**********************************
            use librados env
  ***********************************/
  const std::string kDBPath2 = "/tmp/DBBulkLoadKeysInRandomOrder2";
  Options options2;
  // Optimize RocksDB. This is the easiest way to get RocksDB to perform well
  options2.IncreaseParallelism();
  options2.OptimizeLevelStyleCompaction();
  // create the DB if it's not already present
  options2.create_if_missing = true;
  options2.env = env_;
  options2.wal_dir = walPath;

  // open DB; ASSERT_OK reports the failing Status itself on error
  DB* raw_db2 = nullptr;
  ASSERT_OK(DB::Open(options2, kDBPath2, &raw_db2));
  std::unique_ptr<DB> db2(raw_db2);

  ROCKSDB_NAMESPACE::Random64 r2(time(nullptr));

  timer.Reset();
  for (int i = 0; i < max_loop; ++i) {
    WriteBatch batch;
    for (int j = 0; j < bulk_size; ++j) {
      fill_random_hex(key, sizeof(key), &r2);
      fill_random_hex(value, sizeof(value), &r2);
      ASSERT_OK(batch.Put(key, value));
    }
    ASSERT_OK(db2->Write(WriteOptions(), &batch));
  }
  std::cout << "Time by librados : " << timer << "ms" << std::endl;
  db2.reset();
}
1004
// Exercises the TransactionDB API on top of the fixture's env_ in three
// scenarios:
//   1. a plain transaction that commits ("Read Committed"),
//   2. a transaction pinned to a single snapshot that is expected to hit a
//      write conflict ("Repeatable Read"), and
//   3. a transaction that advances its snapshot and rolls back to a save
//      point before committing.
//
// All status checks use gtest assertions instead of assert() so they are
// still evaluated when the test is compiled with NDEBUG.
TEST_F(EnvLibradosMutipoolTest, DBTransactionDB) {
  const std::string kDBPath = "/tmp/DBTransactionDB";

  // open DB
  Options options;
  TransactionDBOptions txn_db_options;
  options.create_if_missing = true;
  options.env = env_;
  TransactionDB* txn_db = nullptr;

  Status s = TransactionDB::Open(options, txn_db_options, kDBPath, &txn_db);
  ASSERT_OK(s);
  ASSERT_TRUE(txn_db != nullptr);

  WriteOptions write_options;
  ReadOptions read_options;
  TransactionOptions txn_options;
  std::string value;

  ////////////////////////////////////////////////////////
  //
  // Simple Transaction Example ("Read Committed")
  //
  ////////////////////////////////////////////////////////

  // Start a transaction
  Transaction* txn = txn_db->BeginTransaction(write_options);
  ASSERT_TRUE(txn != nullptr);

  // Read a key in this transaction
  s = txn->Get(read_options, "abc", &value);
  ASSERT_TRUE(s.IsNotFound());

  // Write a key in this transaction
  s = txn->Put("abc", "def");
  ASSERT_OK(s);

  // Read a key OUTSIDE this transaction. Does not affect txn.
  // The write above is not committed yet, so the key is not visible here.
  s = txn_db->Get(read_options, "abc", &value);
  ASSERT_TRUE(s.IsNotFound());

  // Write a key OUTSIDE of this transaction.
  // Does not affect txn since this is an unrelated key. If we wrote key 'abc'
  // here, the transaction would fail to commit.
  s = txn_db->Put(write_options, "xyz", "zzz");
  ASSERT_OK(s);

  // Commit transaction
  s = txn->Commit();
  ASSERT_OK(s);
  delete txn;

  ////////////////////////////////////////////////////////
  //
  // "Repeatable Read" (Snapshot Isolation) Example
  //   -- Using a single Snapshot
  //
  ////////////////////////////////////////////////////////

  // Set a snapshot at start of transaction by setting set_snapshot=true
  txn_options.set_snapshot = true;
  txn = txn_db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn != nullptr);

  // This snapshot is obtained from the transaction itself, so it is not
  // released through the DB below.
  const Snapshot* snapshot = txn->GetSnapshot();

  // Write a key OUTSIDE of transaction
  s = txn_db->Put(write_options, "abc", "xyz");
  ASSERT_OK(s);

  // Attempt to read a key using the snapshot.  This will fail since
  // the previous write outside this txn conflicts with this read.
  read_options.snapshot = snapshot;
  s = txn->GetForUpdate(read_options, "abc", &value);
  ASSERT_TRUE(s.IsBusy());

  s = txn->Rollback();
  ASSERT_OK(s);

  delete txn;
  // Clear snapshot from read options since it is no longer valid
  read_options.snapshot = nullptr;
  snapshot = nullptr;

  ////////////////////////////////////////////////////////
  //
  // "Read Committed" (Monotonic Atomic Views) Example
  //   --Using multiple Snapshots
  //
  ////////////////////////////////////////////////////////

  // In this example, we set the snapshot multiple times.  This is probably
  // only necessary if you have very strict isolation requirements to
  // implement.

  // Set a snapshot at start of transaction
  txn_options.set_snapshot = true;
  txn = txn_db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn != nullptr);

  // Do some reads and writes to key "x".
  // Snapshots handed out by DB::GetSnapshot() must be returned with
  // ReleaseSnapshot(), so keep the pointer in a local to release it later.
  const Snapshot* db_snapshot = txn_db->GetSnapshot();
  read_options.snapshot = db_snapshot;
  s = txn->Get(read_options, "x", &value);
  ASSERT_TRUE(s.IsNotFound());
  s = txn->Put("x", "x");
  ASSERT_OK(s);

  // Do a write outside of the transaction to key "y"
  s = txn_db->Put(write_options, "y", "y");
  ASSERT_OK(s);

  // Set a new snapshot in the transaction
  txn->SetSnapshot();
  txn->SetSavePoint();
  // Release the previous DB snapshot before replacing it; otherwise it would
  // be leaked once read_options.snapshot is overwritten.
  read_options.snapshot = nullptr;
  txn_db->ReleaseSnapshot(db_snapshot);
  db_snapshot = txn_db->GetSnapshot();
  read_options.snapshot = db_snapshot;

  // Do some reads and writes to key "y"
  // Since the snapshot was advanced, the write done outside of the
  // transaction does not conflict.
  s = txn->GetForUpdate(read_options, "y", &value);
  ASSERT_OK(s);
  s = txn->Put("y", "y");
  ASSERT_OK(s);

  // Decide we want to revert the last write from this transaction.
  s = txn->RollbackToSavePoint();
  ASSERT_OK(s);

  // Commit.
  s = txn->Commit();
  ASSERT_OK(s);
  delete txn;
  // Clear snapshot from read options since it is no longer valid, and hand
  // the snapshot back to the DB before the DB is destroyed.
  read_options.snapshot = nullptr;
  txn_db->ReleaseSnapshot(db_snapshot);
  db_snapshot = nullptr;

  // Cleanup
  delete txn_db;
  DestroyDB(kDBPath, options);
}
1131
1132 } // namespace ROCKSDB_NAMESPACE
1133
main(int argc,char ** argv)1134 int main(int argc, char** argv) {
1135 ::testing::InitGoogleTest(&argc, argv);
1136 if (getenv("CIRCLECI")) {
1137 fprintf(stderr,
1138 "TODO: get env_librados_test working in CI. Skipping for now.\n");
1139 return 0;
1140 }
1141 return RUN_ALL_TESTS();
1142 }
1143
1144 #else
1145 #include <stdio.h>
1146
// Stub entry point used when ROCKSDB_LITE is defined: writes a skip notice
// to stderr and exits successfully. The command-line arguments are unused.
int main(int argc, char** argv) {
  (void)argc;
  (void)argv;
  fputs("SKIPPED as EnvLibrados is not supported in ROCKSDB_LITE\n", stderr);
  return 0;
}
1151
1152 #endif // !ROCKSDB_LITE
1153