1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 #ifndef ROCKSDB_LITE
10 
11 #include "db/db_test_util.h"
12 #include "file/sst_file_manager_impl.h"
13 #include "port/stack_trace.h"
14 #include "rocksdb/io_status.h"
15 #include "rocksdb/sst_file_manager.h"
16 #if !defined(ROCKSDB_LITE)
17 #include "test_util/sync_point.h"
18 #endif
19 #include "util/random.h"
20 #include "utilities/fault_injection_env.h"
21 #include "utilities/fault_injection_fs.h"
22 
23 namespace ROCKSDB_NAMESPACE {
24 
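// Test fixture that layers a FaultInjectionTestFS (wrapped in a
// CompositeEnvWrapper) over the base Env so tests can inject and clear
// filesystem errors.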
25 class DBErrorHandlingFSTest : public DBTestBase {
26  public:
27   DBErrorHandlingFSTest()
28       : DBTestBase("/db_error_handling_fs_test", /*env_do_fsync=*/true) {
29     fault_fs_.reset(new FaultInjectionTestFS(env_->GetFileSystem()));
30     fault_env_.reset(new CompositeEnvWrapper(env_, fault_fs_));
31   }
32 
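  // Returns the name of the current MANIFEST (descriptor) file from the
  // live-file list, or an empty string if it cannot be determined.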
33   std::string GetManifestNameFromLiveFiles() {
34     std::vector<std::string> live_files;
35     uint64_t manifest_size;
36 
37     Status s = dbfull()->GetLiveFiles(live_files, &manifest_size, false);
38     if (!s.ok()) {
39       return "";
40     }
41     for (auto& file : live_files) {
42       uint64_t num = 0;
43       FileType type;
44       if (ParseFileName(file, &num, &type) && type == kDescriptorFile) {
45         return file;
46       }
47     }
48     return "";
49   }
50 
51   std::shared_ptr<FaultInjectionTestFS> fault_fs_;
52   std::unique_ptr<Env> fault_env_;
53 };
54 
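// EventListener used by these tests to inject errors on table file creation,
// optionally override the background error passed to the error handler, and
// wait for error recovery to complete.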
55 class ErrorHandlerFSListener : public EventListener {
56  public:
57   ErrorHandlerFSListener()
58       : mutex_(),
59         cv_(&mutex_),
60         no_auto_recovery_(false),
61         recovery_complete_(false),
62         file_creation_started_(false),
63         override_bg_error_(false),
64         file_count_(0),
65         fault_fs_(nullptr) {}
66   ~ErrorHandlerFSListener() {
67     file_creation_error_.PermitUncheckedError();
68     bg_error_.PermitUncheckedError();
69   }
70 
71   void OnTableFileCreationStarted(
72       const TableFileCreationBriefInfo& /*ti*/) override {
73     InstrumentedMutexLock l(&mutex_);
74     file_creation_started_ = true;
75     if (file_count_ > 0) {
76       if (--file_count_ == 0) {
77         fault_fs_->SetFilesystemActive(false, file_creation_error_);
78         file_creation_error_ = IOStatus::OK();
79       }
80     }
81     cv_.SignalAll();
82   }
83 
84   void OnErrorRecoveryBegin(BackgroundErrorReason /*reason*/, Status bg_error,
85                             bool* auto_recovery) override {
86     bg_error.PermitUncheckedError();
87     if (*auto_recovery && no_auto_recovery_) {
88       *auto_recovery = false;
89     }
90   }
91 
92   void OnErrorRecoveryCompleted(Status old_bg_error) override {
93     InstrumentedMutexLock l(&mutex_);
94     recovery_complete_ = true;
95     cv_.SignalAll();
96     old_bg_error.PermitUncheckedError();
97   }
98 
99   bool WaitForRecovery(uint64_t /*abs_time_us*/) {
100     InstrumentedMutexLock l(&mutex_);
101     while (!recovery_complete_) {
102       cv_.Wait(/*abs_time_us*/);
103     }
104     if (recovery_complete_) {
105       recovery_complete_ = false;
106       return true;
107     }
108     return false;
109   }
110 
111   void WaitForTableFileCreationStarted(uint64_t /*abs_time_us*/) {
112     InstrumentedMutexLock l(&mutex_);
113     while (!file_creation_started_) {
114       cv_.Wait(/*abs_time_us*/);
115     }
116     file_creation_started_ = false;
117   }
118 
119   void OnBackgroundError(BackgroundErrorReason /*reason*/,
120                          Status* bg_error) override {
121     if (override_bg_error_) {
122       *bg_error = bg_error_;
123       override_bg_error_ = false;
124     }
125   }
126 
127   void EnableAutoRecovery(bool enable = true) { no_auto_recovery_ = !enable; }
128 
129   void OverrideBGError(Status bg_err) {
130     bg_error_ = bg_err;
131     override_bg_error_ = true;
132   }
133 
134   void InjectFileCreationError(FaultInjectionTestFS* fs, int file_count,
135                                IOStatus io_s) {
136     fault_fs_ = fs;
137     file_count_ = file_count;
138     file_creation_error_ = io_s;
139   }
140 
141  private:
142   InstrumentedMutex mutex_;
143   InstrumentedCondVar cv_;
144   bool no_auto_recovery_;
145   bool recovery_complete_;
146   bool file_creation_started_;
147   bool override_bg_error_;
148   int file_count_;
149   IOStatus file_creation_error_;
150   Status bg_error_;
151   FaultInjectionTestFS* fault_fs_;
152 };
153 
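// Inject a NoSpace error at the start of a flush. The flush is expected to
// fail with a hard background error, and a manual Resume() should recover.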
154 TEST_F(DBErrorHandlingFSTest, FLushWriteError) {
155   std::shared_ptr<ErrorHandlerFSListener> listener(
156       new ErrorHandlerFSListener());
157   Options options = GetDefaultOptions();
158   options.env = fault_env_.get();
159   options.create_if_missing = true;
160   options.listeners.emplace_back(listener);
161   options.statistics = CreateDBStatistics();
162   Status s;
163 
164   listener->EnableAutoRecovery(false);
165   DestroyAndReopen(options);
166 
167   ASSERT_OK(Put(Key(0), "val"));
168   SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
169     fault_fs_->SetFilesystemActive(false, IOStatus::NoSpace("Out of space"));
170   });
171   SyncPoint::GetInstance()->EnableProcessing();
172   s = Flush();
173   ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
174   SyncPoint::GetInstance()->DisableProcessing();
175   fault_fs_->SetFilesystemActive(true);
176   s = dbfull()->Resume();
177   ASSERT_OK(s);
178   ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
179                    ERROR_HANDLER_BG_ERROR_COUNT));
180   ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
181                    ERROR_HANDLER_BG_IO_ERROR_COUNT));
182   ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
183                    ERROR_HANDLER_BG_RETRYABLE_IO_ERROR_COUNT));
184   ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
185                    ERROR_HANDLER_AUTORESUME_COUNT));
186   ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
187                    ERROR_HANDLER_AUTORESUME_RETRY_TOTAL_COUNT));
188   ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
189                    ERROR_HANDLER_AUTORESUME_SUCCESS_COUNT));
190 
191   Reopen(options);
192   ASSERT_EQ("val", Get(Key(0)));
193   Destroy(options);
194 }
195 
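// Inject a retryable IOError at several points while the flush builds its
// table file. Each failure is expected to be a soft error that Resume() can
// clear.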
196 TEST_F(DBErrorHandlingFSTest, FLushWriteRetryableError) {
197   std::shared_ptr<ErrorHandlerFSListener> listener(
198       new ErrorHandlerFSListener());
199   Options options = GetDefaultOptions();
200   options.env = fault_env_.get();
201   options.create_if_missing = true;
202   options.listeners.emplace_back(listener);
203   options.max_bgerror_resume_count = 0;
204   options.statistics = CreateDBStatistics();
205   Status s;
206 
207   listener->EnableAutoRecovery(false);
208   DestroyAndReopen(options);
209 
210   IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
211   error_msg.SetRetryable(true);
212 
213   ASSERT_OK(Put(Key(1), "val1"));
214   SyncPoint::GetInstance()->SetCallBack(
215       "BuildTable:BeforeFinishBuildTable",
216       [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
217   SyncPoint::GetInstance()->EnableProcessing();
218   s = Flush();
219   ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
220   SyncPoint::GetInstance()->DisableProcessing();
221   fault_fs_->SetFilesystemActive(true);
222   s = dbfull()->Resume();
223   ASSERT_OK(s);
224   ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
225                    ERROR_HANDLER_BG_ERROR_COUNT));
226   ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
227                    ERROR_HANDLER_BG_IO_ERROR_COUNT));
228   ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
229                    ERROR_HANDLER_BG_RETRYABLE_IO_ERROR_COUNT));
230   ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
231                    ERROR_HANDLER_AUTORESUME_COUNT));
232   ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
233                    ERROR_HANDLER_AUTORESUME_RETRY_TOTAL_COUNT));
234   ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
235                    ERROR_HANDLER_AUTORESUME_SUCCESS_COUNT));
236   Reopen(options);
237   ASSERT_EQ("val1", Get(Key(1)));
238 
239   ASSERT_OK(Put(Key(2), "val2"));
240   SyncPoint::GetInstance()->SetCallBack(
241       "BuildTable:BeforeSyncTable",
242       [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
243   SyncPoint::GetInstance()->EnableProcessing();
244   s = Flush();
245   ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
246   SyncPoint::GetInstance()->DisableProcessing();
247   fault_fs_->SetFilesystemActive(true);
248   s = dbfull()->Resume();
249   ASSERT_OK(s);
250   Reopen(options);
251   ASSERT_EQ("val2", Get(Key(2)));
252 
253   ASSERT_OK(Put(Key(3), "val3"));
254   SyncPoint::GetInstance()->SetCallBack(
255       "BuildTable:BeforeCloseTableFile",
256       [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
257   SyncPoint::GetInstance()->EnableProcessing();
258   s = Flush();
259   ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
260   SyncPoint::GetInstance()->DisableProcessing();
261   fault_fs_->SetFilesystemActive(true);
262   s = dbfull()->Resume();
263   ASSERT_OK(s);
264   Reopen(options);
265   ASSERT_EQ("val3", Get(Key(3)));
266 
267   Destroy(options);
268 }
269 
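// Inject a file-scoped data loss error (and, at the end, a retryable
// filesystem-scoped error) during table building; each is expected to be a
// soft error recoverable via Resume().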
270 TEST_F(DBErrorHandlingFSTest, FLushWriteFileScopeError) {
271   std::shared_ptr<ErrorHandlerFSListener> listener(
272       new ErrorHandlerFSListener());
273   Options options = GetDefaultOptions();
274   options.env = fault_env_.get();
275   options.create_if_missing = true;
276   options.listeners.emplace_back(listener);
277   options.max_bgerror_resume_count = 0;
278   Status s;
279 
280   listener->EnableAutoRecovery(false);
281   DestroyAndReopen(options);
282 
283   IOStatus error_msg = IOStatus::IOError("File Scope Data Loss Error");
284   error_msg.SetDataLoss(true);
285   error_msg.SetScope(
286       ROCKSDB_NAMESPACE::IOStatus::IOErrorScope::kIOErrorScopeFile);
287   error_msg.SetRetryable(false);
288 
289   ASSERT_OK(Put(Key(1), "val1"));
290   SyncPoint::GetInstance()->SetCallBack(
291       "BuildTable:BeforeFinishBuildTable",
292       [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
293   SyncPoint::GetInstance()->EnableProcessing();
294   s = Flush();
295   ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
296   SyncPoint::GetInstance()->DisableProcessing();
297   fault_fs_->SetFilesystemActive(true);
298   s = dbfull()->Resume();
299   ASSERT_OK(s);
300   Reopen(options);
301   ASSERT_EQ("val1", Get(Key(1)));
302 
303   ASSERT_OK(Put(Key(2), "val2"));
304   SyncPoint::GetInstance()->SetCallBack(
305       "BuildTable:BeforeSyncTable",
306       [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
307   SyncPoint::GetInstance()->EnableProcessing();
308   s = Flush();
309   ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
310   SyncPoint::GetInstance()->DisableProcessing();
311   fault_fs_->SetFilesystemActive(true);
312   s = dbfull()->Resume();
313   ASSERT_OK(s);
314   Reopen(options);
315   ASSERT_EQ("val2", Get(Key(2)));
316 
317   ASSERT_OK(Put(Key(3), "val3"));
318   SyncPoint::GetInstance()->SetCallBack(
319       "BuildTable:BeforeCloseTableFile",
320       [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
321   SyncPoint::GetInstance()->EnableProcessing();
322   s = Flush();
323   ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
324   SyncPoint::GetInstance()->DisableProcessing();
325   fault_fs_->SetFilesystemActive(true);
326   s = dbfull()->Resume();
327   ASSERT_OK(s);
328   Reopen(options);
329   ASSERT_EQ("val3", Get(Key(3)));
330 
331   // not file scope, but retryable is set
332   error_msg.SetDataLoss(false);
333   error_msg.SetScope(
334       ROCKSDB_NAMESPACE::IOStatus::IOErrorScope::kIOErrorScopeFileSystem);
335   error_msg.SetRetryable(true);
336 
337   ASSERT_OK(Put(Key(3), "val3"));
338   SyncPoint::GetInstance()->SetCallBack(
339       "BuildTable:BeforeCloseTableFile",
340       [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
341   SyncPoint::GetInstance()->EnableProcessing();
342   s = Flush();
343   ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
344   SyncPoint::GetInstance()->DisableProcessing();
345   fault_fs_->SetFilesystemActive(true);
346   s = dbfull()->Resume();
347   ASSERT_OK(s);
348   Reopen(options);
349   ASSERT_EQ("val3", Get(Key(3)));
350 
351   Destroy(options);
352 }
353 
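// Inject a retryable IOError while syncing closed WAL files during flush;
// the test expects this to surface as a hard background error.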
354 TEST_F(DBErrorHandlingFSTest, FLushWALWriteRetryableError) {
355   std::shared_ptr<ErrorHandlerFSListener> listener(
356       new ErrorHandlerFSListener());
357   Options options = GetDefaultOptions();
358   options.env = fault_env_.get();
359   options.create_if_missing = true;
360   options.listeners.emplace_back(listener);
361   options.max_bgerror_resume_count = 0;
362   Status s;
363 
364   IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
365   error_msg.SetRetryable(true);
366 
367   listener->EnableAutoRecovery(false);
368   SyncPoint::GetInstance()->SetCallBack(
369       "DBImpl::SyncClosedLogs:Start",
370       [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
371   SyncPoint::GetInstance()->EnableProcessing();
372 
373   CreateAndReopenWithCF({"pikachu, sdfsdfsdf"}, options);
374 
375   WriteOptions wo = WriteOptions();
376   wo.disableWAL = false;
377   ASSERT_OK(Put(Key(1), "val1", wo));
378 
379   s = Flush();
380   ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
381   SyncPoint::GetInstance()->DisableProcessing();
382   fault_fs_->SetFilesystemActive(true);
383   auto cfh = dbfull()->GetColumnFamilyHandle(1);
384   s = dbfull()->DropColumnFamily(cfh);
385 
386   s = dbfull()->Resume();
387   ASSERT_OK(s);
388   ASSERT_EQ("val1", Get(Key(1)));
389   ASSERT_OK(Put(Key(3), "val3", wo));
390   ASSERT_EQ("val3", Get(Key(3)));
391   s = Flush();
392   ASSERT_OK(s);
393   ASSERT_EQ("val3", Get(Key(3)));
394 
395   Destroy(options);
396 }
397 
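// Same scenario as FLushWALWriteRetryableError, but with atomic_flush
// enabled.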
398 TEST_F(DBErrorHandlingFSTest, FLushWALAtomicWriteRetryableError) {
399   std::shared_ptr<ErrorHandlerFSListener> listener(
400       new ErrorHandlerFSListener());
401   Options options = GetDefaultOptions();
402   options.env = fault_env_.get();
403   options.create_if_missing = true;
404   options.listeners.emplace_back(listener);
405   options.max_bgerror_resume_count = 0;
406   options.atomic_flush = true;
407   Status s;
408 
409   IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
410   error_msg.SetRetryable(true);
411 
412   listener->EnableAutoRecovery(false);
413   SyncPoint::GetInstance()->SetCallBack(
414       "DBImpl::SyncClosedLogs:Start",
415       [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
416   SyncPoint::GetInstance()->EnableProcessing();
417 
418   CreateAndReopenWithCF({"pikachu, sdfsdfsdf"}, options);
419 
420   WriteOptions wo = WriteOptions();
421   wo.disableWAL = false;
422   ASSERT_OK(Put(Key(1), "val1", wo));
423 
424   s = Flush();
425   ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
426   SyncPoint::GetInstance()->DisableProcessing();
427   fault_fs_->SetFilesystemActive(true);
428   auto cfh = dbfull()->GetColumnFamilyHandle(1);
429   s = dbfull()->DropColumnFamily(cfh);
430 
431   s = dbfull()->Resume();
432   ASSERT_OK(s);
433   ASSERT_EQ("val1", Get(Key(1)));
434   ASSERT_OK(Put(Key(3), "val3", wo));
435   ASSERT_EQ("val3", Get(Key(3)));
436   s = Flush();
437   ASSERT_OK(s);
438   ASSERT_EQ("val3", Get(Key(3)));
439 
440   Destroy(options);
441 }
442 
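// With the WAL disabled, a retryable flush error is expected to be only a
// soft error: foreground writes keep succeeding and Resume() recovers.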
443 TEST_F(DBErrorHandlingFSTest, FLushWritNoWALRetryableError1) {
444   std::shared_ptr<ErrorHandlerFSListener> listener(
445       new ErrorHandlerFSListener());
446   Options options = GetDefaultOptions();
447   options.env = fault_env_.get();
448   options.create_if_missing = true;
449   options.listeners.emplace_back(listener);
450   options.max_bgerror_resume_count = 0;
451   options.statistics = CreateDBStatistics();
452   Status s;
453 
454   listener->EnableAutoRecovery(false);
455   DestroyAndReopen(options);
456 
457   IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
458   error_msg.SetRetryable(true);
459 
460   WriteOptions wo = WriteOptions();
461   wo.disableWAL = true;
462   ASSERT_OK(Put(Key(1), "val1", wo));
463   SyncPoint::GetInstance()->SetCallBack(
464       "BuildTable:BeforeFinishBuildTable",
465       [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
466   SyncPoint::GetInstance()->EnableProcessing();
467   s = Flush();
468   ASSERT_OK(Put(Key(2), "val2", wo));
469   ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
470   ASSERT_EQ("val2", Get(Key(2)));
471   SyncPoint::GetInstance()->DisableProcessing();
472   fault_fs_->SetFilesystemActive(true);
473   s = dbfull()->Resume();
474   ASSERT_OK(s);
475   ASSERT_EQ("val1", Get(Key(1)));
476   ASSERT_EQ("val2", Get(Key(2)));
477   ASSERT_OK(Put(Key(3), "val3", wo));
478   ASSERT_EQ("val3", Get(Key(3)));
479   s = Flush();
480   ASSERT_OK(s);
481   ASSERT_EQ("val3", Get(Key(3)));
482   ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
483                    ERROR_HANDLER_BG_ERROR_COUNT));
484   ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
485                    ERROR_HANDLER_BG_IO_ERROR_COUNT));
486   ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
487                    ERROR_HANDLER_BG_RETRYABLE_IO_ERROR_COUNT));
488   ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
489                    ERROR_HANDLER_AUTORESUME_COUNT));
490   ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
491                    ERROR_HANDLER_AUTORESUME_RETRY_TOTAL_COUNT));
492   ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
493                    ERROR_HANDLER_AUTORESUME_SUCCESS_COUNT));
494 
495   Destroy(options);
496 }
497 
498 TEST_F(DBErrorHandlingFSTest, FLushWriteNoWALRetryableError2) {
499   std::shared_ptr<ErrorHandlerFSListener> listener(
500       new ErrorHandlerFSListener());
501   Options options = GetDefaultOptions();
502   options.env = fault_env_.get();
503   options.create_if_missing = true;
504   options.listeners.emplace_back(listener);
505   options.max_bgerror_resume_count = 0;
506   Status s;
507 
508   listener->EnableAutoRecovery(false);
509   DestroyAndReopen(options);
510 
511   IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
512   error_msg.SetRetryable(true);
513 
514   WriteOptions wo = WriteOptions();
515   wo.disableWAL = true;
516 
517   ASSERT_OK(Put(Key(1), "val1", wo));
518   SyncPoint::GetInstance()->SetCallBack(
519       "BuildTable:BeforeSyncTable",
520       [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
521   SyncPoint::GetInstance()->EnableProcessing();
522   s = Flush();
523   ASSERT_OK(Put(Key(2), "val2", wo));
524   ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
525   ASSERT_EQ("val2", Get(Key(2)));
526   SyncPoint::GetInstance()->DisableProcessing();
527   fault_fs_->SetFilesystemActive(true);
528   s = dbfull()->Resume();
529   ASSERT_OK(s);
530   ASSERT_EQ("val1", Get(Key(1)));
531   ASSERT_EQ("val2", Get(Key(2)));
532   ASSERT_OK(Put(Key(3), "val3", wo));
533   ASSERT_EQ("val3", Get(Key(3)));
534   s = Flush();
535   ASSERT_OK(s);
536   ASSERT_EQ("val3", Get(Key(3)));
537 
538   Destroy(options);
539 }
540 
541 TEST_F(DBErrorHandlingFSTest, FLushWriteNoWALRetryableError3) {
542   std::shared_ptr<ErrorHandlerFSListener> listener(
543       new ErrorHandlerFSListener());
544   Options options = GetDefaultOptions();
545   options.env = fault_env_.get();
546   options.create_if_missing = true;
547   options.listeners.emplace_back(listener);
548   options.max_bgerror_resume_count = 0;
549   Status s;
550 
551   listener->EnableAutoRecovery(false);
552   DestroyAndReopen(options);
553 
554   IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
555   error_msg.SetRetryable(true);
556 
557   WriteOptions wo = WriteOptions();
558   wo.disableWAL = true;
559 
560   ASSERT_OK(Put(Key(1), "val1", wo));
561   SyncPoint::GetInstance()->SetCallBack(
562       "BuildTable:BeforeCloseTableFile",
563       [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
564   SyncPoint::GetInstance()->EnableProcessing();
565   s = Flush();
566   ASSERT_OK(Put(Key(2), "val2", wo));
567   ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
568   ASSERT_EQ("val2", Get(Key(2)));
569   SyncPoint::GetInstance()->DisableProcessing();
570   fault_fs_->SetFilesystemActive(true);
571   s = dbfull()->Resume();
572   ASSERT_OK(s);
573   ASSERT_EQ("val1", Get(Key(1)));
574   ASSERT_EQ("val2", Get(Key(2)));
575   ASSERT_OK(Put(Key(3), "val3", wo));
576   ASSERT_EQ("val3", Get(Key(3)));
577   s = Flush();
578   ASSERT_OK(s);
579   ASSERT_EQ("val3", Get(Key(3)));
580 
581   Destroy(options);
582 }
583 
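// Fail the MANIFEST write that installs the flush result. After Resume(),
// a new MANIFEST file is expected to be in use.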
584 TEST_F(DBErrorHandlingFSTest, ManifestWriteError) {
585   std::shared_ptr<ErrorHandlerFSListener> listener(
586       new ErrorHandlerFSListener());
587   Options options = GetDefaultOptions();
588   options.env = fault_env_.get();
589   options.create_if_missing = true;
590   options.listeners.emplace_back(listener);
591   Status s;
592   std::string old_manifest;
593   std::string new_manifest;
594 
595   listener->EnableAutoRecovery(false);
596   DestroyAndReopen(options);
597   old_manifest = GetManifestNameFromLiveFiles();
598 
599   ASSERT_OK(Put(Key(0), "val"));
600   ASSERT_OK(Flush());
601   ASSERT_OK(Put(Key(1), "val"));
602   SyncPoint::GetInstance()->SetCallBack(
603       "VersionSet::LogAndApply:WriteManifest", [&](void*) {
604         fault_fs_->SetFilesystemActive(false,
605                                        IOStatus::NoSpace("Out of space"));
606       });
607   SyncPoint::GetInstance()->EnableProcessing();
608   s = Flush();
609   ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
610   SyncPoint::GetInstance()->ClearAllCallBacks();
611   SyncPoint::GetInstance()->DisableProcessing();
612   fault_fs_->SetFilesystemActive(true);
613   s = dbfull()->Resume();
614   ASSERT_OK(s);
615 
616   new_manifest = GetManifestNameFromLiveFiles();
617   ASSERT_NE(new_manifest, old_manifest);
618 
619   Reopen(options);
620   ASSERT_EQ("val", Get(Key(0)));
621   ASSERT_EQ("val", Get(Key(1)));
622   Close();
623 }
624 
625 TEST_F(DBErrorHandlingFSTest, ManifestWriteRetryableError) {
626   std::shared_ptr<ErrorHandlerFSListener> listener(
627       new ErrorHandlerFSListener());
628   Options options = GetDefaultOptions();
629   options.env = fault_env_.get();
630   options.create_if_missing = true;
631   options.listeners.emplace_back(listener);
632   options.max_bgerror_resume_count = 0;
633   Status s;
634   std::string old_manifest;
635   std::string new_manifest;
636 
637   listener->EnableAutoRecovery(false);
638   DestroyAndReopen(options);
639   old_manifest = GetManifestNameFromLiveFiles();
640 
641   IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
642   error_msg.SetRetryable(true);
643 
644   ASSERT_OK(Put(Key(0), "val"));
645   ASSERT_OK(Flush());
646   ASSERT_OK(Put(Key(1), "val"));
647   SyncPoint::GetInstance()->SetCallBack(
648       "VersionSet::LogAndApply:WriteManifest",
649       [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
650   SyncPoint::GetInstance()->EnableProcessing();
651   s = Flush();
652   ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
653   SyncPoint::GetInstance()->ClearAllCallBacks();
654   SyncPoint::GetInstance()->DisableProcessing();
655   fault_fs_->SetFilesystemActive(true);
656   s = dbfull()->Resume();
657   ASSERT_OK(s);
658 
659   new_manifest = GetManifestNameFromLiveFiles();
660   ASSERT_NE(new_manifest, old_manifest);
661 
662   Reopen(options);
663   ASSERT_EQ("val", Get(Key(0)));
664   ASSERT_EQ("val", Get(Key(1)));
665   Close();
666 }
667 
668 TEST_F(DBErrorHandlingFSTest, ManifestWriteFileScopeError) {
669   std::shared_ptr<ErrorHandlerFSListener> listener(
670       new ErrorHandlerFSListener());
671   Options options = GetDefaultOptions();
672   options.env = fault_env_.get();
673   options.create_if_missing = true;
674   options.listeners.emplace_back(listener);
675   options.max_bgerror_resume_count = 0;
676   Status s;
677   std::string old_manifest;
678   std::string new_manifest;
679 
680   listener->EnableAutoRecovery(false);
681   DestroyAndReopen(options);
682   old_manifest = GetManifestNameFromLiveFiles();
683 
684   IOStatus error_msg = IOStatus::IOError("File Scope Data Loss Error");
685   error_msg.SetDataLoss(true);
686   error_msg.SetScope(
687       ROCKSDB_NAMESPACE::IOStatus::IOErrorScope::kIOErrorScopeFile);
688   error_msg.SetRetryable(false);
689 
690   ASSERT_OK(Put(Key(0), "val"));
691   ASSERT_OK(Flush());
692   ASSERT_OK(Put(Key(1), "val"));
693   SyncPoint::GetInstance()->SetCallBack(
694       "VersionSet::LogAndApply:WriteManifest",
695       [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
696   SyncPoint::GetInstance()->EnableProcessing();
697   s = Flush();
698   ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
699   SyncPoint::GetInstance()->ClearAllCallBacks();
700   SyncPoint::GetInstance()->DisableProcessing();
701   fault_fs_->SetFilesystemActive(true);
702   s = dbfull()->Resume();
703   ASSERT_OK(s);
704 
705   new_manifest = GetManifestNameFromLiveFiles();
706   ASSERT_NE(new_manifest, old_manifest);
707 
708   Reopen(options);
709   ASSERT_EQ("val", Get(Key(0)));
710   ASSERT_EQ("val", Get(Key(1)));
711   Close();
712 }
713 
714 TEST_F(DBErrorHandlingFSTest, ManifestWriteNoWALRetryableError) {
715   std::shared_ptr<ErrorHandlerFSListener> listener(
716       new ErrorHandlerFSListener());
717   Options options = GetDefaultOptions();
718   options.env = fault_env_.get();
719   options.create_if_missing = true;
720   options.listeners.emplace_back(listener);
721   options.max_bgerror_resume_count = 0;
722   Status s;
723   std::string old_manifest;
724   std::string new_manifest;
725 
726   listener->EnableAutoRecovery(false);
727   DestroyAndReopen(options);
728   old_manifest = GetManifestNameFromLiveFiles();
729 
730   IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
731   error_msg.SetRetryable(true);
732 
733   WriteOptions wo = WriteOptions();
734   wo.disableWAL = true;
735   ASSERT_OK(Put(Key(0), "val", wo));
736   ASSERT_OK(Flush());
737   ASSERT_OK(Put(Key(1), "val", wo));
738   SyncPoint::GetInstance()->SetCallBack(
739       "VersionSet::LogAndApply:WriteManifest",
740       [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
741   SyncPoint::GetInstance()->EnableProcessing();
742   s = Flush();
743   ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
744   SyncPoint::GetInstance()->ClearAllCallBacks();
745   SyncPoint::GetInstance()->DisableProcessing();
746   fault_fs_->SetFilesystemActive(true);
747   s = dbfull()->Resume();
748   ASSERT_OK(s);
749 
750   new_manifest = GetManifestNameFromLiveFiles();
751   ASSERT_NE(new_manifest, old_manifest);
752 
753   Reopen(options);
754   ASSERT_EQ("val", Get(Key(0)));
755   ASSERT_EQ("val", Get(Key(1)));
756   Close();
757 }
758 
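// Fail the MANIFEST write, then let the first Resume() hit the same injected
// failure; only the second Resume(), after the callback is cleared, is
// expected to succeed with a new MANIFEST.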
759 TEST_F(DBErrorHandlingFSTest, DoubleManifestWriteError) {
760   std::shared_ptr<ErrorHandlerFSListener> listener(
761       new ErrorHandlerFSListener());
762   Options options = GetDefaultOptions();
763   options.env = fault_env_.get();
764   options.create_if_missing = true;
765   options.listeners.emplace_back(listener);
766   Status s;
767   std::string old_manifest;
768   std::string new_manifest;
769 
770   listener->EnableAutoRecovery(false);
771   DestroyAndReopen(options);
772   old_manifest = GetManifestNameFromLiveFiles();
773 
774   ASSERT_OK(Put(Key(0), "val"));
775   ASSERT_OK(Flush());
776   ASSERT_OK(Put(Key(1), "val"));
777   SyncPoint::GetInstance()->SetCallBack(
778       "VersionSet::LogAndApply:WriteManifest", [&](void*) {
779         fault_fs_->SetFilesystemActive(false,
780                                        IOStatus::NoSpace("Out of space"));
781       });
782   SyncPoint::GetInstance()->EnableProcessing();
783   s = Flush();
784   ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
785   fault_fs_->SetFilesystemActive(true);
786 
787   // This Resume() will attempt to create a new manifest file and fail again
788   s = dbfull()->Resume();
789   ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
790   fault_fs_->SetFilesystemActive(true);
791   SyncPoint::GetInstance()->ClearAllCallBacks();
792   SyncPoint::GetInstance()->DisableProcessing();
793 
794   // A successful Resume() will create a new manifest file
795   s = dbfull()->Resume();
796   ASSERT_OK(s);
797 
798   new_manifest = GetManifestNameFromLiveFiles();
799   ASSERT_NE(new_manifest, old_manifest);
800 
801   Reopen(options);
802   ASSERT_EQ("val", Get(Key(0)));
803   ASSERT_EQ("val", Get(Key(1)));
804   Close();
805 }
806 
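// Fail the MANIFEST write performed by a background compaction, then clear
// the error and verify the compaction is retried and completes.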
807 TEST_F(DBErrorHandlingFSTest, CompactionManifestWriteError) {
808   if (mem_env_ != nullptr) {
809     ROCKSDB_GTEST_SKIP("Test requires non-mock environment");
810     return;
811   }
812   std::shared_ptr<ErrorHandlerFSListener> listener(
813       new ErrorHandlerFSListener());
814   Options options = GetDefaultOptions();
815   options.env = fault_env_.get();
816   options.create_if_missing = true;
817   options.level0_file_num_compaction_trigger = 2;
818   options.listeners.emplace_back(listener);
819   Status s;
820   std::string old_manifest;
821   std::string new_manifest;
822   std::atomic<bool> fail_manifest(false);
823   DestroyAndReopen(options);
824   old_manifest = GetManifestNameFromLiveFiles();
825 
826   ASSERT_OK(Put(Key(0), "val"));
827   ASSERT_OK(Put(Key(2), "val"));
828   s = Flush();
829   ASSERT_OK(s);
830 
831   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
832       // Wait for flush of 2nd L0 file before starting compaction
833       {{"DBImpl::FlushMemTable:FlushMemTableFinished",
834         "BackgroundCallCompaction:0"},
835        // Wait for compaction to detect manifest write error
836        {"BackgroundCallCompaction:1", "CompactionManifestWriteError:0"},
837        // Make compaction thread wait for error to be cleared
838        {"CompactionManifestWriteError:1",
839         "DBImpl::BackgroundCallCompaction:FoundObsoleteFiles"},
840        // Wait for DB instance to clear bg_error before calling
841        // TEST_WaitForCompact
842        {"SstFileManagerImpl::ErrorCleared", "CompactionManifestWriteError:2"}});
843   // trigger manifest write failure in compaction thread
844   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
845       "BackgroundCallCompaction:0", [&](void*) { fail_manifest.store(true); });
846   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
847       "VersionSet::LogAndApply:WriteManifest", [&](void*) {
848         if (fail_manifest.load()) {
849           fault_fs_->SetFilesystemActive(false,
850                                          IOStatus::NoSpace("Out of space"));
851         }
852       });
853   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
854 
855   ASSERT_OK(Put(Key(1), "val"));
856   // This Flush will trigger a compaction, which will fail when appending to
857   // the manifest
858   s = Flush();
859   ASSERT_OK(s);
860 
861   TEST_SYNC_POINT("CompactionManifestWriteError:0");
862   // Clear all errors so when the compaction is retried, it will succeed
863   fault_fs_->SetFilesystemActive(true);
864   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
865   TEST_SYNC_POINT("CompactionManifestWriteError:1");
866   TEST_SYNC_POINT("CompactionManifestWriteError:2");
867 
868   s = dbfull()->TEST_WaitForCompact();
869   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
870   ASSERT_OK(s);
871 
872   new_manifest = GetManifestNameFromLiveFiles();
873   ASSERT_NE(new_manifest, old_manifest);
874   Reopen(options);
875   ASSERT_EQ("val", Get(Key(0)));
876   ASSERT_EQ("val", Get(Key(1)));
877   ASSERT_EQ("val", Get(Key(2)));
878   Close();
879 }
880 
881 TEST_F(DBErrorHandlingFSTest, CompactionManifestWriteRetryableError) {
882   std::shared_ptr<ErrorHandlerFSListener> listener(
883       new ErrorHandlerFSListener());
884   Options options = GetDefaultOptions();
885   options.env = fault_env_.get();
886   options.create_if_missing = true;
887   options.level0_file_num_compaction_trigger = 2;
888   options.listeners.emplace_back(listener);
889   options.max_bgerror_resume_count = 0;
890   Status s;
891   std::string old_manifest;
892   std::string new_manifest;
893   std::atomic<bool> fail_manifest(false);
894   DestroyAndReopen(options);
895   old_manifest = GetManifestNameFromLiveFiles();
896 
897   IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
898   error_msg.SetRetryable(true);
899 
900   ASSERT_OK(Put(Key(0), "val"));
901   ASSERT_OK(Put(Key(2), "val"));
902   s = Flush();
903   ASSERT_OK(s);
904 
905   listener->OverrideBGError(Status(error_msg, Status::Severity::kHardError));
906   listener->EnableAutoRecovery(false);
907   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
908       // Wait for flush of 2nd L0 file before starting compaction
909       {{"DBImpl::FlushMemTable:FlushMemTableFinished",
910         "BackgroundCallCompaction:0"},
911        // Wait for compaction to detect manifest write error
912        {"BackgroundCallCompaction:1", "CompactionManifestWriteError:0"},
913        // Make compaction thread wait for error to be cleared
914        {"CompactionManifestWriteError:1",
915         "DBImpl::BackgroundCallCompaction:FoundObsoleteFiles"}});
916   // trigger manifest write failure in compaction thread
917   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
918       "BackgroundCallCompaction:0", [&](void*) { fail_manifest.store(true); });
919   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
920       "VersionSet::LogAndApply:WriteManifest", [&](void*) {
921         if (fail_manifest.load()) {
922           fault_fs_->SetFilesystemActive(false, error_msg);
923         }
924       });
925   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
926 
927   ASSERT_OK(Put(Key(1), "val"));
928   s = Flush();
929   ASSERT_OK(s);
930 
931   TEST_SYNC_POINT("CompactionManifestWriteError:0");
932   TEST_SYNC_POINT("CompactionManifestWriteError:1");
933 
934   s = dbfull()->TEST_WaitForCompact();
935   ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
936 
937   fault_fs_->SetFilesystemActive(true);
938   SyncPoint::GetInstance()->ClearAllCallBacks();
939   SyncPoint::GetInstance()->DisableProcessing();
940   s = dbfull()->Resume();
941   ASSERT_OK(s);
942 
943   new_manifest = GetManifestNameFromLiveFiles();
944   ASSERT_NE(new_manifest, old_manifest);
945 
946   Reopen(options);
947   ASSERT_EQ("val", Get(Key(0)));
948   ASSERT_EQ("val", Get(Key(1)));
949   ASSERT_EQ("val", Get(Key(2)));
950   Close();
951 }
952 
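// Inject a NoSpace error in the compaction thread; the listener escalates it
// to a hard error, which a manual Resume() should then clear.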
953 TEST_F(DBErrorHandlingFSTest, CompactionWriteError) {
954   std::shared_ptr<ErrorHandlerFSListener> listener(
955       new ErrorHandlerFSListener());
956   Options options = GetDefaultOptions();
957   options.env = fault_env_.get();
958   options.create_if_missing = true;
959   options.level0_file_num_compaction_trigger = 2;
960   options.listeners.emplace_back(listener);
961   Status s;
962   DestroyAndReopen(options);
963 
964   ASSERT_OK(Put(Key(0), "va;"));
965   ASSERT_OK(Put(Key(2), "va;"));
966   s = Flush();
967   ASSERT_OK(s);
968 
969   listener->OverrideBGError(
970       Status(Status::NoSpace(), Status::Severity::kHardError));
971   listener->EnableAutoRecovery(false);
972   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
973       {{"DBImpl::FlushMemTable:FlushMemTableFinished",
974         "BackgroundCallCompaction:0"}});
975   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
976       "BackgroundCallCompaction:0", [&](void*) {
977         fault_fs_->SetFilesystemActive(false,
978                                        IOStatus::NoSpace("Out of space"));
979       });
980   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
981 
982   ASSERT_OK(Put(Key(1), "val"));
983   s = Flush();
984   ASSERT_OK(s);
985 
986   s = dbfull()->TEST_WaitForCompact();
987   ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
988 
989   fault_fs_->SetFilesystemActive(true);
990   s = dbfull()->Resume();
991   ASSERT_OK(s);
992   Destroy(options);
993 }
994 
995 TEST_F(DBErrorHandlingFSTest, DISABLED_CompactionWriteRetryableError) {
996   std::shared_ptr<ErrorHandlerFSListener> listener(
997       new ErrorHandlerFSListener());
998   Options options = GetDefaultOptions();
999   options.env = fault_env_.get();
1000   options.create_if_missing = true;
1001   options.level0_file_num_compaction_trigger = 2;
1002   options.listeners.emplace_back(listener);
1003   options.max_bgerror_resume_count = 0;
1004   Status s;
1005   DestroyAndReopen(options);
1006 
1007   IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
1008   error_msg.SetRetryable(true);
1009 
1010   ASSERT_OK(Put(Key(0), "va;"));
1011   ASSERT_OK(Put(Key(2), "va;"));
1012   s = Flush();
1013   ASSERT_OK(s);
1014 
1015   listener->OverrideBGError(Status(error_msg, Status::Severity::kHardError));
1016   listener->EnableAutoRecovery(false);
1017   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
1018       {{"DBImpl::FlushMemTable:FlushMemTableFinished",
1019         "BackgroundCallCompaction:0"}});
1020   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1021       "CompactionJob::OpenCompactionOutputFile",
1022       [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
1023   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1024       "DBImpl::BackgroundCompaction:Finish",
1025       [&](void*) { CancelAllBackgroundWork(dbfull()); });
1026   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
1027 
1028   ASSERT_OK(Put(Key(1), "val"));
1029   s = Flush();
1030   ASSERT_OK(s);
1031 
1032   s = dbfull()->TEST_GetBGError();
1033   ASSERT_OK(s);
1034   fault_fs_->SetFilesystemActive(true);
1035   SyncPoint::GetInstance()->ClearAllCallBacks();
1036   SyncPoint::GetInstance()->DisableProcessing();
1037   s = dbfull()->Resume();
1038   ASSERT_OK(s);
1039   Destroy(options);
1040 }
1041 
1042 TEST_F(DBErrorHandlingFSTest, DISABLED_CompactionWriteFileScopeError) {
1043   std::shared_ptr<ErrorHandlerFSListener> listener(
1044       new ErrorHandlerFSListener());
1045   Options options = GetDefaultOptions();
1046   options.env = fault_env_.get();
1047   options.create_if_missing = true;
1048   options.level0_file_num_compaction_trigger = 2;
1049   options.listeners.emplace_back(listener);
1050   options.max_bgerror_resume_count = 0;
1051   Status s;
1052   DestroyAndReopen(options);
1053 
1054   IOStatus error_msg = IOStatus::IOError("File Scope Data Loss Error");
1055   error_msg.SetDataLoss(true);
1056   error_msg.SetScope(
1057       ROCKSDB_NAMESPACE::IOStatus::IOErrorScope::kIOErrorScopeFile);
1058   error_msg.SetRetryable(false);
1059 
1060   ASSERT_OK(Put(Key(0), "va;"));
1061   ASSERT_OK(Put(Key(2), "va;"));
1062   s = Flush();
1063   ASSERT_OK(s);
1064 
1065   listener->OverrideBGError(Status(error_msg, Status::Severity::kHardError));
1066   listener->EnableAutoRecovery(false);
1067   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
1068       {{"DBImpl::FlushMemTable:FlushMemTableFinished",
1069         "BackgroundCallCompaction:0"}});
1070   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1071       "CompactionJob::OpenCompactionOutputFile",
1072       [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
1073   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1074       "DBImpl::BackgroundCompaction:Finish",
1075       [&](void*) { CancelAllBackgroundWork(dbfull()); });
1076   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
1077 
1078   ASSERT_OK(Put(Key(1), "val"));
1079   s = Flush();
1080   ASSERT_OK(s);
1081 
1082   s = dbfull()->TEST_GetBGError();
1083   ASSERT_OK(s);
1084 
1085   fault_fs_->SetFilesystemActive(true);
1086   SyncPoint::GetInstance()->ClearAllCallBacks();
1087   SyncPoint::GetInstance()->DisableProcessing();
1088   s = dbfull()->Resume();
1089   ASSERT_OK(s);
1090   Destroy(options);
1091 }
1092 
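// A Corruption error during compaction is treated as unrecoverable; the
// subsequent Resume() is expected to fail.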
1093 TEST_F(DBErrorHandlingFSTest, CorruptionError) {
1094   Options options = GetDefaultOptions();
1095   options.env = fault_env_.get();
1096   options.create_if_missing = true;
1097   options.level0_file_num_compaction_trigger = 2;
1098   Status s;
1099   DestroyAndReopen(options);
1100 
1101   ASSERT_OK(Put(Key(0), "va;"));
1102   ASSERT_OK(Put(Key(2), "va;"));
1103   s = Flush();
1104   ASSERT_OK(s);
1105 
1106   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
1107       {{"DBImpl::FlushMemTable:FlushMemTableFinished",
1108         "BackgroundCallCompaction:0"}});
1109   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1110       "BackgroundCallCompaction:0", [&](void*) {
1111         fault_fs_->SetFilesystemActive(false,
1112                                        IOStatus::Corruption("Corruption"));
1113       });
1114   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
1115 
1116   ASSERT_OK(Put(Key(1), "val"));
1117   s = Flush();
1118   ASSERT_OK(s);
1119 
1120   s = dbfull()->TEST_WaitForCompact();
1121   ASSERT_EQ(s.severity(),
1122             ROCKSDB_NAMESPACE::Status::Severity::kUnrecoverableError);
1123 
1124   fault_fs_->SetFilesystemActive(true);
1125   s = dbfull()->Resume();
1126   ASSERT_NOK(s);
1127   Destroy(options);
1128 }
1129 
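// With auto recovery enabled, a NoSpace flush error should be recovered in
// the background without an explicit Resume() call.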
1130 TEST_F(DBErrorHandlingFSTest, AutoRecoverFlushError) {
1131   if (mem_env_ != nullptr) {
1132     ROCKSDB_GTEST_SKIP("Test requires non-mock environment");
1133     return;
1134   }
1135   std::shared_ptr<ErrorHandlerFSListener> listener(
1136       new ErrorHandlerFSListener());
1137   Options options = GetDefaultOptions();
1138   options.env = fault_env_.get();
1139   options.create_if_missing = true;
1140   options.listeners.emplace_back(listener);
1141   options.statistics = CreateDBStatistics();
1142   Status s;
1143 
1144   listener->EnableAutoRecovery();
1145   DestroyAndReopen(options);
1146 
1147   ASSERT_OK(Put(Key(0), "val"));
1148   SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
1149     fault_fs_->SetFilesystemActive(false, IOStatus::NoSpace("Out of space"));
1150   });
1151   SyncPoint::GetInstance()->EnableProcessing();
1152   s = Flush();
1153   ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
1154   SyncPoint::GetInstance()->DisableProcessing();
1155   fault_fs_->SetFilesystemActive(true);
1156   ASSERT_EQ(listener->WaitForRecovery(5000000), true);
1157 
1158   s = Put(Key(1), "val");
1159   ASSERT_OK(s);
1160   ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
1161                    ERROR_HANDLER_BG_ERROR_COUNT));
1162   ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
1163                    ERROR_HANDLER_BG_IO_ERROR_COUNT));
1164   ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
1165                    ERROR_HANDLER_BG_RETRYABLE_IO_ERROR_COUNT));
1166   ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
1167                    ERROR_HANDLER_AUTORESUME_COUNT));
1168   ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
1169                    ERROR_HANDLER_AUTORESUME_RETRY_TOTAL_COUNT));
1170   ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
1171                    ERROR_HANDLER_AUTORESUME_SUCCESS_COUNT));
1172 
1173   Reopen(options);
1174   ASSERT_EQ("val", Get(Key(0)));
1175   ASSERT_EQ("val", Get(Key(1)));
1176   Destroy(options);
1177 }
1178 
1179 TEST_F(DBErrorHandlingFSTest, FailRecoverFlushError) {
1180   std::shared_ptr<ErrorHandlerFSListener> listener(
1181       new ErrorHandlerFSListener());
1182   Options options = GetDefaultOptions();
1183   options.env = fault_env_.get();
1184   options.create_if_missing = true;
1185   options.listeners.emplace_back(listener);
1186   Status s;
1187 
1188   listener->EnableAutoRecovery();
1189   DestroyAndReopen(options);
1190 
1191   ASSERT_OK(Put(Key(0), "val"));
1192   SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
1193     fault_fs_->SetFilesystemActive(false, IOStatus::NoSpace("Out of space"));
1194   });
1195   SyncPoint::GetInstance()->EnableProcessing();
1196   s = Flush();
1197   ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
1198   // We should be able to shut down the database while auto recovery is
1199   // still going on in the background
1200   Close();
1201   DestroyDB(dbname_, options).PermitUncheckedError();
1202 }
1203 
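// Fail a synced WAL write partway through a batch. Keys from the failed
// batch should not be visible, before or after reopening, once auto
// recovery completes.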
1204 TEST_F(DBErrorHandlingFSTest, WALWriteError) {
1205   if (mem_env_ != nullptr) {
1206     ROCKSDB_GTEST_SKIP("Test requires non-mock environment");
1207     return;
1208   }
1209   std::shared_ptr<ErrorHandlerFSListener> listener(
1210       new ErrorHandlerFSListener());
1211   Options options = GetDefaultOptions();
1212   options.env = fault_env_.get();
1213   options.create_if_missing = true;
1214   options.writable_file_max_buffer_size = 32768;
1215   options.listeners.emplace_back(listener);
1216   Status s;
1217   Random rnd(301);
1218 
1219   listener->EnableAutoRecovery();
1220   DestroyAndReopen(options);
1221 
1222   {
1223     WriteBatch batch;
1224 
1225     for (auto i = 0; i < 100; ++i) {
1226       ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
1227     }
1228 
1229     WriteOptions wopts;
1230     wopts.sync = true;
1231     ASSERT_OK(dbfull()->Write(wopts, &batch));
1232   };
1233 
1234   {
1235     WriteBatch batch;
1236     int write_error = 0;
1237 
1238     for (auto i = 100; i < 199; ++i) {
1239       ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
1240     }
1241 
1242     SyncPoint::GetInstance()->SetCallBack(
1243         "WritableFileWriter::Append:BeforePrepareWrite", [&](void*) {
1244           write_error++;
1245           if (write_error > 2) {
1246             fault_fs_->SetFilesystemActive(false,
1247                                            IOStatus::NoSpace("Out of space"));
1248           }
1249         });
1250     SyncPoint::GetInstance()->EnableProcessing();
1251     WriteOptions wopts;
1252     wopts.sync = true;
1253     s = dbfull()->Write(wopts, &batch);
1254     ASSERT_EQ(s, s.NoSpace());
1255   }
1256   SyncPoint::GetInstance()->DisableProcessing();
1257   fault_fs_->SetFilesystemActive(true);
1258   ASSERT_EQ(listener->WaitForRecovery(5000000), true);
1259   for (auto i = 0; i < 199; ++i) {
1260     if (i < 100) {
1261       ASSERT_NE(Get(Key(i)), "NOT_FOUND");
1262     } else {
1263       ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
1264     }
1265   }
1266   Reopen(options);
1267   for (auto i = 0; i < 199; ++i) {
1268     if (i < 100) {
1269       ASSERT_NE(Get(Key(i)), "NOT_FOUND");
1270     } else {
1271       ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
1272     }
1273   }
1274   Close();
1275 }
1276 
1277 TEST_F(DBErrorHandlingFSTest, WALWriteRetryableError) {
1278   std::shared_ptr<ErrorHandlerFSListener> listener(
1279       new ErrorHandlerFSListener());
1280   Options options = GetDefaultOptions();
1281   options.env = fault_env_.get();
1282   options.create_if_missing = true;
1283   options.writable_file_max_buffer_size = 32768;
1284   options.listeners.emplace_back(listener);
1285   options.paranoid_checks = true;
1286   options.max_bgerror_resume_count = 0;
1287   Random rnd(301);
1288 
1289   DestroyAndReopen(options);
1290 
1291   IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
1292   error_msg.SetRetryable(true);
1293 
1294   // For the first batch, the write succeeds, with sync required
1295   {
1296     WriteBatch batch;
1297 
1298     for (auto i = 0; i < 100; ++i) {
1299       ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
1300     }
1301 
1302     WriteOptions wopts;
1303     wopts.sync = true;
1304     ASSERT_OK(dbfull()->Write(wopts, &batch));
1305   };
1306 
1307   // For the second batch, the first 2 file Appends succeed, then the
1308   // following Append fails due to a retryable file system IOError.
1309   {
1310     WriteBatch batch;
1311     int write_error = 0;
1312 
1313     for (auto i = 100; i < 200; ++i) {
1314       ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
1315     }
1316 
1317     SyncPoint::GetInstance()->SetCallBack(
1318         "WritableFileWriter::Append:BeforePrepareWrite", [&](void*) {
1319           write_error++;
1320           if (write_error > 2) {
1321             fault_fs_->SetFilesystemActive(false, error_msg);
1322           }
1323         });
1324     SyncPoint::GetInstance()->EnableProcessing();
1325     WriteOptions wopts;
1326     wopts.sync = true;
1327     Status s = dbfull()->Write(wopts, &batch);
1328     ASSERT_TRUE(s.IsIOError());
1329   }
1330   fault_fs_->SetFilesystemActive(true);
1331   SyncPoint::GetInstance()->ClearAllCallBacks();
1332   SyncPoint::GetInstance()->DisableProcessing();
1333 
1334   // Data in the corrupted WAL is not stored
1335   for (auto i = 0; i < 199; ++i) {
1336     if (i < 100) {
1337       ASSERT_NE(Get(Key(i)), "NOT_FOUND");
1338     } else {
1339       ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
1340     }
1341   }
1342 
1343   // Resume and write a new batch; it should be persisted in the WAL
1344   ASSERT_OK(dbfull()->Resume());
1345   {
1346     WriteBatch batch;
1347 
1348     for (auto i = 200; i < 300; ++i) {
1349       ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
1350     }
1351 
1352     WriteOptions wopts;
1353     wopts.sync = true;
1354     ASSERT_OK(dbfull()->Write(wopts, &batch));
1355   };
1356 
1357   Reopen(options);
1358   for (auto i = 0; i < 300; ++i) {
1359     if (i < 100 || i >= 200) {
1360       ASSERT_NE(Get(Key(i)), "NOT_FOUND");
1361     } else {
1362       ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
1363     }
1364   }
1365   Close();
1366 }
1367 
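// Same WAL write failure as above, but with multiple column families; each
// CF is expected to be flushed as part of recovery.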
1368 TEST_F(DBErrorHandlingFSTest, MultiCFWALWriteError) {
1369   if (mem_env_ != nullptr) {
1370     ROCKSDB_GTEST_SKIP("Test requires non-mock environment");
1371     return;
1372   }
1373   std::shared_ptr<ErrorHandlerFSListener> listener(
1374       new ErrorHandlerFSListener());
1375   Options options = GetDefaultOptions();
1376   options.env = fault_env_.get();
1377   options.create_if_missing = true;
1378   options.writable_file_max_buffer_size = 32768;
1379   options.listeners.emplace_back(listener);
1380   Random rnd(301);
1381 
1382   listener->EnableAutoRecovery();
1383   CreateAndReopenWithCF({"one", "two", "three"}, options);
1384 
1385   {
1386     WriteBatch batch;
1387 
1388     for (auto i = 1; i < 4; ++i) {
1389       for (auto j = 0; j < 100; ++j) {
1390         ASSERT_OK(batch.Put(handles_[i], Key(j), rnd.RandomString(1024)));
1391       }
1392     }
1393 
1394     WriteOptions wopts;
1395     wopts.sync = true;
1396     ASSERT_OK(dbfull()->Write(wopts, &batch));
1397   };
1398 
1399   {
1400     WriteBatch batch;
1401     int write_error = 0;
1402 
1403     // Write to one CF
1404     for (auto i = 100; i < 199; ++i) {
1405       ASSERT_OK(batch.Put(handles_[2], Key(i), rnd.RandomString(1024)));
1406     }
1407 
1408     SyncPoint::GetInstance()->SetCallBack(
1409         "WritableFileWriter::Append:BeforePrepareWrite", [&](void*) {
1410           write_error++;
1411           if (write_error > 2) {
1412             fault_fs_->SetFilesystemActive(false,
1413                                            IOStatus::NoSpace("Out of space"));
1414           }
1415         });
1416     SyncPoint::GetInstance()->EnableProcessing();
1417     WriteOptions wopts;
1418     wopts.sync = true;
1419     Status s = dbfull()->Write(wopts, &batch);
1420     ASSERT_TRUE(s.IsNoSpace());
1421   }
1422   SyncPoint::GetInstance()->DisableProcessing();
1423   fault_fs_->SetFilesystemActive(true);
1424   ASSERT_EQ(listener->WaitForRecovery(5000000), true);
1425 
1426   for (auto i = 1; i < 4; ++i) {
1427     // Every CF should have been flushed
1428     ASSERT_EQ(NumTableFilesAtLevel(0, i), 1);
1429   }
1430 
1431   for (auto i = 1; i < 4; ++i) {
1432     for (auto j = 0; j < 199; ++j) {
1433       if (j < 100) {
1434         ASSERT_NE(Get(i, Key(j)), "NOT_FOUND");
1435       } else {
1436         ASSERT_EQ(Get(i, Key(j)), "NOT_FOUND");
1437       }
1438     }
1439   }
1440   ReopenWithColumnFamilies({"default", "one", "two", "three"}, options);
1441   for (auto i = 1; i < 4; ++i) {
1442     for (auto j = 0; j < 199; ++j) {
1443       if (j < 100) {
1444         ASSERT_NE(Get(i, Key(j)), "NOT_FOUND");
1445       } else {
1446         ASSERT_EQ(Get(i, Key(j)), "NOT_FOUND");
1447       }
1448     }
1449   }
1450   Close();
1451 }
1452 
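// Inject errors across several DB instances that share one SstFileManager,
// and verify every instance recovers and finishes its compaction.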
1453 TEST_F(DBErrorHandlingFSTest, MultiDBCompactionError) {
1454   if (mem_env_ != nullptr) {
1455     ROCKSDB_GTEST_SKIP("Test requires non-mock environment");
1456     return;
1457   }
1458   FaultInjectionTestEnv* def_env = new FaultInjectionTestEnv(env_);
1459   std::vector<std::unique_ptr<Env>> fault_envs;
1460   std::vector<FaultInjectionTestFS*> fault_fs;
1461   std::vector<Options> options;
1462   std::vector<std::shared_ptr<ErrorHandlerFSListener>> listener;
1463   std::vector<DB*> db;
1464   std::shared_ptr<SstFileManager> sfm(NewSstFileManager(def_env));
1465   int kNumDbInstances = 3;
1466   Random rnd(301);
1467 
1468   for (auto i = 0; i < kNumDbInstances; ++i) {
1469     listener.emplace_back(new ErrorHandlerFSListener());
1470     options.emplace_back(GetDefaultOptions());
1471     fault_fs.emplace_back(new FaultInjectionTestFS(env_->GetFileSystem()));
1472     std::shared_ptr<FileSystem> fs(fault_fs.back());
1473     fault_envs.emplace_back(new CompositeEnvWrapper(def_env, fs));
1474     options[i].env = fault_envs.back().get();
1475     options[i].create_if_missing = true;
1476     options[i].level0_file_num_compaction_trigger = 2;
1477     options[i].writable_file_max_buffer_size = 32768;
1478     options[i].listeners.emplace_back(listener[i]);
1479     options[i].sst_file_manager = sfm;
1480     DB* dbptr;
1481     char buf[16];
1482 
1483     listener[i]->EnableAutoRecovery();
1484     // Set up to return an error on the 3rd SST, which would be at level 1
1485     listener[i]->InjectFileCreationError(fault_fs[i], 3,
1486                                          IOStatus::NoSpace("Out of space"));
1487     snprintf(buf, sizeof(buf), "_%d", i);
1488     ASSERT_OK(DestroyDB(dbname_ + std::string(buf), options[i]));
1489     ASSERT_OK(DB::Open(options[i], dbname_ + std::string(buf), &dbptr));
1490     db.emplace_back(dbptr);
1491   }
1492 
1493   for (auto i = 0; i < kNumDbInstances; ++i) {
1494     WriteBatch batch;
1495 
1496     for (auto j = 0; j <= 100; ++j) {
1497       ASSERT_OK(batch.Put(Key(j), rnd.RandomString(1024)));
1498     }
1499 
1500     WriteOptions wopts;
1501     wopts.sync = true;
1502     ASSERT_OK(db[i]->Write(wopts, &batch));
1503     ASSERT_OK(db[i]->Flush(FlushOptions()));
1504   }
1505 
1506   def_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));
1507   for (auto i = 0; i < kNumDbInstances; ++i) {
1508     WriteBatch batch;
1509 
1510     // Write a second batch of keys to trigger compaction
1511     for (auto j = 100; j < 199; ++j) {
1512       ASSERT_OK(batch.Put(Key(j), rnd.RandomString(1024)));
1513     }
1514 
1515     WriteOptions wopts;
1516     wopts.sync = true;
1517     ASSERT_OK(db[i]->Write(wopts, &batch));
1518     ASSERT_OK(db[i]->Flush(FlushOptions()));
1519   }
1520 
1521   for (auto i = 0; i < kNumDbInstances; ++i) {
1522     Status s = static_cast<DBImpl*>(db[i])->TEST_WaitForCompact(true);
1523     ASSERT_EQ(s.severity(), Status::Severity::kSoftError);
1524     fault_fs[i]->SetFilesystemActive(true);
1525   }
1526 
1527   def_env->SetFilesystemActive(true);
1528   for (auto i = 0; i < kNumDbInstances; ++i) {
1529     std::string prop;
1530     ASSERT_EQ(listener[i]->WaitForRecovery(5000000), true);
1531     ASSERT_OK(static_cast<DBImpl*>(db[i])->TEST_WaitForCompact(true));
1532     EXPECT_TRUE(db[i]->GetProperty(
1533         "rocksdb.num-files-at-level" + NumberToString(0), &prop));
1534     EXPECT_EQ(atoi(prop.c_str()), 0);
1535     EXPECT_TRUE(db[i]->GetProperty(
1536         "rocksdb.num-files-at-level" + NumberToString(1), &prop));
1537     EXPECT_EQ(atoi(prop.c_str()), 1);
1538   }
1539 
1540   SstFileManagerImpl* sfmImpl =
1541       static_cast_with_check<SstFileManagerImpl>(sfm.get());
1542   sfmImpl->Close();
1543 
1544   for (auto i = 0; i < kNumDbInstances; ++i) {
1545     char buf[16];
1546     snprintf(buf, sizeof(buf), "_%d", i);
1547     delete db[i];
1548     fault_fs[i]->SetFilesystemActive(true);
1549     if (getenv("KEEP_DB")) {
1550       printf("DB is still at %s%s\n", dbname_.c_str(), buf);
1551     } else {
1552       ASSERT_OK(DestroyDB(dbname_ + std::string(buf), options[i]));
1553     }
1554   }
1555   options.clear();
1556   sfm.reset();
1557   delete def_env;
1558 }
1559 
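// Same multi-DB setup as above, but each DB gets a different failure mode:
// DB 0 fails on the compaction output SST (soft error), DB 1 fails on the
// flush SST (hard error), and DB 2 sees no injected error. All three DBs
// should end up fully compacted once recovery completes.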
1560 TEST_F(DBErrorHandlingFSTest, MultiDBVariousErrors) {
1561   if (mem_env_ != nullptr) {
1562     ROCKSDB_GTEST_SKIP("Test requires non-mock environment");
1563     return;
1564   }
1565   FaultInjectionTestEnv* def_env = new FaultInjectionTestEnv(env_);
1566   std::vector<std::unique_ptr<Env>> fault_envs;
1567   std::vector<FaultInjectionTestFS*> fault_fs;
1568   std::vector<Options> options;
1569   std::vector<std::shared_ptr<ErrorHandlerFSListener>> listener;
1570   std::vector<DB*> db;
1571   std::shared_ptr<SstFileManager> sfm(NewSstFileManager(def_env));
1572   int kNumDbInstances = 3;
1573   Random rnd(301);
1574 
1575   for (auto i = 0; i < kNumDbInstances; ++i) {
1576     listener.emplace_back(new ErrorHandlerFSListener());
1577     options.emplace_back(GetDefaultOptions());
1578     fault_fs.emplace_back(new FaultInjectionTestFS(env_->GetFileSystem()));
1579     std::shared_ptr<FileSystem> fs(fault_fs.back());
1580     fault_envs.emplace_back(new CompositeEnvWrapper(def_env, fs));
1581     options[i].env = fault_envs.back().get();
1582     options[i].create_if_missing = true;
1583     options[i].level0_file_num_compaction_trigger = 2;
1584     options[i].writable_file_max_buffer_size = 32768;
1585     options[i].listeners.emplace_back(listener[i]);
1586     options[i].sst_file_manager = sfm;
1587     DB* dbptr;
1588     char buf[16];
1589 
1590     listener[i]->EnableAutoRecovery();
1591     switch (i) {
1592       case 0:
1593         // Set up to return an error on the 3rd SST, which would be at level 1
1594         listener[i]->InjectFileCreationError(fault_fs[i], 3,
1595                                              IOStatus::NoSpace("Out of space"));
1596         break;
1597       case 1:
1598         // Set up to return an error after the 1st SST, which would result
1599         // in a hard error
1600         listener[i]->InjectFileCreationError(fault_fs[i], 2,
1601                                              IOStatus::NoSpace("Out of space"));
1602         break;
1603       default:
1604         break;
1605     }
1606     snprintf(buf, sizeof(buf), "_%d", i);
1607     ASSERT_OK(DestroyDB(dbname_ + std::string(buf), options[i]));
1608     ASSERT_OK(DB::Open(options[i], dbname_ + std::string(buf), &dbptr));
1609     db.emplace_back(dbptr);
1610   }
1611 
1612   for (auto i = 0; i < kNumDbInstances; ++i) {
1613     WriteBatch batch;
1614 
1615     for (auto j = 0; j <= 100; ++j) {
1616       ASSERT_OK(batch.Put(Key(j), rnd.RandomString(1024)));
1617     }
1618 
1619     WriteOptions wopts;
1620     wopts.sync = true;
1621     ASSERT_OK(db[i]->Write(wopts, &batch));
1622     ASSERT_OK(db[i]->Flush(FlushOptions()));
1623   }
1624 
1625   def_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));
1626   for (auto i = 0; i < kNumDbInstances; ++i) {
1627     WriteBatch batch;
1628 
1629     // Write a second batch of keys to trigger compaction
1630     for (auto j = 100; j < 199; ++j) {
1631       ASSERT_OK(batch.Put(Key(j), rnd.RandomString(1024)));
1632     }
1633 
1634     WriteOptions wopts;
1635     wopts.sync = true;
1636     ASSERT_OK(db[i]->Write(wopts, &batch));
1637     if (i != 1) {
1638       ASSERT_OK(db[i]->Flush(FlushOptions()));
1639     } else {
1640       ASSERT_TRUE(db[i]->Flush(FlushOptions()).IsNoSpace());
1641     }
1642   }
1643 
1644   for (auto i = 0; i < kNumDbInstances; ++i) {
1645     Status s = static_cast<DBImpl*>(db[i])->TEST_WaitForCompact(true);
1646     switch (i) {
1647       case 0:
1648         ASSERT_EQ(s.severity(), Status::Severity::kSoftError);
1649         break;
1650       case 1:
1651         ASSERT_EQ(s.severity(), Status::Severity::kHardError);
1652         break;
1653       case 2:
1654         ASSERT_OK(s);
1655         break;
1656     }
1657     fault_fs[i]->SetFilesystemActive(true);
1658   }
1659 
1660   def_env->SetFilesystemActive(true);
1661   for (auto i = 0; i < kNumDbInstances; ++i) {
1662     std::string prop;
1663     if (i < 2) {
1664       ASSERT_EQ(listener[i]->WaitForRecovery(5000000), true);
1665     }
1666     if (i == 1) {
1667       ASSERT_OK(static_cast<DBImpl*>(db[i])->TEST_WaitForCompact(true));
1668     }
1669     EXPECT_TRUE(db[i]->GetProperty(
1670         "rocksdb.num-files-at-level" + NumberToString(0), &prop));
1671     EXPECT_EQ(atoi(prop.c_str()), 0);
1672     EXPECT_TRUE(db[i]->GetProperty(
1673         "rocksdb.num-files-at-level" + NumberToString(1), &prop));
1674     EXPECT_EQ(atoi(prop.c_str()), 1);
1675   }
1676 
1677   SstFileManagerImpl* sfmImpl =
1678       static_cast_with_check<SstFileManagerImpl>(sfm.get());
1679   sfmImpl->Close();
1680 
1681   for (auto i = 0; i < kNumDbInstances; ++i) {
1682     char buf[16];
1683     snprintf(buf, sizeof(buf), "_%d", i);
1684     fault_fs[i]->SetFilesystemActive(true);
1685     delete db[i];
1686     if (getenv("KEEP_DB")) {
1687       printf("DB is still at %s%s\n", dbname_.c_str(), buf);
1688     } else {
1689       EXPECT_OK(DestroyDB(dbname_ + std::string(buf), options[i]));
1690     }
1691   }
1692   options.clear();
1693   delete def_env;
1694 }
1695 
1696 // When the KV pairs are written with the WAL disabled, a retryable error in
1697 // this condition is mapped to a soft error and auto resume is triggered.
1698 // During auto resume, SwitchMemtable is disabled to avoid creating small SST
1699 // files. Writes can still be applied before the bg error is cleared, unless
1700 // the memtable is full.
1701 TEST_F(DBErrorHandlingFSTest, FLushWritNoWALRetryableErrorAutoRecover1) {
1702   // Keep the FS inactive until the auto resume loop exits, so every resume
1703   // attempt fails and the user has to call Resume() manually
1703   std::shared_ptr<ErrorHandlerFSListener> listener(
1704       new ErrorHandlerFSListener());
1705   Options options = GetDefaultOptions();
1706   options.env = fault_env_.get();
1707   options.create_if_missing = true;
1708   options.listeners.emplace_back(listener);
1709   options.max_bgerror_resume_count = 2;
1710   options.bgerror_resume_retry_interval = 100000;  // 0.1 second
1711   options.statistics = CreateDBStatistics();
1712   Status s;
1713 
1714   listener->EnableAutoRecovery(false);
1715   DestroyAndReopen(options);
1716 
1717   IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
1718   error_msg.SetRetryable(true);
1719 
1720   WriteOptions wo = WriteOptions();
1721   wo.disableWAL = true;
1722   ASSERT_OK(Put(Key(1), "val1", wo));
1723   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
1724       {{"RecoverFromRetryableBGIOError:LoopOut",
1725         "FLushWritNoWALRetryableeErrorAutoRecover1:1"}});
1726   SyncPoint::GetInstance()->SetCallBack(
1727       "BuildTable:BeforeFinishBuildTable",
1728       [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
1729 
1730   SyncPoint::GetInstance()->EnableProcessing();
1731   s = Flush();
1732   ASSERT_EQ("val1", Get(Key(1)));
1733   ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
1734   TEST_SYNC_POINT("FLushWritNoWALRetryableeErrorAutoRecover1:1");
1735   ASSERT_EQ("val1", Get(Key(1)));
1736   ASSERT_EQ("val1", Get(Key(1)));
1737   SyncPoint::GetInstance()->DisableProcessing();
1738   fault_fs_->SetFilesystemActive(true);
1739   ASSERT_EQ(3, options.statistics->getAndResetTickerCount(
1740                    ERROR_HANDLER_BG_ERROR_COUNT));
1741   ASSERT_EQ(3, options.statistics->getAndResetTickerCount(
1742                    ERROR_HANDLER_BG_IO_ERROR_COUNT));
1743   ASSERT_EQ(3, options.statistics->getAndResetTickerCount(
1744                    ERROR_HANDLER_BG_RETRYABLE_IO_ERROR_COUNT));
1745   ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
1746                    ERROR_HANDLER_AUTORESUME_COUNT));
1747   ASSERT_LE(0, options.statistics->getAndResetTickerCount(
1748                    ERROR_HANDLER_AUTORESUME_RETRY_TOTAL_COUNT));
1749   ASSERT_LE(0, options.statistics->getAndResetTickerCount(
1750                    ERROR_HANDLER_AUTORESUME_SUCCESS_COUNT));
1751   HistogramData autoresume_retry;
1752   options.statistics->histogramData(ERROR_HANDLER_AUTORESUME_RETRY_COUNT,
1753                                     &autoresume_retry);
1754   ASSERT_GE(autoresume_retry.max, 0);
1755   ASSERT_OK(Put(Key(2), "val2", wo));
1756   s = Flush();
1757   // Since auto resume fails, the bg error is not cleared, so the flush
1758   // returns the bg_error set before.
1759   ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
1760   ASSERT_EQ("val2", Get(Key(2)));
1761 
1762   // Call Resume() manually to clear the bg error
1763   ASSERT_OK(dbfull()->Resume());
1764   ASSERT_OK(Put(Key(3), "val3", wo));
1765   // After resume is successful, the flush should be ok.
1766   ASSERT_OK(Flush());
1767   ASSERT_EQ("val3", Get(Key(3)));
1768   Destroy(options);
1769 }
1770 
1771 TEST_F(DBErrorHandlingFSTest, FLushWritNoWALRetryableErrorAutoRecover2) {
1772   // Activate the FS before the first resume
1773   std::shared_ptr<ErrorHandlerFSListener> listener(
1774       new ErrorHandlerFSListener());
1775   Options options = GetDefaultOptions();
1776   options.env = fault_env_.get();
1777   options.create_if_missing = true;
1778   options.listeners.emplace_back(listener);
1779   options.max_bgerror_resume_count = 2;
1780   options.bgerror_resume_retry_interval = 100000;  // 0.1 second
1781   options.statistics = CreateDBStatistics();
1782   Status s;
1783 
1784   listener->EnableAutoRecovery(false);
1785   DestroyAndReopen(options);
1786 
1787   IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
1788   error_msg.SetRetryable(true);
1789 
1790   WriteOptions wo = WriteOptions();
1791   wo.disableWAL = true;
1792   ASSERT_OK(Put(Key(1), "val1", wo));
1793   SyncPoint::GetInstance()->SetCallBack(
1794       "BuildTable:BeforeFinishBuildTable",
1795       [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
1796 
1797   SyncPoint::GetInstance()->EnableProcessing();
1798   s = Flush();
1799   ASSERT_EQ("val1", Get(Key(1)));
1800   ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
1801   SyncPoint::GetInstance()->DisableProcessing();
1802   fault_fs_->SetFilesystemActive(true);
1803   ASSERT_EQ(listener->WaitForRecovery(5000000), true);
1804   ASSERT_EQ("val1", Get(Key(1)));
1805   ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
1806                    ERROR_HANDLER_BG_ERROR_COUNT));
1807   ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
1808                    ERROR_HANDLER_BG_IO_ERROR_COUNT));
1809   ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
1810                    ERROR_HANDLER_BG_RETRYABLE_IO_ERROR_COUNT));
1811   ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
1812                    ERROR_HANDLER_AUTORESUME_COUNT));
1813   ASSERT_LE(0, options.statistics->getAndResetTickerCount(
1814                    ERROR_HANDLER_AUTORESUME_RETRY_TOTAL_COUNT));
1815   ASSERT_LE(0, options.statistics->getAndResetTickerCount(
1816                    ERROR_HANDLER_AUTORESUME_SUCCESS_COUNT));
1817   HistogramData autoresume_retry;
1818   options.statistics->histogramData(ERROR_HANDLER_AUTORESUME_RETRY_COUNT,
1819                                     &autoresume_retry);
1820   ASSERT_GE(autoresume_retry.max, 0);
1821   ASSERT_OK(Put(Key(2), "val2", wo));
1822   s = Flush();
1823   // Since auto resume is successful, the bg error is cleared and the flush
1824   // succeeds.
1825   ASSERT_OK(s);
1826   ASSERT_EQ("val2", Get(Key(2)));
1827   Destroy(options);
1828 }
1829 
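// Flush (with the WAL enabled) hits a retryable IO error that is mapped to a
// soft error. The FS is reactivated right away, so the automatic resume
// succeeds and the data remains readable, including after a reopen.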
1830 TEST_F(DBErrorHandlingFSTest, FLushWritRetryableErrorAutoRecover1) {
1831   // Activate the FS before the first resume
1832   std::shared_ptr<ErrorHandlerFSListener> listener(
1833       new ErrorHandlerFSListener());
1834   Options options = GetDefaultOptions();
1835   options.env = fault_env_.get();
1836   options.create_if_missing = true;
1837   options.listeners.emplace_back(listener);
1838   options.max_bgerror_resume_count = 2;
1839   options.bgerror_resume_retry_interval = 100000;  // 0.1 second
1840   Status s;
1841 
1842   listener->EnableAutoRecovery(false);
1843   DestroyAndReopen(options);
1844 
1845   IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
1846   error_msg.SetRetryable(true);
1847 
1848   ASSERT_OK(Put(Key(1), "val1"));
1849   SyncPoint::GetInstance()->SetCallBack(
1850       "BuildTable:BeforeFinishBuildTable",
1851       [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
1852 
1853   SyncPoint::GetInstance()->EnableProcessing();
1854   s = Flush();
1855   ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
1856   SyncPoint::GetInstance()->DisableProcessing();
1857   fault_fs_->SetFilesystemActive(true);
1858   ASSERT_EQ(listener->WaitForRecovery(5000000), true);
1859 
1860   ASSERT_EQ("val1", Get(Key(1)));
1861   Reopen(options);
1862   ASSERT_EQ("val1", Get(Key(1)));
1863   ASSERT_OK(Put(Key(2), "val2"));
1864   ASSERT_OK(Flush());
1865   ASSERT_EQ("val2", Get(Key(2)));
1866 
1867   Destroy(options);
1868 }
1869 
1870 TEST_F(DBErrorHandlingFSTest, FLushWritRetryableErrorAutoRecover2) {
1871   // Fail all the auto resume attempts and let the user resume manually
1872   std::shared_ptr<ErrorHandlerFSListener> listener(
1873       new ErrorHandlerFSListener());
1874   Options options = GetDefaultOptions();
1875   options.env = fault_env_.get();
1876   options.create_if_missing = true;
1877   options.listeners.emplace_back(listener);
1878   options.max_bgerror_resume_count = 2;
1879   options.bgerror_resume_retry_interval = 100000;  // 0.1 second
1880   Status s;
1881 
1882   listener->EnableAutoRecovery(false);
1883   DestroyAndReopen(options);
1884 
1885   IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
1886   error_msg.SetRetryable(true);
1887 
1888   ASSERT_OK(Put(Key(1), "val1"));
1889   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
1890       {{"FLushWritRetryableeErrorAutoRecover2:0",
1891         "RecoverFromRetryableBGIOError:BeforeStart"},
1892        {"RecoverFromRetryableBGIOError:LoopOut",
1893         "FLushWritRetryableeErrorAutoRecover2:1"}});
1894   SyncPoint::GetInstance()->SetCallBack(
1895       "BuildTable:BeforeFinishBuildTable",
1896       [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
1897   SyncPoint::GetInstance()->EnableProcessing();
1898   s = Flush();
1899   ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
1900   TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover2:0");
1901   TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover2:1");
1902   fault_fs_->SetFilesystemActive(true);
1903   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
1904   SyncPoint::GetInstance()->DisableProcessing();
1905 
1906   ASSERT_EQ("val1", Get(Key(1)));
1907   // Auto resume fails because the FS does not recover while resume is
1908   // running. The user calls Resume() manually here.
1909   s = dbfull()->Resume();
1910   ASSERT_EQ("val1", Get(Key(1)));
1911   ASSERT_OK(s);
1912   ASSERT_OK(Put(Key(2), "val2"));
1913   ASSERT_OK(Flush());
1914   ASSERT_EQ("val2", Get(Key(2)));
1915 
1916   Destroy(options);
1917 }
1918 
1919 TEST_F(DBErrorHandlingFSTest, ManifestWriteRetryableErrorAutoRecover) {
1920   // Fail the first resume and let the second resume be successful
1921   std::shared_ptr<ErrorHandlerFSListener> listener(
1922       new ErrorHandlerFSListener());
1923   Options options = GetDefaultOptions();
1924   options.env = fault_env_.get();
1925   options.create_if_missing = true;
1926   options.listeners.emplace_back(listener);
1927   options.max_bgerror_resume_count = 2;
1928   options.bgerror_resume_retry_interval = 100000;  // 0.1 second
1929   Status s;
1930   std::string old_manifest;
1931   std::string new_manifest;
1932 
1933   listener->EnableAutoRecovery(false);
1934   DestroyAndReopen(options);
1935   old_manifest = GetManifestNameFromLiveFiles();
1936 
1937   IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
1938   error_msg.SetRetryable(true);
1939 
1940   ASSERT_OK(Put(Key(0), "val"));
1941   ASSERT_OK(Flush());
1942   ASSERT_OK(Put(Key(1), "val"));
1943   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
1944       {{"RecoverFromRetryableBGIOError:BeforeStart",
1945         "ManifestWriteRetryableErrorAutoRecover:0"},
1946        {"ManifestWriteRetryableErrorAutoRecover:1",
1947         "RecoverFromRetryableBGIOError:BeforeWait1"},
1948        {"RecoverFromRetryableBGIOError:RecoverSuccess",
1949         "ManifestWriteRetryableErrorAutoRecover:2"}});
1950   SyncPoint::GetInstance()->SetCallBack(
1951       "VersionSet::LogAndApply:WriteManifest",
1952       [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
1953   SyncPoint::GetInstance()->EnableProcessing();
1954   s = Flush();
1955   ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
1956   TEST_SYNC_POINT("ManifestWriteRetryableErrorAutoRecover:0");
1957   fault_fs_->SetFilesystemActive(true);
1958   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
1959   TEST_SYNC_POINT("ManifestWriteRetryableErrorAutoRecover:1");
1960   TEST_SYNC_POINT("ManifestWriteRetryableErrorAutoRecover:2");
1961   SyncPoint::GetInstance()->DisableProcessing();
1962 
1963   new_manifest = GetManifestNameFromLiveFiles();
1964   ASSERT_NE(new_manifest, old_manifest);
1965 
1966   Reopen(options);
1967   ASSERT_EQ("val", Get(Key(0)));
1968   ASSERT_EQ("val", Get(Key(1)));
1969   Close();
1970 }
1971 
1972 TEST_F(DBErrorHandlingFSTest, ManifestWriteNoWALRetryableErrorAutoRecover) {
1973   // Fail the first resume and let the second resume be successful
1974   std::shared_ptr<ErrorHandlerFSListener> listener(
1975       new ErrorHandlerFSListener());
1976   Options options = GetDefaultOptions();
1977   options.env = fault_env_.get();
1978   options.create_if_missing = true;
1979   options.listeners.emplace_back(listener);
1980   options.max_bgerror_resume_count = 2;
1981   options.bgerror_resume_retry_interval = 100000;  // 0.1 second
1982   Status s;
1983   std::string old_manifest;
1984   std::string new_manifest;
1985 
1986   listener->EnableAutoRecovery(false);
1987   DestroyAndReopen(options);
1988   old_manifest = GetManifestNameFromLiveFiles();
1989 
1990   IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
1991   error_msg.SetRetryable(true);
1992 
1993   WriteOptions wo = WriteOptions();
1994   wo.disableWAL = true;
1995   ASSERT_OK(Put(Key(0), "val", wo));
1996   ASSERT_OK(Flush());
1997   ASSERT_OK(Put(Key(1), "val", wo));
1998   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
1999       {{"RecoverFromRetryableBGIOError:BeforeStart",
2000         "ManifestWriteNoWALRetryableErrorAutoRecover:0"},
2001        {"ManifestWriteNoWALRetryableErrorAutoRecover:1",
2002         "RecoverFromRetryableBGIOError:BeforeWait1"},
2003        {"RecoverFromRetryableBGIOError:RecoverSuccess",
2004         "ManifestWriteNoWALRetryableErrorAutoRecover:2"}});
2005   SyncPoint::GetInstance()->SetCallBack(
2006       "VersionSet::LogAndApply:WriteManifest",
2007       [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
2008   SyncPoint::GetInstance()->EnableProcessing();
2009   s = Flush();
2010   ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
2011   TEST_SYNC_POINT("ManifestWriteNoWALRetryableErrorAutoRecover:0");
2012   fault_fs_->SetFilesystemActive(true);
2013   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
2014   TEST_SYNC_POINT("ManifestWriteNoWALRetryableErrorAutoRecover:1");
2015   TEST_SYNC_POINT("ManifestWriteNoWALRetryableErrorAutoRecover:2");
2016   SyncPoint::GetInstance()->DisableProcessing();
2017 
2018   new_manifest = GetManifestNameFromLiveFiles();
2019   ASSERT_NE(new_manifest, old_manifest);
2020 
2021   Reopen(options);
2022   ASSERT_EQ("val", Get(Key(0)));
2023   ASSERT_EQ("val", Get(Key(1)));
2024   Close();
2025 }
2026 
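// The manifest write performed when installing the compaction result fails
// with a retryable IO error (overridden to a hard error by the listener). The
// first auto resume attempt fails, the FS is reactivated before the second
// attempt, and recovery succeeds, producing a new manifest.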
2027 TEST_F(DBErrorHandlingFSTest,
2028        CompactionManifestWriteRetryableErrorAutoRecover) {
2029   std::shared_ptr<ErrorHandlerFSListener> listener(
2030       new ErrorHandlerFSListener());
2031   Options options = GetDefaultOptions();
2032   options.env = fault_env_.get();
2033   options.create_if_missing = true;
2034   options.level0_file_num_compaction_trigger = 2;
2035   options.listeners.emplace_back(listener);
2036   options.max_bgerror_resume_count = 2;
2037   options.bgerror_resume_retry_interval = 100000;  // 0.1 second
2038   Status s;
2039   std::string old_manifest;
2040   std::string new_manifest;
2041   std::atomic<bool> fail_manifest(false);
2042   DestroyAndReopen(options);
2043   old_manifest = GetManifestNameFromLiveFiles();
2044 
2045   IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
2046   error_msg.SetRetryable(true);
2047 
2048   ASSERT_OK(Put(Key(0), "val"));
2049   ASSERT_OK(Put(Key(2), "val"));
2050   ASSERT_OK(Flush());
2051 
2052   listener->OverrideBGError(Status(error_msg, Status::Severity::kHardError));
2053   listener->EnableAutoRecovery(false);
2054   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
2055       // Wait for flush of 2nd L0 file before starting compaction
2056       {{"DBImpl::FlushMemTable:FlushMemTableFinished",
2057         "BackgroundCallCompaction:0"},
2058        // Wait for compaction to detect manifest write error
2059        {"BackgroundCallCompaction:1", "CompactionManifestWriteErrorAR:0"},
2060        // Make compaction thread wait for error to be cleared
2061        {"CompactionManifestWriteErrorAR:1",
2062         "DBImpl::BackgroundCallCompaction:FoundObsoleteFiles"},
2063        {"CompactionManifestWriteErrorAR:2",
2064         "RecoverFromRetryableBGIOError:BeforeStart"},
2065        // Fail the first resume, before the wait in resume
2066        {"RecoverFromRetryableBGIOError:BeforeResume0",
2067         "CompactionManifestWriteErrorAR:3"},
2068        // Activate the FS before the second resume
2069        {"CompactionManifestWriteErrorAR:4",
2070         "RecoverFromRetryableBGIOError:BeforeResume1"},
2071        // Wait for the auto resume to be successful
2072        {"RecoverFromRetryableBGIOError:RecoverSuccess",
2073         "CompactionManifestWriteErrorAR:5"}});
2074   // Trigger a manifest write failure in the compaction thread
2075   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
2076       "BackgroundCallCompaction:0", [&](void*) { fail_manifest.store(true); });
2077   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
2078       "VersionSet::LogAndApply:WriteManifest", [&](void*) {
2079         if (fail_manifest.load()) {
2080           fault_fs_->SetFilesystemActive(false, error_msg);
2081         }
2082       });
2083   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
2084 
2085   ASSERT_OK(Put(Key(1), "val"));
2086   s = Flush();
2087   ASSERT_OK(s);
2088 
2089   TEST_SYNC_POINT("CompactionManifestWriteErrorAR:0");
2090   TEST_SYNC_POINT("CompactionManifestWriteErrorAR:1");
2091 
2092   s = dbfull()->TEST_WaitForCompact();
2093   ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
2094   TEST_SYNC_POINT("CompactionManifestWriteErrorAR:2");
2095   TEST_SYNC_POINT("CompactionManifestWriteErrorAR:3");
2096   fault_fs_->SetFilesystemActive(true);
2097   SyncPoint::GetInstance()->ClearAllCallBacks();
2098   TEST_SYNC_POINT("CompactionManifestWriteErrorAR:4");
2099   TEST_SYNC_POINT("CompactionManifestWriteErrorAR:5");
2100   SyncPoint::GetInstance()->DisableProcessing();
2101 
2102   new_manifest = GetManifestNameFromLiveFiles();
2103   ASSERT_NE(new_manifest, old_manifest);
2104 
2105   Reopen(options);
2106   ASSERT_EQ("val", Get(Key(0)));
2107   ASSERT_EQ("val", Get(Key(1)));
2108   ASSERT_EQ("val", Get(Key(2)));
2109   Close();
2110 }
2111 
2112 TEST_F(DBErrorHandlingFSTest, CompactionWriteRetryableErrorAutoRecover) {
2113   // In this test, the FS is set to return an error during the first round of
2114   // compaction. The first compaction therefore fails with a retryable IO
2115   // error, which is mapped to a soft error. Compaction is then rescheduled;
2116   // in the second round the FS is active again and the compaction succeeds,
2117   // so the test hits the CompactionJob::FinishCompactionOutputFile1 sync
2118   // point.
2119   std::shared_ptr<ErrorHandlerFSListener> listener(
2120       new ErrorHandlerFSListener());
2121   Options options = GetDefaultOptions();
2122   options.env = fault_env_.get();
2123   options.create_if_missing = true;
2124   options.level0_file_num_compaction_trigger = 2;
2125   options.listeners.emplace_back(listener);
2126   Status s;
2127   std::atomic<bool> fail_first(false);
2128   std::atomic<bool> fail_second(true);
2129   DestroyAndReopen(options);
2130 
2131   IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
2132   error_msg.SetRetryable(true);
2133 
2134   ASSERT_OK(Put(Key(0), "va;"));
2135   ASSERT_OK(Put(Key(2), "va;"));
2136   s = Flush();
2137   ASSERT_OK(s);
2138 
2139   listener->OverrideBGError(Status(error_msg, Status::Severity::kHardError));
2140   listener->EnableAutoRecovery(false);
2141   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
2142       {{"DBImpl::FlushMemTable:FlushMemTableFinished",
2143         "BackgroundCallCompaction:0"},
2144        {"CompactionJob::FinishCompactionOutputFile1",
2145         "CompactionWriteRetryableErrorAutoRecover0"}});
2146   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
2147       "DBImpl::BackgroundCompaction:Start",
2148       [&](void*) { fault_fs_->SetFilesystemActive(true); });
2149   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
2150       "BackgroundCallCompaction:0", [&](void*) { fail_first.store(true); });
2151   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
2152       "CompactionJob::OpenCompactionOutputFile", [&](void*) {
2153         if (fail_first.load() && fail_second.load()) {
2154           fault_fs_->SetFilesystemActive(false, error_msg);
2155           fail_second.store(false);
2156         }
2157       });
2158   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
2159 
2160   ASSERT_OK(Put(Key(1), "val"));
2161   s = Flush();
2162   ASSERT_OK(s);
2163 
2164   s = dbfull()->TEST_WaitForCompact();
2165   ASSERT_OK(s);
2166   TEST_SYNC_POINT("CompactionWriteRetryableErrorAutoRecover0");
2167   SyncPoint::GetInstance()->ClearAllCallBacks();
2168   SyncPoint::GetInstance()->DisableProcessing();
2169   Destroy(options);
2170 }
2171 
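// A WAL write fails partway through the second batch with a retryable IO
// error. Auto resume is attempted; the FS is reactivated partway through the
// resume retries and recovery eventually succeeds. Data from the failed batch
// is dropped, while earlier and later batches survive a reopen.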
2172 TEST_F(DBErrorHandlingFSTest, WALWriteRetryableErrorAutoRecover1) {
2173   std::shared_ptr<ErrorHandlerFSListener> listener(
2174       new ErrorHandlerFSListener());
2175   Options options = GetDefaultOptions();
2176   options.env = fault_env_.get();
2177   options.create_if_missing = true;
2178   options.writable_file_max_buffer_size = 32768;
2179   options.listeners.emplace_back(listener);
2180   options.paranoid_checks = true;
2181   options.max_bgerror_resume_count = 2;
2182   options.bgerror_resume_retry_interval = 100000;  // 0.1 second
2183   Status s;
2184   Random rnd(301);
2185 
2186   DestroyAndReopen(options);
2187 
2188   IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
2189   error_msg.SetRetryable(true);
2190 
2191   // For the first batch, the write is successful and requires sync
2192   {
2193     WriteBatch batch;
2194 
2195     for (auto i = 0; i < 100; ++i) {
2196       ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
2197     }
2198 
2199     WriteOptions wopts;
2200     wopts.sync = true;
2201     ASSERT_OK(dbfull()->Write(wopts, &batch));
2202   }
2203 
2204   // For the second batch, the first 2 file Appends are successful, then the
2205   // following Appends fail due to a retryable file system IOError.
2206   {
2207     WriteBatch batch;
2208     int write_error = 0;
2209 
2210     for (auto i = 100; i < 200; ++i) {
2211       ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
2212     }
2213     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
2214         {{"WALWriteErrorDone", "RecoverFromRetryableBGIOError:BeforeStart"},
2215          {"RecoverFromRetryableBGIOError:BeforeResume0", "WALWriteError1:0"},
2216          {"WALWriteError1:1", "RecoverFromRetryableBGIOError:BeforeResume1"},
2217          {"RecoverFromRetryableBGIOError:RecoverSuccess", "WALWriteError1:2"}});
2218 
2219     SyncPoint::GetInstance()->SetCallBack(
2220         "WritableFileWriter::Append:BeforePrepareWrite", [&](void*) {
2221           write_error++;
2222           if (write_error > 2) {
2223             fault_fs_->SetFilesystemActive(false, error_msg);
2224           }
2225         });
2226     SyncPoint::GetInstance()->EnableProcessing();
2227     WriteOptions wopts;
2228     wopts.sync = true;
2229     s = dbfull()->Write(wopts, &batch);
2230     ASSERT_TRUE(s.IsIOError());
2231     TEST_SYNC_POINT("WALWriteErrorDone");
2232 
2233     TEST_SYNC_POINT("WALWriteError1:0");
2234     fault_fs_->SetFilesystemActive(true);
2235     SyncPoint::GetInstance()->ClearAllCallBacks();
2236     TEST_SYNC_POINT("WALWriteError1:1");
2237     TEST_SYNC_POINT("WALWriteError1:2");
2238   }
2239   SyncPoint::GetInstance()->DisableProcessing();
2240 
2241   // Data in the corrupted WAL is not stored
2242   for (auto i = 0; i < 199; ++i) {
2243     if (i < 100) {
2244       ASSERT_NE(Get(Key(i)), "NOT_FOUND");
2245     } else {
2246       ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
2247     }
2248   }
2249 
2250   // Resume and write a new batch, should be in the WAL
2251   {
2252     WriteBatch batch;
2253 
2254     for (auto i = 200; i < 300; ++i) {
2255       ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
2256     }
2257 
2258     WriteOptions wopts;
2259     wopts.sync = true;
2260     ASSERT_OK(dbfull()->Write(wopts, &batch));
2261   }
2262 
2263   Reopen(options);
2264   for (auto i = 0; i < 300; ++i) {
2265     if (i < 100 || i >= 200) {
2266       ASSERT_NE(Get(Key(i)), "NOT_FOUND");
2267     } else {
2268       ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
2269     }
2270   }
2271   Close();
2272 }
2273 
2274 TEST_F(DBErrorHandlingFSTest, WALWriteRetryableErrorAutoRecover2) {
2275   // Fail the first recover and try second time.
2276   std::shared_ptr<ErrorHandlerFSListener> listener(
2277       new ErrorHandlerFSListener());
2278   Options options = GetDefaultOptions();
2279   options.env = fault_env_.get();
2280   options.create_if_missing = true;
2281   options.writable_file_max_buffer_size = 32768;
2282   options.listeners.emplace_back(listener);
2283   options.paranoid_checks = true;
2284   options.max_bgerror_resume_count = 2;
2285   options.bgerror_resume_retry_interval = 100000;  // 0.1 second
2286   Status s;
2287   Random rnd(301);
2288 
2289   DestroyAndReopen(options);
2290 
2291   IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
2292   error_msg.SetRetryable(true);
2293 
2294   // For the first batch, the write is successful and requires sync
2295   {
2296     WriteBatch batch;
2297 
2298     for (auto i = 0; i < 100; ++i) {
2299       ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
2300     }
2301 
2302     WriteOptions wopts;
2303     wopts.sync = true;
2304     ASSERT_OK(dbfull()->Write(wopts, &batch));
2305   }
2306 
2307   // For the second batch, the first 2 file Appends are successful, then the
2308   // following Appends fail due to a retryable file system IOError.
2309   {
2310     WriteBatch batch;
2311     int write_error = 0;
2312 
2313     for (auto i = 100; i < 200; ++i) {
2314       ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
2315     }
2316     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
2317         {{"RecoverFromRetryableBGIOError:BeforeWait0", "WALWriteError2:0"},
2318          {"WALWriteError2:1", "RecoverFromRetryableBGIOError:BeforeWait1"},
2319          {"RecoverFromRetryableBGIOError:RecoverSuccess", "WALWriteError2:2"}});
2320 
2321     SyncPoint::GetInstance()->SetCallBack(
2322         "WritableFileWriter::Append:BeforePrepareWrite", [&](void*) {
2323           write_error++;
2324           if (write_error > 2) {
2325             fault_fs_->SetFilesystemActive(false, error_msg);
2326           }
2327         });
2328     SyncPoint::GetInstance()->EnableProcessing();
2329     WriteOptions wopts;
2330     wopts.sync = true;
2331     s = dbfull()->Write(wopts, &batch);
2332     ASSERT_TRUE(s.IsIOError());
2333 
2334     TEST_SYNC_POINT("WALWriteError2:0");
2335     fault_fs_->SetFilesystemActive(true);
2336     SyncPoint::GetInstance()->ClearAllCallBacks();
2337     TEST_SYNC_POINT("WALWriteError2:1");
2338     TEST_SYNC_POINT("WALWriteError2:2");
2339   }
2340   SyncPoint::GetInstance()->DisableProcessing();
2341 
2342   // Data in the corrupted WAL is not stored
2343   for (auto i = 0; i < 199; ++i) {
2344     if (i < 100) {
2345       ASSERT_NE(Get(Key(i)), "NOT_FOUND");
2346     } else {
2347       ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
2348     }
2349   }
2350 
2351   // Resume and write a new batch, should be in the WAL
2352   {
2353     WriteBatch batch;
2354 
2355     for (auto i = 200; i < 300; ++i) {
2356       ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
2357     }
2358 
2359     WriteOptions wopts;
2360     wopts.sync = true;
2361     ASSERT_OK(dbfull()->Write(wopts, &batch));
2362   }
2363 
2364   Reopen(options);
2365   for (auto i = 0; i < 300; ++i) {
2366     if (i < 100 || i >= 200) {
2367       ASSERT_NE(Get(Key(i)), "NOT_FOUND");
2368     } else {
2369       ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
2370     }
2371   }
2372   Close();
2373 }
2374 
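// The fencing tests below run with paranoid_checks both on and off (the bool
// parameter). An IOFenced error from the FS is treated as a fatal error, and
// Resume() keeps returning IOFenced rather than clearing it.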
2375 class DBErrorHandlingFencingTest : public DBErrorHandlingFSTest,
2376                                    public testing::WithParamInterface<bool> {};
2377 
2378 TEST_P(DBErrorHandlingFencingTest, FLushWriteFenced) {
2379   std::shared_ptr<ErrorHandlerFSListener> listener(
2380       new ErrorHandlerFSListener());
2381   Options options = GetDefaultOptions();
2382   options.env = fault_env_.get();
2383   options.create_if_missing = true;
2384   options.listeners.emplace_back(listener);
2385   options.paranoid_checks = GetParam();
2386   Status s;
2387 
2388   listener->EnableAutoRecovery(true);
2389   DestroyAndReopen(options);
2390 
2391   ASSERT_OK(Put(Key(0), "val"));
2392   SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
2393     fault_fs_->SetFilesystemActive(false, IOStatus::IOFenced("IO fenced"));
2394   });
2395   SyncPoint::GetInstance()->EnableProcessing();
2396   s = Flush();
2397   ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kFatalError);
2398   ASSERT_TRUE(s.IsIOFenced());
2399   SyncPoint::GetInstance()->DisableProcessing();
2400   fault_fs_->SetFilesystemActive(true);
2401   s = dbfull()->Resume();
2402   ASSERT_TRUE(s.IsIOFenced());
2403   Destroy(options);
2404 }
2405 
2406 TEST_P(DBErrorHandlingFencingTest, ManifestWriteFenced) {
2407   std::shared_ptr<ErrorHandlerFSListener> listener(
2408       new ErrorHandlerFSListener());
2409   Options options = GetDefaultOptions();
2410   options.env = fault_env_.get();
2411   options.create_if_missing = true;
2412   options.listeners.emplace_back(listener);
2413   options.paranoid_checks = GetParam();
2414   Status s;
2415   std::string old_manifest;
2416   std::string new_manifest;
2417 
2418   listener->EnableAutoRecovery(true);
2419   DestroyAndReopen(options);
2420   old_manifest = GetManifestNameFromLiveFiles();
2421 
2422   ASSERT_OK(Put(Key(0), "val"));
2423   ASSERT_OK(Flush());
2424   ASSERT_OK(Put(Key(1), "val"));
2425   SyncPoint::GetInstance()->SetCallBack(
2426       "VersionSet::LogAndApply:WriteManifest", [&](void*) {
2427         fault_fs_->SetFilesystemActive(false, IOStatus::IOFenced("IO fenced"));
2428       });
2429   SyncPoint::GetInstance()->EnableProcessing();
2430   s = Flush();
2431   ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kFatalError);
2432   ASSERT_TRUE(s.IsIOFenced());
2433   SyncPoint::GetInstance()->ClearAllCallBacks();
2434   SyncPoint::GetInstance()->DisableProcessing();
2435   fault_fs_->SetFilesystemActive(true);
2436   s = dbfull()->Resume();
2437   ASSERT_TRUE(s.IsIOFenced());
2438   Close();
2439 }
2440 
2441 TEST_P(DBErrorHandlingFencingTest, CompactionWriteFenced) {
2442   std::shared_ptr<ErrorHandlerFSListener> listener(
2443       new ErrorHandlerFSListener());
2444   Options options = GetDefaultOptions();
2445   options.env = fault_env_.get();
2446   options.create_if_missing = true;
2447   options.level0_file_num_compaction_trigger = 2;
2448   options.listeners.emplace_back(listener);
2449   options.paranoid_checks = GetParam();
2450   Status s;
2451   DestroyAndReopen(options);
2452 
2453   ASSERT_OK(Put(Key(0), "va;"));
2454   ASSERT_OK(Put(Key(2), "va;"));
2455   s = Flush();
2456   ASSERT_OK(s);
2457 
2458   listener->EnableAutoRecovery(true);
2459   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
2460       {{"DBImpl::FlushMemTable:FlushMemTableFinished",
2461         "BackgroundCallCompaction:0"}});
2462   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
2463       "BackgroundCallCompaction:0", [&](void*) {
2464         fault_fs_->SetFilesystemActive(false, IOStatus::IOFenced("IO fenced"));
2465       });
2466   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
2467 
2468   ASSERT_OK(Put(Key(1), "val"));
2469   s = Flush();
2470   ASSERT_OK(s);
2471 
2472   s = dbfull()->TEST_WaitForCompact();
2473   ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kFatalError);
2474   ASSERT_TRUE(s.IsIOFenced());
2475 
2476   fault_fs_->SetFilesystemActive(true);
2477   s = dbfull()->Resume();
2478   ASSERT_TRUE(s.IsIOFenced());
2479   Destroy(options);
2480 }
2481 
2482 TEST_P(DBErrorHandlingFencingTest, WALWriteFenced) {
2483   std::shared_ptr<ErrorHandlerFSListener> listener(
2484       new ErrorHandlerFSListener());
2485   Options options = GetDefaultOptions();
2486   options.env = fault_env_.get();
2487   options.create_if_missing = true;
2488   options.writable_file_max_buffer_size = 32768;
2489   options.listeners.emplace_back(listener);
2490   options.paranoid_checks = GetParam();
2491   Status s;
2492   Random rnd(301);
2493 
2494   listener->EnableAutoRecovery(true);
2495   DestroyAndReopen(options);
2496 
2497   {
2498     WriteBatch batch;
2499 
2500     for (auto i = 0; i < 100; ++i) {
2501       ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
2502     }
2503 
2504     WriteOptions wopts;
2505     wopts.sync = true;
2506     ASSERT_OK(dbfull()->Write(wopts, &batch));
2507   }
2508 
2509   {
2510     WriteBatch batch;
2511     int write_error = 0;
2512 
2513     for (auto i = 100; i < 199; ++i) {
2514       ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
2515     }
2516 
2517     SyncPoint::GetInstance()->SetCallBack(
2518         "WritableFileWriter::Append:BeforePrepareWrite", [&](void*) {
2519           write_error++;
2520           if (write_error > 2) {
2521             fault_fs_->SetFilesystemActive(false,
2522                                            IOStatus::IOFenced("IO fenced"));
2523           }
2524         });
2525     SyncPoint::GetInstance()->EnableProcessing();
2526     WriteOptions wopts;
2527     wopts.sync = true;
2528     s = dbfull()->Write(wopts, &batch);
2529     ASSERT_TRUE(s.IsIOFenced());
2530   }
2531   SyncPoint::GetInstance()->DisableProcessing();
2532   fault_fs_->SetFilesystemActive(true);
2533   {
2534     WriteBatch batch;
2535 
2536     for (auto i = 0; i < 100; ++i) {
2537       ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
2538     }
2539 
2540     WriteOptions wopts;
2541     wopts.sync = true;
2542     s = dbfull()->Write(wopts, &batch);
2543     ASSERT_TRUE(s.IsIOFenced());
2544   }
2545   Close();
2546 }
2547 
2548 INSTANTIATE_TEST_CASE_P(DBErrorHandlingFSTest, DBErrorHandlingFencingTest,
2549                         ::testing::Bool());
2550 
2551 }  // namespace ROCKSDB_NAMESPACE
2552 
2553 int main(int argc, char** argv) {
2554   ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
2555   ::testing::InitGoogleTest(&argc, argv);
2556   return RUN_ALL_TESTS();
2557 }
2558 
2559 #else
2560 #include <stdio.h>
2561 
2562 int main(int /*argc*/, char** /*argv*/) {
2563   fprintf(stderr,
2564           "SKIPPED as DBErrorHandlingFSTest is not supported in ROCKSDB_LITE\n");
2564   return 0;
2565 }
2566 
2567 #endif  // ROCKSDB_LITE
2568