1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 #ifndef ROCKSDB_LITE
10
11 #include "db/db_test_util.h"
12 #include "port/stack_trace.h"
13 #include "rocksdb/perf_context.h"
14 #include "rocksdb/sst_file_manager.h"
15 #include "test_util/fault_injection_test_env.h"
16 #if !defined(ROCKSDB_LITE)
17 #include "test_util/sync_point.h"
18 #endif
19
20 namespace rocksdb {
21
22 class DBErrorHandlingTest : public DBTestBase {
23 public:
DBErrorHandlingTest()24 DBErrorHandlingTest() : DBTestBase("/db_error_handling_test") {}
25
GetManifestNameFromLiveFiles()26 std::string GetManifestNameFromLiveFiles() {
27 std::vector<std::string> live_files;
28 uint64_t manifest_size;
29
30 dbfull()->GetLiveFiles(live_files, &manifest_size, false);
31 for (auto& file : live_files) {
32 uint64_t num = 0;
33 FileType type;
34 if (ParseFileName(file, &num, &type) && type == kDescriptorFile) {
35 return file;
36 }
37 }
38 return "";
39 }
40 };
41
42 class DBErrorHandlingEnv : public EnvWrapper {
43 public:
DBErrorHandlingEnv()44 DBErrorHandlingEnv() : EnvWrapper(Env::Default()),
45 trig_no_space(false), trig_io_error(false) {}
46
SetTrigNoSpace()47 void SetTrigNoSpace() {trig_no_space = true;}
SetTrigIoError()48 void SetTrigIoError() {trig_io_error = true;}
49 private:
50 bool trig_no_space;
51 bool trig_io_error;
52 };
53
54 class ErrorHandlerListener : public EventListener {
55 public:
ErrorHandlerListener()56 ErrorHandlerListener()
57 : mutex_(),
58 cv_(&mutex_),
59 no_auto_recovery_(false),
60 recovery_complete_(false),
61 file_creation_started_(false),
62 override_bg_error_(false),
63 file_count_(0),
64 fault_env_(nullptr) {}
65
OnTableFileCreationStarted(const TableFileCreationBriefInfo &)66 void OnTableFileCreationStarted(
67 const TableFileCreationBriefInfo& /*ti*/) override {
68 InstrumentedMutexLock l(&mutex_);
69 file_creation_started_ = true;
70 if (file_count_ > 0) {
71 if (--file_count_ == 0) {
72 fault_env_->SetFilesystemActive(false, file_creation_error_);
73 file_creation_error_ = Status::OK();
74 }
75 }
76 cv_.SignalAll();
77 }
78
OnErrorRecoveryBegin(BackgroundErrorReason,Status,bool * auto_recovery)79 void OnErrorRecoveryBegin(BackgroundErrorReason /*reason*/,
80 Status /*bg_error*/,
81 bool* auto_recovery) override {
82 if (*auto_recovery && no_auto_recovery_) {
83 *auto_recovery = false;
84 }
85 }
86
OnErrorRecoveryCompleted(Status)87 void OnErrorRecoveryCompleted(Status /*old_bg_error*/) override {
88 InstrumentedMutexLock l(&mutex_);
89 recovery_complete_ = true;
90 cv_.SignalAll();
91 }
92
WaitForRecovery(uint64_t)93 bool WaitForRecovery(uint64_t /*abs_time_us*/) {
94 InstrumentedMutexLock l(&mutex_);
95 while (!recovery_complete_) {
96 cv_.Wait(/*abs_time_us*/);
97 }
98 if (recovery_complete_) {
99 recovery_complete_ = false;
100 return true;
101 }
102 return false;
103 }
104
WaitForTableFileCreationStarted(uint64_t)105 void WaitForTableFileCreationStarted(uint64_t /*abs_time_us*/) {
106 InstrumentedMutexLock l(&mutex_);
107 while (!file_creation_started_) {
108 cv_.Wait(/*abs_time_us*/);
109 }
110 file_creation_started_ = false;
111 }
112
OnBackgroundError(BackgroundErrorReason,Status * bg_error)113 void OnBackgroundError(BackgroundErrorReason /*reason*/,
114 Status* bg_error) override {
115 if (override_bg_error_) {
116 *bg_error = bg_error_;
117 override_bg_error_ = false;
118 }
119 }
120
EnableAutoRecovery(bool enable=true)121 void EnableAutoRecovery(bool enable = true) { no_auto_recovery_ = !enable; }
122
OverrideBGError(Status bg_err)123 void OverrideBGError(Status bg_err) {
124 bg_error_ = bg_err;
125 override_bg_error_ = true;
126 }
127
InjectFileCreationError(FaultInjectionTestEnv * env,int file_count,Status s)128 void InjectFileCreationError(FaultInjectionTestEnv* env, int file_count,
129 Status s) {
130 fault_env_ = env;
131 file_count_ = file_count;
132 file_creation_error_ = s;
133 }
134
135 private:
136 InstrumentedMutex mutex_;
137 InstrumentedCondVar cv_;
138 bool no_auto_recovery_;
139 bool recovery_complete_;
140 bool file_creation_started_;
141 bool override_bg_error_;
142 int file_count_;
143 Status file_creation_error_;
144 Status bg_error_;
145 FaultInjectionTestEnv* fault_env_;
146 };
147
// Deactivates the filesystem with NoSpace from the "FlushJob::Start" sync
// point and expects Flush() to report a hard error. The listener vetoes auto
// recovery, so the test re-enables the filesystem, calls Resume() itself and
// verifies the key written before the failure after a reopen.
TEST_F(DBErrorHandlingTest, FLushWriteError) {
  std::unique_ptr<FaultInjectionTestEnv> fault_env(
      new FaultInjectionTestEnv(Env::Default()));
  std::shared_ptr<ErrorHandlerListener> listener(new ErrorHandlerListener());
  Options options = GetDefaultOptions();
  options.create_if_missing = true;
  options.env = fault_env.get();
  options.listeners.emplace_back(listener);
  Status s;

  listener->EnableAutoRecovery(false);
  DestroyAndReopen(options);

  // The setup write must succeed, otherwise the final Get() check would fail
  // for a reason unrelated to error handling.
  ASSERT_EQ(Put(Key(0), "val"), Status::OK());
  SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
    fault_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));
  });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ(s.severity(), rocksdb::Status::Severity::kHardError);
  SyncPoint::GetInstance()->DisableProcessing();
  fault_env->SetFilesystemActive(true);
  s = dbfull()->Resume();
  ASSERT_EQ(s, Status::OK());

  Reopen(options);
  ASSERT_EQ("val", Get(Key(0)));
  Destroy(options);
}
178
// Deactivates the filesystem with NoSpace from the
// "VersionSet::LogAndApply:WriteManifest" sync point during the second flush
// and expects a hard error. After the filesystem is re-enabled, a manual
// Resume() must succeed, the live manifest name must differ from the one seen
// right after opening, and both keys must be readable after a reopen.
TEST_F(DBErrorHandlingTest, ManifestWriteError) {
  std::unique_ptr<FaultInjectionTestEnv> fault_env(
      new FaultInjectionTestEnv(Env::Default()));
  std::shared_ptr<ErrorHandlerListener> listener(new ErrorHandlerListener());
  Options options = GetDefaultOptions();
  options.create_if_missing = true;
  options.env = fault_env.get();
  options.listeners.emplace_back(listener);
  Status s;
  std::string old_manifest;
  std::string new_manifest;

  listener->EnableAutoRecovery(false);
  DestroyAndReopen(options);
  old_manifest = GetManifestNameFromLiveFiles();

  // Setup steps run before any fault is injected; check them so a setup
  // failure is not mistaken for an error handling failure later on.
  ASSERT_EQ(Put(Key(0), "val"), Status::OK());
  ASSERT_EQ(Flush(), Status::OK());
  ASSERT_EQ(Put(Key(1), "val"), Status::OK());
  SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::LogAndApply:WriteManifest", [&](void*) {
        fault_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));
      });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ(s.severity(), rocksdb::Status::Severity::kHardError);
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();
  fault_env->SetFilesystemActive(true);
  s = dbfull()->Resume();
  ASSERT_EQ(s, Status::OK());

  new_manifest = GetManifestNameFromLiveFiles();
  ASSERT_NE(new_manifest, old_manifest);

  Reopen(options);
  ASSERT_EQ("val", Get(Key(0)));
  ASSERT_EQ("val", Get(Key(1)));
  Close();
}
219
// Like ManifestWriteError, but the manifest-write callback stays armed across
// the first Resume(), so that Resume() is expected to hit a hard error as
// well. Only after the callback is cleared does a second Resume() succeed.
TEST_F(DBErrorHandlingTest, DoubleManifestWriteError) {
  std::unique_ptr<FaultInjectionTestEnv> fault_env(
      new FaultInjectionTestEnv(Env::Default()));
  std::shared_ptr<ErrorHandlerListener> listener(new ErrorHandlerListener());
  Options options = GetDefaultOptions();
  options.create_if_missing = true;
  options.env = fault_env.get();
  options.listeners.emplace_back(listener);
  Status s;
  std::string old_manifest;
  std::string new_manifest;

  listener->EnableAutoRecovery(false);
  DestroyAndReopen(options);
  old_manifest = GetManifestNameFromLiveFiles();

  // Setup steps run before any fault is injected; check them so a setup
  // failure is not mistaken for an error handling failure later on.
  ASSERT_EQ(Put(Key(0), "val"), Status::OK());
  ASSERT_EQ(Flush(), Status::OK());
  ASSERT_EQ(Put(Key(1), "val"), Status::OK());
  SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::LogAndApply:WriteManifest", [&](void*) {
        fault_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));
      });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ(s.severity(), rocksdb::Status::Severity::kHardError);
  fault_env->SetFilesystemActive(true);

  // This Resume() will attempt to create a new manifest file and fail again,
  // because the callback above is still registered and enabled.
  s = dbfull()->Resume();
  ASSERT_EQ(s.severity(), rocksdb::Status::Severity::kHardError);
  fault_env->SetFilesystemActive(true);
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();

  // A successful Resume() will create a new manifest file
  s = dbfull()->Resume();
  ASSERT_EQ(s, Status::OK());

  new_manifest = GetManifestNameFromLiveFiles();
  ASSERT_NE(new_manifest, old_manifest);

  Reopen(options);
  ASSERT_EQ("val", Get(Key(0)));
  ASSERT_EQ("val", Get(Key(1)));
  Close();
}
267
// Makes the manifest write issued from a background compaction fail with
// NoSpace, then clears the fault and expects TEST_WaitForCompact() to return
// OK, the live manifest name to have changed, and all three keys to be
// readable after a reopen. Sync-point dependencies sequence the test thread
// against the compaction thread; the statement order below is significant.
TEST_F(DBErrorHandlingTest, CompactionManifestWriteError) {
  std::unique_ptr<FaultInjectionTestEnv> fault_env(
      new FaultInjectionTestEnv(Env::Default()));
  std::shared_ptr<ErrorHandlerListener> listener(new ErrorHandlerListener());
  Options options = GetDefaultOptions();
  options.create_if_missing = true;
  options.level0_file_num_compaction_trigger = 2;
  options.listeners.emplace_back(listener);
  options.env = fault_env.get();
  Status s;
  std::string old_manifest;
  std::string new_manifest;
  // Set once the compaction thread reaches "BackgroundCallCompaction:0", so
  // that only manifest writes from that point on are failed.
  std::atomic<bool> fail_manifest(false);
  DestroyAndReopen(options);
  old_manifest = GetManifestNameFromLiveFiles();

  // Setup writes are checked so a failure here is not misattributed.
  ASSERT_EQ(Put(Key(0), "val"), Status::OK());
  ASSERT_EQ(Put(Key(2), "val"), Status::OK());
  s = Flush();
  ASSERT_EQ(s, Status::OK());

  rocksdb::SyncPoint::GetInstance()->LoadDependency(
      // Wait for flush of 2nd L0 file before starting compaction
      {{"DBImpl::FlushMemTable:FlushMemTableFinished",
        "BackgroundCallCompaction:0"},
       // Wait for compaction to detect manifest write error
       {"BackgroundCallCompaction:1", "CompactionManifestWriteError:0"},
       // Make compaction thread wait for error to be cleared
       {"CompactionManifestWriteError:1",
        "DBImpl::BackgroundCallCompaction:FoundObsoleteFiles"},
       // Wait for DB instance to clear bg_error before calling
       // TEST_WaitForCompact
       {"SstFileManagerImpl::ClearError", "CompactionManifestWriteError:2"}});
  // trigger manifest write failure in compaction thread
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "BackgroundCallCompaction:0",
      [&](void*) { fail_manifest.store(true); });
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::LogAndApply:WriteManifest", [&](void*) {
        if (fail_manifest.load()) {
          fault_env->SetFilesystemActive(false,
                                         Status::NoSpace("Out of space"));
        }
      });
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_EQ(Put(Key(1), "val"), Status::OK());
  // This Flush will trigger a compaction, which will fail when appending to
  // the manifest
  s = Flush();
  ASSERT_EQ(s, Status::OK());

  TEST_SYNC_POINT("CompactionManifestWriteError:0");
  // Clear all errors so when the compaction is retried, it will succeed
  fault_env->SetFilesystemActive(true);
  rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
  TEST_SYNC_POINT("CompactionManifestWriteError:1");
  TEST_SYNC_POINT("CompactionManifestWriteError:2");

  s = dbfull()->TEST_WaitForCompact();
  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
  ASSERT_EQ(s, Status::OK());

  new_manifest = GetManifestNameFromLiveFiles();
  ASSERT_NE(new_manifest, old_manifest);
  Reopen(options);
  ASSERT_EQ("val", Get(Key(0)));
  ASSERT_EQ("val", Get(Key(1)));
  ASSERT_EQ("val", Get(Key(2)));
  Close();
}
341
// Deactivates the filesystem with NoSpace when the background compaction
// reaches "BackgroundCallCompaction:0". The listener overrides the reported
// background error with a NoSpace status of hard severity and vetoes auto
// recovery, so TEST_WaitForCompact() is expected to return a hard error and a
// manual Resume() to succeed once the filesystem is active again.
TEST_F(DBErrorHandlingTest, CompactionWriteError) {
  std::unique_ptr<FaultInjectionTestEnv> fault_env(
      new FaultInjectionTestEnv(Env::Default()));
  std::shared_ptr<ErrorHandlerListener> listener(new ErrorHandlerListener());
  Options options = GetDefaultOptions();
  options.create_if_missing = true;
  options.level0_file_num_compaction_trigger = 2;
  options.listeners.emplace_back(listener);
  options.env = fault_env.get();
  Status s;
  DestroyAndReopen(options);

  // Setup writes are checked so a failure here is not misattributed.
  ASSERT_EQ(Put(Key(0), "va;"), Status::OK());
  ASSERT_EQ(Put(Key(2), "va;"), Status::OK());
  s = Flush();
  ASSERT_EQ(s, Status::OK());

  listener->OverrideBGError(
      Status(Status::NoSpace(), Status::Severity::kHardError));
  listener->EnableAutoRecovery(false);
  // Hold the compaction back until the second flush has finished.
  rocksdb::SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::FlushMemTable:FlushMemTableFinished",
        "BackgroundCallCompaction:0"}});
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "BackgroundCallCompaction:0", [&](void*) {
        fault_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));
      });
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_EQ(Put(Key(1), "val"), Status::OK());
  s = Flush();
  ASSERT_EQ(s, Status::OK());

  s = dbfull()->TEST_WaitForCompact();
  ASSERT_EQ(s.severity(), rocksdb::Status::Severity::kHardError);

  fault_env->SetFilesystemActive(true);
  s = dbfull()->Resume();
  ASSERT_EQ(s, Status::OK());
  Destroy(options);
}
384
// Deactivates the filesystem with a Corruption status when the background
// compaction reaches "BackgroundCallCompaction:0". Expects
// TEST_WaitForCompact() to report an unrecoverable error and Resume() to
// fail even after the filesystem is active again.
TEST_F(DBErrorHandlingTest, CorruptionError) {
  std::unique_ptr<FaultInjectionTestEnv> fault_env(
      new FaultInjectionTestEnv(Env::Default()));
  Options options = GetDefaultOptions();
  options.create_if_missing = true;
  options.level0_file_num_compaction_trigger = 2;
  options.env = fault_env.get();
  Status s;
  DestroyAndReopen(options);

  // Setup writes are checked so a failure here is not misattributed.
  ASSERT_EQ(Put(Key(0), "va;"), Status::OK());
  ASSERT_EQ(Put(Key(2), "va;"), Status::OK());
  s = Flush();
  ASSERT_EQ(s, Status::OK());

  // Hold the compaction back until the second flush has finished.
  rocksdb::SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::FlushMemTable:FlushMemTableFinished",
        "BackgroundCallCompaction:0"}});
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "BackgroundCallCompaction:0", [&](void*) {
        fault_env->SetFilesystemActive(false,
                                       Status::Corruption("Corruption"));
      });
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_EQ(Put(Key(1), "val"), Status::OK());
  s = Flush();
  ASSERT_EQ(s, Status::OK());

  s = dbfull()->TEST_WaitForCompact();
  ASSERT_EQ(s.severity(), rocksdb::Status::Severity::kUnrecoverableError);

  fault_env->SetFilesystemActive(true);
  s = dbfull()->Resume();
  ASSERT_NE(s, Status::OK());
  Destroy(options);
}
421
// Same fault as FLushWriteError (NoSpace injected at "FlushJob::Start"), but
// with auto recovery left enabled: after the filesystem is re-enabled the
// test waits for the listener's recovery-completed notification instead of
// calling Resume(), then checks that writes work and both keys are readable
// after a reopen.
TEST_F(DBErrorHandlingTest, AutoRecoverFlushError) {
  std::unique_ptr<FaultInjectionTestEnv> fault_env(
      new FaultInjectionTestEnv(Env::Default()));
  std::shared_ptr<ErrorHandlerListener> listener(new ErrorHandlerListener());
  Options options = GetDefaultOptions();
  options.create_if_missing = true;
  options.env = fault_env.get();
  options.listeners.emplace_back(listener);
  Status s;

  listener->EnableAutoRecovery();
  DestroyAndReopen(options);

  // The setup write must succeed, otherwise the final Get() check would fail
  // for a reason unrelated to error handling.
  ASSERT_EQ(Put(Key(0), "val"), Status::OK());
  SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
    fault_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));
  });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ(s.severity(), rocksdb::Status::Severity::kHardError);
  SyncPoint::GetInstance()->DisableProcessing();
  fault_env->SetFilesystemActive(true);
  ASSERT_EQ(listener->WaitForRecovery(5000000), true);

  s = Put(Key(1), "val");
  ASSERT_EQ(s, Status::OK());

  Reopen(options);
  ASSERT_EQ("val", Get(Key(0)));
  ASSERT_EQ("val", Get(Key(1)));
  Destroy(options);
}
454
// Injects NoSpace at "FlushJob::Start" with auto recovery enabled and never
// re-enables the filesystem; the test then closes and destroys the database
// to check that shutdown works in that state.
TEST_F(DBErrorHandlingTest, FailRecoverFlushError) {
  std::unique_ptr<FaultInjectionTestEnv> fault_env(
      new FaultInjectionTestEnv(Env::Default()));
  std::shared_ptr<ErrorHandlerListener> listener(new ErrorHandlerListener());
  Options options = GetDefaultOptions();
  options.create_if_missing = true;
  options.env = fault_env.get();
  options.listeners.emplace_back(listener);
  Status s;

  listener->EnableAutoRecovery();
  DestroyAndReopen(options);

  // The setup write happens before any fault is injected and must succeed.
  ASSERT_EQ(Put(Key(0), "val"), Status::OK());
  SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
    fault_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));
  });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ(s.severity(), rocksdb::Status::Severity::kHardError);
  // We should be able to shutdown the database while auto recovery is going
  // on in the background
  Close();
  // The filesystem is still inactive here, so the result is deliberately not
  // asserted.
  DestroyDB(dbname_, options);
}
480
// Writes a first synced batch (keys 0..99) successfully, then makes a second
// synced batch (keys 100..198) fail with NoSpace by deactivating the
// filesystem from the third "WritableFileWriter::Append:BeforePrepareWrite"
// hit onwards. After auto recovery completes, only the first batch's keys
// must be visible, both before and after a reopen.
TEST_F(DBErrorHandlingTest, WALWriteError) {
  std::unique_ptr<FaultInjectionTestEnv> fault_env(
      new FaultInjectionTestEnv(Env::Default()));
  std::shared_ptr<ErrorHandlerListener> listener(new ErrorHandlerListener());
  Options options = GetDefaultOptions();
  options.create_if_missing = true;
  options.writable_file_max_buffer_size = 32768;
  options.env = fault_env.get();
  options.listeners.emplace_back(listener);
  Status s;
  Random rnd(301);
  // Number of times the append sync point has fired. Kept at function scope
  // because the callback below captures it by reference and is still
  // registered and enabled after the failing-write block has ended.
  int write_error = 0;

  listener->EnableAutoRecovery();
  DestroyAndReopen(options);

  {
    WriteBatch batch;

    for (auto i = 0; i < 100; ++i) {
      batch.Put(Key(i), RandomString(&rnd, 1024));
    }

    WriteOptions wopts;
    wopts.sync = true;
    ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK());
  }

  {
    WriteBatch batch;

    for (auto i = 100; i < 199; ++i) {
      batch.Put(Key(i), RandomString(&rnd, 1024));
    }

    SyncPoint::GetInstance()->SetCallBack(
        "WritableFileWriter::Append:BeforePrepareWrite", [&](void*) {
          write_error++;
          if (write_error > 2) {
            fault_env->SetFilesystemActive(false,
                                           Status::NoSpace("Out of space"));
          }
        });
    SyncPoint::GetInstance()->EnableProcessing();
    WriteOptions wopts;
    wopts.sync = true;
    s = dbfull()->Write(wopts, &batch);
    ASSERT_EQ(s, Status::NoSpace());
  }
  SyncPoint::GetInstance()->DisableProcessing();
  fault_env->SetFilesystemActive(true);
  ASSERT_EQ(listener->WaitForRecovery(5000000), true);
  for (auto i = 0; i < 199; ++i) {
    if (i < 100) {
      ASSERT_NE(Get(Key(i)), "NOT_FOUND");
    } else {
      ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
    }
  }
  Reopen(options);
  for (auto i = 0; i < 199; ++i) {
    if (i < 100) {
      ASSERT_NE(Get(Key(i)), "NOT_FOUND");
    } else {
      ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
    }
  }
  Close();
}
548
// Column-family variant of WALWriteError: a first synced batch writes keys
// 0..99 to column families 1..3, then a second synced batch to column family
// 2 is failed with NoSpace. After auto recovery, every one of the three
// column families is expected to have exactly one level-0 file, and only the
// first batch's keys must be visible, before and after a reopen.
TEST_F(DBErrorHandlingTest, MultiCFWALWriteError) {
  std::unique_ptr<FaultInjectionTestEnv> fault_env(
      new FaultInjectionTestEnv(Env::Default()));
  std::shared_ptr<ErrorHandlerListener> listener(new ErrorHandlerListener());
  Options options = GetDefaultOptions();
  options.create_if_missing = true;
  options.writable_file_max_buffer_size = 32768;
  options.env = fault_env.get();
  options.listeners.emplace_back(listener);
  Status s;
  Random rnd(301);
  // Number of times the append sync point has fired. Kept at function scope
  // because the callback below captures it by reference and is still
  // registered and enabled after the failing-write block has ended.
  int write_error = 0;

  listener->EnableAutoRecovery();
  CreateAndReopenWithCF({"one", "two", "three"}, options);

  {
    WriteBatch batch;

    for (auto i = 1; i < 4; ++i) {
      for (auto j = 0; j < 100; ++j) {
        batch.Put(handles_[i], Key(j), RandomString(&rnd, 1024));
      }
    }

    WriteOptions wopts;
    wopts.sync = true;
    ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK());
  }

  {
    WriteBatch batch;

    // Write to one CF
    for (auto i = 100; i < 199; ++i) {
      batch.Put(handles_[2], Key(i), RandomString(&rnd, 1024));
    }

    SyncPoint::GetInstance()->SetCallBack(
        "WritableFileWriter::Append:BeforePrepareWrite", [&](void*) {
          write_error++;
          if (write_error > 2) {
            fault_env->SetFilesystemActive(false,
                                           Status::NoSpace("Out of space"));
          }
        });
    SyncPoint::GetInstance()->EnableProcessing();
    WriteOptions wopts;
    wopts.sync = true;
    s = dbfull()->Write(wopts, &batch);
    ASSERT_EQ(s, Status::NoSpace());
  }
  SyncPoint::GetInstance()->DisableProcessing();
  fault_env->SetFilesystemActive(true);
  ASSERT_EQ(listener->WaitForRecovery(5000000), true);

  for (auto i = 1; i < 4; ++i) {
    // Every CF should have been flushed
    ASSERT_EQ(NumTableFilesAtLevel(0, i), 1);
  }

  for (auto i = 1; i < 4; ++i) {
    for (auto j = 0; j < 199; ++j) {
      if (j < 100) {
        ASSERT_NE(Get(i, Key(j)), "NOT_FOUND");
      } else {
        ASSERT_EQ(Get(i, Key(j)), "NOT_FOUND");
      }
    }
  }
  ReopenWithColumnFamilies({"default", "one", "two", "three"}, options);
  for (auto i = 1; i < 4; ++i) {
    for (auto j = 0; j < 199; ++j) {
      if (j < 100) {
        ASSERT_NE(Get(i, Key(j)), "NOT_FOUND");
      } else {
        ASSERT_EQ(Get(i, Key(j)), "NOT_FOUND");
      }
    }
  }
  Close();
}
631
// Opens three databases that share one SstFileManager, each with its own
// fault env and listener. Every listener is armed to deactivate its env with
// NoSpace on the third table file creation. After two flushes per database,
// each TEST_WaitForCompact() is expected to report a soft error; once the
// envs are re-enabled, each database must auto-recover and end up with no
// level-0 files and one level-1 file.
TEST_F(DBErrorHandlingTest, MultiDBCompactionError) {
  // Declared first so it is destroyed last: the shared SstFileManager below
  // is constructed with this env.
  std::unique_ptr<FaultInjectionTestEnv> def_env(
      new FaultInjectionTestEnv(Env::Default()));
  std::vector<std::unique_ptr<FaultInjectionTestEnv>> fault_env;
  std::vector<Options> options;
  std::vector<std::shared_ptr<ErrorHandlerListener>> listener;
  // Owning handles, declared after the envs, options and listeners so the
  // databases are closed before those are destroyed, including when an
  // ASSERT_* returns from the test early.
  std::vector<std::unique_ptr<DB>> db;
  std::shared_ptr<SstFileManager> sfm(NewSstFileManager(def_env.get()));
  const int kNumDbInstances = 3;
  Random rnd(301);

  for (auto i = 0; i < kNumDbInstances; ++i) {
    listener.emplace_back(new ErrorHandlerListener());
    options.emplace_back(GetDefaultOptions());
    fault_env.emplace_back(new FaultInjectionTestEnv(Env::Default()));
    options[i].create_if_missing = true;
    options[i].level0_file_num_compaction_trigger = 2;
    options[i].writable_file_max_buffer_size = 32768;
    options[i].env = fault_env[i].get();
    options[i].listeners.emplace_back(listener[i]);
    options[i].sst_file_manager = sfm;
    DB* dbptr = nullptr;
    const std::string db_path = dbname_ + "_" + NumberToString(i);

    listener[i]->EnableAutoRecovery();
    // Setup for returning error for the 3rd SST, which would be level 1
    listener[i]->InjectFileCreationError(fault_env[i].get(), 3,
                                         Status::NoSpace("Out of space"));
    DestroyDB(db_path, options[i]);
    ASSERT_EQ(DB::Open(options[i], db_path, &dbptr), Status::OK());
    db.emplace_back(dbptr);
  }

  for (auto i = 0; i < kNumDbInstances; ++i) {
    WriteBatch batch;

    for (auto j = 0; j <= 100; ++j) {
      batch.Put(Key(j), RandomString(&rnd, 1024));
    }

    WriteOptions wopts;
    wopts.sync = true;
    ASSERT_EQ(db[i]->Write(wopts, &batch), Status::OK());
    ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::OK());
  }

  def_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));
  for (auto i = 0; i < kNumDbInstances; ++i) {
    WriteBatch batch;

    // Second batch; its flush produces the second L0 file of this DB.
    for (auto j = 100; j < 199; ++j) {
      batch.Put(Key(j), RandomString(&rnd, 1024));
    }

    WriteOptions wopts;
    wopts.sync = true;
    ASSERT_EQ(db[i]->Write(wopts, &batch), Status::OK());
    ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::OK());
  }

  for (auto i = 0; i < kNumDbInstances; ++i) {
    Status s = static_cast<DBImpl*>(db[i].get())->TEST_WaitForCompact(true);
    ASSERT_EQ(s.severity(), Status::Severity::kSoftError);
    fault_env[i]->SetFilesystemActive(true);
  }

  def_env->SetFilesystemActive(true);
  for (auto i = 0; i < kNumDbInstances; ++i) {
    std::string prop;
    ASSERT_EQ(listener[i]->WaitForRecovery(5000000), true);
    EXPECT_TRUE(db[i]->GetProperty(
        "rocksdb.num-files-at-level" + NumberToString(0), &prop));
    EXPECT_EQ(atoi(prop.c_str()), 0);
    EXPECT_TRUE(db[i]->GetProperty(
        "rocksdb.num-files-at-level" + NumberToString(1), &prop));
    EXPECT_EQ(atoi(prop.c_str()), 1);
  }

  for (auto i = 0; i < kNumDbInstances; ++i) {
    const std::string db_path = dbname_ + "_" + NumberToString(i);
    db[i].reset();
    fault_env[i]->SetFilesystemActive(true);
    if (getenv("KEEP_DB")) {
      printf("DB is still at %s\n", db_path.c_str());
    } else {
      DestroyDB(db_path, options[i]);
    }
  }
  // Drop every reference to the SstFileManager before its env goes away.
  options.clear();
  sfm.reset();
  def_env.reset();
}
727
// Opens three databases that share one SstFileManager and gives each a
// different fault: DB 0 fails on its third table file creation (expected soft
// error from compaction), DB 1 fails on its second (expected NoSpace from the
// second Flush() and a hard error), DB 2 gets no fault. After the envs are
// re-enabled, DBs 0 and 1 must auto-recover, and all three must end up with
// no level-0 files and one level-1 file.
TEST_F(DBErrorHandlingTest, MultiDBVariousErrors) {
  // Declared first so it is destroyed last: the shared SstFileManager below
  // is constructed with this env.
  std::unique_ptr<FaultInjectionTestEnv> def_env(
      new FaultInjectionTestEnv(Env::Default()));
  std::vector<std::unique_ptr<FaultInjectionTestEnv>> fault_env;
  std::vector<Options> options;
  std::vector<std::shared_ptr<ErrorHandlerListener>> listener;
  // Owning handles, declared after the envs, options and listeners so the
  // databases are closed before those are destroyed, including when an
  // ASSERT_* returns from the test early.
  std::vector<std::unique_ptr<DB>> db;
  std::shared_ptr<SstFileManager> sfm(NewSstFileManager(def_env.get()));
  const int kNumDbInstances = 3;
  Random rnd(301);

  for (auto i = 0; i < kNumDbInstances; ++i) {
    listener.emplace_back(new ErrorHandlerListener());
    options.emplace_back(GetDefaultOptions());
    fault_env.emplace_back(new FaultInjectionTestEnv(Env::Default()));
    options[i].create_if_missing = true;
    options[i].level0_file_num_compaction_trigger = 2;
    options[i].writable_file_max_buffer_size = 32768;
    options[i].env = fault_env[i].get();
    options[i].listeners.emplace_back(listener[i]);
    options[i].sst_file_manager = sfm;
    DB* dbptr = nullptr;
    const std::string db_path = dbname_ + "_" + NumberToString(i);

    listener[i]->EnableAutoRecovery();
    switch (i) {
      case 0:
        // Setup for returning error for the 3rd SST, which would be level 1
        listener[i]->InjectFileCreationError(fault_env[i].get(), 3,
                                             Status::NoSpace("Out of space"));
        break;
      case 1:
        // Setup for returning error after the 1st SST, which would result
        // in a hard error
        listener[i]->InjectFileCreationError(fault_env[i].get(), 2,
                                             Status::NoSpace("Out of space"));
        break;
      default:
        break;
    }
    DestroyDB(db_path, options[i]);
    ASSERT_EQ(DB::Open(options[i], db_path, &dbptr), Status::OK());
    db.emplace_back(dbptr);
  }

  for (auto i = 0; i < kNumDbInstances; ++i) {
    WriteBatch batch;

    for (auto j = 0; j <= 100; ++j) {
      batch.Put(Key(j), RandomString(&rnd, 1024));
    }

    WriteOptions wopts;
    wopts.sync = true;
    ASSERT_EQ(db[i]->Write(wopts, &batch), Status::OK());
    ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::OK());
  }

  def_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));
  for (auto i = 0; i < kNumDbInstances; ++i) {
    WriteBatch batch;

    // Second batch; its flush is the second table file creation of this DB.
    for (auto j = 100; j < 199; ++j) {
      batch.Put(Key(j), RandomString(&rnd, 1024));
    }

    WriteOptions wopts;
    wopts.sync = true;
    ASSERT_EQ(db[i]->Write(wopts, &batch), Status::OK());
    if (i != 1) {
      ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::OK());
    } else {
      // DB 1 is armed to fail on its second table file creation.
      ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::NoSpace());
    }
  }

  for (auto i = 0; i < kNumDbInstances; ++i) {
    Status s = static_cast<DBImpl*>(db[i].get())->TEST_WaitForCompact(true);
    switch (i) {
      case 0:
        ASSERT_EQ(s.severity(), Status::Severity::kSoftError);
        break;
      case 1:
        ASSERT_EQ(s.severity(), Status::Severity::kHardError);
        break;
      case 2:
        ASSERT_EQ(s, Status::OK());
        break;
    }
    fault_env[i]->SetFilesystemActive(true);
  }

  def_env->SetFilesystemActive(true);
  for (auto i = 0; i < kNumDbInstances; ++i) {
    std::string prop;
    // Only the two DBs that saw an error go through recovery.
    if (i < 2) {
      ASSERT_EQ(listener[i]->WaitForRecovery(5000000), true);
    }
    if (i == 1) {
      ASSERT_EQ(static_cast<DBImpl*>(db[i].get())->TEST_WaitForCompact(true),
                Status::OK());
    }
    EXPECT_TRUE(db[i]->GetProperty(
        "rocksdb.num-files-at-level" + NumberToString(0), &prop));
    EXPECT_EQ(atoi(prop.c_str()), 0);
    EXPECT_TRUE(db[i]->GetProperty(
        "rocksdb.num-files-at-level" + NumberToString(1), &prop));
    EXPECT_EQ(atoi(prop.c_str()), 1);
  }

  for (auto i = 0; i < kNumDbInstances; ++i) {
    const std::string db_path = dbname_ + "_" + NumberToString(i);
    fault_env[i]->SetFilesystemActive(true);
    db[i].reset();
    if (getenv("KEEP_DB")) {
      printf("DB is still at %s\n", db_path.c_str());
    } else {
      DestroyDB(db_path, options[i]);
    }
  }
  // Drop every reference to the SstFileManager before its env goes away.
  options.clear();
  sfm.reset();
  def_env.reset();
}
854
855 } // namespace rocksdb
856
main(int argc,char ** argv)857 int main(int argc, char** argv) {
858 rocksdb::port::InstallStackTraceHandler();
859 ::testing::InitGoogleTest(&argc, argv);
860 return RUN_ALL_TESTS();
861 }
862
863 #else
864 #include <stdio.h>
865
// Stub entry point used when ROCKSDB_LITE is defined: the tests in this file
// are compiled out, so report the skip on stderr and exit successfully.
int main(int /*argc*/, char** /*argv*/) {
  fprintf(stderr,
          "SKIPPED as DBErrorHandlingTest is not supported in ROCKSDB_LITE\n");
  return 0;
}
870
871 #endif // ROCKSDB_LITE
872