1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #ifndef ROCKSDB_LITE
7 #ifndef GFLAGS
8 #include <cstdio>
// Fallback entry point used when the tool is built without gflags: flag
// parsing is unavailable, so tell the user what is missing and fail.
int main() {
  fputs("Please install gflags to run rocksdb tools\n", stderr);
  return 1;
}
13 #else
14
15 #include <atomic>
16 #include <cstdio>
17
18 #include "db/write_batch_internal.h"
19 #include "rocksdb/db.h"
20 #include "rocksdb/types.h"
21 #include "test_util/testutil.h"
22 #include "util/gflags_compat.h"
23
// Run a thread to perform Puts.
// Another thread uses the GetUpdatesSince API to keep getting the updates.
// options :
// --num_inserts = the num of inserts the first thread should perform.
// --wal_ttl_seconds = the wal ttl for the run (in seconds).
// --wal_size_limit_MB = the wal size limit for the run (in MB).
29
30 DEFINE_uint64(num_inserts, 1000,
31 "the num of inserts the first thread should"
32 " perform.");
33 DEFINE_uint64(wal_ttl_seconds, 1000, "the wal ttl for the run(in seconds)");
34 DEFINE_uint64(wal_size_limit_MB, 10,
35 "the wal size limit for the run"
36 "(in MB)");
37
38 using namespace ROCKSDB_NAMESPACE;
39
40 using GFLAGS_NAMESPACE::ParseCommandLineFlags;
41 using GFLAGS_NAMESPACE::SetUsageMessage;
42
43 struct DataPumpThread {
44 DB* db; // Assumption DB is Open'ed already.
45 };
46
DataPumpThreadBody(void * arg)47 static void DataPumpThreadBody(void* arg) {
48 DataPumpThread* t = reinterpret_cast<DataPumpThread*>(arg);
49 DB* db = t->db;
50 Random rnd(301);
51 uint64_t i = 0;
52 while (i++ < FLAGS_num_inserts) {
53 if (!db->Put(WriteOptions(), Slice(rnd.RandomString(500)),
54 Slice(rnd.RandomString(500)))
55 .ok()) {
56 fprintf(stderr, "Error in put\n");
57 exit(1);
58 }
59 }
60 }
61
main(int argc,const char ** argv)62 int main(int argc, const char** argv) {
63 SetUsageMessage(
64 std::string("\nUSAGE:\n") + std::string(argv[0]) +
65 " --num_inserts=<num_inserts> --wal_ttl_seconds=<WAL_ttl_seconds>" +
66 " --wal_size_limit_MB=<WAL_size_limit_MB>");
67 ParseCommandLineFlags(&argc, const_cast<char***>(&argv), true);
68
69 Env* env = Env::Default();
70 std::string default_db_path;
71 env->GetTestDirectory(&default_db_path);
72 default_db_path += "db_repl_stress";
73 Options options;
74 options.create_if_missing = true;
75 options.WAL_ttl_seconds = FLAGS_wal_ttl_seconds;
76 options.WAL_size_limit_MB = FLAGS_wal_size_limit_MB;
77 DB* db;
78 DestroyDB(default_db_path, options);
79
80 Status s = DB::Open(options, default_db_path, &db);
81
82 if (!s.ok()) {
83 fprintf(stderr, "Could not open DB due to %s\n", s.ToString().c_str());
84 exit(1);
85 }
86
87 DataPumpThread dataPump;
88 dataPump.db = db;
89 env->StartThread(DataPumpThreadBody, &dataPump);
90
91 std::unique_ptr<TransactionLogIterator> iter;
92 SequenceNumber currentSeqNum = 1;
93 uint64_t num_read = 0;
94 for (;;) {
95 iter.reset();
96 // Continue to probe a bit more after all received
97 size_t probes = 0;
98 while (!db->GetUpdatesSince(currentSeqNum, &iter).ok()) {
99 probes++;
100 if (probes > 100 && num_read >= FLAGS_num_inserts) {
101 if (num_read > FLAGS_num_inserts) {
102 fprintf(stderr, "Too many updates read: %ld expected: %ld\n",
103 (long)num_read, (long)FLAGS_num_inserts);
104 exit(1);
105 }
106 fprintf(stderr, "Successful!\n");
107 return 0;
108 }
109 }
110 fprintf(stderr, "Refreshing iterator\n");
111 for (; iter->Valid(); iter->Next(), num_read++, currentSeqNum++) {
112 BatchResult res = iter->GetBatch();
113 if (res.sequence != currentSeqNum) {
114 fprintf(stderr, "Missed a seq no. b/w %ld and %ld\n",
115 (long)currentSeqNum, (long)res.sequence);
116 exit(1);
117 }
118 }
119 }
120 }
121
122 #endif // GFLAGS
123
124 #else // ROCKSDB_LITE
125 #include <stdio.h>
// Entry point for ROCKSDB_LITE builds, where this tool is not available.
// The arguments are ignored; the limitation is reported and the run fails.
int main(int /*argc*/, char** /*argv*/) {
  fputs("Not supported in lite mode.\n", stderr);
  return 1;
}
130 #endif // ROCKSDB_LITE
131