1// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
2// All rights reserved.
3//
4// Use of this source code is governed by a BSD-style license that can be
5// found in the LICENSE file.
6
7package leveldb
8
9import (
10	"bytes"
11	"container/list"
12	crand "crypto/rand"
13	"encoding/binary"
14	"fmt"
15	"math/rand"
16	"os"
17	"path/filepath"
18	"runtime"
19	"strings"
20	"sync"
21	"sync/atomic"
22	"testing"
23	"time"
24	"unsafe"
25
26	"github.com/onsi/gomega"
27
28	"github.com/syndtr/goleveldb/leveldb/comparer"
29	"github.com/syndtr/goleveldb/leveldb/errors"
30	"github.com/syndtr/goleveldb/leveldb/filter"
31	"github.com/syndtr/goleveldb/leveldb/iterator"
32	"github.com/syndtr/goleveldb/leveldb/opt"
33	"github.com/syndtr/goleveldb/leveldb/storage"
34	"github.com/syndtr/goleveldb/leveldb/testutil"
35	"github.com/syndtr/goleveldb/leveldb/util"
36)
37
38func tkey(i int) []byte {
39	return []byte(fmt.Sprintf("%016d", i))
40}
41
// tval produces a deterministic test value of length n: the same seed
// always yields the same bytes, since the generator is seeded here.
// (randomString is defined elsewhere in this package — presumably it
// fills n bytes from r; confirm against its definition.)
func tval(seed, n int) []byte {
	r := rand.New(rand.NewSource(int64(seed)))
	return randomString(r, n)
}
46
// testingLogger returns a storage log hook that forwards every message
// to the test's log.
func testingLogger(t *testing.T) func(log string) {
	forward := func(msg string) {
		t.Log(msg)
	}
	return forward
}
52
// testingPreserveOnFailed returns a close hook that asks the test
// storage to preserve its files exactly when the test has failed.
func testingPreserveOnFailed(t *testing.T) func() (preserve bool, err error) {
	return func() (bool, error) {
		return t.Failed(), nil
	}
}
59
// dbHarness bundles a DB under test with its in-memory test storage and
// the options used to open it. The helper methods below perform
// operations and report failures through t.
type dbHarness struct {
	t *testing.T

	stor *testutil.Storage // test storage backing the DB
	db   *DB               // DB under test; nil once close() has run
	o    *opt.Options      // options used for every (re)open
	ro   *opt.ReadOptions  // read options for helpers (nil = defaults)
	wo   *opt.WriteOptions // write options for helpers (nil = defaults)
}
69
70func newDbHarnessWopt(t *testing.T, o *opt.Options) *dbHarness {
71	h := new(dbHarness)
72	h.init(t, o)
73	return h
74}
75
76func newDbHarness(t *testing.T) *dbHarness {
77	return newDbHarnessWopt(t, &opt.Options{DisableLargeBatchTransaction: true})
78}
79
// init wires the harness: fresh test storage with logging and
// preserve-on-failure hooks, then an initial open. The test is aborted
// if the open fails.
func (h *dbHarness) init(t *testing.T, o *opt.Options) {
	gomega.RegisterTestingT(t)
	h.t = t
	h.stor = testutil.NewStorage()
	h.stor.OnLog(testingLogger(t))
	h.stor.OnClose(testingPreserveOnFailed(t))
	h.o = o
	h.ro = nil
	h.wo = nil

	if err := h.openDB0(); err != nil {
		// Deferred so the storage closes after t.Fatal has logged;
		// so that it will come after fatal message.
		defer h.stor.Close()
		h.t.Fatal("Open (init): got error: ", err)
	}
}
96
97func (h *dbHarness) openDB0() (err error) {
98	h.t.Log("opening DB")
99	h.db, err = Open(h.stor, h.o)
100	return
101}
102
103func (h *dbHarness) openDB() {
104	if err := h.openDB0(); err != nil {
105		h.t.Fatal("Open: got error: ", err)
106	}
107}
108
109func (h *dbHarness) closeDB0() error {
110	h.t.Log("closing DB")
111	return h.db.Close()
112}
113
// closeDB closes the DB (if open), reporting any close error, then runs
// the storage's open-handle check and forces a GC so finalizer-based
// leak detection fires promptly. The storage itself stays open.
func (h *dbHarness) closeDB() {
	if h.db != nil {
		if err := h.closeDB0(); err != nil {
			h.t.Error("Close: got error: ", err)
		}
	}
	h.stor.CloseCheck()
	runtime.GC()
}
123
124func (h *dbHarness) reopenDB() {
125	if h.db != nil {
126		h.closeDB()
127	}
128	h.openDB()
129}
130
131func (h *dbHarness) close() {
132	if h.db != nil {
133		h.closeDB0()
134		h.db = nil
135	}
136	h.stor.Close()
137	h.stor = nil
138	runtime.GC()
139}
140
141func (h *dbHarness) openAssert(want bool) {
142	db, err := Open(h.stor, h.o)
143	if err != nil {
144		if want {
145			h.t.Error("Open: assert: got error: ", err)
146		} else {
147			h.t.Log("Open: assert: got error (expected): ", err)
148		}
149	} else {
150		if !want {
151			h.t.Error("Open: assert: expect error")
152		}
153		db.Close()
154	}
155}
156
157func (h *dbHarness) write(batch *Batch) {
158	if err := h.db.Write(batch, h.wo); err != nil {
159		h.t.Error("Write: got error: ", err)
160	}
161}
162
163func (h *dbHarness) put(key, value string) {
164	if err := h.db.Put([]byte(key), []byte(value), h.wo); err != nil {
165		h.t.Error("Put: got error: ", err)
166	}
167}
168
169func (h *dbHarness) putMulti(n int, low, hi string) {
170	for i := 0; i < n; i++ {
171		h.put(low, "begin")
172		h.put(hi, "end")
173		h.compactMem()
174	}
175}
176
// maxNextLevelOverlappingBytes finds, over every table in levels
// 1..N-2, the largest total size of next-level tables overlapping that
// table's key range, and fails the test if it exceeds want.
func (h *dbHarness) maxNextLevelOverlappingBytes(want int64) {
	t := h.t
	db := h.db

	var (
		maxOverlaps int64
		maxLevel    int
	)
	v := db.s.version()
	if len(v.levels) > 2 {
		// Skip level 0 (overlap there is expected) and the last level
		// (it has no next level).
		for i, tt := range v.levels[1 : len(v.levels)-1] {
			level := i + 1
			next := v.levels[level+1]
			for _, t := range tt {
				// Sum the sizes of next-level tables overlapping this
				// table's user-key range.
				r := next.getOverlaps(nil, db.s.icmp, t.imin.ukey(), t.imax.ukey(), false)
				sum := r.size()
				if sum > maxOverlaps {
					maxOverlaps = sum
					maxLevel = level
				}
			}
		}
	}
	v.release()

	if maxOverlaps > want {
		t.Errorf("next level most overlapping bytes is more than %d, got=%d level=%d", want, maxOverlaps, maxLevel)
	} else {
		t.Logf("next level most overlapping bytes is %d, level=%d want=%d", maxOverlaps, maxLevel, want)
	}
}
208
209func (h *dbHarness) delete(key string) {
210	t := h.t
211	db := h.db
212
213	err := db.Delete([]byte(key), h.wo)
214	if err != nil {
215		t.Error("Delete: got error: ", err)
216	}
217}
218
219func (h *dbHarness) assertNumKeys(want int) {
220	iter := h.db.NewIterator(nil, h.ro)
221	defer iter.Release()
222	got := 0
223	for iter.Next() {
224		got++
225	}
226	if err := iter.Error(); err != nil {
227		h.t.Error("assertNumKeys: ", err)
228	}
229	if want != got {
230		h.t.Errorf("assertNumKeys: want=%d got=%d", want, got)
231	}
232}
233
234func (h *dbHarness) getr(db Reader, key string, expectFound bool) (found bool, v []byte) {
235	t := h.t
236	v, err := db.Get([]byte(key), h.ro)
237	switch err {
238	case ErrNotFound:
239		if expectFound {
240			t.Errorf("Get: key '%s' not found, want found", key)
241		}
242	case nil:
243		found = true
244		if !expectFound {
245			t.Errorf("Get: key '%s' found, want not found", key)
246		}
247	default:
248		t.Error("Get: got error: ", err)
249	}
250	return
251}
252
253func (h *dbHarness) get(key string, expectFound bool) (found bool, v []byte) {
254	return h.getr(h.db, key, expectFound)
255}
256
257func (h *dbHarness) getValr(db Reader, key, value string) {
258	t := h.t
259	found, r := h.getr(db, key, true)
260	if !found {
261		return
262	}
263	rval := string(r)
264	if rval != value {
265		t.Errorf("Get: invalid value, got '%s', want '%s'", rval, value)
266	}
267}
268
269func (h *dbHarness) getVal(key, value string) {
270	h.getValr(h.db, key, value)
271}
272
// allEntriesFor walks the raw (internal-key) iterator from the newest
// possible entry for key and renders every version of that user key —
// values, "DEL" markers, and "CORRUPTED" for unparsable keys — as a
// bracketed list like "[ v2, DEL, v1 ]", then compares against want.
func (h *dbHarness) allEntriesFor(key, want string) {
	t := h.t
	db := h.db
	s := db.s

	// keyMaxSeq makes this internal key sort before every real entry
	// for the same user key, so Seek lands on the newest version.
	ikey := makeInternalKey(nil, []byte(key), keyMaxSeq, keyTypeVal)
	iter := db.newRawIterator(nil, nil, nil, nil)
	if !iter.Seek(ikey) && iter.Error() != nil {
		t.Error("AllEntries: error during seek, err: ", iter.Error())
		return
	}
	res := "[ "
	first := true
	for iter.Valid() {
		if ukey, _, kt, kerr := parseInternalKey(iter.Key()); kerr == nil {
			// Stop once the iterator moves past this user key.
			if s.icmp.uCompare(ikey.ukey(), ukey) != 0 {
				break
			}
			if !first {
				res += ", "
			}
			first = false
			switch kt {
			case keyTypeVal:
				res += string(iter.Value())
			case keyTypeDel:
				res += "DEL"
			}
		} else {
			if !first {
				res += ", "
			}
			first = false
			res += "CORRUPTED"
		}
		iter.Next()
	}
	if !first {
		res += " "
	}
	res += "]"
	if res != want {
		t.Errorf("AllEntries: assert failed for key %q, got=%q want=%q", key, res, want)
	}
}
318
// getKeyVal takes a snapshot and renders all key/value pairs in order,
// formatted like "(k1->v1)(k2->v2)", then compares against want.
func (h *dbHarness) getKeyVal(want string) {
	t := h.t
	db := h.db

	s, err := db.GetSnapshot()
	if err != nil {
		t.Fatal("GetSnapshot: got error: ", err)
	}
	res := ""
	iter := s.NewIterator(nil, nil)
	for iter.Next() {
		res += fmt.Sprintf("(%s->%s)", string(iter.Key()), string(iter.Value()))
	}
	iter.Release()

	if res != want {
		t.Errorf("GetKeyVal: invalid key/value pair, got=%q want=%q", res, want)
	}
	s.Release()
}
341
342func (h *dbHarness) waitCompaction() {
343	t := h.t
344	db := h.db
345	if err := db.compTriggerWait(db.tcompCmdC); err != nil {
346		t.Error("compaction error: ", err)
347	}
348}
349
350func (h *dbHarness) waitMemCompaction() {
351	t := h.t
352	db := h.db
353
354	if err := db.compTriggerWait(db.mcompCmdC); err != nil {
355		t.Error("compaction error: ", err)
356	}
357}
358
// compactMem forces the current memdb to be rotated out and flushed to
// a table file, holding the write lock for the duration so no writer
// races the rotation. Fails the test if no tables exist afterwards.
func (h *dbHarness) compactMem() {
	t := h.t
	db := h.db

	t.Log("starting memdb compaction")

	// Acquire the DB write lock; releasing is reading the token back
	// off the channel.
	db.writeLockC <- struct{}{}
	defer func() {
		<-db.writeLockC
	}()

	if _, err := db.rotateMem(0, true); err != nil {
		t.Error("compaction error: ", err)
	}

	if h.totalTables() == 0 {
		t.Error("zero tables after mem compaction")
	}

	t.Log("memdb compaction done")
}
380
381func (h *dbHarness) compactRangeAtErr(level int, min, max string, wanterr bool) {
382	t := h.t
383	db := h.db
384
385	var _min, _max []byte
386	if min != "" {
387		_min = []byte(min)
388	}
389	if max != "" {
390		_max = []byte(max)
391	}
392
393	t.Logf("starting table range compaction: level=%d, min=%q, max=%q", level, min, max)
394
395	if err := db.compTriggerRange(db.tcompCmdC, level, _min, _max); err != nil {
396		if wanterr {
397			t.Log("CompactRangeAt: got error (expected): ", err)
398		} else {
399			t.Error("CompactRangeAt: got error: ", err)
400		}
401	} else if wanterr {
402		t.Error("CompactRangeAt: expect error")
403	}
404
405	t.Log("table range compaction done")
406}
407
408func (h *dbHarness) compactRangeAt(level int, min, max string) {
409	h.compactRangeAtErr(level, min, max, false)
410}
411
412func (h *dbHarness) compactRange(min, max string) {
413	t := h.t
414	db := h.db
415
416	t.Logf("starting DB range compaction: min=%q, max=%q", min, max)
417
418	var r util.Range
419	if min != "" {
420		r.Start = []byte(min)
421	}
422	if max != "" {
423		r.Limit = []byte(max)
424	}
425	if err := db.CompactRange(r); err != nil {
426		t.Error("CompactRange: got error: ", err)
427	}
428
429	t.Log("DB range compaction done")
430}
431
432func (h *dbHarness) sizeOf(start, limit string) int64 {
433	sz, err := h.db.SizeOf([]util.Range{
434		{Start: []byte(start), Limit: []byte(limit)},
435	})
436	if err != nil {
437		h.t.Error("SizeOf: got error: ", err)
438	}
439	return sz.Sum()
440}
441
442func (h *dbHarness) sizeAssert(start, limit string, low, hi int64) {
443	sz := h.sizeOf(start, limit)
444	if sz < low || sz > hi {
445		h.t.Errorf("sizeOf %q to %q not in range, want %d - %d, got %d",
446			shorten(start), shorten(limit), low, hi, sz)
447	}
448}
449
450func (h *dbHarness) getSnapshot() (s *Snapshot) {
451	s, err := h.db.GetSnapshot()
452	if err != nil {
453		h.t.Fatal("GetSnapshot: got error: ", err)
454	}
455	return
456}
457
// getTablesPerLevel renders the table count of each level as a
// comma-separated string (e.g. "1,0,2"), with the trailing run of
// all-empty levels trimmed off: nz tracks the string length up to the
// last level that still holds tables.
func (h *dbHarness) getTablesPerLevel() string {
	res := ""
	nz := 0
	v := h.db.s.version()
	for level, tables := range v.levels {
		if level > 0 {
			res += ","
		}
		res += fmt.Sprint(len(tables))
		if len(tables) > 0 {
			nz = len(res)
		}
	}
	v.release()
	return res[:nz]
}
474
475func (h *dbHarness) tablesPerLevel(want string) {
476	res := h.getTablesPerLevel()
477	if res != want {
478		h.t.Errorf("invalid tables len, want=%s, got=%s", want, res)
479	}
480}
481
482func (h *dbHarness) totalTables() (n int) {
483	v := h.db.s.version()
484	for _, tables := range v.levels {
485		n += len(tables)
486	}
487	v.release()
488	return
489}
490
// keyValue is the minimal read-only view of a key/value pair used by
// testKeyVal; the package's iterators satisfy it.
type keyValue interface {
	Key() []byte
	Value() []byte
}
495
496func testKeyVal(t *testing.T, kv keyValue, want string) {
497	res := string(kv.Key()) + "->" + string(kv.Value())
498	if res != want {
499		t.Errorf("invalid key/value, want=%q, got=%q", want, res)
500	}
501}
502
// numKey formats num as the canonical test key, e.g. "key000007".
func numKey(num int) string {
	return "key" + fmt.Sprintf("%06d", num)
}
506
507var testingBloomFilter = filter.NewBloomFilter(10)
508
// truno runs f four times against fresh harnesses, permuting the
// options to widen coverage: round 0 uses the options as given; round 1
// adds a bloom filter; round 2 disables compression; round 3 reopens
// the DB before running f. Each round runs inside its own closure so
// the deferred close fires between rounds. Note that the tweaks mutate
// the shared o, so they accumulate across rounds.
func truno(t *testing.T, o *opt.Options, f func(h *dbHarness)) {
	for i := 0; i < 4; i++ {
		func() {
			switch i {
			case 0:
			case 1:
				if o == nil {
					o = &opt.Options{
						DisableLargeBatchTransaction: true,
						Filter:                       testingBloomFilter,
					}
				} else {
					// Copy the caller's options before overriding the
					// filter, so the original struct is untouched.
					old := o
					o = &opt.Options{}
					*o = *old
					o.Filter = testingBloomFilter
				}
			case 2:
				if o == nil {
					o = &opt.Options{
						DisableLargeBatchTransaction: true,
						Compression:                  opt.NoCompression,
					}
				} else {
					old := o
					o = &opt.Options{}
					*o = *old
					o.Compression = opt.NoCompression
				}
			}
			h := newDbHarnessWopt(t, o)
			defer h.close()
			switch i {
			case 3:
				h.reopenDB()
			}
			f(h)
		}()
	}
}
549
550func trun(t *testing.T, f func(h *dbHarness)) {
551	truno(t, nil, f)
552}
553
// testAligned reports an error unless offset is 8-byte (64-bit)
// aligned.
func testAligned(t *testing.T, name string, offset uintptr) {
	if offset%8 == 0 {
		return
	}
	t.Errorf("field %s offset is not 64-bit aligned", name)
}
559
// Test_FieldsAligned verifies that struct fields used with sync/atomic
// are 8-byte aligned; 64-bit atomic operations require this on 32-bit
// platforms.
func Test_FieldsAligned(t *testing.T) {
	p1 := new(DB)
	testAligned(t, "DB.seq", unsafe.Offsetof(p1.seq))
	p2 := new(session)
	testAligned(t, "session.stNextFileNum", unsafe.Offsetof(p2.stNextFileNum))
	testAligned(t, "session.stJournalNum", unsafe.Offsetof(p2.stJournalNum))
	testAligned(t, "session.stPrevJournalNum", unsafe.Offsetof(p2.stPrevJournalNum))
	testAligned(t, "session.stSeqNum", unsafe.Offsetof(p2.stSeqNum))
}
569
// TestDB_Locking checks that a second Open on the same storage fails
// while the DB is open, and succeeds after it is closed.
func TestDB_Locking(t *testing.T) {
	h := newDbHarness(t)
	// Only the storage is torn down here; the DB is closed explicitly
	// below as part of the scenario.
	defer h.stor.Close()
	h.openAssert(false)
	h.closeDB()
	h.openAssert(true)
}
577
// TestDB_Empty checks that a missing key stays missing across a reopen.
func TestDB_Empty(t *testing.T) {
	trun(t, func(h *dbHarness) {
		h.get("foo", false)

		h.reopenDB()
		h.get("foo", false)
	})
}
586
// TestDB_ReadWrite checks basic put/get including overwrites, and that
// the latest values survive a reopen.
func TestDB_ReadWrite(t *testing.T) {
	trun(t, func(h *dbHarness) {
		h.put("foo", "v1")
		h.getVal("foo", "v1")
		h.put("bar", "v2")
		h.put("foo", "v3")
		h.getVal("foo", "v3")
		h.getVal("bar", "v2")

		h.reopenDB()
		h.getVal("foo", "v3")
		h.getVal("bar", "v2")
	})
}
601
// TestDB_PutDeleteGet checks that a deleted key is gone both
// immediately and after a reopen.
func TestDB_PutDeleteGet(t *testing.T) {
	trun(t, func(h *dbHarness) {
		h.put("foo", "v1")
		h.getVal("foo", "v1")
		h.put("foo", "v2")
		h.getVal("foo", "v2")
		h.delete("foo")
		h.get("foo", false)

		h.reopenDB()
		h.get("foo", false)
	})
}
615
// TestDB_EmptyBatch checks that writing an empty batch succeeds and has
// no visible effect.
func TestDB_EmptyBatch(t *testing.T) {
	h := newDbHarness(t)
	defer h.close()

	h.get("foo", false)
	err := h.db.Write(new(Batch), h.wo)
	if err != nil {
		t.Error("writing empty batch yield error: ", err)
	}
	h.get("foo", false)
}
627
// TestDB_GetFromFrozen checks that reads still hit a memtable that has
// been frozen (rotated out) but not yet flushed, by stalling table
// syncs so the flush cannot complete.
func TestDB_GetFromFrozen(t *testing.T) {
	h := newDbHarnessWopt(t, &opt.Options{
		DisableLargeBatchTransaction: true,
		WriteBuffer:                  100100,
	})
	defer h.close()

	h.put("foo", "v1")
	h.getVal("foo", "v1")

	h.stor.Stall(testutil.ModeSync, storage.TypeTable) // Block sync calls
	h.put("k1", strings.Repeat("x", 100000))           // Fill memtable
	h.put("k2", strings.Repeat("y", 100000))           // Trigger compaction
	// Poll briefly for the frozen memtable to appear.
	for i := 0; h.db.getFrozenMem() == nil && i < 100; i++ {
		time.Sleep(10 * time.Microsecond)
	}
	if h.db.getFrozenMem() == nil {
		h.stor.Release(testutil.ModeSync, storage.TypeTable)
		t.Fatal("No frozen mem")
	}
	h.getVal("foo", "v1")
	h.stor.Release(testutil.ModeSync, storage.TypeTable) // Release sync calls

	h.reopenDB()
	h.getVal("foo", "v1")
	h.get("k1", true)
	h.get("k2", true)
}
656
// TestDB_GetFromTable checks that a value flushed to a table file is
// still readable.
func TestDB_GetFromTable(t *testing.T) {
	trun(t, func(h *dbHarness) {
		h.put("foo", "v1")
		h.compactMem()
		h.getVal("foo", "v1")
	})
}
664
// TestDB_GetSnapshot checks that a snapshot keeps serving the values it
// captured while the live DB moves on, including across a memdb
// compaction, for both short and long keys.
func TestDB_GetSnapshot(t *testing.T) {
	trun(t, func(h *dbHarness) {
		bar := strings.Repeat("b", 200)
		h.put("foo", "v1")
		h.put(bar, "v1")

		snap, err := h.db.GetSnapshot()
		if err != nil {
			t.Fatal("GetSnapshot: got error: ", err)
		}

		h.put("foo", "v2")
		h.put(bar, "v2")

		h.getVal("foo", "v2")
		h.getVal(bar, "v2")
		h.getValr(snap, "foo", "v1")
		h.getValr(snap, bar, "v1")

		h.compactMem()

		h.getVal("foo", "v2")
		h.getVal(bar, "v2")
		h.getValr(snap, "foo", "v1")
		h.getValr(snap, bar, "v1")

		snap.Release()

		h.reopenDB()
		h.getVal("foo", "v2")
		h.getVal(bar, "v2")
	})
}
698
// TestDB_GetLevel0Ordering checks that reads prefer the newest of
// several overlapping level-0 tables, both live and after a reopen.
func TestDB_GetLevel0Ordering(t *testing.T) {
	trun(t, func(h *dbHarness) {
		h.db.memdbMaxLevel = 2

		// Produce several level-0 tables that all contain foo/bar.
		for i := 0; i < 4; i++ {
			h.put("bar", fmt.Sprintf("b%d", i))
			h.put("foo", fmt.Sprintf("v%d", i))
			h.compactMem()
		}
		h.getVal("foo", "v3")
		h.getVal("bar", "b3")

		v := h.db.s.version()
		t0len := v.tLen(0)
		v.release()
		if t0len < 2 {
			t.Errorf("level-0 tables is less than 2, got %d", t0len)
		}

		h.reopenDB()
		h.getVal("foo", "v3")
		h.getVal("bar", "b3")
	})
}
723
// TestDB_GetOrderedByLevels checks that a newer value in a shallower
// level shadows an older value pushed to a deeper level.
func TestDB_GetOrderedByLevels(t *testing.T) {
	trun(t, func(h *dbHarness) {
		h.put("foo", "v1")
		h.compactMem()
		h.compactRange("a", "z")
		h.getVal("foo", "v1")
		h.put("foo", "v2")
		h.compactMem()
		h.getVal("foo", "v2")
	})
}
735
// TestDB_GetPicksCorrectFile checks that reads locate the right table
// when a non-level-0 level holds multiple disjoint files.
func TestDB_GetPicksCorrectFile(t *testing.T) {
	trun(t, func(h *dbHarness) {
		// Arrange to have multiple files in a non-level-0 level.
		h.put("a", "va")
		h.compactMem()
		h.compactRange("a", "b")
		h.put("x", "vx")
		h.compactMem()
		h.compactRange("x", "y")
		h.put("f", "vf")
		h.compactMem()
		h.compactRange("f", "g")

		h.getVal("a", "va")
		h.getVal("f", "vf")
		h.getVal("x", "vx")

		h.compactRange("", "")
		h.getVal("a", "va")
		h.getVal("f", "vf")
		h.getVal("x", "vx")
	})
}
759
// TestDB_GetEncountersEmptyLevel checks that seek-driven compaction of
// a level-0 table is attributed to the correct level when the level in
// between is empty.
func TestDB_GetEncountersEmptyLevel(t *testing.T) {
	trun(t, func(h *dbHarness) {
		h.db.memdbMaxLevel = 2

		// Arrange for the following to happen:
		//   * sstable A in level 0
		//   * nothing in level 1
		//   * sstable B in level 2
		// Then do enough Get() calls to arrange for an automatic compaction
		// of sstable A.  A bug would cause the compaction to be marked as
		// occurring at level 1 (instead of the correct level 0).

		// Step 1: First place sstables in levels 0 and 2
		for i := 0; ; i++ {
			if i >= 100 {
				t.Fatal("could not fill levels-0 and level-2")
			}
			v := h.db.s.version()
			if v.tLen(0) > 0 && v.tLen(2) > 0 {
				v.release()
				break
			}
			v.release()
			h.put("a", "begin")
			h.put("z", "end")
			h.compactMem()

			h.getVal("a", "begin")
			h.getVal("z", "end")
		}

		// Step 2: clear level 1 if necessary.
		h.compactRangeAt(1, "", "")
		h.tablesPerLevel("1,0,1")

		h.getVal("a", "begin")
		h.getVal("z", "end")

		// Step 3: read a bunch of times
		for i := 0; i < 200; i++ {
			h.get("missing", false)
		}

		// Step 4: Wait for compaction to finish
		h.waitCompaction()

		v := h.db.s.version()
		if v.tLen(0) > 0 {
			t.Errorf("level-0 tables more than 0, got %d", v.tLen(0))
		}
		v.release()

		h.getVal("a", "begin")
		h.getVal("z", "end")
	})
}
816
// TestDB_IterMultiWithDelete checks that iteration (including Prev over
// a deleted key) skips deletion markers, both from the memdb and after
// flushing to a table.
func TestDB_IterMultiWithDelete(t *testing.T) {
	trun(t, func(h *dbHarness) {
		h.put("a", "va")
		h.put("b", "vb")
		h.put("c", "vc")
		h.delete("b")
		h.get("b", false)

		iter := h.db.NewIterator(nil, nil)
		iter.Seek([]byte("c"))
		testKeyVal(t, iter, "c->vc")
		// Prev must step over the deleted "b" straight to "a".
		iter.Prev()
		testKeyVal(t, iter, "a->va")
		iter.Release()

		h.compactMem()

		iter = h.db.NewIterator(nil, nil)
		iter.Seek([]byte("c"))
		testKeyVal(t, iter, "c->vc")
		iter.Prev()
		testKeyVal(t, iter, "a->va")
		iter.Release()
	})
}
842
// TestDB_IteratorPinsRef checks that an iterator keeps serving the DB
// contents from its creation time even after heavy writes and the
// compactions they trigger.
func TestDB_IteratorPinsRef(t *testing.T) {
	h := newDbHarness(t)
	defer h.close()

	h.put("foo", "hello")

	// Get iterator that will yield the current contents of the DB.
	iter := h.db.NewIterator(nil, nil)

	// Write to force compactions
	h.put("foo", "newvalue1")
	for i := 0; i < 100; i++ {
		h.put(numKey(i), strings.Repeat(fmt.Sprintf("v%09d", i), 100000/10))
	}
	h.put("foo", "newvalue2")

	iter.First()
	testKeyVal(t, iter, "foo->hello")
	if iter.Next() {
		t.Errorf("expect eof")
	}
	iter.Release()
}
866
// TestDB_Recover checks that values written before a close are
// recovered on reopen, across two reopen cycles with interleaved
// writes.
func TestDB_Recover(t *testing.T) {
	trun(t, func(h *dbHarness) {
		h.put("foo", "v1")
		h.put("baz", "v5")

		h.reopenDB()
		h.getVal("foo", "v1")

		h.getVal("foo", "v1")
		h.getVal("baz", "v5")
		h.put("bar", "v2")
		h.put("foo", "v3")

		h.reopenDB()
		h.getVal("foo", "v3")
		h.put("foo", "v4")
		h.getVal("foo", "v4")
		h.getVal("bar", "v2")
		h.getVal("baz", "v5")
	})
}
888
// TestDB_RecoverWithEmptyJournal checks recovery when a reopen happens
// with nothing written since the previous reopen (empty journal).
func TestDB_RecoverWithEmptyJournal(t *testing.T) {
	trun(t, func(h *dbHarness) {
		h.put("foo", "v1")
		h.put("foo", "v2")

		h.reopenDB()
		h.reopenDB()
		h.put("foo", "v3")

		h.reopenDB()
		h.getVal("foo", "v3")
	})
}
902
// TestDB_RecoverDuringMemtableCompaction checks that writes made while
// a memtable flush is stalled (table syncs blocked) are still recovered
// after a reopen.
func TestDB_RecoverDuringMemtableCompaction(t *testing.T) {
	truno(t, &opt.Options{DisableLargeBatchTransaction: true, WriteBuffer: 1000000}, func(h *dbHarness) {

		h.stor.Stall(testutil.ModeSync, storage.TypeTable)
		h.put("big1", strings.Repeat("x", 10000000))
		h.put("big2", strings.Repeat("y", 1000))
		h.put("bar", "v2")
		h.stor.Release(testutil.ModeSync, storage.TypeTable)

		h.reopenDB()
		h.getVal("bar", "v2")
		h.getVal("big1", strings.Repeat("x", 10000000))
		h.getVal("big2", strings.Repeat("y", 1000))
	})
}
918
// TestDB_MinorCompactionsHappen writes enough data through a tiny write
// buffer to force minor compactions, then checks that all values
// survive, both live and after a reopen.
func TestDB_MinorCompactionsHappen(t *testing.T) {
	h := newDbHarnessWopt(t, &opt.Options{DisableLargeBatchTransaction: true, WriteBuffer: 10000})
	defer h.close()

	n := 500

	key := func(i int) string {
		return fmt.Sprintf("key%06d", i)
	}

	for i := 0; i < n; i++ {
		h.put(key(i), key(i)+strings.Repeat("v", 1000))
	}

	for i := 0; i < n; i++ {
		h.getVal(key(i), key(i)+strings.Repeat("v", 1000))
	}

	h.reopenDB()
	for i := 0; i < n; i++ {
		h.getVal(key(i), key(i)+strings.Repeat("v", 1000))
	}
}
942
// TestDB_RecoverWithLargeJournal checks that replaying a journal larger
// than the (reduced) write buffer flushes multiple level-0 tables
// mid-recovery while preserving every value.
func TestDB_RecoverWithLargeJournal(t *testing.T) {
	h := newDbHarness(t)
	defer h.close()

	h.put("big1", strings.Repeat("1", 200000))
	h.put("big2", strings.Repeat("2", 200000))
	h.put("small3", strings.Repeat("3", 10))
	h.put("small4", strings.Repeat("4", 10))
	h.tablesPerLevel("")

	// Make sure that if we re-open with a small write buffer size that
	// we flush table files in the middle of a large journal file.
	h.o.WriteBuffer = 100000
	h.reopenDB()
	h.getVal("big1", strings.Repeat("1", 200000))
	h.getVal("big2", strings.Repeat("2", 200000))
	h.getVal("small3", strings.Repeat("3", 10))
	h.getVal("small4", strings.Repeat("4", 10))
	v := h.db.s.version()
	if v.tLen(0) <= 1 {
		t.Errorf("tables-0 less than one")
	}
	v.release()
}
967
// TestDB_CompactionsGenerateMultipleFiles checks that compacting a
// large level-0 table produces several level-1 files rather than one
// giant one, and that no data is lost.
func TestDB_CompactionsGenerateMultipleFiles(t *testing.T) {
	h := newDbHarnessWopt(t, &opt.Options{
		DisableLargeBatchTransaction: true,
		WriteBuffer:                  10000000,
		Compression:                  opt.NoCompression,
	})
	defer h.close()

	v := h.db.s.version()
	if v.tLen(0) > 0 {
		t.Errorf("level-0 tables more than 0, got %d", v.tLen(0))
	}
	v.release()

	n := 80

	// Write 8MB (80 values, each 100K)
	for i := 0; i < n; i++ {
		h.put(numKey(i), strings.Repeat(fmt.Sprintf("v%09d", i), 100000/10))
	}

	// Reopening moves updates to level-0
	h.reopenDB()
	h.compactRangeAt(0, "", "")

	v = h.db.s.version()
	if v.tLen(0) > 0 {
		t.Errorf("level-0 tables more than 0, got %d", v.tLen(0))
	}
	if v.tLen(1) <= 1 {
		t.Errorf("level-1 tables less than 1, got %d", v.tLen(1))
	}
	v.release()

	for i := 0; i < n; i++ {
		h.getVal(numKey(i), strings.Repeat(fmt.Sprintf("v%09d", i), 100000/10))
	}
}
1006
// TestDB_RepeatedWritesToSameKey checks that repeatedly overwriting one
// key does not let the table count grow without bound.
func TestDB_RepeatedWritesToSameKey(t *testing.T) {
	h := newDbHarnessWopt(t, &opt.Options{DisableLargeBatchTransaction: true, WriteBuffer: 100000})
	defer h.close()

	// Allow slack beyond the L0 pause trigger before calling it a leak.
	maxTables := h.o.GetWriteL0PauseTrigger() + 7

	value := strings.Repeat("v", 2*h.o.GetWriteBuffer())
	for i := 0; i < 5*maxTables; i++ {
		h.put("key", value)
		n := h.totalTables()
		if n > maxTables {
			t.Errorf("total tables exceed %d, got=%d, iter=%d", maxTables, n, i)
		}
	}
}
1022
// TestDB_RepeatedWritesToSameKeyAfterReopen is the reopen variant of
// TestDB_RepeatedWritesToSameKey: table count stays bounded when the
// overwrites happen after a reopen.
func TestDB_RepeatedWritesToSameKeyAfterReopen(t *testing.T) {
	h := newDbHarnessWopt(t, &opt.Options{
		DisableLargeBatchTransaction: true,
		WriteBuffer:                  100000,
	})
	defer h.close()

	h.reopenDB()

	maxTables := h.o.GetWriteL0PauseTrigger() + 7

	value := strings.Repeat("v", 2*h.o.GetWriteBuffer())
	for i := 0; i < 5*maxTables; i++ {
		h.put("key", value)
		n := h.totalTables()
		if n > maxTables {
			t.Errorf("total tables exceed %d, got=%d, iter=%d", maxTables, n, i)
		}
	}
}
1043
// TestDB_SparseMerge checks that a sparse update touching three widely
// separated prefixes does not trigger a compaction that merges the
// whole bulky middle prefix at once (next-level overlap stays bounded).
func TestDB_SparseMerge(t *testing.T) {
	h := newDbHarnessWopt(t, &opt.Options{DisableLargeBatchTransaction: true, Compression: opt.NoCompression})
	defer h.close()

	h.putMulti(7, "A", "Z")

	// Suppose there is:
	//    small amount of data with prefix A
	//    large amount of data with prefix B
	//    small amount of data with prefix C
	// and that recent updates have made small changes to all three prefixes.
	// Check that we do not do a compaction that merges all of B in one shot.
	h.put("A", "va")
	value := strings.Repeat("x", 1000)
	for i := 0; i < 100000; i++ {
		h.put(fmt.Sprintf("B%010d", i), value)
	}
	h.put("C", "vc")
	h.compactMem()
	h.compactRangeAt(0, "", "")
	h.waitCompaction()

	// Make sparse update
	h.put("A", "va2")
	h.put("B100", "bvalue2")
	h.put("C", "vc2")
	h.compactMem()

	h.waitCompaction()
	h.maxNextLevelOverlappingBytes(20 * 1048576)
	h.compactRangeAt(0, "", "")
	h.waitCompaction()
	h.maxNextLevelOverlappingBytes(20 * 1048576)
	h.compactRangeAt(1, "", "")
	h.waitCompaction()
	h.maxNextLevelOverlappingBytes(20 * 1048576)
}
1081
// TestDB_SizeOf checks SizeOf estimates: zero on an empty DB and for
// memtable-only data, then within expected per-key bounds once data is
// in tables, across reopens and stepwise range compactions.
func TestDB_SizeOf(t *testing.T) {
	h := newDbHarnessWopt(t, &opt.Options{
		DisableLargeBatchTransaction: true,
		Compression:                  opt.NoCompression,
		WriteBuffer:                  10000000,
	})
	defer h.close()

	h.sizeAssert("", "xyz", 0, 0)
	h.reopenDB()
	h.sizeAssert("", "xyz", 0, 0)

	// Write 8MB (80 values, each 100K)
	n := 80
	s1 := 100000 // lower bound per value
	s2 := 105000 // upper bound per value (allows metadata overhead)

	for i := 0; i < n; i++ {
		h.put(numKey(i), strings.Repeat(fmt.Sprintf("v%09d", i), s1/10))
	}

	// 0 because SizeOf() does not account for memtable space
	h.sizeAssert("", numKey(50), 0, 0)

	for r := 0; r < 3; r++ {
		h.reopenDB()

		for cs := 0; cs < n; cs += 10 {
			for i := 0; i < n; i += 10 {
				h.sizeAssert("", numKey(i), int64(s1*i), int64(s2*i))
				h.sizeAssert("", numKey(i)+".suffix", int64(s1*(i+1)), int64(s2*(i+1)))
				h.sizeAssert(numKey(i), numKey(i+10), int64(s1*10), int64(s2*10))
			}

			h.sizeAssert("", numKey(50), int64(s1*50), int64(s2*50))
			h.sizeAssert("", numKey(50)+".suffix", int64(s1*50), int64(s2*50))

			h.compactRangeAt(0, numKey(cs), numKey(cs+9))
		}

		v := h.db.s.version()
		if v.tLen(0) != 0 {
			t.Errorf("level-0 tables was not zero, got %d", v.tLen(0))
		}
		if v.tLen(1) == 0 {
			t.Error("level-1 tables was zero")
		}
		v.release()
	}
}
1132
// TestDB_SizeOf_MixOfSmallAndLarge checks SizeOf estimates over entries
// of very different sizes: cumulative prefix sizes and a mid-range slice
// must stay within tolerance across reopens and full compactions.
func TestDB_SizeOf_MixOfSmallAndLarge(t *testing.T) {
	h := newDbHarnessWopt(t, &opt.Options{
		DisableLargeBatchTransaction: true,
		Compression:                  opt.NoCompression,
	})
	defer h.close()

	sizes := []int64{
		10000,
		10000,
		100000,
		10000,
		100000,
		10000,
		300000,
		10000,
	}

	for i, n := range sizes {
		h.put(numKey(i), strings.Repeat(fmt.Sprintf("v%09d", i), int(n)/10))
	}

	for r := 0; r < 3; r++ {
		h.reopenDB()

		// x is the exact cumulative payload; allow 1000 bytes of
		// overhead once any data precedes the bound.
		var x int64
		for i, n := range sizes {
			y := x
			if i > 0 {
				y += 1000
			}
			h.sizeAssert("", numKey(i), x, y)
			x += n
		}

		h.sizeAssert(numKey(3), numKey(5), 110000, 111000)

		h.compactRangeAt(0, "", "")
	}
}
1173
// TestDB_Snapshot checks that multiple snapshots each pin their own
// version of a key and that releasing them in arbitrary order leaves
// the remaining snapshots and the live value intact.
func TestDB_Snapshot(t *testing.T) {
	trun(t, func(h *dbHarness) {
		h.put("foo", "v1")
		s1 := h.getSnapshot()
		h.put("foo", "v2")
		s2 := h.getSnapshot()
		h.put("foo", "v3")
		s3 := h.getSnapshot()
		h.put("foo", "v4")

		h.getValr(s1, "foo", "v1")
		h.getValr(s2, "foo", "v2")
		h.getValr(s3, "foo", "v3")
		h.getVal("foo", "v4")

		s3.Release()
		h.getValr(s1, "foo", "v1")
		h.getValr(s2, "foo", "v2")
		h.getVal("foo", "v4")

		s1.Release()
		h.getValr(s2, "foo", "v2")
		h.getVal("foo", "v4")

		s2.Release()
		h.getVal("foo", "v4")
	})
}
1202
// TestDB_SnapshotList unit-tests the snapshot bookkeeping directly:
// minSeq must always report the sequence number of the oldest live
// snapshot as snapshots are acquired and released out of order.
func TestDB_SnapshotList(t *testing.T) {
	db := &DB{snapsList: list.New()}
	// Two snapshots at seq 0, then one each at seq 1 and 2.
	e0a := db.acquireSnapshot()
	e0b := db.acquireSnapshot()
	db.seq = 1
	e1 := db.acquireSnapshot()
	db.seq = 2
	e2 := db.acquireSnapshot()

	if db.minSeq() != 0 {
		t.Fatalf("invalid sequence number, got=%d", db.minSeq())
	}
	db.releaseSnapshot(e0a)
	// e0b still pins seq 0.
	if db.minSeq() != 0 {
		t.Fatalf("invalid sequence number, got=%d", db.minSeq())
	}
	db.releaseSnapshot(e2)
	if db.minSeq() != 0 {
		t.Fatalf("invalid sequence number, got=%d", db.minSeq())
	}
	db.releaseSnapshot(e0b)
	if db.minSeq() != 1 {
		t.Fatalf("invalid sequence number, got=%d", db.minSeq())
	}
	e2 = db.acquireSnapshot()
	if db.minSeq() != 1 {
		t.Fatalf("invalid sequence number, got=%d", db.minSeq())
	}
	db.releaseSnapshot(e1)
	if db.minSeq() != 2 {
		t.Fatalf("invalid sequence number, got=%d", db.minSeq())
	}
	db.releaseSnapshot(e2)
	if db.minSeq() != 2 {
		t.Fatalf("invalid sequence number, got=%d", db.minSeq())
	}
}
1240
// TestDB_HiddenValuesAreRemoved checks that compaction drops deletion
// markers and shadowed values once the compaction reaches the base
// level for the key, but keeps shadowed values at deeper levels until
// then.
func TestDB_HiddenValuesAreRemoved(t *testing.T) {
	trun(t, func(h *dbHarness) {
		s := h.db.s

		m := 2
		h.db.memdbMaxLevel = m

		h.put("foo", "v1")
		h.compactMem()
		v := s.version()
		num := v.tLen(m)
		v.release()
		if num != 1 {
			t.Errorf("invalid level-%d len, want=1 got=%d", m, num)
		}

		// Place a table at level last-1 to prevent merging with preceding mutation
		h.put("a", "begin")
		h.put("z", "end")
		h.compactMem()
		v = s.version()
		if v.tLen(m) != 1 {
			t.Errorf("invalid level-%d len, want=1 got=%d", m, v.tLen(m))
		}
		if v.tLen(m-1) != 1 {
			t.Errorf("invalid level-%d len, want=1 got=%d", m-1, v.tLen(m-1))
		}
		v.release()

		h.delete("foo")
		h.put("foo", "v2")
		h.allEntriesFor("foo", "[ v2, DEL, v1 ]")
		h.compactMem()
		h.allEntriesFor("foo", "[ v2, DEL, v1 ]")
		h.compactRangeAt(m-2, "", "z")
		// DEL eliminated, but v1 remains because we aren't compacting that level
		// (DEL can be eliminated because v2 hides v1).
		h.allEntriesFor("foo", "[ v2, v1 ]")
		h.compactRangeAt(m-1, "", "")
		// Merging last-1 w/ last, so we are the base level for "foo", so
		// DEL is removed.  (as is v1).
		h.allEntriesFor("foo", "[ v2 ]")
	})
}
1285
// TestDB_DeletionMarkers2 checks that a deletion marker is kept while a
// deeper level still holds the deleted value, and only dropped once
// compaction merges down to the base level for the key.
func TestDB_DeletionMarkers2(t *testing.T) {
	h := newDbHarness(t)
	defer h.close()
	s := h.db.s

	m := 2
	h.db.memdbMaxLevel = m

	h.put("foo", "v1")
	h.compactMem()
	v := s.version()
	num := v.tLen(m)
	v.release()
	if num != 1 {
		t.Errorf("invalid level-%d len, want=1 got=%d", m, num)
	}

	// Place a table at level last-1 to prevent merging with preceding mutation
	h.put("a", "begin")
	h.put("z", "end")
	h.compactMem()
	v = s.version()
	if v.tLen(m) != 1 {
		t.Errorf("invalid level-%d len, want=1 got=%d", m, v.tLen(m))
	}
	if v.tLen(m-1) != 1 {
		t.Errorf("invalid level-%d len, want=1 got=%d", m-1, v.tLen(m-1))
	}
	v.release()

	h.delete("foo")
	h.allEntriesFor("foo", "[ DEL, v1 ]")
	h.compactMem() // Moves to level last-2
	h.allEntriesFor("foo", "[ DEL, v1 ]")
	h.compactRangeAt(m-2, "", "")
	// DEL kept: "last" file overlaps
	h.allEntriesFor("foo", "[ DEL, v1 ]")
	h.compactRangeAt(m-1, "", "")
	// Merging last-1 w/ last, so we are the base level for "foo", so
	// DEL is removed.  (as is v1).
	h.allEntriesFor("foo", "[ ]")
}
1328
// TestDB_CompactionTableOpenError checks that a table-open error during
// compaction does not lose data: after the failed compaction, a reopen of
// the DB must still serve every key.
func TestDB_CompactionTableOpenError(t *testing.T) {
	h := newDbHarnessWopt(t, &opt.Options{
		DisableLargeBatchTransaction: true,
		// Negative capacity disables the open-files cache, so every table
		// access goes through storage and hits the emulated open error.
		OpenFilesCacheCapacity: -1,
	})
	defer h.close()

	h.db.memdbMaxLevel = 2

	im := 10
	jm := 10
	for r := 0; r < 2; r++ {
		for i := 0; i < im; i++ {
			for j := 0; j < jm; j++ {
				h.put(fmt.Sprintf("k%d,%d", i, j), fmt.Sprintf("v%d,%d", i, j))
			}
			h.compactMem()
		}
	}

	if n := h.totalTables(); n != im*2 {
		t.Errorf("total tables is %d, want %d", n, im*2)
	}

	// Inject the open error, trigger a compaction and wait for it to fail.
	h.stor.EmulateError(testutil.ModeOpen, storage.TypeTable, errors.New("open error during table compaction"))
	go h.db.CompactRange(util.Range{})
	if err := h.db.compTriggerWait(h.db.tcompCmdC); err != nil {
		t.Log("compaction error: ", err)
	}
	h.closeDB0()
	h.openDB()
	h.stor.EmulateError(testutil.ModeOpen, storage.TypeTable, nil)

	// All keys written before the failed compaction must still be readable.
	for i := 0; i < im; i++ {
		for j := 0; j < jm; j++ {
			h.getVal(fmt.Sprintf("k%d,%d", i, j), fmt.Sprintf("v%d,%d", i, j))
		}
	}
}
1368
// TestDB_OverlapInLevel0 verifies that memdb compaction detects overlap with
// existing level-0 tables and keeps the new output at level 0 instead of
// pushing it to a deeper level, where a deletion marker could be shadowed.
func TestDB_OverlapInLevel0(t *testing.T) {
	trun(t, func(h *dbHarness) {
		h.db.memdbMaxLevel = 2

		// Fill levels 1 and 2 to disable the pushing of new memtables to levels > 0.
		h.put("100", "v100")
		h.put("999", "v999")
		h.compactMem()
		h.delete("100")
		h.delete("999")
		h.compactMem()
		h.tablesPerLevel("0,1,1")

		// Make files spanning the following ranges in level-0:
		//  files[0]  200 .. 900
		//  files[1]  300 .. 500
		// Note that files are sorted by min key.
		h.put("300", "v300")
		h.put("500", "v500")
		h.compactMem()
		h.put("200", "v200")
		h.put("600", "v600")
		h.put("900", "v900")
		h.compactMem()
		h.tablesPerLevel("2,1,1")

		// Compact away the placeholder files we created initially
		h.compactRangeAt(1, "", "")
		h.compactRangeAt(2, "", "")
		h.tablesPerLevel("2")

		// Do a memtable compaction.  Before bug-fix, the compaction would
		// not detect the overlap with level-0 files and would incorrectly place
		// the deletion in a deeper level.
		h.delete("600")
		h.compactMem()
		h.tablesPerLevel("3")
		h.get("600", false)
	})
}
1409
// TestDB_L0_CompactionBug_Issue44_a reproduces LevelDB issue 44 (part a):
// a sequence of reopens interleaved with puts/deletes must not let an older
// delete of "a" shadow the newer put after background compaction runs.
func TestDB_L0_CompactionBug_Issue44_a(t *testing.T) {
	h := newDbHarness(t)
	defer h.close()

	h.reopenDB()
	h.put("b", "v")
	h.reopenDB()
	h.delete("b")
	h.delete("a")
	h.reopenDB()
	h.delete("a")
	h.reopenDB()
	h.put("a", "v")
	h.reopenDB()
	h.reopenDB()
	// The view must be identical before and after compaction.
	h.getKeyVal("(a->v)")
	h.waitCompaction()
	h.getKeyVal("(a->v)")
}
1429
// TestDB_L0_CompactionBug_Issue44_b reproduces LevelDB issue 44 (part b):
// a longer reopen/put/delete interleaving (including the empty key) must
// yield the same key-value view before and after compaction.
func TestDB_L0_CompactionBug_Issue44_b(t *testing.T) {
	h := newDbHarness(t)
	defer h.close()

	h.reopenDB()
	h.put("", "")
	h.reopenDB()
	h.delete("e")
	h.put("", "")
	h.reopenDB()
	h.put("c", "cv")
	h.reopenDB()
	h.put("", "")
	h.reopenDB()
	h.put("", "")
	h.waitCompaction()
	h.reopenDB()
	h.put("d", "dv")
	h.reopenDB()
	h.put("", "")
	h.reopenDB()
	h.delete("d")
	h.delete("b")
	h.reopenDB()
	h.getKeyVal("(->)(c->cv)")
	h.waitCompaction()
	h.getKeyVal("(->)(c->cv)")
}
1458
1459func TestDB_SingleEntryMemCompaction(t *testing.T) {
1460	trun(t, func(h *dbHarness) {
1461		for i := 0; i < 10; i++ {
1462			h.put("big", strings.Repeat("v", opt.DefaultWriteBuffer))
1463			h.compactMem()
1464			h.put("key", strings.Repeat("v", opt.DefaultBlockSize))
1465			h.compactMem()
1466			h.put("k", "v")
1467			h.compactMem()
1468			h.put("", "")
1469			h.compactMem()
1470			h.put("verybig", strings.Repeat("v", opt.DefaultWriteBuffer*2))
1471			h.compactMem()
1472		}
1473	})
1474}
1475
// TestDB_ManifestWriteError verifies that a failed manifest write (i == 0)
// or manifest sync (i == 1) during a merging compaction does not lose data:
// the DB must reopen cleanly and still serve the key.
func TestDB_ManifestWriteError(t *testing.T) {
	for i := 0; i < 2; i++ {
		func() {
			h := newDbHarness(t)
			defer h.close()

			h.put("foo", "bar")
			h.getVal("foo", "bar")

			// Mem compaction (will succeed)
			h.compactMem()
			h.getVal("foo", "bar")
			v := h.db.s.version()
			if n := v.tLen(0); n != 1 {
				t.Errorf("invalid total tables, want=1 got=%d", n)
			}
			v.release()

			// First iteration emulates a write error, second a sync error.
			if i == 0 {
				h.stor.EmulateError(testutil.ModeWrite, storage.TypeManifest, errors.New("manifest write error"))
			} else {
				h.stor.EmulateError(testutil.ModeSync, storage.TypeManifest, errors.New("manifest sync error"))
			}

			// Merging compaction (will fail)
			h.compactRangeAtErr(0, "", "", true)

			h.db.Close()
			h.stor.EmulateError(testutil.ModeWrite, storage.TypeManifest, nil)
			h.stor.EmulateError(testutil.ModeSync, storage.TypeManifest, nil)

			// Should not lose data
			h.openDB()
			h.getVal("foo", "bar")
		}()
	}
}
1513
// assertErr checks err against the expectation wanterr: an expected error is
// logged, an unexpected error is reported via t.Error, and a missing but
// expected error is reported as well.
func assertErr(t *testing.T, err error, wanterr bool) {
	switch {
	case err != nil && wanterr:
		t.Log("AssertErr: got error (expected): ", err)
	case err != nil:
		t.Error("AssertErr: got error: ", err)
	case wanterr:
		t.Error("AssertErr: expect error")
	}
}
1525
// TestDB_ClosedIsClosed verifies that every public entry point (Get/Put/
// Delete, snapshots, iterators, properties, SizeOf, CompactRange, Close)
// reports an error once the DB has been closed, and that iterators released
// before close remain inert afterwards.
func TestDB_ClosedIsClosed(t *testing.T) {
	h := newDbHarness(t)
	db := h.db

	var iter, iter2 iterator.Iterator
	var snap *Snapshot
	// Everything inside this closure runs against the open DB; the deferred
	// h.close() closes it before the assertions below execute.
	func() {
		defer h.close()

		h.put("k", "v")
		h.getVal("k", "v")

		iter = db.NewIterator(nil, h.ro)
		iter.Seek([]byte("k"))
		testKeyVal(t, iter, "k->v")

		var err error
		snap, err = db.GetSnapshot()
		if err != nil {
			t.Fatal("GetSnapshot: got error: ", err)
		}

		h.getValr(snap, "k", "v")

		iter2 = snap.NewIterator(nil, h.ro)
		iter2.Seek([]byte("k"))
		testKeyVal(t, iter2, "k->v")

		h.put("foo", "v2")
		h.delete("foo")

		// closing DB
		iter.Release()
		iter2.Release()
	}()

	// From here on the DB is closed; all operations must fail.
	assertErr(t, db.Put([]byte("x"), []byte("y"), h.wo), true)
	_, err := db.Get([]byte("k"), h.ro)
	assertErr(t, err, true)

	// A released iterator is invalid but carries no error until used again.
	if iter.Valid() {
		t.Errorf("iter.Valid should false")
	}
	assertErr(t, iter.Error(), false)
	testKeyVal(t, iter, "->")
	if iter.Seek([]byte("k")) {
		t.Errorf("iter.Seek should false")
	}
	assertErr(t, iter.Error(), true)

	assertErr(t, iter2.Error(), false)

	_, err = snap.Get([]byte("k"), h.ro)
	assertErr(t, err, true)

	_, err = db.GetSnapshot()
	assertErr(t, err, true)

	iter3 := db.NewIterator(nil, h.ro)
	assertErr(t, iter3.Error(), true)

	iter3 = snap.NewIterator(nil, h.ro)
	assertErr(t, iter3.Error(), true)

	assertErr(t, db.Delete([]byte("k"), h.wo), true)

	_, err = db.GetProperty("leveldb.stats")
	assertErr(t, err, true)

	_, err = db.SizeOf([]util.Range{{Start: []byte("a"), Limit: []byte("z")}})
	assertErr(t, err, true)

	assertErr(t, db.CompactRange(util.Range{}), true)

	// Closing twice must also report an error.
	assertErr(t, db.Close(), true)
}
1602
// numberComparer orders keys of the form "[<number>]" by numeric value
// rather than lexicographically. Separator and Successor return nil, which
// makes callers fall back to using the original keys.
type numberComparer struct{}

