1package api
2
3import (
4	"log"
5	"net/http"
6	"net/http/httptest"
7	"net/http/httputil"
8	"strings"
9	"sync"
10	"testing"
11	"time"
12
13	"github.com/hashicorp/consul/sdk/testutil/retry"
14)
15
16func createTestLock(t *testing.T, c *Client, key string) (*Lock, *Session) {
17	t.Helper()
18	session := c.Session()
19
20	se := &SessionEntry{
21		Name:     DefaultLockSessionName,
22		TTL:      DefaultLockSessionTTL,
23		Behavior: SessionBehaviorDelete,
24	}
25	id, _, err := session.CreateNoChecks(se, nil)
26	if err != nil {
27		t.Fatalf("err: %v", err)
28	}
29
30	opts := &LockOptions{
31		Key:         key,
32		Session:     id,
33		SessionName: se.Name,
34		SessionTTL:  se.TTL,
35	}
36	lock, err := c.LockOpts(opts)
37	if err != nil {
38		t.Fatalf("err: %v", err)
39	}
40
41	return lock, session
42}
43
44func TestAPI_LockLockUnlock(t *testing.T) {
45	t.Parallel()
46	c, s := makeClientWithoutConnect(t)
47	defer s.Stop()
48
49	lock, session := createTestLock(t, c, "test/lock")
50	defer session.Destroy(lock.opts.Session, nil)
51
52	// Initial unlock should fail
53	err := lock.Unlock()
54	if err != ErrLockNotHeld {
55		t.Fatalf("err: %v", err)
56	}
57
58	// Should work
59	leaderCh, err := lock.Lock(nil)
60	if err != nil {
61		t.Fatalf("err: %v", err)
62	}
63	if leaderCh == nil {
64		t.Fatalf("not leader")
65	}
66
67	// Double lock should fail
68	_, err = lock.Lock(nil)
69	if err != ErrLockHeld {
70		t.Fatalf("err: %v", err)
71	}
72
73	// Should be leader
74	select {
75	case <-leaderCh:
76		t.Fatalf("should be leader")
77	default:
78	}
79
80	// Initial unlock should work
81	err = lock.Unlock()
82	if err != nil {
83		t.Fatalf("err: %v", err)
84	}
85
86	// Double unlock should fail
87	err = lock.Unlock()
88	if err != ErrLockNotHeld {
89		t.Fatalf("err: %v", err)
90	}
91
92	// Should lose leadership
93	select {
94	case <-leaderCh:
95	case <-time.After(time.Second):
96		t.Fatalf("should not be leader")
97	}
98}
99
100func TestAPI_LockForceInvalidate(t *testing.T) {
101	t.Parallel()
102	c, s := makeClientWithoutConnect(t)
103	defer s.Stop()
104
105	retry.Run(t, func(r *retry.R) {
106		lock, session := createTestLock(t, c, "test/lock")
107		defer session.Destroy(lock.opts.Session, nil)
108
109		// Should work
110		leaderCh, err := lock.Lock(nil)
111		if err != nil {
112			r.Fatalf("err: %v", err)
113		}
114		if leaderCh == nil {
115			r.Fatalf("not leader")
116		}
117		defer lock.Unlock()
118
119		go func() {
120			// Nuke the session, simulator an operator invalidation
121			// or a health check failure
122			session := c.Session()
123			session.Destroy(lock.lockSession, nil)
124		}()
125
126		// Should loose leadership
127		select {
128		case <-leaderCh:
129		case <-time.After(time.Second):
130			r.Fatalf("should not be leader")
131		}
132	})
133}
134
135func TestAPI_LockDeleteKey(t *testing.T) {
136	t.Parallel()
137	c, s := makeClientWithoutConnect(t)
138	defer s.Stop()
139
140	// This uncovered some issues around special-case handling of low index
141	// numbers where it would work with a low number but fail for higher
142	// ones, so we loop this a bit to sweep the index up out of that
143	// territory.
144	for i := 0; i < 10; i++ {
145		func() {
146			lock, session := createTestLock(t, c, "test/lock")
147			defer session.Destroy(lock.opts.Session, nil)
148
149			// Should work
150			leaderCh, err := lock.Lock(nil)
151			if err != nil {
152				t.Fatalf("err: %v", err)
153			}
154			if leaderCh == nil {
155				t.Fatalf("not leader")
156			}
157			defer lock.Unlock()
158
159			go func() {
160				// Nuke the key, simulate an operator intervention
161				kv := c.KV()
162				kv.Delete("test/lock", nil)
163			}()
164
165			// Should loose leadership
166			select {
167			case <-leaderCh:
168			case <-time.After(10 * time.Second):
169				t.Fatalf("should not be leader")
170			}
171		}()
172	}
173}
174
175func TestAPI_LockContend(t *testing.T) {
176	t.Parallel()
177	c, s := makeClientWithoutConnect(t)
178	defer s.Stop()
179
180	wg := &sync.WaitGroup{}
181	acquired := make([]bool, 3)
182	for idx := range acquired {
183		wg.Add(1)
184		go func(idx int) {
185			defer wg.Done()
186			lock, session := createTestLock(t, c, "test/lock")
187			defer session.Destroy(lock.opts.Session, nil)
188
189			// Should work eventually, will contend
190			leaderCh, err := lock.Lock(nil)
191			if err != nil {
192				t.Errorf("err: %v", err)
193				return
194			}
195			if leaderCh == nil {
196				t.Errorf("not leader")
197				return
198			}
199			defer lock.Unlock()
200			log.Printf("Contender %d acquired", idx)
201
202			// Set acquired and then leave
203			acquired[idx] = true
204		}(idx)
205	}
206
207	// Wait for termination
208	doneCh := make(chan struct{})
209	go func() {
210		wg.Wait()
211		close(doneCh)
212	}()
213
214	// Wait for everybody to get a turn
215	select {
216	case <-doneCh:
217	case <-time.After(3 * DefaultLockRetryTime):
218		t.Fatalf("timeout")
219	}
220
221	for idx, did := range acquired {
222		if !did {
223			t.Fatalf("contender %d never acquired", idx)
224		}
225	}
226}
227
228func TestAPI_LockDestroy(t *testing.T) {
229	t.Parallel()
230	c, s := makeClientWithoutConnect(t)
231	defer s.Stop()
232
233	lock, session := createTestLock(t, c, "test/lock")
234	defer session.Destroy(lock.opts.Session, nil)
235
236	// Should work
237	leaderCh, err := lock.Lock(nil)
238	if err != nil {
239		t.Fatalf("err: %v", err)
240	}
241	if leaderCh == nil {
242		t.Fatalf("not leader")
243	}
244
245	// Destroy should fail
246	if err := lock.Destroy(); err != ErrLockHeld {
247		t.Fatalf("err: %v", err)
248	}
249
250	// Should be able to release
251	err = lock.Unlock()
252	if err != nil {
253		t.Fatalf("err: %v", err)
254	}
255
256	// Acquire with a different lock
257	l2, session := createTestLock(t, c, "test/lock")
258	defer session.Destroy(lock.opts.Session, nil)
259
260	// Should work
261	leaderCh, err = l2.Lock(nil)
262	if err != nil {
263		t.Fatalf("err: %v", err)
264	}
265	if leaderCh == nil {
266		t.Fatalf("not leader")
267	}
268
269	// Destroy should still fail
270	if err := lock.Destroy(); err != ErrLockInUse {
271		t.Fatalf("err: %v", err)
272	}
273
274	// Should release
275	err = l2.Unlock()
276	if err != nil {
277		t.Fatalf("err: %v", err)
278	}
279
280	// Destroy should work
281	err = lock.Destroy()
282	if err != nil {
283		t.Fatalf("err: %v", err)
284	}
285
286	// Double destroy should work
287	err = l2.Destroy()
288	if err != nil {
289		t.Fatalf("err: %v", err)
290	}
291}
292
293func TestAPI_LockConflict(t *testing.T) {
294	t.Parallel()
295	c, s := makeClientWithoutConnect(t)
296	defer s.Stop()
297
298	sema, session := createTestSemaphore(t, c, "test/lock/", 2)
299	defer session.Destroy(sema.opts.Session, nil)
300
301	// Should work
302	lockCh, err := sema.Acquire(nil)
303	if err != nil {
304		t.Fatalf("err: %v", err)
305	}
306	if lockCh == nil {
307		t.Fatalf("not hold")
308	}
309	defer sema.Release()
310
311	lock, session := createTestLock(t, c, "test/lock/.lock")
312	defer session.Destroy(lock.opts.Session, nil)
313
314	// Should conflict with semaphore
315	_, err = lock.Lock(nil)
316	if err != ErrLockConflict {
317		t.Fatalf("err: %v", err)
318	}
319
320	// Should conflict with semaphore
321	err = lock.Destroy()
322	if err != ErrLockConflict {
323		t.Fatalf("err: %v", err)
324	}
325}
326
327func TestAPI_LockReclaimLock(t *testing.T) {
328	t.Parallel()
329	c, s := makeClientWithoutConnect(t)
330	defer s.Stop()
331
332	s.WaitForSerfCheck(t)
333
334	session, _, err := c.Session().Create(&SessionEntry{}, nil)
335	if err != nil {
336		t.Fatalf("err: %v", err)
337	}
338
339	lock, err := c.LockOpts(&LockOptions{Key: "test/lock", Session: session})
340	if err != nil {
341		t.Fatalf("err: %v", err)
342	}
343
344	// Should work
345	leaderCh, err := lock.Lock(nil)
346	if err != nil {
347		t.Fatalf("err: %v", err)
348	}
349	if leaderCh == nil {
350		t.Fatalf("not leader")
351	}
352	defer lock.Unlock()
353
354	l2, err := c.LockOpts(&LockOptions{Key: "test/lock", Session: session})
355	if err != nil {
356		t.Fatalf("err: %v", err)
357	}
358
359	reclaimed := make(chan (<-chan struct{}), 1)
360	go func() {
361		l2Ch, err := l2.Lock(nil)
362		if err != nil {
363			t.Errorf("not locked: %v", err)
364		}
365		reclaimed <- l2Ch
366	}()
367
368	// Should reclaim the lock
369	var leader2Ch <-chan struct{}
370
371	select {
372	case leader2Ch = <-reclaimed:
373	case <-time.After(time.Second):
374		t.Fatalf("should have locked")
375	}
376
377	// unlock should work
378	err = l2.Unlock()
379	if err != nil {
380		t.Fatalf("err: %v", err)
381	}
382
383	//Both locks should see the unlock
384	select {
385	case <-leader2Ch:
386	case <-time.After(time.Second):
387		t.Fatalf("should not be leader")
388	}
389
390	select {
391	case <-leaderCh:
392	case <-time.After(time.Second):
393		t.Fatalf("should not be leader")
394	}
395}
396
397func TestAPI_LockMonitorRetry(t *testing.T) {
398	t.Parallel()
399	raw, s := makeClientWithoutConnect(t)
400	defer s.Stop()
401
402	s.WaitForSerfCheck(t)
403
404	// Set up a server that always responds with 500 errors.
405	failer := func(w http.ResponseWriter, req *http.Request) {
406		w.WriteHeader(500)
407	}
408	outage := httptest.NewServer(http.HandlerFunc(failer))
409	defer outage.Close()
410
411	// Set up a reverse proxy that will send some requests to the
412	// 500 server and pass everything else through to the real Consul
413	// server.
414	var mutex sync.Mutex
415	errors := 0
416	director := func(req *http.Request) {
417		mutex.Lock()
418		defer mutex.Unlock()
419
420		req.URL.Scheme = "http"
421		if errors > 0 && req.Method == "GET" && strings.Contains(req.URL.Path, "/v1/kv/test/lock") {
422			req.URL.Host = outage.URL[7:] // Strip off "http://".
423			errors--
424		} else {
425			req.URL.Host = raw.config.Address
426		}
427	}
428	proxy := httptest.NewServer(&httputil.ReverseProxy{Director: director})
429	defer proxy.Close()
430
431	// Make another client that points at the proxy instead of the real
432	// Consul server.
433	config := raw.config
434	config.Address = proxy.URL[7:] // Strip off "http://".
435	c, err := NewClient(&config)
436	if err != nil {
437		t.Fatalf("err: %v", err)
438	}
439
440	// Set up a lock with retries enabled.
441	opts := &LockOptions{
442		Key:            "test/lock",
443		SessionTTL:     "60s",
444		MonitorRetries: 3,
445	}
446	lock, err := c.LockOpts(opts)
447	if err != nil {
448		t.Fatalf("err: %v", err)
449	}
450
451	// Make sure the default got set.
452	if lock.opts.MonitorRetryTime != DefaultMonitorRetryTime {
453		t.Fatalf("bad: %d", lock.opts.MonitorRetryTime)
454	}
455
456	// Now set a custom time for the test.
457	opts.MonitorRetryTime = 250 * time.Millisecond
458	lock, err = c.LockOpts(opts)
459	if err != nil {
460		t.Fatalf("err: %v", err)
461	}
462	if lock.opts.MonitorRetryTime != 250*time.Millisecond {
463		t.Fatalf("bad: %d", lock.opts.MonitorRetryTime)
464	}
465
466	// Should get the lock.
467	leaderCh, err := lock.Lock(nil)
468	if err != nil {
469		t.Fatalf("err: %v", err)
470	}
471	if leaderCh == nil {
472		t.Fatalf("not leader")
473	}
474
475	// Poke the key using the raw client to force the monitor to wake up
476	// and check the lock again. This time we will return errors for some
477	// of the responses.
478	mutex.Lock()
479	errors = 2
480	mutex.Unlock()
481	pair, _, err := raw.KV().Get("test/lock", &QueryOptions{})
482	if err != nil {
483		t.Fatalf("err: %v", err)
484	}
485	pair.Value = []byte{1}
486	if _, err := raw.KV().Put(pair, &WriteOptions{}); err != nil {
487		t.Fatalf("err: %v", err)
488	}
489	time.Sleep(5 * opts.MonitorRetryTime)
490
491	// Should still be the leader.
492	select {
493	case <-leaderCh:
494		t.Fatalf("should be leader")
495	default:
496	}
497
498	// Now return an overwhelming number of errors.
499	mutex.Lock()
500	errors = 10
501	mutex.Unlock()
502	pair.Value = []byte{2}
503	if _, err := raw.KV().Put(pair, &WriteOptions{}); err != nil {
504		t.Fatalf("err: %v", err)
505	}
506	time.Sleep(5 * opts.MonitorRetryTime)
507
508	// Should lose leadership.
509	select {
510	case <-leaderCh:
511	case <-time.After(time.Second):
512		t.Fatalf("should not be leader")
513	}
514}
515
516func TestAPI_LockOneShot(t *testing.T) {
517	t.Parallel()
518	c, s := makeClientWithoutConnect(t)
519	defer s.Stop()
520
521	s.WaitForSerfCheck(t)
522
523	// Set up a lock as a one-shot.
524	opts := &LockOptions{
525		Key:         "test/lock",
526		LockTryOnce: true,
527	}
528	lock, err := c.LockOpts(opts)
529	if err != nil {
530		t.Fatalf("err: %v", err)
531	}
532
533	// Make sure the default got set.
534	if lock.opts.LockWaitTime != DefaultLockWaitTime {
535		t.Fatalf("bad: %d", lock.opts.LockWaitTime)
536	}
537
538	// Now set a custom time for the test.
539	opts.LockWaitTime = 250 * time.Millisecond
540	lock, err = c.LockOpts(opts)
541	if err != nil {
542		t.Fatalf("err: %v", err)
543	}
544	if lock.opts.LockWaitTime != 250*time.Millisecond {
545		t.Fatalf("bad: %d", lock.opts.LockWaitTime)
546	}
547
548	// Should get the lock.
549	ch, err := lock.Lock(nil)
550	if err != nil {
551		t.Fatalf("err: %v", err)
552	}
553	if ch == nil {
554		t.Fatalf("not leader")
555	}
556
557	// Now try with another session.
558	contender, err := c.LockOpts(opts)
559	if err != nil {
560		t.Fatalf("err: %v", err)
561	}
562	start := time.Now()
563	ch, err = contender.Lock(nil)
564	if err != nil {
565		t.Fatalf("err: %v", err)
566	}
567	if ch != nil {
568		t.Fatalf("should not be leader")
569	}
570	diff := time.Since(start)
571	if diff < contender.opts.LockWaitTime || diff > 2*contender.opts.LockWaitTime {
572		t.Fatalf("time out of bounds: %9.6f", diff.Seconds())
573	}
574
575	// Unlock and then make sure the contender can get it.
576	if err := lock.Unlock(); err != nil {
577		t.Fatalf("err: %v", err)
578	}
579	ch, err = contender.Lock(nil)
580	if err != nil {
581		t.Fatalf("err: %v", err)
582	}
583	if ch == nil {
584		t.Fatalf("should be leader")
585	}
586}
587