1package utils
2
3import (
4	"errors"
5	"fmt"
6	"sync"
7	"time"
8
9	"github.com/keybase/client/go/chat/globals"
10	"github.com/keybase/client/go/libkb"
11	"github.com/keybase/client/go/protocol/chat1"
12	"github.com/keybase/client/go/protocol/gregor1"
13	context "golang.org/x/net/context"
14)
15
// ErrConvLockTabDeadlock is the sentinel error returned by doAcquire when its deadlock check
// fires, and by Acquire once it has used up all of its attempts. Acquire compares against this
// value by identity to decide whether to retry.
var ErrConvLockTabDeadlock = errors.New("timeout reading thread")
17
// conversationLock is the per-key entry stored in ConversationLockTab.convLocks. Its counters
// and trace are read and written by doAcquire and Release while the table's mutex is held.
type conversationLock struct {
	// refs starts at 1 when the entry is created and is incremented for every trace that begins
	// waiting on the entry; Release decrements it when shares drops to zero and removes the
	// entry from the table once it reaches zero.
	// shares counts how many times the owning trace has acquired the lock; it is bumped when the
	// same trace acquires again and decremented on each Release by that trace.
	refs, shares int
	// trace identifies the trace that currently owns the lock. Release resets it to "" and the
	// next waiter to obtain the mutex overwrites it with its own trace.
	trace        string
	// lock is the mutex that traces other than the owner block on.
	lock         sync.Mutex
}
23
// ConversationLockTab is a table of per-user, per-conversation locks that are shared within a
// single chat trace. The embedded sync.Mutex is taken by the methods of this type around their
// accesses to convLocks, waits, and the fields changed by the Set* methods.
type ConversationLockTab struct {
	globals.Contextified
	sync.Mutex
	DebugLabeler

	// maxAcquireRetries bounds the number of doAcquire attempts made by Acquire.
	maxAcquireRetries int
	// convLocks maps the string produced by key(uid, convID) to its lock entry.
	convLocks         map[string]*conversationLock
	// waits maps a blocked trace to the trace it is blocked on; deadlockDetect follows these
	// edges looking for a cycle.
	waits             map[string]string
	// blockCb is for testing: when non-nil, doAcquire sends a value on the channel each time an
	// acquirer finds the lock owned by a different trace.
	blockCb           *chan struct{} // Testing
}
34
35func NewConversationLockTab(g *globals.Context) *ConversationLockTab {
36	return &ConversationLockTab{
37		Contextified:      globals.NewContextified(g),
38		DebugLabeler:      NewDebugLabeler(g.ExternalG(), "ConversationLockTab", false),
39		convLocks:         make(map[string]*conversationLock),
40		waits:             make(map[string]string),
41		maxAcquireRetries: 25,
42	}
43}
44
45func (c *ConversationLockTab) SetMaxAcquireRetries(n int) {
46	c.Lock()
47	defer c.Unlock()
48	c.maxAcquireRetries = n
49}
50
51func (c *ConversationLockTab) SetBlockCb(ch *chan struct{}) {
52	c.Lock()
53	defer c.Unlock()
54	c.blockCb = ch
55}
56
57func (c *ConversationLockTab) NumLocks() int {
58	c.Lock()
59	defer c.Unlock()
60	return len(c.convLocks)
61}
62
63func (c *ConversationLockTab) key(uid gregor1.UID, convID chat1.ConversationID) string {
64	return fmt.Sprintf("%s:%s", uid, convID)
65}
66
67// deadlockDetect tries to find a deadlock condition in the current set of waiting acquirers.
68func (c *ConversationLockTab) deadlockDetect(ctx context.Context, trace string, waiters map[string]bool) bool {
69	// See if this trace is waiting on any other trace
70	waitingOnTrace, ok := c.waits[trace]
71	if !ok {
72		// If not, no deadlock
73		return false
74	}
75	// If we are waiting on a trace we have already encountered, then we have hit a deadlock
76	if waiters[waitingOnTrace] {
77		c.Debug(ctx, "deadlockDetect: deadlock detected: trace: %s waitingOnTrace: %s waiters: %v",
78			trace, waitingOnTrace, waiters)
79		return true
80	}
81	// Set the current trace as waiting, and then continue down the chain
82	waiters[trace] = true
83	return c.deadlockDetect(ctx, waitingOnTrace, waiters)
84}
85
86func (c *ConversationLockTab) doAcquire(ctx context.Context, uid gregor1.UID, convID chat1.ConversationID) (blocked bool, err error) {
87	key := c.key(uid, convID)
88	trace, ok := globals.CtxTrace(ctx)
89	if !ok {
90		c.Debug(ctx, "Acquire: failed to find trace value, not using a lock: convID: %s", convID)
91		return false, nil
92	}
93
94	c.Lock()
95	if lock, ok := c.convLocks[key]; ok {
96		if lock.trace == trace {
97			// Our request holds the lock on this conversation ID already, so just plow through it
98			lock.shares++
99			c.Unlock()
100			return
101		}
102		c.Debug(ctx, "Acquire: blocked by trace: %s on convID: %s", lock.trace, convID)
103		blocker := lock.trace
104		if c.blockCb != nil {
105			*c.blockCb <- struct{}{} // For testing
106		}
107		c.waits[trace] = lock.trace
108		// If we get blocked, let's make sure we aren't in a deadlock situation, and if so, we bail out
109		if c.deadlockDetect(ctx, lock.trace, map[string]bool{
110			trace: true,
111		}) {
112			c.Unlock()
113			return true, ErrConvLockTabDeadlock
114		}
115		lock.refs++
116		c.Unlock() // Give up map lock while we are waiting for conv lock
117		lock.lock.Lock()
118		c.Lock()
119		c.Debug(ctx, "Acquire: unblocked from trace: %s on convID: %s", blocker, convID)
120		delete(c.waits, trace)
121		lock.trace = trace
122		lock.shares = 1
123		c.Unlock()
124		return true, nil
125	}
126
127	lock := &conversationLock{
128		shares: 1,
129		refs:   1,
130		trace:  trace,
131	}
132	c.convLocks[key] = lock
133	lock.lock.Lock()
134	c.Unlock()
135	return false, nil
136}
137
138// Acquire obtains a per user per conversation lock on a per trace basis. That is, the lock is a
139// shared lock for the current chat trace, and serves to synchronize large chat operations. If there is
140// no chat trace, this is a no-op.
141func (c *ConversationLockTab) Acquire(ctx context.Context, uid gregor1.UID, convID chat1.ConversationID) (blocked bool, err error) {
142	sleep := 200 * time.Millisecond
143	for i := 0; i < c.maxAcquireRetries; i++ {
144		blocked, err = c.doAcquire(ctx, uid, convID)
145		if err != nil {
146			if err != ErrConvLockTabDeadlock {
147				return true, err
148			}
149			c.Debug(ctx, "Acquire: deadlock condition detected, sleeping and trying again: attempt: %d", i)
150			time.Sleep(libkb.RandomJitter(sleep))
151			continue
152		}
153		return blocked, nil
154	}
155	c.Debug(ctx, "Acquire: giving up, max attempts reached")
156	return true, ErrConvLockTabDeadlock
157}
158
159func (c *ConversationLockTab) Release(ctx context.Context, uid gregor1.UID, convID chat1.ConversationID) (released bool) {
160	c.Lock()
161	defer c.Unlock()
162	trace, ok := globals.CtxTrace(ctx)
163	if !ok {
164		c.Debug(ctx, "Release: failed to find trace value, doing nothing: convID: %s", convID)
165		return false
166	}
167
168	key := c.key(uid, convID)
169	if lock, ok := c.convLocks[key]; ok {
170		if lock.trace != trace {
171			c.Debug(ctx, "Release: different trace trying to free lock? convID: %s lock.trace: %s trace: %s",
172				convID, lock.trace, trace)
173		} else {
174			lock.shares--
175			if lock.shares == 0 {
176				lock.refs--
177				if lock.refs == 0 {
178					delete(c.convLocks, key)
179				}
180				lock.trace = ""
181				lock.lock.Unlock()
182				return true
183			}
184		}
185	}
186	return false
187}
188