1package local
2
3import (
4	"bytes"
5	"context"
6	"errors"
7	"flag"
8	"fmt"
9	"os"
10	"path"
11	"path/filepath"
12	"sync"
13	"time"
14
15	"github.com/go-kit/log/level"
16	"go.etcd.io/bbolt"
17
18	"github.com/cortexproject/cortex/pkg/chunk"
19	chunk_util "github.com/cortexproject/cortex/pkg/chunk/util"
20	util_log "github.com/cortexproject/cortex/pkg/util/log"
21)
22
// bucketName is the bbolt bucket in which this client stores index entries.
var bucketName = []byte("index")

// ErrUnexistentBoltDB is returned by GetDB for read operations when the
// requested boltdb file is not present on disk.
var ErrUnexistentBoltDB = errors.New("boltdb file does not exist")
27
const (
	// separator joins a hash value and a range value into a single bbolt key
	// (used by BoltWriteBatch.Add/Delete and QueryWithCursor).
	separator = "\000"
	// dbReloadPeriod is how often the background loop checks whether open
	// boltdb files have been removed from disk.
	dbReloadPeriod = 10 * time.Minute

	// Operation kinds accepted by GetDB. iota counts every ConstSpec in this
	// block, so DBOperationRead == 2 and DBOperationWrite == 3. Always refer to
	// them by name; do not rely on the numeric values.
	DBOperationRead = iota
	DBOperationWrite

	// openBoltDBFileTimeout is passed as bbolt.Options.Timeout so that opening a
	// boltdb file does not wait indefinitely for the file lock.
	openBoltDBFileTimeout = 5 * time.Second
)
37
// BoltDBConfig configures a BoltDB index client.
type BoltDBConfig struct {
	// Directory holds the BoltDB files, one per table name passed to GetDB.
	Directory string `yaml:"directory"`
}

