1// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com> 2// All rights reserved. 3// 4// Use of this source code is governed by a BSD-style license that can be 5// found in the LICENSE file. 6 7package leveldb 8 9import ( 10 "fmt" 11 "sync/atomic" 12 "time" 13 14 "github.com/syndtr/goleveldb/leveldb/journal" 15 "github.com/syndtr/goleveldb/leveldb/storage" 16) 17 18// Logging. 19 20type dropper struct { 21 s *session 22 fd storage.FileDesc 23} 24 25func (d dropper) Drop(err error) { 26 if e, ok := err.(*journal.ErrCorrupted); ok { 27 d.s.logf("journal@drop %s-%d S·%s %q", d.fd.Type, d.fd.Num, shortenb(e.Size), e.Reason) 28 } else { 29 d.s.logf("journal@drop %s-%d %q", d.fd.Type, d.fd.Num, err) 30 } 31} 32 33func (s *session) log(v ...interface{}) { s.stor.Log(fmt.Sprint(v...)) } 34func (s *session) logf(format string, v ...interface{}) { s.stor.Log(fmt.Sprintf(format, v...)) } 35 36// File utils. 37 38func (s *session) newTemp() storage.FileDesc { 39 num := atomic.AddInt64(&s.stTempFileNum, 1) - 1 40 return storage.FileDesc{Type: storage.TypeTemp, Num: num} 41} 42 43// Session state. 44 45const ( 46 // maxCachedNumber represents the maximum number of version tasks 47 // that can be cached in the ref loop. 48 maxCachedNumber = 256 49 50 // maxCachedTime represents the maximum time for ref loop to cache 51 // a version task. 52 maxCachedTime = 5 * time.Minute 53) 54 55// vDelta indicates the change information between the next version 56// and the currently specified version 57type vDelta struct { 58 vid int64 59 added []int64 60 deleted []int64 61} 62 63// vTask defines a version task for either reference or release. 
type vTask struct {
	vid     int64     // version id this task refers to
	files   []tFiles  // full table-file listing of the version, per level
	created time.Time // when the task entered the ref loop (for cache aging)
}

// refLoop is the session's background goroutine that tracks table-file
// reference counts across versions. It consumes reference (refCh), delta
// (deltaCh), release (relCh) and abandon requests, and physically removes
// a table file once its reference count drops to zero. It exits when
// closeC is closed.
func (s *session) refLoop() {
	var (
		fileRef    = make(map[int64]int)    // Table file reference counter
		ref        = make(map[int64]*vTask) // Current referencing version store
		deltas     = make(map[int64]*vDelta)
		referenced = make(map[int64]struct{})
		released   = make(map[int64]*vDelta) // Released version that waiting for processing
		abandoned  = make(map[int64]struct{}) // Abandoned version id
		next, last int64
	)
	// addFileRef adds file reference counter with specified file number and
	// reference value. Returns the updated count; a negative result is a
	// programmer error (double release) and panics.
	addFileRef := func(fnum int64, ref int) int {
		ref += fileRef[fnum]
		if ref > 0 {
			fileRef[fnum] = ref
		} else if ref == 0 {
			delete(fileRef, fnum)
		} else {
			panic(fmt.Sprintf("negative ref: %v", fnum))
		}
		return ref
	}
	// skipAbandoned skips useless abandoned version id.
	skipAbandoned := func() bool {
		if _, exist := abandoned[next]; exist {
			delete(abandoned, next)
			return true
		}
		return false
	}
	// applyDelta applies version change to current file reference.
	// Files removed by the delta are physically deleted once unreferenced.
	applyDelta := func(d *vDelta) {
		for _, t := range d.added {
			addFileRef(t, 1)
		}
		for _, t := range d.deleted {
			if addFileRef(t, -1) == 0 {
				s.tops.remove(storage.FileDesc{Type: storage.TypeTable, Num: t})
			}
		}
	}

	timer := time.NewTimer(0)
	<-timer.C // discard the initial tick
	defer timer.Stop()

	// processTasks processes version tasks in strict order.
	//
	// If we want to use delta to reduce the cost of file references and dereferences,
	// we must strictly follow the id of the version, otherwise some files that are
	// being referenced will be deleted.
	//
	// In addition, some db operations (such as iterators) may cause a version to be
	// referenced for a long time. In order to prevent such operations from blocking
	// the entire processing queue, we will properly convert some of the version tasks
	// into full file references and releases.
	processTasks := func() {
		// Re-arm the cache-age timer; a tick re-runs processTasks so stale
		// cached tasks get converted even with no channel activity.
		timer.Reset(maxCachedTime)
		// Make sure we don't cache too many version tasks.
		for {
			// Skip any abandoned version number to prevent blocking processing.
			if skipAbandoned() {
				next += 1
				continue
			}
			// Don't bother the version that has been released.
			if _, exist := released[next]; exist {
				break
			}
			// Ensure the specified version has been referenced.
			if _, exist := ref[next]; !exist {
				break
			}
			// Keep caching while the backlog is short and the task is young.
			if last-next < maxCachedNumber && time.Since(ref[next].created) < maxCachedTime {
				break
			}
			// Convert version task into full file references and releases mode.
			// Reference version(i+1) first and wait version(i) to release.
			// FileRef(i+1) = FileRef(i) + Delta(i)
			for _, tt := range ref[next].files {
				for _, t := range tt {
					addFileRef(t.fd.Num, 1)
				}
			}
			// Note, if some compactions take a long time, even more than 5 minutes,
			// we may miss the corresponding delta information here.
			// Fortunately it will not affect the correctness of the file reference,
			// and we can apply the delta once we receive it.
			if d := deltas[next]; d != nil {
				applyDelta(d)
			}
			referenced[next] = struct{}{}
			delete(ref, next)
			delete(deltas, next)
			next += 1
		}

		// Use delta information to process all released versions.
		for {
			if skipAbandoned() {
				next += 1
				continue
			}
			if d, exist := released[next]; exist {
				if d != nil {
					applyDelta(d)
				}
				delete(released, next)
				next += 1
				continue
			}
			return
		}
	}

	for {
		processTasks()

		select {
		case t := <-s.refCh:
			if _, exist := ref[t.vid]; exist {
				panic("duplicate reference request")
			}
			ref[t.vid] = t
			if t.vid > last {
				last = t.vid
			}

		case d := <-s.deltaCh:
			if _, exist := ref[d.vid]; !exist {
				if _, exist2 := referenced[d.vid]; !exist2 {
					panic("invalid release request")
				}
				// The reference opt is already expired, apply
				// delta here.
				applyDelta(d)
				continue
			}
			deltas[d.vid] = d

		case t := <-s.relCh:
			// A version already converted to full-reference mode is
			// released by decrementing every file it referenced.
			if _, exist := referenced[t.vid]; exist {
				for _, tt := range t.files {
					for _, t := range tt {
						if addFileRef(t.fd.Num, -1) == 0 {
							s.tops.remove(t.fd)
						}
					}
				}
				delete(referenced, t.vid)
				continue
			}
			if _, exist := ref[t.vid]; !exist {
				panic("invalid release request")
			}
			// Still cached: queue the release with its delta (may be nil)
			// for ordered processing in processTasks.
			released[t.vid] = deltas[t.vid]
			delete(deltas, t.vid)
			delete(ref, t.vid)

		case id := <-s.abandon:
			if id >= next {
				abandoned[id] = struct{}{}
			}

		case <-timer.C:
			// Periodic wake-up only; processTasks at the top of the loop
			// will age out cached tasks.

		case r := <-s.fileRefCh:
			// Snapshot the current file reference counters for the caller.
			ref := make(map[int64]int)
			for f, c := range fileRef {
				ref[f] = c
			}
			r <- ref

		case <-s.closeC:
			s.closeW.Done()
			return
		}
	}
}

