// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

package leveldb

import (
	"fmt"
	"sync/atomic"
	"unsafe"

	"github.com/syndtr/goleveldb/leveldb/iterator"
	"github.com/syndtr/goleveldb/leveldb/opt"
	"github.com/syndtr/goleveldb/leveldb/util"
)

// tSet pairs a table file with the level it lives on; used to remember
// the table that is a candidate for seek-triggered compaction.
type tSet struct {
	level int
	table *tFile
}

// version is a snapshot of the set of table files, per level, at one
// point in time. Versions are reference counted (see incref/release);
// while a version is referenced, the table files it points to are
// pinned in the table cache.
type version struct {
	s *session

	// Table files for each level; levels[0] may contain overlapping
	// files, higher levels are sorted and disjoint (see walkOverlapping).
	levels []tFiles

	// Level that should be compacted next and its compaction score.
	// Score < 1 means compaction is not strictly needed. These fields
	// are initialized by computeCompaction()
	cLevel int
	cScore float64

	// Pending seek-compaction candidate; holds a *tSet, written with
	// CAS so only one candidate is scheduled at a time.
	cSeek unsafe.Pointer

	closing  bool
	ref      int
	released bool
}

// newVersion creates an empty version bound to the session.
func newVersion(s *session) *version {
	return &version{s: s}
}

// incref increments the version's reference count. The first reference
// also increments the ref count of every table file in the version so
// the files cannot be evicted while this version is alive.
// NOTE(review): callers appear to hold s.vmu here (see release and
// versionReleaser.Release) — confirm before relying on it.
func (v *version) incref() {
	if v.released {
		panic("already released")
	}

	v.ref++
	if v.ref == 1 {
		// Incr file ref.
		for _, tt := range v.levels {
			for _, t := range tt {
				v.s.addFileRef(t.fd, 1)
			}
		}
	}
}

// releaseNB decrements the reference count without taking the session's
// version mutex; the caller is expected to hold s.vmu (see release).
// When the count reaches zero, every table file's ref is dropped and
// files whose ref hits zero are removed from the table cache.
func (v *version) releaseNB() {
	v.ref--
	if v.ref > 0 {
		return
	} else if v.ref < 0 {
		panic("negative version ref")
	}

	for _, tt := range v.levels {
		for _, t := range tt {
			if v.s.addFileRef(t.fd, -1) == 0 {
				v.s.tops.remove(t)
			}
		}
	}

	v.released = true
}

// release decrements the reference count under the session's version
// mutex.
func (v *version) release() {
	v.s.vmu.Lock()
	v.releaseNB()
	v.s.vmu.Unlock()
}

// walkOverlapping visits every table that may contain the user key of
// ikey, level by level. If aux is non-nil it is walked first as a
// pseudo level -1. f is invoked for each candidate table; returning
// false stops the walk. lf, if non-nil, is invoked after each finished
// level; returning false also stops the walk.
func (v *version) walkOverlapping(aux tFiles, ikey internalKey, f func(level int, t *tFile) bool, lf func(level int) bool) {
	ukey := ikey.ukey()

	// Aux level.
	if aux != nil {
		for _, t := range aux {
			if t.overlaps(v.s.icmp, ukey, ukey) {
				if !f(-1, t) {
					return
				}
			}
		}

		if lf != nil && !lf(-1) {
			return
		}
	}

	// Walk tables level-by-level.
	for level, tables := range v.levels {
		if len(tables) == 0 {
			continue
		}

		if level == 0 {
			// Level-0 files may overlap each other. Find all files that
			// overlap ukey.
			for _, t := range tables {
				if t.overlaps(v.s.icmp, ukey, ukey) {
					if !f(level, t) {
						return
					}
				}
			}
		} else {
			// Higher levels are sorted and disjoint: binary-search the
			// single table whose range could contain ikey.
			if i := tables.searchMax(v.s.icmp, ikey); i < len(tables) {
				t := tables[i]
				if v.s.icmp.uCompare(ukey, t.imin.ukey()) >= 0 {
					if !f(level, t) {
						return
					}
				}
			}
		}

		if lf != nil && !lf(level) {
			return
		}
	}
}

