1/*
2Copyright 2017 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package spanner
18
19import (
20	"container/heap"
21	"container/list"
22	"context"
23	"fmt"
24	"log"
25	"math"
26	"math/rand"
27	"runtime/debug"
28	"sync"
29	"time"
30
31	"cloud.google.com/go/internal/trace"
32	vkit "cloud.google.com/go/spanner/apiv1"
33	sppb "google.golang.org/genproto/googleapis/spanner/v1"
34	"google.golang.org/grpc/codes"
35	"google.golang.org/grpc/metadata"
36)
37
38// sessionHandle is an interface for transactions to access Cloud Spanner
39// sessions safely. It is generated by sessionPool.take().
40type sessionHandle struct {
41	// mu guarantees that the inner session object is returned / destroyed only
42	// once.
43	mu sync.Mutex
44	// session is a pointer to a session object. Transactions never need to
45	// access it directly.
46	session *session
47	// checkoutTime is the time the session was checked out of the pool.
48	checkoutTime time.Time
49	// trackedSessionHandle is the linked list node which links the session to
50	// the list of tracked session handles. trackedSessionHandle is only set if
51	// TrackSessionHandles has been enabled in the session pool configuration.
52	trackedSessionHandle *list.Element
53	// stack is the call stack of the goroutine that checked out the session
54	// from the pool. This can be used to track down session leak problems.
55	stack []byte
56}
57
// recycle gives the inner session object back to its home session pool. It is
// safe to call recycle multiple times, but only the first call takes effect.
60func (sh *sessionHandle) recycle() {
61	sh.mu.Lock()
62	if sh.session == nil {
63		// sessionHandle has already been recycled.
64		sh.mu.Unlock()
65		return
66	}
67	p := sh.session.pool
68	tracked := sh.trackedSessionHandle
69	sh.session.recycle()
70	sh.session = nil
71	sh.trackedSessionHandle = nil
72	sh.checkoutTime = time.Time{}
73	sh.stack = nil
74	sh.mu.Unlock()
75	if tracked != nil {
76		p.mu.Lock()
77		p.trackedSessionHandles.Remove(tracked)
78		p.mu.Unlock()
79	}
80}
81
82// getID gets the Cloud Spanner session ID from the internal session object.
83// getID returns empty string if the sessionHandle is nil or the inner session
84// object has been released by recycle / destroy.
85func (sh *sessionHandle) getID() string {
86	sh.mu.Lock()
87	defer sh.mu.Unlock()
88	if sh.session == nil {
89		// sessionHandle has already been recycled/destroyed.
90		return ""
91	}
92	return sh.session.getID()
93}
94
95// getClient gets the Cloud Spanner RPC client associated with the session ID
96// in sessionHandle.
97func (sh *sessionHandle) getClient() *vkit.Client {
98	sh.mu.Lock()
99	defer sh.mu.Unlock()
100	if sh.session == nil {
101		return nil
102	}
103	return sh.session.client
104}
105
106// getMetadata returns the metadata associated with the session in sessionHandle.
107func (sh *sessionHandle) getMetadata() metadata.MD {
108	sh.mu.Lock()
109	defer sh.mu.Unlock()
110	if sh.session == nil {
111		return nil
112	}
113	return sh.session.md
114}
115
116// getTransactionID returns the transaction id in the session if available.
117func (sh *sessionHandle) getTransactionID() transactionID {
118	sh.mu.Lock()
119	defer sh.mu.Unlock()
120	if sh.session == nil {
121		return nil
122	}
123	return sh.session.tx
124}
125
// destroy destroys the inner session object. It is safe to call destroy
// multiple times; only the first call attempts to destroy the inner session
// object.
129func (sh *sessionHandle) destroy() {
130	sh.mu.Lock()
131	s := sh.session
132	if s == nil {
133		// sessionHandle has already been recycled.
134		sh.mu.Unlock()
135		return
136	}
137	tracked := sh.trackedSessionHandle
138	sh.session = nil
139	sh.trackedSessionHandle = nil
140	sh.checkoutTime = time.Time{}
141	sh.stack = nil
142	sh.mu.Unlock()
143
144	if tracked != nil {
145		p := s.pool
146		p.mu.Lock()
147		p.trackedSessionHandles.Remove(tracked)
148		p.mu.Unlock()
149	}
150	s.destroy(false)
151}
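// Illustrative sketch (not part of the exported API, error handling
// simplified): a transaction typically checks a handle out of the pool, uses
// the underlying gRPC client, and then either recycles the handle or destroys
// it when the backing session no longer exists server side.
//
//	sh, err := pool.take(ctx)
//	if err != nil {
//		return err
//	}
//	defer sh.recycle()
//	client, md := sh.getClient(), sh.getMetadata()
//	// ... issue RPCs with client, attaching md to the outgoing context ...
//	// On a "Session not found" error, call sh.destroy() instead of recycle.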
152
153// session wraps a Cloud Spanner session ID through which transactions are
154// created and executed.
155type session struct {
156	// client is the RPC channel to Cloud Spanner. It is set only once during
157	// session's creation.
158	client *vkit.Client
159	// id is the unique id of the session in Cloud Spanner. It is set only once
160	// during session's creation.
161	id string
162	// pool is the session's home session pool where it was created. It is set
163	// only once during session's creation.
164	pool *sessionPool
165	// createTime is the timestamp of the session's creation. It is set only
166	// once during session's creation.
167	createTime time.Time
168	// logger is the logger configured for the Spanner client that created the
169	// session. If nil, logging will be directed to the standard logger.
170	logger *log.Logger
171
172	// mu protects the following fields from concurrent access: both
173	// healthcheck workers and transactions can modify them.
174	mu sync.Mutex
175	// valid marks the validity of a session.
176	valid bool
177	// hcIndex is the index of the session inside the global healthcheck queue.
178	// If hcIndex < 0, session has been unregistered from the queue.
179	hcIndex int
180	// idleList is the linkedlist node which links the session to its home
181	// session pool's idle list. If idleList == nil, the
182	// session is not in idle list.
183	idleList *list.Element
184	// nextCheck is the timestamp of next scheduled healthcheck of the session.
185	// It is maintained by the global health checker.
186	nextCheck time.Time
	// checkingHealth is true if this session is currently being processed by
	// the health checker. Must be modified under the health checker lock.
189	checkingHealth bool
190	// md is the Metadata to be sent with each request.
191	md metadata.MD
192	// tx contains the transaction id if the session has been prepared for
193	// write.
194	tx transactionID
195}
196
197// isValid returns true if the session is still valid for use.
198func (s *session) isValid() bool {
199	s.mu.Lock()
200	defer s.mu.Unlock()
201	return s.valid
202}
203
204// isWritePrepared returns true if the session is prepared for write.
205func (s *session) isWritePrepared() bool {
206	s.mu.Lock()
207	defer s.mu.Unlock()
208	return s.tx != nil
209}
210
211// String implements fmt.Stringer for session.
212func (s *session) String() string {
213	s.mu.Lock()
214	defer s.mu.Unlock()
215	return fmt.Sprintf("<id=%v, hcIdx=%v, idleList=%p, valid=%v, create=%v, nextcheck=%v>",
216		s.id, s.hcIndex, s.idleList, s.valid, s.createTime, s.nextCheck)
217}
218
219// ping verifies if the session is still alive in Cloud Spanner.
220func (s *session) ping() error {
221	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
222	defer cancel()
223	// s.getID is safe even when s is invalid.
224	_, err := s.client.GetSession(contextWithOutgoingMetadata(ctx, s.md), &sppb.GetSessionRequest{Name: s.getID()})
225	return err
226}
227
228// setHcIndex atomically sets the session's index in the healthcheck queue and
229// returns the old index.
230func (s *session) setHcIndex(i int) int {
231	s.mu.Lock()
232	defer s.mu.Unlock()
233	oi := s.hcIndex
234	s.hcIndex = i
235	return oi
236}
237
238// setIdleList atomically sets the session's idle list link and returns the old
239// link.
240func (s *session) setIdleList(le *list.Element) *list.Element {
241	s.mu.Lock()
242	defer s.mu.Unlock()
243	old := s.idleList
244	s.idleList = le
245	return old
246}
247
248// invalidate marks a session as invalid and returns the old validity.
249func (s *session) invalidate() bool {
250	s.mu.Lock()
251	defer s.mu.Unlock()
252	ov := s.valid
253	s.valid = false
254	return ov
255}
256
257// setNextCheck sets the timestamp for next healthcheck on the session.
258func (s *session) setNextCheck(t time.Time) {
259	s.mu.Lock()
260	defer s.mu.Unlock()
261	s.nextCheck = t
262}
263
264// setTransactionID sets the transaction id in the session
265func (s *session) setTransactionID(tx transactionID) {
266	s.mu.Lock()
267	defer s.mu.Unlock()
268	s.tx = tx
269}
270
271// getID returns the session ID which uniquely identifies the session in Cloud
272// Spanner.
273func (s *session) getID() string {
274	s.mu.Lock()
275	defer s.mu.Unlock()
276	return s.id
277}
278
279// getHcIndex returns the session's index into the global healthcheck priority
280// queue.
281func (s *session) getHcIndex() int {
282	s.mu.Lock()
283	defer s.mu.Unlock()
284	return s.hcIndex
285}
286
287// getIdleList returns the session's link in its home session pool's idle list.
288func (s *session) getIdleList() *list.Element {
289	s.mu.Lock()
290	defer s.mu.Unlock()
291	return s.idleList
292}
293
294// getNextCheck returns the timestamp for next healthcheck on the session.
295func (s *session) getNextCheck() time.Time {
296	s.mu.Lock()
297	defer s.mu.Unlock()
298	return s.nextCheck
299}
300
301// recycle turns the session back to its home session pool.
302func (s *session) recycle() {
303	s.setTransactionID(nil)
304	if !s.pool.recycle(s) {
305		// s is rejected by its home session pool because it expired and the
306		// session pool currently has enough open sessions.
307		s.destroy(false)
308	}
309}
310
311// destroy removes the session from its home session pool, healthcheck queue
312// and Cloud Spanner service.
313func (s *session) destroy(isExpire bool) bool {
314	// Remove s from session pool.
315	if !s.pool.remove(s, isExpire) {
316		return false
317	}
318	// Unregister s from healthcheck queue.
319	s.pool.hc.unregister(s)
320	// Remove s from Cloud Spanner service.
321	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
322	defer cancel()
323	s.delete(ctx)
324	return true
325}
326
327func (s *session) delete(ctx context.Context) {
	// The error is only logged, because even if we fail to explicitly delete
	// the session, it will eventually be garbage collected by Cloud Spanner.
330	err := s.client.DeleteSession(ctx, &sppb.DeleteSessionRequest{Name: s.getID()})
331	if err != nil {
332		logf(s.logger, "Failed to delete session %v. Error: %v", s.getID(), err)
333	}
334}
335
336// prepareForWrite prepares the session for write if it is not already in that
337// state.
338func (s *session) prepareForWrite(ctx context.Context) error {
339	if s.isWritePrepared() {
340		return nil
341	}
342	tx, err := beginTransaction(contextWithOutgoingMetadata(ctx, s.md), s.getID(), s.client)
343	// Session not found should cause the session to be removed from the pool.
344	if isSessionNotFoundError(err) {
345		s.pool.remove(s, false)
346		s.pool.hc.unregister(s)
347		return err
348	}
	// Enable/disable background preparing of write sessions depending on
	// whether the BeginTransaction call succeeded. This prevents the session
	// pool workers from going into an infinite loop of trying to prepare
	// sessions. Any subsequent successful BeginTransaction call, for example
	// from takeWriteSession, will re-enable the background process.
354	s.pool.mu.Lock()
355	s.pool.disableBackgroundPrepareSessions = err != nil
356	s.pool.mu.Unlock()
357	if err != nil {
358		return err
359	}
360	s.setTransactionID(tx)
361	return nil
362}
363
364// SessionPoolConfig stores configurations of a session pool.
365type SessionPoolConfig struct {
366	// MaxOpened is the maximum number of opened sessions allowed by the session
367	// pool. If the client tries to open a session and there are already
368	// MaxOpened sessions, it will block until one becomes available or the
369	// context passed to the client method is canceled or times out.
370	//
371	// Defaults to NumChannels * 100.
372	MaxOpened uint64
373
374	// MinOpened is the minimum number of opened sessions that the session pool
	// tries to maintain. The session pool won't continue to expire sessions if
	// the number of opened connections drops below MinOpened. However, if a
	// session is found to be broken, it will still be evicted from the session
	// pool, so it is possible for the number of opened sessions to drop below
	// MinOpened.
380	//
381	// Defaults to 100.
382	MinOpened uint64
383
	// MaxIdle is the maximum number of idle sessions that the pool is allowed
	// to keep.
385	//
386	// Defaults to 0.
387	MaxIdle uint64
388
389	// MaxBurst is the maximum number of concurrent session creation requests.
390	//
391	// Defaults to 10.
392	MaxBurst uint64
393
394	// WriteSessions is the fraction of sessions we try to keep prepared for
395	// write.
396	//
397	// Defaults to 0.2.
398	WriteSessions float64
399
400	// HealthCheckWorkers is number of workers used by health checker for this
401	// pool.
402	//
403	// Defaults to 10.
404	HealthCheckWorkers int
405
406	// HealthCheckInterval is how often the health checker pings a session.
407	//
408	// Defaults to 5m.
409	HealthCheckInterval time.Duration
410
411	// TrackSessionHandles determines whether the session pool will keep track
412	// of the stacktrace of the goroutines that take sessions from the pool.
413	// This setting can be used to track down session leak problems.
414	//
415	// Defaults to false.
416	TrackSessionHandles bool
417
418	// healthCheckSampleInterval is how often the health checker samples live
419	// session (for use in maintaining session pool size).
420	//
421	// Defaults to 1m.
422	healthCheckSampleInterval time.Duration
423
424	// sessionLabels for the sessions created in the session pool.
425	sessionLabels map[string]string
426}
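// The pool configuration is normally supplied through ClientConfig when the
// client is created. A minimal sketch (the database name and the values shown
// are placeholders, not recommendations):
//
//	cfg := ClientConfig{
//		SessionPoolConfig: SessionPoolConfig{
//			MinOpened:           100,
//			MaxOpened:           400,
//			WriteSessions:       0.2,
//			TrackSessionHandles: true,
//		},
//	}
//	client, err := NewClientWithConfig(ctx, "projects/p/instances/i/databases/d", cfg)
//	if err != nil {
//		// TODO: handle error.
//	}
//	defer client.Close()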
427
428// DefaultSessionPoolConfig is the default configuration for the session pool
429// that will be used for a Spanner client, unless the user supplies a specific
430// session pool config.
431var DefaultSessionPoolConfig = SessionPoolConfig{
432	MinOpened:           100,
433	MaxOpened:           numChannels * 100,
434	MaxBurst:            10,
435	WriteSessions:       0.2,
436	HealthCheckWorkers:  10,
437	HealthCheckInterval: 30 * time.Minute,
438}
439
// errMinOpenedGTMaxOpened returns error for SessionPoolConfig.MaxOpened <
// SessionPoolConfig.MinOpened when SessionPoolConfig.MaxOpened is set.
441func errMinOpenedGTMaxOpened(maxOpened, minOpened uint64) error {
442	return spannerErrorf(codes.InvalidArgument,
443		"require SessionPoolConfig.MaxOpened >= SessionPoolConfig.MinOpened, got %d and %d", maxOpened, minOpened)
444}
445
// errWriteFractionOutOfRange returns error for
// SessionPoolConfig.WriteSessions < 0 or SessionPoolConfig.WriteSessions > 1.
448func errWriteFractionOutOfRange(writeFraction float64) error {
449	return spannerErrorf(codes.InvalidArgument,
450		"require SessionPoolConfig.WriteSessions >= 0.0 && SessionPoolConfig.WriteSessions <= 1.0, got %.2f", writeFraction)
451}
452
453// errHealthCheckWorkersNegative returns error for
454// SessionPoolConfig.HealthCheckWorkers < 0
455func errHealthCheckWorkersNegative(workers int) error {
456	return spannerErrorf(codes.InvalidArgument,
457		"require SessionPoolConfig.HealthCheckWorkers >= 0, got %d", workers)
458}
459
460// errHealthCheckIntervalNegative returns error for
461// SessionPoolConfig.HealthCheckInterval < 0
462func errHealthCheckIntervalNegative(interval time.Duration) error {
463	return spannerErrorf(codes.InvalidArgument,
464		"require SessionPoolConfig.HealthCheckInterval >= 0, got %v", interval)
465}
466
467// validate verifies that the SessionPoolConfig is good for use.
468func (spc *SessionPoolConfig) validate() error {
469	if spc.MinOpened > spc.MaxOpened && spc.MaxOpened > 0 {
470		return errMinOpenedGTMaxOpened(spc.MaxOpened, spc.MinOpened)
471	}
472	if spc.WriteSessions < 0.0 || spc.WriteSessions > 1.0 {
473		return errWriteFractionOutOfRange(spc.WriteSessions)
474	}
475	if spc.HealthCheckWorkers < 0 {
476		return errHealthCheckWorkersNegative(spc.HealthCheckWorkers)
477	}
478	if spc.HealthCheckInterval < 0 {
479		return errHealthCheckIntervalNegative(spc.HealthCheckInterval)
480	}
481	return nil
482}
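// For example, SessionPoolConfig{MinOpened: 200, MaxOpened: 100} fails
// validation with errMinOpenedGTMaxOpened(100, 200), while the zero value of
// SessionPoolConfig passes validation; newSessionPool then fills in default
// values for HealthCheckWorkers and HealthCheckInterval.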
483
484// sessionPool creates and caches Cloud Spanner sessions.
485type sessionPool struct {
486	// mu protects sessionPool from concurrent access.
487	mu sync.Mutex
488	// valid marks the validity of the session pool.
489	valid bool
490	// sc is used to create the sessions for the pool.
491	sc *sessionClient
492	// trackedSessionHandles contains all sessions handles that have been
493	// checked out of the pool. The list is only filled if TrackSessionHandles
494	// has been enabled.
495	trackedSessionHandles list.List
496	// idleList caches idle session IDs. Session IDs in this list can be
497	// allocated for use.
498	idleList list.List
499	// idleWriteList caches idle sessions which have been prepared for write.
500	idleWriteList list.List
	// mayGetSession is for broadcasting that session retrieval/creation may
502	// proceed.
503	mayGetSession chan struct{}
504	// numOpened is the total number of open sessions from the session pool.
505	numOpened uint64
506	// createReqs is the number of ongoing session creation requests.
507	createReqs uint64
	// prepareReqs is the number of ongoing session preparation requests.
509	prepareReqs uint64
510	// disableBackgroundPrepareSessions indicates that the BeginTransaction
511	// call for a read/write transaction failed with a permanent error, such as
512	// PermissionDenied or `Database not found`. Further background calls to
513	// prepare sessions will be disabled.
514	disableBackgroundPrepareSessions bool
515	// configuration of the session pool.
516	SessionPoolConfig
517	// hc is the health checker
518	hc *healthChecker
519
520	// mw is the maintenance window containing statistics for the max number of
521	// sessions checked out of the pool during the last 10 minutes.
522	mw *maintenanceWindow
523}
524
525// newSessionPool creates a new session pool.
526func newSessionPool(sc *sessionClient, config SessionPoolConfig) (*sessionPool, error) {
527	if err := config.validate(); err != nil {
528		return nil, err
529	}
530	pool := &sessionPool{
531		sc:                sc,
532		valid:             true,
533		mayGetSession:     make(chan struct{}),
534		SessionPoolConfig: config,
535		mw:                newMaintenanceWindow(config.MaxOpened),
536	}
537	if config.HealthCheckWorkers == 0 {
538		// With 10 workers and assuming average latency of 5ms for
539		// BeginTransaction, we will be able to prepare 2000 tx/sec in advance.
540		// If the rate of takeWriteSession is more than that, it will degrade to
541		// doing BeginTransaction inline.
542		//
543		// TODO: consider resizing the worker pool dynamically according to the load.
544		config.HealthCheckWorkers = 10
545	}
546	if config.HealthCheckInterval == 0 {
547		config.HealthCheckInterval = 5 * time.Minute
548	}
549	if config.healthCheckSampleInterval == 0 {
550		config.healthCheckSampleInterval = time.Minute
551	}
	// On a GCE VM within the same region, a healthcheck ping takes on average
	// 10ms to finish. Given a 5 minute interval and 10 healthcheck workers, a
	// healthChecker can effectively maintain
	// 100 checks_per_worker/sec * 10 workers * 300 seconds = 300K sessions.
556	pool.hc = newHealthChecker(config.HealthCheckInterval, config.HealthCheckWorkers, config.healthCheckSampleInterval, pool)
557
558	// First initialize the pool before we indicate that the healthchecker is
559	// ready. This prevents the maintainer from starting before the pool has
560	// been initialized, which means that we guarantee that the initial
561	// sessions are created using BatchCreateSessions.
562	if config.MinOpened > 0 {
563		numSessions := minUint64(config.MinOpened, math.MaxInt32)
564		if err := pool.initPool(int32(numSessions)); err != nil {
565			return nil, err
566		}
567	}
568	close(pool.hc.ready)
569	return pool, nil
570}
571
572func (p *sessionPool) initPool(numSessions int32) error {
573	p.mu.Lock()
574	// Take budget before the actual session creation.
575	p.numOpened += uint64(numSessions)
576	recordStat(context.Background(), OpenSessionCount, int64(p.numOpened))
577	p.createReqs += uint64(numSessions)
578	p.mu.Unlock()
579	// Asynchronously create the initial sessions for the pool.
580	return p.sc.batchCreateSessions(numSessions, p)
581}
582
// sessionReady is executed by the SessionClient when a session has been
// created and is ready to use. This method adds the new session to the
// pool and decreases the number of sessions that are being created.
586func (p *sessionPool) sessionReady(s *session) {
587	p.mu.Lock()
588	defer p.mu.Unlock()
589	// Set this pool as the home pool of the session and register it with the
590	// health checker.
591	s.pool = p
592	p.hc.register(s)
593	p.createReqs--
	// Insert the session at a random position in the pool to prevent all
	// sessions affiliated with a channel from being placed sequentially in
	// the pool.
597	if p.idleList.Len() > 0 {
598		pos := rand.Intn(p.idleList.Len())
599		before := p.idleList.Front()
600		for i := 0; i < pos; i++ {
601			before = before.Next()
602		}
603		s.setIdleList(p.idleList.InsertBefore(s, before))
604	} else {
605		s.setIdleList(p.idleList.PushBack(s))
606	}
607	// Notify other waiters blocking on session creation.
608	close(p.mayGetSession)
609	p.mayGetSession = make(chan struct{})
610}
611
612// sessionCreationFailed is called by the SessionClient when the creation of one
613// or more requested sessions finished with an error. sessionCreationFailed will
614// decrease the number of sessions being created and notify any waiters that
615// the session creation failed.
616func (p *sessionPool) sessionCreationFailed(err error, numSessions int32) {
617	p.mu.Lock()
618	defer p.mu.Unlock()
619	p.createReqs -= uint64(numSessions)
620	p.numOpened -= uint64(numSessions)
621	recordStat(context.Background(), OpenSessionCount, int64(p.numOpened))
622	// Notify other waiters blocking on session creation.
623	close(p.mayGetSession)
624	p.mayGetSession = make(chan struct{})
625}
626
627// isValid checks if the session pool is still valid.
628func (p *sessionPool) isValid() bool {
629	if p == nil {
630		return false
631	}
632	p.mu.Lock()
633	defer p.mu.Unlock()
634	return p.valid
635}
636
637// close marks the session pool as closed.
638func (p *sessionPool) close() {
639	if p == nil {
640		return
641	}
642	p.mu.Lock()
643	if !p.valid {
644		p.mu.Unlock()
645		return
646	}
647	p.valid = false
648	p.mu.Unlock()
649	p.hc.close()
650	// destroy all the sessions
651	p.hc.mu.Lock()
652	allSessions := make([]*session, len(p.hc.queue.sessions))
653	copy(allSessions, p.hc.queue.sessions)
654	p.hc.mu.Unlock()
655	for _, s := range allSessions {
656		s.destroy(false)
657	}
658}
659
660// errInvalidSessionPool is the error for using an invalid session pool.
661var errInvalidSessionPool = spannerErrorf(codes.InvalidArgument, "invalid session pool")
662
663// errGetSessionTimeout returns error for context timeout during
664// sessionPool.take().
665var errGetSessionTimeout = spannerErrorf(codes.Canceled, "timeout / context canceled during getting session")
666
667// newSessionHandle creates a new session handle for the given session for this
668// session pool. The session handle will also hold a copy of the current call
669// stack if the session pool has been configured to track the call stacks of
670// sessions being checked out of the pool.
671func (p *sessionPool) newSessionHandle(s *session) (sh *sessionHandle) {
672	sh = &sessionHandle{session: s, checkoutTime: time.Now()}
673	if p.TrackSessionHandles {
674		p.mu.Lock()
675		sh.trackedSessionHandle = p.trackedSessionHandles.PushBack(sh)
676		p.mu.Unlock()
677		sh.stack = debug.Stack()
678	}
679	return sh
680}
681
682// errGetSessionTimeout returns error for context timeout during
683// sessionPool.take().
684func (p *sessionPool) errGetSessionTimeout() error {
685	if p.TrackSessionHandles {
686		return p.errGetSessionTimeoutWithTrackedSessionHandles()
687	}
688	return p.errGetBasicSessionTimeout()
689}
690
// errGetBasicSessionTimeout returns error for a context timeout during
// sessionPool.take() without any tracked sessionHandles.
693func (p *sessionPool) errGetBasicSessionTimeout() error {
694	return spannerErrorf(codes.Canceled, "timeout / context canceled during getting session.\n"+
695		"Enable SessionPoolConfig.TrackSessionHandles if you suspect a session leak to get more information about the checked out sessions.")
696}
697
// errGetSessionTimeoutWithTrackedSessionHandles returns error for a context
// timeout during sessionPool.take(), including a stacktrace of each checked
// out session handle.
701func (p *sessionPool) errGetSessionTimeoutWithTrackedSessionHandles() error {
702	err := spannerErrorf(codes.Canceled, "timeout / context canceled during getting session.")
703	err.(*Error).additionalInformation = p.getTrackedSessionHandleStacksLocked()
704	return err
705}
706
// getTrackedSessionHandleStacksLocked returns a string containing the
// stacktrace of all currently checked out sessions of the pool. This method
// acquires p.mu itself, so the caller must not hold p.mu.
710func (p *sessionPool) getTrackedSessionHandleStacksLocked() string {
711	p.mu.Lock()
712	defer p.mu.Unlock()
713	stackTraces := ""
714	i := 1
715	element := p.trackedSessionHandles.Front()
716	for element != nil {
717		sh := element.Value.(*sessionHandle)
718		sh.mu.Lock()
719		if sh.stack != nil {
720			stackTraces = fmt.Sprintf("%s\n\nSession %d checked out of pool at %s by goroutine:\n%s", stackTraces, i, sh.checkoutTime.Format(time.RFC3339), sh.stack)
721		}
722		sh.mu.Unlock()
723		element = element.Next()
724		i++
725	}
726	return stackTraces
727}
728
729// shouldPrepareWriteLocked returns true if we should prepare more sessions for write.
730func (p *sessionPool) shouldPrepareWriteLocked() bool {
731	return !p.disableBackgroundPrepareSessions && float64(p.numOpened)*p.WriteSessions > float64(p.idleWriteList.Len()+int(p.prepareReqs))
732}
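// A worked example of the check above (hypothetical numbers): with
// numOpened = 400, WriteSessions = 0.2, 50 sessions in idleWriteList and 10
// in-flight prepare requests, 400*0.2 = 80 > 50+10 = 60, so the background
// workers should prepare more sessions for write.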
733
734func (p *sessionPool) createSession(ctx context.Context) (*session, error) {
735	trace.TracePrintf(ctx, nil, "Creating a new session")
736	doneCreate := func(done bool) {
737		p.mu.Lock()
738		if !done {
739			// Session creation failed, give budget back.
740			p.numOpened--
741			recordStat(ctx, OpenSessionCount, int64(p.numOpened))
742		}
743		p.createReqs--
744		// Notify other waiters blocking on session creation.
745		close(p.mayGetSession)
746		p.mayGetSession = make(chan struct{})
747		p.mu.Unlock()
748	}
749	s, err := p.sc.createSession(ctx)
750	if err != nil {
751		doneCreate(false)
752		// Should return error directly because of the previous retries on
753		// CreateSession RPC.
754		// If the error is a timeout, there is a chance that the session was
755		// created on the server but is not known to the session pool. This
756		// session will then be garbage collected by the server after 1 hour.
757		return nil, err
758	}
759	s.pool = p
760	p.hc.register(s)
761	doneCreate(true)
762	return s, nil
763}
764
765func (p *sessionPool) isHealthy(s *session) bool {
766	if s.getNextCheck().Add(2 * p.hc.getInterval()).Before(time.Now()) {
767		// TODO: figure out if we need to schedule a new healthcheck worker here.
768		if err := s.ping(); isSessionNotFoundError(err) {
769			// The session is already bad, continue to fetch/create a new one.
770			s.destroy(false)
771			return false
772		}
773		p.hc.scheduledHC(s)
774	}
775	return true
776}
777
// take returns a cached session if one is available; if not, it tries to
// allocate a new one. Sessions returned by take should be used for read
// operations.
781func (p *sessionPool) take(ctx context.Context) (*sessionHandle, error) {
782	trace.TracePrintf(ctx, nil, "Acquiring a read-only session")
783	for {
784		var (
785			s   *session
786			err error
787		)
788
789		p.mu.Lock()
790		if !p.valid {
791			p.mu.Unlock()
792			return nil, errInvalidSessionPool
793		}
794		if p.idleList.Len() > 0 {
795			// Idle sessions are available, get one from the top of the idle
796			// list.
797			s = p.idleList.Remove(p.idleList.Front()).(*session)
798			trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
799				"Acquired read-only session")
800		} else if p.idleWriteList.Len() > 0 {
801			s = p.idleWriteList.Remove(p.idleWriteList.Front()).(*session)
802			trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
803				"Acquired read-write session")
804		}
805		if s != nil {
806			s.setIdleList(nil)
807			numCheckedOut := p.currSessionsCheckedOutLocked()
808			p.mu.Unlock()
809			p.mw.updateMaxSessionsCheckedOutDuringWindow(numCheckedOut)
			// From here, the session is no longer in the idle list, so
			// healthcheck workers won't destroy it. If the healthcheck workers
			// failed to schedule a healthcheck for the session in time, do the
			// check here. Because a session check is still much cheaper than
			// session creation, sessions should be reused as much as possible.
815			if !p.isHealthy(s) {
816				continue
817			}
818			return p.newSessionHandle(s), nil
819		}
820
821		// Idle list is empty, block if session pool has reached max session
822		// creation concurrency or max number of open sessions.
823		if (p.MaxOpened > 0 && p.numOpened >= p.MaxOpened) || (p.MaxBurst > 0 && p.createReqs >= p.MaxBurst) {
824			mayGetSession := p.mayGetSession
825			p.mu.Unlock()
826			trace.TracePrintf(ctx, nil, "Waiting for read-only session to become available")
827			select {
828			case <-ctx.Done():
829				trace.TracePrintf(ctx, nil, "Context done waiting for session")
830				return nil, p.errGetSessionTimeout()
831			case <-mayGetSession:
832			}
833			continue
834		}
835
836		// Take budget before the actual session creation.
837		p.numOpened++
838		// Creating a new session that will be returned directly to the client
839		// means that the max number of sessions in use also increases.
840		numCheckedOut := p.currSessionsCheckedOutLocked()
841		recordStat(ctx, OpenSessionCount, int64(p.numOpened))
842		p.createReqs++
843		p.mu.Unlock()
844		p.mw.updateMaxSessionsCheckedOutDuringWindow(numCheckedOut)
845		if s, err = p.createSession(ctx); err != nil {
846			trace.TracePrintf(ctx, nil, "Error creating session: %v", err)
847			return nil, toSpannerError(err)
848		}
849		trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
850			"Created session")
851		return p.newSessionHandle(s), nil
852	}
853}
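// The mayGetSession field implements a simple broadcast: whenever a session is
// returned, created or destroyed, the current channel is closed and replaced
// with a fresh one. A waiter captures the channel under p.mu and then blocks
// on it, as take does above; sketch:
//
//	p.mu.Lock()
//	mayGetSession := p.mayGetSession
//	p.mu.Unlock()
//	select {
//	case <-ctx.Done():
//		// Give up waiting for a session.
//	case <-mayGetSession:
//		// Something changed in the pool; retry the checkout.
//	}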
854
// takeWriteSession returns a cached session that is already prepared for
// write if one is available; if not, it tries to allocate a new one. Sessions
// returned by takeWriteSession should be used for read-write transactions.
858func (p *sessionPool) takeWriteSession(ctx context.Context) (*sessionHandle, error) {
859	trace.TracePrintf(ctx, nil, "Acquiring a read-write session")
860	for {
861		var (
862			s   *session
863			err error
864		)
865
866		p.mu.Lock()
867		if !p.valid {
868			p.mu.Unlock()
869			return nil, errInvalidSessionPool
870		}
871		if p.idleWriteList.Len() > 0 {
872			// Idle sessions are available, get one from the top of the idle
873			// list.
874			s = p.idleWriteList.Remove(p.idleWriteList.Front()).(*session)
875			trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()}, "Acquired read-write session")
876		} else if p.idleList.Len() > 0 {
877			s = p.idleList.Remove(p.idleList.Front()).(*session)
878			trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()}, "Acquired read-only session")
879		}
880		if s != nil {
881			s.setIdleList(nil)
882			numCheckedOut := p.currSessionsCheckedOutLocked()
883			p.mu.Unlock()
884			p.mw.updateMaxSessionsCheckedOutDuringWindow(numCheckedOut)
			// From here, the session is no longer in the idle list, so
			// healthcheck workers won't destroy it. If the healthcheck workers
			// failed to schedule a healthcheck for the session in time, do the
			// check here. Because a session check is still much cheaper than
			// session creation, sessions should be reused as much as possible.
890			if !p.isHealthy(s) {
891				continue
892			}
893		} else {
894			// Idle list is empty, block if session pool has reached max session
895			// creation concurrency or max number of open sessions.
896			if (p.MaxOpened > 0 && p.numOpened >= p.MaxOpened) || (p.MaxBurst > 0 && p.createReqs >= p.MaxBurst) {
897				mayGetSession := p.mayGetSession
898				p.mu.Unlock()
899				trace.TracePrintf(ctx, nil, "Waiting for read-write session to become available")
900				select {
901				case <-ctx.Done():
902					trace.TracePrintf(ctx, nil, "Context done waiting for session")
903					return nil, p.errGetSessionTimeout()
904				case <-mayGetSession:
905				}
906				continue
907			}
908
909			// Take budget before the actual session creation.
910			p.numOpened++
911			// Creating a new session that will be returned directly to the client
912			// means that the max number of sessions in use also increases.
913			numCheckedOut := p.currSessionsCheckedOutLocked()
914			recordStat(ctx, OpenSessionCount, int64(p.numOpened))
915			p.createReqs++
916			p.mu.Unlock()
917			p.mw.updateMaxSessionsCheckedOutDuringWindow(numCheckedOut)
918			if s, err = p.createSession(ctx); err != nil {
919				trace.TracePrintf(ctx, nil, "Error creating session: %v", err)
920				return nil, toSpannerError(err)
921			}
922			trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
923				"Created session")
924		}
925		if !s.isWritePrepared() {
926			if err = s.prepareForWrite(ctx); err != nil {
927				if isSessionNotFoundError(err) {
928					s.destroy(false)
929					trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
930						"Session not found for write")
931					return nil, toSpannerError(err)
932				}
933
934				s.recycle()
935				trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
936					"Error preparing session for write")
937				return nil, toSpannerError(err)
938			}
939		}
940		return p.newSessionHandle(s), nil
941	}
942}
943
// recycle puts session s back into the session pool's idle list. It returns
// true if the session pool successfully recycles session s.
946func (p *sessionPool) recycle(s *session) bool {
947	p.mu.Lock()
948	defer p.mu.Unlock()
949	if !s.isValid() || !p.valid {
950		// Reject the session if session is invalid or pool itself is invalid.
951		return false
952	}
953	// Put session at the top of the list to be handed out in LIFO order for load balancing
954	// across channels.
955	if s.isWritePrepared() {
956		s.setIdleList(p.idleWriteList.PushFront(s))
957	} else {
958		s.setIdleList(p.idleList.PushFront(s))
959	}
960	// Broadcast that a session has been returned to idle list.
961	close(p.mayGetSession)
962	p.mayGetSession = make(chan struct{})
963	return true
964}
965
966// remove atomically removes session s from the session pool and invalidates s.
967// If isExpire == true, the removal is triggered by session expiration and in
968// such cases, only idle sessions can be removed.
969func (p *sessionPool) remove(s *session, isExpire bool) bool {
970	p.mu.Lock()
971	defer p.mu.Unlock()
972	if isExpire && (p.numOpened <= p.MinOpened || s.getIdleList() == nil) {
973		// Don't expire session if the session is not in idle list (in use), or
974		// if number of open sessions is going below p.MinOpened.
975		return false
976	}
977	ol := s.setIdleList(nil)
978	// If the session is in the idlelist, remove it.
979	if ol != nil {
980		// Remove from whichever list it is in.
981		p.idleList.Remove(ol)
982		p.idleWriteList.Remove(ol)
983	}
984	if s.invalidate() {
985		// Decrease the number of opened sessions.
986		p.numOpened--
987		recordStat(context.Background(), OpenSessionCount, int64(p.numOpened))
988		// Broadcast that a session has been destroyed.
989		close(p.mayGetSession)
990		p.mayGetSession = make(chan struct{})
991		return true
992	}
993	return false
994}
995
996func (p *sessionPool) currSessionsCheckedOutLocked() uint64 {
997	return p.numOpened - uint64(p.idleList.Len()) - uint64(p.idleWriteList.Len())
998}
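// For example (hypothetical numbers): with numOpened = 300, 120 sessions in
// idleList and 30 in idleWriteList, 300-120-30 = 150 sessions are currently
// checked out of the pool.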
999
1000// hcHeap implements heap.Interface. It is used to create the priority queue for
1001// session healthchecks.
1002type hcHeap struct {
1003	sessions []*session
1004}
1005
1006// Len implements heap.Interface.Len.
1007func (h hcHeap) Len() int {
1008	return len(h.sessions)
1009}
1010
1011// Less implements heap.Interface.Less.
1012func (h hcHeap) Less(i, j int) bool {
1013	return h.sessions[i].getNextCheck().Before(h.sessions[j].getNextCheck())
1014}
1015
1016// Swap implements heap.Interface.Swap.
1017func (h hcHeap) Swap(i, j int) {
1018	h.sessions[i], h.sessions[j] = h.sessions[j], h.sessions[i]
1019	h.sessions[i].setHcIndex(i)
1020	h.sessions[j].setHcIndex(j)
1021}
1022
1023// Push implements heap.Interface.Push.
1024func (h *hcHeap) Push(s interface{}) {
1025	ns := s.(*session)
1026	ns.setHcIndex(len(h.sessions))
1027	h.sessions = append(h.sessions, ns)
1028}
1029
1030// Pop implements heap.Interface.Pop.
1031func (h *hcHeap) Pop() interface{} {
1032	old := h.sessions
1033	n := len(old)
1034	s := old[n-1]
1035	h.sessions = old[:n-1]
1036	s.setHcIndex(-1)
1037	return s
1038}
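// hcHeap is only manipulated through the container/heap package, so that
// queue.sessions[0] is always the session with the earliest nextCheck. A
// sketch of the operations used elsewhere in this file:
//
//	heap.Push(&hc.queue, s)             // register: add a session to the queue
//	heap.Fix(&hc.queue, s.getHcIndex()) // scheduledHCLocked: nextCheck changed
//	heap.Remove(&hc.queue, oi)          // unregister: drop session at old index
//	next := hc.queue.sessions[0]        // worker: peek at the session due first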
1039
1040// maintenanceWindowSize specifies the number of health check cycles that
1041// defines a maintenance window. The maintenance window keeps track of a
1042// rolling set of numbers for the number of maximum checked out sessions during
1043// the maintenance window. This is used by the maintainer to determine the
1044// number of sessions to create or delete at the end of each health check
1045// cycle.
1046const maintenanceWindowSize = 10
1047
1048// maintenanceWindow contains the statistics that are gathered during a health
1049// check maintenance window.
1050type maintenanceWindow struct {
1051	mu sync.Mutex
1052	// maxSessionsCheckedOut contains the maximum number of sessions that was
1053	// checked out of the session pool during a health check cycle. This number
1054	// indicates the number of sessions that was actually needed by the pool to
1055	// serve the load during that cycle. The values are kept as a rolling set
1056	// containing the values for the past 10 cycles (minutes). The maintainer
1057	// uses these values to determine the number of sessions to keep at the end
1058	// of each cycle.
1059	maxSessionsCheckedOut [maintenanceWindowSize]uint64
1060}
1061
1062// maxSessionsCheckedOutDuringWindow returns the maximum number of sessions
1063// that has been checked out during the last maintenance window of 10 cycles
1064// (minutes).
1065func (mw *maintenanceWindow) maxSessionsCheckedOutDuringWindow() uint64 {
1066	mw.mu.Lock()
1067	defer mw.mu.Unlock()
1068	var max uint64
1069	for _, cycleMax := range mw.maxSessionsCheckedOut {
1070		max = maxUint64(max, cycleMax)
1071	}
1072	return max
1073}
1074
1075// updateMaxSessionsCheckedOutDuringWindow updates the maximum number of
1076// sessions that has been checked out of the pool during the current
1077// cycle of the maintenance window. A maintenance window consists of 10
1078// maintenance cycles. Each cycle keeps track of the max number of sessions in
1079// use during that cycle. The rolling maintenance window of 10 cycles is used
1080// to determine the number of sessions to keep at the end of a cycle by
1081// calculating the max in use during the last 10 cycles.
1082func (mw *maintenanceWindow) updateMaxSessionsCheckedOutDuringWindow(currNumSessionsCheckedOut uint64) {
1083	mw.mu.Lock()
1084	defer mw.mu.Unlock()
1085	mw.maxSessionsCheckedOut[0] = maxUint64(currNumSessionsCheckedOut, mw.maxSessionsCheckedOut[0])
1086}
1087
1088// startNewCycle starts a new health check cycle with the specified number of
1089// checked out sessions as its initial value.
1090func (mw *maintenanceWindow) startNewCycle(currNumSessionsCheckedOut uint64) {
1091	mw.mu.Lock()
1092	defer mw.mu.Unlock()
1093	copy(mw.maxSessionsCheckedOut[1:], mw.maxSessionsCheckedOut[:9])
1094	mw.maxSessionsCheckedOut[0] = currNumSessionsCheckedOut
1095}
1096
1097// newMaintenanceWindow creates a new maintenance window with all values for
1098// maxSessionsCheckedOut set to maxOpened. This ensures that a complete
1099// maintenance window must pass before the maintainer will start to delete any
1100// sessions.
1101func newMaintenanceWindow(maxOpened uint64) *maintenanceWindow {
1102	mw := &maintenanceWindow{}
1103	// Initialize the rolling window with max values to prevent the maintainer
1104	// from deleting sessions before a complete window of 10 cycles has
1105	// finished.
1106	for i := 0; i < maintenanceWindowSize; i++ {
1107		mw.maxSessionsCheckedOut[i] = maxOpened
1108	}
1109	return mw
1110}
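// A small worked example of the rolling window (hypothetical values): with a
// window of [5, 8, 3, ...], updateMaxSessionsCheckedOutDuringWindow(7) turns
// slot 0 into max(5, 7) = 7, and a later startNewCycle(2) shifts the window
// to [2, 7, 8, 3, ...], dropping the oldest value.
// maxSessionsCheckedOutDuringWindow then reports the maximum over all 10 slots.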
1111
1112// healthChecker performs periodical healthchecks on registered sessions.
1113type healthChecker struct {
1114	// mu protects concurrent access to healthChecker.
1115	mu sync.Mutex
1116	// queue is the priority queue for session healthchecks. Sessions with lower
1117	// nextCheck rank higher in the queue.
1118	queue hcHeap
1119	// interval is the average interval between two healthchecks on a session.
1120	interval time.Duration
1121	// workers is the number of concurrent healthcheck workers.
1122	workers int
1123	// waitWorkers waits for all healthcheck workers to exit
1124	waitWorkers sync.WaitGroup
1125	// pool is the underlying session pool.
1126	pool *sessionPool
1127	// sampleInterval is the interval of sampling by the maintainer.
1128	sampleInterval time.Duration
1129	// ready is used to signal that maintainer can start running.
1130	ready chan struct{}
1131	// done is used to signal that health checker should be closed.
1132	done chan struct{}
1133	// once is used for closing channel done only once.
1134	once             sync.Once
1135	maintainerCancel func()
1136}
1137
1138// newHealthChecker initializes new instance of healthChecker.
1139func newHealthChecker(interval time.Duration, workers int, sampleInterval time.Duration, pool *sessionPool) *healthChecker {
1140	if workers <= 0 {
1141		workers = 1
1142	}
1143	hc := &healthChecker{
1144		interval:         interval,
1145		workers:          workers,
1146		pool:             pool,
1147		sampleInterval:   sampleInterval,
1148		ready:            make(chan struct{}),
1149		done:             make(chan struct{}),
1150		maintainerCancel: func() {},
1151	}
1152	hc.waitWorkers.Add(1)
1153	go hc.maintainer()
1154	for i := 1; i <= hc.workers; i++ {
1155		hc.waitWorkers.Add(1)
1156		go hc.worker(i)
1157	}
1158	return hc
1159}
1160
1161// close closes the healthChecker and waits for all healthcheck workers to exit.
1162func (hc *healthChecker) close() {
1163	hc.mu.Lock()
1164	hc.maintainerCancel()
1165	hc.mu.Unlock()
1166	hc.once.Do(func() { close(hc.done) })
1167	hc.waitWorkers.Wait()
1168}
1169
1170// isClosing checks if a healthChecker is already closing.
1171func (hc *healthChecker) isClosing() bool {
1172	select {
1173	case <-hc.done:
1174		return true
1175	default:
1176		return false
1177	}
1178}
1179
1180// getInterval gets the healthcheck interval.
1181func (hc *healthChecker) getInterval() time.Duration {
1182	hc.mu.Lock()
1183	defer hc.mu.Unlock()
1184	return hc.interval
1185}
1186
1187// scheduledHCLocked schedules next healthcheck on session s with the assumption
1188// that hc.mu is being held.
1189func (hc *healthChecker) scheduledHCLocked(s *session) {
	// The next healthcheck will be scheduled after a random delay in the range
	// [interval*0.5, interval*1.5); e.g. with a 5 minute interval the next
	// check happens between 2.5 and 7.5 minutes from now.
1192	nsFromNow := rand.Int63n(int64(hc.interval)) + int64(hc.interval)/2
1193	s.setNextCheck(time.Now().Add(time.Duration(nsFromNow)))
1194	if hi := s.getHcIndex(); hi != -1 {
1195		// Session is still being tracked by healthcheck workers.
1196		heap.Fix(&hc.queue, hi)
1197	}
1198}
1199
1200// scheduledHC schedules next healthcheck on session s. It is safe to be called
1201// concurrently.
1202func (hc *healthChecker) scheduledHC(s *session) {
1203	hc.mu.Lock()
1204	defer hc.mu.Unlock()
1205	hc.scheduledHCLocked(s)
1206}
1207
1208// register registers a session with healthChecker for periodical healthcheck.
1209func (hc *healthChecker) register(s *session) {
1210	hc.mu.Lock()
1211	defer hc.mu.Unlock()
1212	hc.scheduledHCLocked(s)
1213	heap.Push(&hc.queue, s)
1214}
1215
1216// unregister unregisters a session from healthcheck queue.
1217func (hc *healthChecker) unregister(s *session) {
1218	hc.mu.Lock()
1219	defer hc.mu.Unlock()
1220	oi := s.setHcIndex(-1)
1221	if oi >= 0 {
1222		heap.Remove(&hc.queue, oi)
1223	}
1224}
1225
1226// markDone marks that health check for session has been performed.
1227func (hc *healthChecker) markDone(s *session) {
1228	hc.mu.Lock()
1229	defer hc.mu.Unlock()
1230	s.checkingHealth = false
1231}
1232
1233// healthCheck checks the health of the session and pings it if needed.
1234func (hc *healthChecker) healthCheck(s *session) {
1235	defer hc.markDone(s)
1236	if !s.pool.isValid() {
1237		// Session pool is closed, perform a garbage collection.
1238		s.destroy(false)
1239		return
1240	}
1241	if err := s.ping(); isSessionNotFoundError(err) {
1242		// Ping failed, destroy the session.
1243		s.destroy(false)
1244	}
1245}
1246
1247// worker performs the healthcheck on sessions in healthChecker's priority
1248// queue.
1249func (hc *healthChecker) worker(i int) {
1250	// Returns a session which we should ping to keep it alive.
1251	getNextForPing := func() *session {
1252		hc.pool.mu.Lock()
1253		defer hc.pool.mu.Unlock()
1254		hc.mu.Lock()
1255		defer hc.mu.Unlock()
1256		if hc.queue.Len() <= 0 {
1257			// Queue is empty.
1258			return nil
1259		}
1260		s := hc.queue.sessions[0]
1261		if s.getNextCheck().After(time.Now()) && hc.pool.valid {
1262			// All sessions have been checked recently.
1263			return nil
1264		}
1265		hc.scheduledHCLocked(s)
1266		if !s.checkingHealth {
1267			s.checkingHealth = true
1268			return s
1269		}
1270		return nil
1271	}
1272
1273	// Returns a session which we should prepare for write.
1274	getNextForTx := func() *session {
1275		hc.pool.mu.Lock()
1276		defer hc.pool.mu.Unlock()
1277		if hc.pool.shouldPrepareWriteLocked() {
1278			if hc.pool.idleList.Len() > 0 && hc.pool.valid {
1279				hc.mu.Lock()
1280				defer hc.mu.Unlock()
1281				if hc.pool.idleList.Front().Value.(*session).checkingHealth {
1282					return nil
1283				}
1284				session := hc.pool.idleList.Remove(hc.pool.idleList.Front()).(*session)
1285				session.checkingHealth = true
1286				hc.pool.prepareReqs++
1287				return session
1288			}
1289		}
1290		return nil
1291	}
1292
1293	for {
1294		if hc.isClosing() {
1295			// Exit when the pool has been closed and all sessions have been
1296			// destroyed or when health checker has been closed.
1297			hc.waitWorkers.Done()
1298			return
1299		}
1300		ws := getNextForTx()
1301		if ws != nil {
1302			ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
1303			err := ws.prepareForWrite(ctx)
1304			cancel()
1305			if err != nil {
1306				// Skip handling prepare error, session can be prepared in next
1307				// cycle.
1308				// Don't log about permission errors, which may be expected
1309				// (e.g. using read-only auth).
1310				serr := toSpannerError(err).(*Error)
1311				if serr.Code != codes.PermissionDenied {
1312					logf(hc.pool.sc.logger, "Failed to prepare session, error: %v", serr)
1313				}
1314			}
1315			hc.pool.recycle(ws)
1316			hc.pool.mu.Lock()
1317			hc.pool.prepareReqs--
1318			hc.pool.mu.Unlock()
1319			hc.markDone(ws)
1320		}
1321		rs := getNextForPing()
1322		if rs == nil {
1323			if ws == nil {
1324				// No work to be done so sleep to avoid burning CPU.
1325				pause := int64(100 * time.Millisecond)
1326				if pause > int64(hc.interval) {
1327					pause = int64(hc.interval)
1328				}
1329				select {
1330				case <-time.After(time.Duration(rand.Int63n(pause) + pause/2)):
1331				case <-hc.done:
1332				}
1333
1334			}
1335			continue
1336		}
1337		hc.healthCheck(rs)
1338	}
1339}
1340
1341// maintainer maintains the number of sessions in the pool based on the session
1342// pool configuration and the current and historical number of sessions checked
1343// out of the pool. The maintainer will:
1344// 1. Ensure that the session pool contains at least MinOpened sessions.
1345// 2. If the current number of sessions in the pool exceeds the greatest number
1346//    of checked out sessions (=sessions in use) during the last 10 minutes,
//    and the delta is larger than MaxIdle, the maintainer will reduce the
//    number of sessions to maxSessionsInUseDuringWindow+MaxIdle.
1349func (hc *healthChecker) maintainer() {
1350	// Wait until the pool is ready.
1351	<-hc.ready
1352
1353	for iteration := uint64(0); ; iteration++ {
1354		if hc.isClosing() {
1355			hc.waitWorkers.Done()
1356			return
1357		}
1358
1359		hc.pool.mu.Lock()
1360		currSessionsOpened := hc.pool.numOpened
1361		maxIdle := hc.pool.MaxIdle
1362		minOpened := hc.pool.MinOpened
1363		hc.pool.mu.Unlock()
1364		// Get the maximum number of sessions in use during the current
1365		// maintenance window.
1366		maxSessionsInUseDuringWindow := hc.pool.mw.maxSessionsCheckedOutDuringWindow()
1367		hc.mu.Lock()
1368		ctx, cancel := context.WithTimeout(context.Background(), hc.sampleInterval)
1369		hc.maintainerCancel = cancel
1370		hc.mu.Unlock()
1371
1372		// Grow or shrink pool if needed.
1373		// The number of sessions in the pool should be in the range
1374		// [Config.MinOpened, Config.MaxIdle+maxSessionsInUseDuringWindow]
1375		if currSessionsOpened < minOpened {
1376			hc.growPool(ctx, minOpened)
1377		} else if maxIdle+maxSessionsInUseDuringWindow < currSessionsOpened {
1378			hc.shrinkPool(ctx, maxIdle+maxSessionsInUseDuringWindow)
1379		}
1380
1381		select {
1382		case <-ctx.Done():
1383		case <-hc.done:
1384			cancel()
1385		}
1386		// Cycle the maintenance window. This will remove the oldest cycle and
1387		// add a new cycle at the beginning of the maintenance window with the
1388		// currently checked out number of sessions as the max number of
1389		// sessions in use in this cycle. This value will be increased during
1390		// the next cycle if it increases.
1391		hc.pool.mu.Lock()
1392		currSessionsInUse := hc.pool.currSessionsCheckedOutLocked()
1393		hc.pool.mu.Unlock()
1394		hc.pool.mw.startNewCycle(currSessionsInUse)
1395	}
1396}
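// A worked example of the sizing decision above (hypothetical numbers): with
// MinOpened = 100, MaxIdle = 20, numOpened = 300 and at most 150 sessions in
// use during the last window, 150+20 = 170 < 300, so the pool is shrunk
// towards 170 sessions. If instead numOpened were 80, the pool would be grown
// back to MinOpened = 100. shrinkPool only deletes idle sessions, and remove()
// refuses to expire sessions once numOpened is at or below MinOpened.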
1397
// growPool grows the number of sessions in the pool to the specified number of
// sessions. The context that is passed in times out after sampleInterval.
1400func (hc *healthChecker) growPool(ctx context.Context, growToNumSessions uint64) {
1401	// Calculate the max number of sessions to create as a safeguard against
1402	// other processes that could be deleting sessions concurrently.
1403	hc.pool.mu.Lock()
1404	maxSessionsToCreate := int(growToNumSessions - hc.pool.numOpened)
1405	hc.pool.mu.Unlock()
1406	var created int
1407	for {
1408		if ctx.Err() != nil {
1409			return
1410		}
1411
1412		p := hc.pool
1413		p.mu.Lock()
1414		// Take budget before the actual session creation.
1415		if growToNumSessions <= p.numOpened || created >= maxSessionsToCreate {
1416			p.mu.Unlock()
1417			break
1418		}
1419		p.numOpened++
1420		recordStat(ctx, OpenSessionCount, int64(p.numOpened))
1421		p.createReqs++
1422		shouldPrepareWrite := p.shouldPrepareWriteLocked()
1423		p.mu.Unlock()
1424		var (
1425			s   *session
1426			err error
1427		)
1428		createContext, cancel := context.WithTimeout(context.Background(), time.Minute)
1429		if s, err = p.createSession(createContext); err != nil {
1430			cancel()
1431			logf(p.sc.logger, "Failed to create session, error: %v", toSpannerError(err))
1432			continue
1433		}
1434		cancel()
1435		created++
1436		if shouldPrepareWrite {
1437			prepareContext, cancel := context.WithTimeout(context.Background(), time.Minute)
1438			if err = s.prepareForWrite(prepareContext); err != nil {
1439				cancel()
1440				p.recycle(s)
1441				// Don't log about permission errors, which may be expected
1442				// (e.g. using read-only auth).
1443				serr := toSpannerError(err).(*Error)
1444				if serr.Code != codes.PermissionDenied {
1445					logf(p.sc.logger, "Failed to prepare session, error: %v", serr)
1446				}
1447				continue
1448			}
1449			cancel()
1450		}
1451		p.recycle(s)
1452	}
1453}
1454
// shrinkPool scales down the session pool. The method will stop deleting
// sessions when the number of sessions in the pool has been reduced to
// shrinkToNumSessions. The method will also stop deleting sessions if it
// detects that another process has started creating sessions for the pool
// again, for example through the take() method.
1460func (hc *healthChecker) shrinkPool(ctx context.Context, shrinkToNumSessions uint64) {
1461	hc.pool.mu.Lock()
1462	maxSessionsToDelete := int(hc.pool.numOpened - shrinkToNumSessions)
1463	hc.pool.mu.Unlock()
1464	var deleted int
1465	var prevNumOpened uint64 = math.MaxUint64
1466	for {
1467		if ctx.Err() != nil {
1468			return
1469		}
1470
1471		p := hc.pool
1472		p.mu.Lock()
1473		// Check if the number of open sessions has increased. If it has, we
1474		// should stop deleting sessions, as the load has increased and
1475		// additional sessions are needed.
1476		if p.numOpened >= prevNumOpened {
1477			p.mu.Unlock()
1478			break
1479		}
1480		prevNumOpened = p.numOpened
1481
1482		// Check on both whether we have reached the number of open sessions as
1483		// well as the number of sessions to delete, in case sessions have been
1484		// deleted by other methods because they have expired or deemed
1485		// invalid.
1486		if shrinkToNumSessions >= p.numOpened || deleted >= maxSessionsToDelete {
1487			p.mu.Unlock()
1488			break
1489		}
1490
1491		var s *session
1492		if p.idleList.Len() > 0 {
1493			s = p.idleList.Front().Value.(*session)
1494		} else if p.idleWriteList.Len() > 0 {
1495			s = p.idleWriteList.Front().Value.(*session)
1496		}
1497		p.mu.Unlock()
1498		if s != nil {
1499			deleted++
			// Destroy the session as expired.
1501			s.destroy(true)
1502		} else {
1503			break
1504		}
1505	}
1506}
1507
1508// maxUint64 returns the maximum of two uint64.
1509func maxUint64(a, b uint64) uint64 {
1510	if a > b {
1511		return a
1512	}
1513	return b
1514}
1515
1516// minUint64 returns the minimum of two uint64.
1517func minUint64(a, b uint64) uint64 {
1518	if a > b {
1519		return b
1520	}
1521	return a
1522}
1523
1524// sessionResourceType is the type name of Spanner sessions.
1525const sessionResourceType = "type.googleapis.com/google.spanner.v1.Session"
1526
1527// isSessionNotFoundError returns true if the given error is a
1528// `Session not found` error.
1529func isSessionNotFoundError(err error) bool {
1530	if err == nil {
1531		return false
1532	}
1533	if ErrCode(err) == codes.NotFound {
1534		if rt, ok := extractResourceType(err); ok {
1535			return rt == sessionResourceType
1536		}
1537	}
1538	return false
1539}
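// Callers use isSessionNotFoundError to decide between recycling and
// destroying a session; a sketch based on takeWriteSession above:
//
//	if err := s.prepareForWrite(ctx); err != nil {
//		if isSessionNotFoundError(err) {
//			s.destroy(false) // the session no longer exists server side
//		} else {
//			s.recycle() // transient failure; return the session to the pool
//		}
//	}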
1540