1package chat 2 3import ( 4 "container/heap" 5 "context" 6 "fmt" 7 "sync" 8 "time" 9 10 "github.com/keybase/client/go/chat/globals" 11 "github.com/keybase/client/go/chat/types" 12 "github.com/keybase/client/go/chat/utils" 13 "github.com/keybase/client/go/protocol/chat1" 14 "github.com/keybase/client/go/protocol/gregor1" 15 "github.com/keybase/clockwork" 16 "golang.org/x/sync/errgroup" 17) 18 19// An queueItem is something we manage in a priority queue. 20type queueItem struct { 21 purgeInfo chat1.EphemeralPurgeInfo 22 23 // The index is needed by `update` and is maintained by the heap.Interface methods. 24 index int // The index of the queueItem in the heap. 25} 26 27func (q *queueItem) String() string { 28 if q == nil { 29 return "<nil>" 30 } 31 return q.purgeInfo.String() 32} 33 34// A priorityQueue implements heap.Interface and holds queueItems. 35// We also keep a map of queueItems for easy item updates 36type priorityQueue struct { 37 sync.RWMutex 38 39 queue []*queueItem 40 itemMap map[chat1.ConvIDStr]*queueItem 41} 42 43func newPriorityQueue() *priorityQueue { 44 return &priorityQueue{ 45 queue: []*queueItem{}, 46 itemMap: make(map[chat1.ConvIDStr]*queueItem), 47 } 48} 49 50func (pq *priorityQueue) Len() int { 51 pq.RLock() 52 defer pq.RUnlock() 53 54 return len(pq.queue) 55} 56 57func (pq *priorityQueue) Less(i, j int) bool { 58 pq.RLock() 59 defer pq.RUnlock() 60 61 return pq.queue[i].purgeInfo.NextPurgeTime < pq.queue[j].purgeInfo.NextPurgeTime 62} 63 64func (pq *priorityQueue) Swap(i, j int) { 65 pq.Lock() 66 defer pq.Unlock() 67 68 pq.queue[i], pq.queue[j] = pq.queue[j], pq.queue[i] 69 pq.queue[i].index = i 70 pq.queue[j].index = j 71} 72 73// Note this method should not be used directly since we only want each 74// conversation to appear once in the heap. Use 75// `BackgroundEphemeralPurger.update` instead since it handles this as 76// intended. 77func (pq *priorityQueue) Push(x interface{}) { 78 pq.Lock() 79 defer pq.Unlock() 80 81 item := x.(*queueItem) 82 item.index = len(pq.queue) 83 pq.queue = append(pq.queue, item) 84 pq.itemMap[item.purgeInfo.ConvID.ConvIDStr()] = item 85} 86 87func (pq *priorityQueue) Pop() interface{} { 88 pq.Lock() 89 defer pq.Unlock() 90 91 n := len(pq.queue) 92 item := pq.queue[n-1] 93 item.index = -1 // for safety 94 pq.queue = pq.queue[:n-1] 95 delete(pq.itemMap, item.purgeInfo.ConvID.ConvIDStr()) 96 return item 97} 98 99func (pq *priorityQueue) Peek() *queueItem { 100 pq.RLock() 101 defer pq.RUnlock() 102 103 if len(pq.queue) == 0 { 104 return nil 105 } 106 return pq.queue[0] 107} 108 109type BackgroundEphemeralPurger struct { 110 globals.Contextified 111 utils.DebugLabeler 112 // used to prevent concurrent calls to Start/Stop 113 lock sync.Mutex 114 // used to prevent concurrent modifications to `pq` 115 queueLock sync.Mutex 116 117 uid gregor1.UID 118 pq *priorityQueue 119 120 started bool 121 shutdownCh chan struct{} 122 eg errgroup.Group 123 delay time.Duration 124 clock clockwork.Clock 125 purgeTimer *time.Timer 126} 127 128var _ types.EphemeralPurger = (*BackgroundEphemeralPurger)(nil) 129 130func NewBackgroundEphemeralPurger(g *globals.Context) *BackgroundEphemeralPurger { 131 return &BackgroundEphemeralPurger{ 132 Contextified: globals.NewContextified(g), 133 DebugLabeler: utils.NewDebugLabeler(g.ExternalG(), "BackgroundEphemeralPurger", false), 134 delay: 500 * time.Millisecond, 135 clock: clockwork.NewRealClock(), 136 } 137} 138 139func (b *BackgroundEphemeralPurger) SetClock(clock clockwork.Clock) { 140 b.clock = clock 141} 142 143func (b *BackgroundEphemeralPurger) Start(ctx context.Context, uid gregor1.UID) { 144 defer b.Trace(ctx, nil, "Start")() 145 146 b.lock.Lock() 147 defer b.lock.Unlock() 148 if b.started { 149 return 150 } 151 152 b.started = true 153 b.uid = uid 154 b.initQueue(ctx) 155 // Immediately fire to queue any purges we picked up during initQueue 156 b.purgeTimer = time.NewTimer(0) 157 shutdownCh := make(chan struct{}) 158 b.shutdownCh = shutdownCh 159 b.eg.Go(func() error { return b.loop(shutdownCh) }) 160} 161 162func (b *BackgroundEphemeralPurger) Stop(ctx context.Context) (ch chan struct{}) { 163 defer b.Trace(ctx, nil, "Stop")() 164 b.lock.Lock() 165 defer b.lock.Unlock() 166 167 ch = make(chan struct{}) 168 if b.started { 169 close(b.shutdownCh) 170 b.started = false 171 go func() { 172 if err := b.eg.Wait(); err != nil { 173 b.Debug(ctx, "error stopping background loop: %v", err) 174 } 175 close(ch) 176 }() 177 } else { 178 close(ch) 179 } 180 return ch 181} 182 183func (b *BackgroundEphemeralPurger) Queue(ctx context.Context, purgeInfo chat1.EphemeralPurgeInfo) error { 184 b.queueLock.Lock() 185 defer b.queueLock.Unlock() 186 187 if b.pq == nil { 188 return fmt.Errorf("Must call Start() before adding to the Queue") 189 } 190 191 // skip duplicate items 192 item, ok := b.pq.itemMap[purgeInfo.ConvID.ConvIDStr()] 193 if ok && item.purgeInfo.Eq(purgeInfo) { 194 return nil 195 } 196 // We only keep active items in the queue. 197 if !purgeInfo.IsActive { 198 return nil 199 } 200 201 now := b.clock.Now() 202 nextPurgeTime := purgeInfo.NextPurgeTime.Time() 203 if nextPurgeTime.Before(now) || nextPurgeTime.Equal(now) { 204 b.addPurgeToConvLoaderLocked(ctx, purgeInfo) 205 return nil 206 } 207 208 // If we are starting the queue or get an earlier expiration time, reset or 209 // start the timer 210 head := b.pq.Peek() 211 if head == nil || purgeInfo.NextPurgeTime < head.purgeInfo.NextPurgeTime { 212 b.resetTimer(ctx, purgeInfo) 213 } 214 b.updateQueue(purgeInfo) 215 b.Debug(ctx, "Queue purgeInfo: %v, head: %+v, queueSize: %v", 216 purgeInfo, head, b.pq.Len()) 217 218 // Sanity check to force our timer to fire if it hasn't for some reason. 219 head = b.pq.Peek() 220 if head.purgeInfo.NextPurgeTime.Time().Before(b.clock.Now()) { 221 b.Debug(ctx, "Queue resetting timer, head is in the past.") 222 b.resetTimer(ctx, head.purgeInfo) 223 } 224 return nil 225} 226 227// Read all purgeInfo from disk and startup our queue. 228func (b *BackgroundEphemeralPurger) initQueue(ctx context.Context) { 229 b.queueLock.Lock() 230 defer b.queueLock.Unlock() 231 232 // Create a new queue 233 b.pq = newPriorityQueue() 234 heap.Init(b.pq) 235 236 allPurgeInfo, err := b.G().EphemeralTracker.GetAllPurgeInfo(ctx, b.uid) 237 if err != nil { 238 b.Debug(ctx, "unable to get purgeInfo: %v", allPurgeInfo) 239 } 240 for _, purgeInfo := range allPurgeInfo { 241 if purgeInfo.IsActive { 242 b.updateQueue(purgeInfo) 243 } 244 } 245} 246 247func (b *BackgroundEphemeralPurger) updateQueue(purgeInfo chat1.EphemeralPurgeInfo) { 248 item, ok := b.pq.itemMap[purgeInfo.ConvID.ConvIDStr()] 249 if ok { 250 item.purgeInfo = purgeInfo 251 heap.Fix(b.pq, item.index) 252 } else { 253 heap.Push(b.pq, &queueItem{purgeInfo: purgeInfo}) 254 } 255} 256 257// This runs when we are waiting to run a job but will shut itself down if we 258// have no work. 259func (b *BackgroundEphemeralPurger) loop(shutdownCh chan struct{}) error { 260 bgctx := context.Background() 261 b.Debug(bgctx, "loop: starting for %s", b.uid) 262 suspended := false 263 for { 264 select { 265 case <-b.purgeTimer.C: 266 b.Debug(bgctx, "loop: timer fired %s", b.uid) 267 b.queuePurges(bgctx) 268 case suspended = <-b.G().DesktopAppState.NextSuspendUpdate(&suspended): 269 if !suspended { 270 b.Debug(bgctx, "loop: queuing purges on resume %s", b.uid) 271 b.queuePurges(bgctx) 272 } 273 case <-shutdownCh: 274 b.Debug(bgctx, "loop: shutting down for %s", b.uid) 275 return nil 276 } 277 } 278} 279 280// Send any conversations that need an ephemeral message purged to the 281// convLoader. We reset our timer with the next minimum time (if any) returning 282// if the work loop should stop or not. 283func (b *BackgroundEphemeralPurger) queuePurges(ctx context.Context) bool { 284 defer b.Trace(ctx, nil, "queuePurges")() 285 b.queueLock.Lock() 286 defer b.queueLock.Unlock() 287 288 i := 0 289 // Peek into the queue for any expired convs 290 for _, item := range b.pq.queue { 291 purgeInfo := item.purgeInfo 292 now := b.clock.Now() 293 nextPurgeTime := purgeInfo.NextPurgeTime.Time() 294 if nextPurgeTime.Before(now) || nextPurgeTime.Equal(now) { 295 b.addPurgeToConvLoaderLocked(ctx, purgeInfo) 296 // Don't spam out to the convloader 297 if i > 0 { 298 b.Debug(ctx, "queuePurges sleeping for %v", b.delay) 299 b.clock.Sleep(b.delay) 300 } 301 i++ 302 } else { 303 break 304 } 305 } 306 // Maintain the queue and pop off any items we just sent off for purging 307 for i > 0 { 308 heap.Pop(b.pq) 309 i-- 310 } 311 312 nextItem := b.pq.Peek() 313 if nextItem == nil { 314 return true 315 } 316 // Reset our time for the next min item of the queue. 317 b.resetTimer(ctx, nextItem.purgeInfo) 318 return false 319} 320 321func (b *BackgroundEphemeralPurger) Len() int { 322 defer b.Trace(context.TODO(), nil, "Len")() 323 b.queueLock.Lock() 324 defer b.queueLock.Unlock() 325 return b.pq.Len() 326} 327 328func (b *BackgroundEphemeralPurger) addPurgeToConvLoaderLocked(ctx context.Context, purgeInfo chat1.EphemeralPurgeInfo) { 329 job := types.NewConvLoaderJob(purgeInfo.ConvID, &chat1.Pagination{Num: 0}, 330 types.ConvLoaderPriorityHigh, types.ConvLoaderUnique, 331 newConvLoaderEphemeralPurgeHook(b.G(), b.uid, &purgeInfo)) 332 if err := b.G().ConvLoader.Queue(ctx, job); err != nil { 333 b.Debug(ctx, "convLoader Queue error %s", err) 334 } 335} 336 337func (b *BackgroundEphemeralPurger) resetTimer(ctx context.Context, purgeInfo chat1.EphemeralPurgeInfo) { 338 duration := purgeInfo.NextPurgeTime.Time().Sub(b.clock.Now()) 339 b.Debug(ctx, "resetTimer nextPurgeTime: %v, now: %v, duration: %v", 340 purgeInfo.NextPurgeTime.Time(), b.clock.Now(), duration) 341 b.purgeTimer.Stop() 342 b.purgeTimer.Reset(duration) 343} 344 345func newConvLoaderEphemeralPurgeHook(g *globals.Context, uid gregor1.UID, purgeInfo *chat1.EphemeralPurgeInfo) func(ctx context.Context, tv chat1.ThreadView, job types.ConvLoaderJob) { 346 return func(ctx context.Context, tv chat1.ThreadView, job types.ConvLoaderJob) { 347 if _, _, err := g.ConvSource.EphemeralPurge(ctx, job.ConvID, uid, purgeInfo); err != nil { 348 g.GetLog().CDebugf(ctx, "ephemeralPurge: %s", err) 349 } 350 } 351} 352