// RegisterFlags binds the config fields to command-line flags on f.
func (c *BoltDBConfig) RegisterFlags(f *flag.FlagSet) {
	const (
		dirFlag  = "boltdb.dir"
		dirUsage = "Location of BoltDB index files."
	)
	f.StringVar(&c.Directory, dirFlag, "", dirUsage)
}
47
// BoltIndexClient is an index client that keeps each table in its own BoltDB
// file under cfg.Directory and caches the open handles.
type BoltIndexClient struct {
	cfg BoltDBConfig

	// dbsMtx guards dbs.
	dbsMtx sync.RWMutex
	// dbs caches open BoltDB handles keyed by table (file) name.
	dbs    map[string]*bbolt.DB
	// done is closed by Stop to terminate the background reload loop.
	done   chan struct{}
	// wait tracks the background reload goroutine started by NewBoltDBIndexClient.
	wait   sync.WaitGroup
}
56
57// NewBoltDBIndexClient creates a new IndexClient that used BoltDB.
58func NewBoltDBIndexClient(cfg BoltDBConfig) (*BoltIndexClient, error) {
59	if err := chunk_util.EnsureDirectory(cfg.Directory); err != nil {
60		return nil, err
61	}
62
63	indexClient := &BoltIndexClient{
64		cfg:  cfg,
65		dbs:  map[string]*bbolt.DB{},
66		done: make(chan struct{}),
67	}
68
69	indexClient.wait.Add(1)
70	go indexClient.loop()
71	return indexClient, nil
72}
73
74func (b *BoltIndexClient) loop() {
75	defer b.wait.Done()
76
77	ticker := time.NewTicker(dbReloadPeriod)
78	defer ticker.Stop()
79
80	for {
81		select {
82		case <-ticker.C:
83			b.reload()
84		case <-b.done:
85			return
86		}
87	}
88}
89
90func (b *BoltIndexClient) reload() {
91	b.dbsMtx.RLock()
92
93	removedDBs := []string{}
94	for name := range b.dbs {
95		if _, err := os.Stat(path.Join(b.cfg.Directory, name)); err != nil && os.IsNotExist(err) {
96			removedDBs = append(removedDBs, name)
97			level.Debug(util_log.Logger).Log("msg", "boltdb file got removed", "filename", name)
98			continue
99		}
100	}
101	b.dbsMtx.RUnlock()
102
103	if len(removedDBs) != 0 {
104		b.dbsMtx.Lock()
105		defer b.dbsMtx.Unlock()
106
107		for _, name := range removedDBs {
108			if err := b.dbs[name].Close(); err != nil {
109				level.Error(util_log.Logger).Log("msg", "failed to close removed boltdb", "filename", name, "err", err)
110				continue
111			}
112			delete(b.dbs, name)
113		}
114	}
115
116}
117
118func (b *BoltIndexClient) Stop() {
119	close(b.done)
120
121	b.dbsMtx.Lock()
122	defer b.dbsMtx.Unlock()
123	for _, db := range b.dbs {
124		db.Close()
125	}
126
127	b.wait.Wait()
128}
129
130func (b *BoltIndexClient) NewWriteBatch() chunk.WriteBatch {
131	return &BoltWriteBatch{
132		Writes: map[string]TableWrites{},
133	}
134}
135
136// GetDB should always return a db for write operation unless an error occurs while doing so.
137// While for read operation it should throw ErrUnexistentBoltDB error if file does not exist for reading
138func (b *BoltIndexClient) GetDB(name string, operation int) (*bbolt.DB, error) {
139	b.dbsMtx.RLock()
140	db, ok := b.dbs[name]
141	b.dbsMtx.RUnlock()
142	if ok {
143		return db, nil
144	}
145
146	// we do not want to create a new db for reading if it does not exist
147	if operation == DBOperationRead {
148		if _, err := os.Stat(path.Join(b.cfg.Directory, name)); err != nil {
149			if os.IsNotExist(err) {
150				return nil, ErrUnexistentBoltDB
151			}
152			return nil, err
153		}
154	}
155
156	b.dbsMtx.Lock()
157	defer b.dbsMtx.Unlock()
158	db, ok = b.dbs[name]
159	if ok {
160		return db, nil
161	}
162
163	// Open the database.
164	// Set Timeout to avoid obtaining file lock wait indefinitely.
165	db, err := bbolt.Open(path.Join(b.cfg.Directory, name), 0666, &bbolt.Options{Timeout: openBoltDBFileTimeout})
166	if err != nil {
167		return nil, err
168	}
169
170	b.dbs[name] = db
171	return db, nil
172}
173
174func (b *BoltIndexClient) WriteToDB(ctx context.Context, db *bbolt.DB, writes TableWrites) error {
175	return db.Update(func(tx *bbolt.Tx) error {
176		var b *bbolt.Bucket
177
178		// a bucket should already exist for deletes, for other writes we create one otherwise.
179		if len(writes.deletes) != 0 {
180			b = tx.Bucket(bucketName)
181			if b == nil {
182				return fmt.Errorf("bucket %s not found in table %s", bucketName, filepath.Base(db.Path()))
183			}
184		} else {
185			var err error
186			b, err = tx.CreateBucketIfNotExists(bucketName)
187			if err != nil {
188				return err
189			}
190		}
191
192		for key, value := range writes.puts {
193			if err := b.Put([]byte(key), value); err != nil {
194				return err
195			}
196		}
197
198		for key := range writes.deletes {
199			if err := b.Delete([]byte(key)); err != nil {
200				return err
201			}
202		}
203
204		return nil
205	})
206}
207
208func (b *BoltIndexClient) BatchWrite(ctx context.Context, batch chunk.WriteBatch) error {
209	for table, writes := range batch.(*BoltWriteBatch).Writes {
210		db, err := b.GetDB(table, DBOperationWrite)
211		if err != nil {
212			return err
213		}
214
215		err = b.WriteToDB(ctx, db, writes)
216		if err != nil {
217			return err
218		}
219	}
220
221	return nil
222}
223
224func (b *BoltIndexClient) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error {
225	return chunk_util.DoParallelQueries(ctx, b.query, queries, callback)
226}
227
228func (b *BoltIndexClient) query(ctx context.Context, query chunk.IndexQuery, callback chunk_util.Callback) error {
229	db, err := b.GetDB(query.TableName, DBOperationRead)
230	if err != nil {
231		if err == ErrUnexistentBoltDB {
232			return nil
233		}
234
235		return err
236	}
237
238	return b.QueryDB(ctx, db, query, callback)
239}
240
241func (b *BoltIndexClient) QueryDB(ctx context.Context, db *bbolt.DB, query chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error {
242	return db.View(func(tx *bbolt.Tx) error {
243		bucket := tx.Bucket(bucketName)
244		if bucket == nil {
245			return nil
246		}
247
248		return b.QueryWithCursor(ctx, bucket.Cursor(), query, callback)
249	})
250}
251
252func (b *BoltIndexClient) QueryWithCursor(_ context.Context, c *bbolt.Cursor, query chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error {
253	var start []byte
254	if len(query.RangeValuePrefix) > 0 {
255		start = []byte(query.HashValue + separator + string(query.RangeValuePrefix))
256	} else if len(query.RangeValueStart) > 0 {
257		start = []byte(query.HashValue + separator + string(query.RangeValueStart))
258	} else {
259		start = []byte(query.HashValue + separator)
260	}
261
262	rowPrefix := []byte(query.HashValue + separator)
263
264	var batch boltReadBatch
265
266	for k, v := c.Seek(start); k != nil; k, v = c.Next() {
267		if !bytes.HasPrefix(k, rowPrefix) {
268			break
269		}
270
271		if len(query.RangeValuePrefix) > 0 && !bytes.HasPrefix(k, start) {
272			break
273		}
274		if len(query.ValueEqual) > 0 && !bytes.Equal(v, query.ValueEqual) {
275			continue
276		}
277
278		// make a copy since k, v are only valid for the life of the transaction.
279		// See: https://godoc.org/github.com/boltdb/bolt#Cursor.Seek
280		batch.rangeValue = make([]byte, len(k)-len(rowPrefix))
281		copy(batch.rangeValue, k[len(rowPrefix):])
282
283		batch.value = make([]byte, len(v))
284		copy(batch.value, v)
285
286		if !callback(query, &batch) {
287			break
288		}
289	}
290
291	return nil
292}
293
294type TableWrites struct {
295	puts    map[string][]byte
296	deletes map[string]struct{}
297}
298
299type BoltWriteBatch struct {
300	Writes map[string]TableWrites
301}
302
303func (b *BoltWriteBatch) getOrCreateTableWrites(tableName string) TableWrites {
304	writes, ok := b.Writes[tableName]
305	if !ok {
306		writes = TableWrites{
307			puts:    map[string][]byte{},
308			deletes: map[string]struct{}{},
309		}
310		b.Writes[tableName] = writes
311	}
312
313	return writes
314}
315
316func (b *BoltWriteBatch) Delete(tableName, hashValue string, rangeValue []byte) {
317	writes := b.getOrCreateTableWrites(tableName)
318
319	key := hashValue + separator + string(rangeValue)
320	writes.deletes[key] = struct{}{}
321}
322
323func (b *BoltWriteBatch) Add(tableName, hashValue string, rangeValue []byte, value []byte) {
324	writes := b.getOrCreateTableWrites(tableName)
325
326	key := hashValue + separator + string(rangeValue)
327	writes.puts[key] = value
328}
329
330type boltReadBatch struct {
331	rangeValue []byte
332	value      []byte
333}
334
335func (b boltReadBatch) Iterator() chunk.ReadBatchIterator {
336	return &boltReadBatchIterator{
337		boltReadBatch: b,
338	}
339}
340
341type boltReadBatchIterator struct {
342	consumed bool
343	boltReadBatch
344}
345
346func (b *boltReadBatchIterator) Next() bool {
347	if b.consumed {
348		return false
349	}
350	b.consumed = true
351	return true
352}
353
354func (b *boltReadBatchIterator) RangeValue() []byte {
355	return b.rangeValue
356}
357
358func (b *boltReadBatchIterator) Value() []byte {
359	return b.value
360}
361
362// Open the database.
363// Set Timeout to avoid obtaining file lock wait indefinitely.
364func OpenBoltdbFile(path string) (*bbolt.DB, error) {
365	return bbolt.Open(path, 0666, &bbolt.Options{Timeout: 5 * time.Second})
366}
367