1package api 2 3import ( 4 "log" 5 "net/http" 6 "net/http/httptest" 7 "net/http/httputil" 8 "strings" 9 "sync" 10 "testing" 11 "time" 12 13 "github.com/hashicorp/consul/sdk/testutil/retry" 14) 15 16func createTestLock(t *testing.T, c *Client, key string) (*Lock, *Session) { 17 t.Helper() 18 session := c.Session() 19 20 se := &SessionEntry{ 21 Name: DefaultLockSessionName, 22 TTL: DefaultLockSessionTTL, 23 Behavior: SessionBehaviorDelete, 24 } 25 id, _, err := session.CreateNoChecks(se, nil) 26 if err != nil { 27 t.Fatalf("err: %v", err) 28 } 29 30 opts := &LockOptions{ 31 Key: key, 32 Session: id, 33 SessionName: se.Name, 34 SessionTTL: se.TTL, 35 } 36 lock, err := c.LockOpts(opts) 37 if err != nil { 38 t.Fatalf("err: %v", err) 39 } 40 41 return lock, session 42} 43 44func TestAPI_LockLockUnlock(t *testing.T) { 45 t.Parallel() 46 c, s := makeClientWithoutConnect(t) 47 defer s.Stop() 48 49 lock, session := createTestLock(t, c, "test/lock") 50 defer session.Destroy(lock.opts.Session, nil) 51 52 // Initial unlock should fail 53 err := lock.Unlock() 54 if err != ErrLockNotHeld { 55 t.Fatalf("err: %v", err) 56 } 57 58 // Should work 59 leaderCh, err := lock.Lock(nil) 60 if err != nil { 61 t.Fatalf("err: %v", err) 62 } 63 if leaderCh == nil { 64 t.Fatalf("not leader") 65 } 66 67 // Double lock should fail 68 _, err = lock.Lock(nil) 69 if err != ErrLockHeld { 70 t.Fatalf("err: %v", err) 71 } 72 73 // Should be leader 74 select { 75 case <-leaderCh: 76 t.Fatalf("should be leader") 77 default: 78 } 79 80 // Initial unlock should work 81 err = lock.Unlock() 82 if err != nil { 83 t.Fatalf("err: %v", err) 84 } 85 86 // Double unlock should fail 87 err = lock.Unlock() 88 if err != ErrLockNotHeld { 89 t.Fatalf("err: %v", err) 90 } 91 92 // Should lose leadership 93 select { 94 case <-leaderCh: 95 case <-time.After(time.Second): 96 t.Fatalf("should not be leader") 97 } 98} 99 100func TestAPI_LockForceInvalidate(t *testing.T) { 101 t.Parallel() 102 c, s := makeClientWithoutConnect(t) 103 defer s.Stop() 104 105 retry.Run(t, func(r *retry.R) { 106 lock, session := createTestLock(t, c, "test/lock") 107 defer session.Destroy(lock.opts.Session, nil) 108 109 // Should work 110 leaderCh, err := lock.Lock(nil) 111 if err != nil { 112 r.Fatalf("err: %v", err) 113 } 114 if leaderCh == nil { 115 r.Fatalf("not leader") 116 } 117 defer lock.Unlock() 118 119 go func() { 120 // Nuke the session, simulator an operator invalidation 121 // or a health check failure 122 session := c.Session() 123 session.Destroy(lock.lockSession, nil) 124 }() 125 126 // Should loose leadership 127 select { 128 case <-leaderCh: 129 case <-time.After(time.Second): 130 r.Fatalf("should not be leader") 131 } 132 }) 133} 134 135func TestAPI_LockDeleteKey(t *testing.T) { 136 t.Parallel() 137 c, s := makeClientWithoutConnect(t) 138 defer s.Stop() 139 140 // This uncovered some issues around special-case handling of low index 141 // numbers where it would work with a low number but fail for higher 142 // ones, so we loop this a bit to sweep the index up out of that 143 // territory. 144 for i := 0; i < 10; i++ { 145 func() { 146 lock, session := createTestLock(t, c, "test/lock") 147 defer session.Destroy(lock.opts.Session, nil) 148 149 // Should work 150 leaderCh, err := lock.Lock(nil) 151 if err != nil { 152 t.Fatalf("err: %v", err) 153 } 154 if leaderCh == nil { 155 t.Fatalf("not leader") 156 } 157 defer lock.Unlock() 158 159 go func() { 160 // Nuke the key, simulate an operator intervention 161 kv := c.KV() 162 kv.Delete("test/lock", nil) 163 }() 164 165 // Should loose leadership 166 select { 167 case <-leaderCh: 168 case <-time.After(10 * time.Second): 169 t.Fatalf("should not be leader") 170 } 171 }() 172 } 173} 174 175func TestAPI_LockContend(t *testing.T) { 176 t.Parallel() 177 c, s := makeClientWithoutConnect(t) 178 defer s.Stop() 179 180 wg := &sync.WaitGroup{} 181 acquired := make([]bool, 3) 182 for idx := range acquired { 183 wg.Add(1) 184 go func(idx int) { 185 defer wg.Done() 186 lock, session := createTestLock(t, c, "test/lock") 187 defer session.Destroy(lock.opts.Session, nil) 188 189 // Should work eventually, will contend 190 leaderCh, err := lock.Lock(nil) 191 if err != nil { 192 t.Errorf("err: %v", err) 193 return 194 } 195 if leaderCh == nil { 196 t.Errorf("not leader") 197 return 198 } 199 defer lock.Unlock() 200 log.Printf("Contender %d acquired", idx) 201 202 // Set acquired and then leave 203 acquired[idx] = true 204 }(idx) 205 } 206 207 // Wait for termination 208 doneCh := make(chan struct{}) 209 go func() { 210 wg.Wait() 211 close(doneCh) 212 }() 213 214 // Wait for everybody to get a turn 215 select { 216 case <-doneCh: 217 case <-time.After(3 * DefaultLockRetryTime): 218 t.Fatalf("timeout") 219 } 220 221 for idx, did := range acquired { 222 if !did { 223 t.Fatalf("contender %d never acquired", idx) 224 } 225 } 226} 227 228func TestAPI_LockDestroy(t *testing.T) { 229 t.Parallel() 230 c, s := makeClientWithoutConnect(t) 231 defer s.Stop() 232 233 lock, session := createTestLock(t, c, "test/lock") 234 defer session.Destroy(lock.opts.Session, nil) 235 236 // Should work 237 leaderCh, err := lock.Lock(nil) 238 if err != nil { 239 t.Fatalf("err: %v", err) 240 } 241 if leaderCh == nil { 242 t.Fatalf("not leader") 243 } 244 245 // Destroy should fail 246 if err := lock.Destroy(); err != ErrLockHeld { 247 t.Fatalf("err: %v", err) 248 } 249 250 // Should be able to release 251 err = lock.Unlock() 252 if err != nil { 253 t.Fatalf("err: %v", err) 254 } 255 256 // Acquire with a different lock 257 l2, session := createTestLock(t, c, "test/lock") 258 defer session.Destroy(lock.opts.Session, nil) 259 260 // Should work 261 leaderCh, err = l2.Lock(nil) 262 if err != nil { 263 t.Fatalf("err: %v", err) 264 } 265 if leaderCh == nil { 266 t.Fatalf("not leader") 267 } 268 269 // Destroy should still fail 270 if err := lock.Destroy(); err != ErrLockInUse { 271 t.Fatalf("err: %v", err) 272 } 273 274 // Should release 275 err = l2.Unlock() 276 if err != nil { 277 t.Fatalf("err: %v", err) 278 } 279 280 // Destroy should work 281 err = lock.Destroy() 282 if err != nil { 283 t.Fatalf("err: %v", err) 284 } 285 286 // Double destroy should work 287 err = l2.Destroy() 288 if err != nil { 289 t.Fatalf("err: %v", err) 290 } 291} 292 293func TestAPI_LockConflict(t *testing.T) { 294 t.Parallel() 295 c, s := makeClientWithoutConnect(t) 296 defer s.Stop() 297 298 sema, session := createTestSemaphore(t, c, "test/lock/", 2) 299 defer session.Destroy(sema.opts.Session, nil) 300 301 // Should work 302 lockCh, err := sema.Acquire(nil) 303 if err != nil { 304 t.Fatalf("err: %v", err) 305 } 306 if lockCh == nil { 307 t.Fatalf("not hold") 308 } 309 defer sema.Release() 310 311 lock, session := createTestLock(t, c, "test/lock/.lock") 312 defer session.Destroy(lock.opts.Session, nil) 313 314 // Should conflict with semaphore 315 _, err = lock.Lock(nil) 316 if err != ErrLockConflict { 317 t.Fatalf("err: %v", err) 318 } 319 320 // Should conflict with semaphore 321 err = lock.Destroy() 322 if err != ErrLockConflict { 323 t.Fatalf("err: %v", err) 324 } 325} 326 327func TestAPI_LockReclaimLock(t *testing.T) { 328 t.Parallel() 329 c, s := makeClientWithoutConnect(t) 330 defer s.Stop() 331 332 s.WaitForSerfCheck(t) 333 334 session, _, err := c.Session().Create(&SessionEntry{}, nil) 335 if err != nil { 336 t.Fatalf("err: %v", err) 337 } 338 339 lock, err := c.LockOpts(&LockOptions{Key: "test/lock", Session: session}) 340 if err != nil { 341 t.Fatalf("err: %v", err) 342 } 343 344 // Should work 345 leaderCh, err := lock.Lock(nil) 346 if err != nil { 347 t.Fatalf("err: %v", err) 348 } 349 if leaderCh == nil { 350 t.Fatalf("not leader") 351 } 352 defer lock.Unlock() 353 354 l2, err := c.LockOpts(&LockOptions{Key: "test/lock", Session: session}) 355 if err != nil { 356 t.Fatalf("err: %v", err) 357 } 358 359 reclaimed := make(chan (<-chan struct{}), 1) 360 go func() { 361 l2Ch, err := l2.Lock(nil) 362 if err != nil { 363 t.Errorf("not locked: %v", err) 364 } 365 reclaimed <- l2Ch 366 }() 367 368 // Should reclaim the lock 369 var leader2Ch <-chan struct{} 370 371 select { 372 case leader2Ch = <-reclaimed: 373 case <-time.After(time.Second): 374 t.Fatalf("should have locked") 375 } 376 377 // unlock should work 378 err = l2.Unlock() 379 if err != nil { 380 t.Fatalf("err: %v", err) 381 } 382 383 //Both locks should see the unlock 384 select { 385 case <-leader2Ch: 386 case <-time.After(time.Second): 387 t.Fatalf("should not be leader") 388 } 389 390 select { 391 case <-leaderCh: 392 case <-time.After(time.Second): 393 t.Fatalf("should not be leader") 394 } 395} 396 397func TestAPI_LockMonitorRetry(t *testing.T) { 398 t.Parallel() 399 raw, s := makeClientWithoutConnect(t) 400 defer s.Stop() 401 402 s.WaitForSerfCheck(t) 403 404 // Set up a server that always responds with 500 errors. 405 failer := func(w http.ResponseWriter, req *http.Request) { 406 w.WriteHeader(500) 407 } 408 outage := httptest.NewServer(http.HandlerFunc(failer)) 409 defer outage.Close() 410 411 // Set up a reverse proxy that will send some requests to the 412 // 500 server and pass everything else through to the real Consul 413 // server. 414 var mutex sync.Mutex 415 errors := 0 416 director := func(req *http.Request) { 417 mutex.Lock() 418 defer mutex.Unlock() 419 420 req.URL.Scheme = "http" 421 if errors > 0 && req.Method == "GET" && strings.Contains(req.URL.Path, "/v1/kv/test/lock") { 422 req.URL.Host = outage.URL[7:] // Strip off "http://". 423 errors-- 424 } else { 425 req.URL.Host = raw.config.Address 426 } 427 } 428 proxy := httptest.NewServer(&httputil.ReverseProxy{Director: director}) 429 defer proxy.Close() 430 431 // Make another client that points at the proxy instead of the real 432 // Consul server. 433 config := raw.config 434 config.Address = proxy.URL[7:] // Strip off "http://". 435 c, err := NewClient(&config) 436 if err != nil { 437 t.Fatalf("err: %v", err) 438 } 439 440 // Set up a lock with retries enabled. 441 opts := &LockOptions{ 442 Key: "test/lock", 443 SessionTTL: "60s", 444 MonitorRetries: 3, 445 } 446 lock, err := c.LockOpts(opts) 447 if err != nil { 448 t.Fatalf("err: %v", err) 449 } 450 451 // Make sure the default got set. 452 if lock.opts.MonitorRetryTime != DefaultMonitorRetryTime { 453 t.Fatalf("bad: %d", lock.opts.MonitorRetryTime) 454 } 455 456 // Now set a custom time for the test. 457 opts.MonitorRetryTime = 250 * time.Millisecond 458 lock, err = c.LockOpts(opts) 459 if err != nil { 460 t.Fatalf("err: %v", err) 461 } 462 if lock.opts.MonitorRetryTime != 250*time.Millisecond { 463 t.Fatalf("bad: %d", lock.opts.MonitorRetryTime) 464 } 465 466 // Should get the lock. 467 leaderCh, err := lock.Lock(nil) 468 if err != nil { 469 t.Fatalf("err: %v", err) 470 } 471 if leaderCh == nil { 472 t.Fatalf("not leader") 473 } 474 475 // Poke the key using the raw client to force the monitor to wake up 476 // and check the lock again. This time we will return errors for some 477 // of the responses. 478 mutex.Lock() 479 errors = 2 480 mutex.Unlock() 481 pair, _, err := raw.KV().Get("test/lock", &QueryOptions{}) 482 if err != nil { 483 t.Fatalf("err: %v", err) 484 } 485 pair.Value = []byte{1} 486 if _, err := raw.KV().Put(pair, &WriteOptions{}); err != nil { 487 t.Fatalf("err: %v", err) 488 } 489 time.Sleep(5 * opts.MonitorRetryTime) 490 491 // Should still be the leader. 492 select { 493 case <-leaderCh: 494 t.Fatalf("should be leader") 495 default: 496 } 497 498 // Now return an overwhelming number of errors. 499 mutex.Lock() 500 errors = 10 501 mutex.Unlock() 502 pair.Value = []byte{2} 503 if _, err := raw.KV().Put(pair, &WriteOptions{}); err != nil { 504 t.Fatalf("err: %v", err) 505 } 506 time.Sleep(5 * opts.MonitorRetryTime) 507 508 // Should lose leadership. 509 select { 510 case <-leaderCh: 511 case <-time.After(time.Second): 512 t.Fatalf("should not be leader") 513 } 514} 515 516func TestAPI_LockOneShot(t *testing.T) { 517 t.Parallel() 518 c, s := makeClientWithoutConnect(t) 519 defer s.Stop() 520 521 s.WaitForSerfCheck(t) 522 523 // Set up a lock as a one-shot. 524 opts := &LockOptions{ 525 Key: "test/lock", 526 LockTryOnce: true, 527 } 528 lock, err := c.LockOpts(opts) 529 if err != nil { 530 t.Fatalf("err: %v", err) 531 } 532 533 // Make sure the default got set. 534 if lock.opts.LockWaitTime != DefaultLockWaitTime { 535 t.Fatalf("bad: %d", lock.opts.LockWaitTime) 536 } 537 538 // Now set a custom time for the test. 539 opts.LockWaitTime = 250 * time.Millisecond 540 lock, err = c.LockOpts(opts) 541 if err != nil { 542 t.Fatalf("err: %v", err) 543 } 544 if lock.opts.LockWaitTime != 250*time.Millisecond { 545 t.Fatalf("bad: %d", lock.opts.LockWaitTime) 546 } 547 548 // Should get the lock. 549 ch, err := lock.Lock(nil) 550 if err != nil { 551 t.Fatalf("err: %v", err) 552 } 553 if ch == nil { 554 t.Fatalf("not leader") 555 } 556 557 // Now try with another session. 558 contender, err := c.LockOpts(opts) 559 if err != nil { 560 t.Fatalf("err: %v", err) 561 } 562 start := time.Now() 563 ch, err = contender.Lock(nil) 564 if err != nil { 565 t.Fatalf("err: %v", err) 566 } 567 if ch != nil { 568 t.Fatalf("should not be leader") 569 } 570 diff := time.Since(start) 571 if diff < contender.opts.LockWaitTime || diff > 2*contender.opts.LockWaitTime { 572 t.Fatalf("time out of bounds: %9.6f", diff.Seconds()) 573 } 574 575 // Unlock and then make sure the contender can get it. 576 if err := lock.Unlock(); err != nil { 577 t.Fatalf("err: %v", err) 578 } 579 ch, err = contender.Lock(nil) 580 if err != nil { 581 t.Fatalf("err: %v", err) 582 } 583 if ch == nil { 584 t.Fatalf("should be leader") 585 } 586} 587