// Get current version. This will incr version ref, must call
// version.release (exactly once) after use.
func (s *session) version() *version {
	s.vmu.Lock()
	defer s.vmu.Unlock()
	s.stVersion.incref()
	return s.stVersion
}

// tLen returns the number of tables at the given level of the
// current version.
func (s *session) tLen(level int) int {
	s.vmu.Lock()
	defer s.vmu.Unlock()
	return s.stVersion.tLen(level)
}

// Set current version to v.
267func (s *session) setVersion(r *sessionRecord, v *version) { 268 s.vmu.Lock() 269 defer s.vmu.Unlock() 270 // Hold by session. It is important to call this first before releasing 271 // current version, otherwise the still used files might get released. 272 v.incref() 273 if s.stVersion != nil { 274 if r != nil { 275 var ( 276 added = make([]int64, 0, len(r.addedTables)) 277 deleted = make([]int64, 0, len(r.deletedTables)) 278 ) 279 for _, t := range r.addedTables { 280 added = append(added, t.num) 281 } 282 for _, t := range r.deletedTables { 283 deleted = append(deleted, t.num) 284 } 285 select { 286 case s.deltaCh <- &vDelta{vid: s.stVersion.id, added: added, deleted: deleted}: 287 case <-v.s.closeC: 288 s.log("reference loop already exist") 289 } 290 } 291 // Release current version. 292 s.stVersion.releaseNB() 293 } 294 s.stVersion = v 295} 296 297// Get current unused file number. 298func (s *session) nextFileNum() int64 { 299 return atomic.LoadInt64(&s.stNextFileNum) 300} 301 302// Set current unused file number to num. 303func (s *session) setNextFileNum(num int64) { 304 atomic.StoreInt64(&s.stNextFileNum, num) 305} 306 307// Mark file number as used. 308func (s *session) markFileNum(num int64) { 309 nextFileNum := num + 1 310 for { 311 old, x := atomic.LoadInt64(&s.stNextFileNum), nextFileNum 312 if old > x { 313 x = old 314 } 315 if atomic.CompareAndSwapInt64(&s.stNextFileNum, old, x) { 316 break 317 } 318 } 319} 320 321// Allocate a file number. 322func (s *session) allocFileNum() int64 { 323 return atomic.AddInt64(&s.stNextFileNum, 1) - 1 324} 325 326// Reuse given file number. 327func (s *session) reuseFileNum(num int64) { 328 for { 329 old, x := atomic.LoadInt64(&s.stNextFileNum), num 330 if old != x+1 { 331 x = old 332 } 333 if atomic.CompareAndSwapInt64(&s.stNextFileNum, old, x) { 334 break 335 } 336 } 337} 338 339// Set compaction ptr at given level; need external synchronization. 
340func (s *session) setCompPtr(level int, ik internalKey) { 341 if level >= len(s.stCompPtrs) { 342 newCompPtrs := make([]internalKey, level+1) 343 copy(newCompPtrs, s.stCompPtrs) 344 s.stCompPtrs = newCompPtrs 345 } 346 s.stCompPtrs[level] = append(internalKey{}, ik...) 347} 348 349// Get compaction ptr at given level; need external synchronization. 350func (s *session) getCompPtr(level int) internalKey { 351 if level >= len(s.stCompPtrs) { 352 return nil 353 } 354 return s.stCompPtrs[level] 355} 356 357// Manifest related utils. 358 359// Fill given session record obj with current states; need external 360// synchronization. 361func (s *session) fillRecord(r *sessionRecord, snapshot bool) { 362 r.setNextFileNum(s.nextFileNum()) 363 364 if snapshot { 365 if !r.has(recJournalNum) { 366 r.setJournalNum(s.stJournalNum) 367 } 368 369 if !r.has(recSeqNum) { 370 r.setSeqNum(s.stSeqNum) 371 } 372 373 for level, ik := range s.stCompPtrs { 374 if ik != nil { 375 r.addCompPtr(level, ik) 376 } 377 } 378 379 r.setComparer(s.icmp.uName()) 380 } 381} 382 383// Mark if record has been committed, this will update session state; 384// need external synchronization. 385func (s *session) recordCommited(rec *sessionRecord) { 386 if rec.has(recJournalNum) { 387 s.stJournalNum = rec.journalNum 388 } 389 390 if rec.has(recPrevJournalNum) { 391 s.stPrevJournalNum = rec.prevJournalNum 392 } 393 394 if rec.has(recSeqNum) { 395 s.stSeqNum = rec.seqNum 396 } 397 398 for _, r := range rec.compPtrs { 399 s.setCompPtr(r.level, internalKey(r.ikey)) 400 } 401} 402 403// Create a new manifest file; need external synchronization. 
// newManifest writes a brand-new manifest file containing rec (or a
// snapshot of the current state when rec is nil) against version v (or
// the current version when v is nil). On success the session switches
// to the new manifest and removes the old one; on failure the partial
// file is deleted and its file number reused.
func (s *session) newManifest(rec *sessionRecord, v *version) (err error) {
	fd := storage.FileDesc{Type: storage.TypeManifest, Num: s.allocFileNum()}
	writer, err := s.stor.Create(fd)
	if err != nil {
		return
	}
	jw := journal.NewWriter(writer)

	if v == nil {
		v = s.version()
		defer v.release()
	}
	if rec == nil {
		rec = &sessionRecord{}
	}
	// Snapshot session state and the version's table layout into rec.
	s.fillRecord(rec, true)
	v.fillRecord(rec)

	// Commit/abort handler: inspects the named return err, so it must run
	// after every return path below has assigned it.
	defer func() {
		if err == nil {
			s.recordCommited(rec)
			// Swap in the new manifest; close and remove the old one.
			if s.manifest != nil {
				s.manifest.Close()
			}
			if s.manifestWriter != nil {
				s.manifestWriter.Close()
			}
			if !s.manifestFd.Zero() {
				s.stor.Remove(s.manifestFd)
			}
			s.manifestFd = fd
			s.manifestWriter = writer
			s.manifest = jw
		} else {
			// Abort: discard the partial file and recycle its number.
			writer.Close()
			s.stor.Remove(fd)
			s.reuseFileNum(fd.Num)
		}
	}()

	w, err := jw.Next()
	if err != nil {
		return
	}
	err = rec.encode(w)
	if err != nil {
		return
	}
	err = jw.Flush()
	if err != nil {
		return
	}
	// Point the CURRENT meta file at the new manifest.
	err = s.stor.SetMeta(fd)
	return
}

// Flush record to disk.
func (s *session) flushManifest(rec *sessionRecord) (err error) {
	// Appends rec to the existing manifest journal; only a non-snapshot
	// fill is needed since the base state is already on disk.
	s.fillRecord(rec, false)
	w, err := s.manifest.Next()
	if err != nil {
		return
	}
	err = rec.encode(w)
	if err != nil {
		return
	}
	err = s.manifest.Flush()
	if err != nil {
		return
	}
	// Honor the NoSync option; otherwise make the record durable now.
	if !s.o.GetNoSync() {
		err = s.manifestWriter.Sync()
		if err != nil {
			return
		}
	}
	s.recordCommited(rec)
	return
}