1package utils 2 3import ( 4 "errors" 5 "fmt" 6 "sync" 7 "time" 8 9 "github.com/keybase/client/go/chat/globals" 10 "github.com/keybase/client/go/libkb" 11 "github.com/keybase/client/go/protocol/chat1" 12 "github.com/keybase/client/go/protocol/gregor1" 13 context "golang.org/x/net/context" 14) 15 16var ErrConvLockTabDeadlock = errors.New("timeout reading thread") 17 18type conversationLock struct { 19 refs, shares int 20 trace string 21 lock sync.Mutex 22} 23 24type ConversationLockTab struct { 25 globals.Contextified 26 sync.Mutex 27 DebugLabeler 28 29 maxAcquireRetries int 30 convLocks map[string]*conversationLock 31 waits map[string]string 32 blockCb *chan struct{} // Testing 33} 34 35func NewConversationLockTab(g *globals.Context) *ConversationLockTab { 36 return &ConversationLockTab{ 37 Contextified: globals.NewContextified(g), 38 DebugLabeler: NewDebugLabeler(g.ExternalG(), "ConversationLockTab", false), 39 convLocks: make(map[string]*conversationLock), 40 waits: make(map[string]string), 41 maxAcquireRetries: 25, 42 } 43} 44 45func (c *ConversationLockTab) SetMaxAcquireRetries(n int) { 46 c.Lock() 47 defer c.Unlock() 48 c.maxAcquireRetries = n 49} 50 51func (c *ConversationLockTab) SetBlockCb(ch *chan struct{}) { 52 c.Lock() 53 defer c.Unlock() 54 c.blockCb = ch 55} 56 57func (c *ConversationLockTab) NumLocks() int { 58 c.Lock() 59 defer c.Unlock() 60 return len(c.convLocks) 61} 62 63func (c *ConversationLockTab) key(uid gregor1.UID, convID chat1.ConversationID) string { 64 return fmt.Sprintf("%s:%s", uid, convID) 65} 66 67// deadlockDetect tries to find a deadlock condition in the current set of waiting acquirers. 68func (c *ConversationLockTab) deadlockDetect(ctx context.Context, trace string, waiters map[string]bool) bool { 69 // See if this trace is waiting on any other trace 70 waitingOnTrace, ok := c.waits[trace] 71 if !ok { 72 // If not, no deadlock 73 return false 74 } 75 // If we are waiting on a trace we have already encountered, then we have hit a deadlock 76 if waiters[waitingOnTrace] { 77 c.Debug(ctx, "deadlockDetect: deadlock detected: trace: %s waitingOnTrace: %s waiters: %v", 78 trace, waitingOnTrace, waiters) 79 return true 80 } 81 // Set the current trace as waiting, and then continue down the chain 82 waiters[trace] = true 83 return c.deadlockDetect(ctx, waitingOnTrace, waiters) 84} 85 86func (c *ConversationLockTab) doAcquire(ctx context.Context, uid gregor1.UID, convID chat1.ConversationID) (blocked bool, err error) { 87 key := c.key(uid, convID) 88 trace, ok := globals.CtxTrace(ctx) 89 if !ok { 90 c.Debug(ctx, "Acquire: failed to find trace value, not using a lock: convID: %s", convID) 91 return false, nil 92 } 93 94 c.Lock() 95 if lock, ok := c.convLocks[key]; ok { 96 if lock.trace == trace { 97 // Our request holds the lock on this conversation ID already, so just plow through it 98 lock.shares++ 99 c.Unlock() 100 return 101 } 102 c.Debug(ctx, "Acquire: blocked by trace: %s on convID: %s", lock.trace, convID) 103 blocker := lock.trace 104 if c.blockCb != nil { 105 *c.blockCb <- struct{}{} // For testing 106 } 107 c.waits[trace] = lock.trace 108 // If we get blocked, let's make sure we aren't in a deadlock situation, and if so, we bail out 109 if c.deadlockDetect(ctx, lock.trace, map[string]bool{ 110 trace: true, 111 }) { 112 c.Unlock() 113 return true, ErrConvLockTabDeadlock 114 } 115 lock.refs++ 116 c.Unlock() // Give up map lock while we are waiting for conv lock 117 lock.lock.Lock() 118 c.Lock() 119 c.Debug(ctx, "Acquire: unblocked from trace: %s on convID: %s", blocker, convID) 120 delete(c.waits, trace) 121 lock.trace = trace 122 lock.shares = 1 123 c.Unlock() 124 return true, nil 125 } 126 127 lock := &conversationLock{ 128 shares: 1, 129 refs: 1, 130 trace: trace, 131 } 132 c.convLocks[key] = lock 133 lock.lock.Lock() 134 c.Unlock() 135 return false, nil 136} 137 138// Acquire obtains a per user per conversation lock on a per trace basis. That is, the lock is a 139// shared lock for the current chat trace, and serves to synchronize large chat operations. If there is 140// no chat trace, this is a no-op. 141func (c *ConversationLockTab) Acquire(ctx context.Context, uid gregor1.UID, convID chat1.ConversationID) (blocked bool, err error) { 142 sleep := 200 * time.Millisecond 143 for i := 0; i < c.maxAcquireRetries; i++ { 144 blocked, err = c.doAcquire(ctx, uid, convID) 145 if err != nil { 146 if err != ErrConvLockTabDeadlock { 147 return true, err 148 } 149 c.Debug(ctx, "Acquire: deadlock condition detected, sleeping and trying again: attempt: %d", i) 150 time.Sleep(libkb.RandomJitter(sleep)) 151 continue 152 } 153 return blocked, nil 154 } 155 c.Debug(ctx, "Acquire: giving up, max attempts reached") 156 return true, ErrConvLockTabDeadlock 157} 158 159func (c *ConversationLockTab) Release(ctx context.Context, uid gregor1.UID, convID chat1.ConversationID) (released bool) { 160 c.Lock() 161 defer c.Unlock() 162 trace, ok := globals.CtxTrace(ctx) 163 if !ok { 164 c.Debug(ctx, "Release: failed to find trace value, doing nothing: convID: %s", convID) 165 return false 166 } 167 168 key := c.key(uid, convID) 169 if lock, ok := c.convLocks[key]; ok { 170 if lock.trace != trace { 171 c.Debug(ctx, "Release: different trace trying to free lock? convID: %s lock.trace: %s trace: %s", 172 convID, lock.trace, trace) 173 } else { 174 lock.shares-- 175 if lock.shares == 0 { 176 lock.refs-- 177 if lock.refs == 0 { 178 delete(c.convLocks, key) 179 } 180 lock.trace = "" 181 lock.lock.Unlock() 182 return true 183 } 184 } 185 } 186 return false 187} 188