1/*
2Copyright 2017 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package spanner
18
19import (
20	"context"
21	"fmt"
22	"io"
23	"math/big"
24	"os"
25	"strings"
26	"testing"
27	"time"
28
29	"cloud.google.com/go/civil"
30	itestutil "cloud.google.com/go/internal/testutil"
31	vkit "cloud.google.com/go/spanner/apiv1"
32	. "cloud.google.com/go/spanner/internal/testutil"
33	structpb "github.com/golang/protobuf/ptypes/struct"
34	gax "github.com/googleapis/gax-go/v2"
35	"google.golang.org/api/iterator"
36	"google.golang.org/api/option"
37	sppb "google.golang.org/genproto/googleapis/spanner/v1"
38	"google.golang.org/grpc/codes"
39	"google.golang.org/grpc/status"
40)
41
42func setupMockedTestServer(t *testing.T) (server *MockedSpannerInMemTestServer, client *Client, teardown func()) {
43	return setupMockedTestServerWithConfig(t, ClientConfig{})
44}
45
46func setupMockedTestServerWithConfig(t *testing.T, config ClientConfig) (server *MockedSpannerInMemTestServer, client *Client, teardown func()) {
47	return setupMockedTestServerWithConfigAndClientOptions(t, config, []option.ClientOption{})
48}
49
50func setupMockedTestServerWithConfigAndClientOptions(t *testing.T, config ClientConfig, clientOptions []option.ClientOption) (server *MockedSpannerInMemTestServer, client *Client, teardown func()) {
51	grpcHeaderChecker := &itestutil.HeadersEnforcer{
52		OnFailure: t.Fatalf,
53		Checkers: []*itestutil.HeaderChecker{
54			{
55				Key: "x-goog-api-client",
56				ValuesValidator: func(token ...string) error {
57					if len(token) != 1 {
58						return status.Errorf(codes.Internal, "unexpected number of api client token headers: %v", len(token))
59					}
60					if !strings.HasPrefix(token[0], "gl-go/") {
61						return status.Errorf(codes.Internal, "unexpected api client token: %v", token[0])
62					}
63					if !strings.Contains(token[0], "gccl/") {
64						return status.Errorf(codes.Internal, "unexpected api client token: %v", token[0])
65					}
66					return nil
67				},
68			},
69		},
70	}
71	clientOptions = append(clientOptions, grpcHeaderChecker.CallOptions()...)
72	server, opts, serverTeardown := NewMockedSpannerInMemTestServer(t)
73	opts = append(opts, clientOptions...)
74	ctx := context.Background()
75	formattedDatabase := fmt.Sprintf("projects/%s/instances/%s/databases/%s", "[PROJECT]", "[INSTANCE]", "[DATABASE]")
76	client, err := NewClientWithConfig(ctx, formattedDatabase, config, opts...)
77	if err != nil {
78		t.Fatal(err)
79	}
80	return server, client, func() {
81		client.Close()
82		serverTeardown()
83	}
84}
85
86// Test validDatabaseName()
87func TestValidDatabaseName(t *testing.T) {
88	validDbURI := "projects/spanner-cloud-test/instances/foo/databases/foodb"
89	invalidDbUris := []string{
90		// Completely wrong DB URI.
91		"foobarDB",
92		// Project ID contains "/".
93		"projects/spanner-cloud/test/instances/foo/databases/foodb",
94		// No instance ID.
95		"projects/spanner-cloud-test/instances//databases/foodb",
96	}
97	if err := validDatabaseName(validDbURI); err != nil {
98		t.Errorf("validateDatabaseName(%q) = %v, want nil", validDbURI, err)
99	}
100	for _, d := range invalidDbUris {
101		if err, wantErr := validDatabaseName(d), "should conform to pattern"; !strings.Contains(err.Error(), wantErr) {
102			t.Errorf("validateDatabaseName(%q) = %q, want error pattern %q", validDbURI, err, wantErr)
103		}
104	}
105}
106
107func TestReadOnlyTransactionClose(t *testing.T) {
108	// Closing a ReadOnlyTransaction shouldn't panic.
109	c := &Client{}
110	tx := c.ReadOnlyTransaction()
111	tx.Close()
112}
113
114func TestClient_Single(t *testing.T) {
115	t.Parallel()
116	err := testSingleQuery(t, nil)
117	if err != nil {
118		t.Fatal(err)
119	}
120}
121
122func TestClient_Single_Unavailable(t *testing.T) {
123	t.Parallel()
124	err := testSingleQuery(t, status.Error(codes.Unavailable, "Temporary unavailable"))
125	if err != nil {
126		t.Fatal(err)
127	}
128}
129
130func TestClient_Single_InvalidArgument(t *testing.T) {
131	t.Parallel()
132	err := testSingleQuery(t, status.Error(codes.InvalidArgument, "Invalid argument"))
133	if status.Code(err) != codes.InvalidArgument {
134		t.Fatalf("got: %v, want: %v", err, codes.InvalidArgument)
135	}
136}
137
138func TestClient_Single_SessionNotFound(t *testing.T) {
139	t.Parallel()
140
141	server, client, teardown := setupMockedTestServer(t)
142	defer teardown()
143	server.TestSpanner.PutExecutionTime(
144		MethodExecuteStreamingSql,
145		SimulatedExecutionTime{Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
146	)
147	ctx := context.Background()
148	iter := client.Single().Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
149	defer iter.Stop()
150	rowCount := int64(0)
151	for {
152		_, err := iter.Next()
153		if err == iterator.Done {
154			break
155		}
156		if err != nil {
157			t.Fatal(err)
158		}
159		rowCount++
160	}
161	if rowCount != SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount {
162		t.Fatalf("row count mismatch\nGot: %v\nWant: %v", rowCount, SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount)
163	}
164}
165
166func TestClient_Single_Read_SessionNotFound(t *testing.T) {
167	t.Parallel()
168
169	server, client, teardown := setupMockedTestServer(t)
170	defer teardown()
171	server.TestSpanner.PutExecutionTime(
172		MethodStreamingRead,
173		SimulatedExecutionTime{Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
174	)
175	ctx := context.Background()
176	iter := client.Single().Read(ctx, "Albums", KeySets(Key{"foo"}), []string{"SingerId", "AlbumId", "AlbumTitle"})
177	defer iter.Stop()
178	rowCount := int64(0)
179	for {
180		_, err := iter.Next()
181		if err == iterator.Done {
182			break
183		}
184		if err != nil {
185			t.Fatal(err)
186		}
187		rowCount++
188	}
189	if rowCount != SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount {
190		t.Fatalf("row count mismatch\nGot: %v\nWant: %v", rowCount, SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount)
191	}
192}
193
194func TestClient_Single_ReadRow_SessionNotFound(t *testing.T) {
195	t.Parallel()
196
197	server, client, teardown := setupMockedTestServer(t)
198	defer teardown()
199	server.TestSpanner.PutExecutionTime(
200		MethodStreamingRead,
201		SimulatedExecutionTime{Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
202	)
203	ctx := context.Background()
204	row, err := client.Single().ReadRow(ctx, "Albums", Key{"foo"}, []string{"SingerId", "AlbumId", "AlbumTitle"})
205	if err != nil {
206		t.Fatalf("Unexpected error for read row: %v", err)
207	}
208	if row == nil {
209		t.Fatal("ReadRow did not return a row")
210	}
211}
212
213func TestClient_Single_RetryableErrorOnPartialResultSet(t *testing.T) {
214	t.Parallel()
215	server, client, teardown := setupMockedTestServer(t)
216	defer teardown()
217
218	// Add two errors that will be returned by the mock server when the client
219	// is trying to fetch a partial result set. Both errors are retryable.
220	// The errors are not 'sticky' on the mocked server, i.e. once the error
221	// has been returned once, the next call for the same partial result set
222	// will succeed.
223
224	// When the client is fetching the partial result set with resume token 2,
225	// the mock server will respond with an internal error with the message
226	// 'stream terminated by RST_STREAM'. The client will retry the call to get
227	// this partial result set.
228	server.TestSpanner.AddPartialResultSetError(
229		SelectSingerIDAlbumIDAlbumTitleFromAlbums,
230		PartialResultSetExecutionTime{
231			ResumeToken: EncodeResumeToken(2),
232			Err:         status.Errorf(codes.Internal, "stream terminated by RST_STREAM"),
233		},
234	)
235	// When the client is fetching the partial result set with resume token 3,
236	// the mock server will respond with a 'Unavailable' error. The client will
237	// retry the call to get this partial result set.
238	server.TestSpanner.AddPartialResultSetError(
239		SelectSingerIDAlbumIDAlbumTitleFromAlbums,
240		PartialResultSetExecutionTime{
241			ResumeToken: EncodeResumeToken(3),
242			Err:         status.Errorf(codes.Unavailable, "server is unavailable"),
243		},
244	)
245	ctx := context.Background()
246	if err := executeSingerQuery(ctx, client.Single()); err != nil {
247		t.Fatal(err)
248	}
249}
250
251func TestClient_Single_NonRetryableErrorOnPartialResultSet(t *testing.T) {
252	t.Parallel()
253	server, client, teardown := setupMockedTestServer(t)
254	defer teardown()
255
256	// Add two errors that will be returned by the mock server when the client
257	// is trying to fetch a partial result set. The first error is retryable,
258	// the second is not.
259
260	// This error will automatically be retried.
261	server.TestSpanner.AddPartialResultSetError(
262		SelectSingerIDAlbumIDAlbumTitleFromAlbums,
263		PartialResultSetExecutionTime{
264			ResumeToken: EncodeResumeToken(2),
265			Err:         status.Errorf(codes.Internal, "stream terminated by RST_STREAM"),
266		},
267	)
268	// 'Session not found' is not retryable and the error will be returned to
269	// the user.
270	server.TestSpanner.AddPartialResultSetError(
271		SelectSingerIDAlbumIDAlbumTitleFromAlbums,
272		PartialResultSetExecutionTime{
273			ResumeToken: EncodeResumeToken(3),
274			Err:         newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s"),
275		},
276	)
277	ctx := context.Background()
278	err := executeSingerQuery(ctx, client.Single())
279	if status.Code(err) != codes.NotFound {
280		t.Fatalf("Error mismatch:\ngot: %v\nwant: %v", err, codes.NotFound)
281	}
282}
283
284func TestClient_Single_NonRetryableInternalErrors(t *testing.T) {
285	t.Parallel()
286	server, client, teardown := setupMockedTestServer(t)
287	defer teardown()
288
289	server.TestSpanner.AddPartialResultSetError(
290		SelectSingerIDAlbumIDAlbumTitleFromAlbums,
291		PartialResultSetExecutionTime{
292			ResumeToken: EncodeResumeToken(2),
293			Err:         status.Errorf(codes.Internal, "grpc: error while marshaling: string field contains invalid UTF-8"),
294		},
295	)
296	ctx := context.Background()
297	err := executeSingerQuery(ctx, client.Single())
298	if status.Code(err) != codes.Internal {
299		t.Fatalf("Error mismatch:\ngot: %v\nwant: %v", err, codes.Internal)
300	}
301}
302
303func TestClient_Single_DeadlineExceeded_NoErrors(t *testing.T) {
304	t.Parallel()
305	server, client, teardown := setupMockedTestServer(t)
306	defer teardown()
307	server.TestSpanner.PutExecutionTime(MethodExecuteStreamingSql,
308		SimulatedExecutionTime{
309			MinimumExecutionTime: 50 * time.Millisecond,
310		})
311	ctx := context.Background()
312	ctx, cancel := context.WithDeadline(ctx, time.Now().Add(5*time.Millisecond))
313	defer cancel()
314	err := executeSingerQuery(ctx, client.Single())
315	if status.Code(err) != codes.DeadlineExceeded {
316		t.Fatalf("Error mismatch:\ngot: %v\nwant: %v", err, codes.DeadlineExceeded)
317	}
318}
319
320func TestClient_Single_DeadlineExceeded_WithErrors(t *testing.T) {
321	t.Parallel()
322	server, client, teardown := setupMockedTestServer(t)
323	defer teardown()
324	server.TestSpanner.AddPartialResultSetError(
325		SelectSingerIDAlbumIDAlbumTitleFromAlbums,
326		PartialResultSetExecutionTime{
327			ResumeToken: EncodeResumeToken(2),
328			Err:         status.Errorf(codes.Internal, "stream terminated by RST_STREAM"),
329		},
330	)
331	server.TestSpanner.AddPartialResultSetError(
332		SelectSingerIDAlbumIDAlbumTitleFromAlbums,
333		PartialResultSetExecutionTime{
334			ResumeToken:   EncodeResumeToken(3),
335			Err:           status.Errorf(codes.Unavailable, "server is unavailable"),
336			ExecutionTime: 50 * time.Millisecond,
337		},
338	)
339	ctx := context.Background()
340	ctx, cancel := context.WithDeadline(ctx, time.Now().Add(25*time.Millisecond))
341	defer cancel()
342	err := executeSingerQuery(ctx, client.Single())
343	if status.Code(err) != codes.DeadlineExceeded {
344		t.Fatalf("got unexpected error %v, expected DeadlineExceeded", err)
345	}
346}
347
348func TestClient_Single_ContextCanceled_noDeclaredServerErrors(t *testing.T) {
349	t.Parallel()
350	_, client, teardown := setupMockedTestServer(t)
351	defer teardown()
352	ctx := context.Background()
353	ctx, cancel := context.WithCancel(ctx)
354	cancel()
355	err := executeSingerQuery(ctx, client.Single())
356	if status.Code(err) != codes.Canceled {
357		t.Fatalf("got unexpected error %v, expected Canceled", err)
358	}
359}
360
361func TestClient_Single_ContextCanceled_withDeclaredServerErrors(t *testing.T) {
362	t.Parallel()
363	server, client, teardown := setupMockedTestServer(t)
364	defer teardown()
365	server.TestSpanner.AddPartialResultSetError(
366		SelectSingerIDAlbumIDAlbumTitleFromAlbums,
367		PartialResultSetExecutionTime{
368			ResumeToken: EncodeResumeToken(2),
369			Err:         status.Errorf(codes.Internal, "stream terminated by RST_STREAM"),
370		},
371	)
372	server.TestSpanner.AddPartialResultSetError(
373		SelectSingerIDAlbumIDAlbumTitleFromAlbums,
374		PartialResultSetExecutionTime{
375			ResumeToken: EncodeResumeToken(3),
376			Err:         status.Errorf(codes.Unavailable, "server is unavailable"),
377		},
378	)
379	ctx := context.Background()
380	ctx, cancel := context.WithCancel(ctx)
381	defer cancel()
382	f := func(rowCount int64) error {
383		if rowCount == 2 {
384			cancel()
385		}
386		return nil
387	}
388	iter := client.Single().Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
389	defer iter.Stop()
390	err := executeSingerQueryWithRowFunc(ctx, client.Single(), f)
391	if status.Code(err) != codes.Canceled {
392		t.Fatalf("got unexpected error %v, expected Canceled", err)
393	}
394}
395
396func TestClient_Single_QueryOptions(t *testing.T) {
397	for _, tt := range queryOptionsTestCases() {
398		t.Run(tt.name, func(t *testing.T) {
399			if tt.env.Options != nil {
400				unset := setQueryOptionsEnvVars(tt.env.Options)
401				defer unset()
402			}
403
404			ctx := context.Background()
405			server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{QueryOptions: tt.client})
406			defer teardown()
407
408			var iter *RowIterator
409			if tt.query.Options == nil {
410				iter = client.Single().Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
411			} else {
412				iter = client.Single().QueryWithOptions(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), tt.query)
413			}
414			testQueryOptions(t, iter, server.TestSpanner, tt.want)
415		})
416	}
417}
418
419func TestClient_ReturnDatabaseName(t *testing.T) {
420	t.Parallel()
421
422	_, client, teardown := setupMockedTestServer(t)
423	defer teardown()
424
425	got := client.DatabaseName()
426	want := "projects/[PROJECT]/instances/[INSTANCE]/databases/[DATABASE]"
427	if got != want {
428		t.Fatalf("Incorrect database name returned, got: %s, want: %s", got, want)
429	}
430}
431
432func testQueryOptions(t *testing.T, iter *RowIterator, server InMemSpannerServer, qo QueryOptions) {
433	defer iter.Stop()
434
435	_, err := iter.Next()
436	if err != nil {
437		t.Fatalf("Failed to read from the iterator: %v", err)
438	}
439
440	checkReqsForQueryOptions(t, server, qo)
441}
442
443func checkReqsForQueryOptions(t *testing.T, server InMemSpannerServer, qo QueryOptions) {
444	reqs := drainRequestsFromServer(server)
445	sqlReqs := []*sppb.ExecuteSqlRequest{}
446
447	for _, req := range reqs {
448		if sqlReq, ok := req.(*sppb.ExecuteSqlRequest); ok {
449			sqlReqs = append(sqlReqs, sqlReq)
450		}
451	}
452
453	if got, want := len(sqlReqs), 1; got != want {
454		t.Fatalf("Length mismatch, got %v, want %v", got, want)
455	}
456
457	reqQueryOptions := sqlReqs[0].QueryOptions
458	if got, want := reqQueryOptions.OptimizerVersion, qo.Options.OptimizerVersion; got != want {
459		t.Fatalf("Optimizer version mismatch, got %v, want %v", got, want)
460	}
461	if got, want := reqQueryOptions.OptimizerStatisticsPackage, qo.Options.OptimizerStatisticsPackage; got != want {
462		t.Fatalf("Optimizer statistics package mismatch, got %v, want %v", got, want)
463	}
464}
465
466func testSingleQuery(t *testing.T, serverError error) error {
467	ctx := context.Background()
468	server, client, teardown := setupMockedTestServer(t)
469	defer teardown()
470	if serverError != nil {
471		server.TestSpanner.SetError(serverError)
472	}
473	return executeSingerQuery(ctx, client.Single())
474}
475
476func executeSingerQuery(ctx context.Context, tx *ReadOnlyTransaction) error {
477	return executeSingerQueryWithRowFunc(ctx, tx, nil)
478}
479
480func executeSingerQueryWithRowFunc(ctx context.Context, tx *ReadOnlyTransaction, f func(rowCount int64) error) error {
481	iter := tx.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
482	defer iter.Stop()
483	rowCount := int64(0)
484	for {
485		row, err := iter.Next()
486		if err == iterator.Done {
487			break
488		}
489		if err != nil {
490			return err
491		}
492		var singerID, albumID int64
493		var albumTitle string
494		if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil {
495			return err
496		}
497		rowCount++
498		if f != nil {
499			if err := f(rowCount); err != nil {
500				return err
501			}
502		}
503	}
504	if rowCount != SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount {
505		return status.Errorf(codes.Internal, "Row count mismatch, got %v, expected %v", rowCount, SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount)
506	}
507	return nil
508}
509
510func createSimulatedExecutionTimeWithTwoUnavailableErrors(method string) map[string]SimulatedExecutionTime {
511	errors := make([]error, 2)
512	errors[0] = status.Error(codes.Unavailable, "Temporary unavailable")
513	errors[1] = status.Error(codes.Unavailable, "Temporary unavailable")
514	executionTimes := make(map[string]SimulatedExecutionTime)
515	executionTimes[method] = SimulatedExecutionTime{
516		Errors: errors,
517	}
518	return executionTimes
519}
520
521func TestClient_ReadOnlyTransaction(t *testing.T) {
522	t.Parallel()
523	if err := testReadOnlyTransaction(t, make(map[string]SimulatedExecutionTime)); err != nil {
524		t.Fatal(err)
525	}
526}
527
528func TestClient_ReadOnlyTransaction_UnavailableOnSessionCreate(t *testing.T) {
529	t.Parallel()
530	if err := testReadOnlyTransaction(t, createSimulatedExecutionTimeWithTwoUnavailableErrors(MethodCreateSession)); err != nil {
531		t.Fatal(err)
532	}
533}
534
535func TestClient_ReadOnlyTransaction_UnavailableOnBeginTransaction(t *testing.T) {
536	t.Parallel()
537	if err := testReadOnlyTransaction(t, createSimulatedExecutionTimeWithTwoUnavailableErrors(MethodBeginTransaction)); err != nil {
538		t.Fatal(err)
539	}
540}
541
542func TestClient_ReadOnlyTransaction_UnavailableOnExecuteStreamingSql(t *testing.T) {
543	t.Parallel()
544	if err := testReadOnlyTransaction(t, createSimulatedExecutionTimeWithTwoUnavailableErrors(MethodExecuteStreamingSql)); err != nil {
545		t.Fatal(err)
546	}
547}
548
549func TestClient_ReadOnlyTransaction_SessionNotFoundOnExecuteStreamingSql(t *testing.T) {
550	t.Parallel()
551	// Session not found is not retryable for a query on a multi-use read-only
552	// transaction, as we would need to start a new transaction on a new
553	// session.
554	err := testReadOnlyTransaction(t, map[string]SimulatedExecutionTime{
555		MethodExecuteStreamingSql: {Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
556	})
557	want := ToSpannerError(newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s"))
558	if err == nil {
559		t.Fatalf("missing expected error\nGot: nil\nWant: %v", want)
560	}
561	if status.Code(err) != status.Code(want) || !strings.Contains(err.Error(), want.Error()) {
562		t.Fatalf("error mismatch\nGot: %v\nWant: %v", err, want)
563	}
564}
565
566func TestClient_ReadOnlyTransaction_UnavailableOnCreateSessionAndBeginTransaction(t *testing.T) {
567	t.Parallel()
568	exec := map[string]SimulatedExecutionTime{
569		MethodCreateSession:    {Errors: []error{status.Error(codes.Unavailable, "Temporary unavailable")}},
570		MethodBeginTransaction: {Errors: []error{status.Error(codes.Unavailable, "Temporary unavailable")}},
571	}
572	if err := testReadOnlyTransaction(t, exec); err != nil {
573		t.Fatal(err)
574	}
575}
576
577func TestClient_ReadOnlyTransaction_UnavailableOnCreateSessionAndInvalidArgumentOnBeginTransaction(t *testing.T) {
578	t.Parallel()
579	exec := map[string]SimulatedExecutionTime{
580		MethodCreateSession:    {Errors: []error{status.Error(codes.Unavailable, "Temporary unavailable")}},
581		MethodBeginTransaction: {Errors: []error{status.Error(codes.InvalidArgument, "Invalid argument")}},
582	}
583	if err := testReadOnlyTransaction(t, exec); err == nil {
584		t.Fatalf("Missing expected exception")
585	} else if status.Code(err) != codes.InvalidArgument {
586		t.Fatalf("Got unexpected exception: %v", err)
587	}
588}
589
590func TestClient_ReadOnlyTransaction_SessionNotFoundOnBeginTransaction(t *testing.T) {
591	t.Parallel()
592	if err := testReadOnlyTransaction(
593		t,
594		map[string]SimulatedExecutionTime{
595			MethodBeginTransaction: {Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
596		},
597	); err != nil {
598		t.Fatal(err)
599	}
600}
601
602func TestClient_ReadOnlyTransaction_SessionNotFoundOnBeginTransaction_WithMaxOneSession(t *testing.T) {
603	t.Parallel()
604	server, client, teardown := setupMockedTestServerWithConfig(
605		t,
606		ClientConfig{
607			SessionPoolConfig: SessionPoolConfig{
608				MinOpened: 0,
609				MaxOpened: 1,
610			},
611		})
612	defer teardown()
613	server.TestSpanner.PutExecutionTime(
614		MethodBeginTransaction,
615		SimulatedExecutionTime{Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
616	)
617	tx := client.ReadOnlyTransaction()
618	defer tx.Close()
619	ctx := context.Background()
620	if err := executeSingerQuery(ctx, tx); err != nil {
621		t.Fatal(err)
622	}
623}
624
625func TestClient_ReadOnlyTransaction_QueryOptions(t *testing.T) {
626	for _, tt := range queryOptionsTestCases() {
627		t.Run(tt.name, func(t *testing.T) {
628			if tt.env.Options != nil {
629				unset := setQueryOptionsEnvVars(tt.env.Options)
630				defer unset()
631			}
632
633			ctx := context.Background()
634			server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{QueryOptions: tt.client})
635			defer teardown()
636
637			tx := client.ReadOnlyTransaction()
638			defer tx.Close()
639
640			var iter *RowIterator
641			if tt.query.Options == nil {
642				iter = tx.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
643			} else {
644				iter = tx.QueryWithOptions(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), tt.query)
645			}
646			testQueryOptions(t, iter, server.TestSpanner, tt.want)
647		})
648	}
649}
650
651func setQueryOptionsEnvVars(opts *sppb.ExecuteSqlRequest_QueryOptions) func() {
652	os.Setenv("SPANNER_OPTIMIZER_VERSION", opts.OptimizerVersion)
653	os.Setenv("SPANNER_OPTIMIZER_STATISTICS_PACKAGE", opts.OptimizerStatisticsPackage)
654	return func() {
655		defer os.Setenv("SPANNER_OPTIMIZER_VERSION", "")
656		defer os.Setenv("SPANNER_OPTIMIZER_STATISTICS_PACKAGE", "")
657	}
658}
659
660func testReadOnlyTransaction(t *testing.T, executionTimes map[string]SimulatedExecutionTime) error {
661	server, client, teardown := setupMockedTestServer(t)
662	defer teardown()
663	for method, exec := range executionTimes {
664		server.TestSpanner.PutExecutionTime(method, exec)
665	}
666	tx := client.ReadOnlyTransaction()
667	defer tx.Close()
668	ctx := context.Background()
669	return executeSingerQuery(ctx, tx)
670}
671
672func TestClient_ReadWriteTransaction(t *testing.T) {
673	t.Parallel()
674	if err := testReadWriteTransaction(t, make(map[string]SimulatedExecutionTime), 1); err != nil {
675		t.Fatal(err)
676	}
677}
678
679func TestClient_ReadWriteTransactionCommitAborted(t *testing.T) {
680	t.Parallel()
681	if err := testReadWriteTransaction(t, map[string]SimulatedExecutionTime{
682		MethodCommitTransaction: {Errors: []error{status.Error(codes.Aborted, "Transaction aborted")}},
683	}, 2); err != nil {
684		t.Fatal(err)
685	}
686}
687
688func TestClient_ReadWriteTransaction_SessionNotFoundOnCommit(t *testing.T) {
689	t.Parallel()
690	if err := testReadWriteTransaction(t, map[string]SimulatedExecutionTime{
691		MethodCommitTransaction: {Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
692	}, 2); err != nil {
693		t.Fatal(err)
694	}
695}
696
697func TestClient_ReadWriteTransaction_SessionNotFoundOnBeginTransaction(t *testing.T) {
698	t.Parallel()
699	// We expect only 1 attempt, as the 'Session not found' error is already
700	//handled in the session pool where the session is prepared.
701	if err := testReadWriteTransaction(t, map[string]SimulatedExecutionTime{
702		MethodBeginTransaction: {Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
703	}, 1); err != nil {
704		t.Fatal(err)
705	}
706}
707
708func TestClient_ReadWriteTransaction_SessionNotFoundOnBeginTransactionWithEmptySessionPool(t *testing.T) {
709	t.Parallel()
710	// There will be no prepared sessions in the pool, so the error will occur
711	// when the transaction tries to get a session from the pool. This will
712	// also be handled by the session pool, so the transaction itself does not
713	// need to retry, hence the expectedAttempts == 1.
714	if err := testReadWriteTransactionWithConfig(t, ClientConfig{
715		SessionPoolConfig: SessionPoolConfig{WriteSessions: 0.0},
716	}, map[string]SimulatedExecutionTime{
717		MethodBeginTransaction: {Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
718	}, 1); err != nil {
719		t.Fatal(err)
720	}
721}
722
723func TestClient_ReadWriteTransaction_SessionNotFoundOnExecuteStreamingSql(t *testing.T) {
724	t.Parallel()
725	if err := testReadWriteTransaction(t, map[string]SimulatedExecutionTime{
726		MethodExecuteStreamingSql: {Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
727	}, 2); err != nil {
728		t.Fatal(err)
729	}
730}
731
732func TestClient_ReadWriteTransaction_SessionNotFoundOnExecuteUpdate(t *testing.T) {
733	t.Parallel()
734
735	server, client, teardown := setupMockedTestServer(t)
736	defer teardown()
737	server.TestSpanner.PutExecutionTime(
738		MethodExecuteSql,
739		SimulatedExecutionTime{Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
740	)
741	ctx := context.Background()
742	var attempts int
743	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
744		attempts++
745		rowCount, err := tx.Update(ctx, NewStatement(UpdateBarSetFoo))
746		if err != nil {
747			return err
748		}
749		if g, w := rowCount, int64(UpdateBarSetFooRowCount); g != w {
750			return status.Errorf(codes.FailedPrecondition, "Row count mismatch\nGot: %v\nWant: %v", g, w)
751		}
752		return nil
753	})
754	if err != nil {
755		t.Fatal(err)
756	}
757	if g, w := attempts, 2; g != w {
758		t.Fatalf("number of attempts mismatch:\nGot%d\nWant:%d", g, w)
759	}
760}
761
762func TestClient_ReadWriteTransaction_SessionNotFoundOnExecuteBatchUpdate(t *testing.T) {
763	t.Parallel()
764
765	server, client, teardown := setupMockedTestServer(t)
766	defer teardown()
767	server.TestSpanner.PutExecutionTime(
768		MethodExecuteBatchDml,
769		SimulatedExecutionTime{Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
770	)
771	ctx := context.Background()
772	var attempts int
773	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
774		attempts++
775		rowCounts, err := tx.BatchUpdate(ctx, []Statement{NewStatement(UpdateBarSetFoo)})
776		if err != nil {
777			return err
778		}
779		if g, w := len(rowCounts), 1; g != w {
780			return status.Errorf(codes.FailedPrecondition, "Row counts length mismatch\nGot: %v\nWant: %v", g, w)
781		}
782		if g, w := rowCounts[0], int64(UpdateBarSetFooRowCount); g != w {
783			return status.Errorf(codes.FailedPrecondition, "Row count mismatch\nGot: %v\nWant: %v", g, w)
784		}
785		return nil
786	})
787	if err != nil {
788		t.Fatal(err)
789	}
790	if g, w := attempts, 2; g != w {
791		t.Fatalf("number of attempts mismatch:\nGot%d\nWant:%d", g, w)
792	}
793}
794
795func TestClient_ReadWriteTransaction_Query_QueryOptions(t *testing.T) {
796	for _, tt := range queryOptionsTestCases() {
797		t.Run(tt.name, func(t *testing.T) {
798			if tt.env.Options != nil {
799				unset := setQueryOptionsEnvVars(tt.env.Options)
800				defer unset()
801			}
802
803			ctx := context.Background()
804			server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{QueryOptions: tt.client})
805			defer teardown()
806
807			_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
808				var iter *RowIterator
809				if tt.query.Options == nil {
810					iter = tx.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
811				} else {
812					iter = tx.QueryWithOptions(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), tt.query)
813				}
814				testQueryOptions(t, iter, server.TestSpanner, tt.want)
815				return nil
816			})
817			if err != nil {
818				t.Fatal(err)
819			}
820		})
821	}
822}
823
824func TestClient_ReadWriteTransaction_Update_QueryOptions(t *testing.T) {
825	for _, tt := range queryOptionsTestCases() {
826		t.Run(tt.name, func(t *testing.T) {
827			if tt.env.Options != nil {
828				unset := setQueryOptionsEnvVars(tt.env.Options)
829				defer unset()
830			}
831
832			ctx := context.Background()
833			server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{QueryOptions: tt.client})
834			defer teardown()
835
836			_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
837				var rowCount int64
838				var err error
839				if tt.query.Options == nil {
840					rowCount, err = tx.Update(ctx, NewStatement(UpdateBarSetFoo))
841				} else {
842					rowCount, err = tx.UpdateWithOptions(ctx, NewStatement(UpdateBarSetFoo), tt.query)
843				}
844				if got, want := rowCount, int64(5); got != want {
845					t.Fatalf("Incorrect updated row count: got %v, want %v", got, want)
846				}
847				return err
848			})
849			if err != nil {
850				t.Fatalf("Failed to update rows: %v", err)
851			}
852			checkReqsForQueryOptions(t, server.TestSpanner, tt.want)
853		})
854	}
855}
856
857func TestClient_ReadWriteTransactionWithOptions(t *testing.T) {
858	_, client, teardown := setupMockedTestServer(t)
859	defer teardown()
860	ctx := context.Background()
861	resp, err := client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
862		iter := tx.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
863		defer iter.Stop()
864		rowCount := int64(0)
865		for {
866			row, err := iter.Next()
867			if err == iterator.Done {
868				break
869			}
870			if err != nil {
871				return err
872			}
873			var singerID, albumID int64
874			var albumTitle string
875			if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil {
876				return err
877			}
878			rowCount++
879		}
880		if rowCount != SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount {
881			return status.Errorf(codes.FailedPrecondition, "Row count mismatch, got %v, expected %v", rowCount, SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount)
882		}
883		return nil
884	}, TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true}})
885	if err != nil {
886		t.Fatalf("Failed to execute the transaction: %s", err)
887	}
888	if got, want := resp.CommitStats.MutationCount, int64(1); got != want {
889		t.Fatalf("Mismatch mutation count - got: %d, want: %d", got, want)
890	}
891}
892
893func TestClient_ReadWriteStmtBasedTransactionWithOptions(t *testing.T) {
894	_, client, teardown := setupMockedTestServer(t)
895	defer teardown()
896	ctx := context.Background()
897	tx, err := NewReadWriteStmtBasedTransactionWithOptions(
898		ctx,
899		client,
900		TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true}})
901	if err != nil {
902		t.Fatalf("Unexpected error when creating transaction: %v", err)
903	}
904
905	iter := tx.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
906	defer iter.Stop()
907	rowCount := int64(0)
908	for {
909		row, err := iter.Next()
910		if err == iterator.Done {
911			break
912		}
913		if err != nil {
914			t.Fatalf("Unexpected error when fetching query results: %v", err)
915		}
916		var singerID, albumID int64
917		var albumTitle string
918		if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil {
919			t.Fatalf("Unexpected error when getting query data: %v", err)
920		}
921		rowCount++
922	}
923	resp, err := tx.CommitWithReturnResp(ctx)
924	if err != nil {
925		t.Fatalf("Unexpected error when committing transaction: %v", err)
926	}
927	if rowCount != SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount {
928		t.Errorf("Row count mismatch, got %v, expected %v", rowCount, SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount)
929	}
930	if got, want := resp.CommitStats.MutationCount, int64(1); got != want {
931		t.Fatalf("Mismatch mutation count - got: %d, want: %d", got, want)
932	}
933}
934
935func TestClient_ReadWriteTransaction_DoNotLeakSessionOnPanic(t *testing.T) {
936	// Make sure that there is always only one session in the pool.
937	sc := SessionPoolConfig{
938		MinOpened: 1,
939		MaxOpened: 1,
940	}
941	_, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{SessionPoolConfig: sc})
942	defer teardown()
943	ctx := context.Background()
944
945	// If a panic occurs during a transaction, the session will not leak.
946	func() {
947		defer func() { recover() }()
948
949		_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
950			panic("cause panic")
951			return nil
952		})
953		if err != nil {
954			t.Fatalf("Unexpected error during transaction: %v", err)
955		}
956	}()
957
958	if g, w := client.idleSessions.idleList.Len(), 1; g != w {
959		t.Fatalf("idle session count mismatch.\nGot: %v\nWant: %v", g, w)
960	}
961}
962
963func TestClient_SessionNotFound(t *testing.T) {
964	// Ensure we always have at least one session in the pool.
965	sc := SessionPoolConfig{
966		MinOpened: 1,
967	}
968	server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{SessionPoolConfig: sc})
969	defer teardown()
970	ctx := context.Background()
971	for {
972		client.idleSessions.mu.Lock()
973		numSessions := client.idleSessions.idleList.Len()
974		client.idleSessions.mu.Unlock()
975		if numSessions > 0 {
976			break
977		}
978		time.After(time.Millisecond)
979	}
980	// Remove the session from the server without the pool knowing it.
981	_, err := server.TestSpanner.DeleteSession(ctx, &sppb.DeleteSessionRequest{Name: client.idleSessions.idleList.Front().Value.(*session).id})
982	if err != nil {
983		t.Fatalf("Failed to delete session unexpectedly: %v", err)
984	}
985
986	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
987		iter := tx.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
988		defer iter.Stop()
989		rowCount := int64(0)
990		for {
991			row, err := iter.Next()
992			if err == iterator.Done {
993				break
994			}
995			if err != nil {
996				return err
997			}
998			var singerID, albumID int64
999			var albumTitle string
1000			if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil {
1001				return err
1002			}
1003			rowCount++
1004		}
1005		if rowCount != SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount {
1006			return spannerErrorf(codes.FailedPrecondition, "Row count mismatch, got %v, expected %v", rowCount, SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount)
1007		}
1008		return nil
1009	})
1010	if err != nil {
1011		t.Fatalf("Unexpected error during transaction: %v", err)
1012	}
1013}
1014
1015func TestClient_ReadWriteTransactionExecuteStreamingSqlAborted(t *testing.T) {
1016	t.Parallel()
1017	if err := testReadWriteTransaction(t, map[string]SimulatedExecutionTime{
1018		MethodExecuteStreamingSql: {Errors: []error{status.Error(codes.Aborted, "Transaction aborted")}},
1019	}, 2); err != nil {
1020		t.Fatal(err)
1021	}
1022}
1023
1024func TestClient_ReadWriteTransaction_UnavailableOnBeginTransaction(t *testing.T) {
1025	t.Parallel()
1026	if err := testReadWriteTransaction(t, map[string]SimulatedExecutionTime{
1027		MethodBeginTransaction: {Errors: []error{status.Error(codes.Unavailable, "Unavailable")}},
1028	}, 1); err != nil {
1029		t.Fatal(err)
1030	}
1031}
1032
1033func TestClient_ReadWriteTransaction_UnavailableOnBeginAndAbortOnCommit(t *testing.T) {
1034	if err := testReadWriteTransaction(t, map[string]SimulatedExecutionTime{
1035		MethodBeginTransaction:  {Errors: []error{status.Error(codes.Unavailable, "Unavailable")}},
1036		MethodCommitTransaction: {Errors: []error{status.Error(codes.Aborted, "Aborted")}},
1037	}, 2); err != nil {
1038		t.Fatal(err)
1039	}
1040}
1041
1042func TestClient_ReadWriteTransaction_UnavailableOnExecuteStreamingSql(t *testing.T) {
1043	t.Parallel()
1044	if err := testReadWriteTransaction(t, map[string]SimulatedExecutionTime{
1045		MethodExecuteStreamingSql: {Errors: []error{status.Error(codes.Unavailable, "Unavailable")}},
1046	}, 1); err != nil {
1047		t.Fatal(err)
1048	}
1049}
1050
1051func TestClient_ReadWriteTransaction_UnavailableOnBeginAndExecuteStreamingSqlAndTwiceAbortOnCommit(t *testing.T) {
1052	t.Parallel()
1053	if err := testReadWriteTransaction(t, map[string]SimulatedExecutionTime{
1054		MethodBeginTransaction:    {Errors: []error{status.Error(codes.Unavailable, "Unavailable")}},
1055		MethodExecuteStreamingSql: {Errors: []error{status.Error(codes.Unavailable, "Unavailable")}},
1056		MethodCommitTransaction:   {Errors: []error{status.Error(codes.Aborted, "Aborted"), status.Error(codes.Aborted, "Aborted")}},
1057	}, 3); err != nil {
1058		t.Fatal(err)
1059	}
1060}
1061
1062func TestClient_ReadWriteTransaction_CommitAborted(t *testing.T) {
1063	t.Parallel()
1064	server, client, teardown := setupMockedTestServer(t)
1065	server.TestSpanner.PutExecutionTime(MethodCommitTransaction, SimulatedExecutionTime{
1066		Errors: []error{status.Error(codes.Aborted, "Aborted")},
1067	})
1068	defer teardown()
1069	ctx := context.Background()
1070	attempts := 0
1071	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
1072		attempts++
1073		_, err := tx.Update(ctx, Statement{SQL: UpdateBarSetFoo})
1074		if err != nil {
1075			return err
1076		}
1077		return nil
1078	})
1079	if err != nil {
1080		t.Fatal(err)
1081	}
1082	if g, w := attempts, 2; g != w {
1083		t.Fatalf("attempt count mismatch:\nWant: %v\nGot: %v", w, g)
1084	}
1085}
1086
1087func TestClient_ReadWriteTransaction_DMLAborted(t *testing.T) {
1088	t.Parallel()
1089	server, client, teardown := setupMockedTestServer(t)
1090	server.TestSpanner.PutExecutionTime(MethodExecuteSql, SimulatedExecutionTime{
1091		Errors: []error{status.Error(codes.Aborted, "Aborted")},
1092	})
1093	defer teardown()
1094	ctx := context.Background()
1095	attempts := 0
1096	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
1097		attempts++
1098		_, err := tx.Update(ctx, Statement{SQL: UpdateBarSetFoo})
1099		if err != nil {
1100			return err
1101		}
1102		return nil
1103	})
1104	if err != nil {
1105		t.Fatal(err)
1106	}
1107	if g, w := attempts, 2; g != w {
1108		t.Fatalf("attempt count mismatch:\nWant: %v\nGot: %v", w, g)
1109	}
1110}
1111
1112func TestClient_ReadWriteTransaction_BatchDMLAborted(t *testing.T) {
1113	t.Parallel()
1114	server, client, teardown := setupMockedTestServer(t)
1115	server.TestSpanner.PutExecutionTime(MethodExecuteBatchDml, SimulatedExecutionTime{
1116		Errors: []error{status.Error(codes.Aborted, "Aborted")},
1117	})
1118	defer teardown()
1119	ctx := context.Background()
1120	attempts := 0
1121	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
1122		attempts++
1123		_, err := tx.BatchUpdate(ctx, []Statement{{SQL: UpdateBarSetFoo}})
1124		if err != nil {
1125			return err
1126		}
1127		return nil
1128	})
1129	if err != nil {
1130		t.Fatal(err)
1131	}
1132	if g, w := attempts, 2; g != w {
1133		t.Fatalf("attempt count mismatch:\nWant: %v\nGot: %v", w, g)
1134	}
1135}
1136
1137func TestClient_ReadWriteTransaction_BatchDMLAbortedHalfway(t *testing.T) {
1138	t.Parallel()
1139	server, client, teardown := setupMockedTestServer(t)
1140	defer teardown()
1141	abortedStatement := "UPDATE FOO_ABORTED SET BAR=1 WHERE ID=2"
1142	server.TestSpanner.PutStatementResult(
1143		abortedStatement,
1144		&StatementResult{
1145			Type: StatementResultError,
1146			Err:  status.Error(codes.Aborted, "Statement was aborted"),
1147		},
1148	)
1149	ctx := context.Background()
1150	var updateCounts []int64
1151	attempts := 0
1152	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
1153		attempts++
1154		if attempts > 1 {
1155			// Replace the aborted result with a real result to prevent the
1156			// transaction from aborting indefinitely.
1157			server.TestSpanner.PutStatementResult(
1158				abortedStatement,
1159				&StatementResult{
1160					Type:        StatementResultUpdateCount,
1161					UpdateCount: 3,
1162				},
1163			)
1164		}
1165		var err error
1166		updateCounts, err = tx.BatchUpdate(ctx, []Statement{{SQL: abortedStatement}, {SQL: UpdateBarSetFoo}})
1167		if err != nil {
1168			return err
1169		}
1170		return nil
1171	})
1172	if err != nil {
1173		t.Fatal(err)
1174	}
1175	if g, w := attempts, 2; g != w {
1176		t.Fatalf("attempt count mismatch:\nWant: %v\nGot: %v", w, g)
1177	}
1178	if g, w := updateCounts, []int64{3, UpdateBarSetFooRowCount}; !testEqual(w, g) {
1179		t.Fatalf("update count mismatch\nWant: %v\nGot: %v", w, g)
1180	}
1181}
1182
1183func TestClient_ReadWriteTransaction_QueryAborted(t *testing.T) {
1184	t.Parallel()
1185	server, client, teardown := setupMockedTestServer(t)
1186	server.TestSpanner.PutExecutionTime(MethodExecuteStreamingSql, SimulatedExecutionTime{
1187		Errors: []error{status.Error(codes.Aborted, "Aborted")},
1188	})
1189	defer teardown()
1190	ctx := context.Background()
1191	attempts := 0
1192	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
1193		attempts++
1194		iter := tx.Query(ctx, Statement{SQL: SelectFooFromBar})
1195		defer iter.Stop()
1196		for {
1197			_, err := iter.Next()
1198			if err == iterator.Done {
1199				break
1200			}
1201			if err != nil {
1202				return err
1203			}
1204		}
1205		return nil
1206	})
1207	if err != nil {
1208		t.Fatal(err)
1209	}
1210	if g, w := attempts, 2; g != w {
1211		t.Fatalf("attempt count mismatch:\nWant: %v\nGot: %v", w, g)
1212	}
1213}
1214
1215func TestClient_ReadWriteTransaction_AbortedOnExecuteStreamingSqlAndCommit(t *testing.T) {
1216	t.Parallel()
1217	if err := testReadWriteTransaction(t, map[string]SimulatedExecutionTime{
1218		MethodExecuteStreamingSql: {Errors: []error{status.Error(codes.Aborted, "Aborted")}},
1219		MethodCommitTransaction:   {Errors: []error{status.Error(codes.Aborted, "Aborted"), status.Error(codes.Aborted, "Aborted")}},
1220	}, 4); err != nil {
1221		t.Fatal(err)
1222	}
1223}
1224
1225func TestClient_ReadWriteTransactionCommitAbortedAndUnavailable(t *testing.T) {
1226	t.Parallel()
1227	if err := testReadWriteTransaction(t, map[string]SimulatedExecutionTime{
1228		MethodCommitTransaction: {
1229			Errors: []error{
1230				status.Error(codes.Aborted, "Transaction aborted"),
1231				status.Error(codes.Unavailable, "Unavailable"),
1232			},
1233		},
1234	}, 2); err != nil {
1235		t.Fatal(err)
1236	}
1237}
1238
1239func TestClient_ReadWriteTransactionCommitAlreadyExists(t *testing.T) {
1240	t.Parallel()
1241	if err := testReadWriteTransaction(t, map[string]SimulatedExecutionTime{
1242		MethodCommitTransaction: {Errors: []error{status.Error(codes.AlreadyExists, "A row with this key already exists")}},
1243	}, 1); err != nil {
1244		if status.Code(err) != codes.AlreadyExists {
1245			t.Fatalf("Got unexpected error %v, expected %v", err, codes.AlreadyExists)
1246		}
1247	} else {
1248		t.Fatalf("Missing expected exception")
1249	}
1250}
1251
1252func testReadWriteTransaction(t *testing.T, executionTimes map[string]SimulatedExecutionTime, expectedAttempts int) error {
1253	return testReadWriteTransactionWithConfig(t, ClientConfig{SessionPoolConfig: DefaultSessionPoolConfig}, executionTimes, expectedAttempts)
1254}
1255
1256func testReadWriteTransactionWithConfig(t *testing.T, config ClientConfig, executionTimes map[string]SimulatedExecutionTime, expectedAttempts int) error {
1257	server, client, teardown := setupMockedTestServer(t)
1258	defer teardown()
1259	for method, exec := range executionTimes {
1260		server.TestSpanner.PutExecutionTime(method, exec)
1261	}
1262	ctx := context.Background()
1263	var attempts int
1264	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
1265		attempts++
1266		iter := tx.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
1267		defer iter.Stop()
1268		rowCount := int64(0)
1269		for {
1270			row, err := iter.Next()
1271			if err == iterator.Done {
1272				break
1273			}
1274			if err != nil {
1275				return err
1276			}
1277			var singerID, albumID int64
1278			var albumTitle string
1279			if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil {
1280				return err
1281			}
1282			rowCount++
1283		}
1284		if rowCount != SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount {
1285			return status.Errorf(codes.FailedPrecondition, "Row count mismatch, got %v, expected %v", rowCount, SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount)
1286		}
1287		return nil
1288	})
1289	if err != nil {
1290		return err
1291	}
1292	if expectedAttempts != attempts {
1293		t.Fatalf("unexpected number of attempts: %d, expected %d", attempts, expectedAttempts)
1294	}
1295	return nil
1296}
1297
1298func TestClient_ApplyAtLeastOnce(t *testing.T) {
1299	t.Parallel()
1300	server, client, teardown := setupMockedTestServer(t)
1301	defer teardown()
1302	ms := []*Mutation{
1303		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
1304		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
1305	}
1306	server.TestSpanner.PutExecutionTime(MethodCommitTransaction,
1307		SimulatedExecutionTime{
1308			Errors: []error{status.Error(codes.Aborted, "Transaction aborted")},
1309		})
1310	_, err := client.Apply(context.Background(), ms, ApplyAtLeastOnce())
1311	if err != nil {
1312		t.Fatal(err)
1313	}
1314}
1315
1316func TestClient_ApplyAtLeastOnceReuseSession(t *testing.T) {
1317	t.Parallel()
1318	server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{
1319		SessionPoolConfig: SessionPoolConfig{
1320			MinOpened:           0,
1321			WriteSessions:       0.0,
1322			TrackSessionHandles: true,
1323		},
1324	})
1325	defer teardown()
1326	sp := client.idleSessions
1327	ms := []*Mutation{
1328		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
1329		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
1330	}
1331	for i := 0; i < 10; i++ {
1332		_, err := client.Apply(context.Background(), ms, ApplyAtLeastOnce())
1333		if err != nil {
1334			t.Fatal(err)
1335		}
1336		sp.mu.Lock()
1337		if g, w := uint64(sp.idleList.Len())+sp.createReqs, sp.incStep; g != w {
1338			t.Fatalf("idle session count mismatch:\nGot: %v\nWant: %v", g, w)
1339		}
1340		if g, w := uint64(len(server.TestSpanner.DumpSessions())), sp.incStep; g != w {
1341			t.Fatalf("server session count mismatch:\nGot: %v\nWant: %v", g, w)
1342		}
1343		sp.mu.Unlock()
1344	}
1345	// There should be no sessions marked as checked out.
1346	sp.mu.Lock()
1347	g, w := sp.trackedSessionHandles.Len(), 0
1348	sp.mu.Unlock()
1349	if g != w {
1350		t.Fatalf("checked out sessions count mismatch:\nGot: %v\nWant: %v", g, w)
1351	}
1352}
1353
1354func TestClient_ApplyAtLeastOnceInvalidArgument(t *testing.T) {
1355	t.Parallel()
1356	server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{
1357		SessionPoolConfig: SessionPoolConfig{
1358			MinOpened:           0,
1359			WriteSessions:       0.0,
1360			TrackSessionHandles: true,
1361		},
1362	})
1363	defer teardown()
1364	sp := client.idleSessions
1365	ms := []*Mutation{
1366		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
1367		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
1368	}
1369	for i := 0; i < 10; i++ {
1370		server.TestSpanner.PutExecutionTime(MethodCommitTransaction,
1371			SimulatedExecutionTime{
1372				Errors: []error{status.Error(codes.InvalidArgument, "Invalid data")},
1373			})
1374		_, err := client.Apply(context.Background(), ms, ApplyAtLeastOnce())
1375		if status.Code(err) != codes.InvalidArgument {
1376			t.Fatal(err)
1377		}
1378		sp.mu.Lock()
1379		if g, w := uint64(sp.idleList.Len())+sp.createReqs, sp.incStep; g != w {
1380			t.Fatalf("idle session count mismatch:\nGot: %v\nWant: %v", g, w)
1381		}
1382		if g, w := uint64(len(server.TestSpanner.DumpSessions())), sp.incStep; g != w {
1383			t.Fatalf("server session count mismatch:\nGot: %v\nWant: %v", g, w)
1384		}
1385		sp.mu.Unlock()
1386	}
1387	// There should be no sessions marked as checked out.
1388	client.idleSessions.mu.Lock()
1389	g, w := client.idleSessions.trackedSessionHandles.Len(), 0
1390	client.idleSessions.mu.Unlock()
1391	if g != w {
1392		t.Fatalf("checked out sessions count mismatch:\nGot: %v\nWant: %v", g, w)
1393	}
1394}
1395
1396func TestReadWriteTransaction_ErrUnexpectedEOF(t *testing.T) {
1397	t.Parallel()
1398	_, client, teardown := setupMockedTestServer(t)
1399	defer teardown()
1400	ctx := context.Background()
1401	var attempts int
1402	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
1403		attempts++
1404		iter := tx.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
1405		defer iter.Stop()
1406		for {
1407			row, err := iter.Next()
1408			if err == iterator.Done {
1409				break
1410			}
1411			if err != nil {
1412				return err
1413			}
1414			var singerID, albumID int64
1415			var albumTitle string
1416			if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil {
1417				return err
1418			}
1419		}
1420		return io.ErrUnexpectedEOF
1421	})
1422	if err != io.ErrUnexpectedEOF {
1423		t.Fatalf("Missing expected error %v, got %v", io.ErrUnexpectedEOF, err)
1424	}
1425	if attempts != 1 {
1426		t.Fatalf("unexpected number of attempts: %d, expected %d", attempts, 1)
1427	}
1428}
1429
1430func TestReadWriteTransaction_WrapError(t *testing.T) {
1431	t.Parallel()
1432	server, client, teardown := setupMockedTestServer(t)
1433	defer teardown()
1434	// Abort the transaction on both the query as well as commit.
1435	// The first abort error will be wrapped. The client will unwrap the cause
1436	// of the error and retry the transaction. The aborted error on commit
1437	// will not be wrapped, but will also be recognized by the client as an
1438	// abort that should be retried.
1439	server.TestSpanner.PutExecutionTime(MethodExecuteStreamingSql,
1440		SimulatedExecutionTime{
1441			Errors: []error{status.Error(codes.Aborted, "Transaction aborted")},
1442		})
1443	server.TestSpanner.PutExecutionTime(MethodCommitTransaction,
1444		SimulatedExecutionTime{
1445			Errors: []error{status.Error(codes.Aborted, "Transaction aborted")},
1446		})
1447	msg := "query failed"
1448	numAttempts := 0
1449	ctx := context.Background()
1450	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
1451		numAttempts++
1452		iter := tx.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
1453		defer iter.Stop()
1454		for {
1455			_, err := iter.Next()
1456			if err == iterator.Done {
1457				break
1458			}
1459			if err != nil {
1460				// Wrap the error in another error that implements the
1461				// (xerrors|errors).Wrapper interface.
1462				return &wrappedTestError{err, msg}
1463			}
1464		}
1465		return nil
1466	})
1467	if err != nil {
1468		t.Fatalf("Unexpected error\nGot: %v\nWant: nil", err)
1469	}
1470	if g, w := numAttempts, 3; g != w {
1471		t.Fatalf("Number of transaction attempts mismatch\nGot: %d\nWant: %d", w, w)
1472	}
1473
1474	// Execute a transaction that returns a non-retryable error that is
1475	// wrapped in a custom error. The transaction should return the custom
1476	// error.
1477	server.TestSpanner.PutExecutionTime(MethodExecuteStreamingSql,
1478		SimulatedExecutionTime{
1479			Errors: []error{status.Error(codes.NotFound, "Table not found")},
1480		})
1481	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
1482		numAttempts++
1483		iter := tx.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
1484		defer iter.Stop()
1485		for {
1486			_, err := iter.Next()
1487			if err == iterator.Done {
1488				break
1489			}
1490			if err != nil {
1491				// Wrap the error in another error that implements the
1492				// (xerrors|errors).Wrapper interface.
1493				return &wrappedTestError{err, msg}
1494			}
1495		}
1496		return nil
1497	})
1498	if err == nil || err.Error() != msg {
1499		t.Fatalf("Unexpected error\nGot: %v\nWant: %v", err, msg)
1500	}
1501}
1502
1503func TestReadWriteTransaction_WrapSessionNotFoundError(t *testing.T) {
1504	t.Parallel()
1505	server, client, teardown := setupMockedTestServer(t)
1506	defer teardown()
1507	server.TestSpanner.PutExecutionTime(MethodBeginTransaction,
1508		SimulatedExecutionTime{
1509			Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")},
1510		})
1511	server.TestSpanner.PutExecutionTime(MethodExecuteStreamingSql,
1512		SimulatedExecutionTime{
1513			Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")},
1514		})
1515	server.TestSpanner.PutExecutionTime(MethodCommitTransaction,
1516		SimulatedExecutionTime{
1517			Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")},
1518		})
1519	msg := "query failed"
1520	numAttempts := 0
1521	ctx := context.Background()
1522	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
1523		numAttempts++
1524		iter := tx.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
1525		defer iter.Stop()
1526		for {
1527			_, err := iter.Next()
1528			if err == iterator.Done {
1529				break
1530			}
1531			if err != nil {
1532				// Wrap the error in another error that implements the
1533				// (xerrors|errors).Wrapper interface.
1534				return &wrappedTestError{err, msg}
1535			}
1536		}
1537		return nil
1538	})
1539	if err != nil {
1540		t.Fatalf("Unexpected error\nGot: %v\nWant: nil", err)
1541	}
1542	// We want 3 attempts. The 'Session not found' error on BeginTransaction
1543	// will not retry the entire transaction, which means that we will have two
1544	// failed attempts and then a successful attempt.
1545	if g, w := numAttempts, 3; g != w {
1546		t.Fatalf("Number of transaction attempts mismatch\nGot: %d\nWant: %d", g, w)
1547	}
1548}
1549
1550func TestClient_WriteStructWithPointers(t *testing.T) {
1551	t.Parallel()
1552	server, client, teardown := setupMockedTestServer(t)
1553	defer teardown()
1554	type T struct {
1555		ID    int64
1556		Col1  *string
1557		Col2  []*string
1558		Col3  *bool
1559		Col4  []*bool
1560		Col5  *int64
1561		Col6  []*int64
1562		Col7  *float64
1563		Col8  []*float64
1564		Col9  *time.Time
1565		Col10 []*time.Time
1566		Col11 *civil.Date
1567		Col12 []*civil.Date
1568	}
1569	t1 := T{
1570		ID:    1,
1571		Col2:  []*string{nil},
1572		Col4:  []*bool{nil},
1573		Col6:  []*int64{nil},
1574		Col8:  []*float64{nil},
1575		Col10: []*time.Time{nil},
1576		Col12: []*civil.Date{nil},
1577	}
1578	s := "foo"
1579	b := true
1580	i := int64(100)
1581	f := 3.14
1582	tm := time.Now()
1583	d := civil.DateOf(time.Now())
1584	t2 := T{
1585		ID:    2,
1586		Col1:  &s,
1587		Col2:  []*string{&s},
1588		Col3:  &b,
1589		Col4:  []*bool{&b},
1590		Col5:  &i,
1591		Col6:  []*int64{&i},
1592		Col7:  &f,
1593		Col8:  []*float64{&f},
1594		Col9:  &tm,
1595		Col10: []*time.Time{&tm},
1596		Col11: &d,
1597		Col12: []*civil.Date{&d},
1598	}
1599	m1, err := InsertStruct("Tab", &t1)
1600	if err != nil {
1601		t.Fatal(err)
1602	}
1603	m2, err := InsertStruct("Tab", &t2)
1604	if err != nil {
1605		t.Fatal(err)
1606	}
1607	_, err = client.Apply(context.Background(), []*Mutation{m1, m2})
1608	if err != nil {
1609		t.Fatal(err)
1610	}
1611	requests := drainRequestsFromServer(server.TestSpanner)
1612	for _, req := range requests {
1613		if commit, ok := req.(*sppb.CommitRequest); ok {
1614			if g, w := len(commit.Mutations), 2; w != g {
1615				t.Fatalf("mutation count mismatch\nGot: %v\nWant: %v", g, w)
1616			}
1617			insert := commit.Mutations[0].GetInsert()
1618			// The first insert should contain NULL values and arrays
1619			// containing exactly one NULL element.
1620			for i := 1; i < len(insert.Values[0].Values); i += 2 {
1621				// The non-array columns should contain NULL values.
1622				g, w := insert.Values[0].Values[i].GetKind(), &structpb.Value_NullValue{}
1623				if _, ok := g.(*structpb.Value_NullValue); !ok {
1624					t.Fatalf("type mismatch\nGot: %v\nWant: %v", g, w)
1625				}
1626				// The array columns should not be NULL.
1627				g, wList := insert.Values[0].Values[i+1].GetKind(), &structpb.Value_ListValue{}
1628				if _, ok := g.(*structpb.Value_ListValue); !ok {
1629					t.Fatalf("type mismatch\nGot: %v\nWant: %v", g, wList)
1630				}
1631				// The array should contain 1 NULL value.
1632				if gLength, wLength := len(insert.Values[0].Values[i+1].GetListValue().Values), 1; gLength != wLength {
1633					t.Fatalf("list value length mismatch\nGot: %v\nWant: %v", gLength, wLength)
1634				}
1635				g, w = insert.Values[0].Values[i+1].GetListValue().Values[0].GetKind(), &structpb.Value_NullValue{}
1636				if _, ok := g.(*structpb.Value_NullValue); !ok {
1637					t.Fatalf("type mismatch\nGot: %v\nWant: %v", g, w)
1638				}
1639			}
1640
1641			// The second insert should contain all non-NULL values.
1642			insert = commit.Mutations[1].GetInsert()
1643			for i := 1; i < len(insert.Values[0].Values); i += 2 {
1644				// The non-array columns should contain non-NULL values.
1645				g := insert.Values[0].Values[i].GetKind()
1646				if _, ok := g.(*structpb.Value_NullValue); ok {
1647					t.Fatalf("type mismatch\nGot: %v\nWant: non-NULL value", g)
1648				}
1649				// The array columns should also be non-NULL.
1650				g, wList := insert.Values[0].Values[i+1].GetKind(), &structpb.Value_ListValue{}
1651				if _, ok := g.(*structpb.Value_ListValue); !ok {
1652					t.Fatalf("type mismatch\nGot: %v\nWant: %v", g, wList)
1653				}
1654				// The array should contain exactly 1 non-NULL value.
1655				if gLength, wLength := len(insert.Values[0].Values[i+1].GetListValue().Values), 1; gLength != wLength {
1656					t.Fatalf("list value length mismatch\nGot: %v\nWant: %v", gLength, wLength)
1657				}
1658				g = insert.Values[0].Values[i+1].GetListValue().Values[0].GetKind()
1659				if _, ok := g.(*structpb.Value_NullValue); ok {
1660					t.Fatalf("type mismatch\nGot: %v\nWant: non-NULL value", g)
1661				}
1662			}
1663		}
1664	}
1665}
1666
1667func TestClient_WriteStructWithCustomTypes(t *testing.T) {
1668	t.Parallel()
1669	server, client, teardown := setupMockedTestServer(t)
1670	defer teardown()
1671	type CustomString string
1672	type CustomBool bool
1673	type CustomInt64 int64
1674	type CustomFloat64 float64
1675	type CustomTime time.Time
1676	type CustomDate civil.Date
1677	type T struct {
1678		ID    int64
1679		Col1  CustomString
1680		Col2  []CustomString
1681		Col3  CustomBool
1682		Col4  []CustomBool
1683		Col5  CustomInt64
1684		Col6  []CustomInt64
1685		Col7  CustomFloat64
1686		Col8  []CustomFloat64
1687		Col9  CustomTime
1688		Col10 []CustomTime
1689		Col11 CustomDate
1690		Col12 []CustomDate
1691	}
1692	t1 := T{
1693		ID:    1,
1694		Col2:  []CustomString{},
1695		Col4:  []CustomBool{},
1696		Col6:  []CustomInt64{},
1697		Col8:  []CustomFloat64{},
1698		Col10: []CustomTime{},
1699		Col12: []CustomDate{},
1700	}
1701	t2 := T{
1702		ID:    2,
1703		Col1:  "foo",
1704		Col2:  []CustomString{"foo"},
1705		Col3:  true,
1706		Col4:  []CustomBool{true},
1707		Col5:  100,
1708		Col6:  []CustomInt64{100},
1709		Col7:  3.14,
1710		Col8:  []CustomFloat64{3.14},
1711		Col9:  CustomTime(time.Now()),
1712		Col10: []CustomTime{CustomTime(time.Now())},
1713		Col11: CustomDate(civil.DateOf(time.Now())),
1714		Col12: []CustomDate{CustomDate(civil.DateOf(time.Now()))},
1715	}
1716	m1, err := InsertStruct("Tab", &t1)
1717	if err != nil {
1718		t.Fatal(err)
1719	}
1720	m2, err := InsertStruct("Tab", &t2)
1721	if err != nil {
1722		t.Fatal(err)
1723	}
1724	_, err = client.Apply(context.Background(), []*Mutation{m1, m2})
1725	if err != nil {
1726		t.Fatal(err)
1727	}
1728	requests := drainRequestsFromServer(server.TestSpanner)
1729	for _, req := range requests {
1730		if commit, ok := req.(*sppb.CommitRequest); ok {
1731			if g, w := len(commit.Mutations), 2; w != g {
1732				t.Fatalf("mutation count mismatch\nGot: %v\nWant: %v", g, w)
1733			}
1734			insert1 := commit.Mutations[0].GetInsert()
1735			row1 := insert1.Values[0]
1736			// The first insert should contain empty values and empty arrays
1737			for i := 1; i < len(row1.Values); i += 2 {
1738				// The non-array columns should contain empty values.
1739				g := row1.Values[i].GetKind()
1740				if _, ok := g.(*structpb.Value_NullValue); ok {
1741					t.Fatalf("type mismatch\nGot: %v\nWant: non-NULL value", g)
1742				}
1743				// The array columns should not be NULL.
1744				g, wList := row1.Values[i+1].GetKind(), &structpb.Value_ListValue{}
1745				if _, ok := g.(*structpb.Value_ListValue); !ok {
1746					t.Fatalf("type mismatch\nGot: %v\nWant: %v", g, wList)
1747				}
1748			}
1749
1750			// The second insert should contain all non-NULL values.
1751			insert2 := commit.Mutations[1].GetInsert()
1752			row2 := insert2.Values[0]
1753			for i := 1; i < len(row2.Values); i += 2 {
1754				// The non-array columns should contain non-NULL values.
1755				g := row2.Values[i].GetKind()
1756				if _, ok := g.(*structpb.Value_NullValue); ok {
1757					t.Fatalf("type mismatch\nGot: %v\nWant: non-NULL value", g)
1758				}
1759				// The array columns should also be non-NULL.
1760				g, wList := row2.Values[i+1].GetKind(), &structpb.Value_ListValue{}
1761				if _, ok := g.(*structpb.Value_ListValue); !ok {
1762					t.Fatalf("type mismatch\nGot: %v\nWant: %v", g, wList)
1763				}
1764				// The array should contain exactly 1 non-NULL value.
1765				if gLength, wLength := len(row2.Values[i+1].GetListValue().Values), 1; gLength != wLength {
1766					t.Fatalf("list value length mismatch\nGot: %v\nWant: %v", gLength, wLength)
1767				}
1768				g = row2.Values[i+1].GetListValue().Values[0].GetKind()
1769				if _, ok := g.(*structpb.Value_NullValue); ok {
1770					t.Fatalf("type mismatch\nGot: %v\nWant: non-NULL value", g)
1771				}
1772			}
1773		}
1774	}
1775}
1776
1777func TestReadWriteTransaction_ContextTimeoutDuringCommit(t *testing.T) {
1778	t.Parallel()
1779	server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{
1780		SessionPoolConfig: SessionPoolConfig{
1781			MinOpened:     1,
1782			WriteSessions: 0,
1783		},
1784	})
1785	defer teardown()
1786
1787	// Wait until session creation has seized so that
1788	// context timeout won't happen while a session is being created.
1789	waitFor(t, func() error {
1790		sp := client.idleSessions
1791		sp.mu.Lock()
1792		defer sp.mu.Unlock()
1793		if sp.createReqs != 0 {
1794			return fmt.Errorf("%d sessions are still in creation", sp.createReqs)
1795		}
1796		return nil
1797	})
1798
1799	server.TestSpanner.PutExecutionTime(MethodCommitTransaction,
1800		SimulatedExecutionTime{
1801			MinimumExecutionTime: time.Minute,
1802		})
1803	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
1804	defer cancel()
1805	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
1806		tx.BufferWrite([]*Mutation{Insert("FOO", []string{"ID", "NAME"}, []interface{}{int64(1), "bar"})})
1807		return nil
1808	})
1809	errContext, _ := context.WithTimeout(context.Background(), -time.Second)
1810	w := toSpannerErrorWithCommitInfo(errContext.Err(), true).(*Error)
1811	var se *Error
1812	if !errorAs(err, &se) {
1813		t.Fatalf("Error mismatch\nGot: %v\nWant: %v", err, w)
1814	}
1815	if se.GRPCStatus().Code() != w.GRPCStatus().Code() {
1816		t.Fatalf("Error status mismatch:\nGot: %v\nWant: %v", se.GRPCStatus(), w.GRPCStatus())
1817	}
1818	if se.Error() != w.Error() {
1819		t.Fatalf("Error message mismatch:\nGot %s\nWant: %s", se.Error(), w.Error())
1820	}
1821	var outcome *TransactionOutcomeUnknownError
1822	if !errorAs(err, &outcome) {
1823		t.Fatalf("Missing wrapped TransactionOutcomeUnknownError error")
1824	}
1825}
1826
1827func TestFailedCommit_NoRollback(t *testing.T) {
1828	t.Parallel()
1829	server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{
1830		SessionPoolConfig: SessionPoolConfig{
1831			MinOpened:     0,
1832			MaxOpened:     1,
1833			WriteSessions: 0,
1834		},
1835	})
1836	defer teardown()
1837	server.TestSpanner.PutExecutionTime(MethodCommitTransaction,
1838		SimulatedExecutionTime{
1839			Errors: []error{status.Errorf(codes.InvalidArgument, "Invalid mutations")},
1840		})
1841	_, err := client.Apply(context.Background(), []*Mutation{
1842		Insert("FOO", []string{"ID", "BAR"}, []interface{}{1, "value"}),
1843	})
1844	if got, want := status.Convert(err).Code(), codes.InvalidArgument; got != want {
1845		t.Fatalf("Error mismatch\nGot: %v\nWant: %v", got, want)
1846	}
1847	// The failed commit should not trigger a rollback after the commit.
1848	if _, err := shouldHaveReceived(server.TestSpanner, []interface{}{
1849		&sppb.BatchCreateSessionsRequest{},
1850		&sppb.BeginTransactionRequest{},
1851		&sppb.CommitRequest{},
1852	}); err != nil {
1853		t.Fatalf("Received RPCs mismatch: %v", err)
1854	}
1855}
1856
1857func TestFailedUpdate_ShouldRollback(t *testing.T) {
1858	t.Parallel()
1859	server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{
1860		SessionPoolConfig: SessionPoolConfig{
1861			MinOpened:     0,
1862			MaxOpened:     1,
1863			WriteSessions: 0,
1864		},
1865	})
1866	defer teardown()
1867	server.TestSpanner.PutExecutionTime(MethodExecuteSql,
1868		SimulatedExecutionTime{
1869			Errors: []error{status.Errorf(codes.InvalidArgument, "Invalid update")},
1870		})
1871	_, err := client.ReadWriteTransaction(context.Background(), func(ctx context.Context, tx *ReadWriteTransaction) error {
1872		_, err := tx.Update(ctx, NewStatement("UPDATE FOO SET BAR='value' WHERE ID=1"))
1873		return err
1874	})
1875	if got, want := status.Convert(err).Code(), codes.InvalidArgument; got != want {
1876		t.Fatalf("Error mismatch\nGot: %v\nWant: %v", got, want)
1877	}
1878	// The failed update should trigger a rollback.
1879	if _, err := shouldHaveReceived(server.TestSpanner, []interface{}{
1880		&sppb.BatchCreateSessionsRequest{},
1881		&sppb.BeginTransactionRequest{},
1882		&sppb.ExecuteSqlRequest{},
1883		&sppb.RollbackRequest{},
1884	}); err != nil {
1885		t.Fatalf("Received RPCs mismatch: %v", err)
1886	}
1887}
1888
1889func TestClient_NumChannels(t *testing.T) {
1890	t.Parallel()
1891
1892	configuredNumChannels := 8
1893	_, client, teardown := setupMockedTestServerWithConfig(
1894		t,
1895		ClientConfig{NumChannels: configuredNumChannels},
1896	)
1897	defer teardown()
1898	if g, w := client.sc.connPool.Num(), configuredNumChannels; g != w {
1899		t.Fatalf("NumChannels mismatch\nGot: %v\nWant: %v", g, w)
1900	}
1901}
1902
1903func TestClient_WithGRPCConnectionPool(t *testing.T) {
1904	t.Parallel()
1905
1906	configuredConnPool := 8
1907	_, client, teardown := setupMockedTestServerWithConfigAndClientOptions(
1908		t,
1909		ClientConfig{},
1910		[]option.ClientOption{option.WithGRPCConnectionPool(configuredConnPool)},
1911	)
1912	defer teardown()
1913	if g, w := client.sc.connPool.Num(), configuredConnPool; g != w {
1914		t.Fatalf("NumChannels mismatch\nGot: %v\nWant: %v", g, w)
1915	}
1916}
1917
1918func TestClient_WithGRPCConnectionPoolAndNumChannels(t *testing.T) {
1919	t.Parallel()
1920
1921	configuredNumChannels := 8
1922	configuredConnPool := 8
1923	_, client, teardown := setupMockedTestServerWithConfigAndClientOptions(
1924		t,
1925		ClientConfig{NumChannels: configuredNumChannels},
1926		[]option.ClientOption{option.WithGRPCConnectionPool(configuredConnPool)},
1927	)
1928	defer teardown()
1929	if g, w := client.sc.connPool.Num(), configuredConnPool; g != w {
1930		t.Fatalf("NumChannels mismatch\nGot: %v\nWant: %v", g, w)
1931	}
1932}
1933
1934func TestClient_WithGRPCConnectionPoolAndNumChannels_Misconfigured(t *testing.T) {
1935	t.Parallel()
1936
1937	// Deliberately misconfigure NumChannels and ConnPool.
1938	configuredNumChannels := 8
1939	configuredConnPool := 16
1940
1941	_, opts, serverTeardown := NewMockedSpannerInMemTestServer(t)
1942	defer serverTeardown()
1943	opts = append(opts, option.WithGRPCConnectionPool(configuredConnPool))
1944
1945	_, err := NewClientWithConfig(context.Background(), "projects/p/instances/i/databases/d", ClientConfig{NumChannels: configuredNumChannels}, opts...)
1946	msg := "Connection pool mismatch:"
1947	if err == nil {
1948		t.Fatalf("Error mismatch\nGot: nil\nWant: %s", msg)
1949	}
1950	var se *Error
1951	if ok := errorAs(err, &se); !ok {
1952		t.Fatalf("Error mismatch\nGot: %v\nWant: An instance of a Spanner error", err)
1953	}
1954	if g, w := se.GRPCStatus().Code(), codes.InvalidArgument; g != w {
1955		t.Fatalf("Error code mismatch\nGot: %v\nWant: %v", g, w)
1956	}
1957	if !strings.Contains(se.Error(), msg) {
1958		t.Fatalf("Error message mismatch\nGot: %s\nWant: %s", se.Error(), msg)
1959	}
1960}
1961
1962func TestClient_CallOptions(t *testing.T) {
1963	t.Parallel()
1964	co := &vkit.CallOptions{
1965		CreateSession: []gax.CallOption{
1966			gax.WithRetry(func() gax.Retryer {
1967				return gax.OnCodes([]codes.Code{
1968					codes.Unavailable, codes.DeadlineExceeded,
1969				}, gax.Backoff{
1970					Initial:    200 * time.Millisecond,
1971					Max:        30000 * time.Millisecond,
1972					Multiplier: 1.25,
1973				})
1974			}),
1975		},
1976	}
1977
1978	_, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{CallOptions: co})
1979	defer teardown()
1980
1981	c, err := client.sc.nextClient()
1982	if err != nil {
1983		t.Fatalf("failed to get a session client: %v", err)
1984	}
1985
1986	cs := &gax.CallSettings{}
1987	// This is the default retry setting.
1988	c.CallOptions.CreateSession[0].Resolve(cs)
1989	if got, want := fmt.Sprintf("%v", cs.Retry()), "&{{250000000 32000000000 1.3 0} [14]}"; got != want {
1990		t.Fatalf("merged CallOptions is incorrect: got %v, want %v", got, want)
1991	}
1992
1993	// This is the custom retry setting.
1994	c.CallOptions.CreateSession[1].Resolve(cs)
1995	if got, want := fmt.Sprintf("%v", cs.Retry()), "&{{200000000 30000000000 1.25 0} [14 4]}"; got != want {
1996		t.Fatalf("merged CallOptions is incorrect: got %v, want %v", got, want)
1997	}
1998}
1999
2000func TestClient_QueryWithCallOptions(t *testing.T) {
2001	t.Parallel()
2002	co := &vkit.CallOptions{
2003		ExecuteSql: []gax.CallOption{
2004			gax.WithRetry(func() gax.Retryer {
2005				return gax.OnCodes([]codes.Code{
2006					codes.DeadlineExceeded,
2007				}, gax.Backoff{
2008					Initial:    200 * time.Millisecond,
2009					Max:        30000 * time.Millisecond,
2010					Multiplier: 1.25,
2011				})
2012			}),
2013		},
2014	}
2015	server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{CallOptions: co})
2016	server.TestSpanner.PutExecutionTime(MethodExecuteSql, SimulatedExecutionTime{
2017		Errors: []error{status.Error(codes.DeadlineExceeded, "Deadline exceeded")},
2018	})
2019	defer teardown()
2020	ctx := context.Background()
2021	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
2022		_, err := tx.Update(ctx, Statement{SQL: UpdateBarSetFoo})
2023		if err != nil {
2024			return err
2025		}
2026		return nil
2027	})
2028	if err != nil {
2029		t.Fatal(err)
2030	}
2031}
2032
2033func TestClient_ShouldReceiveMetadataForEmptyResultSet(t *testing.T) {
2034	t.Parallel()
2035
2036	server, client, teardown := setupMockedTestServer(t)
2037	// This creates an empty result set.
2038	res := server.CreateSingleRowSingersResult(SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount)
2039	sql := "SELECT SingerId, AlbumId, AlbumTitle FROM Albums WHERE 1=2"
2040	server.TestSpanner.PutStatementResult(sql, res)
2041	defer teardown()
2042	ctx := context.Background()
2043	iter := client.Single().Query(ctx, NewStatement(sql))
2044	defer iter.Stop()
2045	row, err := iter.Next()
2046	if err != iterator.Done {
2047		t.Errorf("Query result mismatch:\nGot: %v\nWant: <no rows>", row)
2048	}
2049	metadata := iter.Metadata
2050	if metadata == nil {
2051		t.Fatalf("Missing ResultSet Metadata")
2052	}
2053	if metadata.RowType == nil {
2054		t.Fatalf("Missing ResultSet RowType")
2055	}
2056	if metadata.RowType.Fields == nil {
2057		t.Fatalf("Missing ResultSet Fields")
2058	}
2059	if g, w := len(metadata.RowType.Fields), 3; g != w {
2060		t.Fatalf("Field count mismatch\nGot: %v\nWant: %v", g, w)
2061	}
2062	wantFieldNames := []string{"SingerId", "AlbumId", "AlbumTitle"}
2063	for i, w := range wantFieldNames {
2064		g := metadata.RowType.Fields[i].Name
2065		if g != w {
2066			t.Fatalf("Field[%v] name mismatch\nGot: %v\nWant: %v", i, g, w)
2067		}
2068	}
2069	wantFieldTypes := []sppb.TypeCode{sppb.TypeCode_INT64, sppb.TypeCode_INT64, sppb.TypeCode_STRING}
2070	for i, w := range wantFieldTypes {
2071		g := metadata.RowType.Fields[i].Type.Code
2072		if g != w {
2073			t.Fatalf("Field[%v] type mismatch\nGot: %v\nWant: %v", i, g, w)
2074		}
2075	}
2076}
2077
2078func TestClient_EncodeCustomFieldType(t *testing.T) {
2079	t.Parallel()
2080
2081	type typesTable struct {
2082		Int    customStructToInt    `spanner:"Int"`
2083		String customStructToString `spanner:"String"`
2084		Float  customStructToFloat  `spanner:"Float"`
2085		Bool   customStructToBool   `spanner:"Bool"`
2086		Time   customStructToTime   `spanner:"Time"`
2087		Date   customStructToDate   `spanner:"Date"`
2088	}
2089
2090	server, client, teardown := setupMockedTestServer(t)
2091	defer teardown()
2092	ctx := context.Background()
2093
2094	d := typesTable{
2095		Int:    customStructToInt{1, 23},
2096		String: customStructToString{"A", "B"},
2097		Float:  customStructToFloat{1.23, 12.3},
2098		Bool:   customStructToBool{true, false},
2099		Time:   customStructToTime{"A", "B"},
2100		Date:   customStructToDate{"A", "B"},
2101	}
2102
2103	m, err := InsertStruct("Types", &d)
2104	if err != nil {
2105		t.Fatalf("err: %v", err)
2106	}
2107
2108	ms := []*Mutation{m}
2109	_, err = client.Apply(ctx, ms)
2110	if err != nil {
2111		t.Fatalf("err: %v", err)
2112	}
2113
2114	reqs := drainRequestsFromServer(server.TestSpanner)
2115
2116	for _, req := range reqs {
2117		if commitReq, ok := req.(*sppb.CommitRequest); ok {
2118			val := commitReq.Mutations[0].GetInsert().Values[0]
2119
2120			if got, want := val.Values[0].GetStringValue(), "123"; got != want {
2121				t.Fatalf("value mismatch: got %v (kind %T), want %v", got, val.Values[0].GetKind(), want)
2122			}
2123			if got, want := val.Values[1].GetStringValue(), "A-B"; got != want {
2124				t.Fatalf("value mismatch: got %v (kind %T), want %v", got, val.Values[1].GetKind(), want)
2125			}
2126			if got, want := val.Values[2].GetNumberValue(), float64(123.123); got != want {
2127				t.Fatalf("value mismatch: got %v (kind %T), want %v", got, val.Values[2].GetKind(), want)
2128			}
2129			if got, want := val.Values[3].GetBoolValue(), true; got != want {
2130				t.Fatalf("value mismatch: got %v (kind %T), want %v", got, val.Values[3].GetKind(), want)
2131			}
2132			if got, want := val.Values[4].GetStringValue(), "2016-11-15T15:04:05.999999999Z"; got != want {
2133				t.Fatalf("value mismatch: got %v (kind %T), want %v", got, val.Values[4].GetKind(), want)
2134			}
2135			if got, want := val.Values[5].GetStringValue(), "2016-11-15"; got != want {
2136				t.Fatalf("value mismatch: got %v (kind %T), want %v", got, val.Values[5].GetKind(), want)
2137			}
2138		}
2139	}
2140}
2141
2142func setupDecodeCustomFieldResult(server *MockedSpannerInMemTestServer, stmt string) error {
2143	metadata := &sppb.ResultSetMetadata{
2144		RowType: &sppb.StructType{
2145			Fields: []*sppb.StructType_Field{
2146				{Name: "Int", Type: &sppb.Type{Code: sppb.TypeCode_INT64}},
2147				{Name: "String", Type: &sppb.Type{Code: sppb.TypeCode_STRING}},
2148				{Name: "Float", Type: &sppb.Type{Code: sppb.TypeCode_FLOAT64}},
2149				{Name: "Bool", Type: &sppb.Type{Code: sppb.TypeCode_BOOL}},
2150				{Name: "Time", Type: &sppb.Type{Code: sppb.TypeCode_TIMESTAMP}},
2151				{Name: "Date", Type: &sppb.Type{Code: sppb.TypeCode_DATE}},
2152			},
2153		},
2154	}
2155	rowValues := []*structpb.Value{
2156		{Kind: &structpb.Value_StringValue{StringValue: "123"}},
2157		{Kind: &structpb.Value_StringValue{StringValue: "A-B"}},
2158		{Kind: &structpb.Value_NumberValue{NumberValue: float64(123.123)}},
2159		{Kind: &structpb.Value_BoolValue{BoolValue: true}},
2160		{Kind: &structpb.Value_StringValue{StringValue: "2016-11-15T15:04:05.999999999Z"}},
2161		{Kind: &structpb.Value_StringValue{StringValue: "2016-11-15"}},
2162	}
2163	rows := []*structpb.ListValue{
2164		{Values: rowValues},
2165	}
2166	resultSet := &sppb.ResultSet{
2167		Metadata: metadata,
2168		Rows:     rows,
2169	}
2170	result := &StatementResult{
2171		Type:      StatementResultResultSet,
2172		ResultSet: resultSet,
2173	}
2174	return server.TestSpanner.PutStatementResult(stmt, result)
2175}
2176
2177func TestClient_DecodeCustomFieldType(t *testing.T) {
2178	t.Parallel()
2179
2180	type typesTable struct {
2181		Int    customStructToInt    `spanner:"Int"`
2182		String customStructToString `spanner:"String"`
2183		Float  customStructToFloat  `spanner:"Float"`
2184		Bool   customStructToBool   `spanner:"Bool"`
2185		Time   customStructToTime   `spanner:"Time"`
2186		Date   customStructToDate   `spanner:"Date"`
2187	}
2188
2189	server, client, teardown := setupMockedTestServer(t)
2190	defer teardown()
2191
2192	query := "SELECT * FROM Types"
2193	setupDecodeCustomFieldResult(server, query)
2194
2195	ctx := context.Background()
2196	stmt := Statement{SQL: query}
2197	iter := client.Single().Query(ctx, stmt)
2198	defer iter.Stop()
2199
2200	var results []typesTable
2201	for {
2202		row, err := iter.Next()
2203		if err == iterator.Done {
2204			break
2205		}
2206		if err != nil {
2207			t.Fatalf("failed to get next: %v", err)
2208		}
2209
2210		var d typesTable
2211		if err := row.ToStruct(&d); err != nil {
2212			t.Fatalf("failed to convert a row to a struct: %v", err)
2213		}
2214		results = append(results, d)
2215	}
2216
2217	if len(results) > 1 {
2218		t.Fatalf("mismatch length of array: got %v, want 1", results)
2219	}
2220
2221	want := typesTable{
2222		Int:    customStructToInt{1, 23},
2223		String: customStructToString{"A", "B"},
2224		Float:  customStructToFloat{1.23, 12.3},
2225		Bool:   customStructToBool{true, false},
2226		Time:   customStructToTime{"A", "B"},
2227		Date:   customStructToDate{"A", "B"},
2228	}
2229	got := results[0]
2230	if !testEqual(got, want) {
2231		t.Fatalf("mismatch result: got %v, want %v", got, want)
2232	}
2233}
2234
2235func TestClient_EmulatorWithCredentialsFile(t *testing.T) {
2236	old := os.Getenv("SPANNER_EMULATOR_HOST")
2237	defer os.Setenv("SPANNER_EMULATOR_HOST", old)
2238
2239	os.Setenv("SPANNER_EMULATOR_HOST", "localhost:1234")
2240
2241	client, err := NewClientWithConfig(
2242		context.Background(),
2243		"projects/p/instances/i/databases/d",
2244		ClientConfig{},
2245		option.WithCredentialsFile("/path/to/key.json"),
2246	)
2247	defer client.Close()
2248	if err != nil {
2249		t.Fatalf("Failed to create a client with credentials file when running against an emulator: %v", err)
2250	}
2251}
2252
2253func TestBatchReadOnlyTransaction_QueryOptions(t *testing.T) {
2254	ctx := context.Background()
2255	qo := QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{
2256		OptimizerVersion:           "1",
2257		OptimizerStatisticsPackage: "latest",
2258	}}
2259	_, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{QueryOptions: qo})
2260	defer teardown()
2261
2262	txn, err := client.BatchReadOnlyTransaction(ctx, StrongRead())
2263	if err != nil {
2264		t.Fatal(err)
2265	}
2266	defer txn.Cleanup(ctx)
2267
2268	if txn.qo != qo {
2269		t.Fatalf("Query options are mismatched: got %v, want %v", txn.qo, qo)
2270	}
2271}
2272
2273func TestBatchReadOnlyTransactionFromID_QueryOptions(t *testing.T) {
2274	qo := QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{
2275		OptimizerVersion:           "1",
2276		OptimizerStatisticsPackage: "latest",
2277	}}
2278	_, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{QueryOptions: qo})
2279	defer teardown()
2280
2281	txn := client.BatchReadOnlyTransactionFromID(BatchReadOnlyTransactionID{})
2282
2283	if txn.qo != qo {
2284		t.Fatalf("Query options are mismatched: got %v, want %v", txn.qo, qo)
2285	}
2286}
2287
// QueryOptionsTestCase describes one query-options scenario: the options
// supplied at the client, environment and query level, and the options
// expected as the outcome.
//
// The field order is relied upon by unkeyed composite literals (see
// queryOptionsTestCases), so fields must not be reordered.
type QueryOptionsTestCase struct {
	name   string       // human-readable description of the scenario
	client QueryOptions // options set at client level
	env    QueryOptions // options set at environment level
	query  QueryOptions // options set at query level
	want   QueryOptions // expected resulting options
}
2295
2296func queryOptionsTestCases() []QueryOptionsTestCase {
2297	statsPkg := "latest"
2298	return []QueryOptionsTestCase{
2299		{
2300			"Client level",
2301			QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1", OptimizerStatisticsPackage: statsPkg}},
2302			QueryOptions{Options: nil},
2303			QueryOptions{Options: nil},
2304			QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1", OptimizerStatisticsPackage: statsPkg}},
2305		},
2306		{
2307			"Environment level",
2308			QueryOptions{Options: nil},
2309			QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1", OptimizerStatisticsPackage: statsPkg}},
2310			QueryOptions{Options: nil},
2311			QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1", OptimizerStatisticsPackage: statsPkg}},
2312		},
2313		{
2314			"Query level",
2315			QueryOptions{Options: nil},
2316			QueryOptions{Options: nil},
2317			QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1", OptimizerStatisticsPackage: statsPkg}},
2318			QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1", OptimizerStatisticsPackage: statsPkg}},
2319		},
2320		{
2321			"Environment level has precedence",
2322			QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1", OptimizerStatisticsPackage: statsPkg}},
2323			QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "2", OptimizerStatisticsPackage: statsPkg}},
2324			QueryOptions{Options: nil},
2325			QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "2", OptimizerStatisticsPackage: statsPkg}},
2326		},
2327		{
2328			"Query level has precedence than client level",
2329			QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1", OptimizerStatisticsPackage: statsPkg}},
2330			QueryOptions{Options: nil},
2331			QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "3", OptimizerStatisticsPackage: statsPkg}},
2332			QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "3", OptimizerStatisticsPackage: statsPkg}},
2333		},
2334		{
2335			"Query level has highest precedence",
2336			QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1", OptimizerStatisticsPackage: statsPkg}},
2337			QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "2", OptimizerStatisticsPackage: statsPkg}},
2338			QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "3", OptimizerStatisticsPackage: statsPkg}},
2339			QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "3", OptimizerStatisticsPackage: statsPkg}},
2340		},
2341	}
2342}
2343
2344func TestClient_DoForEachRow_ShouldNotEndSpanWithIteratorDoneError(t *testing.T) {
2345	// This test cannot be parallel, as the TestExporter does not support that.
2346	te := itestutil.NewTestExporter()
2347	defer te.Unregister()
2348	_, client, teardown := setupMockedTestServer(t)
2349	defer teardown()
2350
2351	iter := client.Single().Query(context.Background(), NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
2352	iter.Do(func(r *Row) error {
2353		return nil
2354	})
2355	select {
2356	case <-te.Stats:
2357	case <-time.After(1 * time.Second):
2358		t.Fatal("No stats were exported before timeout")
2359	}
2360	if len(te.Spans) == 0 {
2361		t.Fatal("No spans were exported")
2362	}
2363	s := te.Spans[len(te.Spans)-1].Status
2364	if s.Code != int32(codes.OK) {
2365		t.Errorf("Span status mismatch\nGot: %v\nWant: %v", s.Code, codes.OK)
2366	}
2367}
2368
2369func TestClient_DoForEachRow_ShouldEndSpanWithQueryError(t *testing.T) {
2370	// This test cannot be parallel, as the TestExporter does not support that.
2371	te := itestutil.NewTestExporter()
2372	defer te.Unregister()
2373	server, client, teardown := setupMockedTestServer(t)
2374	defer teardown()
2375
2376	sql := "SELECT * FROM"
2377	server.TestSpanner.PutStatementResult(sql, &StatementResult{
2378		Type: StatementResultError,
2379		Err:  status.Error(codes.InvalidArgument, "Invalid query"),
2380	})
2381
2382	iter := client.Single().Query(context.Background(), NewStatement(sql))
2383	iter.Do(func(r *Row) error {
2384		return nil
2385	})
2386	select {
2387	case <-te.Stats:
2388	case <-time.After(1 * time.Second):
2389		t.Fatal("No stats were exported before timeout")
2390	}
2391	if len(te.Spans) == 0 {
2392		t.Fatal("No spans were exported")
2393	}
2394	s := te.Spans[len(te.Spans)-1].Status
2395	if s.Code != int32(codes.InvalidArgument) {
2396		t.Errorf("Span status mismatch\nGot: %v\nWant: %v", s.Code, codes.InvalidArgument)
2397	}
2398}
2399
2400func TestClient_ReadOnlyTransaction_Priority(t *testing.T) {
2401	t.Parallel()
2402
2403	server, client, teardown := setupMockedTestServer(t)
2404	defer teardown()
2405	for _, qo := range []QueryOptions{
2406		{},
2407		{Priority: sppb.RequestOptions_PRIORITY_HIGH},
2408	} {
2409		for _, tx := range []*ReadOnlyTransaction{
2410			client.Single(),
2411			client.ReadOnlyTransaction(),
2412		} {
2413			iter := tx.QueryWithOptions(context.Background(), NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), qo)
2414			iter.Next()
2415			iter.Stop()
2416
2417			if tx.singleUse {
2418				tx = client.Single()
2419			}
2420			iter = tx.ReadWithOptions(context.Background(), "FOO", AllKeys(), []string{"BAR"}, &ReadOptions{Priority: qo.Priority})
2421			iter.Next()
2422			iter.Stop()
2423
2424			checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 2, sppb.RequestOptions{Priority: qo.Priority})
2425			tx.Close()
2426		}
2427	}
2428}
2429
2430func TestClient_ReadWriteTransaction_Priority(t *testing.T) {
2431	t.Parallel()
2432
2433	server, client, teardown := setupMockedTestServer(t)
2434	defer teardown()
2435	for _, to := range []TransactionOptions{
2436		{},
2437		{CommitPriority: sppb.RequestOptions_PRIORITY_MEDIUM},
2438	} {
2439		for _, qo := range []QueryOptions{
2440			{},
2441			{Priority: sppb.RequestOptions_PRIORITY_MEDIUM},
2442		} {
2443			client.ReadWriteTransactionWithOptions(context.Background(), func(ctx context.Context, tx *ReadWriteTransaction) error {
2444				iter := tx.QueryWithOptions(context.Background(), NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), qo)
2445				iter.Next()
2446				iter.Stop()
2447
2448				iter = tx.ReadWithOptions(context.Background(), "FOO", AllKeys(), []string{"BAR"}, &ReadOptions{Priority: qo.Priority})
2449				iter.Next()
2450				iter.Stop()
2451
2452				tx.UpdateWithOptions(context.Background(), NewStatement(UpdateBarSetFoo), qo)
2453				tx.BatchUpdateWithOptions(context.Background(), []Statement{
2454					NewStatement(UpdateBarSetFoo),
2455				}, qo)
2456				checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 4, sppb.RequestOptions{Priority: qo.Priority})
2457
2458				return nil
2459			}, to)
2460			checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{Priority: to.CommitPriority})
2461		}
2462	}
2463}
2464
2465func TestClient_StmtBasedReadWriteTransaction_Priority(t *testing.T) {
2466	t.Parallel()
2467
2468	server, client, teardown := setupMockedTestServer(t)
2469	defer teardown()
2470	for _, to := range []TransactionOptions{
2471		{},
2472		{CommitPriority: sppb.RequestOptions_PRIORITY_LOW},
2473	} {
2474		for _, qo := range []QueryOptions{
2475			{},
2476			{Priority: sppb.RequestOptions_PRIORITY_LOW},
2477		} {
2478			tx, _ := NewReadWriteStmtBasedTransactionWithOptions(context.Background(), client, to)
2479			iter := tx.QueryWithOptions(context.Background(), NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), qo)
2480			iter.Next()
2481			iter.Stop()
2482
2483			iter = tx.ReadWithOptions(context.Background(), "FOO", AllKeys(), []string{"BAR"}, &ReadOptions{Priority: qo.Priority})
2484			iter.Next()
2485			iter.Stop()
2486
2487			tx.UpdateWithOptions(context.Background(), NewStatement(UpdateBarSetFoo), qo)
2488			tx.BatchUpdateWithOptions(context.Background(), []Statement{
2489				NewStatement(UpdateBarSetFoo),
2490			}, qo)
2491
2492			checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 4, sppb.RequestOptions{Priority: qo.Priority})
2493			tx.Commit(context.Background())
2494			checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{Priority: to.CommitPriority})
2495		}
2496	}
2497}
2498
2499func TestClient_PDML_Priority(t *testing.T) {
2500	t.Parallel()
2501
2502	server, client, teardown := setupMockedTestServer(t)
2503	defer teardown()
2504
2505	for _, qo := range []QueryOptions{
2506		{},
2507		{Priority: sppb.RequestOptions_PRIORITY_HIGH},
2508	} {
2509		client.PartitionedUpdateWithOptions(context.Background(), NewStatement(UpdateBarSetFoo), qo)
2510		checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 1, sppb.RequestOptions{Priority: qo.Priority})
2511	}
2512}
2513
2514func TestClient_Apply_Priority(t *testing.T) {
2515	t.Parallel()
2516
2517	server, client, teardown := setupMockedTestServer(t)
2518	defer teardown()
2519
2520	client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})})
2521	checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{})
2522
2523	client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}, Priority(sppb.RequestOptions_PRIORITY_HIGH))
2524	checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{Priority: sppb.RequestOptions_PRIORITY_HIGH})
2525
2526	client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}, ApplyAtLeastOnce())
2527	checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{})
2528
2529	client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}, ApplyAtLeastOnce(), Priority(sppb.RequestOptions_PRIORITY_MEDIUM))
2530	checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{Priority: sppb.RequestOptions_PRIORITY_MEDIUM})
2531}
2532
2533func TestClient_ReadOnlyTransaction_Tag(t *testing.T) {
2534	t.Parallel()
2535
2536	server, client, teardown := setupMockedTestServer(t)
2537	defer teardown()
2538	for _, qo := range []QueryOptions{
2539		{},
2540		{RequestTag: "tag-1"},
2541	} {
2542		for _, tx := range []*ReadOnlyTransaction{
2543			client.Single(),
2544			client.ReadOnlyTransaction(),
2545		} {
2546			iter := tx.QueryWithOptions(context.Background(), NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), qo)
2547			iter.Next()
2548			iter.Stop()
2549
2550			if tx.singleUse {
2551				tx = client.Single()
2552			}
2553			iter = tx.ReadWithOptions(context.Background(), "FOO", AllKeys(), []string{"BAR"}, &ReadOptions{RequestTag: qo.RequestTag})
2554			iter.Next()
2555			iter.Stop()
2556
2557			checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 2, sppb.RequestOptions{RequestTag: qo.RequestTag})
2558			tx.Close()
2559		}
2560	}
2561}
2562
2563func TestClient_ReadWriteTransaction_Tag(t *testing.T) {
2564	t.Parallel()
2565
2566	server, client, teardown := setupMockedTestServer(t)
2567	defer teardown()
2568	for _, to := range []TransactionOptions{
2569		{},
2570		{TransactionTag: "tx-tag-1"},
2571	} {
2572		for _, qo := range []QueryOptions{
2573			{},
2574			{RequestTag: "request-tag-1"},
2575		} {
2576			client.ReadWriteTransactionWithOptions(context.Background(), func(ctx context.Context, tx *ReadWriteTransaction) error {
2577				iter := tx.QueryWithOptions(context.Background(), NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), qo)
2578				iter.Next()
2579				iter.Stop()
2580
2581				iter = tx.ReadWithOptions(context.Background(), "FOO", AllKeys(), []string{"BAR"}, &ReadOptions{RequestTag: qo.RequestTag})
2582				iter.Next()
2583				iter.Stop()
2584
2585				tx.UpdateWithOptions(context.Background(), NewStatement(UpdateBarSetFoo), qo)
2586				tx.BatchUpdateWithOptions(context.Background(), []Statement{
2587					NewStatement(UpdateBarSetFoo),
2588				}, qo)
2589
2590				// Check for SQL requests inside the transaction to prevent the check to
2591				// drain the commit request from the server.
2592				checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 4, sppb.RequestOptions{RequestTag: qo.RequestTag, TransactionTag: to.TransactionTag})
2593				return nil
2594			}, to)
2595			checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{TransactionTag: to.TransactionTag})
2596		}
2597	}
2598}
2599
2600func TestClient_StmtBasedReadWriteTransaction_Tag(t *testing.T) {
2601	t.Parallel()
2602
2603	server, client, teardown := setupMockedTestServer(t)
2604	defer teardown()
2605	for _, to := range []TransactionOptions{
2606		{},
2607		{TransactionTag: "tx-tag-1"},
2608	} {
2609		for _, qo := range []QueryOptions{
2610			{},
2611			{RequestTag: "request-tag-1"},
2612		} {
2613			tx, _ := NewReadWriteStmtBasedTransactionWithOptions(context.Background(), client, to)
2614			iter := tx.QueryWithOptions(context.Background(), NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), qo)
2615			iter.Next()
2616			iter.Stop()
2617
2618			iter = tx.ReadWithOptions(context.Background(), "FOO", AllKeys(), []string{"BAR"}, &ReadOptions{RequestTag: qo.RequestTag})
2619			iter.Next()
2620			iter.Stop()
2621
2622			tx.UpdateWithOptions(context.Background(), NewStatement(UpdateBarSetFoo), qo)
2623			tx.BatchUpdateWithOptions(context.Background(), []Statement{
2624				NewStatement(UpdateBarSetFoo),
2625			}, qo)
2626			checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 4, sppb.RequestOptions{RequestTag: qo.RequestTag, TransactionTag: to.TransactionTag})
2627
2628			tx.Commit(context.Background())
2629			checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{TransactionTag: to.TransactionTag})
2630		}
2631	}
2632}
2633
2634func TestClient_PDML_Tag(t *testing.T) {
2635	t.Parallel()
2636
2637	server, client, teardown := setupMockedTestServer(t)
2638	defer teardown()
2639
2640	for _, qo := range []QueryOptions{
2641		{},
2642		{RequestTag: "request-tag-1"},
2643	} {
2644		client.PartitionedUpdateWithOptions(context.Background(), NewStatement(UpdateBarSetFoo), qo)
2645		checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 1, sppb.RequestOptions{RequestTag: qo.RequestTag})
2646	}
2647}
2648
2649func TestClient_Apply_Tagging(t *testing.T) {
2650	t.Parallel()
2651
2652	server, client, teardown := setupMockedTestServer(t)
2653	defer teardown()
2654
2655	client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})})
2656	checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{})
2657
2658	client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}, TransactionTag("tx-tag"))
2659	checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{TransactionTag: "tx-tag"})
2660
2661	client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}, ApplyAtLeastOnce())
2662	checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{})
2663
2664	client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}, ApplyAtLeastOnce(), TransactionTag("tx-tag"))
2665	checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{TransactionTag: "tx-tag"})
2666}
2667
2668func TestClient_PartitionQuery_RequestOptions(t *testing.T) {
2669	t.Parallel()
2670
2671	server, client, teardown := setupMockedTestServer(t)
2672	defer teardown()
2673
2674	for _, qo := range []QueryOptions{
2675		{},
2676		{Priority: sppb.RequestOptions_PRIORITY_LOW},
2677		{RequestTag: "batch-query-tag"},
2678		{Priority: sppb.RequestOptions_PRIORITY_MEDIUM, RequestTag: "batch-query-with-medium-prio"},
2679	} {
2680		ctx := context.Background()
2681		txn, _ := client.BatchReadOnlyTransaction(ctx, StrongRead())
2682		partitions, _ := txn.PartitionQueryWithOptions(ctx, NewStatement(SelectFooFromBar), PartitionOptions{MaxPartitions: 10}, qo)
2683		for _, p := range partitions {
2684			iter := txn.Execute(ctx, p)
2685			iter.Next()
2686			iter.Stop()
2687		}
2688		checkRequestsForExpectedRequestOptions(t, server.TestSpanner, len(partitions), sppb.RequestOptions{RequestTag: qo.RequestTag, Priority: qo.Priority})
2689	}
2690}
2691
2692func TestClient_PartitionRead_RequestOptions(t *testing.T) {
2693	t.Parallel()
2694
2695	server, client, teardown := setupMockedTestServer(t)
2696	defer teardown()
2697
2698	for _, ro := range []ReadOptions{
2699		{},
2700		{Priority: sppb.RequestOptions_PRIORITY_LOW},
2701		{RequestTag: "batch-read-tag"},
2702		{Priority: sppb.RequestOptions_PRIORITY_MEDIUM, RequestTag: "batch-read-with-medium-prio"},
2703	} {
2704		ctx := context.Background()
2705		txn, _ := client.BatchReadOnlyTransaction(ctx, StrongRead())
2706		partitions, _ := txn.PartitionReadWithOptions(ctx, "Albums", KeySets(Key{"foo"}), []string{"SingerId", "AlbumId", "AlbumTitle"}, PartitionOptions{MaxPartitions: 10}, ro)
2707		for _, p := range partitions {
2708			iter := txn.Execute(ctx, p)
2709			iter.Next()
2710			iter.Stop()
2711		}
2712		checkRequestsForExpectedRequestOptions(t, server.TestSpanner, len(partitions), sppb.RequestOptions{RequestTag: ro.RequestTag, Priority: ro.Priority})
2713	}
2714}
2715
2716func checkRequestsForExpectedRequestOptions(t *testing.T, server InMemSpannerServer, reqCount int, ro sppb.RequestOptions) {
2717	reqs := drainRequestsFromServer(server)
2718	reqOptions := []*sppb.RequestOptions{}
2719
2720	for _, req := range reqs {
2721		if sqlReq, ok := req.(*sppb.ExecuteSqlRequest); ok {
2722			reqOptions = append(reqOptions, sqlReq.RequestOptions)
2723		}
2724		if batchReq, ok := req.(*sppb.ExecuteBatchDmlRequest); ok {
2725			reqOptions = append(reqOptions, batchReq.RequestOptions)
2726		}
2727		if readReq, ok := req.(*sppb.ReadRequest); ok {
2728			reqOptions = append(reqOptions, readReq.RequestOptions)
2729		}
2730	}
2731
2732	if got, want := len(reqOptions), reqCount; got != want {
2733		t.Fatalf("Requests length mismatch\nGot: %v\nWant: %v", got, want)
2734	}
2735
2736	for _, opts := range reqOptions {
2737		if opts == nil {
2738			opts = &sppb.RequestOptions{}
2739		}
2740		if got, want := opts.Priority, ro.Priority; got != want {
2741			t.Fatalf("Request priority mismatch\nGot: %v\nWant: %v", got, want)
2742		}
2743		if got, want := opts.RequestTag, ro.RequestTag; got != want {
2744			t.Fatalf("Request tag mismatch\nGot: %v\nWant: %v", got, want)
2745		}
2746		if got, want := opts.TransactionTag, ro.TransactionTag; got != want {
2747			t.Fatalf("Transaction tag mismatch\nGot: %v\nWant: %v", got, want)
2748		}
2749	}
2750}
2751
2752func checkCommitForExpectedRequestOptions(t *testing.T, server InMemSpannerServer, ro sppb.RequestOptions) {
2753	reqs := drainRequestsFromServer(server)
2754	var commit *sppb.CommitRequest
2755	var ok bool
2756
2757	for _, req := range reqs {
2758		if commit, ok = req.(*sppb.CommitRequest); ok {
2759			break
2760		}
2761	}
2762
2763	if commit == nil {
2764		t.Fatalf("Missing commit request")
2765	}
2766
2767	var got sppb.RequestOptions_Priority
2768	if commit.RequestOptions != nil {
2769		got = commit.RequestOptions.Priority
2770	}
2771	want := ro.Priority
2772	if got != want {
2773		t.Fatalf("Commit priority mismatch\nGot: %v\nWant: %v", got, want)
2774	}
2775
2776	var requestTag string
2777	var transactionTag string
2778	if commit.RequestOptions != nil {
2779		requestTag = commit.RequestOptions.RequestTag
2780		transactionTag = commit.RequestOptions.TransactionTag
2781	}
2782	if got, want := requestTag, ro.RequestTag; got != want {
2783		t.Fatalf("Commit request tag mismatch\nGot: %v\nWant: %v", got, want)
2784	}
2785	if got, want := transactionTag, ro.TransactionTag; got != want {
2786		t.Fatalf("Commit transaction tag mismatch\nGot: %v\nWant: %v", got, want)
2787	}
2788}
2789
2790func TestClient_Single_Read_WithNumericKey(t *testing.T) {
2791	t.Parallel()
2792
2793	_, client, teardown := setupMockedTestServer(t)
2794	defer teardown()
2795	ctx := context.Background()
2796	iter := client.Single().Read(ctx, "Albums", KeySets(Key{*big.NewRat(1, 1)}), []string{"SingerId", "AlbumId", "AlbumTitle"})
2797	defer iter.Stop()
2798	rowCount := int64(0)
2799	for {
2800		_, err := iter.Next()
2801		if err == iterator.Done {
2802			break
2803		}
2804		if err != nil {
2805			t.Fatal(err)
2806		}
2807		rowCount++
2808	}
2809	if rowCount != SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount {
2810		t.Fatalf("row count mismatch\nGot: %v\nWant: %v", rowCount, SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount)
2811	}
2812}
2813
2814func TestClient_CloseWithUnresponsiveBackend(t *testing.T) {
2815	t.Parallel()
2816
2817	minOpened := uint64(5)
2818	server, client, teardown := setupMockedTestServerWithConfig(t,
2819		ClientConfig{
2820			SessionPoolConfig: SessionPoolConfig{
2821				MinOpened: minOpened,
2822			},
2823		})
2824	defer teardown()
2825	sp := client.idleSessions
2826
2827	waitFor(t, func() error {
2828		sp.mu.Lock()
2829		defer sp.mu.Unlock()
2830		if uint64(sp.idleList.Len()) != minOpened {
2831			return fmt.Errorf("num open sessions mismatch\nWant: %d\nGot: %d", sp.MinOpened, sp.numOpened)
2832		}
2833		return nil
2834	})
2835	server.TestSpanner.Freeze()
2836	defer server.TestSpanner.Unfreeze()
2837
2838	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
2839	defer cancel()
2840	sp.close(ctx)
2841
2842	if w, g := context.DeadlineExceeded, ctx.Err(); w != g {
2843		t.Fatalf("context error mismatch\nWant: %v\nGot: %v", w, g)
2844	}
2845}
2846