1package chat
2
3import (
4	"container/heap"
5	"context"
6	"fmt"
7	"sync"
8	"time"
9
10	"github.com/keybase/client/go/chat/globals"
11	"github.com/keybase/client/go/chat/types"
12	"github.com/keybase/client/go/chat/utils"
13	"github.com/keybase/client/go/protocol/chat1"
14	"github.com/keybase/client/go/protocol/gregor1"
15	"github.com/keybase/clockwork"
16	"golang.org/x/sync/errgroup"
17)
18
19// An queueItem is something we manage in a priority queue.
20type queueItem struct {
21	purgeInfo chat1.EphemeralPurgeInfo
22
23	// The index is needed by `update` and is maintained by the heap.Interface methods.
24	index int // The index of the queueItem in the heap.
25}
26
27func (q *queueItem) String() string {
28	if q == nil {
29		return "<nil>"
30	}
31	return q.purgeInfo.String()
32}
33
34// A priorityQueue implements heap.Interface and holds queueItems.
35// We also keep a map of queueItems for easy item updates
36type priorityQueue struct {
37	sync.RWMutex
38
39	queue   []*queueItem
40	itemMap map[chat1.ConvIDStr]*queueItem
41}
42
43func newPriorityQueue() *priorityQueue {
44	return &priorityQueue{
45		queue:   []*queueItem{},
46		itemMap: make(map[chat1.ConvIDStr]*queueItem),
47	}
48}
49
50func (pq *priorityQueue) Len() int {
51	pq.RLock()
52	defer pq.RUnlock()
53
54	return len(pq.queue)
55}
56
57func (pq *priorityQueue) Less(i, j int) bool {
58	pq.RLock()
59	defer pq.RUnlock()
60
61	return pq.queue[i].purgeInfo.NextPurgeTime < pq.queue[j].purgeInfo.NextPurgeTime
62}
63
64func (pq *priorityQueue) Swap(i, j int) {
65	pq.Lock()
66	defer pq.Unlock()
67
68	pq.queue[i], pq.queue[j] = pq.queue[j], pq.queue[i]
69	pq.queue[i].index = i
70	pq.queue[j].index = j
71}
72
73// Note this method should not be used directly since we only want each
74// conversation to appear once in the heap. Use
75// `BackgroundEphemeralPurger.update` instead since it handles this as
76// intended.
77func (pq *priorityQueue) Push(x interface{}) {
78	pq.Lock()
79	defer pq.Unlock()
80
81	item := x.(*queueItem)
82	item.index = len(pq.queue)
83	pq.queue = append(pq.queue, item)
84	pq.itemMap[item.purgeInfo.ConvID.ConvIDStr()] = item
85}
86
87func (pq *priorityQueue) Pop() interface{} {
88	pq.Lock()
89	defer pq.Unlock()
90
91	n := len(pq.queue)
92	item := pq.queue[n-1]
93	item.index = -1 // for safety
94	pq.queue = pq.queue[:n-1]
95	delete(pq.itemMap, item.purgeInfo.ConvID.ConvIDStr())
96	return item
97}
98
99func (pq *priorityQueue) Peek() *queueItem {
100	pq.RLock()
101	defer pq.RUnlock()
102
103	if len(pq.queue) == 0 {
104		return nil
105	}
106	return pq.queue[0]
107}
108
109type BackgroundEphemeralPurger struct {
110	globals.Contextified
111	utils.DebugLabeler
112	// used to prevent concurrent calls to Start/Stop
113	lock sync.Mutex
114	// used to prevent concurrent modifications to `pq`
115	queueLock sync.Mutex
116
117	uid gregor1.UID
118	pq  *priorityQueue
119
120	started    bool
121	shutdownCh chan struct{}
122	eg         errgroup.Group
123	delay      time.Duration
124	clock      clockwork.Clock
125	purgeTimer *time.Timer
126}
127
128var _ types.EphemeralPurger = (*BackgroundEphemeralPurger)(nil)
129
130func NewBackgroundEphemeralPurger(g *globals.Context) *BackgroundEphemeralPurger {
131	return &BackgroundEphemeralPurger{
132		Contextified: globals.NewContextified(g),
133		DebugLabeler: utils.NewDebugLabeler(g.ExternalG(), "BackgroundEphemeralPurger", false),
134		delay:        500 * time.Millisecond,
135		clock:        clockwork.NewRealClock(),
136	}
137}
138
139func (b *BackgroundEphemeralPurger) SetClock(clock clockwork.Clock) {
140	b.clock = clock
141}
142
143func (b *BackgroundEphemeralPurger) Start(ctx context.Context, uid gregor1.UID) {
144	defer b.Trace(ctx, nil, "Start")()
145
146	b.lock.Lock()
147	defer b.lock.Unlock()
148	if b.started {
149		return
150	}
151
152	b.started = true
153	b.uid = uid
154	b.initQueue(ctx)
155	// Immediately fire to queue any purges we picked up during initQueue
156	b.purgeTimer = time.NewTimer(0)
157	shutdownCh := make(chan struct{})
158	b.shutdownCh = shutdownCh
159	b.eg.Go(func() error { return b.loop(shutdownCh) })
160}
161
162func (b *BackgroundEphemeralPurger) Stop(ctx context.Context) (ch chan struct{}) {
163	defer b.Trace(ctx, nil, "Stop")()
164	b.lock.Lock()
165	defer b.lock.Unlock()
166
167	ch = make(chan struct{})
168	if b.started {
169		close(b.shutdownCh)
170		b.started = false
171		go func() {
172			if err := b.eg.Wait(); err != nil {
173				b.Debug(ctx, "error stopping background loop: %v", err)
174			}
175			close(ch)
176		}()
177	} else {
178		close(ch)
179	}
180	return ch
181}
182
183func (b *BackgroundEphemeralPurger) Queue(ctx context.Context, purgeInfo chat1.EphemeralPurgeInfo) error {
184	b.queueLock.Lock()
185	defer b.queueLock.Unlock()
186
187	if b.pq == nil {
188		return fmt.Errorf("Must call Start() before adding to the Queue")
189	}
190
191	// skip duplicate items
192	item, ok := b.pq.itemMap[purgeInfo.ConvID.ConvIDStr()]
193	if ok && item.purgeInfo.Eq(purgeInfo) {
194		return nil
195	}
196	// We only keep active items in the queue.
197	if !purgeInfo.IsActive {
198		return nil
199	}
200
201	now := b.clock.Now()
202	nextPurgeTime := purgeInfo.NextPurgeTime.Time()
203	if nextPurgeTime.Before(now) || nextPurgeTime.Equal(now) {
204		b.addPurgeToConvLoaderLocked(ctx, purgeInfo)
205		return nil
206	}
207
208	// If we are starting the queue or get an earlier expiration time, reset or
209	// start the timer
210	head := b.pq.Peek()
211	if head == nil || purgeInfo.NextPurgeTime < head.purgeInfo.NextPurgeTime {
212		b.resetTimer(ctx, purgeInfo)
213	}
214	b.updateQueue(purgeInfo)
215	b.Debug(ctx, "Queue purgeInfo: %v, head: %+v, queueSize: %v",
216		purgeInfo, head, b.pq.Len())
217
218	// Sanity check to force our timer to fire if it hasn't for some reason.
219	head = b.pq.Peek()
220	if head.purgeInfo.NextPurgeTime.Time().Before(b.clock.Now()) {
221		b.Debug(ctx, "Queue resetting timer, head is in the past.")
222		b.resetTimer(ctx, head.purgeInfo)
223	}
224	return nil
225}
226
227// Read all purgeInfo from disk and startup our queue.
228func (b *BackgroundEphemeralPurger) initQueue(ctx context.Context) {
229	b.queueLock.Lock()
230	defer b.queueLock.Unlock()
231
232	// Create a new queue
233	b.pq = newPriorityQueue()
234	heap.Init(b.pq)
235
236	allPurgeInfo, err := b.G().EphemeralTracker.GetAllPurgeInfo(ctx, b.uid)
237	if err != nil {
238		b.Debug(ctx, "unable to get purgeInfo: %v", allPurgeInfo)
239	}
240	for _, purgeInfo := range allPurgeInfo {
241		if purgeInfo.IsActive {
242			b.updateQueue(purgeInfo)
243		}
244	}
245}
246
247func (b *BackgroundEphemeralPurger) updateQueue(purgeInfo chat1.EphemeralPurgeInfo) {
248	item, ok := b.pq.itemMap[purgeInfo.ConvID.ConvIDStr()]
249	if ok {
250		item.purgeInfo = purgeInfo
251		heap.Fix(b.pq, item.index)
252	} else {
253		heap.Push(b.pq, &queueItem{purgeInfo: purgeInfo})
254	}
255}
256
257// This runs when we are waiting to run a job but will shut itself down if we
258// have no work.
259func (b *BackgroundEphemeralPurger) loop(shutdownCh chan struct{}) error {
260	bgctx := context.Background()
261	b.Debug(bgctx, "loop: starting for %s", b.uid)
262	suspended := false
263	for {
264		select {
265		case <-b.purgeTimer.C:
266			b.Debug(bgctx, "loop: timer fired %s", b.uid)
267			b.queuePurges(bgctx)
268		case suspended = <-b.G().DesktopAppState.NextSuspendUpdate(&suspended):
269			if !suspended {
270				b.Debug(bgctx, "loop: queuing purges on resume %s", b.uid)
271				b.queuePurges(bgctx)
272			}
273		case <-shutdownCh:
274			b.Debug(bgctx, "loop: shutting down for %s", b.uid)
275			return nil
276		}
277	}
278}
279
280// Send any conversations that need an ephemeral message purged to the
281// convLoader. We reset our timer with the next minimum time (if any) returning
282// if the work loop should stop or not.
283func (b *BackgroundEphemeralPurger) queuePurges(ctx context.Context) bool {
284	defer b.Trace(ctx, nil, "queuePurges")()
285	b.queueLock.Lock()
286	defer b.queueLock.Unlock()
287
288	i := 0
289	// Peek into the queue for any expired convs
290	for _, item := range b.pq.queue {
291		purgeInfo := item.purgeInfo
292		now := b.clock.Now()
293		nextPurgeTime := purgeInfo.NextPurgeTime.Time()
294		if nextPurgeTime.Before(now) || nextPurgeTime.Equal(now) {
295			b.addPurgeToConvLoaderLocked(ctx, purgeInfo)
296			// Don't spam out to the convloader
297			if i > 0 {
298				b.Debug(ctx, "queuePurges sleeping for %v", b.delay)
299				b.clock.Sleep(b.delay)
300			}
301			i++
302		} else {
303			break
304		}
305	}
306	// Maintain the queue and pop off any items we just sent off for purging
307	for i > 0 {
308		heap.Pop(b.pq)
309		i--
310	}
311
312	nextItem := b.pq.Peek()
313	if nextItem == nil {
314		return true
315	}
316	// Reset our time for the next min item of the queue.
317	b.resetTimer(ctx, nextItem.purgeInfo)
318	return false
319}
320
321func (b *BackgroundEphemeralPurger) Len() int {
322	defer b.Trace(context.TODO(), nil, "Len")()
323	b.queueLock.Lock()
324	defer b.queueLock.Unlock()
325	return b.pq.Len()
326}
327
328func (b *BackgroundEphemeralPurger) addPurgeToConvLoaderLocked(ctx context.Context, purgeInfo chat1.EphemeralPurgeInfo) {
329	job := types.NewConvLoaderJob(purgeInfo.ConvID, &chat1.Pagination{Num: 0},
330		types.ConvLoaderPriorityHigh, types.ConvLoaderUnique,
331		newConvLoaderEphemeralPurgeHook(b.G(), b.uid, &purgeInfo))
332	if err := b.G().ConvLoader.Queue(ctx, job); err != nil {
333		b.Debug(ctx, "convLoader Queue error %s", err)
334	}
335}
336
337func (b *BackgroundEphemeralPurger) resetTimer(ctx context.Context, purgeInfo chat1.EphemeralPurgeInfo) {
338	duration := purgeInfo.NextPurgeTime.Time().Sub(b.clock.Now())
339	b.Debug(ctx, "resetTimer nextPurgeTime: %v, now: %v, duration: %v",
340		purgeInfo.NextPurgeTime.Time(), b.clock.Now(), duration)
341	b.purgeTimer.Stop()
342	b.purgeTimer.Reset(duration)
343}
344
345func newConvLoaderEphemeralPurgeHook(g *globals.Context, uid gregor1.UID, purgeInfo *chat1.EphemeralPurgeInfo) func(ctx context.Context, tv chat1.ThreadView, job types.ConvLoaderJob) {
346	return func(ctx context.Context, tv chat1.ThreadView, job types.ConvLoaderJob) {
347		if _, _, err := g.ConvSource.EphemeralPurge(ctx, job.ConvID, uid, purgeInfo); err != nil {
348			g.GetLog().CDebugf(ctx, "ephemeralPurge: %s", err)
349		}
350	}
351}
352