1// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
2// All rights reserved.
3//
4// Use of this source code is governed by a BSD-style license that can be
5// found in the LICENSE file.
6
7package leveldb
8
9import (
10	"fmt"
11	"sync/atomic"
12	"unsafe"
13
14	"github.com/syndtr/goleveldb/leveldb/iterator"
15	"github.com/syndtr/goleveldb/leveldb/opt"
16	"github.com/syndtr/goleveldb/leveldb/util"
17)
18
// tSet pairs a table file with the level it resides on. It is used to
// record a seek-compaction candidate in version.cSeek.
type tSet struct {
	level int
	table *tFile
}
23
// version is an immutable snapshot of the set of table files that make
// up the database at a point in time. Versions are reference counted;
// the table files they list are pinned while the version is referenced.
type version struct {
	s *session // back-reference to the owning session

	// Table files per level; level 0 tables may overlap each other,
	// deeper levels are sorted and disjoint.
	levels []tFiles

	// Level that should be compacted next and its compaction score.
	// Score < 1 means compaction is not strictly needed. These fields
	// are initialized by computeCompaction()
	cLevel int
	cScore float64

	// Seek-compaction candidate (*tSet), accessed atomically.
	cSeek unsafe.Pointer

	closing  bool // set while the DB is closing; get() then returns ErrClosed
	ref      int  // reference count; mutated under s.vmu (see release)
	released bool // set once the last reference has been dropped
}
41
42func newVersion(s *session) *version {
43	return &version{s: s}
44}
45
// incref increments the version's reference count. Taking the first
// reference also takes a reference on every table file in the version,
// keeping those files alive while the version is in use.
// NOTE(review): appears to rely on the caller holding s.vmu, as
// release() does around releaseNB — confirm at call sites.
// Panics if the version was already released.
func (v *version) incref() {
	if v.released {
		panic("already released")
	}

	v.ref++
	if v.ref == 1 {
		// Incr file ref.
		for _, tt := range v.levels {
			for _, t := range tt {
				v.s.addFileRef(t.fd, 1)
			}
		}
	}
}
61
// releaseNB decrements the reference count without acquiring the
// session's version mutex (NB presumably "non-blocking"; release() is
// the locking wrapper, so callers must hold s.vmu themselves).
// When the count reaches zero, the reference taken on each table file
// by incref is dropped, and files no longer referenced by anything are
// removed from the table cache. Panics if the count goes negative.
func (v *version) releaseNB() {
	v.ref--
	if v.ref > 0 {
		return
	} else if v.ref < 0 {
		panic("negative version ref")
	}

	for _, tt := range v.levels {
		for _, t := range tt {
			// Drop the per-file reference; evict the table from the
			// cache once no version references it anymore.
			if v.s.addFileRef(t.fd, -1) == 0 {
				v.s.tops.remove(t)
			}
		}
	}

	v.released = true
}
80
81func (v *version) release() {
82	v.s.vmu.Lock()
83	v.releaseNB()
84	v.s.vmu.Unlock()
85}
86
// walkOverlapping visits every table whose key range may contain the
// user key of ikey. The aux tables, if any, are walked first as pseudo
// level -1, followed by each real level in order. f is invoked for each
// candidate table; lf, if non-nil, is invoked after each level is done.
// Returning false from either callback stops the walk immediately.
func (v *version) walkOverlapping(aux tFiles, ikey internalKey, f func(level int, t *tFile) bool, lf func(level int) bool) {
	ukey := ikey.ukey()

	// Aux level.
	if aux != nil {
		for _, t := range aux {
			if t.overlaps(v.s.icmp, ukey, ukey) {
				if !f(-1, t) {
					return
				}
			}
		}

		if lf != nil && !lf(-1) {
			return
		}
	}

	// Walk tables level-by-level.
	for level, tables := range v.levels {
		if len(tables) == 0 {
			continue
		}

		if level == 0 {
			// Level-0 files may overlap each other. Find all files that
			// overlap ukey.
			for _, t := range tables {
				if t.overlaps(v.s.icmp, ukey, ukey) {
					if !f(level, t) {
						return
					}
				}
			}
		} else {
			// Deeper levels are sorted and disjoint, so at most one
			// table can contain ukey: the first whose max key is >= ikey,
			// provided its min key is also <= ukey.
			if i := tables.searchMax(v.s.icmp, ikey); i < len(tables) {
				t := tables[i]
				if v.s.icmp.uCompare(ukey, t.imin.ukey()) >= 0 {
					if !f(level, t) {
						return
					}
				}
			}
		}

		if lf != nil && !lf(level) {
			return
		}
	}
}
137
// get searches the version's tables (plus the aux tables, if non-nil)
// for ikey and returns the stored value. With noValue set, only
// existence is checked and value stays nil. tcomp reports whether this
// lookup nominated a table for seek-triggered compaction. err is
// ErrNotFound when no live entry for the key exists, ErrClosed when the
// version is closing.
func (v *version) get(aux tFiles, ikey internalKey, ro *opt.ReadOptions, noValue bool) (value []byte, tcomp bool, err error) {
	if v.closing {
		return nil, false, ErrClosed
	}

	ukey := ikey.ukey()

	var (
		// First table consulted, and whether a second table had to be
		// consulted (meaning the first one cost a wasted seek).
		tset  *tSet
		tseek bool

		// Level-0: newest matching entry seen so far.
		zfound bool
		zseq   uint64
		zkt    keyType
		zval   []byte
	)

	err = ErrNotFound

	// Since entries never hop across level, finding key/value
	// in smaller level make later levels irrelevant.
	v.walkOverlapping(aux, ikey, func(level int, t *tFile) bool {
		if level >= 0 && !tseek {
			if tset == nil {
				tset = &tSet{level, t}
			} else {
				tseek = true
			}
		}

		var (
			fikey, fval []byte
			ferr        error
		)
		if noValue {
			fikey, ferr = v.s.tops.findKey(t, ikey, ro)
		} else {
			fikey, fval, ferr = v.s.tops.find(t, ikey, ro)
		}

		switch ferr {
		case nil:
		case ErrNotFound:
			// Not in this table; keep walking.
			return true
		default:
			err = ferr
			return false
		}

		if fukey, fseq, fkt, fkerr := parseInternalKey(fikey); fkerr == nil {
			if v.s.icmp.uCompare(ukey, fukey) == 0 {
				// Tables at level <= 0 may overlap each other: keep the
				// entry with the highest sequence number and decide only
				// once the whole level has been walked (see lf below).
				if level <= 0 {
					if fseq >= zseq {
						zfound = true
						zseq = fseq
						zkt = fkt
						zval = fval
					}
				} else {
					// Deeper levels are disjoint: the first match is final.
					switch fkt {
					case keyTypeVal:
						value = fval
						err = nil
					case keyTypeDel:
						// Deletion marker: key is absent (err stays ErrNotFound).
					default:
						panic("leveldb: invalid internalKey type")
					}
					return false
				}
			}
		} else {
			err = fkerr
			return false
		}

		return true
	}, func(level int) bool {
		// Resolve the buffered level <= 0 result, if any, before moving
		// on to the next level.
		if zfound {
			switch zkt {
			case keyTypeVal:
				value = zval
				err = nil
			case keyTypeDel:
			default:
				panic("leveldb: invalid internalKey type")
			}
			return false
		}

		return true
	})

	// Charge the first table consulted for the wasted seek; once its
	// seek allowance runs out, propose it as the seek-compaction
	// candidate (only one candidate is kept at a time).
	if tseek && tset.table.consumeSeek() <= 0 {
		tcomp = atomic.CompareAndSwapPointer(&v.cSeek, nil, unsafe.Pointer(tset))
	}

	return
}
238
// sampleSeek simulates the seek cost of a read at ikey: when more than
// one table overlaps the key, the first table is charged a seek and,
// once its seek allowance is exhausted, proposed as the seek-compaction
// candidate. tcomp reports whether a candidate was recorded.
func (v *version) sampleSeek(ikey internalKey) (tcomp bool) {
	var tset *tSet

	v.walkOverlapping(nil, ikey, func(level int, t *tFile) bool {
		if tset == nil {
			// Remember the first overlapping table.
			tset = &tSet{level, t}
			return true
		}
		// A second table overlaps: the first one would have cost a
		// wasted seek on a real read.
		if tset.table.consumeSeek() <= 0 {
			tcomp = atomic.CompareAndSwapPointer(&v.cSeek, nil, unsafe.Pointer(tset))
		}
		return false
	}, nil)

	return
}
255
256func (v *version) getIterators(slice *util.Range, ro *opt.ReadOptions) (its []iterator.Iterator) {
257	strict := opt.GetStrict(v.s.o.Options, ro, opt.StrictReader)
258	for level, tables := range v.levels {
259		if level == 0 {
260			// Merge all level zero files together since they may overlap.
261			for _, t := range tables {
262				its = append(its, v.s.tops.newIterator(t, slice, ro))
263			}
264		} else if len(tables) != 0 {
265			its = append(its, iterator.NewIndexedIterator(tables.newIndexIterator(v.s.tops, v.s.icmp, slice, ro), strict))
266		}
267	}
268	return
269}
270
271func (v *version) newStaging() *versionStaging {
272	return &versionStaging{base: v}
273}
274
275// Spawn a new version based on this version.
276func (v *version) spawn(r *sessionRecord) *version {
277	staging := v.newStaging()
278	staging.commit(r)
279	return staging.finish()
280}
281
282func (v *version) fillRecord(r *sessionRecord) {
283	for level, tables := range v.levels {
284		for _, t := range tables {
285			r.addTableFile(level, t)
286		}
287	}
288}
289
290func (v *version) tLen(level int) int {
291	if level < len(v.levels) {
292		return len(v.levels[level])
293	}
294	return 0
295}
296
// offsetOf returns the approximate byte offset of ikey within the
// database: the total size of all table data that sorts before ikey.
// Used for size-estimation queries.
func (v *version) offsetOf(ikey internalKey) (n int64, err error) {
	for level, tables := range v.levels {
		for _, t := range tables {
			if v.s.icmp.Compare(t.imax, ikey) <= 0 {
				// Entire file is before "ikey", so just add the file size
				n += t.size
			} else if v.s.icmp.Compare(t.imin, ikey) > 0 {
				// Entire file is after "ikey", so ignore
				if level > 0 {
					// Files other than level 0 are sorted by meta->min, so
					// no further files in this level will contain data for
					// "ikey".
					break
				}
			} else {
				// "ikey" falls in the range for this table. Add the
				// approximate offset of "ikey" within the table.
				if m, err := v.s.tops.offsetOf(t, ikey); err == nil {
					n += m
				} else {
					return 0, err
				}
			}
		}
	}

	return
}
325
// pickMemdbLevel picks the level, up to maxLevel, at which tables built
// from a frozen memdb covering user keys [umin,umax] should be placed.
// A memdb overlapping level 0 must stay at level 0; otherwise it may be
// pushed deeper as long as it does not overlap the next level and its
// overlap with the grandparent level stays under the configured limit.
func (v *version) pickMemdbLevel(umin, umax []byte, maxLevel int) (level int) {
	if maxLevel > 0 {
		if len(v.levels) == 0 {
			// No tables at all: place as deep as allowed.
			return maxLevel
		}
		if !v.levels[0].overlaps(v.s.icmp, umin, umax, true) {
			var overlaps tFiles
			for ; level < maxLevel; level++ {
				if pLevel := level + 1; pLevel >= len(v.levels) {
					// The next level does not exist, so any deeper
					// placement is conflict-free.
					return maxLevel
				} else if v.levels[pLevel].overlaps(v.s.icmp, umin, umax, false) {
					// Would overlap the next level; stop here.
					break
				}
				if gpLevel := level + 2; gpLevel < len(v.levels) {
					// Bound the overlap with the grandparent level to
					// limit the cost of a future compaction at pLevel.
					overlaps = v.levels[gpLevel].getOverlaps(overlaps, v.s.icmp, umin, umax, false)
					if overlaps.size() > int64(v.s.o.GetCompactionGPOverlaps(level)) {
						break
					}
				}
			}
		}
	}
	return
}
350
// computeCompaction determines which level most needs compaction and
// stores the winning level and its score in cLevel/cScore (a score >= 1
// means compaction is needed). It also logs per-level statistics.
func (v *version) computeCompaction() {
	// Precomputed best level for next compaction
	bestLevel := int(-1)
	bestScore := float64(-1)

	// Per-level statistics for the log line below.
	statFiles := make([]int, len(v.levels))
	statSizes := make([]string, len(v.levels))
	statScore := make([]string, len(v.levels))
	statTotSize := int64(0)

	for level, tables := range v.levels {
		var score float64
		size := tables.size()
		if level == 0 {
			// We treat level-0 specially by bounding the number of files
			// instead of number of bytes for two reasons:
			//
			// (1) With larger write-buffer sizes, it is nice not to do too
			// many level-0 compaction.
			//
			// (2) The files in level-0 are merged on every read and
			// therefore we wish to avoid too many files when the individual
			// file size is small (perhaps because of a small write-buffer
			// setting, or very high compression ratios, or lots of
			// overwrites/deletions).
			score = float64(len(tables)) / float64(v.s.o.GetCompactionL0Trigger())
		} else {
			score = float64(size) / float64(v.s.o.GetCompactionTotalSize(level))
		}

		if score > bestScore {
			bestLevel = level
			bestScore = score
		}

		statFiles[level] = len(tables)
		statSizes[level] = shortenb(int(size))
		statScore[level] = fmt.Sprintf("%.2f", score)
		statTotSize += size
	}

	v.cLevel = bestLevel
	v.cScore = bestScore

	v.s.logf("version@stat F·%v S·%s%v Sc·%v", statFiles, shortenb(int(statTotSize)), statSizes, statScore)
}
397
398func (v *version) needCompaction() bool {
399	return v.cScore >= 1 || atomic.LoadPointer(&v.cSeek) != nil
400}
401
// tablesScratch accumulates table additions and deletions for a single
// level, keyed by table file number, while staging a new version.
type tablesScratch struct {
	added   map[int64]atRecord
	deleted map[int64]struct{}
}
406
// versionStaging stages session-record changes on top of a base
// version; finish() materializes the resulting new version.
type versionStaging struct {
	base   *version
	levels []tablesScratch // per-level pending changes, indexed by level
}
411
412func (p *versionStaging) getScratch(level int) *tablesScratch {
413	if level >= len(p.levels) {
414		newLevels := make([]tablesScratch, level+1)
415		copy(newLevels, p.levels)
416		p.levels = newLevels
417	}
418	return &(p.levels[level])
419}
420
// commit merges the table changes of a session record into the staging
// area. Deletions are applied before additions, so a table deleted and
// re-added by the same record ends up added.
func (p *versionStaging) commit(r *sessionRecord) {
	// Deleted tables.
	for _, r := range r.deletedTables {
		scratch := p.getScratch(r.level)
		// Only track the deletion when the base level could actually
		// contain the table; otherwise the deletion merely cancels a
		// previously staged addition.
		if r.level < len(p.base.levels) && len(p.base.levels[r.level]) > 0 {
			if scratch.deleted == nil {
				scratch.deleted = make(map[int64]struct{})
			}
			scratch.deleted[r.num] = struct{}{}
		}
		if scratch.added != nil {
			delete(scratch.added, r.num)
		}
	}

	// New tables.
	for _, r := range r.addedTables {
		scratch := p.getScratch(r.level)
		if scratch.added == nil {
			scratch.added = make(map[int64]atRecord)
		}
		scratch.added[r.num] = r
		// An addition supersedes any previously staged deletion.
		if scratch.deleted != nil {
			delete(scratch.deleted, r.num)
		}
	}
}
448
449func (p *versionStaging) finish() *version {
450	// Build new version.
451	nv := newVersion(p.base.s)
452	numLevel := len(p.levels)
453	if len(p.base.levels) > numLevel {
454		numLevel = len(p.base.levels)
455	}
456	nv.levels = make([]tFiles, numLevel)
457	for level := 0; level < numLevel; level++ {
458		var baseTabels tFiles
459		if level < len(p.base.levels) {
460			baseTabels = p.base.levels[level]
461		}
462
463		if level < len(p.levels) {
464			scratch := p.levels[level]
465
466			var nt tFiles
467			// Prealloc list if possible.
468			if n := len(baseTabels) + len(scratch.added) - len(scratch.deleted); n > 0 {
469				nt = make(tFiles, 0, n)
470			}
471
472			// Base tables.
473			for _, t := range baseTabels {
474				if _, ok := scratch.deleted[t.fd.Num]; ok {
475					continue
476				}
477				if _, ok := scratch.added[t.fd.Num]; ok {
478					continue
479				}
480				nt = append(nt, t)
481			}
482
483			// New tables.
484			for _, r := range scratch.added {
485				nt = append(nt, tableFileFromRecord(r))
486			}
487
488			if len(nt) != 0 {
489				// Sort tables.
490				if level == 0 {
491					nt.sortByNum()
492				} else {
493					nt.sortByKey(p.base.s.icmp)
494				}
495
496				nv.levels[level] = nt
497			}
498		} else {
499			nv.levels[level] = baseTabels
500		}
501	}
502
503	// Trim levels.
504	n := len(nv.levels)
505	for ; n > 0 && nv.levels[n-1] == nil; n-- {
506	}
507	nv.levels = nv.levels[:n]
508
509	// Compute compaction score for new version.
510	nv.computeCompaction()
511
512	return nv
513}
514
// versionReleaser wraps a version so that it is released at most once;
// presumably it satisfies a Releaser-style interface — confirm against
// callers.
type versionReleaser struct {
	v    *version
	once bool // set after the first Release call
}
519
520func (vr *versionReleaser) Release() {
521	v := vr.v
522	v.s.vmu.Lock()
523	if !vr.once {
524		v.releaseNB()
525		vr.once = true
526	}
527	v.s.vmu.Unlock()
528}
529