// get looks ikey up in the version's tables (aux first, if given) and
// returns the stored value. When noValue is true only the key lookup
// is performed and value stays nil. tcomp reports whether this lookup
// scheduled a seek-triggered compaction as a side effect. Returns
// ErrNotFound when the key is absent or deleted.
func (v *version) get(aux tFiles, ikey internalKey, ro *opt.ReadOptions, noValue bool) (value []byte, tcomp bool, err error) {
	if v.closing {
		return nil, false, ErrClosed
	}

	ukey := ikey.ukey()

	var (
		// First table consulted; becomes the seek-compaction candidate
		// once a second table has to be consulted (tseek).
		tset  *tSet
		tseek bool

		// Level-0 (and aux) state: overlapping files mean the match
		// with the highest sequence number wins; collected here and
		// resolved in the per-level callback below.
		zfound bool
		zseq   uint64
		zkt    keyType
		zval   []byte
	)

	err = ErrNotFound

	// Since entries never hop across level, finding key/value
	// in smaller level make later levels irrelevant.
	v.walkOverlapping(aux, ikey, func(level int, t *tFile) bool {
		// Charge the seek to the first real table searched; aux tables
		// (level -1) are excluded.
		if level >= 0 && !tseek {
			if tset == nil {
				tset = &tSet{level, t}
			} else {
				tseek = true
			}
		}

		var (
			fikey, fval []byte
			ferr        error
		)
		if noValue {
			fikey, ferr = v.s.tops.findKey(t, ikey, ro)
		} else {
			fikey, fval, ferr = v.s.tops.find(t, ikey, ro)
		}

		switch ferr {
		case nil:
		case ErrNotFound:
			// Not in this table; keep walking.
			return true
		default:
			err = ferr
			return false
		}

		if fukey, fseq, fkt, fkerr := parseInternalKey(fikey); fkerr == nil {
			if v.s.icmp.uCompare(ukey, fukey) == 0 {
				// Level <= 0 files may overlap each other: remember the
				// newest match (highest seq) and decide once the level
				// has been fully walked.
				if level <= 0 {
					if fseq >= zseq {
						zfound = true
						zseq = fseq
						zkt = fkt
						zval = fval
					}
				} else {
					switch fkt {
					case keyTypeVal:
						value = fval
						err = nil
					case keyTypeDel:
						// Deletion marker: err stays ErrNotFound.
					default:
						panic("leveldb: invalid internalKey type")
					}
					return false
				}
			}
		} else {
			err = fkerr
			return false
		}

		return true
	}, func(level int) bool {
		// Level finished: a level<=0 match, if any, is authoritative.
		if zfound {
			switch zkt {
			case keyTypeVal:
				value = zval
				err = nil
			case keyTypeDel:
				// Deletion marker: err stays ErrNotFound.
			default:
				panic("leveldb: invalid internalKey type")
			}
			return false
		}

		return true
	})

	// If more than one table was consulted, charge a seek against the
	// first one; when its allowance runs out, schedule it for seek
	// compaction (CAS keeps at most one pending candidate).
	if tseek && tset.table.consumeSeek() <= 0 {
		tcomp = atomic.CompareAndSwapPointer(&v.cSeek, nil, unsafe.Pointer(tset))
	}

	return
}

// sampleSeek charges a seek to the first table overlapping ikey when a
// second overlapping table exists, possibly scheduling it for seek
// compaction — mirroring the accounting done by get.
func (v *version) sampleSeek(ikey internalKey) (tcomp bool) {
	var tset *tSet

	v.walkOverlapping(nil, ikey, func(level int, t *tFile) bool {
		if tset == nil {
			tset = &tSet{level, t}
			return true
		}
		// Second overlapping table found; charge the first one.
		if tset.table.consumeSeek() <= 0 {
			tcomp = atomic.CompareAndSwapPointer(&v.cSeek, nil, unsafe.Pointer(tset))
		}
		return false
	}, nil)

	return
}

