1package api
2
3import (
4	"fmt"
5	"sync"
6	"time"
7)
8
const (
	// DefaultLockSessionName is the session name assigned to a session that
	// Lock creates itself when LockOptions.SessionName is empty.
	DefaultLockSessionName = "Consul API Lock"

	// DefaultLockSessionTTL is the default session TTL if no Session is provided
	// when creating a new Lock. A TTL is used because we do not have any
	// other check to depend upon.
	DefaultLockSessionTTL = "15s"

	// DefaultLockWaitTime is how long we block for at a time to check if lock
	// acquisition is possible. This affects the minimum time it takes to cancel
	// a Lock acquisition.
	DefaultLockWaitTime = 15 * time.Second

	// DefaultLockRetryTime is how long we wait after a failed lock acquisition
	// before attempting to do the lock again. This is so that once a lock-delay
	// is in effect, we do not hot loop retrying the acquisition.
	DefaultLockRetryTime = 5 * time.Second

	// DefaultMonitorRetryTime is how long we wait after a failed monitor check
	// of a lock (500 response code). This allows the monitor to ride out brief
	// periods of unavailability, subject to the MonitorRetries setting in the
	// lock options which is by default set to 0, disabling this feature. This
	// affects locks and semaphores.
	DefaultMonitorRetryTime = 2 * time.Second

	// LockFlagValue is a magic flag we set to indicate a key
	// is being used for a lock. It is used to detect a potential
	// conflict with a semaphore.
	LockFlagValue = 0x2ddccbc058a50c18
)
40
// ErrLockHeld is returned by Lock when this handle already holds the lock,
// and by Destroy when asked to remove a lock that is still held.
var ErrLockHeld = fmt.Errorf("Lock already held")

// ErrLockNotHeld is returned by Unlock when this handle does not currently
// hold the lock.
var ErrLockNotHeld = fmt.Errorf("Lock not held")

// ErrLockInUse is returned by Destroy when the lock key is owned by a
// session, or when the conditional delete of the key does not take effect.
var ErrLockInUse = fmt.Errorf("Lock in use")

