1package chat
2
3import (
4	"context"
5	"errors"
6	"fmt"
7	"net"
8	"sync"
9	"time"
10
11	"github.com/keybase/client/go/chat/attachments"
12	"github.com/keybase/client/go/chat/globals"
13	"github.com/keybase/client/go/chat/storage"
14	"github.com/keybase/client/go/chat/types"
15	"github.com/keybase/client/go/chat/utils"
16	"github.com/keybase/client/go/libkb"
17	"github.com/keybase/client/go/protocol/chat1"
18	"github.com/keybase/client/go/protocol/gregor1"
19	"github.com/keybase/client/go/protocol/keybase1"
20	"github.com/keybase/clockwork"
21	"golang.org/x/sync/errgroup"
22)
23
24const deliverMaxAttempts = 180           // fifteen minutes in default mode
25const deliverDisconnectLimitMinutes = 10 // need to be offline for at least 10 minutes before auto failing a send
26
27type DelivererInfoError interface {
28	IsImmediateFail() (chat1.OutboxErrorType, bool)
29}
30
31type senderError struct {
32	msg       string
33	permanent bool
34}
35
36func newSenderError(msg string, permanent bool) *senderError {
37	return &senderError{
38		msg:       msg,
39		permanent: permanent,
40	}
41}
42
43func (e *senderError) Error() string {
44	return fmt.Sprintf("senderError: %v, permanent: %v", e.msg, e.permanent)
45}
46
47func (e *senderError) IsImmediateFail() (chat1.OutboxErrorType, bool) {
48	return chat1.OutboxErrorType_MISC, e.permanent
49}
50
51// delivererExpireError is used when a message fails because it has languished
52// in the outbox for too long.
53type delivererExpireError struct{}
54
55func (e delivererExpireError) Error() string {
56	return "message failed to send"
57}
58
59func (e delivererExpireError) IsImmediateFail() (chat1.OutboxErrorType, bool) {
60	return chat1.OutboxErrorType_EXPIRED, true
61}
62
63type Deliverer struct {
64	globals.Contextified
65	sync.Mutex
66	utils.DebugLabeler
67
68	sender           types.Sender
69	serverConn       types.ServerConnection
70	outbox           *storage.Outbox
71	identNotifier    types.IdentifyNotifier
72	shutdownCh       chan struct{}
73	msgSentCh        chan struct{}
74	reconnectCh      chan struct{}
75	kbfsDeliverQueue chan chat1.OutboxRecord
76	delivering       bool
77	connected        bool
78	disconnTime      time.Time
79	clock            clockwork.Clock
80	eg               errgroup.Group
81
82	notifyFailureChsMu sync.Mutex
83	notifyFailureChs   map[string]chan []chat1.OutboxRecord
84
85	// Testing
86	testingNameInfoSource types.NameInfoSource
87}
88
89var _ types.MessageDeliverer = (*Deliverer)(nil)
90
91func NewDeliverer(g *globals.Context, sender types.Sender, serverConn types.ServerConnection) *Deliverer {
92	d := &Deliverer{
93		Contextified:     globals.NewContextified(g),
94		DebugLabeler:     utils.NewDebugLabeler(g.ExternalG(), "Deliverer", false),
95		msgSentCh:        make(chan struct{}, 100),
96		reconnectCh:      make(chan struct{}, 100),
97		kbfsDeliverQueue: make(chan chat1.OutboxRecord, 100),
98		sender:           sender,
99		identNotifier:    NewCachingIdentifyNotifier(g),
100		clock:            clockwork.NewRealClock(),
101		notifyFailureChs: make(map[string]chan []chat1.OutboxRecord),
102		serverConn:       serverConn,
103	}
104
105	d.identNotifier.ResetOnGUIConnect()
106	return d
107}
108
109func (s *Deliverer) setTestingNameInfoSource(ni types.NameInfoSource) {
110	s.testingNameInfoSource = ni
111}
112
113func (s *Deliverer) presentUIItem(ctx context.Context, uid gregor1.UID, conv *chat1.ConversationLocal) (res *chat1.InboxUIItem) {
114	if conv != nil {
115		pc := utils.PresentConversationLocal(ctx, s.G(), uid, *conv, utils.PresentParticipantsModeSkip)
116		res = &pc
117	}
118	return res
119}
120
121func (s *Deliverer) Start(ctx context.Context, uid gregor1.UID) {
122	s.Lock()
123	defer s.Unlock()
124
125	<-s.doStop(ctx)
126	s.outbox = storage.NewOutbox(s.G(), uid,
127		storage.PendingPreviewer(func(ctx context.Context, obr *chat1.OutboxRecord) error {
128			return attachments.AddPendingPreview(ctx, s.G(), obr)
129		}),
130		storage.NewMessageNotifier(func(ctx context.Context, obr chat1.OutboxRecord) {
131			uid := obr.Msg.ClientHeader.Sender
132			convID := obr.ConvID
133
134			// fill in reply
135			msg, err := NewReplyFiller(s.G()).FillSingle(ctx, uid, convID,
136				chat1.NewMessageUnboxedWithOutbox(obr))
137			if err != nil {
138				s.Debug(ctx, "outboxNotify: failed to get replyto: %s", err)
139			} else {
140				obr.ReplyTo = &msg
141			}
142			emojiText := obr.Msg.MessageBody.TextForDecoration()
143			if len(emojiText) > 0 {
144				if obr.Msg.Emojis, err = s.G().EmojiSource.Harvest(ctx, emojiText,
145					uid, convID, types.EmojiHarvestModeFast); err != nil {
146					s.Debug(ctx, "outboxNotify: failed to get emojis: %s", err)
147				}
148			}
149
150			act := chat1.NewChatActivityWithIncomingMessage(chat1.IncomingMessage{
151				Message: utils.PresentMessageUnboxed(ctx, s.G(), chat1.NewMessageUnboxedWithOutbox(obr),
152					uid, convID),
153				ConvID: convID,
154			})
155			s.G().ActivityNotifier.Activity(ctx, uid, obr.Msg.ClientHeader.Conv.TopicType, &act,
156				chat1.ChatActivitySource_LOCAL)
157		}))
158	s.outbox.SetClock(s.clock)
159
160	s.delivering = true
161	s.shutdownCh = make(chan struct{})
162	s.eg.Go(func() error { return s.deliverLoop(s.shutdownCh) })
163	s.eg.Go(func() error { return s.kbfsDeliverLoop(s.shutdownCh) })
164}
165
166func (s *Deliverer) Stop(ctx context.Context) chan struct{} {
167	s.Lock()
168	defer s.Unlock()
169	return s.doStop(ctx)
170}
171
172func (s *Deliverer) doStop(ctx context.Context) chan struct{} {
173	cb := make(chan struct{})
174	if s.delivering {
175		s.Debug(ctx, "stopping")
176		close(s.shutdownCh)
177		s.delivering = false
178		go func() {
179			if err := s.eg.Wait(); err != nil {
180				s.Debug(ctx, "unable to stop loops: %v", err)
181			}
182			close(cb)
183		}()
184		return cb
185	}
186	close(cb)
187	return cb
188}
189
190func (s *Deliverer) ForceDeliverLoop(ctx context.Context) {
191	s.Debug(ctx, "force deliver loop invoked")
192	s.msgSentCh <- struct{}{}
193}
194
195func (s *Deliverer) SetSender(sender types.Sender) {
196	s.sender = sender
197}
198
199func (s *Deliverer) SetClock(clock clockwork.Clock) {
200	s.clock = clock
201}
202
203func (s *Deliverer) Connected(ctx context.Context) {
204	s.connected = true
205
206	// Wake up deliver loop on reconnect
207	s.Debug(ctx, "reconnected: forcing deliver loop run")
208	s.reconnectCh <- struct{}{}
209}
210
211func (s *Deliverer) Disconnected(ctx context.Context) {
212	s.Debug(ctx, "disconnected: all errors from now on will be permanent")
213	s.connected = false
214	s.disconnTime = s.clock.Now()
215}
216
217func (s *Deliverer) disconnectedTime() time.Duration {
218	if s.connected {
219		return 0
220	}
221	return s.clock.Now().Sub(s.disconnTime)
222}
223
224func (s *Deliverer) IsOffline(ctx context.Context) bool {
225	return !s.connected
226}
227
228func (s *Deliverer) IsDelivering() bool {
229	s.Lock()
230	defer s.Unlock()
231	return s.delivering
232}
233
234func (s *Deliverer) Queue(ctx context.Context, convID chat1.ConversationID, msg chat1.MessagePlaintext,
235	outboxID *chat1.OutboxID, sendOpts *chat1.SenderSendOptions, prepareOpts *chat1.SenderPrepareOptions,
236	identifyBehavior keybase1.TLFIdentifyBehavior) (obr chat1.OutboxRecord, err error) {
237	defer s.Trace(ctx, &err, "Queue")()
238
239	// KBFSFILEEDIT msgs skip the traditional outbox
240	if msg.ClientHeader.Conv.TopicType == chat1.TopicType_KBFSFILEEDIT {
241		obr := chat1.OutboxRecord{
242			ConvID: convID,
243			Msg:    msg,
244		}
245		select {
246		case s.kbfsDeliverQueue <- obr:
247		default:
248			s.Debug(ctx, "unable to deliver, kbfs queue full: %v", convID)
249		}
250		return obr, nil
251	}
252
253	// Push onto outbox and immediately return
254	obr, err = s.outbox.PushMessage(ctx, convID, msg, outboxID, sendOpts, prepareOpts, identifyBehavior)
255	if err != nil {
256		return obr, err
257	}
258	s.Debug(ctx, "Queue: queued new message: convID: %s outboxID: %s uid: %s ident: %v", convID,
259		obr.OutboxID, s.outbox.GetUID(), identifyBehavior)
260
261	// Alert the deliver loop it should wake up
262	s.msgSentCh <- struct{}{}
263	// Only update mtime badgable messages
264	if obr.Msg.IsBadgableType() {
265		go func(ctx context.Context) {
266			update := []chat1.LocalMtimeUpdate{{ConvID: convID, Mtime: obr.Ctime}}
267			if err := s.G().InboxSource.UpdateLocalMtime(ctx, s.outbox.GetUID(), update); err != nil {
268				s.Debug(ctx, "Queue: unable to update local mtime %v", obr.Ctime.Time())
269			}
270			time.Sleep(250 * time.Millisecond)
271			s.G().InboxSource.NotifyUpdate(ctx, s.outbox.GetUID(), convID)
272		}(globals.BackgroundChatCtx(ctx, s.G()))
273	}
274	return obr, nil
275}
276
277func (s *Deliverer) ActiveDeliveries(ctx context.Context) (res []chat1.OutboxRecord, err error) {
278	defer s.Trace(ctx, &err, "ActiveDeliveries")()
279	if !s.IsDelivering() {
280		s.Debug(ctx, "ActiveDeliveries: not delivering, returning empty")
281		return nil, nil
282	}
283	obrs, err := s.outbox.PullAllConversations(ctx, false, false)
284	if err != nil {
285		s.Debug(ctx, "ActiveDeliveries: failed to pull convs: %s", err)
286		return res, err
287	}
288
289	for _, obr := range obrs {
290		styp, err := obr.State.State()
291		if err != nil {
292			s.Debug(ctx, "ActiveDeliveries: bogus state: outboxID: %s err: %s", obr.OutboxID, err)
293			continue
294		}
295		if styp == chat1.OutboxStateType_SENDING {
296			res = append(res, obr)
297		}
298	}
299	return res, nil
300}
301
302func (s *Deliverer) NextFailure() (chan []chat1.OutboxRecord, func()) {
303	s.notifyFailureChsMu.Lock()
304	defer s.notifyFailureChsMu.Unlock()
305	ch := make(chan []chat1.OutboxRecord, 1)
306	id := libkb.RandStringB64(3)
307	s.notifyFailureChs[id] = ch
308	return ch, func() {
309		s.notifyFailureChsMu.Lock()
310		defer s.notifyFailureChsMu.Unlock()
311		delete(s.notifyFailureChs, id)
312	}
313}
314
315func (s *Deliverer) alertFailureChannels(obrs []chat1.OutboxRecord) {
316	s.notifyFailureChsMu.Lock()
317	defer s.notifyFailureChsMu.Unlock()
318	for _, ch := range s.notifyFailureChs {
319		ch <- obrs
320	}
321	s.notifyFailureChs = make(map[string]chan []chat1.OutboxRecord)
322}
323
324func (s *Deliverer) doNotRetryFailure(ctx context.Context, obr chat1.OutboxRecord, err error) (resType chat1.OutboxErrorType, resErr error, resFail bool) {
325	defer func() {
326		if resErr != nil && resFail {
327			s.Debug(ctx, "doNotRetryFailure: sending back to not retry: err: %s: typ: %T", resErr, resErr)
328		}
329	}()
330	// Check attempts
331	if obr.State.Sending() >= deliverMaxAttempts {
332		return chat1.OutboxErrorType_TOOMANYATTEMPTS, errors.New("max send attempts reached"), true
333	}
334	if !s.connected {
335		// Check to see how long we have been disconnected to see if this
336		// should be retried
337		if disconnTime := s.disconnectedTime(); disconnTime.Minutes() > deliverDisconnectLimitMinutes {
338			s.Debug(ctx, "doNotRetryFailure: not retrying offline failure, disconnected for: %v",
339				disconnTime)
340			return chat1.OutboxErrorType_OFFLINE, err, true
341		}
342	}
343	// Check for any errors that should cause us to give up right away
344	switch berr := err.(type) {
345	case types.UnboxingError:
346		return chat1.OutboxErrorType_MISC, err, berr.IsPermanent()
347	case DelivererInfoError:
348		if typ, ok := berr.IsImmediateFail(); ok {
349			return typ, err, true
350		}
351		return 0, err, false
352	case net.Error:
353		s.Debug(ctx, "doNotRetryFailure: generic net error, reconnecting to the server: %s(%T)", berr, berr)
354		if _, rerr := s.serverConn.Reconnect(ctx); rerr != nil {
355			s.Debug(ctx, "doNotRetryFailure: failed to reconnect: %s", rerr)
356		}
357		return chat1.OutboxErrorType_OFFLINE, err, !berr.Temporary()
358	}
359	switch err {
360	case ErrChatServerTimeout, ErrDuplicateConnection, ErrKeyServerTimeout:
361		return 0, err, false
362	}
363	return 0, err, true
364}
365
366func (s *Deliverer) failMessage(ctx context.Context, obr chat1.OutboxRecord,
367	oserr chat1.OutboxStateError) (err error) {
368	var marked []chat1.OutboxRecord
369	uid := s.outbox.GetUID()
370	convID := obr.ConvID
371	switch oserr.Typ {
372	case chat1.OutboxErrorType_TOOMANYATTEMPTS:
373		s.Debug(ctx, "failMessage: too many attempts failure, marking conv as failed")
374		if marked, err = s.outbox.MarkConvAsError(ctx, convID, oserr); err != nil {
375			s.Debug(ctx, "failMessage: unable to mark conv as error on outbox: uid: %s convID: %v, err: %v",
376				s.outbox.GetUID(), obr.ConvID, err)
377			return err
378		}
379	case chat1.OutboxErrorType_DUPLICATE, chat1.OutboxErrorType_ALREADY_DELETED:
380		// Here we don't send a notification to the frontend, we just want
381		// these to go away
382		if _, err = s.outbox.RemoveMessage(ctx, obr.OutboxID); err != nil {
383			s.Debug(ctx, "deliverLoop: failed to remove duplicate delete msg: %v", err)
384			return err
385		}
386	default:
387		var m chat1.OutboxRecord
388		if m, err = s.outbox.MarkAsError(ctx, obr, oserr); err != nil {
389			s.Debug(ctx, "failMessage: unable to mark as error: %v", err)
390			return err
391		}
392		marked = []chat1.OutboxRecord{m}
393	}
394
395	if len(marked) > 0 {
396		convLocal, err := s.G().InboxSource.IncrementLocalConvVersion(ctx, uid, convID)
397		if err != nil {
398			s.Debug(ctx, "failMessage: failed to get IncrementLocalConvVersion")
399		}
400		act := chat1.NewChatActivityWithFailedMessage(chat1.FailedMessageInfo{
401			OutboxRecords: marked,
402			Conv:          s.presentUIItem(ctx, uid, convLocal),
403		})
404		s.G().ActivityNotifier.Activity(context.Background(), uid, chat1.TopicType_NONE, &act,
405			chat1.ChatActivitySource_LOCAL)
406		s.alertFailureChannels(marked)
407		if err := s.G().Badger.Send(context.Background()); err != nil {
408			s.Debug(ctx, "failMessage: unable to update badger: %v", err)
409			return err
410		}
411	}
412	return nil
413}
414
415type delivererBackgroundTaskError struct {
416	Typ string
417}
418
419var _ (DelivererInfoError) = (*delivererBackgroundTaskError)(nil)
420
421func (e delivererBackgroundTaskError) Error() string {
422	return fmt.Sprintf("%s in progress", e.Typ)
423}
424
425func (e delivererBackgroundTaskError) IsImmediateFail() (chat1.OutboxErrorType, bool) {
426	return chat1.OutboxErrorType_MISC, false
427}
428
429var errDelivererUploadInProgress = delivererBackgroundTaskError{Typ: "attachment upload"}
430var errDelivererUnfurlInProgress = delivererBackgroundTaskError{Typ: "unfurl"}
431var errDelivererFlipConvCreationInProgress = delivererBackgroundTaskError{Typ: "flip"}
432
433func (s *Deliverer) processAttachment(ctx context.Context, obr chat1.OutboxRecord) (chat1.OutboxRecord, error) {
434	if !obr.IsAttachment() {
435		return obr, nil
436	}
437	status, res, err := s.G().AttachmentUploader.Status(ctx, obr.OutboxID)
438	if err != nil {
439		return obr, NewAttachmentUploadError(err.Error(), false)
440	}
441	switch status {
442	case types.AttachmentUploaderTaskStatusSuccess:
443		// Modify the attachment message
444		att := chat1.MessageAttachment{
445			Object:   res.Object,
446			Metadata: res.Metadata,
447			Uploaded: true,
448			Preview:  res.Preview,
449		}
450		if res.Preview != nil {
451			att.Previews = []chat1.Asset{*res.Preview}
452		}
453		obr.Msg.MessageBody = chat1.NewMessageBodyWithAttachment(att)
454		if _, err := s.outbox.UpdateMessage(ctx, obr); err != nil {
455			return obr, err
456		}
457	case types.AttachmentUploaderTaskStatusFailed:
458		errStr := "<unknown>"
459		if res.Error != nil {
460			errStr = *res.Error
461		}
462		// register this as a failure, but still attempt a retry
463		if _, err := s.G().AttachmentUploader.Retry(ctx, obr.OutboxID); err != nil {
464			s.Debug(ctx, "processAttachment: failed to retry upload on in progress task: %s", err)
465			return obr, NewAttachmentUploadError(err.Error(), true)
466		}
467		return obr, NewAttachmentUploadError(errStr, false)
468	case types.AttachmentUploaderTaskStatusUploading:
469		// Make sure we are actually trying to upload this guy
470		if _, err := s.G().AttachmentUploader.Retry(ctx, obr.OutboxID); err != nil {
471			s.Debug(ctx, "processAttachment: failed to retry upload on in progress task: %s", err)
472			return obr, NewAttachmentUploadError(err.Error(), true)
473		}
474		return obr, errDelivererUploadInProgress
475	}
476	return obr, nil
477}
478
479type unfurlError struct {
480	status types.UnfurlerTaskStatus
481}
482
483func newUnfurlError(status types.UnfurlerTaskStatus) unfurlError {
484	return unfurlError{
485		status: status,
486	}
487}
488
489func (e unfurlError) Error() string {
490	if e.status == types.UnfurlerTaskStatusPermFailed {
491		return "unfurler permanent error"
492	}
493	return "unfurler error"
494}
495
496func (e unfurlError) IsImmediateFail() (chat1.OutboxErrorType, bool) {
497	return chat1.OutboxErrorType_MISC, e.status == types.UnfurlerTaskStatusPermFailed
498}
499
500var _ (DelivererInfoError) = (*unfurlError)(nil)
501
502func (s *Deliverer) processUnfurl(ctx context.Context, obr chat1.OutboxRecord) (chat1.OutboxRecord, error) {
503	if !obr.IsUnfurl() {
504		return obr, nil
505	}
506	status, res, err := s.G().Unfurler.Status(ctx, obr.OutboxID)
507	if err != nil {
508		return obr, err
509	}
510	switch status {
511	case types.UnfurlerTaskStatusSuccess:
512		if res == nil {
513			return obr, errors.New("unfurl success with no result")
514		}
515		unfurl := chat1.MessageUnfurl{
516			MessageID: obr.Msg.ClientHeader.Supersedes,
517			Unfurl:    *res,
518		}
519		obr.Msg.MessageBody = chat1.NewMessageBodyWithUnfurl(unfurl)
520		if _, err := s.outbox.UpdateMessage(ctx, obr); err != nil {
521			return obr, err
522		}
523	case types.UnfurlerTaskStatusUnfurling:
524		s.G().Unfurler.Retry(ctx, obr.OutboxID)
525		return obr, errDelivererUnfurlInProgress
526	case types.UnfurlerTaskStatusFailed:
527		s.G().Unfurler.Retry(ctx, obr.OutboxID)
528		return obr, newUnfurlError(status)
529	case types.UnfurlerTaskStatusPermFailed:
530		return obr, newUnfurlError(status)
531	}
532	return obr, nil
533}
534
535type flipPermError struct{}
536
537func (e flipPermError) Error() string {
538	return "unable to start flip"
539}
540
541func (e flipPermError) IsImmediateFail() (chat1.OutboxErrorType, bool) {
542	return chat1.OutboxErrorType_MISC, true
543}
544
545func (s *Deliverer) processFlip(ctx context.Context, obr chat1.OutboxRecord) (chat1.OutboxRecord, error) {
546	if !obr.IsChatFlip() {
547		return obr, nil
548	}
549	body := obr.Msg.MessageBody.Flip()
550	flipConvID, status := s.G().CoinFlipManager.IsFlipConversationCreated(ctx, obr.OutboxID)
551	switch status {
552	case types.FlipSendStatusInProgress:
553		return obr, errDelivererFlipConvCreationInProgress
554	case types.FlipSendStatusError:
555		return obr, flipPermError{}
556	case types.FlipSendStatusSent:
557		s.Debug(ctx, "processFlip: sending with convID: %s", flipConvID)
558		obr.Msg.MessageBody = chat1.NewMessageBodyWithFlip(chat1.MessageFlip{
559			Text:       body.Text,
560			GameID:     body.GameID,
561			FlipConvID: flipConvID,
562		})
563		if _, err := s.outbox.UpdateMessage(ctx, obr); err != nil {
564			return obr, err
565		}
566		return obr, nil
567	}
568	return obr, nil
569}
570
571func (s *Deliverer) processBackgroundTaskMessage(ctx context.Context, obr chat1.OutboxRecord) (chat1.OutboxRecord, error) {
572	switch obr.MessageType() {
573	case chat1.MessageType_ATTACHMENT:
574		return s.processAttachment(ctx, obr)
575	case chat1.MessageType_UNFURL:
576		return s.processUnfurl(ctx, obr)
577	case chat1.MessageType_FLIP:
578		return s.processFlip(ctx, obr)
579	default:
580		return obr, nil
581	}
582}
583
584// cancelPendingDuplicateReactions removes duplicate reactions in the outbox.
585// If we cancel an odd number of items we cancel ourselves since the current
586// reaction state is correct.
587func (s *Deliverer) cancelPendingDuplicateReactions(ctx context.Context, obr chat1.OutboxRecord) (bool, error) {
588	if obr.Msg.ClientHeader.MessageType != chat1.MessageType_REACTION {
589		// nothing to do here
590		return false, nil
591	}
592	// While holding the outbox lock, let's remove any duplicate reaction
593	// messages and  make sure we are in the outbox, otherwise someone else
594	// canceled us.
595	inOutbox := false
596	numCanceled, err := s.outbox.CancelMessagesWithPredicate(ctx, func(o chat1.OutboxRecord) bool {
597		if !o.ConvID.Eq(obr.ConvID) {
598			return false
599		}
600		if o.Msg.ClientHeader.MessageType != chat1.MessageType_REACTION {
601			return false
602		}
603
604		idEq := o.OutboxID.Eq(&obr.OutboxID)
605		bodyEq := o.Msg.MessageBody.Reaction().Eq(obr.Msg.MessageBody.Reaction())
606		// Don't delete ourselves from the outbox, but we want to make sure we
607		// are in here.
608		inOutbox = inOutbox || idEq
609		shouldCancel := bodyEq && !idEq
610		if shouldCancel {
611			s.Debug(ctx, "canceling outbox message convID: %v obid: %v", o.ConvID, o.OutboxID)
612		}
613		return shouldCancel
614	})
615
616	if err != nil {
617		return false, err
618	} else if !inOutbox {
619		// we were canceled previously, the jig is up
620		return true, nil
621	} else if numCanceled%2 == 1 {
622		// Since we're just toggling the reaction on/off, we should abort here
623		// and remove ourselves from the outbox since our message wouldn't
624		// change the reaction state.
625		_, err = s.outbox.RemoveMessage(ctx, obr.OutboxID)
626		return true, err
627	}
628	return false, nil
629}
630
631func (s *Deliverer) shouldRecordError(ctx context.Context, err error) bool {
632	// This just happens when threads are racing to reconnect to
633	// Gregor, don't count it as an error to send.
634	return err != ErrDuplicateConnection
635}
636
637func (s *Deliverer) shouldBreakLoop(ctx context.Context, obr chat1.OutboxRecord) bool {
638	if obr.Msg.ClientHeader.MessageType == chat1.MessageType_UNFURL {
639		s.Debug(ctx, "shouldBreakLoop: not breaking deliverer loop for unfurl failure: outboxID: %s",
640			obr.OutboxID)
641		return false
642	}
643	return true
644}
645
646func (s *Deliverer) kbfsDeliverLoop(shutdownCh chan struct{}) error {
647	bgctx := globals.ChatCtx(context.Background(), s.G(), keybase1.TLFIdentifyBehavior_CHAT_CLI, nil, nil)
648	bgctx = libkb.WithLogTag(bgctx, "KDELV")
649	s.Debug(bgctx, "deliverLoop: starting non blocking sender kbfs deliver loop: uid: %s",
650		s.outbox.GetUID())
651	for {
652		select {
653		case <-shutdownCh:
654			s.Debug(bgctx, "deliverLoop: shutting down outbox deliver loop: uid: %s", s.outbox.GetUID())
655			return nil
656		case obr := <-s.kbfsDeliverQueue:
657			s.Debug(bgctx, "deliverLoop: flushing record obr for %v", obr.ConvID)
658			if _, _, err := s.sender.Send(bgctx, obr.ConvID, obr.Msg, 0, nil, nil, nil); err != nil {
659				s.Debug(bgctx, "Unable to deliver msg: %v", err)
660			}
661		}
662	}
663}
664
665func (s *Deliverer) deliverLoop(shutdownCh chan struct{}) error {
666	bgctx := libkb.WithLogTag(context.Background(), "DELV")
667	s.Debug(bgctx, "deliverLoop: starting non blocking sender deliver loop: uid: %s duration: %v",
668		s.outbox.GetUID(), s.G().Env.GetChatDelivererInterval())
669	for {
670		// Wait for the signal to take action
671		select {
672		case <-shutdownCh:
673			s.Debug(bgctx, "deliverLoop: shutting down outbox deliver loop: uid: %s", s.outbox.GetUID())
674			return nil
675		case <-s.reconnectCh:
676			s.Debug(bgctx, "deliverLoop: flushing outbox on reconnect: uid: %s", s.outbox.GetUID())
677		case <-s.msgSentCh:
678			s.Debug(bgctx, "deliverLoop: flushing outbox on new message: uid: %s", s.outbox.GetUID())
679		case <-s.G().Clock().After(s.G().Env.GetChatDelivererInterval()):
680		}
681
682		// Fetch outbox
683		obrs, err := s.outbox.PullAllConversations(bgctx, false, false)
684		if err != nil {
685			if _, ok := err.(storage.MissError); !ok {
686				s.Debug(bgctx, "deliverLoop: unable to pull outbox: uid: %s err: %v", s.outbox.GetUID(),
687					err)
688			}
689			continue
690		}
691
692		convMap := make(map[chat1.ConvIDStr][]chat1.OutboxRecord)
693		for _, o := range obrs {
694			obr := o
695			convMap[obr.ConvID.ConvIDStr()] = append(convMap[obr.ConvID.ConvIDStr()], obr)
696		}
697
698		var eg errgroup.Group
699		for _, o := range convMap {
700			obrs := o
701			eg.Go(func() error { s.deliverForConv(bgctx, obrs); return nil })
702		}
703		if err := eg.Wait(); err != nil {
704			s.Debug(bgctx, "deliverLoop: error in waitgroup %v", err)
705		}
706	}
707}
708
709func (s *Deliverer) deliverForConv(ctx context.Context, obrs []chat1.OutboxRecord) {
710	if len(obrs) > 0 {
711		s.Debug(ctx, "deliverLoop: flushing %d items from the outbox: uid: %s, convID %v",
712			len(obrs), s.outbox.GetUID(), obrs[0].ConvID)
713	}
714
715	// Send messages
716	var err error
717	var breaks []keybase1.TLFIdentifyFailure
718	for _, obr := range obrs {
719		bctx := globals.ChatCtx(context.Background(), s.G(), obr.IdentifyBehavior, &breaks,
720			s.identNotifier)
721
722		if s.testingNameInfoSource != nil {
723			bctx = globals.CtxAddOverrideNameInfoSource(bctx, s.testingNameInfoSource)
724		}
725		if !s.connected {
726			err = newSenderError("disconnected from chat server", false)
727		} else if s.clock.Now().Sub(obr.Ctime.Time()) > time.Hour {
728			// If we are re-trying a message after an hour, let's just give up. These times can
729			// get very long if the app is suspended on mobile.
730			s.Debug(bctx, "deliverLoop: expiring pending message because it is too old: obid: %s dur: %v",
731				obr.OutboxID, s.clock.Now().Sub(obr.Ctime.Time()))
732			err = delivererExpireError{}
733		} else {
734			// Check for special messages and process based on completion status
735			obr, err = s.processBackgroundTaskMessage(bctx, obr)
736			if err == nil {
737				canceled, err := s.cancelPendingDuplicateReactions(bctx, obr)
738				if err == nil && canceled {
739					s.Debug(bctx, "deliverLoop: aborting send, duplicate send convID: %s, obid: %s",
740						obr.ConvID, obr.OutboxID)
741					continue
742				}
743			} else if _, ok := err.(delivererBackgroundTaskError); ok {
744				// check for bkg task error and loop around if we hit one
745				s.Debug(bctx, "deliverLoop: bkg task in progress, skipping: convID: %s obid: %s task: %v",
746					obr.ConvID, obr.OutboxID, err)
747				continue
748			}
749			if err == nil {
750				_, _, err = s.sender.Send(bctx, obr.ConvID, obr.Msg, 0, nil, obr.SendOpts,
751					obr.PrepareOpts)
752			}
753		}
754		if err != nil {
755			s.Debug(bctx,
756				"deliverLoop: failed to send msg: uid: %s convID: %s obid: %s err: %v attempts: %d",
757				s.outbox.GetUID(), obr.ConvID, obr.OutboxID, err, obr.State.Sending())
758
759			// Process failure. If we determine that the message is unrecoverable, then bail out.
760			if errTyp, newErr, ok := s.doNotRetryFailure(bctx, obr, err); ok {
761				// Record failure if we hit this case, and put the rest of this loop in a
762				// mode where all other entries also fail.
763				s.Debug(bctx, "deliverLoop: failure condition reached, marking as error and notifying: obid: %s errTyp: %v attempts: %d", obr.OutboxID, errTyp, obr.State.Sending())
764
765				if err := s.failMessage(bctx, obr, chat1.OutboxStateError{
766					Message: newErr.Error(),
767					Typ:     errTyp,
768				}); err != nil {
769					s.Debug(bctx, "deliverLoop: unable to fail message: err: %v", err)
770				}
771			} else if s.shouldRecordError(bctx, err) {
772				if err = s.outbox.RecordFailedAttempt(bctx, obr); err != nil {
773					s.Debug(ctx, "deliverLoop: unable to record failed attempt on outbox: uid %s err: %v",
774						s.outbox.GetUID(), err)
775				}
776			}
777			// Check if we should break out of the deliverer loop on this failure
778			if s.shouldBreakLoop(bctx, obr) {
779				break
780			}
781		} else {
782			// BlockingSender actually does this too, so this will likely fail, but to maintain
783			// the types.Sender abstraction we will do it here too and likely fail.
784			if _, err = s.outbox.RemoveMessage(bctx, obr.OutboxID); err != nil {
785				s.Debug(ctx, "deliverLoop: failed to remove successful message send: %v", err)
786			}
787		}
788	}
789}
790