1package leveldb
2
3import (
4	"encoding/binary"
5	"math/rand"
6	"reflect"
7	"sync"
8	"testing"
9	"time"
10
11	"github.com/onsi/gomega"
12	"github.com/syndtr/goleveldb/leveldb/storage"
13	"github.com/syndtr/goleveldb/leveldb/testutil"
14)
15
// testFileRec names a table file in a test case by level and file number.
// The tests pass both fields straight to sessionRecord.addTable and
// sessionRecord.delTable.
type testFileRec struct {
	level int   // level argument given to addTable/delTable
	num   int64 // file number argument given to addTable/delTable
}
20
21func TestVersionStaging(t *testing.T) {
22	gomega.RegisterTestingT(t)
23	stor := testutil.NewStorage()
24	defer stor.Close()
25	s, err := newSession(stor, nil)
26	if err != nil {
27		t.Fatal(err)
28	}
29	defer func() {
30		s.close()
31		s.release()
32	}()
33
34	v := newVersion(s)
35	v.newStaging()
36
37	tmp := make([]byte, 4)
38	mik := func(i uint64) []byte {
39		binary.BigEndian.PutUint32(tmp, uint32(i))
40		return []byte(makeInternalKey(nil, tmp, 0, keyTypeVal))
41	}
42
43	for i, x := range []struct {
44		add, del []testFileRec
45		trivial  bool
46		levels   [][]int64
47	}{
48		{
49			add: []testFileRec{
50				{1, 1},
51			},
52			levels: [][]int64{
53				{},
54				{1},
55			},
56		},
57		{
58			add: []testFileRec{
59				{1, 1},
60			},
61			levels: [][]int64{
62				{},
63				{1},
64			},
65		},
66		{
67			del: []testFileRec{
68				{1, 1},
69			},
70			levels: [][]int64{},
71		},
72		{
73			add: []testFileRec{
74				{0, 1},
75				{0, 3},
76				{0, 2},
77				{2, 5},
78				{1, 4},
79			},
80			levels: [][]int64{
81				{3, 2, 1},
82				{4},
83				{5},
84			},
85		},
86		{
87			add: []testFileRec{
88				{1, 6},
89				{2, 5},
90			},
91			del: []testFileRec{
92				{0, 1},
93				{0, 4},
94			},
95			levels: [][]int64{
96				{3, 2},
97				{4, 6},
98				{5},
99			},
100		},
101		{
102			del: []testFileRec{
103				{0, 3},
104				{0, 2},
105				{1, 4},
106				{1, 6},
107				{2, 5},
108			},
109			levels: [][]int64{},
110		},
111		{
112			add: []testFileRec{
113				{0, 1},
114			},
115			levels: [][]int64{
116				{1},
117			},
118		},
119		{
120			add: []testFileRec{
121				{1, 2},
122			},
123			levels: [][]int64{
124				{1},
125				{2},
126			},
127		},
128		{
129			add: []testFileRec{
130				{0, 3},
131			},
132			levels: [][]int64{
133				{3, 1},
134				{2},
135			},
136		},
137		{
138			add: []testFileRec{
139				{6, 9},
140			},
141			levels: [][]int64{
142				{3, 1},
143				{2},
144				{},
145				{},
146				{},
147				{},
148				{9},
149			},
150		},
151		{
152			del: []testFileRec{
153				{6, 9},
154			},
155			levels: [][]int64{
156				{3, 1},
157				{2},
158			},
159		},
160		// memory compaction
161		{
162			add: []testFileRec{
163				{0, 5},
164			},
165			trivial: true,
166			levels: [][]int64{
167				{5, 3, 1},
168				{2},
169			},
170		},
171		// memory compaction
172		{
173			add: []testFileRec{
174				{0, 4},
175			},
176			trivial: true,
177			levels: [][]int64{
178				{5, 4, 3, 1},
179				{2},
180			},
181		},
182		// table compaction
183		{
184			add: []testFileRec{
185				{1, 6},
186				{1, 7},
187				{1, 8},
188			},
189			del: []testFileRec{
190				{0, 3},
191				{0, 4},
192				{0, 5},
193			},
194			trivial: true,
195			levels: [][]int64{
196				{1},
197				{2, 6, 7, 8},
198			},
199		},
200	} {
201		rec := &sessionRecord{}
202		for _, f := range x.add {
203			ik := mik(uint64(f.num))
204			rec.addTable(f.level, f.num, 1, ik, ik)
205		}
206		for _, f := range x.del {
207			rec.delTable(f.level, f.num)
208		}
209		vs := v.newStaging()
210		vs.commit(rec)
211		v = vs.finish(x.trivial)
212		if len(v.levels) != len(x.levels) {
213			t.Fatalf("#%d: invalid level count: want=%d got=%d", i, len(x.levels), len(v.levels))
214		}
215		for j, want := range x.levels {
216			tables := v.levels[j]
217			if len(want) != len(tables) {
218				t.Fatalf("#%d.%d: invalid tables count: want=%d got=%d", i, j, len(want), len(tables))
219			}
220			got := make([]int64, len(tables))
221			for k, t := range tables {
222				got[k] = t.fd.Num
223			}
224			if !reflect.DeepEqual(want, got) {
225				t.Fatalf("#%d.%d: invalid tables: want=%v got=%v", i, j, want, got)
226			}
227		}
228	}
229}
230
231func TestVersionReference(t *testing.T) {
232	gomega.RegisterTestingT(t)
233	stor := testutil.NewStorage()
234	defer stor.Close()
235	s, err := newSession(stor, nil)
236	if err != nil {
237		t.Fatal(err)
238	}
239	defer func() {
240		s.close()
241		s.release()
242	}()
243
244	tmp := make([]byte, 4)
245	mik := func(i uint64) []byte {
246		binary.BigEndian.PutUint32(tmp, uint32(i))
247		return []byte(makeInternalKey(nil, tmp, 0, keyTypeVal))
248	}
249
250	// Test normal version task correctness
251	refc := make(chan map[int64]int)
252
253	for i, x := range []struct {
254		add, del []testFileRec
255		expect   map[int64]int
256		failed   bool
257	}{
258		{
259			[]testFileRec{{0, 1}, {0, 2}},
260			nil,
261			map[int64]int{1: 1, 2: 1},
262			false,
263		},
264		{
265			[]testFileRec{{0, 3}, {0, 4}},
266			[]testFileRec{{0, 1}},
267			map[int64]int{2: 1, 3: 1, 4: 1},
268			false,
269		},
270		{
271			[]testFileRec{{0, 1}, {0, 5}, {0, 6}, {0, 7}},
272			[]testFileRec{{0, 2}, {0, 3}, {0, 4}},
273			map[int64]int{1: 1, 5: 1, 6: 1, 7: 1},
274			false,
275		},
276		{
277			nil,
278			nil,
279			map[int64]int{1: 1, 5: 1, 6: 1, 7: 1},
280			true,
281		},
282		{
283			[]testFileRec{{0, 1}, {0, 5}, {0, 6}, {0, 7}},
284			nil,
285			map[int64]int{1: 2, 5: 2, 6: 2, 7: 2},
286			false,
287		},
288		{
289			nil,
290			[]testFileRec{{0, 1}, {0, 5}, {0, 6}, {0, 7}},
291			map[int64]int{1: 1, 5: 1, 6: 1, 7: 1},
292			false,
293		},
294		{
295			[]testFileRec{{0, 0}},
296			[]testFileRec{{0, 1}, {0, 5}, {0, 6}, {0, 7}},
297			map[int64]int{0: 1},
298			false,
299		},
300	} {
301		rec := &sessionRecord{}
302		for n, f := range x.add {
303			rec.addTable(f.level, f.num, 1, mik(uint64(i+n)), mik(uint64(i+n)))
304		}
305		for _, f := range x.del {
306			rec.delTable(f.level, f.num)
307		}
308
309		// Simulate some read operations
310		var wg sync.WaitGroup
311		readN := rand.Intn(300)
312		for i := 0; i < readN; i++ {
313			wg.Add(1)
314			go func() {
315				v := s.version()
316				time.Sleep(time.Millisecond * time.Duration(rand.Intn(300)))
317				v.release()
318				wg.Done()
319			}()
320		}
321
322		v := s.version()
323		vs := v.newStaging()
324		vs.commit(rec)
325		nv := vs.finish(false)
326
327		if x.failed {
328			s.abandon <- nv.id
329		} else {
330			s.setVersion(rec, nv)
331		}
332		v.release()
333
334		// Wait all read operations
335		wg.Wait()
336
337		time.Sleep(100 * time.Millisecond) // Wait lazy reference finish tasks
338
339		s.fileRefCh <- refc
340		ref := <-refc
341		if !reflect.DeepEqual(ref, x.expect) {
342			t.Errorf("case %d failed, file reference mismatch, GOT %v, WANT %v", i, ref, x.expect)
343		}
344	}
345
346	// Test version task overflow
347	var longV = s.version() // This version is held by some long-time operation
348	var exp = map[int64]int{0: 1, maxCachedNumber: 1}
349	for i := 1; i <= maxCachedNumber; i++ {
350		rec := &sessionRecord{}
351		rec.addTable(0, int64(i), 1, mik(uint64(i)), mik(uint64(i)))
352		rec.delTable(0, int64(i-1))
353		v := s.version()
354		vs := v.newStaging()
355		vs.commit(rec)
356		nv := vs.finish(false)
357		s.setVersion(rec, nv)
358		v.release()
359	}
360	time.Sleep(100 * time.Millisecond) // Wait lazy reference finish tasks
361
362	s.fileRefCh <- refc
363	ref := <-refc
364	if !reflect.DeepEqual(exp, ref) {
365		t.Errorf("file reference mismatch, GOT %v, WANT %v", ref, exp)
366	}
367
368	longV.release()
369	s.fileRefCh <- refc
370	ref = <-refc
371	delete(exp, 0)
372	if !reflect.DeepEqual(exp, ref) {
373		t.Errorf("file reference mismatch, GOT %v, WANT %v", ref, exp)
374	}
375}
376
377func BenchmarkVersionStagingNonTrivial(b *testing.B) {
378	benchmarkVersionStaging(b, false, 100000)
379}
380
381func BenchmarkVersionStagingTrivial(b *testing.B) {
382	benchmarkVersionStaging(b, true, 100000)
383}
384
385func benchmarkVersionStaging(b *testing.B, trivial bool, size int) {
386	stor := storage.NewMemStorage()
387	defer stor.Close()
388	s, err := newSession(stor, nil)
389	if err != nil {
390		b.Fatal(err)
391	}
392	defer func() {
393		s.close()
394		s.release()
395	}()
396
397	tmp := make([]byte, 4)
398	mik := func(i uint64) []byte {
399		binary.BigEndian.PutUint32(tmp, uint32(i))
400		return []byte(makeInternalKey(nil, tmp, 0, keyTypeVal))
401	}
402
403	rec := &sessionRecord{}
404	for i := 0; i < size; i++ {
405		ik := mik(uint64(i))
406		rec.addTable(1, int64(i), 1, ik, ik)
407	}
408
409	v := newVersion(s)
410	vs := v.newStaging()
411	vs.commit(rec)
412	v = vs.finish(false)
413
414	b.ResetTimer()
415	b.ReportAllocs()
416
417	for i := 0; i < b.N; i++ {
418		rec := &sessionRecord{}
419		index := rand.Intn(size)
420		ik := mik(uint64(index))
421
422		cnt := 0
423		for j := index; j < size && cnt <= 3; j++ {
424			rec.addTable(1, int64(i), 1, ik, ik)
425			cnt += 1
426		}
427		vs := v.newStaging()
428		vs.commit(rec)
429		vs.finish(trivial)
430	}
431}
432