1package api 2 3import ( 4 "log" 5 "net/http" 6 "net/http/httptest" 7 "net/http/httputil" 8 "strings" 9 "sync" 10 "testing" 11 "time" 12) 13 14func createTestSemaphore(t *testing.T, c *Client, prefix string, limit int) (*Semaphore, *Session) { 15 t.Helper() 16 session := c.Session() 17 18 se := &SessionEntry{ 19 Name: DefaultSemaphoreSessionName, 20 TTL: DefaultSemaphoreSessionTTL, 21 Behavior: SessionBehaviorDelete, 22 } 23 id, _, err := session.CreateNoChecks(se, nil) 24 if err != nil { 25 t.Fatalf("err: %v", err) 26 } 27 28 opts := &SemaphoreOptions{ 29 Prefix: prefix, 30 Limit: limit, 31 Session: id, 32 SessionName: se.Name, 33 SessionTTL: se.TTL, 34 } 35 sema, err := c.SemaphoreOpts(opts) 36 if err != nil { 37 t.Fatalf("err: %v", err) 38 } 39 40 return sema, session 41} 42 43func TestAPI_SemaphoreAcquireRelease(t *testing.T) { 44 t.Parallel() 45 c, s := makeClient(t) 46 defer s.Stop() 47 48 sema, session := createTestSemaphore(t, c, "test/semaphore", 2) 49 defer session.Destroy(sema.opts.Session, nil) 50 51 // Initial release should fail 52 err := sema.Release() 53 if err != ErrSemaphoreNotHeld { 54 t.Fatalf("err: %v", err) 55 } 56 57 // Should work 58 lockCh, err := sema.Acquire(nil) 59 if err != nil { 60 t.Fatalf("err: %v", err) 61 } 62 if lockCh == nil { 63 t.Fatalf("not hold") 64 } 65 66 // Double lock should fail 67 _, err = sema.Acquire(nil) 68 if err != ErrSemaphoreHeld { 69 t.Fatalf("err: %v", err) 70 } 71 72 // Should be held 73 select { 74 case <-lockCh: 75 t.Fatalf("should be held") 76 default: 77 } 78 79 // Initial release should work 80 err = sema.Release() 81 if err != nil { 82 t.Fatalf("err: %v", err) 83 } 84 85 // Double unlock should fail 86 err = sema.Release() 87 if err != ErrSemaphoreNotHeld { 88 t.Fatalf("err: %v", err) 89 } 90 91 // Should lose resource 92 select { 93 case <-lockCh: 94 case <-time.After(time.Second): 95 t.Fatalf("should not be held") 96 } 97} 98 99func TestAPI_SemaphoreForceInvalidate(t *testing.T) { 100 t.Parallel() 101 c, s := makeClient(t) 102 defer s.Stop() 103 104 s.WaitForSerfCheck(t) 105 106 sema, session := createTestSemaphore(t, c, "test/semaphore", 2) 107 defer session.Destroy(sema.opts.Session, nil) 108 109 // Should work 110 lockCh, err := sema.Acquire(nil) 111 if err != nil { 112 t.Fatalf("err: %v", err) 113 } 114 if lockCh == nil { 115 t.Fatalf("not acquired") 116 } 117 defer sema.Release() 118 119 go func() { 120 // Nuke the session, simulator an operator invalidation 121 // or a health check failure 122 session := c.Session() 123 session.Destroy(sema.lockSession, nil) 124 }() 125 126 // Should loose slot 127 select { 128 case <-lockCh: 129 case <-time.After(time.Second): 130 t.Fatalf("should not be locked") 131 } 132} 133 134func TestAPI_SemaphoreDeleteKey(t *testing.T) { 135 t.Parallel() 136 c, s := makeClient(t) 137 defer s.Stop() 138 139 s.WaitForSerfCheck(t) 140 141 sema, session := createTestSemaphore(t, c, "test/semaphore", 2) 142 defer session.Destroy(sema.opts.Session, nil) 143 144 // Should work 145 lockCh, err := sema.Acquire(nil) 146 if err != nil { 147 t.Fatalf("err: %v", err) 148 } 149 if lockCh == nil { 150 t.Fatalf("not locked") 151 } 152 defer sema.Release() 153 154 go func() { 155 // Nuke the key, simulate an operator intervention 156 kv := c.KV() 157 kv.DeleteTree("test/semaphore", nil) 158 }() 159 160 // Should loose leadership 161 select { 162 case <-lockCh: 163 case <-time.After(time.Second): 164 t.Fatalf("should not be locked") 165 } 166} 167 168func TestAPI_SemaphoreContend(t *testing.T) { 169 t.Parallel() 170 c, s := makeClient(t) 171 defer s.Stop() 172 173 s.WaitForSerfCheck(t) 174 175 wg := &sync.WaitGroup{} 176 acquired := make([]bool, 4) 177 for idx := range acquired { 178 wg.Add(1) 179 go func(idx int) { 180 defer wg.Done() 181 sema, session := createTestSemaphore(t, c, "test/semaphore", 2) 182 defer session.Destroy(sema.opts.Session, nil) 183 184 // Should work eventually, will contend 185 lockCh, err := sema.Acquire(nil) 186 if err != nil { 187 t.Fatalf("err: %v", err) 188 } 189 if lockCh == nil { 190 t.Fatalf("not locked") 191 } 192 defer sema.Release() 193 log.Printf("Contender %d acquired", idx) 194 195 // Set acquired and then leave 196 acquired[idx] = true 197 }(idx) 198 } 199 200 // Wait for termination 201 doneCh := make(chan struct{}) 202 go func() { 203 wg.Wait() 204 close(doneCh) 205 }() 206 207 // Wait for everybody to get a turn 208 select { 209 case <-doneCh: 210 case <-time.After(3 * DefaultLockRetryTime): 211 t.Fatalf("timeout") 212 } 213 214 for idx, did := range acquired { 215 if !did { 216 t.Fatalf("contender %d never acquired", idx) 217 } 218 } 219} 220 221func TestAPI_SemaphoreBadLimit(t *testing.T) { 222 t.Parallel() 223 c, s := makeClient(t) 224 defer s.Stop() 225 226 s.WaitForSerfCheck(t) 227 228 sema, err := c.SemaphorePrefix("test/semaphore", 0) 229 if err == nil { 230 t.Fatalf("should error, limit must be positive") 231 } 232 233 sema, session := createTestSemaphore(t, c, "test/semaphore", 1) 234 defer session.Destroy(sema.opts.Session, nil) 235 236 _, err = sema.Acquire(nil) 237 if err != nil { 238 t.Fatalf("err: %v", err) 239 } 240 241 sema2, session := createTestSemaphore(t, c, "test/semaphore", 2) 242 defer session.Destroy(sema.opts.Session, nil) 243 244 _, err = sema2.Acquire(nil) 245 if err.Error() != "semaphore limit conflict (lock: 1, local: 2)" { 246 t.Fatalf("err: %v", err) 247 } 248} 249 250func TestAPI_SemaphoreDestroy(t *testing.T) { 251 t.Parallel() 252 c, s := makeClient(t) 253 defer s.Stop() 254 255 s.WaitForSerfCheck(t) 256 257 sema, session := createTestSemaphore(t, c, "test/semaphore", 2) 258 defer session.Destroy(sema.opts.Session, nil) 259 260 sema2, session := createTestSemaphore(t, c, "test/semaphore", 2) 261 defer session.Destroy(sema.opts.Session, nil) 262 263 _, err := sema.Acquire(nil) 264 if err != nil { 265 t.Fatalf("err: %v", err) 266 } 267 268 _, err = sema2.Acquire(nil) 269 if err != nil { 270 t.Fatalf("err: %v", err) 271 } 272 273 // Destroy should fail, still held 274 if err := sema.Destroy(); err != ErrSemaphoreHeld { 275 t.Fatalf("err: %v", err) 276 } 277 278 err = sema.Release() 279 if err != nil { 280 t.Fatalf("err: %v", err) 281 } 282 283 // Destroy should fail, still in use 284 if err := sema.Destroy(); err != ErrSemaphoreInUse { 285 t.Fatalf("err: %v", err) 286 } 287 288 err = sema2.Release() 289 if err != nil { 290 t.Fatalf("err: %v", err) 291 } 292 293 // Destroy should work 294 if err := sema.Destroy(); err != nil { 295 t.Fatalf("err: %v", err) 296 } 297 298 // Destroy should work 299 if err := sema2.Destroy(); err != nil { 300 t.Fatalf("err: %v", err) 301 } 302} 303 304func TestAPI_SemaphoreConflict(t *testing.T) { 305 t.Parallel() 306 c, s := makeClient(t) 307 defer s.Stop() 308 309 s.WaitForSerfCheck(t) 310 lock, session := createTestLock(t, c, "test/sema/.lock") 311 defer session.Destroy(lock.opts.Session, nil) 312 313 // Should work 314 leaderCh, err := lock.Lock(nil) 315 if err != nil { 316 t.Fatalf("err: %v", err) 317 } 318 if leaderCh == nil { 319 t.Fatalf("not leader") 320 } 321 defer lock.Unlock() 322 323 sema, session := createTestSemaphore(t, c, "test/sema/", 2) 324 defer session.Destroy(sema.opts.Session, nil) 325 326 // Should conflict with lock 327 _, err = sema.Acquire(nil) 328 if err != ErrSemaphoreConflict { 329 t.Fatalf("err: %v", err) 330 } 331 332 // Should conflict with lock 333 err = sema.Destroy() 334 if err != ErrSemaphoreConflict { 335 t.Fatalf("err: %v", err) 336 } 337} 338 339func TestAPI_SemaphoreMonitorRetry(t *testing.T) { 340 t.Parallel() 341 raw, s := makeClient(t) 342 defer s.Stop() 343 344 s.WaitForSerfCheck(t) 345 346 // Set up a server that always responds with 500 errors. 347 failer := func(w http.ResponseWriter, req *http.Request) { 348 w.WriteHeader(500) 349 } 350 outage := httptest.NewServer(http.HandlerFunc(failer)) 351 defer outage.Close() 352 353 // Set up a reverse proxy that will send some requests to the 354 // 500 server and pass everything else through to the real Consul 355 // server. 356 var mutex sync.Mutex 357 errors := 0 358 director := func(req *http.Request) { 359 mutex.Lock() 360 defer mutex.Unlock() 361 362 req.URL.Scheme = "http" 363 if errors > 0 && req.Method == "GET" && strings.Contains(req.URL.Path, "/v1/kv/test/sema/.lock") { 364 req.URL.Host = outage.URL[7:] // Strip off "http://". 365 errors-- 366 } else { 367 req.URL.Host = raw.config.Address 368 } 369 } 370 proxy := httptest.NewServer(&httputil.ReverseProxy{Director: director}) 371 defer proxy.Close() 372 373 // Make another client that points at the proxy instead of the real 374 // Consul server. 375 config := raw.config 376 config.Address = proxy.URL[7:] // Strip off "http://". 377 c, err := NewClient(&config) 378 if err != nil { 379 t.Fatalf("err: %v", err) 380 } 381 382 // Set up a lock with retries enabled. 383 opts := &SemaphoreOptions{ 384 Prefix: "test/sema/.lock", 385 Limit: 2, 386 SessionTTL: "60s", 387 MonitorRetries: 3, 388 } 389 sema, err := c.SemaphoreOpts(opts) 390 if err != nil { 391 t.Fatalf("err: %v", err) 392 } 393 394 // Make sure the default got set. 395 if sema.opts.MonitorRetryTime != DefaultMonitorRetryTime { 396 t.Fatalf("bad: %d", sema.opts.MonitorRetryTime) 397 } 398 399 // Now set a custom time for the test. 400 opts.MonitorRetryTime = 250 * time.Millisecond 401 sema, err = c.SemaphoreOpts(opts) 402 if err != nil { 403 t.Fatalf("err: %v", err) 404 } 405 if sema.opts.MonitorRetryTime != 250*time.Millisecond { 406 t.Fatalf("bad: %d", sema.opts.MonitorRetryTime) 407 } 408 409 // Should get the lock. 410 ch, err := sema.Acquire(nil) 411 if err != nil { 412 t.Fatalf("err: %v", err) 413 } 414 if ch == nil { 415 t.Fatalf("didn't acquire") 416 } 417 418 // Take the semaphore using the raw client to force the monitor to wake 419 // up and check the lock again. This time we will return errors for some 420 // of the responses. 421 mutex.Lock() 422 errors = 2 423 mutex.Unlock() 424 another, err := raw.SemaphoreOpts(opts) 425 if err != nil { 426 t.Fatalf("err: %v", err) 427 } 428 if _, err := another.Acquire(nil); err != nil { 429 t.Fatalf("err: %v", err) 430 } 431 time.Sleep(5 * opts.MonitorRetryTime) 432 433 // Should still have the semaphore. 434 select { 435 case <-ch: 436 t.Fatalf("lost the semaphore") 437 default: 438 } 439 440 // Now return an overwhelming number of errors, using the raw client to 441 // poke the key and get the monitor to run again. 442 mutex.Lock() 443 errors = 10 444 mutex.Unlock() 445 if err := another.Release(); err != nil { 446 t.Fatalf("err: %v", err) 447 } 448 time.Sleep(5 * opts.MonitorRetryTime) 449 450 // Should lose the semaphore. 451 select { 452 case <-ch: 453 case <-time.After(time.Second): 454 t.Fatalf("should not have the semaphore") 455 } 456} 457 458func TestAPI_SemaphoreOneShot(t *testing.T) { 459 t.Parallel() 460 c, s := makeClient(t) 461 defer s.Stop() 462 463 s.WaitForSerfCheck(t) 464 465 // Set up a semaphore as a one-shot. 466 opts := &SemaphoreOptions{ 467 Prefix: "test/sema/.lock", 468 Limit: 2, 469 SemaphoreTryOnce: true, 470 } 471 sema, err := c.SemaphoreOpts(opts) 472 if err != nil { 473 t.Fatalf("err: %v", err) 474 } 475 476 // Make sure the default got set. 477 if sema.opts.SemaphoreWaitTime != DefaultSemaphoreWaitTime { 478 t.Fatalf("bad: %d", sema.opts.SemaphoreWaitTime) 479 } 480 481 // Now set a custom time for the test. 482 opts.SemaphoreWaitTime = 250 * time.Millisecond 483 sema, err = c.SemaphoreOpts(opts) 484 if err != nil { 485 t.Fatalf("err: %v", err) 486 } 487 if sema.opts.SemaphoreWaitTime != 250*time.Millisecond { 488 t.Fatalf("bad: %d", sema.opts.SemaphoreWaitTime) 489 } 490 491 // Should acquire the semaphore. 492 ch, err := sema.Acquire(nil) 493 if err != nil { 494 t.Fatalf("err: %v", err) 495 } 496 if ch == nil { 497 t.Fatalf("should have acquired the semaphore") 498 } 499 500 // Try with another session. 501 another, err := c.SemaphoreOpts(opts) 502 if err != nil { 503 t.Fatalf("err: %v", err) 504 } 505 ch, err = another.Acquire(nil) 506 if err != nil { 507 t.Fatalf("err: %v", err) 508 } 509 if ch == nil { 510 t.Fatalf("should have acquired the semaphore") 511 } 512 513 // Try with a third one that shouldn't get it. 514 contender, err := c.SemaphoreOpts(opts) 515 if err != nil { 516 t.Fatalf("err: %v", err) 517 } 518 start := time.Now() 519 ch, err = contender.Acquire(nil) 520 if err != nil { 521 t.Fatalf("err: %v", err) 522 } 523 if ch != nil { 524 t.Fatalf("should not have acquired the semaphore") 525 } 526 diff := time.Since(start) 527 if diff < contender.opts.SemaphoreWaitTime { 528 t.Fatalf("time out of bounds: %9.6f", diff.Seconds()) 529 } 530 531 // Give up a slot and make sure the third one can get it. 532 if err := another.Release(); err != nil { 533 t.Fatalf("err: %v", err) 534 } 535 ch, err = contender.Acquire(nil) 536 if err != nil { 537 t.Fatalf("err: %v", err) 538 } 539 if ch == nil { 540 t.Fatalf("should have acquired the semaphore") 541 } 542} 543