// getIterators returns one iterator per level-0 table plus one indexed
// iterator per non-empty higher level, restricted to slice.
func (v *version) getIterators(slice *util.Range, ro *opt.ReadOptions) (its []iterator.Iterator) {
	strict := opt.GetStrict(v.s.o.Options, ro, opt.StrictReader)
	for level, tables := range v.levels {
		if level == 0 {
			// Merge all level zero files together since they may overlap.
			for _, t := range tables {
				its = append(its, v.s.tops.newIterator(t, slice, ro))
			}
		} else if len(tables) != 0 {
			its = append(its, iterator.NewIndexedIterator(tables.newIndexIterator(v.s.tops, v.s.icmp, slice, ro), strict))
		}
	}
	return
}

// newStaging returns an empty versionStaging based on this version.
func (v *version) newStaging() *versionStaging {
	return &versionStaging{base: v}
}

// Spawn a new version based on this version.
276func (v *version) spawn(r *sessionRecord) *version { 277 staging := v.newStaging() 278 staging.commit(r) 279 return staging.finish() 280} 281 282func (v *version) fillRecord(r *sessionRecord) { 283 for level, tables := range v.levels { 284 for _, t := range tables { 285 r.addTableFile(level, t) 286 } 287 } 288} 289 290func (v *version) tLen(level int) int { 291 if level < len(v.levels) { 292 return len(v.levels[level]) 293 } 294 return 0 295} 296 297func (v *version) offsetOf(ikey internalKey) (n int64, err error) { 298 for level, tables := range v.levels { 299 for _, t := range tables { 300 if v.s.icmp.Compare(t.imax, ikey) <= 0 { 301 // Entire file is before "ikey", so just add the file size 302 n += t.size 303 } else if v.s.icmp.Compare(t.imin, ikey) > 0 { 304 // Entire file is after "ikey", so ignore 305 if level > 0 { 306 // Files other than level 0 are sorted by meta->min, so 307 // no further files in this level will contain data for 308 // "ikey". 309 break 310 } 311 } else { 312 // "ikey" falls in the range for this table. Add the 313 // approximate offset of "ikey" within the table. 
314 if m, err := v.s.tops.offsetOf(t, ikey); err == nil { 315 n += m 316 } else { 317 return 0, err 318 } 319 } 320 } 321 } 322 323 return 324} 325 326func (v *version) pickMemdbLevel(umin, umax []byte, maxLevel int) (level int) { 327 if maxLevel > 0 { 328 if len(v.levels) == 0 { 329 return maxLevel 330 } 331 if !v.levels[0].overlaps(v.s.icmp, umin, umax, true) { 332 var overlaps tFiles 333 for ; level < maxLevel; level++ { 334 if pLevel := level + 1; pLevel >= len(v.levels) { 335 return maxLevel 336 } else if v.levels[pLevel].overlaps(v.s.icmp, umin, umax, false) { 337 break 338 } 339 if gpLevel := level + 2; gpLevel < len(v.levels) { 340 overlaps = v.levels[gpLevel].getOverlaps(overlaps, v.s.icmp, umin, umax, false) 341 if overlaps.size() > int64(v.s.o.GetCompactionGPOverlaps(level)) { 342 break 343 } 344 } 345 } 346 } 347 } 348 return 349} 350 351func (v *version) computeCompaction() { 352 // Precomputed best level for next compaction 353 bestLevel := int(-1) 354 bestScore := float64(-1) 355 356 statFiles := make([]int, len(v.levels)) 357 statSizes := make([]string, len(v.levels)) 358 statScore := make([]string, len(v.levels)) 359 statTotSize := int64(0) 360 361 for level, tables := range v.levels { 362 var score float64 363 size := tables.size() 364 if level == 0 { 365 // We treat level-0 specially by bounding the number of files 366 // instead of number of bytes for two reasons: 367 // 368 // (1) With larger write-buffer sizes, it is nice not to do too 369 // many level-0 compaction. 370 // 371 // (2) The files in level-0 are merged on every read and 372 // therefore we wish to avoid too many files when the individual 373 // file size is small (perhaps because of a small write-buffer 374 // setting, or very high compression ratios, or lots of 375 // overwrites/deletions). 
376 score = float64(len(tables)) / float64(v.s.o.GetCompactionL0Trigger()) 377 } else { 378 score = float64(size) / float64(v.s.o.GetCompactionTotalSize(level)) 379 } 380 381 if score > bestScore { 382 bestLevel = level 383 bestScore = score 384 } 385 386 statFiles[level] = len(tables) 387 statSizes[level] = shortenb(int(size)) 388 statScore[level] = fmt.Sprintf("%.2f", score) 389 statTotSize += size 390 } 391 392 v.cLevel = bestLevel 393 v.cScore = bestScore 394 395 v.s.logf("version@stat F·%v S·%s%v Sc·%v", statFiles, shortenb(int(statTotSize)), statSizes, statScore) 396} 397 398func (v *version) needCompaction() bool { 399 return v.cScore >= 1 || atomic.LoadPointer(&v.cSeek) != nil 400} 401 402type tablesScratch struct { 403 added map[int64]atRecord 404 deleted map[int64]struct{} 405} 406 407type versionStaging struct { 408 base *version 409 levels []tablesScratch 410} 411 412func (p *versionStaging) getScratch(level int) *tablesScratch { 413 if level >= len(p.levels) { 414 newLevels := make([]tablesScratch, level+1) 415 copy(newLevels, p.levels) 416 p.levels = newLevels 417 } 418 return &(p.levels[level]) 419} 420 421func (p *versionStaging) commit(r *sessionRecord) { 422 // Deleted tables. 423 for _, r := range r.deletedTables { 424 scratch := p.getScratch(r.level) 425 if r.level < len(p.base.levels) && len(p.base.levels[r.level]) > 0 { 426 if scratch.deleted == nil { 427 scratch.deleted = make(map[int64]struct{}) 428 } 429 scratch.deleted[r.num] = struct{}{} 430 } 431 if scratch.added != nil { 432 delete(scratch.added, r.num) 433 } 434 } 435 436 // New tables. 437 for _, r := range r.addedTables { 438 scratch := p.getScratch(r.level) 439 if scratch.added == nil { 440 scratch.added = make(map[int64]atRecord) 441 } 442 scratch.added[r.num] = r 443 if scratch.deleted != nil { 444 delete(scratch.deleted, r.num) 445 } 446 } 447} 448 449func (p *versionStaging) finish() *version { 450 // Build new version. 
451 nv := newVersion(p.base.s) 452 numLevel := len(p.levels) 453 if len(p.base.levels) > numLevel { 454 numLevel = len(p.base.levels) 455 } 456 nv.levels = make([]tFiles, numLevel) 457 for level := 0; level < numLevel; level++ { 458 var baseTabels tFiles 459 if level < len(p.base.levels) { 460 baseTabels = p.base.levels[level] 461 } 462 463 if level < len(p.levels) { 464 scratch := p.levels[level] 465 466 var nt tFiles 467 // Prealloc list if possible. 468 if n := len(baseTabels) + len(scratch.added) - len(scratch.deleted); n > 0 { 469 nt = make(tFiles, 0, n) 470 } 471 472 // Base tables. 473 for _, t := range baseTabels { 474 if _, ok := scratch.deleted[t.fd.Num]; ok { 475 continue 476 } 477 if _, ok := scratch.added[t.fd.Num]; ok { 478 continue 479 } 480 nt = append(nt, t) 481 } 482 483 // New tables. 484 for _, r := range scratch.added { 485 nt = append(nt, tableFileFromRecord(r)) 486 } 487 488 if len(nt) != 0 { 489 // Sort tables. 490 if level == 0 { 491 nt.sortByNum() 492 } else { 493 nt.sortByKey(p.base.s.icmp) 494 } 495 496 nv.levels[level] = nt 497 } 498 } else { 499 nv.levels[level] = baseTabels 500 } 501 } 502 503 // Trim levels. 504 n := len(nv.levels) 505 for ; n > 0 && nv.levels[n-1] == nil; n-- { 506 } 507 nv.levels = nv.levels[:n] 508 509 // Compute compaction score for new version. 510 nv.computeCompaction() 511 512 return nv 513} 514 515type versionReleaser struct { 516 v *version 517 once bool 518} 519 520func (vr *versionReleaser) Release() { 521 v := vr.v 522 v.s.vmu.Lock() 523 if !vr.once { 524 v.releaseNB() 525 vr.once = true 526 } 527 v.s.vmu.Unlock() 528} 529