1/*
2Copyright 2017 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package spanner
18
19import (
20	"bytes"
21	"container/heap"
22	"context"
23	"fmt"
24	"io/ioutil"
25	"log"
26	"math/rand"
27	"os"
28	"strings"
29	"testing"
30	"time"
31
32	. "cloud.google.com/go/spanner/internal/testutil"
33	"google.golang.org/api/iterator"
34	"google.golang.org/genproto/googleapis/rpc/errdetails"
35	sppb "google.golang.org/genproto/googleapis/spanner/v1"
36	"google.golang.org/grpc/codes"
37	"google.golang.org/grpc/status"
38)
39
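// newSessionNotFoundError simulates the 'Session not found' error returned
// for an unknown session, including the ResourceInfo detail that identifies
// the session resource.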
40func newSessionNotFoundError(name string) error {
41	s := status.Newf(codes.NotFound, "Session not found: Session with id %s not found", name)
42	s, _ = s.WithDetails(&errdetails.ResourceInfo{ResourceType: sessionResourceType, ResourceName: name})
43	return s.Err()
44}
45
46// TestSessionPoolConfigValidation tests session pool config validation.
47func TestSessionPoolConfigValidation(t *testing.T) {
48	t.Parallel()
49	_, client, teardown := setupMockedTestServer(t)
50	defer teardown()
51
52	for _, test := range []struct {
53		spc SessionPoolConfig
54		err error
55	}{
56		{
57			SessionPoolConfig{
58				MinOpened: 10,
59				MaxOpened: 5,
60			},
61			errMinOpenedGTMaxOpened(5, 10),
62		},
63		{
64			SessionPoolConfig{
65				WriteSessions: -0.1,
66			},
67			errWriteFractionOutOfRange(-0.1),
68		},
69		{
70			SessionPoolConfig{
71				WriteSessions: 2.0,
72			},
73			errWriteFractionOutOfRange(2.0),
74		},
75		{
76			SessionPoolConfig{
77				HealthCheckWorkers: -1,
78			},
79			errHealthCheckWorkersNegative(-1),
80		},
81		{
82			SessionPoolConfig{
83				HealthCheckInterval: -time.Second,
84			},
85			errHealthCheckIntervalNegative(-time.Second),
86		},
87	} {
88		if _, err := newSessionPool(client.sc, test.spc); !testEqual(err, test.err) {
89			t.Fatalf("want %v, got %v", test.err, err)
90		}
91	}
92}
93
94// TestSessionCreation tests session creation during sessionPool.Take().
95func TestSessionCreation(t *testing.T) {
96	t.Parallel()
97	ctx := context.Background()
98	server, client, teardown := setupMockedTestServer(t)
99	defer teardown()
100	sp := client.idleSessions
101
	// Take three sessions from the session pool; this should trigger the
	// session pool to create three new sessions.
104	shs := make([]*sessionHandle, 3)
105	// gotDs holds the unique sessions taken from session pool.
106	gotDs := map[string]bool{}
107	for i := 0; i < len(shs); i++ {
108		var err error
109		shs[i], err = sp.take(ctx)
110		if err != nil {
111			t.Fatalf("failed to get session(%v): %v", i, err)
112		}
113		gotDs[shs[i].getID()] = true
114	}
115	if len(gotDs) != len(shs) {
116		t.Fatalf("session pool created %v sessions, want %v", len(gotDs), len(shs))
117	}
118	if wantDs := server.TestSpanner.DumpSessions(); !testEqual(gotDs, wantDs) {
119		t.Fatalf("session pool creates sessions %v, want %v", gotDs, wantDs)
120	}
121	// Verify that created sessions are recorded correctly in session pool.
122	sp.mu.Lock()
123	if int(sp.numOpened) != len(shs) {
124		t.Fatalf("session pool reports %v open sessions, want %v", sp.numOpened, len(shs))
125	}
126	if sp.createReqs != 0 {
127		t.Fatalf("session pool reports %v session create requests, want 0", int(sp.createReqs))
128	}
129	sp.mu.Unlock()
130	// Verify that created sessions are tracked correctly by healthcheck queue.
131	hc := sp.hc
132	hc.mu.Lock()
133	if hc.queue.Len() != len(shs) {
134		t.Fatalf("healthcheck queue length = %v, want %v", hc.queue.Len(), len(shs))
135	}
136	for _, s := range hc.queue.sessions {
137		if !gotDs[s.getID()] {
138			t.Fatalf("session %v is in healthcheck queue, but it is not created by session pool", s.getID())
139		}
140	}
141	hc.mu.Unlock()
142}
143
// TestLIFOSessionOrder tests whether the session pool hands out sessions in LIFO order.
145func TestLIFOSessionOrder(t *testing.T) {
146	t.Parallel()
147	ctx := context.Background()
148	_, client, teardown := setupMockedTestServerWithConfig(t,
149		ClientConfig{
150			SessionPoolConfig: SessionPoolConfig{
151				MaxOpened: 3,
152				MinOpened: 3,
153			},
154		})
155	defer teardown()
156	sp := client.idleSessions
157	// Create/take three sessions and recycle them.
158	shs, shsIDs := make([]*sessionHandle, 3), make([]string, 3)
159	for i := 0; i < len(shs); i++ {
160		var err error
161		if shs[i], err = sp.take(ctx); err != nil {
162			t.Fatalf("failed to take session(%v): %v", i, err)
163		}
164		shsIDs[i] = shs[i].getID()
165	}
166	for i := 0; i < len(shs); i++ {
167		shs[i].recycle()
168	}
169	for i := 2; i >= 0; i-- {
170		sh, err := sp.take(ctx)
171		if err != nil {
172			t.Fatalf("cannot take session from session pool: %v", err)
173		}
		// Check that sessions are returned in LIFO order.
175		if wantID, gotID := shsIDs[i], sh.getID(); wantID != gotID {
176			t.Fatalf("got session with id: %v, want: %v", gotID, wantID)
177		}
178	}
179}
180
// TestLIFOTakeWriteSessionOrder tests whether the write session pool hands out sessions in LIFO order.
182func TestLIFOTakeWriteSessionOrder(t *testing.T) {
183	t.Skip("https://github.com/googleapis/google-cloud-go/issues/1704")
184	t.Parallel()
185	ctx := context.Background()
186	_, client, teardown := setupMockedTestServerWithConfig(t,
187		ClientConfig{
188			SessionPoolConfig: SessionPoolConfig{
189				MaxOpened:     3,
190				MinOpened:     3,
191				WriteSessions: 1,
192			},
193		})
194	defer teardown()
195	sp := client.idleSessions
196	// Create/take three sessions and recycle them.
197	shs, shsIDs := make([]*sessionHandle, 3), make([]string, 3)
198	for i := 0; i < len(shs); i++ {
199		var err error
200		if shs[i], err = sp.takeWriteSession(ctx); err != nil {
201			t.Fatalf("failed to take session(%v): %v", i, err)
202		}
203		shsIDs[i] = shs[i].getID()
204	}
205	for i := 0; i < len(shs); i++ {
206		shs[i].recycle()
207	}
208	for i := 2; i >= 0; i-- {
209		ws, err := sp.takeWriteSession(ctx)
210		if err != nil {
211			t.Fatalf("cannot take session from session pool: %v", err)
212		}
		// Check that write sessions are returned in LIFO order.
214		if wantID, gotID := shsIDs[i], ws.getID(); wantID != gotID {
215			t.Fatalf("got session with id: %v, want: %v", gotID, wantID)
216		}
217	}
218}
219
220// TestTakeFromIdleList tests taking sessions from session pool's idle list.
221func TestTakeFromIdleList(t *testing.T) {
222	t.Parallel()
223	ctx := context.Background()
224
225	// Make sure maintainer keeps the idle sessions.
226	server, client, teardown := setupMockedTestServerWithConfig(t,
227		ClientConfig{
228			SessionPoolConfig: SessionPoolConfig{MaxIdle: 10},
229		})
230	defer teardown()
231	sp := client.idleSessions
232
233	// Take ten sessions from session pool and recycle them.
234	shs := make([]*sessionHandle, 10)
235	for i := 0; i < len(shs); i++ {
236		var err error
237		shs[i], err = sp.take(ctx)
238		if err != nil {
239			t.Fatalf("failed to get session(%v): %v", i, err)
240		}
241	}
	// Make sure the sessions are sampled once before recycling; otherwise they
	// will be cleaned up.
244	<-time.After(sp.SessionPoolConfig.healthCheckSampleInterval)
245	for i := 0; i < len(shs); i++ {
246		shs[i].recycle()
247	}
248	// Further session requests from session pool won't cause mockclient to
249	// create more sessions.
250	wantSessions := server.TestSpanner.DumpSessions()
251	// Take ten sessions from session pool again, this time all sessions should
252	// come from idle list.
253	gotSessions := map[string]bool{}
254	for i := 0; i < len(shs); i++ {
255		sh, err := sp.take(ctx)
256		if err != nil {
257			t.Fatalf("cannot take session from session pool: %v", err)
258		}
259		gotSessions[sh.getID()] = true
260	}
261	if len(gotSessions) != 10 {
262		t.Fatalf("got %v unique sessions, want 10", len(gotSessions))
263	}
264	if !testEqual(gotSessions, wantSessions) {
265		t.Fatalf("got sessions: %v, want %v", gotSessions, wantSessions)
266	}
267}
268
// TestTakeWriteSessionFromIdleList tests taking write sessions from the
// session pool's idle list.
271func TestTakeWriteSessionFromIdleList(t *testing.T) {
272	t.Parallel()
273	ctx := context.Background()
274
275	// Make sure maintainer keeps the idle sessions.
276	server, client, teardown := setupMockedTestServerWithConfig(t,
277		ClientConfig{
278			SessionPoolConfig: SessionPoolConfig{MaxIdle: 20},
279		})
280	defer teardown()
281	sp := client.idleSessions
282
283	// Take ten sessions from session pool and recycle them.
284	shs := make([]*sessionHandle, 10)
285	for i := 0; i < len(shs); i++ {
286		var err error
287		shs[i], err = sp.takeWriteSession(ctx)
288		if err != nil {
289			t.Fatalf("failed to get session(%v): %v", i, err)
290		}
291	}
	// Make sure the sessions are sampled once before recycling; otherwise they
	// will be cleaned up.
294	<-time.After(sp.SessionPoolConfig.healthCheckSampleInterval)
295	for i := 0; i < len(shs); i++ {
296		shs[i].recycle()
297	}
298	// Further session requests from session pool won't cause mockclient to
299	// create more sessions.
300	wantSessions := server.TestSpanner.DumpSessions()
301	// Take ten sessions from session pool again, this time all sessions should
302	// come from idle list.
303	gotSessions := map[string]bool{}
304	for i := 0; i < len(shs); i++ {
305		sh, err := sp.takeWriteSession(ctx)
306		if err != nil {
307			t.Fatalf("cannot take session from session pool: %v", err)
308		}
309		gotSessions[sh.getID()] = true
310	}
311	if len(gotSessions) != 10 {
312		t.Fatalf("got %v unique sessions, want 10", len(gotSessions))
313	}
314	if !testEqual(gotSessions, wantSessions) {
315		t.Fatalf("got sessions: %v, want %v", gotSessions, wantSessions)
316	}
317}
318
// TestTakeFromIdleListChecked tests taking sessions from the session pool's
// idle list, but with an extra ping check.
321func TestTakeFromIdleListChecked(t *testing.T) {
322	t.Parallel()
323	ctx := context.Background()
324
325	// Make sure maintainer keeps the idle sessions.
326	server, client, teardown := setupMockedTestServerWithConfig(t,
327		ClientConfig{
328			SessionPoolConfig: SessionPoolConfig{
329				MaxIdle:                   1,
330				HealthCheckInterval:       50 * time.Millisecond,
331				healthCheckSampleInterval: 10 * time.Millisecond,
332			},
333		})
334	defer teardown()
335	sp := client.idleSessions
336
337	// Stop healthcheck workers to simulate slow pings.
338	sp.hc.close()
339
340	// Create a session and recycle it.
341	sh, err := sp.take(ctx)
342	if err != nil {
343		t.Fatalf("failed to get session: %v", err)
344	}
345
346	// Force ping during the first take() by setting check time to the past.
347	sh.session.nextCheck = time.Now().Add(-time.Minute)
348	wantSid := sh.getID()
349	sh.recycle()
350
	// Two back-to-back session requests; both should return the same session
	// created before, but only the first should trigger a session ping.
353	for i := 0; i < 2; i++ {
354		// Take the session from the idle list and recycle it.
355		sh, err = sp.take(ctx)
356		if err != nil {
357			t.Fatalf("%v - failed to get session: %v", i, err)
358		}
359		if gotSid := sh.getID(); gotSid != wantSid {
360			t.Fatalf("%v - got session id: %v, want %v", i, gotSid, wantSid)
361		}
362
		// Only one ping, triggered by the first take() after nextCheck was
		// set in the past, should have been sent. Subsequent takes shouldn't
		// trigger additional pings because sessionPool.take reschedules the
		// next healthcheck.
365		if got, want := server.TestSpanner.DumpPings(), ([]string{wantSid}); !testEqual(got, want) {
366			t.Fatalf("%v - got ping session requests: %v, want %v", i, got, want)
367		}
368		sh.recycle()
369	}
370
	// Inject a session error into the server stub and take the session from
	// the session pool. The old session should be destroyed and the session
	// pool will create a new session.
374	server.TestSpanner.PutExecutionTime(MethodGetSession,
375		SimulatedExecutionTime{
376			Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")},
377		})
378
379	// Force ping by setting check time in the past.
380	s := sp.idleList.Front().Value.(*session)
381	s.nextCheck = time.Now().Add(-time.Minute)
382
383	// take will take the idle session. Then it will send a GetSession request
384	// to check if it's healthy. It'll discover that it's not healthy
385	// (NotFound), drop it, and create a new session.
386	sh, err = sp.take(ctx)
387	if err != nil {
388		t.Fatalf("failed to get session: %v", err)
389	}
390	ds := server.TestSpanner.DumpSessions()
391	if len(ds) != 1 {
392		t.Fatalf("dumped sessions from mockclient: %v, want %v", ds, sh.getID())
393	}
394	if sh.getID() == wantSid {
395		t.Fatalf("sessionPool.Take still returns the same session %v, want it to create a new one", wantSid)
396	}
397}
398
// TestTakeFromIdleWriteListChecked tests taking sessions from the session
// pool's idle list, but with an extra ping check.
401func TestTakeFromIdleWriteListChecked(t *testing.T) {
402	t.Parallel()
403	ctx := context.Background()
404
405	// Make sure maintainer keeps the idle sessions.
406	server, client, teardown := setupMockedTestServerWithConfig(t,
407		ClientConfig{
408			SessionPoolConfig: SessionPoolConfig{
409				MaxIdle:                   1,
410				HealthCheckInterval:       50 * time.Millisecond,
411				healthCheckSampleInterval: 10 * time.Millisecond,
412			},
413		})
414	defer teardown()
415	sp := client.idleSessions
416
417	// Stop healthcheck workers to simulate slow pings.
418	sp.hc.close()
419
420	// Create a session and recycle it.
421	sh, err := sp.takeWriteSession(ctx)
422	if err != nil {
423		t.Fatalf("failed to get session: %v", err)
424	}
425	wantSid := sh.getID()
426	// Set the next check in the past to ensure the next take() call will
427	// trigger a health check.
428	sh.session.nextCheck = time.Now().Add(-time.Minute)
429	sh.recycle()
430
	// Two back-to-back session requests; both should return the same session
	// created before, and only the first should trigger a session ping.
434	for i := 0; i < 2; i++ {
435		// Take the session from the idle list and recycle it.
436		sh, err = sp.takeWriteSession(ctx)
437		if err != nil {
438			t.Fatalf("%v - failed to get session: %v", i, err)
439		}
440		if gotSid := sh.getID(); gotSid != wantSid {
441			t.Fatalf("%v - got session id: %v, want %v", i, gotSid, wantSid)
442		}
		// Only one ping, triggered by the first take() after nextCheck was
		// set in the past, should have been sent. Subsequent takes shouldn't
		// trigger additional pings because sessionPool.take reschedules the
		// next healthcheck.
445		if got, want := server.TestSpanner.DumpPings(), ([]string{wantSid}); !testEqual(got, want) {
446			t.Fatalf("%v - got ping session requests: %v, want %v", i, got, want)
447		}
448		sh.recycle()
449	}
450
	// Inject a session error into the mock client and take the session from
	// the session pool. The old session should be destroyed and the session
	// pool will create a new session.
454	server.TestSpanner.PutExecutionTime(MethodGetSession,
455		SimulatedExecutionTime{
456			Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")},
457		})
458
459	// Force ping by setting check time in the past.
460	s := sp.idleList.Front().Value.(*session)
461	s.nextCheck = time.Now().Add(-time.Minute)
462
463	sh, err = sp.takeWriteSession(ctx)
464	if err != nil {
465		t.Fatalf("failed to get session: %v", err)
466	}
467	ds := server.TestSpanner.DumpSessions()
468	if len(ds) != 1 {
469		t.Fatalf("dumped sessions from mockclient: %v, want %v", ds, sh.getID())
470	}
471	if sh.getID() == wantSid {
472		t.Fatalf("sessionPool.Take still returns the same session %v, want it to create a new one", wantSid)
473	}
474}
475
476// TestSessionLeak tests leaking a session and getting the stack of the
477// goroutine that leaked it.
478func TestSessionLeak(t *testing.T) {
479	t.Parallel()
480	ctx := context.Background()
481
482	_, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{
483		SessionPoolConfig: SessionPoolConfig{
484			TrackSessionHandles: true,
485			MinOpened:           0,
486			MaxOpened:           1,
487		},
488	})
489	defer teardown()
490
491	// Execute a query without calling rowIterator.Stop. This will cause the
492	// session not to be returned to the pool.
493	single := client.Single()
494	iter := single.Query(ctx, NewStatement(SelectFooFromBar))
495	for {
496		_, err := iter.Next()
497		if err == iterator.Done {
498			break
499		}
500		if err != nil {
501			t.Fatalf("Got unexpected error while iterating results: %v\n", err)
502		}
503	}
504	// The session should not have been returned to the pool.
505	if g, w := client.idleSessions.idleList.Len(), 0; g != w {
506		t.Fatalf("Idle sessions count mismatch\nGot: %d\nWant: %d\n", g, w)
507	}
508	// The checked out session should contain a stack trace.
509	if single.sh.stack == nil {
510		t.Fatalf("Missing stacktrace from session handle")
511	}
512	stack := fmt.Sprintf("%s", single.sh.stack)
513	testMethod := "TestSessionLeak"
514	if !strings.Contains(stack, testMethod) {
515		t.Fatalf("Stacktrace does not contain '%s'\nGot: %s", testMethod, stack)
516	}
517	// Return the session to the pool.
518	iter.Stop()
519	// The stack should now have been removed from the session handle.
520	if single.sh.stack != nil {
521		t.Fatalf("Got unexpected stacktrace in session handle: %s", single.sh.stack)
522	}
523
524	// Do another query and hold on to the session.
525	single = client.Single()
526	iter = single.Query(ctx, NewStatement(SelectFooFromBar))
527	for {
528		_, err := iter.Next()
529		if err == iterator.Done {
530			break
531		}
532		if err != nil {
533			t.Fatalf("Got unexpected error while iterating results: %v\n", err)
534		}
535	}
536	// Try to do another query. This will fail as MaxOpened=1.
537	ctxWithTimeout, cancel := context.WithTimeout(ctx, time.Millisecond*10)
538	defer cancel()
539	single2 := client.Single()
540	iter2 := single2.Query(ctxWithTimeout, NewStatement(SelectFooFromBar))
541	_, gotErr := iter2.Next()
542	wantErr := client.idleSessions.errGetSessionTimeoutWithTrackedSessionHandles()
543	// The error should contain the stacktraces of all the checked out
544	// sessions.
545	if !testEqual(gotErr, wantErr) {
546		t.Fatalf("Error mismatch on iterating result set.\nGot: %v\nWant: %v\n", gotErr, wantErr)
547	}
548	if !strings.Contains(gotErr.Error(), testMethod) {
549		t.Fatalf("Error does not contain '%s'\nGot: %s", testMethod, gotErr.Error())
550	}
551	// Close iterators to check sessions back into the pool before closing.
552	iter2.Stop()
553	iter.Stop()
554}
555
556// TestMaxOpenedSessions tests max open sessions constraint.
557func TestMaxOpenedSessions(t *testing.T) {
558	t.Parallel()
559	ctx := context.Background()
560	_, client, teardown := setupMockedTestServerWithConfig(t,
561		ClientConfig{
562			SessionPoolConfig: SessionPoolConfig{
563				MaxOpened: 1,
564			},
565		})
566	defer teardown()
567	sp := client.idleSessions
568
569	sh1, err := sp.take(ctx)
570	if err != nil {
571		t.Fatalf("cannot take session from session pool: %v", err)
572	}
573
	// The session request will time out due to the max open sessions constraint.
575	ctx2, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
576	defer cancel()
577	_, gotErr := sp.take(ctx2)
578	if wantErr := sp.errGetBasicSessionTimeout(); !testEqual(gotErr, wantErr) {
		t.Fatalf("the second session retrieval returns error %v, want %v", gotErr, wantErr)
580	}
581	doneWaiting := make(chan struct{})
582	go func() {
583		// Destroy the first session to allow the next session request to
584		// proceed.
585		<-doneWaiting
586		sh1.destroy()
587	}()
588
589	go func() {
		// Wait a short time before signaling that the session handle can be destroyed.
591		<-time.After(10 * time.Millisecond)
592		close(doneWaiting)
593	}()
	// Now the session request can be processed because the first session will
	// be destroyed.
596	ctx3, cancel := context.WithTimeout(ctx, time.Second)
597	defer cancel()
598	sh2, err := sp.take(ctx3)
599	if err != nil {
		t.Fatalf("after the first session is destroyed, session retrieval still returns error %v, want nil", err)
601	}
602	if !sh2.session.isValid() || sh2.getID() == "" {
603		t.Fatalf("got invalid session: %v", sh2.session)
604	}
605}
606
// TestMinOpenedSessions tests the min open sessions constraint.
608func TestMinOpenedSessions(t *testing.T) {
609	t.Parallel()
610	ctx := context.Background()
611	_, client, teardown := setupMockedTestServerWithConfig(t,
612		ClientConfig{
613			SessionPoolConfig: SessionPoolConfig{
614				MinOpened:                 1,
615				healthCheckSampleInterval: time.Millisecond,
616			},
617		})
618	defer teardown()
619	sp := client.idleSessions
620
621	// Take ten sessions from session pool and recycle them.
622	var ss []*session
623	var shs []*sessionHandle
624	for i := 0; i < 10; i++ {
625		sh, err := sp.take(ctx)
626		if err != nil {
627			t.Fatalf("failed to get session(%v): %v", i, err)
628		}
629		ss = append(ss, sh.session)
630		shs = append(shs, sh)
631		sh.recycle()
632	}
633	for _, sh := range shs {
634		sh.recycle()
635	}
636
637	// Simulate session expiration.
638	for _, s := range ss {
639		s.destroy(true)
640	}
641
642	// Wait until the maintainer has had a chance to replenish the pool.
643	for i := 0; i < 10; i++ {
644		sp.mu.Lock()
645		if sp.numOpened > 0 {
646			sp.mu.Unlock()
647			break
648		}
649		sp.mu.Unlock()
650		<-time.After(sp.healthCheckSampleInterval)
651	}
652	sp.mu.Lock()
653	defer sp.mu.Unlock()
	// There should still be one session left, either in an idle list or in one
	// of the other opened states, due to the min open sessions constraint.
656	if (sp.idleList.Len() +
657		sp.idleWriteList.Len() +
658		int(sp.prepareReqs) +
659		int(sp.createReqs)) != 1 {
660		t.Fatalf(
661			"got %v sessions in idle lists, want 1. Opened: %d, read: %d, "+
662				"write: %d, in preparation: %d, in creation: %d",
663			sp.idleList.Len()+sp.idleWriteList.Len(), sp.numOpened,
664			sp.idleList.Len(), sp.idleWriteList.Len(), sp.prepareReqs,
665			sp.createReqs)
666	}
667}
668
669// TestMaxBurst tests max burst constraint.
670func TestMaxBurst(t *testing.T) {
671	t.Parallel()
672	ctx := context.Background()
673	server, client, teardown := setupMockedTestServerWithConfig(t,
674		ClientConfig{
675			SessionPoolConfig: SessionPoolConfig{
676				MaxBurst: 1,
677			},
678		})
679	defer teardown()
680	sp := client.idleSessions
681
682	// Will cause session creation RPC to be retried forever.
683	server.TestSpanner.PutExecutionTime(MethodCreateSession,
684		SimulatedExecutionTime{
685			Errors:    []error{status.Errorf(codes.Unavailable, "try later")},
686			KeepError: true,
687		})
688
689	// This session request will never finish until the injected error is
690	// cleared.
691	go sp.take(ctx)
692
693	// Poll for the execution of the first session request.
694	for {
695		sp.mu.Lock()
696		cr := sp.createReqs
697		sp.mu.Unlock()
698		if cr == 0 {
699			<-time.After(time.Second)
700			continue
701		}
702		// The first session request is being executed.
703		break
704	}
705
706	ctx2, cancel := context.WithTimeout(ctx, time.Second)
707	defer cancel()
708	_, gotErr := sp.take(ctx2)
709
710	// Since MaxBurst == 1, the second session request should block.
711	if wantErr := sp.errGetBasicSessionTimeout(); !testEqual(gotErr, wantErr) {
		t.Fatalf("session retrieval returns error %v, want %v", gotErr, wantErr)
713	}
714
715	// Let the first session request succeed.
716	server.TestSpanner.Freeze()
717	server.TestSpanner.PutExecutionTime(MethodCreateSession, SimulatedExecutionTime{})
719	server.TestSpanner.Unfreeze()
720
	// A new session request can now proceed because the first session request
	// will eventually succeed.
	sh, err := sp.take(ctx)
	if err != nil {
		t.Fatalf("session retrieval returns error %v, want nil", err)
725	}
726	if !sh.session.isValid() || sh.getID() == "" {
727		t.Fatalf("got invalid session: %v", sh.session)
728	}
729}
730
731// TestSessionRecycle tests recycling sessions.
732func TestSessionRecycle(t *testing.T) {
733	t.Parallel()
734	ctx := context.Background()
	// Set MaxBurst=MinOpened to prevent additional sessions from being created
	// while session pool initialization is still running.
737	_, client, teardown := setupMockedTestServerWithConfig(t,
738		ClientConfig{
739			SessionPoolConfig: SessionPoolConfig{
740				MinOpened: 1,
741				MaxIdle:   5,
742				MaxBurst:  1,
743			},
744		})
745	defer teardown()
746	sp := client.idleSessions
747
	// Test that a session is correctly recycled and reused.
749	for i := 0; i < 20; i++ {
750		s, err := sp.take(ctx)
751		if err != nil {
752			t.Fatalf("cannot get the session %v: %v", i, err)
753		}
754		s.recycle()
755	}
756
757	sp.mu.Lock()
758	defer sp.mu.Unlock()
	// The session pool should only contain 1 session, as MinOpened=1 and there
	// has never been more than one session in use at any moment, so there is
	// no need for the session pool to create a second session. The session has
	// also been in use all the time, so there is also no reason for the
	// session pool to delete it.
764	if sp.numOpened != 1 {
765		t.Fatalf("Expect session pool size 1, got %d", sp.numOpened)
766	}
767}
768
769// TestSessionDestroy tests destroying sessions.
770func TestSessionDestroy(t *testing.T) {
771	t.Parallel()
772	ctx := context.Background()
773	_, client, teardown := setupMockedTestServerWithConfig(t,
774		ClientConfig{
775			SessionPoolConfig: SessionPoolConfig{
776				MinOpened: 1,
777				MaxBurst:  1,
778			},
779		})
780	defer teardown()
781	sp := client.idleSessions
782
783	// Creating a session pool with MinSessions=1 will automatically start the
784	// creation of 1 session when the session pool is created. As MaxBurst=1,
785	// the session pool will never create more than 1 session at a time, so the
786	// take() method will wait if the initial session has not yet been created.
787	sh, err := sp.take(ctx)
788	if err != nil {
789		t.Fatalf("cannot get session from session pool: %v", err)
790	}
791	s := sh.session
792	sh.recycle()
793	if d := s.destroy(true); d || !s.isValid() {
		// The session should remain alive because of the min open sessions constraint.
795		t.Fatalf("session %s invalid, want it to stay alive. (destroy in expiration mode, success: %v)", s.id, d)
796	}
797	if d := s.destroy(false); !d || s.isValid() {
798		// Session should be destroyed.
799		t.Fatalf("failed to destroy session %s. (destroy in default mode, success: %v)", s.id, d)
800	}
801}
802
803// TestHcHeap tests heap operation on top of hcHeap.
804func TestHcHeap(t *testing.T) {
805	in := []*session{
806		{nextCheck: time.Unix(10, 0)},
807		{nextCheck: time.Unix(0, 5)},
808		{nextCheck: time.Unix(1, 8)},
809		{nextCheck: time.Unix(11, 7)},
810		{nextCheck: time.Unix(6, 3)},
811	}
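	// want lists the sessions in the order they should be popped from the
	// heap after the nextCheck of the top element is changed to
	// time.Unix(8, 2) below.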
812	want := []*session{
813		{nextCheck: time.Unix(1, 8), hcIndex: 0},
814		{nextCheck: time.Unix(6, 3), hcIndex: 1},
815		{nextCheck: time.Unix(8, 2), hcIndex: 2},
816		{nextCheck: time.Unix(10, 0), hcIndex: 3},
817		{nextCheck: time.Unix(11, 7), hcIndex: 4},
818	}
819	hh := hcHeap{}
820	for _, s := range in {
821		heap.Push(&hh, s)
822	}
	// Change the top of the heap and readjust it.
824	hh.sessions[0].nextCheck = time.Unix(8, 2)
825	heap.Fix(&hh, 0)
826	for idx := 0; hh.Len() > 0; idx++ {
827		got := heap.Pop(&hh).(*session)
828		want[idx].hcIndex = -1
829		if !testEqual(got, want[idx]) {
830			t.Fatalf("%v: heap.Pop returns %v, want %v", idx, got, want[idx])
831		}
832	}
833}
834
835// TestHealthCheckScheduler tests if healthcheck workers can schedule and
836// perform healthchecks properly.
837func TestHealthCheckScheduler(t *testing.T) {
838	t.Parallel()
839	ctx := context.Background()
840	server, client, teardown := setupMockedTestServerWithConfig(t,
841		ClientConfig{
842			SessionPoolConfig: SessionPoolConfig{
843				HealthCheckInterval:       50 * time.Millisecond,
844				healthCheckSampleInterval: 10 * time.Millisecond,
845			},
846		})
847	defer teardown()
848	sp := client.idleSessions
849
850	// Create 50 sessions.
851	for i := 0; i < 50; i++ {
852		_, err := sp.take(ctx)
853		if err != nil {
854			t.Fatalf("cannot get session from session pool: %v", err)
855		}
856	}
857
	// Clear the ping history so that the sessions that were created first
	// have not already exceeded the expected number of pings before we start
	// counting.
861	server.TestSpanner.ClearPings()
862	// Wait for 10-30 pings per session.
863	waitFor(t, func() error {
864		// Only check actually live sessions and ignore any sessions the
865		// session pool may have deleted in the meantime.
866		liveSessions := server.TestSpanner.DumpSessions()
867		dp := server.TestSpanner.DumpPings()
868		gotPings := map[string]int64{}
869		for _, p := range dp {
870			gotPings[p]++
871		}
872		for s := range liveSessions {
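			// Expect roughly 20 pings per session; anything outside the
			// range (want/2, want*1.5), i.e. (10, 30), is treated as a
			// failure and retried by waitFor.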
873			want := int64(20)
874			if got := gotPings[s]; got < want/2 || got > want+want/2 {
				// This is an unacceptable number of pings.
876				return fmt.Errorf("got %v healthchecks on session %v, want it between (%v, %v)", got, s, want/2, want+want/2)
877			}
878		}
879		return nil
880	})
881}
882
// Tests that a fraction of the sessions is prepared for write by the health checker.
884func TestWriteSessionsPrepared(t *testing.T) {
885	t.Parallel()
886	ctx := context.Background()
887	_, client, teardown := setupMockedTestServerWithConfig(t,
888		ClientConfig{
889			SessionPoolConfig: SessionPoolConfig{
890				WriteSessions:       0.5,
891				MaxIdle:             20,
892				HealthCheckInterval: time.Nanosecond,
893			},
894		})
895	defer teardown()
896	sp := client.idleSessions
897
898	shs := make([]*sessionHandle, 10)
899	var err error
900	for i := 0; i < 10; i++ {
901		shs[i], err = sp.take(ctx)
902		if err != nil {
903			t.Fatalf("cannot get session from session pool: %v", err)
904		}
905	}
906	// Now there are 10 sessions in the pool. Release them.
907	for _, sh := range shs {
908		sh.recycle()
909	}
910
911	// Take 5 write sessions. The write sessions will be taken from either the
912	// list of prepared sessions (idleWriteList), or they will be prepared
913	// during the takeWriteSession method.
914	wshs := make([]*sessionHandle, 5)
915	for i := 0; i < 5; i++ {
916		wshs[i], err = sp.takeWriteSession(ctx)
917		if err != nil {
918			t.Fatalf("cannot get session from session pool: %v", err)
919		}
920		if wshs[i].getTransactionID() == nil {
921			t.Fatalf("got nil transaction id from session pool")
922		}
923	}
	// Return the sessions to the pool.
925	for _, sh := range wshs {
926		sh.recycle()
927	}
928
929	// Now force creation of 10 more sessions.
930	shs = make([]*sessionHandle, 20)
931	for i := 0; i < 20; i++ {
932		shs[i], err = sp.take(ctx)
933		if err != nil {
934			t.Fatalf("cannot get session from session pool: %v", err)
935		}
936	}
937
938	// Now there are 20 sessions in the pool. Release them.
939	for _, sh := range shs {
940		sh.recycle()
941	}
	// The health checker should eventually prepare 10 of the 20 sessions with
	// a read/write transaction. Use a labeled break so the timeout actually
	// terminates the wait loop.
	waitUntil := time.After(time.Second)
	var numWritePrepared int
waitWritePrepared:
	for numWritePrepared < 10 {
		select {
		case <-waitUntil:
			break waitWritePrepared
		default:
		}
		sp.mu.Lock()
		numWritePrepared = sp.idleWriteList.Len()
		sp.mu.Unlock()
	}
956
957	sp.mu.Lock()
958	defer sp.mu.Unlock()
959	if sp.idleWriteList.Len() != 10 {
960		t.Fatalf("Expect 10 write prepared session, got: %d", sp.idleWriteList.Len())
961	}
962}
963
964// TestTakeFromWriteQueue tests that sessionPool.take() returns write prepared
965// sessions as well.
966func TestTakeFromWriteQueue(t *testing.T) {
967	t.Parallel()
968	ctx := context.Background()
969	_, client, teardown := setupMockedTestServerWithConfig(t,
970		ClientConfig{
971			SessionPoolConfig: SessionPoolConfig{
972				MaxOpened:           1,
973				WriteSessions:       1.0,
974				MaxIdle:             1,
975				HealthCheckInterval: time.Nanosecond,
976			},
977		})
978	defer teardown()
979	sp := client.idleSessions
980
981	sh, err := sp.take(ctx)
982	if err != nil {
983		t.Fatalf("cannot get session from session pool: %v", err)
984	}
985	sh.recycle()
986
	// Wait until the health checker has write-prepared the session. Use a
	// labeled break so the timeout actually terminates the wait loop.
	waitUntil := time.After(time.Second)
	var numWritePrepared int
waitWritePrepared:
	for numWritePrepared == 0 {
		select {
		case <-waitUntil:
			break waitWritePrepared
		default:
		}
		sp.mu.Lock()
		numWritePrepared = sp.idleWriteList.Len()
		sp.mu.Unlock()
	}
1000
	// The session should now be in the write queue, but take() should also
	// return it.
1002	sp.mu.Lock()
1003	if sp.idleWriteList.Len() == 0 {
1004		t.Fatalf("write queue unexpectedly empty")
1005	}
1006	if sp.idleList.Len() != 0 {
1007		t.Fatalf("read queue not empty")
1008	}
1009	sp.mu.Unlock()
1010	sh, err = sp.take(ctx)
1011	if err != nil {
1012		t.Fatalf("cannot get session from session pool: %v", err)
1013	}
1014	sh.recycle()
1015}
1016
1017// The session pool should stop trying to create write-prepared sessions if a
1018// non-transient error occurs while trying to begin a transaction. The
1019// process for preparing write sessions should automatically be re-enabled if
1020// a BeginTransaction call initiated by takeWriteSession succeeds.
1021//
1022// The only exception to the above is that a 'Session not found' error should
1023// cause the session to be removed from the session pool, and it should not
1024// affect the background process of preparing sessions.
1025func TestErrorOnPrepareSession(t *testing.T) {
1026	t.Parallel()
1027
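	// Each of these errors is non-transient for BeginTransaction and should
	// cause the session pool to stop preparing write sessions in the
	// background, as described above.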
1028	serverErrors := []error{
1029		status.Errorf(codes.PermissionDenied, "Caller is missing IAM permission spanner.databases.beginOrRollbackReadWriteTransaction on resource"),
1030		status.Errorf(codes.NotFound, `Database not found: projects/<project>/instances/<instance>/databases/<database> resource_type: "type.googleapis.com/google.spanner.admin.database.v1.Database" resource_name: "projects/<project>/instances/<instance>/databases/<database>" description: "Database does not exist."`),
1031		status.Errorf(codes.FailedPrecondition, "Invalid transaction option"),
1032		status.Errorf(codes.Internal, "Unknown server error"),
1033	}
1034	logger := log.New(os.Stderr, "", log.LstdFlags)
1035	for _, serverErr := range serverErrors {
1036		ctx := context.Background()
1037		server, client, teardown := setupMockedTestServerWithConfig(t,
1038			ClientConfig{
1039				SessionPoolConfig: SessionPoolConfig{
1040					MinOpened:           10,
1041					MaxOpened:           10,
1042					WriteSessions:       0.5,
1043					HealthCheckInterval: time.Millisecond,
1044				},
1045				logger: logger,
1046			})
1047		defer teardown()
1048		// Discard logging until trying to prepare sessions has stopped.
1049		logger.SetOutput(ioutil.Discard)
1050		server.TestSpanner.PutExecutionTime(MethodBeginTransaction, SimulatedExecutionTime{
1051			Errors:    []error{serverErr},
1052			KeepError: true,
1053		})
1054		sp := client.idleSessions
1055
		// Wait until the health checker has tried to write-prepare a session.
		// This will cause the session pool to log errors indicating that
		// preparing sessions failed.
1059		waitUntil := time.After(time.Second)
1060		var prepareDisabled bool
1061		var numOpened int
1062	waitForPrepare:
1063		for !prepareDisabled || numOpened < 10 {
1064			select {
1065			case <-waitUntil:
1066				break waitForPrepare
1067			default:
1068			}
1069			sp.mu.Lock()
1070			prepareDisabled = sp.disableBackgroundPrepareSessions
1071			numOpened = sp.idleList.Len()
1072			sp.mu.Unlock()
1073		}
1074		// Re-enable logging.
1075		logger.SetOutput(os.Stderr)
1076
1077		// There should be no write-prepared sessions.
1078		sp.mu.Lock()
1079		if sp.idleWriteList.Len() != 0 {
1080			sp.mu.Unlock()
1081			t.Fatalf("write queue unexpectedly not empty")
1082		}
1083		// All sessions should be in the read idle list.
1084		if g, w := sp.idleList.Len(), 10; g != w {
1085			sp.mu.Unlock()
1086			t.Fatalf("session count mismatch:\nWant: %v\nGot: %v", w, g)
1087		}
1088		sp.mu.Unlock()
		// Taking a read session should succeed.
1090		sh, err := sp.take(ctx)
1091		if err != nil {
1092			t.Fatalf("cannot get session from session pool: %v", err)
1093		}
1094		sh.recycle()
		// Taking a write session should fail with the server error.
1096		_, err = sp.takeWriteSession(ctx)
1097		if ErrCode(err) != ErrCode(serverErr) {
1098			t.Fatalf("take write session failed with unexpected error.\nGot: %v\nWant: %v\n", err, serverErr)
1099		}
1100
1101		// Clearing the error on the server should allow us to take a write
1102		// session.
1103		server.TestSpanner.PutExecutionTime(MethodBeginTransaction, SimulatedExecutionTime{})
1104		sh, err = sp.takeWriteSession(ctx)
1105		if err != nil {
1106			t.Fatalf("cannot get write session from session pool: %v", err)
1107		}
1108		sh.recycle()
1109		// The maintainer should also pick this up and prepare 50% of the sessions.
		waitUntil = time.After(time.Second)
		var numPrepared int
		// Use a labeled break so the timeout actually terminates the wait loop.
	waitPrepared:
		for numPrepared < 5 {
			select {
			case <-waitUntil:
				break waitPrepared
			default:
			}
			sp.mu.Lock()
			numPrepared = sp.idleWriteList.Len()
			sp.mu.Unlock()
		}
1122		sp.mu.Lock()
1123		if g, w := sp.idleWriteList.Len(), 5; g != w {
1124			sp.mu.Unlock()
1125			t.Fatalf("write session count mismatch:\nWant: %v\nGot: %v", w, g)
1126		}
1127		sp.mu.Unlock()
1128	}
1129}
1130
1131// The session pool should continue to try to create write-prepared sessions if
// a 'Session not found' error occurs. The session that has been deleted by
// the backend should be removed from the pool, and the maintainer should
// create a new session if this causes the number of sessions in the pool to
// fall below MinOpened.
1136func TestSessionNotFoundOnPrepareSession(t *testing.T) {
1137	t.Parallel()
1138
1139	// The server will return 'Session not found' for the first 8
1140	// BeginTransaction calls.
1141	sessionNotFoundErr := newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")
1142	serverErrors := make([]error, 8)
1143	for i := range serverErrors {
1144		serverErrors[i] = sessionNotFoundErr
1145	}
1146	ctx := context.Background()
1147	logger := log.New(os.Stderr, "", log.LstdFlags)
1148	server, client, teardown := setupMockedTestServerWithConfig(t,
1149		ClientConfig{
1150			SessionPoolConfig: SessionPoolConfig{
1151				MinOpened:                 10,
1152				MaxOpened:                 10,
1153				WriteSessions:             0.5,
1154				HealthCheckInterval:       time.Millisecond,
1155				healthCheckSampleInterval: time.Millisecond,
1156			},
1157			logger: logger,
1158		})
1159	defer teardown()
1160	// Discard logging until trying to prepare sessions has stopped.
1161	logger.SetOutput(ioutil.Discard)
1162	server.TestSpanner.PutExecutionTime(MethodBeginTransaction, SimulatedExecutionTime{
1163		Errors: serverErrors,
1164	})
1165	sp := client.idleSessions
1166
1167	// Wait until the health checker has tried to write-prepare the sessions.
1168	waitUntil := time.After(5 * time.Second)
1169	var numWriteSessions int
1170	var numReadSessions int
1171waitForPrepare:
1172	for (numWriteSessions+numReadSessions) < 10 || numWriteSessions < 5 {
1173		select {
1174		case <-waitUntil:
1175			break waitForPrepare
1176		default:
1177		}
1178		sp.mu.Lock()
1179		numReadSessions = sp.idleList.Len()
1180		numWriteSessions = sp.idleWriteList.Len()
1181		sp.mu.Unlock()
1182	}
1183	// Re-enable logging.
1184	logger.SetOutput(os.Stderr)
1185
1186	// There should be at least 5 write-prepared sessions.
1187	sp.mu.Lock()
1188	if g, w := sp.idleWriteList.Len(), 5; g < w {
1189		sp.mu.Unlock()
1190		t.Fatalf("write-prepared session count mismatch.\nWant at least: %v\nGot: %v", w, g)
1191	}
1192	// The other sessions should be in the read idle list.
1193	if g, w := sp.idleList.Len()+sp.idleWriteList.Len(), 10; g != w {
1194		sp.mu.Unlock()
1195		t.Fatalf("total session count mismatch:\nWant: %v\nGot: %v", w, g)
1196	}
1197	sp.mu.Unlock()
	// Taking a read session should succeed.
1199	sh, err := sp.take(ctx)
1200	if err != nil {
1201		t.Fatalf("cannot get session from session pool: %v", err)
1202	}
1203	sh.recycle()
	// Taking a write session should succeed.
1205	sh, err = sp.takeWriteSession(ctx)
1206	if err != nil {
1207		t.Fatalf("take write session failed with unexpected error.\nGot: %v\nWant: %v\n", err, nil)
1208	}
1209	sh.recycle()
1210}
1211
1212// TestSessionHealthCheck tests healthchecking cases.
1213func TestSessionHealthCheck(t *testing.T) {
1214	t.Parallel()
1215	ctx := context.Background()
1216	server, client, teardown := setupMockedTestServerWithConfig(t,
1217		ClientConfig{
1218			SessionPoolConfig: SessionPoolConfig{
1219				HealthCheckInterval:       time.Nanosecond,
1220				healthCheckSampleInterval: 10 * time.Millisecond,
1221			},
1222		})
1223	defer teardown()
1224	sp := client.idleSessions
1225
1226	// Test pinging sessions.
1227	sh, err := sp.take(ctx)
1228	if err != nil {
1229		t.Fatalf("cannot get session from session pool: %v", err)
1230	}
1231
	// Wait for the healthchecker to send pings to the session.
1233	waitFor(t, func() error {
1234		pings := server.TestSpanner.DumpPings()
1235		if len(pings) == 0 || pings[0] != sh.getID() {
1236			return fmt.Errorf("healthchecker didn't send any ping to session %v", sh.getID())
1237		}
1238		return nil
1239	})
1240	// Test broken session detection.
1241	sh, err = sp.take(ctx)
1242	if err != nil {
1243		t.Fatalf("cannot get session from session pool: %v", err)
1244	}
1245
1246	server.TestSpanner.Freeze()
1247	server.TestSpanner.PutExecutionTime(MethodGetSession,
1248		SimulatedExecutionTime{
1249			Errors:    []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")},
1250			KeepError: true,
1251		})
1252	server.TestSpanner.Unfreeze()
1253
1254	s := sh.session
1255	waitFor(t, func() error {
1256		if sh.session.isValid() {
1257			return fmt.Errorf("session(%v) is still alive, want it to be dropped by healthcheck workers", s)
1258		}
1259		return nil
1260	})
1261
1262	server.TestSpanner.Freeze()
1263	server.TestSpanner.PutExecutionTime(MethodGetSession, SimulatedExecutionTime{})
1264	server.TestSpanner.Unfreeze()
1265
1266	// Test garbage collection.
1267	sh, err = sp.take(ctx)
1268	if err != nil {
1269		t.Fatalf("cannot get session from session pool: %v", err)
1270	}
1271	sp.close()
1272	if sh.session.isValid() {
1273		t.Fatalf("session(%v) is still alive, want it to be garbage collected", s)
1274	}
1275}
1276
// TestStressSessionPool stress tests the session pool with the following
// concurrent operations:
//	1) Test worker gets a session from the pool.
//	2) Test worker returns a session to the pool.
//	3) Test worker destroys a session taken from the pool.
//	4) Healthcheck destroys a broken session (because a worker has already destroyed it).
//	5) Test worker closes the session pool.
//
// During the test, the session pool maintainer maintains the number of
// sessions, and it is expected that all sessions taken from the session pool
// remain valid. When all test workers and healthcheck workers exit, the mock
// client, the session pool and the healthchecker should be in a consistent
// state.
1288func TestStressSessionPool(t *testing.T) {
1289	t.Parallel()
1290
1291	// Use concurrent workers to test different session pool built from different configurations.
1292	for ti, cfg := range []SessionPoolConfig{
1293		{},
1294		{MinOpened: 10, MaxOpened: 100},
1295		{MaxBurst: 50},
1296		{MinOpened: 10, MaxOpened: 200, MaxBurst: 5},
1297		{MinOpened: 10, MaxOpened: 200, MaxBurst: 5, WriteSessions: 0.2},
1298	} {
1299		// Create a more aggressive session healthchecker to increase test concurrency.
1300		cfg.HealthCheckInterval = 50 * time.Millisecond
1301		cfg.healthCheckSampleInterval = 10 * time.Millisecond
1302		cfg.HealthCheckWorkers = 50
1303
1304		server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{
1305			SessionPoolConfig: cfg,
1306		})
1307		sp := client.idleSessions
1308
		// Create a test group for this configuration and schedule 100 sub
		// tests within the group.
1311		t.Run(fmt.Sprintf("TestStressSessionPoolGroup%v", ti), func(t *testing.T) {
1312			for i := 0; i < 100; i++ {
1313				idx := i
1314				t.Logf("TestStressSessionPoolWithCfg%dWorker%03d", ti, idx)
1315				testStressSessionPool(t, cfg, ti, idx, sp, client)
1316			}
1317		})
1318		sp.hc.close()
1319		// Here the states of healthchecker, session pool and mockclient are
1320		// stable.
1321		sp.mu.Lock()
1322		idleSessions := map[string]bool{}
1323		hcSessions := map[string]bool{}
1324		mockSessions := server.TestSpanner.DumpSessions()
1325		// Dump session pool's idle list.
1326		for sl := sp.idleList.Front(); sl != nil; sl = sl.Next() {
1327			s := sl.Value.(*session)
1328			if idleSessions[s.getID()] {
1329				t.Fatalf("%v: found duplicated session in idle list: %v", ti, s.getID())
1330			}
1331			idleSessions[s.getID()] = true
1332		}
1333		for sl := sp.idleWriteList.Front(); sl != nil; sl = sl.Next() {
1334			s := sl.Value.(*session)
1335			if idleSessions[s.getID()] {
1336				t.Fatalf("%v: found duplicated session in idle write list: %v", ti, s.getID())
1337			}
1338			idleSessions[s.getID()] = true
1339		}
1340		if int(sp.numOpened) != len(idleSessions) {
1341			t.Fatalf("%v: number of opened sessions (%v) != number of idle sessions (%v)", ti, sp.numOpened, len(idleSessions))
1342		}
1343		if sp.createReqs != 0 {
1344			t.Fatalf("%v: number of pending session creations = %v, want 0", ti, sp.createReqs)
1345		}
1346		// Dump healthcheck queue.
1347		for _, s := range sp.hc.queue.sessions {
1348			if hcSessions[s.getID()] {
1349				t.Fatalf("%v: found duplicated session in healthcheck queue: %v", ti, s.getID())
1350			}
1351			hcSessions[s.getID()] = true
1352		}
1353		sp.mu.Unlock()
1354
1355		// Verify that idleSessions == hcSessions == mockSessions.
1356		if !testEqual(idleSessions, hcSessions) {
1357			t.Fatalf("%v: sessions in idle list (%v) != sessions in healthcheck queue (%v)", ti, idleSessions, hcSessions)
1358		}
1359		// The server may contain more sessions than the health check queue.
		// This can be caused by a client-side timeout during a CreateSession
1361		// request. The request may still be received and executed by the
1362		// server, but the session pool will not register the session.
1363		for id, b := range hcSessions {
1364			if b && !mockSessions[id] {
1365				t.Fatalf("%v: session in healthcheck queue (%v) was not found on server", ti, id)
1366			}
1367		}
1368		sp.close()
1369		mockSessions = server.TestSpanner.DumpSessions()
1370		for id, b := range hcSessions {
1371			if b && mockSessions[id] {
1372				t.Fatalf("Found session from pool still live on server: %v", id)
1373			}
1374		}
1375		teardown()
1376	}
1377}
1378
1379func testStressSessionPool(t *testing.T, cfg SessionPoolConfig, ti int, idx int, pool *sessionPool, client *Client) {
1380	ctx := context.Background()
1381	// Test worker iterates 1K times and tries different
1382	// session / session pool operations.
1383	for j := 0; j < 1000; j++ {
1384		if idx%10 == 0 && j >= 900 {
			// Close the pool in a selected set of workers in the
			// middle of the test.
1387			pool.close()
1388		}
		// Take a write session ~20% of the time.
1390		takeWrite := rand.Intn(5) == 4
1391		var (
1392			sh     *sessionHandle
1393			gotErr error
1394		)
1395		wasValid := pool.isValid()
1396		if takeWrite {
1397			sh, gotErr = pool.takeWriteSession(ctx)
1398		} else {
1399			sh, gotErr = pool.take(ctx)
1400		}
1401		if gotErr != nil {
1402			if pool.isValid() {
1403				t.Fatalf("%v.%v: pool.take returns error when pool is still valid: %v", ti, idx, gotErr)
1404			}
			// If the session pool was already closed before we tried to take
			// a session, we should have gotten the specific 'invalid session
			// pool' error. If the pool was closed during or after the take(),
			// any error is ok.
1409			if !wasValid {
1410				if wantErr := errInvalidSessionPool; gotErr != wantErr {
1411					t.Fatalf("%v.%v: got error when pool is closed: %v, want %v", ti, idx, gotErr, wantErr)
1412				}
1413			}
1414			continue
1415		}
		// Verify that the session is valid when the session pool is valid.
		// Note that if the session pool becomes invalid after sh is taken,
		// then sh might be invalidated by healthcheck workers.
1419		if (sh.getID() == "" || sh.session == nil || !sh.session.isValid()) && pool.isValid() {
1420			t.Fatalf("%v.%v.%v: pool.take returns invalid session %v", ti, idx, takeWrite, sh.session)
1421		}
1422		if takeWrite && sh.getTransactionID() == nil {
1423			t.Fatalf("%v.%v: pool.takeWriteSession returns session %v without transaction", ti, idx, sh.session)
1424		}
1425		if rand.Intn(100) < idx {
1426			// Random sleep before destroying/recycling the session,
1427			// to give healthcheck worker a chance to step in.
1428			<-time.After(time.Duration(rand.Int63n(int64(cfg.HealthCheckInterval))))
1429		}
1430		if rand.Intn(100) < idx {
1431			// destroy the session.
1432			sh.destroy()
1433			continue
1434		}
1435		// recycle the session.
1436		sh.recycle()
1437	}
1438}
1439
// TestMaintainer checks that the session pool maintainer maintains the number
// of sessions in the following cases:
1442//
1443// 1. On initialization of session pool, replenish session pool to meet
1444//    MinOpened or MaxIdle.
1445// 2. On increased session usage, provision extra MaxIdle sessions.
1446// 3. After the surge passes, scale down the session pool accordingly.
1447func TestMaintainer(t *testing.T) {
1448	t.Parallel()
1449	ctx := context.Background()
1450
1451	minOpened := uint64(5)
1452	maxIdle := uint64(4)
1453	_, client, teardown := setupMockedTestServerWithConfig(t,
1454		ClientConfig{
1455			SessionPoolConfig: SessionPoolConfig{
1456				MinOpened:                 minOpened,
1457				MaxIdle:                   maxIdle,
1458				healthCheckSampleInterval: time.Millisecond,
1459			},
1460		})
1461	defer teardown()
1462	sp := client.idleSessions
1463
1464	waitFor(t, func() error {
1465		sp.mu.Lock()
1466		defer sp.mu.Unlock()
		if sp.numOpened != minOpened {
			return fmt.Errorf("Replenish. Expect %d open, got %d", minOpened, sp.numOpened)
1469		}
1470		return nil
1471	})
1472
1473	// To save test time, we are not creating many sessions, because the time
	// to create sessions will have an impact on the decision on sessionsToKeep.
1475	// We also parallelize the take and recycle process.
1476	shs := make([]*sessionHandle, 20)
1477	for i := 0; i < len(shs); i++ {
1478		var err error
1479		shs[i], err = sp.take(ctx)
1480		if err != nil {
1481			t.Fatalf("cannot get session from session pool: %v", err)
1482		}
1483	}
1484	sp.mu.Lock()
1485	if sp.numOpened != 20 {
1486		t.Fatalf("Scale out from normal use. Expect %d open, got %d", 20, sp.numOpened)
1487	}
1488	sp.mu.Unlock()
1489
1490	// Return 14 sessions to the pool. There are still 6 sessions checked out.
1491	for _, sh := range shs[:14] {
1492		sh.recycle()
1493	}
1494
1495	// The pool should scale down to sessionsInUse + MaxIdle = 6 + 4 = 10.
1496	waitFor(t, func() error {
1497		sp.mu.Lock()
1498		defer sp.mu.Unlock()
1499		if sp.numOpened != 10 {
1500			return fmt.Errorf("Keep extra MaxIdle sessions. Expect %d open, got %d", 10, sp.numOpened)
1501		}
1502		return nil
1503	})
1504
1505	// Return the remaining 6 sessions.
	// The pool should now scale down to minOpened.
1507	for _, sh := range shs[14:] {
1508		sh.recycle()
1509	}
1510	waitFor(t, func() error {
1511		sp.mu.Lock()
1512		defer sp.mu.Unlock()
1513		if sp.numOpened != minOpened {
			return fmt.Errorf("Scale down. Expect %d open, got %d", minOpened, sp.numOpened)
1515		}
1516		return nil
1517	})
1518}
1519
1520// Tests that the session pool creates up to MinOpened connections.
1521//
1522// Historical context: This test also checks that a low
1523// healthCheckSampleInterval does not prevent it from opening connections.
1524// The low healthCheckSampleInterval will however sometimes cause session
1525// creations to time out. That should not be considered a problem, but it
1526// could cause the test case to fail if it happens too often.
1527// See: https://github.com/googleapis/google-cloud-go/issues/1259
1528func TestInit_CreatesSessions(t *testing.T) {
1529	t.Parallel()
1530	spc := SessionPoolConfig{
1531		MinOpened:                 10,
1532		MaxIdle:                   10,
1533		WriteSessions:             0.0,
1534		healthCheckSampleInterval: 20 * time.Millisecond,
1535	}
1536	server, client, teardown := setupMockedTestServerWithConfig(t,
1537		ClientConfig{
1538			SessionPoolConfig: spc,
1539			NumChannels:       4,
1540		})
1541	defer teardown()
1542	sp := client.idleSessions
1543
1544	timeout := time.After(4 * time.Second)
1545	var numOpened int
1546loop:
1547	for {
1548		select {
1549		case <-timeout:
1550			t.Fatalf("timed out, got %d session(s), want %d", numOpened, spc.MinOpened)
1551		default:
1552			sp.mu.Lock()
1553			numOpened = sp.idleList.Len() + sp.idleWriteList.Len()
1554			sp.mu.Unlock()
1555			if numOpened == 10 {
1556				break loop
1557			}
1558		}
1559	}
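	// The pool is expected to distribute the initial session creation over all
	// 4 configured channels, hence the 4 BatchCreateSessions requests.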
1560	_, err := shouldHaveReceived(server.TestSpanner, []interface{}{
1561		&sppb.BatchCreateSessionsRequest{},
1562		&sppb.BatchCreateSessionsRequest{},
1563		&sppb.BatchCreateSessionsRequest{},
1564		&sppb.BatchCreateSessionsRequest{},
1565	})
1566	if err != nil {
1567		t.Fatal(err)
1568	}
1569}
1570
// Tests that a session pool with MinOpened>0 also prepares the configured
// fraction (WriteSessions) of the sessions for read/write transactions.
1573func TestInit_PreparesSessions(t *testing.T) {
1574	t.Parallel()
1575	spc := SessionPoolConfig{
1576		MinOpened:                 10,
1577		MaxIdle:                   10,
1578		WriteSessions:             0.5,
1579		healthCheckSampleInterval: 20 * time.Millisecond,
1580	}
1581	_, client, teardown := setupMockedTestServerWithConfig(t,
1582		ClientConfig{
1583			SessionPoolConfig: spc,
1584		})
1585	defer teardown()
1586	sp := client.idleSessions
1587
1588	timeoutAmt := 4 * time.Second
1589	timeout := time.After(timeoutAmt)
1590	var numPrepared int
1591	want := int(spc.WriteSessions * float64(spc.MinOpened))
1592loop:
1593	for {
1594		select {
1595		case <-timeout:
1596			t.Fatalf("timed out after %v, got %d write-prepared session(s), want %d", timeoutAmt, numPrepared, want)
1597		default:
1598			sp.mu.Lock()
1599			numPrepared = sp.idleWriteList.Len()
1600			sp.mu.Unlock()
1601			if numPrepared == want {
1602				break loop
1603			}
1604		}
1605	}
1606}
1607
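// Equal reports whether two sessions are considered equal in these tests by
// comparing all relevant fields, including the transaction ID and metadata.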
1608func (s1 *session) Equal(s2 *session) bool {
1609	return s1.client == s2.client &&
1610		s1.id == s2.id &&
1611		s1.pool == s2.pool &&
1612		s1.createTime == s2.createTime &&
1613		s1.valid == s2.valid &&
1614		s1.hcIndex == s2.hcIndex &&
1615		s1.idleList == s2.idleList &&
1616		s1.nextCheck.Equal(s2.nextCheck) &&
1617		s1.checkingHealth == s2.checkingHealth &&
1618		testEqual(s1.md, s2.md) &&
1619		bytes.Equal(s1.tx, s2.tx)
1620}
1621
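// waitFor repeatedly runs assert until it returns nil, pausing 10
// milliseconds between attempts, and fails the test if assert still returns
// an error after a 15 second timeout.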
1622func waitFor(t *testing.T, assert func() error) {
1623	t.Helper()
1624	timeout := 15 * time.Second
1625	ta := time.After(timeout)
1626
1627	for {
1628		select {
1629		case <-ta:
1630			if err := assert(); err != nil {
1631				t.Fatalf("after %v waiting, got %v", timeout, err)
1632			}
1633			return
1634		default:
1635		}
1636
1637		if err := assert(); err != nil {
1638			// Fail. Let's pause and retry.
1639			time.Sleep(10 * time.Millisecond)
1640			continue
1641		}
1642
1643		return
1644	}
1645}
1646
1647// Tests that maintainer only deletes sessions after a full maintenance window
1648// of 10 cycles has finished.
1649func TestMaintainer_DeletesSessions(t *testing.T) {
1650	t.Parallel()
1651
1652	ctx := context.Background()
1653	const sampleInterval = time.Millisecond * 10
1654	_, client, teardown := setupMockedTestServerWithConfig(t,
1655		ClientConfig{
1656			SessionPoolConfig: SessionPoolConfig{healthCheckSampleInterval: sampleInterval},
1657		})
1658	defer teardown()
1659	sp := client.idleSessions
1660
1661	// Take two sessions from the pool.
1662	// This will cause max sessions in use to be 2 during this window.
1663	sh1 := takeSession(ctx, t, sp)
1664	sh2 := takeSession(ctx, t, sp)
1665	wantSessions := map[string]bool{}
1666	wantSessions[sh1.getID()] = true
1667	wantSessions[sh2.getID()] = true
1668	// Return the sessions to the pool and then assure that they
1669	// are not deleted while still within the maintenance window.
1670	sh1.recycle()
1671	sh2.recycle()
1672	// Wait for 20 milliseconds, i.e. approx 2 iterations of the
1673	// maintainer. The sessions should still be in the pool.
1674	<-time.After(sampleInterval * 2)
1675	sh3 := takeSession(ctx, t, sp)
1676	sh4 := takeSession(ctx, t, sp)
	// Check that the returned sessions are equal to the sessions that we got
	// the first time from the session pool.
	gotSessions := map[string]bool{}
	gotSessions[sh3.getID()] = true
	gotSessions[sh4.getID()] = true
	if !testEqual(wantSessions, gotSessions) {
		t.Fatalf("got unexpected sessions from pool.\nGot: %v\nWant: %v", gotSessions, wantSessions)
	}
1683	// Return the sessions to the pool.
1684	sh3.recycle()
1685	sh4.recycle()
1686
1687	// Now wait for the maintenance window to finish. This will cause the
1688	// maintainer to enter a new window and reset the max number of sessions in
	// use to the current number of checked out sessions. That is 0, as all
	// sessions have been returned to the pool. That again will cause the
	// maintainer to delete these sessions at the next iteration, unless we
	// check out new sessions during the first iteration.
1693	waitFor(t, func() error {
1694		sp.mu.Lock()
1695		defer sp.mu.Unlock()
1696		if sp.numOpened > 0 {
1697			return fmt.Errorf("session pool still contains more than 0 sessions")
1698		}
1699		return nil
1700	})
1701	sh5 := takeSession(ctx, t, sp)
1702	sh6 := takeSession(ctx, t, sp)
	// Ensure that these are new sessions.
1704	if gotSessions[sh5.getID()] || gotSessions[sh6.getID()] {
1705		t.Fatal("got unexpected existing session from pool")
1706	}
1707}
1708
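// takeSession takes a session from the given pool and fails the test if no
// session can be taken.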
1709func takeSession(ctx context.Context, t *testing.T, sp *sessionPool) *sessionHandle {
1710	sh, err := sp.take(ctx)
1711	if err != nil {
1712		t.Fatalf("cannot get session from session pool: %v", err)
1713	}
1714	return sh
1715}
1716
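// TestMaintenanceWindow_CycleAndUpdateMaxCheckedOut tests that a maintenance
// window correctly tracks the maximum number of checked-out sessions over the
// cycles in the window.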
1717func TestMaintenanceWindow_CycleAndUpdateMaxCheckedOut(t *testing.T) {
1718	t.Parallel()
1719
1720	maxOpened := uint64(1000)
1721	mw := newMaintenanceWindow(maxOpened)
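	// A new maintenance window should report at least maxOpened for every
	// cycle, so that the maintainer does not start deleting sessions before a
	// full window of cycles has passed.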
1722	for _, m := range mw.maxSessionsCheckedOut {
1723		if m < maxOpened {
1724			t.Fatalf("Max sessions checked out mismatch.\nGot: %v\nWant: %v", m, maxOpened)
1725		}
1726	}
1727	// Do one cycle and simulate that there are currently no sessions checked
1728	// out of the pool.
1729	mw.startNewCycle(0)
1730	if g, w := mw.maxSessionsCheckedOut[0], uint64(0); g != w {
1731		t.Fatalf("Max sessions checked out mismatch.\nGot: %d\nWant: %d", g, w)
1732	}
1733	for _, m := range mw.maxSessionsCheckedOut[1:] {
1734		if m < maxOpened {
1735			t.Fatalf("Max sessions checked out mismatch.\nGot: %v\nWant: %v", m, maxOpened)
1736		}
1737	}
1738	// Check that the max checked out during the entire window is still
1739	// maxOpened.
1740	if g, w := mw.maxSessionsCheckedOutDuringWindow(), maxOpened; g != w {
1741		t.Fatalf("Max sessions checked out during window mismatch.\nGot: %d\nWant: %d", g, w)
1742	}
1743	// Update the max number checked out for the current cycle.
1744	mw.updateMaxSessionsCheckedOutDuringWindow(uint64(10))
1745	if g, w := mw.maxSessionsCheckedOut[0], uint64(10); g != w {
1746		t.Fatalf("Max sessions checked out mismatch.\nGot: %d\nWant: %d", g, w)
1747	}
1748	// The max of the entire window should still not change.
1749	if g, w := mw.maxSessionsCheckedOutDuringWindow(), maxOpened; g != w {
1750		t.Fatalf("Max sessions checked out during window mismatch.\nGot: %d\nWant: %d", g, w)
1751	}
1752	// Now pass enough cycles to complete a maintenance window. Each cycle has
1753	// no sessions checked out. We start at 1, as we have already passed one
1754	// cycle. This should then be the last cycle still in the maintenance
1755	// window, and the only one with a maxSessionsCheckedOut greater than 0.
1756	for i := 1; i < maintenanceWindowSize; i++ {
1757		mw.startNewCycle(0)
1758	}
1759	for _, m := range mw.maxSessionsCheckedOut[:9] {
1760		if m != 0 {
1761			t.Fatalf("Max sessions checked out mismatch.\nGot: %v\nWant: %v", m, 0)
1762		}
1763	}
1764	// The oldest cycle in the window should have max=10.
1765	if g, w := mw.maxSessionsCheckedOut[maintenanceWindowSize-1], uint64(10); g != w {
1766		t.Fatalf("Max sessions checked out mismatch.\nGot: %d\nWant: %d", g, w)
1767	}
1768	// The max of the entire window should now be 10.
1769	if g, w := mw.maxSessionsCheckedOutDuringWindow(), uint64(10); g != w {
1770		t.Fatalf("Max sessions checked out during window mismatch.\nGot: %d\nWant: %d", g, w)
1771	}
1772	// Do another cycle with max=0.
1773	mw.startNewCycle(0)
1774	// The max of the entire window should now be 0.
1775	if g, w := mw.maxSessionsCheckedOutDuringWindow(), uint64(0); g != w {
1776		t.Fatalf("Max sessions checked out during window mismatch.\nGot: %d\nWant: %d", g, w)
1777	}
1778	// Do another cycle with 5 sessions as max. This should now be the new
1779	// window max.
1780	mw.startNewCycle(5)
1781	if g, w := mw.maxSessionsCheckedOutDuringWindow(), uint64(5); g != w {
1782		t.Fatalf("Max sessions checked out during window mismatch.\nGot: %d\nWant: %d", g, w)
1783	}
1784	// Do a couple of cycles so that the only non-zero value is in the middle.
1785	// The max for the entire window should still be 5.
1786	for i := 0; i < maintenanceWindowSize/2; i++ {
1787		mw.startNewCycle(0)
1788	}
1789	if g, w := mw.maxSessionsCheckedOutDuringWindow(), uint64(5); g != w {
1790		t.Fatalf("Max sessions checked out during window mismatch.\nGot: %d\nWant: %d", g, w)
1791	}
1792}
1793