1// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
2// All rights reserved.
3//
4// Use of this source code is governed by a BSD-style license that can be
5// found in the LICENSE file.
6
7package leveldb
8
9import (
10	"fmt"
11	"sort"
12	"sync/atomic"
13
14	"github.com/syndtr/goleveldb/leveldb/cache"
15	"github.com/syndtr/goleveldb/leveldb/iterator"
16	"github.com/syndtr/goleveldb/leveldb/opt"
17	"github.com/syndtr/goleveldb/leveldb/storage"
18	"github.com/syndtr/goleveldb/leveldb/table"
19	"github.com/syndtr/goleveldb/leveldb/util"
20)
21
// tFile holds basic information about a table.
type tFile struct {
	fd         storage.FileDesc // table file descriptor (type + number)
	seekLeft   int32            // seeks remaining before seek-triggered compaction; updated atomically
	size       int64            // table file size in bytes
	imin, imax internalKey      // smallest and largest internal keys stored in the table
}
29
30// Returns true if given key is after largest key of this table.
31func (t *tFile) after(icmp *iComparer, ukey []byte) bool {
32	return ukey != nil && icmp.uCompare(ukey, t.imax.ukey()) > 0
33}
34
35// Returns true if given key is before smallest key of this table.
36func (t *tFile) before(icmp *iComparer, ukey []byte) bool {
37	return ukey != nil && icmp.uCompare(ukey, t.imin.ukey()) < 0
38}
39
40// Returns true if given key range overlaps with this table key range.
41func (t *tFile) overlaps(icmp *iComparer, umin, umax []byte) bool {
42	return !t.after(icmp, umin) && !t.before(icmp, umax)
43}
44
45// Cosumes one seek and return current seeks left.
46func (t *tFile) consumeSeek() int32 {
47	return atomic.AddInt32(&t.seekLeft, -1)
48}
49
50// Creates new tFile.
51func newTableFile(fd storage.FileDesc, size int64, imin, imax internalKey) *tFile {
52	f := &tFile{
53		fd:   fd,
54		size: size,
55		imin: imin,
56		imax: imax,
57	}
58
59	// We arrange to automatically compact this file after
60	// a certain number of seeks.  Let's assume:
61	//   (1) One seek costs 10ms
62	//   (2) Writing or reading 1MB costs 10ms (100MB/s)
63	//   (3) A compaction of 1MB does 25MB of IO:
64	//         1MB read from this level
65	//         10-12MB read from next level (boundaries may be misaligned)
66	//         10-12MB written to next level
67	// This implies that 25 seeks cost the same as the compaction
68	// of 1MB of data.  I.e., one seek costs approximately the
69	// same as the compaction of 40KB of data.  We are a little
70	// conservative and allow approximately one seek for every 16KB
71	// of data before triggering a compaction.
72	f.seekLeft = int32(size / 16384)
73	if f.seekLeft < 100 {
74		f.seekLeft = 100
75	}
76
77	return f
78}
79
80func tableFileFromRecord(r atRecord) *tFile {
81	return newTableFile(storage.FileDesc{Type: storage.TypeTable, Num: r.num}, r.size, r.imin, r.imax)
82}
83
// tFiles hold multiple tFile.
type tFiles []*tFile

// Len implements sort.Interface; it reports the number of tables.
func (tf tFiles) Len() int      { return len(tf) }

