1 // Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 
6 // How to use this example
7 // Open two terminals, in one of them, run `./multi_processes_example 0` to
8 // start a process running the primary instance. This will create a new DB in
9 // kDBPath. The process will run for a while inserting keys to the normal
10 // RocksDB database.
11 // Next, go to the other terminal and run `./multi_processes_example 1` to
12 // start a process running the secondary instance. This will create a secondary
13 // instance following the aforementioned primary instance. This process will
// run for a while, tailing the logs of the primary. After the process running
// the primary instance exits, this process will keep running until you hit
// 'CTRL+C'.
16 
#include <atomic>
#include <cassert>
#include <chrono>
#include <cinttypes>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <memory>
#include <string>
#include <thread>
#include <vector>
25 
26 // TODO: port this example to other systems. It should be straightforward for
27 // POSIX-compliant systems.
28 #if defined(OS_LINUX)
29 #include <dirent.h>
30 #include <signal.h>
31 #include <sys/stat.h>
32 #include <sys/types.h>
33 #include <sys/wait.h>
34 #include <unistd.h>
35 
36 #include "rocksdb/db.h"
37 #include "rocksdb/options.h"
38 #include "rocksdb/slice.h"
39 
40 using ROCKSDB_NAMESPACE::ColumnFamilyDescriptor;
41 using ROCKSDB_NAMESPACE::ColumnFamilyHandle;
42 using ROCKSDB_NAMESPACE::ColumnFamilyOptions;
43 using ROCKSDB_NAMESPACE::DB;
44 using ROCKSDB_NAMESPACE::FlushOptions;
45 using ROCKSDB_NAMESPACE::Iterator;
46 using ROCKSDB_NAMESPACE::Options;
47 using ROCKSDB_NAMESPACE::ReadOptions;
48 using ROCKSDB_NAMESPACE::Slice;
49 using ROCKSDB_NAMESPACE::Status;
50 using ROCKSDB_NAMESPACE::WriteOptions;
51 
52 const std::string kDBPath = "/tmp/rocksdb_multi_processes_example";
53 const std::string kPrimaryStatusFile =
54     "/tmp/rocksdb_multi_processes_example_primary_status";
55 const uint64_t kMaxKey = 600000;
56 const size_t kMaxValueLength = 256;
57 const size_t kNumKeysPerFlush = 1000;
58 
GetColumnFamilyNames()59 const std::vector<std::string>& GetColumnFamilyNames() {
60   static std::vector<std::string> column_family_names = {
61       ROCKSDB_NAMESPACE::kDefaultColumnFamilyName, "pikachu"};
62   return column_family_names;
63 }
64 
// Reports whether the host stores the least significant byte of an integer at
// the lowest address. Determined by inspecting the first byte of the value 1.
inline bool IsLittleEndian() {
  const uint32_t probe = 1;
  const auto* first_byte = reinterpret_cast<const char*>(&probe);
  if (*first_byte != 0) {
    return true;
  }
  return false;
}
69 
// Accessor for the flag that keeps the secondary instance running. The flag
// starts at 1; the SIGINT handler stores 0 into it, and the loops in
// RunSecondary() stop once they no longer read 1.
static std::atomic<int>& ShouldSecondaryWait() {
  static std::atomic<int> keep_running(1);
  return keep_running;
}
74 
// Encodes `k` as an 8-byte string with the most significant byte first, so
// that bytewise comparison of two encoded keys orders them like the integers
// they came from.
static std::string Key(uint64_t k) {
  std::string encoded(sizeof(k), '\0');
  // Fill from the back: the last position receives the lowest-order byte.
  for (size_t pos = encoded.size(); pos-- > 0;) {
    encoded[pos] = static_cast<char>(k & 0xff);
    k >>= 8;
  }
  return encoded;
}
101 
// Decodes an 8-byte key produced by Key(uint64_t) back into its integer value.
// The first byte of `key` is the most significant byte of the result.
//
// `key` is expected to be exactly sizeof(uint64_t) bytes long (asserted).
//
// Each byte is widened through unsigned char before being merged. Widening a
// plain (possibly signed) char directly would sign-extend any byte >= 0x80 and
// set every higher bit of the result. Decoding by shifts also makes the result
// independent of the host byte order.
static uint64_t Key(std::string key) {
  assert(key.size() == sizeof(uint64_t));
  uint64_t decoded = 0;
  for (const char byte : key) {
    decoded = (decoded << 8) | static_cast<unsigned char>(byte);
  }
  return decoded;
}
128 
GenerateRandomValue(const size_t max_length,char scratch[])129 static Slice GenerateRandomValue(const size_t max_length, char scratch[]) {
130   size_t sz = 1 + (std::rand() % max_length);
131   int rnd = std::rand();
132   for (size_t i = 0; i != sz; ++i) {
133     scratch[i] = static_cast<char>(rnd ^ i);
134   }
135   return Slice(scratch, sz);
136 }
137 
// Decides whether RunPrimary() closes the DB after each batch of writes.
// Always answers true, so the primary reopens the DB for every batch.
static bool ShouldCloseDB() {
  constexpr bool kCloseAfterEachBatch = true;
  return kCloseAfterEachBatch;
}
139 
CreateDB()140 void CreateDB() {
141   long my_pid = static_cast<long>(getpid());
142   Options options;
143   Status s = ROCKSDB_NAMESPACE::DestroyDB(kDBPath, options);
144   if (!s.ok()) {
145     fprintf(stderr, "[process %ld] Failed to destroy DB: %s\n", my_pid,
146             s.ToString().c_str());
147     assert(false);
148   }
149   options.create_if_missing = true;
150   DB* db = nullptr;
151   s = DB::Open(options, kDBPath, &db);
152   if (!s.ok()) {
153     fprintf(stderr, "[process %ld] Failed to open DB: %s\n", my_pid,
154             s.ToString().c_str());
155     assert(false);
156   }
157   std::vector<ColumnFamilyHandle*> handles;
158   ColumnFamilyOptions cf_opts(options);
159   for (const auto& cf_name : GetColumnFamilyNames()) {
160     if (ROCKSDB_NAMESPACE::kDefaultColumnFamilyName != cf_name) {
161       ColumnFamilyHandle* handle = nullptr;
162       s = db->CreateColumnFamily(cf_opts, cf_name, &handle);
163       if (!s.ok()) {
164         fprintf(stderr, "[process %ld] Failed to create CF %s: %s\n", my_pid,
165                 cf_name.c_str(), s.ToString().c_str());
166         assert(false);
167       }
168       handles.push_back(handle);
169     }
170   }
171   fprintf(stdout, "[process %ld] Column families created\n", my_pid);
172   for (auto h : handles) {
173     delete h;
174   }
175   handles.clear();
176   delete db;
177 }
178 
RunPrimary()179 void RunPrimary() {
180   long my_pid = static_cast<long>(getpid());
181   fprintf(stdout, "[process %ld] Primary instance starts\n", my_pid);
182   CreateDB();
183   std::srand(time(nullptr));
184   DB* db = nullptr;
185   Options options;
186   options.create_if_missing = false;
187   std::vector<ColumnFamilyDescriptor> column_families;
188   for (const auto& cf_name : GetColumnFamilyNames()) {
189     column_families.push_back(ColumnFamilyDescriptor(cf_name, options));
190   }
191   std::vector<ColumnFamilyHandle*> handles;
192   WriteOptions write_opts;
193   char val_buf[kMaxValueLength] = {0};
194   uint64_t curr_key = 0;
195   while (curr_key < kMaxKey) {
196     Status s;
197     if (nullptr == db) {
198       s = DB::Open(options, kDBPath, column_families, &handles, &db);
199       if (!s.ok()) {
200         fprintf(stderr, "[process %ld] Failed to open DB: %s\n", my_pid,
201                 s.ToString().c_str());
202         assert(false);
203       }
204     }
205     assert(nullptr != db);
206     assert(handles.size() == GetColumnFamilyNames().size());
207     for (auto h : handles) {
208       assert(nullptr != h);
209       for (size_t i = 0; i != kNumKeysPerFlush; ++i) {
210         Slice key = Key(curr_key + static_cast<uint64_t>(i));
211         Slice value = GenerateRandomValue(kMaxValueLength, val_buf);
212         s = db->Put(write_opts, h, key, value);
213         if (!s.ok()) {
214           fprintf(stderr, "[process %ld] Failed to insert\n", my_pid);
215           assert(false);
216         }
217       }
218       s = db->Flush(FlushOptions(), h);
219       if (!s.ok()) {
220         fprintf(stderr, "[process %ld] Failed to flush\n", my_pid);
221         assert(false);
222       }
223     }
224     curr_key += static_cast<uint64_t>(kNumKeysPerFlush);
225     if (ShouldCloseDB()) {
226       for (auto h : handles) {
227         delete h;
228       }
229       handles.clear();
230       delete db;
231       db = nullptr;
232     }
233   }
234   if (nullptr != db) {
235     for (auto h : handles) {
236       delete h;
237     }
238     handles.clear();
239     delete db;
240     db = nullptr;
241   }
242   fprintf(stdout, "[process %ld] Finished adding keys\n", my_pid);
243 }
244 
secondary_instance_sigint_handler(int signal)245 void secondary_instance_sigint_handler(int signal) {
246   ShouldSecondaryWait().store(0, std::memory_order_relaxed);
247   fprintf(stdout, "\n");
248   fflush(stdout);
249 };
250 
RunSecondary()251 void RunSecondary() {
252   ::signal(SIGINT, secondary_instance_sigint_handler);
253   long my_pid = static_cast<long>(getpid());
254   const std::string kSecondaryPath =
255       "/tmp/rocksdb_multi_processes_example_secondary";
256   // Create directory if necessary
257   if (nullptr == opendir(kSecondaryPath.c_str())) {
258     int ret =
259         mkdir(kSecondaryPath.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
260     if (ret < 0) {
261       perror("failed to create directory for secondary instance");
262       exit(0);
263     }
264   }
265   DB* db = nullptr;
266   Options options;
267   options.create_if_missing = false;
268   options.max_open_files = -1;
269   Status s = DB::OpenAsSecondary(options, kDBPath, kSecondaryPath, &db);
270   if (!s.ok()) {
271     fprintf(stderr, "[process %ld] Failed to open in secondary mode: %s\n",
272             my_pid, s.ToString().c_str());
273     assert(false);
274   } else {
275     fprintf(stdout, "[process %ld] Secondary instance starts\n", my_pid);
276   }
277 
278   ReadOptions ropts;
279   ropts.verify_checksums = true;
280   ropts.total_order_seek = true;
281 
282   std::vector<std::thread> test_threads;
283   test_threads.emplace_back([&]() {
284     while (1 == ShouldSecondaryWait().load(std::memory_order_relaxed)) {
285       std::unique_ptr<Iterator> iter(db->NewIterator(ropts));
286       iter->SeekToFirst();
287       size_t count = 0;
288       for (; iter->Valid(); iter->Next()) {
289         ++count;
290       }
291     }
292     fprintf(stdout, "[process %ld] Range_scan thread finished\n", my_pid);
293   });
294 
295   test_threads.emplace_back([&]() {
296     std::srand(time(nullptr));
297     while (1 == ShouldSecondaryWait().load(std::memory_order_relaxed)) {
298       Slice key = Key(std::rand() % kMaxKey);
299       std::string value;
300       db->Get(ropts, key, &value);
301     }
302     fprintf(stdout, "[process %ld] Point lookup thread finished\n", my_pid);
303   });
304 
305   uint64_t curr_key = 0;
306   while (1 == ShouldSecondaryWait().load(std::memory_order_relaxed)) {
307     s = db->TryCatchUpWithPrimary();
308     if (!s.ok()) {
309       fprintf(stderr,
310               "[process %ld] error while trying to catch up with "
311               "primary %s\n",
312               my_pid, s.ToString().c_str());
313       assert(false);
314     }
315     {
316       std::unique_ptr<Iterator> iter(db->NewIterator(ropts));
317       if (!iter) {
318         fprintf(stderr, "[process %ld] Failed to create iterator\n", my_pid);
319         assert(false);
320       }
321       iter->SeekToLast();
322       if (iter->Valid()) {
323         uint64_t curr_max_key = Key(iter->key().ToString());
324         if (curr_max_key != curr_key) {
325           fprintf(stdout, "[process %ld] Observed key %" PRIu64 "\n", my_pid,
326                   curr_key);
327           curr_key = curr_max_key;
328         }
329       }
330     }
331     std::this_thread::sleep_for(std::chrono::seconds(1));
332   }
333   s = db->TryCatchUpWithPrimary();
334   if (!s.ok()) {
335     fprintf(stderr,
336             "[process %ld] error while trying to catch up with "
337             "primary %s\n",
338             my_pid, s.ToString().c_str());
339     assert(false);
340   }
341 
342   std::vector<ColumnFamilyDescriptor> column_families;
343   for (const auto& cf_name : GetColumnFamilyNames()) {
344     column_families.push_back(ColumnFamilyDescriptor(cf_name, options));
345   }
346   std::vector<ColumnFamilyHandle*> handles;
347   DB* verification_db = nullptr;
348   s = DB::OpenForReadOnly(options, kDBPath, column_families, &handles,
349                           &verification_db);
350   assert(s.ok());
351   Iterator* iter1 = verification_db->NewIterator(ropts);
352   iter1->SeekToFirst();
353 
354   Iterator* iter = db->NewIterator(ropts);
355   iter->SeekToFirst();
356   for (; iter->Valid() && iter1->Valid(); iter->Next(), iter1->Next()) {
357     if (iter->key().ToString() != iter1->key().ToString()) {
358       fprintf(stderr, "%" PRIu64 "!= %" PRIu64 "\n",
359               Key(iter->key().ToString()), Key(iter1->key().ToString()));
360       assert(false);
361     } else if (iter->value().ToString() != iter1->value().ToString()) {
362       fprintf(stderr, "Value mismatch\n");
363       assert(false);
364     }
365   }
366   fprintf(stdout, "[process %ld] Verification succeeded\n", my_pid);
367   for (auto& thr : test_threads) {
368     thr.join();
369   }
370   delete iter;
371   delete iter1;
372   delete db;
373   delete verification_db;
374 }
375 
main(int argc,char ** argv)376 int main(int argc, char** argv) {
377   if (argc < 2) {
378     fprintf(stderr, "%s <0 for primary, 1 for secondary>\n", argv[0]);
379     return 0;
380   }
381   if (atoi(argv[1]) == 0) {
382     RunPrimary();
383   } else {
384     RunSecondary();
385   }
386   return 0;
387 }
388 #else   // OS_LINUX
// Fallback entry point for platforms other than Linux: the example is not
// available there, so it only reports that on stderr and exits with status 0.
int main() {
  fputs("Not implemented.\n", stderr);
  return 0;
}
393 #endif  // !OS_LINUX
394