1package main
2
3import (
4	"crypto/rand"
5	"encoding/binary"
6	"flag"
7	"fmt"
8	"log"
9	mrand "math/rand"
10	"net/http"
11	_ "net/http/pprof"
12	"os"
13	"os/signal"
14	"path"
15	"runtime"
16	"strconv"
17	"strings"
18	"sync"
19	"sync/atomic"
20	"time"
21
22	"github.com/syndtr/goleveldb/leveldb"
23	"github.com/syndtr/goleveldb/leveldb/errors"
24	"github.com/syndtr/goleveldb/leveldb/opt"
25	"github.com/syndtr/goleveldb/leveldb/storage"
26	"github.com/syndtr/goleveldb/leveldb/table"
27	"github.com/syndtr/goleveldb/leveldb/util"
28)
29
30var (
31	dbPath                 = path.Join(os.TempDir(), "goleveldb-testdb")
32	openFilesCacheCapacity = 500
33	keyLen                 = 63
34	valueLen               = 256
35	numKeys                = arrayInt{100000, 1332, 531, 1234, 9553, 1024, 35743}
36	httpProf               = "127.0.0.1:5454"
37	transactionProb        = 0.5
38	enableBlockCache       = false
39	enableCompression      = false
40	enableBufferPool       = false
41
42	wg         = new(sync.WaitGroup)
43	done, fail uint32
44
45	bpool *util.BufferPool
46)
47
// arrayInt is a list of integers usable with flag.Var; on the command line it
// is written as a comma-separated list such as "100,200,300".
type arrayInt []int

// String renders the list as comma-separated decimal integers ("" when empty).
func (a arrayInt) String() string {
	// strings.Builder avoids the quadratic copying of += in a loop.
	var sb strings.Builder
	for i, n := range a {
		if i > 0 {
			sb.WriteByte(',')
		}
		sb.WriteString(strconv.Itoa(n))
	}
	return sb.String()
}