// num parses the integer between the brackets. fmt.Sscan is used so both
// decimal and Go-style literals (e.g. "0x14") are accepted; on parse
// failure the zero value is returned.
func (numberComparer) num(x []byte) int {
	var parsed int
	fmt.Sscan(string(x[1:len(x)-1]), &parsed)
	return parsed
}

// Name implements comparer.Comparer.
func (numberComparer) Name() string {
	return "test.NumberComparer"
}

// Compare orders a and b by their parsed numeric values.
func (c numberComparer) Compare(a, b []byte) int {
	na, nb := c.num(a), c.num(b)
	return na - nb
}

// Separator and Successor decline to shorten keys.
func (numberComparer) Separator(dst, a, b []byte) []byte { return nil }
func (numberComparer) Successor(dst, b []byte) []byte    { return nil }
1620
// TestDB_CustomComparer runs the DB with numberComparer, checking that keys
// with equal numeric value but different spellings (e.g. "[20]" and
// "[0x14]" — fmt.Sscan accepts Go-style literals) resolve to the same entry
// across memdb flushes and range compactions.
func TestDB_CustomComparer(t *testing.T) {
	h := newDbHarnessWopt(t, &opt.Options{
		DisableLargeBatchTransaction: true,
		Comparer:                     numberComparer{},
		// Small write buffer to force frequent table creation.
		WriteBuffer: 1000,
	})
	defer h.close()

	h.put("[10]", "ten")
	h.put("[0x14]", "twenty")
	for i := 0; i < 2; i++ {
		h.getVal("[10]", "ten")
		h.getVal("[0xa]", "ten")
		h.getVal("[20]", "twenty")
		h.getVal("[0x14]", "twenty")
		h.get("[15]", false)
		h.get("[0xf]", false)
		h.compactMem()
		h.compactRange("[0]", "[9999]")
	}

	for n := 0; n < 2; n++ {
		for i := 0; i < 100; i++ {
			v := fmt.Sprintf("[%d]", i*10)
			h.put(v, v)
		}
		h.compactMem()
		h.compactRange("[0]", "[1000000]")
	}
}
1651
// TestDB_ManualCompaction checks CompactRange behavior for ranges that fall
// before, after, and over the existing tables, tracking the expected table
// counts per level after each step.
func TestDB_ManualCompaction(t *testing.T) {
	h := newDbHarness(t)
	defer h.close()

	h.db.memdbMaxLevel = 2

	h.putMulti(3, "p", "q")
	h.tablesPerLevel("1,1,1")

	// Compaction range falls before files
	h.compactRange("", "c")
	h.tablesPerLevel("1,1,1")

	// Compaction range falls after files
	h.compactRange("r", "z")
	h.tablesPerLevel("1,1,1")

	// Compaction range overlaps files
	h.compactRange("p1", "p9")
	h.tablesPerLevel("0,0,1")

	// Populate a different range
	h.putMulti(3, "c", "e")
	h.tablesPerLevel("1,1,2")

	// Compact just the new range
	h.compactRange("b", "f")
	h.tablesPerLevel("0,0,2")

	// Compact all
	h.putMulti(1, "a", "z")
	h.tablesPerLevel("0,1,2")
	h.compactRange("", "")
	h.tablesPerLevel("0,0,1")
}
1687
// TestDB_BloomFilter verifies that the bloom filter keeps sstable reads for
// present keys near one read per lookup, and reads for missing keys near
// zero. The block cache is disabled so every block access is a real read.
func TestDB_BloomFilter(t *testing.T) {
	h := newDbHarnessWopt(t, &opt.Options{
		DisableLargeBatchTransaction: true,
		DisableBlockCache:            true,
		Filter:                       filter.NewBloomFilter(10),
	})
	defer h.close()

	key := func(i int) string {
		return fmt.Sprintf("key%06d", i)
	}

	const n = 10000

	// Populate multiple layers
	for i := 0; i < n; i++ {
		h.put(key(i), key(i))
	}
	h.compactMem()
	h.compactRange("a", "z")
	for i := 0; i < n; i += 100 {
		h.put(key(i), key(i))
	}
	h.compactMem()

	// Prevent auto compactions triggered by seeks
	h.stor.Stall(testutil.ModeSync, storage.TypeTable)

	// Lookup present keys. Should rarely read from small sstable.
	h.stor.ResetCounter(testutil.ModeRead, storage.TypeTable)
	for i := 0; i < n; i++ {
		h.getVal(key(i), key(i))
	}
	cnt, _ := h.stor.Counter(testutil.ModeRead, storage.TypeTable)
	t.Logf("lookup of %d present keys yield %d sstable I/O reads", n, cnt)
	// Allow up to 2% extra reads over the ideal one-read-per-key.
	if min, max := n, n+2*n/100; cnt < min || cnt > max {
		t.Errorf("num of sstable I/O reads of present keys not in range of %d - %d, got %d", min, max, cnt)
	}

	// Lookup missing keys. Should rarely read from either sstable.
	h.stor.ResetCounter(testutil.ModeRead, storage.TypeTable)
	for i := 0; i < n; i++ {
		h.get(key(i)+".missing", false)
	}
	cnt, _ = h.stor.Counter(testutil.ModeRead, storage.TypeTable)
	t.Logf("lookup of %d missing keys yield %d sstable I/O reads", n, cnt)
	// A 10-bit bloom filter should keep false positives well under 3%.
	if max := 3 * n / 100; cnt > max {
		t.Errorf("num of sstable I/O reads of missing keys was more than %d, got %d", max, cnt)
	}

	h.stor.Release(testutil.ModeSync, storage.TypeTable)
}
1740
// TestDB_Concurrent runs n goroutines that randomly put or get keys for a
// fixed wall-clock duration. Values encode (key, writer-goroutine,
// writer-op-count) so readers can validate that what they observe was
// written by a plausible writer state.
func TestDB_Concurrent(t *testing.T) {
	const n, secs, maxkey = 4, 6, 1000
	h := newDbHarness(t)
	defer h.close()

	runtime.GOMAXPROCS(runtime.NumCPU())

	var (
		closeWg sync.WaitGroup
		stop    uint32
		cnt     [n]uint32
	)

	for i := 0; i < n; i++ {
		closeWg.Add(1)
		go func(i int) {
			var put, get, found uint
			defer func() {
				t.Logf("goroutine %d stopped after %d ops, put=%d get=%d found=%d missing=%d",
					i, cnt[i], put, get, found, get-found)
				closeWg.Done()
			}()

			rnd := rand.New(rand.NewSource(int64(1000 + i)))
			for atomic.LoadUint32(&stop) == 0 {
				// cnt[i] is only written by this goroutine, so the plain
				// read of our own counter here is safe.
				x := cnt[i]

				k := rnd.Intn(maxkey)
				kstr := fmt.Sprintf("%016d", k)

				if (rnd.Int() % 2) > 0 {
					put++
					h.put(kstr, fmt.Sprintf("%d.%d.%-1000d", k, i, x))
				} else {
					get++
					v, err := h.db.Get([]byte(kstr), h.ro)
					if err == nil {
						found++
						rk, ri, rx := 0, -1, uint32(0)
						fmt.Sscanf(string(v), "%d.%d.%d", &rk, &ri, &rx)
						if rk != k {
							t.Errorf("invalid key want=%d got=%d", k, rk)
						}
						if ri < 0 || ri >= n {
							t.Error("invalid goroutine number: ", ri)
						} else {
							// The writer's recorded op count can never exceed
							// its current (atomically read) counter.
							tx := atomic.LoadUint32(&(cnt[ri]))
							if rx > tx {
								t.Errorf("invalid seq number, %d > %d ", rx, tx)
							}
						}
					} else if err != ErrNotFound {
						t.Error("Get: got error: ", err)
						return
					}
				}
				atomic.AddUint32(&cnt[i], 1)
			}
		}(i)
	}

	time.Sleep(secs * time.Second)
	atomic.StoreUint32(&stop, 1)
	closeWg.Wait()
}
1806
1807func TestDB_ConcurrentIterator(t *testing.T) {
1808	const n, n2 = 4, 1000
1809	h := newDbHarnessWopt(t, &opt.Options{DisableLargeBatchTransaction: true, WriteBuffer: 30})
1810	defer h.close()
1811
1812	runtime.GOMAXPROCS(runtime.NumCPU())
1813
1814	var (
1815		closeWg sync.WaitGroup
1816		stop    uint32
1817	)
1818
1819	for i := 0; i < n; i++ {
1820		closeWg.Add(1)
1821		go func(i int) {
1822			for k := 0; atomic.LoadUint32(&stop) == 0; k++ {
1823				h.put(fmt.Sprintf("k%d", k), fmt.Sprintf("%d.%d.", k, i)+strings.Repeat("x", 10))
1824			}
1825			closeWg.Done()
1826		}(i)
1827	}
1828
1829	for i := 0; i < n; i++ {
1830		closeWg.Add(1)
1831		go func(i int) {
1832			for k := 1000000; k < 0 || atomic.LoadUint32(&stop) == 0; k-- {
1833				h.put(fmt.Sprintf("k%d", k), fmt.Sprintf("%d.%d.", k, i)+strings.Repeat("x", 10))
1834			}
1835			closeWg.Done()
1836		}(i)
1837	}
1838
1839	cmp := comparer.DefaultComparer
1840	for i := 0; i < n2; i++ {
1841		closeWg.Add(1)
1842		go func(i int) {
1843			it := h.db.NewIterator(nil, nil)
1844			var pk []byte
1845			for it.Next() {
1846				kk := it.Key()
1847				if cmp.Compare(kk, pk) <= 0 {
1848					t.Errorf("iter %d: %q is successor of %q", i, pk, kk)
1849				}
1850				pk = append(pk[:0], kk...)
1851				var k, vk, vi int
1852				if n, err := fmt.Sscanf(string(it.Key()), "k%d", &k); err != nil {
1853					t.Errorf("iter %d: Scanf error on key %q: %v", i, it.Key(), err)
1854				} else if n < 1 {
1855					t.Errorf("iter %d: Cannot parse key %q", i, it.Key())
1856				}
1857				if n, err := fmt.Sscanf(string(it.Value()), "%d.%d", &vk, &vi); err != nil {
1858					t.Errorf("iter %d: Scanf error on value %q: %v", i, it.Value(), err)
1859				} else if n < 2 {
1860					t.Errorf("iter %d: Cannot parse value %q", i, it.Value())
1861				}
1862
1863				if vk != k {
1864					t.Errorf("iter %d: invalid value i=%d, want=%d got=%d", i, vi, k, vk)
1865				}
1866			}
1867			if err := it.Error(); err != nil {
1868				t.Errorf("iter %d: Got error: %v", i, err)
1869			}
1870			it.Release()
1871			closeWg.Done()
1872		}(i)
1873	}
1874
1875	atomic.StoreUint32(&stop, 1)
1876	closeWg.Wait()
1877}
1878
// TestDB_ConcurrentWrite runs n single-put writers and n batch writers in
// parallel, checking after every write that the just-written keys are
// immediately visible to readers.
func TestDB_ConcurrentWrite(t *testing.T) {
	const n, bk, niter = 10, 3, 10000
	h := newDbHarness(t)
	defer h.close()

	runtime.GOMAXPROCS(runtime.NumCPU())

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			for k := 0; k < niter; k++ {
				kstr := fmt.Sprintf("put-%d.%d", i, k)
				vstr := fmt.Sprintf("v%d", k)
				h.put(kstr, vstr)
				// Key should immediately available after put returns.
				h.getVal(kstr, vstr)
			}
		}(i)
	}
	for i := 0; i < n; i++ {
		wg.Add(1)
		// Each batch writer owns its own Batch, created before the goroutine
		// starts, so there is no sharing between goroutines.
		batch := &Batch{}
		go func(i int) {
			defer wg.Done()
			for k := 0; k < niter; k++ {
				batch.Reset()
				for j := 0; j < bk; j++ {
					batch.Put([]byte(fmt.Sprintf("batch-%d.%d.%d", i, k, j)), []byte(fmt.Sprintf("v%d", k)))
				}
				h.write(batch)
				// Key should immediately available after put returns.
				for j := 0; j < bk; j++ {
					h.getVal(fmt.Sprintf("batch-%d.%d.%d", i, k, j), fmt.Sprintf("v%d", k))
				}
			}
		}(i)
	}
	wg.Wait()
}
1920
1921func TestDB_CreateReopenDbOnFile(t *testing.T) {
1922	dbpath := filepath.Join(os.TempDir(), fmt.Sprintf("goleveldbtestCreateReopenDbOnFile-%d", os.Getuid()))
1923	if err := os.RemoveAll(dbpath); err != nil {
1924		t.Fatal("cannot remove old db: ", err)
1925	}
1926	defer os.RemoveAll(dbpath)
1927
1928	for i := 0; i < 3; i++ {
1929		stor, err := storage.OpenFile(dbpath, false)
1930		if err != nil {
1931			t.Fatalf("(%d) cannot open storage: %s", i, err)
1932		}
1933		db, err := Open(stor, nil)
1934		if err != nil {
1935			t.Fatalf("(%d) cannot open db: %s", i, err)
1936		}
1937		if err := db.Put([]byte("foo"), []byte("bar"), nil); err != nil {
1938			t.Fatalf("(%d) cannot write to db: %s", i, err)
1939		}
1940		if err := db.Close(); err != nil {
1941			t.Fatalf("(%d) cannot close db: %s", i, err)
1942		}
1943		if err := stor.Close(); err != nil {
1944			t.Fatalf("(%d) cannot close storage: %s", i, err)
1945		}
1946	}
1947}
1948
1949func TestDB_CreateReopenDbOnFile2(t *testing.T) {
1950	dbpath := filepath.Join(os.TempDir(), fmt.Sprintf("goleveldbtestCreateReopenDbOnFile2-%d", os.Getuid()))
1951	if err := os.RemoveAll(dbpath); err != nil {
1952		t.Fatal("cannot remove old db: ", err)
1953	}
1954	defer os.RemoveAll(dbpath)
1955
1956	for i := 0; i < 3; i++ {
1957		db, err := OpenFile(dbpath, nil)
1958		if err != nil {
1959			t.Fatalf("(%d) cannot open db: %s", i, err)
1960		}
1961		if err := db.Put([]byte("foo"), []byte("bar"), nil); err != nil {
1962			t.Fatalf("(%d) cannot write to db: %s", i, err)
1963		}
1964		if err := db.Close(); err != nil {
1965			t.Fatalf("(%d) cannot close db: %s", i, err)
1966		}
1967	}
1968}
1969
// TestDB_DeletionMarkersOnMemdb checks that a delete living only in the
// memdb correctly hides a value that has already been flushed to a table:
// the key must be absent and the full scan empty.
func TestDB_DeletionMarkersOnMemdb(t *testing.T) {
	h := newDbHarness(t)
	defer h.close()

	h.put("foo", "v1")
	h.compactMem()
	h.delete("foo")
	h.get("foo", false)
	h.getKeyVal("")
}
1980
// TestDB_LeveldbIssue178 reproduces LevelDB issue 178: after writing one key
// range, writing and then deleting a second interleaved range, a manual
// compaction over the first range must not drop any of its keys.
func TestDB_LeveldbIssue178(t *testing.T) {
	// Enough keys to exceed the compaction table size target.
	nKeys := (opt.DefaultCompactionTableSize / 30) * 5
	key1 := func(i int) string {
		return fmt.Sprintf("my_key_%d", i)
	}
	key2 := func(i int) string {
		return fmt.Sprintf("my_key_%d_xxx", i)
	}

	// Disable compression since it affects the creation of layers and the
	// code below is trying to test against a very specific scenario.
	h := newDbHarnessWopt(t, &opt.Options{
		DisableLargeBatchTransaction: true,
		Compression:                  opt.NoCompression,
	})
	defer h.close()

	// Create first key range.
	batch := new(Batch)
	for i := 0; i < nKeys; i++ {
		batch.Put([]byte(key1(i)), []byte("value for range 1 key"))
	}
	h.write(batch)

	// Create second key range.
	batch.Reset()
	for i := 0; i < nKeys; i++ {
		batch.Put([]byte(key2(i)), []byte("value for range 2 key"))
	}
	h.write(batch)

	// Delete second key range.
	batch.Reset()
	for i := 0; i < nKeys; i++ {
		batch.Delete([]byte(key2(i)))
	}
	h.write(batch)
	h.waitMemCompaction()

	// Run manual compaction.
	h.compactRange(key1(0), key1(nKeys-1))

	// Checking the keys.
	h.assertNumKeys(nKeys)
}
2026
// TestDB_LeveldbIssue200 reproduces LevelDB issue 200: a write performed
// after an iterator is created must not become visible to that iterator,
// and must not disturb its Prev/Next positioning.
func TestDB_LeveldbIssue200(t *testing.T) {
	h := newDbHarness(t)
	defer h.close()

	h.put("1", "b")
	h.put("2", "c")
	h.put("3", "d")
	h.put("4", "e")
	h.put("5", "f")

	iter := h.db.NewIterator(nil, h.ro)

	// Add an element that should not be reflected in the iterator.
	h.put("25", "cd")

	// "25" sorts between "2" and "3"; the walk below must skip it entirely.
	iter.Seek([]byte("5"))
	assertBytes(t, []byte("5"), iter.Key())
	iter.Prev()
	assertBytes(t, []byte("4"), iter.Key())
	iter.Prev()
	assertBytes(t, []byte("3"), iter.Key())
	iter.Next()
	assertBytes(t, []byte("4"), iter.Key())
	iter.Next()
	assertBytes(t, []byte("5"), iter.Key())
}
2053
// TestDB_GoleveldbIssue74 reproduces goleveldb issue 74: one goroutine
// repeatedly writes a batch of KEY/PTR pairs and then deletes them through a
// snapshot, while a reader goroutine walks snapshots concurrently. Each
// snapshot must observe either a complete batch (n PTR entries) or nothing.
//
// NOTE(review): t.Fatalf is called from non-test goroutines here, which the
// testing package does not support; on failure this may not stop the test
// cleanly — consider t.Error + return if this ever needs reworking.
func TestDB_GoleveldbIssue74(t *testing.T) {
	h := newDbHarnessWopt(t, &opt.Options{
		DisableLargeBatchTransaction: true,
		WriteBuffer:                  1 * opt.MiB,
	})
	defer h.close()

	const n, dur = 10000, 5 * time.Second

	runtime.GOMAXPROCS(runtime.NumCPU())

	until := time.Now().Add(dur)
	wg := new(sync.WaitGroup)
	wg.Add(2)
	var done uint32
	go func() {
		var i int
		defer func() {
			t.Logf("WRITER DONE #%d", i)
			atomic.StoreUint32(&done, 1)
			wg.Done()
		}()

		b := new(Batch)
		for ; time.Now().Before(until) && atomic.LoadUint32(&done) == 0; i++ {
			// Write n KEY entries plus n PTR entries pointing at them.
			iv := fmt.Sprintf("VAL%010d", i)
			for k := 0; k < n; k++ {
				key := fmt.Sprintf("KEY%06d", k)
				b.Put([]byte(key), []byte(key+iv))
				b.Put([]byte(fmt.Sprintf("PTR%06d", k)), []byte(key))
			}
			h.write(b)

			// Walk the snapshot we just created and delete everything it
			// shows; the snapshot itself must keep serving the old values.
			b.Reset()
			snap := h.getSnapshot()
			iter := snap.NewIterator(util.BytesPrefix([]byte("PTR")), nil)
			var k int
			for ; iter.Next(); k++ {
				ptrKey := iter.Key()
				key := iter.Value()

				if _, err := snap.Get(ptrKey, nil); err != nil {
					t.Fatalf("WRITER #%d snapshot.Get %q: %v", i, ptrKey, err)
				}
				if value, err := snap.Get(key, nil); err != nil {
					t.Fatalf("WRITER #%d snapshot.Get %q: %v", i, key, err)
				} else if string(value) != string(key)+iv {
					t.Fatalf("WRITER #%d snapshot.Get %q got invalid value, want %q got %q", i, key, string(key)+iv, value)
				}

				b.Delete(key)
				b.Delete(ptrKey)
			}
			h.write(b)
			iter.Release()
			snap.Release()
			if k != n {
				t.Fatalf("#%d %d != %d", i, k, n)
			}
		}
	}()
	go func() {
		var i int
		defer func() {
			t.Logf("READER DONE #%d", i)
			atomic.StoreUint32(&done, 1)
			wg.Done()
		}()
		for ; time.Now().Before(until) && atomic.LoadUint32(&done) == 0; i++ {
			snap := h.getSnapshot()
			iter := snap.NewIterator(util.BytesPrefix([]byte("PTR")), nil)
			var prevValue string
			var k int
			for ; iter.Next(); k++ {
				ptrKey := iter.Key()
				key := iter.Value()

				if _, err := snap.Get(ptrKey, nil); err != nil {
					t.Fatalf("READER #%d snapshot.Get %q: %v", i, ptrKey, err)
				}

				// All entries within one snapshot must share the same VAL
				// suffix, i.e. come from a single writer iteration.
				if value, err := snap.Get(key, nil); err != nil {
					t.Fatalf("READER #%d snapshot.Get %q: %v", i, key, err)
				} else if prevValue != "" && string(value) != string(key)+prevValue {
					t.Fatalf("READER #%d snapshot.Get %q got invalid value, want %q got %q", i, key, string(key)+prevValue, value)
				} else {
					prevValue = string(value[len(key):])
				}
			}
			iter.Release()
			snap.Release()
			// A snapshot is either empty or shows a complete batch.
			if k > 0 && k != n {
				t.Fatalf("#%d %d != %d", i, k, n)
			}
		}
	}()
	wg.Wait()
}
2152
2153func TestDB_GetProperties(t *testing.T) {
2154	h := newDbHarness(t)
2155	defer h.close()
2156
2157	_, err := h.db.GetProperty("leveldb.num-files-at-level")
2158	if err == nil {
2159		t.Error("GetProperty() failed to detect missing level")
2160	}
2161
2162	_, err = h.db.GetProperty("leveldb.num-files-at-level0")
2163	if err != nil {
2164		t.Error("got unexpected error", err)
2165	}
2166
2167	_, err = h.db.GetProperty("leveldb.num-files-at-level0x")
2168	if err == nil {
2169		t.Error("GetProperty() failed to detect invalid level")
2170	}
2171}
2172
2173func TestDB_GoleveldbIssue72and83(t *testing.T) {
2174	h := newDbHarnessWopt(t, &opt.Options{
2175		DisableLargeBatchTransaction: true,
2176		WriteBuffer:                  1 * opt.MiB,
2177		OpenFilesCacheCapacity:       3,
2178	})
2179	defer h.close()
2180
2181	const n, wn, dur = 10000, 100, 30 * time.Second
2182
2183	runtime.GOMAXPROCS(runtime.NumCPU())
2184
2185	randomData := func(prefix byte, i int) []byte {
2186		data := make([]byte, 1+4+32+64+32)
2187		_, err := crand.Reader.Read(data[1 : len(data)-8])
2188		if err != nil {
2189			panic(err)
2190		}
2191		data[0] = prefix
2192		binary.LittleEndian.PutUint32(data[len(data)-8:], uint32(i))
2193		binary.LittleEndian.PutUint32(data[len(data)-4:], util.NewCRC(data[:len(data)-4]).Value())
2194		return data
2195	}
2196
2197	keys := make([][]byte, n)
2198	for i := range keys {
2199		keys[i] = randomData(1, 0)
2200	}
2201
2202	until := time.Now().Add(dur)
2203	wg := new(sync.WaitGroup)
2204	wg.Add(3)
2205	var done uint32
2206	go func() {
2207		i := 0
2208		defer func() {
2209			t.Logf("WRITER DONE #%d", i)
2210			wg.Done()
2211		}()
2212
2213		b := new(Batch)
2214		for ; i < wn && atomic.LoadUint32(&done) == 0; i++ {
2215			b.Reset()
2216			for _, k1 := range keys {
2217				k2 := randomData(2, i)
2218				b.Put(k2, randomData(42, i))
2219				b.Put(k1, k2)
2220			}
2221			if err := h.db.Write(b, h.wo); err != nil {
2222				atomic.StoreUint32(&done, 1)
2223				t.Fatalf("WRITER #%d db.Write: %v", i, err)
2224			}
2225		}
2226	}()
2227	go func() {
2228		var i int
2229		defer func() {
2230			t.Logf("READER0 DONE #%d", i)
2231			atomic.StoreUint32(&done, 1)
2232			wg.Done()
2233		}()
2234		for ; time.Now().Before(until) && atomic.LoadUint32(&done) == 0; i++ {
2235			snap := h.getSnapshot()
2236			seq := snap.elem.seq
2237			if seq == 0 {
2238				snap.Release()
2239				continue
2240			}
2241			iter := snap.NewIterator(util.BytesPrefix([]byte{1}), nil)
2242			writei := int(seq/(n*2) - 1)
2243			var k int
2244			for ; iter.Next(); k++ {
2245				k1 := iter.Key()
2246				k2 := iter.Value()
2247				k1checksum0 := binary.LittleEndian.Uint32(k1[len(k1)-4:])
2248				k1checksum1 := util.NewCRC(k1[:len(k1)-4]).Value()
2249				if k1checksum0 != k1checksum1 {
2250					t.Fatalf("READER0 #%d.%d W#%d invalid K1 checksum: %#x != %#x", i, k, writei, k1checksum0, k1checksum0)
2251				}
2252				k2checksum0 := binary.LittleEndian.Uint32(k2[len(k2)-4:])
2253				k2checksum1 := util.NewCRC(k2[:len(k2)-4]).Value()
2254				if k2checksum0 != k2checksum1 {
2255					t.Fatalf("READER0 #%d.%d W#%d invalid K2 checksum: %#x != %#x", i, k, writei, k2checksum0, k2checksum1)
2256				}
2257				kwritei := int(binary.LittleEndian.Uint32(k2[len(k2)-8:]))
2258				if writei != kwritei {
2259					t.Fatalf("READER0 #%d.%d W#%d invalid write iteration num: %d", i, k, writei, kwritei)
2260				}
2261				if _, err := snap.Get(k2, nil); err != nil {
2262					t.Fatalf("READER0 #%d.%d W#%d snap.Get: %v\nk1: %x\n -> k2: %x", i, k, writei, err, k1, k2)
2263				}
2264			}
2265			if err := iter.Error(); err != nil {
2266				t.Fatalf("READER0 #%d.%d W#%d snap.Iterator: %v", i, k, writei, err)
2267			}
2268			iter.Release()
2269			snap.Release()
2270			if k > 0 && k != n {
2271				t.Fatalf("READER0 #%d W#%d short read, got=%d want=%d", i, writei, k, n)
2272			}
2273		}
2274	}()
2275	go func() {
2276		var i int
2277		defer func() {
2278			t.Logf("READER1 DONE #%d", i)
2279			atomic.StoreUint32(&done, 1)
2280			wg.Done()
2281		}()
2282		for ; time.Now().Before(until) && atomic.LoadUint32(&done) == 0; i++ {
2283			iter := h.db.NewIterator(nil, nil)
2284			seq := iter.(*dbIter).seq
2285			if seq == 0 {
2286				iter.Release()
2287				continue
2288			}
2289			writei := int(seq/(n*2) - 1)
2290			var k int
2291			for ok := iter.Last(); ok; ok = iter.Prev() {
2292				k++
2293			}
2294			if err := iter.Error(); err != nil {
2295				t.Fatalf("READER1 #%d.%d W#%d db.Iterator: %v", i, k, writei, err)
2296			}
2297			iter.Release()
2298			if m := (writei+1)*n + n; k != m {
2299				t.Fatalf("READER1 #%d W#%d short read, got=%d want=%d", i, writei, k, m)
2300			}
2301		}
2302	}()
2303
2304	wg.Wait()
2305}
2306
// TestDB_TransientError injects transient table open/read errors while
// writing and deleting snapshot generations, then verifies concurrently
// that every snapshot still serves its full, uncorrupted key set via both
// random Gets and a full iteration.
func TestDB_TransientError(t *testing.T) {
	h := newDbHarnessWopt(t, &opt.Options{
		DisableLargeBatchTransaction: true,
		WriteBuffer:                  128 * opt.KiB,
		OpenFilesCacheCapacity:       3,
		DisableCompactionBackoff:     true,
	})
	defer h.close()

	const (
		nSnap = 20
		nKey  = 10000
	)

	var (
		snaps [nSnap]*Snapshot
		b     = &Batch{}
	)
	for i := range snaps {
		vtail := fmt.Sprintf("VAL%030d", i)
		b.Reset()
		for k := 0; k < nKey; k++ {
			key := fmt.Sprintf("KEY%8d", k)
			b.Put([]byte(key), []byte(key+vtail))
		}
		// Write under an injected error; on failure, clear the error and
		// retry until the write succeeds (corruption is fatal).
		h.stor.EmulateError(testutil.ModeOpen|testutil.ModeRead, storage.TypeTable, errors.New("table transient read error"))
		if err := h.db.Write(b, nil); err != nil {
			t.Logf("WRITE #%d error: %v", i, err)
			h.stor.EmulateError(testutil.ModeOpen|testutil.ModeRead, storage.TypeTable, nil)
			for {
				if err := h.db.Write(b, nil); err == nil {
					break
				} else if errors.IsCorrupted(err) {
					t.Fatalf("WRITE #%d corrupted: %v", i, err)
				}
			}
		}

		// Snapshot the generation, then delete all its keys (again under an
		// injected error) so the next generation starts clean.
		snaps[i] = h.db.newSnapshot()
		b.Reset()
		for k := 0; k < nKey; k++ {
			key := fmt.Sprintf("KEY%8d", k)
			b.Delete([]byte(key))
		}
		h.stor.EmulateError(testutil.ModeOpen|testutil.ModeRead, storage.TypeTable, errors.New("table transient read error"))
		if err := h.db.Write(b, nil); err != nil {
			t.Logf("WRITE #%d  error: %v", i, err)
			h.stor.EmulateError(testutil.ModeOpen|testutil.ModeRead, storage.TypeTable, nil)
			for {
				if err := h.db.Write(b, nil); err == nil {
					break
				} else if errors.IsCorrupted(err) {
					t.Fatalf("WRITE #%d corrupted: %v", i, err)
				}
			}
		}
	}
	h.stor.EmulateError(testutil.ModeOpen|testutil.ModeRead, storage.TypeTable, nil)

	runtime.GOMAXPROCS(runtime.NumCPU())

	// Each snapshot gets one random-Get reader and one full-iteration reader.
	rnd := rand.New(rand.NewSource(0xecafdaed))
	wg := &sync.WaitGroup{}
	for i, snap := range snaps {
		wg.Add(2)

		go func(i int, snap *Snapshot, sk []int) {
			defer wg.Done()

			vtail := fmt.Sprintf("VAL%030d", i)
			for _, k := range sk {
				key := fmt.Sprintf("KEY%8d", k)
				xvalue, err := snap.Get([]byte(key), nil)
				if err != nil {
					t.Fatalf("READER_GET #%d SEQ=%d K%d error: %v", i, snap.elem.seq, k, err)
				}
				value := key + vtail
				if !bytes.Equal([]byte(value), xvalue) {
					t.Fatalf("READER_GET #%d SEQ=%d K%d invalid value: want %q, got %q", i, snap.elem.seq, k, value, xvalue)
				}
			}
		}(i, snap, rnd.Perm(nKey))

		go func(i int, snap *Snapshot) {
			defer wg.Done()

			vtail := fmt.Sprintf("VAL%030d", i)
			iter := snap.NewIterator(nil, nil)
			defer iter.Release()
			for k := 0; k < nKey; k++ {
				if !iter.Next() {
					if err := iter.Error(); err != nil {
						t.Fatalf("READER_ITER #%d K%d error: %v", i, k, err)
					} else {
						t.Fatalf("READER_ITER #%d K%d eoi", i, k)
					}
				}
				key := fmt.Sprintf("KEY%8d", k)
				xkey := iter.Key()
				if !bytes.Equal([]byte(key), xkey) {
					t.Fatalf("READER_ITER #%d K%d invalid key: want %q, got %q", i, k, key, xkey)
				}
				value := key + vtail
				xvalue := iter.Value()
				if !bytes.Equal([]byte(value), xvalue) {
					t.Fatalf("READER_ITER #%d K%d invalid value: want %q, got %q", i, k, value, xvalue)
				}
			}
		}(i, snap)
	}

	wg.Wait()
}
2420
2421func TestDB_UkeyShouldntHopAcrossTable(t *testing.T) {
2422	h := newDbHarnessWopt(t, &opt.Options{
2423		DisableLargeBatchTransaction: true,
2424		WriteBuffer:                  112 * opt.KiB,
2425		CompactionTableSize:          90 * opt.KiB,
2426		CompactionExpandLimitFactor:  1,
2427	})
2428	defer h.close()
2429
2430	const (
2431		nSnap = 190
2432		nKey  = 140
2433	)
2434
2435	var (
2436		snaps [nSnap]*Snapshot
2437		b     = &Batch{}
2438	)
2439	for i := range snaps {
2440		vtail := fmt.Sprintf("VAL%030d", i)
2441		b.Reset()
2442		for k := 0; k < nKey; k++ {
2443			key := fmt.Sprintf("KEY%08d", k)
2444			b.Put([]byte(key), []byte(key+vtail))
2445		}
2446		if err := h.db.Write(b, nil); err != nil {
2447			t.Fatalf("WRITE #%d error: %v", i, err)
2448		}
2449
2450		snaps[i] = h.db.newSnapshot()
2451		b.Reset()
2452		for k := 0; k < nKey; k++ {
2453			key := fmt.Sprintf("KEY%08d", k)
2454			b.Delete([]byte(key))
2455		}
2456		if err := h.db.Write(b, nil); err != nil {
2457			t.Fatalf("WRITE #%d  error: %v", i, err)
2458		}
2459	}
2460
2461	h.compactMem()
2462
2463	h.waitCompaction()
2464	for level, tables := range h.db.s.stVersion.levels {
2465		for _, table := range tables {
2466			t.Logf("L%d@%d %q:%q", level, table.fd.Num, table.imin, table.imax)
2467		}
2468	}
2469
2470	h.compactRangeAt(0, "", "")
2471	h.waitCompaction()
2472	for level, tables := range h.db.s.stVersion.levels {
2473		for _, table := range tables {
2474			t.Logf("L%d@%d %q:%q", level, table.fd.Num, table.imin, table.imax)
2475		}
2476	}
2477	h.compactRangeAt(1, "", "")
2478	h.waitCompaction()
2479	for level, tables := range h.db.s.stVersion.levels {
2480		for _, table := range tables {
2481			t.Logf("L%d@%d %q:%q", level, table.fd.Num, table.imin, table.imax)
2482		}
2483	}
2484	runtime.GOMAXPROCS(runtime.NumCPU())
2485
2486	wg := &sync.WaitGroup{}
2487	for i, snap := range snaps {
2488		wg.Add(1)
2489
2490		go func(i int, snap *Snapshot) {
2491			defer wg.Done()
2492
2493			vtail := fmt.Sprintf("VAL%030d", i)
2494			for k := 0; k < nKey; k++ {
2495				key := fmt.Sprintf("KEY%08d", k)
2496				xvalue, err := snap.Get([]byte(key), nil)
2497				if err != nil {
2498					t.Fatalf("READER_GET #%d SEQ=%d K%d error: %v", i, snap.elem.seq, k, err)
2499				}
2500				value := key + vtail
2501				if !bytes.Equal([]byte(value), xvalue) {
2502					t.Fatalf("READER_GET #%d SEQ=%d K%d invalid value: want %q, got %q", i, snap.elem.seq, k, value, xvalue)
2503				}
2504			}
2505		}(i, snap)
2506	}
2507
2508	wg.Wait()
2509}
2510
2511func TestDB_TableCompactionBuilder(t *testing.T) {
2512	gomega.RegisterTestingT(t)
2513	stor := testutil.NewStorage()
2514	stor.OnLog(testingLogger(t))
2515	stor.OnClose(testingPreserveOnFailed(t))
2516	defer stor.Close()
2517
2518	const nSeq = 99
2519
2520	o := &opt.Options{
2521		DisableLargeBatchTransaction: true,
2522		WriteBuffer:                  112 * opt.KiB,
2523		CompactionTableSize:          43 * opt.KiB,
2524		CompactionExpandLimitFactor:  1,
2525		CompactionGPOverlapsFactor:   1,
2526		DisableBlockCache:            true,
2527	}
2528	s, err := newSession(stor, o)
2529	if err != nil {
2530		t.Fatal(err)
2531	}
2532	if err := s.create(); err != nil {
2533		t.Fatal(err)
2534	}
2535	defer s.close()
2536	var (
2537		seq        uint64
2538		targetSize = 5 * o.CompactionTableSize
2539		value      = bytes.Repeat([]byte{'0'}, 100)
2540	)
2541	for i := 0; i < 2; i++ {
2542		tw, err := s.tops.create(0)
2543		if err != nil {
2544			t.Fatal(err)
2545		}
2546		for k := 0; tw.tw.BytesLen() < targetSize; k++ {
2547			key := []byte(fmt.Sprintf("%09d", k))
2548			seq += nSeq - 1
2549			for x := uint64(0); x < nSeq; x++ {
2550				if err := tw.append(makeInternalKey(nil, key, seq-x, keyTypeVal), value); err != nil {
2551					t.Fatal(err)
2552				}
2553			}
2554		}
2555		tf, err := tw.finish()
2556		if err != nil {
2557			t.Fatal(err)
2558		}
2559		rec := &sessionRecord{}
2560		rec.addTableFile(i, tf)
2561		if err := s.commit(rec, false); err != nil {
2562			t.Fatal(err)
2563		}
2564	}
2565
2566	// Build grandparent.
2567	v := s.version()
2568	c := newCompaction(s, v, 1, append(tFiles{}, v.levels[1]...), undefinedCompaction)
2569	rec := &sessionRecord{}
2570	b := &tableCompactionBuilder{
2571		s:         s,
2572		c:         c,
2573		rec:       rec,
2574		stat1:     new(cStatStaging),
2575		minSeq:    0,
2576		strict:    true,
2577		tableSize: o.CompactionTableSize/3 + 961,
2578	}
2579	if err := b.run(new(compactionTransactCounter)); err != nil {
2580		t.Fatal(err)
2581	}
2582	for _, t := range c.levels[0] {
2583		rec.delTable(c.sourceLevel, t.fd.Num)
2584	}
2585	if err := s.commit(rec, false); err != nil {
2586		t.Fatal(err)
2587	}
2588	c.release()
2589
2590	// Build level-1.
2591	v = s.version()
2592	c = newCompaction(s, v, 0, append(tFiles{}, v.levels[0]...), undefinedCompaction)
2593	rec = &sessionRecord{}
2594	b = &tableCompactionBuilder{
2595		s:         s,
2596		c:         c,
2597		rec:       rec,
2598		stat1:     new(cStatStaging),
2599		minSeq:    0,
2600		strict:    true,
2601		tableSize: o.CompactionTableSize,
2602	}
2603	if err := b.run(new(compactionTransactCounter)); err != nil {
2604		t.Fatal(err)
2605	}
2606	for _, t := range c.levels[0] {
2607		rec.delTable(c.sourceLevel, t.fd.Num)
2608	}
2609	// Move grandparent to level-3
2610	for _, t := range v.levels[2] {
2611		rec.delTable(2, t.fd.Num)
2612		rec.addTableFile(3, t)
2613	}
2614	if err := s.commit(rec, false); err != nil {
2615		t.Fatal(err)
2616	}
2617	c.release()
2618
2619	v = s.version()
2620	for level, want := range []bool{false, true, false, true} {
2621		got := len(v.levels[level]) > 0
2622		if want != got {
2623			t.Fatalf("invalid level-%d tables len: want %v, got %v", level, want, got)
2624		}
2625	}
2626	for i, f := range v.levels[1][:len(v.levels[1])-1] {
2627		nf := v.levels[1][i+1]
2628		if bytes.Equal(f.imax.ukey(), nf.imin.ukey()) {
2629			t.Fatalf("KEY %q hop across table %d .. %d", f.imax.ukey(), f.fd.Num, nf.fd.Num)
2630		}
2631	}
2632	v.release()
2633
2634	// Compaction with transient error.
2635	v = s.version()
2636	c = newCompaction(s, v, 1, append(tFiles{}, v.levels[1]...), undefinedCompaction)
2637	rec = &sessionRecord{}
2638	b = &tableCompactionBuilder{
2639		s:         s,
2640		c:         c,
2641		rec:       rec,
2642		stat1:     new(cStatStaging),
2643		minSeq:    0,
2644		strict:    true,
2645		tableSize: o.CompactionTableSize,
2646	}
2647	stor.EmulateErrorOnce(testutil.ModeSync, storage.TypeTable, errors.New("table sync error (once)"))
2648	stor.EmulateRandomError(testutil.ModeRead|testutil.ModeWrite, storage.TypeTable, 0.01, errors.New("table random IO error"))
2649	for {
2650		if err := b.run(new(compactionTransactCounter)); err != nil {
2651			t.Logf("(expected) b.run: %v", err)
2652		} else {
2653			break
2654		}
2655	}
2656	if err := s.commit(rec, false); err != nil {
2657		t.Fatal(err)
2658	}
2659	c.release()
2660
2661	stor.EmulateErrorOnce(testutil.ModeSync, storage.TypeTable, nil)
2662	stor.EmulateRandomError(testutil.ModeRead|testutil.ModeWrite, storage.TypeTable, 0, nil)
2663
2664	v = s.version()
2665	if len(v.levels[1]) != len(v.levels[2]) {
2666		t.Fatalf("invalid tables length, want %d, got %d", len(v.levels[1]), len(v.levels[2]))
2667	}
2668	for i, f0 := range v.levels[1] {
2669		f1 := v.levels[2][i]
2670		iter0 := s.tops.newIterator(f0, nil, nil)
2671		iter1 := s.tops.newIterator(f1, nil, nil)
2672		for j := 0; true; j++ {
2673			next0 := iter0.Next()
2674			next1 := iter1.Next()
2675			if next0 != next1 {
2676				t.Fatalf("#%d.%d invalid eoi: want %v, got %v", i, j, next0, next1)
2677			}
2678			key0 := iter0.Key()
2679			key1 := iter1.Key()
2680			if !bytes.Equal(key0, key1) {
2681				t.Fatalf("#%d.%d invalid key: want %q, got %q", i, j, key0, key1)
2682			}
2683			if next0 == false {
2684				break
2685			}
2686		}
2687		iter0.Release()
2688		iter1.Release()
2689	}
2690	v.release()
2691}
2692
2693func testDB_IterTriggeredCompaction(t *testing.T, limitDiv int) {
2694	const (
2695		vSize = 200 * opt.KiB
2696		tSize = 100 * opt.MiB
2697		mIter = 100
2698		n     = tSize / vSize
2699	)
2700
2701	h := newDbHarnessWopt(t, &opt.Options{
2702		DisableLargeBatchTransaction: true,
2703		Compression:                  opt.NoCompression,
2704		DisableBlockCache:            true,
2705	})
2706	defer h.close()
2707
2708	h.db.memdbMaxLevel = 2
2709
2710	key := func(x int) string {
2711		return fmt.Sprintf("v%06d", x)
2712	}
2713
2714	// Fill.
2715	value := strings.Repeat("x", vSize)
2716	for i := 0; i < n; i++ {
2717		h.put(key(i), value)
2718	}
2719	h.compactMem()
2720
2721	// Delete all.
2722	for i := 0; i < n; i++ {
2723		h.delete(key(i))
2724	}
2725	h.compactMem()
2726
2727	var (
2728		limit = n / limitDiv
2729
2730		startKey = key(0)
2731		limitKey = key(limit)
2732		maxKey   = key(n)
2733		slice    = &util.Range{Limit: []byte(limitKey)}
2734
2735		initialSize0 = h.sizeOf(startKey, limitKey)
2736		initialSize1 = h.sizeOf(limitKey, maxKey)
2737	)
2738
2739	t.Logf("initial size %s [rest %s]", shortenb(int(initialSize0)), shortenb(int(initialSize1)))
2740
2741	for r := 0; true; r++ {
2742		if r >= mIter {
2743			t.Fatal("taking too long to compact")
2744		}
2745
2746		// Iterates.
2747		iter := h.db.NewIterator(slice, h.ro)
2748		for iter.Next() {
2749		}
2750		if err := iter.Error(); err != nil {
2751			t.Fatalf("Iter err: %v", err)
2752		}
2753		iter.Release()
2754
2755		// Wait compaction.
2756		h.waitCompaction()
2757
2758		// Check size.
2759		size0 := h.sizeOf(startKey, limitKey)
2760		size1 := h.sizeOf(limitKey, maxKey)
2761		t.Logf("#%03d size %s [rest %s]", r, shortenb(int(size0)), shortenb(int(size1)))
2762		if size0 < initialSize0/10 {
2763			break
2764		}
2765	}
2766
2767	if initialSize1 > 0 {
2768		h.sizeAssert(limitKey, maxKey, initialSize1/4-opt.MiB, initialSize1+opt.MiB)
2769	}
2770}
2771
// TestDB_IterTriggeredCompaction runs the iteration-triggered-compaction
// scenario over the entire deleted key range (limitDiv = 1).
func TestDB_IterTriggeredCompaction(t *testing.T) {
	testDB_IterTriggeredCompaction(t, 1)
}
2775
// TestDB_IterTriggeredCompactionHalf runs the iteration-triggered-compaction
// scenario over only the first half of the deleted key range (limitDiv = 2).
func TestDB_IterTriggeredCompactionHalf(t *testing.T) {
	testDB_IterTriggeredCompaction(t, 2)
}
2779
2780func TestDB_ReadOnly(t *testing.T) {
2781	h := newDbHarness(t)
2782	defer h.close()
2783
2784	h.put("foo", "v1")
2785	h.put("bar", "v2")
2786	h.compactMem()
2787
2788	h.put("xfoo", "v1")
2789	h.put("xbar", "v2")
2790
2791	t.Log("Trigger read-only")
2792	if err := h.db.SetReadOnly(); err != nil {
2793		h.close()
2794		t.Fatalf("SetReadOnly error: %v", err)
2795	}
2796
2797	mode := testutil.ModeCreate | testutil.ModeRemove | testutil.ModeRename | testutil.ModeWrite | testutil.ModeSync
2798	h.stor.EmulateError(mode, storage.TypeAll, errors.New("read-only DB shouldn't writes"))
2799
2800	ro := func(key, value, wantValue string) {
2801		if err := h.db.Put([]byte(key), []byte(value), h.wo); err != ErrReadOnly {
2802			t.Fatalf("unexpected error: %v", err)
2803		}
2804		h.getVal(key, wantValue)
2805	}
2806
2807	ro("foo", "vx", "v1")
2808
2809	h.o.ReadOnly = true
2810	h.reopenDB()
2811
2812	ro("foo", "vx", "v1")
2813	ro("bar", "vx", "v2")
2814	h.assertNumKeys(4)
2815}
2816
2817func TestDB_BulkInsertDelete(t *testing.T) {
2818	h := newDbHarnessWopt(t, &opt.Options{
2819		DisableLargeBatchTransaction: true,
2820		Compression:                  opt.NoCompression,
2821		CompactionTableSize:          128 * opt.KiB,
2822		CompactionTotalSize:          1 * opt.MiB,
2823		WriteBuffer:                  256 * opt.KiB,
2824	})
2825	defer h.close()
2826
2827	const R = 100
2828	const N = 2500
2829	key := make([]byte, 4)
2830	value := make([]byte, 256)
2831	for i := 0; i < R; i++ {
2832		offset := N * i
2833		for j := 0; j < N; j++ {
2834			binary.BigEndian.PutUint32(key, uint32(offset+j))
2835			h.db.Put(key, value, nil)
2836		}
2837		for j := 0; j < N; j++ {
2838			binary.BigEndian.PutUint32(key, uint32(offset+j))
2839			h.db.Delete(key, nil)
2840		}
2841	}
2842
2843	h.waitCompaction()
2844	if tot := h.totalTables(); tot > 10 {
2845		t.Fatalf("too many uncompacted tables: %d (%s)", tot, h.getTablesPerLevel())
2846	}
2847}
2848
// TestDB_GracefulClose closes the DB concurrently with in-flight writes,
// reads, and an iterator, checking that in-progress operations fail with an
// error (logged as expected) rather than panicking. The close itself runs in
// a goroutine; closeWait ensures each close finishes before the next phase.
func TestDB_GracefulClose(t *testing.T) {
	runtime.GOMAXPROCS(4)
	h := newDbHarnessWopt(t, &opt.Options{
		DisableLargeBatchTransaction: true,
		Compression:                  opt.NoCompression,
		CompactionTableSize:          1 * opt.MiB,
		WriteBuffer:                  1 * opt.MiB,
	})
	defer h.close()

	var closeWait sync.WaitGroup

	// During write.
	// Keep writing until more than 3 tables exist, then close concurrently;
	// the loop ends when a Put fails because the DB closed under it.
	n := 0
	closing := false
	for i := 0; i < 1000000; i++ {
		if !closing && h.totalTables() > 3 {
			t.Logf("close db during write, index=%d", i)
			closeWait.Add(1)
			go func() {
				h.closeDB()
				closeWait.Done()
			}()
			closing = true
		}
		if err := h.db.Put([]byte(fmt.Sprintf("%09d", i)), []byte(fmt.Sprintf("VAL-%09d", i)), h.wo); err != nil {
			t.Logf("Put error: %s (expected)", err)
			// n records how many keys were written before the close hit.
			n = i
			break
		}
	}
	closeWait.Wait()

	// During read.
	// Re-open, read back the keys written above, and close halfway through.
	h.openDB()
	closing = false
	for i := 0; i < n; i++ {
		if !closing && i > n/2 {
			t.Logf("close db during read, index=%d", i)
			closeWait.Add(1)
			go func() {
				h.closeDB()
				closeWait.Done()
			}()
			closing = true
		}
		if _, err := h.db.Get([]byte(fmt.Sprintf("%09d", i)), h.ro); err != nil {
			t.Logf("Get error: %s (expected)", err)
			break
		}
	}
	closeWait.Wait()

	// During iterate.
	// Re-open and close on the first iteration step; the sleep gives the
	// closing goroutine time to race with subsequent Next calls.
	h.openDB()
	closing = false
	iter := h.db.NewIterator(nil, h.ro)
	for i := 0; iter.Next(); i++ {
		if len(iter.Key()) == 0 || len(iter.Value()) == 0 {
			t.Error("Key or value has zero length")
		}
		if !closing {
			t.Logf("close db during iter, index=%d", i)
			closeWait.Add(1)
			go func() {
				h.closeDB()
				closeWait.Done()
			}()
			closing = true
		}
		time.Sleep(time.Millisecond)
	}
	if err := iter.Error(); err != nil {
		t.Logf("Iter error: %s (expected)", err)
	}
	iter.Release()
	closeWait.Wait()
}
2927