// Swap implements sort.Interface; it exchanges tables i and j.
func (tf tFiles) Swap(i, j int) { tf[i], tf[j] = tf[j], tf[i] }
89
90func (tf tFiles) nums() string {
91	x := "[ "
92	for i, f := range tf {
93		if i != 0 {
94			x += ", "
95		}
96		x += fmt.Sprint(f.fd.Num)
97	}
98	x += " ]"
99	return x
100}
101
102// Returns true if i smallest key is less than j.
103// This used for sort by key in ascending order.
104func (tf tFiles) lessByKey(icmp *iComparer, i, j int) bool {
105	a, b := tf[i], tf[j]
106	n := icmp.Compare(a.imin, b.imin)
107	if n == 0 {
108		return a.fd.Num < b.fd.Num
109	}
110	return n < 0
111}
112
113// Returns true if i file number is greater than j.
114// This used for sort by file number in descending order.
115func (tf tFiles) lessByNum(i, j int) bool {
116	return tf[i].fd.Num > tf[j].fd.Num
117}
118
119// Sorts tables by key in ascending order.
120func (tf tFiles) sortByKey(icmp *iComparer) {
121	sort.Sort(&tFilesSortByKey{tFiles: tf, icmp: icmp})
122}
123
124// Sorts tables by file number in descending order.
125func (tf tFiles) sortByNum() {
126	sort.Sort(&tFilesSortByNum{tFiles: tf})
127}
128
129// Returns sum of all tables size.
130func (tf tFiles) size() (sum int64) {
131	for _, t := range tf {
132		sum += t.size
133	}
134	return sum
135}
136
137// Searches smallest index of tables whose its smallest
138// key is after or equal with given key.
139func (tf tFiles) searchMin(icmp *iComparer, ikey internalKey) int {
140	return sort.Search(len(tf), func(i int) bool {
141		return icmp.Compare(tf[i].imin, ikey) >= 0
142	})
143}
144
145// Searches smallest index of tables whose its largest
146// key is after or equal with given key.
147func (tf tFiles) searchMax(icmp *iComparer, ikey internalKey) int {
148	return sort.Search(len(tf), func(i int) bool {
149		return icmp.Compare(tf[i].imax, ikey) >= 0
150	})
151}
152
153// Returns true if given key range overlaps with one or more
154// tables key range. If unsorted is true then binary search will not be used.
155func (tf tFiles) overlaps(icmp *iComparer, umin, umax []byte, unsorted bool) bool {
156	if unsorted {
157		// Check against all files.
158		for _, t := range tf {
159			if t.overlaps(icmp, umin, umax) {
160				return true
161			}
162		}
163		return false
164	}
165
166	i := 0
167	if len(umin) > 0 {
168		// Find the earliest possible internal key for min.
169		i = tf.searchMax(icmp, makeInternalKey(nil, umin, keyMaxSeq, keyTypeSeek))
170	}
171	if i >= len(tf) {
172		// Beginning of range is after all files, so no overlap.
173		return false
174	}
175	return !tf[i].before(icmp, umax)
176}
177
// Returns tables whose its key range overlaps with given key range.
// Range will be expanded if ukey found hop across tables.
// If overlapped is true then the search will be restarted if umax
// expanded.
// The dst content will be overwritten.
func (tf tFiles) getOverlaps(dst tFiles, icmp *iComparer, umin, umax []byte, overlapped bool) tFiles {
	dst = dst[:0]
	for i := 0; i < len(tf); {
		t := tf[i]
		if t.overlaps(icmp, umin, umax) {
			if umin != nil && icmp.uCompare(t.imin.ukey(), umin) < 0 {
				// Table extends below umin: widen the range downward and
				// restart the scan, since earlier tables may now overlap.
				umin = t.imin.ukey()
				dst = dst[:0]
				i = 0
				continue
			} else if umax != nil && icmp.uCompare(t.imax.ukey(), umax) > 0 {
				// Table extends above umax: widen the range upward.
				umax = t.imax.ukey()
				// Restart search if it is overlapped.
				if overlapped {
					dst = dst[:0]
					i = 0
					continue
				}
			}

			dst = append(dst, t)
		}
		i++
	}

	return dst
}
210
211// Returns tables key range.
212func (tf tFiles) getRange(icmp *iComparer) (imin, imax internalKey) {
213	for i, t := range tf {
214		if i == 0 {
215			imin, imax = t.imin, t.imax
216			continue
217		}
218		if icmp.Compare(t.imin, imin) < 0 {
219			imin = t.imin
220		}
221		if icmp.Compare(t.imax, imax) > 0 {
222			imax = t.imax
223		}
224	}
225
226	return
227}
228
229// Creates iterator index from tables.
230func (tf tFiles) newIndexIterator(tops *tOps, icmp *iComparer, slice *util.Range, ro *opt.ReadOptions) iterator.IteratorIndexer {
231	if slice != nil {
232		var start, limit int
233		if slice.Start != nil {
234			start = tf.searchMax(icmp, internalKey(slice.Start))
235		}
236		if slice.Limit != nil {
237			limit = tf.searchMin(icmp, internalKey(slice.Limit))
238		} else {
239			limit = tf.Len()
240		}
241		tf = tf[start:limit]
242	}
243	return iterator.NewArrayIndexer(&tFilesArrayIndexer{
244		tFiles: tf,
245		tops:   tops,
246		icmp:   icmp,
247		slice:  slice,
248		ro:     ro,
249	})
250}
251
// Tables iterator index. The embedded tFiles provides Len and the
// key-search helpers used by Search and Get.
type tFilesArrayIndexer struct {
	tFiles
	tops  *tOps
	icmp  *iComparer
	slice *util.Range      // optional key range limit; applied to boundary tables only (see Get)
	ro    *opt.ReadOptions
}
260
261func (a *tFilesArrayIndexer) Search(key []byte) int {
262	return a.searchMax(a.icmp, internalKey(key))
263}
264
265func (a *tFilesArrayIndexer) Get(i int) iterator.Iterator {
266	if i == 0 || i == a.Len()-1 {
267		return a.tops.newIterator(a.tFiles[i], a.slice, a.ro)
268	}
269	return a.tops.newIterator(a.tFiles[i], nil, a.ro)
270}
271
// Helper type for sortByKey. Embeds tFiles for Len/Swap and pairs it
// with the comparer needed by Less.
type tFilesSortByKey struct {
	tFiles
	icmp *iComparer
}

