// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/db_test_util.h"
#include "db/write_thread.h"
#include "port/stack_trace.h"

namespace ROCKSDB_NAMESPACE {

class DBWriteBufferManagerTest : public DBTestBase,
                                 public testing::WithParamInterface<bool> {
 public:
  DBWriteBufferManagerTest()
      : DBTestBase("db_write_buffer_manager_test", /*env_do_fsync=*/false) {}
  bool cost_cache_;
};

TEST_P(DBWriteBufferManagerTest, SharedBufferAcrossCFs1) {
  Options options = CurrentOptions();
  options.arena_block_size = 4096;
  options.write_buffer_size = 500000;  // this is never hit
  std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);
  ASSERT_LT(cache->GetUsage(), 256 * 1024);
  cost_cache_ = GetParam();

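  // The test parameter toggles charging memtable memory to the block cache.
  // In both branches below the third WriteBufferManager constructor argument
  // (allow_stall) is true, so writes stall once the 100000-byte buffer_size_
  // is exceeded.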
  if (cost_cache_) {
    options.write_buffer_manager.reset(
        new WriteBufferManager(100000, cache, true));
  } else {
    options.write_buffer_manager.reset(
        new WriteBufferManager(100000, nullptr, true));
  }

  WriteOptions wo;
  wo.disableWAL = true;

  CreateAndReopenWithCF({"cf1", "cf2", "cf3"}, options);
  ASSERT_OK(Put(3, Key(1), DummyString(1), wo));
  ASSERT_OK(Flush(3));
  ASSERT_OK(Put(3, Key(1), DummyString(1), wo));
  ASSERT_OK(Put(0, Key(1), DummyString(1), wo));
  ASSERT_OK(Flush(0));

  // Write to "Default", "cf2" and "cf3".
  ASSERT_OK(Put(3, Key(1), DummyString(30000), wo));
  ASSERT_OK(Put(0, Key(1), DummyString(40000), wo));
  ASSERT_OK(Put(2, Key(1), DummyString(1), wo));

  ASSERT_OK(Put(3, Key(2), DummyString(40000), wo));
  // WriteBufferManager::buffer_size_ has been exceeded once the previous
  // write is completed.

  // This makes sure the write goes through and, if a stall was in effect,
  // it ends.
  ASSERT_OK(Put(0, Key(2), DummyString(1), wo));
}

// Test that a single DB's multiple writer threads get blocked when the
// WriteBufferManager exceeds buffer_size_ and the flush is waiting to
// finish.
TEST_P(DBWriteBufferManagerTest, SharedWriteBufferAcrossCFs2) {
  Options options = CurrentOptions();
  options.arena_block_size = 4096;
  options.write_buffer_size = 500000;  // this is never hit
  std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);
  ASSERT_LT(cache->GetUsage(), 256 * 1024);
  cost_cache_ = GetParam();

  if (cost_cache_) {
    options.write_buffer_manager.reset(
        new WriteBufferManager(100000, cache, true));
  } else {
    options.write_buffer_manager.reset(
        new WriteBufferManager(100000, nullptr, true));
  }
  WriteOptions wo;
  wo.disableWAL = true;

  CreateAndReopenWithCF({"cf1", "cf2", "cf3"}, options);
  ASSERT_OK(Put(3, Key(1), DummyString(1), wo));
  ASSERT_OK(Flush(3));
  ASSERT_OK(Put(3, Key(1), DummyString(1), wo));
  ASSERT_OK(Put(0, Key(1), DummyString(1), wo));
  ASSERT_OK(Flush(0));

  // Write to "Default", "cf2" and "cf3". No flush will be triggered.
  ASSERT_OK(Put(3, Key(1), DummyString(30000), wo));
  ASSERT_OK(Put(0, Key(1), DummyString(40000), wo));
  ASSERT_OK(Put(2, Key(1), DummyString(1), wo));

  ASSERT_OK(Put(3, Key(2), DummyString(40000), wo));
  // WriteBufferManager::buffer_size_ has been exceeded once the previous
  // write is completed.

  std::unordered_set<WriteThread::Writer*> w_set;
  std::vector<port::Thread> threads;
  int wait_count_db = 0;
  int num_writers = 4;
  InstrumentedMutex mutex;
  InstrumentedCondVar cv(&mutex);
  std::atomic<int> thread_num(0);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0",
        "DBImpl::BackgroundCallFlush:start"}});
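  // The dependency above holds the background flush at
  // DBImpl::BackgroundCallFlush:start until the test fires
  // DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0, so the stall
  // stays in effect while the writer threads pile up.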

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "WBMStallInterface::BlockDB", [&](void*) {
        InstrumentedMutexLock lock(&mutex);
        wait_count_db++;
        cv.SignalAll();
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "WriteThread::WriteStall::Wait", [&](void* arg) {
        InstrumentedMutexLock lock(&mutex);
        WriteThread::Writer* w = reinterpret_cast<WriteThread::Writer*>(arg);
        w_set.insert(w);
        // Allow the flush to continue if all writer threads are blocked.
        if (w_set.size() == (unsigned long)num_writers) {
          TEST_SYNC_POINT(
              "DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0");
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  bool s = true;

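  // Each writer thread puts one distinct key ("foo" + a unique counter
  // value); the shared status flag s is only updated under the mutex.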
  std::function<void(int)> writer = [&](int cf) {
    int a = thread_num.fetch_add(1);
    std::string key = "foo" + std::to_string(a);
    Status tmp = Put(cf, Slice(key), DummyString(1), wo);
    InstrumentedMutexLock lock(&mutex);
    s = s && tmp.ok();
  };

  // Flow:
  // The main_writer thread writes and gets blocked (the flush is on hold
  // and buffer_size_ has been exceeded, so a stall takes effect).
  //      |
  //      |
  // Multiple writer threads are created to write across multiple column
  // families, and they get blocked.
  //      |
  //      |
  // The last writer thread writes and, once blocked, signals the flush to
  // continue, which clears the stall.

  threads.emplace_back(writer, 1);
  // Wait until the first thread (main_writer) writing to the DB is blocked,
  // then create the remaining writers, which will be blocked from getting
  // added to the queue because the stall is in effect.
  {
    InstrumentedMutexLock lock(&mutex);
    while (wait_count_db != 1) {
      cv.Wait();
    }
  }
  for (int i = 0; i < num_writers; i++) {
    threads.emplace_back(writer, i % 4);
  }
  for (auto& t : threads) {
    t.join();
  }

  ASSERT_TRUE(s);

  // Number of DBs blocked.
  ASSERT_EQ(wait_count_db, 1);
  // Number of Writer threads blocked.
  ASSERT_EQ(w_set.size(), num_writers);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

// Test that multiple DBs get blocked when the WriteBufferManager limit is
// exceeded and the flush is waiting to finish while the DBs keep trying to
// write.
TEST_P(DBWriteBufferManagerTest, SharedWriteBufferLimitAcrossDB) {
  std::vector<std::string> dbnames;
  std::vector<DB*> dbs;
  int num_dbs = 3;

  for (int i = 0; i < num_dbs; i++) {
    dbs.push_back(nullptr);
    dbnames.push_back(
        test::PerThreadDBPath("db_shared_wb_db" + std::to_string(i)));
  }

  Options options = CurrentOptions();
  options.arena_block_size = 4096;
  options.write_buffer_size = 500000;  // this is never hit
  std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);
  ASSERT_LT(cache->GetUsage(), 256 * 1024);
  cost_cache_ = GetParam();

  if (cost_cache_) {
    options.write_buffer_manager.reset(
        new WriteBufferManager(100000, cache, true));
  } else {
    options.write_buffer_manager.reset(
        new WriteBufferManager(100000, nullptr, true));
  }
  CreateAndReopenWithCF({"cf1", "cf2"}, options);

  for (int i = 0; i < num_dbs; i++) {
    ASSERT_OK(DestroyDB(dbnames[i], options));
    ASSERT_OK(DB::Open(options, dbnames[i], &(dbs[i])));
  }
  WriteOptions wo;
  wo.disableWAL = true;

  for (int i = 0; i < num_dbs; i++) {
    ASSERT_OK(dbs[i]->Put(wo, Key(1), DummyString(20000)));
  }
  // Insert to db_.
  ASSERT_OK(Put(0, Key(1), DummyString(30000), wo));

  // WriteBufferManager limit exceeded: 3 * 20000 bytes across the extra DBs
  // plus 30000 bytes in db_, with arena-block overhead on top, pushes usage
  // past the 100000-byte budget.
  std::vector<port::Thread> threads;
  int wait_count_db = 0;
  InstrumentedMutex mutex;
  InstrumentedCondVar cv(&mutex);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0",
        "DBImpl::BackgroundCallFlush:start"}});
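  // Note: this test reuses the "SharedWriteBufferAcrossCFs:0" sync point
  // label from the previous test; the label is just a string and only needs
  // to match the TEST_SYNC_POINT call below.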

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "WBMStallInterface::BlockDB", [&](void*) {
        {
          InstrumentedMutexLock lock(&mutex);
          wait_count_db++;
          cv.Signal();
          // Once the last DB is blocked, signal the flush to continue.
          if (wait_count_db == num_dbs + 1) {
            TEST_SYNC_POINT(
                "DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0");
          }
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  bool s = true;

  // Write to DB.
  std::function<void(DB*)> write_db = [&](DB* db) {
    Status tmp = db->Put(wo, Key(3), DummyString(1));
    InstrumentedMutexLock lock(&mutex);
    s = s && tmp.ok();
  };

  // Flow:
  // db_ writes and gets blocked (the flush is on hold, which creates a
  // stall in effect).
  //      |
  // Writers for the other DBs are created and they get blocked as well.
  //      |
  //      |
  // The last writer writes and, once blocked, signals the flush to
  // continue, which clears the stall.

  threads.emplace_back(write_db, db_);
  // Wait until the first DB is blocked, then create the writers for the
  // other DBs, which will be blocked from getting added to the queue
  // because the stall is in effect.
  {
    InstrumentedMutexLock lock(&mutex);
    while (wait_count_db != 1) {
      cv.Wait();
    }
  }
  for (int i = 0; i < num_dbs; i++) {
    threads.emplace_back(write_db, dbs[i]);
  }
  for (auto& t : threads) {
    t.join();
  }

  ASSERT_TRUE(s);
  ASSERT_EQ(num_dbs + 1, wait_count_db);
  // Clean up DBs.
  for (int i = 0; i < num_dbs; i++) {
    ASSERT_OK(dbs[i]->Close());
    ASSERT_OK(DestroyDB(dbnames[i], options));
    delete dbs[i];
  }

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

// Test that multiple threads writing across multiple DBs and multiple column
// families get blocked when a stall by the WriteBufferManager is in effect.
TEST_P(DBWriteBufferManagerTest, SharedWriteBufferLimitAcrossDB1) {
  std::vector<std::string> dbnames;
  std::vector<DB*> dbs;
  int num_dbs = 3;

  for (int i = 0; i < num_dbs; i++) {
    dbs.push_back(nullptr);
    dbnames.push_back(
        test::PerThreadDBPath("db_shared_wb_db" + std::to_string(i)));
  }

  Options options = CurrentOptions();
  options.arena_block_size = 4096;
  options.write_buffer_size = 500000;  // this is never hit
  std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);
  ASSERT_LT(cache->GetUsage(), 256 * 1024);
  cost_cache_ = GetParam();

  if (cost_cache_) {
    options.write_buffer_manager.reset(
        new WriteBufferManager(100000, cache, true));
  } else {
    options.write_buffer_manager.reset(
        new WriteBufferManager(100000, nullptr, true));
  }
  CreateAndReopenWithCF({"cf1", "cf2"}, options);

  for (int i = 0; i < num_dbs; i++) {
    ASSERT_OK(DestroyDB(dbnames[i], options));
    ASSERT_OK(DB::Open(options, dbnames[i], &(dbs[i])));
  }
  WriteOptions wo;
  wo.disableWAL = true;

  for (int i = 0; i < num_dbs; i++) {
    ASSERT_OK(dbs[i]->Put(wo, Key(1), DummyString(20000)));
  }
  // Insert to db_.
  ASSERT_OK(Put(0, Key(1), DummyString(30000), wo));

  // WriteBufferManager::buffer_size_ has been exceeded once the previous
  // write to db_ is completed.
  std::vector<port::Thread> threads;
  int wait_count_db = 0;
  InstrumentedMutex mutex;
  InstrumentedCondVar cv(&mutex);
  std::unordered_set<WriteThread::Writer*> w_set;
  std::vector<port::Thread> writer_threads;
  std::atomic<int> thread_num(0);
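  // Both sync-point callbacks below bump thread_num: num_dbs + 1 DB-level
  // stalls (the extra DBs plus db_) and num_dbs blocked writer-queue threads
  // add up to 2 * num_dbs + 1, at which point the flush is released.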

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0",
        "DBImpl::BackgroundCallFlush:start"}});

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "WBMStallInterface::BlockDB", [&](void*) {
        {
          InstrumentedMutexLock lock(&mutex);
          wait_count_db++;
          thread_num.fetch_add(1);
          cv.Signal();
          // Allow the flush to continue if all writer threads are blocked.
          if (thread_num.load(std::memory_order_relaxed) == 2 * num_dbs + 1) {
            TEST_SYNC_POINT(
                "DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0");
          }
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "WriteThread::WriteStall::Wait", [&](void* arg) {
        WriteThread::Writer* w = reinterpret_cast<WriteThread::Writer*>(arg);
        {
          InstrumentedMutexLock lock(&mutex);
          w_set.insert(w);
          thread_num.fetch_add(1);
          // Allow the flush to continue if all writer threads are blocked.
          if (thread_num.load(std::memory_order_relaxed) == 2 * num_dbs + 1) {
            TEST_SYNC_POINT(
                "DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0");
          }
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  bool s1 = true, s2 = true;
  // Write to multiple column families of db_.
  std::function<void(int)> write_cf = [&](int cf) {
    Status tmp = Put(cf, Key(3), DummyString(1), wo);
    InstrumentedMutexLock lock(&mutex);
    s1 = s1 && tmp.ok();
  };
  // Write to multiple DBs.
  std::function<void(DB*)> write_db = [&](DB* db) {
    Status tmp = db->Put(wo, Key(3), DummyString(1));
    InstrumentedMutexLock lock(&mutex);
    s2 = s2 && tmp.ok();
  };

  // Flow:
  // A thread writing to db_ gets blocked (the flush is on hold and
  // buffer_size_ has been exceeded, so a stall takes effect).
  //      |
  //      |
  // Multiple writer threads are created to write to the other DBs and to
  // db_ across multiple column families, and they get blocked by the stall.
  //      |
  //      |
  // The last writer thread writes and, once blocked, signals the flush to
  // continue, which clears the stall.
  threads.emplace_back(write_db, db_);
  // Wait until the first thread is blocked, then create the remaining
  // writer threads.
  {
    InstrumentedMutexLock lock(&mutex);
    while (wait_count_db != 1) {
      cv.Wait();
    }
  }

  for (int i = 0; i < num_dbs; i++) {
    // Write to multiple column families of db_.
    writer_threads.emplace_back(write_cf, i % 3);
    // Write to different DBs.
    threads.emplace_back(write_db, dbs[i]);
  }
  for (auto& t : threads) {
    t.join();
  }
  for (auto& t : writer_threads) {
    t.join();
  }

  ASSERT_TRUE(s1);
  ASSERT_TRUE(s2);

  // Number of DBs blocked.
  ASSERT_EQ(num_dbs + 1, wait_count_db);
  // Number of Writer threads blocked.
  ASSERT_EQ(w_set.size(), num_dbs);
  // Clean up DBs.
  for (int i = 0; i < num_dbs; i++) {
    ASSERT_OK(dbs[i]->Close());
    ASSERT_OK(DestroyDB(dbnames[i], options));
    delete dbs[i];
  }

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

// Test multiple threads writing across multiple column families of db_ with
// different values of WriteOptions.no_slowdown.
TEST_P(DBWriteBufferManagerTest, MixedSlowDownOptionsSingleDB) {
  Options options = CurrentOptions();
  options.arena_block_size = 4096;
  options.write_buffer_size = 500000;  // this is never hit
  std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);
  ASSERT_LT(cache->GetUsage(), 256 * 1024);
  cost_cache_ = GetParam();

  if (cost_cache_) {
    options.write_buffer_manager.reset(
        new WriteBufferManager(100000, cache, true));
  } else {
    options.write_buffer_manager.reset(
        new WriteBufferManager(100000, nullptr, true));
  }
  WriteOptions wo;
  wo.disableWAL = true;

  CreateAndReopenWithCF({"cf1", "cf2", "cf3"}, options);

  ASSERT_OK(Put(3, Key(1), DummyString(1), wo));
  ASSERT_OK(Flush(3));
  ASSERT_OK(Put(3, Key(1), DummyString(1), wo));
  ASSERT_OK(Put(0, Key(1), DummyString(1), wo));
  ASSERT_OK(Flush(0));

  // Write to "Default", "cf2" and "cf3". No flush will be triggered.
  ASSERT_OK(Put(3, Key(1), DummyString(30000), wo));
  ASSERT_OK(Put(0, Key(1), DummyString(40000), wo));
  ASSERT_OK(Put(2, Key(1), DummyString(1), wo));
  ASSERT_OK(Put(3, Key(2), DummyString(40000), wo));

  // WriteBufferManager::buffer_size_ has been exceeded once the previous
  // write to db_ is completed.

  std::unordered_set<WriteThread::Writer*> w_slowdown_set;
  std::vector<port::Thread> threads;
  int wait_count_db = 0;
  int num_writers = 4;
  InstrumentedMutex mutex;
  InstrumentedCondVar cv(&mutex);
  std::atomic<int> thread_num(0);
  std::atomic<int> w_no_slowdown(0);
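  // Writers with no_slowdown = true fail immediately with an Incomplete
  // status instead of blocking, so the flush-release condition below counts
  // both the blocked writers (w_slowdown_set) and the failed no_slowdown
  // writers (w_no_slowdown).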

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0",
        "DBImpl::BackgroundCallFlush:start"}});

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "WBMStallInterface::BlockDB", [&](void*) {
        {
          InstrumentedMutexLock lock(&mutex);
          wait_count_db++;
          cv.SignalAll();
        }
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "WriteThread::WriteStall::Wait", [&](void* arg) {
        {
          InstrumentedMutexLock lock(&mutex);
          WriteThread::Writer* w = reinterpret_cast<WriteThread::Writer*>(arg);
          w_slowdown_set.insert(w);
          // Allow the flush to continue if all writer threads are blocked.
          if (w_slowdown_set.size() + (unsigned long)w_no_slowdown.load(
                                          std::memory_order_relaxed) ==
              (unsigned long)num_writers) {
            TEST_SYNC_POINT(
                "DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0");
          }
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  bool s1 = true, s2 = true;

  std::function<void(int)> write_slow_down = [&](int cf) {
    int a = thread_num.fetch_add(1);
    std::string key = "foo" + std::to_string(a);
    WriteOptions write_op;
    write_op.no_slowdown = false;
    Status tmp = Put(cf, Slice(key), DummyString(1), write_op);
    InstrumentedMutexLock lock(&mutex);
    s1 = s1 && tmp.ok();
  };

  std::function<void(int)> write_no_slow_down = [&](int cf) {
    int a = thread_num.fetch_add(1);
    std::string key = "foo" + std::to_string(a);
    WriteOptions write_op;
    write_op.no_slowdown = true;
    Status tmp = Put(cf, Slice(key), DummyString(1), write_op);
    {
      InstrumentedMutexLock lock(&mutex);
      s2 = s2 && !tmp.ok();
      w_no_slowdown.fetch_add(1);
      // Allow the flush to continue if all writer threads are blocked.
      if (w_slowdown_set.size() +
              (unsigned long)w_no_slowdown.load(std::memory_order_relaxed) ==
          (unsigned long)num_writers) {
        TEST_SYNC_POINT(
            "DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0");
      }
    }
  };

  // Flow:
  // The main_writer thread writes and gets blocked (the flush is on hold
  // and buffer_size_ has been exceeded, so a stall takes effect).
  //      |
  //      |
  // Multiple writer threads are created to write across multiple column
  // families with different values of WriteOptions.no_slowdown. Some of
  // them get blocked and some return with an Incomplete status.
  //      |
  //      |
  // The last writer thread writes and, once blocked or returned, signals
  // the flush to continue, which clears the stall.
  threads.emplace_back(write_slow_down, 1);
  // Wait until the first thread (main_writer) writing to the DB is blocked,
  // then create the remaining writers, which will be blocked from getting
  // added to the queue because the stall is in effect.
  {
    InstrumentedMutexLock lock(&mutex);
    while (wait_count_db != 1) {
      cv.Wait();
    }
  }

  for (int i = 0; i < num_writers; i += 2) {
    threads.emplace_back(write_no_slow_down, (i) % 4);
    threads.emplace_back(write_slow_down, (i + 1) % 4);
  }
  for (auto& t : threads) {
    t.join();
  }

  ASSERT_TRUE(s1);
  ASSERT_TRUE(s2);
  // Number of DBs blocked.
  ASSERT_EQ(wait_count_db, 1);
  // Number of Writer threads blocked.
  ASSERT_EQ(w_slowdown_set.size(), num_writers / 2);
  // Number of Writer threads with WriteOptions.no_slowdown = true.
  ASSERT_EQ(w_no_slowdown.load(std::memory_order_relaxed), num_writers / 2);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

// Test multiple threads writing across multiple column families of db_ and
// across different DBs with different values of WriteOptions.no_slowdown.
TEST_P(DBWriteBufferManagerTest, MixedSlowDownOptionsMultipleDB) {
  std::vector<std::string> dbnames;
  std::vector<DB*> dbs;
  int num_dbs = 4;

  for (int i = 0; i < num_dbs; i++) {
    dbs.push_back(nullptr);
    dbnames.push_back(
        test::PerThreadDBPath("db_shared_wb_db" + std::to_string(i)));
  }

  Options options = CurrentOptions();
  options.arena_block_size = 4096;
  options.write_buffer_size = 500000;  // this is never hit
  std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);
  ASSERT_LT(cache->GetUsage(), 256 * 1024);
  cost_cache_ = GetParam();

  if (cost_cache_) {
    options.write_buffer_manager.reset(
        new WriteBufferManager(100000, cache, true));
  } else {
    options.write_buffer_manager.reset(
        new WriteBufferManager(100000, nullptr, true));
  }
  CreateAndReopenWithCF({"cf1", "cf2"}, options);

  for (int i = 0; i < num_dbs; i++) {
    ASSERT_OK(DestroyDB(dbnames[i], options));
    ASSERT_OK(DB::Open(options, dbnames[i], &(dbs[i])));
  }
  WriteOptions wo;
  wo.disableWAL = true;

  for (int i = 0; i < num_dbs; i++) {
    ASSERT_OK(dbs[i]->Put(wo, Key(1), DummyString(20000)));
  }
  // Insert to db_.
  ASSERT_OK(Put(0, Key(1), DummyString(30000), wo));

  // WriteBufferManager::buffer_size_ has been exceeded once the previous
  // write to db_ is completed.
  std::vector<port::Thread> threads;
  int wait_count_db = 0;
  InstrumentedMutex mutex;
  InstrumentedCondVar cv(&mutex);
  std::unordered_set<WriteThread::Writer*> w_slowdown_set;
  std::vector<port::Thread> writer_threads;
  std::atomic<int> thread_num(0);
  std::atomic<int> w_no_slowdown(0);
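  // Release condition used in the callbacks below: stalled DBs
  // (wait_count_db) + blocked writer-queue threads (w_slowdown_set) +
  // failed no_slowdown writers (w_no_slowdown) must reach 2 * num_dbs + 1
  // (3 + 2 + 4 for num_dbs = 4) before the flush may proceed.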

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0",
        "DBImpl::BackgroundCallFlush:start"}});

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "WBMStallInterface::BlockDB", [&](void*) {
        InstrumentedMutexLock lock(&mutex);
        wait_count_db++;
        cv.Signal();
        // Allow the flush to continue if all writer threads are blocked.
        if (w_slowdown_set.size() +
                (unsigned long)(w_no_slowdown.load(std::memory_order_relaxed) +
                                wait_count_db) ==
            (unsigned long)(2 * num_dbs + 1)) {
          TEST_SYNC_POINT(
              "DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0");
        }
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "WriteThread::WriteStall::Wait", [&](void* arg) {
        WriteThread::Writer* w = reinterpret_cast<WriteThread::Writer*>(arg);
        InstrumentedMutexLock lock(&mutex);
        w_slowdown_set.insert(w);
        // Allow the flush to continue if all writer threads are blocked.
        if (w_slowdown_set.size() +
                (unsigned long)(w_no_slowdown.load(std::memory_order_relaxed) +
                                wait_count_db) ==
            (unsigned long)(2 * num_dbs + 1)) {
          TEST_SYNC_POINT(
              "DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0");
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  bool s1 = true, s2 = true;
  std::function<void(DB*)> write_slow_down = [&](DB* db) {
    int a = thread_num.fetch_add(1);
    std::string key = "foo" + std::to_string(a);
    WriteOptions write_op;
    write_op.no_slowdown = false;
    Status tmp = db->Put(write_op, Slice(key), DummyString(1));
    InstrumentedMutexLock lock(&mutex);
    s1 = s1 && tmp.ok();
  };

  std::function<void(DB*)> write_no_slow_down = [&](DB* db) {
    int a = thread_num.fetch_add(1);
    std::string key = "foo" + std::to_string(a);
    WriteOptions write_op;
    write_op.no_slowdown = true;
    Status tmp = db->Put(write_op, Slice(key), DummyString(1));
    {
      InstrumentedMutexLock lock(&mutex);
      s2 = s2 && !tmp.ok();
      w_no_slowdown.fetch_add(1);
      if (w_slowdown_set.size() +
              (unsigned long)(w_no_slowdown.load(std::memory_order_relaxed) +
                              wait_count_db) ==
          (unsigned long)(2 * num_dbs + 1)) {
        TEST_SYNC_POINT(
            "DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0");
      }
    }
  };

  // Flow:
  // The first thread writes and gets blocked (the flush is on hold and
  // buffer_size_ has been exceeded, so a stall takes effect).
  //      |
  //      |
  // Multiple writer threads are created to write to db_ and to different
  // DBs with different values of WriteOptions.no_slowdown. Some of them
  // get blocked and some return with an Incomplete status.
  //      |
  //      |
  // The last writer thread writes and, once blocked or returned, signals
  // the flush to continue, which clears the stall.
  threads.emplace_back(write_slow_down, db_);
  // Wait until the first thread writing to the DB is blocked, then create
  // the remaining writers.
  {
    InstrumentedMutexLock lock(&mutex);
    while (wait_count_db != 1) {
      cv.Wait();
    }
  }

  for (int i = 0; i < num_dbs; i += 2) {
    // Write to db_.
    writer_threads.emplace_back(write_slow_down, db_);
    writer_threads.emplace_back(write_no_slow_down, db_);
    // Write to different DBs.
    threads.emplace_back(write_slow_down, dbs[i]);
    threads.emplace_back(write_no_slow_down, dbs[i + 1]);
  }

  for (auto& t : threads) {
    t.join();
  }

  for (auto& t : writer_threads) {
    t.join();
  }

  ASSERT_TRUE(s1);
  ASSERT_TRUE(s2);
  // Number of DBs blocked.
  ASSERT_EQ((num_dbs / 2) + 1, wait_count_db);
  // Number of writer threads writing to db_ blocked from getting added to
  // the queue.
  ASSERT_EQ(w_slowdown_set.size(), num_dbs / 2);
  // Number of threads with WriteOptions.no_slowdown = true.
  ASSERT_EQ(w_no_slowdown.load(std::memory_order_relaxed), num_dbs);

  // Clean up DBs.
  for (int i = 0; i < num_dbs; i++) {
    ASSERT_OK(dbs[i]->Close());
    ASSERT_OK(DestroyDB(dbnames[i], options));
    delete dbs[i];
  }

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

INSTANTIATE_TEST_CASE_P(DBWriteBufferManagerTest, DBWriteBufferManagerTest,
                        testing::Bool());

}  // namespace ROCKSDB_NAMESPACE

#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
extern "C" {
void RegisterCustomObjects(int argc, char** argv);
}
#else
void RegisterCustomObjects(int /*argc*/, char** /*argv*/) {}
#endif  // !ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS

int main(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  RegisterCustomObjects(argc, argv);
  return RUN_ALL_TESTS();
}