// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#ifndef ROCKSDB_LITE

#include "db/db_test_util.h"
#include "file/sst_file_manager_impl.h"
#include "port/stack_trace.h"
#include "rocksdb/io_status.h"
#include "rocksdb/sst_file_manager.h"
#if !defined(ROCKSDB_LITE)
#include "test_util/sync_point.h"
#endif
#include "util/random.h"
#include "utilities/fault_injection_env.h"
#include "utilities/fault_injection_fs.h"

namespace ROCKSDB_NAMESPACE {

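// Test fixture that layers a FaultInjectionTestFS over the default
// filesystem, so individual tests can turn I/O errors on and off at
// specific points via sync point callbacks.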
class DBErrorHandlingFSTest : public DBTestBase {
 public:
  DBErrorHandlingFSTest()
      : DBTestBase("/db_error_handling_fs_test", /*env_do_fsync=*/true) {
    fault_fs_.reset(new FaultInjectionTestFS(env_->GetFileSystem()));
    fault_env_.reset(new CompositeEnvWrapper(env_, fault_fs_));
  }

  std::string GetManifestNameFromLiveFiles() {
    std::vector<std::string> live_files;
    uint64_t manifest_size;

    Status s = dbfull()->GetLiveFiles(live_files, &manifest_size, false);
    if (!s.ok()) {
      return "";
    }
    for (auto& file : live_files) {
      uint64_t num = 0;
      FileType type;
      if (ParseFileName(file, &num, &type) && type == kDescriptorFile) {
        return file;
      }
    }
    return "";
  }

  std::shared_ptr<FaultInjectionTestFS> fault_fs_;
  std::unique_ptr<Env> fault_env_;
};

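// EventListener used by the tests to disable auto recovery, wait for
// recovery to complete, override the background error severity, and arm a
// file creation error after a given number of table files have been created.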
class ErrorHandlerFSListener : public EventListener {
 public:
  ErrorHandlerFSListener()
      : mutex_(),
        cv_(&mutex_),
        no_auto_recovery_(false),
        recovery_complete_(false),
        file_creation_started_(false),
        override_bg_error_(false),
        file_count_(0),
        fault_fs_(nullptr) {}
  ~ErrorHandlerFSListener() {
    file_creation_error_.PermitUncheckedError();
    bg_error_.PermitUncheckedError();
  }

  void OnTableFileCreationStarted(
      const TableFileCreationBriefInfo& /*ti*/) override {
    InstrumentedMutexLock l(&mutex_);
    file_creation_started_ = true;
    if (file_count_ > 0) {
      if (--file_count_ == 0) {
        fault_fs_->SetFilesystemActive(false, file_creation_error_);
        file_creation_error_ = IOStatus::OK();
      }
    }
    cv_.SignalAll();
  }

  void OnErrorRecoveryBegin(BackgroundErrorReason /*reason*/, Status bg_error,
                            bool* auto_recovery) override {
    bg_error.PermitUncheckedError();
    if (*auto_recovery && no_auto_recovery_) {
      *auto_recovery = false;
    }
  }

  void OnErrorRecoveryCompleted(Status old_bg_error) override {
    InstrumentedMutexLock l(&mutex_);
    recovery_complete_ = true;
    cv_.SignalAll();
    old_bg_error.PermitUncheckedError();
  }

  bool WaitForRecovery(uint64_t /*abs_time_us*/) {
    InstrumentedMutexLock l(&mutex_);
    while (!recovery_complete_) {
      cv_.Wait(/*abs_time_us*/);
    }
    if (recovery_complete_) {
      recovery_complete_ = false;
      return true;
    }
    return false;
  }

  void WaitForTableFileCreationStarted(uint64_t /*abs_time_us*/) {
    InstrumentedMutexLock l(&mutex_);
    while (!file_creation_started_) {
      cv_.Wait(/*abs_time_us*/);
    }
    file_creation_started_ = false;
  }

  void OnBackgroundError(BackgroundErrorReason /*reason*/,
                         Status* bg_error) override {
    if (override_bg_error_) {
      *bg_error = bg_error_;
      override_bg_error_ = false;
    }
  }

  void EnableAutoRecovery(bool enable = true) { no_auto_recovery_ = !enable; }

  void OverrideBGError(Status bg_err) {
    bg_error_ = bg_err;
    override_bg_error_ = true;
  }

  void InjectFileCreationError(FaultInjectionTestFS* fs, int file_count,
                               IOStatus io_s) {
    fault_fs_ = fs;
    file_count_ = file_count;
    file_creation_error_ = io_s;
  }

 private:
  InstrumentedMutex mutex_;
  InstrumentedCondVar cv_;
  bool no_auto_recovery_;
  bool recovery_complete_;
  bool file_creation_started_;
  bool override_bg_error_;
  int file_count_;
  IOStatus file_creation_error_;
  Status bg_error_;
  FaultInjectionTestFS* fault_fs_;
};

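// Inject a NoSpace error at the start of a flush. It should surface as a
// hard background error requiring a manual Resume(), after which the
// previously written key must survive a reopen.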
TEST_F(DBErrorHandlingFSTest, FLushWriteError) {
  std::shared_ptr<ErrorHandlerFSListener> listener(
      new ErrorHandlerFSListener());
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.listeners.emplace_back(listener);
  options.statistics = CreateDBStatistics();
  Status s;

  listener->EnableAutoRecovery(false);
  DestroyAndReopen(options);

  ASSERT_OK(Put(Key(0), "val"));
  SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
    fault_fs_->SetFilesystemActive(false, IOStatus::NoSpace("Out of space"));
  });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
  SyncPoint::GetInstance()->DisableProcessing();
  fault_fs_->SetFilesystemActive(true);
  s = dbfull()->Resume();
  ASSERT_OK(s);
  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_BG_ERROR_COUNT));
  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_BG_IO_ERROR_COUNT));
  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_BG_RETRYABLE_IO_ERROR_COUNT));
  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_AUTORESUME_COUNT));
  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_AUTORESUME_RETRY_TOTAL_COUNT));
  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_AUTORESUME_SUCCESS_COUNT));

  Reopen(options);
  ASSERT_EQ("val", Get(Key(0)));
  Destroy(options);
}

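// Inject a retryable IOError at three different points of table building
// (finish, sync, close). Each failure should be mapped to a soft error and
// be recoverable via a manual Resume().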
TEST_F(DBErrorHandlingFSTest, FLushWriteRetryableError) {
  std::shared_ptr<ErrorHandlerFSListener> listener(
      new ErrorHandlerFSListener());
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.listeners.emplace_back(listener);
  options.max_bgerror_resume_count = 0;
  options.statistics = CreateDBStatistics();
  Status s;

  listener->EnableAutoRecovery(false);
  DestroyAndReopen(options);

  IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
  error_msg.SetRetryable(true);

  ASSERT_OK(Put(Key(1), "val1"));
  SyncPoint::GetInstance()->SetCallBack(
      "BuildTable:BeforeFinishBuildTable",
      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
  SyncPoint::GetInstance()->DisableProcessing();
  fault_fs_->SetFilesystemActive(true);
  s = dbfull()->Resume();
  ASSERT_OK(s);
  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_BG_ERROR_COUNT));
  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_BG_IO_ERROR_COUNT));
  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_BG_RETRYABLE_IO_ERROR_COUNT));
  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_AUTORESUME_COUNT));
  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_AUTORESUME_RETRY_TOTAL_COUNT));
  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_AUTORESUME_SUCCESS_COUNT));
  Reopen(options);
  ASSERT_EQ("val1", Get(Key(1)));

  ASSERT_OK(Put(Key(2), "val2"));
  SyncPoint::GetInstance()->SetCallBack(
      "BuildTable:BeforeSyncTable",
      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
  SyncPoint::GetInstance()->DisableProcessing();
  fault_fs_->SetFilesystemActive(true);
  s = dbfull()->Resume();
  ASSERT_OK(s);
  Reopen(options);
  ASSERT_EQ("val2", Get(Key(2)));

  ASSERT_OK(Put(Key(3), "val3"));
  SyncPoint::GetInstance()->SetCallBack(
      "BuildTable:BeforeCloseTableFile",
      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
  SyncPoint::GetInstance()->DisableProcessing();
  fault_fs_->SetFilesystemActive(true);
  s = dbfull()->Resume();
  ASSERT_OK(s);
  Reopen(options);
  ASSERT_EQ("val3", Get(Key(3)));

  Destroy(options);
}

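// Same flow as above, but with a file-scoped data loss error instead of a
// retryable one; a flush failure should still be treated as a soft error.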
TEST_F(DBErrorHandlingFSTest, FLushWriteFileScopeError) {
  std::shared_ptr<ErrorHandlerFSListener> listener(
      new ErrorHandlerFSListener());
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.listeners.emplace_back(listener);
  options.max_bgerror_resume_count = 0;
  Status s;

  listener->EnableAutoRecovery(false);
  DestroyAndReopen(options);

  IOStatus error_msg = IOStatus::IOError("File Scope Data Loss Error");
  error_msg.SetDataLoss(true);
  error_msg.SetScope(
      ROCKSDB_NAMESPACE::IOStatus::IOErrorScope::kIOErrorScopeFile);
  error_msg.SetRetryable(false);

  ASSERT_OK(Put(Key(1), "val1"));
  SyncPoint::GetInstance()->SetCallBack(
      "BuildTable:BeforeFinishBuildTable",
      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
  SyncPoint::GetInstance()->DisableProcessing();
  fault_fs_->SetFilesystemActive(true);
  s = dbfull()->Resume();
  ASSERT_OK(s);
  Reopen(options);
  ASSERT_EQ("val1", Get(Key(1)));

  ASSERT_OK(Put(Key(2), "val2"));
  SyncPoint::GetInstance()->SetCallBack(
      "BuildTable:BeforeSyncTable",
      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
  SyncPoint::GetInstance()->DisableProcessing();
  fault_fs_->SetFilesystemActive(true);
  s = dbfull()->Resume();
  ASSERT_OK(s);
  Reopen(options);
  ASSERT_EQ("val2", Get(Key(2)));

  ASSERT_OK(Put(Key(3), "val3"));
  SyncPoint::GetInstance()->SetCallBack(
      "BuildTable:BeforeCloseTableFile",
      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
  SyncPoint::GetInstance()->DisableProcessing();
  fault_fs_->SetFilesystemActive(true);
  s = dbfull()->Resume();
  ASSERT_OK(s);
  Reopen(options);
  ASSERT_EQ("val3", Get(Key(3)));

  // not file scope, but retryable set
  error_msg.SetDataLoss(false);
  error_msg.SetScope(
      ROCKSDB_NAMESPACE::IOStatus::IOErrorScope::kIOErrorScopeFileSystem);
  error_msg.SetRetryable(true);

  ASSERT_OK(Put(Key(3), "val3"));
  SyncPoint::GetInstance()->SetCallBack(
      "BuildTable:BeforeCloseTableFile",
      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
  SyncPoint::GetInstance()->DisableProcessing();
  fault_fs_->SetFilesystemActive(true);
  s = dbfull()->Resume();
  ASSERT_OK(s);
  Reopen(options);
  ASSERT_EQ("val3", Get(Key(3)));

  Destroy(options);
}

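// Inject a retryable IOError while syncing closed WALs during flush. This
// is a hard error; dropping a column family while the error is pending must
// not interfere with the subsequent Resume().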
TEST_F(DBErrorHandlingFSTest, FLushWALWriteRetryableError) {
  std::shared_ptr<ErrorHandlerFSListener> listener(
      new ErrorHandlerFSListener());
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.listeners.emplace_back(listener);
  options.max_bgerror_resume_count = 0;
  Status s;

  IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
  error_msg.SetRetryable(true);

  listener->EnableAutoRecovery(false);
  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::SyncClosedLogs:Start",
      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
  SyncPoint::GetInstance()->EnableProcessing();

  CreateAndReopenWithCF({"pikachu, sdfsdfsdf"}, options);

  WriteOptions wo = WriteOptions();
  wo.disableWAL = false;
  ASSERT_OK(Put(Key(1), "val1", wo));

  s = Flush();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
  SyncPoint::GetInstance()->DisableProcessing();
  fault_fs_->SetFilesystemActive(true);
  auto cfh = dbfull()->GetColumnFamilyHandle(1);
  s = dbfull()->DropColumnFamily(cfh);

  s = dbfull()->Resume();
  ASSERT_OK(s);
  ASSERT_EQ("val1", Get(Key(1)));
  ASSERT_OK(Put(Key(3), "val3", wo));
  ASSERT_EQ("val3", Get(Key(3)));
  s = Flush();
  ASSERT_OK(s);
  ASSERT_EQ("val3", Get(Key(3)));

  Destroy(options);
}

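// Same as the previous test, with atomic_flush enabled.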
TEST_F(DBErrorHandlingFSTest, FLushWALAtomicWriteRetryableError) {
  std::shared_ptr<ErrorHandlerFSListener> listener(
      new ErrorHandlerFSListener());
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.listeners.emplace_back(listener);
  options.max_bgerror_resume_count = 0;
  options.atomic_flush = true;
  Status s;

  IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
  error_msg.SetRetryable(true);

  listener->EnableAutoRecovery(false);
  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::SyncClosedLogs:Start",
      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
  SyncPoint::GetInstance()->EnableProcessing();

  CreateAndReopenWithCF({"pikachu, sdfsdfsdf"}, options);

  WriteOptions wo = WriteOptions();
  wo.disableWAL = false;
  ASSERT_OK(Put(Key(1), "val1", wo));

  s = Flush();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
  SyncPoint::GetInstance()->DisableProcessing();
  fault_fs_->SetFilesystemActive(true);
  auto cfh = dbfull()->GetColumnFamilyHandle(1);
  s = dbfull()->DropColumnFamily(cfh);

  s = dbfull()->Resume();
  ASSERT_OK(s);
  ASSERT_EQ("val1", Get(Key(1)));
  ASSERT_OK(Put(Key(3), "val3", wo));
  ASSERT_EQ("val3", Get(Key(3)));
  s = Flush();
  ASSERT_OK(s);
  ASSERT_EQ("val3", Get(Key(3)));

  Destroy(options);
}

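// With the WAL disabled, a retryable flush error should be mapped to a soft
// error. Writes must still be accepted while the soft error is pending, and
// a manual Resume() retries the flush. Variant 1 fails the table build at
// the finish step.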
TEST_F(DBErrorHandlingFSTest, FLushWritNoWALRetryableError1) {
  std::shared_ptr<ErrorHandlerFSListener> listener(
      new ErrorHandlerFSListener());
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.listeners.emplace_back(listener);
  options.max_bgerror_resume_count = 0;
  options.statistics = CreateDBStatistics();
  Status s;

  listener->EnableAutoRecovery(false);
  DestroyAndReopen(options);

  IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
  error_msg.SetRetryable(true);

  WriteOptions wo = WriteOptions();
  wo.disableWAL = true;
  ASSERT_OK(Put(Key(1), "val1", wo));
  SyncPoint::GetInstance()->SetCallBack(
      "BuildTable:BeforeFinishBuildTable",
      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_OK(Put(Key(2), "val2", wo));
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
  ASSERT_EQ("val2", Get(Key(2)));
  SyncPoint::GetInstance()->DisableProcessing();
  fault_fs_->SetFilesystemActive(true);
  s = dbfull()->Resume();
  ASSERT_OK(s);
  ASSERT_EQ("val1", Get(Key(1)));
  ASSERT_EQ("val2", Get(Key(2)));
  ASSERT_OK(Put(Key(3), "val3", wo));
  ASSERT_EQ("val3", Get(Key(3)));
  s = Flush();
  ASSERT_OK(s);
  ASSERT_EQ("val3", Get(Key(3)));
  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_BG_ERROR_COUNT));
  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_BG_IO_ERROR_COUNT));
  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_BG_RETRYABLE_IO_ERROR_COUNT));
  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_AUTORESUME_COUNT));
  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_AUTORESUME_RETRY_TOTAL_COUNT));
  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_AUTORESUME_SUCCESS_COUNT));

  Destroy(options);
}

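// Variant 2: same scenario, failing at the table sync step instead.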
TEST_F(DBErrorHandlingFSTest, FLushWriteNoWALRetryableError2) {
  std::shared_ptr<ErrorHandlerFSListener> listener(
      new ErrorHandlerFSListener());
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.listeners.emplace_back(listener);
  options.max_bgerror_resume_count = 0;
  Status s;

  listener->EnableAutoRecovery(false);
  DestroyAndReopen(options);

  IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
  error_msg.SetRetryable(true);

  WriteOptions wo = WriteOptions();
  wo.disableWAL = true;

  ASSERT_OK(Put(Key(1), "val1", wo));
  SyncPoint::GetInstance()->SetCallBack(
      "BuildTable:BeforeSyncTable",
      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_OK(Put(Key(2), "val2", wo));
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
  ASSERT_EQ("val2", Get(Key(2)));
  SyncPoint::GetInstance()->DisableProcessing();
  fault_fs_->SetFilesystemActive(true);
  s = dbfull()->Resume();
  ASSERT_OK(s);
  ASSERT_EQ("val1", Get(Key(1)));
  ASSERT_EQ("val2", Get(Key(2)));
  ASSERT_OK(Put(Key(3), "val3", wo));
  ASSERT_EQ("val3", Get(Key(3)));
  s = Flush();
  ASSERT_OK(s);
  ASSERT_EQ("val3", Get(Key(3)));

  Destroy(options);
}

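// Variant 3: same scenario, failing at the table file close step.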
TEST_F(DBErrorHandlingFSTest, FLushWriteNoWALRetryableError3) {
  std::shared_ptr<ErrorHandlerFSListener> listener(
      new ErrorHandlerFSListener());
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.listeners.emplace_back(listener);
  options.max_bgerror_resume_count = 0;
  Status s;

  listener->EnableAutoRecovery(false);
  DestroyAndReopen(options);

  IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
  error_msg.SetRetryable(true);

  WriteOptions wo = WriteOptions();
  wo.disableWAL = true;

  ASSERT_OK(Put(Key(1), "val1", wo));
  SyncPoint::GetInstance()->SetCallBack(
      "BuildTable:BeforeCloseTableFile",
      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_OK(Put(Key(2), "val2", wo));
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
  ASSERT_EQ("val2", Get(Key(2)));
  SyncPoint::GetInstance()->DisableProcessing();
  fault_fs_->SetFilesystemActive(true);
  s = dbfull()->Resume();
  ASSERT_OK(s);
  ASSERT_EQ("val1", Get(Key(1)));
  ASSERT_EQ("val2", Get(Key(2)));
  ASSERT_OK(Put(Key(3), "val3", wo));
  ASSERT_EQ("val3", Get(Key(3)));
  s = Flush();
  ASSERT_OK(s);
  ASSERT_EQ("val3", Get(Key(3)));

  Destroy(options);
}

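// Inject a NoSpace error while writing the MANIFEST during flush. This is a
// hard error; a successful Resume() must roll over to a new manifest file.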
TEST_F(DBErrorHandlingFSTest, ManifestWriteError) {
  std::shared_ptr<ErrorHandlerFSListener> listener(
      new ErrorHandlerFSListener());
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.listeners.emplace_back(listener);
  Status s;
  std::string old_manifest;
  std::string new_manifest;

  listener->EnableAutoRecovery(false);
  DestroyAndReopen(options);
  old_manifest = GetManifestNameFromLiveFiles();

  ASSERT_OK(Put(Key(0), "val"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put(Key(1), "val"));
  SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::LogAndApply:WriteManifest", [&](void*) {
        fault_fs_->SetFilesystemActive(false,
                                       IOStatus::NoSpace("Out of space"));
      });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();
  fault_fs_->SetFilesystemActive(true);
  s = dbfull()->Resume();
  ASSERT_OK(s);

  new_manifest = GetManifestNameFromLiveFiles();
  ASSERT_NE(new_manifest, old_manifest);

  Reopen(options);
  ASSERT_EQ("val", Get(Key(0)));
  ASSERT_EQ("val", Get(Key(1)));
  Close();
}

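// Same manifest write failure, but with a retryable IOError, which is
// treated as a soft error.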
TEST_F(DBErrorHandlingFSTest, ManifestWriteRetryableError) {
  std::shared_ptr<ErrorHandlerFSListener> listener(
      new ErrorHandlerFSListener());
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.listeners.emplace_back(listener);
  options.max_bgerror_resume_count = 0;
  Status s;
  std::string old_manifest;
  std::string new_manifest;

  listener->EnableAutoRecovery(false);
  DestroyAndReopen(options);
  old_manifest = GetManifestNameFromLiveFiles();

  IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
  error_msg.SetRetryable(true);

  ASSERT_OK(Put(Key(0), "val"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put(Key(1), "val"));
  SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::LogAndApply:WriteManifest",
      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();
  fault_fs_->SetFilesystemActive(true);
  s = dbfull()->Resume();
  ASSERT_OK(s);

  new_manifest = GetManifestNameFromLiveFiles();
  ASSERT_NE(new_manifest, old_manifest);

  Reopen(options);
  ASSERT_EQ("val", Get(Key(0)));
  ASSERT_EQ("val", Get(Key(1)));
  Close();
}

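// Same manifest write failure, with a file-scoped data loss error.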
TEST_F(DBErrorHandlingFSTest, ManifestWriteFileScopeError) {
  std::shared_ptr<ErrorHandlerFSListener> listener(
      new ErrorHandlerFSListener());
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.listeners.emplace_back(listener);
  options.max_bgerror_resume_count = 0;
  Status s;
  std::string old_manifest;
  std::string new_manifest;

  listener->EnableAutoRecovery(false);
  DestroyAndReopen(options);
  old_manifest = GetManifestNameFromLiveFiles();

  IOStatus error_msg = IOStatus::IOError("File Scope Data Loss Error");
  error_msg.SetDataLoss(true);
  error_msg.SetScope(
      ROCKSDB_NAMESPACE::IOStatus::IOErrorScope::kIOErrorScopeFile);
  error_msg.SetRetryable(false);

  ASSERT_OK(Put(Key(0), "val"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put(Key(1), "val"));
  SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::LogAndApply:WriteManifest",
      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();
  fault_fs_->SetFilesystemActive(true);
  s = dbfull()->Resume();
  ASSERT_OK(s);

  new_manifest = GetManifestNameFromLiveFiles();
  ASSERT_NE(new_manifest, old_manifest);

  Reopen(options);
  ASSERT_EQ("val", Get(Key(0)));
  ASSERT_EQ("val", Get(Key(1)));
  Close();
}

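// Manifest write failure with the WAL disabled; still a soft error and
// recoverable via Resume().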
TEST_F(DBErrorHandlingFSTest, ManifestWriteNoWALRetryableError) {
  std::shared_ptr<ErrorHandlerFSListener> listener(
      new ErrorHandlerFSListener());
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.listeners.emplace_back(listener);
  options.max_bgerror_resume_count = 0;
  Status s;
  std::string old_manifest;
  std::string new_manifest;

  listener->EnableAutoRecovery(false);
  DestroyAndReopen(options);
  old_manifest = GetManifestNameFromLiveFiles();

  IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
  error_msg.SetRetryable(true);

  WriteOptions wo = WriteOptions();
  wo.disableWAL = true;
  ASSERT_OK(Put(Key(0), "val", wo));
  ASSERT_OK(Flush());
  ASSERT_OK(Put(Key(1), "val", wo));
  SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::LogAndApply:WriteManifest",
      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();
  fault_fs_->SetFilesystemActive(true);
  s = dbfull()->Resume();
  ASSERT_OK(s);

  new_manifest = GetManifestNameFromLiveFiles();
  ASSERT_NE(new_manifest, old_manifest);

  Reopen(options);
  ASSERT_EQ("val", Get(Key(0)));
  ASSERT_EQ("val", Get(Key(1)));
  Close();
}

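// Fail the manifest write, then fail the manifest write again during the
// first Resume(). Only the second Resume(), with the filesystem healthy,
// should succeed and install a new manifest.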
TEST_F(DBErrorHandlingFSTest, DoubleManifestWriteError) {
  std::shared_ptr<ErrorHandlerFSListener> listener(
      new ErrorHandlerFSListener());
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.listeners.emplace_back(listener);
  Status s;
  std::string old_manifest;
  std::string new_manifest;

  listener->EnableAutoRecovery(false);
  DestroyAndReopen(options);
  old_manifest = GetManifestNameFromLiveFiles();

  ASSERT_OK(Put(Key(0), "val"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put(Key(1), "val"));
  SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::LogAndApply:WriteManifest", [&](void*) {
        fault_fs_->SetFilesystemActive(false,
                                       IOStatus::NoSpace("Out of space"));
      });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
  fault_fs_->SetFilesystemActive(true);

  // This Resume() will attempt to create a new manifest file and fail again
  s = dbfull()->Resume();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
  fault_fs_->SetFilesystemActive(true);
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();

  // A successful Resume() will create a new manifest file
  s = dbfull()->Resume();
  ASSERT_OK(s);

  new_manifest = GetManifestNameFromLiveFiles();
  ASSERT_NE(new_manifest, old_manifest);

  Reopen(options);
  ASSERT_EQ("val", Get(Key(0)));
  ASSERT_EQ("val", Get(Key(1)));
  Close();
}

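// Fail the manifest write from the compaction thread rather than the flush
// path, using sync-point dependencies to sequence the flush, the compaction,
// and the error-clearing steps.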
TEST_F(DBErrorHandlingFSTest, CompactionManifestWriteError) {
  if (mem_env_ != nullptr) {
    ROCKSDB_GTEST_SKIP("Test requires non-mock environment");
    return;
  }
  std::shared_ptr<ErrorHandlerFSListener> listener(
      new ErrorHandlerFSListener());
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.level0_file_num_compaction_trigger = 2;
  options.listeners.emplace_back(listener);
  Status s;
  std::string old_manifest;
  std::string new_manifest;
  std::atomic<bool> fail_manifest(false);
  DestroyAndReopen(options);
  old_manifest = GetManifestNameFromLiveFiles();

  ASSERT_OK(Put(Key(0), "val"));
  ASSERT_OK(Put(Key(2), "val"));
  s = Flush();
  ASSERT_OK(s);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      // Wait for flush of 2nd L0 file before starting compaction
      {{"DBImpl::FlushMemTable:FlushMemTableFinished",
        "BackgroundCallCompaction:0"},
       // Wait for compaction to detect manifest write error
       {"BackgroundCallCompaction:1", "CompactionManifestWriteError:0"},
       // Make compaction thread wait for error to be cleared
       {"CompactionManifestWriteError:1",
        "DBImpl::BackgroundCallCompaction:FoundObsoleteFiles"},
       // Wait for DB instance to clear bg_error before calling
       // TEST_WaitForCompact
       {"SstFileManagerImpl::ErrorCleared", "CompactionManifestWriteError:2"}});
  // trigger manifest write failure in compaction thread
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "BackgroundCallCompaction:0", [&](void*) { fail_manifest.store(true); });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::LogAndApply:WriteManifest", [&](void*) {
        if (fail_manifest.load()) {
          fault_fs_->SetFilesystemActive(false,
                                         IOStatus::NoSpace("Out of space"));
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(Put(Key(1), "val"));
  // This Flush will trigger a compaction, which will fail when appending to
  // the manifest
  s = Flush();
  ASSERT_OK(s);

  TEST_SYNC_POINT("CompactionManifestWriteError:0");
  // Clear all errors so when the compaction is retried, it will succeed
  fault_fs_->SetFilesystemActive(true);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
  TEST_SYNC_POINT("CompactionManifestWriteError:1");
  TEST_SYNC_POINT("CompactionManifestWriteError:2");

  s = dbfull()->TEST_WaitForCompact();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ASSERT_OK(s);

  new_manifest = GetManifestNameFromLiveFiles();
  ASSERT_NE(new_manifest, old_manifest);
  Reopen(options);
  ASSERT_EQ("val", Get(Key(0)));
  ASSERT_EQ("val", Get(Key(1)));
  ASSERT_EQ("val", Get(Key(2)));
  Close();
}

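// Retryable manifest write error in the compaction path, overridden to a
// hard error through the listener; requires a manual Resume().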
TEST_F(DBErrorHandlingFSTest, CompactionManifestWriteRetryableError) {
  std::shared_ptr<ErrorHandlerFSListener> listener(
      new ErrorHandlerFSListener());
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.level0_file_num_compaction_trigger = 2;
  options.listeners.emplace_back(listener);
  options.max_bgerror_resume_count = 0;
  Status s;
  std::string old_manifest;
  std::string new_manifest;
  std::atomic<bool> fail_manifest(false);
  DestroyAndReopen(options);
  old_manifest = GetManifestNameFromLiveFiles();

  IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
  error_msg.SetRetryable(true);

  ASSERT_OK(Put(Key(0), "val"));
  ASSERT_OK(Put(Key(2), "val"));
  s = Flush();
  ASSERT_OK(s);

  listener->OverrideBGError(Status(error_msg, Status::Severity::kHardError));
  listener->EnableAutoRecovery(false);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      // Wait for flush of 2nd L0 file before starting compaction
      {{"DBImpl::FlushMemTable:FlushMemTableFinished",
        "BackgroundCallCompaction:0"},
       // Wait for compaction to detect manifest write error
       {"BackgroundCallCompaction:1", "CompactionManifestWriteError:0"},
       // Make compaction thread wait for error to be cleared
       {"CompactionManifestWriteError:1",
        "DBImpl::BackgroundCallCompaction:FoundObsoleteFiles"}});
  // trigger manifest write failure in compaction thread
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "BackgroundCallCompaction:0", [&](void*) { fail_manifest.store(true); });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::LogAndApply:WriteManifest", [&](void*) {
        if (fail_manifest.load()) {
          fault_fs_->SetFilesystemActive(false, error_msg);
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(Put(Key(1), "val"));
  s = Flush();
  ASSERT_OK(s);

  TEST_SYNC_POINT("CompactionManifestWriteError:0");
  TEST_SYNC_POINT("CompactionManifestWriteError:1");

  s = dbfull()->TEST_WaitForCompact();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);

  fault_fs_->SetFilesystemActive(true);
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();
  s = dbfull()->Resume();
  ASSERT_OK(s);

  new_manifest = GetManifestNameFromLiveFiles();
  ASSERT_NE(new_manifest, old_manifest);

  Reopen(options);
  ASSERT_EQ("val", Get(Key(0)));
  ASSERT_EQ("val", Get(Key(1)));
  ASSERT_EQ("val", Get(Key(2)));
  Close();
}

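// Fail compaction output writes with NoSpace, overridden to a hard error
// through the listener; the error is cleared by a manual Resume().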
TEST_F(DBErrorHandlingFSTest, CompactionWriteError) {
  std::shared_ptr<ErrorHandlerFSListener> listener(
      new ErrorHandlerFSListener());
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.level0_file_num_compaction_trigger = 2;
  options.listeners.emplace_back(listener);
  Status s;
  DestroyAndReopen(options);

  ASSERT_OK(Put(Key(0), "va;"));
  ASSERT_OK(Put(Key(2), "va;"));
  s = Flush();
  ASSERT_OK(s);

  listener->OverrideBGError(
      Status(Status::NoSpace(), Status::Severity::kHardError));
  listener->EnableAutoRecovery(false);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::FlushMemTable:FlushMemTableFinished",
        "BackgroundCallCompaction:0"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "BackgroundCallCompaction:0", [&](void*) {
        fault_fs_->SetFilesystemActive(false,
                                       IOStatus::NoSpace("Out of space"));
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(Put(Key(1), "val"));
  s = Flush();
  ASSERT_OK(s);

  s = dbfull()->TEST_WaitForCompact();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);

  fault_fs_->SetFilesystemActive(true);
  s = dbfull()->Resume();
  ASSERT_OK(s);
  Destroy(options);
}

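// Retryable error while opening the compaction output file; background work
// is cancelled after the compaction finishes, and no background error should
// be left behind before Resume().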
TEST_F(DBErrorHandlingFSTest, DISABLED_CompactionWriteRetryableError) {
  std::shared_ptr<ErrorHandlerFSListener> listener(
      new ErrorHandlerFSListener());
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.level0_file_num_compaction_trigger = 2;
  options.listeners.emplace_back(listener);
  options.max_bgerror_resume_count = 0;
  Status s;
  DestroyAndReopen(options);

  IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
  error_msg.SetRetryable(true);

  ASSERT_OK(Put(Key(0), "va;"));
  ASSERT_OK(Put(Key(2), "va;"));
  s = Flush();
  ASSERT_OK(s);

  listener->OverrideBGError(Status(error_msg, Status::Severity::kHardError));
  listener->EnableAutoRecovery(false);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::FlushMemTable:FlushMemTableFinished",
        "BackgroundCallCompaction:0"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "CompactionJob::OpenCompactionOutputFile",
      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:Finish",
      [&](void*) { CancelAllBackgroundWork(dbfull()); });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(Put(Key(1), "val"));
  s = Flush();
  ASSERT_OK(s);

  s = dbfull()->TEST_GetBGError();
  ASSERT_OK(s);
  fault_fs_->SetFilesystemActive(true);
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();
  s = dbfull()->Resume();
  ASSERT_OK(s);
  Destroy(options);
}

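// Same scenario with a file-scoped data loss error while opening the
// compaction output file.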
TEST_F(DBErrorHandlingFSTest, DISABLED_CompactionWriteFileScopeError) {
  std::shared_ptr<ErrorHandlerFSListener> listener(
      new ErrorHandlerFSListener());
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.level0_file_num_compaction_trigger = 2;
  options.listeners.emplace_back(listener);
  options.max_bgerror_resume_count = 0;
  Status s;
  DestroyAndReopen(options);

  IOStatus error_msg = IOStatus::IOError("File Scope Data Loss Error");
  error_msg.SetDataLoss(true);
  error_msg.SetScope(
      ROCKSDB_NAMESPACE::IOStatus::IOErrorScope::kIOErrorScopeFile);
  error_msg.SetRetryable(false);

  ASSERT_OK(Put(Key(0), "va;"));
  ASSERT_OK(Put(Key(2), "va;"));
  s = Flush();
  ASSERT_OK(s);

  listener->OverrideBGError(Status(error_msg, Status::Severity::kHardError));
  listener->EnableAutoRecovery(false);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::FlushMemTable:FlushMemTableFinished",
        "BackgroundCallCompaction:0"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "CompactionJob::OpenCompactionOutputFile",
      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:Finish",
      [&](void*) { CancelAllBackgroundWork(dbfull()); });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(Put(Key(1), "val"));
  s = Flush();
  ASSERT_OK(s);

  s = dbfull()->TEST_GetBGError();
  ASSERT_OK(s);

  fault_fs_->SetFilesystemActive(true);
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();
  s = dbfull()->Resume();
  ASSERT_OK(s);
  Destroy(options);
}

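// A corruption error reported during compaction is unrecoverable: Resume()
// must fail.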
TEST_F(DBErrorHandlingFSTest, CorruptionError) {
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.level0_file_num_compaction_trigger = 2;
  Status s;
  DestroyAndReopen(options);

  ASSERT_OK(Put(Key(0), "va;"));
  ASSERT_OK(Put(Key(2), "va;"));
  s = Flush();
  ASSERT_OK(s);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::FlushMemTable:FlushMemTableFinished",
        "BackgroundCallCompaction:0"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "BackgroundCallCompaction:0", [&](void*) {
        fault_fs_->SetFilesystemActive(false,
                                       IOStatus::Corruption("Corruption"));
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(Put(Key(1), "val"));
  s = Flush();
  ASSERT_OK(s);

  s = dbfull()->TEST_WaitForCompact();
  ASSERT_EQ(s.severity(),
            ROCKSDB_NAMESPACE::Status::Severity::kUnrecoverableError);

  fault_fs_->SetFilesystemActive(true);
  s = dbfull()->Resume();
  ASSERT_NOK(s);
  Destroy(options);
}

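// With auto recovery enabled, a NoSpace flush error should be recovered in
// the background once the filesystem is healthy again, without an explicit
// Resume().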
TEST_F(DBErrorHandlingFSTest, AutoRecoverFlushError) {
  if (mem_env_ != nullptr) {
    ROCKSDB_GTEST_SKIP("Test requires non-mock environment");
    return;
  }
  std::shared_ptr<ErrorHandlerFSListener> listener(
      new ErrorHandlerFSListener());
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.listeners.emplace_back(listener);
  options.statistics = CreateDBStatistics();
  Status s;

  listener->EnableAutoRecovery();
  DestroyAndReopen(options);

  ASSERT_OK(Put(Key(0), "val"));
  SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
    fault_fs_->SetFilesystemActive(false, IOStatus::NoSpace("Out of space"));
  });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
  SyncPoint::GetInstance()->DisableProcessing();
  fault_fs_->SetFilesystemActive(true);
  ASSERT_EQ(listener->WaitForRecovery(5000000), true);

  s = Put(Key(1), "val");
  ASSERT_OK(s);
  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_BG_ERROR_COUNT));
  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_BG_IO_ERROR_COUNT));
  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_BG_RETRYABLE_IO_ERROR_COUNT));
  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_AUTORESUME_COUNT));
  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_AUTORESUME_RETRY_TOTAL_COUNT));
  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_AUTORESUME_SUCCESS_COUNT));

  Reopen(options);
  ASSERT_EQ("val", Get(Key(0)));
  ASSERT_EQ("val", Get(Key(1)));
  Destroy(options);
}

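// Induce a flush error with auto recovery enabled, then verify the DB can be
// closed and destroyed while recovery is still pending.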
TEST_F(DBErrorHandlingFSTest, FailRecoverFlushError) {
  std::shared_ptr<ErrorHandlerFSListener> listener(
      new ErrorHandlerFSListener());
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.listeners.emplace_back(listener);
  Status s;

  listener->EnableAutoRecovery();
  DestroyAndReopen(options);

  ASSERT_OK(Put(Key(0), "val"));
  SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
    fault_fs_->SetFilesystemActive(false, IOStatus::NoSpace("Out of space"));
  });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
  // We should be able to shutdown the database while auto recovery is going
  // on in the background
  Close();
  DestroyDB(dbname_, options).PermitUncheckedError();
}

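// Fail a synced WAL write with NoSpace after a few appends. Auto recovery
// should kick in once the filesystem is reactivated; the first (synced)
// batch remains readable and the failed batch is gone, both before and
// after reopen.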
TEST_F(DBErrorHandlingFSTest, WALWriteError) {
  if (mem_env_ != nullptr) {
    ROCKSDB_GTEST_SKIP("Test requires non-mock environment");
    return;
  }
  std::shared_ptr<ErrorHandlerFSListener> listener(
      new ErrorHandlerFSListener());
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.writable_file_max_buffer_size = 32768;
  options.listeners.emplace_back(listener);
  Status s;
  Random rnd(301);

  listener->EnableAutoRecovery();
  DestroyAndReopen(options);

  {
    WriteBatch batch;

    for (auto i = 0; i < 100; ++i) {
      ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
    }

    WriteOptions wopts;
    wopts.sync = true;
    ASSERT_OK(dbfull()->Write(wopts, &batch));
  };

  {
    WriteBatch batch;
    int write_error = 0;

    for (auto i = 100; i < 199; ++i) {
      ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
    }

    SyncPoint::GetInstance()->SetCallBack(
        "WritableFileWriter::Append:BeforePrepareWrite", [&](void*) {
          write_error++;
          if (write_error > 2) {
            fault_fs_->SetFilesystemActive(false,
                                           IOStatus::NoSpace("Out of space"));
          }
        });
    SyncPoint::GetInstance()->EnableProcessing();
    WriteOptions wopts;
    wopts.sync = true;
    s = dbfull()->Write(wopts, &batch);
    ASSERT_EQ(s, s.NoSpace());
  }
  SyncPoint::GetInstance()->DisableProcessing();
  fault_fs_->SetFilesystemActive(true);
  ASSERT_EQ(listener->WaitForRecovery(5000000), true);
  for (auto i = 0; i < 199; ++i) {
    if (i < 100) {
      ASSERT_NE(Get(Key(i)), "NOT_FOUND");
    } else {
      ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
    }
  }
  Reopen(options);
  for (auto i = 0; i < 199; ++i) {
    if (i < 100) {
      ASSERT_NE(Get(Key(i)), "NOT_FOUND");
    } else {
      ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
    }
  }
  Close();
}

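// Retryable WAL write error with auto resume disabled; the failed batch is
// not persisted, and writes succeed again after a manual Resume().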
TEST_F(DBErrorHandlingFSTest, WALWriteRetryableError) {
  std::shared_ptr<ErrorHandlerFSListener> listener(
      new ErrorHandlerFSListener());
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.writable_file_max_buffer_size = 32768;
  options.listeners.emplace_back(listener);
  options.paranoid_checks = true;
  options.max_bgerror_resume_count = 0;
  Random rnd(301);

  DestroyAndReopen(options);

  IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
  error_msg.SetRetryable(true);

  // The first batch is written successfully with sync required
  {
    WriteBatch batch;

    for (auto i = 0; i < 100; ++i) {
      ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
    }

    WriteOptions wopts;
    wopts.sync = true;
    ASSERT_OK(dbfull()->Write(wopts, &batch));
  };

  // For the second batch, the first 2 file Appends succeed, then the
  // following Append fails with a retryable file system IOError.
  {
    WriteBatch batch;
    int write_error = 0;

    for (auto i = 100; i < 200; ++i) {
      ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
    }

    SyncPoint::GetInstance()->SetCallBack(
        "WritableFileWriter::Append:BeforePrepareWrite", [&](void*) {
          write_error++;
          if (write_error > 2) {
            fault_fs_->SetFilesystemActive(false, error_msg);
          }
        });
    SyncPoint::GetInstance()->EnableProcessing();
    WriteOptions wopts;
    wopts.sync = true;
    Status s = dbfull()->Write(wopts, &batch);
    ASSERT_TRUE(s.IsIOError());
  }
  fault_fs_->SetFilesystemActive(true);
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();

  // Data in the corrupted WAL is not stored
  for (auto i = 0; i < 199; ++i) {
    if (i < 100) {
      ASSERT_NE(Get(Key(i)), "NOT_FOUND");
    } else {
      ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
    }
  }

  // Resume and write a new batch, which should land in the WAL
  ASSERT_OK(dbfull()->Resume());
  {
    WriteBatch batch;

    for (auto i = 200; i < 300; ++i) {
      ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
    }

    WriteOptions wopts;
    wopts.sync = true;
    ASSERT_OK(dbfull()->Write(wopts, &batch));
  };

  Reopen(options);
  for (auto i = 0; i < 300; ++i) {
    if (i < 100 || i >= 200) {
      ASSERT_NE(Get(Key(i)), "NOT_FOUND");
    } else {
      ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
    }
  }
  Close();
}

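// WAL write failure with multiple column families; recovery should flush
// every CF so the first batch is durable across reopen.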
TEST_F(DBErrorHandlingFSTest, MultiCFWALWriteError) {
  if (mem_env_ != nullptr) {
    ROCKSDB_GTEST_SKIP("Test requires non-mock environment");
    return;
  }
  std::shared_ptr<ErrorHandlerFSListener> listener(
      new ErrorHandlerFSListener());
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.writable_file_max_buffer_size = 32768;
  options.listeners.emplace_back(listener);
  Random rnd(301);

  listener->EnableAutoRecovery();
  CreateAndReopenWithCF({"one", "two", "three"}, options);

  {
    WriteBatch batch;

    for (auto i = 1; i < 4; ++i) {
      for (auto j = 0; j < 100; ++j) {
        ASSERT_OK(batch.Put(handles_[i], Key(j), rnd.RandomString(1024)));
      }
    }

    WriteOptions wopts;
    wopts.sync = true;
    ASSERT_OK(dbfull()->Write(wopts, &batch));
  };

  {
    WriteBatch batch;
    int write_error = 0;

    // Write to one CF
    for (auto i = 100; i < 199; ++i) {
      ASSERT_OK(batch.Put(handles_[2], Key(i), rnd.RandomString(1024)));
    }

    SyncPoint::GetInstance()->SetCallBack(
        "WritableFileWriter::Append:BeforePrepareWrite", [&](void*) {
          write_error++;
          if (write_error > 2) {
            fault_fs_->SetFilesystemActive(false,
                                           IOStatus::NoSpace("Out of space"));
          }
        });
    SyncPoint::GetInstance()->EnableProcessing();
    WriteOptions wopts;
    wopts.sync = true;
    Status s = dbfull()->Write(wopts, &batch);
    ASSERT_TRUE(s.IsNoSpace());
  }
  SyncPoint::GetInstance()->DisableProcessing();
  fault_fs_->SetFilesystemActive(true);
  ASSERT_EQ(listener->WaitForRecovery(5000000), true);

  for (auto i = 1; i < 4; ++i) {
    // Every CF should have been flushed
    ASSERT_EQ(NumTableFilesAtLevel(0, i), 1);
  }

  for (auto i = 1; i < 4; ++i) {
    for (auto j = 0; j < 199; ++j) {
      if (j < 100) {
        ASSERT_NE(Get(i, Key(j)), "NOT_FOUND");
      } else {
        ASSERT_EQ(Get(i, Key(j)), "NOT_FOUND");
      }
    }
  }
  ReopenWithColumnFamilies({"default", "one", "two", "three"}, options);
  for (auto i = 1; i < 4; ++i) {
    for (auto j = 0; j < 199; ++j) {
      if (j < 100) {
        ASSERT_NE(Get(i, Key(j)), "NOT_FOUND");
      } else {
        ASSERT_EQ(Get(i, Key(j)), "NOT_FOUND");
      }
    }
  }
  Close();
}

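// Run several DB instances sharing one SstFileManager and fail the 3rd SST
// creation in each; every instance should hit a soft error and auto-recover
// once the filesystem is healthy.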
TEST_F(DBErrorHandlingFSTest, MultiDBCompactionError) {
  if (mem_env_ != nullptr) {
    ROCKSDB_GTEST_SKIP("Test requires non-mock environment");
    return;
  }
  FaultInjectionTestEnv* def_env = new FaultInjectionTestEnv(env_);
  std::vector<std::unique_ptr<Env>> fault_envs;
  std::vector<FaultInjectionTestFS*> fault_fs;
  std::vector<Options> options;
  std::vector<std::shared_ptr<ErrorHandlerFSListener>> listener;
  std::vector<DB*> db;
  std::shared_ptr<SstFileManager> sfm(NewSstFileManager(def_env));
  int kNumDbInstances = 3;
  Random rnd(301);

  for (auto i = 0; i < kNumDbInstances; ++i) {
    listener.emplace_back(new ErrorHandlerFSListener());
    options.emplace_back(GetDefaultOptions());
    fault_fs.emplace_back(new FaultInjectionTestFS(env_->GetFileSystem()));
    std::shared_ptr<FileSystem> fs(fault_fs.back());
    fault_envs.emplace_back(new CompositeEnvWrapper(def_env, fs));
    options[i].env = fault_envs.back().get();
    options[i].create_if_missing = true;
    options[i].level0_file_num_compaction_trigger = 2;
    options[i].writable_file_max_buffer_size = 32768;
    options[i].listeners.emplace_back(listener[i]);
    options[i].sst_file_manager = sfm;
    DB* dbptr;
    char buf[16];

    listener[i]->EnableAutoRecovery();
    // Setup for returning error for the 3rd SST, which would be level 1
    listener[i]->InjectFileCreationError(fault_fs[i], 3,
                                         IOStatus::NoSpace("Out of space"));
    snprintf(buf, sizeof(buf), "_%d", i);
    ASSERT_OK(DestroyDB(dbname_ + std::string(buf), options[i]));
    ASSERT_OK(DB::Open(options[i], dbname_ + std::string(buf), &dbptr));
    db.emplace_back(dbptr);
  }

  for (auto i = 0; i < kNumDbInstances; ++i) {
    WriteBatch batch;

    for (auto j = 0; j <= 100; ++j) {
      ASSERT_OK(batch.Put(Key(j), rnd.RandomString(1024)));
    }

    WriteOptions wopts;
    wopts.sync = true;
    ASSERT_OK(db[i]->Write(wopts, &batch));
    ASSERT_OK(db[i]->Flush(FlushOptions()));
  }

  def_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));
  for (auto i = 0; i < kNumDbInstances; ++i) {
    WriteBatch batch;

    // Write to one CF
    for (auto j = 100; j < 199; ++j) {
      ASSERT_OK(batch.Put(Key(j), rnd.RandomString(1024)));
    }

    WriteOptions wopts;
    wopts.sync = true;
    ASSERT_OK(db[i]->Write(wopts, &batch));
    ASSERT_OK(db[i]->Flush(FlushOptions()));
  }

  for (auto i = 0; i < kNumDbInstances; ++i) {
    Status s = static_cast<DBImpl*>(db[i])->TEST_WaitForCompact(true);
    ASSERT_EQ(s.severity(), Status::Severity::kSoftError);
    fault_fs[i]->SetFilesystemActive(true);
  }

  def_env->SetFilesystemActive(true);
  for (auto i = 0; i < kNumDbInstances; ++i) {
    std::string prop;
    ASSERT_EQ(listener[i]->WaitForRecovery(5000000), true);
    ASSERT_OK(static_cast<DBImpl*>(db[i])->TEST_WaitForCompact(true));
    EXPECT_TRUE(db[i]->GetProperty(
        "rocksdb.num-files-at-level" + NumberToString(0), &prop));
    EXPECT_EQ(atoi(prop.c_str()), 0);
    EXPECT_TRUE(db[i]->GetProperty(
        "rocksdb.num-files-at-level" + NumberToString(1), &prop));
    EXPECT_EQ(atoi(prop.c_str()), 1);
  }

  SstFileManagerImpl* sfmImpl =
      static_cast_with_check<SstFileManagerImpl>(sfm.get());
  sfmImpl->Close();

  for (auto i = 0; i < kNumDbInstances; ++i) {
    char buf[16];
    snprintf(buf, sizeof(buf), "_%d", i);
    delete db[i];
    fault_fs[i]->SetFilesystemActive(true);
    if (getenv("KEEP_DB")) {
      printf("DB is still at %s%s\n", dbname_.c_str(), buf);
    } else {
      ASSERT_OK(DestroyDB(dbname_ + std::string(buf), options[i]));
    }
  }
  options.clear();
  sfm.reset();
  delete def_env;
}

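// As above, but with a different error point per instance, producing a mix
// of soft error, hard error, and no error across the instances.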
TEST_F(DBErrorHandlingFSTest, MultiDBVariousErrors) {
  if (mem_env_ != nullptr) {
    ROCKSDB_GTEST_SKIP("Test requires non-mock environment");
    return;
  }
  FaultInjectionTestEnv* def_env = new FaultInjectionTestEnv(env_);
  std::vector<std::unique_ptr<Env>> fault_envs;
  std::vector<FaultInjectionTestFS*> fault_fs;
  std::vector<Options> options;
  std::vector<std::shared_ptr<ErrorHandlerFSListener>> listener;
  std::vector<DB*> db;
  std::shared_ptr<SstFileManager> sfm(NewSstFileManager(def_env));
  int kNumDbInstances = 3;
  Random rnd(301);

  for (auto i = 0; i < kNumDbInstances; ++i) {
    listener.emplace_back(new ErrorHandlerFSListener());
    options.emplace_back(GetDefaultOptions());
    fault_fs.emplace_back(new FaultInjectionTestFS(env_->GetFileSystem()));
    std::shared_ptr<FileSystem> fs(fault_fs.back());
    fault_envs.emplace_back(new CompositeEnvWrapper(def_env, fs));
    options[i].env = fault_envs.back().get();
    options[i].create_if_missing = true;
    options[i].level0_file_num_compaction_trigger = 2;
    options[i].writable_file_max_buffer_size = 32768;
    options[i].listeners.emplace_back(listener[i]);
    options[i].sst_file_manager = sfm;
    DB* dbptr;
    char buf[16];

    listener[i]->EnableAutoRecovery();
    switch (i) {
      case 0:
        // Setup for returning error for the 3rd SST, which would be level 1
        listener[i]->InjectFileCreationError(fault_fs[i], 3,
                                             IOStatus::NoSpace("Out of space"));
        break;
      case 1:
        // Setup for returning error after the 1st SST, which would result
        // in a hard error
        listener[i]->InjectFileCreationError(fault_fs[i], 2,
                                             IOStatus::NoSpace("Out of space"));
        break;
      default:
        break;
    }
    snprintf(buf, sizeof(buf), "_%d", i);
    ASSERT_OK(DestroyDB(dbname_ + std::string(buf), options[i]));
    ASSERT_OK(DB::Open(options[i], dbname_ + std::string(buf), &dbptr));
    db.emplace_back(dbptr);
  }

  for (auto i = 0; i < kNumDbInstances; ++i) {
    WriteBatch batch;

    for (auto j = 0; j <= 100; ++j) {
      ASSERT_OK(batch.Put(Key(j), rnd.RandomString(1024)));
    }

    WriteOptions wopts;
    wopts.sync = true;
    ASSERT_OK(db[i]->Write(wopts, &batch));
    ASSERT_OK(db[i]->Flush(FlushOptions()));
  }

  def_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));
  for (auto i = 0; i < kNumDbInstances; ++i) {
    WriteBatch batch;

    // Write to one CF
    for (auto j = 100; j < 199; ++j) {
      ASSERT_OK(batch.Put(Key(j), rnd.RandomString(1024)));
    }

    WriteOptions wopts;
    wopts.sync = true;
    ASSERT_OK(db[i]->Write(wopts, &batch));
    if (i != 1) {
      ASSERT_OK(db[i]->Flush(FlushOptions()));
    } else {
      ASSERT_TRUE(db[i]->Flush(FlushOptions()).IsNoSpace());
    }
  }

  for (auto i = 0; i < kNumDbInstances; ++i) {
    Status s = static_cast<DBImpl*>(db[i])->TEST_WaitForCompact(true);
    switch (i) {
      case 0:
        ASSERT_EQ(s.severity(), Status::Severity::kSoftError);
        break;
      case 1:
        ASSERT_EQ(s.severity(), Status::Severity::kHardError);
        break;
      case 2:
        ASSERT_OK(s);
        break;
    }
    fault_fs[i]->SetFilesystemActive(true);
  }

  def_env->SetFilesystemActive(true);
  for (auto i = 0; i < kNumDbInstances; ++i) {
    std::string prop;
    if (i < 2) {
      ASSERT_EQ(listener[i]->WaitForRecovery(5000000), true);
    }
    if (i == 1) {
      ASSERT_OK(static_cast<DBImpl*>(db[i])->TEST_WaitForCompact(true));
    }
    EXPECT_TRUE(db[i]->GetProperty(
        "rocksdb.num-files-at-level" + NumberToString(0), &prop));
    EXPECT_EQ(atoi(prop.c_str()), 0);
    EXPECT_TRUE(db[i]->GetProperty(
        "rocksdb.num-files-at-level" + NumberToString(1), &prop));
    EXPECT_EQ(atoi(prop.c_str()), 1);
  }

  SstFileManagerImpl* sfmImpl =
      static_cast_with_check<SstFileManagerImpl>(sfm.get());
  sfmImpl->Close();

  for (auto i = 0; i < kNumDbInstances; ++i) {
    char buf[16];
    snprintf(buf, sizeof(buf), "_%d", i);
    fault_fs[i]->SetFilesystemActive(true);
    delete db[i];
    if (getenv("KEEP_DB")) {
      printf("DB is still at %s%s\n", dbname_.c_str(), buf);
    } else {
      EXPECT_OK(DestroyDB(dbname_ + std::string(buf), options[i]));
    }
  }
  options.clear();
  delete def_env;
}

1696 // When a KV pair is Put with the WAL disabled and a retryable error
1697 // happens, the bg error is mapped to a soft error and auto resume is
1698 // triggered. During auto resume, SwitchMemtable is disabled to avoid small
1699 // SST tables. Writes can still be applied before the bg error is cleared,
1700 // unless the memtable is full (see the options sketch after this test).
1701 TEST_F(DBErrorHandlingFSTest, FLushWritNoWALRetryableErrorAutoRecover1) {
1702 // Keep the FS inactive until all auto resume attempts have failed.
1703 std::shared_ptr<ErrorHandlerFSListener> listener(
1704 new ErrorHandlerFSListener());
1705 Options options = GetDefaultOptions();
1706 options.env = fault_env_.get();
1707 options.create_if_missing = true;
1708 options.listeners.emplace_back(listener);
1709 options.max_bgerror_resume_count = 2;
1710 options.bgerror_resume_retry_interval = 100000; // 0.1 second
1711 options.statistics = CreateDBStatistics();
1712 Status s;
1713
1714 listener->EnableAutoRecovery(false);
1715 DestroyAndReopen(options);
1716
1717 IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
1718 error_msg.SetRetryable(true);
1719
1720 WriteOptions wo = WriteOptions();
1721 wo.disableWAL = true;
1722 ASSERT_OK(Put(Key(1), "val1", wo));
1723 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
1724 {{"RecoverFromRetryableBGIOError:LoopOut",
1725 "FLushWritNoWALRetryableeErrorAutoRecover1:1"}});
1726 SyncPoint::GetInstance()->SetCallBack(
1727 "BuildTable:BeforeFinishBuildTable",
1728 [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
1729
1730 SyncPoint::GetInstance()->EnableProcessing();
1731 s = Flush();
1732 ASSERT_EQ("val1", Get(Key(1)));
1733 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
1734 TEST_SYNC_POINT("FLushWritNoWALRetryableeErrorAutoRecover1:1");
1735 ASSERT_EQ("val1", Get(Key(1)));
1736 ASSERT_EQ("val1", Get(Key(1)));
1737 SyncPoint::GetInstance()->DisableProcessing();
1738 fault_fs_->SetFilesystemActive(true);
1739 ASSERT_EQ(3, options.statistics->getAndResetTickerCount(
1740 ERROR_HANDLER_BG_ERROR_COUNT));
1741 ASSERT_EQ(3, options.statistics->getAndResetTickerCount(
1742 ERROR_HANDLER_BG_IO_ERROR_COUNT));
1743 ASSERT_EQ(3, options.statistics->getAndResetTickerCount(
1744 ERROR_HANDLER_BG_RETRYABLE_IO_ERROR_COUNT));
1745 ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
1746 ERROR_HANDLER_AUTORESUME_COUNT));
1747 ASSERT_LE(0, options.statistics->getAndResetTickerCount(
1748 ERROR_HANDLER_AUTORESUME_RETRY_TOTAL_COUNT));
1749 ASSERT_LE(0, options.statistics->getAndResetTickerCount(
1750 ERROR_HANDLER_AUTORESUME_SUCCESS_COUNT));
1751 HistogramData autoresume_retry;
1752 options.statistics->histogramData(ERROR_HANDLER_AUTORESUME_RETRY_COUNT,
1753 &autoresume_retry);
1754 ASSERT_GE(autoresume_retry.max, 0);
1755 ASSERT_OK(Put(Key(2), "val2", wo));
1756 s = Flush();
1757 // Since auto resume fails, the bg error is not cleaned; flush will
1758 // return the bg_error set before.
1759 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
1760 ASSERT_EQ("val2", Get(Key(2)));
1761
1762 // Call Resume() manually to clear the bg error.
1763 ASSERT_OK(dbfull()->Resume());
1764 ASSERT_OK(Put(Key(3), "val3", wo));
1765 // After resume is successful, the flush should be ok.
1766 ASSERT_OK(Flush());
1767 ASSERT_EQ("val3", Get(Key(3)));
1768 Destroy(options);
1769 }
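
// A minimal sketch, with assumed (not recommended) values, of the two options
// that drive the automatic resume exercised above: max_bgerror_resume_count
// bounds how many recovery attempts the error handler makes, and
// bgerror_resume_retry_interval is the wait between attempts in microseconds.
// The helper name is hypothetical.
Options MakeAutoResumeOptionsSketch(Env* env) {
  Options options;
  options.env = env;
  options.create_if_missing = true;
  options.max_bgerror_resume_count = 4;             // Up to 4 resume attempts.
  options.bgerror_resume_retry_interval = 1000000;  // 1 second between tries.
  return options;
}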
1770
1771 TEST_F(DBErrorHandlingFSTest, FLushWritNoWALRetryableErrorAutoRecover2) {
1772 // Activate the FS before the first resume
1773 std::shared_ptr<ErrorHandlerFSListener> listener(
1774 new ErrorHandlerFSListener());
1775 Options options = GetDefaultOptions();
1776 options.env = fault_env_.get();
1777 options.create_if_missing = true;
1778 options.listeners.emplace_back(listener);
1779 options.max_bgerror_resume_count = 2;
1780 options.bgerror_resume_retry_interval = 100000; // 0.1 second
1781 options.statistics = CreateDBStatistics();
1782 Status s;
1783
1784 listener->EnableAutoRecovery(false);
1785 DestroyAndReopen(options);
1786
1787 IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
1788 error_msg.SetRetryable(true);
1789
1790 WriteOptions wo = WriteOptions();
1791 wo.disableWAL = true;
1792 ASSERT_OK(Put(Key(1), "val1", wo));
1793 SyncPoint::GetInstance()->SetCallBack(
1794 "BuildTable:BeforeFinishBuildTable",
1795 [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
1796
1797 SyncPoint::GetInstance()->EnableProcessing();
1798 s = Flush();
1799 ASSERT_EQ("val1", Get(Key(1)));
1800 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
1801 SyncPoint::GetInstance()->DisableProcessing();
1802 fault_fs_->SetFilesystemActive(true);
1803 ASSERT_EQ(listener->WaitForRecovery(5000000), true);
1804 ASSERT_EQ("val1", Get(Key(1)));
1805 ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
1806 ERROR_HANDLER_BG_ERROR_COUNT));
1807 ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
1808 ERROR_HANDLER_BG_IO_ERROR_COUNT));
1809 ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
1810 ERROR_HANDLER_BG_RETRYABLE_IO_ERROR_COUNT));
1811 ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
1812 ERROR_HANDLER_AUTORESUME_COUNT));
1813 ASSERT_LE(0, options.statistics->getAndResetTickerCount(
1814 ERROR_HANDLER_AUTORESUME_RETRY_TOTAL_COUNT));
1815 ASSERT_LE(0, options.statistics->getAndResetTickerCount(
1816 ERROR_HANDLER_AUTORESUME_SUCCESS_COUNT));
1817 HistogramData autoresume_retry;
1818 options.statistics->histogramData(ERROR_HANDLER_AUTORESUME_RETRY_COUNT,
1819 &autoresume_retry);
1820 ASSERT_GE(autoresume_retry.max, 0);
1821 ASSERT_OK(Put(Key(2), "val2", wo));
1822 s = Flush();
1823 // Since auto resume is successful, the bg error is cleaned and the flush
1824 // succeeds.
1825 ASSERT_OK(s);
1826 ASSERT_EQ("val2", Get(Key(2)));
1827 Destroy(options);
1828 }
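
// A minimal sketch of how an application might read the error-handler
// statistics asserted above; the helper name and output format are
// hypothetical.
void LogErrorHandlerStatsSketch(const std::shared_ptr<Statistics>& stats) {
  // Cumulative background errors, and how many were IO / retryable-IO errors.
  uint64_t bg = stats->getTickerCount(ERROR_HANDLER_BG_ERROR_COUNT);
  uint64_t bg_io = stats->getTickerCount(ERROR_HANDLER_BG_IO_ERROR_COUNT);
  uint64_t bg_retryable =
      stats->getTickerCount(ERROR_HANDLER_BG_RETRYABLE_IO_ERROR_COUNT);
  // Distribution of retry counts across auto-resume episodes.
  HistogramData retry;
  stats->histogramData(ERROR_HANDLER_AUTORESUME_RETRY_COUNT, &retry);
  fprintf(stderr,
          "bg errors: %llu (io: %llu, retryable io: %llu), max retries: %f\n",
          static_cast<unsigned long long>(bg),
          static_cast<unsigned long long>(bg_io),
          static_cast<unsigned long long>(bg_retryable), retry.max);
}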
1829
1830 TEST_F(DBErrorHandlingFSTest, FLushWritRetryableErrorAutoRecover1) {
1831 // Activate the FS before the first resume
1832 std::shared_ptr<ErrorHandlerFSListener> listener(
1833 new ErrorHandlerFSListener());
1834 Options options = GetDefaultOptions();
1835 options.env = fault_env_.get();
1836 options.create_if_missing = true;
1837 options.listeners.emplace_back(listener);
1838 options.max_bgerror_resume_count = 2;
1839 options.bgerror_resume_retry_interval = 100000; // 0.1 second
1840 Status s;
1841
1842 listener->EnableAutoRecovery(false);
1843 DestroyAndReopen(options);
1844
1845 IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
1846 error_msg.SetRetryable(true);
1847
1848 ASSERT_OK(Put(Key(1), "val1"));
1849 SyncPoint::GetInstance()->SetCallBack(
1850 "BuildTable:BeforeFinishBuildTable",
1851 [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
1852
1853 SyncPoint::GetInstance()->EnableProcessing();
1854 s = Flush();
1855 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
1856 SyncPoint::GetInstance()->DisableProcessing();
1857 fault_fs_->SetFilesystemActive(true);
1858 ASSERT_EQ(listener->WaitForRecovery(5000000), true);
1859
1860 ASSERT_EQ("val1", Get(Key(1)));
1861 Reopen(options);
1862 ASSERT_EQ("val1", Get(Key(1)));
1863 ASSERT_OK(Put(Key(2), "val2"));
1864 ASSERT_OK(Flush());
1865 ASSERT_EQ("val2", Get(Key(2)));
1866
1867 Destroy(options);
1868 }
1869
1870 TEST_F(DBErrorHandlingFSTest, FLushWritRetryableErrorAutoRecover2) {
1871 // Fail all the resume attempts and let the user resume manually
1872 std::shared_ptr<ErrorHandlerFSListener> listener(
1873 new ErrorHandlerFSListener());
1874 Options options = GetDefaultOptions();
1875 options.env = fault_env_.get();
1876 options.create_if_missing = true;
1877 options.listeners.emplace_back(listener);
1878 options.max_bgerror_resume_count = 2;
1879 options.bgerror_resume_retry_interval = 100000; // 0.1 second
1880 Status s;
1881
1882 listener->EnableAutoRecovery(false);
1883 DestroyAndReopen(options);
1884
1885 IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
1886 error_msg.SetRetryable(true);
1887
1888 ASSERT_OK(Put(Key(1), "val1"));
1889 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
1890 {{"FLushWritRetryableeErrorAutoRecover2:0",
1891 "RecoverFromRetryableBGIOError:BeforeStart"},
1892 {"RecoverFromRetryableBGIOError:LoopOut",
1893 "FLushWritRetryableeErrorAutoRecover2:1"}});
1894 SyncPoint::GetInstance()->SetCallBack(
1895 "BuildTable:BeforeFinishBuildTable",
1896 [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
1897 SyncPoint::GetInstance()->EnableProcessing();
1898 s = Flush();
1899 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
1900 TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover2:0");
1901 TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover2:1");
1902 fault_fs_->SetFilesystemActive(true);
1903 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
1904 SyncPoint::GetInstance()->DisableProcessing();
1905
1906 ASSERT_EQ("val1", Get(Key(1)));
1907 // Auto resume fails because the FS does not recover during resume. The
1908 // user calls Resume() manually here.
1909 s = dbfull()->Resume();
1910 ASSERT_EQ("val1", Get(Key(1)));
1911 ASSERT_OK(s);
1912 ASSERT_OK(Put(Key(2), "val2"));
1913 ASSERT_OK(Flush());
1914 ASSERT_EQ("val2", Get(Key(2)));
1915
1916 Destroy(options);
1917 }
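
// A minimal sketch of the manual-recovery pattern exercised above: when a
// flush fails with a recoverable (non-fatal) background error and auto resume
// has given up, the caller fixes the underlying fault and calls Resume()
// before retrying. The helper name is hypothetical.
Status FlushWithManualResumeSketch(DB* db) {
  Status s = db->Flush(FlushOptions());
  if (!s.ok() && s.severity() < Status::Severity::kFatalError) {
    // Precondition: the filesystem fault has been cleared by this point.
    s = db->Resume();
    if (s.ok()) {
      s = db->Flush(FlushOptions());
    }
  }
  return s;
}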
1918
1919 TEST_F(DBErrorHandlingFSTest, ManifestWriteRetryableErrorAutoRecover) {
1920 // Fail the first resume and let the second resume be successful
1921 std::shared_ptr<ErrorHandlerFSListener> listener(
1922 new ErrorHandlerFSListener());
1923 Options options = GetDefaultOptions();
1924 options.env = fault_env_.get();
1925 options.create_if_missing = true;
1926 options.listeners.emplace_back(listener);
1927 options.max_bgerror_resume_count = 2;
1928 options.bgerror_resume_retry_interval = 100000; // 0.1 second
1929 Status s;
1930 std::string old_manifest;
1931 std::string new_manifest;
1932
1933 listener->EnableAutoRecovery(false);
1934 DestroyAndReopen(options);
1935 old_manifest = GetManifestNameFromLiveFiles();
1936
1937 IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
1938 error_msg.SetRetryable(true);
1939
1940 ASSERT_OK(Put(Key(0), "val"));
1941 ASSERT_OK(Flush());
1942 ASSERT_OK(Put(Key(1), "val"));
1943 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
1944 {{"RecoverFromRetryableBGIOError:BeforeStart",
1945 "ManifestWriteRetryableErrorAutoRecover:0"},
1946 {"ManifestWriteRetryableErrorAutoRecover:1",
1947 "RecoverFromRetryableBGIOError:BeforeWait1"},
1948 {"RecoverFromRetryableBGIOError:RecoverSuccess",
1949 "ManifestWriteRetryableErrorAutoRecover:2"}});
1950 SyncPoint::GetInstance()->SetCallBack(
1951 "VersionSet::LogAndApply:WriteManifest",
1952 [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
1953 SyncPoint::GetInstance()->EnableProcessing();
1954 s = Flush();
1955 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
1956 TEST_SYNC_POINT("ManifestWriteRetryableErrorAutoRecover:0");
1957 fault_fs_->SetFilesystemActive(true);
1958 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
1959 TEST_SYNC_POINT("ManifestWriteRetryableErrorAutoRecover:1");
1960 TEST_SYNC_POINT("ManifestWriteRetryableErrorAutoRecover:2");
1961 SyncPoint::GetInstance()->DisableProcessing();
1962
1963 new_manifest = GetManifestNameFromLiveFiles();
1964 ASSERT_NE(new_manifest, old_manifest);
1965
1966 Reopen(options);
1967 ASSERT_EQ("val", Get(Key(0)));
1968 ASSERT_EQ("val", Get(Key(1)));
1969 Close();
1970 }
1971
1972 TEST_F(DBErrorHandlingFSTest, ManifestWriteNoWALRetryableErrorAutoRecover) {
1973 // Fail the first resume and let the second resume be successful
1974 std::shared_ptr<ErrorHandlerFSListener> listener(
1975 new ErrorHandlerFSListener());
1976 Options options = GetDefaultOptions();
1977 options.env = fault_env_.get();
1978 options.create_if_missing = true;
1979 options.listeners.emplace_back(listener);
1980 options.max_bgerror_resume_count = 2;
1981 options.bgerror_resume_retry_interval = 100000; // 0.1 second
1982 Status s;
1983 std::string old_manifest;
1984 std::string new_manifest;
1985
1986 listener->EnableAutoRecovery(false);
1987 DestroyAndReopen(options);
1988 old_manifest = GetManifestNameFromLiveFiles();
1989
1990 IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
1991 error_msg.SetRetryable(true);
1992
1993 WriteOptions wo = WriteOptions();
1994 wo.disableWAL = true;
1995 ASSERT_OK(Put(Key(0), "val", wo));
1996 ASSERT_OK(Flush());
1997 ASSERT_OK(Put(Key(1), "val", wo));
1998 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
1999 {{"RecoverFromRetryableBGIOError:BeforeStart",
2000 "ManifestWriteNoWALRetryableErrorAutoRecover:0"},
2001 {"ManifestWriteNoWALRetryableErrorAutoRecover:1",
2002 "RecoverFromRetryableBGIOError:BeforeWait1"},
2003 {"RecoverFromRetryableBGIOError:RecoverSuccess",
2004 "ManifestWriteNoWALRetryableErrorAutoRecover:2"}});
2005 SyncPoint::GetInstance()->SetCallBack(
2006 "VersionSet::LogAndApply:WriteManifest",
2007 [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
2008 SyncPoint::GetInstance()->EnableProcessing();
2009 s = Flush();
2010 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
2011 TEST_SYNC_POINT("ManifestWriteNoWALRetryableErrorAutoRecover:0");
2012 fault_fs_->SetFilesystemActive(true);
2013 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
2014 TEST_SYNC_POINT("ManifestWriteNoWALRetryableErrorAutoRecover:1");
2015 TEST_SYNC_POINT("ManifestWriteNoWALRetryableErrorAutoRecover:2");
2016 SyncPoint::GetInstance()->DisableProcessing();
2017
2018 new_manifest = GetManifestNameFromLiveFiles();
2019 ASSERT_NE(new_manifest, old_manifest);
2020
2021 Reopen(options);
2022 ASSERT_EQ("val", Get(Key(0)));
2023 ASSERT_EQ("val", Get(Key(1)));
2024 Close();
2025 }
2026
2027 TEST_F(DBErrorHandlingFSTest,
2028 CompactionManifestWriteRetryableErrorAutoRecover) {
2029 std::shared_ptr<ErrorHandlerFSListener> listener(
2030 new ErrorHandlerFSListener());
2031 Options options = GetDefaultOptions();
2032 options.env = fault_env_.get();
2033 options.create_if_missing = true;
2034 options.level0_file_num_compaction_trigger = 2;
2035 options.listeners.emplace_back(listener);
2036 options.max_bgerror_resume_count = 2;
2037 options.bgerror_resume_retry_interval = 100000; // 0.1 second
2038 Status s;
2039 std::string old_manifest;
2040 std::string new_manifest;
2041 std::atomic<bool> fail_manifest(false);
2042 DestroyAndReopen(options);
2043 old_manifest = GetManifestNameFromLiveFiles();
2044
2045 IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
2046 error_msg.SetRetryable(true);
2047
2048 ASSERT_OK(Put(Key(0), "val"));
2049 ASSERT_OK(Put(Key(2), "val"));
2050 ASSERT_OK(Flush());
2051
2052 listener->OverrideBGError(Status(error_msg, Status::Severity::kHardError));
2053 listener->EnableAutoRecovery(false);
2054 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
2055 // Wait for flush of 2nd L0 file before starting compaction
2056 {{"DBImpl::FlushMemTable:FlushMemTableFinished",
2057 "BackgroundCallCompaction:0"},
2058 // Wait for compaction to detect manifest write error
2059 {"BackgroundCallCompaction:1", "CompactionManifestWriteErrorAR:0"},
2060 // Make compaction thread wait for error to be cleared
2061 {"CompactionManifestWriteErrorAR:1",
2062 "DBImpl::BackgroundCallCompaction:FoundObsoleteFiles"},
2063 {"CompactionManifestWriteErrorAR:2",
2064 "RecoverFromRetryableBGIOError:BeforeStart"},
2065 // Fail the first resume, before the wait in resume
2066 {"RecoverFromRetryableBGIOError:BeforeResume0",
2067 "CompactionManifestWriteErrorAR:3"},
2068 // Activate the FS before the second resume
2069 {"CompactionManifestWriteErrorAR:4",
2070 "RecoverFromRetryableBGIOError:BeforeResume1"},
2071 // Wait for the auto resume to be successful
2072 {"RecoverFromRetryableBGIOError:RecoverSuccess",
2073 "CompactionManifestWriteErrorAR:5"}});
2074 // trigger manifest write failure in compaction thread
2075 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
2076 "BackgroundCallCompaction:0", [&](void*) { fail_manifest.store(true); });
2077 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
2078 "VersionSet::LogAndApply:WriteManifest", [&](void*) {
2079 if (fail_manifest.load()) {
2080 fault_fs_->SetFilesystemActive(false, error_msg);
2081 }
2082 });
2083 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
2084
2085 ASSERT_OK(Put(Key(1), "val"));
2086 s = Flush();
2087 ASSERT_OK(s);
2088
2089 TEST_SYNC_POINT("CompactionManifestWriteErrorAR:0");
2090 TEST_SYNC_POINT("CompactionManifestWriteErrorAR:1");
2091
2092 s = dbfull()->TEST_WaitForCompact();
2093 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
2094 TEST_SYNC_POINT("CompactionManifestWriteErrorAR:2");
2095 TEST_SYNC_POINT("CompactionManifestWriteErrorAR:3");
2096 fault_fs_->SetFilesystemActive(true);
2097 SyncPoint::GetInstance()->ClearAllCallBacks();
2098 TEST_SYNC_POINT("CompactionManifestWriteErrorAR:4");
2099 TEST_SYNC_POINT("CompactionManifestWriteErrorAR:5");
2100 SyncPoint::GetInstance()->DisableProcessing();
2101
2102 new_manifest = GetManifestNameFromLiveFiles();
2103 ASSERT_NE(new_manifest, old_manifest);
2104
2105 Reopen(options);
2106 ASSERT_EQ("val", Get(Key(0)));
2107 ASSERT_EQ("val", Get(Key(1)));
2108 ASSERT_EQ("val", Get(Key(2)));
2109 Close();
2110 }
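
// A minimal sketch of the SyncPoint plumbing these tests rely on: a callback
// registered on a named marker runs whenever execution reaches that point,
// which is how the tests flip the fault-injection FS inside a flush or
// compaction, while LoadDependency pairs force the first-named point to
// execute before the second. The marker below is the real manifest-write
// marker; the helper is hypothetical.
void InjectManifestWriteErrorSketch(FaultInjectionTestFS* fs,
                                    const IOStatus& error) {
  SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::LogAndApply:WriteManifest",
      [fs, error](void*) { fs->SetFilesystemActive(false, error); });
  SyncPoint::GetInstance()->EnableProcessing();
  // ... run a workload that writes the MANIFEST, e.g. a flush ...
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
}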
2111
2112 TEST_F(DBErrorHandlingFSTest, CompactionWriteRetryableErrorAutoRecover) {
2113 // In this test, the FS is set to return errors during the first round of
2114 // compaction, so the first compaction fails with a retryable IO error
2115 // that is mapped to a soft error. Compaction is then rescheduled; in the
2116 // second round the FS is active again and the compaction succeeds, so the
2117 // test hits the CompactionJob::FinishCompactionOutputFile1 sync point.
2119 std::shared_ptr<ErrorHandlerFSListener> listener(
2120 new ErrorHandlerFSListener());
2121 Options options = GetDefaultOptions();
2122 options.env = fault_env_.get();
2123 options.create_if_missing = true;
2124 options.level0_file_num_compaction_trigger = 2;
2125 options.listeners.emplace_back(listener);
2126 Status s;
2127 std::atomic<bool> fail_first(false);
2128 std::atomic<bool> fail_second(true);
2129 DestroyAndReopen(options);
2130
2131 IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
2132 error_msg.SetRetryable(true);
2133
2134 ASSERT_OK(Put(Key(0), "va;"));
2135 ASSERT_OK(Put(Key(2), "va;"));
2136 s = Flush();
2137 ASSERT_OK(s);
2138
2139 listener->OverrideBGError(Status(error_msg, Status::Severity::kHardError));
2140 listener->EnableAutoRecovery(false);
2141 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
2142 {{"DBImpl::FlushMemTable:FlushMemTableFinished",
2143 "BackgroundCallCompaction:0"},
2144 {"CompactionJob::FinishCompactionOutputFile1",
2145 "CompactionWriteRetryableErrorAutoRecover0"}});
2146 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
2147 "DBImpl::BackgroundCompaction:Start",
2148 [&](void*) { fault_fs_->SetFilesystemActive(true); });
2149 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
2150 "BackgroundCallCompaction:0", [&](void*) { fail_first.store(true); });
2151 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
2152 "CompactionJob::OpenCompactionOutputFile", [&](void*) {
2153 if (fail_first.load() && fail_second.load()) {
2154 fault_fs_->SetFilesystemActive(false, error_msg);
2155 fail_second.store(false);
2156 }
2157 });
2158 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
2159
2160 ASSERT_OK(Put(Key(1), "val"));
2161 s = Flush();
2162 ASSERT_OK(s);
2163
2164 s = dbfull()->TEST_WaitForCompact();
2165 ASSERT_OK(s);
2166 TEST_SYNC_POINT("CompactionWriteRetryableErrorAutoRecover0");
2167 SyncPoint::GetInstance()->ClearAllCallBacks();
2168 SyncPoint::GetInstance()->DisableProcessing();
2169 Destroy(options);
2170 }
2171
2172 TEST_F(DBErrorHandlingFSTest, WALWriteRetryableErrorAutoRecover1) {
2173 std::shared_ptr<ErrorHandlerFSListener> listener(
2174 new ErrorHandlerFSListener());
2175 Options options = GetDefaultOptions();
2176 options.env = fault_env_.get();
2177 options.create_if_missing = true;
2178 options.writable_file_max_buffer_size = 32768;
2179 options.listeners.emplace_back(listener);
2180 options.paranoid_checks = true;
2181 options.max_bgerror_resume_count = 2;
2182 options.bgerror_resume_retry_interval = 100000; // 0.1 second
2183 Status s;
2184 Random rnd(301);
2185
2186 DestroyAndReopen(options);
2187
2188 IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
2189 error_msg.SetRetryable(true);
2190
2191 // For the first batch, the write succeeds; sync is required.
2192 {
2193 WriteBatch batch;
2194
2195 for (auto i = 0; i < 100; ++i) {
2196 ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
2197 }
2198
2199 WriteOptions wopts;
2200 wopts.sync = true;
2201 ASSERT_OK(dbfull()->Write(wopts, &batch));
2202 }
2203
2204 // For the second batch, the first 2 file Appends succeed; the following
2205 // Appends fail due to a retryable file system IOError.
2206 {
2207 WriteBatch batch;
2208 int write_error = 0;
2209
2210 for (auto i = 100; i < 200; ++i) {
2211 ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
2212 }
2213 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
2214 {{"WALWriteErrorDone", "RecoverFromRetryableBGIOError:BeforeStart"},
2215 {"RecoverFromRetryableBGIOError:BeforeResume0", "WALWriteError1:0"},
2216 {"WALWriteError1:1", "RecoverFromRetryableBGIOError:BeforeResume1"},
2217 {"RecoverFromRetryableBGIOError:RecoverSuccess", "WALWriteError1:2"}});
2218
2219 SyncPoint::GetInstance()->SetCallBack(
2220 "WritableFileWriter::Append:BeforePrepareWrite", [&](void*) {
2221 write_error++;
2222 if (write_error > 2) {
2223 fault_fs_->SetFilesystemActive(false, error_msg);
2224 }
2225 });
2226 SyncPoint::GetInstance()->EnableProcessing();
2227 WriteOptions wopts;
2228 wopts.sync = true;
2229 s = dbfull()->Write(wopts, &batch);
2230 ASSERT_EQ(true, s.IsIOError());
2231 TEST_SYNC_POINT("WALWriteErrorDone");
2232
2233 TEST_SYNC_POINT("WALWriteError1:0");
2234 fault_fs_->SetFilesystemActive(true);
2235 SyncPoint::GetInstance()->ClearAllCallBacks();
2236 TEST_SYNC_POINT("WALWriteError1:1");
2237 TEST_SYNC_POINT("WALWriteError1:2");
2238 }
2239 SyncPoint::GetInstance()->DisableProcessing();
2240
2241 // Data in the corrupted WAL is not stored.
2242 for (auto i = 0; i < 199; ++i) {
2243 if (i < 100) {
2244 ASSERT_NE(Get(Key(i)), "NOT_FOUND");
2245 } else {
2246 ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
2247 }
2248 }
2249
2250 // Resume and write a new batch; it should be in the WAL.
2251 {
2252 WriteBatch batch;
2253
2254 for (auto i = 200; i < 300; ++i) {
2255 ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
2256 }
2257
2258 WriteOptions wopts;
2259 wopts.sync = true;
2260 ASSERT_OK(dbfull()->Write(wopts, &batch));
2261 }
2262
2263 Reopen(options);
2264 for (auto i = 0; i < 300; ++i) {
2265 if (i < 100 || i >= 200) {
2266 ASSERT_NE(Get(Key(i)), "NOT_FOUND");
2267 } else {
2268 ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
2269 }
2270 }
2271 Close();
2272 }
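
// A minimal sketch of the fault-injection setup used throughout this file:
// wrap the base FileSystem in a FaultInjectionTestFS, expose it to the DB
// through a CompositeEnvWrapper, and toggle the "active" flag to inject and
// clear errors. Variable names are hypothetical.
void FaultInjectionSetupSketch(Env* base_env) {
  std::shared_ptr<FaultInjectionTestFS> fs(
      new FaultInjectionTestFS(base_env->GetFileSystem()));
  std::unique_ptr<Env> env(new CompositeEnvWrapper(base_env, fs));
  IOStatus error = IOStatus::IOError("Injected error");
  error.SetRetryable(true);  // Mark the error as recoverable by auto resume.
  fs->SetFilesystemActive(false, error);  // File ops now fail with `error`.
  fs->SetFilesystemActive(true);          // Filesystem healthy again.
  // `env.get()` would be assigned to Options::env before opening the DB.
}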
2273
2274 TEST_F(DBErrorHandlingFSTest, WALWriteRetryableErrorAutoRecover2) {
2275 // Fail the first recovery attempt and let the second one succeed.
2276 std::shared_ptr<ErrorHandlerFSListener> listener(
2277 new ErrorHandlerFSListener());
2278 Options options = GetDefaultOptions();
2279 options.env = fault_env_.get();
2280 options.create_if_missing = true;
2281 options.writable_file_max_buffer_size = 32768;
2282 options.listeners.emplace_back(listener);
2283 options.paranoid_checks = true;
2284 options.max_bgerror_resume_count = 2;
2285 options.bgerror_resume_retry_interval = 100000; // 0.1 second
2286 Status s;
2287 Random rnd(301);
2288
2289 DestroyAndReopen(options);
2290
2291 IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
2292 error_msg.SetRetryable(true);
2293
2294 // For the first batch, the write succeeds; sync is required.
2295 {
2296 WriteBatch batch;
2297
2298 for (auto i = 0; i < 100; ++i) {
2299 ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
2300 }
2301
2302 WriteOptions wopts;
2303 wopts.sync = true;
2304 ASSERT_OK(dbfull()->Write(wopts, &batch));
2305 }
2306
2307 // For the second batch, the first 2 file Appends succeed; the following
2308 // Appends fail due to a retryable file system IOError.
2309 {
2310 WriteBatch batch;
2311 int write_error = 0;
2312
2313 for (auto i = 100; i < 200; ++i) {
2314 ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
2315 }
2316 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
2317 {{"RecoverFromRetryableBGIOError:BeforeWait0", "WALWriteError2:0"},
2318 {"WALWriteError2:1", "RecoverFromRetryableBGIOError:BeforeWait1"},
2319 {"RecoverFromRetryableBGIOError:RecoverSuccess", "WALWriteError2:2"}});
2320
2321 SyncPoint::GetInstance()->SetCallBack(
2322 "WritableFileWriter::Append:BeforePrepareWrite", [&](void*) {
2323 write_error++;
2324 if (write_error > 2) {
2325 fault_fs_->SetFilesystemActive(false, error_msg);
2326 }
2327 });
2328 SyncPoint::GetInstance()->EnableProcessing();
2329 WriteOptions wopts;
2330 wopts.sync = true;
2331 s = dbfull()->Write(wopts, &batch);
2332 ASSERT_EQ(true, s.IsIOError());
2333
2334 TEST_SYNC_POINT("WALWriteError2:0");
2335 fault_fs_->SetFilesystemActive(true);
2336 SyncPoint::GetInstance()->ClearAllCallBacks();
2337 TEST_SYNC_POINT("WALWriteError2:1");
2338 TEST_SYNC_POINT("WALWriteError2:2");
2339 }
2340 SyncPoint::GetInstance()->DisableProcessing();
2341
2342 // Data in the corrupted WAL is not stored.
2343 for (auto i = 0; i < 199; ++i) {
2344 if (i < 100) {
2345 ASSERT_NE(Get(Key(i)), "NOT_FOUND");
2346 } else {
2347 ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
2348 }
2349 }
2350
2351 // Resume and write a new batch; it should be in the WAL.
2352 {
2353 WriteBatch batch;
2354
2355 for (auto i = 200; i < 300; ++i) {
2356 ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
2357 }
2358
2359 WriteOptions wopts;
2360 wopts.sync = true;
2361 ASSERT_OK(dbfull()->Write(wopts, &batch));
2362 }
2363
2364 Reopen(options);
2365 for (auto i = 0; i < 300; ++i) {
2366 if (i < 100 || i >= 200) {
2367 ASSERT_NE(Get(Key(i)), "NOT_FOUND");
2368 } else {
2369 ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
2370 }
2371 }
2372 Close();
2373 }
2374
2375 class DBErrorHandlingFencingTest : public DBErrorHandlingFSTest,
2376 public testing::WithParamInterface<bool> {};
2377
2378 TEST_P(DBErrorHandlingFencingTest, FLushWriteFenced) {
2379 std::shared_ptr<ErrorHandlerFSListener> listener(
2380 new ErrorHandlerFSListener());
2381 Options options = GetDefaultOptions();
2382 options.env = fault_env_.get();
2383 options.create_if_missing = true;
2384 options.listeners.emplace_back(listener);
2385 options.paranoid_checks = GetParam();
2386 Status s;
2387
2388 listener->EnableAutoRecovery(true);
2389 DestroyAndReopen(options);
2390
2391 ASSERT_OK(Put(Key(0), "val"));
2392 SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
2393 fault_fs_->SetFilesystemActive(false, IOStatus::IOFenced("IO fenced"));
2394 });
2395 SyncPoint::GetInstance()->EnableProcessing();
2396 s = Flush();
2397 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kFatalError);
2398 ASSERT_TRUE(s.IsIOFenced());
2399 SyncPoint::GetInstance()->DisableProcessing();
2400 fault_fs_->SetFilesystemActive(true);
2401 s = dbfull()->Resume();
2402 ASSERT_TRUE(s.IsIOFenced());
2403 Destroy(options);
2404 }
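
// A minimal sketch of the fencing semantics the parameterized tests verify:
// an IOFenced error means another instance has fenced this one off from the
// storage, so it surfaces as a fatal error and is not cleared by Resume()
// even after the filesystem is healthy again. The helper is hypothetical.
bool ShouldAbandonDbSketch(const Status& s) {
  return s.IsIOFenced() || s.severity() >= Status::Severity::kFatalError;
}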
2405
2406 TEST_P(DBErrorHandlingFencingTest, ManifestWriteFenced) {
2407 std::shared_ptr<ErrorHandlerFSListener> listener(
2408 new ErrorHandlerFSListener());
2409 Options options = GetDefaultOptions();
2410 options.env = fault_env_.get();
2411 options.create_if_missing = true;
2412 options.listeners.emplace_back(listener);
2413 options.paranoid_checks = GetParam();
2414 Status s;
2415 std::string old_manifest;
2416 std::string new_manifest;
2417
2418 listener->EnableAutoRecovery(true);
2419 DestroyAndReopen(options);
2420 old_manifest = GetManifestNameFromLiveFiles();
2421
2422 ASSERT_OK(Put(Key(0), "val"));
2423 ASSERT_OK(Flush());
2424 ASSERT_OK(Put(Key(1), "val"));
2425 SyncPoint::GetInstance()->SetCallBack(
2426 "VersionSet::LogAndApply:WriteManifest", [&](void*) {
2427 fault_fs_->SetFilesystemActive(false, IOStatus::IOFenced("IO fenced"));
2428 });
2429 SyncPoint::GetInstance()->EnableProcessing();
2430 s = Flush();
2431 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kFatalError);
2432 ASSERT_TRUE(s.IsIOFenced());
2433 SyncPoint::GetInstance()->ClearAllCallBacks();
2434 SyncPoint::GetInstance()->DisableProcessing();
2435 fault_fs_->SetFilesystemActive(true);
2436 s = dbfull()->Resume();
2437 ASSERT_TRUE(s.IsIOFenced());
2438 Close();
2439 }
2440
2441 TEST_P(DBErrorHandlingFencingTest, CompactionWriteFenced) {
2442 std::shared_ptr<ErrorHandlerFSListener> listener(
2443 new ErrorHandlerFSListener());
2444 Options options = GetDefaultOptions();
2445 options.env = fault_env_.get();
2446 options.create_if_missing = true;
2447 options.level0_file_num_compaction_trigger = 2;
2448 options.listeners.emplace_back(listener);
2449 options.paranoid_checks = GetParam();
2450 Status s;
2451 DestroyAndReopen(options);
2452
2453 ASSERT_OK(Put(Key(0), "va;"));
2454 ASSERT_OK(Put(Key(2), "va;"));
2455 s = Flush();
2456 ASSERT_OK(s);
2457
2458 listener->EnableAutoRecovery(true);
2459 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
2460 {{"DBImpl::FlushMemTable:FlushMemTableFinished",
2461 "BackgroundCallCompaction:0"}});
2462 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
2463 "BackgroundCallCompaction:0", [&](void*) {
2464 fault_fs_->SetFilesystemActive(false, IOStatus::IOFenced("IO fenced"));
2465 });
2466 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
2467
2468 ASSERT_OK(Put(Key(1), "val"));
2469 s = Flush();
2470 ASSERT_OK(s);
2471
2472 s = dbfull()->TEST_WaitForCompact();
2473 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kFatalError);
2474 ASSERT_TRUE(s.IsIOFenced());
2475
2476 fault_fs_->SetFilesystemActive(true);
2477 s = dbfull()->Resume();
2478 ASSERT_TRUE(s.IsIOFenced());
2479 Destroy(options);
2480 }
2481
2482 TEST_P(DBErrorHandlingFencingTest, WALWriteFenced) {
2483 std::shared_ptr<ErrorHandlerFSListener> listener(
2484 new ErrorHandlerFSListener());
2485 Options options = GetDefaultOptions();
2486 options.env = fault_env_.get();
2487 options.create_if_missing = true;
2488 options.writable_file_max_buffer_size = 32768;
2489 options.listeners.emplace_back(listener);
2490 options.paranoid_checks = GetParam();
2491 Status s;
2492 Random rnd(301);
2493
2494 listener->EnableAutoRecovery(true);
2495 DestroyAndReopen(options);
2496
2497 {
2498 WriteBatch batch;
2499
2500 for (auto i = 0; i < 100; ++i) {
2501 ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
2502 }
2503
2504 WriteOptions wopts;
2505 wopts.sync = true;
2506 ASSERT_OK(dbfull()->Write(wopts, &batch));
2507 }
2508
2509 {
2510 WriteBatch batch;
2511 int write_error = 0;
2512
2513 for (auto i = 100; i < 199; ++i) {
2514 ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
2515 }
2516
2517 SyncPoint::GetInstance()->SetCallBack(
2518 "WritableFileWriter::Append:BeforePrepareWrite", [&](void*) {
2519 write_error++;
2520 if (write_error > 2) {
2521 fault_fs_->SetFilesystemActive(false,
2522 IOStatus::IOFenced("IO fenced"));
2523 }
2524 });
2525 SyncPoint::GetInstance()->EnableProcessing();
2526 WriteOptions wopts;
2527 wopts.sync = true;
2528 s = dbfull()->Write(wopts, &batch);
2529 ASSERT_TRUE(s.IsIOFenced());
2530 }
2531 SyncPoint::GetInstance()->DisableProcessing();
2532 fault_fs_->SetFilesystemActive(true);
2533 {
2534 WriteBatch batch;
2535
2536 for (auto i = 0; i < 100; ++i) {
2537 ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
2538 }
2539
2540 WriteOptions wopts;
2541 wopts.sync = true;
2542 s = dbfull()->Write(wopts, &batch);
2543 ASSERT_TRUE(s.IsIOFenced());
2544 }
2545 Close();
2546 }
2547
2548 INSTANTIATE_TEST_CASE_P(DBErrorHandlingFSTest, DBErrorHandlingFencingTest,
2549 ::testing::Bool());
2550
2551 } // namespace ROCKSDB_NAMESPACE
2552
2553 int main(int argc, char** argv) {
2554 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
2555 ::testing::InitGoogleTest(&argc, argv);
2556 return RUN_ALL_TESTS();
2557 }
2558
2559 #else
2560 #include <stdio.h>
2561
2562 int main(int /*argc*/, char** /*argv*/) {
2563 fprintf(stderr,
2563         "SKIPPED as error handling tests are not supported in "
2563         "ROCKSDB_LITE\n");
2564 return 0;
2565 }
2566
2567 #endif // ROCKSDB_LITE
2568