1/*
2Copyright 2017 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package spanner
18
19import (
20	"container/heap"
21	"container/list"
22	"context"
23	"fmt"
24	"log"
25	"math"
26	"math/rand"
27	"runtime/debug"
28	"sync"
29	"time"
30
31	"cloud.google.com/go/internal/trace"
32	"cloud.google.com/go/internal/version"
33	vkit "cloud.google.com/go/spanner/apiv1"
34	"go.opencensus.io/stats"
35	"go.opencensus.io/tag"
36	octrace "go.opencensus.io/trace"
37	sppb "google.golang.org/genproto/googleapis/spanner/v1"
38	"google.golang.org/grpc/codes"
39	"google.golang.org/grpc/metadata"
40)
41
42const healthCheckIntervalMins = 50
43
// sessionHandle is a handle through which transactions safely access Cloud
// Spanner sessions. It is generated by sessionPool.take().
46type sessionHandle struct {
47	// mu guarantees that the inner session object is returned / destroyed only
48	// once.
49	mu sync.Mutex
50	// session is a pointer to a session object. Transactions never need to
51	// access it directly.
52	session *session
53	// checkoutTime is the time the session was checked out of the pool.
54	checkoutTime time.Time
55	// trackedSessionHandle is the linked list node which links the session to
56	// the list of tracked session handles. trackedSessionHandle is only set if
57	// TrackSessionHandles has been enabled in the session pool configuration.
58	trackedSessionHandle *list.Element
59	// stack is the call stack of the goroutine that checked out the session
60	// from the pool. This can be used to track down session leak problems.
61	stack []byte
62}
63
// recycle gives the inner session object back to its home session pool. It is
// safe to call recycle multiple times; only the first call takes effect.
66func (sh *sessionHandle) recycle() {
67	sh.mu.Lock()
68	if sh.session == nil {
69		// sessionHandle has already been recycled.
70		sh.mu.Unlock()
71		return
72	}
73	p := sh.session.pool
74	tracked := sh.trackedSessionHandle
75	sh.session.recycle()
76	sh.session = nil
77	sh.trackedSessionHandle = nil
78	sh.checkoutTime = time.Time{}
79	sh.stack = nil
80	sh.mu.Unlock()
81	if tracked != nil {
82		p.mu.Lock()
83		p.trackedSessionHandles.Remove(tracked)
84		p.mu.Unlock()
85	}
86}
87
// getID gets the Cloud Spanner session ID from the internal session object.
// getID returns an empty string if the sessionHandle is nil or the inner
// session object has been released by recycle / destroy.
91func (sh *sessionHandle) getID() string {
92	sh.mu.Lock()
93	defer sh.mu.Unlock()
94	if sh.session == nil {
95		// sessionHandle has already been recycled/destroyed.
96		return ""
97	}
98	return sh.session.getID()
99}
100
101// getClient gets the Cloud Spanner RPC client associated with the session ID
102// in sessionHandle.
103func (sh *sessionHandle) getClient() *vkit.Client {
104	sh.mu.Lock()
105	defer sh.mu.Unlock()
106	if sh.session == nil {
107		return nil
108	}
109	return sh.session.client
110}
111
112// getMetadata returns the metadata associated with the session in sessionHandle.
113func (sh *sessionHandle) getMetadata() metadata.MD {
114	sh.mu.Lock()
115	defer sh.mu.Unlock()
116	if sh.session == nil {
117		return nil
118	}
119	return sh.session.md
120}
121
122// getTransactionID returns the transaction id in the session if available.
123func (sh *sessionHandle) getTransactionID() transactionID {
124	sh.mu.Lock()
125	defer sh.mu.Unlock()
126	if sh.session == nil {
127		return nil
128	}
129	return sh.session.tx
130}
131
// destroy destroys the inner session object. It is safe to call destroy
// multiple times; only the first call attempts to destroy the inner session
// object.
135func (sh *sessionHandle) destroy() {
136	sh.mu.Lock()
137	s := sh.session
138	if s == nil {
139		// sessionHandle has already been recycled.
140		sh.mu.Unlock()
141		return
142	}
143	tracked := sh.trackedSessionHandle
144	sh.session = nil
145	sh.trackedSessionHandle = nil
146	sh.checkoutTime = time.Time{}
147	sh.stack = nil
148	sh.mu.Unlock()
149
150	if tracked != nil {
151		p := s.pool
152		p.mu.Lock()
153		p.trackedSessionHandles.Remove(tracked)
154		p.mu.Unlock()
155	}
156	s.destroy(false)
157}
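
// exampleTakeAndRecycle is an illustrative sketch (not part of the library's
// code paths) of the typical sessionHandle lifecycle: check a session out of
// the pool, use it for an RPC, and hand it back with recycle. The query and
// the error handling are assumptions about how a caller might use the handle,
// not prescribed behavior.
func exampleTakeAndRecycle(ctx context.Context, p *sessionPool) error {
	sh, err := p.take(ctx)
	if err != nil {
		return err
	}
	// recycle is idempotent, so deferring it is safe even if the handle is
	// destroyed or recycled elsewhere first.
	defer sh.recycle()

	_, err = sh.getClient().ExecuteSql(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.ExecuteSqlRequest{
		Session: sh.getID(),
		Sql:     "SELECT 1",
	})
	return err
}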
158
159// session wraps a Cloud Spanner session ID through which transactions are
160// created and executed.
161type session struct {
162	// client is the RPC channel to Cloud Spanner. It is set only once during
163	// session's creation.
164	client *vkit.Client
165	// id is the unique id of the session in Cloud Spanner. It is set only once
166	// during session's creation.
167	id string
168	// pool is the session's home session pool where it was created. It is set
169	// only once during session's creation.
170	pool *sessionPool
171	// createTime is the timestamp of the session's creation. It is set only
172	// once during session's creation.
173	createTime time.Time
174	// logger is the logger configured for the Spanner client that created the
175	// session. If nil, logging will be directed to the standard logger.
176	logger *log.Logger
177
178	// mu protects the following fields from concurrent access: both
179	// healthcheck workers and transactions can modify them.
180	mu sync.Mutex
181	// valid marks the validity of a session.
182	valid bool
183	// hcIndex is the index of the session inside the global healthcheck queue.
184	// If hcIndex < 0, session has been unregistered from the queue.
185	hcIndex int
	// idleList is the linked list node which links the session to its home
	// session pool's idle list. If idleList == nil, the session is not in the
	// idle list.
189	idleList *list.Element
190	// nextCheck is the timestamp of next scheduled healthcheck of the session.
191	// It is maintained by the global health checker.
192	nextCheck time.Time
	// checkingHealth is true if this session is currently being processed by
	// the health checker. Must be modified under the health checker lock.
195	checkingHealth bool
196	// md is the Metadata to be sent with each request.
197	md metadata.MD
198	// tx contains the transaction id if the session has been prepared for
199	// write.
200	tx transactionID
201	// firstHCDone indicates whether the first health check is done or not.
202	firstHCDone bool
203}
204
205// isValid returns true if the session is still valid for use.
206func (s *session) isValid() bool {
207	s.mu.Lock()
208	defer s.mu.Unlock()
209	return s.valid
210}
211
212// isWritePrepared returns true if the session is prepared for write.
213func (s *session) isWritePrepared() bool {
214	s.mu.Lock()
215	defer s.mu.Unlock()
216	return s.tx != nil
217}
218
219// String implements fmt.Stringer for session.
220func (s *session) String() string {
221	s.mu.Lock()
222	defer s.mu.Unlock()
223	return fmt.Sprintf("<id=%v, hcIdx=%v, idleList=%p, valid=%v, create=%v, nextcheck=%v>",
224		s.id, s.hcIndex, s.idleList, s.valid, s.createTime, s.nextCheck)
225}
226
227// ping verifies if the session is still alive in Cloud Spanner.
228func (s *session) ping() error {
229	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
230	defer cancel()
231
232	// Start parent span that doesn't record.
233	_, span := octrace.StartSpan(ctx, "cloud.google.com/go/spanner.ping", octrace.WithSampler(octrace.NeverSample()))
234	defer span.End()
235
236	// s.getID is safe even when s is invalid.
237	_, err := s.client.ExecuteSql(contextWithOutgoingMetadata(ctx, s.md), &sppb.ExecuteSqlRequest{
238		Session: s.getID(),
239		Sql:     "SELECT 1",
240	})
241	return err
242}
243
244// setHcIndex atomically sets the session's index in the healthcheck queue and
245// returns the old index.
246func (s *session) setHcIndex(i int) int {
247	s.mu.Lock()
248	defer s.mu.Unlock()
249	oi := s.hcIndex
250	s.hcIndex = i
251	return oi
252}
253
254// setIdleList atomically sets the session's idle list link and returns the old
255// link.
256func (s *session) setIdleList(le *list.Element) *list.Element {
257	s.mu.Lock()
258	defer s.mu.Unlock()
259	old := s.idleList
260	s.idleList = le
261	return old
262}
263
264// invalidate marks a session as invalid and returns the old validity.
265func (s *session) invalidate() bool {
266	s.mu.Lock()
267	defer s.mu.Unlock()
268	ov := s.valid
269	s.valid = false
270	return ov
271}
272
273// setNextCheck sets the timestamp for next healthcheck on the session.
274func (s *session) setNextCheck(t time.Time) {
275	s.mu.Lock()
276	defer s.mu.Unlock()
277	s.nextCheck = t
278}
279
280// setTransactionID sets the transaction id in the session
281func (s *session) setTransactionID(tx transactionID) {
282	s.mu.Lock()
283	defer s.mu.Unlock()
284	s.tx = tx
285}
286
287// getID returns the session ID which uniquely identifies the session in Cloud
288// Spanner.
289func (s *session) getID() string {
290	s.mu.Lock()
291	defer s.mu.Unlock()
292	return s.id
293}
294
295// getHcIndex returns the session's index into the global healthcheck priority
296// queue.
297func (s *session) getHcIndex() int {
298	s.mu.Lock()
299	defer s.mu.Unlock()
300	return s.hcIndex
301}
302
303// getIdleList returns the session's link in its home session pool's idle list.
304func (s *session) getIdleList() *list.Element {
305	s.mu.Lock()
306	defer s.mu.Unlock()
307	return s.idleList
308}
309
310// getNextCheck returns the timestamp for next healthcheck on the session.
311func (s *session) getNextCheck() time.Time {
312	s.mu.Lock()
313	defer s.mu.Unlock()
314	return s.nextCheck
315}
316
// recycle returns the session to its home session pool.
318func (s *session) recycle() {
319	s.setTransactionID(nil)
320	if !s.pool.recycle(s) {
321		// s is rejected by its home session pool because it expired and the
322		// session pool currently has enough open sessions.
323		s.destroy(false)
324	}
325	s.pool.decNumInUse(context.Background())
326}
327
328// destroy removes the session from its home session pool, healthcheck queue
329// and Cloud Spanner service.
330func (s *session) destroy(isExpire bool) bool {
331	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
332	defer cancel()
333	return s.destroyWithContext(ctx, isExpire)
334}
335
336func (s *session) destroyWithContext(ctx context.Context, isExpire bool) bool {
337	// Remove s from session pool.
338	if !s.pool.remove(s, isExpire) {
339		return false
340	}
341	// Unregister s from healthcheck queue.
342	s.pool.hc.unregister(s)
343	// Remove s from Cloud Spanner service.
344	s.delete(ctx)
345	return true
346}
347
348func (s *session) delete(ctx context.Context) {
349	// Ignore the error because even if we fail to explicitly destroy the
350	// session, it will be eventually garbage collected by Cloud Spanner.
351	err := s.client.DeleteSession(contextWithOutgoingMetadata(ctx, s.md), &sppb.DeleteSessionRequest{Name: s.getID()})
352	// Do not log DeadlineExceeded errors when deleting sessions, as these do
353	// not indicate anything the user can or should act upon.
354	if err != nil && ErrCode(err) != codes.DeadlineExceeded {
355		logf(s.logger, "Failed to delete session %v. Error: %v", s.getID(), err)
356	}
357}
358
359// prepareForWrite prepares the session for write if it is not already in that
360// state.
361func (s *session) prepareForWrite(ctx context.Context) error {
362	if s.isWritePrepared() {
363		return nil
364	}
365	tx, err := beginTransaction(contextWithOutgoingMetadata(ctx, s.md), s.getID(), s.client)
366	// Session not found should cause the session to be removed from the pool.
367	if isSessionNotFoundError(err) {
368		s.pool.remove(s, false)
369		s.pool.hc.unregister(s)
370		return err
371	}
372	// Enable/disable background preparing of write sessions depending on
373	// whether the BeginTransaction call succeeded. This will prevent the
374	// session pool workers from going into an infinite loop of trying to
375	// prepare sessions. Any subsequent successful BeginTransaction call from
376	// for example takeWriteSession will re-enable the background process.
377	s.pool.mu.Lock()
378	s.pool.disableBackgroundPrepareSessions = err != nil
379	s.pool.mu.Unlock()
380	if err != nil {
381		return err
382	}
383	s.setTransactionID(tx)
384	return nil
385}
386
387// SessionPoolConfig stores configurations of a session pool.
388type SessionPoolConfig struct {
389	// MaxOpened is the maximum number of opened sessions allowed by the session
390	// pool. If the client tries to open a session and there are already
391	// MaxOpened sessions, it will block until one becomes available or the
392	// context passed to the client method is canceled or times out.
393	//
394	// Defaults to NumChannels * 100.
395	MaxOpened uint64
396
	// MinOpened is the minimum number of opened sessions that the session pool
	// tries to maintain. The session pool won't continue to expire sessions if
	// the number of opened connections drops below MinOpened. However, if a
	// session is found to be broken, it will still be evicted from the session
	// pool, so it is possible for the number of opened sessions to drop below
	// MinOpened.
403	//
404	// Defaults to 100.
405	MinOpened uint64
406
407	// MaxIdle is the maximum number of idle sessions that are allowed in the
408	// session pool.
409	//
410	// Defaults to 0.
411	MaxIdle uint64
412
413	// MaxBurst is the maximum number of concurrent session creation requests.
414	//
415	// Deprecated: MaxBurst exists for historical compatibility and should not
416	// be used. MaxBurst was used to limit the number of sessions that the
417	// session pool could create within a time frame. This was an early safety
418	// valve to prevent a client from overwhelming the backend if a large number
419	// of sessions was suddenly needed. The session pool would then pause the
420	// creation of sessions for a while. Such a pause is no longer needed and
421	// the implementation has been removed from the pool.
422	//
423	// Defaults to 10.
424	MaxBurst uint64
425
426	// incStep is the number of sessions to create in one batch when at least
427	// one more session is needed.
428	//
429	// Defaults to 25.
430	incStep uint64
431
432	// WriteSessions is the fraction of sessions we try to keep prepared for
433	// write.
434	//
435	// Defaults to 0.2.
436	WriteSessions float64
437
	// HealthCheckWorkers is the number of workers used by the health checker
	// for this pool.
440	//
441	// Defaults to 10.
442	HealthCheckWorkers int
443
444	// HealthCheckInterval is how often the health checker pings a session.
445	//
446	// Defaults to 50m.
447	HealthCheckInterval time.Duration
448
449	// TrackSessionHandles determines whether the session pool will keep track
450	// of the stacktrace of the goroutines that take sessions from the pool.
451	// This setting can be used to track down session leak problems.
452	//
453	// Defaults to false.
454	TrackSessionHandles bool
455
	// healthCheckSampleInterval is how often the health checker samples live
	// sessions (for use in maintaining the session pool size).
458	//
459	// Defaults to 1m.
460	healthCheckSampleInterval time.Duration
461
462	// sessionLabels for the sessions created in the session pool.
463	sessionLabels map[string]string
464}
465
466// DefaultSessionPoolConfig is the default configuration for the session pool
467// that will be used for a Spanner client, unless the user supplies a specific
468// session pool config.
469var DefaultSessionPoolConfig = SessionPoolConfig{
470	MinOpened:           100,
471	MaxOpened:           numChannels * 100,
472	MaxBurst:            10,
473	incStep:             25,
474	WriteSessions:       0.2,
475	HealthCheckWorkers:  10,
476	HealthCheckInterval: healthCheckIntervalMins * time.Minute,
477}
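
// exampleCustomSessionPoolConfig is an illustrative sketch (not used by the
// library) showing how a custom configuration could be derived from
// DefaultSessionPoolConfig and checked with validate. The specific values are
// assumptions chosen for illustration, not recommendations.
func exampleCustomSessionPoolConfig() (SessionPoolConfig, error) {
	cfg := DefaultSessionPoolConfig
	// Keep more sessions warm and record checkout stacks to debug leaks.
	cfg.MinOpened = 200
	cfg.MaxOpened = 1000
	cfg.MaxIdle = 50
	cfg.TrackSessionHandles = true
	if err := cfg.validate(); err != nil {
		return SessionPoolConfig{}, err
	}
	return cfg, nil
}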
478
// errMinOpenedGTMaxOpened returns error for SessionPoolConfig.MaxOpened < SessionPoolConfig.MinOpened when SessionPoolConfig.MaxOpened is set.
480func errMinOpenedGTMaxOpened(maxOpened, minOpened uint64) error {
481	return spannerErrorf(codes.InvalidArgument,
482		"require SessionPoolConfig.MaxOpened >= SessionPoolConfig.MinOpened, got %d and %d", maxOpened, minOpened)
483}
484
// errWriteFractionOutOfRange returns error for
// SessionPoolConfig.WriteSessions < 0 or SessionPoolConfig.WriteSessions > 1.
487func errWriteFractionOutOfRange(writeFraction float64) error {
488	return spannerErrorf(codes.InvalidArgument,
489		"require SessionPoolConfig.WriteSessions >= 0.0 && SessionPoolConfig.WriteSessions <= 1.0, got %.2f", writeFraction)
490}
491
492// errHealthCheckWorkersNegative returns error for
493// SessionPoolConfig.HealthCheckWorkers < 0
494func errHealthCheckWorkersNegative(workers int) error {
495	return spannerErrorf(codes.InvalidArgument,
496		"require SessionPoolConfig.HealthCheckWorkers >= 0, got %d", workers)
497}
498
499// errHealthCheckIntervalNegative returns error for
500// SessionPoolConfig.HealthCheckInterval < 0
501func errHealthCheckIntervalNegative(interval time.Duration) error {
502	return spannerErrorf(codes.InvalidArgument,
503		"require SessionPoolConfig.HealthCheckInterval >= 0, got %v", interval)
504}
505
506// validate verifies that the SessionPoolConfig is good for use.
507func (spc *SessionPoolConfig) validate() error {
508	if spc.MinOpened > spc.MaxOpened && spc.MaxOpened > 0 {
509		return errMinOpenedGTMaxOpened(spc.MaxOpened, spc.MinOpened)
510	}
511	if spc.WriteSessions < 0.0 || spc.WriteSessions > 1.0 {
512		return errWriteFractionOutOfRange(spc.WriteSessions)
513	}
514	if spc.HealthCheckWorkers < 0 {
515		return errHealthCheckWorkersNegative(spc.HealthCheckWorkers)
516	}
517	if spc.HealthCheckInterval < 0 {
518		return errHealthCheckIntervalNegative(spc.HealthCheckInterval)
519	}
520	return nil
521}
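
// exampleInvalidSessionPoolConfig is an illustrative sketch (not used by the
// library) of a configuration that validate rejects: MinOpened greater than a
// non-zero MaxOpened yields errMinOpenedGTMaxOpened. The values are arbitrary.
func exampleInvalidSessionPoolConfig() error {
	cfg := SessionPoolConfig{MinOpened: 500, MaxOpened: 100}
	// Returns errMinOpenedGTMaxOpened(100, 500).
	return cfg.validate()
}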
522
523// sessionPool creates and caches Cloud Spanner sessions.
524type sessionPool struct {
525	// mu protects sessionPool from concurrent access.
526	mu sync.Mutex
527	// valid marks the validity of the session pool.
528	valid bool
529	// sc is used to create the sessions for the pool.
530	sc *sessionClient
	// trackedSessionHandles contains all session handles that have been
	// checked out of the pool. The list is only filled if TrackSessionHandles
	// has been enabled.
534	trackedSessionHandles list.List
	// idleList caches idle sessions. Sessions in this list can be allocated
	// for use.
537	idleList list.List
538	// idleWriteList caches idle sessions which have been prepared for write.
539	idleWriteList list.List
	// mayGetSession is for broadcasting that session retrieval/creation may
	// proceed.
542	mayGetSession chan struct{}
543	// sessionCreationError is the last error that occurred during session
544	// creation and is propagated to any waiters waiting for a session.
545	sessionCreationError error
546	// numOpened is the total number of open sessions from the session pool.
547	numOpened uint64
548	// createReqs is the number of ongoing session creation requests.
549	createReqs uint64
	// prepareReqs is the number of ongoing session preparation requests.
551	prepareReqs uint64
552	// numReadWaiters is the number of processes waiting for a read session to
553	// become available.
554	numReadWaiters uint64
555	// numWriteWaiters is the number of processes waiting for a write session
556	// to become available.
557	numWriteWaiters uint64
558	// disableBackgroundPrepareSessions indicates that the BeginTransaction
559	// call for a read/write transaction failed with a permanent error, such as
560	// PermissionDenied or `Database not found`. Further background calls to
561	// prepare sessions will be disabled.
562	disableBackgroundPrepareSessions bool
563	// configuration of the session pool.
564	SessionPoolConfig
565	// hc is the health checker
566	hc *healthChecker
567	// rand is a separately sourced random generator.
568	rand *rand.Rand
569	// numInUse is the number of sessions that are currently in use (checked out
570	// from the session pool).
571	numInUse uint64
572	// maxNumInUse is the maximum number of sessions in use concurrently in the
573	// current 10 minute interval.
574	maxNumInUse uint64
575	// lastResetTime is the start time of the window for recording maxNumInUse.
576	lastResetTime time.Time
577	// numReads is the number of sessions that are idle for reads.
578	numReads uint64
579	// numWrites is the number of sessions that are idle for writes.
580	numWrites uint64
581
582	// mw is the maintenance window containing statistics for the max number of
583	// sessions checked out of the pool during the last 10 minutes.
584	mw *maintenanceWindow
585
586	// tagMap is a map of all tags that are associated with the emitted metrics.
587	tagMap *tag.Map
588}
589
590// newSessionPool creates a new session pool.
591func newSessionPool(sc *sessionClient, config SessionPoolConfig) (*sessionPool, error) {
592	if err := config.validate(); err != nil {
593		return nil, err
594	}
595	pool := &sessionPool{
596		sc:                sc,
597		valid:             true,
598		mayGetSession:     make(chan struct{}),
599		SessionPoolConfig: config,
600		mw:                newMaintenanceWindow(config.MaxOpened),
601		rand:              rand.New(rand.NewSource(time.Now().UnixNano())),
602	}
603	if config.HealthCheckWorkers == 0 {
604		// With 10 workers and assuming average latency of 5ms for
605		// BeginTransaction, we will be able to prepare 2000 tx/sec in advance.
606		// If the rate of takeWriteSession is more than that, it will degrade to
607		// doing BeginTransaction inline.
608		//
609		// TODO: consider resizing the worker pool dynamically according to the load.
610		config.HealthCheckWorkers = 10
611	}
612	if config.HealthCheckInterval == 0 {
613		config.HealthCheckInterval = healthCheckIntervalMins * time.Minute
614	}
615	if config.healthCheckSampleInterval == 0 {
616		config.healthCheckSampleInterval = time.Minute
617	}
618
619	_, instance, database, err := parseDatabaseName(sc.database)
620	if err != nil {
621		return nil, err
622	}
623	// Errors should not prevent initializing the session pool.
624	ctx, err := tag.New(context.Background(),
625		tag.Upsert(tagKeyClientID, sc.id),
626		tag.Upsert(tagKeyDatabase, database),
627		tag.Upsert(tagKeyInstance, instance),
628		tag.Upsert(tagKeyLibVersion, version.Repo),
629	)
630	if err != nil {
631		logf(pool.sc.logger, "Failed to create tag map, error: %v", err)
632	}
633	pool.tagMap = tag.FromContext(ctx)
634
	// On a GCE VM in the same region, a healthcheck ping takes on average 10ms
	// to finish. Given a 5 minute interval and 10 healthcheck workers, a
	// healthChecker can effectively maintain
	// 100 checks_per_worker/sec * 10 workers * 300 seconds = 300K sessions.
639	pool.hc = newHealthChecker(config.HealthCheckInterval, config.HealthCheckWorkers, config.healthCheckSampleInterval, pool)
640
641	// First initialize the pool before we indicate that the healthchecker is
642	// ready. This prevents the maintainer from starting before the pool has
643	// been initialized, which means that we guarantee that the initial
644	// sessions are created using BatchCreateSessions.
645	if config.MinOpened > 0 {
646		numSessions := minUint64(config.MinOpened, math.MaxInt32)
647		if err := pool.initPool(numSessions); err != nil {
648			return nil, err
649		}
650	}
651	pool.recordStat(context.Background(), MaxAllowedSessionsCount, int64(config.MaxOpened))
652	close(pool.hc.ready)
653	return pool, nil
654}
655
656func (p *sessionPool) recordStat(ctx context.Context, m *stats.Int64Measure, n int64, tags ...tag.Tag) {
657	ctx = tag.NewContext(ctx, p.tagMap)
658	mutators := make([]tag.Mutator, len(tags))
659	for i, t := range tags {
660		mutators[i] = tag.Upsert(t.Key, t.Value)
661	}
662	ctx, err := tag.New(ctx, mutators...)
663	if err != nil {
664		logf(p.sc.logger, "Failed to tag metrics, error: %v", err)
665	}
666	recordStat(ctx, m, n)
667}
668
669func (p *sessionPool) initPool(numSessions uint64) error {
670	p.mu.Lock()
671	defer p.mu.Unlock()
672	return p.growPoolLocked(numSessions, true)
673}
674
675func (p *sessionPool) growPoolLocked(numSessions uint64, distributeOverChannels bool) error {
676	// Take budget before the actual session creation.
677	numSessions = minUint64(numSessions, math.MaxInt32)
678	p.numOpened += uint64(numSessions)
679	p.recordStat(context.Background(), OpenSessionCount, int64(p.numOpened))
680	p.createReqs += uint64(numSessions)
681	// Asynchronously create a batch of sessions for the pool.
682	return p.sc.batchCreateSessions(int32(numSessions), distributeOverChannels, p)
683}
684
// sessionReady is executed by the SessionClient when a session has been
// created and is ready to use. This method will add the new session to the
// pool and decrease the number of sessions that are being created.
688func (p *sessionPool) sessionReady(s *session) {
689	p.mu.Lock()
690	defer p.mu.Unlock()
691	// Clear any session creation error.
692	p.sessionCreationError = nil
693	// Set this pool as the home pool of the session and register it with the
694	// health checker.
695	s.pool = p
696	p.hc.register(s)
697	p.createReqs--
	// Insert the session at a random position in the pool to prevent all
	// sessions affiliated with a channel from being placed sequentially in the
	// pool.
701	if p.idleList.Len() > 0 {
702		pos := rand.Intn(p.idleList.Len())
703		before := p.idleList.Front()
704		for i := 0; i < pos; i++ {
705			before = before.Next()
706		}
707		s.setIdleList(p.idleList.InsertBefore(s, before))
708	} else {
709		s.setIdleList(p.idleList.PushBack(s))
710	}
711	p.incNumReadsLocked(context.Background())
712	// Notify other waiters blocking on session creation.
713	close(p.mayGetSession)
714	p.mayGetSession = make(chan struct{})
715}
716
717// sessionCreationFailed is called by the SessionClient when the creation of one
718// or more requested sessions finished with an error. sessionCreationFailed will
719// decrease the number of sessions being created and notify any waiters that
720// the session creation failed.
721func (p *sessionPool) sessionCreationFailed(err error, numSessions int32) {
722	p.mu.Lock()
723	defer p.mu.Unlock()
724	p.createReqs -= uint64(numSessions)
725	p.numOpened -= uint64(numSessions)
726	p.recordStat(context.Background(), OpenSessionCount, int64(p.numOpened))
727	// Notify other waiters blocking on session creation.
728	p.sessionCreationError = err
729	close(p.mayGetSession)
730	p.mayGetSession = make(chan struct{})
731}
732
733// isValid checks if the session pool is still valid.
734func (p *sessionPool) isValid() bool {
735	if p == nil {
736		return false
737	}
738	p.mu.Lock()
739	defer p.mu.Unlock()
740	return p.valid
741}
742
743// close marks the session pool as closed and deletes all sessions in parallel.
744// Any errors that are returned by the Delete RPC are logged but otherwise
745// ignored, except for DeadlineExceeded errors, which are ignored and not
746// logged.
747func (p *sessionPool) close(ctx context.Context) {
748	if p == nil {
749		return
750	}
751	p.mu.Lock()
752	if !p.valid {
753		p.mu.Unlock()
754		return
755	}
756	p.valid = false
757	p.mu.Unlock()
758	p.hc.close()
759	// destroy all the sessions
760	p.hc.mu.Lock()
761	allSessions := make([]*session, len(p.hc.queue.sessions))
762	copy(allSessions, p.hc.queue.sessions)
763	p.hc.mu.Unlock()
764	wg := sync.WaitGroup{}
765	for _, s := range allSessions {
766		wg.Add(1)
767		go deleteSession(ctx, s, &wg)
768	}
769	wg.Wait()
770}
771
772func deleteSession(ctx context.Context, s *session, wg *sync.WaitGroup) {
773	defer wg.Done()
774	s.destroyWithContext(ctx, false)
775}
776
777// errInvalidSessionPool is the error for using an invalid session pool.
778var errInvalidSessionPool = spannerErrorf(codes.InvalidArgument, "invalid session pool")
779
// errGetSessionTimeout is the error returned when the context times out or is
// canceled during sessionPool.take().
782var errGetSessionTimeout = spannerErrorf(codes.Canceled, "timeout / context canceled during getting session")
783
784// newSessionHandle creates a new session handle for the given session for this
785// session pool. The session handle will also hold a copy of the current call
786// stack if the session pool has been configured to track the call stacks of
787// sessions being checked out of the pool.
788func (p *sessionPool) newSessionHandle(s *session) (sh *sessionHandle) {
789	sh = &sessionHandle{session: s, checkoutTime: time.Now()}
790	if p.TrackSessionHandles {
791		p.mu.Lock()
792		sh.trackedSessionHandle = p.trackedSessionHandles.PushBack(sh)
793		p.mu.Unlock()
794		sh.stack = debug.Stack()
795	}
796	return sh
797}
798
// errGetSessionTimeout returns the error for a timed out or canceled context
// during sessionPool.take().
801func (p *sessionPool) errGetSessionTimeout(ctx context.Context) error {
802	var code codes.Code
803	if ctx.Err() == context.DeadlineExceeded {
804		code = codes.DeadlineExceeded
805	} else {
806		code = codes.Canceled
807	}
808	if p.TrackSessionHandles {
809		return p.errGetSessionTimeoutWithTrackedSessionHandles(code)
810	}
811	return p.errGetBasicSessionTimeout(code)
812}
813
// errGetBasicSessionTimeout returns the error for a context timeout during
// sessionPool.take() without any tracked sessionHandles.
816func (p *sessionPool) errGetBasicSessionTimeout(code codes.Code) error {
817	return spannerErrorf(code, "timeout / context canceled during getting session.\n"+
818		"Enable SessionPoolConfig.TrackSessionHandles if you suspect a session leak to get more information about the checked out sessions.")
819}
820
// errGetSessionTimeoutWithTrackedSessionHandles returns the error for a
// context timeout during sessionPool.take(), including a stacktrace of each
// checked out session handle.
824func (p *sessionPool) errGetSessionTimeoutWithTrackedSessionHandles(code codes.Code) error {
825	err := spannerErrorf(code, "timeout / context canceled during getting session.")
826	err.(*Error).additionalInformation = p.getTrackedSessionHandleStacksLocked()
827	return err
828}
829
// getTrackedSessionHandleStacksLocked returns a string containing the
// stacktrace of all currently checked out sessions of the pool. This method
// acquires p.mu itself, so the caller must not hold it.
833func (p *sessionPool) getTrackedSessionHandleStacksLocked() string {
834	p.mu.Lock()
835	defer p.mu.Unlock()
836	stackTraces := ""
837	i := 1
838	element := p.trackedSessionHandles.Front()
839	for element != nil {
840		sh := element.Value.(*sessionHandle)
841		sh.mu.Lock()
842		if sh.stack != nil {
843			stackTraces = fmt.Sprintf("%s\n\nSession %d checked out of pool at %s by goroutine:\n%s", stackTraces, i, sh.checkoutTime.Format(time.RFC3339), sh.stack)
844		}
845		sh.mu.Unlock()
846		element = element.Next()
847		i++
848	}
849	return stackTraces
850}
851
852// shouldPrepareWriteLocked returns true if we should prepare more sessions for write.
853func (p *sessionPool) shouldPrepareWriteLocked() bool {
854	return !p.disableBackgroundPrepareSessions && float64(p.numOpened)*p.WriteSessions > float64(p.idleWriteList.Len()+int(p.prepareReqs))
855}
856
857func (p *sessionPool) createSession(ctx context.Context) (*session, error) {
858	trace.TracePrintf(ctx, nil, "Creating a new session")
859	doneCreate := func(done bool) {
860		p.mu.Lock()
861		if !done {
862			// Session creation failed, give budget back.
863			p.numOpened--
864			p.recordStat(ctx, OpenSessionCount, int64(p.numOpened))
865		}
866		p.createReqs--
867		// Notify other waiters blocking on session creation.
868		close(p.mayGetSession)
869		p.mayGetSession = make(chan struct{})
870		p.mu.Unlock()
871	}
872	s, err := p.sc.createSession(ctx)
873	if err != nil {
874		doneCreate(false)
875		// Should return error directly because of the previous retries on
876		// CreateSession RPC.
877		// If the error is a timeout, there is a chance that the session was
878		// created on the server but is not known to the session pool. This
879		// session will then be garbage collected by the server after 1 hour.
880		return nil, err
881	}
882	s.pool = p
883	p.hc.register(s)
884	doneCreate(true)
885	return s, nil
886}
887
888func (p *sessionPool) isHealthy(s *session) bool {
889	if s.getNextCheck().Add(2 * p.hc.getInterval()).Before(time.Now()) {
890		if err := s.ping(); isSessionNotFoundError(err) {
891			// The session is already bad, continue to fetch/create a new one.
892			s.destroy(false)
893			return false
894		}
895		p.hc.scheduledHC(s)
896	}
897	return true
898}
899
// take returns a cached session if one is available; otherwise it tries to
// allocate a new one. Sessions returned by take should be used for read
// operations.
903func (p *sessionPool) take(ctx context.Context) (*sessionHandle, error) {
904	trace.TracePrintf(ctx, nil, "Acquiring a read-only session")
905	for {
906		var s *session
907
908		p.mu.Lock()
909		if !p.valid {
910			p.mu.Unlock()
911			return nil, errInvalidSessionPool
912		}
913		if p.idleList.Len() > 0 {
914			// Idle sessions are available, get one from the top of the idle
915			// list.
916			s = p.idleList.Remove(p.idleList.Front()).(*session)
917			trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
918				"Acquired read-only session")
919			p.decNumReadsLocked(ctx)
920		} else if p.idleWriteList.Len() > 0 {
921			s = p.idleWriteList.Remove(p.idleWriteList.Front()).(*session)
922			trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
923				"Acquired read-write session")
924			p.decNumWritesLocked(ctx)
925		}
926		if s != nil {
927			s.setIdleList(nil)
928			numCheckedOut := p.currSessionsCheckedOutLocked()
929			p.mu.Unlock()
930			p.mw.updateMaxSessionsCheckedOutDuringWindow(numCheckedOut)
			// From here, the session is no longer in the idle list, so
			// healthcheck workers won't destroy it. If the healthcheck workers
			// failed to schedule a healthcheck for the session in time, do the
			// check here. Because checking a session is still much cheaper
			// than creating one, sessions should be reused as much as possible.
936			if !p.isHealthy(s) {
937				continue
938			}
939			p.incNumInUse(ctx)
940			return p.newSessionHandle(s), nil
941		}
942
943		// No session available. Start the creation of a new batch of sessions
944		// if that is allowed, and then wait for a session to come available.
945		if p.numReadWaiters+p.numWriteWaiters >= p.createReqs {
946			numSessions := minUint64(p.MaxOpened-p.numOpened, p.incStep)
947			if err := p.growPoolLocked(numSessions, false); err != nil {
948				p.mu.Unlock()
949				return nil, err
950			}
951		}
952
953		p.numReadWaiters++
954		mayGetSession := p.mayGetSession
955		p.mu.Unlock()
956		trace.TracePrintf(ctx, nil, "Waiting for read-only session to become available")
957		select {
958		case <-ctx.Done():
959			trace.TracePrintf(ctx, nil, "Context done waiting for session")
960			p.recordStat(ctx, GetSessionTimeoutsCount, 1)
961			p.mu.Lock()
962			p.numReadWaiters--
963			p.mu.Unlock()
964			return nil, p.errGetSessionTimeout(ctx)
965		case <-mayGetSession:
966			p.mu.Lock()
967			p.numReadWaiters--
968			if p.sessionCreationError != nil {
969				trace.TracePrintf(ctx, nil, "Error creating session: %v", p.sessionCreationError)
970				err := p.sessionCreationError
971				p.mu.Unlock()
972				return nil, err
973			}
974			p.mu.Unlock()
975		}
976	}
977}
978
// takeWriteSession returns a cached session that is prepared for write if one
// is available; otherwise it tries to allocate a new one. Sessions returned
// should be used for read-write transactions.
982func (p *sessionPool) takeWriteSession(ctx context.Context) (*sessionHandle, error) {
983	trace.TracePrintf(ctx, nil, "Acquiring a read-write session")
984	for {
985		var (
986			s   *session
987			err error
988		)
989
990		p.mu.Lock()
991		if !p.valid {
992			p.mu.Unlock()
993			return nil, errInvalidSessionPool
994		}
995		if p.idleWriteList.Len() > 0 {
996			// Idle sessions are available, get one from the top of the idle
997			// list.
998			s = p.idleWriteList.Remove(p.idleWriteList.Front()).(*session)
999			trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()}, "Acquired read-write session")
1000			p.decNumWritesLocked(ctx)
1001		} else if p.idleList.Len() > 0 {
1002			s = p.idleList.Remove(p.idleList.Front()).(*session)
1003			trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()}, "Acquired read-only session")
1004			p.decNumReadsLocked(ctx)
1005		}
1006		if s != nil {
1007			s.setIdleList(nil)
1008			numCheckedOut := p.currSessionsCheckedOutLocked()
1009			p.mu.Unlock()
1010			p.mw.updateMaxSessionsCheckedOutDuringWindow(numCheckedOut)
			// From here, the session is no longer in the idle list, so
			// healthcheck workers won't destroy it. If the healthcheck workers
			// failed to schedule a healthcheck for the session in time, do the
			// check here. Because checking a session is still much cheaper
			// than creating one, sessions should be reused as much as possible.
1016			if !p.isHealthy(s) {
1017				continue
1018			}
1019		} else {
1020			// No session available. Start the creation of a new batch of sessions
1021			// if that is allowed, and then wait for a session to come available.
1022			if p.numReadWaiters+p.numWriteWaiters >= p.createReqs {
1023				numSessions := minUint64(p.MaxOpened-p.numOpened, p.incStep)
1024				if err := p.growPoolLocked(numSessions, false); err != nil {
1025					p.mu.Unlock()
1026					return nil, err
1027				}
1028			}
1029
1030			p.numWriteWaiters++
1031			mayGetSession := p.mayGetSession
1032			p.mu.Unlock()
1033			trace.TracePrintf(ctx, nil, "Waiting for read-write session to become available")
1034			select {
1035			case <-ctx.Done():
1036				trace.TracePrintf(ctx, nil, "Context done waiting for session")
1037				p.recordStat(ctx, GetSessionTimeoutsCount, 1)
1038				p.mu.Lock()
1039				p.numWriteWaiters--
1040				p.mu.Unlock()
1041				return nil, p.errGetSessionTimeout(ctx)
1042			case <-mayGetSession:
1043				p.mu.Lock()
1044				p.numWriteWaiters--
1045				if p.sessionCreationError != nil {
1046					err := p.sessionCreationError
1047					p.mu.Unlock()
1048					return nil, err
1049				}
1050				p.mu.Unlock()
1051			}
1052			continue
1053		}
1054		if !s.isWritePrepared() {
1055			p.incNumBeingPrepared(ctx)
1056			defer p.decNumBeingPrepared(ctx)
1057			if err = s.prepareForWrite(ctx); err != nil {
1058				if isSessionNotFoundError(err) {
1059					s.destroy(false)
1060					trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
1061						"Session not found for write")
1062					return nil, ToSpannerError(err)
1063				}
1064
1065				s.recycle()
1066				trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
1067					"Error preparing session for write")
1068				return nil, ToSpannerError(err)
1069			}
1070		}
1071		p.incNumInUse(ctx)
1072		return p.newSessionHandle(s), nil
1073	}
1074}
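
// exampleTakeWriteSession is an illustrative sketch (not used by the library)
// of checking out a write-prepared session: the handle returned by
// takeWriteSession is expected to carry a transaction ID, readable through
// getTransactionID. The error returned on a missing ID is an assumption for
// illustration only.
func exampleTakeWriteSession(ctx context.Context, p *sessionPool) error {
	sh, err := p.takeWriteSession(ctx)
	if err != nil {
		return err
	}
	defer sh.recycle()
	if tx := sh.getTransactionID(); tx == nil {
		return spannerErrorf(codes.Internal, "session was not prepared for write")
	}
	return nil
}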
1075
// recycle puts session s back into the session pool's idle list. It returns
// true if the session pool successfully recycles session s.
1078func (p *sessionPool) recycle(s *session) bool {
1079	p.mu.Lock()
1080	defer p.mu.Unlock()
1081	if !s.isValid() || !p.valid {
1082		// Reject the session if session is invalid or pool itself is invalid.
1083		return false
1084	}
1085	ctx := context.Background()
1086	// Put session at the top of the list to be handed out in LIFO order for load balancing
1087	// across channels.
1088	if s.isWritePrepared() {
1089		s.setIdleList(p.idleWriteList.PushFront(s))
1090		p.incNumWritesLocked(ctx)
1091	} else {
1092		s.setIdleList(p.idleList.PushFront(s))
1093		p.incNumReadsLocked(ctx)
1094	}
1095	// Broadcast that a session has been returned to idle list.
1096	close(p.mayGetSession)
1097	p.mayGetSession = make(chan struct{})
1098	return true
1099}
1100
1101// remove atomically removes session s from the session pool and invalidates s.
1102// If isExpire == true, the removal is triggered by session expiration and in
1103// such cases, only idle sessions can be removed.
1104func (p *sessionPool) remove(s *session, isExpire bool) bool {
1105	p.mu.Lock()
1106	defer p.mu.Unlock()
1107	if isExpire && (p.numOpened <= p.MinOpened || s.getIdleList() == nil) {
1108		// Don't expire session if the session is not in idle list (in use), or
1109		// if number of open sessions is going below p.MinOpened.
1110		return false
1111	}
1112	ol := s.setIdleList(nil)
1113	ctx := context.Background()
	// If the session is in the idle list, remove it.
1115	if ol != nil {
1116		// Remove from whichever list it is in.
1117		p.idleList.Remove(ol)
1118		p.idleWriteList.Remove(ol)
1119		if s.isWritePrepared() {
1120			p.decNumWritesLocked(ctx)
1121		} else {
1122			p.decNumReadsLocked(ctx)
1123		}
1124	}
1125	if s.invalidate() {
1126		// Decrease the number of opened sessions.
1127		p.numOpened--
1128		p.recordStat(ctx, OpenSessionCount, int64(p.numOpened))
1129		// Broadcast that a session has been destroyed.
1130		close(p.mayGetSession)
1131		p.mayGetSession = make(chan struct{})
1132		return true
1133	}
1134	return false
1135}
1136
1137func (p *sessionPool) currSessionsCheckedOutLocked() uint64 {
1138	return p.numOpened - uint64(p.idleList.Len()) - uint64(p.idleWriteList.Len())
1139}
1140
1141func (p *sessionPool) incNumInUse(ctx context.Context) {
1142	p.mu.Lock()
1143	p.incNumInUseLocked(ctx)
1144	p.mu.Unlock()
1145}
1146
1147func (p *sessionPool) incNumInUseLocked(ctx context.Context) {
1148	p.numInUse++
1149	p.recordStat(ctx, SessionsCount, int64(p.numInUse), tagNumInUseSessions)
1150	p.recordStat(ctx, AcquiredSessionsCount, 1)
1151	if p.numInUse > p.maxNumInUse {
1152		p.maxNumInUse = p.numInUse
1153		p.recordStat(ctx, MaxInUseSessionsCount, int64(p.maxNumInUse))
1154	}
1155}
1156
1157func (p *sessionPool) decNumInUse(ctx context.Context) {
1158	p.mu.Lock()
1159	p.decNumInUseLocked(ctx)
1160	p.mu.Unlock()
1161}
1162
1163func (p *sessionPool) decNumInUseLocked(ctx context.Context) {
1164	p.numInUse--
1165	p.recordStat(ctx, SessionsCount, int64(p.numInUse), tagNumInUseSessions)
1166	p.recordStat(ctx, ReleasedSessionsCount, 1)
1167}
1168
1169func (p *sessionPool) incNumReadsLocked(ctx context.Context) {
1170	p.numReads++
1171	p.recordStat(ctx, SessionsCount, int64(p.numReads), tagNumReadSessions)
1172}
1173
1174func (p *sessionPool) decNumReadsLocked(ctx context.Context) {
1175	p.numReads--
1176	p.recordStat(ctx, SessionsCount, int64(p.numReads), tagNumReadSessions)
1177}
1178
1179func (p *sessionPool) incNumWritesLocked(ctx context.Context) {
1180	p.numWrites++
1181	p.recordStat(ctx, SessionsCount, int64(p.numWrites), tagNumWriteSessions)
1182}
1183
1184func (p *sessionPool) decNumWritesLocked(ctx context.Context) {
1185	p.numWrites--
1186	p.recordStat(ctx, SessionsCount, int64(p.numWrites), tagNumWriteSessions)
1187}
1188
1189func (p *sessionPool) incNumBeingPrepared(ctx context.Context) {
1190	p.mu.Lock()
1191	p.incNumBeingPreparedLocked(ctx)
1192	p.mu.Unlock()
1193}
1194
1195func (p *sessionPool) incNumBeingPreparedLocked(ctx context.Context) {
1196	p.prepareReqs++
1197	p.recordStat(ctx, SessionsCount, int64(p.prepareReqs), tagNumBeingPrepared)
1198}
1199
1200func (p *sessionPool) decNumBeingPrepared(ctx context.Context) {
1201	p.mu.Lock()
1202	p.decNumBeingPreparedLocked(ctx)
1203	p.mu.Unlock()
1204}
1205
1206func (p *sessionPool) decNumBeingPreparedLocked(ctx context.Context) {
1207	p.prepareReqs--
1208	p.recordStat(ctx, SessionsCount, int64(p.prepareReqs), tagNumBeingPrepared)
1209}
1210
1211// hcHeap implements heap.Interface. It is used to create the priority queue for
1212// session healthchecks.
1213type hcHeap struct {
1214	sessions []*session
1215}
1216
1217// Len implements heap.Interface.Len.
1218func (h hcHeap) Len() int {
1219	return len(h.sessions)
1220}
1221
1222// Less implements heap.Interface.Less.
1223func (h hcHeap) Less(i, j int) bool {
1224	return h.sessions[i].getNextCheck().Before(h.sessions[j].getNextCheck())
1225}
1226
1227// Swap implements heap.Interface.Swap.
1228func (h hcHeap) Swap(i, j int) {
1229	h.sessions[i], h.sessions[j] = h.sessions[j], h.sessions[i]
1230	h.sessions[i].setHcIndex(i)
1231	h.sessions[j].setHcIndex(j)
1232}
1233
1234// Push implements heap.Interface.Push.
1235func (h *hcHeap) Push(s interface{}) {
1236	ns := s.(*session)
1237	ns.setHcIndex(len(h.sessions))
1238	h.sessions = append(h.sessions, ns)
1239}
1240
1241// Pop implements heap.Interface.Pop.
1242func (h *hcHeap) Pop() interface{} {
1243	old := h.sessions
1244	n := len(old)
1245	s := old[n-1]
1246	h.sessions = old[:n-1]
1247	s.setHcIndex(-1)
1248	return s
1249}
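
// exampleHealthCheckQueueOrder is an illustrative sketch (not used by the
// library) of how the healthcheck priority queue behaves: container/heap keeps
// the session with the earliest nextCheck at the root, so Pop always yields
// the most overdue session. The sessions built here are bare placeholders.
func exampleHealthCheckQueueOrder() *session {
	q := &hcHeap{}
	now := time.Now()
	for i, d := range []time.Duration{30 * time.Minute, time.Minute, 10 * time.Minute} {
		s := &session{id: fmt.Sprintf("session-%d", i), valid: true}
		s.setNextCheck(now.Add(d))
		heap.Push(q, s)
	}
	// Returns the session whose nextCheck is one minute from now.
	return heap.Pop(q).(*session)
}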
1250
1251// maintenanceWindowSize specifies the number of health check cycles that
1252// defines a maintenance window. The maintenance window keeps track of a
1253// rolling set of numbers for the number of maximum checked out sessions during
1254// the maintenance window. This is used by the maintainer to determine the
1255// number of sessions to create or delete at the end of each health check
1256// cycle.
1257const maintenanceWindowSize = 10
1258
1259// maintenanceWindow contains the statistics that are gathered during a health
1260// check maintenance window.
1261type maintenanceWindow struct {
1262	mu sync.Mutex
1263	// maxSessionsCheckedOut contains the maximum number of sessions that was
1264	// checked out of the session pool during a health check cycle. This number
1265	// indicates the number of sessions that was actually needed by the pool to
1266	// serve the load during that cycle. The values are kept as a rolling set
1267	// containing the values for the past 10 cycles (minutes). The maintainer
1268	// uses these values to determine the number of sessions to keep at the end
1269	// of each cycle.
1270	maxSessionsCheckedOut [maintenanceWindowSize]uint64
1271}
1272
// maxSessionsCheckedOutDuringWindow returns the maximum number of sessions
// that have been checked out during the last maintenance window of 10 cycles
// (minutes).
1276func (mw *maintenanceWindow) maxSessionsCheckedOutDuringWindow() uint64 {
1277	mw.mu.Lock()
1278	defer mw.mu.Unlock()
1279	var max uint64
1280	for _, cycleMax := range mw.maxSessionsCheckedOut {
1281		max = maxUint64(max, cycleMax)
1282	}
1283	return max
1284}
1285
// updateMaxSessionsCheckedOutDuringWindow updates the maximum number of
// sessions that have been checked out of the pool during the current
// cycle of the maintenance window. A maintenance window consists of 10
1289// maintenance cycles. Each cycle keeps track of the max number of sessions in
1290// use during that cycle. The rolling maintenance window of 10 cycles is used
1291// to determine the number of sessions to keep at the end of a cycle by
1292// calculating the max in use during the last 10 cycles.
1293func (mw *maintenanceWindow) updateMaxSessionsCheckedOutDuringWindow(currNumSessionsCheckedOut uint64) {
1294	mw.mu.Lock()
1295	defer mw.mu.Unlock()
1296	mw.maxSessionsCheckedOut[0] = maxUint64(currNumSessionsCheckedOut, mw.maxSessionsCheckedOut[0])
1297}
1298
1299// startNewCycle starts a new health check cycle with the specified number of
1300// checked out sessions as its initial value.
1301func (mw *maintenanceWindow) startNewCycle(currNumSessionsCheckedOut uint64) {
1302	mw.mu.Lock()
1303	defer mw.mu.Unlock()
1304	copy(mw.maxSessionsCheckedOut[1:], mw.maxSessionsCheckedOut[:9])
1305	mw.maxSessionsCheckedOut[0] = currNumSessionsCheckedOut
1306}
1307
1308// newMaintenanceWindow creates a new maintenance window with all values for
1309// maxSessionsCheckedOut set to maxOpened. This ensures that a complete
1310// maintenance window must pass before the maintainer will start to delete any
1311// sessions.
1312func newMaintenanceWindow(maxOpened uint64) *maintenanceWindow {
1313	mw := &maintenanceWindow{}
1314	// Initialize the rolling window with max values to prevent the maintainer
1315	// from deleting sessions before a complete window of 10 cycles has
1316	// finished.
1317	for i := 0; i < maintenanceWindowSize; i++ {
1318		mw.maxSessionsCheckedOut[i] = maxOpened
1319	}
1320	return mw
1321}
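
// exampleMaintenanceWindowRollingMax is an illustrative sketch (not used by
// the library) of how the maintainer reads the rolling window: every cycle
// records the peak number of checked-out sessions, and the pool is only shrunk
// toward the maximum observed over the last maintenanceWindowSize cycles. The
// per-cycle peaks below are made-up numbers.
func exampleMaintenanceWindowRollingMax() uint64 {
	mw := newMaintenanceWindow(400) // every cycle starts out at MaxOpened
	for _, peak := range []uint64{120, 80, 95, 60} {
		mw.startNewCycle(0)
		mw.updateMaxSessionsCheckedOutDuringWindow(peak)
	}
	// Still 400: only 4 of the 10 initial MaxOpened values have been shifted
	// out, so the window maximum remains the initial value.
	return mw.maxSessionsCheckedOutDuringWindow()
}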
1322
1323// healthChecker performs periodical healthchecks on registered sessions.
1324type healthChecker struct {
1325	// mu protects concurrent access to healthChecker.
1326	mu sync.Mutex
1327	// queue is the priority queue for session healthchecks. Sessions with lower
1328	// nextCheck rank higher in the queue.
1329	queue hcHeap
1330	// interval is the average interval between two healthchecks on a session.
1331	interval time.Duration
1332	// workers is the number of concurrent healthcheck workers.
1333	workers int
1334	// waitWorkers waits for all healthcheck workers to exit
1335	waitWorkers sync.WaitGroup
1336	// pool is the underlying session pool.
1337	pool *sessionPool
1338	// sampleInterval is the interval of sampling by the maintainer.
1339	sampleInterval time.Duration
1340	// ready is used to signal that maintainer can start running.
1341	ready chan struct{}
1342	// done is used to signal that health checker should be closed.
1343	done chan struct{}
1344	// once is used for closing channel done only once.
1345	once             sync.Once
1346	maintainerCancel func()
1347}
1348
1349// newHealthChecker initializes new instance of healthChecker.
1350func newHealthChecker(interval time.Duration, workers int, sampleInterval time.Duration, pool *sessionPool) *healthChecker {
1351	if workers <= 0 {
1352		workers = 1
1353	}
1354	hc := &healthChecker{
1355		interval:         interval,
1356		workers:          workers,
1357		pool:             pool,
1358		sampleInterval:   sampleInterval,
1359		ready:            make(chan struct{}),
1360		done:             make(chan struct{}),
1361		maintainerCancel: func() {},
1362	}
1363	hc.waitWorkers.Add(1)
1364	go hc.maintainer()
1365	for i := 1; i <= hc.workers; i++ {
1366		hc.waitWorkers.Add(1)
1367		go hc.worker(i)
1368	}
1369	return hc
1370}
1371
1372// close closes the healthChecker and waits for all healthcheck workers to exit.
1373func (hc *healthChecker) close() {
1374	hc.mu.Lock()
1375	hc.maintainerCancel()
1376	hc.mu.Unlock()
1377	hc.once.Do(func() { close(hc.done) })
1378	hc.waitWorkers.Wait()
1379}
1380
1381// isClosing checks if a healthChecker is already closing.
1382func (hc *healthChecker) isClosing() bool {
1383	select {
1384	case <-hc.done:
1385		return true
1386	default:
1387		return false
1388	}
1389}
1390
1391// getInterval gets the healthcheck interval.
1392func (hc *healthChecker) getInterval() time.Duration {
1393	hc.mu.Lock()
1394	defer hc.mu.Unlock()
1395	return hc.interval
1396}
1397
1398// scheduledHCLocked schedules next healthcheck on session s with the assumption
1399// that hc.mu is being held.
1400func (hc *healthChecker) scheduledHCLocked(s *session) {
1401	var constPart, randPart float64
1402	if !s.firstHCDone {
1403		// The first check will be scheduled in a large range to make requests
1404		// more evenly distributed. The first healthcheck will be scheduled
1405		// after [interval*0.2, interval*1.1) ns.
1406		constPart = float64(hc.interval) * 0.2
1407		randPart = hc.pool.rand.Float64() * float64(hc.interval) * 0.9
1408		s.firstHCDone = true
1409	} else {
1410		// The next healthcheck will be scheduled after
1411		// [interval*0.9, interval*1.1) ns.
1412		constPart = float64(hc.interval) * 0.9
1413		randPart = hc.pool.rand.Float64() * float64(hc.interval) * 0.2
1414	}
1415	// math.Ceil makes the value to be at least 1 ns.
1416	nsFromNow := int64(math.Ceil(constPart + randPart))
1417	s.setNextCheck(time.Now().Add(time.Duration(nsFromNow)))
1418	if hi := s.getHcIndex(); hi != -1 {
1419		// Session is still being tracked by healthcheck workers.
1420		heap.Fix(&hc.queue, hi)
1421	}
1422}
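
// exampleNextCheckWindow is an illustrative sketch (not used by the library)
// of the scheduling math above for the repeat-check case: with a 50 minute
// interval, the next check lands in [45m, 55m) from now, i.e. interval*0.9
// plus a random share of interval*0.2.
func exampleNextCheckWindow(interval time.Duration, r *rand.Rand) time.Time {
	constPart := float64(interval) * 0.9
	randPart := r.Float64() * float64(interval) * 0.2
	return time.Now().Add(time.Duration(int64(math.Ceil(constPart + randPart))))
}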
1423
1424// scheduledHC schedules next healthcheck on session s. It is safe to be called
1425// concurrently.
1426func (hc *healthChecker) scheduledHC(s *session) {
1427	hc.mu.Lock()
1428	defer hc.mu.Unlock()
1429	hc.scheduledHCLocked(s)
1430}
1431
1432// register registers a session with healthChecker for periodical healthcheck.
1433func (hc *healthChecker) register(s *session) {
1434	hc.mu.Lock()
1435	defer hc.mu.Unlock()
1436	hc.scheduledHCLocked(s)
1437	heap.Push(&hc.queue, s)
1438}
1439
1440// unregister unregisters a session from healthcheck queue.
1441func (hc *healthChecker) unregister(s *session) {
1442	hc.mu.Lock()
1443	defer hc.mu.Unlock()
1444	oi := s.setHcIndex(-1)
1445	if oi >= 0 {
1446		heap.Remove(&hc.queue, oi)
1447	}
1448}
1449
// markDone marks that the health check for the session has been performed.
1451func (hc *healthChecker) markDone(s *session) {
1452	hc.mu.Lock()
1453	defer hc.mu.Unlock()
1454	s.checkingHealth = false
1455}
1456
1457// healthCheck checks the health of the session and pings it if needed.
1458func (hc *healthChecker) healthCheck(s *session) {
1459	defer hc.markDone(s)
1460	if !s.pool.isValid() {
1461		// Session pool is closed, perform a garbage collection.
1462		s.destroy(false)
1463		return
1464	}
1465	if err := s.ping(); isSessionNotFoundError(err) {
1466		// Ping failed, destroy the session.
1467		s.destroy(false)
1468	}
1469}
1470
1471// worker performs the healthcheck on sessions in healthChecker's priority
1472// queue.
1473func (hc *healthChecker) worker(i int) {
1474	// Returns a session which we should ping to keep it alive.
1475	getNextForPing := func() *session {
1476		hc.pool.mu.Lock()
1477		defer hc.pool.mu.Unlock()
1478		hc.mu.Lock()
1479		defer hc.mu.Unlock()
1480		if hc.queue.Len() <= 0 {
1481			// Queue is empty.
1482			return nil
1483		}
1484		s := hc.queue.sessions[0]
1485		if s.getNextCheck().After(time.Now()) && hc.pool.valid {
1486			// All sessions have been checked recently.
1487			return nil
1488		}
1489		hc.scheduledHCLocked(s)
1490		if !s.checkingHealth {
1491			s.checkingHealth = true
1492			return s
1493		}
1494		return nil
1495	}
1496
1497	// Returns a session which we should prepare for write.
1498	getNextForTx := func() *session {
1499		hc.pool.mu.Lock()
1500		defer hc.pool.mu.Unlock()
1501		if hc.pool.shouldPrepareWriteLocked() {
1502			if hc.pool.idleList.Len() > 0 && hc.pool.valid {
1503				hc.mu.Lock()
1504				defer hc.mu.Unlock()
1505				if hc.pool.idleList.Front().Value.(*session).checkingHealth {
1506					return nil
1507				}
1508				session := hc.pool.idleList.Remove(hc.pool.idleList.Front()).(*session)
1509				ctx := context.Background()
1510				hc.pool.decNumReadsLocked(ctx)
1511				session.checkingHealth = true
1512				hc.pool.incNumBeingPreparedLocked(ctx)
1513				return session
1514			}
1515		}
1516		return nil
1517	}
1518
1519	for {
1520		if hc.isClosing() {
1521			// Exit when the pool has been closed and all sessions have been
1522			// destroyed or when health checker has been closed.
1523			hc.waitWorkers.Done()
1524			return
1525		}
1526		ws := getNextForTx()
1527		if ws != nil {
1528			ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
1529			err := ws.prepareForWrite(ctx)
1530			if err != nil {
1531				// Skip handling prepare error, session can be prepared in next
1532				// cycle.
1533				// Don't log about permission errors, which may be expected
1534				// (e.g. using read-only auth).
1535				serr := ToSpannerError(err).(*Error)
1536				if serr.Code != codes.PermissionDenied {
1537					logf(hc.pool.sc.logger, "Failed to prepare session, error: %v", serr)
1538				}
1539			}
1540			hc.pool.recycle(ws)
1541			hc.pool.mu.Lock()
1542			hc.pool.decNumBeingPreparedLocked(ctx)
1543			hc.pool.mu.Unlock()
1544			cancel()
1545			hc.markDone(ws)
1546		}
1547		rs := getNextForPing()
1548		if rs == nil {
1549			if ws == nil {
1550				// No work to be done so sleep to avoid burning CPU.
1551				pause := int64(100 * time.Millisecond)
1552				if pause > int64(hc.interval) {
1553					pause = int64(hc.interval)
1554				}
1555				select {
1556				case <-time.After(time.Duration(rand.Int63n(pause) + pause/2)):
1557				case <-hc.done:
1558				}
1559
1560			}
1561			continue
1562		}
1563		hc.healthCheck(rs)
1564	}
1565}
1566
1567// maintainer maintains the number of sessions in the pool based on the session
1568// pool configuration and the current and historical number of sessions checked
1569// out of the pool. The maintainer will:
1570// 1. Ensure that the session pool contains at least MinOpened sessions.
1571// 2. If the current number of sessions in the pool exceeds the greatest number
1572//    of checked out sessions (=sessions in use) during the last 10 minutes,
//    and the delta is larger than MaxIdle, the maintainer will reduce
//    the number of sessions to maxSessionsInUseDuringWindow+MaxIdle.
1575func (hc *healthChecker) maintainer() {
1576	// Wait until the pool is ready.
1577	<-hc.ready
1578
1579	for iteration := uint64(0); ; iteration++ {
1580		if hc.isClosing() {
1581			hc.waitWorkers.Done()
1582			return
1583		}
1584
1585		hc.pool.mu.Lock()
1586		currSessionsOpened := hc.pool.numOpened
1587		maxIdle := hc.pool.MaxIdle
1588		minOpened := hc.pool.MinOpened
1589
1590		// Reset the start time for recording the maximum number of sessions
1591		// in the pool.
1592		now := time.Now()
1593		if now.After(hc.pool.lastResetTime.Add(10 * time.Minute)) {
1594			hc.pool.maxNumInUse = hc.pool.numInUse
1595			hc.pool.recordStat(context.Background(), MaxInUseSessionsCount, int64(hc.pool.maxNumInUse))
1596			hc.pool.lastResetTime = now
1597		}
1598		hc.pool.mu.Unlock()
1599		// Get the maximum number of sessions in use during the current
1600		// maintenance window.
1601		maxSessionsInUseDuringWindow := hc.pool.mw.maxSessionsCheckedOutDuringWindow()
1602		hc.mu.Lock()
1603		ctx, cancel := context.WithTimeout(context.Background(), hc.sampleInterval)
1604		hc.maintainerCancel = cancel
1605		hc.mu.Unlock()
1606
1607		// Grow or shrink pool if needed.
1608		// The number of sessions in the pool should be in the range
1609		// [Config.MinOpened, Config.MaxIdle+maxSessionsInUseDuringWindow]
1610		if currSessionsOpened < minOpened {
1611			if err := hc.growPoolInBatch(ctx, minOpened); err != nil {
1612				logf(hc.pool.sc.logger, "failed to grow pool: %v", err)
1613			}
1614		} else if maxIdle+maxSessionsInUseDuringWindow < currSessionsOpened {
1615			hc.shrinkPool(ctx, maxIdle+maxSessionsInUseDuringWindow)
1616		}
1617
1618		select {
1619		case <-ctx.Done():
1620		case <-hc.done:
1621			cancel()
1622		}
1623		// Cycle the maintenance window. This will remove the oldest cycle and
1624		// add a new cycle at the beginning of the maintenance window with the
1625		// currently checked out number of sessions as the max number of
1626		// sessions in use in this cycle. This value will be increased during
1627		// the next cycle if it increases.
1628		hc.pool.mu.Lock()
1629		currSessionsInUse := hc.pool.currSessionsCheckedOutLocked()
1630		hc.pool.mu.Unlock()
1631		hc.pool.mw.startNewCycle(currSessionsInUse)
1632	}
1633}
1634
1635func (hc *healthChecker) growPoolInBatch(ctx context.Context, growToNumSessions uint64) error {
1636	hc.pool.mu.Lock()
1637	defer hc.pool.mu.Unlock()
1638	numSessions := growToNumSessions - hc.pool.numOpened
1639	return hc.pool.growPoolLocked(numSessions, false)
1640}
1641
1642// shrinkPool scales down the session pool. The method will stop deleting
1643// sessions when shrinkToNumSessions number of sessions in the pool has
1644// been reached. The method will also stop deleting sessions if it detects that
1645// another process has started creating sessions for the pool again, for
1646// example through the take() method.
1647func (hc *healthChecker) shrinkPool(ctx context.Context, shrinkToNumSessions uint64) {
1648	hc.pool.mu.Lock()
1649	maxSessionsToDelete := int(hc.pool.numOpened - shrinkToNumSessions)
1650	hc.pool.mu.Unlock()
1651	var deleted int
1652	var prevNumOpened uint64 = math.MaxUint64
1653	for {
1654		if ctx.Err() != nil {
1655			return
1656		}
1657
1658		p := hc.pool
1659		p.mu.Lock()
1660		// Check if the number of open sessions has increased. If it has, we
1661		// should stop deleting sessions, as the load has increased and
1662		// additional sessions are needed.
1663		if p.numOpened >= prevNumOpened {
1664			p.mu.Unlock()
1665			break
1666		}
1667		prevNumOpened = p.numOpened
1668
		// Check both whether we have reached the target number of open
		// sessions and whether we have deleted the planned number of sessions,
		// in case sessions have been deleted by other methods because they
		// expired or were deemed invalid.
1673		if shrinkToNumSessions >= p.numOpened || deleted >= maxSessionsToDelete {
1674			p.mu.Unlock()
1675			break
1676		}
1677
1678		var s *session
1679		if p.idleList.Len() > 0 {
1680			s = p.idleList.Front().Value.(*session)
1681		} else if p.idleWriteList.Len() > 0 {
1682			s = p.idleWriteList.Front().Value.(*session)
1683		}
1684		p.mu.Unlock()
1685		if s != nil {
1686			deleted++
			// Destroy the session as expired.
1688			s.destroy(true)
1689		} else {
1690			break
1691		}
1692	}
1693}
1694
1695// maxUint64 returns the maximum of two uint64.
1696func maxUint64(a, b uint64) uint64 {
1697	if a > b {
1698		return a
1699	}
1700	return b
1701}
1702
1703// minUint64 returns the minimum of two uint64.
1704func minUint64(a, b uint64) uint64 {
1705	if a > b {
1706		return b
1707	}
1708	return a
1709}
1710
1711// sessionResourceType is the type name of Spanner sessions.
1712const sessionResourceType = "type.googleapis.com/google.spanner.v1.Session"
1713
1714// isSessionNotFoundError returns true if the given error is a
1715// `Session not found` error.
1716func isSessionNotFoundError(err error) bool {
1717	if err == nil {
1718		return false
1719	}
1720	if ErrCode(err) == codes.NotFound {
1721		if rt, ok := extractResourceType(err); ok {
1722			return rt == sessionResourceType
1723		}
1724	}
1725	return false
1726}
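
// exampleHandleSessionNotFound is an illustrative sketch (not used by the
// library) of the pattern applied throughout this file: when an RPC reports
// that the backing session no longer exists, the session is removed from the
// pool via destroy instead of being recycled, and the caller is expected to
// retry with a fresh session.
func exampleHandleSessionNotFound(sh *sessionHandle, err error) {
	if isSessionNotFoundError(err) {
		// The server has already dropped the session; destroying the handle
		// keeps the pool from handing it out again.
		sh.destroy()
		return
	}
	sh.recycle()
}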
1727