// Less implements sort.Interface using the smallest-key ordering.
func (x *tFilesSortByKey) Less(i, j int) bool {
	return x.lessByKey(x.icmp, i, j)
}
281
// Helper type for sortByNum. Embeds tFiles for Len/Swap.
type tFilesSortByNum struct {
	tFiles
}

// Less implements sort.Interface using descending file-number order.
func (x *tFilesSortByNum) Less(i, j int) bool {
	return x.lessByNum(i, j)
}
290
// Table operations.
type tOps struct {
	s            *session
	noSync       bool             // skip Sync after writing a table (see tWriter.finish)
	evictRemoved bool             // evict a removed table's blocks from bcache (see remove)
	cache        *cache.Cache     // open-table cache, keyed by file number
	bcache       *cache.Cache     // shared block cache; may be nil
	bpool        *util.BufferPool // read buffer pool; may be nil
}
300
301// Creates an empty table and returns table writer.
302func (t *tOps) create() (*tWriter, error) {
303	fd := storage.FileDesc{Type: storage.TypeTable, Num: t.s.allocFileNum()}
304	fw, err := t.s.stor.Create(fd)
305	if err != nil {
306		return nil, err
307	}
308	return &tWriter{
309		t:  t,
310		fd: fd,
311		w:  fw,
312		tw: table.NewWriter(fw, t.s.o.Options),
313	}, nil
314}
315
// Builds table from src iterator.
// Returns the resulting table file and the number of entries written;
// on error the half-built table is dropped.
func (t *tOps) createFrom(src iterator.Iterator) (f *tFile, n int, err error) {
	w, err := t.create()
	if err != nil {
		return
	}

	// Drop (remove and recycle the file number of) the partially written
	// table if anything below sets err.
	defer func() {
		if err != nil {
			w.drop()
		}
	}()

	for src.Next() {
		err = w.append(src.Key(), src.Value())
		if err != nil {
			return
		}
	}
	// Surface any iteration error from the source iterator.
	err = src.Error()
	if err != nil {
		return
	}

	n = w.tw.EntriesLen()
	f, err = w.finish()
	return
}
344
// Opens table. It returns a cache handle, which should
// be released after use.
func (t *tOps) open(f *tFile) (ch *cache.Handle, err error) {
	// The open-table cache is keyed by file number; the closure runs only
	// on a cache miss. err is captured from the enclosing scope so open
	// failures inside the closure propagate to the caller.
	ch = t.cache.Get(0, uint64(f.fd.Num), func() (size int, value cache.Value) {
		var r storage.Reader
		r, err = t.s.stor.Open(f.fd)
		if err != nil {
			return 0, nil
		}

		// Give the table its own namespace in the shared block cache,
		// if one is configured.
		var bcache *cache.NamespaceGetter
		if t.bcache != nil {
			bcache = &cache.NamespaceGetter{Cache: t.bcache, NS: uint64(f.fd.Num)}
		}

		var tr *table.Reader
		tr, err = table.NewReader(r, f.size, f.fd, bcache, t.bpool, t.s.o.Options)
		if err != nil {
			// Don't leak the storage reader on a failed table open.
			r.Close()
			return 0, nil
		}
		return 1, tr

	})
	// A nil handle with no error means the cache has been closed.
	if ch == nil && err == nil {
		err = ErrClosed
	}
	return
}
374
375// Finds key/value pair whose key is greater than or equal to the
376// given key.
377func (t *tOps) find(f *tFile, key []byte, ro *opt.ReadOptions) (rkey, rvalue []byte, err error) {
378	ch, err := t.open(f)
379	if err != nil {
380		return nil, nil, err
381	}
382	defer ch.Release()
383	return ch.Value().(*table.Reader).Find(key, true, ro)
384}
385
386// Finds key that is greater than or equal to the given key.
387func (t *tOps) findKey(f *tFile, key []byte, ro *opt.ReadOptions) (rkey []byte, err error) {
388	ch, err := t.open(f)
389	if err != nil {
390		return nil, err
391	}
392	defer ch.Release()
393	return ch.Value().(*table.Reader).FindKey(key, true, ro)
394}
395
396// Returns approximate offset of the given key.
397func (t *tOps) offsetOf(f *tFile, key []byte) (offset int64, err error) {
398	ch, err := t.open(f)
399	if err != nil {
400		return
401	}
402	defer ch.Release()
403	return ch.Value().(*table.Reader).OffsetOf(key)
404}
405
406// Creates an iterator from the given table.
407func (t *tOps) newIterator(f *tFile, slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
408	ch, err := t.open(f)
409	if err != nil {
410		return iterator.NewEmptyIterator(err)
411	}
412	iter := ch.Value().(*table.Reader).NewIterator(slice, ro)
413	iter.SetReleaser(ch)
414	return iter
415}
416
// Removes table from persistent storage. It waits until
// no one uses the table.
func (t *tOps) remove(f *tFile) {
	// Delete runs the callback once the cache entry can be dropped, so the
	// file is removed only after readers are done with it.
	t.cache.Delete(0, uint64(f.fd.Num), func() {
		if err := t.s.stor.Remove(f.fd); err != nil {
			t.s.logf("table@remove removing @%d %q", f.fd.Num, err)
		} else {
			t.s.logf("table@remove removed @%d", f.fd.Num)
		}
		// Optionally drop the removed table's cached blocks as well.
		if t.evictRemoved && t.bcache != nil {
			t.bcache.EvictNS(uint64(f.fd.Num))
		}
	})
}
431
432// Closes the table ops instance. It will close all tables,
433// regadless still used or not.
434func (t *tOps) close() {
435	t.bpool.Close()
436	t.cache.Close()
437	if t.bcache != nil {
438		t.bcache.CloseWeak()
439	}
440}
441
442// Creates new initialized table ops instance.
443func newTableOps(s *session) *tOps {
444	var (
445		cacher cache.Cacher
446		bcache *cache.Cache
447		bpool  *util.BufferPool
448	)
449	if s.o.GetOpenFilesCacheCapacity() > 0 {
450		cacher = cache.NewLRU(s.o.GetOpenFilesCacheCapacity())
451	}
452	if !s.o.GetDisableBlockCache() {
453		var bcacher cache.Cacher
454		if s.o.GetBlockCacheCapacity() > 0 {
455			bcacher = s.o.GetBlockCacher().New(s.o.GetBlockCacheCapacity())
456		}
457		bcache = cache.NewCache(bcacher)
458	}
459	if !s.o.GetDisableBufferPool() {
460		bpool = util.NewBufferPool(s.o.GetBlockSize() + 5)
461	}
462	return &tOps{
463		s:            s,
464		noSync:       s.o.GetNoSync(),
465		evictRemoved: s.o.GetBlockCacheEvictRemoved(),
466		cache:        cache.NewCache(cacher),
467		bcache:       bcache,
468		bpool:        bpool,
469	}
470}
471
// tWriter wraps the table writer. It keep track of file descriptor
// and added key range.
type tWriter struct {
	t *tOps

	fd storage.FileDesc // destination table file
	w  storage.Writer   // underlying storage writer; set to nil by close
	tw *table.Writer

	first, last []byte // smallest and largest keys appended so far; first is nil until the first append
}
483
484// Append key/value pair to the table.
485func (w *tWriter) append(key, value []byte) error {
486	if w.first == nil {
487		w.first = append([]byte{}, key...)
488	}
489	w.last = append(w.last[:0], key...)
490	return w.tw.Append(key, value)
491}
492
// Returns true if the table is empty.
// first stays nil until the first append, and append always stores a
// non-nil copy (even for a zero-length key), so nil-ness tracks
// emptiness exactly; do not replace with a len check.
func (w *tWriter) empty() bool {
	return w.first == nil
}
497
498// Closes the storage.Writer.
499func (w *tWriter) close() {
500	if w.w != nil {
501		w.w.Close()
502		w.w = nil
503	}
504}
505
// Finalizes the table and returns table file.
// The table writer is closed first (flushing index/footer), then the
// file is synced unless noSync is set.
func (w *tWriter) finish() (f *tFile, err error) {
	// Always close the underlying storage.Writer, even on error.
	defer w.close()
	err = w.tw.Close()
	if err != nil {
		return
	}
	if !w.t.noSync {
		// Persist the table to stable storage before exposing it.
		err = w.w.Sync()
		if err != nil {
			return
		}
	}
	f = newTableFile(w.fd, int64(w.tw.BytesLen()), internalKey(w.first), internalKey(w.last))
	return
}
522
// Drops the table: closes the writer, removes the file and recycles
// its file number, then clears internal state.
func (w *tWriter) drop() {
	w.close()
	// Best-effort removal; the error is deliberately ignored here.
	// NOTE(review): consider logging a removal failure, as remove() does.
	w.t.s.stor.Remove(w.fd)
	// Make the file number available for a later table.
	w.t.s.reuseFileNum(w.fd.Num)
	w.tw = nil
	w.first = nil
	w.last = nil
}
532