1// Copyright 2020 Keybase Inc. All rights reserved.
2// Use of this source code is governed by a BSD
3// license that can be found in the LICENSE file.
4
5package search
6
7import (
8	"sync"
9	"time"
10
11	"github.com/keybase/client/go/kbfs/libkbfs"
12	"github.com/keybase/client/go/kbfs/tlf"
13	"github.com/keybase/client/go/protocol/keybase1"
14	"github.com/pkg/errors"
15)
16
17type indexType int
18
19const (
20	indexNone indexType = iota
21	indexFull
22	indexIncremental
23)
24
25// Progress represents the current state of the indexer, and how far
26// along the indexing is.
27type Progress struct {
28	clock libkbfs.Clock
29
30	lock             sync.RWMutex
31	tlfSizesToIndex  map[tlf.ID]uint64
32	currTlf          tlf.ID
33	currTotalToIndex uint64
34	currHasIndexed   uint64
35	currIndexType    indexType
36	currStartTime    time.Time
37	lastIndexRate    float64 // (bytes/second)
38}
39
40// NewProgress creates a new Progress instance.
41func NewProgress(clock libkbfs.Clock) *Progress {
42	return &Progress{
43		clock:           clock,
44		tlfSizesToIndex: make(map[tlf.ID]uint64),
45	}
46}
47
48func (p *Progress) tlfQueue(id tlf.ID, sizeEstimate uint64) {
49	p.lock.Lock()
50	defer p.lock.Unlock()
51
52	// Overwrite whatever was already there.
53	p.tlfSizesToIndex[id] = sizeEstimate
54}
55
56func (p *Progress) tlfUnqueue(id tlf.ID) {
57	p.lock.Lock()
58	defer p.lock.Unlock()
59
60	delete(p.tlfSizesToIndex, id)
61}
62
63func (p *Progress) startIndex(
64	id tlf.ID, sizeEstimate uint64, t indexType) error {
65	p.lock.Lock()
66	defer p.lock.Unlock()
67
68	if p.currTlf != tlf.NullID {
69		return errors.Errorf("Cannot index %s before finishing index of %s",
70			id, p.currTlf)
71	}
72
73	p.currTlf = id
74	delete(p.tlfSizesToIndex, id)
75	p.currTotalToIndex = sizeEstimate
76	p.currHasIndexed = 0
77	p.currIndexType = t
78	p.currStartTime = p.clock.Now()
79	return nil
80}
81
82func (p *Progress) indexedBytes(size uint64) {
83	p.lock.Lock()
84	defer p.lock.Unlock()
85
86	p.currHasIndexed += size
87	if p.currHasIndexed > p.currTotalToIndex {
88		// The provided size estimate was wrong.  But we don't know by
89		// how much.  So just add the newly-indexed bytes onto it, ot
90		// make sure we're still under the limit.
91		p.currTotalToIndex += size
92	}
93}
94
95func (p *Progress) finishIndex(id tlf.ID) error {
96	p.lock.Lock()
97	defer p.lock.Unlock()
98
99	if id != p.currTlf {
100		return errors.Errorf(
101			"Cannot finish index for %s, because %s is the current TLF",
102			id, p.currTlf)
103	}
104
105	// Estimate how long these bytes took to index.
106	timeSecs := p.clock.Now().Sub(p.currStartTime).Seconds()
107	if timeSecs > 0 {
108		p.lastIndexRate = float64(p.currHasIndexed) / timeSecs
109	} else {
110		p.lastIndexRate = 0
111	}
112
113	p.currTlf = tlf.NullID
114	p.currTotalToIndex = 0
115	p.currHasIndexed = 0
116	p.currIndexType = indexNone
117	p.currStartTime = time.Time{}
118	return nil
119}
120
121func (p *Progress) fillInProgressRecord(
122	total, soFar uint64, rate float64, rec *keybase1.IndexProgressRecord) {
123	if rate > 0 {
124		bytesLeft := total - soFar
125		timeLeft := time.Duration(
126			(float64(bytesLeft) / rate) * float64(time.Second))
127		rec.EndEstimate = keybase1.ToTime(p.clock.Now().Add(timeLeft))
128	}
129	rec.BytesSoFar = int64(soFar)
130	rec.BytesTotal = int64(total)
131}
132
133// GetStatus returns the current progress status.
134func (p *Progress) GetStatus() (
135	currProgress, overallProgress keybase1.IndexProgressRecord,
136	currTlf tlf.ID, queuedTlfs []tlf.ID) {
137	p.lock.RLock()
138	defer p.lock.RUnlock()
139
140	// At what rate is the current indexer running?
141	rate := p.lastIndexRate
142	if !p.currStartTime.IsZero() && p.currHasIndexed != 0 {
143		timeSecs := p.clock.Now().Sub(p.currStartTime).Seconds()
144		rate = float64(p.currHasIndexed) / timeSecs
145	}
146
147	if p.currTlf != tlf.NullID {
148		p.fillInProgressRecord(
149			p.currTotalToIndex, p.currHasIndexed, rate, &currProgress)
150	}
151
152	queuedTlfs = make([]tlf.ID, 0, len(p.tlfSizesToIndex))
153	var totalSize, soFar uint64
154	if p.currTlf != tlf.NullID {
155		totalSize += p.currTotalToIndex
156		soFar += p.currHasIndexed
157	}
158	for id, size := range p.tlfSizesToIndex {
159		if id == p.currTlf {
160			continue
161		}
162		totalSize += size
163		queuedTlfs = append(queuedTlfs, id)
164	}
165	if totalSize != 0 {
166		p.fillInProgressRecord(
167			totalSize, soFar, rate, &overallProgress)
168	}
169
170	return currProgress, overallProgress, p.currTlf, queuedTlfs
171}
172