1// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
2// All rights reserved.
3//
4// Use of this source code is governed by a BSD-style license that can be
5// found in the LICENSE file.
6
7package leveldb
8
import (
	"fmt"
	"sort"
	"strings"
	"sync/atomic"

	"github.com/syndtr/goleveldb/leveldb/cache"
	"github.com/syndtr/goleveldb/leveldb/iterator"
	"github.com/syndtr/goleveldb/leveldb/opt"
	"github.com/syndtr/goleveldb/leveldb/storage"
	"github.com/syndtr/goleveldb/leveldb/table"
	"github.com/syndtr/goleveldb/leveldb/util"
)
21
// tFile holds basic information about a table.
type tFile struct {
	fd         storage.FileDesc // descriptor (type + number) of the table file
	seekLeft   int32            // seeks remaining before seek-compaction triggers; updated atomically (see consumeSeek)
	size       int64            // table file size in bytes
	imin, imax internalKey      // smallest and largest internal key contained in the table
}
29
30// Returns true if given key is after largest key of this table.
31func (t *tFile) after(icmp *iComparer, ukey []byte) bool {
32	return ukey != nil && icmp.uCompare(ukey, t.imax.ukey()) > 0
33}
34
35// Returns true if given key is before smallest key of this table.
36func (t *tFile) before(icmp *iComparer, ukey []byte) bool {
37	return ukey != nil && icmp.uCompare(ukey, t.imin.ukey()) < 0
38}
39
40// Returns true if given key range overlaps with this table key range.
41func (t *tFile) overlaps(icmp *iComparer, umin, umax []byte) bool {
42	return !t.after(icmp, umin) && !t.before(icmp, umax)
43}
44
// Consumes one seek and returns the number of seeks left.
// Safe for concurrent use: the counter is decremented atomically.
func (t *tFile) consumeSeek() int32 {
	return atomic.AddInt32(&t.seekLeft, -1)
}
49
50// Creates new tFile.
51func newTableFile(fd storage.FileDesc, size int64, imin, imax internalKey) *tFile {
52	f := &tFile{
53		fd:   fd,
54		size: size,
55		imin: imin,
56		imax: imax,
57	}
58
59	// We arrange to automatically compact this file after
60	// a certain number of seeks.  Let's assume:
61	//   (1) One seek costs 10ms
62	//   (2) Writing or reading 1MB costs 10ms (100MB/s)
63	//   (3) A compaction of 1MB does 25MB of IO:
64	//         1MB read from this level
65	//         10-12MB read from next level (boundaries may be misaligned)
66	//         10-12MB written to next level
67	// This implies that 25 seeks cost the same as the compaction
68	// of 1MB of data.  I.e., one seek costs approximately the
69	// same as the compaction of 40KB of data.  We are a little
70	// conservative and allow approximately one seek for every 16KB
71	// of data before triggering a compaction.
72	f.seekLeft = int32(size / 16384)
73	if f.seekLeft < 100 {
74		f.seekLeft = 100
75	}
76
77	return f
78}
79
80func tableFileFromRecord(r atRecord) *tFile {
81	return newTableFile(storage.FileDesc{storage.TypeTable, r.num}, r.size, r.imin, r.imax)
82}
83
// tFiles hold multiple tFile.
type tFiles []*tFile