// Set parses str as a comma-separated list of integers. Whitespace around each
// element is trimmed and empty elements are skipped. On success the receiver
// is replaced with the parsed list; on a parse error it is left unchanged and
// the strconv error is returned.
func (a *arrayInt) Set(str string) error {
	var na arrayInt
	for _, s := range strings.Split(str, ",") {
		s = strings.TrimSpace(s)
		if s == "" {
			continue
		}
		n, err := strconv.Atoi(s)
		if err != nil {
			return err
		}
		na = append(na, n)
	}
	*a = na
	return nil
}
76
77func init() {
78	flag.StringVar(&dbPath, "db", dbPath, "testdb path")
79	flag.IntVar(&openFilesCacheCapacity, "openfilescachecap", openFilesCacheCapacity, "open files cache capacity")
80	flag.IntVar(&keyLen, "keylen", keyLen, "key length")
81	flag.IntVar(&valueLen, "valuelen", valueLen, "value length")
82	flag.Var(&numKeys, "numkeys", "num keys")
83	flag.StringVar(&httpProf, "httpprof", httpProf, "http pprof listen addr")
84	flag.Float64Var(&transactionProb, "transactionprob", transactionProb, "probablity of writes using transaction")
85	flag.BoolVar(&enableBufferPool, "enablebufferpool", enableBufferPool, "enable buffer pool")
86	flag.BoolVar(&enableBlockCache, "enableblockcache", enableBlockCache, "enable block cache")
87	flag.BoolVar(&enableCompression, "enablecompression", enableCompression, "enable block compression")
88}
89
90func randomData(dst []byte, ns, prefix byte, i uint32, dataLen int) []byte {
91	if dataLen < (2+4+4)*2+4 {
92		panic("dataLen is too small")
93	}
94	if cap(dst) < dataLen {
95		dst = make([]byte, dataLen)
96	} else {
97		dst = dst[:dataLen]
98	}
99	half := (dataLen - 4) / 2
100	if _, err := rand.Reader.Read(dst[2 : half-8]); err != nil {
101		panic(err)
102	}
103	dst[0] = ns
104	dst[1] = prefix
105	binary.LittleEndian.PutUint32(dst[half-8:], i)
106	binary.LittleEndian.PutUint32(dst[half-8:], i)
107	binary.LittleEndian.PutUint32(dst[half-4:], util.NewCRC(dst[:half-4]).Value())
108	full := half * 2
109	copy(dst[half:full], dst[:half])
110	if full < dataLen-4 {
111		if _, err := rand.Reader.Read(dst[full : dataLen-4]); err != nil {
112			panic(err)
113		}
114	}
115	binary.LittleEndian.PutUint32(dst[dataLen-4:], util.NewCRC(dst[:dataLen-4]).Value())
116	return dst
117}
118
// dataSplit returns the two equal-length halves that precede data's trailing
// 4-byte checksum. When the length before the checksum is odd, the leftover
// byte just before the checksum belongs to neither half.
func dataSplit(data []byte) (data0, data1 []byte) {
	half := (len(data) - 4) / 2
	data0 = data[:half]
	data1 = data[half : 2*half]
	return data0, data1
}
123
// dataNS returns the namespace byte, which is the first byte of data.
func dataNS(data []byte) (ns byte) {
	ns = data[0]
	return ns
}
127
// dataPrefix returns the prefix byte, which is the second byte of data.
func dataPrefix(data []byte) (prefix byte) {
	prefix = data[1]
	return prefix
}
131
// dataI decodes the little-endian uint32 stored 8 bytes before the end of the
// first half of data, where randomData writes its sequence number i.
func dataI(data []byte) uint32 {
	off := (len(data)-4)/2 - 8
	return binary.LittleEndian.Uint32(data[off:])
}
135
136func dataChecksum(data []byte) (uint32, uint32) {
137	checksum0 := binary.LittleEndian.Uint32(data[len(data)-4:])
138	checksum1 := util.NewCRC(data[:len(data)-4]).Value()
139	return checksum0, checksum1
140}
141
142func dataPrefixSlice(ns, prefix byte) *util.Range {
143	return util.BytesPrefix([]byte{ns, prefix})
144}
145
146func dataNsSlice(ns byte) *util.Range {
147	return util.BytesPrefix([]byte{ns})
148}
149
150type testingStorage struct {
151	storage.Storage
152}
153
154func (ts *testingStorage) scanTable(fd storage.FileDesc, checksum bool) (corrupted bool) {
155	r, err := ts.Open(fd)
156	if err != nil {
157		log.Fatal(err)
158	}
159	defer r.Close()
160
161	size, err := r.Seek(0, os.SEEK_END)
162	if err != nil {
163		log.Fatal(err)
164	}
165
166	o := &opt.Options{
167		DisableLargeBatchTransaction: true,
168		Strict: opt.NoStrict,
169	}
170	if checksum {
171		o.Strict = opt.StrictBlockChecksum | opt.StrictReader
172	}
173	tr, err := table.NewReader(r, size, fd, nil, bpool, o)
174	if err != nil {
175		log.Fatal(err)
176	}
177	defer tr.Release()
178
179	checkData := func(i int, t string, data []byte) bool {
180		if len(data) == 0 {
181			panic(fmt.Sprintf("[%v] nil data: i=%d t=%s", fd, i, t))
182		}
183
184		checksum0, checksum1 := dataChecksum(data)
185		if checksum0 != checksum1 {
186			atomic.StoreUint32(&fail, 1)
187			atomic.StoreUint32(&done, 1)
188			corrupted = true
189
190			data0, data1 := dataSplit(data)
191			data0c0, data0c1 := dataChecksum(data0)
192			data1c0, data1c1 := dataChecksum(data1)
193			log.Printf("FATAL: [%v] Corrupted data i=%d t=%s (%#x != %#x): %x(%v) vs %x(%v)",
194				fd, i, t, checksum0, checksum1, data0, data0c0 == data0c1, data1, data1c0 == data1c1)
195			return true
196		}
197		return false
198	}
199
200	iter := tr.NewIterator(nil, nil)
201	defer iter.Release()
202	for i := 0; iter.Next(); i++ {
203		ukey, _, kt, kerr := parseIkey(iter.Key())
204		if kerr != nil {
205			atomic.StoreUint32(&fail, 1)
206			atomic.StoreUint32(&done, 1)
207			corrupted = true
208
209			log.Printf("FATAL: [%v] Corrupted ikey i=%d: %v", fd, i, kerr)
210			return
211		}
212		if checkData(i, "key", ukey) {
213			return
214		}
215		if kt == ktVal && checkData(i, "value", iter.Value()) {
216			return
217		}
218	}
219	if err := iter.Error(); err != nil {
220		if errors.IsCorrupted(err) {
221			atomic.StoreUint32(&fail, 1)
222			atomic.StoreUint32(&done, 1)
223			corrupted = true
224
225			log.Printf("FATAL: [%v] Corruption detected: %v", fd, err)
226		} else {
227			log.Fatal(err)
228		}
229	}
230
231	return
232}
233
234func (ts *testingStorage) Remove(fd storage.FileDesc) error {
235	if atomic.LoadUint32(&fail) == 1 {
236		return nil
237	}
238
239	if fd.Type == storage.TypeTable {
240		if ts.scanTable(fd, true) {
241			return nil
242		}
243	}
244	return ts.Storage.Remove(fd)
245}
246
// latencyStats accumulates operation latencies. A measurement is taken by
// calling start, performing n operations, then calling record(n). It has no
// internal locking.
type latencyStats struct {
	mark          time.Time     // start of the in-flight measurement; zero when idle
	dur, min, max time.Duration // total elapsed time; per-operation min and max
	num           int           // number of operations recorded
}

