/*
Copyright 2017 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package spanner

import (
	"bytes"
	"container/heap"
	"context"
	"fmt"
	"io/ioutil"
	"log"
	"math/rand"
	"os"
	"strings"
	"testing"
	"time"

	. "cloud.google.com/go/spanner/internal/testutil"
	"google.golang.org/api/iterator"
	"google.golang.org/genproto/googleapis/rpc/errdetails"
	sppb "google.golang.org/genproto/googleapis/spanner/v1"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// newSessionNotFoundError builds the gRPC error the tests use to simulate a
// 'Session not found' response from the backend: a NotFound status with a
// ResourceInfo detail identifying the missing session by name.
func newSessionNotFoundError(name string) error {
	s := status.Newf(codes.NotFound, "Session not found: Session with id %s not found", name)
	s, _ = s.WithDetails(&errdetails.ResourceInfo{ResourceType: sessionResourceType, ResourceName: name})
	return s.Err()
}

// TestSessionPoolConfigValidation tests session pool config validation.
func TestSessionPoolConfigValidation(t *testing.T) {
	t.Parallel()
	_, client, teardown := setupMockedTestServer(t)
	defer teardown()

	// Each invalid configuration must be rejected by newSessionPool with the
	// matching validation error.
	for _, test := range []struct {
		spc SessionPoolConfig
		err error
	}{
		{
			SessionPoolConfig{
				MinOpened: 10,
				MaxOpened: 5,
			},
			errMinOpenedGTMaxOpened(5, 10),
		},
		{
			SessionPoolConfig{
				WriteSessions: -0.1,
			},
			errWriteFractionOutOfRange(-0.1),
		},
		{
			SessionPoolConfig{
				WriteSessions: 2.0,
			},
			errWriteFractionOutOfRange(2.0),
		},
		{
			SessionPoolConfig{
				HealthCheckWorkers: -1,
			},
			errHealthCheckWorkersNegative(-1),
		},
		{
			SessionPoolConfig{
				HealthCheckInterval: -time.Second,
			},
			errHealthCheckIntervalNegative(-time.Second),
		},
	} {
		if _, err := newSessionPool(client.sc, test.spc); !testEqual(err, test.err) {
			t.Fatalf("want %v, got %v", test.err, err)
		}
	}
}

// TestSessionCreation tests session creation during sessionPool.Take().
func TestSessionCreation(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	server, client, teardown := setupMockedTestServer(t)
	defer teardown()
	sp := client.idleSessions

	// Take three sessions from session pool, this should trigger session pool
	// to create three new sessions.
	shs := make([]*sessionHandle, 3)
	// gotDs holds the unique sessions taken from session pool.
	gotDs := map[string]bool{}
	for i := 0; i < len(shs); i++ {
		var err error
		shs[i], err = sp.take(ctx)
		if err != nil {
			t.Fatalf("failed to get session(%v): %v", i, err)
		}
		gotDs[shs[i].getID()] = true
	}
	if len(gotDs) != len(shs) {
		t.Fatalf("session pool created %v sessions, want %v", len(gotDs), len(shs))
	}
	// The mock server must have seen exactly the same set of session IDs.
	if wantDs := server.TestSpanner.DumpSessions(); !testEqual(gotDs, wantDs) {
		t.Fatalf("session pool creates sessions %v, want %v", gotDs, wantDs)
	}
	// Verify that created sessions are recorded correctly in session pool.
	sp.mu.Lock()
	if int(sp.numOpened) != len(shs) {
		t.Fatalf("session pool reports %v open sessions, want %v", sp.numOpened, len(shs))
	}
	// All create requests must have completed by now.
	if sp.createReqs != 0 {
		t.Fatalf("session pool reports %v session create requests, want 0", int(sp.createReqs))
	}
	sp.mu.Unlock()
	// Verify that created sessions are tracked correctly by healthcheck queue.
	hc := sp.hc
	hc.mu.Lock()
	if hc.queue.Len() != len(shs) {
		t.Fatalf("healthcheck queue length = %v, want %v", hc.queue.Len(), len(shs))
	}
	for _, s := range hc.queue.sessions {
		if !gotDs[s.getID()] {
			t.Fatalf("session %v is in healthcheck queue, but it is not created by session pool", s.getID())
		}
	}
	hc.mu.Unlock()
}

// TestLIFOSessionOrder tests if session pool hand out sessions in LIFO order.
func TestLIFOSessionOrder(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	_, client, teardown := setupMockedTestServerWithConfig(t,
		ClientConfig{
			SessionPoolConfig: SessionPoolConfig{
				MaxOpened: 3,
				MinOpened: 3,
			},
		})
	defer teardown()
	sp := client.idleSessions
	// Create/take three sessions and recycle them.
	shs, shsIDs := make([]*sessionHandle, 3), make([]string, 3)
	for i := 0; i < len(shs); i++ {
		var err error
		if shs[i], err = sp.take(ctx); err != nil {
			t.Fatalf("failed to take session(%v): %v", i, err)
		}
		shsIDs[i] = shs[i].getID()
	}
	for i := 0; i < len(shs); i++ {
		shs[i].recycle()
	}
	// Taking the sessions back should return them in reverse (LIFO) order of
	// how they were recycled.
	for i := 2; i >= 0; i-- {
		sh, err := sp.take(ctx)
		if err != nil {
			t.Fatalf("cannot take session from session pool: %v", err)
		}
		// check, if sessions returned in LIFO order.
		if wantID, gotID := shsIDs[i], sh.getID(); wantID != gotID {
			t.Fatalf("got session with id: %v, want: %v", gotID, wantID)
		}
	}
}

// TestLIFOTakeWriteSessionOrder tests if write session pool hand out sessions in LIFO order.
func TestLIFOTakeWriteSessionOrder(t *testing.T) {
	t.Skip("https://github.com/googleapis/google-cloud-go/issues/1704")
	t.Parallel()
	ctx := context.Background()
	_, client, teardown := setupMockedTestServerWithConfig(t,
		ClientConfig{
			SessionPoolConfig: SessionPoolConfig{
				MaxOpened:     3,
				MinOpened:     3,
				WriteSessions: 1,
			},
		})
	defer teardown()
	sp := client.idleSessions
	// Create/take three sessions and recycle them.
	shs, shsIDs := make([]*sessionHandle, 3), make([]string, 3)
	for i := 0; i < len(shs); i++ {
		var err error
		if shs[i], err = sp.takeWriteSession(ctx); err != nil {
			t.Fatalf("failed to take session(%v): %v", i, err)
		}
		shsIDs[i] = shs[i].getID()
	}
	for i := 0; i < len(shs); i++ {
		shs[i].recycle()
	}
	for i := 2; i >= 0; i-- {
		ws, err := sp.takeWriteSession(ctx)
		if err != nil {
			t.Fatalf("cannot take session from session pool: %v", err)
		}
		// check, if write sessions returned in LIFO order.
		if wantID, gotID := shsIDs[i], ws.getID(); wantID != gotID {
			t.Fatalf("got session with id: %v, want: %v", gotID, wantID)
		}
	}
}

// TestTakeFromIdleList tests taking sessions from session pool's idle list.
func TestTakeFromIdleList(t *testing.T) {
	t.Parallel()
	ctx := context.Background()

	// Make sure maintainer keeps the idle sessions.
	server, client, teardown := setupMockedTestServerWithConfig(t,
		ClientConfig{
			SessionPoolConfig: SessionPoolConfig{MaxIdle: 10},
		})
	defer teardown()
	sp := client.idleSessions

	// Take ten sessions from session pool and recycle them.
	shs := make([]*sessionHandle, 10)
	for i := 0; i < len(shs); i++ {
		var err error
		shs[i], err = sp.take(ctx)
		if err != nil {
			t.Fatalf("failed to get session(%v): %v", i, err)
		}
	}
	// Make sure it's sampled once before recycling, otherwise it will be
	// cleaned up.
	<-time.After(sp.SessionPoolConfig.healthCheckSampleInterval)
	for i := 0; i < len(shs); i++ {
		shs[i].recycle()
	}
	// Further session requests from session pool won't cause mockclient to
	// create more sessions.
	wantSessions := server.TestSpanner.DumpSessions()
	// Take ten sessions from session pool again, this time all sessions should
	// come from idle list.
	gotSessions := map[string]bool{}
	for i := 0; i < len(shs); i++ {
		sh, err := sp.take(ctx)
		if err != nil {
			t.Fatalf("cannot take session from session pool: %v", err)
		}
		gotSessions[sh.getID()] = true
	}
	if len(gotSessions) != 10 {
		t.Fatalf("got %v unique sessions, want 10", len(gotSessions))
	}
	if !testEqual(gotSessions, wantSessions) {
		t.Fatalf("got sessions: %v, want %v", gotSessions, wantSessions)
	}
}

// TestTakeWriteSessionFromIdleList tests taking write sessions from session
// pool's idle list.
func TestTakeWriteSessionFromIdleList(t *testing.T) {
	t.Parallel()
	ctx := context.Background()

	// Make sure maintainer keeps the idle sessions.
	server, client, teardown := setupMockedTestServerWithConfig(t,
		ClientConfig{
			SessionPoolConfig: SessionPoolConfig{MaxIdle: 20},
		})
	defer teardown()
	sp := client.idleSessions

	// Take ten sessions from session pool and recycle them.
	shs := make([]*sessionHandle, 10)
	for i := 0; i < len(shs); i++ {
		var err error
		shs[i], err = sp.takeWriteSession(ctx)
		if err != nil {
			t.Fatalf("failed to get session(%v): %v", i, err)
		}
	}
	// Make sure it's sampled once before recycling, otherwise it will be
	// cleaned up.
	<-time.After(sp.SessionPoolConfig.healthCheckSampleInterval)
	for i := 0; i < len(shs); i++ {
		shs[i].recycle()
	}
	// Further session requests from session pool won't cause mockclient to
	// create more sessions.
	wantSessions := server.TestSpanner.DumpSessions()
	// Take ten sessions from session pool again, this time all sessions should
	// come from idle list.
	gotSessions := map[string]bool{}
	for i := 0; i < len(shs); i++ {
		sh, err := sp.takeWriteSession(ctx)
		if err != nil {
			t.Fatalf("cannot take session from session pool: %v", err)
		}
		gotSessions[sh.getID()] = true
	}
	if len(gotSessions) != 10 {
		t.Fatalf("got %v unique sessions, want 10", len(gotSessions))
	}
	if !testEqual(gotSessions, wantSessions) {
		t.Fatalf("got sessions: %v, want %v", gotSessions, wantSessions)
	}
}

// TestTakeFromIdleListChecked tests taking sessions from session pool's idle
// list, but with an extra ping check.
func TestTakeFromIdleListChecked(t *testing.T) {
	t.Parallel()
	ctx := context.Background()

	// Make sure maintainer keeps the idle sessions.
	server, client, teardown := setupMockedTestServerWithConfig(t,
		ClientConfig{
			SessionPoolConfig: SessionPoolConfig{
				MaxIdle:                   1,
				HealthCheckInterval:       50 * time.Millisecond,
				healthCheckSampleInterval: 10 * time.Millisecond,
			},
		})
	defer teardown()
	sp := client.idleSessions

	// Stop healthcheck workers to simulate slow pings.
	sp.hc.close()

	// Create a session and recycle it.
	sh, err := sp.take(ctx)
	if err != nil {
		t.Fatalf("failed to get session: %v", err)
	}

	// Force ping during the first take() by setting check time to the past.
	sh.session.nextCheck = time.Now().Add(-time.Minute)
	wantSid := sh.getID()
	sh.recycle()

	// Two back-to-back session requests, both of them should return the same
	// session created before, but only the first of them should trigger a session ping.
	for i := 0; i < 2; i++ {
		// Take the session from the idle list and recycle it.
		sh, err = sp.take(ctx)
		if err != nil {
			t.Fatalf("%v - failed to get session: %v", i, err)
		}
		if gotSid := sh.getID(); gotSid != wantSid {
			t.Fatalf("%v - got session id: %v, want %v", i, gotSid, wantSid)
		}

		// The two back-to-back session requests shouldn't trigger any session
		// pings because sessionPool.Take reschedules the next healthcheck.
		if got, want := server.TestSpanner.DumpPings(), ([]string{wantSid}); !testEqual(got, want) {
			t.Fatalf("%v - got ping session requests: %v, want %v", i, got, want)
		}
		sh.recycle()
	}

	// Inject session error to server stub, and take the session from the
	// session pool, the old session should be destroyed and the session pool
	// will create a new session.
	server.TestSpanner.PutExecutionTime(MethodGetSession,
		SimulatedExecutionTime{
			Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")},
		})

	// Force ping by setting check time in the past.
	s := sp.idleList.Front().Value.(*session)
	s.nextCheck = time.Now().Add(-time.Minute)

	// take will take the idle session. Then it will send a GetSession request
	// to check if it's healthy. It'll discover that it's not healthy
	// (NotFound), drop it, and create a new session.
	sh, err = sp.take(ctx)
	if err != nil {
		t.Fatalf("failed to get session: %v", err)
	}
	ds := server.TestSpanner.DumpSessions()
	if len(ds) != 1 {
		t.Fatalf("dumped sessions from mockclient: %v, want %v", ds, sh.getID())
	}
	if sh.getID() == wantSid {
		t.Fatalf("sessionPool.Take still returns the same session %v, want it to create a new one", wantSid)
	}
}

// TestTakeFromIdleWriteListChecked tests taking sessions from session pool's
// idle list, but with an extra ping check.
func TestTakeFromIdleWriteListChecked(t *testing.T) {
	t.Parallel()
	ctx := context.Background()

	// Make sure maintainer keeps the idle sessions.
	server, client, teardown := setupMockedTestServerWithConfig(t,
		ClientConfig{
			SessionPoolConfig: SessionPoolConfig{
				MaxIdle:                   1,
				HealthCheckInterval:       50 * time.Millisecond,
				healthCheckSampleInterval: 10 * time.Millisecond,
			},
		})
	defer teardown()
	sp := client.idleSessions

	// Stop healthcheck workers to simulate slow pings.
	sp.hc.close()

	// Create a session and recycle it.
	sh, err := sp.takeWriteSession(ctx)
	if err != nil {
		t.Fatalf("failed to get session: %v", err)
	}
	wantSid := sh.getID()
	// Set the next check in the past to ensure the next take() call will
	// trigger a health check.
	sh.session.nextCheck = time.Now().Add(-time.Minute)
	sh.recycle()

	// Two back-to-back session requests, both of them should return the same
	// session created before and only the first of them should trigger a
	// session ping.
	for i := 0; i < 2; i++ {
		// Take the session from the idle list and recycle it.
		sh, err = sp.takeWriteSession(ctx)
		if err != nil {
			t.Fatalf("%v - failed to get session: %v", i, err)
		}
		if gotSid := sh.getID(); gotSid != wantSid {
			t.Fatalf("%v - got session id: %v, want %v", i, gotSid, wantSid)
		}
		// The two back-to-back session requests shouldn't trigger any session
		// pings because sessionPool.Take reschedules the next healthcheck.
		if got, want := server.TestSpanner.DumpPings(), ([]string{wantSid}); !testEqual(got, want) {
			t.Fatalf("%v - got ping session requests: %v, want %v", i, got, want)
		}
		sh.recycle()
	}

	// Inject session error to mockclient, and take the session from the
	// session pool, the old session should be destroyed and the session pool
	// will create a new session.
	server.TestSpanner.PutExecutionTime(MethodGetSession,
		SimulatedExecutionTime{
			Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")},
		})

	// Force ping by setting check time in the past.
	s := sp.idleList.Front().Value.(*session)
	s.nextCheck = time.Now().Add(-time.Minute)

	// The next takeWriteSession must detect the broken session and replace it.
	sh, err = sp.takeWriteSession(ctx)
	if err != nil {
		t.Fatalf("failed to get session: %v", err)
	}
	ds := server.TestSpanner.DumpSessions()
	if len(ds) != 1 {
		t.Fatalf("dumped sessions from mockclient: %v, want %v", ds, sh.getID())
	}
	if sh.getID() == wantSid {
		t.Fatalf("sessionPool.Take still returns the same session %v, want it to create a new one", wantSid)
	}
}

// TestSessionLeak tests leaking a session and getting the stack of the
// goroutine that leaked it.
func TestSessionLeak(t *testing.T) {
	t.Parallel()
	ctx := context.Background()

	_, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{
		SessionPoolConfig: SessionPoolConfig{
			TrackSessionHandles: true,
			MinOpened:           0,
			MaxOpened:           1,
		},
	})
	defer teardown()

	// Execute a query without calling rowIterator.Stop. This will cause the
	// session not to be returned to the pool.
	single := client.Single()
	iter := single.Query(ctx, NewStatement(SelectFooFromBar))
	for {
		_, err := iter.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			t.Fatalf("Got unexpected error while iterating results: %v\n", err)
		}
	}
	// The session should not have been returned to the pool.
	if g, w := client.idleSessions.idleList.Len(), 0; g != w {
		t.Fatalf("Idle sessions count mismatch\nGot: %d\nWant: %d\n", g, w)
	}
	// The checked out session should contain a stack trace, as
	// TrackSessionHandles is enabled.
	if single.sh.stack == nil {
		t.Fatalf("Missing stacktrace from session handle")
	}
	stack := fmt.Sprintf("%s", single.sh.stack)
	testMethod := "TestSessionLeak"
	if !strings.Contains(stack, testMethod) {
		t.Fatalf("Stacktrace does not contain '%s'\nGot: %s", testMethod, stack)
	}
	// Return the session to the pool.
	iter.Stop()
	// The stack should now have been removed from the session handle.
	if single.sh.stack != nil {
		t.Fatalf("Got unexpected stacktrace in session handle: %s", single.sh.stack)
	}

	// Do another query and hold on to the session.
	single = client.Single()
	iter = single.Query(ctx, NewStatement(SelectFooFromBar))
	for {
		_, err := iter.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			t.Fatalf("Got unexpected error while iterating results: %v\n", err)
		}
	}
	// Try to do another query. This will fail as MaxOpened=1.
	ctxWithTimeout, cancel := context.WithTimeout(ctx, time.Millisecond*10)
	defer cancel()
	single2 := client.Single()
	iter2 := single2.Query(ctxWithTimeout, NewStatement(SelectFooFromBar))
	_, gotErr := iter2.Next()
	wantErr := client.idleSessions.errGetSessionTimeoutWithTrackedSessionHandles()
	// The error should contain the stacktraces of all the checked out
	// sessions.
	if !testEqual(gotErr, wantErr) {
		t.Fatalf("Error mismatch on iterating result set.\nGot: %v\nWant: %v\n", gotErr, wantErr)
	}
	if !strings.Contains(gotErr.Error(), testMethod) {
		t.Fatalf("Error does not contain '%s'\nGot: %s", testMethod, gotErr.Error())
	}
	// Close iterators to check sessions back into the pool before closing.
	iter2.Stop()
	iter.Stop()
}

// TestMaxOpenedSessions tests max open sessions constraint.
func TestMaxOpenedSessions(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	_, client, teardown := setupMockedTestServerWithConfig(t,
		ClientConfig{
			SessionPoolConfig: SessionPoolConfig{
				MaxOpened: 1,
			},
		})
	defer teardown()
	sp := client.idleSessions

	sh1, err := sp.take(ctx)
	if err != nil {
		t.Fatalf("cannot take session from session pool: %v", err)
	}

	// Session request will timeout due to the max open sessions constraint.
	ctx2, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
	defer cancel()
	_, gotErr := sp.take(ctx2)
	if wantErr := sp.errGetBasicSessionTimeout(); !testEqual(gotErr, wantErr) {
		t.Fatalf("the second session retrival returns error %v, want %v", gotErr, wantErr)
	}
	// doneWaiting gates the destruction of sh1 so it happens while the
	// take(ctx3) call below is (potentially) blocked waiting for capacity.
	doneWaiting := make(chan struct{})
	go func() {
		// Destroy the first session to allow the next session request to
		// proceed.
		<-doneWaiting
		sh1.destroy()
	}()

	go func() {
		// Wait a short random time before destroying the session handle.
		<-time.After(10 * time.Millisecond)
		close(doneWaiting)
	}()
	// Now session request can be processed because the first session will be
	// destroyed.
	ctx3, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	sh2, err := sp.take(ctx3)
	if err != nil {
		t.Fatalf("after the first session is destroyed, session retrival still returns error %v, want nil", err)
	}
	if !sh2.session.isValid() || sh2.getID() == "" {
		t.Fatalf("got invalid session: %v", sh2.session)
	}
}

// TestMinOpenedSessions tests min open session constraint.
608func TestMinOpenedSessions(t *testing.T) { 609 t.Parallel() 610 ctx := context.Background() 611 _, client, teardown := setupMockedTestServerWithConfig(t, 612 ClientConfig{ 613 SessionPoolConfig: SessionPoolConfig{ 614 MinOpened: 1, 615 healthCheckSampleInterval: time.Millisecond, 616 }, 617 }) 618 defer teardown() 619 sp := client.idleSessions 620 621 // Take ten sessions from session pool and recycle them. 622 var ss []*session 623 var shs []*sessionHandle 624 for i := 0; i < 10; i++ { 625 sh, err := sp.take(ctx) 626 if err != nil { 627 t.Fatalf("failed to get session(%v): %v", i, err) 628 } 629 ss = append(ss, sh.session) 630 shs = append(shs, sh) 631 sh.recycle() 632 } 633 for _, sh := range shs { 634 sh.recycle() 635 } 636 637 // Simulate session expiration. 638 for _, s := range ss { 639 s.destroy(true) 640 } 641 642 // Wait until the maintainer has had a chance to replenish the pool. 643 for i := 0; i < 10; i++ { 644 sp.mu.Lock() 645 if sp.numOpened > 0 { 646 sp.mu.Unlock() 647 break 648 } 649 sp.mu.Unlock() 650 <-time.After(sp.healthCheckSampleInterval) 651 } 652 sp.mu.Lock() 653 defer sp.mu.Unlock() 654 // There should be still one session left in either the idle list or in one 655 // of the other opened states due to the min open sessions constraint. 656 if (sp.idleList.Len() + 657 sp.idleWriteList.Len() + 658 int(sp.prepareReqs) + 659 int(sp.createReqs)) != 1 { 660 t.Fatalf( 661 "got %v sessions in idle lists, want 1. Opened: %d, read: %d, "+ 662 "write: %d, in preparation: %d, in creation: %d", 663 sp.idleList.Len()+sp.idleWriteList.Len(), sp.numOpened, 664 sp.idleList.Len(), sp.idleWriteList.Len(), sp.prepareReqs, 665 sp.createReqs) 666 } 667} 668 669// TestMaxBurst tests max burst constraint. 
func TestMaxBurst(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	server, client, teardown := setupMockedTestServerWithConfig(t,
		ClientConfig{
			SessionPoolConfig: SessionPoolConfig{
				MaxBurst: 1,
			},
		})
	defer teardown()
	sp := client.idleSessions

	// Will cause session creation RPC to be retried forever.
	server.TestSpanner.PutExecutionTime(MethodCreateSession,
		SimulatedExecutionTime{
			Errors:    []error{status.Errorf(codes.Unavailable, "try later")},
			KeepError: true,
		})

	// This session request will never finish until the injected error is
	// cleared.
	go sp.take(ctx)

	// Poll for the execution of the first session request.
	for {
		sp.mu.Lock()
		cr := sp.createReqs
		sp.mu.Unlock()
		if cr == 0 {
			<-time.After(time.Second)
			continue
		}
		// The first session request is being executed.
		break
	}

	ctx2, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	_, gotErr := sp.take(ctx2)

	// Since MaxBurst == 1, the second session request should block.
	if wantErr := sp.errGetBasicSessionTimeout(); !testEqual(gotErr, wantErr) {
		t.Fatalf("session retrival returns error %v, want %v", gotErr, wantErr)
	}

	// Let the first session request succeed: freeze the mock server, clear
	// the injected error, and unfreeze it again.
	server.TestSpanner.Freeze()
	server.TestSpanner.PutExecutionTime(MethodCreateSession, SimulatedExecutionTime{})
	server.TestSpanner.Unfreeze()

	// Now new session request can proceed because the first session request will eventually succeed.
	sh, err := sp.take(ctx)
	if err != nil {
		t.Fatalf("session retrival returns error %v, want nil", err)
	}
	if !sh.session.isValid() || sh.getID() == "" {
		t.Fatalf("got invalid session: %v", sh.session)
	}
}

// TestSessionRecycle tests recycling sessions.
func TestSessionRecycle(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	// Set MaxBurst=MinOpened to prevent additional sessions to be created
	// while session pool initialization is still running.
	_, client, teardown := setupMockedTestServerWithConfig(t,
		ClientConfig{
			SessionPoolConfig: SessionPoolConfig{
				MinOpened: 1,
				MaxIdle:   5,
				MaxBurst:  1,
			},
		})
	defer teardown()
	sp := client.idleSessions

	// Test session is correctly recycled and reused.
	for i := 0; i < 20; i++ {
		s, err := sp.take(ctx)
		if err != nil {
			t.Fatalf("cannot get the session %v: %v", i, err)
		}
		s.recycle()
	}

	sp.mu.Lock()
	defer sp.mu.Unlock()
	// The session pool should only contain 1 session: MinOpened is 1, and
	// there has never been more than one session in use at any time, so
	// there's no need for the session pool to create a second session. The
	// session has also been in use all the time, so there is also no reason
	// for the session pool to delete the session.
	if sp.numOpened != 1 {
		t.Fatalf("Expect session pool size 1, got %d", sp.numOpened)
	}
}

// TestSessionDestroy tests destroying sessions.
func TestSessionDestroy(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	_, client, teardown := setupMockedTestServerWithConfig(t,
		ClientConfig{
			SessionPoolConfig: SessionPoolConfig{
				MinOpened: 1,
				MaxBurst:  1,
			},
		})
	defer teardown()
	sp := client.idleSessions

	// Creating a session pool with MinSessions=1 will automatically start the
	// creation of 1 session when the session pool is created. As MaxBurst=1,
	// the session pool will never create more than 1 session at a time, so the
	// take() method will wait if the initial session has not yet been created.
	sh, err := sp.take(ctx)
	if err != nil {
		t.Fatalf("cannot get session from session pool: %v", err)
	}
	s := sh.session
	sh.recycle()
	if d := s.destroy(true); d || !s.isValid() {
		// Session should be remaining because of min open sessions constraint.
		t.Fatalf("session %s invalid, want it to stay alive. (destroy in expiration mode, success: %v)", s.id, d)
	}
	if d := s.destroy(false); !d || s.isValid() {
		// Session should be destroyed.
		t.Fatalf("failed to destroy session %s. (destroy in default mode, success: %v)", s.id, d)
	}
}

// TestHcHeap tests heap operation on top of hcHeap.
func TestHcHeap(t *testing.T) {
	in := []*session{
		{nextCheck: time.Unix(10, 0)},
		{nextCheck: time.Unix(0, 5)},
		{nextCheck: time.Unix(1, 8)},
		{nextCheck: time.Unix(11, 7)},
		{nextCheck: time.Unix(6, 3)},
	}
	// The expected pop order: ascending by nextCheck, with hcIndex recording
	// each session's position in the heap.
	want := []*session{
		{nextCheck: time.Unix(1, 8), hcIndex: 0},
		{nextCheck: time.Unix(6, 3), hcIndex: 1},
		{nextCheck: time.Unix(8, 2), hcIndex: 2},
		{nextCheck: time.Unix(10, 0), hcIndex: 3},
		{nextCheck: time.Unix(11, 7), hcIndex: 4},
	}
	hh := hcHeap{}
	for _, s := range in {
		heap.Push(&hh, s)
	}
	// Change top of the heap and do an adjustment.
	hh.sessions[0].nextCheck = time.Unix(8, 2)
	heap.Fix(&hh, 0)
	for idx := 0; hh.Len() > 0; idx++ {
		got := heap.Pop(&hh).(*session)
		// Popped sessions get hcIndex -1.
		want[idx].hcIndex = -1
		if !testEqual(got, want[idx]) {
			t.Fatalf("%v: heap.Pop returns %v, want %v", idx, got, want[idx])
		}
	}
}

// TestHealthCheckScheduler tests if healthcheck workers can schedule and
// perform healthchecks properly.
func TestHealthCheckScheduler(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	server, client, teardown := setupMockedTestServerWithConfig(t,
		ClientConfig{
			SessionPoolConfig: SessionPoolConfig{
				HealthCheckInterval:       50 * time.Millisecond,
				healthCheckSampleInterval: 10 * time.Millisecond,
			},
		})
	defer teardown()
	sp := client.idleSessions

	// Create 50 sessions.
	for i := 0; i < 50; i++ {
		_, err := sp.take(ctx)
		if err != nil {
			t.Fatalf("cannot get session from session pool: %v", err)
		}
	}

	// Make sure we start with a ping history to avoid that the first
	// sessions that were created have not already exceeded the maximum
	// number of pings.
	server.TestSpanner.ClearPings()
	// Wait for 10-30 pings per session.
	waitFor(t, func() error {
		// Only check actually live sessions and ignore any sessions the
		// session pool may have deleted in the meantime.
		liveSessions := server.TestSpanner.DumpSessions()
		dp := server.TestSpanner.DumpPings()
		gotPings := map[string]int64{}
		for _, p := range dp {
			gotPings[p]++
		}
		for s := range liveSessions {
			// With a 50ms check interval, roughly 20 pings are expected;
			// accept anything within 50% of that.
			want := int64(20)
			if got := gotPings[s]; got < want/2 || got > want+want/2 {
				// This is an unacceptable number of pings.
				return fmt.Errorf("got %v healthchecks on session %v, want it between (%v, %v)", got, s, want/2, want+want/2)
			}
		}
		return nil
	})
}

// Tests that a fraction of sessions is prepared for write by the health
// checker.
884func TestWriteSessionsPrepared(t *testing.T) { 885 t.Parallel() 886 ctx := context.Background() 887 _, client, teardown := setupMockedTestServerWithConfig(t, 888 ClientConfig{ 889 SessionPoolConfig: SessionPoolConfig{ 890 WriteSessions: 0.5, 891 MaxIdle: 20, 892 HealthCheckInterval: time.Nanosecond, 893 }, 894 }) 895 defer teardown() 896 sp := client.idleSessions 897 898 shs := make([]*sessionHandle, 10) 899 var err error 900 for i := 0; i < 10; i++ { 901 shs[i], err = sp.take(ctx) 902 if err != nil { 903 t.Fatalf("cannot get session from session pool: %v", err) 904 } 905 } 906 // Now there are 10 sessions in the pool. Release them. 907 for _, sh := range shs { 908 sh.recycle() 909 } 910 911 // Take 5 write sessions. The write sessions will be taken from either the 912 // list of prepared sessions (idleWriteList), or they will be prepared 913 // during the takeWriteSession method. 914 wshs := make([]*sessionHandle, 5) 915 for i := 0; i < 5; i++ { 916 wshs[i], err = sp.takeWriteSession(ctx) 917 if err != nil { 918 t.Fatalf("cannot get session from session pool: %v", err) 919 } 920 if wshs[i].getTransactionID() == nil { 921 t.Fatalf("got nil transaction id from session pool") 922 } 923 } 924 // Return the session to the pool. 925 for _, sh := range wshs { 926 sh.recycle() 927 } 928 929 // Now force creation of 10 more sessions. 930 shs = make([]*sessionHandle, 20) 931 for i := 0; i < 20; i++ { 932 shs[i], err = sp.take(ctx) 933 if err != nil { 934 t.Fatalf("cannot get session from session pool: %v", err) 935 } 936 } 937 938 // Now there are 20 sessions in the pool. Release them. 939 for _, sh := range shs { 940 sh.recycle() 941 } 942 // The health checker should eventually prepare 10 of the 20 sessions with 943 // a r/w tx. 
944 waitUntil := time.After(time.Second) 945 var numWritePrepared int 946 for numWritePrepared < 10 { 947 select { 948 case <-waitUntil: 949 break 950 default: 951 } 952 sp.mu.Lock() 953 numWritePrepared = sp.idleWriteList.Len() 954 sp.mu.Unlock() 955 } 956 957 sp.mu.Lock() 958 defer sp.mu.Unlock() 959 if sp.idleWriteList.Len() != 10 { 960 t.Fatalf("Expect 10 write prepared session, got: %d", sp.idleWriteList.Len()) 961 } 962} 963 964// TestTakeFromWriteQueue tests that sessionPool.take() returns write prepared 965// sessions as well. 966func TestTakeFromWriteQueue(t *testing.T) { 967 t.Parallel() 968 ctx := context.Background() 969 _, client, teardown := setupMockedTestServerWithConfig(t, 970 ClientConfig{ 971 SessionPoolConfig: SessionPoolConfig{ 972 MaxOpened: 1, 973 WriteSessions: 1.0, 974 MaxIdle: 1, 975 HealthCheckInterval: time.Nanosecond, 976 }, 977 }) 978 defer teardown() 979 sp := client.idleSessions 980 981 sh, err := sp.take(ctx) 982 if err != nil { 983 t.Fatalf("cannot get session from session pool: %v", err) 984 } 985 sh.recycle() 986 987 // Wait until the health checker has write-prepared the session. 988 waitUntil := time.After(time.Second) 989 var numWritePrepared int 990 for numWritePrepared == 0 { 991 select { 992 case <-waitUntil: 993 break 994 default: 995 } 996 sp.mu.Lock() 997 numWritePrepared = sp.idleWriteList.Len() 998 sp.mu.Unlock() 999 } 1000 1001 // The session should now be in write queue but take should also return it. 1002 sp.mu.Lock() 1003 if sp.idleWriteList.Len() == 0 { 1004 t.Fatalf("write queue unexpectedly empty") 1005 } 1006 if sp.idleList.Len() != 0 { 1007 t.Fatalf("read queue not empty") 1008 } 1009 sp.mu.Unlock() 1010 sh, err = sp.take(ctx) 1011 if err != nil { 1012 t.Fatalf("cannot get session from session pool: %v", err) 1013 } 1014 sh.recycle() 1015} 1016 1017// The session pool should stop trying to create write-prepared sessions if a 1018// non-transient error occurs while trying to begin a transaction. 
The 1019// process for preparing write sessions should automatically be re-enabled if 1020// a BeginTransaction call initiated by takeWriteSession succeeds. 1021// 1022// The only exception to the above is that a 'Session not found' error should 1023// cause the session to be removed from the session pool, and it should not 1024// affect the background process of preparing sessions. 1025func TestErrorOnPrepareSession(t *testing.T) { 1026 t.Parallel() 1027 1028 serverErrors := []error{ 1029 status.Errorf(codes.PermissionDenied, "Caller is missing IAM permission spanner.databases.beginOrRollbackReadWriteTransaction on resource"), 1030 status.Errorf(codes.NotFound, `Database not found: projects/<project>/instances/<instance>/databases/<database> resource_type: "type.googleapis.com/google.spanner.admin.database.v1.Database" resource_name: "projects/<project>/instances/<instance>/databases/<database>" description: "Database does not exist."`), 1031 status.Errorf(codes.FailedPrecondition, "Invalid transaction option"), 1032 status.Errorf(codes.Internal, "Unknown server error"), 1033 } 1034 logger := log.New(os.Stderr, "", log.LstdFlags) 1035 for _, serverErr := range serverErrors { 1036 ctx := context.Background() 1037 server, client, teardown := setupMockedTestServerWithConfig(t, 1038 ClientConfig{ 1039 SessionPoolConfig: SessionPoolConfig{ 1040 MinOpened: 10, 1041 MaxOpened: 10, 1042 WriteSessions: 0.5, 1043 HealthCheckInterval: time.Millisecond, 1044 }, 1045 logger: logger, 1046 }) 1047 defer teardown() 1048 // Discard logging until trying to prepare sessions has stopped. 1049 logger.SetOutput(ioutil.Discard) 1050 server.TestSpanner.PutExecutionTime(MethodBeginTransaction, SimulatedExecutionTime{ 1051 Errors: []error{serverErr}, 1052 KeepError: true, 1053 }) 1054 sp := client.idleSessions 1055 1056 // Wait until the health checker has tried to write-prepare a session. 
1057 // This will cause the session pool to write some errors to the log that 1058 // preparing sessions failed. 1059 waitUntil := time.After(time.Second) 1060 var prepareDisabled bool 1061 var numOpened int 1062 waitForPrepare: 1063 for !prepareDisabled || numOpened < 10 { 1064 select { 1065 case <-waitUntil: 1066 break waitForPrepare 1067 default: 1068 } 1069 sp.mu.Lock() 1070 prepareDisabled = sp.disableBackgroundPrepareSessions 1071 numOpened = sp.idleList.Len() 1072 sp.mu.Unlock() 1073 } 1074 // Re-enable logging. 1075 logger.SetOutput(os.Stderr) 1076 1077 // There should be no write-prepared sessions. 1078 sp.mu.Lock() 1079 if sp.idleWriteList.Len() != 0 { 1080 sp.mu.Unlock() 1081 t.Fatalf("write queue unexpectedly not empty") 1082 } 1083 // All sessions should be in the read idle list. 1084 if g, w := sp.idleList.Len(), 10; g != w { 1085 sp.mu.Unlock() 1086 t.Fatalf("session count mismatch:\nWant: %v\nGot: %v", w, g) 1087 } 1088 sp.mu.Unlock() 1089 // Take a read session should succeed. 1090 sh, err := sp.take(ctx) 1091 if err != nil { 1092 t.Fatalf("cannot get session from session pool: %v", err) 1093 } 1094 sh.recycle() 1095 // Take a write session should fail with the server error. 1096 _, err = sp.takeWriteSession(ctx) 1097 if ErrCode(err) != ErrCode(serverErr) { 1098 t.Fatalf("take write session failed with unexpected error.\nGot: %v\nWant: %v\n", err, serverErr) 1099 } 1100 1101 // Clearing the error on the server should allow us to take a write 1102 // session. 1103 server.TestSpanner.PutExecutionTime(MethodBeginTransaction, SimulatedExecutionTime{}) 1104 sh, err = sp.takeWriteSession(ctx) 1105 if err != nil { 1106 t.Fatalf("cannot get write session from session pool: %v", err) 1107 } 1108 sh.recycle() 1109 // The maintainer should also pick this up and prepare 50% of the sessions. 
1110 waitUntil = time.After(time.Second) 1111 var numPrepared int 1112 for numPrepared < 5 { 1113 select { 1114 case <-waitUntil: 1115 break 1116 default: 1117 } 1118 sp.mu.Lock() 1119 numPrepared = sp.idleWriteList.Len() 1120 sp.mu.Unlock() 1121 } 1122 sp.mu.Lock() 1123 if g, w := sp.idleWriteList.Len(), 5; g != w { 1124 sp.mu.Unlock() 1125 t.Fatalf("write session count mismatch:\nWant: %v\nGot: %v", w, g) 1126 } 1127 sp.mu.Unlock() 1128 } 1129} 1130 1131// The session pool should continue to try to create write-prepared sessions if 1132// a 'Session not found' error occurs. The session that has been deleted by 1133// backend should be removed from the pool, and the maintainer should create a 1134// new session if this causes the number of sessions in the pool to fall below 1135// MinOpened. 1136func TestSessionNotFoundOnPrepareSession(t *testing.T) { 1137 t.Parallel() 1138 1139 // The server will return 'Session not found' for the first 8 1140 // BeginTransaction calls. 1141 sessionNotFoundErr := newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s") 1142 serverErrors := make([]error, 8) 1143 for i := range serverErrors { 1144 serverErrors[i] = sessionNotFoundErr 1145 } 1146 ctx := context.Background() 1147 logger := log.New(os.Stderr, "", log.LstdFlags) 1148 server, client, teardown := setupMockedTestServerWithConfig(t, 1149 ClientConfig{ 1150 SessionPoolConfig: SessionPoolConfig{ 1151 MinOpened: 10, 1152 MaxOpened: 10, 1153 WriteSessions: 0.5, 1154 HealthCheckInterval: time.Millisecond, 1155 healthCheckSampleInterval: time.Millisecond, 1156 }, 1157 logger: logger, 1158 }) 1159 defer teardown() 1160 // Discard logging until trying to prepare sessions has stopped. 1161 logger.SetOutput(ioutil.Discard) 1162 server.TestSpanner.PutExecutionTime(MethodBeginTransaction, SimulatedExecutionTime{ 1163 Errors: serverErrors, 1164 }) 1165 sp := client.idleSessions 1166 1167 // Wait until the health checker has tried to write-prepare the sessions. 
1168 waitUntil := time.After(5 * time.Second) 1169 var numWriteSessions int 1170 var numReadSessions int 1171waitForPrepare: 1172 for (numWriteSessions+numReadSessions) < 10 || numWriteSessions < 5 { 1173 select { 1174 case <-waitUntil: 1175 break waitForPrepare 1176 default: 1177 } 1178 sp.mu.Lock() 1179 numReadSessions = sp.idleList.Len() 1180 numWriteSessions = sp.idleWriteList.Len() 1181 sp.mu.Unlock() 1182 } 1183 // Re-enable logging. 1184 logger.SetOutput(os.Stderr) 1185 1186 // There should be at least 5 write-prepared sessions. 1187 sp.mu.Lock() 1188 if g, w := sp.idleWriteList.Len(), 5; g < w { 1189 sp.mu.Unlock() 1190 t.Fatalf("write-prepared session count mismatch.\nWant at least: %v\nGot: %v", w, g) 1191 } 1192 // The other sessions should be in the read idle list. 1193 if g, w := sp.idleList.Len()+sp.idleWriteList.Len(), 10; g != w { 1194 sp.mu.Unlock() 1195 t.Fatalf("total session count mismatch:\nWant: %v\nGot: %v", w, g) 1196 } 1197 sp.mu.Unlock() 1198 // Take a read session should succeed. 1199 sh, err := sp.take(ctx) 1200 if err != nil { 1201 t.Fatalf("cannot get session from session pool: %v", err) 1202 } 1203 sh.recycle() 1204 // Take a write session should succeed. 1205 sh, err = sp.takeWriteSession(ctx) 1206 if err != nil { 1207 t.Fatalf("take write session failed with unexpected error.\nGot: %v\nWant: %v\n", err, nil) 1208 } 1209 sh.recycle() 1210} 1211 1212// TestSessionHealthCheck tests healthchecking cases. 1213func TestSessionHealthCheck(t *testing.T) { 1214 t.Parallel() 1215 ctx := context.Background() 1216 server, client, teardown := setupMockedTestServerWithConfig(t, 1217 ClientConfig{ 1218 SessionPoolConfig: SessionPoolConfig{ 1219 HealthCheckInterval: time.Nanosecond, 1220 healthCheckSampleInterval: 10 * time.Millisecond, 1221 }, 1222 }) 1223 defer teardown() 1224 sp := client.idleSessions 1225 1226 // Test pinging sessions. 
1227 sh, err := sp.take(ctx) 1228 if err != nil { 1229 t.Fatalf("cannot get session from session pool: %v", err) 1230 } 1231 1232 // Wait for healthchecker to send pings to session. 1233 waitFor(t, func() error { 1234 pings := server.TestSpanner.DumpPings() 1235 if len(pings) == 0 || pings[0] != sh.getID() { 1236 return fmt.Errorf("healthchecker didn't send any ping to session %v", sh.getID()) 1237 } 1238 return nil 1239 }) 1240 // Test broken session detection. 1241 sh, err = sp.take(ctx) 1242 if err != nil { 1243 t.Fatalf("cannot get session from session pool: %v", err) 1244 } 1245 1246 server.TestSpanner.Freeze() 1247 server.TestSpanner.PutExecutionTime(MethodGetSession, 1248 SimulatedExecutionTime{ 1249 Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}, 1250 KeepError: true, 1251 }) 1252 server.TestSpanner.Unfreeze() 1253 1254 s := sh.session 1255 waitFor(t, func() error { 1256 if sh.session.isValid() { 1257 return fmt.Errorf("session(%v) is still alive, want it to be dropped by healthcheck workers", s) 1258 } 1259 return nil 1260 }) 1261 1262 server.TestSpanner.Freeze() 1263 server.TestSpanner.PutExecutionTime(MethodGetSession, SimulatedExecutionTime{}) 1264 server.TestSpanner.Unfreeze() 1265 1266 // Test garbage collection. 1267 sh, err = sp.take(ctx) 1268 if err != nil { 1269 t.Fatalf("cannot get session from session pool: %v", err) 1270 } 1271 sp.close() 1272 if sh.session.isValid() { 1273 t.Fatalf("session(%v) is still alive, want it to be garbage collected", s) 1274 } 1275} 1276 1277// TestStressSessionPool does stress test on session pool by the following concurrent operations: 1278// 1) Test worker gets a session from the pool. 1279// 2) Test worker turns a session back into the pool. 1280// 3) Test worker destroys a session got from the pool. 1281// 4) Healthcheck destroys a broken session (because a worker has already destroyed it). 1282// 5) Test worker closes the session pool. 
1283// 1284// During the test, the session pool maintainer maintains the number of sessions, 1285// and it is expected that all sessions that are taken from session pool remains valid. 1286// When all test workers and healthcheck workers exit, mockclient, session pool 1287// and healthchecker should be in consistent state. 1288func TestStressSessionPool(t *testing.T) { 1289 t.Parallel() 1290 1291 // Use concurrent workers to test different session pool built from different configurations. 1292 for ti, cfg := range []SessionPoolConfig{ 1293 {}, 1294 {MinOpened: 10, MaxOpened: 100}, 1295 {MaxBurst: 50}, 1296 {MinOpened: 10, MaxOpened: 200, MaxBurst: 5}, 1297 {MinOpened: 10, MaxOpened: 200, MaxBurst: 5, WriteSessions: 0.2}, 1298 } { 1299 // Create a more aggressive session healthchecker to increase test concurrency. 1300 cfg.HealthCheckInterval = 50 * time.Millisecond 1301 cfg.healthCheckSampleInterval = 10 * time.Millisecond 1302 cfg.HealthCheckWorkers = 50 1303 1304 server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{ 1305 SessionPoolConfig: cfg, 1306 }) 1307 sp := client.idleSessions 1308 1309 // Create a test group for this configuration and schedule 100 sub 1310 // sub tests within the group. 1311 t.Run(fmt.Sprintf("TestStressSessionPoolGroup%v", ti), func(t *testing.T) { 1312 for i := 0; i < 100; i++ { 1313 idx := i 1314 t.Logf("TestStressSessionPoolWithCfg%dWorker%03d", ti, idx) 1315 testStressSessionPool(t, cfg, ti, idx, sp, client) 1316 } 1317 }) 1318 sp.hc.close() 1319 // Here the states of healthchecker, session pool and mockclient are 1320 // stable. 1321 sp.mu.Lock() 1322 idleSessions := map[string]bool{} 1323 hcSessions := map[string]bool{} 1324 mockSessions := server.TestSpanner.DumpSessions() 1325 // Dump session pool's idle list. 
1326 for sl := sp.idleList.Front(); sl != nil; sl = sl.Next() { 1327 s := sl.Value.(*session) 1328 if idleSessions[s.getID()] { 1329 t.Fatalf("%v: found duplicated session in idle list: %v", ti, s.getID()) 1330 } 1331 idleSessions[s.getID()] = true 1332 } 1333 for sl := sp.idleWriteList.Front(); sl != nil; sl = sl.Next() { 1334 s := sl.Value.(*session) 1335 if idleSessions[s.getID()] { 1336 t.Fatalf("%v: found duplicated session in idle write list: %v", ti, s.getID()) 1337 } 1338 idleSessions[s.getID()] = true 1339 } 1340 if int(sp.numOpened) != len(idleSessions) { 1341 t.Fatalf("%v: number of opened sessions (%v) != number of idle sessions (%v)", ti, sp.numOpened, len(idleSessions)) 1342 } 1343 if sp.createReqs != 0 { 1344 t.Fatalf("%v: number of pending session creations = %v, want 0", ti, sp.createReqs) 1345 } 1346 // Dump healthcheck queue. 1347 for _, s := range sp.hc.queue.sessions { 1348 if hcSessions[s.getID()] { 1349 t.Fatalf("%v: found duplicated session in healthcheck queue: %v", ti, s.getID()) 1350 } 1351 hcSessions[s.getID()] = true 1352 } 1353 sp.mu.Unlock() 1354 1355 // Verify that idleSessions == hcSessions == mockSessions. 1356 if !testEqual(idleSessions, hcSessions) { 1357 t.Fatalf("%v: sessions in idle list (%v) != sessions in healthcheck queue (%v)", ti, idleSessions, hcSessions) 1358 } 1359 // The server may contain more sessions than the health check queue. 1360 // This can be caused by a timeout client side during a CreateSession 1361 // request. The request may still be received and executed by the 1362 // server, but the session pool will not register the session. 
1363 for id, b := range hcSessions { 1364 if b && !mockSessions[id] { 1365 t.Fatalf("%v: session in healthcheck queue (%v) was not found on server", ti, id) 1366 } 1367 } 1368 sp.close() 1369 mockSessions = server.TestSpanner.DumpSessions() 1370 for id, b := range hcSessions { 1371 if b && mockSessions[id] { 1372 t.Fatalf("Found session from pool still live on server: %v", id) 1373 } 1374 } 1375 teardown() 1376 } 1377} 1378 1379func testStressSessionPool(t *testing.T, cfg SessionPoolConfig, ti int, idx int, pool *sessionPool, client *Client) { 1380 ctx := context.Background() 1381 // Test worker iterates 1K times and tries different 1382 // session / session pool operations. 1383 for j := 0; j < 1000; j++ { 1384 if idx%10 == 0 && j >= 900 { 1385 // Close the pool in selected set of workers during the 1386 // middle of the test. 1387 pool.close() 1388 } 1389 // Take a write sessions ~ 20% of the times. 1390 takeWrite := rand.Intn(5) == 4 1391 var ( 1392 sh *sessionHandle 1393 gotErr error 1394 ) 1395 wasValid := pool.isValid() 1396 if takeWrite { 1397 sh, gotErr = pool.takeWriteSession(ctx) 1398 } else { 1399 sh, gotErr = pool.take(ctx) 1400 } 1401 if gotErr != nil { 1402 if pool.isValid() { 1403 t.Fatalf("%v.%v: pool.take returns error when pool is still valid: %v", ti, idx, gotErr) 1404 } 1405 // If the session pool was closed when we tried to take a session 1406 // from the pool, then we should have gotten a specific error. 1407 // If the session pool was closed between the take() and now (or 1408 // even during a take()) then an error is ok. 1409 if !wasValid { 1410 if wantErr := errInvalidSessionPool; gotErr != wantErr { 1411 t.Fatalf("%v.%v: got error when pool is closed: %v, want %v", ti, idx, gotErr, wantErr) 1412 } 1413 } 1414 continue 1415 } 1416 // Verify if session is valid when session pool is valid. 1417 // Note that if session pool is invalid after sh is taken, 1418 // then sh might be invalidated by healthcheck workers. 
1419 if (sh.getID() == "" || sh.session == nil || !sh.session.isValid()) && pool.isValid() { 1420 t.Fatalf("%v.%v.%v: pool.take returns invalid session %v", ti, idx, takeWrite, sh.session) 1421 } 1422 if takeWrite && sh.getTransactionID() == nil { 1423 t.Fatalf("%v.%v: pool.takeWriteSession returns session %v without transaction", ti, idx, sh.session) 1424 } 1425 if rand.Intn(100) < idx { 1426 // Random sleep before destroying/recycling the session, 1427 // to give healthcheck worker a chance to step in. 1428 <-time.After(time.Duration(rand.Int63n(int64(cfg.HealthCheckInterval)))) 1429 } 1430 if rand.Intn(100) < idx { 1431 // destroy the session. 1432 sh.destroy() 1433 continue 1434 } 1435 // recycle the session. 1436 sh.recycle() 1437 } 1438} 1439 1440// TestMaintainer checks the session pool maintainer maintains the number of 1441// sessions in the following cases: 1442// 1443// 1. On initialization of session pool, replenish session pool to meet 1444// MinOpened or MaxIdle. 1445// 2. On increased session usage, provision extra MaxIdle sessions. 1446// 3. After the surge passes, scale down the session pool accordingly. 1447func TestMaintainer(t *testing.T) { 1448 t.Parallel() 1449 ctx := context.Background() 1450 1451 minOpened := uint64(5) 1452 maxIdle := uint64(4) 1453 _, client, teardown := setupMockedTestServerWithConfig(t, 1454 ClientConfig{ 1455 SessionPoolConfig: SessionPoolConfig{ 1456 MinOpened: minOpened, 1457 MaxIdle: maxIdle, 1458 healthCheckSampleInterval: time.Millisecond, 1459 }, 1460 }) 1461 defer teardown() 1462 sp := client.idleSessions 1463 1464 waitFor(t, func() error { 1465 sp.mu.Lock() 1466 defer sp.mu.Unlock() 1467 if sp.numOpened != 5 { 1468 return fmt.Errorf("Replenish. Expect %d open, got %d", sp.MinOpened, sp.numOpened) 1469 } 1470 return nil 1471 }) 1472 1473 // To save test time, we are not creating many sessions, because the time 1474 // to create sessions will have impact on the decision on sessionsToKeep. 
1475 // We also parallelize the take and recycle process. 1476 shs := make([]*sessionHandle, 20) 1477 for i := 0; i < len(shs); i++ { 1478 var err error 1479 shs[i], err = sp.take(ctx) 1480 if err != nil { 1481 t.Fatalf("cannot get session from session pool: %v", err) 1482 } 1483 } 1484 sp.mu.Lock() 1485 if sp.numOpened != 20 { 1486 t.Fatalf("Scale out from normal use. Expect %d open, got %d", 20, sp.numOpened) 1487 } 1488 sp.mu.Unlock() 1489 1490 // Return 14 sessions to the pool. There are still 6 sessions checked out. 1491 for _, sh := range shs[:14] { 1492 sh.recycle() 1493 } 1494 1495 // The pool should scale down to sessionsInUse + MaxIdle = 6 + 4 = 10. 1496 waitFor(t, func() error { 1497 sp.mu.Lock() 1498 defer sp.mu.Unlock() 1499 if sp.numOpened != 10 { 1500 return fmt.Errorf("Keep extra MaxIdle sessions. Expect %d open, got %d", 10, sp.numOpened) 1501 } 1502 return nil 1503 }) 1504 1505 // Return the remaining 6 sessions. 1506 // The pool should now scale down to minOpened + maxIdle. 1507 for _, sh := range shs[14:] { 1508 sh.recycle() 1509 } 1510 waitFor(t, func() error { 1511 sp.mu.Lock() 1512 defer sp.mu.Unlock() 1513 if sp.numOpened != minOpened { 1514 return fmt.Errorf("Scale down. Expect %d open, got %d", minOpened+maxIdle, sp.numOpened) 1515 } 1516 return nil 1517 }) 1518} 1519 1520// Tests that the session pool creates up to MinOpened connections. 1521// 1522// Historical context: This test also checks that a low 1523// healthCheckSampleInterval does not prevent it from opening connections. 1524// The low healthCheckSampleInterval will however sometimes cause session 1525// creations to time out. That should not be considered a problem, but it 1526// could cause the test case to fail if it happens too often. 
1527// See: https://github.com/googleapis/google-cloud-go/issues/1259 1528func TestInit_CreatesSessions(t *testing.T) { 1529 t.Parallel() 1530 spc := SessionPoolConfig{ 1531 MinOpened: 10, 1532 MaxIdle: 10, 1533 WriteSessions: 0.0, 1534 healthCheckSampleInterval: 20 * time.Millisecond, 1535 } 1536 server, client, teardown := setupMockedTestServerWithConfig(t, 1537 ClientConfig{ 1538 SessionPoolConfig: spc, 1539 NumChannels: 4, 1540 }) 1541 defer teardown() 1542 sp := client.idleSessions 1543 1544 timeout := time.After(4 * time.Second) 1545 var numOpened int 1546loop: 1547 for { 1548 select { 1549 case <-timeout: 1550 t.Fatalf("timed out, got %d session(s), want %d", numOpened, spc.MinOpened) 1551 default: 1552 sp.mu.Lock() 1553 numOpened = sp.idleList.Len() + sp.idleWriteList.Len() 1554 sp.mu.Unlock() 1555 if numOpened == 10 { 1556 break loop 1557 } 1558 } 1559 } 1560 _, err := shouldHaveReceived(server.TestSpanner, []interface{}{ 1561 &sppb.BatchCreateSessionsRequest{}, 1562 &sppb.BatchCreateSessionsRequest{}, 1563 &sppb.BatchCreateSessionsRequest{}, 1564 &sppb.BatchCreateSessionsRequest{}, 1565 }) 1566 if err != nil { 1567 t.Fatal(err) 1568 } 1569} 1570 1571// Tests that the session pool with a MinSessions>0 also prepares WriteSessions 1572// sessions. 
1573func TestInit_PreparesSessions(t *testing.T) { 1574 t.Parallel() 1575 spc := SessionPoolConfig{ 1576 MinOpened: 10, 1577 MaxIdle: 10, 1578 WriteSessions: 0.5, 1579 healthCheckSampleInterval: 20 * time.Millisecond, 1580 } 1581 _, client, teardown := setupMockedTestServerWithConfig(t, 1582 ClientConfig{ 1583 SessionPoolConfig: spc, 1584 }) 1585 defer teardown() 1586 sp := client.idleSessions 1587 1588 timeoutAmt := 4 * time.Second 1589 timeout := time.After(timeoutAmt) 1590 var numPrepared int 1591 want := int(spc.WriteSessions * float64(spc.MinOpened)) 1592loop: 1593 for { 1594 select { 1595 case <-timeout: 1596 t.Fatalf("timed out after %v, got %d write-prepared session(s), want %d", timeoutAmt, numPrepared, want) 1597 default: 1598 sp.mu.Lock() 1599 numPrepared = sp.idleWriteList.Len() 1600 sp.mu.Unlock() 1601 if numPrepared == want { 1602 break loop 1603 } 1604 } 1605 } 1606} 1607 1608func (s1 *session) Equal(s2 *session) bool { 1609 return s1.client == s2.client && 1610 s1.id == s2.id && 1611 s1.pool == s2.pool && 1612 s1.createTime == s2.createTime && 1613 s1.valid == s2.valid && 1614 s1.hcIndex == s2.hcIndex && 1615 s1.idleList == s2.idleList && 1616 s1.nextCheck.Equal(s2.nextCheck) && 1617 s1.checkingHealth == s2.checkingHealth && 1618 testEqual(s1.md, s2.md) && 1619 bytes.Equal(s1.tx, s2.tx) 1620} 1621 1622func waitFor(t *testing.T, assert func() error) { 1623 t.Helper() 1624 timeout := 15 * time.Second 1625 ta := time.After(timeout) 1626 1627 for { 1628 select { 1629 case <-ta: 1630 if err := assert(); err != nil { 1631 t.Fatalf("after %v waiting, got %v", timeout, err) 1632 } 1633 return 1634 default: 1635 } 1636 1637 if err := assert(); err != nil { 1638 // Fail. Let's pause and retry. 1639 time.Sleep(10 * time.Millisecond) 1640 continue 1641 } 1642 1643 return 1644 } 1645} 1646 1647// Tests that maintainer only deletes sessions after a full maintenance window 1648// of 10 cycles has finished. 
1649func TestMaintainer_DeletesSessions(t *testing.T) { 1650 t.Parallel() 1651 1652 ctx := context.Background() 1653 const sampleInterval = time.Millisecond * 10 1654 _, client, teardown := setupMockedTestServerWithConfig(t, 1655 ClientConfig{ 1656 SessionPoolConfig: SessionPoolConfig{healthCheckSampleInterval: sampleInterval}, 1657 }) 1658 defer teardown() 1659 sp := client.idleSessions 1660 1661 // Take two sessions from the pool. 1662 // This will cause max sessions in use to be 2 during this window. 1663 sh1 := takeSession(ctx, t, sp) 1664 sh2 := takeSession(ctx, t, sp) 1665 wantSessions := map[string]bool{} 1666 wantSessions[sh1.getID()] = true 1667 wantSessions[sh2.getID()] = true 1668 // Return the sessions to the pool and then assure that they 1669 // are not deleted while still within the maintenance window. 1670 sh1.recycle() 1671 sh2.recycle() 1672 // Wait for 20 milliseconds, i.e. approx 2 iterations of the 1673 // maintainer. The sessions should still be in the pool. 1674 <-time.After(sampleInterval * 2) 1675 sh3 := takeSession(ctx, t, sp) 1676 sh4 := takeSession(ctx, t, sp) 1677 // Check that the returned sessions are equal to the sessions that we got 1678 // the first time from the session pool. 1679 gotSessions := map[string]bool{} 1680 gotSessions[sh3.getID()] = true 1681 gotSessions[sh4.getID()] = true 1682 testEqual(wantSessions, gotSessions) 1683 // Return the sessions to the pool. 1684 sh3.recycle() 1685 sh4.recycle() 1686 1687 // Now wait for the maintenance window to finish. This will cause the 1688 // maintainer to enter a new window and reset the max number of sessions in 1689 // use to the currently number of checked out sessions. That is 0, as all 1690 // sessions have been returned to the pool. That again will cause the 1691 // maintainer to delete these sessions at the next iteration, unless we 1692 // checkout new sessions during the first iteration. 
1693 waitFor(t, func() error { 1694 sp.mu.Lock() 1695 defer sp.mu.Unlock() 1696 if sp.numOpened > 0 { 1697 return fmt.Errorf("session pool still contains more than 0 sessions") 1698 } 1699 return nil 1700 }) 1701 sh5 := takeSession(ctx, t, sp) 1702 sh6 := takeSession(ctx, t, sp) 1703 // Assure that these sessions are new sessions. 1704 if gotSessions[sh5.getID()] || gotSessions[sh6.getID()] { 1705 t.Fatal("got unexpected existing session from pool") 1706 } 1707} 1708 1709func takeSession(ctx context.Context, t *testing.T, sp *sessionPool) *sessionHandle { 1710 sh, err := sp.take(ctx) 1711 if err != nil { 1712 t.Fatalf("cannot get session from session pool: %v", err) 1713 } 1714 return sh 1715} 1716 1717func TestMaintenanceWindow_CycleAndUpdateMaxCheckedOut(t *testing.T) { 1718 t.Parallel() 1719 1720 maxOpened := uint64(1000) 1721 mw := newMaintenanceWindow(maxOpened) 1722 for _, m := range mw.maxSessionsCheckedOut { 1723 if m < maxOpened { 1724 t.Fatalf("Max sessions checked out mismatch.\nGot: %v\nWant: %v", m, maxOpened) 1725 } 1726 } 1727 // Do one cycle and simulate that there are currently no sessions checked 1728 // out of the pool. 1729 mw.startNewCycle(0) 1730 if g, w := mw.maxSessionsCheckedOut[0], uint64(0); g != w { 1731 t.Fatalf("Max sessions checked out mismatch.\nGot: %d\nWant: %d", g, w) 1732 } 1733 for _, m := range mw.maxSessionsCheckedOut[1:] { 1734 if m < maxOpened { 1735 t.Fatalf("Max sessions checked out mismatch.\nGot: %v\nWant: %v", m, maxOpened) 1736 } 1737 } 1738 // Check that the max checked out during the entire window is still 1739 // maxOpened. 1740 if g, w := mw.maxSessionsCheckedOutDuringWindow(), maxOpened; g != w { 1741 t.Fatalf("Max sessions checked out during window mismatch.\nGot: %d\nWant: %d", g, w) 1742 } 1743 // Update the max number checked out for the current cycle. 
1744 mw.updateMaxSessionsCheckedOutDuringWindow(uint64(10)) 1745 if g, w := mw.maxSessionsCheckedOut[0], uint64(10); g != w { 1746 t.Fatalf("Max sessions checked out mismatch.\nGot: %d\nWant: %d", g, w) 1747 } 1748 // The max of the entire window should still not change. 1749 if g, w := mw.maxSessionsCheckedOutDuringWindow(), maxOpened; g != w { 1750 t.Fatalf("Max sessions checked out during window mismatch.\nGot: %d\nWant: %d", g, w) 1751 } 1752 // Now pass enough cycles to complete a maintenance window. Each cycle has 1753 // no sessions checked out. We start at 1, as we have already passed one 1754 // cycle. This should then be the last cycle still in the maintenance 1755 // window, and the only one with a maxSessionsCheckedOut greater than 0. 1756 for i := 1; i < maintenanceWindowSize; i++ { 1757 mw.startNewCycle(0) 1758 } 1759 for _, m := range mw.maxSessionsCheckedOut[:9] { 1760 if m != 0 { 1761 t.Fatalf("Max sessions checked out mismatch.\nGot: %v\nWant: %v", m, 0) 1762 } 1763 } 1764 // The oldest cycle in the window should have max=10. 1765 if g, w := mw.maxSessionsCheckedOut[maintenanceWindowSize-1], uint64(10); g != w { 1766 t.Fatalf("Max sessions checked out mismatch.\nGot: %d\nWant: %d", g, w) 1767 } 1768 // The max of the entire window should now be 10. 1769 if g, w := mw.maxSessionsCheckedOutDuringWindow(), uint64(10); g != w { 1770 t.Fatalf("Max sessions checked out during window mismatch.\nGot: %d\nWant: %d", g, w) 1771 } 1772 // Do another cycle with max=0. 1773 mw.startNewCycle(0) 1774 // The max of the entire window should now be 0. 1775 if g, w := mw.maxSessionsCheckedOutDuringWindow(), uint64(0); g != w { 1776 t.Fatalf("Max sessions checked out during window mismatch.\nGot: %d\nWant: %d", g, w) 1777 } 1778 // Do another cycle with 5 sessions as max. This should now be the new 1779 // window max. 
1780 mw.startNewCycle(5) 1781 if g, w := mw.maxSessionsCheckedOutDuringWindow(), uint64(5); g != w { 1782 t.Fatalf("Max sessions checked out during window mismatch.\nGot: %d\nWant: %d", g, w) 1783 } 1784 // Do a couple of cycles so that the only non-zero value is in the middle. 1785 // The max for the entire window should still be 5. 1786 for i := 0; i < maintenanceWindowSize/2; i++ { 1787 mw.startNewCycle(0) 1788 } 1789 if g, w := mw.maxSessionsCheckedOutDuringWindow(), uint64(5); g != w { 1790 t.Fatalf("Max sessions checked out during window mismatch.\nGot: %d\nWant: %d", g, w) 1791 } 1792} 1793