// ErrLockConflict is returned when the key's flags differ from
// LockFlagValue, i.e. the key is not being used as a lock.
var ErrLockConflict = fmt.Errorf("Existing key does not match lock use")
57
58// Lock is used to implement client-side leader election. It is follows the
59// algorithm as described here: https://www.consul.io/docs/guides/leader-election.html.
60type Lock struct {
61	c    *Client
62	opts *LockOptions
63
64	isHeld       bool
65	sessionRenew chan struct{}
66	lockSession  string
67	l            sync.Mutex
68}
69
70// LockOptions is used to parameterize the Lock behavior.
71type LockOptions struct {
72	Key              string        // Must be set and have write permissions
73	Value            []byte        // Optional, value to associate with the lock
74	Session          string        // Optional, created if not specified
75	SessionOpts      *SessionEntry // Optional, options to use when creating a session
76	SessionName      string        // Optional, defaults to DefaultLockSessionName (ignored if SessionOpts is given)
77	SessionTTL       string        // Optional, defaults to DefaultLockSessionTTL (ignored if SessionOpts is given)
78	MonitorRetries   int           // Optional, defaults to 0 which means no retries
79	MonitorRetryTime time.Duration // Optional, defaults to DefaultMonitorRetryTime
80	LockWaitTime     time.Duration // Optional, defaults to DefaultLockWaitTime
81	LockTryOnce      bool          // Optional, defaults to false which means try forever
82}
83
84// LockKey returns a handle to a lock struct which can be used
85// to acquire and release the mutex. The key used must have
86// write permissions.
87func (c *Client) LockKey(key string) (*Lock, error) {
88	opts := &LockOptions{
89		Key: key,
90	}
91	return c.LockOpts(opts)
92}
93
94// LockOpts returns a handle to a lock struct which can be used
95// to acquire and release the mutex. The key used must have
96// write permissions.
97func (c *Client) LockOpts(opts *LockOptions) (*Lock, error) {
98	if opts.Key == "" {
99		return nil, fmt.Errorf("missing key")
100	}
101	if opts.SessionName == "" {
102		opts.SessionName = DefaultLockSessionName
103	}
104	if opts.SessionTTL == "" {
105		opts.SessionTTL = DefaultLockSessionTTL
106	} else {
107		if _, err := time.ParseDuration(opts.SessionTTL); err != nil {
108			return nil, fmt.Errorf("invalid SessionTTL: %v", err)
109		}
110	}
111	if opts.MonitorRetryTime == 0 {
112		opts.MonitorRetryTime = DefaultMonitorRetryTime
113	}
114	if opts.LockWaitTime == 0 {
115		opts.LockWaitTime = DefaultLockWaitTime
116	}
117	l := &Lock{
118		c:    c,
119		opts: opts,
120	}
121	return l, nil
122}
123
124// Lock attempts to acquire the lock and blocks while doing so.
125// Providing a non-nil stopCh can be used to abort the lock attempt.
126// Returns a channel that is closed if our lock is lost or an error.
127// This channel could be closed at any time due to session invalidation,
128// communication errors, operator intervention, etc. It is NOT safe to
129// assume that the lock is held until Unlock() unless the Session is specifically
130// created without any associated health checks. By default Consul sessions
131// prefer liveness over safety and an application must be able to handle
132// the lock being lost.
133func (l *Lock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
134	// Hold the lock as we try to acquire
135	l.l.Lock()
136	defer l.l.Unlock()
137
138	// Check if we already hold the lock
139	if l.isHeld {
140		return nil, ErrLockHeld
141	}
142
143	// Check if we need to create a session first
144	l.lockSession = l.opts.Session
145	if l.lockSession == "" {
146		s, err := l.createSession()
147		if err != nil {
148			return nil, fmt.Errorf("failed to create session: %v", err)
149		}
150
151		l.sessionRenew = make(chan struct{})
152		l.lockSession = s
153		session := l.c.Session()
154		go session.RenewPeriodic(l.opts.SessionTTL, s, nil, l.sessionRenew)
155
156		// If we fail to acquire the lock, cleanup the session
157		defer func() {
158			if !l.isHeld {
159				close(l.sessionRenew)
160				l.sessionRenew = nil
161			}
162		}()
163	}
164
165	// Setup the query options
166	kv := l.c.KV()
167	qOpts := &QueryOptions{
168		WaitTime: l.opts.LockWaitTime,
169	}
170
171	start := time.Now()
172	attempts := 0
173WAIT:
174	// Check if we should quit
175	select {
176	case <-stopCh:
177		return nil, nil
178	default:
179	}
180
181	// Handle the one-shot mode.
182	if l.opts.LockTryOnce && attempts > 0 {
183		elapsed := time.Since(start)
184		if elapsed > l.opts.LockWaitTime {
185			return nil, nil
186		}
187
188		// Query wait time should not exceed the lock wait time
189		qOpts.WaitTime = l.opts.LockWaitTime - elapsed
190	}
191	attempts++
192
193	// Look for an existing lock, blocking until not taken
194	pair, meta, err := kv.Get(l.opts.Key, qOpts)
195	if err != nil {
196		return nil, fmt.Errorf("failed to read lock: %v", err)
197	}
198	if pair != nil && pair.Flags != LockFlagValue {
199		return nil, ErrLockConflict
200	}
201	locked := false
202	if pair != nil && pair.Session == l.lockSession {
203		goto HELD
204	}
205	if pair != nil && pair.Session != "" {
206		qOpts.WaitIndex = meta.LastIndex
207		goto WAIT
208	}
209
210	// Try to acquire the lock
211	pair = l.lockEntry(l.lockSession)
212	locked, _, err = kv.Acquire(pair, nil)
213	if err != nil {
214		return nil, fmt.Errorf("failed to acquire lock: %v", err)
215	}
216
217	// Handle the case of not getting the lock
218	if !locked {
219		// Determine why the lock failed
220		qOpts.WaitIndex = 0
221		pair, meta, err = kv.Get(l.opts.Key, qOpts)
222		if pair != nil && pair.Session != "" {
223			//If the session is not null, this means that a wait can safely happen
224			//using a long poll
225			qOpts.WaitIndex = meta.LastIndex
226			goto WAIT
227		} else {
228			// If the session is empty and the lock failed to acquire, then it means
229			// a lock-delay is in effect and a timed wait must be used
230			select {
231			case <-time.After(DefaultLockRetryTime):
232				goto WAIT
233			case <-stopCh:
234				return nil, nil
235			}
236		}
237	}
238
239HELD:
240	// Watch to ensure we maintain leadership
241	leaderCh := make(chan struct{})
242	go l.monitorLock(l.lockSession, leaderCh)
243
244	// Set that we own the lock
245	l.isHeld = true
246
247	// Locked! All done
248	return leaderCh, nil
249}
250
251// Unlock released the lock. It is an error to call this
252// if the lock is not currently held.
253func (l *Lock) Unlock() error {
254	// Hold the lock as we try to release
255	l.l.Lock()
256	defer l.l.Unlock()
257
258	// Ensure the lock is actually held
259	if !l.isHeld {
260		return ErrLockNotHeld
261	}
262
263	// Set that we no longer own the lock
264	l.isHeld = false
265
266	// Stop the session renew
267	if l.sessionRenew != nil {
268		defer func() {
269			close(l.sessionRenew)
270			l.sessionRenew = nil
271		}()
272	}
273
274	// Get the lock entry, and clear the lock session
275	lockEnt := l.lockEntry(l.lockSession)
276	l.lockSession = ""
277
278	// Release the lock explicitly
279	kv := l.c.KV()
280	_, _, err := kv.Release(lockEnt, nil)
281	if err != nil {
282		return fmt.Errorf("failed to release lock: %v", err)
283	}
284	return nil
285}
286
287// Destroy is used to cleanup the lock entry. It is not necessary
288// to invoke. It will fail if the lock is in use.
289func (l *Lock) Destroy() error {
290	// Hold the lock as we try to release
291	l.l.Lock()
292	defer l.l.Unlock()
293
294	// Check if we already hold the lock
295	if l.isHeld {
296		return ErrLockHeld
297	}
298
299	// Look for an existing lock
300	kv := l.c.KV()
301	pair, _, err := kv.Get(l.opts.Key, nil)
302	if err != nil {
303		return fmt.Errorf("failed to read lock: %v", err)
304	}
305
306	// Nothing to do if the lock does not exist
307	if pair == nil {
308		return nil
309	}
310
311	// Check for possible flag conflict
312	if pair.Flags != LockFlagValue {
313		return ErrLockConflict
314	}
315
316	// Check if it is in use
317	if pair.Session != "" {
318		return ErrLockInUse
319	}
320
321	// Attempt the delete
322	didRemove, _, err := kv.DeleteCAS(pair, nil)
323	if err != nil {
324		return fmt.Errorf("failed to remove lock: %v", err)
325	}
326	if !didRemove {
327		return ErrLockInUse
328	}
329	return nil
330}
331
332// createSession is used to create a new managed session
333func (l *Lock) createSession() (string, error) {
334	session := l.c.Session()
335	se := l.opts.SessionOpts
336	if se == nil {
337		se = &SessionEntry{
338			Name: l.opts.SessionName,
339			TTL:  l.opts.SessionTTL,
340		}
341	}
342	id, _, err := session.Create(se, nil)
343	if err != nil {
344		return "", err
345	}
346	return id, nil
347}
348
349// lockEntry returns a formatted KVPair for the lock
350func (l *Lock) lockEntry(session string) *KVPair {
351	return &KVPair{
352		Key:     l.opts.Key,
353		Value:   l.opts.Value,
354		Session: session,
355		Flags:   LockFlagValue,
356	}
357}
358
359// monitorLock is a long running routine to monitor a lock ownership
360// It closes the stopCh if we lose our leadership.
361func (l *Lock) monitorLock(session string, stopCh chan struct{}) {
362	defer close(stopCh)
363	kv := l.c.KV()
364	opts := &QueryOptions{RequireConsistent: true}
365WAIT:
366	retries := l.opts.MonitorRetries
367RETRY:
368	pair, meta, err := kv.Get(l.opts.Key, opts)
369	if err != nil {
370		// If configured we can try to ride out a brief Consul unavailability
371		// by doing retries. Note that we have to attempt the retry in a non-
372		// blocking fashion so that we have a clean place to reset the retry
373		// counter if service is restored.
374		if retries > 0 && IsRetryableError(err) {
375			time.Sleep(l.opts.MonitorRetryTime)
376			retries--
377			opts.WaitIndex = 0
378			goto RETRY
379		}
380		return
381	}
382	if pair != nil && pair.Session == session {
383		opts.WaitIndex = meta.LastIndex
384		goto WAIT
385	}
386}
387