// start begins a new measurement.
func (s *latencyStats) start() {
	s.mark = time.Now()
}

// record ends the measurement begun by start and attributes the elapsed time
// evenly across n operations. It panics if start was not called. A
// non-positive n discards the measurement; previously it would divide by zero.
func (s *latencyStats) record(n int) {
	if s.mark.IsZero() {
		panic("not started")
	}
	dur := time.Since(s.mark)
	s.mark = time.Time{}
	if n <= 0 {
		return
	}
	perOp := dur / time.Duration(n)
	if perOp < s.min || s.min == 0 {
		s.min = perOp
	}
	if perOp > s.max {
		s.max = perOp
	}
	s.dur += dur
	s.num += n
}

// ratePerSec returns the number of operations per second of accumulated
// measured time. When no time has been accumulated, it returns the raw
// operation count.
func (s *latencyStats) ratePerSec() int {
	if s.dur <= 0 {
		return s.num
	}
	// Use fractional seconds. Truncating to whole seconds overstated the rate
	// by up to 2x (e.g. 1.9s counted as 1s) and ignored sub-second totals.
	return int(float64(s.num) / s.dur.Seconds())
}

// avg returns the mean per-operation latency, or 0 if nothing was recorded.
func (s *latencyStats) avg() time.Duration {
	if s.num > 0 {
		return s.dur / time.Duration(s.num)
	}
	return 0
}

