1/*
2Copyright 2017 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package spanner
18
19import (
20	"context"
21	"errors"
22	"flag"
23	"fmt"
24	"log"
25	"math"
26	"math/big"
27	"os"
28	"reflect"
29	"regexp"
30	"strings"
31	"sync"
32	"testing"
33	"time"
34
35	"cloud.google.com/go/civil"
36	"cloud.google.com/go/internal/testutil"
37	"cloud.google.com/go/internal/uid"
38	database "cloud.google.com/go/spanner/admin/database/apiv1"
39	instance "cloud.google.com/go/spanner/admin/instance/apiv1"
40	"google.golang.org/api/iterator"
41	"google.golang.org/api/option"
42	adminpb "google.golang.org/genproto/googleapis/spanner/admin/database/v1"
43	instancepb "google.golang.org/genproto/googleapis/spanner/admin/instance/v1"
44	sppb "google.golang.org/genproto/googleapis/spanner/v1"
45	"google.golang.org/grpc"
46	"google.golang.org/grpc/codes"
47	"google.golang.org/grpc/status"
48)
49
50var (
51	// testProjectID specifies the project used for testing. It can be changed
52	// by setting environment variable GCLOUD_TESTS_GOLANG_PROJECT_ID.
53	testProjectID = testutil.ProjID()
54
55	dbNameSpace       = uid.NewSpace("gotest", &uid.Options{Sep: '_', Short: true})
56	instanceNameSpace = uid.NewSpace("gotest", &uid.Options{Sep: '-', Short: true})
57	backupIDSpace     = uid.NewSpace("gotest", &uid.Options{Sep: '_', Short: true})
58	testInstanceID    = instanceNameSpace.New()
59
60	testTable        = "TestTable"
61	testTableIndex   = "TestTableByValue"
62	testTableColumns = []string{"Key", "StringValue"}
63
64	databaseAdmin *database.DatabaseAdminClient
65	instanceAdmin *instance.InstanceAdminClient
66
67	singerDBStatements = []string{
68		`CREATE TABLE Singers (
69				SingerId	INT64 NOT NULL,
70				FirstName	STRING(1024),
71				LastName	STRING(1024),
72				SingerInfo	BYTES(MAX)
73			) PRIMARY KEY (SingerId)`,
74		`CREATE INDEX SingerByName ON Singers(FirstName, LastName)`,
75		`CREATE TABLE Accounts (
76				AccountId	INT64 NOT NULL,
77				Nickname	STRING(100),
78				Balance		INT64 NOT NULL,
79			) PRIMARY KEY (AccountId)`,
80		`CREATE INDEX AccountByNickname ON Accounts(Nickname) STORING (Balance)`,
81		`CREATE TABLE Types (
82				RowID		INT64 NOT NULL,
83				String		STRING(MAX),
84				StringArray	ARRAY<STRING(MAX)>,
85				Bytes		BYTES(MAX),
86				BytesArray	ARRAY<BYTES(MAX)>,
87				Int64a		INT64,
88				Int64Array	ARRAY<INT64>,
89				Bool		BOOL,
90				BoolArray	ARRAY<BOOL>,
91				Float64		FLOAT64,
92				Float64Array	ARRAY<FLOAT64>,
93				Date		DATE,
94				DateArray	ARRAY<DATE>,
95				Timestamp	TIMESTAMP,
96				TimestampArray	ARRAY<TIMESTAMP>,
97			) PRIMARY KEY (RowID)`,
98	}
99
100	readDBStatements = []string{
101		`CREATE TABLE TestTable (
102                    Key          STRING(MAX) NOT NULL,
103                    StringValue  STRING(MAX)
104            ) PRIMARY KEY (Key)`,
105		`CREATE INDEX TestTableByValue ON TestTable(StringValue)`,
106		`CREATE INDEX TestTableByValueDesc ON TestTable(StringValue DESC)`,
107	}
108
109	simpleDBStatements = []string{
110		`CREATE TABLE test (
111				a	STRING(1024),
112				b	STRING(1024),
113			) PRIMARY KEY (a)`,
114	}
115	simpleDBTableColumns = []string{"a", "b"}
116
117	ctsDBStatements = []string{
118		`CREATE TABLE TestTable (
119		    Key  STRING(MAX) NOT NULL,
120		    Ts   TIMESTAMP OPTIONS (allow_commit_timestamp = true),
121	    ) PRIMARY KEY (Key)`,
122	}
123	backuDBStatements = []string{
124		`CREATE TABLE Singers (
125						SingerId	INT64 NOT NULL,
126						FirstName	STRING(1024),
127						LastName	STRING(1024),
128						SingerInfo	BYTES(MAX)
129					) PRIMARY KEY (SingerId)`,
130		`CREATE INDEX SingerByName ON Singers(FirstName, LastName)`,
131		`CREATE TABLE Accounts (
132						AccountId	INT64 NOT NULL,
133						Nickname	STRING(100),
134						Balance		INT64 NOT NULL,
135					) PRIMARY KEY (AccountId)`,
136		`CREATE INDEX AccountByNickname ON Accounts(Nickname) STORING (Balance)`,
137		`CREATE TABLE Types (
138						RowID		INT64 NOT NULL,
139						String		STRING(MAX),
140						StringArray	ARRAY<STRING(MAX)>,
141						Bytes		BYTES(MAX),
142						BytesArray	ARRAY<BYTES(MAX)>,
143						Int64a		INT64,
144						Int64Array	ARRAY<INT64>,
145						Bool		BOOL,
146						BoolArray	ARRAY<BOOL>,
147						Float64		FLOAT64,
148						Float64Array	ARRAY<FLOAT64>,
149						Date		DATE,
150						DateArray	ARRAY<DATE>,
151						Timestamp	TIMESTAMP,
152						TimestampArray	ARRAY<TIMESTAMP>,
153					) PRIMARY KEY (RowID)`,
154	}
155
156	validInstancePattern = regexp.MustCompile("^projects/(?P<project>[^/]+)/instances/(?P<instance>[^/]+)$")
157)
158
159func parseInstanceName(inst string) (project, instance string, err error) {
160	matches := validInstancePattern.FindStringSubmatch(inst)
161	if len(matches) == 0 {
162		return "", "", fmt.Errorf("Failed to parse instance name from %q according to pattern %q",
163			inst, validInstancePattern.String())
164	}
165	return matches[1], matches[2], nil
166}
167
168const (
169	str1 = "alice"
170	str2 = "a@example.com"
171)
172
173func TestMain(m *testing.M) {
174	cleanup := initIntegrationTests()
175	res := m.Run()
176	cleanup()
177	os.Exit(res)
178}
179
180var grpcHeaderChecker = testutil.DefaultHeadersEnforcer()
181
182func initIntegrationTests() (cleanup func()) {
183	ctx := context.Background()
184	flag.Parse() // Needed for testing.Short().
185	noop := func() {}
186
187	if testing.Short() {
188		log.Println("Integration tests skipped in -short mode.")
189		return noop
190	}
191
192	if testProjectID == "" {
193		log.Println("Integration tests skipped: GCLOUD_TESTS_GOLANG_PROJECT_ID is missing")
194		return noop
195	}
196
197	opts := grpcHeaderChecker.CallOptions()
198	var err error
199	// Create InstanceAdmin and DatabaseAdmin clients.
200	instanceAdmin, err = instance.NewInstanceAdminClient(ctx, opts...)
201	if err != nil {
202		log.Fatalf("cannot create instance databaseAdmin client: %v", err)
203	}
204	databaseAdmin, err = database.NewDatabaseAdminClient(ctx, opts...)
205	if err != nil {
206		log.Fatalf("cannot create databaseAdmin client: %v", err)
207	}
208	// Get the list of supported instance configs for the project that is used
209	// for the integration tests. The supported instance configs can differ per
210	// project. The integration tests will use the first instance config that
211	// is returned by Cloud Spanner. This will normally be the regional config
212	// that is physically the closest to where the request is coming from.
213	configIterator := instanceAdmin.ListInstanceConfigs(ctx, &instancepb.ListInstanceConfigsRequest{
214		Parent: fmt.Sprintf("projects/%s", testProjectID),
215	})
216	config, err := configIterator.Next()
217	if err != nil {
218		log.Fatalf("Cannot get any instance configurations.\nPlease make sure the Cloud Spanner API is enabled for the test project.\nGet error: %v", err)
219	}
220
221	// First clean up any old test instances before we start the actual testing
222	// as these might cause this test run to fail.
223	cleanupInstances()
224
225	// Create a test instance to use for this test run.
226	op, err := instanceAdmin.CreateInstance(ctx, &instancepb.CreateInstanceRequest{
227		Parent:     fmt.Sprintf("projects/%s", testProjectID),
228		InstanceId: testInstanceID,
229		Instance: &instancepb.Instance{
230			Config:      config.Name,
231			DisplayName: testInstanceID,
232			NodeCount:   1,
233		},
234	})
235	if err != nil {
236		log.Fatalf("could not create instance with id %s: %v", fmt.Sprintf("projects/%s/instances/%s", testProjectID, testInstanceID), err)
237	}
238	// Wait for the instance creation to finish.
239	i, err := op.Wait(ctx)
240	if err != nil {
241		log.Fatalf("waiting for instance creation to finish failed: %v", err)
242	}
243	if i.State != instancepb.Instance_READY {
244		log.Printf("instance state is not READY, it might be that the test instance will cause problems during tests. Got state %v\n", i.State)
245	}
246
247	return func() {
248		// Delete this test instance.
249		instanceName := fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID)
250		deleteInstanceAndBackups(ctx, instanceName)
251		// Delete other test instances that may be lingering around.
252		cleanupInstances()
253		databaseAdmin.Close()
254		instanceAdmin.Close()
255	}
256}
257
258func TestIntegration_InitSessionPool(t *testing.T) {
259	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
260	defer cancel()
261	// Set up an empty testing environment.
262	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, []string{})
263	defer cleanup()
264	sp := client.idleSessions
265	sp.mu.Lock()
266	want := sp.MinOpened
267	sp.mu.Unlock()
268	var numOpened int
269loop:
270	for {
271		select {
272		case <-ctx.Done():
273			t.Fatalf("timed out, got %d session(s), want %d", numOpened, want)
274		default:
275			sp.mu.Lock()
276			numOpened = sp.idleList.Len() + sp.idleWriteList.Len()
277			sp.mu.Unlock()
278			if uint64(numOpened) == want {
279				break loop
280			}
281		}
282	}
283	// Delete all sessions in the pool on the backend and then try to execute a
284	// simple query. The 'Session not found' error should cause an automatic
285	// retry of the read-only transaction.
286	sp.mu.Lock()
287	s := sp.idleList.Front()
288	for {
289		if s == nil {
290			break
291		}
292		// This will delete the session on the backend without removing it
293		// from the pool.
294		s.Value.(*session).delete(context.Background())
295		s = s.Next()
296	}
297	sp.mu.Unlock()
298	sql := "SELECT 1, 'FOO', 'BAR'"
299	tx := client.ReadOnlyTransaction()
300	defer tx.Close()
301	iter := tx.Query(context.Background(), NewStatement(sql))
302	rows, err := readAll(iter)
303	if err != nil {
304		t.Fatalf("Unexpected error for query %q: %v", sql, err)
305	}
306	if got, want := len(rows), 1; got != want {
307		t.Fatalf("Row count mismatch for query %q\nGot: %v\nWant: %v", sql, got, want)
308	}
309	if got, want := len(rows[0]), 3; got != want {
310		t.Fatalf("Column count mismatch for query %q\nGot: %v\nWant: %v", sql, got, want)
311	}
312	if got, want := rows[0][0].(int64), int64(1); got != want {
313		t.Fatalf("Column value mismatch for query %q\nGot: %v\nWant: %v", sql, got, want)
314	}
315}
316
317// Test SingleUse transaction.
318func TestIntegration_SingleUse(t *testing.T) {
319	t.Parallel()
320
321	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
322	defer cancel()
323	// Set up testing environment.
324	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
325	defer cleanup()
326
327	writes := []struct {
328		row []interface{}
329		ts  time.Time
330	}{
331		{row: []interface{}{1, "Marc", "Foo"}},
332		{row: []interface{}{2, "Tars", "Bar"}},
333		{row: []interface{}{3, "Alpha", "Beta"}},
334		{row: []interface{}{4, "Last", "End"}},
335	}
336	// Try to write four rows through the Apply API.
337	for i, w := range writes {
338		var err error
339		m := InsertOrUpdate("Singers",
340			[]string{"SingerId", "FirstName", "LastName"},
341			w.row)
342		if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
343			t.Fatal(err)
344		}
345	}
346	// Calculate time difference between Cloud Spanner server and localhost to
347	// use to determine the exact staleness value to use.
348	timeDiff := maxDuration(time.Now().Sub(writes[0].ts), 0)
349
350	// Test reading rows with different timestamp bounds.
351	for i, test := range []struct {
352		name    string
353		want    [][]interface{}
354		tb      TimestampBound
355		checkTs func(time.Time) error
356	}{
357		{
358			name: "strong",
359			want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}},
360			tb:   StrongRead(),
361			checkTs: func(ts time.Time) error {
362				// writes[3] is the last write, all subsequent strong read
363				// should have a timestamp larger than that.
364				if ts.Before(writes[3].ts) {
365					return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[3].ts)
366				}
367				return nil
368			},
369		},
370		{
371			name: "min_read_timestamp",
372			want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}},
373			tb:   MinReadTimestamp(writes[3].ts),
374			checkTs: func(ts time.Time) error {
375				if ts.Before(writes[3].ts) {
376					return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[3].ts)
377				}
378				return nil
379			},
380		},
381		{
382			name: "max_staleness",
383			want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}},
384			tb:   MaxStaleness(time.Second),
385			checkTs: func(ts time.Time) error {
386				if ts.Before(writes[3].ts) {
387					return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[3].ts)
388				}
389				return nil
390			},
391		},
392		{
393			name: "read_timestamp",
394			want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}},
395			tb:   ReadTimestamp(writes[2].ts),
396			checkTs: func(ts time.Time) error {
397				if ts != writes[2].ts {
398					return fmt.Errorf("read got timestamp %v, want %v", ts, writes[2].ts)
399				}
400				return nil
401			},
402		},
403		{
404			name: "exact_staleness",
405			want: nil,
406			// Specify a staleness which should be already before this test.
407			tb: ExactStaleness(time.Now().Sub(writes[0].ts) + timeDiff + 30*time.Second),
408			checkTs: func(ts time.Time) error {
409				if !ts.Before(writes[0].ts) {
410					return fmt.Errorf("read got timestamp %v, want it to be earlier than %v", ts, writes[0].ts)
411				}
412				return nil
413			},
414		},
415	} {
416		t.Run(test.name, func(t *testing.T) {
417			// SingleUse.Query
418			su := client.Single().WithTimestampBound(test.tb)
419			got, err := readAll(su.Query(
420				ctx,
421				Statement{
422					"SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId IN (@id1, @id3, @id4) ORDER BY SingerId",
423					map[string]interface{}{"id1": int64(1), "id3": int64(3), "id4": int64(4)},
424				}))
425			if err != nil {
426				t.Fatalf("%d: SingleUse.Query returns error %v, want nil", i, err)
427			}
428			rts, err := su.Timestamp()
429			if err != nil {
430				t.Fatalf("%d: SingleUse.Query doesn't return a timestamp, error: %v", i, err)
431			}
432			if err := test.checkTs(rts); err != nil {
433				t.Fatalf("%d: SingleUse.Query doesn't return expected timestamp: %v", i, err)
434			}
435			if !testEqual(got, test.want) {
436				t.Fatalf("%d: got unexpected result from SingleUse.Query: %v, want %v", i, got, test.want)
437			}
438			// SingleUse.Read
439			su = client.Single().WithTimestampBound(test.tb)
440			got, err = readAll(su.Read(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}), []string{"SingerId", "FirstName", "LastName"}))
441			if err != nil {
442				t.Fatalf("%d: SingleUse.Read returns error %v, want nil", i, err)
443			}
444			rts, err = su.Timestamp()
445			if err != nil {
446				t.Fatalf("%d: SingleUse.Read doesn't return a timestamp, error: %v", i, err)
447			}
448			if err := test.checkTs(rts); err != nil {
449				t.Fatalf("%d: SingleUse.Read doesn't return expected timestamp: %v", i, err)
450			}
451			if !testEqual(got, test.want) {
452				t.Fatalf("%d: got unexpected result from SingleUse.Read: %v, want %v", i, got, test.want)
453			}
454			// SingleUse.ReadRow
455			got = nil
456			for _, k := range []Key{{1}, {3}, {4}} {
457				su = client.Single().WithTimestampBound(test.tb)
458				r, err := su.ReadRow(ctx, "Singers", k, []string{"SingerId", "FirstName", "LastName"})
459				if err != nil {
460					continue
461				}
462				v, err := rowToValues(r)
463				if err != nil {
464					continue
465				}
466				got = append(got, v)
467				rts, err = su.Timestamp()
468				if err != nil {
469					t.Fatalf("%d: SingleUse.ReadRow(%v) doesn't return a timestamp, error: %v", i, k, err)
470				}
471				if err := test.checkTs(rts); err != nil {
472					t.Fatalf("%d: SingleUse.ReadRow(%v) doesn't return expected timestamp: %v", i, k, err)
473				}
474			}
475			if !testEqual(got, test.want) {
476				t.Fatalf("%d: got unexpected results from SingleUse.ReadRow: %v, want %v", i, got, test.want)
477			}
478			// SingleUse.ReadUsingIndex
479			su = client.Single().WithTimestampBound(test.tb)
480			got, err = readAll(su.ReadUsingIndex(ctx, "Singers", "SingerByName", KeySets(Key{"Marc", "Foo"}, Key{"Alpha", "Beta"}, Key{"Last", "End"}), []string{"SingerId", "FirstName", "LastName"}))
481			if err != nil {
482				t.Fatalf("%d: SingleUse.ReadUsingIndex returns error %v, want nil", i, err)
483			}
484			// The results from ReadUsingIndex is sorted by the index rather than primary key.
485			if len(got) != len(test.want) {
486				t.Fatalf("%d: got unexpected result from SingleUse.ReadUsingIndex: %v, want %v", i, got, test.want)
487			}
488			for j, g := range got {
489				if j > 0 {
490					prev := got[j-1][1].(string) + got[j-1][2].(string)
491					curr := got[j][1].(string) + got[j][2].(string)
492					if strings.Compare(prev, curr) > 0 {
493						t.Fatalf("%d: SingleUse.ReadUsingIndex fails to order rows by index keys, %v should be after %v", i, got[j-1], got[j])
494					}
495				}
496				found := false
497				for _, w := range test.want {
498					if testEqual(g, w) {
499						found = true
500					}
501				}
502				if !found {
503					t.Fatalf("%d: got unexpected result from SingleUse.ReadUsingIndex: %v, want %v", i, got, test.want)
504					break
505				}
506			}
507			rts, err = su.Timestamp()
508			if err != nil {
509				t.Fatalf("%d: SingleUse.ReadUsingIndex doesn't return a timestamp, error: %v", i, err)
510			}
511			if err := test.checkTs(rts); err != nil {
512				t.Fatalf("%d: SingleUse.ReadUsingIndex doesn't return expected timestamp: %v", i, err)
513			}
514			// SingleUse.ReadRowUsingIndex
515			got = nil
516			for _, k := range []Key{{"Marc", "Foo"}, {"Alpha", "Beta"}, {"Last", "End"}} {
517				su = client.Single().WithTimestampBound(test.tb)
518				r, err := su.ReadRowUsingIndex(ctx, "Singers", "SingerByName", k, []string{"SingerId", "FirstName", "LastName"})
519				if err != nil {
520					continue
521				}
522				v, err := rowToValues(r)
523				if err != nil {
524					continue
525				}
526				got = append(got, v)
527				rts, err = su.Timestamp()
528				if err != nil {
529					t.Fatalf("%d: SingleUse.ReadRowUsingIndex(%v) doesn't return a timestamp, error: %v", i, k, err)
530				}
531				if err := test.checkTs(rts); err != nil {
532					t.Fatalf("%d: SingleUse.ReadRowUsingIndex(%v) doesn't return expected timestamp: %v", i, k, err)
533				}
534			}
535			if !testEqual(got, test.want) {
536				t.Fatalf("%d: got unexpected results from SingleUse.ReadRowUsingIndex: %v, want %v", i, got, test.want)
537			}
538		})
539	}
540}
541
542// Test custom query options provided on query-level configuration.
543func TestIntegration_SingleUse_WithQueryOptions(t *testing.T) {
544	skipEmulatorTest(t)
545	t.Parallel()
546
547	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
548	defer cancel()
549	// Set up testing environment.
550	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
551	defer cleanup()
552
553	writes := []struct {
554		row []interface{}
555		ts  time.Time
556	}{
557		{row: []interface{}{1, "Marc", "Foo"}},
558		{row: []interface{}{2, "Tars", "Bar"}},
559		{row: []interface{}{3, "Alpha", "Beta"}},
560		{row: []interface{}{4, "Last", "End"}},
561	}
562	// Try to write four rows through the Apply API.
563	for i, w := range writes {
564		var err error
565		m := InsertOrUpdate("Singers",
566			[]string{"SingerId", "FirstName", "LastName"},
567			w.row)
568		if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
569			t.Fatal(err)
570		}
571	}
572	qo := QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1"}}
573	got, err := readAll(client.Single().QueryWithOptions(ctx, Statement{
574		"SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId IN (@id1, @id3, @id4)",
575		map[string]interface{}{"id1": int64(1), "id3": int64(3), "id4": int64(4)},
576	}, qo))
577
578	if err != nil {
579		t.Errorf("ReadOnlyTransaction.QueryWithOptions returns error %v, want nil", err)
580	}
581
582	want := [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}}
583	if !testEqual(got, want) {
584		t.Errorf("got unexpected result from ReadOnlyTransaction.QueryWithOptions: %v, want %v", got, want)
585	}
586}
587
588func TestIntegration_SingleUse_ReadingWithLimit(t *testing.T) {
589	t.Parallel()
590
591	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
592	defer cancel()
593	// Set up testing environment.
594	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
595	defer cleanup()
596
597	writes := []struct {
598		row []interface{}
599		ts  time.Time
600	}{
601		{row: []interface{}{1, "Marc", "Foo"}},
602		{row: []interface{}{2, "Tars", "Bar"}},
603		{row: []interface{}{3, "Alpha", "Beta"}},
604		{row: []interface{}{4, "Last", "End"}},
605	}
606	// Try to write four rows through the Apply API.
607	for i, w := range writes {
608		var err error
609		m := InsertOrUpdate("Singers",
610			[]string{"SingerId", "FirstName", "LastName"},
611			w.row)
612		if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
613			t.Fatal(err)
614		}
615	}
616
617	su := client.Single()
618	const limit = 1
619	gotRows, err := readAll(su.ReadWithOptions(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}),
620		[]string{"SingerId", "FirstName", "LastName"}, &ReadOptions{Limit: limit}))
621	if err != nil {
622		t.Errorf("SingleUse.ReadWithOptions returns error %v, want nil", err)
623	}
624	if got, want := len(gotRows), limit; got != want {
625		t.Errorf("got %d, want %d", got, want)
626	}
627}
628
629// Test ReadOnlyTransaction. The testsuite is mostly like SingleUse, except it
630// also tests for a single timestamp across multiple reads.
631func TestIntegration_ReadOnlyTransaction(t *testing.T) {
632	t.Parallel()
633
634	ctxTimeout := 5 * time.Minute
635	ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
636	defer cancel()
637	// Set up testing environment.
638	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
639	defer cleanup()
640
641	writes := []struct {
642		row []interface{}
643		ts  time.Time
644	}{
645		{row: []interface{}{1, "Marc", "Foo"}},
646		{row: []interface{}{2, "Tars", "Bar"}},
647		{row: []interface{}{3, "Alpha", "Beta"}},
648		{row: []interface{}{4, "Last", "End"}},
649	}
650	// Try to write four rows through the Apply API.
651	for i, w := range writes {
652		var err error
653		m := InsertOrUpdate("Singers",
654			[]string{"SingerId", "FirstName", "LastName"},
655			w.row)
656		if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
657			t.Fatal(err)
658		}
659	}
660
661	// For testing timestamp bound staleness.
662	<-time.After(time.Second)
663
664	// Test reading rows with different timestamp bounds.
665	for i, test := range []struct {
666		want    [][]interface{}
667		tb      TimestampBound
668		checkTs func(time.Time) error
669	}{
670		// Note: min_read_timestamp and max_staleness are not supported by
671		// ReadOnlyTransaction. See API document for more details.
672		{
673			// strong
674			[][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}},
675			StrongRead(),
676			func(ts time.Time) error {
677				if ts.Before(writes[3].ts) {
678					return fmt.Errorf("read got timestamp %v, want it to be no later than %v", ts, writes[3].ts)
679				}
680				return nil
681			},
682		},
683		{
684			// read_timestamp
685			[][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}},
686			ReadTimestamp(writes[2].ts),
687			func(ts time.Time) error {
688				if ts != writes[2].ts {
689					return fmt.Errorf("read got timestamp %v, expect %v", ts, writes[2].ts)
690				}
691				return nil
692			},
693		},
694		{
695			// exact_staleness
696			nil,
697			// Specify a staleness which should be already before this test.
698			ExactStaleness(ctxTimeout + 1),
699			func(ts time.Time) error {
700				if ts.After(writes[0].ts) {
701					return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[0].ts)
702				}
703				return nil
704			},
705		},
706	} {
707		// ReadOnlyTransaction.Query
708		ro := client.ReadOnlyTransaction().WithTimestampBound(test.tb)
709		got, err := readAll(ro.Query(
710			ctx,
711			Statement{
712				"SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId IN (@id1, @id3, @id4) ORDER BY SingerId",
713				map[string]interface{}{"id1": int64(1), "id3": int64(3), "id4": int64(4)},
714			}))
715		if err != nil {
716			t.Errorf("%d: ReadOnlyTransaction.Query returns error %v, want nil", i, err)
717		}
718		if !testEqual(got, test.want) {
719			t.Errorf("%d: got unexpected result from ReadOnlyTransaction.Query: %v, want %v", i, got, test.want)
720		}
721		rts, err := ro.Timestamp()
722		if err != nil {
723			t.Errorf("%d: ReadOnlyTransaction.Query doesn't return a timestamp, error: %v", i, err)
724		}
725		if err := test.checkTs(rts); err != nil {
726			t.Errorf("%d: ReadOnlyTransaction.Query doesn't return expected timestamp: %v", i, err)
727		}
728		roTs := rts
729		// ReadOnlyTransaction.Read
730		got, err = readAll(ro.Read(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}), []string{"SingerId", "FirstName", "LastName"}))
731		if err != nil {
732			t.Errorf("%d: ReadOnlyTransaction.Read returns error %v, want nil", i, err)
733		}
734		if !testEqual(got, test.want) {
735			t.Errorf("%d: got unexpected result from ReadOnlyTransaction.Read: %v, want %v", i, got, test.want)
736		}
737		rts, err = ro.Timestamp()
738		if err != nil {
739			t.Errorf("%d: ReadOnlyTransaction.Read doesn't return a timestamp, error: %v", i, err)
740		}
741		if err := test.checkTs(rts); err != nil {
742			t.Errorf("%d: ReadOnlyTransaction.Read doesn't return expected timestamp: %v", i, err)
743		}
744		if roTs != rts {
745			t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts)
746		}
747		// ReadOnlyTransaction.ReadRow
748		got = nil
749		for _, k := range []Key{{1}, {3}, {4}} {
750			r, err := ro.ReadRow(ctx, "Singers", k, []string{"SingerId", "FirstName", "LastName"})
751			if err != nil {
752				continue
753			}
754			v, err := rowToValues(r)
755			if err != nil {
756				continue
757			}
758			got = append(got, v)
759			rts, err = ro.Timestamp()
760			if err != nil {
761				t.Errorf("%d: ReadOnlyTransaction.ReadRow(%v) doesn't return a timestamp, error: %v", i, k, err)
762			}
763			if err := test.checkTs(rts); err != nil {
764				t.Errorf("%d: ReadOnlyTransaction.ReadRow(%v) doesn't return expected timestamp: %v", i, k, err)
765			}
766			if roTs != rts {
767				t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts)
768			}
769		}
770		if !testEqual(got, test.want) {
771			t.Errorf("%d: got unexpected results from ReadOnlyTransaction.ReadRow: %v, want %v", i, got, test.want)
772		}
773		// SingleUse.ReadUsingIndex
774		got, err = readAll(ro.ReadUsingIndex(ctx, "Singers", "SingerByName", KeySets(Key{"Marc", "Foo"}, Key{"Alpha", "Beta"}, Key{"Last", "End"}), []string{"SingerId", "FirstName", "LastName"}))
775		if err != nil {
776			t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex returns error %v, want nil", i, err)
777		}
778		// The results from ReadUsingIndex is sorted by the index rather than
779		// primary key.
780		if len(got) != len(test.want) {
781			t.Errorf("%d: got unexpected result from ReadOnlyTransaction.ReadUsingIndex: %v, want %v", i, got, test.want)
782		}
783		for j, g := range got {
784			if j > 0 {
785				prev := got[j-1][1].(string) + got[j-1][2].(string)
786				curr := got[j][1].(string) + got[j][2].(string)
787				if strings.Compare(prev, curr) > 0 {
788					t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex fails to order rows by index keys, %v should be after %v", i, got[j-1], got[j])
789				}
790			}
791			found := false
792			for _, w := range test.want {
793				if testEqual(g, w) {
794					found = true
795				}
796			}
797			if !found {
798				t.Errorf("%d: got unexpected result from ReadOnlyTransaction.ReadUsingIndex: %v, want %v", i, got, test.want)
799				break
800			}
801		}
802		rts, err = ro.Timestamp()
803		if err != nil {
804			t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex doesn't return a timestamp, error: %v", i, err)
805		}
806		if err := test.checkTs(rts); err != nil {
807			t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex doesn't return expected timestamp: %v", i, err)
808		}
809		if roTs != rts {
810			t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts)
811		}
812		// ReadOnlyTransaction.ReadRowUsingIndex
813		got = nil
814		for _, k := range []Key{{"Marc", "Foo"}, {"Alpha", "Beta"}, {"Last", "End"}} {
815			r, err := ro.ReadRowUsingIndex(ctx, "Singers", "SingerByName", k, []string{"SingerId", "FirstName", "LastName"})
816			if err != nil {
817				continue
818			}
819			v, err := rowToValues(r)
820			if err != nil {
821				continue
822			}
823			got = append(got, v)
824			rts, err = ro.Timestamp()
825			if err != nil {
826				t.Errorf("%d: ReadOnlyTransaction.ReadRowUsingIndex(%v) doesn't return a timestamp, error: %v", i, k, err)
827			}
828			if err := test.checkTs(rts); err != nil {
829				t.Errorf("%d: ReadOnlyTransaction.ReadRowUsingIndex(%v) doesn't return expected timestamp: %v", i, k, err)
830			}
831			if roTs != rts {
832				t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts)
833			}
834		}
835		if !testEqual(got, test.want) {
836			t.Errorf("%d: got unexpected results from ReadOnlyTransaction.ReadRowUsingIndex: %v, want %v", i, got, test.want)
837		}
838		ro.Close()
839	}
840}
841
842// Test ReadOnlyTransaction with different timestamp bound when there's an
843// update at the same time.
844func TestIntegration_UpdateDuringRead(t *testing.T) {
845	t.Parallel()
846
847	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
848	defer cancel()
849	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
850	defer cleanup()
851
852	for i, tb := range []TimestampBound{
853		StrongRead(),
854		ReadTimestamp(time.Now().Add(-time.Minute * 30)), // version GC is 1 hour
855		ExactStaleness(time.Minute * 30),
856	} {
857		ro := client.ReadOnlyTransaction().WithTimestampBound(tb)
858		_, err := ro.ReadRow(ctx, "Singers", Key{i}, []string{"SingerId"})
859		if ErrCode(err) != codes.NotFound {
860			t.Errorf("%d: ReadOnlyTransaction.ReadRow before write returns error: %v, want NotFound", i, err)
861		}
862
863		m := InsertOrUpdate("Singers", []string{"SingerId"}, []interface{}{i})
864		if _, err := client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
865			t.Fatal(err)
866		}
867
868		_, err = ro.ReadRow(ctx, "Singers", Key{i}, []string{"SingerId"})
869		if ErrCode(err) != codes.NotFound {
870			t.Errorf("%d: ReadOnlyTransaction.ReadRow after write returns error: %v, want NotFound", i, err)
871		}
872	}
873}
874
875// Test ReadWriteTransaction.
876func TestIntegration_ReadWriteTransaction(t *testing.T) {
877	t.Parallel()
878
879	// Give a longer deadline because of transaction backoffs.
880	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
881	defer cancel()
882	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
883	defer cleanup()
884
885	// Set up two accounts
886	accounts := []*Mutation{
887		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
888		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
889	}
890	if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil {
891		t.Fatal(err)
892	}
893	wg := sync.WaitGroup{}
894
895	readBalance := func(iter *RowIterator) (int64, error) {
896		defer iter.Stop()
897		var bal int64
898		for {
899			row, err := iter.Next()
900			if err == iterator.Done {
901				return bal, nil
902			}
903			if err != nil {
904				return 0, err
905			}
906			if err := row.Column(0, &bal); err != nil {
907				return 0, err
908			}
909		}
910	}
911
912	for i := 0; i < 20; i++ {
913		wg.Add(1)
914		go func(iter int) {
915			defer wg.Done()
916			_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
917				// Query Foo's balance and Bar's balance.
918				bf, e := readBalance(tx.Query(ctx,
919					Statement{"SELECT Balance FROM Accounts WHERE AccountId = @id", map[string]interface{}{"id": int64(1)}}))
920				if e != nil {
921					return e
922				}
923				bb, e := readBalance(tx.Read(ctx, "Accounts", KeySets(Key{int64(2)}), []string{"Balance"}))
924				if e != nil {
925					return e
926				}
927				if bf <= 0 {
928					return nil
929				}
930				bf--
931				bb++
932				return tx.BufferWrite([]*Mutation{
933					Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), bf}),
934					Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), bb}),
935				})
936			})
937			if err != nil {
938				t.Errorf("%d: failed to execute transaction: %v", iter, err)
939			}
940		}(i)
941	}
942	// Because of context timeout, all goroutines will eventually return.
943	wg.Wait()
944	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
945		var bf, bb int64
946		r, e := tx.ReadRow(ctx, "Accounts", Key{int64(1)}, []string{"Balance"})
947		if e != nil {
948			return e
949		}
950		if ce := r.Column(0, &bf); ce != nil {
951			return ce
952		}
953		bb, e = readBalance(tx.ReadUsingIndex(ctx, "Accounts", "AccountByNickname", KeySets(Key{"Bar"}), []string{"Balance"}))
954		if e != nil {
955			return e
956		}
957		if bf != 30 || bb != 21 {
958			t.Errorf("Foo's balance is now %v and Bar's balance is now %v, want %v and %v", bf, bb, 30, 21)
959		}
960		return nil
961	})
962	if err != nil {
963		t.Errorf("failed to check balances: %v", err)
964	}
965}
966
967func TestIntegration_ReadWriteTransaction_StatementBased(t *testing.T) {
968	t.Parallel()
969
970	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
971	defer cancel()
972	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
973	defer cleanup()
974
975	// Set up two accounts
976	accounts := []*Mutation{
977		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
978		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
979	}
980	if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil {
981		t.Fatal(err)
982	}
983
984	getBalance := func(txn *ReadWriteStmtBasedTransaction, key Key) (int64, error) {
985		row, err := txn.ReadRow(ctx, "Accounts", key, []string{"Balance"})
986		if err != nil {
987			return 0, err
988		}
989		var balance int64
990		if err := row.Column(0, &balance); err != nil {
991			return 0, err
992		}
993		return balance, nil
994	}
995
996	statements := func(txn *ReadWriteStmtBasedTransaction) error {
997		outBalance, err := getBalance(txn, Key{1})
998		if err != nil {
999			return err
1000		}
1001		const transferAmt = 20
1002		if outBalance >= transferAmt {
1003			inBalance, err := getBalance(txn, Key{2})
1004			if err != nil {
1005				return err
1006			}
1007			inBalance += transferAmt
1008			outBalance -= transferAmt
1009			cols := []string{"AccountId", "Balance"}
1010			txn.BufferWrite([]*Mutation{
1011				Update("Accounts", cols, []interface{}{1, outBalance}),
1012				Update("Accounts", cols, []interface{}{2, inBalance}),
1013			})
1014		}
1015		return nil
1016	}
1017
1018	for {
1019		tx, err := NewReadWriteStmtBasedTransaction(ctx, client)
1020		if err != nil {
1021			t.Fatalf("failed to begin a transaction: %v", err)
1022		}
1023		err = statements(tx)
1024		if err != nil && status.Code(err) != codes.Aborted {
1025			tx.Rollback(ctx)
1026			t.Fatalf("failed to execute statements: %v", err)
1027		} else if err == nil {
1028			_, err = tx.Commit(ctx)
1029			if err == nil {
1030				break
1031			} else if status.Code(err) != codes.Aborted {
1032				t.Fatalf("failed to commit a transaction: %v", err)
1033			}
1034		}
1035		// Set a default sleep time if the server delay is absent.
1036		delay := 10 * time.Millisecond
1037		if serverDelay, hasServerDelay := ExtractRetryDelay(err); hasServerDelay {
1038			delay = serverDelay
1039		}
1040		time.Sleep(delay)
1041	}
1042
1043	// Query the updated values.
1044	stmt := Statement{SQL: `SELECT AccountId, Nickname, Balance FROM Accounts ORDER BY AccountId`}
1045	iter := client.Single().Query(ctx, stmt)
1046	got, err := readAllAccountsTable(iter)
1047	if err != nil {
1048		t.Fatalf("failed to read data: %v", err)
1049	}
1050	want := [][]interface{}{
1051		{int64(1), "Foo", int64(30)},
1052		{int64(2), "Bar", int64(21)},
1053	}
1054	if !testEqual(got, want) {
1055		t.Errorf("\ngot %v\nwant%v", got, want)
1056	}
1057}
1058
1059func TestIntegration_Reads(t *testing.T) {
1060	t.Parallel()
1061
1062	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1063	defer cancel()
1064	// Set up testing environment.
1065	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, readDBStatements)
1066	defer cleanup()
1067
1068	// Includes k0..k14. Strings sort lexically, eg "k1" < "k10" < "k2".
1069	var ms []*Mutation
1070	for i := 0; i < 15; i++ {
1071		ms = append(ms, InsertOrUpdate(testTable,
1072			testTableColumns,
1073			[]interface{}{fmt.Sprintf("k%d", i), fmt.Sprintf("v%d", i)}))
1074	}
1075	// Don't use ApplyAtLeastOnce, so we can test the other code path.
1076	if _, err := client.Apply(ctx, ms); err != nil {
1077		t.Fatal(err)
1078	}
1079
1080	// Empty read.
1081	rows, err := readAllTestTable(client.Single().Read(ctx, testTable,
1082		KeyRange{Start: Key{"k99"}, End: Key{"z"}}, testTableColumns))
1083	if err != nil {
1084		t.Fatal(err)
1085	}
1086	if got, want := len(rows), 0; got != want {
1087		t.Errorf("got %d, want %d", got, want)
1088	}
1089
1090	// Index empty read.
1091	rows, err = readAllTestTable(client.Single().ReadUsingIndex(ctx, testTable, testTableIndex,
1092		KeyRange{Start: Key{"v99"}, End: Key{"z"}}, testTableColumns))
1093	if err != nil {
1094		t.Fatal(err)
1095	}
1096	if got, want := len(rows), 0; got != want {
1097		t.Errorf("got %d, want %d", got, want)
1098	}
1099
1100	// Point read.
1101	row, err := client.Single().ReadRow(ctx, testTable, Key{"k1"}, testTableColumns)
1102	if err != nil {
1103		t.Fatal(err)
1104	}
1105	var got testTableRow
1106	if err := row.ToStruct(&got); err != nil {
1107		t.Fatal(err)
1108	}
1109	if want := (testTableRow{"k1", "v1"}); got != want {
1110		t.Errorf("got %v, want %v", got, want)
1111	}
1112
1113	// Point read not found.
1114	_, err = client.Single().ReadRow(ctx, testTable, Key{"k999"}, testTableColumns)
1115	if ErrCode(err) != codes.NotFound {
1116		t.Fatalf("got %v, want NotFound", err)
1117	}
1118
1119	// Index point read.
1120	rowIndex, err := client.Single().ReadRowUsingIndex(ctx, testTable, testTableIndex, Key{"v1"}, testTableColumns)
1121	if err != nil {
1122		t.Fatal(err)
1123	}
1124	var gotIndex testTableRow
1125	if err := rowIndex.ToStruct(&gotIndex); err != nil {
1126		t.Fatal(err)
1127	}
1128	if wantIndex := (testTableRow{"k1", "v1"}); gotIndex != wantIndex {
1129		t.Errorf("got %v, want %v", gotIndex, wantIndex)
1130	}
1131	// Index point read not found.
1132	_, err = client.Single().ReadRowUsingIndex(ctx, testTable, testTableIndex, Key{"v999"}, testTableColumns)
1133	if ErrCode(err) != codes.NotFound {
1134		t.Fatalf("got %v, want NotFound", err)
1135	}
1136	rangeReads(ctx, t, client)
1137	indexRangeReads(ctx, t, client)
1138}
1139
1140func TestIntegration_EarlyTimestamp(t *testing.T) {
1141	t.Parallel()
1142
1143	// Test that we can get the timestamp from a read-only transaction as
1144	// soon as we have read at least one row.
1145	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1146	defer cancel()
1147	// Set up testing environment.
1148	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, readDBStatements)
1149	defer cleanup()
1150
1151	var ms []*Mutation
1152	for i := 0; i < 3; i++ {
1153		ms = append(ms, InsertOrUpdate(testTable,
1154			testTableColumns,
1155			[]interface{}{fmt.Sprintf("k%d", i), fmt.Sprintf("v%d", i)}))
1156	}
1157	if _, err := client.Apply(ctx, ms, ApplyAtLeastOnce()); err != nil {
1158		t.Fatal(err)
1159	}
1160
1161	txn := client.Single()
1162	iter := txn.Read(ctx, testTable, AllKeys(), testTableColumns)
1163	defer iter.Stop()
1164	// In single-use transaction, we should get an error before reading anything.
1165	if _, err := txn.Timestamp(); err == nil {
1166		t.Error("wanted error, got nil")
1167	}
1168	// After reading one row, the timestamp should be available.
1169	_, err := iter.Next()
1170	if err != nil {
1171		t.Fatal(err)
1172	}
1173	if _, err := txn.Timestamp(); err != nil {
1174		t.Errorf("got %v, want nil", err)
1175	}
1176
1177	txn = client.ReadOnlyTransaction()
1178	defer txn.Close()
1179	iter = txn.Read(ctx, testTable, AllKeys(), testTableColumns)
1180	defer iter.Stop()
1181	// In an ordinary read-only transaction, the timestamp should be
1182	// available immediately.
1183	if _, err := txn.Timestamp(); err != nil {
1184		t.Errorf("got %v, want nil", err)
1185	}
1186}
1187
1188func TestIntegration_NestedTransaction(t *testing.T) {
1189	t.Parallel()
1190
1191	// You cannot use a transaction from inside a read-write transaction.
1192	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1193	defer cancel()
1194	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
1195	defer cleanup()
1196
1197	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
1198		_, err := client.ReadWriteTransaction(ctx,
1199			func(context.Context, *ReadWriteTransaction) error { return nil })
1200		if ErrCode(err) != codes.FailedPrecondition {
1201			t.Fatalf("got %v, want FailedPrecondition", err)
1202		}
1203		_, err = client.Single().ReadRow(ctx, "Singers", Key{1}, []string{"SingerId"})
1204		if ErrCode(err) != codes.FailedPrecondition {
1205			t.Fatalf("got %v, want FailedPrecondition", err)
1206		}
1207		rot := client.ReadOnlyTransaction()
1208		defer rot.Close()
1209		_, err = rot.ReadRow(ctx, "Singers", Key{1}, []string{"SingerId"})
1210		if ErrCode(err) != codes.FailedPrecondition {
1211			t.Fatalf("got %v, want FailedPrecondition", err)
1212		}
1213		return nil
1214	})
1215	if err != nil {
1216		t.Fatal(err)
1217	}
1218}
1219
1220func TestIntegration_CreateDBRetry(t *testing.T) {
1221	t.Parallel()
1222
1223	if databaseAdmin == nil {
1224		t.Skip("Integration tests skipped")
1225	}
1226	skipEmulatorTest(t)
1227
1228	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1229	defer cancel()
1230	dbName := dbNameSpace.New()
1231
1232	// Simulate an Unavailable error on the CreateDatabase RPC.
1233	hasReturnedUnavailable := false
1234	interceptor := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
1235		err := invoker(ctx, method, req, reply, cc, opts...)
1236		if !hasReturnedUnavailable && err == nil {
1237			hasReturnedUnavailable = true
1238			return status.Error(codes.Unavailable, "Injected unavailable error")
1239		}
1240		return err
1241	}
1242
1243	dbAdmin, err := database.NewDatabaseAdminClient(ctx, option.WithGRPCDialOption(grpc.WithUnaryInterceptor(interceptor)))
1244	if err != nil {
1245		log.Fatalf("cannot create dbAdmin client: %v", err)
1246	}
1247	op, err := dbAdmin.CreateDatabaseWithRetry(ctx, &adminpb.CreateDatabaseRequest{
1248		Parent:          fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID),
1249		CreateStatement: "CREATE DATABASE " + dbName,
1250	})
1251	if err != nil {
1252		t.Fatalf("failed to create database: %v", err)
1253	}
1254	_, err = op.Wait(ctx)
1255	if err != nil {
1256		t.Fatalf("failed to create database: %v", err)
1257	}
1258	if !hasReturnedUnavailable {
1259		t.Fatalf("interceptor did not return Unavailable")
1260	}
1261}
1262
1263// Test client recovery on database recreation.
1264func TestIntegration_DbRemovalRecovery(t *testing.T) {
1265	t.Parallel()
1266
1267	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1268	defer cancel()
1269	// Create a client with MinOpened=0 to prevent the session pool maintainer
1270	// from repeatedly trying to create sessions for the invalid database.
1271	client, dbPath, cleanup := prepareIntegrationTest(ctx, t, SessionPoolConfig{}, singerDBStatements)
1272	defer cleanup()
1273
1274	// Drop the testing database.
1275	if err := databaseAdmin.DropDatabase(ctx, &adminpb.DropDatabaseRequest{Database: dbPath}); err != nil {
1276		t.Fatalf("failed to drop testing database %v: %v", dbPath, err)
1277	}
1278
1279	// Now, send the query.
1280	iter := client.Single().Query(ctx, Statement{SQL: "SELECT SingerId FROM Singers"})
1281	defer iter.Stop()
1282	if _, err := iter.Next(); err == nil {
1283		t.Errorf("client sends query to removed database successfully, want it to fail")
1284	}
1285
1286	// Recreate database and table.
1287	dbName := dbPath[strings.LastIndex(dbPath, "/")+1:]
1288	op, err := databaseAdmin.CreateDatabaseWithRetry(ctx, &adminpb.CreateDatabaseRequest{
1289		Parent:          fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID),
1290		CreateStatement: "CREATE DATABASE " + dbName,
1291		ExtraStatements: []string{
1292			`CREATE TABLE Singers (
1293				SingerId        INT64 NOT NULL,
1294				FirstName       STRING(1024),
1295				LastName        STRING(1024),
1296				SingerInfo      BYTES(MAX)
1297			) PRIMARY KEY (SingerId)`,
1298		},
1299	})
1300	if err != nil {
1301		t.Fatalf("cannot recreate testing DB %v: %v", dbPath, err)
1302	}
1303	if _, err := op.Wait(ctx); err != nil {
1304		t.Fatalf("cannot recreate testing DB %v: %v", dbPath, err)
1305	}
1306
1307	// Now, send the query again.
1308	iter = client.Single().Query(ctx, Statement{SQL: "SELECT SingerId FROM Singers"})
1309	defer iter.Stop()
1310	_, err = iter.Next()
1311	if err != nil && err != iterator.Done {
1312		t.Errorf("failed to send query to database %v: %v", dbPath, err)
1313	}
1314}
1315
1316// Test encoding/decoding non-struct Cloud Spanner types.
1317func TestIntegration_BasicTypes(t *testing.T) {
1318	t.Parallel()
1319
1320	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1321	defer cancel()
1322	stmts := singerDBStatements
1323	if !isEmulatorEnvSet() {
1324		stmts = []string{
1325			`CREATE TABLE Singers (
1326					SingerId	INT64 NOT NULL,
1327					FirstName	STRING(1024),
1328					LastName	STRING(1024),
1329					SingerInfo	BYTES(MAX)
1330				) PRIMARY KEY (SingerId)`,
1331			`CREATE INDEX SingerByName ON Singers(FirstName, LastName)`,
1332			`CREATE TABLE Accounts (
1333					AccountId	INT64 NOT NULL,
1334					Nickname	STRING(100),
1335					Balance		INT64 NOT NULL,
1336				) PRIMARY KEY (AccountId)`,
1337			`CREATE INDEX AccountByNickname ON Accounts(Nickname) STORING (Balance)`,
1338			`CREATE TABLE Types (
1339					RowID		INT64 NOT NULL,
1340					String		STRING(MAX),
1341					StringArray	ARRAY<STRING(MAX)>,
1342					Bytes		BYTES(MAX),
1343					BytesArray	ARRAY<BYTES(MAX)>,
1344					Int64a		INT64,
1345					Int64Array	ARRAY<INT64>,
1346					Bool		BOOL,
1347					BoolArray	ARRAY<BOOL>,
1348					Float64		FLOAT64,
1349					Float64Array	ARRAY<FLOAT64>,
1350					Date		DATE,
1351					DateArray	ARRAY<DATE>,
1352					Timestamp	TIMESTAMP,
1353					TimestampArray	ARRAY<TIMESTAMP>,
1354					Numeric		NUMERIC,
1355					NumericArray	ARRAY<NUMERIC>
1356				) PRIMARY KEY (RowID)`,
1357		}
1358	}
1359	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, stmts)
1360	defer cleanup()
1361
1362	t1, _ := time.Parse(time.RFC3339Nano, "2016-11-15T15:04:05.999999999Z")
1363	// Boundaries
1364	t2, _ := time.Parse(time.RFC3339Nano, "0001-01-01T00:00:00.000000000Z")
1365	t3, _ := time.Parse(time.RFC3339Nano, "9999-12-31T23:59:59.999999999Z")
1366	d1, _ := civil.ParseDate("2016-11-15")
1367	// Boundaries
1368	d2, _ := civil.ParseDate("0001-01-01")
1369	d3, _ := civil.ParseDate("9999-12-31")
1370
1371	n0 := big.Rat{}
1372	n1p, _ := (&big.Rat{}).SetString("123456789")
1373	n2p, _ := (&big.Rat{}).SetString("123456789/1000000000")
1374	n1 := *n1p
1375	n2 := *n2p
1376
1377	tests := []struct {
1378		col  string
1379		val  interface{}
1380		want interface{}
1381	}{
1382		{col: "String", val: ""},
1383		{col: "String", val: "", want: NullString{"", true}},
1384		{col: "String", val: "foo"},
1385		{col: "String", val: "foo", want: NullString{"foo", true}},
1386		{col: "String", val: NullString{"bar", true}, want: "bar"},
1387		{col: "String", val: NullString{"bar", false}, want: NullString{"", false}},
1388		{col: "String", val: nil, want: NullString{}},
1389		{col: "StringArray", val: []string(nil), want: []NullString(nil)},
1390		{col: "StringArray", val: []string{}, want: []NullString{}},
1391		{col: "StringArray", val: []string{"foo", "bar"}, want: []NullString{{"foo", true}, {"bar", true}}},
1392		{col: "StringArray", val: []NullString(nil)},
1393		{col: "StringArray", val: []NullString{}},
1394		{col: "StringArray", val: []NullString{{"foo", true}, {}}},
1395		{col: "Bytes", val: []byte{}},
1396		{col: "Bytes", val: []byte{1, 2, 3}},
1397		{col: "Bytes", val: []byte(nil)},
1398		{col: "BytesArray", val: [][]byte(nil)},
1399		{col: "BytesArray", val: [][]byte{}},
1400		{col: "BytesArray", val: [][]byte{{1}, {2, 3}}},
1401		{col: "Int64a", val: 0, want: int64(0)},
1402		{col: "Int64a", val: -1, want: int64(-1)},
1403		{col: "Int64a", val: 2, want: int64(2)},
1404		{col: "Int64a", val: int64(3)},
1405		{col: "Int64a", val: 4, want: NullInt64{4, true}},
1406		{col: "Int64a", val: NullInt64{5, true}, want: int64(5)},
1407		{col: "Int64a", val: NullInt64{6, true}, want: int64(6)},
1408		{col: "Int64a", val: NullInt64{7, false}, want: NullInt64{0, false}},
1409		{col: "Int64a", val: nil, want: NullInt64{}},
1410		{col: "Int64Array", val: []int(nil), want: []NullInt64(nil)},
1411		{col: "Int64Array", val: []int{}, want: []NullInt64{}},
1412		{col: "Int64Array", val: []int{1, 2}, want: []NullInt64{{1, true}, {2, true}}},
1413		{col: "Int64Array", val: []int64(nil), want: []NullInt64(nil)},
1414		{col: "Int64Array", val: []int64{}, want: []NullInt64{}},
1415		{col: "Int64Array", val: []int64{1, 2}, want: []NullInt64{{1, true}, {2, true}}},
1416		{col: "Int64Array", val: []NullInt64(nil)},
1417		{col: "Int64Array", val: []NullInt64{}},
1418		{col: "Int64Array", val: []NullInt64{{1, true}, {}}},
1419		{col: "Bool", val: false},
1420		{col: "Bool", val: true},
1421		{col: "Bool", val: false, want: NullBool{false, true}},
1422		{col: "Bool", val: true, want: NullBool{true, true}},
1423		{col: "Bool", val: NullBool{true, true}},
1424		{col: "Bool", val: NullBool{false, false}},
1425		{col: "Bool", val: nil, want: NullBool{}},
1426		{col: "BoolArray", val: []bool(nil), want: []NullBool(nil)},
1427		{col: "BoolArray", val: []bool{}, want: []NullBool{}},
1428		{col: "BoolArray", val: []bool{true, false}, want: []NullBool{{true, true}, {false, true}}},
1429		{col: "BoolArray", val: []NullBool(nil)},
1430		{col: "BoolArray", val: []NullBool{}},
1431		{col: "BoolArray", val: []NullBool{{false, true}, {true, true}, {}}},
1432		{col: "Float64", val: 0.0},
1433		{col: "Float64", val: 3.14},
1434		{col: "Float64", val: math.NaN()},
1435		{col: "Float64", val: math.Inf(1)},
1436		{col: "Float64", val: math.Inf(-1)},
1437		{col: "Float64", val: 2.78, want: NullFloat64{2.78, true}},
1438		{col: "Float64", val: NullFloat64{2.71, true}, want: 2.71},
1439		{col: "Float64", val: NullFloat64{1.41, true}, want: NullFloat64{1.41, true}},
1440		{col: "Float64", val: NullFloat64{0, false}},
1441		{col: "Float64", val: nil, want: NullFloat64{}},
1442		{col: "Float64Array", val: []float64(nil), want: []NullFloat64(nil)},
1443		{col: "Float64Array", val: []float64{}, want: []NullFloat64{}},
1444		{col: "Float64Array", val: []float64{2.72, 3.14, math.Inf(1)}, want: []NullFloat64{{2.72, true}, {3.14, true}, {math.Inf(1), true}}},
1445		{col: "Float64Array", val: []NullFloat64(nil)},
1446		{col: "Float64Array", val: []NullFloat64{}},
1447		{col: "Float64Array", val: []NullFloat64{{2.72, true}, {math.Inf(1), true}, {}}},
1448		{col: "Date", val: d1},
1449		{col: "Date", val: d1, want: NullDate{d1, true}},
1450		{col: "Date", val: NullDate{d1, true}},
1451		{col: "Date", val: NullDate{d1, true}, want: d1},
1452		{col: "Date", val: NullDate{civil.Date{}, false}},
1453		{col: "DateArray", val: []civil.Date(nil), want: []NullDate(nil)},
1454		{col: "DateArray", val: []civil.Date{}, want: []NullDate{}},
1455		{col: "DateArray", val: []civil.Date{d1, d2, d3}, want: []NullDate{{d1, true}, {d2, true}, {d3, true}}},
1456		{col: "Timestamp", val: t1},
1457		{col: "Timestamp", val: t1, want: NullTime{t1, true}},
1458		{col: "Timestamp", val: NullTime{t1, true}},
1459		{col: "Timestamp", val: NullTime{t1, true}, want: t1},
1460		{col: "Timestamp", val: NullTime{}},
1461		{col: "Timestamp", val: nil, want: NullTime{}},
1462		{col: "TimestampArray", val: []time.Time(nil), want: []NullTime(nil)},
1463		{col: "TimestampArray", val: []time.Time{}, want: []NullTime{}},
1464		{col: "TimestampArray", val: []time.Time{t1, t2, t3}, want: []NullTime{{t1, true}, {t2, true}, {t3, true}}},
1465	}
1466
1467	if !isEmulatorEnvSet() {
1468		for _, tc := range []struct {
1469			col  string
1470			val  interface{}
1471			want interface{}
1472		}{
1473			{col: "Numeric", val: n1},
1474			{col: "Numeric", val: n2},
1475			{col: "Numeric", val: n1, want: NullNumeric{n1, true}},
1476			{col: "Numeric", val: n2, want: NullNumeric{n2, true}},
1477			{col: "Numeric", val: NullNumeric{n1, true}, want: n1},
1478			{col: "Numeric", val: NullNumeric{n1, true}, want: NullNumeric{n1, true}},
1479			{col: "Numeric", val: NullNumeric{n0, false}},
1480			{col: "Numeric", val: nil, want: NullNumeric{}},
1481			{col: "NumericArray", val: []big.Rat(nil), want: []NullNumeric(nil)},
1482			{col: "NumericArray", val: []big.Rat{}, want: []NullNumeric{}},
1483			{col: "NumericArray", val: []big.Rat{n1, n2}, want: []NullNumeric{{n1, true}, {n2, true}}},
1484			{col: "NumericArray", val: []NullNumeric(nil)},
1485			{col: "NumericArray", val: []NullNumeric{}},
1486			{col: "NumericArray", val: []NullNumeric{{n1, true}, {n2, true}, {}}},
1487		} {
1488			tests = append(tests, tc)
1489		}
1490	}
1491
1492	// Write rows into table first.
1493	var muts []*Mutation
1494	for i, test := range tests {
1495		muts = append(muts, InsertOrUpdate("Types", []string{"RowID", test.col}, []interface{}{i, test.val}))
1496	}
1497	if _, err := client.Apply(ctx, muts, ApplyAtLeastOnce()); err != nil {
1498		t.Fatal(err)
1499	}
1500
1501	for i, test := range tests {
1502		row, err := client.Single().ReadRow(ctx, "Types", []interface{}{i}, []string{test.col})
1503		if err != nil {
1504			t.Fatalf("Unable to fetch row %v: %v", i, err)
1505		}
1506		// Create new instance of type of test.want.
1507		want := test.want
1508		if want == nil {
1509			want = test.val
1510		}
1511		gotp := reflect.New(reflect.TypeOf(want))
1512		if err := row.Column(0, gotp.Interface()); err != nil {
1513			t.Errorf("%d: col:%v val:%#v, %v", i, test.col, test.val, err)
1514			continue
1515		}
1516		got := reflect.Indirect(gotp).Interface()
1517
1518		// One of the test cases is checking NaN handling.  Given
1519		// NaN!=NaN, we can't use reflect to test for it.
1520		if isNaN(got) && isNaN(want) {
1521			continue
1522		}
1523
1524		// Check non-NaN cases.
1525		if !testEqual(got, want) {
1526			t.Errorf("%d: col:%v val:%#v, got %#v, want %#v", i, test.col, test.val, got, want)
1527			continue
1528		}
1529	}
1530}
1531
1532// Test decoding Cloud Spanner STRUCT type.
1533func TestIntegration_StructTypes(t *testing.T) {
1534	t.Parallel()
1535
1536	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1537	defer cancel()
1538	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
1539	defer cleanup()
1540
1541	tests := []struct {
1542		q    Statement
1543		want func(r *Row) error
1544	}{
1545		{
1546			q: Statement{SQL: `SELECT ARRAY(SELECT STRUCT(1, 2))`},
1547			want: func(r *Row) error {
1548				// Test STRUCT ARRAY decoding to []NullRow.
1549				var rows []NullRow
1550				if err := r.Column(0, &rows); err != nil {
1551					return err
1552				}
1553				if len(rows) != 1 {
1554					return fmt.Errorf("len(rows) = %d; want 1", len(rows))
1555				}
1556				if !rows[0].Valid {
1557					return fmt.Errorf("rows[0] is NULL")
1558				}
1559				var i, j int64
1560				if err := rows[0].Row.Columns(&i, &j); err != nil {
1561					return err
1562				}
1563				if i != 1 || j != 2 {
1564					return fmt.Errorf("got (%d,%d), want (1,2)", i, j)
1565				}
1566				return nil
1567			},
1568		},
1569		{
1570			q: Statement{SQL: `SELECT ARRAY(SELECT STRUCT(1 as foo, 2 as bar)) as col1`},
1571			want: func(r *Row) error {
1572				// Test Row.ToStruct.
1573				s := struct {
1574					Col1 []*struct {
1575						Foo int64 `spanner:"foo"`
1576						Bar int64 `spanner:"bar"`
1577					} `spanner:"col1"`
1578				}{}
1579				if err := r.ToStruct(&s); err != nil {
1580					return err
1581				}
1582				want := struct {
1583					Col1 []*struct {
1584						Foo int64 `spanner:"foo"`
1585						Bar int64 `spanner:"bar"`
1586					} `spanner:"col1"`
1587				}{
1588					Col1: []*struct {
1589						Foo int64 `spanner:"foo"`
1590						Bar int64 `spanner:"bar"`
1591					}{
1592						{
1593							Foo: 1,
1594							Bar: 2,
1595						},
1596					},
1597				}
1598				if !testEqual(want, s) {
1599					return fmt.Errorf("unexpected decoding result: %v, want %v", s, want)
1600				}
1601				return nil
1602			},
1603		},
1604	}
1605	for i, test := range tests {
1606		iter := client.Single().Query(ctx, test.q)
1607		defer iter.Stop()
1608		row, err := iter.Next()
1609		if err != nil {
1610			t.Errorf("%d: %v", i, err)
1611			continue
1612		}
1613		if err := test.want(row); err != nil {
1614			t.Errorf("%d: %v", i, err)
1615			continue
1616		}
1617	}
1618}
1619
1620func TestIntegration_StructParametersUnsupported(t *testing.T) {
1621	skipEmulatorTest(t)
1622	t.Parallel()
1623
1624	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1625	defer cancel()
1626	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, nil)
1627	defer cleanup()
1628
1629	for _, test := range []struct {
1630		param       interface{}
1631		wantCode    codes.Code
1632		wantMsgPart string
1633	}{
1634		{
1635			struct {
1636				Field int
1637			}{10},
1638			codes.Unimplemented,
1639			"Unsupported query shape: " +
1640				"A struct value cannot be returned as a column value. " +
1641				"Rewrite the query to flatten the struct fields in the result.",
1642		},
1643		{
1644			[]struct {
1645				Field int
1646			}{{10}, {20}},
1647			codes.Unimplemented,
1648			"Unsupported query shape: " +
1649				"This query can return a null-valued array of struct, " +
1650				"which is not supported by Spanner.",
1651		},
1652	} {
1653		iter := client.Single().Query(ctx, Statement{
1654			SQL:    "SELECT @p",
1655			Params: map[string]interface{}{"p": test.param},
1656		})
1657		_, err := iter.Next()
1658		iter.Stop()
1659		if msg, ok := matchError(err, test.wantCode, test.wantMsgPart); !ok {
1660			t.Fatal(msg)
1661		}
1662	}
1663}
1664
1665// Test queries of the form "SELECT expr".
1666func TestIntegration_QueryExpressions(t *testing.T) {
1667	t.Parallel()
1668
1669	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1670	defer cancel()
1671	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, nil)
1672	defer cleanup()
1673
1674	newRow := func(vals []interface{}) *Row {
1675		row, err := NewRow(make([]string, len(vals)), vals)
1676		if err != nil {
1677			t.Fatal(err)
1678		}
1679		return row
1680	}
1681
1682	tests := []struct {
1683		expr string
1684		want interface{}
1685	}{
1686		{"1", int64(1)},
1687		{"[1, 2, 3]", []NullInt64{{1, true}, {2, true}, {3, true}}},
1688		{"[1, NULL, 3]", []NullInt64{{1, true}, {0, false}, {3, true}}},
1689		{"IEEE_DIVIDE(1, 0)", math.Inf(1)},
1690		{"IEEE_DIVIDE(-1, 0)", math.Inf(-1)},
1691		{"IEEE_DIVIDE(0, 0)", math.NaN()},
1692		// TODO(jba): add IEEE_DIVIDE(0, 0) to the following array when we have a better equality predicate.
1693		{"[IEEE_DIVIDE(1, 0), IEEE_DIVIDE(-1, 0)]", []NullFloat64{{math.Inf(1), true}, {math.Inf(-1), true}}},
1694		{"ARRAY(SELECT AS STRUCT * FROM (SELECT 'a', 1) WHERE 0 = 1)", []NullRow{}},
1695		{"ARRAY(SELECT STRUCT(1, 2))", []NullRow{{Row: *newRow([]interface{}{1, 2}), Valid: true}}},
1696	}
1697	for _, test := range tests {
1698		iter := client.Single().Query(ctx, Statement{SQL: "SELECT " + test.expr})
1699		defer iter.Stop()
1700		row, err := iter.Next()
1701		if err != nil {
1702			t.Errorf("%q: %v", test.expr, err)
1703			continue
1704		}
1705		// Create new instance of type of test.want.
1706		gotp := reflect.New(reflect.TypeOf(test.want))
1707		if err := row.Column(0, gotp.Interface()); err != nil {
1708			t.Errorf("%q: Column returned error %v", test.expr, err)
1709			continue
1710		}
1711		got := reflect.Indirect(gotp).Interface()
1712		// TODO(jba): remove isNaN special case when we have a better equality predicate.
1713		if isNaN(got) && isNaN(test.want) {
1714			continue
1715		}
1716		if !testEqual(got, test.want) {
1717			t.Errorf("%q\n got  %#v\nwant %#v", test.expr, got, test.want)
1718		}
1719	}
1720}
1721
1722func TestIntegration_QueryStats(t *testing.T) {
1723	skipEmulatorTest(t)
1724	t.Parallel()
1725
1726	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1727	defer cancel()
1728	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
1729	defer cleanup()
1730
1731	accounts := []*Mutation{
1732		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
1733		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
1734	}
1735	if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil {
1736		t.Fatal(err)
1737	}
1738	const sql = "SELECT Balance FROM Accounts"
1739
1740	qp, err := client.Single().AnalyzeQuery(ctx, Statement{sql, nil})
1741	if err != nil {
1742		t.Fatal(err)
1743	}
1744	if len(qp.PlanNodes) == 0 {
1745		t.Error("got zero plan nodes, expected at least one")
1746	}
1747
1748	iter := client.Single().QueryWithStats(ctx, Statement{sql, nil})
1749	defer iter.Stop()
1750	for {
1751		_, err := iter.Next()
1752		if err == iterator.Done {
1753			break
1754		}
1755		if err != nil {
1756			t.Fatal(err)
1757		}
1758	}
1759	if iter.QueryPlan == nil {
1760		t.Error("got nil QueryPlan, expected one")
1761	}
1762	if iter.QueryStats == nil {
1763		t.Error("got nil QueryStats, expected some")
1764	}
1765}
1766
1767func TestIntegration_InvalidDatabase(t *testing.T) {
1768	t.Parallel()
1769
1770	if databaseAdmin == nil {
1771		t.Skip("Integration tests skipped")
1772	}
1773	ctx := context.Background()
1774	dbPath := fmt.Sprintf("projects/%v/instances/%v/databases/invalid", testProjectID, testInstanceID)
1775	c, err := createClient(ctx, dbPath, SessionPoolConfig{})
1776	// Client creation should succeed even if the database is invalid.
1777	if err != nil {
1778		t.Fatal(err)
1779	}
1780	_, err = c.Single().ReadRow(ctx, "TestTable", Key{1}, []string{"col1"})
1781	if msg, ok := matchError(err, codes.NotFound, ""); !ok {
1782		t.Fatal(msg)
1783	}
1784}
1785
1786func TestIntegration_ReadErrors(t *testing.T) {
1787	t.Parallel()
1788
1789	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1790	defer cancel()
1791	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, readDBStatements)
1792	defer cleanup()
1793
1794	var ms []*Mutation
1795	for i := 0; i < 2; i++ {
1796		ms = append(ms, InsertOrUpdate(testTable,
1797			testTableColumns,
1798			[]interface{}{fmt.Sprintf("k%d", i), fmt.Sprintf("v")}))
1799	}
1800	if _, err := client.Apply(ctx, ms); err != nil {
1801		t.Fatal(err)
1802	}
1803
1804	// Read over invalid table fails
1805	_, err := client.Single().ReadRow(ctx, "badTable", Key{1}, []string{"StringValue"})
1806	if msg, ok := matchError(err, codes.NotFound, "badTable"); !ok {
1807		t.Error(msg)
1808	}
1809	// Read over invalid column fails
1810	_, err = client.Single().ReadRow(ctx, "TestTable", Key{1}, []string{"badcol"})
1811	if msg, ok := matchError(err, codes.NotFound, "badcol"); !ok {
1812		t.Error(msg)
1813	}
1814
1815	// Invalid query fails
1816	iter := client.Single().Query(ctx, Statement{SQL: "SELECT Apples AND Oranges"})
1817	defer iter.Stop()
1818	_, err = iter.Next()
1819	if msg, ok := matchError(err, codes.InvalidArgument, "unrecognized name"); !ok {
1820		t.Error(msg)
1821	}
1822
1823	// Read should fail on cancellation.
1824	cctx, cancel := context.WithCancel(ctx)
1825	cancel()
1826	_, err = client.Single().ReadRow(cctx, "TestTable", Key{1}, []string{"StringValue"})
1827	if msg, ok := matchError(err, codes.Canceled, ""); !ok {
1828		t.Error(msg)
1829	}
1830	// Read should fail if deadline exceeded.
1831	dctx, cancel := context.WithTimeout(ctx, time.Nanosecond)
1832	defer cancel()
1833	<-dctx.Done()
1834	_, err = client.Single().ReadRow(dctx, "TestTable", Key{1}, []string{"StringValue"})
1835	if msg, ok := matchError(err, codes.DeadlineExceeded, ""); !ok {
1836		t.Error(msg)
1837	}
1838	// Read should fail if there are multiple rows returned.
1839	_, err = client.Single().ReadRowUsingIndex(ctx, testTable, testTableIndex, Key{"v"}, testTableColumns)
1840	wantMsgPart := fmt.Sprintf("more than one row found by index(Table: %v, IndexKey: %v, Index: %v)", testTable, Key{"v"}, testTableIndex)
1841	if msg, ok := matchError(err, codes.FailedPrecondition, wantMsgPart); !ok {
1842		t.Error(msg)
1843	}
1844}
1845
1846// Test TransactionRunner. Test that transactions are aborted and retried as
1847// expected.
1848func TestIntegration_TransactionRunner(t *testing.T) {
1849	skipEmulatorTest(t)
1850	t.Parallel()
1851
1852	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1853	defer cancel()
1854	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
1855	defer cleanup()
1856
1857	// Test 1: User error should abort the transaction.
1858	_, _ = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
1859		tx.BufferWrite([]*Mutation{
1860			Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)})})
1861		return errors.New("user error")
1862	})
1863	// Empty read.
1864	rows, err := readAllTestTable(client.Single().Read(ctx, "Accounts", Key{1}, []string{"AccountId", "Nickname", "Balance"}))
1865	if err != nil {
1866		t.Fatal(err)
1867	}
1868	if got, want := len(rows), 0; got != want {
1869		t.Errorf("Empty read, got %d, want %d.", got, want)
1870	}
1871
1872	// Test 2: Expect abort and retry.
1873	// We run two ReadWriteTransactions concurrently and make txn1 abort txn2 by
1874	// committing writes to the column txn2 have read, and expect the following
1875	// read to abort and txn2 retries.
1876
1877	// Set up two accounts
1878	accounts := []*Mutation{
1879		Insert("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), int64(0)}),
1880		Insert("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), int64(1)}),
1881	}
1882	if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil {
1883		t.Fatal(err)
1884	}
1885
1886	var (
1887		cTxn1Start  = make(chan struct{})
1888		cTxn1Commit = make(chan struct{})
1889		cTxn2Start  = make(chan struct{})
1890		wg          sync.WaitGroup
1891	)
1892
1893	// read balance, check error if we don't expect abort.
1894	readBalance := func(tx interface {
1895		ReadRow(ctx context.Context, table string, key Key, columns []string) (*Row, error)
1896	}, key int64, expectAbort bool) (int64, error) {
1897		var b int64
1898		r, e := tx.ReadRow(ctx, "Accounts", Key{int64(key)}, []string{"Balance"})
1899		if e != nil {
1900			if expectAbort && !isAbortedErr(e) {
1901				t.Errorf("ReadRow got %v, want Abort error.", e)
1902			}
1903			// Verify that we received and are able to extract retry info from
1904			// the aborted error.
1905			if _, hasRetryInfo := ExtractRetryDelay(e); !hasRetryInfo {
1906				t.Errorf("Got Abort error without RetryInfo\nGot: %v", e)
1907			}
1908			return b, e
1909		}
1910		if ce := r.Column(0, &b); ce != nil {
1911			return b, ce
1912		}
1913		return b, nil
1914	}
1915
1916	wg.Add(2)
1917	// Txn 1
1918	go func() {
1919		defer wg.Done()
1920		var once sync.Once
1921		_, e := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
1922			b, e := readBalance(tx, 1, false)
1923			if e != nil {
1924				return e
1925			}
1926			// txn 1 can abort, in that case we skip closing the channel on
1927			// retry.
1928			once.Do(func() { close(cTxn1Start) })
1929			e = tx.BufferWrite([]*Mutation{
1930				Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), int64(b + 1)})})
1931			if e != nil {
1932				return e
1933			}
1934			// Wait for second transaction.
1935			<-cTxn2Start
1936			return nil
1937		})
1938		close(cTxn1Commit)
1939		if e != nil {
1940			t.Errorf("Transaction 1 commit, got %v, want nil.", e)
1941		}
1942	}()
1943	// Txn 2
1944	go func() {
1945		// Wait until txn 1 starts.
1946		<-cTxn1Start
1947		defer wg.Done()
1948		var (
1949			once sync.Once
1950			b1   int64
1951			b2   int64
1952			e    error
1953		)
1954		_, e = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
1955			if b1, e = readBalance(tx, 1, false); e != nil {
1956				return e
1957			}
1958			// Skip closing channel on retry.
1959			once.Do(func() { close(cTxn2Start) })
1960			// Wait until txn 1 successfully commits.
1961			<-cTxn1Commit
1962			// Txn1 has committed and written a balance to the account. Now this
1963			// transaction (txn2) reads and re-writes the balance. The first
1964			// time through, it will abort because it overlaps with txn1. Then
1965			// it will retry after txn1 commits, and succeed.
1966			if b2, e = readBalance(tx, 2, true); e != nil {
1967				return e
1968			}
1969			return tx.BufferWrite([]*Mutation{
1970				Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), int64(b1 + b2)})})
1971		})
1972		if e != nil {
1973			t.Errorf("Transaction 2 commit, got %v, want nil.", e)
1974		}
1975	}()
1976	wg.Wait()
1977	// Check that both transactions' effects are visible.
1978	for i := int64(1); i <= int64(2); i++ {
1979		if b, e := readBalance(client.Single(), i, false); e != nil {
1980			t.Fatalf("ReadBalance for key %d error %v.", i, e)
1981		} else if b != i {
1982			t.Errorf("Balance for key %d, got %d, want %d.", i, b, i)
1983		}
1984	}
1985}
1986
1987// Test PartitionQuery of BatchReadOnlyTransaction, create partitions then
1988// serialize and deserialize both transaction and partition to be used in
1989// execution on another client, and compare results.
1990func TestIntegration_BatchQuery(t *testing.T) {
1991	t.Parallel()
1992
1993	// Set up testing environment.
1994	var (
1995		client2 *Client
1996		err     error
1997	)
1998	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1999	defer cancel()
2000	client, dbPath, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, simpleDBStatements)
2001	defer cleanup()
2002
2003	if err = populate(ctx, client); err != nil {
2004		t.Fatal(err)
2005	}
2006	if client2, err = createClient(ctx, dbPath, SessionPoolConfig{}); err != nil {
2007		t.Fatal(err)
2008	}
2009	defer client2.Close()
2010
2011	// PartitionQuery
2012	var (
2013		txn        *BatchReadOnlyTransaction
2014		partitions []*Partition
2015		stmt       = Statement{SQL: "SELECT * FROM test;"}
2016	)
2017
2018	if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil {
2019		t.Fatal(err)
2020	}
2021	defer txn.Cleanup(ctx)
2022	if partitions, err = txn.PartitionQuery(ctx, stmt, PartitionOptions{0, 3}); err != nil {
2023		t.Fatal(err)
2024	}
2025
2026	// Reconstruct BatchReadOnlyTransactionID and execute partitions
2027	var (
2028		tid2      BatchReadOnlyTransactionID
2029		data      []byte
2030		gotResult bool // if we get matching result from two separate txns
2031	)
2032	if data, err = txn.ID.MarshalBinary(); err != nil {
2033		t.Fatalf("encoding failed %v", err)
2034	}
2035	if err = tid2.UnmarshalBinary(data); err != nil {
2036		t.Fatalf("decoding failed %v", err)
2037	}
2038	txn2 := client2.BatchReadOnlyTransactionFromID(tid2)
2039
2040	// Execute Partitions and compare results
2041	for i, p := range partitions {
2042		iter := txn.Execute(ctx, p)
2043		defer iter.Stop()
2044		p2 := serdesPartition(t, i, p)
2045		iter2 := txn2.Execute(ctx, &p2)
2046		defer iter2.Stop()
2047
2048		row1, err1 := iter.Next()
2049		row2, err2 := iter2.Next()
2050		if err1 != err2 {
2051			t.Fatalf("execution failed for different reasons: %v, %v", err1, err2)
2052			continue
2053		}
2054		if !testEqual(row1, row2) {
2055			t.Fatalf("execution returned different values: %v, %v", row1, row2)
2056			continue
2057		}
2058		if row1 == nil {
2059			continue
2060		}
2061		var a, b string
2062		if err = row1.Columns(&a, &b); err != nil {
2063			t.Fatalf("failed to parse row %v", err)
2064			continue
2065		}
2066		if a == str1 && b == str2 {
2067			gotResult = true
2068		}
2069	}
2070	if !gotResult {
2071		t.Fatalf("execution didn't return expected values")
2072	}
2073}
2074
2075// Test PartitionRead of BatchReadOnlyTransaction, similar to TestBatchQuery
2076func TestIntegration_BatchRead(t *testing.T) {
2077	t.Parallel()
2078
2079	// Set up testing environment.
2080	var (
2081		client2 *Client
2082		err     error
2083	)
2084	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2085	defer cancel()
2086	client, dbPath, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, simpleDBStatements)
2087	defer cleanup()
2088
2089	if err = populate(ctx, client); err != nil {
2090		t.Fatal(err)
2091	}
2092	if client2, err = createClient(ctx, dbPath, SessionPoolConfig{}); err != nil {
2093		t.Fatal(err)
2094	}
2095	defer client2.Close()
2096
2097	// PartitionRead
2098	var (
2099		txn        *BatchReadOnlyTransaction
2100		partitions []*Partition
2101	)
2102
2103	if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil {
2104		t.Fatal(err)
2105	}
2106	defer txn.Cleanup(ctx)
2107	if partitions, err = txn.PartitionRead(ctx, "test", AllKeys(), simpleDBTableColumns, PartitionOptions{0, 3}); err != nil {
2108		t.Fatal(err)
2109	}
2110
2111	// Reconstruct BatchReadOnlyTransactionID and execute partitions.
2112	var (
2113		tid2      BatchReadOnlyTransactionID
2114		data      []byte
2115		gotResult bool // if we get matching result from two separate txns
2116	)
2117	if data, err = txn.ID.MarshalBinary(); err != nil {
2118		t.Fatalf("encoding failed %v", err)
2119	}
2120	if err = tid2.UnmarshalBinary(data); err != nil {
2121		t.Fatalf("decoding failed %v", err)
2122	}
2123	txn2 := client2.BatchReadOnlyTransactionFromID(tid2)
2124
2125	// Execute Partitions and compare results.
2126	for i, p := range partitions {
2127		iter := txn.Execute(ctx, p)
2128		defer iter.Stop()
2129		p2 := serdesPartition(t, i, p)
2130		iter2 := txn2.Execute(ctx, &p2)
2131		defer iter2.Stop()
2132
2133		row1, err1 := iter.Next()
2134		row2, err2 := iter2.Next()
2135		if err1 != err2 {
2136			t.Fatalf("execution failed for different reasons: %v, %v", err1, err2)
2137			continue
2138		}
2139		if !testEqual(row1, row2) {
2140			t.Fatalf("execution returned different values: %v, %v", row1, row2)
2141			continue
2142		}
2143		if row1 == nil {
2144			continue
2145		}
2146		var a, b string
2147		if err = row1.Columns(&a, &b); err != nil {
2148			t.Fatalf("failed to parse row %v", err)
2149			continue
2150		}
2151		if a == str1 && b == str2 {
2152			gotResult = true
2153		}
2154	}
2155	if !gotResult {
2156		t.Fatalf("execution didn't return expected values")
2157	}
2158}
2159
2160// Test normal txReadEnv method on BatchReadOnlyTransaction.
2161func TestIntegration_BROTNormal(t *testing.T) {
2162	t.Parallel()
2163
2164	// Set up testing environment and create txn.
2165	var (
2166		txn *BatchReadOnlyTransaction
2167		err error
2168		row *Row
2169		i   int64
2170	)
2171	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2172	defer cancel()
2173	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, simpleDBStatements)
2174	defer cleanup()
2175
2176	if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil {
2177		t.Fatal(err)
2178	}
2179	defer txn.Cleanup(ctx)
2180	if _, err := txn.PartitionRead(ctx, "test", AllKeys(), simpleDBTableColumns, PartitionOptions{0, 3}); err != nil {
2181		t.Fatal(err)
2182	}
2183	// Normal query should work with BatchReadOnlyTransaction.
2184	stmt2 := Statement{SQL: "SELECT 1"}
2185	iter := txn.Query(ctx, stmt2)
2186	defer iter.Stop()
2187
2188	row, err = iter.Next()
2189	if err != nil {
2190		t.Errorf("query failed with %v", err)
2191	}
2192	if err = row.Columns(&i); err != nil {
2193		t.Errorf("failed to parse row %v", err)
2194	}
2195}
2196
2197func TestIntegration_CommitTimestamp(t *testing.T) {
2198	t.Parallel()
2199
2200	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2201	defer cancel()
2202	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, ctsDBStatements)
2203	defer cleanup()
2204
2205	type testTableRow struct {
2206		Key string
2207		Ts  NullTime
2208	}
2209
2210	var (
2211		cts1, cts2, ts1, ts2 time.Time
2212		err                  error
2213	)
2214
2215	// Apply mutation in sequence, expect to see commit timestamp in good order,
2216	// check also the commit timestamp returned
2217	for _, it := range []struct {
2218		k string
2219		t *time.Time
2220	}{
2221		{"a", &cts1},
2222		{"b", &cts2},
2223	} {
2224		tt := testTableRow{Key: it.k, Ts: NullTime{CommitTimestamp, true}}
2225		m, err := InsertStruct("TestTable", tt)
2226		if err != nil {
2227			t.Fatal(err)
2228		}
2229		*it.t, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce())
2230		if err != nil {
2231			t.Fatal(err)
2232		}
2233	}
2234
2235	txn := client.ReadOnlyTransaction()
2236	for _, it := range []struct {
2237		k string
2238		t *time.Time
2239	}{
2240		{"a", &ts1},
2241		{"b", &ts2},
2242	} {
2243		if r, e := txn.ReadRow(ctx, "TestTable", Key{it.k}, []string{"Ts"}); e != nil {
2244			t.Fatal(err)
2245		} else {
2246			var got testTableRow
2247			if err := r.ToStruct(&got); err != nil {
2248				t.Fatal(err)
2249			}
2250			*it.t = got.Ts.Time
2251		}
2252	}
2253	if !cts1.Equal(ts1) {
2254		t.Errorf("Expect commit timestamp returned and read to match for txn1, got %v and %v.", cts1, ts1)
2255	}
2256	if !cts2.Equal(ts2) {
2257		t.Errorf("Expect commit timestamp returned and read to match for txn2, got %v and %v.", cts2, ts2)
2258	}
2259
2260	// Try writing a timestamp in the future to commit timestamp, expect error.
2261	_, err = client.Apply(ctx, []*Mutation{InsertOrUpdate("TestTable", []string{"Key", "Ts"}, []interface{}{"a", time.Now().Add(time.Hour)})}, ApplyAtLeastOnce())
2262	if msg, ok := matchError(err, codes.FailedPrecondition, "Cannot write timestamps in the future"); !ok {
2263		t.Error(msg)
2264	}
2265}
2266
2267func TestIntegration_DML(t *testing.T) {
2268	t.Parallel()
2269
2270	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2271	defer cancel()
2272	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
2273	defer cleanup()
2274
2275	// Function that reads a single row's first name from within a transaction.
2276	readFirstName := func(tx *ReadWriteTransaction, key int) (string, error) {
2277		row, err := tx.ReadRow(ctx, "Singers", Key{key}, []string{"FirstName"})
2278		if err != nil {
2279			return "", err
2280		}
2281		var fn string
2282		if err := row.Column(0, &fn); err != nil {
2283			return "", err
2284		}
2285		return fn, nil
2286	}
2287
2288	// Function that reads multiple rows' first names from outside a read/write
2289	// transaction.
2290	readFirstNames := func(keys ...int) []string {
2291		var ks []KeySet
2292		for _, k := range keys {
2293			ks = append(ks, Key{k})
2294		}
2295		iter := client.Single().Read(ctx, "Singers", KeySets(ks...), []string{"FirstName"})
2296		var got []string
2297		var fn string
2298		err := iter.Do(func(row *Row) error {
2299			if err := row.Column(0, &fn); err != nil {
2300				return err
2301			}
2302			got = append(got, fn)
2303			return nil
2304		})
2305		if err != nil {
2306			t.Fatalf("readFirstNames(%v): %v", keys, err)
2307		}
2308		return got
2309	}
2310
2311	// Use ReadWriteTransaction.Query to execute a DML statement.
2312	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
2313		iter := tx.Query(ctx, Statement{
2314			SQL: `INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (1, "Umm", "Kulthum")`,
2315		})
2316		defer iter.Stop()
2317		if row, err := iter.Next(); err != iterator.Done {
2318			t.Fatalf("got results from iterator, want none: %#v, err = %v\n", row, err)
2319		}
2320		if iter.RowCount != 1 {
2321			t.Errorf("row count: got %d, want 1", iter.RowCount)
2322		}
2323		// The results of the DML statement should be visible to the transaction.
2324		got, err := readFirstName(tx, 1)
2325		if err != nil {
2326			return err
2327		}
2328		if want := "Umm"; got != want {
2329			t.Errorf("got %q, want %q", got, want)
2330		}
2331		return nil
2332	})
2333	if err != nil {
2334		t.Fatal(err)
2335	}
2336
2337	// Use ReadWriteTransaction.Update to execute a DML statement.
2338	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
2339		count, err := tx.Update(ctx, Statement{
2340			SQL: `Insert INTO Singers (SingerId, FirstName, LastName) VALUES (2, "Eduard", "Khil")`,
2341		})
2342		if err != nil {
2343			return err
2344		}
2345		if count != 1 {
2346			t.Errorf("row count: got %d, want 1", count)
2347		}
2348		got, err := readFirstName(tx, 2)
2349		if err != nil {
2350			return err
2351		}
2352		if want := "Eduard"; got != want {
2353			t.Errorf("got %q, want %q", got, want)
2354		}
2355		return nil
2356
2357	})
2358	if err != nil {
2359		t.Fatal(err)
2360	}
2361
2362	// Roll back a DML statement and confirm that it didn't happen.
2363	var fail = errors.New("fail")
2364	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
2365		_, err := tx.Update(ctx, Statement{
2366			SQL: `INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (3, "Audra", "McDonald")`,
2367		})
2368		if err != nil {
2369			return err
2370		}
2371		return fail
2372	})
2373	if err != fail {
2374		t.Fatalf("rolling back: got error %v, want the error 'fail'", err)
2375	}
2376	_, err = client.Single().ReadRow(ctx, "Singers", Key{3}, []string{"FirstName"})
2377	if got, want := ErrCode(err), codes.NotFound; got != want {
2378		t.Errorf("got %s, want %s", got, want)
2379	}
2380
2381	// Run two DML statements in the same transaction.
2382	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
2383		_, err := tx.Update(ctx, Statement{SQL: `UPDATE Singers SET FirstName = "Oum" WHERE SingerId = 1`})
2384		if err != nil {
2385			return err
2386		}
2387		_, err = tx.Update(ctx, Statement{SQL: `UPDATE Singers SET FirstName = "Eddie" WHERE SingerId = 2`})
2388		if err != nil {
2389			return err
2390		}
2391		return nil
2392	})
2393	if err != nil {
2394		t.Fatal(err)
2395	}
2396	got := readFirstNames(1, 2)
2397	want := []string{"Oum", "Eddie"}
2398	if !testEqual(got, want) {
2399		t.Errorf("got %v, want %v", got, want)
2400	}
2401
2402	// Run a DML statement and an ordinary mutation in the same transaction.
2403	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
2404		_, err := tx.Update(ctx, Statement{
2405			SQL: `INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (3, "Audra", "McDonald")`,
2406		})
2407		if err != nil {
2408			return err
2409		}
2410		return tx.BufferWrite([]*Mutation{
2411			Insert("Singers", []string{"SingerId", "FirstName", "LastName"},
2412				[]interface{}{4, "Andy", "Irvine"}),
2413		})
2414	})
2415	if err != nil {
2416		t.Fatal(err)
2417	}
2418	got = readFirstNames(3, 4)
2419	want = []string{"Audra", "Andy"}
2420	if !testEqual(got, want) {
2421		t.Errorf("got %v, want %v", got, want)
2422	}
2423
2424	// Attempt to run a query using update.
2425	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
2426		_, err := tx.Update(ctx, Statement{SQL: `SELECT FirstName from Singers`})
2427		return err
2428	})
2429	if got, want := ErrCode(err), codes.InvalidArgument; got != want {
2430		t.Errorf("got %s, want %s", got, want)
2431	}
2432}
2433
2434func TestIntegration_StructParametersBind(t *testing.T) {
2435	t.Parallel()
2436
2437	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2438	defer cancel()
2439	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, nil)
2440	defer cleanup()
2441
2442	type tRow []interface{}
2443	type tRows []struct{ trow tRow }
2444
2445	type allFields struct {
2446		Stringf string
2447		Intf    int
2448		Boolf   bool
2449		Floatf  float64
2450		Bytef   []byte
2451		Timef   time.Time
2452		Datef   civil.Date
2453	}
2454	allColumns := []string{
2455		"Stringf",
2456		"Intf",
2457		"Boolf",
2458		"Floatf",
2459		"Bytef",
2460		"Timef",
2461		"Datef",
2462	}
2463	s1 := allFields{"abc", 300, false, 3.45, []byte("foo"), t1, d1}
2464	s2 := allFields{"def", -300, false, -3.45, []byte("bar"), t2, d2}
2465
2466	dynamicStructType := reflect.StructOf([]reflect.StructField{
2467		{Name: "A", Type: reflect.TypeOf(t1), Tag: `spanner:"ff1"`},
2468	})
2469	s3 := reflect.New(dynamicStructType)
2470	s3.Elem().Field(0).Set(reflect.ValueOf(t1))
2471
2472	for i, test := range []struct {
2473		param interface{}
2474		sql   string
2475		cols  []string
2476		trows tRows
2477	}{
2478		// Struct value.
2479		{
2480			s1,
2481			"SELECT" +
2482				" @p.Stringf," +
2483				" @p.Intf," +
2484				" @p.Boolf," +
2485				" @p.Floatf," +
2486				" @p.Bytef," +
2487				" @p.Timef," +
2488				" @p.Datef",
2489			allColumns,
2490			tRows{
2491				{tRow{"abc", 300, false, 3.45, []byte("foo"), t1, d1}},
2492			},
2493		},
2494		// Array of struct value.
2495		{
2496			[]allFields{s1, s2},
2497			"SELECT * FROM UNNEST(@p)",
2498			allColumns,
2499			tRows{
2500				{tRow{"abc", 300, false, 3.45, []byte("foo"), t1, d1}},
2501				{tRow{"def", -300, false, -3.45, []byte("bar"), t2, d2}},
2502			},
2503		},
2504		// Null struct.
2505		{
2506			(*allFields)(nil),
2507			"SELECT @p IS NULL",
2508			[]string{""},
2509			tRows{
2510				{tRow{true}},
2511			},
2512		},
2513		// Null Array of struct.
2514		{
2515			[]allFields(nil),
2516			"SELECT @p IS NULL",
2517			[]string{""},
2518			tRows{
2519				{tRow{true}},
2520			},
2521		},
2522		// Empty struct.
2523		{
2524			struct{}{},
2525			"SELECT @p IS NULL ",
2526			[]string{""},
2527			tRows{
2528				{tRow{false}},
2529			},
2530		},
2531		// Empty array of struct.
2532		{
2533			[]allFields{},
2534			"SELECT * FROM UNNEST(@p) ",
2535			allColumns,
2536			tRows{},
2537		},
2538		// Struct with duplicate fields.
2539		{
2540			struct {
2541				A int `spanner:"field"`
2542				B int `spanner:"field"`
2543			}{10, 20},
2544			"SELECT * FROM UNNEST([@p]) ",
2545			[]string{"field", "field"},
2546			tRows{
2547				{tRow{10, 20}},
2548			},
2549		},
2550		// Struct with unnamed fields.
2551		{
2552			struct {
2553				A string `spanner:""`
2554			}{"hello"},
2555			"SELECT * FROM UNNEST([@p]) ",
2556			[]string{""},
2557			tRows{
2558				{tRow{"hello"}},
2559			},
2560		},
2561		// Mixed struct.
2562		{
2563			struct {
2564				DynamicStructField interface{}  `spanner:"f1"`
2565				ArrayStructField   []*allFields `spanner:"f2"`
2566			}{
2567				DynamicStructField: s3.Interface(),
2568				ArrayStructField:   []*allFields{nil},
2569			},
2570			"SELECT @p.f1.ff1, ARRAY_LENGTH(@p.f2), @p.f2[OFFSET(0)] IS NULL ",
2571			[]string{"ff1", "", ""},
2572			tRows{
2573				{tRow{t1, 1, true}},
2574			},
2575		},
2576	} {
2577		iter := client.Single().Query(ctx, Statement{
2578			SQL:    test.sql,
2579			Params: map[string]interface{}{"p": test.param},
2580		})
2581		var gotRows []*Row
2582		err := iter.Do(func(r *Row) error {
2583			gotRows = append(gotRows, r)
2584			return nil
2585		})
2586		if err != nil {
2587			t.Errorf("Failed to execute test case %d, error: %v", i, err)
2588		}
2589
2590		var wantRows []*Row
2591		for j, row := range test.trows {
2592			r, err := NewRow(test.cols, row.trow)
2593			if err != nil {
2594				t.Errorf("Invalid row %d in test case %d", j, i)
2595			}
2596			wantRows = append(wantRows, r)
2597		}
2598		if !testEqual(gotRows, wantRows) {
2599			t.Errorf("%d: Want result %v, got result %v", i, wantRows, gotRows)
2600		}
2601	}
2602}
2603
2604func TestIntegration_PDML(t *testing.T) {
2605	t.Parallel()
2606
2607	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2608	defer cancel()
2609	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
2610	defer cleanup()
2611
2612	columns := []string{"SingerId", "FirstName", "LastName"}
2613
2614	// Populate the Singers table.
2615	var muts []*Mutation
2616	for _, row := range [][]interface{}{
2617		{1, "Umm", "Kulthum"},
2618		{2, "Eduard", "Khil"},
2619		{3, "Audra", "McDonald"},
2620		{4, "Enrique", "Iglesias"},
2621		{5, "Shakira", "Ripoll"},
2622	} {
2623		muts = append(muts, Insert("Singers", columns, row))
2624	}
2625	if _, err := client.Apply(ctx, muts); err != nil {
2626		t.Fatal(err)
2627	}
2628	// Identifiers in PDML statements must be fully qualified.
2629	// TODO(jba): revisit the above.
2630	count, err := client.PartitionedUpdate(ctx, Statement{
2631		SQL: `UPDATE Singers SET Singers.FirstName = "changed" WHERE Singers.SingerId >= 1 AND Singers.SingerId <= @end`,
2632		Params: map[string]interface{}{
2633			"end": 3,
2634		},
2635	})
2636	if err != nil {
2637		t.Fatal(err)
2638	}
2639	if want := int64(3); count != want {
2640		t.Fatalf("got %d, want %d", count, want)
2641	}
2642	got, err := readAll(client.Single().Read(ctx, "Singers", AllKeys(), columns))
2643	if err != nil {
2644		t.Fatal(err)
2645	}
2646	want := [][]interface{}{
2647		{int64(1), "changed", "Kulthum"},
2648		{int64(2), "changed", "Khil"},
2649		{int64(3), "changed", "McDonald"},
2650		{int64(4), "Enrique", "Iglesias"},
2651		{int64(5), "Shakira", "Ripoll"},
2652	}
2653	if !testEqual(got, want) {
2654		t.Errorf("\ngot %v\nwant%v", got, want)
2655	}
2656}
2657
2658func TestIntegration_BatchDML(t *testing.T) {
2659	t.Parallel()
2660
2661	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2662	defer cancel()
2663	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
2664	defer cleanup()
2665
2666	columns := []string{"SingerId", "FirstName", "LastName"}
2667
2668	// Populate the Singers table.
2669	var muts []*Mutation
2670	for _, row := range [][]interface{}{
2671		{1, "Umm", "Kulthum"},
2672		{2, "Eduard", "Khil"},
2673		{3, "Audra", "McDonald"},
2674	} {
2675		muts = append(muts, Insert("Singers", columns, row))
2676	}
2677	if _, err := client.Apply(ctx, muts); err != nil {
2678		t.Fatal(err)
2679	}
2680
2681	var counts []int64
2682	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
2683		counts, err = tx.BatchUpdate(ctx, []Statement{
2684			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`},
2685			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 2" WHERE Singers.SingerId = 2`},
2686			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`},
2687		})
2688		return err
2689	})
2690
2691	if err != nil {
2692		t.Fatal(err)
2693	}
2694	if want := []int64{1, 1, 1}; !testEqual(counts, want) {
2695		t.Fatalf("got %d, want %d", counts, want)
2696	}
2697	got, err := readAll(client.Single().Read(ctx, "Singers", AllKeys(), columns))
2698	if err != nil {
2699		t.Fatal(err)
2700	}
2701	want := [][]interface{}{
2702		{int64(1), "changed 1", "Kulthum"},
2703		{int64(2), "changed 2", "Khil"},
2704		{int64(3), "changed 3", "McDonald"},
2705	}
2706	if !testEqual(got, want) {
2707		t.Errorf("\ngot %v\nwant%v", got, want)
2708	}
2709}
2710
2711func TestIntegration_BatchDML_NoStatements(t *testing.T) {
2712	t.Parallel()
2713
2714	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2715	defer cancel()
2716	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
2717	defer cleanup()
2718
2719	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
2720		_, err = tx.BatchUpdate(ctx, []Statement{})
2721		return err
2722	})
2723	if err == nil {
2724		t.Fatal("expected error, got nil")
2725	}
2726	if s, ok := status.FromError(err); ok {
2727		if s.Code() != codes.InvalidArgument {
2728			t.Fatalf("expected InvalidArgument, got %v", err)
2729		}
2730	} else {
2731		t.Fatalf("expected InvalidArgument, got %v", err)
2732	}
2733}
2734
2735func TestIntegration_BatchDML_TwoStatements(t *testing.T) {
2736	t.Parallel()
2737
2738	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2739	defer cancel()
2740	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
2741	defer cleanup()
2742
2743	columns := []string{"SingerId", "FirstName", "LastName"}
2744
2745	// Populate the Singers table.
2746	var muts []*Mutation
2747	for _, row := range [][]interface{}{
2748		{1, "Umm", "Kulthum"},
2749		{2, "Eduard", "Khil"},
2750		{3, "Audra", "McDonald"},
2751	} {
2752		muts = append(muts, Insert("Singers", columns, row))
2753	}
2754	if _, err := client.Apply(ctx, muts); err != nil {
2755		t.Fatal(err)
2756	}
2757
2758	var updateCount int64
2759	var batchCounts []int64
2760	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
2761		batchCounts, err = tx.BatchUpdate(ctx, []Statement{
2762			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`},
2763			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 2" WHERE Singers.SingerId = 2`},
2764			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`},
2765		})
2766		if err != nil {
2767			return err
2768		}
2769
2770		updateCount, err = tx.Update(ctx, Statement{SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`})
2771		return err
2772	})
2773	if err != nil {
2774		t.Fatal(err)
2775	}
2776	if want := []int64{1, 1, 1}; !testEqual(batchCounts, want) {
2777		t.Fatalf("got %d, want %d", batchCounts, want)
2778	}
2779	if updateCount != 1 {
2780		t.Fatalf("got %v, want 1", updateCount)
2781	}
2782}
2783
2784func TestIntegration_BatchDML_Error(t *testing.T) {
2785	t.Parallel()
2786
2787	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2788	defer cancel()
2789	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
2790	defer cleanup()
2791
2792	columns := []string{"SingerId", "FirstName", "LastName"}
2793
2794	// Populate the Singers table.
2795	var muts []*Mutation
2796	for _, row := range [][]interface{}{
2797		{1, "Umm", "Kulthum"},
2798		{2, "Eduard", "Khil"},
2799		{3, "Audra", "McDonald"},
2800	} {
2801		muts = append(muts, Insert("Singers", columns, row))
2802	}
2803	if _, err := client.Apply(ctx, muts); err != nil {
2804		t.Fatal(err)
2805	}
2806
2807	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
2808		counts, err := tx.BatchUpdate(ctx, []Statement{
2809			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`},
2810			{SQL: `some illegal statement`},
2811			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`},
2812		})
2813		if err == nil {
2814			t.Fatal("expected err, got nil")
2815		}
2816		// The statement may also have been aborted by Spanner, and in that
2817		// case we should just let the client library retry the transaction.
2818		if status.Code(err) == codes.Aborted {
2819			return err
2820		}
2821		if want := []int64{1}; !testEqual(counts, want) {
2822			t.Fatalf("got %d, want %d", counts, want)
2823		}
2824
2825		got, err := readAll(tx.Read(ctx, "Singers", AllKeys(), columns))
2826		// Aborted error is ok, just retry the transaction.
2827		if status.Code(err) == codes.Aborted {
2828			return err
2829		}
2830		if err != nil {
2831			t.Fatal(err)
2832		}
2833		want := [][]interface{}{
2834			{int64(1), "changed 1", "Kulthum"},
2835			{int64(2), "Eduard", "Khil"},
2836			{int64(3), "Audra", "McDonald"},
2837		}
2838		if !testEqual(got, want) {
2839			t.Errorf("\ngot %v\nwant%v", got, want)
2840		}
2841
2842		return nil
2843	})
2844	if err != nil {
2845		t.Fatal(err)
2846	}
2847}
2848
2849func TestIntegration_StartBackupOperation(t *testing.T) {
2850	skipEmulatorTest(t)
2851	t.Parallel()
2852
2853	// Backups can be slow, so use a 30 minute timeout.
2854	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
2855	defer cancel()
2856	_, testDatabaseName, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, backuDBStatements)
2857	defer cleanup()
2858
2859	backupID := backupIDSpace.New()
2860	backupName := fmt.Sprintf("projects/%s/instances/%s/backups/%s", testProjectID, testInstanceID, backupID)
2861	// Minimum expiry time is 6 hours
2862	expires := time.Now().Add(time.Hour * 7)
2863	respLRO, err := databaseAdmin.StartBackupOperation(ctx, backupID, testDatabaseName, expires)
2864	if err != nil {
2865		t.Fatal(err)
2866	}
2867
2868	_, err = respLRO.Wait(ctx)
2869	if err != nil {
2870		t.Fatal(err)
2871	}
2872	respMetadata, err := respLRO.Metadata()
2873	if err != nil {
2874		t.Fatalf("backup response metadata, got error %v, want nil", err)
2875	}
2876	if respMetadata.Database != testDatabaseName {
2877		t.Fatalf("backup database name, got %s, want %s", respMetadata.Database, testDatabaseName)
2878	}
2879	if respMetadata.Progress.ProgressPercent != 100 {
2880		t.Fatalf("backup progress percent, got %d, want 100", respMetadata.Progress.ProgressPercent)
2881	}
2882	respCheck, err := databaseAdmin.GetBackup(ctx, &adminpb.GetBackupRequest{Name: backupName})
2883	if err != nil {
2884		t.Fatalf("backup metadata, got error %v, want nil", err)
2885	}
2886	if respCheck.CreateTime == nil {
2887		t.Fatal("backup create time, got nil, want non-nil")
2888	}
2889	if respCheck.State != adminpb.Backup_READY {
2890		t.Fatalf("backup state, got %v, want %v", respCheck.State, adminpb.Backup_READY)
2891	}
2892	if respCheck.SizeBytes == 0 {
2893		t.Fatalf("backup size, got %d, want non-zero", respCheck.SizeBytes)
2894	}
2895}
2896
2897// Prepare initializes Cloud Spanner testing DB and clients.
2898func prepareIntegrationTest(ctx context.Context, t *testing.T, spc SessionPoolConfig, statements []string) (*Client, string, func()) {
2899	if databaseAdmin == nil {
2900		t.Skip("Integration tests skipped")
2901	}
2902	// Construct a unique test DB name.
2903	dbName := dbNameSpace.New()
2904
2905	dbPath := fmt.Sprintf("projects/%v/instances/%v/databases/%v", testProjectID, testInstanceID, dbName)
2906	// Create database and tables.
2907	op, err := databaseAdmin.CreateDatabaseWithRetry(ctx, &adminpb.CreateDatabaseRequest{
2908		Parent:          fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID),
2909		CreateStatement: "CREATE DATABASE " + dbName,
2910		ExtraStatements: statements,
2911	})
2912	if err != nil {
2913		t.Fatalf("cannot create testing DB %v: %v", dbPath, err)
2914	}
2915	if _, err := op.Wait(ctx); err != nil {
2916		t.Fatalf("cannot create testing DB %v: %v", dbPath, err)
2917	}
2918	client, err := createClient(ctx, dbPath, spc)
2919	if err != nil {
2920		t.Fatalf("cannot create data client on DB %v: %v", dbPath, err)
2921	}
2922	return client, dbPath, func() {
2923		client.Close()
2924	}
2925}
2926
2927func cleanupInstances() {
2928	if instanceAdmin == nil {
2929		// Integration tests skipped.
2930		return
2931	}
2932
2933	ctx := context.Background()
2934	parent := fmt.Sprintf("projects/%v", testProjectID)
2935	iter := instanceAdmin.ListInstances(ctx, &instancepb.ListInstancesRequest{
2936		Parent: parent,
2937		Filter: "name:gotest-",
2938	})
2939	expireAge := 24 * time.Hour
2940
2941	for {
2942		inst, err := iter.Next()
2943		if err == iterator.Done {
2944			break
2945		}
2946		if err != nil {
2947			panic(err)
2948		}
2949
2950		_, instID, err := parseInstanceName(inst.Name)
2951		if err != nil {
2952			log.Printf("Error: %v\n", err)
2953			continue
2954		}
2955		if instanceNameSpace.Older(instID, expireAge) {
2956			log.Printf("Deleting instance %s", inst.Name)
2957			deleteInstanceAndBackups(ctx, inst.Name)
2958		}
2959	}
2960}
2961
2962func deleteInstanceAndBackups(ctx context.Context, instanceName string) {
2963	// First delete any lingering backups that might have been left on
2964	// the instance.
2965	backups := databaseAdmin.ListBackups(ctx, &adminpb.ListBackupsRequest{Parent: instanceName})
2966	for {
2967		backup, err := backups.Next()
2968		if err == iterator.Done {
2969			break
2970		}
2971		if err != nil {
2972			log.Printf("failed to retrieve backups from instance %s because of error %v", instanceName, err)
2973			break
2974		}
2975		if err := databaseAdmin.DeleteBackup(ctx, &adminpb.DeleteBackupRequest{Name: backup.Name}); err != nil {
2976			log.Printf("failed to delete backup %s (error %v)", backup.Name, err)
2977		}
2978	}
2979
2980	if err := instanceAdmin.DeleteInstance(ctx, &instancepb.DeleteInstanceRequest{Name: instanceName}); err != nil {
2981		log.Printf("failed to delete instance %s (error %v), might need a manual removal",
2982			instanceName, err)
2983	}
2984}
2985
2986func rangeReads(ctx context.Context, t *testing.T, client *Client) {
2987	checkRange := func(ks KeySet, wantNums ...int) {
2988		if msg, ok := compareRows(client.Single().Read(ctx, testTable, ks, testTableColumns), wantNums); !ok {
2989			t.Errorf("key set %+v: %s", ks, msg)
2990		}
2991	}
2992
2993	checkRange(Key{"k1"}, 1)
2994	checkRange(KeyRange{Key{"k3"}, Key{"k5"}, ClosedOpen}, 3, 4)
2995	checkRange(KeyRange{Key{"k3"}, Key{"k5"}, ClosedClosed}, 3, 4, 5)
2996	checkRange(KeyRange{Key{"k3"}, Key{"k5"}, OpenClosed}, 4, 5)
2997	checkRange(KeyRange{Key{"k3"}, Key{"k5"}, OpenOpen}, 4)
2998
2999	// Partial key specification.
3000	checkRange(KeyRange{Key{"k7"}, Key{}, ClosedClosed}, 7, 8, 9)
3001	checkRange(KeyRange{Key{"k7"}, Key{}, OpenClosed}, 8, 9)
3002	checkRange(KeyRange{Key{}, Key{"k11"}, ClosedOpen}, 0, 1, 10)
3003	checkRange(KeyRange{Key{}, Key{"k11"}, ClosedClosed}, 0, 1, 10, 11)
3004
3005	// The following produce empty ranges.
3006	// TODO(jba): Consider a multi-part key to illustrate partial key behavior.
3007	// checkRange(KeyRange{Key{"k7"}, Key{}, ClosedOpen})
3008	// checkRange(KeyRange{Key{"k7"}, Key{}, OpenOpen})
3009	// checkRange(KeyRange{Key{}, Key{"k11"}, OpenOpen})
3010	// checkRange(KeyRange{Key{}, Key{"k11"}, OpenClosed})
3011
3012	// Prefix is component-wise, not string prefix.
3013	checkRange(Key{"k1"}.AsPrefix(), 1)
3014	checkRange(KeyRange{Key{"k1"}, Key{"k2"}, ClosedOpen}, 1, 10, 11, 12, 13, 14)
3015
3016	checkRange(AllKeys(), 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14)
3017}
3018
3019func indexRangeReads(ctx context.Context, t *testing.T, client *Client) {
3020	checkRange := func(ks KeySet, wantNums ...int) {
3021		if msg, ok := compareRows(client.Single().ReadUsingIndex(ctx, testTable, testTableIndex, ks, testTableColumns),
3022			wantNums); !ok {
3023			t.Errorf("key set %+v: %s", ks, msg)
3024		}
3025	}
3026
3027	checkRange(Key{"v1"}, 1)
3028	checkRange(KeyRange{Key{"v3"}, Key{"v5"}, ClosedOpen}, 3, 4)
3029	checkRange(KeyRange{Key{"v3"}, Key{"v5"}, ClosedClosed}, 3, 4, 5)
3030	checkRange(KeyRange{Key{"v3"}, Key{"v5"}, OpenClosed}, 4, 5)
3031	checkRange(KeyRange{Key{"v3"}, Key{"v5"}, OpenOpen}, 4)
3032
3033	// // Partial key specification.
3034	checkRange(KeyRange{Key{"v7"}, Key{}, ClosedClosed}, 7, 8, 9)
3035	checkRange(KeyRange{Key{"v7"}, Key{}, OpenClosed}, 8, 9)
3036	checkRange(KeyRange{Key{}, Key{"v11"}, ClosedOpen}, 0, 1, 10)
3037	checkRange(KeyRange{Key{}, Key{"v11"}, ClosedClosed}, 0, 1, 10, 11)
3038
3039	// // The following produce empty ranges.
3040	// checkRange(KeyRange{Key{"v7"}, Key{}, ClosedOpen})
3041	// checkRange(KeyRange{Key{"v7"}, Key{}, OpenOpen})
3042	// checkRange(KeyRange{Key{}, Key{"v11"}, OpenOpen})
3043	// checkRange(KeyRange{Key{}, Key{"v11"}, OpenClosed})
3044
3045	// // Prefix is component-wise, not string prefix.
3046	checkRange(Key{"v1"}.AsPrefix(), 1)
3047	checkRange(KeyRange{Key{"v1"}, Key{"v2"}, ClosedOpen}, 1, 10, 11, 12, 13, 14)
3048	checkRange(AllKeys(), 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14)
3049
3050	// Read from an index with DESC ordering.
3051	wantNums := []int{14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0}
3052	if msg, ok := compareRows(client.Single().ReadUsingIndex(ctx, testTable, "TestTableByValueDesc", AllKeys(), testTableColumns),
3053		wantNums); !ok {
3054		t.Errorf("desc: %s", msg)
3055	}
3056}
3057
3058type testTableRow struct{ Key, StringValue string }
3059
3060func compareRows(iter *RowIterator, wantNums []int) (string, bool) {
3061	rows, err := readAllTestTable(iter)
3062	if err != nil {
3063		return err.Error(), false
3064	}
3065	want := map[string]string{}
3066	for _, n := range wantNums {
3067		want[fmt.Sprintf("k%d", n)] = fmt.Sprintf("v%d", n)
3068	}
3069	got := map[string]string{}
3070	for _, r := range rows {
3071		got[r.Key] = r.StringValue
3072	}
3073	if !testEqual(got, want) {
3074		return fmt.Sprintf("got %v, want %v", got, want), false
3075	}
3076	return "", true
3077}
3078
3079func isNaN(x interface{}) bool {
3080	f, ok := x.(float64)
3081	if !ok {
3082		return false
3083	}
3084	return math.IsNaN(f)
3085}
3086
3087// createClient creates Cloud Spanner data client.
3088func createClient(ctx context.Context, dbPath string, spc SessionPoolConfig) (client *Client, err error) {
3089	client, err = NewClientWithConfig(ctx, dbPath, ClientConfig{SessionPoolConfig: spc})
3090	if err != nil {
3091		return nil, fmt.Errorf("cannot create data client on DB %v: %v", dbPath, err)
3092	}
3093	return client, nil
3094}
3095
3096// populate prepares the database with some data.
3097func populate(ctx context.Context, client *Client) error {
3098	// Populate data
3099	var err error
3100	m := InsertMap("test", map[string]interface{}{
3101		"a": str1,
3102		"b": str2,
3103	})
3104	_, err = client.Apply(ctx, []*Mutation{m})
3105	return err
3106}
3107
3108func matchError(got error, wantCode codes.Code, wantMsgPart string) (string, bool) {
3109	if ErrCode(got) != wantCode || !strings.Contains(strings.ToLower(ErrDesc(got)), strings.ToLower(wantMsgPart)) {
3110		return fmt.Sprintf("got error <%v>\n"+`want <code = %q, "...%s...">`, got, wantCode, wantMsgPart), false
3111	}
3112	return "", true
3113}
3114
3115func rowToValues(r *Row) ([]interface{}, error) {
3116	var x int64
3117	var y, z string
3118	if err := r.Column(0, &x); err != nil {
3119		return nil, err
3120	}
3121	if err := r.Column(1, &y); err != nil {
3122		return nil, err
3123	}
3124	if err := r.Column(2, &z); err != nil {
3125		return nil, err
3126	}
3127	return []interface{}{x, y, z}, nil
3128}
3129
3130func readAll(iter *RowIterator) ([][]interface{}, error) {
3131	defer iter.Stop()
3132	var vals [][]interface{}
3133	for {
3134		row, err := iter.Next()
3135		if err == iterator.Done {
3136			return vals, nil
3137		}
3138		if err != nil {
3139			return nil, err
3140		}
3141		v, err := rowToValues(row)
3142		if err != nil {
3143			return nil, err
3144		}
3145		vals = append(vals, v)
3146	}
3147}
3148
3149func readAllTestTable(iter *RowIterator) ([]testTableRow, error) {
3150	defer iter.Stop()
3151	var vals []testTableRow
3152	for {
3153		row, err := iter.Next()
3154		if err == iterator.Done {
3155			return vals, nil
3156		}
3157		if err != nil {
3158			return nil, err
3159		}
3160		var ttr testTableRow
3161		if err := row.ToStruct(&ttr); err != nil {
3162			return nil, err
3163		}
3164		vals = append(vals, ttr)
3165	}
3166}
3167
3168func readAllAccountsTable(iter *RowIterator) ([][]interface{}, error) {
3169	defer iter.Stop()
3170	var vals [][]interface{}
3171	for {
3172		row, err := iter.Next()
3173		if err == iterator.Done {
3174			return vals, nil
3175		}
3176		if err != nil {
3177			return nil, err
3178		}
3179		var id, balance int64
3180		var nickname string
3181		err = row.Columns(&id, &nickname, &balance)
3182		if err != nil {
3183			return nil, err
3184		}
3185		vals = append(vals, []interface{}{id, nickname, balance})
3186	}
3187}
3188
3189func maxDuration(a, b time.Duration) time.Duration {
3190	if a > b {
3191		return a
3192	}
3193	return b
3194}
3195
3196func isEmulatorEnvSet() bool {
3197	return os.Getenv("SPANNER_EMULATOR_HOST") != ""
3198}
3199
3200func skipEmulatorTest(t *testing.T) {
3201	if isEmulatorEnvSet() {
3202		t.Skip("Skipping testing against the emulator.")
3203	}
3204}
3205