1// Copyright (c) 2014, Suryandaru Triandana <syndtr@gmail.com>
2// All rights reserved.
3//
4// Use of this source code is governed by a BSD-style license that can be
5// found in the LICENSE file.
6
7package testutil
8
9import (
10	"bytes"
11	"fmt"
12	"io"
13	"math/rand"
14	"os"
15	"path/filepath"
16	"runtime"
17	"strings"
18	"sync"
19
20	. "github.com/onsi/gomega"
21
22	"github.com/syndtr/goleveldb/leveldb/storage"
23)
24
// Package-level settings and state shared by all storages created through
// NewStorage.
var (
	// storageMu is held by NewStorage while it reads and increments storageNum.
	storageMu     sync.Mutex
	// storageUseFS selects a file-system backed storage in NewStorage; when
	// false an in-memory storage is used instead.
	storageUseFS  = true
	// storageKeepFS, when true, makes Close leave the storage directory on disk.
	storageKeepFS = false
	// storageNum is the next sequence number used to build a storage path.
	storageNum    int
)
31
// StorageMode identifies a storage operation. Every Mode* value below is a
// distinct bit (1 << iota).
type StorageMode int

// Storage operation flags.
const (
	ModeOpen StorageMode = 1 << iota
	ModeCreate
	ModeRemove
	ModeRename
	ModeRead
	ModeWrite
	ModeSync
	ModeClose
)
44
// Dense, zero-based indexes of the storage operations, in the same order as
// the Mode* flags. flattenType translates a Mode* flag into one of these.
const (
	modeOpen = iota
	modeCreate
	modeRemove
	modeRename
	modeRead
	modeWrite
	modeSync
	modeClose

	// modeCount is the number of operation indexes declared above.
	modeCount
)
57
// Dense, zero-based indexes of the file types. flattenType translates a
// storage.FileType into one of these.
const (
	typeManifest = iota
	typeJournal
	typeTable
	typeTemp

	// typeCount is the number of file type indexes declared above. It is also
	// used as the shift width in packFile and unpackFile.
	typeCount
)
66
// flattenCount is the number of distinct (mode, type) slots; it sizes the
// per-slot arrays of Storage.
const flattenCount = modeCount * typeCount
68
69func flattenType(m StorageMode, t storage.FileType) int {
70	var x int
71	switch m {
72	case ModeOpen:
73		x = modeOpen
74	case ModeCreate:
75		x = modeCreate
76	case ModeRemove:
77		x = modeRemove
78	case ModeRename:
79		x = modeRename
80	case ModeRead:
81		x = modeRead
82	case ModeWrite:
83		x = modeWrite
84	case ModeSync:
85		x = modeSync
86	case ModeClose:
87		x = modeClose
88	default:
89		panic("invalid storage mode")
90	}
91	x *= typeCount
92	switch t {
93	case storage.TypeManifest:
94		return x + typeManifest
95	case storage.TypeJournal:
96		return x + typeJournal
97	case storage.TypeTable:
98		return x + typeTable
99	case storage.TypeTemp:
100		return x + typeTemp
101	default:
102		panic("invalid file type")
103	}
104}
105
106func listFlattenType(m StorageMode, t storage.FileType) []int {
107	ret := make([]int, 0, flattenCount)
108	add := func(x int) {
109		x *= typeCount
110		switch {
111		case t&storage.TypeManifest != 0:
112			ret = append(ret, x+typeManifest)
113		case t&storage.TypeJournal != 0:
114			ret = append(ret, x+typeJournal)
115		case t&storage.TypeTable != 0:
116			ret = append(ret, x+typeTable)
117		case t&storage.TypeTemp != 0:
118			ret = append(ret, x+typeTemp)
119		}
120	}
121	switch {
122	case m&ModeOpen != 0:
123		add(modeOpen)
124	case m&ModeCreate != 0:
125		add(modeCreate)
126	case m&ModeRemove != 0:
127		add(modeRemove)
128	case m&ModeRename != 0:
129		add(modeRename)
130	case m&ModeRead != 0:
131		add(modeRead)
132	case m&ModeWrite != 0:
133		add(modeWrite)
134	case m&ModeSync != 0:
135		add(modeSync)
136	case m&ModeClose != 0:
137		add(modeClose)
138	}
139	return ret
140}
141
142func packFile(fd storage.FileDesc) uint64 {
143	if fd.Num>>(63-typeCount) != 0 {
144		panic("overflow")
145	}
146	return uint64(fd.Num<<typeCount) | uint64(fd.Type)
147}
148
149func unpackFile(x uint64) storage.FileDesc {
150	return storage.FileDesc{
151		Type: storage.FileType(x) & storage.TypeAll,
152		Num:  int64(x >> typeCount),
153	}
154}
155
// emulatedError wraps an error injected by the test storage so that its
// message is recognisable as an emulated failure.
type emulatedError struct {
	err error
}

