1/*
2Copyright 2017 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package spanner
18
19import (
20	"container/heap"
21	"container/list"
22	"context"
23	"fmt"
24	"log"
25	"math"
26	"math/rand"
27	"runtime/debug"
28	"sync"
29	"time"
30
31	"cloud.google.com/go/internal/trace"
32	"cloud.google.com/go/internal/version"
33	vkit "cloud.google.com/go/spanner/apiv1"
34	"go.opencensus.io/stats"
35	"go.opencensus.io/tag"
36	octrace "go.opencensus.io/trace"
37	sppb "google.golang.org/genproto/googleapis/spanner/v1"
38	"google.golang.org/grpc/codes"
39	"google.golang.org/grpc/metadata"
40)
41
42const healthCheckIntervalMins = 50
43
44// sessionHandle is an interface for transactions to access Cloud Spanner
45// sessions safely. It is generated by sessionPool.take().
46type sessionHandle struct {
47	// mu guarantees that the inner session object is returned / destroyed only
48	// once.
49	mu sync.Mutex
50	// session is a pointer to a session object. Transactions never need to
51	// access it directly.
52	session *session
53	// checkoutTime is the time the session was checked out of the pool.
54	checkoutTime time.Time
55	// trackedSessionHandle is the linked list node which links the session to
56	// the list of tracked session handles. trackedSessionHandle is only set if
57	// TrackSessionHandles has been enabled in the session pool configuration.
58	trackedSessionHandle *list.Element
59	// stack is the call stack of the goroutine that checked out the session
60	// from the pool. This can be used to track down session leak problems.
61	stack []byte
62}
63
// recycle gives the inner session object back to its home session pool. It is
// safe to call recycle multiple times, but only the first call takes effect.
66func (sh *sessionHandle) recycle() {
67	sh.mu.Lock()
68	if sh.session == nil {
69		// sessionHandle has already been recycled.
70		sh.mu.Unlock()
71		return
72	}
73	p := sh.session.pool
74	tracked := sh.trackedSessionHandle
75	sh.session.recycle()
76	sh.session = nil
77	sh.trackedSessionHandle = nil
78	sh.checkoutTime = time.Time{}
79	sh.stack = nil
80	sh.mu.Unlock()
81	if tracked != nil {
82		p.mu.Lock()
83		p.trackedSessionHandles.Remove(tracked)
84		p.mu.Unlock()
85	}
86}
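
// A sessionHandle is normally recycled exactly once, when the transaction that
// checked it out finishes. An illustrative sketch (pool is assumed to be an
// existing *sessionPool):
//
//	sh, err := pool.take(ctx)
//	if err != nil {
//		return err
//	}
//	defer sh.recycle() // a second recycle (or a later destroy) is a no-op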
87
// getID gets the Cloud Spanner session ID from the internal session object.
// getID returns an empty string if the sessionHandle is nil or the inner
// session object has been released by recycle / destroy.
91func (sh *sessionHandle) getID() string {
92	sh.mu.Lock()
93	defer sh.mu.Unlock()
94	if sh.session == nil {
95		// sessionHandle has already been recycled/destroyed.
96		return ""
97	}
98	return sh.session.getID()
99}
100
101// getClient gets the Cloud Spanner RPC client associated with the session ID
102// in sessionHandle.
103func (sh *sessionHandle) getClient() *vkit.Client {
104	sh.mu.Lock()
105	defer sh.mu.Unlock()
106	if sh.session == nil {
107		return nil
108	}
109	return sh.session.client
110}
111
112// getMetadata returns the metadata associated with the session in sessionHandle.
113func (sh *sessionHandle) getMetadata() metadata.MD {
114	sh.mu.Lock()
115	defer sh.mu.Unlock()
116	if sh.session == nil {
117		return nil
118	}
119	return sh.session.md
120}
121
122// getTransactionID returns the transaction id in the session if available.
123func (sh *sessionHandle) getTransactionID() transactionID {
124	sh.mu.Lock()
125	defer sh.mu.Unlock()
126	if sh.session == nil {
127		return nil
128	}
129	return sh.session.tx
130}
131
// destroy destroys the inner session object. It is safe to call destroy
// multiple times; only the first call attempts to destroy the inner session
// object.
135func (sh *sessionHandle) destroy() {
136	sh.mu.Lock()
137	s := sh.session
138	if s == nil {
139		// sessionHandle has already been recycled.
140		sh.mu.Unlock()
141		return
142	}
143	tracked := sh.trackedSessionHandle
144	sh.session = nil
145	sh.trackedSessionHandle = nil
146	sh.checkoutTime = time.Time{}
147	sh.stack = nil
148	sh.mu.Unlock()
149
150	if tracked != nil {
151		p := s.pool
152		p.mu.Lock()
153		p.trackedSessionHandles.Remove(tracked)
154		p.mu.Unlock()
155	}
156	s.destroy(false)
157}
158
159// session wraps a Cloud Spanner session ID through which transactions are
160// created and executed.
161type session struct {
162	// client is the RPC channel to Cloud Spanner. It is set only once during
163	// session's creation.
164	client *vkit.Client
165	// id is the unique id of the session in Cloud Spanner. It is set only once
166	// during session's creation.
167	id string
168	// pool is the session's home session pool where it was created. It is set
169	// only once during session's creation.
170	pool *sessionPool
171	// createTime is the timestamp of the session's creation. It is set only
172	// once during session's creation.
173	createTime time.Time
174	// logger is the logger configured for the Spanner client that created the
175	// session. If nil, logging will be directed to the standard logger.
176	logger *log.Logger
177
178	// mu protects the following fields from concurrent access: both
179	// healthcheck workers and transactions can modify them.
180	mu sync.Mutex
181	// valid marks the validity of a session.
182	valid bool
183	// hcIndex is the index of the session inside the global healthcheck queue.
184	// If hcIndex < 0, session has been unregistered from the queue.
185	hcIndex int
	// idleList is the linked list node which links the session to its home
	// session pool's idle list. If idleList == nil, the session is not in the
	// idle list.
189	idleList *list.Element
190	// nextCheck is the timestamp of next scheduled healthcheck of the session.
191	// It is maintained by the global health checker.
192	nextCheck time.Time
	// checkingHealth is true if this session is currently being processed by
	// the health checker. Must be modified under the health checker lock.
195	checkingHealth bool
196	// md is the Metadata to be sent with each request.
197	md metadata.MD
198	// tx contains the transaction id if the session has been prepared for
199	// write.
200	tx transactionID
201	// firstHCDone indicates whether the first health check is done or not.
202	firstHCDone bool
203}
204
205// isValid returns true if the session is still valid for use.
206func (s *session) isValid() bool {
207	s.mu.Lock()
208	defer s.mu.Unlock()
209	return s.valid
210}
211
212// isWritePrepared returns true if the session is prepared for write.
213func (s *session) isWritePrepared() bool {
214	s.mu.Lock()
215	defer s.mu.Unlock()
216	return s.tx != nil
217}
218
219// String implements fmt.Stringer for session.
220func (s *session) String() string {
221	s.mu.Lock()
222	defer s.mu.Unlock()
223	return fmt.Sprintf("<id=%v, hcIdx=%v, idleList=%p, valid=%v, create=%v, nextcheck=%v>",
224		s.id, s.hcIndex, s.idleList, s.valid, s.createTime, s.nextCheck)
225}
226
227// ping verifies if the session is still alive in Cloud Spanner.
228func (s *session) ping() error {
229	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
230	defer cancel()
231
232	// Start parent span that doesn't record.
233	_, span := octrace.StartSpan(ctx, "cloud.google.com/go/spanner.ping", octrace.WithSampler(octrace.NeverSample()))
234	defer span.End()
235
236	// s.getID is safe even when s is invalid.
237	_, err := s.client.ExecuteSql(contextWithOutgoingMetadata(ctx, s.md), &sppb.ExecuteSqlRequest{
238		Session: s.getID(),
239		Sql:     "SELECT 1",
240	})
241	return err
242}
243
244// setHcIndex atomically sets the session's index in the healthcheck queue and
245// returns the old index.
246func (s *session) setHcIndex(i int) int {
247	s.mu.Lock()
248	defer s.mu.Unlock()
249	oi := s.hcIndex
250	s.hcIndex = i
251	return oi
252}
253
254// setIdleList atomically sets the session's idle list link and returns the old
255// link.
256func (s *session) setIdleList(le *list.Element) *list.Element {
257	s.mu.Lock()
258	defer s.mu.Unlock()
259	old := s.idleList
260	s.idleList = le
261	return old
262}
263
264// invalidate marks a session as invalid and returns the old validity.
265func (s *session) invalidate() bool {
266	s.mu.Lock()
267	defer s.mu.Unlock()
268	ov := s.valid
269	s.valid = false
270	return ov
271}
272
273// setNextCheck sets the timestamp for next healthcheck on the session.
274func (s *session) setNextCheck(t time.Time) {
275	s.mu.Lock()
276	defer s.mu.Unlock()
277	s.nextCheck = t
278}
279
280// setTransactionID sets the transaction id in the session
281func (s *session) setTransactionID(tx transactionID) {
282	s.mu.Lock()
283	defer s.mu.Unlock()
284	s.tx = tx
285}
286
287// getID returns the session ID which uniquely identifies the session in Cloud
288// Spanner.
289func (s *session) getID() string {
290	s.mu.Lock()
291	defer s.mu.Unlock()
292	return s.id
293}
294
295// getHcIndex returns the session's index into the global healthcheck priority
296// queue.
297func (s *session) getHcIndex() int {
298	s.mu.Lock()
299	defer s.mu.Unlock()
300	return s.hcIndex
301}
302
303// getIdleList returns the session's link in its home session pool's idle list.
304func (s *session) getIdleList() *list.Element {
305	s.mu.Lock()
306	defer s.mu.Unlock()
307	return s.idleList
308}
309
310// getNextCheck returns the timestamp for next healthcheck on the session.
311func (s *session) getNextCheck() time.Time {
312	s.mu.Lock()
313	defer s.mu.Unlock()
314	return s.nextCheck
315}
316
// recycle returns the session to its home session pool.
318func (s *session) recycle() {
319	s.setTransactionID(nil)
320	if !s.pool.recycle(s) {
321		// s is rejected by its home session pool because it expired and the
322		// session pool currently has enough open sessions.
323		s.destroy(false)
324	}
325	s.pool.decNumInUse(context.Background())
326}
327
328// destroy removes the session from its home session pool, healthcheck queue
329// and Cloud Spanner service.
330func (s *session) destroy(isExpire bool) bool {
331	// Remove s from session pool.
332	if !s.pool.remove(s, isExpire) {
333		return false
334	}
335	// Unregister s from healthcheck queue.
336	s.pool.hc.unregister(s)
337	// Remove s from Cloud Spanner service.
338	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
339	defer cancel()
340	s.delete(ctx)
341	return true
342}
343
344func (s *session) delete(ctx context.Context) {
345	// Ignore the error because even if we fail to explicitly destroy the
346	// session, it will be eventually garbage collected by Cloud Spanner.
347	err := s.client.DeleteSession(contextWithOutgoingMetadata(ctx, s.md), &sppb.DeleteSessionRequest{Name: s.getID()})
348	if err != nil {
349		logf(s.logger, "Failed to delete session %v. Error: %v", s.getID(), err)
350	}
351}
352
353// prepareForWrite prepares the session for write if it is not already in that
354// state.
355func (s *session) prepareForWrite(ctx context.Context) error {
356	if s.isWritePrepared() {
357		return nil
358	}
359	tx, err := beginTransaction(contextWithOutgoingMetadata(ctx, s.md), s.getID(), s.client)
360	// Session not found should cause the session to be removed from the pool.
361	if isSessionNotFoundError(err) {
362		s.pool.remove(s, false)
363		s.pool.hc.unregister(s)
364		return err
365	}
366	// Enable/disable background preparing of write sessions depending on
367	// whether the BeginTransaction call succeeded. This will prevent the
368	// session pool workers from going into an infinite loop of trying to
	// prepare sessions. Any subsequent successful BeginTransaction call, for
	// example from takeWriteSession, will re-enable the background process.
371	s.pool.mu.Lock()
372	s.pool.disableBackgroundPrepareSessions = err != nil
373	s.pool.mu.Unlock()
374	if err != nil {
375		return err
376	}
377	s.setTransactionID(tx)
378	return nil
379}
380
381// SessionPoolConfig stores configurations of a session pool.
382type SessionPoolConfig struct {
383	// MaxOpened is the maximum number of opened sessions allowed by the session
384	// pool. If the client tries to open a session and there are already
385	// MaxOpened sessions, it will block until one becomes available or the
386	// context passed to the client method is canceled or times out.
387	//
388	// Defaults to NumChannels * 100.
389	MaxOpened uint64
390
	// MinOpened is the minimum number of opened sessions that the session pool
	// tries to maintain. The session pool won't continue to expire sessions if
	// the number of opened sessions drops below MinOpened. However, if a
	// session is found to be broken, it will still be evicted from the session
	// pool, so it is possible for the number of opened sessions to drop below
	// MinOpened.
397	//
398	// Defaults to 100.
399	MinOpened uint64
400
	// MaxIdle is the maximum number of idle sessions that the pool is allowed
	// to keep.
402	//
403	// Defaults to 0.
404	MaxIdle uint64
405
406	// MaxBurst is the maximum number of concurrent session creation requests.
407	//
408	// Defaults to 10.
409	MaxBurst uint64
410
411	// incStep is the number of sessions to create in one batch when at least
412	// one more session is needed.
413	//
414	// Defaults to 25.
415	incStep uint64
416
417	// WriteSessions is the fraction of sessions we try to keep prepared for
418	// write.
419	//
420	// Defaults to 0.2.
421	WriteSessions float64
422
	// HealthCheckWorkers is the number of workers used by the health checker
	// for this pool.
425	//
426	// Defaults to 10.
427	HealthCheckWorkers int
428
429	// HealthCheckInterval is how often the health checker pings a session.
430	//
	// Defaults to 50m.
432	HealthCheckInterval time.Duration
433
434	// TrackSessionHandles determines whether the session pool will keep track
435	// of the stacktrace of the goroutines that take sessions from the pool.
436	// This setting can be used to track down session leak problems.
437	//
438	// Defaults to false.
439	TrackSessionHandles bool
440
	// healthCheckSampleInterval is how often the health checker samples live
	// sessions (for use in maintaining session pool size).
443	//
444	// Defaults to 1m.
445	healthCheckSampleInterval time.Duration
446
447	// sessionLabels for the sessions created in the session pool.
448	sessionLabels map[string]string
449}
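
// exampleSessionPoolConfig is an illustrative sketch (not part of the public
// API and not used by the library) showing how the fields above might be
// combined. The specific values are assumptions, not recommendations.
func exampleSessionPoolConfig() SessionPoolConfig {
	return SessionPoolConfig{
		MinOpened:           100,              // keep at least 100 sessions open
		MaxOpened:           400,              // never open more than 400 sessions
		MaxIdle:             50,               // allow up to 50 idle sessions above the windowed max in use
		WriteSessions:       0.2,              // aim to keep 20% of sessions prepared for write
		HealthCheckWorkers:  10,               // number of healthcheck workers
		HealthCheckInterval: 50 * time.Minute, // ping each session roughly every 50 minutes
		TrackSessionHandles: true,             // record call stacks of checked out sessions
	}
}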
450
451// DefaultSessionPoolConfig is the default configuration for the session pool
452// that will be used for a Spanner client, unless the user supplies a specific
453// session pool config.
454var DefaultSessionPoolConfig = SessionPoolConfig{
455	MinOpened:           100,
456	MaxOpened:           numChannels * 100,
457	MaxBurst:            10,
458	incStep:             25,
459	WriteSessions:       0.2,
460	HealthCheckWorkers:  10,
461	HealthCheckInterval: healthCheckIntervalMins * time.Minute,
462}
463
// errMinOpenedGTMaxOpened returns error for SessionPoolConfig.MaxOpened < SessionPoolConfig.MinOpened when SessionPoolConfig.MaxOpened is set.
465func errMinOpenedGTMaxOpened(maxOpened, minOpened uint64) error {
466	return spannerErrorf(codes.InvalidArgument,
467		"require SessionPoolConfig.MaxOpened >= SessionPoolConfig.MinOpened, got %d and %d", maxOpened, minOpened)
468}
469
// errWriteFractionOutOfRange returns error for
// SessionPoolConfig.WriteSessions < 0 or SessionPoolConfig.WriteSessions > 1.
472func errWriteFractionOutOfRange(writeFraction float64) error {
473	return spannerErrorf(codes.InvalidArgument,
474		"require SessionPoolConfig.WriteSessions >= 0.0 && SessionPoolConfig.WriteSessions <= 1.0, got %.2f", writeFraction)
475}
476
477// errHealthCheckWorkersNegative returns error for
478// SessionPoolConfig.HealthCheckWorkers < 0
479func errHealthCheckWorkersNegative(workers int) error {
480	return spannerErrorf(codes.InvalidArgument,
481		"require SessionPoolConfig.HealthCheckWorkers >= 0, got %d", workers)
482}
483
484// errHealthCheckIntervalNegative returns error for
485// SessionPoolConfig.HealthCheckInterval < 0
486func errHealthCheckIntervalNegative(interval time.Duration) error {
487	return spannerErrorf(codes.InvalidArgument,
488		"require SessionPoolConfig.HealthCheckInterval >= 0, got %v", interval)
489}
490
491// validate verifies that the SessionPoolConfig is good for use.
492func (spc *SessionPoolConfig) validate() error {
493	if spc.MinOpened > spc.MaxOpened && spc.MaxOpened > 0 {
494		return errMinOpenedGTMaxOpened(spc.MaxOpened, spc.MinOpened)
495	}
496	if spc.WriteSessions < 0.0 || spc.WriteSessions > 1.0 {
497		return errWriteFractionOutOfRange(spc.WriteSessions)
498	}
499	if spc.HealthCheckWorkers < 0 {
500		return errHealthCheckWorkersNegative(spc.HealthCheckWorkers)
501	}
502	if spc.HealthCheckInterval < 0 {
503		return errHealthCheckIntervalNegative(spc.HealthCheckInterval)
504	}
505	return nil
506}
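
// exampleInvalidSessionPoolConfig is an illustrative sketch (not used by the
// library) of a configuration that fails validation: MaxOpened is set but is
// smaller than MinOpened, so validate returns errMinOpenedGTMaxOpened.
func exampleInvalidSessionPoolConfig() error {
	cfg := SessionPoolConfig{MinOpened: 200, MaxOpened: 100}
	return cfg.validate()
}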
507
508// sessionPool creates and caches Cloud Spanner sessions.
509type sessionPool struct {
510	// mu protects sessionPool from concurrent access.
511	mu sync.Mutex
512	// valid marks the validity of the session pool.
513	valid bool
514	// sc is used to create the sessions for the pool.
515	sc *sessionClient
	// trackedSessionHandles contains all session handles that have been
517	// checked out of the pool. The list is only filled if TrackSessionHandles
518	// has been enabled.
519	trackedSessionHandles list.List
520	// idleList caches idle session IDs. Session IDs in this list can be
521	// allocated for use.
522	idleList list.List
523	// idleWriteList caches idle sessions which have been prepared for write.
524	idleWriteList list.List
	// mayGetSession is for broadcasting that session retrieval/creation may
526	// proceed.
527	mayGetSession chan struct{}
528	// sessionCreationError is the last error that occurred during session
529	// creation and is propagated to any waiters waiting for a session.
530	sessionCreationError error
531	// numOpened is the total number of open sessions from the session pool.
532	numOpened uint64
533	// createReqs is the number of ongoing session creation requests.
534	createReqs uint64
	// prepareReqs is the number of ongoing session preparation requests.
536	prepareReqs uint64
537	// numReadWaiters is the number of processes waiting for a read session to
538	// become available.
539	numReadWaiters uint64
540	// numWriteWaiters is the number of processes waiting for a write session
541	// to become available.
542	numWriteWaiters uint64
543	// disableBackgroundPrepareSessions indicates that the BeginTransaction
544	// call for a read/write transaction failed with a permanent error, such as
545	// PermissionDenied or `Database not found`. Further background calls to
546	// prepare sessions will be disabled.
547	disableBackgroundPrepareSessions bool
548	// configuration of the session pool.
549	SessionPoolConfig
550	// hc is the health checker
551	hc *healthChecker
552	// rand is a separately sourced random generator.
553	rand *rand.Rand
554	// numInUse is the number of sessions that are currently in use (checked out
555	// from the session pool).
556	numInUse uint64
557	// maxNumInUse is the maximum number of sessions in use concurrently in the
558	// current 10 minute interval.
559	maxNumInUse uint64
560	// lastResetTime is the start time of the window for recording maxNumInUse.
561	lastResetTime time.Time
562	// numReads is the number of sessions that are idle for reads.
563	numReads uint64
564	// numWrites is the number of sessions that are idle for writes.
565	numWrites uint64
566
567	// mw is the maintenance window containing statistics for the max number of
568	// sessions checked out of the pool during the last 10 minutes.
569	mw *maintenanceWindow
570
571	// tagMap is a map of all tags that are associated with the emitted metrics.
572	tagMap *tag.Map
573}
574
575// newSessionPool creates a new session pool.
576func newSessionPool(sc *sessionClient, config SessionPoolConfig) (*sessionPool, error) {
577	if err := config.validate(); err != nil {
578		return nil, err
579	}
580	pool := &sessionPool{
581		sc:                sc,
582		valid:             true,
583		mayGetSession:     make(chan struct{}),
584		SessionPoolConfig: config,
585		mw:                newMaintenanceWindow(config.MaxOpened),
586		rand:              rand.New(rand.NewSource(time.Now().UnixNano())),
587	}
588	if config.HealthCheckWorkers == 0 {
589		// With 10 workers and assuming average latency of 5ms for
590		// BeginTransaction, we will be able to prepare 2000 tx/sec in advance.
591		// If the rate of takeWriteSession is more than that, it will degrade to
592		// doing BeginTransaction inline.
593		//
594		// TODO: consider resizing the worker pool dynamically according to the load.
595		config.HealthCheckWorkers = 10
596	}
597	if config.HealthCheckInterval == 0 {
598		config.HealthCheckInterval = healthCheckIntervalMins * time.Minute
599	}
600	if config.healthCheckSampleInterval == 0 {
601		config.healthCheckSampleInterval = time.Minute
602	}
603
604	_, instance, database, err := parseDatabaseName(sc.database)
605	if err != nil {
606		return nil, err
607	}
608	// Errors should not prevent initializing the session pool.
609	ctx, err := tag.New(context.Background(),
610		tag.Upsert(tagKeyClientID, sc.id),
611		tag.Upsert(tagKeyDatabase, database),
612		tag.Upsert(tagKeyInstance, instance),
613		tag.Upsert(tagKeyLibVersion, version.Repo),
614	)
615	if err != nil {
616		logf(pool.sc.logger, "Failed to create tag map, error: %v", err)
617	}
618	pool.tagMap = tag.FromContext(ctx)
619
	// On a GCE VM in the same region, a healthcheck ping takes on average 10ms
	// to finish. Given a 5-minute interval and 10 healthcheck workers, a
	// healthChecker can effectively maintain
	// 100 checks_per_worker/sec * 10 workers * 300 seconds = 300K sessions.
624	pool.hc = newHealthChecker(config.HealthCheckInterval, config.HealthCheckWorkers, config.healthCheckSampleInterval, pool)
625
626	// First initialize the pool before we indicate that the healthchecker is
627	// ready. This prevents the maintainer from starting before the pool has
628	// been initialized, which means that we guarantee that the initial
629	// sessions are created using BatchCreateSessions.
630	if config.MinOpened > 0 {
631		numSessions := minUint64(config.MinOpened, math.MaxInt32)
632		if err := pool.initPool(numSessions); err != nil {
633			return nil, err
634		}
635	}
636	pool.recordStat(context.Background(), MaxAllowedSessionsCount, int64(config.MaxOpened))
637	close(pool.hc.ready)
638	return pool, nil
639}
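
// exampleNewSessionPool is an illustrative sketch (not used by the library) of
// constructing a pool from an existing sessionClient, which is assumed to have
// been created elsewhere by the Spanner client.
func exampleNewSessionPool(sc *sessionClient) (*sessionPool, error) {
	cfg := DefaultSessionPoolConfig
	cfg.TrackSessionHandles = true // keep call stacks of checked out sessions
	return newSessionPool(sc, cfg)
}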
640
641func (p *sessionPool) recordStat(ctx context.Context, m *stats.Int64Measure, n int64, tags ...tag.Tag) {
642	ctx = tag.NewContext(ctx, p.tagMap)
643	mutators := make([]tag.Mutator, len(tags))
644	for i, t := range tags {
645		mutators[i] = tag.Upsert(t.Key, t.Value)
646	}
647	ctx, err := tag.New(ctx, mutators...)
648	if err != nil {
649		logf(p.sc.logger, "Failed to tag metrics, error: %v", err)
650	}
651	recordStat(ctx, m, n)
652}
653
654func (p *sessionPool) initPool(numSessions uint64) error {
655	p.mu.Lock()
656	defer p.mu.Unlock()
657	return p.growPoolLocked(numSessions, true)
658}
659
660func (p *sessionPool) growPoolLocked(numSessions uint64, distributeOverChannels bool) error {
661	// Take budget before the actual session creation.
662	numSessions = minUint64(numSessions, math.MaxInt32)
663	p.numOpened += uint64(numSessions)
664	p.recordStat(context.Background(), OpenSessionCount, int64(p.numOpened))
665	p.createReqs += uint64(numSessions)
666	// Asynchronously create a batch of sessions for the pool.
667	return p.sc.batchCreateSessions(int32(numSessions), distributeOverChannels, p)
668}
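
// Callers such as take and takeWriteSession clamp the requested growth with
// minUint64(p.MaxOpened-p.numOpened, p.incStep). As a worked example
// (illustrative numbers): with MaxOpened = 400, numOpened = 390 and
// incStep = 25, the pool grows by min(400-390, 25) = 10 sessions.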
669
670// sessionReady is executed by the SessionClient when a session has been
671// created and is ready to use. This method will add the new session to the
// pool and decrease the number of sessions that are being created.
673func (p *sessionPool) sessionReady(s *session) {
674	p.mu.Lock()
675	defer p.mu.Unlock()
676	// Clear any session creation error.
677	p.sessionCreationError = nil
678	// Set this pool as the home pool of the session and register it with the
679	// health checker.
680	s.pool = p
681	p.hc.register(s)
682	p.createReqs--
	// Insert the session at a random position in the pool to prevent all
	// sessions affiliated with a channel from being placed sequentially in the
	// pool.
686	if p.idleList.Len() > 0 {
687		pos := rand.Intn(p.idleList.Len())
688		before := p.idleList.Front()
689		for i := 0; i < pos; i++ {
690			before = before.Next()
691		}
692		s.setIdleList(p.idleList.InsertBefore(s, before))
693	} else {
694		s.setIdleList(p.idleList.PushBack(s))
695	}
696	p.incNumReadsLocked(context.Background())
697	// Notify other waiters blocking on session creation.
698	close(p.mayGetSession)
699	p.mayGetSession = make(chan struct{})
700}
701
702// sessionCreationFailed is called by the SessionClient when the creation of one
703// or more requested sessions finished with an error. sessionCreationFailed will
704// decrease the number of sessions being created and notify any waiters that
705// the session creation failed.
706func (p *sessionPool) sessionCreationFailed(err error, numSessions int32) {
707	p.mu.Lock()
708	defer p.mu.Unlock()
709	p.createReqs -= uint64(numSessions)
710	p.numOpened -= uint64(numSessions)
711	p.recordStat(context.Background(), OpenSessionCount, int64(p.numOpened))
712	// Notify other waiters blocking on session creation.
713	p.sessionCreationError = err
714	close(p.mayGetSession)
715	p.mayGetSession = make(chan struct{})
716}
717
718// isValid checks if the session pool is still valid.
719func (p *sessionPool) isValid() bool {
720	if p == nil {
721		return false
722	}
723	p.mu.Lock()
724	defer p.mu.Unlock()
725	return p.valid
726}
727
728// close marks the session pool as closed.
729func (p *sessionPool) close() {
730	if p == nil {
731		return
732	}
733	p.mu.Lock()
734	if !p.valid {
735		p.mu.Unlock()
736		return
737	}
738	p.valid = false
739	p.mu.Unlock()
740	p.hc.close()
741	// destroy all the sessions
742	p.hc.mu.Lock()
743	allSessions := make([]*session, len(p.hc.queue.sessions))
744	copy(allSessions, p.hc.queue.sessions)
745	p.hc.mu.Unlock()
746	for _, s := range allSessions {
747		s.destroy(false)
748	}
749}
750
751// errInvalidSessionPool is the error for using an invalid session pool.
752var errInvalidSessionPool = spannerErrorf(codes.InvalidArgument, "invalid session pool")
753
754// errGetSessionTimeout returns error for context timeout during
755// sessionPool.take().
756var errGetSessionTimeout = spannerErrorf(codes.Canceled, "timeout / context canceled during getting session")
757
758// newSessionHandle creates a new session handle for the given session for this
759// session pool. The session handle will also hold a copy of the current call
760// stack if the session pool has been configured to track the call stacks of
761// sessions being checked out of the pool.
762func (p *sessionPool) newSessionHandle(s *session) (sh *sessionHandle) {
763	sh = &sessionHandle{session: s, checkoutTime: time.Now()}
764	if p.TrackSessionHandles {
765		p.mu.Lock()
766		sh.trackedSessionHandle = p.trackedSessionHandles.PushBack(sh)
767		p.mu.Unlock()
768		sh.stack = debug.Stack()
769	}
770	return sh
771}
772
773// errGetSessionTimeout returns error for context timeout during
774// sessionPool.take().
775func (p *sessionPool) errGetSessionTimeout(ctx context.Context) error {
776	var code codes.Code
777	if ctx.Err() == context.DeadlineExceeded {
778		code = codes.DeadlineExceeded
779	} else {
780		code = codes.Canceled
781	}
782	if p.TrackSessionHandles {
783		return p.errGetSessionTimeoutWithTrackedSessionHandles(code)
784	}
785	return p.errGetBasicSessionTimeout(code)
786}
787
// errGetBasicSessionTimeout returns error for context timeout during
789// sessionPool.take() without any tracked sessionHandles.
790func (p *sessionPool) errGetBasicSessionTimeout(code codes.Code) error {
791	return spannerErrorf(code, "timeout / context canceled during getting session.\n"+
792		"Enable SessionPoolConfig.TrackSessionHandles if you suspect a session leak to get more information about the checked out sessions.")
793}
794
795// errGetSessionTimeoutWithTrackedSessionHandles returns error for context
// timeout during sessionPool.take() including a stacktrace of each checked out
797// session handle.
798func (p *sessionPool) errGetSessionTimeoutWithTrackedSessionHandles(code codes.Code) error {
799	err := spannerErrorf(code, "timeout / context canceled during getting session.")
800	err.(*Error).additionalInformation = p.getTrackedSessionHandleStacksLocked()
801	return err
802}
803
// getTrackedSessionHandleStacksLocked returns a string containing the
// stacktrace of all currently checked out sessions of the pool. This method
// acquires p.mu itself.
807func (p *sessionPool) getTrackedSessionHandleStacksLocked() string {
808	p.mu.Lock()
809	defer p.mu.Unlock()
810	stackTraces := ""
811	i := 1
812	element := p.trackedSessionHandles.Front()
813	for element != nil {
814		sh := element.Value.(*sessionHandle)
815		sh.mu.Lock()
816		if sh.stack != nil {
817			stackTraces = fmt.Sprintf("%s\n\nSession %d checked out of pool at %s by goroutine:\n%s", stackTraces, i, sh.checkoutTime.Format(time.RFC3339), sh.stack)
818		}
819		sh.mu.Unlock()
820		element = element.Next()
821		i++
822	}
823	return stackTraces
824}
825
826// shouldPrepareWriteLocked returns true if we should prepare more sessions for write.
827func (p *sessionPool) shouldPrepareWriteLocked() bool {
828	return !p.disableBackgroundPrepareSessions && float64(p.numOpened)*p.WriteSessions > float64(p.idleWriteList.Len()+int(p.prepareReqs))
829}
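
// As a worked example (illustrative numbers): with numOpened = 400 and
// WriteSessions = 0.2, the pool targets 400*0.2 = 80 write-prepared sessions.
// If idleWriteList.Len()+prepareReqs is currently 60, shouldPrepareWriteLocked
// returns true and the healthcheck workers will prepare more sessions.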
830
831func (p *sessionPool) createSession(ctx context.Context) (*session, error) {
832	trace.TracePrintf(ctx, nil, "Creating a new session")
833	doneCreate := func(done bool) {
834		p.mu.Lock()
835		if !done {
836			// Session creation failed, give budget back.
837			p.numOpened--
838			p.recordStat(ctx, OpenSessionCount, int64(p.numOpened))
839		}
840		p.createReqs--
841		// Notify other waiters blocking on session creation.
842		close(p.mayGetSession)
843		p.mayGetSession = make(chan struct{})
844		p.mu.Unlock()
845	}
846	s, err := p.sc.createSession(ctx)
847	if err != nil {
848		doneCreate(false)
849		// Should return error directly because of the previous retries on
850		// CreateSession RPC.
851		// If the error is a timeout, there is a chance that the session was
852		// created on the server but is not known to the session pool. This
853		// session will then be garbage collected by the server after 1 hour.
854		return nil, err
855	}
856	s.pool = p
857	p.hc.register(s)
858	doneCreate(true)
859	return s, nil
860}
861
862func (p *sessionPool) isHealthy(s *session) bool {
863	if s.getNextCheck().Add(2 * p.hc.getInterval()).Before(time.Now()) {
864		if err := s.ping(); isSessionNotFoundError(err) {
865			// The session is already bad, continue to fetch/create a new one.
866			s.destroy(false)
867			return false
868		}
869		p.hc.scheduledHC(s)
870	}
871	return true
872}
873
// take returns a cached session if one is available; otherwise it tries to
// allocate a new one. Sessions returned by take should be used for read
// operations.
877func (p *sessionPool) take(ctx context.Context) (*sessionHandle, error) {
878	trace.TracePrintf(ctx, nil, "Acquiring a read-only session")
879	for {
880		var s *session
881
882		p.mu.Lock()
883		if !p.valid {
884			p.mu.Unlock()
885			return nil, errInvalidSessionPool
886		}
887		if p.idleList.Len() > 0 {
888			// Idle sessions are available, get one from the top of the idle
889			// list.
890			s = p.idleList.Remove(p.idleList.Front()).(*session)
891			trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
892				"Acquired read-only session")
893			p.decNumReadsLocked(ctx)
894		} else if p.idleWriteList.Len() > 0 {
895			s = p.idleWriteList.Remove(p.idleWriteList.Front()).(*session)
896			trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
897				"Acquired read-write session")
898			p.decNumWritesLocked(ctx)
899		}
900		if s != nil {
901			s.setIdleList(nil)
902			numCheckedOut := p.currSessionsCheckedOutLocked()
903			p.mu.Unlock()
904			p.mw.updateMaxSessionsCheckedOutDuringWindow(numCheckedOut)
			// From here, the session is no longer in the idle list, so the
			// healthcheck workers won't destroy it. If the healthcheck workers
			// failed to schedule a healthcheck for the session in time, do the
			// check here. Because a session check is still much cheaper than
			// session creation, sessions should be reused as much as possible.
910			if !p.isHealthy(s) {
911				continue
912			}
913			p.incNumInUse(ctx)
914			return p.newSessionHandle(s), nil
915		}
916
917		// No session available. Start the creation of a new batch of sessions
		// if that is allowed, and then wait for a session to become available.
919		if p.numReadWaiters+p.numWriteWaiters >= p.createReqs {
920			numSessions := minUint64(p.MaxOpened-p.numOpened, p.incStep)
921			if err := p.growPoolLocked(numSessions, false); err != nil {
922				p.mu.Unlock()
923				return nil, err
924			}
925		}
926
927		p.numReadWaiters++
928		mayGetSession := p.mayGetSession
929		p.mu.Unlock()
930		trace.TracePrintf(ctx, nil, "Waiting for read-only session to become available")
931		select {
932		case <-ctx.Done():
933			trace.TracePrintf(ctx, nil, "Context done waiting for session")
934			p.recordStat(ctx, GetSessionTimeoutsCount, 1)
935			p.mu.Lock()
936			p.numReadWaiters--
937			p.mu.Unlock()
938			return nil, p.errGetSessionTimeout(ctx)
939		case <-mayGetSession:
940			p.mu.Lock()
941			p.numReadWaiters--
942			if p.sessionCreationError != nil {
943				trace.TracePrintf(ctx, nil, "Error creating session: %v", p.sessionCreationError)
944				err := p.sessionCreationError
945				p.mu.Unlock()
946				return nil, err
947			}
948			p.mu.Unlock()
949		}
950	}
951}
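
// exampleTakeSession is an illustrative sketch (not used by the library) of the
// typical checkout/checkin cycle for a read-only session.
func exampleTakeSession(ctx context.Context, pool *sessionPool) error {
	sh, err := pool.take(ctx)
	if err != nil {
		return err
	}
	// Return the session to the pool when done. recycle is safe to call even
	// if the handle has already been recycled or destroyed.
	defer sh.recycle()
	// The handle exposes the session ID and RPC client for issuing requests.
	_ = sh.getID()
	return nil
}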
952
// takeWriteSession returns a write-prepared cached session if one is
// available; otherwise it tries to allocate a new one. Sessions returned
// should be used for read-write transactions.
956func (p *sessionPool) takeWriteSession(ctx context.Context) (*sessionHandle, error) {
957	trace.TracePrintf(ctx, nil, "Acquiring a read-write session")
958	for {
959		var (
960			s   *session
961			err error
962		)
963
964		p.mu.Lock()
965		if !p.valid {
966			p.mu.Unlock()
967			return nil, errInvalidSessionPool
968		}
969		if p.idleWriteList.Len() > 0 {
970			// Idle sessions are available, get one from the top of the idle
971			// list.
972			s = p.idleWriteList.Remove(p.idleWriteList.Front()).(*session)
973			trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()}, "Acquired read-write session")
974			p.decNumWritesLocked(ctx)
975		} else if p.idleList.Len() > 0 {
976			s = p.idleList.Remove(p.idleList.Front()).(*session)
977			trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()}, "Acquired read-only session")
978			p.decNumReadsLocked(ctx)
979		}
980		if s != nil {
981			s.setIdleList(nil)
982			numCheckedOut := p.currSessionsCheckedOutLocked()
983			p.mu.Unlock()
984			p.mw.updateMaxSessionsCheckedOutDuringWindow(numCheckedOut)
			// From here, the session is no longer in the idle list, so the
			// healthcheck workers won't destroy it. If the healthcheck workers
			// failed to schedule a healthcheck for the session in time, do the
			// check here. Because a session check is still much cheaper than
			// session creation, sessions should be reused as much as possible.
990			if !p.isHealthy(s) {
991				continue
992			}
993		} else {
994			// No session available. Start the creation of a new batch of sessions
			// if that is allowed, and then wait for a session to become available.
996			if p.numReadWaiters+p.numWriteWaiters >= p.createReqs {
997				numSessions := minUint64(p.MaxOpened-p.numOpened, p.incStep)
998				if err := p.growPoolLocked(numSessions, false); err != nil {
999					p.mu.Unlock()
1000					return nil, err
1001				}
1002			}
1003
1004			p.numWriteWaiters++
1005			mayGetSession := p.mayGetSession
1006			p.mu.Unlock()
1007			trace.TracePrintf(ctx, nil, "Waiting for read-write session to become available")
1008			select {
1009			case <-ctx.Done():
1010				trace.TracePrintf(ctx, nil, "Context done waiting for session")
1011				p.recordStat(ctx, GetSessionTimeoutsCount, 1)
1012				p.mu.Lock()
1013				p.numWriteWaiters--
1014				p.mu.Unlock()
1015				return nil, p.errGetSessionTimeout(ctx)
1016			case <-mayGetSession:
1017				p.mu.Lock()
1018				p.numWriteWaiters--
1019				if p.sessionCreationError != nil {
1020					err := p.sessionCreationError
1021					p.mu.Unlock()
1022					return nil, err
1023				}
1024				p.mu.Unlock()
1025			}
1026			continue
1027		}
1028		if !s.isWritePrepared() {
1029			p.incNumBeingPrepared(ctx)
1030			defer p.decNumBeingPrepared(ctx)
1031			if err = s.prepareForWrite(ctx); err != nil {
1032				if isSessionNotFoundError(err) {
1033					s.destroy(false)
1034					trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
1035						"Session not found for write")
1036					return nil, toSpannerError(err)
1037				}
1038
1039				s.recycle()
1040				trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
1041					"Error preparing session for write")
1042				return nil, toSpannerError(err)
1043			}
1044		}
1045		p.incNumInUse(ctx)
1046		return p.newSessionHandle(s), nil
1047	}
1048}
1049
// recycle puts session s back into the session pool's idle list. It returns
// true if the session pool successfully recycles session s.
1052func (p *sessionPool) recycle(s *session) bool {
1053	p.mu.Lock()
1054	defer p.mu.Unlock()
1055	if !s.isValid() || !p.valid {
1056		// Reject the session if session is invalid or pool itself is invalid.
1057		return false
1058	}
1059	ctx := context.Background()
1060	// Put session at the top of the list to be handed out in LIFO order for load balancing
1061	// across channels.
1062	if s.isWritePrepared() {
1063		s.setIdleList(p.idleWriteList.PushFront(s))
1064		p.incNumWritesLocked(ctx)
1065	} else {
1066		s.setIdleList(p.idleList.PushFront(s))
1067		p.incNumReadsLocked(ctx)
1068	}
1069	// Broadcast that a session has been returned to idle list.
1070	close(p.mayGetSession)
1071	p.mayGetSession = make(chan struct{})
1072	return true
1073}
1074
1075// remove atomically removes session s from the session pool and invalidates s.
1076// If isExpire == true, the removal is triggered by session expiration and in
1077// such cases, only idle sessions can be removed.
1078func (p *sessionPool) remove(s *session, isExpire bool) bool {
1079	p.mu.Lock()
1080	defer p.mu.Unlock()
1081	if isExpire && (p.numOpened <= p.MinOpened || s.getIdleList() == nil) {
		// Don't expire the session if it is not in the idle list (in use), or
		// if the number of open sessions would drop below p.MinOpened.
1084		return false
1085	}
1086	ol := s.setIdleList(nil)
1087	ctx := context.Background()
	// If the session is in the idle list, remove it.
1089	if ol != nil {
1090		// Remove from whichever list it is in.
1091		p.idleList.Remove(ol)
1092		p.idleWriteList.Remove(ol)
1093		if s.isWritePrepared() {
1094			p.decNumWritesLocked(ctx)
1095		} else {
1096			p.decNumReadsLocked(ctx)
1097		}
1098	}
1099	if s.invalidate() {
1100		// Decrease the number of opened sessions.
1101		p.numOpened--
1102		p.recordStat(ctx, OpenSessionCount, int64(p.numOpened))
1103		// Broadcast that a session has been destroyed.
1104		close(p.mayGetSession)
1105		p.mayGetSession = make(chan struct{})
1106		return true
1107	}
1108	return false
1109}
1110
1111func (p *sessionPool) currSessionsCheckedOutLocked() uint64 {
1112	return p.numOpened - uint64(p.idleList.Len()) - uint64(p.idleWriteList.Len())
1113}
1114
1115func (p *sessionPool) incNumInUse(ctx context.Context) {
1116	p.mu.Lock()
1117	p.incNumInUseLocked(ctx)
1118	p.mu.Unlock()
1119}
1120
1121func (p *sessionPool) incNumInUseLocked(ctx context.Context) {
1122	p.numInUse++
1123	p.recordStat(ctx, SessionsCount, int64(p.numInUse), tagNumInUseSessions)
1124	p.recordStat(ctx, AcquiredSessionsCount, 1)
1125	if p.numInUse > p.maxNumInUse {
1126		p.maxNumInUse = p.numInUse
1127		p.recordStat(ctx, MaxInUseSessionsCount, int64(p.maxNumInUse))
1128	}
1129}
1130
1131func (p *sessionPool) decNumInUse(ctx context.Context) {
1132	p.mu.Lock()
1133	p.decNumInUseLocked(ctx)
1134	p.mu.Unlock()
1135}
1136
1137func (p *sessionPool) decNumInUseLocked(ctx context.Context) {
1138	p.numInUse--
1139	p.recordStat(ctx, SessionsCount, int64(p.numInUse), tagNumInUseSessions)
1140	p.recordStat(ctx, ReleasedSessionsCount, 1)
1141}
1142
1143func (p *sessionPool) incNumReadsLocked(ctx context.Context) {
1144	p.numReads++
1145	p.recordStat(ctx, SessionsCount, int64(p.numReads), tagNumReadSessions)
1146}
1147
1148func (p *sessionPool) decNumReadsLocked(ctx context.Context) {
1149	p.numReads--
1150	p.recordStat(ctx, SessionsCount, int64(p.numReads), tagNumReadSessions)
1151}
1152
1153func (p *sessionPool) incNumWritesLocked(ctx context.Context) {
1154	p.numWrites++
1155	p.recordStat(ctx, SessionsCount, int64(p.numWrites), tagNumWriteSessions)
1156}
1157
1158func (p *sessionPool) decNumWritesLocked(ctx context.Context) {
1159	p.numWrites--
1160	p.recordStat(ctx, SessionsCount, int64(p.numWrites), tagNumWriteSessions)
1161}
1162
1163func (p *sessionPool) incNumBeingPrepared(ctx context.Context) {
1164	p.mu.Lock()
1165	p.incNumBeingPreparedLocked(ctx)
1166	p.mu.Unlock()
1167}
1168
1169func (p *sessionPool) incNumBeingPreparedLocked(ctx context.Context) {
1170	p.prepareReqs++
1171	p.recordStat(ctx, SessionsCount, int64(p.prepareReqs), tagNumBeingPrepared)
1172}
1173
1174func (p *sessionPool) decNumBeingPrepared(ctx context.Context) {
1175	p.mu.Lock()
1176	p.decNumBeingPreparedLocked(ctx)
1177	p.mu.Unlock()
1178}
1179
1180func (p *sessionPool) decNumBeingPreparedLocked(ctx context.Context) {
1181	p.prepareReqs--
1182	p.recordStat(ctx, SessionsCount, int64(p.prepareReqs), tagNumBeingPrepared)
1183}
1184
1185// hcHeap implements heap.Interface. It is used to create the priority queue for
1186// session healthchecks.
1187type hcHeap struct {
1188	sessions []*session
1189}
1190
1191// Len implements heap.Interface.Len.
1192func (h hcHeap) Len() int {
1193	return len(h.sessions)
1194}
1195
1196// Less implements heap.Interface.Less.
1197func (h hcHeap) Less(i, j int) bool {
1198	return h.sessions[i].getNextCheck().Before(h.sessions[j].getNextCheck())
1199}
1200
1201// Swap implements heap.Interface.Swap.
1202func (h hcHeap) Swap(i, j int) {
1203	h.sessions[i], h.sessions[j] = h.sessions[j], h.sessions[i]
1204	h.sessions[i].setHcIndex(i)
1205	h.sessions[j].setHcIndex(j)
1206}
1207
1208// Push implements heap.Interface.Push.
1209func (h *hcHeap) Push(s interface{}) {
1210	ns := s.(*session)
1211	ns.setHcIndex(len(h.sessions))
1212	h.sessions = append(h.sessions, ns)
1213}
1214
1215// Pop implements heap.Interface.Pop.
1216func (h *hcHeap) Pop() interface{} {
1217	old := h.sessions
1218	n := len(old)
1219	s := old[n-1]
1220	h.sessions = old[:n-1]
1221	s.setHcIndex(-1)
1222	return s
1223}
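
// exampleHcHeapOrdering is an illustrative sketch (not used by the library)
// showing that hcHeap orders sessions by nextCheck, so the session that is due
// for a healthcheck first is always popped first.
func exampleHcHeapOrdering() *session {
	h := hcHeap{}
	now := time.Now()
	late := &session{}
	late.setNextCheck(now.Add(10 * time.Minute))
	soon := &session{}
	soon.setNextCheck(now.Add(time.Minute))
	heap.Push(&h, late)
	heap.Push(&h, soon)
	// Returns soon, the session with the earliest nextCheck.
	return heap.Pop(&h).(*session)
}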
1224
1225// maintenanceWindowSize specifies the number of health check cycles that
1226// defines a maintenance window. The maintenance window keeps track of a
1227// rolling set of numbers for the number of maximum checked out sessions during
1228// the maintenance window. This is used by the maintainer to determine the
1229// number of sessions to create or delete at the end of each health check
1230// cycle.
1231const maintenanceWindowSize = 10
1232
1233// maintenanceWindow contains the statistics that are gathered during a health
1234// check maintenance window.
1235type maintenanceWindow struct {
1236	mu sync.Mutex
	// maxSessionsCheckedOut contains the maximum number of sessions that were
	// checked out of the session pool during a health check cycle. This number
	// indicates the number of sessions that were actually needed by the pool
	// to serve the load during that cycle. The values are kept as a rolling
	// set containing the values for the past 10 cycles (minutes). The
	// maintainer uses these values to determine the number of sessions to keep
	// at the end of each cycle.
1244	maxSessionsCheckedOut [maintenanceWindowSize]uint64
1245}
1246
// maxSessionsCheckedOutDuringWindow returns the maximum number of sessions
// that have been checked out during the last maintenance window of 10 cycles
// (minutes).
1250func (mw *maintenanceWindow) maxSessionsCheckedOutDuringWindow() uint64 {
1251	mw.mu.Lock()
1252	defer mw.mu.Unlock()
1253	var max uint64
1254	for _, cycleMax := range mw.maxSessionsCheckedOut {
1255		max = maxUint64(max, cycleMax)
1256	}
1257	return max
1258}
1259
1260// updateMaxSessionsCheckedOutDuringWindow updates the maximum number of
// sessions that have been checked out of the pool during the current
1262// cycle of the maintenance window. A maintenance window consists of 10
1263// maintenance cycles. Each cycle keeps track of the max number of sessions in
1264// use during that cycle. The rolling maintenance window of 10 cycles is used
1265// to determine the number of sessions to keep at the end of a cycle by
1266// calculating the max in use during the last 10 cycles.
1267func (mw *maintenanceWindow) updateMaxSessionsCheckedOutDuringWindow(currNumSessionsCheckedOut uint64) {
1268	mw.mu.Lock()
1269	defer mw.mu.Unlock()
1270	mw.maxSessionsCheckedOut[0] = maxUint64(currNumSessionsCheckedOut, mw.maxSessionsCheckedOut[0])
1271}
1272
1273// startNewCycle starts a new health check cycle with the specified number of
1274// checked out sessions as its initial value.
1275func (mw *maintenanceWindow) startNewCycle(currNumSessionsCheckedOut uint64) {
1276	mw.mu.Lock()
1277	defer mw.mu.Unlock()
1278	copy(mw.maxSessionsCheckedOut[1:], mw.maxSessionsCheckedOut[:9])
1279	mw.maxSessionsCheckedOut[0] = currNumSessionsCheckedOut
1280}
1281
1282// newMaintenanceWindow creates a new maintenance window with all values for
1283// maxSessionsCheckedOut set to maxOpened. This ensures that a complete
1284// maintenance window must pass before the maintainer will start to delete any
1285// sessions.
1286func newMaintenanceWindow(maxOpened uint64) *maintenanceWindow {
1287	mw := &maintenanceWindow{}
1288	// Initialize the rolling window with max values to prevent the maintainer
1289	// from deleting sessions before a complete window of 10 cycles has
1290	// finished.
1291	for i := 0; i < maintenanceWindowSize; i++ {
1292		mw.maxSessionsCheckedOut[i] = maxOpened
1293	}
1294	return mw
1295}
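
// exampleMaintenanceWindow is an illustrative sketch (not used by the library)
// of how the rolling window reports the maximum number of checked out sessions.
func exampleMaintenanceWindow() uint64 {
	mw := &maintenanceWindow{} // all cycles start at 0 in this sketch
	mw.startNewCycle(5)        // a new cycle starts with 5 sessions checked out
	mw.updateMaxSessionsCheckedOutDuringWindow(12)
	mw.updateMaxSessionsCheckedOutDuringWindow(8) // lower values are ignored
	return mw.maxSessionsCheckedOutDuringWindow() // returns 12
}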
1296
1297// healthChecker performs periodical healthchecks on registered sessions.
1298type healthChecker struct {
1299	// mu protects concurrent access to healthChecker.
1300	mu sync.Mutex
1301	// queue is the priority queue for session healthchecks. Sessions with lower
1302	// nextCheck rank higher in the queue.
1303	queue hcHeap
1304	// interval is the average interval between two healthchecks on a session.
1305	interval time.Duration
1306	// workers is the number of concurrent healthcheck workers.
1307	workers int
1308	// waitWorkers waits for all healthcheck workers to exit
1309	waitWorkers sync.WaitGroup
1310	// pool is the underlying session pool.
1311	pool *sessionPool
1312	// sampleInterval is the interval of sampling by the maintainer.
1313	sampleInterval time.Duration
1314	// ready is used to signal that maintainer can start running.
1315	ready chan struct{}
1316	// done is used to signal that health checker should be closed.
1317	done chan struct{}
1318	// once is used for closing channel done only once.
1319	once             sync.Once
1320	maintainerCancel func()
1321}
1322
1323// newHealthChecker initializes new instance of healthChecker.
1324func newHealthChecker(interval time.Duration, workers int, sampleInterval time.Duration, pool *sessionPool) *healthChecker {
1325	if workers <= 0 {
1326		workers = 1
1327	}
1328	hc := &healthChecker{
1329		interval:         interval,
1330		workers:          workers,
1331		pool:             pool,
1332		sampleInterval:   sampleInterval,
1333		ready:            make(chan struct{}),
1334		done:             make(chan struct{}),
1335		maintainerCancel: func() {},
1336	}
1337	hc.waitWorkers.Add(1)
1338	go hc.maintainer()
1339	for i := 1; i <= hc.workers; i++ {
1340		hc.waitWorkers.Add(1)
1341		go hc.worker(i)
1342	}
1343	return hc
1344}
1345
1346// close closes the healthChecker and waits for all healthcheck workers to exit.
1347func (hc *healthChecker) close() {
1348	hc.mu.Lock()
1349	hc.maintainerCancel()
1350	hc.mu.Unlock()
1351	hc.once.Do(func() { close(hc.done) })
1352	hc.waitWorkers.Wait()
1353}
1354
1355// isClosing checks if a healthChecker is already closing.
1356func (hc *healthChecker) isClosing() bool {
1357	select {
1358	case <-hc.done:
1359		return true
1360	default:
1361		return false
1362	}
1363}
1364
1365// getInterval gets the healthcheck interval.
1366func (hc *healthChecker) getInterval() time.Duration {
1367	hc.mu.Lock()
1368	defer hc.mu.Unlock()
1369	return hc.interval
1370}
1371
1372// scheduledHCLocked schedules next healthcheck on session s with the assumption
1373// that hc.mu is being held.
1374func (hc *healthChecker) scheduledHCLocked(s *session) {
1375	var constPart, randPart float64
1376	if !s.firstHCDone {
1377		// The first check will be scheduled in a large range to make requests
1378		// more evenly distributed. The first healthcheck will be scheduled
1379		// after [interval*0.2, interval*1.1) ns.
1380		constPart = float64(hc.interval) * 0.2
1381		randPart = hc.pool.rand.Float64() * float64(hc.interval) * 0.9
1382		s.firstHCDone = true
1383	} else {
1384		// The next healthcheck will be scheduled after
1385		// [interval*0.9, interval*1.1) ns.
1386		constPart = float64(hc.interval) * 0.9
1387		randPart = hc.pool.rand.Float64() * float64(hc.interval) * 0.2
1388	}
	// math.Ceil makes the value at least 1 ns.
1390	nsFromNow := int64(math.Ceil(constPart + randPart))
1391	s.setNextCheck(time.Now().Add(time.Duration(nsFromNow)))
1392	if hi := s.getHcIndex(); hi != -1 {
1393		// Session is still being tracked by healthcheck workers.
1394		heap.Fix(&hc.queue, hi)
1395	}
1396}
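
// As a worked example (assuming the default 50 minute interval): the first
// check of a session is scheduled 0.2*50m + rand*0.9*50m from now, i.e.
// somewhere in [10m, 55m); every subsequent check is scheduled
// 0.9*50m + rand*0.2*50m from now, i.e. somewhere in [45m, 55m).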
1397
// scheduledHC schedules the next healthcheck on session s. It is safe to call
// concurrently.
1400func (hc *healthChecker) scheduledHC(s *session) {
1401	hc.mu.Lock()
1402	defer hc.mu.Unlock()
1403	hc.scheduledHCLocked(s)
1404}
1405
1406// register registers a session with healthChecker for periodical healthcheck.
1407func (hc *healthChecker) register(s *session) {
1408	hc.mu.Lock()
1409	defer hc.mu.Unlock()
1410	hc.scheduledHCLocked(s)
1411	heap.Push(&hc.queue, s)
1412}
1413
1414// unregister unregisters a session from healthcheck queue.
1415func (hc *healthChecker) unregister(s *session) {
1416	hc.mu.Lock()
1417	defer hc.mu.Unlock()
1418	oi := s.setHcIndex(-1)
1419	if oi >= 0 {
1420		heap.Remove(&hc.queue, oi)
1421	}
1422}
1423
1424// markDone marks that health check for session has been performed.
1425func (hc *healthChecker) markDone(s *session) {
1426	hc.mu.Lock()
1427	defer hc.mu.Unlock()
1428	s.checkingHealth = false
1429}
1430
1431// healthCheck checks the health of the session and pings it if needed.
1432func (hc *healthChecker) healthCheck(s *session) {
1433	defer hc.markDone(s)
1434	if !s.pool.isValid() {
1435		// Session pool is closed, perform a garbage collection.
1436		s.destroy(false)
1437		return
1438	}
1439	if err := s.ping(); isSessionNotFoundError(err) {
1440		// Ping failed, destroy the session.
1441		s.destroy(false)
1442	}
1443}
1444
1445// worker performs the healthcheck on sessions in healthChecker's priority
1446// queue.
1447func (hc *healthChecker) worker(i int) {
1448	// Returns a session which we should ping to keep it alive.
1449	getNextForPing := func() *session {
1450		hc.pool.mu.Lock()
1451		defer hc.pool.mu.Unlock()
1452		hc.mu.Lock()
1453		defer hc.mu.Unlock()
1454		if hc.queue.Len() <= 0 {
1455			// Queue is empty.
1456			return nil
1457		}
1458		s := hc.queue.sessions[0]
1459		if s.getNextCheck().After(time.Now()) && hc.pool.valid {
1460			// All sessions have been checked recently.
1461			return nil
1462		}
1463		hc.scheduledHCLocked(s)
1464		if !s.checkingHealth {
1465			s.checkingHealth = true
1466			return s
1467		}
1468		return nil
1469	}
1470
1471	// Returns a session which we should prepare for write.
1472	getNextForTx := func() *session {
1473		hc.pool.mu.Lock()
1474		defer hc.pool.mu.Unlock()
1475		if hc.pool.shouldPrepareWriteLocked() {
1476			if hc.pool.idleList.Len() > 0 && hc.pool.valid {
1477				hc.mu.Lock()
1478				defer hc.mu.Unlock()
1479				if hc.pool.idleList.Front().Value.(*session).checkingHealth {
1480					return nil
1481				}
1482				session := hc.pool.idleList.Remove(hc.pool.idleList.Front()).(*session)
1483				ctx := context.Background()
1484				hc.pool.decNumReadsLocked(ctx)
1485				session.checkingHealth = true
1486				hc.pool.incNumBeingPreparedLocked(ctx)
1487				return session
1488			}
1489		}
1490		return nil
1491	}
1492
1493	for {
1494		if hc.isClosing() {
1495			// Exit when the pool has been closed and all sessions have been
1496			// destroyed or when health checker has been closed.
1497			hc.waitWorkers.Done()
1498			return
1499		}
1500		ws := getNextForTx()
1501		if ws != nil {
1502			ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
1503			err := ws.prepareForWrite(ctx)
1504			if err != nil {
1505				// Skip handling prepare error, session can be prepared in next
1506				// cycle.
1507				// Don't log about permission errors, which may be expected
1508				// (e.g. using read-only auth).
1509				serr := toSpannerError(err).(*Error)
1510				if serr.Code != codes.PermissionDenied {
1511					logf(hc.pool.sc.logger, "Failed to prepare session, error: %v", serr)
1512				}
1513			}
1514			hc.pool.recycle(ws)
1515			hc.pool.mu.Lock()
1516			hc.pool.decNumBeingPreparedLocked(ctx)
1517			hc.pool.mu.Unlock()
1518			cancel()
1519			hc.markDone(ws)
1520		}
1521		rs := getNextForPing()
1522		if rs == nil {
1523			if ws == nil {
1524				// No work to be done so sleep to avoid burning CPU.
1525				pause := int64(100 * time.Millisecond)
1526				if pause > int64(hc.interval) {
1527					pause = int64(hc.interval)
1528				}
1529				select {
1530				case <-time.After(time.Duration(rand.Int63n(pause) + pause/2)):
1531				case <-hc.done:
1532				}
1533
1534			}
1535			continue
1536		}
1537		hc.healthCheck(rs)
1538	}
1539}
1540
1541// maintainer maintains the number of sessions in the pool based on the session
1542// pool configuration and the current and historical number of sessions checked
1543// out of the pool. The maintainer will:
1544// 1. Ensure that the session pool contains at least MinOpened sessions.
1545// 2. If the current number of sessions in the pool exceeds the greatest number
1546//    of checked out sessions (=sessions in use) during the last 10 minutes,
//    and the delta is larger than MaxIdle, the maintainer will reduce the
//    number of sessions to maxSessionsInUseDuringWindow+MaxIdle.
1549func (hc *healthChecker) maintainer() {
1550	// Wait until the pool is ready.
1551	<-hc.ready
1552
1553	for iteration := uint64(0); ; iteration++ {
1554		if hc.isClosing() {
1555			hc.waitWorkers.Done()
1556			return
1557		}
1558
1559		hc.pool.mu.Lock()
1560		currSessionsOpened := hc.pool.numOpened
1561		maxIdle := hc.pool.MaxIdle
1562		minOpened := hc.pool.MinOpened
1563
1564		// Reset the start time for recording the maximum number of sessions
1565		// in the pool.
1566		now := time.Now()
1567		if now.After(hc.pool.lastResetTime.Add(10 * time.Minute)) {
1568			hc.pool.maxNumInUse = hc.pool.numInUse
1569			hc.pool.recordStat(context.Background(), MaxInUseSessionsCount, int64(hc.pool.maxNumInUse))
1570			hc.pool.lastResetTime = now
1571		}
1572		hc.pool.mu.Unlock()
1573		// Get the maximum number of sessions in use during the current
1574		// maintenance window.
1575		maxSessionsInUseDuringWindow := hc.pool.mw.maxSessionsCheckedOutDuringWindow()
1576		hc.mu.Lock()
1577		ctx, cancel := context.WithTimeout(context.Background(), hc.sampleInterval)
1578		hc.maintainerCancel = cancel
1579		hc.mu.Unlock()
1580
1581		// Grow or shrink pool if needed.
1582		// The number of sessions in the pool should be in the range
1583		// [Config.MinOpened, Config.MaxIdle+maxSessionsInUseDuringWindow]
1584		if currSessionsOpened < minOpened {
1585			if err := hc.growPoolInBatch(ctx, minOpened); err != nil {
1586				logf(hc.pool.sc.logger, "failed to grow pool: %v", err)
1587			}
1588		} else if maxIdle+maxSessionsInUseDuringWindow < currSessionsOpened {
1589			hc.shrinkPool(ctx, maxIdle+maxSessionsInUseDuringWindow)
1590		}
1591
1592		select {
1593		case <-ctx.Done():
1594		case <-hc.done:
1595			cancel()
1596		}
1597		// Cycle the maintenance window. This will remove the oldest cycle and
1598		// add a new cycle at the beginning of the maintenance window with the
1599		// currently checked out number of sessions as the max number of
1600		// sessions in use in this cycle. This value will be increased during
1601		// the next cycle if it increases.
1602		hc.pool.mu.Lock()
1603		currSessionsInUse := hc.pool.currSessionsCheckedOutLocked()
1604		hc.pool.mu.Unlock()
1605		hc.pool.mw.startNewCycle(currSessionsInUse)
1606	}
1607}
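
// As a worked example of the maintainer's sizing decision (illustrative
// numbers): with MinOpened = 100, MaxIdle = 50, 400 sessions currently open
// and a windowed maximum of 200 sessions in use, maxIdle+maxSessionsInUse = 250
// is less than 400, so the pool is shrunk to 250 sessions. If instead only 80
// sessions were open, the pool would be grown back to MinOpened = 100.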
1608
1609func (hc *healthChecker) growPoolInBatch(ctx context.Context, growToNumSessions uint64) error {
1610	hc.pool.mu.Lock()
1611	defer hc.pool.mu.Unlock()
1612	numSessions := growToNumSessions - hc.pool.numOpened
1613	return hc.pool.growPoolLocked(numSessions, false)
1614}
1615
// shrinkPool scales down the session pool. The method stops deleting sessions
// once the pool has been reduced to shrinkToNumSessions sessions. It also
// stops deleting sessions if it detects that another process has started
// creating sessions for the pool again, for example through the take() method.
1621func (hc *healthChecker) shrinkPool(ctx context.Context, shrinkToNumSessions uint64) {
1622	hc.pool.mu.Lock()
1623	maxSessionsToDelete := int(hc.pool.numOpened - shrinkToNumSessions)
1624	hc.pool.mu.Unlock()
1625	var deleted int
1626	var prevNumOpened uint64 = math.MaxUint64
1627	for {
1628		if ctx.Err() != nil {
1629			return
1630		}
1631
1632		p := hc.pool
1633		p.mu.Lock()
1634		// Check if the number of open sessions has increased. If it has, we
1635		// should stop deleting sessions, as the load has increased and
1636		// additional sessions are needed.
1637		if p.numOpened >= prevNumOpened {
1638			p.mu.Unlock()
1639			break
1640		}
1641		prevNumOpened = p.numOpened
1642
		// Check both whether we have reached the target number of open
		// sessions and whether we have deleted the maximum number of sessions,
		// in case sessions have been deleted by other methods because they
		// have expired or been deemed invalid.
1647		if shrinkToNumSessions >= p.numOpened || deleted >= maxSessionsToDelete {
1648			p.mu.Unlock()
1649			break
1650		}
1651
1652		var s *session
1653		if p.idleList.Len() > 0 {
1654			s = p.idleList.Front().Value.(*session)
1655		} else if p.idleWriteList.Len() > 0 {
1656			s = p.idleWriteList.Front().Value.(*session)
1657		}
1658		p.mu.Unlock()
1659		if s != nil {
1660			deleted++
			// Destroy the session as expired.
1662			s.destroy(true)
1663		} else {
1664			break
1665		}
1666	}
1667}
1668
1669// maxUint64 returns the maximum of two uint64.
1670func maxUint64(a, b uint64) uint64 {
1671	if a > b {
1672		return a
1673	}
1674	return b
1675}
1676
1677// minUint64 returns the minimum of two uint64.
1678func minUint64(a, b uint64) uint64 {
1679	if a > b {
1680		return b
1681	}
1682	return a
1683}
1684
1685// sessionResourceType is the type name of Spanner sessions.
1686const sessionResourceType = "type.googleapis.com/google.spanner.v1.Session"
1687
1688// isSessionNotFoundError returns true if the given error is a
1689// `Session not found` error.
1690func isSessionNotFoundError(err error) bool {
1691	if err == nil {
1692		return false
1693	}
1694	if ErrCode(err) == codes.NotFound {
1695		if rt, ok := extractResourceType(err); ok {
1696			return rt == sessionResourceType
1697		}
1698	}
1699	return false
1700}
1701