/*
Copyright 2017 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package spanner

import (
	"container/heap"
	"container/list"
	"context"
	"fmt"
	"log"
	"math"
	"math/rand"
	"runtime/debug"
	"sync"
	"time"

	"cloud.google.com/go/internal/trace"
	vkit "cloud.google.com/go/spanner/apiv1"
	sppb "google.golang.org/genproto/googleapis/spanner/v1"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/metadata"
)

// sessionHandle is the handle through which transactions access Cloud Spanner
// sessions safely. Handles are produced by sessionPool.take() and
// sessionPool.takeWriteSession() (both go through newSessionHandle).
type sessionHandle struct {
	// mu guarantees that the inner session object is returned / destroyed only
	// once. It also guards every other field of the handle.
	mu sync.Mutex
	// session is a pointer to a session object. Transactions never need to
	// access it directly. It is nil once the handle has been recycled or
	// destroyed.
	session *session
	// checkoutTime is the time the session was checked out of the pool.
	checkoutTime time.Time
	// trackedSessionHandle is the linked list node which links the session to
	// the list of tracked session handles. trackedSessionHandle is only set if
	// TrackSessionHandles has been enabled in the session pool configuration.
	trackedSessionHandle *list.Element
	// stack is the call stack of the goroutine that checked out the session
	// from the pool. This can be used to track down session leak problems.
	// Like trackedSessionHandle, it is only captured when TrackSessionHandles
	// is enabled.
	stack []byte
}

// recycle gives the inner session object back to its home session pool.
It is 59// safe to call recycle multiple times but only the first one would take effect. 60func (sh *sessionHandle) recycle() { 61 sh.mu.Lock() 62 if sh.session == nil { 63 // sessionHandle has already been recycled. 64 sh.mu.Unlock() 65 return 66 } 67 p := sh.session.pool 68 tracked := sh.trackedSessionHandle 69 sh.session.recycle() 70 sh.session = nil 71 sh.trackedSessionHandle = nil 72 sh.checkoutTime = time.Time{} 73 sh.stack = nil 74 sh.mu.Unlock() 75 if tracked != nil { 76 p.mu.Lock() 77 p.trackedSessionHandles.Remove(tracked) 78 p.mu.Unlock() 79 } 80} 81 82// getID gets the Cloud Spanner session ID from the internal session object. 83// getID returns empty string if the sessionHandle is nil or the inner session 84// object has been released by recycle / destroy. 85func (sh *sessionHandle) getID() string { 86 sh.mu.Lock() 87 defer sh.mu.Unlock() 88 if sh.session == nil { 89 // sessionHandle has already been recycled/destroyed. 90 return "" 91 } 92 return sh.session.getID() 93} 94 95// getClient gets the Cloud Spanner RPC client associated with the session ID 96// in sessionHandle. 97func (sh *sessionHandle) getClient() *vkit.Client { 98 sh.mu.Lock() 99 defer sh.mu.Unlock() 100 if sh.session == nil { 101 return nil 102 } 103 return sh.session.client 104} 105 106// getMetadata returns the metadata associated with the session in sessionHandle. 107func (sh *sessionHandle) getMetadata() metadata.MD { 108 sh.mu.Lock() 109 defer sh.mu.Unlock() 110 if sh.session == nil { 111 return nil 112 } 113 return sh.session.md 114} 115 116// getTransactionID returns the transaction id in the session if available. 117func (sh *sessionHandle) getTransactionID() transactionID { 118 sh.mu.Lock() 119 defer sh.mu.Unlock() 120 if sh.session == nil { 121 return nil 122 } 123 return sh.session.tx 124} 125 126// destroy destroys the inner session object. It is safe to call destroy 127// multiple times and only the first call would attempt to 128// destroy the inner session object. 
129func (sh *sessionHandle) destroy() { 130 sh.mu.Lock() 131 s := sh.session 132 if s == nil { 133 // sessionHandle has already been recycled. 134 sh.mu.Unlock() 135 return 136 } 137 tracked := sh.trackedSessionHandle 138 sh.session = nil 139 sh.trackedSessionHandle = nil 140 sh.checkoutTime = time.Time{} 141 sh.stack = nil 142 sh.mu.Unlock() 143 144 if tracked != nil { 145 p := s.pool 146 p.mu.Lock() 147 p.trackedSessionHandles.Remove(tracked) 148 p.mu.Unlock() 149 } 150 s.destroy(false) 151} 152 153// session wraps a Cloud Spanner session ID through which transactions are 154// created and executed. 155type session struct { 156 // client is the RPC channel to Cloud Spanner. It is set only once during 157 // session's creation. 158 client *vkit.Client 159 // id is the unique id of the session in Cloud Spanner. It is set only once 160 // during session's creation. 161 id string 162 // pool is the session's home session pool where it was created. It is set 163 // only once during session's creation. 164 pool *sessionPool 165 // createTime is the timestamp of the session's creation. It is set only 166 // once during session's creation. 167 createTime time.Time 168 // logger is the logger configured for the Spanner client that created the 169 // session. If nil, logging will be directed to the standard logger. 170 logger *log.Logger 171 172 // mu protects the following fields from concurrent access: both 173 // healthcheck workers and transactions can modify them. 174 mu sync.Mutex 175 // valid marks the validity of a session. 176 valid bool 177 // hcIndex is the index of the session inside the global healthcheck queue. 178 // If hcIndex < 0, session has been unregistered from the queue. 179 hcIndex int 180 // idleList is the linkedlist node which links the session to its home 181 // session pool's idle list. If idleList == nil, the 182 // session is not in idle list. 
183 idleList *list.Element 184 // nextCheck is the timestamp of next scheduled healthcheck of the session. 185 // It is maintained by the global health checker. 186 nextCheck time.Time 187 // checkingHelath is true if currently this session is being processed by 188 // health checker. Must be modified under health checker lock. 189 checkingHealth bool 190 // md is the Metadata to be sent with each request. 191 md metadata.MD 192 // tx contains the transaction id if the session has been prepared for 193 // write. 194 tx transactionID 195} 196 197// isValid returns true if the session is still valid for use. 198func (s *session) isValid() bool { 199 s.mu.Lock() 200 defer s.mu.Unlock() 201 return s.valid 202} 203 204// isWritePrepared returns true if the session is prepared for write. 205func (s *session) isWritePrepared() bool { 206 s.mu.Lock() 207 defer s.mu.Unlock() 208 return s.tx != nil 209} 210 211// String implements fmt.Stringer for session. 212func (s *session) String() string { 213 s.mu.Lock() 214 defer s.mu.Unlock() 215 return fmt.Sprintf("<id=%v, hcIdx=%v, idleList=%p, valid=%v, create=%v, nextcheck=%v>", 216 s.id, s.hcIndex, s.idleList, s.valid, s.createTime, s.nextCheck) 217} 218 219// ping verifies if the session is still alive in Cloud Spanner. 220func (s *session) ping() error { 221 ctx, cancel := context.WithTimeout(context.Background(), time.Second) 222 defer cancel() 223 // s.getID is safe even when s is invalid. 224 _, err := s.client.GetSession(contextWithOutgoingMetadata(ctx, s.md), &sppb.GetSessionRequest{Name: s.getID()}) 225 return err 226} 227 228// setHcIndex atomically sets the session's index in the healthcheck queue and 229// returns the old index. 230func (s *session) setHcIndex(i int) int { 231 s.mu.Lock() 232 defer s.mu.Unlock() 233 oi := s.hcIndex 234 s.hcIndex = i 235 return oi 236} 237 238// setIdleList atomically sets the session's idle list link and returns the old 239// link. 
240func (s *session) setIdleList(le *list.Element) *list.Element { 241 s.mu.Lock() 242 defer s.mu.Unlock() 243 old := s.idleList 244 s.idleList = le 245 return old 246} 247 248// invalidate marks a session as invalid and returns the old validity. 249func (s *session) invalidate() bool { 250 s.mu.Lock() 251 defer s.mu.Unlock() 252 ov := s.valid 253 s.valid = false 254 return ov 255} 256 257// setNextCheck sets the timestamp for next healthcheck on the session. 258func (s *session) setNextCheck(t time.Time) { 259 s.mu.Lock() 260 defer s.mu.Unlock() 261 s.nextCheck = t 262} 263 264// setTransactionID sets the transaction id in the session 265func (s *session) setTransactionID(tx transactionID) { 266 s.mu.Lock() 267 defer s.mu.Unlock() 268 s.tx = tx 269} 270 271// getID returns the session ID which uniquely identifies the session in Cloud 272// Spanner. 273func (s *session) getID() string { 274 s.mu.Lock() 275 defer s.mu.Unlock() 276 return s.id 277} 278 279// getHcIndex returns the session's index into the global healthcheck priority 280// queue. 281func (s *session) getHcIndex() int { 282 s.mu.Lock() 283 defer s.mu.Unlock() 284 return s.hcIndex 285} 286 287// getIdleList returns the session's link in its home session pool's idle list. 288func (s *session) getIdleList() *list.Element { 289 s.mu.Lock() 290 defer s.mu.Unlock() 291 return s.idleList 292} 293 294// getNextCheck returns the timestamp for next healthcheck on the session. 295func (s *session) getNextCheck() time.Time { 296 s.mu.Lock() 297 defer s.mu.Unlock() 298 return s.nextCheck 299} 300 301// recycle turns the session back to its home session pool. 302func (s *session) recycle() { 303 s.setTransactionID(nil) 304 if !s.pool.recycle(s) { 305 // s is rejected by its home session pool because it expired and the 306 // session pool currently has enough open sessions. 
307 s.destroy(false) 308 } 309} 310 311// destroy removes the session from its home session pool, healthcheck queue 312// and Cloud Spanner service. 313func (s *session) destroy(isExpire bool) bool { 314 // Remove s from session pool. 315 if !s.pool.remove(s, isExpire) { 316 return false 317 } 318 // Unregister s from healthcheck queue. 319 s.pool.hc.unregister(s) 320 // Remove s from Cloud Spanner service. 321 ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) 322 defer cancel() 323 s.delete(ctx) 324 return true 325} 326 327func (s *session) delete(ctx context.Context) { 328 // Ignore the error because even if we fail to explicitly destroy the 329 // session, it will be eventually garbage collected by Cloud Spanner. 330 err := s.client.DeleteSession(ctx, &sppb.DeleteSessionRequest{Name: s.getID()}) 331 if err != nil { 332 logf(s.logger, "Failed to delete session %v. Error: %v", s.getID(), err) 333 } 334} 335 336// prepareForWrite prepares the session for write if it is not already in that 337// state. 338func (s *session) prepareForWrite(ctx context.Context) error { 339 if s.isWritePrepared() { 340 return nil 341 } 342 tx, err := beginTransaction(contextWithOutgoingMetadata(ctx, s.md), s.getID(), s.client) 343 // Session not found should cause the session to be removed from the pool. 344 if isSessionNotFoundError(err) { 345 s.pool.remove(s, false) 346 s.pool.hc.unregister(s) 347 return err 348 } 349 // Enable/disable background preparing of write sessions depending on 350 // whether the BeginTransaction call succeeded. This will prevent the 351 // session pool workers from going into an infinite loop of trying to 352 // prepare sessions. Any subsequent successful BeginTransaction call from 353 // for example takeWriteSession will re-enable the background process. 
354 s.pool.mu.Lock() 355 s.pool.disableBackgroundPrepareSessions = err != nil 356 s.pool.mu.Unlock() 357 if err != nil { 358 return err 359 } 360 s.setTransactionID(tx) 361 return nil 362} 363 364// SessionPoolConfig stores configurations of a session pool. 365type SessionPoolConfig struct { 366 // MaxOpened is the maximum number of opened sessions allowed by the session 367 // pool. If the client tries to open a session and there are already 368 // MaxOpened sessions, it will block until one becomes available or the 369 // context passed to the client method is canceled or times out. 370 // 371 // Defaults to NumChannels * 100. 372 MaxOpened uint64 373 374 // MinOpened is the minimum number of opened sessions that the session pool 375 // tries to maintain. Session pool won't continue to expire sessions if 376 // number of opened connections drops below MinOpened. However, if a session 377 // is found to be broken, it will still be evicted from the session pool, 378 // therefore it is posssible that the number of opened sessions drops below 379 // MinOpened. 380 // 381 // Defaults to 100. 382 MinOpened uint64 383 384 // MaxIdle is the maximum number of idle sessions, pool is allowed to keep. 385 // 386 // Defaults to 0. 387 MaxIdle uint64 388 389 // MaxBurst is the maximum number of concurrent session creation requests. 390 // 391 // Defaults to 10. 392 MaxBurst uint64 393 394 // WriteSessions is the fraction of sessions we try to keep prepared for 395 // write. 396 // 397 // Defaults to 0.2. 398 WriteSessions float64 399 400 // HealthCheckWorkers is number of workers used by health checker for this 401 // pool. 402 // 403 // Defaults to 10. 404 HealthCheckWorkers int 405 406 // HealthCheckInterval is how often the health checker pings a session. 407 // 408 // Defaults to 5m. 
409 HealthCheckInterval time.Duration 410 411 // TrackSessionHandles determines whether the session pool will keep track 412 // of the stacktrace of the goroutines that take sessions from the pool. 413 // This setting can be used to track down session leak problems. 414 // 415 // Defaults to false. 416 TrackSessionHandles bool 417 418 // healthCheckSampleInterval is how often the health checker samples live 419 // session (for use in maintaining session pool size). 420 // 421 // Defaults to 1m. 422 healthCheckSampleInterval time.Duration 423 424 // sessionLabels for the sessions created in the session pool. 425 sessionLabels map[string]string 426} 427 428// DefaultSessionPoolConfig is the default configuration for the session pool 429// that will be used for a Spanner client, unless the user supplies a specific 430// session pool config. 431var DefaultSessionPoolConfig = SessionPoolConfig{ 432 MinOpened: 100, 433 MaxOpened: numChannels * 100, 434 MaxBurst: 10, 435 WriteSessions: 0.2, 436 HealthCheckWorkers: 10, 437 HealthCheckInterval: 30 * time.Minute, 438} 439 440// errMinOpenedGTMapOpened returns error for SessionPoolConfig.MaxOpened < SessionPoolConfig.MinOpened when SessionPoolConfig.MaxOpened is set. 
441func errMinOpenedGTMaxOpened(maxOpened, minOpened uint64) error { 442 return spannerErrorf(codes.InvalidArgument, 443 "require SessionPoolConfig.MaxOpened >= SessionPoolConfig.MinOpened, got %d and %d", maxOpened, minOpened) 444} 445 446// errWriteFractionOutOfRange returns error for 447// SessionPoolConfig.WriteFraction < 0 or SessionPoolConfig.WriteFraction > 1 448func errWriteFractionOutOfRange(writeFraction float64) error { 449 return spannerErrorf(codes.InvalidArgument, 450 "require SessionPoolConfig.WriteSessions >= 0.0 && SessionPoolConfig.WriteSessions <= 1.0, got %.2f", writeFraction) 451} 452 453// errHealthCheckWorkersNegative returns error for 454// SessionPoolConfig.HealthCheckWorkers < 0 455func errHealthCheckWorkersNegative(workers int) error { 456 return spannerErrorf(codes.InvalidArgument, 457 "require SessionPoolConfig.HealthCheckWorkers >= 0, got %d", workers) 458} 459 460// errHealthCheckIntervalNegative returns error for 461// SessionPoolConfig.HealthCheckInterval < 0 462func errHealthCheckIntervalNegative(interval time.Duration) error { 463 return spannerErrorf(codes.InvalidArgument, 464 "require SessionPoolConfig.HealthCheckInterval >= 0, got %v", interval) 465} 466 467// validate verifies that the SessionPoolConfig is good for use. 468func (spc *SessionPoolConfig) validate() error { 469 if spc.MinOpened > spc.MaxOpened && spc.MaxOpened > 0 { 470 return errMinOpenedGTMaxOpened(spc.MaxOpened, spc.MinOpened) 471 } 472 if spc.WriteSessions < 0.0 || spc.WriteSessions > 1.0 { 473 return errWriteFractionOutOfRange(spc.WriteSessions) 474 } 475 if spc.HealthCheckWorkers < 0 { 476 return errHealthCheckWorkersNegative(spc.HealthCheckWorkers) 477 } 478 if spc.HealthCheckInterval < 0 { 479 return errHealthCheckIntervalNegative(spc.HealthCheckInterval) 480 } 481 return nil 482} 483 484// sessionPool creates and caches Cloud Spanner sessions. 485type sessionPool struct { 486 // mu protects sessionPool from concurrent access. 
487 mu sync.Mutex 488 // valid marks the validity of the session pool. 489 valid bool 490 // sc is used to create the sessions for the pool. 491 sc *sessionClient 492 // trackedSessionHandles contains all sessions handles that have been 493 // checked out of the pool. The list is only filled if TrackSessionHandles 494 // has been enabled. 495 trackedSessionHandles list.List 496 // idleList caches idle session IDs. Session IDs in this list can be 497 // allocated for use. 498 idleList list.List 499 // idleWriteList caches idle sessions which have been prepared for write. 500 idleWriteList list.List 501 // mayGetSession is for broadcasting that session retrival/creation may 502 // proceed. 503 mayGetSession chan struct{} 504 // numOpened is the total number of open sessions from the session pool. 505 numOpened uint64 506 // createReqs is the number of ongoing session creation requests. 507 createReqs uint64 508 // prepareReqs is the number of ongoing session preparation request. 509 prepareReqs uint64 510 // disableBackgroundPrepareSessions indicates that the BeginTransaction 511 // call for a read/write transaction failed with a permanent error, such as 512 // PermissionDenied or `Database not found`. Further background calls to 513 // prepare sessions will be disabled. 514 disableBackgroundPrepareSessions bool 515 // configuration of the session pool. 516 SessionPoolConfig 517 // hc is the health checker 518 hc *healthChecker 519 520 // mw is the maintenance window containing statistics for the max number of 521 // sessions checked out of the pool during the last 10 minutes. 522 mw *maintenanceWindow 523} 524 525// newSessionPool creates a new session pool. 
526func newSessionPool(sc *sessionClient, config SessionPoolConfig) (*sessionPool, error) { 527 if err := config.validate(); err != nil { 528 return nil, err 529 } 530 pool := &sessionPool{ 531 sc: sc, 532 valid: true, 533 mayGetSession: make(chan struct{}), 534 SessionPoolConfig: config, 535 mw: newMaintenanceWindow(config.MaxOpened), 536 } 537 if config.HealthCheckWorkers == 0 { 538 // With 10 workers and assuming average latency of 5ms for 539 // BeginTransaction, we will be able to prepare 2000 tx/sec in advance. 540 // If the rate of takeWriteSession is more than that, it will degrade to 541 // doing BeginTransaction inline. 542 // 543 // TODO: consider resizing the worker pool dynamically according to the load. 544 config.HealthCheckWorkers = 10 545 } 546 if config.HealthCheckInterval == 0 { 547 config.HealthCheckInterval = 5 * time.Minute 548 } 549 if config.healthCheckSampleInterval == 0 { 550 config.healthCheckSampleInterval = time.Minute 551 } 552 // On GCE VM, within the same region an healthcheck ping takes on average 553 // 10ms to finish, given a 5 minutes interval and 10 healthcheck workers, a 554 // healthChecker can effectively mantain 555 // 100 checks_per_worker/sec * 10 workers * 300 seconds = 300K sessions. 556 pool.hc = newHealthChecker(config.HealthCheckInterval, config.HealthCheckWorkers, config.healthCheckSampleInterval, pool) 557 558 // First initialize the pool before we indicate that the healthchecker is 559 // ready. This prevents the maintainer from starting before the pool has 560 // been initialized, which means that we guarantee that the initial 561 // sessions are created using BatchCreateSessions. 
562 if config.MinOpened > 0 { 563 numSessions := minUint64(config.MinOpened, math.MaxInt32) 564 if err := pool.initPool(int32(numSessions)); err != nil { 565 return nil, err 566 } 567 } 568 close(pool.hc.ready) 569 return pool, nil 570} 571 572func (p *sessionPool) initPool(numSessions int32) error { 573 p.mu.Lock() 574 // Take budget before the actual session creation. 575 p.numOpened += uint64(numSessions) 576 recordStat(context.Background(), OpenSessionCount, int64(p.numOpened)) 577 p.createReqs += uint64(numSessions) 578 p.mu.Unlock() 579 // Asynchronously create the initial sessions for the pool. 580 return p.sc.batchCreateSessions(numSessions, p) 581} 582 583// sessionReady is executed by the SessionClient when a session has been 584// created and is ready to use. This method will add the new session to the 585// pool and decrease the number of sessions that is being created. 586func (p *sessionPool) sessionReady(s *session) { 587 p.mu.Lock() 588 defer p.mu.Unlock() 589 // Set this pool as the home pool of the session and register it with the 590 // health checker. 591 s.pool = p 592 p.hc.register(s) 593 p.createReqs-- 594 // Insert the session at a random position in the pool to prevent all 595 // sessions affiliated with a channel to be placed at sequentially in the 596 // pool. 597 if p.idleList.Len() > 0 { 598 pos := rand.Intn(p.idleList.Len()) 599 before := p.idleList.Front() 600 for i := 0; i < pos; i++ { 601 before = before.Next() 602 } 603 s.setIdleList(p.idleList.InsertBefore(s, before)) 604 } else { 605 s.setIdleList(p.idleList.PushBack(s)) 606 } 607 // Notify other waiters blocking on session creation. 608 close(p.mayGetSession) 609 p.mayGetSession = make(chan struct{}) 610} 611 612// sessionCreationFailed is called by the SessionClient when the creation of one 613// or more requested sessions finished with an error. 
sessionCreationFailed will 614// decrease the number of sessions being created and notify any waiters that 615// the session creation failed. 616func (p *sessionPool) sessionCreationFailed(err error, numSessions int32) { 617 p.mu.Lock() 618 defer p.mu.Unlock() 619 p.createReqs -= uint64(numSessions) 620 p.numOpened -= uint64(numSessions) 621 recordStat(context.Background(), OpenSessionCount, int64(p.numOpened)) 622 // Notify other waiters blocking on session creation. 623 close(p.mayGetSession) 624 p.mayGetSession = make(chan struct{}) 625} 626 627// isValid checks if the session pool is still valid. 628func (p *sessionPool) isValid() bool { 629 if p == nil { 630 return false 631 } 632 p.mu.Lock() 633 defer p.mu.Unlock() 634 return p.valid 635} 636 637// close marks the session pool as closed. 638func (p *sessionPool) close() { 639 if p == nil { 640 return 641 } 642 p.mu.Lock() 643 if !p.valid { 644 p.mu.Unlock() 645 return 646 } 647 p.valid = false 648 p.mu.Unlock() 649 p.hc.close() 650 // destroy all the sessions 651 p.hc.mu.Lock() 652 allSessions := make([]*session, len(p.hc.queue.sessions)) 653 copy(allSessions, p.hc.queue.sessions) 654 p.hc.mu.Unlock() 655 for _, s := range allSessions { 656 s.destroy(false) 657 } 658} 659 660// errInvalidSessionPool is the error for using an invalid session pool. 661var errInvalidSessionPool = spannerErrorf(codes.InvalidArgument, "invalid session pool") 662 663// errGetSessionTimeout returns error for context timeout during 664// sessionPool.take(). 665var errGetSessionTimeout = spannerErrorf(codes.Canceled, "timeout / context canceled during getting session") 666 667// newSessionHandle creates a new session handle for the given session for this 668// session pool. The session handle will also hold a copy of the current call 669// stack if the session pool has been configured to track the call stacks of 670// sessions being checked out of the pool. 
671func (p *sessionPool) newSessionHandle(s *session) (sh *sessionHandle) { 672 sh = &sessionHandle{session: s, checkoutTime: time.Now()} 673 if p.TrackSessionHandles { 674 p.mu.Lock() 675 sh.trackedSessionHandle = p.trackedSessionHandles.PushBack(sh) 676 p.mu.Unlock() 677 sh.stack = debug.Stack() 678 } 679 return sh 680} 681 682// errGetSessionTimeout returns error for context timeout during 683// sessionPool.take(). 684func (p *sessionPool) errGetSessionTimeout() error { 685 if p.TrackSessionHandles { 686 return p.errGetSessionTimeoutWithTrackedSessionHandles() 687 } 688 return p.errGetBasicSessionTimeout() 689} 690 691// errGetBasicSessionTimeout returns error for context timout during 692// sessionPool.take() without any tracked sessionHandles. 693func (p *sessionPool) errGetBasicSessionTimeout() error { 694 return spannerErrorf(codes.Canceled, "timeout / context canceled during getting session.\n"+ 695 "Enable SessionPoolConfig.TrackSessionHandles if you suspect a session leak to get more information about the checked out sessions.") 696} 697 698// errGetSessionTimeoutWithTrackedSessionHandles returns error for context 699// timout during sessionPool.take() including a stacktrace of each checked out 700// session handle. 701func (p *sessionPool) errGetSessionTimeoutWithTrackedSessionHandles() error { 702 err := spannerErrorf(codes.Canceled, "timeout / context canceled during getting session.") 703 err.(*Error).additionalInformation = p.getTrackedSessionHandleStacksLocked() 704 return err 705} 706 707// getTrackedSessionHandleStacksLocked returns a string containing the 708// stacktrace of all currently checked out sessions of the pool. This method 709// requires the caller to have locked p.mu. 
func (p *sessionPool) getTrackedSessionHandleStacksLocked() string {
	// p.mu is acquired here, so callers must not already hold it. Each
	// handle's own mutex is taken while p.mu is held (lock order: p.mu, then
	// sh.mu).
	p.mu.Lock()
	defer p.mu.Unlock()
	stackTraces := ""
	// i numbers the tracked handles starting at 1; it advances for every
	// handle, including those without a recorded stack.
	i := 1
	element := p.trackedSessionHandles.Front()
	for element != nil {
		sh := element.Value.(*sessionHandle)
		sh.mu.Lock()
		if sh.stack != nil {
			stackTraces = fmt.Sprintf("%s\n\nSession %d checked out of pool at %s by goroutine:\n%s", stackTraces, i, sh.checkoutTime.Format(time.RFC3339), sh.stack)
		}
		sh.mu.Unlock()
		element = element.Next()
		i++
	}
	return stackTraces
}

// shouldPrepareWriteLocked returns true if we should prepare more sessions for
// write: background preparation must not be disabled, and the number of
// write-prepared idle sessions plus in-flight prepare requests must be below
// the WriteSessions fraction of all opened sessions.
func (p *sessionPool) shouldPrepareWriteLocked() bool {
	return !p.disableBackgroundPrepareSessions && float64(p.numOpened)*p.WriteSessions > float64(p.idleWriteList.Len()+int(p.prepareReqs))
}

// createSession creates a single session through the session client and
// registers it with the health checker. The caller has already taken budget
// (numOpened and createReqs) for the session; createSession releases the
// create request, and also the numOpened budget if the creation fails.
func (p *sessionPool) createSession(ctx context.Context) (*session, error) {
	trace.TracePrintf(ctx, nil, "Creating a new session")
	// doneCreate settles the accounting for this creation attempt and wakes
	// up goroutines waiting on mayGetSession.
	doneCreate := func(done bool) {
		p.mu.Lock()
		if !done {
			// Session creation failed, give budget back.
			p.numOpened--
			recordStat(ctx, OpenSessionCount, int64(p.numOpened))
		}
		p.createReqs--
		// Notify other waiters blocking on session creation.
		close(p.mayGetSession)
		p.mayGetSession = make(chan struct{})
		p.mu.Unlock()
	}
	s, err := p.sc.createSession(ctx)
	if err != nil {
		doneCreate(false)
		// Should return error directly because of the previous retries on
		// CreateSession RPC.
		// If the error is a timeout, there is a chance that the session was
		// created on the server but is not known to the session pool. This
		// session will then be garbage collected by the server after 1 hour.
		return nil, err
	}
	s.pool = p
	p.hc.register(s)
	doneCreate(true)
	return s, nil
}

// isHealthy reports whether s can be handed out. If the session's scheduled
// healthcheck is overdue by more than two healthcheck intervals, the session
// is pinged inline: a "session not found" result destroys the session and
// yields false, any other outcome reschedules the healthcheck and yields true.
func (p *sessionPool) isHealthy(s *session) bool {
	if s.getNextCheck().Add(2 * p.hc.getInterval()).Before(time.Now()) {
		// TODO: figure out if we need to schedule a new healthcheck worker here.
		if err := s.ping(); isSessionNotFoundError(err) {
			// The session is already bad, continue to fetch/create a new one.
			s.destroy(false)
			return false
		}
		p.hc.scheduledHC(s)
	}
	return true
}

// take returns a cached session if there are available ones; if there isn't
// any, it tries to allocate a new one. Session returned by take should be used
// for read operations.
//
// take blocks while the pool has reached MaxOpened or MaxBurst and returns an
// error if ctx is done while waiting, if the pool is no longer valid, or if
// creating a new session fails.
func (p *sessionPool) take(ctx context.Context) (*sessionHandle, error) {
	trace.TracePrintf(ctx, nil, "Acquiring a read-only session")
	for {
		var (
			s   *session
			err error
		)

		p.mu.Lock()
		if !p.valid {
			p.mu.Unlock()
			return nil, errInvalidSessionPool
		}
		if p.idleList.Len() > 0 {
			// Idle sessions are available, get one from the top of the idle
			// list.
			s = p.idleList.Remove(p.idleList.Front()).(*session)
			trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
				"Acquired read-only session")
		} else if p.idleWriteList.Len() > 0 {
			// No plain idle session; fall back to a write-prepared one.
			s = p.idleWriteList.Remove(p.idleWriteList.Front()).(*session)
			trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
				"Acquired read-write session")
		}
		if s != nil {
			s.setIdleList(nil)
			numCheckedOut := p.currSessionsCheckedOutLocked()
			p.mu.Unlock()
			p.mw.updateMaxSessionsCheckedOutDuringWindow(numCheckedOut)
			// From here, session is no longer in idle list, so healthcheck
			// workers won't destroy it. If healthcheck workers failed to
			// schedule healthcheck for the session timely, do the check here.
			// Because session check is still much cheaper than session
			// creation, they should be reused as much as possible.
			if !p.isHealthy(s) {
				continue
			}
			return p.newSessionHandle(s), nil
		}

		// Idle list is empty, block if session pool has reached max session
		// creation concurrency or max number of open sessions.
		if (p.MaxOpened > 0 && p.numOpened >= p.MaxOpened) || (p.MaxBurst > 0 && p.createReqs >= p.MaxBurst) {
			// Grab the current broadcast channel before releasing the lock so
			// that a broadcast issued right after the unlock is not missed.
			mayGetSession := p.mayGetSession
			p.mu.Unlock()
			trace.TracePrintf(ctx, nil, "Waiting for read-only session to become available")
			select {
			case <-ctx.Done():
				trace.TracePrintf(ctx, nil, "Context done waiting for session")
				return nil, p.errGetSessionTimeout()
			case <-mayGetSession:
			}
			continue
		}

		// Take budget before the actual session creation.
		p.numOpened++
		// Creating a new session that will be returned directly to the client
		// means that the max number of sessions in use also increases.
		numCheckedOut := p.currSessionsCheckedOutLocked()
		recordStat(ctx, OpenSessionCount, int64(p.numOpened))
		p.createReqs++
		p.mu.Unlock()
		p.mw.updateMaxSessionsCheckedOutDuringWindow(numCheckedOut)
		if s, err = p.createSession(ctx); err != nil {
			trace.TracePrintf(ctx, nil, "Error creating session: %v", err)
			return nil, toSpannerError(err)
		}
		trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
			"Created session")
		return p.newSessionHandle(s), nil
	}
}

// takeWriteSession returns a write prepared cached session if there are
// available ones; if there isn't any, it tries to allocate a new one. Session
// returned should be used for read write transactions.
func (p *sessionPool) takeWriteSession(ctx context.Context) (*sessionHandle, error) {
	trace.TracePrintf(ctx, nil, "Acquiring a read-write session")
	for {
		var (
			s   *session
			err error
		)

		p.mu.Lock()
		if !p.valid {
			p.mu.Unlock()
			return nil, errInvalidSessionPool
		}
		if p.idleWriteList.Len() > 0 {
			// Idle sessions are available, get one from the top of the idle
			// list.
			s = p.idleWriteList.Remove(p.idleWriteList.Front()).(*session)
			trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()}, "Acquired read-write session")
		} else if p.idleList.Len() > 0 {
			// No write-prepared session is idle; take a plain one, it is
			// prepared for write further down.
			s = p.idleList.Remove(p.idleList.Front()).(*session)
			trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()}, "Acquired read-only session")
		}
		if s != nil {
			s.setIdleList(nil)
			numCheckedOut := p.currSessionsCheckedOutLocked()
			p.mu.Unlock()
			p.mw.updateMaxSessionsCheckedOutDuringWindow(numCheckedOut)
			// From here, session is no longer in idle list, so healthcheck
			// workers won't destroy it. If healthcheck workers failed to
			// schedule healthcheck for the session timely, do the check here.
			// Because session check is still much cheaper than session
			// creation, they should be reused as much as possible.
			if !p.isHealthy(s) {
				continue
			}
		} else {
			// Idle list is empty, block if session pool has reached max session
			// creation concurrency or max number of open sessions.
			if (p.MaxOpened > 0 && p.numOpened >= p.MaxOpened) || (p.MaxBurst > 0 && p.createReqs >= p.MaxBurst) {
				// Grab the current broadcast channel before releasing the
				// lock so that a broadcast issued right after the unlock is
				// not missed.
				mayGetSession := p.mayGetSession
				p.mu.Unlock()
				trace.TracePrintf(ctx, nil, "Waiting for read-write session to become available")
				select {
				case <-ctx.Done():
					trace.TracePrintf(ctx, nil, "Context done waiting for session")
					return nil, p.errGetSessionTimeout()
				case <-mayGetSession:
				}
				continue
			}

			// Take budget before the actual session creation.
			p.numOpened++
			// Creating a new session that will be returned directly to the client
			// means that the max number of sessions in use also increases.
			numCheckedOut := p.currSessionsCheckedOutLocked()
			recordStat(ctx, OpenSessionCount, int64(p.numOpened))
			p.createReqs++
			p.mu.Unlock()
			p.mw.updateMaxSessionsCheckedOutDuringWindow(numCheckedOut)
			if s, err = p.createSession(ctx); err != nil {
				trace.TracePrintf(ctx, nil, "Error creating session: %v", err)
				return nil, toSpannerError(err)
			}
			trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
				"Created session")
		}
		// Whatever its origin, the session must carry a transaction before it
		// is handed out for read/write use.
		if !s.isWritePrepared() {
			if err = s.prepareForWrite(ctx); err != nil {
				if isSessionNotFoundError(err) {
					// The session no longer exists on the backend; get rid
					// of it instead of returning it to the pool.
					s.destroy(false)
					trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
						"Session not found for write")
					return nil, toSpannerError(err)
				}

				// Any other error: hand the session back to the pool and
				// surface the error to the caller.
				s.recycle()
				trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
					"Error preparing session for write")
				return nil, toSpannerError(err)
			}
		}
		return p.newSessionHandle(s), nil
	}
}

// recycle puts session s back to the session pool's idle list, it returns true
// if the session pool successfully recycles session s.
946func (p *sessionPool) recycle(s *session) bool { 947 p.mu.Lock() 948 defer p.mu.Unlock() 949 if !s.isValid() || !p.valid { 950 // Reject the session if session is invalid or pool itself is invalid. 951 return false 952 } 953 // Put session at the top of the list to be handed out in LIFO order for load balancing 954 // across channels. 955 if s.isWritePrepared() { 956 s.setIdleList(p.idleWriteList.PushFront(s)) 957 } else { 958 s.setIdleList(p.idleList.PushFront(s)) 959 } 960 // Broadcast that a session has been returned to idle list. 961 close(p.mayGetSession) 962 p.mayGetSession = make(chan struct{}) 963 return true 964} 965 966// remove atomically removes session s from the session pool and invalidates s. 967// If isExpire == true, the removal is triggered by session expiration and in 968// such cases, only idle sessions can be removed. 969func (p *sessionPool) remove(s *session, isExpire bool) bool { 970 p.mu.Lock() 971 defer p.mu.Unlock() 972 if isExpire && (p.numOpened <= p.MinOpened || s.getIdleList() == nil) { 973 // Don't expire session if the session is not in idle list (in use), or 974 // if number of open sessions is going below p.MinOpened. 975 return false 976 } 977 ol := s.setIdleList(nil) 978 // If the session is in the idlelist, remove it. 979 if ol != nil { 980 // Remove from whichever list it is in. 981 p.idleList.Remove(ol) 982 p.idleWriteList.Remove(ol) 983 } 984 if s.invalidate() { 985 // Decrease the number of opened sessions. 986 p.numOpened-- 987 recordStat(context.Background(), OpenSessionCount, int64(p.numOpened)) 988 // Broadcast that a session has been destroyed. 989 close(p.mayGetSession) 990 p.mayGetSession = make(chan struct{}) 991 return true 992 } 993 return false 994} 995 996func (p *sessionPool) currSessionsCheckedOutLocked() uint64 { 997 return p.numOpened - uint64(p.idleList.Len()) - uint64(p.idleWriteList.Len()) 998} 999 1000// hcHeap implements heap.Interface. 
It is used to create the priority queue for 1001// session healthchecks. 1002type hcHeap struct { 1003 sessions []*session 1004} 1005 1006// Len implements heap.Interface.Len. 1007func (h hcHeap) Len() int { 1008 return len(h.sessions) 1009} 1010 1011// Less implements heap.Interface.Less. 1012func (h hcHeap) Less(i, j int) bool { 1013 return h.sessions[i].getNextCheck().Before(h.sessions[j].getNextCheck()) 1014} 1015 1016// Swap implements heap.Interface.Swap. 1017func (h hcHeap) Swap(i, j int) { 1018 h.sessions[i], h.sessions[j] = h.sessions[j], h.sessions[i] 1019 h.sessions[i].setHcIndex(i) 1020 h.sessions[j].setHcIndex(j) 1021} 1022 1023// Push implements heap.Interface.Push. 1024func (h *hcHeap) Push(s interface{}) { 1025 ns := s.(*session) 1026 ns.setHcIndex(len(h.sessions)) 1027 h.sessions = append(h.sessions, ns) 1028} 1029 1030// Pop implements heap.Interface.Pop. 1031func (h *hcHeap) Pop() interface{} { 1032 old := h.sessions 1033 n := len(old) 1034 s := old[n-1] 1035 h.sessions = old[:n-1] 1036 s.setHcIndex(-1) 1037 return s 1038} 1039 1040// maintenanceWindowSize specifies the number of health check cycles that 1041// defines a maintenance window. The maintenance window keeps track of a 1042// rolling set of numbers for the number of maximum checked out sessions during 1043// the maintenance window. This is used by the maintainer to determine the 1044// number of sessions to create or delete at the end of each health check 1045// cycle. 1046const maintenanceWindowSize = 10 1047 1048// maintenanceWindow contains the statistics that are gathered during a health 1049// check maintenance window. 1050type maintenanceWindow struct { 1051 mu sync.Mutex 1052 // maxSessionsCheckedOut contains the maximum number of sessions that was 1053 // checked out of the session pool during a health check cycle. This number 1054 // indicates the number of sessions that was actually needed by the pool to 1055 // serve the load during that cycle. 
The values are kept as a rolling set 1056 // containing the values for the past 10 cycles (minutes). The maintainer 1057 // uses these values to determine the number of sessions to keep at the end 1058 // of each cycle. 1059 maxSessionsCheckedOut [maintenanceWindowSize]uint64 1060} 1061 1062// maxSessionsCheckedOutDuringWindow returns the maximum number of sessions 1063// that has been checked out during the last maintenance window of 10 cycles 1064// (minutes). 1065func (mw *maintenanceWindow) maxSessionsCheckedOutDuringWindow() uint64 { 1066 mw.mu.Lock() 1067 defer mw.mu.Unlock() 1068 var max uint64 1069 for _, cycleMax := range mw.maxSessionsCheckedOut { 1070 max = maxUint64(max, cycleMax) 1071 } 1072 return max 1073} 1074 1075// updateMaxSessionsCheckedOutDuringWindow updates the maximum number of 1076// sessions that has been checked out of the pool during the current 1077// cycle of the maintenance window. A maintenance window consists of 10 1078// maintenance cycles. Each cycle keeps track of the max number of sessions in 1079// use during that cycle. The rolling maintenance window of 10 cycles is used 1080// to determine the number of sessions to keep at the end of a cycle by 1081// calculating the max in use during the last 10 cycles. 1082func (mw *maintenanceWindow) updateMaxSessionsCheckedOutDuringWindow(currNumSessionsCheckedOut uint64) { 1083 mw.mu.Lock() 1084 defer mw.mu.Unlock() 1085 mw.maxSessionsCheckedOut[0] = maxUint64(currNumSessionsCheckedOut, mw.maxSessionsCheckedOut[0]) 1086} 1087 1088// startNewCycle starts a new health check cycle with the specified number of 1089// checked out sessions as its initial value. 
1090func (mw *maintenanceWindow) startNewCycle(currNumSessionsCheckedOut uint64) { 1091 mw.mu.Lock() 1092 defer mw.mu.Unlock() 1093 copy(mw.maxSessionsCheckedOut[1:], mw.maxSessionsCheckedOut[:9]) 1094 mw.maxSessionsCheckedOut[0] = currNumSessionsCheckedOut 1095} 1096 1097// newMaintenanceWindow creates a new maintenance window with all values for 1098// maxSessionsCheckedOut set to maxOpened. This ensures that a complete 1099// maintenance window must pass before the maintainer will start to delete any 1100// sessions. 1101func newMaintenanceWindow(maxOpened uint64) *maintenanceWindow { 1102 mw := &maintenanceWindow{} 1103 // Initialize the rolling window with max values to prevent the maintainer 1104 // from deleting sessions before a complete window of 10 cycles has 1105 // finished. 1106 for i := 0; i < maintenanceWindowSize; i++ { 1107 mw.maxSessionsCheckedOut[i] = maxOpened 1108 } 1109 return mw 1110} 1111 1112// healthChecker performs periodical healthchecks on registered sessions. 1113type healthChecker struct { 1114 // mu protects concurrent access to healthChecker. 1115 mu sync.Mutex 1116 // queue is the priority queue for session healthchecks. Sessions with lower 1117 // nextCheck rank higher in the queue. 1118 queue hcHeap 1119 // interval is the average interval between two healthchecks on a session. 1120 interval time.Duration 1121 // workers is the number of concurrent healthcheck workers. 1122 workers int 1123 // waitWorkers waits for all healthcheck workers to exit 1124 waitWorkers sync.WaitGroup 1125 // pool is the underlying session pool. 1126 pool *sessionPool 1127 // sampleInterval is the interval of sampling by the maintainer. 1128 sampleInterval time.Duration 1129 // ready is used to signal that maintainer can start running. 1130 ready chan struct{} 1131 // done is used to signal that health checker should be closed. 1132 done chan struct{} 1133 // once is used for closing channel done only once. 
1134 once sync.Once 1135 maintainerCancel func() 1136} 1137 1138// newHealthChecker initializes new instance of healthChecker. 1139func newHealthChecker(interval time.Duration, workers int, sampleInterval time.Duration, pool *sessionPool) *healthChecker { 1140 if workers <= 0 { 1141 workers = 1 1142 } 1143 hc := &healthChecker{ 1144 interval: interval, 1145 workers: workers, 1146 pool: pool, 1147 sampleInterval: sampleInterval, 1148 ready: make(chan struct{}), 1149 done: make(chan struct{}), 1150 maintainerCancel: func() {}, 1151 } 1152 hc.waitWorkers.Add(1) 1153 go hc.maintainer() 1154 for i := 1; i <= hc.workers; i++ { 1155 hc.waitWorkers.Add(1) 1156 go hc.worker(i) 1157 } 1158 return hc 1159} 1160 1161// close closes the healthChecker and waits for all healthcheck workers to exit. 1162func (hc *healthChecker) close() { 1163 hc.mu.Lock() 1164 hc.maintainerCancel() 1165 hc.mu.Unlock() 1166 hc.once.Do(func() { close(hc.done) }) 1167 hc.waitWorkers.Wait() 1168} 1169 1170// isClosing checks if a healthChecker is already closing. 1171func (hc *healthChecker) isClosing() bool { 1172 select { 1173 case <-hc.done: 1174 return true 1175 default: 1176 return false 1177 } 1178} 1179 1180// getInterval gets the healthcheck interval. 1181func (hc *healthChecker) getInterval() time.Duration { 1182 hc.mu.Lock() 1183 defer hc.mu.Unlock() 1184 return hc.interval 1185} 1186 1187// scheduledHCLocked schedules next healthcheck on session s with the assumption 1188// that hc.mu is being held. 1189func (hc *healthChecker) scheduledHCLocked(s *session) { 1190 // The next healthcheck will be scheduled after 1191 // [interval*0.5, interval*1.5) ns. 1192 nsFromNow := rand.Int63n(int64(hc.interval)) + int64(hc.interval)/2 1193 s.setNextCheck(time.Now().Add(time.Duration(nsFromNow))) 1194 if hi := s.getHcIndex(); hi != -1 { 1195 // Session is still being tracked by healthcheck workers. 1196 heap.Fix(&hc.queue, hi) 1197 } 1198} 1199 1200// scheduledHC schedules next healthcheck on session s. 
It is safe to be called 1201// concurrently. 1202func (hc *healthChecker) scheduledHC(s *session) { 1203 hc.mu.Lock() 1204 defer hc.mu.Unlock() 1205 hc.scheduledHCLocked(s) 1206} 1207 1208// register registers a session with healthChecker for periodical healthcheck. 1209func (hc *healthChecker) register(s *session) { 1210 hc.mu.Lock() 1211 defer hc.mu.Unlock() 1212 hc.scheduledHCLocked(s) 1213 heap.Push(&hc.queue, s) 1214} 1215 1216// unregister unregisters a session from healthcheck queue. 1217func (hc *healthChecker) unregister(s *session) { 1218 hc.mu.Lock() 1219 defer hc.mu.Unlock() 1220 oi := s.setHcIndex(-1) 1221 if oi >= 0 { 1222 heap.Remove(&hc.queue, oi) 1223 } 1224} 1225 1226// markDone marks that health check for session has been performed. 1227func (hc *healthChecker) markDone(s *session) { 1228 hc.mu.Lock() 1229 defer hc.mu.Unlock() 1230 s.checkingHealth = false 1231} 1232 1233// healthCheck checks the health of the session and pings it if needed. 1234func (hc *healthChecker) healthCheck(s *session) { 1235 defer hc.markDone(s) 1236 if !s.pool.isValid() { 1237 // Session pool is closed, perform a garbage collection. 1238 s.destroy(false) 1239 return 1240 } 1241 if err := s.ping(); isSessionNotFoundError(err) { 1242 // Ping failed, destroy the session. 1243 s.destroy(false) 1244 } 1245} 1246 1247// worker performs the healthcheck on sessions in healthChecker's priority 1248// queue. 1249func (hc *healthChecker) worker(i int) { 1250 // Returns a session which we should ping to keep it alive. 1251 getNextForPing := func() *session { 1252 hc.pool.mu.Lock() 1253 defer hc.pool.mu.Unlock() 1254 hc.mu.Lock() 1255 defer hc.mu.Unlock() 1256 if hc.queue.Len() <= 0 { 1257 // Queue is empty. 1258 return nil 1259 } 1260 s := hc.queue.sessions[0] 1261 if s.getNextCheck().After(time.Now()) && hc.pool.valid { 1262 // All sessions have been checked recently. 
1263 return nil 1264 } 1265 hc.scheduledHCLocked(s) 1266 if !s.checkingHealth { 1267 s.checkingHealth = true 1268 return s 1269 } 1270 return nil 1271 } 1272 1273 // Returns a session which we should prepare for write. 1274 getNextForTx := func() *session { 1275 hc.pool.mu.Lock() 1276 defer hc.pool.mu.Unlock() 1277 if hc.pool.shouldPrepareWriteLocked() { 1278 if hc.pool.idleList.Len() > 0 && hc.pool.valid { 1279 hc.mu.Lock() 1280 defer hc.mu.Unlock() 1281 if hc.pool.idleList.Front().Value.(*session).checkingHealth { 1282 return nil 1283 } 1284 session := hc.pool.idleList.Remove(hc.pool.idleList.Front()).(*session) 1285 session.checkingHealth = true 1286 hc.pool.prepareReqs++ 1287 return session 1288 } 1289 } 1290 return nil 1291 } 1292 1293 for { 1294 if hc.isClosing() { 1295 // Exit when the pool has been closed and all sessions have been 1296 // destroyed or when health checker has been closed. 1297 hc.waitWorkers.Done() 1298 return 1299 } 1300 ws := getNextForTx() 1301 if ws != nil { 1302 ctx, cancel := context.WithTimeout(context.Background(), time.Minute) 1303 err := ws.prepareForWrite(ctx) 1304 cancel() 1305 if err != nil { 1306 // Skip handling prepare error, session can be prepared in next 1307 // cycle. 1308 // Don't log about permission errors, which may be expected 1309 // (e.g. using read-only auth). 1310 serr := toSpannerError(err).(*Error) 1311 if serr.Code != codes.PermissionDenied { 1312 logf(hc.pool.sc.logger, "Failed to prepare session, error: %v", serr) 1313 } 1314 } 1315 hc.pool.recycle(ws) 1316 hc.pool.mu.Lock() 1317 hc.pool.prepareReqs-- 1318 hc.pool.mu.Unlock() 1319 hc.markDone(ws) 1320 } 1321 rs := getNextForPing() 1322 if rs == nil { 1323 if ws == nil { 1324 // No work to be done so sleep to avoid burning CPU. 
1325 pause := int64(100 * time.Millisecond) 1326 if pause > int64(hc.interval) { 1327 pause = int64(hc.interval) 1328 } 1329 select { 1330 case <-time.After(time.Duration(rand.Int63n(pause) + pause/2)): 1331 case <-hc.done: 1332 } 1333 1334 } 1335 continue 1336 } 1337 hc.healthCheck(rs) 1338 } 1339} 1340 1341// maintainer maintains the number of sessions in the pool based on the session 1342// pool configuration and the current and historical number of sessions checked 1343// out of the pool. The maintainer will: 1344// 1. Ensure that the session pool contains at least MinOpened sessions. 1345// 2. If the current number of sessions in the pool exceeds the greatest number 1346// of checked out sessions (=sessions in use) during the last 10 minutes, 1347// and the delta is larger than MaxIdleSessions, the maintainer will reduce 1348// the number of sessions to maxSessionsInUseDuringWindow+MaxIdleSessions. 1349func (hc *healthChecker) maintainer() { 1350 // Wait until the pool is ready. 1351 <-hc.ready 1352 1353 for iteration := uint64(0); ; iteration++ { 1354 if hc.isClosing() { 1355 hc.waitWorkers.Done() 1356 return 1357 } 1358 1359 hc.pool.mu.Lock() 1360 currSessionsOpened := hc.pool.numOpened 1361 maxIdle := hc.pool.MaxIdle 1362 minOpened := hc.pool.MinOpened 1363 hc.pool.mu.Unlock() 1364 // Get the maximum number of sessions in use during the current 1365 // maintenance window. 1366 maxSessionsInUseDuringWindow := hc.pool.mw.maxSessionsCheckedOutDuringWindow() 1367 hc.mu.Lock() 1368 ctx, cancel := context.WithTimeout(context.Background(), hc.sampleInterval) 1369 hc.maintainerCancel = cancel 1370 hc.mu.Unlock() 1371 1372 // Grow or shrink pool if needed. 
1373 // The number of sessions in the pool should be in the range 1374 // [Config.MinOpened, Config.MaxIdle+maxSessionsInUseDuringWindow] 1375 if currSessionsOpened < minOpened { 1376 hc.growPool(ctx, minOpened) 1377 } else if maxIdle+maxSessionsInUseDuringWindow < currSessionsOpened { 1378 hc.shrinkPool(ctx, maxIdle+maxSessionsInUseDuringWindow) 1379 } 1380 1381 select { 1382 case <-ctx.Done(): 1383 case <-hc.done: 1384 cancel() 1385 } 1386 // Cycle the maintenance window. This will remove the oldest cycle and 1387 // add a new cycle at the beginning of the maintenance window with the 1388 // currently checked out number of sessions as the max number of 1389 // sessions in use in this cycle. This value will be increased during 1390 // the next cycle if it increases. 1391 hc.pool.mu.Lock() 1392 currSessionsInUse := hc.pool.currSessionsCheckedOutLocked() 1393 hc.pool.mu.Unlock() 1394 hc.pool.mw.startNewCycle(currSessionsInUse) 1395 } 1396} 1397 1398// growPool grows the number of sessions in the pool to the specified number of 1399// sessions. It timeouts on sampleInterval. 1400func (hc *healthChecker) growPool(ctx context.Context, growToNumSessions uint64) { 1401 // Calculate the max number of sessions to create as a safeguard against 1402 // other processes that could be deleting sessions concurrently. 1403 hc.pool.mu.Lock() 1404 maxSessionsToCreate := int(growToNumSessions - hc.pool.numOpened) 1405 hc.pool.mu.Unlock() 1406 var created int 1407 for { 1408 if ctx.Err() != nil { 1409 return 1410 } 1411 1412 p := hc.pool 1413 p.mu.Lock() 1414 // Take budget before the actual session creation. 
1415 if growToNumSessions <= p.numOpened || created >= maxSessionsToCreate { 1416 p.mu.Unlock() 1417 break 1418 } 1419 p.numOpened++ 1420 recordStat(ctx, OpenSessionCount, int64(p.numOpened)) 1421 p.createReqs++ 1422 shouldPrepareWrite := p.shouldPrepareWriteLocked() 1423 p.mu.Unlock() 1424 var ( 1425 s *session 1426 err error 1427 ) 1428 createContext, cancel := context.WithTimeout(context.Background(), time.Minute) 1429 if s, err = p.createSession(createContext); err != nil { 1430 cancel() 1431 logf(p.sc.logger, "Failed to create session, error: %v", toSpannerError(err)) 1432 continue 1433 } 1434 cancel() 1435 created++ 1436 if shouldPrepareWrite { 1437 prepareContext, cancel := context.WithTimeout(context.Background(), time.Minute) 1438 if err = s.prepareForWrite(prepareContext); err != nil { 1439 cancel() 1440 p.recycle(s) 1441 // Don't log about permission errors, which may be expected 1442 // (e.g. using read-only auth). 1443 serr := toSpannerError(err).(*Error) 1444 if serr.Code != codes.PermissionDenied { 1445 logf(p.sc.logger, "Failed to prepare session, error: %v", serr) 1446 } 1447 continue 1448 } 1449 cancel() 1450 } 1451 p.recycle(s) 1452 } 1453} 1454 1455// shrinkPool scales down the session pool. The method will stop deleting 1456// sessions when shrinkToNumSessions number of sessions in the pool has 1457// been reached. The method will also stop deleting sessions if it detects that 1458// another process has started creating sessions for the pool again, for 1459// example through the take() method. 1460func (hc *healthChecker) shrinkPool(ctx context.Context, shrinkToNumSessions uint64) { 1461 hc.pool.mu.Lock() 1462 maxSessionsToDelete := int(hc.pool.numOpened - shrinkToNumSessions) 1463 hc.pool.mu.Unlock() 1464 var deleted int 1465 var prevNumOpened uint64 = math.MaxUint64 1466 for { 1467 if ctx.Err() != nil { 1468 return 1469 } 1470 1471 p := hc.pool 1472 p.mu.Lock() 1473 // Check if the number of open sessions has increased. 
If it has, we 1474 // should stop deleting sessions, as the load has increased and 1475 // additional sessions are needed. 1476 if p.numOpened >= prevNumOpened { 1477 p.mu.Unlock() 1478 break 1479 } 1480 prevNumOpened = p.numOpened 1481 1482 // Check on both whether we have reached the number of open sessions as 1483 // well as the number of sessions to delete, in case sessions have been 1484 // deleted by other methods because they have expired or deemed 1485 // invalid. 1486 if shrinkToNumSessions >= p.numOpened || deleted >= maxSessionsToDelete { 1487 p.mu.Unlock() 1488 break 1489 } 1490 1491 var s *session 1492 if p.idleList.Len() > 0 { 1493 s = p.idleList.Front().Value.(*session) 1494 } else if p.idleWriteList.Len() > 0 { 1495 s = p.idleWriteList.Front().Value.(*session) 1496 } 1497 p.mu.Unlock() 1498 if s != nil { 1499 deleted++ 1500 // destroy session as expire. 1501 s.destroy(true) 1502 } else { 1503 break 1504 } 1505 } 1506} 1507 1508// maxUint64 returns the maximum of two uint64. 1509func maxUint64(a, b uint64) uint64 { 1510 if a > b { 1511 return a 1512 } 1513 return b 1514} 1515 1516// minUint64 returns the minimum of two uint64. 1517func minUint64(a, b uint64) uint64 { 1518 if a > b { 1519 return b 1520 } 1521 return a 1522} 1523 1524// sessionResourceType is the type name of Spanner sessions. 1525const sessionResourceType = "type.googleapis.com/google.spanner.v1.Session" 1526 1527// isSessionNotFoundError returns true if the given error is a 1528// `Session not found` error. 1529func isSessionNotFoundError(err error) bool { 1530 if err == nil { 1531 return false 1532 } 1533 if ErrCode(err) == codes.NotFound { 1534 if rt, ok := extractResourceType(err); ok { 1535 return rt == sessionResourceType 1536 } 1537 } 1538 return false 1539} 1540