1/*
2Copyright 2017 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package spanner
18
19import (
20	"context"
21	"errors"
22	"flag"
23	"fmt"
24	"log"
25	"math"
26	"os"
27	"reflect"
28	"strings"
29	"sync"
30	"testing"
31	"time"
32
33	"cloud.google.com/go/civil"
34	"cloud.google.com/go/internal/testutil"
35	"cloud.google.com/go/internal/uid"
36	database "cloud.google.com/go/spanner/admin/database/apiv1"
37	instance "cloud.google.com/go/spanner/admin/instance/apiv1"
38	"google.golang.org/api/iterator"
39	"google.golang.org/api/option"
40	adminpb "google.golang.org/genproto/googleapis/spanner/admin/database/v1"
41	instancepb "google.golang.org/genproto/googleapis/spanner/admin/instance/v1"
42	sppb "google.golang.org/genproto/googleapis/spanner/v1"
43	"google.golang.org/grpc"
44	"google.golang.org/grpc/codes"
45	"google.golang.org/grpc/status"
46)
47
var (
	// testProjectID specifies the project used for testing. It can be changed
	// by setting environment variable GCLOUD_TESTS_GOLANG_PROJECT_ID.
	testProjectID = testutil.ProjID()

	// dbNameSpace and instanceNameSpace generate short IDs with the "gotest"
	// prefix, separated by '_' and '-' respectively. instanceNameSpace yields
	// the ID of the test instance; dbNameSpace is presumably used for database
	// names (its use is outside this part of the file).
	dbNameSpace       = uid.NewSpace("gotest", &uid.Options{Sep: '_', Short: true})
	instanceNameSpace = uid.NewSpace("gotest", &uid.Options{Sep: '-', Short: true})
	// testInstanceID is the ID of the instance created by
	// initIntegrationTests for this test run.
	testInstanceID = instanceNameSpace.New()

	// Names of the table, index and columns declared in readDBStatements.
	testTable        = "TestTable"
	testTableIndex   = "TestTableByValue"
	testTableColumns = []string{"Key", "StringValue"}

	// Admin clients shared by the tests. Both are created in
	// initIntegrationTests and closed by the cleanup function it returns.
	databaseAdmin *database.DatabaseAdminClient
	instanceAdmin *instance.InstanceAdminClient

	// singerDBStatements is the DDL for the Singers, Accounts and Types
	// tables together with the SingerByName and AccountByNickname indexes.
	singerDBStatements = []string{
		`CREATE TABLE Singers (
				SingerId	INT64 NOT NULL,
				FirstName	STRING(1024),
				LastName	STRING(1024),
				SingerInfo	BYTES(MAX)
			) PRIMARY KEY (SingerId)`,
		`CREATE INDEX SingerByName ON Singers(FirstName, LastName)`,
		`CREATE TABLE Accounts (
				AccountId	INT64 NOT NULL,
				Nickname	STRING(100),
				Balance		INT64 NOT NULL,
			) PRIMARY KEY (AccountId)`,
		`CREATE INDEX AccountByNickname ON Accounts(Nickname) STORING (Balance)`,
		`CREATE TABLE Types (
				RowID		INT64 NOT NULL,
				String		STRING(MAX),
				StringArray	ARRAY<STRING(MAX)>,
				Bytes		BYTES(MAX),
				BytesArray	ARRAY<BYTES(MAX)>,
				Int64a		INT64,
				Int64Array	ARRAY<INT64>,
				Bool		BOOL,
				BoolArray	ARRAY<BOOL>,
				Float64		FLOAT64,
				Float64Array	ARRAY<FLOAT64>,
				Date		DATE,
				DateArray	ARRAY<DATE>,
				Timestamp	TIMESTAMP,
				TimestampArray	ARRAY<TIMESTAMP>,
			) PRIMARY KEY (RowID)`,
	}

	// readDBStatements is the DDL for TestTable plus an ascending and a
	// descending index on its StringValue column.
	readDBStatements = []string{
		`CREATE TABLE TestTable (
                    Key          STRING(MAX) NOT NULL,
                    StringValue  STRING(MAX)
            ) PRIMARY KEY (Key)`,
		`CREATE INDEX TestTableByValue ON TestTable(StringValue)`,
		`CREATE INDEX TestTableByValueDesc ON TestTable(StringValue DESC)`,
	}

	// simpleDBStatements is the DDL for a two-column table named test;
	// simpleDBTableColumns lists that table's columns.
	simpleDBStatements = []string{
		`CREATE TABLE test (
				a	STRING(1024),
				b	STRING(1024),
			) PRIMARY KEY (a)`,
	}
	simpleDBTableColumns = []string{"a", "b"}

	// ctsDBStatements is the DDL for a TestTable variant whose Ts column
	// allows commit timestamps.
	ctsDBStatements = []string{
		`CREATE TABLE TestTable (
		    Key  STRING(MAX) NOT NULL,
		    Ts   TIMESTAMP OPTIONS (allow_commit_timestamp = true),
	    ) PRIMARY KEY (Key)`,
	}
)
121
// NOTE(review): str1 and str2 look like sample string values for tests; their
// use is outside this part of the file — confirm before changing them.
const (
	str1 = "alice"
	str2 = "a@example.com"
)
126
127func TestMain(m *testing.M) {
128	cleanup := initIntegrationTests()
129	res := m.Run()
130	cleanup()
131	os.Exit(res)
132}
133
// grpcHeaderChecker is built from testutil.DefaultHeadersEnforcer; its call
// options are passed to the admin clients created in initIntegrationTests.
var grpcHeaderChecker = testutil.DefaultHeadersEnforcer()
135
136func initIntegrationTests() (cleanup func()) {
137	ctx := context.Background()
138	flag.Parse() // Needed for testing.Short().
139	noop := func() {}
140
141	if testing.Short() {
142		log.Println("Integration tests skipped in -short mode.")
143		return noop
144	}
145
146	if testProjectID == "" {
147		log.Println("Integration tests skipped: GCLOUD_TESTS_GOLANG_PROJECT_ID is missing")
148		return noop
149	}
150
151	opts := grpcHeaderChecker.CallOptions()
152	// Run integration tests against the given emulator. Currently, the database and
153	// instance admin clients are auto-generated, which do not support to configure
154	// SPANNER_EMULATOR_HOST.
155	emulatorAddr := os.Getenv("SPANNER_EMULATOR_HOST")
156	if emulatorAddr != "" {
157		opts = append(
158			opts,
159			option.WithEndpoint(emulatorAddr),
160			option.WithGRPCDialOption(grpc.WithInsecure()),
161			option.WithoutAuthentication(),
162		)
163	} else {
164		ts := testutil.TokenSource(ctx, AdminScope, Scope)
165		if ts == nil {
166			log.Printf("Integration test skipped: cannot get service account credential from environment variable %v", "GCLOUD_TESTS_GOLANG_KEY")
167			return noop
168		}
169
170		opts = append(opts, option.WithTokenSource(ts), option.WithEndpoint(endpoint))
171	}
172
173	var err error
174	// Create InstanceAdmin and DatabaseAdmin clients.
175	instanceAdmin, err = instance.NewInstanceAdminClient(ctx, opts...)
176	if err != nil {
177		log.Fatalf("cannot create instance databaseAdmin client: %v", err)
178	}
179	databaseAdmin, err = database.NewDatabaseAdminClient(ctx, opts...)
180	if err != nil {
181		log.Fatalf("cannot create databaseAdmin client: %v", err)
182	}
183	// Get the list of supported instance configs for the project that is used
184	// for the integration tests. The supported instance configs can differ per
185	// project. The integration tests will use the first instance config that
186	// is returned by Cloud Spanner. This will normally be the regional config
187	// that is physically the closest to where the request is coming from.
188	configIterator := instanceAdmin.ListInstanceConfigs(ctx, &instancepb.ListInstanceConfigsRequest{
189		Parent: fmt.Sprintf("projects/%s", testProjectID),
190	})
191	config, err := configIterator.Next()
192	if err != nil {
193		log.Fatalf("Cannot get any instance configurations.\nPlease make sure the Cloud Spanner API is enabled for the test project.\nGet error: %v", err)
194	}
195	// Create a test instance to use for this test run.
196	op, err := instanceAdmin.CreateInstance(ctx, &instancepb.CreateInstanceRequest{
197		Parent:     fmt.Sprintf("projects/%s", testProjectID),
198		InstanceId: testInstanceID,
199		Instance: &instancepb.Instance{
200			Config:      config.Name,
201			DisplayName: testInstanceID,
202			NodeCount:   1,
203		},
204	})
205	if err != nil {
206		log.Fatalf("could not create instance with id %s: %v", fmt.Sprintf("projects/%s/instances/%s", testProjectID, testInstanceID), err)
207	}
208	// Wait for the instance creation to finish.
209	i, err := op.Wait(ctx)
210	if err != nil {
211		log.Fatalf("waiting for instance creation to finish failed: %v", err)
212	}
213	if i.State != instancepb.Instance_READY {
214		log.Printf("instance state is not READY, it might be that the test instance will cause problems during tests. Got state %v\n", i.State)
215	}
216
217	return func() {
218		// Delete this test instance.
219		instanceName := fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID)
220		if err = instanceAdmin.DeleteInstance(ctx, &instancepb.DeleteInstanceRequest{
221			Name: instanceName,
222		}); err != nil {
223			log.Printf("failed to drop instance %s (error %v), might need a manual removal",
224				instanceName, err)
225		}
226		// Delete other test instances that may be lingering around.
227		cleanupInstances()
228		databaseAdmin.Close()
229		instanceAdmin.Close()
230	}
231}
232
233func TestIntegration_InitSessionPool(t *testing.T) {
234	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
235	defer cancel()
236	// Set up an empty testing environment.
237	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, []string{})
238	defer cleanup()
239	sp := client.idleSessions
240	sp.mu.Lock()
241	want := sp.MinOpened
242	sp.mu.Unlock()
243	var numOpened int
244loop:
245	for {
246		select {
247		case <-ctx.Done():
248			t.Fatalf("timed out, got %d session(s), want %d", numOpened, want)
249		default:
250			sp.mu.Lock()
251			numOpened = sp.idleList.Len() + sp.idleWriteList.Len()
252			sp.mu.Unlock()
253			if uint64(numOpened) == want {
254				break loop
255			}
256		}
257	}
258	// Delete all sessions in the pool on the backend and then try to execute a
259	// simple query. The 'Session not found' error should cause an automatic
260	// retry of the read-only transaction.
261	sp.mu.Lock()
262	s := sp.idleList.Front()
263	for {
264		if s == nil {
265			break
266		}
267		// This will delete the session on the backend without removing it
268		// from the pool.
269		s.Value.(*session).delete(context.Background())
270		s = s.Next()
271	}
272	sp.mu.Unlock()
273	sql := "SELECT 1, 'FOO', 'BAR'"
274	tx := client.ReadOnlyTransaction()
275	defer tx.Close()
276	iter := tx.Query(context.Background(), NewStatement(sql))
277	rows, err := readAll(iter)
278	if err != nil {
279		t.Fatalf("Unexpected error for query %q: %v", sql, err)
280	}
281	if got, want := len(rows), 1; got != want {
282		t.Fatalf("Row count mismatch for query %q\nGot: %v\nWant: %v", sql, got, want)
283	}
284	if got, want := len(rows[0]), 3; got != want {
285		t.Fatalf("Column count mismatch for query %q\nGot: %v\nWant: %v", sql, got, want)
286	}
287	if got, want := rows[0][0].(int64), int64(1); got != want {
288		t.Fatalf("Column value mismatch for query %q\nGot: %v\nWant: %v", sql, got, want)
289	}
290}
291
292// Test SingleUse transaction.
293func TestIntegration_SingleUse(t *testing.T) {
294	t.Parallel()
295
296	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
297	defer cancel()
298	// Set up testing environment.
299	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
300	defer cleanup()
301
302	writes := []struct {
303		row []interface{}
304		ts  time.Time
305	}{
306		{row: []interface{}{1, "Marc", "Foo"}},
307		{row: []interface{}{2, "Tars", "Bar"}},
308		{row: []interface{}{3, "Alpha", "Beta"}},
309		{row: []interface{}{4, "Last", "End"}},
310	}
311	// Try to write four rows through the Apply API.
312	for i, w := range writes {
313		var err error
314		m := InsertOrUpdate("Singers",
315			[]string{"SingerId", "FirstName", "LastName"},
316			w.row)
317		if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
318			t.Fatal(err)
319		}
320	}
321	// Calculate time difference between Cloud Spanner server and localhost to
322	// use to determine the exact staleness value to use.
323	timeDiff := maxDuration(time.Now().Sub(writes[0].ts), 0)
324
325	// Test reading rows with different timestamp bounds.
326	for i, test := range []struct {
327		name    string
328		want    [][]interface{}
329		tb      TimestampBound
330		checkTs func(time.Time) error
331	}{
332		{
333			name: "strong",
334			want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}},
335			tb:   StrongRead(),
336			checkTs: func(ts time.Time) error {
337				// writes[3] is the last write, all subsequent strong read
338				// should have a timestamp larger than that.
339				if ts.Before(writes[3].ts) {
340					return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[3].ts)
341				}
342				return nil
343			},
344		},
345		{
346			name: "min_read_timestamp",
347			want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}},
348			tb:   MinReadTimestamp(writes[3].ts),
349			checkTs: func(ts time.Time) error {
350				if ts.Before(writes[3].ts) {
351					return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[3].ts)
352				}
353				return nil
354			},
355		},
356		{
357			name: "max_staleness",
358			want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}},
359			tb:   MaxStaleness(time.Second),
360			checkTs: func(ts time.Time) error {
361				if ts.Before(writes[3].ts) {
362					return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[3].ts)
363				}
364				return nil
365			},
366		},
367		{
368			name: "read_timestamp",
369			want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}},
370			tb:   ReadTimestamp(writes[2].ts),
371			checkTs: func(ts time.Time) error {
372				if ts != writes[2].ts {
373					return fmt.Errorf("read got timestamp %v, want %v", ts, writes[2].ts)
374				}
375				return nil
376			},
377		},
378		{
379			name: "exact_staleness",
380			want: nil,
381			// Specify a staleness which should be already before this test.
382			tb: ExactStaleness(time.Now().Sub(writes[0].ts) + timeDiff + 30*time.Second),
383			checkTs: func(ts time.Time) error {
384				if !ts.Before(writes[0].ts) {
385					return fmt.Errorf("read got timestamp %v, want it to be earlier than %v", ts, writes[0].ts)
386				}
387				return nil
388			},
389		},
390	} {
391		t.Run(test.name, func(t *testing.T) {
392			// SingleUse.Query
393			su := client.Single().WithTimestampBound(test.tb)
394			got, err := readAll(su.Query(
395				ctx,
396				Statement{
397					"SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId IN (@id1, @id3, @id4) ORDER BY SingerId",
398					map[string]interface{}{"id1": int64(1), "id3": int64(3), "id4": int64(4)},
399				}))
400			if err != nil {
401				t.Fatalf("%d: SingleUse.Query returns error %v, want nil", i, err)
402			}
403			rts, err := su.Timestamp()
404			if err != nil {
405				t.Fatalf("%d: SingleUse.Query doesn't return a timestamp, error: %v", i, err)
406			}
407			if err := test.checkTs(rts); err != nil {
408				t.Fatalf("%d: SingleUse.Query doesn't return expected timestamp: %v", i, err)
409			}
410			if !testEqual(got, test.want) {
411				t.Fatalf("%d: got unexpected result from SingleUse.Query: %v, want %v", i, got, test.want)
412			}
413			// SingleUse.Read
414			su = client.Single().WithTimestampBound(test.tb)
415			got, err = readAll(su.Read(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}), []string{"SingerId", "FirstName", "LastName"}))
416			if err != nil {
417				t.Fatalf("%d: SingleUse.Read returns error %v, want nil", i, err)
418			}
419			rts, err = su.Timestamp()
420			if err != nil {
421				t.Fatalf("%d: SingleUse.Read doesn't return a timestamp, error: %v", i, err)
422			}
423			if err := test.checkTs(rts); err != nil {
424				t.Fatalf("%d: SingleUse.Read doesn't return expected timestamp: %v", i, err)
425			}
426			if !testEqual(got, test.want) {
427				t.Fatalf("%d: got unexpected result from SingleUse.Read: %v, want %v", i, got, test.want)
428			}
429			// SingleUse.ReadRow
430			got = nil
431			for _, k := range []Key{{1}, {3}, {4}} {
432				su = client.Single().WithTimestampBound(test.tb)
433				r, err := su.ReadRow(ctx, "Singers", k, []string{"SingerId", "FirstName", "LastName"})
434				if err != nil {
435					continue
436				}
437				v, err := rowToValues(r)
438				if err != nil {
439					continue
440				}
441				got = append(got, v)
442				rts, err = su.Timestamp()
443				if err != nil {
444					t.Fatalf("%d: SingleUse.ReadRow(%v) doesn't return a timestamp, error: %v", i, k, err)
445				}
446				if err := test.checkTs(rts); err != nil {
447					t.Fatalf("%d: SingleUse.ReadRow(%v) doesn't return expected timestamp: %v", i, k, err)
448				}
449			}
450			if !testEqual(got, test.want) {
451				t.Fatalf("%d: got unexpected results from SingleUse.ReadRow: %v, want %v", i, got, test.want)
452			}
453			// SingleUse.ReadUsingIndex
454			su = client.Single().WithTimestampBound(test.tb)
455			got, err = readAll(su.ReadUsingIndex(ctx, "Singers", "SingerByName", KeySets(Key{"Marc", "Foo"}, Key{"Alpha", "Beta"}, Key{"Last", "End"}), []string{"SingerId", "FirstName", "LastName"}))
456			if err != nil {
457				t.Fatalf("%d: SingleUse.ReadUsingIndex returns error %v, want nil", i, err)
458			}
459			// The results from ReadUsingIndex is sorted by the index rather than primary key.
460			if len(got) != len(test.want) {
461				t.Fatalf("%d: got unexpected result from SingleUse.ReadUsingIndex: %v, want %v", i, got, test.want)
462			}
463			for j, g := range got {
464				if j > 0 {
465					prev := got[j-1][1].(string) + got[j-1][2].(string)
466					curr := got[j][1].(string) + got[j][2].(string)
467					if strings.Compare(prev, curr) > 0 {
468						t.Fatalf("%d: SingleUse.ReadUsingIndex fails to order rows by index keys, %v should be after %v", i, got[j-1], got[j])
469					}
470				}
471				found := false
472				for _, w := range test.want {
473					if testEqual(g, w) {
474						found = true
475					}
476				}
477				if !found {
478					t.Fatalf("%d: got unexpected result from SingleUse.ReadUsingIndex: %v, want %v", i, got, test.want)
479					break
480				}
481			}
482			rts, err = su.Timestamp()
483			if err != nil {
484				t.Fatalf("%d: SingleUse.ReadUsingIndex doesn't return a timestamp, error: %v", i, err)
485			}
486			if err := test.checkTs(rts); err != nil {
487				t.Fatalf("%d: SingleUse.ReadUsingIndex doesn't return expected timestamp: %v", i, err)
488			}
489			// SingleUse.ReadRowUsingIndex
490			got = nil
491			for _, k := range []Key{{"Marc", "Foo"}, {"Alpha", "Beta"}, {"Last", "End"}} {
492				su = client.Single().WithTimestampBound(test.tb)
493				r, err := su.ReadRowUsingIndex(ctx, "Singers", "SingerByName", k, []string{"SingerId", "FirstName", "LastName"})
494				if err != nil {
495					continue
496				}
497				v, err := rowToValues(r)
498				if err != nil {
499					continue
500				}
501				got = append(got, v)
502				rts, err = su.Timestamp()
503				if err != nil {
504					t.Fatalf("%d: SingleUse.ReadRowUsingIndex(%v) doesn't return a timestamp, error: %v", i, k, err)
505				}
506				if err := test.checkTs(rts); err != nil {
507					t.Fatalf("%d: SingleUse.ReadRowUsingIndex(%v) doesn't return expected timestamp: %v", i, k, err)
508				}
509			}
510			if !testEqual(got, test.want) {
511				t.Fatalf("%d: got unexpected results from SingleUse.ReadRowUsingIndex: %v, want %v", i, got, test.want)
512			}
513		})
514	}
515}
516
517// Test resource-based routing enabled.
518func TestIntegration_SingleUse_WithResourceBasedRouting(t *testing.T) {
519	os.Setenv("GOOGLE_CLOUD_SPANNER_ENABLE_RESOURCE_BASED_ROUTING", "true")
520	defer os.Setenv("GOOGLE_CLOUD_SPANNER_ENABLE_RESOURCE_BASED_ROUTING", "")
521
522	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
523	defer cancel()
524	// Set up testing environment.
525	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
526	defer cleanup()
527
528	writes := []struct {
529		row []interface{}
530		ts  time.Time
531	}{
532		{row: []interface{}{1, "Marc", "Foo"}},
533		{row: []interface{}{2, "Tars", "Bar"}},
534		{row: []interface{}{3, "Alpha", "Beta"}},
535		{row: []interface{}{4, "Last", "End"}},
536	}
537	// Try to write four rows through the Apply API.
538	for i, w := range writes {
539		var err error
540		m := InsertOrUpdate("Singers",
541			[]string{"SingerId", "FirstName", "LastName"},
542			w.row)
543		if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
544			t.Fatal(err)
545		}
546	}
547
548	row, err := client.Single().ReadRow(ctx, "Singers", Key{3}, []string{"FirstName"})
549	if err != nil {
550		t.Errorf("SingleUse.ReadRow returns error %v, want nil", err)
551	}
552	var got string
553	if err := row.Column(0, &got); err != nil {
554		t.Errorf("row.Column returns error %v, want nil", err)
555	}
556	if want := "Alpha"; got != want {
557		t.Errorf("got %q, want %q", got, want)
558	}
559}
560
561// Test custom query options provided on query-level configuration.
562func TestIntegration_SingleUse_WithQueryOptions(t *testing.T) {
563	skipEmulatorTest(t)
564	t.Parallel()
565
566	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
567	defer cancel()
568	// Set up testing environment.
569	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
570	defer cleanup()
571
572	writes := []struct {
573		row []interface{}
574		ts  time.Time
575	}{
576		{row: []interface{}{1, "Marc", "Foo"}},
577		{row: []interface{}{2, "Tars", "Bar"}},
578		{row: []interface{}{3, "Alpha", "Beta"}},
579		{row: []interface{}{4, "Last", "End"}},
580	}
581	// Try to write four rows through the Apply API.
582	for i, w := range writes {
583		var err error
584		m := InsertOrUpdate("Singers",
585			[]string{"SingerId", "FirstName", "LastName"},
586			w.row)
587		if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
588			t.Fatal(err)
589		}
590	}
591	qo := QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1"}}
592	got, err := readAll(client.Single().QueryWithOptions(ctx, Statement{
593		"SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId IN (@id1, @id3, @id4)",
594		map[string]interface{}{"id1": int64(1), "id3": int64(3), "id4": int64(4)},
595	}, qo))
596
597	if err != nil {
598		t.Errorf("ReadOnlyTransaction.QueryWithOptions returns error %v, want nil", err)
599	}
600
601	want := [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}}
602	if !testEqual(got, want) {
603		t.Errorf("got unexpected result from ReadOnlyTransaction.QueryWithOptions: %v, want %v", got, want)
604	}
605}
606
607func TestIntegration_SingleUse_ReadingWithLimit(t *testing.T) {
608	t.Parallel()
609
610	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
611	defer cancel()
612	// Set up testing environment.
613	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
614	defer cleanup()
615
616	writes := []struct {
617		row []interface{}
618		ts  time.Time
619	}{
620		{row: []interface{}{1, "Marc", "Foo"}},
621		{row: []interface{}{2, "Tars", "Bar"}},
622		{row: []interface{}{3, "Alpha", "Beta"}},
623		{row: []interface{}{4, "Last", "End"}},
624	}
625	// Try to write four rows through the Apply API.
626	for i, w := range writes {
627		var err error
628		m := InsertOrUpdate("Singers",
629			[]string{"SingerId", "FirstName", "LastName"},
630			w.row)
631		if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
632			t.Fatal(err)
633		}
634	}
635
636	su := client.Single()
637	const limit = 1
638	gotRows, err := readAll(su.ReadWithOptions(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}),
639		[]string{"SingerId", "FirstName", "LastName"}, &ReadOptions{Limit: limit}))
640	if err != nil {
641		t.Errorf("SingleUse.ReadWithOptions returns error %v, want nil", err)
642	}
643	if got, want := len(gotRows), limit; got != want {
644		t.Errorf("got %d, want %d", got, want)
645	}
646}
647
648// Test ReadOnlyTransaction. The testsuite is mostly like SingleUse, except it
649// also tests for a single timestamp across multiple reads.
650func TestIntegration_ReadOnlyTransaction(t *testing.T) {
651	t.Parallel()
652
653	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
654	defer cancel()
655	// Set up testing environment.
656	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
657	defer cleanup()
658
659	writes := []struct {
660		row []interface{}
661		ts  time.Time
662	}{
663		{row: []interface{}{1, "Marc", "Foo"}},
664		{row: []interface{}{2, "Tars", "Bar"}},
665		{row: []interface{}{3, "Alpha", "Beta"}},
666		{row: []interface{}{4, "Last", "End"}},
667	}
668	// Try to write four rows through the Apply API.
669	for i, w := range writes {
670		var err error
671		m := InsertOrUpdate("Singers",
672			[]string{"SingerId", "FirstName", "LastName"},
673			w.row)
674		if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
675			t.Fatal(err)
676		}
677	}
678
679	// For testing timestamp bound staleness.
680	<-time.After(time.Second)
681
682	// Test reading rows with different timestamp bounds.
683	for i, test := range []struct {
684		want    [][]interface{}
685		tb      TimestampBound
686		checkTs func(time.Time) error
687	}{
688		// Note: min_read_timestamp and max_staleness are not supported by
689		// ReadOnlyTransaction. See API document for more details.
690		{
691			// strong
692			[][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}},
693			StrongRead(),
694			func(ts time.Time) error {
695				if ts.Before(writes[3].ts) {
696					return fmt.Errorf("read got timestamp %v, want it to be no later than %v", ts, writes[3].ts)
697				}
698				return nil
699			},
700		},
701		{
702			// read_timestamp
703			[][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}},
704			ReadTimestamp(writes[2].ts),
705			func(ts time.Time) error {
706				if ts != writes[2].ts {
707					return fmt.Errorf("read got timestamp %v, expect %v", ts, writes[2].ts)
708				}
709				return nil
710			},
711		},
712		{
713			// exact_staleness
714			nil,
715			// Specify a staleness which should be already before this test
716			// because context timeout is set to be 10s.
717			ExactStaleness(11 * time.Second),
718			func(ts time.Time) error {
719				if ts.After(writes[0].ts) {
720					return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[0].ts)
721				}
722				return nil
723			},
724		},
725	} {
726		// ReadOnlyTransaction.Query
727		ro := client.ReadOnlyTransaction().WithTimestampBound(test.tb)
728		got, err := readAll(ro.Query(
729			ctx,
730			Statement{
731				"SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId IN (@id1, @id3, @id4) ORDER BY SingerId",
732				map[string]interface{}{"id1": int64(1), "id3": int64(3), "id4": int64(4)},
733			}))
734		if err != nil {
735			t.Errorf("%d: ReadOnlyTransaction.Query returns error %v, want nil", i, err)
736		}
737		if !testEqual(got, test.want) {
738			t.Errorf("%d: got unexpected result from ReadOnlyTransaction.Query: %v, want %v", i, got, test.want)
739		}
740		rts, err := ro.Timestamp()
741		if err != nil {
742			t.Errorf("%d: ReadOnlyTransaction.Query doesn't return a timestamp, error: %v", i, err)
743		}
744		if err := test.checkTs(rts); err != nil {
745			t.Errorf("%d: ReadOnlyTransaction.Query doesn't return expected timestamp: %v", i, err)
746		}
747		roTs := rts
748		// ReadOnlyTransaction.Read
749		got, err = readAll(ro.Read(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}), []string{"SingerId", "FirstName", "LastName"}))
750		if err != nil {
751			t.Errorf("%d: ReadOnlyTransaction.Read returns error %v, want nil", i, err)
752		}
753		if !testEqual(got, test.want) {
754			t.Errorf("%d: got unexpected result from ReadOnlyTransaction.Read: %v, want %v", i, got, test.want)
755		}
756		rts, err = ro.Timestamp()
757		if err != nil {
758			t.Errorf("%d: ReadOnlyTransaction.Read doesn't return a timestamp, error: %v", i, err)
759		}
760		if err := test.checkTs(rts); err != nil {
761			t.Errorf("%d: ReadOnlyTransaction.Read doesn't return expected timestamp: %v", i, err)
762		}
763		if roTs != rts {
764			t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts)
765		}
766		// ReadOnlyTransaction.ReadRow
767		got = nil
768		for _, k := range []Key{{1}, {3}, {4}} {
769			r, err := ro.ReadRow(ctx, "Singers", k, []string{"SingerId", "FirstName", "LastName"})
770			if err != nil {
771				continue
772			}
773			v, err := rowToValues(r)
774			if err != nil {
775				continue
776			}
777			got = append(got, v)
778			rts, err = ro.Timestamp()
779			if err != nil {
780				t.Errorf("%d: ReadOnlyTransaction.ReadRow(%v) doesn't return a timestamp, error: %v", i, k, err)
781			}
782			if err := test.checkTs(rts); err != nil {
783				t.Errorf("%d: ReadOnlyTransaction.ReadRow(%v) doesn't return expected timestamp: %v", i, k, err)
784			}
785			if roTs != rts {
786				t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts)
787			}
788		}
789		if !testEqual(got, test.want) {
790			t.Errorf("%d: got unexpected results from ReadOnlyTransaction.ReadRow: %v, want %v", i, got, test.want)
791		}
792		// SingleUse.ReadUsingIndex
793		got, err = readAll(ro.ReadUsingIndex(ctx, "Singers", "SingerByName", KeySets(Key{"Marc", "Foo"}, Key{"Alpha", "Beta"}, Key{"Last", "End"}), []string{"SingerId", "FirstName", "LastName"}))
794		if err != nil {
795			t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex returns error %v, want nil", i, err)
796		}
797		// The results from ReadUsingIndex is sorted by the index rather than
798		// primary key.
799		if len(got) != len(test.want) {
800			t.Errorf("%d: got unexpected result from ReadOnlyTransaction.ReadUsingIndex: %v, want %v", i, got, test.want)
801		}
802		for j, g := range got {
803			if j > 0 {
804				prev := got[j-1][1].(string) + got[j-1][2].(string)
805				curr := got[j][1].(string) + got[j][2].(string)
806				if strings.Compare(prev, curr) > 0 {
807					t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex fails to order rows by index keys, %v should be after %v", i, got[j-1], got[j])
808				}
809			}
810			found := false
811			for _, w := range test.want {
812				if testEqual(g, w) {
813					found = true
814				}
815			}
816			if !found {
817				t.Errorf("%d: got unexpected result from ReadOnlyTransaction.ReadUsingIndex: %v, want %v", i, got, test.want)
818				break
819			}
820		}
821		rts, err = ro.Timestamp()
822		if err != nil {
823			t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex doesn't return a timestamp, error: %v", i, err)
824		}
825		if err := test.checkTs(rts); err != nil {
826			t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex doesn't return expected timestamp: %v", i, err)
827		}
828		if roTs != rts {
829			t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts)
830		}
831		// ReadOnlyTransaction.ReadRowUsingIndex
832		got = nil
833		for _, k := range []Key{{"Marc", "Foo"}, {"Alpha", "Beta"}, {"Last", "End"}} {
834			r, err := ro.ReadRowUsingIndex(ctx, "Singers", "SingerByName", k, []string{"SingerId", "FirstName", "LastName"})
835			if err != nil {
836				continue
837			}
838			v, err := rowToValues(r)
839			if err != nil {
840				continue
841			}
842			got = append(got, v)
843			rts, err = ro.Timestamp()
844			if err != nil {
845				t.Errorf("%d: ReadOnlyTransaction.ReadRowUsingIndex(%v) doesn't return a timestamp, error: %v", i, k, err)
846			}
847			if err := test.checkTs(rts); err != nil {
848				t.Errorf("%d: ReadOnlyTransaction.ReadRowUsingIndex(%v) doesn't return expected timestamp: %v", i, k, err)
849			}
850			if roTs != rts {
851				t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts)
852			}
853		}
854		if !testEqual(got, test.want) {
855			t.Errorf("%d: got unexpected results from ReadOnlyTransaction.ReadRowUsingIndex: %v, want %v", i, got, test.want)
856		}
857		ro.Close()
858	}
859}
860
861// Test ReadOnlyTransaction with different timestamp bound when there's an
862// update at the same time.
863func TestIntegration_UpdateDuringRead(t *testing.T) {
864	t.Parallel()
865
866	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
867	defer cancel()
868	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
869	defer cleanup()
870
871	for i, tb := range []TimestampBound{
872		StrongRead(),
873		ReadTimestamp(time.Now().Add(-time.Minute * 30)), // version GC is 1 hour
874		ExactStaleness(time.Minute * 30),
875	} {
876		ro := client.ReadOnlyTransaction().WithTimestampBound(tb)
877		_, err := ro.ReadRow(ctx, "Singers", Key{i}, []string{"SingerId"})
878		if ErrCode(err) != codes.NotFound {
879			t.Errorf("%d: ReadOnlyTransaction.ReadRow before write returns error: %v, want NotFound", i, err)
880		}
881
882		m := InsertOrUpdate("Singers", []string{"SingerId"}, []interface{}{i})
883		if _, err := client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
884			t.Fatal(err)
885		}
886
887		_, err = ro.ReadRow(ctx, "Singers", Key{i}, []string{"SingerId"})
888		if ErrCode(err) != codes.NotFound {
889			t.Errorf("%d: ReadOnlyTransaction.ReadRow after write returns error: %v, want NotFound", i, err)
890		}
891	}
892}
893
894// Test ReadWriteTransaction.
895func TestIntegration_ReadWriteTransaction(t *testing.T) {
896	t.Parallel()
897
898	// Give a longer deadline because of transaction backoffs.
899	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
900	defer cancel()
901	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
902	defer cleanup()
903
904	// Set up two accounts
905	accounts := []*Mutation{
906		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
907		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
908	}
909	if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil {
910		t.Fatal(err)
911	}
912	wg := sync.WaitGroup{}
913
914	readBalance := func(iter *RowIterator) (int64, error) {
915		defer iter.Stop()
916		var bal int64
917		for {
918			row, err := iter.Next()
919			if err == iterator.Done {
920				return bal, nil
921			}
922			if err != nil {
923				return 0, err
924			}
925			if err := row.Column(0, &bal); err != nil {
926				return 0, err
927			}
928		}
929	}
930
931	for i := 0; i < 20; i++ {
932		wg.Add(1)
933		go func(iter int) {
934			defer wg.Done()
935			_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
936				// Query Foo's balance and Bar's balance.
937				bf, e := readBalance(tx.Query(ctx,
938					Statement{"SELECT Balance FROM Accounts WHERE AccountId = @id", map[string]interface{}{"id": int64(1)}}))
939				if e != nil {
940					return e
941				}
942				bb, e := readBalance(tx.Read(ctx, "Accounts", KeySets(Key{int64(2)}), []string{"Balance"}))
943				if e != nil {
944					return e
945				}
946				if bf <= 0 {
947					return nil
948				}
949				bf--
950				bb++
951				return tx.BufferWrite([]*Mutation{
952					Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), bf}),
953					Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), bb}),
954				})
955			})
956			if err != nil {
957				t.Errorf("%d: failed to execute transaction: %v", iter, err)
958			}
959		}(i)
960	}
961	// Because of context timeout, all goroutines will eventually return.
962	wg.Wait()
963	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
964		var bf, bb int64
965		r, e := tx.ReadRow(ctx, "Accounts", Key{int64(1)}, []string{"Balance"})
966		if e != nil {
967			return e
968		}
969		if ce := r.Column(0, &bf); ce != nil {
970			return ce
971		}
972		bb, e = readBalance(tx.ReadUsingIndex(ctx, "Accounts", "AccountByNickname", KeySets(Key{"Bar"}), []string{"Balance"}))
973		if e != nil {
974			return e
975		}
976		if bf != 30 || bb != 21 {
977			t.Errorf("Foo's balance is now %v and Bar's balance is now %v, want %v and %v", bf, bb, 30, 21)
978		}
979		return nil
980	})
981	if err != nil {
982		t.Errorf("failed to check balances: %v", err)
983	}
984}
985
986func TestIntegration_Reads(t *testing.T) {
987	t.Parallel()
988
989	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
990	defer cancel()
991	// Set up testing environment.
992	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, readDBStatements)
993	defer cleanup()
994
995	// Includes k0..k14. Strings sort lexically, eg "k1" < "k10" < "k2".
996	var ms []*Mutation
997	for i := 0; i < 15; i++ {
998		ms = append(ms, InsertOrUpdate(testTable,
999			testTableColumns,
1000			[]interface{}{fmt.Sprintf("k%d", i), fmt.Sprintf("v%d", i)}))
1001	}
1002	// Don't use ApplyAtLeastOnce, so we can test the other code path.
1003	if _, err := client.Apply(ctx, ms); err != nil {
1004		t.Fatal(err)
1005	}
1006
1007	// Empty read.
1008	rows, err := readAllTestTable(client.Single().Read(ctx, testTable,
1009		KeyRange{Start: Key{"k99"}, End: Key{"z"}}, testTableColumns))
1010	if err != nil {
1011		t.Fatal(err)
1012	}
1013	if got, want := len(rows), 0; got != want {
1014		t.Errorf("got %d, want %d", got, want)
1015	}
1016
1017	// Index empty read.
1018	rows, err = readAllTestTable(client.Single().ReadUsingIndex(ctx, testTable, testTableIndex,
1019		KeyRange{Start: Key{"v99"}, End: Key{"z"}}, testTableColumns))
1020	if err != nil {
1021		t.Fatal(err)
1022	}
1023	if got, want := len(rows), 0; got != want {
1024		t.Errorf("got %d, want %d", got, want)
1025	}
1026
1027	// Point read.
1028	row, err := client.Single().ReadRow(ctx, testTable, Key{"k1"}, testTableColumns)
1029	if err != nil {
1030		t.Fatal(err)
1031	}
1032	var got testTableRow
1033	if err := row.ToStruct(&got); err != nil {
1034		t.Fatal(err)
1035	}
1036	if want := (testTableRow{"k1", "v1"}); got != want {
1037		t.Errorf("got %v, want %v", got, want)
1038	}
1039
1040	// Point read not found.
1041	_, err = client.Single().ReadRow(ctx, testTable, Key{"k999"}, testTableColumns)
1042	if ErrCode(err) != codes.NotFound {
1043		t.Fatalf("got %v, want NotFound", err)
1044	}
1045
1046	// Index point read.
1047	rowIndex, err := client.Single().ReadRowUsingIndex(ctx, testTable, testTableIndex, Key{"v1"}, testTableColumns)
1048	if err != nil {
1049		t.Fatal(err)
1050	}
1051	var gotIndex testTableRow
1052	if err := rowIndex.ToStruct(&gotIndex); err != nil {
1053		t.Fatal(err)
1054	}
1055	if wantIndex := (testTableRow{"k1", "v1"}); gotIndex != wantIndex {
1056		t.Errorf("got %v, want %v", gotIndex, wantIndex)
1057	}
1058	// Index point read not found.
1059	_, err = client.Single().ReadRowUsingIndex(ctx, testTable, testTableIndex, Key{"v999"}, testTableColumns)
1060	if ErrCode(err) != codes.NotFound {
1061		t.Fatalf("got %v, want NotFound", err)
1062	}
1063	rangeReads(ctx, t, client)
1064	indexRangeReads(ctx, t, client)
1065}
1066
1067func TestIntegration_EarlyTimestamp(t *testing.T) {
1068	t.Parallel()
1069
1070	// Test that we can get the timestamp from a read-only transaction as
1071	// soon as we have read at least one row.
1072	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1073	defer cancel()
1074	// Set up testing environment.
1075	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, readDBStatements)
1076	defer cleanup()
1077
1078	var ms []*Mutation
1079	for i := 0; i < 3; i++ {
1080		ms = append(ms, InsertOrUpdate(testTable,
1081			testTableColumns,
1082			[]interface{}{fmt.Sprintf("k%d", i), fmt.Sprintf("v%d", i)}))
1083	}
1084	if _, err := client.Apply(ctx, ms, ApplyAtLeastOnce()); err != nil {
1085		t.Fatal(err)
1086	}
1087
1088	txn := client.Single()
1089	iter := txn.Read(ctx, testTable, AllKeys(), testTableColumns)
1090	defer iter.Stop()
1091	// In single-use transaction, we should get an error before reading anything.
1092	if _, err := txn.Timestamp(); err == nil {
1093		t.Error("wanted error, got nil")
1094	}
1095	// After reading one row, the timestamp should be available.
1096	_, err := iter.Next()
1097	if err != nil {
1098		t.Fatal(err)
1099	}
1100	if _, err := txn.Timestamp(); err != nil {
1101		t.Errorf("got %v, want nil", err)
1102	}
1103
1104	txn = client.ReadOnlyTransaction()
1105	defer txn.Close()
1106	iter = txn.Read(ctx, testTable, AllKeys(), testTableColumns)
1107	defer iter.Stop()
1108	// In an ordinary read-only transaction, the timestamp should be
1109	// available immediately.
1110	if _, err := txn.Timestamp(); err != nil {
1111		t.Errorf("got %v, want nil", err)
1112	}
1113}
1114
1115func TestIntegration_NestedTransaction(t *testing.T) {
1116	t.Parallel()
1117
1118	// You cannot use a transaction from inside a read-write transaction.
1119	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1120	defer cancel()
1121	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
1122	defer cleanup()
1123
1124	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
1125		_, err := client.ReadWriteTransaction(ctx,
1126			func(context.Context, *ReadWriteTransaction) error { return nil })
1127		if ErrCode(err) != codes.FailedPrecondition {
1128			t.Fatalf("got %v, want FailedPrecondition", err)
1129		}
1130		_, err = client.Single().ReadRow(ctx, "Singers", Key{1}, []string{"SingerId"})
1131		if ErrCode(err) != codes.FailedPrecondition {
1132			t.Fatalf("got %v, want FailedPrecondition", err)
1133		}
1134		rot := client.ReadOnlyTransaction()
1135		defer rot.Close()
1136		_, err = rot.ReadRow(ctx, "Singers", Key{1}, []string{"SingerId"})
1137		if ErrCode(err) != codes.FailedPrecondition {
1138			t.Fatalf("got %v, want FailedPrecondition", err)
1139		}
1140		return nil
1141	})
1142	if err != nil {
1143		t.Fatal(err)
1144	}
1145}
1146
1147// Test client recovery on database recreation.
1148func TestIntegration_DbRemovalRecovery(t *testing.T) {
1149	t.Parallel()
1150
1151	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1152	defer cancel()
1153	// Create a client with MinOpened=0 to prevent the session pool maintainer
1154	// from repeatedly trying to create sessions for the invalid database.
1155	client, dbPath, cleanup := prepareIntegrationTest(ctx, t, SessionPoolConfig{}, singerDBStatements)
1156	defer cleanup()
1157
1158	// Drop the testing database.
1159	if err := databaseAdmin.DropDatabase(ctx, &adminpb.DropDatabaseRequest{Database: dbPath}); err != nil {
1160		t.Fatalf("failed to drop testing database %v: %v", dbPath, err)
1161	}
1162
1163	// Now, send the query.
1164	iter := client.Single().Query(ctx, Statement{SQL: "SELECT SingerId FROM Singers"})
1165	defer iter.Stop()
1166	if _, err := iter.Next(); err == nil {
1167		t.Errorf("client sends query to removed database successfully, want it to fail")
1168	}
1169
1170	// Recreate database and table.
1171	dbName := dbPath[strings.LastIndex(dbPath, "/")+1:]
1172	op, err := databaseAdmin.CreateDatabase(ctx, &adminpb.CreateDatabaseRequest{
1173		Parent:          fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID),
1174		CreateStatement: "CREATE DATABASE " + dbName,
1175		ExtraStatements: []string{
1176			`CREATE TABLE Singers (
1177				SingerId        INT64 NOT NULL,
1178				FirstName       STRING(1024),
1179				LastName        STRING(1024),
1180				SingerInfo      BYTES(MAX)
1181			) PRIMARY KEY (SingerId)`,
1182		},
1183	})
1184	if err != nil {
1185		t.Fatalf("cannot recreate testing DB %v: %v", dbPath, err)
1186	}
1187	if _, err := op.Wait(ctx); err != nil {
1188		t.Fatalf("cannot recreate testing DB %v: %v", dbPath, err)
1189	}
1190
1191	// Now, send the query again.
1192	iter = client.Single().Query(ctx, Statement{SQL: "SELECT SingerId FROM Singers"})
1193	defer iter.Stop()
1194	_, err = iter.Next()
1195	if err != nil && err != iterator.Done {
1196		t.Errorf("failed to send query to database %v: %v", dbPath, err)
1197	}
1198}
1199
// Test encoding/decoding non-struct Cloud Spanner types.
//
// Every table entry is written to its own row of the "Types" table (the row
// ID is the entry's index in the table) and then read back and decoded into a
// freshly allocated value of the expected Go type.
func TestIntegration_BasicTypes(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
	defer cleanup()

	// Parse errors are deliberately ignored below: every input is a fixed
	// literal written in the layout being parsed.
	t1, _ := time.Parse(time.RFC3339Nano, "2016-11-15T15:04:05.999999999Z")
	// Boundaries
	t2, _ := time.Parse(time.RFC3339Nano, "0001-01-01T00:00:00.000000000Z")
	t3, _ := time.Parse(time.RFC3339Nano, "9999-12-31T23:59:59.999999999Z")
	d1, _ := civil.ParseDate("2016-11-15")
	// Boundaries
	d2, _ := civil.ParseDate("0001-01-01")
	d3, _ := civil.ParseDate("9999-12-31")

	// col is the column written, val the Go value written to it, and want the
	// value expected when reading it back. A nil want means the value is
	// expected to round-trip unchanged, i.e. val itself is used as want.
	tests := []struct {
		col  string
		val  interface{}
		want interface{}
	}{
		{col: "String", val: ""},
		{col: "String", val: "", want: NullString{"", true}},
		{col: "String", val: "foo"},
		{col: "String", val: "foo", want: NullString{"foo", true}},
		{col: "String", val: NullString{"bar", true}, want: "bar"},
		{col: "String", val: NullString{"bar", false}, want: NullString{"", false}},
		{col: "String", val: nil, want: NullString{}},
		{col: "StringArray", val: []string(nil), want: []NullString(nil)},
		{col: "StringArray", val: []string{}, want: []NullString{}},
		{col: "StringArray", val: []string{"foo", "bar"}, want: []NullString{{"foo", true}, {"bar", true}}},
		{col: "StringArray", val: []NullString(nil)},
		{col: "StringArray", val: []NullString{}},
		{col: "StringArray", val: []NullString{{"foo", true}, {}}},
		{col: "Bytes", val: []byte{}},
		{col: "Bytes", val: []byte{1, 2, 3}},
		{col: "Bytes", val: []byte(nil)},
		{col: "BytesArray", val: [][]byte(nil)},
		{col: "BytesArray", val: [][]byte{}},
		{col: "BytesArray", val: [][]byte{{1}, {2, 3}}},
		{col: "Int64a", val: 0, want: int64(0)},
		{col: "Int64a", val: -1, want: int64(-1)},
		{col: "Int64a", val: 2, want: int64(2)},
		{col: "Int64a", val: int64(3)},
		{col: "Int64a", val: 4, want: NullInt64{4, true}},
		{col: "Int64a", val: NullInt64{5, true}, want: int64(5)},
		{col: "Int64a", val: NullInt64{6, true}, want: int64(6)},
		{col: "Int64a", val: NullInt64{7, false}, want: NullInt64{0, false}},
		{col: "Int64a", val: nil, want: NullInt64{}},
		{col: "Int64Array", val: []int(nil), want: []NullInt64(nil)},
		{col: "Int64Array", val: []int{}, want: []NullInt64{}},
		{col: "Int64Array", val: []int{1, 2}, want: []NullInt64{{1, true}, {2, true}}},
		{col: "Int64Array", val: []int64(nil), want: []NullInt64(nil)},
		{col: "Int64Array", val: []int64{}, want: []NullInt64{}},
		{col: "Int64Array", val: []int64{1, 2}, want: []NullInt64{{1, true}, {2, true}}},
		{col: "Int64Array", val: []NullInt64(nil)},
		{col: "Int64Array", val: []NullInt64{}},
		{col: "Int64Array", val: []NullInt64{{1, true}, {}}},
		{col: "Bool", val: false},
		{col: "Bool", val: true},
		{col: "Bool", val: false, want: NullBool{false, true}},
		{col: "Bool", val: true, want: NullBool{true, true}},
		{col: "Bool", val: NullBool{true, true}},
		{col: "Bool", val: NullBool{false, false}},
		{col: "Bool", val: nil, want: NullBool{}},
		{col: "BoolArray", val: []bool(nil), want: []NullBool(nil)},
		{col: "BoolArray", val: []bool{}, want: []NullBool{}},
		{col: "BoolArray", val: []bool{true, false}, want: []NullBool{{true, true}, {false, true}}},
		{col: "BoolArray", val: []NullBool(nil)},
		{col: "BoolArray", val: []NullBool{}},
		{col: "BoolArray", val: []NullBool{{false, true}, {true, true}, {}}},
		{col: "Float64", val: 0.0},
		{col: "Float64", val: 3.14},
		{col: "Float64", val: math.NaN()},
		{col: "Float64", val: math.Inf(1)},
		{col: "Float64", val: math.Inf(-1)},
		{col: "Float64", val: 2.78, want: NullFloat64{2.78, true}},
		{col: "Float64", val: NullFloat64{2.71, true}, want: 2.71},
		{col: "Float64", val: NullFloat64{1.41, true}, want: NullFloat64{1.41, true}},
		{col: "Float64", val: NullFloat64{0, false}},
		{col: "Float64", val: nil, want: NullFloat64{}},
		{col: "Float64Array", val: []float64(nil), want: []NullFloat64(nil)},
		{col: "Float64Array", val: []float64{}, want: []NullFloat64{}},
		{col: "Float64Array", val: []float64{2.72, 3.14, math.Inf(1)}, want: []NullFloat64{{2.72, true}, {3.14, true}, {math.Inf(1), true}}},
		{col: "Float64Array", val: []NullFloat64(nil)},
		{col: "Float64Array", val: []NullFloat64{}},
		{col: "Float64Array", val: []NullFloat64{{2.72, true}, {math.Inf(1), true}, {}}},
		{col: "Date", val: d1},
		{col: "Date", val: d1, want: NullDate{d1, true}},
		{col: "Date", val: NullDate{d1, true}},
		{col: "Date", val: NullDate{d1, true}, want: d1},
		{col: "Date", val: NullDate{civil.Date{}, false}},
		{col: "DateArray", val: []civil.Date(nil), want: []NullDate(nil)},
		{col: "DateArray", val: []civil.Date{}, want: []NullDate{}},
		{col: "DateArray", val: []civil.Date{d1, d2, d3}, want: []NullDate{{d1, true}, {d2, true}, {d3, true}}},
		{col: "Timestamp", val: t1},
		{col: "Timestamp", val: t1, want: NullTime{t1, true}},
		{col: "Timestamp", val: NullTime{t1, true}},
		{col: "Timestamp", val: NullTime{t1, true}, want: t1},
		{col: "Timestamp", val: NullTime{}},
		{col: "Timestamp", val: nil, want: NullTime{}},
		{col: "TimestampArray", val: []time.Time(nil), want: []NullTime(nil)},
		{col: "TimestampArray", val: []time.Time{}, want: []NullTime{}},
		{col: "TimestampArray", val: []time.Time{t1, t2, t3}, want: []NullTime{{t1, true}, {t2, true}, {t3, true}}},
	}

	// Write rows into table first.
	// The table index is used as RowID, so the read loop below can look each
	// entry up by the same index.
	var muts []*Mutation
	for i, test := range tests {
		muts = append(muts, InsertOrUpdate("Types", []string{"RowID", test.col}, []interface{}{i, test.val}))
	}
	if _, err := client.Apply(ctx, muts, ApplyAtLeastOnce()); err != nil {
		t.Fatal(err)
	}

	for i, test := range tests {
		row, err := client.Single().ReadRow(ctx, "Types", []interface{}{i}, []string{test.col})
		if err != nil {
			t.Fatalf("Unable to fetch row %v: %v", i, err)
		}
		// Create new instance of type of test.want.
		want := test.want
		if want == nil {
			want = test.val
		}
		// gotp is a pointer to a zero value of want's dynamic type; Column
		// decodes the single requested column into it.
		gotp := reflect.New(reflect.TypeOf(want))
		if err := row.Column(0, gotp.Interface()); err != nil {
			t.Errorf("%d: col:%v val:%#v, %v", i, test.col, test.val, err)
			continue
		}
		got := reflect.Indirect(gotp).Interface()

		// One of the test cases is checking NaN handling.  Given
		// NaN!=NaN, we can't use reflect to test for it.
		if isNaN(got) && isNaN(want) {
			continue
		}

		// Check non-NaN cases.
		if !testEqual(got, want) {
			t.Errorf("%d: col:%v val:%#v, got %#v, want %#v", i, test.col, test.val, got, want)
			continue
		}
	}
}
1347
1348// Test decoding Cloud Spanner STRUCT type.
1349func TestIntegration_StructTypes(t *testing.T) {
1350	t.Parallel()
1351
1352	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1353	defer cancel()
1354	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
1355	defer cleanup()
1356
1357	tests := []struct {
1358		q    Statement
1359		want func(r *Row) error
1360	}{
1361		{
1362			q: Statement{SQL: `SELECT ARRAY(SELECT STRUCT(1, 2))`},
1363			want: func(r *Row) error {
1364				// Test STRUCT ARRAY decoding to []NullRow.
1365				var rows []NullRow
1366				if err := r.Column(0, &rows); err != nil {
1367					return err
1368				}
1369				if len(rows) != 1 {
1370					return fmt.Errorf("len(rows) = %d; want 1", len(rows))
1371				}
1372				if !rows[0].Valid {
1373					return fmt.Errorf("rows[0] is NULL")
1374				}
1375				var i, j int64
1376				if err := rows[0].Row.Columns(&i, &j); err != nil {
1377					return err
1378				}
1379				if i != 1 || j != 2 {
1380					return fmt.Errorf("got (%d,%d), want (1,2)", i, j)
1381				}
1382				return nil
1383			},
1384		},
1385		{
1386			q: Statement{SQL: `SELECT ARRAY(SELECT STRUCT(1 as foo, 2 as bar)) as col1`},
1387			want: func(r *Row) error {
1388				// Test Row.ToStruct.
1389				s := struct {
1390					Col1 []*struct {
1391						Foo int64 `spanner:"foo"`
1392						Bar int64 `spanner:"bar"`
1393					} `spanner:"col1"`
1394				}{}
1395				if err := r.ToStruct(&s); err != nil {
1396					return err
1397				}
1398				want := struct {
1399					Col1 []*struct {
1400						Foo int64 `spanner:"foo"`
1401						Bar int64 `spanner:"bar"`
1402					} `spanner:"col1"`
1403				}{
1404					Col1: []*struct {
1405						Foo int64 `spanner:"foo"`
1406						Bar int64 `spanner:"bar"`
1407					}{
1408						{
1409							Foo: 1,
1410							Bar: 2,
1411						},
1412					},
1413				}
1414				if !testEqual(want, s) {
1415					return fmt.Errorf("unexpected decoding result: %v, want %v", s, want)
1416				}
1417				return nil
1418			},
1419		},
1420	}
1421	for i, test := range tests {
1422		iter := client.Single().Query(ctx, test.q)
1423		defer iter.Stop()
1424		row, err := iter.Next()
1425		if err != nil {
1426			t.Errorf("%d: %v", i, err)
1427			continue
1428		}
1429		if err := test.want(row); err != nil {
1430			t.Errorf("%d: %v", i, err)
1431			continue
1432		}
1433	}
1434}
1435
1436func TestIntegration_StructParametersUnsupported(t *testing.T) {
1437	skipEmulatorTest(t)
1438	t.Parallel()
1439
1440	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1441	defer cancel()
1442	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, nil)
1443	defer cleanup()
1444
1445	for _, test := range []struct {
1446		param       interface{}
1447		wantCode    codes.Code
1448		wantMsgPart string
1449	}{
1450		{
1451			struct {
1452				Field int
1453			}{10},
1454			codes.Unimplemented,
1455			"Unsupported query shape: " +
1456				"A struct value cannot be returned as a column value. " +
1457				"Rewrite the query to flatten the struct fields in the result.",
1458		},
1459		{
1460			[]struct {
1461				Field int
1462			}{{10}, {20}},
1463			codes.Unimplemented,
1464			"Unsupported query shape: " +
1465				"This query can return a null-valued array of struct, " +
1466				"which is not supported by Spanner.",
1467		},
1468	} {
1469		iter := client.Single().Query(ctx, Statement{
1470			SQL:    "SELECT @p",
1471			Params: map[string]interface{}{"p": test.param},
1472		})
1473		_, err := iter.Next()
1474		iter.Stop()
1475		if msg, ok := matchError(err, test.wantCode, test.wantMsgPart); !ok {
1476			t.Fatal(msg)
1477		}
1478	}
1479}
1480
1481// Test queries of the form "SELECT expr".
1482func TestIntegration_QueryExpressions(t *testing.T) {
1483	t.Parallel()
1484
1485	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1486	defer cancel()
1487	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, nil)
1488	defer cleanup()
1489
1490	newRow := func(vals []interface{}) *Row {
1491		row, err := NewRow(make([]string, len(vals)), vals)
1492		if err != nil {
1493			t.Fatal(err)
1494		}
1495		return row
1496	}
1497
1498	tests := []struct {
1499		expr string
1500		want interface{}
1501	}{
1502		{"1", int64(1)},
1503		{"[1, 2, 3]", []NullInt64{{1, true}, {2, true}, {3, true}}},
1504		{"[1, NULL, 3]", []NullInt64{{1, true}, {0, false}, {3, true}}},
1505		{"IEEE_DIVIDE(1, 0)", math.Inf(1)},
1506		{"IEEE_DIVIDE(-1, 0)", math.Inf(-1)},
1507		{"IEEE_DIVIDE(0, 0)", math.NaN()},
1508		// TODO(jba): add IEEE_DIVIDE(0, 0) to the following array when we have a better equality predicate.
1509		{"[IEEE_DIVIDE(1, 0), IEEE_DIVIDE(-1, 0)]", []NullFloat64{{math.Inf(1), true}, {math.Inf(-1), true}}},
1510		{"ARRAY(SELECT AS STRUCT * FROM (SELECT 'a', 1) WHERE 0 = 1)", []NullRow{}},
1511		{"ARRAY(SELECT STRUCT(1, 2))", []NullRow{{Row: *newRow([]interface{}{1, 2}), Valid: true}}},
1512	}
1513	for _, test := range tests {
1514		iter := client.Single().Query(ctx, Statement{SQL: "SELECT " + test.expr})
1515		defer iter.Stop()
1516		row, err := iter.Next()
1517		if err != nil {
1518			t.Errorf("%q: %v", test.expr, err)
1519			continue
1520		}
1521		// Create new instance of type of test.want.
1522		gotp := reflect.New(reflect.TypeOf(test.want))
1523		if err := row.Column(0, gotp.Interface()); err != nil {
1524			t.Errorf("%q: Column returned error %v", test.expr, err)
1525			continue
1526		}
1527		got := reflect.Indirect(gotp).Interface()
1528		// TODO(jba): remove isNaN special case when we have a better equality predicate.
1529		if isNaN(got) && isNaN(test.want) {
1530			continue
1531		}
1532		if !testEqual(got, test.want) {
1533			t.Errorf("%q\n got  %#v\nwant %#v", test.expr, got, test.want)
1534		}
1535	}
1536}
1537
1538func TestIntegration_QueryStats(t *testing.T) {
1539	skipEmulatorTest(t)
1540	t.Parallel()
1541
1542	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1543	defer cancel()
1544	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
1545	defer cleanup()
1546
1547	accounts := []*Mutation{
1548		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
1549		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
1550	}
1551	if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil {
1552		t.Fatal(err)
1553	}
1554	const sql = "SELECT Balance FROM Accounts"
1555
1556	qp, err := client.Single().AnalyzeQuery(ctx, Statement{sql, nil})
1557	if err != nil {
1558		t.Fatal(err)
1559	}
1560	if len(qp.PlanNodes) == 0 {
1561		t.Error("got zero plan nodes, expected at least one")
1562	}
1563
1564	iter := client.Single().QueryWithStats(ctx, Statement{sql, nil})
1565	defer iter.Stop()
1566	for {
1567		_, err := iter.Next()
1568		if err == iterator.Done {
1569			break
1570		}
1571		if err != nil {
1572			t.Fatal(err)
1573		}
1574	}
1575	if iter.QueryPlan == nil {
1576		t.Error("got nil QueryPlan, expected one")
1577	}
1578	if iter.QueryStats == nil {
1579		t.Error("got nil QueryStats, expected some")
1580	}
1581}
1582
1583func TestIntegration_InvalidDatabase(t *testing.T) {
1584	t.Parallel()
1585
1586	if databaseAdmin == nil {
1587		t.Skip("Integration tests skipped")
1588	}
1589	ctx := context.Background()
1590	dbPath := fmt.Sprintf("projects/%v/instances/%v/databases/invalid", testProjectID, testInstanceID)
1591	c, err := createClient(ctx, dbPath, SessionPoolConfig{})
1592	// Client creation should succeed even if the database is invalid.
1593	if err != nil {
1594		t.Fatal(err)
1595	}
1596	_, err = c.Single().ReadRow(ctx, "TestTable", Key{1}, []string{"col1"})
1597	if msg, ok := matchError(err, codes.NotFound, ""); !ok {
1598		t.Fatal(msg)
1599	}
1600}
1601
1602func TestIntegration_ReadErrors(t *testing.T) {
1603	t.Parallel()
1604
1605	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1606	defer cancel()
1607	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, readDBStatements)
1608	defer cleanup()
1609
1610	var ms []*Mutation
1611	for i := 0; i < 2; i++ {
1612		ms = append(ms, InsertOrUpdate(testTable,
1613			testTableColumns,
1614			[]interface{}{fmt.Sprintf("k%d", i), fmt.Sprintf("v")}))
1615	}
1616	if _, err := client.Apply(ctx, ms); err != nil {
1617		t.Fatal(err)
1618	}
1619
1620	// Read over invalid table fails
1621	_, err := client.Single().ReadRow(ctx, "badTable", Key{1}, []string{"StringValue"})
1622	if msg, ok := matchError(err, codes.NotFound, "badTable"); !ok {
1623		t.Error(msg)
1624	}
1625	// Read over invalid column fails
1626	_, err = client.Single().ReadRow(ctx, "TestTable", Key{1}, []string{"badcol"})
1627	if msg, ok := matchError(err, codes.NotFound, "badcol"); !ok {
1628		t.Error(msg)
1629	}
1630
1631	// Invalid query fails
1632	iter := client.Single().Query(ctx, Statement{SQL: "SELECT Apples AND Oranges"})
1633	defer iter.Stop()
1634	_, err = iter.Next()
1635	if msg, ok := matchError(err, codes.InvalidArgument, "unrecognized name"); !ok {
1636		t.Error(msg)
1637	}
1638
1639	// Read should fail on cancellation.
1640	cctx, cancel := context.WithCancel(ctx)
1641	cancel()
1642	_, err = client.Single().ReadRow(cctx, "TestTable", Key{1}, []string{"StringValue"})
1643	if msg, ok := matchError(err, codes.Canceled, ""); !ok {
1644		t.Error(msg)
1645	}
1646	// Read should fail if deadline exceeded.
1647	dctx, cancel := context.WithTimeout(ctx, time.Nanosecond)
1648	defer cancel()
1649	<-dctx.Done()
1650	_, err = client.Single().ReadRow(dctx, "TestTable", Key{1}, []string{"StringValue"})
1651	if msg, ok := matchError(err, codes.DeadlineExceeded, ""); !ok {
1652		t.Error(msg)
1653	}
1654	// Read should fail if there are multiple rows returned.
1655	_, err = client.Single().ReadRowUsingIndex(ctx, testTable, testTableIndex, Key{"v"}, testTableColumns)
1656	wantMsgPart := fmt.Sprintf("more than one row found by index(Table: %v, IndexKey: %v, Index: %v)", testTable, Key{"v"}, testTableIndex)
1657	if msg, ok := matchError(err, codes.FailedPrecondition, wantMsgPart); !ok {
1658		t.Error(msg)
1659	}
1660}
1661
1662// Test TransactionRunner. Test that transactions are aborted and retried as
1663// expected.
1664func TestIntegration_TransactionRunner(t *testing.T) {
1665	skipEmulatorTest(t)
1666	t.Parallel()
1667
1668	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1669	defer cancel()
1670	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
1671	defer cleanup()
1672
1673	// Test 1: User error should abort the transaction.
1674	_, _ = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
1675		tx.BufferWrite([]*Mutation{
1676			Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)})})
1677		return errors.New("user error")
1678	})
1679	// Empty read.
1680	rows, err := readAllTestTable(client.Single().Read(ctx, "Accounts", Key{1}, []string{"AccountId", "Nickname", "Balance"}))
1681	if err != nil {
1682		t.Fatal(err)
1683	}
1684	if got, want := len(rows), 0; got != want {
1685		t.Errorf("Empty read, got %d, want %d.", got, want)
1686	}
1687
1688	// Test 2: Expect abort and retry.
1689	// We run two ReadWriteTransactions concurrently and make txn1 abort txn2 by
1690	// committing writes to the column txn2 have read, and expect the following
1691	// read to abort and txn2 retries.
1692
1693	// Set up two accounts
1694	accounts := []*Mutation{
1695		Insert("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), int64(0)}),
1696		Insert("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), int64(1)}),
1697	}
1698	if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil {
1699		t.Fatal(err)
1700	}
1701
1702	var (
1703		cTxn1Start  = make(chan struct{})
1704		cTxn1Commit = make(chan struct{})
1705		cTxn2Start  = make(chan struct{})
1706		wg          sync.WaitGroup
1707	)
1708
1709	// read balance, check error if we don't expect abort.
1710	readBalance := func(tx interface {
1711		ReadRow(ctx context.Context, table string, key Key, columns []string) (*Row, error)
1712	}, key int64, expectAbort bool) (int64, error) {
1713		var b int64
1714		r, e := tx.ReadRow(ctx, "Accounts", Key{int64(key)}, []string{"Balance"})
1715		if e != nil {
1716			if expectAbort && !isAbortErr(e) {
1717				t.Errorf("ReadRow got %v, want Abort error.", e)
1718			}
1719			// Verify that we received and are able to extract retry info from
1720			// the aborted error.
1721			if _, hasRetryInfo := extractRetryDelay(e); !hasRetryInfo {
1722				t.Errorf("Got Abort error without RetryInfo\nGot: %v", e)
1723			}
1724			return b, e
1725		}
1726		if ce := r.Column(0, &b); ce != nil {
1727			return b, ce
1728		}
1729		return b, nil
1730	}
1731
1732	wg.Add(2)
1733	// Txn 1
1734	go func() {
1735		defer wg.Done()
1736		var once sync.Once
1737		_, e := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
1738			b, e := readBalance(tx, 1, false)
1739			if e != nil {
1740				return e
1741			}
1742			// txn 1 can abort, in that case we skip closing the channel on
1743			// retry.
1744			once.Do(func() { close(cTxn1Start) })
1745			e = tx.BufferWrite([]*Mutation{
1746				Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), int64(b + 1)})})
1747			if e != nil {
1748				return e
1749			}
1750			// Wait for second transaction.
1751			<-cTxn2Start
1752			return nil
1753		})
1754		close(cTxn1Commit)
1755		if e != nil {
1756			t.Errorf("Transaction 1 commit, got %v, want nil.", e)
1757		}
1758	}()
1759	// Txn 2
1760	go func() {
1761		// Wait until txn 1 starts.
1762		<-cTxn1Start
1763		defer wg.Done()
1764		var (
1765			once sync.Once
1766			b1   int64
1767			b2   int64
1768			e    error
1769		)
1770		_, e = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
1771			if b1, e = readBalance(tx, 1, false); e != nil {
1772				return e
1773			}
1774			// Skip closing channel on retry.
1775			once.Do(func() { close(cTxn2Start) })
1776			// Wait until txn 1 successfully commits.
1777			<-cTxn1Commit
1778			// Txn1 has committed and written a balance to the account. Now this
1779			// transaction (txn2) reads and re-writes the balance. The first
1780			// time through, it will abort because it overlaps with txn1. Then
1781			// it will retry after txn1 commits, and succeed.
1782			if b2, e = readBalance(tx, 2, true); e != nil {
1783				return e
1784			}
1785			return tx.BufferWrite([]*Mutation{
1786				Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), int64(b1 + b2)})})
1787		})
1788		if e != nil {
1789			t.Errorf("Transaction 2 commit, got %v, want nil.", e)
1790		}
1791	}()
1792	wg.Wait()
1793	// Check that both transactions' effects are visible.
1794	for i := int64(1); i <= int64(2); i++ {
1795		if b, e := readBalance(client.Single(), i, false); e != nil {
1796			t.Fatalf("ReadBalance for key %d error %v.", i, e)
1797		} else if b != i {
1798			t.Errorf("Balance for key %d, got %d, want %d.", i, b, i)
1799		}
1800	}
1801}
1802
1803// Test PartitionQuery of BatchReadOnlyTransaction, create partitions then
1804// serialize and deserialize both transaction and partition to be used in
1805// execution on another client, and compare results.
1806func TestIntegration_BatchQuery(t *testing.T) {
1807	skipEmulatorTest(t)
1808	t.Parallel()
1809
1810	// Set up testing environment.
1811	var (
1812		client2 *Client
1813		err     error
1814	)
1815	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1816	defer cancel()
1817	client, dbPath, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, simpleDBStatements)
1818	defer cleanup()
1819
1820	if err = populate(ctx, client); err != nil {
1821		t.Fatal(err)
1822	}
1823	if client2, err = createClient(ctx, dbPath, SessionPoolConfig{}); err != nil {
1824		t.Fatal(err)
1825	}
1826	defer client2.Close()
1827
1828	// PartitionQuery
1829	var (
1830		txn        *BatchReadOnlyTransaction
1831		partitions []*Partition
1832		stmt       = Statement{SQL: "SELECT * FROM test;"}
1833	)
1834
1835	if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil {
1836		t.Fatal(err)
1837	}
1838	defer txn.Cleanup(ctx)
1839	if partitions, err = txn.PartitionQuery(ctx, stmt, PartitionOptions{0, 3}); err != nil {
1840		t.Fatal(err)
1841	}
1842
1843	// Reconstruct BatchReadOnlyTransactionID and execute partitions
1844	var (
1845		tid2      BatchReadOnlyTransactionID
1846		data      []byte
1847		gotResult bool // if we get matching result from two separate txns
1848	)
1849	if data, err = txn.ID.MarshalBinary(); err != nil {
1850		t.Fatalf("encoding failed %v", err)
1851	}
1852	if err = tid2.UnmarshalBinary(data); err != nil {
1853		t.Fatalf("decoding failed %v", err)
1854	}
1855	txn2 := client2.BatchReadOnlyTransactionFromID(tid2)
1856
1857	// Execute Partitions and compare results
1858	for i, p := range partitions {
1859		iter := txn.Execute(ctx, p)
1860		defer iter.Stop()
1861		p2 := serdesPartition(t, i, p)
1862		iter2 := txn2.Execute(ctx, &p2)
1863		defer iter2.Stop()
1864
1865		row1, err1 := iter.Next()
1866		row2, err2 := iter2.Next()
1867		if err1 != err2 {
1868			t.Fatalf("execution failed for different reasons: %v, %v", err1, err2)
1869			continue
1870		}
1871		if !testEqual(row1, row2) {
1872			t.Fatalf("execution returned different values: %v, %v", row1, row2)
1873			continue
1874		}
1875		if row1 == nil {
1876			continue
1877		}
1878		var a, b string
1879		if err = row1.Columns(&a, &b); err != nil {
1880			t.Fatalf("failed to parse row %v", err)
1881			continue
1882		}
1883		if a == str1 && b == str2 {
1884			gotResult = true
1885		}
1886	}
1887	if !gotResult {
1888		t.Fatalf("execution didn't return expected values")
1889	}
1890}
1891
1892// Test PartitionRead of BatchReadOnlyTransaction, similar to TestBatchQuery
1893func TestIntegration_BatchRead(t *testing.T) {
1894	skipEmulatorTest(t)
1895	t.Parallel()
1896
1897	// Set up testing environment.
1898	var (
1899		client2 *Client
1900		err     error
1901	)
1902	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1903	defer cancel()
1904	client, dbPath, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, simpleDBStatements)
1905	defer cleanup()
1906
1907	if err = populate(ctx, client); err != nil {
1908		t.Fatal(err)
1909	}
1910	if client2, err = createClient(ctx, dbPath, SessionPoolConfig{}); err != nil {
1911		t.Fatal(err)
1912	}
1913	defer client2.Close()
1914
1915	// PartitionRead
1916	var (
1917		txn        *BatchReadOnlyTransaction
1918		partitions []*Partition
1919	)
1920
1921	if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil {
1922		t.Fatal(err)
1923	}
1924	defer txn.Cleanup(ctx)
1925	if partitions, err = txn.PartitionRead(ctx, "test", AllKeys(), simpleDBTableColumns, PartitionOptions{0, 3}); err != nil {
1926		t.Fatal(err)
1927	}
1928
1929	// Reconstruct BatchReadOnlyTransactionID and execute partitions.
1930	var (
1931		tid2      BatchReadOnlyTransactionID
1932		data      []byte
1933		gotResult bool // if we get matching result from two separate txns
1934	)
1935	if data, err = txn.ID.MarshalBinary(); err != nil {
1936		t.Fatalf("encoding failed %v", err)
1937	}
1938	if err = tid2.UnmarshalBinary(data); err != nil {
1939		t.Fatalf("decoding failed %v", err)
1940	}
1941	txn2 := client2.BatchReadOnlyTransactionFromID(tid2)
1942
1943	// Execute Partitions and compare results.
1944	for i, p := range partitions {
1945		iter := txn.Execute(ctx, p)
1946		defer iter.Stop()
1947		p2 := serdesPartition(t, i, p)
1948		iter2 := txn2.Execute(ctx, &p2)
1949		defer iter2.Stop()
1950
1951		row1, err1 := iter.Next()
1952		row2, err2 := iter2.Next()
1953		if err1 != err2 {
1954			t.Fatalf("execution failed for different reasons: %v, %v", err1, err2)
1955			continue
1956		}
1957		if !testEqual(row1, row2) {
1958			t.Fatalf("execution returned different values: %v, %v", row1, row2)
1959			continue
1960		}
1961		if row1 == nil {
1962			continue
1963		}
1964		var a, b string
1965		if err = row1.Columns(&a, &b); err != nil {
1966			t.Fatalf("failed to parse row %v", err)
1967			continue
1968		}
1969		if a == str1 && b == str2 {
1970			gotResult = true
1971		}
1972	}
1973	if !gotResult {
1974		t.Fatalf("execution didn't return expected values")
1975	}
1976}
1977
1978// Test normal txReadEnv method on BatchReadOnlyTransaction.
1979func TestIntegration_BROTNormal(t *testing.T) {
1980	skipEmulatorTest(t)
1981	t.Parallel()
1982
1983	// Set up testing environment and create txn.
1984	var (
1985		txn *BatchReadOnlyTransaction
1986		err error
1987		row *Row
1988		i   int64
1989	)
1990	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1991	defer cancel()
1992	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, simpleDBStatements)
1993	defer cleanup()
1994
1995	if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil {
1996		t.Fatal(err)
1997	}
1998	defer txn.Cleanup(ctx)
1999	if _, err := txn.PartitionRead(ctx, "test", AllKeys(), simpleDBTableColumns, PartitionOptions{0, 3}); err != nil {
2000		t.Fatal(err)
2001	}
2002	// Normal query should work with BatchReadOnlyTransaction.
2003	stmt2 := Statement{SQL: "SELECT 1"}
2004	iter := txn.Query(ctx, stmt2)
2005	defer iter.Stop()
2006
2007	row, err = iter.Next()
2008	if err != nil {
2009		t.Errorf("query failed with %v", err)
2010	}
2011	if err = row.Columns(&i); err != nil {
2012		t.Errorf("failed to parse row %v", err)
2013	}
2014}
2015
2016func TestIntegration_CommitTimestamp(t *testing.T) {
2017	t.Parallel()
2018
2019	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2020	defer cancel()
2021	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, ctsDBStatements)
2022	defer cleanup()
2023
2024	type testTableRow struct {
2025		Key string
2026		Ts  NullTime
2027	}
2028
2029	var (
2030		cts1, cts2, ts1, ts2 time.Time
2031		err                  error
2032	)
2033
2034	// Apply mutation in sequence, expect to see commit timestamp in good order,
2035	// check also the commit timestamp returned
2036	for _, it := range []struct {
2037		k string
2038		t *time.Time
2039	}{
2040		{"a", &cts1},
2041		{"b", &cts2},
2042	} {
2043		tt := testTableRow{Key: it.k, Ts: NullTime{CommitTimestamp, true}}
2044		m, err := InsertStruct("TestTable", tt)
2045		if err != nil {
2046			t.Fatal(err)
2047		}
2048		*it.t, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce())
2049		if err != nil {
2050			t.Fatal(err)
2051		}
2052	}
2053
2054	txn := client.ReadOnlyTransaction()
2055	for _, it := range []struct {
2056		k string
2057		t *time.Time
2058	}{
2059		{"a", &ts1},
2060		{"b", &ts2},
2061	} {
2062		if r, e := txn.ReadRow(ctx, "TestTable", Key{it.k}, []string{"Ts"}); e != nil {
2063			t.Fatal(err)
2064		} else {
2065			var got testTableRow
2066			if err := r.ToStruct(&got); err != nil {
2067				t.Fatal(err)
2068			}
2069			*it.t = got.Ts.Time
2070		}
2071	}
2072	if !cts1.Equal(ts1) {
2073		t.Errorf("Expect commit timestamp returned and read to match for txn1, got %v and %v.", cts1, ts1)
2074	}
2075	if !cts2.Equal(ts2) {
2076		t.Errorf("Expect commit timestamp returned and read to match for txn2, got %v and %v.", cts2, ts2)
2077	}
2078
2079	// Try writing a timestamp in the future to commit timestamp, expect error.
2080	_, err = client.Apply(ctx, []*Mutation{InsertOrUpdate("TestTable", []string{"Key", "Ts"}, []interface{}{"a", time.Now().Add(time.Hour)})}, ApplyAtLeastOnce())
2081	if msg, ok := matchError(err, codes.FailedPrecondition, "Cannot write timestamps in the future"); !ok {
2082		t.Error(msg)
2083	}
2084}
2085
2086func TestIntegration_DML(t *testing.T) {
2087	t.Parallel()
2088
2089	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2090	defer cancel()
2091	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
2092	defer cleanup()
2093
2094	// Function that reads a single row's first name from within a transaction.
2095	readFirstName := func(tx *ReadWriteTransaction, key int) (string, error) {
2096		row, err := tx.ReadRow(ctx, "Singers", Key{key}, []string{"FirstName"})
2097		if err != nil {
2098			return "", err
2099		}
2100		var fn string
2101		if err := row.Column(0, &fn); err != nil {
2102			return "", err
2103		}
2104		return fn, nil
2105	}
2106
2107	// Function that reads multiple rows' first names from outside a read/write
2108	// transaction.
2109	readFirstNames := func(keys ...int) []string {
2110		var ks []KeySet
2111		for _, k := range keys {
2112			ks = append(ks, Key{k})
2113		}
2114		iter := client.Single().Read(ctx, "Singers", KeySets(ks...), []string{"FirstName"})
2115		var got []string
2116		var fn string
2117		err := iter.Do(func(row *Row) error {
2118			if err := row.Column(0, &fn); err != nil {
2119				return err
2120			}
2121			got = append(got, fn)
2122			return nil
2123		})
2124		if err != nil {
2125			t.Fatalf("readFirstNames(%v): %v", keys, err)
2126		}
2127		return got
2128	}
2129
2130	// Use ReadWriteTransaction.Query to execute a DML statement.
2131	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
2132		iter := tx.Query(ctx, Statement{
2133			SQL: `INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (1, "Umm", "Kulthum")`,
2134		})
2135		defer iter.Stop()
2136		if row, err := iter.Next(); err != iterator.Done {
2137			t.Fatalf("got results from iterator, want none: %#v, err = %v\n", row, err)
2138		}
2139		if iter.RowCount != 1 {
2140			t.Errorf("row count: got %d, want 1", iter.RowCount)
2141		}
2142		// The results of the DML statement should be visible to the transaction.
2143		got, err := readFirstName(tx, 1)
2144		if err != nil {
2145			return err
2146		}
2147		if want := "Umm"; got != want {
2148			t.Errorf("got %q, want %q", got, want)
2149		}
2150		return nil
2151	})
2152	if err != nil {
2153		t.Fatal(err)
2154	}
2155
2156	// Use ReadWriteTransaction.Update to execute a DML statement.
2157	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
2158		count, err := tx.Update(ctx, Statement{
2159			SQL: `Insert INTO Singers (SingerId, FirstName, LastName) VALUES (2, "Eduard", "Khil")`,
2160		})
2161		if err != nil {
2162			t.Fatal(err)
2163		}
2164		if count != 1 {
2165			t.Errorf("row count: got %d, want 1", count)
2166		}
2167		got, err := readFirstName(tx, 2)
2168		if err != nil {
2169			return err
2170		}
2171		if want := "Eduard"; got != want {
2172			t.Errorf("got %q, want %q", got, want)
2173		}
2174		return nil
2175
2176	})
2177	if err != nil {
2178		t.Fatal(err)
2179	}
2180
2181	// Roll back a DML statement and confirm that it didn't happen.
2182	var fail = errors.New("fail")
2183	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
2184		_, err := tx.Update(ctx, Statement{
2185			SQL: `INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (3, "Audra", "McDonald")`,
2186		})
2187		if err != nil {
2188			return err
2189		}
2190		return fail
2191	})
2192	if err != fail {
2193		t.Fatalf("rolling back: got error %v, want the error 'fail'", err)
2194	}
2195	_, err = client.Single().ReadRow(ctx, "Singers", Key{3}, []string{"FirstName"})
2196	if got, want := ErrCode(err), codes.NotFound; got != want {
2197		t.Errorf("got %s, want %s", got, want)
2198	}
2199
2200	// Run two DML statements in the same transaction.
2201	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
2202		_, err := tx.Update(ctx, Statement{SQL: `UPDATE Singers SET FirstName = "Oum" WHERE SingerId = 1`})
2203		if err != nil {
2204			return err
2205		}
2206		_, err = tx.Update(ctx, Statement{SQL: `UPDATE Singers SET FirstName = "Eddie" WHERE SingerId = 2`})
2207		if err != nil {
2208			return err
2209		}
2210		return nil
2211	})
2212	if err != nil {
2213		t.Fatal(err)
2214	}
2215	got := readFirstNames(1, 2)
2216	want := []string{"Oum", "Eddie"}
2217	if !testEqual(got, want) {
2218		t.Errorf("got %v, want %v", got, want)
2219	}
2220
2221	// Run a DML statement and an ordinary mutation in the same transaction.
2222	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
2223		_, err := tx.Update(ctx, Statement{
2224			SQL: `INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (3, "Audra", "McDonald")`,
2225		})
2226		if err != nil {
2227			return err
2228		}
2229		return tx.BufferWrite([]*Mutation{
2230			Insert("Singers", []string{"SingerId", "FirstName", "LastName"},
2231				[]interface{}{4, "Andy", "Irvine"}),
2232		})
2233	})
2234	if err != nil {
2235		t.Fatal(err)
2236	}
2237	got = readFirstNames(3, 4)
2238	want = []string{"Audra", "Andy"}
2239	if !testEqual(got, want) {
2240		t.Errorf("got %v, want %v", got, want)
2241	}
2242
2243	// Attempt to run a query using update.
2244	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
2245		_, err := tx.Update(ctx, Statement{SQL: `SELECT FirstName from Singers`})
2246		return err
2247	})
2248	if got, want := ErrCode(err), codes.InvalidArgument; got != want {
2249		t.Errorf("got %s, want %s", got, want)
2250	}
2251}
2252
2253func TestIntegration_StructParametersBind(t *testing.T) {
2254	t.Parallel()
2255
2256	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2257	defer cancel()
2258	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, nil)
2259	defer cleanup()
2260
2261	type tRow []interface{}
2262	type tRows []struct{ trow tRow }
2263
2264	type allFields struct {
2265		Stringf string
2266		Intf    int
2267		Boolf   bool
2268		Floatf  float64
2269		Bytef   []byte
2270		Timef   time.Time
2271		Datef   civil.Date
2272	}
2273	allColumns := []string{
2274		"Stringf",
2275		"Intf",
2276		"Boolf",
2277		"Floatf",
2278		"Bytef",
2279		"Timef",
2280		"Datef",
2281	}
2282	s1 := allFields{"abc", 300, false, 3.45, []byte("foo"), t1, d1}
2283	s2 := allFields{"def", -300, false, -3.45, []byte("bar"), t2, d2}
2284
2285	dynamicStructType := reflect.StructOf([]reflect.StructField{
2286		{Name: "A", Type: reflect.TypeOf(t1), Tag: `spanner:"ff1"`},
2287	})
2288	s3 := reflect.New(dynamicStructType)
2289	s3.Elem().Field(0).Set(reflect.ValueOf(t1))
2290
2291	for i, test := range []struct {
2292		param interface{}
2293		sql   string
2294		cols  []string
2295		trows tRows
2296	}{
2297		// Struct value.
2298		{
2299			s1,
2300			"SELECT" +
2301				" @p.Stringf," +
2302				" @p.Intf," +
2303				" @p.Boolf," +
2304				" @p.Floatf," +
2305				" @p.Bytef," +
2306				" @p.Timef," +
2307				" @p.Datef",
2308			allColumns,
2309			tRows{
2310				{tRow{"abc", 300, false, 3.45, []byte("foo"), t1, d1}},
2311			},
2312		},
2313		// Array of struct value.
2314		{
2315			[]allFields{s1, s2},
2316			"SELECT * FROM UNNEST(@p)",
2317			allColumns,
2318			tRows{
2319				{tRow{"abc", 300, false, 3.45, []byte("foo"), t1, d1}},
2320				{tRow{"def", -300, false, -3.45, []byte("bar"), t2, d2}},
2321			},
2322		},
2323		// Null struct.
2324		{
2325			(*allFields)(nil),
2326			"SELECT @p IS NULL",
2327			[]string{""},
2328			tRows{
2329				{tRow{true}},
2330			},
2331		},
2332		// Null Array of struct.
2333		{
2334			[]allFields(nil),
2335			"SELECT @p IS NULL",
2336			[]string{""},
2337			tRows{
2338				{tRow{true}},
2339			},
2340		},
2341		// Empty struct.
2342		{
2343			struct{}{},
2344			"SELECT @p IS NULL ",
2345			[]string{""},
2346			tRows{
2347				{tRow{false}},
2348			},
2349		},
2350		// Empty array of struct.
2351		{
2352			[]allFields{},
2353			"SELECT * FROM UNNEST(@p) ",
2354			allColumns,
2355			tRows{},
2356		},
2357		// Struct with duplicate fields.
2358		{
2359			struct {
2360				A int `spanner:"field"`
2361				B int `spanner:"field"`
2362			}{10, 20},
2363			"SELECT * FROM UNNEST([@p]) ",
2364			[]string{"field", "field"},
2365			tRows{
2366				{tRow{10, 20}},
2367			},
2368		},
2369		// Struct with unnamed fields.
2370		{
2371			struct {
2372				A string `spanner:""`
2373			}{"hello"},
2374			"SELECT * FROM UNNEST([@p]) ",
2375			[]string{""},
2376			tRows{
2377				{tRow{"hello"}},
2378			},
2379		},
2380		// Mixed struct.
2381		{
2382			struct {
2383				DynamicStructField interface{}  `spanner:"f1"`
2384				ArrayStructField   []*allFields `spanner:"f2"`
2385			}{
2386				DynamicStructField: s3.Interface(),
2387				ArrayStructField:   []*allFields{nil},
2388			},
2389			"SELECT @p.f1.ff1, ARRAY_LENGTH(@p.f2), @p.f2[OFFSET(0)] IS NULL ",
2390			[]string{"ff1", "", ""},
2391			tRows{
2392				{tRow{t1, 1, true}},
2393			},
2394		},
2395	} {
2396		iter := client.Single().Query(ctx, Statement{
2397			SQL:    test.sql,
2398			Params: map[string]interface{}{"p": test.param},
2399		})
2400		var gotRows []*Row
2401		err := iter.Do(func(r *Row) error {
2402			gotRows = append(gotRows, r)
2403			return nil
2404		})
2405		if err != nil {
2406			t.Errorf("Failed to execute test case %d, error: %v", i, err)
2407		}
2408
2409		var wantRows []*Row
2410		for j, row := range test.trows {
2411			r, err := NewRow(test.cols, row.trow)
2412			if err != nil {
2413				t.Errorf("Invalid row %d in test case %d", j, i)
2414			}
2415			wantRows = append(wantRows, r)
2416		}
2417		if !testEqual(gotRows, wantRows) {
2418			t.Errorf("%d: Want result %v, got result %v", i, wantRows, gotRows)
2419		}
2420	}
2421}
2422
2423func TestIntegration_PDML(t *testing.T) {
2424	skipEmulatorTest(t)
2425	t.Parallel()
2426
2427	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2428	defer cancel()
2429	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
2430	defer cleanup()
2431
2432	columns := []string{"SingerId", "FirstName", "LastName"}
2433
2434	// Populate the Singers table.
2435	var muts []*Mutation
2436	for _, row := range [][]interface{}{
2437		{1, "Umm", "Kulthum"},
2438		{2, "Eduard", "Khil"},
2439		{3, "Audra", "McDonald"},
2440		{4, "Enrique", "Iglesias"},
2441		{5, "Shakira", "Ripoll"},
2442	} {
2443		muts = append(muts, Insert("Singers", columns, row))
2444	}
2445	if _, err := client.Apply(ctx, muts); err != nil {
2446		t.Fatal(err)
2447	}
2448	// Identifiers in PDML statements must be fully qualified.
2449	// TODO(jba): revisit the above.
2450	count, err := client.PartitionedUpdate(ctx, Statement{
2451		SQL: `UPDATE Singers SET Singers.FirstName = "changed" WHERE Singers.SingerId >= 1 AND Singers.SingerId <= @end`,
2452		Params: map[string]interface{}{
2453			"end": 3,
2454		},
2455	})
2456	if err != nil {
2457		t.Fatal(err)
2458	}
2459	if want := int64(3); count != want {
2460		t.Fatalf("got %d, want %d", count, want)
2461	}
2462	got, err := readAll(client.Single().Read(ctx, "Singers", AllKeys(), columns))
2463	if err != nil {
2464		t.Fatal(err)
2465	}
2466	want := [][]interface{}{
2467		{int64(1), "changed", "Kulthum"},
2468		{int64(2), "changed", "Khil"},
2469		{int64(3), "changed", "McDonald"},
2470		{int64(4), "Enrique", "Iglesias"},
2471		{int64(5), "Shakira", "Ripoll"},
2472	}
2473	if !testEqual(got, want) {
2474		t.Errorf("\ngot %v\nwant%v", got, want)
2475	}
2476}
2477
2478func TestIntegration_BatchDML(t *testing.T) {
2479	skipEmulatorTest(t)
2480	t.Parallel()
2481
2482	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2483	defer cancel()
2484	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
2485	defer cleanup()
2486
2487	columns := []string{"SingerId", "FirstName", "LastName"}
2488
2489	// Populate the Singers table.
2490	var muts []*Mutation
2491	for _, row := range [][]interface{}{
2492		{1, "Umm", "Kulthum"},
2493		{2, "Eduard", "Khil"},
2494		{3, "Audra", "McDonald"},
2495	} {
2496		muts = append(muts, Insert("Singers", columns, row))
2497	}
2498	if _, err := client.Apply(ctx, muts); err != nil {
2499		t.Fatal(err)
2500	}
2501
2502	var counts []int64
2503	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
2504		counts, err = tx.BatchUpdate(ctx, []Statement{
2505			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`},
2506			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 2" WHERE Singers.SingerId = 2`},
2507			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`},
2508		})
2509		return err
2510	})
2511
2512	if err != nil {
2513		t.Fatal(err)
2514	}
2515	if want := []int64{1, 1, 1}; !testEqual(counts, want) {
2516		t.Fatalf("got %d, want %d", counts, want)
2517	}
2518	got, err := readAll(client.Single().Read(ctx, "Singers", AllKeys(), columns))
2519	if err != nil {
2520		t.Fatal(err)
2521	}
2522	want := [][]interface{}{
2523		{int64(1), "changed 1", "Kulthum"},
2524		{int64(2), "changed 2", "Khil"},
2525		{int64(3), "changed 3", "McDonald"},
2526	}
2527	if !testEqual(got, want) {
2528		t.Errorf("\ngot %v\nwant%v", got, want)
2529	}
2530}
2531
2532func TestIntegration_BatchDML_NoStatements(t *testing.T) {
2533	t.Parallel()
2534
2535	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2536	defer cancel()
2537	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
2538	defer cleanup()
2539
2540	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
2541		_, err = tx.BatchUpdate(ctx, []Statement{})
2542		return err
2543	})
2544	if err == nil {
2545		t.Fatal("expected error, got nil")
2546	}
2547	if s, ok := status.FromError(err); ok {
2548		if s.Code() != codes.InvalidArgument {
2549			t.Fatalf("expected InvalidArgument, got %v", err)
2550		}
2551	} else {
2552		t.Fatalf("expected InvalidArgument, got %v", err)
2553	}
2554}
2555
2556func TestIntegration_BatchDML_TwoStatements(t *testing.T) {
2557	skipEmulatorTest(t)
2558	t.Parallel()
2559
2560	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2561	defer cancel()
2562	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
2563	defer cleanup()
2564
2565	columns := []string{"SingerId", "FirstName", "LastName"}
2566
2567	// Populate the Singers table.
2568	var muts []*Mutation
2569	for _, row := range [][]interface{}{
2570		{1, "Umm", "Kulthum"},
2571		{2, "Eduard", "Khil"},
2572		{3, "Audra", "McDonald"},
2573	} {
2574		muts = append(muts, Insert("Singers", columns, row))
2575	}
2576	if _, err := client.Apply(ctx, muts); err != nil {
2577		t.Fatal(err)
2578	}
2579
2580	var updateCount int64
2581	var batchCounts []int64
2582	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
2583		batchCounts, err = tx.BatchUpdate(ctx, []Statement{
2584			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`},
2585			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 2" WHERE Singers.SingerId = 2`},
2586			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`},
2587		})
2588		if err != nil {
2589			return err
2590		}
2591
2592		updateCount, err = tx.Update(ctx, Statement{SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`})
2593		return err
2594	})
2595	if err != nil {
2596		t.Fatal(err)
2597	}
2598	if want := []int64{1, 1, 1}; !testEqual(batchCounts, want) {
2599		t.Fatalf("got %d, want %d", batchCounts, want)
2600	}
2601	if updateCount != 1 {
2602		t.Fatalf("got %v, want 1", updateCount)
2603	}
2604}
2605
2606// TODO(deklerk): this currently does not work because the transaction appears to
2607// get rolled back after a single statement fails. b/120158761
2608func TestIntegration_BatchDML_Error(t *testing.T) {
2609	t.Parallel()
2610
2611	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2612	defer cancel()
2613	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
2614	defer cleanup()
2615
2616	columns := []string{"SingerId", "FirstName", "LastName"}
2617
2618	// Populate the Singers table.
2619	var muts []*Mutation
2620	for _, row := range [][]interface{}{
2621		{1, "Umm", "Kulthum"},
2622		{2, "Eduard", "Khil"},
2623		{3, "Audra", "McDonald"},
2624	} {
2625		muts = append(muts, Insert("Singers", columns, row))
2626	}
2627	if _, err := client.Apply(ctx, muts); err != nil {
2628		t.Fatal(err)
2629	}
2630
2631	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
2632		counts, err := tx.BatchUpdate(ctx, []Statement{
2633			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`},
2634			{SQL: `some illegal statement`},
2635			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`},
2636		})
2637		if err == nil {
2638			t.Fatal("expected err, got nil")
2639		}
2640		if want := []int64{1}; !testEqual(counts, want) {
2641			t.Fatalf("got %d, want %d", counts, want)
2642		}
2643
2644		got, err := readAll(tx.Read(ctx, "Singers", AllKeys(), columns))
2645		if err != nil {
2646			t.Fatal(err)
2647		}
2648		want := [][]interface{}{
2649			{int64(1), "changed 1", "Kulthum"},
2650			{int64(2), "Eduard", "Khil"},
2651			{int64(3), "Audra", "McDonald"},
2652		}
2653		if !testEqual(got, want) {
2654			t.Errorf("\ngot %v\nwant%v", got, want)
2655		}
2656
2657		return nil
2658	})
2659	if err != nil {
2660		t.Fatal(err)
2661	}
2662}
2663
2664// Prepare initializes Cloud Spanner testing DB and clients.
2665func prepareIntegrationTest(ctx context.Context, t *testing.T, spc SessionPoolConfig, statements []string) (*Client, string, func()) {
2666	if databaseAdmin == nil {
2667		t.Skip("Integration tests skipped")
2668	}
2669	// Construct a unique test DB name.
2670	dbName := dbNameSpace.New()
2671
2672	dbPath := fmt.Sprintf("projects/%v/instances/%v/databases/%v", testProjectID, testInstanceID, dbName)
2673	// Create database and tables.
2674	op, err := databaseAdmin.CreateDatabase(ctx, &adminpb.CreateDatabaseRequest{
2675		Parent:          fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID),
2676		CreateStatement: "CREATE DATABASE " + dbName,
2677		ExtraStatements: statements,
2678	})
2679	if err != nil {
2680		t.Fatalf("cannot create testing DB %v: %v", dbPath, err)
2681	}
2682	if _, err := op.Wait(ctx); err != nil {
2683		t.Fatalf("cannot create testing DB %v: %v", dbPath, err)
2684	}
2685	client, err := createClient(ctx, dbPath, spc)
2686	if err != nil {
2687		t.Fatalf("cannot create data client on DB %v: %v", dbPath, err)
2688	}
2689	return client, dbPath, func() {
2690		client.Close()
2691	}
2692}
2693
2694func cleanupInstances() {
2695	if instanceAdmin == nil {
2696		// Integration tests skipped.
2697		return
2698	}
2699
2700	ctx := context.Background()
2701	parent := fmt.Sprintf("projects/%v", testProjectID)
2702	iter := instanceAdmin.ListInstances(ctx, &instancepb.ListInstancesRequest{
2703		Parent: parent,
2704		Filter: "name:gotest-",
2705	})
2706	expireAge := 24 * time.Hour
2707
2708	for {
2709		inst, err := iter.Next()
2710		if err == iterator.Done {
2711			break
2712		}
2713		if err != nil {
2714			panic(err)
2715		}
2716		if instanceNameSpace.Older(inst.Name, expireAge) {
2717			log.Printf("Deleting instance %s", inst.Name)
2718
2719			if err := instanceAdmin.DeleteInstance(ctx, &instancepb.DeleteInstanceRequest{Name: inst.Name}); err != nil {
2720				log.Printf("failed to delete instance %s (error %v), might need a manual removal",
2721					inst.Name, err)
2722			}
2723		}
2724	}
2725}
2726
2727func rangeReads(ctx context.Context, t *testing.T, client *Client) {
2728	checkRange := func(ks KeySet, wantNums ...int) {
2729		if msg, ok := compareRows(client.Single().Read(ctx, testTable, ks, testTableColumns), wantNums); !ok {
2730			t.Errorf("key set %+v: %s", ks, msg)
2731		}
2732	}
2733
2734	checkRange(Key{"k1"}, 1)
2735	checkRange(KeyRange{Key{"k3"}, Key{"k5"}, ClosedOpen}, 3, 4)
2736	checkRange(KeyRange{Key{"k3"}, Key{"k5"}, ClosedClosed}, 3, 4, 5)
2737	checkRange(KeyRange{Key{"k3"}, Key{"k5"}, OpenClosed}, 4, 5)
2738	checkRange(KeyRange{Key{"k3"}, Key{"k5"}, OpenOpen}, 4)
2739
2740	// Partial key specification.
2741	checkRange(KeyRange{Key{"k7"}, Key{}, ClosedClosed}, 7, 8, 9)
2742	checkRange(KeyRange{Key{"k7"}, Key{}, OpenClosed}, 8, 9)
2743	checkRange(KeyRange{Key{}, Key{"k11"}, ClosedOpen}, 0, 1, 10)
2744	checkRange(KeyRange{Key{}, Key{"k11"}, ClosedClosed}, 0, 1, 10, 11)
2745
2746	// The following produce empty ranges.
2747	// TODO(jba): Consider a multi-part key to illustrate partial key behavior.
2748	// checkRange(KeyRange{Key{"k7"}, Key{}, ClosedOpen})
2749	// checkRange(KeyRange{Key{"k7"}, Key{}, OpenOpen})
2750	// checkRange(KeyRange{Key{}, Key{"k11"}, OpenOpen})
2751	// checkRange(KeyRange{Key{}, Key{"k11"}, OpenClosed})
2752
2753	// Prefix is component-wise, not string prefix.
2754	checkRange(Key{"k1"}.AsPrefix(), 1)
2755	checkRange(KeyRange{Key{"k1"}, Key{"k2"}, ClosedOpen}, 1, 10, 11, 12, 13, 14)
2756
2757	checkRange(AllKeys(), 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14)
2758}
2759
2760func indexRangeReads(ctx context.Context, t *testing.T, client *Client) {
2761	checkRange := func(ks KeySet, wantNums ...int) {
2762		if msg, ok := compareRows(client.Single().ReadUsingIndex(ctx, testTable, testTableIndex, ks, testTableColumns),
2763			wantNums); !ok {
2764			t.Errorf("key set %+v: %s", ks, msg)
2765		}
2766	}
2767
2768	checkRange(Key{"v1"}, 1)
2769	checkRange(KeyRange{Key{"v3"}, Key{"v5"}, ClosedOpen}, 3, 4)
2770	checkRange(KeyRange{Key{"v3"}, Key{"v5"}, ClosedClosed}, 3, 4, 5)
2771	checkRange(KeyRange{Key{"v3"}, Key{"v5"}, OpenClosed}, 4, 5)
2772	checkRange(KeyRange{Key{"v3"}, Key{"v5"}, OpenOpen}, 4)
2773
2774	// // Partial key specification.
2775	checkRange(KeyRange{Key{"v7"}, Key{}, ClosedClosed}, 7, 8, 9)
2776	checkRange(KeyRange{Key{"v7"}, Key{}, OpenClosed}, 8, 9)
2777	checkRange(KeyRange{Key{}, Key{"v11"}, ClosedOpen}, 0, 1, 10)
2778	checkRange(KeyRange{Key{}, Key{"v11"}, ClosedClosed}, 0, 1, 10, 11)
2779
2780	// // The following produce empty ranges.
2781	// checkRange(KeyRange{Key{"v7"}, Key{}, ClosedOpen})
2782	// checkRange(KeyRange{Key{"v7"}, Key{}, OpenOpen})
2783	// checkRange(KeyRange{Key{}, Key{"v11"}, OpenOpen})
2784	// checkRange(KeyRange{Key{}, Key{"v11"}, OpenClosed})
2785
2786	// // Prefix is component-wise, not string prefix.
2787	checkRange(Key{"v1"}.AsPrefix(), 1)
2788	checkRange(KeyRange{Key{"v1"}, Key{"v2"}, ClosedOpen}, 1, 10, 11, 12, 13, 14)
2789	checkRange(AllKeys(), 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14)
2790
2791	// Read from an index with DESC ordering.
2792	wantNums := []int{14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0}
2793	if msg, ok := compareRows(client.Single().ReadUsingIndex(ctx, testTable, "TestTableByValueDesc", AllKeys(), testTableColumns),
2794		wantNums); !ok {
2795		t.Errorf("desc: %s", msg)
2796	}
2797}
2798
// testTableRow is the struct that rows read with testTableColumns are decoded
// into (see readAllTestTable).
type testTableRow struct {
	Key         string
	StringValue string
}
2800
2801func compareRows(iter *RowIterator, wantNums []int) (string, bool) {
2802	rows, err := readAllTestTable(iter)
2803	if err != nil {
2804		return err.Error(), false
2805	}
2806	want := map[string]string{}
2807	for _, n := range wantNums {
2808		want[fmt.Sprintf("k%d", n)] = fmt.Sprintf("v%d", n)
2809	}
2810	got := map[string]string{}
2811	for _, r := range rows {
2812		got[r.Key] = r.StringValue
2813	}
2814	if !testEqual(got, want) {
2815		return fmt.Sprintf("got %v, want %v", got, want), false
2816	}
2817	return "", true
2818}
2819
// isNaN reports whether x holds a float64 NaN. A value whose dynamic type is
// anything other than float64 (including nil) yields false.
func isNaN(x interface{}) bool {
	if f, isFloat64 := x.(float64); isFloat64 {
		return math.IsNaN(f)
	}
	return false
}
2827
2828// createClient creates Cloud Spanner data client.
2829func createClient(ctx context.Context, dbPath string, spc SessionPoolConfig) (client *Client, err error) {
2830	if os.Getenv("SPANNER_EMULATOR_HOST") == "" {
2831		client, err = NewClientWithConfig(ctx, dbPath, ClientConfig{
2832			SessionPoolConfig: spc,
2833		}, option.WithTokenSource(testutil.TokenSource(ctx, Scope, AdminScope)), option.WithEndpoint(endpoint))
2834	} else {
2835		// When the emulator is enabled, option.WithoutAuthentication()
2836		// will be added automatically which is incompatible with
2837		// option.WithTokenSource(testutil.TokenSource(ctx, Scope)).
2838		client, err = NewClientWithConfig(ctx, dbPath, ClientConfig{SessionPoolConfig: spc})
2839	}
2840
2841	if err != nil {
2842		return nil, fmt.Errorf("cannot create data client on DB %v: %v", dbPath, err)
2843	}
2844	return client, nil
2845}
2846
2847// populate prepares the database with some data.
2848func populate(ctx context.Context, client *Client) error {
2849	// Populate data
2850	var err error
2851	m := InsertMap("test", map[string]interface{}{
2852		"a": str1,
2853		"b": str2,
2854	})
2855	_, err = client.Apply(ctx, []*Mutation{m})
2856	return err
2857}
2858
2859func matchError(got error, wantCode codes.Code, wantMsgPart string) (string, bool) {
2860	if ErrCode(got) != wantCode || !strings.Contains(strings.ToLower(ErrDesc(got)), strings.ToLower(wantMsgPart)) {
2861		return fmt.Sprintf("got error <%v>\n"+`want <code = %q, "...%s...">`, got, wantCode, wantMsgPart), false
2862	}
2863	return "", true
2864}
2865
2866func rowToValues(r *Row) ([]interface{}, error) {
2867	var x int64
2868	var y, z string
2869	if err := r.Column(0, &x); err != nil {
2870		return nil, err
2871	}
2872	if err := r.Column(1, &y); err != nil {
2873		return nil, err
2874	}
2875	if err := r.Column(2, &z); err != nil {
2876		return nil, err
2877	}
2878	return []interface{}{x, y, z}, nil
2879}
2880
2881func readAll(iter *RowIterator) ([][]interface{}, error) {
2882	defer iter.Stop()
2883	var vals [][]interface{}
2884	for {
2885		row, err := iter.Next()
2886		if err == iterator.Done {
2887			return vals, nil
2888		}
2889		if err != nil {
2890			return nil, err
2891		}
2892		v, err := rowToValues(row)
2893		if err != nil {
2894			return nil, err
2895		}
2896		vals = append(vals, v)
2897	}
2898}
2899
2900func readAllTestTable(iter *RowIterator) ([]testTableRow, error) {
2901	defer iter.Stop()
2902	var vals []testTableRow
2903	for {
2904		row, err := iter.Next()
2905		if err == iterator.Done {
2906			return vals, nil
2907		}
2908		if err != nil {
2909			return nil, err
2910		}
2911		var ttr testTableRow
2912		if err := row.ToStruct(&ttr); err != nil {
2913			return nil, err
2914		}
2915		vals = append(vals, ttr)
2916	}
2917}
2918
// maxDuration returns the larger of a and b.
func maxDuration(a, b time.Duration) time.Duration {
	if b >= a {
		return b
	}
	return a
}
2925
// skipEmulatorTest skips the calling test when the SPANNER_EMULATOR_HOST
// environment variable is set to a non-empty value.
func skipEmulatorTest(t *testing.T) {
	if emulatorHost := os.Getenv("SPANNER_EMULATOR_HOST"); emulatorHost != "" {
		t.Skip("Skipping testing against the emulator.")
	}
}
2931