1/*
2Copyright 2017 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package spanner
18
19import (
20	"context"
21	"errors"
22	"flag"
23	"fmt"
24	"log"
25	"math"
26	"math/big"
27	"os"
28	"os/exec"
29	"reflect"
30	"regexp"
31	"strings"
32	"sync"
33	"testing"
34	"time"
35
36	"cloud.google.com/go/civil"
37	"cloud.google.com/go/internal/testutil"
38	"cloud.google.com/go/internal/uid"
39	database "cloud.google.com/go/spanner/admin/database/apiv1"
40	instance "cloud.google.com/go/spanner/admin/instance/apiv1"
41	"google.golang.org/api/iterator"
42	"google.golang.org/api/option"
43	adminpb "google.golang.org/genproto/googleapis/spanner/admin/database/v1"
44	instancepb "google.golang.org/genproto/googleapis/spanner/admin/instance/v1"
45	sppb "google.golang.org/genproto/googleapis/spanner/v1"
46	"google.golang.org/grpc"
47	"google.golang.org/grpc/codes"
48	"google.golang.org/grpc/peer"
49	"google.golang.org/grpc/status"
50)
51
// Address prefixes associated with DirectPath.
// NOTE(review): presumably these are matched against the remote peer address
// recorded in peerInfo when -it.attempt-directpath is set; the code doing so
// is outside this part of the file — confirm.
const (
	// directPathIPV6Prefix is the start of a bracketed IPv6 address.
	directPathIPV6Prefix = "[2001:4860:8040"
	// directPathIPV4Prefix is the start of a dotted IPv4 address.
	directPathIPV4Prefix = "34.126"
)
56
// Package-level state shared by the integration tests in this file.
var (
	// testProjectID specifies the project used for testing. It can be changed
	// by setting environment variable GCLOUD_TESTS_GOLANG_PROJECT_ID.
	testProjectID = testutil.ProjID()

	// spannerHost specifies the spanner API host used for testing. It can be
	// changed by setting the environment variable
	// GCLOUD_TESTS_GOLANG_SPANNER_HOST. An empty value means no endpoint
	// override is passed to the admin clients.
	spannerHost = getSpannerHost()

	// ID generators, all prefixed with "gotest". The instance space uses '-'
	// as separator while the database and backup spaces use '_'.
	dbNameSpace       = uid.NewSpace("gotest", &uid.Options{Sep: '_', Short: true})
	instanceNameSpace = uid.NewSpace("gotest", &uid.Options{Sep: '-', Short: true})
	backupIDSpace     = uid.NewSpace("gotest", &uid.Options{Sep: '_', Short: true})
	// testInstanceID is the ID of the instance that initIntegrationTests
	// creates for this test run.
	testInstanceID = instanceNameSpace.New()

	// Names matching the schema declared in readDBStatements.
	testTable        = "TestTable"
	testTableIndex   = "TestTableByValue"
	testTableColumns = []string{"Key", "StringValue"}

	// Admin clients; both are assigned by initIntegrationTests and closed by
	// the cleanup function it returns.
	databaseAdmin *database.DatabaseAdminClient
	instanceAdmin *instance.InstanceAdminClient

	// dpConfig holds the DirectPath flags registered in init.
	dpConfig directPathTestConfig
	// peerInfo is allocated in init and handed to gRPC through grpc.Peer when
	// dpConfig.attemptDirectPath is set.
	peerInfo *peer.Peer

	// singerDBStatements is the DDL for the Singers, Accounts and Types
	// tables plus their secondary indexes.
	singerDBStatements = []string{
		`CREATE TABLE Singers (
				SingerId	INT64 NOT NULL,
				FirstName	STRING(1024),
				LastName	STRING(1024),
				SingerInfo	BYTES(MAX)
			) PRIMARY KEY (SingerId)`,
		`CREATE INDEX SingerByName ON Singers(FirstName, LastName)`,
		`CREATE TABLE Accounts (
				AccountId	INT64 NOT NULL,
				Nickname	STRING(100),
				Balance		INT64 NOT NULL,
			) PRIMARY KEY (AccountId)`,
		`CREATE INDEX AccountByNickname ON Accounts(Nickname) STORING (Balance)`,
		`CREATE TABLE Types (
				RowID		INT64 NOT NULL,
				String		STRING(MAX),
				StringArray	ARRAY<STRING(MAX)>,
				Bytes		BYTES(MAX),
				BytesArray	ARRAY<BYTES(MAX)>,
				Int64a		INT64,
				Int64Array	ARRAY<INT64>,
				Bool		BOOL,
				BoolArray	ARRAY<BOOL>,
				Float64		FLOAT64,
				Float64Array	ARRAY<FLOAT64>,
				Date		DATE,
				DateArray	ARRAY<DATE>,
				Timestamp	TIMESTAMP,
				TimestampArray	ARRAY<TIMESTAMP>,
			) PRIMARY KEY (RowID)`,
	}

	// readDBStatements is the DDL for TestTable with an ascending and a
	// descending index on StringValue.
	readDBStatements = []string{
		`CREATE TABLE TestTable (
                    Key          STRING(MAX) NOT NULL,
                    StringValue  STRING(MAX)
            ) PRIMARY KEY (Key)`,
		`CREATE INDEX TestTableByValue ON TestTable(StringValue)`,
		`CREATE INDEX TestTableByValueDesc ON TestTable(StringValue DESC)`,
	}

	// simpleDBStatements is the DDL for a minimal two-column table named
	// "test"; simpleDBTableColumns lists its columns.
	simpleDBStatements = []string{
		`CREATE TABLE test (
				a	STRING(1024),
				b	STRING(1024),
			) PRIMARY KEY (a)`,
	}
	simpleDBTableColumns = []string{"a", "b"}

	// ctsDBStatements is the DDL for a TestTable whose Ts column allows
	// commit timestamps.
	ctsDBStatements = []string{
		`CREATE TABLE TestTable (
		    Key  STRING(MAX) NOT NULL,
		    Ts   TIMESTAMP OPTIONS (allow_commit_timestamp = true),
	    ) PRIMARY KEY (Key)`,
	}
	// backuDBStatements declares the same tables and indexes as
	// singerDBStatements.
	// NOTE(review): the name looks like a typo for "backupDBStatements"; it
	// is left unchanged here because it may be referenced elsewhere.
	backuDBStatements = []string{
		`CREATE TABLE Singers (
						SingerId	INT64 NOT NULL,
						FirstName	STRING(1024),
						LastName	STRING(1024),
						SingerInfo	BYTES(MAX)
					) PRIMARY KEY (SingerId)`,
		`CREATE INDEX SingerByName ON Singers(FirstName, LastName)`,
		`CREATE TABLE Accounts (
						AccountId	INT64 NOT NULL,
						Nickname	STRING(100),
						Balance		INT64 NOT NULL,
					) PRIMARY KEY (AccountId)`,
		`CREATE INDEX AccountByNickname ON Accounts(Nickname) STORING (Balance)`,
		`CREATE TABLE Types (
						RowID		INT64 NOT NULL,
						String		STRING(MAX),
						StringArray	ARRAY<STRING(MAX)>,
						Bytes		BYTES(MAX),
						BytesArray	ARRAY<BYTES(MAX)>,
						Int64a		INT64,
						Int64Array	ARRAY<INT64>,
						Bool		BOOL,
						BoolArray	ARRAY<BOOL>,
						Float64		FLOAT64,
						Float64Array	ARRAY<FLOAT64>,
						Date		DATE,
						DateArray	ARRAY<DATE>,
						Timestamp	TIMESTAMP,
						TimestampArray	ARRAY<TIMESTAMP>,
					) PRIMARY KEY (RowID)`,
	}

	// validInstancePattern matches "projects/<project>/instances/<instance>"
	// and captures both IDs; it is used by parseInstanceName.
	validInstancePattern = regexp.MustCompile("^projects/(?P<project>[^/]+)/instances/(?P<instance>[^/]+)$")

	// Shell commands supplied through the -it.*-cmd flags registered in init.
	blackholeDpv6Cmd string
	blackholeDpv4Cmd string
	allowDpv6Cmd     string
	allowDpv4Cmd     string
)
177
178func init() {
179	flag.BoolVar(&dpConfig.attemptDirectPath, "it.attempt-directpath", false, "DirectPath integration test flag")
180	flag.BoolVar(&dpConfig.directPathIPv4Only, "it.directpath-ipv4-only", false, "Run DirectPath on a IPv4-only VM")
181	peerInfo = &peer.Peer{}
182	// Use sysctl or iptables to blackhole DirectPath IP for fallback tests.
183	flag.StringVar(&blackholeDpv6Cmd, "it.blackhole-dpv6-cmd", "", "Command to make LB and backend addresses blackholed over dpv6")
184	flag.StringVar(&blackholeDpv4Cmd, "it.blackhole-dpv4-cmd", "", "Command to make LB and backend addresses blackholed over dpv4")
185	flag.StringVar(&allowDpv6Cmd, "it.allow-dpv6-cmd", "", "Command to make LB and backend addresses allowed over dpv6")
186	flag.StringVar(&allowDpv4Cmd, "it.allow-dpv4-cmd", "", "Command to make LB and backend addresses allowed over dpv4")
187}
188
// directPathTestConfig holds the DirectPath-related command-line switches
// registered in init.
type directPathTestConfig struct {
	// attemptDirectPath is set by -it.attempt-directpath. When true,
	// initIntegrationTests adds a grpc.Peer call option that records the
	// remote peer in peerInfo.
	attemptDirectPath  bool
	// directPathIPv4Only is set by -it.directpath-ipv4-only ("Run DirectPath
	// on a IPv4-only VM").
	directPathIPv4Only bool
}
193
194func parseInstanceName(inst string) (project, instance string, err error) {
195	matches := validInstancePattern.FindStringSubmatch(inst)
196	if len(matches) == 0 {
197		return "", "", fmt.Errorf("Failed to parse instance name from %q according to pattern %q",
198			inst, validInstancePattern.String())
199	}
200	return matches[1], matches[2], nil
201}
202
// getSpannerHost reports the Spanner API host configured through the
// GCLOUD_TESTS_GOLANG_SPANNER_HOST environment variable. It returns the empty
// string when the variable is unset.
func getSpannerHost() string {
	const hostEnvVar = "GCLOUD_TESTS_GOLANG_SPANNER_HOST"
	// The presence flag is deliberately ignored: an unset variable and an
	// empty one both yield "".
	host, _ := os.LookupEnv(hostEnvVar)
	return host
}
206
// Sample string constants.
// NOTE(review): no use of these is visible in this part of the file;
// presumably they serve as fixture values in tests further down — confirm.
const (
	str1 = "alice"
	str2 = "a@example.com"
)
211
212func TestMain(m *testing.M) {
213	cleanup := initIntegrationTests()
214	res := m.Run()
215	cleanup()
216	os.Exit(res)
217}
218
// grpcHeaderChecker is the default headers enforcer from testutil;
// initIntegrationTests uses its CallOptions as the base client options for
// the admin clients.
var grpcHeaderChecker = testutil.DefaultHeadersEnforcer()
220
221func initIntegrationTests() (cleanup func()) {
222	ctx := context.Background()
223	flag.Parse() // Needed for testing.Short().
224	noop := func() {}
225
226	if testing.Short() {
227		log.Println("Integration tests skipped in -short mode.")
228		return noop
229	}
230
231	if testProjectID == "" {
232		log.Println("Integration tests skipped: GCLOUD_TESTS_GOLANG_PROJECT_ID is missing")
233		return noop
234	}
235
236	opts := grpcHeaderChecker.CallOptions()
237	if spannerHost != "" {
238		opts = append(opts, option.WithEndpoint(spannerHost))
239	}
240	if dpConfig.attemptDirectPath {
241		opts = append(opts, option.WithGRPCDialOption(grpc.WithDefaultCallOptions(grpc.Peer(peerInfo))))
242	}
243	var err error
244	// Create InstanceAdmin and DatabaseAdmin clients.
245	instanceAdmin, err = instance.NewInstanceAdminClient(ctx, opts...)
246	if err != nil {
247		log.Fatalf("cannot create instance databaseAdmin client: %v", err)
248	}
249	databaseAdmin, err = database.NewDatabaseAdminClient(ctx, opts...)
250	if err != nil {
251		log.Fatalf("cannot create databaseAdmin client: %v", err)
252	}
253	// Get the list of supported instance configs for the project that is used
254	// for the integration tests. The supported instance configs can differ per
255	// project. The integration tests will use the first instance config that
256	// is returned by Cloud Spanner. This will normally be the regional config
257	// that is physically the closest to where the request is coming from.
258	configIterator := instanceAdmin.ListInstanceConfigs(ctx, &instancepb.ListInstanceConfigsRequest{
259		Parent: fmt.Sprintf("projects/%s", testProjectID),
260	})
261	config, err := configIterator.Next()
262	if err != nil {
263		log.Fatalf("Cannot get any instance configurations.\nPlease make sure the Cloud Spanner API is enabled for the test project.\nGet error: %v", err)
264	}
265
266	// First clean up any old test instances before we start the actual testing
267	// as these might cause this test run to fail.
268	cleanupInstances()
269
270	// Create a test instance to use for this test run.
271	op, err := instanceAdmin.CreateInstance(ctx, &instancepb.CreateInstanceRequest{
272		Parent:     fmt.Sprintf("projects/%s", testProjectID),
273		InstanceId: testInstanceID,
274		Instance: &instancepb.Instance{
275			Config:      config.Name,
276			DisplayName: testInstanceID,
277			NodeCount:   1,
278		},
279	})
280	if err != nil {
281		log.Fatalf("could not create instance with id %s: %v", fmt.Sprintf("projects/%s/instances/%s", testProjectID, testInstanceID), err)
282	}
283	// Wait for the instance creation to finish.
284	i, err := op.Wait(ctx)
285	if err != nil {
286		log.Fatalf("waiting for instance creation to finish failed: %v", err)
287	}
288	if i.State != instancepb.Instance_READY {
289		log.Printf("instance state is not READY, it might be that the test instance will cause problems during tests. Got state %v\n", i.State)
290	}
291
292	return func() {
293		// Delete this test instance.
294		instanceName := fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID)
295		deleteInstanceAndBackups(ctx, instanceName)
296		// Delete other test instances that may be lingering around.
297		cleanupInstances()
298		databaseAdmin.Close()
299		instanceAdmin.Close()
300	}
301}
302
303func TestIntegration_InitSessionPool(t *testing.T) {
304	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
305	defer cancel()
306	// Set up an empty testing environment.
307	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, []string{})
308	defer cleanup()
309	sp := client.idleSessions
310	sp.mu.Lock()
311	want := sp.MinOpened
312	sp.mu.Unlock()
313	var numOpened int
314loop:
315	for {
316		select {
317		case <-ctx.Done():
318			t.Fatalf("timed out, got %d session(s), want %d", numOpened, want)
319		default:
320			sp.mu.Lock()
321			numOpened = sp.idleList.Len() + sp.idleWriteList.Len()
322			sp.mu.Unlock()
323			if uint64(numOpened) == want {
324				break loop
325			}
326		}
327	}
328	// Delete all sessions in the pool on the backend and then try to execute a
329	// simple query. The 'Session not found' error should cause an automatic
330	// retry of the read-only transaction.
331	sp.mu.Lock()
332	s := sp.idleList.Front()
333	for {
334		if s == nil {
335			break
336		}
337		// This will delete the session on the backend without removing it
338		// from the pool.
339		s.Value.(*session).delete(context.Background())
340		s = s.Next()
341	}
342	sp.mu.Unlock()
343	sql := "SELECT 1, 'FOO', 'BAR'"
344	tx := client.ReadOnlyTransaction()
345	defer tx.Close()
346	iter := tx.Query(context.Background(), NewStatement(sql))
347	rows, err := readAll(iter)
348	if err != nil {
349		t.Fatalf("Unexpected error for query %q: %v", sql, err)
350	}
351	if got, want := len(rows), 1; got != want {
352		t.Fatalf("Row count mismatch for query %q\nGot: %v\nWant: %v", sql, got, want)
353	}
354	if got, want := len(rows[0]), 3; got != want {
355		t.Fatalf("Column count mismatch for query %q\nGot: %v\nWant: %v", sql, got, want)
356	}
357	if got, want := rows[0][0].(int64), int64(1); got != want {
358		t.Fatalf("Column value mismatch for query %q\nGot: %v\nWant: %v", sql, got, want)
359	}
360}
361
362// Test SingleUse transaction.
363func TestIntegration_SingleUse(t *testing.T) {
364	t.Parallel()
365
366	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
367	defer cancel()
368	// Set up testing environment.
369	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
370	defer cleanup()
371
372	writes := []struct {
373		row []interface{}
374		ts  time.Time
375	}{
376		{row: []interface{}{1, "Marc", "Foo"}},
377		{row: []interface{}{2, "Tars", "Bar"}},
378		{row: []interface{}{3, "Alpha", "Beta"}},
379		{row: []interface{}{4, "Last", "End"}},
380	}
381	// Try to write four rows through the Apply API.
382	for i, w := range writes {
383		var err error
384		m := InsertOrUpdate("Singers",
385			[]string{"SingerId", "FirstName", "LastName"},
386			w.row)
387		if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
388			t.Fatal(err)
389		}
390		verifyDirectPathRemoteAddress(t)
391	}
392	// Calculate time difference between Cloud Spanner server and localhost to
393	// use to determine the exact staleness value to use.
394	timeDiff := maxDuration(time.Now().Sub(writes[0].ts), 0)
395
396	// Test reading rows with different timestamp bounds.
397	for i, test := range []struct {
398		name    string
399		want    [][]interface{}
400		tb      TimestampBound
401		checkTs func(time.Time) error
402	}{
403		{
404			name: "strong",
405			want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}},
406			tb:   StrongRead(),
407			checkTs: func(ts time.Time) error {
408				// writes[3] is the last write, all subsequent strong read
409				// should have a timestamp larger than that.
410				if ts.Before(writes[3].ts) {
411					return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[3].ts)
412				}
413				return nil
414			},
415		},
416		{
417			name: "min_read_timestamp",
418			want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}},
419			tb:   MinReadTimestamp(writes[3].ts),
420			checkTs: func(ts time.Time) error {
421				if ts.Before(writes[3].ts) {
422					return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[3].ts)
423				}
424				return nil
425			},
426		},
427		{
428			name: "max_staleness",
429			want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}},
430			tb:   MaxStaleness(time.Second),
431			checkTs: func(ts time.Time) error {
432				if ts.Before(writes[3].ts) {
433					return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[3].ts)
434				}
435				return nil
436			},
437		},
438		{
439			name: "read_timestamp",
440			want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}},
441			tb:   ReadTimestamp(writes[2].ts),
442			checkTs: func(ts time.Time) error {
443				if ts != writes[2].ts {
444					return fmt.Errorf("read got timestamp %v, want %v", ts, writes[2].ts)
445				}
446				return nil
447			},
448		},
449		{
450			name: "exact_staleness",
451			want: nil,
452			// Specify a staleness which should be already before this test.
453			tb: ExactStaleness(time.Now().Sub(writes[0].ts) + timeDiff + 30*time.Second),
454			checkTs: func(ts time.Time) error {
455				if !ts.Before(writes[0].ts) {
456					return fmt.Errorf("read got timestamp %v, want it to be earlier than %v", ts, writes[0].ts)
457				}
458				return nil
459			},
460		},
461	} {
462		t.Run(test.name, func(t *testing.T) {
463			// SingleUse.Query
464			su := client.Single().WithTimestampBound(test.tb)
465			got, err := readAll(su.Query(
466				ctx,
467				Statement{
468					"SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId IN (@id1, @id3, @id4) ORDER BY SingerId",
469					map[string]interface{}{"id1": int64(1), "id3": int64(3), "id4": int64(4)},
470				}))
471			if err != nil {
472				t.Fatalf("%d: SingleUse.Query returns error %v, want nil", i, err)
473			}
474			verifyDirectPathRemoteAddress(t)
475			rts, err := su.Timestamp()
476			if err != nil {
477				t.Fatalf("%d: SingleUse.Query doesn't return a timestamp, error: %v", i, err)
478			}
479			if err := test.checkTs(rts); err != nil {
480				t.Fatalf("%d: SingleUse.Query doesn't return expected timestamp: %v", i, err)
481			}
482			if !testEqual(got, test.want) {
483				t.Fatalf("%d: got unexpected result from SingleUse.Query: %v, want %v", i, got, test.want)
484			}
485			// SingleUse.Read
486			su = client.Single().WithTimestampBound(test.tb)
487			got, err = readAll(su.Read(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}), []string{"SingerId", "FirstName", "LastName"}))
488			if err != nil {
489				t.Fatalf("%d: SingleUse.Read returns error %v, want nil", i, err)
490			}
491			verifyDirectPathRemoteAddress(t)
492			rts, err = su.Timestamp()
493			if err != nil {
494				t.Fatalf("%d: SingleUse.Read doesn't return a timestamp, error: %v", i, err)
495			}
496			if err := test.checkTs(rts); err != nil {
497				t.Fatalf("%d: SingleUse.Read doesn't return expected timestamp: %v", i, err)
498			}
499			if !testEqual(got, test.want) {
500				t.Fatalf("%d: got unexpected result from SingleUse.Read: %v, want %v", i, got, test.want)
501			}
502			// SingleUse.ReadRow
503			got = nil
504			for _, k := range []Key{{1}, {3}, {4}} {
505				su = client.Single().WithTimestampBound(test.tb)
506				r, err := su.ReadRow(ctx, "Singers", k, []string{"SingerId", "FirstName", "LastName"})
507				if err != nil {
508					continue
509				}
510				verifyDirectPathRemoteAddress(t)
511				v, err := rowToValues(r)
512				if err != nil {
513					continue
514				}
515				got = append(got, v)
516				rts, err = su.Timestamp()
517				if err != nil {
518					t.Fatalf("%d: SingleUse.ReadRow(%v) doesn't return a timestamp, error: %v", i, k, err)
519				}
520				if err := test.checkTs(rts); err != nil {
521					t.Fatalf("%d: SingleUse.ReadRow(%v) doesn't return expected timestamp: %v", i, k, err)
522				}
523			}
524			if !testEqual(got, test.want) {
525				t.Fatalf("%d: got unexpected results from SingleUse.ReadRow: %v, want %v", i, got, test.want)
526			}
527			// SingleUse.ReadUsingIndex
528			su = client.Single().WithTimestampBound(test.tb)
529			got, err = readAll(su.ReadUsingIndex(ctx, "Singers", "SingerByName", KeySets(Key{"Marc", "Foo"}, Key{"Alpha", "Beta"}, Key{"Last", "End"}), []string{"SingerId", "FirstName", "LastName"}))
530			if err != nil {
531				t.Fatalf("%d: SingleUse.ReadUsingIndex returns error %v, want nil", i, err)
532			}
533			verifyDirectPathRemoteAddress(t)
534			// The results from ReadUsingIndex is sorted by the index rather than primary key.
535			if len(got) != len(test.want) {
536				t.Fatalf("%d: got unexpected result from SingleUse.ReadUsingIndex: %v, want %v", i, got, test.want)
537			}
538			for j, g := range got {
539				if j > 0 {
540					prev := got[j-1][1].(string) + got[j-1][2].(string)
541					curr := got[j][1].(string) + got[j][2].(string)
542					if strings.Compare(prev, curr) > 0 {
543						t.Fatalf("%d: SingleUse.ReadUsingIndex fails to order rows by index keys, %v should be after %v", i, got[j-1], got[j])
544					}
545				}
546				found := false
547				for _, w := range test.want {
548					if testEqual(g, w) {
549						found = true
550					}
551				}
552				if !found {
553					t.Fatalf("%d: got unexpected result from SingleUse.ReadUsingIndex: %v, want %v", i, got, test.want)
554					break
555				}
556			}
557			rts, err = su.Timestamp()
558			if err != nil {
559				t.Fatalf("%d: SingleUse.ReadUsingIndex doesn't return a timestamp, error: %v", i, err)
560			}
561			if err := test.checkTs(rts); err != nil {
562				t.Fatalf("%d: SingleUse.ReadUsingIndex doesn't return expected timestamp: %v", i, err)
563			}
564			// SingleUse.ReadRowUsingIndex
565			got = nil
566			for _, k := range []Key{{"Marc", "Foo"}, {"Alpha", "Beta"}, {"Last", "End"}} {
567				su = client.Single().WithTimestampBound(test.tb)
568				r, err := su.ReadRowUsingIndex(ctx, "Singers", "SingerByName", k, []string{"SingerId", "FirstName", "LastName"})
569				if err != nil {
570					continue
571				}
572				verifyDirectPathRemoteAddress(t)
573				v, err := rowToValues(r)
574				if err != nil {
575					continue
576				}
577				got = append(got, v)
578				rts, err = su.Timestamp()
579				if err != nil {
580					t.Fatalf("%d: SingleUse.ReadRowUsingIndex(%v) doesn't return a timestamp, error: %v", i, k, err)
581				}
582				if err := test.checkTs(rts); err != nil {
583					t.Fatalf("%d: SingleUse.ReadRowUsingIndex(%v) doesn't return expected timestamp: %v", i, k, err)
584				}
585			}
586			if !testEqual(got, test.want) {
587				t.Fatalf("%d: got unexpected results from SingleUse.ReadRowUsingIndex: %v, want %v", i, got, test.want)
588			}
589		})
590	}
591}
592
593// Test custom query options provided on query-level configuration.
594func TestIntegration_SingleUse_WithQueryOptions(t *testing.T) {
595	skipEmulatorTest(t)
596	t.Parallel()
597
598	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
599	defer cancel()
600	// Set up testing environment.
601	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
602	defer cleanup()
603
604	writes := []struct {
605		row []interface{}
606		ts  time.Time
607	}{
608		{row: []interface{}{1, "Marc", "Foo"}},
609		{row: []interface{}{2, "Tars", "Bar"}},
610		{row: []interface{}{3, "Alpha", "Beta"}},
611		{row: []interface{}{4, "Last", "End"}},
612	}
613	// Try to write four rows through the Apply API.
614	for i, w := range writes {
615		var err error
616		m := InsertOrUpdate("Singers",
617			[]string{"SingerId", "FirstName", "LastName"},
618			w.row)
619		if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
620			t.Fatal(err)
621		}
622	}
623	qo := QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1"}}
624	got, err := readAll(client.Single().QueryWithOptions(ctx, Statement{
625		"SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId IN (@id1, @id3, @id4)",
626		map[string]interface{}{"id1": int64(1), "id3": int64(3), "id4": int64(4)},
627	}, qo))
628
629	if err != nil {
630		t.Errorf("ReadOnlyTransaction.QueryWithOptions returns error %v, want nil", err)
631	}
632
633	want := [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}}
634	if !testEqual(got, want) {
635		t.Errorf("got unexpected result from ReadOnlyTransaction.QueryWithOptions: %v, want %v", got, want)
636	}
637}
638
639func TestIntegration_SingleUse_ReadingWithLimit(t *testing.T) {
640	t.Parallel()
641
642	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
643	defer cancel()
644	// Set up testing environment.
645	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
646	defer cleanup()
647
648	writes := []struct {
649		row []interface{}
650		ts  time.Time
651	}{
652		{row: []interface{}{1, "Marc", "Foo"}},
653		{row: []interface{}{2, "Tars", "Bar"}},
654		{row: []interface{}{3, "Alpha", "Beta"}},
655		{row: []interface{}{4, "Last", "End"}},
656	}
657	// Try to write four rows through the Apply API.
658	for i, w := range writes {
659		var err error
660		m := InsertOrUpdate("Singers",
661			[]string{"SingerId", "FirstName", "LastName"},
662			w.row)
663		if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
664			t.Fatal(err)
665		}
666	}
667
668	su := client.Single()
669	const limit = 1
670	gotRows, err := readAll(su.ReadWithOptions(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}),
671		[]string{"SingerId", "FirstName", "LastName"}, &ReadOptions{Limit: limit}))
672	if err != nil {
673		t.Errorf("SingleUse.ReadWithOptions returns error %v, want nil", err)
674	}
675	if got, want := len(gotRows), limit; got != want {
676		t.Errorf("got %d, want %d", got, want)
677	}
678}
679
680// Test ReadOnlyTransaction. The testsuite is mostly like SingleUse, except it
681// also tests for a single timestamp across multiple reads.
682func TestIntegration_ReadOnlyTransaction(t *testing.T) {
683	t.Parallel()
684
685	ctxTimeout := 5 * time.Minute
686	ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
687	defer cancel()
688	// Set up testing environment.
689	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
690	defer cleanup()
691
692	writes := []struct {
693		row []interface{}
694		ts  time.Time
695	}{
696		{row: []interface{}{1, "Marc", "Foo"}},
697		{row: []interface{}{2, "Tars", "Bar"}},
698		{row: []interface{}{3, "Alpha", "Beta"}},
699		{row: []interface{}{4, "Last", "End"}},
700	}
701	// Try to write four rows through the Apply API.
702	for i, w := range writes {
703		var err error
704		m := InsertOrUpdate("Singers",
705			[]string{"SingerId", "FirstName", "LastName"},
706			w.row)
707		if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
708			t.Fatal(err)
709		}
710	}
711
712	// For testing timestamp bound staleness.
713	<-time.After(time.Second)
714
715	// Test reading rows with different timestamp bounds.
716	for i, test := range []struct {
717		want    [][]interface{}
718		tb      TimestampBound
719		checkTs func(time.Time) error
720	}{
721		// Note: min_read_timestamp and max_staleness are not supported by
722		// ReadOnlyTransaction. See API document for more details.
723		{
724			// strong
725			[][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}},
726			StrongRead(),
727			func(ts time.Time) error {
728				if ts.Before(writes[3].ts) {
729					return fmt.Errorf("read got timestamp %v, want it to be no later than %v", ts, writes[3].ts)
730				}
731				return nil
732			},
733		},
734		{
735			// read_timestamp
736			[][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}},
737			ReadTimestamp(writes[2].ts),
738			func(ts time.Time) error {
739				if ts != writes[2].ts {
740					return fmt.Errorf("read got timestamp %v, expect %v", ts, writes[2].ts)
741				}
742				return nil
743			},
744		},
745		{
746			// exact_staleness
747			nil,
748			// Specify a staleness which should be already before this test.
749			ExactStaleness(ctxTimeout + 1),
750			func(ts time.Time) error {
751				if ts.After(writes[0].ts) {
752					return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[0].ts)
753				}
754				return nil
755			},
756		},
757	} {
758		// ReadOnlyTransaction.Query
759		ro := client.ReadOnlyTransaction().WithTimestampBound(test.tb)
760		got, err := readAll(ro.Query(
761			ctx,
762			Statement{
763				"SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId IN (@id1, @id3, @id4) ORDER BY SingerId",
764				map[string]interface{}{"id1": int64(1), "id3": int64(3), "id4": int64(4)},
765			}))
766		if err != nil {
767			t.Errorf("%d: ReadOnlyTransaction.Query returns error %v, want nil", i, err)
768		}
769		if !testEqual(got, test.want) {
770			t.Errorf("%d: got unexpected result from ReadOnlyTransaction.Query: %v, want %v", i, got, test.want)
771		}
772		rts, err := ro.Timestamp()
773		if err != nil {
774			t.Errorf("%d: ReadOnlyTransaction.Query doesn't return a timestamp, error: %v", i, err)
775		}
776		if err := test.checkTs(rts); err != nil {
777			t.Errorf("%d: ReadOnlyTransaction.Query doesn't return expected timestamp: %v", i, err)
778		}
779		roTs := rts
780		// ReadOnlyTransaction.Read
781		got, err = readAll(ro.Read(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}), []string{"SingerId", "FirstName", "LastName"}))
782		if err != nil {
783			t.Errorf("%d: ReadOnlyTransaction.Read returns error %v, want nil", i, err)
784		}
785		if !testEqual(got, test.want) {
786			t.Errorf("%d: got unexpected result from ReadOnlyTransaction.Read: %v, want %v", i, got, test.want)
787		}
788		rts, err = ro.Timestamp()
789		if err != nil {
790			t.Errorf("%d: ReadOnlyTransaction.Read doesn't return a timestamp, error: %v", i, err)
791		}
792		if err := test.checkTs(rts); err != nil {
793			t.Errorf("%d: ReadOnlyTransaction.Read doesn't return expected timestamp: %v", i, err)
794		}
795		if roTs != rts {
796			t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts)
797		}
798		// ReadOnlyTransaction.ReadRow
799		got = nil
800		for _, k := range []Key{{1}, {3}, {4}} {
801			r, err := ro.ReadRow(ctx, "Singers", k, []string{"SingerId", "FirstName", "LastName"})
802			if err != nil {
803				continue
804			}
805			v, err := rowToValues(r)
806			if err != nil {
807				continue
808			}
809			got = append(got, v)
810			rts, err = ro.Timestamp()
811			if err != nil {
812				t.Errorf("%d: ReadOnlyTransaction.ReadRow(%v) doesn't return a timestamp, error: %v", i, k, err)
813			}
814			if err := test.checkTs(rts); err != nil {
815				t.Errorf("%d: ReadOnlyTransaction.ReadRow(%v) doesn't return expected timestamp: %v", i, k, err)
816			}
817			if roTs != rts {
818				t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts)
819			}
820		}
821		if !testEqual(got, test.want) {
822			t.Errorf("%d: got unexpected results from ReadOnlyTransaction.ReadRow: %v, want %v", i, got, test.want)
823		}
824		// SingleUse.ReadUsingIndex
825		got, err = readAll(ro.ReadUsingIndex(ctx, "Singers", "SingerByName", KeySets(Key{"Marc", "Foo"}, Key{"Alpha", "Beta"}, Key{"Last", "End"}), []string{"SingerId", "FirstName", "LastName"}))
826		if err != nil {
827			t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex returns error %v, want nil", i, err)
828		}
829		// The results from ReadUsingIndex is sorted by the index rather than
830		// primary key.
831		if len(got) != len(test.want) {
832			t.Errorf("%d: got unexpected result from ReadOnlyTransaction.ReadUsingIndex: %v, want %v", i, got, test.want)
833		}
834		for j, g := range got {
835			if j > 0 {
836				prev := got[j-1][1].(string) + got[j-1][2].(string)
837				curr := got[j][1].(string) + got[j][2].(string)
838				if strings.Compare(prev, curr) > 0 {
839					t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex fails to order rows by index keys, %v should be after %v", i, got[j-1], got[j])
840				}
841			}
842			found := false
843			for _, w := range test.want {
844				if testEqual(g, w) {
845					found = true
846				}
847			}
848			if !found {
849				t.Errorf("%d: got unexpected result from ReadOnlyTransaction.ReadUsingIndex: %v, want %v", i, got, test.want)
850				break
851			}
852		}
853		rts, err = ro.Timestamp()
854		if err != nil {
855			t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex doesn't return a timestamp, error: %v", i, err)
856		}
857		if err := test.checkTs(rts); err != nil {
858			t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex doesn't return expected timestamp: %v", i, err)
859		}
860		if roTs != rts {
861			t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts)
862		}
863		// ReadOnlyTransaction.ReadRowUsingIndex
864		got = nil
865		for _, k := range []Key{{"Marc", "Foo"}, {"Alpha", "Beta"}, {"Last", "End"}} {
866			r, err := ro.ReadRowUsingIndex(ctx, "Singers", "SingerByName", k, []string{"SingerId", "FirstName", "LastName"})
867			if err != nil {
868				continue
869			}
870			v, err := rowToValues(r)
871			if err != nil {
872				continue
873			}
874			got = append(got, v)
875			rts, err = ro.Timestamp()
876			if err != nil {
877				t.Errorf("%d: ReadOnlyTransaction.ReadRowUsingIndex(%v) doesn't return a timestamp, error: %v", i, k, err)
878			}
879			if err := test.checkTs(rts); err != nil {
880				t.Errorf("%d: ReadOnlyTransaction.ReadRowUsingIndex(%v) doesn't return expected timestamp: %v", i, k, err)
881			}
882			if roTs != rts {
883				t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts)
884			}
885		}
886		if !testEqual(got, test.want) {
887			t.Errorf("%d: got unexpected results from ReadOnlyTransaction.ReadRowUsingIndex: %v, want %v", i, got, test.want)
888		}
889		ro.Close()
890	}
891}
892
893// Test ReadOnlyTransaction with different timestamp bound when there's an
894// update at the same time.
895func TestIntegration_UpdateDuringRead(t *testing.T) {
896	t.Parallel()
897
898	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
899	defer cancel()
900	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
901	defer cleanup()
902
903	for i, tb := range []TimestampBound{
904		StrongRead(),
905		ReadTimestamp(time.Now().Add(-time.Minute * 30)), // version GC is 1 hour
906		ExactStaleness(time.Minute * 30),
907	} {
908		ro := client.ReadOnlyTransaction().WithTimestampBound(tb)
909		_, err := ro.ReadRow(ctx, "Singers", Key{i}, []string{"SingerId"})
910		if ErrCode(err) != codes.NotFound {
911			t.Errorf("%d: ReadOnlyTransaction.ReadRow before write returns error: %v, want NotFound", i, err)
912		}
913
914		m := InsertOrUpdate("Singers", []string{"SingerId"}, []interface{}{i})
915		if _, err := client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
916			t.Fatal(err)
917		}
918
919		_, err = ro.ReadRow(ctx, "Singers", Key{i}, []string{"SingerId"})
920		if ErrCode(err) != codes.NotFound {
921			t.Errorf("%d: ReadOnlyTransaction.ReadRow after write returns error: %v, want NotFound", i, err)
922		}
923	}
924}
925
926// Test ReadWriteTransaction.
927func TestIntegration_ReadWriteTransaction(t *testing.T) {
928	t.Parallel()
929
930	// Give a longer deadline because of transaction backoffs.
931	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
932	defer cancel()
933	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
934	defer cleanup()
935
936	// Set up two accounts
937	accounts := []*Mutation{
938		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
939		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
940	}
941	if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil {
942		t.Fatal(err)
943	}
944	wg := sync.WaitGroup{}
945
946	readBalance := func(iter *RowIterator) (int64, error) {
947		defer iter.Stop()
948		var bal int64
949		for {
950			row, err := iter.Next()
951			if err == iterator.Done {
952				return bal, nil
953			}
954			if err != nil {
955				return 0, err
956			}
957			if err := row.Column(0, &bal); err != nil {
958				return 0, err
959			}
960		}
961	}
962
963	for i := 0; i < 20; i++ {
964		wg.Add(1)
965		go func(iter int) {
966			defer wg.Done()
967			_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
968				// Query Foo's balance and Bar's balance.
969				bf, e := readBalance(tx.Query(ctx,
970					Statement{"SELECT Balance FROM Accounts WHERE AccountId = @id", map[string]interface{}{"id": int64(1)}}))
971				if e != nil {
972					return e
973				}
974				verifyDirectPathRemoteAddress(t)
975				bb, e := readBalance(tx.Read(ctx, "Accounts", KeySets(Key{int64(2)}), []string{"Balance"}))
976				if e != nil {
977					return e
978				}
979				verifyDirectPathRemoteAddress(t)
980				if bf <= 0 {
981					return nil
982				}
983				bf--
984				bb++
985				return tx.BufferWrite([]*Mutation{
986					Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), bf}),
987					Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), bb}),
988				})
989			})
990			if err != nil {
991				t.Errorf("%d: failed to execute transaction: %v", iter, err)
992			}
993			verifyDirectPathRemoteAddress(t)
994		}(i)
995	}
996	// Because of context timeout, all goroutines will eventually return.
997	wg.Wait()
998	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
999		var bf, bb int64
1000		r, e := tx.ReadRow(ctx, "Accounts", Key{int64(1)}, []string{"Balance"})
1001		if e != nil {
1002			return e
1003		}
1004		verifyDirectPathRemoteAddress(t)
1005		if ce := r.Column(0, &bf); ce != nil {
1006			return ce
1007		}
1008		bb, e = readBalance(tx.ReadUsingIndex(ctx, "Accounts", "AccountByNickname", KeySets(Key{"Bar"}), []string{"Balance"}))
1009		if e != nil {
1010			return e
1011		}
1012		verifyDirectPathRemoteAddress(t)
1013		if bf != 30 || bb != 21 {
1014			t.Errorf("Foo's balance is now %v and Bar's balance is now %v, want %v and %v", bf, bb, 30, 21)
1015		}
1016		return nil
1017	})
1018	if err != nil {
1019		t.Errorf("failed to check balances: %v", err)
1020	}
1021	verifyDirectPathRemoteAddress(t)
1022}
1023
1024// Test ReadWriteTransactionWithOptions.
1025func TestIntegration_ReadWriteTransactionWithOptions(t *testing.T) {
1026	t.Parallel()
1027	skipEmulatorTest(t)
1028
1029	// Give a longer deadline because of transaction backoffs.
1030	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1031	defer cancel()
1032	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
1033	defer cleanup()
1034
1035	// Set up two accounts
1036	accounts := []*Mutation{
1037		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
1038		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
1039	}
1040	if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil {
1041		t.Fatal(err)
1042	}
1043
1044	readBalance := func(iter *RowIterator) (int64, error) {
1045		defer iter.Stop()
1046		var bal int64
1047		for {
1048			row, err := iter.Next()
1049			if err == iterator.Done {
1050				return bal, nil
1051			}
1052			if err != nil {
1053				return 0, err
1054			}
1055			if err := row.Column(0, &bal); err != nil {
1056				return 0, err
1057			}
1058		}
1059	}
1060
1061	txOpts := TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true}}
1062	resp, err := client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
1063		// Query Foo's balance and Bar's balance.
1064		bf, e := readBalance(tx.Query(ctx,
1065			Statement{"SELECT Balance FROM Accounts WHERE AccountId = @id", map[string]interface{}{"id": int64(1)}}))
1066		if e != nil {
1067			return e
1068		}
1069		bb, e := readBalance(tx.Read(ctx, "Accounts", KeySets(Key{int64(2)}), []string{"Balance"}))
1070		if e != nil {
1071			return e
1072		}
1073		if bf <= 0 {
1074			return nil
1075		}
1076		bf--
1077		bb++
1078		return tx.BufferWrite([]*Mutation{
1079			Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), bf}),
1080			Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), bb}),
1081		})
1082	}, txOpts)
1083	if err != nil {
1084		t.Fatalf("Failed to execute transaction: %v", err)
1085	}
1086	if resp.CommitStats == nil {
1087		t.Fatal("Missing commit stats in commit response")
1088	}
1089	if got, want := resp.CommitStats.MutationCount, int64(8); got != want {
1090		t.Errorf("Mismatch mutation count - got: %v, want: %v", got, want)
1091	}
1092}
1093
1094func TestIntegration_ReadWriteTransaction_StatementBased(t *testing.T) {
1095	t.Parallel()
1096	skipEmulatorTest(t)
1097
1098	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1099	defer cancel()
1100	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
1101	defer cleanup()
1102
1103	// Set up two accounts
1104	accounts := []*Mutation{
1105		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
1106		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
1107	}
1108	if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil {
1109		t.Fatal(err)
1110	}
1111
1112	getBalance := func(txn *ReadWriteStmtBasedTransaction, key Key) (int64, error) {
1113		row, err := txn.ReadRow(ctx, "Accounts", key, []string{"Balance"})
1114		if err != nil {
1115			return 0, err
1116		}
1117		var balance int64
1118		if err := row.Column(0, &balance); err != nil {
1119			return 0, err
1120		}
1121		return balance, nil
1122	}
1123
1124	statements := func(txn *ReadWriteStmtBasedTransaction) error {
1125		outBalance, err := getBalance(txn, Key{1})
1126		if err != nil {
1127			return err
1128		}
1129		const transferAmt = 20
1130		if outBalance >= transferAmt {
1131			inBalance, err := getBalance(txn, Key{2})
1132			if err != nil {
1133				return err
1134			}
1135			inBalance += transferAmt
1136			outBalance -= transferAmt
1137			cols := []string{"AccountId", "Balance"}
1138			txn.BufferWrite([]*Mutation{
1139				Update("Accounts", cols, []interface{}{1, outBalance}),
1140				Update("Accounts", cols, []interface{}{2, inBalance}),
1141			})
1142		}
1143		return nil
1144	}
1145
1146	for {
1147		tx, err := NewReadWriteStmtBasedTransaction(ctx, client)
1148		if err != nil {
1149			t.Fatalf("failed to begin a transaction: %v", err)
1150		}
1151		err = statements(tx)
1152		if err != nil && status.Code(err) != codes.Aborted {
1153			tx.Rollback(ctx)
1154			t.Fatalf("failed to execute statements: %v", err)
1155		} else if err == nil {
1156			_, err = tx.Commit(ctx)
1157			if err == nil {
1158				break
1159			} else if status.Code(err) != codes.Aborted {
1160				t.Fatalf("failed to commit a transaction: %v", err)
1161			}
1162		}
1163		// Set a default sleep time if the server delay is absent.
1164		delay := 10 * time.Millisecond
1165		if serverDelay, hasServerDelay := ExtractRetryDelay(err); hasServerDelay {
1166			delay = serverDelay
1167		}
1168		time.Sleep(delay)
1169	}
1170
1171	// Query the updated values.
1172	stmt := Statement{SQL: `SELECT AccountId, Nickname, Balance FROM Accounts ORDER BY AccountId`}
1173	iter := client.Single().Query(ctx, stmt)
1174	got, err := readAllAccountsTable(iter)
1175	if err != nil {
1176		t.Fatalf("failed to read data: %v", err)
1177	}
1178	want := [][]interface{}{
1179		{int64(1), "Foo", int64(30)},
1180		{int64(2), "Bar", int64(21)},
1181	}
1182	if !testEqual(got, want) {
1183		t.Errorf("\ngot %v\nwant%v", got, want)
1184	}
1185}
1186
1187func TestIntegration_ReadWriteTransaction_StatementBasedWithOptions(t *testing.T) {
1188	t.Parallel()
1189	skipEmulatorTest(t)
1190
1191	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1192	defer cancel()
1193	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
1194	defer cleanup()
1195
1196	// Set up two accounts
1197	accounts := []*Mutation{
1198		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
1199		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
1200	}
1201	if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil {
1202		t.Fatal(err)
1203	}
1204
1205	getBalance := func(txn *ReadWriteStmtBasedTransaction, key Key) (int64, error) {
1206		row, err := txn.ReadRow(ctx, "Accounts", key, []string{"Balance"})
1207		if err != nil {
1208			return 0, err
1209		}
1210		var balance int64
1211		if err := row.Column(0, &balance); err != nil {
1212			return 0, err
1213		}
1214		return balance, nil
1215	}
1216
1217	statements := func(txn *ReadWriteStmtBasedTransaction) error {
1218		outBalance, err := getBalance(txn, Key{1})
1219		if err != nil {
1220			return err
1221		}
1222		const transferAmt = 20
1223		if outBalance >= transferAmt {
1224			inBalance, err := getBalance(txn, Key{2})
1225			if err != nil {
1226				return err
1227			}
1228			inBalance += transferAmt
1229			outBalance -= transferAmt
1230			cols := []string{"AccountId", "Balance"}
1231			txn.BufferWrite([]*Mutation{
1232				Update("Accounts", cols, []interface{}{1, outBalance}),
1233				Update("Accounts", cols, []interface{}{2, inBalance}),
1234			})
1235		}
1236		return nil
1237	}
1238
1239	var resp CommitResponse
1240	txOpts := TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true}}
1241	for {
1242		tx, err := NewReadWriteStmtBasedTransactionWithOptions(ctx, client, txOpts)
1243		if err != nil {
1244			t.Fatalf("failed to begin a transaction: %v", err)
1245		}
1246		err = statements(tx)
1247		if err != nil && status.Code(err) != codes.Aborted {
1248			tx.Rollback(ctx)
1249			t.Fatalf("failed to execute statements: %v", err)
1250		} else if err == nil {
1251			resp, err = tx.CommitWithReturnResp(ctx)
1252			if err == nil {
1253				break
1254			} else if status.Code(err) != codes.Aborted {
1255				t.Fatalf("failed to commit a transaction: %v", err)
1256			}
1257		}
1258		// Set a default sleep time if the server delay is absent.
1259		delay := 10 * time.Millisecond
1260		if serverDelay, hasServerDelay := ExtractRetryDelay(err); hasServerDelay {
1261			delay = serverDelay
1262		}
1263		time.Sleep(delay)
1264	}
1265	if resp.CommitStats == nil {
1266		t.Fatal("Missing commit stats in commit response")
1267	}
1268	if got, want := resp.CommitStats.MutationCount, int64(8); got != want {
1269		t.Errorf("Mismatch mutation count - got: %v, want: %v", got, want)
1270	}
1271}
1272
1273func TestIntegration_Reads(t *testing.T) {
1274	t.Parallel()
1275
1276	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1277	defer cancel()
1278	// Set up testing environment.
1279	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, readDBStatements)
1280	defer cleanup()
1281
1282	// Includes k0..k14. Strings sort lexically, eg "k1" < "k10" < "k2".
1283	var ms []*Mutation
1284	for i := 0; i < 15; i++ {
1285		ms = append(ms, InsertOrUpdate(testTable,
1286			testTableColumns,
1287			[]interface{}{fmt.Sprintf("k%d", i), fmt.Sprintf("v%d", i)}))
1288	}
1289	// Don't use ApplyAtLeastOnce, so we can test the other code path.
1290	if _, err := client.Apply(ctx, ms); err != nil {
1291		t.Fatal(err)
1292	}
1293	verifyDirectPathRemoteAddress(t)
1294
1295	// Empty read.
1296	rows, err := readAllTestTable(client.Single().Read(ctx, testTable,
1297		KeyRange{Start: Key{"k99"}, End: Key{"z"}}, testTableColumns))
1298	if err != nil {
1299		t.Fatal(err)
1300	}
1301	verifyDirectPathRemoteAddress(t)
1302	if got, want := len(rows), 0; got != want {
1303		t.Errorf("got %d, want %d", got, want)
1304	}
1305
1306	// Index empty read.
1307	rows, err = readAllTestTable(client.Single().ReadUsingIndex(ctx, testTable, testTableIndex,
1308		KeyRange{Start: Key{"v99"}, End: Key{"z"}}, testTableColumns))
1309	if err != nil {
1310		t.Fatal(err)
1311	}
1312	verifyDirectPathRemoteAddress(t)
1313	if got, want := len(rows), 0; got != want {
1314		t.Errorf("got %d, want %d", got, want)
1315	}
1316
1317	// Point read.
1318	row, err := client.Single().ReadRow(ctx, testTable, Key{"k1"}, testTableColumns)
1319	if err != nil {
1320		t.Fatal(err)
1321	}
1322	verifyDirectPathRemoteAddress(t)
1323	var got testTableRow
1324	if err := row.ToStruct(&got); err != nil {
1325		t.Fatal(err)
1326	}
1327	if want := (testTableRow{"k1", "v1"}); got != want {
1328		t.Errorf("got %v, want %v", got, want)
1329	}
1330
1331	// Point read not found.
1332	_, err = client.Single().ReadRow(ctx, testTable, Key{"k999"}, testTableColumns)
1333	if ErrCode(err) != codes.NotFound {
1334		t.Fatalf("got %v, want NotFound", err)
1335	}
1336	verifyDirectPathRemoteAddress(t)
1337
1338	// Index point read.
1339	rowIndex, err := client.Single().ReadRowUsingIndex(ctx, testTable, testTableIndex, Key{"v1"}, testTableColumns)
1340	if err != nil {
1341		t.Fatal(err)
1342	}
1343	verifyDirectPathRemoteAddress(t)
1344	var gotIndex testTableRow
1345	if err := rowIndex.ToStruct(&gotIndex); err != nil {
1346		t.Fatal(err)
1347	}
1348	if wantIndex := (testTableRow{"k1", "v1"}); gotIndex != wantIndex {
1349		t.Errorf("got %v, want %v", gotIndex, wantIndex)
1350	}
1351	// Index point read not found.
1352	_, err = client.Single().ReadRowUsingIndex(ctx, testTable, testTableIndex, Key{"v999"}, testTableColumns)
1353	if ErrCode(err) != codes.NotFound {
1354		t.Fatalf("got %v, want NotFound", err)
1355	}
1356	verifyDirectPathRemoteAddress(t)
1357	rangeReads(ctx, t, client)
1358	indexRangeReads(ctx, t, client)
1359}
1360
1361func TestIntegration_EarlyTimestamp(t *testing.T) {
1362	t.Parallel()
1363
1364	// Test that we can get the timestamp from a read-only transaction as
1365	// soon as we have read at least one row.
1366	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1367	defer cancel()
1368	// Set up testing environment.
1369	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, readDBStatements)
1370	defer cleanup()
1371
1372	var ms []*Mutation
1373	for i := 0; i < 3; i++ {
1374		ms = append(ms, InsertOrUpdate(testTable,
1375			testTableColumns,
1376			[]interface{}{fmt.Sprintf("k%d", i), fmt.Sprintf("v%d", i)}))
1377	}
1378	if _, err := client.Apply(ctx, ms, ApplyAtLeastOnce()); err != nil {
1379		t.Fatal(err)
1380	}
1381
1382	txn := client.Single()
1383	iter := txn.Read(ctx, testTable, AllKeys(), testTableColumns)
1384	defer iter.Stop()
1385	// In single-use transaction, we should get an error before reading anything.
1386	if _, err := txn.Timestamp(); err == nil {
1387		t.Error("wanted error, got nil")
1388	}
1389	// After reading one row, the timestamp should be available.
1390	_, err := iter.Next()
1391	if err != nil {
1392		t.Fatal(err)
1393	}
1394	if _, err := txn.Timestamp(); err != nil {
1395		t.Errorf("got %v, want nil", err)
1396	}
1397
1398	txn = client.ReadOnlyTransaction()
1399	defer txn.Close()
1400	iter = txn.Read(ctx, testTable, AllKeys(), testTableColumns)
1401	defer iter.Stop()
1402	// In an ordinary read-only transaction, the timestamp should be
1403	// available immediately.
1404	if _, err := txn.Timestamp(); err != nil {
1405		t.Errorf("got %v, want nil", err)
1406	}
1407}
1408
1409func TestIntegration_NestedTransaction(t *testing.T) {
1410	t.Parallel()
1411
1412	// You cannot use a transaction from inside a read-write transaction.
1413	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1414	defer cancel()
1415	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
1416	defer cleanup()
1417
1418	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
1419		_, err := client.ReadWriteTransaction(ctx,
1420			func(context.Context, *ReadWriteTransaction) error { return nil })
1421		if ErrCode(err) != codes.FailedPrecondition {
1422			t.Fatalf("got %v, want FailedPrecondition", err)
1423		}
1424		_, err = client.Single().ReadRow(ctx, "Singers", Key{1}, []string{"SingerId"})
1425		if ErrCode(err) != codes.FailedPrecondition {
1426			t.Fatalf("got %v, want FailedPrecondition", err)
1427		}
1428		rot := client.ReadOnlyTransaction()
1429		defer rot.Close()
1430		_, err = rot.ReadRow(ctx, "Singers", Key{1}, []string{"SingerId"})
1431		if ErrCode(err) != codes.FailedPrecondition {
1432			t.Fatalf("got %v, want FailedPrecondition", err)
1433		}
1434		return nil
1435	})
1436	if err != nil {
1437		t.Fatal(err)
1438	}
1439}
1440
1441func TestIntegration_CreateDBRetry(t *testing.T) {
1442	t.Parallel()
1443
1444	if databaseAdmin == nil {
1445		t.Skip("Integration tests skipped")
1446	}
1447	skipEmulatorTest(t)
1448
1449	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1450	defer cancel()
1451	dbName := dbNameSpace.New()
1452
1453	// Simulate an Unavailable error on the CreateDatabase RPC.
1454	hasReturnedUnavailable := false
1455	interceptor := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
1456		err := invoker(ctx, method, req, reply, cc, opts...)
1457		if !hasReturnedUnavailable && err == nil {
1458			hasReturnedUnavailable = true
1459			return status.Error(codes.Unavailable, "Injected unavailable error")
1460		}
1461		return err
1462	}
1463
1464	dbAdmin, err := database.NewDatabaseAdminClient(ctx, option.WithGRPCDialOption(grpc.WithUnaryInterceptor(interceptor)))
1465	if err != nil {
1466		log.Fatalf("cannot create dbAdmin client: %v", err)
1467	}
1468	op, err := dbAdmin.CreateDatabaseWithRetry(ctx, &adminpb.CreateDatabaseRequest{
1469		Parent:          fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID),
1470		CreateStatement: "CREATE DATABASE " + dbName,
1471	})
1472	if err != nil {
1473		t.Fatalf("failed to create database: %v", err)
1474	}
1475	_, err = op.Wait(ctx)
1476	if err != nil {
1477		t.Fatalf("failed to create database: %v", err)
1478	}
1479	if !hasReturnedUnavailable {
1480		t.Fatalf("interceptor did not return Unavailable")
1481	}
1482}
1483
1484// Test client recovery on database recreation.
1485func TestIntegration_DbRemovalRecovery(t *testing.T) {
1486	t.Parallel()
1487
1488	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1489	defer cancel()
1490	// Create a client with MinOpened=0 to prevent the session pool maintainer
1491	// from repeatedly trying to create sessions for the invalid database.
1492	client, dbPath, cleanup := prepareIntegrationTest(ctx, t, SessionPoolConfig{}, singerDBStatements)
1493	defer cleanup()
1494
1495	// Drop the testing database.
1496	if err := databaseAdmin.DropDatabase(ctx, &adminpb.DropDatabaseRequest{Database: dbPath}); err != nil {
1497		t.Fatalf("failed to drop testing database %v: %v", dbPath, err)
1498	}
1499
1500	// Now, send the query.
1501	iter := client.Single().Query(ctx, Statement{SQL: "SELECT SingerId FROM Singers"})
1502	defer iter.Stop()
1503	if _, err := iter.Next(); err == nil {
1504		t.Errorf("client sends query to removed database successfully, want it to fail")
1505	}
1506	verifyDirectPathRemoteAddress(t)
1507
1508	// Recreate database and table.
1509	dbName := dbPath[strings.LastIndex(dbPath, "/")+1:]
1510	op, err := databaseAdmin.CreateDatabaseWithRetry(ctx, &adminpb.CreateDatabaseRequest{
1511		Parent:          fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID),
1512		CreateStatement: "CREATE DATABASE " + dbName,
1513		ExtraStatements: []string{
1514			`CREATE TABLE Singers (
1515				SingerId        INT64 NOT NULL,
1516				FirstName       STRING(1024),
1517				LastName        STRING(1024),
1518				SingerInfo      BYTES(MAX)
1519			) PRIMARY KEY (SingerId)`,
1520		},
1521	})
1522	if err != nil {
1523		t.Fatalf("cannot recreate testing DB %v: %v", dbPath, err)
1524	}
1525	if _, err := op.Wait(ctx); err != nil {
1526		t.Fatalf("cannot recreate testing DB %v: %v", dbPath, err)
1527	}
1528
1529	// Now, send the query again.
1530	iter = client.Single().Query(ctx, Statement{SQL: "SELECT SingerId FROM Singers"})
1531	defer iter.Stop()
1532	_, err = iter.Next()
1533	if err != nil && err != iterator.Done {
1534		t.Errorf("failed to send query to database %v: %v", dbPath, err)
1535	}
1536	verifyDirectPathRemoteAddress(t)
1537}
1538
1539// Test encoding/decoding non-struct Cloud Spanner types.
1540func TestIntegration_BasicTypes(t *testing.T) {
1541	t.Parallel()
1542
1543	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1544	defer cancel()
1545	stmts := singerDBStatements
1546	if !isEmulatorEnvSet() {
1547		stmts = []string{
1548			`CREATE TABLE Singers (
1549					SingerId	INT64 NOT NULL,
1550					FirstName	STRING(1024),
1551					LastName	STRING(1024),
1552					SingerInfo	BYTES(MAX)
1553				) PRIMARY KEY (SingerId)`,
1554			`CREATE INDEX SingerByName ON Singers(FirstName, LastName)`,
1555			`CREATE TABLE Accounts (
1556					AccountId	INT64 NOT NULL,
1557					Nickname	STRING(100),
1558					Balance		INT64 NOT NULL,
1559				) PRIMARY KEY (AccountId)`,
1560			`CREATE INDEX AccountByNickname ON Accounts(Nickname) STORING (Balance)`,
1561			`CREATE TABLE Types (
1562					RowID		INT64 NOT NULL,
1563					String		STRING(MAX),
1564					StringArray	ARRAY<STRING(MAX)>,
1565					Bytes		BYTES(MAX),
1566					BytesArray	ARRAY<BYTES(MAX)>,
1567					Int64a		INT64,
1568					Int64Array	ARRAY<INT64>,
1569					Bool		BOOL,
1570					BoolArray	ARRAY<BOOL>,
1571					Float64		FLOAT64,
1572					Float64Array	ARRAY<FLOAT64>,
1573					Date		DATE,
1574					DateArray	ARRAY<DATE>,
1575					Timestamp	TIMESTAMP,
1576					TimestampArray	ARRAY<TIMESTAMP>,
1577					Numeric		NUMERIC,
1578					NumericArray	ARRAY<NUMERIC>
1579				) PRIMARY KEY (RowID)`,
1580		}
1581	}
1582	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, stmts)
1583	defer cleanup()
1584
1585	t1, _ := time.Parse(time.RFC3339Nano, "2016-11-15T15:04:05.999999999Z")
1586	// Boundaries
1587	t2, _ := time.Parse(time.RFC3339Nano, "0001-01-01T00:00:00.000000000Z")
1588	t3, _ := time.Parse(time.RFC3339Nano, "9999-12-31T23:59:59.999999999Z")
1589	d1, _ := civil.ParseDate("2016-11-15")
1590	// Boundaries
1591	d2, _ := civil.ParseDate("0001-01-01")
1592	d3, _ := civil.ParseDate("9999-12-31")
1593
1594	n0 := big.Rat{}
1595	n1p, _ := (&big.Rat{}).SetString("123456789")
1596	n2p, _ := (&big.Rat{}).SetString("123456789/1000000000")
1597	n1 := *n1p
1598	n2 := *n2p
1599
1600	tests := []struct {
1601		col  string
1602		val  interface{}
1603		want interface{}
1604	}{
1605		{col: "String", val: ""},
1606		{col: "String", val: "", want: NullString{"", true}},
1607		{col: "String", val: "foo"},
1608		{col: "String", val: "foo", want: NullString{"foo", true}},
1609		{col: "String", val: NullString{"bar", true}, want: "bar"},
1610		{col: "String", val: NullString{"bar", false}, want: NullString{"", false}},
1611		{col: "String", val: nil, want: NullString{}},
1612		{col: "StringArray", val: []string(nil), want: []NullString(nil)},
1613		{col: "StringArray", val: []string{}, want: []NullString{}},
1614		{col: "StringArray", val: []string{"foo", "bar"}, want: []NullString{{"foo", true}, {"bar", true}}},
1615		{col: "StringArray", val: []NullString(nil)},
1616		{col: "StringArray", val: []NullString{}},
1617		{col: "StringArray", val: []NullString{{"foo", true}, {}}},
1618		{col: "Bytes", val: []byte{}},
1619		{col: "Bytes", val: []byte{1, 2, 3}},
1620		{col: "Bytes", val: []byte(nil)},
1621		{col: "BytesArray", val: [][]byte(nil)},
1622		{col: "BytesArray", val: [][]byte{}},
1623		{col: "BytesArray", val: [][]byte{{1}, {2, 3}}},
1624		{col: "Int64a", val: 0, want: int64(0)},
1625		{col: "Int64a", val: -1, want: int64(-1)},
1626		{col: "Int64a", val: 2, want: int64(2)},
1627		{col: "Int64a", val: int64(3)},
1628		{col: "Int64a", val: 4, want: NullInt64{4, true}},
1629		{col: "Int64a", val: NullInt64{5, true}, want: int64(5)},
1630		{col: "Int64a", val: NullInt64{6, true}, want: int64(6)},
1631		{col: "Int64a", val: NullInt64{7, false}, want: NullInt64{0, false}},
1632		{col: "Int64a", val: nil, want: NullInt64{}},
1633		{col: "Int64Array", val: []int(nil), want: []NullInt64(nil)},
1634		{col: "Int64Array", val: []int{}, want: []NullInt64{}},
1635		{col: "Int64Array", val: []int{1, 2}, want: []NullInt64{{1, true}, {2, true}}},
1636		{col: "Int64Array", val: []int64(nil), want: []NullInt64(nil)},
1637		{col: "Int64Array", val: []int64{}, want: []NullInt64{}},
1638		{col: "Int64Array", val: []int64{1, 2}, want: []NullInt64{{1, true}, {2, true}}},
1639		{col: "Int64Array", val: []NullInt64(nil)},
1640		{col: "Int64Array", val: []NullInt64{}},
1641		{col: "Int64Array", val: []NullInt64{{1, true}, {}}},
1642		{col: "Bool", val: false},
1643		{col: "Bool", val: true},
1644		{col: "Bool", val: false, want: NullBool{false, true}},
1645		{col: "Bool", val: true, want: NullBool{true, true}},
1646		{col: "Bool", val: NullBool{true, true}},
1647		{col: "Bool", val: NullBool{false, false}},
1648		{col: "Bool", val: nil, want: NullBool{}},
1649		{col: "BoolArray", val: []bool(nil), want: []NullBool(nil)},
1650		{col: "BoolArray", val: []bool{}, want: []NullBool{}},
1651		{col: "BoolArray", val: []bool{true, false}, want: []NullBool{{true, true}, {false, true}}},
1652		{col: "BoolArray", val: []NullBool(nil)},
1653		{col: "BoolArray", val: []NullBool{}},
1654		{col: "BoolArray", val: []NullBool{{false, true}, {true, true}, {}}},
1655		{col: "Float64", val: 0.0},
1656		{col: "Float64", val: 3.14},
1657		{col: "Float64", val: math.NaN()},
1658		{col: "Float64", val: math.Inf(1)},
1659		{col: "Float64", val: math.Inf(-1)},
1660		{col: "Float64", val: 2.78, want: NullFloat64{2.78, true}},
1661		{col: "Float64", val: NullFloat64{2.71, true}, want: 2.71},
1662		{col: "Float64", val: NullFloat64{1.41, true}, want: NullFloat64{1.41, true}},
1663		{col: "Float64", val: NullFloat64{0, false}},
1664		{col: "Float64", val: nil, want: NullFloat64{}},
1665		{col: "Float64Array", val: []float64(nil), want: []NullFloat64(nil)},
1666		{col: "Float64Array", val: []float64{}, want: []NullFloat64{}},
1667		{col: "Float64Array", val: []float64{2.72, 3.14, math.Inf(1)}, want: []NullFloat64{{2.72, true}, {3.14, true}, {math.Inf(1), true}}},
1668		{col: "Float64Array", val: []NullFloat64(nil)},
1669		{col: "Float64Array", val: []NullFloat64{}},
1670		{col: "Float64Array", val: []NullFloat64{{2.72, true}, {math.Inf(1), true}, {}}},
1671		{col: "Date", val: d1},
1672		{col: "Date", val: d1, want: NullDate{d1, true}},
1673		{col: "Date", val: NullDate{d1, true}},
1674		{col: "Date", val: NullDate{d1, true}, want: d1},
1675		{col: "Date", val: NullDate{civil.Date{}, false}},
1676		{col: "DateArray", val: []civil.Date(nil), want: []NullDate(nil)},
1677		{col: "DateArray", val: []civil.Date{}, want: []NullDate{}},
1678		{col: "DateArray", val: []civil.Date{d1, d2, d3}, want: []NullDate{{d1, true}, {d2, true}, {d3, true}}},
1679		{col: "Timestamp", val: t1},
1680		{col: "Timestamp", val: t1, want: NullTime{t1, true}},
1681		{col: "Timestamp", val: NullTime{t1, true}},
1682		{col: "Timestamp", val: NullTime{t1, true}, want: t1},
1683		{col: "Timestamp", val: NullTime{}},
1684		{col: "Timestamp", val: nil, want: NullTime{}},
1685		{col: "TimestampArray", val: []time.Time(nil), want: []NullTime(nil)},
1686		{col: "TimestampArray", val: []time.Time{}, want: []NullTime{}},
1687		{col: "TimestampArray", val: []time.Time{t1, t2, t3}, want: []NullTime{{t1, true}, {t2, true}, {t3, true}}},
1688	}
1689
1690	if !isEmulatorEnvSet() {
1691		for _, tc := range []struct {
1692			col  string
1693			val  interface{}
1694			want interface{}
1695		}{
1696			{col: "Numeric", val: n1},
1697			{col: "Numeric", val: n2},
1698			{col: "Numeric", val: n1, want: NullNumeric{n1, true}},
1699			{col: "Numeric", val: n2, want: NullNumeric{n2, true}},
1700			{col: "Numeric", val: NullNumeric{n1, true}, want: n1},
1701			{col: "Numeric", val: NullNumeric{n1, true}, want: NullNumeric{n1, true}},
1702			{col: "Numeric", val: NullNumeric{n0, false}},
1703			{col: "Numeric", val: nil, want: NullNumeric{}},
1704			{col: "NumericArray", val: []big.Rat(nil), want: []NullNumeric(nil)},
1705			{col: "NumericArray", val: []big.Rat{}, want: []NullNumeric{}},
1706			{col: "NumericArray", val: []big.Rat{n1, n2}, want: []NullNumeric{{n1, true}, {n2, true}}},
1707			{col: "NumericArray", val: []NullNumeric(nil)},
1708			{col: "NumericArray", val: []NullNumeric{}},
1709			{col: "NumericArray", val: []NullNumeric{{n1, true}, {n2, true}, {}}},
1710		} {
1711			tests = append(tests, tc)
1712		}
1713	}
1714
1715	// Write rows into table first.
1716	var muts []*Mutation
1717	for i, test := range tests {
1718		muts = append(muts, InsertOrUpdate("Types", []string{"RowID", test.col}, []interface{}{i, test.val}))
1719	}
1720	if _, err := client.Apply(ctx, muts, ApplyAtLeastOnce()); err != nil {
1721		t.Fatal(err)
1722	}
1723
1724	for i, test := range tests {
1725		row, err := client.Single().ReadRow(ctx, "Types", []interface{}{i}, []string{test.col})
1726		if err != nil {
1727			t.Fatalf("Unable to fetch row %v: %v", i, err)
1728		}
1729		verifyDirectPathRemoteAddress(t)
1730		// Create new instance of type of test.want.
1731		want := test.want
1732		if want == nil {
1733			want = test.val
1734		}
1735		gotp := reflect.New(reflect.TypeOf(want))
1736		if err := row.Column(0, gotp.Interface()); err != nil {
1737			t.Errorf("%d: col:%v val:%#v, %v", i, test.col, test.val, err)
1738			continue
1739		}
1740		got := reflect.Indirect(gotp).Interface()
1741
1742		// One of the test cases is checking NaN handling.  Given
1743		// NaN!=NaN, we can't use reflect to test for it.
1744		if isNaN(got) && isNaN(want) {
1745			continue
1746		}
1747
1748		// Check non-NaN cases.
1749		if !testEqual(got, want) {
1750			t.Errorf("%d: col:%v val:%#v, got %#v, want %#v", i, test.col, test.val, got, want)
1751			continue
1752		}
1753	}
1754}
1755
1756// Test decoding Cloud Spanner STRUCT type.
1757func TestIntegration_StructTypes(t *testing.T) {
1758	t.Parallel()
1759
1760	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1761	defer cancel()
1762	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
1763	defer cleanup()
1764
1765	tests := []struct {
1766		q    Statement
1767		want func(r *Row) error
1768	}{
1769		{
1770			q: Statement{SQL: `SELECT ARRAY(SELECT STRUCT(1, 2))`},
1771			want: func(r *Row) error {
1772				// Test STRUCT ARRAY decoding to []NullRow.
1773				var rows []NullRow
1774				if err := r.Column(0, &rows); err != nil {
1775					return err
1776				}
1777				if len(rows) != 1 {
1778					return fmt.Errorf("len(rows) = %d; want 1", len(rows))
1779				}
1780				if !rows[0].Valid {
1781					return fmt.Errorf("rows[0] is NULL")
1782				}
1783				var i, j int64
1784				if err := rows[0].Row.Columns(&i, &j); err != nil {
1785					return err
1786				}
1787				if i != 1 || j != 2 {
1788					return fmt.Errorf("got (%d,%d), want (1,2)", i, j)
1789				}
1790				return nil
1791			},
1792		},
1793		{
1794			q: Statement{SQL: `SELECT ARRAY(SELECT STRUCT(1 as foo, 2 as bar)) as col1`},
1795			want: func(r *Row) error {
1796				// Test Row.ToStruct.
1797				s := struct {
1798					Col1 []*struct {
1799						Foo int64 `spanner:"foo"`
1800						Bar int64 `spanner:"bar"`
1801					} `spanner:"col1"`
1802				}{}
1803				if err := r.ToStruct(&s); err != nil {
1804					return err
1805				}
1806				want := struct {
1807					Col1 []*struct {
1808						Foo int64 `spanner:"foo"`
1809						Bar int64 `spanner:"bar"`
1810					} `spanner:"col1"`
1811				}{
1812					Col1: []*struct {
1813						Foo int64 `spanner:"foo"`
1814						Bar int64 `spanner:"bar"`
1815					}{
1816						{
1817							Foo: 1,
1818							Bar: 2,
1819						},
1820					},
1821				}
1822				if !testEqual(want, s) {
1823					return fmt.Errorf("unexpected decoding result: %v, want %v", s, want)
1824				}
1825				return nil
1826			},
1827		},
1828	}
1829	for i, test := range tests {
1830		iter := client.Single().Query(ctx, test.q)
1831		defer iter.Stop()
1832		row, err := iter.Next()
1833		if err != nil {
1834			t.Errorf("%d: %v", i, err)
1835			continue
1836		}
1837		if err := test.want(row); err != nil {
1838			t.Errorf("%d: %v", i, err)
1839			continue
1840		}
1841	}
1842}
1843
1844func TestIntegration_StructParametersUnsupported(t *testing.T) {
1845	skipEmulatorTest(t)
1846	t.Parallel()
1847
1848	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1849	defer cancel()
1850	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, nil)
1851	defer cleanup()
1852
1853	for _, test := range []struct {
1854		param       interface{}
1855		wantCode    codes.Code
1856		wantMsgPart string
1857	}{
1858		{
1859			struct {
1860				Field int
1861			}{10},
1862			codes.Unimplemented,
1863			"Unsupported query shape: " +
1864				"A struct value cannot be returned as a column value. " +
1865				"Rewrite the query to flatten the struct fields in the result.",
1866		},
1867		{
1868			[]struct {
1869				Field int
1870			}{{10}, {20}},
1871			codes.Unimplemented,
1872			"Unsupported query shape: " +
1873				"This query can return a null-valued array of struct, " +
1874				"which is not supported by Spanner.",
1875		},
1876	} {
1877		iter := client.Single().Query(ctx, Statement{
1878			SQL:    "SELECT @p",
1879			Params: map[string]interface{}{"p": test.param},
1880		})
1881		_, err := iter.Next()
1882		iter.Stop()
1883		if msg, ok := matchError(err, test.wantCode, test.wantMsgPart); !ok {
1884			t.Fatal(msg)
1885		}
1886	}
1887}
1888
1889// Test queries of the form "SELECT expr".
1890func TestIntegration_QueryExpressions(t *testing.T) {
1891	t.Parallel()
1892
1893	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1894	defer cancel()
1895	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, nil)
1896	defer cleanup()
1897
1898	newRow := func(vals []interface{}) *Row {
1899		row, err := NewRow(make([]string, len(vals)), vals)
1900		if err != nil {
1901			t.Fatal(err)
1902		}
1903		return row
1904	}
1905
1906	tests := []struct {
1907		expr string
1908		want interface{}
1909	}{
1910		{"1", int64(1)},
1911		{"[1, 2, 3]", []NullInt64{{1, true}, {2, true}, {3, true}}},
1912		{"[1, NULL, 3]", []NullInt64{{1, true}, {0, false}, {3, true}}},
1913		{"IEEE_DIVIDE(1, 0)", math.Inf(1)},
1914		{"IEEE_DIVIDE(-1, 0)", math.Inf(-1)},
1915		{"IEEE_DIVIDE(0, 0)", math.NaN()},
1916		// TODO(jba): add IEEE_DIVIDE(0, 0) to the following array when we have a better equality predicate.
1917		{"[IEEE_DIVIDE(1, 0), IEEE_DIVIDE(-1, 0)]", []NullFloat64{{math.Inf(1), true}, {math.Inf(-1), true}}},
1918		{"ARRAY(SELECT AS STRUCT * FROM (SELECT 'a', 1) WHERE 0 = 1)", []NullRow{}},
1919		{"ARRAY(SELECT STRUCT(1, 2))", []NullRow{{Row: *newRow([]interface{}{1, 2}), Valid: true}}},
1920	}
1921	for _, test := range tests {
1922		iter := client.Single().Query(ctx, Statement{SQL: "SELECT " + test.expr})
1923		defer iter.Stop()
1924		row, err := iter.Next()
1925		if err != nil {
1926			t.Errorf("%q: %v", test.expr, err)
1927			continue
1928		}
1929		// Create new instance of type of test.want.
1930		gotp := reflect.New(reflect.TypeOf(test.want))
1931		if err := row.Column(0, gotp.Interface()); err != nil {
1932			t.Errorf("%q: Column returned error %v", test.expr, err)
1933			continue
1934		}
1935		got := reflect.Indirect(gotp).Interface()
1936		// TODO(jba): remove isNaN special case when we have a better equality predicate.
1937		if isNaN(got) && isNaN(test.want) {
1938			continue
1939		}
1940		if !testEqual(got, test.want) {
1941			t.Errorf("%q\n got  %#v\nwant %#v", test.expr, got, test.want)
1942		}
1943	}
1944}
1945
1946func TestIntegration_QueryStats(t *testing.T) {
1947	skipEmulatorTest(t)
1948	t.Parallel()
1949
1950	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1951	defer cancel()
1952	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
1953	defer cleanup()
1954
1955	accounts := []*Mutation{
1956		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
1957		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
1958	}
1959	if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil {
1960		t.Fatal(err)
1961	}
1962	const sql = "SELECT Balance FROM Accounts"
1963
1964	qp, err := client.Single().AnalyzeQuery(ctx, Statement{sql, nil})
1965	if err != nil {
1966		t.Fatal(err)
1967	}
1968	if len(qp.PlanNodes) == 0 {
1969		t.Error("got zero plan nodes, expected at least one")
1970	}
1971
1972	iter := client.Single().QueryWithStats(ctx, Statement{sql, nil})
1973	defer iter.Stop()
1974	for {
1975		_, err := iter.Next()
1976		if err == iterator.Done {
1977			break
1978		}
1979		if err != nil {
1980			t.Fatal(err)
1981		}
1982	}
1983	if iter.QueryPlan == nil {
1984		t.Error("got nil QueryPlan, expected one")
1985	}
1986	if iter.QueryStats == nil {
1987		t.Error("got nil QueryStats, expected some")
1988	}
1989}
1990
1991func TestIntegration_InvalidDatabase(t *testing.T) {
1992	t.Parallel()
1993
1994	if databaseAdmin == nil {
1995		t.Skip("Integration tests skipped")
1996	}
1997	ctx := context.Background()
1998	dbPath := fmt.Sprintf("projects/%v/instances/%v/databases/invalid", testProjectID, testInstanceID)
1999	c, err := createClient(ctx, dbPath, SessionPoolConfig{})
2000	// Client creation should succeed even if the database is invalid.
2001	if err != nil {
2002		t.Fatal(err)
2003	}
2004	_, err = c.Single().ReadRow(ctx, "TestTable", Key{1}, []string{"col1"})
2005	if msg, ok := matchError(err, codes.NotFound, ""); !ok {
2006		t.Fatal(msg)
2007	}
2008}
2009
2010func TestIntegration_ReadErrors(t *testing.T) {
2011	t.Parallel()
2012
2013	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2014	defer cancel()
2015	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, readDBStatements)
2016	defer cleanup()
2017
2018	var ms []*Mutation
2019	for i := 0; i < 2; i++ {
2020		ms = append(ms, InsertOrUpdate(testTable,
2021			testTableColumns,
2022			[]interface{}{fmt.Sprintf("k%d", i), fmt.Sprintf("v")}))
2023	}
2024	if _, err := client.Apply(ctx, ms); err != nil {
2025		t.Fatal(err)
2026	}
2027
2028	// Read over invalid table fails
2029	_, err := client.Single().ReadRow(ctx, "badTable", Key{1}, []string{"StringValue"})
2030	if msg, ok := matchError(err, codes.NotFound, "badTable"); !ok {
2031		t.Error(msg)
2032	}
2033	// Read over invalid column fails
2034	_, err = client.Single().ReadRow(ctx, "TestTable", Key{1}, []string{"badcol"})
2035	if msg, ok := matchError(err, codes.NotFound, "badcol"); !ok {
2036		t.Error(msg)
2037	}
2038
2039	// Invalid query fails
2040	iter := client.Single().Query(ctx, Statement{SQL: "SELECT Apples AND Oranges"})
2041	defer iter.Stop()
2042	_, err = iter.Next()
2043	if msg, ok := matchError(err, codes.InvalidArgument, "unrecognized name"); !ok {
2044		t.Error(msg)
2045	}
2046
2047	// Read should fail on cancellation.
2048	cctx, cancel := context.WithCancel(ctx)
2049	cancel()
2050	_, err = client.Single().ReadRow(cctx, "TestTable", Key{1}, []string{"StringValue"})
2051	if msg, ok := matchError(err, codes.Canceled, ""); !ok {
2052		t.Error(msg)
2053	}
2054	// Read should fail if deadline exceeded.
2055	dctx, cancel := context.WithTimeout(ctx, time.Nanosecond)
2056	defer cancel()
2057	<-dctx.Done()
2058	_, err = client.Single().ReadRow(dctx, "TestTable", Key{1}, []string{"StringValue"})
2059	if msg, ok := matchError(err, codes.DeadlineExceeded, ""); !ok {
2060		t.Error(msg)
2061	}
2062	// Read should fail if there are multiple rows returned.
2063	_, err = client.Single().ReadRowUsingIndex(ctx, testTable, testTableIndex, Key{"v"}, testTableColumns)
2064	wantMsgPart := fmt.Sprintf("more than one row found by index(Table: %v, IndexKey: %v, Index: %v)", testTable, Key{"v"}, testTableIndex)
2065	if msg, ok := matchError(err, codes.FailedPrecondition, wantMsgPart); !ok {
2066		t.Error(msg)
2067	}
2068}
2069
2070// Test TransactionRunner. Test that transactions are aborted and retried as
2071// expected.
2072func TestIntegration_TransactionRunner(t *testing.T) {
2073	skipEmulatorTest(t)
2074	t.Parallel()
2075
2076	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2077	defer cancel()
2078	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
2079	defer cleanup()
2080
2081	// Test 1: User error should abort the transaction.
2082	_, _ = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
2083		tx.BufferWrite([]*Mutation{
2084			Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)})})
2085		return errors.New("user error")
2086	})
2087	// Empty read.
2088	rows, err := readAllTestTable(client.Single().Read(ctx, "Accounts", Key{1}, []string{"AccountId", "Nickname", "Balance"}))
2089	if err != nil {
2090		t.Fatal(err)
2091	}
2092	if got, want := len(rows), 0; got != want {
2093		t.Errorf("Empty read, got %d, want %d.", got, want)
2094	}
2095
2096	// Test 2: Expect abort and retry.
2097	// We run two ReadWriteTransactions concurrently and make txn1 abort txn2 by
2098	// committing writes to the column txn2 have read, and expect the following
2099	// read to abort and txn2 retries.
2100
2101	// Set up two accounts
2102	accounts := []*Mutation{
2103		Insert("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), int64(0)}),
2104		Insert("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), int64(1)}),
2105	}
2106	if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil {
2107		t.Fatal(err)
2108	}
2109
2110	var (
2111		cTxn1Start  = make(chan struct{})
2112		cTxn1Commit = make(chan struct{})
2113		cTxn2Start  = make(chan struct{})
2114		wg          sync.WaitGroup
2115	)
2116
2117	// read balance, check error if we don't expect abort.
2118	readBalance := func(tx interface {
2119		ReadRow(ctx context.Context, table string, key Key, columns []string) (*Row, error)
2120	}, key int64, expectAbort bool) (int64, error) {
2121		var b int64
2122		r, e := tx.ReadRow(ctx, "Accounts", Key{int64(key)}, []string{"Balance"})
2123		if e != nil {
2124			if expectAbort && !isAbortedErr(e) {
2125				t.Errorf("ReadRow got %v, want Abort error.", e)
2126			}
2127			// Verify that we received and are able to extract retry info from
2128			// the aborted error.
2129			if _, hasRetryInfo := ExtractRetryDelay(e); !hasRetryInfo {
2130				t.Errorf("Got Abort error without RetryInfo\nGot: %v", e)
2131			}
2132			return b, e
2133		}
2134		if ce := r.Column(0, &b); ce != nil {
2135			return b, ce
2136		}
2137		return b, nil
2138	}
2139
2140	wg.Add(2)
2141	// Txn 1
2142	go func() {
2143		defer wg.Done()
2144		var once sync.Once
2145		_, e := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
2146			b, e := readBalance(tx, 1, false)
2147			if e != nil {
2148				return e
2149			}
2150			// txn 1 can abort, in that case we skip closing the channel on
2151			// retry.
2152			once.Do(func() { close(cTxn1Start) })
2153			e = tx.BufferWrite([]*Mutation{
2154				Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), int64(b + 1)})})
2155			if e != nil {
2156				return e
2157			}
2158			// Wait for second transaction.
2159			<-cTxn2Start
2160			return nil
2161		})
2162		close(cTxn1Commit)
2163		if e != nil {
2164			t.Errorf("Transaction 1 commit, got %v, want nil.", e)
2165		}
2166	}()
2167	// Txn 2
2168	go func() {
2169		// Wait until txn 1 starts.
2170		<-cTxn1Start
2171		defer wg.Done()
2172		var (
2173			once sync.Once
2174			b1   int64
2175			b2   int64
2176			e    error
2177		)
2178		_, e = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
2179			if b1, e = readBalance(tx, 1, false); e != nil {
2180				return e
2181			}
2182			// Skip closing channel on retry.
2183			once.Do(func() { close(cTxn2Start) })
2184			// Wait until txn 1 successfully commits.
2185			<-cTxn1Commit
2186			// Txn1 has committed and written a balance to the account. Now this
2187			// transaction (txn2) reads and re-writes the balance. The first
2188			// time through, it will abort because it overlaps with txn1. Then
2189			// it will retry after txn1 commits, and succeed.
2190			if b2, e = readBalance(tx, 2, true); e != nil {
2191				return e
2192			}
2193			return tx.BufferWrite([]*Mutation{
2194				Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), int64(b1 + b2)})})
2195		})
2196		if e != nil {
2197			t.Errorf("Transaction 2 commit, got %v, want nil.", e)
2198		}
2199	}()
2200	wg.Wait()
2201	// Check that both transactions' effects are visible.
2202	for i := int64(1); i <= int64(2); i++ {
2203		if b, e := readBalance(client.Single(), i, false); e != nil {
2204			t.Fatalf("ReadBalance for key %d error %v.", i, e)
2205		} else if b != i {
2206			t.Errorf("Balance for key %d, got %d, want %d.", i, b, i)
2207		}
2208	}
2209}
2210
2211// Test PartitionQuery of BatchReadOnlyTransaction, create partitions then
2212// serialize and deserialize both transaction and partition to be used in
2213// execution on another client, and compare results.
2214func TestIntegration_BatchQuery(t *testing.T) {
2215	t.Parallel()
2216
2217	// Set up testing environment.
2218	var (
2219		client2 *Client
2220		err     error
2221	)
2222	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2223	defer cancel()
2224	client, dbPath, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, simpleDBStatements)
2225	defer cleanup()
2226
2227	if err = populate(ctx, client); err != nil {
2228		t.Fatal(err)
2229	}
2230	if client2, err = createClient(ctx, dbPath, SessionPoolConfig{}); err != nil {
2231		t.Fatal(err)
2232	}
2233	defer client2.Close()
2234
2235	// PartitionQuery
2236	var (
2237		txn        *BatchReadOnlyTransaction
2238		partitions []*Partition
2239		stmt       = Statement{SQL: "SELECT * FROM test;"}
2240	)
2241
2242	if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil {
2243		t.Fatal(err)
2244	}
2245	defer txn.Cleanup(ctx)
2246	if partitions, err = txn.PartitionQuery(ctx, stmt, PartitionOptions{0, 3}); err != nil {
2247		t.Fatal(err)
2248	}
2249
2250	// Reconstruct BatchReadOnlyTransactionID and execute partitions
2251	var (
2252		tid2      BatchReadOnlyTransactionID
2253		data      []byte
2254		gotResult bool // if we get matching result from two separate txns
2255	)
2256	if data, err = txn.ID.MarshalBinary(); err != nil {
2257		t.Fatalf("encoding failed %v", err)
2258	}
2259	if err = tid2.UnmarshalBinary(data); err != nil {
2260		t.Fatalf("decoding failed %v", err)
2261	}
2262	txn2 := client2.BatchReadOnlyTransactionFromID(tid2)
2263
2264	// Execute Partitions and compare results
2265	for i, p := range partitions {
2266		iter := txn.Execute(ctx, p)
2267		defer iter.Stop()
2268		p2 := serdesPartition(t, i, p)
2269		iter2 := txn2.Execute(ctx, &p2)
2270		defer iter2.Stop()
2271
2272		row1, err1 := iter.Next()
2273		row2, err2 := iter2.Next()
2274		if err1 != err2 {
2275			t.Fatalf("execution failed for different reasons: %v, %v", err1, err2)
2276			continue
2277		}
2278		if !testEqual(row1, row2) {
2279			t.Fatalf("execution returned different values: %v, %v", row1, row2)
2280			continue
2281		}
2282		if row1 == nil {
2283			continue
2284		}
2285		var a, b string
2286		if err = row1.Columns(&a, &b); err != nil {
2287			t.Fatalf("failed to parse row %v", err)
2288			continue
2289		}
2290		if a == str1 && b == str2 {
2291			gotResult = true
2292		}
2293	}
2294	if !gotResult {
2295		t.Fatalf("execution didn't return expected values")
2296	}
2297}
2298
2299// Test PartitionRead of BatchReadOnlyTransaction, similar to TestBatchQuery
2300func TestIntegration_BatchRead(t *testing.T) {
2301	t.Parallel()
2302
2303	// Set up testing environment.
2304	var (
2305		client2 *Client
2306		err     error
2307	)
2308	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2309	defer cancel()
2310	client, dbPath, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, simpleDBStatements)
2311	defer cleanup()
2312
2313	if err = populate(ctx, client); err != nil {
2314		t.Fatal(err)
2315	}
2316	if client2, err = createClient(ctx, dbPath, SessionPoolConfig{}); err != nil {
2317		t.Fatal(err)
2318	}
2319	defer client2.Close()
2320
2321	// PartitionRead
2322	var (
2323		txn        *BatchReadOnlyTransaction
2324		partitions []*Partition
2325	)
2326
2327	if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil {
2328		t.Fatal(err)
2329	}
2330	defer txn.Cleanup(ctx)
2331	if partitions, err = txn.PartitionRead(ctx, "test", AllKeys(), simpleDBTableColumns, PartitionOptions{0, 3}); err != nil {
2332		t.Fatal(err)
2333	}
2334
2335	// Reconstruct BatchReadOnlyTransactionID and execute partitions.
2336	var (
2337		tid2      BatchReadOnlyTransactionID
2338		data      []byte
2339		gotResult bool // if we get matching result from two separate txns
2340	)
2341	if data, err = txn.ID.MarshalBinary(); err != nil {
2342		t.Fatalf("encoding failed %v", err)
2343	}
2344	if err = tid2.UnmarshalBinary(data); err != nil {
2345		t.Fatalf("decoding failed %v", err)
2346	}
2347	txn2 := client2.BatchReadOnlyTransactionFromID(tid2)
2348
2349	// Execute Partitions and compare results.
2350	for i, p := range partitions {
2351		iter := txn.Execute(ctx, p)
2352		defer iter.Stop()
2353		p2 := serdesPartition(t, i, p)
2354		iter2 := txn2.Execute(ctx, &p2)
2355		defer iter2.Stop()
2356
2357		row1, err1 := iter.Next()
2358		row2, err2 := iter2.Next()
2359		if err1 != err2 {
2360			t.Fatalf("execution failed for different reasons: %v, %v", err1, err2)
2361			continue
2362		}
2363		if !testEqual(row1, row2) {
2364			t.Fatalf("execution returned different values: %v, %v", row1, row2)
2365			continue
2366		}
2367		if row1 == nil {
2368			continue
2369		}
2370		var a, b string
2371		if err = row1.Columns(&a, &b); err != nil {
2372			t.Fatalf("failed to parse row %v", err)
2373			continue
2374		}
2375		if a == str1 && b == str2 {
2376			gotResult = true
2377		}
2378	}
2379	if !gotResult {
2380		t.Fatalf("execution didn't return expected values")
2381	}
2382}
2383
2384// Test normal txReadEnv method on BatchReadOnlyTransaction.
2385func TestIntegration_BROTNormal(t *testing.T) {
2386	t.Parallel()
2387
2388	// Set up testing environment and create txn.
2389	var (
2390		txn *BatchReadOnlyTransaction
2391		err error
2392		row *Row
2393		i   int64
2394	)
2395	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2396	defer cancel()
2397	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, simpleDBStatements)
2398	defer cleanup()
2399
2400	if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil {
2401		t.Fatal(err)
2402	}
2403	defer txn.Cleanup(ctx)
2404	if _, err := txn.PartitionRead(ctx, "test", AllKeys(), simpleDBTableColumns, PartitionOptions{0, 3}); err != nil {
2405		t.Fatal(err)
2406	}
2407	// Normal query should work with BatchReadOnlyTransaction.
2408	stmt2 := Statement{SQL: "SELECT 1"}
2409	iter := txn.Query(ctx, stmt2)
2410	defer iter.Stop()
2411
2412	row, err = iter.Next()
2413	if err != nil {
2414		t.Errorf("query failed with %v", err)
2415	}
2416	if err = row.Columns(&i); err != nil {
2417		t.Errorf("failed to parse row %v", err)
2418	}
2419}
2420
2421func TestIntegration_CommitTimestamp(t *testing.T) {
2422	t.Parallel()
2423
2424	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2425	defer cancel()
2426	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, ctsDBStatements)
2427	defer cleanup()
2428
2429	type testTableRow struct {
2430		Key string
2431		Ts  NullTime
2432	}
2433
2434	var (
2435		cts1, cts2, ts1, ts2 time.Time
2436		err                  error
2437	)
2438
2439	// Apply mutation in sequence, expect to see commit timestamp in good order,
2440	// check also the commit timestamp returned
2441	for _, it := range []struct {
2442		k string
2443		t *time.Time
2444	}{
2445		{"a", &cts1},
2446		{"b", &cts2},
2447	} {
2448		tt := testTableRow{Key: it.k, Ts: NullTime{CommitTimestamp, true}}
2449		m, err := InsertStruct("TestTable", tt)
2450		if err != nil {
2451			t.Fatal(err)
2452		}
2453		*it.t, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce())
2454		if err != nil {
2455			t.Fatal(err)
2456		}
2457	}
2458
2459	txn := client.ReadOnlyTransaction()
2460	for _, it := range []struct {
2461		k string
2462		t *time.Time
2463	}{
2464		{"a", &ts1},
2465		{"b", &ts2},
2466	} {
2467		if r, e := txn.ReadRow(ctx, "TestTable", Key{it.k}, []string{"Ts"}); e != nil {
2468			t.Fatal(err)
2469		} else {
2470			var got testTableRow
2471			if err := r.ToStruct(&got); err != nil {
2472				t.Fatal(err)
2473			}
2474			*it.t = got.Ts.Time
2475		}
2476	}
2477	if !cts1.Equal(ts1) {
2478		t.Errorf("Expect commit timestamp returned and read to match for txn1, got %v and %v.", cts1, ts1)
2479	}
2480	if !cts2.Equal(ts2) {
2481		t.Errorf("Expect commit timestamp returned and read to match for txn2, got %v and %v.", cts2, ts2)
2482	}
2483
2484	// Try writing a timestamp in the future to commit timestamp, expect error.
2485	_, err = client.Apply(ctx, []*Mutation{InsertOrUpdate("TestTable", []string{"Key", "Ts"}, []interface{}{"a", time.Now().Add(time.Hour)})}, ApplyAtLeastOnce())
2486	if msg, ok := matchError(err, codes.FailedPrecondition, "Cannot write timestamps in the future"); !ok {
2487		t.Error(msg)
2488	}
2489}
2490
2491func TestIntegration_DML(t *testing.T) {
2492	t.Parallel()
2493
2494	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2495	defer cancel()
2496	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
2497	defer cleanup()
2498
2499	// Function that reads a single row's first name from within a transaction.
2500	readFirstName := func(tx *ReadWriteTransaction, key int) (string, error) {
2501		row, err := tx.ReadRow(ctx, "Singers", Key{key}, []string{"FirstName"})
2502		if err != nil {
2503			return "", err
2504		}
2505		var fn string
2506		if err := row.Column(0, &fn); err != nil {
2507			return "", err
2508		}
2509		return fn, nil
2510	}
2511
2512	// Function that reads multiple rows' first names from outside a read/write
2513	// transaction.
2514	readFirstNames := func(keys ...int) []string {
2515		var ks []KeySet
2516		for _, k := range keys {
2517			ks = append(ks, Key{k})
2518		}
2519		iter := client.Single().Read(ctx, "Singers", KeySets(ks...), []string{"FirstName"})
2520		var got []string
2521		var fn string
2522		err := iter.Do(func(row *Row) error {
2523			if err := row.Column(0, &fn); err != nil {
2524				return err
2525			}
2526			got = append(got, fn)
2527			return nil
2528		})
2529		if err != nil {
2530			t.Fatalf("readFirstNames(%v): %v", keys, err)
2531		}
2532		return got
2533	}
2534
2535	// Use ReadWriteTransaction.Query to execute a DML statement.
2536	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
2537		iter := tx.Query(ctx, Statement{
2538			SQL: `INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (1, "Umm", "Kulthum")`,
2539		})
2540		defer iter.Stop()
2541		row, err := iter.Next()
2542		if ErrCode(err) == codes.Aborted {
2543			return err
2544		}
2545		if err != iterator.Done {
2546			t.Fatalf("got results from iterator, want none: %#v, err = %v\n", row, err)
2547		}
2548		if iter.RowCount != 1 {
2549			t.Errorf("row count: got %d, want 1", iter.RowCount)
2550		}
2551		// The results of the DML statement should be visible to the transaction.
2552		got, err := readFirstName(tx, 1)
2553		if err != nil {
2554			return err
2555		}
2556		if want := "Umm"; got != want {
2557			t.Errorf("got %q, want %q", got, want)
2558		}
2559		return nil
2560	})
2561	if err != nil {
2562		t.Fatal(err)
2563	}
2564
2565	// Use ReadWriteTransaction.Update to execute a DML statement.
2566	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
2567		count, err := tx.Update(ctx, Statement{
2568			SQL: `Insert INTO Singers (SingerId, FirstName, LastName) VALUES (2, "Eduard", "Khil")`,
2569		})
2570		if err != nil {
2571			return err
2572		}
2573		if count != 1 {
2574			t.Errorf("row count: got %d, want 1", count)
2575		}
2576		got, err := readFirstName(tx, 2)
2577		if err != nil {
2578			return err
2579		}
2580		if want := "Eduard"; got != want {
2581			t.Errorf("got %q, want %q", got, want)
2582		}
2583		return nil
2584
2585	})
2586	if err != nil {
2587		t.Fatal(err)
2588	}
2589
2590	// Roll back a DML statement and confirm that it didn't happen.
2591	var fail = errors.New("fail")
2592	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
2593		_, err := tx.Update(ctx, Statement{
2594			SQL: `INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (3, "Audra", "McDonald")`,
2595		})
2596		if err != nil {
2597			return err
2598		}
2599		return fail
2600	})
2601	if err != fail {
2602		t.Fatalf("rolling back: got error %v, want the error 'fail'", err)
2603	}
2604	_, err = client.Single().ReadRow(ctx, "Singers", Key{3}, []string{"FirstName"})
2605	if got, want := ErrCode(err), codes.NotFound; got != want {
2606		t.Errorf("got %s, want %s", got, want)
2607	}
2608
2609	// Run two DML statements in the same transaction.
2610	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
2611		_, err := tx.Update(ctx, Statement{SQL: `UPDATE Singers SET FirstName = "Oum" WHERE SingerId = 1`})
2612		if err != nil {
2613			return err
2614		}
2615		_, err = tx.Update(ctx, Statement{SQL: `UPDATE Singers SET FirstName = "Eddie" WHERE SingerId = 2`})
2616		if err != nil {
2617			return err
2618		}
2619		return nil
2620	})
2621	if err != nil {
2622		t.Fatal(err)
2623	}
2624	got := readFirstNames(1, 2)
2625	want := []string{"Oum", "Eddie"}
2626	if !testEqual(got, want) {
2627		t.Errorf("got %v, want %v", got, want)
2628	}
2629
2630	// Run a DML statement and an ordinary mutation in the same transaction.
2631	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
2632		_, err := tx.Update(ctx, Statement{
2633			SQL: `INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (3, "Audra", "McDonald")`,
2634		})
2635		if err != nil {
2636			return err
2637		}
2638		return tx.BufferWrite([]*Mutation{
2639			Insert("Singers", []string{"SingerId", "FirstName", "LastName"},
2640				[]interface{}{4, "Andy", "Irvine"}),
2641		})
2642	})
2643	if err != nil {
2644		t.Fatal(err)
2645	}
2646	got = readFirstNames(3, 4)
2647	want = []string{"Audra", "Andy"}
2648	if !testEqual(got, want) {
2649		t.Errorf("got %v, want %v", got, want)
2650	}
2651
2652	// Attempt to run a query using update.
2653	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
2654		_, err := tx.Update(ctx, Statement{SQL: `SELECT FirstName from Singers`})
2655		return err
2656	})
2657	if got, want := ErrCode(err), codes.InvalidArgument; got != want {
2658		t.Errorf("got %s, want %s", got, want)
2659	}
2660}
2661
2662func TestIntegration_StructParametersBind(t *testing.T) {
2663	t.Parallel()
2664
2665	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2666	defer cancel()
2667	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, nil)
2668	defer cleanup()
2669
2670	type tRow []interface{}
2671	type tRows []struct{ trow tRow }
2672
2673	type allFields struct {
2674		Stringf string
2675		Intf    int
2676		Boolf   bool
2677		Floatf  float64
2678		Bytef   []byte
2679		Timef   time.Time
2680		Datef   civil.Date
2681	}
2682	allColumns := []string{
2683		"Stringf",
2684		"Intf",
2685		"Boolf",
2686		"Floatf",
2687		"Bytef",
2688		"Timef",
2689		"Datef",
2690	}
2691	s1 := allFields{"abc", 300, false, 3.45, []byte("foo"), t1, d1}
2692	s2 := allFields{"def", -300, false, -3.45, []byte("bar"), t2, d2}
2693
2694	dynamicStructType := reflect.StructOf([]reflect.StructField{
2695		{Name: "A", Type: reflect.TypeOf(t1), Tag: `spanner:"ff1"`},
2696	})
2697	s3 := reflect.New(dynamicStructType)
2698	s3.Elem().Field(0).Set(reflect.ValueOf(t1))
2699
2700	for i, test := range []struct {
2701		param interface{}
2702		sql   string
2703		cols  []string
2704		trows tRows
2705	}{
2706		// Struct value.
2707		{
2708			s1,
2709			"SELECT" +
2710				" @p.Stringf," +
2711				" @p.Intf," +
2712				" @p.Boolf," +
2713				" @p.Floatf," +
2714				" @p.Bytef," +
2715				" @p.Timef," +
2716				" @p.Datef",
2717			allColumns,
2718			tRows{
2719				{tRow{"abc", 300, false, 3.45, []byte("foo"), t1, d1}},
2720			},
2721		},
2722		// Array of struct value.
2723		{
2724			[]allFields{s1, s2},
2725			"SELECT * FROM UNNEST(@p)",
2726			allColumns,
2727			tRows{
2728				{tRow{"abc", 300, false, 3.45, []byte("foo"), t1, d1}},
2729				{tRow{"def", -300, false, -3.45, []byte("bar"), t2, d2}},
2730			},
2731		},
2732		// Null struct.
2733		{
2734			(*allFields)(nil),
2735			"SELECT @p IS NULL",
2736			[]string{""},
2737			tRows{
2738				{tRow{true}},
2739			},
2740		},
2741		// Null Array of struct.
2742		{
2743			[]allFields(nil),
2744			"SELECT @p IS NULL",
2745			[]string{""},
2746			tRows{
2747				{tRow{true}},
2748			},
2749		},
2750		// Empty struct.
2751		{
2752			struct{}{},
2753			"SELECT @p IS NULL ",
2754			[]string{""},
2755			tRows{
2756				{tRow{false}},
2757			},
2758		},
2759		// Empty array of struct.
2760		{
2761			[]allFields{},
2762			"SELECT * FROM UNNEST(@p) ",
2763			allColumns,
2764			tRows{},
2765		},
2766		// Struct with duplicate fields.
2767		{
2768			struct {
2769				A int `spanner:"field"`
2770				B int `spanner:"field"`
2771			}{10, 20},
2772			"SELECT * FROM UNNEST([@p]) ",
2773			[]string{"field", "field"},
2774			tRows{
2775				{tRow{10, 20}},
2776			},
2777		},
2778		// Struct with unnamed fields.
2779		{
2780			struct {
2781				A string `spanner:""`
2782			}{"hello"},
2783			"SELECT * FROM UNNEST([@p]) ",
2784			[]string{""},
2785			tRows{
2786				{tRow{"hello"}},
2787			},
2788		},
2789		// Mixed struct.
2790		{
2791			struct {
2792				DynamicStructField interface{}  `spanner:"f1"`
2793				ArrayStructField   []*allFields `spanner:"f2"`
2794			}{
2795				DynamicStructField: s3.Interface(),
2796				ArrayStructField:   []*allFields{nil},
2797			},
2798			"SELECT @p.f1.ff1, ARRAY_LENGTH(@p.f2), @p.f2[OFFSET(0)] IS NULL ",
2799			[]string{"ff1", "", ""},
2800			tRows{
2801				{tRow{t1, 1, true}},
2802			},
2803		},
2804	} {
2805		iter := client.Single().Query(ctx, Statement{
2806			SQL:    test.sql,
2807			Params: map[string]interface{}{"p": test.param},
2808		})
2809		var gotRows []*Row
2810		err := iter.Do(func(r *Row) error {
2811			gotRows = append(gotRows, r)
2812			return nil
2813		})
2814		if err != nil {
2815			t.Errorf("Failed to execute test case %d, error: %v", i, err)
2816		}
2817
2818		var wantRows []*Row
2819		for j, row := range test.trows {
2820			r, err := NewRow(test.cols, row.trow)
2821			if err != nil {
2822				t.Errorf("Invalid row %d in test case %d", j, i)
2823			}
2824			wantRows = append(wantRows, r)
2825		}
2826		if !testEqual(gotRows, wantRows) {
2827			t.Errorf("%d: Want result %v, got result %v", i, wantRows, gotRows)
2828		}
2829	}
2830}
2831
2832func TestIntegration_PDML(t *testing.T) {
2833	t.Parallel()
2834
2835	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2836	defer cancel()
2837	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
2838	defer cleanup()
2839
2840	columns := []string{"SingerId", "FirstName", "LastName"}
2841
2842	// Populate the Singers table.
2843	var muts []*Mutation
2844	for _, row := range [][]interface{}{
2845		{1, "Umm", "Kulthum"},
2846		{2, "Eduard", "Khil"},
2847		{3, "Audra", "McDonald"},
2848		{4, "Enrique", "Iglesias"},
2849		{5, "Shakira", "Ripoll"},
2850	} {
2851		muts = append(muts, Insert("Singers", columns, row))
2852	}
2853	if _, err := client.Apply(ctx, muts); err != nil {
2854		t.Fatal(err)
2855	}
2856	// Identifiers in PDML statements must be fully qualified.
2857	// TODO(jba): revisit the above.
2858	count, err := client.PartitionedUpdate(ctx, Statement{
2859		SQL: `UPDATE Singers SET Singers.FirstName = "changed" WHERE Singers.SingerId >= 1 AND Singers.SingerId <= @end`,
2860		Params: map[string]interface{}{
2861			"end": 3,
2862		},
2863	})
2864	if err != nil {
2865		t.Fatal(err)
2866	}
2867	if want := int64(3); count != want {
2868		t.Fatalf("got %d, want %d", count, want)
2869	}
2870	got, err := readAll(client.Single().Read(ctx, "Singers", AllKeys(), columns))
2871	if err != nil {
2872		t.Fatal(err)
2873	}
2874	want := [][]interface{}{
2875		{int64(1), "changed", "Kulthum"},
2876		{int64(2), "changed", "Khil"},
2877		{int64(3), "changed", "McDonald"},
2878		{int64(4), "Enrique", "Iglesias"},
2879		{int64(5), "Shakira", "Ripoll"},
2880	}
2881	if !testEqual(got, want) {
2882		t.Errorf("\ngot %v\nwant%v", got, want)
2883	}
2884}
2885
2886func TestIntegration_BatchDML(t *testing.T) {
2887	t.Parallel()
2888
2889	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2890	defer cancel()
2891	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
2892	defer cleanup()
2893
2894	columns := []string{"SingerId", "FirstName", "LastName"}
2895
2896	// Populate the Singers table.
2897	var muts []*Mutation
2898	for _, row := range [][]interface{}{
2899		{1, "Umm", "Kulthum"},
2900		{2, "Eduard", "Khil"},
2901		{3, "Audra", "McDonald"},
2902	} {
2903		muts = append(muts, Insert("Singers", columns, row))
2904	}
2905	if _, err := client.Apply(ctx, muts); err != nil {
2906		t.Fatal(err)
2907	}
2908
2909	var counts []int64
2910	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
2911		counts, err = tx.BatchUpdate(ctx, []Statement{
2912			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`},
2913			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 2" WHERE Singers.SingerId = 2`},
2914			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`},
2915		})
2916		return err
2917	})
2918
2919	if err != nil {
2920		t.Fatal(err)
2921	}
2922	if want := []int64{1, 1, 1}; !testEqual(counts, want) {
2923		t.Fatalf("got %d, want %d", counts, want)
2924	}
2925	got, err := readAll(client.Single().Read(ctx, "Singers", AllKeys(), columns))
2926	if err != nil {
2927		t.Fatal(err)
2928	}
2929	want := [][]interface{}{
2930		{int64(1), "changed 1", "Kulthum"},
2931		{int64(2), "changed 2", "Khil"},
2932		{int64(3), "changed 3", "McDonald"},
2933	}
2934	if !testEqual(got, want) {
2935		t.Errorf("\ngot %v\nwant%v", got, want)
2936	}
2937}
2938
2939func TestIntegration_BatchDML_NoStatements(t *testing.T) {
2940	t.Parallel()
2941
2942	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2943	defer cancel()
2944	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
2945	defer cleanup()
2946
2947	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
2948		_, err = tx.BatchUpdate(ctx, []Statement{})
2949		return err
2950	})
2951	if err == nil {
2952		t.Fatal("expected error, got nil")
2953	}
2954	if s, ok := status.FromError(err); ok {
2955		if s.Code() != codes.InvalidArgument {
2956			t.Fatalf("expected InvalidArgument, got %v", err)
2957		}
2958	} else {
2959		t.Fatalf("expected InvalidArgument, got %v", err)
2960	}
2961}
2962
2963func TestIntegration_BatchDML_TwoStatements(t *testing.T) {
2964	t.Parallel()
2965
2966	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
2967	defer cancel()
2968	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
2969	defer cleanup()
2970
2971	columns := []string{"SingerId", "FirstName", "LastName"}
2972
2973	// Populate the Singers table.
2974	var muts []*Mutation
2975	for _, row := range [][]interface{}{
2976		{1, "Umm", "Kulthum"},
2977		{2, "Eduard", "Khil"},
2978		{3, "Audra", "McDonald"},
2979	} {
2980		muts = append(muts, Insert("Singers", columns, row))
2981	}
2982	if _, err := client.Apply(ctx, muts); err != nil {
2983		t.Fatal(err)
2984	}
2985
2986	var updateCount int64
2987	var batchCounts []int64
2988	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
2989		batchCounts, err = tx.BatchUpdate(ctx, []Statement{
2990			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`},
2991			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 2" WHERE Singers.SingerId = 2`},
2992			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`},
2993		})
2994		if err != nil {
2995			return err
2996		}
2997
2998		updateCount, err = tx.Update(ctx, Statement{SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`})
2999		return err
3000	})
3001	if err != nil {
3002		t.Fatal(err)
3003	}
3004	if want := []int64{1, 1, 1}; !testEqual(batchCounts, want) {
3005		t.Fatalf("got %d, want %d", batchCounts, want)
3006	}
3007	if updateCount != 1 {
3008		t.Fatalf("got %v, want 1", updateCount)
3009	}
3010}
3011
3012func TestIntegration_BatchDML_Error(t *testing.T) {
3013	t.Parallel()
3014
3015	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
3016	defer cancel()
3017	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
3018	defer cleanup()
3019
3020	columns := []string{"SingerId", "FirstName", "LastName"}
3021
3022	// Populate the Singers table.
3023	var muts []*Mutation
3024	for _, row := range [][]interface{}{
3025		{1, "Umm", "Kulthum"},
3026		{2, "Eduard", "Khil"},
3027		{3, "Audra", "McDonald"},
3028	} {
3029		muts = append(muts, Insert("Singers", columns, row))
3030	}
3031	if _, err := client.Apply(ctx, muts); err != nil {
3032		t.Fatal(err)
3033	}
3034
3035	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
3036		counts, err := tx.BatchUpdate(ctx, []Statement{
3037			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`},
3038			{SQL: `some illegal statement`},
3039			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`},
3040		})
3041		if err == nil {
3042			t.Fatal("expected err, got nil")
3043		}
3044		// The statement may also have been aborted by Spanner, and in that
3045		// case we should just let the client library retry the transaction.
3046		if status.Code(err) == codes.Aborted {
3047			return err
3048		}
3049		if want := []int64{1}; !testEqual(counts, want) {
3050			t.Fatalf("got %d, want %d", counts, want)
3051		}
3052
3053		got, err := readAll(tx.Read(ctx, "Singers", AllKeys(), columns))
3054		// Aborted error is ok, just retry the transaction.
3055		if status.Code(err) == codes.Aborted {
3056			return err
3057		}
3058		if err != nil {
3059			t.Fatal(err)
3060		}
3061		want := [][]interface{}{
3062			{int64(1), "changed 1", "Kulthum"},
3063			{int64(2), "Eduard", "Khil"},
3064			{int64(3), "Audra", "McDonald"},
3065		}
3066		if !testEqual(got, want) {
3067			t.Errorf("\ngot %v\nwant%v", got, want)
3068		}
3069
3070		return nil
3071	})
3072	if err != nil {
3073		t.Fatal(err)
3074	}
3075}
3076
3077func TestIntegration_StartBackupOperation(t *testing.T) {
3078	skipEmulatorTest(t)
3079	t.Parallel()
3080
3081	// Backups can be slow, so use a 30 minute timeout.
3082	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
3083	defer cancel()
3084	_, testDatabaseName, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, backuDBStatements)
3085	defer cleanup()
3086
3087	backupID := backupIDSpace.New()
3088	backupName := fmt.Sprintf("projects/%s/instances/%s/backups/%s", testProjectID, testInstanceID, backupID)
3089	// Minimum expiry time is 6 hours
3090	expires := time.Now().Add(time.Hour * 7)
3091	respLRO, err := databaseAdmin.StartBackupOperation(ctx, backupID, testDatabaseName, expires)
3092	if err != nil {
3093		t.Fatal(err)
3094	}
3095
3096	_, err = respLRO.Wait(ctx)
3097	if err != nil {
3098		t.Fatal(err)
3099	}
3100	respMetadata, err := respLRO.Metadata()
3101	if err != nil {
3102		t.Fatalf("backup response metadata, got error %v, want nil", err)
3103	}
3104	if respMetadata.Database != testDatabaseName {
3105		t.Fatalf("backup database name, got %s, want %s", respMetadata.Database, testDatabaseName)
3106	}
3107	if respMetadata.Progress.ProgressPercent != 100 {
3108		t.Fatalf("backup progress percent, got %d, want 100", respMetadata.Progress.ProgressPercent)
3109	}
3110	respCheck, err := databaseAdmin.GetBackup(ctx, &adminpb.GetBackupRequest{Name: backupName})
3111	if err != nil {
3112		t.Fatalf("backup metadata, got error %v, want nil", err)
3113	}
3114	if respCheck.CreateTime == nil {
3115		t.Fatal("backup create time, got nil, want non-nil")
3116	}
3117	if respCheck.State != adminpb.Backup_READY {
3118		t.Fatalf("backup state, got %v, want %v", respCheck.State, adminpb.Backup_READY)
3119	}
3120	if respCheck.SizeBytes == 0 {
3121		t.Fatalf("backup size, got %d, want non-zero", respCheck.SizeBytes)
3122	}
3123}
3124
3125// TestIntegration_DirectPathFallback tests the CFE fallback when the directpath net is blackholed.
3126func TestIntegration_DirectPathFallback(t *testing.T) {
3127	t.Parallel()
3128	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
3129	defer cancel()
3130	// Set up testing environment.
3131	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, readDBStatements)
3132	defer cleanup()
3133
3134	if !dpConfig.attemptDirectPath {
3135		return
3136	}
3137	if len(blackholeDpv6Cmd) == 0 {
3138		t.Fatal("-it.blackhole-dpv6-cmd unset")
3139	}
3140	if len(blackholeDpv4Cmd) == 0 {
3141		t.Fatal("-it.blackhole-dpv4-cmd unset")
3142	}
3143	if len(allowDpv6Cmd) == 0 {
3144		t.Fatal("-it.allowdpv6-cmd unset")
3145	}
3146	if len(allowDpv4Cmd) == 0 {
3147		t.Fatal("-it.allowdpv4-cmd unset")
3148	}
3149
3150	// Precondition: wait for DirectPath to connect.
3151	countEnough := examineTraffic(ctx, client /*blackholeDP = */, false)
3152	if !countEnough {
3153		t.Fatalf("Failed to observe RPCs over DirectPath")
3154	}
3155
3156	// Enable the blackhole, which will prevent communication with grpclb and thus DirectPath.
3157	blackholeOrAllowDirectPath(t /*blackholeDP = */, true)
3158	countEnough = examineTraffic(ctx, client /*blackholeDP = */, true)
3159	if !countEnough {
3160		t.Fatalf("Failed to fallback to CFE after blackhole DirectPath")
3161	}
3162
3163	// Disable the blackhole, and client should use DirectPath again.
3164	blackholeOrAllowDirectPath(t /*blackholeDP = */, false)
3165	countEnough = examineTraffic(ctx, client /*blackholeDP = */, false)
3166	if !countEnough {
3167		t.Fatalf("Failed to fallback to CFE after blackhole DirectPath")
3168	}
3169}
3170
3171// Prepare initializes Cloud Spanner testing DB and clients.
3172func prepareIntegrationTest(ctx context.Context, t *testing.T, spc SessionPoolConfig, statements []string) (*Client, string, func()) {
3173	if databaseAdmin == nil {
3174		t.Skip("Integration tests skipped")
3175	}
3176	// Construct a unique test DB name.
3177	dbName := dbNameSpace.New()
3178
3179	dbPath := fmt.Sprintf("projects/%v/instances/%v/databases/%v", testProjectID, testInstanceID, dbName)
3180	// Create database and tables.
3181	op, err := databaseAdmin.CreateDatabaseWithRetry(ctx, &adminpb.CreateDatabaseRequest{
3182		Parent:          fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID),
3183		CreateStatement: "CREATE DATABASE " + dbName,
3184		ExtraStatements: statements,
3185	})
3186	if err != nil {
3187		t.Fatalf("cannot create testing DB %v: %v", dbPath, err)
3188	}
3189	if _, err := op.Wait(ctx); err != nil {
3190		t.Fatalf("cannot create testing DB %v: %v", dbPath, err)
3191	}
3192	client, err := createClient(ctx, dbPath, spc)
3193	if err != nil {
3194		t.Fatalf("cannot create data client on DB %v: %v", dbPath, err)
3195	}
3196	return client, dbPath, func() {
3197		client.Close()
3198	}
3199}
3200
3201func cleanupInstances() {
3202	if instanceAdmin == nil {
3203		// Integration tests skipped.
3204		return
3205	}
3206
3207	ctx := context.Background()
3208	parent := fmt.Sprintf("projects/%v", testProjectID)
3209	iter := instanceAdmin.ListInstances(ctx, &instancepb.ListInstancesRequest{
3210		Parent: parent,
3211		Filter: "name:gotest-",
3212	})
3213	expireAge := 24 * time.Hour
3214
3215	for {
3216		inst, err := iter.Next()
3217		if err == iterator.Done {
3218			break
3219		}
3220		if err != nil {
3221			panic(err)
3222		}
3223
3224		_, instID, err := parseInstanceName(inst.Name)
3225		if err != nil {
3226			log.Printf("Error: %v\n", err)
3227			continue
3228		}
3229		if instanceNameSpace.Older(instID, expireAge) {
3230			log.Printf("Deleting instance %s", inst.Name)
3231			deleteInstanceAndBackups(ctx, inst.Name)
3232		}
3233	}
3234}
3235
3236func deleteInstanceAndBackups(ctx context.Context, instanceName string) {
3237	// First delete any lingering backups that might have been left on
3238	// the instance.
3239	backups := databaseAdmin.ListBackups(ctx, &adminpb.ListBackupsRequest{Parent: instanceName})
3240	for {
3241		backup, err := backups.Next()
3242		if err == iterator.Done {
3243			break
3244		}
3245		if err != nil {
3246			log.Printf("failed to retrieve backups from instance %s because of error %v", instanceName, err)
3247			break
3248		}
3249		if err := databaseAdmin.DeleteBackup(ctx, &adminpb.DeleteBackupRequest{Name: backup.Name}); err != nil {
3250			log.Printf("failed to delete backup %s (error %v)", backup.Name, err)
3251		}
3252	}
3253
3254	if err := instanceAdmin.DeleteInstance(ctx, &instancepb.DeleteInstanceRequest{Name: instanceName}); err != nil {
3255		log.Printf("failed to delete instance %s (error %v), might need a manual removal",
3256			instanceName, err)
3257	}
3258}
3259
3260func rangeReads(ctx context.Context, t *testing.T, client *Client) {
3261	checkRange := func(ks KeySet, wantNums ...int) {
3262		if msg, ok := compareRows(client.Single().Read(ctx, testTable, ks, testTableColumns), wantNums); !ok {
3263			t.Errorf("key set %+v: %s", ks, msg)
3264		}
3265	}
3266
3267	checkRange(Key{"k1"}, 1)
3268	checkRange(KeyRange{Key{"k3"}, Key{"k5"}, ClosedOpen}, 3, 4)
3269	checkRange(KeyRange{Key{"k3"}, Key{"k5"}, ClosedClosed}, 3, 4, 5)
3270	checkRange(KeyRange{Key{"k3"}, Key{"k5"}, OpenClosed}, 4, 5)
3271	checkRange(KeyRange{Key{"k3"}, Key{"k5"}, OpenOpen}, 4)
3272
3273	// Partial key specification.
3274	checkRange(KeyRange{Key{"k7"}, Key{}, ClosedClosed}, 7, 8, 9)
3275	checkRange(KeyRange{Key{"k7"}, Key{}, OpenClosed}, 8, 9)
3276	checkRange(KeyRange{Key{}, Key{"k11"}, ClosedOpen}, 0, 1, 10)
3277	checkRange(KeyRange{Key{}, Key{"k11"}, ClosedClosed}, 0, 1, 10, 11)
3278
3279	// The following produce empty ranges.
3280	// TODO(jba): Consider a multi-part key to illustrate partial key behavior.
3281	// checkRange(KeyRange{Key{"k7"}, Key{}, ClosedOpen})
3282	// checkRange(KeyRange{Key{"k7"}, Key{}, OpenOpen})
3283	// checkRange(KeyRange{Key{}, Key{"k11"}, OpenOpen})
3284	// checkRange(KeyRange{Key{}, Key{"k11"}, OpenClosed})
3285
3286	// Prefix is component-wise, not string prefix.
3287	checkRange(Key{"k1"}.AsPrefix(), 1)
3288	checkRange(KeyRange{Key{"k1"}, Key{"k2"}, ClosedOpen}, 1, 10, 11, 12, 13, 14)
3289
3290	checkRange(AllKeys(), 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14)
3291}
3292
3293func indexRangeReads(ctx context.Context, t *testing.T, client *Client) {
3294	checkRange := func(ks KeySet, wantNums ...int) {
3295		if msg, ok := compareRows(client.Single().ReadUsingIndex(ctx, testTable, testTableIndex, ks, testTableColumns),
3296			wantNums); !ok {
3297			t.Errorf("key set %+v: %s", ks, msg)
3298		}
3299	}
3300
3301	checkRange(Key{"v1"}, 1)
3302	checkRange(KeyRange{Key{"v3"}, Key{"v5"}, ClosedOpen}, 3, 4)
3303	checkRange(KeyRange{Key{"v3"}, Key{"v5"}, ClosedClosed}, 3, 4, 5)
3304	checkRange(KeyRange{Key{"v3"}, Key{"v5"}, OpenClosed}, 4, 5)
3305	checkRange(KeyRange{Key{"v3"}, Key{"v5"}, OpenOpen}, 4)
3306
3307	// // Partial key specification.
3308	checkRange(KeyRange{Key{"v7"}, Key{}, ClosedClosed}, 7, 8, 9)
3309	checkRange(KeyRange{Key{"v7"}, Key{}, OpenClosed}, 8, 9)
3310	checkRange(KeyRange{Key{}, Key{"v11"}, ClosedOpen}, 0, 1, 10)
3311	checkRange(KeyRange{Key{}, Key{"v11"}, ClosedClosed}, 0, 1, 10, 11)
3312
3313	// // The following produce empty ranges.
3314	// checkRange(KeyRange{Key{"v7"}, Key{}, ClosedOpen})
3315	// checkRange(KeyRange{Key{"v7"}, Key{}, OpenOpen})
3316	// checkRange(KeyRange{Key{}, Key{"v11"}, OpenOpen})
3317	// checkRange(KeyRange{Key{}, Key{"v11"}, OpenClosed})
3318
3319	// // Prefix is component-wise, not string prefix.
3320	checkRange(Key{"v1"}.AsPrefix(), 1)
3321	checkRange(KeyRange{Key{"v1"}, Key{"v2"}, ClosedOpen}, 1, 10, 11, 12, 13, 14)
3322	checkRange(AllKeys(), 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14)
3323
3324	// Read from an index with DESC ordering.
3325	wantNums := []int{14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0}
3326	if msg, ok := compareRows(client.Single().ReadUsingIndex(ctx, testTable, "TestTableByValueDesc", AllKeys(), testTableColumns),
3327		wantNums); !ok {
3328		t.Errorf("desc: %s", msg)
3329	}
3330}
3331
// testTableRow is the Go shape of one row read from the test table; rows are
// decoded into it with Row.ToStruct.
type testTableRow struct {
	Key         string
	StringValue string
}
3333
3334func compareRows(iter *RowIterator, wantNums []int) (string, bool) {
3335	rows, err := readAllTestTable(iter)
3336	if err != nil {
3337		return err.Error(), false
3338	}
3339	want := map[string]string{}
3340	for _, n := range wantNums {
3341		want[fmt.Sprintf("k%d", n)] = fmt.Sprintf("v%d", n)
3342	}
3343	got := map[string]string{}
3344	for _, r := range rows {
3345		got[r.Key] = r.StringValue
3346	}
3347	if !testEqual(got, want) {
3348		return fmt.Sprintf("got %v, want %v", got, want), false
3349	}
3350	return "", true
3351}
3352
// isNaN reports whether x holds a float64 whose value is NaN. Any other
// dynamic type, including nil and float32, yields false.
func isNaN(x interface{}) bool {
	if f, isFloat := x.(float64); isFloat {
		return math.IsNaN(f)
	}
	return false
}
3360
3361// createClient creates Cloud Spanner data client.
3362func createClient(ctx context.Context, dbPath string, spc SessionPoolConfig) (client *Client, err error) {
3363	opts := grpcHeaderChecker.CallOptions()
3364	if spannerHost != "" {
3365		opts = append(opts, option.WithEndpoint(spannerHost))
3366	}
3367	if dpConfig.attemptDirectPath {
3368		opts = append(opts, option.WithGRPCDialOption(grpc.WithDefaultCallOptions(grpc.Peer(peerInfo))))
3369	}
3370	client, err = NewClientWithConfig(ctx, dbPath, ClientConfig{SessionPoolConfig: spc}, opts...)
3371	if err != nil {
3372		return nil, fmt.Errorf("cannot create data client on DB %v: %v", dbPath, err)
3373	}
3374	return client, nil
3375}
3376
3377// populate prepares the database with some data.
3378func populate(ctx context.Context, client *Client) error {
3379	// Populate data
3380	var err error
3381	m := InsertMap("test", map[string]interface{}{
3382		"a": str1,
3383		"b": str2,
3384	})
3385	_, err = client.Apply(ctx, []*Mutation{m})
3386	return err
3387}
3388
3389func matchError(got error, wantCode codes.Code, wantMsgPart string) (string, bool) {
3390	if ErrCode(got) != wantCode || !strings.Contains(strings.ToLower(ErrDesc(got)), strings.ToLower(wantMsgPart)) {
3391		return fmt.Sprintf("got error <%v>\n"+`want <code = %q, "...%s...">`, got, wantCode, wantMsgPart), false
3392	}
3393	return "", true
3394}
3395
3396func rowToValues(r *Row) ([]interface{}, error) {
3397	var x int64
3398	var y, z string
3399	if err := r.Column(0, &x); err != nil {
3400		return nil, err
3401	}
3402	if err := r.Column(1, &y); err != nil {
3403		return nil, err
3404	}
3405	if err := r.Column(2, &z); err != nil {
3406		return nil, err
3407	}
3408	return []interface{}{x, y, z}, nil
3409}
3410
3411func readAll(iter *RowIterator) ([][]interface{}, error) {
3412	defer iter.Stop()
3413	var vals [][]interface{}
3414	for {
3415		row, err := iter.Next()
3416		if err == iterator.Done {
3417			return vals, nil
3418		}
3419		if err != nil {
3420			return nil, err
3421		}
3422		v, err := rowToValues(row)
3423		if err != nil {
3424			return nil, err
3425		}
3426		vals = append(vals, v)
3427	}
3428}
3429
3430func readAllTestTable(iter *RowIterator) ([]testTableRow, error) {
3431	defer iter.Stop()
3432	var vals []testTableRow
3433	for {
3434		row, err := iter.Next()
3435		if err == iterator.Done {
3436			if iter.Metadata == nil {
3437				// All queries should always return metadata, regardless whether
3438				// they return any rows or not.
3439				return nil, errors.New("missing metadata from query")
3440			}
3441			return vals, nil
3442		}
3443		if err != nil {
3444			return nil, err
3445		}
3446		var ttr testTableRow
3447		if err := row.ToStruct(&ttr); err != nil {
3448			return nil, err
3449		}
3450		vals = append(vals, ttr)
3451	}
3452}
3453
3454func readAllAccountsTable(iter *RowIterator) ([][]interface{}, error) {
3455	defer iter.Stop()
3456	var vals [][]interface{}
3457	for {
3458		row, err := iter.Next()
3459		if err == iterator.Done {
3460			return vals, nil
3461		}
3462		if err != nil {
3463			return nil, err
3464		}
3465		var id, balance int64
3466		var nickname string
3467		err = row.Columns(&id, &nickname, &balance)
3468		if err != nil {
3469			return nil, err
3470		}
3471		vals = append(vals, []interface{}{id, nickname, balance})
3472	}
3473}
3474
// maxDuration returns the larger of the two durations.
func maxDuration(a, b time.Duration) time.Duration {
	if b >= a {
		return b
	}
	return a
}
3481
// isEmulatorEnvSet reports whether the SPANNER_EMULATOR_HOST environment
// variable has a non-empty value.
func isEmulatorEnvSet() bool {
	host := os.Getenv("SPANNER_EMULATOR_HOST")
	return len(host) > 0
}
3485
3486func skipEmulatorTest(t *testing.T) {
3487	if isEmulatorEnvSet() {
3488		t.Skip("Skipping testing against the emulator.")
3489	}
3490}
3491
3492func verifyDirectPathRemoteAddress(t *testing.T) {
3493	t.Helper()
3494	if !dpConfig.attemptDirectPath {
3495		return
3496	}
3497	if remoteIP, res := isDirectPathRemoteAddress(); !res {
3498		if dpConfig.directPathIPv4Only {
3499			t.Fatalf("Expect to access DirectPath via ipv4 only, but RPC was destined to %s", remoteIP)
3500		} else {
3501			t.Fatalf("Expect to access DirectPath via ipv4 or ipv6, but RPC was destined to %s", remoteIP)
3502		}
3503	}
3504}
3505
3506func isDirectPathRemoteAddress() (_ string, _ bool) {
3507	remoteIP := peerInfo.Addr.String()
3508	// DirectPath ipv4-only can only use ipv4 traffic.
3509	if dpConfig.directPathIPv4Only {
3510		return remoteIP, strings.HasPrefix(remoteIP, directPathIPV4Prefix)
3511	}
3512	// DirectPath ipv6 can use either ipv4 or ipv6 traffic.
3513	return remoteIP, strings.HasPrefix(remoteIP, directPathIPV4Prefix) || strings.HasPrefix(remoteIP, directPathIPV6Prefix)
3514}
3515
3516// examineTraffic counts RPCs use DirectPath or CFE traffic.
3517func examineTraffic(ctx context.Context, client *Client, expectDP bool) bool {
3518	var numCount uint64
3519	const (
3520		numRPCsToSend  = 20
3521		minCompleteRPC = 40
3522	)
3523	countEnough := false
3524	start := time.Now()
3525	for !countEnough && time.Since(start) < 2*time.Minute {
3526		for i := 0; i < numRPCsToSend; i++ {
3527			_, _ = readAllTestTable(client.Single().Read(ctx, testTable, Key{"k1"}, testTableColumns))
3528			if _, useDP := isDirectPathRemoteAddress(); useDP != expectDP {
3529				numCount++
3530			}
3531			time.Sleep(100 * time.Millisecond)
3532			countEnough = numCount >= minCompleteRPC
3533		}
3534	}
3535	return countEnough
3536}
3537
3538func blackholeOrAllowDirectPath(t *testing.T, blackholeDP bool) {
3539	if dpConfig.directPathIPv4Only {
3540		if blackholeDP {
3541			cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd)
3542			out, _ := cmdRes.CombinedOutput()
3543			t.Logf(string(out))
3544		} else {
3545			cmdRes := exec.Command("bash", "-c", allowDpv4Cmd)
3546			out, _ := cmdRes.CombinedOutput()
3547			t.Logf(string(out))
3548		}
3549		return
3550	}
3551	// DirectPath supports both ipv4 and ipv6
3552	if blackholeDP {
3553		cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd)
3554		out, _ := cmdRes.CombinedOutput()
3555		t.Logf(string(out))
3556		cmdRes = exec.Command("bash", "-c", blackholeDpv6Cmd)
3557		out, _ = cmdRes.CombinedOutput()
3558		t.Logf(string(out))
3559	} else {
3560		cmdRes := exec.Command("bash", "-c", allowDpv4Cmd)
3561		out, _ := cmdRes.CombinedOutput()
3562		t.Logf(string(out))
3563		cmdRes = exec.Command("bash", "-c", allowDpv6Cmd)
3564		out, _ = cmdRes.CombinedOutput()
3565		t.Logf(string(out))
3566	}
3567}
3568