1package api
2
3import (
4	"log"
5	"net/http"
6	"net/http/httptest"
7	"net/http/httputil"
8	"strings"
9	"sync"
10	"testing"
11	"time"
12)
13
14func createTestSemaphore(t *testing.T, c *Client, prefix string, limit int) (*Semaphore, *Session) {
15	t.Helper()
16	session := c.Session()
17
18	se := &SessionEntry{
19		Name:     DefaultSemaphoreSessionName,
20		TTL:      DefaultSemaphoreSessionTTL,
21		Behavior: SessionBehaviorDelete,
22	}
23	id, _, err := session.CreateNoChecks(se, nil)
24	if err != nil {
25		t.Fatalf("err: %v", err)
26	}
27
28	opts := &SemaphoreOptions{
29		Prefix:      prefix,
30		Limit:       limit,
31		Session:     id,
32		SessionName: se.Name,
33		SessionTTL:  se.TTL,
34	}
35	sema, err := c.SemaphoreOpts(opts)
36	if err != nil {
37		t.Fatalf("err: %v", err)
38	}
39
40	return sema, session
41}
42
43func TestAPI_SemaphoreAcquireRelease(t *testing.T) {
44	t.Parallel()
45	c, s := makeClient(t)
46	defer s.Stop()
47
48	sema, session := createTestSemaphore(t, c, "test/semaphore", 2)
49	defer session.Destroy(sema.opts.Session, nil)
50
51	// Initial release should fail
52	err := sema.Release()
53	if err != ErrSemaphoreNotHeld {
54		t.Fatalf("err: %v", err)
55	}
56
57	// Should work
58	lockCh, err := sema.Acquire(nil)
59	if err != nil {
60		t.Fatalf("err: %v", err)
61	}
62	if lockCh == nil {
63		t.Fatalf("not hold")
64	}
65
66	// Double lock should fail
67	_, err = sema.Acquire(nil)
68	if err != ErrSemaphoreHeld {
69		t.Fatalf("err: %v", err)
70	}
71
72	// Should be held
73	select {
74	case <-lockCh:
75		t.Fatalf("should be held")
76	default:
77	}
78
79	// Initial release should work
80	err = sema.Release()
81	if err != nil {
82		t.Fatalf("err: %v", err)
83	}
84
85	// Double unlock should fail
86	err = sema.Release()
87	if err != ErrSemaphoreNotHeld {
88		t.Fatalf("err: %v", err)
89	}
90
91	// Should lose resource
92	select {
93	case <-lockCh:
94	case <-time.After(time.Second):
95		t.Fatalf("should not be held")
96	}
97}
98
99func TestAPI_SemaphoreForceInvalidate(t *testing.T) {
100	t.Parallel()
101	c, s := makeClient(t)
102	defer s.Stop()
103
104	s.WaitForSerfCheck(t)
105
106	sema, session := createTestSemaphore(t, c, "test/semaphore", 2)
107	defer session.Destroy(sema.opts.Session, nil)
108
109	// Should work
110	lockCh, err := sema.Acquire(nil)
111	if err != nil {
112		t.Fatalf("err: %v", err)
113	}
114	if lockCh == nil {
115		t.Fatalf("not acquired")
116	}
117	defer sema.Release()
118
119	go func() {
120		// Nuke the session, simulator an operator invalidation
121		// or a health check failure
122		session := c.Session()
123		session.Destroy(sema.lockSession, nil)
124	}()
125
126	// Should loose slot
127	select {
128	case <-lockCh:
129	case <-time.After(time.Second):
130		t.Fatalf("should not be locked")
131	}
132}
133
134func TestAPI_SemaphoreDeleteKey(t *testing.T) {
135	t.Parallel()
136	c, s := makeClient(t)
137	defer s.Stop()
138
139	s.WaitForSerfCheck(t)
140
141	sema, session := createTestSemaphore(t, c, "test/semaphore", 2)
142	defer session.Destroy(sema.opts.Session, nil)
143
144	// Should work
145	lockCh, err := sema.Acquire(nil)
146	if err != nil {
147		t.Fatalf("err: %v", err)
148	}
149	if lockCh == nil {
150		t.Fatalf("not locked")
151	}
152	defer sema.Release()
153
154	go func() {
155		// Nuke the key, simulate an operator intervention
156		kv := c.KV()
157		kv.DeleteTree("test/semaphore", nil)
158	}()
159
160	// Should loose leadership
161	select {
162	case <-lockCh:
163	case <-time.After(time.Second):
164		t.Fatalf("should not be locked")
165	}
166}
167
168func TestAPI_SemaphoreContend(t *testing.T) {
169	t.Parallel()
170	c, s := makeClient(t)
171	defer s.Stop()
172
173	s.WaitForSerfCheck(t)
174
175	wg := &sync.WaitGroup{}
176	acquired := make([]bool, 4)
177	for idx := range acquired {
178		wg.Add(1)
179		go func(idx int) {
180			defer wg.Done()
181			sema, session := createTestSemaphore(t, c, "test/semaphore", 2)
182			defer session.Destroy(sema.opts.Session, nil)
183
184			// Should work eventually, will contend
185			lockCh, err := sema.Acquire(nil)
186			if err != nil {
187				t.Fatalf("err: %v", err)
188			}
189			if lockCh == nil {
190				t.Fatalf("not locked")
191			}
192			defer sema.Release()
193			log.Printf("Contender %d acquired", idx)
194
195			// Set acquired and then leave
196			acquired[idx] = true
197		}(idx)
198	}
199
200	// Wait for termination
201	doneCh := make(chan struct{})
202	go func() {
203		wg.Wait()
204		close(doneCh)
205	}()
206
207	// Wait for everybody to get a turn
208	select {
209	case <-doneCh:
210	case <-time.After(3 * DefaultLockRetryTime):
211		t.Fatalf("timeout")
212	}
213
214	for idx, did := range acquired {
215		if !did {
216			t.Fatalf("contender %d never acquired", idx)
217		}
218	}
219}
220
221func TestAPI_SemaphoreBadLimit(t *testing.T) {
222	t.Parallel()
223	c, s := makeClient(t)
224	defer s.Stop()
225
226	s.WaitForSerfCheck(t)
227
228	sema, err := c.SemaphorePrefix("test/semaphore", 0)
229	if err == nil {
230		t.Fatalf("should error, limit must be positive")
231	}
232
233	sema, session := createTestSemaphore(t, c, "test/semaphore", 1)
234	defer session.Destroy(sema.opts.Session, nil)
235
236	_, err = sema.Acquire(nil)
237	if err != nil {
238		t.Fatalf("err: %v", err)
239	}
240
241	sema2, session := createTestSemaphore(t, c, "test/semaphore", 2)
242	defer session.Destroy(sema.opts.Session, nil)
243
244	_, err = sema2.Acquire(nil)
245	if err.Error() != "semaphore limit conflict (lock: 1, local: 2)" {
246		t.Fatalf("err: %v", err)
247	}
248}
249
250func TestAPI_SemaphoreDestroy(t *testing.T) {
251	t.Parallel()
252	c, s := makeClient(t)
253	defer s.Stop()
254
255	s.WaitForSerfCheck(t)
256
257	sema, session := createTestSemaphore(t, c, "test/semaphore", 2)
258	defer session.Destroy(sema.opts.Session, nil)
259
260	sema2, session := createTestSemaphore(t, c, "test/semaphore", 2)
261	defer session.Destroy(sema.opts.Session, nil)
262
263	_, err := sema.Acquire(nil)
264	if err != nil {
265		t.Fatalf("err: %v", err)
266	}
267
268	_, err = sema2.Acquire(nil)
269	if err != nil {
270		t.Fatalf("err: %v", err)
271	}
272
273	// Destroy should fail, still held
274	if err := sema.Destroy(); err != ErrSemaphoreHeld {
275		t.Fatalf("err: %v", err)
276	}
277
278	err = sema.Release()
279	if err != nil {
280		t.Fatalf("err: %v", err)
281	}
282
283	// Destroy should fail, still in use
284	if err := sema.Destroy(); err != ErrSemaphoreInUse {
285		t.Fatalf("err: %v", err)
286	}
287
288	err = sema2.Release()
289	if err != nil {
290		t.Fatalf("err: %v", err)
291	}
292
293	// Destroy should work
294	if err := sema.Destroy(); err != nil {
295		t.Fatalf("err: %v", err)
296	}
297
298	// Destroy should work
299	if err := sema2.Destroy(); err != nil {
300		t.Fatalf("err: %v", err)
301	}
302}
303
304func TestAPI_SemaphoreConflict(t *testing.T) {
305	t.Parallel()
306	c, s := makeClient(t)
307	defer s.Stop()
308
309	s.WaitForSerfCheck(t)
310	lock, session := createTestLock(t, c, "test/sema/.lock")
311	defer session.Destroy(lock.opts.Session, nil)
312
313	// Should work
314	leaderCh, err := lock.Lock(nil)
315	if err != nil {
316		t.Fatalf("err: %v", err)
317	}
318	if leaderCh == nil {
319		t.Fatalf("not leader")
320	}
321	defer lock.Unlock()
322
323	sema, session := createTestSemaphore(t, c, "test/sema/", 2)
324	defer session.Destroy(sema.opts.Session, nil)
325
326	// Should conflict with lock
327	_, err = sema.Acquire(nil)
328	if err != ErrSemaphoreConflict {
329		t.Fatalf("err: %v", err)
330	}
331
332	// Should conflict with lock
333	err = sema.Destroy()
334	if err != ErrSemaphoreConflict {
335		t.Fatalf("err: %v", err)
336	}
337}
338
339func TestAPI_SemaphoreMonitorRetry(t *testing.T) {
340	t.Parallel()
341	raw, s := makeClient(t)
342	defer s.Stop()
343
344	s.WaitForSerfCheck(t)
345
346	// Set up a server that always responds with 500 errors.
347	failer := func(w http.ResponseWriter, req *http.Request) {
348		w.WriteHeader(500)
349	}
350	outage := httptest.NewServer(http.HandlerFunc(failer))
351	defer outage.Close()
352
353	// Set up a reverse proxy that will send some requests to the
354	// 500 server and pass everything else through to the real Consul
355	// server.
356	var mutex sync.Mutex
357	errors := 0
358	director := func(req *http.Request) {
359		mutex.Lock()
360		defer mutex.Unlock()
361
362		req.URL.Scheme = "http"
363		if errors > 0 && req.Method == "GET" && strings.Contains(req.URL.Path, "/v1/kv/test/sema/.lock") {
364			req.URL.Host = outage.URL[7:] // Strip off "http://".
365			errors--
366		} else {
367			req.URL.Host = raw.config.Address
368		}
369	}
370	proxy := httptest.NewServer(&httputil.ReverseProxy{Director: director})
371	defer proxy.Close()
372
373	// Make another client that points at the proxy instead of the real
374	// Consul server.
375	config := raw.config
376	config.Address = proxy.URL[7:] // Strip off "http://".
377	c, err := NewClient(&config)
378	if err != nil {
379		t.Fatalf("err: %v", err)
380	}
381
382	// Set up a lock with retries enabled.
383	opts := &SemaphoreOptions{
384		Prefix:         "test/sema/.lock",
385		Limit:          2,
386		SessionTTL:     "60s",
387		MonitorRetries: 3,
388	}
389	sema, err := c.SemaphoreOpts(opts)
390	if err != nil {
391		t.Fatalf("err: %v", err)
392	}
393
394	// Make sure the default got set.
395	if sema.opts.MonitorRetryTime != DefaultMonitorRetryTime {
396		t.Fatalf("bad: %d", sema.opts.MonitorRetryTime)
397	}
398
399	// Now set a custom time for the test.
400	opts.MonitorRetryTime = 250 * time.Millisecond
401	sema, err = c.SemaphoreOpts(opts)
402	if err != nil {
403		t.Fatalf("err: %v", err)
404	}
405	if sema.opts.MonitorRetryTime != 250*time.Millisecond {
406		t.Fatalf("bad: %d", sema.opts.MonitorRetryTime)
407	}
408
409	// Should get the lock.
410	ch, err := sema.Acquire(nil)
411	if err != nil {
412		t.Fatalf("err: %v", err)
413	}
414	if ch == nil {
415		t.Fatalf("didn't acquire")
416	}
417
418	// Take the semaphore using the raw client to force the monitor to wake
419	// up and check the lock again. This time we will return errors for some
420	// of the responses.
421	mutex.Lock()
422	errors = 2
423	mutex.Unlock()
424	another, err := raw.SemaphoreOpts(opts)
425	if err != nil {
426		t.Fatalf("err: %v", err)
427	}
428	if _, err := another.Acquire(nil); err != nil {
429		t.Fatalf("err: %v", err)
430	}
431	time.Sleep(5 * opts.MonitorRetryTime)
432
433	// Should still have the semaphore.
434	select {
435	case <-ch:
436		t.Fatalf("lost the semaphore")
437	default:
438	}
439
440	// Now return an overwhelming number of errors, using the raw client to
441	// poke the key and get the monitor to run again.
442	mutex.Lock()
443	errors = 10
444	mutex.Unlock()
445	if err := another.Release(); err != nil {
446		t.Fatalf("err: %v", err)
447	}
448	time.Sleep(5 * opts.MonitorRetryTime)
449
450	// Should lose the semaphore.
451	select {
452	case <-ch:
453	case <-time.After(time.Second):
454		t.Fatalf("should not have the semaphore")
455	}
456}
457
458func TestAPI_SemaphoreOneShot(t *testing.T) {
459	t.Parallel()
460	c, s := makeClient(t)
461	defer s.Stop()
462
463	s.WaitForSerfCheck(t)
464
465	// Set up a semaphore as a one-shot.
466	opts := &SemaphoreOptions{
467		Prefix:           "test/sema/.lock",
468		Limit:            2,
469		SemaphoreTryOnce: true,
470	}
471	sema, err := c.SemaphoreOpts(opts)
472	if err != nil {
473		t.Fatalf("err: %v", err)
474	}
475
476	// Make sure the default got set.
477	if sema.opts.SemaphoreWaitTime != DefaultSemaphoreWaitTime {
478		t.Fatalf("bad: %d", sema.opts.SemaphoreWaitTime)
479	}
480
481	// Now set a custom time for the test.
482	opts.SemaphoreWaitTime = 250 * time.Millisecond
483	sema, err = c.SemaphoreOpts(opts)
484	if err != nil {
485		t.Fatalf("err: %v", err)
486	}
487	if sema.opts.SemaphoreWaitTime != 250*time.Millisecond {
488		t.Fatalf("bad: %d", sema.opts.SemaphoreWaitTime)
489	}
490
491	// Should acquire the semaphore.
492	ch, err := sema.Acquire(nil)
493	if err != nil {
494		t.Fatalf("err: %v", err)
495	}
496	if ch == nil {
497		t.Fatalf("should have acquired the semaphore")
498	}
499
500	// Try with another session.
501	another, err := c.SemaphoreOpts(opts)
502	if err != nil {
503		t.Fatalf("err: %v", err)
504	}
505	ch, err = another.Acquire(nil)
506	if err != nil {
507		t.Fatalf("err: %v", err)
508	}
509	if ch == nil {
510		t.Fatalf("should have acquired the semaphore")
511	}
512
513	// Try with a third one that shouldn't get it.
514	contender, err := c.SemaphoreOpts(opts)
515	if err != nil {
516		t.Fatalf("err: %v", err)
517	}
518	start := time.Now()
519	ch, err = contender.Acquire(nil)
520	if err != nil {
521		t.Fatalf("err: %v", err)
522	}
523	if ch != nil {
524		t.Fatalf("should not have acquired the semaphore")
525	}
526	diff := time.Since(start)
527	if diff < contender.opts.SemaphoreWaitTime {
528		t.Fatalf("time out of bounds: %9.6f", diff.Seconds())
529	}
530
531	// Give up a slot and make sure the third one can get it.
532	if err := another.Release(); err != nil {
533		t.Fatalf("err: %v", err)
534	}
535	ch, err = contender.Acquire(nil)
536	if err != nil {
537		t.Fatalf("err: %v", err)
538	}
539	if ch == nil {
540		t.Fatalf("should have acquired the semaphore")
541	}
542}
543