/*
Copyright 2017 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package spanner

import (
	"bytes"
	"container/heap"
	"context"
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"cloud.google.com/go/spanner/internal/testutil"
	sppb "google.golang.org/genproto/googleapis/spanner/v1"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// TestSessionPoolConfigValidation tests session pool config validation.
func TestSessionPoolConfigValidation(t *testing.T) {
	t.Parallel()

	sc := testutil.NewMockCloudSpannerClient(t)
	for _, test := range []struct {
		spc SessionPoolConfig
		err error
	}{
		{
			// A config without an RPC getter must be rejected.
			SessionPoolConfig{},
			errNoRPCGetter(),
		},
		{
			// MinOpened greater than MaxOpened must be rejected.
			SessionPoolConfig{
				getRPCClient: func() (sppb.SpannerClient, error) {
					return sc, nil
				},
				MinOpened: 10,
				MaxOpened: 5,
			},
			errMinOpenedGTMaxOpened(5, 10),
		},
	} {
		if _, err := newSessionPool("mockdb", test.spc, nil); !testEqual(err, test.err) {
			t.Fatalf("want %v, got %v", test.err, err)
		}
	}
}

// TestSessionCreation tests session creation during sessionPool.Take().
func TestSessionCreation(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	_, sp, mock, cleanup := serverClientMock(t, SessionPoolConfig{})
	defer cleanup()

	// Take three sessions from session pool, this should trigger session pool
	// to create three new sessions.
	shs := make([]*sessionHandle, 3)
	// gotDs holds the unique sessions taken from session pool.
	gotDs := map[string]bool{}
	for i := 0; i < len(shs); i++ {
		var err error
		shs[i], err = sp.take(ctx)
		if err != nil {
			t.Fatalf("failed to get session(%v): %v", i, err)
		}
		gotDs[shs[i].getID()] = true
	}
	if len(gotDs) != len(shs) {
		t.Fatalf("session pool created %v sessions, want %v", len(gotDs), len(shs))
	}
	if wantDs := mock.DumpSessions(); !testEqual(gotDs, wantDs) {
		t.Fatalf("session pool creates sessions %v, want %v", gotDs, wantDs)
	}
	// Verify that created sessions are recorded correctly in session pool.
	sp.mu.Lock()
	if int(sp.numOpened) != len(shs) {
		t.Fatalf("session pool reports %v open sessions, want %v", sp.numOpened, len(shs))
	}
	if sp.createReqs != 0 {
		t.Fatalf("session pool reports %v session create requests, want 0", int(sp.createReqs))
	}
	sp.mu.Unlock()
	// Verify that created sessions are tracked correctly by healthcheck queue.
	hc := sp.hc
	hc.mu.Lock()
	if hc.queue.Len() != len(shs) {
		t.Fatalf("healthcheck queue length = %v, want %v", hc.queue.Len(), len(shs))
	}
	for _, s := range hc.queue.sessions {
		if !gotDs[s.getID()] {
			t.Fatalf("session %v is in healthcheck queue, but it is not created by session pool", s.getID())
		}
	}
	hc.mu.Unlock()
}

// TestTakeFromIdleList tests taking sessions from session pool's idle list.
func TestTakeFromIdleList(t *testing.T) {
	t.Parallel()
	ctx := context.Background()

	// Make sure maintainer keeps the idle sessions.
	_, sp, mock, cleanup := serverClientMock(t, SessionPoolConfig{MaxIdle: 10})
	defer cleanup()

	// Take ten sessions from session pool and recycle them.
	shs := make([]*sessionHandle, 10)
	for i := 0; i < len(shs); i++ {
		var err error
		shs[i], err = sp.take(ctx)
		if err != nil {
			t.Fatalf("failed to get session(%v): %v", i, err)
		}
	}
	// Make sure it's sampled once before recycling, otherwise it will be
	// cleaned up.
	<-time.After(sp.SessionPoolConfig.healthCheckSampleInterval)
	for i := 0; i < len(shs); i++ {
		shs[i].recycle()
	}
	// Further session requests from session pool won't cause mockclient to
	// create more sessions.
	wantSessions := mock.DumpSessions()
	// Take ten sessions from session pool again, this time all sessions should
	// come from idle list.
	gotSessions := map[string]bool{}
	for i := 0; i < len(shs); i++ {
		sh, err := sp.take(ctx)
		if err != nil {
			t.Fatalf("cannot take session from session pool: %v", err)
		}
		gotSessions[sh.getID()] = true
	}
	if len(gotSessions) != 10 {
		t.Fatalf("got %v unique sessions, want 10", len(gotSessions))
	}
	if !testEqual(gotSessions, wantSessions) {
		t.Fatalf("got sessions: %v, want %v", gotSessions, wantSessions)
	}
}

// TestTakeWriteSessionFromIdleList tests taking write sessions from session
// pool's idle list.
func TestTakeWriteSessionFromIdleList(t *testing.T) {
	t.Parallel()
	ctx := context.Background()

	// Make sure maintainer keeps the idle sessions.
	_, sp, mock, cleanup := serverClientMock(t, SessionPoolConfig{MaxIdle: 20})
	defer cleanup()

	// Take ten sessions from session pool and recycle them.
	shs := make([]*sessionHandle, 10)
	for i := 0; i < len(shs); i++ {
		var err error
		shs[i], err = sp.takeWriteSession(ctx)
		if err != nil {
			t.Fatalf("failed to get session(%v): %v", i, err)
		}
	}
	// Make sure it's sampled once before recycling, otherwise it will be
	// cleaned up.
	<-time.After(sp.SessionPoolConfig.healthCheckSampleInterval)
	for i := 0; i < len(shs); i++ {
		shs[i].recycle()
	}
	// Further session requests from session pool won't cause mockclient to
	// create more sessions.
	wantSessions := mock.DumpSessions()
	// Take ten sessions from session pool again, this time all sessions should
	// come from idle list.
	gotSessions := map[string]bool{}
	for i := 0; i < len(shs); i++ {
		sh, err := sp.takeWriteSession(ctx)
		if err != nil {
			t.Fatalf("cannot take session from session pool: %v", err)
		}
		gotSessions[sh.getID()] = true
	}
	if len(gotSessions) != 10 {
		t.Fatalf("got %v unique sessions, want 10", len(gotSessions))
	}
	if !testEqual(gotSessions, wantSessions) {
		t.Fatalf("got sessions: %v, want %v", gotSessions, wantSessions)
	}
}

// TestTakeFromIdleListChecked tests taking sessions from session pool's idle
// list, but with an extra ping check.
func TestTakeFromIdleListChecked(t *testing.T) {
	t.Parallel()
	ctx := context.Background()

	// Make sure maintainer keeps the idle sessions.
	_, sp, mock, cleanup := serverClientMock(t, SessionPoolConfig{
		MaxIdle:                   1,
		HealthCheckInterval:       50 * time.Millisecond,
		healthCheckSampleInterval: 10 * time.Millisecond,
	})
	defer cleanup()

	// Stop healthcheck workers to simulate slow pings.
	sp.hc.close()

	// Create a session and recycle it.
	sh, err := sp.take(ctx)
	if err != nil {
		t.Fatalf("failed to get session: %v", err)
	}

	// Make sure it's sampled once before recycling, otherwise it will be
	// cleaned up.
	<-time.After(sp.SessionPoolConfig.healthCheckSampleInterval)
	wantSid := sh.getID()
	sh.recycle()

	// TODO(deklerk): get rid of this
	<-time.After(time.Second)

	// Two back-to-back session requests, both of them should return the same
	// session created before and none of them should trigger a session ping.
	for i := 0; i < 2; i++ {
		// Take the session from the idle list and recycle it.
		sh, err = sp.take(ctx)
		if err != nil {
			t.Fatalf("%v - failed to get session: %v", i, err)
		}
		if gotSid := sh.getID(); gotSid != wantSid {
			t.Fatalf("%v - got session id: %v, want %v", i, gotSid, wantSid)
		}

		// The two back-to-back session requests shouldn't trigger any session
		// pings because sessionPool.Take
		// reschedules the next healthcheck.
		if got, want := mock.DumpPings(), ([]string{wantSid}); !testEqual(got, want) {
			t.Fatalf("%v - got ping session requests: %v, want %v", i, got, want)
		}
		sh.recycle()
	}

	// Inject session error to server stub, and take the session from the
	// session pool, the old session should be destroyed and the session pool
	// will create a new session.
	mock.GetSessionFn = func(c context.Context, r *sppb.GetSessionRequest, opts ...grpc.CallOption) (*sppb.Session, error) {
		mock.MockCloudSpannerClient.ReceivedRequests <- r
		return nil, status.Errorf(codes.NotFound, "Session not found")
	}

	// Delay to trigger sessionPool.Take to ping the session.
	// TODO(deklerk): get rid of this
	<-time.After(time.Second)

	// take will take the idle session. Then it will send a GetSession request
	// to check if it's healthy. It'll discover that it's not healthy
	// (NotFound), drop it, and create a new session.
	sh, err = sp.take(ctx)
	if err != nil {
		t.Fatalf("failed to get session: %v", err)
	}
	ds := mock.DumpSessions()
	if len(ds) != 1 {
		t.Fatalf("dumped sessions from mockclient: %v, want %v", ds, sh.getID())
	}
	if sh.getID() == wantSid {
		t.Fatalf("sessionPool.Take still returns the same session %v, want it to create a new one", wantSid)
	}
}

// TestTakeFromIdleWriteListChecked tests taking sessions from session pool's
// idle list, but with an extra ping check.
func TestTakeFromIdleWriteListChecked(t *testing.T) {
	t.Parallel()
	ctx := context.Background()

	// Make sure maintainer keeps the idle sessions.
	_, sp, mock, cleanup := serverClientMock(t, SessionPoolConfig{
		MaxIdle:                   1,
		HealthCheckInterval:       50 * time.Millisecond,
		healthCheckSampleInterval: 10 * time.Millisecond,
	})
	defer cleanup()

	// Stop healthcheck workers to simulate slow pings.
	sp.hc.close()

	// Create a session and recycle it.
	sh, err := sp.takeWriteSession(ctx)
	if err != nil {
		t.Fatalf("failed to get session: %v", err)
	}
	wantSid := sh.getID()

	// Make sure it's sampled once before recycling, otherwise it will be
	// cleaned up.
	<-time.After(sp.SessionPoolConfig.healthCheckSampleInterval)
	sh.recycle()

	// TODO(deklerk): get rid of this
	<-time.After(time.Second)

	// Two back-to-back session requests, both of them should return the same
	// session created before and none of them should trigger a session ping.
	for i := 0; i < 2; i++ {
		// Take the session from the idle list and recycle it.
		sh, err = sp.takeWriteSession(ctx)
		if err != nil {
			t.Fatalf("%v - failed to get session: %v", i, err)
		}
		if gotSid := sh.getID(); gotSid != wantSid {
			t.Fatalf("%v - got session id: %v, want %v", i, gotSid, wantSid)
		}
		// The two back-to-back session requests shouldn't trigger any session
		// pings because sessionPool.Take reschedules the next healthcheck.
		if got, want := mock.DumpPings(), ([]string{wantSid}); !testEqual(got, want) {
			t.Fatalf("%v - got ping session requests: %v, want %v", i, got, want)
		}
		sh.recycle()
	}

	// Inject session error to mockclient, and take the session from the
	// session pool, the old session should be destroyed and the session pool
	// will create a new session.
	mock.GetSessionFn = func(c context.Context, r *sppb.GetSessionRequest, opts ...grpc.CallOption) (*sppb.Session, error) {
		mock.MockCloudSpannerClient.ReceivedRequests <- r
		return nil, status.Errorf(codes.NotFound, "Session not found")
	}

	// Delay to trigger sessionPool.Take to ping the session.
	// TODO(deklerk): get rid of this
	<-time.After(time.Second)

	sh, err = sp.takeWriteSession(ctx)
	if err != nil {
		t.Fatalf("failed to get session: %v", err)
	}
	ds := mock.DumpSessions()
	if len(ds) != 1 {
		t.Fatalf("dumped sessions from mockclient: %v, want %v", ds, sh.getID())
	}
	if sh.getID() == wantSid {
		t.Fatalf("sessionPool.Take still returns the same session %v, want it to create a new one", wantSid)
	}
}

// TestMaxOpenedSessions tests max open sessions constraint.
func TestMaxOpenedSessions(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	_, sp, _, cleanup := serverClientMock(t, SessionPoolConfig{MaxOpened: 1})
	defer cleanup()

	sh1, err := sp.take(ctx)
	if err != nil {
		t.Fatalf("cannot take session from session pool: %v", err)
	}

	// Session request will timeout due to the max open sessions constraint.
	ctx2, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	_, gotErr := sp.take(ctx2)
	if wantErr := errGetSessionTimeout(); !testEqual(gotErr, wantErr) {
		t.Fatalf("the second session retrival returns error %v, want %v", gotErr, wantErr)
	}

	go func() {
		// TODO(deklerk): remove this
		<-time.After(time.Second)
		// Destroy the first session to allow the next session request to
		// proceed.
		sh1.destroy()
	}()

	// Now session request can be processed because the first session will be
	// destroyed.
	sh2, err := sp.take(ctx)
	if err != nil {
		t.Fatalf("after the first session is destroyed, session retrival still returns error %v, want nil", err)
	}
	if !sh2.session.isValid() || sh2.getID() == "" {
		t.Fatalf("got invalid session: %v", sh2.session)
	}
}

// TestMinOpenedSessions tests min open session constraint.
func TestMinOpenedSessions(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	_, sp, _, cleanup := serverClientMock(t, SessionPoolConfig{MinOpened: 1})
	defer cleanup()

	// Take ten sessions from session pool and recycle them.
	var ss []*session
	var shs []*sessionHandle
	for i := 0; i < 10; i++ {
		sh, err := sp.take(ctx)
		if err != nil {
			t.Fatalf("failed to get session(%v): %v", i, err)
		}
		ss = append(ss, sh.session)
		shs = append(shs, sh)
		sh.recycle()
	}
	// NOTE(review): the handles were already recycled inside the loop above;
	// this second pass looks redundant — confirm whether double-recycle is a
	// deliberate no-op exercise or leftover code.
	for _, sh := range shs {
		sh.recycle()
	}

	// Simulate session expiration.
	for _, s := range ss {
		s.destroy(true)
	}

	sp.mu.Lock()
	defer sp.mu.Unlock()
	// There should be still one session left in idle list due to the min open
	// sessions constraint.
	if sp.idleList.Len() != 1 {
		t.Fatalf("got %v sessions in idle list, want 1 %d", sp.idleList.Len(), sp.numOpened)
	}
}

// TestMaxBurst tests max burst constraint.
func TestMaxBurst(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	_, sp, mock, cleanup := serverClientMock(t, SessionPoolConfig{MaxBurst: 1})
	defer cleanup()

	// Will cause session creation RPC to be retried forever.
	allowRequests := make(chan struct{})
	mock.CreateSessionFn = func(c context.Context, r *sppb.CreateSessionRequest, opts ...grpc.CallOption) (*sppb.Session, error) {
		select {
		case <-allowRequests:
			return mock.MockCloudSpannerClient.CreateSession(c, r, opts...)
		default:
			mock.MockCloudSpannerClient.ReceivedRequests <- r
			return nil, status.Errorf(codes.Unavailable, "try later")
		}
	}

	// This session request will never finish until the injected error is
	// cleared.
	go sp.take(ctx)

	// Poll for the execution of the first session request.
	for {
		sp.mu.Lock()
		cr := sp.createReqs
		sp.mu.Unlock()
		if cr == 0 {
			<-time.After(time.Second)
			continue
		}
		// The first session request is being executed.
		break
	}

	ctx2, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	_, gotErr := sp.take(ctx2)

	// Since MaxBurst == 1, the second session request should block.
	if wantErr := errGetSessionTimeout(); !testEqual(gotErr, wantErr) {
		t.Fatalf("session retrival returns error %v, want %v", gotErr, wantErr)
	}

	// Let the first session request succeed.
	close(allowRequests)

	// Now new session request can proceed because the first session request will eventually succeed.
	sh, err := sp.take(ctx)
	if err != nil {
		t.Fatalf("session retrival returns error %v, want nil", err)
	}
	if !sh.session.isValid() || sh.getID() == "" {
		t.Fatalf("got invalid session: %v", sh.session)
	}
}

// TestSessionRecycle tests recycling sessions.
func TestSessionRecycle(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	_, sp, _, cleanup := serverClientMock(t, SessionPoolConfig{MinOpened: 1, MaxIdle: 5})
	defer cleanup()

	// Test session is correctly recycled and reused.
	for i := 0; i < 20; i++ {
		s, err := sp.take(ctx)
		if err != nil {
			t.Fatalf("cannot get the session %v: %v", i, err)
		}
		s.recycle()
	}

	sp.mu.Lock()
	defer sp.mu.Unlock()
	// Ideally it should only be 1, because the session should be recycled and
	// re-used each time. However, sometimes the pool maintainer might increase
	// the pool size by 1 right around the time we take (which also increases
	// the pool size by 1), so this assertion is OK with either 1 or 2. We
	// expect never to see more than 2, though, even when MaxIdle is quite high:
	// each session should be recycled and re-used.
	if sp.numOpened != 1 && sp.numOpened != 2 {
		t.Fatalf("Expect session pool size 1 or 2, got %d", sp.numOpened)
	}
}

// TODO(deklerk): Investigate why s.destroy(true) is flakey.
// TestSessionDestroy tests destroying sessions.
func TestSessionDestroy(t *testing.T) {
	t.Skip("s.destroy(true) is flakey")
	t.Parallel()
	ctx := context.Background()
	_, sp, _, cleanup := serverClientMock(t, SessionPoolConfig{MinOpened: 1})
	defer cleanup()

	// The maintainer will create one session; wait for it to create the
	// session to avoid flakiness in the test.
	<-time.After(10 * time.Millisecond)
	sh, err := sp.take(ctx)
	if err != nil {
		t.Fatalf("cannot get session from session pool: %v", err)
	}
	s := sh.session
	sh.recycle()
	if d := s.destroy(true); d || !s.isValid() {
		// Session should be remaining because of min open sessions constraint.
		t.Fatalf("session %s invalid, want it to stay alive. (destroy in expiration mode, success: %v)", s.id, d)
	}
	if d := s.destroy(false); !d || s.isValid() {
		// Session should be destroyed.
		t.Fatalf("failed to destroy session %s. (destroy in default mode, success: %v)", s.id, d)
	}
}

// TestHcHeap tests heap operation on top of hcHeap.
func TestHcHeap(t *testing.T) {
	in := []*session{
		{nextCheck: time.Unix(10, 0)},
		{nextCheck: time.Unix(0, 5)},
		{nextCheck: time.Unix(1, 8)},
		{nextCheck: time.Unix(11, 7)},
		{nextCheck: time.Unix(6, 3)},
	}
	want := []*session{
		{nextCheck: time.Unix(1, 8), hcIndex: 0},
		{nextCheck: time.Unix(6, 3), hcIndex: 1},
		{nextCheck: time.Unix(8, 2), hcIndex: 2},
		{nextCheck: time.Unix(10, 0), hcIndex: 3},
		{nextCheck: time.Unix(11, 7), hcIndex: 4},
	}
	hh := hcHeap{}
	for _, s := range in {
		heap.Push(&hh, s)
	}
	// Change top of the heap and do an adjustment.
	hh.sessions[0].nextCheck = time.Unix(8, 2)
	heap.Fix(&hh, 0)
	for idx := 0; hh.Len() > 0; idx++ {
		got := heap.Pop(&hh).(*session)
		// Popped sessions have their heap index reset to -1.
		want[idx].hcIndex = -1
		if !testEqual(got, want[idx]) {
			t.Fatalf("%v: heap.Pop returns %v, want %v", idx, got, want[idx])
		}
	}
}

// TestHealthCheckScheduler tests if healthcheck workers can schedule and
// perform healthchecks properly.
func TestHealthCheckScheduler(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	_, sp, mock, cleanup := serverClientMock(t, SessionPoolConfig{
		HealthCheckInterval:       50 * time.Millisecond,
		healthCheckSampleInterval: 10 * time.Millisecond,
	})
	defer cleanup()

	// Create 50 sessions.
	ss := []string{}
	for i := 0; i < 50; i++ {
		sh, err := sp.take(ctx)
		if err != nil {
			t.Fatalf("cannot get session from session pool: %v", err)
		}
		ss = append(ss, sh.getID())
	}

	// Wait for 10-30 pings per session.
	waitFor(t, func() error {
		dp := mock.DumpPings()
		gotPings := map[string]int64{}
		for _, p := range dp {
			gotPings[p]++
		}
		for _, s := range ss {
			want := int64(20)
			if got := gotPings[s]; got < want/2 || got > want+want/2 {
				// This is an unacceptable amount of pings.
				return fmt.Errorf("got %v healthchecks on session %v, want it between (%v, %v)", got, s, want/2, want+want/2)
			}
		}
		return nil
	})
}

// Tests that a fraction of sessions are prepared for write by health checker.
func TestWriteSessionsPrepared(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	// WriteSessions: 0.5 asks the health checker to keep half of the idle
	// sessions prepared with a transaction.
	_, sp, _, cleanup := serverClientMock(t, SessionPoolConfig{WriteSessions: 0.5, MaxIdle: 20})
	defer cleanup()

	shs := make([]*sessionHandle, 10)
	var err error
	for i := 0; i < 10; i++ {
		shs[i], err = sp.take(ctx)
		if err != nil {
			t.Fatalf("cannot get session from session pool: %v", err)
		}
	}
	// Now there are 10 sessions in the pool. Release them.
	for _, sh := range shs {
		sh.recycle()
	}

	// Sleep for 1s, allowing healthcheck workers to invoke begin transaction.
	// TODO(deklerk): get rid of this
	<-time.After(time.Second)
	wshs := make([]*sessionHandle, 5)
	for i := 0; i < 5; i++ {
		wshs[i], err = sp.takeWriteSession(ctx)
		if err != nil {
			t.Fatalf("cannot get session from session pool: %v", err)
		}
		if wshs[i].getTransactionID() == nil {
			t.Fatalf("got nil transaction id from session pool")
		}
	}
	for _, sh := range wshs {
		sh.recycle()
	}

	// TODO(deklerk): get rid of this
	<-time.After(time.Second)

	// Now force creation of 10 more sessions.
	shs = make([]*sessionHandle, 20)
	for i := 0; i < 20; i++ {
		shs[i], err = sp.take(ctx)
		if err != nil {
			t.Fatalf("cannot get session from session pool: %v", err)
		}
	}

	// Now there are 20 sessions in the pool. Release them.
	for _, sh := range shs {
		sh.recycle()
	}

	// TODO(deklerk): get rid of this
	<-time.After(time.Second)

	// With 20 idle sessions and WriteSessions == 0.5, ten of them should be
	// write-prepared.
	if sp.idleWriteList.Len() != 10 {
		t.Fatalf("Expect 10 write prepared session, got: %d", sp.idleWriteList.Len())
	}
}

// TestTakeFromWriteQueue tests that sessionPool.take() returns write prepared
// sessions as well.
func TestTakeFromWriteQueue(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	_, sp, _, cleanup := serverClientMock(t, SessionPoolConfig{MaxOpened: 1, WriteSessions: 1.0, MaxIdle: 1})
	defer cleanup()

	sh, err := sp.take(ctx)
	if err != nil {
		t.Fatalf("cannot get session from session pool: %v", err)
	}
	sh.recycle()

	// TODO(deklerk): get rid of this
	<-time.After(time.Second)

	// The session should now be in write queue but take should also return it.
	if sp.idleWriteList.Len() == 0 {
		t.Fatalf("write queue unexpectedly empty")
	}
	if sp.idleList.Len() != 0 {
		t.Fatalf("read queue not empty")
	}
	sh, err = sp.take(ctx)
	if err != nil {
		t.Fatalf("cannot get session from session pool: %v", err)
	}
	sh.recycle()
}

// TestSessionHealthCheck tests healthchecking cases.
func TestSessionHealthCheck(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	_, sp, mock, cleanup := serverClientMock(t, SessionPoolConfig{
		HealthCheckInterval:       50 * time.Millisecond,
		healthCheckSampleInterval: 10 * time.Millisecond,
	})
	defer cleanup()

	// requestShouldErr is read/written atomically from the mock callback and
	// the test goroutine.
	var requestShouldErr int64 // 0 == false, 1 == true
	mock.GetSessionFn = func(c context.Context, r *sppb.GetSessionRequest, opts ...grpc.CallOption) (*sppb.Session, error) {
		if shouldErr := atomic.LoadInt64(&requestShouldErr); shouldErr == 1 {
			mock.MockCloudSpannerClient.ReceivedRequests <- r
			return nil, status.Errorf(codes.NotFound, "Session not found")
		}
		return mock.MockCloudSpannerClient.GetSession(c, r, opts...)
	}

	// Test pinging sessions.
	sh, err := sp.take(ctx)
	if err != nil {
		t.Fatalf("cannot get session from session pool: %v", err)
	}

	// Wait for healthchecker to send pings to session.
	waitFor(t, func() error {
		pings := mock.DumpPings()
		if len(pings) == 0 || pings[0] != sh.getID() {
			return fmt.Errorf("healthchecker didn't send any ping to session %v", sh.getID())
		}
		return nil
	})
	// Test broken session detection.
	sh, err = sp.take(ctx)
	if err != nil {
		t.Fatalf("cannot get session from session pool: %v", err)
	}

	atomic.SwapInt64(&requestShouldErr, 1)

	// Wait for healthcheck workers to find the broken session and tear it down.
	// TODO(deklerk): get rid of this
	<-time.After(1 * time.Second)

	s := sh.session
	if sh.session.isValid() {
		t.Fatalf("session(%v) is still alive, want it to be dropped by healthcheck workers", s)
	}

	atomic.SwapInt64(&requestShouldErr, 0)

	// Test garbage collection.
	sh, err = sp.take(ctx)
	if err != nil {
		t.Fatalf("cannot get session from session pool: %v", err)
	}
	sp.close()
	if sh.session.isValid() {
		t.Fatalf("session(%v) is still alive, want it to be garbage collected", s)
	}
}

// TestStressSessionPool does stress test on session pool by the following concurrent operations:
// 1) Test worker gets a session from the pool.
// 2) Test worker turns a session back into the pool.
// 3) Test worker destroys a session got from the pool.
// 4) Healthcheck destroys a broken session (because a worker has already destroyed it).
// 5) Test worker closes the session pool.
//
// During the test, the session pool maintainer maintains the number of sessions,
// and it is expected that all sessions that are taken from session pool remains valid.
// When all test workers and healthcheck workers exit, mockclient, session pool
// and healthchecker should be in consistent state.
func TestStressSessionPool(t *testing.T) {
	t.Parallel()
	ctx := context.Background()

	// Use concurrent workers to test different session pool built from different configurations.
	for ti, cfg := range []SessionPoolConfig{
		{},
		{MinOpened: 10, MaxOpened: 100},
		{MaxBurst: 50},
		{MinOpened: 10, MaxOpened: 200, MaxBurst: 5},
		{MinOpened: 10, MaxOpened: 200, MaxBurst: 5, WriteSessions: 0.2},
	} {
		var wg sync.WaitGroup
		// Create a more aggressive session healthchecker to increase test concurrency.
		cfg.HealthCheckInterval = 50 * time.Millisecond
		cfg.healthCheckSampleInterval = 10 * time.Millisecond
		cfg.HealthCheckWorkers = 50
		sc := testutil.NewMockCloudSpannerClient(t)
		cfg.getRPCClient = func() (sppb.SpannerClient, error) {
			return sc, nil
		}
		sp, _ := newSessionPool("mockdb", cfg, nil)
		// NOTE(review): these defers are inside the loop, so they run at
		// function exit rather than per iteration — confirm this is intended
		// (pools are also closed explicitly at the bottom of the loop).
		defer sp.hc.close()
		defer sp.close()

		for i := 0; i < 100; i++ {
			wg.Add(1)
			// Schedule a test worker.
			go func(idx int, pool *sessionPool, client sppb.SpannerClient) {
				defer wg.Done()
				// Test worker iterates 1K times and tries different
				// session / session pool operations.
				for j := 0; j < 1000; j++ {
					if idx%10 == 0 && j >= 900 {
						// Close the pool in selected set of workers during the
						// middle of the test.
						pool.close()
					}
					// Take a write sessions ~ 20% of the times.
					takeWrite := rand.Intn(5) == 4
					var (
						sh     *sessionHandle
						gotErr error
					)
					if takeWrite {
						sh, gotErr = pool.takeWriteSession(ctx)
					} else {
						sh, gotErr = pool.take(ctx)
					}
					if gotErr != nil {
						if pool.isValid() {
							t.Errorf("%v.%v: pool.take returns error when pool is still valid: %v", ti, idx, gotErr)
						}
						if wantErr := errInvalidSessionPool(); !testEqual(gotErr, wantErr) {
							t.Errorf("%v.%v: got error when pool is closed: %v, want %v", ti, idx, gotErr, wantErr)
						}
						continue
					}
					// Verify if session is valid when session pool is valid.
					// Note that if session pool is invalid after sh is taken,
					// then sh might be invalidated by healthcheck workers.
					if (sh.getID() == "" || sh.session == nil || !sh.session.isValid()) && pool.isValid() {
						t.Errorf("%v.%v.%v: pool.take returns invalid session %v", ti, idx, takeWrite, sh.session)
					}
					if takeWrite && sh.getTransactionID() == nil {
						t.Errorf("%v.%v: pool.takeWriteSession returns session %v without transaction", ti, idx, sh.session)
					}
					if rand.Intn(100) < idx {
						// Random sleep before destroying/recycling the session,
						// to give healthcheck worker a chance to step in.
						<-time.After(time.Duration(rand.Int63n(int64(cfg.HealthCheckInterval))))
					}
					if rand.Intn(100) < idx {
						// destroy the session.
						sh.destroy()
						continue
					}
					// recycle the session.
					sh.recycle()
				}
			}(i, sp, sc)
		}
		wg.Wait()
		sp.hc.close()
		// Here the states of healthchecker, session pool and mockclient are
		// stable.
		idleSessions := map[string]bool{}
		hcSessions := map[string]bool{}
		mockSessions := sc.DumpSessions()
		// Dump session pool's idle list.
		for sl := sp.idleList.Front(); sl != nil; sl = sl.Next() {
			s := sl.Value.(*session)
			if idleSessions[s.getID()] {
				t.Fatalf("%v: found duplicated session in idle list: %v", ti, s.getID())
			}
			idleSessions[s.getID()] = true
		}
		for sl := sp.idleWriteList.Front(); sl != nil; sl = sl.Next() {
			s := sl.Value.(*session)
			if idleSessions[s.getID()] {
				t.Fatalf("%v: found duplicated session in idle write list: %v", ti, s.getID())
			}
			idleSessions[s.getID()] = true
		}
		sp.mu.Lock()
		if int(sp.numOpened) != len(idleSessions) {
			t.Fatalf("%v: number of opened sessions (%v) != number of idle sessions (%v)", ti, sp.numOpened, len(idleSessions))
		}
		if sp.createReqs != 0 {
			t.Fatalf("%v: number of pending session creations = %v, want 0", ti, sp.createReqs)
		}
		// Dump healthcheck queue.
		for _, s := range sp.hc.queue.sessions {
			if hcSessions[s.getID()] {
				t.Fatalf("%v: found duplicated session in healthcheck queue: %v", ti, s.getID())
			}
			hcSessions[s.getID()] = true
		}
		sp.mu.Unlock()

		// Verify that idleSessions == hcSessions == mockSessions.
		if !testEqual(idleSessions, hcSessions) {
			t.Fatalf("%v: sessions in idle list (%v) != sessions in healthcheck queue (%v)", ti, idleSessions, hcSessions)
		}
		if !testEqual(hcSessions, mockSessions) {
			t.Fatalf("%v: sessions in healthcheck queue (%v) != sessions in mockclient (%v)", ti, hcSessions, mockSessions)
		}
		sp.close()
		mockSessions = sc.DumpSessions()
		if len(mockSessions) != 0 {
			t.Fatalf("Found live sessions: %v", mockSessions)
		}
	}
}

// TODO(deklerk): Investigate why this test is flakey, even with waitFor. Example
// flakey failure: session_test.go:946: after 15s waiting, got Scale down.
// Expect 5 open, got 6
//
// TestMaintainer checks the session pool maintainer maintains the number of
// sessions in the following cases:
//
// 1. On initialization of session pool, replenish session pool to meet
//    MinOpened or MaxIdle.
// 2. On increased session usage, provision extra MaxIdle sessions.
// 3. After the surge passes, scale down the session pool accordingly.
func TestMaintainer(t *testing.T) {
	t.Skip("asserting session state seems flakey")
	t.Parallel()
	ctx := context.Background()

	minOpened := uint64(5)
	maxIdle := uint64(4)
	_, sp, _, cleanup := serverClientMock(t, SessionPoolConfig{MinOpened: minOpened, MaxIdle: maxIdle})
	defer cleanup()

	sampleInterval := sp.SessionPoolConfig.healthCheckSampleInterval

	waitFor(t, func() error {
		sp.mu.Lock()
		defer sp.mu.Unlock()
		if sp.numOpened != 5 {
			return fmt.Errorf("Replenish. Expect %d open, got %d", sp.MinOpened, sp.numOpened)
		}
		return nil
	})

	// To save test time, we are not creating many sessions, because the time
	// to create sessions will have impact on the decision on sessionsToKeep.
	// We also parallelize the take and recycle process.
	shs := make([]*sessionHandle, 10)
	for i := 0; i < len(shs); i++ {
		var err error
		shs[i], err = sp.take(ctx)
		if err != nil {
			t.Fatalf("cannot get session from session pool: %v", err)
		}
	}
	sp.mu.Lock()
	if sp.numOpened != 10 {
		t.Fatalf("Scale out from normal use. Expect %d open, got %d", 10, sp.numOpened)
	}
	sp.mu.Unlock()

	<-time.After(sampleInterval)
	for _, sh := range shs[:7] {
		sh.recycle()
	}

	waitFor(t, func() error {
		sp.mu.Lock()
		defer sp.mu.Unlock()
		if sp.numOpened != 7 {
			return fmt.Errorf("Keep extra MaxIdle sessions. Expect %d open, got %d", 7, sp.numOpened)
		}
		return nil
	})

	for _, sh := range shs[7:] {
		sh.recycle()
	}
	waitFor(t, func() error {
		sp.mu.Lock()
		defer sp.mu.Unlock()
		if sp.numOpened != minOpened {
			return fmt.Errorf("Scale down. Expect %d open, got %d", minOpened, sp.numOpened)
		}
		return nil
	})
}

// Tests that maintainer creates up to MinOpened connections.
//
// Historical context: This test also checks that a low
// healthCheckSampleInterval does not prevent it from opening connections.
1006// See: https://github.com/googleapis/google-cloud-go/issues/1259 1007func TestMaintainer_CreatesSessions(t *testing.T) { 1008 t.Parallel() 1009 1010 rawServerStub := testutil.NewMockCloudSpannerClient(t) 1011 serverClientMock := testutil.FuncMock{MockCloudSpannerClient: rawServerStub} 1012 serverClientMock.CreateSessionFn = func(c context.Context, r *sppb.CreateSessionRequest, opts ...grpc.CallOption) (*sppb.Session, error) { 1013 time.Sleep(10 * time.Millisecond) 1014 return rawServerStub.CreateSession(c, r, opts...) 1015 } 1016 spc := SessionPoolConfig{ 1017 MinOpened: 10, 1018 MaxIdle: 10, 1019 healthCheckSampleInterval: time.Millisecond, 1020 getRPCClient: func() (sppb.SpannerClient, error) { 1021 return &serverClientMock, nil 1022 }, 1023 } 1024 db := "mockdb" 1025 sp, err := newSessionPool(db, spc, nil) 1026 if err != nil { 1027 t.Fatalf("cannot create session pool: %v", err) 1028 } 1029 client := Client{ 1030 database: db, 1031 idleSessions: sp, 1032 } 1033 defer func() { 1034 client.Close() 1035 sp.hc.close() 1036 sp.close() 1037 }() 1038 1039 timeoutAmt := 2 * time.Second 1040 timeout := time.After(timeoutAmt) 1041 var numOpened uint64 1042loop: 1043 for { 1044 select { 1045 case <-timeout: 1046 t.Fatalf("timed out after %v, got %d session(s), want %d", timeoutAmt, numOpened, spc.MinOpened) 1047 default: 1048 sp.mu.Lock() 1049 numOpened = sp.numOpened 1050 sp.mu.Unlock() 1051 if numOpened == 10 { 1052 break loop 1053 } 1054 } 1055 } 1056} 1057 1058func (s1 *session) Equal(s2 *session) bool { 1059 return s1.client == s2.client && 1060 s1.id == s2.id && 1061 s1.pool == s2.pool && 1062 s1.createTime == s2.createTime && 1063 s1.valid == s2.valid && 1064 s1.hcIndex == s2.hcIndex && 1065 s1.idleList == s2.idleList && 1066 s1.nextCheck.Equal(s2.nextCheck) && 1067 s1.checkingHealth == s2.checkingHealth && 1068 testEqual(s1.md, s2.md) && 1069 bytes.Equal(s1.tx, s2.tx) 1070} 1071 1072func waitFor(t *testing.T, assert func() error) { 1073 t.Helper() 1074 timeout 
:= 15 * time.Second 1075 ta := time.After(timeout) 1076 1077 for { 1078 select { 1079 case <-ta: 1080 if err := assert(); err != nil { 1081 t.Fatalf("after %v waiting, got %v", timeout, err) 1082 } 1083 return 1084 default: 1085 } 1086 1087 if err := assert(); err != nil { 1088 // Fail. Let's pause and retry. 1089 time.Sleep(10 * time.Millisecond) 1090 continue 1091 } 1092 1093 return 1094 } 1095} 1096