1// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
2// All rights reserved.
3//
4// Use of this source code is governed by a BSD-style license that can be
5// found in the LICENSE file.
6
7package leveldb
8
9import (
10	"fmt"
11	"io"
12	"os"
13	"sync"
14
15	"github.com/syndtr/goleveldb/leveldb/errors"
16	"github.com/syndtr/goleveldb/leveldb/journal"
17	"github.com/syndtr/goleveldb/leveldb/opt"
18	"github.com/syndtr/goleveldb/leveldb/storage"
19)
20
// ErrManifestCorrupted describes a corrupted manifest: Field names the
// manifest field that is at fault and Reason says what is wrong with it.
// Values of this type are wrapped with errors.ErrCorrupted before being
// returned to callers.
type ErrManifestCorrupted struct {
	Field, Reason string
}
27
28func (e *ErrManifestCorrupted) Error() string {
29	return fmt.Sprintf("leveldb: manifest corrupted (field '%s'): %s", e.Field, e.Reason)
30}
31
32func newErrManifestCorrupted(fd storage.FileDesc, field, reason string) error {
33	return errors.NewErrCorrupted(fd, &ErrManifestCorrupted{field, reason})
34}
35
// session represents a persistent database session.
//
// A session is built by newSession, which takes the storage lock and starts
// the refLoop goroutine. It is shut down with close, and the storage lock is
// dropped separately with release.
type session struct {
	// Need 64-bit alignment.
	// NOTE(review): presumably these counters are accessed through
	// sync/atomic, which is why they lead the struct; confirm before
	// reordering any field in this group.
	stNextFileNum    int64 // current unused file number
	stJournalNum     int64 // current journal file number; need external synchronization
	stPrevJournalNum int64 // prev journal file number; no longer used; for compatibility with older version of leveldb
	stTempFileNum    int64
	stSeqNum         uint64 // last mem compacted seq; need external synchronization

	// stor wraps the caller's storage (newIStorage); storLock is the lock
	// returned by the storage's Lock method and is unlocked by release.
	// tops is created by newTableOps and closed by close.
	stor     *iStorage
	storLock storage.Locker
	o        *cachedOptions
	icmp     *iComparer
	tops     *tOps

	// manifest is nil until a manifest journal writer has been created;
	// commit creates one on demand. close closes both manifest and
	// manifestWriter and resets them to nil. recover sets manifestFd to the
	// descriptor of the manifest it read.
	manifest       *journal.Writer
	manifestWriter storage.Writer
	manifestFd     storage.FileDesc

	stCompPtrs  []internalKey // compaction pointers; need external synchronization
	stVersion   *version      // current version
	ntVersionId int64         // next version id to assign
	refCh       chan *vTask
	relCh       chan *vTask
	deltaCh     chan *vDelta
	abandon     chan int64    // commit sends the id of a spawned version whose commit failed
	closeC      chan struct{} // closed by close to stop background goroutines
	closeW      sync.WaitGroup // close waits on this after closing closeC
	vmu         sync.Mutex

	// Testing fields
	fileRefCh chan chan map[int64]int // channel used to pass current reference stat
}
69
70// Creates new initialized session instance.
71func newSession(stor storage.Storage, o *opt.Options) (s *session, err error) {
72	if stor == nil {
73		return nil, os.ErrInvalid
74	}
75	storLock, err := stor.Lock()
76	if err != nil {
77		return
78	}
79	s = &session{
80		stor:      newIStorage(stor),
81		storLock:  storLock,
82		refCh:     make(chan *vTask),
83		relCh:     make(chan *vTask),
84		deltaCh:   make(chan *vDelta),
85		abandon:   make(chan int64),
86		fileRefCh: make(chan chan map[int64]int),
87		closeC:    make(chan struct{}),
88	}
89	s.setOptions(o)
90	s.tops = newTableOps(s)
91
92	s.closeW.Add(1)
93	go s.refLoop()
94	s.setVersion(nil, newVersion(s))
95	s.log("log@legend F·NumFile S·FileSize N·Entry C·BadEntry B·BadBlock Ke·KeyError D·DroppedEntry L·Level Q·SeqNum T·TimeElapsed")
96	return
97}
98
99// Close session.
100func (s *session) close() {
101	s.tops.close()
102	if s.manifest != nil {
103		s.manifest.Close()
104	}
105	if s.manifestWriter != nil {
106		s.manifestWriter.Close()
107	}
108	s.manifest = nil
109	s.manifestWriter = nil
110	s.setVersion(nil, &version{s: s, closing: true, id: s.ntVersionId})
111
112	// Close all background goroutines
113	close(s.closeC)
114	s.closeW.Wait()
115}
116
117// Release session lock.
118func (s *session) release() {
119	s.storLock.Unlock()
120}
121
122// Create a new database session; need external synchronization.
123func (s *session) create() error {
124	// create manifest
125	return s.newManifest(nil, nil)
126}
127
128// Recover a database session; need external synchronization.
129func (s *session) recover() (err error) {
130	defer func() {
131		if os.IsNotExist(err) {
132			// Don't return os.ErrNotExist if the underlying storage contains
133			// other files that belong to LevelDB. So the DB won't get trashed.
134			if fds, _ := s.stor.List(storage.TypeAll); len(fds) > 0 {
135				err = &errors.ErrCorrupted{Err: errors.New("database entry point either missing or corrupted")}
136			}
137		}
138	}()
139
140	fd, err := s.stor.GetMeta()
141	if err != nil {
142		return
143	}
144
145	reader, err := s.stor.Open(fd)
146	if err != nil {
147		return
148	}
149	defer reader.Close()
150
151	var (
152		// Options.
153		strict = s.o.GetStrict(opt.StrictManifest)
154
155		jr      = journal.NewReader(reader, dropper{s, fd}, strict, true)
156		rec     = &sessionRecord{}
157		staging = s.stVersion.newStaging()
158	)
159	for {
160		var r io.Reader
161		r, err = jr.Next()
162		if err != nil {
163			if err == io.EOF {
164				err = nil
165				break
166			}
167			return errors.SetFd(err, fd)
168		}
169
170		err = rec.decode(r)
171		if err == nil {
172			// save compact pointers
173			for _, r := range rec.compPtrs {
174				s.setCompPtr(r.level, internalKey(r.ikey))
175			}
176			// commit record to version staging
177			staging.commit(rec)
178		} else {
179			err = errors.SetFd(err, fd)
180			if strict || !errors.IsCorrupted(err) {
181				return
182			}
183			s.logf("manifest error: %v (skipped)", errors.SetFd(err, fd))
184		}
185		rec.resetCompPtrs()
186		rec.resetAddedTables()
187		rec.resetDeletedTables()
188	}
189
190	switch {
191	case !rec.has(recComparer):
192		return newErrManifestCorrupted(fd, "comparer", "missing")
193	case rec.comparer != s.icmp.uName():
194		return newErrManifestCorrupted(fd, "comparer", fmt.Sprintf("mismatch: want '%s', got '%s'", s.icmp.uName(), rec.comparer))
195	case !rec.has(recNextFileNum):
196		return newErrManifestCorrupted(fd, "next-file-num", "missing")
197	case !rec.has(recJournalNum):
198		return newErrManifestCorrupted(fd, "journal-file-num", "missing")
199	case !rec.has(recSeqNum):
200		return newErrManifestCorrupted(fd, "seq-num", "missing")
201	}
202
203	s.manifestFd = fd
204	s.setVersion(rec, staging.finish(false))
205	s.setNextFileNum(rec.nextFileNum)
206	s.recordCommited(rec)
207	return nil
208}
209
210// Commit session; need external synchronization.
211func (s *session) commit(r *sessionRecord, trivial bool) (err error) {
212	v := s.version()
213	defer v.release()
214
215	// spawn new version based on current version
216	nv := v.spawn(r, trivial)
217
218	// abandon useless version id to prevent blocking version processing loop.
219	defer func() {
220		if err != nil {
221			s.abandon <- nv.id
222			s.logf("commit@abandon useless vid D%d", nv.id)
223		}
224	}()
225
226	if s.manifest == nil {
227		// manifest journal writer not yet created, create one
228		err = s.newManifest(r, nv)
229	} else {
230		err = s.flushManifest(r)
231	}
232
233	// finally, apply new version if no error rise
234	if err == nil {
235		s.setVersion(r, nv)
236	}
237
238	return
239}
240