1/*
2Copyright 2017 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package spanner
18
19import (
20	"context"
21	"errors"
22	"flag"
23	"fmt"
24	"log"
25	"math"
26	"os"
27	"reflect"
28	"strings"
29	"sync"
30	"testing"
31	"time"
32
33	"cloud.google.com/go/civil"
34	"cloud.google.com/go/internal/testutil"
35	"cloud.google.com/go/internal/uid"
36	database "cloud.google.com/go/spanner/admin/database/apiv1"
37	instance "cloud.google.com/go/spanner/admin/instance/apiv1"
38	"google.golang.org/api/iterator"
39	"google.golang.org/api/option"
40	adminpb "google.golang.org/genproto/googleapis/spanner/admin/database/v1"
41	instancepb "google.golang.org/genproto/googleapis/spanner/admin/instance/v1"
42	"google.golang.org/grpc"
43	"google.golang.org/grpc/codes"
44	"google.golang.org/grpc/status"
45)
46
// Package-level fixtures shared by the integration tests in this file.
var (
	// testProjectID specifies the project used for testing. It can be changed
	// by setting environment variable GCLOUD_TESTS_GOLANG_PROJECT_ID.
	testProjectID = testutil.ProjID()

	// dbNameSpace and instanceNameSpace are uid spaces with the "gotest"
	// prefix; they differ only in the separator character ('_' vs. '-').
	// testInstanceID is drawn from instanceNameSpace and is the ID of the
	// instance that initIntegrationTests creates for this test run.
	dbNameSpace       = uid.NewSpace("gotest", &uid.Options{Sep: '_', Short: true})
	instanceNameSpace = uid.NewSpace("gotest", &uid.Options{Sep: '-', Short: true})
	testInstanceID    = instanceNameSpace.New()

	// Names of the table, index and columns declared by readDBStatements.
	testTable        = "TestTable"
	testTableIndex   = "TestTableByValue"
	testTableColumns = []string{"Key", "StringValue"}

	// Admin clients; both are assigned by initIntegrationTests and closed by
	// the cleanup function it returns.
	databaseAdmin *database.DatabaseAdminClient
	instanceAdmin *instance.InstanceAdminClient

	// singerDBStatements is the DDL for the Singers, Accounts and Types
	// tables plus the SingerByName and AccountByNickname indexes.
	singerDBStatements = []string{
		`CREATE TABLE Singers (
				SingerId	INT64 NOT NULL,
				FirstName	STRING(1024),
				LastName	STRING(1024),
				SingerInfo	BYTES(MAX)
			) PRIMARY KEY (SingerId)`,
		`CREATE INDEX SingerByName ON Singers(FirstName, LastName)`,
		`CREATE TABLE Accounts (
				AccountId	INT64 NOT NULL,
				Nickname	STRING(100),
				Balance		INT64 NOT NULL,
			) PRIMARY KEY (AccountId)`,
		`CREATE INDEX AccountByNickname ON Accounts(Nickname) STORING (Balance)`,
		`CREATE TABLE Types (
				RowID		INT64 NOT NULL,
				String		STRING(MAX),
				StringArray	ARRAY<STRING(MAX)>,
				Bytes		BYTES(MAX),
				BytesArray	ARRAY<BYTES(MAX)>,
				Int64a		INT64,
				Int64Array	ARRAY<INT64>,
				Bool		BOOL,
				BoolArray	ARRAY<BOOL>,
				Float64		FLOAT64,
				Float64Array	ARRAY<FLOAT64>,
				Date		DATE,
				DateArray	ARRAY<DATE>,
				Timestamp	TIMESTAMP,
				TimestampArray	ARRAY<TIMESTAMP>,
			) PRIMARY KEY (RowID)`,
	}

	// readDBStatements is the DDL for TestTable together with an ascending
	// and a descending index on its StringValue column.
	readDBStatements = []string{
		`CREATE TABLE TestTable (
                    Key          STRING(MAX) NOT NULL,
                    StringValue  STRING(MAX)
            ) PRIMARY KEY (Key)`,
		`CREATE INDEX TestTableByValue ON TestTable(StringValue)`,
		`CREATE INDEX TestTableByValueDesc ON TestTable(StringValue DESC)`,
	}

	// simpleDBStatements is the DDL for a two-column table named test;
	// simpleDBTableColumns lists its columns.
	simpleDBStatements = []string{
		`CREATE TABLE test (
				a	STRING(1024),
				b	STRING(1024),
			) PRIMARY KEY (a)`,
	}
	simpleDBTableColumns = []string{"a", "b"}

	// ctsDBStatements is the DDL for a TestTable whose Ts column has the
	// allow_commit_timestamp option enabled.
	ctsDBStatements = []string{
		`CREATE TABLE TestTable (
		    Key  STRING(MAX) NOT NULL,
		    Ts   TIMESTAMP OPTIONS (allow_commit_timestamp = true),
	    ) PRIMARY KEY (Key)`,
	}
)
120
// Fixed sample string values. Their users are not in this part of the file;
// presumably they serve as test data for tests defined further down.
const (
	str1 = "alice"
	str2 = "a@example.com"
)
125
126func TestMain(m *testing.M) {
127	cleanup := initIntegrationTests()
128	res := m.Run()
129	cleanup()
130	os.Exit(res)
131}
132
// grpcHeaderChecker is testutil's default headers enforcer. Its CallOptions
// are included in the client options that initIntegrationTests passes to the
// admin clients.
var grpcHeaderChecker = testutil.DefaultHeadersEnforcer()
134
135func initIntegrationTests() (cleanup func()) {
136	ctx := context.Background()
137	flag.Parse() // Needed for testing.Short().
138	noop := func() {}
139
140	if testing.Short() {
141		log.Println("Integration tests skipped in -short mode.")
142		return noop
143	}
144
145	if testProjectID == "" {
146		log.Println("Integration tests skipped: GCLOUD_TESTS_GOLANG_PROJECT_ID is missing")
147		return noop
148	}
149
150	ts := testutil.TokenSource(ctx, AdminScope, Scope)
151	if ts == nil {
152		log.Printf("Integration test skipped: cannot get service account credential from environment variable %v", "GCLOUD_TESTS_GOLANG_KEY")
153		return noop
154	}
155	var err error
156
157	opts := append(grpcHeaderChecker.CallOptions(), option.WithTokenSource(ts), option.WithEndpoint(endpoint))
158
159	// Run integration tests against the given emulator. Currently, the database and
160	// instance admin clients are auto-generated, which do not support to configure
161	// SPANNER_EMULATOR_HOST.
162	emulatorAddr := os.Getenv("SPANNER_EMULATOR_HOST")
163	if emulatorAddr != "" {
164		opts = append(
165			grpcHeaderChecker.CallOptions(),
166			option.WithEndpoint(emulatorAddr),
167			option.WithGRPCDialOption(grpc.WithInsecure()),
168			option.WithoutAuthentication(),
169		)
170	}
171
172	// Create InstanceAdmin and DatabaseAdmin clients.
173	instanceAdmin, err = instance.NewInstanceAdminClient(ctx, opts...)
174	if err != nil {
175		log.Fatalf("cannot create instance databaseAdmin client: %v", err)
176	}
177	databaseAdmin, err = database.NewDatabaseAdminClient(ctx, opts...)
178	if err != nil {
179		log.Fatalf("cannot create databaseAdmin client: %v", err)
180	}
181	// Get the list of supported instance configs for the project that is used
182	// for the integration tests. The supported instance configs can differ per
183	// project. The integration tests will use the first instance config that
184	// is returned by Cloud Spanner. This will normally be the regional config
185	// that is physically the closest to where the request is coming from.
186	configIterator := instanceAdmin.ListInstanceConfigs(ctx, &instancepb.ListInstanceConfigsRequest{
187		Parent: fmt.Sprintf("projects/%s", testProjectID),
188	})
189	config, err := configIterator.Next()
190	if err != nil {
191		log.Fatalf("Cannot get any instance configurations.\nPlease make sure the Cloud Spanner API is enabled for the test project.\nGet error: %v", err)
192	}
193	// Create a test instance to use for this test run.
194	op, err := instanceAdmin.CreateInstance(ctx, &instancepb.CreateInstanceRequest{
195		Parent:     fmt.Sprintf("projects/%s", testProjectID),
196		InstanceId: testInstanceID,
197		Instance: &instancepb.Instance{
198			Config:      config.Name,
199			DisplayName: testInstanceID,
200			NodeCount:   1,
201		},
202	})
203	if err != nil {
204		log.Fatalf("could not create instance with id %s: %v", fmt.Sprintf("projects/%s/instances/%s", testProjectID, testInstanceID), err)
205	}
206	// Wait for the instance creation to finish.
207	i, err := op.Wait(ctx)
208	if err != nil {
209		log.Fatalf("waiting for instance creation to finish failed: %v", err)
210	}
211	if i.State != instancepb.Instance_READY {
212		log.Printf("instance state is not READY, it might be that the test instance will cause problems during tests. Got state %v\n", i.State)
213	}
214
215	return func() {
216		// Delete this test instance.
217		instanceName := fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID)
218		if err = instanceAdmin.DeleteInstance(ctx, &instancepb.DeleteInstanceRequest{
219			Name: instanceName,
220		}); err != nil {
221			log.Printf("failed to drop instance %s (error %v), might need a manual removal",
222				instanceName, err)
223		}
224		// Delete other test instances that may be lingering around.
225		cleanupInstances()
226		databaseAdmin.Close()
227		instanceAdmin.Close()
228	}
229}
230
231func TestIntegration_InitSessionPool(t *testing.T) {
232	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
233	defer cancel()
234	// Set up an empty testing environment.
235	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, []string{})
236	defer cleanup()
237	sp := client.idleSessions
238	sp.mu.Lock()
239	want := sp.MinOpened
240	sp.mu.Unlock()
241	var numOpened int
242loop:
243	for {
244		select {
245		case <-ctx.Done():
246			t.Fatalf("timed out, got %d session(s), want %d", numOpened, want)
247		default:
248			sp.mu.Lock()
249			numOpened = sp.idleList.Len() + sp.idleWriteList.Len()
250			sp.mu.Unlock()
251			if uint64(numOpened) == want {
252				break loop
253			}
254		}
255	}
256	// Delete all sessions in the pool on the backend and then try to execute a
257	// simple query. The 'Session not found' error should cause an automatic
258	// retry of the read-only transaction.
259	sp.mu.Lock()
260	s := sp.idleList.Front()
261	for {
262		if s == nil {
263			break
264		}
265		// This will delete the session on the backend without removing it
266		// from the pool.
267		s.Value.(*session).delete(context.Background())
268		s = s.Next()
269	}
270	sp.mu.Unlock()
271	sql := "SELECT 1, 'FOO', 'BAR'"
272	tx := client.ReadOnlyTransaction()
273	defer tx.Close()
274	iter := tx.Query(context.Background(), NewStatement(sql))
275	rows, err := readAll(iter)
276	if err != nil {
277		t.Fatalf("Unexpected error for query %q: %v", sql, err)
278	}
279	if got, want := len(rows), 1; got != want {
280		t.Fatalf("Row count mismatch for query %q\nGot: %v\nWant: %v", sql, got, want)
281	}
282	if got, want := len(rows[0]), 3; got != want {
283		t.Fatalf("Column count mismatch for query %q\nGot: %v\nWant: %v", sql, got, want)
284	}
285	if got, want := rows[0][0].(int64), int64(1); got != want {
286		t.Fatalf("Column value mismatch for query %q\nGot: %v\nWant: %v", sql, got, want)
287	}
288}
289
290// Test SingleUse transaction.
291func TestIntegration_SingleUse(t *testing.T) {
292	t.Parallel()
293
294	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
295	defer cancel()
296	// Set up testing environment.
297	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
298	defer cleanup()
299
300	writes := []struct {
301		row []interface{}
302		ts  time.Time
303	}{
304		{row: []interface{}{1, "Marc", "Foo"}},
305		{row: []interface{}{2, "Tars", "Bar"}},
306		{row: []interface{}{3, "Alpha", "Beta"}},
307		{row: []interface{}{4, "Last", "End"}},
308	}
309	// Try to write four rows through the Apply API.
310	for i, w := range writes {
311		var err error
312		m := InsertOrUpdate("Singers",
313			[]string{"SingerId", "FirstName", "LastName"},
314			w.row)
315		if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
316			t.Fatal(err)
317		}
318	}
319	// Calculate time difference between Cloud Spanner server and localhost to
320	// use to determine the exact staleness value to use.
321	timeDiff := maxDuration(time.Now().Sub(writes[0].ts), 0)
322
323	// Test reading rows with different timestamp bounds.
324	for i, test := range []struct {
325		name    string
326		want    [][]interface{}
327		tb      TimestampBound
328		checkTs func(time.Time) error
329	}{
330		{
331			name: "strong",
332			want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}},
333			tb:   StrongRead(),
334			checkTs: func(ts time.Time) error {
335				// writes[3] is the last write, all subsequent strong read
336				// should have a timestamp larger than that.
337				if ts.Before(writes[3].ts) {
338					return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[3].ts)
339				}
340				return nil
341			},
342		},
343		{
344			name: "min_read_timestamp",
345			want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}},
346			tb:   MinReadTimestamp(writes[3].ts),
347			checkTs: func(ts time.Time) error {
348				if ts.Before(writes[3].ts) {
349					return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[3].ts)
350				}
351				return nil
352			},
353		},
354		{
355			name: "max_staleness",
356			want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}},
357			tb:   MaxStaleness(time.Second),
358			checkTs: func(ts time.Time) error {
359				if ts.Before(writes[3].ts) {
360					return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[3].ts)
361				}
362				return nil
363			},
364		},
365		{
366			name: "read_timestamp",
367			want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}},
368			tb:   ReadTimestamp(writes[2].ts),
369			checkTs: func(ts time.Time) error {
370				if ts != writes[2].ts {
371					return fmt.Errorf("read got timestamp %v, want %v", ts, writes[2].ts)
372				}
373				return nil
374			},
375		},
376		{
377			name: "exact_staleness",
378			want: nil,
379			// Specify a staleness which should be already before this test.
380			tb: ExactStaleness(time.Now().Sub(writes[0].ts) + timeDiff + 30*time.Second),
381			checkTs: func(ts time.Time) error {
382				if !ts.Before(writes[0].ts) {
383					return fmt.Errorf("read got timestamp %v, want it to be earlier than %v", ts, writes[0].ts)
384				}
385				return nil
386			},
387		},
388	} {
389		t.Run(test.name, func(t *testing.T) {
390			// SingleUse.Query
391			su := client.Single().WithTimestampBound(test.tb)
392			got, err := readAll(su.Query(
393				ctx,
394				Statement{
395					"SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId IN (@id1, @id3, @id4)",
396					map[string]interface{}{"id1": int64(1), "id3": int64(3), "id4": int64(4)},
397				}))
398			if err != nil {
399				t.Fatalf("%d: SingleUse.Query returns error %v, want nil", i, err)
400			}
401			rts, err := su.Timestamp()
402			if err != nil {
403				t.Fatalf("%d: SingleUse.Query doesn't return a timestamp, error: %v", i, err)
404			}
405			if err := test.checkTs(rts); err != nil {
406				t.Fatalf("%d: SingleUse.Query doesn't return expected timestamp: %v", i, err)
407			}
408			if !testEqual(got, test.want) {
409				t.Fatalf("%d: got unexpected result from SingleUse.Query: %v, want %v", i, got, test.want)
410			}
411			// SingleUse.Read
412			su = client.Single().WithTimestampBound(test.tb)
413			got, err = readAll(su.Read(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}), []string{"SingerId", "FirstName", "LastName"}))
414			if err != nil {
415				t.Fatalf("%d: SingleUse.Read returns error %v, want nil", i, err)
416			}
417			rts, err = su.Timestamp()
418			if err != nil {
419				t.Fatalf("%d: SingleUse.Read doesn't return a timestamp, error: %v", i, err)
420			}
421			if err := test.checkTs(rts); err != nil {
422				t.Fatalf("%d: SingleUse.Read doesn't return expected timestamp: %v", i, err)
423			}
424			if !testEqual(got, test.want) {
425				t.Fatalf("%d: got unexpected result from SingleUse.Read: %v, want %v", i, got, test.want)
426			}
427			// SingleUse.ReadRow
428			got = nil
429			for _, k := range []Key{{1}, {3}, {4}} {
430				su = client.Single().WithTimestampBound(test.tb)
431				r, err := su.ReadRow(ctx, "Singers", k, []string{"SingerId", "FirstName", "LastName"})
432				if err != nil {
433					continue
434				}
435				v, err := rowToValues(r)
436				if err != nil {
437					continue
438				}
439				got = append(got, v)
440				rts, err = su.Timestamp()
441				if err != nil {
442					t.Fatalf("%d: SingleUse.ReadRow(%v) doesn't return a timestamp, error: %v", i, k, err)
443				}
444				if err := test.checkTs(rts); err != nil {
445					t.Fatalf("%d: SingleUse.ReadRow(%v) doesn't return expected timestamp: %v", i, k, err)
446				}
447			}
448			if !testEqual(got, test.want) {
449				t.Fatalf("%d: got unexpected results from SingleUse.ReadRow: %v, want %v", i, got, test.want)
450			}
451			// SingleUse.ReadUsingIndex
452			su = client.Single().WithTimestampBound(test.tb)
453			got, err = readAll(su.ReadUsingIndex(ctx, "Singers", "SingerByName", KeySets(Key{"Marc", "Foo"}, Key{"Alpha", "Beta"}, Key{"Last", "End"}), []string{"SingerId", "FirstName", "LastName"}))
454			if err != nil {
455				t.Fatalf("%d: SingleUse.ReadUsingIndex returns error %v, want nil", i, err)
456			}
457			// The results from ReadUsingIndex is sorted by the index rather than primary key.
458			if len(got) != len(test.want) {
459				t.Fatalf("%d: got unexpected result from SingleUse.ReadUsingIndex: %v, want %v", i, got, test.want)
460			}
461			for j, g := range got {
462				if j > 0 {
463					prev := got[j-1][1].(string) + got[j-1][2].(string)
464					curr := got[j][1].(string) + got[j][2].(string)
465					if strings.Compare(prev, curr) > 0 {
466						t.Fatalf("%d: SingleUse.ReadUsingIndex fails to order rows by index keys, %v should be after %v", i, got[j-1], got[j])
467					}
468				}
469				found := false
470				for _, w := range test.want {
471					if testEqual(g, w) {
472						found = true
473					}
474				}
475				if !found {
476					t.Fatalf("%d: got unexpected result from SingleUse.ReadUsingIndex: %v, want %v", i, got, test.want)
477					break
478				}
479			}
480			rts, err = su.Timestamp()
481			if err != nil {
482				t.Fatalf("%d: SingleUse.ReadUsingIndex doesn't return a timestamp, error: %v", i, err)
483			}
484			if err := test.checkTs(rts); err != nil {
485				t.Fatalf("%d: SingleUse.ReadUsingIndex doesn't return expected timestamp: %v", i, err)
486			}
487			// SingleUse.ReadRowUsingIndex
488			got = nil
489			for _, k := range []Key{{"Marc", "Foo"}, {"Alpha", "Beta"}, {"Last", "End"}} {
490				su = client.Single().WithTimestampBound(test.tb)
491				r, err := su.ReadRowUsingIndex(ctx, "Singers", "SingerByName", k, []string{"SingerId", "FirstName", "LastName"})
492				if err != nil {
493					continue
494				}
495				v, err := rowToValues(r)
496				if err != nil {
497					continue
498				}
499				got = append(got, v)
500				rts, err = su.Timestamp()
501				if err != nil {
502					t.Fatalf("%d: SingleUse.ReadRowUsingIndex(%v) doesn't return a timestamp, error: %v", i, k, err)
503				}
504				if err := test.checkTs(rts); err != nil {
505					t.Fatalf("%d: SingleUse.ReadRowUsingIndex(%v) doesn't return expected timestamp: %v", i, k, err)
506				}
507			}
508			if !testEqual(got, test.want) {
509				t.Fatalf("%d: got unexpected results from SingleUse.ReadRowUsingIndex: %v, want %v", i, got, test.want)
510			}
511		})
512	}
513}
514
515// Test resource-based routing enabled.
516func TestIntegration_SingleUse_WithResourceBasedRouting(t *testing.T) {
517	os.Setenv("GOOGLE_CLOUD_SPANNER_ENABLE_RESOURCE_BASED_ROUTING", "true")
518	defer os.Setenv("GOOGLE_CLOUD_SPANNER_ENABLE_RESOURCE_BASED_ROUTING", "")
519
520	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
521	defer cancel()
522	// Set up testing environment.
523	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
524	defer cleanup()
525
526	writes := []struct {
527		row []interface{}
528		ts  time.Time
529	}{
530		{row: []interface{}{1, "Marc", "Foo"}},
531		{row: []interface{}{2, "Tars", "Bar"}},
532		{row: []interface{}{3, "Alpha", "Beta"}},
533		{row: []interface{}{4, "Last", "End"}},
534	}
535	// Try to write four rows through the Apply API.
536	for i, w := range writes {
537		var err error
538		m := InsertOrUpdate("Singers",
539			[]string{"SingerId", "FirstName", "LastName"},
540			w.row)
541		if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
542			t.Fatal(err)
543		}
544	}
545
546	row, err := client.Single().ReadRow(ctx, "Singers", Key{3}, []string{"FirstName"})
547	if err != nil {
548		t.Errorf("SingleUse.ReadRow returns error %v, want nil", err)
549	}
550	var got string
551	if err := row.Column(0, &got); err != nil {
552		t.Errorf("row.Column returns error %v, want nil", err)
553	}
554	if want := "Alpha"; got != want {
555		t.Errorf("got %q, want %q", got, want)
556	}
557}
558
559func TestIntegration_SingleUse_ReadingWithLimit(t *testing.T) {
560	t.Parallel()
561
562	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
563	defer cancel()
564	// Set up testing environment.
565	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
566	defer cleanup()
567
568	writes := []struct {
569		row []interface{}
570		ts  time.Time
571	}{
572		{row: []interface{}{1, "Marc", "Foo"}},
573		{row: []interface{}{2, "Tars", "Bar"}},
574		{row: []interface{}{3, "Alpha", "Beta"}},
575		{row: []interface{}{4, "Last", "End"}},
576	}
577	// Try to write four rows through the Apply API.
578	for i, w := range writes {
579		var err error
580		m := InsertOrUpdate("Singers",
581			[]string{"SingerId", "FirstName", "LastName"},
582			w.row)
583		if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
584			t.Fatal(err)
585		}
586	}
587
588	su := client.Single()
589	const limit = 1
590	gotRows, err := readAll(su.ReadWithOptions(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}),
591		[]string{"SingerId", "FirstName", "LastName"}, &ReadOptions{Limit: limit}))
592	if err != nil {
593		t.Errorf("SingleUse.ReadWithOptions returns error %v, want nil", err)
594	}
595	if got, want := len(gotRows), limit; got != want {
596		t.Errorf("got %d, want %d", got, want)
597	}
598}
599
600// Test ReadOnlyTransaction. The testsuite is mostly like SingleUse, except it
601// also tests for a single timestamp across multiple reads.
602func TestIntegration_ReadOnlyTransaction(t *testing.T) {
603	t.Parallel()
604
605	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
606	defer cancel()
607	// Set up testing environment.
608	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
609	defer cleanup()
610
611	writes := []struct {
612		row []interface{}
613		ts  time.Time
614	}{
615		{row: []interface{}{1, "Marc", "Foo"}},
616		{row: []interface{}{2, "Tars", "Bar"}},
617		{row: []interface{}{3, "Alpha", "Beta"}},
618		{row: []interface{}{4, "Last", "End"}},
619	}
620	// Try to write four rows through the Apply API.
621	for i, w := range writes {
622		var err error
623		m := InsertOrUpdate("Singers",
624			[]string{"SingerId", "FirstName", "LastName"},
625			w.row)
626		if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
627			t.Fatal(err)
628		}
629	}
630
631	// For testing timestamp bound staleness.
632	<-time.After(time.Second)
633
634	// Test reading rows with different timestamp bounds.
635	for i, test := range []struct {
636		want    [][]interface{}
637		tb      TimestampBound
638		checkTs func(time.Time) error
639	}{
640		// Note: min_read_timestamp and max_staleness are not supported by
641		// ReadOnlyTransaction. See API document for more details.
642		{
643			// strong
644			[][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}},
645			StrongRead(),
646			func(ts time.Time) error {
647				if ts.Before(writes[3].ts) {
648					return fmt.Errorf("read got timestamp %v, want it to be no later than %v", ts, writes[3].ts)
649				}
650				return nil
651			},
652		},
653		{
654			// read_timestamp
655			[][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}},
656			ReadTimestamp(writes[2].ts),
657			func(ts time.Time) error {
658				if ts != writes[2].ts {
659					return fmt.Errorf("read got timestamp %v, expect %v", ts, writes[2].ts)
660				}
661				return nil
662			},
663		},
664		{
665			// exact_staleness
666			nil,
667			// Specify a staleness which should be already before this test
668			// because context timeout is set to be 10s.
669			ExactStaleness(11 * time.Second),
670			func(ts time.Time) error {
671				if ts.After(writes[0].ts) {
672					return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[0].ts)
673				}
674				return nil
675			},
676		},
677	} {
678		// ReadOnlyTransaction.Query
679		ro := client.ReadOnlyTransaction().WithTimestampBound(test.tb)
680		got, err := readAll(ro.Query(
681			ctx,
682			Statement{
683				"SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId IN (@id1, @id3, @id4)",
684				map[string]interface{}{"id1": int64(1), "id3": int64(3), "id4": int64(4)},
685			}))
686		if err != nil {
687			t.Errorf("%d: ReadOnlyTransaction.Query returns error %v, want nil", i, err)
688		}
689		if !testEqual(got, test.want) {
690			t.Errorf("%d: got unexpected result from ReadOnlyTransaction.Query: %v, want %v", i, got, test.want)
691		}
692		rts, err := ro.Timestamp()
693		if err != nil {
694			t.Errorf("%d: ReadOnlyTransaction.Query doesn't return a timestamp, error: %v", i, err)
695		}
696		if err := test.checkTs(rts); err != nil {
697			t.Errorf("%d: ReadOnlyTransaction.Query doesn't return expected timestamp: %v", i, err)
698		}
699		roTs := rts
700		// ReadOnlyTransaction.Read
701		got, err = readAll(ro.Read(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}), []string{"SingerId", "FirstName", "LastName"}))
702		if err != nil {
703			t.Errorf("%d: ReadOnlyTransaction.Read returns error %v, want nil", i, err)
704		}
705		if !testEqual(got, test.want) {
706			t.Errorf("%d: got unexpected result from ReadOnlyTransaction.Read: %v, want %v", i, got, test.want)
707		}
708		rts, err = ro.Timestamp()
709		if err != nil {
710			t.Errorf("%d: ReadOnlyTransaction.Read doesn't return a timestamp, error: %v", i, err)
711		}
712		if err := test.checkTs(rts); err != nil {
713			t.Errorf("%d: ReadOnlyTransaction.Read doesn't return expected timestamp: %v", i, err)
714		}
715		if roTs != rts {
716			t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts)
717		}
718		// ReadOnlyTransaction.ReadRow
719		got = nil
720		for _, k := range []Key{{1}, {3}, {4}} {
721			r, err := ro.ReadRow(ctx, "Singers", k, []string{"SingerId", "FirstName", "LastName"})
722			if err != nil {
723				continue
724			}
725			v, err := rowToValues(r)
726			if err != nil {
727				continue
728			}
729			got = append(got, v)
730			rts, err = ro.Timestamp()
731			if err != nil {
732				t.Errorf("%d: ReadOnlyTransaction.ReadRow(%v) doesn't return a timestamp, error: %v", i, k, err)
733			}
734			if err := test.checkTs(rts); err != nil {
735				t.Errorf("%d: ReadOnlyTransaction.ReadRow(%v) doesn't return expected timestamp: %v", i, k, err)
736			}
737			if roTs != rts {
738				t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts)
739			}
740		}
741		if !testEqual(got, test.want) {
742			t.Errorf("%d: got unexpected results from ReadOnlyTransaction.ReadRow: %v, want %v", i, got, test.want)
743		}
744		// SingleUse.ReadUsingIndex
745		got, err = readAll(ro.ReadUsingIndex(ctx, "Singers", "SingerByName", KeySets(Key{"Marc", "Foo"}, Key{"Alpha", "Beta"}, Key{"Last", "End"}), []string{"SingerId", "FirstName", "LastName"}))
746		if err != nil {
747			t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex returns error %v, want nil", i, err)
748		}
749		// The results from ReadUsingIndex is sorted by the index rather than
750		// primary key.
751		if len(got) != len(test.want) {
752			t.Errorf("%d: got unexpected result from ReadOnlyTransaction.ReadUsingIndex: %v, want %v", i, got, test.want)
753		}
754		for j, g := range got {
755			if j > 0 {
756				prev := got[j-1][1].(string) + got[j-1][2].(string)
757				curr := got[j][1].(string) + got[j][2].(string)
758				if strings.Compare(prev, curr) > 0 {
759					t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex fails to order rows by index keys, %v should be after %v", i, got[j-1], got[j])
760				}
761			}
762			found := false
763			for _, w := range test.want {
764				if testEqual(g, w) {
765					found = true
766				}
767			}
768			if !found {
769				t.Errorf("%d: got unexpected result from ReadOnlyTransaction.ReadUsingIndex: %v, want %v", i, got, test.want)
770				break
771			}
772		}
773		rts, err = ro.Timestamp()
774		if err != nil {
775			t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex doesn't return a timestamp, error: %v", i, err)
776		}
777		if err := test.checkTs(rts); err != nil {
778			t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex doesn't return expected timestamp: %v", i, err)
779		}
780		if roTs != rts {
781			t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts)
782		}
783		// ReadOnlyTransaction.ReadRowUsingIndex
784		got = nil
785		for _, k := range []Key{{"Marc", "Foo"}, {"Alpha", "Beta"}, {"Last", "End"}} {
786			r, err := ro.ReadRowUsingIndex(ctx, "Singers", "SingerByName", k, []string{"SingerId", "FirstName", "LastName"})
787			if err != nil {
788				continue
789			}
790			v, err := rowToValues(r)
791			if err != nil {
792				continue
793			}
794			got = append(got, v)
795			rts, err = ro.Timestamp()
796			if err != nil {
797				t.Errorf("%d: ReadOnlyTransaction.ReadRowUsingIndex(%v) doesn't return a timestamp, error: %v", i, k, err)
798			}
799			if err := test.checkTs(rts); err != nil {
800				t.Errorf("%d: ReadOnlyTransaction.ReadRowUsingIndex(%v) doesn't return expected timestamp: %v", i, k, err)
801			}
802			if roTs != rts {
803				t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts)
804			}
805		}
806		if !testEqual(got, test.want) {
807			t.Errorf("%d: got unexpected results from ReadOnlyTransaction.ReadRowUsingIndex: %v, want %v", i, got, test.want)
808		}
809		ro.Close()
810	}
811}
812
813// Test ReadOnlyTransaction with different timestamp bound when there's an
814// update at the same time.
815func TestIntegration_UpdateDuringRead(t *testing.T) {
816	t.Parallel()
817
818	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
819	defer cancel()
820	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
821	defer cleanup()
822
823	for i, tb := range []TimestampBound{
824		StrongRead(),
825		ReadTimestamp(time.Now().Add(-time.Minute * 30)), // version GC is 1 hour
826		ExactStaleness(time.Minute * 30),
827	} {
828		ro := client.ReadOnlyTransaction().WithTimestampBound(tb)
829		_, err := ro.ReadRow(ctx, "Singers", Key{i}, []string{"SingerId"})
830		if ErrCode(err) != codes.NotFound {
831			t.Errorf("%d: ReadOnlyTransaction.ReadRow before write returns error: %v, want NotFound", i, err)
832		}
833
834		m := InsertOrUpdate("Singers", []string{"SingerId"}, []interface{}{i})
835		if _, err := client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
836			t.Fatal(err)
837		}
838
839		_, err = ro.ReadRow(ctx, "Singers", Key{i}, []string{"SingerId"})
840		if ErrCode(err) != codes.NotFound {
841			t.Errorf("%d: ReadOnlyTransaction.ReadRow after write returns error: %v, want NotFound", i, err)
842		}
843	}
844}
845
846// Test ReadWriteTransaction.
847func TestIntegration_ReadWriteTransaction(t *testing.T) {
848	t.Parallel()
849
850	// Give a longer deadline because of transaction backoffs.
851	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
852	defer cancel()
853	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
854	defer cleanup()
855
856	// Set up two accounts
857	accounts := []*Mutation{
858		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
859		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
860	}
861	if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil {
862		t.Fatal(err)
863	}
864	wg := sync.WaitGroup{}
865
866	readBalance := func(iter *RowIterator) (int64, error) {
867		defer iter.Stop()
868		var bal int64
869		for {
870			row, err := iter.Next()
871			if err == iterator.Done {
872				return bal, nil
873			}
874			if err != nil {
875				return 0, err
876			}
877			if err := row.Column(0, &bal); err != nil {
878				return 0, err
879			}
880		}
881	}
882
883	for i := 0; i < 20; i++ {
884		wg.Add(1)
885		go func(iter int) {
886			defer wg.Done()
887			_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
888				// Query Foo's balance and Bar's balance.
889				bf, e := readBalance(tx.Query(ctx,
890					Statement{"SELECT Balance FROM Accounts WHERE AccountId = @id", map[string]interface{}{"id": int64(1)}}))
891				if e != nil {
892					return e
893				}
894				bb, e := readBalance(tx.Read(ctx, "Accounts", KeySets(Key{int64(2)}), []string{"Balance"}))
895				if e != nil {
896					return e
897				}
898				if bf <= 0 {
899					return nil
900				}
901				bf--
902				bb++
903				return tx.BufferWrite([]*Mutation{
904					Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), bf}),
905					Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), bb}),
906				})
907			})
908			if err != nil {
909				t.Errorf("%d: failed to execute transaction: %v", iter, err)
910			}
911		}(i)
912	}
913	// Because of context timeout, all goroutines will eventually return.
914	wg.Wait()
915	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
916		var bf, bb int64
917		r, e := tx.ReadRow(ctx, "Accounts", Key{int64(1)}, []string{"Balance"})
918		if e != nil {
919			return e
920		}
921		if ce := r.Column(0, &bf); ce != nil {
922			return ce
923		}
924		bb, e = readBalance(tx.ReadUsingIndex(ctx, "Accounts", "AccountByNickname", KeySets(Key{"Bar"}), []string{"Balance"}))
925		if e != nil {
926			return e
927		}
928		if bf != 30 || bb != 21 {
929			t.Errorf("Foo's balance is now %v and Bar's balance is now %v, want %v and %v", bf, bb, 30, 21)
930		}
931		return nil
932	})
933	if err != nil {
934		t.Errorf("failed to check balances: %v", err)
935	}
936}
937
938func TestIntegration_Reads(t *testing.T) {
939	t.Parallel()
940
941	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
942	defer cancel()
943	// Set up testing environment.
944	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, readDBStatements)
945	defer cleanup()
946
947	// Includes k0..k14. Strings sort lexically, eg "k1" < "k10" < "k2".
948	var ms []*Mutation
949	for i := 0; i < 15; i++ {
950		ms = append(ms, InsertOrUpdate(testTable,
951			testTableColumns,
952			[]interface{}{fmt.Sprintf("k%d", i), fmt.Sprintf("v%d", i)}))
953	}
954	// Don't use ApplyAtLeastOnce, so we can test the other code path.
955	if _, err := client.Apply(ctx, ms); err != nil {
956		t.Fatal(err)
957	}
958
959	// Empty read.
960	rows, err := readAllTestTable(client.Single().Read(ctx, testTable,
961		KeyRange{Start: Key{"k99"}, End: Key{"z"}}, testTableColumns))
962	if err != nil {
963		t.Fatal(err)
964	}
965	if got, want := len(rows), 0; got != want {
966		t.Errorf("got %d, want %d", got, want)
967	}
968
969	// Index empty read.
970	rows, err = readAllTestTable(client.Single().ReadUsingIndex(ctx, testTable, testTableIndex,
971		KeyRange{Start: Key{"v99"}, End: Key{"z"}}, testTableColumns))
972	if err != nil {
973		t.Fatal(err)
974	}
975	if got, want := len(rows), 0; got != want {
976		t.Errorf("got %d, want %d", got, want)
977	}
978
979	// Point read.
980	row, err := client.Single().ReadRow(ctx, testTable, Key{"k1"}, testTableColumns)
981	if err != nil {
982		t.Fatal(err)
983	}
984	var got testTableRow
985	if err := row.ToStruct(&got); err != nil {
986		t.Fatal(err)
987	}
988	if want := (testTableRow{"k1", "v1"}); got != want {
989		t.Errorf("got %v, want %v", got, want)
990	}
991
992	// Point read not found.
993	_, err = client.Single().ReadRow(ctx, testTable, Key{"k999"}, testTableColumns)
994	if ErrCode(err) != codes.NotFound {
995		t.Fatalf("got %v, want NotFound", err)
996	}
997
998	// Index point read.
999	rowIndex, err := client.Single().ReadRowUsingIndex(ctx, testTable, testTableIndex, Key{"v1"}, testTableColumns)
1000	if err != nil {
1001		t.Fatal(err)
1002	}
1003	var gotIndex testTableRow
1004	if err := rowIndex.ToStruct(&gotIndex); err != nil {
1005		t.Fatal(err)
1006	}
1007	if wantIndex := (testTableRow{"k1", "v1"}); gotIndex != wantIndex {
1008		t.Errorf("got %v, want %v", gotIndex, wantIndex)
1009	}
1010	// Index point read not found.
1011	_, err = client.Single().ReadRowUsingIndex(ctx, testTable, testTableIndex, Key{"v999"}, testTableColumns)
1012	if ErrCode(err) != codes.NotFound {
1013		t.Fatalf("got %v, want NotFound", err)
1014	}
1015	rangeReads(ctx, t, client)
1016	indexRangeReads(ctx, t, client)
1017}
1018
1019func TestIntegration_EarlyTimestamp(t *testing.T) {
1020	t.Parallel()
1021
1022	// Test that we can get the timestamp from a read-only transaction as
1023	// soon as we have read at least one row.
1024	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1025	defer cancel()
1026	// Set up testing environment.
1027	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, readDBStatements)
1028	defer cleanup()
1029
1030	var ms []*Mutation
1031	for i := 0; i < 3; i++ {
1032		ms = append(ms, InsertOrUpdate(testTable,
1033			testTableColumns,
1034			[]interface{}{fmt.Sprintf("k%d", i), fmt.Sprintf("v%d", i)}))
1035	}
1036	if _, err := client.Apply(ctx, ms, ApplyAtLeastOnce()); err != nil {
1037		t.Fatal(err)
1038	}
1039
1040	txn := client.Single()
1041	iter := txn.Read(ctx, testTable, AllKeys(), testTableColumns)
1042	defer iter.Stop()
1043	// In single-use transaction, we should get an error before reading anything.
1044	if _, err := txn.Timestamp(); err == nil {
1045		t.Error("wanted error, got nil")
1046	}
1047	// After reading one row, the timestamp should be available.
1048	_, err := iter.Next()
1049	if err != nil {
1050		t.Fatal(err)
1051	}
1052	if _, err := txn.Timestamp(); err != nil {
1053		t.Errorf("got %v, want nil", err)
1054	}
1055
1056	txn = client.ReadOnlyTransaction()
1057	defer txn.Close()
1058	iter = txn.Read(ctx, testTable, AllKeys(), testTableColumns)
1059	defer iter.Stop()
1060	// In an ordinary read-only transaction, the timestamp should be
1061	// available immediately.
1062	if _, err := txn.Timestamp(); err != nil {
1063		t.Errorf("got %v, want nil", err)
1064	}
1065}
1066
1067func TestIntegration_NestedTransaction(t *testing.T) {
1068	t.Parallel()
1069
1070	// You cannot use a transaction from inside a read-write transaction.
1071	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1072	defer cancel()
1073	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
1074	defer cleanup()
1075
1076	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
1077		_, err := client.ReadWriteTransaction(ctx,
1078			func(context.Context, *ReadWriteTransaction) error { return nil })
1079		if ErrCode(err) != codes.FailedPrecondition {
1080			t.Fatalf("got %v, want FailedPrecondition", err)
1081		}
1082		_, err = client.Single().ReadRow(ctx, "Singers", Key{1}, []string{"SingerId"})
1083		if ErrCode(err) != codes.FailedPrecondition {
1084			t.Fatalf("got %v, want FailedPrecondition", err)
1085		}
1086		rot := client.ReadOnlyTransaction()
1087		defer rot.Close()
1088		_, err = rot.ReadRow(ctx, "Singers", Key{1}, []string{"SingerId"})
1089		if ErrCode(err) != codes.FailedPrecondition {
1090			t.Fatalf("got %v, want FailedPrecondition", err)
1091		}
1092		return nil
1093	})
1094	if err != nil {
1095		t.Fatal(err)
1096	}
1097}
1098
1099// Test client recovery on database recreation.
1100func TestIntegration_DbRemovalRecovery(t *testing.T) {
1101	t.Parallel()
1102
1103	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1104	defer cancel()
1105	// Create a client with MinOpened=0 to prevent the session pool maintainer
1106	// from repeatedly trying to create sessions for the invalid database.
1107	client, dbPath, cleanup := prepareIntegrationTest(ctx, t, SessionPoolConfig{}, singerDBStatements)
1108	defer cleanup()
1109
1110	// Drop the testing database.
1111	if err := databaseAdmin.DropDatabase(ctx, &adminpb.DropDatabaseRequest{Database: dbPath}); err != nil {
1112		t.Fatalf("failed to drop testing database %v: %v", dbPath, err)
1113	}
1114
1115	// Now, send the query.
1116	iter := client.Single().Query(ctx, Statement{SQL: "SELECT SingerId FROM Singers"})
1117	defer iter.Stop()
1118	if _, err := iter.Next(); err == nil {
1119		t.Errorf("client sends query to removed database successfully, want it to fail")
1120	}
1121
1122	// Recreate database and table.
1123	dbName := dbPath[strings.LastIndex(dbPath, "/")+1:]
1124	op, err := databaseAdmin.CreateDatabase(ctx, &adminpb.CreateDatabaseRequest{
1125		Parent:          fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID),
1126		CreateStatement: "CREATE DATABASE " + dbName,
1127		ExtraStatements: []string{
1128			`CREATE TABLE Singers (
1129				SingerId        INT64 NOT NULL,
1130				FirstName       STRING(1024),
1131				LastName        STRING(1024),
1132				SingerInfo      BYTES(MAX)
1133			) PRIMARY KEY (SingerId)`,
1134		},
1135	})
1136	if err != nil {
1137		t.Fatalf("cannot recreate testing DB %v: %v", dbPath, err)
1138	}
1139	if _, err := op.Wait(ctx); err != nil {
1140		t.Fatalf("cannot recreate testing DB %v: %v", dbPath, err)
1141	}
1142
1143	// Now, send the query again.
1144	iter = client.Single().Query(ctx, Statement{SQL: "SELECT SingerId FROM Singers"})
1145	defer iter.Stop()
1146	_, err = iter.Next()
1147	if err != nil && err != iterator.Done {
1148		t.Errorf("failed to send query to database %v: %v", dbPath, err)
1149	}
1150}
1151
1152// Test encoding/decoding non-struct Cloud Spanner types.
1153func TestIntegration_BasicTypes(t *testing.T) {
1154	t.Parallel()
1155
1156	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1157	defer cancel()
1158	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
1159	defer cleanup()
1160
1161	t1, _ := time.Parse(time.RFC3339Nano, "2016-11-15T15:04:05.999999999Z")
1162	// Boundaries
1163	t2, _ := time.Parse(time.RFC3339Nano, "0001-01-01T00:00:00.000000000Z")
1164	t3, _ := time.Parse(time.RFC3339Nano, "9999-12-31T23:59:59.999999999Z")
1165	d1, _ := civil.ParseDate("2016-11-15")
1166	// Boundaries
1167	d2, _ := civil.ParseDate("0001-01-01")
1168	d3, _ := civil.ParseDate("9999-12-31")
1169
1170	tests := []struct {
1171		col  string
1172		val  interface{}
1173		want interface{}
1174	}{
1175		{col: "String", val: ""},
1176		{col: "String", val: "", want: NullString{"", true}},
1177		{col: "String", val: "foo"},
1178		{col: "String", val: "foo", want: NullString{"foo", true}},
1179		{col: "String", val: NullString{"bar", true}, want: "bar"},
1180		{col: "String", val: NullString{"bar", false}, want: NullString{"", false}},
1181		{col: "String", val: nil, want: NullString{}},
1182		{col: "StringArray", val: []string(nil), want: []NullString(nil)},
1183		{col: "StringArray", val: []string{}, want: []NullString{}},
1184		{col: "StringArray", val: []string{"foo", "bar"}, want: []NullString{{"foo", true}, {"bar", true}}},
1185		{col: "StringArray", val: []NullString(nil)},
1186		{col: "StringArray", val: []NullString{}},
1187		{col: "StringArray", val: []NullString{{"foo", true}, {}}},
1188		{col: "Bytes", val: []byte{}},
1189		{col: "Bytes", val: []byte{1, 2, 3}},
1190		{col: "Bytes", val: []byte(nil)},
1191		{col: "BytesArray", val: [][]byte(nil)},
1192		{col: "BytesArray", val: [][]byte{}},
1193		{col: "BytesArray", val: [][]byte{{1}, {2, 3}}},
1194		{col: "Int64a", val: 0, want: int64(0)},
1195		{col: "Int64a", val: -1, want: int64(-1)},
1196		{col: "Int64a", val: 2, want: int64(2)},
1197		{col: "Int64a", val: int64(3)},
1198		{col: "Int64a", val: 4, want: NullInt64{4, true}},
1199		{col: "Int64a", val: NullInt64{5, true}, want: int64(5)},
1200		{col: "Int64a", val: NullInt64{6, true}, want: int64(6)},
1201		{col: "Int64a", val: NullInt64{7, false}, want: NullInt64{0, false}},
1202		{col: "Int64a", val: nil, want: NullInt64{}},
1203		{col: "Int64Array", val: []int(nil), want: []NullInt64(nil)},
1204		{col: "Int64Array", val: []int{}, want: []NullInt64{}},
1205		{col: "Int64Array", val: []int{1, 2}, want: []NullInt64{{1, true}, {2, true}}},
1206		{col: "Int64Array", val: []int64(nil), want: []NullInt64(nil)},
1207		{col: "Int64Array", val: []int64{}, want: []NullInt64{}},
1208		{col: "Int64Array", val: []int64{1, 2}, want: []NullInt64{{1, true}, {2, true}}},
1209		{col: "Int64Array", val: []NullInt64(nil)},
1210		{col: "Int64Array", val: []NullInt64{}},
1211		{col: "Int64Array", val: []NullInt64{{1, true}, {}}},
1212		{col: "Bool", val: false},
1213		{col: "Bool", val: true},
1214		{col: "Bool", val: false, want: NullBool{false, true}},
1215		{col: "Bool", val: true, want: NullBool{true, true}},
1216		{col: "Bool", val: NullBool{true, true}},
1217		{col: "Bool", val: NullBool{false, false}},
1218		{col: "Bool", val: nil, want: NullBool{}},
1219		{col: "BoolArray", val: []bool(nil), want: []NullBool(nil)},
1220		{col: "BoolArray", val: []bool{}, want: []NullBool{}},
1221		{col: "BoolArray", val: []bool{true, false}, want: []NullBool{{true, true}, {false, true}}},
1222		{col: "BoolArray", val: []NullBool(nil)},
1223		{col: "BoolArray", val: []NullBool{}},
1224		{col: "BoolArray", val: []NullBool{{false, true}, {true, true}, {}}},
1225		{col: "Float64", val: 0.0},
1226		{col: "Float64", val: 3.14},
1227		{col: "Float64", val: math.NaN()},
1228		{col: "Float64", val: math.Inf(1)},
1229		{col: "Float64", val: math.Inf(-1)},
1230		{col: "Float64", val: 2.78, want: NullFloat64{2.78, true}},
1231		{col: "Float64", val: NullFloat64{2.71, true}, want: 2.71},
1232		{col: "Float64", val: NullFloat64{1.41, true}, want: NullFloat64{1.41, true}},
1233		{col: "Float64", val: NullFloat64{0, false}},
1234		{col: "Float64", val: nil, want: NullFloat64{}},
1235		{col: "Float64Array", val: []float64(nil), want: []NullFloat64(nil)},
1236		{col: "Float64Array", val: []float64{}, want: []NullFloat64{}},
1237		{col: "Float64Array", val: []float64{2.72, 3.14, math.Inf(1)}, want: []NullFloat64{{2.72, true}, {3.14, true}, {math.Inf(1), true}}},
1238		{col: "Float64Array", val: []NullFloat64(nil)},
1239		{col: "Float64Array", val: []NullFloat64{}},
1240		{col: "Float64Array", val: []NullFloat64{{2.72, true}, {math.Inf(1), true}, {}}},
1241		{col: "Date", val: d1},
1242		{col: "Date", val: d1, want: NullDate{d1, true}},
1243		{col: "Date", val: NullDate{d1, true}},
1244		{col: "Date", val: NullDate{d1, true}, want: d1},
1245		{col: "Date", val: NullDate{civil.Date{}, false}},
1246		{col: "DateArray", val: []civil.Date(nil), want: []NullDate(nil)},
1247		{col: "DateArray", val: []civil.Date{}, want: []NullDate{}},
1248		{col: "DateArray", val: []civil.Date{d1, d2, d3}, want: []NullDate{{d1, true}, {d2, true}, {d3, true}}},
1249		{col: "Timestamp", val: t1},
1250		{col: "Timestamp", val: t1, want: NullTime{t1, true}},
1251		{col: "Timestamp", val: NullTime{t1, true}},
1252		{col: "Timestamp", val: NullTime{t1, true}, want: t1},
1253		{col: "Timestamp", val: NullTime{}},
1254		{col: "Timestamp", val: nil, want: NullTime{}},
1255		{col: "TimestampArray", val: []time.Time(nil), want: []NullTime(nil)},
1256		{col: "TimestampArray", val: []time.Time{}, want: []NullTime{}},
1257		{col: "TimestampArray", val: []time.Time{t1, t2, t3}, want: []NullTime{{t1, true}, {t2, true}, {t3, true}}},
1258	}
1259
1260	// Write rows into table first.
1261	var muts []*Mutation
1262	for i, test := range tests {
1263		muts = append(muts, InsertOrUpdate("Types", []string{"RowID", test.col}, []interface{}{i, test.val}))
1264	}
1265	if _, err := client.Apply(ctx, muts, ApplyAtLeastOnce()); err != nil {
1266		t.Fatal(err)
1267	}
1268
1269	for i, test := range tests {
1270		row, err := client.Single().ReadRow(ctx, "Types", []interface{}{i}, []string{test.col})
1271		if err != nil {
1272			t.Fatalf("Unable to fetch row %v: %v", i, err)
1273		}
1274		// Create new instance of type of test.want.
1275		want := test.want
1276		if want == nil {
1277			want = test.val
1278		}
1279		gotp := reflect.New(reflect.TypeOf(want))
1280		if err := row.Column(0, gotp.Interface()); err != nil {
1281			t.Errorf("%d: col:%v val:%#v, %v", i, test.col, test.val, err)
1282			continue
1283		}
1284		got := reflect.Indirect(gotp).Interface()
1285
1286		// One of the test cases is checking NaN handling.  Given
1287		// NaN!=NaN, we can't use reflect to test for it.
1288		if isNaN(got) && isNaN(want) {
1289			continue
1290		}
1291
1292		// Check non-NaN cases.
1293		if !testEqual(got, want) {
1294			t.Errorf("%d: col:%v val:%#v, got %#v, want %#v", i, test.col, test.val, got, want)
1295			continue
1296		}
1297	}
1298}
1299
1300// Test decoding Cloud Spanner STRUCT type.
1301func TestIntegration_StructTypes(t *testing.T) {
1302	t.Parallel()
1303
1304	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1305	defer cancel()
1306	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
1307	defer cleanup()
1308
1309	tests := []struct {
1310		q    Statement
1311		want func(r *Row) error
1312	}{
1313		{
1314			q: Statement{SQL: `SELECT ARRAY(SELECT STRUCT(1, 2))`},
1315			want: func(r *Row) error {
1316				// Test STRUCT ARRAY decoding to []NullRow.
1317				var rows []NullRow
1318				if err := r.Column(0, &rows); err != nil {
1319					return err
1320				}
1321				if len(rows) != 1 {
1322					return fmt.Errorf("len(rows) = %d; want 1", len(rows))
1323				}
1324				if !rows[0].Valid {
1325					return fmt.Errorf("rows[0] is NULL")
1326				}
1327				var i, j int64
1328				if err := rows[0].Row.Columns(&i, &j); err != nil {
1329					return err
1330				}
1331				if i != 1 || j != 2 {
1332					return fmt.Errorf("got (%d,%d), want (1,2)", i, j)
1333				}
1334				return nil
1335			},
1336		},
1337		{
1338			q: Statement{SQL: `SELECT ARRAY(SELECT STRUCT(1 as foo, 2 as bar)) as col1`},
1339			want: func(r *Row) error {
1340				// Test Row.ToStruct.
1341				s := struct {
1342					Col1 []*struct {
1343						Foo int64 `spanner:"foo"`
1344						Bar int64 `spanner:"bar"`
1345					} `spanner:"col1"`
1346				}{}
1347				if err := r.ToStruct(&s); err != nil {
1348					return err
1349				}
1350				want := struct {
1351					Col1 []*struct {
1352						Foo int64 `spanner:"foo"`
1353						Bar int64 `spanner:"bar"`
1354					} `spanner:"col1"`
1355				}{
1356					Col1: []*struct {
1357						Foo int64 `spanner:"foo"`
1358						Bar int64 `spanner:"bar"`
1359					}{
1360						{
1361							Foo: 1,
1362							Bar: 2,
1363						},
1364					},
1365				}
1366				if !testEqual(want, s) {
1367					return fmt.Errorf("unexpected decoding result: %v, want %v", s, want)
1368				}
1369				return nil
1370			},
1371		},
1372	}
1373	for i, test := range tests {
1374		iter := client.Single().Query(ctx, test.q)
1375		defer iter.Stop()
1376		row, err := iter.Next()
1377		if err != nil {
1378			t.Errorf("%d: %v", i, err)
1379			continue
1380		}
1381		if err := test.want(row); err != nil {
1382			t.Errorf("%d: %v", i, err)
1383			continue
1384		}
1385	}
1386}
1387
1388func TestIntegration_StructParametersUnsupported(t *testing.T) {
1389	skipEmulatorTest(t)
1390	t.Parallel()
1391
1392	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1393	defer cancel()
1394	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, nil)
1395	defer cleanup()
1396
1397	for _, test := range []struct {
1398		param       interface{}
1399		wantCode    codes.Code
1400		wantMsgPart string
1401	}{
1402		{
1403			struct {
1404				Field int
1405			}{10},
1406			codes.Unimplemented,
1407			"Unsupported query shape: " +
1408				"A struct value cannot be returned as a column value. " +
1409				"Rewrite the query to flatten the struct fields in the result.",
1410		},
1411		{
1412			[]struct {
1413				Field int
1414			}{{10}, {20}},
1415			codes.Unimplemented,
1416			"Unsupported query shape: " +
1417				"This query can return a null-valued array of struct, " +
1418				"which is not supported by Spanner.",
1419		},
1420	} {
1421		iter := client.Single().Query(ctx, Statement{
1422			SQL:    "SELECT @p",
1423			Params: map[string]interface{}{"p": test.param},
1424		})
1425		_, err := iter.Next()
1426		iter.Stop()
1427		if msg, ok := matchError(err, test.wantCode, test.wantMsgPart); !ok {
1428			t.Fatal(msg)
1429		}
1430	}
1431}
1432
1433// Test queries of the form "SELECT expr".
1434func TestIntegration_QueryExpressions(t *testing.T) {
1435	t.Parallel()
1436
1437	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1438	defer cancel()
1439	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, nil)
1440	defer cleanup()
1441
1442	newRow := func(vals []interface{}) *Row {
1443		row, err := NewRow(make([]string, len(vals)), vals)
1444		if err != nil {
1445			t.Fatal(err)
1446		}
1447		return row
1448	}
1449
1450	tests := []struct {
1451		expr string
1452		want interface{}
1453	}{
1454		{"1", int64(1)},
1455		{"[1, 2, 3]", []NullInt64{{1, true}, {2, true}, {3, true}}},
1456		{"[1, NULL, 3]", []NullInt64{{1, true}, {0, false}, {3, true}}},
1457		{"IEEE_DIVIDE(1, 0)", math.Inf(1)},
1458		{"IEEE_DIVIDE(-1, 0)", math.Inf(-1)},
1459		{"IEEE_DIVIDE(0, 0)", math.NaN()},
1460		// TODO(jba): add IEEE_DIVIDE(0, 0) to the following array when we have a better equality predicate.
1461		{"[IEEE_DIVIDE(1, 0), IEEE_DIVIDE(-1, 0)]", []NullFloat64{{math.Inf(1), true}, {math.Inf(-1), true}}},
1462		{"ARRAY(SELECT AS STRUCT * FROM (SELECT 'a', 1) WHERE 0 = 1)", []NullRow{}},
1463		{"ARRAY(SELECT STRUCT(1, 2))", []NullRow{{Row: *newRow([]interface{}{1, 2}), Valid: true}}},
1464	}
1465	for _, test := range tests {
1466		iter := client.Single().Query(ctx, Statement{SQL: "SELECT " + test.expr})
1467		defer iter.Stop()
1468		row, err := iter.Next()
1469		if err != nil {
1470			t.Errorf("%q: %v", test.expr, err)
1471			continue
1472		}
1473		// Create new instance of type of test.want.
1474		gotp := reflect.New(reflect.TypeOf(test.want))
1475		if err := row.Column(0, gotp.Interface()); err != nil {
1476			t.Errorf("%q: Column returned error %v", test.expr, err)
1477			continue
1478		}
1479		got := reflect.Indirect(gotp).Interface()
1480		// TODO(jba): remove isNaN special case when we have a better equality predicate.
1481		if isNaN(got) && isNaN(test.want) {
1482			continue
1483		}
1484		if !testEqual(got, test.want) {
1485			t.Errorf("%q\n got  %#v\nwant %#v", test.expr, got, test.want)
1486		}
1487	}
1488}
1489
1490func TestIntegration_QueryStats(t *testing.T) {
1491	skipEmulatorTest(t)
1492	t.Parallel()
1493
1494	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1495	defer cancel()
1496	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
1497	defer cleanup()
1498
1499	accounts := []*Mutation{
1500		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
1501		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
1502	}
1503	if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil {
1504		t.Fatal(err)
1505	}
1506	const sql = "SELECT Balance FROM Accounts"
1507
1508	qp, err := client.Single().AnalyzeQuery(ctx, Statement{sql, nil})
1509	if err != nil {
1510		t.Fatal(err)
1511	}
1512	if len(qp.PlanNodes) == 0 {
1513		t.Error("got zero plan nodes, expected at least one")
1514	}
1515
1516	iter := client.Single().QueryWithStats(ctx, Statement{sql, nil})
1517	defer iter.Stop()
1518	for {
1519		_, err := iter.Next()
1520		if err == iterator.Done {
1521			break
1522		}
1523		if err != nil {
1524			t.Fatal(err)
1525		}
1526	}
1527	if iter.QueryPlan == nil {
1528		t.Error("got nil QueryPlan, expected one")
1529	}
1530	if iter.QueryStats == nil {
1531		t.Error("got nil QueryStats, expected some")
1532	}
1533}
1534
1535func TestIntegration_InvalidDatabase(t *testing.T) {
1536	t.Parallel()
1537
1538	if databaseAdmin == nil {
1539		t.Skip("Integration tests skipped")
1540	}
1541	ctx := context.Background()
1542	dbPath := fmt.Sprintf("projects/%v/instances/%v/databases/invalid", testProjectID, testInstanceID)
1543	c, err := createClient(ctx, dbPath, SessionPoolConfig{})
1544	// Client creation should succeed even if the database is invalid.
1545	if err != nil {
1546		t.Fatal(err)
1547	}
1548	_, err = c.Single().ReadRow(ctx, "TestTable", Key{1}, []string{"col1"})
1549	if msg, ok := matchError(err, codes.NotFound, ""); !ok {
1550		t.Fatal(msg)
1551	}
1552}
1553
1554func TestIntegration_ReadErrors(t *testing.T) {
1555	t.Parallel()
1556
1557	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1558	defer cancel()
1559	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, readDBStatements)
1560	defer cleanup()
1561
1562	var ms []*Mutation
1563	for i := 0; i < 2; i++ {
1564		ms = append(ms, InsertOrUpdate(testTable,
1565			testTableColumns,
1566			[]interface{}{fmt.Sprintf("k%d", i), fmt.Sprintf("v")}))
1567	}
1568	if _, err := client.Apply(ctx, ms); err != nil {
1569		t.Fatal(err)
1570	}
1571
1572	// Read over invalid table fails
1573	_, err := client.Single().ReadRow(ctx, "badTable", Key{1}, []string{"StringValue"})
1574	if msg, ok := matchError(err, codes.NotFound, "badTable"); !ok {
1575		t.Error(msg)
1576	}
1577	// Read over invalid column fails
1578	_, err = client.Single().ReadRow(ctx, "TestTable", Key{1}, []string{"badcol"})
1579	if msg, ok := matchError(err, codes.NotFound, "badcol"); !ok {
1580		t.Error(msg)
1581	}
1582
1583	// Invalid query fails
1584	iter := client.Single().Query(ctx, Statement{SQL: "SELECT Apples AND Oranges"})
1585	defer iter.Stop()
1586	_, err = iter.Next()
1587	if msg, ok := matchError(err, codes.InvalidArgument, "unrecognized name"); !ok {
1588		t.Error(msg)
1589	}
1590
1591	// Read should fail on cancellation.
1592	cctx, cancel := context.WithCancel(ctx)
1593	cancel()
1594	_, err = client.Single().ReadRow(cctx, "TestTable", Key{1}, []string{"StringValue"})
1595	if msg, ok := matchError(err, codes.Canceled, ""); !ok {
1596		t.Error(msg)
1597	}
1598	// Read should fail if deadline exceeded.
1599	dctx, cancel := context.WithTimeout(ctx, time.Nanosecond)
1600	defer cancel()
1601	<-dctx.Done()
1602	_, err = client.Single().ReadRow(dctx, "TestTable", Key{1}, []string{"StringValue"})
1603	if msg, ok := matchError(err, codes.DeadlineExceeded, ""); !ok {
1604		t.Error(msg)
1605	}
1606	// Read should fail if there are multiple rows returned.
1607	_, err = client.Single().ReadRowUsingIndex(ctx, testTable, testTableIndex, Key{"v"}, testTableColumns)
1608	wantMsgPart := fmt.Sprintf("more than one row found by index(Table: %v, IndexKey: %v, Index: %v)", testTable, Key{"v"}, testTableIndex)
1609	if msg, ok := matchError(err, codes.FailedPrecondition, wantMsgPart); !ok {
1610		t.Error(msg)
1611	}
1612}
1613
// Test TransactionRunner. Test that transactions are aborted and retried as
// expected.
//
// The test has two parts. Test 1 returns a user error from the transaction
// callback and checks that the buffered insert did not become visible.
// Test 2 runs two read-write transactions in separate goroutines, sequenced
// through channels so that txn1 commits a write to an account txn2 has
// already read, and then checks the final balances of both accounts.
// The test is skipped on the emulator.
func TestIntegration_TransactionRunner(t *testing.T) {
	skipEmulatorTest(t)
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
	defer cleanup()

	// Test 1: User error should abort the transaction.
	// The results of BufferWrite and of the transaction itself are discarded;
	// the check below only looks at whether the row exists afterwards.
	_, _ = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
		tx.BufferWrite([]*Mutation{
			Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)})})
		return errors.New("user error")
	})
	// Empty read.
	rows, err := readAllTestTable(client.Single().Read(ctx, "Accounts", Key{1}, []string{"AccountId", "Nickname", "Balance"}))
	if err != nil {
		t.Fatal(err)
	}
	if got, want := len(rows), 0; got != want {
		t.Errorf("Empty read, got %d, want %d.", got, want)
	}

	// Test 2: Expect abort and retry.
	// We run two ReadWriteTransactions concurrently and make txn1 abort txn2 by
	// committing writes to the column txn2 have read, and expect the following
	// read to abort and txn2 retries.

	// Set up two accounts
	// Account 1 starts with balance 0 and account 2 with balance 1.
	accounts := []*Mutation{
		Insert("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), int64(0)}),
		Insert("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), int64(1)}),
	}
	if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil {
		t.Fatal(err)
	}

	// Channels used purely as one-shot signals (they are closed, never sent
	// on):
	//   cTxn1Start  - closed by txn1 after its first successful read.
	//   cTxn1Commit - closed once txn1's ReadWriteTransaction call returns.
	//   cTxn2Start  - closed by txn2 after its first successful read.
	var (
		cTxn1Start  = make(chan struct{})
		cTxn1Commit = make(chan struct{})
		cTxn2Start  = make(chan struct{})
		wg          sync.WaitGroup
	)

	// read balance, check error if we don't expect abort.
	// tx is anything with a ReadRow method; the test passes both a
	// *ReadWriteTransaction and the result of client.Single(). Note that the
	// read uses the outer ctx, not the context handed to the transaction
	// callback.
	readBalance := func(tx interface {
		ReadRow(ctx context.Context, table string, key Key, columns []string) (*Row, error)
	}, key int64, expectAbort bool) (int64, error) {
		var b int64
		r, e := tx.ReadRow(ctx, "Accounts", Key{int64(key)}, []string{"Balance"})
		if e != nil {
			if expectAbort && !isAbortErr(e) {
				t.Errorf("ReadRow got %v, want Abort error.", e)
			}
			// Verify that we received and are able to extract retry info from
			// the aborted error.
			// NOTE(review): this check runs for every ReadRow error, whether
			// or not expectAbort is set or the error is an abort.
			if _, hasRetryInfo := extractRetryDelay(e); !hasRetryInfo {
				t.Errorf("Got Abort error without RetryInfo\nGot: %v", e)
			}
			return b, e
		}
		if ce := r.Column(0, &b); ce != nil {
			return b, ce
		}
		return b, nil
	}

	wg.Add(2)
	// Txn 1
	// Reads account 1, buffers an update that increments its balance, then
	// waits for txn2 to signal that it has done its first read before
	// returning from the callback.
	go func() {
		defer wg.Done()
		var once sync.Once
		_, e := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
			b, e := readBalance(tx, 1, false)
			if e != nil {
				return e
			}
			// txn 1 can abort, in that case we skip closing the channel on
			// retry.
			once.Do(func() { close(cTxn1Start) })
			e = tx.BufferWrite([]*Mutation{
				Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), int64(b + 1)})})
			if e != nil {
				return e
			}
			// Wait for second transaction.
			<-cTxn2Start
			return nil
		})
		// Signal txn2 regardless of whether the transaction succeeded.
		close(cTxn1Commit)
		if e != nil {
			t.Errorf("Transaction 1 commit, got %v, want nil.", e)
		}
	}()
	// Txn 2
	// Reads account 1, signals txn1, waits for txn1 to finish, then reads
	// account 2 and writes account 2 = b1 + b2.
	go func() {
		// Wait until txn 1 starts.
		// NOTE(review): wg.Done is only deferred after this receive, so if
		// cTxn1Start is never closed this goroutine never reaches it and
		// wg.Wait below blocks.
		<-cTxn1Start
		defer wg.Done()
		var (
			once sync.Once
			b1   int64
			b2   int64
			e    error
		)
		// The callback assigns to the outer b1, b2 and e rather than
		// declaring its own.
		_, e = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
			if b1, e = readBalance(tx, 1, false); e != nil {
				return e
			}
			// Skip closing channel on retry.
			once.Do(func() { close(cTxn2Start) })
			// Wait until txn 1 successfully commits.
			<-cTxn1Commit
			// Txn1 has committed and written a balance to the account. Now this
			// transaction (txn2) reads and re-writes the balance. The first
			// time through, it will abort because it overlaps with txn1. Then
			// it will retry after txn1 commits, and succeed.
			if b2, e = readBalance(tx, 2, true); e != nil {
				return e
			}
			return tx.BufferWrite([]*Mutation{
				Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), int64(b1 + b2)})})
		})
		if e != nil {
			t.Errorf("Transaction 2 commit, got %v, want nil.", e)
		}
	}()
	wg.Wait()
	// Check that both transactions' effects are visible.
	// The expected balance of account i is i: account 1 goes from 0 to 1
	// through txn1, and account 2 becomes b1 + b2 = 1 + 1 when txn2's
	// committed attempt reads account 1 after txn1's write.
	for i := int64(1); i <= int64(2); i++ {
		if b, e := readBalance(client.Single(), i, false); e != nil {
			t.Fatalf("ReadBalance for key %d error %v.", i, e)
		} else if b != i {
			t.Errorf("Balance for key %d, got %d, want %d.", i, b, i)
		}
	}
}
1754
1755// Test PartitionQuery of BatchReadOnlyTransaction, create partitions then
1756// serialize and deserialize both transaction and partition to be used in
1757// execution on another client, and compare results.
1758func TestIntegration_BatchQuery(t *testing.T) {
1759	skipEmulatorTest(t)
1760	t.Parallel()
1761
1762	// Set up testing environment.
1763	var (
1764		client2 *Client
1765		err     error
1766	)
1767	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1768	defer cancel()
1769	client, dbPath, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, simpleDBStatements)
1770	defer cleanup()
1771
1772	if err = populate(ctx, client); err != nil {
1773		t.Fatal(err)
1774	}
1775	if client2, err = createClient(ctx, dbPath, SessionPoolConfig{}); err != nil {
1776		t.Fatal(err)
1777	}
1778	defer client2.Close()
1779
1780	// PartitionQuery
1781	var (
1782		txn        *BatchReadOnlyTransaction
1783		partitions []*Partition
1784		stmt       = Statement{SQL: "SELECT * FROM test;"}
1785	)
1786
1787	if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil {
1788		t.Fatal(err)
1789	}
1790	defer txn.Cleanup(ctx)
1791	if partitions, err = txn.PartitionQuery(ctx, stmt, PartitionOptions{0, 3}); err != nil {
1792		t.Fatal(err)
1793	}
1794
1795	// Reconstruct BatchReadOnlyTransactionID and execute partitions
1796	var (
1797		tid2      BatchReadOnlyTransactionID
1798		data      []byte
1799		gotResult bool // if we get matching result from two separate txns
1800	)
1801	if data, err = txn.ID.MarshalBinary(); err != nil {
1802		t.Fatalf("encoding failed %v", err)
1803	}
1804	if err = tid2.UnmarshalBinary(data); err != nil {
1805		t.Fatalf("decoding failed %v", err)
1806	}
1807	txn2 := client2.BatchReadOnlyTransactionFromID(tid2)
1808
1809	// Execute Partitions and compare results
1810	for i, p := range partitions {
1811		iter := txn.Execute(ctx, p)
1812		defer iter.Stop()
1813		p2 := serdesPartition(t, i, p)
1814		iter2 := txn2.Execute(ctx, &p2)
1815		defer iter2.Stop()
1816
1817		row1, err1 := iter.Next()
1818		row2, err2 := iter2.Next()
1819		if err1 != err2 {
1820			t.Fatalf("execution failed for different reasons: %v, %v", err1, err2)
1821			continue
1822		}
1823		if !testEqual(row1, row2) {
1824			t.Fatalf("execution returned different values: %v, %v", row1, row2)
1825			continue
1826		}
1827		if row1 == nil {
1828			continue
1829		}
1830		var a, b string
1831		if err = row1.Columns(&a, &b); err != nil {
1832			t.Fatalf("failed to parse row %v", err)
1833			continue
1834		}
1835		if a == str1 && b == str2 {
1836			gotResult = true
1837		}
1838	}
1839	if !gotResult {
1840		t.Fatalf("execution didn't return expected values")
1841	}
1842}
1843
1844// Test PartitionRead of BatchReadOnlyTransaction, similar to TestBatchQuery
1845func TestIntegration_BatchRead(t *testing.T) {
1846	skipEmulatorTest(t)
1847	t.Parallel()
1848
1849	// Set up testing environment.
1850	var (
1851		client2 *Client
1852		err     error
1853	)
1854	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1855	defer cancel()
1856	client, dbPath, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, simpleDBStatements)
1857	defer cleanup()
1858
1859	if err = populate(ctx, client); err != nil {
1860		t.Fatal(err)
1861	}
1862	if client2, err = createClient(ctx, dbPath, SessionPoolConfig{}); err != nil {
1863		t.Fatal(err)
1864	}
1865	defer client2.Close()
1866
1867	// PartitionRead
1868	var (
1869		txn        *BatchReadOnlyTransaction
1870		partitions []*Partition
1871	)
1872
1873	if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil {
1874		t.Fatal(err)
1875	}
1876	defer txn.Cleanup(ctx)
1877	if partitions, err = txn.PartitionRead(ctx, "test", AllKeys(), simpleDBTableColumns, PartitionOptions{0, 3}); err != nil {
1878		t.Fatal(err)
1879	}
1880
1881	// Reconstruct BatchReadOnlyTransactionID and execute partitions.
1882	var (
1883		tid2      BatchReadOnlyTransactionID
1884		data      []byte
1885		gotResult bool // if we get matching result from two separate txns
1886	)
1887	if data, err = txn.ID.MarshalBinary(); err != nil {
1888		t.Fatalf("encoding failed %v", err)
1889	}
1890	if err = tid2.UnmarshalBinary(data); err != nil {
1891		t.Fatalf("decoding failed %v", err)
1892	}
1893	txn2 := client2.BatchReadOnlyTransactionFromID(tid2)
1894
1895	// Execute Partitions and compare results.
1896	for i, p := range partitions {
1897		iter := txn.Execute(ctx, p)
1898		defer iter.Stop()
1899		p2 := serdesPartition(t, i, p)
1900		iter2 := txn2.Execute(ctx, &p2)
1901		defer iter2.Stop()
1902
1903		row1, err1 := iter.Next()
1904		row2, err2 := iter2.Next()
1905		if err1 != err2 {
1906			t.Fatalf("execution failed for different reasons: %v, %v", err1, err2)
1907			continue
1908		}
1909		if !testEqual(row1, row2) {
1910			t.Fatalf("execution returned different values: %v, %v", row1, row2)
1911			continue
1912		}
1913		if row1 == nil {
1914			continue
1915		}
1916		var a, b string
1917		if err = row1.Columns(&a, &b); err != nil {
1918			t.Fatalf("failed to parse row %v", err)
1919			continue
1920		}
1921		if a == str1 && b == str2 {
1922			gotResult = true
1923		}
1924	}
1925	if !gotResult {
1926		t.Fatalf("execution didn't return expected values")
1927	}
1928}
1929
1930// Test normal txReadEnv method on BatchReadOnlyTransaction.
1931func TestIntegration_BROTNormal(t *testing.T) {
1932	skipEmulatorTest(t)
1933	t.Parallel()
1934
1935	// Set up testing environment and create txn.
1936	var (
1937		txn *BatchReadOnlyTransaction
1938		err error
1939		row *Row
1940		i   int64
1941	)
1942	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1943	defer cancel()
1944	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, simpleDBStatements)
1945	defer cleanup()
1946
1947	if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil {
1948		t.Fatal(err)
1949	}
1950	defer txn.Cleanup(ctx)
1951	if _, err := txn.PartitionRead(ctx, "test", AllKeys(), simpleDBTableColumns, PartitionOptions{0, 3}); err != nil {
1952		t.Fatal(err)
1953	}
1954	// Normal query should work with BatchReadOnlyTransaction.
1955	stmt2 := Statement{SQL: "SELECT 1"}
1956	iter := txn.Query(ctx, stmt2)
1957	defer iter.Stop()
1958
1959	row, err = iter.Next()
1960	if err != nil {
1961		t.Errorf("query failed with %v", err)
1962	}
1963	if err = row.Columns(&i); err != nil {
1964		t.Errorf("failed to parse row %v", err)
1965	}
1966}
1967
1968func TestIntegration_CommitTimestamp(t *testing.T) {
1969	t.Parallel()
1970
1971	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1972	defer cancel()
1973	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, ctsDBStatements)
1974	defer cleanup()
1975
1976	type testTableRow struct {
1977		Key string
1978		Ts  NullTime
1979	}
1980
1981	var (
1982		cts1, cts2, ts1, ts2 time.Time
1983		err                  error
1984	)
1985
1986	// Apply mutation in sequence, expect to see commit timestamp in good order,
1987	// check also the commit timestamp returned
1988	for _, it := range []struct {
1989		k string
1990		t *time.Time
1991	}{
1992		{"a", &cts1},
1993		{"b", &cts2},
1994	} {
1995		tt := testTableRow{Key: it.k, Ts: NullTime{CommitTimestamp, true}}
1996		m, err := InsertStruct("TestTable", tt)
1997		if err != nil {
1998			t.Fatal(err)
1999		}
2000		*it.t, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce())
2001		if err != nil {
2002			t.Fatal(err)
2003		}
2004	}
2005
2006	txn := client.ReadOnlyTransaction()
2007	for _, it := range []struct {
2008		k string
2009		t *time.Time
2010	}{
2011		{"a", &ts1},
2012		{"b", &ts2},
2013	} {
2014		if r, e := txn.ReadRow(ctx, "TestTable", Key{it.k}, []string{"Ts"}); e != nil {
2015			t.Fatal(err)
2016		} else {
2017			var got testTableRow
2018			if err := r.ToStruct(&got); err != nil {
2019				t.Fatal(err)
2020			}
2021			*it.t = got.Ts.Time
2022		}
2023	}
2024	if !cts1.Equal(ts1) {
2025		t.Errorf("Expect commit timestamp returned and read to match for txn1, got %v and %v.", cts1, ts1)
2026	}
2027	if !cts2.Equal(ts2) {
2028		t.Errorf("Expect commit timestamp returned and read to match for txn2, got %v and %v.", cts2, ts2)
2029	}
2030
2031	// Try writing a timestamp in the future to commit timestamp, expect error.
2032	_, err = client.Apply(ctx, []*Mutation{InsertOrUpdate("TestTable", []string{"Key", "Ts"}, []interface{}{"a", time.Now().Add(time.Hour)})}, ApplyAtLeastOnce())
2033	if msg, ok := matchError(err, codes.FailedPrecondition, "Cannot write timestamps in the future"); !ok {
2034		t.Error(msg)
2035	}
2036}
2037
2038func TestIntegration_DML(t *testing.T) {
2039	t.Parallel()
2040
2041	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2042	defer cancel()
2043	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
2044	defer cleanup()
2045
2046	// Function that reads a single row's first name from within a transaction.
2047	readFirstName := func(tx *ReadWriteTransaction, key int) (string, error) {
2048		row, err := tx.ReadRow(ctx, "Singers", Key{key}, []string{"FirstName"})
2049		if err != nil {
2050			return "", err
2051		}
2052		var fn string
2053		if err := row.Column(0, &fn); err != nil {
2054			return "", err
2055		}
2056		return fn, nil
2057	}
2058
2059	// Function that reads multiple rows' first names from outside a read/write
2060	// transaction.
2061	readFirstNames := func(keys ...int) []string {
2062		var ks []KeySet
2063		for _, k := range keys {
2064			ks = append(ks, Key{k})
2065		}
2066		iter := client.Single().Read(ctx, "Singers", KeySets(ks...), []string{"FirstName"})
2067		var got []string
2068		var fn string
2069		err := iter.Do(func(row *Row) error {
2070			if err := row.Column(0, &fn); err != nil {
2071				return err
2072			}
2073			got = append(got, fn)
2074			return nil
2075		})
2076		if err != nil {
2077			t.Fatalf("readFirstNames(%v): %v", keys, err)
2078		}
2079		return got
2080	}
2081
2082	// Use ReadWriteTransaction.Query to execute a DML statement.
2083	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
2084		iter := tx.Query(ctx, Statement{
2085			SQL: `INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (1, "Umm", "Kulthum")`,
2086		})
2087		defer iter.Stop()
2088		if row, err := iter.Next(); err != iterator.Done {
2089			t.Fatalf("got results from iterator, want none: %#v, err = %v\n", row, err)
2090		}
2091		if iter.RowCount != 1 {
2092			t.Errorf("row count: got %d, want 1", iter.RowCount)
2093		}
2094		// The results of the DML statement should be visible to the transaction.
2095		got, err := readFirstName(tx, 1)
2096		if err != nil {
2097			return err
2098		}
2099		if want := "Umm"; got != want {
2100			t.Errorf("got %q, want %q", got, want)
2101		}
2102		return nil
2103	})
2104	if err != nil {
2105		t.Fatal(err)
2106	}
2107
2108	// Use ReadWriteTransaction.Update to execute a DML statement.
2109	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
2110		count, err := tx.Update(ctx, Statement{
2111			SQL: `Insert INTO Singers (SingerId, FirstName, LastName) VALUES (2, "Eduard", "Khil")`,
2112		})
2113		if err != nil {
2114			t.Fatal(err)
2115		}
2116		if count != 1 {
2117			t.Errorf("row count: got %d, want 1", count)
2118		}
2119		got, err := readFirstName(tx, 2)
2120		if err != nil {
2121			return err
2122		}
2123		if want := "Eduard"; got != want {
2124			t.Errorf("got %q, want %q", got, want)
2125		}
2126		return nil
2127
2128	})
2129	if err != nil {
2130		t.Fatal(err)
2131	}
2132
2133	// Roll back a DML statement and confirm that it didn't happen.
2134	var fail = errors.New("fail")
2135	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
2136		_, err := tx.Update(ctx, Statement{
2137			SQL: `INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (3, "Audra", "McDonald")`,
2138		})
2139		if err != nil {
2140			return err
2141		}
2142		return fail
2143	})
2144	if err != fail {
2145		t.Fatalf("rolling back: got error %v, want the error 'fail'", err)
2146	}
2147	_, err = client.Single().ReadRow(ctx, "Singers", Key{3}, []string{"FirstName"})
2148	if got, want := ErrCode(err), codes.NotFound; got != want {
2149		t.Errorf("got %s, want %s", got, want)
2150	}
2151
2152	// Run two DML statements in the same transaction.
2153	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
2154		_, err := tx.Update(ctx, Statement{SQL: `UPDATE Singers SET FirstName = "Oum" WHERE SingerId = 1`})
2155		if err != nil {
2156			return err
2157		}
2158		_, err = tx.Update(ctx, Statement{SQL: `UPDATE Singers SET FirstName = "Eddie" WHERE SingerId = 2`})
2159		if err != nil {
2160			return err
2161		}
2162		return nil
2163	})
2164	if err != nil {
2165		t.Fatal(err)
2166	}
2167	got := readFirstNames(1, 2)
2168	want := []string{"Oum", "Eddie"}
2169	if !testEqual(got, want) {
2170		t.Errorf("got %v, want %v", got, want)
2171	}
2172
2173	// Run a DML statement and an ordinary mutation in the same transaction.
2174	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
2175		_, err := tx.Update(ctx, Statement{
2176			SQL: `INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (3, "Audra", "McDonald")`,
2177		})
2178		if err != nil {
2179			return err
2180		}
2181		return tx.BufferWrite([]*Mutation{
2182			Insert("Singers", []string{"SingerId", "FirstName", "LastName"},
2183				[]interface{}{4, "Andy", "Irvine"}),
2184		})
2185	})
2186	if err != nil {
2187		t.Fatal(err)
2188	}
2189	got = readFirstNames(3, 4)
2190	want = []string{"Audra", "Andy"}
2191	if !testEqual(got, want) {
2192		t.Errorf("got %v, want %v", got, want)
2193	}
2194
2195	// Attempt to run a query using update.
2196	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
2197		_, err := tx.Update(ctx, Statement{SQL: `SELECT FirstName from Singers`})
2198		return err
2199	})
2200	if got, want := ErrCode(err), codes.InvalidArgument; got != want {
2201		t.Errorf("got %s, want %s", got, want)
2202	}
2203}
2204
// TestIntegration_StructParametersBind binds struct-typed values (and slices
// of them) to the query parameter "p", runs each test query through a
// single-use read, and compares the returned rows with rows built from the
// expected column names and values.
func TestIntegration_StructParametersBind(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, nil)
	defer cleanup()

	// tRow holds the expected column values of one result row; tRows is the
	// list of expected rows of one test case.
	type tRow []interface{}
	type tRows []struct{ trow tRow }

	// allFields is the struct type bound as a parameter in most cases below.
	type allFields struct {
		Stringf string
		Intf    int
		Boolf   bool
		Floatf  float64
		Bytef   []byte
		Timef   time.Time
		Datef   civil.Date
	}
	// allColumns lists the expected result column names, matching the field
	// names of allFields in order.
	allColumns := []string{
		"Stringf",
		"Intf",
		"Boolf",
		"Floatf",
		"Bytef",
		"Timef",
		"Datef",
	}
	// t1, t2, d1 and d2 are defined elsewhere in the package.
	s1 := allFields{"abc", 300, false, 3.45, []byte("foo"), t1, d1}
	s2 := allFields{"def", -300, false, -3.45, []byte("bar"), t2, d2}

	// s3 is a pointer to a struct type built at run time with one field
	// tagged "ff1"; it is used by the "Mixed struct" case.
	dynamicStructType := reflect.StructOf([]reflect.StructField{
		{Name: "A", Type: reflect.TypeOf(t1), Tag: `spanner:"ff1"`},
	})
	s3 := reflect.New(dynamicStructType)
	s3.Elem().Field(0).Set(reflect.ValueOf(t1))

	for i, test := range []struct {
		param interface{} // value bound to @p
		sql   string      // query to run
		cols  []string    // expected column names
		trows tRows       // expected rows
	}{
		// Struct value.
		{
			s1,
			"SELECT" +
				" @p.Stringf," +
				" @p.Intf," +
				" @p.Boolf," +
				" @p.Floatf," +
				" @p.Bytef," +
				" @p.Timef," +
				" @p.Datef",
			allColumns,
			tRows{
				{tRow{"abc", 300, false, 3.45, []byte("foo"), t1, d1}},
			},
		},
		// Array of struct value.
		{
			[]allFields{s1, s2},
			"SELECT * FROM UNNEST(@p)",
			allColumns,
			tRows{
				{tRow{"abc", 300, false, 3.45, []byte("foo"), t1, d1}},
				{tRow{"def", -300, false, -3.45, []byte("bar"), t2, d2}},
			},
		},
		// Null struct.
		{
			(*allFields)(nil),
			"SELECT @p IS NULL",
			[]string{""},
			tRows{
				{tRow{true}},
			},
		},
		// Null Array of struct.
		{
			[]allFields(nil),
			"SELECT @p IS NULL",
			[]string{""},
			tRows{
				{tRow{true}},
			},
		},
		// Empty struct.
		{
			struct{}{},
			"SELECT @p IS NULL ",
			[]string{""},
			tRows{
				{tRow{false}},
			},
		},
		// Empty array of struct.
		{
			[]allFields{},
			"SELECT * FROM UNNEST(@p) ",
			allColumns,
			tRows{},
		},
		// Struct with duplicate fields.
		{
			struct {
				A int `spanner:"field"`
				B int `spanner:"field"`
			}{10, 20},
			"SELECT * FROM UNNEST([@p]) ",
			[]string{"field", "field"},
			tRows{
				{tRow{10, 20}},
			},
		},
		// Struct with unnamed fields.
		{
			struct {
				A string `spanner:""`
			}{"hello"},
			"SELECT * FROM UNNEST([@p]) ",
			[]string{""},
			tRows{
				{tRow{"hello"}},
			},
		},
		// Mixed struct.
		{
			struct {
				DynamicStructField interface{}  `spanner:"f1"`
				ArrayStructField   []*allFields `spanner:"f2"`
			}{
				DynamicStructField: s3.Interface(),
				ArrayStructField:   []*allFields{nil},
			},
			"SELECT @p.f1.ff1, ARRAY_LENGTH(@p.f2), @p.f2[OFFSET(0)] IS NULL ",
			[]string{"ff1", "", ""},
			tRows{
				{tRow{t1, 1, true}},
			},
		},
	} {
		iter := client.Single().Query(ctx, Statement{
			SQL:    test.sql,
			Params: map[string]interface{}{"p": test.param},
		})
		// Collect every row the query returns.
		var gotRows []*Row
		err := iter.Do(func(r *Row) error {
			gotRows = append(gotRows, r)
			return nil
		})
		if err != nil {
			t.Errorf("Failed to execute test case %d, error: %v", i, err)
		}

		// Build the expected rows from the case's column names and values.
		var wantRows []*Row
		for j, row := range test.trows {
			r, err := NewRow(test.cols, row.trow)
			if err != nil {
				t.Errorf("Invalid row %d in test case %d", j, i)
			}
			wantRows = append(wantRows, r)
		}
		if !testEqual(gotRows, wantRows) {
			t.Errorf("%d: Want result %v, got result %v", i, wantRows, gotRows)
		}
	}
}
2374
2375func TestIntegration_PDML(t *testing.T) {
2376	skipEmulatorTest(t)
2377	t.Parallel()
2378
2379	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2380	defer cancel()
2381	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
2382	defer cleanup()
2383
2384	columns := []string{"SingerId", "FirstName", "LastName"}
2385
2386	// Populate the Singers table.
2387	var muts []*Mutation
2388	for _, row := range [][]interface{}{
2389		{1, "Umm", "Kulthum"},
2390		{2, "Eduard", "Khil"},
2391		{3, "Audra", "McDonald"},
2392		{4, "Enrique", "Iglesias"},
2393		{5, "Shakira", "Ripoll"},
2394	} {
2395		muts = append(muts, Insert("Singers", columns, row))
2396	}
2397	if _, err := client.Apply(ctx, muts); err != nil {
2398		t.Fatal(err)
2399	}
2400	// Identifiers in PDML statements must be fully qualified.
2401	// TODO(jba): revisit the above.
2402	count, err := client.PartitionedUpdate(ctx, Statement{
2403		SQL: `UPDATE Singers SET Singers.FirstName = "changed" WHERE Singers.SingerId >= 1 AND Singers.SingerId <= @end`,
2404		Params: map[string]interface{}{
2405			"end": 3,
2406		},
2407	})
2408	if err != nil {
2409		t.Fatal(err)
2410	}
2411	if want := int64(3); count != want {
2412		t.Fatalf("got %d, want %d", count, want)
2413	}
2414	got, err := readAll(client.Single().Read(ctx, "Singers", AllKeys(), columns))
2415	if err != nil {
2416		t.Fatal(err)
2417	}
2418	want := [][]interface{}{
2419		{int64(1), "changed", "Kulthum"},
2420		{int64(2), "changed", "Khil"},
2421		{int64(3), "changed", "McDonald"},
2422		{int64(4), "Enrique", "Iglesias"},
2423		{int64(5), "Shakira", "Ripoll"},
2424	}
2425	if !testEqual(got, want) {
2426		t.Errorf("\ngot %v\nwant%v", got, want)
2427	}
2428}
2429
2430func TestBatchDML(t *testing.T) {
2431	t.Parallel()
2432
2433	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2434	defer cancel()
2435	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
2436	defer cleanup()
2437
2438	columns := []string{"SingerId", "FirstName", "LastName"}
2439
2440	// Populate the Singers table.
2441	var muts []*Mutation
2442	for _, row := range [][]interface{}{
2443		{1, "Umm", "Kulthum"},
2444		{2, "Eduard", "Khil"},
2445		{3, "Audra", "McDonald"},
2446	} {
2447		muts = append(muts, Insert("Singers", columns, row))
2448	}
2449	if _, err := client.Apply(ctx, muts); err != nil {
2450		t.Fatal(err)
2451	}
2452
2453	var counts []int64
2454	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
2455		counts, err = tx.BatchUpdate(ctx, []Statement{
2456			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`},
2457			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 2" WHERE Singers.SingerId = 2`},
2458			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`},
2459		})
2460		return err
2461	})
2462
2463	if err != nil {
2464		t.Fatal(err)
2465	}
2466	if want := []int64{1, 1, 1}; !testEqual(counts, want) {
2467		t.Fatalf("got %d, want %d", counts, want)
2468	}
2469	got, err := readAll(client.Single().Read(ctx, "Singers", AllKeys(), columns))
2470	if err != nil {
2471		t.Fatal(err)
2472	}
2473	want := [][]interface{}{
2474		{int64(1), "changed 1", "Kulthum"},
2475		{int64(2), "changed 2", "Khil"},
2476		{int64(3), "changed 3", "McDonald"},
2477	}
2478	if !testEqual(got, want) {
2479		t.Errorf("\ngot %v\nwant%v", got, want)
2480	}
2481}
2482
2483func TestBatchDML_NoStatements(t *testing.T) {
2484	t.Parallel()
2485
2486	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2487	defer cancel()
2488	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
2489	defer cleanup()
2490
2491	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
2492		_, err = tx.BatchUpdate(ctx, []Statement{})
2493		return err
2494	})
2495	if err == nil {
2496		t.Fatal("expected error, got nil")
2497	}
2498	if s, ok := status.FromError(err); ok {
2499		if s.Code() != codes.InvalidArgument {
2500			t.Fatalf("expected InvalidArgument, got %v", err)
2501		}
2502	} else {
2503		t.Fatalf("expected InvalidArgument, got %v", err)
2504	}
2505}
2506
2507func TestBatchDML_TwoStatements(t *testing.T) {
2508	t.Parallel()
2509
2510	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2511	defer cancel()
2512	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
2513	defer cleanup()
2514
2515	columns := []string{"SingerId", "FirstName", "LastName"}
2516
2517	// Populate the Singers table.
2518	var muts []*Mutation
2519	for _, row := range [][]interface{}{
2520		{1, "Umm", "Kulthum"},
2521		{2, "Eduard", "Khil"},
2522		{3, "Audra", "McDonald"},
2523	} {
2524		muts = append(muts, Insert("Singers", columns, row))
2525	}
2526	if _, err := client.Apply(ctx, muts); err != nil {
2527		t.Fatal(err)
2528	}
2529
2530	var updateCount int64
2531	var batchCounts []int64
2532	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
2533		batchCounts, err = tx.BatchUpdate(ctx, []Statement{
2534			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`},
2535			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 2" WHERE Singers.SingerId = 2`},
2536			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`},
2537		})
2538		if err != nil {
2539			return err
2540		}
2541
2542		updateCount, err = tx.Update(ctx, Statement{SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`})
2543		return err
2544	})
2545	if err != nil {
2546		t.Fatal(err)
2547	}
2548	if want := []int64{1, 1, 1}; !testEqual(batchCounts, want) {
2549		t.Fatalf("got %d, want %d", batchCounts, want)
2550	}
2551	if updateCount != 1 {
2552		t.Fatalf("got %v, want 1", updateCount)
2553	}
2554}
2555
2556// TODO(deklerk): this currently does not work because the transaction appears to
2557// get rolled back after a single statement fails. b/120158761
2558func TestBatchDML_Error(t *testing.T) {
2559	t.Parallel()
2560
2561	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2562	defer cancel()
2563	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
2564	defer cleanup()
2565
2566	columns := []string{"SingerId", "FirstName", "LastName"}
2567
2568	// Populate the Singers table.
2569	var muts []*Mutation
2570	for _, row := range [][]interface{}{
2571		{1, "Umm", "Kulthum"},
2572		{2, "Eduard", "Khil"},
2573		{3, "Audra", "McDonald"},
2574	} {
2575		muts = append(muts, Insert("Singers", columns, row))
2576	}
2577	if _, err := client.Apply(ctx, muts); err != nil {
2578		t.Fatal(err)
2579	}
2580
2581	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
2582		counts, err := tx.BatchUpdate(ctx, []Statement{
2583			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`},
2584			{SQL: `some illegal statement`},
2585			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`},
2586		})
2587		if err == nil {
2588			t.Fatal("expected err, got nil")
2589		}
2590		if want := []int64{1}; !testEqual(counts, want) {
2591			t.Fatalf("got %d, want %d", counts, want)
2592		}
2593
2594		got, err := readAll(tx.Read(ctx, "Singers", AllKeys(), columns))
2595		if err != nil {
2596			t.Fatal(err)
2597		}
2598		want := [][]interface{}{
2599			{int64(1), "changed 1", "Kulthum"},
2600			{int64(2), "Eduard", "Khil"},
2601			{int64(3), "Audra", "McDonald"},
2602		}
2603		if !testEqual(got, want) {
2604			t.Errorf("\ngot %v\nwant%v", got, want)
2605		}
2606
2607		return nil
2608	})
2609	if err != nil {
2610		t.Fatal(err)
2611	}
2612}
2613
2614// Prepare initializes Cloud Spanner testing DB and clients.
2615func prepareIntegrationTest(ctx context.Context, t *testing.T, spc SessionPoolConfig, statements []string) (*Client, string, func()) {
2616	if databaseAdmin == nil {
2617		t.Skip("Integration tests skipped")
2618	}
2619	// Construct a unique test DB name.
2620	dbName := dbNameSpace.New()
2621
2622	dbPath := fmt.Sprintf("projects/%v/instances/%v/databases/%v", testProjectID, testInstanceID, dbName)
2623	// Create database and tables.
2624	op, err := databaseAdmin.CreateDatabase(ctx, &adminpb.CreateDatabaseRequest{
2625		Parent:          fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID),
2626		CreateStatement: "CREATE DATABASE " + dbName,
2627		ExtraStatements: statements,
2628	})
2629	if err != nil {
2630		t.Fatalf("cannot create testing DB %v: %v", dbPath, err)
2631	}
2632	if _, err := op.Wait(ctx); err != nil {
2633		t.Fatalf("cannot create testing DB %v: %v", dbPath, err)
2634	}
2635	client, err := createClient(ctx, dbPath, spc)
2636	if err != nil {
2637		t.Fatalf("cannot create data client on DB %v: %v", dbPath, err)
2638	}
2639	return client, dbPath, func() {
2640		client.Close()
2641	}
2642}
2643
2644func cleanupInstances() {
2645	if instanceAdmin == nil {
2646		// Integration tests skipped.
2647		return
2648	}
2649
2650	ctx := context.Background()
2651	parent := fmt.Sprintf("projects/%v", testProjectID)
2652	iter := instanceAdmin.ListInstances(ctx, &instancepb.ListInstancesRequest{
2653		Parent: parent,
2654		Filter: "name:gotest-",
2655	})
2656	expireAge := 24 * time.Hour
2657
2658	for {
2659		inst, err := iter.Next()
2660		if err == iterator.Done {
2661			break
2662		}
2663		if err != nil {
2664			panic(err)
2665		}
2666		if instanceNameSpace.Older(inst.Name, expireAge) {
2667			log.Printf("Deleting instance %s", inst.Name)
2668
2669			if err := instanceAdmin.DeleteInstance(ctx, &instancepb.DeleteInstanceRequest{Name: inst.Name}); err != nil {
2670				log.Printf("failed to delete instance %s (error %v), might need a manual removal",
2671					inst.Name, err)
2672			}
2673		}
2674	}
2675}
2676
2677func rangeReads(ctx context.Context, t *testing.T, client *Client) {
2678	checkRange := func(ks KeySet, wantNums ...int) {
2679		if msg, ok := compareRows(client.Single().Read(ctx, testTable, ks, testTableColumns), wantNums); !ok {
2680			t.Errorf("key set %+v: %s", ks, msg)
2681		}
2682	}
2683
2684	checkRange(Key{"k1"}, 1)
2685	checkRange(KeyRange{Key{"k3"}, Key{"k5"}, ClosedOpen}, 3, 4)
2686	checkRange(KeyRange{Key{"k3"}, Key{"k5"}, ClosedClosed}, 3, 4, 5)
2687	checkRange(KeyRange{Key{"k3"}, Key{"k5"}, OpenClosed}, 4, 5)
2688	checkRange(KeyRange{Key{"k3"}, Key{"k5"}, OpenOpen}, 4)
2689
2690	// Partial key specification.
2691	checkRange(KeyRange{Key{"k7"}, Key{}, ClosedClosed}, 7, 8, 9)
2692	checkRange(KeyRange{Key{"k7"}, Key{}, OpenClosed}, 8, 9)
2693	checkRange(KeyRange{Key{}, Key{"k11"}, ClosedOpen}, 0, 1, 10)
2694	checkRange(KeyRange{Key{}, Key{"k11"}, ClosedClosed}, 0, 1, 10, 11)
2695
2696	// The following produce empty ranges.
2697	// TODO(jba): Consider a multi-part key to illustrate partial key behavior.
2698	// checkRange(KeyRange{Key{"k7"}, Key{}, ClosedOpen})
2699	// checkRange(KeyRange{Key{"k7"}, Key{}, OpenOpen})
2700	// checkRange(KeyRange{Key{}, Key{"k11"}, OpenOpen})
2701	// checkRange(KeyRange{Key{}, Key{"k11"}, OpenClosed})
2702
2703	// Prefix is component-wise, not string prefix.
2704	checkRange(Key{"k1"}.AsPrefix(), 1)
2705	checkRange(KeyRange{Key{"k1"}, Key{"k2"}, ClosedOpen}, 1, 10, 11, 12, 13, 14)
2706
2707	checkRange(AllKeys(), 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14)
2708}
2709
2710func indexRangeReads(ctx context.Context, t *testing.T, client *Client) {
2711	checkRange := func(ks KeySet, wantNums ...int) {
2712		if msg, ok := compareRows(client.Single().ReadUsingIndex(ctx, testTable, testTableIndex, ks, testTableColumns),
2713			wantNums); !ok {
2714			t.Errorf("key set %+v: %s", ks, msg)
2715		}
2716	}
2717
2718	checkRange(Key{"v1"}, 1)
2719	checkRange(KeyRange{Key{"v3"}, Key{"v5"}, ClosedOpen}, 3, 4)
2720	checkRange(KeyRange{Key{"v3"}, Key{"v5"}, ClosedClosed}, 3, 4, 5)
2721	checkRange(KeyRange{Key{"v3"}, Key{"v5"}, OpenClosed}, 4, 5)
2722	checkRange(KeyRange{Key{"v3"}, Key{"v5"}, OpenOpen}, 4)
2723
2724	// // Partial key specification.
2725	checkRange(KeyRange{Key{"v7"}, Key{}, ClosedClosed}, 7, 8, 9)
2726	checkRange(KeyRange{Key{"v7"}, Key{}, OpenClosed}, 8, 9)
2727	checkRange(KeyRange{Key{}, Key{"v11"}, ClosedOpen}, 0, 1, 10)
2728	checkRange(KeyRange{Key{}, Key{"v11"}, ClosedClosed}, 0, 1, 10, 11)
2729
2730	// // The following produce empty ranges.
2731	// checkRange(KeyRange{Key{"v7"}, Key{}, ClosedOpen})
2732	// checkRange(KeyRange{Key{"v7"}, Key{}, OpenOpen})
2733	// checkRange(KeyRange{Key{}, Key{"v11"}, OpenOpen})
2734	// checkRange(KeyRange{Key{}, Key{"v11"}, OpenClosed})
2735
2736	// // Prefix is component-wise, not string prefix.
2737	checkRange(Key{"v1"}.AsPrefix(), 1)
2738	checkRange(KeyRange{Key{"v1"}, Key{"v2"}, ClosedOpen}, 1, 10, 11, 12, 13, 14)
2739	checkRange(AllKeys(), 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14)
2740
2741	// Read from an index with DESC ordering.
2742	wantNums := []int{14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0}
2743	if msg, ok := compareRows(client.Single().ReadUsingIndex(ctx, testTable, "TestTableByValueDesc", AllKeys(), testTableColumns),
2744		wantNums); !ok {
2745		t.Errorf("desc: %s", msg)
2746	}
2747}
2748
// testTableRow is the destination struct used when decoding rows of the test
// table with Row.ToStruct; its fields mirror testTableColumns.
type testTableRow struct {
	// Key holds the value of the "Key" column.
	Key string
	// StringValue holds the value of the "StringValue" column.
	StringValue string
}
2750
2751func compareRows(iter *RowIterator, wantNums []int) (string, bool) {
2752	rows, err := readAllTestTable(iter)
2753	if err != nil {
2754		return err.Error(), false
2755	}
2756	want := map[string]string{}
2757	for _, n := range wantNums {
2758		want[fmt.Sprintf("k%d", n)] = fmt.Sprintf("v%d", n)
2759	}
2760	got := map[string]string{}
2761	for _, r := range rows {
2762		got[r.Key] = r.StringValue
2763	}
2764	if !testEqual(got, want) {
2765		return fmt.Sprintf("got %v, want %v", got, want), false
2766	}
2767	return "", true
2768}
2769
// isNaN reports whether x holds a float64 whose value is NaN. Any other
// dynamic type (including nil and float32) yields false.
func isNaN(x interface{}) bool {
	v, isFloat := x.(float64)
	return isFloat && math.IsNaN(v)
}
2777
2778// createClient creates Cloud Spanner data client.
2779func createClient(ctx context.Context, dbPath string, spc SessionPoolConfig) (client *Client, err error) {
2780	if os.Getenv("SPANNER_EMULATOR_HOST") == "" {
2781		client, err = NewClientWithConfig(ctx, dbPath, ClientConfig{
2782			SessionPoolConfig: spc,
2783		}, option.WithTokenSource(testutil.TokenSource(ctx, Scope, AdminScope)), option.WithEndpoint(endpoint))
2784	} else {
2785		// When the emulator is enabled, option.WithoutAuthentication()
2786		// will be added automatically which is incompatible with
2787		// option.WithTokenSource(testutil.TokenSource(ctx, Scope)).
2788		client, err = NewClientWithConfig(ctx, dbPath, ClientConfig{SessionPoolConfig: spc})
2789	}
2790
2791	if err != nil {
2792		return nil, fmt.Errorf("cannot create data client on DB %v: %v", dbPath, err)
2793	}
2794	return client, nil
2795}
2796
2797// populate prepares the database with some data.
2798func populate(ctx context.Context, client *Client) error {
2799	// Populate data
2800	var err error
2801	m := InsertMap("test", map[string]interface{}{
2802		"a": str1,
2803		"b": str2,
2804	})
2805	_, err = client.Apply(ctx, []*Mutation{m})
2806	return err
2807}
2808
2809func matchError(got error, wantCode codes.Code, wantMsgPart string) (string, bool) {
2810	if ErrCode(got) != wantCode || !strings.Contains(strings.ToLower(ErrDesc(got)), strings.ToLower(wantMsgPart)) {
2811		return fmt.Sprintf("got error <%v>\n"+`want <code = %q, "...%s...">`, got, wantCode, wantMsgPart), false
2812	}
2813	return "", true
2814}
2815
2816func rowToValues(r *Row) ([]interface{}, error) {
2817	var x int64
2818	var y, z string
2819	if err := r.Column(0, &x); err != nil {
2820		return nil, err
2821	}
2822	if err := r.Column(1, &y); err != nil {
2823		return nil, err
2824	}
2825	if err := r.Column(2, &z); err != nil {
2826		return nil, err
2827	}
2828	return []interface{}{x, y, z}, nil
2829}
2830
2831func readAll(iter *RowIterator) ([][]interface{}, error) {
2832	defer iter.Stop()
2833	var vals [][]interface{}
2834	for {
2835		row, err := iter.Next()
2836		if err == iterator.Done {
2837			return vals, nil
2838		}
2839		if err != nil {
2840			return nil, err
2841		}
2842		v, err := rowToValues(row)
2843		if err != nil {
2844			return nil, err
2845		}
2846		vals = append(vals, v)
2847	}
2848}
2849
2850func readAllTestTable(iter *RowIterator) ([]testTableRow, error) {
2851	defer iter.Stop()
2852	var vals []testTableRow
2853	for {
2854		row, err := iter.Next()
2855		if err == iterator.Done {
2856			return vals, nil
2857		}
2858		if err != nil {
2859			return nil, err
2860		}
2861		var ttr testTableRow
2862		if err := row.ToStruct(&ttr); err != nil {
2863			return nil, err
2864		}
2865		vals = append(vals, ttr)
2866	}
2867}
2868
// maxDuration returns the longer of the two durations a and b.
func maxDuration(a, b time.Duration) time.Duration {
	if a <= b {
		return b
	}
	return a
}
2875
// skipEmulatorTest skips the calling test when the SPANNER_EMULATOR_HOST
// environment variable is set to a non-empty value, i.e. when the tests are
// configured to run against the emulator.
func skipEmulatorTest(t *testing.T) {
	if host := os.Getenv("SPANNER_EMULATOR_HOST"); host == "" {
		return
	}
	t.Skip("Skipping testing against the emulator.")
}
2881