1package api 2 3import ( 4 "fmt" 5 "sync" 6 "time" 7) 8 9const ( 10 // DefaultLockSessionName is the Session Name we assign if none is provided 11 DefaultLockSessionName = "Consul API Lock" 12 13 // DefaultLockSessionTTL is the default session TTL if no Session is provided 14 // when creating a new Lock. This is used because we do not have another 15 // other check to depend upon. 16 DefaultLockSessionTTL = "15s" 17 18 // DefaultLockWaitTime is how long we block for at a time to check if lock 19 // acquisition is possible. This affects the minimum time it takes to cancel 20 // a Lock acquisition. 21 DefaultLockWaitTime = 15 * time.Second 22 23 // DefaultLockRetryTime is how long we wait after a failed lock acquisition 24 // before attempting to do the lock again. This is so that once a lock-delay 25 // is in effect, we do not hot loop retrying the acquisition. 26 DefaultLockRetryTime = 5 * time.Second 27 28 // DefaultMonitorRetryTime is how long we wait after a failed monitor check 29 // of a lock (500 response code). This allows the monitor to ride out brief 30 // periods of unavailability, subject to the MonitorRetries setting in the 31 // lock options which is by default set to 0, disabling this feature. This 32 // affects locks and semaphores. 33 DefaultMonitorRetryTime = 2 * time.Second 34 35 // LockFlagValue is a magic flag we set to indicate a key 36 // is being used for a lock. It is used to detect a potential 37 // conflict with a semaphore. 38 LockFlagValue = 0x2ddccbc058a50c18 39) 40 41var ( 42 // ErrLockHeld is returned if we attempt to double lock 43 ErrLockHeld = fmt.Errorf("Lock already held") 44 45 // ErrLockNotHeld is returned if we attempt to unlock a lock 46 // that we do not hold. 47 ErrLockNotHeld = fmt.Errorf("Lock not held") 48 49 // ErrLockInUse is returned if we attempt to destroy a lock 50 // that is in use. 51 ErrLockInUse = fmt.Errorf("Lock in use") 52 53 // ErrLockConflict is returned if the flags on a key 54 // used for a lock do not match expectation 55 ErrLockConflict = fmt.Errorf("Existing key does not match lock use") 56) 57 58// Lock is used to implement client-side leader election. It is follows the 59// algorithm as described here: https://www.consul.io/docs/guides/leader-election.html. 60type Lock struct { 61 c *Client 62 opts *LockOptions 63 64 isHeld bool 65 sessionRenew chan struct{} 66 lockSession string 67 l sync.Mutex 68} 69 70// LockOptions is used to parameterize the Lock behavior. 71type LockOptions struct { 72 Key string // Must be set and have write permissions 73 Value []byte // Optional, value to associate with the lock 74 Session string // Optional, created if not specified 75 SessionOpts *SessionEntry // Optional, options to use when creating a session 76 SessionName string // Optional, defaults to DefaultLockSessionName (ignored if SessionOpts is given) 77 SessionTTL string // Optional, defaults to DefaultLockSessionTTL (ignored if SessionOpts is given) 78 MonitorRetries int // Optional, defaults to 0 which means no retries 79 MonitorRetryTime time.Duration // Optional, defaults to DefaultMonitorRetryTime 80 LockWaitTime time.Duration // Optional, defaults to DefaultLockWaitTime 81 LockTryOnce bool // Optional, defaults to false which means try forever 82 LockDelay time.Duration // Optional, defaults to 15s 83 Namespace string `json:",omitempty"` // Optional, defaults to API client config, namespace of ACL token, or "default" namespace 84} 85 86// LockKey returns a handle to a lock struct which can be used 87// to acquire and release the mutex. The key used must have 88// write permissions. 89func (c *Client) LockKey(key string) (*Lock, error) { 90 opts := &LockOptions{ 91 Key: key, 92 } 93 return c.LockOpts(opts) 94} 95 96// LockOpts returns a handle to a lock struct which can be used 97// to acquire and release the mutex. The key used must have 98// write permissions. 99func (c *Client) LockOpts(opts *LockOptions) (*Lock, error) { 100 if opts.Key == "" { 101 return nil, fmt.Errorf("missing key") 102 } 103 if opts.SessionName == "" { 104 opts.SessionName = DefaultLockSessionName 105 } 106 if opts.SessionTTL == "" { 107 opts.SessionTTL = DefaultLockSessionTTL 108 } else { 109 if _, err := time.ParseDuration(opts.SessionTTL); err != nil { 110 return nil, fmt.Errorf("invalid SessionTTL: %v", err) 111 } 112 } 113 if opts.MonitorRetryTime == 0 { 114 opts.MonitorRetryTime = DefaultMonitorRetryTime 115 } 116 if opts.LockWaitTime == 0 { 117 opts.LockWaitTime = DefaultLockWaitTime 118 } 119 l := &Lock{ 120 c: c, 121 opts: opts, 122 } 123 return l, nil 124} 125 126// Lock attempts to acquire the lock and blocks while doing so. 127// Providing a non-nil stopCh can be used to abort the lock attempt. 128// Returns a channel that is closed if our lock is lost or an error. 129// This channel could be closed at any time due to session invalidation, 130// communication errors, operator intervention, etc. It is NOT safe to 131// assume that the lock is held until Unlock() unless the Session is specifically 132// created without any associated health checks. By default Consul sessions 133// prefer liveness over safety and an application must be able to handle 134// the lock being lost. 135func (l *Lock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) { 136 // Hold the lock as we try to acquire 137 l.l.Lock() 138 defer l.l.Unlock() 139 140 // Check if we already hold the lock 141 if l.isHeld { 142 return nil, ErrLockHeld 143 } 144 145 wOpts := WriteOptions{ 146 Namespace: l.opts.Namespace, 147 } 148 149 // Check if we need to create a session first 150 l.lockSession = l.opts.Session 151 if l.lockSession == "" { 152 s, err := l.createSession() 153 if err != nil { 154 return nil, fmt.Errorf("failed to create session: %v", err) 155 } 156 157 l.sessionRenew = make(chan struct{}) 158 l.lockSession = s 159 160 session := l.c.Session() 161 go session.RenewPeriodic(l.opts.SessionTTL, s, &wOpts, l.sessionRenew) 162 163 // If we fail to acquire the lock, cleanup the session 164 defer func() { 165 if !l.isHeld { 166 close(l.sessionRenew) 167 l.sessionRenew = nil 168 } 169 }() 170 } 171 172 // Setup the query options 173 kv := l.c.KV() 174 qOpts := QueryOptions{ 175 WaitTime: l.opts.LockWaitTime, 176 Namespace: l.opts.Namespace, 177 } 178 179 start := time.Now() 180 attempts := 0 181WAIT: 182 // Check if we should quit 183 select { 184 case <-stopCh: 185 return nil, nil 186 default: 187 } 188 189 // Handle the one-shot mode. 190 if l.opts.LockTryOnce && attempts > 0 { 191 elapsed := time.Since(start) 192 if elapsed > l.opts.LockWaitTime { 193 return nil, nil 194 } 195 196 // Query wait time should not exceed the lock wait time 197 qOpts.WaitTime = l.opts.LockWaitTime - elapsed 198 } 199 attempts++ 200 201 // Look for an existing lock, blocking until not taken 202 pair, meta, err := kv.Get(l.opts.Key, &qOpts) 203 if err != nil { 204 return nil, fmt.Errorf("failed to read lock: %v", err) 205 } 206 if pair != nil && pair.Flags != LockFlagValue { 207 return nil, ErrLockConflict 208 } 209 locked := false 210 if pair != nil && pair.Session == l.lockSession { 211 goto HELD 212 } 213 if pair != nil && pair.Session != "" { 214 qOpts.WaitIndex = meta.LastIndex 215 goto WAIT 216 } 217 218 // Try to acquire the lock 219 pair = l.lockEntry(l.lockSession) 220 221 locked, _, err = kv.Acquire(pair, &wOpts) 222 if err != nil { 223 return nil, fmt.Errorf("failed to acquire lock: %v", err) 224 } 225 226 // Handle the case of not getting the lock 227 if !locked { 228 // Determine why the lock failed 229 qOpts.WaitIndex = 0 230 pair, meta, err = kv.Get(l.opts.Key, &qOpts) 231 if err != nil { 232 return nil, err 233 } 234 if pair != nil && pair.Session != "" { 235 //If the session is not null, this means that a wait can safely happen 236 //using a long poll 237 qOpts.WaitIndex = meta.LastIndex 238 goto WAIT 239 } else { 240 // If the session is empty and the lock failed to acquire, then it means 241 // a lock-delay is in effect and a timed wait must be used 242 select { 243 case <-time.After(DefaultLockRetryTime): 244 goto WAIT 245 case <-stopCh: 246 return nil, nil 247 } 248 } 249 } 250 251HELD: 252 // Watch to ensure we maintain leadership 253 leaderCh := make(chan struct{}) 254 go l.monitorLock(l.lockSession, leaderCh) 255 256 // Set that we own the lock 257 l.isHeld = true 258 259 // Locked! All done 260 return leaderCh, nil 261} 262 263// Unlock released the lock. It is an error to call this 264// if the lock is not currently held. 265func (l *Lock) Unlock() error { 266 // Hold the lock as we try to release 267 l.l.Lock() 268 defer l.l.Unlock() 269 270 // Ensure the lock is actually held 271 if !l.isHeld { 272 return ErrLockNotHeld 273 } 274 275 // Set that we no longer own the lock 276 l.isHeld = false 277 278 // Stop the session renew 279 if l.sessionRenew != nil { 280 defer func() { 281 close(l.sessionRenew) 282 l.sessionRenew = nil 283 }() 284 } 285 286 // Get the lock entry, and clear the lock session 287 lockEnt := l.lockEntry(l.lockSession) 288 l.lockSession = "" 289 290 // Release the lock explicitly 291 kv := l.c.KV() 292 w := WriteOptions{Namespace: l.opts.Namespace} 293 294 _, _, err := kv.Release(lockEnt, &w) 295 if err != nil { 296 return fmt.Errorf("failed to release lock: %v", err) 297 } 298 return nil 299} 300 301// Destroy is used to cleanup the lock entry. It is not necessary 302// to invoke. It will fail if the lock is in use. 303func (l *Lock) Destroy() error { 304 // Hold the lock as we try to release 305 l.l.Lock() 306 defer l.l.Unlock() 307 308 // Check if we already hold the lock 309 if l.isHeld { 310 return ErrLockHeld 311 } 312 313 // Look for an existing lock 314 kv := l.c.KV() 315 q := QueryOptions{Namespace: l.opts.Namespace} 316 317 pair, _, err := kv.Get(l.opts.Key, &q) 318 if err != nil { 319 return fmt.Errorf("failed to read lock: %v", err) 320 } 321 322 // Nothing to do if the lock does not exist 323 if pair == nil { 324 return nil 325 } 326 327 // Check for possible flag conflict 328 if pair.Flags != LockFlagValue { 329 return ErrLockConflict 330 } 331 332 // Check if it is in use 333 if pair.Session != "" { 334 return ErrLockInUse 335 } 336 337 // Attempt the delete 338 w := WriteOptions{Namespace: l.opts.Namespace} 339 didRemove, _, err := kv.DeleteCAS(pair, &w) 340 if err != nil { 341 return fmt.Errorf("failed to remove lock: %v", err) 342 } 343 if !didRemove { 344 return ErrLockInUse 345 } 346 return nil 347} 348 349// createSession is used to create a new managed session 350func (l *Lock) createSession() (string, error) { 351 session := l.c.Session() 352 se := l.opts.SessionOpts 353 if se == nil { 354 se = &SessionEntry{ 355 Name: l.opts.SessionName, 356 TTL: l.opts.SessionTTL, 357 LockDelay: l.opts.LockDelay, 358 } 359 } 360 w := WriteOptions{Namespace: l.opts.Namespace} 361 id, _, err := session.Create(se, &w) 362 if err != nil { 363 return "", err 364 } 365 return id, nil 366} 367 368// lockEntry returns a formatted KVPair for the lock 369func (l *Lock) lockEntry(session string) *KVPair { 370 return &KVPair{ 371 Key: l.opts.Key, 372 Value: l.opts.Value, 373 Session: session, 374 Flags: LockFlagValue, 375 } 376} 377 378// monitorLock is a long running routine to monitor a lock ownership 379// It closes the stopCh if we lose our leadership. 380func (l *Lock) monitorLock(session string, stopCh chan struct{}) { 381 defer close(stopCh) 382 kv := l.c.KV() 383 opts := QueryOptions{ 384 RequireConsistent: true, 385 Namespace: l.opts.Namespace, 386 } 387WAIT: 388 retries := l.opts.MonitorRetries 389RETRY: 390 pair, meta, err := kv.Get(l.opts.Key, &opts) 391 if err != nil { 392 // If configured we can try to ride out a brief Consul unavailability 393 // by doing retries. Note that we have to attempt the retry in a non- 394 // blocking fashion so that we have a clean place to reset the retry 395 // counter if service is restored. 396 if retries > 0 && IsRetryableError(err) { 397 time.Sleep(l.opts.MonitorRetryTime) 398 retries-- 399 opts.WaitIndex = 0 400 goto RETRY 401 } 402 return 403 } 404 if pair != nil && pair.Session == session { 405 opts.WaitIndex = meta.LastIndex 406 goto WAIT 407 } 408} 409