1// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
2// All rights reserved.
3//
4// Use of this source code is governed by a BSD-style license that can be
5// found in the LICENSE file.
6
7package leveldb
8
9import (
10	"fmt"
11	"io"
12	"os"
13	"sync"
14
15	"github.com/syndtr/goleveldb/leveldb/errors"
16	"github.com/syndtr/goleveldb/leveldb/journal"
17	"github.com/syndtr/goleveldb/leveldb/opt"
18	"github.com/syndtr/goleveldb/leveldb/storage"
19)
20
// ErrManifestCorrupted describes a corrupted manifest. Values of this type
// are wrapped with errors.ErrCorrupted before being returned.
type ErrManifestCorrupted struct {
	// Field names the manifest field the problem was found in.
	Field string
	// Reason explains what is wrong with the field.
	Reason string
}

// Error implements the error interface; the message names the offending
// field followed by the reason.
func (e *ErrManifestCorrupted) Error() string {
	const format = "leveldb: manifest corrupted (field '%s'): %s"
	return fmt.Sprintf(format, e.Field, e.Reason)
}
31
32func newErrManifestCorrupted(fd storage.FileDesc, field, reason string) error {
33	return errors.NewErrCorrupted(fd, &ErrManifestCorrupted{field, reason})
34}
35
36// session represent a persistent database session.
37type session struct {
38	// Need 64-bit alignment.
39	stNextFileNum    int64 // current unused file number
40	stJournalNum     int64 // current journal file number; need external synchronization
41	stPrevJournalNum int64 // prev journal file number; no longer used; for compatibility with older version of leveldb
42	stTempFileNum    int64
43	stSeqNum         uint64 // last mem compacted seq; need external synchronization
44
45	stor     *iStorage
46	storLock storage.Locker
47	o        *cachedOptions
48	icmp     *iComparer
49	tops     *tOps
50	fileRef  map[int64]int
51
52	manifest       *journal.Writer
53	manifestWriter storage.Writer
54	manifestFd     storage.FileDesc
55
56	stCompPtrs []internalKey // compaction pointers; need external synchronization
57	stVersion  *version      // current version
58	vmu        sync.Mutex
59}
60
61// Creates new initialized session instance.
62func newSession(stor storage.Storage, o *opt.Options) (s *session, err error) {
63	if stor == nil {
64		return nil, os.ErrInvalid
65	}
66	storLock, err := stor.Lock()
67	if err != nil {
68		return
69	}
70	s = &session{
71		stor:     newIStorage(stor),
72		storLock: storLock,
73		fileRef:  make(map[int64]int),
74	}
75	s.setOptions(o)
76	s.tops = newTableOps(s)
77	s.setVersion(newVersion(s))
78	s.log("log@legend F·NumFile S·FileSize N·Entry C·BadEntry B·BadBlock Ke·KeyError D·DroppedEntry L·Level Q·SeqNum T·TimeElapsed")
79	return
80}
81
82// Close session.
83func (s *session) close() {
84	s.tops.close()
85	if s.manifest != nil {
86		s.manifest.Close()
87	}
88	if s.manifestWriter != nil {
89		s.manifestWriter.Close()
90	}
91	s.manifest = nil
92	s.manifestWriter = nil
93	s.setVersion(&version{s: s, closing: true})
94}
95
96// Release session lock.
97func (s *session) release() {
98	s.storLock.Unlock()
99}
100
101// Create a new database session; need external synchronization.
102func (s *session) create() error {
103	// create manifest
104	return s.newManifest(nil, nil)
105}
106
107// Recover a database session; need external synchronization.
108func (s *session) recover() (err error) {
109	defer func() {
110		if os.IsNotExist(err) {
111			// Don't return os.ErrNotExist if the underlying storage contains
112			// other files that belong to LevelDB. So the DB won't get trashed.
113			if fds, _ := s.stor.List(storage.TypeAll); len(fds) > 0 {
114				err = &errors.ErrCorrupted{Fd: storage.FileDesc{Type: storage.TypeManifest}, Err: &errors.ErrMissingFiles{}}
115			}
116		}
117	}()
118
119	fd, err := s.stor.GetMeta()
120	if err != nil {
121		return
122	}
123
124	reader, err := s.stor.Open(fd)
125	if err != nil {
126		return
127	}
128	defer reader.Close()
129
130	var (
131		// Options.
132		strict = s.o.GetStrict(opt.StrictManifest)
133
134		jr      = journal.NewReader(reader, dropper{s, fd}, strict, true)
135		rec     = &sessionRecord{}
136		staging = s.stVersion.newStaging()
137	)
138	for {
139		var r io.Reader
140		r, err = jr.Next()
141		if err != nil {
142			if err == io.EOF {
143				err = nil
144				break
145			}
146			return errors.SetFd(err, fd)
147		}
148
149		err = rec.decode(r)
150		if err == nil {
151			// save compact pointers
152			for _, r := range rec.compPtrs {
153				s.setCompPtr(r.level, internalKey(r.ikey))
154			}
155			// commit record to version staging
156			staging.commit(rec)
157		} else {
158			err = errors.SetFd(err, fd)
159			if strict || !errors.IsCorrupted(err) {
160				return
161			}
162			s.logf("manifest error: %v (skipped)", errors.SetFd(err, fd))
163		}
164		rec.resetCompPtrs()
165		rec.resetAddedTables()
166		rec.resetDeletedTables()
167	}
168
169	switch {
170	case !rec.has(recComparer):
171		return newErrManifestCorrupted(fd, "comparer", "missing")
172	case rec.comparer != s.icmp.uName():
173		return newErrManifestCorrupted(fd, "comparer", fmt.Sprintf("mismatch: want '%s', got '%s'", s.icmp.uName(), rec.comparer))
174	case !rec.has(recNextFileNum):
175		return newErrManifestCorrupted(fd, "next-file-num", "missing")
176	case !rec.has(recJournalNum):
177		return newErrManifestCorrupted(fd, "journal-file-num", "missing")
178	case !rec.has(recSeqNum):
179		return newErrManifestCorrupted(fd, "seq-num", "missing")
180	}
181
182	s.manifestFd = fd
183	s.setVersion(staging.finish())
184	s.setNextFileNum(rec.nextFileNum)
185	s.recordCommited(rec)
186	return nil
187}
188
189// Commit session; need external synchronization.
190func (s *session) commit(r *sessionRecord) (err error) {
191	v := s.version()
192	defer v.release()
193
194	// spawn new version based on current version
195	nv := v.spawn(r)
196
197	if s.manifest == nil {
198		// manifest journal writer not yet created, create one
199		err = s.newManifest(r, nv)
200	} else {
201		err = s.flushManifest(r)
202	}
203
204	// finally, apply new version if no error rise
205	if err == nil {
206		s.setVersion(nv)
207	}
208
209	return
210}
211