1/* 2Copyright 2017 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package spanner 18 19import ( 20 "bytes" 21 "container/heap" 22 "context" 23 "fmt" 24 "io/ioutil" 25 "log" 26 "math/rand" 27 "os" 28 "reflect" 29 "strings" 30 "testing" 31 "time" 32 33 . "cloud.google.com/go/spanner/internal/testutil" 34 "google.golang.org/api/iterator" 35 "google.golang.org/genproto/googleapis/rpc/errdetails" 36 sppb "google.golang.org/genproto/googleapis/spanner/v1" 37 "google.golang.org/grpc/codes" 38 "google.golang.org/grpc/status" 39) 40 41func newSessionNotFoundError(name string) error { 42 s := status.Newf(codes.NotFound, "Session not found: Session with id %s not found", name) 43 s, _ = s.WithDetails(&errdetails.ResourceInfo{ResourceType: sessionResourceType, ResourceName: name}) 44 return s.Err() 45} 46 47// TestSessionPoolConfigValidation tests session pool config validation. 48func TestSessionPoolConfigValidation(t *testing.T) { 49 t.Parallel() 50 _, client, teardown := setupMockedTestServer(t) 51 defer teardown() 52 53 for _, test := range []struct { 54 spc SessionPoolConfig 55 err error 56 }{ 57 { 58 SessionPoolConfig{ 59 MinOpened: 10, 60 MaxOpened: 5, 61 }, 62 errMinOpenedGTMaxOpened(5, 10), 63 }, 64 { 65 SessionPoolConfig{ 66 WriteSessions: -0.1, 67 }, 68 errWriteFractionOutOfRange(-0.1), 69 }, 70 { 71 SessionPoolConfig{ 72 WriteSessions: 2.0, 73 }, 74 errWriteFractionOutOfRange(2.0), 75 }, 76 { 77 SessionPoolConfig{ 78 HealthCheckWorkers: -1, 79 }, 80 errHealthCheckWorkersNegative(-1), 81 }, 82 { 83 SessionPoolConfig{ 84 HealthCheckInterval: -time.Second, 85 }, 86 errHealthCheckIntervalNegative(-time.Second), 87 }, 88 } { 89 if _, err := newSessionPool(client.sc, test.spc); !testEqual(err, test.err) { 90 t.Fatalf("want %v, got %v", test.err, err) 91 } 92 } 93} 94 95// TestSessionCreation tests session creation during sessionPool.Take(). 96func TestSessionCreation(t *testing.T) { 97 t.Parallel() 98 ctx := context.Background() 99 _, client, teardown := setupMockedTestServer(t) 100 defer teardown() 101 sp := client.idleSessions 102 103 // Take three sessions from session pool, this should trigger session pool 104 // to create SessionPoolConfig.incStep new sessions. 105 shs := make([]*sessionHandle, 3) 106 for i := 0; i < len(shs); i++ { 107 var err error 108 shs[i], err = sp.take(ctx) 109 if err != nil { 110 t.Fatalf("failed to get session(%v): %v", i, err) 111 } 112 } 113 // Wait until session creation has seized. 114 timeout := time.After(4 * time.Second) 115 var numBeingCreated uint64 116loop: 117 for { 118 sp.mu.Lock() 119 numBeingCreated = sp.createReqs 120 sp.mu.Unlock() 121 select { 122 case <-timeout: 123 t.Fatalf("timed out, still %d session(s) being created, want %d", numBeingCreated, 0) 124 default: 125 if numBeingCreated == 0 { 126 break loop 127 } 128 } 129 } 130 for _, sh := range shs { 131 if _, err := sh.getClient().GetSession(context.Background(), &sppb.GetSessionRequest{ 132 Name: sh.getID(), 133 }); err != nil { 134 t.Fatalf("error getting expected session from server: %v", err) 135 } 136 } 137 // Verify that created sessions are recorded correctly in session pool. 138 sp.mu.Lock() 139 if sp.numOpened != sp.incStep { 140 t.Fatalf("session pool reports %v open sessions, want %v", sp.numOpened, sp.incStep) 141 } 142 if sp.createReqs != 0 { 143 t.Fatalf("session pool reports %v session create requests, want 0", int(sp.createReqs)) 144 } 145 sp.mu.Unlock() 146 // Verify that created sessions are tracked correctly by healthcheck queue. 147 hc := sp.hc 148 hc.mu.Lock() 149 if uint64(hc.queue.Len()) != sp.incStep { 150 t.Fatalf("healthcheck queue length = %v, want %v", hc.queue.Len(), sp.incStep) 151 } 152 hc.mu.Unlock() 153} 154 155// TestLIFOSessionOrder tests if session pool hand out sessions in LIFO order. 156func TestLIFOSessionOrder(t *testing.T) { 157 t.Parallel() 158 ctx := context.Background() 159 _, client, teardown := setupMockedTestServerWithConfig(t, 160 ClientConfig{ 161 SessionPoolConfig: SessionPoolConfig{ 162 MaxOpened: 3, 163 MinOpened: 3, 164 }, 165 }) 166 defer teardown() 167 sp := client.idleSessions 168 // Create/take three sessions and recycle them. 169 shs, shsIDs := make([]*sessionHandle, 3), make([]string, 3) 170 for i := 0; i < len(shs); i++ { 171 var err error 172 if shs[i], err = sp.take(ctx); err != nil { 173 t.Fatalf("failed to take session(%v): %v", i, err) 174 } 175 shsIDs[i] = shs[i].getID() 176 } 177 for i := 0; i < len(shs); i++ { 178 shs[i].recycle() 179 } 180 for i := 2; i >= 0; i-- { 181 sh, err := sp.take(ctx) 182 if err != nil { 183 t.Fatalf("cannot take session from session pool: %v", err) 184 } 185 // check, if sessions returned in LIFO order. 186 if wantID, gotID := shsIDs[i], sh.getID(); wantID != gotID { 187 t.Fatalf("got session with id: %v, want: %v", gotID, wantID) 188 } 189 } 190} 191 192// TestLIFOTakeWriteSessionOrder tests if write session pool hand out sessions in LIFO order. 193func TestLIFOTakeWriteSessionOrder(t *testing.T) { 194 t.Parallel() 195 ctx := context.Background() 196 _, client, teardown := setupMockedTestServerWithConfig(t, 197 ClientConfig{ 198 SessionPoolConfig: SessionPoolConfig{ 199 MaxOpened: 3, 200 MinOpened: 3, 201 // Set the write fraction to 0 to ensure the write sessions are 202 // only created on demand, which will guarantee the exact order 203 // in which we receive the sessions. 204 WriteSessions: 0, 205 }, 206 }) 207 defer teardown() 208 sp := client.idleSessions 209 // Create/take three sessions and recycle them. 210 shs, shsIDs := make([]*sessionHandle, 3), make([]string, 3) 211 for i := 0; i < len(shs); i++ { 212 var err error 213 if shs[i], err = sp.takeWriteSession(ctx); err != nil { 214 t.Fatalf("failed to take session(%v): %v", i, err) 215 } 216 shsIDs[i] = shs[i].getID() 217 } 218 for i := 0; i < len(shs); i++ { 219 shs[i].recycle() 220 } 221 for i := 2; i >= 0; i-- { 222 ws, err := sp.takeWriteSession(ctx) 223 if err != nil { 224 t.Fatalf("cannot take session from session pool: %v", err) 225 } 226 // check, if write sessions returned in LIFO order. 227 if wantID, gotID := shsIDs[i], ws.getID(); wantID != gotID { 228 t.Fatalf("got session with id: %v, want: %v", gotID, wantID) 229 } 230 } 231} 232 233// TestTakeFromIdleList tests taking sessions from session pool's idle list. 234func TestTakeFromIdleList(t *testing.T) { 235 t.Parallel() 236 ctx := context.Background() 237 238 // Make sure maintainer keeps the idle sessions. 239 server, client, teardown := setupMockedTestServerWithConfig(t, 240 ClientConfig{ 241 SessionPoolConfig: SessionPoolConfig{MaxIdle: 10, MaxOpened: 10}, 242 }) 243 defer teardown() 244 sp := client.idleSessions 245 246 // Take ten sessions from session pool and recycle them. 247 shs := make([]*sessionHandle, 10) 248 for i := 0; i < len(shs); i++ { 249 var err error 250 shs[i], err = sp.take(ctx) 251 if err != nil { 252 t.Fatalf("failed to get session(%v): %v", i, err) 253 } 254 } 255 // Make sure it's sampled once before recycling, otherwise it will be 256 // cleaned up. 257 <-time.After(sp.SessionPoolConfig.healthCheckSampleInterval) 258 for i := 0; i < len(shs); i++ { 259 shs[i].recycle() 260 } 261 // Further session requests from session pool won't cause mockclient to 262 // create more sessions. 263 wantSessions := server.TestSpanner.DumpSessions() 264 // Take ten sessions from session pool again, this time all sessions should 265 // come from idle list. 266 gotSessions := map[string]bool{} 267 for i := 0; i < len(shs); i++ { 268 sh, err := sp.take(ctx) 269 if err != nil { 270 t.Fatalf("cannot take session from session pool: %v", err) 271 } 272 gotSessions[sh.getID()] = true 273 } 274 if len(gotSessions) != 10 { 275 t.Fatalf("got %v unique sessions, want 10", len(gotSessions)) 276 } 277 if !testEqual(gotSessions, wantSessions) { 278 t.Fatalf("got sessions: %v, want %v", gotSessions, wantSessions) 279 } 280} 281 282// TesttakeWriteSessionFromIdleList tests taking write sessions from session 283// pool's idle list. 284func TestTakeWriteSessionFromIdleList(t *testing.T) { 285 t.Parallel() 286 ctx := context.Background() 287 288 // Make sure maintainer keeps the idle sessions. 289 server, client, teardown := setupMockedTestServerWithConfig(t, 290 ClientConfig{ 291 SessionPoolConfig: SessionPoolConfig{MaxIdle: 10, MaxOpened: 10}, 292 }) 293 defer teardown() 294 sp := client.idleSessions 295 296 // Take ten sessions from session pool and recycle them. 297 shs := make([]*sessionHandle, 10) 298 for i := 0; i < len(shs); i++ { 299 var err error 300 shs[i], err = sp.takeWriteSession(ctx) 301 if err != nil { 302 t.Fatalf("failed to get session(%v): %v", i, err) 303 } 304 } 305 // Make sure it's sampled once before recycling, otherwise it will be 306 // cleaned up. 307 <-time.After(sp.SessionPoolConfig.healthCheckSampleInterval) 308 for i := 0; i < len(shs); i++ { 309 shs[i].recycle() 310 } 311 // Further session requests from session pool won't cause mockclient to 312 // create more sessions. 313 wantSessions := server.TestSpanner.DumpSessions() 314 // Take ten sessions from session pool again, this time all sessions should 315 // come from idle list. 316 gotSessions := map[string]bool{} 317 for i := 0; i < len(shs); i++ { 318 sh, err := sp.takeWriteSession(ctx) 319 if err != nil { 320 t.Fatalf("cannot take session from session pool: %v", err) 321 } 322 gotSessions[sh.getID()] = true 323 } 324 if len(gotSessions) != 10 { 325 t.Fatalf("got %v unique sessions, want 10", len(gotSessions)) 326 } 327 if !testEqual(gotSessions, wantSessions) { 328 t.Fatalf("got sessions: %v, want %v", gotSessions, wantSessions) 329 } 330} 331 332// TestTakeFromIdleListChecked tests taking sessions from session pool's idle 333// list, but with a extra ping check. 334func TestTakeFromIdleListChecked(t *testing.T) { 335 t.Parallel() 336 ctx := context.Background() 337 338 // Make sure maintainer keeps the idle sessions. 339 server, client, teardown := setupMockedTestServerWithConfig(t, 340 ClientConfig{ 341 SessionPoolConfig: SessionPoolConfig{ 342 WriteSessions: 0.0, 343 MaxIdle: 1, 344 HealthCheckInterval: 50 * time.Millisecond, 345 healthCheckSampleInterval: 10 * time.Millisecond, 346 }, 347 }) 348 defer teardown() 349 sp := client.idleSessions 350 351 // Stop healthcheck workers to simulate slow pings. 352 sp.hc.close() 353 354 // Create a session and recycle it. 355 sh, err := sp.take(ctx) 356 if err != nil { 357 t.Fatalf("failed to get session: %v", err) 358 } 359 360 // Wait until all session creation has finished. 361 waitFor(t, func() error { 362 sp.mu.Lock() 363 // WriteSessions = 0, so we only have to check for read sessions. 364 numOpened := uint64(sp.idleList.Len()) 365 sp.mu.Unlock() 366 if numOpened < sp.SessionPoolConfig.incStep-1 { 367 return fmt.Errorf("creation not yet finished") 368 } 369 return nil 370 }) 371 372 // Force ping during the first take() by setting check time to the past. 373 sp.hc.mu.Lock() 374 sh.session.nextCheck = time.Now().Add(-time.Minute) 375 sp.hc.mu.Unlock() 376 wantSid := sh.getID() 377 sh.recycle() 378 379 // Two back-to-back session requests, both of them should return the same 380 // session created before, but only the first of them should trigger a session ping. 381 for i := 0; i < 2; i++ { 382 // Take the session from the idle list and recycle it. 383 sh, err = sp.take(ctx) 384 if err != nil { 385 t.Fatalf("%v - failed to get session: %v", i, err) 386 } 387 if gotSid := sh.getID(); gotSid != wantSid { 388 t.Fatalf("%v - got session id: %v, want %v", i, gotSid, wantSid) 389 } 390 391 // The two back-to-back session requests shouldn't trigger any session 392 // pings because sessionPool.Take reschedules the next healthcheck. 393 if got, want := server.TestSpanner.DumpPings(), ([]string{wantSid}); !testEqual(got, want) { 394 t.Fatalf("%v - got ping session requests: %v, want %v", i, got, want) 395 } 396 sh.recycle() 397 } 398 399 // Inject session error to server stub, and take the session from the 400 // session pool, the old session should be destroyed and the session pool 401 // will create a new session. 402 server.TestSpanner.PutExecutionTime(MethodExecuteSql, 403 SimulatedExecutionTime{ 404 Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}, 405 }) 406 407 // Force ping by setting check time in the past. 408 s := sp.idleList.Front().Value.(*session) 409 s.nextCheck = time.Now().Add(-time.Minute) 410 411 // take will take the idle session. Then it will send a GetSession request 412 // to check if it's healthy. It'll discover that it's not healthy 413 // (NotFound) and drop it. No new session will be created as MinOpened=0. 414 sh, err = sp.take(ctx) 415 if err != nil { 416 t.Fatalf("failed to get session: %v", err) 417 } 418 ds := server.TestSpanner.DumpSessions() 419 if g, w := uint64(len(ds)), sp.incStep-1; g != w { 420 t.Fatalf("number of sessions from mock server mismatch\nGot: %v\nWant: %v\n", g, w) 421 } 422 if sh.getID() == wantSid { 423 t.Fatalf("sessionPool.Take still returns the same session %v, want it to create a new one", wantSid) 424 } 425} 426 427// TestTakeFromIdleWriteListChecked tests taking sessions from session pool's 428// idle list, but with a extra ping check. 429func TestTakeFromIdleWriteListChecked(t *testing.T) { 430 t.Parallel() 431 ctx := context.Background() 432 433 // Make sure maintainer keeps the idle sessions. 434 server, client, teardown := setupMockedTestServerWithConfig(t, 435 ClientConfig{ 436 SessionPoolConfig: SessionPoolConfig{ 437 MaxIdle: 1, 438 HealthCheckInterval: 50 * time.Millisecond, 439 healthCheckSampleInterval: 10 * time.Millisecond, 440 }, 441 }) 442 defer teardown() 443 sp := client.idleSessions 444 445 // Stop healthcheck workers to simulate slow pings. 446 sp.hc.close() 447 448 // Create a session and recycle it. 449 sh, err := sp.takeWriteSession(ctx) 450 if err != nil { 451 t.Fatalf("failed to get session: %v", err) 452 } 453 wantSid := sh.getID() 454 // Set the next check in the past to ensure the next take() call will 455 // trigger a health check. 456 sh.session.nextCheck = time.Now().Add(-time.Minute) 457 sh.recycle() 458 459 // Two back-to-back session requests, both of them should return the same 460 // session created before and only the first of them should trigger a 461 // session ping. 462 for i := 0; i < 2; i++ { 463 // Take the session from the idle list and recycle it. 464 sh, err = sp.takeWriteSession(ctx) 465 if err != nil { 466 t.Fatalf("%v - failed to get session: %v", i, err) 467 } 468 if gotSid := sh.getID(); gotSid != wantSid { 469 t.Fatalf("%v - got session id: %v, want %v", i, gotSid, wantSid) 470 } 471 // The two back-to-back session requests shouldn't trigger any session 472 // pings because sessionPool.Take reschedules the next healthcheck. 473 if got, want := server.TestSpanner.DumpPings(), ([]string{wantSid}); !testEqual(got, want) { 474 t.Fatalf("%v - got ping session requests: %v, want %v", i, got, want) 475 } 476 sh.recycle() 477 } 478 479 // Inject session error to mockclient, and take the session from the 480 // session pool, the old session should be destroyed and the session pool 481 // will create a new session. 482 server.TestSpanner.PutExecutionTime(MethodExecuteSql, 483 SimulatedExecutionTime{ 484 Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}, 485 }) 486 487 // Force ping by setting check time in the past. 488 s := sp.idleList.Front().Value.(*session) 489 s.nextCheck = time.Now().Add(-time.Minute) 490 491 sh, err = sp.takeWriteSession(ctx) 492 if err != nil { 493 t.Fatalf("failed to get session: %v", err) 494 } 495 ds := server.TestSpanner.DumpSessions() 496 if g, w := uint64(len(ds)), sp.incStep-1; g != w { 497 t.Fatalf("number of sessions from mock server mismatch\nGot: %v\nWant: %v\n", g, w) 498 } 499 if sh.getID() == wantSid { 500 t.Fatalf("sessionPool.Take still returns the same session %v, want it to create a new one", wantSid) 501 } 502} 503 504// TestSessionLeak tests leaking a session and getting the stack of the 505// goroutine that leaked it. 506func TestSessionLeak(t *testing.T) { 507 t.Parallel() 508 ctx := context.Background() 509 510 _, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{ 511 SessionPoolConfig: SessionPoolConfig{ 512 TrackSessionHandles: true, 513 MinOpened: 0, 514 MaxOpened: 1, 515 }, 516 }) 517 defer teardown() 518 519 // Execute a query without calling rowIterator.Stop. This will cause the 520 // session not to be returned to the pool. 521 single := client.Single() 522 iter := single.Query(ctx, NewStatement(SelectFooFromBar)) 523 for { 524 _, err := iter.Next() 525 if err == iterator.Done { 526 break 527 } 528 if err != nil { 529 t.Fatalf("Got unexpected error while iterating results: %v\n", err) 530 } 531 } 532 // The session should not have been returned to the pool. 533 if g, w := client.idleSessions.idleList.Len(), 0; g != w { 534 t.Fatalf("Idle sessions count mismatch\nGot: %d\nWant: %d\n", g, w) 535 } 536 // The checked out session should contain a stack trace. 537 if single.sh.stack == nil { 538 t.Fatalf("Missing stacktrace from session handle") 539 } 540 stack := fmt.Sprintf("%s", single.sh.stack) 541 testMethod := "TestSessionLeak" 542 if !strings.Contains(stack, testMethod) { 543 t.Fatalf("Stacktrace does not contain '%s'\nGot: %s", testMethod, stack) 544 } 545 // Return the session to the pool. 546 iter.Stop() 547 // The stack should now have been removed from the session handle. 548 if single.sh.stack != nil { 549 t.Fatalf("Got unexpected stacktrace in session handle: %s", single.sh.stack) 550 } 551 552 // Do another query and hold on to the session. 553 single = client.Single() 554 iter = single.Query(ctx, NewStatement(SelectFooFromBar)) 555 for { 556 _, err := iter.Next() 557 if err == iterator.Done { 558 break 559 } 560 if err != nil { 561 t.Fatalf("Got unexpected error while iterating results: %v\n", err) 562 } 563 } 564 // Try to do another query. This will fail as MaxOpened=1. 565 ctxWithTimeout, cancel := context.WithTimeout(ctx, time.Millisecond*10) 566 defer cancel() 567 single2 := client.Single() 568 iter2 := single2.Query(ctxWithTimeout, NewStatement(SelectFooFromBar)) 569 _, gotErr := iter2.Next() 570 wantErr := client.idleSessions.errGetSessionTimeoutWithTrackedSessionHandles(codes.DeadlineExceeded) 571 // The error should contain the stacktraces of all the checked out 572 // sessions. 573 if !testEqual(gotErr, wantErr) { 574 t.Fatalf("Error mismatch on iterating result set.\nGot: %v\nWant: %v\n", gotErr, wantErr) 575 } 576 if !strings.Contains(gotErr.Error(), testMethod) { 577 t.Fatalf("Error does not contain '%s'\nGot: %s", testMethod, gotErr.Error()) 578 } 579 // Close iterators to check sessions back into the pool before closing. 580 iter2.Stop() 581 iter.Stop() 582} 583 584// TestMaxOpenedSessions tests max open sessions constraint. 585func TestMaxOpenedSessions(t *testing.T) { 586 t.Parallel() 587 ctx := context.Background() 588 _, client, teardown := setupMockedTestServerWithConfig(t, 589 ClientConfig{ 590 SessionPoolConfig: SessionPoolConfig{ 591 MaxOpened: 1, 592 }, 593 }) 594 defer teardown() 595 sp := client.idleSessions 596 597 sh1, err := sp.take(ctx) 598 if err != nil { 599 t.Fatalf("cannot take session from session pool: %v", err) 600 } 601 602 // Session request will timeout due to the max open sessions constraint. 603 ctx2, cancel := context.WithTimeout(ctx, 10*time.Millisecond) 604 defer cancel() 605 _, gotErr := sp.take(ctx2) 606 if wantErr := sp.errGetBasicSessionTimeout(codes.DeadlineExceeded); !testEqual(gotErr, wantErr) { 607 t.Fatalf("the second session retrival returns error %v, want %v", gotErr, wantErr) 608 } 609 doneWaiting := make(chan struct{}) 610 go func() { 611 // Destroy the first session to allow the next session request to 612 // proceed. 613 <-doneWaiting 614 sh1.destroy() 615 }() 616 617 go func() { 618 // Wait a short random time before destroying the session handle. 619 <-time.After(10 * time.Millisecond) 620 close(doneWaiting) 621 }() 622 // Now session request can be processed because the first session will be 623 // destroyed. 624 ctx3, cancel := context.WithTimeout(ctx, time.Second) 625 defer cancel() 626 sh2, err := sp.take(ctx3) 627 if err != nil { 628 t.Fatalf("after the first session is destroyed, session retrival still returns error %v, want nil", err) 629 } 630 if !sh2.session.isValid() || sh2.getID() == "" { 631 t.Fatalf("got invalid session: %v", sh2.session) 632 } 633} 634 635// TestMinOpenedSessions tests min open session constraint. 636func TestMinOpenedSessions(t *testing.T) { 637 t.Parallel() 638 ctx := context.Background() 639 _, client, teardown := setupMockedTestServerWithConfig(t, 640 ClientConfig{ 641 SessionPoolConfig: SessionPoolConfig{ 642 MinOpened: 1, 643 healthCheckSampleInterval: time.Millisecond, 644 }, 645 }) 646 defer teardown() 647 sp := client.idleSessions 648 649 // Take ten sessions from session pool and recycle them. 650 var ss []*session 651 var shs []*sessionHandle 652 for i := 0; i < 10; i++ { 653 sh, err := sp.take(ctx) 654 if err != nil { 655 t.Fatalf("failed to get session(%v): %v", i, err) 656 } 657 ss = append(ss, sh.session) 658 shs = append(shs, sh) 659 sh.recycle() 660 } 661 for _, sh := range shs { 662 sh.recycle() 663 } 664 665 // Simulate session expiration. 666 for _, s := range ss { 667 s.destroy(true) 668 } 669 670 // Wait until the maintainer has had a chance to replenish the pool. 671 for i := 0; i < 10; i++ { 672 sp.mu.Lock() 673 if sp.numOpened > 0 { 674 sp.mu.Unlock() 675 break 676 } 677 sp.mu.Unlock() 678 <-time.After(sp.healthCheckSampleInterval) 679 } 680 sp.mu.Lock() 681 defer sp.mu.Unlock() 682 // There should be still one session left in either the idle list or in one 683 // of the other opened states due to the min open sessions constraint. 684 if (sp.idleList.Len() + 685 sp.idleWriteList.Len() + 686 int(sp.prepareReqs) + 687 int(sp.createReqs)) != 1 { 688 t.Fatalf( 689 "got %v sessions in idle lists, want 1. Opened: %d, read: %d, "+ 690 "write: %d, in preparation: %d, in creation: %d", 691 sp.idleList.Len()+sp.idleWriteList.Len(), sp.numOpened, 692 sp.idleList.Len(), sp.idleWriteList.Len(), sp.prepareReqs, 693 sp.createReqs) 694 } 695} 696 697// TestMaxBurst tests max burst constraint. 698func TestMaxBurst(t *testing.T) { 699 t.Parallel() 700 ctx := context.Background() 701 server, client, teardown := setupMockedTestServerWithConfig(t, 702 ClientConfig{ 703 SessionPoolConfig: SessionPoolConfig{ 704 MaxBurst: 1, 705 }, 706 }) 707 defer teardown() 708 sp := client.idleSessions 709 710 // Will cause session creation RPC to be retried forever. 711 server.TestSpanner.PutExecutionTime(MethodBatchCreateSession, 712 SimulatedExecutionTime{ 713 Errors: []error{status.Errorf(codes.Unavailable, "try later")}, 714 KeepError: true, 715 }) 716 717 // This session request will never finish until the injected error is 718 // cleared. 719 go sp.take(ctx) 720 721 // Poll for the execution of the first session request. 722 for { 723 sp.mu.Lock() 724 cr := sp.createReqs 725 sp.mu.Unlock() 726 if cr == 0 { 727 <-time.After(time.Second) 728 continue 729 } 730 // The first session request is being executed. 731 break 732 } 733 734 ctx2, cancel := context.WithTimeout(ctx, time.Second) 735 defer cancel() 736 _, gotErr := sp.take(ctx2) 737 738 // Since MaxBurst == 1, the second session request should block. 739 if wantErr := sp.errGetBasicSessionTimeout(codes.DeadlineExceeded); !testEqual(gotErr, wantErr) { 740 t.Fatalf("session retrival returns error %v, want %v", gotErr, wantErr) 741 } 742 743 // Let the first session request succeed. 744 server.TestSpanner.Freeze() 745 server.TestSpanner.PutExecutionTime(MethodBatchCreateSession, SimulatedExecutionTime{}) 746 server.TestSpanner.Unfreeze() 747 748 // Now new session request can proceed because the first session request will eventually succeed. 749 sh, err := sp.take(ctx) 750 if err != nil { 751 t.Fatalf("session retrival returns error %v, want nil", err) 752 } 753 if !sh.session.isValid() || sh.getID() == "" { 754 t.Fatalf("got invalid session: %v", sh.session) 755 } 756} 757 758// TestSessionRecycle tests recycling sessions. 759func TestSessionRecycle(t *testing.T) { 760 t.Parallel() 761 ctx := context.Background() 762 // Set MaxBurst=MinOpened to prevent additional sessions to be created 763 // while session pool initialization is still running. 764 _, client, teardown := setupMockedTestServerWithConfig(t, 765 ClientConfig{ 766 SessionPoolConfig: SessionPoolConfig{ 767 MinOpened: 1, 768 MaxIdle: 5, 769 MaxBurst: 1, 770 }, 771 }) 772 defer teardown() 773 sp := client.idleSessions 774 775 // Test session is correctly recycled and reused. 776 for i := 0; i < 20; i++ { 777 s, err := sp.take(ctx) 778 if err != nil { 779 t.Fatalf("cannot get the session %v: %v", i, err) 780 } 781 s.recycle() 782 } 783 784 sp.mu.Lock() 785 defer sp.mu.Unlock() 786 // The session pool should only contain 1 session, as there is no minimum 787 // configured. In addition, there has never been more than one session in 788 // use at any time, so there's no need for the session pool to create a 789 // second session. The session has also been in use all the time, so there 790 // also no reason for the session pool to delete the session. 791 if sp.numOpened != 1 { 792 t.Fatalf("Expect session pool size 1, got %d", sp.numOpened) 793 } 794} 795 796// TestSessionDestroy tests destroying sessions. 797func TestSessionDestroy(t *testing.T) { 798 t.Parallel() 799 ctx := context.Background() 800 _, client, teardown := setupMockedTestServerWithConfig(t, 801 ClientConfig{ 802 SessionPoolConfig: SessionPoolConfig{ 803 MinOpened: 1, 804 MaxBurst: 1, 805 }, 806 }) 807 defer teardown() 808 sp := client.idleSessions 809 810 // Creating a session pool with MinSessions=1 will automatically start the 811 // creation of 1 session when the session pool is created. As MaxBurst=1, 812 // the session pool will never create more than 1 session at a time, so the 813 // take() method will wait if the initial session has not yet been created. 814 sh, err := sp.take(ctx) 815 if err != nil { 816 t.Fatalf("cannot get session from session pool: %v", err) 817 } 818 s := sh.session 819 sh.recycle() 820 if d := s.destroy(true); d || !s.isValid() { 821 // Session should be remaining because of min open sessions constraint. 822 t.Fatalf("session %s invalid, want it to stay alive. (destroy in expiration mode, success: %v)", s.id, d) 823 } 824 if d := s.destroy(false); !d || s.isValid() { 825 // Session should be destroyed. 826 t.Fatalf("failed to destroy session %s. (destroy in default mode, success: %v)", s.id, d) 827 } 828} 829 830// TestHcHeap tests heap operation on top of hcHeap. 831func TestHcHeap(t *testing.T) { 832 in := []*session{ 833 {nextCheck: time.Unix(10, 0)}, 834 {nextCheck: time.Unix(0, 5)}, 835 {nextCheck: time.Unix(1, 8)}, 836 {nextCheck: time.Unix(11, 7)}, 837 {nextCheck: time.Unix(6, 3)}, 838 } 839 want := []*session{ 840 {nextCheck: time.Unix(1, 8), hcIndex: 0}, 841 {nextCheck: time.Unix(6, 3), hcIndex: 1}, 842 {nextCheck: time.Unix(8, 2), hcIndex: 2}, 843 {nextCheck: time.Unix(10, 0), hcIndex: 3}, 844 {nextCheck: time.Unix(11, 7), hcIndex: 4}, 845 } 846 hh := hcHeap{} 847 for _, s := range in { 848 heap.Push(&hh, s) 849 } 850 // Change top of the heap and do a adjustment. 851 hh.sessions[0].nextCheck = time.Unix(8, 2) 852 heap.Fix(&hh, 0) 853 for idx := 0; hh.Len() > 0; idx++ { 854 got := heap.Pop(&hh).(*session) 855 want[idx].hcIndex = -1 856 if !testEqual(got, want[idx]) { 857 t.Fatalf("%v: heap.Pop returns %v, want %v", idx, got, want[idx]) 858 } 859 } 860} 861 862// TestHealthCheckScheduler tests if healthcheck workers can schedule and 863// perform healthchecks properly. 864func TestHealthCheckScheduler(t *testing.T) { 865 t.Parallel() 866 ctx := context.Background() 867 server, client, teardown := setupMockedTestServerWithConfig(t, 868 ClientConfig{ 869 SessionPoolConfig: SessionPoolConfig{ 870 HealthCheckInterval: 50 * time.Millisecond, 871 healthCheckSampleInterval: 10 * time.Millisecond, 872 }, 873 }) 874 defer teardown() 875 sp := client.idleSessions 876 877 // Create 50 sessions. 878 for i := 0; i < 50; i++ { 879 _, err := sp.take(ctx) 880 if err != nil { 881 t.Fatalf("cannot get session from session pool: %v", err) 882 } 883 } 884 885 // Make sure we start with a ping history to avoid that the first 886 // sessions that were created have not already exceeded the maximum 887 // number of pings. 888 server.TestSpanner.ClearPings() 889 // Wait for 10-30 pings per session. 890 waitFor(t, func() error { 891 // Only check actually live sessions and ignore any sessions the 892 // session pool may have deleted in the meantime. 893 liveSessions := server.TestSpanner.DumpSessions() 894 dp := server.TestSpanner.DumpPings() 895 gotPings := map[string]int64{} 896 for _, p := range dp { 897 gotPings[p]++ 898 } 899 for s := range liveSessions { 900 want := int64(20) 901 if got := gotPings[s]; got < want/2 || got > want+want/2 { 902 // This is an unnacceptable amount of pings. 903 return fmt.Errorf("got %v healthchecks on session %v, want it between (%v, %v)", got, s, want/2, want+want/2) 904 } 905 } 906 return nil 907 }) 908} 909 910// TestHealthCheck_FirstHealthCheck tests if the first healthcheck scheduling 911// works properly. 912func TestHealthCheck_FirstHealthCheck(t *testing.T) { 913 t.Parallel() 914 _, client, teardown := setupMockedTestServerWithConfig(t, 915 ClientConfig{ 916 SessionPoolConfig: SessionPoolConfig{ 917 MaxOpened: 0, 918 MinOpened: 0, 919 HealthCheckInterval: 50 * time.Minute, 920 }, 921 }) 922 defer teardown() 923 sp := client.idleSessions 924 925 now := time.Now() 926 start := now.Add(time.Duration(float64(sp.hc.interval) * 0.2)) 927 // A second is added to avoid the edge case. 928 end := now.Add(time.Duration(float64(sp.hc.interval)*1.1) + time.Second) 929 930 s := &session{} 931 sp.hc.scheduledHCLocked(s) 932 933 if s.nextCheck.Before(start) || s.nextCheck.After(end) { 934 t.Fatalf("The first healthcheck schedule is not in the correct range: %v", s.nextCheck) 935 } 936 if !s.firstHCDone { 937 t.Fatal("The flag 'firstHCDone' should be set to true after the first healthcheck.") 938 } 939} 940 941// TestHealthCheck_NonFirstHealthCheck tests if the scheduling after the first 942// health check works properly. 943func TestHealthCheck_NonFirstHealthCheck(t *testing.T) { 944 t.Parallel() 945 _, client, teardown := setupMockedTestServerWithConfig(t, 946 ClientConfig{ 947 SessionPoolConfig: SessionPoolConfig{ 948 MaxOpened: 0, 949 MinOpened: 0, 950 HealthCheckInterval: 50 * time.Minute, 951 }, 952 }) 953 defer teardown() 954 sp := client.idleSessions 955 956 now := time.Now() 957 start := now.Add(time.Duration(float64(sp.hc.interval) * 0.9)) 958 // A second is added to avoid the edge case. 959 end := now.Add(time.Duration(float64(sp.hc.interval)*1.1) + time.Second) 960 961 s := &session{firstHCDone: true} 962 sp.hc.scheduledHCLocked(s) 963 964 if s.nextCheck.Before(start) || s.nextCheck.After(end) { 965 t.Fatalf("The non-first healthcheck schedule is not in the correct range: %v", s.nextCheck) 966 } 967} 968 969// Tests that a fractions of sessions are prepared for write by health checker. 970func TestWriteSessionsPrepared(t *testing.T) { 971 t.Parallel() 972 ctx := context.Background() 973 _, client, teardown := setupMockedTestServerWithConfig(t, 974 ClientConfig{ 975 SessionPoolConfig: SessionPoolConfig{ 976 WriteSessions: 0.5, 977 MaxIdle: 200, 978 HealthCheckInterval: time.Nanosecond, 979 }, 980 }) 981 defer teardown() 982 sp := client.idleSessions 983 984 shs := make([]*sessionHandle, 100) 985 var err error 986 for i := 0; i < 100; i++ { 987 shs[i], err = sp.take(ctx) 988 if err != nil { 989 t.Fatalf("cannot get session from session pool: %v", err) 990 } 991 } 992 // Now there are 100 sessions in the pool. Release them. 993 for _, sh := range shs { 994 sh.recycle() 995 } 996 997 // Take 50 write sessions. The write sessions will be taken from either the 998 // list of prepared sessions (idleWriteList), or they will be prepared 999 // during the takeWriteSession method. 1000 wshs := make([]*sessionHandle, 50) 1001 for i := 0; i < 50; i++ { 1002 wshs[i], err = sp.takeWriteSession(ctx) 1003 if err != nil { 1004 t.Fatalf("cannot get session from session pool: %v", err) 1005 } 1006 if wshs[i].getTransactionID() == nil { 1007 t.Fatalf("got nil transaction id from session pool") 1008 } 1009 } 1010 // Return the session to the pool. 1011 for _, sh := range wshs { 1012 sh.recycle() 1013 } 1014 1015 // Now force creation of 100 more sessions. 1016 shs = make([]*sessionHandle, 200) 1017 for i := 0; i < 200; i++ { 1018 shs[i], err = sp.take(ctx) 1019 if err != nil { 1020 t.Fatalf("cannot get session from session pool: %v", err) 1021 } 1022 } 1023 1024 // Now there are 200 sessions in the pool. Release them. 1025 for _, sh := range shs { 1026 sh.recycle() 1027 } 1028 // The health checker should eventually prepare 100 of the 200 sessions with 1029 // a r/w tx. 1030 waitUntil := time.After(time.Second) 1031 var numWritePrepared int 1032 for numWritePrepared < 100 { 1033 select { 1034 case <-waitUntil: 1035 break 1036 default: 1037 } 1038 sp.mu.Lock() 1039 numWritePrepared = sp.idleWriteList.Len() 1040 sp.mu.Unlock() 1041 } 1042 1043 sp.mu.Lock() 1044 defer sp.mu.Unlock() 1045 if sp.idleWriteList.Len() != 100 { 1046 t.Fatalf("Expect 100 write prepared session, got: %d", sp.idleWriteList.Len()) 1047 } 1048} 1049 1050// TestTakeFromWriteQueue tests that sessionPool.take() returns write prepared 1051// sessions as well. 1052func TestTakeFromWriteQueue(t *testing.T) { 1053 t.Parallel() 1054 ctx := context.Background() 1055 _, client, teardown := setupMockedTestServerWithConfig(t, 1056 ClientConfig{ 1057 SessionPoolConfig: SessionPoolConfig{ 1058 MaxOpened: 1, 1059 WriteSessions: 1.0, 1060 MaxIdle: 1, 1061 HealthCheckInterval: time.Nanosecond, 1062 }, 1063 }) 1064 defer teardown() 1065 sp := client.idleSessions 1066 1067 sh, err := sp.take(ctx) 1068 if err != nil { 1069 t.Fatalf("cannot get session from session pool: %v", err) 1070 } 1071 sh.recycle() 1072 1073 // Wait until the health checker has write-prepared the session. 1074 waitUntil := time.After(time.Second) 1075 var numWritePrepared int 1076 for numWritePrepared == 0 { 1077 select { 1078 case <-waitUntil: 1079 break 1080 default: 1081 } 1082 sp.mu.Lock() 1083 numWritePrepared = sp.idleWriteList.Len() 1084 sp.mu.Unlock() 1085 } 1086 1087 // The session should now be in write queue but take should also return it. 1088 sp.mu.Lock() 1089 if sp.idleWriteList.Len() == 0 { 1090 t.Fatalf("write queue unexpectedly empty") 1091 } 1092 if sp.idleList.Len() != 0 { 1093 t.Fatalf("read queue not empty") 1094 } 1095 sp.mu.Unlock() 1096 sh, err = sp.take(ctx) 1097 if err != nil { 1098 t.Fatalf("cannot get session from session pool: %v", err) 1099 } 1100 sh.recycle() 1101} 1102 1103// The session pool should stop trying to create write-prepared sessions if a 1104// non-transient error occurs while trying to begin a transaction. The 1105// process for preparing write sessions should automatically be re-enabled if 1106// a BeginTransaction call initiated by takeWriteSession succeeds. 1107// 1108// The only exception to the above is that a 'Session not found' error should 1109// cause the session to be removed from the session pool, and it should not 1110// affect the background process of preparing sessions. 1111func TestErrorOnPrepareSession(t *testing.T) { 1112 t.Parallel() 1113 1114 serverErrors := []error{ 1115 status.Errorf(codes.PermissionDenied, "Caller is missing IAM permission spanner.databases.beginOrRollbackReadWriteTransaction on resource"), 1116 status.Errorf(codes.NotFound, `Database not found: projects/<project>/instances/<instance>/databases/<database> resource_type: "type.googleapis.com/google.spanner.admin.database.v1.Database" resource_name: "projects/<project>/instances/<instance>/databases/<database>" description: "Database does not exist."`), 1117 status.Errorf(codes.FailedPrecondition, "Invalid transaction option"), 1118 status.Errorf(codes.Internal, "Unknown server error"), 1119 } 1120 logger := log.New(os.Stderr, "", log.LstdFlags) 1121 for _, serverErr := range serverErrors { 1122 ctx := context.Background() 1123 server, client, teardown := setupMockedTestServerWithConfig(t, 1124 ClientConfig{ 1125 SessionPoolConfig: SessionPoolConfig{ 1126 MinOpened: 10, 1127 MaxOpened: 10, 1128 WriteSessions: 0.5, 1129 HealthCheckInterval: time.Millisecond, 1130 }, 1131 logger: logger, 1132 }) 1133 defer teardown() 1134 // Discard logging until trying to prepare sessions has stopped. 1135 logger.SetOutput(ioutil.Discard) 1136 server.TestSpanner.PutExecutionTime(MethodBeginTransaction, SimulatedExecutionTime{ 1137 Errors: []error{serverErr}, 1138 KeepError: true, 1139 }) 1140 sp := client.idleSessions 1141 1142 // Wait until the health checker has tried to write-prepare a session. 1143 // This will cause the session pool to write some errors to the log that 1144 // preparing sessions failed. 1145 waitUntil := time.After(time.Second) 1146 var prepareDisabled bool 1147 var numOpened int 1148 waitForPrepare: 1149 for !prepareDisabled || numOpened < 10 { 1150 select { 1151 case <-waitUntil: 1152 break waitForPrepare 1153 default: 1154 } 1155 sp.mu.Lock() 1156 prepareDisabled = sp.disableBackgroundPrepareSessions 1157 numOpened = sp.idleList.Len() 1158 sp.mu.Unlock() 1159 } 1160 // Re-enable logging. 1161 logger.SetOutput(os.Stderr) 1162 1163 // There should be no write-prepared sessions. 1164 sp.mu.Lock() 1165 if sp.idleWriteList.Len() != 0 { 1166 sp.mu.Unlock() 1167 t.Fatalf("write queue unexpectedly not empty") 1168 } 1169 // All sessions should be in the read idle list. 1170 if g, w := sp.idleList.Len(), 10; g != w { 1171 sp.mu.Unlock() 1172 t.Fatalf("session count mismatch:\nWant: %v\nGot: %v", w, g) 1173 } 1174 sp.mu.Unlock() 1175 // Take a read session should succeed. 1176 sh, err := sp.take(ctx) 1177 if err != nil { 1178 t.Fatalf("cannot get session from session pool: %v", err) 1179 } 1180 sh.recycle() 1181 // Take a write session should fail with the server error. 1182 _, err = sp.takeWriteSession(ctx) 1183 if ErrCode(err) != ErrCode(serverErr) { 1184 t.Fatalf("take write session failed with unexpected error.\nGot: %v\nWant: %v\n", err, serverErr) 1185 } 1186 1187 // Clearing the error on the server should allow us to take a write 1188 // session. 1189 server.TestSpanner.PutExecutionTime(MethodBeginTransaction, SimulatedExecutionTime{}) 1190 sh, err = sp.takeWriteSession(ctx) 1191 if err != nil { 1192 t.Fatalf("cannot get write session from session pool: %v", err) 1193 } 1194 sh.recycle() 1195 // The maintainer should also pick this up and prepare 50% of the sessions. 1196 waitUntil = time.After(time.Second) 1197 var numPrepared int 1198 for numPrepared < 5 { 1199 select { 1200 case <-waitUntil: 1201 break 1202 default: 1203 } 1204 sp.mu.Lock() 1205 numPrepared = sp.idleWriteList.Len() 1206 sp.mu.Unlock() 1207 } 1208 sp.mu.Lock() 1209 if g, w := sp.idleWriteList.Len(), 5; g != w { 1210 sp.mu.Unlock() 1211 t.Fatalf("write session count mismatch:\nWant: %v\nGot: %v", w, g) 1212 } 1213 sp.mu.Unlock() 1214 } 1215} 1216 1217// The session pool should continue to try to create write-prepared sessions if 1218// a 'Session not found' error occurs. The session that has been deleted by 1219// backend should be removed from the pool, and the maintainer should create a 1220// new session if this causes the number of sessions in the pool to fall below 1221// MinOpened. 1222func TestSessionNotFoundOnPrepareSession(t *testing.T) { 1223 t.Parallel() 1224 1225 // The server will return 'Session not found' for the first 8 1226 // BeginTransaction calls. 1227 sessionNotFoundErr := newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s") 1228 serverErrors := make([]error, 8) 1229 for i := range serverErrors { 1230 serverErrors[i] = sessionNotFoundErr 1231 } 1232 ctx := context.Background() 1233 logger := log.New(os.Stderr, "", log.LstdFlags) 1234 server, client, teardown := setupMockedTestServerWithConfig(t, 1235 ClientConfig{ 1236 SessionPoolConfig: SessionPoolConfig{ 1237 MinOpened: 10, 1238 MaxOpened: 10, 1239 WriteSessions: 0.5, 1240 HealthCheckInterval: time.Millisecond, 1241 healthCheckSampleInterval: time.Millisecond, 1242 }, 1243 logger: logger, 1244 }) 1245 defer teardown() 1246 // Discard logging until trying to prepare sessions has stopped. 1247 logger.SetOutput(ioutil.Discard) 1248 server.TestSpanner.PutExecutionTime(MethodBeginTransaction, SimulatedExecutionTime{ 1249 Errors: serverErrors, 1250 }) 1251 sp := client.idleSessions 1252 1253 // Wait until the health checker has tried to write-prepare the sessions. 1254 waitUntil := time.After(5 * time.Second) 1255 var numWriteSessions int 1256 var numReadSessions int 1257waitForPrepare: 1258 for (numWriteSessions+numReadSessions) < 10 || numWriteSessions < 5 { 1259 select { 1260 case <-waitUntil: 1261 break waitForPrepare 1262 default: 1263 } 1264 sp.mu.Lock() 1265 numReadSessions = sp.idleList.Len() 1266 numWriteSessions = sp.idleWriteList.Len() 1267 sp.mu.Unlock() 1268 } 1269 // Re-enable logging. 1270 logger.SetOutput(os.Stderr) 1271 1272 // There should be at least 5 write-prepared sessions. 1273 sp.mu.Lock() 1274 if g, w := sp.idleWriteList.Len(), 5; g < w { 1275 sp.mu.Unlock() 1276 t.Fatalf("write-prepared session count mismatch.\nWant at least: %v\nGot: %v", w, g) 1277 } 1278 // The other sessions should be in the read idle list. 1279 if g, w := sp.idleList.Len()+sp.idleWriteList.Len(), 10; g != w { 1280 sp.mu.Unlock() 1281 t.Fatalf("total session count mismatch:\nWant: %v\nGot: %v", w, g) 1282 } 1283 sp.mu.Unlock() 1284 // Take a read session should succeed. 1285 sh, err := sp.take(ctx) 1286 if err != nil { 1287 t.Fatalf("cannot get session from session pool: %v", err) 1288 } 1289 sh.recycle() 1290 // Take a write session should succeed. 1291 sh, err = sp.takeWriteSession(ctx) 1292 if err != nil { 1293 t.Fatalf("take write session failed with unexpected error.\nGot: %v\nWant: %v\n", err, nil) 1294 } 1295 sh.recycle() 1296} 1297 1298// TestSessionHealthCheck tests healthchecking cases. 1299func TestSessionHealthCheck(t *testing.T) { 1300 t.Parallel() 1301 ctx := context.Background() 1302 server, client, teardown := setupMockedTestServerWithConfig(t, 1303 ClientConfig{ 1304 SessionPoolConfig: SessionPoolConfig{ 1305 HealthCheckInterval: time.Nanosecond, 1306 healthCheckSampleInterval: 10 * time.Millisecond, 1307 incStep: 1, 1308 }, 1309 }) 1310 defer teardown() 1311 sp := client.idleSessions 1312 1313 // Test pinging sessions. 1314 sh, err := sp.take(ctx) 1315 if err != nil { 1316 t.Fatalf("cannot get session from session pool: %v", err) 1317 } 1318 1319 // Wait for healthchecker to send pings to session. 1320 waitFor(t, func() error { 1321 pings := server.TestSpanner.DumpPings() 1322 if len(pings) == 0 || pings[0] != sh.getID() { 1323 return fmt.Errorf("healthchecker didn't send any ping to session %v", sh.getID()) 1324 } 1325 return nil 1326 }) 1327 // Test broken session detection. 1328 sh, err = sp.take(ctx) 1329 if err != nil { 1330 t.Fatalf("cannot get session from session pool: %v", err) 1331 } 1332 1333 server.TestSpanner.Freeze() 1334 server.TestSpanner.PutExecutionTime(MethodExecuteSql, 1335 SimulatedExecutionTime{ 1336 Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}, 1337 KeepError: true, 1338 }) 1339 server.TestSpanner.Unfreeze() 1340 1341 s := sh.session 1342 waitFor(t, func() error { 1343 if sh.session.isValid() { 1344 return fmt.Errorf("session(%v) is still alive, want it to be dropped by healthcheck workers", s) 1345 } 1346 return nil 1347 }) 1348 1349 server.TestSpanner.Freeze() 1350 server.TestSpanner.PutExecutionTime(MethodExecuteSql, SimulatedExecutionTime{}) 1351 server.TestSpanner.Unfreeze() 1352 1353 // Test garbage collection. 1354 sh, err = sp.take(ctx) 1355 if err != nil { 1356 t.Fatalf("cannot get session from session pool: %v", err) 1357 } 1358 sp.close() 1359 if sh.session.isValid() { 1360 t.Fatalf("session(%v) is still alive, want it to be garbage collected", s) 1361 } 1362} 1363 1364// TestStressSessionPool does stress test on session pool by the following concurrent operations: 1365// 1) Test worker gets a session from the pool. 1366// 2) Test worker turns a session back into the pool. 1367// 3) Test worker destroys a session got from the pool. 1368// 4) Healthcheck destroys a broken session (because a worker has already destroyed it). 1369// 5) Test worker closes the session pool. 1370// 1371// During the test, the session pool maintainer maintains the number of sessions, 1372// and it is expected that all sessions that are taken from session pool remains valid. 1373// When all test workers and healthcheck workers exit, mockclient, session pool 1374// and healthchecker should be in consistent state. 1375func TestStressSessionPool(t *testing.T) { 1376 t.Parallel() 1377 1378 // Use concurrent workers to test different session pool built from different configurations. 1379 for ti, cfg := range []SessionPoolConfig{ 1380 {}, 1381 {MinOpened: 10, MaxOpened: 100}, 1382 {MaxBurst: 50}, 1383 {MinOpened: 10, MaxOpened: 200, MaxBurst: 5}, 1384 {MinOpened: 10, MaxOpened: 200, MaxBurst: 5, WriteSessions: 0.2}, 1385 } { 1386 // Create a more aggressive session healthchecker to increase test concurrency. 1387 cfg.HealthCheckInterval = 50 * time.Millisecond 1388 cfg.healthCheckSampleInterval = 10 * time.Millisecond 1389 cfg.HealthCheckWorkers = 50 1390 1391 server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{ 1392 SessionPoolConfig: cfg, 1393 }) 1394 sp := client.idleSessions 1395 1396 // Create a test group for this configuration and schedule 100 sub 1397 // sub tests within the group. 1398 t.Run(fmt.Sprintf("TestStressSessionPoolGroup%v", ti), func(t *testing.T) { 1399 for i := 0; i < 100; i++ { 1400 idx := i 1401 t.Logf("TestStressSessionPoolWithCfg%dWorker%03d", ti, idx) 1402 testStressSessionPool(t, cfg, ti, idx, sp, client) 1403 } 1404 }) 1405 sp.hc.close() 1406 // Here the states of healthchecker, session pool and mockclient are 1407 // stable. 1408 sp.mu.Lock() 1409 idleSessions := map[string]bool{} 1410 hcSessions := map[string]bool{} 1411 mockSessions := server.TestSpanner.DumpSessions() 1412 // Dump session pool's idle list. 1413 for sl := sp.idleList.Front(); sl != nil; sl = sl.Next() { 1414 s := sl.Value.(*session) 1415 if idleSessions[s.getID()] { 1416 t.Fatalf("%v: found duplicated session in idle list: %v", ti, s.getID()) 1417 } 1418 idleSessions[s.getID()] = true 1419 } 1420 for sl := sp.idleWriteList.Front(); sl != nil; sl = sl.Next() { 1421 s := sl.Value.(*session) 1422 if idleSessions[s.getID()] { 1423 t.Fatalf("%v: found duplicated session in idle write list: %v", ti, s.getID()) 1424 } 1425 idleSessions[s.getID()] = true 1426 } 1427 if int(sp.numOpened) != len(idleSessions) { 1428 t.Fatalf("%v: number of opened sessions (%v) != number of idle sessions (%v)", ti, sp.numOpened, len(idleSessions)) 1429 } 1430 if sp.createReqs != 0 { 1431 t.Fatalf("%v: number of pending session creations = %v, want 0", ti, sp.createReqs) 1432 } 1433 // Dump healthcheck queue. 1434 sp.hc.mu.Lock() 1435 for _, s := range sp.hc.queue.sessions { 1436 if hcSessions[s.getID()] { 1437 t.Fatalf("%v: found duplicated session in healthcheck queue: %v", ti, s.getID()) 1438 } 1439 hcSessions[s.getID()] = true 1440 } 1441 sp.mu.Unlock() 1442 sp.hc.mu.Unlock() 1443 1444 // Verify that idleSessions == hcSessions == mockSessions. 1445 if !testEqual(idleSessions, hcSessions) { 1446 t.Fatalf("%v: sessions in idle list (%v) != sessions in healthcheck queue (%v)", ti, idleSessions, hcSessions) 1447 } 1448 // The server may contain more sessions than the health check queue. 1449 // This can be caused by a timeout client side during a CreateSession 1450 // request. The request may still be received and executed by the 1451 // server, but the session pool will not register the session. 1452 for id, b := range hcSessions { 1453 if b && !mockSessions[id] { 1454 t.Fatalf("%v: session in healthcheck queue (%v) was not found on server", ti, id) 1455 } 1456 } 1457 sp.close() 1458 mockSessions = server.TestSpanner.DumpSessions() 1459 for id, b := range hcSessions { 1460 if b && mockSessions[id] { 1461 // We only log a warning for this, as it sometimes happens. 1462 // The exact reason for it is unknown, but in a real life 1463 // situation the session would be garbage collected by the 1464 // server after 60 minutes. 1465 t.Logf("Found session from pool still live on server: %v", id) 1466 } 1467 } 1468 teardown() 1469 } 1470} 1471 1472func testStressSessionPool(t *testing.T, cfg SessionPoolConfig, ti int, idx int, pool *sessionPool, client *Client) { 1473 ctx := context.Background() 1474 // Test worker iterates 1K times and tries different 1475 // session / session pool operations. 1476 for j := 0; j < 1000; j++ { 1477 if idx%10 == 0 && j >= 900 { 1478 // Close the pool in selected set of workers during the 1479 // middle of the test. 1480 pool.close() 1481 } 1482 // Take a write sessions ~ 20% of the times. 1483 takeWrite := rand.Intn(5) == 4 1484 var ( 1485 sh *sessionHandle 1486 gotErr error 1487 ) 1488 wasValid := pool.isValid() 1489 if takeWrite { 1490 sh, gotErr = pool.takeWriteSession(ctx) 1491 } else { 1492 sh, gotErr = pool.take(ctx) 1493 } 1494 if gotErr != nil { 1495 if pool.isValid() { 1496 t.Fatalf("%v.%v: pool.take returns error when pool is still valid: %v", ti, idx, gotErr) 1497 } 1498 // If the session pool was closed when we tried to take a session 1499 // from the pool, then we should have gotten a specific error. 1500 // If the session pool was closed between the take() and now (or 1501 // even during a take()) then an error is ok. 1502 if !wasValid { 1503 if wantErr := errInvalidSessionPool; gotErr != wantErr { 1504 t.Fatalf("%v.%v: got error when pool is closed: %v, want %v", ti, idx, gotErr, wantErr) 1505 } 1506 } 1507 continue 1508 } 1509 // Verify if session is valid when session pool is valid. 1510 // Note that if session pool is invalid after sh is taken, 1511 // then sh might be invalidated by healthcheck workers. 1512 if (sh.getID() == "" || sh.session == nil || !sh.session.isValid()) && pool.isValid() { 1513 t.Fatalf("%v.%v.%v: pool.take returns invalid session %v", ti, idx, takeWrite, sh.session) 1514 } 1515 if takeWrite && sh.getTransactionID() == nil { 1516 t.Fatalf("%v.%v: pool.takeWriteSession returns session %v without transaction", ti, idx, sh.session) 1517 } 1518 if rand.Intn(100) < idx { 1519 // Random sleep before destroying/recycling the session, 1520 // to give healthcheck worker a chance to step in. 1521 <-time.After(time.Duration(rand.Int63n(int64(cfg.HealthCheckInterval)))) 1522 } 1523 if rand.Intn(100) < idx { 1524 // destroy the session. 1525 sh.destroy() 1526 continue 1527 } 1528 // recycle the session. 1529 sh.recycle() 1530 } 1531} 1532 1533// TestMaintainer checks the session pool maintainer maintains the number of 1534// sessions in the following cases: 1535// 1536// 1. On initialization of session pool, replenish session pool to meet 1537// MinOpened or MaxIdle. 1538// 2. On increased session usage, provision extra MaxIdle sessions. 1539// 3. After the surge passes, scale down the session pool accordingly. 1540func TestMaintainer(t *testing.T) { 1541 t.Parallel() 1542 ctx := context.Background() 1543 1544 minOpened := uint64(5) 1545 maxIdle := uint64(4) 1546 _, client, teardown := setupMockedTestServerWithConfig(t, 1547 ClientConfig{ 1548 SessionPoolConfig: SessionPoolConfig{ 1549 MinOpened: minOpened, 1550 MaxIdle: maxIdle, 1551 healthCheckSampleInterval: time.Millisecond, 1552 }, 1553 }) 1554 defer teardown() 1555 sp := client.idleSessions 1556 1557 waitFor(t, func() error { 1558 sp.mu.Lock() 1559 defer sp.mu.Unlock() 1560 if sp.numOpened != 5 { 1561 return fmt.Errorf("Replenish. Expect %d open, got %d", sp.MinOpened, sp.numOpened) 1562 } 1563 return nil 1564 }) 1565 1566 // To save test time, we are not creating many sessions, because the time 1567 // to create sessions will have impact on the decision on sessionsToKeep. 1568 // We also parallelize the take and recycle process. 1569 shs := make([]*sessionHandle, 20) 1570 for i := 0; i < len(shs); i++ { 1571 var err error 1572 shs[i], err = sp.take(ctx) 1573 if err != nil { 1574 t.Fatalf("cannot get session from session pool: %v", err) 1575 } 1576 } 1577 sp.mu.Lock() 1578 g, w := sp.numOpened, sp.MinOpened+sp.incStep 1579 sp.mu.Unlock() 1580 if g != w { 1581 t.Fatalf("numOpened sessions mismatch\nGot: %d\nWant: %d", g, w) 1582 } 1583 1584 // Return 14 sessions to the pool. There are still 6 sessions checked out. 1585 for _, sh := range shs[:14] { 1586 sh.recycle() 1587 } 1588 1589 // The pool should scale down to sessionsInUse + MaxIdle = 6 + 4 = 10. 1590 waitFor(t, func() error { 1591 sp.mu.Lock() 1592 defer sp.mu.Unlock() 1593 if sp.numOpened != 10 { 1594 return fmt.Errorf("Keep extra MaxIdle sessions. Expect %d open, got %d", 10, sp.numOpened) 1595 } 1596 return nil 1597 }) 1598 1599 // Return the remaining 6 sessions. 1600 // The pool should now scale down to minOpened + maxIdle. 1601 for _, sh := range shs[14:] { 1602 sh.recycle() 1603 } 1604 waitFor(t, func() error { 1605 sp.mu.Lock() 1606 defer sp.mu.Unlock() 1607 if sp.numOpened != minOpened { 1608 return fmt.Errorf("Scale down. Expect %d open, got %d", minOpened+maxIdle, sp.numOpened) 1609 } 1610 return nil 1611 }) 1612} 1613 1614// Tests that the session pool creates up to MinOpened connections. 1615// 1616// Historical context: This test also checks that a low 1617// healthCheckSampleInterval does not prevent it from opening connections. 1618// The low healthCheckSampleInterval will however sometimes cause session 1619// creations to time out. That should not be considered a problem, but it 1620// could cause the test case to fail if it happens too often. 1621// See: https://github.com/googleapis/google-cloud-go/issues/1259 1622func TestInit_CreatesSessions(t *testing.T) { 1623 t.Parallel() 1624 spc := SessionPoolConfig{ 1625 MinOpened: 10, 1626 MaxIdle: 10, 1627 WriteSessions: 0.0, 1628 healthCheckSampleInterval: 20 * time.Millisecond, 1629 } 1630 server, client, teardown := setupMockedTestServerWithConfig(t, 1631 ClientConfig{ 1632 SessionPoolConfig: spc, 1633 NumChannels: 4, 1634 }) 1635 defer teardown() 1636 sp := client.idleSessions 1637 1638 timeout := time.After(4 * time.Second) 1639 var numOpened int 1640loop: 1641 for { 1642 select { 1643 case <-timeout: 1644 t.Fatalf("timed out, got %d session(s), want %d", numOpened, spc.MinOpened) 1645 default: 1646 sp.mu.Lock() 1647 numOpened = sp.idleList.Len() + sp.idleWriteList.Len() 1648 sp.mu.Unlock() 1649 if numOpened == 10 { 1650 break loop 1651 } 1652 } 1653 } 1654 _, err := shouldHaveReceived(server.TestSpanner, []interface{}{ 1655 &sppb.BatchCreateSessionsRequest{}, 1656 &sppb.BatchCreateSessionsRequest{}, 1657 &sppb.BatchCreateSessionsRequest{}, 1658 &sppb.BatchCreateSessionsRequest{}, 1659 }) 1660 if err != nil { 1661 t.Fatal(err) 1662 } 1663} 1664 1665// Tests that the session pool with a MinSessions>0 also prepares WriteSessions 1666// sessions. 1667func TestInit_PreparesSessions(t *testing.T) { 1668 t.Parallel() 1669 spc := SessionPoolConfig{ 1670 MinOpened: 10, 1671 MaxIdle: 10, 1672 WriteSessions: 0.5, 1673 healthCheckSampleInterval: 20 * time.Millisecond, 1674 } 1675 _, client, teardown := setupMockedTestServerWithConfig(t, 1676 ClientConfig{ 1677 SessionPoolConfig: spc, 1678 }) 1679 defer teardown() 1680 sp := client.idleSessions 1681 1682 timeoutAmt := 4 * time.Second 1683 timeout := time.After(timeoutAmt) 1684 var numPrepared int 1685 want := int(spc.WriteSessions * float64(spc.MinOpened)) 1686loop: 1687 for { 1688 select { 1689 case <-timeout: 1690 t.Fatalf("timed out after %v, got %d write-prepared session(s), want %d", timeoutAmt, numPrepared, want) 1691 default: 1692 sp.mu.Lock() 1693 numPrepared = sp.idleWriteList.Len() 1694 sp.mu.Unlock() 1695 if numPrepared == want { 1696 break loop 1697 } 1698 } 1699 } 1700} 1701 1702func (s1 *session) Equal(s2 *session) bool { 1703 return s1.client == s2.client && 1704 s1.id == s2.id && 1705 s1.pool == s2.pool && 1706 s1.createTime == s2.createTime && 1707 s1.valid == s2.valid && 1708 s1.hcIndex == s2.hcIndex && 1709 s1.idleList == s2.idleList && 1710 s1.nextCheck.Equal(s2.nextCheck) && 1711 s1.checkingHealth == s2.checkingHealth && 1712 testEqual(s1.md, s2.md) && 1713 bytes.Equal(s1.tx, s2.tx) 1714} 1715 1716func waitFor(t *testing.T, assert func() error) { 1717 t.Helper() 1718 timeout := 15 * time.Second 1719 ta := time.After(timeout) 1720 1721 for { 1722 select { 1723 case <-ta: 1724 if err := assert(); err != nil { 1725 t.Fatalf("after %v waiting, got %v", timeout, err) 1726 } 1727 return 1728 default: 1729 } 1730 1731 if err := assert(); err != nil { 1732 // Fail. Let's pause and retry. 1733 time.Sleep(10 * time.Millisecond) 1734 continue 1735 } 1736 1737 return 1738 } 1739} 1740 1741// Tests that maintainer only deletes sessions after a full maintenance window 1742// of 10 cycles has finished. 1743func TestMaintainer_DeletesSessions(t *testing.T) { 1744 t.Parallel() 1745 1746 ctx := context.Background() 1747 const sampleInterval = time.Millisecond * 10 1748 _, client, teardown := setupMockedTestServerWithConfig(t, 1749 ClientConfig{ 1750 SessionPoolConfig: SessionPoolConfig{healthCheckSampleInterval: sampleInterval}, 1751 }) 1752 defer teardown() 1753 sp := client.idleSessions 1754 1755 // Take two sessions from the pool. 1756 // This will cause max sessions in use to be 2 during this window. 1757 sh1 := takeSession(ctx, t, sp) 1758 sh2 := takeSession(ctx, t, sp) 1759 wantSessions := map[string]bool{} 1760 wantSessions[sh1.getID()] = true 1761 wantSessions[sh2.getID()] = true 1762 // Return the sessions to the pool and then assure that they 1763 // are not deleted while still within the maintenance window. 1764 sh1.recycle() 1765 sh2.recycle() 1766 // Wait for 20 milliseconds, i.e. approx 2 iterations of the 1767 // maintainer. The sessions should still be in the pool. 1768 <-time.After(sampleInterval * 2) 1769 sh3 := takeSession(ctx, t, sp) 1770 sh4 := takeSession(ctx, t, sp) 1771 // Check that the returned sessions are equal to the sessions that we got 1772 // the first time from the session pool. 1773 gotSessions := map[string]bool{} 1774 gotSessions[sh3.getID()] = true 1775 gotSessions[sh4.getID()] = true 1776 testEqual(wantSessions, gotSessions) 1777 // Return the sessions to the pool. 1778 sh3.recycle() 1779 sh4.recycle() 1780 1781 // Now wait for the maintenance window to finish. This will cause the 1782 // maintainer to enter a new window and reset the max number of sessions in 1783 // use to the currently number of checked out sessions. That is 0, as all 1784 // sessions have been returned to the pool. That again will cause the 1785 // maintainer to delete these sessions at the next iteration, unless we 1786 // checkout new sessions during the first iteration. 1787 waitFor(t, func() error { 1788 sp.mu.Lock() 1789 defer sp.mu.Unlock() 1790 if sp.numOpened > 0 { 1791 return fmt.Errorf("session pool still contains more than 0 sessions") 1792 } 1793 return nil 1794 }) 1795 sh5 := takeSession(ctx, t, sp) 1796 sh6 := takeSession(ctx, t, sp) 1797 // Assure that these sessions are new sessions. 1798 if gotSessions[sh5.getID()] || gotSessions[sh6.getID()] { 1799 t.Fatal("got unexpected existing session from pool") 1800 } 1801} 1802 1803func takeSession(ctx context.Context, t *testing.T, sp *sessionPool) *sessionHandle { 1804 sh, err := sp.take(ctx) 1805 if err != nil { 1806 t.Fatalf("cannot get session from session pool: %v", err) 1807 } 1808 return sh 1809} 1810 1811func TestMaintenanceWindow_CycleAndUpdateMaxCheckedOut(t *testing.T) { 1812 t.Parallel() 1813 1814 maxOpened := uint64(1000) 1815 mw := newMaintenanceWindow(maxOpened) 1816 for _, m := range mw.maxSessionsCheckedOut { 1817 if m < maxOpened { 1818 t.Fatalf("Max sessions checked out mismatch.\nGot: %v\nWant: %v", m, maxOpened) 1819 } 1820 } 1821 // Do one cycle and simulate that there are currently no sessions checked 1822 // out of the pool. 1823 mw.startNewCycle(0) 1824 if g, w := mw.maxSessionsCheckedOut[0], uint64(0); g != w { 1825 t.Fatalf("Max sessions checked out mismatch.\nGot: %d\nWant: %d", g, w) 1826 } 1827 for _, m := range mw.maxSessionsCheckedOut[1:] { 1828 if m < maxOpened { 1829 t.Fatalf("Max sessions checked out mismatch.\nGot: %v\nWant: %v", m, maxOpened) 1830 } 1831 } 1832 // Check that the max checked out during the entire window is still 1833 // maxOpened. 1834 if g, w := mw.maxSessionsCheckedOutDuringWindow(), maxOpened; g != w { 1835 t.Fatalf("Max sessions checked out during window mismatch.\nGot: %d\nWant: %d", g, w) 1836 } 1837 // Update the max number checked out for the current cycle. 1838 mw.updateMaxSessionsCheckedOutDuringWindow(uint64(10)) 1839 if g, w := mw.maxSessionsCheckedOut[0], uint64(10); g != w { 1840 t.Fatalf("Max sessions checked out mismatch.\nGot: %d\nWant: %d", g, w) 1841 } 1842 // The max of the entire window should still not change. 1843 if g, w := mw.maxSessionsCheckedOutDuringWindow(), maxOpened; g != w { 1844 t.Fatalf("Max sessions checked out during window mismatch.\nGot: %d\nWant: %d", g, w) 1845 } 1846 // Now pass enough cycles to complete a maintenance window. Each cycle has 1847 // no sessions checked out. We start at 1, as we have already passed one 1848 // cycle. This should then be the last cycle still in the maintenance 1849 // window, and the only one with a maxSessionsCheckedOut greater than 0. 1850 for i := 1; i < maintenanceWindowSize; i++ { 1851 mw.startNewCycle(0) 1852 } 1853 for _, m := range mw.maxSessionsCheckedOut[:9] { 1854 if m != 0 { 1855 t.Fatalf("Max sessions checked out mismatch.\nGot: %v\nWant: %v", m, 0) 1856 } 1857 } 1858 // The oldest cycle in the window should have max=10. 1859 if g, w := mw.maxSessionsCheckedOut[maintenanceWindowSize-1], uint64(10); g != w { 1860 t.Fatalf("Max sessions checked out mismatch.\nGot: %d\nWant: %d", g, w) 1861 } 1862 // The max of the entire window should now be 10. 1863 if g, w := mw.maxSessionsCheckedOutDuringWindow(), uint64(10); g != w { 1864 t.Fatalf("Max sessions checked out during window mismatch.\nGot: %d\nWant: %d", g, w) 1865 } 1866 // Do another cycle with max=0. 1867 mw.startNewCycle(0) 1868 // The max of the entire window should now be 0. 1869 if g, w := mw.maxSessionsCheckedOutDuringWindow(), uint64(0); g != w { 1870 t.Fatalf("Max sessions checked out during window mismatch.\nGot: %d\nWant: %d", g, w) 1871 } 1872 // Do another cycle with 5 sessions as max. This should now be the new 1873 // window max. 1874 mw.startNewCycle(5) 1875 if g, w := mw.maxSessionsCheckedOutDuringWindow(), uint64(5); g != w { 1876 t.Fatalf("Max sessions checked out during window mismatch.\nGot: %d\nWant: %d", g, w) 1877 } 1878 // Do a couple of cycles so that the only non-zero value is in the middle. 1879 // The max for the entire window should still be 5. 1880 for i := 0; i < maintenanceWindowSize/2; i++ { 1881 mw.startNewCycle(0) 1882 } 1883 if g, w := mw.maxSessionsCheckedOutDuringWindow(), uint64(5); g != w { 1884 t.Fatalf("Max sessions checked out during window mismatch.\nGot: %d\nWant: %d", g, w) 1885 } 1886} 1887 1888func TestSessionCreationIsDistributedOverChannels(t *testing.T) { 1889 t.Parallel() 1890 numChannels := 4 1891 spc := SessionPoolConfig{ 1892 MinOpened: 12, 1893 WriteSessions: 0.0, 1894 incStep: 2, 1895 } 1896 _, client, teardown := setupMockedTestServerWithConfig(t, 1897 ClientConfig{ 1898 SessionPoolConfig: spc, 1899 NumChannels: numChannels, 1900 }) 1901 defer teardown() 1902 sp := client.idleSessions 1903 1904 waitFor(t, func() error { 1905 sp.mu.Lock() 1906 // WriteSessions = 0, so we only have to check for read sessions. 1907 numOpened := uint64(sp.idleList.Len()) 1908 sp.mu.Unlock() 1909 if numOpened < spc.MinOpened { 1910 return fmt.Errorf("not yet initialized") 1911 } 1912 return nil 1913 }) 1914 1915 sessionsPerChannel := getSessionsPerChannel(sp) 1916 if g, w := len(sessionsPerChannel), numChannels; g != w { 1917 t.Errorf("number of channels mismatch\nGot: %d\nWant: %d", g, w) 1918 } 1919 for k, v := range sessionsPerChannel { 1920 if g, w := v, int(sp.MinOpened)/numChannels; g != w { 1921 t.Errorf("number of sessions mismatch for %s:\nGot: %d\nWant: %d", k, g, w) 1922 } 1923 } 1924 // Check out all sessions + incStep * numChannels from the pool. This 1925 // should cause incStep * numChannels additional sessions to be created. 1926 checkedOut := make([]*sessionHandle, sp.MinOpened+sp.incStep*uint64(numChannels)) 1927 var err error 1928 for i := 0; i < cap(checkedOut); i++ { 1929 checkedOut[i], err = sp.take(context.Background()) 1930 if err != nil { 1931 t.Fatal(err) 1932 } 1933 } 1934 for i := 0; i < cap(checkedOut); i++ { 1935 checkedOut[i].recycle() 1936 } 1937 // The additional sessions should also be distributed over all available 1938 // channels. 1939 sessionsPerChannel = getSessionsPerChannel(sp) 1940 // There should not be any new clients (channels). 1941 if g, w := len(sessionsPerChannel), numChannels; g != w { 1942 t.Errorf("number of channels mismatch\nGot: %d\nWant: %d", g, w) 1943 } 1944 for k, v := range sessionsPerChannel { 1945 if g, w := v, int(sp.MinOpened)/numChannels+int(sp.incStep); g != w { 1946 t.Errorf("number of sessions mismatch for %s:\nGot: %d\nWant: %d", k, g, w) 1947 } 1948 } 1949} 1950 1951func getSessionsPerChannel(sp *sessionPool) map[string]int { 1952 sessionsPerChannel := make(map[string]int) 1953 sp.mu.Lock() 1954 defer sp.mu.Unlock() 1955 el := sp.idleList.Front() 1956 for el != nil { 1957 s, _ := el.Value.(*session) 1958 // Get the pointer to the actual underlying gRPC ClientConn and use 1959 // that as the key in the map. 1960 val := reflect.ValueOf(s.client).Elem() 1961 connPool := val.FieldByName("connPool").Elem().Elem() 1962 conn := connPool.Field(0).Pointer() 1963 key := fmt.Sprintf("%v", conn) 1964 sessionsPerChannel[key] = sessionsPerChannel[key] + 1 1965 el = el.Next() 1966 } 1967 return sessionsPerChannel 1968} 1969