/*
Copyright 2017 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package spanner

import (
	"container/heap"
	"container/list"
	"context"
	"fmt"
	"log"
	"math"
	"math/rand"
	"runtime/debug"
	"strings"
	"sync"
	"time"

	"cloud.google.com/go/internal/trace"
	vkit "cloud.google.com/go/spanner/apiv1"
	sppb "google.golang.org/genproto/googleapis/spanner/v1"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/metadata"
)

// sessionHandle is an interface for transactions to access Cloud Spanner
// sessions safely. It is generated by sessionPool.take().
type sessionHandle struct {
	// mu guarantees that the inner session object is returned / destroyed only
	// once.
	mu sync.Mutex
	// session is a pointer to a session object. Transactions never need to
	// access it directly.
	session *session
	// checkoutTime is the time the session was checked out of the pool.
	checkoutTime time.Time
	// trackedSessionHandle is the linked list node which links the session to
	// the list of tracked session handles. trackedSessionHandle is only set if
	// TrackSessionHandles has been enabled in the session pool configuration.
	trackedSessionHandle *list.Element
	// stack is the call stack of the goroutine that checked out the session
	// from the pool. This can be used to track down session leak problems.
	stack []byte
}
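
// A minimal usage sketch (hypothetical caller code, assuming a valid session
// pool p): a handle is checked out with take, used for a transaction, and
// given back with recycle, e.g.
//
//	sh, err := p.take(ctx)
//	if err != nil {
//		return err
//	}
//	defer sh.recycle() // safe to call multiple times
//	client := sh.getClient()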

// recycle gives the inner session object back to its home session pool. It is
// safe to call recycle multiple times; only the first call takes effect.
func (sh *sessionHandle) recycle() {
	sh.mu.Lock()
	if sh.session == nil {
		// sessionHandle has already been recycled.
		sh.mu.Unlock()
		return
	}
	p := sh.session.pool
	tracked := sh.trackedSessionHandle
	sh.session.recycle()
	sh.session = nil
	sh.trackedSessionHandle = nil
	sh.checkoutTime = time.Time{}
	sh.stack = nil
	sh.mu.Unlock()
	if tracked != nil {
		p.mu.Lock()
		p.trackedSessionHandles.Remove(tracked)
		p.mu.Unlock()
	}
}
82
83// getID gets the Cloud Spanner session ID from the internal session object.
84// getID returns empty string if the sessionHandle is nil or the inner session
85// object has been released by recycle / destroy.
86func (sh *sessionHandle) getID() string {
87	sh.mu.Lock()
88	defer sh.mu.Unlock()
89	if sh.session == nil {
90		// sessionHandle has already been recycled/destroyed.
91		return ""
92	}
93	return sh.session.getID()
94}

// getClient gets the Cloud Spanner RPC client associated with the session ID
// in sessionHandle.
func (sh *sessionHandle) getClient() *vkit.Client {
	sh.mu.Lock()
	defer sh.mu.Unlock()
	if sh.session == nil {
		return nil
	}
	return sh.session.client
}

// getMetadata returns the metadata associated with the session in sessionHandle.
func (sh *sessionHandle) getMetadata() metadata.MD {
	sh.mu.Lock()
	defer sh.mu.Unlock()
	if sh.session == nil {
		return nil
	}
	return sh.session.md
}

// getTransactionID returns the transaction id in the session if available.
func (sh *sessionHandle) getTransactionID() transactionID {
	sh.mu.Lock()
	defer sh.mu.Unlock()
	if sh.session == nil {
		return nil
	}
	return sh.session.tx
}

// destroy destroys the inner session object. It is safe to call destroy
// multiple times; only the first call attempts to destroy the inner session
// object.
func (sh *sessionHandle) destroy() {
	sh.mu.Lock()
	s := sh.session
	if s == nil {
		// sessionHandle has already been recycled.
		sh.mu.Unlock()
		return
	}
	tracked := sh.trackedSessionHandle
	sh.session = nil
	sh.trackedSessionHandle = nil
	sh.checkoutTime = time.Time{}
	sh.stack = nil
	sh.mu.Unlock()

	if tracked != nil {
		p := s.pool
		p.mu.Lock()
		p.trackedSessionHandles.Remove(tracked)
		p.mu.Unlock()
	}
	s.destroy(false)
}

// session wraps a Cloud Spanner session ID through which transactions are
// created and executed.
type session struct {
	// client is the RPC channel to Cloud Spanner. It is set only once during
	// session's creation.
	client *vkit.Client
	// id is the unique id of the session in Cloud Spanner. It is set only once
	// during session's creation.
	id string
	// pool is the session's home session pool where it was created. It is set
	// only once during session's creation.
	pool *sessionPool
	// createTime is the timestamp of the session's creation. It is set only
	// once during session's creation.
	createTime time.Time
	// logger is the logger configured for the Spanner client that created the
	// session. If nil, logging will be directed to the standard logger.
	logger *log.Logger

	// mu protects the following fields from concurrent access: both
	// healthcheck workers and transactions can modify them.
	mu sync.Mutex
	// valid marks the validity of a session.
	valid bool
	// hcIndex is the index of the session inside the global healthcheck queue.
	// If hcIndex < 0, session has been unregistered from the queue.
	hcIndex int
	// idleList is the linked list node which links the session to its home
	// session pool's idle list. If idleList == nil, the session is not in the
	// idle list.
	idleList *list.Element
	// nextCheck is the timestamp of next scheduled healthcheck of the session.
	// It is maintained by the global health checker.
	nextCheck time.Time
	// checkingHealth is true if the session is currently being processed by a
	// health checker worker. Must be modified under the health checker lock.
	checkingHealth bool
	// md is the Metadata to be sent with each request.
	md metadata.MD
	// tx contains the transaction id if the session has been prepared for
	// write.
	tx transactionID
}

// isValid returns true if the session is still valid for use.
func (s *session) isValid() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.valid
}

// isWritePrepared returns true if the session is prepared for write.
func (s *session) isWritePrepared() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.tx != nil
}

// String implements fmt.Stringer for session.
func (s *session) String() string {
	s.mu.Lock()
	defer s.mu.Unlock()
	return fmt.Sprintf("<id=%v, hcIdx=%v, idleList=%p, valid=%v, create=%v, nextcheck=%v>",
		s.id, s.hcIndex, s.idleList, s.valid, s.createTime, s.nextCheck)
}

// ping verifies if the session is still alive in Cloud Spanner.
func (s *session) ping() error {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	// s.getID is safe even when s is invalid.
	_, err := s.client.GetSession(contextWithOutgoingMetadata(ctx, s.md), &sppb.GetSessionRequest{Name: s.getID()})
	return err
}

// setHcIndex atomically sets the session's index in the healthcheck queue and
// returns the old index.
func (s *session) setHcIndex(i int) int {
	s.mu.Lock()
	defer s.mu.Unlock()
	oi := s.hcIndex
	s.hcIndex = i
	return oi
}

// setIdleList atomically sets the session's idle list link and returns the old
// link.
func (s *session) setIdleList(le *list.Element) *list.Element {
	s.mu.Lock()
	defer s.mu.Unlock()
	old := s.idleList
	s.idleList = le
	return old
}

// invalidate marks a session as invalid and returns the old validity.
func (s *session) invalidate() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	ov := s.valid
	s.valid = false
	return ov
}

// setNextCheck sets the timestamp for next healthcheck on the session.
func (s *session) setNextCheck(t time.Time) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.nextCheck = t
}

// setTransactionID sets the transaction id in the session.
func (s *session) setTransactionID(tx transactionID) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.tx = tx
}

// getID returns the session ID which uniquely identifies the session in Cloud
// Spanner.
func (s *session) getID() string {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.id
}

// getHcIndex returns the session's index into the global healthcheck priority
// queue.
func (s *session) getHcIndex() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.hcIndex
}

// getIdleList returns the session's link in its home session pool's idle list.
func (s *session) getIdleList() *list.Element {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.idleList
}

// getNextCheck returns the timestamp for next healthcheck on the session.
func (s *session) getNextCheck() time.Time {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.nextCheck
}

// recycle returns the session to its home session pool.
func (s *session) recycle() {
	s.setTransactionID(nil)
	if !s.pool.recycle(s) {
		// s is rejected by its home session pool because it has expired and
		// the session pool currently has enough open sessions.
		s.destroy(false)
	}
}

// destroy removes the session from its home session pool, the healthcheck
// queue and the Cloud Spanner service.
func (s *session) destroy(isExpire bool) bool {
	// Remove s from session pool.
	if !s.pool.remove(s, isExpire) {
		return false
	}
	// Unregister s from healthcheck queue.
	s.pool.hc.unregister(s)
	// Remove s from Cloud Spanner service.
	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
	defer cancel()
	s.delete(ctx)
	return true
}

func (s *session) delete(ctx context.Context) {
	// Ignore the error because even if we fail to explicitly destroy the
	// session, it will eventually be garbage collected by Cloud Spanner.
	err := s.client.DeleteSession(ctx, &sppb.DeleteSessionRequest{Name: s.getID()})
	if err != nil {
		logf(s.logger, "Failed to delete session %v. Error: %v", s.getID(), err)
	}
}

// prepareForWrite prepares the session for write if it is not already in that
// state.
func (s *session) prepareForWrite(ctx context.Context) error {
	if s.isWritePrepared() {
		return nil
	}
	tx, err := beginTransaction(contextWithOutgoingMetadata(ctx, s.md), s.getID(), s.client)
	// Session not found should cause the session to be removed from the pool.
	if isSessionNotFoundError(err) {
		s.pool.remove(s, false)
		s.pool.hc.unregister(s)
		return err
	}
	// Enable/disable background preparing of write sessions depending on
	// whether the BeginTransaction call succeeded. This prevents the session
	// pool workers from going into an infinite loop of trying to prepare
	// sessions. Any subsequent successful BeginTransaction call, for example
	// from takeWriteSession, will re-enable the background process.
	s.pool.mu.Lock()
	s.pool.disableBackgroundPrepareSessions = err != nil
	s.pool.mu.Unlock()
	if err != nil {
		return err
	}
	s.setTransactionID(tx)
	return nil
}

// SessionPoolConfig stores configurations of a session pool.
type SessionPoolConfig struct {
	// MaxOpened is the maximum number of opened sessions allowed by the session
	// pool. If the client tries to open a session and there are already
	// MaxOpened sessions, it will block until one becomes available or the
	// context passed to the client method is canceled or times out.
	//
	// Defaults to NumChannels * 100.
	MaxOpened uint64

	// MinOpened is the minimum number of opened sessions that the session pool
	// tries to maintain. The session pool won't continue to expire sessions if
	// the number of opened connections drops below MinOpened. However, if a
	// session is found to be broken, it will still be evicted from the session
	// pool, so it is possible that the number of opened sessions drops below
	// MinOpened.
	//
	// Defaults to 100.
	MinOpened uint64

	// MaxIdle is the maximum number of idle sessions that the pool is allowed
	// to keep.
	//
	// Defaults to 0.
	MaxIdle uint64

	// MaxBurst is the maximum number of concurrent session creation requests.
	//
	// Defaults to 10.
	MaxBurst uint64

	// WriteSessions is the fraction of sessions we try to keep prepared for
	// write.
	//
	// Defaults to 0.2.
	WriteSessions float64

	// HealthCheckWorkers is the number of workers used by the health checker
	// for this pool.
	//
	// Defaults to 10.
	HealthCheckWorkers int

	// HealthCheckInterval is how often the health checker pings a session.
	//
	// Defaults to 5m.
	HealthCheckInterval time.Duration

	// TrackSessionHandles determines whether the session pool will keep track
	// of the stacktrace of the goroutines that take sessions from the pool.
	// This setting can be used to track down session leak problems.
	//
	// Defaults to false.
	TrackSessionHandles bool

	// healthCheckSampleInterval is how often the health checker samples live
	// sessions (for use in maintaining the session pool size).
	//
	// Defaults to 1m.
	healthCheckSampleInterval time.Duration

	// sessionLabels for the sessions created in the session pool.
	sessionLabels map[string]string
}
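
// A hypothetical configuration sketch (the field values below are purely
// illustrative, not recommendations); unset fields fall back to the defaults
// documented above:
//
//	config := SessionPoolConfig{
//		MinOpened:           10,
//		MaxOpened:           400,
//		MaxIdle:             20,
//		WriteSessions:       0.2,
//		HealthCheckInterval: 5 * time.Minute,
//	}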

// DefaultSessionPoolConfig is the default configuration for the session pool
// that will be used for a Spanner client, unless the user supplies a specific
// session pool config.
var DefaultSessionPoolConfig = SessionPoolConfig{
	MinOpened:           100,
	MaxOpened:           numChannels * 100,
	MaxBurst:            10,
	WriteSessions:       0.2,
	HealthCheckWorkers:  10,
	HealthCheckInterval: 5 * time.Minute,
}
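
// Note that the default MaxOpened scales with the number of gRPC channels:
// with, for example, 4 channels, the pool allows 4 * 100 = 400 open sessions.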

// errMinOpenedGTMaxOpened returns an error for
// SessionPoolConfig.MaxOpened < SessionPoolConfig.MinOpened when
// SessionPoolConfig.MaxOpened is set.
func errMinOpenedGTMaxOpened(maxOpened, minOpened uint64) error {
	return spannerErrorf(codes.InvalidArgument,
		"require SessionPoolConfig.MaxOpened >= SessionPoolConfig.MinOpened, got %d and %d", maxOpened, minOpened)
}

// errWriteFractionOutOfRange returns an error for
// SessionPoolConfig.WriteSessions < 0 or SessionPoolConfig.WriteSessions > 1.
func errWriteFractionOutOfRange(writeFraction float64) error {
	return spannerErrorf(codes.InvalidArgument,
		"require SessionPoolConfig.WriteSessions >= 0.0 && SessionPoolConfig.WriteSessions <= 1.0, got %.2f", writeFraction)
}

// errHealthCheckWorkersNegative returns an error for
// SessionPoolConfig.HealthCheckWorkers < 0.
func errHealthCheckWorkersNegative(workers int) error {
	return spannerErrorf(codes.InvalidArgument,
		"require SessionPoolConfig.HealthCheckWorkers >= 0, got %d", workers)
}

// errHealthCheckIntervalNegative returns an error for
// SessionPoolConfig.HealthCheckInterval < 0.
func errHealthCheckIntervalNegative(interval time.Duration) error {
	return spannerErrorf(codes.InvalidArgument,
		"require SessionPoolConfig.HealthCheckInterval >= 0, got %v", interval)
}

// validate verifies that the SessionPoolConfig is good for use.
func (spc *SessionPoolConfig) validate() error {
	if spc.MinOpened > spc.MaxOpened && spc.MaxOpened > 0 {
		return errMinOpenedGTMaxOpened(spc.MaxOpened, spc.MinOpened)
	}
	if spc.WriteSessions < 0.0 || spc.WriteSessions > 1.0 {
		return errWriteFractionOutOfRange(spc.WriteSessions)
	}
	if spc.HealthCheckWorkers < 0 {
		return errHealthCheckWorkersNegative(spc.HealthCheckWorkers)
	}
	if spc.HealthCheckInterval < 0 {
		return errHealthCheckIntervalNegative(spc.HealthCheckInterval)
	}
	return nil
}

// sessionPool creates and caches Cloud Spanner sessions.
type sessionPool struct {
	// mu protects sessionPool from concurrent access.
	mu sync.Mutex
	// valid marks the validity of the session pool.
	valid bool
	// sc is used to create the sessions for the pool.
	sc *sessionClient
	// trackedSessionHandles contains all session handles that have been
	// checked out of the pool. The list is only filled if TrackSessionHandles
	// has been enabled.
	trackedSessionHandles list.List
	// idleList caches idle session IDs. Session IDs in this list can be
	// allocated for use.
	idleList list.List
	// idleWriteList caches idle sessions which have been prepared for write.
	idleWriteList list.List
	// mayGetSession is for broadcasting that session retrieval/creation may
	// proceed.
	mayGetSession chan struct{}
	// numOpened is the total number of open sessions from the session pool.
	numOpened uint64
	// createReqs is the number of ongoing session creation requests.
	createReqs uint64
	// prepareReqs is the number of ongoing session preparation requests.
	prepareReqs uint64
	// disableBackgroundPrepareSessions indicates that the BeginTransaction
	// call for a read/write transaction failed with a permanent error, such as
	// PermissionDenied or `Database not found`. Further background calls to
	// prepare sessions will be disabled.
	disableBackgroundPrepareSessions bool
	// configuration of the session pool.
	SessionPoolConfig
	// hc is the health checker.
	hc *healthChecker

	// mw is the maintenance window containing statistics for the max number of
	// sessions checked out of the pool during the last 10 minutes.
	mw *maintenanceWindow
}

// newSessionPool creates a new session pool.
func newSessionPool(sc *sessionClient, config SessionPoolConfig) (*sessionPool, error) {
	if err := config.validate(); err != nil {
		return nil, err
	}
	pool := &sessionPool{
		sc:                sc,
		valid:             true,
		mayGetSession:     make(chan struct{}),
		SessionPoolConfig: config,
		mw:                newMaintenanceWindow(config.MaxOpened),
	}
	if config.HealthCheckWorkers == 0 {
		// With 10 workers and assuming average latency of 5ms for
		// BeginTransaction, we will be able to prepare 2000 tx/sec in advance.
		// If the rate of takeWriteSession is more than that, it will degrade to
		// doing BeginTransaction inline.
		//
		// TODO: consider resizing the worker pool dynamically according to the load.
		config.HealthCheckWorkers = 10
	}
	if config.HealthCheckInterval == 0 {
		config.HealthCheckInterval = 5 * time.Minute
	}
	if config.healthCheckSampleInterval == 0 {
		config.healthCheckSampleInterval = time.Minute
	}
	// On a GCE VM within the same region, a healthcheck ping takes on average
	// 10ms to finish. Given a 5 minute interval and 10 healthcheck workers, a
	// healthChecker can effectively maintain
	// 100 checks_per_worker/sec * 10 workers * 300 seconds = 300K sessions.
	pool.hc = newHealthChecker(config.HealthCheckInterval, config.HealthCheckWorkers, config.healthCheckSampleInterval, pool)

	// First initialize the pool before we indicate that the healthchecker is
	// ready. This prevents the maintainer from starting before the pool has
	// been initialized, which means that we guarantee that the initial
	// sessions are created using BatchCreateSessions.
	if config.MinOpened > 0 {
		numSessions := minUint64(config.MinOpened, math.MaxInt32)
		if err := pool.initPool(int32(numSessions)); err != nil {
			return nil, err
		}
	}
	close(pool.hc.ready)
	return pool, nil
}
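
// A minimal construction sketch (hypothetical, assuming an initialized
// sessionClient sc):
//
//	pool, err := newSessionPool(sc, DefaultSessionPoolConfig)
//	if err != nil {
//		return err // invalid SessionPoolConfig
//	}
//	defer pool.close()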

func (p *sessionPool) initPool(numSessions int32) error {
	p.mu.Lock()
	// Take budget before the actual session creation.
	p.numOpened += uint64(numSessions)
	recordStat(context.Background(), OpenSessionCount, int64(p.numOpened))
	p.createReqs += uint64(numSessions)
	p.mu.Unlock()
	// Asynchronously create the initial sessions for the pool.
	return p.sc.batchCreateSessions(numSessions, p)
}

// sessionReady is executed by the SessionClient when a session has been
// created and is ready to use. This method will add the new session to the
// pool and decrease the number of sessions that are being created.
func (p *sessionPool) sessionReady(s *session) {
	p.mu.Lock()
	defer p.mu.Unlock()
	// Set this pool as the home pool of the session and register it with the
	// health checker.
	s.pool = p
	p.hc.register(s)
	p.createReqs--
	// Insert the session at a random position in the pool to prevent all
	// sessions affiliated with the same channel from being placed sequentially
	// in the pool.
	if p.idleList.Len() > 0 {
		pos := rand.Intn(p.idleList.Len())
		before := p.idleList.Front()
		for i := 0; i < pos; i++ {
			before = before.Next()
		}
		s.setIdleList(p.idleList.InsertBefore(s, before))
	} else {
		s.setIdleList(p.idleList.PushBack(s))
	}
	// Notify other waiters blocking on session creation.
	close(p.mayGetSession)
	p.mayGetSession = make(chan struct{})
}

// sessionCreationFailed is called by the SessionClient when the creation of one
// or more requested sessions finished with an error. sessionCreationFailed will
// decrease the number of sessions being created and notify any waiters that
// the session creation failed.
func (p *sessionPool) sessionCreationFailed(err error, numSessions int32) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.createReqs -= uint64(numSessions)
	p.numOpened -= uint64(numSessions)
	recordStat(context.Background(), OpenSessionCount, int64(p.numOpened))
	// Notify other waiters blocking on session creation.
	close(p.mayGetSession)
	p.mayGetSession = make(chan struct{})
}

// isValid checks if the session pool is still valid.
func (p *sessionPool) isValid() bool {
	if p == nil {
		return false
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.valid
}

// close marks the session pool as closed.
func (p *sessionPool) close() {
	if p == nil {
		return
	}
	p.mu.Lock()
	if !p.valid {
		p.mu.Unlock()
		return
	}
	p.valid = false
	p.mu.Unlock()
	p.hc.close()
	// Destroy all the sessions.
	p.hc.mu.Lock()
	allSessions := make([]*session, len(p.hc.queue.sessions))
	copy(allSessions, p.hc.queue.sessions)
	p.hc.mu.Unlock()
	for _, s := range allSessions {
		s.destroy(false)
	}
}

// errInvalidSessionPool is the error for using an invalid session pool.
var errInvalidSessionPool = spannerErrorf(codes.InvalidArgument, "invalid session pool")

// errGetSessionTimeout is the error returned for a timeout or context
// cancellation during sessionPool.take().
var errGetSessionTimeout = spannerErrorf(codes.Canceled, "timeout / context canceled during getting session")

// newSessionHandle creates a new session handle for the given session for this
// session pool. The session handle will also hold a copy of the current call
// stack if the session pool has been configured to track the call stacks of
// sessions being checked out of the pool.
func (p *sessionPool) newSessionHandle(s *session) (sh *sessionHandle) {
	sh = &sessionHandle{session: s, checkoutTime: time.Now()}
	if p.TrackSessionHandles {
		p.mu.Lock()
		sh.trackedSessionHandle = p.trackedSessionHandles.PushBack(sh)
		p.mu.Unlock()
		sh.stack = debug.Stack()
	}
	return sh
}

// errGetSessionTimeout returns an error for a timeout or context cancellation
// during sessionPool.take().
func (p *sessionPool) errGetSessionTimeout() error {
	if p.TrackSessionHandles {
		return p.errGetSessionTimeoutWithTrackedSessionHandles()
	}
	return p.errGetBasicSessionTimeout()
}

// errGetBasicSessionTimeout returns an error for a context timeout during
// sessionPool.take() without any tracked sessionHandles.
func (p *sessionPool) errGetBasicSessionTimeout() error {
	return spannerErrorf(codes.Canceled, "timeout / context canceled during getting session.\n"+
		"Enable SessionPoolConfig.TrackSessionHandles if you suspect a session leak to get more information about the checked out sessions.")
}

// errGetSessionTimeoutWithTrackedSessionHandles returns an error for a context
// timeout during sessionPool.take() including a stacktrace of each checked out
// session handle.
func (p *sessionPool) errGetSessionTimeoutWithTrackedSessionHandles() error {
	err := spannerErrorf(codes.Canceled, "timeout / context canceled during getting session.")
	err.(*Error).additionalInformation = p.getTrackedSessionHandleStacksLocked()
	return err
}

// getTrackedSessionHandleStacksLocked returns a string containing the
// stacktrace of all currently checked out sessions of the pool. The method
// acquires p.mu itself, so the caller must not hold the lock.
func (p *sessionPool) getTrackedSessionHandleStacksLocked() string {
	p.mu.Lock()
	defer p.mu.Unlock()
	stackTraces := ""
	i := 1
	element := p.trackedSessionHandles.Front()
	for element != nil {
		sh := element.Value.(*sessionHandle)
		sh.mu.Lock()
		if sh.stack != nil {
			stackTraces = fmt.Sprintf("%s\n\nSession %d checked out of pool at %s by goroutine:\n%s", stackTraces, i, sh.checkoutTime.Format(time.RFC3339), sh.stack)
		}
		sh.mu.Unlock()
		element = element.Next()
		i++
	}
	return stackTraces
}

// shouldPrepareWriteLocked returns true if we should prepare more sessions for write.
func (p *sessionPool) shouldPrepareWriteLocked() bool {
	return !p.disableBackgroundPrepareSessions && float64(p.numOpened)*p.WriteSessions > float64(p.idleWriteList.Len()+int(p.prepareReqs))
}
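
// For example, with numOpened = 400 and WriteSessions = 0.2, the pool aims to
// keep 400 * 0.2 = 80 sessions prepared for write; preparation stops as soon
// as idleWriteList.Len() + prepareReqs reaches that target.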

func (p *sessionPool) createSession(ctx context.Context) (*session, error) {
	trace.TracePrintf(ctx, nil, "Creating a new session")
	doneCreate := func(done bool) {
		p.mu.Lock()
		if !done {
			// Session creation failed, give budget back.
			p.numOpened--
			recordStat(ctx, OpenSessionCount, int64(p.numOpened))
		}
		p.createReqs--
		// Notify other waiters blocking on session creation.
		close(p.mayGetSession)
		p.mayGetSession = make(chan struct{})
		p.mu.Unlock()
	}
	s, err := p.sc.createSession(ctx)
	if err != nil {
		doneCreate(false)
		// Return the error directly, as the CreateSession RPC has already
		// been retried.
		// If the error is a timeout, there is a chance that the session was
		// created on the server but is not known to the session pool. Such a
		// session will then be garbage collected by the server after 1 hour.
		return nil, err
	}
	s.pool = p
	p.hc.register(s)
	doneCreate(true)
	return s, nil
}

func (p *sessionPool) isHealthy(s *session) bool {
	if s.getNextCheck().Add(2 * p.hc.getInterval()).Before(time.Now()) {
		// TODO: figure out if we need to schedule a new healthcheck worker here.
		if err := s.ping(); isSessionNotFoundError(err) {
			// The session is already bad, continue to fetch/create a new one.
			s.destroy(false)
			return false
		}
		p.hc.scheduledHC(s)
	}
	return true
}

// take returns a cached session if one is available; otherwise, it tries to
// allocate a new one. A session returned by take should be used for read
// operations.
func (p *sessionPool) take(ctx context.Context) (*sessionHandle, error) {
	trace.TracePrintf(ctx, nil, "Acquiring a read-only session")
	for {
		var (
			s   *session
			err error
		)

		p.mu.Lock()
		if !p.valid {
			p.mu.Unlock()
			return nil, errInvalidSessionPool
		}
		if p.idleList.Len() > 0 {
			// Idle sessions are available, get one from the top of the idle
			// list.
			s = p.idleList.Remove(p.idleList.Front()).(*session)
			trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
				"Acquired read-only session")
		} else if p.idleWriteList.Len() > 0 {
			s = p.idleWriteList.Remove(p.idleWriteList.Front()).(*session)
			trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
				"Acquired read-write session")
		}
		if s != nil {
			s.setIdleList(nil)
			numCheckedOut := p.currSessionsCheckedOutLocked()
			p.mu.Unlock()
			p.mw.updateMaxSessionsCheckedOutDuringWindow(numCheckedOut)
			// From here, the session is no longer in the idle list, so the
			// healthcheck workers won't destroy it. If the healthcheck workers
			// failed to schedule a healthcheck for the session in time, do the
			// check here. Because a session check is still much cheaper than
			// session creation, sessions should be reused as much as possible.
			if !p.isHealthy(s) {
				continue
			}
			return p.newSessionHandle(s), nil
		}

		// Idle list is empty, block if session pool has reached max session
		// creation concurrency or max number of open sessions.
		if (p.MaxOpened > 0 && p.numOpened >= p.MaxOpened) || (p.MaxBurst > 0 && p.createReqs >= p.MaxBurst) {
			mayGetSession := p.mayGetSession
			p.mu.Unlock()
			trace.TracePrintf(ctx, nil, "Waiting for read-only session to become available")
			select {
			case <-ctx.Done():
				trace.TracePrintf(ctx, nil, "Context done waiting for session")
				return nil, p.errGetSessionTimeout()
			case <-mayGetSession:
			}
			continue
		}

		// Take budget before the actual session creation.
		p.numOpened++
		// Creating a new session that will be returned directly to the client
		// means that the max number of sessions in use also increases.
		numCheckedOut := p.currSessionsCheckedOutLocked()
		recordStat(ctx, OpenSessionCount, int64(p.numOpened))
		p.createReqs++
		p.mu.Unlock()
		p.mw.updateMaxSessionsCheckedOutDuringWindow(numCheckedOut)
		if s, err = p.createSession(ctx); err != nil {
			trace.TracePrintf(ctx, nil, "Error creating session: %v", err)
			return nil, toSpannerError(err)
		}
		trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
			"Created session")
		return p.newSessionHandle(s), nil
	}
}

// takeWriteSession returns a write-prepared cached session if one is
// available; otherwise, it tries to allocate a new one. A session returned by
// takeWriteSession should be used for read-write transactions.
func (p *sessionPool) takeWriteSession(ctx context.Context) (*sessionHandle, error) {
	trace.TracePrintf(ctx, nil, "Acquiring a read-write session")
	for {
		var (
			s   *session
			err error
		)

		p.mu.Lock()
		if !p.valid {
			p.mu.Unlock()
			return nil, errInvalidSessionPool
		}
		if p.idleWriteList.Len() > 0 {
			// Idle sessions are available, get one from the top of the idle
			// list.
			s = p.idleWriteList.Remove(p.idleWriteList.Front()).(*session)
			trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()}, "Acquired read-write session")
		} else if p.idleList.Len() > 0 {
			s = p.idleList.Remove(p.idleList.Front()).(*session)
			trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()}, "Acquired read-only session")
		}
		if s != nil {
			s.setIdleList(nil)
			numCheckedOut := p.currSessionsCheckedOutLocked()
			p.mu.Unlock()
			p.mw.updateMaxSessionsCheckedOutDuringWindow(numCheckedOut)
			// From here, the session is no longer in the idle list, so the
			// healthcheck workers won't destroy it. If the healthcheck workers
			// failed to schedule a healthcheck for the session in time, do the
			// check here. Because a session check is still much cheaper than
			// session creation, sessions should be reused as much as possible.
			if !p.isHealthy(s) {
				continue
			}
		} else {
			// Idle list is empty, block if session pool has reached max session
			// creation concurrency or max number of open sessions.
			if (p.MaxOpened > 0 && p.numOpened >= p.MaxOpened) || (p.MaxBurst > 0 && p.createReqs >= p.MaxBurst) {
				mayGetSession := p.mayGetSession
				p.mu.Unlock()
				trace.TracePrintf(ctx, nil, "Waiting for read-write session to become available")
				select {
				case <-ctx.Done():
					trace.TracePrintf(ctx, nil, "Context done waiting for session")
					return nil, p.errGetSessionTimeout()
				case <-mayGetSession:
				}
				continue
			}

			// Take budget before the actual session creation.
			p.numOpened++
			// Creating a new session that will be returned directly to the client
			// means that the max number of sessions in use also increases.
			numCheckedOut := p.currSessionsCheckedOutLocked()
			recordStat(ctx, OpenSessionCount, int64(p.numOpened))
			p.createReqs++
			p.mu.Unlock()
			p.mw.updateMaxSessionsCheckedOutDuringWindow(numCheckedOut)
			if s, err = p.createSession(ctx); err != nil {
				trace.TracePrintf(ctx, nil, "Error creating session: %v", err)
				return nil, toSpannerError(err)
			}
			trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
				"Created session")
		}
		if !s.isWritePrepared() {
			if err = s.prepareForWrite(ctx); err != nil {
				if isSessionNotFoundError(err) {
					s.destroy(false)
					trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
						"Session not found for write")
					return nil, toSpannerError(err)
				}

				s.recycle()
				trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
					"Error preparing session for write")
				return nil, toSpannerError(err)
			}
		}
		return p.newSessionHandle(s), nil
	}
}

// recycle puts session s back into the session pool's idle list. It returns
// true if the session pool successfully recycles session s.
func (p *sessionPool) recycle(s *session) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	if !s.isValid() || !p.valid {
		// Reject the session if the session is invalid or the pool itself is
		// invalid.
		return false
	}
	// Put the session at the top of the list to be handed out in LIFO order
	// for load balancing across channels.
	if s.isWritePrepared() {
		s.setIdleList(p.idleWriteList.PushFront(s))
	} else {
		s.setIdleList(p.idleList.PushFront(s))
	}
	// Broadcast that a session has been returned to the idle list.
	close(p.mayGetSession)
	p.mayGetSession = make(chan struct{})
	return true
}

// remove atomically removes session s from the session pool and invalidates s.
// If isExpire == true, the removal is triggered by session expiration and in
// such cases, only idle sessions can be removed.
func (p *sessionPool) remove(s *session, isExpire bool) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	if isExpire && (p.numOpened <= p.MinOpened || s.getIdleList() == nil) {
		// Don't expire the session if the session is not in the idle list (in
		// use), or if the number of open sessions would drop below p.MinOpened.
		return false
	}
	ol := s.setIdleList(nil)
	// If the session is in the idle list, remove it.
	if ol != nil {
		// Remove from whichever list it is in.
		p.idleList.Remove(ol)
		p.idleWriteList.Remove(ol)
	}
	if s.invalidate() {
		// Decrease the number of opened sessions.
		p.numOpened--
		recordStat(context.Background(), OpenSessionCount, int64(p.numOpened))
		// Broadcast that a session has been destroyed.
		close(p.mayGetSession)
		p.mayGetSession = make(chan struct{})
		return true
	}
	return false
}

func (p *sessionPool) currSessionsCheckedOutLocked() uint64 {
	return p.numOpened - uint64(p.idleList.Len()) - uint64(p.idleWriteList.Len())
}

// hcHeap implements heap.Interface. It is used to create the priority queue for
// session healthchecks.
type hcHeap struct {
	sessions []*session
}

// Len implements heap.Interface.Len.
func (h hcHeap) Len() int {
	return len(h.sessions)
}

// Less implements heap.Interface.Less.
func (h hcHeap) Less(i, j int) bool {
	return h.sessions[i].getNextCheck().Before(h.sessions[j].getNextCheck())
}

// Swap implements heap.Interface.Swap.
func (h hcHeap) Swap(i, j int) {
	h.sessions[i], h.sessions[j] = h.sessions[j], h.sessions[i]
	h.sessions[i].setHcIndex(i)
	h.sessions[j].setHcIndex(j)
}

// Push implements heap.Interface.Push.
func (h *hcHeap) Push(s interface{}) {
	ns := s.(*session)
	ns.setHcIndex(len(h.sessions))
	h.sessions = append(h.sessions, ns)
}

// Pop implements heap.Interface.Pop.
func (h *hcHeap) Pop() interface{} {
	old := h.sessions
	n := len(old)
	s := old[n-1]
	h.sessions = old[:n-1]
	s.setHcIndex(-1)
	return s
}
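
// A minimal sketch of how the health checker uses hcHeap through the standard
// heap package (hc.mu must be held; see register, unregister and
// scheduledHCLocked below):
//
//	heap.Push(&hc.queue, s)             // register a session
//	next := hc.queue.sessions[0]        // peek at the earliest nextCheck
//	heap.Fix(&hc.queue, s.getHcIndex()) // restore order after a reschedule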

// maintenanceWindowSize specifies the number of health check cycles that
// define a maintenance window. The maintenance window keeps a rolling set of
// values for the maximum number of sessions checked out during each of those
// cycles. This is used by the maintainer to determine the number of sessions
// to create or delete at the end of each health check cycle.
const maintenanceWindowSize = 10

// maintenanceWindow contains the statistics that are gathered during a health
// check maintenance window.
type maintenanceWindow struct {
	mu sync.Mutex
	// maxSessionsCheckedOut contains the maximum number of sessions that were
	// checked out of the session pool during a health check cycle. This number
	// indicates the number of sessions that were actually needed by the pool
	// to serve the load during that cycle. The values are kept as a rolling
	// set containing the values for the past 10 cycles (minutes). The
	// maintainer uses these values to determine the number of sessions to keep
	// at the end of each cycle.
	maxSessionsCheckedOut [maintenanceWindowSize]uint64
}

// maxSessionsCheckedOutDuringWindow returns the maximum number of sessions
// that have been checked out during the last maintenance window of 10 cycles
// (minutes).
func (mw *maintenanceWindow) maxSessionsCheckedOutDuringWindow() uint64 {
	mw.mu.Lock()
	defer mw.mu.Unlock()
	var max uint64
	for _, cycleMax := range mw.maxSessionsCheckedOut {
		max = maxUint64(max, cycleMax)
	}
	return max
}

// updateMaxSessionsCheckedOutDuringWindow updates the maximum number of
// sessions that have been checked out of the pool during the current
// cycle of the maintenance window. A maintenance window consists of 10
// maintenance cycles. Each cycle keeps track of the max number of sessions in
// use during that cycle. The rolling maintenance window of 10 cycles is used
// to determine the number of sessions to keep at the end of a cycle by
// calculating the max in use during the last 10 cycles.
func (mw *maintenanceWindow) updateMaxSessionsCheckedOutDuringWindow(currNumSessionsCheckedOut uint64) {
	mw.mu.Lock()
	defer mw.mu.Unlock()
	mw.maxSessionsCheckedOut[0] = maxUint64(currNumSessionsCheckedOut, mw.maxSessionsCheckedOut[0])
}

// startNewCycle starts a new health check cycle with the specified number of
// checked out sessions as its initial value.
func (mw *maintenanceWindow) startNewCycle(currNumSessionsCheckedOut uint64) {
	mw.mu.Lock()
	defer mw.mu.Unlock()
	copy(mw.maxSessionsCheckedOut[1:], mw.maxSessionsCheckedOut[:maintenanceWindowSize-1])
	mw.maxSessionsCheckedOut[0] = currNumSessionsCheckedOut
}

// newMaintenanceWindow creates a new maintenance window with all values for
// maxSessionsCheckedOut set to maxOpened. This ensures that a complete
// maintenance window must pass before the maintainer will start to delete any
// sessions.
func newMaintenanceWindow(maxOpened uint64) *maintenanceWindow {
	mw := &maintenanceWindow{}
	// Initialize the rolling window with max values to prevent the maintainer
	// from deleting sessions before a complete window of 10 cycles has
	// finished.
	for i := 0; i < maintenanceWindowSize; i++ {
		mw.maxSessionsCheckedOut[i] = maxOpened
	}
	return mw
}
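
// A worked example of the rolling window (values are illustrative): a new
// window starts as [400 400 ... 400] when maxOpened is 400. Each startNewCycle
// shifts the values one position to the right and seeds cycle 0 with the
// current number of checked out sessions, so after maintenanceWindowSize
// cycles the window only contains observed maxima and the maintainer can
// shrink the pool toward maxSessionsCheckedOutDuringWindow() + MaxIdle.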

// healthChecker performs periodic healthchecks on registered sessions.
type healthChecker struct {
	// mu protects concurrent access to healthChecker.
	mu sync.Mutex
	// queue is the priority queue for session healthchecks. Sessions with an
	// earlier nextCheck rank higher in the queue.
	queue hcHeap
	// interval is the average interval between two healthchecks on a session.
	interval time.Duration
	// workers is the number of concurrent healthcheck workers.
	workers int
	// waitWorkers waits for all healthcheck workers to exit.
	waitWorkers sync.WaitGroup
	// pool is the underlying session pool.
	pool *sessionPool
	// sampleInterval is the interval of sampling by the maintainer.
	sampleInterval time.Duration
	// ready is used to signal that maintainer can start running.
	ready chan struct{}
	// done is used to signal that health checker should be closed.
	done chan struct{}
	// once is used for closing channel done only once.
	once             sync.Once
	maintainerCancel func()
}

// newHealthChecker initializes a new instance of healthChecker.
func newHealthChecker(interval time.Duration, workers int, sampleInterval time.Duration, pool *sessionPool) *healthChecker {
	if workers <= 0 {
		workers = 1
	}
	hc := &healthChecker{
		interval:         interval,
		workers:          workers,
		pool:             pool,
		sampleInterval:   sampleInterval,
		ready:            make(chan struct{}),
		done:             make(chan struct{}),
		maintainerCancel: func() {},
	}
	hc.waitWorkers.Add(1)
	go hc.maintainer()
	for i := 1; i <= hc.workers; i++ {
		hc.waitWorkers.Add(1)
		go hc.worker(i)
	}
	return hc
}

// close closes the healthChecker and waits for all healthcheck workers to exit.
func (hc *healthChecker) close() {
	hc.mu.Lock()
	hc.maintainerCancel()
	hc.mu.Unlock()
	hc.once.Do(func() { close(hc.done) })
	hc.waitWorkers.Wait()
}

// isClosing checks if a healthChecker is already closing.
func (hc *healthChecker) isClosing() bool {
	select {
	case <-hc.done:
		return true
	default:
		return false
	}
}

// getInterval gets the healthcheck interval.
func (hc *healthChecker) getInterval() time.Duration {
	hc.mu.Lock()
	defer hc.mu.Unlock()
	return hc.interval
}

// scheduledHCLocked schedules the next healthcheck on session s. It assumes
// that hc.mu is held.
func (hc *healthChecker) scheduledHCLocked(s *session) {
	// The next healthcheck will be scheduled after a random duration in
	// [interval*0.5, interval*1.5). For example, with an interval of 5
	// minutes, the next check falls between 2.5 and 7.5 minutes from now,
	// which spreads the healthcheck load evenly over time.
	nsFromNow := rand.Int63n(int64(hc.interval)) + int64(hc.interval)/2
	s.setNextCheck(time.Now().Add(time.Duration(nsFromNow)))
	if hi := s.getHcIndex(); hi != -1 {
		// Session is still being tracked by healthcheck workers.
		heap.Fix(&hc.queue, hi)
	}
}

// scheduledHC schedules the next healthcheck on session s. It is safe to call
// concurrently.
func (hc *healthChecker) scheduledHC(s *session) {
	hc.mu.Lock()
	defer hc.mu.Unlock()
	hc.scheduledHCLocked(s)
}

// register registers a session with healthChecker for periodic healthchecks.
func (hc *healthChecker) register(s *session) {
	hc.mu.Lock()
	defer hc.mu.Unlock()
	hc.scheduledHCLocked(s)
	heap.Push(&hc.queue, s)
}

// unregister unregisters a session from the healthcheck queue.
func (hc *healthChecker) unregister(s *session) {
	hc.mu.Lock()
	defer hc.mu.Unlock()
	oi := s.setHcIndex(-1)
	if oi >= 0 {
		heap.Remove(&hc.queue, oi)
	}
}

// markDone marks that the health check for the session has been performed.
func (hc *healthChecker) markDone(s *session) {
	hc.mu.Lock()
	defer hc.mu.Unlock()
	s.checkingHealth = false
}

// healthCheck checks the health of the session and pings it if needed.
func (hc *healthChecker) healthCheck(s *session) {
	defer hc.markDone(s)
	if !s.pool.isValid() {
		// Session pool is closed, perform a garbage collection.
		s.destroy(false)
		return
	}
	if err := s.ping(); isSessionNotFoundError(err) {
		// Ping failed, destroy the session.
		s.destroy(false)
	}
}

// worker performs the healthcheck on sessions in healthChecker's priority
// queue.
func (hc *healthChecker) worker(i int) {
	// Returns a session which we should ping to keep it alive.
	getNextForPing := func() *session {
		hc.pool.mu.Lock()
		defer hc.pool.mu.Unlock()
		hc.mu.Lock()
		defer hc.mu.Unlock()
		if hc.queue.Len() <= 0 {
			// Queue is empty.
			return nil
		}
		s := hc.queue.sessions[0]
		if s.getNextCheck().After(time.Now()) && hc.pool.valid {
			// All sessions have been checked recently.
			return nil
		}
		hc.scheduledHCLocked(s)
		if !s.checkingHealth {
			s.checkingHealth = true
			return s
		}
		return nil
	}

	// Returns a session which we should prepare for write.
	getNextForTx := func() *session {
		hc.pool.mu.Lock()
		defer hc.pool.mu.Unlock()
		if hc.pool.shouldPrepareWriteLocked() {
			if hc.pool.idleList.Len() > 0 && hc.pool.valid {
				hc.mu.Lock()
				defer hc.mu.Unlock()
				if hc.pool.idleList.Front().Value.(*session).checkingHealth {
					return nil
				}
				session := hc.pool.idleList.Remove(hc.pool.idleList.Front()).(*session)
				session.checkingHealth = true
				hc.pool.prepareReqs++
				return session
			}
		}
		return nil
	}

	for {
		if hc.isClosing() {
			// Exit when the pool has been closed and all sessions have been
			// destroyed or when health checker has been closed.
			hc.waitWorkers.Done()
			return
		}
		ws := getNextForTx()
		if ws != nil {
			ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
			err := ws.prepareForWrite(ctx)
			cancel()
			if err != nil {
				// Skip handling the prepare error; the session can be prepared
				// in the next cycle.
				// Don't log about permission errors, which may be expected
				// (e.g. using read-only auth).
				serr := toSpannerError(err).(*Error)
				if serr.Code != codes.PermissionDenied {
					logf(hc.pool.sc.logger, "Failed to prepare session, error: %v", serr)
				}
			}
			hc.pool.recycle(ws)
			hc.pool.mu.Lock()
			hc.pool.prepareReqs--
			hc.pool.mu.Unlock()
			hc.markDone(ws)
		}
		rs := getNextForPing()
		if rs == nil {
			if ws == nil {
				// No work to be done, so sleep to avoid burning CPU.
				pause := int64(100 * time.Millisecond)
				if pause > int64(hc.interval) {
					pause = int64(hc.interval)
				}
				select {
				case <-time.After(time.Duration(rand.Int63n(pause) + pause/2)):
				case <-hc.done:
				}
			}
			continue
		}
		hc.healthCheck(rs)
	}
}

// maintainer maintains the number of sessions in the pool based on the session
// pool configuration and the current and historical number of sessions checked
// out of the pool. The maintainer will:
// 1. Ensure that the session pool contains at least MinOpened sessions.
// 2. If the current number of sessions in the pool exceeds the greatest number
//    of checked out sessions (=sessions in use) during the last 10 minutes,
//    and the delta is larger than MaxIdle, the maintainer will reduce the
//    number of sessions to maxSessionsInUseDuringWindow+MaxIdle.
func (hc *healthChecker) maintainer() {
	// Wait until the pool is ready.
	<-hc.ready

	for iteration := uint64(0); ; iteration++ {
		if hc.isClosing() {
			hc.waitWorkers.Done()
			return
		}

		hc.pool.mu.Lock()
		currSessionsOpened := hc.pool.numOpened
		maxIdle := hc.pool.MaxIdle
		minOpened := hc.pool.MinOpened
		hc.pool.mu.Unlock()
		// Get the maximum number of sessions in use during the current
		// maintenance window.
		maxSessionsInUseDuringWindow := hc.pool.mw.maxSessionsCheckedOutDuringWindow()
		hc.mu.Lock()
		ctx, cancel := context.WithTimeout(context.Background(), hc.sampleInterval)
		hc.maintainerCancel = cancel
		hc.mu.Unlock()

		// Grow or shrink the pool if needed.
		// The number of sessions in the pool should be in the range
		// [Config.MinOpened, Config.MaxIdle+maxSessionsInUseDuringWindow].
		if currSessionsOpened < minOpened {
			hc.growPool(ctx, minOpened)
		} else if maxIdle+maxSessionsInUseDuringWindow < currSessionsOpened {
			hc.shrinkPool(ctx, maxIdle+maxSessionsInUseDuringWindow)
		}

		select {
		case <-ctx.Done():
		case <-hc.done:
			cancel()
		}
		// Cycle the maintenance window. This will remove the oldest cycle and
		// add a new cycle at the beginning of the maintenance window with the
		// currently checked out number of sessions as the max number of
		// sessions in use in this cycle. This value will be increased during
		// the next cycle if it increases.
		hc.pool.mu.Lock()
		currSessionsInUse := hc.pool.currSessionsCheckedOutLocked()
		hc.pool.mu.Unlock()
		hc.pool.mw.startNewCycle(currSessionsInUse)
	}
}

// growPool grows the number of sessions in the pool to the specified number of
// sessions. It times out after sampleInterval.
func (hc *healthChecker) growPool(ctx context.Context, growToNumSessions uint64) {
	// Calculate the max number of sessions to create as a safeguard against
	// other processes that could be deleting sessions concurrently.
	hc.pool.mu.Lock()
	maxSessionsToCreate := int(growToNumSessions - hc.pool.numOpened)
	hc.pool.mu.Unlock()
	var created int
	for {
		if ctx.Err() != nil {
			return
		}

		p := hc.pool
		p.mu.Lock()
		// Take budget before the actual session creation.
		if growToNumSessions <= p.numOpened || created >= maxSessionsToCreate {
			p.mu.Unlock()
			break
		}
		p.numOpened++
		recordStat(ctx, OpenSessionCount, int64(p.numOpened))
		p.createReqs++
		shouldPrepareWrite := p.shouldPrepareWriteLocked()
		p.mu.Unlock()
		var (
			s   *session
			err error
		)
		createContext, cancel := context.WithTimeout(context.Background(), time.Minute)
		if s, err = p.createSession(createContext); err != nil {
			cancel()
			logf(p.sc.logger, "Failed to create session, error: %v", toSpannerError(err))
			continue
		}
		cancel()
		created++
		if shouldPrepareWrite {
			prepareContext, cancel := context.WithTimeout(context.Background(), time.Minute)
			if err = s.prepareForWrite(prepareContext); err != nil {
				cancel()
				p.recycle(s)
				// Don't log about permission errors, which may be expected
				// (e.g. using read-only auth).
				serr := toSpannerError(err).(*Error)
				if serr.Code != codes.PermissionDenied {
					logf(p.sc.logger, "Failed to prepare session, error: %v", serr)
				}
				continue
			}
			cancel()
		}
		p.recycle(s)
	}
}

// shrinkPool scales down the session pool. The method will stop deleting
// sessions when the pool has been reduced to shrinkToNumSessions sessions. It
// will also stop deleting sessions if it detects that another process has
// started creating sessions for the pool again, for example through the
// take() method.
func (hc *healthChecker) shrinkPool(ctx context.Context, shrinkToNumSessions uint64) {
	hc.pool.mu.Lock()
	maxSessionsToDelete := int(hc.pool.numOpened - shrinkToNumSessions)
	hc.pool.mu.Unlock()
	var deleted int
	var prevNumOpened uint64 = math.MaxUint64
	for {
		if ctx.Err() != nil {
			return
		}

		p := hc.pool
		p.mu.Lock()
		// Check if the number of open sessions has increased. If it has, we
		// should stop deleting sessions, as the load has increased and
		// additional sessions are needed.
		if p.numOpened >= prevNumOpened {
			p.mu.Unlock()
			break
		}
		prevNumOpened = p.numOpened

		// Check on both whether we have reached the number of open sessions as
		// well as the number of sessions to delete, in case sessions have been
		// deleted by other methods because they have expired or were deemed
		// invalid.
		if shrinkToNumSessions >= p.numOpened || deleted >= maxSessionsToDelete {
			p.mu.Unlock()
			break
		}

		var s *session
		if p.idleList.Len() > 0 {
			s = p.idleList.Front().Value.(*session)
		} else if p.idleWriteList.Len() > 0 {
			s = p.idleWriteList.Front().Value.(*session)
		}
		p.mu.Unlock()
		if s != nil {
			deleted++
			// Destroy the session as expired.
			s.destroy(true)
		} else {
			break
		}
	}
}

// maxUint64 returns the maximum of two uint64.
func maxUint64(a, b uint64) uint64 {
	if a > b {
		return a
	}
	return b
}

// minUint64 returns the minimum of two uint64.
func minUint64(a, b uint64) uint64 {
	if a > b {
		return b
	}
	return a
}

// isSessionNotFoundError returns true if the given error is a
// `Session not found` error.
func isSessionNotFoundError(err error) bool {
	if err == nil {
		return false
	}
	// We are checking specifically for the error message `Session not found`,
	// as the error could also be a `Database not found`. The latter should
	// cause the session pool to stop preparing sessions for read/write
	// transactions, while the former should not.
	// TODO: once gRPC can return auxiliary error information, stop parsing the error message.
	return ErrCode(err) == codes.NotFound && strings.Contains(err.Error(), "Session not found")
}
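
// A hypothetical caller-side sketch: on a `Session not found` error the
// session is typically destroyed and the operation retried with a fresh one,
// as takeWriteSession does above, e.g.
//
//	if isSessionNotFoundError(err) {
//		s.destroy(false)
//		// retry with a new session
//	}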