// Len and Swap implement part of sort.Interface; the Less method is
// supplied by the tFilesSortByKey / tFilesSortByNum wrapper types.
func (tf tFiles) Len() int      { return len(tf) }
func (tf tFiles) Swap(i, j int) { tf[i], tf[j] = tf[j], tf[i] }
89
90func (tf tFiles) nums() string {
91	x := "[ "
92	for i, f := range tf {
93		if i != 0 {
94			x += ", "
95		}
96		x += fmt.Sprint(f.fd.Num)
97	}
98	x += " ]"
99	return x
100}
101
102// Returns true if i smallest key is less than j.
103// This used for sort by key in ascending order.
104func (tf tFiles) lessByKey(icmp *iComparer, i, j int) bool {
105	a, b := tf[i], tf[j]
106	n := icmp.Compare(a.imin, b.imin)
107	if n == 0 {
108		return a.fd.Num < b.fd.Num
109	}
110	return n < 0
111}
112
113// Returns true if i file number is greater than j.
114// This used for sort by file number in descending order.
115func (tf tFiles) lessByNum(i, j int) bool {
116	return tf[i].fd.Num > tf[j].fd.Num
117}
118
// Sorts tables by key in ascending order.
// Used for tables within a level > 0, whose key ranges are disjoint.
func (tf tFiles) sortByKey(icmp *iComparer) {
	sort.Sort(&tFilesSortByKey{tFiles: tf, icmp: icmp})
}

// Sorts tables by file number in descending order.
// Newer tables (higher file numbers) come first.
func (tf tFiles) sortByNum() {
	sort.Sort(&tFilesSortByNum{tFiles: tf})
}
128
129// Returns sum of all tables size.
130func (tf tFiles) size() (sum int64) {
131	for _, t := range tf {
132		sum += t.size
133	}
134	return sum
135}
136
137// Searches smallest index of tables whose its smallest
138// key is after or equal with given key.
139func (tf tFiles) searchMin(icmp *iComparer, ikey internalKey) int {
140	return sort.Search(len(tf), func(i int) bool {
141		return icmp.Compare(tf[i].imin, ikey) >= 0
142	})
143}
144
145// Searches smallest index of tables whose its largest
146// key is after or equal with given key.
147func (tf tFiles) searchMax(icmp *iComparer, ikey internalKey) int {
148	return sort.Search(len(tf), func(i int) bool {
149		return icmp.Compare(tf[i].imax, ikey) >= 0
150	})
151}
152
153// Returns true if given key range overlaps with one or more
154// tables key range. If unsorted is true then binary search will not be used.
155func (tf tFiles) overlaps(icmp *iComparer, umin, umax []byte, unsorted bool) bool {
156	if unsorted {
157		// Check against all files.
158		for _, t := range tf {
159			if t.overlaps(icmp, umin, umax) {
160				return true
161			}
162		}
163		return false
164	}
165
166	i := 0
167	if len(umin) > 0 {
168		// Find the earliest possible internal key for min.
169		i = tf.searchMax(icmp, makeInternalKey(nil, umin, keyMaxSeq, keyTypeSeek))
170	}
171	if i >= len(tf) {
172		// Beginning of range is after all files, so no overlap.
173		return false
174	}
175	return !tf[i].before(icmp, umax)
176}
177
// Returns tables whose key range overlaps with the given key range.
// The range will be expanded if a user key is found to hop across tables.
// If overlapped is true then the search will be restarted whenever umax
// is expanded (not just umin).
// The dst content will be overwritten.
func (tf tFiles) getOverlaps(dst tFiles, icmp *iComparer, umin, umax []byte, overlapped bool) tFiles {
	dst = dst[:0]
	for i := 0; i < len(tf); {
		t := tf[i]
		if t.overlaps(icmp, umin, umax) {
			if umin != nil && icmp.uCompare(t.imin.ukey(), umin) < 0 {
				// Table starts below umin: widen the range downward and
				// restart the scan so earlier tables are reconsidered
				// against the expanded range.
				umin = t.imin.ukey()
				dst = dst[:0]
				i = 0
				continue
			} else if umax != nil && icmp.uCompare(t.imax.ukey(), umax) > 0 {
				// Table ends above umax: widen the range upward.
				umax = t.imax.ukey()
				// Restart the scan only when the caller says tables may
				// overlap each other; for disjoint tables the ones
				// already collected remain valid.
				if overlapped {
					dst = dst[:0]
					i = 0
					continue
				}
			}

			dst = append(dst, t)
		}
		i++
	}

	return dst
}
210
211// Returns tables key range.
212func (tf tFiles) getRange(icmp *iComparer) (imin, imax internalKey) {
213	for i, t := range tf {
214		if i == 0 {
215			imin, imax = t.imin, t.imax
216			continue
217		}
218		if icmp.Compare(t.imin, imin) < 0 {
219			imin = t.imin
220		}
221		if icmp.Compare(t.imax, imax) > 0 {
222			imax = t.imax
223		}
224	}
225
226	return
227}
228
// Creates iterator index from tables.
// If slice is non-nil the tables are first narrowed to those that may
// contain keys within [slice.Start, slice.Limit); requires tables to be
// sorted by key for the binary searches to be valid.
func (tf tFiles) newIndexIterator(tops *tOps, icmp *iComparer, slice *util.Range, ro *opt.ReadOptions) iterator.IteratorIndexer {
	if slice != nil {
		var start, limit int
		if slice.Start != nil {
			// First table whose largest key reaches slice.Start.
			start = tf.searchMax(icmp, internalKey(slice.Start))
		}
		if slice.Limit != nil {
			// First table whose smallest key reaches slice.Limit.
			limit = tf.searchMin(icmp, internalKey(slice.Limit))
		} else {
			limit = tf.Len()
		}
		tf = tf[start:limit]
	}
	return iterator.NewArrayIndexer(&tFilesArrayIndexer{
		tFiles: tf,
		tops:   tops,
		icmp:   icmp,
		slice:  slice,
		ro:     ro,
	})
}
251
// Tables iterator index. Implements iterator.ArrayIndexer over a set of
// tables, opening a table iterator on demand via tops.
type tFilesArrayIndexer struct {
	tFiles              // tables being indexed, sorted by key
	tops  *tOps         // used to open per-table iterators
	icmp  *iComparer    // comparer for Search
	slice *util.Range   // optional key range; applied to boundary tables only
	ro    *opt.ReadOptions
}
260
// Search returns the index of the first table whose largest key is
// greater than or equal to the given internal key.
func (a *tFilesArrayIndexer) Search(key []byte) int {
	return a.searchMax(a.icmp, internalKey(key))
}
264
265func (a *tFilesArrayIndexer) Get(i int) iterator.Iterator {
266	if i == 0 || i == a.Len()-1 {
267		return a.tops.newIterator(a.tFiles[i], a.slice, a.ro)
268	}
269	return a.tops.newIterator(a.tFiles[i], nil, a.ro)
270}
271
// Helper type for sortByKey. Supplies the Less method missing from
// tFiles to complete sort.Interface.
type tFilesSortByKey struct {
	tFiles
	icmp *iComparer
}

