1// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
2// All rights reserved.
3//
4// Use of this source code is governed by a BSD-style license that can be
5// found in the LICENSE file.
6
7package leveldb
8
9import (
10	"bytes"
11	"fmt"
12	"sort"
13	"sync/atomic"
14
15	"github.com/syndtr/goleveldb/leveldb/cache"
16	"github.com/syndtr/goleveldb/leveldb/iterator"
17	"github.com/syndtr/goleveldb/leveldb/opt"
18	"github.com/syndtr/goleveldb/leveldb/storage"
19	"github.com/syndtr/goleveldb/leveldb/table"
20	"github.com/syndtr/goleveldb/leveldb/util"
21)
22
// tFile holds basic information about a table.
type tFile struct {
	fd         storage.FileDesc // file descriptor (type and number) of the table file
	seekLeft   int32            // seeks remaining before a seek compaction triggers; accessed atomically
	size       int64            // table file size in bytes
	imin, imax internalKey      // smallest and largest internal keys stored in the table
}
30
31// Returns true if given key is after largest key of this table.
32func (t *tFile) after(icmp *iComparer, ukey []byte) bool {
33	return ukey != nil && icmp.uCompare(ukey, t.imax.ukey()) > 0
34}
35
36// Returns true if given key is before smallest key of this table.
37func (t *tFile) before(icmp *iComparer, ukey []byte) bool {
38	return ukey != nil && icmp.uCompare(ukey, t.imin.ukey()) < 0
39}
40
41// Returns true if given key range overlaps with this table key range.
42func (t *tFile) overlaps(icmp *iComparer, umin, umax []byte) bool {
43	return !t.after(icmp, umin) && !t.before(icmp, umax)
44}
45
// Consumes one seek and returns the number of seeks left.
// The decrement is atomic, so concurrent readers may charge seeks safely.
func (t *tFile) consumeSeek() int32 {
	return atomic.AddInt32(&t.seekLeft, -1)
}
50
51// Creates new tFile.
52func newTableFile(fd storage.FileDesc, size int64, imin, imax internalKey) *tFile {
53	f := &tFile{
54		fd:   fd,
55		size: size,
56		imin: imin,
57		imax: imax,
58	}
59
60	// We arrange to automatically compact this file after
61	// a certain number of seeks.  Let's assume:
62	//   (1) One seek costs 10ms
63	//   (2) Writing or reading 1MB costs 10ms (100MB/s)
64	//   (3) A compaction of 1MB does 25MB of IO:
65	//         1MB read from this level
66	//         10-12MB read from next level (boundaries may be misaligned)
67	//         10-12MB written to next level
68	// This implies that 25 seeks cost the same as the compaction
69	// of 1MB of data.  I.e., one seek costs approximately the
70	// same as the compaction of 40KB of data.  We are a little
71	// conservative and allow approximately one seek for every 16KB
72	// of data before triggering a compaction.
73	f.seekLeft = int32(size / 16384)
74	if f.seekLeft < 100 {
75		f.seekLeft = 100
76	}
77
78	return f
79}
80
81func tableFileFromRecord(r atRecord) *tFile {
82	return newTableFile(storage.FileDesc{Type: storage.TypeTable, Num: r.num}, r.size, r.imin, r.imax)
83}
84
// tFiles hold multiple tFile.
type tFiles []*tFile

// Len returns the number of tables; part of sort.Interface.
func (tf tFiles) Len() int      { return len(tf) }

