1 // Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 
6 // How to use this example
7 // Open two terminals, in one of them, run `./multi_processes_example 0` to
8 // start a process running the primary instance. This will create a new DB in
9 // kDBPath. The process will run for a while inserting keys to the normal
10 // RocksDB database.
11 // Next, go to the other terminal and run `./multi_processes_example 1` to
12 // start a process running the secondary instance. This will create a secondary
13 // instance following the aforementioned primary instance. This process will
// run for a while, tailing the logs of the primary. After the process running
// the primary instance exits, this process will keep running until you hit
// 'CTRL+C'.
16 
#include <atomic>
#include <cassert>
#include <chrono>
#include <cinttypes>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <memory>
#include <string>
#include <thread>
#include <vector>
25 
26 #if defined(OS_LINUX)
27 #include <dirent.h>
28 #include <signal.h>
29 #include <sys/stat.h>
30 #include <sys/types.h>
31 #include <sys/wait.h>
32 #include <unistd.h>
#endif  // OS_LINUX
34 
35 #include "rocksdb/db.h"
36 #include "rocksdb/options.h"
37 #include "rocksdb/slice.h"
38 
39 using ROCKSDB_NAMESPACE::ColumnFamilyDescriptor;
40 using ROCKSDB_NAMESPACE::ColumnFamilyHandle;
41 using ROCKSDB_NAMESPACE::ColumnFamilyOptions;
42 using ROCKSDB_NAMESPACE::DB;
43 using ROCKSDB_NAMESPACE::FlushOptions;
44 using ROCKSDB_NAMESPACE::Iterator;
45 using ROCKSDB_NAMESPACE::Options;
46 using ROCKSDB_NAMESPACE::ReadOptions;
47 using ROCKSDB_NAMESPACE::Slice;
48 using ROCKSDB_NAMESPACE::Status;
49 using ROCKSDB_NAMESPACE::WriteOptions;
50 
// Directory of the database opened by the primary and followed by the
// secondary.
const std::string kDBPath{"/tmp/rocksdb_multi_processes_example"};
// Path of a primary status file.
// NOTE(review): nothing in this file reads or writes this path; confirm
// whether it is still needed.
const std::string kPrimaryStatusFile{
    "/tmp/rocksdb_multi_processes_example_primary_status"};
