1/*
2Copyright 2017 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package spanner
18
19import (
20	"context"
21	"errors"
22	"flag"
23	"fmt"
24	"log"
25	"math"
26	"os"
27	"reflect"
28	"strings"
29	"sync"
30	"testing"
31	"time"
32
33	"cloud.google.com/go/civil"
34	"cloud.google.com/go/internal/testutil"
35	"cloud.google.com/go/internal/uid"
36	database "cloud.google.com/go/spanner/admin/database/apiv1"
37	instance "cloud.google.com/go/spanner/admin/instance/apiv1"
38	"google.golang.org/api/iterator"
39	"google.golang.org/api/option"
40	adminpb "google.golang.org/genproto/googleapis/spanner/admin/database/v1"
41	instancepb "google.golang.org/genproto/googleapis/spanner/admin/instance/v1"
42	"google.golang.org/grpc"
43	"google.golang.org/grpc/codes"
44	"google.golang.org/grpc/status"
45)
46
// Package-level fixtures shared by the integration tests in this file.
var (
	// testProjectID specifies the project used for testing. It can be changed
	// by setting environment variable GCLOUD_TESTS_GOLANG_PROJECT_ID.
	testProjectID = testutil.ProjID()

	// dbNameSpace and instanceNameSpace generate "gotest"-prefixed IDs; they
	// differ only in the separator character. testInstanceID is the ID of
	// the instance that initIntegrationTests creates for this test run.
	dbNameSpace       = uid.NewSpace("gotest", &uid.Options{Sep: '_', Short: true})
	instanceNameSpace = uid.NewSpace("gotest", &uid.Options{Sep: '-', Short: true})
	testInstanceID    = instanceNameSpace.New()

	// Table, index and column names matching the schema in readDBStatements.
	testTable        = "TestTable"
	testTableIndex   = "TestTableByValue"
	testTableColumns = []string{"Key", "StringValue"}

	// Admin clients; both are assigned by initIntegrationTests and closed by
	// the cleanup function it returns.
	databaseAdmin *database.DatabaseAdminClient
	instanceAdmin *instance.InstanceAdminClient

	// singerDBStatements is the DDL for the Singers, Accounts and Types
	// tables and their secondary indexes.
	singerDBStatements = []string{
		`CREATE TABLE Singers (
				SingerId	INT64 NOT NULL,
				FirstName	STRING(1024),
				LastName	STRING(1024),
				SingerInfo	BYTES(MAX)
			) PRIMARY KEY (SingerId)`,
		`CREATE INDEX SingerByName ON Singers(FirstName, LastName)`,
		`CREATE TABLE Accounts (
				AccountId	INT64 NOT NULL,
				Nickname	STRING(100),
				Balance		INT64 NOT NULL,
			) PRIMARY KEY (AccountId)`,
		`CREATE INDEX AccountByNickname ON Accounts(Nickname) STORING (Balance)`,
		`CREATE TABLE Types (
				RowID		INT64 NOT NULL,
				String		STRING(MAX),
				StringArray	ARRAY<STRING(MAX)>,
				Bytes		BYTES(MAX),
				BytesArray	ARRAY<BYTES(MAX)>,
				Int64a		INT64,
				Int64Array	ARRAY<INT64>,
				Bool		BOOL,
				BoolArray	ARRAY<BOOL>,
				Float64		FLOAT64,
				Float64Array	ARRAY<FLOAT64>,
				Date		DATE,
				DateArray	ARRAY<DATE>,
				Timestamp	TIMESTAMP,
				TimestampArray	ARRAY<TIMESTAMP>,
			) PRIMARY KEY (RowID)`,
	}

	// readDBStatements is the DDL for TestTable plus an ascending and a
	// descending index on StringValue.
	readDBStatements = []string{
		`CREATE TABLE TestTable (
                    Key          STRING(MAX) NOT NULL,
                    StringValue  STRING(MAX)
            ) PRIMARY KEY (Key)`,
		`CREATE INDEX TestTableByValue ON TestTable(StringValue)`,
		`CREATE INDEX TestTableByValueDesc ON TestTable(StringValue DESC)`,
	}

	// simpleDBStatements is the DDL for a two-column string table named
	// "test"; simpleDBTableColumns lists its columns.
	simpleDBStatements = []string{
		`CREATE TABLE test (
				a	STRING(1024),
				b	STRING(1024),
			) PRIMARY KEY (a)`,
	}
	simpleDBTableColumns = []string{"a", "b"}

	// ctsDBStatements is the DDL for a TestTable variant whose Ts column has
	// allow_commit_timestamp enabled.
	ctsDBStatements = []string{
		`CREATE TABLE TestTable (
		    Key  STRING(MAX) NOT NULL,
		    Ts   TIMESTAMP OPTIONS (allow_commit_timestamp = true),
	    ) PRIMARY KEY (Key)`,
	}
)
120
// Sample string values for tests.
// NOTE(review): no use of str1/str2 is visible in this part of the file —
// confirm where they are consumed before changing them.
const (
	str1 = "alice"
	str2 = "a@example.com"
)
125
126func TestMain(m *testing.M) {
127	cleanup := initIntegrationTests()
128	res := m.Run()
129	cleanup()
130	os.Exit(res)
131}
132
// grpcHeaderChecker is testutil's default headers enforcer. Its CallOptions
// are passed to the admin clients created in initIntegrationTests.
var grpcHeaderChecker = testutil.DefaultHeadersEnforcer()
134
135func initIntegrationTests() (cleanup func()) {
136	ctx := context.Background()
137	flag.Parse() // Needed for testing.Short().
138	noop := func() {}
139
140	if testing.Short() {
141		log.Println("Integration tests skipped in -short mode.")
142		return noop
143	}
144
145	if testProjectID == "" {
146		log.Println("Integration tests skipped: GCLOUD_TESTS_GOLANG_PROJECT_ID is missing")
147		return noop
148	}
149
150	ts := testutil.TokenSource(ctx, AdminScope, Scope)
151	if ts == nil {
152		log.Printf("Integration test skipped: cannot get service account credential from environment variable %v", "GCLOUD_TESTS_GOLANG_KEY")
153		return noop
154	}
155	var err error
156
157	opts := append(grpcHeaderChecker.CallOptions(), option.WithTokenSource(ts), option.WithEndpoint(endpoint))
158
159	// Run integration tests against the given emulator. Currently, the database and
160	// instance admin clients are auto-generated, which do not support to configure
161	// SPANNER_EMULATOR_HOST.
162	emulatorAddr := os.Getenv("SPANNER_EMULATOR_HOST")
163	if emulatorAddr != "" {
164		opts = append(
165			grpcHeaderChecker.CallOptions(),
166			option.WithEndpoint(emulatorAddr),
167			option.WithGRPCDialOption(grpc.WithInsecure()),
168			option.WithoutAuthentication(),
169		)
170	}
171
172	// Create InstanceAdmin and DatabaseAdmin clients.
173	instanceAdmin, err = instance.NewInstanceAdminClient(ctx, opts...)
174	if err != nil {
175		log.Fatalf("cannot create instance databaseAdmin client: %v", err)
176	}
177	databaseAdmin, err = database.NewDatabaseAdminClient(ctx, opts...)
178	if err != nil {
179		log.Fatalf("cannot create databaseAdmin client: %v", err)
180	}
181	// Get the list of supported instance configs for the project that is used
182	// for the integration tests. The supported instance configs can differ per
183	// project. The integration tests will use the first instance config that
184	// is returned by Cloud Spanner. This will normally be the regional config
185	// that is physically the closest to where the request is coming from.
186	configIterator := instanceAdmin.ListInstanceConfigs(ctx, &instancepb.ListInstanceConfigsRequest{
187		Parent: fmt.Sprintf("projects/%s", testProjectID),
188	})
189	config, err := configIterator.Next()
190	if err != nil {
191		log.Fatalf("Cannot get any instance configurations.\nPlease make sure the Cloud Spanner API is enabled for the test project.\nGet error: %v", err)
192	}
193	// Create a test instance to use for this test run.
194	op, err := instanceAdmin.CreateInstance(ctx, &instancepb.CreateInstanceRequest{
195		Parent:     fmt.Sprintf("projects/%s", testProjectID),
196		InstanceId: testInstanceID,
197		Instance: &instancepb.Instance{
198			Config:      config.Name,
199			DisplayName: testInstanceID,
200			NodeCount:   1,
201		},
202	})
203	if err != nil {
204		log.Fatalf("could not create instance with id %s: %v", fmt.Sprintf("projects/%s/instances/%s", testProjectID, testInstanceID), err)
205	}
206	// Wait for the instance creation to finish.
207	i, err := op.Wait(ctx)
208	if err != nil {
209		log.Fatalf("waiting for instance creation to finish failed: %v", err)
210	}
211	if i.State != instancepb.Instance_READY {
212		log.Printf("instance state is not READY, it might be that the test instance will cause problems during tests. Got state %v\n", i.State)
213	}
214
215	return func() {
216		// Delete this test instance.
217		instanceName := fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID)
218		if err = instanceAdmin.DeleteInstance(ctx, &instancepb.DeleteInstanceRequest{
219			Name: instanceName,
220		}); err != nil {
221			log.Printf("failed to drop instance %s (error %v), might need a manual removal",
222				instanceName, err)
223		}
224		// Delete other test instances that may be lingering around.
225		cleanupInstances()
226		databaseAdmin.Close()
227		instanceAdmin.Close()
228	}
229}
230
231func TestIntegration_InitSessionPool(t *testing.T) {
232	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
233	defer cancel()
234	// Set up testing environment.
235	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
236	defer cleanup()
237	sp := client.idleSessions
238	sp.mu.Lock()
239	want := sp.MinOpened
240	sp.mu.Unlock()
241	var numOpened int
242	for {
243		select {
244		case <-ctx.Done():
245			t.Fatalf("timed out, got %d session(s), want %d", numOpened, want)
246		default:
247			sp.mu.Lock()
248			numOpened = sp.idleList.Len() + sp.idleWriteList.Len()
249			sp.mu.Unlock()
250			if uint64(numOpened) == want {
251				return
252			}
253		}
254	}
255}
256
257// Test SingleUse transaction.
258func TestIntegration_SingleUse(t *testing.T) {
259	t.Parallel()
260
261	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
262	defer cancel()
263	// Set up testing environment.
264	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
265	defer cleanup()
266
267	writes := []struct {
268		row []interface{}
269		ts  time.Time
270	}{
271		{row: []interface{}{1, "Marc", "Foo"}},
272		{row: []interface{}{2, "Tars", "Bar"}},
273		{row: []interface{}{3, "Alpha", "Beta"}},
274		{row: []interface{}{4, "Last", "End"}},
275	}
276	// Try to write four rows through the Apply API.
277	for i, w := range writes {
278		var err error
279		m := InsertOrUpdate("Singers",
280			[]string{"SingerId", "FirstName", "LastName"},
281			w.row)
282		if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
283			t.Fatal(err)
284		}
285	}
286	// Calculate time difference between Cloud Spanner server and localhost to
287	// use to determine the exact staleness value to use.
288	timeDiff := maxDuration(time.Now().Sub(writes[0].ts), 0)
289
290	// Test reading rows with different timestamp bounds.
291	for i, test := range []struct {
292		name    string
293		want    [][]interface{}
294		tb      TimestampBound
295		checkTs func(time.Time) error
296	}{
297		{
298			name: "strong",
299			want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}},
300			tb:   StrongRead(),
301			checkTs: func(ts time.Time) error {
302				// writes[3] is the last write, all subsequent strong read
303				// should have a timestamp larger than that.
304				if ts.Before(writes[3].ts) {
305					return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[3].ts)
306				}
307				return nil
308			},
309		},
310		{
311			name: "min_read_timestamp",
312			want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}},
313			tb:   MinReadTimestamp(writes[3].ts),
314			checkTs: func(ts time.Time) error {
315				if ts.Before(writes[3].ts) {
316					return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[3].ts)
317				}
318				return nil
319			},
320		},
321		{
322			name: "max_staleness",
323			want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}},
324			tb:   MaxStaleness(time.Second),
325			checkTs: func(ts time.Time) error {
326				if ts.Before(writes[3].ts) {
327					return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[3].ts)
328				}
329				return nil
330			},
331		},
332		{
333			name: "read_timestamp",
334			want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}},
335			tb:   ReadTimestamp(writes[2].ts),
336			checkTs: func(ts time.Time) error {
337				if ts != writes[2].ts {
338					return fmt.Errorf("read got timestamp %v, want %v", ts, writes[2].ts)
339				}
340				return nil
341			},
342		},
343		{
344			name: "exact_staleness",
345			want: nil,
346			// Specify a staleness which should be already before this test.
347			tb: ExactStaleness(time.Now().Sub(writes[0].ts) + timeDiff + 30*time.Second),
348			checkTs: func(ts time.Time) error {
349				if !ts.Before(writes[0].ts) {
350					return fmt.Errorf("read got timestamp %v, want it to be earlier than %v", ts, writes[0].ts)
351				}
352				return nil
353			},
354		},
355	} {
356		t.Run(test.name, func(t *testing.T) {
357			// SingleUse.Query
358			su := client.Single().WithTimestampBound(test.tb)
359			got, err := readAll(su.Query(
360				ctx,
361				Statement{
362					"SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId IN (@id1, @id3, @id4)",
363					map[string]interface{}{"id1": int64(1), "id3": int64(3), "id4": int64(4)},
364				}))
365			if err != nil {
366				t.Fatalf("%d: SingleUse.Query returns error %v, want nil", i, err)
367			}
368			rts, err := su.Timestamp()
369			if err != nil {
370				t.Fatalf("%d: SingleUse.Query doesn't return a timestamp, error: %v", i, err)
371			}
372			if err := test.checkTs(rts); err != nil {
373				t.Fatalf("%d: SingleUse.Query doesn't return expected timestamp: %v", i, err)
374			}
375			if !testEqual(got, test.want) {
376				t.Fatalf("%d: got unexpected result from SingleUse.Query: %v, want %v", i, got, test.want)
377			}
378			// SingleUse.Read
379			su = client.Single().WithTimestampBound(test.tb)
380			got, err = readAll(su.Read(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}), []string{"SingerId", "FirstName", "LastName"}))
381			if err != nil {
382				t.Fatalf("%d: SingleUse.Read returns error %v, want nil", i, err)
383			}
384			rts, err = su.Timestamp()
385			if err != nil {
386				t.Fatalf("%d: SingleUse.Read doesn't return a timestamp, error: %v", i, err)
387			}
388			if err := test.checkTs(rts); err != nil {
389				t.Fatalf("%d: SingleUse.Read doesn't return expected timestamp: %v", i, err)
390			}
391			if !testEqual(got, test.want) {
392				t.Fatalf("%d: got unexpected result from SingleUse.Read: %v, want %v", i, got, test.want)
393			}
394			// SingleUse.ReadRow
395			got = nil
396			for _, k := range []Key{{1}, {3}, {4}} {
397				su = client.Single().WithTimestampBound(test.tb)
398				r, err := su.ReadRow(ctx, "Singers", k, []string{"SingerId", "FirstName", "LastName"})
399				if err != nil {
400					continue
401				}
402				v, err := rowToValues(r)
403				if err != nil {
404					continue
405				}
406				got = append(got, v)
407				rts, err = su.Timestamp()
408				if err != nil {
409					t.Fatalf("%d: SingleUse.ReadRow(%v) doesn't return a timestamp, error: %v", i, k, err)
410				}
411				if err := test.checkTs(rts); err != nil {
412					t.Fatalf("%d: SingleUse.ReadRow(%v) doesn't return expected timestamp: %v", i, k, err)
413				}
414			}
415			if !testEqual(got, test.want) {
416				t.Fatalf("%d: got unexpected results from SingleUse.ReadRow: %v, want %v", i, got, test.want)
417			}
418			// SingleUse.ReadUsingIndex
419			su = client.Single().WithTimestampBound(test.tb)
420			got, err = readAll(su.ReadUsingIndex(ctx, "Singers", "SingerByName", KeySets(Key{"Marc", "Foo"}, Key{"Alpha", "Beta"}, Key{"Last", "End"}), []string{"SingerId", "FirstName", "LastName"}))
421			if err != nil {
422				t.Fatalf("%d: SingleUse.ReadUsingIndex returns error %v, want nil", i, err)
423			}
424			// The results from ReadUsingIndex is sorted by the index rather than primary key.
425			if len(got) != len(test.want) {
426				t.Fatalf("%d: got unexpected result from SingleUse.ReadUsingIndex: %v, want %v", i, got, test.want)
427			}
428			for j, g := range got {
429				if j > 0 {
430					prev := got[j-1][1].(string) + got[j-1][2].(string)
431					curr := got[j][1].(string) + got[j][2].(string)
432					if strings.Compare(prev, curr) > 0 {
433						t.Fatalf("%d: SingleUse.ReadUsingIndex fails to order rows by index keys, %v should be after %v", i, got[j-1], got[j])
434					}
435				}
436				found := false
437				for _, w := range test.want {
438					if testEqual(g, w) {
439						found = true
440					}
441				}
442				if !found {
443					t.Fatalf("%d: got unexpected result from SingleUse.ReadUsingIndex: %v, want %v", i, got, test.want)
444					break
445				}
446			}
447			rts, err = su.Timestamp()
448			if err != nil {
449				t.Fatalf("%d: SingleUse.ReadUsingIndex doesn't return a timestamp, error: %v", i, err)
450			}
451			if err := test.checkTs(rts); err != nil {
452				t.Fatalf("%d: SingleUse.ReadUsingIndex doesn't return expected timestamp: %v", i, err)
453			}
454			// SingleUse.ReadRowUsingIndex
455			got = nil
456			for _, k := range []Key{{"Marc", "Foo"}, {"Alpha", "Beta"}, {"Last", "End"}} {
457				su = client.Single().WithTimestampBound(test.tb)
458				r, err := su.ReadRowUsingIndex(ctx, "Singers", "SingerByName", k, []string{"SingerId", "FirstName", "LastName"})
459				if err != nil {
460					continue
461				}
462				v, err := rowToValues(r)
463				if err != nil {
464					continue
465				}
466				got = append(got, v)
467				rts, err = su.Timestamp()
468				if err != nil {
469					t.Fatalf("%d: SingleUse.ReadRowUsingIndex(%v) doesn't return a timestamp, error: %v", i, k, err)
470				}
471				if err := test.checkTs(rts); err != nil {
472					t.Fatalf("%d: SingleUse.ReadRowUsingIndex(%v) doesn't return expected timestamp: %v", i, k, err)
473				}
474			}
475			if !testEqual(got, test.want) {
476				t.Fatalf("%d: got unexpected results from SingleUse.ReadRowUsingIndex: %v, want %v", i, got, test.want)
477			}
478		})
479	}
480}
481
482// Test resource-based routing enabled.
483func TestIntegration_SingleUse_WithResourceBasedRouting(t *testing.T) {
484	os.Setenv("GOOGLE_CLOUD_SPANNER_ENABLE_RESOURCE_BASED_ROUTING", "true")
485	defer os.Setenv("GOOGLE_CLOUD_SPANNER_ENABLE_RESOURCE_BASED_ROUTING", "")
486
487	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
488	defer cancel()
489	// Set up testing environment.
490	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
491	defer cleanup()
492
493	writes := []struct {
494		row []interface{}
495		ts  time.Time
496	}{
497		{row: []interface{}{1, "Marc", "Foo"}},
498		{row: []interface{}{2, "Tars", "Bar"}},
499		{row: []interface{}{3, "Alpha", "Beta"}},
500		{row: []interface{}{4, "Last", "End"}},
501	}
502	// Try to write four rows through the Apply API.
503	for i, w := range writes {
504		var err error
505		m := InsertOrUpdate("Singers",
506			[]string{"SingerId", "FirstName", "LastName"},
507			w.row)
508		if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
509			t.Fatal(err)
510		}
511	}
512
513	row, err := client.Single().ReadRow(ctx, "Singers", Key{3}, []string{"FirstName"})
514	if err != nil {
515		t.Errorf("SingleUse.ReadRow returns error %v, want nil", err)
516	}
517	var got string
518	if err := row.Column(0, &got); err != nil {
519		t.Errorf("row.Column returns error %v, want nil", err)
520	}
521	if want := "Alpha"; got != want {
522		t.Errorf("got %q, want %q", got, want)
523	}
524}
525
526func TestIntegration_SingleUse_ReadingWithLimit(t *testing.T) {
527	t.Parallel()
528
529	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
530	defer cancel()
531	// Set up testing environment.
532	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
533	defer cleanup()
534
535	writes := []struct {
536		row []interface{}
537		ts  time.Time
538	}{
539		{row: []interface{}{1, "Marc", "Foo"}},
540		{row: []interface{}{2, "Tars", "Bar"}},
541		{row: []interface{}{3, "Alpha", "Beta"}},
542		{row: []interface{}{4, "Last", "End"}},
543	}
544	// Try to write four rows through the Apply API.
545	for i, w := range writes {
546		var err error
547		m := InsertOrUpdate("Singers",
548			[]string{"SingerId", "FirstName", "LastName"},
549			w.row)
550		if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
551			t.Fatal(err)
552		}
553	}
554
555	su := client.Single()
556	const limit = 1
557	gotRows, err := readAll(su.ReadWithOptions(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}),
558		[]string{"SingerId", "FirstName", "LastName"}, &ReadOptions{Limit: limit}))
559	if err != nil {
560		t.Errorf("SingleUse.ReadWithOptions returns error %v, want nil", err)
561	}
562	if got, want := len(gotRows), limit; got != want {
563		t.Errorf("got %d, want %d", got, want)
564	}
565}
566
567// Test ReadOnlyTransaction. The testsuite is mostly like SingleUse, except it
568// also tests for a single timestamp across multiple reads.
569func TestIntegration_ReadOnlyTransaction(t *testing.T) {
570	t.Parallel()
571
572	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
573	defer cancel()
574	// Set up testing environment.
575	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
576	defer cleanup()
577
578	writes := []struct {
579		row []interface{}
580		ts  time.Time
581	}{
582		{row: []interface{}{1, "Marc", "Foo"}},
583		{row: []interface{}{2, "Tars", "Bar"}},
584		{row: []interface{}{3, "Alpha", "Beta"}},
585		{row: []interface{}{4, "Last", "End"}},
586	}
587	// Try to write four rows through the Apply API.
588	for i, w := range writes {
589		var err error
590		m := InsertOrUpdate("Singers",
591			[]string{"SingerId", "FirstName", "LastName"},
592			w.row)
593		if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
594			t.Fatal(err)
595		}
596	}
597
598	// For testing timestamp bound staleness.
599	<-time.After(time.Second)
600
601	// Test reading rows with different timestamp bounds.
602	for i, test := range []struct {
603		want    [][]interface{}
604		tb      TimestampBound
605		checkTs func(time.Time) error
606	}{
607		// Note: min_read_timestamp and max_staleness are not supported by
608		// ReadOnlyTransaction. See API document for more details.
609		{
610			// strong
611			[][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}},
612			StrongRead(),
613			func(ts time.Time) error {
614				if ts.Before(writes[3].ts) {
615					return fmt.Errorf("read got timestamp %v, want it to be no later than %v", ts, writes[3].ts)
616				}
617				return nil
618			},
619		},
620		{
621			// read_timestamp
622			[][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}},
623			ReadTimestamp(writes[2].ts),
624			func(ts time.Time) error {
625				if ts != writes[2].ts {
626					return fmt.Errorf("read got timestamp %v, expect %v", ts, writes[2].ts)
627				}
628				return nil
629			},
630		},
631		{
632			// exact_staleness
633			nil,
634			// Specify a staleness which should be already before this test
635			// because context timeout is set to be 10s.
636			ExactStaleness(11 * time.Second),
637			func(ts time.Time) error {
638				if ts.After(writes[0].ts) {
639					return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[0].ts)
640				}
641				return nil
642			},
643		},
644	} {
645		// ReadOnlyTransaction.Query
646		ro := client.ReadOnlyTransaction().WithTimestampBound(test.tb)
647		got, err := readAll(ro.Query(
648			ctx,
649			Statement{
650				"SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId IN (@id1, @id3, @id4)",
651				map[string]interface{}{"id1": int64(1), "id3": int64(3), "id4": int64(4)},
652			}))
653		if err != nil {
654			t.Errorf("%d: ReadOnlyTransaction.Query returns error %v, want nil", i, err)
655		}
656		if !testEqual(got, test.want) {
657			t.Errorf("%d: got unexpected result from ReadOnlyTransaction.Query: %v, want %v", i, got, test.want)
658		}
659		rts, err := ro.Timestamp()
660		if err != nil {
661			t.Errorf("%d: ReadOnlyTransaction.Query doesn't return a timestamp, error: %v", i, err)
662		}
663		if err := test.checkTs(rts); err != nil {
664			t.Errorf("%d: ReadOnlyTransaction.Query doesn't return expected timestamp: %v", i, err)
665		}
666		roTs := rts
667		// ReadOnlyTransaction.Read
668		got, err = readAll(ro.Read(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}), []string{"SingerId", "FirstName", "LastName"}))
669		if err != nil {
670			t.Errorf("%d: ReadOnlyTransaction.Read returns error %v, want nil", i, err)
671		}
672		if !testEqual(got, test.want) {
673			t.Errorf("%d: got unexpected result from ReadOnlyTransaction.Read: %v, want %v", i, got, test.want)
674		}
675		rts, err = ro.Timestamp()
676		if err != nil {
677			t.Errorf("%d: ReadOnlyTransaction.Read doesn't return a timestamp, error: %v", i, err)
678		}
679		if err := test.checkTs(rts); err != nil {
680			t.Errorf("%d: ReadOnlyTransaction.Read doesn't return expected timestamp: %v", i, err)
681		}
682		if roTs != rts {
683			t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts)
684		}
685		// ReadOnlyTransaction.ReadRow
686		got = nil
687		for _, k := range []Key{{1}, {3}, {4}} {
688			r, err := ro.ReadRow(ctx, "Singers", k, []string{"SingerId", "FirstName", "LastName"})
689			if err != nil {
690				continue
691			}
692			v, err := rowToValues(r)
693			if err != nil {
694				continue
695			}
696			got = append(got, v)
697			rts, err = ro.Timestamp()
698			if err != nil {
699				t.Errorf("%d: ReadOnlyTransaction.ReadRow(%v) doesn't return a timestamp, error: %v", i, k, err)
700			}
701			if err := test.checkTs(rts); err != nil {
702				t.Errorf("%d: ReadOnlyTransaction.ReadRow(%v) doesn't return expected timestamp: %v", i, k, err)
703			}
704			if roTs != rts {
705				t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts)
706			}
707		}
708		if !testEqual(got, test.want) {
709			t.Errorf("%d: got unexpected results from ReadOnlyTransaction.ReadRow: %v, want %v", i, got, test.want)
710		}
711		// SingleUse.ReadUsingIndex
712		got, err = readAll(ro.ReadUsingIndex(ctx, "Singers", "SingerByName", KeySets(Key{"Marc", "Foo"}, Key{"Alpha", "Beta"}, Key{"Last", "End"}), []string{"SingerId", "FirstName", "LastName"}))
713		if err != nil {
714			t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex returns error %v, want nil", i, err)
715		}
716		// The results from ReadUsingIndex is sorted by the index rather than
717		// primary key.
718		if len(got) != len(test.want) {
719			t.Errorf("%d: got unexpected result from ReadOnlyTransaction.ReadUsingIndex: %v, want %v", i, got, test.want)
720		}
721		for j, g := range got {
722			if j > 0 {
723				prev := got[j-1][1].(string) + got[j-1][2].(string)
724				curr := got[j][1].(string) + got[j][2].(string)
725				if strings.Compare(prev, curr) > 0 {
726					t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex fails to order rows by index keys, %v should be after %v", i, got[j-1], got[j])
727				}
728			}
729			found := false
730			for _, w := range test.want {
731				if testEqual(g, w) {
732					found = true
733				}
734			}
735			if !found {
736				t.Errorf("%d: got unexpected result from ReadOnlyTransaction.ReadUsingIndex: %v, want %v", i, got, test.want)
737				break
738			}
739		}
740		rts, err = ro.Timestamp()
741		if err != nil {
742			t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex doesn't return a timestamp, error: %v", i, err)
743		}
744		if err := test.checkTs(rts); err != nil {
745			t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex doesn't return expected timestamp: %v", i, err)
746		}
747		if roTs != rts {
748			t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts)
749		}
750		// ReadOnlyTransaction.ReadRowUsingIndex
751		got = nil
752		for _, k := range []Key{{"Marc", "Foo"}, {"Alpha", "Beta"}, {"Last", "End"}} {
753			r, err := ro.ReadRowUsingIndex(ctx, "Singers", "SingerByName", k, []string{"SingerId", "FirstName", "LastName"})
754			if err != nil {
755				continue
756			}
757			v, err := rowToValues(r)
758			if err != nil {
759				continue
760			}
761			got = append(got, v)
762			rts, err = ro.Timestamp()
763			if err != nil {
764				t.Errorf("%d: ReadOnlyTransaction.ReadRowUsingIndex(%v) doesn't return a timestamp, error: %v", i, k, err)
765			}
766			if err := test.checkTs(rts); err != nil {
767				t.Errorf("%d: ReadOnlyTransaction.ReadRowUsingIndex(%v) doesn't return expected timestamp: %v", i, k, err)
768			}
769			if roTs != rts {
770				t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts)
771			}
772		}
773		if !testEqual(got, test.want) {
774			t.Errorf("%d: got unexpected results from ReadOnlyTransaction.ReadRowUsingIndex: %v, want %v", i, got, test.want)
775		}
776		ro.Close()
777	}
778}
779
780// Test ReadOnlyTransaction with different timestamp bound when there's an
781// update at the same time.
782func TestIntegration_UpdateDuringRead(t *testing.T) {
783	t.Parallel()
784
785	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
786	defer cancel()
787	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
788	defer cleanup()
789
790	for i, tb := range []TimestampBound{
791		StrongRead(),
792		ReadTimestamp(time.Now().Add(-time.Minute * 30)), // version GC is 1 hour
793		ExactStaleness(time.Minute * 30),
794	} {
795		ro := client.ReadOnlyTransaction().WithTimestampBound(tb)
796		_, err := ro.ReadRow(ctx, "Singers", Key{i}, []string{"SingerId"})
797		if ErrCode(err) != codes.NotFound {
798			t.Errorf("%d: ReadOnlyTransaction.ReadRow before write returns error: %v, want NotFound", i, err)
799		}
800
801		m := InsertOrUpdate("Singers", []string{"SingerId"}, []interface{}{i})
802		if _, err := client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
803			t.Fatal(err)
804		}
805
806		_, err = ro.ReadRow(ctx, "Singers", Key{i}, []string{"SingerId"})
807		if ErrCode(err) != codes.NotFound {
808			t.Errorf("%d: ReadOnlyTransaction.ReadRow after write returns error: %v, want NotFound", i, err)
809		}
810	}
811}
812
813// Test ReadWriteTransaction.
814func TestIntegration_ReadWriteTransaction(t *testing.T) {
815	t.Parallel()
816
817	// Give a longer deadline because of transaction backoffs.
818	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
819	defer cancel()
820	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
821	defer cleanup()
822
823	// Set up two accounts
824	accounts := []*Mutation{
825		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
826		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
827	}
828	if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil {
829		t.Fatal(err)
830	}
831	wg := sync.WaitGroup{}
832
833	readBalance := func(iter *RowIterator) (int64, error) {
834		defer iter.Stop()
835		var bal int64
836		for {
837			row, err := iter.Next()
838			if err == iterator.Done {
839				return bal, nil
840			}
841			if err != nil {
842				return 0, err
843			}
844			if err := row.Column(0, &bal); err != nil {
845				return 0, err
846			}
847		}
848	}
849
850	for i := 0; i < 20; i++ {
851		wg.Add(1)
852		go func(iter int) {
853			defer wg.Done()
854			_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
855				// Query Foo's balance and Bar's balance.
856				bf, e := readBalance(tx.Query(ctx,
857					Statement{"SELECT Balance FROM Accounts WHERE AccountId = @id", map[string]interface{}{"id": int64(1)}}))
858				if e != nil {
859					return e
860				}
861				bb, e := readBalance(tx.Read(ctx, "Accounts", KeySets(Key{int64(2)}), []string{"Balance"}))
862				if e != nil {
863					return e
864				}
865				if bf <= 0 {
866					return nil
867				}
868				bf--
869				bb++
870				return tx.BufferWrite([]*Mutation{
871					Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), bf}),
872					Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), bb}),
873				})
874			})
875			if err != nil {
876				t.Errorf("%d: failed to execute transaction: %v", iter, err)
877			}
878		}(i)
879	}
880	// Because of context timeout, all goroutines will eventually return.
881	wg.Wait()
882	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
883		var bf, bb int64
884		r, e := tx.ReadRow(ctx, "Accounts", Key{int64(1)}, []string{"Balance"})
885		if e != nil {
886			return e
887		}
888		if ce := r.Column(0, &bf); ce != nil {
889			return ce
890		}
891		bb, e = readBalance(tx.ReadUsingIndex(ctx, "Accounts", "AccountByNickname", KeySets(Key{"Bar"}), []string{"Balance"}))
892		if e != nil {
893			return e
894		}
895		if bf != 30 || bb != 21 {
896			t.Errorf("Foo's balance is now %v and Bar's balance is now %v, want %v and %v", bf, bb, 30, 21)
897		}
898		return nil
899	})
900	if err != nil {
901		t.Errorf("failed to check balances: %v", err)
902	}
903}
904
905func TestIntegration_Reads(t *testing.T) {
906	t.Parallel()
907
908	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
909	defer cancel()
910	// Set up testing environment.
911	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, readDBStatements)
912	defer cleanup()
913
914	// Includes k0..k14. Strings sort lexically, eg "k1" < "k10" < "k2".
915	var ms []*Mutation
916	for i := 0; i < 15; i++ {
917		ms = append(ms, InsertOrUpdate(testTable,
918			testTableColumns,
919			[]interface{}{fmt.Sprintf("k%d", i), fmt.Sprintf("v%d", i)}))
920	}
921	// Don't use ApplyAtLeastOnce, so we can test the other code path.
922	if _, err := client.Apply(ctx, ms); err != nil {
923		t.Fatal(err)
924	}
925
926	// Empty read.
927	rows, err := readAllTestTable(client.Single().Read(ctx, testTable,
928		KeyRange{Start: Key{"k99"}, End: Key{"z"}}, testTableColumns))
929	if err != nil {
930		t.Fatal(err)
931	}
932	if got, want := len(rows), 0; got != want {
933		t.Errorf("got %d, want %d", got, want)
934	}
935
936	// Index empty read.
937	rows, err = readAllTestTable(client.Single().ReadUsingIndex(ctx, testTable, testTableIndex,
938		KeyRange{Start: Key{"v99"}, End: Key{"z"}}, testTableColumns))
939	if err != nil {
940		t.Fatal(err)
941	}
942	if got, want := len(rows), 0; got != want {
943		t.Errorf("got %d, want %d", got, want)
944	}
945
946	// Point read.
947	row, err := client.Single().ReadRow(ctx, testTable, Key{"k1"}, testTableColumns)
948	if err != nil {
949		t.Fatal(err)
950	}
951	var got testTableRow
952	if err := row.ToStruct(&got); err != nil {
953		t.Fatal(err)
954	}
955	if want := (testTableRow{"k1", "v1"}); got != want {
956		t.Errorf("got %v, want %v", got, want)
957	}
958
959	// Point read not found.
960	_, err = client.Single().ReadRow(ctx, testTable, Key{"k999"}, testTableColumns)
961	if ErrCode(err) != codes.NotFound {
962		t.Fatalf("got %v, want NotFound", err)
963	}
964
965	// Index point read.
966	rowIndex, err := client.Single().ReadRowUsingIndex(ctx, testTable, testTableIndex, Key{"v1"}, testTableColumns)
967	if err != nil {
968		t.Fatal(err)
969	}
970	var gotIndex testTableRow
971	if err := rowIndex.ToStruct(&gotIndex); err != nil {
972		t.Fatal(err)
973	}
974	if wantIndex := (testTableRow{"k1", "v1"}); gotIndex != wantIndex {
975		t.Errorf("got %v, want %v", gotIndex, wantIndex)
976	}
977	// Index point read not found.
978	_, err = client.Single().ReadRowUsingIndex(ctx, testTable, testTableIndex, Key{"v999"}, testTableColumns)
979	if ErrCode(err) != codes.NotFound {
980		t.Fatalf("got %v, want NotFound", err)
981	}
982	rangeReads(ctx, t, client)
983	indexRangeReads(ctx, t, client)
984}
985
986func TestIntegration_EarlyTimestamp(t *testing.T) {
987	t.Parallel()
988
989	// Test that we can get the timestamp from a read-only transaction as
990	// soon as we have read at least one row.
991	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
992	defer cancel()
993	// Set up testing environment.
994	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, readDBStatements)
995	defer cleanup()
996
997	var ms []*Mutation
998	for i := 0; i < 3; i++ {
999		ms = append(ms, InsertOrUpdate(testTable,
1000			testTableColumns,
1001			[]interface{}{fmt.Sprintf("k%d", i), fmt.Sprintf("v%d", i)}))
1002	}
1003	if _, err := client.Apply(ctx, ms, ApplyAtLeastOnce()); err != nil {
1004		t.Fatal(err)
1005	}
1006
1007	txn := client.Single()
1008	iter := txn.Read(ctx, testTable, AllKeys(), testTableColumns)
1009	defer iter.Stop()
1010	// In single-use transaction, we should get an error before reading anything.
1011	if _, err := txn.Timestamp(); err == nil {
1012		t.Error("wanted error, got nil")
1013	}
1014	// After reading one row, the timestamp should be available.
1015	_, err := iter.Next()
1016	if err != nil {
1017		t.Fatal(err)
1018	}
1019	if _, err := txn.Timestamp(); err != nil {
1020		t.Errorf("got %v, want nil", err)
1021	}
1022
1023	txn = client.ReadOnlyTransaction()
1024	defer txn.Close()
1025	iter = txn.Read(ctx, testTable, AllKeys(), testTableColumns)
1026	defer iter.Stop()
1027	// In an ordinary read-only transaction, the timestamp should be
1028	// available immediately.
1029	if _, err := txn.Timestamp(); err != nil {
1030		t.Errorf("got %v, want nil", err)
1031	}
1032}
1033
1034func TestIntegration_NestedTransaction(t *testing.T) {
1035	t.Parallel()
1036
1037	// You cannot use a transaction from inside a read-write transaction.
1038	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1039	defer cancel()
1040	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
1041	defer cleanup()
1042
1043	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
1044		_, err := client.ReadWriteTransaction(ctx,
1045			func(context.Context, *ReadWriteTransaction) error { return nil })
1046		if ErrCode(err) != codes.FailedPrecondition {
1047			t.Fatalf("got %v, want FailedPrecondition", err)
1048		}
1049		_, err = client.Single().ReadRow(ctx, "Singers", Key{1}, []string{"SingerId"})
1050		if ErrCode(err) != codes.FailedPrecondition {
1051			t.Fatalf("got %v, want FailedPrecondition", err)
1052		}
1053		rot := client.ReadOnlyTransaction()
1054		defer rot.Close()
1055		_, err = rot.ReadRow(ctx, "Singers", Key{1}, []string{"SingerId"})
1056		if ErrCode(err) != codes.FailedPrecondition {
1057			t.Fatalf("got %v, want FailedPrecondition", err)
1058		}
1059		return nil
1060	})
1061	if err != nil {
1062		t.Fatal(err)
1063	}
1064}
1065
1066// Test client recovery on database recreation.
1067func TestIntegration_DbRemovalRecovery(t *testing.T) {
1068	t.Parallel()
1069
1070	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1071	defer cancel()
1072	// Create a client with MinOpened=0 to prevent the session pool maintainer
1073	// from repeatedly trying to create sessions for the invalid database.
1074	client, dbPath, cleanup := prepareIntegrationTest(ctx, t, SessionPoolConfig{}, singerDBStatements)
1075	defer cleanup()
1076
1077	// Drop the testing database.
1078	if err := databaseAdmin.DropDatabase(ctx, &adminpb.DropDatabaseRequest{Database: dbPath}); err != nil {
1079		t.Fatalf("failed to drop testing database %v: %v", dbPath, err)
1080	}
1081
1082	// Now, send the query.
1083	iter := client.Single().Query(ctx, Statement{SQL: "SELECT SingerId FROM Singers"})
1084	defer iter.Stop()
1085	if _, err := iter.Next(); err == nil {
1086		t.Errorf("client sends query to removed database successfully, want it to fail")
1087	}
1088
1089	// Recreate database and table.
1090	dbName := dbPath[strings.LastIndex(dbPath, "/")+1:]
1091	op, err := databaseAdmin.CreateDatabase(ctx, &adminpb.CreateDatabaseRequest{
1092		Parent:          fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID),
1093		CreateStatement: "CREATE DATABASE " + dbName,
1094		ExtraStatements: []string{
1095			`CREATE TABLE Singers (
1096				SingerId        INT64 NOT NULL,
1097				FirstName       STRING(1024),
1098				LastName        STRING(1024),
1099				SingerInfo      BYTES(MAX)
1100			) PRIMARY KEY (SingerId)`,
1101		},
1102	})
1103	if err != nil {
1104		t.Fatalf("cannot recreate testing DB %v: %v", dbPath, err)
1105	}
1106	if _, err := op.Wait(ctx); err != nil {
1107		t.Fatalf("cannot recreate testing DB %v: %v", dbPath, err)
1108	}
1109
1110	// Now, send the query again.
1111	iter = client.Single().Query(ctx, Statement{SQL: "SELECT SingerId FROM Singers"})
1112	defer iter.Stop()
1113	_, err = iter.Next()
1114	if err != nil && err != iterator.Done {
1115		t.Errorf("failed to send query to database %v: %v", dbPath, err)
1116	}
1117}
1118
1119// Test encoding/decoding non-struct Cloud Spanner types.
1120func TestIntegration_BasicTypes(t *testing.T) {
1121	t.Parallel()
1122
1123	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1124	defer cancel()
1125	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
1126	defer cleanup()
1127
1128	t1, _ := time.Parse(time.RFC3339Nano, "2016-11-15T15:04:05.999999999Z")
1129	// Boundaries
1130	t2, _ := time.Parse(time.RFC3339Nano, "0001-01-01T00:00:00.000000000Z")
1131	t3, _ := time.Parse(time.RFC3339Nano, "9999-12-31T23:59:59.999999999Z")
1132	d1, _ := civil.ParseDate("2016-11-15")
1133	// Boundaries
1134	d2, _ := civil.ParseDate("0001-01-01")
1135	d3, _ := civil.ParseDate("9999-12-31")
1136
1137	tests := []struct {
1138		col  string
1139		val  interface{}
1140		want interface{}
1141	}{
1142		{col: "String", val: ""},
1143		{col: "String", val: "", want: NullString{"", true}},
1144		{col: "String", val: "foo"},
1145		{col: "String", val: "foo", want: NullString{"foo", true}},
1146		{col: "String", val: NullString{"bar", true}, want: "bar"},
1147		{col: "String", val: NullString{"bar", false}, want: NullString{"", false}},
1148		{col: "String", val: nil, want: NullString{}},
1149		{col: "StringArray", val: []string(nil), want: []NullString(nil)},
1150		{col: "StringArray", val: []string{}, want: []NullString{}},
1151		{col: "StringArray", val: []string{"foo", "bar"}, want: []NullString{{"foo", true}, {"bar", true}}},
1152		{col: "StringArray", val: []NullString(nil)},
1153		{col: "StringArray", val: []NullString{}},
1154		{col: "StringArray", val: []NullString{{"foo", true}, {}}},
1155		{col: "Bytes", val: []byte{}},
1156		{col: "Bytes", val: []byte{1, 2, 3}},
1157		{col: "Bytes", val: []byte(nil)},
1158		{col: "BytesArray", val: [][]byte(nil)},
1159		{col: "BytesArray", val: [][]byte{}},
1160		{col: "BytesArray", val: [][]byte{{1}, {2, 3}}},
1161		{col: "Int64a", val: 0, want: int64(0)},
1162		{col: "Int64a", val: -1, want: int64(-1)},
1163		{col: "Int64a", val: 2, want: int64(2)},
1164		{col: "Int64a", val: int64(3)},
1165		{col: "Int64a", val: 4, want: NullInt64{4, true}},
1166		{col: "Int64a", val: NullInt64{5, true}, want: int64(5)},
1167		{col: "Int64a", val: NullInt64{6, true}, want: int64(6)},
1168		{col: "Int64a", val: NullInt64{7, false}, want: NullInt64{0, false}},
1169		{col: "Int64a", val: nil, want: NullInt64{}},
1170		{col: "Int64Array", val: []int(nil), want: []NullInt64(nil)},
1171		{col: "Int64Array", val: []int{}, want: []NullInt64{}},
1172		{col: "Int64Array", val: []int{1, 2}, want: []NullInt64{{1, true}, {2, true}}},
1173		{col: "Int64Array", val: []int64(nil), want: []NullInt64(nil)},
1174		{col: "Int64Array", val: []int64{}, want: []NullInt64{}},
1175		{col: "Int64Array", val: []int64{1, 2}, want: []NullInt64{{1, true}, {2, true}}},
1176		{col: "Int64Array", val: []NullInt64(nil)},
1177		{col: "Int64Array", val: []NullInt64{}},
1178		{col: "Int64Array", val: []NullInt64{{1, true}, {}}},
1179		{col: "Bool", val: false},
1180		{col: "Bool", val: true},
1181		{col: "Bool", val: false, want: NullBool{false, true}},
1182		{col: "Bool", val: true, want: NullBool{true, true}},
1183		{col: "Bool", val: NullBool{true, true}},
1184		{col: "Bool", val: NullBool{false, false}},
1185		{col: "Bool", val: nil, want: NullBool{}},
1186		{col: "BoolArray", val: []bool(nil), want: []NullBool(nil)},
1187		{col: "BoolArray", val: []bool{}, want: []NullBool{}},
1188		{col: "BoolArray", val: []bool{true, false}, want: []NullBool{{true, true}, {false, true}}},
1189		{col: "BoolArray", val: []NullBool(nil)},
1190		{col: "BoolArray", val: []NullBool{}},
1191		{col: "BoolArray", val: []NullBool{{false, true}, {true, true}, {}}},
1192		{col: "Float64", val: 0.0},
1193		{col: "Float64", val: 3.14},
1194		{col: "Float64", val: math.NaN()},
1195		{col: "Float64", val: math.Inf(1)},
1196		{col: "Float64", val: math.Inf(-1)},
1197		{col: "Float64", val: 2.78, want: NullFloat64{2.78, true}},
1198		{col: "Float64", val: NullFloat64{2.71, true}, want: 2.71},
1199		{col: "Float64", val: NullFloat64{1.41, true}, want: NullFloat64{1.41, true}},
1200		{col: "Float64", val: NullFloat64{0, false}},
1201		{col: "Float64", val: nil, want: NullFloat64{}},
1202		{col: "Float64Array", val: []float64(nil), want: []NullFloat64(nil)},
1203		{col: "Float64Array", val: []float64{}, want: []NullFloat64{}},
1204		{col: "Float64Array", val: []float64{2.72, 3.14, math.Inf(1)}, want: []NullFloat64{{2.72, true}, {3.14, true}, {math.Inf(1), true}}},
1205		{col: "Float64Array", val: []NullFloat64(nil)},
1206		{col: "Float64Array", val: []NullFloat64{}},
1207		{col: "Float64Array", val: []NullFloat64{{2.72, true}, {math.Inf(1), true}, {}}},
1208		{col: "Date", val: d1},
1209		{col: "Date", val: d1, want: NullDate{d1, true}},
1210		{col: "Date", val: NullDate{d1, true}},
1211		{col: "Date", val: NullDate{d1, true}, want: d1},
1212		{col: "Date", val: NullDate{civil.Date{}, false}},
1213		{col: "DateArray", val: []civil.Date(nil), want: []NullDate(nil)},
1214		{col: "DateArray", val: []civil.Date{}, want: []NullDate{}},
1215		{col: "DateArray", val: []civil.Date{d1, d2, d3}, want: []NullDate{{d1, true}, {d2, true}, {d3, true}}},
1216		{col: "Timestamp", val: t1},
1217		{col: "Timestamp", val: t1, want: NullTime{t1, true}},
1218		{col: "Timestamp", val: NullTime{t1, true}},
1219		{col: "Timestamp", val: NullTime{t1, true}, want: t1},
1220		{col: "Timestamp", val: NullTime{}},
1221		{col: "Timestamp", val: nil, want: NullTime{}},
1222		{col: "TimestampArray", val: []time.Time(nil), want: []NullTime(nil)},
1223		{col: "TimestampArray", val: []time.Time{}, want: []NullTime{}},
1224		{col: "TimestampArray", val: []time.Time{t1, t2, t3}, want: []NullTime{{t1, true}, {t2, true}, {t3, true}}},
1225	}
1226
1227	// Write rows into table first.
1228	var muts []*Mutation
1229	for i, test := range tests {
1230		muts = append(muts, InsertOrUpdate("Types", []string{"RowID", test.col}, []interface{}{i, test.val}))
1231	}
1232	if _, err := client.Apply(ctx, muts, ApplyAtLeastOnce()); err != nil {
1233		t.Fatal(err)
1234	}
1235
1236	for i, test := range tests {
1237		row, err := client.Single().ReadRow(ctx, "Types", []interface{}{i}, []string{test.col})
1238		if err != nil {
1239			t.Fatalf("Unable to fetch row %v: %v", i, err)
1240		}
1241		// Create new instance of type of test.want.
1242		want := test.want
1243		if want == nil {
1244			want = test.val
1245		}
1246		gotp := reflect.New(reflect.TypeOf(want))
1247		if err := row.Column(0, gotp.Interface()); err != nil {
1248			t.Errorf("%d: col:%v val:%#v, %v", i, test.col, test.val, err)
1249			continue
1250		}
1251		got := reflect.Indirect(gotp).Interface()
1252
1253		// One of the test cases is checking NaN handling.  Given
1254		// NaN!=NaN, we can't use reflect to test for it.
1255		if isNaN(got) && isNaN(want) {
1256			continue
1257		}
1258
1259		// Check non-NaN cases.
1260		if !testEqual(got, want) {
1261			t.Errorf("%d: col:%v val:%#v, got %#v, want %#v", i, test.col, test.val, got, want)
1262			continue
1263		}
1264	}
1265}
1266
1267// Test decoding Cloud Spanner STRUCT type.
1268func TestIntegration_StructTypes(t *testing.T) {
1269	t.Parallel()
1270
1271	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1272	defer cancel()
1273	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
1274	defer cleanup()
1275
1276	tests := []struct {
1277		q    Statement
1278		want func(r *Row) error
1279	}{
1280		{
1281			q: Statement{SQL: `SELECT ARRAY(SELECT STRUCT(1, 2))`},
1282			want: func(r *Row) error {
1283				// Test STRUCT ARRAY decoding to []NullRow.
1284				var rows []NullRow
1285				if err := r.Column(0, &rows); err != nil {
1286					return err
1287				}
1288				if len(rows) != 1 {
1289					return fmt.Errorf("len(rows) = %d; want 1", len(rows))
1290				}
1291				if !rows[0].Valid {
1292					return fmt.Errorf("rows[0] is NULL")
1293				}
1294				var i, j int64
1295				if err := rows[0].Row.Columns(&i, &j); err != nil {
1296					return err
1297				}
1298				if i != 1 || j != 2 {
1299					return fmt.Errorf("got (%d,%d), want (1,2)", i, j)
1300				}
1301				return nil
1302			},
1303		},
1304		{
1305			q: Statement{SQL: `SELECT ARRAY(SELECT STRUCT(1 as foo, 2 as bar)) as col1`},
1306			want: func(r *Row) error {
1307				// Test Row.ToStruct.
1308				s := struct {
1309					Col1 []*struct {
1310						Foo int64 `spanner:"foo"`
1311						Bar int64 `spanner:"bar"`
1312					} `spanner:"col1"`
1313				}{}
1314				if err := r.ToStruct(&s); err != nil {
1315					return err
1316				}
1317				want := struct {
1318					Col1 []*struct {
1319						Foo int64 `spanner:"foo"`
1320						Bar int64 `spanner:"bar"`
1321					} `spanner:"col1"`
1322				}{
1323					Col1: []*struct {
1324						Foo int64 `spanner:"foo"`
1325						Bar int64 `spanner:"bar"`
1326					}{
1327						{
1328							Foo: 1,
1329							Bar: 2,
1330						},
1331					},
1332				}
1333				if !testEqual(want, s) {
1334					return fmt.Errorf("unexpected decoding result: %v, want %v", s, want)
1335				}
1336				return nil
1337			},
1338		},
1339	}
1340	for i, test := range tests {
1341		iter := client.Single().Query(ctx, test.q)
1342		defer iter.Stop()
1343		row, err := iter.Next()
1344		if err != nil {
1345			t.Errorf("%d: %v", i, err)
1346			continue
1347		}
1348		if err := test.want(row); err != nil {
1349			t.Errorf("%d: %v", i, err)
1350			continue
1351		}
1352	}
1353}
1354
1355func TestIntegration_StructParametersUnsupported(t *testing.T) {
1356	skipEmulatorTest(t)
1357	t.Parallel()
1358
1359	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1360	defer cancel()
1361	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, nil)
1362	defer cleanup()
1363
1364	for _, test := range []struct {
1365		param       interface{}
1366		wantCode    codes.Code
1367		wantMsgPart string
1368	}{
1369		{
1370			struct {
1371				Field int
1372			}{10},
1373			codes.Unimplemented,
1374			"Unsupported query shape: " +
1375				"A struct value cannot be returned as a column value. " +
1376				"Rewrite the query to flatten the struct fields in the result.",
1377		},
1378		{
1379			[]struct {
1380				Field int
1381			}{{10}, {20}},
1382			codes.Unimplemented,
1383			"Unsupported query shape: " +
1384				"This query can return a null-valued array of struct, " +
1385				"which is not supported by Spanner.",
1386		},
1387	} {
1388		iter := client.Single().Query(ctx, Statement{
1389			SQL:    "SELECT @p",
1390			Params: map[string]interface{}{"p": test.param},
1391		})
1392		_, err := iter.Next()
1393		iter.Stop()
1394		if msg, ok := matchError(err, test.wantCode, test.wantMsgPart); !ok {
1395			t.Fatal(msg)
1396		}
1397	}
1398}
1399
1400// Test queries of the form "SELECT expr".
1401func TestIntegration_QueryExpressions(t *testing.T) {
1402	t.Parallel()
1403
1404	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1405	defer cancel()
1406	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, nil)
1407	defer cleanup()
1408
1409	newRow := func(vals []interface{}) *Row {
1410		row, err := NewRow(make([]string, len(vals)), vals)
1411		if err != nil {
1412			t.Fatal(err)
1413		}
1414		return row
1415	}
1416
1417	tests := []struct {
1418		expr string
1419		want interface{}
1420	}{
1421		{"1", int64(1)},
1422		{"[1, 2, 3]", []NullInt64{{1, true}, {2, true}, {3, true}}},
1423		{"[1, NULL, 3]", []NullInt64{{1, true}, {0, false}, {3, true}}},
1424		{"IEEE_DIVIDE(1, 0)", math.Inf(1)},
1425		{"IEEE_DIVIDE(-1, 0)", math.Inf(-1)},
1426		{"IEEE_DIVIDE(0, 0)", math.NaN()},
1427		// TODO(jba): add IEEE_DIVIDE(0, 0) to the following array when we have a better equality predicate.
1428		{"[IEEE_DIVIDE(1, 0), IEEE_DIVIDE(-1, 0)]", []NullFloat64{{math.Inf(1), true}, {math.Inf(-1), true}}},
1429		{"ARRAY(SELECT AS STRUCT * FROM (SELECT 'a', 1) WHERE 0 = 1)", []NullRow{}},
1430		{"ARRAY(SELECT STRUCT(1, 2))", []NullRow{{Row: *newRow([]interface{}{1, 2}), Valid: true}}},
1431	}
1432	for _, test := range tests {
1433		iter := client.Single().Query(ctx, Statement{SQL: "SELECT " + test.expr})
1434		defer iter.Stop()
1435		row, err := iter.Next()
1436		if err != nil {
1437			t.Errorf("%q: %v", test.expr, err)
1438			continue
1439		}
1440		// Create new instance of type of test.want.
1441		gotp := reflect.New(reflect.TypeOf(test.want))
1442		if err := row.Column(0, gotp.Interface()); err != nil {
1443			t.Errorf("%q: Column returned error %v", test.expr, err)
1444			continue
1445		}
1446		got := reflect.Indirect(gotp).Interface()
1447		// TODO(jba): remove isNaN special case when we have a better equality predicate.
1448		if isNaN(got) && isNaN(test.want) {
1449			continue
1450		}
1451		if !testEqual(got, test.want) {
1452			t.Errorf("%q\n got  %#v\nwant %#v", test.expr, got, test.want)
1453		}
1454	}
1455}
1456
1457func TestIntegration_QueryStats(t *testing.T) {
1458	skipEmulatorTest(t)
1459	t.Parallel()
1460
1461	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1462	defer cancel()
1463	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
1464	defer cleanup()
1465
1466	accounts := []*Mutation{
1467		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
1468		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
1469	}
1470	if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil {
1471		t.Fatal(err)
1472	}
1473	const sql = "SELECT Balance FROM Accounts"
1474
1475	qp, err := client.Single().AnalyzeQuery(ctx, Statement{sql, nil})
1476	if err != nil {
1477		t.Fatal(err)
1478	}
1479	if len(qp.PlanNodes) == 0 {
1480		t.Error("got zero plan nodes, expected at least one")
1481	}
1482
1483	iter := client.Single().QueryWithStats(ctx, Statement{sql, nil})
1484	defer iter.Stop()
1485	for {
1486		_, err := iter.Next()
1487		if err == iterator.Done {
1488			break
1489		}
1490		if err != nil {
1491			t.Fatal(err)
1492		}
1493	}
1494	if iter.QueryPlan == nil {
1495		t.Error("got nil QueryPlan, expected one")
1496	}
1497	if iter.QueryStats == nil {
1498		t.Error("got nil QueryStats, expected some")
1499	}
1500}
1501
1502func TestIntegration_InvalidDatabase(t *testing.T) {
1503	t.Parallel()
1504
1505	if databaseAdmin == nil {
1506		t.Skip("Integration tests skipped")
1507	}
1508	ctx := context.Background()
1509	dbPath := fmt.Sprintf("projects/%v/instances/%v/databases/invalid", testProjectID, testInstanceID)
1510	c, err := createClient(ctx, dbPath, SessionPoolConfig{})
1511	// Client creation should succeed even if the database is invalid.
1512	if err != nil {
1513		t.Fatal(err)
1514	}
1515	_, err = c.Single().ReadRow(ctx, "TestTable", Key{1}, []string{"col1"})
1516	if msg, ok := matchError(err, codes.NotFound, ""); !ok {
1517		t.Fatal(msg)
1518	}
1519}
1520
1521func TestIntegration_ReadErrors(t *testing.T) {
1522	t.Parallel()
1523
1524	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1525	defer cancel()
1526	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, readDBStatements)
1527	defer cleanup()
1528
1529	var ms []*Mutation
1530	for i := 0; i < 2; i++ {
1531		ms = append(ms, InsertOrUpdate(testTable,
1532			testTableColumns,
1533			[]interface{}{fmt.Sprintf("k%d", i), fmt.Sprintf("v")}))
1534	}
1535	if _, err := client.Apply(ctx, ms); err != nil {
1536		t.Fatal(err)
1537	}
1538
1539	// Read over invalid table fails
1540	_, err := client.Single().ReadRow(ctx, "badTable", Key{1}, []string{"StringValue"})
1541	if msg, ok := matchError(err, codes.NotFound, "badTable"); !ok {
1542		t.Error(msg)
1543	}
1544	// Read over invalid column fails
1545	_, err = client.Single().ReadRow(ctx, "TestTable", Key{1}, []string{"badcol"})
1546	if msg, ok := matchError(err, codes.NotFound, "badcol"); !ok {
1547		t.Error(msg)
1548	}
1549
1550	// Invalid query fails
1551	iter := client.Single().Query(ctx, Statement{SQL: "SELECT Apples AND Oranges"})
1552	defer iter.Stop()
1553	_, err = iter.Next()
1554	if msg, ok := matchError(err, codes.InvalidArgument, "unrecognized name"); !ok {
1555		t.Error(msg)
1556	}
1557
1558	// Read should fail on cancellation.
1559	cctx, cancel := context.WithCancel(ctx)
1560	cancel()
1561	_, err = client.Single().ReadRow(cctx, "TestTable", Key{1}, []string{"StringValue"})
1562	if msg, ok := matchError(err, codes.Canceled, ""); !ok {
1563		t.Error(msg)
1564	}
1565	// Read should fail if deadline exceeded.
1566	dctx, cancel := context.WithTimeout(ctx, time.Nanosecond)
1567	defer cancel()
1568	<-dctx.Done()
1569	_, err = client.Single().ReadRow(dctx, "TestTable", Key{1}, []string{"StringValue"})
1570	if msg, ok := matchError(err, codes.DeadlineExceeded, ""); !ok {
1571		t.Error(msg)
1572	}
1573	// Read should fail if there are multiple rows returned.
1574	_, err = client.Single().ReadRowUsingIndex(ctx, testTable, testTableIndex, Key{"v"}, testTableColumns)
1575	wantMsgPart := fmt.Sprintf("more than one row found by index(Table: %v, IndexKey: %v, Index: %v)", testTable, Key{"v"}, testTableIndex)
1576	if msg, ok := matchError(err, codes.FailedPrecondition, wantMsgPart); !ok {
1577		t.Error(msg)
1578	}
1579}
1580
1581// Test TransactionRunner. Test that transactions are aborted and retried as
1582// expected.
1583func TestIntegration_TransactionRunner(t *testing.T) {
1584	skipEmulatorTest(t)
1585	t.Parallel()
1586
1587	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1588	defer cancel()
1589	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
1590	defer cleanup()
1591
1592	// Test 1: User error should abort the transaction.
1593	_, _ = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
1594		tx.BufferWrite([]*Mutation{
1595			Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)})})
1596		return errors.New("user error")
1597	})
1598	// Empty read.
1599	rows, err := readAllTestTable(client.Single().Read(ctx, "Accounts", Key{1}, []string{"AccountId", "Nickname", "Balance"}))
1600	if err != nil {
1601		t.Fatal(err)
1602	}
1603	if got, want := len(rows), 0; got != want {
1604		t.Errorf("Empty read, got %d, want %d.", got, want)
1605	}
1606
1607	// Test 2: Expect abort and retry.
1608	// We run two ReadWriteTransactions concurrently and make txn1 abort txn2 by
1609	// committing writes to the column txn2 have read, and expect the following
1610	// read to abort and txn2 retries.
1611
1612	// Set up two accounts
1613	accounts := []*Mutation{
1614		Insert("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), int64(0)}),
1615		Insert("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), int64(1)}),
1616	}
1617	if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil {
1618		t.Fatal(err)
1619	}
1620
1621	var (
1622		cTxn1Start  = make(chan struct{})
1623		cTxn1Commit = make(chan struct{})
1624		cTxn2Start  = make(chan struct{})
1625		wg          sync.WaitGroup
1626	)
1627
1628	// read balance, check error if we don't expect abort.
1629	readBalance := func(tx interface {
1630		ReadRow(ctx context.Context, table string, key Key, columns []string) (*Row, error)
1631	}, key int64, expectAbort bool) (int64, error) {
1632		var b int64
1633		r, e := tx.ReadRow(ctx, "Accounts", Key{int64(key)}, []string{"Balance"})
1634		if e != nil {
1635			if expectAbort && !isAbortErr(e) {
1636				t.Errorf("ReadRow got %v, want Abort error.", e)
1637			}
1638			return b, e
1639		}
1640		if ce := r.Column(0, &b); ce != nil {
1641			return b, ce
1642		}
1643		return b, nil
1644	}
1645
1646	wg.Add(2)
1647	// Txn 1
1648	go func() {
1649		defer wg.Done()
1650		var once sync.Once
1651		_, e := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
1652			b, e := readBalance(tx, 1, false)
1653			if e != nil {
1654				return e
1655			}
1656			// txn 1 can abort, in that case we skip closing the channel on
1657			// retry.
1658			once.Do(func() { close(cTxn1Start) })
1659			e = tx.BufferWrite([]*Mutation{
1660				Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), int64(b + 1)})})
1661			if e != nil {
1662				return e
1663			}
1664			// Wait for second transaction.
1665			<-cTxn2Start
1666			return nil
1667		})
1668		close(cTxn1Commit)
1669		if e != nil {
1670			t.Errorf("Transaction 1 commit, got %v, want nil.", e)
1671		}
1672	}()
1673	// Txn 2
1674	go func() {
1675		// Wait until txn 1 starts.
1676		<-cTxn1Start
1677		defer wg.Done()
1678		var (
1679			once sync.Once
1680			b1   int64
1681			b2   int64
1682			e    error
1683		)
1684		_, e = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
1685			if b1, e = readBalance(tx, 1, false); e != nil {
1686				return e
1687			}
1688			// Skip closing channel on retry.
1689			once.Do(func() { close(cTxn2Start) })
1690			// Wait until txn 1 successfully commits.
1691			<-cTxn1Commit
1692			// Txn1 has committed and written a balance to the account. Now this
1693			// transaction (txn2) reads and re-writes the balance. The first
1694			// time through, it will abort because it overlaps with txn1. Then
1695			// it will retry after txn1 commits, and succeed.
1696			if b2, e = readBalance(tx, 2, true); e != nil {
1697				return e
1698			}
1699			return tx.BufferWrite([]*Mutation{
1700				Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), int64(b1 + b2)})})
1701		})
1702		if e != nil {
1703			t.Errorf("Transaction 2 commit, got %v, want nil.", e)
1704		}
1705	}()
1706	wg.Wait()
1707	// Check that both transactions' effects are visible.
1708	for i := int64(1); i <= int64(2); i++ {
1709		if b, e := readBalance(client.Single(), i, false); e != nil {
1710			t.Fatalf("ReadBalance for key %d error %v.", i, e)
1711		} else if b != i {
1712			t.Errorf("Balance for key %d, got %d, want %d.", i, b, i)
1713		}
1714	}
1715}
1716
1717// Test PartitionQuery of BatchReadOnlyTransaction, create partitions then
1718// serialize and deserialize both transaction and partition to be used in
1719// execution on another client, and compare results.
1720func TestIntegration_BatchQuery(t *testing.T) {
1721	skipEmulatorTest(t)
1722	t.Parallel()
1723
1724	// Set up testing environment.
1725	var (
1726		client2 *Client
1727		err     error
1728	)
1729	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1730	defer cancel()
1731	client, dbPath, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, simpleDBStatements)
1732	defer cleanup()
1733
1734	if err = populate(ctx, client); err != nil {
1735		t.Fatal(err)
1736	}
1737	if client2, err = createClient(ctx, dbPath, SessionPoolConfig{}); err != nil {
1738		t.Fatal(err)
1739	}
1740	defer client2.Close()
1741
1742	// PartitionQuery
1743	var (
1744		txn        *BatchReadOnlyTransaction
1745		partitions []*Partition
1746		stmt       = Statement{SQL: "SELECT * FROM test;"}
1747	)
1748
1749	if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil {
1750		t.Fatal(err)
1751	}
1752	defer txn.Cleanup(ctx)
1753	if partitions, err = txn.PartitionQuery(ctx, stmt, PartitionOptions{0, 3}); err != nil {
1754		t.Fatal(err)
1755	}
1756
1757	// Reconstruct BatchReadOnlyTransactionID and execute partitions
1758	var (
1759		tid2      BatchReadOnlyTransactionID
1760		data      []byte
1761		gotResult bool // if we get matching result from two separate txns
1762	)
1763	if data, err = txn.ID.MarshalBinary(); err != nil {
1764		t.Fatalf("encoding failed %v", err)
1765	}
1766	if err = tid2.UnmarshalBinary(data); err != nil {
1767		t.Fatalf("decoding failed %v", err)
1768	}
1769	txn2 := client2.BatchReadOnlyTransactionFromID(tid2)
1770
1771	// Execute Partitions and compare results
1772	for i, p := range partitions {
1773		iter := txn.Execute(ctx, p)
1774		defer iter.Stop()
1775		p2 := serdesPartition(t, i, p)
1776		iter2 := txn2.Execute(ctx, &p2)
1777		defer iter2.Stop()
1778
1779		row1, err1 := iter.Next()
1780		row2, err2 := iter2.Next()
1781		if err1 != err2 {
1782			t.Fatalf("execution failed for different reasons: %v, %v", err1, err2)
1783			continue
1784		}
1785		if !testEqual(row1, row2) {
1786			t.Fatalf("execution returned different values: %v, %v", row1, row2)
1787			continue
1788		}
1789		if row1 == nil {
1790			continue
1791		}
1792		var a, b string
1793		if err = row1.Columns(&a, &b); err != nil {
1794			t.Fatalf("failed to parse row %v", err)
1795			continue
1796		}
1797		if a == str1 && b == str2 {
1798			gotResult = true
1799		}
1800	}
1801	if !gotResult {
1802		t.Fatalf("execution didn't return expected values")
1803	}
1804}
1805
1806// Test PartitionRead of BatchReadOnlyTransaction, similar to TestBatchQuery
1807func TestIntegration_BatchRead(t *testing.T) {
1808	skipEmulatorTest(t)
1809	t.Parallel()
1810
1811	// Set up testing environment.
1812	var (
1813		client2 *Client
1814		err     error
1815	)
1816	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1817	defer cancel()
1818	client, dbPath, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, simpleDBStatements)
1819	defer cleanup()
1820
1821	if err = populate(ctx, client); err != nil {
1822		t.Fatal(err)
1823	}
1824	if client2, err = createClient(ctx, dbPath, SessionPoolConfig{}); err != nil {
1825		t.Fatal(err)
1826	}
1827	defer client2.Close()
1828
1829	// PartitionRead
1830	var (
1831		txn        *BatchReadOnlyTransaction
1832		partitions []*Partition
1833	)
1834
1835	if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil {
1836		t.Fatal(err)
1837	}
1838	defer txn.Cleanup(ctx)
1839	if partitions, err = txn.PartitionRead(ctx, "test", AllKeys(), simpleDBTableColumns, PartitionOptions{0, 3}); err != nil {
1840		t.Fatal(err)
1841	}
1842
1843	// Reconstruct BatchReadOnlyTransactionID and execute partitions.
1844	var (
1845		tid2      BatchReadOnlyTransactionID
1846		data      []byte
1847		gotResult bool // if we get matching result from two separate txns
1848	)
1849	if data, err = txn.ID.MarshalBinary(); err != nil {
1850		t.Fatalf("encoding failed %v", err)
1851	}
1852	if err = tid2.UnmarshalBinary(data); err != nil {
1853		t.Fatalf("decoding failed %v", err)
1854	}
1855	txn2 := client2.BatchReadOnlyTransactionFromID(tid2)
1856
1857	// Execute Partitions and compare results.
1858	for i, p := range partitions {
1859		iter := txn.Execute(ctx, p)
1860		defer iter.Stop()
1861		p2 := serdesPartition(t, i, p)
1862		iter2 := txn2.Execute(ctx, &p2)
1863		defer iter2.Stop()
1864
1865		row1, err1 := iter.Next()
1866		row2, err2 := iter2.Next()
1867		if err1 != err2 {
1868			t.Fatalf("execution failed for different reasons: %v, %v", err1, err2)
1869			continue
1870		}
1871		if !testEqual(row1, row2) {
1872			t.Fatalf("execution returned different values: %v, %v", row1, row2)
1873			continue
1874		}
1875		if row1 == nil {
1876			continue
1877		}
1878		var a, b string
1879		if err = row1.Columns(&a, &b); err != nil {
1880			t.Fatalf("failed to parse row %v", err)
1881			continue
1882		}
1883		if a == str1 && b == str2 {
1884			gotResult = true
1885		}
1886	}
1887	if !gotResult {
1888		t.Fatalf("execution didn't return expected values")
1889	}
1890}
1891
1892// Test normal txReadEnv method on BatchReadOnlyTransaction.
1893func TestIntegration_BROTNormal(t *testing.T) {
1894	skipEmulatorTest(t)
1895	t.Parallel()
1896
1897	// Set up testing environment and create txn.
1898	var (
1899		txn *BatchReadOnlyTransaction
1900		err error
1901		row *Row
1902		i   int64
1903	)
1904	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1905	defer cancel()
1906	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, simpleDBStatements)
1907	defer cleanup()
1908
1909	if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil {
1910		t.Fatal(err)
1911	}
1912	defer txn.Cleanup(ctx)
1913	if _, err := txn.PartitionRead(ctx, "test", AllKeys(), simpleDBTableColumns, PartitionOptions{0, 3}); err != nil {
1914		t.Fatal(err)
1915	}
1916	// Normal query should work with BatchReadOnlyTransaction.
1917	stmt2 := Statement{SQL: "SELECT 1"}
1918	iter := txn.Query(ctx, stmt2)
1919	defer iter.Stop()
1920
1921	row, err = iter.Next()
1922	if err != nil {
1923		t.Errorf("query failed with %v", err)
1924	}
1925	if err = row.Columns(&i); err != nil {
1926		t.Errorf("failed to parse row %v", err)
1927	}
1928}
1929
1930func TestIntegration_CommitTimestamp(t *testing.T) {
1931	t.Parallel()
1932
1933	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1934	defer cancel()
1935	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, ctsDBStatements)
1936	defer cleanup()
1937
1938	type testTableRow struct {
1939		Key string
1940		Ts  NullTime
1941	}
1942
1943	var (
1944		cts1, cts2, ts1, ts2 time.Time
1945		err                  error
1946	)
1947
1948	// Apply mutation in sequence, expect to see commit timestamp in good order,
1949	// check also the commit timestamp returned
1950	for _, it := range []struct {
1951		k string
1952		t *time.Time
1953	}{
1954		{"a", &cts1},
1955		{"b", &cts2},
1956	} {
1957		tt := testTableRow{Key: it.k, Ts: NullTime{CommitTimestamp, true}}
1958		m, err := InsertStruct("TestTable", tt)
1959		if err != nil {
1960			t.Fatal(err)
1961		}
1962		*it.t, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce())
1963		if err != nil {
1964			t.Fatal(err)
1965		}
1966	}
1967
1968	txn := client.ReadOnlyTransaction()
1969	for _, it := range []struct {
1970		k string
1971		t *time.Time
1972	}{
1973		{"a", &ts1},
1974		{"b", &ts2},
1975	} {
1976		if r, e := txn.ReadRow(ctx, "TestTable", Key{it.k}, []string{"Ts"}); e != nil {
1977			t.Fatal(err)
1978		} else {
1979			var got testTableRow
1980			if err := r.ToStruct(&got); err != nil {
1981				t.Fatal(err)
1982			}
1983			*it.t = got.Ts.Time
1984		}
1985	}
1986	if !cts1.Equal(ts1) {
1987		t.Errorf("Expect commit timestamp returned and read to match for txn1, got %v and %v.", cts1, ts1)
1988	}
1989	if !cts2.Equal(ts2) {
1990		t.Errorf("Expect commit timestamp returned and read to match for txn2, got %v and %v.", cts2, ts2)
1991	}
1992
1993	// Try writing a timestamp in the future to commit timestamp, expect error.
1994	_, err = client.Apply(ctx, []*Mutation{InsertOrUpdate("TestTable", []string{"Key", "Ts"}, []interface{}{"a", time.Now().Add(time.Hour)})}, ApplyAtLeastOnce())
1995	if msg, ok := matchError(err, codes.FailedPrecondition, "Cannot write timestamps in the future"); !ok {
1996		t.Error(msg)
1997	}
1998}
1999
2000func TestIntegration_DML(t *testing.T) {
2001	t.Parallel()
2002
2003	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2004	defer cancel()
2005	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
2006	defer cleanup()
2007
2008	// Function that reads a single row's first name from within a transaction.
2009	readFirstName := func(tx *ReadWriteTransaction, key int) (string, error) {
2010		row, err := tx.ReadRow(ctx, "Singers", Key{key}, []string{"FirstName"})
2011		if err != nil {
2012			return "", err
2013		}
2014		var fn string
2015		if err := row.Column(0, &fn); err != nil {
2016			return "", err
2017		}
2018		return fn, nil
2019	}
2020
2021	// Function that reads multiple rows' first names from outside a read/write
2022	// transaction.
2023	readFirstNames := func(keys ...int) []string {
2024		var ks []KeySet
2025		for _, k := range keys {
2026			ks = append(ks, Key{k})
2027		}
2028		iter := client.Single().Read(ctx, "Singers", KeySets(ks...), []string{"FirstName"})
2029		var got []string
2030		var fn string
2031		err := iter.Do(func(row *Row) error {
2032			if err := row.Column(0, &fn); err != nil {
2033				return err
2034			}
2035			got = append(got, fn)
2036			return nil
2037		})
2038		if err != nil {
2039			t.Fatalf("readFirstNames(%v): %v", keys, err)
2040		}
2041		return got
2042	}
2043
2044	// Use ReadWriteTransaction.Query to execute a DML statement.
2045	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
2046		iter := tx.Query(ctx, Statement{
2047			SQL: `INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (1, "Umm", "Kulthum")`,
2048		})
2049		defer iter.Stop()
2050		if row, err := iter.Next(); err != iterator.Done {
2051			t.Fatalf("got results from iterator, want none: %#v, err = %v\n", row, err)
2052		}
2053		if iter.RowCount != 1 {
2054			t.Errorf("row count: got %d, want 1", iter.RowCount)
2055		}
2056		// The results of the DML statement should be visible to the transaction.
2057		got, err := readFirstName(tx, 1)
2058		if err != nil {
2059			return err
2060		}
2061		if want := "Umm"; got != want {
2062			t.Errorf("got %q, want %q", got, want)
2063		}
2064		return nil
2065	})
2066	if err != nil {
2067		t.Fatal(err)
2068	}
2069
2070	// Use ReadWriteTransaction.Update to execute a DML statement.
2071	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
2072		count, err := tx.Update(ctx, Statement{
2073			SQL: `Insert INTO Singers (SingerId, FirstName, LastName) VALUES (2, "Eduard", "Khil")`,
2074		})
2075		if err != nil {
2076			t.Fatal(err)
2077		}
2078		if count != 1 {
2079			t.Errorf("row count: got %d, want 1", count)
2080		}
2081		got, err := readFirstName(tx, 2)
2082		if err != nil {
2083			return err
2084		}
2085		if want := "Eduard"; got != want {
2086			t.Errorf("got %q, want %q", got, want)
2087		}
2088		return nil
2089
2090	})
2091	if err != nil {
2092		t.Fatal(err)
2093	}
2094
2095	// Roll back a DML statement and confirm that it didn't happen.
2096	var fail = errors.New("fail")
2097	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
2098		_, err := tx.Update(ctx, Statement{
2099			SQL: `INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (3, "Audra", "McDonald")`,
2100		})
2101		if err != nil {
2102			return err
2103		}
2104		return fail
2105	})
2106	if err != fail {
2107		t.Fatalf("rolling back: got error %v, want the error 'fail'", err)
2108	}
2109	_, err = client.Single().ReadRow(ctx, "Singers", Key{3}, []string{"FirstName"})
2110	if got, want := ErrCode(err), codes.NotFound; got != want {
2111		t.Errorf("got %s, want %s", got, want)
2112	}
2113
2114	// Run two DML statements in the same transaction.
2115	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
2116		_, err := tx.Update(ctx, Statement{SQL: `UPDATE Singers SET FirstName = "Oum" WHERE SingerId = 1`})
2117		if err != nil {
2118			return err
2119		}
2120		_, err = tx.Update(ctx, Statement{SQL: `UPDATE Singers SET FirstName = "Eddie" WHERE SingerId = 2`})
2121		if err != nil {
2122			return err
2123		}
2124		return nil
2125	})
2126	if err != nil {
2127		t.Fatal(err)
2128	}
2129	got := readFirstNames(1, 2)
2130	want := []string{"Oum", "Eddie"}
2131	if !testEqual(got, want) {
2132		t.Errorf("got %v, want %v", got, want)
2133	}
2134
2135	// Run a DML statement and an ordinary mutation in the same transaction.
2136	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
2137		_, err := tx.Update(ctx, Statement{
2138			SQL: `INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (3, "Audra", "McDonald")`,
2139		})
2140		if err != nil {
2141			return err
2142		}
2143		return tx.BufferWrite([]*Mutation{
2144			Insert("Singers", []string{"SingerId", "FirstName", "LastName"},
2145				[]interface{}{4, "Andy", "Irvine"}),
2146		})
2147	})
2148	if err != nil {
2149		t.Fatal(err)
2150	}
2151	got = readFirstNames(3, 4)
2152	want = []string{"Audra", "Andy"}
2153	if !testEqual(got, want) {
2154		t.Errorf("got %v, want %v", got, want)
2155	}
2156
2157	// Attempt to run a query using update.
2158	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
2159		_, err := tx.Update(ctx, Statement{SQL: `SELECT FirstName from Singers`})
2160		return err
2161	})
2162	if got, want := ErrCode(err), codes.InvalidArgument; got != want {
2163		t.Errorf("got %s, want %s", got, want)
2164	}
2165}
2166
// TestIntegration_StructParametersBind binds struct-typed values (and slices
// of them) to the query parameter @p, runs each test query through a
// single-use read-only transaction, and compares the returned rows with rows
// built from the expected column names and values.
func TestIntegration_StructParametersBind(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, nil)
	defer cleanup()

	// tRow is the list of column values of one expected row; tRows is the
	// list of expected rows of one test case.
	type tRow []interface{}
	type tRows []struct{ trow tRow }

	// allFields is the struct bound as a parameter; allColumns lists its
	// field names in declaration order for building the expected rows.
	type allFields struct {
		Stringf string
		Intf    int
		Boolf   bool
		Floatf  float64
		Bytef   []byte
		Timef   time.Time
		Datef   civil.Date
	}
	allColumns := []string{
		"Stringf",
		"Intf",
		"Boolf",
		"Floatf",
		"Bytef",
		"Timef",
		"Datef",
	}
	// t1, t2, d1 and d2 are defined outside this function (presumably shared
	// time and date fixtures).
	s1 := allFields{"abc", 300, false, 3.45, []byte("foo"), t1, d1}
	s2 := allFields{"def", -300, false, -3.45, []byte("bar"), t2, d2}

	// s3 is a pointer to a struct type created at run time with a single
	// field A tagged spanner:"ff1" and set to t1.
	dynamicStructType := reflect.StructOf([]reflect.StructField{
		{Name: "A", Type: reflect.TypeOf(t1), Tag: `spanner:"ff1"`},
	})
	s3 := reflect.New(dynamicStructType)
	s3.Elem().Field(0).Set(reflect.ValueOf(t1))

	// Each case gives the value bound to @p, the SQL to run, the expected
	// column names, and the expected rows.
	for i, test := range []struct {
		param interface{}
		sql   string
		cols  []string
		trows tRows
	}{
		// Struct value.
		{
			s1,
			"SELECT" +
				" @p.Stringf," +
				" @p.Intf," +
				" @p.Boolf," +
				" @p.Floatf," +
				" @p.Bytef," +
				" @p.Timef," +
				" @p.Datef",
			allColumns,
			tRows{
				{tRow{"abc", 300, false, 3.45, []byte("foo"), t1, d1}},
			},
		},
		// Array of struct value.
		{
			[]allFields{s1, s2},
			"SELECT * FROM UNNEST(@p)",
			allColumns,
			tRows{
				{tRow{"abc", 300, false, 3.45, []byte("foo"), t1, d1}},
				{tRow{"def", -300, false, -3.45, []byte("bar"), t2, d2}},
			},
		},
		// Null struct.
		{
			(*allFields)(nil),
			"SELECT @p IS NULL",
			[]string{""},
			tRows{
				{tRow{true}},
			},
		},
		// Null Array of struct.
		{
			[]allFields(nil),
			"SELECT @p IS NULL",
			[]string{""},
			tRows{
				{tRow{true}},
			},
		},
		// Empty struct.
		{
			struct{}{},
			"SELECT @p IS NULL ",
			[]string{""},
			tRows{
				{tRow{false}},
			},
		},
		// Empty array of struct.
		{
			[]allFields{},
			"SELECT * FROM UNNEST(@p) ",
			allColumns,
			tRows{},
		},
		// Struct with duplicate fields.
		{
			struct {
				A int `spanner:"field"`
				B int `spanner:"field"`
			}{10, 20},
			"SELECT * FROM UNNEST([@p]) ",
			[]string{"field", "field"},
			tRows{
				{tRow{10, 20}},
			},
		},
		// Struct with unnamed fields.
		{
			struct {
				A string `spanner:""`
			}{"hello"},
			"SELECT * FROM UNNEST([@p]) ",
			[]string{""},
			tRows{
				{tRow{"hello"}},
			},
		},
		// Mixed struct.
		{
			struct {
				DynamicStructField interface{}  `spanner:"f1"`
				ArrayStructField   []*allFields `spanner:"f2"`
			}{
				DynamicStructField: s3.Interface(),
				ArrayStructField:   []*allFields{nil},
			},
			"SELECT @p.f1.ff1, ARRAY_LENGTH(@p.f2), @p.f2[OFFSET(0)] IS NULL ",
			[]string{"ff1", "", ""},
			tRows{
				{tRow{t1, 1, true}},
			},
		},
	} {
		// Run the query with test.param bound to @p and collect every row.
		iter := client.Single().Query(ctx, Statement{
			SQL:    test.sql,
			Params: map[string]interface{}{"p": test.param},
		})
		var gotRows []*Row
		err := iter.Do(func(r *Row) error {
			gotRows = append(gotRows, r)
			return nil
		})
		if err != nil {
			t.Errorf("Failed to execute test case %d, error: %v", i, err)
		}

		// Build the expected rows from the case's column names and values.
		var wantRows []*Row
		for j, row := range test.trows {
			r, err := NewRow(test.cols, row.trow)
			if err != nil {
				t.Errorf("Invalid row %d in test case %d", j, i)
			}
			wantRows = append(wantRows, r)
		}
		if !testEqual(gotRows, wantRows) {
			t.Errorf("%d: Want result %v, got result %v", i, wantRows, gotRows)
		}
	}
}
2336
2337func TestIntegration_PDML(t *testing.T) {
2338	skipEmulatorTest(t)
2339	t.Parallel()
2340
2341	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2342	defer cancel()
2343	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
2344	defer cleanup()
2345
2346	columns := []string{"SingerId", "FirstName", "LastName"}
2347
2348	// Populate the Singers table.
2349	var muts []*Mutation
2350	for _, row := range [][]interface{}{
2351		{1, "Umm", "Kulthum"},
2352		{2, "Eduard", "Khil"},
2353		{3, "Audra", "McDonald"},
2354		{4, "Enrique", "Iglesias"},
2355		{5, "Shakira", "Ripoll"},
2356	} {
2357		muts = append(muts, Insert("Singers", columns, row))
2358	}
2359	if _, err := client.Apply(ctx, muts); err != nil {
2360		t.Fatal(err)
2361	}
2362	// Identifiers in PDML statements must be fully qualified.
2363	// TODO(jba): revisit the above.
2364	count, err := client.PartitionedUpdate(ctx, Statement{
2365		SQL: `UPDATE Singers SET Singers.FirstName = "changed" WHERE Singers.SingerId >= 1 AND Singers.SingerId <= @end`,
2366		Params: map[string]interface{}{
2367			"end": 3,
2368		},
2369	})
2370	if err != nil {
2371		t.Fatal(err)
2372	}
2373	if want := int64(3); count != want {
2374		t.Fatalf("got %d, want %d", count, want)
2375	}
2376	got, err := readAll(client.Single().Read(ctx, "Singers", AllKeys(), columns))
2377	if err != nil {
2378		t.Fatal(err)
2379	}
2380	want := [][]interface{}{
2381		{int64(1), "changed", "Kulthum"},
2382		{int64(2), "changed", "Khil"},
2383		{int64(3), "changed", "McDonald"},
2384		{int64(4), "Enrique", "Iglesias"},
2385		{int64(5), "Shakira", "Ripoll"},
2386	}
2387	if !testEqual(got, want) {
2388		t.Errorf("\ngot %v\nwant%v", got, want)
2389	}
2390}
2391
2392func TestBatchDML(t *testing.T) {
2393	t.Parallel()
2394
2395	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2396	defer cancel()
2397	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
2398	defer cleanup()
2399
2400	columns := []string{"SingerId", "FirstName", "LastName"}
2401
2402	// Populate the Singers table.
2403	var muts []*Mutation
2404	for _, row := range [][]interface{}{
2405		{1, "Umm", "Kulthum"},
2406		{2, "Eduard", "Khil"},
2407		{3, "Audra", "McDonald"},
2408	} {
2409		muts = append(muts, Insert("Singers", columns, row))
2410	}
2411	if _, err := client.Apply(ctx, muts); err != nil {
2412		t.Fatal(err)
2413	}
2414
2415	var counts []int64
2416	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
2417		counts, err = tx.BatchUpdate(ctx, []Statement{
2418			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`},
2419			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 2" WHERE Singers.SingerId = 2`},
2420			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`},
2421		})
2422		return err
2423	})
2424
2425	if err != nil {
2426		t.Fatal(err)
2427	}
2428	if want := []int64{1, 1, 1}; !testEqual(counts, want) {
2429		t.Fatalf("got %d, want %d", counts, want)
2430	}
2431	got, err := readAll(client.Single().Read(ctx, "Singers", AllKeys(), columns))
2432	if err != nil {
2433		t.Fatal(err)
2434	}
2435	want := [][]interface{}{
2436		{int64(1), "changed 1", "Kulthum"},
2437		{int64(2), "changed 2", "Khil"},
2438		{int64(3), "changed 3", "McDonald"},
2439	}
2440	if !testEqual(got, want) {
2441		t.Errorf("\ngot %v\nwant%v", got, want)
2442	}
2443}
2444
2445func TestBatchDML_NoStatements(t *testing.T) {
2446	t.Parallel()
2447
2448	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2449	defer cancel()
2450	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
2451	defer cleanup()
2452
2453	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
2454		_, err = tx.BatchUpdate(ctx, []Statement{})
2455		return err
2456	})
2457	if err == nil {
2458		t.Fatal("expected error, got nil")
2459	}
2460	if s, ok := status.FromError(err); ok {
2461		if s.Code() != codes.InvalidArgument {
2462			t.Fatalf("expected InvalidArgument, got %v", err)
2463		}
2464	} else {
2465		t.Fatalf("expected InvalidArgument, got %v", err)
2466	}
2467}
2468
2469func TestBatchDML_TwoStatements(t *testing.T) {
2470	t.Parallel()
2471
2472	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2473	defer cancel()
2474	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
2475	defer cleanup()
2476
2477	columns := []string{"SingerId", "FirstName", "LastName"}
2478
2479	// Populate the Singers table.
2480	var muts []*Mutation
2481	for _, row := range [][]interface{}{
2482		{1, "Umm", "Kulthum"},
2483		{2, "Eduard", "Khil"},
2484		{3, "Audra", "McDonald"},
2485	} {
2486		muts = append(muts, Insert("Singers", columns, row))
2487	}
2488	if _, err := client.Apply(ctx, muts); err != nil {
2489		t.Fatal(err)
2490	}
2491
2492	var updateCount int64
2493	var batchCounts []int64
2494	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
2495		batchCounts, err = tx.BatchUpdate(ctx, []Statement{
2496			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`},
2497			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 2" WHERE Singers.SingerId = 2`},
2498			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`},
2499		})
2500		if err != nil {
2501			return err
2502		}
2503
2504		updateCount, err = tx.Update(ctx, Statement{SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`})
2505		return err
2506	})
2507	if err != nil {
2508		t.Fatal(err)
2509	}
2510	if want := []int64{1, 1, 1}; !testEqual(batchCounts, want) {
2511		t.Fatalf("got %d, want %d", batchCounts, want)
2512	}
2513	if updateCount != 1 {
2514		t.Fatalf("got %v, want 1", updateCount)
2515	}
2516}
2517
2518// TODO(deklerk): this currently does not work because the transaction appears to
2519// get rolled back after a single statement fails. b/120158761
2520func TestBatchDML_Error(t *testing.T) {
2521	t.Parallel()
2522
2523	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2524	defer cancel()
2525	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
2526	defer cleanup()
2527
2528	columns := []string{"SingerId", "FirstName", "LastName"}
2529
2530	// Populate the Singers table.
2531	var muts []*Mutation
2532	for _, row := range [][]interface{}{
2533		{1, "Umm", "Kulthum"},
2534		{2, "Eduard", "Khil"},
2535		{3, "Audra", "McDonald"},
2536	} {
2537		muts = append(muts, Insert("Singers", columns, row))
2538	}
2539	if _, err := client.Apply(ctx, muts); err != nil {
2540		t.Fatal(err)
2541	}
2542
2543	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
2544		counts, err := tx.BatchUpdate(ctx, []Statement{
2545			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`},
2546			{SQL: `some illegal statement`},
2547			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`},
2548		})
2549		if err == nil {
2550			t.Fatal("expected err, got nil")
2551		}
2552		if want := []int64{1}; !testEqual(counts, want) {
2553			t.Fatalf("got %d, want %d", counts, want)
2554		}
2555
2556		got, err := readAll(tx.Read(ctx, "Singers", AllKeys(), columns))
2557		if err != nil {
2558			t.Fatal(err)
2559		}
2560		want := [][]interface{}{
2561			{int64(1), "changed 1", "Kulthum"},
2562			{int64(2), "Eduard", "Khil"},
2563			{int64(3), "Audra", "McDonald"},
2564		}
2565		if !testEqual(got, want) {
2566			t.Errorf("\ngot %v\nwant%v", got, want)
2567		}
2568
2569		return nil
2570	})
2571	if err != nil {
2572		t.Fatal(err)
2573	}
2574}
2575
2576// Prepare initializes Cloud Spanner testing DB and clients.
2577func prepareIntegrationTest(ctx context.Context, t *testing.T, spc SessionPoolConfig, statements []string) (*Client, string, func()) {
2578	if databaseAdmin == nil {
2579		t.Skip("Integration tests skipped")
2580	}
2581	// Construct a unique test DB name.
2582	dbName := dbNameSpace.New()
2583
2584	dbPath := fmt.Sprintf("projects/%v/instances/%v/databases/%v", testProjectID, testInstanceID, dbName)
2585	// Create database and tables.
2586	op, err := databaseAdmin.CreateDatabase(ctx, &adminpb.CreateDatabaseRequest{
2587		Parent:          fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID),
2588		CreateStatement: "CREATE DATABASE " + dbName,
2589		ExtraStatements: statements,
2590	})
2591	if err != nil {
2592		t.Fatalf("cannot create testing DB %v: %v", dbPath, err)
2593	}
2594	if _, err := op.Wait(ctx); err != nil {
2595		t.Fatalf("cannot create testing DB %v: %v", dbPath, err)
2596	}
2597	client, err := createClient(ctx, dbPath, spc)
2598	if err != nil {
2599		t.Fatalf("cannot create data client on DB %v: %v", dbPath, err)
2600	}
2601	return client, dbPath, func() {
2602		client.Close()
2603	}
2604}
2605
2606func cleanupInstances() {
2607	if instanceAdmin == nil {
2608		// Integration tests skipped.
2609		return
2610	}
2611
2612	ctx := context.Background()
2613	parent := fmt.Sprintf("projects/%v", testProjectID)
2614	iter := instanceAdmin.ListInstances(ctx, &instancepb.ListInstancesRequest{
2615		Parent: parent,
2616		Filter: "name:gotest-",
2617	})
2618	expireAge := 24 * time.Hour
2619
2620	for {
2621		inst, err := iter.Next()
2622		if err == iterator.Done {
2623			break
2624		}
2625		if err != nil {
2626			panic(err)
2627		}
2628		if instanceNameSpace.Older(inst.Name, expireAge) {
2629			log.Printf("Deleting instance %s", inst.Name)
2630
2631			if err := instanceAdmin.DeleteInstance(ctx, &instancepb.DeleteInstanceRequest{Name: inst.Name}); err != nil {
2632				log.Printf("failed to delete instance %s (error %v), might need a manual removal",
2633					inst.Name, err)
2634			}
2635		}
2636	}
2637}
2638
2639func rangeReads(ctx context.Context, t *testing.T, client *Client) {
2640	checkRange := func(ks KeySet, wantNums ...int) {
2641		if msg, ok := compareRows(client.Single().Read(ctx, testTable, ks, testTableColumns), wantNums); !ok {
2642			t.Errorf("key set %+v: %s", ks, msg)
2643		}
2644	}
2645
2646	checkRange(Key{"k1"}, 1)
2647	checkRange(KeyRange{Key{"k3"}, Key{"k5"}, ClosedOpen}, 3, 4)
2648	checkRange(KeyRange{Key{"k3"}, Key{"k5"}, ClosedClosed}, 3, 4, 5)
2649	checkRange(KeyRange{Key{"k3"}, Key{"k5"}, OpenClosed}, 4, 5)
2650	checkRange(KeyRange{Key{"k3"}, Key{"k5"}, OpenOpen}, 4)
2651
2652	// Partial key specification.
2653	checkRange(KeyRange{Key{"k7"}, Key{}, ClosedClosed}, 7, 8, 9)
2654	checkRange(KeyRange{Key{"k7"}, Key{}, OpenClosed}, 8, 9)
2655	checkRange(KeyRange{Key{}, Key{"k11"}, ClosedOpen}, 0, 1, 10)
2656	checkRange(KeyRange{Key{}, Key{"k11"}, ClosedClosed}, 0, 1, 10, 11)
2657
2658	// The following produce empty ranges.
2659	// TODO(jba): Consider a multi-part key to illustrate partial key behavior.
2660	// checkRange(KeyRange{Key{"k7"}, Key{}, ClosedOpen})
2661	// checkRange(KeyRange{Key{"k7"}, Key{}, OpenOpen})
2662	// checkRange(KeyRange{Key{}, Key{"k11"}, OpenOpen})
2663	// checkRange(KeyRange{Key{}, Key{"k11"}, OpenClosed})
2664
2665	// Prefix is component-wise, not string prefix.
2666	checkRange(Key{"k1"}.AsPrefix(), 1)
2667	checkRange(KeyRange{Key{"k1"}, Key{"k2"}, ClosedOpen}, 1, 10, 11, 12, 13, 14)
2668
2669	checkRange(AllKeys(), 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14)
2670}
2671
2672func indexRangeReads(ctx context.Context, t *testing.T, client *Client) {
2673	checkRange := func(ks KeySet, wantNums ...int) {
2674		if msg, ok := compareRows(client.Single().ReadUsingIndex(ctx, testTable, testTableIndex, ks, testTableColumns),
2675			wantNums); !ok {
2676			t.Errorf("key set %+v: %s", ks, msg)
2677		}
2678	}
2679
2680	checkRange(Key{"v1"}, 1)
2681	checkRange(KeyRange{Key{"v3"}, Key{"v5"}, ClosedOpen}, 3, 4)
2682	checkRange(KeyRange{Key{"v3"}, Key{"v5"}, ClosedClosed}, 3, 4, 5)
2683	checkRange(KeyRange{Key{"v3"}, Key{"v5"}, OpenClosed}, 4, 5)
2684	checkRange(KeyRange{Key{"v3"}, Key{"v5"}, OpenOpen}, 4)
2685
2686	// // Partial key specification.
2687	checkRange(KeyRange{Key{"v7"}, Key{}, ClosedClosed}, 7, 8, 9)
2688	checkRange(KeyRange{Key{"v7"}, Key{}, OpenClosed}, 8, 9)
2689	checkRange(KeyRange{Key{}, Key{"v11"}, ClosedOpen}, 0, 1, 10)
2690	checkRange(KeyRange{Key{}, Key{"v11"}, ClosedClosed}, 0, 1, 10, 11)
2691
2692	// // The following produce empty ranges.
2693	// checkRange(KeyRange{Key{"v7"}, Key{}, ClosedOpen})
2694	// checkRange(KeyRange{Key{"v7"}, Key{}, OpenOpen})
2695	// checkRange(KeyRange{Key{}, Key{"v11"}, OpenOpen})
2696	// checkRange(KeyRange{Key{}, Key{"v11"}, OpenClosed})
2697
2698	// // Prefix is component-wise, not string prefix.
2699	checkRange(Key{"v1"}.AsPrefix(), 1)
2700	checkRange(KeyRange{Key{"v1"}, Key{"v2"}, ClosedOpen}, 1, 10, 11, 12, 13, 14)
2701	checkRange(AllKeys(), 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14)
2702
2703	// Read from an index with DESC ordering.
2704	wantNums := []int{14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0}
2705	if msg, ok := compareRows(client.Single().ReadUsingIndex(ctx, testTable, "TestTableByValueDesc", AllKeys(), testTableColumns),
2706		wantNums); !ok {
2707		t.Errorf("desc: %s", msg)
2708	}
2709}
2710
// testTableRow holds the Key and StringValue columns of one test table row.
type testTableRow struct{ Key, StringValue string }
2712
2713func compareRows(iter *RowIterator, wantNums []int) (string, bool) {
2714	rows, err := readAllTestTable(iter)
2715	if err != nil {
2716		return err.Error(), false
2717	}
2718	want := map[string]string{}
2719	for _, n := range wantNums {
2720		want[fmt.Sprintf("k%d", n)] = fmt.Sprintf("v%d", n)
2721	}
2722	got := map[string]string{}
2723	for _, r := range rows {
2724		got[r.Key] = r.StringValue
2725	}
2726	if !testEqual(got, want) {
2727		return fmt.Sprintf("got %v, want %v", got, want), false
2728	}
2729	return "", true
2730}
2731
// isNaN reports whether x holds a float64 whose value is NaN. Any value of
// another dynamic type, including nil, yields false.
func isNaN(x interface{}) bool {
	if f, isFloat := x.(float64); isFloat {
		return math.IsNaN(f)
	}
	return false
}
2739
2740// createClient creates Cloud Spanner data client.
2741func createClient(ctx context.Context, dbPath string, spc SessionPoolConfig) (client *Client, err error) {
2742	if os.Getenv("SPANNER_EMULATOR_HOST") == "" {
2743		client, err = NewClientWithConfig(ctx, dbPath, ClientConfig{
2744			SessionPoolConfig: spc,
2745		}, option.WithTokenSource(testutil.TokenSource(ctx, Scope, AdminScope)), option.WithEndpoint(endpoint))
2746	} else {
2747		// When the emulator is enabled, option.WithoutAuthentication()
2748		// will be added automatically which is incompatible with
2749		// option.WithTokenSource(testutil.TokenSource(ctx, Scope)).
2750		client, err = NewClientWithConfig(ctx, dbPath, ClientConfig{SessionPoolConfig: spc})
2751	}
2752
2753	if err != nil {
2754		return nil, fmt.Errorf("cannot create data client on DB %v: %v", dbPath, err)
2755	}
2756	return client, nil
2757}
2758
2759// populate prepares the database with some data.
2760func populate(ctx context.Context, client *Client) error {
2761	// Populate data
2762	var err error
2763	m := InsertMap("test", map[string]interface{}{
2764		"a": str1,
2765		"b": str2,
2766	})
2767	_, err = client.Apply(ctx, []*Mutation{m})
2768	return err
2769}
2770
2771func matchError(got error, wantCode codes.Code, wantMsgPart string) (string, bool) {
2772	if ErrCode(got) != wantCode || !strings.Contains(strings.ToLower(ErrDesc(got)), strings.ToLower(wantMsgPart)) {
2773		return fmt.Sprintf("got error <%v>\n"+`want <code = %q, "...%s...">`, got, wantCode, wantMsgPart), false
2774	}
2775	return "", true
2776}
2777
2778func rowToValues(r *Row) ([]interface{}, error) {
2779	var x int64
2780	var y, z string
2781	if err := r.Column(0, &x); err != nil {
2782		return nil, err
2783	}
2784	if err := r.Column(1, &y); err != nil {
2785		return nil, err
2786	}
2787	if err := r.Column(2, &z); err != nil {
2788		return nil, err
2789	}
2790	return []interface{}{x, y, z}, nil
2791}
2792
2793func readAll(iter *RowIterator) ([][]interface{}, error) {
2794	defer iter.Stop()
2795	var vals [][]interface{}
2796	for {
2797		row, err := iter.Next()
2798		if err == iterator.Done {
2799			return vals, nil
2800		}
2801		if err != nil {
2802			return nil, err
2803		}
2804		v, err := rowToValues(row)
2805		if err != nil {
2806			return nil, err
2807		}
2808		vals = append(vals, v)
2809	}
2810}
2811
2812func readAllTestTable(iter *RowIterator) ([]testTableRow, error) {
2813	defer iter.Stop()
2814	var vals []testTableRow
2815	for {
2816		row, err := iter.Next()
2817		if err == iterator.Done {
2818			return vals, nil
2819		}
2820		if err != nil {
2821			return nil, err
2822		}
2823		var ttr testTableRow
2824		if err := row.ToStruct(&ttr); err != nil {
2825			return nil, err
2826		}
2827		vals = append(vals, ttr)
2828	}
2829}
2830
// maxDuration returns the larger of the two durations a and b.
func maxDuration(a, b time.Duration) time.Duration {
	if b >= a {
		return b
	}
	return a
}
2837
// skipEmulatorTest skips the calling test when the SPANNER_EMULATOR_HOST
// environment variable is set to a non-empty value; otherwise it does nothing.
func skipEmulatorTest(t *testing.T) {
	if host := os.Getenv("SPANNER_EMULATOR_HOST"); host == "" {
		return
	}
	t.Skip("Skipping testing against the emulator.")
}
2843