1// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com> 2// All rights reserved. 3// 4// Use of this source code is governed by a BSD-style license that can be 5// found in the LICENSE file. 6 7package leveldb 8 9import ( 10 "fmt" 11 "io" 12 "os" 13 "sync" 14 15 "github.com/syndtr/goleveldb/leveldb/errors" 16 "github.com/syndtr/goleveldb/leveldb/journal" 17 "github.com/syndtr/goleveldb/leveldb/opt" 18 "github.com/syndtr/goleveldb/leveldb/storage" 19) 20 21// ErrManifestCorrupted records manifest corruption. This error will be 22// wrapped with errors.ErrCorrupted. 23type ErrManifestCorrupted struct { 24 Field string 25 Reason string 26} 27 28func (e *ErrManifestCorrupted) Error() string { 29 return fmt.Sprintf("leveldb: manifest corrupted (field '%s'): %s", e.Field, e.Reason) 30} 31 32func newErrManifestCorrupted(fd storage.FileDesc, field, reason string) error { 33 return errors.NewErrCorrupted(fd, &ErrManifestCorrupted{field, reason}) 34} 35 36// session represent a persistent database session. 37type session struct { 38 // Need 64-bit alignment. 39 stNextFileNum int64 // current unused file number 40 stJournalNum int64 // current journal file number; need external synchronization 41 stPrevJournalNum int64 // prev journal file number; no longer used; for compatibility with older version of leveldb 42 stTempFileNum int64 43 stSeqNum uint64 // last mem compacted seq; need external synchronization 44 45 stor *iStorage 46 storLock storage.Locker 47 o *cachedOptions 48 icmp *iComparer 49 tops *tOps 50 51 manifest *journal.Writer 52 manifestWriter storage.Writer 53 manifestFd storage.FileDesc 54 55 stCompPtrs []internalKey // compaction pointers; need external synchronization 56 stVersion *version // current version 57 ntVersionId int64 // next version id to assign 58 refCh chan *vTask 59 relCh chan *vTask 60 deltaCh chan *vDelta 61 abandon chan int64 62 closeC chan struct{} 63 closeW sync.WaitGroup 64 vmu sync.Mutex 65 66 // Testing fields 67 fileRefCh chan chan map[int64]int // channel used to pass current reference stat 68} 69 70// Creates new initialized session instance. 71func newSession(stor storage.Storage, o *opt.Options) (s *session, err error) { 72 if stor == nil { 73 return nil, os.ErrInvalid 74 } 75 storLock, err := stor.Lock() 76 if err != nil { 77 return 78 } 79 s = &session{ 80 stor: newIStorage(stor), 81 storLock: storLock, 82 refCh: make(chan *vTask), 83 relCh: make(chan *vTask), 84 deltaCh: make(chan *vDelta), 85 abandon: make(chan int64), 86 fileRefCh: make(chan chan map[int64]int), 87 closeC: make(chan struct{}), 88 } 89 s.setOptions(o) 90 s.tops = newTableOps(s) 91 92 s.closeW.Add(1) 93 go s.refLoop() 94 s.setVersion(nil, newVersion(s)) 95 s.log("log@legend F·NumFile S·FileSize N·Entry C·BadEntry B·BadBlock Ke·KeyError D·DroppedEntry L·Level Q·SeqNum T·TimeElapsed") 96 return 97} 98 99// Close session. 100func (s *session) close() { 101 s.tops.close() 102 if s.manifest != nil { 103 s.manifest.Close() 104 } 105 if s.manifestWriter != nil { 106 s.manifestWriter.Close() 107 } 108 s.manifest = nil 109 s.manifestWriter = nil 110 s.setVersion(nil, &version{s: s, closing: true, id: s.ntVersionId}) 111 112 // Close all background goroutines 113 close(s.closeC) 114 s.closeW.Wait() 115} 116 117// Release session lock. 118func (s *session) release() { 119 s.storLock.Unlock() 120} 121 122// Create a new database session; need external synchronization. 123func (s *session) create() error { 124 // create manifest 125 return s.newManifest(nil, nil) 126} 127 128// Recover a database session; need external synchronization. 129func (s *session) recover() (err error) { 130 defer func() { 131 if os.IsNotExist(err) { 132 // Don't return os.ErrNotExist if the underlying storage contains 133 // other files that belong to LevelDB. So the DB won't get trashed. 134 if fds, _ := s.stor.List(storage.TypeAll); len(fds) > 0 { 135 err = &errors.ErrCorrupted{Err: errors.New("database entry point either missing or corrupted")} 136 } 137 } 138 }() 139 140 fd, err := s.stor.GetMeta() 141 if err != nil { 142 return 143 } 144 145 reader, err := s.stor.Open(fd) 146 if err != nil { 147 return 148 } 149 defer reader.Close() 150 151 var ( 152 // Options. 153 strict = s.o.GetStrict(opt.StrictManifest) 154 155 jr = journal.NewReader(reader, dropper{s, fd}, strict, true) 156 rec = &sessionRecord{} 157 staging = s.stVersion.newStaging() 158 ) 159 for { 160 var r io.Reader 161 r, err = jr.Next() 162 if err != nil { 163 if err == io.EOF { 164 err = nil 165 break 166 } 167 return errors.SetFd(err, fd) 168 } 169 170 err = rec.decode(r) 171 if err == nil { 172 // save compact pointers 173 for _, r := range rec.compPtrs { 174 s.setCompPtr(r.level, internalKey(r.ikey)) 175 } 176 // commit record to version staging 177 staging.commit(rec) 178 } else { 179 err = errors.SetFd(err, fd) 180 if strict || !errors.IsCorrupted(err) { 181 return 182 } 183 s.logf("manifest error: %v (skipped)", errors.SetFd(err, fd)) 184 } 185 rec.resetCompPtrs() 186 rec.resetAddedTables() 187 rec.resetDeletedTables() 188 } 189 190 switch { 191 case !rec.has(recComparer): 192 return newErrManifestCorrupted(fd, "comparer", "missing") 193 case rec.comparer != s.icmp.uName(): 194 return newErrManifestCorrupted(fd, "comparer", fmt.Sprintf("mismatch: want '%s', got '%s'", s.icmp.uName(), rec.comparer)) 195 case !rec.has(recNextFileNum): 196 return newErrManifestCorrupted(fd, "next-file-num", "missing") 197 case !rec.has(recJournalNum): 198 return newErrManifestCorrupted(fd, "journal-file-num", "missing") 199 case !rec.has(recSeqNum): 200 return newErrManifestCorrupted(fd, "seq-num", "missing") 201 } 202 203 s.manifestFd = fd 204 s.setVersion(rec, staging.finish(false)) 205 s.setNextFileNum(rec.nextFileNum) 206 s.recordCommited(rec) 207 return nil 208} 209 210// Commit session; need external synchronization. 211func (s *session) commit(r *sessionRecord, trivial bool) (err error) { 212 v := s.version() 213 defer v.release() 214 215 // spawn new version based on current version 216 nv := v.spawn(r, trivial) 217 218 // abandon useless version id to prevent blocking version processing loop. 219 defer func() { 220 if err != nil { 221 s.abandon <- nv.id 222 s.logf("commit@abandon useless vid D%d", nv.id) 223 } 224 }() 225 226 if s.manifest == nil { 227 // manifest journal writer not yet created, create one 228 err = s.newManifest(r, nv) 229 } else { 230 err = s.flushManifest(r) 231 } 232 233 // finally, apply new version if no error rise 234 if err == nil { 235 s.setVersion(r, nv) 236 } 237 238 return 239} 240