/*
Copyright 2017 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package spanner

import (
	"bytes"
	"container/heap"
	"context"
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"cloud.google.com/go/spanner/internal/testutil"
	sppb "google.golang.org/genproto/googleapis/spanner/v1"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// TestSessionPoolConfigValidation tests session pool config validation.
func TestSessionPoolConfigValidation(t *testing.T) {
	t.Parallel()

	sc := testutil.NewMockCloudSpannerClient(t)
	for _, test := range []struct {
		spc SessionPoolConfig
		err error
	}{
		{
			SessionPoolConfig{},
			errNoRPCGetter(),
		},
		{
			SessionPoolConfig{
				getRPCClient: func() (sppb.SpannerClient, error) {
					return sc, nil
				},
				MinOpened: 10,
				MaxOpened: 5,
			},
			errMinOpenedGTMaxOpened(5, 10),
		},
	} {
		if _, err := newSessionPool("mockdb", test.spc, nil); !testEqual(err, test.err) {
			t.Fatalf("want %v, got %v", test.err, err)
		}
	}
}

// TestSessionCreation tests session creation during sessionPool.Take().
func TestSessionCreation(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	_, sp, mock, cleanup := serverClientMock(t, SessionPoolConfig{})
	defer cleanup()

	// Take three sessions from the session pool; this should trigger the pool
	// to create three new sessions.
	shs := make([]*sessionHandle, 3)
	// gotDs holds the unique sessions taken from the session pool.
	gotDs := map[string]bool{}
	for i := 0; i < len(shs); i++ {
		var err error
		shs[i], err = sp.take(ctx)
		if err != nil {
			t.Fatalf("failed to get session(%v): %v", i, err)
		}
		gotDs[shs[i].getID()] = true
	}
	if len(gotDs) != len(shs) {
		t.Fatalf("session pool created %v sessions, want %v", len(gotDs), len(shs))
	}
	if wantDs := mock.DumpSessions(); !testEqual(gotDs, wantDs) {
		t.Fatalf("session pool created sessions %v, want %v", gotDs, wantDs)
	}
	// Verify that created sessions are recorded correctly in the session pool.
	sp.mu.Lock()
	if int(sp.numOpened) != len(shs) {
		t.Fatalf("session pool reports %v open sessions, want %v", sp.numOpened, len(shs))
	}
	if sp.createReqs != 0 {
		t.Fatalf("session pool reports %v session create requests, want 0", int(sp.createReqs))
	}
	sp.mu.Unlock()
	// Verify that created sessions are tracked correctly by the healthcheck queue.
	hc := sp.hc
	hc.mu.Lock()
	if hc.queue.Len() != len(shs) {
		t.Fatalf("healthcheck queue length = %v, want %v", hc.queue.Len(), len(shs))
	}
	for _, s := range hc.queue.sessions {
		if !gotDs[s.getID()] {
			t.Fatalf("session %v is in healthcheck queue, but it was not created by the session pool", s.getID())
		}
	}
	hc.mu.Unlock()
}

// TestTakeFromIdleList tests taking sessions from the session pool's idle list.
func TestTakeFromIdleList(t *testing.T) {
	t.Parallel()
	ctx := context.Background()

	// Make sure the maintainer keeps the idle sessions.
	_, sp, mock, cleanup := serverClientMock(t, SessionPoolConfig{MaxIdle: 10})
	defer cleanup()

	// Take ten sessions from the session pool and recycle them.
	shs := make([]*sessionHandle, 10)
	for i := 0; i < len(shs); i++ {
		var err error
		shs[i], err = sp.take(ctx)
		if err != nil {
			t.Fatalf("failed to get session(%v): %v", i, err)
		}
	}
	// Make sure the sessions are sampled once before recycling, otherwise they
	// will be cleaned up.
	<-time.After(sp.SessionPoolConfig.healthCheckSampleInterval)
	for i := 0; i < len(shs); i++ {
		shs[i].recycle()
	}
	// Further session requests from the session pool won't cause the mock
	// client to create more sessions.
	wantSessions := mock.DumpSessions()
	// Take ten sessions from the session pool again; this time all sessions
	// should come from the idle list.
	gotSessions := map[string]bool{}
	for i := 0; i < len(shs); i++ {
		sh, err := sp.take(ctx)
		if err != nil {
			t.Fatalf("cannot take session from session pool: %v", err)
		}
		gotSessions[sh.getID()] = true
	}
	if len(gotSessions) != 10 {
		t.Fatalf("got %v unique sessions, want 10", len(gotSessions))
	}
	if !testEqual(gotSessions, wantSessions) {
		t.Fatalf("got sessions: %v, want %v", gotSessions, wantSessions)
	}
}

// TestTakeWriteSessionFromIdleList tests taking write sessions from the
// session pool's idle list.
func TestTakeWriteSessionFromIdleList(t *testing.T) {
	t.Parallel()
	ctx := context.Background()

	// Make sure the maintainer keeps the idle sessions.
	_, sp, mock, cleanup := serverClientMock(t, SessionPoolConfig{MaxIdle: 20})
	defer cleanup()

	// Take ten write sessions from the session pool and recycle them.
	shs := make([]*sessionHandle, 10)
	for i := 0; i < len(shs); i++ {
		var err error
		shs[i], err = sp.takeWriteSession(ctx)
		if err != nil {
			t.Fatalf("failed to get session(%v): %v", i, err)
		}
	}
	// Make sure the sessions are sampled once before recycling, otherwise they
	// will be cleaned up.
	<-time.After(sp.SessionPoolConfig.healthCheckSampleInterval)
	for i := 0; i < len(shs); i++ {
		shs[i].recycle()
	}
	// Further session requests from the session pool won't cause the mock
	// client to create more sessions.
	wantSessions := mock.DumpSessions()
	// Take ten sessions from the session pool again; this time all sessions
	// should come from the idle list.
	gotSessions := map[string]bool{}
	for i := 0; i < len(shs); i++ {
		sh, err := sp.takeWriteSession(ctx)
		if err != nil {
			t.Fatalf("cannot take session from session pool: %v", err)
		}
		gotSessions[sh.getID()] = true
	}
	if len(gotSessions) != 10 {
		t.Fatalf("got %v unique sessions, want 10", len(gotSessions))
	}
	if !testEqual(gotSessions, wantSessions) {
		t.Fatalf("got sessions: %v, want %v", gotSessions, wantSessions)
	}
}

// TestTakeFromIdleListChecked tests taking sessions from the session pool's
// idle list, but with an extra ping check.
func TestTakeFromIdleListChecked(t *testing.T) {
	t.Parallel()
	ctx := context.Background()

	// Make sure the maintainer keeps the idle sessions.
	_, sp, mock, cleanup := serverClientMock(t, SessionPoolConfig{
		MaxIdle:                   1,
		HealthCheckInterval:       50 * time.Millisecond,
		healthCheckSampleInterval: 10 * time.Millisecond,
	})
	defer cleanup()

	// Stop healthcheck workers to simulate slow pings.
	sp.hc.close()

	// Create a session and recycle it.
	sh, err := sp.take(ctx)
	if err != nil {
		t.Fatalf("failed to get session: %v", err)
	}

	// Make sure the session is sampled once before recycling, otherwise it
	// will be cleaned up.
	<-time.After(sp.SessionPoolConfig.healthCheckSampleInterval)
	wantSid := sh.getID()
	sh.recycle()

	// TODO(deklerk): get rid of this
	<-time.After(time.Second)

	// Two back-to-back session requests: both should return the session
	// created before, and neither should trigger a session ping.
	for i := 0; i < 2; i++ {
		// Take the session from the idle list and recycle it.
		sh, err = sp.take(ctx)
		if err != nil {
			t.Fatalf("%v - failed to get session: %v", i, err)
		}
		if gotSid := sh.getID(); gotSid != wantSid {
			t.Fatalf("%v - got session id: %v, want %v", i, gotSid, wantSid)
		}

		// The two back-to-back session requests shouldn't trigger any session
		// pings because sessionPool.Take reschedules the next healthcheck.
		if got, want := mock.DumpPings(), ([]string{wantSid}); !testEqual(got, want) {
			t.Fatalf("%v - got ping session requests: %v, want %v", i, got, want)
		}
		sh.recycle()
	}

	// Inject a session error into the server stub, then take the session from
	// the session pool again: the old session should be destroyed and the
	// session pool should create a new one.
	mock.GetSessionFn = func(c context.Context, r *sppb.GetSessionRequest, opts ...grpc.CallOption) (*sppb.Session, error) {
		mock.MockCloudSpannerClient.ReceivedRequests <- r
		return nil, status.Errorf(codes.NotFound, "Session not found")
	}

	// Delay to trigger sessionPool.Take to ping the session.
	// TODO(deklerk): get rid of this
	<-time.After(time.Second)

	// take will take the idle session. Then it will send a GetSession request
	// to check if it's healthy. It'll discover that it's not healthy
	// (NotFound), drop it, and create a new session.
	sh, err = sp.take(ctx)
	if err != nil {
		t.Fatalf("failed to get session: %v", err)
	}
	ds := mock.DumpSessions()
	if len(ds) != 1 {
		t.Fatalf("dumped sessions from mockclient: %v, want %v", ds, sh.getID())
	}
	if sh.getID() == wantSid {
		t.Fatalf("sessionPool.Take still returns the same session %v, want it to create a new one", wantSid)
	}
}

// TestTakeFromIdleWriteListChecked tests taking sessions from the session
// pool's idle list, but with an extra ping check.
func TestTakeFromIdleWriteListChecked(t *testing.T) {
	t.Parallel()
	ctx := context.Background()

	// Make sure the maintainer keeps the idle sessions.
	_, sp, mock, cleanup := serverClientMock(t, SessionPoolConfig{
		MaxIdle:                   1,
		HealthCheckInterval:       50 * time.Millisecond,
		healthCheckSampleInterval: 10 * time.Millisecond,
	})
	defer cleanup()

	// Stop healthcheck workers to simulate slow pings.
	sp.hc.close()

	// Create a session and recycle it.
	sh, err := sp.takeWriteSession(ctx)
	if err != nil {
		t.Fatalf("failed to get session: %v", err)
	}
	wantSid := sh.getID()

	// Make sure the session is sampled once before recycling, otherwise it
	// will be cleaned up.
	<-time.After(sp.SessionPoolConfig.healthCheckSampleInterval)
	sh.recycle()

	// TODO(deklerk): get rid of this
	<-time.After(time.Second)

	// Two back-to-back session requests: both should return the session
	// created before, and neither should trigger a session ping.
	for i := 0; i < 2; i++ {
		// Take the session from the idle list and recycle it.
		sh, err = sp.takeWriteSession(ctx)
		if err != nil {
			t.Fatalf("%v - failed to get session: %v", i, err)
		}
		if gotSid := sh.getID(); gotSid != wantSid {
			t.Fatalf("%v - got session id: %v, want %v", i, gotSid, wantSid)
		}
		// The two back-to-back session requests shouldn't trigger any session
		// pings because sessionPool.Take reschedules the next healthcheck.
		if got, want := mock.DumpPings(), ([]string{wantSid}); !testEqual(got, want) {
			t.Fatalf("%v - got ping session requests: %v, want %v", i, got, want)
		}
		sh.recycle()
	}

	// Inject a session error into the mock client, then take the session from
	// the session pool again: the old session should be destroyed and the
	// session pool should create a new one.
	mock.GetSessionFn = func(c context.Context, r *sppb.GetSessionRequest, opts ...grpc.CallOption) (*sppb.Session, error) {
		mock.MockCloudSpannerClient.ReceivedRequests <- r
		return nil, status.Errorf(codes.NotFound, "Session not found")
	}

	// Delay to trigger sessionPool.Take to ping the session.
	// TODO(deklerk): get rid of this
	<-time.After(time.Second)

	sh, err = sp.takeWriteSession(ctx)
	if err != nil {
		t.Fatalf("failed to get session: %v", err)
	}
	ds := mock.DumpSessions()
	if len(ds) != 1 {
		t.Fatalf("dumped sessions from mockclient: %v, want %v", ds, sh.getID())
	}
	if sh.getID() == wantSid {
		t.Fatalf("sessionPool.Take still returns the same session %v, want it to create a new one", wantSid)
	}
}

// TestMaxOpenedSessions tests the max open sessions constraint.
func TestMaxOpenedSessions(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	_, sp, _, cleanup := serverClientMock(t, SessionPoolConfig{MaxOpened: 1})
	defer cleanup()

	sh1, err := sp.take(ctx)
	if err != nil {
		t.Fatalf("cannot take session from session pool: %v", err)
	}

	// The session request will time out due to the max open sessions
	// constraint.
	ctx2, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	_, gotErr := sp.take(ctx2)
	if wantErr := errGetSessionTimeout(); !testEqual(gotErr, wantErr) {
		t.Fatalf("the second session retrieval returns error %v, want %v", gotErr, wantErr)
	}

	go func() {
		// TODO(deklerk): remove this
		<-time.After(time.Second)
		// Destroy the first session to allow the next session request to
		// proceed.
		sh1.destroy()
	}()

	// Now the session request can be processed because the first session will
	// be destroyed.
	sh2, err := sp.take(ctx)
	if err != nil {
		t.Fatalf("after the first session is destroyed, session retrieval still returns error %v, want nil", err)
	}
	if !sh2.session.isValid() || sh2.getID() == "" {
		t.Fatalf("got invalid session: %v", sh2.session)
	}
}

// TestMinOpenedSessions tests the min open sessions constraint.
func TestMinOpenedSessions(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	_, sp, _, cleanup := serverClientMock(t, SessionPoolConfig{MinOpened: 1})
	defer cleanup()

	// Take ten sessions from the session pool and recycle them.
	var ss []*session
	var shs []*sessionHandle
	for i := 0; i < 10; i++ {
		sh, err := sp.take(ctx)
		if err != nil {
			t.Fatalf("failed to get session(%v): %v", i, err)
		}
		ss = append(ss, sh.session)
		shs = append(shs, sh)
		sh.recycle()
	}
	for _, sh := range shs {
		sh.recycle()
	}

	// Simulate session expiration.
	for _, s := range ss {
		s.destroy(true)
	}

	sp.mu.Lock()
	defer sp.mu.Unlock()
	// There should still be one session left in the idle list due to the min
	// open sessions constraint.
	if sp.idleList.Len() != 1 {
		t.Fatalf("got %v sessions in idle list, want 1 (numOpened = %d)", sp.idleList.Len(), sp.numOpened)
	}
}

// TestMaxBurst tests the max burst constraint.
func TestMaxBurst(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	_, sp, mock, cleanup := serverClientMock(t, SessionPoolConfig{MaxBurst: 1})
	defer cleanup()

	// Make the session creation RPC fail (and therefore be retried) until
	// allowRequests is closed.
	allowRequests := make(chan struct{})
	mock.CreateSessionFn = func(c context.Context, r *sppb.CreateSessionRequest, opts ...grpc.CallOption) (*sppb.Session, error) {
		select {
		case <-allowRequests:
			return mock.MockCloudSpannerClient.CreateSession(c, r, opts...)
		default:
			mock.MockCloudSpannerClient.ReceivedRequests <- r
			return nil, status.Errorf(codes.Unavailable, "try later")
		}
	}

	// This session request will not finish until the injected error is
	// cleared.
	go sp.take(ctx)

	// Poll for the execution of the first session request.
	for {
		sp.mu.Lock()
		cr := sp.createReqs
		sp.mu.Unlock()
		if cr == 0 {
			<-time.After(time.Second)
			continue
		}
		// The first session request is being executed.
		break
	}

	ctx2, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	_, gotErr := sp.take(ctx2)

	// Since MaxBurst == 1, the second session request should block and time
	// out.
	if wantErr := errGetSessionTimeout(); !testEqual(gotErr, wantErr) {
		t.Fatalf("session retrieval returns error %v, want %v", gotErr, wantErr)
	}

	// Let the first session request succeed.
	close(allowRequests)

	// Now a new session request can proceed because the first session request
	// will eventually succeed.
	sh, err := sp.take(ctx)
	if err != nil {
		t.Fatalf("session retrieval returns error %v, want nil", err)
	}
	if !sh.session.isValid() || sh.getID() == "" {
		t.Fatalf("got invalid session: %v", sh.session)
	}
}

// TestSessionRecycle tests recycling sessions.
func TestSessionRecycle(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	_, sp, _, cleanup := serverClientMock(t, SessionPoolConfig{MinOpened: 1, MaxIdle: 5})
	defer cleanup()

	// Test that a session is correctly recycled and reused.
	for i := 0; i < 20; i++ {
		s, err := sp.take(ctx)
		if err != nil {
			t.Fatalf("cannot get the session %v: %v", i, err)
		}
		s.recycle()
	}

	sp.mu.Lock()
	defer sp.mu.Unlock()
	// Ideally it should only be 1, because the session should be recycled and
	// re-used each time. However, sometimes the pool maintainer might increase
	// the pool size by 1 right around the time we take (which also increases
	// the pool size by 1), so this assertion is OK with either 1 or 2. We
	// expect never to see more than 2, though, even when MaxIdle is quite
	// high: each session should be recycled and re-used.
	if sp.numOpened != 1 && sp.numOpened != 2 {
		t.Fatalf("Expect session pool size 1 or 2, got %d", sp.numOpened)
	}
}

// TODO(deklerk): Investigate why s.destroy(true) is flakey.
// TestSessionDestroy tests destroying sessions.
func TestSessionDestroy(t *testing.T) {
	t.Skip("s.destroy(true) is flakey")
	t.Parallel()
	ctx := context.Background()
	_, sp, _, cleanup := serverClientMock(t, SessionPoolConfig{MinOpened: 1})
	defer cleanup()

	// The maintainer will create one session; wait for it to do so to avoid
	// flakiness in the test.
	<-time.After(10 * time.Millisecond)
	sh, err := sp.take(ctx)
	if err != nil {
		t.Fatalf("cannot get session from session pool: %v", err)
	}
	s := sh.session
	sh.recycle()
	if d := s.destroy(true); d || !s.isValid() {
		// The session should remain alive because of the min open sessions
		// constraint.
		t.Fatalf("session %s invalid, want it to stay alive. (destroy in expiration mode, success: %v)", s.id, d)
	}
	if d := s.destroy(false); !d || s.isValid() {
		// The session should be destroyed.
		t.Fatalf("failed to destroy session %s. (destroy in default mode, success: %v)", s.id, d)
	}
}

// TestHcHeap tests heap operations on top of hcHeap. hcHeap is a min-heap
// keyed on nextCheck, so popping returns sessions in ascending nextCheck
// order, i.e. the session whose next healthcheck is due soonest comes first.
func TestHcHeap(t *testing.T) {
	in := []*session{
		{nextCheck: time.Unix(10, 0)},
		{nextCheck: time.Unix(0, 5)},
		{nextCheck: time.Unix(1, 8)},
		{nextCheck: time.Unix(11, 7)},
		{nextCheck: time.Unix(6, 3)},
	}
	want := []*session{
		{nextCheck: time.Unix(1, 8), hcIndex: 0},
		{nextCheck: time.Unix(6, 3), hcIndex: 1},
		{nextCheck: time.Unix(8, 2), hcIndex: 2},
		{nextCheck: time.Unix(10, 0), hcIndex: 3},
		{nextCheck: time.Unix(11, 7), hcIndex: 4},
	}
	hh := hcHeap{}
	for _, s := range in {
		heap.Push(&hh, s)
	}
	// Change the top of the heap and do an adjustment.
	hh.sessions[0].nextCheck = time.Unix(8, 2)
	heap.Fix(&hh, 0)
	for idx := 0; hh.Len() > 0; idx++ {
		got := heap.Pop(&hh).(*session)
		want[idx].hcIndex = -1
		if !testEqual(got, want[idx]) {
			t.Fatalf("%v: heap.Pop returns %v, want %v", idx, got, want[idx])
		}
	}
}

// TestHealthCheckScheduler tests whether healthcheck workers can schedule and
// perform healthchecks properly.
func TestHealthCheckScheduler(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	_, sp, mock, cleanup := serverClientMock(t, SessionPoolConfig{
		HealthCheckInterval:       50 * time.Millisecond,
		healthCheckSampleInterval: 10 * time.Millisecond,
	})
	defer cleanup()

	// Create 50 sessions.
	ss := []string{}
	for i := 0; i < 50; i++ {
		sh, err := sp.take(ctx)
		if err != nil {
			t.Fatalf("cannot get session from session pool: %v", err)
		}
		ss = append(ss, sh.getID())
	}

	// Wait for 10-30 pings per session.
	waitFor(t, func() error {
		dp := mock.DumpPings()
		gotPings := map[string]int64{}
		for _, p := range dp {
			gotPings[p]++
		}
		for _, s := range ss {
			want := int64(20)
			if got := gotPings[s]; got < want/2 || got > want+want/2 {
				// This is an unacceptable number of pings.
				return fmt.Errorf("got %v healthchecks on session %v, want it between (%v, %v)", got, s, want/2, want+want/2)
			}
		}
		return nil
	})
}

// TestWriteSessionsPrepared tests that a fraction of the sessions is prepared
// for write by the health checker.
func TestWriteSessionsPrepared(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	_, sp, _, cleanup := serverClientMock(t, SessionPoolConfig{WriteSessions: 0.5, MaxIdle: 20})
	defer cleanup()

	shs := make([]*sessionHandle, 10)
	var err error
	for i := 0; i < 10; i++ {
		shs[i], err = sp.take(ctx)
		if err != nil {
			t.Fatalf("cannot get session from session pool: %v", err)
		}
	}
	// Now there are 10 sessions in the pool. Release them.
	for _, sh := range shs {
		sh.recycle()
	}

	// Sleep for 1s, allowing healthcheck workers to invoke begin transaction.
	// TODO(deklerk): get rid of this
	<-time.After(time.Second)
	wshs := make([]*sessionHandle, 5)
	for i := 0; i < 5; i++ {
		wshs[i], err = sp.takeWriteSession(ctx)
		if err != nil {
			t.Fatalf("cannot get session from session pool: %v", err)
		}
		if wshs[i].getTransactionID() == nil {
			t.Fatalf("got nil transaction id from session pool")
		}
	}
	for _, sh := range wshs {
		sh.recycle()
	}

	// TODO(deklerk): get rid of this
	<-time.After(time.Second)

	// Now force creation of 10 more sessions.
	shs = make([]*sessionHandle, 20)
	for i := 0; i < 20; i++ {
		shs[i], err = sp.take(ctx)
		if err != nil {
			t.Fatalf("cannot get session from session pool: %v", err)
		}
	}

	// Now there are 20 sessions in the pool. Release them.
	for _, sh := range shs {
		sh.recycle()
	}

	// TODO(deklerk): get rid of this
	<-time.After(time.Second)

	if sp.idleWriteList.Len() != 10 {
		t.Fatalf("Expect 10 write-prepared sessions, got: %d", sp.idleWriteList.Len())
	}
}

// TestTakeFromWriteQueue tests that sessionPool.take() returns write prepared
// sessions as well.
func TestTakeFromWriteQueue(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	_, sp, _, cleanup := serverClientMock(t, SessionPoolConfig{MaxOpened: 1, WriteSessions: 1.0, MaxIdle: 1})
	defer cleanup()

	sh, err := sp.take(ctx)
	if err != nil {
		t.Fatalf("cannot get session from session pool: %v", err)
	}
	sh.recycle()

	// TODO(deklerk): get rid of this
	<-time.After(time.Second)

	// The session should now be in write queue but take should also return it.
	if sp.idleWriteList.Len() == 0 {
		t.Fatalf("write queue unexpectedly empty")
	}
	if sp.idleList.Len() != 0 {
		t.Fatalf("read queue not empty")
	}
	sh, err = sp.take(ctx)
	if err != nil {
		t.Fatalf("cannot get session from session pool: %v", err)
	}
	sh.recycle()
}

// TestSessionHealthCheck tests healthchecking cases.
func TestSessionHealthCheck(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	_, sp, mock, cleanup := serverClientMock(t, SessionPoolConfig{
		HealthCheckInterval:       50 * time.Millisecond,
		healthCheckSampleInterval: 10 * time.Millisecond,
	})
	defer cleanup()

	var requestShouldErr int64 // 0 == false, 1 == true
	mock.GetSessionFn = func(c context.Context, r *sppb.GetSessionRequest, opts ...grpc.CallOption) (*sppb.Session, error) {
		if shouldErr := atomic.LoadInt64(&requestShouldErr); shouldErr == 1 {
			mock.MockCloudSpannerClient.ReceivedRequests <- r
			return nil, status.Errorf(codes.NotFound, "Session not found")
		}
		return mock.MockCloudSpannerClient.GetSession(c, r, opts...)
	}

	// Test pinging sessions.
	sh, err := sp.take(ctx)
	if err != nil {
		t.Fatalf("cannot get session from session pool: %v", err)
	}

	// Wait for healthchecker to send pings to session.
	waitFor(t, func() error {
		pings := mock.DumpPings()
		if len(pings) == 0 || pings[0] != sh.getID() {
			return fmt.Errorf("healthchecker didn't send any ping to session %v", sh.getID())
		}
		return nil
	})
	// Test broken session detection.
	sh, err = sp.take(ctx)
	if err != nil {
		t.Fatalf("cannot get session from session pool: %v", err)
	}

	atomic.SwapInt64(&requestShouldErr, 1)

	// Wait for healthcheck workers to find the broken session and tear it down.
	// TODO(deklerk): get rid of this
	<-time.After(1 * time.Second)

	s := sh.session
	if sh.session.isValid() {
		t.Fatalf("session(%v) is still alive, want it to be dropped by healthcheck workers", s)
	}

	atomic.SwapInt64(&requestShouldErr, 0)

	// Test garbage collection.
	sh, err = sp.take(ctx)
	if err != nil {
		t.Fatalf("cannot get session from session pool: %v", err)
	}
	sp.close()
	if sh.session.isValid() {
		t.Fatalf("session(%v) is still alive, want it to be garbage collected", s)
	}
}

// TestStressSessionPool stress tests the session pool with the following
// concurrent operations:
//	1) Test worker gets a session from the pool.
//	2) Test worker returns a session to the pool.
//	3) Test worker destroys a session taken from the pool.
//	4) Healthcheck destroys a broken session (because a worker has already destroyed it).
//	5) Test worker closes the session pool.
//
// During the test, the session pool maintainer maintains the number of sessions,
// and it is expected that all sessions taken from the session pool remain valid.
// When all test workers and healthcheck workers exit, the mock client, session
// pool and healthchecker should be in a consistent state.
func TestStressSessionPool(t *testing.T) {
	t.Parallel()
	ctx := context.Background()

	// Use concurrent workers to test session pools built from different configurations.
	for ti, cfg := range []SessionPoolConfig{
		{},
		{MinOpened: 10, MaxOpened: 100},
		{MaxBurst: 50},
		{MinOpened: 10, MaxOpened: 200, MaxBurst: 5},
		{MinOpened: 10, MaxOpened: 200, MaxBurst: 5, WriteSessions: 0.2},
	} {
		var wg sync.WaitGroup
		// Create a more aggressive session healthchecker to increase test concurrency.
		cfg.HealthCheckInterval = 50 * time.Millisecond
		cfg.healthCheckSampleInterval = 10 * time.Millisecond
		cfg.HealthCheckWorkers = 50
		sc := testutil.NewMockCloudSpannerClient(t)
		cfg.getRPCClient = func() (sppb.SpannerClient, error) {
			return sc, nil
		}
		sp, _ := newSessionPool("mockdb", cfg, nil)
		defer sp.hc.close()
		defer sp.close()

		for i := 0; i < 100; i++ {
			wg.Add(1)
			// Schedule a test worker.
			go func(idx int, pool *sessionPool, client sppb.SpannerClient) {
				defer wg.Done()
				// Each test worker iterates 1K times and tries different
				// session / session pool operations.
				for j := 0; j < 1000; j++ {
					if idx%10 == 0 && j >= 900 {
						// Close the pool in a selected set of workers during
						// the middle of the test.
						pool.close()
					}
					// Take a write session ~20% of the time.
					takeWrite := rand.Intn(5) == 4
					var (
						sh     *sessionHandle
						gotErr error
					)
					if takeWrite {
						sh, gotErr = pool.takeWriteSession(ctx)
					} else {
						sh, gotErr = pool.take(ctx)
					}
					if gotErr != nil {
						if pool.isValid() {
							t.Errorf("%v.%v: pool.take returns error when pool is still valid: %v", ti, idx, gotErr)
						}
						if wantErr := errInvalidSessionPool(); !testEqual(gotErr, wantErr) {
							t.Errorf("%v.%v: got error when pool is closed: %v, want %v", ti, idx, gotErr, wantErr)
						}
						continue
					}
					// Verify that the session is valid when the session pool
					// is valid. Note that if the session pool becomes invalid
					// after sh is taken, then sh might be invalidated by
					// healthcheck workers.
					if (sh.getID() == "" || sh.session == nil || !sh.session.isValid()) && pool.isValid() {
						t.Errorf("%v.%v.%v: pool.take returns invalid session %v", ti, idx, takeWrite, sh.session)
					}
					if takeWrite && sh.getTransactionID() == nil {
						t.Errorf("%v.%v: pool.takeWriteSession returns session %v without transaction", ti, idx, sh.session)
					}
					if rand.Intn(100) < idx {
						// Random sleep before destroying/recycling the session,
						// to give a healthcheck worker a chance to step in.
						<-time.After(time.Duration(rand.Int63n(int64(cfg.HealthCheckInterval))))
					}
					if rand.Intn(100) < idx {
						// Destroy the session.
						sh.destroy()
						continue
					}
					// Recycle the session.
					sh.recycle()
				}
			}(i, sp, sc)
		}
		wg.Wait()
		sp.hc.close()
		// At this point the states of the healthchecker, session pool and
		// mock client are stable.
		idleSessions := map[string]bool{}
		hcSessions := map[string]bool{}
		mockSessions := sc.DumpSessions()
		// Dump the session pool's idle list.
		for sl := sp.idleList.Front(); sl != nil; sl = sl.Next() {
			s := sl.Value.(*session)
			if idleSessions[s.getID()] {
				t.Fatalf("%v: found duplicated session in idle list: %v", ti, s.getID())
			}
			idleSessions[s.getID()] = true
		}
		for sl := sp.idleWriteList.Front(); sl != nil; sl = sl.Next() {
			s := sl.Value.(*session)
			if idleSessions[s.getID()] {
				t.Fatalf("%v: found duplicated session in idle write list: %v", ti, s.getID())
			}
			idleSessions[s.getID()] = true
		}
		sp.mu.Lock()
		if int(sp.numOpened) != len(idleSessions) {
			t.Fatalf("%v: number of opened sessions (%v) != number of idle sessions (%v)", ti, sp.numOpened, len(idleSessions))
		}
		if sp.createReqs != 0 {
			t.Fatalf("%v: number of pending session creations = %v, want 0", ti, sp.createReqs)
		}
		// Dump the healthcheck queue.
		for _, s := range sp.hc.queue.sessions {
			if hcSessions[s.getID()] {
				t.Fatalf("%v: found duplicated session in healthcheck queue: %v", ti, s.getID())
			}
			hcSessions[s.getID()] = true
		}
		sp.mu.Unlock()

		// Verify that idleSessions == hcSessions == mockSessions.
		if !testEqual(idleSessions, hcSessions) {
			t.Fatalf("%v: sessions in idle list (%v) != sessions in healthcheck queue (%v)", ti, idleSessions, hcSessions)
		}
		if !testEqual(hcSessions, mockSessions) {
			t.Fatalf("%v: sessions in healthcheck queue (%v) != sessions in mockclient (%v)", ti, hcSessions, mockSessions)
		}
		sp.close()
		mockSessions = sc.DumpSessions()
		if len(mockSessions) != 0 {
			t.Fatalf("Found live sessions: %v", mockSessions)
		}
	}
}

// TODO(deklerk): Investigate why this test is flakey, even with waitFor. Example
// flakey failure: session_test.go:946: after 15s waiting, got Scale down.
// Expect 5 open, got 6
//
// TestMaintainer checks that the session pool maintainer maintains the number
// of sessions in the following cases:
//
// 1. On initialization of the session pool, replenish the session pool to
//    meet MinOpened or MaxIdle.
// 2. On increased session usage, provision extra MaxIdle sessions.
// 3. After the surge passes, scale down the session pool accordingly.
func TestMaintainer(t *testing.T) {
	t.Skip("asserting session state seems flakey")
	t.Parallel()
	ctx := context.Background()

	minOpened := uint64(5)
	maxIdle := uint64(4)
	_, sp, _, cleanup := serverClientMock(t, SessionPoolConfig{MinOpened: minOpened, MaxIdle: maxIdle})
	defer cleanup()

	sampleInterval := sp.SessionPoolConfig.healthCheckSampleInterval

	waitFor(t, func() error {
		sp.mu.Lock()
		defer sp.mu.Unlock()
		if sp.numOpened != 5 {
			return fmt.Errorf("Replenish. Expect %d open, got %d", sp.MinOpened, sp.numOpened)
		}
		return nil
	})

	// To save test time, we are not creating many sessions, because the time
	// to create sessions would have an impact on the decision about
	// sessionsToKeep. We also parallelize the take and recycle process.
	shs := make([]*sessionHandle, 10)
	for i := 0; i < len(shs); i++ {
		var err error
		shs[i], err = sp.take(ctx)
		if err != nil {
			t.Fatalf("cannot get session from session pool: %v", err)
		}
	}
	sp.mu.Lock()
	if sp.numOpened != 10 {
		t.Fatalf("Scale out from normal use. Expect %d open, got %d", 10, sp.numOpened)
	}
	sp.mu.Unlock()

	<-time.After(sampleInterval)
	for _, sh := range shs[:7] {
		sh.recycle()
	}

	waitFor(t, func() error {
		sp.mu.Lock()
		defer sp.mu.Unlock()
		if sp.numOpened != 7 {
			return fmt.Errorf("Keep extra MaxIdle sessions. Expect %d open, got %d", 7, sp.numOpened)
		}
		return nil
	})

	for _, sh := range shs[7:] {
		sh.recycle()
	}
	waitFor(t, func() error {
		sp.mu.Lock()
		defer sp.mu.Unlock()
		if sp.numOpened != minOpened {
			return fmt.Errorf("Scale down. Expect %d open, got %d", minOpened, sp.numOpened)
		}
		return nil
	})
}

// TestMaintainer_CreatesSessions tests that the maintainer creates up to
// MinOpened connections.
//
// Historical context: This test also checks that a low
// healthCheckSampleInterval does not prevent it from opening connections.
// See: https://github.com/googleapis/google-cloud-go/issues/1259
func TestMaintainer_CreatesSessions(t *testing.T) {
	t.Parallel()

	rawServerStub := testutil.NewMockCloudSpannerClient(t)
	serverClientMock := testutil.FuncMock{MockCloudSpannerClient: rawServerStub}
	serverClientMock.CreateSessionFn = func(c context.Context, r *sppb.CreateSessionRequest, opts ...grpc.CallOption) (*sppb.Session, error) {
		time.Sleep(10 * time.Millisecond)
		return rawServerStub.CreateSession(c, r, opts...)
	}
	spc := SessionPoolConfig{
		MinOpened:                 10,
		MaxIdle:                   10,
		healthCheckSampleInterval: time.Millisecond,
		getRPCClient: func() (sppb.SpannerClient, error) {
			return &serverClientMock, nil
		},
	}
	db := "mockdb"
	sp, err := newSessionPool(db, spc, nil)
	if err != nil {
		t.Fatalf("cannot create session pool: %v", err)
	}
	client := Client{
		database:     db,
		idleSessions: sp,
	}
	defer func() {
		client.Close()
		sp.hc.close()
		sp.close()
	}()

	timeoutAmt := 2 * time.Second
	timeout := time.After(timeoutAmt)
	var numOpened uint64
loop:
	for {
		select {
		case <-timeout:
			t.Fatalf("timed out after %v, got %d session(s), want %d", timeoutAmt, numOpened, spc.MinOpened)
		default:
			sp.mu.Lock()
			numOpened = sp.numOpened
			sp.mu.Unlock()
			if numOpened == 10 {
				break loop
			}
		}
	}
}

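// Equal compares two sessions field by field so that the assertions above can
// compare *session values, which contain unexported fields. It is assumed
// that testEqual, defined elsewhere in this package, honors Equal methods
// when it encounters them.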
func (s1 *session) Equal(s2 *session) bool {
	return s1.client == s2.client &&
		s1.id == s2.id &&
		s1.pool == s2.pool &&
		s1.createTime == s2.createTime &&
		s1.valid == s2.valid &&
		s1.hcIndex == s2.hcIndex &&
		s1.idleList == s2.idleList &&
		s1.nextCheck.Equal(s2.nextCheck) &&
		s1.checkingHealth == s2.checkingHealth &&
		testEqual(s1.md, s2.md) &&
		bytes.Equal(s1.tx, s2.tx)
}

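// waitFor repeatedly runs assert until it returns nil, pausing 10ms between
// attempts. If assert still fails after the 15-second deadline, the test is
// failed with the last error.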
func waitFor(t *testing.T, assert func() error) {
	t.Helper()
	timeout := 15 * time.Second
	ta := time.After(timeout)

	for {
		select {
		case <-ta:
			if err := assert(); err != nil {
				t.Fatalf("after %v waiting, got %v", timeout, err)
			}
			return
		default:
		}

		if err := assert(); err != nil {
			// Fail. Let's pause and retry.
			time.Sleep(10 * time.Millisecond)
			continue
		}

		return
	}
}