// The primary writes keys in [0, kMaxKey); the secondary's point lookups pick
// random keys in the same range.
constexpr uint64_t kMaxKey{600000};
// Size of the value scratch buffer and upper bound on a generated value's
// length.
constexpr size_t kMaxValueLength{256};
// Number of keys the primary writes to a column family before flushing it.
constexpr size_t kNumKeysPerFlush{1000};
57 
GetColumnFamilyNames()58 const std::vector<std::string>& GetColumnFamilyNames() {
59   static std::vector<std::string> column_family_names = {
60       ROCKSDB_NAMESPACE::kDefaultColumnFamilyName, "pikachu"};
61   return column_family_names;
62 }
63 
// Returns true when the host stores the least significant byte of an integer
// at the lowest address.
inline bool IsLittleEndian() {
  const uint32_t probe = 1;
  // Inspect the first byte in memory: it holds the low-order byte (value 1)
  // only on a little-endian host.
  const unsigned char* first_byte =
      reinterpret_cast<const unsigned char*>(&probe);
  return 0 != *first_byte;
}
68 
// Accessor for the flag that keeps the secondary's loops running. The flag
// starts at 1; the SIGINT handler stores 0 into it, and the secondary's loops
// keep going only while they read 1.
static std::atomic<int>& ShouldSecondaryWait() {
  static std::atomic<int> keep_waiting(1);
  return keep_waiting;
}
73 
// Encodes `k` as a fixed-width, 8-byte, big-endian string (most significant
// byte first). With this layout, comparing two encoded keys byte by byte as
// unsigned values gives the same order as comparing the integers.
//
// The bytes are produced with shifts, so the result does not depend on the
// host's endianness and no pointer reinterpretation or manual reversal is
// needed.
static std::string Key(uint64_t k) {
  std::string ret(sizeof(k), '\0');
  // Fill from the last position backwards: the low-order byte goes last.
  for (size_t i = ret.size(); i-- > 0;) {
    ret[i] = static_cast<char>(k & 0xff);
    k >>= 8;
  }
  return ret;
}
100 
// Decodes an 8-byte big-endian key (as produced by Key(uint64_t)) back into
// its integer value.
//
// `key` is expected to be exactly sizeof(uint64_t) bytes long; this is
// asserted. Every byte is widened through `unsigned char` so that bytes
// >= 0x80 are never sign-extended, and the loop only touches bytes that exist,
// so a wrongly sized key cannot cause an out-of-bounds access when asserts are
// compiled out.
static uint64_t Key(std::string key) {
  assert(key.size() == sizeof(uint64_t));
  uint64_t ret = 0;
  for (const char c : key) {
    ret = (ret << 8) | static_cast<uint64_t>(static_cast<unsigned char>(c));
  }
  return ret;
}
127 
GenerateRandomValue(const size_t max_length,char scratch[])128 static Slice GenerateRandomValue(const size_t max_length, char scratch[]) {
129   size_t sz = 1 + (std::rand() % max_length);
130   int rnd = std::rand();
131   for (size_t i = 0; i != sz; ++i) {
132     scratch[i] = static_cast<char>(rnd ^ i);
133   }
134   return Slice(scratch, sz);
135 }
136 
// Tells the primary whether to close the DB after a batch of writes.
// Currently always answers yes.
static bool ShouldCloseDB() {
  const bool close_after_each_batch = true;
  return close_after_each_batch;
}
138 
139 // TODO: port this example to other systems. It should be straightforward for
140 // POSIX-compliant systems.
141 #if defined(OS_LINUX)
CreateDB()142 void CreateDB() {
143   long my_pid = static_cast<long>(getpid());
144   Options options;
145   Status s = ROCKSDB_NAMESPACE::DestroyDB(kDBPath, options);
146   if (!s.ok()) {
147     fprintf(stderr, "[process %ld] Failed to destroy DB: %s\n", my_pid,
148             s.ToString().c_str());
149     assert(false);
150   }
151   options.create_if_missing = true;
152   DB* db = nullptr;
153   s = DB::Open(options, kDBPath, &db);
154   if (!s.ok()) {
155     fprintf(stderr, "[process %ld] Failed to open DB: %s\n", my_pid,
156             s.ToString().c_str());
157     assert(false);
158   }
159   std::vector<ColumnFamilyHandle*> handles;
160   ColumnFamilyOptions cf_opts(options);
161   for (const auto& cf_name : GetColumnFamilyNames()) {
162     if (ROCKSDB_NAMESPACE::kDefaultColumnFamilyName != cf_name) {
163       ColumnFamilyHandle* handle = nullptr;
164       s = db->CreateColumnFamily(cf_opts, cf_name, &handle);
165       if (!s.ok()) {
166         fprintf(stderr, "[process %ld] Failed to create CF %s: %s\n", my_pid,
167                 cf_name.c_str(), s.ToString().c_str());
168         assert(false);
169       }
170       handles.push_back(handle);
171     }
172   }
173   fprintf(stdout, "[process %ld] Column families created\n", my_pid);
174   for (auto h : handles) {
175     delete h;
176   }
177   handles.clear();
178   delete db;
179 }
180 
RunPrimary()181 void RunPrimary() {
182   long my_pid = static_cast<long>(getpid());
183   fprintf(stdout, "[process %ld] Primary instance starts\n", my_pid);
184   CreateDB();
185   std::srand(time(nullptr));
186   DB* db = nullptr;
187   Options options;
188   options.create_if_missing = false;
189   std::vector<ColumnFamilyDescriptor> column_families;
190   for (const auto& cf_name : GetColumnFamilyNames()) {
191     column_families.push_back(ColumnFamilyDescriptor(cf_name, options));
192   }
193   std::vector<ColumnFamilyHandle*> handles;
194   WriteOptions write_opts;
195   char val_buf[kMaxValueLength] = {0};
196   uint64_t curr_key = 0;
197   while (curr_key < kMaxKey) {
198     Status s;
199     if (nullptr == db) {
200       s = DB::Open(options, kDBPath, column_families, &handles, &db);
201       if (!s.ok()) {
202         fprintf(stderr, "[process %ld] Failed to open DB: %s\n", my_pid,
203                 s.ToString().c_str());
204         assert(false);
205       }
206     }
207     assert(nullptr != db);
208     assert(handles.size() == GetColumnFamilyNames().size());
209     for (auto h : handles) {
210       assert(nullptr != h);
211       for (size_t i = 0; i != kNumKeysPerFlush; ++i) {
212         Slice key = Key(curr_key + static_cast<uint64_t>(i));
213         Slice value = GenerateRandomValue(kMaxValueLength, val_buf);
214         s = db->Put(write_opts, h, key, value);
215         if (!s.ok()) {
216           fprintf(stderr, "[process %ld] Failed to insert\n", my_pid);
217           assert(false);
218         }
219       }
220       s = db->Flush(FlushOptions(), h);
221       if (!s.ok()) {
222         fprintf(stderr, "[process %ld] Failed to flush\n", my_pid);
223         assert(false);
224       }
225     }
226     curr_key += static_cast<uint64_t>(kNumKeysPerFlush);
227     if (ShouldCloseDB()) {
228       for (auto h : handles) {
229         delete h;
230       }
231       handles.clear();
232       delete db;
233       db = nullptr;
234     }
235   }
236   if (nullptr != db) {
237     for (auto h : handles) {
238       delete h;
239     }
240     handles.clear();
241     delete db;
242     db = nullptr;
243   }
244   fprintf(stdout, "[process %ld] Finished adding keys\n", my_pid);
245 }
246 
secondary_instance_sigint_handler(int signal)247 void secondary_instance_sigint_handler(int signal) {
248   ShouldSecondaryWait().store(0, std::memory_order_relaxed);
249   fprintf(stdout, "\n");
250   fflush(stdout);
251 };
252 
RunSecondary()253 void RunSecondary() {
254   ::signal(SIGINT, secondary_instance_sigint_handler);
255   long my_pid = static_cast<long>(getpid());
256   const std::string kSecondaryPath =
257       "/tmp/rocksdb_multi_processes_example_secondary";
258   // Create directory if necessary
259   if (nullptr == opendir(kSecondaryPath.c_str())) {
260     int ret =
261         mkdir(kSecondaryPath.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
262     if (ret < 0) {
263       perror("failed to create directory for secondary instance");
264       exit(0);
265     }
266   }
267   DB* db = nullptr;
268   Options options;
269   options.create_if_missing = false;
270   options.max_open_files = -1;
271   Status s = DB::OpenAsSecondary(options, kDBPath, kSecondaryPath, &db);
272   if (!s.ok()) {
273     fprintf(stderr, "[process %ld] Failed to open in secondary mode: %s\n",
274             my_pid, s.ToString().c_str());
275     assert(false);
276   } else {
277     fprintf(stdout, "[process %ld] Secondary instance starts\n", my_pid);
278   }
279 
280   ReadOptions ropts;
281   ropts.verify_checksums = true;
282   ropts.total_order_seek = true;
283 
284   std::vector<std::thread> test_threads;
285   test_threads.emplace_back([&]() {
286     while (1 == ShouldSecondaryWait().load(std::memory_order_relaxed)) {
287       std::unique_ptr<Iterator> iter(db->NewIterator(ropts));
288       iter->SeekToFirst();
289       size_t count = 0;
290       for (; iter->Valid(); iter->Next()) {
291         ++count;
292       }
293     }
294     fprintf(stdout, "[process %ld] Range_scan thread finished\n", my_pid);
295   });
296 
297   test_threads.emplace_back([&]() {
298     std::srand(time(nullptr));
299     while (1 == ShouldSecondaryWait().load(std::memory_order_relaxed)) {
300       Slice key = Key(std::rand() % kMaxKey);
301       std::string value;
302       db->Get(ropts, key, &value);
303     }
304     fprintf(stdout, "[process %ld] Point lookup thread finished\n");
305   });
306 
307   uint64_t curr_key = 0;
308   while (1 == ShouldSecondaryWait().load(std::memory_order_relaxed)) {
309     s = db->TryCatchUpWithPrimary();
310     if (!s.ok()) {
311       fprintf(stderr,
312               "[process %ld] error while trying to catch up with "
313               "primary %s\n",
314               my_pid, s.ToString().c_str());
315       assert(false);
316     }
317     {
318       std::unique_ptr<Iterator> iter(db->NewIterator(ropts));
319       if (!iter) {
320         fprintf(stderr, "[process %ld] Failed to create iterator\n", my_pid);
321         assert(false);
322       }
323       iter->SeekToLast();
324       if (iter->Valid()) {
325         uint64_t curr_max_key = Key(iter->key().ToString());
326         if (curr_max_key != curr_key) {
327           fprintf(stdout, "[process %ld] Observed key %" PRIu64 "\n", my_pid,
328                   curr_key);
329           curr_key = curr_max_key;
330         }
331       }
332     }
333     std::this_thread::sleep_for(std::chrono::seconds(1));
334   }
335   s = db->TryCatchUpWithPrimary();
336   if (!s.ok()) {
337     fprintf(stderr,
338             "[process %ld] error while trying to catch up with "
339             "primary %s\n",
340             my_pid, s.ToString().c_str());
341     assert(false);
342   }
343 
344   std::vector<ColumnFamilyDescriptor> column_families;
345   for (const auto& cf_name : GetColumnFamilyNames()) {
346     column_families.push_back(ColumnFamilyDescriptor(cf_name, options));
347   }
348   std::vector<ColumnFamilyHandle*> handles;
349   DB* verification_db = nullptr;
350   s = DB::OpenForReadOnly(options, kDBPath, column_families, &handles,
351                           &verification_db);
352   assert(s.ok());
353   Iterator* iter1 = verification_db->NewIterator(ropts);
354   iter1->SeekToFirst();
355 
356   Iterator* iter = db->NewIterator(ropts);
357   iter->SeekToFirst();
358   for (; iter->Valid() && iter1->Valid(); iter->Next(), iter1->Next()) {
359     if (iter->key().ToString() != iter1->key().ToString()) {
360       fprintf(stderr, "%" PRIu64 "!= %" PRIu64 "\n",
361               Key(iter->key().ToString()), Key(iter1->key().ToString()));
362       assert(false);
363     } else if (iter->value().ToString() != iter1->value().ToString()) {
364       fprintf(stderr, "Value mismatch\n");
365       assert(false);
366     }
367   }
368   fprintf(stdout, "[process %ld] Verification succeeded\n", my_pid);
369   for (auto& thr : test_threads) {
370     thr.join();
371   }
372   delete iter;
373   delete iter1;
374   delete db;
375   delete verification_db;
376 }
377 
main(int argc,char ** argv)378 int main(int argc, char** argv) {
379   if (argc < 2) {
380     fprintf(stderr, "%s <0 for primary, 1 for secondary>\n", argv[0]);
381     return 0;
382   }
383   if (atoi(argv[1]) == 0) {
384     RunPrimary();
385   } else {
386     RunSecondary();
387   }
388   return 0;
389 }
390 #else   // OS_LINUX
// Entry point for platforms other than Linux: the example is not implemented
// there, so it only reports that on stderr.
int main() {
  fprintf(stderr, "Not implemented.\n");
  return 0;
}
395 #endif  // !OS_LINUX
396