1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 #ifndef ROCKSDB_LITE
10 
11 #include "db/db_test_util.h"
12 #include "port/stack_trace.h"
13 #include "rocksdb/perf_context.h"
14 #include "rocksdb/sst_file_manager.h"
15 #include "test_util/fault_injection_test_env.h"
16 #if !defined(ROCKSDB_LITE)
17 #include "test_util/sync_point.h"
18 #endif
19 
20 namespace rocksdb {
21 
22 class DBErrorHandlingTest : public DBTestBase {
23  public:
DBErrorHandlingTest()24   DBErrorHandlingTest() : DBTestBase("/db_error_handling_test") {}
25 
GetManifestNameFromLiveFiles()26   std::string GetManifestNameFromLiveFiles() {
27     std::vector<std::string> live_files;
28     uint64_t manifest_size;
29 
30     dbfull()->GetLiveFiles(live_files, &manifest_size, false);
31     for (auto& file : live_files) {
32       uint64_t num = 0;
33       FileType type;
34       if (ParseFileName(file, &num, &type) && type == kDescriptorFile) {
35         return file;
36       }
37     }
38     return "";
39   }
40 };
41 
42 class DBErrorHandlingEnv : public EnvWrapper {
43   public:
DBErrorHandlingEnv()44     DBErrorHandlingEnv() : EnvWrapper(Env::Default()),
45       trig_no_space(false), trig_io_error(false) {}
46 
SetTrigNoSpace()47     void SetTrigNoSpace() {trig_no_space = true;}
SetTrigIoError()48     void SetTrigIoError() {trig_io_error = true;}
49   private:
50     bool trig_no_space;
51     bool trig_io_error;
52 };
53 
54 class ErrorHandlerListener : public EventListener {
55  public:
ErrorHandlerListener()56   ErrorHandlerListener()
57       : mutex_(),
58         cv_(&mutex_),
59         no_auto_recovery_(false),
60         recovery_complete_(false),
61         file_creation_started_(false),
62         override_bg_error_(false),
63         file_count_(0),
64         fault_env_(nullptr) {}
65 
OnTableFileCreationStarted(const TableFileCreationBriefInfo &)66   void OnTableFileCreationStarted(
67       const TableFileCreationBriefInfo& /*ti*/) override {
68     InstrumentedMutexLock l(&mutex_);
69     file_creation_started_ = true;
70     if (file_count_ > 0) {
71       if (--file_count_ == 0) {
72         fault_env_->SetFilesystemActive(false, file_creation_error_);
73         file_creation_error_ = Status::OK();
74       }
75     }
76     cv_.SignalAll();
77   }
78 
OnErrorRecoveryBegin(BackgroundErrorReason,Status,bool * auto_recovery)79   void OnErrorRecoveryBegin(BackgroundErrorReason /*reason*/,
80                             Status /*bg_error*/,
81                             bool* auto_recovery) override {
82     if (*auto_recovery && no_auto_recovery_) {
83       *auto_recovery = false;
84     }
85   }
86 
OnErrorRecoveryCompleted(Status)87   void OnErrorRecoveryCompleted(Status /*old_bg_error*/) override {
88     InstrumentedMutexLock l(&mutex_);
89     recovery_complete_ = true;
90     cv_.SignalAll();
91   }
92 
WaitForRecovery(uint64_t)93   bool WaitForRecovery(uint64_t /*abs_time_us*/) {
94     InstrumentedMutexLock l(&mutex_);
95     while (!recovery_complete_) {
96       cv_.Wait(/*abs_time_us*/);
97     }
98     if (recovery_complete_) {
99       recovery_complete_ = false;
100       return true;
101     }
102     return false;
103   }
104 
WaitForTableFileCreationStarted(uint64_t)105   void WaitForTableFileCreationStarted(uint64_t /*abs_time_us*/) {
106     InstrumentedMutexLock l(&mutex_);
107     while (!file_creation_started_) {
108       cv_.Wait(/*abs_time_us*/);
109     }
110     file_creation_started_ = false;
111   }
112 
OnBackgroundError(BackgroundErrorReason,Status * bg_error)113   void OnBackgroundError(BackgroundErrorReason /*reason*/,
114                          Status* bg_error) override {
115     if (override_bg_error_) {
116       *bg_error = bg_error_;
117       override_bg_error_ = false;
118     }
119   }
120 
EnableAutoRecovery(bool enable=true)121   void EnableAutoRecovery(bool enable = true) { no_auto_recovery_ = !enable; }
122 
OverrideBGError(Status bg_err)123   void OverrideBGError(Status bg_err) {
124     bg_error_ = bg_err;
125     override_bg_error_ = true;
126   }
127 
InjectFileCreationError(FaultInjectionTestEnv * env,int file_count,Status s)128   void InjectFileCreationError(FaultInjectionTestEnv* env, int file_count,
129                                Status s) {
130     fault_env_ = env;
131     file_count_ = file_count;
132     file_creation_error_ = s;
133   }
134 
135  private:
136   InstrumentedMutex mutex_;
137   InstrumentedCondVar cv_;
138   bool no_auto_recovery_;
139   bool recovery_complete_;
140   bool file_creation_started_;
141   bool override_bg_error_;
142   int file_count_;
143   Status file_creation_error_;
144   Status bg_error_;
145   FaultInjectionTestEnv* fault_env_;
146 };
147 
// With auto recovery disabled, a NoSpace error injected at the start of a
// flush job must surface as a hard error from Flush(). Once the filesystem is
// reactivated, a manual Resume() must succeed and the key written before the
// failure must be readable after a reopen.
TEST_F(DBErrorHandlingTest, FLushWriteError) {
  std::unique_ptr<FaultInjectionTestEnv> fault_env(
      new FaultInjectionTestEnv(Env::Default()));
  std::shared_ptr<ErrorHandlerListener> listener(new ErrorHandlerListener());
  Options options = GetDefaultOptions();
  options.create_if_missing = true;
  options.env = fault_env.get();
  options.listeners.emplace_back(listener);
  Status s;

  listener->EnableAutoRecovery(false);
  DestroyAndReopen(options);

  ASSERT_EQ(Put(Key(0), "val"), Status::OK());
  SyncPoint::GetInstance()->SetCallBack(
      "FlushJob::Start", [&](void *) {
    fault_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));
  });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ(s.severity(), rocksdb::Status::Severity::kHardError);
  SyncPoint::GetInstance()->DisableProcessing();
  // The callback captures the local fault_env by reference; unregister it so
  // it cannot outlive this test body.
  SyncPoint::GetInstance()->ClearAllCallBacks();
  fault_env->SetFilesystemActive(true);
  s = dbfull()->Resume();
  ASSERT_EQ(s, Status::OK());

  Reopen(options);
  ASSERT_EQ("val", Get(Key(0)));
  Destroy(options);
}
178 
// With auto recovery disabled, a NoSpace error injected while writing the
// MANIFEST during a flush must surface as a hard error. After the filesystem
// is reactivated, Resume() must succeed, the live MANIFEST must differ from
// the one seen before the failure, and both keys must survive a reopen.
TEST_F(DBErrorHandlingTest, ManifestWriteError) {
  std::unique_ptr<FaultInjectionTestEnv> fault_env(
      new FaultInjectionTestEnv(Env::Default()));
  std::shared_ptr<ErrorHandlerListener> listener(new ErrorHandlerListener());
  Options options = GetDefaultOptions();
  options.create_if_missing = true;
  options.env = fault_env.get();
  options.listeners.emplace_back(listener);
  Status s;
  std::string old_manifest;
  std::string new_manifest;

  listener->EnableAutoRecovery(false);
  DestroyAndReopen(options);
  old_manifest = GetManifestNameFromLiveFiles();

  // Setup writes happen before any fault is armed, so they must all succeed.
  ASSERT_EQ(Put(Key(0), "val"), Status::OK());
  ASSERT_EQ(Flush(), Status::OK());
  ASSERT_EQ(Put(Key(1), "val"), Status::OK());
  SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::LogAndApply:WriteManifest", [&](void *) {
    fault_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));
  });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ(s.severity(), rocksdb::Status::Severity::kHardError);
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();
  fault_env->SetFilesystemActive(true);
  s = dbfull()->Resume();
  ASSERT_EQ(s, Status::OK());

  new_manifest = GetManifestNameFromLiveFiles();
  ASSERT_NE(new_manifest, old_manifest);

  Reopen(options);
  ASSERT_EQ("val", Get(Key(0)));
  ASSERT_EQ("val", Get(Key(1)));
  Close();
}
219 
// Like ManifestWriteError, but the injection callback is left armed for the
// first Resume() so that it fails as well. Only after the callback is cleared
// does a second Resume() succeed and produce a new MANIFEST.
TEST_F(DBErrorHandlingTest, DoubleManifestWriteError) {
  std::unique_ptr<FaultInjectionTestEnv> fault_env(
      new FaultInjectionTestEnv(Env::Default()));
  std::shared_ptr<ErrorHandlerListener> listener(new ErrorHandlerListener());
  Options options = GetDefaultOptions();
  options.create_if_missing = true;
  options.env = fault_env.get();
  options.listeners.emplace_back(listener);
  Status s;
  std::string old_manifest;
  std::string new_manifest;

  listener->EnableAutoRecovery(false);
  DestroyAndReopen(options);
  old_manifest = GetManifestNameFromLiveFiles();

  // Setup writes happen before any fault is armed, so they must all succeed.
  ASSERT_EQ(Put(Key(0), "val"), Status::OK());
  ASSERT_EQ(Flush(), Status::OK());
  ASSERT_EQ(Put(Key(1), "val"), Status::OK());
  SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::LogAndApply:WriteManifest", [&](void *) {
    fault_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));
  });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ(s.severity(), rocksdb::Status::Severity::kHardError);
  fault_env->SetFilesystemActive(true);

  // This Resume() will attempt to create a new manifest file and fail again
  s = dbfull()->Resume();
  ASSERT_EQ(s.severity(), rocksdb::Status::Severity::kHardError);
  fault_env->SetFilesystemActive(true);
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();

  // A successful Resume() will create a new manifest file
  s = dbfull()->Resume();
  ASSERT_EQ(s, Status::OK());

  new_manifest = GetManifestNameFromLiveFiles();
  ASSERT_NE(new_manifest, old_manifest);

  Reopen(options);
  ASSERT_EQ("val", Get(Key(0)));
  ASSERT_EQ("val", Get(Key(1)));
  Close();
}
267 
TEST_F(DBErrorHandlingTest,CompactionManifestWriteError)268 TEST_F(DBErrorHandlingTest, CompactionManifestWriteError) {
269   std::unique_ptr<FaultInjectionTestEnv> fault_env(
270       new FaultInjectionTestEnv(Env::Default()));
271   std::shared_ptr<ErrorHandlerListener> listener(new ErrorHandlerListener());
272   Options options = GetDefaultOptions();
273   options.create_if_missing = true;
274   options.level0_file_num_compaction_trigger = 2;
275   options.listeners.emplace_back(listener);
276   options.env = fault_env.get();
277   Status s;
278   std::string old_manifest;
279   std::string new_manifest;
280   std::atomic<bool> fail_manifest(false);
281   DestroyAndReopen(options);
282   old_manifest = GetManifestNameFromLiveFiles();
283 
284   Put(Key(0), "val");
285   Put(Key(2), "val");
286   s = Flush();
287   ASSERT_EQ(s, Status::OK());
288 
289   rocksdb::SyncPoint::GetInstance()->LoadDependency(
290       // Wait for flush of 2nd L0 file before starting compaction
291       {{"DBImpl::FlushMemTable:FlushMemTableFinished",
292         "BackgroundCallCompaction:0"},
293       // Wait for compaction to detect manifest write error
294        {"BackgroundCallCompaction:1",
295         "CompactionManifestWriteError:0"},
296       // Make compaction thread wait for error to be cleared
297        {"CompactionManifestWriteError:1",
298         "DBImpl::BackgroundCallCompaction:FoundObsoleteFiles"},
299       // Wait for DB instance to clear bg_error before calling
300       // TEST_WaitForCompact
301        {"SstFileManagerImpl::ClearError",
302         "CompactionManifestWriteError:2"}});
303   // trigger manifest write failure in compaction thread
304   rocksdb::SyncPoint::GetInstance()->SetCallBack(
305       "BackgroundCallCompaction:0", [&](void *) {
306       fail_manifest.store(true);
307       });
308   rocksdb::SyncPoint::GetInstance()->SetCallBack(
309       "VersionSet::LogAndApply:WriteManifest", [&](void *) {
310       if (fail_manifest.load()) {
311         fault_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));
312       }
313       });
314   rocksdb::SyncPoint::GetInstance()->EnableProcessing();
315 
316   Put(Key(1), "val");
317   // This Flush will trigger a compaction, which will fail when appending to
318   // the manifest
319   s = Flush();
320   ASSERT_EQ(s, Status::OK());
321 
322   TEST_SYNC_POINT("CompactionManifestWriteError:0");
323   // Clear all errors so when the compaction is retried, it will succeed
324   fault_env->SetFilesystemActive(true);
325   rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
326   TEST_SYNC_POINT("CompactionManifestWriteError:1");
327   TEST_SYNC_POINT("CompactionManifestWriteError:2");
328 
329   s = dbfull()->TEST_WaitForCompact();
330   rocksdb::SyncPoint::GetInstance()->DisableProcessing();
331   ASSERT_EQ(s, Status::OK());
332 
333   new_manifest = GetManifestNameFromLiveFiles();
334   ASSERT_NE(new_manifest, old_manifest);
335   Reopen(options);
336   ASSERT_EQ("val", Get(Key(0)));
337   ASSERT_EQ("val", Get(Key(1)));
338   ASSERT_EQ("val", Get(Key(2)));
339   Close();
340 }
341 
// Deactivates the filesystem with NoSpace when a background compaction
// starts, with auto recovery disabled and the listener overriding the
// reported error to a hard NoSpace error. Expects TEST_WaitForCompact() to
// report a hard error and a manual Resume() to succeed once the filesystem is
// active again.
TEST_F(DBErrorHandlingTest, CompactionWriteError) {
  std::unique_ptr<FaultInjectionTestEnv> fault_env(
      new FaultInjectionTestEnv(Env::Default()));
  std::shared_ptr<ErrorHandlerListener> listener(new ErrorHandlerListener());
  Options options = GetDefaultOptions();
  options.create_if_missing = true;
  options.level0_file_num_compaction_trigger = 2;
  options.listeners.emplace_back(listener);
  options.env = fault_env.get();
  Status s;
  DestroyAndReopen(options);

  ASSERT_EQ(Put(Key(0), "va;"), Status::OK());
  ASSERT_EQ(Put(Key(2), "va;"), Status::OK());
  s = Flush();
  ASSERT_EQ(s, Status::OK());

  listener->OverrideBGError(
      Status(Status::NoSpace(), Status::Severity::kHardError)
      );
  listener->EnableAutoRecovery(false);
  // Hold the compaction back until the second flush has finished.
  rocksdb::SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::FlushMemTable:FlushMemTableFinished",
        "BackgroundCallCompaction:0"}});
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "BackgroundCallCompaction:0", [&](void *) {
      fault_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));
      });
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_EQ(Put(Key(1), "val"), Status::OK());
  s = Flush();
  ASSERT_EQ(s, Status::OK());

  s = dbfull()->TEST_WaitForCompact();
  ASSERT_EQ(s.severity(), rocksdb::Status::Severity::kHardError);

  fault_env->SetFilesystemActive(true);
  s = dbfull()->Resume();
  ASSERT_EQ(s, Status::OK());
  Destroy(options);
}
384 
// Deactivates the filesystem with a Corruption status when a background
// compaction starts. Expects TEST_WaitForCompact() to report an
// unrecoverable error and Resume() to fail even after the filesystem is
// reactivated.
TEST_F(DBErrorHandlingTest, CorruptionError) {
  std::unique_ptr<FaultInjectionTestEnv> fault_env(
      new FaultInjectionTestEnv(Env::Default()));
  Options options = GetDefaultOptions();
  options.create_if_missing = true;
  options.level0_file_num_compaction_trigger = 2;
  options.env = fault_env.get();
  Status s;
  DestroyAndReopen(options);

  ASSERT_EQ(Put(Key(0), "va;"), Status::OK());
  ASSERT_EQ(Put(Key(2), "va;"), Status::OK());
  s = Flush();
  ASSERT_EQ(s, Status::OK());

  // Hold the compaction back until the second flush has finished.
  rocksdb::SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::FlushMemTable:FlushMemTableFinished",
        "BackgroundCallCompaction:0"}});
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "BackgroundCallCompaction:0", [&](void *) {
      fault_env->SetFilesystemActive(false, Status::Corruption("Corruption"));
      });
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_EQ(Put(Key(1), "val"), Status::OK());
  s = Flush();
  ASSERT_EQ(s, Status::OK());

  s = dbfull()->TEST_WaitForCompact();
  ASSERT_EQ(s.severity(), rocksdb::Status::Severity::kUnrecoverableError);

  fault_env->SetFilesystemActive(true);
  s = dbfull()->Resume();
  ASSERT_NE(s, Status::OK());
  Destroy(options);
}
421 
// With auto recovery enabled, a NoSpace error injected at the start of a
// flush job must surface as a hard error from Flush(). After the filesystem
// is reactivated the listener must observe recovery completing, a new Put()
// must succeed, and both keys must be readable after a reopen.
TEST_F(DBErrorHandlingTest, AutoRecoverFlushError) {
  std::unique_ptr<FaultInjectionTestEnv> fault_env(
      new FaultInjectionTestEnv(Env::Default()));
  std::shared_ptr<ErrorHandlerListener> listener(new ErrorHandlerListener());
  Options options = GetDefaultOptions();
  options.create_if_missing = true;
  options.env = fault_env.get();
  options.listeners.emplace_back(listener);
  Status s;

  listener->EnableAutoRecovery();
  DestroyAndReopen(options);

  ASSERT_EQ(Put(Key(0), "val"), Status::OK());
  SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
    fault_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));
  });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ(s.severity(), rocksdb::Status::Severity::kHardError);
  SyncPoint::GetInstance()->DisableProcessing();
  // The callback captures the local fault_env by reference; unregister it so
  // it cannot outlive this test body.
  SyncPoint::GetInstance()->ClearAllCallBacks();
  fault_env->SetFilesystemActive(true);
  ASSERT_EQ(listener->WaitForRecovery(5000000), true);

  s = Put(Key(1), "val");
  ASSERT_EQ(s, Status::OK());

  Reopen(options);
  ASSERT_EQ("val", Get(Key(0)));
  ASSERT_EQ("val", Get(Key(1)));
  Destroy(options);
}
454 
// With auto recovery enabled and the filesystem left inactive after a failed
// flush, the database must still be closable while recovery is in progress.
TEST_F(DBErrorHandlingTest, FailRecoverFlushError) {
  std::unique_ptr<FaultInjectionTestEnv> fault_env(
      new FaultInjectionTestEnv(Env::Default()));
  std::shared_ptr<ErrorHandlerListener> listener(new ErrorHandlerListener());
  Options options = GetDefaultOptions();
  options.create_if_missing = true;
  options.env = fault_env.get();
  options.listeners.emplace_back(listener);
  Status s;

  listener->EnableAutoRecovery();
  DestroyAndReopen(options);

  ASSERT_EQ(Put(Key(0), "val"), Status::OK());
  SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
    fault_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));
  });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ(s.severity(), rocksdb::Status::Severity::kHardError);
  // We should be able to shutdown the database while auto recovery is going
  // on in the background
  Close();
  // The DB is closed, so the callback (which captures the local fault_env by
  // reference) is no longer needed; unregister it before the locals go away.
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  DestroyDB(dbname_, options);
}
480 
TEST_F(DBErrorHandlingTest,WALWriteError)481 TEST_F(DBErrorHandlingTest, WALWriteError) {
482   std::unique_ptr<FaultInjectionTestEnv> fault_env(
483       new FaultInjectionTestEnv(Env::Default()));
484   std::shared_ptr<ErrorHandlerListener> listener(new ErrorHandlerListener());
485   Options options = GetDefaultOptions();
486   options.create_if_missing = true;
487   options.writable_file_max_buffer_size = 32768;
488   options.env = fault_env.get();
489   options.listeners.emplace_back(listener);
490   Status s;
491   Random rnd(301);
492 
493   listener->EnableAutoRecovery();
494   DestroyAndReopen(options);
495 
496   {
497     WriteBatch batch;
498 
499     for (auto i = 0; i<100; ++i) {
500       batch.Put(Key(i), RandomString(&rnd, 1024));
501     }
502 
503     WriteOptions wopts;
504     wopts.sync = true;
505     ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK());
506   };
507 
508   {
509     WriteBatch batch;
510     int write_error = 0;
511 
512     for (auto i = 100; i<199; ++i) {
513       batch.Put(Key(i), RandomString(&rnd, 1024));
514     }
515 
516     SyncPoint::GetInstance()->SetCallBack("WritableFileWriter::Append:BeforePrepareWrite", [&](void*) {
517       write_error++;
518       if (write_error > 2) {
519         fault_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));
520       }
521     });
522     SyncPoint::GetInstance()->EnableProcessing();
523     WriteOptions wopts;
524     wopts.sync = true;
525     s = dbfull()->Write(wopts, &batch);
526     ASSERT_EQ(s, s.NoSpace());
527   }
528   SyncPoint::GetInstance()->DisableProcessing();
529   fault_env->SetFilesystemActive(true);
530   ASSERT_EQ(listener->WaitForRecovery(5000000), true);
531   for (auto i=0; i<199; ++i) {
532     if (i < 100) {
533       ASSERT_NE(Get(Key(i)), "NOT_FOUND");
534     } else {
535       ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
536     }
537   }
538   Reopen(options);
539   for (auto i=0; i<199; ++i) {
540     if (i < 100) {
541       ASSERT_NE(Get(Key(i)), "NOT_FOUND");
542     } else {
543       ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
544     }
545   }
546   Close();
547 }
548 
TEST_F(DBErrorHandlingTest,MultiCFWALWriteError)549 TEST_F(DBErrorHandlingTest, MultiCFWALWriteError) {
550   std::unique_ptr<FaultInjectionTestEnv> fault_env(
551       new FaultInjectionTestEnv(Env::Default()));
552   std::shared_ptr<ErrorHandlerListener> listener(new ErrorHandlerListener());
553   Options options = GetDefaultOptions();
554   options.create_if_missing = true;
555   options.writable_file_max_buffer_size = 32768;
556   options.env = fault_env.get();
557   options.listeners.emplace_back(listener);
558   Status s;
559   Random rnd(301);
560 
561   listener->EnableAutoRecovery();
562   CreateAndReopenWithCF({"one", "two", "three"}, options);
563 
564   {
565     WriteBatch batch;
566 
567     for (auto i = 1; i < 4; ++i) {
568       for (auto j = 0; j < 100; ++j) {
569         batch.Put(handles_[i], Key(j), RandomString(&rnd, 1024));
570       }
571     }
572 
573     WriteOptions wopts;
574     wopts.sync = true;
575     ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK());
576   };
577 
578   {
579     WriteBatch batch;
580     int write_error = 0;
581 
582     // Write to one CF
583     for (auto i = 100; i < 199; ++i) {
584       batch.Put(handles_[2], Key(i), RandomString(&rnd, 1024));
585     }
586 
587     SyncPoint::GetInstance()->SetCallBack(
588         "WritableFileWriter::Append:BeforePrepareWrite", [&](void*) {
589           write_error++;
590           if (write_error > 2) {
591             fault_env->SetFilesystemActive(false,
592                                            Status::NoSpace("Out of space"));
593           }
594         });
595     SyncPoint::GetInstance()->EnableProcessing();
596     WriteOptions wopts;
597     wopts.sync = true;
598     s = dbfull()->Write(wopts, &batch);
599     ASSERT_EQ(s, s.NoSpace());
600   }
601   SyncPoint::GetInstance()->DisableProcessing();
602   fault_env->SetFilesystemActive(true);
603   ASSERT_EQ(listener->WaitForRecovery(5000000), true);
604 
605   for (auto i = 1; i < 4; ++i) {
606     // Every CF should have been flushed
607     ASSERT_EQ(NumTableFilesAtLevel(0, i), 1);
608   }
609 
610   for (auto i = 1; i < 4; ++i) {
611     for (auto j = 0; j < 199; ++j) {
612       if (j < 100) {
613         ASSERT_NE(Get(i, Key(j)), "NOT_FOUND");
614       } else {
615         ASSERT_EQ(Get(i, Key(j)), "NOT_FOUND");
616       }
617     }
618   }
619   ReopenWithColumnFamilies({"default", "one", "two", "three"}, options);
620   for (auto i = 1; i < 4; ++i) {
621     for (auto j = 0; j < 199; ++j) {
622       if (j < 100) {
623         ASSERT_NE(Get(i, Key(j)), "NOT_FOUND");
624       } else {
625         ASSERT_EQ(Get(i, Key(j)), "NOT_FOUND");
626       }
627     }
628   }
629   Close();
630 }
631 
TEST_F(DBErrorHandlingTest,MultiDBCompactionError)632 TEST_F(DBErrorHandlingTest, MultiDBCompactionError) {
633   FaultInjectionTestEnv* def_env = new FaultInjectionTestEnv(Env::Default());
634   std::vector<std::unique_ptr<FaultInjectionTestEnv>> fault_env;
635   std::vector<Options> options;
636   std::vector<std::shared_ptr<ErrorHandlerListener>> listener;
637   std::vector<DB*> db;
638   std::shared_ptr<SstFileManager> sfm(NewSstFileManager(def_env));
639   int kNumDbInstances = 3;
640   Random rnd(301);
641 
642   for (auto i = 0; i < kNumDbInstances; ++i) {
643     listener.emplace_back(new ErrorHandlerListener());
644     options.emplace_back(GetDefaultOptions());
645     fault_env.emplace_back(new FaultInjectionTestEnv(Env::Default()));
646     options[i].create_if_missing = true;
647     options[i].level0_file_num_compaction_trigger = 2;
648     options[i].writable_file_max_buffer_size = 32768;
649     options[i].env = fault_env[i].get();
650     options[i].listeners.emplace_back(listener[i]);
651     options[i].sst_file_manager = sfm;
652     DB* dbptr;
653     char buf[16];
654 
655     listener[i]->EnableAutoRecovery();
656     // Setup for returning error for the 3rd SST, which would be level 1
657     listener[i]->InjectFileCreationError(fault_env[i].get(), 3,
658                                          Status::NoSpace("Out of space"));
659     snprintf(buf, sizeof(buf), "_%d", i);
660     DestroyDB(dbname_ + std::string(buf), options[i]);
661     ASSERT_EQ(DB::Open(options[i], dbname_ + std::string(buf), &dbptr),
662               Status::OK());
663     db.emplace_back(dbptr);
664   }
665 
666   for (auto i = 0; i < kNumDbInstances; ++i) {
667     WriteBatch batch;
668 
669     for (auto j = 0; j <= 100; ++j) {
670       batch.Put(Key(j), RandomString(&rnd, 1024));
671     }
672 
673     WriteOptions wopts;
674     wopts.sync = true;
675     ASSERT_EQ(db[i]->Write(wopts, &batch), Status::OK());
676     ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::OK());
677   }
678 
679   def_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));
680   for (auto i = 0; i < kNumDbInstances; ++i) {
681     WriteBatch batch;
682 
683     // Write to one CF
684     for (auto j = 100; j < 199; ++j) {
685       batch.Put(Key(j), RandomString(&rnd, 1024));
686     }
687 
688     WriteOptions wopts;
689     wopts.sync = true;
690     ASSERT_EQ(db[i]->Write(wopts, &batch), Status::OK());
691     ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::OK());
692   }
693 
694   for (auto i = 0; i < kNumDbInstances; ++i) {
695     Status s = static_cast<DBImpl*>(db[i])->TEST_WaitForCompact(true);
696     ASSERT_EQ(s.severity(), Status::Severity::kSoftError);
697     fault_env[i]->SetFilesystemActive(true);
698   }
699 
700   def_env->SetFilesystemActive(true);
701   for (auto i = 0; i < kNumDbInstances; ++i) {
702     std::string prop;
703     ASSERT_EQ(listener[i]->WaitForRecovery(5000000), true);
704     EXPECT_TRUE(db[i]->GetProperty(
705         "rocksdb.num-files-at-level" + NumberToString(0), &prop));
706     EXPECT_EQ(atoi(prop.c_str()), 0);
707     EXPECT_TRUE(db[i]->GetProperty(
708         "rocksdb.num-files-at-level" + NumberToString(1), &prop));
709     EXPECT_EQ(atoi(prop.c_str()), 1);
710   }
711 
712   for (auto i = 0; i < kNumDbInstances; ++i) {
713     char buf[16];
714     snprintf(buf, sizeof(buf), "_%d", i);
715     delete db[i];
716     fault_env[i]->SetFilesystemActive(true);
717     if (getenv("KEEP_DB")) {
718       printf("DB is still at %s%s\n", dbname_.c_str(), buf);
719     } else {
720       Status s = DestroyDB(dbname_ + std::string(buf), options[i]);
721     }
722   }
723   options.clear();
724   sfm.reset();
725   delete def_env;
726 }
727 
TEST_F(DBErrorHandlingTest,MultiDBVariousErrors)728 TEST_F(DBErrorHandlingTest, MultiDBVariousErrors) {
729   FaultInjectionTestEnv* def_env = new FaultInjectionTestEnv(Env::Default());
730   std::vector<std::unique_ptr<FaultInjectionTestEnv>> fault_env;
731   std::vector<Options> options;
732   std::vector<std::shared_ptr<ErrorHandlerListener>> listener;
733   std::vector<DB*> db;
734   std::shared_ptr<SstFileManager> sfm(NewSstFileManager(def_env));
735   int kNumDbInstances = 3;
736   Random rnd(301);
737 
738   for (auto i = 0; i < kNumDbInstances; ++i) {
739     listener.emplace_back(new ErrorHandlerListener());
740     options.emplace_back(GetDefaultOptions());
741     fault_env.emplace_back(new FaultInjectionTestEnv(Env::Default()));
742     options[i].create_if_missing = true;
743     options[i].level0_file_num_compaction_trigger = 2;
744     options[i].writable_file_max_buffer_size = 32768;
745     options[i].env = fault_env[i].get();
746     options[i].listeners.emplace_back(listener[i]);
747     options[i].sst_file_manager = sfm;
748     DB* dbptr;
749     char buf[16];
750 
751     listener[i]->EnableAutoRecovery();
752     switch (i) {
753       case 0:
754         // Setup for returning error for the 3rd SST, which would be level 1
755         listener[i]->InjectFileCreationError(fault_env[i].get(), 3,
756                                              Status::NoSpace("Out of space"));
757         break;
758       case 1:
759         // Setup for returning error after the 1st SST, which would result
760         // in a hard error
761         listener[i]->InjectFileCreationError(fault_env[i].get(), 2,
762                                              Status::NoSpace("Out of space"));
763         break;
764       default:
765         break;
766     }
767     snprintf(buf, sizeof(buf), "_%d", i);
768     DestroyDB(dbname_ + std::string(buf), options[i]);
769     ASSERT_EQ(DB::Open(options[i], dbname_ + std::string(buf), &dbptr),
770               Status::OK());
771     db.emplace_back(dbptr);
772   }
773 
774   for (auto i = 0; i < kNumDbInstances; ++i) {
775     WriteBatch batch;
776 
777     for (auto j = 0; j <= 100; ++j) {
778       batch.Put(Key(j), RandomString(&rnd, 1024));
779     }
780 
781     WriteOptions wopts;
782     wopts.sync = true;
783     ASSERT_EQ(db[i]->Write(wopts, &batch), Status::OK());
784     ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::OK());
785   }
786 
787   def_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));
788   for (auto i = 0; i < kNumDbInstances; ++i) {
789     WriteBatch batch;
790 
791     // Write to one CF
792     for (auto j = 100; j < 199; ++j) {
793       batch.Put(Key(j), RandomString(&rnd, 1024));
794     }
795 
796     WriteOptions wopts;
797     wopts.sync = true;
798     ASSERT_EQ(db[i]->Write(wopts, &batch), Status::OK());
799     if (i != 1) {
800       ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::OK());
801     } else {
802       ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::NoSpace());
803     }
804   }
805 
806   for (auto i = 0; i < kNumDbInstances; ++i) {
807     Status s = static_cast<DBImpl*>(db[i])->TEST_WaitForCompact(true);
808     switch (i) {
809       case 0:
810         ASSERT_EQ(s.severity(), Status::Severity::kSoftError);
811         break;
812       case 1:
813         ASSERT_EQ(s.severity(), Status::Severity::kHardError);
814         break;
815       case 2:
816         ASSERT_EQ(s, Status::OK());
817         break;
818     }
819     fault_env[i]->SetFilesystemActive(true);
820   }
821 
822   def_env->SetFilesystemActive(true);
823   for (auto i = 0; i < kNumDbInstances; ++i) {
824     std::string prop;
825     if (i < 2) {
826       ASSERT_EQ(listener[i]->WaitForRecovery(5000000), true);
827     }
828     if (i == 1) {
829       ASSERT_EQ(static_cast<DBImpl*>(db[i])->TEST_WaitForCompact(true),
830                 Status::OK());
831     }
832     EXPECT_TRUE(db[i]->GetProperty(
833         "rocksdb.num-files-at-level" + NumberToString(0), &prop));
834     EXPECT_EQ(atoi(prop.c_str()), 0);
835     EXPECT_TRUE(db[i]->GetProperty(
836         "rocksdb.num-files-at-level" + NumberToString(1), &prop));
837     EXPECT_EQ(atoi(prop.c_str()), 1);
838   }
839 
840   for (auto i = 0; i < kNumDbInstances; ++i) {
841     char buf[16];
842     snprintf(buf, sizeof(buf), "_%d", i);
843     fault_env[i]->SetFilesystemActive(true);
844     delete db[i];
845     if (getenv("KEEP_DB")) {
846       printf("DB is still at %s%s\n", dbname_.c_str(), buf);
847     } else {
848       DestroyDB(dbname_ + std::string(buf), options[i]);
849     }
850   }
851   options.clear();
852   delete def_env;
853 }
854 
855 }  // namespace rocksdb
856 
main(int argc,char ** argv)857 int main(int argc, char** argv) {
858   rocksdb::port::InstallStackTraceHandler();
859   ::testing::InitGoogleTest(&argc, argv);
860   return RUN_ALL_TESTS();
861 }
862 
863 #else
864 #include <stdio.h>
865 
// ROCKSDB_LITE build: every test in this file is compiled out, so report the
// skip on stderr and exit successfully. The message names this test suite
// rather than the unrelated Cuckoo table feature it previously mentioned.
int main(int /*argc*/, char** /*argv*/) {
  fprintf(stderr,
          "SKIPPED as DBErrorHandlingTest is not supported in ROCKSDB_LITE\n");
  return 0;
}
870 
871 #endif  // ROCKSDB_LITE
872