/*
Copyright 2017 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package spanner

import (
	"container/heap"
	"container/list"
	"context"
	"fmt"
	"log"
	"math"
	"math/rand"
	"runtime/debug"
	"strings"
	"sync"
	"time"

	"cloud.google.com/go/internal/trace"
	vkit "cloud.google.com/go/spanner/apiv1"
	sppb "google.golang.org/genproto/googleapis/spanner/v1"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/metadata"
)

// sessionHandle is an interface for transactions to access Cloud Spanner
// sessions safely. It is generated by sessionPool.take().
type sessionHandle struct {
	// mu guarantees that the inner session object is returned / destroyed only
	// once.
	mu sync.Mutex
	// session is a pointer to a session object. Transactions never need to
	// access it directly.
	session *session
	// checkoutTime is the time the session was checked out of the pool.
	checkoutTime time.Time
	// trackedSessionHandle is the linked list node which links the session to
	// the list of tracked session handles. trackedSessionHandle is only set if
	// TrackSessionHandles has been enabled in the session pool configuration.
	trackedSessionHandle *list.Element
	// stack is the call stack of the goroutine that checked out the session
	// from the pool. This can be used to track down session leak problems.
	stack []byte
}

// recycle gives the inner session object back to its home session pool. It is
// safe to call recycle multiple times, but only the first call takes effect.
func (sh *sessionHandle) recycle() {
	sh.mu.Lock()
	if sh.session == nil {
		// sessionHandle has already been recycled.
		sh.mu.Unlock()
		return
	}
	p := sh.session.pool
	tracked := sh.trackedSessionHandle
	sh.session.recycle()
	sh.session = nil
	sh.trackedSessionHandle = nil
	sh.checkoutTime = time.Time{}
	sh.stack = nil
	sh.mu.Unlock()
	if tracked != nil {
		p.mu.Lock()
		p.trackedSessionHandles.Remove(tracked)
		p.mu.Unlock()
	}
}

// getID gets the Cloud Spanner session ID from the internal session object.
// getID returns an empty string if the sessionHandle is nil or the inner
// session object has been released by recycle / destroy.
func (sh *sessionHandle) getID() string {
	sh.mu.Lock()
	defer sh.mu.Unlock()
	if sh.session == nil {
		// sessionHandle has already been recycled/destroyed.
		return ""
	}
	return sh.session.getID()
}

// getClient gets the Cloud Spanner RPC client associated with the session ID
// in sessionHandle.
func (sh *sessionHandle) getClient() *vkit.Client {
	sh.mu.Lock()
	defer sh.mu.Unlock()
	if sh.session == nil {
		return nil
	}
	return sh.session.client
}
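
// Illustrative checkout lifecycle (a sketch of assumed caller code, using
// take() which is defined further below on sessionPool):
//
//	sh, err := pool.take(ctx)
//	if err != nil {
//		return err
//	}
//	defer sh.recycle() // idempotent: safe even if destroy() ran first
//	client, id := sh.getClient(), sh.getID()
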
// getMetadata returns the metadata associated with the session in
// sessionHandle.
func (sh *sessionHandle) getMetadata() metadata.MD {
	sh.mu.Lock()
	defer sh.mu.Unlock()
	if sh.session == nil {
		return nil
	}
	return sh.session.md
}

// getTransactionID returns the transaction id in the session if available.
func (sh *sessionHandle) getTransactionID() transactionID {
	sh.mu.Lock()
	defer sh.mu.Unlock()
	if sh.session == nil {
		return nil
	}
	return sh.session.tx
}

// destroy destroys the inner session object. It is safe to call destroy
// multiple times, but only the first call attempts to destroy the inner
// session object.
func (sh *sessionHandle) destroy() {
	sh.mu.Lock()
	s := sh.session
	if s == nil {
		// sessionHandle has already been recycled.
		sh.mu.Unlock()
		return
	}
	tracked := sh.trackedSessionHandle
	sh.session = nil
	sh.trackedSessionHandle = nil
	sh.checkoutTime = time.Time{}
	sh.stack = nil
	sh.mu.Unlock()

	if tracked != nil {
		p := s.pool
		p.mu.Lock()
		p.trackedSessionHandles.Remove(tracked)
		p.mu.Unlock()
	}
	s.destroy(false)
}

// session wraps a Cloud Spanner session ID through which transactions are
// created and executed.
type session struct {
	// client is the RPC channel to Cloud Spanner. It is set only once during
	// session's creation.
	client *vkit.Client
	// id is the unique id of the session in Cloud Spanner. It is set only once
	// during session's creation.
	id string
	// pool is the session's home session pool where it was created. It is set
	// only once during session's creation.
	pool *sessionPool
	// createTime is the timestamp of the session's creation. It is set only
	// once during session's creation.
	createTime time.Time
	// logger is the logger configured for the Spanner client that created the
	// session. If nil, logging will be directed to the standard logger.
	logger *log.Logger

	// mu protects the following fields from concurrent access: both
	// healthcheck workers and transactions can modify them.
	mu sync.Mutex
	// valid marks the validity of a session.
	valid bool
	// hcIndex is the index of the session inside the global healthcheck queue.
	// If hcIndex < 0, session has been unregistered from the queue.
	hcIndex int
	// idleList is the linked list node which links the session to its home
	// session pool's idle list. If idleList == nil, the session is not in the
	// idle list.
	idleList *list.Element
	// nextCheck is the timestamp of next scheduled healthcheck of the session.
	// It is maintained by the global health checker.
	nextCheck time.Time
	// checkingHealth is true if this session is currently being processed by
	// the health checker. Must be modified under the health checker lock.
	checkingHealth bool
	// md is the Metadata to be sent with each request.
	md metadata.MD
	// tx contains the transaction id if the session has been prepared for
	// write.
	tx transactionID
}

// isValid returns true if the session is still valid for use.
func (s *session) isValid() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.valid
}

// isWritePrepared returns true if the session is prepared for write.
func (s *session) isWritePrepared() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.tx != nil
}
// String implements fmt.Stringer for session.
func (s *session) String() string {
	s.mu.Lock()
	defer s.mu.Unlock()
	return fmt.Sprintf("<id=%v, hcIdx=%v, idleList=%p, valid=%v, create=%v, nextcheck=%v>",
		s.id, s.hcIndex, s.idleList, s.valid, s.createTime, s.nextCheck)
}

// ping verifies if the session is still alive in Cloud Spanner.
func (s *session) ping() error {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	// s.getID is safe even when s is invalid.
	_, err := s.client.GetSession(contextWithOutgoingMetadata(ctx, s.md), &sppb.GetSessionRequest{Name: s.getID()})
	return err
}

// setHcIndex atomically sets the session's index in the healthcheck queue and
// returns the old index.
func (s *session) setHcIndex(i int) int {
	s.mu.Lock()
	defer s.mu.Unlock()
	oi := s.hcIndex
	s.hcIndex = i
	return oi
}

// setIdleList atomically sets the session's idle list link and returns the old
// link.
func (s *session) setIdleList(le *list.Element) *list.Element {
	s.mu.Lock()
	defer s.mu.Unlock()
	old := s.idleList
	s.idleList = le
	return old
}

// invalidate marks a session as invalid and returns the old validity.
func (s *session) invalidate() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	ov := s.valid
	s.valid = false
	return ov
}

// setNextCheck sets the timestamp for the next healthcheck on the session.
func (s *session) setNextCheck(t time.Time) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.nextCheck = t
}

// setTransactionID sets the transaction id in the session.
func (s *session) setTransactionID(tx transactionID) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.tx = tx
}

// getID returns the session ID which uniquely identifies the session in Cloud
// Spanner.
func (s *session) getID() string {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.id
}

// getHcIndex returns the session's index into the global healthcheck priority
// queue.
func (s *session) getHcIndex() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.hcIndex
}

// getIdleList returns the session's link in its home session pool's idle list.
func (s *session) getIdleList() *list.Element {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.idleList
}

// getNextCheck returns the timestamp for the next healthcheck on the session.
func (s *session) getNextCheck() time.Time {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.nextCheck
}

// recycle returns the session to its home session pool.
func (s *session) recycle() {
	s.setTransactionID(nil)
	if !s.pool.recycle(s) {
		// s is rejected by its home session pool because it expired and the
		// session pool currently has enough open sessions.
		s.destroy(false)
	}
}

// destroy removes the session from its home session pool, healthcheck queue
// and Cloud Spanner service.
func (s *session) destroy(isExpire bool) bool {
	// Remove s from session pool.
	if !s.pool.remove(s, isExpire) {
		return false
	}
	// Unregister s from healthcheck queue.
	s.pool.hc.unregister(s)
	// Remove s from Cloud Spanner service.
	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
	defer cancel()
	s.delete(ctx)
	return true
}

func (s *session) delete(ctx context.Context) {
	// Ignore the error because even if we fail to explicitly destroy the
	// session, it will be eventually garbage collected by Cloud Spanner.
	err := s.client.DeleteSession(ctx, &sppb.DeleteSessionRequest{Name: s.getID()})
	if err != nil {
		logf(s.logger, "Failed to delete session %v. Error: %v", s.getID(), err)
	}
}

// prepareForWrite prepares the session for write if it is not already in that
// state.
func (s *session) prepareForWrite(ctx context.Context) error {
	if s.isWritePrepared() {
		return nil
	}
	tx, err := beginTransaction(contextWithOutgoingMetadata(ctx, s.md), s.getID(), s.client)
	// Session not found should cause the session to be removed from the pool.
	if isSessionNotFoundError(err) {
		s.pool.remove(s, false)
		s.pool.hc.unregister(s)
		return err
	}
	// Enable/disable background preparing of write sessions depending on
	// whether the BeginTransaction call succeeded. This will prevent the
	// session pool workers from going into an infinite loop of trying to
	// prepare sessions. Any subsequent successful BeginTransaction call from,
	// for example, takeWriteSession will re-enable the background process.
	s.pool.mu.Lock()
	s.pool.disableBackgroundPrepareSessions = err != nil
	s.pool.mu.Unlock()
	if err != nil {
		return err
	}
	s.setTransactionID(tx)
	return nil
}

// SessionPoolConfig stores configurations of a session pool.
type SessionPoolConfig struct {
	// MaxOpened is the maximum number of opened sessions allowed by the
	// session pool. If the client tries to open a session and there are
	// already MaxOpened sessions, it will block until one becomes available or
	// the context passed to the client method is canceled or times out.
	//
	// Defaults to NumChannels * 100.
	MaxOpened uint64

	// MinOpened is the minimum number of opened sessions that the session pool
	// tries to maintain. The session pool won't continue to expire sessions if
	// the number of opened connections drops below MinOpened. However, if a
	// session is found to be broken, it will still be evicted from the session
	// pool, so it is possible that the number of opened sessions drops below
	// MinOpened.
	//
	// Defaults to 100.
	MinOpened uint64

	// MaxIdle is the maximum number of idle sessions that the pool is allowed
	// to keep.
	//
	// Defaults to 0.
	MaxIdle uint64

	// MaxBurst is the maximum number of concurrent session creation requests.
	//
	// Defaults to 10.
	MaxBurst uint64

	// WriteSessions is the fraction of sessions we try to keep prepared for
	// write.
	//
	// Defaults to 0.2.
	WriteSessions float64

	// HealthCheckWorkers is the number of workers used by the health checker
	// for this pool.
	//
	// Defaults to 10.
	HealthCheckWorkers int

	// HealthCheckInterval is how often the health checker pings a session.
	//
	// Defaults to 5m.
	HealthCheckInterval time.Duration

	// TrackSessionHandles determines whether the session pool will keep track
	// of the stacktrace of the goroutines that take sessions from the pool.
	// This setting can be used to track down session leak problems.
	//
	// Defaults to false.
	TrackSessionHandles bool

	// healthCheckSampleInterval is how often the health checker samples live
	// sessions (for use in maintaining session pool size).
	//
	// Defaults to 1m.
	healthCheckSampleInterval time.Duration

	// sessionLabels for the sessions created in the session pool.
	sessionLabels map[string]string
}

// DefaultSessionPoolConfig is the default configuration for the session pool
// that will be used for a Spanner client, unless the user supplies a specific
// session pool config.
var DefaultSessionPoolConfig = SessionPoolConfig{
	MinOpened:           100,
	MaxOpened:           numChannels * 100,
	MaxBurst:            10,
	WriteSessions:       0.2,
	HealthCheckWorkers:  10,
	HealthCheckInterval: 5 * time.Minute,
}

// errMinOpenedGTMaxOpened returns an error for
// SessionPoolConfig.MaxOpened < SessionPoolConfig.MinOpened when
// SessionPoolConfig.MaxOpened is set.
func errMinOpenedGTMaxOpened(maxOpened, minOpened uint64) error {
	return spannerErrorf(codes.InvalidArgument,
		"require SessionPoolConfig.MaxOpened >= SessionPoolConfig.MinOpened, got %d and %d", maxOpened, minOpened)
}

// errWriteFractionOutOfRange returns an error for
// SessionPoolConfig.WriteSessions < 0 or SessionPoolConfig.WriteSessions > 1.
func errWriteFractionOutOfRange(writeFraction float64) error {
	return spannerErrorf(codes.InvalidArgument,
		"require SessionPoolConfig.WriteSessions >= 0.0 && SessionPoolConfig.WriteSessions <= 1.0, got %.2f", writeFraction)
}

// errHealthCheckWorkersNegative returns an error for
// SessionPoolConfig.HealthCheckWorkers < 0.
func errHealthCheckWorkersNegative(workers int) error {
	return spannerErrorf(codes.InvalidArgument,
		"require SessionPoolConfig.HealthCheckWorkers >= 0, got %d", workers)
}

// errHealthCheckIntervalNegative returns an error for
// SessionPoolConfig.HealthCheckInterval < 0.
func errHealthCheckIntervalNegative(interval time.Duration) error {
	return spannerErrorf(codes.InvalidArgument,
		"require SessionPoolConfig.HealthCheckInterval >= 0, got %v", interval)
}

// validate verifies that the SessionPoolConfig is good for use.
func (spc *SessionPoolConfig) validate() error {
	if spc.MinOpened > spc.MaxOpened && spc.MaxOpened > 0 {
		return errMinOpenedGTMaxOpened(spc.MaxOpened, spc.MinOpened)
	}
	if spc.WriteSessions < 0.0 || spc.WriteSessions > 1.0 {
		return errWriteFractionOutOfRange(spc.WriteSessions)
	}
	if spc.HealthCheckWorkers < 0 {
		return errHealthCheckWorkersNegative(spc.HealthCheckWorkers)
	}
	if spc.HealthCheckInterval < 0 {
		return errHealthCheckIntervalNegative(spc.HealthCheckInterval)
	}
	return nil
}
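
// Illustrative sketch of building and validating a custom configuration (the
// values below are assumptions for the example, not recommendations):
//
//	config := SessionPoolConfig{
//		MinOpened:     25,
//		MaxOpened:     400,
//		MaxIdle:       50,
//		WriteSessions: 0.25,
//	}
//	if err := config.validate(); err != nil {
//		// e.g. MinOpened > MaxOpened yields errMinOpenedGTMaxOpened.
//	}
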
// sessionPool creates and caches Cloud Spanner sessions.
type sessionPool struct {
	// mu protects sessionPool from concurrent access.
	mu sync.Mutex
	// valid marks the validity of the session pool.
	valid bool
	// sc is used to create the sessions for the pool.
	sc *sessionClient
	// trackedSessionHandles contains all session handles that have been
	// checked out of the pool. The list is only filled if TrackSessionHandles
	// has been enabled.
	trackedSessionHandles list.List
	// idleList caches idle session IDs. Session IDs in this list can be
	// allocated for use.
	idleList list.List
	// idleWriteList caches idle sessions which have been prepared for write.
	idleWriteList list.List
	// mayGetSession is for broadcasting that session retrieval/creation may
	// proceed.
	mayGetSession chan struct{}
	// numOpened is the total number of open sessions from the session pool.
	numOpened uint64
	// createReqs is the number of ongoing session creation requests.
	createReqs uint64
	// prepareReqs is the number of ongoing session preparation requests.
	prepareReqs uint64
	// disableBackgroundPrepareSessions indicates that the BeginTransaction
	// call for a read/write transaction failed with a permanent error, such as
	// PermissionDenied or `Database not found`. Further background calls to
	// prepare sessions will be disabled.
	disableBackgroundPrepareSessions bool
	// configuration of the session pool.
	SessionPoolConfig
	// hc is the health checker.
	hc *healthChecker

	// mw is the maintenance window containing statistics for the max number of
	// sessions checked out of the pool during the last 10 minutes.
	mw *maintenanceWindow
}

// newSessionPool creates a new session pool.
func newSessionPool(sc *sessionClient, config SessionPoolConfig) (*sessionPool, error) {
	if err := config.validate(); err != nil {
		return nil, err
	}
	pool := &sessionPool{
		sc:                sc,
		valid:             true,
		mayGetSession:     make(chan struct{}),
		SessionPoolConfig: config,
		mw:                newMaintenanceWindow(config.MaxOpened),
	}
	if config.HealthCheckWorkers == 0 {
		// With 10 workers and assuming an average latency of 5ms for
		// BeginTransaction, we will be able to prepare 2000 tx/sec in advance.
		// If the rate of takeWriteSession is more than that, it will degrade
		// to doing BeginTransaction inline.
		//
		// TODO: consider resizing the worker pool dynamically according to the load.
		config.HealthCheckWorkers = 10
	}
	if config.HealthCheckInterval == 0 {
		config.HealthCheckInterval = 5 * time.Minute
	}
	if config.healthCheckSampleInterval == 0 {
		config.healthCheckSampleInterval = time.Minute
	}
	// On a GCE VM, within the same region a healthcheck ping takes on average
	// 10ms to finish. Given a 5 minute interval and 10 healthcheck workers, a
	// healthChecker can effectively maintain
	// 100 checks_per_worker/sec * 10 workers * 300 seconds = 300K sessions.
	pool.hc = newHealthChecker(config.HealthCheckInterval, config.HealthCheckWorkers, config.healthCheckSampleInterval, pool)

	// First initialize the pool before we indicate that the healthchecker is
	// ready. This prevents the maintainer from starting before the pool has
	// been initialized, which means that we guarantee that the initial
	// sessions are created using BatchCreateSessions.
	if config.MinOpened > 0 {
		numSessions := minUint64(config.MinOpened, math.MaxInt32)
		if err := pool.initPool(int32(numSessions)); err != nil {
			return nil, err
		}
	}
	close(pool.hc.ready)
	return pool, nil
}

func (p *sessionPool) initPool(numSessions int32) error {
	p.mu.Lock()
	// Take budget before the actual session creation.
	p.numOpened += uint64(numSessions)
	recordStat(context.Background(), OpenSessionCount, int64(p.numOpened))
	p.createReqs += uint64(numSessions)
	p.mu.Unlock()
	// Asynchronously create the initial sessions for the pool.
	return p.sc.batchCreateSessions(numSessions, p)
}
// sessionReady is executed by the SessionClient when a session has been
// created and is ready to use. This method will add the new session to the
// pool and decrease the number of sessions being created.
func (p *sessionPool) sessionReady(s *session) {
	p.mu.Lock()
	defer p.mu.Unlock()
	// Set this pool as the home pool of the session and register it with the
	// health checker.
	s.pool = p
	p.hc.register(s)
	p.createReqs--
	// Insert the session at a random position in the pool to prevent all
	// sessions affiliated with a channel from being placed sequentially in
	// the pool.
	if p.idleList.Len() > 0 {
		pos := rand.Intn(p.idleList.Len())
		before := p.idleList.Front()
		for i := 0; i < pos; i++ {
			before = before.Next()
		}
		s.setIdleList(p.idleList.InsertBefore(s, before))
	} else {
		s.setIdleList(p.idleList.PushBack(s))
	}
	// Notify other waiters blocking on session creation.
	close(p.mayGetSession)
	p.mayGetSession = make(chan struct{})
}

// sessionCreationFailed is called by the SessionClient when the creation of
// one or more requested sessions finished with an error.
// sessionCreationFailed will decrease the number of sessions being created and
// notify any waiters that the session creation failed.
func (p *sessionPool) sessionCreationFailed(err error, numSessions int32) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.createReqs -= uint64(numSessions)
	p.numOpened -= uint64(numSessions)
	recordStat(context.Background(), OpenSessionCount, int64(p.numOpened))
	// Notify other waiters blocking on session creation.
	close(p.mayGetSession)
	p.mayGetSession = make(chan struct{})
}

// isValid checks if the session pool is still valid.
func (p *sessionPool) isValid() bool {
	if p == nil {
		return false
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.valid
}

// close marks the session pool as closed.
func (p *sessionPool) close() {
	if p == nil {
		return
	}
	p.mu.Lock()
	if !p.valid {
		p.mu.Unlock()
		return
	}
	p.valid = false
	p.mu.Unlock()
	p.hc.close()
	// Destroy all the sessions.
	p.hc.mu.Lock()
	allSessions := make([]*session, len(p.hc.queue.sessions))
	copy(allSessions, p.hc.queue.sessions)
	p.hc.mu.Unlock()
	for _, s := range allSessions {
		s.destroy(false)
	}
}

// errInvalidSessionPool is the error for using an invalid session pool.
var errInvalidSessionPool = spannerErrorf(codes.InvalidArgument, "invalid session pool")

// errGetSessionTimeout returns error for context timeout during
// sessionPool.take().
var errGetSessionTimeout = spannerErrorf(codes.Canceled, "timeout / context canceled during getting session")

// newSessionHandle creates a new session handle for the given session for this
// session pool. The session handle will also hold a copy of the current call
// stack if the session pool has been configured to track the call stacks of
// sessions being checked out of the pool.
func (p *sessionPool) newSessionHandle(s *session) (sh *sessionHandle) {
	sh = &sessionHandle{session: s, checkoutTime: time.Now()}
	if p.TrackSessionHandles {
		p.mu.Lock()
		sh.trackedSessionHandle = p.trackedSessionHandles.PushBack(sh)
		p.mu.Unlock()
		sh.stack = debug.Stack()
	}
	return sh
}
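
// Illustrative sketch of leak tracking (assumed caller code): with
// TrackSessionHandles enabled, a handle that is never recycled keeps its
// checkout stack, and a later timed-out take() reports it via
// errGetSessionTimeoutWithTrackedSessionHandles below.
//
//	pool, _ := newSessionPool(sc, SessionPoolConfig{TrackSessionHandles: true})
//	sh, _ := pool.take(ctx)
//	_ = sh // leaked: sh.recycle() is never called
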
// errGetSessionTimeout returns an error for a context timeout during
// sessionPool.take().
func (p *sessionPool) errGetSessionTimeout() error {
	if p.TrackSessionHandles {
		return p.errGetSessionTimeoutWithTrackedSessionHandles()
	}
	return p.errGetBasicSessionTimeout()
}

// errGetBasicSessionTimeout returns an error for a context timeout during
// sessionPool.take() without any tracked sessionHandles.
func (p *sessionPool) errGetBasicSessionTimeout() error {
	return spannerErrorf(codes.Canceled, "timeout / context canceled during getting session.\n"+
		"Enable SessionPoolConfig.TrackSessionHandles if you suspect a session leak to get more information about the checked out sessions.")
}

// errGetSessionTimeoutWithTrackedSessionHandles returns an error for a context
// timeout during sessionPool.take() including a stacktrace of each checked out
// session handle.
func (p *sessionPool) errGetSessionTimeoutWithTrackedSessionHandles() error {
	err := spannerErrorf(codes.Canceled, "timeout / context canceled during getting session.")
	err.(*Error).additionalInformation = p.getTrackedSessionHandleStacksLocked()
	return err
}

// getTrackedSessionHandleStacksLocked returns a string containing the
// stacktrace of all currently checked out sessions of the pool. This method
// requires the caller to have locked p.mu.
func (p *sessionPool) getTrackedSessionHandleStacksLocked() string {
	p.mu.Lock()
	defer p.mu.Unlock()
	stackTraces := ""
	i := 1
	element := p.trackedSessionHandles.Front()
	for element != nil {
		sh := element.Value.(*sessionHandle)
		sh.mu.Lock()
		if sh.stack != nil {
			stackTraces = fmt.Sprintf("%s\n\nSession %d checked out of pool at %s by goroutine:\n%s", stackTraces, i, sh.checkoutTime.Format(time.RFC3339), sh.stack)
		}
		sh.mu.Unlock()
		element = element.Next()
		i++
	}
	return stackTraces
}

// shouldPrepareWriteLocked returns true if we should prepare more sessions for
// write.
func (p *sessionPool) shouldPrepareWriteLocked() bool {
	return !p.disableBackgroundPrepareSessions && float64(p.numOpened)*p.WriteSessions > float64(p.idleWriteList.Len()+int(p.prepareReqs))
}
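
// Worked example for shouldPrepareWriteLocked (values assumed): with
// numOpened = 400, WriteSessions = 0.2, 50 idle write-prepared sessions and
// 10 in-flight prepare requests, the target 400*0.2 = 80 exceeds 50+10 = 60,
// so the background workers should prepare more sessions.
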
func (p *sessionPool) createSession(ctx context.Context) (*session, error) {
	trace.TracePrintf(ctx, nil, "Creating a new session")
	doneCreate := func(done bool) {
		p.mu.Lock()
		if !done {
			// Session creation failed, give budget back.
			p.numOpened--
			recordStat(ctx, OpenSessionCount, int64(p.numOpened))
		}
		p.createReqs--
		// Notify other waiters blocking on session creation.
		close(p.mayGetSession)
		p.mayGetSession = make(chan struct{})
		p.mu.Unlock()
	}
	s, err := p.sc.createSession(ctx)
	if err != nil {
		doneCreate(false)
		// Should return the error directly because of the previous retries on
		// the CreateSession RPC.
		// If the error is a timeout, there is a chance that the session was
		// created on the server but is not known to the session pool. This
		// session will then be garbage collected by the server after 1 hour.
		return nil, err
	}
	s.pool = p
	p.hc.register(s)
	doneCreate(true)
	return s, nil
}

func (p *sessionPool) isHealthy(s *session) bool {
	if s.getNextCheck().Add(2 * p.hc.getInterval()).Before(time.Now()) {
		// TODO: figure out if we need to schedule a new healthcheck worker here.
		if err := s.ping(); isSessionNotFoundError(err) {
			// The session is already bad, continue to fetch/create a new one.
			s.destroy(false)
			return false
		}
		p.hc.scheduledHC(s)
	}
	return true
}

// take returns a cached session if one is available; otherwise it tries to
// allocate a new one. Sessions returned by take should be used for read
// operations.
func (p *sessionPool) take(ctx context.Context) (*sessionHandle, error) {
	trace.TracePrintf(ctx, nil, "Acquiring a read-only session")
	for {
		var (
			s   *session
			err error
		)

		p.mu.Lock()
		if !p.valid {
			p.mu.Unlock()
			return nil, errInvalidSessionPool
		}
		if p.idleList.Len() > 0 {
			// Idle sessions are available, get one from the top of the idle
			// list.
			s = p.idleList.Remove(p.idleList.Front()).(*session)
			trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
				"Acquired read-only session")
		} else if p.idleWriteList.Len() > 0 {
			s = p.idleWriteList.Remove(p.idleWriteList.Front()).(*session)
			trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
				"Acquired read-write session")
		}
		if s != nil {
			s.setIdleList(nil)
			numCheckedOut := p.currSessionsCheckedOutLocked()
			p.mu.Unlock()
			p.mw.updateMaxSessionsCheckedOutDuringWindow(numCheckedOut)
			// From here, the session is no longer in the idle list, so
			// healthcheck workers won't destroy it. If healthcheck workers
			// failed to schedule a healthcheck for the session in time, do the
			// check here. Because a session check is still much cheaper than
			// session creation, sessions should be reused as much as possible.
			if !p.isHealthy(s) {
				continue
			}
			return p.newSessionHandle(s), nil
		}

		// Idle list is empty, block if the session pool has reached max
		// session creation concurrency or max number of open sessions.
		if (p.MaxOpened > 0 && p.numOpened >= p.MaxOpened) || (p.MaxBurst > 0 && p.createReqs >= p.MaxBurst) {
			mayGetSession := p.mayGetSession
			p.mu.Unlock()
			trace.TracePrintf(ctx, nil, "Waiting for read-only session to become available")
			select {
			case <-ctx.Done():
				trace.TracePrintf(ctx, nil, "Context done waiting for session")
				return nil, p.errGetSessionTimeout()
			case <-mayGetSession:
			}
			continue
		}

		// Take budget before the actual session creation.
		p.numOpened++
		// Creating a new session that will be returned directly to the client
		// means that the max number of sessions in use also increases.
		numCheckedOut := p.currSessionsCheckedOutLocked()
		recordStat(ctx, OpenSessionCount, int64(p.numOpened))
		p.createReqs++
		p.mu.Unlock()
		p.mw.updateMaxSessionsCheckedOutDuringWindow(numCheckedOut)
		if s, err = p.createSession(ctx); err != nil {
			trace.TracePrintf(ctx, nil, "Error creating session: %v", err)
			return nil, toSpannerError(err)
		}
		trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
			"Created session")
		return p.newSessionHandle(s), nil
	}
}
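
// Illustrative sketch of bounding a checkout (assumed caller code): when the
// pool is saturated, take() blocks on mayGetSession, so callers typically
// bound it with a context deadline to surface errGetSessionTimeout instead of
// waiting indefinitely.
//
//	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
//	defer cancel()
//	sh, err := pool.take(ctx)
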
// takeWriteSession returns a write-prepared cached session if one is
// available; otherwise it tries to allocate a new one. Sessions returned
// should be used for read-write transactions.
func (p *sessionPool) takeWriteSession(ctx context.Context) (*sessionHandle, error) {
	trace.TracePrintf(ctx, nil, "Acquiring a read-write session")
	for {
		var (
			s   *session
			err error
		)

		p.mu.Lock()
		if !p.valid {
			p.mu.Unlock()
			return nil, errInvalidSessionPool
		}
		if p.idleWriteList.Len() > 0 {
			// Idle sessions are available, get one from the top of the idle
			// list.
			s = p.idleWriteList.Remove(p.idleWriteList.Front()).(*session)
			trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()}, "Acquired read-write session")
		} else if p.idleList.Len() > 0 {
			s = p.idleList.Remove(p.idleList.Front()).(*session)
			trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()}, "Acquired read-only session")
		}
		if s != nil {
			s.setIdleList(nil)
			numCheckedOut := p.currSessionsCheckedOutLocked()
			p.mu.Unlock()
			p.mw.updateMaxSessionsCheckedOutDuringWindow(numCheckedOut)
			// From here, the session is no longer in the idle list, so
			// healthcheck workers won't destroy it. If healthcheck workers
			// failed to schedule a healthcheck for the session in time, do the
			// check here. Because a session check is still much cheaper than
			// session creation, sessions should be reused as much as possible.
			if !p.isHealthy(s) {
				continue
			}
		} else {
			// Idle list is empty, block if the session pool has reached max
			// session creation concurrency or max number of open sessions.
			if (p.MaxOpened > 0 && p.numOpened >= p.MaxOpened) || (p.MaxBurst > 0 && p.createReqs >= p.MaxBurst) {
				mayGetSession := p.mayGetSession
				p.mu.Unlock()
				trace.TracePrintf(ctx, nil, "Waiting for read-write session to become available")
				select {
				case <-ctx.Done():
					trace.TracePrintf(ctx, nil, "Context done waiting for session")
					return nil, p.errGetSessionTimeout()
				case <-mayGetSession:
				}
				continue
			}

			// Take budget before the actual session creation.
			p.numOpened++
			// Creating a new session that will be returned directly to the
			// client means that the max number of sessions in use also
			// increases.
			numCheckedOut := p.currSessionsCheckedOutLocked()
			recordStat(ctx, OpenSessionCount, int64(p.numOpened))
			p.createReqs++
			p.mu.Unlock()
			p.mw.updateMaxSessionsCheckedOutDuringWindow(numCheckedOut)
			if s, err = p.createSession(ctx); err != nil {
				trace.TracePrintf(ctx, nil, "Error creating session: %v", err)
				return nil, toSpannerError(err)
			}
			trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
				"Created session")
		}
		if !s.isWritePrepared() {
			if err = s.prepareForWrite(ctx); err != nil {
				if isSessionNotFoundError(err) {
					s.destroy(false)
					trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
						"Session not found for write")
					return nil, toSpannerError(err)
				}

				s.recycle()
				trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
					"Error preparing session for write")
				return nil, toSpannerError(err)
			}
		}
		return p.newSessionHandle(s), nil
	}
}

// recycle puts session s back into the session pool's idle list. It returns
// true if the session pool successfully recycles session s.
func (p *sessionPool) recycle(s *session) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	if !s.isValid() || !p.valid {
		// Reject the session if the session is invalid or the pool itself is
		// invalid.
		return false
	}
	// Put the session at the top of the list to be handed out in LIFO order
	// for load balancing across channels.
	if s.isWritePrepared() {
		s.setIdleList(p.idleWriteList.PushFront(s))
	} else {
		s.setIdleList(p.idleList.PushFront(s))
	}
	// Broadcast that a session has been returned to the idle list.
	close(p.mayGetSession)
	p.mayGetSession = make(chan struct{})
	return true
}

// remove atomically removes session s from the session pool and invalidates s.
// If isExpire == true, the removal is triggered by session expiration and in
// such cases, only idle sessions can be removed.
func (p *sessionPool) remove(s *session, isExpire bool) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	if isExpire && (p.numOpened <= p.MinOpened || s.getIdleList() == nil) {
		// Don't expire the session if the session is not in the idle list (in
		// use), or if the number of open sessions would drop below
		// p.MinOpened.
		return false
	}
	ol := s.setIdleList(nil)
	// If the session is in the idle list, remove it.
	if ol != nil {
		// Remove from whichever list it is in.
		p.idleList.Remove(ol)
		p.idleWriteList.Remove(ol)
	}
	if s.invalidate() {
		// Decrease the number of opened sessions.
		p.numOpened--
		recordStat(context.Background(), OpenSessionCount, int64(p.numOpened))
		// Broadcast that a session has been destroyed.
		close(p.mayGetSession)
		p.mayGetSession = make(chan struct{})
		return true
	}
	return false
}

func (p *sessionPool) currSessionsCheckedOutLocked() uint64 {
	return p.numOpened - uint64(p.idleList.Len()) - uint64(p.idleWriteList.Len())
}

// hcHeap implements heap.Interface. It is used to create the priority queue
// for session healthchecks.
type hcHeap struct {
	sessions []*session
}

// Len implements heap.Interface.Len.
func (h hcHeap) Len() int {
	return len(h.sessions)
}

// Less implements heap.Interface.Less.
func (h hcHeap) Less(i, j int) bool {
	return h.sessions[i].getNextCheck().Before(h.sessions[j].getNextCheck())
}

// Swap implements heap.Interface.Swap.
func (h hcHeap) Swap(i, j int) {
	h.sessions[i], h.sessions[j] = h.sessions[j], h.sessions[i]
	h.sessions[i].setHcIndex(i)
	h.sessions[j].setHcIndex(j)
}

// Push implements heap.Interface.Push.
func (h *hcHeap) Push(s interface{}) {
	ns := s.(*session)
	ns.setHcIndex(len(h.sessions))
	h.sessions = append(h.sessions, ns)
}

// Pop implements heap.Interface.Pop.
func (h *hcHeap) Pop() interface{} {
	old := h.sessions
	n := len(old)
	s := old[n-1]
	h.sessions = old[:n-1]
	s.setHcIndex(-1)
	return s
}
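
// Illustrative sketch: hcHeap is only manipulated through the standard
// container/heap functions while holding healthChecker.mu, as in register,
// unregister and scheduledHCLocked further below:
//
//	heap.Push(&hc.queue, s)              // register a session
//	heap.Fix(&hc.queue, s.getHcIndex())  // re-sort after nextCheck changes
//	heap.Remove(&hc.queue, oldIndex)     // unregister a session
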
// maintenanceWindowSize specifies the number of health check cycles that
// defines a maintenance window. The maintenance window keeps track of a
// rolling set of numbers for the maximum number of checked-out sessions during
// the maintenance window. This is used by the maintainer to determine the
// number of sessions to create or delete at the end of each health check
// cycle.
const maintenanceWindowSize = 10

// maintenanceWindow contains the statistics that are gathered during a health
// check maintenance window.
type maintenanceWindow struct {
	mu sync.Mutex
	// maxSessionsCheckedOut contains the maximum number of sessions that were
	// checked out of the session pool during a health check cycle. This number
	// indicates the number of sessions that were actually needed by the pool
	// to serve the load during that cycle. The values are kept as a rolling
	// set containing the values for the past 10 cycles (minutes). The
	// maintainer uses these values to determine the number of sessions to keep
	// at the end of each cycle.
	maxSessionsCheckedOut [maintenanceWindowSize]uint64
}

// maxSessionsCheckedOutDuringWindow returns the maximum number of sessions
// that have been checked out during the last maintenance window of 10 cycles
// (minutes).
func (mw *maintenanceWindow) maxSessionsCheckedOutDuringWindow() uint64 {
	mw.mu.Lock()
	defer mw.mu.Unlock()
	var max uint64
	for _, cycleMax := range mw.maxSessionsCheckedOut {
		max = maxUint64(max, cycleMax)
	}
	return max
}

// updateMaxSessionsCheckedOutDuringWindow updates the maximum number of
// sessions that have been checked out of the pool during the current
// cycle of the maintenance window. A maintenance window consists of 10
// maintenance cycles. Each cycle keeps track of the max number of sessions in
// use during that cycle. The rolling maintenance window of 10 cycles is used
// to determine the number of sessions to keep at the end of a cycle by
// calculating the max in use during the last 10 cycles.
func (mw *maintenanceWindow) updateMaxSessionsCheckedOutDuringWindow(currNumSessionsCheckedOut uint64) {
	mw.mu.Lock()
	defer mw.mu.Unlock()
	mw.maxSessionsCheckedOut[0] = maxUint64(currNumSessionsCheckedOut, mw.maxSessionsCheckedOut[0])
}

// startNewCycle starts a new health check cycle with the specified number of
// checked out sessions as its initial value.
func (mw *maintenanceWindow) startNewCycle(currNumSessionsCheckedOut uint64) {
	mw.mu.Lock()
	defer mw.mu.Unlock()
	copy(mw.maxSessionsCheckedOut[1:], mw.maxSessionsCheckedOut[:9])
	mw.maxSessionsCheckedOut[0] = currNumSessionsCheckedOut
}

// newMaintenanceWindow creates a new maintenance window with all values for
// maxSessionsCheckedOut set to maxOpened. This ensures that a complete
// maintenance window must pass before the maintainer will start to delete any
// sessions.
func newMaintenanceWindow(maxOpened uint64) *maintenanceWindow {
	mw := &maintenanceWindow{}
	// Initialize the rolling window with max values to prevent the maintainer
	// from deleting sessions before a complete window of 10 cycles has
	// finished.
	for i := 0; i < maintenanceWindowSize; i++ {
		mw.maxSessionsCheckedOut[i] = maxOpened
	}
	return mw
}
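
// Illustrative walk-through of the rolling window (values assumed): a window
// created with newMaintenanceWindow(400) starts as [400 400 ... 400], so
// maxSessionsCheckedOutDuringWindow() reports 400 and nothing is deleted.
// Each startNewCycle(n) shifts the array right and seeds position 0 with n,
// and updateMaxSessionsCheckedOutDuringWindow raises position 0 as checkouts
// peak; only after 10 cycles does the window reflect actual load.
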
// healthChecker performs periodic healthchecks on registered sessions.
type healthChecker struct {
	// mu protects concurrent access to healthChecker.
	mu sync.Mutex
	// queue is the priority queue for session healthchecks. Sessions with
	// lower nextCheck rank higher in the queue.
	queue hcHeap
	// interval is the average interval between two healthchecks on a session.
	interval time.Duration
	// workers is the number of concurrent healthcheck workers.
	workers int
	// waitWorkers waits for all healthcheck workers to exit.
	waitWorkers sync.WaitGroup
	// pool is the underlying session pool.
	pool *sessionPool
	// sampleInterval is the interval of sampling by the maintainer.
	sampleInterval time.Duration
	// ready is used to signal that the maintainer can start running.
	ready chan struct{}
	// done is used to signal that the health checker should be closed.
	done chan struct{}
	// once is used for closing channel done only once.
	once             sync.Once
	maintainerCancel func()
}

// newHealthChecker initializes a new instance of healthChecker.
func newHealthChecker(interval time.Duration, workers int, sampleInterval time.Duration, pool *sessionPool) *healthChecker {
	if workers <= 0 {
		workers = 1
	}
	hc := &healthChecker{
		interval:         interval,
		workers:          workers,
		pool:             pool,
		sampleInterval:   sampleInterval,
		ready:            make(chan struct{}),
		done:             make(chan struct{}),
		maintainerCancel: func() {},
	}
	hc.waitWorkers.Add(1)
	go hc.maintainer()
	for i := 1; i <= hc.workers; i++ {
		hc.waitWorkers.Add(1)
		go hc.worker(i)
	}
	return hc
}

// close closes the healthChecker and waits for all healthcheck workers to
// exit.
func (hc *healthChecker) close() {
	hc.mu.Lock()
	hc.maintainerCancel()
	hc.mu.Unlock()
	hc.once.Do(func() { close(hc.done) })
	hc.waitWorkers.Wait()
}

// isClosing checks if a healthChecker is already closing.
func (hc *healthChecker) isClosing() bool {
	select {
	case <-hc.done:
		return true
	default:
		return false
	}
}

// getInterval gets the healthcheck interval.
func (hc *healthChecker) getInterval() time.Duration {
	hc.mu.Lock()
	defer hc.mu.Unlock()
	return hc.interval
}

// scheduledHCLocked schedules the next healthcheck on session s with the
// assumption that hc.mu is being held.
func (hc *healthChecker) scheduledHCLocked(s *session) {
	// The next healthcheck will be scheduled after
	// [interval*0.5, interval*1.5) ns.
	nsFromNow := rand.Int63n(int64(hc.interval)) + int64(hc.interval)/2
	s.setNextCheck(time.Now().Add(time.Duration(nsFromNow)))
	if hi := s.getHcIndex(); hi != -1 {
		// Session is still being tracked by healthcheck workers.
		heap.Fix(&hc.queue, hi)
	}
}

// scheduledHC schedules the next healthcheck on session s. It is safe to be
// called concurrently.
func (hc *healthChecker) scheduledHC(s *session) {
	hc.mu.Lock()
	defer hc.mu.Unlock()
	hc.scheduledHCLocked(s)
}
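
// Worked example of the jitter above (interval value assumed): with
// interval = 5*time.Minute, nsFromNow is uniform in [2.5m, 7.5m), so pings
// for sessions created at the same time spread out instead of arriving in a
// burst every 5 minutes.
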
// register registers a session with the healthChecker for periodic
// healthchecks.
func (hc *healthChecker) register(s *session) {
	hc.mu.Lock()
	defer hc.mu.Unlock()
	hc.scheduledHCLocked(s)
	heap.Push(&hc.queue, s)
}

// unregister unregisters a session from the healthcheck queue.
func (hc *healthChecker) unregister(s *session) {
	hc.mu.Lock()
	defer hc.mu.Unlock()
	oi := s.setHcIndex(-1)
	if oi >= 0 {
		heap.Remove(&hc.queue, oi)
	}
}

// markDone marks that the health check for a session has been performed.
func (hc *healthChecker) markDone(s *session) {
	hc.mu.Lock()
	defer hc.mu.Unlock()
	s.checkingHealth = false
}

// healthCheck checks the health of the session and pings it if needed.
func (hc *healthChecker) healthCheck(s *session) {
	defer hc.markDone(s)
	if !s.pool.isValid() {
		// Session pool is closed, perform a garbage collection.
		s.destroy(false)
		return
	}
	if err := s.ping(); isSessionNotFoundError(err) {
		// Ping failed, destroy the session.
		s.destroy(false)
	}
}

// worker performs the healthcheck on sessions in healthChecker's priority
// queue.
func (hc *healthChecker) worker(i int) {
	// Returns a session which we should ping to keep it alive.
	getNextForPing := func() *session {
		hc.pool.mu.Lock()
		defer hc.pool.mu.Unlock()
		hc.mu.Lock()
		defer hc.mu.Unlock()
		if hc.queue.Len() <= 0 {
			// Queue is empty.
			return nil
		}
		s := hc.queue.sessions[0]
		if s.getNextCheck().After(time.Now()) && hc.pool.valid {
			// All sessions have been checked recently.
			return nil
		}
		hc.scheduledHCLocked(s)
		if !s.checkingHealth {
			s.checkingHealth = true
			return s
		}
		return nil
	}

	// Returns a session which we should prepare for write.
	getNextForTx := func() *session {
		hc.pool.mu.Lock()
		defer hc.pool.mu.Unlock()
		if hc.pool.shouldPrepareWriteLocked() {
			if hc.pool.idleList.Len() > 0 && hc.pool.valid {
				hc.mu.Lock()
				defer hc.mu.Unlock()
				if hc.pool.idleList.Front().Value.(*session).checkingHealth {
					return nil
				}
				session := hc.pool.idleList.Remove(hc.pool.idleList.Front()).(*session)
				session.checkingHealth = true
				hc.pool.prepareReqs++
				return session
			}
		}
		return nil
	}

	for {
		if hc.isClosing() {
			// Exit when the pool has been closed and all sessions have been
			// destroyed or when the health checker has been closed.
			hc.waitWorkers.Done()
			return
		}
		ws := getNextForTx()
		if ws != nil {
			ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
			err := ws.prepareForWrite(ctx)
			cancel()
			if err != nil {
				// Skip handling the prepare error; the session can be
				// prepared in the next cycle.
				// Don't log about permission errors, which may be expected
				// (e.g. using read-only auth).
				serr := toSpannerError(err).(*Error)
				if serr.Code != codes.PermissionDenied {
					logf(hc.pool.sc.logger, "Failed to prepare session, error: %v", serr)
				}
			}
			hc.pool.recycle(ws)
			hc.pool.mu.Lock()
			hc.pool.prepareReqs--
			hc.pool.mu.Unlock()
			hc.markDone(ws)
		}
		rs := getNextForPing()
		if rs == nil {
			if ws == nil {
				// No work to be done, so sleep to avoid burning CPU.
				pause := int64(100 * time.Millisecond)
				if pause > int64(hc.interval) {
					pause = int64(hc.interval)
				}
				select {
				case <-time.After(time.Duration(rand.Int63n(pause) + pause/2)):
				case <-hc.done:
				}

			}
			continue
		}
		hc.healthCheck(rs)
	}
}
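
// Worked example of the maintainer's target range below (values assumed):
// with MinOpened = 100, MaxIdle = 50 and a window maximum of 200 sessions in
// use, the pool grows whenever numOpened < 100 and shrinks toward
// 200+50 = 250 whenever numOpened exceeds that sum.
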
// maintainer maintains the number of sessions in the pool based on the session
// pool configuration and the current and historical number of sessions checked
// out of the pool. The maintainer will:
// 1. Ensure that the session pool contains at least MinOpened sessions.
// 2. If the current number of sessions in the pool exceeds the greatest number
//    of checked out sessions (=sessions in use) during the last 10 minutes,
//    and the delta is larger than MaxIdle, the maintainer will reduce the
//    number of sessions to maxSessionsInUseDuringWindow+MaxIdle.
func (hc *healthChecker) maintainer() {
	// Wait until the pool is ready.
	<-hc.ready

	for iteration := uint64(0); ; iteration++ {
		if hc.isClosing() {
			hc.waitWorkers.Done()
			return
		}

		hc.pool.mu.Lock()
		currSessionsOpened := hc.pool.numOpened
		maxIdle := hc.pool.MaxIdle
		minOpened := hc.pool.MinOpened
		hc.pool.mu.Unlock()
		// Get the maximum number of sessions in use during the current
		// maintenance window.
		maxSessionsInUseDuringWindow := hc.pool.mw.maxSessionsCheckedOutDuringWindow()
		hc.mu.Lock()
		ctx, cancel := context.WithTimeout(context.Background(), hc.sampleInterval)
		hc.maintainerCancel = cancel
		hc.mu.Unlock()

		// Grow or shrink the pool if needed. The number of sessions in the
		// pool should be in the range
		// [Config.MinOpened, Config.MaxIdle+maxSessionsInUseDuringWindow].
		if currSessionsOpened < minOpened {
			hc.growPool(ctx, minOpened)
		} else if maxIdle+maxSessionsInUseDuringWindow < currSessionsOpened {
			hc.shrinkPool(ctx, maxIdle+maxSessionsInUseDuringWindow)
		}

		select {
		case <-ctx.Done():
		case <-hc.done:
			cancel()
		}
		// Cycle the maintenance window. This will remove the oldest cycle and
		// add a new cycle at the beginning of the maintenance window with the
		// currently checked out number of sessions as the max number of
		// sessions in use in this cycle. This value will be increased during
		// the next cycle if it increases.
		hc.pool.mu.Lock()
		currSessionsInUse := hc.pool.currSessionsCheckedOutLocked()
		hc.pool.mu.Unlock()
		hc.pool.mw.startNewCycle(currSessionsInUse)
	}
}

// growPool grows the number of sessions in the pool to the specified number of
// sessions. It times out after sampleInterval.
func (hc *healthChecker) growPool(ctx context.Context, growToNumSessions uint64) {
	// Calculate the max number of sessions to create as a safeguard against
	// other processes that could be deleting sessions concurrently.
	hc.pool.mu.Lock()
	maxSessionsToCreate := int(growToNumSessions - hc.pool.numOpened)
	hc.pool.mu.Unlock()
	var created int
	for {
		if ctx.Err() != nil {
			return
		}

		p := hc.pool
		p.mu.Lock()
		// Take budget before the actual session creation.
		if growToNumSessions <= p.numOpened || created >= maxSessionsToCreate {
			p.mu.Unlock()
			break
		}
		p.numOpened++
		recordStat(ctx, OpenSessionCount, int64(p.numOpened))
		p.createReqs++
		shouldPrepareWrite := p.shouldPrepareWriteLocked()
		p.mu.Unlock()
		var (
			s   *session
			err error
		)
		createContext, cancel := context.WithTimeout(context.Background(), time.Minute)
		if s, err = p.createSession(createContext); err != nil {
			cancel()
			logf(p.sc.logger, "Failed to create session, error: %v", toSpannerError(err))
			continue
		}
		cancel()
		created++
		if shouldPrepareWrite {
			prepareContext, cancel := context.WithTimeout(context.Background(), time.Minute)
			if err = s.prepareForWrite(prepareContext); err != nil {
				cancel()
				p.recycle(s)
				// Don't log about permission errors, which may be expected
				// (e.g. using read-only auth).
				serr := toSpannerError(err).(*Error)
				if serr.Code != codes.PermissionDenied {
					logf(p.sc.logger, "Failed to prepare session, error: %v", serr)
				}
				continue
			}
			cancel()
		}
		p.recycle(s)
	}
}

// shrinkPool scales down the session pool. The method will stop deleting
// sessions when the pool has shrunk to shrinkToNumSessions sessions. The
// method will also stop deleting sessions if it detects that another process
// has started creating sessions for the pool again, for example through the
// take() method.
func (hc *healthChecker) shrinkPool(ctx context.Context, shrinkToNumSessions uint64) {
	hc.pool.mu.Lock()
	maxSessionsToDelete := int(hc.pool.numOpened - shrinkToNumSessions)
	hc.pool.mu.Unlock()
	var deleted int
	var prevNumOpened uint64 = math.MaxUint64
	for {
		if ctx.Err() != nil {
			return
		}

		p := hc.pool
		p.mu.Lock()
		// Check if the number of open sessions has increased. If it has, we
		// should stop deleting sessions, as the load has increased and
		// additional sessions are needed.
		if p.numOpened >= prevNumOpened {
			p.mu.Unlock()
			break
		}
		prevNumOpened = p.numOpened

		// Check both whether we have reached the target number of open
		// sessions and whether we have reached the number of sessions to
		// delete, in case sessions have been deleted by other methods because
		// they have expired or been deemed invalid.
		if shrinkToNumSessions >= p.numOpened || deleted >= maxSessionsToDelete {
			p.mu.Unlock()
			break
		}

		var s *session
		if p.idleList.Len() > 0 {
			s = p.idleList.Front().Value.(*session)
		} else if p.idleWriteList.Len() > 0 {
			s = p.idleWriteList.Front().Value.(*session)
		}
		p.mu.Unlock()
		if s != nil {
			deleted++
			// Destroy the session as if it expired.
			s.destroy(true)
		} else {
			break
		}
	}
}

// maxUint64 returns the maximum of two uint64 values.
func maxUint64(a, b uint64) uint64 {
	if a > b {
		return a
	}
	return b
}

// minUint64 returns the minimum of two uint64 values.
func minUint64(a, b uint64) uint64 {
	if a > b {
		return b
	}
	return a
}

// isSessionNotFoundError returns true if the given error is a
// `Session not found` error.
func isSessionNotFoundError(err error) bool {
	if err == nil {
		return false
	}
	// We are checking specifically for the error message `Session not found`,
	// as the error could also be a `Database not found`. The latter should
	// cause the session pool to stop preparing sessions for read/write
	// transactions, while the former should not.
	// TODO: once gRPC can return auxiliary error information, stop parsing the error message.
	return ErrCode(err) == codes.NotFound && strings.Contains(err.Error(), "Session not found")
}