1package api 2 3import ( 4 "log" 5 "net/http" 6 "net/http/httptest" 7 "net/http/httputil" 8 "strings" 9 "sync" 10 "testing" 11 "time" 12) 13 14func createTestSemaphore(t *testing.T, c *Client, prefix string, limit int) (*Semaphore, *Session) { 15 t.Helper() 16 session := c.Session() 17 18 se := &SessionEntry{ 19 Name: DefaultSemaphoreSessionName, 20 TTL: DefaultSemaphoreSessionTTL, 21 Behavior: SessionBehaviorDelete, 22 } 23 id, _, err := session.CreateNoChecks(se, nil) 24 if err != nil { 25 t.Fatalf("err: %v", err) 26 } 27 28 opts := &SemaphoreOptions{ 29 Prefix: prefix, 30 Limit: limit, 31 Session: id, 32 SessionName: se.Name, 33 SessionTTL: se.TTL, 34 } 35 sema, err := c.SemaphoreOpts(opts) 36 if err != nil { 37 t.Fatalf("err: %v", err) 38 } 39 40 return sema, session 41} 42 43func TestAPI_SemaphoreAcquireRelease(t *testing.T) { 44 t.Parallel() 45 c, s := makeClient(t) 46 defer s.Stop() 47 48 sema, session := createTestSemaphore(t, c, "test/semaphore", 2) 49 defer session.Destroy(sema.opts.Session, nil) 50 51 // Initial release should fail 52 err := sema.Release() 53 if err != ErrSemaphoreNotHeld { 54 t.Fatalf("err: %v", err) 55 } 56 57 // Should work 58 lockCh, err := sema.Acquire(nil) 59 if err != nil { 60 t.Fatalf("err: %v", err) 61 } 62 if lockCh == nil { 63 t.Fatalf("not hold") 64 } 65 66 // Double lock should fail 67 _, err = sema.Acquire(nil) 68 if err != ErrSemaphoreHeld { 69 t.Fatalf("err: %v", err) 70 } 71 72 // Should be held 73 select { 74 case <-lockCh: 75 t.Fatalf("should be held") 76 default: 77 } 78 79 // Initial release should work 80 err = sema.Release() 81 if err != nil { 82 t.Fatalf("err: %v", err) 83 } 84 85 // Double unlock should fail 86 err = sema.Release() 87 if err != ErrSemaphoreNotHeld { 88 t.Fatalf("err: %v", err) 89 } 90 91 // Should lose resource 92 select { 93 case <-lockCh: 94 case <-time.After(time.Second): 95 t.Fatalf("should not be held") 96 } 97} 98 99func TestAPI_SemaphoreForceInvalidate(t *testing.T) { 100 t.Parallel() 101 c, s := makeClient(t) 102 defer s.Stop() 103 104 s.WaitForSerfCheck(t) 105 106 sema, session := createTestSemaphore(t, c, "test/semaphore", 2) 107 defer session.Destroy(sema.opts.Session, nil) 108 109 // Should work 110 lockCh, err := sema.Acquire(nil) 111 if err != nil { 112 t.Fatalf("err: %v", err) 113 } 114 if lockCh == nil { 115 t.Fatalf("not acquired") 116 } 117 defer sema.Release() 118 119 go func() { 120 // Nuke the session, simulator an operator invalidation 121 // or a health check failure 122 session := c.Session() 123 session.Destroy(sema.lockSession, nil) 124 }() 125 126 // Should loose slot 127 select { 128 case <-lockCh: 129 case <-time.After(time.Second): 130 t.Fatalf("should not be locked") 131 } 132} 133 134func TestAPI_SemaphoreDeleteKey(t *testing.T) { 135 t.Parallel() 136 c, s := makeClient(t) 137 defer s.Stop() 138 139 s.WaitForSerfCheck(t) 140 141 sema, session := createTestSemaphore(t, c, "test/semaphore", 2) 142 defer session.Destroy(sema.opts.Session, nil) 143 144 // Should work 145 lockCh, err := sema.Acquire(nil) 146 if err != nil { 147 t.Fatalf("err: %v", err) 148 } 149 if lockCh == nil { 150 t.Fatalf("not locked") 151 } 152 defer sema.Release() 153 154 go func() { 155 // Nuke the key, simulate an operator intervention 156 kv := c.KV() 157 kv.DeleteTree("test/semaphore", nil) 158 }() 159 160 // Should loose leadership 161 select { 162 case <-lockCh: 163 case <-time.After(time.Second): 164 t.Fatalf("should not be locked") 165 } 166} 167 168func TestAPI_SemaphoreContend(t *testing.T) { 169 t.Parallel() 170 c, s := makeClient(t) 171 defer s.Stop() 172 173 s.WaitForSerfCheck(t) 174 175 wg := &sync.WaitGroup{} 176 acquired := make([]bool, 4) 177 for idx := range acquired { 178 wg.Add(1) 179 go func(idx int) { 180 defer wg.Done() 181 sema, session := createTestSemaphore(t, c, "test/semaphore", 2) 182 defer session.Destroy(sema.opts.Session, nil) 183 184 // Should work eventually, will contend 185 lockCh, err := sema.Acquire(nil) 186 if err != nil { 187 t.Errorf("err: %v", err) 188 return 189 } 190 if lockCh == nil { 191 t.Errorf("not locked") 192 return 193 } 194 defer sema.Release() 195 log.Printf("Contender %d acquired", idx) 196 197 // Set acquired and then leave 198 acquired[idx] = true 199 }(idx) 200 } 201 202 // Wait for termination 203 doneCh := make(chan struct{}) 204 go func() { 205 wg.Wait() 206 close(doneCh) 207 }() 208 209 // Wait for everybody to get a turn 210 select { 211 case <-doneCh: 212 case <-time.After(3 * DefaultLockRetryTime): 213 t.Fatalf("timeout") 214 } 215 216 for idx, did := range acquired { 217 if !did { 218 t.Fatalf("contender %d never acquired", idx) 219 } 220 } 221} 222 223func TestAPI_SemaphoreBadLimit(t *testing.T) { 224 t.Parallel() 225 c, s := makeClient(t) 226 defer s.Stop() 227 228 s.WaitForSerfCheck(t) 229 230 _, err := c.SemaphorePrefix("test/semaphore", 0) 231 if err == nil { 232 t.Fatalf("should error, limit must be positive") 233 } 234 235 sema, session := createTestSemaphore(t, c, "test/semaphore", 1) 236 defer session.Destroy(sema.opts.Session, nil) 237 238 _, err = sema.Acquire(nil) 239 if err != nil { 240 t.Fatalf("err: %v", err) 241 } 242 243 sema2, session := createTestSemaphore(t, c, "test/semaphore", 2) 244 defer session.Destroy(sema.opts.Session, nil) 245 246 _, err = sema2.Acquire(nil) 247 if err.Error() != "semaphore limit conflict (lock: 1, local: 2)" { 248 t.Fatalf("err: %v", err) 249 } 250} 251 252func TestAPI_SemaphoreDestroy(t *testing.T) { 253 t.Parallel() 254 c, s := makeClient(t) 255 defer s.Stop() 256 257 s.WaitForSerfCheck(t) 258 259 sema, session := createTestSemaphore(t, c, "test/semaphore", 2) 260 defer session.Destroy(sema.opts.Session, nil) 261 262 sema2, session := createTestSemaphore(t, c, "test/semaphore", 2) 263 defer session.Destroy(sema.opts.Session, nil) 264 265 _, err := sema.Acquire(nil) 266 if err != nil { 267 t.Fatalf("err: %v", err) 268 } 269 270 _, err = sema2.Acquire(nil) 271 if err != nil { 272 t.Fatalf("err: %v", err) 273 } 274 275 // Destroy should fail, still held 276 if err := sema.Destroy(); err != ErrSemaphoreHeld { 277 t.Fatalf("err: %v", err) 278 } 279 280 err = sema.Release() 281 if err != nil { 282 t.Fatalf("err: %v", err) 283 } 284 285 // Destroy should fail, still in use 286 if err := sema.Destroy(); err != ErrSemaphoreInUse { 287 t.Fatalf("err: %v", err) 288 } 289 290 err = sema2.Release() 291 if err != nil { 292 t.Fatalf("err: %v", err) 293 } 294 295 // Destroy should work 296 if err := sema.Destroy(); err != nil { 297 t.Fatalf("err: %v", err) 298 } 299 300 // Destroy should work 301 if err := sema2.Destroy(); err != nil { 302 t.Fatalf("err: %v", err) 303 } 304} 305 306func TestAPI_SemaphoreConflict(t *testing.T) { 307 t.Parallel() 308 c, s := makeClient(t) 309 defer s.Stop() 310 311 s.WaitForSerfCheck(t) 312 lock, session := createTestLock(t, c, "test/sema/.lock") 313 defer session.Destroy(lock.opts.Session, nil) 314 315 // Should work 316 leaderCh, err := lock.Lock(nil) 317 if err != nil { 318 t.Fatalf("err: %v", err) 319 } 320 if leaderCh == nil { 321 t.Fatalf("not leader") 322 } 323 defer lock.Unlock() 324 325 sema, session := createTestSemaphore(t, c, "test/sema/", 2) 326 defer session.Destroy(sema.opts.Session, nil) 327 328 // Should conflict with lock 329 _, err = sema.Acquire(nil) 330 if err != ErrSemaphoreConflict { 331 t.Fatalf("err: %v", err) 332 } 333 334 // Should conflict with lock 335 err = sema.Destroy() 336 if err != ErrSemaphoreConflict { 337 t.Fatalf("err: %v", err) 338 } 339} 340 341func TestAPI_SemaphoreMonitorRetry(t *testing.T) { 342 t.Parallel() 343 raw, s := makeClient(t) 344 defer s.Stop() 345 346 s.WaitForSerfCheck(t) 347 348 // Set up a server that always responds with 500 errors. 349 failer := func(w http.ResponseWriter, req *http.Request) { 350 w.WriteHeader(500) 351 } 352 outage := httptest.NewServer(http.HandlerFunc(failer)) 353 defer outage.Close() 354 355 // Set up a reverse proxy that will send some requests to the 356 // 500 server and pass everything else through to the real Consul 357 // server. 358 var mutex sync.Mutex 359 errors := 0 360 director := func(req *http.Request) { 361 mutex.Lock() 362 defer mutex.Unlock() 363 364 req.URL.Scheme = "http" 365 if errors > 0 && req.Method == "GET" && strings.Contains(req.URL.Path, "/v1/kv/test/sema/.lock") { 366 req.URL.Host = outage.URL[7:] // Strip off "http://". 367 errors-- 368 } else { 369 req.URL.Host = raw.config.Address 370 } 371 } 372 proxy := httptest.NewServer(&httputil.ReverseProxy{Director: director}) 373 defer proxy.Close() 374 375 // Make another client that points at the proxy instead of the real 376 // Consul server. 377 config := raw.config 378 config.Address = proxy.URL[7:] // Strip off "http://". 379 c, err := NewClient(&config) 380 if err != nil { 381 t.Fatalf("err: %v", err) 382 } 383 384 // Set up a lock with retries enabled. 385 opts := &SemaphoreOptions{ 386 Prefix: "test/sema/.lock", 387 Limit: 2, 388 SessionTTL: "60s", 389 MonitorRetries: 3, 390 } 391 sema, err := c.SemaphoreOpts(opts) 392 if err != nil { 393 t.Fatalf("err: %v", err) 394 } 395 396 // Make sure the default got set. 397 if sema.opts.MonitorRetryTime != DefaultMonitorRetryTime { 398 t.Fatalf("bad: %d", sema.opts.MonitorRetryTime) 399 } 400 401 // Now set a custom time for the test. 402 opts.MonitorRetryTime = 250 * time.Millisecond 403 sema, err = c.SemaphoreOpts(opts) 404 if err != nil { 405 t.Fatalf("err: %v", err) 406 } 407 if sema.opts.MonitorRetryTime != 250*time.Millisecond { 408 t.Fatalf("bad: %d", sema.opts.MonitorRetryTime) 409 } 410 411 // Should get the lock. 412 ch, err := sema.Acquire(nil) 413 if err != nil { 414 t.Fatalf("err: %v", err) 415 } 416 if ch == nil { 417 t.Fatalf("didn't acquire") 418 } 419 420 // Take the semaphore using the raw client to force the monitor to wake 421 // up and check the lock again. This time we will return errors for some 422 // of the responses. 423 mutex.Lock() 424 errors = 2 425 mutex.Unlock() 426 another, err := raw.SemaphoreOpts(opts) 427 if err != nil { 428 t.Fatalf("err: %v", err) 429 } 430 if _, err := another.Acquire(nil); err != nil { 431 t.Fatalf("err: %v", err) 432 } 433 time.Sleep(5 * opts.MonitorRetryTime) 434 435 // Should still have the semaphore. 436 select { 437 case <-ch: 438 t.Fatalf("lost the semaphore") 439 default: 440 } 441 442 // Now return an overwhelming number of errors, using the raw client to 443 // poke the key and get the monitor to run again. 444 mutex.Lock() 445 errors = 10 446 mutex.Unlock() 447 if err := another.Release(); err != nil { 448 t.Fatalf("err: %v", err) 449 } 450 time.Sleep(5 * opts.MonitorRetryTime) 451 452 // Should lose the semaphore. 453 select { 454 case <-ch: 455 case <-time.After(time.Second): 456 t.Fatalf("should not have the semaphore") 457 } 458} 459 460func TestAPI_SemaphoreOneShot(t *testing.T) { 461 t.Parallel() 462 c, s := makeClient(t) 463 defer s.Stop() 464 465 s.WaitForSerfCheck(t) 466 467 // Set up a semaphore as a one-shot. 468 opts := &SemaphoreOptions{ 469 Prefix: "test/sema/.lock", 470 Limit: 2, 471 SemaphoreTryOnce: true, 472 } 473 sema, err := c.SemaphoreOpts(opts) 474 if err != nil { 475 t.Fatalf("err: %v", err) 476 } 477 478 // Make sure the default got set. 479 if sema.opts.SemaphoreWaitTime != DefaultSemaphoreWaitTime { 480 t.Fatalf("bad: %d", sema.opts.SemaphoreWaitTime) 481 } 482 483 // Now set a custom time for the test. 484 opts.SemaphoreWaitTime = 250 * time.Millisecond 485 sema, err = c.SemaphoreOpts(opts) 486 if err != nil { 487 t.Fatalf("err: %v", err) 488 } 489 if sema.opts.SemaphoreWaitTime != 250*time.Millisecond { 490 t.Fatalf("bad: %d", sema.opts.SemaphoreWaitTime) 491 } 492 493 // Should acquire the semaphore. 494 ch, err := sema.Acquire(nil) 495 if err != nil { 496 t.Fatalf("err: %v", err) 497 } 498 if ch == nil { 499 t.Fatalf("should have acquired the semaphore") 500 } 501 502 // Try with another session. 503 another, err := c.SemaphoreOpts(opts) 504 if err != nil { 505 t.Fatalf("err: %v", err) 506 } 507 ch, err = another.Acquire(nil) 508 if err != nil { 509 t.Fatalf("err: %v", err) 510 } 511 if ch == nil { 512 t.Fatalf("should have acquired the semaphore") 513 } 514 515 // Try with a third one that shouldn't get it. 516 contender, err := c.SemaphoreOpts(opts) 517 if err != nil { 518 t.Fatalf("err: %v", err) 519 } 520 start := time.Now() 521 ch, err = contender.Acquire(nil) 522 if err != nil { 523 t.Fatalf("err: %v", err) 524 } 525 if ch != nil { 526 t.Fatalf("should not have acquired the semaphore") 527 } 528 diff := time.Since(start) 529 if diff < contender.opts.SemaphoreWaitTime { 530 t.Fatalf("time out of bounds: %9.6f", diff.Seconds()) 531 } 532 533 // Give up a slot and make sure the third one can get it. 534 if err := another.Release(); err != nil { 535 t.Fatalf("err: %v", err) 536 } 537 ch, err = contender.Acquire(nil) 538 if err != nil { 539 t.Fatalf("err: %v", err) 540 } 541 if ch == nil { 542 t.Fatalf("should have acquired the semaphore") 543 } 544} 545