// Error returns the wrapped error's default formatting prefixed with
// "emulated storage error: ".
func (e emulatedError) Error() string {
	return "emulated storage error: " + fmt.Sprint(e.err)
}
163
164type storageLock struct {
165	s *Storage
166	l storage.Locker
167}
168
169func (l storageLock) Unlock() {
170	l.l.Unlock()
171	l.s.logI("storage lock released")
172}
173
174type reader struct {
175	s  *Storage
176	fd storage.FileDesc
177	storage.Reader
178}
179
180func (r *reader) Read(p []byte) (n int, err error) {
181	err = r.s.emulateError(ModeRead, r.fd.Type)
182	if err == nil {
183		r.s.stall(ModeRead, r.fd.Type)
184		n, err = r.Reader.Read(p)
185	}
186	r.s.count(ModeRead, r.fd.Type, n)
187	if err != nil && err != io.EOF {
188		r.s.logI("read error, fd=%s n=%d err=%v", r.fd, n, err)
189	}
190	return
191}
192
193func (r *reader) ReadAt(p []byte, off int64) (n int, err error) {
194	err = r.s.emulateError(ModeRead, r.fd.Type)
195	if err == nil {
196		r.s.stall(ModeRead, r.fd.Type)
197		n, err = r.Reader.ReadAt(p, off)
198	}
199	r.s.count(ModeRead, r.fd.Type, n)
200	if err != nil && err != io.EOF {
201		r.s.logI("readAt error, fd=%s offset=%d n=%d err=%v", r.fd, off, n, err)
202	}
203	return
204}
205
206func (r *reader) Close() (err error) {
207	return r.s.fileClose(r.fd, r.Reader)
208}
209
210type writer struct {
211	s  *Storage
212	fd storage.FileDesc
213	storage.Writer
214}
215
216func (w *writer) Write(p []byte) (n int, err error) {
217	err = w.s.emulateError(ModeWrite, w.fd.Type)
218	if err == nil {
219		w.s.stall(ModeWrite, w.fd.Type)
220		n, err = w.Writer.Write(p)
221	}
222	w.s.count(ModeWrite, w.fd.Type, n)
223	if err != nil && err != io.EOF {
224		w.s.logI("write error, fd=%s n=%d err=%v", w.fd, n, err)
225	}
226	return
227}
228
229func (w *writer) Sync() (err error) {
230	err = w.s.emulateError(ModeSync, w.fd.Type)
231	if err == nil {
232		w.s.stall(ModeSync, w.fd.Type)
233		err = w.Writer.Sync()
234	}
235	w.s.count(ModeSync, w.fd.Type, 0)
236	if err != nil {
237		w.s.logI("sync error, fd=%s err=%v", w.fd, err)
238	}
239	return
240}
241
242func (w *writer) Close() (err error) {
243	return w.s.fileClose(w.fd, w.Writer)
244}
245
// Storage wraps a storage.Storage for use in tests. On top of the embedded
// storage it keeps a log, tracks which files are open, counts operations and
// can inject errors or stall operations per (mode, file type) slot.
type Storage struct {
	// Storage is the wrapped backend.
	storage.Storage
	// path is the directory of a file-system backed storage as chosen by
	// NewStorage; it is empty for in-memory storage.
	path    string
	// onClose, if set, is invoked by Close; its preserve result keeps the
	// storage directory from being removed.
	onClose func() (preserve bool, err error)
	// onLog, if set, receives every log entry; while it is nil entries are
	// accumulated in lb.
	onLog   func(str string)

	// lmu is held by log and OnLog while they use onLog and lb.
	lmu sync.Mutex
	// lb buffers log output.
	lb  bytes.Buffer

	// mu is taken by most methods that touch the fields below; NewStorage also
	// installs it as the Locker of stallCond.
	mu   sync.Mutex
	// rand supplies the random draw used for random error emulation.
	rand *rand.Rand
	// Open files, true=writer, false=reader
	opens                   map[uint64]bool
	// counters holds the number of operations per slot.
	counters                [flattenCount]int
	// bytesCounter holds the number of bytes accounted per slot.
	bytesCounter            [flattenCount]int64
	// emulatedError holds the error to inject per slot (nil = none).
	emulatedError           [flattenCount]error
	// emulatedErrorOnce marks slots whose emulatedError is cleared after it
	// has been returned once.
	emulatedErrorOnce       [flattenCount]bool
	// emulatedRandomError holds the error to inject at random per slot.
	emulatedRandomError     [flattenCount]error
	// emulatedRandomErrorProb is the threshold the random draw is compared
	// against for emulatedRandomError.
	emulatedRandomErrorProb [flattenCount]float64
	// stallCond is waited on by stalled operations and broadcast by Release.
	stallCond               sync.Cond
	// stalled marks the slots whose operations currently block in stall.
	stalled                 [flattenCount]bool
}
268
269func (s *Storage) log(skip int, str string) {
270	s.lmu.Lock()
271	defer s.lmu.Unlock()
272	_, file, line, ok := runtime.Caller(skip + 2)
273	if ok {
274		// Truncate file name at last file name separator.
275		if index := strings.LastIndex(file, "/"); index >= 0 {
276			file = file[index+1:]
277		} else if index = strings.LastIndex(file, "\\"); index >= 0 {
278			file = file[index+1:]
279		}
280	} else {
281		file = "???"
282		line = 1
283	}
284	fmt.Fprintf(&s.lb, "%s:%d: ", file, line)
285	lines := strings.Split(str, "\n")
286	if l := len(lines); l > 1 && lines[l-1] == "" {
287		lines = lines[:l-1]
288	}
289	for i, line := range lines {
290		if i > 0 {
291			s.lb.WriteString("\n\t")
292		}
293		s.lb.WriteString(line)
294	}
295	if s.onLog != nil {
296		s.onLog(s.lb.String())
297		s.lb.Reset()
298	} else {
299		s.lb.WriteByte('\n')
300	}
301}
302
303func (s *Storage) logISkip(skip int, format string, args ...interface{}) {
304	pc, _, _, ok := runtime.Caller(skip + 1)
305	if ok {
306		if f := runtime.FuncForPC(pc); f != nil {
307			fname := f.Name()
308			if index := strings.LastIndex(fname, "."); index >= 0 {
309				fname = fname[index+1:]
310			}
311			format = fname + ": " + format
312		}
313	}
314	s.log(skip+1, fmt.Sprintf(format, args...))
315}
316
// logI logs a formatted message on behalf of its direct caller. It forwards
// to logISkip with skip=1 so that the function-name prefix is taken from the
// caller of logI rather than from logI itself.
func (s *Storage) logI(format string, args ...interface{}) {
	s.logISkip(1, format, args...)
}
320
321func (s *Storage) OnLog(onLog func(log string)) {
322	s.lmu.Lock()
323	s.onLog = onLog
324	if s.lb.Len() != 0 {
325		log := s.lb.String()
326		s.onLog(log[:len(log)-1])
327		s.lb.Reset()
328	}
329	s.lmu.Unlock()
330}
331
// Log records str in this wrapper's own log, prefixed with "Log: ", and then
// forwards str unchanged to the wrapped storage's Log.
func (s *Storage) Log(str string) {
	s.log(1, "Log: "+str)
	s.Storage.Log(str)
}
336
337func (s *Storage) Lock() (l storage.Locker, err error) {
338	l, err = s.Storage.Lock()
339	if err != nil {
340		s.logI("storage locking failed, err=%v", err)
341	} else {
342		s.logI("storage locked")
343		l = storageLock{s, l}
344	}
345	return
346}
347
348func (s *Storage) List(t storage.FileType) (fds []storage.FileDesc, err error) {
349	fds, err = s.Storage.List(t)
350	if err != nil {
351		s.logI("list failed, err=%v", err)
352		return
353	}
354	s.logI("list, type=0x%x count=%d", int(t), len(fds))
355	return
356}
357
358func (s *Storage) GetMeta() (fd storage.FileDesc, err error) {
359	fd, err = s.Storage.GetMeta()
360	if err != nil {
361		if !os.IsNotExist(err) {
362			s.logI("get meta failed, err=%v", err)
363		}
364		return
365	}
366	s.logI("get meta, fd=%s", fd)
367	return
368}
369
370func (s *Storage) SetMeta(fd storage.FileDesc) error {
371	ExpectWithOffset(1, fd.Type).To(Equal(storage.TypeManifest))
372	err := s.Storage.SetMeta(fd)
373	if err != nil {
374		s.logI("set meta failed, fd=%s err=%v", fd, err)
375	} else {
376		s.logI("set meta, fd=%s", fd)
377	}
378	return err
379}
380
381func (s *Storage) fileClose(fd storage.FileDesc, closer io.Closer) (err error) {
382	err = s.emulateError(ModeClose, fd.Type)
383	if err == nil {
384		s.stall(ModeClose, fd.Type)
385	}
386	x := packFile(fd)
387	s.mu.Lock()
388	defer s.mu.Unlock()
389	if err == nil {
390		ExpectWithOffset(2, s.opens).To(HaveKey(x), "File closed, fd=%s", fd)
391		err = closer.Close()
392	}
393	s.countNB(ModeClose, fd.Type, 0)
394	writer := s.opens[x]
395	if err != nil {
396		s.logISkip(1, "file close failed, fd=%s writer=%v err=%v", fd, writer, err)
397	} else {
398		s.logISkip(1, "file closed, fd=%s writer=%v", fd, writer)
399		delete(s.opens, x)
400	}
401	return
402}
403
404func (s *Storage) assertOpen(fd storage.FileDesc) {
405	x := packFile(fd)
406	ExpectWithOffset(2, s.opens).NotTo(HaveKey(x), "File open, fd=%s writer=%v", fd, s.opens[x])
407}
408
409func (s *Storage) Open(fd storage.FileDesc) (r storage.Reader, err error) {
410	err = s.emulateError(ModeOpen, fd.Type)
411	if err == nil {
412		s.stall(ModeOpen, fd.Type)
413	}
414	s.mu.Lock()
415	defer s.mu.Unlock()
416	if err == nil {
417		s.assertOpen(fd)
418		s.countNB(ModeOpen, fd.Type, 0)
419		r, err = s.Storage.Open(fd)
420	}
421	if err != nil {
422		s.logI("file open failed, fd=%s err=%v", fd, err)
423	} else {
424		s.logI("file opened, fd=%s", fd)
425		s.opens[packFile(fd)] = false
426		r = &reader{s, fd, r}
427	}
428	return
429}
430
431func (s *Storage) Create(fd storage.FileDesc) (w storage.Writer, err error) {
432	err = s.emulateError(ModeCreate, fd.Type)
433	if err == nil {
434		s.stall(ModeCreate, fd.Type)
435	}
436	s.mu.Lock()
437	defer s.mu.Unlock()
438	if err == nil {
439		s.assertOpen(fd)
440		s.countNB(ModeCreate, fd.Type, 0)
441		w, err = s.Storage.Create(fd)
442	}
443	if err != nil {
444		s.logI("file create failed, fd=%s err=%v", fd, err)
445	} else {
446		s.logI("file created, fd=%s", fd)
447		s.opens[packFile(fd)] = true
448		w = &writer{s, fd, w}
449	}
450	return
451}
452
453func (s *Storage) Remove(fd storage.FileDesc) (err error) {
454	err = s.emulateError(ModeRemove, fd.Type)
455	if err == nil {
456		s.stall(ModeRemove, fd.Type)
457	}
458	s.mu.Lock()
459	defer s.mu.Unlock()
460	if err == nil {
461		s.assertOpen(fd)
462		s.countNB(ModeRemove, fd.Type, 0)
463		err = s.Storage.Remove(fd)
464	}
465	if err != nil {
466		s.logI("file remove failed, fd=%s err=%v", fd, err)
467	} else {
468		s.logI("file removed, fd=%s", fd)
469	}
470	return
471}
472
473func (s *Storage) ForceRemove(fd storage.FileDesc) (err error) {
474	s.countNB(ModeRemove, fd.Type, 0)
475	if err = s.Storage.Remove(fd); err != nil {
476		s.logI("file remove failed (forced), fd=%s err=%v", fd, err)
477	} else {
478		s.logI("file removed (forced), fd=%s", fd)
479	}
480	return
481}
482
483func (s *Storage) Rename(oldfd, newfd storage.FileDesc) (err error) {
484	err = s.emulateError(ModeRename, oldfd.Type)
485	if err == nil {
486		s.stall(ModeRename, oldfd.Type)
487	}
488	s.mu.Lock()
489	defer s.mu.Unlock()
490	if err == nil {
491		s.assertOpen(oldfd)
492		s.assertOpen(newfd)
493		s.countNB(ModeRename, oldfd.Type, 0)
494		err = s.Storage.Rename(oldfd, newfd)
495	}
496	if err != nil {
497		s.logI("file rename failed, oldfd=%s newfd=%s err=%v", oldfd, newfd, err)
498	} else {
499		s.logI("file renamed, oldfd=%s newfd=%s", oldfd, newfd)
500	}
501	return
502}
503
504func (s *Storage) ForceRename(oldfd, newfd storage.FileDesc) (err error) {
505	s.countNB(ModeRename, oldfd.Type, 0)
506	if err = s.Storage.Rename(oldfd, newfd); err != nil {
507		s.logI("file rename failed (forced), oldfd=%s newfd=%s err=%v", oldfd, newfd, err)
508	} else {
509		s.logI("file renamed (forced), oldfd=%s newfd=%s", oldfd, newfd)
510	}
511	return
512}
513
514func (s *Storage) openFiles() string {
515	out := "Open files:"
516	for x, writer := range s.opens {
517		fd := unpackFile(x)
518		out += fmt.Sprintf("\n · fd=%s writer=%v", fd, writer)
519	}
520	return out
521}
522
523func (s *Storage) CloseCheck() {
524	s.mu.Lock()
525	defer s.mu.Unlock()
526	ExpectWithOffset(1, s.opens).To(BeEmpty(), s.openFiles())
527}
528
529func (s *Storage) OnClose(onClose func() (preserve bool, err error)) {
530	s.mu.Lock()
531	s.onClose = onClose
532	s.mu.Unlock()
533}
534
535func (s *Storage) Close() error {
536	s.mu.Lock()
537	defer s.mu.Unlock()
538	ExpectWithOffset(1, s.opens).To(BeEmpty(), s.openFiles())
539	err := s.Storage.Close()
540	if err != nil {
541		s.logI("storage closing failed, err=%v", err)
542	} else {
543		s.logI("storage closed")
544	}
545	var preserve bool
546	if s.onClose != nil {
547		var err0 error
548		if preserve, err0 = s.onClose(); err0 != nil {
549			s.logI("onClose error, err=%v", err0)
550		}
551	}
552	if s.path != "" {
553		if storageKeepFS || preserve {
554			s.logI("storage is preserved, path=%v", s.path)
555		} else {
556			if err1 := os.RemoveAll(s.path); err1 != nil {
557				s.logI("cannot remove storage, err=%v", err1)
558			} else {
559				s.logI("storage has been removed")
560			}
561		}
562	}
563	return err
564}
565
566func (s *Storage) countNB(m StorageMode, t storage.FileType, n int) {
567	s.counters[flattenType(m, t)]++
568	s.bytesCounter[flattenType(m, t)] += int64(n)
569}
570
// count records one operation of mode m on file type t with n bytes, like
// countNB, but acquires s.mu around the update. It must not be called with
// s.mu already held.
func (s *Storage) count(m StorageMode, t storage.FileType, n int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.countNB(m, t, n)
}
576
577func (s *Storage) ResetCounter(m StorageMode, t storage.FileType) {
578	for _, x := range listFlattenType(m, t) {
579		s.counters[x] = 0
580		s.bytesCounter[x] = 0
581	}
582}
583
584func (s *Storage) Counter(m StorageMode, t storage.FileType) (count int, bytes int64) {
585	for _, x := range listFlattenType(m, t) {
586		count += s.counters[x]
587		bytes += s.bytesCounter[x]
588	}
589	return
590}
591
592func (s *Storage) emulateError(m StorageMode, t storage.FileType) error {
593	s.mu.Lock()
594	defer s.mu.Unlock()
595	x := flattenType(m, t)
596	if err := s.emulatedError[x]; err != nil {
597		if s.emulatedErrorOnce[x] {
598			s.emulatedError[x] = nil
599		}
600		return emulatedError{err}
601	}
602	if err := s.emulatedRandomError[x]; err != nil && s.rand.Float64() < s.emulatedRandomErrorProb[x] {
603		return emulatedError{err}
604	}
605	return nil
606}
607
608func (s *Storage) EmulateError(m StorageMode, t storage.FileType, err error) {
609	s.mu.Lock()
610	defer s.mu.Unlock()
611	for _, x := range listFlattenType(m, t) {
612		s.emulatedError[x] = err
613		s.emulatedErrorOnce[x] = false
614	}
615}
616
617func (s *Storage) EmulateErrorOnce(m StorageMode, t storage.FileType, err error) {
618	s.mu.Lock()
619	defer s.mu.Unlock()
620	for _, x := range listFlattenType(m, t) {
621		s.emulatedError[x] = err
622		s.emulatedErrorOnce[x] = true
623	}
624}
625
626func (s *Storage) EmulateRandomError(m StorageMode, t storage.FileType, prob float64, err error) {
627	s.mu.Lock()
628	defer s.mu.Unlock()
629	for _, x := range listFlattenType(m, t) {
630		s.emulatedRandomError[x] = err
631		s.emulatedRandomErrorProb[x] = prob
632	}
633}
634
635func (s *Storage) stall(m StorageMode, t storage.FileType) {
636	x := flattenType(m, t)
637	s.mu.Lock()
638	defer s.mu.Unlock()
639	for s.stalled[x] {
640		s.stallCond.Wait()
641	}
642}
643
644func (s *Storage) Stall(m StorageMode, t storage.FileType) {
645	s.mu.Lock()
646	defer s.mu.Unlock()
647	for _, x := range listFlattenType(m, t) {
648		s.stalled[x] = true
649	}
650}
651
652func (s *Storage) Release(m StorageMode, t storage.FileType) {
653	s.mu.Lock()
654	defer s.mu.Unlock()
655	for _, x := range listFlattenType(m, t) {
656		s.stalled[x] = false
657	}
658	s.stallCond.Broadcast()
659}
660
661func NewStorage() *Storage {
662	var (
663		stor storage.Storage
664		path string
665	)
666	if storageUseFS {
667		for {
668			storageMu.Lock()
669			num := storageNum
670			storageNum++
671			storageMu.Unlock()
672			path = filepath.Join(os.TempDir(), fmt.Sprintf("goleveldb-test%d0%d0%d", os.Getuid(), os.Getpid(), num))
673			if _, err := os.Stat(path); os.IsNotExist(err) {
674				stor, err = storage.OpenFile(path, false)
675				ExpectWithOffset(1, err).NotTo(HaveOccurred(), "creating storage at %s", path)
676				break
677			}
678		}
679	} else {
680		stor = storage.NewMemStorage()
681	}
682	s := &Storage{
683		Storage: stor,
684		path:    path,
685		rand:    NewRand(),
686		opens:   make(map[uint64]bool),
687	}
688	s.stallCond.L = &s.mu
689	if s.path != "" {
690		s.logI("using FS storage")
691		s.logI("storage path: %s", s.path)
692	} else {
693		s.logI("using MEM storage")
694	}
695	return s
696}
697