1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #ifndef ROCKSDB_LITE
7 #ifndef GFLAGS
8 #include <cstdio>
main()9 int main() {
10   fprintf(stderr, "Please install gflags to run rocksdb tools\n");
11   return 1;
12 }
13 #else
14 
15 #include <atomic>
16 #include <cstdio>
17 
18 #include "db/write_batch_internal.h"
19 #include "rocksdb/db.h"
20 #include "rocksdb/types.h"
21 #include "test_util/testutil.h"
22 #include "util/gflags_compat.h"
23 
24 // Run a thread to perform Put's.
25 // Another thread uses GetUpdatesSince API to keep getting the updates.
26 // options :
27 // --num_inserts = the num of inserts the first thread should perform.
28 // --wal_ttl = the wal ttl for the run.
29 
30 DEFINE_uint64(num_inserts, 1000,
31               "the num of inserts the first thread should"
32               " perform.");
33 DEFINE_uint64(wal_ttl_seconds, 1000, "the wal ttl for the run(in seconds)");
34 DEFINE_uint64(wal_size_limit_MB, 10,
35               "the wal size limit for the run"
36               "(in MB)");
37 
38 using namespace ROCKSDB_NAMESPACE;
39 
40 using GFLAGS_NAMESPACE::ParseCommandLineFlags;
41 using GFLAGS_NAMESPACE::SetUsageMessage;
42 
43 struct DataPumpThread {
44   DB* db;  // Assumption DB is Open'ed already.
45 };
46 
DataPumpThreadBody(void * arg)47 static void DataPumpThreadBody(void* arg) {
48   DataPumpThread* t = reinterpret_cast<DataPumpThread*>(arg);
49   DB* db = t->db;
50   Random rnd(301);
51   uint64_t i = 0;
52   while (i++ < FLAGS_num_inserts) {
53     if (!db->Put(WriteOptions(), Slice(rnd.RandomString(500)),
54                  Slice(rnd.RandomString(500)))
55              .ok()) {
56       fprintf(stderr, "Error in put\n");
57       exit(1);
58     }
59   }
60 }
61 
main(int argc,const char ** argv)62 int main(int argc, const char** argv) {
63   SetUsageMessage(
64       std::string("\nUSAGE:\n") + std::string(argv[0]) +
65       " --num_inserts=<num_inserts> --wal_ttl_seconds=<WAL_ttl_seconds>" +
66       " --wal_size_limit_MB=<WAL_size_limit_MB>");
67   ParseCommandLineFlags(&argc, const_cast<char***>(&argv), true);
68 
69   Env* env = Env::Default();
70   std::string default_db_path;
71   env->GetTestDirectory(&default_db_path);
72   default_db_path += "db_repl_stress";
73   Options options;
74   options.create_if_missing = true;
75   options.WAL_ttl_seconds = FLAGS_wal_ttl_seconds;
76   options.WAL_size_limit_MB = FLAGS_wal_size_limit_MB;
77   DB* db;
78   DestroyDB(default_db_path, options);
79 
80   Status s = DB::Open(options, default_db_path, &db);
81 
82   if (!s.ok()) {
83     fprintf(stderr, "Could not open DB due to %s\n", s.ToString().c_str());
84     exit(1);
85   }
86 
87   DataPumpThread dataPump;
88   dataPump.db = db;
89   env->StartThread(DataPumpThreadBody, &dataPump);
90 
91   std::unique_ptr<TransactionLogIterator> iter;
92   SequenceNumber currentSeqNum = 1;
93   uint64_t num_read = 0;
94   for (;;) {
95     iter.reset();
96     // Continue to probe a bit more after all received
97     size_t probes = 0;
98     while (!db->GetUpdatesSince(currentSeqNum, &iter).ok()) {
99       probes++;
100       if (probes > 100 && num_read >= FLAGS_num_inserts) {
101         if (num_read > FLAGS_num_inserts) {
102           fprintf(stderr, "Too many updates read: %ld expected: %ld\n",
103                   (long)num_read, (long)FLAGS_num_inserts);
104           exit(1);
105         }
106         fprintf(stderr, "Successful!\n");
107         return 0;
108       }
109     }
110     fprintf(stderr, "Refreshing iterator\n");
111     for (; iter->Valid(); iter->Next(), num_read++, currentSeqNum++) {
112       BatchResult res = iter->GetBatch();
113       if (res.sequence != currentSeqNum) {
114         fprintf(stderr, "Missed a seq no. b/w %ld and %ld\n",
115                 (long)currentSeqNum, (long)res.sequence);
116         exit(1);
117       }
118     }
119   }
120 }
121 
122 #endif  // GFLAGS
123 
124 #else  // ROCKSDB_LITE
125 #include <stdio.h>
main(int,char **)126 int main(int /*argc*/, char** /*argv*/) {
127   fprintf(stderr, "Not supported in lite mode.\n");
128   return 1;
129 }
130 #endif  // ROCKSDB_LITE
131