package libkb

import (
	"bytes"
	"errors"
	"fmt"
	"sync"
	"time"

	humanize "github.com/dustin/go-humanize"
	lru "github.com/hashicorp/golang-lru"
	keybase1 "github.com/keybase/client/go/protocol/keybase1"
	"github.com/syndtr/goleveldb/leveldb"
	"github.com/syndtr/goleveldb/leveldb/opt"
	"github.com/syndtr/goleveldb/leveldb/util"
	"golang.org/x/net/context"
)

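// DbCleanerConfig tunes when the LevelDB cleaner runs and how aggressively it
// prunes.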
type DbCleanerConfig struct {
	// start cleaning when the db grows above this size
	MaxSize uint64
	// stop cleaning once the db shrinks below this size
	HaltSize uint64
	// attempt a clean with this frequency
	CleanInterval time.Duration
	// number of recently used keys to keep cached
	CacheCapacity int
	// minimum number of cached keys required before a clean will run
	MinCacheSize int
	// duration the cleaner sleeps between batches while cleaning
	SleepInterval time.Duration
}

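// String implements fmt.Stringer so the active configuration can be logged.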
func (c DbCleanerConfig) String() string {
	return fmt.Sprintf("DbCleanerConfig{MaxSize: %v, HaltSize: %v, CleanInterval: %v, CacheCapacity: %v, MinCacheSize: %v, SleepInterval: %v}",
		humanize.Bytes(c.MaxSize), humanize.Bytes(c.HaltSize),
		c.CleanInterval, c.CacheCapacity,
		c.MinCacheSize, c.SleepInterval)
}

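// DefaultMobileDbCleanerConfig starts cleaning at 1 GiB and halts at 768 MiB.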
var DefaultMobileDbCleanerConfig = DbCleanerConfig{
	MaxSize:       opt.GiB,
	HaltSize:      opt.GiB * .75,
	CleanInterval: time.Hour,
	CacheCapacity: 100000,
	MinCacheSize:  10000,
	SleepInterval: 10 * time.Millisecond,
}

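// DefaultDesktopDbCleanerConfig starts cleaning at 2 GiB and halts at 1.5 GiB.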
var DefaultDesktopDbCleanerConfig = DbCleanerConfig{
	MaxSize:       2 * opt.GiB,
	HaltSize:      1.5 * opt.GiB,
	CleanInterval: time.Hour,
	CacheCapacity: 100000,
	MinCacheSize:  10000,
	SleepInterval: 50 * time.Millisecond,
}

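// levelDbCleaner prunes the cleanable LevelDB tables once they grow past the
// configured MaxSize, deleting keys that are absent from its recently-used LRU
// cache until the db drops below HaltSize.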
type levelDbCleaner struct {
	MetaContextified
	sync.Mutex

	running  bool
	lastKey  []byte
	lastRun  time.Time
	dbName   string
	config   DbCleanerConfig
	cache    *lru.Cache
	cacheMu  sync.Mutex // protects the pointer to the cache
	isMobile bool
	db       *leveldb.DB
	stopCh   chan struct{}
	cancelCh chan struct{}

	isShutdown bool
}

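// newLevelDbCleaner builds a cleaner for the named db, selecting the mobile or
// desktop default configuration based on the app type.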
func newLevelDbCleaner(mctx MetaContext, dbName string) *levelDbCleaner {
	config := DefaultDesktopDbCleanerConfig
	isMobile := mctx.G().IsMobileAppType()
	if isMobile {
		config = DefaultMobileDbCleanerConfig
	}
	return newLevelDbCleanerWithConfig(mctx, dbName, config, isMobile)
}

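// newLevelDbCleanerWithConfig builds a cleaner with an explicit configuration.
// On mobile it also starts a goroutine that cancels any in-flight clean when
// the app state changes.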
func newLevelDbCleanerWithConfig(mctx MetaContext, dbName string, config DbCleanerConfig, isMobile bool) *levelDbCleaner {
	cache, err := lru.New(config.CacheCapacity)
	if err != nil {
		panic(err)
	}
	mctx = mctx.WithLogTag("DBCLN")
	c := &levelDbCleaner{
		MetaContextified: NewMetaContextified(mctx),
		// Schedule the first run shortly after startup, but not immediately
		lastRun:  mctx.G().GetClock().Now().Add(-(config.CleanInterval - config.CleanInterval/10)),
		dbName:   dbName,
		config:   config,
		cache:    cache,
		isMobile: isMobile,
		stopCh:   make(chan struct{}),
		cancelCh: make(chan struct{}),
	}
	if isMobile {
		go c.monitorAppState()
	}
	return c
}

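// getCache returns the current LRU cache under cacheMu, since Shutdown can
// swap the cache pointer concurrently.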
func (c *levelDbCleaner) getCache() *lru.Cache {
	c.cacheMu.Lock()
	defer c.cacheMu.Unlock()
	return c.cache
}

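// Status returns a human-readable summary of the cleaner's state for
// debugging output.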
func (c *levelDbCleaner) Status() string {
	return fmt.Sprintf("levelDbCleaner{cacheSize: %d, lastRun: %v, lastKey: %v, running: %v}\n%v\n",
		c.getCache().Len(), c.lastRun, c.lastKey, c.running, c.config)
}

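// Stop shuts down the app-state monitor and aborts any in-progress clean.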
func (c *levelDbCleaner) Stop() {
	c.log("Stop")
	c.Lock()
	defer c.Unlock()
	if c.stopCh != nil {
		close(c.stopCh)
		c.stopCh = make(chan struct{})
	}
}

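// monitorAppState watches mobile app state transitions and cancels any
// in-flight clean whenever the app moves to a state other than
// BACKGROUNDACTIVE.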
func (c *levelDbCleaner) monitorAppState() {
	c.log("monitorAppState")
	state := keybase1.MobileAppState_FOREGROUND
	for {
		select {
		case state = <-c.G().MobileAppState.NextUpdate(&state):
			switch state {
			case keybase1.MobileAppState_BACKGROUNDACTIVE:
			default:
				c.log("monitorAppState: attempting cancel, state: %v", state)
				c.Lock()
				if c.cancelCh != nil {
					close(c.cancelCh)
					c.cancelCh = make(chan struct{})
				}
				c.Unlock()
			}
		case <-c.stopCh:
			c.log("monitorAppState: stop")
			return
		}
	}
}

func (c *levelDbCleaner) log(format string, args ...interface{}) {
	c.M().Debug(fmt.Sprintf("levelDbCleaner(%s): %s", c.dbName, format), args...)
}

func (c *levelDbCleaner) setDb(db *leveldb.DB) {
	c.Lock()
	defer c.Unlock()
	c.db = db
}

func (c *levelDbCleaner) cacheKey(key []byte) string {
	return string(key)
}

func (c *levelDbCleaner) clearCache() {
	c.getCache().Purge()
}

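// Shutdown marks the cleaner as shut down and swaps in a throwaway cache so
// any in-flight batch aborts promptly.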
func (c *levelDbCleaner) Shutdown() {
	c.cacheMu.Lock()
	defer c.cacheMu.Unlock()
	c.cache, _ = lru.New(1)
	c.isShutdown = true
}

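// shouldCleanLocked reports whether a clean should run now; callers must hold
// the cleaner's lock. A forced clean only requires that no other clean is
// running; otherwise enough keys must be cached and CleanInterval must have
// elapsed since the last run.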
func (c *levelDbCleaner) shouldCleanLocked(force bool) bool {
	if c.running {
		return false
	}
	if force {
		return true
	}
	validCache := c.getCache().Len() >= c.config.MinCacheSize
	return validCache &&
		c.G().GetClock().Now().Sub(c.lastRun) >= c.config.CleanInterval
}

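// getDbSize returns the approximate on-disk size of the kv table, the only
// range the cleaner is allowed to prune.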
func (c *levelDbCleaner) getDbSize() (size uint64, err error) {
	if c.db == nil {
		return 0, nil
	}
	// get the size from the start of the kv table to the beginning of the perm
	// table since that is all we can clean
	dbRange := util.Range{Start: tablePrefix(levelDbTableKv), Limit: tablePrefix(levelDbTablePerm)}
	sizes, err := c.db.SizeOf([]util.Range{dbRange})
	if err != nil {
		return 0, err
	}
	return uint64(sizes.Sum()), nil
}

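// clean runs the cleaning loop. Unless forced, it exits immediately if the db
// is under MaxSize; otherwise it deletes batches of keys that are absent from
// the recently-used cache until the db drops below HaltSize, the batch limit
// is reached, or the clean is cancelled or stopped.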
func (c *levelDbCleaner) clean(force bool) (err error) {
	c.Lock()
	// bail out early without spamming the logs
	if !c.shouldCleanLocked(force) {
		c.Unlock()
		return nil
	}
	c.running = true
	key := c.lastKey
	c.Unlock()

	defer c.M().Trace(fmt.Sprintf("levelDbCleaner(%s) clean, config: %v", c.dbName, c.config), &err)()
	defer func() {
		c.Lock()
		defer c.Unlock()
		c.lastKey = key
		c.lastRun = c.G().GetClock().Now()
		c.running = false
	}()

	dbSize, err := c.getDbSize()
	if err != nil {
		return err
	}

	c.log("dbSize: %v, cacheSize: %v",
		humanize.Bytes(dbSize), c.getCache().Len())
	// check the db size, abort if it is already small enough
	if !force && dbSize < c.config.MaxSize {
		return nil
	}

	var totalNumPurged, numPurged int
	for i := 0; i < 100; i++ {
		select {
		case <-c.cancelCh:
			c.log("aborting clean, %d runs, canceled", i)
			return nil
		case <-c.stopCh:
			c.log("aborting clean, %d runs, stopped", i)
			return nil
		default:
		}

		start := c.G().GetClock().Now()
		numPurged, key, err = c.cleanBatch(key)
		if err != nil {
			return err
		}
		if numPurged == 0 {
			break
		}
		totalNumPurged += numPurged

		if i%10 == 0 {
			c.log("purged %d items, dbSize: %v, lastKey:%s, ran in: %v",
				numPurged, humanize.Bytes(dbSize), key, c.G().GetClock().Now().Sub(start))
		}
		// check if we are back within limits
		dbSize, err = c.getDbSize()
		if err != nil {
			return err
		}
		// check the db size, stop once it is small enough
		if !force && dbSize < c.config.HaltSize {
			break
		}
		time.Sleep(c.config.SleepInterval)
	}
	c.log("clean complete. purged %d items total, dbSize: %v", totalNumPurged, humanize.Bytes(dbSize))
	return nil
}

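// cleanBatch iterates from startKey toward the permanent table, deleting up to
// 1000 keys that are absent from the recently-used cache, then compacts the
// scanned range. It returns the number of keys deleted and the key to resume
// from on the next batch (nil once the end of the range has been reached).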
func (c *levelDbCleaner) cleanBatch(startKey []byte) (int, []byte, error) {
	// Start our range from wherever we left off last time, and clean up until
	// the permanent entries table begins.
	iterRange := &util.Range{Start: startKey, Limit: tablePrefix(levelDbTablePerm)}
	// Option suggested in
	// https://github.com/google/leveldb/blob/master/doc/index.md#cache
	// """When performing a bulk read, the application may wish to disable
	// caching so that the data processed by the bulk read does not end up
	// displacing most of the cached contents."""
	opts := &opt.ReadOptions{DontFillCache: true}
	iter := c.db.NewIterator(iterRange, opts)
	batch := new(leveldb.Batch)
	for batch.Len() < 1000 && iter.Next() {
		key := iter.Key()

		c.cacheMu.Lock()
		if c.isShutdown {
			c.cacheMu.Unlock()
			return 0, nil, errors.New("cleanBatch: cancelled due to shutdown")
		}
		cache := c.cache
		c.cacheMu.Unlock()

		if _, found := cache.Get(c.cacheKey(key)); !found {
			cp := make([]byte, len(key))
			copy(cp, key)
			batch.Delete(cp)
		} else {
			// clear out the value from the lru
			cache.Remove(c.cacheKey(key))
		}
	}
	key := make([]byte, len(iter.Key()))
	copy(key, iter.Key())
	// see if we have reached the end of the db; if so, explicitly reset the
	// key value
	iter.Last()
	if bytes.Equal(key, iter.Key()) {
		key = nil
	}
	iter.Release()
	if err := iter.Error(); err != nil {
		return 0, nil, err
	}
	if err := c.db.Write(batch, nil); err != nil {
		return 0, nil, err
	}
	// Compact the range we just deleted in so the size changes are reflected
	err := c.db.CompactRange(util.Range{Start: startKey, Limit: key})
	return batch.Len(), key, err
}

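// attemptClean kicks off a non-forced clean on its own goroutine so callers on
// the hot path never block.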
func (c *levelDbCleaner) attemptClean(ctx context.Context) {
	go func() {
		if err := c.clean(false /* force */); err != nil {
			c.log("unable to clean: %v", err)
		}
	}()
}

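// markRecentlyUsed records that key was recently read or written so the
// cleaner will not delete it, then opportunistically attempts a clean.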
func (c *levelDbCleaner) markRecentlyUsed(ctx context.Context, key []byte) {
	c.getCache().Add(c.cacheKey(key), true)
	c.attemptClean(ctx)
}

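// removeRecentlyUsed drops key from the recently-used cache, e.g. after a
// delete, then opportunistically attempts a clean.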
func (c *levelDbCleaner) removeRecentlyUsed(ctx context.Context, key []byte) {
	c.getCache().Remove(c.cacheKey(key))
	c.attemptClean(ctx)
}