/*
Copyright 2017 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package spanner

import (
	"bytes"
	"container/heap"
	"context"
	"fmt"
	"io/ioutil"
	"log"
	"math/rand"
	"os"
	"reflect"
	"strings"
	"testing"
	"time"

	. "cloud.google.com/go/spanner/internal/testutil"
	"google.golang.org/api/iterator"
	"google.golang.org/genproto/googleapis/rpc/errdetails"
	sppb "google.golang.org/genproto/googleapis/spanner/v1"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

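// newSessionNotFoundError constructs a gRPC NotFound status with a
// ResourceInfo detail of type sessionResourceType, which is how the client
// recognizes a 'Session not found' error returned by the backend.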
func newSessionNotFoundError(name string) error {
	s := status.Newf(codes.NotFound, "Session not found: Session with id %s not found", name)
	s, _ = s.WithDetails(&errdetails.ResourceInfo{ResourceType: sessionResourceType, ResourceName: name})
	return s.Err()
}

// TestSessionPoolConfigValidation tests session pool config validation.
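// The cases below cover the validated constraints: MinOpened must not exceed
// MaxOpened, WriteSessions must lie in [0.0, 1.0], and the health check worker
// count and interval must not be negative. As an illustrative (not exhaustive)
// example, a config such as SessionPoolConfig{MinOpened: 10, MaxOpened: 100,
// WriteSessions: 0.2} would pass this validation.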
func TestSessionPoolConfigValidation(t *testing.T) {
	t.Parallel()
	_, client, teardown := setupMockedTestServer(t)
	defer teardown()

	for _, test := range []struct {
		spc SessionPoolConfig
		err error
	}{
		{
			SessionPoolConfig{
				MinOpened: 10,
				MaxOpened: 5,
			},
			errMinOpenedGTMaxOpened(5, 10),
		},
		{
			SessionPoolConfig{
				WriteSessions: -0.1,
			},
			errWriteFractionOutOfRange(-0.1),
		},
		{
			SessionPoolConfig{
				WriteSessions: 2.0,
			},
			errWriteFractionOutOfRange(2.0),
		},
		{
			SessionPoolConfig{
				HealthCheckWorkers: -1,
			},
			errHealthCheckWorkersNegative(-1),
		},
		{
			SessionPoolConfig{
				HealthCheckInterval: -time.Second,
			},
			errHealthCheckIntervalNegative(-time.Second),
		},
	} {
		if _, err := newSessionPool(client.sc, test.spc); !testEqual(err, test.err) {
			t.Fatalf("want %v, got %v", test.err, err)
		}
	}
}

// TestSessionCreation tests session creation during sessionPool.Take().
func TestSessionCreation(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	_, client, teardown := setupMockedTestServer(t)
	defer teardown()
	sp := client.idleSessions

	// Take three sessions from the session pool. This should trigger the
	// session pool to create SessionPoolConfig.incStep new sessions.
	shs := make([]*sessionHandle, 3)
	for i := 0; i < len(shs); i++ {
		var err error
		shs[i], err = sp.take(ctx)
		if err != nil {
			t.Fatalf("failed to get session(%v): %v", i, err)
		}
	}
	// Wait until session creation has ceased.
	timeout := time.After(4 * time.Second)
	var numBeingCreated uint64
loop:
	for {
		sp.mu.Lock()
		numBeingCreated = sp.createReqs
		sp.mu.Unlock()
		select {
		case <-timeout:
			t.Fatalf("timed out, still %d session(s) being created, want %d", numBeingCreated, 0)
		default:
			if numBeingCreated == 0 {
				break loop
			}
		}
	}
	for _, sh := range shs {
		if _, err := sh.getClient().GetSession(context.Background(), &sppb.GetSessionRequest{
			Name: sh.getID(),
		}); err != nil {
			t.Fatalf("error getting expected session from server: %v", err)
		}
	}
	// Verify that created sessions are recorded correctly in the session pool.
	sp.mu.Lock()
	if sp.numOpened != sp.incStep {
		t.Fatalf("session pool reports %v open sessions, want %v", sp.numOpened, sp.incStep)
	}
	if sp.createReqs != 0 {
		t.Fatalf("session pool reports %v session create requests, want 0", int(sp.createReqs))
	}
	sp.mu.Unlock()
	// Verify that created sessions are tracked correctly by the healthcheck queue.
	hc := sp.hc
	hc.mu.Lock()
	if uint64(hc.queue.Len()) != sp.incStep {
		t.Fatalf("healthcheck queue length = %v, want %v", hc.queue.Len(), sp.incStep)
	}
	hc.mu.Unlock()
}

// TestLIFOSessionOrder tests whether the session pool hands out sessions in
// LIFO order.
func TestLIFOSessionOrder(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	_, client, teardown := setupMockedTestServerWithConfig(t,
		ClientConfig{
			SessionPoolConfig: SessionPoolConfig{
				MaxOpened: 3,
				MinOpened: 3,
			},
		})
	defer teardown()
	sp := client.idleSessions
	// Create/take three sessions and recycle them.
	shs, shsIDs := make([]*sessionHandle, 3), make([]string, 3)
	for i := 0; i < len(shs); i++ {
		var err error
		if shs[i], err = sp.take(ctx); err != nil {
			t.Fatalf("failed to take session(%v): %v", i, err)
		}
		shsIDs[i] = shs[i].getID()
	}
	for i := 0; i < len(shs); i++ {
		shs[i].recycle()
	}
	for i := 2; i >= 0; i-- {
		sh, err := sp.take(ctx)
		if err != nil {
			t.Fatalf("cannot take session from session pool: %v", err)
		}
		// Check that sessions are returned in LIFO order.
		if wantID, gotID := shsIDs[i], sh.getID(); wantID != gotID {
			t.Fatalf("got session with id: %v, want: %v", gotID, wantID)
		}
	}
}

// TestLIFOTakeWriteSessionOrder tests whether the write session pool hands out
// sessions in LIFO order.
func TestLIFOTakeWriteSessionOrder(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	_, client, teardown := setupMockedTestServerWithConfig(t,
		ClientConfig{
			SessionPoolConfig: SessionPoolConfig{
				MaxOpened: 3,
				MinOpened: 3,
				// Set the write fraction to 0 to ensure the write sessions are
				// only created on demand, which will guarantee the exact order
				// in which we receive the sessions.
				WriteSessions: 0,
			},
		})
	defer teardown()
	sp := client.idleSessions
	// Create/take three sessions and recycle them.
	shs, shsIDs := make([]*sessionHandle, 3), make([]string, 3)
	for i := 0; i < len(shs); i++ {
		var err error
		if shs[i], err = sp.takeWriteSession(ctx); err != nil {
			t.Fatalf("failed to take session(%v): %v", i, err)
		}
		shsIDs[i] = shs[i].getID()
	}
	for i := 0; i < len(shs); i++ {
		shs[i].recycle()
	}
	for i := 2; i >= 0; i-- {
		ws, err := sp.takeWriteSession(ctx)
		if err != nil {
			t.Fatalf("cannot take session from session pool: %v", err)
		}
		// Check that write sessions are returned in LIFO order.
		if wantID, gotID := shsIDs[i], ws.getID(); wantID != gotID {
			t.Fatalf("got session with id: %v, want: %v", gotID, wantID)
		}
	}
}

// TestTakeFromIdleList tests taking sessions from the session pool's idle list.
func TestTakeFromIdleList(t *testing.T) {
	t.Parallel()
	ctx := context.Background()

	// Make sure maintainer keeps the idle sessions.
	server, client, teardown := setupMockedTestServerWithConfig(t,
		ClientConfig{
			SessionPoolConfig: SessionPoolConfig{MaxIdle: 10, MaxOpened: 10},
		})
	defer teardown()
	sp := client.idleSessions

	// Take ten sessions from the session pool and recycle them.
	shs := make([]*sessionHandle, 10)
	for i := 0; i < len(shs); i++ {
		var err error
		shs[i], err = sp.take(ctx)
		if err != nil {
			t.Fatalf("failed to get session(%v): %v", i, err)
		}
	}
	// Make sure the sessions are sampled once before recycling; otherwise they
	// will be cleaned up.
	<-time.After(sp.SessionPoolConfig.healthCheckSampleInterval)
	for i := 0; i < len(shs); i++ {
		shs[i].recycle()
	}
	// Further session requests from the session pool won't cause the mock
	// client to create more sessions.
	wantSessions := server.TestSpanner.DumpSessions()
	// Take ten sessions from the session pool again; this time all sessions
	// should come from the idle list.
	gotSessions := map[string]bool{}
	for i := 0; i < len(shs); i++ {
		sh, err := sp.take(ctx)
		if err != nil {
			t.Fatalf("cannot take session from session pool: %v", err)
		}
		gotSessions[sh.getID()] = true
	}
	if len(gotSessions) != 10 {
		t.Fatalf("got %v unique sessions, want 10", len(gotSessions))
	}
	if !testEqual(gotSessions, wantSessions) {
		t.Fatalf("got sessions: %v, want %v", gotSessions, wantSessions)
	}
}

// TestTakeWriteSessionFromIdleList tests taking write sessions from the
// session pool's idle list.
func TestTakeWriteSessionFromIdleList(t *testing.T) {
	t.Parallel()
	ctx := context.Background()

	// Make sure maintainer keeps the idle sessions.
	server, client, teardown := setupMockedTestServerWithConfig(t,
		ClientConfig{
			SessionPoolConfig: SessionPoolConfig{MaxIdle: 10, MaxOpened: 10},
		})
	defer teardown()
	sp := client.idleSessions

	// Take ten sessions from the session pool and recycle them.
	shs := make([]*sessionHandle, 10)
	for i := 0; i < len(shs); i++ {
		var err error
		shs[i], err = sp.takeWriteSession(ctx)
		if err != nil {
			t.Fatalf("failed to get session(%v): %v", i, err)
		}
	}
	// Make sure the sessions are sampled once before recycling; otherwise they
	// will be cleaned up.
	<-time.After(sp.SessionPoolConfig.healthCheckSampleInterval)
	for i := 0; i < len(shs); i++ {
		shs[i].recycle()
	}
	// Further session requests from the session pool won't cause the mock
	// client to create more sessions.
	wantSessions := server.TestSpanner.DumpSessions()
	// Take ten sessions from the session pool again; this time all sessions
	// should come from the idle list.
	gotSessions := map[string]bool{}
	for i := 0; i < len(shs); i++ {
		sh, err := sp.takeWriteSession(ctx)
		if err != nil {
			t.Fatalf("cannot take session from session pool: %v", err)
		}
		gotSessions[sh.getID()] = true
	}
	if len(gotSessions) != 10 {
		t.Fatalf("got %v unique sessions, want 10", len(gotSessions))
	}
	if !testEqual(gotSessions, wantSessions) {
		t.Fatalf("got sessions: %v, want %v", gotSessions, wantSessions)
	}
}

// TestTakeFromIdleListChecked tests taking sessions from the session pool's
// idle list, but with an extra ping check.
func TestTakeFromIdleListChecked(t *testing.T) {
	t.Parallel()
	ctx := context.Background()

	// Make sure maintainer keeps the idle sessions.
	server, client, teardown := setupMockedTestServerWithConfig(t,
		ClientConfig{
			SessionPoolConfig: SessionPoolConfig{
				WriteSessions:             0.0,
				MaxIdle:                   1,
				HealthCheckInterval:       50 * time.Millisecond,
				healthCheckSampleInterval: 10 * time.Millisecond,
			},
		})
	defer teardown()
	sp := client.idleSessions

	// Stop healthcheck workers to simulate slow pings.
	sp.hc.close()

	// Create a session and recycle it.
	sh, err := sp.take(ctx)
	if err != nil {
		t.Fatalf("failed to get session: %v", err)
	}

	// Wait until all session creation has finished.
	waitFor(t, func() error {
		sp.mu.Lock()
		// WriteSessions = 0, so we only have to check for read sessions.
		numOpened := uint64(sp.idleList.Len())
		sp.mu.Unlock()
		if numOpened < sp.SessionPoolConfig.incStep-1 {
			return fmt.Errorf("creation not yet finished")
		}
		return nil
	})

	// Force a ping during the first take() by setting the check time in the past.
	sp.hc.mu.Lock()
	sh.session.nextCheck = time.Now().Add(-time.Minute)
	sp.hc.mu.Unlock()
	wantSid := sh.getID()
	sh.recycle()

	// Two back-to-back session requests, both of them should return the same
	// session created before, but only the first of them should trigger a session ping.
	for i := 0; i < 2; i++ {
		// Take the session from the idle list and recycle it.
		sh, err = sp.take(ctx)
		if err != nil {
			t.Fatalf("%v - failed to get session: %v", i, err)
		}
		if gotSid := sh.getID(); gotSid != wantSid {
			t.Fatalf("%v - got session id: %v, want %v", i, gotSid, wantSid)
		}

		// The two back-to-back session requests shouldn't trigger any session
		// pings because sessionPool.Take reschedules the next healthcheck.
		if got, want := server.TestSpanner.DumpPings(), ([]string{wantSid}); !testEqual(got, want) {
			t.Fatalf("%v - got ping session requests: %v, want %v", i, got, want)
		}
		sh.recycle()
	}

	// Inject a session error into the server stub and take the session from
	// the session pool. The old session should be destroyed and the session
	// pool will create a new session.
	server.TestSpanner.PutExecutionTime(MethodExecuteSql,
		SimulatedExecutionTime{
			Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")},
		})

	// Force a ping by setting the check time in the past.
	s := sp.idleList.Front().Value.(*session)
	s.nextCheck = time.Now().Add(-time.Minute)

	// take will take the idle session. Then it will ping the session to check
	// if it's healthy. It'll discover that it's not healthy (NotFound) and
	// drop it. No new session will be created as MinOpened=0.
	sh, err = sp.take(ctx)
	if err != nil {
		t.Fatalf("failed to get session: %v", err)
	}
	ds := server.TestSpanner.DumpSessions()
	if g, w := uint64(len(ds)), sp.incStep-1; g != w {
		t.Fatalf("number of sessions from mock server mismatch\nGot: %v\nWant: %v\n", g, w)
	}
	if sh.getID() == wantSid {
		t.Fatalf("sessionPool.Take still returns the same session %v, want it to create a new one", wantSid)
	}
}

// TestTakeFromIdleWriteListChecked tests taking sessions from the session
// pool's idle list, but with an extra ping check.
func TestTakeFromIdleWriteListChecked(t *testing.T) {
	t.Parallel()
	ctx := context.Background()

	// Make sure maintainer keeps the idle sessions.
	server, client, teardown := setupMockedTestServerWithConfig(t,
		ClientConfig{
			SessionPoolConfig: SessionPoolConfig{
				MaxIdle:                   1,
				HealthCheckInterval:       50 * time.Millisecond,
				healthCheckSampleInterval: 10 * time.Millisecond,
			},
		})
	defer teardown()
	sp := client.idleSessions

	// Stop healthcheck workers to simulate slow pings.
	sp.hc.close()

	// Create a session and recycle it.
	sh, err := sp.takeWriteSession(ctx)
	if err != nil {
		t.Fatalf("failed to get session: %v", err)
	}
	wantSid := sh.getID()
	// Set the next check in the past to ensure the next take() call will
	// trigger a health check.
	sh.session.nextCheck = time.Now().Add(-time.Minute)
	sh.recycle()

	// Two back-to-back session requests, both of them should return the same
	// session created before and only the first of them should trigger a
	// session ping.
	for i := 0; i < 2; i++ {
		// Take the session from the idle list and recycle it.
		sh, err = sp.takeWriteSession(ctx)
		if err != nil {
			t.Fatalf("%v - failed to get session: %v", i, err)
		}
		if gotSid := sh.getID(); gotSid != wantSid {
			t.Fatalf("%v - got session id: %v, want %v", i, gotSid, wantSid)
		}
		// The two back-to-back session requests shouldn't trigger any session
		// pings because sessionPool.Take reschedules the next healthcheck.
		if got, want := server.TestSpanner.DumpPings(), ([]string{wantSid}); !testEqual(got, want) {
			t.Fatalf("%v - got ping session requests: %v, want %v", i, got, want)
		}
		sh.recycle()
	}

	// Inject a session error into the mock client and take the session from
	// the session pool. The old session should be destroyed and the session
	// pool will create a new session.
	server.TestSpanner.PutExecutionTime(MethodExecuteSql,
		SimulatedExecutionTime{
			Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")},
		})

	// Force a ping by setting the check time in the past.
	s := sp.idleList.Front().Value.(*session)
	s.nextCheck = time.Now().Add(-time.Minute)

	sh, err = sp.takeWriteSession(ctx)
	if err != nil {
		t.Fatalf("failed to get session: %v", err)
	}
	ds := server.TestSpanner.DumpSessions()
	if g, w := uint64(len(ds)), sp.incStep-1; g != w {
		t.Fatalf("number of sessions from mock server mismatch\nGot: %v\nWant: %v\n", g, w)
	}
	if sh.getID() == wantSid {
		t.Fatalf("sessionPool.Take still returns the same session %v, want it to create a new one", wantSid)
	}
}

// TestSessionLeak tests leaking a session and getting the stack of the
// goroutine that leaked it.
func TestSessionLeak(t *testing.T) {
	t.Parallel()
	ctx := context.Background()

	_, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{
		SessionPoolConfig: SessionPoolConfig{
			TrackSessionHandles: true,
			MinOpened:           0,
			MaxOpened:           1,
		},
	})
	defer teardown()

	// Execute a query without calling rowIterator.Stop. This will cause the
	// session not to be returned to the pool.
	single := client.Single()
	iter := single.Query(ctx, NewStatement(SelectFooFromBar))
	for {
		_, err := iter.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			t.Fatalf("Got unexpected error while iterating results: %v\n", err)
		}
	}
	// The session should not have been returned to the pool.
	if g, w := client.idleSessions.idleList.Len(), 0; g != w {
		t.Fatalf("Idle sessions count mismatch\nGot: %d\nWant: %d\n", g, w)
	}
	// The checked out session should contain a stack trace.
	if single.sh.stack == nil {
		t.Fatalf("Missing stacktrace from session handle")
	}
	stack := fmt.Sprintf("%s", single.sh.stack)
	testMethod := "TestSessionLeak"
	if !strings.Contains(stack, testMethod) {
		t.Fatalf("Stacktrace does not contain '%s'\nGot: %s", testMethod, stack)
	}
	// Return the session to the pool.
	iter.Stop()
	// The stack should now have been removed from the session handle.
	if single.sh.stack != nil {
		t.Fatalf("Got unexpected stacktrace in session handle: %s", single.sh.stack)
	}

	// Do another query and hold on to the session.
	single = client.Single()
	iter = single.Query(ctx, NewStatement(SelectFooFromBar))
	for {
		_, err := iter.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			t.Fatalf("Got unexpected error while iterating results: %v\n", err)
		}
	}
	// Try to do another query. This will fail as MaxOpened=1.
	ctxWithTimeout, cancel := context.WithTimeout(ctx, time.Millisecond*10)
	defer cancel()
	single2 := client.Single()
	iter2 := single2.Query(ctxWithTimeout, NewStatement(SelectFooFromBar))
	_, gotErr := iter2.Next()
	wantErr := client.idleSessions.errGetSessionTimeoutWithTrackedSessionHandles(codes.DeadlineExceeded)
	// The error should contain the stacktraces of all the checked out
	// sessions.
	if !testEqual(gotErr, wantErr) {
		t.Fatalf("Error mismatch on iterating result set.\nGot: %v\nWant: %v\n", gotErr, wantErr)
	}
	if !strings.Contains(gotErr.Error(), testMethod) {
		t.Fatalf("Error does not contain '%s'\nGot: %s", testMethod, gotErr.Error())
	}
	// Close iterators to check sessions back into the pool before closing.
	iter2.Stop()
	iter.Stop()
}

// TestMaxOpenedSessions tests the max open sessions constraint.
func TestMaxOpenedSessions(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	_, client, teardown := setupMockedTestServerWithConfig(t,
		ClientConfig{
			SessionPoolConfig: SessionPoolConfig{
				MaxOpened: 1,
			},
		})
	defer teardown()
	sp := client.idleSessions

	sh1, err := sp.take(ctx)
	if err != nil {
		t.Fatalf("cannot take session from session pool: %v", err)
	}

	// Session request will timeout due to the max open sessions constraint.
	ctx2, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
	defer cancel()
	_, gotErr := sp.take(ctx2)
	if wantErr := sp.errGetBasicSessionTimeout(codes.DeadlineExceeded); !testEqual(gotErr, wantErr) {
		t.Fatalf("the second session retrieval returns error %v, want %v", gotErr, wantErr)
	}
	doneWaiting := make(chan struct{})
	go func() {
		// Destroy the first session to allow the next session request to
		// proceed.
		<-doneWaiting
		sh1.destroy()
	}()

	go func() {
		// Wait a short time before destroying the session handle.
		<-time.After(10 * time.Millisecond)
		close(doneWaiting)
	}()
	// Now the session request can be processed because the first session will
	// be destroyed.
	ctx3, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	sh2, err := sp.take(ctx3)
	if err != nil {
		t.Fatalf("after the first session is destroyed, session retrieval still returns error %v, want nil", err)
	}
	if !sh2.session.isValid() || sh2.getID() == "" {
		t.Fatalf("got invalid session: %v", sh2.session)
	}
}

// TestMinOpenedSessions tests the min open sessions constraint.
func TestMinOpenedSessions(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	_, client, teardown := setupMockedTestServerWithConfig(t,
		ClientConfig{
			SessionPoolConfig: SessionPoolConfig{
				MinOpened:                 1,
				healthCheckSampleInterval: time.Millisecond,
			},
		})
	defer teardown()
	sp := client.idleSessions

	// Take ten sessions from the session pool and recycle them.
	var ss []*session
	var shs []*sessionHandle
	for i := 0; i < 10; i++ {
		sh, err := sp.take(ctx)
		if err != nil {
			t.Fatalf("failed to get session(%v): %v", i, err)
		}
		ss = append(ss, sh.session)
		shs = append(shs, sh)
		sh.recycle()
	}
	for _, sh := range shs {
		sh.recycle()
	}

	// Simulate session expiration.
	for _, s := range ss {
		s.destroy(true)
	}

	// Wait until the maintainer has had a chance to replenish the pool.
	for i := 0; i < 10; i++ {
		sp.mu.Lock()
		if sp.numOpened > 0 {
			sp.mu.Unlock()
			break
		}
		sp.mu.Unlock()
		<-time.After(sp.healthCheckSampleInterval)
	}
	sp.mu.Lock()
	defer sp.mu.Unlock()
	// There should still be one session left in either the idle list or in one
	// of the other opened states due to the min open sessions constraint.
	if (sp.idleList.Len() +
		sp.idleWriteList.Len() +
		int(sp.prepareReqs) +
		int(sp.createReqs)) != 1 {
		t.Fatalf(
			"got %v sessions in idle lists, want 1. Opened: %d, read: %d, "+
				"write: %d, in preparation: %d, in creation: %d",
			sp.idleList.Len()+sp.idleWriteList.Len(), sp.numOpened,
			sp.idleList.Len(), sp.idleWriteList.Len(), sp.prepareReqs,
			sp.createReqs)
	}
}

// TestMaxBurst tests the max burst constraint.
func TestMaxBurst(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	server, client, teardown := setupMockedTestServerWithConfig(t,
		ClientConfig{
			SessionPoolConfig: SessionPoolConfig{
				MaxBurst: 1,
			},
		})
	defer teardown()
	sp := client.idleSessions

	// Will cause the session creation RPC to be retried forever.
	server.TestSpanner.PutExecutionTime(MethodBatchCreateSession,
		SimulatedExecutionTime{
			Errors:    []error{status.Errorf(codes.Unavailable, "try later")},
			KeepError: true,
		})

	// This session request will never finish until the injected error is
	// cleared.
	go sp.take(ctx)

	// Poll for the execution of the first session request.
	for {
		sp.mu.Lock()
		cr := sp.createReqs
		sp.mu.Unlock()
		if cr == 0 {
			<-time.After(time.Second)
			continue
		}
		// The first session request is being executed.
		break
	}

	ctx2, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	_, gotErr := sp.take(ctx2)

	// Since MaxBurst == 1, the second session request should block.
	if wantErr := sp.errGetBasicSessionTimeout(codes.DeadlineExceeded); !testEqual(gotErr, wantErr) {
		t.Fatalf("session retrieval returns error %v, want %v", gotErr, wantErr)
	}

	// Let the first session request succeed.
	server.TestSpanner.Freeze()
	server.TestSpanner.PutExecutionTime(MethodBatchCreateSession, SimulatedExecutionTime{})
	server.TestSpanner.Unfreeze()

	// Now the new session request can proceed because the first session
	// request will eventually succeed.
	sh, err := sp.take(ctx)
	if err != nil {
		t.Fatalf("session retrieval returns error %v, want nil", err)
	}
	if !sh.session.isValid() || sh.getID() == "" {
		t.Fatalf("got invalid session: %v", sh.session)
	}
}

// TestSessionRecycle tests recycling sessions.
func TestSessionRecycle(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	// Set MaxBurst=MinOpened to prevent additional sessions from being created
	// while session pool initialization is still running.
	_, client, teardown := setupMockedTestServerWithConfig(t,
		ClientConfig{
			SessionPoolConfig: SessionPoolConfig{
				MinOpened: 1,
				MaxIdle:   5,
				MaxBurst:  1,
			},
		})
	defer teardown()
	sp := client.idleSessions

	// Test that a session is correctly recycled and reused.
	for i := 0; i < 20; i++ {
		s, err := sp.take(ctx)
		if err != nil {
			t.Fatalf("cannot get the session %v: %v", i, err)
		}
		s.recycle()
	}

	sp.mu.Lock()
	defer sp.mu.Unlock()
	// The session pool should only contain 1 session. There has never been
	// more than one session in use at any time, so there is no need for the
	// session pool to create a second session. The session has also been in
	// use all the time, so there is also no reason for the session pool to
	// delete it.
	if sp.numOpened != 1 {
		t.Fatalf("Expect session pool size 1, got %d", sp.numOpened)
	}
}

// TestSessionDestroy tests destroying sessions.
func TestSessionDestroy(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	_, client, teardown := setupMockedTestServerWithConfig(t,
		ClientConfig{
			SessionPoolConfig: SessionPoolConfig{
				MinOpened: 1,
				MaxBurst:  1,
			},
		})
	defer teardown()
	sp := client.idleSessions

	// Creating a session pool with MinOpened=1 will automatically start the
	// creation of 1 session when the session pool is created. As MaxBurst=1,
	// the session pool will never create more than 1 session at a time, so the
	// take() method will wait if the initial session has not yet been created.
	sh, err := sp.take(ctx)
	if err != nil {
		t.Fatalf("cannot get session from session pool: %v", err)
	}
	s := sh.session
	sh.recycle()
	if d := s.destroy(true); d || !s.isValid() {
		// The session should remain alive because of the min open sessions
		// constraint.
		t.Fatalf("session %s invalid, want it to stay alive. (destroy in expiration mode, success: %v)", s.id, d)
	}
	if d := s.destroy(false); !d || s.isValid() {
		// The session should be destroyed.
		t.Fatalf("failed to destroy session %s. (destroy in default mode, success: %v)", s.id, d)
	}
}

// TestHcHeap tests heap operations on hcHeap.
func TestHcHeap(t *testing.T) {
	in := []*session{
		{nextCheck: time.Unix(10, 0)},
		{nextCheck: time.Unix(0, 5)},
		{nextCheck: time.Unix(1, 8)},
		{nextCheck: time.Unix(11, 7)},
		{nextCheck: time.Unix(6, 3)},
	}
	want := []*session{
		{nextCheck: time.Unix(1, 8), hcIndex: 0},
		{nextCheck: time.Unix(6, 3), hcIndex: 1},
		{nextCheck: time.Unix(8, 2), hcIndex: 2},
		{nextCheck: time.Unix(10, 0), hcIndex: 3},
		{nextCheck: time.Unix(11, 7), hcIndex: 4},
	}
	hh := hcHeap{}
	for _, s := range in {
		heap.Push(&hh, s)
	}
	// Change the top of the heap and do an adjustment.
	hh.sessions[0].nextCheck = time.Unix(8, 2)
	heap.Fix(&hh, 0)
	for idx := 0; hh.Len() > 0; idx++ {
		got := heap.Pop(&hh).(*session)
		want[idx].hcIndex = -1
		if !testEqual(got, want[idx]) {
			t.Fatalf("%v: heap.Pop returns %v, want %v", idx, got, want[idx])
		}
	}
}

// TestHealthCheckScheduler tests whether healthcheck workers can schedule and
// perform healthchecks properly.
func TestHealthCheckScheduler(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	server, client, teardown := setupMockedTestServerWithConfig(t,
		ClientConfig{
			SessionPoolConfig: SessionPoolConfig{
				HealthCheckInterval:       50 * time.Millisecond,
				healthCheckSampleInterval: 10 * time.Millisecond,
			},
		})
	defer teardown()
	sp := client.idleSessions

	// Create 50 sessions.
	for i := 0; i < 50; i++ {
		_, err := sp.take(ctx)
		if err != nil {
			t.Fatalf("cannot get session from session pool: %v", err)
		}
	}

	// Make sure we start with a clean ping history, so that the sessions that
	// were created first have not already exceeded the expected maximum
	// number of pings.
	server.TestSpanner.ClearPings()
	// Wait for 10-30 pings per session.
	waitFor(t, func() error {
		// Only check actually live sessions and ignore any sessions the
		// session pool may have deleted in the meantime.
		liveSessions := server.TestSpanner.DumpSessions()
		dp := server.TestSpanner.DumpPings()
		gotPings := map[string]int64{}
		for _, p := range dp {
			gotPings[p]++
		}
		for s := range liveSessions {
			want := int64(20)
			if got := gotPings[s]; got < want/2 || got > want+want/2 {
				// This is an unacceptable number of pings.
				return fmt.Errorf("got %v healthchecks on session %v, want it between (%v, %v)", got, s, want/2, want+want/2)
			}
		}
		return nil
	})
}

// TestHealthCheck_FirstHealthCheck tests if the first healthcheck scheduling
// works properly.
func TestHealthCheck_FirstHealthCheck(t *testing.T) {
	t.Parallel()
	_, client, teardown := setupMockedTestServerWithConfig(t,
		ClientConfig{
			SessionPoolConfig: SessionPoolConfig{
				MaxOpened:           0,
				MinOpened:           0,
				HealthCheckInterval: 50 * time.Minute,
			},
		})
	defer teardown()
	sp := client.idleSessions

	now := time.Now()
	start := now.Add(time.Duration(float64(sp.hc.interval) * 0.2))
	// A second is added to avoid the edge case.
	end := now.Add(time.Duration(float64(sp.hc.interval)*1.1) + time.Second)

	s := &session{}
	sp.hc.scheduledHCLocked(s)

	if s.nextCheck.Before(start) || s.nextCheck.After(end) {
		t.Fatalf("The first healthcheck schedule is not in the correct range: %v", s.nextCheck)
	}
	if !s.firstHCDone {
		t.Fatal("The flag 'firstHCDone' should be set to true after the first healthcheck.")
	}
}

// TestHealthCheck_NonFirstHealthCheck tests if the scheduling after the first
// health check works properly.
func TestHealthCheck_NonFirstHealthCheck(t *testing.T) {
	t.Parallel()
	_, client, teardown := setupMockedTestServerWithConfig(t,
		ClientConfig{
			SessionPoolConfig: SessionPoolConfig{
				MaxOpened:           0,
				MinOpened:           0,
				HealthCheckInterval: 50 * time.Minute,
			},
		})
	defer teardown()
	sp := client.idleSessions

	now := time.Now()
	start := now.Add(time.Duration(float64(sp.hc.interval) * 0.9))
	// A second is added to avoid the edge case.
	end := now.Add(time.Duration(float64(sp.hc.interval)*1.1) + time.Second)

	s := &session{firstHCDone: true}
	sp.hc.scheduledHCLocked(s)

	if s.nextCheck.Before(start) || s.nextCheck.After(end) {
		t.Fatalf("The non-first healthcheck schedule is not in the correct range: %v", s.nextCheck)
	}
}

// Tests that a fraction of the sessions is prepared for read/write
// transactions by the health checker.
func TestWriteSessionsPrepared(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	_, client, teardown := setupMockedTestServerWithConfig(t,
		ClientConfig{
			SessionPoolConfig: SessionPoolConfig{
				WriteSessions:       0.5,
				MaxIdle:             200,
				HealthCheckInterval: time.Nanosecond,
			},
		})
	defer teardown()
	sp := client.idleSessions

	shs := make([]*sessionHandle, 100)
	var err error
	for i := 0; i < 100; i++ {
		shs[i], err = sp.take(ctx)
		if err != nil {
			t.Fatalf("cannot get session from session pool: %v", err)
		}
	}
	// Now there are 100 sessions in the pool. Release them.
	for _, sh := range shs {
		sh.recycle()
	}

	// Take 50 write sessions. The write sessions will be taken from either the
	// list of prepared sessions (idleWriteList), or they will be prepared
	// during the takeWriteSession method.
	wshs := make([]*sessionHandle, 50)
	for i := 0; i < 50; i++ {
		wshs[i], err = sp.takeWriteSession(ctx)
		if err != nil {
			t.Fatalf("cannot get session from session pool: %v", err)
		}
		if wshs[i].getTransactionID() == nil {
			t.Fatalf("got nil transaction id from session pool")
		}
	}
	// Return the sessions to the pool.
	for _, sh := range wshs {
		sh.recycle()
	}

	// Now force creation of 100 more sessions.
	shs = make([]*sessionHandle, 200)
	for i := 0; i < 200; i++ {
		shs[i], err = sp.take(ctx)
		if err != nil {
			t.Fatalf("cannot get session from session pool: %v", err)
		}
	}

	// Now there are 200 sessions in the pool. Release them.
	for _, sh := range shs {
		sh.recycle()
	}
	// The health checker should eventually prepare 100 of the 200 sessions with
	// a r/w tx.
	waitUntil := time.After(time.Second)
	var numWritePrepared int
waitForPrepared:
	for numWritePrepared < 100 {
		select {
		case <-waitUntil:
			// Stop waiting on timeout; the check below will report the failure.
			break waitForPrepared
		default:
		}
		sp.mu.Lock()
		numWritePrepared = sp.idleWriteList.Len()
		sp.mu.Unlock()
	}

	sp.mu.Lock()
	defer sp.mu.Unlock()
	if sp.idleWriteList.Len() != 100 {
		t.Fatalf("Expect 100 write prepared sessions, got: %d", sp.idleWriteList.Len())
	}
}

// TestTakeFromWriteQueue tests that sessionPool.take() returns write prepared
// sessions as well.
func TestTakeFromWriteQueue(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	_, client, teardown := setupMockedTestServerWithConfig(t,
		ClientConfig{
			SessionPoolConfig: SessionPoolConfig{
				MaxOpened:           1,
				WriteSessions:       1.0,
				MaxIdle:             1,
				HealthCheckInterval: time.Nanosecond,
			},
		})
	defer teardown()
	sp := client.idleSessions

	sh, err := sp.take(ctx)
	if err != nil {
		t.Fatalf("cannot get session from session pool: %v", err)
	}
	sh.recycle()

	// Wait until the health checker has write-prepared the session.
	waitUntil := time.After(time.Second)
	var numWritePrepared int
waitForPrepared:
	for numWritePrepared == 0 {
		select {
		case <-waitUntil:
			// Stop waiting on timeout; the checks below will report the failure.
			break waitForPrepared
		default:
		}
		sp.mu.Lock()
		numWritePrepared = sp.idleWriteList.Len()
		sp.mu.Unlock()
	}

	// The session should now be in the write queue, but take should also
	// return it.
	sp.mu.Lock()
	if sp.idleWriteList.Len() == 0 {
		t.Fatalf("write queue unexpectedly empty")
	}
	if sp.idleList.Len() != 0 {
		t.Fatalf("read queue not empty")
	}
	sp.mu.Unlock()
	sh, err = sp.take(ctx)
	if err != nil {
		t.Fatalf("cannot get session from session pool: %v", err)
	}
	sh.recycle()
}

// The session pool should stop trying to create write-prepared sessions if a
// non-transient error occurs while trying to begin a transaction. The
// process for preparing write sessions should automatically be re-enabled if
// a BeginTransaction call initiated by takeWriteSession succeeds.
//
// The only exception to the above is that a 'Session not found' error should
// cause the session to be removed from the session pool, and it should not
// affect the background process of preparing sessions.
func TestErrorOnPrepareSession(t *testing.T) {
	t.Parallel()

	serverErrors := []error{
		status.Errorf(codes.PermissionDenied, "Caller is missing IAM permission spanner.databases.beginOrRollbackReadWriteTransaction on resource"),
		status.Errorf(codes.NotFound, `Database not found: projects/<project>/instances/<instance>/databases/<database> resource_type: "type.googleapis.com/google.spanner.admin.database.v1.Database" resource_name: "projects/<project>/instances/<instance>/databases/<database>" description: "Database does not exist."`),
		status.Errorf(codes.FailedPrecondition, "Invalid transaction option"),
		status.Errorf(codes.Internal, "Unknown server error"),
	}
	logger := log.New(os.Stderr, "", log.LstdFlags)
	for _, serverErr := range serverErrors {
		ctx := context.Background()
		server, client, teardown := setupMockedTestServerWithConfig(t,
			ClientConfig{
				SessionPoolConfig: SessionPoolConfig{
					MinOpened:           10,
					MaxOpened:           10,
					WriteSessions:       0.5,
					HealthCheckInterval: time.Millisecond,
				},
				logger: logger,
			})
		defer teardown()
		// Discard logging until trying to prepare sessions has stopped.
		logger.SetOutput(ioutil.Discard)
		server.TestSpanner.PutExecutionTime(MethodBeginTransaction, SimulatedExecutionTime{
			Errors:    []error{serverErr},
			KeepError: true,
		})
		sp := client.idleSessions

		// Wait until the health checker has tried to write-prepare a session.
		// This will cause the session pool to log some errors indicating that
		// preparing sessions failed.
		waitUntil := time.After(time.Second)
		var prepareDisabled bool
		var numOpened int
	waitForPrepare:
		for !prepareDisabled || numOpened < 10 {
			select {
			case <-waitUntil:
				break waitForPrepare
			default:
			}
			sp.mu.Lock()
			prepareDisabled = sp.disableBackgroundPrepareSessions
			numOpened = sp.idleList.Len()
			sp.mu.Unlock()
		}
		// Re-enable logging.
		logger.SetOutput(os.Stderr)

		// There should be no write-prepared sessions.
		sp.mu.Lock()
		if sp.idleWriteList.Len() != 0 {
			sp.mu.Unlock()
			t.Fatalf("write queue unexpectedly not empty")
		}
		// All sessions should be in the read idle list.
		if g, w := sp.idleList.Len(), 10; g != w {
			sp.mu.Unlock()
			t.Fatalf("session count mismatch:\nWant: %v\nGot: %v", w, g)
		}
		sp.mu.Unlock()
		// Taking a read session should succeed.
		sh, err := sp.take(ctx)
		if err != nil {
			t.Fatalf("cannot get session from session pool: %v", err)
		}
		sh.recycle()
		// Taking a write session should fail with the server error.
		_, err = sp.takeWriteSession(ctx)
		if ErrCode(err) != ErrCode(serverErr) {
			t.Fatalf("take write session failed with unexpected error.\nGot: %v\nWant: %v\n", err, serverErr)
		}

		// Clearing the error on the server should allow us to take a write
		// session.
		server.TestSpanner.PutExecutionTime(MethodBeginTransaction, SimulatedExecutionTime{})
		sh, err = sp.takeWriteSession(ctx)
		if err != nil {
			t.Fatalf("cannot get write session from session pool: %v", err)
		}
		sh.recycle()
		// The maintainer should also pick this up and prepare 50% of the sessions.
		waitUntil = time.After(time.Second)
		var numPrepared int
	waitForPrepared:
		for numPrepared < 5 {
			select {
			case <-waitUntil:
				// Stop waiting on timeout; the check below will report the failure.
				break waitForPrepared
			default:
			}
			sp.mu.Lock()
			numPrepared = sp.idleWriteList.Len()
			sp.mu.Unlock()
		}
		sp.mu.Lock()
		if g, w := sp.idleWriteList.Len(), 5; g != w {
			sp.mu.Unlock()
			t.Fatalf("write session count mismatch:\nWant: %v\nGot: %v", w, g)
		}
		sp.mu.Unlock()
	}
}

// The session pool should continue to try to create write-prepared sessions if
// a 'Session not found' error occurs. The session that has been deleted by the
// backend should be removed from the pool, and the maintainer should create a
// new session if this causes the number of sessions in the pool to fall below
// MinOpened.
func TestSessionNotFoundOnPrepareSession(t *testing.T) {
	t.Parallel()

	// The server will return 'Session not found' for the first 8
	// BeginTransaction calls.
	sessionNotFoundErr := newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")
	serverErrors := make([]error, 8)
	for i := range serverErrors {
		serverErrors[i] = sessionNotFoundErr
	}
	ctx := context.Background()
	logger := log.New(os.Stderr, "", log.LstdFlags)
	server, client, teardown := setupMockedTestServerWithConfig(t,
		ClientConfig{
			SessionPoolConfig: SessionPoolConfig{
				MinOpened:                 10,
				MaxOpened:                 10,
				WriteSessions:             0.5,
				HealthCheckInterval:       time.Millisecond,
				healthCheckSampleInterval: time.Millisecond,
			},
			logger: logger,
		})
	defer teardown()
	// Discard logging until trying to prepare sessions has stopped.
	logger.SetOutput(ioutil.Discard)
	server.TestSpanner.PutExecutionTime(MethodBeginTransaction, SimulatedExecutionTime{
		Errors: serverErrors,
	})
	sp := client.idleSessions

	// Wait until the health checker has tried to write-prepare the sessions.
	waitUntil := time.After(5 * time.Second)
	var numWriteSessions int
	var numReadSessions int
waitForPrepare:
	for (numWriteSessions+numReadSessions) < 10 || numWriteSessions < 5 {
		select {
		case <-waitUntil:
			break waitForPrepare
		default:
		}
		sp.mu.Lock()
		numReadSessions = sp.idleList.Len()
		numWriteSessions = sp.idleWriteList.Len()
		sp.mu.Unlock()
	}
	// Re-enable logging.
	logger.SetOutput(os.Stderr)

	// There should be at least 5 write-prepared sessions.
	sp.mu.Lock()
	if g, w := sp.idleWriteList.Len(), 5; g < w {
		sp.mu.Unlock()
		t.Fatalf("write-prepared session count mismatch.\nWant at least: %v\nGot: %v", w, g)
	}
	// The other sessions should be in the read idle list.
	if g, w := sp.idleList.Len()+sp.idleWriteList.Len(), 10; g != w {
		sp.mu.Unlock()
		t.Fatalf("total session count mismatch:\nWant: %v\nGot: %v", w, g)
	}
	sp.mu.Unlock()
	// Taking a read session should succeed.
	sh, err := sp.take(ctx)
	if err != nil {
		t.Fatalf("cannot get session from session pool: %v", err)
	}
	sh.recycle()
	// Taking a write session should succeed.
	sh, err = sp.takeWriteSession(ctx)
	if err != nil {
		t.Fatalf("take write session failed with unexpected error.\nGot: %v\nWant: %v\n", err, nil)
	}
	sh.recycle()
}

// TestSessionHealthCheck tests healthchecking cases.
func TestSessionHealthCheck(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	server, client, teardown := setupMockedTestServerWithConfig(t,
		ClientConfig{
			SessionPoolConfig: SessionPoolConfig{
				HealthCheckInterval:       time.Nanosecond,
				healthCheckSampleInterval: 10 * time.Millisecond,
				incStep:                   1,
			},
		})
	defer teardown()
	sp := client.idleSessions

	// Test pinging sessions.
	sh, err := sp.take(ctx)
	if err != nil {
		t.Fatalf("cannot get session from session pool: %v", err)
	}

	// Wait for healthchecker to send pings to session.
	waitFor(t, func() error {
		pings := server.TestSpanner.DumpPings()
		if len(pings) == 0 || pings[0] != sh.getID() {
			return fmt.Errorf("healthchecker didn't send any ping to session %v", sh.getID())
		}
		return nil
	})
	// Test broken session detection.
	sh, err = sp.take(ctx)
	if err != nil {
		t.Fatalf("cannot get session from session pool: %v", err)
	}

	server.TestSpanner.Freeze()
	server.TestSpanner.PutExecutionTime(MethodExecuteSql,
		SimulatedExecutionTime{
			Errors:    []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")},
			KeepError: true,
		})
	server.TestSpanner.Unfreeze()

	s := sh.session
	waitFor(t, func() error {
		if sh.session.isValid() {
			return fmt.Errorf("session(%v) is still alive, want it to be dropped by healthcheck workers", s)
		}
		return nil
	})

	server.TestSpanner.Freeze()
	server.TestSpanner.PutExecutionTime(MethodExecuteSql, SimulatedExecutionTime{})
	server.TestSpanner.Unfreeze()

	// Test garbage collection.
	sh, err = sp.take(ctx)
	if err != nil {
		t.Fatalf("cannot get session from session pool: %v", err)
	}
	sp.close()
	if sh.session.isValid() {
		t.Fatalf("session(%v) is still alive, want it to be garbage collected", s)
	}
}

// TestStressSessionPool does a stress test on the session pool with the
// following concurrent operations:
//	1) Test worker gets a session from the pool.
//	2) Test worker turns a session back into the pool.
//	3) Test worker destroys a session it got from the pool.
//	4) Healthcheck destroys a broken session (because a worker has already destroyed it).
//	5) Test worker closes the session pool.
//
// During the test, the session pool maintainer maintains the number of sessions,
// and it is expected that all sessions that are taken from the session pool
// remain valid. When all test workers and healthcheck workers exit, the mock
// client, session pool and healthchecker should be in a consistent state.
func TestStressSessionPool(t *testing.T) {
	t.Parallel()

	// Use concurrent workers to test session pools built from different
	// configurations.
	for ti, cfg := range []SessionPoolConfig{
		{},
		{MinOpened: 10, MaxOpened: 100},
		{MaxBurst: 50},
		{MinOpened: 10, MaxOpened: 200, MaxBurst: 5},
		{MinOpened: 10, MaxOpened: 200, MaxBurst: 5, WriteSessions: 0.2},
	} {
		// Create a more aggressive session healthchecker to increase test concurrency.
		cfg.HealthCheckInterval = 50 * time.Millisecond
		cfg.healthCheckSampleInterval = 10 * time.Millisecond
		cfg.HealthCheckWorkers = 50

		server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{
			SessionPoolConfig: cfg,
		})
		sp := client.idleSessions

		// Create a test group for this configuration and schedule 100 sub
		// tests within the group.
		t.Run(fmt.Sprintf("TestStressSessionPoolGroup%v", ti), func(t *testing.T) {
			for i := 0; i < 100; i++ {
				idx := i
				t.Logf("TestStressSessionPoolWithCfg%dWorker%03d", ti, idx)
				testStressSessionPool(t, cfg, ti, idx, sp, client)
			}
		})
		sp.hc.close()
		// Here the states of healthchecker, session pool and mock client are
		// stable.
		sp.mu.Lock()
		idleSessions := map[string]bool{}
		hcSessions := map[string]bool{}
		mockSessions := server.TestSpanner.DumpSessions()
		// Dump the session pool's idle list.
		for sl := sp.idleList.Front(); sl != nil; sl = sl.Next() {
			s := sl.Value.(*session)
			if idleSessions[s.getID()] {
				t.Fatalf("%v: found duplicated session in idle list: %v", ti, s.getID())
			}
			idleSessions[s.getID()] = true
		}
		for sl := sp.idleWriteList.Front(); sl != nil; sl = sl.Next() {
			s := sl.Value.(*session)
			if idleSessions[s.getID()] {
				t.Fatalf("%v: found duplicated session in idle write list: %v", ti, s.getID())
			}
			idleSessions[s.getID()] = true
		}
		if int(sp.numOpened) != len(idleSessions) {
			t.Fatalf("%v: number of opened sessions (%v) != number of idle sessions (%v)", ti, sp.numOpened, len(idleSessions))
		}
		if sp.createReqs != 0 {
			t.Fatalf("%v: number of pending session creations = %v, want 0", ti, sp.createReqs)
		}
		// Dump the healthcheck queue.
		sp.hc.mu.Lock()
		for _, s := range sp.hc.queue.sessions {
			if hcSessions[s.getID()] {
				t.Fatalf("%v: found duplicated session in healthcheck queue: %v", ti, s.getID())
			}
			hcSessions[s.getID()] = true
		}
		sp.mu.Unlock()
		sp.hc.mu.Unlock()

		// Verify that idleSessions == hcSessions == mockSessions.
		if !testEqual(idleSessions, hcSessions) {
			t.Fatalf("%v: sessions in idle list (%v) != sessions in healthcheck queue (%v)", ti, idleSessions, hcSessions)
		}
		// The server may contain more sessions than the health check queue.
		// This can be caused by a timeout client side during a CreateSession
		// request. The request may still be received and executed by the
		// server, but the session pool will not register the session.
		for id, b := range hcSessions {
			if b && !mockSessions[id] {
				t.Fatalf("%v: session in healthcheck queue (%v) was not found on server", ti, id)
			}
		}
		sp.close()
		mockSessions = server.TestSpanner.DumpSessions()
		for id, b := range hcSessions {
			if b && mockSessions[id] {
				// We only log a warning for this, as it sometimes happens.
				// The exact reason for it is unknown, but in a real life
				// situation the session would be garbage collected by the
				// server after 60 minutes.
				t.Logf("Found session from pool still live on server: %v", id)
			}
		}
		teardown()
	}
}

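// testStressSessionPool is the worker body for TestStressSessionPool. Each
// worker iterates many times, takes a session (a write session ~20% of the
// time), verifies its validity, and then either destroys or recycles it.
// Workers with an index divisible by 10 also close the pool towards the end
// of their run to exercise the pool-closed error paths.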
func testStressSessionPool(t *testing.T, cfg SessionPoolConfig, ti int, idx int, pool *sessionPool, client *Client) {
	ctx := context.Background()
	// Each test worker iterates 1K times and tries different
	// session / session pool operations.
	for j := 0; j < 1000; j++ {
		if idx%10 == 0 && j >= 900 {
			// Close the pool in a selected set of workers during the
			// middle of the test.
			pool.close()
		}
		// Take a write session ~20% of the time.
		takeWrite := rand.Intn(5) == 4
		var (
			sh     *sessionHandle
			gotErr error
		)
		wasValid := pool.isValid()
		if takeWrite {
			sh, gotErr = pool.takeWriteSession(ctx)
		} else {
			sh, gotErr = pool.take(ctx)
		}
		if gotErr != nil {
			if pool.isValid() {
				t.Fatalf("%v.%v: pool.take returns error when pool is still valid: %v", ti, idx, gotErr)
			}
			// If the session pool was closed when we tried to take a session
			// from the pool, then we should have gotten a specific error.
			// If the session pool was closed between the take() and now (or
			// even during a take()) then an error is ok.
			if !wasValid {
				if wantErr := errInvalidSessionPool; gotErr != wantErr {
					t.Fatalf("%v.%v: got error when pool is closed: %v, want %v", ti, idx, gotErr, wantErr)
				}
			}
			continue
		}
		// Verify that the session is valid when the session pool is valid.
		// Note that if the session pool is invalid after sh is taken,
		// then sh might be invalidated by healthcheck workers.
		if (sh.getID() == "" || sh.session == nil || !sh.session.isValid()) && pool.isValid() {
			t.Fatalf("%v.%v.%v: pool.take returns invalid session %v", ti, idx, takeWrite, sh.session)
		}
		if takeWrite && sh.getTransactionID() == nil {
			t.Fatalf("%v.%v: pool.takeWriteSession returns session %v without transaction", ti, idx, sh.session)
		}
		if rand.Intn(100) < idx {
			// Random sleep before destroying/recycling the session,
			// to give the healthcheck worker a chance to step in.
			<-time.After(time.Duration(rand.Int63n(int64(cfg.HealthCheckInterval))))
		}
		if rand.Intn(100) < idx {
			// Destroy the session.
			sh.destroy()
			continue
		}
		// Recycle the session.
		sh.recycle()
	}
}

// TestMaintainer checks that the session pool maintainer maintains the number
// of sessions in the following cases:
//
// 1. On initialization of the session pool, replenish the pool to meet
//    MinOpened or MaxIdle.
// 2. On increased session usage, provision extra MaxIdle sessions.
// 3. After the surge passes, scale down the session pool accordingly.
func TestMaintainer(t *testing.T) {
	t.Parallel()
	ctx := context.Background()

	minOpened := uint64(5)
	maxIdle := uint64(4)
	_, client, teardown := setupMockedTestServerWithConfig(t,
		ClientConfig{
			SessionPoolConfig: SessionPoolConfig{
				MinOpened:                 minOpened,
				MaxIdle:                   maxIdle,
				healthCheckSampleInterval: time.Millisecond,
			},
		})
	defer teardown()
	sp := client.idleSessions

	waitFor(t, func() error {
		sp.mu.Lock()
		defer sp.mu.Unlock()
		if sp.numOpened != 5 {
			return fmt.Errorf("Replenish. Expect %d open, got %d", sp.MinOpened, sp.numOpened)
		}
		return nil
	})

	// To save test time, we are not creating many sessions, because the time
	// to create sessions will have an impact on the decision on sessionsToKeep.
	// We also parallelize the take and recycle process.
	shs := make([]*sessionHandle, 20)
	for i := 0; i < len(shs); i++ {
		var err error
		shs[i], err = sp.take(ctx)
		if err != nil {
			t.Fatalf("cannot get session from session pool: %v", err)
		}
	}
	sp.mu.Lock()
	g, w := sp.numOpened, sp.MinOpened+sp.incStep
	sp.mu.Unlock()
	if g != w {
		t.Fatalf("numOpened sessions mismatch\nGot: %d\nWant: %d", g, w)
	}

	// Return 14 sessions to the pool. There are still 6 sessions checked out.
	for _, sh := range shs[:14] {
		sh.recycle()
	}

	// The pool should scale down to sessionsInUse + MaxIdle = 6 + 4 = 10.
	waitFor(t, func() error {
		sp.mu.Lock()
		defer sp.mu.Unlock()
		if sp.numOpened != 10 {
			return fmt.Errorf("Keep extra MaxIdle sessions. Expect %d open, got %d", 10, sp.numOpened)
		}
		return nil
	})

	// Return the remaining 6 sessions.
	// The pool should now scale down to minOpened.
	for _, sh := range shs[14:] {
		sh.recycle()
	}
	waitFor(t, func() error {
		sp.mu.Lock()
		defer sp.mu.Unlock()
		if sp.numOpened != minOpened {
			return fmt.Errorf("Scale down. Expect %d open, got %d", minOpened, sp.numOpened)
		}
		return nil
	})
}
1613
1614// Tests that the session pool creates up to MinOpened connections.
1615//
1616// Historical context: This test also checks that a low
1617// healthCheckSampleInterval does not prevent it from opening connections.
1618// The low healthCheckSampleInterval will however sometimes cause session
1619// creations to time out. That should not be considered a problem, but it
1620// could cause the test case to fail if it happens too often.
1621// See: https://github.com/googleapis/google-cloud-go/issues/1259
func TestInit_CreatesSessions(t *testing.T) {
	t.Parallel()
	spc := SessionPoolConfig{
		MinOpened:                 10,
		MaxIdle:                   10,
		WriteSessions:             0.0,
		healthCheckSampleInterval: 20 * time.Millisecond,
	}
	server, client, teardown := setupMockedTestServerWithConfig(t,
		ClientConfig{
			SessionPoolConfig: spc,
			NumChannels:       4,
		})
	defer teardown()
	sp := client.idleSessions

	timeout := time.After(4 * time.Second)
	var numOpened int
loop:
	for {
		select {
		case <-timeout:
			t.Fatalf("timed out, got %d session(s), want %d", numOpened, spc.MinOpened)
		default:
			sp.mu.Lock()
			numOpened = sp.idleList.Len() + sp.idleWriteList.Len()
			sp.mu.Unlock()
			if numOpened == int(spc.MinOpened) {
				break loop
			}
		}
	}
	// The initial sessions should have been created with one batch request
	// per channel.
	_, err := shouldHaveReceived(server.TestSpanner, []interface{}{
		&sppb.BatchCreateSessionsRequest{},
		&sppb.BatchCreateSessionsRequest{},
		&sppb.BatchCreateSessionsRequest{},
		&sppb.BatchCreateSessionsRequest{},
	})
	if err != nil {
		t.Fatal(err)
	}
}
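
// illustrativeSessionBatchSizes is a hypothetical helper that sketches one
// plausible way the initial MinOpened sessions could be split into one
// BatchCreateSessions call per channel, which is what the assertion above
// expects (four batch requests for four channels). The actual client may
// distribute the sessions differently; this is only an illustration. For
// MinOpened=10 and numChannels=4 it yields batch sizes 3, 3, 2 and 2.
func illustrativeSessionBatchSizes(minOpened, numChannels int) []int {
	sizes := make([]int, numChannels)
	for i := range sizes {
		// Give each channel an equal share and spread the remainder over the
		// first channels.
		sizes[i] = minOpened / numChannels
		if i < minOpened%numChannels {
			sizes[i]++
		}
	}
	return sizes
}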

// Tests that a session pool with MinOpened>0 also prepares
// WriteSessions * MinOpened write sessions.
func TestInit_PreparesSessions(t *testing.T) {
	t.Parallel()
	spc := SessionPoolConfig{
		MinOpened:                 10,
		MaxIdle:                   10,
		WriteSessions:             0.5,
		healthCheckSampleInterval: 20 * time.Millisecond,
	}
	_, client, teardown := setupMockedTestServerWithConfig(t,
		ClientConfig{
			SessionPoolConfig: spc,
		})
	defer teardown()
	sp := client.idleSessions

	timeoutAmt := 4 * time.Second
	timeout := time.After(timeoutAmt)
	var numPrepared int
	want := int(spc.WriteSessions * float64(spc.MinOpened))
loop:
	for {
		select {
		case <-timeout:
			t.Fatalf("timed out after %v, got %d write-prepared session(s), want %d", timeoutAmt, numPrepared, want)
		default:
			sp.mu.Lock()
			numPrepared = sp.idleWriteList.Len()
			sp.mu.Unlock()
			if numPrepared == want {
				break loop
			}
		}
	}
}

// Equal reports whether s1 and s2 have the same session state.
func (s1 *session) Equal(s2 *session) bool {
	return s1.client == s2.client &&
		s1.id == s2.id &&
		s1.pool == s2.pool &&
		s1.createTime == s2.createTime &&
		s1.valid == s2.valid &&
		s1.hcIndex == s2.hcIndex &&
		s1.idleList == s2.idleList &&
		s1.nextCheck.Equal(s2.nextCheck) &&
		s1.checkingHealth == s2.checkingHealth &&
		testEqual(s1.md, s2.md) &&
		bytes.Equal(s1.tx, s2.tx)
}

// waitFor repeatedly calls assert until it succeeds, and fails the test if it
// still returns an error after the timeout has expired.
func waitFor(t *testing.T, assert func() error) {
	t.Helper()
	timeout := 15 * time.Second
	ta := time.After(timeout)

	for {
		select {
		case <-ta:
			if err := assert(); err != nil {
				t.Fatalf("after %v waiting, got %v", timeout, err)
			}
			return
		default:
		}

		if err := assert(); err != nil {
			// The assertion failed; pause briefly and retry.
			time.Sleep(10 * time.Millisecond)
			continue
		}

		return
	}
}

// Tests that the maintainer only deletes sessions after a full maintenance
// window of 10 cycles has finished.
func TestMaintainer_DeletesSessions(t *testing.T) {
	t.Parallel()

	ctx := context.Background()
	const sampleInterval = time.Millisecond * 10
	_, client, teardown := setupMockedTestServerWithConfig(t,
		ClientConfig{
			SessionPoolConfig: SessionPoolConfig{healthCheckSampleInterval: sampleInterval},
		})
	defer teardown()
	sp := client.idleSessions

	// Take two sessions from the pool.
	// This will cause max sessions in use to be 2 during this window.
	sh1 := takeSession(ctx, t, sp)
	sh2 := takeSession(ctx, t, sp)
	wantSessions := map[string]bool{}
	wantSessions[sh1.getID()] = true
	wantSessions[sh2.getID()] = true
	// Return the sessions to the pool and then verify that they
	// are not deleted while still within the maintenance window.
	sh1.recycle()
	sh2.recycle()
	// Wait for 20 milliseconds, i.e. approx 2 iterations of the
	// maintainer. The sessions should still be in the pool.
	<-time.After(sampleInterval * 2)
	sh3 := takeSession(ctx, t, sp)
	sh4 := takeSession(ctx, t, sp)
	// Check that the sessions returned now are the same sessions that we got
	// from the session pool the first time.
	gotSessions := map[string]bool{}
	gotSessions[sh3.getID()] = true
	gotSessions[sh4.getID()] = true
	if !testEqual(wantSessions, gotSessions) {
		t.Fatalf("sessions mismatch\nGot: %v\nWant: %v", gotSessions, wantSessions)
	}
	// Return the sessions to the pool.
	sh3.recycle()
	sh4.recycle()

	// Now wait for the maintenance window to finish. This will cause the
	// maintainer to enter a new window and reset the max number of sessions in
	// use to the current number of checked out sessions. That is 0, as all
	// sessions have been returned to the pool. That again will cause the
	// maintainer to delete these sessions at the next iteration, unless we
	// check out new sessions during the first iteration.
	waitFor(t, func() error {
		sp.mu.Lock()
		defer sp.mu.Unlock()
		if sp.numOpened > 0 {
			return fmt.Errorf("session pool still contains more than 0 sessions")
		}
		return nil
	})
	sh5 := takeSession(ctx, t, sp)
	sh6 := takeSession(ctx, t, sp)
	// Verify that these are new sessions.
	if gotSessions[sh5.getID()] || gotSessions[sh6.getID()] {
		t.Fatal("got unexpected existing session from pool")
	}
}

// takeSession takes a session from the given pool and fails the test if no
// session can be taken.
func takeSession(ctx context.Context, t *testing.T, sp *sessionPool) *sessionHandle {
	t.Helper()
	sh, err := sp.take(ctx)
	if err != nil {
		t.Fatalf("cannot get session from session pool: %v", err)
	}
	return sh
}

func TestMaintenanceWindow_CycleAndUpdateMaxCheckedOut(t *testing.T) {
	t.Parallel()

	maxOpened := uint64(1000)
	mw := newMaintenanceWindow(maxOpened)
	for _, m := range mw.maxSessionsCheckedOut {
		if m < maxOpened {
			t.Fatalf("Max sessions checked out mismatch.\nGot: %v\nWant: %v", m, maxOpened)
		}
	}
	// Do one cycle and simulate that there are currently no sessions checked
	// out of the pool.
	mw.startNewCycle(0)
	if g, w := mw.maxSessionsCheckedOut[0], uint64(0); g != w {
		t.Fatalf("Max sessions checked out mismatch.\nGot: %d\nWant: %d", g, w)
	}
	for _, m := range mw.maxSessionsCheckedOut[1:] {
		if m < maxOpened {
			t.Fatalf("Max sessions checked out mismatch.\nGot: %v\nWant: %v", m, maxOpened)
		}
	}
	// Check that the max checked out during the entire window is still
	// maxOpened.
	if g, w := mw.maxSessionsCheckedOutDuringWindow(), maxOpened; g != w {
		t.Fatalf("Max sessions checked out during window mismatch.\nGot: %d\nWant: %d", g, w)
	}
	// Update the max number checked out for the current cycle.
	mw.updateMaxSessionsCheckedOutDuringWindow(uint64(10))
	if g, w := mw.maxSessionsCheckedOut[0], uint64(10); g != w {
		t.Fatalf("Max sessions checked out mismatch.\nGot: %d\nWant: %d", g, w)
	}
	// The max of the entire window should still not change.
	if g, w := mw.maxSessionsCheckedOutDuringWindow(), maxOpened; g != w {
		t.Fatalf("Max sessions checked out during window mismatch.\nGot: %d\nWant: %d", g, w)
	}
	// Now pass enough cycles to complete a maintenance window. Each new cycle
	// has no sessions checked out. We start at 1, as we have already passed
	// one cycle. The cycle with maxSessionsCheckedOut=10 should then be the
	// oldest cycle still in the maintenance window, and the only one with a
	// value greater than 0.
	for i := 1; i < maintenanceWindowSize; i++ {
		mw.startNewCycle(0)
	}
	for _, m := range mw.maxSessionsCheckedOut[:maintenanceWindowSize-1] {
		if m != 0 {
			t.Fatalf("Max sessions checked out mismatch.\nGot: %v\nWant: %v", m, 0)
		}
	}
	// The oldest cycle in the window should have max=10.
	if g, w := mw.maxSessionsCheckedOut[maintenanceWindowSize-1], uint64(10); g != w {
		t.Fatalf("Max sessions checked out mismatch.\nGot: %d\nWant: %d", g, w)
	}
	// The max of the entire window should now be 10.
	if g, w := mw.maxSessionsCheckedOutDuringWindow(), uint64(10); g != w {
		t.Fatalf("Max sessions checked out during window mismatch.\nGot: %d\nWant: %d", g, w)
	}
	// Do another cycle with max=0.
	mw.startNewCycle(0)
	// The max of the entire window should now be 0.
	if g, w := mw.maxSessionsCheckedOutDuringWindow(), uint64(0); g != w {
		t.Fatalf("Max sessions checked out during window mismatch.\nGot: %d\nWant: %d", g, w)
	}
	// Do another cycle with 5 sessions as max. This should now be the new
	// window max.
	mw.startNewCycle(5)
	if g, w := mw.maxSessionsCheckedOutDuringWindow(), uint64(5); g != w {
		t.Fatalf("Max sessions checked out during window mismatch.\nGot: %d\nWant: %d", g, w)
	}
	// Do a couple of cycles so that the only non-zero value is in the middle.
	// The max for the entire window should still be 5.
	for i := 0; i < maintenanceWindowSize/2; i++ {
		mw.startNewCycle(0)
	}
	if g, w := mw.maxSessionsCheckedOutDuringWindow(), uint64(5); g != w {
		t.Fatalf("Max sessions checked out during window mismatch.\nGot: %d\nWant: %d", g, w)
	}
}
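
// The two hypothetical helpers below sketch the maintenance window semantics
// that TestMaintenanceWindow_CycleAndUpdateMaxCheckedOut exercises: the window
// retains the max number of checked-out sessions for the last
// maintenanceWindowSize cycles, starting a new cycle shifts a new value in and
// drops the oldest one, and the window max is the maximum over all retained
// cycles. They are illustrations of the expected behavior, not the library's
// implementation.
func illustrativeStartNewCycle(cycles []uint64, currentlyCheckedOut uint64) []uint64 {
	// Push the value for the new cycle to the front and drop the oldest cycle.
	return append([]uint64{currentlyCheckedOut}, cycles[:len(cycles)-1]...)
}

func illustrativeWindowMax(cycles []uint64) uint64 {
	var highest uint64
	for _, m := range cycles {
		if m > highest {
			highest = m
		}
	}
	return highest
}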

func TestSessionCreationIsDistributedOverChannels(t *testing.T) {
	t.Parallel()
	numChannels := 4
	spc := SessionPoolConfig{
		MinOpened:     12,
		WriteSessions: 0.0,
		incStep:       2,
	}
	_, client, teardown := setupMockedTestServerWithConfig(t,
		ClientConfig{
			SessionPoolConfig: spc,
			NumChannels:       numChannels,
		})
	defer teardown()
	sp := client.idleSessions

	waitFor(t, func() error {
		sp.mu.Lock()
		// WriteSessions = 0, so we only have to check for read sessions.
		numOpened := uint64(sp.idleList.Len())
		sp.mu.Unlock()
		if numOpened < spc.MinOpened {
			return fmt.Errorf("not yet initialized")
		}
		return nil
	})

	sessionsPerChannel := getSessionsPerChannel(sp)
	if g, w := len(sessionsPerChannel), numChannels; g != w {
		t.Errorf("number of channels mismatch\nGot: %d\nWant: %d", g, w)
	}
	for k, v := range sessionsPerChannel {
		if g, w := v, int(sp.MinOpened)/numChannels; g != w {
			t.Errorf("number of sessions mismatch for %s:\nGot: %d\nWant: %d", k, g, w)
		}
	}
	// Check out MinOpened + incStep * numChannels sessions from the pool. This
	// should cause incStep * numChannels additional sessions to be created.
	checkedOut := make([]*sessionHandle, sp.MinOpened+sp.incStep*uint64(numChannels))
	var err error
	for i := 0; i < cap(checkedOut); i++ {
		checkedOut[i], err = sp.take(context.Background())
		if err != nil {
			t.Fatal(err)
		}
	}
	for i := 0; i < cap(checkedOut); i++ {
		checkedOut[i].recycle()
	}
	// The additional sessions should also be distributed over all available
	// channels.
	sessionsPerChannel = getSessionsPerChannel(sp)
	// There should not be any new clients (channels).
	if g, w := len(sessionsPerChannel), numChannels; g != w {
		t.Errorf("number of channels mismatch\nGot: %d\nWant: %d", g, w)
	}
	for k, v := range sessionsPerChannel {
		if g, w := v, int(sp.MinOpened)/numChannels+int(sp.incStep); g != w {
			t.Errorf("number of sessions mismatch for %s:\nGot: %d\nWant: %d", k, g, w)
		}
	}
}
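
// illustrativeExpectedSessionsPerChannel is a hypothetical helper that spells
// out the per-channel session count the test above asserts, assuming both the
// initial MinOpened sessions and the additional incStep batches are spread
// evenly over the channels (an assumption made for illustration only). With
// MinOpened=12, incStep=2 and 4 channels that is 12/4 = 3 per channel
// initially, and 3+2 = 5 per channel after the extra checkouts.
func illustrativeExpectedSessionsPerChannel(minOpened, incStep uint64, numChannels int, afterExtraCheckouts bool) int {
	expected := int(minOpened) / numChannels
	if afterExtraCheckouts {
		expected += int(incStep)
	}
	return expected
}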

// getSessionsPerChannel groups the idle sessions in the pool by the gRPC
// connection they use and returns the number of sessions per connection.
func getSessionsPerChannel(sp *sessionPool) map[string]int {
	sessionsPerChannel := make(map[string]int)
	sp.mu.Lock()
	defer sp.mu.Unlock()
	for el := sp.idleList.Front(); el != nil; el = el.Next() {
		s, _ := el.Value.(*session)
		// Get the pointer to the actual underlying gRPC ClientConn and use
		// that as the key in the map.
		val := reflect.ValueOf(s.client).Elem()
		connPool := val.FieldByName("connPool").Elem().Elem()
		conn := connPool.Field(0).Pointer()
		key := fmt.Sprintf("%v", conn)
		sessionsPerChannel[key]++
	}
	return sessionsPerChannel
}
