1/*
2Copyright 2017 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package spanner
18
19import (
20	"context"
21	"errors"
22	"flag"
23	"fmt"
24	"log"
25	"math"
26	"os"
27	"reflect"
28	"strings"
29	"sync"
30	"testing"
31	"time"
32
33	"cloud.google.com/go/civil"
34	"cloud.google.com/go/internal/testutil"
35	"cloud.google.com/go/internal/uid"
36	database "cloud.google.com/go/spanner/admin/database/apiv1"
37	"google.golang.org/api/iterator"
38	"google.golang.org/api/option"
39	adminpb "google.golang.org/genproto/googleapis/spanner/admin/database/v1"
40	"google.golang.org/grpc/codes"
41	"google.golang.org/grpc/status"
42)
43
// Package-level fixtures shared by the integration tests in this file.
var (
	// testProjectID specifies the project used for testing. It can be changed
	// by setting environment variable GCLOUD_TESTS_GOLANG_PROJECT_ID.
	testProjectID = testutil.ProjID()

	// dbNameSpace is a uid.Space configured with the "gotest" prefix and '_'
	// as separator. Presumably it generates the names of the test databases;
	// its use is outside this part of the file.
	dbNameSpace = uid.NewSpace("gotest", &uid.Options{Sep: '_', Short: true})

	// TODO(deklerk): When we can programmatically create instances, we should use
	// uid.New as the test instance name.
	// testInstanceID specifies the Cloud Spanner instance used for testing.
	testInstanceID = "go-integration-test"

	// testTable, testTableIndex and testTableColumns name the table, the
	// ascending secondary index and the columns created by readDBStatements.
	testTable        = "TestTable"
	testTableIndex   = "TestTableByValue"
	testTableColumns = []string{"Key", "StringValue"}

	// admin is a spanner.DatabaseAdminClient.
	admin *database.DatabaseAdminClient

	// singerDBStatements is the DDL for the Singers, Accounts and Types
	// tables together with the SingerByName and AccountByNickname indexes.
	singerDBStatements = []string{
		`CREATE TABLE Singers (
				SingerId	INT64 NOT NULL,
				FirstName	STRING(1024),
				LastName	STRING(1024),
				SingerInfo	BYTES(MAX)
			) PRIMARY KEY (SingerId)`,
		`CREATE INDEX SingerByName ON Singers(FirstName, LastName)`,
		`CREATE TABLE Accounts (
				AccountId	INT64 NOT NULL,
				Nickname	STRING(100),
				Balance		INT64 NOT NULL,
			) PRIMARY KEY (AccountId)`,
		`CREATE INDEX AccountByNickname ON Accounts(Nickname) STORING (Balance)`,
		`CREATE TABLE Types (
				RowID		INT64 NOT NULL,
				String		STRING(MAX),
				StringArray	ARRAY<STRING(MAX)>,
				Bytes		BYTES(MAX),
				BytesArray	ARRAY<BYTES(MAX)>,
				Int64a		INT64,
				Int64Array	ARRAY<INT64>,
				Bool		BOOL,
				BoolArray	ARRAY<BOOL>,
				Float64		FLOAT64,
				Float64Array	ARRAY<FLOAT64>,
				Date		DATE,
				DateArray	ARRAY<DATE>,
				Timestamp	TIMESTAMP,
				TimestampArray	ARRAY<TIMESTAMP>,
			) PRIMARY KEY (RowID)`,
	}

	// readDBStatements is the DDL for TestTable plus an ascending and a
	// descending index on its StringValue column.
	readDBStatements = []string{
		`CREATE TABLE TestTable (
                    Key          STRING(MAX) NOT NULL,
                    StringValue  STRING(MAX)
            ) PRIMARY KEY (Key)`,
		`CREATE INDEX TestTableByValue ON TestTable(StringValue)`,
		`CREATE INDEX TestTableByValueDesc ON TestTable(StringValue DESC)`,
	}

	// simpleDBStatements is the DDL for a minimal two-column table named
	// "test"; simpleDBTableColumns lists its columns.
	simpleDBStatements = []string{
		`CREATE TABLE test (
				a	STRING(1024),
				b	STRING(1024),
			) PRIMARY KEY (a)`,
	}
	simpleDBTableColumns = []string{"a", "b"}

	// ctsDBStatements is the DDL for a TestTable variant whose Ts column has
	// the allow_commit_timestamp option enabled.
	ctsDBStatements = []string{
		`CREATE TABLE TestTable (
		    Key  STRING(MAX) NOT NULL,
		    Ts   TIMESTAMP OPTIONS (allow_commit_timestamp = true),
	    ) PRIMARY KEY (Key)`,
	}
)
120
// str1 and str2 are fixed sample string values shared by tests in this
// package (their uses are outside this part of the file).
const str1 = "alice"
const str2 = "a@example.com"
125
126func TestMain(m *testing.M) {
127	cleanup := initIntegrationTests()
128	res := m.Run()
129	cleanup()
130	os.Exit(res)
131}
132
133func initIntegrationTests() func() {
134	ctx := context.Background()
135	flag.Parse() // Needed for testing.Short().
136	noop := func() {}
137
138	if testing.Short() {
139		log.Println("Integration tests skipped in -short mode.")
140		return noop
141	}
142
143	if testProjectID == "" {
144		log.Println("Integration tests skipped: GCLOUD_TESTS_GOLANG_PROJECT_ID is missing")
145		return noop
146	}
147
148	ts := testutil.TokenSource(ctx, AdminScope, Scope)
149	if ts == nil {
150		log.Printf("Integration test skipped: cannot get service account credential from environment variable %v", "GCLOUD_TESTS_GOLANG_KEY")
151		return noop
152	}
153	var err error
154
155	// Create Admin client and Data client.
156	admin, err = database.NewDatabaseAdminClient(ctx, option.WithTokenSource(ts), option.WithEndpoint(endpoint))
157	if err != nil {
158		log.Fatalf("cannot create admin client: %v", err)
159	}
160
161	return func() {
162		cleanupDatabases()
163		admin.Close()
164	}
165}
166
167// Test SingleUse transaction.
168func TestIntegration_SingleUse(t *testing.T) {
169	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
170	defer cancel()
171	// Set up testing environment.
172	client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements)
173	defer cleanup()
174
175	writes := []struct {
176		row []interface{}
177		ts  time.Time
178	}{
179		{row: []interface{}{1, "Marc", "Foo"}},
180		{row: []interface{}{2, "Tars", "Bar"}},
181		{row: []interface{}{3, "Alpha", "Beta"}},
182		{row: []interface{}{4, "Last", "End"}},
183	}
184	// Try to write four rows through the Apply API.
185	for i, w := range writes {
186		var err error
187		m := InsertOrUpdate("Singers",
188			[]string{"SingerId", "FirstName", "LastName"},
189			w.row)
190		if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
191			t.Fatal(err)
192		}
193	}
194
195	// Test reading rows with different timestamp bounds.
196	for i, test := range []struct {
197		name    string
198		want    [][]interface{}
199		tb      TimestampBound
200		checkTs func(time.Time) error
201	}{
202		{
203			name: "strong",
204			want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}},
205			tb:   StrongRead(),
206			checkTs: func(ts time.Time) error {
207				// writes[3] is the last write, all subsequent strong read
208				// should have a timestamp larger than that.
209				if ts.Before(writes[3].ts) {
210					return fmt.Errorf("read got timestamp %v, want it to be no later than %v", ts, writes[3].ts)
211				}
212				return nil
213			},
214		},
215		{
216			name: "min_read_timestamp",
217			want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}},
218			tb:   MinReadTimestamp(writes[3].ts),
219			checkTs: func(ts time.Time) error {
220				if ts.Before(writes[3].ts) {
221					return fmt.Errorf("read got timestamp %v, want it to be no later than %v", ts, writes[3].ts)
222				}
223				return nil
224			},
225		},
226		{
227			name: "max_staleness",
228			want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}},
229			tb:   MaxStaleness(time.Second),
230			checkTs: func(ts time.Time) error {
231				if ts.Before(writes[3].ts) {
232					return fmt.Errorf("read got timestamp %v, want it to be no later than %v", ts, writes[3].ts)
233				}
234				return nil
235			},
236		},
237		{
238			name: "read_timestamp",
239			want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}},
240			tb:   ReadTimestamp(writes[2].ts),
241			checkTs: func(ts time.Time) error {
242				if ts != writes[2].ts {
243					return fmt.Errorf("read got timestamp %v, want %v", ts, writes[2].ts)
244				}
245				return nil
246			},
247		},
248		{
249			name: "exact_staleness",
250			want: nil,
251			// Specify a staleness which should be already before this test because
252			// context timeout is set to be 10s.
253			tb: ExactStaleness(11 * time.Second),
254			checkTs: func(ts time.Time) error {
255				if ts.After(writes[0].ts) {
256					return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[0].ts)
257				}
258				return nil
259			},
260		},
261	} {
262		t.Run(test.name, func(t *testing.T) {
263			// SingleUse.Query
264			su := client.Single().WithTimestampBound(test.tb)
265			got, err := readAll(su.Query(
266				ctx,
267				Statement{
268					"SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId IN (@id1, @id3, @id4)",
269					map[string]interface{}{"id1": int64(1), "id3": int64(3), "id4": int64(4)},
270				}))
271			if err != nil {
272				t.Fatalf("%d: SingleUse.Query returns error %v, want nil", i, err)
273			}
274			if !testEqual(got, test.want) {
275				t.Fatalf("%d: got unexpected result from SingleUse.Query: %v, want %v", i, got, test.want)
276			}
277			rts, err := su.Timestamp()
278			if err != nil {
279				t.Fatalf("%d: SingleUse.Query doesn't return a timestamp, error: %v", i, err)
280			}
281			if err := test.checkTs(rts); err != nil {
282				t.Fatalf("%d: SingleUse.Query doesn't return expected timestamp: %v", i, err)
283			}
284			// SingleUse.Read
285			su = client.Single().WithTimestampBound(test.tb)
286			got, err = readAll(su.Read(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}), []string{"SingerId", "FirstName", "LastName"}))
287			if err != nil {
288				t.Fatalf("%d: SingleUse.Read returns error %v, want nil", i, err)
289			}
290			if !testEqual(got, test.want) {
291				t.Fatalf("%d: got unexpected result from SingleUse.Read: %v, want %v", i, got, test.want)
292			}
293			rts, err = su.Timestamp()
294			if err != nil {
295				t.Fatalf("%d: SingleUse.Read doesn't return a timestamp, error: %v", i, err)
296			}
297			if err := test.checkTs(rts); err != nil {
298				t.Fatalf("%d: SingleUse.Read doesn't return expected timestamp: %v", i, err)
299			}
300			// SingleUse.ReadRow
301			got = nil
302			for _, k := range []Key{{1}, {3}, {4}} {
303				su = client.Single().WithTimestampBound(test.tb)
304				r, err := su.ReadRow(ctx, "Singers", k, []string{"SingerId", "FirstName", "LastName"})
305				if err != nil {
306					continue
307				}
308				v, err := rowToValues(r)
309				if err != nil {
310					continue
311				}
312				got = append(got, v)
313				rts, err = su.Timestamp()
314				if err != nil {
315					t.Fatalf("%d: SingleUse.ReadRow(%v) doesn't return a timestamp, error: %v", i, k, err)
316				}
317				if err := test.checkTs(rts); err != nil {
318					t.Fatalf("%d: SingleUse.ReadRow(%v) doesn't return expected timestamp: %v", i, k, err)
319				}
320			}
321			if !testEqual(got, test.want) {
322				t.Fatalf("%d: got unexpected results from SingleUse.ReadRow: %v, want %v", i, got, test.want)
323			}
324			// SingleUse.ReadUsingIndex
325			su = client.Single().WithTimestampBound(test.tb)
326			got, err = readAll(su.ReadUsingIndex(ctx, "Singers", "SingerByName", KeySets(Key{"Marc", "Foo"}, Key{"Alpha", "Beta"}, Key{"Last", "End"}), []string{"SingerId", "FirstName", "LastName"}))
327			if err != nil {
328				t.Fatalf("%d: SingleUse.ReadUsingIndex returns error %v, want nil", i, err)
329			}
330			// The results from ReadUsingIndex is sorted by the index rather than primary key.
331			if len(got) != len(test.want) {
332				t.Fatalf("%d: got unexpected result from SingleUse.ReadUsingIndex: %v, want %v", i, got, test.want)
333			}
334			for j, g := range got {
335				if j > 0 {
336					prev := got[j-1][1].(string) + got[j-1][2].(string)
337					curr := got[j][1].(string) + got[j][2].(string)
338					if strings.Compare(prev, curr) > 0 {
339						t.Fatalf("%d: SingleUse.ReadUsingIndex fails to order rows by index keys, %v should be after %v", i, got[j-1], got[j])
340					}
341				}
342				found := false
343				for _, w := range test.want {
344					if testEqual(g, w) {
345						found = true
346					}
347				}
348				if !found {
349					t.Fatalf("%d: got unexpected result from SingleUse.ReadUsingIndex: %v, want %v", i, got, test.want)
350					break
351				}
352			}
353			rts, err = su.Timestamp()
354			if err != nil {
355				t.Fatalf("%d: SingleUse.ReadUsingIndex doesn't return a timestamp, error: %v", i, err)
356			}
357			if err := test.checkTs(rts); err != nil {
358				t.Fatalf("%d: SingleUse.ReadUsingIndex doesn't return expected timestamp: %v", i, err)
359			}
360		})
361	}
362}
363
364func TestIntegration_SingleUse_ReadingWithLimit(t *testing.T) {
365	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
366	defer cancel()
367	// Set up testing environment.
368	client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements)
369	defer cleanup()
370
371	writes := []struct {
372		row []interface{}
373		ts  time.Time
374	}{
375		{row: []interface{}{1, "Marc", "Foo"}},
376		{row: []interface{}{2, "Tars", "Bar"}},
377		{row: []interface{}{3, "Alpha", "Beta"}},
378		{row: []interface{}{4, "Last", "End"}},
379	}
380	// Try to write four rows through the Apply API.
381	for i, w := range writes {
382		var err error
383		m := InsertOrUpdate("Singers",
384			[]string{"SingerId", "FirstName", "LastName"},
385			w.row)
386		if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
387			t.Fatal(err)
388		}
389	}
390
391	su := client.Single()
392	const limit = 1
393	gotRows, err := readAll(su.ReadWithOptions(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}),
394		[]string{"SingerId", "FirstName", "LastName"}, &ReadOptions{Limit: limit}))
395	if err != nil {
396		t.Errorf("SingleUse.ReadWithOptions returns error %v, want nil", err)
397	}
398	if got, want := len(gotRows), limit; got != want {
399		t.Errorf("got %d, want %d", got, want)
400	}
401}
402
403// Test ReadOnlyTransaction. The testsuite is mostly like SingleUse, except it
404// also tests for a single timestamp across multiple reads.
405func TestIntegration_ReadOnlyTransaction(t *testing.T) {
406	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
407	defer cancel()
408	// Set up testing environment.
409	client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements)
410	defer cleanup()
411
412	writes := []struct {
413		row []interface{}
414		ts  time.Time
415	}{
416		{row: []interface{}{1, "Marc", "Foo"}},
417		{row: []interface{}{2, "Tars", "Bar"}},
418		{row: []interface{}{3, "Alpha", "Beta"}},
419		{row: []interface{}{4, "Last", "End"}},
420	}
421	// Try to write four rows through the Apply API.
422	for i, w := range writes {
423		var err error
424		m := InsertOrUpdate("Singers",
425			[]string{"SingerId", "FirstName", "LastName"},
426			w.row)
427		if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
428			t.Fatal(err)
429		}
430	}
431
432	// For testing timestamp bound staleness.
433	<-time.After(time.Second)
434
435	// Test reading rows with different timestamp bounds.
436	for i, test := range []struct {
437		want    [][]interface{}
438		tb      TimestampBound
439		checkTs func(time.Time) error
440	}{
441		// Note: min_read_timestamp and max_staleness are not supported by
442		// ReadOnlyTransaction. See API document for more details.
443		{
444			// strong
445			[][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}},
446			StrongRead(),
447			func(ts time.Time) error {
448				if ts.Before(writes[3].ts) {
449					return fmt.Errorf("read got timestamp %v, want it to be no later than %v", ts, writes[3].ts)
450				}
451				return nil
452			},
453		},
454		{
455			// read_timestamp
456			[][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}},
457			ReadTimestamp(writes[2].ts),
458			func(ts time.Time) error {
459				if ts != writes[2].ts {
460					return fmt.Errorf("read got timestamp %v, expect %v", ts, writes[2].ts)
461				}
462				return nil
463			},
464		},
465		{
466			// exact_staleness
467			nil,
468			// Specify a staleness which should be already before this test
469			// because context timeout is set to be 10s.
470			ExactStaleness(11 * time.Second),
471			func(ts time.Time) error {
472				if ts.After(writes[0].ts) {
473					return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[0].ts)
474				}
475				return nil
476			},
477		},
478	} {
479		// ReadOnlyTransaction.Query
480		ro := client.ReadOnlyTransaction().WithTimestampBound(test.tb)
481		got, err := readAll(ro.Query(
482			ctx,
483			Statement{
484				"SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId IN (@id1, @id3, @id4)",
485				map[string]interface{}{"id1": int64(1), "id3": int64(3), "id4": int64(4)},
486			}))
487		if err != nil {
488			t.Errorf("%d: ReadOnlyTransaction.Query returns error %v, want nil", i, err)
489		}
490		if !testEqual(got, test.want) {
491			t.Errorf("%d: got unexpected result from ReadOnlyTransaction.Query: %v, want %v", i, got, test.want)
492		}
493		rts, err := ro.Timestamp()
494		if err != nil {
495			t.Errorf("%d: ReadOnlyTransaction.Query doesn't return a timestamp, error: %v", i, err)
496		}
497		if err := test.checkTs(rts); err != nil {
498			t.Errorf("%d: ReadOnlyTransaction.Query doesn't return expected timestamp: %v", i, err)
499		}
500		roTs := rts
501		// ReadOnlyTransaction.Read
502		got, err = readAll(ro.Read(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}), []string{"SingerId", "FirstName", "LastName"}))
503		if err != nil {
504			t.Errorf("%d: ReadOnlyTransaction.Read returns error %v, want nil", i, err)
505		}
506		if !testEqual(got, test.want) {
507			t.Errorf("%d: got unexpected result from ReadOnlyTransaction.Read: %v, want %v", i, got, test.want)
508		}
509		rts, err = ro.Timestamp()
510		if err != nil {
511			t.Errorf("%d: ReadOnlyTransaction.Read doesn't return a timestamp, error: %v", i, err)
512		}
513		if err := test.checkTs(rts); err != nil {
514			t.Errorf("%d: ReadOnlyTransaction.Read doesn't return expected timestamp: %v", i, err)
515		}
516		if roTs != rts {
517			t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts)
518		}
519		// ReadOnlyTransaction.ReadRow
520		got = nil
521		for _, k := range []Key{{1}, {3}, {4}} {
522			r, err := ro.ReadRow(ctx, "Singers", k, []string{"SingerId", "FirstName", "LastName"})
523			if err != nil {
524				continue
525			}
526			v, err := rowToValues(r)
527			if err != nil {
528				continue
529			}
530			got = append(got, v)
531			rts, err = ro.Timestamp()
532			if err != nil {
533				t.Errorf("%d: ReadOnlyTransaction.ReadRow(%v) doesn't return a timestamp, error: %v", i, k, err)
534			}
535			if err := test.checkTs(rts); err != nil {
536				t.Errorf("%d: ReadOnlyTransaction.ReadRow(%v) doesn't return expected timestamp: %v", i, k, err)
537			}
538			if roTs != rts {
539				t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts)
540			}
541		}
542		if !testEqual(got, test.want) {
543			t.Errorf("%d: got unexpected results from ReadOnlyTransaction.ReadRow: %v, want %v", i, got, test.want)
544		}
545		// SingleUse.ReadUsingIndex
546		got, err = readAll(ro.ReadUsingIndex(ctx, "Singers", "SingerByName", KeySets(Key{"Marc", "Foo"}, Key{"Alpha", "Beta"}, Key{"Last", "End"}), []string{"SingerId", "FirstName", "LastName"}))
547		if err != nil {
548			t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex returns error %v, want nil", i, err)
549		}
550		// The results from ReadUsingIndex is sorted by the index rather than
551		// primary key.
552		if len(got) != len(test.want) {
553			t.Errorf("%d: got unexpected result from ReadOnlyTransaction.ReadUsingIndex: %v, want %v", i, got, test.want)
554		}
555		for j, g := range got {
556			if j > 0 {
557				prev := got[j-1][1].(string) + got[j-1][2].(string)
558				curr := got[j][1].(string) + got[j][2].(string)
559				if strings.Compare(prev, curr) > 0 {
560					t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex fails to order rows by index keys, %v should be after %v", i, got[j-1], got[j])
561				}
562			}
563			found := false
564			for _, w := range test.want {
565				if testEqual(g, w) {
566					found = true
567				}
568			}
569			if !found {
570				t.Errorf("%d: got unexpected result from ReadOnlyTransaction.ReadUsingIndex: %v, want %v", i, got, test.want)
571				break
572			}
573		}
574		rts, err = ro.Timestamp()
575		if err != nil {
576			t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex doesn't return a timestamp, error: %v", i, err)
577		}
578		if err := test.checkTs(rts); err != nil {
579			t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex doesn't return expected timestamp: %v", i, err)
580		}
581		if roTs != rts {
582			t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts)
583		}
584		ro.Close()
585	}
586}
587
588// Test ReadOnlyTransaction with different timestamp bound when there's an
589// update at the same time.
590func TestIntegration_UpdateDuringRead(t *testing.T) {
591	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
592	defer cancel()
593	client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements)
594	defer cleanup()
595
596	for i, tb := range []TimestampBound{
597		StrongRead(),
598		ReadTimestamp(time.Now().Add(-time.Minute * 30)), // version GC is 1 hour
599		ExactStaleness(time.Minute * 30),
600	} {
601		ro := client.ReadOnlyTransaction().WithTimestampBound(tb)
602		_, err := ro.ReadRow(ctx, "Singers", Key{i}, []string{"SingerId"})
603		if ErrCode(err) != codes.NotFound {
604			t.Errorf("%d: ReadOnlyTransaction.ReadRow before write returns error: %v, want NotFound", i, err)
605		}
606
607		m := InsertOrUpdate("Singers", []string{"SingerId"}, []interface{}{i})
608		if _, err := client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
609			t.Fatal(err)
610		}
611
612		_, err = ro.ReadRow(ctx, "Singers", Key{i}, []string{"SingerId"})
613		if ErrCode(err) != codes.NotFound {
614			t.Errorf("%d: ReadOnlyTransaction.ReadRow after write returns error: %v, want NotFound", i, err)
615		}
616	}
617}
618
619// Test ReadWriteTransaction.
620func TestIntegration_ReadWriteTransaction(t *testing.T) {
621	// Give a longer deadline because of transaction backoffs.
622	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
623	defer cancel()
624	client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements)
625	defer cleanup()
626
627	// Set up two accounts
628	accounts := []*Mutation{
629		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
630		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
631	}
632	if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil {
633		t.Fatal(err)
634	}
635	wg := sync.WaitGroup{}
636
637	readBalance := func(iter *RowIterator) (int64, error) {
638		defer iter.Stop()
639		var bal int64
640		for {
641			row, err := iter.Next()
642			if err == iterator.Done {
643				return bal, nil
644			}
645			if err != nil {
646				return 0, err
647			}
648			if err := row.Column(0, &bal); err != nil {
649				return 0, err
650			}
651		}
652	}
653
654	for i := 0; i < 20; i++ {
655		wg.Add(1)
656		go func(iter int) {
657			defer wg.Done()
658			_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
659				// Query Foo's balance and Bar's balance.
660				bf, e := readBalance(tx.Query(ctx,
661					Statement{"SELECT Balance FROM Accounts WHERE AccountId = @id", map[string]interface{}{"id": int64(1)}}))
662				if e != nil {
663					return e
664				}
665				bb, e := readBalance(tx.Read(ctx, "Accounts", KeySets(Key{int64(2)}), []string{"Balance"}))
666				if e != nil {
667					return e
668				}
669				if bf <= 0 {
670					return nil
671				}
672				bf--
673				bb++
674				return tx.BufferWrite([]*Mutation{
675					Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), bf}),
676					Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), bb}),
677				})
678			})
679			if err != nil {
680				t.Errorf("%d: failed to execute transaction: %v", iter, err)
681			}
682		}(i)
683	}
684	// Because of context timeout, all goroutines will eventually return.
685	wg.Wait()
686	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
687		var bf, bb int64
688		r, e := tx.ReadRow(ctx, "Accounts", Key{int64(1)}, []string{"Balance"})
689		if e != nil {
690			return e
691		}
692		if ce := r.Column(0, &bf); ce != nil {
693			return ce
694		}
695		bb, e = readBalance(tx.ReadUsingIndex(ctx, "Accounts", "AccountByNickname", KeySets(Key{"Bar"}), []string{"Balance"}))
696		if e != nil {
697			return e
698		}
699		if bf != 30 || bb != 21 {
700			t.Errorf("Foo's balance is now %v and Bar's balance is now %v, want %v and %v", bf, bb, 30, 21)
701		}
702		return nil
703	})
704	if err != nil {
705		t.Errorf("failed to check balances: %v", err)
706	}
707}
708
709func TestIntegration_Reads(t *testing.T) {
710	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
711	defer cancel()
712	// Set up testing environment.
713	client, _, cleanup := prepareIntegrationTest(ctx, t, readDBStatements)
714	defer cleanup()
715
716	// Includes k0..k14. Strings sort lexically, eg "k1" < "k10" < "k2".
717	var ms []*Mutation
718	for i := 0; i < 15; i++ {
719		ms = append(ms, InsertOrUpdate(testTable,
720			testTableColumns,
721			[]interface{}{fmt.Sprintf("k%d", i), fmt.Sprintf("v%d", i)}))
722	}
723	// Don't use ApplyAtLeastOnce, so we can test the other code path.
724	if _, err := client.Apply(ctx, ms); err != nil {
725		t.Fatal(err)
726	}
727
728	// Empty read.
729	rows, err := readAllTestTable(client.Single().Read(ctx, testTable,
730		KeyRange{Start: Key{"k99"}, End: Key{"z"}}, testTableColumns))
731	if err != nil {
732		t.Fatal(err)
733	}
734	if got, want := len(rows), 0; got != want {
735		t.Errorf("got %d, want %d", got, want)
736	}
737
738	// Index empty read.
739	rows, err = readAllTestTable(client.Single().ReadUsingIndex(ctx, testTable, testTableIndex,
740		KeyRange{Start: Key{"v99"}, End: Key{"z"}}, testTableColumns))
741	if err != nil {
742		t.Fatal(err)
743	}
744	if got, want := len(rows), 0; got != want {
745		t.Errorf("got %d, want %d", got, want)
746	}
747
748	// Point read.
749	row, err := client.Single().ReadRow(ctx, testTable, Key{"k1"}, testTableColumns)
750	if err != nil {
751		t.Fatal(err)
752	}
753	var got testTableRow
754	if err := row.ToStruct(&got); err != nil {
755		t.Fatal(err)
756	}
757	if want := (testTableRow{"k1", "v1"}); got != want {
758		t.Errorf("got %v, want %v", got, want)
759	}
760
761	// Point read not found.
762	_, err = client.Single().ReadRow(ctx, testTable, Key{"k999"}, testTableColumns)
763	if ErrCode(err) != codes.NotFound {
764		t.Fatalf("got %v, want NotFound", err)
765	}
766
767	// No index point read not found, because Go does not have ReadRowUsingIndex.
768
769	rangeReads(ctx, t, client)
770	indexRangeReads(ctx, t, client)
771}
772
773func TestIntegration_EarlyTimestamp(t *testing.T) {
774	// Test that we can get the timestamp from a read-only transaction as
775	// soon as we have read at least one row.
776	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
777	defer cancel()
778	// Set up testing environment.
779	client, _, cleanup := prepareIntegrationTest(ctx, t, readDBStatements)
780	defer cleanup()
781
782	var ms []*Mutation
783	for i := 0; i < 3; i++ {
784		ms = append(ms, InsertOrUpdate(testTable,
785			testTableColumns,
786			[]interface{}{fmt.Sprintf("k%d", i), fmt.Sprintf("v%d", i)}))
787	}
788	if _, err := client.Apply(ctx, ms, ApplyAtLeastOnce()); err != nil {
789		t.Fatal(err)
790	}
791
792	txn := client.Single()
793	iter := txn.Read(ctx, testTable, AllKeys(), testTableColumns)
794	defer iter.Stop()
795	// In single-use transaction, we should get an error before reading anything.
796	if _, err := txn.Timestamp(); err == nil {
797		t.Error("wanted error, got nil")
798	}
799	// After reading one row, the timestamp should be available.
800	_, err := iter.Next()
801	if err != nil {
802		t.Fatal(err)
803	}
804	if _, err := txn.Timestamp(); err != nil {
805		t.Errorf("got %v, want nil", err)
806	}
807
808	txn = client.ReadOnlyTransaction()
809	defer txn.Close()
810	iter = txn.Read(ctx, testTable, AllKeys(), testTableColumns)
811	defer iter.Stop()
812	// In an ordinary read-only transaction, the timestamp should be
813	// available immediately.
814	if _, err := txn.Timestamp(); err != nil {
815		t.Errorf("got %v, want nil", err)
816	}
817}
818
819func TestIntegration_NestedTransaction(t *testing.T) {
820	// You cannot use a transaction from inside a read-write transaction.
821	ctx := context.Background()
822	client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements)
823	defer cleanup()
824
825	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
826		_, err := client.ReadWriteTransaction(ctx,
827			func(context.Context, *ReadWriteTransaction) error { return nil })
828		if ErrCode(err) != codes.FailedPrecondition {
829			t.Fatalf("got %v, want FailedPrecondition", err)
830		}
831		_, err = client.Single().ReadRow(ctx, "Singers", Key{1}, []string{"SingerId"})
832		if ErrCode(err) != codes.FailedPrecondition {
833			t.Fatalf("got %v, want FailedPrecondition", err)
834		}
835		rot := client.ReadOnlyTransaction()
836		defer rot.Close()
837		_, err = rot.ReadRow(ctx, "Singers", Key{1}, []string{"SingerId"})
838		if ErrCode(err) != codes.FailedPrecondition {
839			t.Fatalf("got %v, want FailedPrecondition", err)
840		}
841		return nil
842	})
843	if err != nil {
844		t.Fatal(err)
845	}
846}
847
848// Test client recovery on database recreation.
849func TestIntegration_DbRemovalRecovery(t *testing.T) {
850	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
851	defer cancel()
852	client, dbPath, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements)
853	defer cleanup()
854
855	// Drop the testing database.
856	if err := admin.DropDatabase(ctx, &adminpb.DropDatabaseRequest{Database: dbPath}); err != nil {
857		t.Fatalf("failed to drop testing database %v: %v", dbPath, err)
858	}
859
860	// Now, send the query.
861	iter := client.Single().Query(ctx, Statement{SQL: "SELECT SingerId FROM Singers"})
862	defer iter.Stop()
863	if _, err := iter.Next(); err == nil {
864		t.Errorf("client sends query to removed database successfully, want it to fail")
865	}
866
867	// Recreate database and table.
868	dbName := dbPath[strings.LastIndex(dbPath, "/")+1:]
869	op, err := admin.CreateDatabase(ctx, &adminpb.CreateDatabaseRequest{
870		Parent:          fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID),
871		CreateStatement: "CREATE DATABASE " + dbName,
872		ExtraStatements: []string{
873			`CREATE TABLE Singers (
874				SingerId        INT64 NOT NULL,
875				FirstName       STRING(1024),
876				LastName        STRING(1024),
877				SingerInfo      BYTES(MAX)
878			) PRIMARY KEY (SingerId)`,
879		},
880	})
881	if err != nil {
882		t.Fatalf("cannot recreate testing DB %v: %v", dbPath, err)
883	}
884	if _, err := op.Wait(ctx); err != nil {
885		t.Fatalf("cannot recreate testing DB %v: %v", dbPath, err)
886	}
887
888	// Now, send the query again.
889	iter = client.Single().Query(ctx, Statement{SQL: "SELECT SingerId FROM Singers"})
890	defer iter.Stop()
891	_, err = iter.Next()
892	if err != nil && err != iterator.Done {
893		t.Errorf("failed to send query to database %v: %v", dbPath, err)
894	}
895}
896
897// Test encoding/decoding non-struct Cloud Spanner types.
898func TestIntegration_BasicTypes(t *testing.T) {
899	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
900	defer cancel()
901	client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements)
902	defer cleanup()
903
904	t1, _ := time.Parse(time.RFC3339Nano, "2016-11-15T15:04:05.999999999Z")
905	// Boundaries
906	t2, _ := time.Parse(time.RFC3339Nano, "0001-01-01T00:00:00.000000000Z")
907	t3, _ := time.Parse(time.RFC3339Nano, "9999-12-31T23:59:59.999999999Z")
908	d1, _ := civil.ParseDate("2016-11-15")
909	// Boundaries
910	d2, _ := civil.ParseDate("0001-01-01")
911	d3, _ := civil.ParseDate("9999-12-31")
912
913	tests := []struct {
914		col  string
915		val  interface{}
916		want interface{}
917	}{
918		{col: "String", val: ""},
919		{col: "String", val: "", want: NullString{"", true}},
920		{col: "String", val: "foo"},
921		{col: "String", val: "foo", want: NullString{"foo", true}},
922		{col: "String", val: NullString{"bar", true}, want: "bar"},
923		{col: "String", val: NullString{"bar", false}, want: NullString{"", false}},
924		{col: "String", val: nil, want: NullString{}},
925		{col: "StringArray", val: []string(nil), want: []NullString(nil)},
926		{col: "StringArray", val: []string{}, want: []NullString{}},
927		{col: "StringArray", val: []string{"foo", "bar"}, want: []NullString{{"foo", true}, {"bar", true}}},
928		{col: "StringArray", val: []NullString(nil)},
929		{col: "StringArray", val: []NullString{}},
930		{col: "StringArray", val: []NullString{{"foo", true}, {}}},
931		{col: "Bytes", val: []byte{}},
932		{col: "Bytes", val: []byte{1, 2, 3}},
933		{col: "Bytes", val: []byte(nil)},
934		{col: "BytesArray", val: [][]byte(nil)},
935		{col: "BytesArray", val: [][]byte{}},
936		{col: "BytesArray", val: [][]byte{{1}, {2, 3}}},
937		{col: "Int64a", val: 0, want: int64(0)},
938		{col: "Int64a", val: -1, want: int64(-1)},
939		{col: "Int64a", val: 2, want: int64(2)},
940		{col: "Int64a", val: int64(3)},
941		{col: "Int64a", val: 4, want: NullInt64{4, true}},
942		{col: "Int64a", val: NullInt64{5, true}, want: int64(5)},
943		{col: "Int64a", val: NullInt64{6, true}, want: int64(6)},
944		{col: "Int64a", val: NullInt64{7, false}, want: NullInt64{0, false}},
945		{col: "Int64a", val: nil, want: NullInt64{}},
946		{col: "Int64Array", val: []int(nil), want: []NullInt64(nil)},
947		{col: "Int64Array", val: []int{}, want: []NullInt64{}},
948		{col: "Int64Array", val: []int{1, 2}, want: []NullInt64{{1, true}, {2, true}}},
949		{col: "Int64Array", val: []int64(nil), want: []NullInt64(nil)},
950		{col: "Int64Array", val: []int64{}, want: []NullInt64{}},
951		{col: "Int64Array", val: []int64{1, 2}, want: []NullInt64{{1, true}, {2, true}}},
952		{col: "Int64Array", val: []NullInt64(nil)},
953		{col: "Int64Array", val: []NullInt64{}},
954		{col: "Int64Array", val: []NullInt64{{1, true}, {}}},
955		{col: "Bool", val: false},
956		{col: "Bool", val: true},
957		{col: "Bool", val: false, want: NullBool{false, true}},
958		{col: "Bool", val: true, want: NullBool{true, true}},
959		{col: "Bool", val: NullBool{true, true}},
960		{col: "Bool", val: NullBool{false, false}},
961		{col: "Bool", val: nil, want: NullBool{}},
962		{col: "BoolArray", val: []bool(nil), want: []NullBool(nil)},
963		{col: "BoolArray", val: []bool{}, want: []NullBool{}},
964		{col: "BoolArray", val: []bool{true, false}, want: []NullBool{{true, true}, {false, true}}},
965		{col: "BoolArray", val: []NullBool(nil)},
966		{col: "BoolArray", val: []NullBool{}},
967		{col: "BoolArray", val: []NullBool{{false, true}, {true, true}, {}}},
968		{col: "Float64", val: 0.0},
969		{col: "Float64", val: 3.14},
970		{col: "Float64", val: math.NaN()},
971		{col: "Float64", val: math.Inf(1)},
972		{col: "Float64", val: math.Inf(-1)},
973		{col: "Float64", val: 2.78, want: NullFloat64{2.78, true}},
974		{col: "Float64", val: NullFloat64{2.71, true}, want: 2.71},
975		{col: "Float64", val: NullFloat64{1.41, true}, want: NullFloat64{1.41, true}},
976		{col: "Float64", val: NullFloat64{0, false}},
977		{col: "Float64", val: nil, want: NullFloat64{}},
978		{col: "Float64Array", val: []float64(nil), want: []NullFloat64(nil)},
979		{col: "Float64Array", val: []float64{}, want: []NullFloat64{}},
980		{col: "Float64Array", val: []float64{2.72, 3.14, math.Inf(1)}, want: []NullFloat64{{2.72, true}, {3.14, true}, {math.Inf(1), true}}},
981		{col: "Float64Array", val: []NullFloat64(nil)},
982		{col: "Float64Array", val: []NullFloat64{}},
983		{col: "Float64Array", val: []NullFloat64{{2.72, true}, {math.Inf(1), true}, {}}},
984		{col: "Date", val: d1},
985		{col: "Date", val: d1, want: NullDate{d1, true}},
986		{col: "Date", val: NullDate{d1, true}},
987		{col: "Date", val: NullDate{d1, true}, want: d1},
988		{col: "Date", val: NullDate{civil.Date{}, false}},
989		{col: "DateArray", val: []civil.Date(nil), want: []NullDate(nil)},
990		{col: "DateArray", val: []civil.Date{}, want: []NullDate{}},
991		{col: "DateArray", val: []civil.Date{d1, d2, d3}, want: []NullDate{{d1, true}, {d2, true}, {d3, true}}},
992		{col: "Timestamp", val: t1},
993		{col: "Timestamp", val: t1, want: NullTime{t1, true}},
994		{col: "Timestamp", val: NullTime{t1, true}},
995		{col: "Timestamp", val: NullTime{t1, true}, want: t1},
996		{col: "Timestamp", val: NullTime{}},
997		{col: "Timestamp", val: nil, want: NullTime{}},
998		{col: "TimestampArray", val: []time.Time(nil), want: []NullTime(nil)},
999		{col: "TimestampArray", val: []time.Time{}, want: []NullTime{}},
1000		{col: "TimestampArray", val: []time.Time{t1, t2, t3}, want: []NullTime{{t1, true}, {t2, true}, {t3, true}}},
1001	}
1002
1003	// Write rows into table first.
1004	var muts []*Mutation
1005	for i, test := range tests {
1006		muts = append(muts, InsertOrUpdate("Types", []string{"RowID", test.col}, []interface{}{i, test.val}))
1007	}
1008	if _, err := client.Apply(ctx, muts, ApplyAtLeastOnce()); err != nil {
1009		t.Fatal(err)
1010	}
1011
1012	for i, test := range tests {
1013		row, err := client.Single().ReadRow(ctx, "Types", []interface{}{i}, []string{test.col})
1014		if err != nil {
1015			t.Fatalf("Unable to fetch row %v: %v", i, err)
1016		}
1017		// Create new instance of type of test.want.
1018		want := test.want
1019		if want == nil {
1020			want = test.val
1021		}
1022		gotp := reflect.New(reflect.TypeOf(want))
1023		if err := row.Column(0, gotp.Interface()); err != nil {
1024			t.Errorf("%d: col:%v val:%#v, %v", i, test.col, test.val, err)
1025			continue
1026		}
1027		got := reflect.Indirect(gotp).Interface()
1028
1029		// One of the test cases is checking NaN handling.  Given
1030		// NaN!=NaN, we can't use reflect to test for it.
1031		if isNaN(got) && isNaN(want) {
1032			continue
1033		}
1034
1035		// Check non-NaN cases.
1036		if !testEqual(got, want) {
1037			t.Errorf("%d: col:%v val:%#v, got %#v, want %#v", i, test.col, test.val, got, want)
1038			continue
1039		}
1040	}
1041}
1042
1043// Test decoding Cloud Spanner STRUCT type.
1044func TestIntegration_StructTypes(t *testing.T) {
1045	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1046	defer cancel()
1047	client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements)
1048	defer cleanup()
1049
1050	tests := []struct {
1051		q    Statement
1052		want func(r *Row) error
1053	}{
1054		{
1055			q: Statement{SQL: `SELECT ARRAY(SELECT STRUCT(1, 2))`},
1056			want: func(r *Row) error {
1057				// Test STRUCT ARRAY decoding to []NullRow.
1058				var rows []NullRow
1059				if err := r.Column(0, &rows); err != nil {
1060					return err
1061				}
1062				if len(rows) != 1 {
1063					return fmt.Errorf("len(rows) = %d; want 1", len(rows))
1064				}
1065				if !rows[0].Valid {
1066					return fmt.Errorf("rows[0] is NULL")
1067				}
1068				var i, j int64
1069				if err := rows[0].Row.Columns(&i, &j); err != nil {
1070					return err
1071				}
1072				if i != 1 || j != 2 {
1073					return fmt.Errorf("got (%d,%d), want (1,2)", i, j)
1074				}
1075				return nil
1076			},
1077		},
1078		{
1079			q: Statement{SQL: `SELECT ARRAY(SELECT STRUCT(1 as foo, 2 as bar)) as col1`},
1080			want: func(r *Row) error {
1081				// Test Row.ToStruct.
1082				s := struct {
1083					Col1 []*struct {
1084						Foo int64 `spanner:"foo"`
1085						Bar int64 `spanner:"bar"`
1086					} `spanner:"col1"`
1087				}{}
1088				if err := r.ToStruct(&s); err != nil {
1089					return err
1090				}
1091				want := struct {
1092					Col1 []*struct {
1093						Foo int64 `spanner:"foo"`
1094						Bar int64 `spanner:"bar"`
1095					} `spanner:"col1"`
1096				}{
1097					Col1: []*struct {
1098						Foo int64 `spanner:"foo"`
1099						Bar int64 `spanner:"bar"`
1100					}{
1101						{
1102							Foo: 1,
1103							Bar: 2,
1104						},
1105					},
1106				}
1107				if !testEqual(want, s) {
1108					return fmt.Errorf("unexpected decoding result: %v, want %v", s, want)
1109				}
1110				return nil
1111			},
1112		},
1113	}
1114	for i, test := range tests {
1115		iter := client.Single().Query(ctx, test.q)
1116		defer iter.Stop()
1117		row, err := iter.Next()
1118		if err != nil {
1119			t.Errorf("%d: %v", i, err)
1120			continue
1121		}
1122		if err := test.want(row); err != nil {
1123			t.Errorf("%d: %v", i, err)
1124			continue
1125		}
1126	}
1127}
1128
1129func TestIntegration_StructParametersUnsupported(t *testing.T) {
1130	ctx := context.Background()
1131	client, _, cleanup := prepareIntegrationTest(ctx, t, nil)
1132	defer cleanup()
1133
1134	for _, test := range []struct {
1135		param       interface{}
1136		wantCode    codes.Code
1137		wantMsgPart string
1138	}{
1139		{
1140			struct {
1141				Field int
1142			}{10},
1143			codes.Unimplemented,
1144			"Unsupported query shape: " +
1145				"A struct value cannot be returned as a column value. " +
1146				"Rewrite the query to flatten the struct fields in the result.",
1147		},
1148		{
1149			[]struct {
1150				Field int
1151			}{{10}, {20}},
1152			codes.Unimplemented,
1153			"Unsupported query shape: " +
1154				"This query can return a null-valued array of struct, " +
1155				"which is not supported by Spanner.",
1156		},
1157	} {
1158		iter := client.Single().Query(ctx, Statement{
1159			SQL:    "SELECT @p",
1160			Params: map[string]interface{}{"p": test.param},
1161		})
1162		_, err := iter.Next()
1163		iter.Stop()
1164		if msg, ok := matchError(err, test.wantCode, test.wantMsgPart); !ok {
1165			t.Fatal(msg)
1166		}
1167	}
1168}
1169
1170// Test queries of the form "SELECT expr".
1171func TestIntegration_QueryExpressions(t *testing.T) {
1172	ctx := context.Background()
1173	client, _, cleanup := prepareIntegrationTest(ctx, t, nil)
1174	defer cleanup()
1175
1176	newRow := func(vals []interface{}) *Row {
1177		row, err := NewRow(make([]string, len(vals)), vals)
1178		if err != nil {
1179			t.Fatal(err)
1180		}
1181		return row
1182	}
1183
1184	tests := []struct {
1185		expr string
1186		want interface{}
1187	}{
1188		{"1", int64(1)},
1189		{"[1, 2, 3]", []NullInt64{{1, true}, {2, true}, {3, true}}},
1190		{"[1, NULL, 3]", []NullInt64{{1, true}, {0, false}, {3, true}}},
1191		{"IEEE_DIVIDE(1, 0)", math.Inf(1)},
1192		{"IEEE_DIVIDE(-1, 0)", math.Inf(-1)},
1193		{"IEEE_DIVIDE(0, 0)", math.NaN()},
1194		// TODO(jba): add IEEE_DIVIDE(0, 0) to the following array when we have a better equality predicate.
1195		{"[IEEE_DIVIDE(1, 0), IEEE_DIVIDE(-1, 0)]", []NullFloat64{{math.Inf(1), true}, {math.Inf(-1), true}}},
1196		{"ARRAY(SELECT AS STRUCT * FROM (SELECT 'a', 1) WHERE 0 = 1)", []NullRow{}},
1197		{"ARRAY(SELECT STRUCT(1, 2))", []NullRow{{Row: *newRow([]interface{}{1, 2}), Valid: true}}},
1198	}
1199	for _, test := range tests {
1200		iter := client.Single().Query(ctx, Statement{SQL: "SELECT " + test.expr})
1201		defer iter.Stop()
1202		row, err := iter.Next()
1203		if err != nil {
1204			t.Errorf("%q: %v", test.expr, err)
1205			continue
1206		}
1207		// Create new instance of type of test.want.
1208		gotp := reflect.New(reflect.TypeOf(test.want))
1209		if err := row.Column(0, gotp.Interface()); err != nil {
1210			t.Errorf("%q: Column returned error %v", test.expr, err)
1211			continue
1212		}
1213		got := reflect.Indirect(gotp).Interface()
1214		// TODO(jba): remove isNaN special case when we have a better equality predicate.
1215		if isNaN(got) && isNaN(test.want) {
1216			continue
1217		}
1218		if !testEqual(got, test.want) {
1219			t.Errorf("%q\n got  %#v\nwant %#v", test.expr, got, test.want)
1220		}
1221	}
1222}
1223
1224func TestIntegration_QueryStats(t *testing.T) {
1225	ctx := context.Background()
1226	client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements)
1227	defer cleanup()
1228
1229	accounts := []*Mutation{
1230		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
1231		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
1232	}
1233	if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil {
1234		t.Fatal(err)
1235	}
1236	const sql = "SELECT Balance FROM Accounts"
1237
1238	qp, err := client.Single().AnalyzeQuery(ctx, Statement{sql, nil})
1239	if err != nil {
1240		t.Fatal(err)
1241	}
1242	if len(qp.PlanNodes) == 0 {
1243		t.Error("got zero plan nodes, expected at least one")
1244	}
1245
1246	iter := client.Single().QueryWithStats(ctx, Statement{sql, nil})
1247	defer iter.Stop()
1248	for {
1249		_, err := iter.Next()
1250		if err == iterator.Done {
1251			break
1252		}
1253		if err != nil {
1254			t.Fatal(err)
1255		}
1256	}
1257	if iter.QueryPlan == nil {
1258		t.Error("got nil QueryPlan, expected one")
1259	}
1260	if iter.QueryStats == nil {
1261		t.Error("got nil QueryStats, expected some")
1262	}
1263}
1264
1265func TestIntegration_InvalidDatabase(t *testing.T) {
1266	if testProjectID == "" {
1267		t.Skip("Integration tests skipped: GCLOUD_TESTS_GOLANG_PROJECT_ID is missing")
1268	}
1269	ctx := context.Background()
1270	dbPath := fmt.Sprintf("projects/%v/instances/%v/databases/invalid", testProjectID, testInstanceID)
1271	c, err := createClient(ctx, dbPath)
1272	// Client creation should succeed even if the database is invalid.
1273	if err != nil {
1274		t.Fatal(err)
1275	}
1276	_, err = c.Single().ReadRow(ctx, "TestTable", Key{1}, []string{"col1"})
1277	if msg, ok := matchError(err, codes.NotFound, ""); !ok {
1278		t.Fatal(msg)
1279	}
1280}
1281
1282func TestIntegration_ReadErrors(t *testing.T) {
1283	ctx := context.Background()
1284	client, _, cleanup := prepareIntegrationTest(ctx, t, readDBStatements)
1285	defer cleanup()
1286
1287	// Read over invalid table fails
1288	_, err := client.Single().ReadRow(ctx, "badTable", Key{1}, []string{"StringValue"})
1289	if msg, ok := matchError(err, codes.NotFound, "badTable"); !ok {
1290		t.Error(msg)
1291	}
1292	// Read over invalid column fails
1293	_, err = client.Single().ReadRow(ctx, "TestTable", Key{1}, []string{"badcol"})
1294	if msg, ok := matchError(err, codes.NotFound, "badcol"); !ok {
1295		t.Error(msg)
1296	}
1297
1298	// Invalid query fails
1299	iter := client.Single().Query(ctx, Statement{SQL: "SELECT Apples AND Oranges"})
1300	defer iter.Stop()
1301	_, err = iter.Next()
1302	if msg, ok := matchError(err, codes.InvalidArgument, "unrecognized name"); !ok {
1303		t.Error(msg)
1304	}
1305
1306	// Read should fail on cancellation.
1307	cctx, cancel := context.WithCancel(ctx)
1308	cancel()
1309	_, err = client.Single().ReadRow(cctx, "TestTable", Key{1}, []string{"StringValue"})
1310	if msg, ok := matchError(err, codes.Canceled, ""); !ok {
1311		t.Error(msg)
1312	}
1313	// Read should fail if deadline exceeded.
1314	dctx, cancel := context.WithTimeout(ctx, time.Nanosecond)
1315	defer cancel()
1316	<-dctx.Done()
1317	_, err = client.Single().ReadRow(dctx, "TestTable", Key{1}, []string{"StringValue"})
1318	if msg, ok := matchError(err, codes.DeadlineExceeded, ""); !ok {
1319		t.Error(msg)
1320	}
1321}
1322
// Test TransactionRunner. Test that transactions are aborted and retried as
// expected.
//
// The test has two parts. Test 1 returns an error from a read-write
// transaction callback after buffering an insert, and then checks that the
// row is not visible. Test 2 runs two read-write transactions in separate
// goroutines, sequenced with channels, and checks the final account balances.
func TestIntegration_TransactionRunner(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements)
	defer cleanup()

	// Test 1: User error should abort the transaction.
	// The results of ReadWriteTransaction and BufferWrite are deliberately
	// discarded here; the outcome is verified by the read that follows.
	_, _ = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
		tx.BufferWrite([]*Mutation{
			Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)})})
		return errors.New("user error")
	})
	// Empty read.
	rows, err := readAllTestTable(client.Single().Read(ctx, "Accounts", Key{1}, []string{"AccountId", "Nickname", "Balance"}))
	if err != nil {
		t.Fatal(err)
	}
	if got, want := len(rows), 0; got != want {
		t.Errorf("Empty read, got %d, want %d.", got, want)
	}

	// Test 2: Expect abort and retry.
	// We run two ReadWriteTransactions concurrently and make txn1 abort txn2 by
	// committing writes to the column txn2 have read, and expect the following
	// read to abort and txn2 retries.

	// Set up two accounts
	accounts := []*Mutation{
		Insert("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), int64(0)}),
		Insert("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), int64(1)}),
	}
	if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil {
		t.Fatal(err)
	}

	// Channels used purely as one-shot signals (each is closed exactly once):
	//   cTxn1Start  - closed by txn1 after its first successful read;
	//                 the txn2 goroutine waits on it before starting.
	//   cTxn2Start  - closed by txn2 after it has read account 1;
	//                 txn1 waits on it before returning from its callback.
	//   cTxn1Commit - closed once txn1's ReadWriteTransaction call returns;
	//                 txn2 waits on it before reading account 2.
	var (
		cTxn1Start  = make(chan struct{})
		cTxn1Commit = make(chan struct{})
		cTxn2Start  = make(chan struct{})
		wg          sync.WaitGroup
	)

	// readBalance reads the Balance column of the account with the given key
	// through tx. When expectAbort is set, a read error that is not an abort
	// error is reported as a test failure; the error is returned to the caller
	// in every case. Note that the read uses the test's ctx from the enclosing
	// scope, not the ctx passed to a transaction callback.
	readBalance := func(tx interface {
		ReadRow(ctx context.Context, table string, key Key, columns []string) (*Row, error)
	}, key int64, expectAbort bool) (int64, error) {
		var b int64
		r, e := tx.ReadRow(ctx, "Accounts", Key{int64(key)}, []string{"Balance"})
		if e != nil {
			if expectAbort && !isAbortErr(e) {
				t.Errorf("ReadRow got %v, want Abort error.", e)
			}
			return b, e
		}
		if ce := r.Column(0, &b); ce != nil {
			return b, ce
		}
		return b, nil
	}

	wg.Add(2)
	// Txn 1
	// Reads account 1, buffers an update that increments its balance by one,
	// and holds the callback open until txn2 has performed its first read.
	// NOTE(review): if the first read in txn1 fails, cTxn1Start is never
	// closed and the txn2 goroutine blocks before registering wg.Done, so
	// wg.Wait below would presumably hang until the test times out - confirm.
	go func() {
		defer wg.Done()
		var once sync.Once
		_, e := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
			b, e := readBalance(tx, 1, false)
			if e != nil {
				return e
			}
			// txn 1 can abort, in that case we skip closing the channel on
			// retry.
			once.Do(func() { close(cTxn1Start) })
			e = tx.BufferWrite([]*Mutation{
				Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), int64(b + 1)})})
			if e != nil {
				return e
			}
			// Wait for second transaction.
			<-cTxn2Start
			return nil
		})
		// Signal completion of txn1 whether or not it succeeded, so that txn2
		// is not left waiting.
		close(cTxn1Commit)
		if e != nil {
			t.Errorf("Transaction 1 commit, got %v, want nil.", e)
		}
	}()
	// Txn 2
	// Reads account 1, waits for txn1 to finish, reads account 2, and then
	// writes the sum of both balances to account 2.
	go func() {
		// Wait until txn 1 starts.
		<-cTxn1Start
		defer wg.Done()
		var (
			once sync.Once
			b1   int64
			b2   int64
			e    error
		)
		_, e = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
			if b1, e = readBalance(tx, 1, false); e != nil {
				return e
			}
			// Skip closing channel on retry.
			once.Do(func() { close(cTxn2Start) })
			// Wait until txn 1 successfully commits.
			<-cTxn1Commit
			// Txn1 has committed and written a balance to the account. Now this
			// transaction (txn2) reads and re-writes the balance. The first
			// time through, it will abort because it overlaps with txn1. Then
			// it will retry after txn1 commits, and succeed.
			if b2, e = readBalance(tx, 2, true); e != nil {
				return e
			}
			return tx.BufferWrite([]*Mutation{
				Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), int64(b1 + b2)})})
		})
		if e != nil {
			t.Errorf("Transaction 2 commit, got %v, want nil.", e)
		}
	}()
	wg.Wait()
	// Check that both transactions' effects are visible.
	// The expected balance of account i is i: account 1 goes from 0 to 1, and
	// account 2 is expected to end at 2.
	for i := int64(1); i <= int64(2); i++ {
		if b, e := readBalance(client.Single(), i, false); e != nil {
			t.Fatalf("ReadBalance for key %d error %v.", i, e)
		} else if b != i {
			t.Errorf("Balance for key %d, got %d, want %d.", i, b, i)
		}
	}
}
1455
1456// Test PartitionQuery of BatchReadOnlyTransaction, create partitions then
1457// serialize and deserialize both transaction and partition to be used in
1458// execution on another client, and compare results.
1459func TestIntegration_BatchQuery(t *testing.T) {
1460	// Set up testing environment.
1461	var (
1462		client2 *Client
1463		err     error
1464	)
1465	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1466	defer cancel()
1467	client, dbPath, cleanup := prepareIntegrationTest(ctx, t, simpleDBStatements)
1468	defer cleanup()
1469
1470	if err = populate(ctx, client); err != nil {
1471		t.Fatal(err)
1472	}
1473	if client2, err = createClient(ctx, dbPath); err != nil {
1474		t.Fatal(err)
1475	}
1476	defer client2.Close()
1477
1478	// PartitionQuery
1479	var (
1480		txn        *BatchReadOnlyTransaction
1481		partitions []*Partition
1482		stmt       = Statement{SQL: "SELECT * FROM test;"}
1483	)
1484
1485	if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil {
1486		t.Fatal(err)
1487	}
1488	defer txn.Cleanup(ctx)
1489	if partitions, err = txn.PartitionQuery(ctx, stmt, PartitionOptions{0, 3}); err != nil {
1490		t.Fatal(err)
1491	}
1492
1493	// Reconstruct BatchReadOnlyTransactionID and execute partitions
1494	var (
1495		tid2      BatchReadOnlyTransactionID
1496		data      []byte
1497		gotResult bool // if we get matching result from two separate txns
1498	)
1499	if data, err = txn.ID.MarshalBinary(); err != nil {
1500		t.Fatalf("encoding failed %v", err)
1501	}
1502	if err = tid2.UnmarshalBinary(data); err != nil {
1503		t.Fatalf("decoding failed %v", err)
1504	}
1505	txn2 := client2.BatchReadOnlyTransactionFromID(tid2)
1506
1507	// Execute Partitions and compare results
1508	for i, p := range partitions {
1509		iter := txn.Execute(ctx, p)
1510		defer iter.Stop()
1511		p2 := serdesPartition(t, i, p)
1512		iter2 := txn2.Execute(ctx, &p2)
1513		defer iter2.Stop()
1514
1515		row1, err1 := iter.Next()
1516		row2, err2 := iter2.Next()
1517		if err1 != err2 {
1518			t.Fatalf("execution failed for different reasons: %v, %v", err1, err2)
1519			continue
1520		}
1521		if !testEqual(row1, row2) {
1522			t.Fatalf("execution returned different values: %v, %v", row1, row2)
1523			continue
1524		}
1525		if row1 == nil {
1526			continue
1527		}
1528		var a, b string
1529		if err = row1.Columns(&a, &b); err != nil {
1530			t.Fatalf("failed to parse row %v", err)
1531			continue
1532		}
1533		if a == str1 && b == str2 {
1534			gotResult = true
1535		}
1536	}
1537	if !gotResult {
1538		t.Fatalf("execution didn't return expected values")
1539	}
1540}
1541
1542// Test PartitionRead of BatchReadOnlyTransaction, similar to TestBatchQuery
1543func TestIntegration_BatchRead(t *testing.T) {
1544	// Set up testing environment.
1545	var (
1546		client2 *Client
1547		err     error
1548	)
1549	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1550	defer cancel()
1551	client, dbPath, cleanup := prepareIntegrationTest(ctx, t, simpleDBStatements)
1552	defer cleanup()
1553
1554	if err = populate(ctx, client); err != nil {
1555		t.Fatal(err)
1556	}
1557	if client2, err = createClient(ctx, dbPath); err != nil {
1558		t.Fatal(err)
1559	}
1560	defer client2.Close()
1561
1562	// PartitionRead
1563	var (
1564		txn        *BatchReadOnlyTransaction
1565		partitions []*Partition
1566	)
1567
1568	if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil {
1569		t.Fatal(err)
1570	}
1571	defer txn.Cleanup(ctx)
1572	if partitions, err = txn.PartitionRead(ctx, "test", AllKeys(), simpleDBTableColumns, PartitionOptions{0, 3}); err != nil {
1573		t.Fatal(err)
1574	}
1575
1576	// Reconstruct BatchReadOnlyTransactionID and execute partitions.
1577	var (
1578		tid2      BatchReadOnlyTransactionID
1579		data      []byte
1580		gotResult bool // if we get matching result from two separate txns
1581	)
1582	if data, err = txn.ID.MarshalBinary(); err != nil {
1583		t.Fatalf("encoding failed %v", err)
1584	}
1585	if err = tid2.UnmarshalBinary(data); err != nil {
1586		t.Fatalf("decoding failed %v", err)
1587	}
1588	txn2 := client2.BatchReadOnlyTransactionFromID(tid2)
1589
1590	// Execute Partitions and compare results.
1591	for i, p := range partitions {
1592		iter := txn.Execute(ctx, p)
1593		defer iter.Stop()
1594		p2 := serdesPartition(t, i, p)
1595		iter2 := txn2.Execute(ctx, &p2)
1596		defer iter2.Stop()
1597
1598		row1, err1 := iter.Next()
1599		row2, err2 := iter2.Next()
1600		if err1 != err2 {
1601			t.Fatalf("execution failed for different reasons: %v, %v", err1, err2)
1602			continue
1603		}
1604		if !testEqual(row1, row2) {
1605			t.Fatalf("execution returned different values: %v, %v", row1, row2)
1606			continue
1607		}
1608		if row1 == nil {
1609			continue
1610		}
1611		var a, b string
1612		if err = row1.Columns(&a, &b); err != nil {
1613			t.Fatalf("failed to parse row %v", err)
1614			continue
1615		}
1616		if a == str1 && b == str2 {
1617			gotResult = true
1618		}
1619	}
1620	if !gotResult {
1621		t.Fatalf("execution didn't return expected values")
1622	}
1623}
1624
1625// Test normal txReadEnv method on BatchReadOnlyTransaction.
1626func TestIntegration_BROTNormal(t *testing.T) {
1627	// Set up testing environment and create txn.
1628	var (
1629		txn *BatchReadOnlyTransaction
1630		err error
1631		row *Row
1632		i   int64
1633	)
1634	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1635	defer cancel()
1636	client, _, cleanup := prepareIntegrationTest(ctx, t, simpleDBStatements)
1637	defer cleanup()
1638
1639	if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil {
1640		t.Fatal(err)
1641	}
1642	defer txn.Cleanup(ctx)
1643	if _, err := txn.PartitionRead(ctx, "test", AllKeys(), simpleDBTableColumns, PartitionOptions{0, 3}); err != nil {
1644		t.Fatal(err)
1645	}
1646	// Normal query should work with BatchReadOnlyTransaction.
1647	stmt2 := Statement{SQL: "SELECT 1"}
1648	iter := txn.Query(ctx, stmt2)
1649	defer iter.Stop()
1650
1651	row, err = iter.Next()
1652	if err != nil {
1653		t.Errorf("query failed with %v", err)
1654	}
1655	if err = row.Columns(&i); err != nil {
1656		t.Errorf("failed to parse row %v", err)
1657	}
1658}
1659
1660func TestIntegration_CommitTimestamp(t *testing.T) {
1661	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1662	defer cancel()
1663	client, _, cleanup := prepareIntegrationTest(ctx, t, ctsDBStatements)
1664	defer cleanup()
1665
1666	type testTableRow struct {
1667		Key string
1668		Ts  NullTime
1669	}
1670
1671	var (
1672		cts1, cts2, ts1, ts2 time.Time
1673		err                  error
1674	)
1675
1676	// Apply mutation in sequence, expect to see commit timestamp in good order,
1677	// check also the commit timestamp returned
1678	for _, it := range []struct {
1679		k string
1680		t *time.Time
1681	}{
1682		{"a", &cts1},
1683		{"b", &cts2},
1684	} {
1685		tt := testTableRow{Key: it.k, Ts: NullTime{CommitTimestamp, true}}
1686		m, err := InsertStruct("TestTable", tt)
1687		if err != nil {
1688			t.Fatal(err)
1689		}
1690		*it.t, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce())
1691		if err != nil {
1692			t.Fatal(err)
1693		}
1694	}
1695
1696	txn := client.ReadOnlyTransaction()
1697	for _, it := range []struct {
1698		k string
1699		t *time.Time
1700	}{
1701		{"a", &ts1},
1702		{"b", &ts2},
1703	} {
1704		if r, e := txn.ReadRow(ctx, "TestTable", Key{it.k}, []string{"Ts"}); e != nil {
1705			t.Fatal(err)
1706		} else {
1707			var got testTableRow
1708			if err := r.ToStruct(&got); err != nil {
1709				t.Fatal(err)
1710			}
1711			*it.t = got.Ts.Time
1712		}
1713	}
1714	if !cts1.Equal(ts1) {
1715		t.Errorf("Expect commit timestamp returned and read to match for txn1, got %v and %v.", cts1, ts1)
1716	}
1717	if !cts2.Equal(ts2) {
1718		t.Errorf("Expect commit timestamp returned and read to match for txn2, got %v and %v.", cts2, ts2)
1719	}
1720
1721	// Try writing a timestamp in the future to commit timestamp, expect error.
1722	_, err = client.Apply(ctx, []*Mutation{InsertOrUpdate("TestTable", []string{"Key", "Ts"}, []interface{}{"a", time.Now().Add(time.Hour)})}, ApplyAtLeastOnce())
1723	if msg, ok := matchError(err, codes.FailedPrecondition, "Cannot write timestamps in the future"); !ok {
1724		t.Error(msg)
1725	}
1726}
1727
1728func TestIntegration_DML(t *testing.T) {
1729	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1730	defer cancel()
1731	client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements)
1732	defer cleanup()
1733
1734	// Function that reads a single row's first name from within a transaction.
1735	readFirstName := func(tx *ReadWriteTransaction, key int) (string, error) {
1736		row, err := tx.ReadRow(ctx, "Singers", Key{key}, []string{"FirstName"})
1737		if err != nil {
1738			return "", err
1739		}
1740		var fn string
1741		if err := row.Column(0, &fn); err != nil {
1742			return "", err
1743		}
1744		return fn, nil
1745	}
1746
1747	// Function that reads multiple rows' first names from outside a read/write
1748	// transaction.
1749	readFirstNames := func(keys ...int) []string {
1750		var ks []KeySet
1751		for _, k := range keys {
1752			ks = append(ks, Key{k})
1753		}
1754		iter := client.Single().Read(ctx, "Singers", KeySets(ks...), []string{"FirstName"})
1755		var got []string
1756		var fn string
1757		err := iter.Do(func(row *Row) error {
1758			if err := row.Column(0, &fn); err != nil {
1759				return err
1760			}
1761			got = append(got, fn)
1762			return nil
1763		})
1764		if err != nil {
1765			t.Fatalf("readFirstNames(%v): %v", keys, err)
1766		}
1767		return got
1768	}
1769
1770	// Use ReadWriteTransaction.Query to execute a DML statement.
1771	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
1772		iter := tx.Query(ctx, Statement{
1773			SQL: `INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (1, "Umm", "Kulthum")`,
1774		})
1775		defer iter.Stop()
1776		if row, err := iter.Next(); err != iterator.Done {
1777			t.Fatalf("got results from iterator, want none: %#v, err = %v\n", row, err)
1778		}
1779		if iter.RowCount != 1 {
1780			t.Errorf("row count: got %d, want 1", iter.RowCount)
1781		}
1782		// The results of the DML statement should be visible to the transaction.
1783		got, err := readFirstName(tx, 1)
1784		if err != nil {
1785			return err
1786		}
1787		if want := "Umm"; got != want {
1788			t.Errorf("got %q, want %q", got, want)
1789		}
1790		return nil
1791	})
1792	if err != nil {
1793		t.Fatal(err)
1794	}
1795
1796	// Use ReadWriteTransaction.Update to execute a DML statement.
1797	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
1798		count, err := tx.Update(ctx, Statement{
1799			SQL: `Insert INTO Singers (SingerId, FirstName, LastName) VALUES (2, "Eduard", "Khil")`,
1800		})
1801		if err != nil {
1802			t.Fatal(err)
1803		}
1804		if count != 1 {
1805			t.Errorf("row count: got %d, want 1", count)
1806		}
1807		got, err := readFirstName(tx, 2)
1808		if err != nil {
1809			return err
1810		}
1811		if want := "Eduard"; got != want {
1812			t.Errorf("got %q, want %q", got, want)
1813		}
1814		return nil
1815
1816	})
1817	if err != nil {
1818		t.Fatal(err)
1819	}
1820
1821	// Roll back a DML statement and confirm that it didn't happen.
1822	var fail = errors.New("fail")
1823	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
1824		_, err := tx.Update(ctx, Statement{
1825			SQL: `INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (3, "Audra", "McDonald")`,
1826		})
1827		if err != nil {
1828			return err
1829		}
1830		return fail
1831	})
1832	if err != fail {
1833		t.Fatalf("rolling back: got error %v, want the error 'fail'", err)
1834	}
1835	_, err = client.Single().ReadRow(ctx, "Singers", Key{3}, []string{"FirstName"})
1836	if got, want := ErrCode(err), codes.NotFound; got != want {
1837		t.Errorf("got %s, want %s", got, want)
1838	}
1839
1840	// Run two DML statements in the same transaction.
1841	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
1842		_, err := tx.Update(ctx, Statement{SQL: `UPDATE Singers SET FirstName = "Oum" WHERE SingerId = 1`})
1843		if err != nil {
1844			return err
1845		}
1846		_, err = tx.Update(ctx, Statement{SQL: `UPDATE Singers SET FirstName = "Eddie" WHERE SingerId = 2`})
1847		if err != nil {
1848			return err
1849		}
1850		return nil
1851	})
1852	if err != nil {
1853		t.Fatal(err)
1854	}
1855	got := readFirstNames(1, 2)
1856	want := []string{"Oum", "Eddie"}
1857	if !testEqual(got, want) {
1858		t.Errorf("got %v, want %v", got, want)
1859	}
1860
1861	// Run a DML statement and an ordinary mutation in the same transaction.
1862	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
1863		_, err := tx.Update(ctx, Statement{
1864			SQL: `INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (3, "Audra", "McDonald")`,
1865		})
1866		if err != nil {
1867			return err
1868		}
1869		return tx.BufferWrite([]*Mutation{
1870			Insert("Singers", []string{"SingerId", "FirstName", "LastName"},
1871				[]interface{}{4, "Andy", "Irvine"}),
1872		})
1873	})
1874	if err != nil {
1875		t.Fatal(err)
1876	}
1877	got = readFirstNames(3, 4)
1878	want = []string{"Audra", "Andy"}
1879	if !testEqual(got, want) {
1880		t.Errorf("got %v, want %v", got, want)
1881	}
1882
1883	// Attempt to run a query using update.
1884	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
1885		_, err := tx.Update(ctx, Statement{SQL: `SELECT FirstName from Singers`})
1886		return err
1887	})
1888	if got, want := ErrCode(err), codes.InvalidArgument; got != want {
1889		t.Errorf("got %s, want %s", got, want)
1890	}
1891}
1892
1893func TestIntegration_StructParametersBind(t *testing.T) {
1894	t.Parallel()
1895	ctx := context.Background()
1896	client, _, cleanup := prepareIntegrationTest(ctx, t, nil)
1897	defer cleanup()
1898
1899	type tRow []interface{}
1900	type tRows []struct{ trow tRow }
1901
1902	type allFields struct {
1903		Stringf string
1904		Intf    int
1905		Boolf   bool
1906		Floatf  float64
1907		Bytef   []byte
1908		Timef   time.Time
1909		Datef   civil.Date
1910	}
1911	allColumns := []string{
1912		"Stringf",
1913		"Intf",
1914		"Boolf",
1915		"Floatf",
1916		"Bytef",
1917		"Timef",
1918		"Datef",
1919	}
1920	s1 := allFields{"abc", 300, false, 3.45, []byte("foo"), t1, d1}
1921	s2 := allFields{"def", -300, false, -3.45, []byte("bar"), t2, d2}
1922
1923	dynamicStructType := reflect.StructOf([]reflect.StructField{
1924		{Name: "A", Type: reflect.TypeOf(t1), Tag: `spanner:"ff1"`},
1925	})
1926	s3 := reflect.New(dynamicStructType)
1927	s3.Elem().Field(0).Set(reflect.ValueOf(t1))
1928
1929	for i, test := range []struct {
1930		param interface{}
1931		sql   string
1932		cols  []string
1933		trows tRows
1934	}{
1935		// Struct value.
1936		{
1937			s1,
1938			"SELECT" +
1939				" @p.Stringf," +
1940				" @p.Intf," +
1941				" @p.Boolf," +
1942				" @p.Floatf," +
1943				" @p.Bytef," +
1944				" @p.Timef," +
1945				" @p.Datef",
1946			allColumns,
1947			tRows{
1948				{tRow{"abc", 300, false, 3.45, []byte("foo"), t1, d1}},
1949			},
1950		},
1951		// Array of struct value.
1952		{
1953			[]allFields{s1, s2},
1954			"SELECT * FROM UNNEST(@p)",
1955			allColumns,
1956			tRows{
1957				{tRow{"abc", 300, false, 3.45, []byte("foo"), t1, d1}},
1958				{tRow{"def", -300, false, -3.45, []byte("bar"), t2, d2}},
1959			},
1960		},
1961		// Null struct.
1962		{
1963			(*allFields)(nil),
1964			"SELECT @p IS NULL",
1965			[]string{""},
1966			tRows{
1967				{tRow{true}},
1968			},
1969		},
1970		// Null Array of struct.
1971		{
1972			[]allFields(nil),
1973			"SELECT @p IS NULL",
1974			[]string{""},
1975			tRows{
1976				{tRow{true}},
1977			},
1978		},
1979		// Empty struct.
1980		{
1981			struct{}{},
1982			"SELECT @p IS NULL ",
1983			[]string{""},
1984			tRows{
1985				{tRow{false}},
1986			},
1987		},
1988		// Empty array of struct.
1989		{
1990			[]allFields{},
1991			"SELECT * FROM UNNEST(@p) ",
1992			allColumns,
1993			tRows{},
1994		},
1995		// Struct with duplicate fields.
1996		{
1997			struct {
1998				A int `spanner:"field"`
1999				B int `spanner:"field"`
2000			}{10, 20},
2001			"SELECT * FROM UNNEST([@p]) ",
2002			[]string{"field", "field"},
2003			tRows{
2004				{tRow{10, 20}},
2005			},
2006		},
2007		// Struct with unnamed fields.
2008		{
2009			struct {
2010				A string `spanner:""`
2011			}{"hello"},
2012			"SELECT * FROM UNNEST([@p]) ",
2013			[]string{""},
2014			tRows{
2015				{tRow{"hello"}},
2016			},
2017		},
2018		// Mixed struct.
2019		{
2020			struct {
2021				DynamicStructField interface{}  `spanner:"f1"`
2022				ArrayStructField   []*allFields `spanner:"f2"`
2023			}{
2024				DynamicStructField: s3.Interface(),
2025				ArrayStructField:   []*allFields{nil},
2026			},
2027			"SELECT @p.f1.ff1, ARRAY_LENGTH(@p.f2), @p.f2[OFFSET(0)] IS NULL ",
2028			[]string{"ff1", "", ""},
2029			tRows{
2030				{tRow{t1, 1, true}},
2031			},
2032		},
2033	} {
2034		iter := client.Single().Query(ctx, Statement{
2035			SQL:    test.sql,
2036			Params: map[string]interface{}{"p": test.param},
2037		})
2038		var gotRows []*Row
2039		err := iter.Do(func(r *Row) error {
2040			gotRows = append(gotRows, r)
2041			return nil
2042		})
2043		if err != nil {
2044			t.Errorf("Failed to execute test case %d, error: %v", i, err)
2045		}
2046
2047		var wantRows []*Row
2048		for j, row := range test.trows {
2049			r, err := NewRow(test.cols, row.trow)
2050			if err != nil {
2051				t.Errorf("Invalid row %d in test case %d", j, i)
2052			}
2053			wantRows = append(wantRows, r)
2054		}
2055		if !testEqual(gotRows, wantRows) {
2056			t.Errorf("%d: Want result %v, got result %v", i, wantRows, gotRows)
2057		}
2058	}
2059}
2060
2061func TestIntegration_PDML(t *testing.T) {
2062	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2063	defer cancel()
2064	client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements)
2065	defer cleanup()
2066
2067	columns := []string{"SingerId", "FirstName", "LastName"}
2068
2069	// Populate the Singers table.
2070	var muts []*Mutation
2071	for _, row := range [][]interface{}{
2072		{1, "Umm", "Kulthum"},
2073		{2, "Eduard", "Khil"},
2074		{3, "Audra", "McDonald"},
2075	} {
2076		muts = append(muts, Insert("Singers", columns, row))
2077	}
2078	if _, err := client.Apply(ctx, muts); err != nil {
2079		t.Fatal(err)
2080	}
2081	// Identifiers in PDML statements must be fully qualified.
2082	// TODO(jba): revisit the above.
2083	count, err := client.PartitionedUpdate(ctx, Statement{
2084		SQL: `UPDATE Singers SET Singers.FirstName = "changed" WHERE Singers.SingerId >= 1 AND Singers.SingerId <= 3`,
2085	})
2086	if err != nil {
2087		t.Fatal(err)
2088	}
2089	if want := int64(3); count != want {
2090		t.Errorf("got %d, want %d", count, want)
2091	}
2092	got, err := readAll(client.Single().Read(ctx, "Singers", AllKeys(), columns))
2093	if err != nil {
2094		t.Fatal(err)
2095	}
2096	want := [][]interface{}{
2097		{int64(1), "changed", "Kulthum"},
2098		{int64(2), "changed", "Khil"},
2099		{int64(3), "changed", "McDonald"},
2100	}
2101	if !testEqual(got, want) {
2102		t.Errorf("\ngot %v\nwant%v", got, want)
2103	}
2104}
2105
2106func TestBatchDML(t *testing.T) {
2107	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2108	defer cancel()
2109	client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements)
2110	defer cleanup()
2111
2112	columns := []string{"SingerId", "FirstName", "LastName"}
2113
2114	// Populate the Singers table.
2115	var muts []*Mutation
2116	for _, row := range [][]interface{}{
2117		{1, "Umm", "Kulthum"},
2118		{2, "Eduard", "Khil"},
2119		{3, "Audra", "McDonald"},
2120	} {
2121		muts = append(muts, Insert("Singers", columns, row))
2122	}
2123	if _, err := client.Apply(ctx, muts); err != nil {
2124		t.Fatal(err)
2125	}
2126
2127	var counts []int64
2128	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
2129		counts, err = tx.BatchUpdate(ctx, []Statement{
2130			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`},
2131			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 2" WHERE Singers.SingerId = 2`},
2132			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`},
2133		})
2134		return err
2135	})
2136
2137	if err != nil {
2138		t.Fatal(err)
2139	}
2140	if want := []int64{1, 1, 1}; !testEqual(counts, want) {
2141		t.Fatalf("got %d, want %d", counts, want)
2142	}
2143	got, err := readAll(client.Single().Read(ctx, "Singers", AllKeys(), columns))
2144	if err != nil {
2145		t.Fatal(err)
2146	}
2147	want := [][]interface{}{
2148		{int64(1), "changed 1", "Kulthum"},
2149		{int64(2), "changed 2", "Khil"},
2150		{int64(3), "changed 3", "McDonald"},
2151	}
2152	if !testEqual(got, want) {
2153		t.Errorf("\ngot %v\nwant%v", got, want)
2154	}
2155}
2156
2157func TestBatchDML_NoStatements(t *testing.T) {
2158	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2159	defer cancel()
2160	client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements)
2161	defer cleanup()
2162
2163	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
2164		_, err = tx.BatchUpdate(ctx, []Statement{})
2165		return err
2166	})
2167	if err == nil {
2168		t.Fatal("expected error, got nil")
2169	}
2170	if s, ok := status.FromError(err); ok {
2171		if s.Code() != codes.InvalidArgument {
2172			t.Fatalf("expected InvalidArgument, got %v", err)
2173		}
2174	} else {
2175		t.Fatalf("expected InvalidArgument, got %v", err)
2176	}
2177}
2178
2179func TestBatchDML_TwoStatements(t *testing.T) {
2180	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2181	defer cancel()
2182	client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements)
2183	defer cleanup()
2184
2185	columns := []string{"SingerId", "FirstName", "LastName"}
2186
2187	// Populate the Singers table.
2188	var muts []*Mutation
2189	for _, row := range [][]interface{}{
2190		{1, "Umm", "Kulthum"},
2191		{2, "Eduard", "Khil"},
2192		{3, "Audra", "McDonald"},
2193	} {
2194		muts = append(muts, Insert("Singers", columns, row))
2195	}
2196	if _, err := client.Apply(ctx, muts); err != nil {
2197		t.Fatal(err)
2198	}
2199
2200	var updateCount int64
2201	var batchCounts []int64
2202	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
2203		batchCounts, err = tx.BatchUpdate(ctx, []Statement{
2204			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`},
2205			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 2" WHERE Singers.SingerId = 2`},
2206			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`},
2207		})
2208		if err != nil {
2209			return err
2210		}
2211
2212		updateCount, err = tx.Update(ctx, Statement{SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`})
2213		return err
2214	})
2215	if err != nil {
2216		t.Fatal(err)
2217	}
2218	if want := []int64{1, 1, 1}; !testEqual(batchCounts, want) {
2219		t.Fatalf("got %d, want %d", batchCounts, want)
2220	}
2221	if updateCount != 1 {
2222		t.Fatalf("got %v, want 1", updateCount)
2223	}
2224}
2225
2226// TODO(deklerk): this currently does not work because the transaction appears to
2227// get rolled back after a single statement fails. b/120158761
2228func TestBatchDML_Error(t *testing.T) {
2229	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2230	defer cancel()
2231	client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements)
2232	defer cleanup()
2233
2234	columns := []string{"SingerId", "FirstName", "LastName"}
2235
2236	// Populate the Singers table.
2237	var muts []*Mutation
2238	for _, row := range [][]interface{}{
2239		{1, "Umm", "Kulthum"},
2240		{2, "Eduard", "Khil"},
2241		{3, "Audra", "McDonald"},
2242	} {
2243		muts = append(muts, Insert("Singers", columns, row))
2244	}
2245	if _, err := client.Apply(ctx, muts); err != nil {
2246		t.Fatal(err)
2247	}
2248
2249	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
2250		counts, err := tx.BatchUpdate(ctx, []Statement{
2251			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`},
2252			{SQL: `some illegal statement`},
2253			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`},
2254		})
2255		if err == nil {
2256			t.Fatal("expected err, got nil")
2257		}
2258		if want := []int64{1}; !testEqual(counts, want) {
2259			t.Fatalf("got %d, want %d", counts, want)
2260		}
2261
2262		got, err := readAll(tx.Read(ctx, "Singers", AllKeys(), columns))
2263		if err != nil {
2264			t.Fatal(err)
2265		}
2266		want := [][]interface{}{
2267			{int64(1), "changed 1", "Kulthum"},
2268			{int64(2), "Eduard", "Khil"},
2269			{int64(3), "Audra", "McDonald"},
2270		}
2271		if !testEqual(got, want) {
2272			t.Errorf("\ngot %v\nwant%v", got, want)
2273		}
2274
2275		return nil
2276	})
2277	if err != nil {
2278		t.Fatal(err)
2279	}
2280}
2281
2282// Prepare initializes Cloud Spanner testing DB and clients.
2283func prepareIntegrationTest(ctx context.Context, t *testing.T, statements []string) (*Client, string, func()) {
2284	if admin == nil {
2285		t.Skip("Integration tests skipped")
2286	}
2287	// Construct a unique test DB name.
2288	dbName := dbNameSpace.New()
2289
2290	dbPath := fmt.Sprintf("projects/%v/instances/%v/databases/%v", testProjectID, testInstanceID, dbName)
2291	// Create database and tables.
2292	op, err := admin.CreateDatabase(ctx, &adminpb.CreateDatabaseRequest{
2293		Parent:          fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID),
2294		CreateStatement: "CREATE DATABASE " + dbName,
2295		ExtraStatements: statements,
2296	})
2297	if err != nil {
2298		t.Fatalf("cannot create testing DB %v: %v", dbPath, err)
2299	}
2300	if _, err := op.Wait(ctx); err != nil {
2301		t.Fatalf("cannot create testing DB %v: %v", dbPath, err)
2302	}
2303	client, err := createClient(ctx, dbPath)
2304	if err != nil {
2305		t.Fatalf("cannot create data client on DB %v: %v", dbPath, err)
2306	}
2307	return client, dbPath, func() {
2308		client.Close()
2309		if err := admin.DropDatabase(ctx, &adminpb.DropDatabaseRequest{Database: dbPath}); err != nil {
2310			t.Logf("failed to drop database %s (error %v), might need a manual removal",
2311				dbPath, err)
2312		}
2313	}
2314}
2315
2316func cleanupDatabases() {
2317	if admin == nil {
2318		// Integration tests skipped.
2319		return
2320	}
2321
2322	ctx := context.Background()
2323	dbsParent := fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID)
2324	dbsIter := admin.ListDatabases(ctx, &adminpb.ListDatabasesRequest{Parent: dbsParent})
2325	expireAge := 24 * time.Hour
2326
2327	for {
2328		db, err := dbsIter.Next()
2329		if err == iterator.Done {
2330			break
2331		}
2332		if err != nil {
2333			panic(err)
2334		}
2335		// TODO(deklerk): When we have the ability to programmatically create
2336		// instances, we can create an instance with uid.New and delete all
2337		// tables in it. For now, we rely on matching prefixes.
2338		if dbNameSpace.Older(db.Name, expireAge) {
2339			log.Printf("Dropping database %s", db.Name)
2340
2341			if err := admin.DropDatabase(ctx, &adminpb.DropDatabaseRequest{Database: db.Name}); err != nil {
2342				log.Printf("failed to drop database %s (error %v), might need a manual removal",
2343					db.Name, err)
2344			}
2345		}
2346	}
2347}
2348
2349func rangeReads(ctx context.Context, t *testing.T, client *Client) {
2350	checkRange := func(ks KeySet, wantNums ...int) {
2351		if msg, ok := compareRows(client.Single().Read(ctx, testTable, ks, testTableColumns), wantNums); !ok {
2352			t.Errorf("key set %+v: %s", ks, msg)
2353		}
2354	}
2355
2356	checkRange(Key{"k1"}, 1)
2357	checkRange(KeyRange{Key{"k3"}, Key{"k5"}, ClosedOpen}, 3, 4)
2358	checkRange(KeyRange{Key{"k3"}, Key{"k5"}, ClosedClosed}, 3, 4, 5)
2359	checkRange(KeyRange{Key{"k3"}, Key{"k5"}, OpenClosed}, 4, 5)
2360	checkRange(KeyRange{Key{"k3"}, Key{"k5"}, OpenOpen}, 4)
2361
2362	// Partial key specification.
2363	checkRange(KeyRange{Key{"k7"}, Key{}, ClosedClosed}, 7, 8, 9)
2364	checkRange(KeyRange{Key{"k7"}, Key{}, OpenClosed}, 8, 9)
2365	checkRange(KeyRange{Key{}, Key{"k11"}, ClosedOpen}, 0, 1, 10)
2366	checkRange(KeyRange{Key{}, Key{"k11"}, ClosedClosed}, 0, 1, 10, 11)
2367
2368	// The following produce empty ranges.
2369	// TODO(jba): Consider a multi-part key to illustrate partial key behavior.
2370	// checkRange(KeyRange{Key{"k7"}, Key{}, ClosedOpen})
2371	// checkRange(KeyRange{Key{"k7"}, Key{}, OpenOpen})
2372	// checkRange(KeyRange{Key{}, Key{"k11"}, OpenOpen})
2373	// checkRange(KeyRange{Key{}, Key{"k11"}, OpenClosed})
2374
2375	// Prefix is component-wise, not string prefix.
2376	checkRange(Key{"k1"}.AsPrefix(), 1)
2377	checkRange(KeyRange{Key{"k1"}, Key{"k2"}, ClosedOpen}, 1, 10, 11, 12, 13, 14)
2378
2379	checkRange(AllKeys(), 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14)
2380}
2381
2382func indexRangeReads(ctx context.Context, t *testing.T, client *Client) {
2383	checkRange := func(ks KeySet, wantNums ...int) {
2384		if msg, ok := compareRows(client.Single().ReadUsingIndex(ctx, testTable, testTableIndex, ks, testTableColumns),
2385			wantNums); !ok {
2386			t.Errorf("key set %+v: %s", ks, msg)
2387		}
2388	}
2389
2390	checkRange(Key{"v1"}, 1)
2391	checkRange(KeyRange{Key{"v3"}, Key{"v5"}, ClosedOpen}, 3, 4)
2392	checkRange(KeyRange{Key{"v3"}, Key{"v5"}, ClosedClosed}, 3, 4, 5)
2393	checkRange(KeyRange{Key{"v3"}, Key{"v5"}, OpenClosed}, 4, 5)
2394	checkRange(KeyRange{Key{"v3"}, Key{"v5"}, OpenOpen}, 4)
2395
2396	// // Partial key specification.
2397	checkRange(KeyRange{Key{"v7"}, Key{}, ClosedClosed}, 7, 8, 9)
2398	checkRange(KeyRange{Key{"v7"}, Key{}, OpenClosed}, 8, 9)
2399	checkRange(KeyRange{Key{}, Key{"v11"}, ClosedOpen}, 0, 1, 10)
2400	checkRange(KeyRange{Key{}, Key{"v11"}, ClosedClosed}, 0, 1, 10, 11)
2401
2402	// // The following produce empty ranges.
2403	// checkRange(KeyRange{Key{"v7"}, Key{}, ClosedOpen})
2404	// checkRange(KeyRange{Key{"v7"}, Key{}, OpenOpen})
2405	// checkRange(KeyRange{Key{}, Key{"v11"}, OpenOpen})
2406	// checkRange(KeyRange{Key{}, Key{"v11"}, OpenClosed})
2407
2408	// // Prefix is component-wise, not string prefix.
2409	checkRange(Key{"v1"}.AsPrefix(), 1)
2410	checkRange(KeyRange{Key{"v1"}, Key{"v2"}, ClosedOpen}, 1, 10, 11, 12, 13, 14)
2411	checkRange(AllKeys(), 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14)
2412
2413	// Read from an index with DESC ordering.
2414	wantNums := []int{14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0}
2415	if msg, ok := compareRows(client.Single().ReadUsingIndex(ctx, testTable, "TestTableByValueDesc", AllKeys(), testTableColumns),
2416		wantNums); !ok {
2417		t.Errorf("desc: %s", msg)
2418	}
2419}
2420
// testTableRow holds one decoded row of the test table: its Key column and
// its StringValue column.
type testTableRow struct {
	Key         string
	StringValue string
}
2422
2423func compareRows(iter *RowIterator, wantNums []int) (string, bool) {
2424	rows, err := readAllTestTable(iter)
2425	if err != nil {
2426		return err.Error(), false
2427	}
2428	want := map[string]string{}
2429	for _, n := range wantNums {
2430		want[fmt.Sprintf("k%d", n)] = fmt.Sprintf("v%d", n)
2431	}
2432	got := map[string]string{}
2433	for _, r := range rows {
2434		got[r.Key] = r.StringValue
2435	}
2436	if !testEqual(got, want) {
2437		return fmt.Sprintf("got %v, want %v", got, want), false
2438	}
2439	return "", true
2440}
2441
// isNaN reports whether x holds a float64 that is NaN. Any other dynamic
// type, including float32 and nil, yields false.
func isNaN(x interface{}) bool {
	switch v := x.(type) {
	case float64:
		return math.IsNaN(v)
	default:
		return false
	}
}
2449
2450// createClient creates Cloud Spanner data client.
2451func createClient(ctx context.Context, dbPath string) (client *Client, err error) {
2452	client, err = NewClientWithConfig(ctx, dbPath, ClientConfig{
2453		SessionPoolConfig: SessionPoolConfig{WriteSessions: 0.2},
2454	}, option.WithTokenSource(testutil.TokenSource(ctx, Scope)), option.WithEndpoint(endpoint))
2455	if err != nil {
2456		return nil, fmt.Errorf("cannot create data client on DB %v: %v", dbPath, err)
2457	}
2458	return client, nil
2459}
2460
2461// populate prepares the database with some data.
2462func populate(ctx context.Context, client *Client) error {
2463	// Populate data
2464	var err error
2465	m := InsertMap("test", map[string]interface{}{
2466		"a": str1,
2467		"b": str2,
2468	})
2469	_, err = client.Apply(ctx, []*Mutation{m})
2470	return err
2471}
2472
2473func matchError(got error, wantCode codes.Code, wantMsgPart string) (string, bool) {
2474	if ErrCode(got) != wantCode || !strings.Contains(strings.ToLower(ErrDesc(got)), strings.ToLower(wantMsgPart)) {
2475		return fmt.Sprintf("got error <%v>\n"+`want <code = %q, "...%s...">`, got, wantCode, wantMsgPart), false
2476	}
2477	return "", true
2478}
2479
2480func rowToValues(r *Row) ([]interface{}, error) {
2481	var x int64
2482	var y, z string
2483	if err := r.Column(0, &x); err != nil {
2484		return nil, err
2485	}
2486	if err := r.Column(1, &y); err != nil {
2487		return nil, err
2488	}
2489	if err := r.Column(2, &z); err != nil {
2490		return nil, err
2491	}
2492	return []interface{}{x, y, z}, nil
2493}
2494
2495func readAll(iter *RowIterator) ([][]interface{}, error) {
2496	defer iter.Stop()
2497	var vals [][]interface{}
2498	for {
2499		row, err := iter.Next()
2500		if err == iterator.Done {
2501			return vals, nil
2502		}
2503		if err != nil {
2504			return nil, err
2505		}
2506		v, err := rowToValues(row)
2507		if err != nil {
2508			return nil, err
2509		}
2510		vals = append(vals, v)
2511	}
2512}
2513
2514func readAllTestTable(iter *RowIterator) ([]testTableRow, error) {
2515	defer iter.Stop()
2516	var vals []testTableRow
2517	for {
2518		row, err := iter.Next()
2519		if err == iterator.Done {
2520			return vals, nil
2521		}
2522		if err != nil {
2523			return nil, err
2524		}
2525		var ttr testTableRow
2526		if err := row.ToStruct(&ttr); err != nil {
2527			return nil, err
2528		}
2529		vals = append(vals, ttr)
2530	}
2531}
2532