1package api
2
3import (
4	"log"
5	"net/http"
6	"net/http/httptest"
7	"net/http/httputil"
8	"strings"
9	"sync"
10	"testing"
11	"time"
12)
13
14func createTestSemaphore(t *testing.T, c *Client, prefix string, limit int) (*Semaphore, *Session) {
15	t.Helper()
16	session := c.Session()
17
18	se := &SessionEntry{
19		Name:     DefaultSemaphoreSessionName,
20		TTL:      DefaultSemaphoreSessionTTL,
21		Behavior: SessionBehaviorDelete,
22	}
23	id, _, err := session.CreateNoChecks(se, nil)
24	if err != nil {
25		t.Fatalf("err: %v", err)
26	}
27
28	opts := &SemaphoreOptions{
29		Prefix:      prefix,
30		Limit:       limit,
31		Session:     id,
32		SessionName: se.Name,
33		SessionTTL:  se.TTL,
34	}
35	sema, err := c.SemaphoreOpts(opts)
36	if err != nil {
37		t.Fatalf("err: %v", err)
38	}
39
40	return sema, session
41}
42
43func TestAPI_SemaphoreAcquireRelease(t *testing.T) {
44	t.Parallel()
45	c, s := makeClient(t)
46	defer s.Stop()
47
48	sema, session := createTestSemaphore(t, c, "test/semaphore", 2)
49	defer session.Destroy(sema.opts.Session, nil)
50
51	// Initial release should fail
52	err := sema.Release()
53	if err != ErrSemaphoreNotHeld {
54		t.Fatalf("err: %v", err)
55	}
56
57	// Should work
58	lockCh, err := sema.Acquire(nil)
59	if err != nil {
60		t.Fatalf("err: %v", err)
61	}
62	if lockCh == nil {
63		t.Fatalf("not hold")
64	}
65
66	// Double lock should fail
67	_, err = sema.Acquire(nil)
68	if err != ErrSemaphoreHeld {
69		t.Fatalf("err: %v", err)
70	}
71
72	// Should be held
73	select {
74	case <-lockCh:
75		t.Fatalf("should be held")
76	default:
77	}
78
79	// Initial release should work
80	err = sema.Release()
81	if err != nil {
82		t.Fatalf("err: %v", err)
83	}
84
85	// Double unlock should fail
86	err = sema.Release()
87	if err != ErrSemaphoreNotHeld {
88		t.Fatalf("err: %v", err)
89	}
90
91	// Should lose resource
92	select {
93	case <-lockCh:
94	case <-time.After(time.Second):
95		t.Fatalf("should not be held")
96	}
97}
98
99func TestAPI_SemaphoreForceInvalidate(t *testing.T) {
100	t.Parallel()
101	c, s := makeClient(t)
102	defer s.Stop()
103
104	s.WaitForSerfCheck(t)
105
106	sema, session := createTestSemaphore(t, c, "test/semaphore", 2)
107	defer session.Destroy(sema.opts.Session, nil)
108
109	// Should work
110	lockCh, err := sema.Acquire(nil)
111	if err != nil {
112		t.Fatalf("err: %v", err)
113	}
114	if lockCh == nil {
115		t.Fatalf("not acquired")
116	}
117	defer sema.Release()
118
119	go func() {
120		// Nuke the session, simulator an operator invalidation
121		// or a health check failure
122		session := c.Session()
123		session.Destroy(sema.lockSession, nil)
124	}()
125
126	// Should loose slot
127	select {
128	case <-lockCh:
129	case <-time.After(time.Second):
130		t.Fatalf("should not be locked")
131	}
132}
133
134func TestAPI_SemaphoreDeleteKey(t *testing.T) {
135	t.Parallel()
136	c, s := makeClient(t)
137	defer s.Stop()
138
139	s.WaitForSerfCheck(t)
140
141	sema, session := createTestSemaphore(t, c, "test/semaphore", 2)
142	defer session.Destroy(sema.opts.Session, nil)
143
144	// Should work
145	lockCh, err := sema.Acquire(nil)
146	if err != nil {
147		t.Fatalf("err: %v", err)
148	}
149	if lockCh == nil {
150		t.Fatalf("not locked")
151	}
152	defer sema.Release()
153
154	go func() {
155		// Nuke the key, simulate an operator intervention
156		kv := c.KV()
157		kv.DeleteTree("test/semaphore", nil)
158	}()
159
160	// Should loose leadership
161	select {
162	case <-lockCh:
163	case <-time.After(time.Second):
164		t.Fatalf("should not be locked")
165	}
166}
167
168func TestAPI_SemaphoreContend(t *testing.T) {
169	t.Parallel()
170	c, s := makeClient(t)
171	defer s.Stop()
172
173	s.WaitForSerfCheck(t)
174
175	wg := &sync.WaitGroup{}
176	acquired := make([]bool, 4)
177	for idx := range acquired {
178		wg.Add(1)
179		go func(idx int) {
180			defer wg.Done()
181			sema, session := createTestSemaphore(t, c, "test/semaphore", 2)
182			defer session.Destroy(sema.opts.Session, nil)
183
184			// Should work eventually, will contend
185			lockCh, err := sema.Acquire(nil)
186			if err != nil {
187				t.Errorf("err: %v", err)
188				return
189			}
190			if lockCh == nil {
191				t.Errorf("not locked")
192				return
193			}
194			defer sema.Release()
195			log.Printf("Contender %d acquired", idx)
196
197			// Set acquired and then leave
198			acquired[idx] = true
199		}(idx)
200	}
201
202	// Wait for termination
203	doneCh := make(chan struct{})
204	go func() {
205		wg.Wait()
206		close(doneCh)
207	}()
208
209	// Wait for everybody to get a turn
210	select {
211	case <-doneCh:
212	case <-time.After(3 * DefaultLockRetryTime):
213		t.Fatalf("timeout")
214	}
215
216	for idx, did := range acquired {
217		if !did {
218			t.Fatalf("contender %d never acquired", idx)
219		}
220	}
221}
222
223func TestAPI_SemaphoreBadLimit(t *testing.T) {
224	t.Parallel()
225	c, s := makeClient(t)
226	defer s.Stop()
227
228	s.WaitForSerfCheck(t)
229
230	_, err := c.SemaphorePrefix("test/semaphore", 0)
231	if err == nil {
232		t.Fatalf("should error, limit must be positive")
233	}
234
235	sema, session := createTestSemaphore(t, c, "test/semaphore", 1)
236	defer session.Destroy(sema.opts.Session, nil)
237
238	_, err = sema.Acquire(nil)
239	if err != nil {
240		t.Fatalf("err: %v", err)
241	}
242
243	sema2, session := createTestSemaphore(t, c, "test/semaphore", 2)
244	defer session.Destroy(sema.opts.Session, nil)
245
246	_, err = sema2.Acquire(nil)
247	if err.Error() != "semaphore limit conflict (lock: 1, local: 2)" {
248		t.Fatalf("err: %v", err)
249	}
250}
251
252func TestAPI_SemaphoreDestroy(t *testing.T) {
253	t.Parallel()
254	c, s := makeClient(t)
255	defer s.Stop()
256
257	s.WaitForSerfCheck(t)
258
259	sema, session := createTestSemaphore(t, c, "test/semaphore", 2)
260	defer session.Destroy(sema.opts.Session, nil)
261
262	sema2, session := createTestSemaphore(t, c, "test/semaphore", 2)
263	defer session.Destroy(sema.opts.Session, nil)
264
265	_, err := sema.Acquire(nil)
266	if err != nil {
267		t.Fatalf("err: %v", err)
268	}
269
270	_, err = sema2.Acquire(nil)
271	if err != nil {
272		t.Fatalf("err: %v", err)
273	}
274
275	// Destroy should fail, still held
276	if err := sema.Destroy(); err != ErrSemaphoreHeld {
277		t.Fatalf("err: %v", err)
278	}
279
280	err = sema.Release()
281	if err != nil {
282		t.Fatalf("err: %v", err)
283	}
284
285	// Destroy should fail, still in use
286	if err := sema.Destroy(); err != ErrSemaphoreInUse {
287		t.Fatalf("err: %v", err)
288	}
289
290	err = sema2.Release()
291	if err != nil {
292		t.Fatalf("err: %v", err)
293	}
294
295	// Destroy should work
296	if err := sema.Destroy(); err != nil {
297		t.Fatalf("err: %v", err)
298	}
299
300	// Destroy should work
301	if err := sema2.Destroy(); err != nil {
302		t.Fatalf("err: %v", err)
303	}
304}
305
306func TestAPI_SemaphoreConflict(t *testing.T) {
307	t.Parallel()
308	c, s := makeClient(t)
309	defer s.Stop()
310
311	s.WaitForSerfCheck(t)
312	lock, session := createTestLock(t, c, "test/sema/.lock")
313	defer session.Destroy(lock.opts.Session, nil)
314
315	// Should work
316	leaderCh, err := lock.Lock(nil)
317	if err != nil {
318		t.Fatalf("err: %v", err)
319	}
320	if leaderCh == nil {
321		t.Fatalf("not leader")
322	}
323	defer lock.Unlock()
324
325	sema, session := createTestSemaphore(t, c, "test/sema/", 2)
326	defer session.Destroy(sema.opts.Session, nil)
327
328	// Should conflict with lock
329	_, err = sema.Acquire(nil)
330	if err != ErrSemaphoreConflict {
331		t.Fatalf("err: %v", err)
332	}
333
334	// Should conflict with lock
335	err = sema.Destroy()
336	if err != ErrSemaphoreConflict {
337		t.Fatalf("err: %v", err)
338	}
339}
340
341func TestAPI_SemaphoreMonitorRetry(t *testing.T) {
342	t.Parallel()
343	raw, s := makeClient(t)
344	defer s.Stop()
345
346	s.WaitForSerfCheck(t)
347
348	// Set up a server that always responds with 500 errors.
349	failer := func(w http.ResponseWriter, req *http.Request) {
350		w.WriteHeader(500)
351	}
352	outage := httptest.NewServer(http.HandlerFunc(failer))
353	defer outage.Close()
354
355	// Set up a reverse proxy that will send some requests to the
356	// 500 server and pass everything else through to the real Consul
357	// server.
358	var mutex sync.Mutex
359	errors := 0
360	director := func(req *http.Request) {
361		mutex.Lock()
362		defer mutex.Unlock()
363
364		req.URL.Scheme = "http"
365		if errors > 0 && req.Method == "GET" && strings.Contains(req.URL.Path, "/v1/kv/test/sema/.lock") {
366			req.URL.Host = outage.URL[7:] // Strip off "http://".
367			errors--
368		} else {
369			req.URL.Host = raw.config.Address
370		}
371	}
372	proxy := httptest.NewServer(&httputil.ReverseProxy{Director: director})
373	defer proxy.Close()
374
375	// Make another client that points at the proxy instead of the real
376	// Consul server.
377	config := raw.config
378	config.Address = proxy.URL[7:] // Strip off "http://".
379	c, err := NewClient(&config)
380	if err != nil {
381		t.Fatalf("err: %v", err)
382	}
383
384	// Set up a lock with retries enabled.
385	opts := &SemaphoreOptions{
386		Prefix:         "test/sema/.lock",
387		Limit:          2,
388		SessionTTL:     "60s",
389		MonitorRetries: 3,
390	}
391	sema, err := c.SemaphoreOpts(opts)
392	if err != nil {
393		t.Fatalf("err: %v", err)
394	}
395
396	// Make sure the default got set.
397	if sema.opts.MonitorRetryTime != DefaultMonitorRetryTime {
398		t.Fatalf("bad: %d", sema.opts.MonitorRetryTime)
399	}
400
401	// Now set a custom time for the test.
402	opts.MonitorRetryTime = 250 * time.Millisecond
403	sema, err = c.SemaphoreOpts(opts)
404	if err != nil {
405		t.Fatalf("err: %v", err)
406	}
407	if sema.opts.MonitorRetryTime != 250*time.Millisecond {
408		t.Fatalf("bad: %d", sema.opts.MonitorRetryTime)
409	}
410
411	// Should get the lock.
412	ch, err := sema.Acquire(nil)
413	if err != nil {
414		t.Fatalf("err: %v", err)
415	}
416	if ch == nil {
417		t.Fatalf("didn't acquire")
418	}
419
420	// Take the semaphore using the raw client to force the monitor to wake
421	// up and check the lock again. This time we will return errors for some
422	// of the responses.
423	mutex.Lock()
424	errors = 2
425	mutex.Unlock()
426	another, err := raw.SemaphoreOpts(opts)
427	if err != nil {
428		t.Fatalf("err: %v", err)
429	}
430	if _, err := another.Acquire(nil); err != nil {
431		t.Fatalf("err: %v", err)
432	}
433	time.Sleep(5 * opts.MonitorRetryTime)
434
435	// Should still have the semaphore.
436	select {
437	case <-ch:
438		t.Fatalf("lost the semaphore")
439	default:
440	}
441
442	// Now return an overwhelming number of errors, using the raw client to
443	// poke the key and get the monitor to run again.
444	mutex.Lock()
445	errors = 10
446	mutex.Unlock()
447	if err := another.Release(); err != nil {
448		t.Fatalf("err: %v", err)
449	}
450	time.Sleep(5 * opts.MonitorRetryTime)
451
452	// Should lose the semaphore.
453	select {
454	case <-ch:
455	case <-time.After(time.Second):
456		t.Fatalf("should not have the semaphore")
457	}
458}
459
460func TestAPI_SemaphoreOneShot(t *testing.T) {
461	t.Parallel()
462	c, s := makeClient(t)
463	defer s.Stop()
464
465	s.WaitForSerfCheck(t)
466
467	// Set up a semaphore as a one-shot.
468	opts := &SemaphoreOptions{
469		Prefix:           "test/sema/.lock",
470		Limit:            2,
471		SemaphoreTryOnce: true,
472	}
473	sema, err := c.SemaphoreOpts(opts)
474	if err != nil {
475		t.Fatalf("err: %v", err)
476	}
477
478	// Make sure the default got set.
479	if sema.opts.SemaphoreWaitTime != DefaultSemaphoreWaitTime {
480		t.Fatalf("bad: %d", sema.opts.SemaphoreWaitTime)
481	}
482
483	// Now set a custom time for the test.
484	opts.SemaphoreWaitTime = 250 * time.Millisecond
485	sema, err = c.SemaphoreOpts(opts)
486	if err != nil {
487		t.Fatalf("err: %v", err)
488	}
489	if sema.opts.SemaphoreWaitTime != 250*time.Millisecond {
490		t.Fatalf("bad: %d", sema.opts.SemaphoreWaitTime)
491	}
492
493	// Should acquire the semaphore.
494	ch, err := sema.Acquire(nil)
495	if err != nil {
496		t.Fatalf("err: %v", err)
497	}
498	if ch == nil {
499		t.Fatalf("should have acquired the semaphore")
500	}
501
502	// Try with another session.
503	another, err := c.SemaphoreOpts(opts)
504	if err != nil {
505		t.Fatalf("err: %v", err)
506	}
507	ch, err = another.Acquire(nil)
508	if err != nil {
509		t.Fatalf("err: %v", err)
510	}
511	if ch == nil {
512		t.Fatalf("should have acquired the semaphore")
513	}
514
515	// Try with a third one that shouldn't get it.
516	contender, err := c.SemaphoreOpts(opts)
517	if err != nil {
518		t.Fatalf("err: %v", err)
519	}
520	start := time.Now()
521	ch, err = contender.Acquire(nil)
522	if err != nil {
523		t.Fatalf("err: %v", err)
524	}
525	if ch != nil {
526		t.Fatalf("should not have acquired the semaphore")
527	}
528	diff := time.Since(start)
529	if diff < contender.opts.SemaphoreWaitTime {
530		t.Fatalf("time out of bounds: %9.6f", diff.Seconds())
531	}
532
533	// Give up a slot and make sure the third one can get it.
534	if err := another.Release(); err != nil {
535		t.Fatalf("err: %v", err)
536	}
537	ch, err = contender.Acquire(nil)
538	if err != nil {
539		t.Fatalf("err: %v", err)
540	}
541	if ch == nil {
542		t.Fatalf("should have acquired the semaphore")
543	}
544}
545