1// Copyright 2014 The Prometheus Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14package index
15
16import (
17	"encoding"
18
19	"github.com/syndtr/goleveldb/leveldb"
20	leveldb_filter "github.com/syndtr/goleveldb/leveldb/filter"
21	leveldb_iterator "github.com/syndtr/goleveldb/leveldb/iterator"
22	leveldb_opt "github.com/syndtr/goleveldb/leveldb/opt"
23	leveldb_util "github.com/syndtr/goleveldb/leveldb/util"
24)
25
26var (
27	keyspace = &leveldb_util.Range{
28		Start: nil,
29		Limit: nil,
30	}
31
32	iteratorOpts = &leveldb_opt.ReadOptions{
33		DontFillCache: true,
34	}
35)
36
37// LevelDB is a LevelDB-backed sorted KeyValueStore.
38type LevelDB struct {
39	storage   *leveldb.DB
40	readOpts  *leveldb_opt.ReadOptions
41	writeOpts *leveldb_opt.WriteOptions
42}
43
44// LevelDBOptions provides options for a LevelDB.
45type LevelDBOptions struct {
46	Path           string // Base path to store files.
47	CacheSizeBytes int
48}
49
50// NewLevelDB returns a newly allocated LevelDB-backed KeyValueStore ready to
51// use.
52func NewLevelDB(o LevelDBOptions) (KeyValueStore, error) {
53	options := &leveldb_opt.Options{
54		BlockCacheCapacity: o.CacheSizeBytes,
55		Filter:             leveldb_filter.NewBloomFilter(10),
56	}
57
58	storage, err := leveldb.OpenFile(o.Path, options)
59	if err != nil {
60		return nil, err
61	}
62
63	return &LevelDB{
64		storage:   storage,
65		readOpts:  &leveldb_opt.ReadOptions{},
66		writeOpts: &leveldb_opt.WriteOptions{},
67	}, nil
68}
69
70// NewBatch implements KeyValueStore.
71func (l *LevelDB) NewBatch() Batch {
72	return &LevelDBBatch{
73		batch: &leveldb.Batch{},
74	}
75}
76
77// Close implements KeyValueStore.
78func (l *LevelDB) Close() error {
79	return l.storage.Close()
80}
81
82// Get implements KeyValueStore.
83func (l *LevelDB) Get(key encoding.BinaryMarshaler, value encoding.BinaryUnmarshaler) (bool, error) {
84	k, err := key.MarshalBinary()
85	if err != nil {
86		return false, err
87	}
88	raw, err := l.storage.Get(k, l.readOpts)
89	if err == leveldb.ErrNotFound {
90		return false, nil
91	}
92	if err != nil {
93		return false, err
94	}
95	if value == nil {
96		return true, nil
97	}
98	return true, value.UnmarshalBinary(raw)
99}
100
101// Has implements KeyValueStore.
102func (l *LevelDB) Has(key encoding.BinaryMarshaler) (has bool, err error) {
103	return l.Get(key, nil)
104}
105
106// Delete implements KeyValueStore.
107func (l *LevelDB) Delete(key encoding.BinaryMarshaler) (bool, error) {
108	k, err := key.MarshalBinary()
109	if err != nil {
110		return false, err
111	}
112	// Note that Delete returns nil if k does not exist. So we have to test
113	// for existence with Has first.
114	if has, err := l.storage.Has(k, l.readOpts); !has || err != nil {
115		return false, err
116	}
117	if err = l.storage.Delete(k, l.writeOpts); err != nil {
118		return false, err
119	}
120	return true, nil
121}
122
123// Put implements KeyValueStore.
124func (l *LevelDB) Put(key, value encoding.BinaryMarshaler) error {
125	k, err := key.MarshalBinary()
126	if err != nil {
127		return err
128	}
129	v, err := value.MarshalBinary()
130	if err != nil {
131		return err
132	}
133	return l.storage.Put(k, v, l.writeOpts)
134}
135
136// Commit implements KeyValueStore.
137func (l *LevelDB) Commit(b Batch) error {
138	return l.storage.Write(b.(*LevelDBBatch).batch, l.writeOpts)
139}
140
141// ForEach implements KeyValueStore.
142func (l *LevelDB) ForEach(cb func(kv KeyValueAccessor) error) error {
143	snap, err := l.storage.GetSnapshot()
144	if err != nil {
145		return err
146	}
147	defer snap.Release()
148
149	iter := snap.NewIterator(keyspace, iteratorOpts)
150
151	kv := &levelDBKeyValueAccessor{it: iter}
152
153	for valid := iter.First(); valid; valid = iter.Next() {
154		if err = iter.Error(); err != nil {
155			return err
156		}
157
158		if err := cb(kv); err != nil {
159			return err
160		}
161	}
162	return nil
163}
164
165// LevelDBBatch is a Batch implementation for LevelDB.
166type LevelDBBatch struct {
167	batch *leveldb.Batch
168}
169
170// Put implements Batch.
171func (b *LevelDBBatch) Put(key, value encoding.BinaryMarshaler) error {
172	k, err := key.MarshalBinary()
173	if err != nil {
174		return err
175	}
176	v, err := value.MarshalBinary()
177	if err != nil {
178		return err
179	}
180	b.batch.Put(k, v)
181	return nil
182}
183
184// Delete implements Batch.
185func (b *LevelDBBatch) Delete(key encoding.BinaryMarshaler) error {
186	k, err := key.MarshalBinary()
187	if err != nil {
188		return err
189	}
190	b.batch.Delete(k)
191	return nil
192}
193
194// Reset implements Batch.
195func (b *LevelDBBatch) Reset() {
196	b.batch.Reset()
197}
198
199// levelDBKeyValueAccessor implements KeyValueAccessor.
200type levelDBKeyValueAccessor struct {
201	it leveldb_iterator.Iterator
202}
203
204func (i *levelDBKeyValueAccessor) Key(key encoding.BinaryUnmarshaler) error {
205	return key.UnmarshalBinary(i.it.Key())
206}
207
208func (i *levelDBKeyValueAccessor) Value(value encoding.BinaryUnmarshaler) error {
209	return value.UnmarshalBinary(i.it.Value())
210}
211