// Swap exchanges the tables at indexes i and j; part of sort.Interface.
func (tf tFiles) Swap(i, j int) { tf[i], tf[j] = tf[j], tf[i] }
90
91func (tf tFiles) nums() string {
92	x := "[ "
93	for i, f := range tf {
94		if i != 0 {
95			x += ", "
96		}
97		x += fmt.Sprint(f.fd.Num)
98	}
99	x += " ]"
100	return x
101}
102
103// Returns true if i smallest key is less than j.
104// This used for sort by key in ascending order.
105func (tf tFiles) lessByKey(icmp *iComparer, i, j int) bool {
106	a, b := tf[i], tf[j]
107	n := icmp.Compare(a.imin, b.imin)
108	if n == 0 {
109		return a.fd.Num < b.fd.Num
110	}
111	return n < 0
112}
113
114// Returns true if i file number is greater than j.
115// This used for sort by file number in descending order.
116func (tf tFiles) lessByNum(i, j int) bool {
117	return tf[i].fd.Num > tf[j].fd.Num
118}
119
120// Sorts tables by key in ascending order.
121func (tf tFiles) sortByKey(icmp *iComparer) {
122	sort.Sort(&tFilesSortByKey{tFiles: tf, icmp: icmp})
123}
124
125// Sorts tables by file number in descending order.
126func (tf tFiles) sortByNum() {
127	sort.Sort(&tFilesSortByNum{tFiles: tf})
128}
129
130// Returns sum of all tables size.
131func (tf tFiles) size() (sum int64) {
132	for _, t := range tf {
133		sum += t.size
134	}
135	return sum
136}
137
138// Searches smallest index of tables whose its smallest
139// key is after or equal with given key.
140func (tf tFiles) searchMin(icmp *iComparer, ikey internalKey) int {
141	return sort.Search(len(tf), func(i int) bool {
142		return icmp.Compare(tf[i].imin, ikey) >= 0
143	})
144}
145
146// Searches smallest index of tables whose its largest
147// key is after or equal with given key.
148func (tf tFiles) searchMax(icmp *iComparer, ikey internalKey) int {
149	return sort.Search(len(tf), func(i int) bool {
150		return icmp.Compare(tf[i].imax, ikey) >= 0
151	})
152}
153
154// Searches smallest index of tables whose its file number
155// is smaller than the given number.
156func (tf tFiles) searchNumLess(num int64) int {
157	return sort.Search(len(tf), func(i int) bool {
158		return tf[i].fd.Num < num
159	})
160}
161
162// Searches smallest index of tables whose its smallest
163// key is after the given key.
164func (tf tFiles) searchMinUkey(icmp *iComparer, umin []byte) int {
165	return sort.Search(len(tf), func(i int) bool {
166		return icmp.ucmp.Compare(tf[i].imin.ukey(), umin) > 0
167	})
168}
169
170// Searches smallest index of tables whose its largest
171// key is after the given key.
172func (tf tFiles) searchMaxUkey(icmp *iComparer, umax []byte) int {
173	return sort.Search(len(tf), func(i int) bool {
174		return icmp.ucmp.Compare(tf[i].imax.ukey(), umax) > 0
175	})
176}
177
178// Returns true if given key range overlaps with one or more
179// tables key range. If unsorted is true then binary search will not be used.
180func (tf tFiles) overlaps(icmp *iComparer, umin, umax []byte, unsorted bool) bool {
181	if unsorted {
182		// Check against all files.
183		for _, t := range tf {
184			if t.overlaps(icmp, umin, umax) {
185				return true
186			}
187		}
188		return false
189	}
190
191	i := 0
192	if len(umin) > 0 {
193		// Find the earliest possible internal key for min.
194		i = tf.searchMax(icmp, makeInternalKey(nil, umin, keyMaxSeq, keyTypeSeek))
195	}
196	if i >= len(tf) {
197		// Beginning of range is after all files, so no overlap.
198		return false
199	}
200	return !tf[i].before(icmp, umax)
201}
202
// Returns tables whose its key range overlaps with given key range.
// Nil umin/umax means the range is open on that side.
// Range will be expanded if ukey found hop across tables.
// If overlapped is true then the search will be restarted if umax
// expanded.
// The dst content will be overwritten.
func (tf tFiles) getOverlaps(dst tFiles, icmp *iComparer, umin, umax []byte, overlapped bool) tFiles {
	// Short circuit if tf is empty
	if len(tf) == 0 {
		return nil
	}
	// For non-zero levels, there is no ukey hop across at all.
	// And what's more, the files in these levels are strictly sorted,
	// so use binary search instead of heavy traverse.
	if !overlapped {
		var begin, end int
		// Determine the begin index of the overlapped file
		if umin != nil {
			index := tf.searchMinUkey(icmp, umin)
			if index == 0 {
				begin = 0
			} else if bytes.Compare(tf[index-1].imax.ukey(), umin) >= 0 {
				// The min ukey overlaps with the index-1 file, expand it.
				// NOTE(review): bytes.Compare assumes the user comparer is
				// bytewise, while searchMinUkey above consults icmp.ucmp —
				// confirm the two agree for custom comparers.
				begin = index - 1
			} else {
				begin = index
			}
		}
		// Determine the end index of the overlapped file
		if umax != nil {
			index := tf.searchMaxUkey(icmp, umax)
			if index == len(tf) {
				end = len(tf)
			} else if bytes.Compare(tf[index].imin.ukey(), umax) <= 0 {
				// The max ukey overlaps with the index file, expand it.
				end = index + 1
			} else {
				end = index
			}
		} else {
			end = len(tf)
		}
		// Ensure the overlapped file indexes are valid.
		if begin >= end {
			return nil
		}
		// Copy out so the result does not alias tf's backing array.
		dst = make([]*tFile, end-begin)
		copy(dst, tf[begin:end])
		return dst
	}

	dst = dst[:0]
	for i := 0; i < len(tf); {
		t := tf[i]
		if t.overlaps(icmp, umin, umax) {
			if umin != nil && icmp.uCompare(t.imin.ukey(), umin) < 0 {
				// Table extends below umin: widen the range and restart,
				// since tables already skipped may now overlap.
				umin = t.imin.ukey()
				dst = dst[:0]
				i = 0
				continue
			} else if umax != nil && icmp.uCompare(t.imax.ukey(), umax) > 0 {
				umax = t.imax.ukey()
				// Restart search if it is overlapped.
				dst = dst[:0]
				i = 0
				continue
			}

			dst = append(dst, t)
		}
		i++
	}

	return dst
}
277
278// Returns tables key range.
279func (tf tFiles) getRange(icmp *iComparer) (imin, imax internalKey) {
280	for i, t := range tf {
281		if i == 0 {
282			imin, imax = t.imin, t.imax
283			continue
284		}
285		if icmp.Compare(t.imin, imin) < 0 {
286			imin = t.imin
287		}
288		if icmp.Compare(t.imax, imax) > 0 {
289			imax = t.imax
290		}
291	}
292
293	return
294}
295
296// Creates iterator index from tables.
297func (tf tFiles) newIndexIterator(tops *tOps, icmp *iComparer, slice *util.Range, ro *opt.ReadOptions) iterator.IteratorIndexer {
298	if slice != nil {
299		var start, limit int
300		if slice.Start != nil {
301			start = tf.searchMax(icmp, internalKey(slice.Start))
302		}
303		if slice.Limit != nil {
304			limit = tf.searchMin(icmp, internalKey(slice.Limit))
305		} else {
306			limit = tf.Len()
307		}
308		tf = tf[start:limit]
309	}
310	return iterator.NewArrayIndexer(&tFilesArrayIndexer{
311		tFiles: tf,
312		tops:   tops,
313		icmp:   icmp,
314		slice:  slice,
315		ro:     ro,
316	})
317}
318
// Tables iterator index.
// tFilesArrayIndexer adapts a tFiles slice to iterator.NewArrayIndexer,
// producing a per-table iterator on demand via Get.
type tFilesArrayIndexer struct {
	tFiles
	tops  *tOps            // table operations used to open per-table iterators
	icmp  *iComparer       // internal key comparer used by Search
	slice *util.Range      // optional key range restriction (may be nil)
	ro    *opt.ReadOptions // read options forwarded to table iterators
}
327
328func (a *tFilesArrayIndexer) Search(key []byte) int {
329	return a.searchMax(a.icmp, internalKey(key))
330}
331
332func (a *tFilesArrayIndexer) Get(i int) iterator.Iterator {
333	if i == 0 || i == a.Len()-1 {
334		return a.tops.newIterator(a.tFiles[i], a.slice, a.ro)
335	}
336	return a.tops.newIterator(a.tFiles[i], nil, a.ro)
337}
338
// Helper type for sortByKey.
// Implements sort.Interface: Len/Swap come from the embedded tFiles,
// Less is defined below.
type tFilesSortByKey struct {
	tFiles
	icmp *iComparer
}

