1package api
2
3import (
4	"fmt"
5	"sync"
6	"time"
7)
8
const (
	// DefaultLockSessionName is the name given to a session that a Lock
	// creates for itself when LockOptions.SessionName is left empty.
	DefaultLockSessionName = "Consul API Lock"

	// DefaultLockSessionTTL is the TTL given to a lock-created session when
	// LockOptions.SessionTTL is left empty. A TTL is required because such a
	// session has no other health check to depend upon.
	DefaultLockSessionTTL = "15s"

	// DefaultLockWaitTime is how long a single blocking read waits while
	// checking whether the lock can be acquired. It also bounds how quickly
	// an in-progress Lock call notices that it should stop.
	DefaultLockWaitTime = time.Second * 15

	// DefaultLockRetryTime is the pause after a failed acquisition attempt
	// before trying again, so that an active lock-delay does not turn the
	// acquisition into a hot loop.
	DefaultLockRetryTime = time.Second * 5

	// DefaultMonitorRetryTime is the pause after a failed monitor check of a
	// held lock (e.g. a 500 response). Retries let the monitor ride out brief
	// unavailability, but only when LockOptions.MonitorRetries is non-zero;
	// it defaults to 0, which disables retrying. Used by locks and semaphores.
	DefaultMonitorRetryTime = time.Second * 2

	// LockFlagValue is the magic flag stored on a key that is used as a lock.
	// It lets a lock detect that the key is being used for something else,
	// such as a semaphore.
	LockFlagValue = 0x2ddccbc058a50c18
)
40
var (
	// ErrLockHeld is returned by Lock and Destroy when this handle already
	// holds the lock.
	ErrLockHeld error = fmt.Errorf("Lock already held")

	// ErrLockNotHeld is returned by Unlock when this handle does not hold
	// the lock.
	ErrLockNotHeld error = fmt.Errorf("Lock not held")

	// ErrLockInUse is returned by Destroy when the lock key is still owned
	// by a session, or when the key could not be deleted.
	ErrLockInUse error = fmt.Errorf("Lock in use")

	// ErrLockConflict is returned when the lock key exists but its flags do
	// not match LockFlagValue, i.e. the key is not being used as a lock.
	ErrLockConflict error = fmt.Errorf("Existing key does not match lock use")
)
57
58// Lock is used to implement client-side leader election. It is follows the
59// algorithm as described here: https://www.consul.io/docs/guides/leader-election.html.
60type Lock struct {
61	c    *Client
62	opts *LockOptions
63
64	isHeld       bool
65	sessionRenew chan struct{}
66	lockSession  string
67	l            sync.Mutex
68}
69
70// LockOptions is used to parameterize the Lock behavior.
71type LockOptions struct {
72	Key              string        // Must be set and have write permissions
73	Value            []byte        // Optional, value to associate with the lock
74	Session          string        // Optional, created if not specified
75	SessionOpts      *SessionEntry // Optional, options to use when creating a session
76	SessionName      string        // Optional, defaults to DefaultLockSessionName (ignored if SessionOpts is given)
77	SessionTTL       string        // Optional, defaults to DefaultLockSessionTTL (ignored if SessionOpts is given)
78	MonitorRetries   int           // Optional, defaults to 0 which means no retries
79	MonitorRetryTime time.Duration // Optional, defaults to DefaultMonitorRetryTime
80	LockWaitTime     time.Duration // Optional, defaults to DefaultLockWaitTime
81	LockTryOnce      bool          // Optional, defaults to false which means try forever
82	LockDelay        time.Duration // Optional, defaults to 15s
83	Namespace        string        `json:",omitempty"` // Optional, defaults to API client config, namespace of ACL token, or "default" namespace
84}
85
86// LockKey returns a handle to a lock struct which can be used
87// to acquire and release the mutex. The key used must have
88// write permissions.
89func (c *Client) LockKey(key string) (*Lock, error) {
90	opts := &LockOptions{
91		Key: key,
92	}
93	return c.LockOpts(opts)
94}
95
96// LockOpts returns a handle to a lock struct which can be used
97// to acquire and release the mutex. The key used must have
98// write permissions.
99func (c *Client) LockOpts(opts *LockOptions) (*Lock, error) {
100	if opts.Key == "" {
101		return nil, fmt.Errorf("missing key")
102	}
103	if opts.SessionName == "" {
104		opts.SessionName = DefaultLockSessionName
105	}
106	if opts.SessionTTL == "" {
107		opts.SessionTTL = DefaultLockSessionTTL
108	} else {
109		if _, err := time.ParseDuration(opts.SessionTTL); err != nil {
110			return nil, fmt.Errorf("invalid SessionTTL: %v", err)
111		}
112	}
113	if opts.MonitorRetryTime == 0 {
114		opts.MonitorRetryTime = DefaultMonitorRetryTime
115	}
116	if opts.LockWaitTime == 0 {
117		opts.LockWaitTime = DefaultLockWaitTime
118	}
119	l := &Lock{
120		c:    c,
121		opts: opts,
122	}
123	return l, nil
124}
125
126// Lock attempts to acquire the lock and blocks while doing so.
127// Providing a non-nil stopCh can be used to abort the lock attempt.
128// Returns a channel that is closed if our lock is lost or an error.
129// This channel could be closed at any time due to session invalidation,
130// communication errors, operator intervention, etc. It is NOT safe to
131// assume that the lock is held until Unlock() unless the Session is specifically
132// created without any associated health checks. By default Consul sessions
133// prefer liveness over safety and an application must be able to handle
134// the lock being lost.
135func (l *Lock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
136	// Hold the lock as we try to acquire
137	l.l.Lock()
138	defer l.l.Unlock()
139
140	// Check if we already hold the lock
141	if l.isHeld {
142		return nil, ErrLockHeld
143	}
144
145	wOpts := WriteOptions{
146		Namespace: l.opts.Namespace,
147	}
148
149	// Check if we need to create a session first
150	l.lockSession = l.opts.Session
151	if l.lockSession == "" {
152		s, err := l.createSession()
153		if err != nil {
154			return nil, fmt.Errorf("failed to create session: %v", err)
155		}
156
157		l.sessionRenew = make(chan struct{})
158		l.lockSession = s
159
160		session := l.c.Session()
161		go session.RenewPeriodic(l.opts.SessionTTL, s, &wOpts, l.sessionRenew)
162
163		// If we fail to acquire the lock, cleanup the session
164		defer func() {
165			if !l.isHeld {
166				close(l.sessionRenew)
167				l.sessionRenew = nil
168			}
169		}()
170	}
171
172	// Setup the query options
173	kv := l.c.KV()
174	qOpts := QueryOptions{
175		WaitTime:  l.opts.LockWaitTime,
176		Namespace: l.opts.Namespace,
177	}
178
179	start := time.Now()
180	attempts := 0
181WAIT:
182	// Check if we should quit
183	select {
184	case <-stopCh:
185		return nil, nil
186	default:
187	}
188
189	// Handle the one-shot mode.
190	if l.opts.LockTryOnce && attempts > 0 {
191		elapsed := time.Since(start)
192		if elapsed > l.opts.LockWaitTime {
193			return nil, nil
194		}
195
196		// Query wait time should not exceed the lock wait time
197		qOpts.WaitTime = l.opts.LockWaitTime - elapsed
198	}
199	attempts++
200
201	// Look for an existing lock, blocking until not taken
202	pair, meta, err := kv.Get(l.opts.Key, &qOpts)
203	if err != nil {
204		return nil, fmt.Errorf("failed to read lock: %v", err)
205	}
206	if pair != nil && pair.Flags != LockFlagValue {
207		return nil, ErrLockConflict
208	}
209	locked := false
210	if pair != nil && pair.Session == l.lockSession {
211		goto HELD
212	}
213	if pair != nil && pair.Session != "" {
214		qOpts.WaitIndex = meta.LastIndex
215		goto WAIT
216	}
217
218	// Try to acquire the lock
219	pair = l.lockEntry(l.lockSession)
220
221	locked, _, err = kv.Acquire(pair, &wOpts)
222	if err != nil {
223		return nil, fmt.Errorf("failed to acquire lock: %v", err)
224	}
225
226	// Handle the case of not getting the lock
227	if !locked {
228		// Determine why the lock failed
229		qOpts.WaitIndex = 0
230		pair, meta, err = kv.Get(l.opts.Key, &qOpts)
231		if err != nil {
232			return nil, err
233		}
234		if pair != nil && pair.Session != "" {
235			//If the session is not null, this means that a wait can safely happen
236			//using a long poll
237			qOpts.WaitIndex = meta.LastIndex
238			goto WAIT
239		} else {
240			// If the session is empty and the lock failed to acquire, then it means
241			// a lock-delay is in effect and a timed wait must be used
242			select {
243			case <-time.After(DefaultLockRetryTime):
244				goto WAIT
245			case <-stopCh:
246				return nil, nil
247			}
248		}
249	}
250
251HELD:
252	// Watch to ensure we maintain leadership
253	leaderCh := make(chan struct{})
254	go l.monitorLock(l.lockSession, leaderCh)
255
256	// Set that we own the lock
257	l.isHeld = true
258
259	// Locked! All done
260	return leaderCh, nil
261}
262
263// Unlock released the lock. It is an error to call this
264// if the lock is not currently held.
265func (l *Lock) Unlock() error {
266	// Hold the lock as we try to release
267	l.l.Lock()
268	defer l.l.Unlock()
269
270	// Ensure the lock is actually held
271	if !l.isHeld {
272		return ErrLockNotHeld
273	}
274
275	// Set that we no longer own the lock
276	l.isHeld = false
277
278	// Stop the session renew
279	if l.sessionRenew != nil {
280		defer func() {
281			close(l.sessionRenew)
282			l.sessionRenew = nil
283		}()
284	}
285
286	// Get the lock entry, and clear the lock session
287	lockEnt := l.lockEntry(l.lockSession)
288	l.lockSession = ""
289
290	// Release the lock explicitly
291	kv := l.c.KV()
292	w := WriteOptions{Namespace: l.opts.Namespace}
293
294	_, _, err := kv.Release(lockEnt, &w)
295	if err != nil {
296		return fmt.Errorf("failed to release lock: %v", err)
297	}
298	return nil
299}
300
301// Destroy is used to cleanup the lock entry. It is not necessary
302// to invoke. It will fail if the lock is in use.
303func (l *Lock) Destroy() error {
304	// Hold the lock as we try to release
305	l.l.Lock()
306	defer l.l.Unlock()
307
308	// Check if we already hold the lock
309	if l.isHeld {
310		return ErrLockHeld
311	}
312
313	// Look for an existing lock
314	kv := l.c.KV()
315	q := QueryOptions{Namespace: l.opts.Namespace}
316
317	pair, _, err := kv.Get(l.opts.Key, &q)
318	if err != nil {
319		return fmt.Errorf("failed to read lock: %v", err)
320	}
321
322	// Nothing to do if the lock does not exist
323	if pair == nil {
324		return nil
325	}
326
327	// Check for possible flag conflict
328	if pair.Flags != LockFlagValue {
329		return ErrLockConflict
330	}
331
332	// Check if it is in use
333	if pair.Session != "" {
334		return ErrLockInUse
335	}
336
337	// Attempt the delete
338	w := WriteOptions{Namespace: l.opts.Namespace}
339	didRemove, _, err := kv.DeleteCAS(pair, &w)
340	if err != nil {
341		return fmt.Errorf("failed to remove lock: %v", err)
342	}
343	if !didRemove {
344		return ErrLockInUse
345	}
346	return nil
347}
348
349// createSession is used to create a new managed session
350func (l *Lock) createSession() (string, error) {
351	session := l.c.Session()
352	se := l.opts.SessionOpts
353	if se == nil {
354		se = &SessionEntry{
355			Name:      l.opts.SessionName,
356			TTL:       l.opts.SessionTTL,
357			LockDelay: l.opts.LockDelay,
358		}
359	}
360	w := WriteOptions{Namespace: l.opts.Namespace}
361	id, _, err := session.Create(se, &w)
362	if err != nil {
363		return "", err
364	}
365	return id, nil
366}
367
368// lockEntry returns a formatted KVPair for the lock
369func (l *Lock) lockEntry(session string) *KVPair {
370	return &KVPair{
371		Key:     l.opts.Key,
372		Value:   l.opts.Value,
373		Session: session,
374		Flags:   LockFlagValue,
375	}
376}
377
378// monitorLock is a long running routine to monitor a lock ownership
379// It closes the stopCh if we lose our leadership.
380func (l *Lock) monitorLock(session string, stopCh chan struct{}) {
381	defer close(stopCh)
382	kv := l.c.KV()
383	opts := QueryOptions{
384		RequireConsistent: true,
385		Namespace:         l.opts.Namespace,
386	}
387WAIT:
388	retries := l.opts.MonitorRetries
389RETRY:
390	pair, meta, err := kv.Get(l.opts.Key, &opts)
391	if err != nil {
392		// If configured we can try to ride out a brief Consul unavailability
393		// by doing retries. Note that we have to attempt the retry in a non-
394		// blocking fashion so that we have a clean place to reset the retry
395		// counter if service is restored.
396		if retries > 0 && IsRetryableError(err) {
397			time.Sleep(l.opts.MonitorRetryTime)
398			retries--
399			opts.WaitIndex = 0
400			goto RETRY
401		}
402		return
403	}
404	if pair != nil && pair.Session == session {
405		opts.WaitIndex = meta.LastIndex
406		goto WAIT
407	}
408}
409