/*
Copyright 2017 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package spanner

import (
	"container/heap"
	"container/list"
	"context"
	"fmt"
	"log"
	"math"
	"math/rand"
	"runtime/debug"
	"sync"
	"time"

	"cloud.google.com/go/internal/trace"
	"cloud.google.com/go/internal/version"
	vkit "cloud.google.com/go/spanner/apiv1"
	"go.opencensus.io/stats"
	"go.opencensus.io/tag"
	sppb "google.golang.org/genproto/googleapis/spanner/v1"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/metadata"
)

const healthCheckIntervalMins = 50

// sessionHandle is an interface for transactions to access Cloud Spanner
// sessions safely. It is generated by sessionPool.take().
type sessionHandle struct {
	// mu guarantees that the inner session object is returned / destroyed only
	// once.
	mu sync.Mutex
	// session is a pointer to a session object. Transactions never need to
	// access it directly.
	session *session
	// checkoutTime is the time the session was checked out of the pool.
	checkoutTime time.Time
	// trackedSessionHandle is the linked list node which links the session to
	// the list of tracked session handles. trackedSessionHandle is only set if
	// TrackSessionHandles has been enabled in the session pool configuration.
	trackedSessionHandle *list.Element
	// stack is the call stack of the goroutine that checked out the session
	// from the pool. This can be used to track down session leak problems.
	stack []byte
}

// recycle returns the inner session object to its home session pool. It is
// safe to call recycle multiple times; only the first call takes effect.
func (sh *sessionHandle) recycle() {
	sh.mu.Lock()
	if sh.session == nil {
		// sessionHandle has already been recycled.
		sh.mu.Unlock()
		return
	}
	p := sh.session.pool
	tracked := sh.trackedSessionHandle
	sh.session.recycle()
	sh.session = nil
	sh.trackedSessionHandle = nil
	sh.checkoutTime = time.Time{}
	sh.stack = nil
	sh.mu.Unlock()
	if tracked != nil {
		p.mu.Lock()
		p.trackedSessionHandles.Remove(tracked)
		p.mu.Unlock()
	}
}

// getID returns the Cloud Spanner session ID from the inner session object.
// getID returns an empty string if the sessionHandle is nil or the inner
// session object has been released by recycle / destroy.
func (sh *sessionHandle) getID() string {
	sh.mu.Lock()
	defer sh.mu.Unlock()
	if sh.session == nil {
		// sessionHandle has already been recycled/destroyed.
		return ""
	}
	return sh.session.getID()
}

// getClient returns the Cloud Spanner RPC client associated with the session
// ID in sessionHandle.
func (sh *sessionHandle) getClient() *vkit.Client {
	sh.mu.Lock()
	defer sh.mu.Unlock()
	if sh.session == nil {
		return nil
	}
	return sh.session.client
}
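// Editor's note: the sketch below is illustrative and not part of the
// original file. It shows that releasing a handle is idempotent: only the
// first call returns the inner session, later calls are no-ops.
func exampleIdempotentRelease(sh *sessionHandle) {
	sh.recycle()
	sh.recycle() // safe: the inner session was already returned above
}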
// getMetadata returns the metadata associated with the session in sessionHandle.
func (sh *sessionHandle) getMetadata() metadata.MD {
	sh.mu.Lock()
	defer sh.mu.Unlock()
	if sh.session == nil {
		return nil
	}
	return sh.session.md
}

// getTransactionID returns the transaction id in the session if available.
func (sh *sessionHandle) getTransactionID() transactionID {
	sh.mu.Lock()
	defer sh.mu.Unlock()
	if sh.session == nil {
		return nil
	}
	return sh.session.tx
}

// destroy destroys the inner session object. It is safe to call destroy
// multiple times; only the first call attempts to destroy the inner session
// object.
func (sh *sessionHandle) destroy() {
	sh.mu.Lock()
	s := sh.session
	if s == nil {
		// sessionHandle has already been recycled.
		sh.mu.Unlock()
		return
	}
	tracked := sh.trackedSessionHandle
	sh.session = nil
	sh.trackedSessionHandle = nil
	sh.checkoutTime = time.Time{}
	sh.stack = nil
	sh.mu.Unlock()

	if tracked != nil {
		p := s.pool
		p.mu.Lock()
		p.trackedSessionHandles.Remove(tracked)
		p.mu.Unlock()
	}
	s.destroy(false)
}

// session wraps a Cloud Spanner session ID through which transactions are
// created and executed.
type session struct {
	// client is the RPC channel to Cloud Spanner. It is set only once during
	// the session's creation.
	client *vkit.Client
	// id is the unique id of the session in Cloud Spanner. It is set only once
	// during the session's creation.
	id string
	// pool is the session's home session pool where it was created. It is set
	// only once during the session's creation.
	pool *sessionPool
	// createTime is the timestamp of the session's creation. It is set only
	// once during the session's creation.
	createTime time.Time
	// logger is the logger configured for the Spanner client that created the
	// session. If nil, logging will be directed to the standard logger.
	logger *log.Logger

	// mu protects the following fields from concurrent access: both
	// healthcheck workers and transactions can modify them.
	mu sync.Mutex
	// valid marks the validity of a session.
	valid bool
	// hcIndex is the index of the session inside the global healthcheck queue.
	// If hcIndex < 0, the session has been unregistered from the queue.
	hcIndex int
	// idleList is the linked list node which links the session to its home
	// session pool's idle list. If idleList == nil, the session is not in the
	// idle list.
	idleList *list.Element
	// nextCheck is the timestamp of the next scheduled healthcheck of the
	// session. It is maintained by the global health checker.
	nextCheck time.Time
	// checkingHealth is true if the session is currently being processed by
	// the health checker. Must be modified under the health checker lock.
	checkingHealth bool
	// md is the metadata to be sent with each request.
	md metadata.MD
	// tx contains the transaction id if the session has been prepared for
	// write.
	tx transactionID
	// firstHCDone indicates whether the first health check is done or not.
	firstHCDone bool
}

// isValid returns true if the session is still valid for use.
func (s *session) isValid() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.valid
}
// isWritePrepared returns true if the session is prepared for write.
func (s *session) isWritePrepared() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.tx != nil
}

// String implements fmt.Stringer for session.
func (s *session) String() string {
	s.mu.Lock()
	defer s.mu.Unlock()
	return fmt.Sprintf("<id=%v, hcIdx=%v, idleList=%p, valid=%v, create=%v, nextcheck=%v>",
		s.id, s.hcIndex, s.idleList, s.valid, s.createTime, s.nextCheck)
}

// ping verifies if the session is still alive in Cloud Spanner.
func (s *session) ping() error {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	// s.getID is safe even when s is invalid.
	_, err := s.client.GetSession(contextWithOutgoingMetadata(ctx, s.md), &sppb.GetSessionRequest{Name: s.getID()})
	return err
}

// setHcIndex atomically sets the session's index in the healthcheck queue and
// returns the old index.
func (s *session) setHcIndex(i int) int {
	s.mu.Lock()
	defer s.mu.Unlock()
	oi := s.hcIndex
	s.hcIndex = i
	return oi
}

// setIdleList atomically sets the session's idle list link and returns the old
// link.
func (s *session) setIdleList(le *list.Element) *list.Element {
	s.mu.Lock()
	defer s.mu.Unlock()
	old := s.idleList
	s.idleList = le
	return old
}

// invalidate marks a session as invalid and returns the old validity.
func (s *session) invalidate() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	ov := s.valid
	s.valid = false
	return ov
}

// setNextCheck sets the timestamp for the next healthcheck on the session.
func (s *session) setNextCheck(t time.Time) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.nextCheck = t
}

// setTransactionID sets the transaction id in the session.
func (s *session) setTransactionID(tx transactionID) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.tx = tx
}

// getID returns the session ID which uniquely identifies the session in Cloud
// Spanner.
func (s *session) getID() string {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.id
}

// getHcIndex returns the session's index into the global healthcheck priority
// queue.
func (s *session) getHcIndex() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.hcIndex
}

// getIdleList returns the session's link in its home session pool's idle list.
func (s *session) getIdleList() *list.Element {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.idleList
}

// getNextCheck returns the timestamp for the next healthcheck on the session.
func (s *session) getNextCheck() time.Time {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.nextCheck
}

// recycle returns the session to its home session pool.
func (s *session) recycle() {
	s.setTransactionID(nil)
	if !s.pool.recycle(s) {
		// s is rejected by its home session pool because it expired and the
		// session pool currently has enough open sessions.
		s.destroy(false)
	}
}
// destroy removes the session from its home session pool, the healthcheck
// queue, and the Cloud Spanner service.
func (s *session) destroy(isExpire bool) bool {
	// Remove s from the session pool.
	if !s.pool.remove(s, isExpire) {
		return false
	}
	// Unregister s from the healthcheck queue.
	s.pool.hc.unregister(s)
	// Remove s from the Cloud Spanner service.
	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
	defer cancel()
	s.delete(ctx)
	return true
}

func (s *session) delete(ctx context.Context) {
	// Ignore the error because even if we fail to explicitly destroy the
	// session, it will eventually be garbage collected by Cloud Spanner.
	err := s.client.DeleteSession(ctx, &sppb.DeleteSessionRequest{Name: s.getID()})
	if err != nil {
		logf(s.logger, "Failed to delete session %v. Error: %v", s.getID(), err)
	}
}

// prepareForWrite prepares the session for write if it is not already in that
// state.
func (s *session) prepareForWrite(ctx context.Context) error {
	if s.isWritePrepared() {
		return nil
	}
	tx, err := beginTransaction(contextWithOutgoingMetadata(ctx, s.md), s.getID(), s.client)
	// Session not found should cause the session to be removed from the pool.
	if isSessionNotFoundError(err) {
		s.pool.remove(s, false)
		s.pool.hc.unregister(s)
		return err
	}
	// Enable/disable background preparing of write sessions depending on
	// whether the BeginTransaction call succeeded. This prevents the session
	// pool workers from going into an infinite loop of trying to prepare
	// sessions. Any subsequent successful BeginTransaction call, for example
	// from takeWriteSession, will re-enable the background process.
	s.pool.mu.Lock()
	s.pool.disableBackgroundPrepareSessions = err != nil
	s.pool.mu.Unlock()
	if err != nil {
		return err
	}
	s.setTransactionID(tx)
	return nil
}
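// Editor's note: the sketch below is illustrative and not part of the
// original file. It prepares a session for a read/write transaction with an
// explicit deadline, mirroring what the background workers do.
func examplePrepareForWrite(s *session) error {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()
	if s.isWritePrepared() {
		return nil // already holds a prepared transaction
	}
	return s.prepareForWrite(ctx)
}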
// SessionPoolConfig stores configurations of a session pool.
type SessionPoolConfig struct {
	// MaxOpened is the maximum number of opened sessions allowed by the
	// session pool. If the client tries to open a session and there are
	// already MaxOpened sessions, it will block until one becomes available or
	// the context passed to the client method is canceled or times out.
	//
	// Defaults to NumChannels * 100.
	MaxOpened uint64

	// MinOpened is the minimum number of opened sessions that the session pool
	// tries to maintain. The session pool won't continue to expire sessions if
	// the number of opened connections drops below MinOpened. However, if a
	// session is found to be broken, it will still be evicted from the session
	// pool, so it is possible that the number of opened sessions drops below
	// MinOpened.
	//
	// Defaults to 100.
	MinOpened uint64

	// MaxIdle is the maximum number of idle sessions that the pool is allowed
	// to keep.
	//
	// Defaults to 0.
	MaxIdle uint64

	// MaxBurst is the maximum number of concurrent session creation requests.
	//
	// Defaults to 10.
	MaxBurst uint64

	// WriteSessions is the fraction of sessions we try to keep prepared for
	// write.
	//
	// Defaults to 0.2.
	WriteSessions float64

	// HealthCheckWorkers is the number of workers used by the health checker
	// for this pool.
	//
	// Defaults to 10.
	HealthCheckWorkers int

	// HealthCheckInterval is how often the health checker pings a session.
	//
	// Defaults to 50m.
	HealthCheckInterval time.Duration

	// TrackSessionHandles determines whether the session pool will keep track
	// of the stacktrace of the goroutines that take sessions from the pool.
	// This setting can be used to track down session leak problems.
	//
	// Defaults to false.
	TrackSessionHandles bool

	// healthCheckSampleInterval is how often the health checker samples live
	// sessions (for use in maintaining the session pool size).
	//
	// Defaults to 1m.
	healthCheckSampleInterval time.Duration

	// sessionLabels for the sessions created in the session pool.
	sessionLabels map[string]string
}

// DefaultSessionPoolConfig is the default configuration for the session pool
// that will be used for a Spanner client, unless the user supplies a specific
// session pool config.
var DefaultSessionPoolConfig = SessionPoolConfig{
	MinOpened:           100,
	MaxOpened:           numChannels * 100,
	MaxBurst:            10,
	WriteSessions:       0.2,
	HealthCheckWorkers:  10,
	HealthCheckInterval: healthCheckIntervalMins * time.Minute,
}

// errMinOpenedGTMaxOpened returns an error for
// SessionPoolConfig.MaxOpened < SessionPoolConfig.MinOpened when
// SessionPoolConfig.MaxOpened is set.
func errMinOpenedGTMaxOpened(maxOpened, minOpened uint64) error {
	return spannerErrorf(codes.InvalidArgument,
		"require SessionPoolConfig.MaxOpened >= SessionPoolConfig.MinOpened, got %d and %d", maxOpened, minOpened)
}

// errWriteFractionOutOfRange returns an error for
// SessionPoolConfig.WriteSessions < 0 or SessionPoolConfig.WriteSessions > 1.
func errWriteFractionOutOfRange(writeFraction float64) error {
	return spannerErrorf(codes.InvalidArgument,
		"require SessionPoolConfig.WriteSessions >= 0.0 && SessionPoolConfig.WriteSessions <= 1.0, got %.2f", writeFraction)
}

// errHealthCheckWorkersNegative returns an error for
// SessionPoolConfig.HealthCheckWorkers < 0.
func errHealthCheckWorkersNegative(workers int) error {
	return spannerErrorf(codes.InvalidArgument,
		"require SessionPoolConfig.HealthCheckWorkers >= 0, got %d", workers)
}

// errHealthCheckIntervalNegative returns an error for
// SessionPoolConfig.HealthCheckInterval < 0.
func errHealthCheckIntervalNegative(interval time.Duration) error {
	return spannerErrorf(codes.InvalidArgument,
		"require SessionPoolConfig.HealthCheckInterval >= 0, got %v", interval)
}

// validate verifies that the SessionPoolConfig is good for use.
func (spc *SessionPoolConfig) validate() error {
	if spc.MinOpened > spc.MaxOpened && spc.MaxOpened > 0 {
		return errMinOpenedGTMaxOpened(spc.MaxOpened, spc.MinOpened)
	}
	if spc.WriteSessions < 0.0 || spc.WriteSessions > 1.0 {
		return errWriteFractionOutOfRange(spc.WriteSessions)
	}
	if spc.HealthCheckWorkers < 0 {
		return errHealthCheckWorkersNegative(spc.HealthCheckWorkers)
	}
	if spc.HealthCheckInterval < 0 {
		return errHealthCheckIntervalNegative(spc.HealthCheckInterval)
	}
	return nil
}
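// Editor's note: the sketch below is illustrative and not part of the
// original file. It builds a custom pool configuration and validates it; the
// numbers are arbitrary examples.
func exampleSessionPoolConfig() error {
	cfg := SessionPoolConfig{
		MinOpened:           100,
		MaxOpened:           400,
		MaxIdle:             50,
		MaxBurst:            10,
		WriteSessions:       0.2,
		HealthCheckWorkers:  10,
		HealthCheckInterval: healthCheckIntervalMins * time.Minute,
	}
	// validate rejects MinOpened > MaxOpened, write fractions outside [0, 1],
	// and negative worker counts or intervals.
	return cfg.validate()
}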
// sessionPool creates and caches Cloud Spanner sessions.
type sessionPool struct {
	// mu protects sessionPool from concurrent access.
	mu sync.Mutex
	// valid marks the validity of the session pool.
	valid bool
	// sc is used to create the sessions for the pool.
	sc *sessionClient
	// trackedSessionHandles contains all session handles that have been
	// checked out of the pool. The list is only filled if TrackSessionHandles
	// has been enabled.
	trackedSessionHandles list.List
	// idleList caches idle session IDs. Session IDs in this list can be
	// allocated for use.
	idleList list.List
	// idleWriteList caches idle sessions which have been prepared for write.
	idleWriteList list.List
	// mayGetSession is for broadcasting that session retrieval/creation may
	// proceed.
	mayGetSession chan struct{}
	// numOpened is the total number of open sessions from the session pool.
	numOpened uint64
	// createReqs is the number of ongoing session creation requests.
	createReqs uint64
	// prepareReqs is the number of ongoing session preparation requests.
	prepareReqs uint64
	// disableBackgroundPrepareSessions indicates that the BeginTransaction
	// call for a read/write transaction failed with a permanent error, such as
	// PermissionDenied or `Database not found`. Further background calls to
	// prepare sessions will be disabled.
	disableBackgroundPrepareSessions bool
	// configuration of the session pool.
	SessionPoolConfig
	// hc is the health checker.
	hc *healthChecker
	// rand is a separately sourced random generator.
	rand *rand.Rand
	// numInUse is the number of sessions that are currently in use (checked
	// out from the session pool).
	numInUse uint64
	// maxNumInUse is the maximum number of sessions in use concurrently in the
	// current 10 minute interval.
	maxNumInUse uint64
	// lastResetTime is the start time of the window for recording maxNumInUse.
	lastResetTime time.Time

	// mw is the maintenance window containing statistics for the max number of
	// sessions checked out of the pool during the last 10 minutes.
	mw *maintenanceWindow

	// tagMap is a map of all tags that are associated with the emitted metrics.
	tagMap *tag.Map
}
// newSessionPool creates a new session pool.
func newSessionPool(sc *sessionClient, config SessionPoolConfig) (*sessionPool, error) {
	if err := config.validate(); err != nil {
		return nil, err
	}
	pool := &sessionPool{
		sc:                sc,
		valid:             true,
		mayGetSession:     make(chan struct{}),
		SessionPoolConfig: config,
		mw:                newMaintenanceWindow(config.MaxOpened),
		rand:              rand.New(rand.NewSource(time.Now().UnixNano())),
	}
	if config.HealthCheckWorkers == 0 {
		// With 10 workers and assuming an average latency of 5ms for
		// BeginTransaction, we will be able to prepare 2000 tx/sec in advance.
		// If the rate of takeWriteSession is higher than that, it will degrade
		// to doing BeginTransaction inline.
		//
		// TODO: consider resizing the worker pool dynamically according to the load.
		config.HealthCheckWorkers = 10
	}
	if config.HealthCheckInterval == 0 {
		config.HealthCheckInterval = healthCheckIntervalMins * time.Minute
	}
	if config.healthCheckSampleInterval == 0 {
		config.healthCheckSampleInterval = time.Minute
	}

	_, instance, database, err := parseDatabaseName(sc.database)
	if err != nil {
		return nil, err
	}
	// Errors should not prevent initializing the session pool.
	ctx, err := tag.New(context.Background(),
		tag.Upsert(tagClientID, sc.id),
		tag.Upsert(tagDatabase, database),
		tag.Upsert(tagInstance, instance),
		tag.Upsert(tagLibVersion, version.Repo),
	)
	if err != nil {
		logf(pool.sc.logger, "Failed to create tag map, error: %v", err)
	}
	pool.tagMap = tag.FromContext(ctx)

	// On a GCE VM within the same region, a healthcheck ping takes on average
	// 10ms to finish. Given a 5 minute interval and 10 healthcheck workers, a
	// healthChecker can effectively maintain
	// 100 checks_per_worker/sec * 10 workers * 300 seconds = 300K sessions.
	pool.hc = newHealthChecker(config.HealthCheckInterval, config.HealthCheckWorkers, config.healthCheckSampleInterval, pool)

	// First initialize the pool before we indicate that the healthchecker is
	// ready. This prevents the maintainer from starting before the pool has
	// been initialized, which means that we guarantee that the initial
	// sessions are created using BatchCreateSessions.
	if config.MinOpened > 0 {
		numSessions := minUint64(config.MinOpened, math.MaxInt32)
		if err := pool.initPool(int32(numSessions)); err != nil {
			return nil, err
		}
	}
	pool.recordStat(context.Background(), MaxAllowedSessionsCount, int64(config.MaxOpened))
	close(pool.hc.ready)
	return pool, nil
}

func (p *sessionPool) recordStat(ctx context.Context, m *stats.Int64Measure, n int64) {
	ctx = tag.NewContext(ctx, p.tagMap)
	recordStat(ctx, m, n)
}

func (p *sessionPool) initPool(numSessions int32) error {
	p.mu.Lock()
	// Take budget before the actual session creation.
	p.numOpened += uint64(numSessions)
	p.recordStat(context.Background(), OpenSessionCount, int64(p.numOpened))
	p.createReqs += uint64(numSessions)
	p.mu.Unlock()
	// Asynchronously create the initial sessions for the pool.
	return p.sc.batchCreateSessions(numSessions, p)
}

// sessionReady is executed by the SessionClient when a session has been
// created and is ready to use. This method adds the new session to the pool
// and decreases the number of sessions that are being created.
func (p *sessionPool) sessionReady(s *session) {
	p.mu.Lock()
	defer p.mu.Unlock()
	// Set this pool as the home pool of the session and register it with the
	// health checker.
	s.pool = p
	p.hc.register(s)
	p.createReqs--
	// Insert the session at a random position in the pool to prevent all
	// sessions affiliated with a channel from being placed sequentially in
	// the pool.
	if p.idleList.Len() > 0 {
		pos := rand.Intn(p.idleList.Len())
		before := p.idleList.Front()
		for i := 0; i < pos; i++ {
			before = before.Next()
		}
		s.setIdleList(p.idleList.InsertBefore(s, before))
	} else {
		s.setIdleList(p.idleList.PushBack(s))
	}
	// Notify other waiters blocking on session creation.
	close(p.mayGetSession)
	p.mayGetSession = make(chan struct{})
}

// sessionCreationFailed is called by the SessionClient when the creation of one
// or more requested sessions finished with an error. sessionCreationFailed will
// decrease the number of sessions being created and notify any waiters that
// the session creation failed.
func (p *sessionPool) sessionCreationFailed(err error, numSessions int32) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.createReqs -= uint64(numSessions)
	p.numOpened -= uint64(numSessions)
	p.recordStat(context.Background(), OpenSessionCount, int64(p.numOpened))
	// Notify other waiters blocking on session creation.
	close(p.mayGetSession)
	p.mayGetSession = make(chan struct{})
}

// isValid checks if the session pool is still valid.
func (p *sessionPool) isValid() bool {
	if p == nil {
		return false
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.valid
}
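// Editor's note: the sketch below is illustrative and not part of the
// original file. It creates a pool from the default configuration; sc is
// assumed to be an already initialized *sessionClient.
func exampleNewPool(sc *sessionClient) (*sessionPool, error) {
	cfg := DefaultSessionPoolConfig
	cfg.MaxIdle = 10 // keep a few idle sessions around after load drops
	return newSessionPool(sc, cfg)
}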
// close marks the session pool as closed.
func (p *sessionPool) close() {
	if p == nil {
		return
	}
	p.mu.Lock()
	if !p.valid {
		p.mu.Unlock()
		return
	}
	p.valid = false
	p.mu.Unlock()
	p.hc.close()
	// Destroy all the sessions.
	p.hc.mu.Lock()
	allSessions := make([]*session, len(p.hc.queue.sessions))
	copy(allSessions, p.hc.queue.sessions)
	p.hc.mu.Unlock()
	for _, s := range allSessions {
		s.destroy(false)
	}
}

// errInvalidSessionPool is the error for using an invalid session pool.
var errInvalidSessionPool = spannerErrorf(codes.InvalidArgument, "invalid session pool")

// errGetSessionTimeout returns error for context timeout during
// sessionPool.take().
var errGetSessionTimeout = spannerErrorf(codes.Canceled, "timeout / context canceled during getting session")

// newSessionHandle creates a new session handle for the given session for this
// session pool. The session handle will also hold a copy of the current call
// stack if the session pool has been configured to track the call stacks of
// sessions being checked out of the pool.
func (p *sessionPool) newSessionHandle(s *session) (sh *sessionHandle) {
	sh = &sessionHandle{session: s, checkoutTime: time.Now()}
	if p.TrackSessionHandles {
		p.mu.Lock()
		sh.trackedSessionHandle = p.trackedSessionHandles.PushBack(sh)
		p.mu.Unlock()
		sh.stack = debug.Stack()
	}
	return sh
}

// errGetSessionTimeout returns the error for a context timeout during
// sessionPool.take().
func (p *sessionPool) errGetSessionTimeout() error {
	if p.TrackSessionHandles {
		return p.errGetSessionTimeoutWithTrackedSessionHandles()
	}
	return p.errGetBasicSessionTimeout()
}

// errGetBasicSessionTimeout returns the error for a context timeout during
// sessionPool.take() without any tracked sessionHandles.
func (p *sessionPool) errGetBasicSessionTimeout() error {
	return spannerErrorf(codes.Canceled, "timeout / context canceled during getting session.\n"+
		"Enable SessionPoolConfig.TrackSessionHandles if you suspect a session leak to get more information about the checked out sessions.")
}

// errGetSessionTimeoutWithTrackedSessionHandles returns the error for a
// context timeout during sessionPool.take(), including a stacktrace of each
// checked out session handle.
func (p *sessionPool) errGetSessionTimeoutWithTrackedSessionHandles() error {
	err := spannerErrorf(codes.Canceled, "timeout / context canceled during getting session.")
	err.(*Error).additionalInformation = p.getTrackedSessionHandleStacksLocked()
	return err
}

// getTrackedSessionHandleStacksLocked returns a string containing the
// stacktrace of all currently checked out sessions of the pool. This method
// requires the caller to have locked p.mu.
func (p *sessionPool) getTrackedSessionHandleStacksLocked() string {
	p.mu.Lock()
	defer p.mu.Unlock()
	stackTraces := ""
	i := 1
	element := p.trackedSessionHandles.Front()
	for element != nil {
		sh := element.Value.(*sessionHandle)
		sh.mu.Lock()
		if sh.stack != nil {
			stackTraces = fmt.Sprintf("%s\n\nSession %d checked out of pool at %s by goroutine:\n%s", stackTraces, i, sh.checkoutTime.Format(time.RFC3339), sh.stack)
		}
		sh.mu.Unlock()
		element = element.Next()
		i++
	}
	return stackTraces
}
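// Editor's note: the sketch below is illustrative and not part of the
// original file. Enabling TrackSessionHandles makes a pool-exhaustion error
// carry the checkout stacktraces gathered above, which helps find leaks.
func exampleLeakDebuggingConfig() SessionPoolConfig {
	cfg := DefaultSessionPoolConfig
	cfg.TrackSessionHandles = true
	return cfg
}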
// shouldPrepareWriteLocked returns true if we should prepare more sessions for
// write.
func (p *sessionPool) shouldPrepareWriteLocked() bool {
	return !p.disableBackgroundPrepareSessions && float64(p.numOpened)*p.WriteSessions > float64(p.idleWriteList.Len()+int(p.prepareReqs))
}

func (p *sessionPool) createSession(ctx context.Context) (*session, error) {
	trace.TracePrintf(ctx, nil, "Creating a new session")
	doneCreate := func(done bool) {
		p.mu.Lock()
		if !done {
			// Session creation failed, give budget back.
			p.numOpened--
			p.recordStat(ctx, OpenSessionCount, int64(p.numOpened))
		}
		p.createReqs--
		// Notify other waiters blocking on session creation.
		close(p.mayGetSession)
		p.mayGetSession = make(chan struct{})
		p.mu.Unlock()
	}
	s, err := p.sc.createSession(ctx)
	if err != nil {
		doneCreate(false)
		// Should return the error directly because of the previous retries on
		// the CreateSession RPC.
		// If the error is a timeout, there is a chance that the session was
		// created on the server but is not known to the session pool. This
		// session will then be garbage collected by the server after 1 hour.
		return nil, err
	}
	s.pool = p
	p.hc.register(s)
	doneCreate(true)
	return s, nil
}

func (p *sessionPool) isHealthy(s *session) bool {
	if s.getNextCheck().Add(2 * p.hc.getInterval()).Before(time.Now()) {
		// TODO: figure out if we need to schedule a new healthcheck worker here.
		if err := s.ping(); isSessionNotFoundError(err) {
			// The session is already bad, continue to fetch/create a new one.
			s.destroy(false)
			return false
		}
		p.hc.scheduledHC(s)
	}
	return true
}
// take returns a cached session if there are available ones; if there isn't
// any, it tries to allocate a new one. Sessions returned by take should be
// used for read operations.
func (p *sessionPool) take(ctx context.Context) (*sessionHandle, error) {
	trace.TracePrintf(ctx, nil, "Acquiring a read-only session")
	for {
		var (
			s   *session
			err error
		)

		p.mu.Lock()
		if !p.valid {
			p.mu.Unlock()
			return nil, errInvalidSessionPool
		}
		if p.idleList.Len() > 0 {
			// Idle sessions are available, get one from the top of the idle
			// list.
			s = p.idleList.Remove(p.idleList.Front()).(*session)
			trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
				"Acquired read-only session")
		} else if p.idleWriteList.Len() > 0 {
			s = p.idleWriteList.Remove(p.idleWriteList.Front()).(*session)
			trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
				"Acquired read-write session")
		}
		if s != nil {
			s.setIdleList(nil)
			numCheckedOut := p.currSessionsCheckedOutLocked()
			p.mu.Unlock()
			p.mw.updateMaxSessionsCheckedOutDuringWindow(numCheckedOut)
			// From here, the session is no longer in the idle list, so
			// healthcheck workers won't destroy it. If the healthcheck workers
			// failed to schedule a healthcheck for the session in time, do the
			// check here. Because a session check is still much cheaper than
			// session creation, sessions should be reused as much as possible.
			if !p.isHealthy(s) {
				continue
			}
			p.incNumInUse(ctx)
			return p.newSessionHandle(s), nil
		}

		// The idle list is empty. Block if the session pool has reached its
		// max session creation concurrency or max number of open sessions.
		if (p.MaxOpened > 0 && p.numOpened >= p.MaxOpened) || (p.MaxBurst > 0 && p.createReqs >= p.MaxBurst) {
			mayGetSession := p.mayGetSession
			p.mu.Unlock()
			trace.TracePrintf(ctx, nil, "Waiting for read-only session to become available")
			select {
			case <-ctx.Done():
				trace.TracePrintf(ctx, nil, "Context done waiting for session")
				p.recordStat(ctx, GetSessionTimeoutsCount, 1)
				return nil, p.errGetSessionTimeout()
			case <-mayGetSession:
			}
			continue
		}

		// Take budget before the actual session creation.
		p.numOpened++
		// Creating a new session that will be returned directly to the client
		// means that the max number of sessions in use also increases.
		numCheckedOut := p.currSessionsCheckedOutLocked()
		p.recordStat(ctx, OpenSessionCount, int64(p.numOpened))
		p.createReqs++
		p.mu.Unlock()
		p.mw.updateMaxSessionsCheckedOutDuringWindow(numCheckedOut)
		if s, err = p.createSession(ctx); err != nil {
			trace.TracePrintf(ctx, nil, "Error creating session: %v", err)
			return nil, toSpannerError(err)
		}
		trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
			"Created session")
		p.incNumInUse(ctx)
		return p.newSessionHandle(s), nil
	}
}
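// Editor's note: the sketch below is illustrative and not part of the
// original file. It shows the checkout/use/return cycle for a read session.
func exampleReadCheckout(ctx context.Context, p *sessionPool) error {
	sh, err := p.take(ctx)
	if err != nil {
		// Typically an invalid-pool error or a checkout timeout.
		return err
	}
	// Return the session to the pool when done with it.
	defer sh.recycle()
	// ... issue read RPCs via sh.getClient() against sh.getID() ...
	return nil
}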
// takeWriteSession returns a write-prepared cached session if there are
// available ones; if there isn't any, it tries to allocate a new one. Sessions
// returned should be used for read write transactions.
func (p *sessionPool) takeWriteSession(ctx context.Context) (*sessionHandle, error) {
	trace.TracePrintf(ctx, nil, "Acquiring a read-write session")
	for {
		var (
			s   *session
			err error
		)

		p.mu.Lock()
		if !p.valid {
			p.mu.Unlock()
			return nil, errInvalidSessionPool
		}
		if p.idleWriteList.Len() > 0 {
			// Idle sessions are available, get one from the top of the idle
			// list.
			s = p.idleWriteList.Remove(p.idleWriteList.Front()).(*session)
			trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()}, "Acquired read-write session")
		} else if p.idleList.Len() > 0 {
			s = p.idleList.Remove(p.idleList.Front()).(*session)
			trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()}, "Acquired read-only session")
		}
		if s != nil {
			s.setIdleList(nil)
			numCheckedOut := p.currSessionsCheckedOutLocked()
			p.mu.Unlock()
			p.mw.updateMaxSessionsCheckedOutDuringWindow(numCheckedOut)
			// From here, the session is no longer in the idle list, so
			// healthcheck workers won't destroy it. If the healthcheck workers
			// failed to schedule a healthcheck for the session in time, do the
			// check here. Because a session check is still much cheaper than
			// session creation, sessions should be reused as much as possible.
			if !p.isHealthy(s) {
				continue
			}
		} else {
			// The idle list is empty. Block if the session pool has reached
			// its max session creation concurrency or max number of open
			// sessions.
			if (p.MaxOpened > 0 && p.numOpened >= p.MaxOpened) || (p.MaxBurst > 0 && p.createReqs >= p.MaxBurst) {
				mayGetSession := p.mayGetSession
				p.mu.Unlock()
				trace.TracePrintf(ctx, nil, "Waiting for read-write session to become available")
				select {
				case <-ctx.Done():
					trace.TracePrintf(ctx, nil, "Context done waiting for session")
					p.recordStat(ctx, GetSessionTimeoutsCount, 1)
					return nil, p.errGetSessionTimeout()
				case <-mayGetSession:
				}
				continue
			}

			// Take budget before the actual session creation.
			p.numOpened++
			// Creating a new session that will be returned directly to the
			// client means that the max number of sessions in use also
			// increases.
			numCheckedOut := p.currSessionsCheckedOutLocked()
			p.recordStat(ctx, OpenSessionCount, int64(p.numOpened))
			p.createReqs++
			p.mu.Unlock()
			p.mw.updateMaxSessionsCheckedOutDuringWindow(numCheckedOut)
			if s, err = p.createSession(ctx); err != nil {
				trace.TracePrintf(ctx, nil, "Error creating session: %v", err)
				return nil, toSpannerError(err)
			}
			trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
				"Created session")
		}
		if !s.isWritePrepared() {
			if err = s.prepareForWrite(ctx); err != nil {
				if isSessionNotFoundError(err) {
					s.destroy(false)
					trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
						"Session not found for write")
					return nil, toSpannerError(err)
				}

				s.recycle()
				trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
					"Error preparing session for write")
				return nil, toSpannerError(err)
			}
		}
		p.incNumInUse(ctx)
		return p.newSessionHandle(s), nil
	}
}

// recycle puts session s back into the session pool's idle list. It returns
// true if the session pool successfully recycles session s.
func (p *sessionPool) recycle(s *session) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	if !s.isValid() || !p.valid {
		// Reject the session if the session is invalid or the pool itself is
		// invalid.
		return false
	}
	// Put the session at the top of the list to be handed out in LIFO order
	// for load balancing across channels.
	if s.isWritePrepared() {
		s.setIdleList(p.idleWriteList.PushFront(s))
	} else {
		s.setIdleList(p.idleList.PushFront(s))
	}
	// Broadcast that a session has been returned to the idle list.
	close(p.mayGetSession)
	p.mayGetSession = make(chan struct{})
	p.numInUse--
	p.recordStat(context.Background(), InUseSessionsCount, int64(p.numInUse))
	p.recordStat(context.Background(), ReleasedSessionsCount, 1)
	return true
}
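// Editor's note: the sketch below is illustrative and not part of the
// original file. A read/write transaction prefers a write-prepared session;
// takeWriteSession falls back to preparing a read session inline if needed.
func exampleWriteCheckout(ctx context.Context, p *sessionPool) error {
	sh, err := p.takeWriteSession(ctx)
	if err != nil {
		return err
	}
	defer sh.recycle()
	// sh.getTransactionID() now holds the prepared transaction ID.
	_ = sh.getTransactionID()
	return nil
}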
// remove atomically removes session s from the session pool and invalidates s.
// If isExpire == true, the removal is triggered by session expiration and in
// such cases, only idle sessions can be removed.
func (p *sessionPool) remove(s *session, isExpire bool) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	if isExpire && (p.numOpened <= p.MinOpened || s.getIdleList() == nil) {
		// Don't expire the session if the session is not in the idle list (in
		// use), or if the number of open sessions would drop below
		// p.MinOpened.
		return false
	}
	ol := s.setIdleList(nil)
	// If the session is in the idle list, remove it.
	if ol != nil {
		// Remove it from whichever list it is in.
		p.idleList.Remove(ol)
		p.idleWriteList.Remove(ol)
	}
	if s.invalidate() {
		// Decrease the number of opened sessions.
		p.numOpened--
		p.recordStat(context.Background(), OpenSessionCount, int64(p.numOpened))
		// Broadcast that a session has been destroyed.
		close(p.mayGetSession)
		p.mayGetSession = make(chan struct{})
		return true
	}
	return false
}

func (p *sessionPool) currSessionsCheckedOutLocked() uint64 {
	return p.numOpened - uint64(p.idleList.Len()) - uint64(p.idleWriteList.Len())
}

func (p *sessionPool) incNumInUse(ctx context.Context) {
	p.mu.Lock()
	p.incNumInUseLocked(ctx)
	p.mu.Unlock()
}

func (p *sessionPool) incNumInUseLocked(ctx context.Context) {
	p.numInUse++
	p.recordStat(ctx, InUseSessionsCount, int64(p.numInUse))
	p.recordStat(ctx, AcquiredSessionsCount, 1)
	if p.numInUse > p.maxNumInUse {
		p.maxNumInUse = p.numInUse
		p.recordStat(ctx, MaxInUseSessionsCount, int64(p.maxNumInUse))
	}
}

// hcHeap implements heap.Interface. It is used to create the priority queue
// for session healthchecks.
type hcHeap struct {
	sessions []*session
}

// Len implements heap.Interface.Len.
func (h hcHeap) Len() int {
	return len(h.sessions)
}

// Less implements heap.Interface.Less.
func (h hcHeap) Less(i, j int) bool {
	return h.sessions[i].getNextCheck().Before(h.sessions[j].getNextCheck())
}

// Swap implements heap.Interface.Swap.
func (h hcHeap) Swap(i, j int) {
	h.sessions[i], h.sessions[j] = h.sessions[j], h.sessions[i]
	h.sessions[i].setHcIndex(i)
	h.sessions[j].setHcIndex(j)
}

// Push implements heap.Interface.Push.
func (h *hcHeap) Push(s interface{}) {
	ns := s.(*session)
	ns.setHcIndex(len(h.sessions))
	h.sessions = append(h.sessions, ns)
}

// Pop implements heap.Interface.Pop.
func (h *hcHeap) Pop() interface{} {
	old := h.sessions
	n := len(old)
	s := old[n-1]
	h.sessions = old[:n-1]
	s.setHcIndex(-1)
	return s
}

// maintenanceWindowSize specifies the number of health check cycles that
// defines a maintenance window. The maintenance window keeps track of a
// rolling set of numbers for the maximum number of checked out sessions during
// the maintenance window. This is used by the maintainer to determine the
// number of sessions to create or delete at the end of each health check
// cycle.
const maintenanceWindowSize = 10

// maintenanceWindow contains the statistics that are gathered during a health
// check maintenance window.
type maintenanceWindow struct {
	mu sync.Mutex
	// maxSessionsCheckedOut contains the maximum number of sessions that were
	// checked out of the session pool during a health check cycle. This number
	// indicates the number of sessions that were actually needed by the pool
	// to serve the load during that cycle. The values are kept as a rolling
	// set containing the values for the past 10 cycles (minutes). The
	// maintainer uses these values to determine the number of sessions to keep
	// at the end of each cycle.
	maxSessionsCheckedOut [maintenanceWindowSize]uint64
}
// maxSessionsCheckedOutDuringWindow returns the maximum number of sessions
// that were checked out during the last maintenance window of 10 cycles
// (minutes).
func (mw *maintenanceWindow) maxSessionsCheckedOutDuringWindow() uint64 {
	mw.mu.Lock()
	defer mw.mu.Unlock()
	var max uint64
	for _, cycleMax := range mw.maxSessionsCheckedOut {
		max = maxUint64(max, cycleMax)
	}
	return max
}

// updateMaxSessionsCheckedOutDuringWindow updates the maximum number of
// sessions that have been checked out of the pool during the current cycle of
// the maintenance window. A maintenance window consists of 10 maintenance
// cycles. Each cycle keeps track of the max number of sessions in use during
// that cycle. The rolling maintenance window of 10 cycles is used to determine
// the number of sessions to keep at the end of a cycle by calculating the max
// in use during the last 10 cycles.
func (mw *maintenanceWindow) updateMaxSessionsCheckedOutDuringWindow(currNumSessionsCheckedOut uint64) {
	mw.mu.Lock()
	defer mw.mu.Unlock()
	mw.maxSessionsCheckedOut[0] = maxUint64(currNumSessionsCheckedOut, mw.maxSessionsCheckedOut[0])
}

// startNewCycle starts a new health check cycle with the specified number of
// checked out sessions as its initial value.
func (mw *maintenanceWindow) startNewCycle(currNumSessionsCheckedOut uint64) {
	mw.mu.Lock()
	defer mw.mu.Unlock()
	copy(mw.maxSessionsCheckedOut[1:], mw.maxSessionsCheckedOut[:9])
	mw.maxSessionsCheckedOut[0] = currNumSessionsCheckedOut
}

// newMaintenanceWindow creates a new maintenance window with all values for
// maxSessionsCheckedOut set to maxOpened. This ensures that a complete
// maintenance window must pass before the maintainer will start to delete any
// sessions.
func newMaintenanceWindow(maxOpened uint64) *maintenanceWindow {
	mw := &maintenanceWindow{}
	// Initialize the rolling window with max values to prevent the maintainer
	// from deleting sessions before a complete window of 10 cycles has
	// finished.
	for i := 0; i < maintenanceWindowSize; i++ {
		mw.maxSessionsCheckedOut[i] = maxOpened
	}
	return mw
}

// healthChecker performs periodic healthchecks on registered sessions.
type healthChecker struct {
	// mu protects concurrent access to healthChecker.
	mu sync.Mutex
	// queue is the priority queue for session healthchecks. Sessions with
	// lower nextCheck rank higher in the queue.
	queue hcHeap
	// interval is the average interval between two healthchecks on a session.
	interval time.Duration
	// workers is the number of concurrent healthcheck workers.
	workers int
	// waitWorkers waits for all healthcheck workers to exit.
	waitWorkers sync.WaitGroup
	// pool is the underlying session pool.
	pool *sessionPool
	// sampleInterval is the interval of sampling by the maintainer.
	sampleInterval time.Duration
	// ready is used to signal that the maintainer can start running.
	ready chan struct{}
	// done is used to signal that the health checker should be closed.
	done chan struct{}
	// once is used for closing channel done only once.
	once             sync.Once
	maintainerCancel func()
}
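// Editor's note: the sketch below is illustrative and not part of the
// original file. It walks through how the rolling maintenance window reports
// the peak number of checked-out sessions.
func exampleMaintenanceWindow() uint64 {
	mw := newMaintenanceWindow(400) // every cycle starts out at MaxOpened=400
	mw.startNewCycle(25)            // shift the window; the new cycle starts at 25
	mw.updateMaxSessionsCheckedOutDuringWindow(40) // raise the current cycle's peak to 40
	// The result is still 400 here: the maintainer only starts shrinking the
	// pool once a full window of 10 cycles has passed since startup.
	return mw.maxSessionsCheckedOutDuringWindow()
}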
// newHealthChecker initializes a new instance of healthChecker.
func newHealthChecker(interval time.Duration, workers int, sampleInterval time.Duration, pool *sessionPool) *healthChecker {
	if workers <= 0 {
		workers = 1
	}
	hc := &healthChecker{
		interval:         interval,
		workers:          workers,
		pool:             pool,
		sampleInterval:   sampleInterval,
		ready:            make(chan struct{}),
		done:             make(chan struct{}),
		maintainerCancel: func() {},
	}
	hc.waitWorkers.Add(1)
	go hc.maintainer()
	for i := 1; i <= hc.workers; i++ {
		hc.waitWorkers.Add(1)
		go hc.worker(i)
	}
	return hc
}

// close closes the healthChecker and waits for all healthcheck workers to
// exit.
func (hc *healthChecker) close() {
	hc.mu.Lock()
	hc.maintainerCancel()
	hc.mu.Unlock()
	hc.once.Do(func() { close(hc.done) })
	hc.waitWorkers.Wait()
}

// isClosing checks if a healthChecker is already closing.
func (hc *healthChecker) isClosing() bool {
	select {
	case <-hc.done:
		return true
	default:
		return false
	}
}

// getInterval gets the healthcheck interval.
func (hc *healthChecker) getInterval() time.Duration {
	hc.mu.Lock()
	defer hc.mu.Unlock()
	return hc.interval
}

// scheduledHCLocked schedules the next healthcheck on session s with the
// assumption that hc.mu is being held.
func (hc *healthChecker) scheduledHCLocked(s *session) {
	var constPart, randPart float64
	if !s.firstHCDone {
		// The first check will be scheduled in a large range to make requests
		// more evenly distributed. The first healthcheck will be scheduled
		// after [interval*0.2, interval*1.1) ns.
		constPart = float64(hc.interval) * 0.2
		randPart = hc.pool.rand.Float64() * float64(hc.interval) * 0.9
		s.firstHCDone = true
	} else {
		// The next healthcheck will be scheduled after
		// [interval*0.9, interval*1.1) ns.
		constPart = float64(hc.interval) * 0.9
		randPart = hc.pool.rand.Float64() * float64(hc.interval) * 0.2
	}
	// math.Ceil makes the value at least 1 ns.
	nsFromNow := int64(math.Ceil(constPart + randPart))
	s.setNextCheck(time.Now().Add(time.Duration(nsFromNow)))
	if hi := s.getHcIndex(); hi != -1 {
		// Session is still being tracked by healthcheck workers.
		heap.Fix(&hc.queue, hi)
	}
}

// scheduledHC schedules the next healthcheck on session s. It is safe to call
// concurrently.
func (hc *healthChecker) scheduledHC(s *session) {
	hc.mu.Lock()
	defer hc.mu.Unlock()
	hc.scheduledHCLocked(s)
}

// register registers a session with the healthChecker for periodic
// healthchecks.
func (hc *healthChecker) register(s *session) {
	hc.mu.Lock()
	defer hc.mu.Unlock()
	hc.scheduledHCLocked(s)
	heap.Push(&hc.queue, s)
}

// unregister unregisters a session from the healthcheck queue.
func (hc *healthChecker) unregister(s *session) {
	hc.mu.Lock()
	defer hc.mu.Unlock()
	oi := s.setHcIndex(-1)
	if oi >= 0 {
		heap.Remove(&hc.queue, oi)
	}
}

// markDone marks that the health check for a session has been performed.
func (hc *healthChecker) markDone(s *session) {
	hc.mu.Lock()
	defer hc.mu.Unlock()
	s.checkingHealth = false
}
// healthCheck checks the health of the session and pings it if needed.
func (hc *healthChecker) healthCheck(s *session) {
	defer hc.markDone(s)
	if !s.pool.isValid() {
		// Session pool is closed, perform a garbage collection.
		s.destroy(false)
		return
	}
	if err := s.ping(); isSessionNotFoundError(err) {
		// Ping failed, destroy the session.
		s.destroy(false)
	}
}

// worker performs the healthcheck on sessions in the healthChecker's priority
// queue.
func (hc *healthChecker) worker(i int) {
	// Returns a session which we should ping to keep it alive.
	getNextForPing := func() *session {
		hc.pool.mu.Lock()
		defer hc.pool.mu.Unlock()
		hc.mu.Lock()
		defer hc.mu.Unlock()
		if hc.queue.Len() <= 0 {
			// Queue is empty.
			return nil
		}
		s := hc.queue.sessions[0]
		if s.getNextCheck().After(time.Now()) && hc.pool.valid {
			// All sessions have been checked recently.
			return nil
		}
		hc.scheduledHCLocked(s)
		if !s.checkingHealth {
			s.checkingHealth = true
			return s
		}
		return nil
	}

	// Returns a session which we should prepare for write.
	getNextForTx := func() *session {
		hc.pool.mu.Lock()
		defer hc.pool.mu.Unlock()
		if hc.pool.shouldPrepareWriteLocked() {
			if hc.pool.idleList.Len() > 0 && hc.pool.valid {
				hc.mu.Lock()
				defer hc.mu.Unlock()
				if hc.pool.idleList.Front().Value.(*session).checkingHealth {
					return nil
				}
				session := hc.pool.idleList.Remove(hc.pool.idleList.Front()).(*session)
				session.checkingHealth = true
				hc.pool.prepareReqs++
				hc.pool.incNumInUseLocked(context.Background())
				return session
			}
		}
		return nil
	}

	for {
		if hc.isClosing() {
			// Exit when the pool has been closed and all sessions have been
			// destroyed, or when the health checker has been closed.
			hc.waitWorkers.Done()
			return
		}
		ws := getNextForTx()
		if ws != nil {
			ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
			err := ws.prepareForWrite(ctx)
			cancel()
			if err != nil {
				// Skip handling the prepare error; the session can be
				// prepared in the next cycle.
				// Don't log about permission errors, which may be expected
				// (e.g. using read-only auth).
				serr := toSpannerError(err).(*Error)
				if serr.Code != codes.PermissionDenied {
					logf(hc.pool.sc.logger, "Failed to prepare session, error: %v", serr)
				}
			}
			hc.pool.recycle(ws)
			hc.pool.mu.Lock()
			hc.pool.prepareReqs--
			hc.pool.mu.Unlock()
			hc.markDone(ws)
		}
		rs := getNextForPing()
		if rs == nil {
			if ws == nil {
				// No work to be done, so sleep to avoid burning CPU.
				pause := int64(100 * time.Millisecond)
				if pause > int64(hc.interval) {
					pause = int64(hc.interval)
				}
				select {
				case <-time.After(time.Duration(rand.Int63n(pause) + pause/2)):
				case <-hc.done:
				}

			}
			continue
		}
		hc.healthCheck(rs)
	}
}
// maintainer maintains the number of sessions in the pool based on the session
// pool configuration and the current and historical number of sessions checked
// out of the pool. The maintainer will:
// 1. Ensure that the session pool contains at least MinOpened sessions.
// 2. If the current number of sessions in the pool exceeds the greatest number
//    of checked out sessions (=sessions in use) during the last 10 minutes,
//    and the delta is larger than MaxIdle, the maintainer will reduce the
//    number of sessions to maxSessionsInUseDuringWindow+MaxIdle.
func (hc *healthChecker) maintainer() {
	// Wait until the pool is ready.
	<-hc.ready

	for iteration := uint64(0); ; iteration++ {
		if hc.isClosing() {
			hc.waitWorkers.Done()
			return
		}

		hc.pool.mu.Lock()
		currSessionsOpened := hc.pool.numOpened
		maxIdle := hc.pool.MaxIdle
		minOpened := hc.pool.MinOpened

		// Reset the start time for recording the maximum number of sessions
		// in the pool.
		now := time.Now()
		if now.After(hc.pool.lastResetTime.Add(10 * time.Minute)) {
			hc.pool.maxNumInUse = hc.pool.numInUse
			hc.pool.recordStat(context.Background(), MaxInUseSessionsCount, int64(hc.pool.maxNumInUse))
			hc.pool.lastResetTime = now
		}
		hc.pool.mu.Unlock()
		// Get the maximum number of sessions in use during the current
		// maintenance window.
		maxSessionsInUseDuringWindow := hc.pool.mw.maxSessionsCheckedOutDuringWindow()
		hc.mu.Lock()
		ctx, cancel := context.WithTimeout(context.Background(), hc.sampleInterval)
		hc.maintainerCancel = cancel
		hc.mu.Unlock()

		// Grow or shrink the pool if needed.
		// The number of sessions in the pool should be in the range
		// [Config.MinOpened, Config.MaxIdle+maxSessionsInUseDuringWindow].
		if currSessionsOpened < minOpened {
			hc.growPool(ctx, minOpened)
		} else if maxIdle+maxSessionsInUseDuringWindow < currSessionsOpened {
			hc.shrinkPool(ctx, maxIdle+maxSessionsInUseDuringWindow)
		}

		select {
		case <-ctx.Done():
		case <-hc.done:
			cancel()
		}
		// Cycle the maintenance window. This will remove the oldest cycle and
		// add a new cycle at the beginning of the maintenance window with the
		// currently checked out number of sessions as the max number of
		// sessions in use in this cycle. This value will be increased during
		// the next cycle if it increases.
		hc.pool.mu.Lock()
		currSessionsInUse := hc.pool.currSessionsCheckedOutLocked()
		hc.pool.mu.Unlock()
		hc.pool.mw.startNewCycle(currSessionsInUse)
	}
}
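// Editor's note: the sketch below is illustrative and not part of the
// original file. It spells out the sizing rule the maintainer applies each
// cycle: keep numOpened within [MinOpened, MaxIdle+windowPeak].
func exampleMaintainerTarget(minOpened, maxIdle, windowPeak, numOpened uint64) uint64 {
	if numOpened < minOpened {
		return minOpened // grow the pool
	}
	if numOpened > maxIdle+windowPeak {
		return maxIdle + windowPeak // shrink the pool
	}
	return numOpened // pool size is already within the target range
}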
// growPool grows the number of sessions in the pool to the specified number of
// sessions. It times out after sampleInterval.
func (hc *healthChecker) growPool(ctx context.Context, growToNumSessions uint64) {
	// Calculate the max number of sessions to create as a safeguard against
	// other processes that could be deleting sessions concurrently.
	hc.pool.mu.Lock()
	maxSessionsToCreate := int(growToNumSessions - hc.pool.numOpened)
	hc.pool.mu.Unlock()
	var created int
	for {
		if ctx.Err() != nil {
			return
		}

		p := hc.pool
		p.mu.Lock()
		// Take budget before the actual session creation.
		if growToNumSessions <= p.numOpened || created >= maxSessionsToCreate {
			p.mu.Unlock()
			break
		}
		p.numOpened++
		p.recordStat(ctx, OpenSessionCount, int64(p.numOpened))
		p.createReqs++
		shouldPrepareWrite := p.shouldPrepareWriteLocked()
		p.mu.Unlock()
		var (
			s   *session
			err error
		)
		createContext, cancel := context.WithTimeout(context.Background(), time.Minute)
		if s, err = p.createSession(createContext); err != nil {
			cancel()
			logf(p.sc.logger, "Failed to create session, error: %v", toSpannerError(err))
			continue
		}
		cancel()
		created++
		if shouldPrepareWrite {
			prepareContext, cancel := context.WithTimeout(context.Background(), time.Minute)
			if err = s.prepareForWrite(prepareContext); err != nil {
				cancel()
				p.recycle(s)
				// Don't log about permission errors, which may be expected
				// (e.g. using read-only auth).
				serr := toSpannerError(err).(*Error)
				if serr.Code != codes.PermissionDenied {
					logf(p.sc.logger, "Failed to prepare session, error: %v", serr)
				}
				continue
			}
			cancel()
		}
		p.recycle(s)
	}
}

// shrinkPool scales down the session pool. The method will stop deleting
// sessions when shrinkToNumSessions sessions are left in the pool. The method
// will also stop deleting sessions if it detects that another process has
// started creating sessions for the pool again, for example through the
// take() method.
func (hc *healthChecker) shrinkPool(ctx context.Context, shrinkToNumSessions uint64) {
	hc.pool.mu.Lock()
	maxSessionsToDelete := int(hc.pool.numOpened - shrinkToNumSessions)
	hc.pool.mu.Unlock()
	var deleted int
	var prevNumOpened uint64 = math.MaxUint64
	for {
		if ctx.Err() != nil {
			return
		}

		p := hc.pool
		p.mu.Lock()
		// Check if the number of open sessions has increased. If it has, we
		// should stop deleting sessions, as the load has increased and
		// additional sessions are needed.
		if p.numOpened >= prevNumOpened {
			p.mu.Unlock()
			break
		}
		prevNumOpened = p.numOpened

		// Check both whether we have reached the target number of open
		// sessions and the number of sessions to delete, in case sessions
		// have been deleted by other methods because they expired or were
		// deemed invalid.
		if shrinkToNumSessions >= p.numOpened || deleted >= maxSessionsToDelete {
			p.mu.Unlock()
			break
		}

		var s *session
		if p.idleList.Len() > 0 {
			s = p.idleList.Front().Value.(*session)
		} else if p.idleWriteList.Len() > 0 {
			s = p.idleWriteList.Front().Value.(*session)
		}
		p.mu.Unlock()
		if s != nil {
			deleted++
			// Destroy the session as expired.
			s.destroy(true)
		} else {
			break
		}
	}
}

// maxUint64 returns the maximum of two uint64 values.
func maxUint64(a, b uint64) uint64 {
	if a > b {
		return a
	}
	return b
}

// minUint64 returns the minimum of two uint64 values.
func minUint64(a, b uint64) uint64 {
	if a > b {
		return b
	}
	return a
}

// sessionResourceType is the type name of Spanner sessions.
const sessionResourceType = "type.googleapis.com/google.spanner.v1.Session"
// isSessionNotFoundError returns true if the given error is a
// `Session not found` error.
func isSessionNotFoundError(err error) bool {
	if err == nil {
		return false
	}
	if ErrCode(err) == codes.NotFound {
		if rt, ok := extractResourceType(err); ok {
			return rt == sessionResourceType
		}
	}
	return false
}
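// Editor's note: the sketch below is illustrative and not part of the
// original file. It shows the typical reaction to a `Session not found`
// error surfaced by an RPC made on a pooled session.
func exampleHandleSessionNotFound(s *session, err error) bool {
	if isSessionNotFoundError(err) {
		// The server no longer knows this session: drop it from the pool and
		// the healthcheck queue instead of returning it for reuse.
		return s.destroy(false)
	}
	return false
}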