1// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com> 2// All rights reserved. 3// 4// Use of this source code is governed by a BSD-style license that can be 5// found in the LICENSE file. 6 7package leveldb 8 9import ( 10 "bytes" 11 "container/list" 12 crand "crypto/rand" 13 "encoding/binary" 14 "fmt" 15 "math/rand" 16 "os" 17 "path/filepath" 18 "runtime" 19 "strings" 20 "sync" 21 "sync/atomic" 22 "testing" 23 "time" 24 "unsafe" 25 26 "github.com/onsi/gomega" 27 28 "github.com/syndtr/goleveldb/leveldb/comparer" 29 "github.com/syndtr/goleveldb/leveldb/errors" 30 "github.com/syndtr/goleveldb/leveldb/filter" 31 "github.com/syndtr/goleveldb/leveldb/iterator" 32 "github.com/syndtr/goleveldb/leveldb/opt" 33 "github.com/syndtr/goleveldb/leveldb/storage" 34 "github.com/syndtr/goleveldb/leveldb/testutil" 35 "github.com/syndtr/goleveldb/leveldb/util" 36) 37 38func tkey(i int) []byte { 39 return []byte(fmt.Sprintf("%016d", i)) 40} 41 42func tval(seed, n int) []byte { 43 r := rand.New(rand.NewSource(int64(seed))) 44 return randomString(r, n) 45} 46 47func testingLogger(t *testing.T) func(log string) { 48 return func(log string) { 49 t.Log(log) 50 } 51} 52 53func testingPreserveOnFailed(t *testing.T) func() (preserve bool, err error) { 54 return func() (preserve bool, err error) { 55 preserve = t.Failed() 56 return 57 } 58} 59 60type dbHarness struct { 61 t *testing.T 62 63 stor *testutil.Storage 64 db *DB 65 o *opt.Options 66 ro *opt.ReadOptions 67 wo *opt.WriteOptions 68} 69 70func newDbHarnessWopt(t *testing.T, o *opt.Options) *dbHarness { 71 h := new(dbHarness) 72 h.init(t, o) 73 return h 74} 75 76func newDbHarness(t *testing.T) *dbHarness { 77 return newDbHarnessWopt(t, &opt.Options{DisableLargeBatchTransaction: true}) 78} 79 80func (h *dbHarness) init(t *testing.T, o *opt.Options) { 81 gomega.RegisterTestingT(t) 82 h.t = t 83 h.stor = testutil.NewStorage() 84 h.stor.OnLog(testingLogger(t)) 85 h.stor.OnClose(testingPreserveOnFailed(t)) 86 h.o = o 87 h.ro = nil 88 h.wo = nil 89 90 if err := h.openDB0(); err != nil { 91 // So that it will come after fatal message. 92 defer h.stor.Close() 93 h.t.Fatal("Open (init): got error: ", err) 94 } 95} 96 97func (h *dbHarness) openDB0() (err error) { 98 h.t.Log("opening DB") 99 h.db, err = Open(h.stor, h.o) 100 return 101} 102 103func (h *dbHarness) openDB() { 104 if err := h.openDB0(); err != nil { 105 h.t.Fatal("Open: got error: ", err) 106 } 107} 108 109func (h *dbHarness) closeDB0() error { 110 h.t.Log("closing DB") 111 return h.db.Close() 112} 113 114func (h *dbHarness) closeDB() { 115 if h.db != nil { 116 if err := h.closeDB0(); err != nil { 117 h.t.Error("Close: got error: ", err) 118 } 119 } 120 h.stor.CloseCheck() 121 runtime.GC() 122} 123 124func (h *dbHarness) reopenDB() { 125 if h.db != nil { 126 h.closeDB() 127 } 128 h.openDB() 129} 130 131func (h *dbHarness) close() { 132 if h.db != nil { 133 h.closeDB0() 134 h.db = nil 135 } 136 h.stor.Close() 137 h.stor = nil 138 runtime.GC() 139} 140 141func (h *dbHarness) openAssert(want bool) { 142 db, err := Open(h.stor, h.o) 143 if err != nil { 144 if want { 145 h.t.Error("Open: assert: got error: ", err) 146 } else { 147 h.t.Log("Open: assert: got error (expected): ", err) 148 } 149 } else { 150 if !want { 151 h.t.Error("Open: assert: expect error") 152 } 153 db.Close() 154 } 155} 156 157func (h *dbHarness) write(batch *Batch) { 158 if err := h.db.Write(batch, h.wo); err != nil { 159 h.t.Error("Write: got error: ", err) 160 } 161} 162 163func (h *dbHarness) put(key, value string) { 164 if err := h.db.Put([]byte(key), []byte(value), h.wo); err != nil { 165 h.t.Error("Put: got error: ", err) 166 } 167} 168 169func (h *dbHarness) putMulti(n int, low, hi string) { 170 for i := 0; i < n; i++ { 171 h.put(low, "begin") 172 h.put(hi, "end") 173 h.compactMem() 174 } 175} 176 177func (h *dbHarness) maxNextLevelOverlappingBytes(want int64) { 178 t := h.t 179 db := h.db 180 181 var ( 182 maxOverlaps int64 183 maxLevel int 184 ) 185 v := db.s.version() 186 if len(v.levels) > 2 { 187 for i, tt := range v.levels[1 : len(v.levels)-1] { 188 level := i + 1 189 next := v.levels[level+1] 190 for _, t := range tt { 191 r := next.getOverlaps(nil, db.s.icmp, t.imin.ukey(), t.imax.ukey(), false) 192 sum := r.size() 193 if sum > maxOverlaps { 194 maxOverlaps = sum 195 maxLevel = level 196 } 197 } 198 } 199 } 200 v.release() 201 202 if maxOverlaps > want { 203 t.Errorf("next level most overlapping bytes is more than %d, got=%d level=%d", want, maxOverlaps, maxLevel) 204 } else { 205 t.Logf("next level most overlapping bytes is %d, level=%d want=%d", maxOverlaps, maxLevel, want) 206 } 207} 208 209func (h *dbHarness) delete(key string) { 210 t := h.t 211 db := h.db 212 213 err := db.Delete([]byte(key), h.wo) 214 if err != nil { 215 t.Error("Delete: got error: ", err) 216 } 217} 218 219func (h *dbHarness) assertNumKeys(want int) { 220 iter := h.db.NewIterator(nil, h.ro) 221 defer iter.Release() 222 got := 0 223 for iter.Next() { 224 got++ 225 } 226 if err := iter.Error(); err != nil { 227 h.t.Error("assertNumKeys: ", err) 228 } 229 if want != got { 230 h.t.Errorf("assertNumKeys: want=%d got=%d", want, got) 231 } 232} 233 234func (h *dbHarness) getr(db Reader, key string, expectFound bool) (found bool, v []byte) { 235 t := h.t 236 v, err := db.Get([]byte(key), h.ro) 237 switch err { 238 case ErrNotFound: 239 if expectFound { 240 t.Errorf("Get: key '%s' not found, want found", key) 241 } 242 case nil: 243 found = true 244 if !expectFound { 245 t.Errorf("Get: key '%s' found, want not found", key) 246 } 247 default: 248 t.Error("Get: got error: ", err) 249 } 250 return 251} 252 253func (h *dbHarness) get(key string, expectFound bool) (found bool, v []byte) { 254 return h.getr(h.db, key, expectFound) 255} 256 257func (h *dbHarness) getValr(db Reader, key, value string) { 258 t := h.t 259 found, r := h.getr(db, key, true) 260 if !found { 261 return 262 } 263 rval := string(r) 264 if rval != value { 265 t.Errorf("Get: invalid value, got '%s', want '%s'", rval, value) 266 } 267} 268 269func (h *dbHarness) getVal(key, value string) { 270 h.getValr(h.db, key, value) 271} 272 273func (h *dbHarness) allEntriesFor(key, want string) { 274 t := h.t 275 db := h.db 276 s := db.s 277 278 ikey := makeInternalKey(nil, []byte(key), keyMaxSeq, keyTypeVal) 279 iter := db.newRawIterator(nil, nil, nil, nil) 280 if !iter.Seek(ikey) && iter.Error() != nil { 281 t.Error("AllEntries: error during seek, err: ", iter.Error()) 282 return 283 } 284 res := "[ " 285 first := true 286 for iter.Valid() { 287 if ukey, _, kt, kerr := parseInternalKey(iter.Key()); kerr == nil { 288 if s.icmp.uCompare(ikey.ukey(), ukey) != 0 { 289 break 290 } 291 if !first { 292 res += ", " 293 } 294 first = false 295 switch kt { 296 case keyTypeVal: 297 res += string(iter.Value()) 298 case keyTypeDel: 299 res += "DEL" 300 } 301 } else { 302 if !first { 303 res += ", " 304 } 305 first = false 306 res += "CORRUPTED" 307 } 308 iter.Next() 309 } 310 if !first { 311 res += " " 312 } 313 res += "]" 314 if res != want { 315 t.Errorf("AllEntries: assert failed for key %q, got=%q want=%q", key, res, want) 316 } 317} 318 319// Return a string that contains all key,value pairs in order, 320// formatted like "(k1->v1)(k2->v2)". 321func (h *dbHarness) getKeyVal(want string) { 322 t := h.t 323 db := h.db 324 325 s, err := db.GetSnapshot() 326 if err != nil { 327 t.Fatal("GetSnapshot: got error: ", err) 328 } 329 res := "" 330 iter := s.NewIterator(nil, nil) 331 for iter.Next() { 332 res += fmt.Sprintf("(%s->%s)", string(iter.Key()), string(iter.Value())) 333 } 334 iter.Release() 335 336 if res != want { 337 t.Errorf("GetKeyVal: invalid key/value pair, got=%q want=%q", res, want) 338 } 339 s.Release() 340} 341 342func (h *dbHarness) waitCompaction() { 343 t := h.t 344 db := h.db 345 if err := db.compTriggerWait(db.tcompCmdC); err != nil { 346 t.Error("compaction error: ", err) 347 } 348} 349 350func (h *dbHarness) waitMemCompaction() { 351 t := h.t 352 db := h.db 353 354 if err := db.compTriggerWait(db.mcompCmdC); err != nil { 355 t.Error("compaction error: ", err) 356 } 357} 358 359func (h *dbHarness) compactMem() { 360 t := h.t 361 db := h.db 362 363 t.Log("starting memdb compaction") 364 365 db.writeLockC <- struct{}{} 366 defer func() { 367 <-db.writeLockC 368 }() 369 370 if _, err := db.rotateMem(0, true); err != nil { 371 t.Error("compaction error: ", err) 372 } 373 374 if h.totalTables() == 0 { 375 t.Error("zero tables after mem compaction") 376 } 377 378 t.Log("memdb compaction done") 379} 380 381func (h *dbHarness) compactRangeAtErr(level int, min, max string, wanterr bool) { 382 t := h.t 383 db := h.db 384 385 var _min, _max []byte 386 if min != "" { 387 _min = []byte(min) 388 } 389 if max != "" { 390 _max = []byte(max) 391 } 392 393 t.Logf("starting table range compaction: level=%d, min=%q, max=%q", level, min, max) 394 395 if err := db.compTriggerRange(db.tcompCmdC, level, _min, _max); err != nil { 396 if wanterr { 397 t.Log("CompactRangeAt: got error (expected): ", err) 398 } else { 399 t.Error("CompactRangeAt: got error: ", err) 400 } 401 } else if wanterr { 402 t.Error("CompactRangeAt: expect error") 403 } 404 405 t.Log("table range compaction done") 406} 407 408func (h *dbHarness) compactRangeAt(level int, min, max string) { 409 h.compactRangeAtErr(level, min, max, false) 410} 411 412func (h *dbHarness) compactRange(min, max string) { 413 t := h.t 414 db := h.db 415 416 t.Logf("starting DB range compaction: min=%q, max=%q", min, max) 417 418 var r util.Range 419 if min != "" { 420 r.Start = []byte(min) 421 } 422 if max != "" { 423 r.Limit = []byte(max) 424 } 425 if err := db.CompactRange(r); err != nil { 426 t.Error("CompactRange: got error: ", err) 427 } 428 429 t.Log("DB range compaction done") 430} 431 432func (h *dbHarness) sizeOf(start, limit string) int64 { 433 sz, err := h.db.SizeOf([]util.Range{ 434 {Start: []byte(start), Limit: []byte(limit)}, 435 }) 436 if err != nil { 437 h.t.Error("SizeOf: got error: ", err) 438 } 439 return sz.Sum() 440} 441 442func (h *dbHarness) sizeAssert(start, limit string, low, hi int64) { 443 sz := h.sizeOf(start, limit) 444 if sz < low || sz > hi { 445 h.t.Errorf("sizeOf %q to %q not in range, want %d - %d, got %d", 446 shorten(start), shorten(limit), low, hi, sz) 447 } 448} 449 450func (h *dbHarness) getSnapshot() (s *Snapshot) { 451 s, err := h.db.GetSnapshot() 452 if err != nil { 453 h.t.Fatal("GetSnapshot: got error: ", err) 454 } 455 return 456} 457 458func (h *dbHarness) getTablesPerLevel() string { 459 res := "" 460 nz := 0 461 v := h.db.s.version() 462 for level, tables := range v.levels { 463 if level > 0 { 464 res += "," 465 } 466 res += fmt.Sprint(len(tables)) 467 if len(tables) > 0 { 468 nz = len(res) 469 } 470 } 471 v.release() 472 return res[:nz] 473} 474 475func (h *dbHarness) tablesPerLevel(want string) { 476 res := h.getTablesPerLevel() 477 if res != want { 478 h.t.Errorf("invalid tables len, want=%s, got=%s", want, res) 479 } 480} 481 482func (h *dbHarness) totalTables() (n int) { 483 v := h.db.s.version() 484 for _, tables := range v.levels { 485 n += len(tables) 486 } 487 v.release() 488 return 489} 490 491type keyValue interface { 492 Key() []byte 493 Value() []byte 494} 495 496func testKeyVal(t *testing.T, kv keyValue, want string) { 497 res := string(kv.Key()) + "->" + string(kv.Value()) 498 if res != want { 499 t.Errorf("invalid key/value, want=%q, got=%q", want, res) 500 } 501} 502 503func numKey(num int) string { 504 return fmt.Sprintf("key%06d", num) 505} 506 507var testingBloomFilter = filter.NewBloomFilter(10) 508 509func truno(t *testing.T, o *opt.Options, f func(h *dbHarness)) { 510 for i := 0; i < 4; i++ { 511 func() { 512 switch i { 513 case 0: 514 case 1: 515 if o == nil { 516 o = &opt.Options{ 517 DisableLargeBatchTransaction: true, 518 Filter: testingBloomFilter, 519 } 520 } else { 521 old := o 522 o = &opt.Options{} 523 *o = *old 524 o.Filter = testingBloomFilter 525 } 526 case 2: 527 if o == nil { 528 o = &opt.Options{ 529 DisableLargeBatchTransaction: true, 530 Compression: opt.NoCompression, 531 } 532 } else { 533 old := o 534 o = &opt.Options{} 535 *o = *old 536 o.Compression = opt.NoCompression 537 } 538 } 539 h := newDbHarnessWopt(t, o) 540 defer h.close() 541 switch i { 542 case 3: 543 h.reopenDB() 544 } 545 f(h) 546 }() 547 } 548} 549 550func trun(t *testing.T, f func(h *dbHarness)) { 551 truno(t, nil, f) 552} 553 554func testAligned(t *testing.T, name string, offset uintptr) { 555 if offset%8 != 0 { 556 t.Errorf("field %s offset is not 64-bit aligned", name) 557 } 558} 559 560func Test_FieldsAligned(t *testing.T) { 561 p1 := new(DB) 562 testAligned(t, "DB.seq", unsafe.Offsetof(p1.seq)) 563 p2 := new(session) 564 testAligned(t, "session.stNextFileNum", unsafe.Offsetof(p2.stNextFileNum)) 565 testAligned(t, "session.stJournalNum", unsafe.Offsetof(p2.stJournalNum)) 566 testAligned(t, "session.stPrevJournalNum", unsafe.Offsetof(p2.stPrevJournalNum)) 567 testAligned(t, "session.stSeqNum", unsafe.Offsetof(p2.stSeqNum)) 568} 569 570func TestDB_Locking(t *testing.T) { 571 h := newDbHarness(t) 572 defer h.stor.Close() 573 h.openAssert(false) 574 h.closeDB() 575 h.openAssert(true) 576} 577 578func TestDB_Empty(t *testing.T) { 579 trun(t, func(h *dbHarness) { 580 h.get("foo", false) 581 582 h.reopenDB() 583 h.get("foo", false) 584 }) 585} 586 587func TestDB_ReadWrite(t *testing.T) { 588 trun(t, func(h *dbHarness) { 589 h.put("foo", "v1") 590 h.getVal("foo", "v1") 591 h.put("bar", "v2") 592 h.put("foo", "v3") 593 h.getVal("foo", "v3") 594 h.getVal("bar", "v2") 595 596 h.reopenDB() 597 h.getVal("foo", "v3") 598 h.getVal("bar", "v2") 599 }) 600} 601 602func TestDB_PutDeleteGet(t *testing.T) { 603 trun(t, func(h *dbHarness) { 604 h.put("foo", "v1") 605 h.getVal("foo", "v1") 606 h.put("foo", "v2") 607 h.getVal("foo", "v2") 608 h.delete("foo") 609 h.get("foo", false) 610 611 h.reopenDB() 612 h.get("foo", false) 613 }) 614} 615 616func TestDB_EmptyBatch(t *testing.T) { 617 h := newDbHarness(t) 618 defer h.close() 619 620 h.get("foo", false) 621 err := h.db.Write(new(Batch), h.wo) 622 if err != nil { 623 t.Error("writing empty batch yield error: ", err) 624 } 625 h.get("foo", false) 626} 627 628func TestDB_GetFromFrozen(t *testing.T) { 629 h := newDbHarnessWopt(t, &opt.Options{ 630 DisableLargeBatchTransaction: true, 631 WriteBuffer: 100100, 632 }) 633 defer h.close() 634 635 h.put("foo", "v1") 636 h.getVal("foo", "v1") 637 638 h.stor.Stall(testutil.ModeSync, storage.TypeTable) // Block sync calls 639 h.put("k1", strings.Repeat("x", 100000)) // Fill memtable 640 h.put("k2", strings.Repeat("y", 100000)) // Trigger compaction 641 for i := 0; h.db.getFrozenMem() == nil && i < 100; i++ { 642 time.Sleep(10 * time.Microsecond) 643 } 644 if h.db.getFrozenMem() == nil { 645 h.stor.Release(testutil.ModeSync, storage.TypeTable) 646 t.Fatal("No frozen mem") 647 } 648 h.getVal("foo", "v1") 649 h.stor.Release(testutil.ModeSync, storage.TypeTable) // Release sync calls 650 651 h.reopenDB() 652 h.getVal("foo", "v1") 653 h.get("k1", true) 654 h.get("k2", true) 655} 656 657func TestDB_GetFromTable(t *testing.T) { 658 trun(t, func(h *dbHarness) { 659 h.put("foo", "v1") 660 h.compactMem() 661 h.getVal("foo", "v1") 662 }) 663} 664 665func TestDB_GetSnapshot(t *testing.T) { 666 trun(t, func(h *dbHarness) { 667 bar := strings.Repeat("b", 200) 668 h.put("foo", "v1") 669 h.put(bar, "v1") 670 671 snap, err := h.db.GetSnapshot() 672 if err != nil { 673 t.Fatal("GetSnapshot: got error: ", err) 674 } 675 676 h.put("foo", "v2") 677 h.put(bar, "v2") 678 679 h.getVal("foo", "v2") 680 h.getVal(bar, "v2") 681 h.getValr(snap, "foo", "v1") 682 h.getValr(snap, bar, "v1") 683 684 h.compactMem() 685 686 h.getVal("foo", "v2") 687 h.getVal(bar, "v2") 688 h.getValr(snap, "foo", "v1") 689 h.getValr(snap, bar, "v1") 690 691 snap.Release() 692 693 h.reopenDB() 694 h.getVal("foo", "v2") 695 h.getVal(bar, "v2") 696 }) 697} 698 699func TestDB_GetLevel0Ordering(t *testing.T) { 700 trun(t, func(h *dbHarness) { 701 h.db.memdbMaxLevel = 2 702 703 for i := 0; i < 4; i++ { 704 h.put("bar", fmt.Sprintf("b%d", i)) 705 h.put("foo", fmt.Sprintf("v%d", i)) 706 h.compactMem() 707 } 708 h.getVal("foo", "v3") 709 h.getVal("bar", "b3") 710 711 v := h.db.s.version() 712 t0len := v.tLen(0) 713 v.release() 714 if t0len < 2 { 715 t.Errorf("level-0 tables is less than 2, got %d", t0len) 716 } 717 718 h.reopenDB() 719 h.getVal("foo", "v3") 720 h.getVal("bar", "b3") 721 }) 722} 723 724func TestDB_GetOrderedByLevels(t *testing.T) { 725 trun(t, func(h *dbHarness) { 726 h.put("foo", "v1") 727 h.compactMem() 728 h.compactRange("a", "z") 729 h.getVal("foo", "v1") 730 h.put("foo", "v2") 731 h.compactMem() 732 h.getVal("foo", "v2") 733 }) 734} 735 736func TestDB_GetPicksCorrectFile(t *testing.T) { 737 trun(t, func(h *dbHarness) { 738 // Arrange to have multiple files in a non-level-0 level. 739 h.put("a", "va") 740 h.compactMem() 741 h.compactRange("a", "b") 742 h.put("x", "vx") 743 h.compactMem() 744 h.compactRange("x", "y") 745 h.put("f", "vf") 746 h.compactMem() 747 h.compactRange("f", "g") 748 749 h.getVal("a", "va") 750 h.getVal("f", "vf") 751 h.getVal("x", "vx") 752 753 h.compactRange("", "") 754 h.getVal("a", "va") 755 h.getVal("f", "vf") 756 h.getVal("x", "vx") 757 }) 758} 759 760func TestDB_GetEncountersEmptyLevel(t *testing.T) { 761 trun(t, func(h *dbHarness) { 762 h.db.memdbMaxLevel = 2 763 764 // Arrange for the following to happen: 765 // * sstable A in level 0 766 // * nothing in level 1 767 // * sstable B in level 2 768 // Then do enough Get() calls to arrange for an automatic compaction 769 // of sstable A. A bug would cause the compaction to be marked as 770 // occurring at level 1 (instead of the correct level 0). 771 772 // Step 1: First place sstables in levels 0 and 2 773 for i := 0; ; i++ { 774 if i >= 100 { 775 t.Fatal("could not fill levels-0 and level-2") 776 } 777 v := h.db.s.version() 778 if v.tLen(0) > 0 && v.tLen(2) > 0 { 779 v.release() 780 break 781 } 782 v.release() 783 h.put("a", "begin") 784 h.put("z", "end") 785 h.compactMem() 786 787 h.getVal("a", "begin") 788 h.getVal("z", "end") 789 } 790 791 // Step 2: clear level 1 if necessary. 792 h.compactRangeAt(1, "", "") 793 h.tablesPerLevel("1,0,1") 794 795 h.getVal("a", "begin") 796 h.getVal("z", "end") 797 798 // Step 3: read a bunch of times 799 for i := 0; i < 200; i++ { 800 h.get("missing", false) 801 } 802 803 // Step 4: Wait for compaction to finish 804 h.waitCompaction() 805 806 v := h.db.s.version() 807 if v.tLen(0) > 0 { 808 t.Errorf("level-0 tables more than 0, got %d", v.tLen(0)) 809 } 810 v.release() 811 812 h.getVal("a", "begin") 813 h.getVal("z", "end") 814 }) 815} 816 817func TestDB_IterMultiWithDelete(t *testing.T) { 818 trun(t, func(h *dbHarness) { 819 h.put("a", "va") 820 h.put("b", "vb") 821 h.put("c", "vc") 822 h.delete("b") 823 h.get("b", false) 824 825 iter := h.db.NewIterator(nil, nil) 826 iter.Seek([]byte("c")) 827 testKeyVal(t, iter, "c->vc") 828 iter.Prev() 829 testKeyVal(t, iter, "a->va") 830 iter.Release() 831 832 h.compactMem() 833 834 iter = h.db.NewIterator(nil, nil) 835 iter.Seek([]byte("c")) 836 testKeyVal(t, iter, "c->vc") 837 iter.Prev() 838 testKeyVal(t, iter, "a->va") 839 iter.Release() 840 }) 841} 842 843func TestDB_IteratorPinsRef(t *testing.T) { 844 h := newDbHarness(t) 845 defer h.close() 846 847 h.put("foo", "hello") 848 849 // Get iterator that will yield the current contents of the DB. 850 iter := h.db.NewIterator(nil, nil) 851 852 // Write to force compactions 853 h.put("foo", "newvalue1") 854 for i := 0; i < 100; i++ { 855 h.put(numKey(i), strings.Repeat(fmt.Sprintf("v%09d", i), 100000/10)) 856 } 857 h.put("foo", "newvalue2") 858 859 iter.First() 860 testKeyVal(t, iter, "foo->hello") 861 if iter.Next() { 862 t.Errorf("expect eof") 863 } 864 iter.Release() 865} 866 867func TestDB_Recover(t *testing.T) { 868 trun(t, func(h *dbHarness) { 869 h.put("foo", "v1") 870 h.put("baz", "v5") 871 872 h.reopenDB() 873 h.getVal("foo", "v1") 874 875 h.getVal("foo", "v1") 876 h.getVal("baz", "v5") 877 h.put("bar", "v2") 878 h.put("foo", "v3") 879 880 h.reopenDB() 881 h.getVal("foo", "v3") 882 h.put("foo", "v4") 883 h.getVal("foo", "v4") 884 h.getVal("bar", "v2") 885 h.getVal("baz", "v5") 886 }) 887} 888 889func TestDB_RecoverWithEmptyJournal(t *testing.T) { 890 trun(t, func(h *dbHarness) { 891 h.put("foo", "v1") 892 h.put("foo", "v2") 893 894 h.reopenDB() 895 h.reopenDB() 896 h.put("foo", "v3") 897 898 h.reopenDB() 899 h.getVal("foo", "v3") 900 }) 901} 902 903func TestDB_RecoverDuringMemtableCompaction(t *testing.T) { 904 truno(t, &opt.Options{DisableLargeBatchTransaction: true, WriteBuffer: 1000000}, func(h *dbHarness) { 905 906 h.stor.Stall(testutil.ModeSync, storage.TypeTable) 907 h.put("big1", strings.Repeat("x", 10000000)) 908 h.put("big2", strings.Repeat("y", 1000)) 909 h.put("bar", "v2") 910 h.stor.Release(testutil.ModeSync, storage.TypeTable) 911 912 h.reopenDB() 913 h.getVal("bar", "v2") 914 h.getVal("big1", strings.Repeat("x", 10000000)) 915 h.getVal("big2", strings.Repeat("y", 1000)) 916 }) 917} 918 919func TestDB_MinorCompactionsHappen(t *testing.T) { 920 h := newDbHarnessWopt(t, &opt.Options{DisableLargeBatchTransaction: true, WriteBuffer: 10000}) 921 defer h.close() 922 923 n := 500 924 925 key := func(i int) string { 926 return fmt.Sprintf("key%06d", i) 927 } 928 929 for i := 0; i < n; i++ { 930 h.put(key(i), key(i)+strings.Repeat("v", 1000)) 931 } 932 933 for i := 0; i < n; i++ { 934 h.getVal(key(i), key(i)+strings.Repeat("v", 1000)) 935 } 936 937 h.reopenDB() 938 for i := 0; i < n; i++ { 939 h.getVal(key(i), key(i)+strings.Repeat("v", 1000)) 940 } 941} 942 943func TestDB_RecoverWithLargeJournal(t *testing.T) { 944 h := newDbHarness(t) 945 defer h.close() 946 947 h.put("big1", strings.Repeat("1", 200000)) 948 h.put("big2", strings.Repeat("2", 200000)) 949 h.put("small3", strings.Repeat("3", 10)) 950 h.put("small4", strings.Repeat("4", 10)) 951 h.tablesPerLevel("") 952 953 // Make sure that if we re-open with a small write buffer size that 954 // we flush table files in the middle of a large journal file. 955 h.o.WriteBuffer = 100000 956 h.reopenDB() 957 h.getVal("big1", strings.Repeat("1", 200000)) 958 h.getVal("big2", strings.Repeat("2", 200000)) 959 h.getVal("small3", strings.Repeat("3", 10)) 960 h.getVal("small4", strings.Repeat("4", 10)) 961 v := h.db.s.version() 962 if v.tLen(0) <= 1 { 963 t.Errorf("tables-0 less than one") 964 } 965 v.release() 966} 967 968func TestDB_CompactionsGenerateMultipleFiles(t *testing.T) { 969 h := newDbHarnessWopt(t, &opt.Options{ 970 DisableLargeBatchTransaction: true, 971 WriteBuffer: 10000000, 972 Compression: opt.NoCompression, 973 }) 974 defer h.close() 975 976 v := h.db.s.version() 977 if v.tLen(0) > 0 { 978 t.Errorf("level-0 tables more than 0, got %d", v.tLen(0)) 979 } 980 v.release() 981 982 n := 80 983 984 // Write 8MB (80 values, each 100K) 985 for i := 0; i < n; i++ { 986 h.put(numKey(i), strings.Repeat(fmt.Sprintf("v%09d", i), 100000/10)) 987 } 988 989 // Reopening moves updates to level-0 990 h.reopenDB() 991 h.compactRangeAt(0, "", "") 992 993 v = h.db.s.version() 994 if v.tLen(0) > 0 { 995 t.Errorf("level-0 tables more than 0, got %d", v.tLen(0)) 996 } 997 if v.tLen(1) <= 1 { 998 t.Errorf("level-1 tables less than 1, got %d", v.tLen(1)) 999 } 1000 v.release() 1001 1002 for i := 0; i < n; i++ { 1003 h.getVal(numKey(i), strings.Repeat(fmt.Sprintf("v%09d", i), 100000/10)) 1004 } 1005} 1006 1007func TestDB_RepeatedWritesToSameKey(t *testing.T) { 1008 h := newDbHarnessWopt(t, &opt.Options{DisableLargeBatchTransaction: true, WriteBuffer: 100000}) 1009 defer h.close() 1010 1011 maxTables := h.o.GetWriteL0PauseTrigger() + 7 1012 1013 value := strings.Repeat("v", 2*h.o.GetWriteBuffer()) 1014 for i := 0; i < 5*maxTables; i++ { 1015 h.put("key", value) 1016 n := h.totalTables() 1017 if n > maxTables { 1018 t.Errorf("total tables exceed %d, got=%d, iter=%d", maxTables, n, i) 1019 } 1020 } 1021} 1022 1023func TestDB_RepeatedWritesToSameKeyAfterReopen(t *testing.T) { 1024 h := newDbHarnessWopt(t, &opt.Options{ 1025 DisableLargeBatchTransaction: true, 1026 WriteBuffer: 100000, 1027 }) 1028 defer h.close() 1029 1030 h.reopenDB() 1031 1032 maxTables := h.o.GetWriteL0PauseTrigger() + 7 1033 1034 value := strings.Repeat("v", 2*h.o.GetWriteBuffer()) 1035 for i := 0; i < 5*maxTables; i++ { 1036 h.put("key", value) 1037 n := h.totalTables() 1038 if n > maxTables { 1039 t.Errorf("total tables exceed %d, got=%d, iter=%d", maxTables, n, i) 1040 } 1041 } 1042} 1043 1044func TestDB_SparseMerge(t *testing.T) { 1045 h := newDbHarnessWopt(t, &opt.Options{DisableLargeBatchTransaction: true, Compression: opt.NoCompression}) 1046 defer h.close() 1047 1048 h.putMulti(7, "A", "Z") 1049 1050 // Suppose there is: 1051 // small amount of data with prefix A 1052 // large amount of data with prefix B 1053 // small amount of data with prefix C 1054 // and that recent updates have made small changes to all three prefixes. 1055 // Check that we do not do a compaction that merges all of B in one shot. 1056 h.put("A", "va") 1057 value := strings.Repeat("x", 1000) 1058 for i := 0; i < 100000; i++ { 1059 h.put(fmt.Sprintf("B%010d", i), value) 1060 } 1061 h.put("C", "vc") 1062 h.compactMem() 1063 h.compactRangeAt(0, "", "") 1064 h.waitCompaction() 1065 1066 // Make sparse update 1067 h.put("A", "va2") 1068 h.put("B100", "bvalue2") 1069 h.put("C", "vc2") 1070 h.compactMem() 1071 1072 h.waitCompaction() 1073 h.maxNextLevelOverlappingBytes(20 * 1048576) 1074 h.compactRangeAt(0, "", "") 1075 h.waitCompaction() 1076 h.maxNextLevelOverlappingBytes(20 * 1048576) 1077 h.compactRangeAt(1, "", "") 1078 h.waitCompaction() 1079 h.maxNextLevelOverlappingBytes(20 * 1048576) 1080} 1081 1082func TestDB_SizeOf(t *testing.T) { 1083 h := newDbHarnessWopt(t, &opt.Options{ 1084 DisableLargeBatchTransaction: true, 1085 Compression: opt.NoCompression, 1086 WriteBuffer: 10000000, 1087 }) 1088 defer h.close() 1089 1090 h.sizeAssert("", "xyz", 0, 0) 1091 h.reopenDB() 1092 h.sizeAssert("", "xyz", 0, 0) 1093 1094 // Write 8MB (80 values, each 100K) 1095 n := 80 1096 s1 := 100000 1097 s2 := 105000 1098 1099 for i := 0; i < n; i++ { 1100 h.put(numKey(i), strings.Repeat(fmt.Sprintf("v%09d", i), s1/10)) 1101 } 1102 1103 // 0 because SizeOf() does not account for memtable space 1104 h.sizeAssert("", numKey(50), 0, 0) 1105 1106 for r := 0; r < 3; r++ { 1107 h.reopenDB() 1108 1109 for cs := 0; cs < n; cs += 10 { 1110 for i := 0; i < n; i += 10 { 1111 h.sizeAssert("", numKey(i), int64(s1*i), int64(s2*i)) 1112 h.sizeAssert("", numKey(i)+".suffix", int64(s1*(i+1)), int64(s2*(i+1))) 1113 h.sizeAssert(numKey(i), numKey(i+10), int64(s1*10), int64(s2*10)) 1114 } 1115 1116 h.sizeAssert("", numKey(50), int64(s1*50), int64(s2*50)) 1117 h.sizeAssert("", numKey(50)+".suffix", int64(s1*50), int64(s2*50)) 1118 1119 h.compactRangeAt(0, numKey(cs), numKey(cs+9)) 1120 } 1121 1122 v := h.db.s.version() 1123 if v.tLen(0) != 0 { 1124 t.Errorf("level-0 tables was not zero, got %d", v.tLen(0)) 1125 } 1126 if v.tLen(1) == 0 { 1127 t.Error("level-1 tables was zero") 1128 } 1129 v.release() 1130 } 1131} 1132 1133func TestDB_SizeOf_MixOfSmallAndLarge(t *testing.T) { 1134 h := newDbHarnessWopt(t, &opt.Options{ 1135 DisableLargeBatchTransaction: true, 1136 Compression: opt.NoCompression, 1137 }) 1138 defer h.close() 1139 1140 sizes := []int64{ 1141 10000, 1142 10000, 1143 100000, 1144 10000, 1145 100000, 1146 10000, 1147 300000, 1148 10000, 1149 } 1150 1151 for i, n := range sizes { 1152 h.put(numKey(i), strings.Repeat(fmt.Sprintf("v%09d", i), int(n)/10)) 1153 } 1154 1155 for r := 0; r < 3; r++ { 1156 h.reopenDB() 1157 1158 var x int64 1159 for i, n := range sizes { 1160 y := x 1161 if i > 0 { 1162 y += 1000 1163 } 1164 h.sizeAssert("", numKey(i), x, y) 1165 x += n 1166 } 1167 1168 h.sizeAssert(numKey(3), numKey(5), 110000, 111000) 1169 1170 h.compactRangeAt(0, "", "") 1171 } 1172} 1173 1174func TestDB_Snapshot(t *testing.T) { 1175 trun(t, func(h *dbHarness) { 1176 h.put("foo", "v1") 1177 s1 := h.getSnapshot() 1178 h.put("foo", "v2") 1179 s2 := h.getSnapshot() 1180 h.put("foo", "v3") 1181 s3 := h.getSnapshot() 1182 h.put("foo", "v4") 1183 1184 h.getValr(s1, "foo", "v1") 1185 h.getValr(s2, "foo", "v2") 1186 h.getValr(s3, "foo", "v3") 1187 h.getVal("foo", "v4") 1188 1189 s3.Release() 1190 h.getValr(s1, "foo", "v1") 1191 h.getValr(s2, "foo", "v2") 1192 h.getVal("foo", "v4") 1193 1194 s1.Release() 1195 h.getValr(s2, "foo", "v2") 1196 h.getVal("foo", "v4") 1197 1198 s2.Release() 1199 h.getVal("foo", "v4") 1200 }) 1201} 1202 1203func TestDB_SnapshotList(t *testing.T) { 1204 db := &DB{snapsList: list.New()} 1205 e0a := db.acquireSnapshot() 1206 e0b := db.acquireSnapshot() 1207 db.seq = 1 1208 e1 := db.acquireSnapshot() 1209 db.seq = 2 1210 e2 := db.acquireSnapshot() 1211 1212 if db.minSeq() != 0 { 1213 t.Fatalf("invalid sequence number, got=%d", db.minSeq()) 1214 } 1215 db.releaseSnapshot(e0a) 1216 if db.minSeq() != 0 { 1217 t.Fatalf("invalid sequence number, got=%d", db.minSeq()) 1218 } 1219 db.releaseSnapshot(e2) 1220 if db.minSeq() != 0 { 1221 t.Fatalf("invalid sequence number, got=%d", db.minSeq()) 1222 } 1223 db.releaseSnapshot(e0b) 1224 if db.minSeq() != 1 { 1225 t.Fatalf("invalid sequence number, got=%d", db.minSeq()) 1226 } 1227 e2 = db.acquireSnapshot() 1228 if db.minSeq() != 1 { 1229 t.Fatalf("invalid sequence number, got=%d", db.minSeq()) 1230 } 1231 db.releaseSnapshot(e1) 1232 if db.minSeq() != 2 { 1233 t.Fatalf("invalid sequence number, got=%d", db.minSeq()) 1234 } 1235 db.releaseSnapshot(e2) 1236 if db.minSeq() != 2 { 1237 t.Fatalf("invalid sequence number, got=%d", db.minSeq()) 1238 } 1239} 1240 1241func TestDB_HiddenValuesAreRemoved(t *testing.T) { 1242 trun(t, func(h *dbHarness) { 1243 s := h.db.s 1244 1245 m := 2 1246 h.db.memdbMaxLevel = m 1247 1248 h.put("foo", "v1") 1249 h.compactMem() 1250 v := s.version() 1251 num := v.tLen(m) 1252 v.release() 1253 if num != 1 { 1254 t.Errorf("invalid level-%d len, want=1 got=%d", m, num) 1255 } 1256 1257 // Place a table at level last-1 to prevent merging with preceding mutation 1258 h.put("a", "begin") 1259 h.put("z", "end") 1260 h.compactMem() 1261 v = s.version() 1262 if v.tLen(m) != 1 { 1263 t.Errorf("invalid level-%d len, want=1 got=%d", m, v.tLen(m)) 1264 } 1265 if v.tLen(m-1) != 1 { 1266 t.Errorf("invalid level-%d len, want=1 got=%d", m-1, v.tLen(m-1)) 1267 } 1268 v.release() 1269 1270 h.delete("foo") 1271 h.put("foo", "v2") 1272 h.allEntriesFor("foo", "[ v2, DEL, v1 ]") 1273 h.compactMem() 1274 h.allEntriesFor("foo", "[ v2, DEL, v1 ]") 1275 h.compactRangeAt(m-2, "", "z") 1276 // DEL eliminated, but v1 remains because we aren't compacting that level 1277 // (DEL can be eliminated because v2 hides v1). 1278 h.allEntriesFor("foo", "[ v2, v1 ]") 1279 h.compactRangeAt(m-1, "", "") 1280 // Merging last-1 w/ last, so we are the base level for "foo", so 1281 // DEL is removed. (as is v1). 1282 h.allEntriesFor("foo", "[ v2 ]") 1283 }) 1284} 1285 1286func TestDB_DeletionMarkers2(t *testing.T) { 1287 h := newDbHarness(t) 1288 defer h.close() 1289 s := h.db.s 1290 1291 m := 2 1292 h.db.memdbMaxLevel = m 1293 1294 h.put("foo", "v1") 1295 h.compactMem() 1296 v := s.version() 1297 num := v.tLen(m) 1298 v.release() 1299 if num != 1 { 1300 t.Errorf("invalid level-%d len, want=1 got=%d", m, num) 1301 } 1302 1303 // Place a table at level last-1 to prevent merging with preceding mutation 1304 h.put("a", "begin") 1305 h.put("z", "end") 1306 h.compactMem() 1307 v = s.version() 1308 if v.tLen(m) != 1 { 1309 t.Errorf("invalid level-%d len, want=1 got=%d", m, v.tLen(m)) 1310 } 1311 if v.tLen(m-1) != 1 { 1312 t.Errorf("invalid level-%d len, want=1 got=%d", m-1, v.tLen(m-1)) 1313 } 1314 v.release() 1315 1316 h.delete("foo") 1317 h.allEntriesFor("foo", "[ DEL, v1 ]") 1318 h.compactMem() // Moves to level last-2 1319 h.allEntriesFor("foo", "[ DEL, v1 ]") 1320 h.compactRangeAt(m-2, "", "") 1321 // DEL kept: "last" file overlaps 1322 h.allEntriesFor("foo", "[ DEL, v1 ]") 1323 h.compactRangeAt(m-1, "", "") 1324 // Merging last-1 w/ last, so we are the base level for "foo", so 1325 // DEL is removed. (as is v1). 1326 h.allEntriesFor("foo", "[ ]") 1327} 1328 1329func TestDB_CompactionTableOpenError(t *testing.T) { 1330 h := newDbHarnessWopt(t, &opt.Options{ 1331 DisableLargeBatchTransaction: true, 1332 OpenFilesCacheCapacity: -1, 1333 }) 1334 defer h.close() 1335 1336 h.db.memdbMaxLevel = 2 1337 1338 im := 10 1339 jm := 10 1340 for r := 0; r < 2; r++ { 1341 for i := 0; i < im; i++ { 1342 for j := 0; j < jm; j++ { 1343 h.put(fmt.Sprintf("k%d,%d", i, j), fmt.Sprintf("v%d,%d", i, j)) 1344 } 1345 h.compactMem() 1346 } 1347 } 1348 1349 if n := h.totalTables(); n != im*2 { 1350 t.Errorf("total tables is %d, want %d", n, im*2) 1351 } 1352 1353 h.stor.EmulateError(testutil.ModeOpen, storage.TypeTable, errors.New("open error during table compaction")) 1354 go h.db.CompactRange(util.Range{}) 1355 if err := h.db.compTriggerWait(h.db.tcompCmdC); err != nil { 1356 t.Log("compaction error: ", err) 1357 } 1358 h.closeDB0() 1359 h.openDB() 1360 h.stor.EmulateError(testutil.ModeOpen, storage.TypeTable, nil) 1361 1362 for i := 0; i < im; i++ { 1363 for j := 0; j < jm; j++ { 1364 h.getVal(fmt.Sprintf("k%d,%d", i, j), fmt.Sprintf("v%d,%d", i, j)) 1365 } 1366 } 1367} 1368 1369func TestDB_OverlapInLevel0(t *testing.T) { 1370 trun(t, func(h *dbHarness) { 1371 h.db.memdbMaxLevel = 2 1372 1373 // Fill levels 1 and 2 to disable the pushing of new memtables to levels > 0. 1374 h.put("100", "v100") 1375 h.put("999", "v999") 1376 h.compactMem() 1377 h.delete("100") 1378 h.delete("999") 1379 h.compactMem() 1380 h.tablesPerLevel("0,1,1") 1381 1382 // Make files spanning the following ranges in level-0: 1383 // files[0] 200 .. 900 1384 // files[1] 300 .. 500 1385 // Note that files are sorted by min key. 1386 h.put("300", "v300") 1387 h.put("500", "v500") 1388 h.compactMem() 1389 h.put("200", "v200") 1390 h.put("600", "v600") 1391 h.put("900", "v900") 1392 h.compactMem() 1393 h.tablesPerLevel("2,1,1") 1394 1395 // Compact away the placeholder files we created initially 1396 h.compactRangeAt(1, "", "") 1397 h.compactRangeAt(2, "", "") 1398 h.tablesPerLevel("2") 1399 1400 // Do a memtable compaction. Before bug-fix, the compaction would 1401 // not detect the overlap with level-0 files and would incorrectly place 1402 // the deletion in a deeper level. 1403 h.delete("600") 1404 h.compactMem() 1405 h.tablesPerLevel("3") 1406 h.get("600", false) 1407 }) 1408} 1409 1410func TestDB_L0_CompactionBug_Issue44_a(t *testing.T) { 1411 h := newDbHarness(t) 1412 defer h.close() 1413 1414 h.reopenDB() 1415 h.put("b", "v") 1416 h.reopenDB() 1417 h.delete("b") 1418 h.delete("a") 1419 h.reopenDB() 1420 h.delete("a") 1421 h.reopenDB() 1422 h.put("a", "v") 1423 h.reopenDB() 1424 h.reopenDB() 1425 h.getKeyVal("(a->v)") 1426 h.waitCompaction() 1427 h.getKeyVal("(a->v)") 1428} 1429 1430func TestDB_L0_CompactionBug_Issue44_b(t *testing.T) { 1431 h := newDbHarness(t) 1432 defer h.close() 1433 1434 h.reopenDB() 1435 h.put("", "") 1436 h.reopenDB() 1437 h.delete("e") 1438 h.put("", "") 1439 h.reopenDB() 1440 h.put("c", "cv") 1441 h.reopenDB() 1442 h.put("", "") 1443 h.reopenDB() 1444 h.put("", "") 1445 h.waitCompaction() 1446 h.reopenDB() 1447 h.put("d", "dv") 1448 h.reopenDB() 1449 h.put("", "") 1450 h.reopenDB() 1451 h.delete("d") 1452 h.delete("b") 1453 h.reopenDB() 1454 h.getKeyVal("(->)(c->cv)") 1455 h.waitCompaction() 1456 h.getKeyVal("(->)(c->cv)") 1457} 1458 1459func TestDB_SingleEntryMemCompaction(t *testing.T) { 1460 trun(t, func(h *dbHarness) { 1461 for i := 0; i < 10; i++ { 1462 h.put("big", strings.Repeat("v", opt.DefaultWriteBuffer)) 1463 h.compactMem() 1464 h.put("key", strings.Repeat("v", opt.DefaultBlockSize)) 1465 h.compactMem() 1466 h.put("k", "v") 1467 h.compactMem() 1468 h.put("", "") 1469 h.compactMem() 1470 h.put("verybig", strings.Repeat("v", opt.DefaultWriteBuffer*2)) 1471 h.compactMem() 1472 } 1473 }) 1474} 1475 1476func TestDB_ManifestWriteError(t *testing.T) { 1477 for i := 0; i < 2; i++ { 1478 func() { 1479 h := newDbHarness(t) 1480 defer h.close() 1481 1482 h.put("foo", "bar") 1483 h.getVal("foo", "bar") 1484 1485 // Mem compaction (will succeed) 1486 h.compactMem() 1487 h.getVal("foo", "bar") 1488 v := h.db.s.version() 1489 if n := v.tLen(0); n != 1 { 1490 t.Errorf("invalid total tables, want=1 got=%d", n) 1491 } 1492 v.release() 1493 1494 if i == 0 { 1495 h.stor.EmulateError(testutil.ModeWrite, storage.TypeManifest, errors.New("manifest write error")) 1496 } else { 1497 h.stor.EmulateError(testutil.ModeSync, storage.TypeManifest, errors.New("manifest sync error")) 1498 } 1499 1500 // Merging compaction (will fail) 1501 h.compactRangeAtErr(0, "", "", true) 1502 1503 h.db.Close() 1504 h.stor.EmulateError(testutil.ModeWrite, storage.TypeManifest, nil) 1505 h.stor.EmulateError(testutil.ModeSync, storage.TypeManifest, nil) 1506 1507 // Should not lose data 1508 h.openDB() 1509 h.getVal("foo", "bar") 1510 }() 1511 } 1512} 1513 1514func assertErr(t *testing.T, err error, wanterr bool) { 1515 if err != nil { 1516 if wanterr { 1517 t.Log("AssertErr: got error (expected): ", err) 1518 } else { 1519 t.Error("AssertErr: got error: ", err) 1520 } 1521 } else if wanterr { 1522 t.Error("AssertErr: expect error") 1523 } 1524} 1525 1526func TestDB_ClosedIsClosed(t *testing.T) { 1527 h := newDbHarness(t) 1528 db := h.db 1529 1530 var iter, iter2 iterator.Iterator 1531 var snap *Snapshot 1532 func() { 1533 defer h.close() 1534 1535 h.put("k", "v") 1536 h.getVal("k", "v") 1537 1538 iter = db.NewIterator(nil, h.ro) 1539 iter.Seek([]byte("k")) 1540 testKeyVal(t, iter, "k->v") 1541 1542 var err error 1543 snap, err = db.GetSnapshot() 1544 if err != nil { 1545 t.Fatal("GetSnapshot: got error: ", err) 1546 } 1547 1548 h.getValr(snap, "k", "v") 1549 1550 iter2 = snap.NewIterator(nil, h.ro) 1551 iter2.Seek([]byte("k")) 1552 testKeyVal(t, iter2, "k->v") 1553 1554 h.put("foo", "v2") 1555 h.delete("foo") 1556 1557 // closing DB 1558 iter.Release() 1559 iter2.Release() 1560 }() 1561 1562 assertErr(t, db.Put([]byte("x"), []byte("y"), h.wo), true) 1563 _, err := db.Get([]byte("k"), h.ro) 1564 assertErr(t, err, true) 1565 1566 if iter.Valid() { 1567 t.Errorf("iter.Valid should false") 1568 } 1569 assertErr(t, iter.Error(), false) 1570 testKeyVal(t, iter, "->") 1571 if iter.Seek([]byte("k")) { 1572 t.Errorf("iter.Seek should false") 1573 } 1574 assertErr(t, iter.Error(), true) 1575 1576 assertErr(t, iter2.Error(), false) 1577 1578 _, err = snap.Get([]byte("k"), h.ro) 1579 assertErr(t, err, true) 1580 1581 _, err = db.GetSnapshot() 1582 assertErr(t, err, true) 1583 1584 iter3 := db.NewIterator(nil, h.ro) 1585 assertErr(t, iter3.Error(), true) 1586 1587 iter3 = snap.NewIterator(nil, h.ro) 1588 assertErr(t, iter3.Error(), true) 1589 1590 assertErr(t, db.Delete([]byte("k"), h.wo), true) 1591 1592 _, err = db.GetProperty("leveldb.stats") 1593 assertErr(t, err, true) 1594 1595 _, err = db.SizeOf([]util.Range{{Start: []byte("a"), Limit: []byte("z")}}) 1596 assertErr(t, err, true) 1597 1598 assertErr(t, db.CompactRange(util.Range{}), true) 1599 1600 assertErr(t, db.Close(), true) 1601} 1602 1603type numberComparer struct{} 1604 1605func (numberComparer) num(x []byte) (n int) { 1606 fmt.Sscan(string(x[1:len(x)-1]), &n) 1607 return 1608} 1609 1610func (numberComparer) Name() string { 1611 return "test.NumberComparer" 1612} 1613 1614func (p numberComparer) Compare(a, b []byte) int { 1615 return p.num(a) - p.num(b) 1616} 1617 1618func (numberComparer) Separator(dst, a, b []byte) []byte { return nil } 1619func (numberComparer) Successor(dst, b []byte) []byte { return nil } 1620 1621func TestDB_CustomComparer(t *testing.T) { 1622 h := newDbHarnessWopt(t, &opt.Options{ 1623 DisableLargeBatchTransaction: true, 1624 Comparer: numberComparer{}, 1625 WriteBuffer: 1000, 1626 }) 1627 defer h.close() 1628 1629 h.put("[10]", "ten") 1630 h.put("[0x14]", "twenty") 1631 for i := 0; i < 2; i++ { 1632 h.getVal("[10]", "ten") 1633 h.getVal("[0xa]", "ten") 1634 h.getVal("[20]", "twenty") 1635 h.getVal("[0x14]", "twenty") 1636 h.get("[15]", false) 1637 h.get("[0xf]", false) 1638 h.compactMem() 1639 h.compactRange("[0]", "[9999]") 1640 } 1641 1642 for n := 0; n < 2; n++ { 1643 for i := 0; i < 100; i++ { 1644 v := fmt.Sprintf("[%d]", i*10) 1645 h.put(v, v) 1646 } 1647 h.compactMem() 1648 h.compactRange("[0]", "[1000000]") 1649 } 1650} 1651 1652func TestDB_ManualCompaction(t *testing.T) { 1653 h := newDbHarness(t) 1654 defer h.close() 1655 1656 h.db.memdbMaxLevel = 2 1657 1658 h.putMulti(3, "p", "q") 1659 h.tablesPerLevel("1,1,1") 1660 1661 // Compaction range falls before files 1662 h.compactRange("", "c") 1663 h.tablesPerLevel("1,1,1") 1664 1665 // Compaction range falls after files 1666 h.compactRange("r", "z") 1667 h.tablesPerLevel("1,1,1") 1668 1669 // Compaction range overlaps files 1670 h.compactRange("p1", "p9") 1671 h.tablesPerLevel("0,0,1") 1672 1673 // Populate a different range 1674 h.putMulti(3, "c", "e") 1675 h.tablesPerLevel("1,1,2") 1676 1677 // Compact just the new range 1678 h.compactRange("b", "f") 1679 h.tablesPerLevel("0,0,2") 1680 1681 // Compact all 1682 h.putMulti(1, "a", "z") 1683 h.tablesPerLevel("0,1,2") 1684 h.compactRange("", "") 1685 h.tablesPerLevel("0,0,1") 1686} 1687 1688func TestDB_BloomFilter(t *testing.T) { 1689 h := newDbHarnessWopt(t, &opt.Options{ 1690 DisableLargeBatchTransaction: true, 1691 DisableBlockCache: true, 1692 Filter: filter.NewBloomFilter(10), 1693 }) 1694 defer h.close() 1695 1696 key := func(i int) string { 1697 return fmt.Sprintf("key%06d", i) 1698 } 1699 1700 const n = 10000 1701 1702 // Populate multiple layers 1703 for i := 0; i < n; i++ { 1704 h.put(key(i), key(i)) 1705 } 1706 h.compactMem() 1707 h.compactRange("a", "z") 1708 for i := 0; i < n; i += 100 { 1709 h.put(key(i), key(i)) 1710 } 1711 h.compactMem() 1712 1713 // Prevent auto compactions triggered by seeks 1714 h.stor.Stall(testutil.ModeSync, storage.TypeTable) 1715 1716 // Lookup present keys. Should rarely read from small sstable. 1717 h.stor.ResetCounter(testutil.ModeRead, storage.TypeTable) 1718 for i := 0; i < n; i++ { 1719 h.getVal(key(i), key(i)) 1720 } 1721 cnt, _ := h.stor.Counter(testutil.ModeRead, storage.TypeTable) 1722 t.Logf("lookup of %d present keys yield %d sstable I/O reads", n, cnt) 1723 if min, max := n, n+2*n/100; cnt < min || cnt > max { 1724 t.Errorf("num of sstable I/O reads of present keys not in range of %d - %d, got %d", min, max, cnt) 1725 } 1726 1727 // Lookup missing keys. Should rarely read from either sstable. 1728 h.stor.ResetCounter(testutil.ModeRead, storage.TypeTable) 1729 for i := 0; i < n; i++ { 1730 h.get(key(i)+".missing", false) 1731 } 1732 cnt, _ = h.stor.Counter(testutil.ModeRead, storage.TypeTable) 1733 t.Logf("lookup of %d missing keys yield %d sstable I/O reads", n, cnt) 1734 if max := 3 * n / 100; cnt > max { 1735 t.Errorf("num of sstable I/O reads of missing keys was more than %d, got %d", max, cnt) 1736 } 1737 1738 h.stor.Release(testutil.ModeSync, storage.TypeTable) 1739} 1740 1741func TestDB_Concurrent(t *testing.T) { 1742 const n, secs, maxkey = 4, 6, 1000 1743 h := newDbHarness(t) 1744 defer h.close() 1745 1746 runtime.GOMAXPROCS(runtime.NumCPU()) 1747 1748 var ( 1749 closeWg sync.WaitGroup 1750 stop uint32 1751 cnt [n]uint32 1752 ) 1753 1754 for i := 0; i < n; i++ { 1755 closeWg.Add(1) 1756 go func(i int) { 1757 var put, get, found uint 1758 defer func() { 1759 t.Logf("goroutine %d stopped after %d ops, put=%d get=%d found=%d missing=%d", 1760 i, cnt[i], put, get, found, get-found) 1761 closeWg.Done() 1762 }() 1763 1764 rnd := rand.New(rand.NewSource(int64(1000 + i))) 1765 for atomic.LoadUint32(&stop) == 0 { 1766 x := cnt[i] 1767 1768 k := rnd.Intn(maxkey) 1769 kstr := fmt.Sprintf("%016d", k) 1770 1771 if (rnd.Int() % 2) > 0 { 1772 put++ 1773 h.put(kstr, fmt.Sprintf("%d.%d.%-1000d", k, i, x)) 1774 } else { 1775 get++ 1776 v, err := h.db.Get([]byte(kstr), h.ro) 1777 if err == nil { 1778 found++ 1779 rk, ri, rx := 0, -1, uint32(0) 1780 fmt.Sscanf(string(v), "%d.%d.%d", &rk, &ri, &rx) 1781 if rk != k { 1782 t.Errorf("invalid key want=%d got=%d", k, rk) 1783 } 1784 if ri < 0 || ri >= n { 1785 t.Error("invalid goroutine number: ", ri) 1786 } else { 1787 tx := atomic.LoadUint32(&(cnt[ri])) 1788 if rx > tx { 1789 t.Errorf("invalid seq number, %d > %d ", rx, tx) 1790 } 1791 } 1792 } else if err != ErrNotFound { 1793 t.Error("Get: got error: ", err) 1794 return 1795 } 1796 } 1797 atomic.AddUint32(&cnt[i], 1) 1798 } 1799 }(i) 1800 } 1801 1802 time.Sleep(secs * time.Second) 1803 atomic.StoreUint32(&stop, 1) 1804 closeWg.Wait() 1805} 1806 1807func TestDB_ConcurrentIterator(t *testing.T) { 1808 const n, n2 = 4, 1000 1809 h := newDbHarnessWopt(t, &opt.Options{DisableLargeBatchTransaction: true, WriteBuffer: 30}) 1810 defer h.close() 1811 1812 runtime.GOMAXPROCS(runtime.NumCPU()) 1813 1814 var ( 1815 closeWg sync.WaitGroup 1816 stop uint32 1817 ) 1818 1819 for i := 0; i < n; i++ { 1820 closeWg.Add(1) 1821 go func(i int) { 1822 for k := 0; atomic.LoadUint32(&stop) == 0; k++ { 1823 h.put(fmt.Sprintf("k%d", k), fmt.Sprintf("%d.%d.", k, i)+strings.Repeat("x", 10)) 1824 } 1825 closeWg.Done() 1826 }(i) 1827 } 1828 1829 for i := 0; i < n; i++ { 1830 closeWg.Add(1) 1831 go func(i int) { 1832 for k := 1000000; k < 0 || atomic.LoadUint32(&stop) == 0; k-- { 1833 h.put(fmt.Sprintf("k%d", k), fmt.Sprintf("%d.%d.", k, i)+strings.Repeat("x", 10)) 1834 } 1835 closeWg.Done() 1836 }(i) 1837 } 1838 1839 cmp := comparer.DefaultComparer 1840 for i := 0; i < n2; i++ { 1841 closeWg.Add(1) 1842 go func(i int) { 1843 it := h.db.NewIterator(nil, nil) 1844 var pk []byte 1845 for it.Next() { 1846 kk := it.Key() 1847 if cmp.Compare(kk, pk) <= 0 { 1848 t.Errorf("iter %d: %q is successor of %q", i, pk, kk) 1849 } 1850 pk = append(pk[:0], kk...) 1851 var k, vk, vi int 1852 if n, err := fmt.Sscanf(string(it.Key()), "k%d", &k); err != nil { 1853 t.Errorf("iter %d: Scanf error on key %q: %v", i, it.Key(), err) 1854 } else if n < 1 { 1855 t.Errorf("iter %d: Cannot parse key %q", i, it.Key()) 1856 } 1857 if n, err := fmt.Sscanf(string(it.Value()), "%d.%d", &vk, &vi); err != nil { 1858 t.Errorf("iter %d: Scanf error on value %q: %v", i, it.Value(), err) 1859 } else if n < 2 { 1860 t.Errorf("iter %d: Cannot parse value %q", i, it.Value()) 1861 } 1862 1863 if vk != k { 1864 t.Errorf("iter %d: invalid value i=%d, want=%d got=%d", i, vi, k, vk) 1865 } 1866 } 1867 if err := it.Error(); err != nil { 1868 t.Errorf("iter %d: Got error: %v", i, err) 1869 } 1870 it.Release() 1871 closeWg.Done() 1872 }(i) 1873 } 1874 1875 atomic.StoreUint32(&stop, 1) 1876 closeWg.Wait() 1877} 1878 1879func TestDB_ConcurrentWrite(t *testing.T) { 1880 const n, bk, niter = 10, 3, 10000 1881 h := newDbHarness(t) 1882 defer h.close() 1883 1884 runtime.GOMAXPROCS(runtime.NumCPU()) 1885 1886 var wg sync.WaitGroup 1887 for i := 0; i < n; i++ { 1888 wg.Add(1) 1889 go func(i int) { 1890 defer wg.Done() 1891 for k := 0; k < niter; k++ { 1892 kstr := fmt.Sprintf("put-%d.%d", i, k) 1893 vstr := fmt.Sprintf("v%d", k) 1894 h.put(kstr, vstr) 1895 // Key should immediately available after put returns. 1896 h.getVal(kstr, vstr) 1897 } 1898 }(i) 1899 } 1900 for i := 0; i < n; i++ { 1901 wg.Add(1) 1902 batch := &Batch{} 1903 go func(i int) { 1904 defer wg.Done() 1905 for k := 0; k < niter; k++ { 1906 batch.Reset() 1907 for j := 0; j < bk; j++ { 1908 batch.Put([]byte(fmt.Sprintf("batch-%d.%d.%d", i, k, j)), []byte(fmt.Sprintf("v%d", k))) 1909 } 1910 h.write(batch) 1911 // Key should immediately available after put returns. 1912 for j := 0; j < bk; j++ { 1913 h.getVal(fmt.Sprintf("batch-%d.%d.%d", i, k, j), fmt.Sprintf("v%d", k)) 1914 } 1915 } 1916 }(i) 1917 } 1918 wg.Wait() 1919} 1920 1921func TestDB_CreateReopenDbOnFile(t *testing.T) { 1922 dbpath := filepath.Join(os.TempDir(), fmt.Sprintf("goleveldbtestCreateReopenDbOnFile-%d", os.Getuid())) 1923 if err := os.RemoveAll(dbpath); err != nil { 1924 t.Fatal("cannot remove old db: ", err) 1925 } 1926 defer os.RemoveAll(dbpath) 1927 1928 for i := 0; i < 3; i++ { 1929 stor, err := storage.OpenFile(dbpath, false) 1930 if err != nil { 1931 t.Fatalf("(%d) cannot open storage: %s", i, err) 1932 } 1933 db, err := Open(stor, nil) 1934 if err != nil { 1935 t.Fatalf("(%d) cannot open db: %s", i, err) 1936 } 1937 if err := db.Put([]byte("foo"), []byte("bar"), nil); err != nil { 1938 t.Fatalf("(%d) cannot write to db: %s", i, err) 1939 } 1940 if err := db.Close(); err != nil { 1941 t.Fatalf("(%d) cannot close db: %s", i, err) 1942 } 1943 if err := stor.Close(); err != nil { 1944 t.Fatalf("(%d) cannot close storage: %s", i, err) 1945 } 1946 } 1947} 1948 1949func TestDB_CreateReopenDbOnFile2(t *testing.T) { 1950 dbpath := filepath.Join(os.TempDir(), fmt.Sprintf("goleveldbtestCreateReopenDbOnFile2-%d", os.Getuid())) 1951 if err := os.RemoveAll(dbpath); err != nil { 1952 t.Fatal("cannot remove old db: ", err) 1953 } 1954 defer os.RemoveAll(dbpath) 1955 1956 for i := 0; i < 3; i++ { 1957 db, err := OpenFile(dbpath, nil) 1958 if err != nil { 1959 t.Fatalf("(%d) cannot open db: %s", i, err) 1960 } 1961 if err := db.Put([]byte("foo"), []byte("bar"), nil); err != nil { 1962 t.Fatalf("(%d) cannot write to db: %s", i, err) 1963 } 1964 if err := db.Close(); err != nil { 1965 t.Fatalf("(%d) cannot close db: %s", i, err) 1966 } 1967 } 1968} 1969 1970func TestDB_DeletionMarkersOnMemdb(t *testing.T) { 1971 h := newDbHarness(t) 1972 defer h.close() 1973 1974 h.put("foo", "v1") 1975 h.compactMem() 1976 h.delete("foo") 1977 h.get("foo", false) 1978 h.getKeyVal("") 1979} 1980 1981func TestDB_LeveldbIssue178(t *testing.T) { 1982 nKeys := (opt.DefaultCompactionTableSize / 30) * 5 1983 key1 := func(i int) string { 1984 return fmt.Sprintf("my_key_%d", i) 1985 } 1986 key2 := func(i int) string { 1987 return fmt.Sprintf("my_key_%d_xxx", i) 1988 } 1989 1990 // Disable compression since it affects the creation of layers and the 1991 // code below is trying to test against a very specific scenario. 1992 h := newDbHarnessWopt(t, &opt.Options{ 1993 DisableLargeBatchTransaction: true, 1994 Compression: opt.NoCompression, 1995 }) 1996 defer h.close() 1997 1998 // Create first key range. 1999 batch := new(Batch) 2000 for i := 0; i < nKeys; i++ { 2001 batch.Put([]byte(key1(i)), []byte("value for range 1 key")) 2002 } 2003 h.write(batch) 2004 2005 // Create second key range. 2006 batch.Reset() 2007 for i := 0; i < nKeys; i++ { 2008 batch.Put([]byte(key2(i)), []byte("value for range 2 key")) 2009 } 2010 h.write(batch) 2011 2012 // Delete second key range. 2013 batch.Reset() 2014 for i := 0; i < nKeys; i++ { 2015 batch.Delete([]byte(key2(i))) 2016 } 2017 h.write(batch) 2018 h.waitMemCompaction() 2019 2020 // Run manual compaction. 2021 h.compactRange(key1(0), key1(nKeys-1)) 2022 2023 // Checking the keys. 2024 h.assertNumKeys(nKeys) 2025} 2026 2027func TestDB_LeveldbIssue200(t *testing.T) { 2028 h := newDbHarness(t) 2029 defer h.close() 2030 2031 h.put("1", "b") 2032 h.put("2", "c") 2033 h.put("3", "d") 2034 h.put("4", "e") 2035 h.put("5", "f") 2036 2037 iter := h.db.NewIterator(nil, h.ro) 2038 2039 // Add an element that should not be reflected in the iterator. 2040 h.put("25", "cd") 2041 2042 iter.Seek([]byte("5")) 2043 assertBytes(t, []byte("5"), iter.Key()) 2044 iter.Prev() 2045 assertBytes(t, []byte("4"), iter.Key()) 2046 iter.Prev() 2047 assertBytes(t, []byte("3"), iter.Key()) 2048 iter.Next() 2049 assertBytes(t, []byte("4"), iter.Key()) 2050 iter.Next() 2051 assertBytes(t, []byte("5"), iter.Key()) 2052} 2053 2054func TestDB_GoleveldbIssue74(t *testing.T) { 2055 h := newDbHarnessWopt(t, &opt.Options{ 2056 DisableLargeBatchTransaction: true, 2057 WriteBuffer: 1 * opt.MiB, 2058 }) 2059 defer h.close() 2060 2061 const n, dur = 10000, 5 * time.Second 2062 2063 runtime.GOMAXPROCS(runtime.NumCPU()) 2064 2065 until := time.Now().Add(dur) 2066 wg := new(sync.WaitGroup) 2067 wg.Add(2) 2068 var done uint32 2069 go func() { 2070 var i int 2071 defer func() { 2072 t.Logf("WRITER DONE #%d", i) 2073 atomic.StoreUint32(&done, 1) 2074 wg.Done() 2075 }() 2076 2077 b := new(Batch) 2078 for ; time.Now().Before(until) && atomic.LoadUint32(&done) == 0; i++ { 2079 iv := fmt.Sprintf("VAL%010d", i) 2080 for k := 0; k < n; k++ { 2081 key := fmt.Sprintf("KEY%06d", k) 2082 b.Put([]byte(key), []byte(key+iv)) 2083 b.Put([]byte(fmt.Sprintf("PTR%06d", k)), []byte(key)) 2084 } 2085 h.write(b) 2086 2087 b.Reset() 2088 snap := h.getSnapshot() 2089 iter := snap.NewIterator(util.BytesPrefix([]byte("PTR")), nil) 2090 var k int 2091 for ; iter.Next(); k++ { 2092 ptrKey := iter.Key() 2093 key := iter.Value() 2094 2095 if _, err := snap.Get(ptrKey, nil); err != nil { 2096 t.Fatalf("WRITER #%d snapshot.Get %q: %v", i, ptrKey, err) 2097 } 2098 if value, err := snap.Get(key, nil); err != nil { 2099 t.Fatalf("WRITER #%d snapshot.Get %q: %v", i, key, err) 2100 } else if string(value) != string(key)+iv { 2101 t.Fatalf("WRITER #%d snapshot.Get %q got invalid value, want %q got %q", i, key, string(key)+iv, value) 2102 } 2103 2104 b.Delete(key) 2105 b.Delete(ptrKey) 2106 } 2107 h.write(b) 2108 iter.Release() 2109 snap.Release() 2110 if k != n { 2111 t.Fatalf("#%d %d != %d", i, k, n) 2112 } 2113 } 2114 }() 2115 go func() { 2116 var i int 2117 defer func() { 2118 t.Logf("READER DONE #%d", i) 2119 atomic.StoreUint32(&done, 1) 2120 wg.Done() 2121 }() 2122 for ; time.Now().Before(until) && atomic.LoadUint32(&done) == 0; i++ { 2123 snap := h.getSnapshot() 2124 iter := snap.NewIterator(util.BytesPrefix([]byte("PTR")), nil) 2125 var prevValue string 2126 var k int 2127 for ; iter.Next(); k++ { 2128 ptrKey := iter.Key() 2129 key := iter.Value() 2130 2131 if _, err := snap.Get(ptrKey, nil); err != nil { 2132 t.Fatalf("READER #%d snapshot.Get %q: %v", i, ptrKey, err) 2133 } 2134 2135 if value, err := snap.Get(key, nil); err != nil { 2136 t.Fatalf("READER #%d snapshot.Get %q: %v", i, key, err) 2137 } else if prevValue != "" && string(value) != string(key)+prevValue { 2138 t.Fatalf("READER #%d snapshot.Get %q got invalid value, want %q got %q", i, key, string(key)+prevValue, value) 2139 } else { 2140 prevValue = string(value[len(key):]) 2141 } 2142 } 2143 iter.Release() 2144 snap.Release() 2145 if k > 0 && k != n { 2146 t.Fatalf("#%d %d != %d", i, k, n) 2147 } 2148 } 2149 }() 2150 wg.Wait() 2151} 2152 2153func TestDB_GetProperties(t *testing.T) { 2154 h := newDbHarness(t) 2155 defer h.close() 2156 2157 _, err := h.db.GetProperty("leveldb.num-files-at-level") 2158 if err == nil { 2159 t.Error("GetProperty() failed to detect missing level") 2160 } 2161 2162 _, err = h.db.GetProperty("leveldb.num-files-at-level0") 2163 if err != nil { 2164 t.Error("got unexpected error", err) 2165 } 2166 2167 _, err = h.db.GetProperty("leveldb.num-files-at-level0x") 2168 if err == nil { 2169 t.Error("GetProperty() failed to detect invalid level") 2170 } 2171} 2172 2173func TestDB_GoleveldbIssue72and83(t *testing.T) { 2174 h := newDbHarnessWopt(t, &opt.Options{ 2175 DisableLargeBatchTransaction: true, 2176 WriteBuffer: 1 * opt.MiB, 2177 OpenFilesCacheCapacity: 3, 2178 }) 2179 defer h.close() 2180 2181 const n, wn, dur = 10000, 100, 30 * time.Second 2182 2183 runtime.GOMAXPROCS(runtime.NumCPU()) 2184 2185 randomData := func(prefix byte, i int) []byte { 2186 data := make([]byte, 1+4+32+64+32) 2187 _, err := crand.Reader.Read(data[1 : len(data)-8]) 2188 if err != nil { 2189 panic(err) 2190 } 2191 data[0] = prefix 2192 binary.LittleEndian.PutUint32(data[len(data)-8:], uint32(i)) 2193 binary.LittleEndian.PutUint32(data[len(data)-4:], util.NewCRC(data[:len(data)-4]).Value()) 2194 return data 2195 } 2196 2197 keys := make([][]byte, n) 2198 for i := range keys { 2199 keys[i] = randomData(1, 0) 2200 } 2201 2202 until := time.Now().Add(dur) 2203 wg := new(sync.WaitGroup) 2204 wg.Add(3) 2205 var done uint32 2206 go func() { 2207 i := 0 2208 defer func() { 2209 t.Logf("WRITER DONE #%d", i) 2210 wg.Done() 2211 }() 2212 2213 b := new(Batch) 2214 for ; i < wn && atomic.LoadUint32(&done) == 0; i++ { 2215 b.Reset() 2216 for _, k1 := range keys { 2217 k2 := randomData(2, i) 2218 b.Put(k2, randomData(42, i)) 2219 b.Put(k1, k2) 2220 } 2221 if err := h.db.Write(b, h.wo); err != nil { 2222 atomic.StoreUint32(&done, 1) 2223 t.Fatalf("WRITER #%d db.Write: %v", i, err) 2224 } 2225 } 2226 }() 2227 go func() { 2228 var i int 2229 defer func() { 2230 t.Logf("READER0 DONE #%d", i) 2231 atomic.StoreUint32(&done, 1) 2232 wg.Done() 2233 }() 2234 for ; time.Now().Before(until) && atomic.LoadUint32(&done) == 0; i++ { 2235 snap := h.getSnapshot() 2236 seq := snap.elem.seq 2237 if seq == 0 { 2238 snap.Release() 2239 continue 2240 } 2241 iter := snap.NewIterator(util.BytesPrefix([]byte{1}), nil) 2242 writei := int(seq/(n*2) - 1) 2243 var k int 2244 for ; iter.Next(); k++ { 2245 k1 := iter.Key() 2246 k2 := iter.Value() 2247 k1checksum0 := binary.LittleEndian.Uint32(k1[len(k1)-4:]) 2248 k1checksum1 := util.NewCRC(k1[:len(k1)-4]).Value() 2249 if k1checksum0 != k1checksum1 { 2250 t.Fatalf("READER0 #%d.%d W#%d invalid K1 checksum: %#x != %#x", i, k, writei, k1checksum0, k1checksum0) 2251 } 2252 k2checksum0 := binary.LittleEndian.Uint32(k2[len(k2)-4:]) 2253 k2checksum1 := util.NewCRC(k2[:len(k2)-4]).Value() 2254 if k2checksum0 != k2checksum1 { 2255 t.Fatalf("READER0 #%d.%d W#%d invalid K2 checksum: %#x != %#x", i, k, writei, k2checksum0, k2checksum1) 2256 } 2257 kwritei := int(binary.LittleEndian.Uint32(k2[len(k2)-8:])) 2258 if writei != kwritei { 2259 t.Fatalf("READER0 #%d.%d W#%d invalid write iteration num: %d", i, k, writei, kwritei) 2260 } 2261 if _, err := snap.Get(k2, nil); err != nil { 2262 t.Fatalf("READER0 #%d.%d W#%d snap.Get: %v\nk1: %x\n -> k2: %x", i, k, writei, err, k1, k2) 2263 } 2264 } 2265 if err := iter.Error(); err != nil { 2266 t.Fatalf("READER0 #%d.%d W#%d snap.Iterator: %v", i, k, writei, err) 2267 } 2268 iter.Release() 2269 snap.Release() 2270 if k > 0 && k != n { 2271 t.Fatalf("READER0 #%d W#%d short read, got=%d want=%d", i, writei, k, n) 2272 } 2273 } 2274 }() 2275 go func() { 2276 var i int 2277 defer func() { 2278 t.Logf("READER1 DONE #%d", i) 2279 atomic.StoreUint32(&done, 1) 2280 wg.Done() 2281 }() 2282 for ; time.Now().Before(until) && atomic.LoadUint32(&done) == 0; i++ { 2283 iter := h.db.NewIterator(nil, nil) 2284 seq := iter.(*dbIter).seq 2285 if seq == 0 { 2286 iter.Release() 2287 continue 2288 } 2289 writei := int(seq/(n*2) - 1) 2290 var k int 2291 for ok := iter.Last(); ok; ok = iter.Prev() { 2292 k++ 2293 } 2294 if err := iter.Error(); err != nil { 2295 t.Fatalf("READER1 #%d.%d W#%d db.Iterator: %v", i, k, writei, err) 2296 } 2297 iter.Release() 2298 if m := (writei+1)*n + n; k != m { 2299 t.Fatalf("READER1 #%d W#%d short read, got=%d want=%d", i, writei, k, m) 2300 } 2301 } 2302 }() 2303 2304 wg.Wait() 2305} 2306 2307func TestDB_TransientError(t *testing.T) { 2308 h := newDbHarnessWopt(t, &opt.Options{ 2309 DisableLargeBatchTransaction: true, 2310 WriteBuffer: 128 * opt.KiB, 2311 OpenFilesCacheCapacity: 3, 2312 DisableCompactionBackoff: true, 2313 }) 2314 defer h.close() 2315 2316 const ( 2317 nSnap = 20 2318 nKey = 10000 2319 ) 2320 2321 var ( 2322 snaps [nSnap]*Snapshot 2323 b = &Batch{} 2324 ) 2325 for i := range snaps { 2326 vtail := fmt.Sprintf("VAL%030d", i) 2327 b.Reset() 2328 for k := 0; k < nKey; k++ { 2329 key := fmt.Sprintf("KEY%8d", k) 2330 b.Put([]byte(key), []byte(key+vtail)) 2331 } 2332 h.stor.EmulateError(testutil.ModeOpen|testutil.ModeRead, storage.TypeTable, errors.New("table transient read error")) 2333 if err := h.db.Write(b, nil); err != nil { 2334 t.Logf("WRITE #%d error: %v", i, err) 2335 h.stor.EmulateError(testutil.ModeOpen|testutil.ModeRead, storage.TypeTable, nil) 2336 for { 2337 if err := h.db.Write(b, nil); err == nil { 2338 break 2339 } else if errors.IsCorrupted(err) { 2340 t.Fatalf("WRITE #%d corrupted: %v", i, err) 2341 } 2342 } 2343 } 2344 2345 snaps[i] = h.db.newSnapshot() 2346 b.Reset() 2347 for k := 0; k < nKey; k++ { 2348 key := fmt.Sprintf("KEY%8d", k) 2349 b.Delete([]byte(key)) 2350 } 2351 h.stor.EmulateError(testutil.ModeOpen|testutil.ModeRead, storage.TypeTable, errors.New("table transient read error")) 2352 if err := h.db.Write(b, nil); err != nil { 2353 t.Logf("WRITE #%d error: %v", i, err) 2354 h.stor.EmulateError(testutil.ModeOpen|testutil.ModeRead, storage.TypeTable, nil) 2355 for { 2356 if err := h.db.Write(b, nil); err == nil { 2357 break 2358 } else if errors.IsCorrupted(err) { 2359 t.Fatalf("WRITE #%d corrupted: %v", i, err) 2360 } 2361 } 2362 } 2363 } 2364 h.stor.EmulateError(testutil.ModeOpen|testutil.ModeRead, storage.TypeTable, nil) 2365 2366 runtime.GOMAXPROCS(runtime.NumCPU()) 2367 2368 rnd := rand.New(rand.NewSource(0xecafdaed)) 2369 wg := &sync.WaitGroup{} 2370 for i, snap := range snaps { 2371 wg.Add(2) 2372 2373 go func(i int, snap *Snapshot, sk []int) { 2374 defer wg.Done() 2375 2376 vtail := fmt.Sprintf("VAL%030d", i) 2377 for _, k := range sk { 2378 key := fmt.Sprintf("KEY%8d", k) 2379 xvalue, err := snap.Get([]byte(key), nil) 2380 if err != nil { 2381 t.Fatalf("READER_GET #%d SEQ=%d K%d error: %v", i, snap.elem.seq, k, err) 2382 } 2383 value := key + vtail 2384 if !bytes.Equal([]byte(value), xvalue) { 2385 t.Fatalf("READER_GET #%d SEQ=%d K%d invalid value: want %q, got %q", i, snap.elem.seq, k, value, xvalue) 2386 } 2387 } 2388 }(i, snap, rnd.Perm(nKey)) 2389 2390 go func(i int, snap *Snapshot) { 2391 defer wg.Done() 2392 2393 vtail := fmt.Sprintf("VAL%030d", i) 2394 iter := snap.NewIterator(nil, nil) 2395 defer iter.Release() 2396 for k := 0; k < nKey; k++ { 2397 if !iter.Next() { 2398 if err := iter.Error(); err != nil { 2399 t.Fatalf("READER_ITER #%d K%d error: %v", i, k, err) 2400 } else { 2401 t.Fatalf("READER_ITER #%d K%d eoi", i, k) 2402 } 2403 } 2404 key := fmt.Sprintf("KEY%8d", k) 2405 xkey := iter.Key() 2406 if !bytes.Equal([]byte(key), xkey) { 2407 t.Fatalf("READER_ITER #%d K%d invalid key: want %q, got %q", i, k, key, xkey) 2408 } 2409 value := key + vtail 2410 xvalue := iter.Value() 2411 if !bytes.Equal([]byte(value), xvalue) { 2412 t.Fatalf("READER_ITER #%d K%d invalid value: want %q, got %q", i, k, value, xvalue) 2413 } 2414 } 2415 }(i, snap) 2416 } 2417 2418 wg.Wait() 2419} 2420 2421func TestDB_UkeyShouldntHopAcrossTable(t *testing.T) { 2422 h := newDbHarnessWopt(t, &opt.Options{ 2423 DisableLargeBatchTransaction: true, 2424 WriteBuffer: 112 * opt.KiB, 2425 CompactionTableSize: 90 * opt.KiB, 2426 CompactionExpandLimitFactor: 1, 2427 }) 2428 defer h.close() 2429 2430 const ( 2431 nSnap = 190 2432 nKey = 140 2433 ) 2434 2435 var ( 2436 snaps [nSnap]*Snapshot 2437 b = &Batch{} 2438 ) 2439 for i := range snaps { 2440 vtail := fmt.Sprintf("VAL%030d", i) 2441 b.Reset() 2442 for k := 0; k < nKey; k++ { 2443 key := fmt.Sprintf("KEY%08d", k) 2444 b.Put([]byte(key), []byte(key+vtail)) 2445 } 2446 if err := h.db.Write(b, nil); err != nil { 2447 t.Fatalf("WRITE #%d error: %v", i, err) 2448 } 2449 2450 snaps[i] = h.db.newSnapshot() 2451 b.Reset() 2452 for k := 0; k < nKey; k++ { 2453 key := fmt.Sprintf("KEY%08d", k) 2454 b.Delete([]byte(key)) 2455 } 2456 if err := h.db.Write(b, nil); err != nil { 2457 t.Fatalf("WRITE #%d error: %v", i, err) 2458 } 2459 } 2460 2461 h.compactMem() 2462 2463 h.waitCompaction() 2464 for level, tables := range h.db.s.stVersion.levels { 2465 for _, table := range tables { 2466 t.Logf("L%d@%d %q:%q", level, table.fd.Num, table.imin, table.imax) 2467 } 2468 } 2469 2470 h.compactRangeAt(0, "", "") 2471 h.waitCompaction() 2472 for level, tables := range h.db.s.stVersion.levels { 2473 for _, table := range tables { 2474 t.Logf("L%d@%d %q:%q", level, table.fd.Num, table.imin, table.imax) 2475 } 2476 } 2477 h.compactRangeAt(1, "", "") 2478 h.waitCompaction() 2479 for level, tables := range h.db.s.stVersion.levels { 2480 for _, table := range tables { 2481 t.Logf("L%d@%d %q:%q", level, table.fd.Num, table.imin, table.imax) 2482 } 2483 } 2484 runtime.GOMAXPROCS(runtime.NumCPU()) 2485 2486 wg := &sync.WaitGroup{} 2487 for i, snap := range snaps { 2488 wg.Add(1) 2489 2490 go func(i int, snap *Snapshot) { 2491 defer wg.Done() 2492 2493 vtail := fmt.Sprintf("VAL%030d", i) 2494 for k := 0; k < nKey; k++ { 2495 key := fmt.Sprintf("KEY%08d", k) 2496 xvalue, err := snap.Get([]byte(key), nil) 2497 if err != nil { 2498 t.Fatalf("READER_GET #%d SEQ=%d K%d error: %v", i, snap.elem.seq, k, err) 2499 } 2500 value := key + vtail 2501 if !bytes.Equal([]byte(value), xvalue) { 2502 t.Fatalf("READER_GET #%d SEQ=%d K%d invalid value: want %q, got %q", i, snap.elem.seq, k, value, xvalue) 2503 } 2504 } 2505 }(i, snap) 2506 } 2507 2508 wg.Wait() 2509} 2510 2511func TestDB_TableCompactionBuilder(t *testing.T) { 2512 gomega.RegisterTestingT(t) 2513 stor := testutil.NewStorage() 2514 stor.OnLog(testingLogger(t)) 2515 stor.OnClose(testingPreserveOnFailed(t)) 2516 defer stor.Close() 2517 2518 const nSeq = 99 2519 2520 o := &opt.Options{ 2521 DisableLargeBatchTransaction: true, 2522 WriteBuffer: 112 * opt.KiB, 2523 CompactionTableSize: 43 * opt.KiB, 2524 CompactionExpandLimitFactor: 1, 2525 CompactionGPOverlapsFactor: 1, 2526 DisableBlockCache: true, 2527 } 2528 s, err := newSession(stor, o) 2529 if err != nil { 2530 t.Fatal(err) 2531 } 2532 if err := s.create(); err != nil { 2533 t.Fatal(err) 2534 } 2535 defer s.close() 2536 var ( 2537 seq uint64 2538 targetSize = 5 * o.CompactionTableSize 2539 value = bytes.Repeat([]byte{'0'}, 100) 2540 ) 2541 for i := 0; i < 2; i++ { 2542 tw, err := s.tops.create(0) 2543 if err != nil { 2544 t.Fatal(err) 2545 } 2546 for k := 0; tw.tw.BytesLen() < targetSize; k++ { 2547 key := []byte(fmt.Sprintf("%09d", k)) 2548 seq += nSeq - 1 2549 for x := uint64(0); x < nSeq; x++ { 2550 if err := tw.append(makeInternalKey(nil, key, seq-x, keyTypeVal), value); err != nil { 2551 t.Fatal(err) 2552 } 2553 } 2554 } 2555 tf, err := tw.finish() 2556 if err != nil { 2557 t.Fatal(err) 2558 } 2559 rec := &sessionRecord{} 2560 rec.addTableFile(i, tf) 2561 if err := s.commit(rec, false); err != nil { 2562 t.Fatal(err) 2563 } 2564 } 2565 2566 // Build grandparent. 2567 v := s.version() 2568 c := newCompaction(s, v, 1, append(tFiles{}, v.levels[1]...), undefinedCompaction) 2569 rec := &sessionRecord{} 2570 b := &tableCompactionBuilder{ 2571 s: s, 2572 c: c, 2573 rec: rec, 2574 stat1: new(cStatStaging), 2575 minSeq: 0, 2576 strict: true, 2577 tableSize: o.CompactionTableSize/3 + 961, 2578 } 2579 if err := b.run(new(compactionTransactCounter)); err != nil { 2580 t.Fatal(err) 2581 } 2582 for _, t := range c.levels[0] { 2583 rec.delTable(c.sourceLevel, t.fd.Num) 2584 } 2585 if err := s.commit(rec, false); err != nil { 2586 t.Fatal(err) 2587 } 2588 c.release() 2589 2590 // Build level-1. 2591 v = s.version() 2592 c = newCompaction(s, v, 0, append(tFiles{}, v.levels[0]...), undefinedCompaction) 2593 rec = &sessionRecord{} 2594 b = &tableCompactionBuilder{ 2595 s: s, 2596 c: c, 2597 rec: rec, 2598 stat1: new(cStatStaging), 2599 minSeq: 0, 2600 strict: true, 2601 tableSize: o.CompactionTableSize, 2602 } 2603 if err := b.run(new(compactionTransactCounter)); err != nil { 2604 t.Fatal(err) 2605 } 2606 for _, t := range c.levels[0] { 2607 rec.delTable(c.sourceLevel, t.fd.Num) 2608 } 2609 // Move grandparent to level-3 2610 for _, t := range v.levels[2] { 2611 rec.delTable(2, t.fd.Num) 2612 rec.addTableFile(3, t) 2613 } 2614 if err := s.commit(rec, false); err != nil { 2615 t.Fatal(err) 2616 } 2617 c.release() 2618 2619 v = s.version() 2620 for level, want := range []bool{false, true, false, true} { 2621 got := len(v.levels[level]) > 0 2622 if want != got { 2623 t.Fatalf("invalid level-%d tables len: want %v, got %v", level, want, got) 2624 } 2625 } 2626 for i, f := range v.levels[1][:len(v.levels[1])-1] { 2627 nf := v.levels[1][i+1] 2628 if bytes.Equal(f.imax.ukey(), nf.imin.ukey()) { 2629 t.Fatalf("KEY %q hop across table %d .. %d", f.imax.ukey(), f.fd.Num, nf.fd.Num) 2630 } 2631 } 2632 v.release() 2633 2634 // Compaction with transient error. 2635 v = s.version() 2636 c = newCompaction(s, v, 1, append(tFiles{}, v.levels[1]...), undefinedCompaction) 2637 rec = &sessionRecord{} 2638 b = &tableCompactionBuilder{ 2639 s: s, 2640 c: c, 2641 rec: rec, 2642 stat1: new(cStatStaging), 2643 minSeq: 0, 2644 strict: true, 2645 tableSize: o.CompactionTableSize, 2646 } 2647 stor.EmulateErrorOnce(testutil.ModeSync, storage.TypeTable, errors.New("table sync error (once)")) 2648 stor.EmulateRandomError(testutil.ModeRead|testutil.ModeWrite, storage.TypeTable, 0.01, errors.New("table random IO error")) 2649 for { 2650 if err := b.run(new(compactionTransactCounter)); err != nil { 2651 t.Logf("(expected) b.run: %v", err) 2652 } else { 2653 break 2654 } 2655 } 2656 if err := s.commit(rec, false); err != nil { 2657 t.Fatal(err) 2658 } 2659 c.release() 2660 2661 stor.EmulateErrorOnce(testutil.ModeSync, storage.TypeTable, nil) 2662 stor.EmulateRandomError(testutil.ModeRead|testutil.ModeWrite, storage.TypeTable, 0, nil) 2663 2664 v = s.version() 2665 if len(v.levels[1]) != len(v.levels[2]) { 2666 t.Fatalf("invalid tables length, want %d, got %d", len(v.levels[1]), len(v.levels[2])) 2667 } 2668 for i, f0 := range v.levels[1] { 2669 f1 := v.levels[2][i] 2670 iter0 := s.tops.newIterator(f0, nil, nil) 2671 iter1 := s.tops.newIterator(f1, nil, nil) 2672 for j := 0; true; j++ { 2673 next0 := iter0.Next() 2674 next1 := iter1.Next() 2675 if next0 != next1 { 2676 t.Fatalf("#%d.%d invalid eoi: want %v, got %v", i, j, next0, next1) 2677 } 2678 key0 := iter0.Key() 2679 key1 := iter1.Key() 2680 if !bytes.Equal(key0, key1) { 2681 t.Fatalf("#%d.%d invalid key: want %q, got %q", i, j, key0, key1) 2682 } 2683 if next0 == false { 2684 break 2685 } 2686 } 2687 iter0.Release() 2688 iter1.Release() 2689 } 2690 v.release() 2691} 2692 2693func testDB_IterTriggeredCompaction(t *testing.T, limitDiv int) { 2694 const ( 2695 vSize = 200 * opt.KiB 2696 tSize = 100 * opt.MiB 2697 mIter = 100 2698 n = tSize / vSize 2699 ) 2700 2701 h := newDbHarnessWopt(t, &opt.Options{ 2702 DisableLargeBatchTransaction: true, 2703 Compression: opt.NoCompression, 2704 DisableBlockCache: true, 2705 }) 2706 defer h.close() 2707 2708 h.db.memdbMaxLevel = 2 2709 2710 key := func(x int) string { 2711 return fmt.Sprintf("v%06d", x) 2712 } 2713 2714 // Fill. 2715 value := strings.Repeat("x", vSize) 2716 for i := 0; i < n; i++ { 2717 h.put(key(i), value) 2718 } 2719 h.compactMem() 2720 2721 // Delete all. 2722 for i := 0; i < n; i++ { 2723 h.delete(key(i)) 2724 } 2725 h.compactMem() 2726 2727 var ( 2728 limit = n / limitDiv 2729 2730 startKey = key(0) 2731 limitKey = key(limit) 2732 maxKey = key(n) 2733 slice = &util.Range{Limit: []byte(limitKey)} 2734 2735 initialSize0 = h.sizeOf(startKey, limitKey) 2736 initialSize1 = h.sizeOf(limitKey, maxKey) 2737 ) 2738 2739 t.Logf("initial size %s [rest %s]", shortenb(int(initialSize0)), shortenb(int(initialSize1))) 2740 2741 for r := 0; true; r++ { 2742 if r >= mIter { 2743 t.Fatal("taking too long to compact") 2744 } 2745 2746 // Iterates. 2747 iter := h.db.NewIterator(slice, h.ro) 2748 for iter.Next() { 2749 } 2750 if err := iter.Error(); err != nil { 2751 t.Fatalf("Iter err: %v", err) 2752 } 2753 iter.Release() 2754 2755 // Wait compaction. 2756 h.waitCompaction() 2757 2758 // Check size. 2759 size0 := h.sizeOf(startKey, limitKey) 2760 size1 := h.sizeOf(limitKey, maxKey) 2761 t.Logf("#%03d size %s [rest %s]", r, shortenb(int(size0)), shortenb(int(size1))) 2762 if size0 < initialSize0/10 { 2763 break 2764 } 2765 } 2766 2767 if initialSize1 > 0 { 2768 h.sizeAssert(limitKey, maxKey, initialSize1/4-opt.MiB, initialSize1+opt.MiB) 2769 } 2770} 2771 2772func TestDB_IterTriggeredCompaction(t *testing.T) { 2773 testDB_IterTriggeredCompaction(t, 1) 2774} 2775 2776func TestDB_IterTriggeredCompactionHalf(t *testing.T) { 2777 testDB_IterTriggeredCompaction(t, 2) 2778} 2779 2780func TestDB_ReadOnly(t *testing.T) { 2781 h := newDbHarness(t) 2782 defer h.close() 2783 2784 h.put("foo", "v1") 2785 h.put("bar", "v2") 2786 h.compactMem() 2787 2788 h.put("xfoo", "v1") 2789 h.put("xbar", "v2") 2790 2791 t.Log("Trigger read-only") 2792 if err := h.db.SetReadOnly(); err != nil { 2793 h.close() 2794 t.Fatalf("SetReadOnly error: %v", err) 2795 } 2796 2797 mode := testutil.ModeCreate | testutil.ModeRemove | testutil.ModeRename | testutil.ModeWrite | testutil.ModeSync 2798 h.stor.EmulateError(mode, storage.TypeAll, errors.New("read-only DB shouldn't writes")) 2799 2800 ro := func(key, value, wantValue string) { 2801 if err := h.db.Put([]byte(key), []byte(value), h.wo); err != ErrReadOnly { 2802 t.Fatalf("unexpected error: %v", err) 2803 } 2804 h.getVal(key, wantValue) 2805 } 2806 2807 ro("foo", "vx", "v1") 2808 2809 h.o.ReadOnly = true 2810 h.reopenDB() 2811 2812 ro("foo", "vx", "v1") 2813 ro("bar", "vx", "v2") 2814 h.assertNumKeys(4) 2815} 2816 2817func TestDB_BulkInsertDelete(t *testing.T) { 2818 h := newDbHarnessWopt(t, &opt.Options{ 2819 DisableLargeBatchTransaction: true, 2820 Compression: opt.NoCompression, 2821 CompactionTableSize: 128 * opt.KiB, 2822 CompactionTotalSize: 1 * opt.MiB, 2823 WriteBuffer: 256 * opt.KiB, 2824 }) 2825 defer h.close() 2826 2827 const R = 100 2828 const N = 2500 2829 key := make([]byte, 4) 2830 value := make([]byte, 256) 2831 for i := 0; i < R; i++ { 2832 offset := N * i 2833 for j := 0; j < N; j++ { 2834 binary.BigEndian.PutUint32(key, uint32(offset+j)) 2835 h.db.Put(key, value, nil) 2836 } 2837 for j := 0; j < N; j++ { 2838 binary.BigEndian.PutUint32(key, uint32(offset+j)) 2839 h.db.Delete(key, nil) 2840 } 2841 } 2842 2843 h.waitCompaction() 2844 if tot := h.totalTables(); tot > 10 { 2845 t.Fatalf("too many uncompacted tables: %d (%s)", tot, h.getTablesPerLevel()) 2846 } 2847} 2848 2849func TestDB_GracefulClose(t *testing.T) { 2850 runtime.GOMAXPROCS(4) 2851 h := newDbHarnessWopt(t, &opt.Options{ 2852 DisableLargeBatchTransaction: true, 2853 Compression: opt.NoCompression, 2854 CompactionTableSize: 1 * opt.MiB, 2855 WriteBuffer: 1 * opt.MiB, 2856 }) 2857 defer h.close() 2858 2859 var closeWait sync.WaitGroup 2860 2861 // During write. 2862 n := 0 2863 closing := false 2864 for i := 0; i < 1000000; i++ { 2865 if !closing && h.totalTables() > 3 { 2866 t.Logf("close db during write, index=%d", i) 2867 closeWait.Add(1) 2868 go func() { 2869 h.closeDB() 2870 closeWait.Done() 2871 }() 2872 closing = true 2873 } 2874 if err := h.db.Put([]byte(fmt.Sprintf("%09d", i)), []byte(fmt.Sprintf("VAL-%09d", i)), h.wo); err != nil { 2875 t.Logf("Put error: %s (expected)", err) 2876 n = i 2877 break 2878 } 2879 } 2880 closeWait.Wait() 2881 2882 // During read. 2883 h.openDB() 2884 closing = false 2885 for i := 0; i < n; i++ { 2886 if !closing && i > n/2 { 2887 t.Logf("close db during read, index=%d", i) 2888 closeWait.Add(1) 2889 go func() { 2890 h.closeDB() 2891 closeWait.Done() 2892 }() 2893 closing = true 2894 } 2895 if _, err := h.db.Get([]byte(fmt.Sprintf("%09d", i)), h.ro); err != nil { 2896 t.Logf("Get error: %s (expected)", err) 2897 break 2898 } 2899 } 2900 closeWait.Wait() 2901 2902 // During iterate. 2903 h.openDB() 2904 closing = false 2905 iter := h.db.NewIterator(nil, h.ro) 2906 for i := 0; iter.Next(); i++ { 2907 if len(iter.Key()) == 0 || len(iter.Value()) == 0 { 2908 t.Error("Key or value has zero length") 2909 } 2910 if !closing { 2911 t.Logf("close db during iter, index=%d", i) 2912 closeWait.Add(1) 2913 go func() { 2914 h.closeDB() 2915 closeWait.Done() 2916 }() 2917 closing = true 2918 } 2919 time.Sleep(time.Millisecond) 2920 } 2921 if err := iter.Error(); err != nil { 2922 t.Logf("Iter error: %s (expected)", err) 2923 } 2924 iter.Release() 2925 closeWait.Wait() 2926} 2927