/*
Copyright 2017 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package spanner

import (
	"container/heap"
	"container/list"
	"context"
	"fmt"
	"log"
	"math"
	"math/rand"
	"runtime/debug"
	"sync"
	"time"

	"cloud.google.com/go/internal/trace"
	"cloud.google.com/go/internal/version"
	vkit "cloud.google.com/go/spanner/apiv1"
	"go.opencensus.io/stats"
	"go.opencensus.io/tag"
	sppb "google.golang.org/genproto/googleapis/spanner/v1"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/metadata"
)

const healthCheckIntervalMins = 50

// sessionHandle is an interface for transactions to access Cloud Spanner
// sessions safely. It is generated by sessionPool.take().
type sessionHandle struct {
	// mu guarantees that the inner session object is returned / destroyed only
	// once.
	mu sync.Mutex
	// session is a pointer to a session object. Transactions never need to
	// access it directly.
	session *session
	// checkoutTime is the time the session was checked out of the pool.
	checkoutTime time.Time
	// trackedSessionHandle is the linked list node which links the session to
	// the list of tracked session handles. trackedSessionHandle is only set if
	// TrackSessionHandles has been enabled in the session pool configuration.
	trackedSessionHandle *list.Element
	// stack is the call stack of the goroutine that checked out the session
	// from the pool. This can be used to track down session leak problems.
	stack []byte
}

// recycle gives the inner session object back to its home session pool. It is
// safe to call recycle multiple times; only the first call takes effect.
func (sh *sessionHandle) recycle() {
	sh.mu.Lock()
	if sh.session == nil {
		// sessionHandle has already been recycled.
		sh.mu.Unlock()
		return
	}
	p := sh.session.pool
	tracked := sh.trackedSessionHandle
	sh.session.recycle()
	sh.session = nil
	sh.trackedSessionHandle = nil
	sh.checkoutTime = time.Time{}
	sh.stack = nil
	sh.mu.Unlock()
	if tracked != nil {
		p.mu.Lock()
		p.trackedSessionHandles.Remove(tracked)
		p.mu.Unlock()
	}
}

// getID gets the Cloud Spanner session ID from the internal session object.
// getID returns an empty string if the sessionHandle is nil or the inner
// session object has been released by recycle / destroy.
func (sh *sessionHandle) getID() string {
	sh.mu.Lock()
	defer sh.mu.Unlock()
	if sh.session == nil {
		// sessionHandle has already been recycled/destroyed.
		return ""
	}
	return sh.session.getID()
}

// getClient gets the Cloud Spanner RPC client associated with the session ID
// in sessionHandle.
func (sh *sessionHandle) getClient() *vkit.Client {
	sh.mu.Lock()
	defer sh.mu.Unlock()
	if sh.session == nil {
		return nil
	}
	return sh.session.client
}

// getMetadata returns the metadata associated with the session in sessionHandle.
func (sh *sessionHandle) getMetadata() metadata.MD {
	sh.mu.Lock()
	defer sh.mu.Unlock()
	if sh.session == nil {
		return nil
	}
	return sh.session.md
}

// getTransactionID returns the transaction id in the session if available.
func (sh *sessionHandle) getTransactionID() transactionID {
	sh.mu.Lock()
	defer sh.mu.Unlock()
	if sh.session == nil {
		return nil
	}
	return sh.session.tx
}

// destroy destroys the inner session object. It is safe to call destroy
// multiple times; only the first call will attempt to destroy the inner
// session object.
func (sh *sessionHandle) destroy() {
	sh.mu.Lock()
	s := sh.session
	if s == nil {
		// sessionHandle has already been recycled.
		sh.mu.Unlock()
		return
	}
	tracked := sh.trackedSessionHandle
	sh.session = nil
	sh.trackedSessionHandle = nil
	sh.checkoutTime = time.Time{}
	sh.stack = nil
	sh.mu.Unlock()

	if tracked != nil {
		p := s.pool
		p.mu.Lock()
		p.trackedSessionHandles.Remove(tracked)
		p.mu.Unlock()
	}
	s.destroy(false)
}

// session wraps a Cloud Spanner session ID through which transactions are
// created and executed.
type session struct {
	// client is the RPC channel to Cloud Spanner. It is set only once during
	// session's creation.
	client *vkit.Client
	// id is the unique id of the session in Cloud Spanner. It is set only once
	// during session's creation.
	id string
	// pool is the session's home session pool where it was created. It is set
	// only once during session's creation.
	pool *sessionPool
	// createTime is the timestamp of the session's creation. It is set only
	// once during session's creation.
	createTime time.Time
	// logger is the logger configured for the Spanner client that created the
	// session. If nil, logging will be directed to the standard logger.
	logger *log.Logger

	// mu protects the following fields from concurrent access: both
	// healthcheck workers and transactions can modify them.
	mu sync.Mutex
	// valid marks the validity of a session.
	valid bool
	// hcIndex is the index of the session inside the global healthcheck queue.
	// If hcIndex < 0, session has been unregistered from the queue.
	hcIndex int
	// idleList is the linked list node which links the session to its home
	// session pool's idle list. If idleList == nil, the session is not in the
	// idle list.
	idleList *list.Element
	// nextCheck is the timestamp of next scheduled healthcheck of the session.
	// It is maintained by the global health checker.
	nextCheck time.Time
	// checkingHealth is true if the session is currently being processed by
	// the health checker. Must be modified under the health checker lock.
	checkingHealth bool
	// md is the Metadata to be sent with each request.
	md metadata.MD
	// tx contains the transaction id if the session has been prepared for
	// write.
	tx transactionID
	// firstHCDone indicates whether the first health check is done or not.
	firstHCDone bool
}

// isValid returns true if the session is still valid for use.
func (s *session) isValid() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.valid
}

// isWritePrepared returns true if the session is prepared for write.
func (s *session) isWritePrepared() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.tx != nil
}

// String implements fmt.Stringer for session.
func (s *session) String() string {
	s.mu.Lock()
	defer s.mu.Unlock()
	return fmt.Sprintf("<id=%v, hcIdx=%v, idleList=%p, valid=%v, create=%v, nextcheck=%v>",
		s.id, s.hcIndex, s.idleList, s.valid, s.createTime, s.nextCheck)
}

// ping verifies if the session is still alive in Cloud Spanner.
func (s *session) ping() error {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	// s.getID is safe even when s is invalid.
	_, err := s.client.GetSession(contextWithOutgoingMetadata(ctx, s.md), &sppb.GetSessionRequest{Name: s.getID()})
	return err
}

// setHcIndex atomically sets the session's index in the healthcheck queue and
// returns the old index.
func (s *session) setHcIndex(i int) int {
	s.mu.Lock()
	defer s.mu.Unlock()
	oi := s.hcIndex
	s.hcIndex = i
	return oi
}

// setIdleList atomically sets the session's idle list link and returns the old
// link.
func (s *session) setIdleList(le *list.Element) *list.Element {
	s.mu.Lock()
	defer s.mu.Unlock()
	old := s.idleList
	s.idleList = le
	return old
}

// invalidate marks a session as invalid and returns the old validity.
func (s *session) invalidate() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	ov := s.valid
	s.valid = false
	return ov
}

// setNextCheck sets the timestamp for next healthcheck on the session.
func (s *session) setNextCheck(t time.Time) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.nextCheck = t
}

// setTransactionID sets the transaction id in the session.
func (s *session) setTransactionID(tx transactionID) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.tx = tx
}

// getID returns the session ID which uniquely identifies the session in Cloud
// Spanner.
func (s *session) getID() string {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.id
}

// getHcIndex returns the session's index into the global healthcheck priority
// queue.
func (s *session) getHcIndex() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.hcIndex
}

// getIdleList returns the session's link in its home session pool's idle list.
func (s *session) getIdleList() *list.Element {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.idleList
}

// getNextCheck returns the timestamp for next healthcheck on the session.
func (s *session) getNextCheck() time.Time {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.nextCheck
}

// recycle returns the session to its home session pool.
func (s *session) recycle() {
	s.setTransactionID(nil)
	if !s.pool.recycle(s) {
		// s is rejected by its home session pool because it expired and the
		// session pool currently has enough open sessions.
		s.destroy(false)
	}
}

// destroy removes the session from its home session pool, healthcheck queue
// and Cloud Spanner service.
func (s *session) destroy(isExpire bool) bool {
	// Remove s from session pool.
	if !s.pool.remove(s, isExpire) {
		return false
	}
	// Unregister s from healthcheck queue.
	s.pool.hc.unregister(s)
	// Remove s from Cloud Spanner service.
	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
	defer cancel()
	s.delete(ctx)
	return true
}

func (s *session) delete(ctx context.Context) {
	// Ignore the error because even if we fail to explicitly destroy the
	// session, it will be eventually garbage collected by Cloud Spanner.
	err := s.client.DeleteSession(ctx, &sppb.DeleteSessionRequest{Name: s.getID()})
	if err != nil {
		logf(s.logger, "Failed to delete session %v. Error: %v", s.getID(), err)
	}
}

// prepareForWrite prepares the session for write if it is not already in that
// state.
func (s *session) prepareForWrite(ctx context.Context) error {
	if s.isWritePrepared() {
		return nil
	}
	tx, err := beginTransaction(contextWithOutgoingMetadata(ctx, s.md), s.getID(), s.client)
	// Session not found should cause the session to be removed from the pool.
	if isSessionNotFoundError(err) {
		s.pool.remove(s, false)
		s.pool.hc.unregister(s)
		return err
	}
	// Enable/disable background preparing of write sessions depending on
	// whether the BeginTransaction call succeeded. This will prevent the
	// session pool workers from going into an infinite loop of trying to
	// prepare sessions. Any subsequent successful BeginTransaction call, for
	// example from takeWriteSession, will re-enable the background process.
	s.pool.mu.Lock()
	s.pool.disableBackgroundPrepareSessions = err != nil
	s.pool.mu.Unlock()
	if err != nil {
		return err
	}
	s.setTransactionID(tx)
	return nil
}

// SessionPoolConfig stores configurations of a session pool.
type SessionPoolConfig struct {
	// MaxOpened is the maximum number of opened sessions allowed by the session
	// pool. If the client tries to open a session and there are already
	// MaxOpened sessions, it will block until one becomes available or the
	// context passed to the client method is canceled or times out.
	//
	// Defaults to NumChannels * 100.
	MaxOpened uint64

	// MinOpened is the minimum number of opened sessions that the session pool
	// tries to maintain. The session pool won't continue to expire sessions if
	// the number of opened connections drops below MinOpened. However, if a
	// session is found to be broken, it will still be evicted from the session
	// pool, so it is possible that the number of opened sessions drops below
	// MinOpened.
	//
	// Defaults to 100.
	MinOpened uint64

	// MaxIdle is the maximum number of idle sessions that the pool is allowed
	// to keep.
	//
	// Defaults to 0.
	MaxIdle uint64

	// MaxBurst is the maximum number of concurrent session creation requests.
	//
	// Defaults to 10.
	MaxBurst uint64

	// WriteSessions is the fraction of sessions we try to keep prepared for
	// write.
	//
	// Defaults to 0.2.
	WriteSessions float64

	// HealthCheckWorkers is the number of workers used by the health checker
	// for this pool.
	//
	// Defaults to 10.
	HealthCheckWorkers int

	// HealthCheckInterval is how often the health checker pings a session.
	//
	// Defaults to 50m.
	HealthCheckInterval time.Duration

	// TrackSessionHandles determines whether the session pool will keep track
	// of the stacktrace of the goroutines that take sessions from the pool.
	// This setting can be used to track down session leak problems.
	//
	// Defaults to false.
	TrackSessionHandles bool

	// healthCheckSampleInterval is how often the health checker samples live
	// sessions (for use in maintaining session pool size).
	//
	// Defaults to 1m.
	healthCheckSampleInterval time.Duration

	// sessionLabels for the sessions created in the session pool.
	sessionLabels map[string]string
}

// DefaultSessionPoolConfig is the default configuration for the session pool
// that will be used for a Spanner client, unless the user supplies a specific
// session pool config.
var DefaultSessionPoolConfig = SessionPoolConfig{
	MinOpened:           100,
	MaxOpened:           numChannels * 100,
	MaxBurst:            10,
	WriteSessions:       0.2,
	HealthCheckWorkers:  10,
	HealthCheckInterval: healthCheckIntervalMins * time.Minute,
}
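
// exampleSessionPoolConfig is an illustrative sketch only and is not used by
// the library: it shows how a customized SessionPoolConfig could be built on
// top of the defaults. The specific field values are hypothetical examples,
// not recommendations.
var exampleSessionPoolConfig = SessionPoolConfig{
	MinOpened:           25,
	MaxOpened:           400,
	MaxIdle:             10,
	MaxBurst:            10,
	WriteSessions:       0.25,
	HealthCheckWorkers:  10,
	HealthCheckInterval: 30 * time.Minute,
	TrackSessionHandles: true,
}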

// errMinOpenedGTMaxOpened returns an error for SessionPoolConfig.MaxOpened <
// SessionPoolConfig.MinOpened when SessionPoolConfig.MaxOpened is set.
func errMinOpenedGTMaxOpened(maxOpened, minOpened uint64) error {
	return spannerErrorf(codes.InvalidArgument,
		"require SessionPoolConfig.MaxOpened >= SessionPoolConfig.MinOpened, got %d and %d", maxOpened, minOpened)
}

// errWriteFractionOutOfRange returns an error for
// SessionPoolConfig.WriteSessions < 0 or SessionPoolConfig.WriteSessions > 1.
func errWriteFractionOutOfRange(writeFraction float64) error {
	return spannerErrorf(codes.InvalidArgument,
		"require SessionPoolConfig.WriteSessions >= 0.0 && SessionPoolConfig.WriteSessions <= 1.0, got %.2f", writeFraction)
}

// errHealthCheckWorkersNegative returns an error for
// SessionPoolConfig.HealthCheckWorkers < 0.
func errHealthCheckWorkersNegative(workers int) error {
	return spannerErrorf(codes.InvalidArgument,
		"require SessionPoolConfig.HealthCheckWorkers >= 0, got %d", workers)
}

// errHealthCheckIntervalNegative returns an error for
// SessionPoolConfig.HealthCheckInterval < 0.
func errHealthCheckIntervalNegative(interval time.Duration) error {
	return spannerErrorf(codes.InvalidArgument,
		"require SessionPoolConfig.HealthCheckInterval >= 0, got %v", interval)
}

// validate verifies that the SessionPoolConfig is good for use.
func (spc *SessionPoolConfig) validate() error {
	if spc.MinOpened > spc.MaxOpened && spc.MaxOpened > 0 {
		return errMinOpenedGTMaxOpened(spc.MaxOpened, spc.MinOpened)
	}
	if spc.WriteSessions < 0.0 || spc.WriteSessions > 1.0 {
		return errWriteFractionOutOfRange(spc.WriteSessions)
	}
	if spc.HealthCheckWorkers < 0 {
		return errHealthCheckWorkersNegative(spc.HealthCheckWorkers)
	}
	if spc.HealthCheckInterval < 0 {
		return errHealthCheckIntervalNegative(spc.HealthCheckInterval)
	}
	return nil
}
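
// exampleInvalidSessionPoolConfig is an illustrative sketch (never called by
// the library) showing how validate rejects a configuration where MinOpened
// exceeds a non-zero MaxOpened.
func exampleInvalidSessionPoolConfig() error {
	cfg := SessionPoolConfig{MinOpened: 200, MaxOpened: 100}
	// Returns errMinOpenedGTMaxOpened(100, 200).
	return cfg.validate()
}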

// sessionPool creates and caches Cloud Spanner sessions.
type sessionPool struct {
	// mu protects sessionPool from concurrent access.
	mu sync.Mutex
	// valid marks the validity of the session pool.
	valid bool
	// sc is used to create the sessions for the pool.
	sc *sessionClient
	// trackedSessionHandles contains all session handles that have been
	// checked out of the pool. The list is only filled if TrackSessionHandles
	// has been enabled.
	trackedSessionHandles list.List
	// idleList caches idle session IDs. Session IDs in this list can be
	// allocated for use.
	idleList list.List
	// idleWriteList caches idle sessions which have been prepared for write.
	idleWriteList list.List
	// mayGetSession is for broadcasting that session retrieval/creation may
	// proceed.
	mayGetSession chan struct{}
	// numOpened is the total number of open sessions from the session pool.
	numOpened uint64
	// createReqs is the number of ongoing session creation requests.
	createReqs uint64
	// prepareReqs is the number of ongoing session preparation requests.
	prepareReqs uint64
	// disableBackgroundPrepareSessions indicates that the BeginTransaction
	// call for a read/write transaction failed with a permanent error, such as
	// PermissionDenied or `Database not found`. Further background calls to
	// prepare sessions will be disabled.
	disableBackgroundPrepareSessions bool
	// configuration of the session pool.
	SessionPoolConfig
	// hc is the health checker.
	hc *healthChecker
	// rand is a separately sourced random generator.
	rand *rand.Rand
	// numInUse is the number of sessions that are currently in use (checked out
	// from the session pool).
	numInUse uint64
	// maxNumInUse is the maximum number of sessions in use concurrently in the
	// current 10 minute interval.
	maxNumInUse uint64
	// lastResetTime is the start time of the window for recording maxNumInUse.
	lastResetTime time.Time

	// mw is the maintenance window containing statistics for the max number of
	// sessions checked out of the pool during the last 10 minutes.
	mw *maintenanceWindow

	// tagMap is a map of all tags that are associated with the emitted metrics.
	tagMap *tag.Map
}

// newSessionPool creates a new session pool.
func newSessionPool(sc *sessionClient, config SessionPoolConfig) (*sessionPool, error) {
	if err := config.validate(); err != nil {
		return nil, err
	}
	pool := &sessionPool{
		sc:                sc,
		valid:             true,
		mayGetSession:     make(chan struct{}),
		SessionPoolConfig: config,
		mw:                newMaintenanceWindow(config.MaxOpened),
		rand:              rand.New(rand.NewSource(time.Now().UnixNano())),
	}
	if config.HealthCheckWorkers == 0 {
		// With 10 workers and assuming average latency of 5ms for
		// BeginTransaction, we will be able to prepare 2000 tx/sec in advance.
		// If the rate of takeWriteSession is more than that, it will degrade to
		// doing BeginTransaction inline.
		//
		// TODO: consider resizing the worker pool dynamically according to the load.
		config.HealthCheckWorkers = 10
	}
	if config.HealthCheckInterval == 0 {
		config.HealthCheckInterval = healthCheckIntervalMins * time.Minute
	}
	if config.healthCheckSampleInterval == 0 {
		config.healthCheckSampleInterval = time.Minute
	}

	_, instance, database, err := parseDatabaseName(sc.database)
	if err != nil {
		return nil, err
	}
	// Errors should not prevent initializing the session pool.
	ctx, err := tag.New(context.Background(),
		tag.Upsert(tagClientID, sc.id),
		tag.Upsert(tagDatabase, database),
		tag.Upsert(tagInstance, instance),
		tag.Upsert(tagLibVersion, version.Repo),
	)
	if err != nil {
		logf(pool.sc.logger, "Failed to create tag map, error: %v", err)
	}
	pool.tagMap = tag.FromContext(ctx)

	// On a GCE VM within the same region, a healthcheck ping takes on average
	// 10ms to finish. Given a 5 minute interval and 10 healthcheck workers, a
	// healthChecker can effectively maintain
	// 100 checks_per_worker/sec * 10 workers * 300 seconds = 300K sessions.
	pool.hc = newHealthChecker(config.HealthCheckInterval, config.HealthCheckWorkers, config.healthCheckSampleInterval, pool)

	// First initialize the pool before we indicate that the healthchecker is
	// ready. This prevents the maintainer from starting before the pool has
	// been initialized, which means that we guarantee that the initial
	// sessions are created using BatchCreateSessions.
	if config.MinOpened > 0 {
		numSessions := minUint64(config.MinOpened, math.MaxInt32)
		if err := pool.initPool(int32(numSessions)); err != nil {
			return nil, err
		}
	}
	pool.recordStat(context.Background(), MaxAllowedSessionsCount, int64(config.MaxOpened))
	close(pool.hc.ready)
	return pool, nil
}

func (p *sessionPool) recordStat(ctx context.Context, m *stats.Int64Measure, n int64) {
	ctx = tag.NewContext(ctx, p.tagMap)
	recordStat(ctx, m, n)
}

func (p *sessionPool) initPool(numSessions int32) error {
	p.mu.Lock()
	// Take budget before the actual session creation.
	p.numOpened += uint64(numSessions)
	p.recordStat(context.Background(), OpenSessionCount, int64(p.numOpened))
	p.createReqs += uint64(numSessions)
	p.mu.Unlock()
	// Asynchronously create the initial sessions for the pool.
	return p.sc.batchCreateSessions(numSessions, p)
}

// sessionReady is executed by the SessionClient when a session has been
// created and is ready to use. This method will add the new session to the
// pool and decrease the number of sessions that are being created.
func (p *sessionPool) sessionReady(s *session) {
	p.mu.Lock()
	defer p.mu.Unlock()
	// Set this pool as the home pool of the session and register it with the
	// health checker.
	s.pool = p
	p.hc.register(s)
	p.createReqs--
	// Insert the session at a random position in the pool to prevent all
	// sessions affiliated with a channel from being placed sequentially in
	// the pool.
	if p.idleList.Len() > 0 {
		pos := rand.Intn(p.idleList.Len())
		before := p.idleList.Front()
		for i := 0; i < pos; i++ {
			before = before.Next()
		}
		s.setIdleList(p.idleList.InsertBefore(s, before))
	} else {
		s.setIdleList(p.idleList.PushBack(s))
	}
	// Notify other waiters blocking on session creation.
	close(p.mayGetSession)
	p.mayGetSession = make(chan struct{})
}

// sessionCreationFailed is called by the SessionClient when the creation of one
// or more requested sessions finished with an error. sessionCreationFailed will
// decrease the number of sessions being created and notify any waiters that
// the session creation failed.
func (p *sessionPool) sessionCreationFailed(err error, numSessions int32) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.createReqs -= uint64(numSessions)
	p.numOpened -= uint64(numSessions)
	p.recordStat(context.Background(), OpenSessionCount, int64(p.numOpened))
	// Notify other waiters blocking on session creation.
	close(p.mayGetSession)
	p.mayGetSession = make(chan struct{})
}

// isValid checks if the session pool is still valid.
func (p *sessionPool) isValid() bool {
	if p == nil {
		return false
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.valid
}

// close marks the session pool as closed.
func (p *sessionPool) close() {
	if p == nil {
		return
	}
	p.mu.Lock()
	if !p.valid {
		p.mu.Unlock()
		return
	}
	p.valid = false
	p.mu.Unlock()
	p.hc.close()
	// Destroy all the sessions.
	p.hc.mu.Lock()
	allSessions := make([]*session, len(p.hc.queue.sessions))
	copy(allSessions, p.hc.queue.sessions)
	p.hc.mu.Unlock()
	for _, s := range allSessions {
		s.destroy(false)
	}
}

// errInvalidSessionPool is the error for using an invalid session pool.
var errInvalidSessionPool = spannerErrorf(codes.InvalidArgument, "invalid session pool")

// errGetSessionTimeout is the error for a context timeout during
// sessionPool.take().
var errGetSessionTimeout = spannerErrorf(codes.Canceled, "timeout / context canceled during getting session")

// newSessionHandle creates a new session handle for the given session for this
// session pool. The session handle will also hold a copy of the current call
// stack if the session pool has been configured to track the call stacks of
// sessions being checked out of the pool.
func (p *sessionPool) newSessionHandle(s *session) (sh *sessionHandle) {
	sh = &sessionHandle{session: s, checkoutTime: time.Now()}
	if p.TrackSessionHandles {
		p.mu.Lock()
		sh.trackedSessionHandle = p.trackedSessionHandles.PushBack(sh)
		p.mu.Unlock()
		sh.stack = debug.Stack()
	}
	return sh
}

// errGetSessionTimeout returns an error for a context timeout during
// sessionPool.take().
func (p *sessionPool) errGetSessionTimeout() error {
	if p.TrackSessionHandles {
		return p.errGetSessionTimeoutWithTrackedSessionHandles()
	}
	return p.errGetBasicSessionTimeout()
}

// errGetBasicSessionTimeout returns an error for a context timeout during
// sessionPool.take() without any tracked sessionHandles.
func (p *sessionPool) errGetBasicSessionTimeout() error {
	return spannerErrorf(codes.Canceled, "timeout / context canceled during getting session.\n"+
		"Enable SessionPoolConfig.TrackSessionHandles if you suspect a session leak to get more information about the checked out sessions.")
}

// errGetSessionTimeoutWithTrackedSessionHandles returns an error for a context
// timeout during sessionPool.take() including a stacktrace of each checked out
// session handle.
func (p *sessionPool) errGetSessionTimeoutWithTrackedSessionHandles() error {
	err := spannerErrorf(codes.Canceled, "timeout / context canceled during getting session.")
	err.(*Error).additionalInformation = p.getTrackedSessionHandleStacksLocked()
	return err
}

// getTrackedSessionHandleStacksLocked returns a string containing the
// stacktrace of all currently checked out sessions of the pool. This method
// locks p.mu itself; the caller must not hold p.mu.
func (p *sessionPool) getTrackedSessionHandleStacksLocked() string {
	p.mu.Lock()
	defer p.mu.Unlock()
	stackTraces := ""
	i := 1
	element := p.trackedSessionHandles.Front()
	for element != nil {
		sh := element.Value.(*sessionHandle)
		sh.mu.Lock()
		if sh.stack != nil {
			stackTraces = fmt.Sprintf("%s\n\nSession %d checked out of pool at %s by goroutine:\n%s", stackTraces, i, sh.checkoutTime.Format(time.RFC3339), sh.stack)
		}
		sh.mu.Unlock()
		element = element.Next()
		i++
	}
	return stackTraces
}

// shouldPrepareWriteLocked returns true if we should prepare more sessions for write.
func (p *sessionPool) shouldPrepareWriteLocked() bool {
	return !p.disableBackgroundPrepareSessions && float64(p.numOpened)*p.WriteSessions > float64(p.idleWriteList.Len()+int(p.prepareReqs))
}

func (p *sessionPool) createSession(ctx context.Context) (*session, error) {
	trace.TracePrintf(ctx, nil, "Creating a new session")
	doneCreate := func(done bool) {
		p.mu.Lock()
		if !done {
			// Session creation failed, give budget back.
			p.numOpened--
			p.recordStat(ctx, OpenSessionCount, int64(p.numOpened))
		}
		p.createReqs--
		// Notify other waiters blocking on session creation.
		close(p.mayGetSession)
		p.mayGetSession = make(chan struct{})
		p.mu.Unlock()
	}
	s, err := p.sc.createSession(ctx)
	if err != nil {
		doneCreate(false)
		// Should return error directly because of the previous retries on
		// CreateSession RPC.
		// If the error is a timeout, there is a chance that the session was
		// created on the server but is not known to the session pool. This
		// session will then be garbage collected by the server after 1 hour.
		return nil, err
	}
	s.pool = p
	p.hc.register(s)
	doneCreate(true)
	return s, nil
}

func (p *sessionPool) isHealthy(s *session) bool {
	if s.getNextCheck().Add(2 * p.hc.getInterval()).Before(time.Now()) {
		// TODO: figure out if we need to schedule a new healthcheck worker here.
		if err := s.ping(); isSessionNotFoundError(err) {
			// The session is already bad, continue to fetch/create a new one.
			s.destroy(false)
			return false
		}
		p.hc.scheduledHC(s)
	}
	return true
}

// take returns a cached session if there are available ones; if there isn't
// any, it tries to allocate a new one. Session returned by take should be used
// for read operations.
func (p *sessionPool) take(ctx context.Context) (*sessionHandle, error) {
	trace.TracePrintf(ctx, nil, "Acquiring a read-only session")
	for {
		var (
			s   *session
			err error
		)

		p.mu.Lock()
		if !p.valid {
			p.mu.Unlock()
			return nil, errInvalidSessionPool
		}
		if p.idleList.Len() > 0 {
			// Idle sessions are available, get one from the top of the idle
			// list.
			s = p.idleList.Remove(p.idleList.Front()).(*session)
			trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
				"Acquired read-only session")
		} else if p.idleWriteList.Len() > 0 {
			s = p.idleWriteList.Remove(p.idleWriteList.Front()).(*session)
			trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
				"Acquired read-write session")
		}
		if s != nil {
			s.setIdleList(nil)
			numCheckedOut := p.currSessionsCheckedOutLocked()
			p.mu.Unlock()
			p.mw.updateMaxSessionsCheckedOutDuringWindow(numCheckedOut)
			// From here, the session is no longer in the idle list, so
			// healthcheck workers won't destroy it. If the healthcheck workers
			// failed to schedule a healthcheck for the session in time, do the
			// check here. Because a session check is still much cheaper than
			// session creation, sessions should be reused as much as possible.
			if !p.isHealthy(s) {
				continue
			}
			p.incNumInUse(ctx)
			return p.newSessionHandle(s), nil
		}

		// Idle list is empty, block if session pool has reached max session
		// creation concurrency or max number of open sessions.
		if (p.MaxOpened > 0 && p.numOpened >= p.MaxOpened) || (p.MaxBurst > 0 && p.createReqs >= p.MaxBurst) {
			mayGetSession := p.mayGetSession
			p.mu.Unlock()
			trace.TracePrintf(ctx, nil, "Waiting for read-only session to become available")
			select {
			case <-ctx.Done():
				trace.TracePrintf(ctx, nil, "Context done waiting for session")
				p.recordStat(ctx, GetSessionTimeoutsCount, 1)
				return nil, p.errGetSessionTimeout()
			case <-mayGetSession:
			}
			continue
		}

		// Take budget before the actual session creation.
		p.numOpened++
		// Creating a new session that will be returned directly to the client
		// means that the max number of sessions in use also increases.
		numCheckedOut := p.currSessionsCheckedOutLocked()
		p.recordStat(ctx, OpenSessionCount, int64(p.numOpened))
		p.createReqs++
		p.mu.Unlock()
		p.mw.updateMaxSessionsCheckedOutDuringWindow(numCheckedOut)
		if s, err = p.createSession(ctx); err != nil {
			trace.TracePrintf(ctx, nil, "Error creating session: %v", err)
			return nil, toSpannerError(err)
		}
		trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
			"Created session")
		p.incNumInUse(ctx)
		return p.newSessionHandle(s), nil
	}
}

// takeWriteSession returns a write prepared cached session if there are
// available ones; if there isn't any, it tries to allocate a new one. Session
// returned should be used for read/write transactions.
func (p *sessionPool) takeWriteSession(ctx context.Context) (*sessionHandle, error) {
	trace.TracePrintf(ctx, nil, "Acquiring a read-write session")
	for {
		var (
			s   *session
			err error
		)

		p.mu.Lock()
		if !p.valid {
			p.mu.Unlock()
			return nil, errInvalidSessionPool
		}
		if p.idleWriteList.Len() > 0 {
			// Idle sessions are available, get one from the top of the idle
			// list.
			s = p.idleWriteList.Remove(p.idleWriteList.Front()).(*session)
			trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()}, "Acquired read-write session")
		} else if p.idleList.Len() > 0 {
			s = p.idleList.Remove(p.idleList.Front()).(*session)
			trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()}, "Acquired read-only session")
		}
		if s != nil {
			s.setIdleList(nil)
			numCheckedOut := p.currSessionsCheckedOutLocked()
			p.mu.Unlock()
			p.mw.updateMaxSessionsCheckedOutDuringWindow(numCheckedOut)
			// From here, the session is no longer in the idle list, so
			// healthcheck workers won't destroy it. If the healthcheck workers
			// failed to schedule a healthcheck for the session in time, do the
			// check here. Because a session check is still much cheaper than
			// session creation, sessions should be reused as much as possible.
			if !p.isHealthy(s) {
				continue
			}
		} else {
			// Idle list is empty, block if session pool has reached max session
			// creation concurrency or max number of open sessions.
			if (p.MaxOpened > 0 && p.numOpened >= p.MaxOpened) || (p.MaxBurst > 0 && p.createReqs >= p.MaxBurst) {
				mayGetSession := p.mayGetSession
				p.mu.Unlock()
				trace.TracePrintf(ctx, nil, "Waiting for read-write session to become available")
				select {
				case <-ctx.Done():
					trace.TracePrintf(ctx, nil, "Context done waiting for session")
					p.recordStat(ctx, GetSessionTimeoutsCount, 1)
					return nil, p.errGetSessionTimeout()
				case <-mayGetSession:
				}
				continue
			}

			// Take budget before the actual session creation.
			p.numOpened++
			// Creating a new session that will be returned directly to the client
			// means that the max number of sessions in use also increases.
			numCheckedOut := p.currSessionsCheckedOutLocked()
			p.recordStat(ctx, OpenSessionCount, int64(p.numOpened))
			p.createReqs++
			p.mu.Unlock()
			p.mw.updateMaxSessionsCheckedOutDuringWindow(numCheckedOut)
			if s, err = p.createSession(ctx); err != nil {
				trace.TracePrintf(ctx, nil, "Error creating session: %v", err)
				return nil, toSpannerError(err)
			}
			trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
				"Created session")
		}
		if !s.isWritePrepared() {
			if err = s.prepareForWrite(ctx); err != nil {
				if isSessionNotFoundError(err) {
					s.destroy(false)
					trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
						"Session not found for write")
					return nil, toSpannerError(err)
				}

				s.recycle()
				trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
					"Error preparing session for write")
				return nil, toSpannerError(err)
			}
		}
		p.incNumInUse(ctx)
		return p.newSessionHandle(s), nil
	}
}

// recycle puts session s back to the session pool's idle list. It returns true
// if the session pool successfully recycles session s.
func (p *sessionPool) recycle(s *session) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	if !s.isValid() || !p.valid {
		// Reject the session if the session or the pool itself is invalid.
		return false
	}
	// Put the session at the top of the list to be handed out in LIFO order
	// for load balancing across channels.
	if s.isWritePrepared() {
		s.setIdleList(p.idleWriteList.PushFront(s))
	} else {
		s.setIdleList(p.idleList.PushFront(s))
	}
	// Broadcast that a session has been returned to the idle list.
	close(p.mayGetSession)
	p.mayGetSession = make(chan struct{})
	p.numInUse--
	p.recordStat(context.Background(), InUseSessionsCount, int64(p.numInUse))
	p.recordStat(context.Background(), ReleasedSessionsCount, 1)
	return true
}

// remove atomically removes session s from the session pool and invalidates s.
// If isExpire == true, the removal is triggered by session expiration and in
// such cases, only idle sessions can be removed.
func (p *sessionPool) remove(s *session, isExpire bool) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	if isExpire && (p.numOpened <= p.MinOpened || s.getIdleList() == nil) {
		// Don't expire the session if it is not in the idle list (in use), or
		// if the number of open sessions would drop below p.MinOpened.
		return false
	}
	ol := s.setIdleList(nil)
	// If the session is in the idle list, remove it.
	if ol != nil {
		// Remove from whichever list it is in.
		p.idleList.Remove(ol)
		p.idleWriteList.Remove(ol)
	}
	if s.invalidate() {
		// Decrease the number of opened sessions.
		p.numOpened--
		p.recordStat(context.Background(), OpenSessionCount, int64(p.numOpened))
		// Broadcast that a session has been destroyed.
		close(p.mayGetSession)
		p.mayGetSession = make(chan struct{})
		return true
	}
	return false
}

func (p *sessionPool) currSessionsCheckedOutLocked() uint64 {
	return p.numOpened - uint64(p.idleList.Len()) - uint64(p.idleWriteList.Len())
}

func (p *sessionPool) incNumInUse(ctx context.Context) {
	p.mu.Lock()
	p.incNumInUseLocked(ctx)
	p.mu.Unlock()
}

func (p *sessionPool) incNumInUseLocked(ctx context.Context) {
	p.numInUse++
	p.recordStat(ctx, InUseSessionsCount, int64(p.numInUse))
	p.recordStat(ctx, AcquiredSessionsCount, 1)
	if p.numInUse > p.maxNumInUse {
		p.maxNumInUse = p.numInUse
		p.recordStat(ctx, MaxInUseSessionsCount, int64(p.maxNumInUse))
	}
}

// hcHeap implements heap.Interface. It is used to create the priority queue for
// session healthchecks.
type hcHeap struct {
	sessions []*session
}

// Len implements heap.Interface.Len.
func (h hcHeap) Len() int {
	return len(h.sessions)
}

// Less implements heap.Interface.Less.
func (h hcHeap) Less(i, j int) bool {
	return h.sessions[i].getNextCheck().Before(h.sessions[j].getNextCheck())
}

// Swap implements heap.Interface.Swap.
func (h hcHeap) Swap(i, j int) {
	h.sessions[i], h.sessions[j] = h.sessions[j], h.sessions[i]
	h.sessions[i].setHcIndex(i)
	h.sessions[j].setHcIndex(j)
}

// Push implements heap.Interface.Push.
func (h *hcHeap) Push(s interface{}) {
	ns := s.(*session)
	ns.setHcIndex(len(h.sessions))
	h.sessions = append(h.sessions, ns)
}

// Pop implements heap.Interface.Pop.
func (h *hcHeap) Pop() interface{} {
	old := h.sessions
	n := len(old)
	s := old[n-1]
	h.sessions = old[:n-1]
	s.setHcIndex(-1)
	return s
}
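
// exampleHcHeapOrdering is an illustrative sketch (never called by the
// library) of how the healthcheck priority queue orders sessions: after
// pushing sessions with different nextCheck values, the session with the
// earliest nextCheck is always at the front of the queue.
func exampleHcHeapOrdering() *session {
	h := &hcHeap{}
	now := time.Now()
	for i := 3; i >= 1; i-- {
		s := &session{}
		s.setNextCheck(now.Add(time.Duration(i) * time.Minute))
		heap.Push(h, s)
	}
	// The earliest nextCheck (now+1m) surfaces at index 0.
	return h.sessions[0]
}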

// maintenanceWindowSize specifies the number of health check cycles that
// defines a maintenance window. The maintenance window keeps track of a
// rolling set of numbers for the number of maximum checked out sessions during
// the maintenance window. This is used by the maintainer to determine the
// number of sessions to create or delete at the end of each health check
// cycle.
const maintenanceWindowSize = 10

// maintenanceWindow contains the statistics that are gathered during a health
// check maintenance window.
type maintenanceWindow struct {
	mu sync.Mutex
	// maxSessionsCheckedOut contains the maximum number of sessions that were
	// checked out of the session pool during a health check cycle. This number
	// indicates the number of sessions that were actually needed by the pool to
	// serve the load during that cycle. The values are kept as a rolling set
	// containing the values for the past 10 cycles (minutes). The maintainer
	// uses these values to determine the number of sessions to keep at the end
	// of each cycle.
	maxSessionsCheckedOut [maintenanceWindowSize]uint64
}

// maxSessionsCheckedOutDuringWindow returns the maximum number of sessions
// that have been checked out during the last maintenance window of 10 cycles
// (minutes).
func (mw *maintenanceWindow) maxSessionsCheckedOutDuringWindow() uint64 {
	mw.mu.Lock()
	defer mw.mu.Unlock()
	var max uint64
	for _, cycleMax := range mw.maxSessionsCheckedOut {
		max = maxUint64(max, cycleMax)
	}
	return max
}

// updateMaxSessionsCheckedOutDuringWindow updates the maximum number of
// sessions that have been checked out of the pool during the current
// cycle of the maintenance window. A maintenance window consists of 10
// maintenance cycles. Each cycle keeps track of the max number of sessions in
// use during that cycle. The rolling maintenance window of 10 cycles is used
// to determine the number of sessions to keep at the end of a cycle by
// calculating the max in use during the last 10 cycles.
func (mw *maintenanceWindow) updateMaxSessionsCheckedOutDuringWindow(currNumSessionsCheckedOut uint64) {
	mw.mu.Lock()
	defer mw.mu.Unlock()
	mw.maxSessionsCheckedOut[0] = maxUint64(currNumSessionsCheckedOut, mw.maxSessionsCheckedOut[0])
}

// startNewCycle starts a new health check cycle with the specified number of
// checked out sessions as its initial value.
func (mw *maintenanceWindow) startNewCycle(currNumSessionsCheckedOut uint64) {
	mw.mu.Lock()
	defer mw.mu.Unlock()
	copy(mw.maxSessionsCheckedOut[1:], mw.maxSessionsCheckedOut[:9])
	mw.maxSessionsCheckedOut[0] = currNumSessionsCheckedOut
}

// newMaintenanceWindow creates a new maintenance window with all values for
// maxSessionsCheckedOut set to maxOpened. This ensures that a complete
// maintenance window must pass before the maintainer will start to delete any
// sessions.
func newMaintenanceWindow(maxOpened uint64) *maintenanceWindow {
	mw := &maintenanceWindow{}
	// Initialize the rolling window with max values to prevent the maintainer
	// from deleting sessions before a complete window of 10 cycles has
	// finished.
	for i := 0; i < maintenanceWindowSize; i++ {
		mw.maxSessionsCheckedOut[i] = maxOpened
	}
	return mw
}
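
// exampleMaintenanceWindowUsage is an illustrative sketch (never called by the
// library) of how the rolling maintenance window tracks the max number of
// checked-out sessions: updates only raise the current cycle's value, and
// startNewCycle shifts the window so older cycles eventually fall out of the
// 10-cycle history.
func exampleMaintenanceWindowUsage() uint64 {
	mw := newMaintenanceWindow(400)
	for i := 0; i < maintenanceWindowSize; i++ {
		// Shift in a new cycle; without this, the initial max values (400)
		// would still dominate the window.
		mw.startNewCycle(0)
	}
	mw.updateMaxSessionsCheckedOutDuringWindow(25)
	mw.updateMaxSessionsCheckedOutDuringWindow(10) // lower value is ignored
	return mw.maxSessionsCheckedOutDuringWindow()  // returns 25
}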

// healthChecker performs periodical healthchecks on registered sessions.
type healthChecker struct {
	// mu protects concurrent access to healthChecker.
	mu sync.Mutex
	// queue is the priority queue for session healthchecks. Sessions with lower
	// nextCheck rank higher in the queue.
	queue hcHeap
	// interval is the average interval between two healthchecks on a session.
	interval time.Duration
	// workers is the number of concurrent healthcheck workers.
	workers int
	// waitWorkers waits for all healthcheck workers to exit.
	waitWorkers sync.WaitGroup
	// pool is the underlying session pool.
	pool *sessionPool
	// sampleInterval is the interval of sampling by the maintainer.
	sampleInterval time.Duration
	// ready is used to signal that the maintainer can start running.
	ready chan struct{}
	// done is used to signal that the health checker should be closed.
	done chan struct{}
	// once is used for closing channel done only once.
	once             sync.Once
	maintainerCancel func()
}

// newHealthChecker initializes a new instance of healthChecker.
func newHealthChecker(interval time.Duration, workers int, sampleInterval time.Duration, pool *sessionPool) *healthChecker {
	if workers <= 0 {
		workers = 1
	}
	hc := &healthChecker{
		interval:         interval,
		workers:          workers,
		pool:             pool,
		sampleInterval:   sampleInterval,
		ready:            make(chan struct{}),
		done:             make(chan struct{}),
		maintainerCancel: func() {},
	}
	hc.waitWorkers.Add(1)
	go hc.maintainer()
	for i := 1; i <= hc.workers; i++ {
		hc.waitWorkers.Add(1)
		go hc.worker(i)
	}
	return hc
}

// close closes the healthChecker and waits for all healthcheck workers to exit.
func (hc *healthChecker) close() {
	hc.mu.Lock()
	hc.maintainerCancel()
	hc.mu.Unlock()
	hc.once.Do(func() { close(hc.done) })
	hc.waitWorkers.Wait()
}

// isClosing checks if a healthChecker is already closing.
func (hc *healthChecker) isClosing() bool {
	select {
	case <-hc.done:
		return true
	default:
		return false
	}
}

// getInterval gets the healthcheck interval.
func (hc *healthChecker) getInterval() time.Duration {
	hc.mu.Lock()
	defer hc.mu.Unlock()
	return hc.interval
}

// scheduledHCLocked schedules the next healthcheck on session s with the
// assumption that hc.mu is being held.
func (hc *healthChecker) scheduledHCLocked(s *session) {
	var constPart, randPart float64
	if !s.firstHCDone {
		// The first check will be scheduled in a large range to make requests
		// more evenly distributed. The first healthcheck will be scheduled
		// after [interval*0.2, interval*1.1) ns.
		constPart = float64(hc.interval) * 0.2
		randPart = hc.pool.rand.Float64() * float64(hc.interval) * 0.9
		s.firstHCDone = true
	} else {
		// The next healthcheck will be scheduled after
		// [interval*0.9, interval*1.1) ns.
		constPart = float64(hc.interval) * 0.9
		randPart = hc.pool.rand.Float64() * float64(hc.interval) * 0.2
	}
	// math.Ceil makes the value at least 1 ns.
	nsFromNow := int64(math.Ceil(constPart + randPart))
	s.setNextCheck(time.Now().Add(time.Duration(nsFromNow)))
	if hi := s.getHcIndex(); hi != -1 {
		// Session is still being tracked by healthcheck workers.
		heap.Fix(&hc.queue, hi)
	}
}

// scheduledHC schedules the next healthcheck on session s. It is safe to be called
// concurrently.
func (hc *healthChecker) scheduledHC(s *session) {
	hc.mu.Lock()
	defer hc.mu.Unlock()
	hc.scheduledHCLocked(s)
}

// register registers a session with healthChecker for periodical healthcheck.
func (hc *healthChecker) register(s *session) {
	hc.mu.Lock()
	defer hc.mu.Unlock()
	hc.scheduledHCLocked(s)
	heap.Push(&hc.queue, s)
}

// unregister unregisters a session from the healthcheck queue.
func (hc *healthChecker) unregister(s *session) {
	hc.mu.Lock()
	defer hc.mu.Unlock()
	oi := s.setHcIndex(-1)
	if oi >= 0 {
		heap.Remove(&hc.queue, oi)
	}
}

// markDone marks that the health check for the session has been performed.
func (hc *healthChecker) markDone(s *session) {
	hc.mu.Lock()
	defer hc.mu.Unlock()
	s.checkingHealth = false
}

// healthCheck checks the health of the session and pings it if needed.
func (hc *healthChecker) healthCheck(s *session) {
	defer hc.markDone(s)
	if !s.pool.isValid() {
		// Session pool is closed, perform a garbage collection.
		s.destroy(false)
		return
	}
	if err := s.ping(); isSessionNotFoundError(err) {
		// Ping failed, destroy the session.
		s.destroy(false)
	}
}

// worker performs the healthcheck on sessions in healthChecker's priority
// queue.
func (hc *healthChecker) worker(i int) {
	// Returns a session which we should ping to keep it alive.
	getNextForPing := func() *session {
		hc.pool.mu.Lock()
		defer hc.pool.mu.Unlock()
		hc.mu.Lock()
		defer hc.mu.Unlock()
		if hc.queue.Len() <= 0 {
			// Queue is empty.
			return nil
		}
		s := hc.queue.sessions[0]
		if s.getNextCheck().After(time.Now()) && hc.pool.valid {
			// All sessions have been checked recently.
			return nil
		}
		hc.scheduledHCLocked(s)
		if !s.checkingHealth {
			s.checkingHealth = true
			return s
		}
		return nil
	}

	// Returns a session which we should prepare for write.
	getNextForTx := func() *session {
		hc.pool.mu.Lock()
		defer hc.pool.mu.Unlock()
		if hc.pool.shouldPrepareWriteLocked() {
			if hc.pool.idleList.Len() > 0 && hc.pool.valid {
				hc.mu.Lock()
				defer hc.mu.Unlock()
				if hc.pool.idleList.Front().Value.(*session).checkingHealth {
					return nil
				}
				session := hc.pool.idleList.Remove(hc.pool.idleList.Front()).(*session)
				session.checkingHealth = true
				hc.pool.prepareReqs++
				hc.pool.incNumInUseLocked(context.Background())
				return session
			}
		}
		return nil
	}

	for {
		if hc.isClosing() {
			// Exit when the pool has been closed and all sessions have been
			// destroyed or when health checker has been closed.
			hc.waitWorkers.Done()
			return
		}
		ws := getNextForTx()
		if ws != nil {
			ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
			err := ws.prepareForWrite(ctx)
			cancel()
			if err != nil {
				// Skip handling prepare error, session can be prepared in next
				// cycle.
				// Don't log about permission errors, which may be expected
				// (e.g. using read-only auth).
				serr := toSpannerError(err).(*Error)
				if serr.Code != codes.PermissionDenied {
					logf(hc.pool.sc.logger, "Failed to prepare session, error: %v", serr)
				}
			}
			hc.pool.recycle(ws)
			hc.pool.mu.Lock()
			hc.pool.prepareReqs--
			hc.pool.mu.Unlock()
			hc.markDone(ws)
		}
		rs := getNextForPing()
		if rs == nil {
			if ws == nil {
				// No work to be done so sleep to avoid burning CPU.
				pause := int64(100 * time.Millisecond)
				if pause > int64(hc.interval) {
					pause = int64(hc.interval)
				}
				select {
				case <-time.After(time.Duration(rand.Int63n(pause) + pause/2)):
				case <-hc.done:
				}

			}
			continue
		}
		hc.healthCheck(rs)
	}
}

// maintainer maintains the number of sessions in the pool based on the session
// pool configuration and the current and historical number of sessions checked
// out of the pool. The maintainer will:
// 1. Ensure that the session pool contains at least MinOpened sessions.
// 2. If the current number of sessions in the pool exceeds the greatest number
//    of checked out sessions (=sessions in use) during the last 10 minutes,
//    and the delta is larger than MaxIdle, the maintainer will reduce the
//    number of sessions to maxSessionsInUseDuringWindow+MaxIdle.
func (hc *healthChecker) maintainer() {
	// Wait until the pool is ready.
	<-hc.ready

	for iteration := uint64(0); ; iteration++ {
		if hc.isClosing() {
			hc.waitWorkers.Done()
			return
		}

		hc.pool.mu.Lock()
		currSessionsOpened := hc.pool.numOpened
		maxIdle := hc.pool.MaxIdle
		minOpened := hc.pool.MinOpened

		// Reset the start time for recording the maximum number of sessions
		// in the pool.
		now := time.Now()
		if now.After(hc.pool.lastResetTime.Add(10 * time.Minute)) {
			hc.pool.maxNumInUse = hc.pool.numInUse
			hc.pool.recordStat(context.Background(), MaxInUseSessionsCount, int64(hc.pool.maxNumInUse))
			hc.pool.lastResetTime = now
		}
		hc.pool.mu.Unlock()
		// Get the maximum number of sessions in use during the current
		// maintenance window.
		maxSessionsInUseDuringWindow := hc.pool.mw.maxSessionsCheckedOutDuringWindow()
		hc.mu.Lock()
		ctx, cancel := context.WithTimeout(context.Background(), hc.sampleInterval)
		hc.maintainerCancel = cancel
		hc.mu.Unlock()

		// Grow or shrink pool if needed.
		// The number of sessions in the pool should be in the range
		// [Config.MinOpened, Config.MaxIdle+maxSessionsInUseDuringWindow]
		if currSessionsOpened < minOpened {
			hc.growPool(ctx, minOpened)
		} else if maxIdle+maxSessionsInUseDuringWindow < currSessionsOpened {
			hc.shrinkPool(ctx, maxIdle+maxSessionsInUseDuringWindow)
		}

		select {
		case <-ctx.Done():
		case <-hc.done:
			cancel()
		}
		// Cycle the maintenance window. This will remove the oldest cycle and
		// add a new cycle at the beginning of the maintenance window with the
		// currently checked out number of sessions as the max number of
		// sessions in use in this cycle. This value will be increased during
		// the next cycle if it increases.
		hc.pool.mu.Lock()
		currSessionsInUse := hc.pool.currSessionsCheckedOutLocked()
		hc.pool.mu.Unlock()
		hc.pool.mw.startNewCycle(currSessionsInUse)
	}
}

// growPool grows the number of sessions in the pool to the specified number of
// sessions. It times out after sampleInterval.
func (hc *healthChecker) growPool(ctx context.Context, growToNumSessions uint64) {
	// Calculate the max number of sessions to create as a safeguard against
	// other processes that could be deleting sessions concurrently.
	hc.pool.mu.Lock()
	maxSessionsToCreate := int(growToNumSessions - hc.pool.numOpened)
	hc.pool.mu.Unlock()
	var created int
	for {
		if ctx.Err() != nil {
			return
		}

		p := hc.pool
		p.mu.Lock()
		// Take budget before the actual session creation.
		if growToNumSessions <= p.numOpened || created >= maxSessionsToCreate {
			p.mu.Unlock()
			break
		}
		p.numOpened++
		p.recordStat(ctx, OpenSessionCount, int64(p.numOpened))
		p.createReqs++
		shouldPrepareWrite := p.shouldPrepareWriteLocked()
		p.mu.Unlock()
		var (
			s   *session
			err error
		)
		createContext, cancel := context.WithTimeout(context.Background(), time.Minute)
		if s, err = p.createSession(createContext); err != nil {
			cancel()
			logf(p.sc.logger, "Failed to create session, error: %v", toSpannerError(err))
			continue
		}
		cancel()
		created++
		if shouldPrepareWrite {
			prepareContext, cancel := context.WithTimeout(context.Background(), time.Minute)
			if err = s.prepareForWrite(prepareContext); err != nil {
				cancel()
				p.recycle(s)
				// Don't log about permission errors, which may be expected
				// (e.g. using read-only auth).
				serr := toSpannerError(err).(*Error)
				if serr.Code != codes.PermissionDenied {
					logf(p.sc.logger, "Failed to prepare session, error: %v", serr)
				}
				continue
			}
			cancel()
		}
		p.recycle(s)
	}
}

// shrinkPool scales down the session pool. The method will stop deleting
// sessions when the pool has reached shrinkToNumSessions sessions. The method
// will also stop deleting sessions if it detects that another process has
// started creating sessions for the pool again, for example through the
// take() method.
func (hc *healthChecker) shrinkPool(ctx context.Context, shrinkToNumSessions uint64) {
	hc.pool.mu.Lock()
	maxSessionsToDelete := int(hc.pool.numOpened - shrinkToNumSessions)
	hc.pool.mu.Unlock()
	var deleted int
	var prevNumOpened uint64 = math.MaxUint64
	for {
		if ctx.Err() != nil {
			return
		}

		p := hc.pool
		p.mu.Lock()
		// Check if the number of open sessions has increased. If it has, we
		// should stop deleting sessions, as the load has increased and
		// additional sessions are needed.
		if p.numOpened >= prevNumOpened {
			p.mu.Unlock()
			break
		}
		prevNumOpened = p.numOpened

		// Check on both whether we have reached the number of open sessions as
		// well as the number of sessions to delete, in case sessions have been
		// deleted by other methods because they have expired or been deemed
		// invalid.
		if shrinkToNumSessions >= p.numOpened || deleted >= maxSessionsToDelete {
			p.mu.Unlock()
			break
		}

		var s *session
		if p.idleList.Len() > 0 {
			s = p.idleList.Front().Value.(*session)
		} else if p.idleWriteList.Len() > 0 {
			s = p.idleWriteList.Front().Value.(*session)
		}
		p.mu.Unlock()
		if s != nil {
			deleted++
			// Destroy the session as expired.
			s.destroy(true)
		} else {
			break
		}
	}
}

// maxUint64 returns the maximum of two uint64.
func maxUint64(a, b uint64) uint64 {
	if a > b {
		return a
	}
	return b
}

// minUint64 returns the minimum of two uint64.
func minUint64(a, b uint64) uint64 {
	if a > b {
		return b
	}
	return a
}

// sessionResourceType is the type name of Spanner sessions.
const sessionResourceType = "type.googleapis.com/google.spanner.v1.Session"

// isSessionNotFoundError returns true if the given error is a
// `Session not found` error.
func isSessionNotFoundError(err error) bool {
	if err == nil {
		return false
	}
	if ErrCode(err) == codes.NotFound {
		if rt, ok := extractResourceType(err); ok {
			return rt == sessionResourceType
		}
	}
	return false
}