// Less orders tables by smallest key ascending, ties by file number.
func (x *tFilesSortByKey) Less(i, j int) bool {
	return x.lessByKey(x.icmp, i, j)
}
348
// Helper type for sortByNum.
// Implements sort.Interface: Len/Swap come from the embedded tFiles,
// Less is defined below.
type tFilesSortByNum struct {
	tFiles
}

// Less orders tables by file number in descending order.
func (x *tFilesSortByNum) Less(i, j int) bool {
	return x.lessByNum(i, j)
}
357
// Table operations.
// tOps bundles the session with the caches and buffer pool used to
// create, open, read and remove table files.
type tOps struct {
	s            *session
	noSync       bool             // when true, skip Sync after finishing a table
	evictRemoved bool             // evict block-cache entries when a table is removed
	cache        *cache.Cache     // cache of open table readers, keyed by file number
	bcache       *cache.Cache     // shared block cache (may be nil)
	bpool        *util.BufferPool // buffer pool for block reads (may be nil)
}
367
368// Creates an empty table and returns table writer.
369func (t *tOps) create() (*tWriter, error) {
370	fd := storage.FileDesc{Type: storage.TypeTable, Num: t.s.allocFileNum()}
371	fw, err := t.s.stor.Create(fd)
372	if err != nil {
373		return nil, err
374	}
375	return &tWriter{
376		t:  t,
377		fd: fd,
378		w:  fw,
379		tw: table.NewWriter(fw, t.s.o.Options),
380	}, nil
381}
382
383// Builds table from src iterator.
384func (t *tOps) createFrom(src iterator.Iterator) (f *tFile, n int, err error) {
385	w, err := t.create()
386	if err != nil {
387		return
388	}
389
390	defer func() {
391		if err != nil {
392			w.drop()
393		}
394	}()
395
396	for src.Next() {
397		err = w.append(src.Key(), src.Value())
398		if err != nil {
399			return
400		}
401	}
402	err = src.Error()
403	if err != nil {
404		return
405	}
406
407	n = w.tw.EntriesLen()
408	f, err = w.finish()
409	return
410}
411
// Opens table. It returns a cache handle, which should
// be released after use.
func (t *tOps) open(f *tFile) (ch *cache.Handle, err error) {
	// The setter closure runs on a cache miss: it opens the table file
	// and builds a table.Reader. Failures are propagated through the
	// captured err together with a (0, nil) result.
	ch = t.cache.Get(0, uint64(f.fd.Num), func() (size int, value cache.Value) {
		var r storage.Reader
		r, err = t.s.stor.Open(f.fd)
		if err != nil {
			return 0, nil
		}

		// Give the reader access to the shared block cache, namespaced
		// by file number, when one is configured.
		var bcache *cache.NamespaceGetter
		if t.bcache != nil {
			bcache = &cache.NamespaceGetter{Cache: t.bcache, NS: uint64(f.fd.Num)}
		}

		var tr *table.Reader
		tr, err = table.NewReader(r, f.size, f.fd, bcache, t.bpool, t.s.o.Options)
		if err != nil {
			// Reader construction failed; close the file to avoid a leak.
			r.Close()
			return 0, nil
		}
		return 1, tr

	})
	// A nil handle with a nil err means the cache refused the entry —
	// presumably because it has been closed; report that as ErrClosed.
	if ch == nil && err == nil {
		err = ErrClosed
	}
	return
}
441
442// Finds key/value pair whose key is greater than or equal to the
443// given key.
444func (t *tOps) find(f *tFile, key []byte, ro *opt.ReadOptions) (rkey, rvalue []byte, err error) {
445	ch, err := t.open(f)
446	if err != nil {
447		return nil, nil, err
448	}
449	defer ch.Release()
450	return ch.Value().(*table.Reader).Find(key, true, ro)
451}
452
453// Finds key that is greater than or equal to the given key.
454func (t *tOps) findKey(f *tFile, key []byte, ro *opt.ReadOptions) (rkey []byte, err error) {
455	ch, err := t.open(f)
456	if err != nil {
457		return nil, err
458	}
459	defer ch.Release()
460	return ch.Value().(*table.Reader).FindKey(key, true, ro)
461}
462
463// Returns approximate offset of the given key.
464func (t *tOps) offsetOf(f *tFile, key []byte) (offset int64, err error) {
465	ch, err := t.open(f)
466	if err != nil {
467		return
468	}
469	defer ch.Release()
470	return ch.Value().(*table.Reader).OffsetOf(key)
471}
472
473// Creates an iterator from the given table.
474func (t *tOps) newIterator(f *tFile, slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
475	ch, err := t.open(f)
476	if err != nil {
477		return iterator.NewEmptyIterator(err)
478	}
479	iter := ch.Value().(*table.Reader).NewIterator(slice, ro)
480	iter.SetReleaser(ch)
481	return iter
482}
483
// Removes table from persistent storage. It waits until
// no one uses the table.
func (t *tOps) remove(fd storage.FileDesc) {
	// The callback performs the actual removal once the cache entry
	// for the table can be deleted.
	t.cache.Delete(0, uint64(fd.Num), func() {
		if err := t.s.stor.Remove(fd); err != nil {
			t.s.logf("table@remove removing @%d %q", fd.Num, err)
		} else {
			t.s.logf("table@remove removed @%d", fd.Num)
		}
		// Drop cached blocks belonging to the removed table.
		if t.evictRemoved && t.bcache != nil {
			t.bcache.EvictNS(uint64(fd.Num))
		}
		// Try to reuse file num, useful for discarded transaction.
		t.s.reuseFileNum(fd.Num)
	})
}
500
501// Closes the table ops instance. It will close all tables,
502// regadless still used or not.
503func (t *tOps) close() {
504	t.bpool.Close()
505	t.cache.Close()
506	if t.bcache != nil {
507		t.bcache.CloseWeak()
508	}
509}
510
511// Creates new initialized table ops instance.
512func newTableOps(s *session) *tOps {
513	var (
514		cacher cache.Cacher
515		bcache *cache.Cache
516		bpool  *util.BufferPool
517	)
518	if s.o.GetOpenFilesCacheCapacity() > 0 {
519		cacher = cache.NewLRU(s.o.GetOpenFilesCacheCapacity())
520	}
521	if !s.o.GetDisableBlockCache() {
522		var bcacher cache.Cacher
523		if s.o.GetBlockCacheCapacity() > 0 {
524			bcacher = s.o.GetBlockCacher().New(s.o.GetBlockCacheCapacity())
525		}
526		bcache = cache.NewCache(bcacher)
527	}
528	if !s.o.GetDisableBufferPool() {
529		bpool = util.NewBufferPool(s.o.GetBlockSize() + 5)
530	}
531	return &tOps{
532		s:            s,
533		noSync:       s.o.GetNoSync(),
534		evictRemoved: s.o.GetBlockCacheEvictRemoved(),
535		cache:        cache.NewCache(cacher),
536		bcache:       bcache,
537		bpool:        bpool,
538	}
539}
540
// tWriter wraps the table writer. It keep track of file descriptor
// and added key range.
type tWriter struct {
	t *tOps // parent table ops instance

	fd storage.FileDesc // destination table file descriptor
	w  storage.Writer   // underlying storage writer; nil once closed
	tw *table.Writer    // table writer producing the file contents

	first, last []byte // first and last keys appended so far (nil until first append)
}
552
553// Append key/value pair to the table.
554func (w *tWriter) append(key, value []byte) error {
555	if w.first == nil {
556		w.first = append([]byte{}, key...)
557	}
558	w.last = append(w.last[:0], key...)
559	return w.tw.Append(key, value)
560}
561
562// Returns true if the table is empty.
563func (w *tWriter) empty() bool {
564	return w.first == nil
565}
566
567// Closes the storage.Writer.
568func (w *tWriter) close() {
569	if w.w != nil {
570		w.w.Close()
571		w.w = nil
572	}
573}
574
575// Finalizes the table and returns table file.
576func (w *tWriter) finish() (f *tFile, err error) {
577	defer w.close()
578	err = w.tw.Close()
579	if err != nil {
580		return
581	}
582	if !w.t.noSync {
583		err = w.w.Sync()
584		if err != nil {
585			return
586		}
587	}
588	f = newTableFile(w.fd, int64(w.tw.BytesLen()), internalKey(w.first), internalKey(w.last))
589	return
590}
591
592// Drops the table.
593func (w *tWriter) drop() {
594	w.close()
595	w.t.s.stor.Remove(w.fd)
596	w.t.s.reuseFileNum(w.fd.Num)
597	w.tw = nil
598	w.first = nil
599	w.last = nil
600}
601