//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/db_test_util.h"
#include "db/write_thread.h"
#include "port/stack_trace.h"

namespace ROCKSDB_NAMESPACE {

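// The boolean test parameter (GetParam()) selects whether the shared
// WriteBufferManager also charges memtable memory to a block cache
// (cost_cache_ == true) or tracks it on its own (cost_cache_ == false).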
class DBWriteBufferManagerTest : public DBTestBase,
                                 public testing::WithParamInterface<bool> {
 public:
  DBWriteBufferManagerTest()
      : DBTestBase("db_write_buffer_manager_test", /*env_do_fsync=*/false) {}
  bool cost_cache_;
};

TEST_P(DBWriteBufferManagerTest, SharedBufferAcrossCFs1) {
  Options options = CurrentOptions();
  options.arena_block_size = 4096;
  options.write_buffer_size = 500000;  // this is never hit
  std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);
  ASSERT_LT(cache->GetUsage(), 256 * 1024);
  cost_cache_ = GetParam();

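  // WriteBufferManager(buffer_size, cache, allow_stall): a 100KB limit shared
  // by all column families (and, in later tests, all DBs); allow_stall == true
  // makes writes block once the limit is exceeded instead of only scheduling
  // flushes.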
  if (cost_cache_) {
    options.write_buffer_manager.reset(
        new WriteBufferManager(100000, cache, true));
  } else {
    options.write_buffer_manager.reset(
        new WriteBufferManager(100000, nullptr, true));
  }

  WriteOptions wo;
  wo.disableWAL = true;

  CreateAndReopenWithCF({"cf1", "cf2", "cf3"}, options);
  ASSERT_OK(Put(3, Key(1), DummyString(1), wo));
  ASSERT_OK(Flush(3));
  ASSERT_OK(Put(3, Key(1), DummyString(1), wo));
  ASSERT_OK(Put(0, Key(1), DummyString(1), wo));
  ASSERT_OK(Flush(0));

  // Write to "Default", "cf2" and "cf3".
  ASSERT_OK(Put(3, Key(1), DummyString(30000), wo));
  ASSERT_OK(Put(0, Key(1), DummyString(40000), wo));
  ASSERT_OK(Put(2, Key(1), DummyString(1), wo));

  ASSERT_OK(Put(3, Key(2), DummyString(40000), wo));
  // WriteBufferManager::buffer_size_ has been exceeded after the previous
  // write completed.

  // This makes sure the write goes through and, if a stall was in effect, it
  // ends.
  ASSERT_OK(Put(0, Key(2), DummyString(1), wo));
}

// Test that multiple writer threads on a single DB get blocked when the
// WriteBufferManager exceeds buffer_size_ and a flush is waiting to be
// finished.
TEST_P(DBWriteBufferManagerTest, SharedWriteBufferAcrossCFs2) {
  Options options = CurrentOptions();
  options.arena_block_size = 4096;
  options.write_buffer_size = 500000;  // this is never hit
  std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);
  ASSERT_LT(cache->GetUsage(), 256 * 1024);
  cost_cache_ = GetParam();

  if (cost_cache_) {
    options.write_buffer_manager.reset(
        new WriteBufferManager(100000, cache, true));
  } else {
    options.write_buffer_manager.reset(
        new WriteBufferManager(100000, nullptr, true));
  }
  WriteOptions wo;
  wo.disableWAL = true;

  CreateAndReopenWithCF({"cf1", "cf2", "cf3"}, options);
  ASSERT_OK(Put(3, Key(1), DummyString(1), wo));
  ASSERT_OK(Flush(3));
  ASSERT_OK(Put(3, Key(1), DummyString(1), wo));
  ASSERT_OK(Put(0, Key(1), DummyString(1), wo));
  ASSERT_OK(Flush(0));

  // Write to "Default", "cf2" and "cf3". No flush will be triggered.
  ASSERT_OK(Put(3, Key(1), DummyString(30000), wo));
  ASSERT_OK(Put(0, Key(1), DummyString(40000), wo));
  ASSERT_OK(Put(2, Key(1), DummyString(1), wo));

  ASSERT_OK(Put(3, Key(2), DummyString(40000), wo));
  // WriteBufferManager::buffer_size_ has been exceeded after the previous
  // write completed.

  std::unordered_set<WriteThread::Writer*> w_set;
  std::vector<port::Thread> threads;
  int wait_count_db = 0;
  int num_writers = 4;
  InstrumentedMutex mutex;
  InstrumentedCondVar cv(&mutex);
  std::atomic<int> thread_num(0);

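  // LoadDependency({{pred, succ}}) makes the succ sync point wait until pred
  // has been reached: here the background flush
  // ("DBImpl::BackgroundCallFlush:start") is held back until the test fires
  // "DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0", keeping the
  // stall in effect while the writer threads pile up.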
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0",
        "DBImpl::BackgroundCallFlush:start"}});

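  // "WBMStallInterface::BlockDB" fires when a DB's write thread starts
  // blocking on the write buffer manager stall; "WriteThread::WriteStall::Wait"
  // fires for each additional writer queued up behind the stall.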
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "WBMStallInterface::BlockDB", [&](void*) {
        InstrumentedMutexLock lock(&mutex);
        wait_count_db++;
        cv.SignalAll();
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "WriteThread::WriteStall::Wait", [&](void* arg) {
        InstrumentedMutexLock lock(&mutex);
        WriteThread::Writer* w = reinterpret_cast<WriteThread::Writer*>(arg);
        w_set.insert(w);
        // Allow the flush to continue if all writer threads are blocked.
        if (w_set.size() == (unsigned long)num_writers) {
          TEST_SYNC_POINT(
              "DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0");
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  bool s = true;

  std::function<void(int)> writer = [&](int cf) {
    int a = thread_num.fetch_add(1);
    std::string key = "foo" + std::to_string(a);
    Status tmp = Put(cf, Slice(key), DummyString(1), wo);
    InstrumentedMutexLock lock(&mutex);
    s = s && tmp.ok();
  };

  // Flow:
  // main_writer thread will write but will be blocked (as Flush is on hold
  // and buffer_size_ has been exceeded, a stall is in effect).
  //  |
  //  |
  //  multiple writer threads will be created to write across multiple column
  //  families and they will be blocked.
  //  |
  //  |
  //  The last writer thread will write and, once it is blocked, it will
  //  signal Flush to continue and clear the stall.

  threads.emplace_back(writer, 1);
  // Wait until the first thread (main_writer) writing to the DB is blocked
  // and then create the multiple writers, which will be blocked from getting
  // added to the queue because the stall is in effect.
  {
    InstrumentedMutexLock lock(&mutex);
    while (wait_count_db != 1) {
      cv.Wait();
    }
  }
  for (int i = 0; i < num_writers; i++) {
    threads.emplace_back(writer, i % 4);
  }
  for (auto& t : threads) {
    t.join();
  }

  ASSERT_TRUE(s);

  // Number of DBs blocked.
  ASSERT_EQ(wait_count_db, 1);
  // Number of Writer threads blocked.
  ASSERT_EQ(w_set.size(), num_writers);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

// Test that multiple DBs get blocked when the WriteBufferManager limit is
// exceeded and a flush is waiting to be finished while the DBs try to write
// in the meantime.
TEST_P(DBWriteBufferManagerTest, SharedWriteBufferLimitAcrossDB) {
  std::vector<std::string> dbnames;
  std::vector<DB*> dbs;
  int num_dbs = 3;

  for (int i = 0; i < num_dbs; i++) {
    dbs.push_back(nullptr);
    dbnames.push_back(
        test::PerThreadDBPath("db_shared_wb_db" + std::to_string(i)));
  }

  Options options = CurrentOptions();
  options.arena_block_size = 4096;
  options.write_buffer_size = 500000;  // this is never hit
  std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);
  ASSERT_LT(cache->GetUsage(), 256 * 1024);
  cost_cache_ = GetParam();

  if (cost_cache_) {
    options.write_buffer_manager.reset(
        new WriteBufferManager(100000, cache, true));
  } else {
    options.write_buffer_manager.reset(
        new WriteBufferManager(100000, nullptr, true));
  }
  CreateAndReopenWithCF({"cf1", "cf2"}, options);

  for (int i = 0; i < num_dbs; i++) {
    ASSERT_OK(DestroyDB(dbnames[i], options));
    ASSERT_OK(DB::Open(options, dbnames[i], &(dbs[i])));
  }
  WriteOptions wo;
  wo.disableWAL = true;

  for (int i = 0; i < num_dbs; i++) {
    ASSERT_OK(dbs[i]->Put(wo, Key(1), DummyString(20000)));
  }
  // Insert to db_.
  ASSERT_OK(Put(0, Key(1), DummyString(30000), wo));

  // The WriteBufferManager limit has been exceeded.
  std::vector<port::Thread> threads;
  int wait_count_db = 0;
  InstrumentedMutex mutex;
  InstrumentedCondVar cv(&mutex);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0",
        "DBImpl::BackgroundCallFlush:start"}});

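  // Each of the num_dbs DBs plus db_ blocks on the WBM stall; the callback
  // counts the blocked DBs and releases the flush once all num_dbs + 1 of
  // them have blocked.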
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "WBMStallInterface::BlockDB", [&](void*) {
        {
          InstrumentedMutexLock lock(&mutex);
          wait_count_db++;
          cv.Signal();
          // Once the last DB has blocked, signal Flush to continue.
          if (wait_count_db == num_dbs + 1) {
            TEST_SYNC_POINT(
                "DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0");
          }
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  bool s = true;

  // Write to DB.
  std::function<void(DB*)> write_db = [&](DB* db) {
    Status tmp = db->Put(wo, Key(3), DummyString(1));
    InstrumentedMutexLock lock(&mutex);
    s = s && tmp.ok();
  };

  // Flow:
  // db_ will write and will be blocked (as Flush is on hold, a stall is in
  // effect).
  //  |
  //  multiple writers will be created to write to the other DBs and they will
  //  be blocked.
  //  |
  //  |
  //  The last writer will write and, once it is blocked, it will signal Flush
  //  to continue and clear the stall.

  threads.emplace_back(write_db, db_);
  // Wait until the first DB is blocked and then create the multiple writers
  // for different DBs, which will be blocked from getting added to the queue
  // because the stall is in effect.
  {
    InstrumentedMutexLock lock(&mutex);
    while (wait_count_db != 1) {
      cv.Wait();
    }
  }
  for (int i = 0; i < num_dbs; i++) {
    threads.emplace_back(write_db, dbs[i]);
  }
  for (auto& t : threads) {
    t.join();
  }

  ASSERT_TRUE(s);
  ASSERT_EQ(num_dbs + 1, wait_count_db);
  // Clean up DBs.
  for (int i = 0; i < num_dbs; i++) {
    ASSERT_OK(dbs[i]->Close());
    ASSERT_OK(DestroyDB(dbnames[i], options));
    delete dbs[i];
  }

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

// Test that multiple threads writing across multiple DBs and multiple column
// families get blocked when a stall by the WriteBufferManager is in effect.
TEST_P(DBWriteBufferManagerTest, SharedWriteBufferLimitAcrossDB1) {
  std::vector<std::string> dbnames;
  std::vector<DB*> dbs;
  int num_dbs = 3;

  for (int i = 0; i < num_dbs; i++) {
    dbs.push_back(nullptr);
    dbnames.push_back(
        test::PerThreadDBPath("db_shared_wb_db" + std::to_string(i)));
  }

  Options options = CurrentOptions();
  options.arena_block_size = 4096;
  options.write_buffer_size = 500000;  // this is never hit
  std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);
  ASSERT_LT(cache->GetUsage(), 256 * 1024);
  cost_cache_ = GetParam();

  if (cost_cache_) {
    options.write_buffer_manager.reset(
        new WriteBufferManager(100000, cache, true));
  } else {
    options.write_buffer_manager.reset(
        new WriteBufferManager(100000, nullptr, true));
  }
  CreateAndReopenWithCF({"cf1", "cf2"}, options);

  for (int i = 0; i < num_dbs; i++) {
    ASSERT_OK(DestroyDB(dbnames[i], options));
    ASSERT_OK(DB::Open(options, dbnames[i], &(dbs[i])));
  }
  WriteOptions wo;
  wo.disableWAL = true;

  for (int i = 0; i < num_dbs; i++) {
    ASSERT_OK(dbs[i]->Put(wo, Key(1), DummyString(20000)));
  }
  // Insert to db_.
  ASSERT_OK(Put(0, Key(1), DummyString(30000), wo));

  // WriteBufferManager::buffer_size_ has been exceeded after the previous
  // write to db_ completed.
  std::vector<port::Thread> threads;
  int wait_count_db = 0;
  InstrumentedMutex mutex;
  InstrumentedCondVar cv(&mutex);
  std::unordered_set<WriteThread::Writer*> w_set;
  std::vector<port::Thread> writer_threads;
  std::atomic<int> thread_num(0);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0",
        "DBImpl::BackgroundCallFlush:start"}});

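  // 2 * num_dbs + 1 writer threads are spawned in total: one initial writer
  // to db_, num_dbs writers across db_'s column families, and num_dbs writers
  // to the other DBs. thread_num counts every blocked writer, whether it
  // blocked at the DB level or in the write queue, and the flush is released
  // once all of them are accounted for.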
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "WBMStallInterface::BlockDB", [&](void*) {
        {
          InstrumentedMutexLock lock(&mutex);
          wait_count_db++;
          thread_num.fetch_add(1);
          cv.Signal();
          // Allow the flush to continue if all writer threads are blocked.
          if (thread_num.load(std::memory_order_relaxed) == 2 * num_dbs + 1) {
            TEST_SYNC_POINT(
                "DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0");
          }
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "WriteThread::WriteStall::Wait", [&](void* arg) {
        WriteThread::Writer* w = reinterpret_cast<WriteThread::Writer*>(arg);
        {
          InstrumentedMutexLock lock(&mutex);
          w_set.insert(w);
          thread_num.fetch_add(1);
          // Allow the flush to continue if all writer threads are blocked.
          if (thread_num.load(std::memory_order_relaxed) == 2 * num_dbs + 1) {
            TEST_SYNC_POINT(
                "DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0");
          }
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  bool s1 = true, s2 = true;
  // Write to multiple column families of db_.
  std::function<void(int)> write_cf = [&](int cf) {
    Status tmp = Put(cf, Key(3), DummyString(1), wo);
    InstrumentedMutexLock lock(&mutex);
    s1 = s1 && tmp.ok();
  };
  // Write to multiple DBs.
  std::function<void(DB*)> write_db = [&](DB* db) {
    Status tmp = db->Put(wo, Key(3), DummyString(1));
    InstrumentedMutexLock lock(&mutex);
    s2 = s2 && tmp.ok();
  };

  // Flow:
  // the thread writing to db_ will be blocked (as Flush is on hold and
  // buffer_size_ has been exceeded, a stall is in effect).
  //  |
  //  |
  //  multiple writer threads writing to different DBs and to db_ across
  //  multiple column families will be created and they will be blocked due to
  //  the stall.
  //  |
  //  |
  //  The last writer thread will write and, once it is blocked, it will
  //  signal Flush to continue and clear the stall.
  threads.emplace_back(write_db, db_);
  // Wait until the first thread is blocked and then create the multiple
  // writer threads.
  {
    InstrumentedMutexLock lock(&mutex);
    while (wait_count_db != 1) {
      cv.Wait();
    }
  }

  for (int i = 0; i < num_dbs; i++) {
    // Write to multiple column families of db_.
    writer_threads.emplace_back(write_cf, i % 3);
    // Write to different DBs.
    threads.emplace_back(write_db, dbs[i]);
  }
  for (auto& t : threads) {
    t.join();
  }
  for (auto& t : writer_threads) {
    t.join();
  }

  ASSERT_TRUE(s1);
  ASSERT_TRUE(s2);

  // Number of DBs blocked.
  ASSERT_EQ(num_dbs + 1, wait_count_db);
  // Number of Writer threads blocked.
  ASSERT_EQ(w_set.size(), num_dbs);
  // Clean up DBs.
  for (int i = 0; i < num_dbs; i++) {
    ASSERT_OK(dbs[i]->Close());
    ASSERT_OK(DestroyDB(dbnames[i], options));
    delete dbs[i];
  }

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

// Test multiple threads writing across multiple column families of db_ while
// passing different values for WriteOptions.no_slowdown.
TEST_P(DBWriteBufferManagerTest, MixedSlowDownOptionsSingleDB) {
  Options options = CurrentOptions();
  options.arena_block_size = 4096;
  options.write_buffer_size = 500000;  // this is never hit
  std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);
  ASSERT_LT(cache->GetUsage(), 256 * 1024);
  cost_cache_ = GetParam();

  if (cost_cache_) {
    options.write_buffer_manager.reset(
        new WriteBufferManager(100000, cache, true));
  } else {
    options.write_buffer_manager.reset(
        new WriteBufferManager(100000, nullptr, true));
  }
  WriteOptions wo;
  wo.disableWAL = true;

  CreateAndReopenWithCF({"cf1", "cf2", "cf3"}, options);

  ASSERT_OK(Put(3, Key(1), DummyString(1), wo));
  ASSERT_OK(Flush(3));
  ASSERT_OK(Put(3, Key(1), DummyString(1), wo));
  ASSERT_OK(Put(0, Key(1), DummyString(1), wo));
  ASSERT_OK(Flush(0));

  // Write to "Default", "cf2" and "cf3". No flush will be triggered.
  ASSERT_OK(Put(3, Key(1), DummyString(30000), wo));
  ASSERT_OK(Put(0, Key(1), DummyString(40000), wo));
  ASSERT_OK(Put(2, Key(1), DummyString(1), wo));
  ASSERT_OK(Put(3, Key(2), DummyString(40000), wo));

  // WriteBufferManager::buffer_size_ has been exceeded after the previous
  // write to db_ completed.

  std::unordered_set<WriteThread::Writer*> w_slowdown_set;
  std::vector<port::Thread> threads;
  int wait_count_db = 0;
  int num_writers = 4;
  InstrumentedMutex mutex;
  InstrumentedCondVar cv(&mutex);
  std::atomic<int> thread_num(0);
  std::atomic<int> w_no_slowdown(0);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0",
        "DBImpl::BackgroundCallFlush:start"}});

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "WBMStallInterface::BlockDB", [&](void*) {
        {
          InstrumentedMutexLock lock(&mutex);
          wait_count_db++;
          cv.SignalAll();
        }
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "WriteThread::WriteStall::Wait", [&](void* arg) {
        {
          InstrumentedMutexLock lock(&mutex);
          WriteThread::Writer* w = reinterpret_cast<WriteThread::Writer*>(arg);
          w_slowdown_set.insert(w);
          // Allow the flush to continue if all writer threads are blocked.
          if (w_slowdown_set.size() + (unsigned long)w_no_slowdown.load(
                                          std::memory_order_relaxed) ==
              (unsigned long)num_writers) {
            TEST_SYNC_POINT(
                "DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0");
          }
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  bool s1 = true, s2 = true;

  std::function<void(int)> write_slow_down = [&](int cf) {
    int a = thread_num.fetch_add(1);
    std::string key = "foo" + std::to_string(a);
    WriteOptions write_op;
    write_op.no_slowdown = false;
    Status tmp = Put(cf, Slice(key), DummyString(1), write_op);
    InstrumentedMutexLock lock(&mutex);
    s1 = s1 && tmp.ok();
  };

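  // With no_slowdown == true, a write that would have to wait on the stall
  // fails immediately with an Incomplete status instead of blocking, so these
  // writers expect !tmp.ok() and are counted separately.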
  std::function<void(int)> write_no_slow_down = [&](int cf) {
    int a = thread_num.fetch_add(1);
    std::string key = "foo" + std::to_string(a);
    WriteOptions write_op;
    write_op.no_slowdown = true;
    Status tmp = Put(cf, Slice(key), DummyString(1), write_op);
    {
      InstrumentedMutexLock lock(&mutex);
      s2 = s2 && !tmp.ok();
      w_no_slowdown.fetch_add(1);
      // Allow the flush to continue if all writer threads are blocked.
      if (w_slowdown_set.size() +
              (unsigned long)w_no_slowdown.load(std::memory_order_relaxed) ==
          (unsigned long)num_writers) {
        TEST_SYNC_POINT(
            "DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0");
      }
    }
  };

  // Flow:
  // main_writer thread will write but will be blocked (as Flush is on hold
  // and buffer_size_ has been exceeded, a stall is in effect).
  //  |
  //  |
  //  multiple writer threads will be created to write across multiple column
  //  families with different values of WriteOptions.no_slowdown. Some of them
  //  will be blocked and some of them will return with an Incomplete status.
  //  |
  //  |
  //  The last writer thread will write and, once it is blocked or returns, it
  //  will signal Flush to continue and clear the stall.
  threads.emplace_back(write_slow_down, 1);
  // Wait until the first thread (main_writer) writing to the DB is blocked
  // and then create the multiple writers, which will be blocked from getting
  // added to the queue because the stall is in effect.
  {
    InstrumentedMutexLock lock(&mutex);
    while (wait_count_db != 1) {
      cv.Wait();
    }
  }

  for (int i = 0; i < num_writers; i += 2) {
    threads.emplace_back(write_no_slow_down, (i) % 4);
    threads.emplace_back(write_slow_down, (i + 1) % 4);
  }
  for (auto& t : threads) {
    t.join();
  }

  ASSERT_TRUE(s1);
  ASSERT_TRUE(s2);
  // Number of DBs blocked.
  ASSERT_EQ(wait_count_db, 1);
  // Number of Writer threads blocked.
  ASSERT_EQ(w_slowdown_set.size(), num_writers / 2);
  // Number of Writer threads with WriteOptions.no_slowdown = true.
  ASSERT_EQ(w_no_slowdown.load(std::memory_order_relaxed), num_writers / 2);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

// Test multiple threads writing across multiple column families of db_ and
// different DBs while passing different values for WriteOptions.no_slowdown.
TEST_P(DBWriteBufferManagerTest, MixedSlowDownOptionsMultipleDB) {
  std::vector<std::string> dbnames;
  std::vector<DB*> dbs;
  int num_dbs = 4;

  for (int i = 0; i < num_dbs; i++) {
    dbs.push_back(nullptr);
    dbnames.push_back(
        test::PerThreadDBPath("db_shared_wb_db" + std::to_string(i)));
  }

  Options options = CurrentOptions();
  options.arena_block_size = 4096;
  options.write_buffer_size = 500000;  // this is never hit
  std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);
  ASSERT_LT(cache->GetUsage(), 256 * 1024);
  cost_cache_ = GetParam();

  if (cost_cache_) {
    options.write_buffer_manager.reset(
        new WriteBufferManager(100000, cache, true));
  } else {
    options.write_buffer_manager.reset(
        new WriteBufferManager(100000, nullptr, true));
  }
  CreateAndReopenWithCF({"cf1", "cf2"}, options);

  for (int i = 0; i < num_dbs; i++) {
    ASSERT_OK(DestroyDB(dbnames[i], options));
    ASSERT_OK(DB::Open(options, dbnames[i], &(dbs[i])));
  }
  WriteOptions wo;
  wo.disableWAL = true;

  for (int i = 0; i < num_dbs; i++) {
    ASSERT_OK(dbs[i]->Put(wo, Key(1), DummyString(20000)));
  }
  // Insert to db_.
  ASSERT_OK(Put(0, Key(1), DummyString(30000), wo));

  // WriteBufferManager::buffer_size_ has been exceeded after the previous
  // write to db_ completed.
  std::vector<port::Thread> threads;
  int wait_count_db = 0;
  InstrumentedMutex mutex;
  InstrumentedCondVar cv(&mutex);
  std::unordered_set<WriteThread::Writer*> w_slowdown_set;
  std::vector<port::Thread> writer_threads;
  std::atomic<int> thread_num(0);
  std::atomic<int> w_no_slowdown(0);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0",
        "DBImpl::BackgroundCallFlush:start"}});

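  // 2 * num_dbs + 1 writers are spawned in total. The release condition sums
  // the DBs blocked on the stall (wait_count_db), the writers blocked in
  // db_'s write queue (w_slowdown_set), and the no_slowdown writers that
  // failed fast (w_no_slowdown); the flush proceeds once everyone is
  // accounted for.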
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "WBMStallInterface::BlockDB", [&](void*) {
        InstrumentedMutexLock lock(&mutex);
        wait_count_db++;
        cv.Signal();
        // Allow the flush to continue if all writer threads are blocked.
        if (w_slowdown_set.size() +
                (unsigned long)(w_no_slowdown.load(std::memory_order_relaxed) +
                                wait_count_db) ==
            (unsigned long)(2 * num_dbs + 1)) {
          TEST_SYNC_POINT(
              "DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0");
        }
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "WriteThread::WriteStall::Wait", [&](void* arg) {
        WriteThread::Writer* w = reinterpret_cast<WriteThread::Writer*>(arg);
        InstrumentedMutexLock lock(&mutex);
        w_slowdown_set.insert(w);
        // Allow the flush to continue if all writer threads are blocked.
        if (w_slowdown_set.size() +
                (unsigned long)(w_no_slowdown.load(std::memory_order_relaxed) +
                                wait_count_db) ==
            (unsigned long)(2 * num_dbs + 1)) {
          TEST_SYNC_POINT(
              "DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0");
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  bool s1 = true, s2 = true;
  std::function<void(DB*)> write_slow_down = [&](DB* db) {
    int a = thread_num.fetch_add(1);
    std::string key = "foo" + std::to_string(a);
    WriteOptions write_op;
    write_op.no_slowdown = false;
    Status tmp = db->Put(write_op, Slice(key), DummyString(1));
    InstrumentedMutexLock lock(&mutex);
    s1 = s1 && tmp.ok();
  };

  std::function<void(DB*)> write_no_slow_down = [&](DB* db) {
    int a = thread_num.fetch_add(1);
    std::string key = "foo" + std::to_string(a);
    WriteOptions write_op;
    write_op.no_slowdown = true;
    Status tmp = db->Put(write_op, Slice(key), DummyString(1));
    {
      InstrumentedMutexLock lock(&mutex);
      s2 = s2 && !tmp.ok();
      w_no_slowdown.fetch_add(1);
      if (w_slowdown_set.size() +
              (unsigned long)(w_no_slowdown.load(std::memory_order_relaxed) +
                              wait_count_db) ==
          (unsigned long)(2 * num_dbs + 1)) {
        TEST_SYNC_POINT(
            "DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0");
      }
    }
  };

  // Flow:
  // the first thread will write but will be blocked (as Flush is on hold and
  // buffer_size_ has been exceeded, a stall is in effect).
  //  |
  //  |
  //  multiple writer threads will be created to write across multiple column
  //  families of db_ and different DBs with different values of
  //  WriteOptions.no_slowdown. Some of them will be blocked and some of them
  //  will return with an Incomplete status.
  //  |
  //  |
  //  The last writer thread will write and, once it is blocked or returns, it
  //  will signal Flush to continue and clear the stall.
  threads.emplace_back(write_slow_down, db_);
  // Wait until the first thread writing to the DB is blocked and then create
  // the multiple writers.
  {
    InstrumentedMutexLock lock(&mutex);
    while (wait_count_db != 1) {
      cv.Wait();
    }
  }

  for (int i = 0; i < num_dbs; i += 2) {
    // Write to multiple column families of db_.
    writer_threads.emplace_back(write_slow_down, db_);
    writer_threads.emplace_back(write_no_slow_down, db_);
    // Write to different DBs.
    threads.emplace_back(write_slow_down, dbs[i]);
    threads.emplace_back(write_no_slow_down, dbs[i + 1]);
  }

  for (auto& t : threads) {
    t.join();
  }

  for (auto& t : writer_threads) {
    t.join();
  }

  ASSERT_TRUE(s1);
  ASSERT_TRUE(s2);
  // Number of DBs blocked.
  ASSERT_EQ((num_dbs / 2) + 1, wait_count_db);
  // Number of writer threads writing to db_ that were blocked from getting
  // added to the queue.
  ASSERT_EQ(w_slowdown_set.size(), num_dbs / 2);
  // Number of threads with WriteOptions.no_slowdown = true.
  ASSERT_EQ(w_no_slowdown.load(std::memory_order_relaxed), num_dbs);

  // Clean up DBs.
  for (int i = 0; i < num_dbs; i++) {
    ASSERT_OK(dbs[i]->Close());
    ASSERT_OK(DestroyDB(dbnames[i], options));
    delete dbs[i];
  }

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

INSTANTIATE_TEST_CASE_P(DBWriteBufferManagerTest, DBWriteBufferManagerTest,
                        testing::Bool());

}  // namespace ROCKSDB_NAMESPACE

#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
extern "C" {
void RegisterCustomObjects(int argc, char** argv);
}
#else
void RegisterCustomObjects(int /*argc*/, char** /*argv*/) {}
#endif  // !ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS

int main(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  RegisterCustomObjects(argc, argv);
  return RUN_ALL_TESTS();
}