17package spanner
19import (
20	"context"
21	"errors"
22	"flag"
23	"fmt"
24	"log"
25	"math"
26	"math/big"
27	"os"
28	"os/exec"
29	"reflect"
30	"regexp"
31	"strings"
32	"sync"
33	"testing"
34	"time"
36	"cloud.google.com/go/civil"
37	"cloud.google.com/go/internal/testutil"
38	"cloud.google.com/go/internal/uid"
39	database "cloud.google.com/go/spanner/admin/database/apiv1"
40	instance "cloud.google.com/go/spanner/admin/instance/apiv1"
41	"google.golang.org/api/iterator"
42	"google.golang.org/api/option"
43	adminpb "google.golang.org/genproto/googleapis/spanner/admin/database/v1"
44	instancepb "google.golang.org/genproto/googleapis/spanner/admin/instance/v1"
45	sppb "google.golang.org/genproto/googleapis/spanner/v1"
46	"google.golang.org/grpc"
47	"google.golang.org/grpc/codes"
48	"google.golang.org/grpc/peer"
49	"google.golang.org/grpc/status"
// Address prefixes associated with DirectPath endpoints, one per IP family.
// NOTE(review): presumably matched against the peer's remote address by the
// DirectPath verification helper elsewhere in this file - confirm.
const directPathIPV6Prefix = "[2001:4860:8040"
const directPathIPV4Prefix = "34.126"
57var (
58	// testProjectID specifies the project used for testing. It can be changed
59	// by setting environment variable GCLOUD_TESTS_GOLANG_PROJECT_ID.
60	testProjectID = testutil.ProjID()
62	// spannerHost specifies the spanner API host used for testing. It can be changed
63	// by setting the environment variable GCLOUD_TESTS_GOLANG_SPANNER_HOST
64	spannerHost = getSpannerHost()
66	dbNameSpace       = uid.NewSpace("gotest", &uid.Options{Sep: '_', Short: true})
67	instanceNameSpace = uid.NewSpace("gotest", &uid.Options{Sep: '-', Short: true})
68	backupIDSpace     = uid.NewSpace("gotest", &uid.Options{Sep: '_', Short: true})
69	testInstanceID    = instanceNameSpace.New()
71	testTable        = "TestTable"
72	testTableIndex   = "TestTableByValue"
73	testTableColumns = []string{"Key", "StringValue"}
75	databaseAdmin *database.DatabaseAdminClient
76	instanceAdmin *instance.InstanceAdminClient
78	dpConfig directPathTestConfig
79	peerInfo *peer.Peer
81	singerDBStatements = []string{
82		`CREATE TABLE Singers (
83				SingerId	INT64 NOT NULL,
84				FirstName	STRING(1024),
85				LastName	STRING(1024),
86				SingerInfo	BYTES(MAX)
87			) PRIMARY KEY (SingerId)`,
88		`CREATE INDEX SingerByName ON Singers(FirstName, LastName)`,
89		`CREATE TABLE Accounts (
90				AccountId	INT64 NOT NULL,
91				Nickname	STRING(100),
92				Balance		INT64 NOT NULL,
93			) PRIMARY KEY (AccountId)`,
94		`CREATE INDEX AccountByNickname ON Accounts(Nickname) STORING (Balance)`,
95		`CREATE TABLE Types (
96				RowID		INT64 NOT NULL,
97				String		STRING(MAX),
98				StringArray	ARRAY<STRING(MAX)>,
99				Bytes		BYTES(MAX),
100				BytesArray	ARRAY<BYTES(MAX)>,
101				Int64a		INT64,
102				Int64Array	ARRAY<INT64>,
103				Bool		BOOL,
104				BoolArray	ARRAY<BOOL>,
105				Float64		FLOAT64,
106				Float64Array	ARRAY<FLOAT64>,
107				Date		DATE,
108				DateArray	ARRAY<DATE>,
109				Timestamp	TIMESTAMP,
110				TimestampArray	ARRAY<TIMESTAMP>,
111			) PRIMARY KEY (RowID)`,
112	}
114	readDBStatements = []string{
115		`CREATE TABLE TestTable (
116                    Key          STRING(MAX) NOT NULL,
117                    StringValue  STRING(MAX)
118            ) PRIMARY KEY (Key)`,
119		`CREATE INDEX TestTableByValue ON TestTable(StringValue)`,
120		`CREATE INDEX TestTableByValueDesc ON TestTable(StringValue DESC)`,
121	}
123	simpleDBStatements = []string{
124		`CREATE TABLE test (
125				a	STRING(1024),
126				b	STRING(1024),
127			) PRIMARY KEY (a)`,
128	}
129	simpleDBTableColumns = []string{"a", "b"}
131	ctsDBStatements = []string{
132		`CREATE TABLE TestTable (
133		    Key  STRING(MAX) NOT NULL,
134		    Ts   TIMESTAMP OPTIONS (allow_commit_timestamp = true),
135	    ) PRIMARY KEY (Key)`,
136	}
137	backuDBStatements = []string{
138		`CREATE TABLE Singers (
139						SingerId	INT64 NOT NULL,
140						FirstName	STRING(1024),
141						LastName	STRING(1024),
142						SingerInfo	BYTES(MAX)
143					) PRIMARY KEY (SingerId)`,
144		`CREATE INDEX SingerByName ON Singers(FirstName, LastName)`,
145		`CREATE TABLE Accounts (
146						AccountId	INT64 NOT NULL,
147						Nickname	STRING(100),
148						Balance		INT64 NOT NULL,
149					) PRIMARY KEY (AccountId)`,
150		`CREATE INDEX AccountByNickname ON Accounts(Nickname) STORING (Balance)`,
151		`CREATE TABLE Types (
152						RowID		INT64 NOT NULL,
153						String		STRING(MAX),
154						StringArray	ARRAY<STRING(MAX)>,
155						Bytes		BYTES(MAX),
156						BytesArray	ARRAY<BYTES(MAX)>,
157						Int64a		INT64,
158						Int64Array	ARRAY<INT64>,
159						Bool		BOOL,
160						BoolArray	ARRAY<BOOL>,
161						Float64		FLOAT64,
162						Float64Array	ARRAY<FLOAT64>,
163						Date		DATE,
164						DateArray	ARRAY<DATE>,
165						Timestamp	TIMESTAMP,
166						TimestampArray	ARRAY<TIMESTAMP>,
167					) PRIMARY KEY (RowID)`,
168	}
170	validInstancePattern = regexp.MustCompile("^projects/(?P<project>[^/]+)/instances/(?P<instance>[^/]+)$")
172	blackholeDpv6Cmd string
173	blackholeDpv4Cmd string
174	allowDpv6Cmd     string
175	allowDpv4Cmd     string
178func init() {
179	flag.BoolVar(&dpConfig.attemptDirectPath, "it.attempt-directpath", false, "DirectPath integration test flag")
180	flag.BoolVar(&dpConfig.directPathIPv4Only, "it.directpath-ipv4-only", false, "Run DirectPath on a IPv4-only VM")
181	peerInfo = &peer.Peer{}
182	// Use sysctl or iptables to blackhole DirectPath IP for fallback tests.
183	flag.StringVar(&blackholeDpv6Cmd, "it.blackhole-dpv6-cmd", "", "Command to make LB and backend addresses blackholed over dpv6")
184	flag.StringVar(&blackholeDpv4Cmd, "it.blackhole-dpv4-cmd", "", "Command to make LB and backend addresses blackholed over dpv4")
185	flag.StringVar(&allowDpv6Cmd, "it.allow-dpv6-cmd", "", "Command to make LB and backend addresses allowed over dpv6")
186	flag.StringVar(&allowDpv4Cmd, "it.allow-dpv4-cmd", "", "Command to make LB and backend addresses allowed over dpv4")
// directPathTestConfig carries the two DirectPath switches that init binds to
// the it.attempt-directpath and it.directpath-ipv4-only flags, in that order.
type directPathTestConfig struct {
	attemptDirectPath, directPathIPv4Only bool
}
194func parseInstanceName(inst string) (project, instance string, err error) {
195	matches := validInstancePattern.FindStringSubmatch(inst)
196	if len(matches) == 0 {
197		return "", "", fmt.Errorf("Failed to parse instance name from %q according to pattern %q",
198			inst, validInstancePattern.String())
199	}
200	return matches[1], matches[2], nil
203func getSpannerHost() string {
// Fixed sample strings.
// NOTE(review): presumably shared fixture values for tests later in this
// file - confirm against their uses.
const str1 = "alice"
const str2 = "a@example.com"
212func TestMain(m *testing.M) {
213	cleanup := initIntegrationTests()
214	res := m.Run()
215	cleanup()
216	os.Exit(res)
219var grpcHeaderChecker = testutil.DefaultHeadersEnforcer()
221func initIntegrationTests() (cleanup func()) {
222	ctx := context.Background()
223	flag.Parse() // Needed for testing.Short().
224	noop := func() {}
226	if testing.Short() {
227		log.Println("Integration tests skipped in -short mode.")
228		return noop
229	}
231	if testProjectID == "" {
232		log.Println("Integration tests skipped: GCLOUD_TESTS_GOLANG_PROJECT_ID is missing")
233		return noop
234	}
236	opts := grpcHeaderChecker.CallOptions()
237	if spannerHost != "" {
238		opts = append(opts, option.WithEndpoint(spannerHost))
239	}
240	if dpConfig.attemptDirectPath {
241		opts = append(opts, option.WithGRPCDialOption(grpc.WithDefaultCallOptions(grpc.Peer(peerInfo))))
242	}
243	var err error
244	// Create InstanceAdmin and DatabaseAdmin clients.
245	instanceAdmin, err = instance.NewInstanceAdminClient(ctx, opts...)
246	if err != nil {
247		log.Fatalf("cannot create instance databaseAdmin client: %v", err)
248	}
249	databaseAdmin, err = database.NewDatabaseAdminClient(ctx, opts...)
250	if err != nil {
251		log.Fatalf("cannot create databaseAdmin client: %v", err)
252	}
253	// Get the list of supported instance configs for the project that is used
254	// for the integration tests. The supported instance configs can differ per
255	// project. The integration tests will use the first instance config that
256	// is returned by Cloud Spanner. This will normally be the regional config
257	// that is physically the closest to where the request is coming from.
258	configIterator := instanceAdmin.ListInstanceConfigs(ctx, &instancepb.ListInstanceConfigsRequest{
259		Parent: fmt.Sprintf("projects/%s", testProjectID),
260	})
261	config, err := configIterator.Next()
262	if err != nil {
263		log.Fatalf("Cannot get any instance configurations.\nPlease make sure the Cloud Spanner API is enabled for the test project.\nGet error: %v", err)
264	}
266	// First clean up any old test instances before we start the actual testing
267	// as these might cause this test run to fail.
268	cleanupInstances()
270	// Create a test instance to use for this test run.
271	op, err := instanceAdmin.CreateInstance(ctx, &instancepb.CreateInstanceRequest{
272		Parent:     fmt.Sprintf("projects/%s", testProjectID),
273		InstanceId: testInstanceID,
274		Instance: &instancepb.Instance{
275			Config:      config.Name,
276			DisplayName: testInstanceID,
277			NodeCount:   1,
278		},
279	})
280	if err != nil {
281		log.Fatalf("could not create instance with id %s: %v", fmt.Sprintf("projects/%s/instances/%s", testProjectID, testInstanceID), err)
282	}
283	// Wait for the instance creation to finish.
284	i, err := op.Wait(ctx)
285	if err != nil {
286		log.Fatalf("waiting for instance creation to finish failed: %v", err)
287	}
288	if i.State != instancepb.Instance_READY {
289		log.Printf("instance state is not READY, it might be that the test instance will cause problems during tests. Got state %v\n", i.State)
290	}
292	return func() {
293		// Delete this test instance.
294		instanceName := fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID)
295		deleteInstanceAndBackups(ctx, instanceName)
296		// Delete other test instances that may be lingering around.
297		cleanupInstances()
298		databaseAdmin.Close()
299		instanceAdmin.Close()
300	}
303func TestIntegration_InitSessionPool(t *testing.T) {
304	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
305	defer cancel()
306	// Set up an empty testing environment.
307	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, []string{})
308	defer cleanup()
309	sp := client.idleSessions
310	sp.mu.Lock()
311	want := sp.MinOpened
312	sp.mu.Unlock()
313	var numOpened int
315	for {
316		select {
317		case <-ctx.Done():
318			t.Fatalf("timed out, got %d session(s), want %d", numOpened, want)
319		default:
320			sp.mu.Lock()
321			numOpened = sp.idleList.Len() + sp.idleWriteList.Len()
322			sp.mu.Unlock()
323			if uint64(numOpened) == want {
324				break loop
325			}
326		}
327	}
328	// Delete all sessions in the pool on the backend and then try to execute a
329	// simple query. The 'Session not found' error should cause an automatic
330	// retry of the read-only transaction.
331	sp.mu.Lock()
332	s := sp.idleList.Front()
333	for {
334		if s == nil {
335			break
336		}
337		// This will delete the session on the backend without removing it
338		// from the pool.
339		s.Value.(*session).delete(context.Background())
340		s = s.Next()
341	}
342	sp.mu.Unlock()
343	sql := "SELECT 1, 'FOO', 'BAR'"
344	tx := client.ReadOnlyTransaction()
345	defer tx.Close()
346	iter := tx.Query(context.Background(), NewStatement(sql))
347	rows, err := readAll(iter)
348	if err != nil {
349		t.Fatalf("Unexpected error for query %q: %v", sql, err)
350	}
351	if got, want := len(rows), 1; got != want {
352		t.Fatalf("Row count mismatch for query %q\nGot: %v\nWant: %v", sql, got, want)
353	}
354	if got, want := len(rows[0]), 3; got != want {
355		t.Fatalf("Column count mismatch for query %q\nGot: %v\nWant: %v", sql, got, want)
356	}
357	if got, want := rows[0][0].(int64), int64(1); got != want {
358		t.Fatalf("Column value mismatch for query %q\nGot: %v\nWant: %v", sql, got, want)
359	}
362// Test SingleUse transaction.
363func TestIntegration_SingleUse(t *testing.T) {
364	t.Parallel()
366	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
367	defer cancel()
368	// Set up testing environment.
369	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
370	defer cleanup()
372	writes := []struct {
373		row []interface{}
374		ts  time.Time
375	}{
376		{row: []interface{}{1, "Marc", "Foo"}},
377		{row: []interface{}{2, "Tars", "Bar"}},
378		{row: []interface{}{3, "Alpha", "Beta"}},
379		{row: []interface{}{4, "Last", "End"}},
380	}
381	// Try to write four rows through the Apply API.
382	for i, w := range writes {
383		var err error
384		m := InsertOrUpdate("Singers",
385			[]string{"SingerId", "FirstName", "LastName"},
386			w.row)
387		if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
388			t.Fatal(err)
389		}
390		verifyDirectPathRemoteAddress(t)
391	}
392	// Calculate time difference between Cloud Spanner server and localhost to
393	// use to determine the exact staleness value to use.
394	timeDiff := maxDuration(time.Now().Sub(writes[0].ts), 0)
396	// Test reading rows with different timestamp bounds.
397	for i, test := range []struct {
398		name    string
399		want    [][]interface{}
400		tb      TimestampBound
401		checkTs func(time.Time) error
402	}{
403		{
404			name: "strong",
405			want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}},
406			tb:   StrongRead(),
407			checkTs: func(ts time.Time) error {
408				// writes[3] is the last write, all subsequent strong read
409				// should have a timestamp larger than that.
410				if ts.Before(writes[3].ts) {
411					return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[3].ts)
412				}
413				return nil
414			},
415		},
416		{
417			name: "min_read_timestamp",
418			want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}},
419			tb:   MinReadTimestamp(writes[3].ts),
420			checkTs: func(ts time.Time) error {
421				if ts.Before(writes[3].ts) {
422					return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[3].ts)
423				}
424				return nil
425			},
426		},
427		{
428			name: "max_staleness",
429			want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}},
430			tb:   MaxStaleness(time.Second),
431			checkTs: func(ts time.Time) error {
432				if ts.Before(writes[3].ts) {
433					return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[3].ts)
434				}
435				return nil
436			},
437		},
438		{
439			name: "read_timestamp",
440			want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}},
441			tb:   ReadTimestamp(writes[2].ts),
442			checkTs: func(ts time.Time) error {
443				if ts != writes[2].ts {
444					return fmt.Errorf("read got timestamp %v, want %v", ts, writes[2].ts)
445				}
446				return nil
447			},
448		},
449		{
450			name: "exact_staleness",
451			want: nil,
452			// Specify a staleness which should be already before this test.
453			tb: ExactStaleness(time.Now().Sub(writes[0].ts) + timeDiff + 30*time.Second),
454			checkTs: func(ts time.Time) error {
455				if !ts.Before(writes[0].ts) {
456					return fmt.Errorf("read got timestamp %v, want it to be earlier than %v", ts, writes[0].ts)
457				}
458				return nil
459			},
460		},
461	} {
462		t.Run(test.name, func(t *testing.T) {
463			// SingleUse.Query
464			su := client.Single().WithTimestampBound(test.tb)
465			got, err := readAll(su.Query(
466				ctx,
467				Statement{
468					"SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId IN (@id1, @id3, @id4) ORDER BY SingerId",
469					map[string]interface{}{"id1": int64(1), "id3": int64(3), "id4": int64(4)},
470				}))
471			if err != nil {
472				t.Fatalf("%d: SingleUse.Query returns error %v, want nil", i, err)
473			}
474			verifyDirectPathRemoteAddress(t)
475			rts, err := su.Timestamp()
476			if err != nil {
477				t.Fatalf("%d: SingleUse.Query doesn't return a timestamp, error: %v", i, err)
478			}
479			if err := test.checkTs(rts); err != nil {
480				t.Fatalf("%d: SingleUse.Query doesn't return expected timestamp: %v", i, err)
481			}
482			if !testEqual(got, test.want) {
483				t.Fatalf("%d: got unexpected result from SingleUse.Query: %v, want %v", i, got, test.want)
484			}
485			// SingleUse.Read
486			su = client.Single().WithTimestampBound(test.tb)
487			got, err = readAll(su.Read(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}), []string{"SingerId", "FirstName", "LastName"}))
488			if err != nil {
489				t.Fatalf("%d: SingleUse.Read returns error %v, want nil", i, err)
490			}
491			verifyDirectPathRemoteAddress(t)
492			rts, err = su.Timestamp()
493			if err != nil {
494				t.Fatalf("%d: SingleUse.Read doesn't return a timestamp, error: %v", i, err)
495			}
496			if err := test.checkTs(rts); err != nil {
497				t.Fatalf("%d: SingleUse.Read doesn't return expected timestamp: %v", i, err)
498			}
499			if !testEqual(got, test.want) {
500				t.Fatalf("%d: got unexpected result from SingleUse.Read: %v, want %v", i, got, test.want)
501			}
502			// SingleUse.ReadRow
503			got = nil
504			for _, k := range []Key{{1}, {3}, {4}} {
505				su = client.Single().WithTimestampBound(test.tb)
506				r, err := su.ReadRow(ctx, "Singers", k, []string{"SingerId", "FirstName", "LastName"})
507				if err != nil {
508					continue
509				}
510				verifyDirectPathRemoteAddress(t)
511				v, err := rowToValues(r)
512				if err != nil {
513					continue
514				}
515				got = append(got, v)
516				rts, err = su.Timestamp()
517				if err != nil {
518					t.Fatalf("%d: SingleUse.ReadRow(%v) doesn't return a timestamp, error: %v", i, k, err)
519				}
520				if err := test.checkTs(rts); err != nil {
521					t.Fatalf("%d: SingleUse.ReadRow(%v) doesn't return expected timestamp: %v", i, k, err)
522				}
523			}
524			if !testEqual(got, test.want) {
525				t.Fatalf("%d: got unexpected results from SingleUse.ReadRow: %v, want %v", i, got, test.want)
526			}
527			// SingleUse.ReadUsingIndex
528			su = client.Single().WithTimestampBound(test.tb)
529			got, err = readAll(su.ReadUsingIndex(ctx, "Singers", "SingerByName", KeySets(Key{"Marc", "Foo"}, Key{"Alpha", "Beta"}, Key{"Last", "End"}), []string{"SingerId", "FirstName", "LastName"}))
530			if err != nil {
531				t.Fatalf("%d: SingleUse.ReadUsingIndex returns error %v, want nil", i, err)
532			}
533			verifyDirectPathRemoteAddress(t)
534			// The results from ReadUsingIndex is sorted by the index rather than primary key.
535			if len(got) != len(test.want) {
536				t.Fatalf("%d: got unexpected result from SingleUse.ReadUsingIndex: %v, want %v", i, got, test.want)
537			}
538			for j, g := range got {
539				if j > 0 {
540					prev := got[j-1][1].(string) + got[j-1][2].(string)
541					curr := got[j][1].(string) + got[j][2].(string)
542					if strings.Compare(prev, curr) > 0 {
543						t.Fatalf("%d: SingleUse.ReadUsingIndex fails to order rows by index keys, %v should be after %v", i, got[j-1], got[j])
544					}
545				}
546				found := false
547				for _, w := range test.want {
548					if testEqual(g, w) {
549						found = true
550					}
551				}
552				if !found {
553					t.Fatalf("%d: got unexpected result from SingleUse.ReadUsingIndex: %v, want %v", i, got, test.want)
554					break
555				}
556			}
557			rts, err = su.Timestamp()
558			if err != nil {
559				t.Fatalf("%d: SingleUse.ReadUsingIndex doesn't return a timestamp, error: %v", i, err)
560			}
561			if err := test.checkTs(rts); err != nil {
562				t.Fatalf("%d: SingleUse.ReadUsingIndex doesn't return expected timestamp: %v", i, err)
563			}
564			// SingleUse.ReadRowUsingIndex
565			got = nil
566			for _, k := range []Key{{"Marc", "Foo"}, {"Alpha", "Beta"}, {"Last", "End"}} {
567				su = client.Single().WithTimestampBound(test.tb)
568				r, err := su.ReadRowUsingIndex(ctx, "Singers", "SingerByName", k, []string{"SingerId", "FirstName", "LastName"})
569				if err != nil {
570					continue
571				}
572				verifyDirectPathRemoteAddress(t)
573				v, err := rowToValues(r)
574				if err != nil {
575					continue
576				}
577				got = append(got, v)
578				rts, err = su.Timestamp()
579				if err != nil {
580					t.Fatalf("%d: SingleUse.ReadRowUsingIndex(%v) doesn't return a timestamp, error: %v", i, k, err)
581				}
582				if err := test.checkTs(rts); err != nil {
583					t.Fatalf("%d: SingleUse.ReadRowUsingIndex(%v) doesn't return expected timestamp: %v", i, k, err)
584				}
585			}
586			if !testEqual(got, test.want) {
587				t.Fatalf("%d: got unexpected results from SingleUse.ReadRowUsingIndex: %v, want %v", i, got, test.want)
588			}
589		})
590	}
593// Test custom query options provided on query-level configuration.
594func TestIntegration_SingleUse_WithQueryOptions(t *testing.T) {
595	skipEmulatorTest(t)
596	t.Parallel()
598	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
599	defer cancel()
600	// Set up testing environment.
601	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
602	defer cleanup()
604	writes := []struct {
605		row []interface{}
606		ts  time.Time
607	}{
608		{row: []interface{}{1, "Marc", "Foo"}},
609		{row: []interface{}{2, "Tars", "Bar"}},
610		{row: []interface{}{3, "Alpha", "Beta"}},
611		{row: []interface{}{4, "Last", "End"}},
612	}
613	// Try to write four rows through the Apply API.
614	for i, w := range writes {
615		var err error
616		m := InsertOrUpdate("Singers",
617			[]string{"SingerId", "FirstName", "LastName"},
618			w.row)
619		if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
620			t.Fatal(err)
621		}
622	}
623	qo := QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1"}}
624	got, err := readAll(client.Single().QueryWithOptions(ctx, Statement{
625		"SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId IN (@id1, @id3, @id4)",
626		map[string]interface{}{"id1": int64(1), "id3": int64(3), "id4": int64(4)},
627	}, qo))
629	if err != nil {
630		t.Errorf("ReadOnlyTransaction.QueryWithOptions returns error %v, want nil", err)
631	}
633	want := [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}}
634	if !testEqual(got, want) {
635		t.Errorf("got unexpected result from ReadOnlyTransaction.QueryWithOptions: %v, want %v", got, want)
636	}
639func TestIntegration_SingleUse_ReadingWithLimit(t *testing.T) {
640	t.Parallel()
642	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
643	defer cancel()
644	// Set up testing environment.
645	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
646	defer cleanup()
648	writes := []struct {
649		row []interface{}
650		ts  time.Time
651	}{
652		{row: []interface{}{1, "Marc", "Foo"}},
653		{row: []interface{}{2, "Tars", "Bar"}},
654		{row: []interface{}{3, "Alpha", "Beta"}},
655		{row: []interface{}{4, "Last", "End"}},
656	}
657	// Try to write four rows through the Apply API.
658	for i, w := range writes {
659		var err error
660		m := InsertOrUpdate("Singers",
661			[]string{"SingerId", "FirstName", "LastName"},
662			w.row)
663		if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
664			t.Fatal(err)
665		}
666	}
668	su := client.Single()
669	const limit = 1
670	gotRows, err := readAll(su.ReadWithOptions(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}),
671		[]string{"SingerId", "FirstName", "LastName"}, &ReadOptions{Limit: limit}))
672	if err != nil {
673		t.Errorf("SingleUse.ReadWithOptions returns error %v, want nil", err)
674	}
675	if got, want := len(gotRows), limit; got != want {
676		t.Errorf("got %d, want %d", got, want)
677	}
680// Test ReadOnlyTransaction. The testsuite is mostly like SingleUse, except it
681// also tests for a single timestamp across multiple reads.
682func TestIntegration_ReadOnlyTransaction(t *testing.T) {
683	t.Parallel()
685	ctxTimeout := 5 * time.Minute
686	ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
687	defer cancel()
688	// Set up testing environment.
689	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
690	defer cleanup()
692	writes := []struct {
693		row []interface{}
694		ts  time.Time
695	}{
696		{row: []interface{}{1, "Marc", "Foo"}},
697		{row: []interface{}{2, "Tars", "Bar"}},
698		{row: []interface{}{3, "Alpha", "Beta"}},
699		{row: []interface{}{4, "Last", "End"}},
700	}
701	// Try to write four rows through the Apply API.
702	for i, w := range writes {
703		var err error
704		m := InsertOrUpdate("Singers",
705			[]string{"SingerId", "FirstName", "LastName"},
706			w.row)
707		if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
708			t.Fatal(err)
709		}
710	}
712	// For testing timestamp bound staleness.
713	<-time.After(time.Second)
715	// Test reading rows with different timestamp bounds.
716	for i, test := range []struct {
717		want    [][]interface{}
718		tb      TimestampBound
719		checkTs func(time.Time) error
720	}{
721		// Note: min_read_timestamp and max_staleness are not supported by
722		// ReadOnlyTransaction. See API document for more details.
723		{
724			// strong
725			[][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}},
726			StrongRead(),
727			func(ts time.Time) error {
728				if ts.Before(writes[3].ts) {
729					return fmt.Errorf("read got timestamp %v, want it to be no later than %v", ts, writes[3].ts)
730				}
731				return nil
732			},
733		},
734		{
735			// read_timestamp
736			[][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}},
737			ReadTimestamp(writes[2].ts),
738			func(ts time.Time) error {
739				if ts != writes[2].ts {
740					return fmt.Errorf("read got timestamp %v, expect %v", ts, writes[2].ts)
741				}
742				return nil
743			},
744		},
745		{
746			// exact_staleness
747			nil,
748			// Specify a staleness which should be already before this test.
749			ExactStaleness(ctxTimeout + 1),
750			func(ts time.Time) error {
751				if ts.After(writes[0].ts) {
752					return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[0].ts)
753				}
754				return nil
755			},
756		},
757	} {
758		// ReadOnlyTransaction.Query
759		ro := client.ReadOnlyTransaction().WithTimestampBound(test.tb)
760		got, err := readAll(ro.Query(
761			ctx,
762			Statement{
763				"SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId IN (@id1, @id3, @id4) ORDER BY SingerId",
764				map[string]interface{}{"id1": int64(1), "id3": int64(3), "id4": int64(4)},
765			}))
766		if err != nil {
767			t.Errorf("%d: ReadOnlyTransaction.Query returns error %v, want nil", i, err)
768		}
769		if !testEqual(got, test.want) {
770			t.Errorf("%d: got unexpected result from ReadOnlyTransaction.Query: %v, want %v", i, got, test.want)
771		}
772		rts, err := ro.Timestamp()
773		if err != nil {
774			t.Errorf("%d: ReadOnlyTransaction.Query doesn't return a timestamp, error: %v", i, err)
775		}
776		if err := test.checkTs(rts); err != nil {
777			t.Errorf("%d: ReadOnlyTransaction.Query doesn't return expected timestamp: %v", i, err)
778		}
779		roTs := rts
780		// ReadOnlyTransaction.Read
781		got, err = readAll(ro.Read(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}), []string{"SingerId", "FirstName", "LastName"}))
782		if err != nil {
783			t.Errorf("%d: ReadOnlyTransaction.Read returns error %v, want nil", i, err)
784		}
785		if !testEqual(got, test.want) {
786			t.Errorf("%d: got unexpected result from ReadOnlyTransaction.Read: %v, want %v", i, got, test.want)
787		}
788		rts, err = ro.Timestamp()
789		if err != nil {
790			t.Errorf("%d: ReadOnlyTransaction.Read doesn't return a timestamp, error: %v", i, err)
791		}
792		if err := test.checkTs(rts); err != nil {
793			t.Errorf("%d: ReadOnlyTransaction.Read doesn't return expected timestamp: %v", i, err)
794		}
795		if roTs != rts {
796			t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts)
797		}
798		// ReadOnlyTransaction.ReadRow
799		got = nil
800		for _, k := range []Key{{1}, {3}, {4}} {
801			r, err := ro.ReadRow(ctx, "Singers", k, []string{"SingerId", "FirstName", "LastName"})
802			if err != nil {
803				continue
804			}
805			v, err := rowToValues(r)
806			if err != nil {
807				continue
808			}
809			got = append(got, v)
810			rts, err = ro.Timestamp()
811			if err != nil {
812				t.Errorf("%d: ReadOnlyTransaction.ReadRow(%v) doesn't return a timestamp, error: %v", i, k, err)
813			}
814			if err := test.checkTs(rts); err != nil {
815				t.Errorf("%d: ReadOnlyTransaction.ReadRow(%v) doesn't return expected timestamp: %v", i, k, err)
816			}
817			if roTs != rts {
818				t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts)
819			}
820		}
821		if !testEqual(got, test.want) {
822			t.Errorf("%d: got unexpected results from ReadOnlyTransaction.ReadRow: %v, want %v", i, got, test.want)
823		}
824		// SingleUse.ReadUsingIndex
825		got, err = readAll(ro.ReadUsingIndex(ctx, "Singers", "SingerByName", KeySets(Key{"Marc", "Foo"}, Key{"Alpha", "Beta"}, Key{"Last", "End"}), []string{"SingerId", "FirstName", "LastName"}))
826		if err != nil {
827			t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex returns error %v, want nil", i, err)
828		}
829		// The results from ReadUsingIndex is sorted by the index rather than
830		// primary key.
831		if len(got) != len(test.want) {
832			t.Errorf("%d: got unexpected result from ReadOnlyTransaction.ReadUsingIndex: %v, want %v", i, got, test.want)
833		}
834		for j, g := range got {
835			if j > 0 {
836				prev := got[j-1][1].(string) + got[j-1][2].(string)
837				curr := got[j][1].(string) + got[j][2].(string)
838				if strings.Compare(prev, curr) > 0 {
839					t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex fails to order rows by index keys, %v should be after %v", i, got[j-1], got[j])
840				}
841			}
842			found := false
843			for _, w := range test.want {
844				if testEqual(g, w) {
845					found = true
846				}
847			}
848			if !found {
849				t.Errorf("%d: got unexpected result from ReadOnlyTransaction.ReadUsingIndex: %v, want %v", i, got, test.want)
850				break
851			}
852		}
853		rts, err = ro.Timestamp()
854		if err != nil {
855			t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex doesn't return a timestamp, error: %v", i, err)
856		}
857		if err := test.checkTs(rts); err != nil {
858			t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex doesn't return expected timestamp: %v", i, err)
859		}
860		if roTs != rts {
861			t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts)
862		}
863		// ReadOnlyTransaction.ReadRowUsingIndex
864		got = nil
865		for _, k := range []Key{{"Marc", "Foo"}, {"Alpha", "Beta"}, {"Last", "End"}} {
866			r, err := ro.ReadRowUsingIndex(ctx, "Singers", "SingerByName", k, []string{"SingerId", "FirstName", "LastName"})
867			if err != nil {
868				continue
869			}
870			v, err := rowToValues(r)
871			if err != nil {
872				continue
873			}
874			got = append(got, v)
875			rts, err = ro.Timestamp()
876			if err != nil {
877				t.Errorf("%d: ReadOnlyTransaction.ReadRowUsingIndex(%v) doesn't return a timestamp, error: %v", i, k, err)
878			}
879			if err := test.checkTs(rts); err != nil {
880				t.Errorf("%d: ReadOnlyTransaction.ReadRowUsingIndex(%v) doesn't return expected timestamp: %v", i, k, err)
881			}
882			if roTs != rts {
883				t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts)
884			}
885		}
886		if !testEqual(got, test.want) {
887			t.Errorf("%d: got unexpected results from ReadOnlyTransaction.ReadRowUsingIndex: %v, want %v", i, got, test.want)
888		}
889		ro.Close()
890	}
893// Test ReadOnlyTransaction with different timestamp bound when there's an
894// update at the same time.
895func TestIntegration_UpdateDuringRead(t *testing.T) {
896	t.Parallel()
898	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
899	defer cancel()
900	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
901	defer cleanup()
903	for i, tb := range []TimestampBound{
904		StrongRead(),
905		ReadTimestamp(time.Now().Add(-time.Minute * 30)), // version GC is 1 hour
906		ExactStaleness(time.Minute * 30),
907	} {
908		ro := client.ReadOnlyTransaction().WithTimestampBound(tb)
909		_, err := ro.ReadRow(ctx, "Singers", Key{i}, []string{"SingerId"})
910		if ErrCode(err) != codes.NotFound {
911			t.Errorf("%d: ReadOnlyTransaction.ReadRow before write returns error: %v, want NotFound", i, err)
912		}
914		m := InsertOrUpdate("Singers", []string{"SingerId"}, []interface{}{i})
915		if _, err := client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
916			t.Fatal(err)
917		}
919		_, err = ro.ReadRow(ctx, "Singers", Key{i}, []string{"SingerId"})
920		if ErrCode(err) != codes.NotFound {
921			t.Errorf("%d: ReadOnlyTransaction.ReadRow after write returns error: %v, want NotFound", i, err)
922		}
923	}
926// Test ReadWriteTransaction.
927func TestIntegration_ReadWriteTransaction(t *testing.T) {
928	t.Parallel()
930	// Give a longer deadline because of transaction backoffs.
931	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
932	defer cancel()
933	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
934	defer cleanup()
936	// Set up two accounts
937	accounts := []*Mutation{
938		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
939		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
940	}
941	if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil {
942		t.Fatal(err)
943	}
944	wg := sync.WaitGroup{}
946	readBalance := func(iter *RowIterator) (int64, error) {
947		defer iter.Stop()
948		var bal int64
949		for {
950			row, err := iter.Next()
951			if err == iterator.Done {
952				return bal, nil
953			}
954			if err != nil {
955				return 0, err
956			}
957			if err := row.Column(0, &bal); err != nil {
958				return 0, err
959			}
960		}
961	}
963	for i := 0; i < 20; i++ {
964		wg.Add(1)
965		go func(iter int) {
966			defer wg.Done()
967			_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
968				// Query Foo's balance and Bar's balance.
969				bf, e := readBalance(tx.Query(ctx,
970					Statement{"SELECT Balance FROM Accounts WHERE AccountId = @id", map[string]interface{}{"id": int64(1)}}))
971				if e != nil {
972					return e
973				}
974				verifyDirectPathRemoteAddress(t)
975				bb, e := readBalance(tx.Read(ctx, "Accounts", KeySets(Key{int64(2)}), []string{"Balance"}))
976				if e != nil {
977					return e
978				}
979				verifyDirectPathRemoteAddress(t)
980				if bf <= 0 {
981					return nil
982				}
983				bf--
984				bb++
985				return tx.BufferWrite([]*Mutation{
986					Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), bf}),
987					Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), bb}),
988				})
989			})
990			if err != nil {
991				t.Errorf("%d: failed to execute transaction: %v", iter, err)
992			}
993			verifyDirectPathRemoteAddress(t)
994		}(i)
995	}
996	// Because of context timeout, all goroutines will eventually return.
997	wg.Wait()
998	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
999		var bf, bb int64
1000		r, e := tx.ReadRow(ctx, "Accounts", Key{int64(1)}, []string{"Balance"})
1001		if e != nil {
1002			return e
1003		}
1004		verifyDirectPathRemoteAddress(t)
1005		if ce := r.Column(0, &bf); ce != nil {
1006			return ce
1007		}
1008		bb, e = readBalance(tx.ReadUsingIndex(ctx, "Accounts", "AccountByNickname", KeySets(Key{"Bar"}), []string{"Balance"}))
1009		if e != nil {
1010			return e
1011		}
1012		verifyDirectPathRemoteAddress(t)
1013		if bf != 30 || bb != 21 {
1014			t.Errorf("Foo's balance is now %v and Bar's balance is now %v, want %v and %v", bf, bb, 30, 21)
1015		}
1016		return nil
1017	})
1018	if err != nil {
1019		t.Errorf("failed to check balances: %v", err)
1020	}
1021	verifyDirectPathRemoteAddress(t)
1024// Test ReadWriteTransactionWithOptions.
1025func TestIntegration_ReadWriteTransactionWithOptions(t *testing.T) {
1026	t.Parallel()
1027	skipEmulatorTest(t)
1029	// Give a longer deadline because of transaction backoffs.
1030	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1031	defer cancel()
1032	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
1033	defer cleanup()
1035	// Set up two accounts
1036	accounts := []*Mutation{
1037		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
1038		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
1039	}
1040	if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil {
1041		t.Fatal(err)
1042	}
1044	readBalance := func(iter *RowIterator) (int64, error) {
1045		defer iter.Stop()
1046		var bal int64
1047		for {
1048			row, err := iter.Next()
1049			if err == iterator.Done {
1050				return bal, nil
1051			}
1052			if err != nil {
1053				return 0, err
1054			}
1055			if err := row.Column(0, &bal); err != nil {
1056				return 0, err
1057			}
1058		}
1059	}
1061	txOpts := TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true}}
1062	resp, err := client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
1063		// Query Foo's balance and Bar's balance.
1064		bf, e := readBalance(tx.Query(ctx,
1065			Statement{"SELECT Balance FROM Accounts WHERE AccountId = @id", map[string]interface{}{"id": int64(1)}}))
1066		if e != nil {
1067			return e
1068		}
1069		bb, e := readBalance(tx.Read(ctx, "Accounts", KeySets(Key{int64(2)}), []string{"Balance"}))
1070		if e != nil {
1071			return e
1072		}
1073		if bf <= 0 {
1074			return nil
1075		}
1076		bf--
1077		bb++
1078		return tx.BufferWrite([]*Mutation{
1079			Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), bf}),
1080			Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), bb}),
1081		})
1082	}, txOpts)
1083	if err != nil {
1084		t.Fatalf("Failed to execute transaction: %v", err)
1085	}
1086	if resp.CommitStats == nil {
1087		t.Fatal("Missing commit stats in commit response")
1088	}
1089	if got, want := resp.CommitStats.MutationCount, int64(8); got != want {
1090		t.Errorf("Mismatch mutation count - got: %v, want: %v", got, want)
1091	}
1094func TestIntegration_ReadWriteTransaction_StatementBased(t *testing.T) {
1095	t.Parallel()
1096	skipEmulatorTest(t)
1098	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1099	defer cancel()
1100	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
1101	defer cleanup()
1103	// Set up two accounts
1104	accounts := []*Mutation{
1105		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
1106		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
1107	}
1108	if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil {
1109		t.Fatal(err)
1110	}
1112	getBalance := func(txn *ReadWriteStmtBasedTransaction, key Key) (int64, error) {
1113		row, err := txn.ReadRow(ctx, "Accounts", key, []string{"Balance"})
1114		if err != nil {
1115			return 0, err
1116		}
1117		var balance int64
1118		if err := row.Column(0, &balance); err != nil {
1119			return 0, err
1120		}
1121		return balance, nil
1122	}
1124	statements := func(txn *ReadWriteStmtBasedTransaction) error {
1125		outBalance, err := getBalance(txn, Key{1})
1126		if err != nil {
1127			return err
1128		}
1129		const transferAmt = 20
1130		if outBalance >= transferAmt {
1131			inBalance, err := getBalance(txn, Key{2})
1132			if err != nil {
1133				return err
1134			}
1135			inBalance += transferAmt
1136			outBalance -= transferAmt
1137			cols := []string{"AccountId", "Balance"}
1138			txn.BufferWrite([]*Mutation{
1139				Update("Accounts", cols, []interface{}{1, outBalance}),
1140				Update("Accounts", cols, []interface{}{2, inBalance}),
1141			})
1142		}
1143		return nil
1144	}
1146	for {
1147		tx, err := NewReadWriteStmtBasedTransaction(ctx, client)
1148		if err != nil {
1149			t.Fatalf("failed to begin a transaction: %v", err)
1150		}
1151		err = statements(tx)
1152		if err != nil && status.Code(err) != codes.Aborted {
1153			tx.Rollback(ctx)
1154			t.Fatalf("failed to execute statements: %v", err)
1155		} else if err == nil {
1156			_, err = tx.Commit(ctx)
1157			if err == nil {
1158				break
1159			} else if status.Code(err) != codes.Aborted {
1160				t.Fatalf("failed to commit a transaction: %v", err)
1161			}
1162		}
1163		// Set a default sleep time if the server delay is absent.
1164		delay := 10 * time.Millisecond
1165		if serverDelay, hasServerDelay := ExtractRetryDelay(err); hasServerDelay {
1166			delay = serverDelay
1167		}
1168		time.Sleep(delay)
1169	}
1171	// Query the updated values.
1172	stmt := Statement{SQL: `SELECT AccountId, Nickname, Balance FROM Accounts ORDER BY AccountId`}
1173	iter := client.Single().Query(ctx, stmt)
1174	got, err := readAllAccountsTable(iter)
1175	if err != nil {
1176		t.Fatalf("failed to read data: %v", err)
1177	}
1178	want := [][]interface{}{
1179		{int64(1), "Foo", int64(30)},
1180		{int64(2), "Bar", int64(21)},
1181	}
1182	if !testEqual(got, want) {
1183		t.Errorf("\ngot %v\nwant%v", got, want)
1184	}
1187func TestIntegration_ReadWriteTransaction_StatementBasedWithOptions(t *testing.T) {
1188	t.Parallel()
1189	skipEmulatorTest(t)
1191	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1192	defer cancel()
1193	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
1194	defer cleanup()
1196	// Set up two accounts
1197	accounts := []*Mutation{
1198		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
1199		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
1200	}
1201	if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil {
1202		t.Fatal(err)
1203	}
1205	getBalance := func(txn *ReadWriteStmtBasedTransaction, key Key) (int64, error) {
1206		row, err := txn.ReadRow(ctx, "Accounts", key, []string{"Balance"})
1207		if err != nil {
1208			return 0, err
1209		}
1210		var balance int64
1211		if err := row.Column(0, &balance); err != nil {
1212			return 0, err
1213		}
1214		return balance, nil
1215	}
1217	statements := func(txn *ReadWriteStmtBasedTransaction) error {
1218		outBalance, err := getBalance(txn, Key{1})
1219		if err != nil {
1220			return err
1221		}
1222		const transferAmt = 20
1223		if outBalance >= transferAmt {
1224			inBalance, err := getBalance(txn, Key{2})
1225			if err != nil {
1226				return err
1227			}
1228			inBalance += transferAmt
1229			outBalance -= transferAmt
1230			cols := []string{"AccountId", "Balance"}
1231			txn.BufferWrite([]*Mutation{
1232				Update("Accounts", cols, []interface{}{1, outBalance}),
1233				Update("Accounts", cols, []interface{}{2, inBalance}),
1234			})
1235		}
1236		return nil
1237	}
1239	var resp CommitResponse
1240	txOpts := TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true}}
1241	for {
1242		tx, err := NewReadWriteStmtBasedTransactionWithOptions(ctx, client, txOpts)
1243		if err != nil {
1244			t.Fatalf("failed to begin a transaction: %v", err)
1245		}
1246		err = statements(tx)
1247		if err != nil && status.Code(err) != codes.Aborted {
1248			tx.Rollback(ctx)
1249			t.Fatalf("failed to execute statements: %v", err)
1250		} else if err == nil {
1251			resp, err = tx.CommitWithReturnResp(ctx)
1252			if err == nil {
1253				break
1254			} else if status.Code(err) != codes.Aborted {
1255				t.Fatalf("failed to commit a transaction: %v", err)
1256			}
1257		}
1258		// Set a default sleep time if the server delay is absent.
1259		delay := 10 * time.Millisecond
1260		if serverDelay, hasServerDelay := ExtractRetryDelay(err); hasServerDelay {
1261			delay = serverDelay
1262		}
1263		time.Sleep(delay)
1264	}
1265	if resp.CommitStats == nil {
1266		t.Fatal("Missing commit stats in commit response")
1267	}
1268	if got, want := resp.CommitStats.MutationCount, int64(8); got != want {
1269		t.Errorf("Mismatch mutation count - got: %v, want: %v", got, want)
1270	}
1273func TestIntegration_Reads(t *testing.T) {
1274	t.Parallel()
1276	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1277	defer cancel()
1278	// Set up testing environment.
1279	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, readDBStatements)
1280	defer cleanup()
1282	// Includes k0..k14. Strings sort lexically, eg "k1" < "k10" < "k2".
1283	var ms []*Mutation
1284	for i := 0; i < 15; i++ {
1285		ms = append(ms, InsertOrUpdate(testTable,
1286			testTableColumns,
1287			[]interface{}{fmt.Sprintf("k%d", i), fmt.Sprintf("v%d", i)}))
1288	}
1289	// Don't use ApplyAtLeastOnce, so we can test the other code path.
1290	if _, err := client.Apply(ctx, ms); err != nil {
1291		t.Fatal(err)
1292	}
1293	verifyDirectPathRemoteAddress(t)
1295	// Empty read.
1296	rows, err := readAllTestTable(client.Single().Read(ctx, testTable,
1297		KeyRange{Start: Key{"k99"}, End: Key{"z"}}, testTableColumns))
1298	if err != nil {
1299		t.Fatal(err)
1300	}
1301	verifyDirectPathRemoteAddress(t)
1302	if got, want := len(rows), 0; got != want {
1303		t.Errorf("got %d, want %d", got, want)
1304	}
1306	// Index empty read.
1307	rows, err = readAllTestTable(client.Single().ReadUsingIndex(ctx, testTable, testTableIndex,
1308		KeyRange{Start: Key{"v99"}, End: Key{"z"}}, testTableColumns))
1309	if err != nil {
1310		t.Fatal(err)
1311	}
1312	verifyDirectPathRemoteAddress(t)
1313	if got, want := len(rows), 0; got != want {
1314		t.Errorf("got %d, want %d", got, want)
1315	}
1317	// Point read.
1318	row, err := client.Single().ReadRow(ctx, testTable, Key{"k1"}, testTableColumns)
1319	if err != nil {
1320		t.Fatal(err)
1321	}
1322	verifyDirectPathRemoteAddress(t)
1323	var got testTableRow
1324	if err := row.ToStruct(&got); err != nil {
1325		t.Fatal(err)
1326	}
1327	if want := (testTableRow{"k1", "v1"}); got != want {
1328		t.Errorf("got %v, want %v", got, want)
1329	}
1331	// Point read not found.
1332	_, err = client.Single().ReadRow(ctx, testTable, Key{"k999"}, testTableColumns)
1333	if ErrCode(err) != codes.NotFound {
1334		t.Fatalf("got %v, want NotFound", err)
1335	}
1336	verifyDirectPathRemoteAddress(t)
1338	// Index point read.
1339	rowIndex, err := client.Single().ReadRowUsingIndex(ctx, testTable, testTableIndex, Key{"v1"}, testTableColumns)
1340	if err != nil {
1341		t.Fatal(err)
1342	}
1343	verifyDirectPathRemoteAddress(t)
1344	var gotIndex testTableRow
1345	if err := rowIndex.ToStruct(&gotIndex); err != nil {
1346		t.Fatal(err)
1347	}
1348	if wantIndex := (testTableRow{"k1", "v1"}); gotIndex != wantIndex {
1349		t.Errorf("got %v, want %v", gotIndex, wantIndex)
1350	}
1351	// Index point read not found.
1352	_, err = client.Single().ReadRowUsingIndex(ctx, testTable, testTableIndex, Key{"v999"}, testTableColumns)
1353	if ErrCode(err) != codes.NotFound {
1354		t.Fatalf("got %v, want NotFound", err)
1355	}
1356	verifyDirectPathRemoteAddress(t)
1357	rangeReads(ctx, t, client)
1358	indexRangeReads(ctx, t, client)
1361func TestIntegration_EarlyTimestamp(t *testing.T) {
1362	t.Parallel()
1364	// Test that we can get the timestamp from a read-only transaction as
1365	// soon as we have read at least one row.
1366	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1367	defer cancel()
1368	// Set up testing environment.
1369	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, readDBStatements)
1370	defer cleanup()
1372	var ms []*Mutation
1373	for i := 0; i < 3; i++ {
1374		ms = append(ms, InsertOrUpdate(testTable,
1375			testTableColumns,
1376			[]interface{}{fmt.Sprintf("k%d", i), fmt.Sprintf("v%d", i)}))
1377	}
1378	if _, err := client.Apply(ctx, ms, ApplyAtLeastOnce()); err != nil {
1379		t.Fatal(err)
1380	}
1382	txn := client.Single()
1383	iter := txn.Read(ctx, testTable, AllKeys(), testTableColumns)
1384	defer iter.Stop()
1385	// In single-use transaction, we should get an error before reading anything.
1386	if _, err := txn.Timestamp(); err == nil {
1387		t.Error("wanted error, got nil")
1388	}
1389	// After reading one row, the timestamp should be available.
1390	_, err := iter.Next()
1391	if err != nil {
1392		t.Fatal(err)
1393	}
1394	if _, err := txn.Timestamp(); err != nil {
1395		t.Errorf("got %v, want nil", err)
1396	}
1398	txn = client.ReadOnlyTransaction()
1399	defer txn.Close()
1400	iter = txn.Read(ctx, testTable, AllKeys(), testTableColumns)
1401	defer iter.Stop()
1402	// In an ordinary read-only transaction, the timestamp should be
1403	// available immediately.
1404	if _, err := txn.Timestamp(); err != nil {
1405		t.Errorf("got %v, want nil", err)
1406	}
1409func TestIntegration_NestedTransaction(t *testing.T) {
1410	t.Parallel()
1412	// You cannot use a transaction from inside a read-write transaction.
1413	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1414	defer cancel()
1415	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
1416	defer cleanup()
1418	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
1419		_, err := client.ReadWriteTransaction(ctx,
1420			func(context.Context, *ReadWriteTransaction) error { return nil })
1421		if ErrCode(err) != codes.FailedPrecondition {
1422			t.Fatalf("got %v, want FailedPrecondition", err)
1423		}
1424		_, err = client.Single().ReadRow(ctx, "Singers", Key{1}, []string{"SingerId"})
1425		if ErrCode(err) != codes.FailedPrecondition {
1426			t.Fatalf("got %v, want FailedPrecondition", err)
1427		}
1428		rot := client.ReadOnlyTransaction()
1429		defer rot.Close()
1430		_, err = rot.ReadRow(ctx, "Singers", Key{1}, []string{"SingerId"})
1431		if ErrCode(err) != codes.FailedPrecondition {
1432			t.Fatalf("got %v, want FailedPrecondition", err)
1433		}
1434		return nil
1435	})
1436	if err != nil {
1437		t.Fatal(err)
1438	}
1441func TestIntegration_CreateDBRetry(t *testing.T) {
1442	t.Parallel()
1444	if databaseAdmin == nil {
1445		t.Skip("Integration tests skipped")
1446	}
1447	skipEmulatorTest(t)
1449	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1450	defer cancel()
1451	dbName := dbNameSpace.New()
1453	// Simulate an Unavailable error on the CreateDatabase RPC.
1454	hasReturnedUnavailable := false
1455	interceptor := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
1456		err := invoker(ctx, method, req, reply, cc, opts...)
1457		if !hasReturnedUnavailable && err == nil {
1458			hasReturnedUnavailable = true
1459			return status.Error(codes.Unavailable, "Injected unavailable error")
1460		}
1461		return err
1462	}
1464	dbAdmin, err := database.NewDatabaseAdminClient(ctx, option.WithGRPCDialOption(grpc.WithUnaryInterceptor(interceptor)))
1465	if err != nil {
1466		log.Fatalf("cannot create dbAdmin client: %v", err)
1467	}
1468	op, err := dbAdmin.CreateDatabaseWithRetry(ctx, &adminpb.CreateDatabaseRequest{
1469		Parent:          fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID),
1470		CreateStatement: "CREATE DATABASE " + dbName,
1471	})
1472	if err != nil {
1473		t.Fatalf("failed to create database: %v", err)
1474	}
1475	_, err = op.Wait(ctx)
1476	if err != nil {
1477		t.Fatalf("failed to create database: %v", err)
1478	}
1479	if !hasReturnedUnavailable {
1480		t.Fatalf("interceptor did not return Unavailable")
1481	}
1484// Test client recovery on database recreation.
1485func TestIntegration_DbRemovalRecovery(t *testing.T) {
1486	t.Parallel()
1488	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1489	defer cancel()
1490	// Create a client with MinOpened=0 to prevent the session pool maintainer
1491	// from repeatedly trying to create sessions for the invalid database.
1492	client, dbPath, cleanup := prepareIntegrationTest(ctx, t, SessionPoolConfig{}, singerDBStatements)
1493	defer cleanup()
1495	// Drop the testing database.
1496	if err := databaseAdmin.DropDatabase(ctx, &adminpb.DropDatabaseRequest{Database: dbPath}); err != nil {
1497		t.Fatalf("failed to drop testing database %v: %v", dbPath, err)
1498	}
1500	// Now, send the query.
1501	iter := client.Single().Query(ctx, Statement{SQL: "SELECT SingerId FROM Singers"})
1502	defer iter.Stop()
1503	if _, err := iter.Next(); err == nil {
1504		t.Errorf("client sends query to removed database successfully, want it to fail")
1505	}
1506	verifyDirectPathRemoteAddress(t)
1508	// Recreate database and table.
1509	dbName := dbPath[strings.LastIndex(dbPath, "/")+1:]
1510	op, err := databaseAdmin.CreateDatabaseWithRetry(ctx, &adminpb.CreateDatabaseRequest{
1511		Parent:          fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID),
1512		CreateStatement: "CREATE DATABASE " + dbName,
1513		ExtraStatements: []string{
1514			`CREATE TABLE Singers (
1515				SingerId        INT64 NOT NULL,
1516				FirstName       STRING(1024),
1517				LastName        STRING(1024),
1518				SingerInfo      BYTES(MAX)
1519			) PRIMARY KEY (SingerId)`,
1520		},
1521	})
1522	if err != nil {
1523		t.Fatalf("cannot recreate testing DB %v: %v", dbPath, err)
1524	}
1525	if _, err := op.Wait(ctx); err != nil {
1526		t.Fatalf("cannot recreate testing DB %v: %v", dbPath, err)
1527	}
1529	// Now, send the query again.
1530	iter = client.Single().Query(ctx, Statement{SQL: "SELECT SingerId FROM Singers"})
1531	defer iter.Stop()
1532	_, err = iter.Next()
1533	if err != nil && err != iterator.Done {
1534		t.Errorf("failed to send query to database %v: %v", dbPath, err)
1535	}
1536	verifyDirectPathRemoteAddress(t)
1539// Test encoding/decoding non-struct Cloud Spanner types.
1540func TestIntegration_BasicTypes(t *testing.T) {
1541	t.Parallel()
1543	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1544	defer cancel()
1545	stmts := singerDBStatements
1546	if !isEmulatorEnvSet() {
1547		stmts = []string{
1548			`CREATE TABLE Singers (
1549					SingerId	INT64 NOT NULL,
1550					FirstName	STRING(1024),
1551					LastName	STRING(1024),
1552					SingerInfo	BYTES(MAX)
1553				) PRIMARY KEY (SingerId)`,
1554			`CREATE INDEX SingerByName ON Singers(FirstName, LastName)`,
1555			`CREATE TABLE Accounts (
1556					AccountId	INT64 NOT NULL,
1557					Nickname	STRING(100),
1558					Balance		INT64 NOT NULL,
1559				) PRIMARY KEY (AccountId)`,
1560			`CREATE INDEX AccountByNickname ON Accounts(Nickname) STORING (Balance)`,
1561			`CREATE TABLE Types (
1562					RowID		INT64 NOT NULL,
1563					String		STRING(MAX),
1564					StringArray	ARRAY<STRING(MAX)>,
1565					Bytes		BYTES(MAX),
1566					BytesArray	ARRAY<BYTES(MAX)>,
1567					Int64a		INT64,
1568					Int64Array	ARRAY<INT64>,
1569					Bool		BOOL,
1570					BoolArray	ARRAY<BOOL>,
1571					Float64		FLOAT64,
1572					Float64Array	ARRAY<FLOAT64>,
1573					Date		DATE,
1574					DateArray	ARRAY<DATE>,
1575					Timestamp	TIMESTAMP,
1576					TimestampArray	ARRAY<TIMESTAMP>,
1577					Numeric		NUMERIC,
1578					NumericArray	ARRAY<NUMERIC>
1579				) PRIMARY KEY (RowID)`,
1580		}
1581	}
1582	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, stmts)
1583	defer cleanup()
1585	t1, _ := time.Parse(time.RFC3339Nano, "2016-11-15T15:04:05.999999999Z")
1586	// Boundaries
1587	t2, _ := time.Parse(time.RFC3339Nano, "0001-01-01T00:00:00.000000000Z")
1588	t3, _ := time.Parse(time.RFC3339Nano, "9999-12-31T23:59:59.999999999Z")
1589	d1, _ := civil.ParseDate("2016-11-15")
1590	// Boundaries
1591	d2, _ := civil.ParseDate("0001-01-01")
1592	d3, _ := civil.ParseDate("9999-12-31")
1594	n0 := big.Rat{}
1595	n1p, _ := (&big.Rat{}).SetString("123456789")
1596	n2p, _ := (&big.Rat{}).SetString("123456789/1000000000")
1597	n1 := *n1p
1598	n2 := *n2p
1600	tests := []struct {
1601		col  string
1602		val  interface{}
1603		want interface{}
1604	}{
1605		{col: "String", val: ""},
1606		{col: "String", val: "", want: NullString{"", true}},
1607		{col: "String", val: "foo"},
1608		{col: "String", val: "foo", want: NullString{"foo", true}},
1609		{col: "String", val: NullString{"bar", true}, want: "bar"},
1610		{col: "String", val: NullString{"bar", false}, want: NullString{"", false}},
1611		{col: "String", val: nil, want: NullString{}},
1612		{col: "StringArray", val: []string(nil), want: []NullString(nil)},
1613		{col: "StringArray", val: []string{}, want: []NullString{}},
1614		{col: "StringArray", val: []string{"foo", "bar"}, want: []NullString{{"foo", true}, {"bar", true}}},
1615		{col: "StringArray", val: []NullString(nil)},
1616		{col: "StringArray", val: []NullString{}},
1617		{col: "StringArray", val: []NullString{{"foo", true}, {}}},
1618		{col: "Bytes", val: []byte{}},
1619		{col: "Bytes", val: []byte{1, 2, 3}},
1620		{col: "Bytes", val: []byte(nil)},
1621		{col: "BytesArray", val: [][]byte(nil)},
1622		{col: "BytesArray", val: [][]byte{}},
1623		{col: "BytesArray", val: [][]byte{{1}, {2, 3}}},
1624		{col: "Int64a", val: 0, want: int64(0)},
1625		{col: "Int64a", val: -1, want: int64(-1)},
1626		{col: "Int64a", val: 2, want: int64(2)},
1627		{col: "Int64a", val: int64(3)},
1628		{col: "Int64a", val: 4, want: NullInt64{4, true}},
1629		{col: "Int64a", val: NullInt64{5, true}, want: int64(5)},
1630		{col: "Int64a", val: NullInt64{6, true}, want: int64(6)},
1631		{col: "Int64a", val: NullInt64{7, false}, want: NullInt64{0, false}},
1632		{col: "Int64a", val: nil, want: NullInt64{}},
1633		{col: "Int64Array", val: []int(nil), want: []NullInt64(nil)},
1634		{col: "Int64Array", val: []int{}, want: []NullInt64{}},
1635		{col: "Int64Array", val: []int{1, 2}, want: []NullInt64{{1, true}, {2, true}}},
1636		{col: "Int64Array", val: []int64(nil), want: []NullInt64(nil)},
1637		{col: "Int64Array", val: []int64{}, want: []NullInt64{}},
1638		{col: "Int64Array", val: []int64{1, 2}, want: []NullInt64{{1, true}, {2, true}}},
1639		{col: "Int64Array", val: []NullInt64(nil)},
1640		{col: "Int64Array", val: []NullInt64{}},
1641		{col: "Int64Array", val: []NullInt64{{1, true}, {}}},
1642		{col: "Bool", val: false},
1643		{col: "Bool", val: true},
1644		{col: "Bool", val: false, want: NullBool{false, true}},
1645		{col: "Bool", val: true, want: NullBool{true, true}},
1646		{col: "Bool", val: NullBool{true, true}},
1647		{col: "Bool", val: NullBool{false, false}},
1648		{col: "Bool", val: nil, want: NullBool{}},
1649		{col: "BoolArray", val: []bool(nil), want: []NullBool(nil)},
1650		{col: "BoolArray", val: []bool{}, want: []NullBool{}},
1651		{col: "BoolArray", val: []bool{true, false}, want: []NullBool{{true, true}, {false, true}}},
1652		{col: "BoolArray", val: []NullBool(nil)},
1653		{col: "BoolArray", val: []NullBool{}},
1654		{col: "BoolArray", val: []NullBool{{false, true}, {true, true}, {}}},
1655		{col: "Float64", val: 0.0},
1656		{col: "Float64", val: 3.14},
1657		{col: "Float64", val: math.NaN()},
1658		{col: "Float64", val: math.Inf(1)},
1659		{col: "Float64", val: math.Inf(-1)},
1660		{col: "Float64", val: 2.78, want: NullFloat64{2.78, true}},
1661		{col: "Float64", val: NullFloat64{2.71, true}, want: 2.71},
1662		{col: "Float64", val: NullFloat64{1.41, true}, want: NullFloat64{1.41, true}},
1663		{col: "Float64", val: NullFloat64{0, false}},
1664		{col: "Float64", val: nil, want: NullFloat64{}},
1665		{col: "Float64Array", val: []float64(nil), want: []NullFloat64(nil)},
1666		{col: "Float64Array", val: []float64{}, want: []NullFloat64{}},
1667		{col: "Float64Array", val: []float64{2.72, 3.14, math.Inf(1)}, want: []NullFloat64{{2.72, true}, {3.14, true}, {math.Inf(1), true}}},
1668		{col: "Float64Array", val: []NullFloat64(nil)},
1669		{col: "Float64Array", val: []NullFloat64{}},
1670		{col: "Float64Array", val: []NullFloat64{{2.72, true}, {math.Inf(1), true}, {}}},
1671		{col: "Date", val: d1},
1672		{col: "Date", val: d1, want: NullDate{d1, true}},
1673		{col: "Date", val: NullDate{d1, true}},
1674		{col: "Date", val: NullDate{d1, true}, want: d1},
1675		{col: "Date", val: NullDate{civil.Date{}, false}},
1676		{col: "DateArray", val: []civil.Date(nil), want: []NullDate(nil)},
1677		{col: "DateArray", val: []civil.Date{}, want: []NullDate{}},
1678		{col: "DateArray", val: []civil.Date{d1, d2, d3}, want: []NullDate{{d1, true}, {d2, true}, {d3, true}}},
1679		{col: "Timestamp", val: t1},
1680		{col: "Timestamp", val: t1, want: NullTime{t1, true}},
1681		{col: "Timestamp", val: NullTime{t1, true}},
1682		{col: "Timestamp", val: NullTime{t1, true}, want: t1},
1683		{col: "Timestamp", val: NullTime{}},
1684		{col: "Timestamp", val: nil, want: NullTime{}},
1685		{col: "TimestampArray", val: []time.Time(nil), want: []NullTime(nil)},
1686		{col: "TimestampArray", val: []time.Time{}, want: []NullTime{}},
1687		{col: "TimestampArray", val: []time.Time{t1, t2, t3}, want: []NullTime{{t1, true}, {t2, true}, {t3, true}}},
1688	}
1690	if !isEmulatorEnvSet() {
1691		for _, tc := range []struct {
1692			col  string
1693			val  interface{}
1694			want interface{}
1695		}{
1696			{col: "Numeric", val: n1},
1697			{col: "Numeric", val: n2},
1698			{col: "Numeric", val: n1, want: NullNumeric{n1, true}},
1699			{col: "Numeric", val: n2, want: NullNumeric{n2, true}},
1700			{col: "Numeric", val: NullNumeric{n1, true}, want: n1},
1701			{col: "Numeric", val: NullNumeric{n1, true}, want: NullNumeric{n1, true}},
1702			{col: "Numeric", val: NullNumeric{n0, false}},
1703			{col: "Numeric", val: nil, want: NullNumeric{}},
1704			{col: "NumericArray", val: []big.Rat(nil), want: []NullNumeric(nil)},
1705			{col: "NumericArray", val: []big.Rat{}, want: []NullNumeric{}},
1706			{col: "NumericArray", val: []big.Rat{n1, n2}, want: []NullNumeric{{n1, true}, {n2, true}}},
1707			{col: "NumericArray", val: []NullNumeric(nil)},
1708			{col: "NumericArray", val: []NullNumeric{}},
1709			{col: "NumericArray", val: []NullNumeric{{n1, true}, {n2, true}, {}}},
1710		} {
1711			tests = append(tests, tc)
1712		}
1713	}
1715	// Write rows into table first.
1716	var muts []*Mutation
1717	for i, test := range tests {
1718		muts = append(muts, InsertOrUpdate("Types", []string{"RowID", test.col}, []interface{}{i, test.val}))
1719	}
1720	if _, err := client.Apply(ctx, muts, ApplyAtLeastOnce()); err != nil {
1721		t.Fatal(err)
1722	}
1724	for i, test := range tests {
1725		row, err := client.Single().ReadRow(ctx, "Types", []interface{}{i}, []string{test.col})
1726		if err != nil {
1727			t.Fatalf("Unable to fetch row %v: %v", i, err)
1728		}
1729		verifyDirectPathRemoteAddress(t)
1730		// Create new instance of type of test.want.
1731		want := test.want
1732		if want == nil {
1733			want = test.val
1734		}
1735		gotp := reflect.New(reflect.TypeOf(want))
1736		if err := row.Column(0, gotp.Interface()); err != nil {
1737			t.Errorf("%d: col:%v val:%#v, %v", i, test.col, test.val, err)
1738			continue
1739		}
1740		got := reflect.Indirect(gotp).Interface()
1742		// One of the test cases is checking NaN handling.  Given
1743		// NaN!=NaN, we can't use reflect to test for it.
1744		if isNaN(got) && isNaN(want) {
1745			continue
1746		}
1748		// Check non-NaN cases.
1749		if !testEqual(got, want) {
1750			t.Errorf("%d: col:%v val:%#v, got %#v, want %#v", i, test.col, test.val, got, want)
1751			continue
1752		}
1753	}
1756// Test decoding Cloud Spanner STRUCT type.
1757func TestIntegration_StructTypes(t *testing.T) {
1758	t.Parallel()
1760	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1761	defer cancel()
1762	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
1763	defer cleanup()
1765	tests := []struct {
1766		q    Statement
1767		want func(r *Row) error
1768	}{
1769		{
1770			q: Statement{SQL: `SELECT ARRAY(SELECT STRUCT(1, 2))`},
1771			want: func(r *Row) error {
1772				// Test STRUCT ARRAY decoding to []NullRow.
1773				var rows []NullRow
1774				if err := r.Column(0, &rows); err != nil {
1775					return err
1776				}
1777				if len(rows) != 1 {
1778					return fmt.Errorf("len(rows) = %d; want 1", len(rows))
1779				}
1780				if !rows[0].Valid {
1781					return fmt.Errorf("rows[0] is NULL")
1782				}
1783				var i, j int64
1784				if err := rows[0].Row.Columns(&i, &j); err != nil {
1785					return err
1786				}
1787				if i != 1 || j != 2 {
1788					return fmt.Errorf("got (%d,%d), want (1,2)", i, j)
1789				}
1790				return nil
1791			},
1792		},
1793		{
1794			q: Statement{SQL: `SELECT ARRAY(SELECT STRUCT(1 as foo, 2 as bar)) as col1`},
1795			want: func(r *Row) error {
1796				// Test Row.ToStruct.
1797				s := struct {
1798					Col1 []*struct {
1799						Foo int64 `spanner:"foo"`
1800						Bar int64 `spanner:"bar"`
1801					} `spanner:"col1"`
1802				}{}
1803				if err := r.ToStruct(&s); err != nil {
1804					return err
1805				}
1806				want := struct {
1807					Col1 []*struct {
1808						Foo int64 `spanner:"foo"`
1809						Bar int64 `spanner:"bar"`
1810					} `spanner:"col1"`
1811				}{
1812					Col1: []*struct {
1813						Foo int64 `spanner:"foo"`
1814						Bar int64 `spanner:"bar"`
1815					}{
1816						{
1817							Foo: 1,
1818							Bar: 2,
1819						},
1820					},
1821				}
1822				if !testEqual(want, s) {
1823					return fmt.Errorf("unexpected decoding result: %v, want %v", s, want)
1824				}
1825				return nil
1826			},
1827		},
1828	}
1829	for i, test := range tests {
1830		iter := client.Single().Query(ctx, test.q)
1831		defer iter.Stop()
1832		row, err := iter.Next()
1833		if err != nil {
1834			t.Errorf("%d: %v", i, err)
1835			continue
1836		}
1837		if err := test.want(row); err != nil {
1838			t.Errorf("%d: %v", i, err)
1839			continue
1840		}
1841	}
1844func TestIntegration_StructParametersUnsupported(t *testing.T) {
1845	skipEmulatorTest(t)
1846	t.Parallel()
1848	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1849	defer cancel()
1850	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, nil)
1851	defer cleanup()
1853	for _, test := range []struct {
1854		param       interface{}
1855		wantCode    codes.Code
1856		wantMsgPart string
1857	}{
1858		{
1859			struct {
1860				Field int
1861			}{10},
1862			codes.Unimplemented,
1863			"Unsupported query shape: " +
1864				"A struct value cannot be returned as a column value. " +
1865				"Rewrite the query to flatten the struct fields in the result.",
1866		},
1867		{
1868			[]struct {
1869				Field int
1870			}{{10}, {20}},
1871			codes.Unimplemented,
1872			"Unsupported query shape: " +
1873				"This query can return a null-valued array of struct, " +
1874				"which is not supported by Spanner.",
1875		},
1876	} {
1877		iter := client.Single().Query(ctx, Statement{
1878			SQL:    "SELECT @p",
1879			Params: map[string]interface{}{"p": test.param},
1880		})
1881		_, err := iter.Next()
1882		iter.Stop()
1883		if msg, ok := matchError(err, test.wantCode, test.wantMsgPart); !ok {
1884			t.Fatal(msg)
1885		}
1886	}
1889// Test queries of the form "SELECT expr".
1890func TestIntegration_QueryExpressions(t *testing.T) {
1891	t.Parallel()
1893	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1894	defer cancel()
1895	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, nil)
1896	defer cleanup()
1898	newRow := func(vals []interface{}) *Row {
1899		row, err := NewRow(make([]string, len(vals)), vals)
1900		if err != nil {
1901			t.Fatal(err)
1902		}
1903		return row
1904	}
1906	tests := []struct {
1907		expr string
1908		want interface{}
1909	}{
1910		{"1", int64(1)},
1911		{"[1, 2, 3]", []NullInt64{{1, true}, {2, true}, {3, true}}},
1912		{"[1, NULL, 3]", []NullInt64{{1, true}, {0, false}, {3, true}}},
1913		{"IEEE_DIVIDE(1, 0)", math.Inf(1)},
1914		{"IEEE_DIVIDE(-1, 0)", math.Inf(-1)},
1915		{"IEEE_DIVIDE(0, 0)", math.NaN()},
1916		// TODO(jba): add IEEE_DIVIDE(0, 0) to the following array when we have a better equality predicate.
1917		{"[IEEE_DIVIDE(1, 0), IEEE_DIVIDE(-1, 0)]", []NullFloat64{{math.Inf(1), true}, {math.Inf(-1), true}}},
1918		{"ARRAY(SELECT AS STRUCT * FROM (SELECT 'a', 1) WHERE 0 = 1)", []NullRow{}},
1919		{"ARRAY(SELECT STRUCT(1, 2))", []NullRow{{Row: *newRow([]interface{}{1, 2}), Valid: true}}},
1920	}
1921	for _, test := range tests {
1922		iter := client.Single().Query(ctx, Statement{SQL: "SELECT " + test.expr})
1923		defer iter.Stop()
1924		row, err := iter.Next()
1925		if err != nil {
1926			t.Errorf("%q: %v", test.expr, err)
1927			continue
1928		}
1929		// Create new instance of type of test.want.
1930		gotp := reflect.New(reflect.TypeOf(test.want))
1931		if err := row.Column(0, gotp.Interface()); err != nil {
1932			t.Errorf("%q: Column returned error %v", test.expr, err)
1933			continue
1934		}
1935		got := reflect.Indirect(gotp).Interface()
1936		// TODO(jba): remove isNaN special case when we have a better equality predicate.
1937		if isNaN(got) && isNaN(test.want) {
1938			continue
1939		}
1940		if !testEqual(got, test.want) {
1941			t.Errorf("%q\n got  %#v\nwant %#v", test.expr, got, test.want)
1942		}
1943	}
1946func TestIntegration_QueryStats(t *testing.T) {
1947	skipEmulatorTest(t)
1948	t.Parallel()
1950	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1951	defer cancel()
1952	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
1953	defer cleanup()
1955	accounts := []*Mutation{
1956		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
1957		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
1958	}
1959	if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil {
1960		t.Fatal(err)
1961	}
1962	const sql = "SELECT Balance FROM Accounts"
1964	qp, err := client.Single().AnalyzeQuery(ctx, Statement{sql, nil})
1965	if err != nil {
1966		t.Fatal(err)
1967	}
1968	if len(qp.PlanNodes) == 0 {
1969		t.Error("got zero plan nodes, expected at least one")
1970	}
1972	iter := client.Single().QueryWithStats(ctx, Statement{sql, nil})
1973	defer iter.Stop()
1974	for {
1975		_, err := iter.Next()
1976		if err == iterator.Done {
1977			break
1978		}
1979		if err != nil {
1980			t.Fatal(err)
1981		}
1982	}
1983	if iter.QueryPlan == nil {
1984		t.Error("got nil QueryPlan, expected one")
1985	}
1986	if iter.QueryStats == nil {
1987		t.Error("got nil QueryStats, expected some")
1988	}
1991func TestIntegration_InvalidDatabase(t *testing.T) {
1992	t.Parallel()
1994	if databaseAdmin == nil {
1995		t.Skip("Integration tests skipped")
1996	}
1997	ctx := context.Background()
1998	dbPath := fmt.Sprintf("projects/%v/instances/%v/databases/invalid", testProjectID, testInstanceID)
1999	c, err := createClient(ctx, dbPath, SessionPoolConfig{})
2000	// Client creation should succeed even if the database is invalid.
2001	if err != nil {
2002		t.Fatal(err)
2003	}
2004	_, err = c.Single().ReadRow(ctx, "TestTable", Key{1}, []string{"col1"})
2005	if msg, ok := matchError(err, codes.NotFound, ""); !ok {
2006		t.Fatal(msg)
2007	}
2010func TestIntegration_ReadErrors(t *testing.T) {
2011	t.Parallel()
2013	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2014	defer cancel()
2015	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, readDBStatements)
2016	defer cleanup()
2018	var ms []*Mutation
2019	for i := 0; i < 2; i++ {
2020		ms = append(ms, InsertOrUpdate(testTable,
2021			testTableColumns,
2022			[]interface{}{fmt.Sprintf("k%d", i), fmt.Sprintf("v")}))
2023	}
2024	if _, err := client.Apply(ctx, ms); err != nil {
2025		t.Fatal(err)
2026	}
2028	// Read over invalid table fails
2029	_, err := client.Single().ReadRow(ctx, "badTable", Key{1}, []string{"StringValue"})
2030	if msg, ok := matchError(err, codes.NotFound, "badTable"); !ok {
2031		t.Error(msg)
2032	}
2033	// Read over invalid column fails
2034	_, err = client.Single().ReadRow(ctx, "TestTable", Key{1}, []string{"badcol"})
2035	if msg, ok := matchError(err, codes.NotFound, "badcol"); !ok {
2036		t.Error(msg)
2037	}
2039	// Invalid query fails
2040	iter := client.Single().Query(ctx, Statement{SQL: "SELECT Apples AND Oranges"})
2041	defer iter.Stop()
2042	_, err = iter.Next()
2043	if msg, ok := matchError(err, codes.InvalidArgument, "unrecognized name"); !ok {
2044		t.Error(msg)
2045	}
2047	// Read should fail on cancellation.
2048	cctx, cancel := context.WithCancel(ctx)
2049	cancel()
2050	_, err = client.Single().ReadRow(cctx, "TestTable", Key{1}, []string{"StringValue"})
2051	if msg, ok := matchError(err, codes.Canceled, ""); !ok {
2052		t.Error(msg)
2053	}
2054	// Read should fail if deadline exceeded.
2055	dctx, cancel := context.WithTimeout(ctx, time.Nanosecond)
2056	defer cancel()
2057	<-dctx.Done()
2058	_, err = client.Single().ReadRow(dctx, "TestTable", Key{1}, []string{"StringValue"})
2059	if msg, ok := matchError(err, codes.DeadlineExceeded, ""); !ok {
2060		t.Error(msg)
2061	}
2062	// Read should fail if there are multiple rows returned.
2063	_, err = client.Single().ReadRowUsingIndex(ctx, testTable, testTableIndex, Key{"v"}, testTableColumns)
2064	wantMsgPart := fmt.Sprintf("more than one row found by index(Table: %v, IndexKey: %v, Index: %v)", testTable, Key{"v"}, testTableIndex)
2065	if msg, ok := matchError(err, codes.FailedPrecondition, wantMsgPart); !ok {
2066		t.Error(msg)
2067	}
2070// Test TransactionRunner. Test that transactions are aborted and retried as
2071// expected.
2072func TestIntegration_TransactionRunner(t *testing.T) {
2073	skipEmulatorTest(t)
2074	t.Parallel()
2076	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2077	defer cancel()
2078	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
2079	defer cleanup()
2081	// Test 1: User error should abort the transaction.
2082	_, _ = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
2083		tx.BufferWrite([]*Mutation{
2084			Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)})})
2085		return errors.New("user error")
2086	})
2087	// Empty read.
2088	rows, err := readAllTestTable(client.Single().Read(ctx, "Accounts", Key{1}, []string{"AccountId", "Nickname", "Balance"}))
2089	if err != nil {
2090		t.Fatal(err)
2091	}
2092	if got, want := len(rows), 0; got != want {
2093		t.Errorf("Empty read, got %d, want %d.", got, want)
2094	}
2096	// Test 2: Expect abort and retry.
2097	// We run two ReadWriteTransactions concurrently and make txn1 abort txn2 by
2098	// committing writes to the column txn2 have read, and expect the following
2099	// read to abort and txn2 retries.
2101	// Set up two accounts
2102	accounts := []*Mutation{
2103		Insert("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), int64(0)}),
2104		Insert("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), int64(1)}),
2105	}
2106	if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil {
2107		t.Fatal(err)
2108	}
2110	var (
2111		cTxn1Start  = make(chan struct{})
2112		cTxn1Commit = make(chan struct{})
2113		cTxn2Start  = make(chan struct{})
2114		wg          sync.WaitGroup
2115	)
2117	// read balance, check error if we don't expect abort.
2118	readBalance := func(tx interface {
2119		ReadRow(ctx context.Context, table string, key Key, columns []string) (*Row, error)
2120	}, key int64, expectAbort bool) (int64, error) {
2121		var b int64
2122		r, e := tx.ReadRow(ctx, "Accounts", Key{int64(key)}, []string{"Balance"})
2123		if e != nil {
2124			if expectAbort && !isAbortedErr(e) {
2125				t.Errorf("ReadRow got %v, want Abort error.", e)
2126			}
2127			// Verify that we received and are able to extract retry info from
2128			// the aborted error.
2129			if _, hasRetryInfo := ExtractRetryDelay(e); !hasRetryInfo {
2130				t.Errorf("Got Abort error without RetryInfo\nGot: %v", e)
2131			}
2132			return b, e
2133		}
2134		if ce := r.Column(0, &b); ce != nil {
2135			return b, ce
2136		}
2137		return b, nil
2138	}
2140	wg.Add(2)
2141	// Txn 1
2142	go func() {
2143		defer wg.Done()
2144		var once sync.Once
2145		_, e := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
2146			b, e := readBalance(tx, 1, false)
2147			if e != nil {
2148				return e
2149			}
2150			// txn 1 can abort, in that case we skip closing the channel on
2151			// retry.
2152			once.Do(func() { close(cTxn1Start) })
2153			e = tx.BufferWrite([]*Mutation{
2154				Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), int64(b + 1)})})
2155			if e != nil {
2156				return e
2157			}
2158			// Wait for second transaction.
2159			<-cTxn2Start
2160			return nil
2161		})
2162		close(cTxn1Commit)
2163		if e != nil {
2164			t.Errorf("Transaction 1 commit, got %v, want nil.", e)
2165		}
2166	}()
2167	// Txn 2
2168	go func() {
2169		// Wait until txn 1 starts.
2170		<-cTxn1Start
2171		defer wg.Done()
2172		var (
2173			once sync.Once
2174			b1   int64
2175			b2   int64
2176			e    error
2177		)
2178		_, e = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
2179			if b1, e = readBalance(tx, 1, false); e != nil {
2180				return e
2181			}
2182			// Skip closing channel on retry.
2183			once.Do(func() { close(cTxn2Start) })
2184			// Wait until txn 1 successfully commits.
2185			<-cTxn1Commit
2186			// Txn1 has committed and written a balance to the account. Now this
2187			// transaction (txn2) reads and re-writes the balance. The first
2188			// time through, it will abort because it overlaps with txn1. Then
2189			// it will retry after txn1 commits, and succeed.
2190			if b2, e = readBalance(tx, 2, true); e != nil {
2191				return e
2192			}
2193			return tx.BufferWrite([]*Mutation{
2194				Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), int64(b1 + b2)})})
2195		})
2196		if e != nil {
2197			t.Errorf("Transaction 2 commit, got %v, want nil.", e)
2198		}
2199	}()
2200	wg.Wait()
2201	// Check that both transactions' effects are visible.
2202	for i := int64(1); i <= int64(2); i++ {
2203		if b, e := readBalance(client.Single(), i, false); e != nil {
2204			t.Fatalf("ReadBalance for key %d error %v.", i, e)
2205		} else if b != i {
2206			t.Errorf("Balance for key %d, got %d, want %d.", i, b, i)
2207		}
2208	}
2211// Test PartitionQuery of BatchReadOnlyTransaction, create partitions then
2212// serialize and deserialize both transaction and partition to be used in
2213// execution on another client, and compare results.
2214func TestIntegration_BatchQuery(t *testing.T) {
2215	t.Parallel()
2217	// Set up testing environment.
2218	var (
2219		client2 *Client
2220		err     error
2221	)
2222	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2223	defer cancel()
2224	client, dbPath, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, simpleDBStatements)
2225	defer cleanup()
2227	if err = populate(ctx, client); err != nil {
2228		t.Fatal(err)
2229	}
2230	if client2, err = createClient(ctx, dbPath, SessionPoolConfig{}); err != nil {
2231		t.Fatal(err)
2232	}
2233	defer client2.Close()
2235	// PartitionQuery
2236	var (
2237		txn        *BatchReadOnlyTransaction
2238		partitions []*Partition
2239		stmt       = Statement{SQL: "SELECT * FROM test;"}
2240	)
2242	if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil {
2243		t.Fatal(err)
2244	}
2245	defer txn.Cleanup(ctx)
2246	if partitions, err = txn.PartitionQuery(ctx, stmt, PartitionOptions{0, 3}); err != nil {
2247		t.Fatal(err)
2248	}
2250	// Reconstruct BatchReadOnlyTransactionID and execute partitions
2251	var (
2252		tid2      BatchReadOnlyTransactionID
2253		data      []byte
2254		gotResult bool // if we get matching result from two separate txns
2255	)
2256	if data, err = txn.ID.MarshalBinary(); err != nil {
2257		t.Fatalf("encoding failed %v", err)
2258	}
2259	if err = tid2.UnmarshalBinary(data); err != nil {
2260		t.Fatalf("decoding failed %v", err)
2261	}
2262	txn2 := client2.BatchReadOnlyTransactionFromID(tid2)
2264	// Execute Partitions and compare results
2265	for i, p := range partitions {
2266		iter := txn.Execute(ctx, p)
2267		defer iter.Stop()
2268		p2 := serdesPartition(t, i, p)
2269		iter2 := txn2.Execute(ctx, &p2)
2270		defer iter2.Stop()
2272		row1, err1 := iter.Next()
2273		row2, err2 := iter2.Next()
2274		if err1 != err2 {
2275			t.Fatalf("execution failed for different reasons: %v, %v", err1, err2)
2276			continue
2277		}
2278		if !testEqual(row1, row2) {
2279			t.Fatalf("execution returned different values: %v, %v", row1, row2)
2280			continue
2281		}
2282		if row1 == nil {
2283			continue
2284		}
2285		var a, b string
2286		if err = row1.Columns(&a, &b); err != nil {
2287			t.Fatalf("failed to parse row %v", err)
2288			continue
2289		}
2290		if a == str1 && b == str2 {
2291			gotResult = true
2292		}
2293	}
2294	if !gotResult {
2295		t.Fatalf("execution didn't return expected values")
2296	}
2299// Test PartitionRead of BatchReadOnlyTransaction, similar to TestBatchQuery
2300func TestIntegration_BatchRead(t *testing.T) {
2301	t.Parallel()
2303	// Set up testing environment.
2304	var (
2305		client2 *Client
2306		err     error
2307	)
2308	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2309	defer cancel()
2310	client, dbPath, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, simpleDBStatements)
2311	defer cleanup()
2313	if err = populate(ctx, client); err != nil {
2314		t.Fatal(err)
2315	}
2316	if client2, err = createClient(ctx, dbPath, SessionPoolConfig{}); err != nil {
2317		t.Fatal(err)
2318	}
2319	defer client2.Close()
2321	// PartitionRead
2322	var (
2323		txn        *BatchReadOnlyTransaction
2324		partitions []*Partition
2325	)
2327	if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil {
2328		t.Fatal(err)
2329	}
2330	defer txn.Cleanup(ctx)
2331	if partitions, err = txn.PartitionRead(ctx, "test", AllKeys(), simpleDBTableColumns, PartitionOptions{0, 3}); err != nil {
2332		t.Fatal(err)
2333	}
2335	// Reconstruct BatchReadOnlyTransactionID and execute partitions.
2336	var (
2337		tid2      BatchReadOnlyTransactionID
2338		data      []byte
2339		gotResult bool // if we get matching result from two separate txns
2340	)
2341	if data, err = txn.ID.MarshalBinary(); err != nil {
2342		t.Fatalf("encoding failed %v", err)
2343	}
2344	if err = tid2.UnmarshalBinary(data); err != nil {
2345		t.Fatalf("decoding failed %v", err)
2346	}
2347	txn2 := client2.BatchReadOnlyTransactionFromID(tid2)
2349	// Execute Partitions and compare results.
2350	for i, p := range partitions {
2351		iter := txn.Execute(ctx, p)
2352		defer iter.Stop()
2353		p2 := serdesPartition(t, i, p)
2354		iter2 := txn2.Execute(ctx, &p2)
2355		defer iter2.Stop()
2357		row1, err1 := iter.Next()
2358		row2, err2 := iter2.Next()
2359		if err1 != err2 {
2360			t.Fatalf("execution failed for different reasons: %v, %v", err1, err2)
2361			continue
2362		}
2363		if !testEqual(row1, row2) {
2364			t.Fatalf("execution returned different values: %v, %v", row1, row2)
2365			continue
2366		}
2367		if row1 == nil {
2368			continue
2369		}
2370		var a, b string
2371		if err = row1.Columns(&a, &b); err != nil {
2372			t.Fatalf("failed to parse row %v", err)
2373			continue
2374		}
2375		if a == str1 && b == str2 {
2376			gotResult = true
2377		}
2378	}
2379	if !gotResult {
2380		t.Fatalf("execution didn't return expected values")
2381	}
2384// Test normal txReadEnv method on BatchReadOnlyTransaction.
2385func TestIntegration_BROTNormal(t *testing.T) {
2386	t.Parallel()
2388	// Set up testing environment and create txn.
2389	var (
2390		txn *BatchReadOnlyTransaction
2391		err error
2392		row *Row
2393		i   int64
2394	)
2395	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2396	defer cancel()
2397	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, simpleDBStatements)
2398	defer cleanup()
2400	if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil {
2401		t.Fatal(err)
2402	}
2403	defer txn.Cleanup(ctx)
2404	if _, err := txn.PartitionRead(ctx, "test", AllKeys(), simpleDBTableColumns, PartitionOptions{0, 3}); err != nil {
2405		t.Fatal(err)
2406	}
2407	// Normal query should work with BatchReadOnlyTransaction.
2408	stmt2 := Statement{SQL: "SELECT 1"}
2409	iter := txn.Query(ctx, stmt2)
2410	defer iter.Stop()
2412	row, err = iter.Next()
2413	if err != nil {
2414		t.Errorf("query failed with %v", err)
2415	}
2416	if err = row.Columns(&i); err != nil {
2417		t.Errorf("failed to parse row %v", err)
2418	}
2421func TestIntegration_CommitTimestamp(t *testing.T) {
2422	t.Parallel()
2424	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2425	defer cancel()
2426	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, ctsDBStatements)
2427	defer cleanup()
2429	type testTableRow struct {
2430		Key string
2431		Ts  NullTime
2432	}
2434	var (
2435		cts1, cts2, ts1, ts2 time.Time
2436		err                  error
2437	)
2439	// Apply mutation in sequence, expect to see commit timestamp in good order,
2440	// check also the commit timestamp returned
2441	for _, it := range []struct {
2442		k string
2443		t *time.Time
2444	}{
2445		{"a", &cts1},
2446		{"b", &cts2},
2447	} {
2448		tt := testTableRow{Key: it.k, Ts: NullTime{CommitTimestamp, true}}
2449		m, err := InsertStruct("TestTable", tt)
2450		if err != nil {
2451			t.Fatal(err)
2452		}
2453		*it.t, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce())
2454		if err != nil {
2455			t.Fatal(err)
2456		}
2457	}
2459	txn := client.ReadOnlyTransaction()
2460	for _, it := range []struct {
2461		k string
2462		t *time.Time
2463	}{
2464		{"a", &ts1},
2465		{"b", &ts2},
2466	} {
2467		if r, e := txn.ReadRow(ctx, "TestTable", Key{it.k}, []string{"Ts"}); e != nil {
2468			t.Fatal(err)
2469		} else {
2470			var got testTableRow
2471			if err := r.ToStruct(&got); err != nil {
2472				t.Fatal(err)
2473			}
2474			*it.t = got.Ts.Time
2475		}
2476	}
2477	if !cts1.Equal(ts1) {
2478		t.Errorf("Expect commit timestamp returned and read to match for txn1, got %v and %v.", cts1, ts1)
2479	}
2480	if !cts2.Equal(ts2) {
2481		t.Errorf("Expect commit timestamp returned and read to match for txn2, got %v and %v.", cts2, ts2)
2482	}
2484	// Try writing a timestamp in the future to commit timestamp, expect error.
2485	_, err = client.Apply(ctx, []*Mutation{InsertOrUpdate("TestTable", []string{"Key", "Ts"}, []interface{}{"a", time.Now().Add(time.Hour)})}, ApplyAtLeastOnce())
2486	if msg, ok := matchError(err, codes.FailedPrecondition, "Cannot write timestamps in the future"); !ok {
2487		t.Error(msg)
2488	}
2491func TestIntegration_DML(t *testing.T) {
2492	t.Parallel()
2494	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2495	defer cancel()
2496	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
2497	defer cleanup()
2499	// Function that reads a single row's first name from within a transaction.
2500	readFirstName := func(tx *ReadWriteTransaction, key int) (string, error) {
2501		row, err := tx.ReadRow(ctx, "Singers", Key{key}, []string{"FirstName"})
2502		if err != nil {
2503			return "", err
2504		}
2505		var fn string
2506		if err := row.Column(0, &fn); err != nil {
2507			return "", err
2508		}
2509		return fn, nil
2510	}
2512	// Function that reads multiple rows' first names from outside a read/write
2513	// transaction.
2514	readFirstNames := func(keys ...int) []string {
2515		var ks []KeySet
2516		for _, k := range keys {
2517			ks = append(ks, Key{k})
2518		}
2519		iter := client.Single().Read(ctx, "Singers", KeySets(ks...), []string{"FirstName"})
2520		var got []string
2521		var fn string
2522		err := iter.Do(func(row *Row) error {
2523			if err := row.Column(0, &fn); err != nil {
2524				return err
2525			}
2526			got = append(got, fn)
2527			return nil
2528		})
2529		if err != nil {
2530			t.Fatalf("readFirstNames(%v): %v", keys, err)
2531		}
2532		return got
2533	}
2535	// Use ReadWriteTransaction.Query to execute a DML statement.
2536	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
2537		iter := tx.Query(ctx, Statement{
2538			SQL: `INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (1, "Umm", "Kulthum")`,
2539		})
2540		defer iter.Stop()
2541		row, err := iter.Next()
2542		if ErrCode(err) == codes.Aborted {
2543			return err
2544		}
2545		if err != iterator.Done {
2546			t.Fatalf("got results from iterator, want none: %#v, err = %v\n", row, err)
2547		}
2548		if iter.RowCount != 1 {
2549			t.Errorf("row count: got %d, want 1", iter.RowCount)
2550		}
2551		// The results of the DML statement should be visible to the transaction.
2552		got, err := readFirstName(tx, 1)
2553		if err != nil {
2554			return err
2555		}
2556		if want := "Umm"; got != want {
2557			t.Errorf("got %q, want %q", got, want)
2558		}
2559		return nil
2560	})
2561	if err != nil {
2562		t.Fatal(err)
2563	}
2565	// Use ReadWriteTransaction.Update to execute a DML statement.
2566	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
2567		count, err := tx.Update(ctx, Statement{
2568			SQL: `Insert INTO Singers (SingerId, FirstName, LastName) VALUES (2, "Eduard", "Khil")`,
2569		})
2570		if err != nil {
2571			return err
2572		}
2573		if count != 1 {
2574			t.Errorf("row count: got %d, want 1", count)
2575		}
2576		got, err := readFirstName(tx, 2)
2577		if err != nil {
2578			return err
2579		}
2580		if want := "Eduard"; got != want {
2581			t.Errorf("got %q, want %q", got, want)
2582		}
2583		return nil
2585	})
2586	if err != nil {
2587		t.Fatal(err)
2588	}
2590	// Roll back a DML statement and confirm that it didn't happen.
2591	var fail = errors.New("fail")
2592	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
2593		_, err := tx.Update(ctx, Statement{
2594			SQL: `INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (3, "Audra", "McDonald")`,
2595		})
2596		if err != nil {
2597			return err
2598		}
2599		return fail
2600	})
2601	if err != fail {
2602		t.Fatalf("rolling back: got error %v, want the error 'fail'", err)
2603	}
2604	_, err = client.Single().ReadRow(ctx, "Singers", Key{3}, []string{"FirstName"})
2605	if got, want := ErrCode(err), codes.NotFound; got != want {
2606		t.Errorf("got %s, want %s", got, want)
2607	}
2609	// Run two DML statements in the same transaction.
2610	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
2611		_, err := tx.Update(ctx, Statement{SQL: `UPDATE Singers SET FirstName = "Oum" WHERE SingerId = 1`})
2612		if err != nil {
2613			return err
2614		}
2615		_, err = tx.Update(ctx, Statement{SQL: `UPDATE Singers SET FirstName = "Eddie" WHERE SingerId = 2`})
2616		if err != nil {
2617			return err
2618		}
2619		return nil
2620	})
2621	if err != nil {
2622		t.Fatal(err)
2623	}
2624	got := readFirstNames(1, 2)
2625	want := []string{"Oum", "Eddie"}
2626	if !testEqual(got, want) {
2627		t.Errorf("got %v, want %v", got, want)
2628	}
2630	// Run a DML statement and an ordinary mutation in the same transaction.
2631	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
2632		_, err := tx.Update(ctx, Statement{
2633			SQL: `INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (3, "Audra", "McDonald")`,
2634		})
2635		if err != nil {
2636			return err
2637		}
2638		return tx.BufferWrite([]*Mutation{
2639			Insert("Singers", []string{"SingerId", "FirstName", "LastName"},
2640				[]interface{}{4, "Andy", "Irvine"}),
2641		})
2642	})
2643	if err != nil {
2644		t.Fatal(err)
2645	}
2646	got = readFirstNames(3, 4)
2647	want = []string{"Audra", "Andy"}
2648	if !testEqual(got, want) {
2649		t.Errorf("got %v, want %v", got, want)
2650	}
2652	// Attempt to run a query using update.
2653	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
2654		_, err := tx.Update(ctx, Statement{SQL: `SELECT FirstName from Singers`})
2655		return err
2656	})
2657	if got, want := ErrCode(err), codes.InvalidArgument; got != want {
2658		t.Errorf("got %s, want %s", got, want)
2659	}
2662func TestIntegration_StructParametersBind(t *testing.T) {
2663	t.Parallel()
2665	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2666	defer cancel()
2667	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, nil)
2668	defer cleanup()
2670	type tRow []interface{}
2671	type tRows []struct{ trow tRow }
2673	type allFields struct {
2674		Stringf string
2675		Intf    int
2676		Boolf   bool
2677		Floatf  float64
2678		Bytef   []byte
2679		Timef   time.Time
2680		Datef   civil.Date
2681	}
2682	allColumns := []string{
2683		"Stringf",
2684		"Intf",
2685		"Boolf",
2686		"Floatf",
2687		"Bytef",
2688		"Timef",
2689		"Datef",
2690	}
2691	s1 := allFields{"abc", 300, false, 3.45, []byte("foo"), t1, d1}
2692	s2 := allFields{"def", -300, false, -3.45, []byte("bar"), t2, d2}
2694	dynamicStructType := reflect.StructOf([]reflect.StructField{
2695		{Name: "A", Type: reflect.TypeOf(t1), Tag: `spanner:"ff1"`},
2696	})
2697	s3 := reflect.New(dynamicStructType)
2698	s3.Elem().Field(0).Set(reflect.ValueOf(t1))
2700	for i, test := range []struct {
2701		param interface{}
2702		sql   string
2703		cols  []string
2704		trows tRows
2705	}{
2706		// Struct value.
2707		{
2708			s1,
2709			"SELECT" +
2710				" @p.Stringf," +
2711				" @p.Intf," +
2712				" @p.Boolf," +
2713				" @p.Floatf," +
2714				" @p.Bytef," +
2715				" @p.Timef," +
2716				" @p.Datef",
2717			allColumns,
2718			tRows{
2719				{tRow{"abc", 300, false, 3.45, []byte("foo"), t1, d1}},
2720			},
2721		},
2722		// Array of struct value.
2723		{
2724			[]allFields{s1, s2},
2725			"SELECT * FROM UNNEST(@p)",
2726			allColumns,
2727			tRows{
2728				{tRow{"abc", 300, false, 3.45, []byte("foo"), t1, d1}},
2729				{tRow{"def", -300, false, -3.45, []byte("bar"), t2, d2}},
2730			},
2731		},
2732		// Null struct.
2733		{
2734			(*allFields)(nil),
2735			"SELECT @p IS NULL",
2736			[]string{""},
2737			tRows{
2738				{tRow{true}},
2739			},
2740		},
2741		// Null Array of struct.
2742		{
2743			[]allFields(nil),
2744			"SELECT @p IS NULL",
2745			[]string{""},
2746			tRows{
2747				{tRow{true}},
2748			},
2749		},
2750		// Empty struct.
2751		{
2752			struct{}{},
2753			"SELECT @p IS NULL ",
2754			[]string{""},
2755			tRows{
2756				{tRow{false}},
2757			},
2758		},
2759		// Empty array of struct.
2760		{
2761			[]allFields{},
2762			"SELECT * FROM UNNEST(@p) ",
2763			allColumns,
2764			tRows{},
2765		},
2766		// Struct with duplicate fields.
2767		{
2768			struct {
2769				A int `spanner:"field"`
2770				B int `spanner:"field"`
2771			}{10, 20},
2772			"SELECT * FROM UNNEST([@p]) ",
2773			[]string{"field", "field"},
2774			tRows{
2775				{tRow{10, 20}},
2776			},
2777		},
2778		// Struct with unnamed fields.
2779		{
2780			struct {
2781				A string `spanner:""`
2782			}{"hello"},
2783			"SELECT * FROM UNNEST([@p]) ",
2784			[]string{""},
2785			tRows{
2786				{tRow{"hello"}},
2787			},
2788		},
2789		// Mixed struct.
2790		{
2791			struct {
2792				DynamicStructField interface{}  `spanner:"f1"`
2793				ArrayStructField   []*allFields `spanner:"f2"`
2794			}{
2795				DynamicStructField: s3.Interface(),
2796				ArrayStructField:   []*allFields{nil},
2797			},
2798			"SELECT @p.f1.ff1, ARRAY_LENGTH(@p.f2), @p.f2[OFFSET(0)] IS NULL ",
2799			[]string{"ff1", "", ""},
2800			tRows{
2801				{tRow{t1, 1, true}},
2802			},
2803		},
2804	} {
2805		iter := client.Single().Query(ctx, Statement{
2806			SQL:    test.sql,
2807			Params: map[string]interface{}{"p": test.param},
2808		})
2809		var gotRows []*Row
2810		err := iter.Do(func(r *Row) error {
2811			gotRows = append(gotRows, r)
2812			return nil
2813		})
2814		if err != nil {
2815			t.Errorf("Failed to execute test case %d, error: %v", i, err)
2816		}
2818		var wantRows []*Row
2819		for j, row := range test.trows {
2820			r, err := NewRow(test.cols, row.trow)
2821			if err != nil {
2822				t.Errorf("Invalid row %d in test case %d", j, i)
2823			}
2824			wantRows = append(wantRows, r)
2825		}
2826		if !testEqual(gotRows, wantRows) {
2827			t.Errorf("%d: Want result %v, got result %v", i, wantRows, gotRows)
2828		}
2829	}
2832func TestIntegration_PDML(t *testing.T) {
2833	t.Parallel()
2835	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2836	defer cancel()
2837	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
2838	defer cleanup()
2840	columns := []string{"SingerId", "FirstName", "LastName"}
2842	// Populate the Singers table.
2843	var muts []*Mutation
2844	for _, row := range [][]interface{}{
2845		{1, "Umm", "Kulthum"},
2846		{2, "Eduard", "Khil"},
2847		{3, "Audra", "McDonald"},
2848		{4, "Enrique", "Iglesias"},
2849		{5, "Shakira", "Ripoll"},
2850	} {
2851		muts = append(muts, Insert("Singers", columns, row))
2852	}
2853	if _, err := client.Apply(ctx, muts); err != nil {
2854		t.Fatal(err)
2855	}
2856	// Identifiers in PDML statements must be fully qualified.
2857	// TODO(jba): revisit the above.
2858	count, err := client.PartitionedUpdate(ctx, Statement{
2859		SQL: `UPDATE Singers SET Singers.FirstName = "changed" WHERE Singers.SingerId >= 1 AND Singers.SingerId <= @end`,
2860		Params: map[string]interface{}{
2861			"end": 3,
2862		},
2863	})
2864	if err != nil {
2865		t.Fatal(err)
2866	}
2867	if want := int64(3); count != want {
2868		t.Fatalf("got %d, want %d", count, want)
2869	}
2870	got, err := readAll(client.Single().Read(ctx, "Singers", AllKeys(), columns))
2871	if err != nil {
2872		t.Fatal(err)
2873	}
2874	want := [][]interface{}{
2875		{int64(1), "changed", "Kulthum"},
2876		{int64(2), "changed", "Khil"},
2877		{int64(3), "changed", "McDonald"},
2878		{int64(4), "Enrique", "Iglesias"},
2879		{int64(5), "Shakira", "Ripoll"},
2880	}
2881	if !testEqual(got, want) {
2882		t.Errorf("\ngot %v\nwant%v", got, want)
2883	}
2886func TestIntegration_BatchDML(t *testing.T) {
2887	t.Parallel()
2889	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2890	defer cancel()
2891	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
2892	defer cleanup()
2894	columns := []string{"SingerId", "FirstName", "LastName"}
2896	// Populate the Singers table.
2897	var muts []*Mutation
2898	for _, row := range [][]interface{}{
2899		{1, "Umm", "Kulthum"},
2900		{2, "Eduard", "Khil"},
2901		{3, "Audra", "McDonald"},
2902	} {
2903		muts = append(muts, Insert("Singers", columns, row))
2904	}
2905	if _, err := client.Apply(ctx, muts); err != nil {
2906		t.Fatal(err)
2907	}
2909	var counts []int64
2910	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
2911		counts, err = tx.BatchUpdate(ctx, []Statement{
2912			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`},
2913			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 2" WHERE Singers.SingerId = 2`},
2914			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`},
2915		})
2916		return err
2917	})
2919	if err != nil {
2920		t.Fatal(err)
2921	}
2922	if want := []int64{1, 1, 1}; !testEqual(counts, want) {
2923		t.Fatalf("got %d, want %d", counts, want)
2924	}
2925	got, err := readAll(client.Single().Read(ctx, "Singers", AllKeys(), columns))
2926	if err != nil {
2927		t.Fatal(err)
2928	}
2929	want := [][]interface{}{
2930		{int64(1), "changed 1", "Kulthum"},
2931		{int64(2), "changed 2", "Khil"},
2932		{int64(3), "changed 3", "McDonald"},
2933	}
2934	if !testEqual(got, want) {
2935		t.Errorf("\ngot %v\nwant%v", got, want)
2936	}
2939func TestIntegration_BatchDML_NoStatements(t *testing.T) {
2940	t.Parallel()
2942	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2943	defer cancel()
2944	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
2945	defer cleanup()
2947	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
2948		_, err = tx.BatchUpdate(ctx, []Statement{})
2949		return err
2950	})
2951	if err == nil {
2952		t.Fatal("expected error, got nil")
2953	}
2954	if s, ok := status.FromError(err); ok {
2955		if s.Code() != codes.InvalidArgument {
2956			t.Fatalf("expected InvalidArgument, got %v", err)
2957		}
2958	} else {
2959		t.Fatalf("expected InvalidArgument, got %v", err)
2960	}
2963func TestIntegration_BatchDML_TwoStatements(t *testing.T) {
2964	t.Parallel()
2966	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2967	defer cancel()
2968	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
2969	defer cleanup()
2971	columns := []string{"SingerId", "FirstName", "LastName"}
2973	// Populate the Singers table.
2974	var muts []*Mutation
2975	for _, row := range [][]interface{}{
2976		{1, "Umm", "Kulthum"},
2977		{2, "Eduard", "Khil"},
2978		{3, "Audra", "McDonald"},
2979	} {
2980		muts = append(muts, Insert("Singers", columns, row))
2981	}
2982	if _, err := client.Apply(ctx, muts); err != nil {
2983		t.Fatal(err)
2984	}
2986	var updateCount int64
2987	var batchCounts []int64
2988	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
2989		batchCounts, err = tx.BatchUpdate(ctx, []Statement{
2990			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`},
2991			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 2" WHERE Singers.SingerId = 2`},
2992			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`},
2993		})
2994		if err != nil {
2995			return err
2996		}
2998		updateCount, err = tx.Update(ctx, Statement{SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`})
2999		return err
3000	})
3001	if err != nil {
3002		t.Fatal(err)
3003	}
3004	if want := []int64{1, 1, 1}; !testEqual(batchCounts, want) {
3005		t.Fatalf("got %d, want %d", batchCounts, want)
3006	}
3007	if updateCount != 1 {
3008		t.Fatalf("got %v, want 1", updateCount)
3009	}
3012func TestIntegration_BatchDML_Error(t *testing.T) {
3013	t.Parallel()
3015	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
3016	defer cancel()
3017	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
3018	defer cleanup()
3020	columns := []string{"SingerId", "FirstName", "LastName"}
3022	// Populate the Singers table.
3023	var muts []*Mutation
3024	for _, row := range [][]interface{}{
3025		{1, "Umm", "Kulthum"},
3026		{2, "Eduard", "Khil"},
3027		{3, "Audra", "McDonald"},
3028	} {
3029		muts = append(muts, Insert("Singers", columns, row))
3030	}
3031	if _, err := client.Apply(ctx, muts); err != nil {
3032		t.Fatal(err)
3033	}
3035	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
3036		counts, err := tx.BatchUpdate(ctx, []Statement{
3037			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`},
3038			{SQL: `some illegal statement`},
3039			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`},
3040		})
3041		if err == nil {
3042			t.Fatal("expected err, got nil")
3043		}
3044		// The statement may also have been aborted by Spanner, and in that
3045		// case we should just let the client library retry the transaction.
3046		if status.Code(err) == codes.Aborted {
3047			return err
3048		}
3049		if want := []int64{1}; !testEqual(counts, want) {
3050			t.Fatalf("got %d, want %d", counts, want)
3051		}
3053		got, err := readAll(tx.Read(ctx, "Singers", AllKeys(), columns))
3054		// Aborted error is ok, just retry the transaction.
3055		if status.Code(err) == codes.Aborted {
3056			return err
3057		}
3058		if err != nil {
3059			t.Fatal(err)
3060		}
3061		want := [][]interface{}{
3062			{int64(1), "changed 1", "Kulthum"},
3063			{int64(2), "Eduard", "Khil"},
3064			{int64(3), "Audra", "McDonald"},
3065		}
3066		if !testEqual(got, want) {
3067			t.Errorf("\ngot %v\nwant%v", got, want)
3068		}
3070		return nil
3071	})
3072	if err != nil {
3073		t.Fatal(err)
3074	}
3077func TestIntegration_StartBackupOperation(t *testing.T) {
3078	skipEmulatorTest(t)
3079	t.Parallel()
3081	// Backups can be slow, so use a 30 minute timeout.
3082	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
3083	defer cancel()
3084	_, testDatabaseName, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, backuDBStatements)
3085	defer cleanup()
3087	backupID := backupIDSpace.New()
3088	backupName := fmt.Sprintf("projects/%s/instances/%s/backups/%s", testProjectID, testInstanceID, backupID)
3089	// Minimum expiry time is 6 hours
3090	expires := time.Now().Add(time.Hour * 7)
3091	respLRO, err := databaseAdmin.StartBackupOperation(ctx, backupID, testDatabaseName, expires)
3092	if err != nil {
3093		t.Fatal(err)
3094	}
3096	_, err = respLRO.Wait(ctx)
3097	if err != nil {
3098		t.Fatal(err)
3099	}
3100	respMetadata, err := respLRO.Metadata()
3101	if err != nil {
3102		t.Fatalf("backup response metadata, got error %v, want nil", err)
3103	}
3104	if respMetadata.Database != testDatabaseName {
3105		t.Fatalf("backup database name, got %s, want %s", respMetadata.Database, testDatabaseName)
3106	}
3107	if respMetadata.Progress.ProgressPercent != 100 {
3108		t.Fatalf("backup progress percent, got %d, want 100", respMetadata.Progress.ProgressPercent)
3109	}
3110	respCheck, err := databaseAdmin.GetBackup(ctx, &adminpb.GetBackupRequest{Name: backupName})
3111	if err != nil {
3112		t.Fatalf("backup metadata, got error %v, want nil", err)
3113	}
3114	if respCheck.CreateTime == nil {
3115		t.Fatal("backup create time, got nil, want non-nil")
3116	}
3117	if respCheck.State != adminpb.Backup_READY {
3118		t.Fatalf("backup state, got %v, want %v", respCheck.State, adminpb.Backup_READY)
3119	}
3120	if respCheck.SizeBytes == 0 {
3121		t.Fatalf("backup size, got %d, want non-zero", respCheck.SizeBytes)
3122	}
3125// TestIntegration_DirectPathFallback tests the CFE fallback when the directpath net is blackholed.
3126func TestIntegration_DirectPathFallback(t *testing.T) {
3127	t.Parallel()
3128	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
3129	defer cancel()
3130	// Set up testing environment.
3131	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, readDBStatements)
3132	defer cleanup()
3134	if !dpConfig.attemptDirectPath {
3135		return
3136	}
3137	if len(blackholeDpv6Cmd) == 0 {
3138		t.Fatal("-it.blackhole-dpv6-cmd unset")
3139	}
3140	if len(blackholeDpv4Cmd) == 0 {
3141		t.Fatal("-it.blackhole-dpv4-cmd unset")
3142	}
3143	if len(allowDpv6Cmd) == 0 {
3144		t.Fatal("-it.allowdpv6-cmd unset")
3145	}
3146	if len(allowDpv4Cmd) == 0 {
3147		t.Fatal("-it.allowdpv4-cmd unset")
3148	}
3150	// Precondition: wait for DirectPath to connect.
3151	countEnough := examineTraffic(ctx, client /*blackholeDP = */, false)
3152	if !countEnough {
3153		t.Fatalf("Failed to observe RPCs over DirectPath")
3154	}
3156	// Enable the blackhole, which will prevent communication with grpclb and thus DirectPath.
3157	blackholeOrAllowDirectPath(t /*blackholeDP = */, true)
3158	countEnough = examineTraffic(ctx, client /*blackholeDP = */, true)
3159	if !countEnough {
3160		t.Fatalf("Failed to fallback to CFE after blackhole DirectPath")
3161	}
3163	// Disable the blackhole, and client should use DirectPath again.
3164	blackholeOrAllowDirectPath(t /*blackholeDP = */, false)
3165	countEnough = examineTraffic(ctx, client /*blackholeDP = */, false)
3166	if !countEnough {
3167		t.Fatalf("Failed to fallback to CFE after blackhole DirectPath")
3168	}
3171// Prepare initializes Cloud Spanner testing DB and clients.
3172func prepareIntegrationTest(ctx context.Context, t *testing.T, spc SessionPoolConfig, statements []string) (*Client, string, func()) {
3173	if databaseAdmin == nil {
3174		t.Skip("Integration tests skipped")
3175	}
3176	// Construct a unique test DB name.
3177	dbName := dbNameSpace.New()
3179	dbPath := fmt.Sprintf("projects/%v/instances/%v/databases/%v", testProjectID, testInstanceID, dbName)
3180	// Create database and tables.
3181	op, err := databaseAdmin.CreateDatabaseWithRetry(ctx, &adminpb.CreateDatabaseRequest{
3182		Parent:          fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID),
3183		CreateStatement: "CREATE DATABASE " + dbName,
3184		ExtraStatements: statements,
3185	})
3186	if err != nil {
3187		t.Fatalf("cannot create testing DB %v: %v", dbPath, err)
3188	}
3189	if _, err := op.Wait(ctx); err != nil {
3190		t.Fatalf("cannot create testing DB %v: %v", dbPath, err)
3191	}
3192	client, err := createClient(ctx, dbPath, spc)
3193	if err != nil {
3194		t.Fatalf("cannot create data client on DB %v: %v", dbPath, err)
3195	}
3196	return client, dbPath, func() {
3197		client.Close()
3198	}
3201func cleanupInstances() {
3202	if instanceAdmin == nil {
3203		// Integration tests skipped.
3204		return
3205	}
3207	ctx := context.Background()
3208	parent := fmt.Sprintf("projects/%v", testProjectID)
3209	iter := instanceAdmin.ListInstances(ctx, &instancepb.ListInstancesRequest{
3210		Parent: parent,
3211		Filter: "name:gotest-",
3212	})
3213	expireAge := 24 * time.Hour
3215	for {
3216		inst, err := iter.Next()
3217		if err == iterator.Done {
3218			break
3219		}
3220		if err != nil {
3221			panic(err)
3222		}
3224		_, instID, err := parseInstanceName(inst.Name)
3225		if err != nil {
3226			log.Printf("Error: %v\n", err)
3227			continue
3228		}
3229		if instanceNameSpace.Older(instID, expireAge) {
3230			log.Printf("Deleting instance %s", inst.Name)
3231			deleteInstanceAndBackups(ctx, inst.Name)
3232		}
3233	}
3236func deleteInstanceAndBackups(ctx context.Context, instanceName string) {
3237	// First delete any lingering backups that might have been left on
3238	// the instance.
3239	backups := databaseAdmin.ListBackups(ctx, &adminpb.ListBackupsRequest{Parent: instanceName})
3240	for {
3241		backup, err := backups.Next()
3242		if err == iterator.Done {
3243			break
3244		}
3245		if err != nil {
3246			log.Printf("failed to retrieve backups from instance %s because of error %v", instanceName, err)
3247			break
3248		}
3249		if err := databaseAdmin.DeleteBackup(ctx, &adminpb.DeleteBackupRequest{Name: backup.Name}); err != nil {
3250			log.Printf("failed to delete backup %s (error %v)", backup.Name, err)
3251		}
3252	}
3254	if err := instanceAdmin.DeleteInstance(ctx, &instancepb.DeleteInstanceRequest{Name: instanceName}); err != nil {
3255		log.Printf("failed to delete instance %s (error %v), might need a manual removal",
3256			instanceName, err)
3257	}
3260func rangeReads(ctx context.Context, t *testing.T, client *Client) {
3261	checkRange := func(ks KeySet, wantNums ...int) {
3262		if msg, ok := compareRows(client.Single().Read(ctx, testTable, ks, testTableColumns), wantNums); !ok {
3263			t.Errorf("key set %+v: %s", ks, msg)
3264		}
3265	}
3267	checkRange(Key{"k1"}, 1)
3268	checkRange(KeyRange{Key{"k3"}, Key{"k5"}, ClosedOpen}, 3, 4)
3269	checkRange(KeyRange{Key{"k3"}, Key{"k5"}, ClosedClosed}, 3, 4, 5)
3270	checkRange(KeyRange{Key{"k3"}, Key{"k5"}, OpenClosed}, 4, 5)
3271	checkRange(KeyRange{Key{"k3"}, Key{"k5"}, OpenOpen}, 4)
3273	// Partial key specification.
3274	checkRange(KeyRange{Key{"k7"}, Key{}, ClosedClosed}, 7, 8, 9)
3275	checkRange(KeyRange{Key{"k7"}, Key{}, OpenClosed}, 8, 9)
3276	checkRange(KeyRange{Key{}, Key{"k11"}, ClosedOpen}, 0, 1, 10)
3277	checkRange(KeyRange{Key{}, Key{"k11"}, ClosedClosed}, 0, 1, 10, 11)
3279	// The following produce empty ranges.
3280	// TODO(jba): Consider a multi-part key to illustrate partial key behavior.
3281	// checkRange(KeyRange{Key{"k7"}, Key{}, ClosedOpen})
3282	// checkRange(KeyRange{Key{"k7"}, Key{}, OpenOpen})
3283	// checkRange(KeyRange{Key{}, Key{"k11"}, OpenOpen})
3284	// checkRange(KeyRange{Key{}, Key{"k11"}, OpenClosed})
3286	// Prefix is component-wise, not string prefix.
3287	checkRange(Key{"k1"}.AsPrefix(), 1)
3288	checkRange(KeyRange{Key{"k1"}, Key{"k2"}, ClosedOpen}, 1, 10, 11, 12, 13, 14)
3290	checkRange(AllKeys(), 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14)
3293func indexRangeReads(ctx context.Context, t *testing.T, client *Client) {
3294	checkRange := func(ks KeySet, wantNums ...int) {
3295		if msg, ok := compareRows(client.Single().ReadUsingIndex(ctx, testTable, testTableIndex, ks, testTableColumns),
3296			wantNums); !ok {
3297			t.Errorf("key set %+v: %s", ks, msg)
3298		}
3299	}
3301	checkRange(Key{"v1"}, 1)
3302	checkRange(KeyRange{Key{"v3"}, Key{"v5"}, ClosedOpen}, 3, 4)
3303	checkRange(KeyRange{Key{"v3"}, Key{"v5"}, ClosedClosed}, 3, 4, 5)
3304	checkRange(KeyRange{Key{"v3"}, Key{"v5"}, OpenClosed}, 4, 5)
3305	checkRange(KeyRange{Key{"v3"}, Key{"v5"}, OpenOpen}, 4)
3307	// // Partial key specification.
3308	checkRange(KeyRange{Key{"v7"}, Key{}, ClosedClosed}, 7, 8, 9)
3309	checkRange(KeyRange{Key{"v7"}, Key{}, OpenClosed}, 8, 9)
3310	checkRange(KeyRange{Key{}, Key{"v11"}, ClosedOpen}, 0, 1, 10)
3311	checkRange(KeyRange{Key{}, Key{"v11"}, ClosedClosed}, 0, 1, 10, 11)
3313	// // The following produce empty ranges.
3314	// checkRange(KeyRange{Key{"v7"}, Key{}, ClosedOpen})
3315	// checkRange(KeyRange{Key{"v7"}, Key{}, OpenOpen})
3316	// checkRange(KeyRange{Key{}, Key{"v11"}, OpenOpen})
3317	// checkRange(KeyRange{Key{}, Key{"v11"}, OpenClosed})
3319	// // Prefix is component-wise, not string prefix.
3320	checkRange(Key{"v1"}.AsPrefix(), 1)
3321	checkRange(KeyRange{Key{"v1"}, Key{"v2"}, ClosedOpen}, 1, 10, 11, 12, 13, 14)
3322	checkRange(AllKeys(), 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14)
3324	// Read from an index with DESC ordering.
3325	wantNums := []int{14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0}
3326	if msg, ok := compareRows(client.Single().ReadUsingIndex(ctx, testTable, "TestTableByValueDesc", AllKeys(), testTableColumns),
3327		wantNums); !ok {
3328		t.Errorf("desc: %s", msg)
3329	}
// testTableRow is the struct that rows read from the test table are decoded
// into (see readAllTestTable); its field names match testTableColumns.
type testTableRow struct {
	Key         string
	StringValue string
}
3334func compareRows(iter *RowIterator, wantNums []int) (string, bool) {
3335	rows, err := readAllTestTable(iter)
3336	if err != nil {
3337		return err.Error(), false
3338	}
3339	want := map[string]string{}
3340	for _, n := range wantNums {
3341		want[fmt.Sprintf("k%d", n)] = fmt.Sprintf("v%d", n)
3342	}
3343	got := map[string]string{}
3344	for _, r := range rows {
3345		got[r.Key] = r.StringValue
3346	}
3347	if !testEqual(got, want) {
3348		return fmt.Sprintf("got %v, want %v", got, want), false
3349	}
3350	return "", true
// isNaN reports whether x holds a float64 whose value is NaN. Values of any
// other dynamic type, including float32, yield false.
func isNaN(x interface{}) bool {
	value, isFloat64 := x.(float64)
	return isFloat64 && math.IsNaN(value)
}
3361// createClient creates Cloud Spanner data client.
3362func createClient(ctx context.Context, dbPath string, spc SessionPoolConfig) (client *Client, err error) {
3363	opts := grpcHeaderChecker.CallOptions()
3364	if spannerHost != "" {
3365		opts = append(opts, option.WithEndpoint(spannerHost))
3366	}
3367	if dpConfig.attemptDirectPath {
3368		opts = append(opts, option.WithGRPCDialOption(grpc.WithDefaultCallOptions(grpc.Peer(peerInfo))))
3369	}
3370	client, err = NewClientWithConfig(ctx, dbPath, ClientConfig{SessionPoolConfig: spc}, opts...)
3371	if err != nil {
3372		return nil, fmt.Errorf("cannot create data client on DB %v: %v", dbPath, err)
3373	}
3374	return client, nil
3377// populate prepares the database with some data.
3378func populate(ctx context.Context, client *Client) error {
3379	// Populate data
3380	var err error
3381	m := InsertMap("test", map[string]interface{}{
3382		"a": str1,
3383		"b": str2,
3384	})
3385	_, err = client.Apply(ctx, []*Mutation{m})
3386	return err
3389func matchError(got error, wantCode codes.Code, wantMsgPart string) (string, bool) {
3390	if ErrCode(got) != wantCode || !strings.Contains(strings.ToLower(ErrDesc(got)), strings.ToLower(wantMsgPart)) {
3391		return fmt.Sprintf("got error <%v>\n"+`want <code = %q, "...%s...">`, got, wantCode, wantMsgPart), false
3392	}
3393	return "", true
3396func rowToValues(r *Row) ([]interface{}, error) {
3397	var x int64
3398	var y, z string
3399	if err := r.Column(0, &x); err != nil {
3400		return nil, err
3401	}
3402	if err := r.Column(1, &y); err != nil {
3403		return nil, err
3404	}
3405	if err := r.Column(2, &z); err != nil {
3406		return nil, err
3407	}
3408	return []interface{}{x, y, z}, nil
3411func readAll(iter *RowIterator) ([][]interface{}, error) {
3412	defer iter.Stop()
3413	var vals [][]interface{}
3414	for {
3415		row, err := iter.Next()
3416		if err == iterator.Done {
3417			return vals, nil
3418		}
3419		if err != nil {
3420			return nil, err
3421		}
3422		v, err := rowToValues(row)
3423		if err != nil {
3424			return nil, err
3425		}
3426		vals = append(vals, v)
3427	}
3430func readAllTestTable(iter *RowIterator) ([]testTableRow, error) {
3431	defer iter.Stop()
3432	var vals []testTableRow
3433	for {
3434		row, err := iter.Next()
3435		if err == iterator.Done {
3436			if iter.Metadata == nil {
3437				// All queries should always return metadata, regardless whether
3438				// they return any rows or not.
3439				return nil, errors.New("missing metadata from query")
3440			}
3441			return vals, nil
3442		}
3443		if err != nil {
3444			return nil, err
3445		}
3446		var ttr testTableRow
3447		if err := row.ToStruct(&ttr); err != nil {
3448			return nil, err
3449		}
3450		vals = append(vals, ttr)
3451	}
3454func readAllAccountsTable(iter *RowIterator) ([][]interface{}, error) {
3455	defer iter.Stop()
3456	var vals [][]interface{}
3457	for {
3458		row, err := iter.Next()
3459		if err == iterator.Done {
3460			return vals, nil
3461		}
3462		if err != nil {
3463			return nil, err
3464		}
3465		var id, balance int64
3466		var nickname string
3467		err = row.Columns(&id, &nickname, &balance)
3468		if err != nil {
3469			return nil, err
3470		}
3471		vals = append(vals, []interface{}{id, nickname, balance})
3472	}
// maxDuration returns the larger of the two durations.
func maxDuration(a, b time.Duration) time.Duration {
	if b >= a {
		return b
	}
	return a
}
// isEmulatorEnvSet reports whether the SPANNER_EMULATOR_HOST environment
// variable is set to a non-empty value.
func isEmulatorEnvSet() bool {
	emulatorHost := os.Getenv("SPANNER_EMULATOR_HOST")
	return len(emulatorHost) > 0
}
3486func skipEmulatorTest(t *testing.T) {
3487	if isEmulatorEnvSet() {
3488		t.Skip("Skipping testing against the emulator.")
3489	}
3492func verifyDirectPathRemoteAddress(t *testing.T) {
3493	t.Helper()
3494	if !dpConfig.attemptDirectPath {
3495		return
3496	}
3497	if remoteIP, res := isDirectPathRemoteAddress(); !res {
3498		if dpConfig.directPathIPv4Only {
3499			t.Fatalf("Expect to access DirectPath via ipv4 only, but RPC was destined to %s", remoteIP)
3500		} else {
3501			t.Fatalf("Expect to access DirectPath via ipv4 or ipv6, but RPC was destined to %s", remoteIP)
3502		}
3503	}
3506func isDirectPathRemoteAddress() (_ string, _ bool) {
3507	remoteIP := peerInfo.Addr.String()
3508	// DirectPath ipv4-only can only use ipv4 traffic.
3509	if dpConfig.directPathIPv4Only {
3510		return remoteIP, strings.HasPrefix(remoteIP, directPathIPV4Prefix)
3511	}
3512	// DirectPath ipv6 can use either ipv4 or ipv6 traffic.
3513	return remoteIP, strings.HasPrefix(remoteIP, directPathIPV4Prefix) || strings.HasPrefix(remoteIP, directPathIPV6Prefix)
3516// examineTraffic counts RPCs use DirectPath or CFE traffic.
3517func examineTraffic(ctx context.Context, client *Client, expectDP bool) bool {
3518	var numCount uint64
3519	const (
3520		numRPCsToSend  = 20
3521		minCompleteRPC = 40
3522	)
3523	countEnough := false
3524	start := time.Now()
3525	for !countEnough && time.Since(start) < 2*time.Minute {
3526		for i := 0; i < numRPCsToSend; i++ {
3527			_, _ = readAllTestTable(client.Single().Read(ctx, testTable, Key{"k1"}, testTableColumns))
3528			if _, useDP := isDirectPathRemoteAddress(); useDP != expectDP {
3529				numCount++
3530			}
3531			time.Sleep(100 * time.Millisecond)
3532			countEnough = numCount >= minCompleteRPC
3533		}
3534	}
3535	return countEnough
3538func blackholeOrAllowDirectPath(t *testing.T, blackholeDP bool) {
3539	if dpConfig.directPathIPv4Only {
3540		if blackholeDP {
3541			cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd)
3542			out, _ := cmdRes.CombinedOutput()
3543			t.Logf(string(out))
3544		} else {
3545			cmdRes := exec.Command("bash", "-c", allowDpv4Cmd)
3546			out, _ := cmdRes.CombinedOutput()
3547			t.Logf(string(out))
3548		}
3549		return
3550	}
3551	// DirectPath supports both ipv4 and ipv6
3552	if blackholeDP {
3553		cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd)
3554		out, _ := cmdRes.CombinedOutput()
3555		t.Logf(string(out))
3556		cmdRes = exec.Command("bash", "-c", blackholeDpv6Cmd)
3557		out, _ = cmdRes.CombinedOutput()
3558		t.Logf(string(out))
3559	} else {
3560		cmdRes := exec.Command("bash", "-c", allowDpv4Cmd)
3561		out, _ := cmdRes.CombinedOutput()
3562		t.Logf(string(out))
3563		cmdRes = exec.Command("bash", "-c", allowDpv6Cmd)
3564		out, _ = cmdRes.CombinedOutput()
3565		t.Logf(string(out))
3566	}