1/*
2Copyright 2017 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package spanner
18
19import (
20	"bytes"
21	"container/heap"
22	"context"
23	"fmt"
24	"io/ioutil"
25	"log"
26	"math/rand"
27	"os"
28	"strings"
29	"testing"
30	"time"
31
32	. "cloud.google.com/go/spanner/internal/testutil"
33	"google.golang.org/api/iterator"
34	"google.golang.org/genproto/googleapis/rpc/errdetails"
35	sppb "google.golang.org/genproto/googleapis/spanner/v1"
36	"google.golang.org/grpc/codes"
37	"google.golang.org/grpc/status"
38)
39
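// newSessionNotFoundError constructs a NotFound error with a ResourceInfo
// detail for the given session name, mimicking the error that the Cloud
// Spanner backend returns for a session that no longer exists.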
40func newSessionNotFoundError(name string) error {
41	s := status.Newf(codes.NotFound, "Session not found: Session with id %s not found", name)
42	s, _ = s.WithDetails(&errdetails.ResourceInfo{ResourceType: sessionResourceType, ResourceName: name})
43	return s.Err()
44}
45
46// TestSessionPoolConfigValidation tests session pool config validation.
47func TestSessionPoolConfigValidation(t *testing.T) {
48	t.Parallel()
49	_, client, teardown := setupMockedTestServer(t)
50	defer teardown()
51
52	for _, test := range []struct {
53		spc SessionPoolConfig
54		err error
55	}{
56		{
57			SessionPoolConfig{
58				MinOpened: 10,
59				MaxOpened: 5,
60			},
61			errMinOpenedGTMaxOpened(5, 10),
62		},
63		{
64			SessionPoolConfig{
65				WriteSessions: -0.1,
66			},
67			errWriteFractionOutOfRange(-0.1),
68		},
69		{
70			SessionPoolConfig{
71				WriteSessions: 2.0,
72			},
73			errWriteFractionOutOfRange(2.0),
74		},
75		{
76			SessionPoolConfig{
77				HealthCheckWorkers: -1,
78			},
79			errHealthCheckWorkersNegative(-1),
80		},
81		{
82			SessionPoolConfig{
83				HealthCheckInterval: -time.Second,
84			},
85			errHealthCheckIntervalNegative(-time.Second),
86		},
87	} {
88		if _, err := newSessionPool(client.sc, test.spc); !testEqual(err, test.err) {
89			t.Fatalf("want %v, got %v", test.err, err)
90		}
91	}
92}
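
// For reference, a configuration that passes the validation above could look
// as follows (the values are illustrative only):
//
//	cfg := SessionPoolConfig{
//		MinOpened:           10,
//		MaxOpened:           100,
//		WriteSessions:       0.2,
//		HealthCheckWorkers:  10,
//		HealthCheckInterval: 30 * time.Minute,
//	}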
93
94// TestSessionCreation tests session creation during sessionPool.Take().
95func TestSessionCreation(t *testing.T) {
96	t.Parallel()
97	ctx := context.Background()
98	server, client, teardown := setupMockedTestServer(t)
99	defer teardown()
100	sp := client.idleSessions
101
	// Take three sessions from the session pool. This should trigger the
	// session pool to create three new sessions.
104	shs := make([]*sessionHandle, 3)
105	// gotDs holds the unique sessions taken from session pool.
106	gotDs := map[string]bool{}
107	for i := 0; i < len(shs); i++ {
108		var err error
109		shs[i], err = sp.take(ctx)
110		if err != nil {
111			t.Fatalf("failed to get session(%v): %v", i, err)
112		}
113		gotDs[shs[i].getID()] = true
114	}
115	if len(gotDs) != len(shs) {
116		t.Fatalf("session pool created %v sessions, want %v", len(gotDs), len(shs))
117	}
118	if wantDs := server.TestSpanner.DumpSessions(); !testEqual(gotDs, wantDs) {
119		t.Fatalf("session pool creates sessions %v, want %v", gotDs, wantDs)
120	}
121	// Verify that created sessions are recorded correctly in session pool.
122	sp.mu.Lock()
123	if int(sp.numOpened) != len(shs) {
124		t.Fatalf("session pool reports %v open sessions, want %v", sp.numOpened, len(shs))
125	}
126	if sp.createReqs != 0 {
127		t.Fatalf("session pool reports %v session create requests, want 0", int(sp.createReqs))
128	}
129	sp.mu.Unlock()
130	// Verify that created sessions are tracked correctly by healthcheck queue.
131	hc := sp.hc
132	hc.mu.Lock()
133	if hc.queue.Len() != len(shs) {
134		t.Fatalf("healthcheck queue length = %v, want %v", hc.queue.Len(), len(shs))
135	}
136	for _, s := range hc.queue.sessions {
137		if !gotDs[s.getID()] {
138			t.Fatalf("session %v is in healthcheck queue, but it is not created by session pool", s.getID())
139		}
140	}
141	hc.mu.Unlock()
142}
143
// TestLIFOSessionOrder tests whether the session pool hands out sessions in LIFO order.
145func TestLIFOSessionOrder(t *testing.T) {
146	t.Parallel()
147	ctx := context.Background()
148	_, client, teardown := setupMockedTestServerWithConfig(t,
149		ClientConfig{
150			SessionPoolConfig: SessionPoolConfig{
151				MaxOpened: 3,
152				MinOpened: 3,
153			},
154		})
155	defer teardown()
156	sp := client.idleSessions
157	// Create/take three sessions and recycle them.
158	shs, shsIDs := make([]*sessionHandle, 3), make([]string, 3)
159	for i := 0; i < len(shs); i++ {
160		var err error
161		if shs[i], err = sp.take(ctx); err != nil {
162			t.Fatalf("failed to take session(%v): %v", i, err)
163		}
164		shsIDs[i] = shs[i].getID()
165	}
166	for i := 0; i < len(shs); i++ {
167		shs[i].recycle()
168	}
169	for i := 2; i >= 0; i-- {
170		sh, err := sp.take(ctx)
171		if err != nil {
172			t.Fatalf("cannot take session from session pool: %v", err)
173		}
		// Check that sessions are returned in LIFO order.
175		if wantID, gotID := shsIDs[i], sh.getID(); wantID != gotID {
176			t.Fatalf("got session with id: %v, want: %v", gotID, wantID)
177		}
178	}
179}
180
// TestLIFOTakeWriteSessionOrder tests whether the write session pool hands out sessions in LIFO order.
182func TestLIFOTakeWriteSessionOrder(t *testing.T) {
183	t.Skip("https://github.com/googleapis/google-cloud-go/issues/1704")
184	t.Parallel()
185	ctx := context.Background()
186	_, client, teardown := setupMockedTestServerWithConfig(t,
187		ClientConfig{
188			SessionPoolConfig: SessionPoolConfig{
189				MaxOpened:     3,
190				MinOpened:     3,
191				WriteSessions: 1,
192			},
193		})
194	defer teardown()
195	sp := client.idleSessions
196	// Create/take three sessions and recycle them.
197	shs, shsIDs := make([]*sessionHandle, 3), make([]string, 3)
198	for i := 0; i < len(shs); i++ {
199		var err error
200		if shs[i], err = sp.takeWriteSession(ctx); err != nil {
201			t.Fatalf("failed to take session(%v): %v", i, err)
202		}
203		shsIDs[i] = shs[i].getID()
204	}
205	for i := 0; i < len(shs); i++ {
206		shs[i].recycle()
207	}
208	for i := 2; i >= 0; i-- {
209		ws, err := sp.takeWriteSession(ctx)
210		if err != nil {
211			t.Fatalf("cannot take session from session pool: %v", err)
212		}
		// Check that write sessions are returned in LIFO order.
214		if wantID, gotID := shsIDs[i], ws.getID(); wantID != gotID {
215			t.Fatalf("got session with id: %v, want: %v", gotID, wantID)
216		}
217	}
218}
219
220// TestTakeFromIdleList tests taking sessions from session pool's idle list.
221func TestTakeFromIdleList(t *testing.T) {
222	t.Parallel()
223	ctx := context.Background()
224
225	// Make sure maintainer keeps the idle sessions.
226	server, client, teardown := setupMockedTestServerWithConfig(t,
227		ClientConfig{
228			SessionPoolConfig: SessionPoolConfig{MaxIdle: 10},
229		})
230	defer teardown()
231	sp := client.idleSessions
232
233	// Take ten sessions from session pool and recycle them.
234	shs := make([]*sessionHandle, 10)
235	for i := 0; i < len(shs); i++ {
236		var err error
237		shs[i], err = sp.take(ctx)
238		if err != nil {
239			t.Fatalf("failed to get session(%v): %v", i, err)
240		}
241	}
	// Make sure the sessions are sampled once before recycling; otherwise
	// they will be cleaned up.
244	<-time.After(sp.SessionPoolConfig.healthCheckSampleInterval)
245	for i := 0; i < len(shs); i++ {
246		shs[i].recycle()
247	}
248	// Further session requests from session pool won't cause mockclient to
249	// create more sessions.
250	wantSessions := server.TestSpanner.DumpSessions()
251	// Take ten sessions from session pool again, this time all sessions should
252	// come from idle list.
253	gotSessions := map[string]bool{}
254	for i := 0; i < len(shs); i++ {
255		sh, err := sp.take(ctx)
256		if err != nil {
257			t.Fatalf("cannot take session from session pool: %v", err)
258		}
259		gotSessions[sh.getID()] = true
260	}
261	if len(gotSessions) != 10 {
262		t.Fatalf("got %v unique sessions, want 10", len(gotSessions))
263	}
264	if !testEqual(gotSessions, wantSessions) {
265		t.Fatalf("got sessions: %v, want %v", gotSessions, wantSessions)
266	}
267}
268
// TestTakeWriteSessionFromIdleList tests taking write sessions from the
// session pool's idle list.
271func TestTakeWriteSessionFromIdleList(t *testing.T) {
272	t.Parallel()
273	ctx := context.Background()
274
275	// Make sure maintainer keeps the idle sessions.
276	server, client, teardown := setupMockedTestServerWithConfig(t,
277		ClientConfig{
278			SessionPoolConfig: SessionPoolConfig{MaxIdle: 20},
279		})
280	defer teardown()
281	sp := client.idleSessions
282
283	// Take ten sessions from session pool and recycle them.
284	shs := make([]*sessionHandle, 10)
285	for i := 0; i < len(shs); i++ {
286		var err error
287		shs[i], err = sp.takeWriteSession(ctx)
288		if err != nil {
289			t.Fatalf("failed to get session(%v): %v", i, err)
290		}
291	}
	// Make sure the sessions are sampled once before recycling; otherwise
	// they will be cleaned up.
294	<-time.After(sp.SessionPoolConfig.healthCheckSampleInterval)
295	for i := 0; i < len(shs); i++ {
296		shs[i].recycle()
297	}
298	// Further session requests from session pool won't cause mockclient to
299	// create more sessions.
300	wantSessions := server.TestSpanner.DumpSessions()
301	// Take ten sessions from session pool again, this time all sessions should
302	// come from idle list.
303	gotSessions := map[string]bool{}
304	for i := 0; i < len(shs); i++ {
305		sh, err := sp.takeWriteSession(ctx)
306		if err != nil {
307			t.Fatalf("cannot take session from session pool: %v", err)
308		}
309		gotSessions[sh.getID()] = true
310	}
311	if len(gotSessions) != 10 {
312		t.Fatalf("got %v unique sessions, want 10", len(gotSessions))
313	}
314	if !testEqual(gotSessions, wantSessions) {
315		t.Fatalf("got sessions: %v, want %v", gotSessions, wantSessions)
316	}
317}
318
// TestTakeFromIdleListChecked tests taking sessions from the session pool's
// idle list, but with an extra ping check.
321func TestTakeFromIdleListChecked(t *testing.T) {
322	t.Parallel()
323	ctx := context.Background()
324
325	// Make sure maintainer keeps the idle sessions.
326	server, client, teardown := setupMockedTestServerWithConfig(t,
327		ClientConfig{
328			SessionPoolConfig: SessionPoolConfig{
329				MaxIdle:                   1,
330				HealthCheckInterval:       50 * time.Millisecond,
331				healthCheckSampleInterval: 10 * time.Millisecond,
332			},
333		})
334	defer teardown()
335	sp := client.idleSessions
336
337	// Stop healthcheck workers to simulate slow pings.
338	sp.hc.close()
339
340	// Create a session and recycle it.
341	sh, err := sp.take(ctx)
342	if err != nil {
343		t.Fatalf("failed to get session: %v", err)
344	}
345
346	// Force ping during the first take() by setting check time to the past.
347	sh.session.nextCheck = time.Now().Add(-time.Minute)
348	wantSid := sh.getID()
349	sh.recycle()
350
	// Two back-to-back session requests; both should return the session
	// created above, but only the first one should trigger a session ping.
353	for i := 0; i < 2; i++ {
354		// Take the session from the idle list and recycle it.
355		sh, err = sp.take(ctx)
356		if err != nil {
357			t.Fatalf("%v - failed to get session: %v", i, err)
358		}
359		if gotSid := sh.getID(); gotSid != wantSid {
360			t.Fatalf("%v - got session id: %v, want %v", i, gotSid, wantSid)
361		}
362
		// Apart from the ping forced by the nextCheck value above, the
		// requests shouldn't trigger any additional pings, because take()
		// reschedules the next healthcheck.
365		if got, want := server.TestSpanner.DumpPings(), ([]string{wantSid}); !testEqual(got, want) {
366			t.Fatalf("%v - got ping session requests: %v, want %v", i, got, want)
367		}
368		sh.recycle()
369	}
370
	// Inject a session error into the server stub and take the session from
	// the session pool. The old session should be destroyed and the session
	// pool should create a new session.
374	server.TestSpanner.PutExecutionTime(MethodGetSession,
375		SimulatedExecutionTime{
376			Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")},
377		})
378
379	// Force ping by setting check time in the past.
380	s := sp.idleList.Front().Value.(*session)
381	s.nextCheck = time.Now().Add(-time.Minute)
382
383	// take will take the idle session. Then it will send a GetSession request
384	// to check if it's healthy. It'll discover that it's not healthy
385	// (NotFound), drop it, and create a new session.
386	sh, err = sp.take(ctx)
387	if err != nil {
388		t.Fatalf("failed to get session: %v", err)
389	}
390	ds := server.TestSpanner.DumpSessions()
391	if len(ds) != 1 {
392		t.Fatalf("dumped sessions from mockclient: %v, want %v", ds, sh.getID())
393	}
394	if sh.getID() == wantSid {
395		t.Fatalf("sessionPool.Take still returns the same session %v, want it to create a new one", wantSid)
396	}
397}
398
// TestTakeFromIdleWriteListChecked tests taking sessions from the session
// pool's idle list, but with an extra ping check.
401func TestTakeFromIdleWriteListChecked(t *testing.T) {
402	t.Parallel()
403	ctx := context.Background()
404
405	// Make sure maintainer keeps the idle sessions.
406	server, client, teardown := setupMockedTestServerWithConfig(t,
407		ClientConfig{
408			SessionPoolConfig: SessionPoolConfig{
409				MaxIdle:                   1,
410				HealthCheckInterval:       50 * time.Millisecond,
411				healthCheckSampleInterval: 10 * time.Millisecond,
412			},
413		})
414	defer teardown()
415	sp := client.idleSessions
416
417	// Stop healthcheck workers to simulate slow pings.
418	sp.hc.close()
419
420	// Create a session and recycle it.
421	sh, err := sp.takeWriteSession(ctx)
422	if err != nil {
423		t.Fatalf("failed to get session: %v", err)
424	}
425	wantSid := sh.getID()
426	// Set the next check in the past to ensure the next take() call will
427	// trigger a health check.
428	sh.session.nextCheck = time.Now().Add(-time.Minute)
429	sh.recycle()
430
	// Two back-to-back session requests; both should return the session
	// created above, but only the first one should trigger a session ping.
434	for i := 0; i < 2; i++ {
435		// Take the session from the idle list and recycle it.
436		sh, err = sp.takeWriteSession(ctx)
437		if err != nil {
438			t.Fatalf("%v - failed to get session: %v", i, err)
439		}
440		if gotSid := sh.getID(); gotSid != wantSid {
441			t.Fatalf("%v - got session id: %v, want %v", i, gotSid, wantSid)
442		}
		// Apart from the ping forced by the nextCheck value above, the
		// requests shouldn't trigger any additional pings, because take()
		// reschedules the next healthcheck.
445		if got, want := server.TestSpanner.DumpPings(), ([]string{wantSid}); !testEqual(got, want) {
446			t.Fatalf("%v - got ping session requests: %v, want %v", i, got, want)
447		}
448		sh.recycle()
449	}
450
	// Inject a session error into the mock server and take the session from
	// the session pool. The old session should be destroyed and the session
	// pool should create a new session.
454	server.TestSpanner.PutExecutionTime(MethodGetSession,
455		SimulatedExecutionTime{
456			Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")},
457		})
458
459	// Force ping by setting check time in the past.
460	s := sp.idleList.Front().Value.(*session)
461	s.nextCheck = time.Now().Add(-time.Minute)
462
463	sh, err = sp.takeWriteSession(ctx)
464	if err != nil {
465		t.Fatalf("failed to get session: %v", err)
466	}
467	ds := server.TestSpanner.DumpSessions()
468	if len(ds) != 1 {
469		t.Fatalf("dumped sessions from mockclient: %v, want %v", ds, sh.getID())
470	}
471	if sh.getID() == wantSid {
472		t.Fatalf("sessionPool.Take still returns the same session %v, want it to create a new one", wantSid)
473	}
474}
475
476// TestSessionLeak tests leaking a session and getting the stack of the
477// goroutine that leaked it.
478func TestSessionLeak(t *testing.T) {
479	t.Parallel()
480	ctx := context.Background()
481
482	_, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{
483		SessionPoolConfig: SessionPoolConfig{
484			TrackSessionHandles: true,
485			MinOpened:           0,
486			MaxOpened:           1,
487		},
488	})
489	defer teardown()
490
491	// Execute a query without calling rowIterator.Stop. This will cause the
492	// session not to be returned to the pool.
493	single := client.Single()
494	iter := single.Query(ctx, NewStatement(SelectFooFromBar))
495	for {
496		_, err := iter.Next()
497		if err == iterator.Done {
498			break
499		}
500		if err != nil {
501			t.Fatalf("Got unexpected error while iterating results: %v\n", err)
502		}
503	}
504	// The session should not have been returned to the pool.
505	if g, w := client.idleSessions.idleList.Len(), 0; g != w {
506		t.Fatalf("Idle sessions count mismatch\nGot: %d\nWant: %d\n", g, w)
507	}
508	// The checked out session should contain a stack trace.
509	if single.sh.stack == nil {
510		t.Fatalf("Missing stacktrace from session handle")
511	}
512	stack := fmt.Sprintf("%s", single.sh.stack)
513	testMethod := "TestSessionLeak"
514	if !strings.Contains(stack, testMethod) {
515		t.Fatalf("Stacktrace does not contain '%s'\nGot: %s", testMethod, stack)
516	}
517	// Return the session to the pool.
518	iter.Stop()
519	// The stack should now have been removed from the session handle.
520	if single.sh.stack != nil {
521		t.Fatalf("Got unexpected stacktrace in session handle: %s", single.sh.stack)
522	}
523
524	// Do another query and hold on to the session.
525	single = client.Single()
526	iter = single.Query(ctx, NewStatement(SelectFooFromBar))
527	for {
528		_, err := iter.Next()
529		if err == iterator.Done {
530			break
531		}
532		if err != nil {
533			t.Fatalf("Got unexpected error while iterating results: %v\n", err)
534		}
535	}
536	// Try to do another query. This will fail as MaxOpened=1.
537	ctxWithTimeout, cancel := context.WithTimeout(ctx, time.Millisecond*10)
538	defer cancel()
539	single2 := client.Single()
540	iter2 := single2.Query(ctxWithTimeout, NewStatement(SelectFooFromBar))
541	_, gotErr := iter2.Next()
542	wantErr := client.idleSessions.errGetSessionTimeoutWithTrackedSessionHandles()
543	// The error should contain the stacktraces of all the checked out
544	// sessions.
545	if !testEqual(gotErr, wantErr) {
546		t.Fatalf("Error mismatch on iterating result set.\nGot: %v\nWant: %v\n", gotErr, wantErr)
547	}
548	if !strings.Contains(gotErr.Error(), testMethod) {
549		t.Fatalf("Error does not contain '%s'\nGot: %s", testMethod, gotErr.Error())
550	}
551	// Close iterators to check sessions back into the pool before closing.
552	iter2.Stop()
553	iter.Stop()
554}
555
556// TestMaxOpenedSessions tests max open sessions constraint.
557func TestMaxOpenedSessions(t *testing.T) {
558	t.Parallel()
559	ctx := context.Background()
560	_, client, teardown := setupMockedTestServerWithConfig(t,
561		ClientConfig{
562			SessionPoolConfig: SessionPoolConfig{
563				MaxOpened: 1,
564			},
565		})
566	defer teardown()
567	sp := client.idleSessions
568
569	sh1, err := sp.take(ctx)
570	if err != nil {
571		t.Fatalf("cannot take session from session pool: %v", err)
572	}
573
	// The session request will time out due to the max open sessions constraint.
575	ctx2, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
576	defer cancel()
577	_, gotErr := sp.take(ctx2)
578	if wantErr := sp.errGetBasicSessionTimeout(); !testEqual(gotErr, wantErr) {
		t.Fatalf("the second session retrieval returns error %v, want %v", gotErr, wantErr)
580	}
581	doneWaiting := make(chan struct{})
582	go func() {
583		// Destroy the first session to allow the next session request to
584		// proceed.
585		<-doneWaiting
586		sh1.destroy()
587	}()
588
589	go func() {
		// Wait a short time before signalling that the first session may be destroyed.
591		<-time.After(10 * time.Millisecond)
592		close(doneWaiting)
593	}()
	// Now the session request can be processed, because the first session
	// will be destroyed.
596	ctx3, cancel := context.WithTimeout(ctx, time.Second)
597	defer cancel()
598	sh2, err := sp.take(ctx3)
599	if err != nil {
		t.Fatalf("after the first session is destroyed, session retrieval still returns error %v, want nil", err)
601	}
602	if !sh2.session.isValid() || sh2.getID() == "" {
603		t.Fatalf("got invalid session: %v", sh2.session)
604	}
605}
606
// TestMinOpenedSessions tests the min open sessions constraint.
608func TestMinOpenedSessions(t *testing.T) {
609	t.Parallel()
610	ctx := context.Background()
611	_, client, teardown := setupMockedTestServerWithConfig(t,
612		ClientConfig{
613			SessionPoolConfig: SessionPoolConfig{
614				MinOpened:                 1,
615				healthCheckSampleInterval: time.Millisecond,
616			},
617		})
618	defer teardown()
619	sp := client.idleSessions
620
621	// Take ten sessions from session pool and recycle them.
622	var ss []*session
623	var shs []*sessionHandle
624	for i := 0; i < 10; i++ {
625		sh, err := sp.take(ctx)
626		if err != nil {
627			t.Fatalf("failed to get session(%v): %v", i, err)
628		}
629		ss = append(ss, sh.session)
630		shs = append(shs, sh)
631		sh.recycle()
632	}
633	for _, sh := range shs {
634		sh.recycle()
635	}
636
637	// Simulate session expiration.
638	for _, s := range ss {
639		s.destroy(true)
640	}
641
642	// Wait until the maintainer has had a chance to replenish the pool.
643	for i := 0; i < 10; i++ {
644		sp.mu.Lock()
645		if sp.numOpened > 0 {
646			sp.mu.Unlock()
647			break
648		}
649		sp.mu.Unlock()
650		<-time.After(sp.healthCheckSampleInterval)
651	}
652	sp.mu.Lock()
653	defer sp.mu.Unlock()
654	// There should be still one session left in either the idle list or in one
655	// of the other opened states due to the min open sessions constraint.
656	if (sp.idleList.Len() +
657		sp.idleWriteList.Len() +
658		int(sp.prepareReqs) +
659		int(sp.createReqs)) != 1 {
660		t.Fatalf(
661			"got %v sessions in idle lists, want 1. Opened: %d, read: %d, "+
662				"write: %d, in preparation: %d, in creation: %d",
663			sp.idleList.Len()+sp.idleWriteList.Len(), sp.numOpened,
664			sp.idleList.Len(), sp.idleWriteList.Len(), sp.prepareReqs,
665			sp.createReqs)
666	}
667}
668
669// TestMaxBurst tests max burst constraint.
670func TestMaxBurst(t *testing.T) {
671	t.Parallel()
672	ctx := context.Background()
673	server, client, teardown := setupMockedTestServerWithConfig(t,
674		ClientConfig{
675			SessionPoolConfig: SessionPoolConfig{
676				MaxBurst: 1,
677			},
678		})
679	defer teardown()
680	sp := client.idleSessions
681
682	// Will cause session creation RPC to be retried forever.
683	server.TestSpanner.PutExecutionTime(MethodCreateSession,
684		SimulatedExecutionTime{
685			Errors:    []error{status.Errorf(codes.Unavailable, "try later")},
686			KeepError: true,
687		})
688
689	// This session request will never finish until the injected error is
690	// cleared.
691	go sp.take(ctx)
692
693	// Poll for the execution of the first session request.
694	for {
695		sp.mu.Lock()
696		cr := sp.createReqs
697		sp.mu.Unlock()
698		if cr == 0 {
699			<-time.After(time.Second)
700			continue
701		}
702		// The first session request is being executed.
703		break
704	}
705
706	ctx2, cancel := context.WithTimeout(ctx, time.Second)
707	defer cancel()
708	_, gotErr := sp.take(ctx2)
709
710	// Since MaxBurst == 1, the second session request should block.
711	if wantErr := sp.errGetBasicSessionTimeout(); !testEqual(gotErr, wantErr) {
		t.Fatalf("session retrieval returns error %v, want %v", gotErr, wantErr)
713	}
714
715	// Let the first session request succeed.
716	server.TestSpanner.Freeze()
717	server.TestSpanner.PutExecutionTime(MethodCreateSession, SimulatedExecutionTime{})
719	server.TestSpanner.Unfreeze()
720
	// Now a new session request can proceed because the first session request will eventually succeed.
722	sh, err := sp.take(ctx)
723	if err != nil {
		t.Fatalf("session retrieval returns error %v, want nil", err)
725	}
726	if !sh.session.isValid() || sh.getID() == "" {
727		t.Fatalf("got invalid session: %v", sh.session)
728	}
729}
730
731// TestSessionRecycle tests recycling sessions.
732func TestSessionRecycle(t *testing.T) {
733	t.Parallel()
734	ctx := context.Background()
	// Set MaxBurst=MinOpened to prevent additional sessions from being
	// created while session pool initialization is still running.
737	_, client, teardown := setupMockedTestServerWithConfig(t,
738		ClientConfig{
739			SessionPoolConfig: SessionPoolConfig{
740				MinOpened: 1,
741				MaxIdle:   5,
742				MaxBurst:  1,
743			},
744		})
745	defer teardown()
746	sp := client.idleSessions
747
748	// Test session is correctly recycled and reused.
749	for i := 0; i < 20; i++ {
750		s, err := sp.take(ctx)
751		if err != nil {
752			t.Fatalf("cannot get the session %v: %v", i, err)
753		}
754		s.recycle()
755	}
756
757	sp.mu.Lock()
758	defer sp.mu.Unlock()
	// The session pool should only contain 1 session, as MinOpened is 1 and
	// there has never been more than one session in use at any time, so
	// there's no need for the session pool to create a second session. The
	// session has also been in use all the time, so there's also no reason
	// for the session pool to delete the session.
764	if sp.numOpened != 1 {
765		t.Fatalf("Expect session pool size 1, got %d", sp.numOpened)
766	}
767}
768
769// TestSessionDestroy tests destroying sessions.
770func TestSessionDestroy(t *testing.T) {
771	t.Parallel()
772	ctx := context.Background()
773	_, client, teardown := setupMockedTestServerWithConfig(t,
774		ClientConfig{
775			SessionPoolConfig: SessionPoolConfig{
776				MinOpened: 1,
777				MaxBurst:  1,
778			},
779		})
780	defer teardown()
781	sp := client.idleSessions
782
	// Creating a session pool with MinOpened=1 will automatically start the
	// creation of 1 session when the session pool is created. As MaxBurst=1,
	// the session pool will never create more than 1 session at a time, so the
	// take() method will wait if the initial session has not yet been created.
787	sh, err := sp.take(ctx)
788	if err != nil {
789		t.Fatalf("cannot get session from session pool: %v", err)
790	}
791	s := sh.session
792	sh.recycle()
793	if d := s.destroy(true); d || !s.isValid() {
		// The session should remain alive because of the min open sessions constraint.
795		t.Fatalf("session %s invalid, want it to stay alive. (destroy in expiration mode, success: %v)", s.id, d)
796	}
797	if d := s.destroy(false); !d || s.isValid() {
798		// Session should be destroyed.
799		t.Fatalf("failed to destroy session %s. (destroy in default mode, success: %v)", s.id, d)
800	}
801}
802
803// TestHcHeap tests heap operation on top of hcHeap.
804func TestHcHeap(t *testing.T) {
805	in := []*session{
806		{nextCheck: time.Unix(10, 0)},
807		{nextCheck: time.Unix(0, 5)},
808		{nextCheck: time.Unix(1, 8)},
809		{nextCheck: time.Unix(11, 7)},
810		{nextCheck: time.Unix(6, 3)},
811	}
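	// want is the expected pop order after the top of the heap (the session
	// with nextCheck time.Unix(0, 5)) has been changed to time.Unix(8, 2) and
	// the heap has been fixed below.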
812	want := []*session{
813		{nextCheck: time.Unix(1, 8), hcIndex: 0},
814		{nextCheck: time.Unix(6, 3), hcIndex: 1},
815		{nextCheck: time.Unix(8, 2), hcIndex: 2},
816		{nextCheck: time.Unix(10, 0), hcIndex: 3},
817		{nextCheck: time.Unix(11, 7), hcIndex: 4},
818	}
819	hh := hcHeap{}
820	for _, s := range in {
821		heap.Push(&hh, s)
822	}
	// Change the top of the heap and do an adjustment.
824	hh.sessions[0].nextCheck = time.Unix(8, 2)
825	heap.Fix(&hh, 0)
826	for idx := 0; hh.Len() > 0; idx++ {
827		got := heap.Pop(&hh).(*session)
828		want[idx].hcIndex = -1
829		if !testEqual(got, want[idx]) {
830			t.Fatalf("%v: heap.Pop returns %v, want %v", idx, got, want[idx])
831		}
832	}
833}
834
835// TestHealthCheckScheduler tests if healthcheck workers can schedule and
836// perform healthchecks properly.
837func TestHealthCheckScheduler(t *testing.T) {
838	t.Parallel()
839	ctx := context.Background()
840	server, client, teardown := setupMockedTestServerWithConfig(t,
841		ClientConfig{
842			SessionPoolConfig: SessionPoolConfig{
843				HealthCheckInterval:       50 * time.Millisecond,
844				healthCheckSampleInterval: 10 * time.Millisecond,
845			},
846		})
847	defer teardown()
848	sp := client.idleSessions
849
850	// Create 50 sessions.
851	for i := 0; i < 50; i++ {
852		_, err := sp.take(ctx)
853		if err != nil {
854			t.Fatalf("cannot get session from session pool: %v", err)
855		}
856	}
857
	// Clear the ping history so that the sessions that were created first
	// don't already have more pings than the maximum expected below.
861	server.TestSpanner.ClearPings()
862	// Wait for 10-30 pings per session.
863	waitFor(t, func() error {
864		// Only check actually live sessions and ignore any sessions the
865		// session pool may have deleted in the meantime.
866		liveSessions := server.TestSpanner.DumpSessions()
867		dp := server.TestSpanner.DumpPings()
868		gotPings := map[string]int64{}
869		for _, p := range dp {
870			gotPings[p]++
871		}
872		for s := range liveSessions {
873			want := int64(20)
874			if got := gotPings[s]; got < want/2 || got > want+want/2 {
				// This is an unacceptable number of pings.
876				return fmt.Errorf("got %v healthchecks on session %v, want it between (%v, %v)", got, s, want/2, want+want/2)
877			}
878		}
879		return nil
880	})
881}
882
883// TestHealthCheck_FirstHealthCheck tests if the first healthcheck scheduling
884// works properly.
885func TestHealthCheck_FirstHealthCheck(t *testing.T) {
886	t.Parallel()
887	_, client, teardown := setupMockedTestServerWithConfig(t,
888		ClientConfig{
889			SessionPoolConfig: SessionPoolConfig{
890				MaxOpened:           0,
891				MinOpened:           0,
892				HealthCheckInterval: 50 * time.Minute,
893			},
894		})
895	defer teardown()
896	sp := client.idleSessions
897
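	// scheduledHCLocked picks the time for the first healthcheck of a new
	// session; this test only asserts that it falls in a window from roughly
	// 20% to 110% of the configured interval, with a second of slack.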
898	now := time.Now()
899	start := now.Add(time.Duration(float64(sp.hc.interval) * 0.2))
900	// A second is added to avoid the edge case.
901	end := now.Add(time.Duration(float64(sp.hc.interval)*1.1) + time.Second)
902
903	s := &session{}
904	sp.hc.scheduledHCLocked(s)
905
906	if s.nextCheck.Before(start) || s.nextCheck.After(end) {
907		t.Fatalf("The first healthcheck schedule is not in the correct range: %v", s.nextCheck)
908	}
909	if !s.firstHCDone {
910		t.Fatal("The flag 'firstHCDone' should be set to true after the first healthcheck.")
911	}
912}
913
914// TestHealthCheck_NonFirstHealthCheck tests if the scheduling after the first
915// health check works properly.
916func TestHealthCheck_NonFirstHealthCheck(t *testing.T) {
917	t.Parallel()
918	_, client, teardown := setupMockedTestServerWithConfig(t,
919		ClientConfig{
920			SessionPoolConfig: SessionPoolConfig{
921				MaxOpened:           0,
922				MinOpened:           0,
923				HealthCheckInterval: 50 * time.Minute,
924			},
925		})
926	defer teardown()
927	sp := client.idleSessions
928
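	// For sessions that have already had their first healthcheck, the next
	// check is expected close to the configured interval; the window below
	// runs from roughly 90% to 110% of the interval, with a second of slack.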
929	now := time.Now()
930	start := now.Add(time.Duration(float64(sp.hc.interval) * 0.9))
931	// A second is added to avoid the edge case.
932	end := now.Add(time.Duration(float64(sp.hc.interval)*1.1) + time.Second)
933
934	s := &session{firstHCDone: true}
935	sp.hc.scheduledHCLocked(s)
936
937	if s.nextCheck.Before(start) || s.nextCheck.After(end) {
938		t.Fatalf("The non-first healthcheck schedule is not in the correct range: %v", s.nextCheck)
939	}
940}
941
// Tests that a fraction of the sessions is prepared for read/write
// transactions by the health checker.
943func TestWriteSessionsPrepared(t *testing.T) {
944	t.Parallel()
945	ctx := context.Background()
946	_, client, teardown := setupMockedTestServerWithConfig(t,
947		ClientConfig{
948			SessionPoolConfig: SessionPoolConfig{
949				WriteSessions:       0.5,
950				MaxIdle:             20,
951				HealthCheckInterval: time.Nanosecond,
952			},
953		})
954	defer teardown()
955	sp := client.idleSessions
956
957	shs := make([]*sessionHandle, 10)
958	var err error
959	for i := 0; i < 10; i++ {
960		shs[i], err = sp.take(ctx)
961		if err != nil {
962			t.Fatalf("cannot get session from session pool: %v", err)
963		}
964	}
965	// Now there are 10 sessions in the pool. Release them.
966	for _, sh := range shs {
967		sh.recycle()
968	}
969
970	// Take 5 write sessions. The write sessions will be taken from either the
971	// list of prepared sessions (idleWriteList), or they will be prepared
972	// during the takeWriteSession method.
973	wshs := make([]*sessionHandle, 5)
974	for i := 0; i < 5; i++ {
975		wshs[i], err = sp.takeWriteSession(ctx)
976		if err != nil {
977			t.Fatalf("cannot get session from session pool: %v", err)
978		}
979		if wshs[i].getTransactionID() == nil {
980			t.Fatalf("got nil transaction id from session pool")
981		}
982	}
	// Return the write sessions to the pool.
984	for _, sh := range wshs {
985		sh.recycle()
986	}
987
988	// Now force creation of 10 more sessions.
989	shs = make([]*sessionHandle, 20)
990	for i := 0; i < 20; i++ {
991		shs[i], err = sp.take(ctx)
992		if err != nil {
993			t.Fatalf("cannot get session from session pool: %v", err)
994		}
995	}
996
997	// Now there are 20 sessions in the pool. Release them.
998	for _, sh := range shs {
999		sh.recycle()
1000	}
1001	// The health checker should eventually prepare 10 of the 20 sessions with
1002	// a r/w tx.
1003	waitUntil := time.After(time.Second)
	var numWritePrepared int
waitForPrepared:
	for numWritePrepared < 10 {
		select {
		case <-waitUntil:
			break waitForPrepared
1009		default:
1010		}
1011		sp.mu.Lock()
1012		numWritePrepared = sp.idleWriteList.Len()
1013		sp.mu.Unlock()
1014	}
1015
1016	sp.mu.Lock()
1017	defer sp.mu.Unlock()
1018	if sp.idleWriteList.Len() != 10 {
		t.Fatalf("Expect 10 write prepared sessions, got: %d", sp.idleWriteList.Len())
1020	}
1021}
1022
1023// TestTakeFromWriteQueue tests that sessionPool.take() returns write prepared
1024// sessions as well.
1025func TestTakeFromWriteQueue(t *testing.T) {
1026	t.Parallel()
1027	ctx := context.Background()
1028	_, client, teardown := setupMockedTestServerWithConfig(t,
1029		ClientConfig{
1030			SessionPoolConfig: SessionPoolConfig{
1031				MaxOpened:           1,
1032				WriteSessions:       1.0,
1033				MaxIdle:             1,
1034				HealthCheckInterval: time.Nanosecond,
1035			},
1036		})
1037	defer teardown()
1038	sp := client.idleSessions
1039
1040	sh, err := sp.take(ctx)
1041	if err != nil {
1042		t.Fatalf("cannot get session from session pool: %v", err)
1043	}
1044	sh.recycle()
1045
1046	// Wait until the health checker has write-prepared the session.
1047	waitUntil := time.After(time.Second)
	var numWritePrepared int
waitForPrepared:
	for numWritePrepared == 0 {
		select {
		case <-waitUntil:
			break waitForPrepared
1053		default:
1054		}
1055		sp.mu.Lock()
1056		numWritePrepared = sp.idleWriteList.Len()
1057		sp.mu.Unlock()
1058	}
1059
	// The session should now be in the write queue, but take() should also return it.
1061	sp.mu.Lock()
1062	if sp.idleWriteList.Len() == 0 {
1063		t.Fatalf("write queue unexpectedly empty")
1064	}
1065	if sp.idleList.Len() != 0 {
1066		t.Fatalf("read queue not empty")
1067	}
1068	sp.mu.Unlock()
1069	sh, err = sp.take(ctx)
1070	if err != nil {
1071		t.Fatalf("cannot get session from session pool: %v", err)
1072	}
1073	sh.recycle()
1074}
1075
1076// The session pool should stop trying to create write-prepared sessions if a
1077// non-transient error occurs while trying to begin a transaction. The
1078// process for preparing write sessions should automatically be re-enabled if
1079// a BeginTransaction call initiated by takeWriteSession succeeds.
1080//
1081// The only exception to the above is that a 'Session not found' error should
1082// cause the session to be removed from the session pool, and it should not
1083// affect the background process of preparing sessions.
1084func TestErrorOnPrepareSession(t *testing.T) {
1085	t.Parallel()
1086
1087	serverErrors := []error{
1088		status.Errorf(codes.PermissionDenied, "Caller is missing IAM permission spanner.databases.beginOrRollbackReadWriteTransaction on resource"),
1089		status.Errorf(codes.NotFound, `Database not found: projects/<project>/instances/<instance>/databases/<database> resource_type: "type.googleapis.com/google.spanner.admin.database.v1.Database" resource_name: "projects/<project>/instances/<instance>/databases/<database>" description: "Database does not exist."`),
1090		status.Errorf(codes.FailedPrecondition, "Invalid transaction option"),
1091		status.Errorf(codes.Internal, "Unknown server error"),
1092	}
1093	logger := log.New(os.Stderr, "", log.LstdFlags)
1094	for _, serverErr := range serverErrors {
1095		ctx := context.Background()
1096		server, client, teardown := setupMockedTestServerWithConfig(t,
1097			ClientConfig{
1098				SessionPoolConfig: SessionPoolConfig{
1099					MinOpened:           10,
1100					MaxOpened:           10,
1101					WriteSessions:       0.5,
1102					HealthCheckInterval: time.Millisecond,
1103				},
1104				logger: logger,
1105			})
1106		defer teardown()
1107		// Discard logging until trying to prepare sessions has stopped.
1108		logger.SetOutput(ioutil.Discard)
1109		server.TestSpanner.PutExecutionTime(MethodBeginTransaction, SimulatedExecutionTime{
1110			Errors:    []error{serverErr},
1111			KeepError: true,
1112		})
1113		sp := client.idleSessions
1114
		// Wait until the health checker has tried to write-prepare a session.
		// This will cause the session pool to log errors indicating that
		// preparing sessions failed.
1118		waitUntil := time.After(time.Second)
1119		var prepareDisabled bool
1120		var numOpened int
1121	waitForPrepare:
1122		for !prepareDisabled || numOpened < 10 {
1123			select {
1124			case <-waitUntil:
1125				break waitForPrepare
1126			default:
1127			}
1128			sp.mu.Lock()
1129			prepareDisabled = sp.disableBackgroundPrepareSessions
1130			numOpened = sp.idleList.Len()
1131			sp.mu.Unlock()
1132		}
1133		// Re-enable logging.
1134		logger.SetOutput(os.Stderr)
1135
1136		// There should be no write-prepared sessions.
1137		sp.mu.Lock()
1138		if sp.idleWriteList.Len() != 0 {
1139			sp.mu.Unlock()
1140			t.Fatalf("write queue unexpectedly not empty")
1141		}
1142		// All sessions should be in the read idle list.
1143		if g, w := sp.idleList.Len(), 10; g != w {
1144			sp.mu.Unlock()
1145			t.Fatalf("session count mismatch:\nWant: %v\nGot: %v", w, g)
1146		}
1147		sp.mu.Unlock()
		// Taking a read session should succeed.
1149		sh, err := sp.take(ctx)
1150		if err != nil {
1151			t.Fatalf("cannot get session from session pool: %v", err)
1152		}
1153		sh.recycle()
		// Taking a write session should fail with the server error.
1155		_, err = sp.takeWriteSession(ctx)
1156		if ErrCode(err) != ErrCode(serverErr) {
1157			t.Fatalf("take write session failed with unexpected error.\nGot: %v\nWant: %v\n", err, serverErr)
1158		}
1159
1160		// Clearing the error on the server should allow us to take a write
1161		// session.
1162		server.TestSpanner.PutExecutionTime(MethodBeginTransaction, SimulatedExecutionTime{})
1163		sh, err = sp.takeWriteSession(ctx)
1164		if err != nil {
1165			t.Fatalf("cannot get write session from session pool: %v", err)
1166		}
1167		sh.recycle()
1168		// The maintainer should also pick this up and prepare 50% of the sessions.
1169		waitUntil = time.After(time.Second)
		var numPrepared int
	waitForPrepared:
		for numPrepared < 5 {
			select {
			case <-waitUntil:
				break waitForPrepared
1175			default:
1176			}
1177			sp.mu.Lock()
1178			numPrepared = sp.idleWriteList.Len()
1179			sp.mu.Unlock()
1180		}
1181		sp.mu.Lock()
1182		if g, w := sp.idleWriteList.Len(), 5; g != w {
1183			sp.mu.Unlock()
1184			t.Fatalf("write session count mismatch:\nWant: %v\nGot: %v", w, g)
1185		}
1186		sp.mu.Unlock()
1187	}
1188}
1189
1190// The session pool should continue to try to create write-prepared sessions if
1191// a 'Session not found' error occurs. The session that has been deleted by
// the backend should be removed from the pool, and the maintainer should create a
1193// new session if this causes the number of sessions in the pool to fall below
1194// MinOpened.
1195func TestSessionNotFoundOnPrepareSession(t *testing.T) {
1196	t.Parallel()
1197
1198	// The server will return 'Session not found' for the first 8
1199	// BeginTransaction calls.
1200	sessionNotFoundErr := newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")
1201	serverErrors := make([]error, 8)
1202	for i := range serverErrors {
1203		serverErrors[i] = sessionNotFoundErr
1204	}
1205	ctx := context.Background()
1206	logger := log.New(os.Stderr, "", log.LstdFlags)
1207	server, client, teardown := setupMockedTestServerWithConfig(t,
1208		ClientConfig{
1209			SessionPoolConfig: SessionPoolConfig{
1210				MinOpened:                 10,
1211				MaxOpened:                 10,
1212				WriteSessions:             0.5,
1213				HealthCheckInterval:       time.Millisecond,
1214				healthCheckSampleInterval: time.Millisecond,
1215			},
1216			logger: logger,
1217		})
1218	defer teardown()
1219	// Discard logging until trying to prepare sessions has stopped.
1220	logger.SetOutput(ioutil.Discard)
1221	server.TestSpanner.PutExecutionTime(MethodBeginTransaction, SimulatedExecutionTime{
1222		Errors: serverErrors,
1223	})
1224	sp := client.idleSessions
1225
1226	// Wait until the health checker has tried to write-prepare the sessions.
1227	waitUntil := time.After(5 * time.Second)
1228	var numWriteSessions int
1229	var numReadSessions int
1230waitForPrepare:
1231	for (numWriteSessions+numReadSessions) < 10 || numWriteSessions < 5 {
1232		select {
1233		case <-waitUntil:
1234			break waitForPrepare
1235		default:
1236		}
1237		sp.mu.Lock()
1238		numReadSessions = sp.idleList.Len()
1239		numWriteSessions = sp.idleWriteList.Len()
1240		sp.mu.Unlock()
1241	}
1242	// Re-enable logging.
1243	logger.SetOutput(os.Stderr)
1244
1245	// There should be at least 5 write-prepared sessions.
1246	sp.mu.Lock()
1247	if g, w := sp.idleWriteList.Len(), 5; g < w {
1248		sp.mu.Unlock()
1249		t.Fatalf("write-prepared session count mismatch.\nWant at least: %v\nGot: %v", w, g)
1250	}
1251	// The other sessions should be in the read idle list.
1252	if g, w := sp.idleList.Len()+sp.idleWriteList.Len(), 10; g != w {
1253		sp.mu.Unlock()
1254		t.Fatalf("total session count mismatch:\nWant: %v\nGot: %v", w, g)
1255	}
1256	sp.mu.Unlock()
	// Taking a read session should succeed.
1258	sh, err := sp.take(ctx)
1259	if err != nil {
1260		t.Fatalf("cannot get session from session pool: %v", err)
1261	}
1262	sh.recycle()
	// Taking a write session should succeed.
1264	sh, err = sp.takeWriteSession(ctx)
1265	if err != nil {
1266		t.Fatalf("take write session failed with unexpected error.\nGot: %v\nWant: %v\n", err, nil)
1267	}
1268	sh.recycle()
1269}
1270
1271// TestSessionHealthCheck tests healthchecking cases.
1272func TestSessionHealthCheck(t *testing.T) {
1273	t.Parallel()
1274	ctx := context.Background()
1275	server, client, teardown := setupMockedTestServerWithConfig(t,
1276		ClientConfig{
1277			SessionPoolConfig: SessionPoolConfig{
1278				HealthCheckInterval:       time.Nanosecond,
1279				healthCheckSampleInterval: 10 * time.Millisecond,
1280			},
1281		})
1282	defer teardown()
1283	sp := client.idleSessions
1284
1285	// Test pinging sessions.
1286	sh, err := sp.take(ctx)
1287	if err != nil {
1288		t.Fatalf("cannot get session from session pool: %v", err)
1289	}
1290
1291	// Wait for healthchecker to send pings to session.
1292	waitFor(t, func() error {
1293		pings := server.TestSpanner.DumpPings()
1294		if len(pings) == 0 || pings[0] != sh.getID() {
1295			return fmt.Errorf("healthchecker didn't send any ping to session %v", sh.getID())
1296		}
1297		return nil
1298	})
1299	// Test broken session detection.
1300	sh, err = sp.take(ctx)
1301	if err != nil {
1302		t.Fatalf("cannot get session from session pool: %v", err)
1303	}
1304
1305	server.TestSpanner.Freeze()
1306	server.TestSpanner.PutExecutionTime(MethodGetSession,
1307		SimulatedExecutionTime{
1308			Errors:    []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")},
1309			KeepError: true,
1310		})
1311	server.TestSpanner.Unfreeze()
1312
1313	s := sh.session
1314	waitFor(t, func() error {
1315		if sh.session.isValid() {
1316			return fmt.Errorf("session(%v) is still alive, want it to be dropped by healthcheck workers", s)
1317		}
1318		return nil
1319	})
1320
1321	server.TestSpanner.Freeze()
1322	server.TestSpanner.PutExecutionTime(MethodGetSession, SimulatedExecutionTime{})
1323	server.TestSpanner.Unfreeze()
1324
1325	// Test garbage collection.
1326	sh, err = sp.take(ctx)
1327	if err != nil {
1328		t.Fatalf("cannot get session from session pool: %v", err)
1329	}
1330	sp.close()
1331	if sh.session.isValid() {
		t.Fatalf("session(%v) is still alive, want it to be garbage collected", sh.session)
1333	}
1334}
1335
// TestStressSessionPool stress tests the session pool with the following concurrent operations:
//	1) A test worker gets a session from the pool.
//	2) A test worker returns a session to the pool.
//	3) A test worker destroys a session taken from the pool.
//	4) A healthcheck worker destroys a broken session (because a test worker has already destroyed it).
//	5) A test worker closes the session pool.
//
// During the test, the session pool maintainer maintains the number of sessions,
// and it is expected that all sessions taken from the session pool remain valid.
// When all test workers and healthcheck workers exit, the mock server, session
// pool and healthchecker should be in a consistent state.
1347func TestStressSessionPool(t *testing.T) {
1348	t.Parallel()
1349
	// Use concurrent workers to test session pools built from different configurations.
1351	for ti, cfg := range []SessionPoolConfig{
1352		{},
1353		{MinOpened: 10, MaxOpened: 100},
1354		{MaxBurst: 50},
1355		{MinOpened: 10, MaxOpened: 200, MaxBurst: 5},
1356		{MinOpened: 10, MaxOpened: 200, MaxBurst: 5, WriteSessions: 0.2},
1357	} {
1358		// Create a more aggressive session healthchecker to increase test concurrency.
1359		cfg.HealthCheckInterval = 50 * time.Millisecond
1360		cfg.healthCheckSampleInterval = 10 * time.Millisecond
1361		cfg.HealthCheckWorkers = 50
1362
1363		server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{
1364			SessionPoolConfig: cfg,
1365		})
1366		sp := client.idleSessions
1367
		// Create a test group for this configuration and run 100 workers
		// within the group.
1370		t.Run(fmt.Sprintf("TestStressSessionPoolGroup%v", ti), func(t *testing.T) {
1371			for i := 0; i < 100; i++ {
1372				idx := i
1373				t.Logf("TestStressSessionPoolWithCfg%dWorker%03d", ti, idx)
1374				testStressSessionPool(t, cfg, ti, idx, sp, client)
1375			}
1376		})
1377		sp.hc.close()
1378		// Here the states of healthchecker, session pool and mockclient are
1379		// stable.
1380		sp.mu.Lock()
1381		idleSessions := map[string]bool{}
1382		hcSessions := map[string]bool{}
1383		mockSessions := server.TestSpanner.DumpSessions()
1384		// Dump session pool's idle list.
1385		for sl := sp.idleList.Front(); sl != nil; sl = sl.Next() {
1386			s := sl.Value.(*session)
1387			if idleSessions[s.getID()] {
1388				t.Fatalf("%v: found duplicated session in idle list: %v", ti, s.getID())
1389			}
1390			idleSessions[s.getID()] = true
1391		}
1392		for sl := sp.idleWriteList.Front(); sl != nil; sl = sl.Next() {
1393			s := sl.Value.(*session)
1394			if idleSessions[s.getID()] {
1395				t.Fatalf("%v: found duplicated session in idle write list: %v", ti, s.getID())
1396			}
1397			idleSessions[s.getID()] = true
1398		}
1399		if int(sp.numOpened) != len(idleSessions) {
1400			t.Fatalf("%v: number of opened sessions (%v) != number of idle sessions (%v)", ti, sp.numOpened, len(idleSessions))
1401		}
1402		if sp.createReqs != 0 {
1403			t.Fatalf("%v: number of pending session creations = %v, want 0", ti, sp.createReqs)
1404		}
1405		// Dump healthcheck queue.
1406		for _, s := range sp.hc.queue.sessions {
1407			if hcSessions[s.getID()] {
1408				t.Fatalf("%v: found duplicated session in healthcheck queue: %v", ti, s.getID())
1409			}
1410			hcSessions[s.getID()] = true
1411		}
1412		sp.mu.Unlock()
1413
1414		// Verify that idleSessions == hcSessions == mockSessions.
1415		if !testEqual(idleSessions, hcSessions) {
1416			t.Fatalf("%v: sessions in idle list (%v) != sessions in healthcheck queue (%v)", ti, idleSessions, hcSessions)
1417		}
1418		// The server may contain more sessions than the health check queue.
		// This can be caused by a client-side timeout during a CreateSession
1420		// request. The request may still be received and executed by the
1421		// server, but the session pool will not register the session.
1422		for id, b := range hcSessions {
1423			if b && !mockSessions[id] {
1424				t.Fatalf("%v: session in healthcheck queue (%v) was not found on server", ti, id)
1425			}
1426		}
1427		sp.close()
1428		mockSessions = server.TestSpanner.DumpSessions()
1429		for id, b := range hcSessions {
1430			if b && mockSessions[id] {
1431				t.Fatalf("Found session from pool still live on server: %v", id)
1432			}
1433		}
1434		teardown()
1435	}
1436}
1437
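// testStressSessionPool is the worker body for TestStressSessionPool. Each
// worker repeatedly takes, recycles and destroys sessions, and some workers
// close the pool towards the end of their run.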
1438func testStressSessionPool(t *testing.T, cfg SessionPoolConfig, ti int, idx int, pool *sessionPool, client *Client) {
1439	ctx := context.Background()
1440	// Test worker iterates 1K times and tries different
1441	// session / session pool operations.
1442	for j := 0; j < 1000; j++ {
1443		if idx%10 == 0 && j >= 900 {
			// Close the pool in a selected set of workers during the
			// middle of the test.
1446			pool.close()
1447		}
		// Take a write session ~20% of the time.
1449		takeWrite := rand.Intn(5) == 4
1450		var (
1451			sh     *sessionHandle
1452			gotErr error
1453		)
1454		wasValid := pool.isValid()
1455		if takeWrite {
1456			sh, gotErr = pool.takeWriteSession(ctx)
1457		} else {
1458			sh, gotErr = pool.take(ctx)
1459		}
1460		if gotErr != nil {
1461			if pool.isValid() {
1462				t.Fatalf("%v.%v: pool.take returns error when pool is still valid: %v", ti, idx, gotErr)
1463			}
1464			// If the session pool was closed when we tried to take a session
1465			// from the pool, then we should have gotten a specific error.
1466			// If the session pool was closed between the take() and now (or
1467			// even during a take()) then an error is ok.
1468			if !wasValid {
1469				if wantErr := errInvalidSessionPool; gotErr != wantErr {
1470					t.Fatalf("%v.%v: got error when pool is closed: %v, want %v", ti, idx, gotErr, wantErr)
1471				}
1472			}
1473			continue
1474		}
		// Verify that the session is valid if the session pool is valid.
		// Note that if the session pool becomes invalid after sh is taken,
		// then sh might be invalidated by healthcheck workers.
1478		if (sh.getID() == "" || sh.session == nil || !sh.session.isValid()) && pool.isValid() {
1479			t.Fatalf("%v.%v.%v: pool.take returns invalid session %v", ti, idx, takeWrite, sh.session)
1480		}
1481		if takeWrite && sh.getTransactionID() == nil {
1482			t.Fatalf("%v.%v: pool.takeWriteSession returns session %v without transaction", ti, idx, sh.session)
1483		}
1484		if rand.Intn(100) < idx {
1485			// Random sleep before destroying/recycling the session,
1486			// to give healthcheck worker a chance to step in.
1487			<-time.After(time.Duration(rand.Int63n(int64(cfg.HealthCheckInterval))))
1488		}
1489		if rand.Intn(100) < idx {
1490			// destroy the session.
1491			sh.destroy()
1492			continue
1493		}
1494		// recycle the session.
1495		sh.recycle()
1496	}
1497}
1498
1499// TestMaintainer checks the session pool maintainer maintains the number of
1500// sessions in the following cases:
1501//
1502// 1. On initialization of session pool, replenish session pool to meet
1503//    MinOpened or MaxIdle.
1504// 2. On increased session usage, provision extra MaxIdle sessions.
1505// 3. After the surge passes, scale down the session pool accordingly.
1506func TestMaintainer(t *testing.T) {
1507	t.Parallel()
1508	ctx := context.Background()
1509
1510	minOpened := uint64(5)
1511	maxIdle := uint64(4)
1512	_, client, teardown := setupMockedTestServerWithConfig(t,
1513		ClientConfig{
1514			SessionPoolConfig: SessionPoolConfig{
1515				MinOpened:                 minOpened,
1516				MaxIdle:                   maxIdle,
1517				healthCheckSampleInterval: time.Millisecond,
1518			},
1519		})
1520	defer teardown()
1521	sp := client.idleSessions
1522
1523	waitFor(t, func() error {
1524		sp.mu.Lock()
1525		defer sp.mu.Unlock()
1526		if sp.numOpened != 5 {
1527			return fmt.Errorf("Replenish. Expect %d open, got %d", sp.MinOpened, sp.numOpened)
1528		}
1529		return nil
1530	})
1531
	// To save test time, we are not creating many sessions, because the time
	// needed to create sessions has an impact on the decision on
	// sessionsToKeep.
1535	shs := make([]*sessionHandle, 20)
1536	for i := 0; i < len(shs); i++ {
1537		var err error
1538		shs[i], err = sp.take(ctx)
1539		if err != nil {
1540			t.Fatalf("cannot get session from session pool: %v", err)
1541		}
1542	}
1543	sp.mu.Lock()
1544	if sp.numOpened != 20 {
1545		t.Fatalf("Scale out from normal use. Expect %d open, got %d", 20, sp.numOpened)
1546	}
1547	sp.mu.Unlock()
1548
1549	// Return 14 sessions to the pool. There are still 6 sessions checked out.
1550	for _, sh := range shs[:14] {
1551		sh.recycle()
1552	}
1553
1554	// The pool should scale down to sessionsInUse + MaxIdle = 6 + 4 = 10.
1555	waitFor(t, func() error {
1556		sp.mu.Lock()
1557		defer sp.mu.Unlock()
1558		if sp.numOpened != 10 {
1559			return fmt.Errorf("Keep extra MaxIdle sessions. Expect %d open, got %d", 10, sp.numOpened)
1560		}
1561		return nil
1562	})
1563
	// Return the remaining 6 sessions.
	// The pool should now scale down to minOpened.
1566	for _, sh := range shs[14:] {
1567		sh.recycle()
1568	}
1569	waitFor(t, func() error {
1570		sp.mu.Lock()
1571		defer sp.mu.Unlock()
1572		if sp.numOpened != minOpened {
			return fmt.Errorf("Scale down. Expect %d open, got %d", minOpened, sp.numOpened)
1574		}
1575		return nil
1576	})
1577}
1578
1579// Tests that the session pool creates up to MinOpened connections.
1580//
1581// Historical context: This test also checks that a low
1582// healthCheckSampleInterval does not prevent it from opening connections.
1583// The low healthCheckSampleInterval will however sometimes cause session
1584// creations to time out. That should not be considered a problem, but it
1585// could cause the test case to fail if it happens too often.
1586// See: https://github.com/googleapis/google-cloud-go/issues/1259
1587func TestInit_CreatesSessions(t *testing.T) {
1588	t.Parallel()
1589	spc := SessionPoolConfig{
1590		MinOpened:                 10,
1591		MaxIdle:                   10,
1592		WriteSessions:             0.0,
1593		healthCheckSampleInterval: 20 * time.Millisecond,
1594	}
1595	server, client, teardown := setupMockedTestServerWithConfig(t,
1596		ClientConfig{
1597			SessionPoolConfig: spc,
1598			NumChannels:       4,
1599		})
1600	defer teardown()
1601	sp := client.idleSessions
1602
1603	timeout := time.After(4 * time.Second)
1604	var numOpened int
1605loop:
1606	for {
1607		select {
1608		case <-timeout:
1609			t.Fatalf("timed out, got %d session(s), want %d", numOpened, spc.MinOpened)
1610		default:
1611			sp.mu.Lock()
1612			numOpened = sp.idleList.Len() + sp.idleWriteList.Len()
1613			sp.mu.Unlock()
1614			if numOpened == 10 {
1615				break loop
1616			}
1617		}
1618	}
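	// The initial sessions should have been created with one
	// BatchCreateSessionsRequest per channel (NumChannels is 4 above).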
1619	_, err := shouldHaveReceived(server.TestSpanner, []interface{}{
1620		&sppb.BatchCreateSessionsRequest{},
1621		&sppb.BatchCreateSessionsRequest{},
1622		&sppb.BatchCreateSessionsRequest{},
1623		&sppb.BatchCreateSessionsRequest{},
1624	})
1625	if err != nil {
1626		t.Fatal(err)
1627	}
1628}
1629
// Tests that a session pool with MinOpened>0 also prepares WriteSessions *
// MinOpened sessions for read/write transactions.
func TestInit_PreparesSessions(t *testing.T) {
	t.Parallel()
	spc := SessionPoolConfig{
		MinOpened:                 10,
		MaxIdle:                   10,
		WriteSessions:             0.5,
		healthCheckSampleInterval: 20 * time.Millisecond,
	}
	_, client, teardown := setupMockedTestServerWithConfig(t,
		ClientConfig{
			SessionPoolConfig: spc,
		})
	defer teardown()
	sp := client.idleSessions

	timeoutAmt := 4 * time.Second
	timeout := time.After(timeoutAmt)
	var numPrepared int
	want := int(spc.WriteSessions * float64(spc.MinOpened))
loop:
	for {
		select {
		case <-timeout:
			t.Fatalf("timed out after %v, got %d write-prepared session(s), want %d", timeoutAmt, numPrepared, want)
		default:
			sp.mu.Lock()
			numPrepared = sp.idleWriteList.Len()
			sp.mu.Unlock()
			if numPrepared == want {
				break loop
			}
		}
	}
}

func (s1 *session) Equal(s2 *session) bool {
	return s1.client == s2.client &&
		s1.id == s2.id &&
		s1.pool == s2.pool &&
		s1.createTime == s2.createTime &&
		s1.valid == s2.valid &&
		s1.hcIndex == s2.hcIndex &&
		s1.idleList == s2.idleList &&
		s1.nextCheck.Equal(s2.nextCheck) &&
		s1.checkingHealth == s2.checkingHealth &&
		testEqual(s1.md, s2.md) &&
		bytes.Equal(s1.tx, s2.tx)
}

func waitFor(t *testing.T, assert func() error) {
	t.Helper()
	timeout := 15 * time.Second
	ta := time.After(timeout)

	for {
		select {
		case <-ta:
			if err := assert(); err != nil {
				t.Fatalf("after %v waiting, got %v", timeout, err)
			}
			return
		default:
		}

		if err := assert(); err != nil {
			// The assertion is not yet satisfied; pause and retry.
			time.Sleep(10 * time.Millisecond)
			continue
		}

		return
	}
}

// Tests that the maintainer only deletes sessions after a full maintenance
// window of 10 cycles has finished.
func TestMaintainer_DeletesSessions(t *testing.T) {
	t.Parallel()

	ctx := context.Background()
	const sampleInterval = time.Millisecond * 10
	_, client, teardown := setupMockedTestServerWithConfig(t,
		ClientConfig{
			SessionPoolConfig: SessionPoolConfig{healthCheckSampleInterval: sampleInterval},
		})
	defer teardown()
	sp := client.idleSessions

	// Take two sessions from the pool.
	// This will cause max sessions in use to be 2 during this window.
	sh1 := takeSession(ctx, t, sp)
	sh2 := takeSession(ctx, t, sp)
	wantSessions := map[string]bool{}
	wantSessions[sh1.getID()] = true
	wantSessions[sh2.getID()] = true
	// Return the sessions to the pool and then verify that they are not
	// deleted while still within the maintenance window.
	sh1.recycle()
	sh2.recycle()
	// Wait for 20 milliseconds, i.e. approx 2 iterations of the
	// maintainer. The sessions should still be in the pool.
	<-time.After(sampleInterval * 2)
	sh3 := takeSession(ctx, t, sp)
	sh4 := takeSession(ctx, t, sp)
	// Check that the returned sessions are equal to the sessions that we got
	// the first time from the session pool.
	gotSessions := map[string]bool{}
	gotSessions[sh3.getID()] = true
	gotSessions[sh4.getID()] = true
	if !testEqual(wantSessions, gotSessions) {
		t.Fatalf("got unexpected sessions from pool\ngot: %v\nwant: %v", gotSessions, wantSessions)
	}
	// Return the sessions to the pool.
	sh3.recycle()
	sh4.recycle()

	// Now wait for the maintenance window to finish. This will cause the
	// maintainer to start a new window and reset the max number of sessions
	// in use to the current number of checked-out sessions. That is 0, as all
	// sessions have been returned to the pool. That in turn will cause the
	// maintainer to delete these sessions at the next iteration, unless we
	// check out new sessions during the first iteration.
	waitFor(t, func() error {
		sp.mu.Lock()
		defer sp.mu.Unlock()
		if sp.numOpened > 0 {
			return fmt.Errorf("session pool still contains %d sessions", sp.numOpened)
		}
		return nil
	})
	sh5 := takeSession(ctx, t, sp)
	sh6 := takeSession(ctx, t, sp)
	// Verify that these are new sessions.
	if gotSessions[sh5.getID()] || gotSessions[sh6.getID()] {
		t.Fatal("got unexpected existing session from pool")
	}
}

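// The shrink behavior verified above can be summarized as: after a full
// maintenance window has passed, idle sessions above
// max(MinOpened, maxCheckedOutDuringWindow+MaxIdle) become candidates for
// deletion. The helper below is a hypothetical sketch of that rule, assuming
// that shrink target; it is for illustration only and is not the maintainer's
// actual code.
func sessionsEligibleForDeletion(numOpened, minOpened, maxIdle, maxCheckedOutDuringWindow uint64) uint64 {
	keep := maxCheckedOutDuringWindow + maxIdle
	if keep < minOpened {
		keep = minOpened
	}
	if numOpened <= keep {
		return 0
	}
	return numOpened - keep
}
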
func takeSession(ctx context.Context, t *testing.T, sp *sessionPool) *sessionHandle {
	sh, err := sp.take(ctx)
	if err != nil {
		t.Fatalf("cannot get session from session pool: %v", err)
	}
	return sh
}

func TestMaintenanceWindow_CycleAndUpdateMaxCheckedOut(t *testing.T) {
	t.Parallel()

	maxOpened := uint64(1000)
	mw := newMaintenanceWindow(maxOpened)
	for _, m := range mw.maxSessionsCheckedOut {
		if m < maxOpened {
			t.Fatalf("Max sessions checked out mismatch.\nGot: %v\nWant: %v", m, maxOpened)
		}
	}
	// Do one cycle and simulate that there are currently no sessions checked
	// out of the pool.
	mw.startNewCycle(0)
	if g, w := mw.maxSessionsCheckedOut[0], uint64(0); g != w {
		t.Fatalf("Max sessions checked out mismatch.\nGot: %d\nWant: %d", g, w)
	}
	for _, m := range mw.maxSessionsCheckedOut[1:] {
		if m < maxOpened {
			t.Fatalf("Max sessions checked out mismatch.\nGot: %v\nWant: %v", m, maxOpened)
		}
	}
	// Check that the max checked out during the entire window is still
	// maxOpened.
	if g, w := mw.maxSessionsCheckedOutDuringWindow(), maxOpened; g != w {
		t.Fatalf("Max sessions checked out during window mismatch.\nGot: %d\nWant: %d", g, w)
	}
	// Update the max number checked out for the current cycle.
	mw.updateMaxSessionsCheckedOutDuringWindow(uint64(10))
	if g, w := mw.maxSessionsCheckedOut[0], uint64(10); g != w {
		t.Fatalf("Max sessions checked out mismatch.\nGot: %d\nWant: %d", g, w)
	}
	// The max of the entire window should still not change.
	if g, w := mw.maxSessionsCheckedOutDuringWindow(), maxOpened; g != w {
		t.Fatalf("Max sessions checked out during window mismatch.\nGot: %d\nWant: %d", g, w)
	}
	// Now pass enough cycles to complete a maintenance window. Each cycle has
	// no sessions checked out. We start at 1, as we have already passed one
	// cycle. The cycle with max=10 should then be the oldest cycle still in
	// the maintenance window, and the only one with a maxSessionsCheckedOut
	// greater than 0.
	for i := 1; i < maintenanceWindowSize; i++ {
		mw.startNewCycle(0)
	}
	for _, m := range mw.maxSessionsCheckedOut[:maintenanceWindowSize-1] {
		if m != 0 {
			t.Fatalf("Max sessions checked out mismatch.\nGot: %v\nWant: %v", m, 0)
		}
	}
	// The oldest cycle in the window should have max=10.
	if g, w := mw.maxSessionsCheckedOut[maintenanceWindowSize-1], uint64(10); g != w {
		t.Fatalf("Max sessions checked out mismatch.\nGot: %d\nWant: %d", g, w)
	}
	// The max of the entire window should now be 10.
	if g, w := mw.maxSessionsCheckedOutDuringWindow(), uint64(10); g != w {
		t.Fatalf("Max sessions checked out during window mismatch.\nGot: %d\nWant: %d", g, w)
	}
	// Do another cycle with max=0.
	mw.startNewCycle(0)
	// The max of the entire window should now be 0.
	if g, w := mw.maxSessionsCheckedOutDuringWindow(), uint64(0); g != w {
		t.Fatalf("Max sessions checked out during window mismatch.\nGot: %d\nWant: %d", g, w)
	}
	// Do another cycle with 5 sessions as max. This should now be the new
	// window max.
	mw.startNewCycle(5)
	if g, w := mw.maxSessionsCheckedOutDuringWindow(), uint64(5); g != w {
		t.Fatalf("Max sessions checked out during window mismatch.\nGot: %d\nWant: %d", g, w)
	}
	// Do maintenanceWindowSize/2 more cycles so that the only non-zero value
	// is in the middle of the window. The max for the entire window should
	// still be 5.
	for i := 0; i < maintenanceWindowSize/2; i++ {
		mw.startNewCycle(0)
	}
	if g, w := mw.maxSessionsCheckedOutDuringWindow(), uint64(5); g != w {
		t.Fatalf("Max sessions checked out during window mismatch.\nGot: %d\nWant: %d", g, w)
	}
}

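// The maintenanceWindow behavior exercised above can be modeled as a small
// sliding window: a fixed-size slice with the newest cycle first, where
// starting a new cycle shifts in the current number of checked-out sessions,
// updating the max can only raise the newest entry, and the window max is the
// maximum over all entries. The type below is a minimal, hypothetical sketch
// of that model for illustration only; it is not the pool's actual
// maintenanceWindow type and is not used by the tests.
type exampleMaintenanceWindow struct {
	// maxCheckedOut[0] is the current cycle; the last element is the oldest.
	maxCheckedOut []uint64
}

func newExampleMaintenanceWindow(size int, initial uint64) *exampleMaintenanceWindow {
	w := &exampleMaintenanceWindow{maxCheckedOut: make([]uint64, size)}
	for i := range w.maxCheckedOut {
		w.maxCheckedOut[i] = initial
	}
	return w
}

// startNewCycle shifts the window by one cycle and records the number of
// sessions currently checked out as the new cycle's initial maximum.
func (w *exampleMaintenanceWindow) startNewCycle(currentlyCheckedOut uint64) {
	copy(w.maxCheckedOut[1:], w.maxCheckedOut)
	w.maxCheckedOut[0] = currentlyCheckedOut
}

// updateMax raises the current cycle's maximum if checkedOut exceeds it.
func (w *exampleMaintenanceWindow) updateMax(checkedOut uint64) {
	if checkedOut > w.maxCheckedOut[0] {
		w.maxCheckedOut[0] = checkedOut
	}
}

// maxDuringWindow returns the maximum number of checked-out sessions observed
// in any cycle of the window.
func (w *exampleMaintenanceWindow) maxDuringWindow() uint64 {
	var m uint64
	for _, v := range w.maxCheckedOut {
		if v > m {
			m = v
		}
	}
	return m
}
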