// add merges the samples of x into s. An x with no recorded operations is
// ignored; otherwise its zero min would overwrite a real minimum in s.
func (s *latencyStats) add(x *latencyStats) {
	if x.num == 0 {
		return
	}
	if x.min < s.min || s.min == 0 {
		s.min = x.min
	}
	if x.max > s.max {
		s.max = x.max
	}
	s.dur += x.dur
	s.num += x.num
}
299
300func main() {
301	flag.Parse()
302
303	if enableBufferPool {
304		bpool = util.NewBufferPool(opt.DefaultBlockSize + 128)
305	}
306
307	log.Printf("Test DB stored at %q", dbPath)
308	if httpProf != "" {
309		log.Printf("HTTP pprof listening at %q", httpProf)
310		runtime.SetBlockProfileRate(1)
311		go func() {
312			if err := http.ListenAndServe(httpProf, nil); err != nil {
313				log.Fatalf("HTTPPROF: %v", err)
314			}
315		}()
316	}
317
318	runtime.GOMAXPROCS(runtime.NumCPU())
319
320	os.RemoveAll(dbPath)
321	stor, err := storage.OpenFile(dbPath, false)
322	if err != nil {
323		log.Fatal(err)
324	}
325	tstor := &testingStorage{stor}
326	defer tstor.Close()
327
328	fatalf := func(err error, format string, v ...interface{}) {
329		atomic.StoreUint32(&fail, 1)
330		atomic.StoreUint32(&done, 1)
331		log.Printf("FATAL: "+format, v...)
332		if err != nil && errors.IsCorrupted(err) {
333			cerr := err.(*errors.ErrCorrupted)
334			if !cerr.Fd.Zero() && cerr.Fd.Type == storage.TypeTable {
335				log.Print("FATAL: corruption detected, scanning...")
336				if !tstor.scanTable(storage.FileDesc{Type: storage.TypeTable, Num: cerr.Fd.Num}, false) {
337					log.Printf("FATAL: unable to find corrupted key/value pair in table %v", cerr.Fd)
338				}
339			}
340		}
341		runtime.Goexit()
342	}
343
344	if openFilesCacheCapacity == 0 {
345		openFilesCacheCapacity = -1
346	}
347	o := &opt.Options{
348		OpenFilesCacheCapacity: openFilesCacheCapacity,
349		DisableBufferPool:      !enableBufferPool,
350		DisableBlockCache:      !enableBlockCache,
351		ErrorIfExist:           true,
352		Compression:            opt.NoCompression,
353	}
354	if enableCompression {
355		o.Compression = opt.DefaultCompression
356	}
357
358	db, err := leveldb.Open(tstor, o)
359	if err != nil {
360		log.Fatal(err)
361	}
362	defer db.Close()
363
364	var (
365		mu              = &sync.Mutex{}
366		gGetStat        = &latencyStats{}
367		gIterStat       = &latencyStats{}
368		gWriteStat      = &latencyStats{}
369		gTrasactionStat = &latencyStats{}
370		startTime       = time.Now()
371
372		writeReq    = make(chan *leveldb.Batch)
373		writeAck    = make(chan error)
374		writeAckAck = make(chan struct{})
375	)
376
377	go func() {
378		for b := range writeReq {
379
380			var err error
381			if mrand.Float64() < transactionProb {
382				log.Print("> Write using transaction")
383				gTrasactionStat.start()
384				var tr *leveldb.Transaction
385				if tr, err = db.OpenTransaction(); err == nil {
386					if err = tr.Write(b, nil); err == nil {
387						if err = tr.Commit(); err == nil {
388							gTrasactionStat.record(b.Len())
389						}
390					} else {
391						tr.Discard()
392					}
393				}
394			} else {
395				gWriteStat.start()
396				if err = db.Write(b, nil); err == nil {
397					gWriteStat.record(b.Len())
398				}
399			}
400			writeAck <- err
401			<-writeAckAck
402		}
403	}()
404
405	go func() {
406		for {
407			time.Sleep(3 * time.Second)
408
409			log.Print("------------------------")
410
411			log.Printf("> Elapsed=%v", time.Now().Sub(startTime))
412			mu.Lock()
413			log.Printf("> GetLatencyMin=%v GetLatencyMax=%v GetLatencyAvg=%v GetRatePerSec=%d",
414				gGetStat.min, gGetStat.max, gGetStat.avg(), gGetStat.ratePerSec())
415			log.Printf("> IterLatencyMin=%v IterLatencyMax=%v IterLatencyAvg=%v IterRatePerSec=%d",
416				gIterStat.min, gIterStat.max, gIterStat.avg(), gIterStat.ratePerSec())
417			log.Printf("> WriteLatencyMin=%v WriteLatencyMax=%v WriteLatencyAvg=%v WriteRatePerSec=%d",
418				gWriteStat.min, gWriteStat.max, gWriteStat.avg(), gWriteStat.ratePerSec())
419			log.Printf("> TransactionLatencyMin=%v TransactionLatencyMax=%v TransactionLatencyAvg=%v TransactionRatePerSec=%d",
420				gTrasactionStat.min, gTrasactionStat.max, gTrasactionStat.avg(), gTrasactionStat.ratePerSec())
421			mu.Unlock()
422
423			cachedblock, _ := db.GetProperty("leveldb.cachedblock")
424			openedtables, _ := db.GetProperty("leveldb.openedtables")
425			alivesnaps, _ := db.GetProperty("leveldb.alivesnaps")
426			aliveiters, _ := db.GetProperty("leveldb.aliveiters")
427			blockpool, _ := db.GetProperty("leveldb.blockpool")
428			log.Printf("> BlockCache=%s OpenedTables=%s AliveSnaps=%s AliveIter=%s BlockPool=%q",
429				cachedblock, openedtables, alivesnaps, aliveiters, blockpool)
430
431			log.Print("------------------------")
432		}
433	}()
434
435	for ns, numKey := range numKeys {
436		func(ns, numKey int) {
437			log.Printf("[%02d] STARTING: numKey=%d", ns, numKey)
438
439			keys := make([][]byte, numKey)
440			for i := range keys {
441				keys[i] = randomData(nil, byte(ns), 1, uint32(i), keyLen)
442			}
443
444			wg.Add(1)
445			go func() {
446				var wi uint32
447				defer func() {
448					log.Printf("[%02d] WRITER DONE #%d", ns, wi)
449					wg.Done()
450				}()
451
452				var (
453					b       = new(leveldb.Batch)
454					k2, v2  []byte
455					nReader int32
456				)
457				for atomic.LoadUint32(&done) == 0 {
458					log.Printf("[%02d] WRITER #%d", ns, wi)
459
460					b.Reset()
461					for _, k1 := range keys {
462						k2 = randomData(k2, byte(ns), 2, wi, keyLen)
463						v2 = randomData(v2, byte(ns), 3, wi, valueLen)
464						b.Put(k2, v2)
465						b.Put(k1, k2)
466					}
467					writeReq <- b
468					if err := <-writeAck; err != nil {
469						writeAckAck <- struct{}{}
470						fatalf(err, "[%02d] WRITER #%d db.Write: %v", ns, wi, err)
471					}
472
473					snap, err := db.GetSnapshot()
474					if err != nil {
475						writeAckAck <- struct{}{}
476						fatalf(err, "[%02d] WRITER #%d db.GetSnapshot: %v", ns, wi, err)
477					}
478
479					writeAckAck <- struct{}{}
480
481					wg.Add(1)
482					atomic.AddInt32(&nReader, 1)
483					go func(snapwi uint32, snap *leveldb.Snapshot) {
484						var (
485							ri       int
486							iterStat = &latencyStats{}
487							getStat  = &latencyStats{}
488						)
489						defer func() {
490							mu.Lock()
491							gGetStat.add(getStat)
492							gIterStat.add(iterStat)
493							mu.Unlock()
494
495							atomic.AddInt32(&nReader, -1)
496							log.Printf("[%02d] READER #%d.%d DONE Snap=%v Alive=%d IterLatency=%v GetLatency=%v", ns, snapwi, ri, snap, atomic.LoadInt32(&nReader), iterStat.avg(), getStat.avg())
497							snap.Release()
498							wg.Done()
499						}()
500
501						stopi := snapwi + 3
502						for (ri < 3 || atomic.LoadUint32(&wi) < stopi) && atomic.LoadUint32(&done) == 0 {
503							var n int
504							iter := snap.NewIterator(dataPrefixSlice(byte(ns), 1), nil)
505							iterStat.start()
506							for iter.Next() {
507								k1 := iter.Key()
508								k2 := iter.Value()
509								iterStat.record(1)
510
511								if dataNS(k2) != byte(ns) {
512									fatalf(nil, "[%02d] READER #%d.%d K%d invalid in-key NS: want=%d got=%d", ns, snapwi, ri, n, ns, dataNS(k2))
513								}
514
515								kwritei := dataI(k2)
516								if kwritei != snapwi {
517									fatalf(nil, "[%02d] READER #%d.%d K%d invalid in-key iter num: %d", ns, snapwi, ri, n, kwritei)
518								}
519
520								getStat.start()
521								v2, err := snap.Get(k2, nil)
522								if err != nil {
523									fatalf(err, "[%02d] READER #%d.%d K%d snap.Get: %v\nk1: %x\n -> k2: %x", ns, snapwi, ri, n, err, k1, k2)
524								}
525								getStat.record(1)
526
527								if checksum0, checksum1 := dataChecksum(v2); checksum0 != checksum1 {
528									err := &errors.ErrCorrupted{Fd: storage.FileDesc{0xff, 0}, Err: fmt.Errorf("v2: %x: checksum mismatch: %v vs %v", v2, checksum0, checksum1)}
529									fatalf(err, "[%02d] READER #%d.%d K%d snap.Get: %v\nk1: %x\n -> k2: %x", ns, snapwi, ri, n, err, k1, k2)
530								}
531
532								n++
533								iterStat.start()
534							}
535							iter.Release()
536							if err := iter.Error(); err != nil {
537								fatalf(err, "[%02d] READER #%d.%d K%d iter.Error: %v", ns, snapwi, ri, numKey, err)
538							}
539							if n != numKey {
540								fatalf(nil, "[%02d] READER #%d.%d missing keys: want=%d got=%d", ns, snapwi, ri, numKey, n)
541							}
542
543							ri++
544						}
545					}(wi, snap)
546
547					atomic.AddUint32(&wi, 1)
548				}
549			}()
550
551			delB := new(leveldb.Batch)
552			wg.Add(1)
553			go func() {
554				var (
555					i        int
556					iterStat = &latencyStats{}
557				)
558				defer func() {
559					log.Printf("[%02d] SCANNER DONE #%d", ns, i)
560					wg.Done()
561				}()
562
563				time.Sleep(2 * time.Second)
564
565				for atomic.LoadUint32(&done) == 0 {
566					var n int
567					delB.Reset()
568					iter := db.NewIterator(dataNsSlice(byte(ns)), nil)
569					iterStat.start()
570					for iter.Next() && atomic.LoadUint32(&done) == 0 {
571						k := iter.Key()
572						v := iter.Value()
573						iterStat.record(1)
574
575						for ci, x := range [...][]byte{k, v} {
576							checksum0, checksum1 := dataChecksum(x)
577							if checksum0 != checksum1 {
578								if ci == 0 {
579									fatalf(nil, "[%02d] SCANNER %d.%d invalid key checksum: want %d, got %d\n%x -> %x", ns, i, n, checksum0, checksum1, k, v)
580								} else {
581									fatalf(nil, "[%02d] SCANNER %d.%d invalid value checksum: want %d, got %d\n%x -> %x", ns, i, n, checksum0, checksum1, k, v)
582								}
583							}
584						}
585
586						if dataPrefix(k) == 2 || mrand.Int()%999 == 0 {
587							delB.Delete(k)
588						}
589
590						n++
591						iterStat.start()
592					}
593					iter.Release()
594					if err := iter.Error(); err != nil {
595						fatalf(err, "[%02d] SCANNER #%d.%d iter.Error: %v", ns, i, n, err)
596					}
597
598					if n > 0 {
599						log.Printf("[%02d] SCANNER #%d IterLatency=%v", ns, i, iterStat.avg())
600					}
601
602					if delB.Len() > 0 && atomic.LoadUint32(&done) == 0 {
603						t := time.Now()
604						writeReq <- delB
605						if err := <-writeAck; err != nil {
606							writeAckAck <- struct{}{}
607							fatalf(err, "[%02d] SCANNER #%d db.Write: %v", ns, i, err)
608						} else {
609							writeAckAck <- struct{}{}
610						}
611						log.Printf("[%02d] SCANNER #%d Deleted=%d Time=%v", ns, i, delB.Len(), time.Now().Sub(t))
612					}
613
614					i++
615				}
616			}()
617		}(ns, numKey)
618	}
619
620	go func() {
621		sig := make(chan os.Signal)
622		signal.Notify(sig, os.Interrupt, os.Kill)
623		log.Printf("Got signal: %v, exiting...", <-sig)
624		atomic.StoreUint32(&done, 1)
625	}()
626
627	wg.Wait()
628}
629