/*
Copyright 2017 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

	http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package spanner

import (
	"container/heap"
	"container/list"
	"context"
	"fmt"
	"log"
	"math"
	"math/rand"
	"runtime/debug"
	"sync"
	"time"

	"cloud.google.com/go/internal/trace"
	"cloud.google.com/go/internal/version"
	vkit "cloud.google.com/go/spanner/apiv1"
	"go.opencensus.io/stats"
	"go.opencensus.io/tag"
	octrace "go.opencensus.io/trace"
	sppb "google.golang.org/genproto/googleapis/spanner/v1"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/metadata"
)

const healthCheckIntervalMins = 50

// sessionHandle is an interface for transactions to access Cloud Spanner
// sessions safely. It is generated by sessionPool.take().
type sessionHandle struct {
	// mu guarantees that the inner session object is returned / destroyed only
	// once.
	mu sync.Mutex
	// session is a pointer to a session object. Transactions never need to
	// access it directly.
	session *session
	// checkoutTime is the time the session was checked out of the pool.
	checkoutTime time.Time
	// trackedSessionHandle is the linked list node which links the session to
	// the list of tracked session handles. trackedSessionHandle is only set if
	// TrackSessionHandles has been enabled in the session pool configuration.
	trackedSessionHandle *list.Element
	// stack is the call stack of the goroutine that checked out the session
	// from the pool. This can be used to track down session leak problems.
	stack []byte
}

// recycle gives the inner session object back to its home session pool. It is
// safe to call recycle multiple times, but only the first call takes effect.
func (sh *sessionHandle) recycle() {
	sh.mu.Lock()
	if sh.session == nil {
		// sessionHandle has already been recycled.
		sh.mu.Unlock()
		return
	}
	p := sh.session.pool
	tracked := sh.trackedSessionHandle
	sh.session.recycle()
	sh.session = nil
	sh.trackedSessionHandle = nil
	sh.checkoutTime = time.Time{}
	sh.stack = nil
	sh.mu.Unlock()
	if tracked != nil {
		p.mu.Lock()
		p.trackedSessionHandles.Remove(tracked)
		p.mu.Unlock()
	}
}

// getID gets the Cloud Spanner session ID from the internal session object.
// getID returns an empty string if the sessionHandle is nil or the inner
// session object has been released by recycle / destroy.
func (sh *sessionHandle) getID() string {
	sh.mu.Lock()
	defer sh.mu.Unlock()
	if sh.session == nil {
		// sessionHandle has already been recycled/destroyed.
		return ""
	}
	return sh.session.getID()
}

// getClient gets the Cloud Spanner RPC client associated with the session ID
// in sessionHandle.
func (sh *sessionHandle) getClient() *vkit.Client {
	sh.mu.Lock()
	defer sh.mu.Unlock()
	if sh.session == nil {
		return nil
	}
	return sh.session.client
}
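
// A typical checkout/return cycle, as performed by the transaction types in
// this package, looks roughly like the following sketch (illustrative only;
// error handling elided):
//
//	sh, err := pool.take(ctx)
//	if err != nil {
//		return err
//	}
//	// Issue RPCs using sh.getClient() and sh.getID()...
//	sh.recycle() // return the session to the pool; safe to call repeatedly
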
// getMetadata returns the metadata associated with the session in sessionHandle.
func (sh *sessionHandle) getMetadata() metadata.MD {
	sh.mu.Lock()
	defer sh.mu.Unlock()
	if sh.session == nil {
		return nil
	}
	return sh.session.md
}

// getTransactionID returns the transaction id in the session if available.
func (sh *sessionHandle) getTransactionID() transactionID {
	sh.mu.Lock()
	defer sh.mu.Unlock()
	if sh.session == nil {
		return nil
	}
	return sh.session.tx
}

// destroy destroys the inner session object. It is safe to call destroy
// multiple times; only the first call will attempt to destroy the inner
// session object.
func (sh *sessionHandle) destroy() {
	sh.mu.Lock()
	s := sh.session
	if s == nil {
		// sessionHandle has already been recycled.
		sh.mu.Unlock()
		return
	}
	tracked := sh.trackedSessionHandle
	sh.session = nil
	sh.trackedSessionHandle = nil
	sh.checkoutTime = time.Time{}
	sh.stack = nil
	sh.mu.Unlock()

	if tracked != nil {
		p := s.pool
		p.mu.Lock()
		p.trackedSessionHandles.Remove(tracked)
		p.mu.Unlock()
	}
	s.destroy(false)
}

// session wraps a Cloud Spanner session ID through which transactions are
// created and executed.
type session struct {
	// client is the RPC channel to Cloud Spanner. It is set only once during
	// the session's creation.
	client *vkit.Client
	// id is the unique id of the session in Cloud Spanner. It is set only once
	// during the session's creation.
	id string
	// pool is the session's home session pool where it was created. It is set
	// only once during the session's creation.
	pool *sessionPool
	// createTime is the timestamp of the session's creation. It is set only
	// once during the session's creation.
	createTime time.Time
	// logger is the logger configured for the Spanner client that created the
	// session. If nil, logging will be directed to the standard logger.
	logger *log.Logger

	// mu protects the following fields from concurrent access: both
	// healthcheck workers and transactions can modify them.
	mu sync.Mutex
	// valid marks the validity of a session.
	valid bool
	// hcIndex is the index of the session inside the global healthcheck queue.
	// If hcIndex < 0, the session has been unregistered from the queue.
	hcIndex int
	// idleList is the linked-list node which links the session to its home
	// session pool's idle list. If idleList == nil, the session is not in the
	// idle list.
	idleList *list.Element
	// nextCheck is the timestamp of the next scheduled healthcheck of the
	// session. It is maintained by the global health checker.
	nextCheck time.Time
	// checkingHealth is true if this session is currently being processed by
	// the health checker. Must be modified under the health checker lock.
	checkingHealth bool
	// md is the metadata to be sent with each request.
	md metadata.MD
	// tx contains the transaction id if the session has been prepared for
	// write.
	tx transactionID
	// firstHCDone indicates whether the first health check is done or not.
	firstHCDone bool
}

// isValid returns true if the session is still valid for use.
func (s *session) isValid() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.valid
}
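
// Every RPC issued on behalf of a session attaches s.md as outgoing gRPC
// metadata. The pattern used throughout this file is (sketch):
//
//	ctx = contextWithOutgoingMetadata(ctx, s.md)
//	s.client.ExecuteSql(ctx, req)
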
// isWritePrepared returns true if the session is prepared for write.
func (s *session) isWritePrepared() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.tx != nil
}

// String implements fmt.Stringer for session.
func (s *session) String() string {
	s.mu.Lock()
	defer s.mu.Unlock()
	return fmt.Sprintf("<id=%v, hcIdx=%v, idleList=%p, valid=%v, create=%v, nextcheck=%v>",
		s.id, s.hcIndex, s.idleList, s.valid, s.createTime, s.nextCheck)
}

// ping verifies if the session is still alive in Cloud Spanner.
func (s *session) ping() error {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// Start a parent span that doesn't record.
	_, span := octrace.StartSpan(ctx, "cloud.google.com/go/spanner.ping", octrace.WithSampler(octrace.NeverSample()))
	defer span.End()

	// s.getID is safe even when s is invalid.
	_, err := s.client.ExecuteSql(contextWithOutgoingMetadata(ctx, s.md), &sppb.ExecuteSqlRequest{
		Session: s.getID(),
		Sql:     "SELECT 1",
	})
	return err
}

// setHcIndex atomically sets the session's index in the healthcheck queue and
// returns the old index.
func (s *session) setHcIndex(i int) int {
	s.mu.Lock()
	defer s.mu.Unlock()
	oi := s.hcIndex
	s.hcIndex = i
	return oi
}

// setIdleList atomically sets the session's idle list link and returns the old
// link.
func (s *session) setIdleList(le *list.Element) *list.Element {
	s.mu.Lock()
	defer s.mu.Unlock()
	old := s.idleList
	s.idleList = le
	return old
}

// invalidate marks a session as invalid and returns the old validity.
func (s *session) invalidate() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	ov := s.valid
	s.valid = false
	return ov
}

// setNextCheck sets the timestamp for the next healthcheck on the session.
func (s *session) setNextCheck(t time.Time) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.nextCheck = t
}

// setTransactionID sets the transaction id in the session.
func (s *session) setTransactionID(tx transactionID) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.tx = tx
}

// getID returns the session ID which uniquely identifies the session in Cloud
// Spanner.
func (s *session) getID() string {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.id
}

// getHcIndex returns the session's index into the global healthcheck priority
// queue.
func (s *session) getHcIndex() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.hcIndex
}

// getIdleList returns the session's link in its home session pool's idle list.
func (s *session) getIdleList() *list.Element {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.idleList
}

// getNextCheck returns the timestamp for the next healthcheck on the session.
func (s *session) getNextCheck() time.Time {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.nextCheck
}

// recycle returns the session to its home session pool.
func (s *session) recycle() {
	s.setTransactionID(nil)
	if !s.pool.recycle(s) {
		// s is rejected by its home session pool because it expired and the
		// session pool currently has enough open sessions.
		s.destroy(false)
	}
	s.pool.decNumInUse(context.Background())
}
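
// A session moves through a small lifecycle, driven by the pool and the
// health checker (illustrative sketch):
//
//	created (sessionClient) -> idle list / idle write list
//	    -> checked out via take()/takeWriteSession()
//	    -> recycle() back to an idle list, or
//	    -> destroy() -> removed from pool, healthcheck queue and backend
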
// destroy removes the session from its home session pool, the healthcheck
// queue and the Cloud Spanner service.
func (s *session) destroy(isExpire bool) bool {
	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
	defer cancel()
	return s.destroyWithContext(ctx, isExpire)
}

func (s *session) destroyWithContext(ctx context.Context, isExpire bool) bool {
	// Remove s from session pool.
	if !s.pool.remove(s, isExpire) {
		return false
	}
	// Unregister s from healthcheck queue.
	s.pool.hc.unregister(s)
	// Remove s from Cloud Spanner service.
	s.delete(ctx)
	return true
}

func (s *session) delete(ctx context.Context) {
	// Ignore the error because even if we fail to explicitly destroy the
	// session, it will eventually be garbage collected by Cloud Spanner.
	err := s.client.DeleteSession(contextWithOutgoingMetadata(ctx, s.md), &sppb.DeleteSessionRequest{Name: s.getID()})
	// Do not log DeadlineExceeded errors when deleting sessions, as these do
	// not indicate anything the user can or should act upon.
	if err != nil && ErrCode(err) != codes.DeadlineExceeded {
		logf(s.logger, "Failed to delete session %v. Error: %v", s.getID(), err)
	}
}

// prepareForWrite prepares the session for write if it is not already in that
// state.
func (s *session) prepareForWrite(ctx context.Context) error {
	if s.isWritePrepared() {
		return nil
	}
	tx, err := beginTransaction(contextWithOutgoingMetadata(ctx, s.md), s.getID(), s.client)
	// Session not found should cause the session to be removed from the pool.
	if isSessionNotFoundError(err) {
		s.pool.remove(s, false)
		s.pool.hc.unregister(s)
		return err
	}
	// Enable/disable background preparing of write sessions depending on
	// whether the BeginTransaction call succeeded. This will prevent the
	// session pool workers from going into an infinite loop of trying to
	// prepare sessions. Any subsequent successful BeginTransaction call,
	// e.g. from takeWriteSession, will re-enable the background process.
	s.pool.mu.Lock()
	s.pool.disableBackgroundPrepareSessions = err != nil
	s.pool.mu.Unlock()
	if err != nil {
		return err
	}
	s.setTransactionID(tx)
	return nil
}

// SessionPoolConfig stores configurations of a session pool.
type SessionPoolConfig struct {
	// MaxOpened is the maximum number of opened sessions allowed by the
	// session pool. If the client tries to open a session and there are
	// already MaxOpened sessions, it will block until one becomes available or
	// the context passed to the client method is canceled or times out.
	//
	// Defaults to NumChannels * 100.
	MaxOpened uint64

	// MinOpened is the minimum number of opened sessions that the session pool
	// tries to maintain. The session pool won't continue to expire sessions if
	// the number of opened connections drops below MinOpened. However, if a
	// session is found to be broken, it will still be evicted from the session
	// pool, therefore it is possible that the number of opened sessions drops
	// below MinOpened.
	//
	// Defaults to 100.
	MinOpened uint64

	// MaxIdle is the maximum number of idle sessions that are allowed in the
	// session pool.
	//
	// Defaults to 0.
	MaxIdle uint64

	// MaxBurst is the maximum number of concurrent session creation requests.
	//
	// Deprecated: MaxBurst exists for historical compatibility and should not
	// be used.
	// MaxBurst was used to limit the number of sessions that the
	// session pool could create within a time frame. This was an early safety
	// valve to prevent a client from overwhelming the backend if a large
	// number of sessions was suddenly needed. The session pool would then
	// pause the creation of sessions for a while. Such a pause is no longer
	// needed and the implementation has been removed from the pool.
	//
	// Defaults to 10.
	MaxBurst uint64

	// incStep is the number of sessions to create in one batch when at least
	// one more session is needed.
	//
	// Defaults to 25.
	incStep uint64

	// WriteSessions is the fraction of sessions we try to keep prepared for
	// write.
	//
	// Defaults to 0.2.
	WriteSessions float64

	// HealthCheckWorkers is the number of workers used by the health checker
	// for this pool.
	//
	// Defaults to 10.
	HealthCheckWorkers int

	// HealthCheckInterval is how often the health checker pings a session.
	//
	// Defaults to 50m.
	HealthCheckInterval time.Duration

	// TrackSessionHandles determines whether the session pool will keep track
	// of the stacktrace of the goroutines that take sessions from the pool.
	// This setting can be used to track down session leak problems.
	//
	// Defaults to false.
	TrackSessionHandles bool

	// healthCheckSampleInterval is how often the health checker samples live
	// sessions (for use in maintaining the session pool size).
	//
	// Defaults to 1m.
	healthCheckSampleInterval time.Duration

	// sessionLabels for the sessions created in the session pool.
	sessionLabels map[string]string
}

// DefaultSessionPoolConfig is the default configuration for the session pool
// that will be used for a Spanner client, unless the user supplies a specific
// session pool config.
var DefaultSessionPoolConfig = SessionPoolConfig{
	MinOpened:           100,
	MaxOpened:           numChannels * 100,
	MaxBurst:            10,
	incStep:             25,
	WriteSessions:       0.2,
	HealthCheckWorkers:  10,
	HealthCheckInterval: healthCheckIntervalMins * time.Minute,
}
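
// As an illustration, a caller that wants a smaller pool with session leak
// tracking enabled could start from the defaults and override individual
// fields (the values below are made up):
//
//	cfg := DefaultSessionPoolConfig
//	cfg.MinOpened = 10
//	cfg.MaxOpened = 400
//	cfg.TrackSessionHandles = true
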
// errMinOpenedGTMaxOpened returns an error for
// SessionPoolConfig.MaxOpened < SessionPoolConfig.MinOpened when
// SessionPoolConfig.MaxOpened is set.
func errMinOpenedGTMaxOpened(maxOpened, minOpened uint64) error {
	return spannerErrorf(codes.InvalidArgument,
		"require SessionPoolConfig.MaxOpened >= SessionPoolConfig.MinOpened, got %d and %d", maxOpened, minOpened)
}

// errWriteFractionOutOfRange returns an error for
// SessionPoolConfig.WriteSessions < 0 or SessionPoolConfig.WriteSessions > 1.
func errWriteFractionOutOfRange(writeFraction float64) error {
	return spannerErrorf(codes.InvalidArgument,
		"require SessionPoolConfig.WriteSessions >= 0.0 && SessionPoolConfig.WriteSessions <= 1.0, got %.2f", writeFraction)
}

// errHealthCheckWorkersNegative returns an error for
// SessionPoolConfig.HealthCheckWorkers < 0.
func errHealthCheckWorkersNegative(workers int) error {
	return spannerErrorf(codes.InvalidArgument,
		"require SessionPoolConfig.HealthCheckWorkers >= 0, got %d", workers)
}

// errHealthCheckIntervalNegative returns an error for
// SessionPoolConfig.HealthCheckInterval < 0.
func errHealthCheckIntervalNegative(interval time.Duration) error {
	return spannerErrorf(codes.InvalidArgument,
		"require SessionPoolConfig.HealthCheckInterval >= 0, got %v", interval)
}

// validate verifies that the SessionPoolConfig is good for use.
func (spc *SessionPoolConfig) validate() error {
	if spc.MinOpened > spc.MaxOpened && spc.MaxOpened > 0 {
		return errMinOpenedGTMaxOpened(spc.MaxOpened, spc.MinOpened)
	}
	if spc.WriteSessions < 0.0 || spc.WriteSessions > 1.0 {
		return errWriteFractionOutOfRange(spc.WriteSessions)
	}
	if spc.HealthCheckWorkers < 0 {
		return errHealthCheckWorkersNegative(spc.HealthCheckWorkers)
	}
	if spc.HealthCheckInterval < 0 {
		return errHealthCheckIntervalNegative(spc.HealthCheckInterval)
	}
	return nil
}

// sessionPool creates and caches Cloud Spanner sessions.
type sessionPool struct {
	// mu protects sessionPool from concurrent access.
	mu sync.Mutex
	// valid marks the validity of the session pool.
	valid bool
	// sc is used to create the sessions for the pool.
	sc *sessionClient
	// trackedSessionHandles contains all session handles that have been
	// checked out of the pool. The list is only filled if TrackSessionHandles
	// has been enabled.
	trackedSessionHandles list.List
	// idleList caches idle session IDs. Session IDs in this list can be
	// allocated for use.
	idleList list.List
	// idleWriteList caches idle sessions which have been prepared for write.
	idleWriteList list.List
	// mayGetSession is for broadcasting that session retrieval/creation may
	// proceed.
	mayGetSession chan struct{}
	// sessionCreationError is the last error that occurred during session
	// creation and is propagated to any waiters waiting for a session.
	sessionCreationError error
	// numOpened is the total number of open sessions from the session pool.
	numOpened uint64
	// createReqs is the number of ongoing session creation requests.
	createReqs uint64
	// prepareReqs is the number of ongoing session preparation requests.
	prepareReqs uint64
	// numReadWaiters is the number of processes waiting for a read session to
	// become available.
	numReadWaiters uint64
	// numWriteWaiters is the number of processes waiting for a write session
	// to become available.
	numWriteWaiters uint64
	// disableBackgroundPrepareSessions indicates that the BeginTransaction
	// call for a read/write transaction failed with a permanent error, such as
	// PermissionDenied or `Database not found`. Further background calls to
	// prepare sessions will be disabled.
	disableBackgroundPrepareSessions bool
	// configuration of the session pool.
	SessionPoolConfig
	// hc is the health checker.
	hc *healthChecker
	// rand is a separately sourced random generator.
	rand *rand.Rand
	// numInUse is the number of sessions that are currently in use (checked
	// out from the session pool).
	numInUse uint64
	// maxNumInUse is the maximum number of sessions in use concurrently in the
	// current 10 minute interval.
	maxNumInUse uint64
	// lastResetTime is the start time of the window for recording maxNumInUse.
	lastResetTime time.Time
	// numReads is the number of sessions that are idle for reads.
	numReads uint64
	// numWrites is the number of sessions that are idle for writes.
	numWrites uint64

	// mw is the maintenance window containing statistics for the max number of
	// sessions checked out of the pool during the last 10 minutes.
	mw *maintenanceWindow

	// tagMap is a map of all tags that are associated with the emitted metrics.
	tagMap *tag.Map
}

// newSessionPool creates a new session pool.
func newSessionPool(sc *sessionClient, config SessionPoolConfig) (*sessionPool, error) {
	if err := config.validate(); err != nil {
		return nil, err
	}
	pool := &sessionPool{
		sc:                sc,
		valid:             true,
		mayGetSession:     make(chan struct{}),
		SessionPoolConfig: config,
		mw:                newMaintenanceWindow(config.MaxOpened),
		rand:              rand.New(rand.NewSource(time.Now().UnixNano())),
	}
	if config.HealthCheckWorkers == 0 {
		// With 10 workers and assuming an average latency of 5ms for
		// BeginTransaction, we will be able to prepare 2000 tx/sec in advance.
		// If the rate of takeWriteSession is more than that, it will degrade to
		// doing BeginTransaction inline.
		//
		// TODO: consider resizing the worker pool dynamically according to the load.
		config.HealthCheckWorkers = 10
	}
	if config.HealthCheckInterval == 0 {
		config.HealthCheckInterval = healthCheckIntervalMins * time.Minute
	}
	if config.healthCheckSampleInterval == 0 {
		config.healthCheckSampleInterval = time.Minute
	}

	_, instance, database, err := parseDatabaseName(sc.database)
	if err != nil {
		return nil, err
	}
	// Errors should not prevent initializing the session pool.
	ctx, err := tag.New(context.Background(),
		tag.Upsert(tagKeyClientID, sc.id),
		tag.Upsert(tagKeyDatabase, database),
		tag.Upsert(tagKeyInstance, instance),
		tag.Upsert(tagKeyLibVersion, version.Repo),
	)
	if err != nil {
		logf(pool.sc.logger, "Failed to create tag map, error: %v", err)
	}
	pool.tagMap = tag.FromContext(ctx)

	// On a GCE VM within the same region, a healthcheck ping takes on average
	// 10ms to finish. Given a 5-minute interval and 10 healthcheck workers, a
	// healthChecker can effectively maintain
	// 100 checks_per_worker/sec * 10 workers * 300 seconds = 300K sessions.
	pool.hc = newHealthChecker(config.HealthCheckInterval, config.HealthCheckWorkers, config.healthCheckSampleInterval, pool)

	// First initialize the pool before we indicate that the healthchecker is
	// ready. This prevents the maintainer from starting before the pool has
	// been initialized, which means that we guarantee that the initial
	// sessions are created using BatchCreateSessions.
	if config.MinOpened > 0 {
		numSessions := minUint64(config.MinOpened, math.MaxInt32)
		if err := pool.initPool(numSessions); err != nil {
			return nil, err
		}
	}
	pool.recordStat(context.Background(), MaxAllowedSessionsCount, int64(config.MaxOpened))
	close(pool.hc.ready)
	return pool, nil
}

func (p *sessionPool) recordStat(ctx context.Context, m *stats.Int64Measure, n int64, tags ...tag.Tag) {
	ctx = tag.NewContext(ctx, p.tagMap)
	mutators := make([]tag.Mutator, len(tags))
	for i, t := range tags {
		mutators[i] = tag.Upsert(t.Key, t.Value)
	}
	ctx, err := tag.New(ctx, mutators...)
	if err != nil {
		logf(p.sc.logger, "Failed to tag metrics, error: %v", err)
	}
	recordStat(ctx, m, n)
}

func (p *sessionPool) initPool(numSessions uint64) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.growPoolLocked(numSessions, true)
}

func (p *sessionPool) growPoolLocked(numSessions uint64, distributeOverChannels bool) error {
	// Take budget before the actual session creation.
	numSessions = minUint64(numSessions, math.MaxInt32)
	p.numOpened += uint64(numSessions)
	p.recordStat(context.Background(), OpenSessionCount, int64(p.numOpened))
	p.createReqs += uint64(numSessions)
	// Asynchronously create a batch of sessions for the pool.
	return p.sc.batchCreateSessions(int32(numSessions), distributeOverChannels, p)
}

// sessionReady is executed by the SessionClient when a session has been
// created and is ready to use. This method will add the new session to the
// pool and decrease the number of sessions that are being created.
func (p *sessionPool) sessionReady(s *session) {
	p.mu.Lock()
	defer p.mu.Unlock()
	// Clear any session creation error.
	p.sessionCreationError = nil
	// Set this pool as the home pool of the session and register it with the
	// health checker.
	s.pool = p
	p.hc.register(s)
	p.createReqs--
	// Insert the session at a random position in the pool to prevent all
	// sessions affiliated with a channel from being placed sequentially in
	// the pool.
	if p.idleList.Len() > 0 {
		pos := rand.Intn(p.idleList.Len())
		before := p.idleList.Front()
		for i := 0; i < pos; i++ {
			before = before.Next()
		}
		s.setIdleList(p.idleList.InsertBefore(s, before))
	} else {
		s.setIdleList(p.idleList.PushBack(s))
	}
	p.incNumReadsLocked(context.Background())
	// Notify other waiters blocking on session creation.
	close(p.mayGetSession)
	p.mayGetSession = make(chan struct{})
}

// sessionCreationFailed is called by the SessionClient when the creation of one
// or more requested sessions finished with an error. sessionCreationFailed will
// decrease the number of sessions being created and notify any waiters that
// the session creation failed.
func (p *sessionPool) sessionCreationFailed(err error, numSessions int32) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.createReqs -= uint64(numSessions)
	p.numOpened -= uint64(numSessions)
	p.recordStat(context.Background(), OpenSessionCount, int64(p.numOpened))
	// Notify other waiters blocking on session creation.
	p.sessionCreationError = err
	close(p.mayGetSession)
	p.mayGetSession = make(chan struct{})
}

// isValid checks if the session pool is still valid.
func (p *sessionPool) isValid() bool {
	if p == nil {
		return false
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.valid
}

// close marks the session pool as closed and deletes all sessions in parallel.
// Any errors that are returned by the Delete RPC are logged but otherwise
// ignored, except for DeadlineExceeded errors, which are ignored and not
// logged.
func (p *sessionPool) close(ctx context.Context) {
	if p == nil {
		return
	}
	p.mu.Lock()
	if !p.valid {
		p.mu.Unlock()
		return
	}
	p.valid = false
	p.mu.Unlock()
	p.hc.close()
	// Destroy all the sessions.
	p.hc.mu.Lock()
	allSessions := make([]*session, len(p.hc.queue.sessions))
	copy(allSessions, p.hc.queue.sessions)
	p.hc.mu.Unlock()
	wg := sync.WaitGroup{}
	for _, s := range allSessions {
		wg.Add(1)
		go deleteSession(ctx, s, &wg)
	}
	wg.Wait()
}

func deleteSession(ctx context.Context, s *session, wg *sync.WaitGroup) {
	defer wg.Done()
	s.destroyWithContext(ctx, false)
}

// errInvalidSessionPool is the error for using an invalid session pool.
var errInvalidSessionPool = spannerErrorf(codes.InvalidArgument, "invalid session pool")

// errGetSessionTimeout is the error for a context timeout during
// sessionPool.take().
var errGetSessionTimeout = spannerErrorf(codes.Canceled, "timeout / context canceled during getting session")

// newSessionHandle creates a new session handle for the given session for this
// session pool. The session handle will also hold a copy of the current call
// stack if the session pool has been configured to track the call stacks of
// sessions being checked out of the pool.
func (p *sessionPool) newSessionHandle(s *session) (sh *sessionHandle) {
	sh = &sessionHandle{session: s, checkoutTime: time.Now()}
	if p.TrackSessionHandles {
		p.mu.Lock()
		sh.trackedSessionHandle = p.trackedSessionHandles.PushBack(sh)
		p.mu.Unlock()
		sh.stack = debug.Stack()
	}
	return sh
}

// errGetSessionTimeout returns the error for a context timeout during
// sessionPool.take().
func (p *sessionPool) errGetSessionTimeout(ctx context.Context) error {
	var code codes.Code
	if ctx.Err() == context.DeadlineExceeded {
		code = codes.DeadlineExceeded
	} else {
		code = codes.Canceled
	}
	if p.TrackSessionHandles {
		return p.errGetSessionTimeoutWithTrackedSessionHandles(code)
	}
	return p.errGetBasicSessionTimeout(code)
}

// errGetBasicSessionTimeout returns the error for a context timeout during
// sessionPool.take() without any tracked sessionHandles.
func (p *sessionPool) errGetBasicSessionTimeout(code codes.Code) error {
	return spannerErrorf(code, "timeout / context canceled during getting session.\n"+
		"Enable SessionPoolConfig.TrackSessionHandles if you suspect a session leak to get more information about the checked out sessions.")
}
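
// When TrackSessionHandles is enabled, the timeout error is annotated with
// one entry per checked out session, formatted by
// getTrackedSessionHandleStacksLocked below, schematically:
//
//	Session 1 checked out of pool at 2006-01-02T15:04:05Z by goroutine:
//	<stack trace of the goroutine that called take()>
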
// errGetSessionTimeoutWithTrackedSessionHandles returns the error for a
// context timeout during sessionPool.take(), including a stacktrace of each
// checked out session handle.
func (p *sessionPool) errGetSessionTimeoutWithTrackedSessionHandles(code codes.Code) error {
	err := spannerErrorf(code, "timeout / context canceled during getting session.")
	err.(*Error).additionalInformation = p.getTrackedSessionHandleStacksLocked()
	return err
}

// getTrackedSessionHandleStacksLocked returns a string containing the
// stacktrace of all currently checked out sessions of the pool. Despite its
// name, this method acquires p.mu itself; the caller must not hold p.mu.
func (p *sessionPool) getTrackedSessionHandleStacksLocked() string {
	p.mu.Lock()
	defer p.mu.Unlock()
	stackTraces := ""
	i := 1
	element := p.trackedSessionHandles.Front()
	for element != nil {
		sh := element.Value.(*sessionHandle)
		sh.mu.Lock()
		if sh.stack != nil {
			stackTraces = fmt.Sprintf("%s\n\nSession %d checked out of pool at %s by goroutine:\n%s", stackTraces, i, sh.checkoutTime.Format(time.RFC3339), sh.stack)
		}
		sh.mu.Unlock()
		element = element.Next()
		i++
	}
	return stackTraces
}

// shouldPrepareWriteLocked returns true if we should prepare more sessions for write.
func (p *sessionPool) shouldPrepareWriteLocked() bool {
	return !p.disableBackgroundPrepareSessions && float64(p.numOpened)*p.WriteSessions > float64(p.idleWriteList.Len()+int(p.prepareReqs))
}

func (p *sessionPool) createSession(ctx context.Context) (*session, error) {
	trace.TracePrintf(ctx, nil, "Creating a new session")
	doneCreate := func(done bool) {
		p.mu.Lock()
		if !done {
			// Session creation failed, give the budget back.
			p.numOpened--
			p.recordStat(ctx, OpenSessionCount, int64(p.numOpened))
		}
		p.createReqs--
		// Notify other waiters blocking on session creation.
		close(p.mayGetSession)
		p.mayGetSession = make(chan struct{})
		p.mu.Unlock()
	}
	s, err := p.sc.createSession(ctx)
	if err != nil {
		doneCreate(false)
		// Should return the error directly because of the previous retries on
		// the CreateSession RPC.
		// If the error is a timeout, there is a chance that the session was
		// created on the server but is not known to the session pool. This
		// session will then be garbage collected by the server after 1 hour.
		return nil, err
	}
	s.pool = p
	p.hc.register(s)
	doneCreate(true)
	return s, nil
}

func (p *sessionPool) isHealthy(s *session) bool {
	if s.getNextCheck().Add(2 * p.hc.getInterval()).Before(time.Now()) {
		if err := s.ping(); isSessionNotFoundError(err) {
			// The session is already bad, continue to fetch/create a new one.
			s.destroy(false)
			return false
		}
		p.hc.scheduledHC(s)
	}
	return true
}

// take returns a cached session if one is available; if there isn't any, it
// tries to allocate a new one. Sessions returned by take should be used for
// read operations.
func (p *sessionPool) take(ctx context.Context) (*sessionHandle, error) {
	trace.TracePrintf(ctx, nil, "Acquiring a read-only session")
	for {
		var s *session

		p.mu.Lock()
		if !p.valid {
			p.mu.Unlock()
			return nil, errInvalidSessionPool
		}
		if p.idleList.Len() > 0 {
			// Idle sessions are available, get one from the top of the idle
			// list.
			s = p.idleList.Remove(p.idleList.Front()).(*session)
			trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
				"Acquired read-only session")
			p.decNumReadsLocked(ctx)
		} else if p.idleWriteList.Len() > 0 {
			s = p.idleWriteList.Remove(p.idleWriteList.Front()).(*session)
			trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
				"Acquired read-write session")
			p.decNumWritesLocked(ctx)
		}
		if s != nil {
			s.setIdleList(nil)
			numCheckedOut := p.currSessionsCheckedOutLocked()
			p.mu.Unlock()
			p.mw.updateMaxSessionsCheckedOutDuringWindow(numCheckedOut)
			// From here, the session is no longer in the idle list, so the
			// healthcheck workers won't destroy it. If the healthcheck workers
			// failed to schedule a timely healthcheck for the session, do the
			// check here. Because a session check is still much cheaper than
			// session creation, sessions should be reused as much as possible.
			if !p.isHealthy(s) {
				continue
			}
			p.incNumInUse(ctx)
			return p.newSessionHandle(s), nil
		}

		// No session available. Start the creation of a new batch of sessions
		// if that is allowed, and then wait for a session to become available.
		if p.numReadWaiters+p.numWriteWaiters >= p.createReqs {
			numSessions := minUint64(p.MaxOpened-p.numOpened, p.incStep)
			if err := p.growPoolLocked(numSessions, false); err != nil {
				p.mu.Unlock()
				return nil, err
			}
		}

		p.numReadWaiters++
		mayGetSession := p.mayGetSession
		p.mu.Unlock()
		trace.TracePrintf(ctx, nil, "Waiting for read-only session to become available")
		select {
		case <-ctx.Done():
			trace.TracePrintf(ctx, nil, "Context done waiting for session")
			p.recordStat(ctx, GetSessionTimeoutsCount, 1)
			p.mu.Lock()
			p.numReadWaiters--
			p.mu.Unlock()
			return nil, p.errGetSessionTimeout(ctx)
		case <-mayGetSession:
			p.mu.Lock()
			p.numReadWaiters--
			if p.sessionCreationError != nil {
				trace.TracePrintf(ctx, nil, "Error creating session: %v", p.sessionCreationError)
				err := p.sessionCreationError
				p.mu.Unlock()
				return nil, err
			}
			p.mu.Unlock()
		}
	}
}

// takeWriteSession returns a write-prepared cached session if one is
// available; if there isn't any, it tries to allocate a new one. Sessions
// returned should be used for read-write transactions.
func (p *sessionPool) takeWriteSession(ctx context.Context) (*sessionHandle, error) {
	trace.TracePrintf(ctx, nil, "Acquiring a read-write session")
	for {
		var (
			s   *session
			err error
		)

		p.mu.Lock()
		if !p.valid {
			p.mu.Unlock()
			return nil, errInvalidSessionPool
		}
		if p.idleWriteList.Len() > 0 {
			// Idle sessions are available, get one from the top of the idle
			// list.
			s = p.idleWriteList.Remove(p.idleWriteList.Front()).(*session)
			trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()}, "Acquired read-write session")
			p.decNumWritesLocked(ctx)
		} else if p.idleList.Len() > 0 {
			s = p.idleList.Remove(p.idleList.Front()).(*session)
			trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()}, "Acquired read-only session")
			p.decNumReadsLocked(ctx)
		}
		if s != nil {
			s.setIdleList(nil)
			numCheckedOut := p.currSessionsCheckedOutLocked()
			p.mu.Unlock()
			p.mw.updateMaxSessionsCheckedOutDuringWindow(numCheckedOut)
			// From here, the session is no longer in the idle list, so the
			// healthcheck workers won't destroy it. If the healthcheck workers
			// failed to schedule a timely healthcheck for the session, do the
			// check here. Because a session check is still much cheaper than
			// session creation, sessions should be reused as much as possible.
			if !p.isHealthy(s) {
				continue
			}
		} else {
			// No session available. Start the creation of a new batch of
			// sessions if that is allowed, and then wait for a session to
			// become available.
			if p.numReadWaiters+p.numWriteWaiters >= p.createReqs {
				numSessions := minUint64(p.MaxOpened-p.numOpened, p.incStep)
				if err := p.growPoolLocked(numSessions, false); err != nil {
					p.mu.Unlock()
					return nil, err
				}
			}

			p.numWriteWaiters++
			mayGetSession := p.mayGetSession
			p.mu.Unlock()
			trace.TracePrintf(ctx, nil, "Waiting for read-write session to become available")
			select {
			case <-ctx.Done():
				trace.TracePrintf(ctx, nil, "Context done waiting for session")
				p.recordStat(ctx, GetSessionTimeoutsCount, 1)
				p.mu.Lock()
				p.numWriteWaiters--
				p.mu.Unlock()
				return nil, p.errGetSessionTimeout(ctx)
			case <-mayGetSession:
				p.mu.Lock()
				p.numWriteWaiters--
				if p.sessionCreationError != nil {
					err := p.sessionCreationError
					p.mu.Unlock()
					return nil, err
				}
				p.mu.Unlock()
			}
			continue
		}
		if !s.isWritePrepared() {
			p.incNumBeingPrepared(ctx)
			defer p.decNumBeingPrepared(ctx)
			if err = s.prepareForWrite(ctx); err != nil {
				if isSessionNotFoundError(err) {
					s.destroy(false)
					trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
						"Session not found for write")
					return nil, ToSpannerError(err)
				}

				s.recycle()
				trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
					"Error preparing session for write")
				return nil, ToSpannerError(err)
			}
		}
		p.incNumInUse(ctx)
		return p.newSessionHandle(s), nil
	}
}

// recycle puts session s back into the session pool's idle list. It returns
// true if the session pool successfully recycles session s.
func (p *sessionPool) recycle(s *session) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	if !s.isValid() || !p.valid {
		// Reject the session if the session is invalid or the pool itself is
		// invalid.
		return false
	}
	ctx := context.Background()
	// Put the session at the top of the list to be handed out in LIFO order
	// for load balancing across channels.
	if s.isWritePrepared() {
		s.setIdleList(p.idleWriteList.PushFront(s))
		p.incNumWritesLocked(ctx)
	} else {
		s.setIdleList(p.idleList.PushFront(s))
		p.incNumReadsLocked(ctx)
	}
	// Broadcast that a session has been returned to the idle list.
	close(p.mayGetSession)
	p.mayGetSession = make(chan struct{})
	return true
}

// remove atomically removes session s from the session pool and invalidates s.
// If isExpire == true, the removal is triggered by session expiration and in
// such cases, only idle sessions can be removed.
func (p *sessionPool) remove(s *session, isExpire bool) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	if isExpire && (p.numOpened <= p.MinOpened || s.getIdleList() == nil) {
		// Don't expire the session if the session is not in the idle list
		// (in use), or if the number of open sessions would drop below
		// p.MinOpened.
		return false
	}
	ol := s.setIdleList(nil)
	ctx := context.Background()
	// If the session is in the idle list, remove it.
	if ol != nil {
		// Remove from whichever list it is in.
		p.idleList.Remove(ol)
		p.idleWriteList.Remove(ol)
		if s.isWritePrepared() {
			p.decNumWritesLocked(ctx)
		} else {
			p.decNumReadsLocked(ctx)
		}
	}
	if s.invalidate() {
		// Decrease the number of opened sessions.
		p.numOpened--
		p.recordStat(ctx, OpenSessionCount, int64(p.numOpened))
		// Broadcast that a session has been destroyed.
		close(p.mayGetSession)
		p.mayGetSession = make(chan struct{})
		return true
	}
	return false
}

func (p *sessionPool) currSessionsCheckedOutLocked() uint64 {
	return p.numOpened - uint64(p.idleList.Len()) - uint64(p.idleWriteList.Len())
}

func (p *sessionPool) incNumInUse(ctx context.Context) {
	p.mu.Lock()
	p.incNumInUseLocked(ctx)
	p.mu.Unlock()
}

func (p *sessionPool) incNumInUseLocked(ctx context.Context) {
	p.numInUse++
	p.recordStat(ctx, SessionsCount, int64(p.numInUse), tagNumInUseSessions)
	p.recordStat(ctx, AcquiredSessionsCount, 1)
	if p.numInUse > p.maxNumInUse {
		p.maxNumInUse = p.numInUse
		p.recordStat(ctx, MaxInUseSessionsCount, int64(p.maxNumInUse))
	}
}

func (p *sessionPool) decNumInUse(ctx context.Context) {
	p.mu.Lock()
	p.decNumInUseLocked(ctx)
	p.mu.Unlock()
}

func (p *sessionPool) decNumInUseLocked(ctx context.Context) {
	p.numInUse--
	p.recordStat(ctx, SessionsCount, int64(p.numInUse), tagNumInUseSessions)
	p.recordStat(ctx, ReleasedSessionsCount, 1)
}

func (p *sessionPool) incNumReadsLocked(ctx context.Context) {
	p.numReads++
	p.recordStat(ctx, SessionsCount, int64(p.numReads), tagNumReadSessions)
}

func (p *sessionPool) decNumReadsLocked(ctx context.Context) {
	p.numReads--
	p.recordStat(ctx, SessionsCount, int64(p.numReads), tagNumReadSessions)
}

func (p *sessionPool) incNumWritesLocked(ctx context.Context) {
	p.numWrites++
	p.recordStat(ctx, SessionsCount, int64(p.numWrites), tagNumWriteSessions)
}

func (p *sessionPool) decNumWritesLocked(ctx context.Context) {
	p.numWrites--
	p.recordStat(ctx, SessionsCount, int64(p.numWrites), tagNumWriteSessions)
}

func (p *sessionPool) incNumBeingPrepared(ctx context.Context) {
	p.mu.Lock()
	p.incNumBeingPreparedLocked(ctx)
	p.mu.Unlock()
}

func (p *sessionPool) incNumBeingPreparedLocked(ctx context.Context) {
	p.prepareReqs++
	p.recordStat(ctx, SessionsCount, int64(p.prepareReqs), tagNumBeingPrepared)
}

func (p *sessionPool) decNumBeingPrepared(ctx context.Context) {
	p.mu.Lock()
	p.decNumBeingPreparedLocked(ctx)
	p.mu.Unlock()
}

func (p *sessionPool) decNumBeingPreparedLocked(ctx context.Context) {
	p.prepareReqs--
	p.recordStat(ctx, SessionsCount, int64(p.prepareReqs), tagNumBeingPrepared)
}

// hcHeap implements heap.Interface. It is used to create the priority queue
// for session healthchecks.
type hcHeap struct {
	sessions []*session
}

// Len implements heap.Interface.Len.
func (h hcHeap) Len() int {
	return len(h.sessions)
}

// Less implements heap.Interface.Less.
func (h hcHeap) Less(i, j int) bool {
	return h.sessions[i].getNextCheck().Before(h.sessions[j].getNextCheck())
}

// Swap implements heap.Interface.Swap.
func (h hcHeap) Swap(i, j int) {
	h.sessions[i], h.sessions[j] = h.sessions[j], h.sessions[i]
	h.sessions[i].setHcIndex(i)
	h.sessions[j].setHcIndex(j)
}

// Push implements heap.Interface.Push.
func (h *hcHeap) Push(s interface{}) {
	ns := s.(*session)
	ns.setHcIndex(len(h.sessions))
	h.sessions = append(h.sessions, ns)
}

// Pop implements heap.Interface.Pop.
func (h *hcHeap) Pop() interface{} {
	old := h.sessions
	n := len(old)
	s := old[n-1]
	h.sessions = old[:n-1]
	s.setHcIndex(-1)
	return s
}

// maintenanceWindowSize specifies the number of health check cycles that
// defines a maintenance window. The maintenance window keeps track of a
// rolling set of the maximum number of sessions checked out during each of
// those cycles. This is used by the maintainer to determine the number of
// sessions to create or delete at the end of each health check cycle.
const maintenanceWindowSize = 10

// maintenanceWindow contains the statistics that are gathered during a health
// check maintenance window.
type maintenanceWindow struct {
	mu sync.Mutex
	// maxSessionsCheckedOut contains the maximum number of sessions that were
	// checked out of the session pool during a health check cycle. This number
	// indicates the number of sessions that were actually needed by the pool
	// to serve the load during that cycle. The values are kept as a rolling
	// set containing the values for the past 10 cycles (minutes). The
	// maintainer uses these values to determine the number of sessions to
	// keep at the end of each cycle.
	maxSessionsCheckedOut [maintenanceWindowSize]uint64
}

// maxSessionsCheckedOutDuringWindow returns the maximum number of sessions
// that have been checked out during the last maintenance window of 10 cycles
// (minutes).
func (mw *maintenanceWindow) maxSessionsCheckedOutDuringWindow() uint64 {
	mw.mu.Lock()
	defer mw.mu.Unlock()
	var max uint64
	for _, cycleMax := range mw.maxSessionsCheckedOut {
		max = maxUint64(max, cycleMax)
	}
	return max
}
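
// For example, if the per-cycle maxima recorded over the last 10 cycles are
// [3, 7, 2, 0, 0, 0, 0, 0, 0, 5], maxSessionsCheckedOutDuringWindow returns
// 7, and the maintainer will aim to keep at most 7+MaxIdle sessions open at
// the end of the current cycle (subject to the MinOpened floor).
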
// updateMaxSessionsCheckedOutDuringWindow updates the maximum number of
// sessions that have been checked out of the pool during the current cycle of
// the maintenance window. A maintenance window consists of 10 maintenance
// cycles. Each cycle keeps track of the max number of sessions in use during
// that cycle. The rolling maintenance window of 10 cycles is used to
// determine the number of sessions to keep at the end of a cycle by
// calculating the max in use during the last 10 cycles.
func (mw *maintenanceWindow) updateMaxSessionsCheckedOutDuringWindow(currNumSessionsCheckedOut uint64) {
	mw.mu.Lock()
	defer mw.mu.Unlock()
	mw.maxSessionsCheckedOut[0] = maxUint64(currNumSessionsCheckedOut, mw.maxSessionsCheckedOut[0])
}

// startNewCycle starts a new health check cycle with the specified number of
// checked out sessions as its initial value.
func (mw *maintenanceWindow) startNewCycle(currNumSessionsCheckedOut uint64) {
	mw.mu.Lock()
	defer mw.mu.Unlock()
	copy(mw.maxSessionsCheckedOut[1:], mw.maxSessionsCheckedOut[:9])
	mw.maxSessionsCheckedOut[0] = currNumSessionsCheckedOut
}

// newMaintenanceWindow creates a new maintenance window with all values for
// maxSessionsCheckedOut set to maxOpened. This ensures that a complete
// maintenance window must pass before the maintainer will start to delete any
// sessions.
func newMaintenanceWindow(maxOpened uint64) *maintenanceWindow {
	mw := &maintenanceWindow{}
	// Initialize the rolling window with max values to prevent the maintainer
	// from deleting sessions before a complete window of 10 cycles has
	// finished.
	for i := 0; i < maintenanceWindowSize; i++ {
		mw.maxSessionsCheckedOut[i] = maxOpened
	}
	return mw
}

// healthChecker performs periodic healthchecks on registered sessions.
type healthChecker struct {
	// mu protects concurrent access to healthChecker.
	mu sync.Mutex
	// queue is the priority queue for session healthchecks. Sessions with a
	// lower nextCheck rank higher in the queue.
	queue hcHeap
	// interval is the average interval between two healthchecks on a session.
	interval time.Duration
	// workers is the number of concurrent healthcheck workers.
	workers int
	// waitWorkers waits for all healthcheck workers to exit.
	waitWorkers sync.WaitGroup
	// pool is the underlying session pool.
	pool *sessionPool
	// sampleInterval is the interval of sampling by the maintainer.
	sampleInterval time.Duration
	// ready is used to signal that the maintainer can start running.
	ready chan struct{}
	// done is used to signal that the health checker should be closed.
	done chan struct{}
	// once is used for closing channel done only once.
	once             sync.Once
	maintainerCancel func()
}

// newHealthChecker initializes a new instance of healthChecker.
func newHealthChecker(interval time.Duration, workers int, sampleInterval time.Duration, pool *sessionPool) *healthChecker {
	if workers <= 0 {
		workers = 1
	}
	hc := &healthChecker{
		interval:         interval,
		workers:          workers,
		pool:             pool,
		sampleInterval:   sampleInterval,
		ready:            make(chan struct{}),
		done:             make(chan struct{}),
		maintainerCancel: func() {},
	}
	hc.waitWorkers.Add(1)
	go hc.maintainer()
	for i := 1; i <= hc.workers; i++ {
		hc.waitWorkers.Add(1)
		go hc.worker(i)
	}
	return hc
}

// close closes the healthChecker and waits for all healthcheck workers to
// exit.
func (hc *healthChecker) close() {
	hc.mu.Lock()
	hc.maintainerCancel()
	hc.mu.Unlock()
	hc.once.Do(func() { close(hc.done) })
	hc.waitWorkers.Wait()
}
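
// Healthcheck scheduling (see scheduledHCLocked below) spreads the checks out
// over time. As a worked example with the default 50-minute interval: a
// session's first check lands in the half-open window [10m, 55m) after
// registration, and each subsequent check lands in [45m, 55m) after the
// previous one (0.2*50m = 10m, 0.9*50m = 45m, 1.1*50m = 55m).
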
// isClosing checks if a healthChecker is already closing.
func (hc *healthChecker) isClosing() bool {
	select {
	case <-hc.done:
		return true
	default:
		return false
	}
}

// getInterval gets the healthcheck interval.
func (hc *healthChecker) getInterval() time.Duration {
	hc.mu.Lock()
	defer hc.mu.Unlock()
	return hc.interval
}

// scheduledHCLocked schedules the next healthcheck on session s with the
// assumption that hc.mu is being held.
func (hc *healthChecker) scheduledHCLocked(s *session) {
	var constPart, randPart float64
	if !s.firstHCDone {
		// The first check will be scheduled in a large range to make requests
		// more evenly distributed. The first healthcheck will be scheduled
		// after [interval*0.2, interval*1.1) ns.
		constPart = float64(hc.interval) * 0.2
		randPart = hc.pool.rand.Float64() * float64(hc.interval) * 0.9
		s.firstHCDone = true
	} else {
		// The next healthcheck will be scheduled after
		// [interval*0.9, interval*1.1) ns.
		constPart = float64(hc.interval) * 0.9
		randPart = hc.pool.rand.Float64() * float64(hc.interval) * 0.2
	}
	// math.Ceil ensures the value is at least 1 ns.
	nsFromNow := int64(math.Ceil(constPart + randPart))
	s.setNextCheck(time.Now().Add(time.Duration(nsFromNow)))
	if hi := s.getHcIndex(); hi != -1 {
		// Session is still being tracked by healthcheck workers.
		heap.Fix(&hc.queue, hi)
	}
}

// scheduledHC schedules the next healthcheck on session s. It is safe to be
// called concurrently.
func (hc *healthChecker) scheduledHC(s *session) {
	hc.mu.Lock()
	defer hc.mu.Unlock()
	hc.scheduledHCLocked(s)
}

// register registers a session with the healthChecker for periodic
// healthchecks.
func (hc *healthChecker) register(s *session) {
	hc.mu.Lock()
	defer hc.mu.Unlock()
	hc.scheduledHCLocked(s)
	heap.Push(&hc.queue, s)
}

// unregister unregisters a session from the healthcheck queue.
func (hc *healthChecker) unregister(s *session) {
	hc.mu.Lock()
	defer hc.mu.Unlock()
	oi := s.setHcIndex(-1)
	if oi >= 0 {
		heap.Remove(&hc.queue, oi)
	}
}

// markDone marks that the health check for a session has been performed.
func (hc *healthChecker) markDone(s *session) {
	hc.mu.Lock()
	defer hc.mu.Unlock()
	s.checkingHealth = false
}

// healthCheck checks the health of the session and pings it if needed.
func (hc *healthChecker) healthCheck(s *session) {
	defer hc.markDone(s)
	if !s.pool.isValid() {
		// Session pool is closed, perform a garbage collection.
		s.destroy(false)
		return
	}
	if err := s.ping(); isSessionNotFoundError(err) {
		// Ping failed, destroy the session.
		s.destroy(false)
	}
}

// worker performs the healthcheck on sessions in healthChecker's priority
// queue.
func (hc *healthChecker) worker(i int) {
	// Returns a session which we should ping to keep it alive.
	getNextForPing := func() *session {
		hc.pool.mu.Lock()
		defer hc.pool.mu.Unlock()
		hc.mu.Lock()
		defer hc.mu.Unlock()
		if hc.queue.Len() <= 0 {
			// Queue is empty.
			return nil
		}
		s := hc.queue.sessions[0]
		if s.getNextCheck().After(time.Now()) && hc.pool.valid {
			// All sessions have been checked recently.
			return nil
		}
		hc.scheduledHCLocked(s)
		if !s.checkingHealth {
			s.checkingHealth = true
			return s
		}
		return nil
	}

	// Returns a session which we should prepare for write.
	getNextForTx := func() *session {
		hc.pool.mu.Lock()
		defer hc.pool.mu.Unlock()
		if hc.pool.shouldPrepareWriteLocked() {
			if hc.pool.idleList.Len() > 0 && hc.pool.valid {
				hc.mu.Lock()
				defer hc.mu.Unlock()
				if hc.pool.idleList.Front().Value.(*session).checkingHealth {
					return nil
				}
				session := hc.pool.idleList.Remove(hc.pool.idleList.Front()).(*session)
				ctx := context.Background()
				hc.pool.decNumReadsLocked(ctx)
				session.checkingHealth = true
				hc.pool.incNumBeingPreparedLocked(ctx)
				return session
			}
		}
		return nil
	}

	for {
		if hc.isClosing() {
			// Exit when the pool has been closed and all sessions have been
			// destroyed or when the health checker has been closed.
			hc.waitWorkers.Done()
			return
		}
		ws := getNextForTx()
		if ws != nil {
			ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
			err := ws.prepareForWrite(ctx)
			if err != nil {
				// Skip handling the prepare error; the session can be
				// prepared in the next cycle.
				// Don't log about permission errors, which may be expected
				// (e.g. using read-only auth).
				serr := ToSpannerError(err).(*Error)
				if serr.Code != codes.PermissionDenied {
					logf(hc.pool.sc.logger, "Failed to prepare session, error: %v", serr)
				}
			}
			hc.pool.recycle(ws)
			hc.pool.mu.Lock()
			hc.pool.decNumBeingPreparedLocked(ctx)
			hc.pool.mu.Unlock()
			cancel()
			hc.markDone(ws)
		}
		rs := getNextForPing()
		if rs == nil {
			if ws == nil {
				// No work to be done, so sleep to avoid burning CPU.
				pause := int64(100 * time.Millisecond)
				if pause > int64(hc.interval) {
					pause = int64(hc.interval)
				}
				select {
				case <-time.After(time.Duration(rand.Int63n(pause) + pause/2)):
				case <-hc.done:
				}

			}
			continue
		}
		hc.healthCheck(rs)
	}
}

// maintainer maintains the number of sessions in the pool based on the session
// pool configuration and the current and historical number of sessions checked
// out of the pool. The maintainer will:
// 1. Ensure that the session pool contains at least MinOpened sessions.
// 2. If the current number of sessions in the pool exceeds the greatest number
//    of checked out sessions (=sessions in use) during the last 10 minutes,
//    and the delta is larger than MaxIdle, the maintainer will reduce the
//    number of sessions to maxSessionsInUseDuringWindow+MaxIdle.
func (hc *healthChecker) maintainer() {
	// Wait until the pool is ready.
	<-hc.ready

	for iteration := uint64(0); ; iteration++ {
		if hc.isClosing() {
			hc.waitWorkers.Done()
			return
		}

		hc.pool.mu.Lock()
		currSessionsOpened := hc.pool.numOpened
		maxIdle := hc.pool.MaxIdle
		minOpened := hc.pool.MinOpened

		// Reset the start time for recording the maximum number of sessions
		// in the pool.
		now := time.Now()
		if now.After(hc.pool.lastResetTime.Add(10 * time.Minute)) {
			hc.pool.maxNumInUse = hc.pool.numInUse
			hc.pool.recordStat(context.Background(), MaxInUseSessionsCount, int64(hc.pool.maxNumInUse))
			hc.pool.lastResetTime = now
		}
		hc.pool.mu.Unlock()
		// Get the maximum number of sessions in use during the current
		// maintenance window.
		maxSessionsInUseDuringWindow := hc.pool.mw.maxSessionsCheckedOutDuringWindow()
		hc.mu.Lock()
		ctx, cancel := context.WithTimeout(context.Background(), hc.sampleInterval)
		hc.maintainerCancel = cancel
		hc.mu.Unlock()

		// Grow or shrink the pool if needed.
		// The number of sessions in the pool should be in the range
		// [Config.MinOpened, Config.MaxIdle+maxSessionsInUseDuringWindow].
		if currSessionsOpened < minOpened {
			if err := hc.growPoolInBatch(ctx, minOpened); err != nil {
				logf(hc.pool.sc.logger, "failed to grow pool: %v", err)
			}
		} else if maxIdle+maxSessionsInUseDuringWindow < currSessionsOpened {
			hc.shrinkPool(ctx, maxIdle+maxSessionsInUseDuringWindow)
		}

		select {
		case <-ctx.Done():
		case <-hc.done:
			cancel()
		}
		// Cycle the maintenance window. This will remove the oldest cycle and
		// add a new cycle at the beginning of the maintenance window with the
		// currently checked out number of sessions as the max number of
		// sessions in use in this cycle. This value will be increased during
		// the next cycle if it increases.
		hc.pool.mu.Lock()
		currSessionsInUse := hc.pool.currSessionsCheckedOutLocked()
		hc.pool.mu.Unlock()
		hc.pool.mw.startNewCycle(currSessionsInUse)
	}
}

func (hc *healthChecker) growPoolInBatch(ctx context.Context, growToNumSessions uint64) error {
	hc.pool.mu.Lock()
	defer hc.pool.mu.Unlock()
	numSessions := growToNumSessions - hc.pool.numOpened
	return hc.pool.growPoolLocked(numSessions, false)
}

// shrinkPool scales down the session pool. The method will stop deleting
// sessions when the number of sessions in the pool has reached
// shrinkToNumSessions. The method will also stop deleting sessions if it
// detects that another process has started creating sessions for the pool
// again, for example through the take() method.
func (hc *healthChecker) shrinkPool(ctx context.Context, shrinkToNumSessions uint64) {
	hc.pool.mu.Lock()
	maxSessionsToDelete := int(hc.pool.numOpened - shrinkToNumSessions)
	hc.pool.mu.Unlock()
	var deleted int
	var prevNumOpened uint64 = math.MaxUint64
	for {
		if ctx.Err() != nil {
			return
		}

		p := hc.pool
		p.mu.Lock()
		// Check if the number of open sessions has increased. If it has, we
		// should stop deleting sessions, as the load has increased and
		// additional sessions are needed.
		if p.numOpened >= prevNumOpened {
			p.mu.Unlock()
			break
		}
		prevNumOpened = p.numOpened

		// Check both whether we have reached the target number of open
		// sessions and whether we have reached the number of sessions to
		// delete, in case sessions have been deleted by other methods because
		// they have expired or been deemed invalid.
		if shrinkToNumSessions >= p.numOpened || deleted >= maxSessionsToDelete {
			p.mu.Unlock()
			break
		}

		var s *session
		if p.idleList.Len() > 0 {
			s = p.idleList.Front().Value.(*session)
		} else if p.idleWriteList.Len() > 0 {
			s = p.idleWriteList.Front().Value.(*session)
		}
		p.mu.Unlock()
		if s != nil {
			deleted++
			// Destroy the session as if it had expired.
			s.destroy(true)
		} else {
			break
		}
	}
}

// maxUint64 returns the maximum of two uint64.
func maxUint64(a, b uint64) uint64 {
	if a > b {
		return a
	}
	return b
}

// minUint64 returns the minimum of two uint64.
func minUint64(a, b uint64) uint64 {
	if a > b {
		return b
	}
	return a
}

// sessionResourceType is the type name of Spanner sessions.
const sessionResourceType = "type.googleapis.com/google.spanner.v1.Session"

// isSessionNotFoundError returns true if the given error is a
// `Session not found` error.
func isSessionNotFoundError(err error) bool {
	if err == nil {
		return false
	}
	if ErrCode(err) == codes.NotFound {
		if rt, ok := extractResourceType(err); ok {
			return rt == sessionResourceType
		}
	}
	return false
}
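
// Callers in this file use isSessionNotFoundError to decide between dropping
// a session and keeping it, e.g. in healthCheck above (sketch):
//
//	if err := s.ping(); isSessionNotFoundError(err) {
//		// The session no longer exists server-side; remove it everywhere.
//		s.destroy(false)
//	}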