// Less orders tables by smallest key, ascending.
func (x *tFilesSortByKey) Less(i, j int) bool {
	return x.lessByKey(x.icmp, i, j)
}
281
// Helper type for sortByNum. Supplies the Less method missing from
// tFiles to complete sort.Interface.
type tFilesSortByNum struct {
	tFiles
}

// Less orders tables by file number, descending.
func (x *tFilesSortByNum) Less(i, j int) bool {
	return x.lessByNum(i, j)
}
290
// Table operations.
type tOps struct {
	s      *session
	noSync bool              // skip fsync on table finish (from options)
	cache  *cache.Cache      // open table readers, keyed by file number
	bcache *cache.Cache      // block cache shared by tables, namespaced by file number; may be nil
	bpool  *util.BufferPool  // buffer pool for table reads; may be nil
}
299
300// Creates an empty table and returns table writer.
301func (t *tOps) create() (*tWriter, error) {
302	fd := storage.FileDesc{storage.TypeTable, t.s.allocFileNum()}
303	fw, err := t.s.stor.Create(fd)
304	if err != nil {
305		return nil, err
306	}
307	return &tWriter{
308		t:  t,
309		fd: fd,
310		w:  fw,
311		tw: table.NewWriter(fw, t.s.o.Options),
312	}, nil
313}
314
// Builds table from src iterator.
// Returns the resulting table file and the number of entries written.
// On any error the partially-written table file is dropped (removed and
// its file number reused).
func (t *tOps) createFrom(src iterator.Iterator) (f *tFile, n int, err error) {
	w, err := t.create()
	if err != nil {
		return
	}

	// Drop the table if any later step fails; reads the named return err.
	defer func() {
		if err != nil {
			w.drop()
		}
	}()

	for src.Next() {
		err = w.append(src.Key(), src.Value())
		if err != nil {
			return
		}
	}
	// Distinguish normal iterator exhaustion from an iteration error.
	err = src.Error()
	if err != nil {
		return
	}

	// Capture the entry count before finish closes the writer.
	n = w.tw.EntriesLen()
	f, err = w.finish()
	return
}
343
// Opens table. It returns a cache handle, which should
// be released after use.
// On a cache miss the setter closure opens the file and builds a
// table.Reader; any failure is propagated through the named return err
// (the closure writes to it). A nil handle with nil err means the cache
// has been closed.
func (t *tOps) open(f *tFile) (ch *cache.Handle, err error) {
	ch = t.cache.Get(0, uint64(f.fd.Num), func() (size int, value cache.Value) {
		var r storage.Reader
		r, err = t.s.stor.Open(f.fd)
		if err != nil {
			// Returning (0, nil) tells the cache not to store anything.
			return 0, nil
		}

		// Give this table its own block-cache namespace so its blocks
		// can be evicted together when the table is removed.
		var bcache *cache.NamespaceGetter
		if t.bcache != nil {
			bcache = &cache.NamespaceGetter{Cache: t.bcache, NS: uint64(f.fd.Num)}
		}

		var tr *table.Reader
		tr, err = table.NewReader(r, f.size, f.fd, bcache, t.bpool, t.s.o.Options)
		if err != nil {
			r.Close()
			return 0, nil
		}
		return 1, tr

	})
	if ch == nil && err == nil {
		err = ErrClosed
	}
	return
}
373
374// Finds key/value pair whose key is greater than or equal to the
375// given key.
376func (t *tOps) find(f *tFile, key []byte, ro *opt.ReadOptions) (rkey, rvalue []byte, err error) {
377	ch, err := t.open(f)
378	if err != nil {
379		return nil, nil, err
380	}
381	defer ch.Release()
382	return ch.Value().(*table.Reader).Find(key, true, ro)
383}
384
385// Finds key that is greater than or equal to the given key.
386func (t *tOps) findKey(f *tFile, key []byte, ro *opt.ReadOptions) (rkey []byte, err error) {
387	ch, err := t.open(f)
388	if err != nil {
389		return nil, err
390	}
391	defer ch.Release()
392	return ch.Value().(*table.Reader).FindKey(key, true, ro)
393}
394
395// Returns approximate offset of the given key.
396func (t *tOps) offsetOf(f *tFile, key []byte) (offset int64, err error) {
397	ch, err := t.open(f)
398	if err != nil {
399		return
400	}
401	defer ch.Release()
402	return ch.Value().(*table.Reader).OffsetOf(key)
403}
404
405// Creates an iterator from the given table.
406func (t *tOps) newIterator(f *tFile, slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
407	ch, err := t.open(f)
408	if err != nil {
409		return iterator.NewEmptyIterator(err)
410	}
411	iter := ch.Value().(*table.Reader).NewIterator(slice, ro)
412	iter.SetReleaser(ch)
413	return iter
414}
415
// Removes table from persistent storage. It waits until
// no one uses the table; the deletion callback runs once the cache
// entry is no longer referenced.
func (t *tOps) remove(f *tFile) {
	t.cache.Delete(0, uint64(f.fd.Num), func() {
		if err := t.s.stor.Remove(f.fd); err != nil {
			t.s.logf("table@remove removing @%d %q", f.fd.Num, err)
		} else {
			t.s.logf("table@remove removed @%d", f.fd.Num)
		}
		// Evict all cached blocks belonging to this table.
		if t.bcache != nil {
			t.bcache.EvictNS(uint64(f.fd.Num))
		}
	})
}
430
// Closes the table ops instance. It will close all tables,
// regardless still used or not.
func (t *tOps) close() {
	t.bpool.Close()
	t.cache.Close()
	// CloseWeak: block cache may still be shared; see cache package.
	if t.bcache != nil {
		t.bcache.CloseWeak()
	}
}
440
// Creates new initialized table ops instance.
// Cache, block cache and buffer pool are configured from the session
// options; each is optional and disabled via its respective option.
func newTableOps(s *session) *tOps {
	var (
		cacher cache.Cacher
		bcache *cache.Cache
		bpool  *util.BufferPool
	)
	if s.o.GetOpenFilesCacheCapacity() > 0 {
		cacher = cache.NewLRU(s.o.GetOpenFilesCacheCapacity())
	}
	if !s.o.GetDisableBlockCache() {
		var bcacher cache.Cacher
		if s.o.GetBlockCacheCapacity() > 0 {
			bcacher = cache.NewLRU(s.o.GetBlockCacheCapacity())
		}
		// bcache is created even with a nil cacher (unbounded behavior
		// is delegated to the cache package).
		bcache = cache.NewCache(bcacher)
	}
	if !s.o.GetDisableBufferPool() {
		// +5 slack over block size; presumably covers the per-block
		// trailer — TODO confirm against table format.
		bpool = util.NewBufferPool(s.o.GetBlockSize() + 5)
	}
	return &tOps{
		s:      s,
		noSync: s.o.GetNoSync(),
		cache:  cache.NewCache(cacher),
		bcache: bcache,
		bpool:  bpool,
	}
}
469
// tWriter wraps the table writer. It keeps track of file descriptor
// and added key range.
type tWriter struct {
	t *tOps

	fd storage.FileDesc // destination table file
	w  storage.Writer   // underlying storage writer; nil once closed
	tw *table.Writer    // table-format writer layered on w

	first, last []byte // first and last appended keys; first == nil means empty
}
481
// Append key/value pair to the table. Keys must be appended in
// ascending order (enforced by the underlying table writer — TODO confirm).
func (w *tWriter) append(key, value []byte) error {
	if w.first == nil {
		// Copy the key; note []byte{} keeps first non-nil even for an
		// empty key, which is what empty() relies on.
		w.first = append([]byte{}, key...)
	}
	// Reuse last's backing array across appends.
	w.last = append(w.last[:0], key...)
	return w.tw.Append(key, value)
}
490
// Returns true if the table is empty, i.e. nothing has been appended yet.
// Relies on append() always setting first to a non-nil slice.
func (w *tWriter) empty() bool {
	return w.first == nil
}
495
496// Closes the storage.Writer.
497func (w *tWriter) close() {
498	if w.w != nil {
499		w.w.Close()
500		w.w = nil
501	}
502}
503
// Finalizes the table and returns table file.
// The table writer is closed first, then the file is synced (unless
// noSync is set); the storage writer is closed last via the defer.
func (w *tWriter) finish() (f *tFile, err error) {
	defer w.close()
	err = w.tw.Close()
	if err != nil {
		return
	}
	if !w.t.noSync {
		// Sync must happen before close so the table is durable.
		err = w.w.Sync()
		if err != nil {
			return
		}
	}
	f = newTableFile(w.fd, int64(w.tw.BytesLen()), internalKey(w.first), internalKey(w.last))
	return
}
520
// Drops the table: closes the writer, removes the partially-written
// file from storage and returns its file number for reuse.
func (w *tWriter) drop() {
	w.close()
	// Removal error is intentionally ignored — best-effort cleanup of a
	// table that was never recorded in the manifest.
	w.t.s.stor.Remove(w.fd)
	w.t.s.reuseFileNum(w.fd.Num)
	w.tw = nil
	w.first = nil
	w.last = nil
}
530