1// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com> 2// All rights reserved. 3// 4// Use of this source code is governed by a BSD-style license that can be 5// found in the LICENSE file. 6 7package leveldb 8 9import ( 10 "fmt" 11 "io" 12 "os" 13 "sync" 14 15 "github.com/syndtr/goleveldb/leveldb/errors" 16 "github.com/syndtr/goleveldb/leveldb/journal" 17 "github.com/syndtr/goleveldb/leveldb/opt" 18 "github.com/syndtr/goleveldb/leveldb/storage" 19) 20 21// ErrManifestCorrupted records manifest corruption. This error will be 22// wrapped with errors.ErrCorrupted. 23type ErrManifestCorrupted struct { 24 Field string 25 Reason string 26} 27 28func (e *ErrManifestCorrupted) Error() string { 29 return fmt.Sprintf("leveldb: manifest corrupted (field '%s'): %s", e.Field, e.Reason) 30} 31 32func newErrManifestCorrupted(fd storage.FileDesc, field, reason string) error { 33 return errors.NewErrCorrupted(fd, &ErrManifestCorrupted{field, reason}) 34} 35 36// session represent a persistent database session. 37type session struct { 38 // Need 64-bit alignment. 39 stNextFileNum int64 // current unused file number 40 stJournalNum int64 // current journal file number; need external synchronization 41 stPrevJournalNum int64 // prev journal file number; no longer used; for compatibility with older version of leveldb 42 stTempFileNum int64 43 stSeqNum uint64 // last mem compacted seq; need external synchronization 44 45 stor *iStorage 46 storLock storage.Locker 47 o *cachedOptions 48 icmp *iComparer 49 tops *tOps 50 fileRef map[int64]int 51 52 manifest *journal.Writer 53 manifestWriter storage.Writer 54 manifestFd storage.FileDesc 55 56 stCompPtrs []internalKey // compaction pointers; need external synchronization 57 stVersion *version // current version 58 vmu sync.Mutex 59} 60 61// Creates new initialized session instance. 62func newSession(stor storage.Storage, o *opt.Options) (s *session, err error) { 63 if stor == nil { 64 return nil, os.ErrInvalid 65 } 66 storLock, err := stor.Lock() 67 if err != nil { 68 return 69 } 70 s = &session{ 71 stor: newIStorage(stor), 72 storLock: storLock, 73 fileRef: make(map[int64]int), 74 } 75 s.setOptions(o) 76 s.tops = newTableOps(s) 77 s.setVersion(newVersion(s)) 78 s.log("log@legend F·NumFile S·FileSize N·Entry C·BadEntry B·BadBlock Ke·KeyError D·DroppedEntry L·Level Q·SeqNum T·TimeElapsed") 79 return 80} 81 82// Close session. 83func (s *session) close() { 84 s.tops.close() 85 if s.manifest != nil { 86 s.manifest.Close() 87 } 88 if s.manifestWriter != nil { 89 s.manifestWriter.Close() 90 } 91 s.manifest = nil 92 s.manifestWriter = nil 93 s.setVersion(&version{s: s, closing: true}) 94} 95 96// Release session lock. 97func (s *session) release() { 98 s.storLock.Unlock() 99} 100 101// Create a new database session; need external synchronization. 102func (s *session) create() error { 103 // create manifest 104 return s.newManifest(nil, nil) 105} 106 107// Recover a database session; need external synchronization. 108func (s *session) recover() (err error) { 109 defer func() { 110 if os.IsNotExist(err) { 111 // Don't return os.ErrNotExist if the underlying storage contains 112 // other files that belong to LevelDB. So the DB won't get trashed. 113 if fds, _ := s.stor.List(storage.TypeAll); len(fds) > 0 { 114 err = &errors.ErrCorrupted{Fd: storage.FileDesc{Type: storage.TypeManifest}, Err: &errors.ErrMissingFiles{}} 115 } 116 } 117 }() 118 119 fd, err := s.stor.GetMeta() 120 if err != nil { 121 return 122 } 123 124 reader, err := s.stor.Open(fd) 125 if err != nil { 126 return 127 } 128 defer reader.Close() 129 130 var ( 131 // Options. 132 strict = s.o.GetStrict(opt.StrictManifest) 133 134 jr = journal.NewReader(reader, dropper{s, fd}, strict, true) 135 rec = &sessionRecord{} 136 staging = s.stVersion.newStaging() 137 ) 138 for { 139 var r io.Reader 140 r, err = jr.Next() 141 if err != nil { 142 if err == io.EOF { 143 err = nil 144 break 145 } 146 return errors.SetFd(err, fd) 147 } 148 149 err = rec.decode(r) 150 if err == nil { 151 // save compact pointers 152 for _, r := range rec.compPtrs { 153 s.setCompPtr(r.level, internalKey(r.ikey)) 154 } 155 // commit record to version staging 156 staging.commit(rec) 157 } else { 158 err = errors.SetFd(err, fd) 159 if strict || !errors.IsCorrupted(err) { 160 return 161 } 162 s.logf("manifest error: %v (skipped)", errors.SetFd(err, fd)) 163 } 164 rec.resetCompPtrs() 165 rec.resetAddedTables() 166 rec.resetDeletedTables() 167 } 168 169 switch { 170 case !rec.has(recComparer): 171 return newErrManifestCorrupted(fd, "comparer", "missing") 172 case rec.comparer != s.icmp.uName(): 173 return newErrManifestCorrupted(fd, "comparer", fmt.Sprintf("mismatch: want '%s', got '%s'", s.icmp.uName(), rec.comparer)) 174 case !rec.has(recNextFileNum): 175 return newErrManifestCorrupted(fd, "next-file-num", "missing") 176 case !rec.has(recJournalNum): 177 return newErrManifestCorrupted(fd, "journal-file-num", "missing") 178 case !rec.has(recSeqNum): 179 return newErrManifestCorrupted(fd, "seq-num", "missing") 180 } 181 182 s.manifestFd = fd 183 s.setVersion(staging.finish()) 184 s.setNextFileNum(rec.nextFileNum) 185 s.recordCommited(rec) 186 return nil 187} 188 189// Commit session; need external synchronization. 190func (s *session) commit(r *sessionRecord) (err error) { 191 v := s.version() 192 defer v.release() 193 194 // spawn new version based on current version 195 nv := v.spawn(r) 196 197 if s.manifest == nil { 198 // manifest journal writer not yet created, create one 199 err = s.newManifest(r, nv) 200 } else { 201 err = s.flushManifest(r) 202 } 203 204 // finally, apply new version if no error rise 205 if err == nil { 206 s.setVersion(nv) 207 } 208 209 return 210} 211