1// Copyright 2015 Keybase, Inc. All rights reserved. Use of
2// this source code is governed by the included BSD license.
3
4package libkb
5
6import (
7	"fmt"
8	"os"
9	"path/filepath"
10	"strings"
11	"sync"
12
13	"github.com/syndtr/goleveldb/leveldb"
14	errors "github.com/syndtr/goleveldb/leveldb/errors"
15	"github.com/syndtr/goleveldb/leveldb/filter"
16	"github.com/syndtr/goleveldb/leveldb/opt"
17	"github.com/syndtr/goleveldb/leveldb/util"
18	"golang.org/x/net/context"
19)
20
// Table names used as prefixes when building LevelDB keys.
const (
	// levelDbTableKv is one of the two tables an alias record may point at
	// (see levelDbLookup).
	levelDbTableKv = "kv"
	// levelDbTableLo is presumably the table holding lookup (alias) records;
	// it is not referenced directly in this file. TODO confirm against DbKey.
	levelDbTableLo = "lo"
	// levelDbTablePerm marks permanent entries: keys with this prefix are
	// ignored by the dbcleaner.
	levelDbTablePerm = "pm"
)
28
// levelDBOps is the subset of LevelDB operations required by the helper
// functions in this file (levelDbPut, levelDbGet, levelDbLookup,
// levelDbDelete). It lets the same helpers run against either a *leveldb.DB
// (see LevelDb) or a *leveldb.Transaction (see LevelDbTransaction).
type levelDBOps interface {
	Delete(key []byte, wo *opt.WriteOptions) error
	Get(key []byte, ro *opt.ReadOptions) (value []byte, err error)
	Put(key, value []byte, wo *opt.WriteOptions) error
	Write(b *leveldb.Batch, wo *opt.WriteOptions) error
}
35
36func levelDbPut(ops levelDBOps, cleaner *levelDbCleaner, id DbKey, aliases []DbKey, value []byte) (err error) {
37	defer convertNoSpaceError(&err)
38
39	idb := id.ToBytes()
40	if aliases == nil {
41		// if no aliases, just do a put
42		if err := ops.Put(idb, value, nil); err != nil {
43			return err
44		}
45		cleaner.markRecentlyUsed(context.Background(), idb)
46		return nil
47	}
48
49	batch := new(leveldb.Batch)
50	batch.Put(idb, value)
51	keys := make([][]byte, len(aliases))
52	keys = append(keys, idb)
53	for i, alias := range aliases {
54		aliasKey := alias.ToBytesLookup()
55		batch.Put(aliasKey, idb)
56		keys[i] = aliasKey
57	}
58
59	if err := ops.Write(batch, nil); err != nil {
60		return err
61	}
62	for _, key := range keys {
63		cleaner.markRecentlyUsed(context.Background(), key)
64	}
65	return nil
66}
67
68func levelDbGetWhich(ops levelDBOps, cleaner *levelDbCleaner, key []byte) (val []byte, found bool, err error) {
69	val, err = ops.Get(key, nil)
70	found = false
71	if err == nil {
72		found = true
73	} else if err == leveldb.ErrNotFound {
74		err = nil
75	}
76
77	if found && err == nil {
78		cleaner.markRecentlyUsed(context.Background(), key)
79	}
80	return val, found, err
81}
82
83func levelDbGet(ops levelDBOps, cleaner *levelDbCleaner, id DbKey) ([]byte, bool, error) {
84	return levelDbGetWhich(ops, cleaner, id.ToBytes())
85}
86
87func levelDbLookup(ops levelDBOps, cleaner *levelDbCleaner, id DbKey) (val []byte, found bool, err error) {
88	val, found, err = levelDbGetWhich(ops, cleaner, id.ToBytesLookup())
89	if found {
90		if tab, id2, err2 := DbKeyParse(string(val)); err2 != nil {
91			err = err2
92		} else if tab != levelDbTableKv && tab != levelDbTablePerm {
93			err = fmt.Errorf("bad alias; expected 'kv' or 'pm' but got '%s'", tab)
94		} else {
95			val, found, err = levelDbGetWhich(ops, cleaner, id2.ToBytes())
96		}
97	}
98	return val, found, err
99}
100
101func levelDbDelete(ops levelDBOps, cleaner *levelDbCleaner, id DbKey) (err error) {
102	defer convertNoSpaceError(&err)
103	key := id.ToBytes()
104	if err := ops.Delete(key, nil); err != nil {
105		return err
106	}
107
108	cleaner.removeRecentlyUsed(context.Background(), key)
109	return nil
110}
111
// LevelDb is a lazily opened LevelDB database stored at filename.
//
// Fields:
//   - db: the open database handle; nil before the first lazy open and after
//     a close (see doWhileOpenAndNukeIfCorrupted and closeLocked).
//   - dbOpenerOnce: guards the lazy open so it runs at most once until it is
//     replaced with a fresh sync.Once (see Nuke and
//     doWhileOpenAndNukeIfCorrupted).
//   - cleaner: receives recently-used key notifications and is handed the
//     database handle once it is open.
//   - filename: path of the database on disk.
type LevelDb struct {
	// We use a RWMutex here to ensure close doesn't happen in the middle of
	// other DB operations, and DB operations don't happen after close. The
	// lock should be considered as guarding the db pointer and the
	// dbOpenerOnce pointer, rather than the DB itself.  More specifically,
	// close does Lock(), while other DB operations do RLock().
	sync.RWMutex
	db           *leveldb.DB
	dbOpenerOnce *sync.Once
	cleaner      *levelDbCleaner

	filename string
	Contextified
}
126
127func NewLevelDb(g *GlobalContext, filename func() string) *LevelDb {
128	path := filename()
129	return &LevelDb{
130		Contextified: NewContextified(g),
131		filename:     path,
132		dbOpenerOnce: new(sync.Once),
133		cleaner:      newLevelDbCleaner(NewMetaContextTODO(g), filepath.Base(path)),
134	}
135}
136
137// Explicit open does nothing we'll wait for a lazy open
138func (l *LevelDb) Open() error { return nil }
139
140// Opts returns the options for all leveldb databases.
141//
142// PC: I think it's worth trying a bloom filter.  From docs:
143// "In many cases, a filter can cut down the number of disk
144// seeks from a handful to a single disk seek per DB.Get call."
145func (l *LevelDb) Opts() *opt.Options {
146	return &opt.Options{
147		OpenFilesCacheCapacity: l.G().Env.GetLevelDBNumFiles(),
148		Filter:                 filter.NewBloomFilter(10),
149		CompactionTableSize:    10 * opt.MiB,
150		WriteBuffer:            l.G().Env.GetLevelDBWriteBufferMB() * opt.MiB,
151	}
152}
153
// doWhileOpenAndNukeIfCorrupted runs action while holding the read lock,
// lazily opening the database first if that has not happened yet.
//
// Behavior, in order:
//   - The open runs at most once per dbOpenerOnce. If the open reports
//     *errors.ErrCorrupted, a recovery via leveldb.RecoverFile is attempted.
//   - If the open failed, its error is returned and action is not run. If the
//     database pointer is nil for any other reason (e.g. it was closed),
//     LevelDBOpenClosedError is returned.
//   - If the resulting error looks like corruption (see isCorrupt), the
//     database is nuked and nil is returned, as if nothing had been found.
//   - If the error is a "no space on device" error, a forced clean is started
//     in a background goroutine and the error is still returned.
//   - For any remaining error, if the database pointer is nil, dbOpenerOnce
//     is replaced so that the next call retries the open.
func (l *LevelDb) doWhileOpenAndNukeIfCorrupted(action func() error) (err error) {
	err = func() error {
		l.RLock()
		defer l.RUnlock()

		// This only happens at first ever doWhileOpenAndNukeIfCorrupted call, or
		// when dbOpenerOnce has just been reset (in Nuke() or at the end of this
		// function after a failed open).
		//
		// Note that err inside this closure is the named result of the enclosing
		// method, so an open failure is visible to the check right after Do.
		l.dbOpenerOnce.Do(func() {
			l.G().Log.Debug("+ LevelDb.open")
			fn := l.GetFilename()
			l.G().Log.Debug("| Opening LevelDB for local cache: %v %s", l, fn)
			l.G().Log.Debug("| Opening LevelDB options: %+v", l.Opts())
			l.db, err = leveldb.OpenFile(fn, l.Opts())
			if _, ok := err.(*errors.ErrCorrupted); ok {
				l.G().Log.Debug("| LevelDb was corrupted; attempting recovery (%v)", err)
				var recoveryError error
				// NOTE(review): recovery is run with nil options rather than
				// l.Opts() — confirm this is intentional.
				l.db, recoveryError = leveldb.RecoverFile(fn, nil)
				if recoveryError != nil {
					// err keeps the original corruption error, so the nuke path
					// below still triggers.
					l.G().Log.Debug("| Recovery failed: %v", recoveryError)
				} else {
					l.G().Log.Debug("| Recovery succeeded!")
					// wipe the outer error since it's fixed now
					err = nil
				}
			}
			l.G().Log.Debug("- LevelDb.open -> %s", ErrToOk(err))
			if l.db != nil {
				// Hand the freshly opened handle to the cleaner.
				l.cleaner.setDb(l.db)
			}
		})

		if err != nil {
			return err
		}

		if l.db == nil {
			// This means DB is already closed. We are preventing lazy-opening after
			// closing, so just return error here.
			return LevelDBOpenClosedError{}
		}

		return action()
	}()

	// If the file is corrupt, just nuke and act like we didn't find anything
	if l.nukeIfCorrupt(err) {
		err = nil
	} else if IsNoSpaceOnDeviceError(err) {
		// If we are out of space force a db clean
		go func() { _ = l.cleaner.clean(true) }()
	}

	// Notably missing here is the error handling for when DB open fails but on
	// an error other than "db is corrupted". We simply return the error here
	// without resetting `dbOpenerOnce` (i.e. next call into LevelDb would result
	// in a LevelDBOpenClosedError), because if DB open fails, retrying it
	// wouldn't help. We should find the root cause and deal with it.
	// MM: 10/12/2017: I am changing the above policy. I am not so sure retrying it won't help,
	// we should at least try instead of auto returning LevelDBOpenClosederror.
	if err != nil {
		// The write lock is needed because dbOpenerOnce itself is replaced.
		l.Lock()
		if l.db == nil {
			l.G().Log.Debug("LevelDb: doWhileOpenAndNukeIfCorrupted: resetting sync one: %s", err)
			l.dbOpenerOnce = new(sync.Once)
		}
		l.Unlock()
	}
	return err
}
223
224// ForceOpen opens the leveldb file.  This is used in situations
225// where we want to get around the lazy open and make sure we can
226// use it later.
227func (l *LevelDb) ForceOpen() error {
228	return l.doWhileOpenAndNukeIfCorrupted(func() error { return nil })
229}
230
231func (l *LevelDb) Stats() (stats string) {
232	if err := l.doWhileOpenAndNukeIfCorrupted(func() (err error) {
233		stats, err = l.db.GetProperty("leveldb.stats")
234		stats = fmt.Sprintf("%s\n%s", stats, l.cleaner.Status())
235		return err
236	}); err != nil {
237		return ""
238	}
239	return stats
240}
241
242func (l *LevelDb) CompactionStats() (memActive, tableActive bool, err error) {
243	var dbStats leveldb.DBStats
244	if err := l.doWhileOpenAndNukeIfCorrupted(func() (err error) {
245		return l.db.Stats(&dbStats)
246	}); err != nil {
247		return false, false, err
248	}
249	return dbStats.MemCompactionActive, dbStats.TableCompactionActive, nil
250}
251
252func (l *LevelDb) GetFilename() string {
253	if len(l.filename) == 0 {
254		l.G().Log.Fatalf("DB filename empty")
255	}
256	return l.filename
257}
258
259func (l *LevelDb) Close() error {
260	l.Lock()
261	defer l.Unlock()
262	return l.closeLocked()
263}
264
265func (l *LevelDb) closeLocked() error {
266	var err error
267	if l.db != nil {
268		l.G().Log.Debug("Closing LevelDB local cache: %s", l.GetFilename())
269		err = l.db.Close()
270		l.db = nil
271
272		// In case we just nuked DB and reset the dbOpenerOnce, this makes sure it
273		// doesn't open the DB again.
274		l.dbOpenerOnce.Do(func() {})
275		// stop any active cleaning jobs
276		l.cleaner.Stop()
277		l.cleaner.Shutdown()
278	}
279	return err
280}
281
282func (l *LevelDb) isCorrupt(err error) bool {
283	if err == nil {
284		return false
285	}
286
287	// If the error is of type ErrCorrupted, then we nuke
288	if _, ok := err.(*errors.ErrCorrupted); ok {
289		return true
290	}
291
292	// Sometimes the LevelDB library will return generic error messages about
293	// corruption, also nuke on them
294	if strings.Contains(err.Error(), "corrupt") {
295		return true
296	}
297	return false
298}
299
300func (l *LevelDb) Clean(force bool) (err error) {
301	l.Lock()
302	defer l.Unlock()
303	defer l.G().Trace("LevelDb::Clean", &err)()
304	return l.cleaner.clean(force)
305}
306
307func (l *LevelDb) Nuke() (fn string, err error) {
308	l.Lock()
309	// We need to do deferred Unlock here in Nuke rather than delegating to
310	// l.Close() because we'll be re-opening the database later, and it's
311	// necessary to block other doWhileOpenAndNukeIfCorrupted() calls.
312	defer l.Unlock()
313	defer l.G().Trace("LevelDb::Nuke", &err)()
314
315	// even if we can't close the db try to nuke the files directly
316	if err = l.closeLocked(); err != nil {
317		l.G().Log.Debug("Error closing leveldb %v, attempting nuke anyway", err)
318	}
319
320	fn = l.GetFilename()
321	if err = os.RemoveAll(fn); err != nil {
322		return fn, err
323	}
324	// reset dbOpenerOnce since this is not a explicit close and there might be
325	// more legitimate DB operations coming in
326	l.dbOpenerOnce = new(sync.Once)
327	return fn, err
328}
329
330func (l *LevelDb) nukeIfCorrupt(err error) bool {
331	if l.isCorrupt(err) {
332		l.G().Log.Debug("LevelDB file corrupted, nuking database and starting fresh")
333		if _, err := l.Nuke(); err != nil {
334			l.G().Log.Debug("Error nuking LevelDB file: %s", err)
335			return false
336		}
337		return true
338	}
339	return false
340}
341
342func (l *LevelDb) Put(id DbKey, aliases []DbKey, value []byte) error {
343	return l.doWhileOpenAndNukeIfCorrupted(func() error {
344		return levelDbPut(l.db, l.cleaner, id, aliases, value)
345	})
346}
347
348func (l *LevelDb) Get(id DbKey) (val []byte, found bool, err error) {
349	err = l.doWhileOpenAndNukeIfCorrupted(func() error {
350		val, found, err = levelDbGet(l.db, l.cleaner, id)
351		return err
352	})
353	return val, found, err
354}
355
356func (l *LevelDb) Lookup(id DbKey) (val []byte, found bool, err error) {
357	err = l.doWhileOpenAndNukeIfCorrupted(func() error {
358		val, found, err = levelDbLookup(l.db, l.cleaner, id)
359		return err
360	})
361	return val, found, err
362}
363
364func (l *LevelDb) Delete(id DbKey) error {
365	return l.doWhileOpenAndNukeIfCorrupted(func() error {
366		return levelDbDelete(l.db, l.cleaner, id)
367	})
368}
369
370func (l *LevelDb) OpenTransaction() (LocalDbTransaction, error) {
371	var (
372		ltr LevelDbTransaction
373		err error
374	)
375	if ltr.tr, err = l.db.OpenTransaction(); err != nil {
376		return LevelDbTransaction{}, err
377	}
378	ltr.cleaner = l.cleaner
379	return ltr, nil
380}
381
382func (l *LevelDb) KeysWithPrefixes(prefixes ...[]byte) (DBKeySet, error) {
383	m := make(map[DbKey]struct{})
384	err := l.doWhileOpenAndNukeIfCorrupted(func() error {
385		opts := &opt.ReadOptions{DontFillCache: true}
386		for _, prefix := range prefixes {
387			iter := l.db.NewIterator(util.BytesPrefix(prefix), opts)
388			for iter.Next() {
389				_, dbKey, err := DbKeyParse(string(iter.Key()))
390				if err != nil {
391					iter.Release()
392					return err
393				}
394				m[dbKey] = struct{}{}
395			}
396			iter.Release()
397			err := iter.Error()
398			if err != nil {
399				return nil
400			}
401		}
402		return nil
403	})
404	if err != nil {
405		return nil, err
406	}
407
408	return m, nil
409}
410
411type LevelDbTransaction struct {
412	tr      *leveldb.Transaction
413	cleaner *levelDbCleaner
414}
415
416func (l LevelDbTransaction) Put(id DbKey, aliases []DbKey, value []byte) error {
417	return levelDbPut(l.tr, l.cleaner, id, aliases, value)
418}
419
420func (l LevelDbTransaction) Get(id DbKey) (val []byte, found bool, err error) {
421	return levelDbGet(l.tr, l.cleaner, id)
422}
423
424func (l LevelDbTransaction) Lookup(id DbKey) (val []byte, found bool, err error) {
425	return levelDbLookup(l.tr, l.cleaner, id)
426}
427
428func (l LevelDbTransaction) Delete(id DbKey) error {
429	return levelDbDelete(l.tr, l.cleaner, id)
430}
431
432func (l LevelDbTransaction) Commit() (err error) {
433	defer convertNoSpaceError(&err)
434	return l.tr.Commit()
435}
436
437func (l LevelDbTransaction) Discard() {
438	l.tr.Discard()
439}
440
441func convertNoSpaceError(err *error) {
442	if IsNoSpaceOnDeviceError(*err) {
443		// embed in exportable error type
444		*err = NoSpaceOnDeviceError{Desc: (*err).Error()}
445	}
446}
447