/*
Copyright 2017 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package spanner

import (
	"container/heap"
	"container/list"
	"context"
	"fmt"
	"log"
	"math"
	"math/rand"
	"runtime/debug"
	"sync"
	"time"

	"cloud.google.com/go/internal/trace"
	"cloud.google.com/go/internal/version"
	vkit "cloud.google.com/go/spanner/apiv1"
	"go.opencensus.io/stats"
	"go.opencensus.io/tag"
	octrace "go.opencensus.io/trace"
	sppb "google.golang.org/genproto/googleapis/spanner/v1"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/metadata"
)

const healthCheckIntervalMins = 50

// sessionHandle is an interface for transactions to access Cloud Spanner
// sessions safely. It is generated by sessionPool.take().
type sessionHandle struct {
	// mu guarantees that the inner session object is returned / destroyed only
	// once.
	mu sync.Mutex
	// session is a pointer to a session object. Transactions never need to
	// access it directly.
	session *session
	// checkoutTime is the time the session was checked out of the pool.
	checkoutTime time.Time
	// trackedSessionHandle is the linked list node which links the session to
	// the list of tracked session handles. trackedSessionHandle is only set if
	// TrackSessionHandles has been enabled in the session pool configuration.
	trackedSessionHandle *list.Element
	// stack is the call stack of the goroutine that checked out the session
	// from the pool. This can be used to track down session leak problems.
	stack []byte
}

// recycle gives the inner session object back to its home session pool. It is
// safe to call recycle multiple times, but only the first call takes effect.
func (sh *sessionHandle) recycle() {
	sh.mu.Lock()
	if sh.session == nil {
		// sessionHandle has already been recycled.
		sh.mu.Unlock()
		return
	}
	p := sh.session.pool
	tracked := sh.trackedSessionHandle
	sh.session.recycle()
	sh.session = nil
	sh.trackedSessionHandle = nil
	sh.checkoutTime = time.Time{}
	sh.stack = nil
	sh.mu.Unlock()
	if tracked != nil {
		p.mu.Lock()
		p.trackedSessionHandles.Remove(tracked)
		p.mu.Unlock()
	}
}

// getID gets the Cloud Spanner session ID from the internal session object.
// getID returns an empty string if the sessionHandle is nil or the inner
// session object has been released by recycle / destroy.
func (sh *sessionHandle) getID() string {
	sh.mu.Lock()
	defer sh.mu.Unlock()
	if sh.session == nil {
		// sessionHandle has already been recycled/destroyed.
		return ""
	}
	return sh.session.getID()
}

// getClient gets the Cloud Spanner RPC client associated with the session ID
// in sessionHandle.
func (sh *sessionHandle) getClient() *vkit.Client {
	sh.mu.Lock()
	defer sh.mu.Unlock()
	if sh.session == nil {
		return nil
	}
	return sh.session.client
}

// getMetadata returns the metadata associated with the session in sessionHandle.
func (sh *sessionHandle) getMetadata() metadata.MD {
	sh.mu.Lock()
	defer sh.mu.Unlock()
	if sh.session == nil {
		return nil
	}
	return sh.session.md
}

// getTransactionID returns the transaction id in the session if available.
func (sh *sessionHandle) getTransactionID() transactionID {
	sh.mu.Lock()
	defer sh.mu.Unlock()
	if sh.session == nil {
		return nil
	}
	return sh.session.tx
}

// destroy destroys the inner session object. It is safe to call destroy
// multiple times, but only the first call will attempt to destroy the inner
// session object.
func (sh *sessionHandle) destroy() {
	sh.mu.Lock()
	s := sh.session
	if s == nil {
		// sessionHandle has already been recycled.
		sh.mu.Unlock()
		return
	}
	tracked := sh.trackedSessionHandle
	sh.session = nil
	sh.trackedSessionHandle = nil
	sh.checkoutTime = time.Time{}
	sh.stack = nil
	sh.mu.Unlock()

	if tracked != nil {
		p := s.pool
		p.mu.Lock()
		p.trackedSessionHandles.Remove(tracked)
		p.mu.Unlock()
	}
	s.destroy(false)
}
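// releaseHandle is a hypothetical, illustrative helper (not part of this
// file's API) that makes the recycle/destroy contract above concrete: recycle
// returns the session to the pool for reuse, while destroy evicts it, e.g.
// after a `Session not found` error. Both calls are idempotent on the handle.
func releaseHandle(sh *sessionHandle, broken bool) {
	if broken {
		// Evict the session; only the first destroy call takes effect.
		sh.destroy()
		return
	}
	// Return the session to the pool; only the first recycle call takes effect.
	sh.recycle()
}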
// session wraps a Cloud Spanner session ID through which transactions are
// created and executed.
type session struct {
	// client is the RPC channel to Cloud Spanner. It is set only once during
	// session's creation.
	client *vkit.Client
	// id is the unique id of the session in Cloud Spanner. It is set only once
	// during session's creation.
	id string
	// pool is the session's home session pool where it was created. It is set
	// only once during session's creation.
	pool *sessionPool
	// createTime is the timestamp of the session's creation. It is set only
	// once during session's creation.
	createTime time.Time
	// logger is the logger configured for the Spanner client that created the
	// session. If nil, logging will be directed to the standard logger.
	logger *log.Logger

	// mu protects the following fields from concurrent access: both
	// healthcheck workers and transactions can modify them.
	mu sync.Mutex
	// valid marks the validity of a session.
	valid bool
	// hcIndex is the index of the session inside the global healthcheck queue.
	// If hcIndex < 0, session has been unregistered from the queue.
	hcIndex int
	// idleList is the linked list node which links the session to its home
	// session pool's idle list. If idleList == nil, the session is not in the
	// idle list.
	idleList *list.Element
	// nextCheck is the timestamp of next scheduled healthcheck of the session.
	// It is maintained by the global health checker.
	nextCheck time.Time
	// checkingHealth is true if this session is currently being processed by
	// the health checker. Must be modified under the health checker lock.
	checkingHealth bool
	// md is the Metadata to be sent with each request.
	md metadata.MD
	// tx contains the transaction id if the session has been prepared for
	// write.
	tx transactionID
	// firstHCDone indicates whether the first health check is done or not.
	firstHCDone bool
}

// isValid returns true if the session is still valid for use.
func (s *session) isValid() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.valid
}

// isWritePrepared returns true if the session is prepared for write.
func (s *session) isWritePrepared() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.tx != nil
}

// String implements fmt.Stringer for session.
func (s *session) String() string {
	s.mu.Lock()
	defer s.mu.Unlock()
	return fmt.Sprintf("<id=%v, hcIdx=%v, idleList=%p, valid=%v, create=%v, nextcheck=%v>",
		s.id, s.hcIndex, s.idleList, s.valid, s.createTime, s.nextCheck)
}

// ping verifies if the session is still alive in Cloud Spanner.
func (s *session) ping() error {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// Start parent span that doesn't record.
	_, span := octrace.StartSpan(ctx, "cloud.google.com/go/spanner.ping", octrace.WithSampler(octrace.NeverSample()))
	defer span.End()

	// s.getID is safe even when s is invalid.
	_, err := s.client.ExecuteSql(contextWithOutgoingMetadata(ctx, s.md), &sppb.ExecuteSqlRequest{
		Session: s.getID(),
		Sql:     "SELECT 1",
	})
	return err
}

// setHcIndex atomically sets the session's index in the healthcheck queue and
// returns the old index.
func (s *session) setHcIndex(i int) int {
	s.mu.Lock()
	defer s.mu.Unlock()
	oi := s.hcIndex
	s.hcIndex = i
	return oi
}

// setIdleList atomically sets the session's idle list link and returns the old
// link.
func (s *session) setIdleList(le *list.Element) *list.Element {
	s.mu.Lock()
	defer s.mu.Unlock()
	old := s.idleList
	s.idleList = le
	return old
}

// invalidate marks a session as invalid and returns the old validity.
func (s *session) invalidate() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	ov := s.valid
	s.valid = false
	return ov
}

// setNextCheck sets the timestamp for the next healthcheck on the session.
func (s *session) setNextCheck(t time.Time) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.nextCheck = t
}

// setTransactionID sets the transaction id in the session.
func (s *session) setTransactionID(tx transactionID) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.tx = tx
}

// getID returns the session ID which uniquely identifies the session in Cloud
// Spanner.
func (s *session) getID() string {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.id
}

// getHcIndex returns the session's index into the global healthcheck priority
// queue.
func (s *session) getHcIndex() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.hcIndex
}

// getIdleList returns the session's link in its home session pool's idle list.
func (s *session) getIdleList() *list.Element {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.idleList
}

// getNextCheck returns the timestamp for the next healthcheck on the session.
func (s *session) getNextCheck() time.Time {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.nextCheck
}

// recycle returns the session back to its home session pool.
func (s *session) recycle() {
	s.setTransactionID(nil)
	if !s.pool.recycle(s) {
		// s is rejected by its home session pool because it expired and the
		// session pool currently has enough open sessions.
		s.destroy(false)
	}
	s.pool.decNumInUse(context.Background())
}

// destroy removes the session from its home session pool, healthcheck queue
// and Cloud Spanner service.
func (s *session) destroy(isExpire bool) bool {
	// Remove s from session pool.
	if !s.pool.remove(s, isExpire) {
		return false
	}
	// Unregister s from healthcheck queue.
	s.pool.hc.unregister(s)
	// Remove s from Cloud Spanner service.
	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
	defer cancel()
	s.delete(ctx)
	return true
}

func (s *session) delete(ctx context.Context) {
	// Ignore the error because even if we fail to explicitly destroy the
	// session, it will be eventually garbage collected by Cloud Spanner.
	err := s.client.DeleteSession(contextWithOutgoingMetadata(ctx, s.md), &sppb.DeleteSessionRequest{Name: s.getID()})
	if err != nil {
		logf(s.logger, "Failed to delete session %v. Error: %v", s.getID(), err)
	}
}

// prepareForWrite prepares the session for write if it is not already in that
// state.
func (s *session) prepareForWrite(ctx context.Context) error {
	if s.isWritePrepared() {
		return nil
	}
	tx, err := beginTransaction(contextWithOutgoingMetadata(ctx, s.md), s.getID(), s.client)
	// Session not found should cause the session to be removed from the pool.
	if isSessionNotFoundError(err) {
		s.pool.remove(s, false)
		s.pool.hc.unregister(s)
		return err
	}
	// Enable/disable background preparing of write sessions depending on
	// whether the BeginTransaction call succeeded. This will prevent the
	// session pool workers from going into an infinite loop of trying to
	// prepare sessions. Any subsequent successful BeginTransaction call from,
	// for example, takeWriteSession will re-enable the background process.
	s.pool.mu.Lock()
	s.pool.disableBackgroundPrepareSessions = err != nil
	s.pool.mu.Unlock()
	if err != nil {
		return err
	}
	s.setTransactionID(tx)
	return nil
}

// SessionPoolConfig stores configurations of a session pool.
type SessionPoolConfig struct {
	// MaxOpened is the maximum number of opened sessions allowed by the session
	// pool. If the client tries to open a session and there are already
	// MaxOpened sessions, it will block until one becomes available or the
	// context passed to the client method is canceled or times out.
	//
	// Defaults to NumChannels * 100.
	MaxOpened uint64

	// MinOpened is the minimum number of opened sessions that the session pool
	// tries to maintain. The session pool won't continue to expire sessions if
	// the number of opened sessions drops below MinOpened. However, if a session
	// is found to be broken, it will still be evicted from the session pool, so
	// it is possible that the number of opened sessions drops below MinOpened.
	//
	// Defaults to 100.
	MinOpened uint64

	// MaxIdle is the maximum number of idle sessions that the pool is allowed
	// to keep.
	//
	// Defaults to 0.
	MaxIdle uint64

	// MaxBurst is the maximum number of concurrent session creation requests.
	//
	// Defaults to 10.
	MaxBurst uint64

	// incStep is the number of sessions to create in one batch when at least
	// one more session is needed.
	//
	// Defaults to 25.
	incStep uint64

	// WriteSessions is the fraction of sessions we try to keep prepared for
	// write.
	//
	// Defaults to 0.2.
	WriteSessions float64

	// HealthCheckWorkers is the number of workers used by the health checker
	// for this pool.
	//
	// Defaults to 10.
	HealthCheckWorkers int

	// HealthCheckInterval is how often the health checker pings a session.
	//
	// Defaults to 50m (healthCheckIntervalMins * time.Minute).
	HealthCheckInterval time.Duration

	// TrackSessionHandles determines whether the session pool will keep track
	// of the stacktrace of the goroutines that take sessions from the pool.
	// This setting can be used to track down session leak problems.
	//
	// Defaults to false.
	TrackSessionHandles bool

	// healthCheckSampleInterval is how often the health checker samples live
	// sessions (for use in maintaining session pool size).
	//
	// Defaults to 1m.
	healthCheckSampleInterval time.Duration

	// sessionLabels for the sessions created in the session pool.
	sessionLabels map[string]string
}

// DefaultSessionPoolConfig is the default configuration for the session pool
// that will be used for a Spanner client, unless the user supplies a specific
// session pool config.
var DefaultSessionPoolConfig = SessionPoolConfig{
	MinOpened:           100,
	MaxOpened:           numChannels * 100,
	MaxBurst:            10,
	incStep:             25,
	WriteSessions:       0.2,
	HealthCheckWorkers:  10,
	HealthCheckInterval: healthCheckIntervalMins * time.Minute,
}

// errMinOpenedGTMaxOpened returns an error for
// SessionPoolConfig.MaxOpened < SessionPoolConfig.MinOpened when
// SessionPoolConfig.MaxOpened is set.
func errMinOpenedGTMaxOpened(maxOpened, minOpened uint64) error {
	return spannerErrorf(codes.InvalidArgument,
		"require SessionPoolConfig.MaxOpened >= SessionPoolConfig.MinOpened, got %d and %d", maxOpened, minOpened)
}

// errWriteFractionOutOfRange returns an error for
// SessionPoolConfig.WriteSessions < 0 or SessionPoolConfig.WriteSessions > 1.
func errWriteFractionOutOfRange(writeFraction float64) error {
	return spannerErrorf(codes.InvalidArgument,
		"require SessionPoolConfig.WriteSessions >= 0.0 && SessionPoolConfig.WriteSessions <= 1.0, got %.2f", writeFraction)
}

// errHealthCheckWorkersNegative returns an error for
// SessionPoolConfig.HealthCheckWorkers < 0.
func errHealthCheckWorkersNegative(workers int) error {
	return spannerErrorf(codes.InvalidArgument,
		"require SessionPoolConfig.HealthCheckWorkers >= 0, got %d", workers)
}

// errHealthCheckIntervalNegative returns an error for
// SessionPoolConfig.HealthCheckInterval < 0.
func errHealthCheckIntervalNegative(interval time.Duration) error {
	return spannerErrorf(codes.InvalidArgument,
		"require SessionPoolConfig.HealthCheckInterval >= 0, got %v", interval)
}

// validate verifies that the SessionPoolConfig is good for use.
func (spc *SessionPoolConfig) validate() error {
	if spc.MinOpened > spc.MaxOpened && spc.MaxOpened > 0 {
		return errMinOpenedGTMaxOpened(spc.MaxOpened, spc.MinOpened)
	}
	if spc.WriteSessions < 0.0 || spc.WriteSessions > 1.0 {
		return errWriteFractionOutOfRange(spc.WriteSessions)
	}
	if spc.HealthCheckWorkers < 0 {
		return errHealthCheckWorkersNegative(spc.HealthCheckWorkers)
	}
	if spc.HealthCheckInterval < 0 {
		return errHealthCheckIntervalNegative(spc.HealthCheckInterval)
	}
	return nil
}
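// newClientWithCustomPool is a hypothetical sketch (not part of this file)
// showing how a SessionPoolConfig is normally plugged in through ClientConfig
// when creating a client. The database name and pool sizes are illustrative
// assumptions, not recommendations; validate() above rejects inconsistent
// values such as MinOpened > MaxOpened.
func newClientWithCustomPool(ctx context.Context) (*Client, error) {
	cfg := SessionPoolConfig{
		MinOpened:     100, // keep at least 100 sessions open
		MaxOpened:     400, // block callers rather than open a 401st session
		MaxIdle:       50,  // let the maintainer keep up to 50 idle sessions
		WriteSessions: 0.2, // aim to keep 20% of idle sessions write-prepared
	}
	return NewClientWithConfig(ctx, "projects/p/instances/i/databases/d",
		ClientConfig{SessionPoolConfig: cfg})
}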
// sessionPool creates and caches Cloud Spanner sessions.
type sessionPool struct {
	// mu protects sessionPool from concurrent access.
	mu sync.Mutex
	// valid marks the validity of the session pool.
	valid bool
	// sc is used to create the sessions for the pool.
	sc *sessionClient
	// trackedSessionHandles contains all session handles that have been
	// checked out of the pool. The list is only filled if TrackSessionHandles
	// has been enabled.
	trackedSessionHandles list.List
	// idleList caches idle sessions. Sessions in this list can be allocated
	// for use.
	idleList list.List
	// idleWriteList caches idle sessions which have been prepared for write.
	idleWriteList list.List
	// mayGetSession is for broadcasting that session retrieval/creation may
	// proceed.
	mayGetSession chan struct{}
	// sessionCreationError is the last error that occurred during session
	// creation and is propagated to any waiters waiting for a session.
	sessionCreationError error
	// numOpened is the total number of open sessions from the session pool.
	numOpened uint64
	// createReqs is the number of ongoing session creation requests.
	createReqs uint64
	// prepareReqs is the number of ongoing session preparation requests.
	prepareReqs uint64
	// numReadWaiters is the number of processes waiting for a read session to
	// become available.
	numReadWaiters uint64
	// numWriteWaiters is the number of processes waiting for a write session
	// to become available.
	numWriteWaiters uint64
	// disableBackgroundPrepareSessions indicates that the BeginTransaction
	// call for a read/write transaction failed with a permanent error, such as
	// PermissionDenied or `Database not found`. Further background calls to
	// prepare sessions will be disabled.
	disableBackgroundPrepareSessions bool
	// configuration of the session pool.
	SessionPoolConfig
	// hc is the health checker.
	hc *healthChecker
	// rand is a separately sourced random generator.
	rand *rand.Rand
	// numInUse is the number of sessions that are currently in use (checked out
	// from the session pool).
	numInUse uint64
	// maxNumInUse is the maximum number of sessions in use concurrently in the
	// current 10 minute interval.
	maxNumInUse uint64
	// lastResetTime is the start time of the window for recording maxNumInUse.
	lastResetTime time.Time
	// numReads is the number of sessions that are idle for reads.
	numReads uint64
	// numWrites is the number of sessions that are idle for writes.
	numWrites uint64

	// mw is the maintenance window containing statistics for the max number of
	// sessions checked out of the pool during the last 10 minutes.
	mw *maintenanceWindow

	// tagMap is a map of all tags that are associated with the emitted metrics.
	tagMap *tag.Map
}

// newSessionPool creates a new session pool.
func newSessionPool(sc *sessionClient, config SessionPoolConfig) (*sessionPool, error) {
	if err := config.validate(); err != nil {
		return nil, err
	}
	pool := &sessionPool{
		sc:                sc,
		valid:             true,
		mayGetSession:     make(chan struct{}),
		SessionPoolConfig: config,
		mw:                newMaintenanceWindow(config.MaxOpened),
		rand:              rand.New(rand.NewSource(time.Now().UnixNano())),
	}
	if config.HealthCheckWorkers == 0 {
		// With 10 workers and assuming average latency of 5ms for
		// BeginTransaction, we will be able to prepare 2000 tx/sec in advance.
		// If the rate of takeWriteSession is more than that, it will degrade to
		// doing BeginTransaction inline.
		//
		// TODO: consider resizing the worker pool dynamically according to the load.
		config.HealthCheckWorkers = 10
	}
	if config.HealthCheckInterval == 0 {
		config.HealthCheckInterval = healthCheckIntervalMins * time.Minute
	}
	if config.healthCheckSampleInterval == 0 {
		config.healthCheckSampleInterval = time.Minute
	}

	_, instance, database, err := parseDatabaseName(sc.database)
	if err != nil {
		return nil, err
	}
	// Errors should not prevent initializing the session pool.
	ctx, err := tag.New(context.Background(),
		tag.Upsert(tagKeyClientID, sc.id),
		tag.Upsert(tagKeyDatabase, database),
		tag.Upsert(tagKeyInstance, instance),
		tag.Upsert(tagKeyLibVersion, version.Repo),
	)
	if err != nil {
		logf(pool.sc.logger, "Failed to create tag map, error: %v", err)
	}
	pool.tagMap = tag.FromContext(ctx)

	// On a GCE VM within the same region, a healthcheck ping takes on average
	// 10ms to finish. Given a 5 minute interval and 10 healthcheck workers, a
	// healthChecker can effectively maintain
	// 100 checks_per_worker/sec * 10 workers * 300 seconds = 300K sessions.
	pool.hc = newHealthChecker(config.HealthCheckInterval, config.HealthCheckWorkers, config.healthCheckSampleInterval, pool)

	// First initialize the pool before we indicate that the healthchecker is
	// ready. This prevents the maintainer from starting before the pool has
	// been initialized, which means that we guarantee that the initial
	// sessions are created using BatchCreateSessions.
	if config.MinOpened > 0 {
		numSessions := minUint64(config.MinOpened, math.MaxInt32)
		if err := pool.initPool(numSessions); err != nil {
			return nil, err
		}
	}
	pool.recordStat(context.Background(), MaxAllowedSessionsCount, int64(config.MaxOpened))
	close(pool.hc.ready)
	return pool, nil
}

func (p *sessionPool) recordStat(ctx context.Context, m *stats.Int64Measure, n int64, tags ...tag.Tag) {
	ctx = tag.NewContext(ctx, p.tagMap)
	mutators := make([]tag.Mutator, len(tags))
	for i, t := range tags {
		mutators[i] = tag.Upsert(t.Key, t.Value)
	}
	ctx, err := tag.New(ctx, mutators...)
	if err != nil {
		logf(p.sc.logger, "Failed to tag metrics, error: %v", err)
	}
	recordStat(ctx, m, n)
}

func (p *sessionPool) initPool(numSessions uint64) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.growPoolLocked(numSessions, true)
}

func (p *sessionPool) growPoolLocked(numSessions uint64, distributeOverChannels bool) error {
	// Take budget before the actual session creation.
	numSessions = minUint64(numSessions, math.MaxInt32)
	p.numOpened += uint64(numSessions)
	p.recordStat(context.Background(), OpenSessionCount, int64(p.numOpened))
	p.createReqs += uint64(numSessions)
	// Asynchronously create a batch of sessions for the pool.
	return p.sc.batchCreateSessions(int32(numSessions), distributeOverChannels, p)
}
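// growByIncStep is a hypothetical sketch (not part of this file's API) of how
// callers such as take() size a growth request: ask for incStep more sessions,
// capped by the remaining MaxOpened budget, and let batchCreateSessions do the
// actual work asynchronously.
func growByIncStep(p *sessionPool) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	// With MaxOpened=400, numOpened=390 and incStep=25, this requests 10.
	numSessions := minUint64(p.MaxOpened-p.numOpened, p.incStep)
	return p.growPoolLocked(numSessions, false)
}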
// sessionReady is executed by the SessionClient when a session has been
// created and is ready to use. This method will add the new session to the
// pool and decrease the number of sessions being created.
func (p *sessionPool) sessionReady(s *session) {
	p.mu.Lock()
	defer p.mu.Unlock()
	// Clear any session creation error.
	p.sessionCreationError = nil
	// Set this pool as the home pool of the session and register it with the
	// health checker.
	s.pool = p
	p.hc.register(s)
	p.createReqs--
	// Insert the session at a random position in the pool to prevent all
	// sessions affiliated with a channel from being placed sequentially in the
	// pool.
	if p.idleList.Len() > 0 {
		pos := rand.Intn(p.idleList.Len())
		before := p.idleList.Front()
		for i := 0; i < pos; i++ {
			before = before.Next()
		}
		s.setIdleList(p.idleList.InsertBefore(s, before))
	} else {
		s.setIdleList(p.idleList.PushBack(s))
	}
	p.incNumReadsLocked(context.Background())
	// Notify other waiters blocking on session creation.
	close(p.mayGetSession)
	p.mayGetSession = make(chan struct{})
}

// sessionCreationFailed is called by the SessionClient when the creation of one
// or more requested sessions finished with an error. sessionCreationFailed will
// decrease the number of sessions being created and notify any waiters that
// the session creation failed.
func (p *sessionPool) sessionCreationFailed(err error, numSessions int32) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.createReqs -= uint64(numSessions)
	p.numOpened -= uint64(numSessions)
	p.recordStat(context.Background(), OpenSessionCount, int64(p.numOpened))
	// Notify other waiters blocking on session creation.
	p.sessionCreationError = err
	close(p.mayGetSession)
	p.mayGetSession = make(chan struct{})
}

// isValid checks if the session pool is still valid.
func (p *sessionPool) isValid() bool {
	if p == nil {
		return false
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.valid
}

// close marks the session pool as closed.
func (p *sessionPool) close() {
	if p == nil {
		return
	}
	p.mu.Lock()
	if !p.valid {
		p.mu.Unlock()
		return
	}
	p.valid = false
	p.mu.Unlock()
	p.hc.close()
	// Destroy all the sessions.
	p.hc.mu.Lock()
	allSessions := make([]*session, len(p.hc.queue.sessions))
	copy(allSessions, p.hc.queue.sessions)
	p.hc.mu.Unlock()
	for _, s := range allSessions {
		s.destroy(false)
	}
}

// errInvalidSessionPool is the error for using an invalid session pool.
var errInvalidSessionPool = spannerErrorf(codes.InvalidArgument, "invalid session pool")

// errGetSessionTimeout is the error for a context timeout during
// sessionPool.take().
var errGetSessionTimeout = spannerErrorf(codes.Canceled, "timeout / context canceled during getting session")

// newSessionHandle creates a new session handle for the given session for this
// session pool. The session handle will also hold a copy of the current call
// stack if the session pool has been configured to track the call stacks of
// sessions being checked out of the pool.
func (p *sessionPool) newSessionHandle(s *session) (sh *sessionHandle) {
	sh = &sessionHandle{session: s, checkoutTime: time.Now()}
	if p.TrackSessionHandles {
		p.mu.Lock()
		sh.trackedSessionHandle = p.trackedSessionHandles.PushBack(sh)
		p.mu.Unlock()
		sh.stack = debug.Stack()
	}
	return sh
}

// errGetSessionTimeout returns the error for a context timeout or cancellation
// during sessionPool.take().
func (p *sessionPool) errGetSessionTimeout(ctx context.Context) error {
	var code codes.Code
	if ctx.Err() == context.DeadlineExceeded {
		code = codes.DeadlineExceeded
	} else {
		code = codes.Canceled
	}
	if p.TrackSessionHandles {
		return p.errGetSessionTimeoutWithTrackedSessionHandles(code)
	}
	return p.errGetBasicSessionTimeout(code)
}

// errGetBasicSessionTimeout returns the error for a context timeout during
// sessionPool.take() without any tracked sessionHandles.
func (p *sessionPool) errGetBasicSessionTimeout(code codes.Code) error {
	return spannerErrorf(code, "timeout / context canceled during getting session.\n"+
		"Enable SessionPoolConfig.TrackSessionHandles if you suspect a session leak to get more information about the checked out sessions.")
}

// errGetSessionTimeoutWithTrackedSessionHandles returns the error for a
// context timeout during sessionPool.take() including a stacktrace of each
// checked out session handle.
func (p *sessionPool) errGetSessionTimeoutWithTrackedSessionHandles(code codes.Code) error {
	err := spannerErrorf(code, "timeout / context canceled during getting session.")
	err.(*Error).additionalInformation = p.getTrackedSessionHandleStacksLocked()
	return err
}

// getTrackedSessionHandleStacksLocked returns a string containing the
// stacktrace of all currently checked out sessions of the pool. This method
// locks p.mu itself, so the caller must not hold it.
func (p *sessionPool) getTrackedSessionHandleStacksLocked() string {
	p.mu.Lock()
	defer p.mu.Unlock()
	stackTraces := ""
	i := 1
	element := p.trackedSessionHandles.Front()
	for element != nil {
		sh := element.Value.(*sessionHandle)
		sh.mu.Lock()
		if sh.stack != nil {
			stackTraces = fmt.Sprintf("%s\n\nSession %d checked out of pool at %s by goroutine:\n%s", stackTraces, i, sh.checkoutTime.Format(time.RFC3339), sh.stack)
		}
		sh.mu.Unlock()
		element = element.Next()
		i++
	}
	return stackTraces
}

// shouldPrepareWriteLocked returns true if we should prepare more sessions for write.
func (p *sessionPool) shouldPrepareWriteLocked() bool {
	return !p.disableBackgroundPrepareSessions && float64(p.numOpened)*p.WriteSessions > float64(p.idleWriteList.Len()+int(p.prepareReqs))
}
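// writeSessionDeficit is a hypothetical helper (not used by the pool) that
// makes the predicate in shouldPrepareWriteLocked concrete: it returns how
// many more prepared sessions are needed to reach the WriteSessions fraction.
// For example, with WriteSessions=0.2, 100 open sessions, 15 idle write
// sessions and 3 prepares in flight, the target is 20 and the deficit is 2.
func writeSessionDeficit(numOpened uint64, writeFraction float64, idleWrite, preparing int) int {
	target := int(float64(numOpened) * writeFraction)
	deficit := target - (idleWrite + preparing)
	if deficit < 0 {
		return 0
	}
	return deficit
}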
func (p *sessionPool) createSession(ctx context.Context) (*session, error) {
	trace.TracePrintf(ctx, nil, "Creating a new session")
	doneCreate := func(done bool) {
		p.mu.Lock()
		if !done {
			// Session creation failed, give budget back.
			p.numOpened--
			p.recordStat(ctx, OpenSessionCount, int64(p.numOpened))
		}
		p.createReqs--
		// Notify other waiters blocking on session creation.
		close(p.mayGetSession)
		p.mayGetSession = make(chan struct{})
		p.mu.Unlock()
	}
	s, err := p.sc.createSession(ctx)
	if err != nil {
		doneCreate(false)
		// Should return error directly because of the previous retries on
		// CreateSession RPC.
		// If the error is a timeout, there is a chance that the session was
		// created on the server but is not known to the session pool. This
		// session will then be garbage collected by the server after 1 hour.
		return nil, err
	}
	s.pool = p
	p.hc.register(s)
	doneCreate(true)
	return s, nil
}

func (p *sessionPool) isHealthy(s *session) bool {
	if s.getNextCheck().Add(2 * p.hc.getInterval()).Before(time.Now()) {
		if err := s.ping(); isSessionNotFoundError(err) {
			// The session is already bad, continue to fetch/create a new one.
			s.destroy(false)
			return false
		}
		p.hc.scheduledHC(s)
	}
	return true
}

// take returns a cached session if there are available ones; if there isn't
// any, it tries to allocate a new one. Sessions returned by take should be
// used for read operations.
func (p *sessionPool) take(ctx context.Context) (*sessionHandle, error) {
	trace.TracePrintf(ctx, nil, "Acquiring a read-only session")
	for {
		var s *session

		p.mu.Lock()
		if !p.valid {
			p.mu.Unlock()
			return nil, errInvalidSessionPool
		}
		if p.idleList.Len() > 0 {
			// Idle sessions are available, get one from the top of the idle
			// list.
			s = p.idleList.Remove(p.idleList.Front()).(*session)
			trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
				"Acquired read-only session")
			p.decNumReadsLocked(ctx)
		} else if p.idleWriteList.Len() > 0 {
			s = p.idleWriteList.Remove(p.idleWriteList.Front()).(*session)
			trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
				"Acquired read-write session")
			p.decNumWritesLocked(ctx)
		}
		if s != nil {
			s.setIdleList(nil)
			numCheckedOut := p.currSessionsCheckedOutLocked()
			p.mu.Unlock()
			p.mw.updateMaxSessionsCheckedOutDuringWindow(numCheckedOut)
			// From here, the session is no longer in the idle list, so
			// healthcheck workers won't destroy it. If healthcheck workers
			// failed to schedule a healthcheck for the session in time, do the
			// check here. Because checking a session is still much cheaper
			// than creating one, sessions should be reused as much as possible.
			if !p.isHealthy(s) {
				continue
			}
			p.incNumInUse(ctx)
			return p.newSessionHandle(s), nil
		}

		// No session available. Start the creation of a new batch of sessions
		// if that is allowed, and then wait for a session to become available.
		if p.numReadWaiters+p.numWriteWaiters >= p.createReqs {
			numSessions := minUint64(p.MaxOpened-p.numOpened, p.incStep)
			if err := p.growPoolLocked(numSessions, false); err != nil {
				p.mu.Unlock()
				return nil, err
			}
		}

		p.numReadWaiters++
		mayGetSession := p.mayGetSession
		p.mu.Unlock()
		trace.TracePrintf(ctx, nil, "Waiting for read-only session to become available")
		select {
		case <-ctx.Done():
			trace.TracePrintf(ctx, nil, "Context done waiting for session")
			p.recordStat(ctx, GetSessionTimeoutsCount, 1)
			p.mu.Lock()
			p.numReadWaiters--
			p.mu.Unlock()
			return nil, p.errGetSessionTimeout(ctx)
		case <-mayGetSession:
			p.mu.Lock()
			p.numReadWaiters--
			if p.sessionCreationError != nil {
				trace.TracePrintf(ctx, nil, "Error creating session: %v", p.sessionCreationError)
				err := p.sessionCreationError
				p.mu.Unlock()
				return nil, err
			}
			p.mu.Unlock()
		}
	}
}
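// checkoutAndPing is a hypothetical illustration (not used by the library) of
// the checkout lifecycle implemented by take(): obtain a handle, use the
// underlying session through the handle's accessors, and always recycle the
// handle so the session returns to the pool.
func checkoutAndPing(ctx context.Context, p *sessionPool) error {
	sh, err := p.take(ctx)
	if err != nil {
		return err
	}
	// recycle is idempotent; only the first call returns the session.
	defer sh.recycle()
	// Real transactions go through the same accessors used here.
	_, err = sh.getClient().ExecuteSql(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.ExecuteSqlRequest{
		Session: sh.getID(),
		Sql:     "SELECT 1",
	})
	return err
}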
// takeWriteSession returns a write-prepared cached session if there are
// available ones; if there isn't any, it tries to allocate a new one. Sessions
// returned should be used for read-write transactions.
func (p *sessionPool) takeWriteSession(ctx context.Context) (*sessionHandle, error) {
	trace.TracePrintf(ctx, nil, "Acquiring a read-write session")
	for {
		var (
			s   *session
			err error
		)

		p.mu.Lock()
		if !p.valid {
			p.mu.Unlock()
			return nil, errInvalidSessionPool
		}
		if p.idleWriteList.Len() > 0 {
			// Idle sessions are available, get one from the top of the idle
			// list.
			s = p.idleWriteList.Remove(p.idleWriteList.Front()).(*session)
			trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()}, "Acquired read-write session")
			p.decNumWritesLocked(ctx)
		} else if p.idleList.Len() > 0 {
			s = p.idleList.Remove(p.idleList.Front()).(*session)
			trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()}, "Acquired read-only session")
			p.decNumReadsLocked(ctx)
		}
		if s != nil {
			s.setIdleList(nil)
			numCheckedOut := p.currSessionsCheckedOutLocked()
			p.mu.Unlock()
			p.mw.updateMaxSessionsCheckedOutDuringWindow(numCheckedOut)
			// From here, the session is no longer in the idle list, so
			// healthcheck workers won't destroy it. If healthcheck workers
			// failed to schedule a healthcheck for the session in time, do the
			// check here. Because checking a session is still much cheaper
			// than creating one, sessions should be reused as much as possible.
			if !p.isHealthy(s) {
				continue
			}
		} else {
			// No session available. Start the creation of a new batch of
			// sessions if that is allowed, and then wait for a session to
			// become available.
			if p.numReadWaiters+p.numWriteWaiters >= p.createReqs {
				numSessions := minUint64(p.MaxOpened-p.numOpened, p.incStep)
				if err := p.growPoolLocked(numSessions, false); err != nil {
					p.mu.Unlock()
					return nil, err
				}
			}

			p.numWriteWaiters++
			mayGetSession := p.mayGetSession
			p.mu.Unlock()
			trace.TracePrintf(ctx, nil, "Waiting for read-write session to become available")
			select {
			case <-ctx.Done():
				trace.TracePrintf(ctx, nil, "Context done waiting for session")
				p.recordStat(ctx, GetSessionTimeoutsCount, 1)
				p.mu.Lock()
				p.numWriteWaiters--
				p.mu.Unlock()
				return nil, p.errGetSessionTimeout(ctx)
			case <-mayGetSession:
				p.mu.Lock()
				p.numWriteWaiters--
				if p.sessionCreationError != nil {
					err := p.sessionCreationError
					p.mu.Unlock()
					return nil, err
				}
				p.mu.Unlock()
			}
			continue
		}
		if !s.isWritePrepared() {
			p.incNumBeingPrepared(ctx)
			defer p.decNumBeingPrepared(ctx)
			if err = s.prepareForWrite(ctx); err != nil {
				if isSessionNotFoundError(err) {
					s.destroy(false)
					trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
						"Session not found for write")
					return nil, toSpannerError(err)
				}

				s.recycle()
				trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
					"Error preparing session for write")
				return nil, toSpannerError(err)
			}
		}
		p.incNumInUse(ctx)
		return p.newSessionHandle(s), nil
	}
}

// recycle puts session s back into the session pool's idle list. It returns
// true if the session pool successfully recycles session s.
func (p *sessionPool) recycle(s *session) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	if !s.isValid() || !p.valid {
		// Reject the session if the session is invalid or the pool itself is
		// invalid.
		return false
	}
	ctx := context.Background()
	// Put the session at the top of the list to be handed out in LIFO order
	// for load balancing across channels.
	if s.isWritePrepared() {
		s.setIdleList(p.idleWriteList.PushFront(s))
		p.incNumWritesLocked(ctx)
	} else {
		s.setIdleList(p.idleList.PushFront(s))
		p.incNumReadsLocked(ctx)
	}
	// Broadcast that a session has been returned to the idle list.
	close(p.mayGetSession)
	p.mayGetSession = make(chan struct{})
	return true
}

// remove atomically removes session s from the session pool and invalidates s.
// If isExpire == true, the removal is triggered by session expiration and in
// such cases, only idle sessions can be removed.
func (p *sessionPool) remove(s *session, isExpire bool) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	if isExpire && (p.numOpened <= p.MinOpened || s.getIdleList() == nil) {
		// Don't expire the session if the session is not in the idle list
		// (in use), or if the number of open sessions would drop below
		// p.MinOpened.
		return false
	}
	ol := s.setIdleList(nil)
	ctx := context.Background()
	// If the session is in the idle list, remove it.
	if ol != nil {
		// Remove from whichever list it is in.
		p.idleList.Remove(ol)
		p.idleWriteList.Remove(ol)
		if s.isWritePrepared() {
			p.decNumWritesLocked(ctx)
		} else {
			p.decNumReadsLocked(ctx)
		}
	}
	if s.invalidate() {
		// Decrease the number of opened sessions.
		p.numOpened--
		p.recordStat(ctx, OpenSessionCount, int64(p.numOpened))
		// Broadcast that a session has been destroyed.
		close(p.mayGetSession)
		p.mayGetSession = make(chan struct{})
		return true
	}
	return false
}

func (p *sessionPool) currSessionsCheckedOutLocked() uint64 {
	return p.numOpened - uint64(p.idleList.Len()) - uint64(p.idleWriteList.Len())
}

func (p *sessionPool) incNumInUse(ctx context.Context) {
	p.mu.Lock()
	p.incNumInUseLocked(ctx)
	p.mu.Unlock()
}

func (p *sessionPool) incNumInUseLocked(ctx context.Context) {
	p.numInUse++
	p.recordStat(ctx, SessionsCount, int64(p.numInUse), tagNumInUseSessions)
	p.recordStat(ctx, AcquiredSessionsCount, 1)
	if p.numInUse > p.maxNumInUse {
		p.maxNumInUse = p.numInUse
		p.recordStat(ctx, MaxInUseSessionsCount, int64(p.maxNumInUse))
	}
}

func (p *sessionPool) decNumInUse(ctx context.Context) {
	p.mu.Lock()
	p.decNumInUseLocked(ctx)
	p.mu.Unlock()
}

func (p *sessionPool) decNumInUseLocked(ctx context.Context) {
	p.numInUse--
	p.recordStat(ctx, SessionsCount, int64(p.numInUse), tagNumInUseSessions)
	p.recordStat(ctx, ReleasedSessionsCount, 1)
}

func (p *sessionPool) incNumReadsLocked(ctx context.Context) {
	p.numReads++
	p.recordStat(ctx, SessionsCount, int64(p.numReads), tagNumReadSessions)
}

func (p *sessionPool) decNumReadsLocked(ctx context.Context) {
	p.numReads--
	p.recordStat(ctx, SessionsCount, int64(p.numReads), tagNumReadSessions)
}

func (p *sessionPool) incNumWritesLocked(ctx context.Context) {
	p.numWrites++
	p.recordStat(ctx, SessionsCount, int64(p.numWrites), tagNumWriteSessions)
}

func (p *sessionPool) decNumWritesLocked(ctx context.Context) {
	p.numWrites--
	p.recordStat(ctx, SessionsCount, int64(p.numWrites), tagNumWriteSessions)
}
func (p *sessionPool) incNumBeingPrepared(ctx context.Context) {
	p.mu.Lock()
	p.incNumBeingPreparedLocked(ctx)
	p.mu.Unlock()
}

func (p *sessionPool) incNumBeingPreparedLocked(ctx context.Context) {
	p.prepareReqs++
	p.recordStat(ctx, SessionsCount, int64(p.prepareReqs), tagNumBeingPrepared)
}

func (p *sessionPool) decNumBeingPrepared(ctx context.Context) {
	p.mu.Lock()
	p.decNumBeingPreparedLocked(ctx)
	p.mu.Unlock()
}

func (p *sessionPool) decNumBeingPreparedLocked(ctx context.Context) {
	p.prepareReqs--
	p.recordStat(ctx, SessionsCount, int64(p.prepareReqs), tagNumBeingPrepared)
}

// hcHeap implements heap.Interface. It is used to create the priority queue for
// session healthchecks.
type hcHeap struct {
	sessions []*session
}

// Len implements heap.Interface.Len.
func (h hcHeap) Len() int {
	return len(h.sessions)
}

// Less implements heap.Interface.Less.
func (h hcHeap) Less(i, j int) bool {
	return h.sessions[i].getNextCheck().Before(h.sessions[j].getNextCheck())
}

// Swap implements heap.Interface.Swap.
func (h hcHeap) Swap(i, j int) {
	h.sessions[i], h.sessions[j] = h.sessions[j], h.sessions[i]
	h.sessions[i].setHcIndex(i)
	h.sessions[j].setHcIndex(j)
}

// Push implements heap.Interface.Push.
func (h *hcHeap) Push(s interface{}) {
	ns := s.(*session)
	ns.setHcIndex(len(h.sessions))
	h.sessions = append(h.sessions, ns)
}

// Pop implements heap.Interface.Pop.
func (h *hcHeap) Pop() interface{} {
	old := h.sessions
	n := len(old)
	s := old[n-1]
	h.sessions = old[:n-1]
	s.setHcIndex(-1)
	return s
}
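// nextSessionToCheck is a hypothetical sketch (not part of the health checker)
// showing how container/heap uses the methods above: the session with the
// earliest nextCheck sits at the root of the queue, so workers always pick the
// most overdue session first.
func nextSessionToCheck(q *hcHeap) *session {
	if q.Len() == 0 {
		return nil
	}
	// heap.Pop removes and returns the session with the smallest nextCheck,
	// calling Swap/Less/Pop above to restore the heap invariant.
	return heap.Pop(q).(*session)
}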
// maintenanceWindowSize specifies the number of health check cycles that
// defines a maintenance window. The maintenance window keeps track of a
// rolling set of numbers for the maximum number of sessions checked out during
// the maintenance window. This is used by the maintainer to determine the
// number of sessions to create or delete at the end of each health check
// cycle.
const maintenanceWindowSize = 10

// maintenanceWindow contains the statistics that are gathered during a health
// check maintenance window.
type maintenanceWindow struct {
	mu sync.Mutex
	// maxSessionsCheckedOut contains the maximum number of sessions that were
	// checked out of the session pool during a health check cycle. This number
	// indicates the number of sessions that were actually needed by the pool
	// to serve the load during that cycle. The values are kept as a rolling
	// set containing the values for the past 10 cycles (minutes). The
	// maintainer uses these values to determine the number of sessions to keep
	// at the end of each cycle.
	maxSessionsCheckedOut [maintenanceWindowSize]uint64
}

// maxSessionsCheckedOutDuringWindow returns the maximum number of sessions
// that have been checked out during the last maintenance window of 10 cycles
// (minutes).
func (mw *maintenanceWindow) maxSessionsCheckedOutDuringWindow() uint64 {
	mw.mu.Lock()
	defer mw.mu.Unlock()
	var max uint64
	for _, cycleMax := range mw.maxSessionsCheckedOut {
		max = maxUint64(max, cycleMax)
	}
	return max
}

// updateMaxSessionsCheckedOutDuringWindow updates the maximum number of
// sessions that have been checked out of the pool during the current
// cycle of the maintenance window. A maintenance window consists of 10
// maintenance cycles. Each cycle keeps track of the max number of sessions in
// use during that cycle. The rolling maintenance window of 10 cycles is used
// to determine the number of sessions to keep at the end of a cycle by
// calculating the max in use during the last 10 cycles.
func (mw *maintenanceWindow) updateMaxSessionsCheckedOutDuringWindow(currNumSessionsCheckedOut uint64) {
	mw.mu.Lock()
	defer mw.mu.Unlock()
	mw.maxSessionsCheckedOut[0] = maxUint64(currNumSessionsCheckedOut, mw.maxSessionsCheckedOut[0])
}

// startNewCycle starts a new health check cycle with the specified number of
// checked out sessions as its initial value.
func (mw *maintenanceWindow) startNewCycle(currNumSessionsCheckedOut uint64) {
	mw.mu.Lock()
	defer mw.mu.Unlock()
	copy(mw.maxSessionsCheckedOut[1:], mw.maxSessionsCheckedOut[:9])
	mw.maxSessionsCheckedOut[0] = currNumSessionsCheckedOut
}

// newMaintenanceWindow creates a new maintenance window with all values for
// maxSessionsCheckedOut set to maxOpened. This ensures that a complete
// maintenance window must pass before the maintainer will start to delete any
// sessions.
func newMaintenanceWindow(maxOpened uint64) *maintenanceWindow {
	mw := &maintenanceWindow{}
	// Initialize the rolling window with max values to prevent the maintainer
	// from deleting sessions before a complete window of 10 cycles has
	// finished.
	for i := 0; i < maintenanceWindowSize; i++ {
		mw.maxSessionsCheckedOut[i] = maxOpened
	}
	return mw
}
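// windowShrinkTarget is a hypothetical restatement (not used by the pool) of
// the maintainer's shrink rule in terms of the maintenance window: the pool is
// reduced to MaxIdle plus the maximum number of sessions checked out during
// the window. For example, with maxIdle=20 and a window max of 150, a pool of
// 300 open sessions would be shrunk to 170.
func windowShrinkTarget(mw *maintenanceWindow, maxIdle uint64) uint64 {
	return maxIdle + mw.maxSessionsCheckedOutDuringWindow()
}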
// healthChecker performs periodical healthchecks on registered sessions.
type healthChecker struct {
	// mu protects concurrent access to healthChecker.
	mu sync.Mutex
	// queue is the priority queue for session healthchecks. Sessions with lower
	// nextCheck rank higher in the queue.
	queue hcHeap
	// interval is the average interval between two healthchecks on a session.
	interval time.Duration
	// workers is the number of concurrent healthcheck workers.
	workers int
	// waitWorkers waits for all healthcheck workers to exit.
	waitWorkers sync.WaitGroup
	// pool is the underlying session pool.
	pool *sessionPool
	// sampleInterval is the interval of sampling by the maintainer.
	sampleInterval time.Duration
	// ready is used to signal that the maintainer can start running.
	ready chan struct{}
	// done is used to signal that the health checker should be closed.
	done chan struct{}
	// once is used for closing channel done only once.
	once             sync.Once
	maintainerCancel func()
}

// newHealthChecker initializes a new instance of healthChecker.
func newHealthChecker(interval time.Duration, workers int, sampleInterval time.Duration, pool *sessionPool) *healthChecker {
	if workers <= 0 {
		workers = 1
	}
	hc := &healthChecker{
		interval:         interval,
		workers:          workers,
		pool:             pool,
		sampleInterval:   sampleInterval,
		ready:            make(chan struct{}),
		done:             make(chan struct{}),
		maintainerCancel: func() {},
	}
	hc.waitWorkers.Add(1)
	go hc.maintainer()
	for i := 1; i <= hc.workers; i++ {
		hc.waitWorkers.Add(1)
		go hc.worker(i)
	}
	return hc
}

// close closes the healthChecker and waits for all healthcheck workers to exit.
func (hc *healthChecker) close() {
	hc.mu.Lock()
	hc.maintainerCancel()
	hc.mu.Unlock()
	hc.once.Do(func() { close(hc.done) })
	hc.waitWorkers.Wait()
}

// isClosing checks if a healthChecker is already closing.
func (hc *healthChecker) isClosing() bool {
	select {
	case <-hc.done:
		return true
	default:
		return false
	}
}

// getInterval gets the healthcheck interval.
func (hc *healthChecker) getInterval() time.Duration {
	hc.mu.Lock()
	defer hc.mu.Unlock()
	return hc.interval
}

// scheduledHCLocked schedules the next healthcheck on session s with the
// assumption that hc.mu is being held.
func (hc *healthChecker) scheduledHCLocked(s *session) {
	var constPart, randPart float64
	if !s.firstHCDone {
		// The first check will be scheduled in a large range to make requests
		// more evenly distributed. The first healthcheck will be scheduled
		// after [interval*0.2, interval*1.1) ns.
		constPart = float64(hc.interval) * 0.2
		randPart = hc.pool.rand.Float64() * float64(hc.interval) * 0.9
		s.firstHCDone = true
	} else {
		// The next healthcheck will be scheduled after
		// [interval*0.9, interval*1.1) ns.
		constPart = float64(hc.interval) * 0.9
		randPart = hc.pool.rand.Float64() * float64(hc.interval) * 0.2
	}
	// math.Ceil makes the value at least 1 ns.
	nsFromNow := int64(math.Ceil(constPart + randPart))
	s.setNextCheck(time.Now().Add(time.Duration(nsFromNow)))
	if hi := s.getHcIndex(); hi != -1 {
		// Session is still being tracked by healthcheck workers.
		heap.Fix(&hc.queue, hi)
	}
}

// scheduledHC schedules the next healthcheck on session s. It is safe to be
// called concurrently.
func (hc *healthChecker) scheduledHC(s *session) {
	hc.mu.Lock()
	defer hc.mu.Unlock()
	hc.scheduledHCLocked(s)
}
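// scheduledCheckBounds is a hypothetical helper (not used by the health
// checker) mirroring the arithmetic in scheduledHCLocked: with a 50m interval,
// the first check lands in the range [10m, 55m) from now, and every subsequent
// check in [45m, 55m).
func scheduledCheckBounds(interval time.Duration, firstCheck bool) (min, max time.Duration) {
	if firstCheck {
		return time.Duration(float64(interval) * 0.2), time.Duration(float64(interval) * 1.1)
	}
	return time.Duration(float64(interval) * 0.9), time.Duration(float64(interval) * 1.1)
}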
// register registers a session with healthChecker for periodical healthcheck.
func (hc *healthChecker) register(s *session) {
	hc.mu.Lock()
	defer hc.mu.Unlock()
	hc.scheduledHCLocked(s)
	heap.Push(&hc.queue, s)
}

// unregister unregisters a session from the healthcheck queue.
func (hc *healthChecker) unregister(s *session) {
	hc.mu.Lock()
	defer hc.mu.Unlock()
	oi := s.setHcIndex(-1)
	if oi >= 0 {
		heap.Remove(&hc.queue, oi)
	}
}

// markDone marks that the health check for a session has been performed.
func (hc *healthChecker) markDone(s *session) {
	hc.mu.Lock()
	defer hc.mu.Unlock()
	s.checkingHealth = false
}

// healthCheck checks the health of the session and pings it if needed.
func (hc *healthChecker) healthCheck(s *session) {
	defer hc.markDone(s)
	if !s.pool.isValid() {
		// Session pool is closed, perform a garbage collection.
		s.destroy(false)
		return
	}
	if err := s.ping(); isSessionNotFoundError(err) {
		// Ping failed, destroy the session.
		s.destroy(false)
	}
}

// worker performs the healthcheck on sessions in healthChecker's priority
// queue.
func (hc *healthChecker) worker(i int) {
	// Returns a session which we should ping to keep it alive.
	getNextForPing := func() *session {
		hc.pool.mu.Lock()
		defer hc.pool.mu.Unlock()
		hc.mu.Lock()
		defer hc.mu.Unlock()
		if hc.queue.Len() <= 0 {
			// Queue is empty.
			return nil
		}
		s := hc.queue.sessions[0]
		if s.getNextCheck().After(time.Now()) && hc.pool.valid {
			// All sessions have been checked recently.
			return nil
		}
		hc.scheduledHCLocked(s)
		if !s.checkingHealth {
			s.checkingHealth = true
			return s
		}
		return nil
	}

	// Returns a session which we should prepare for write.
	getNextForTx := func() *session {
		hc.pool.mu.Lock()
		defer hc.pool.mu.Unlock()
		if hc.pool.shouldPrepareWriteLocked() {
			if hc.pool.idleList.Len() > 0 && hc.pool.valid {
				hc.mu.Lock()
				defer hc.mu.Unlock()
				if hc.pool.idleList.Front().Value.(*session).checkingHealth {
					return nil
				}
				session := hc.pool.idleList.Remove(hc.pool.idleList.Front()).(*session)
				ctx := context.Background()
				hc.pool.decNumReadsLocked(ctx)
				session.checkingHealth = true
				hc.pool.incNumBeingPreparedLocked(ctx)
				return session
			}
		}
		return nil
	}

	for {
		if hc.isClosing() {
			// Exit when the pool has been closed and all sessions have been
			// destroyed, or when the health checker has been closed.
			hc.waitWorkers.Done()
			return
		}
		ws := getNextForTx()
		if ws != nil {
			ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
			err := ws.prepareForWrite(ctx)
			if err != nil {
				// Skip handling the prepare error; the session can be prepared
				// in the next cycle.
				// Don't log about permission errors, which may be expected
				// (e.g. using read-only auth).
				serr := toSpannerError(err).(*Error)
				if serr.Code != codes.PermissionDenied {
					logf(hc.pool.sc.logger, "Failed to prepare session, error: %v", serr)
				}
			}
			hc.pool.recycle(ws)
			hc.pool.mu.Lock()
			hc.pool.decNumBeingPreparedLocked(ctx)
			hc.pool.mu.Unlock()
			cancel()
			hc.markDone(ws)
		}
		rs := getNextForPing()
		if rs == nil {
			if ws == nil {
				// No work to be done, so sleep to avoid burning CPU.
				pause := int64(100 * time.Millisecond)
				if pause > int64(hc.interval) {
					pause = int64(hc.interval)
				}
				select {
				case <-time.After(time.Duration(rand.Int63n(pause) + pause/2)):
				case <-hc.done:
				}
			}
			continue
		}
		hc.healthCheck(rs)
	}
}

// maintainer maintains the number of sessions in the pool based on the session
// pool configuration and the current and historical number of sessions checked
// out of the pool. The maintainer will:
// 1. Ensure that the session pool contains at least MinOpened sessions.
// 2. If the current number of sessions in the pool exceeds the greatest number
// of checked out sessions (=sessions in use) during the last 10 minutes,
// and the delta is larger than MaxIdle, the maintainer will reduce the
// number of sessions to maxSessionsInUseDuringWindow+MaxIdle.
func (hc *healthChecker) maintainer() {
	// Wait until the pool is ready.
	<-hc.ready

	for iteration := uint64(0); ; iteration++ {
		if hc.isClosing() {
			hc.waitWorkers.Done()
			return
		}

		hc.pool.mu.Lock()
		currSessionsOpened := hc.pool.numOpened
		maxIdle := hc.pool.MaxIdle
		minOpened := hc.pool.MinOpened

		// Reset the start time for recording the maximum number of sessions
		// in the pool.
		now := time.Now()
		if now.After(hc.pool.lastResetTime.Add(10 * time.Minute)) {
			hc.pool.maxNumInUse = hc.pool.numInUse
			hc.pool.recordStat(context.Background(), MaxInUseSessionsCount, int64(hc.pool.maxNumInUse))
			hc.pool.lastResetTime = now
		}
		hc.pool.mu.Unlock()
		// Get the maximum number of sessions in use during the current
		// maintenance window.
		maxSessionsInUseDuringWindow := hc.pool.mw.maxSessionsCheckedOutDuringWindow()
		hc.mu.Lock()
		ctx, cancel := context.WithTimeout(context.Background(), hc.sampleInterval)
		hc.maintainerCancel = cancel
		hc.mu.Unlock()

		// Grow or shrink the pool if needed.
		// The number of sessions in the pool should be in the range
		// [Config.MinOpened, Config.MaxIdle+maxSessionsInUseDuringWindow].
		if currSessionsOpened < minOpened {
			if err := hc.growPoolInBatch(ctx, minOpened); err != nil {
				logf(hc.pool.sc.logger, "failed to grow pool: %v", err)
			}
		} else if maxIdle+maxSessionsInUseDuringWindow < currSessionsOpened {
			hc.shrinkPool(ctx, maxIdle+maxSessionsInUseDuringWindow)
		}

		select {
		case <-ctx.Done():
		case <-hc.done:
			cancel()
		}
		// Cycle the maintenance window. This will remove the oldest cycle and
		// add a new cycle at the beginning of the maintenance window with the
		// currently checked out number of sessions as the max number of
		// sessions in use in this cycle. This value will be increased during
		// the next cycle if it increases.
		hc.pool.mu.Lock()
		currSessionsInUse := hc.pool.currSessionsCheckedOutLocked()
		hc.pool.mu.Unlock()
		hc.pool.mw.startNewCycle(currSessionsInUse)
	}
}

func (hc *healthChecker) growPoolInBatch(ctx context.Context, growToNumSessions uint64) error {
	hc.pool.mu.Lock()
	defer hc.pool.mu.Unlock()
	numSessions := growToNumSessions - hc.pool.numOpened
	return hc.pool.growPoolLocked(numSessions, false)
}

// shrinkPool scales down the session pool. The method will stop deleting
// sessions when shrinkToNumSessions sessions in the pool have been reached.
// The method will also stop deleting sessions if it detects that another
// process has started creating sessions for the pool again, for example
// through the take() method.
func (hc *healthChecker) shrinkPool(ctx context.Context, shrinkToNumSessions uint64) {
	hc.pool.mu.Lock()
	maxSessionsToDelete := int(hc.pool.numOpened - shrinkToNumSessions)
	hc.pool.mu.Unlock()
	var deleted int
	var prevNumOpened uint64 = math.MaxUint64
	for {
		if ctx.Err() != nil {
			return
		}

		p := hc.pool
		p.mu.Lock()
		// Check if the number of open sessions has increased. If it has, we
		// should stop deleting sessions, as the load has increased and
		// additional sessions are needed.
		if p.numOpened >= prevNumOpened {
			p.mu.Unlock()
			break
		}
		prevNumOpened = p.numOpened

		// Check both whether we have reached the number of open sessions as
		// well as the number of sessions to delete, in case sessions have been
		// deleted by other methods because they have expired or been deemed
		// invalid.
		if shrinkToNumSessions >= p.numOpened || deleted >= maxSessionsToDelete {
			p.mu.Unlock()
			break
		}

		var s *session
		if p.idleList.Len() > 0 {
			s = p.idleList.Front().Value.(*session)
		} else if p.idleWriteList.Len() > 0 {
			s = p.idleWriteList.Front().Value.(*session)
		}
		p.mu.Unlock()
		if s != nil {
			deleted++
			// Destroy the session as expired.
			s.destroy(true)
		} else {
			break
		}
	}
}

// maxUint64 returns the maximum of two uint64.
func maxUint64(a, b uint64) uint64 {
	if a > b {
		return a
	}
	return b
}

// minUint64 returns the minimum of two uint64.
func minUint64(a, b uint64) uint64 {
	if a > b {
		return b
	}
	return a
}

// sessionResourceType is the type name of Spanner sessions.
const sessionResourceType = "type.googleapis.com/google.spanner.v1.Session"

// isSessionNotFoundError returns true if the given error is a
// `Session not found` error.
func isSessionNotFoundError(err error) bool {
	if err == nil {
		return false
	}
	if ErrCode(err) == codes.NotFound {
		if rt, ok := extractResourceType(err); ok {
			return rt == sessionResourceType
		}
	}
	return false
}
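// recycleOrEvict is a hypothetical sketch (not part of this file's API) of how
// isSessionNotFoundError is typically used inside this package: a session the
// backend no longer knows must be destroyed, while a healthy session should be
// recycled for reuse.
func recycleOrEvict(s *session, err error) {
	if isSessionNotFoundError(err) {
		// The server has forgotten the session; evict it from the pool.
		s.destroy(false)
		return
	}
	s.recycle()
}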