1/*
2Copyright 2017 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package spanner
18
19import (
20	"context"
21	"errors"
22	"flag"
23	"fmt"
24	"log"
25	"math"
26	"os"
27	"reflect"
28	"strings"
29	"sync"
30	"testing"
31	"time"
32
33	"cloud.google.com/go/civil"
34	"cloud.google.com/go/internal/testutil"
35	"cloud.google.com/go/internal/uid"
36	database "cloud.google.com/go/spanner/admin/database/apiv1"
37	"google.golang.org/api/iterator"
38	"google.golang.org/api/option"
39	adminpb "google.golang.org/genproto/googleapis/spanner/admin/database/v1"
40	"google.golang.org/grpc/codes"
41	"google.golang.org/grpc/status"
42)
43
// Package-level fixtures shared by the integration tests in this file.
var (
	// testProjectID specifies the project used for testing.
	// It can be changed by setting environment variable GCLOUD_TESTS_GOLANG_PROJECT_ID.
	testProjectID = testutil.ProjID()

	// dbNameSpace is a uid space configured with the "gotest" prefix, '_' as
	// separator and the short form. NOTE(review): presumably used to generate
	// unique test database names; confirm against prepareIntegrationTest.
	dbNameSpace = uid.NewSpace("gotest", &uid.Options{Sep: '_', Short: true})

	// testInstanceID specifies the Cloud Spanner instance used for testing.
	// TODO(deklerk) When we can programmatically create instances, we should use
	// uid.New as the test instance name.
	testInstanceID = "go-integration-test"

	// Names of the table, index and columns declared by readDBStatements.
	testTable        = "TestTable"
	testTableIndex   = "TestTableByValue"
	testTableColumns = []string{"Key", "StringValue"}

	// admin is the *database.DatabaseAdminClient shared by the tests. It is
	// assigned by initIntegrationTests and stays nil when the integration
	// tests are skipped.
	admin *database.DatabaseAdminClient

	// singerDBStatements is the DDL for the Singers, Accounts and Types
	// tables together with the SingerByName and AccountByNickname indexes.
	singerDBStatements = []string{
		`CREATE TABLE Singers (
				SingerId	INT64 NOT NULL,
				FirstName	STRING(1024),
				LastName	STRING(1024),
				SingerInfo	BYTES(MAX)
			) PRIMARY KEY (SingerId)`,
		`CREATE INDEX SingerByName ON Singers(FirstName, LastName)`,
		`CREATE TABLE Accounts (
				AccountId	INT64 NOT NULL,
				Nickname	STRING(100),
				Balance		INT64 NOT NULL,
			) PRIMARY KEY (AccountId)`,
		`CREATE INDEX AccountByNickname ON Accounts(Nickname) STORING (Balance)`,
		`CREATE TABLE Types (
				RowID		INT64 NOT NULL,
				String		STRING(MAX),
				StringArray	ARRAY<STRING(MAX)>,
				Bytes		BYTES(MAX),
				BytesArray	ARRAY<BYTES(MAX)>,
				Int64a		INT64,
				Int64Array	ARRAY<INT64>,
				Bool		BOOL,
				BoolArray	ARRAY<BOOL>,
				Float64		FLOAT64,
				Float64Array	ARRAY<FLOAT64>,
				Date		DATE,
				DateArray	ARRAY<DATE>,
				Timestamp	TIMESTAMP,
				TimestampArray	ARRAY<TIMESTAMP>,
			) PRIMARY KEY (RowID)`,
	}

	// readDBStatements is the DDL for TestTable (Key, StringValue) plus an
	// ascending and a descending index on StringValue.
	readDBStatements = []string{
		`CREATE TABLE TestTable (
                    Key          STRING(MAX) NOT NULL,
                    StringValue  STRING(MAX)
            ) PRIMARY KEY (Key)`,
		`CREATE INDEX TestTableByValue ON TestTable(StringValue)`,
		`CREATE INDEX TestTableByValueDesc ON TestTable(StringValue DESC)`,
	}

	// simpleDBStatements is the DDL for a two-column string table named
	// "test", keyed by column a.
	simpleDBStatements = []string{
		`CREATE TABLE test (
				a	STRING(1024),
				b	STRING(1024),
			) PRIMARY KEY (a)`,
	}
	// simpleDBTableColumns lists the columns declared by simpleDBStatements.
	simpleDBTableColumns = []string{"a", "b"}

	// ctsDBStatements is the DDL for a TestTable whose Ts column has the
	// allow_commit_timestamp option enabled.
	ctsDBStatements = []string{
		`CREATE TABLE TestTable (
		    Key  STRING(MAX) NOT NULL,
		    Ts   TIMESTAMP OPTIONS (allow_commit_timestamp = true),
	    ) PRIMARY KEY (Key)`,
	}
)
120
// Sample string values.
// NOTE(review): no use is visible in this part of the file; presumably they
// are consumed by tests further down — confirm before changing them.
const (
	str1 = "alice"
	str2 = "a@example.com"
)
125
126func TestMain(m *testing.M) {
127	cleanup := initIntegrationTests()
128	res := m.Run()
129	cleanup()
130	os.Exit(res)
131}
132
133func initIntegrationTests() func() {
134	ctx := context.Background()
135	flag.Parse() // needed for testing.Short()
136	noop := func() {}
137
138	if testing.Short() {
139		log.Println("Integration tests skipped in -short mode.")
140		return noop
141	}
142
143	if testProjectID == "" {
144		log.Println("Integration tests skipped: GCLOUD_TESTS_GOLANG_PROJECT_ID is missing")
145		return noop
146	}
147
148	ts := testutil.TokenSource(ctx, AdminScope, Scope)
149	if ts == nil {
150		log.Printf("Integration test skipped: cannot get service account credential from environment variable %v", "GCLOUD_TESTS_GOLANG_KEY")
151		return noop
152	}
153	var err error
154
155	// Create Admin client and Data client.
156	admin, err = database.NewDatabaseAdminClient(ctx, option.WithTokenSource(ts), option.WithEndpoint(endpoint))
157	if err != nil {
158		log.Fatalf("cannot create admin client: %v", err)
159	}
160
161	return func() {
162		cleanupDatabases()
163		admin.Close()
164	}
165}
166
167// Test SingleUse transaction.
168func TestIntegration_SingleUse(t *testing.T) {
169	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
170	defer cancel()
171	// Set up testing environment.
172	client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements)
173	defer cleanup()
174
175	writes := []struct {
176		row []interface{}
177		ts  time.Time
178	}{
179		{row: []interface{}{1, "Marc", "Foo"}},
180		{row: []interface{}{2, "Tars", "Bar"}},
181		{row: []interface{}{3, "Alpha", "Beta"}},
182		{row: []interface{}{4, "Last", "End"}},
183	}
184	// Try to write four rows through the Apply API.
185	for i, w := range writes {
186		var err error
187		m := InsertOrUpdate("Singers",
188			[]string{"SingerId", "FirstName", "LastName"},
189			w.row)
190		if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
191			t.Fatal(err)
192		}
193	}
194
195	// For testing timestamp bound staleness.
196	<-time.After(time.Second)
197
198	// Test reading rows with different timestamp bounds.
199	for i, test := range []struct {
200		want    [][]interface{}
201		tb      TimestampBound
202		checkTs func(time.Time) error
203	}{
204		{
205			// strong
206			[][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}},
207			StrongRead(),
208			func(ts time.Time) error {
209				// writes[3] is the last write, all subsequent strong read should have a timestamp larger than that.
210				if ts.Before(writes[3].ts) {
211					return fmt.Errorf("read got timestamp %v, want it to be no later than %v", ts, writes[3].ts)
212				}
213				return nil
214			},
215		},
216		{
217			// min_read_timestamp
218			[][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}},
219			MinReadTimestamp(writes[3].ts),
220			func(ts time.Time) error {
221				if ts.Before(writes[3].ts) {
222					return fmt.Errorf("read got timestamp %v, want it to be no later than %v", ts, writes[3].ts)
223				}
224				return nil
225			},
226		},
227		{
228			// max_staleness
229			[][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}},
230			MaxStaleness(time.Second),
231			func(ts time.Time) error {
232				if ts.Before(writes[3].ts) {
233					return fmt.Errorf("read got timestamp %v, want it to be no later than %v", ts, writes[3].ts)
234				}
235				return nil
236			},
237		},
238		{
239			// read_timestamp
240			[][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}},
241			ReadTimestamp(writes[2].ts),
242			func(ts time.Time) error {
243				if ts != writes[2].ts {
244					return fmt.Errorf("read got timestamp %v, want %v", ts, writes[2].ts)
245				}
246				return nil
247			},
248		},
249		{
250			// exact_staleness
251			nil,
252			// Specify a staleness which should be already before this test because
253			// context timeout is set to be 10s.
254			ExactStaleness(11 * time.Second),
255			func(ts time.Time) error {
256				if ts.After(writes[0].ts) {
257					return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[0].ts)
258				}
259				return nil
260			},
261		},
262	} {
263		// SingleUse.Query
264		su := client.Single().WithTimestampBound(test.tb)
265		got, err := readAll(su.Query(
266			ctx,
267			Statement{
268				"SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId IN (@id1, @id3, @id4)",
269				map[string]interface{}{"id1": int64(1), "id3": int64(3), "id4": int64(4)},
270			}))
271		if err != nil {
272			t.Errorf("%d: SingleUse.Query returns error %v, want nil", i, err)
273		}
274		if !testEqual(got, test.want) {
275			t.Errorf("%d: got unexpected result from SingleUse.Query: %v, want %v", i, got, test.want)
276		}
277		rts, err := su.Timestamp()
278		if err != nil {
279			t.Errorf("%d: SingleUse.Query doesn't return a timestamp, error: %v", i, err)
280		}
281		if err := test.checkTs(rts); err != nil {
282			t.Errorf("%d: SingleUse.Query doesn't return expected timestamp: %v", i, err)
283		}
284		// SingleUse.Read
285		su = client.Single().WithTimestampBound(test.tb)
286		got, err = readAll(su.Read(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}), []string{"SingerId", "FirstName", "LastName"}))
287		if err != nil {
288			t.Errorf("%d: SingleUse.Read returns error %v, want nil", i, err)
289		}
290		if !testEqual(got, test.want) {
291			t.Errorf("%d: got unexpected result from SingleUse.Read: %v, want %v", i, got, test.want)
292		}
293		rts, err = su.Timestamp()
294		if err != nil {
295			t.Errorf("%d: SingleUse.Read doesn't return a timestamp, error: %v", i, err)
296		}
297		if err := test.checkTs(rts); err != nil {
298			t.Errorf("%d: SingleUse.Read doesn't return expected timestamp: %v", i, err)
299		}
300		// SingleUse.ReadRow
301		got = nil
302		for _, k := range []Key{{1}, {3}, {4}} {
303			su = client.Single().WithTimestampBound(test.tb)
304			r, err := su.ReadRow(ctx, "Singers", k, []string{"SingerId", "FirstName", "LastName"})
305			if err != nil {
306				continue
307			}
308			v, err := rowToValues(r)
309			if err != nil {
310				continue
311			}
312			got = append(got, v)
313			rts, err = su.Timestamp()
314			if err != nil {
315				t.Errorf("%d: SingleUse.ReadRow(%v) doesn't return a timestamp, error: %v", i, k, err)
316			}
317			if err := test.checkTs(rts); err != nil {
318				t.Errorf("%d: SingleUse.ReadRow(%v) doesn't return expected timestamp: %v", i, k, err)
319			}
320		}
321		if !testEqual(got, test.want) {
322			t.Errorf("%d: got unexpected results from SingleUse.ReadRow: %v, want %v", i, got, test.want)
323		}
324		// SingleUse.ReadUsingIndex
325		su = client.Single().WithTimestampBound(test.tb)
326		got, err = readAll(su.ReadUsingIndex(ctx, "Singers", "SingerByName", KeySets(Key{"Marc", "Foo"}, Key{"Alpha", "Beta"}, Key{"Last", "End"}), []string{"SingerId", "FirstName", "LastName"}))
327		if err != nil {
328			t.Errorf("%d: SingleUse.ReadUsingIndex returns error %v, want nil", i, err)
329		}
330		// The results from ReadUsingIndex is sorted by the index rather than primary key.
331		if len(got) != len(test.want) {
332			t.Errorf("%d: got unexpected result from SingleUse.ReadUsingIndex: %v, want %v", i, got, test.want)
333		}
334		for j, g := range got {
335			if j > 0 {
336				prev := got[j-1][1].(string) + got[j-1][2].(string)
337				curr := got[j][1].(string) + got[j][2].(string)
338				if strings.Compare(prev, curr) > 0 {
339					t.Errorf("%d: SingleUse.ReadUsingIndex fails to order rows by index keys, %v should be after %v", i, got[j-1], got[j])
340				}
341			}
342			found := false
343			for _, w := range test.want {
344				if testEqual(g, w) {
345					found = true
346				}
347			}
348			if !found {
349				t.Errorf("%d: got unexpected result from SingleUse.ReadUsingIndex: %v, want %v", i, got, test.want)
350				break
351			}
352		}
353		rts, err = su.Timestamp()
354		if err != nil {
355			t.Errorf("%d: SingleUse.ReadUsingIndex doesn't return a timestamp, error: %v", i, err)
356		}
357		if err := test.checkTs(rts); err != nil {
358			t.Errorf("%d: SingleUse.ReadUsingIndex doesn't return expected timestamp: %v", i, err)
359		}
360	}
361
362	// Reading with limit.
363	su := client.Single()
364	const limit = 1
365	gotRows, err := readAll(su.ReadWithOptions(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}),
366		[]string{"SingerId", "FirstName", "LastName"}, &ReadOptions{Limit: limit}))
367	if err != nil {
368		t.Errorf("SingleUse.ReadWithOptions returns error %v, want nil", err)
369	}
370	if got, want := len(gotRows), limit; got != want {
371		t.Errorf("got %d, want %d", got, want)
372	}
373
374}
375
376// Test ReadOnlyTransaction. The testsuite is mostly like SingleUse, except it
377// also tests for a single timestamp across multiple reads.
378func TestIntegration_ReadOnlyTransaction(t *testing.T) {
379	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
380	defer cancel()
381	// Set up testing environment.
382	client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements)
383	defer cleanup()
384
385	writes := []struct {
386		row []interface{}
387		ts  time.Time
388	}{
389		{row: []interface{}{1, "Marc", "Foo"}},
390		{row: []interface{}{2, "Tars", "Bar"}},
391		{row: []interface{}{3, "Alpha", "Beta"}},
392		{row: []interface{}{4, "Last", "End"}},
393	}
394	// Try to write four rows through the Apply API.
395	for i, w := range writes {
396		var err error
397		m := InsertOrUpdate("Singers",
398			[]string{"SingerId", "FirstName", "LastName"},
399			w.row)
400		if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
401			t.Fatal(err)
402		}
403	}
404
405	// For testing timestamp bound staleness.
406	<-time.After(time.Second)
407
408	// Test reading rows with different timestamp bounds.
409	for i, test := range []struct {
410		want    [][]interface{}
411		tb      TimestampBound
412		checkTs func(time.Time) error
413	}{
414		// Note: min_read_timestamp and max_staleness are not supported by ReadOnlyTransaction. See
415		// API document for more details.
416		{
417			// strong
418			[][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}},
419			StrongRead(),
420			func(ts time.Time) error {
421				if ts.Before(writes[3].ts) {
422					return fmt.Errorf("read got timestamp %v, want it to be no later than %v", ts, writes[3].ts)
423				}
424				return nil
425			},
426		},
427		{
428			// read_timestamp
429			[][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}},
430			ReadTimestamp(writes[2].ts),
431			func(ts time.Time) error {
432				if ts != writes[2].ts {
433					return fmt.Errorf("read got timestamp %v, expect %v", ts, writes[2].ts)
434				}
435				return nil
436			},
437		},
438		{
439			// exact_staleness
440			nil,
441			// Specify a staleness which should be already before this test because
442			// context timeout is set to be 10s.
443			ExactStaleness(11 * time.Second),
444			func(ts time.Time) error {
445				if ts.After(writes[0].ts) {
446					return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[0].ts)
447				}
448				return nil
449			},
450		},
451	} {
452		// ReadOnlyTransaction.Query
453		ro := client.ReadOnlyTransaction().WithTimestampBound(test.tb)
454		got, err := readAll(ro.Query(
455			ctx,
456			Statement{
457				"SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId IN (@id1, @id3, @id4)",
458				map[string]interface{}{"id1": int64(1), "id3": int64(3), "id4": int64(4)},
459			}))
460		if err != nil {
461			t.Errorf("%d: ReadOnlyTransaction.Query returns error %v, want nil", i, err)
462		}
463		if !testEqual(got, test.want) {
464			t.Errorf("%d: got unexpected result from ReadOnlyTransaction.Query: %v, want %v", i, got, test.want)
465		}
466		rts, err := ro.Timestamp()
467		if err != nil {
468			t.Errorf("%d: ReadOnlyTransaction.Query doesn't return a timestamp, error: %v", i, err)
469		}
470		if err := test.checkTs(rts); err != nil {
471			t.Errorf("%d: ReadOnlyTransaction.Query doesn't return expected timestamp: %v", i, err)
472		}
473		roTs := rts
474		// ReadOnlyTransaction.Read
475		got, err = readAll(ro.Read(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}), []string{"SingerId", "FirstName", "LastName"}))
476		if err != nil {
477			t.Errorf("%d: ReadOnlyTransaction.Read returns error %v, want nil", i, err)
478		}
479		if !testEqual(got, test.want) {
480			t.Errorf("%d: got unexpected result from ReadOnlyTransaction.Read: %v, want %v", i, got, test.want)
481		}
482		rts, err = ro.Timestamp()
483		if err != nil {
484			t.Errorf("%d: ReadOnlyTransaction.Read doesn't return a timestamp, error: %v", i, err)
485		}
486		if err := test.checkTs(rts); err != nil {
487			t.Errorf("%d: ReadOnlyTransaction.Read doesn't return expected timestamp: %v", i, err)
488		}
489		if roTs != rts {
490			t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts)
491		}
492		// ReadOnlyTransaction.ReadRow
493		got = nil
494		for _, k := range []Key{{1}, {3}, {4}} {
495			r, err := ro.ReadRow(ctx, "Singers", k, []string{"SingerId", "FirstName", "LastName"})
496			if err != nil {
497				continue
498			}
499			v, err := rowToValues(r)
500			if err != nil {
501				continue
502			}
503			got = append(got, v)
504			rts, err = ro.Timestamp()
505			if err != nil {
506				t.Errorf("%d: ReadOnlyTransaction.ReadRow(%v) doesn't return a timestamp, error: %v", i, k, err)
507			}
508			if err := test.checkTs(rts); err != nil {
509				t.Errorf("%d: ReadOnlyTransaction.ReadRow(%v) doesn't return expected timestamp: %v", i, k, err)
510			}
511			if roTs != rts {
512				t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts)
513			}
514		}
515		if !testEqual(got, test.want) {
516			t.Errorf("%d: got unexpected results from ReadOnlyTransaction.ReadRow: %v, want %v", i, got, test.want)
517		}
518		// SingleUse.ReadUsingIndex
519		got, err = readAll(ro.ReadUsingIndex(ctx, "Singers", "SingerByName", KeySets(Key{"Marc", "Foo"}, Key{"Alpha", "Beta"}, Key{"Last", "End"}), []string{"SingerId", "FirstName", "LastName"}))
520		if err != nil {
521			t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex returns error %v, want nil", i, err)
522		}
523		// The results from ReadUsingIndex is sorted by the index rather than primary key.
524		if len(got) != len(test.want) {
525			t.Errorf("%d: got unexpected result from ReadOnlyTransaction.ReadUsingIndex: %v, want %v", i, got, test.want)
526		}
527		for j, g := range got {
528			if j > 0 {
529				prev := got[j-1][1].(string) + got[j-1][2].(string)
530				curr := got[j][1].(string) + got[j][2].(string)
531				if strings.Compare(prev, curr) > 0 {
532					t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex fails to order rows by index keys, %v should be after %v", i, got[j-1], got[j])
533				}
534			}
535			found := false
536			for _, w := range test.want {
537				if testEqual(g, w) {
538					found = true
539				}
540			}
541			if !found {
542				t.Errorf("%d: got unexpected result from ReadOnlyTransaction.ReadUsingIndex: %v, want %v", i, got, test.want)
543				break
544			}
545		}
546		rts, err = ro.Timestamp()
547		if err != nil {
548			t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex doesn't return a timestamp, error: %v", i, err)
549		}
550		if err := test.checkTs(rts); err != nil {
551			t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex doesn't return expected timestamp: %v", i, err)
552		}
553		if roTs != rts {
554			t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts)
555		}
556		ro.Close()
557	}
558}
559
560// Test ReadOnlyTransaction with different timestamp bound when there's an update at the same time.
561func TestIntegration_UpdateDuringRead(t *testing.T) {
562	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
563	defer cancel()
564	client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements)
565	defer cleanup()
566
567	for i, tb := range []TimestampBound{
568		StrongRead(),
569		ReadTimestamp(time.Now().Add(-time.Minute * 30)), // version GC is 1 hour
570		ExactStaleness(time.Minute * 30),
571	} {
572		ro := client.ReadOnlyTransaction().WithTimestampBound(tb)
573		_, err := ro.ReadRow(ctx, "Singers", Key{i}, []string{"SingerId"})
574		if ErrCode(err) != codes.NotFound {
575			t.Errorf("%d: ReadOnlyTransaction.ReadRow before write returns error: %v, want NotFound", i, err)
576		}
577
578		m := InsertOrUpdate("Singers", []string{"SingerId"}, []interface{}{i})
579		if _, err := client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
580			t.Fatal(err)
581		}
582
583		_, err = ro.ReadRow(ctx, "Singers", Key{i}, []string{"SingerId"})
584		if ErrCode(err) != codes.NotFound {
585			t.Errorf("%d: ReadOnlyTransaction.ReadRow after write returns error: %v, want NotFound", i, err)
586		}
587	}
588}
589
590// Test ReadWriteTransaction.
591func TestIntegration_ReadWriteTransaction(t *testing.T) {
592	// Give a longer deadline because of transaction backoffs.
593	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
594	defer cancel()
595	client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements)
596	defer cleanup()
597
598	// Set up two accounts
599	accounts := []*Mutation{
600		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
601		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
602	}
603	if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil {
604		t.Fatal(err)
605	}
606	wg := sync.WaitGroup{}
607
608	readBalance := func(iter *RowIterator) (int64, error) {
609		defer iter.Stop()
610		var bal int64
611		for {
612			row, err := iter.Next()
613			if err == iterator.Done {
614				return bal, nil
615			}
616			if err != nil {
617				return 0, err
618			}
619			if err := row.Column(0, &bal); err != nil {
620				return 0, err
621			}
622		}
623	}
624
625	for i := 0; i < 20; i++ {
626		wg.Add(1)
627		go func(iter int) {
628			defer wg.Done()
629			_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
630				// Query Foo's balance and Bar's balance.
631				bf, e := readBalance(tx.Query(ctx,
632					Statement{"SELECT Balance FROM Accounts WHERE AccountId = @id", map[string]interface{}{"id": int64(1)}}))
633				if e != nil {
634					return e
635				}
636				bb, e := readBalance(tx.Read(ctx, "Accounts", KeySets(Key{int64(2)}), []string{"Balance"}))
637				if e != nil {
638					return e
639				}
640				if bf <= 0 {
641					return nil
642				}
643				bf--
644				bb++
645				return tx.BufferWrite([]*Mutation{
646					Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), bf}),
647					Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), bb}),
648				})
649			})
650			if err != nil {
651				t.Errorf("%d: failed to execute transaction: %v", iter, err)
652			}
653		}(i)
654	}
655	// Because of context timeout, all goroutines will eventually return.
656	wg.Wait()
657	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
658		var bf, bb int64
659		r, e := tx.ReadRow(ctx, "Accounts", Key{int64(1)}, []string{"Balance"})
660		if e != nil {
661			return e
662		}
663		if ce := r.Column(0, &bf); ce != nil {
664			return ce
665		}
666		bb, e = readBalance(tx.ReadUsingIndex(ctx, "Accounts", "AccountByNickname", KeySets(Key{"Bar"}), []string{"Balance"}))
667		if e != nil {
668			return e
669		}
670		if bf != 30 || bb != 21 {
671			t.Errorf("Foo's balance is now %v and Bar's balance is now %v, want %v and %v", bf, bb, 30, 21)
672		}
673		return nil
674	})
675	if err != nil {
676		t.Errorf("failed to check balances: %v", err)
677	}
678}
679
680func TestIntegration_Reads(t *testing.T) {
681	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
682	defer cancel()
683	// Set up testing environment.
684	client, _, cleanup := prepareIntegrationTest(ctx, t, readDBStatements)
685	defer cleanup()
686
687	// Includes k0..k14. Strings sort lexically, eg "k1" < "k10" < "k2".
688	var ms []*Mutation
689	for i := 0; i < 15; i++ {
690		ms = append(ms, InsertOrUpdate(testTable,
691			testTableColumns,
692			[]interface{}{fmt.Sprintf("k%d", i), fmt.Sprintf("v%d", i)}))
693	}
694	// Don't use ApplyAtLeastOnce, so we can test the other code path.
695	if _, err := client.Apply(ctx, ms); err != nil {
696		t.Fatal(err)
697	}
698
699	// Empty read.
700	rows, err := readAllTestTable(client.Single().Read(ctx, testTable,
701		KeyRange{Start: Key{"k99"}, End: Key{"z"}}, testTableColumns))
702	if err != nil {
703		t.Fatal(err)
704	}
705	if got, want := len(rows), 0; got != want {
706		t.Errorf("got %d, want %d", got, want)
707	}
708
709	// Index empty read.
710	rows, err = readAllTestTable(client.Single().ReadUsingIndex(ctx, testTable, testTableIndex,
711		KeyRange{Start: Key{"v99"}, End: Key{"z"}}, testTableColumns))
712	if err != nil {
713		t.Fatal(err)
714	}
715	if got, want := len(rows), 0; got != want {
716		t.Errorf("got %d, want %d", got, want)
717	}
718
719	// Point read.
720	row, err := client.Single().ReadRow(ctx, testTable, Key{"k1"}, testTableColumns)
721	if err != nil {
722		t.Fatal(err)
723	}
724	var got testTableRow
725	if err := row.ToStruct(&got); err != nil {
726		t.Fatal(err)
727	}
728	if want := (testTableRow{"k1", "v1"}); got != want {
729		t.Errorf("got %v, want %v", got, want)
730	}
731
732	// Point read not found.
733	_, err = client.Single().ReadRow(ctx, testTable, Key{"k999"}, testTableColumns)
734	if ErrCode(err) != codes.NotFound {
735		t.Fatalf("got %v, want NotFound", err)
736	}
737
738	// No index point read not found, because Go does not have ReadRowUsingIndex.
739
740	rangeReads(ctx, t, client)
741	indexRangeReads(ctx, t, client)
742}
743
744func TestIntegration_EarlyTimestamp(t *testing.T) {
745	// Test that we can get the timestamp from a read-only transaction as
746	// soon as we have read at least one row.
747	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
748	defer cancel()
749	// Set up testing environment.
750	client, _, cleanup := prepareIntegrationTest(ctx, t, readDBStatements)
751	defer cleanup()
752
753	var ms []*Mutation
754	for i := 0; i < 3; i++ {
755		ms = append(ms, InsertOrUpdate(testTable,
756			testTableColumns,
757			[]interface{}{fmt.Sprintf("k%d", i), fmt.Sprintf("v%d", i)}))
758	}
759	if _, err := client.Apply(ctx, ms, ApplyAtLeastOnce()); err != nil {
760		t.Fatal(err)
761	}
762
763	txn := client.Single()
764	iter := txn.Read(ctx, testTable, AllKeys(), testTableColumns)
765	defer iter.Stop()
766	// In  single-use transaction, we should get an error before reading anything.
767	if _, err := txn.Timestamp(); err == nil {
768		t.Error("wanted error, got nil")
769	}
770	// After reading one row, the timestamp should be available.
771	_, err := iter.Next()
772	if err != nil {
773		t.Fatal(err)
774	}
775	if _, err := txn.Timestamp(); err != nil {
776		t.Errorf("got %v, want nil", err)
777	}
778
779	txn = client.ReadOnlyTransaction()
780	defer txn.Close()
781	iter = txn.Read(ctx, testTable, AllKeys(), testTableColumns)
782	defer iter.Stop()
783	// In an ordinary read-only transaction, the timestamp should be
784	// available immediately.
785	if _, err := txn.Timestamp(); err != nil {
786		t.Errorf("got %v, want nil", err)
787	}
788}
789
790func TestIntegration_NestedTransaction(t *testing.T) {
791	// You cannot use a transaction from inside a read-write transaction.
792	ctx := context.Background()
793	client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements)
794	defer cleanup()
795
796	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
797		_, err := client.ReadWriteTransaction(ctx,
798			func(context.Context, *ReadWriteTransaction) error { return nil })
799		if ErrCode(err) != codes.FailedPrecondition {
800			t.Fatalf("got %v, want FailedPrecondition", err)
801		}
802		_, err = client.Single().ReadRow(ctx, "Singers", Key{1}, []string{"SingerId"})
803		if ErrCode(err) != codes.FailedPrecondition {
804			t.Fatalf("got %v, want FailedPrecondition", err)
805		}
806		rot := client.ReadOnlyTransaction()
807		defer rot.Close()
808		_, err = rot.ReadRow(ctx, "Singers", Key{1}, []string{"SingerId"})
809		if ErrCode(err) != codes.FailedPrecondition {
810			t.Fatalf("got %v, want FailedPrecondition", err)
811		}
812		return nil
813	})
814	if err != nil {
815		t.Fatal(err)
816	}
817}
818
819// Test client recovery on database recreation.
820func TestIntegration_DbRemovalRecovery(t *testing.T) {
821	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
822	defer cancel()
823	client, dbPath, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements)
824	defer cleanup()
825
826	// Drop the testing database.
827	if err := admin.DropDatabase(ctx, &adminpb.DropDatabaseRequest{Database: dbPath}); err != nil {
828		t.Fatalf("failed to drop testing database %v: %v", dbPath, err)
829	}
830
831	// Now, send the query.
832	iter := client.Single().Query(ctx, Statement{SQL: "SELECT SingerId FROM Singers"})
833	defer iter.Stop()
834	if _, err := iter.Next(); err == nil {
835		t.Errorf("client sends query to removed database successfully, want it to fail")
836	}
837
838	// Recreate database and table.
839	dbName := dbPath[strings.LastIndex(dbPath, "/")+1:]
840	op, err := admin.CreateDatabase(ctx, &adminpb.CreateDatabaseRequest{
841		Parent:          fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID),
842		CreateStatement: "CREATE DATABASE " + dbName,
843		ExtraStatements: []string{
844			`CREATE TABLE Singers (
845				SingerId        INT64 NOT NULL,
846				FirstName       STRING(1024),
847				LastName        STRING(1024),
848				SingerInfo      BYTES(MAX)
849			) PRIMARY KEY (SingerId)`,
850		},
851	})
852	if err != nil {
853		t.Fatalf("cannot recreate testing DB %v: %v", dbPath, err)
854	}
855	if _, err := op.Wait(ctx); err != nil {
856		t.Fatalf("cannot recreate testing DB %v: %v", dbPath, err)
857	}
858
859	// Now, send the query again.
860	iter = client.Single().Query(ctx, Statement{SQL: "SELECT SingerId FROM Singers"})
861	defer iter.Stop()
862	_, err = iter.Next()
863	if err != nil && err != iterator.Done {
864		t.Errorf("failed to send query to database %v: %v", dbPath, err)
865	}
866}
867
868// Test encoding/decoding non-struct Cloud Spanner types.
869func TestIntegration_BasicTypes(t *testing.T) {
870	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
871	defer cancel()
872	client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements)
873	defer cleanup()
874
875	t1, _ := time.Parse(time.RFC3339Nano, "2016-11-15T15:04:05.999999999Z")
876	// Boundaries
877	t2, _ := time.Parse(time.RFC3339Nano, "0001-01-01T00:00:00.000000000Z")
878	t3, _ := time.Parse(time.RFC3339Nano, "9999-12-31T23:59:59.999999999Z")
879	d1, _ := civil.ParseDate("2016-11-15")
880	// Boundaries
881	d2, _ := civil.ParseDate("0001-01-01")
882	d3, _ := civil.ParseDate("9999-12-31")
883
884	tests := []struct {
885		col  string
886		val  interface{}
887		want interface{}
888	}{
889		{col: "String", val: ""},
890		{col: "String", val: "", want: NullString{"", true}},
891		{col: "String", val: "foo"},
892		{col: "String", val: "foo", want: NullString{"foo", true}},
893		{col: "String", val: NullString{"bar", true}, want: "bar"},
894		{col: "String", val: NullString{"bar", false}, want: NullString{"", false}},
895		{col: "String", val: nil, want: NullString{}},
896		{col: "StringArray", val: []string(nil), want: []NullString(nil)},
897		{col: "StringArray", val: []string{}, want: []NullString{}},
898		{col: "StringArray", val: []string{"foo", "bar"}, want: []NullString{{"foo", true}, {"bar", true}}},
899		{col: "StringArray", val: []NullString(nil)},
900		{col: "StringArray", val: []NullString{}},
901		{col: "StringArray", val: []NullString{{"foo", true}, {}}},
902		{col: "Bytes", val: []byte{}},
903		{col: "Bytes", val: []byte{1, 2, 3}},
904		{col: "Bytes", val: []byte(nil)},
905		{col: "BytesArray", val: [][]byte(nil)},
906		{col: "BytesArray", val: [][]byte{}},
907		{col: "BytesArray", val: [][]byte{{1}, {2, 3}}},
908		{col: "Int64a", val: 0, want: int64(0)},
909		{col: "Int64a", val: -1, want: int64(-1)},
910		{col: "Int64a", val: 2, want: int64(2)},
911		{col: "Int64a", val: int64(3)},
912		{col: "Int64a", val: 4, want: NullInt64{4, true}},
913		{col: "Int64a", val: NullInt64{5, true}, want: int64(5)},
914		{col: "Int64a", val: NullInt64{6, true}, want: int64(6)},
915		{col: "Int64a", val: NullInt64{7, false}, want: NullInt64{0, false}},
916		{col: "Int64a", val: nil, want: NullInt64{}},
917		{col: "Int64Array", val: []int(nil), want: []NullInt64(nil)},
918		{col: "Int64Array", val: []int{}, want: []NullInt64{}},
919		{col: "Int64Array", val: []int{1, 2}, want: []NullInt64{{1, true}, {2, true}}},
920		{col: "Int64Array", val: []int64(nil), want: []NullInt64(nil)},
921		{col: "Int64Array", val: []int64{}, want: []NullInt64{}},
922		{col: "Int64Array", val: []int64{1, 2}, want: []NullInt64{{1, true}, {2, true}}},
923		{col: "Int64Array", val: []NullInt64(nil)},
924		{col: "Int64Array", val: []NullInt64{}},
925		{col: "Int64Array", val: []NullInt64{{1, true}, {}}},
926		{col: "Bool", val: false},
927		{col: "Bool", val: true},
928		{col: "Bool", val: false, want: NullBool{false, true}},
929		{col: "Bool", val: true, want: NullBool{true, true}},
930		{col: "Bool", val: NullBool{true, true}},
931		{col: "Bool", val: NullBool{false, false}},
932		{col: "Bool", val: nil, want: NullBool{}},
933		{col: "BoolArray", val: []bool(nil), want: []NullBool(nil)},
934		{col: "BoolArray", val: []bool{}, want: []NullBool{}},
935		{col: "BoolArray", val: []bool{true, false}, want: []NullBool{{true, true}, {false, true}}},
936		{col: "BoolArray", val: []NullBool(nil)},
937		{col: "BoolArray", val: []NullBool{}},
938		{col: "BoolArray", val: []NullBool{{false, true}, {true, true}, {}}},
939		{col: "Float64", val: 0.0},
940		{col: "Float64", val: 3.14},
941		{col: "Float64", val: math.NaN()},
942		{col: "Float64", val: math.Inf(1)},
943		{col: "Float64", val: math.Inf(-1)},
944		{col: "Float64", val: 2.78, want: NullFloat64{2.78, true}},
945		{col: "Float64", val: NullFloat64{2.71, true}, want: 2.71},
946		{col: "Float64", val: NullFloat64{1.41, true}, want: NullFloat64{1.41, true}},
947		{col: "Float64", val: NullFloat64{0, false}},
948		{col: "Float64", val: nil, want: NullFloat64{}},
949		{col: "Float64Array", val: []float64(nil), want: []NullFloat64(nil)},
950		{col: "Float64Array", val: []float64{}, want: []NullFloat64{}},
951		{col: "Float64Array", val: []float64{2.72, 3.14, math.Inf(1)}, want: []NullFloat64{{2.72, true}, {3.14, true}, {math.Inf(1), true}}},
952		{col: "Float64Array", val: []NullFloat64(nil)},
953		{col: "Float64Array", val: []NullFloat64{}},
954		{col: "Float64Array", val: []NullFloat64{{2.72, true}, {math.Inf(1), true}, {}}},
955		{col: "Date", val: d1},
956		{col: "Date", val: d1, want: NullDate{d1, true}},
957		{col: "Date", val: NullDate{d1, true}},
958		{col: "Date", val: NullDate{d1, true}, want: d1},
959		{col: "Date", val: NullDate{civil.Date{}, false}},
960		{col: "DateArray", val: []civil.Date(nil), want: []NullDate(nil)},
961		{col: "DateArray", val: []civil.Date{}, want: []NullDate{}},
962		{col: "DateArray", val: []civil.Date{d1, d2, d3}, want: []NullDate{{d1, true}, {d2, true}, {d3, true}}},
963		{col: "Timestamp", val: t1},
964		{col: "Timestamp", val: t1, want: NullTime{t1, true}},
965		{col: "Timestamp", val: NullTime{t1, true}},
966		{col: "Timestamp", val: NullTime{t1, true}, want: t1},
967		{col: "Timestamp", val: NullTime{}},
968		{col: "Timestamp", val: nil, want: NullTime{}},
969		{col: "TimestampArray", val: []time.Time(nil), want: []NullTime(nil)},
970		{col: "TimestampArray", val: []time.Time{}, want: []NullTime{}},
971		{col: "TimestampArray", val: []time.Time{t1, t2, t3}, want: []NullTime{{t1, true}, {t2, true}, {t3, true}}},
972	}
973
974	// Write rows into table first.
975	var muts []*Mutation
976	for i, test := range tests {
977		muts = append(muts, InsertOrUpdate("Types", []string{"RowID", test.col}, []interface{}{i, test.val}))
978	}
979	if _, err := client.Apply(ctx, muts, ApplyAtLeastOnce()); err != nil {
980		t.Fatal(err)
981	}
982
983	for i, test := range tests {
984		row, err := client.Single().ReadRow(ctx, "Types", []interface{}{i}, []string{test.col})
985		if err != nil {
986			t.Fatalf("Unable to fetch row %v: %v", i, err)
987		}
988		// Create new instance of type of test.want.
989		want := test.want
990		if want == nil {
991			want = test.val
992		}
993		gotp := reflect.New(reflect.TypeOf(want))
994		if err := row.Column(0, gotp.Interface()); err != nil {
995			t.Errorf("%d: col:%v val:%#v, %v", i, test.col, test.val, err)
996			continue
997		}
998		got := reflect.Indirect(gotp).Interface()
999
1000		// One of the test cases is checking NaN handling.  Given
1001		// NaN!=NaN, we can't use reflect to test for it.
1002		if isNaN(got) && isNaN(want) {
1003			continue
1004		}
1005
1006		// Check non-NaN cases.
1007		if !testEqual(got, want) {
1008			t.Errorf("%d: col:%v val:%#v, got %#v, want %#v", i, test.col, test.val, got, want)
1009			continue
1010		}
1011	}
1012}
1013
1014// Test decoding Cloud Spanner STRUCT type.
1015func TestIntegration_StructTypes(t *testing.T) {
1016	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1017	defer cancel()
1018	client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements)
1019	defer cleanup()
1020
1021	tests := []struct {
1022		q    Statement
1023		want func(r *Row) error
1024	}{
1025		{
1026			q: Statement{SQL: `SELECT ARRAY(SELECT STRUCT(1, 2))`},
1027			want: func(r *Row) error {
1028				// Test STRUCT ARRAY decoding to []NullRow.
1029				var rows []NullRow
1030				if err := r.Column(0, &rows); err != nil {
1031					return err
1032				}
1033				if len(rows) != 1 {
1034					return fmt.Errorf("len(rows) = %d; want 1", len(rows))
1035				}
1036				if !rows[0].Valid {
1037					return fmt.Errorf("rows[0] is NULL")
1038				}
1039				var i, j int64
1040				if err := rows[0].Row.Columns(&i, &j); err != nil {
1041					return err
1042				}
1043				if i != 1 || j != 2 {
1044					return fmt.Errorf("got (%d,%d), want (1,2)", i, j)
1045				}
1046				return nil
1047			},
1048		},
1049		{
1050			q: Statement{SQL: `SELECT ARRAY(SELECT STRUCT(1 as foo, 2 as bar)) as col1`},
1051			want: func(r *Row) error {
1052				// Test Row.ToStruct.
1053				s := struct {
1054					Col1 []*struct {
1055						Foo int64 `spanner:"foo"`
1056						Bar int64 `spanner:"bar"`
1057					} `spanner:"col1"`
1058				}{}
1059				if err := r.ToStruct(&s); err != nil {
1060					return err
1061				}
1062				want := struct {
1063					Col1 []*struct {
1064						Foo int64 `spanner:"foo"`
1065						Bar int64 `spanner:"bar"`
1066					} `spanner:"col1"`
1067				}{
1068					Col1: []*struct {
1069						Foo int64 `spanner:"foo"`
1070						Bar int64 `spanner:"bar"`
1071					}{
1072						{
1073							Foo: 1,
1074							Bar: 2,
1075						},
1076					},
1077				}
1078				if !testEqual(want, s) {
1079					return fmt.Errorf("unexpected decoding result: %v, want %v", s, want)
1080				}
1081				return nil
1082			},
1083		},
1084	}
1085	for i, test := range tests {
1086		iter := client.Single().Query(ctx, test.q)
1087		defer iter.Stop()
1088		row, err := iter.Next()
1089		if err != nil {
1090			t.Errorf("%d: %v", i, err)
1091			continue
1092		}
1093		if err := test.want(row); err != nil {
1094			t.Errorf("%d: %v", i, err)
1095			continue
1096		}
1097	}
1098}
1099
1100func TestIntegration_StructParametersUnsupported(t *testing.T) {
1101	ctx := context.Background()
1102	client, _, cleanup := prepareIntegrationTest(ctx, t, nil)
1103	defer cleanup()
1104
1105	for _, test := range []struct {
1106		param       interface{}
1107		wantCode    codes.Code
1108		wantMsgPart string
1109	}{
1110		{
1111			struct {
1112				Field int
1113			}{10},
1114			codes.Unimplemented,
1115			"Unsupported query shape: " +
1116				"A struct value cannot be returned as a column value. " +
1117				"Rewrite the query to flatten the struct fields in the result.",
1118		},
1119		{
1120			[]struct {
1121				Field int
1122			}{{10}, {20}},
1123			codes.Unimplemented,
1124			"Unsupported query shape: " +
1125				"This query can return a null-valued array of struct, " +
1126				"which is not supported by Spanner.",
1127		},
1128	} {
1129		iter := client.Single().Query(ctx, Statement{
1130			SQL:    "SELECT @p",
1131			Params: map[string]interface{}{"p": test.param},
1132		})
1133		_, err := iter.Next()
1134		iter.Stop()
1135		if msg, ok := matchError(err, test.wantCode, test.wantMsgPart); !ok {
1136			t.Fatal(msg)
1137		}
1138	}
1139}
1140
1141// Test queries of the form "SELECT expr".
1142func TestIntegration_QueryExpressions(t *testing.T) {
1143	ctx := context.Background()
1144	client, _, cleanup := prepareIntegrationTest(ctx, t, nil)
1145	defer cleanup()
1146
1147	newRow := func(vals []interface{}) *Row {
1148		row, err := NewRow(make([]string, len(vals)), vals)
1149		if err != nil {
1150			t.Fatal(err)
1151		}
1152		return row
1153	}
1154
1155	tests := []struct {
1156		expr string
1157		want interface{}
1158	}{
1159		{"1", int64(1)},
1160		{"[1, 2, 3]", []NullInt64{{1, true}, {2, true}, {3, true}}},
1161		{"[1, NULL, 3]", []NullInt64{{1, true}, {0, false}, {3, true}}},
1162		{"IEEE_DIVIDE(1, 0)", math.Inf(1)},
1163		{"IEEE_DIVIDE(-1, 0)", math.Inf(-1)},
1164		{"IEEE_DIVIDE(0, 0)", math.NaN()},
1165		// TODO(jba): add IEEE_DIVIDE(0, 0) to the following array when we have a better equality predicate.
1166		{"[IEEE_DIVIDE(1, 0), IEEE_DIVIDE(-1, 0)]", []NullFloat64{{math.Inf(1), true}, {math.Inf(-1), true}}},
1167		{"ARRAY(SELECT AS STRUCT * FROM (SELECT 'a', 1) WHERE 0 = 1)", []NullRow{}},
1168		{"ARRAY(SELECT STRUCT(1, 2))", []NullRow{{Row: *newRow([]interface{}{1, 2}), Valid: true}}},
1169	}
1170	for _, test := range tests {
1171		iter := client.Single().Query(ctx, Statement{SQL: "SELECT " + test.expr})
1172		defer iter.Stop()
1173		row, err := iter.Next()
1174		if err != nil {
1175			t.Errorf("%q: %v", test.expr, err)
1176			continue
1177		}
1178		// Create new instance of type of test.want.
1179		gotp := reflect.New(reflect.TypeOf(test.want))
1180		if err := row.Column(0, gotp.Interface()); err != nil {
1181			t.Errorf("%q: Column returned error %v", test.expr, err)
1182			continue
1183		}
1184		got := reflect.Indirect(gotp).Interface()
1185		// TODO(jba): remove isNaN special case when we have a better equality predicate.
1186		if isNaN(got) && isNaN(test.want) {
1187			continue
1188		}
1189		if !testEqual(got, test.want) {
1190			t.Errorf("%q\n got  %#v\nwant %#v", test.expr, got, test.want)
1191		}
1192	}
1193}
1194
1195func TestIntegration_QueryStats(t *testing.T) {
1196	ctx := context.Background()
1197	client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements)
1198	defer cleanup()
1199
1200	accounts := []*Mutation{
1201		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
1202		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
1203	}
1204	if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil {
1205		t.Fatal(err)
1206	}
1207	const sql = "SELECT Balance FROM Accounts"
1208
1209	qp, err := client.Single().AnalyzeQuery(ctx, Statement{sql, nil})
1210	if err != nil {
1211		t.Fatal(err)
1212	}
1213	if len(qp.PlanNodes) == 0 {
1214		t.Error("got zero plan nodes, expected at least one")
1215	}
1216
1217	iter := client.Single().QueryWithStats(ctx, Statement{sql, nil})
1218	defer iter.Stop()
1219	for {
1220		_, err := iter.Next()
1221		if err == iterator.Done {
1222			break
1223		}
1224		if err != nil {
1225			t.Fatal(err)
1226		}
1227	}
1228	if iter.QueryPlan == nil {
1229		t.Error("got nil QueryPlan, expected one")
1230	}
1231	if iter.QueryStats == nil {
1232		t.Error("got nil QueryStats, expected some")
1233	}
1234}
1235
1236func TestIntegration_InvalidDatabase(t *testing.T) {
1237	if testProjectID == "" {
1238		t.Skip("Integration tests skipped: GCLOUD_TESTS_GOLANG_PROJECT_ID is missing")
1239	}
1240	ctx := context.Background()
1241	dbPath := fmt.Sprintf("projects/%v/instances/%v/databases/invalid", testProjectID, testInstanceID)
1242	c, err := createClient(ctx, dbPath)
1243	// Client creation should succeed even if the database is invalid.
1244	if err != nil {
1245		t.Fatal(err)
1246	}
1247	_, err = c.Single().ReadRow(ctx, "TestTable", Key{1}, []string{"col1"})
1248	if msg, ok := matchError(err, codes.NotFound, ""); !ok {
1249		t.Fatal(msg)
1250	}
1251}
1252
1253func TestIntegration_ReadErrors(t *testing.T) {
1254	ctx := context.Background()
1255	client, _, cleanup := prepareIntegrationTest(ctx, t, readDBStatements)
1256	defer cleanup()
1257
1258	// Read over invalid table fails
1259	_, err := client.Single().ReadRow(ctx, "badTable", Key{1}, []string{"StringValue"})
1260	if msg, ok := matchError(err, codes.NotFound, "badTable"); !ok {
1261		t.Error(msg)
1262	}
1263	// Read over invalid column fails
1264	_, err = client.Single().ReadRow(ctx, "TestTable", Key{1}, []string{"badcol"})
1265	if msg, ok := matchError(err, codes.NotFound, "badcol"); !ok {
1266		t.Error(msg)
1267	}
1268
1269	// Invalid query fails
1270	iter := client.Single().Query(ctx, Statement{SQL: "SELECT Apples AND Oranges"})
1271	defer iter.Stop()
1272	_, err = iter.Next()
1273	if msg, ok := matchError(err, codes.InvalidArgument, "unrecognized name"); !ok {
1274		t.Error(msg)
1275	}
1276
1277	// Read should fail on cancellation.
1278	cctx, cancel := context.WithCancel(ctx)
1279	cancel()
1280	_, err = client.Single().ReadRow(cctx, "TestTable", Key{1}, []string{"StringValue"})
1281	if msg, ok := matchError(err, codes.Canceled, ""); !ok {
1282		t.Error(msg)
1283	}
1284	// Read should fail if deadline exceeded.
1285	dctx, cancel := context.WithTimeout(ctx, time.Nanosecond)
1286	defer cancel()
1287	<-dctx.Done()
1288	_, err = client.Single().ReadRow(dctx, "TestTable", Key{1}, []string{"StringValue"})
1289	if msg, ok := matchError(err, codes.DeadlineExceeded, ""); !ok {
1290		t.Error(msg)
1291	}
1292}
1293
1294// Test TransactionRunner. Test that transactions are aborted and retried as expected.
1295func TestIntegration_TransactionRunner(t *testing.T) {
1296	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1297	defer cancel()
1298	client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements)
1299	defer cleanup()
1300
1301	// Test 1: User error should abort the transaction.
1302	_, _ = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
1303		tx.BufferWrite([]*Mutation{
1304			Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)})})
1305		return errors.New("user error")
1306	})
1307	// Empty read.
1308	rows, err := readAllTestTable(client.Single().Read(ctx, "Accounts", Key{1}, []string{"AccountId", "Nickname", "Balance"}))
1309	if err != nil {
1310		t.Fatal(err)
1311	}
1312	if got, want := len(rows), 0; got != want {
1313		t.Errorf("Empty read, got %d, want %d.", got, want)
1314	}
1315
1316	// Test 2: Expect abort and retry.
1317	// We run two ReadWriteTransactions concurrently and make txn1 abort txn2 by committing writes to the column txn2 have read,
1318	// and expect the following read to abort and txn2 retries.
1319
1320	// Set up two accounts
1321	accounts := []*Mutation{
1322		Insert("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), int64(0)}),
1323		Insert("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), int64(1)}),
1324	}
1325	if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil {
1326		t.Fatal(err)
1327	}
1328
1329	var (
1330		cTxn1Start  = make(chan struct{})
1331		cTxn1Commit = make(chan struct{})
1332		cTxn2Start  = make(chan struct{})
1333		wg          sync.WaitGroup
1334	)
1335
1336	// read balance, check error if we don't expect abort.
1337	readBalance := func(tx interface {
1338		ReadRow(ctx context.Context, table string, key Key, columns []string) (*Row, error)
1339	}, key int64, expectAbort bool) (int64, error) {
1340		var b int64
1341		r, e := tx.ReadRow(ctx, "Accounts", Key{int64(key)}, []string{"Balance"})
1342		if e != nil {
1343			if expectAbort && !isAbortErr(e) {
1344				t.Errorf("ReadRow got %v, want Abort error.", e)
1345			}
1346			return b, e
1347		}
1348		if ce := r.Column(0, &b); ce != nil {
1349			return b, ce
1350		}
1351		return b, nil
1352	}
1353
1354	wg.Add(2)
1355	// Txn 1
1356	go func() {
1357		defer wg.Done()
1358		var once sync.Once
1359		_, e := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
1360			b, e := readBalance(tx, 1, false)
1361			if e != nil {
1362				return e
1363			}
1364			// txn 1 can abort, in that case we skip closing the channel on retry.
1365			once.Do(func() { close(cTxn1Start) })
1366			e = tx.BufferWrite([]*Mutation{
1367				Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), int64(b + 1)})})
1368			if e != nil {
1369				return e
1370			}
1371			// Wait for second transaction.
1372			<-cTxn2Start
1373			return nil
1374		})
1375		close(cTxn1Commit)
1376		if e != nil {
1377			t.Errorf("Transaction 1 commit, got %v, want nil.", e)
1378		}
1379	}()
1380	// Txn 2
1381	go func() {
1382		// Wait until txn 1 starts.
1383		<-cTxn1Start
1384		defer wg.Done()
1385		var (
1386			once sync.Once
1387			b1   int64
1388			b2   int64
1389			e    error
1390		)
1391		_, e = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
1392			if b1, e = readBalance(tx, 1, false); e != nil {
1393				return e
1394			}
1395			// Skip closing channel on retry.
1396			once.Do(func() { close(cTxn2Start) })
1397			// Wait until txn 1 successfully commits.
1398			<-cTxn1Commit
1399			// Txn1 has committed and written a balance to the account.
1400			// Now this transaction (txn2) reads and re-writes the balance.
1401			// The first time through, it will abort because it overlaps with txn1.
1402			// Then it will retry after txn1 commits, and succeed.
1403			if b2, e = readBalance(tx, 2, true); e != nil {
1404				return e
1405			}
1406			return tx.BufferWrite([]*Mutation{
1407				Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), int64(b1 + b2)})})
1408		})
1409		if e != nil {
1410			t.Errorf("Transaction 2 commit, got %v, want nil.", e)
1411		}
1412	}()
1413	wg.Wait()
1414	// Check that both transactions' effects are visible.
1415	for i := int64(1); i <= int64(2); i++ {
1416		if b, e := readBalance(client.Single(), i, false); e != nil {
1417			t.Fatalf("ReadBalance for key %d error %v.", i, e)
1418		} else if b != i {
1419			t.Errorf("Balance for key %d, got %d, want %d.", i, b, i)
1420		}
1421	}
1422}
1423
1424// Test PartitionQuery of BatchReadOnlyTransaction, create partitions then
1425// serialize and deserialize both transaction and partition to be used in
1426// execution on another client, and compare results.
1427func TestIntegration_BatchQuery(t *testing.T) {
1428	// Set up testing environment.
1429	var (
1430		client2 *Client
1431		err     error
1432	)
1433	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1434	defer cancel()
1435	client, dbPath, cleanup := prepareIntegrationTest(ctx, t, simpleDBStatements)
1436	defer cleanup()
1437
1438	if err = populate(ctx, client); err != nil {
1439		t.Fatal(err)
1440	}
1441	if client2, err = createClient(ctx, dbPath); err != nil {
1442		t.Fatal(err)
1443	}
1444	defer client2.Close()
1445
1446	// PartitionQuery
1447	var (
1448		txn        *BatchReadOnlyTransaction
1449		partitions []*Partition
1450		stmt       = Statement{SQL: "SELECT * FROM test;"}
1451	)
1452
1453	if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil {
1454		t.Fatal(err)
1455	}
1456	defer txn.Cleanup(ctx)
1457	if partitions, err = txn.PartitionQuery(ctx, stmt, PartitionOptions{0, 3}); err != nil {
1458		t.Fatal(err)
1459	}
1460
1461	// Reconstruct BatchReadOnlyTransactionID and execute partitions
1462	var (
1463		tid2      BatchReadOnlyTransactionID
1464		data      []byte
1465		gotResult bool // if we get matching result from two separate txns
1466	)
1467	if data, err = txn.ID.MarshalBinary(); err != nil {
1468		t.Fatalf("encoding failed %v", err)
1469	}
1470	if err = tid2.UnmarshalBinary(data); err != nil {
1471		t.Fatalf("decoding failed %v", err)
1472	}
1473	txn2 := client2.BatchReadOnlyTransactionFromID(tid2)
1474
1475	// Execute Partitions and compare results
1476	for i, p := range partitions {
1477		iter := txn.Execute(ctx, p)
1478		defer iter.Stop()
1479		p2 := serdesPartition(t, i, p)
1480		iter2 := txn2.Execute(ctx, &p2)
1481		defer iter2.Stop()
1482
1483		row1, err1 := iter.Next()
1484		row2, err2 := iter2.Next()
1485		if err1 != err2 {
1486			t.Fatalf("execution failed for different reasons: %v, %v", err1, err2)
1487			continue
1488		}
1489		if !testEqual(row1, row2) {
1490			t.Fatalf("execution returned different values: %v, %v", row1, row2)
1491			continue
1492		}
1493		if row1 == nil {
1494			continue
1495		}
1496		var a, b string
1497		if err = row1.Columns(&a, &b); err != nil {
1498			t.Fatalf("failed to parse row %v", err)
1499			continue
1500		}
1501		if a == str1 && b == str2 {
1502			gotResult = true
1503		}
1504	}
1505	if !gotResult {
1506		t.Fatalf("execution didn't return expected values")
1507	}
1508}
1509
1510// Test PartitionRead of BatchReadOnlyTransaction, similar to TestBatchQuery
1511func TestIntegration_BatchRead(t *testing.T) {
1512	// Set up testing environment.
1513	var (
1514		client2 *Client
1515		err     error
1516	)
1517	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1518	defer cancel()
1519	client, dbPath, cleanup := prepareIntegrationTest(ctx, t, simpleDBStatements)
1520	defer cleanup()
1521
1522	if err = populate(ctx, client); err != nil {
1523		t.Fatal(err)
1524	}
1525	if client2, err = createClient(ctx, dbPath); err != nil {
1526		t.Fatal(err)
1527	}
1528	defer client2.Close()
1529
1530	// PartitionRead
1531	var (
1532		txn        *BatchReadOnlyTransaction
1533		partitions []*Partition
1534	)
1535
1536	if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil {
1537		t.Fatal(err)
1538	}
1539	defer txn.Cleanup(ctx)
1540	if partitions, err = txn.PartitionRead(ctx, "test", AllKeys(), simpleDBTableColumns, PartitionOptions{0, 3}); err != nil {
1541		t.Fatal(err)
1542	}
1543
1544	// Reconstruct BatchReadOnlyTransactionID and execute partitions
1545	var (
1546		tid2      BatchReadOnlyTransactionID
1547		data      []byte
1548		gotResult bool // if we get matching result from two separate txns
1549	)
1550	if data, err = txn.ID.MarshalBinary(); err != nil {
1551		t.Fatalf("encoding failed %v", err)
1552	}
1553	if err = tid2.UnmarshalBinary(data); err != nil {
1554		t.Fatalf("decoding failed %v", err)
1555	}
1556	txn2 := client2.BatchReadOnlyTransactionFromID(tid2)
1557
1558	// Execute Partitions and compare results
1559	for i, p := range partitions {
1560		iter := txn.Execute(ctx, p)
1561		defer iter.Stop()
1562		p2 := serdesPartition(t, i, p)
1563		iter2 := txn2.Execute(ctx, &p2)
1564		defer iter2.Stop()
1565
1566		row1, err1 := iter.Next()
1567		row2, err2 := iter2.Next()
1568		if err1 != err2 {
1569			t.Fatalf("execution failed for different reasons: %v, %v", err1, err2)
1570			continue
1571		}
1572		if !testEqual(row1, row2) {
1573			t.Fatalf("execution returned different values: %v, %v", row1, row2)
1574			continue
1575		}
1576		if row1 == nil {
1577			continue
1578		}
1579		var a, b string
1580		if err = row1.Columns(&a, &b); err != nil {
1581			t.Fatalf("failed to parse row %v", err)
1582			continue
1583		}
1584		if a == str1 && b == str2 {
1585			gotResult = true
1586		}
1587	}
1588	if !gotResult {
1589		t.Fatalf("execution didn't return expected values")
1590	}
1591}
1592
1593// Test normal txReadEnv method on BatchReadOnlyTransaction.
1594func TestIntegration_BROTNormal(t *testing.T) {
1595	// Set up testing environment and create txn.
1596	var (
1597		txn *BatchReadOnlyTransaction
1598		err error
1599		row *Row
1600		i   int64
1601	)
1602	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1603	defer cancel()
1604	client, _, cleanup := prepareIntegrationTest(ctx, t, simpleDBStatements)
1605	defer cleanup()
1606
1607	if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil {
1608		t.Fatal(err)
1609	}
1610	defer txn.Cleanup(ctx)
1611	if _, err := txn.PartitionRead(ctx, "test", AllKeys(), simpleDBTableColumns, PartitionOptions{0, 3}); err != nil {
1612		t.Fatal(err)
1613	}
1614	// Normal query should work with BatchReadOnlyTransaction
1615	stmt2 := Statement{SQL: "SELECT 1"}
1616	iter := txn.Query(ctx, stmt2)
1617	defer iter.Stop()
1618
1619	row, err = iter.Next()
1620	if err != nil {
1621		t.Errorf("query failed with %v", err)
1622	}
1623	if err = row.Columns(&i); err != nil {
1624		t.Errorf("failed to parse row %v", err)
1625	}
1626}
1627
1628func TestIntegration_CommitTimestamp(t *testing.T) {
1629	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1630	defer cancel()
1631	client, _, cleanup := prepareIntegrationTest(ctx, t, ctsDBStatements)
1632	defer cleanup()
1633
1634	type testTableRow struct {
1635		Key string
1636		Ts  NullTime
1637	}
1638
1639	var (
1640		cts1, cts2, ts1, ts2 time.Time
1641		err                  error
1642	)
1643
1644	// Apply mutation in sequence, expect to see commit timestamp in good order, check also the commit timestamp returned
1645	for _, it := range []struct {
1646		k string
1647		t *time.Time
1648	}{
1649		{"a", &cts1},
1650		{"b", &cts2},
1651	} {
1652		tt := testTableRow{Key: it.k, Ts: NullTime{CommitTimestamp, true}}
1653		m, err := InsertStruct("TestTable", tt)
1654		if err != nil {
1655			t.Fatal(err)
1656		}
1657		*it.t, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce())
1658		if err != nil {
1659			t.Fatal(err)
1660		}
1661	}
1662
1663	txn := client.ReadOnlyTransaction()
1664	for _, it := range []struct {
1665		k string
1666		t *time.Time
1667	}{
1668		{"a", &ts1},
1669		{"b", &ts2},
1670	} {
1671		if r, e := txn.ReadRow(ctx, "TestTable", Key{it.k}, []string{"Ts"}); e != nil {
1672			t.Fatal(err)
1673		} else {
1674			var got testTableRow
1675			if err := r.ToStruct(&got); err != nil {
1676				t.Fatal(err)
1677			}
1678			*it.t = got.Ts.Time
1679		}
1680	}
1681	if !cts1.Equal(ts1) {
1682		t.Errorf("Expect commit timestamp returned and read to match for txn1, got %v and %v.", cts1, ts1)
1683	}
1684	if !cts2.Equal(ts2) {
1685		t.Errorf("Expect commit timestamp returned and read to match for txn2, got %v and %v.", cts2, ts2)
1686	}
1687
1688	// Try writing a timestamp in the future to commit timestamp, expect error
1689	_, err = client.Apply(ctx, []*Mutation{InsertOrUpdate("TestTable", []string{"Key", "Ts"}, []interface{}{"a", time.Now().Add(time.Hour)})}, ApplyAtLeastOnce())
1690	if msg, ok := matchError(err, codes.FailedPrecondition, "Cannot write timestamps in the future"); !ok {
1691		t.Error(msg)
1692	}
1693}
1694
1695func TestIntegration_DML(t *testing.T) {
1696	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1697	defer cancel()
1698	client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements)
1699	defer cleanup()
1700
1701	// Function that reads a single row's first name from within a transaction.
1702	readFirstName := func(tx *ReadWriteTransaction, key int) (string, error) {
1703		row, err := tx.ReadRow(ctx, "Singers", Key{key}, []string{"FirstName"})
1704		if err != nil {
1705			return "", err
1706		}
1707		var fn string
1708		if err := row.Column(0, &fn); err != nil {
1709			return "", err
1710		}
1711		return fn, nil
1712	}
1713
1714	// Function that reads multiple rows' first names from outside a read/write transaction.
1715	readFirstNames := func(keys ...int) []string {
1716		var ks []KeySet
1717		for _, k := range keys {
1718			ks = append(ks, Key{k})
1719		}
1720		iter := client.Single().Read(ctx, "Singers", KeySets(ks...), []string{"FirstName"})
1721		var got []string
1722		var fn string
1723		err := iter.Do(func(row *Row) error {
1724			if err := row.Column(0, &fn); err != nil {
1725				return err
1726			}
1727			got = append(got, fn)
1728			return nil
1729		})
1730		if err != nil {
1731			t.Fatalf("readFirstNames(%v): %v", keys, err)
1732		}
1733		return got
1734	}
1735
1736	// Use ReadWriteTransaction.Query to execute a DML statement.
1737	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
1738		iter := tx.Query(ctx, Statement{
1739			SQL: `INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (1, "Umm", "Kulthum")`,
1740		})
1741		defer iter.Stop()
1742		if row, err := iter.Next(); err != iterator.Done {
1743			t.Fatalf("got results from iterator, want none: %#v, err = %v\n", row, err)
1744		}
1745		if iter.RowCount != 1 {
1746			t.Errorf("row count: got %d, want 1", iter.RowCount)
1747		}
1748		// The results of the DML statement should be visible to the transaction.
1749		got, err := readFirstName(tx, 1)
1750		if err != nil {
1751			return err
1752		}
1753		if want := "Umm"; got != want {
1754			t.Errorf("got %q, want %q", got, want)
1755		}
1756		return nil
1757	})
1758	if err != nil {
1759		t.Fatal(err)
1760	}
1761
1762	// Use ReadWriteTransaction.Update to execute a DML statement.
1763	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
1764		count, err := tx.Update(ctx, Statement{
1765			SQL: `Insert INTO Singers (SingerId, FirstName, LastName) VALUES (2, "Eduard", "Khil")`,
1766		})
1767		if err != nil {
1768			t.Fatal(err)
1769		}
1770		if count != 1 {
1771			t.Errorf("row count: got %d, want 1", count)
1772		}
1773		got, err := readFirstName(tx, 2)
1774		if err != nil {
1775			return err
1776		}
1777		if want := "Eduard"; got != want {
1778			t.Errorf("got %q, want %q", got, want)
1779		}
1780		return nil
1781
1782	})
1783	if err != nil {
1784		t.Fatal(err)
1785	}
1786
1787	// Roll back a DML statement and confirm that it didn't happen.
1788	var fail = errors.New("fail")
1789	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
1790		_, err := tx.Update(ctx, Statement{
1791			SQL: `INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (3, "Audra", "McDonald")`,
1792		})
1793		if err != nil {
1794			return err
1795		}
1796		return fail
1797	})
1798	if err != fail {
1799		t.Fatalf("rolling back: got error %v, want the error 'fail'", err)
1800	}
1801	_, err = client.Single().ReadRow(ctx, "Singers", Key{3}, []string{"FirstName"})
1802	if got, want := ErrCode(err), codes.NotFound; got != want {
1803		t.Errorf("got %s, want %s", got, want)
1804	}
1805
1806	// Run two DML statements in the same transaction.
1807	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
1808		_, err := tx.Update(ctx, Statement{SQL: `UPDATE Singers SET FirstName = "Oum" WHERE SingerId = 1`})
1809		if err != nil {
1810			return err
1811		}
1812		_, err = tx.Update(ctx, Statement{SQL: `UPDATE Singers SET FirstName = "Eddie" WHERE SingerId = 2`})
1813		if err != nil {
1814			return err
1815		}
1816		return nil
1817	})
1818	if err != nil {
1819		t.Fatal(err)
1820	}
1821	got := readFirstNames(1, 2)
1822	want := []string{"Oum", "Eddie"}
1823	if !testEqual(got, want) {
1824		t.Errorf("got %v, want %v", got, want)
1825	}
1826
1827	// Run a DML statement and an ordinary mutation in the same transaction.
1828	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
1829		_, err := tx.Update(ctx, Statement{
1830			SQL: `INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (3, "Audra", "McDonald")`,
1831		})
1832		if err != nil {
1833			return err
1834		}
1835		tx.BufferWrite([]*Mutation{
1836			Insert("Singers", []string{"SingerId", "FirstName", "LastName"},
1837				[]interface{}{4, "Andy", "Irvine"}),
1838		})
1839		return nil
1840	})
1841	if err != nil {
1842		t.Fatal(err)
1843	}
1844	got = readFirstNames(3, 4)
1845	want = []string{"Audra", "Andy"}
1846	if !testEqual(got, want) {
1847		t.Errorf("got %v, want %v", got, want)
1848	}
1849
1850	// Attempt to run a query using update.
1851	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
1852		_, err := tx.Update(ctx, Statement{SQL: `SELECT FirstName from Singers`})
1853		return err
1854	})
1855	if got, want := ErrCode(err), codes.InvalidArgument; got != want {
1856		t.Errorf("got %s, want %s", got, want)
1857	}
1858}
1859
1860func TestIntegration_StructParametersBind(t *testing.T) {
1861	t.Parallel()
1862	ctx := context.Background()
1863	client, _, cleanup := prepareIntegrationTest(ctx, t, nil)
1864	defer cleanup()
1865
1866	type tRow []interface{}
1867	type tRows []struct{ trow tRow }
1868
1869	type allFields struct {
1870		Stringf string
1871		Intf    int
1872		Boolf   bool
1873		Floatf  float64
1874		Bytef   []byte
1875		Timef   time.Time
1876		Datef   civil.Date
1877	}
1878	allColumns := []string{
1879		"Stringf",
1880		"Intf",
1881		"Boolf",
1882		"Floatf",
1883		"Bytef",
1884		"Timef",
1885		"Datef",
1886	}
1887	s1 := allFields{"abc", 300, false, 3.45, []byte("foo"), t1, d1}
1888	s2 := allFields{"def", -300, false, -3.45, []byte("bar"), t2, d2}
1889
1890	dynamicStructType := reflect.StructOf([]reflect.StructField{
1891		{Name: "A", Type: reflect.TypeOf(t1), Tag: `spanner:"ff1"`},
1892	})
1893	s3 := reflect.New(dynamicStructType)
1894	s3.Elem().Field(0).Set(reflect.ValueOf(t1))
1895
1896	for i, test := range []struct {
1897		param interface{}
1898		sql   string
1899		cols  []string
1900		trows tRows
1901	}{
1902		// Struct value.
1903		{
1904			s1,
1905			"SELECT" +
1906				" @p.Stringf," +
1907				" @p.Intf," +
1908				" @p.Boolf," +
1909				" @p.Floatf," +
1910				" @p.Bytef," +
1911				" @p.Timef," +
1912				" @p.Datef",
1913			allColumns,
1914			tRows{
1915				{tRow{"abc", 300, false, 3.45, []byte("foo"), t1, d1}},
1916			},
1917		},
1918		// Array of struct value.
1919		{
1920			[]allFields{s1, s2},
1921			"SELECT * FROM UNNEST(@p)",
1922			allColumns,
1923			tRows{
1924				{tRow{"abc", 300, false, 3.45, []byte("foo"), t1, d1}},
1925				{tRow{"def", -300, false, -3.45, []byte("bar"), t2, d2}},
1926			},
1927		},
1928		// Null struct.
1929		{
1930			(*allFields)(nil),
1931			"SELECT @p IS NULL",
1932			[]string{""},
1933			tRows{
1934				{tRow{true}},
1935			},
1936		},
1937		// Null Array of struct.
1938		{
1939			[]allFields(nil),
1940			"SELECT @p IS NULL",
1941			[]string{""},
1942			tRows{
1943				{tRow{true}},
1944			},
1945		},
1946		// Empty struct.
1947		{
1948			struct{}{},
1949			"SELECT @p IS NULL ",
1950			[]string{""},
1951			tRows{
1952				{tRow{false}},
1953			},
1954		},
1955		// Empty array of struct.
1956		{
1957			[]allFields{},
1958			"SELECT * FROM UNNEST(@p) ",
1959			allColumns,
1960			tRows{},
1961		},
1962		// Struct with duplicate fields.
1963		{
1964			struct {
1965				A int `spanner:"field"`
1966				B int `spanner:"field"`
1967			}{10, 20},
1968			"SELECT * FROM UNNEST([@p]) ",
1969			[]string{"field", "field"},
1970			tRows{
1971				{tRow{10, 20}},
1972			},
1973		},
1974		// Struct with unnamed fields.
1975		{
1976			struct {
1977				A string `spanner:""`
1978			}{"hello"},
1979			"SELECT * FROM UNNEST([@p]) ",
1980			[]string{""},
1981			tRows{
1982				{tRow{"hello"}},
1983			},
1984		},
1985		// Mixed struct.
1986		{
1987			struct {
1988				DynamicStructField interface{}  `spanner:"f1"`
1989				ArrayStructField   []*allFields `spanner:"f2"`
1990			}{
1991				DynamicStructField: s3.Interface(),
1992				ArrayStructField:   []*allFields{nil},
1993			},
1994			"SELECT @p.f1.ff1, ARRAY_LENGTH(@p.f2), @p.f2[OFFSET(0)] IS NULL ",
1995			[]string{"ff1", "", ""},
1996			tRows{
1997				{tRow{t1, 1, true}},
1998			},
1999		},
2000	} {
2001		iter := client.Single().Query(ctx, Statement{
2002			SQL:    test.sql,
2003			Params: map[string]interface{}{"p": test.param},
2004		})
2005		var gotRows []*Row
2006		err := iter.Do(func(r *Row) error {
2007			gotRows = append(gotRows, r)
2008			return nil
2009		})
2010		if err != nil {
2011			t.Errorf("Failed to execute test case %d, error: %v", i, err)
2012		}
2013
2014		var wantRows []*Row
2015		for j, row := range test.trows {
2016			r, err := NewRow(test.cols, row.trow)
2017			if err != nil {
2018				t.Errorf("Invalid row %d in test case %d", j, i)
2019			}
2020			wantRows = append(wantRows, r)
2021		}
2022		if !testEqual(gotRows, wantRows) {
2023			t.Errorf("%d: Want result %v, got result %v", i, wantRows, gotRows)
2024		}
2025	}
2026}
2027
2028func TestIntegration_PDML(t *testing.T) {
2029	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2030	defer cancel()
2031	client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements)
2032	defer cleanup()
2033
2034	columns := []string{"SingerId", "FirstName", "LastName"}
2035
2036	// Populate the Singers table.
2037	var muts []*Mutation
2038	for _, row := range [][]interface{}{
2039		{1, "Umm", "Kulthum"},
2040		{2, "Eduard", "Khil"},
2041		{3, "Audra", "McDonald"},
2042	} {
2043		muts = append(muts, Insert("Singers", columns, row))
2044	}
2045	if _, err := client.Apply(ctx, muts); err != nil {
2046		t.Fatal(err)
2047	}
2048	// Identifiers in PDML statements must be fully qualified.
2049	// TODO(jba): revisit the above.
2050	count, err := client.PartitionedUpdate(ctx, Statement{
2051		SQL: `UPDATE Singers SET Singers.FirstName = "changed" WHERE Singers.SingerId >= 1 AND Singers.SingerId <= 3`,
2052	})
2053	if err != nil {
2054		t.Fatal(err)
2055	}
2056	if want := int64(3); count != want {
2057		t.Errorf("got %d, want %d", count, want)
2058	}
2059	got, err := readAll(client.Single().Read(ctx, "Singers", AllKeys(), columns))
2060	if err != nil {
2061		t.Fatal(err)
2062	}
2063	want := [][]interface{}{
2064		{int64(1), "changed", "Kulthum"},
2065		{int64(2), "changed", "Khil"},
2066		{int64(3), "changed", "McDonald"},
2067	}
2068	if !testEqual(got, want) {
2069		t.Errorf("\ngot %v\nwant%v", got, want)
2070	}
2071}
2072
2073func TestBatchDML(t *testing.T) {
2074	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2075	defer cancel()
2076	client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements)
2077	defer cleanup()
2078
2079	columns := []string{"SingerId", "FirstName", "LastName"}
2080
2081	// Populate the Singers table.
2082	var muts []*Mutation
2083	for _, row := range [][]interface{}{
2084		{1, "Umm", "Kulthum"},
2085		{2, "Eduard", "Khil"},
2086		{3, "Audra", "McDonald"},
2087	} {
2088		muts = append(muts, Insert("Singers", columns, row))
2089	}
2090	if _, err := client.Apply(ctx, muts); err != nil {
2091		t.Fatal(err)
2092	}
2093
2094	var counts []int64
2095	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
2096		counts, err = tx.BatchUpdate(ctx, []Statement{
2097			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`},
2098			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 2" WHERE Singers.SingerId = 2`},
2099			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`},
2100		})
2101		return err
2102	})
2103
2104	if err != nil {
2105		t.Fatal(err)
2106	}
2107	if want := []int64{1, 1, 1}; !testEqual(counts, want) {
2108		t.Fatalf("got %d, want %d", counts, want)
2109	}
2110	got, err := readAll(client.Single().Read(ctx, "Singers", AllKeys(), columns))
2111	if err != nil {
2112		t.Fatal(err)
2113	}
2114	want := [][]interface{}{
2115		{int64(1), "changed 1", "Kulthum"},
2116		{int64(2), "changed 2", "Khil"},
2117		{int64(3), "changed 3", "McDonald"},
2118	}
2119	if !testEqual(got, want) {
2120		t.Errorf("\ngot %v\nwant%v", got, want)
2121	}
2122}
2123
2124func TestBatchDML_NoStatements(t *testing.T) {
2125	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2126	defer cancel()
2127	client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements)
2128	defer cleanup()
2129
2130	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
2131		_, err = tx.BatchUpdate(ctx, []Statement{})
2132		return err
2133	})
2134	if err == nil {
2135		t.Fatal("expected error, got nil")
2136	}
2137	if s, ok := status.FromError(err); ok {
2138		if s.Code() != codes.InvalidArgument {
2139			t.Fatalf("expected InvalidArgument, got %v", err)
2140		}
2141	} else {
2142		t.Fatalf("expected InvalidArgument, got %v", err)
2143	}
2144}
2145
2146func TestBatchDML_TwoStatements(t *testing.T) {
2147	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2148	defer cancel()
2149	client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements)
2150	defer cleanup()
2151
2152	columns := []string{"SingerId", "FirstName", "LastName"}
2153
2154	// Populate the Singers table.
2155	var muts []*Mutation
2156	for _, row := range [][]interface{}{
2157		{1, "Umm", "Kulthum"},
2158		{2, "Eduard", "Khil"},
2159		{3, "Audra", "McDonald"},
2160	} {
2161		muts = append(muts, Insert("Singers", columns, row))
2162	}
2163	if _, err := client.Apply(ctx, muts); err != nil {
2164		t.Fatal(err)
2165	}
2166
2167	var updateCount int64
2168	var batchCounts []int64
2169	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
2170		batchCounts, err = tx.BatchUpdate(ctx, []Statement{
2171			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`},
2172			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 2" WHERE Singers.SingerId = 2`},
2173			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`},
2174		})
2175		if err != nil {
2176			return err
2177		}
2178
2179		updateCount, err = tx.Update(ctx, Statement{SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`})
2180		return err
2181	})
2182	if err != nil {
2183		t.Fatal(err)
2184	}
2185	if want := []int64{1, 1, 1}; !testEqual(batchCounts, want) {
2186		t.Fatalf("got %d, want %d", batchCounts, want)
2187	}
2188	if updateCount != 1 {
2189		t.Fatalf("got %v, want 1", updateCount)
2190	}
2191}
2192
2193// TODO(deklerk) this currently does not work because the transaction appears to
2194// get rolled back after a single statement fails. b/120158761
2195func TestBatchDML_Error(t *testing.T) {
2196	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2197	defer cancel()
2198	client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements)
2199	defer cleanup()
2200
2201	columns := []string{"SingerId", "FirstName", "LastName"}
2202
2203	// Populate the Singers table.
2204	var muts []*Mutation
2205	for _, row := range [][]interface{}{
2206		{1, "Umm", "Kulthum"},
2207		{2, "Eduard", "Khil"},
2208		{3, "Audra", "McDonald"},
2209	} {
2210		muts = append(muts, Insert("Singers", columns, row))
2211	}
2212	if _, err := client.Apply(ctx, muts); err != nil {
2213		t.Fatal(err)
2214	}
2215
2216	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
2217		counts, err := tx.BatchUpdate(ctx, []Statement{
2218			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`},
2219			{SQL: `some illegal statement`},
2220			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`},
2221		})
2222		if err == nil {
2223			t.Fatal("expected err, got nil")
2224		}
2225		if want := []int64{1}; !testEqual(counts, want) {
2226			t.Fatalf("got %d, want %d", counts, want)
2227		}
2228
2229		got, err := readAll(tx.Read(ctx, "Singers", AllKeys(), columns))
2230		if err != nil {
2231			t.Fatal(err)
2232		}
2233		want := [][]interface{}{
2234			{int64(1), "changed 1", "Kulthum"},
2235			{int64(2), "Eduard", "Khil"},
2236			{int64(3), "Audra", "McDonald"},
2237		}
2238		if !testEqual(got, want) {
2239			t.Errorf("\ngot %v\nwant%v", got, want)
2240		}
2241
2242		return nil
2243	})
2244	if err != nil {
2245		t.Fatal(err)
2246	}
2247}
2248
2249// Prepare initializes Cloud Spanner testing DB and clients.
2250func prepareIntegrationTest(ctx context.Context, t *testing.T, statements []string) (*Client, string, func()) {
2251	if admin == nil {
2252		t.Skip("Integration tests skipped")
2253	}
2254	// Construct a unique test DB name.
2255	dbName := dbNameSpace.New()
2256
2257	dbPath := fmt.Sprintf("projects/%v/instances/%v/databases/%v", testProjectID, testInstanceID, dbName)
2258	// Create database and tables.
2259	op, err := admin.CreateDatabase(ctx, &adminpb.CreateDatabaseRequest{
2260		Parent:          fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID),
2261		CreateStatement: "CREATE DATABASE " + dbName,
2262		ExtraStatements: statements,
2263	})
2264	if err != nil {
2265		t.Fatalf("cannot create testing DB %v: %v", dbPath, err)
2266	}
2267	if _, err := op.Wait(ctx); err != nil {
2268		t.Fatalf("cannot create testing DB %v: %v", dbPath, err)
2269	}
2270	client, err := createClient(ctx, dbPath)
2271	if err != nil {
2272		t.Fatalf("cannot create data client on DB %v: %v", dbPath, err)
2273	}
2274	return client, dbPath, func() {
2275		client.Close()
2276		if err := admin.DropDatabase(ctx, &adminpb.DropDatabaseRequest{Database: dbPath}); err != nil {
2277			t.Logf("failed to drop database %s (error %v), might need a manual removal",
2278				dbPath, err)
2279		}
2280	}
2281}
2282
2283func cleanupDatabases() {
2284	if admin == nil {
2285		// Integration tests skipped.
2286		return
2287	}
2288
2289	ctx := context.Background()
2290	dbsParent := fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID)
2291	dbsIter := admin.ListDatabases(ctx, &adminpb.ListDatabasesRequest{Parent: dbsParent})
2292	expireAge := 24 * time.Hour
2293
2294	for {
2295		db, err := dbsIter.Next()
2296		if err == iterator.Done {
2297			break
2298		}
2299		if err != nil {
2300			panic(err)
2301		}
2302		// TODO(deklerk) When we have the ability to programmatically create
2303		// instances, we can create an instance with uid.New and delete all
2304		// tables in it. For now, we rely on matching prefixes.
2305		if dbNameSpace.Older(db.Name, expireAge) {
2306			log.Printf("Dropping database %s", db.Name)
2307
2308			if err := admin.DropDatabase(ctx, &adminpb.DropDatabaseRequest{Database: db.Name}); err != nil {
2309				log.Printf("failed to drop database %s (error %v), might need a manual removal",
2310					db.Name, err)
2311			}
2312		}
2313	}
2314}
2315
2316func rangeReads(ctx context.Context, t *testing.T, client *Client) {
2317	checkRange := func(ks KeySet, wantNums ...int) {
2318		if msg, ok := compareRows(client.Single().Read(ctx, testTable, ks, testTableColumns), wantNums); !ok {
2319			t.Errorf("key set %+v: %s", ks, msg)
2320		}
2321	}
2322
2323	checkRange(Key{"k1"}, 1)
2324	checkRange(KeyRange{Key{"k3"}, Key{"k5"}, ClosedOpen}, 3, 4)
2325	checkRange(KeyRange{Key{"k3"}, Key{"k5"}, ClosedClosed}, 3, 4, 5)
2326	checkRange(KeyRange{Key{"k3"}, Key{"k5"}, OpenClosed}, 4, 5)
2327	checkRange(KeyRange{Key{"k3"}, Key{"k5"}, OpenOpen}, 4)
2328
2329	// Partial key specification.
2330	checkRange(KeyRange{Key{"k7"}, Key{}, ClosedClosed}, 7, 8, 9)
2331	checkRange(KeyRange{Key{"k7"}, Key{}, OpenClosed}, 8, 9)
2332	checkRange(KeyRange{Key{}, Key{"k11"}, ClosedOpen}, 0, 1, 10)
2333	checkRange(KeyRange{Key{}, Key{"k11"}, ClosedClosed}, 0, 1, 10, 11)
2334
2335	// The following produce empty ranges.
2336	// TODO(jba): Consider a multi-part key to illustrate partial key behavior.
2337	// checkRange(KeyRange{Key{"k7"}, Key{}, ClosedOpen})
2338	// checkRange(KeyRange{Key{"k7"}, Key{}, OpenOpen})
2339	// checkRange(KeyRange{Key{}, Key{"k11"}, OpenOpen})
2340	// checkRange(KeyRange{Key{}, Key{"k11"}, OpenClosed})
2341
2342	// Prefix is component-wise, not string prefix.
2343	checkRange(Key{"k1"}.AsPrefix(), 1)
2344	checkRange(KeyRange{Key{"k1"}, Key{"k2"}, ClosedOpen}, 1, 10, 11, 12, 13, 14)
2345
2346	checkRange(AllKeys(), 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14)
2347}
2348
2349func indexRangeReads(ctx context.Context, t *testing.T, client *Client) {
2350	checkRange := func(ks KeySet, wantNums ...int) {
2351		if msg, ok := compareRows(client.Single().ReadUsingIndex(ctx, testTable, testTableIndex, ks, testTableColumns),
2352			wantNums); !ok {
2353			t.Errorf("key set %+v: %s", ks, msg)
2354		}
2355	}
2356
2357	checkRange(Key{"v1"}, 1)
2358	checkRange(KeyRange{Key{"v3"}, Key{"v5"}, ClosedOpen}, 3, 4)
2359	checkRange(KeyRange{Key{"v3"}, Key{"v5"}, ClosedClosed}, 3, 4, 5)
2360	checkRange(KeyRange{Key{"v3"}, Key{"v5"}, OpenClosed}, 4, 5)
2361	checkRange(KeyRange{Key{"v3"}, Key{"v5"}, OpenOpen}, 4)
2362
2363	// // Partial key specification.
2364	checkRange(KeyRange{Key{"v7"}, Key{}, ClosedClosed}, 7, 8, 9)
2365	checkRange(KeyRange{Key{"v7"}, Key{}, OpenClosed}, 8, 9)
2366	checkRange(KeyRange{Key{}, Key{"v11"}, ClosedOpen}, 0, 1, 10)
2367	checkRange(KeyRange{Key{}, Key{"v11"}, ClosedClosed}, 0, 1, 10, 11)
2368
2369	// // The following produce empty ranges.
2370	// checkRange(KeyRange{Key{"v7"}, Key{}, ClosedOpen})
2371	// checkRange(KeyRange{Key{"v7"}, Key{}, OpenOpen})
2372	// checkRange(KeyRange{Key{}, Key{"v11"}, OpenOpen})
2373	// checkRange(KeyRange{Key{}, Key{"v11"}, OpenClosed})
2374
2375	// // Prefix is component-wise, not string prefix.
2376	checkRange(Key{"v1"}.AsPrefix(), 1)
2377	checkRange(KeyRange{Key{"v1"}, Key{"v2"}, ClosedOpen}, 1, 10, 11, 12, 13, 14)
2378	checkRange(AllKeys(), 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14)
2379
2380	// Read from an index with DESC ordering.
2381	wantNums := []int{14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0}
2382	if msg, ok := compareRows(client.Single().ReadUsingIndex(ctx, testTable, "TestTableByValueDesc", AllKeys(), testTableColumns),
2383		wantNums); !ok {
2384		t.Errorf("desc: %s", msg)
2385	}
2386}
2387
// testTableRow holds one decoded row of the test table: its key column and
// its string value column.
type testTableRow struct {
	Key         string
	StringValue string
}
2389
2390func compareRows(iter *RowIterator, wantNums []int) (string, bool) {
2391	rows, err := readAllTestTable(iter)
2392	if err != nil {
2393		return err.Error(), false
2394	}
2395	want := map[string]string{}
2396	for _, n := range wantNums {
2397		want[fmt.Sprintf("k%d", n)] = fmt.Sprintf("v%d", n)
2398	}
2399	got := map[string]string{}
2400	for _, r := range rows {
2401		got[r.Key] = r.StringValue
2402	}
2403	if !testEqual(got, want) {
2404		return fmt.Sprintf("got %v, want %v", got, want), false
2405	}
2406	return "", true
2407}
2408
// isNaN reports whether x holds a float64 whose value is NaN. Any other
// dynamic type, including nil and float32, yields false.
func isNaN(x interface{}) bool {
	if f, isFloat := x.(float64); isFloat {
		return math.IsNaN(f)
	}
	return false
}
2416
2417// createClient creates Cloud Spanner data client.
2418func createClient(ctx context.Context, dbPath string) (client *Client, err error) {
2419	client, err = NewClientWithConfig(ctx, dbPath, ClientConfig{
2420		SessionPoolConfig: SessionPoolConfig{WriteSessions: 0.2},
2421	}, option.WithTokenSource(testutil.TokenSource(ctx, Scope)), option.WithEndpoint(endpoint))
2422	if err != nil {
2423		return nil, fmt.Errorf("cannot create data client on DB %v: %v", dbPath, err)
2424	}
2425	return client, nil
2426}
2427
2428// populate prepares the database with some data.
2429func populate(ctx context.Context, client *Client) error {
2430	// Populate data
2431	var err error
2432	m := InsertMap("test", map[string]interface{}{
2433		"a": str1,
2434		"b": str2,
2435	})
2436	_, err = client.Apply(ctx, []*Mutation{m})
2437	return err
2438}
2439
2440func matchError(got error, wantCode codes.Code, wantMsgPart string) (string, bool) {
2441	if ErrCode(got) != wantCode || !strings.Contains(strings.ToLower(ErrDesc(got)), strings.ToLower(wantMsgPart)) {
2442		return fmt.Sprintf("got error <%v>\n"+`want <code = %q, "...%s...">`, got, wantCode, wantMsgPart), false
2443	}
2444	return "", true
2445}
2446
2447func rowToValues(r *Row) ([]interface{}, error) {
2448	var x int64
2449	var y, z string
2450	if err := r.Column(0, &x); err != nil {
2451		return nil, err
2452	}
2453	if err := r.Column(1, &y); err != nil {
2454		return nil, err
2455	}
2456	if err := r.Column(2, &z); err != nil {
2457		return nil, err
2458	}
2459	return []interface{}{x, y, z}, nil
2460}
2461
2462func readAll(iter *RowIterator) ([][]interface{}, error) {
2463	defer iter.Stop()
2464	var vals [][]interface{}
2465	for {
2466		row, err := iter.Next()
2467		if err == iterator.Done {
2468			return vals, nil
2469		}
2470		if err != nil {
2471			return nil, err
2472		}
2473		v, err := rowToValues(row)
2474		if err != nil {
2475			return nil, err
2476		}
2477		vals = append(vals, v)
2478	}
2479}
2480
2481func readAllTestTable(iter *RowIterator) ([]testTableRow, error) {
2482	defer iter.Stop()
2483	var vals []testTableRow
2484	for {
2485		row, err := iter.Next()
2486		if err == iterator.Done {
2487			return vals, nil
2488		}
2489		if err != nil {
2490			return nil, err
2491		}
2492		var ttr testTableRow
2493		if err := row.ToStruct(&ttr); err != nil {
2494			return nil, err
2495		}
2496		vals = append(vals, ttr)
2497	}
2498}
2499