1// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com> 2// All rights reserved. 3// 4// Use of this source code is governed by a BSD-style license that can be 5// found in the LICENSE file. 6 7package leveldb 8 9import ( 10 "fmt" 11 "sync/atomic" 12 13 "github.com/syndtr/goleveldb/leveldb/journal" 14 "github.com/syndtr/goleveldb/leveldb/storage" 15) 16 17// Logging. 18 19type dropper struct { 20 s *session 21 fd storage.FileDesc 22} 23 24func (d dropper) Drop(err error) { 25 if e, ok := err.(*journal.ErrCorrupted); ok { 26 d.s.logf("journal@drop %s-%d S·%s %q", d.fd.Type, d.fd.Num, shortenb(e.Size), e.Reason) 27 } else { 28 d.s.logf("journal@drop %s-%d %q", d.fd.Type, d.fd.Num, err) 29 } 30} 31 32func (s *session) log(v ...interface{}) { s.stor.Log(fmt.Sprint(v...)) } 33func (s *session) logf(format string, v ...interface{}) { s.stor.Log(fmt.Sprintf(format, v...)) } 34 35// File utils. 36 37func (s *session) newTemp() storage.FileDesc { 38 num := atomic.AddInt64(&s.stTempFileNum, 1) - 1 39 return storage.FileDesc{Type: storage.TypeTemp, Num: num} 40} 41 42func (s *session) addFileRef(fd storage.FileDesc, ref int) int { 43 ref += s.fileRef[fd.Num] 44 if ref > 0 { 45 s.fileRef[fd.Num] = ref 46 } else if ref == 0 { 47 delete(s.fileRef, fd.Num) 48 } else { 49 panic(fmt.Sprintf("negative ref: %v", fd)) 50 } 51 return ref 52} 53 54// Session state. 55 56// Get current version. This will incr version ref, must call 57// version.release (exactly once) after use. 58func (s *session) version() *version { 59 s.vmu.Lock() 60 defer s.vmu.Unlock() 61 s.stVersion.incref() 62 return s.stVersion 63} 64 65func (s *session) tLen(level int) int { 66 s.vmu.Lock() 67 defer s.vmu.Unlock() 68 return s.stVersion.tLen(level) 69} 70 71// Set current version to v. 72func (s *session) setVersion(v *version) { 73 s.vmu.Lock() 74 defer s.vmu.Unlock() 75 // Hold by session. It is important to call this first before releasing 76 // current version, otherwise the still used files might get released. 77 v.incref() 78 if s.stVersion != nil { 79 // Release current version. 80 s.stVersion.releaseNB() 81 } 82 s.stVersion = v 83} 84 85// Get current unused file number. 86func (s *session) nextFileNum() int64 { 87 return atomic.LoadInt64(&s.stNextFileNum) 88} 89 90// Set current unused file number to num. 91func (s *session) setNextFileNum(num int64) { 92 atomic.StoreInt64(&s.stNextFileNum, num) 93} 94 95// Mark file number as used. 96func (s *session) markFileNum(num int64) { 97 nextFileNum := num + 1 98 for { 99 old, x := s.stNextFileNum, nextFileNum 100 if old > x { 101 x = old 102 } 103 if atomic.CompareAndSwapInt64(&s.stNextFileNum, old, x) { 104 break 105 } 106 } 107} 108 109// Allocate a file number. 110func (s *session) allocFileNum() int64 { 111 return atomic.AddInt64(&s.stNextFileNum, 1) - 1 112} 113 114// Reuse given file number. 115func (s *session) reuseFileNum(num int64) { 116 for { 117 old, x := s.stNextFileNum, num 118 if old != x+1 { 119 x = old 120 } 121 if atomic.CompareAndSwapInt64(&s.stNextFileNum, old, x) { 122 break 123 } 124 } 125} 126 127// Set compaction ptr at given level; need external synchronization. 128func (s *session) setCompPtr(level int, ik internalKey) { 129 if level >= len(s.stCompPtrs) { 130 newCompPtrs := make([]internalKey, level+1) 131 copy(newCompPtrs, s.stCompPtrs) 132 s.stCompPtrs = newCompPtrs 133 } 134 s.stCompPtrs[level] = append(internalKey{}, ik...) 135} 136 137// Get compaction ptr at given level; need external synchronization. 138func (s *session) getCompPtr(level int) internalKey { 139 if level >= len(s.stCompPtrs) { 140 return nil 141 } 142 return s.stCompPtrs[level] 143} 144 145// Manifest related utils. 146 147// Fill given session record obj with current states; need external 148// synchronization. 149func (s *session) fillRecord(r *sessionRecord, snapshot bool) { 150 r.setNextFileNum(s.nextFileNum()) 151 152 if snapshot { 153 if !r.has(recJournalNum) { 154 r.setJournalNum(s.stJournalNum) 155 } 156 157 if !r.has(recSeqNum) { 158 r.setSeqNum(s.stSeqNum) 159 } 160 161 for level, ik := range s.stCompPtrs { 162 if ik != nil { 163 r.addCompPtr(level, ik) 164 } 165 } 166 167 r.setComparer(s.icmp.uName()) 168 } 169} 170 171// Mark if record has been committed, this will update session state; 172// need external synchronization. 173func (s *session) recordCommited(rec *sessionRecord) { 174 if rec.has(recJournalNum) { 175 s.stJournalNum = rec.journalNum 176 } 177 178 if rec.has(recPrevJournalNum) { 179 s.stPrevJournalNum = rec.prevJournalNum 180 } 181 182 if rec.has(recSeqNum) { 183 s.stSeqNum = rec.seqNum 184 } 185 186 for _, r := range rec.compPtrs { 187 s.setCompPtr(r.level, internalKey(r.ikey)) 188 } 189} 190 191// Create a new manifest file; need external synchronization. 192func (s *session) newManifest(rec *sessionRecord, v *version) (err error) { 193 fd := storage.FileDesc{Type: storage.TypeManifest, Num: s.allocFileNum()} 194 writer, err := s.stor.Create(fd) 195 if err != nil { 196 return 197 } 198 jw := journal.NewWriter(writer) 199 200 if v == nil { 201 v = s.version() 202 defer v.release() 203 } 204 if rec == nil { 205 rec = &sessionRecord{} 206 } 207 s.fillRecord(rec, true) 208 v.fillRecord(rec) 209 210 defer func() { 211 if err == nil { 212 s.recordCommited(rec) 213 if s.manifest != nil { 214 s.manifest.Close() 215 } 216 if s.manifestWriter != nil { 217 s.manifestWriter.Close() 218 } 219 if !s.manifestFd.Zero() { 220 s.stor.Remove(s.manifestFd) 221 } 222 s.manifestFd = fd 223 s.manifestWriter = writer 224 s.manifest = jw 225 } else { 226 writer.Close() 227 s.stor.Remove(fd) 228 s.reuseFileNum(fd.Num) 229 } 230 }() 231 232 w, err := jw.Next() 233 if err != nil { 234 return 235 } 236 err = rec.encode(w) 237 if err != nil { 238 return 239 } 240 err = jw.Flush() 241 if err != nil { 242 return 243 } 244 err = s.stor.SetMeta(fd) 245 return 246} 247 248// Flush record to disk. 249func (s *session) flushManifest(rec *sessionRecord) (err error) { 250 s.fillRecord(rec, false) 251 w, err := s.manifest.Next() 252 if err != nil { 253 return 254 } 255 err = rec.encode(w) 256 if err != nil { 257 return 258 } 259 err = s.manifest.Flush() 260 if err != nil { 261 return 262 } 263 if !s.o.GetNoSync() { 264 err = s.manifestWriter.Sync() 265 if err != nil { 266 return 267 } 268 } 269 s.recordCommited(rec) 270 return 271} 272