1/*
2Copyright 2019 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package bigtable
18
19import (
20	"context"
21	"flag"
22	"fmt"
23	"math"
24	"math/rand"
25	"os/exec"
26	"reflect"
27	"sort"
28	"strings"
29	"sync"
30	"testing"
31	"time"
32
33	"cloud.google.com/go/iam"
34	"cloud.google.com/go/internal"
35	"cloud.google.com/go/internal/testutil"
36	"cloud.google.com/go/internal/uid"
37	"github.com/golang/protobuf/proto"
38	"github.com/google/go-cmp/cmp"
39	gax "github.com/googleapis/gax-go/v2"
40	"google.golang.org/api/iterator"
41	btapb "google.golang.org/genproto/googleapis/bigtable/admin/v2"
42	v1 "google.golang.org/genproto/googleapis/iam/v1"
43	longrunning "google.golang.org/genproto/googleapis/longrunning"
44	grpc "google.golang.org/grpc"
45	"google.golang.org/protobuf/types/known/emptypb"
46)
47
48const (
49	directPathIPV6Prefix = "[2001:4860:8040"
50	directPathIPV4Prefix = "34.126"
51)
52
53var (
54	presidentsSocialGraph = map[string][]string{
55		"wmckinley":   {"tjefferson"},
56		"gwashington": {"jadams"},
57		"tjefferson":  {"gwashington", "jadams"},
58		"jadams":      {"gwashington", "tjefferson"},
59	}
60
61	tableNameSpace = uid.NewSpace("cbt-test", &uid.Options{Short: true})
62)
63
64func populatePresidentsGraph(table *Table) error {
65	ctx := context.Background()
66	for row, ss := range presidentsSocialGraph {
67		mut := NewMutation()
68		for _, name := range ss {
69			mut.Set("follows", name, 1000, []byte("1"))
70		}
71		if err := table.Apply(ctx, row, mut); err != nil {
72			return fmt.Errorf("Mutating row %q: %v", row, err)
73		}
74	}
75	return nil
76}
77
78var instanceToCreate string
79var instanceToCreateZone string
80var instanceToCreateZone2 string
81var blackholeDpv6Cmd string
82var blackholeDpv4Cmd string
83var allowDpv6Cmd string
84var allowDpv4Cmd string
85
86func init() {
87	// Don't test instance creation by default, as quota is necessary and aborted tests could strand resources.
88	flag.StringVar(&instanceToCreate, "it.instance-to-create", "",
89		"The id of an instance to create, update and delete. Requires sufficient Cloud Bigtable quota. Requires that it.use-prod is true.")
90	flag.StringVar(&instanceToCreateZone, "it.instance-to-create-zone", "us-central1-b",
91		"The zone in which to create the new test instance.")
92	flag.StringVar(&instanceToCreateZone2, "it.instance-to-create-zone2", "us-east1-c",
93		"The zone in which to create a second cluster in the test instance.")
94	// Use sysctl or iptables to blackhole DirectPath IP for fallback tests.
95	flag.StringVar(&blackholeDpv6Cmd, "it.blackhole-dpv6-cmd", "", "Command to make LB and backend addresses blackholed over dpv6")
96	flag.StringVar(&blackholeDpv4Cmd, "it.blackhole-dpv4-cmd", "", "Command to make LB and backend addresses blackholed over dpv4")
97	flag.StringVar(&allowDpv6Cmd, "it.allow-dpv6-cmd", "", "Command to make LB and backend addresses allowed over dpv6")
98	flag.StringVar(&allowDpv4Cmd, "it.allow-dpv4-cmd", "", "Command to make LB and backend addresses allowed over dpv4")
99}
100
101func TestIntegration_ConditionalMutations(t *testing.T) {
102	ctx := context.Background()
103	testEnv, _, _, table, _, cleanup, err := setupIntegration(ctx, t)
104	if err != nil {
105		t.Fatal(err)
106	}
107	defer cleanup()
108
109	if err := populatePresidentsGraph(table); err != nil {
110		t.Fatal(err)
111	}
112
113	// Do a conditional mutation with a complex filter.
114	mutTrue := NewMutation()
115	mutTrue.Set("follows", "wmckinley", 1000, []byte("1"))
116	filter := ChainFilters(ColumnFilter("gwash[iz].*"), ValueFilter("."))
117	mut := NewCondMutation(filter, mutTrue, nil)
118	if err := table.Apply(ctx, "tjefferson", mut); err != nil {
119		t.Fatalf("Conditionally mutating row: %v", err)
120	}
121	verifyDirectPathRemoteAddress(testEnv, t)
122	// Do a second condition mutation with a filter that does not match,
123	// and thus no changes should be made.
124	mutTrue = NewMutation()
125	mutTrue.DeleteRow()
126	filter = ColumnFilter("snoop.dogg")
127	mut = NewCondMutation(filter, mutTrue, nil)
128	if err := table.Apply(ctx, "tjefferson", mut); err != nil {
129		t.Fatalf("Conditionally mutating row: %v", err)
130	}
131	verifyDirectPathRemoteAddress(testEnv, t)
132
133	// Fetch a row.
134	row, err := table.ReadRow(ctx, "jadams")
135	if err != nil {
136		t.Fatalf("Reading a row: %v", err)
137	}
138	verifyDirectPathRemoteAddress(testEnv, t)
139	wantRow := Row{
140		"follows": []ReadItem{
141			{Row: "jadams", Column: "follows:gwashington", Timestamp: 1000, Value: []byte("1")},
142			{Row: "jadams", Column: "follows:tjefferson", Timestamp: 1000, Value: []byte("1")},
143		},
144	}
145	if !testutil.Equal(row, wantRow) {
146		t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow)
147	}
148}
149
150func TestIntegration_PartialReadRows(t *testing.T) {
151	ctx := context.Background()
152	_, _, _, table, _, cleanup, err := setupIntegration(ctx, t)
153	if err != nil {
154		t.Fatal(err)
155	}
156	defer cleanup()
157
158	if err := populatePresidentsGraph(table); err != nil {
159		t.Fatal(err)
160	}
161
162	// Do a scan and stop part way through.
163	// Verify that the ReadRows callback doesn't keep running.
164	stopped := false
165	err = table.ReadRows(ctx, InfiniteRange(""), func(r Row) bool {
166		if r.Key() < "h" {
167			return true
168		}
169		if !stopped {
170			stopped = true
171			return false
172		}
173		t.Fatalf("ReadRows kept scanning to row %q after being told to stop", r.Key())
174		return false
175	})
176	if err != nil {
177		t.Fatalf("Partial ReadRows: %v", err)
178	}
179}
180
181func TestIntegration_ReadRowList(t *testing.T) {
182	ctx := context.Background()
183	_, _, _, table, _, cleanup, err := setupIntegration(ctx, t)
184	if err != nil {
185		t.Fatal(err)
186	}
187	defer cleanup()
188
189	if err := populatePresidentsGraph(table); err != nil {
190		t.Fatal(err)
191	}
192
193	// Read a RowList
194	var elt []string
195	keys := RowList{"wmckinley", "gwashington", "jadams"}
196	want := "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,wmckinley-tjefferson-1"
197	err = table.ReadRows(ctx, keys, func(r Row) bool {
198		for _, ris := range r {
199			for _, ri := range ris {
200				elt = append(elt, formatReadItem(ri))
201			}
202		}
203		return true
204	})
205	if err != nil {
206		t.Fatalf("read RowList: %v", err)
207	}
208
209	if got := strings.Join(elt, ","); got != want {
210		t.Fatalf("bulk read: wrong reads.\n got %q\nwant %q", got, want)
211	}
212}
213
214func TestIntegration_DeleteRow(t *testing.T) {
215	ctx := context.Background()
216	_, _, _, table, _, cleanup, err := setupIntegration(ctx, t)
217	if err != nil {
218		t.Fatal(err)
219	}
220	defer cleanup()
221
222	if err := populatePresidentsGraph(table); err != nil {
223		t.Fatal(err)
224	}
225
226	// Delete a row and check it goes away.
227	mut := NewMutation()
228	mut.DeleteRow()
229	if err := table.Apply(ctx, "wmckinley", mut); err != nil {
230		t.Fatalf("Apply DeleteRow: %v", err)
231	}
232	row, err := table.ReadRow(ctx, "wmckinley")
233	if err != nil {
234		t.Fatalf("Reading a row after DeleteRow: %v", err)
235	}
236	if len(row) != 0 {
237		t.Fatalf("Read non-zero row after DeleteRow: %v", row)
238	}
239}
240
241func TestIntegration_ReadModifyWrite(t *testing.T) {
242	ctx := context.Background()
243	testEnv, _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t)
244	if err != nil {
245		t.Fatal(err)
246	}
247	defer cleanup()
248
249	if err := populatePresidentsGraph(table); err != nil {
250		t.Fatal(err)
251	}
252
253	if err := adminClient.CreateColumnFamily(ctx, tableName, "counter"); err != nil {
254		t.Fatalf("Creating column family: %v", err)
255	}
256
257	appendRMW := func(b []byte) *ReadModifyWrite {
258		rmw := NewReadModifyWrite()
259		rmw.AppendValue("counter", "likes", b)
260		return rmw
261	}
262	incRMW := func(n int64) *ReadModifyWrite {
263		rmw := NewReadModifyWrite()
264		rmw.Increment("counter", "likes", n)
265		return rmw
266	}
267	rmwSeq := []struct {
268		desc string
269		rmw  *ReadModifyWrite
270		want []byte
271	}{
272		{
273			desc: "append #1",
274			rmw:  appendRMW([]byte{0, 0, 0}),
275			want: []byte{0, 0, 0},
276		},
277		{
278			desc: "append #2",
279			rmw:  appendRMW([]byte{0, 0, 0, 0, 17}), // the remaining 40 bits to make a big-endian 17
280			want: []byte{0, 0, 0, 0, 0, 0, 0, 17},
281		},
282		{
283			desc: "increment",
284			rmw:  incRMW(8),
285			want: []byte{0, 0, 0, 0, 0, 0, 0, 25},
286		},
287	}
288	for _, step := range rmwSeq {
289		row, err := table.ApplyReadModifyWrite(ctx, "gwashington", step.rmw)
290		if err != nil {
291			t.Fatalf("ApplyReadModifyWrite %+v: %v", step.rmw, err)
292		}
293		verifyDirectPathRemoteAddress(testEnv, t)
294		// Make sure the modified cell returned by the RMW operation has a timestamp.
295		if row["counter"][0].Timestamp == 0 {
296			t.Fatalf("RMW returned cell timestamp: got %v, want > 0", row["counter"][0].Timestamp)
297		}
298		clearTimestamps(row)
299		wantRow := Row{"counter": []ReadItem{{Row: "gwashington", Column: "counter:likes", Value: step.want}}}
300		if !testutil.Equal(row, wantRow) {
301			t.Fatalf("After %s,\n got %v\nwant %v", step.desc, row, wantRow)
302		}
303	}
304
305	// Check for google-cloud-go/issues/723. RMWs that insert new rows should keep row order sorted in the emulator.
306	_, err = table.ApplyReadModifyWrite(ctx, "issue-723-2", appendRMW([]byte{0}))
307	if err != nil {
308		t.Fatalf("ApplyReadModifyWrite null string: %v", err)
309	}
310	verifyDirectPathRemoteAddress(testEnv, t)
311	_, err = table.ApplyReadModifyWrite(ctx, "issue-723-1", appendRMW([]byte{0}))
312	if err != nil {
313		t.Fatalf("ApplyReadModifyWrite null string: %v", err)
314	}
315	verifyDirectPathRemoteAddress(testEnv, t)
316	// Get only the correct row back on read.
317	r, err := table.ReadRow(ctx, "issue-723-1")
318	if err != nil {
319		t.Fatalf("Reading row: %v", err)
320	}
321	verifyDirectPathRemoteAddress(testEnv, t)
322	if r.Key() != "issue-723-1" {
323		t.Fatalf("ApplyReadModifyWrite: incorrect read after RMW,\n got %v\nwant %v", r.Key(), "issue-723-1")
324	}
325}
326
327func TestIntegration_ArbitraryTimestamps(t *testing.T) {
328	ctx := context.Background()
329	_, _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t)
330	if err != nil {
331		t.Fatal(err)
332	}
333	defer cleanup()
334
335	// Test arbitrary timestamps more thoroughly.
336	if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil {
337		t.Fatalf("Creating column family: %v", err)
338	}
339	const numVersions = 4
340	mut := NewMutation()
341	for i := 1; i < numVersions; i++ {
342		// Timestamps are used in thousands because the server
343		// only permits that granularity.
344		mut.Set("ts", "col", Timestamp(i*1000), []byte(fmt.Sprintf("val-%d", i)))
345		mut.Set("ts", "col2", Timestamp(i*1000), []byte(fmt.Sprintf("val-%d", i)))
346	}
347	if err := table.Apply(ctx, "testrow", mut); err != nil {
348		t.Fatalf("Mutating row: %v", err)
349	}
350	r, err := table.ReadRow(ctx, "testrow")
351	if err != nil {
352		t.Fatalf("Reading row: %v", err)
353	}
354	wantRow := Row{"ts": []ReadItem{
355		// These should be returned in descending timestamp order.
356		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
357		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
358		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
359		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
360		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
361		{Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")},
362	}}
363	if !testutil.Equal(r, wantRow) {
364		t.Fatalf("Cell with multiple versions,\n got %v\nwant %v", r, wantRow)
365	}
366
367	// Do the same read, but filter to the latest two versions.
368	r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(2)))
369	if err != nil {
370		t.Fatalf("Reading row: %v", err)
371	}
372	wantRow = Row{"ts": []ReadItem{
373		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
374		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
375		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
376		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
377	}}
378	if !testutil.Equal(r, wantRow) {
379		t.Fatalf("Cell with multiple versions and LatestNFilter(2),\n got %v\nwant %v", r, wantRow)
380	}
381	// Check cell offset / limit
382	r, err = table.ReadRow(ctx, "testrow", RowFilter(CellsPerRowLimitFilter(3)))
383	if err != nil {
384		t.Fatalf("Reading row: %v", err)
385	}
386	wantRow = Row{"ts": []ReadItem{
387		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
388		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
389		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
390	}}
391	if !testutil.Equal(r, wantRow) {
392		t.Fatalf("Cell with multiple versions and CellsPerRowLimitFilter(3),\n got %v\nwant %v", r, wantRow)
393	}
394	r, err = table.ReadRow(ctx, "testrow", RowFilter(CellsPerRowOffsetFilter(3)))
395	if err != nil {
396		t.Fatalf("Reading row: %v", err)
397	}
398	wantRow = Row{"ts": []ReadItem{
399		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
400		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
401		{Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")},
402	}}
403	if !testutil.Equal(r, wantRow) {
404		t.Fatalf("Cell with multiple versions and CellsPerRowOffsetFilter(3),\n got %v\nwant %v", r, wantRow)
405	}
406	// Check timestamp range filtering (with truncation)
407	r, err = table.ReadRow(ctx, "testrow", RowFilter(TimestampRangeFilterMicros(1001, 3000)))
408	if err != nil {
409		t.Fatalf("Reading row: %v", err)
410	}
411	wantRow = Row{"ts": []ReadItem{
412		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
413		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
414		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
415		{Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")},
416	}}
417	if !testutil.Equal(r, wantRow) {
418		t.Fatalf("Cell with multiple versions and TimestampRangeFilter(1000, 3000),\n got %v\nwant %v", r, wantRow)
419	}
420	r, err = table.ReadRow(ctx, "testrow", RowFilter(TimestampRangeFilterMicros(1000, 0)))
421	if err != nil {
422		t.Fatalf("Reading row: %v", err)
423	}
424	wantRow = Row{"ts": []ReadItem{
425		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
426		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
427		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
428		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
429		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
430		{Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")},
431	}}
432	if !testutil.Equal(r, wantRow) {
433		t.Fatalf("Cell with multiple versions and TimestampRangeFilter(1000, 0),\n got %v\nwant %v", r, wantRow)
434	}
435	// Delete non-existing cells, no such column family in this row
436	// Should not delete anything
437	if err := adminClient.CreateColumnFamily(ctx, tableName, "non-existing"); err != nil {
438		t.Fatalf("Creating column family: %v", err)
439	}
440	mut = NewMutation()
441	mut.DeleteTimestampRange("non-existing", "col", 2000, 3000) // half-open interval
442	if err := table.Apply(ctx, "testrow", mut); err != nil {
443		t.Fatalf("Mutating row: %v", err)
444	}
445	r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(3)))
446	if err != nil {
447		t.Fatalf("Reading row: %v", err)
448	}
449	if !testutil.Equal(r, wantRow) {
450		t.Fatalf("Cell was deleted unexpectly,\n got %v\nwant %v", r, wantRow)
451	}
452	// Delete non-existing cells, no such column in this column family
453	// Should not delete anything
454	mut = NewMutation()
455	mut.DeleteTimestampRange("ts", "non-existing", 2000, 3000) // half-open interval
456	if err := table.Apply(ctx, "testrow", mut); err != nil {
457		t.Fatalf("Mutating row: %v", err)
458	}
459	r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(3)))
460	if err != nil {
461		t.Fatalf("Reading row: %v", err)
462	}
463	if !testutil.Equal(r, wantRow) {
464		t.Fatalf("Cell was deleted unexpectly,\n got %v\nwant %v", r, wantRow)
465	}
466	// Delete the cell with timestamp 2000 and repeat the last read,
467	// checking that we get ts 3000 and ts 1000.
468	mut = NewMutation()
469	mut.DeleteTimestampRange("ts", "col", 2001, 3000) // half-open interval
470	if err := table.Apply(ctx, "testrow", mut); err != nil {
471		t.Fatalf("Mutating row: %v", err)
472	}
473	r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(2)))
474	if err != nil {
475		t.Fatalf("Reading row: %v", err)
476	}
477	wantRow = Row{"ts": []ReadItem{
478		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
479		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
480		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
481		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
482	}}
483	if !testutil.Equal(r, wantRow) {
484		t.Fatalf("Cell with multiple versions and LatestNFilter(2), after deleting timestamp 2000,\n got %v\nwant %v", r, wantRow)
485	}
486
487	// Check DeleteCellsInFamily
488	if err := adminClient.CreateColumnFamily(ctx, tableName, "status"); err != nil {
489		t.Fatalf("Creating column family: %v", err)
490	}
491
492	mut = NewMutation()
493	mut.Set("status", "start", 2000, []byte("2"))
494	mut.Set("status", "end", 3000, []byte("3"))
495	mut.Set("ts", "col", 1000, []byte("1"))
496	if err := table.Apply(ctx, "row1", mut); err != nil {
497		t.Fatalf("Mutating row: %v", err)
498	}
499	if err := table.Apply(ctx, "row2", mut); err != nil {
500		t.Fatalf("Mutating row: %v", err)
501	}
502
503	mut = NewMutation()
504	mut.DeleteCellsInFamily("status")
505	if err := table.Apply(ctx, "row1", mut); err != nil {
506		t.Fatalf("Delete cf: %v", err)
507	}
508
509	// ColumnFamily removed
510	r, err = table.ReadRow(ctx, "row1")
511	if err != nil {
512		t.Fatalf("Reading row: %v", err)
513	}
514	wantRow = Row{"ts": []ReadItem{
515		{Row: "row1", Column: "ts:col", Timestamp: 1000, Value: []byte("1")},
516	}}
517	if !testutil.Equal(r, wantRow) {
518		t.Fatalf("column family was not deleted.\n got %v\n want %v", r, wantRow)
519	}
520
521	// ColumnFamily not removed
522	r, err = table.ReadRow(ctx, "row2")
523	if err != nil {
524		t.Fatalf("Reading row: %v", err)
525	}
526	wantRow = Row{
527		"ts": []ReadItem{
528			{Row: "row2", Column: "ts:col", Timestamp: 1000, Value: []byte("1")},
529		},
530		"status": []ReadItem{
531			{Row: "row2", Column: "status:end", Timestamp: 3000, Value: []byte("3")},
532			{Row: "row2", Column: "status:start", Timestamp: 2000, Value: []byte("2")},
533		},
534	}
535	if !testutil.Equal(r, wantRow) {
536		t.Fatalf("Column family was deleted unexpectedly.\n got %v\n want %v", r, wantRow)
537	}
538
539	// Check DeleteCellsInColumn
540	mut = NewMutation()
541	mut.Set("status", "start", 2000, []byte("2"))
542	mut.Set("status", "middle", 3000, []byte("3"))
543	mut.Set("status", "end", 1000, []byte("1"))
544	if err := table.Apply(ctx, "row3", mut); err != nil {
545		t.Fatalf("Mutating row: %v", err)
546	}
547	mut = NewMutation()
548	mut.DeleteCellsInColumn("status", "middle")
549	if err := table.Apply(ctx, "row3", mut); err != nil {
550		t.Fatalf("Delete column: %v", err)
551	}
552	r, err = table.ReadRow(ctx, "row3")
553	if err != nil {
554		t.Fatalf("Reading row: %v", err)
555	}
556	wantRow = Row{
557		"status": []ReadItem{
558			{Row: "row3", Column: "status:end", Timestamp: 1000, Value: []byte("1")},
559			{Row: "row3", Column: "status:start", Timestamp: 2000, Value: []byte("2")},
560		},
561	}
562	if !testutil.Equal(r, wantRow) {
563		t.Fatalf("Column was not deleted.\n got %v\n want %v", r, wantRow)
564	}
565	mut = NewMutation()
566	mut.DeleteCellsInColumn("status", "start")
567	if err := table.Apply(ctx, "row3", mut); err != nil {
568		t.Fatalf("Delete column: %v", err)
569	}
570	r, err = table.ReadRow(ctx, "row3")
571	if err != nil {
572		t.Fatalf("Reading row: %v", err)
573	}
574	wantRow = Row{
575		"status": []ReadItem{
576			{Row: "row3", Column: "status:end", Timestamp: 1000, Value: []byte("1")},
577		},
578	}
579	if !testutil.Equal(r, wantRow) {
580		t.Fatalf("Column was not deleted.\n got %v\n want %v", r, wantRow)
581	}
582	mut = NewMutation()
583	mut.DeleteCellsInColumn("status", "end")
584	if err := table.Apply(ctx, "row3", mut); err != nil {
585		t.Fatalf("Delete column: %v", err)
586	}
587	r, err = table.ReadRow(ctx, "row3")
588	if err != nil {
589		t.Fatalf("Reading row: %v", err)
590	}
591	if len(r) != 0 {
592		t.Fatalf("Delete column: got %v, want empty row", r)
593	}
594	// Add same cell after delete
595	mut = NewMutation()
596	mut.Set("status", "end", 1000, []byte("1"))
597	if err := table.Apply(ctx, "row3", mut); err != nil {
598		t.Fatalf("Mutating row: %v", err)
599	}
600	r, err = table.ReadRow(ctx, "row3")
601	if err != nil {
602		t.Fatalf("Reading row: %v", err)
603	}
604	if !testutil.Equal(r, wantRow) {
605		t.Fatalf("Column was not deleted correctly.\n got %v\n want %v", r, wantRow)
606	}
607}
608
609func TestIntegration_HighlyConcurrentReadsAndWrites(t *testing.T) {
610	ctx := context.Background()
611	_, _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t)
612	if err != nil {
613		t.Fatal(err)
614	}
615	defer cleanup()
616
617	if err := populatePresidentsGraph(table); err != nil {
618		t.Fatal(err)
619	}
620
621	if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil {
622		t.Fatalf("Creating column family: %v", err)
623	}
624
625	// Do highly concurrent reads/writes.
626	const maxConcurrency = 1000
627	var wg sync.WaitGroup
628	for i := 0; i < maxConcurrency; i++ {
629		wg.Add(1)
630		go func() {
631			defer wg.Done()
632			switch r := rand.Intn(100); { // r ∈ [0,100)
633			case 0 <= r && r < 30:
634				// Do a read.
635				_, err := table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(1)))
636				if err != nil {
637					t.Errorf("Concurrent read: %v", err)
638				}
639			case 30 <= r && r < 100:
640				// Do a write.
641				mut := NewMutation()
642				mut.Set("ts", "col", 1000, []byte("data"))
643				if err := table.Apply(ctx, "testrow", mut); err != nil {
644					t.Errorf("Concurrent write: %v", err)
645				}
646			}
647		}()
648	}
649	wg.Wait()
650}
651
652func TestIntegration_LargeReadsWritesAndScans(t *testing.T) {
653	ctx := context.Background()
654	testEnv, _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t)
655	if err != nil {
656		t.Fatal(err)
657	}
658	defer cleanup()
659
660	if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil {
661		t.Fatalf("Creating column family: %v", err)
662	}
663
664	bigBytes := make([]byte, 5<<20) // 5 MB is larger than current default gRPC max of 4 MB, but less than the max we set.
665	nonsense := []byte("lorem ipsum dolor sit amet, ")
666	fill(bigBytes, nonsense)
667	mut := NewMutation()
668	mut.Set("ts", "col", 1000, bigBytes)
669	if err := table.Apply(ctx, "bigrow", mut); err != nil {
670		t.Fatalf("Big write: %v", err)
671	}
672	verifyDirectPathRemoteAddress(testEnv, t)
673	r, err := table.ReadRow(ctx, "bigrow")
674	if err != nil {
675		t.Fatalf("Big read: %v", err)
676	}
677	verifyDirectPathRemoteAddress(testEnv, t)
678	wantRow := Row{"ts": []ReadItem{
679		{Row: "bigrow", Column: "ts:col", Timestamp: 1000, Value: bigBytes},
680	}}
681	if !testutil.Equal(r, wantRow) {
682		t.Fatalf("Big read returned incorrect bytes: %v", r)
683	}
684
685	var wg sync.WaitGroup
686	// Now write 1000 rows, each with 82 KB values, then scan them all.
687	medBytes := make([]byte, 82<<10)
688	fill(medBytes, nonsense)
689	sem := make(chan int, 50) // do up to 50 mutations at a time.
690	for i := 0; i < 1000; i++ {
691		mut := NewMutation()
692		mut.Set("ts", "big-scan", 1000, medBytes)
693		row := fmt.Sprintf("row-%d", i)
694		wg.Add(1)
695		go func() {
696			defer wg.Done()
697			defer func() { <-sem }()
698			sem <- 1
699			if err := table.Apply(ctx, row, mut); err != nil {
700				t.Errorf("Preparing large scan: %v", err)
701			}
702			verifyDirectPathRemoteAddress(testEnv, t)
703		}()
704	}
705	wg.Wait()
706	n := 0
707	err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool {
708		for _, ris := range r {
709			for _, ri := range ris {
710				n += len(ri.Value)
711			}
712		}
713		return true
714	}, RowFilter(ColumnFilter("big-scan")))
715	if err != nil {
716		t.Fatalf("Doing large scan: %v", err)
717	}
718	verifyDirectPathRemoteAddress(testEnv, t)
719	if want := 1000 * len(medBytes); n != want {
720		t.Fatalf("Large scan returned %d bytes, want %d", n, want)
721	}
722	// Scan a subset of the 1000 rows that we just created, using a LimitRows ReadOption.
723	rc := 0
724	wantRc := 3
725	err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool {
726		rc++
727		return true
728	}, LimitRows(int64(wantRc)))
729	if err != nil {
730		t.Fatal(err)
731	}
732	verifyDirectPathRemoteAddress(testEnv, t)
733	if rc != wantRc {
734		t.Fatalf("Scan with row limit returned %d rows, want %d", rc, wantRc)
735	}
736
737	// Test bulk mutations
738	if err := adminClient.CreateColumnFamily(ctx, tableName, "bulk"); err != nil {
739		t.Fatalf("Creating column family: %v", err)
740	}
741	bulkData := map[string][]string{
742		"red sox":  {"2004", "2007", "2013"},
743		"patriots": {"2001", "2003", "2004", "2014"},
744		"celtics":  {"1981", "1984", "1986", "2008"},
745	}
746	var rowKeys []string
747	var muts []*Mutation
748	for row, ss := range bulkData {
749		mut := NewMutation()
750		for _, name := range ss {
751			mut.Set("bulk", name, 1000, []byte("1"))
752		}
753		rowKeys = append(rowKeys, row)
754		muts = append(muts, mut)
755	}
756	status, err := table.ApplyBulk(ctx, rowKeys, muts)
757	if err != nil {
758		t.Fatalf("Bulk mutating rows %q: %v", rowKeys, err)
759	}
760	verifyDirectPathRemoteAddress(testEnv, t)
761	if status != nil {
762		t.Fatalf("non-nil errors: %v", err)
763	}
764
765	// Read each row back
766	for rowKey, ss := range bulkData {
767		row, err := table.ReadRow(ctx, rowKey)
768		if err != nil {
769			t.Fatalf("Reading a bulk row: %v", err)
770		}
771		verifyDirectPathRemoteAddress(testEnv, t)
772		var wantItems []ReadItem
773		for _, val := range ss {
774			wantItems = append(wantItems, ReadItem{Row: rowKey, Column: "bulk:" + val, Timestamp: 1000, Value: []byte("1")})
775		}
776		wantRow := Row{"bulk": wantItems}
777		if !testutil.Equal(row, wantRow) {
778			t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow)
779		}
780	}
781
782	// Test bulk write errors.
783	// Note: Setting timestamps as ServerTime makes sure the mutations are not retried on error.
784	badMut := NewMutation()
785	badMut.Set("badfamily", "col", ServerTime, nil)
786	badMut2 := NewMutation()
787	badMut2.Set("badfamily2", "goodcol", ServerTime, []byte("1"))
788	status, err = table.ApplyBulk(ctx, []string{"badrow", "badrow2"}, []*Mutation{badMut, badMut2})
789	if err != nil {
790		t.Fatalf("Bulk mutating rows %q: %v", rowKeys, err)
791	}
792	verifyDirectPathRemoteAddress(testEnv, t)
793	if status == nil {
794		t.Fatalf("No errors for bad bulk mutation")
795	} else if status[0] == nil || status[1] == nil {
796		t.Fatalf("No error for bad bulk mutation")
797	}
798}
799
800func TestIntegration_Read(t *testing.T) {
801	ctx := context.Background()
802	testEnv, _, _, table, _, cleanup, err := setupIntegration(ctx, t)
803	if err != nil {
804		t.Fatal(err)
805	}
806	defer cleanup()
807
808	// Insert some data.
809	initialData := map[string][]string{
810		"wmckinley":   {"tjefferson"},
811		"gwashington": {"jadams"},
812		"tjefferson":  {"gwashington", "jadams", "wmckinley"},
813		"jadams":      {"gwashington", "tjefferson"},
814	}
815	for row, ss := range initialData {
816		mut := NewMutation()
817		for _, name := range ss {
818			mut.Set("follows", name, 1000, []byte("1"))
819		}
820		if err := table.Apply(ctx, row, mut); err != nil {
821			t.Fatalf("Mutating row %q: %v", row, err)
822		}
823		verifyDirectPathRemoteAddress(testEnv, t)
824	}
825
826	for _, test := range []struct {
827		desc   string
828		rr     RowSet
829		filter Filter     // may be nil
830		limit  ReadOption // may be nil
831
832		// We do the read, grab all the cells, turn them into "<row>-<col>-<val>",
833		// and join with a comma.
834		want       string
835		wantLabels []string
836	}{
837		{
838			desc: "read all, unfiltered",
839			rr:   RowRange{},
840			want: "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,tjefferson-gwashington-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1",
841		},
842		{
843			desc: "read with InfiniteRange, unfiltered",
844			rr:   InfiniteRange("tjefferson"),
845			want: "tjefferson-gwashington-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1",
846		},
847		{
848			desc: "read with NewRange, unfiltered",
849			rr:   NewRange("gargamel", "hubbard"),
850			want: "gwashington-jadams-1",
851		},
852		{
853			desc: "read with PrefixRange, unfiltered",
854			rr:   PrefixRange("jad"),
855			want: "jadams-gwashington-1,jadams-tjefferson-1",
856		},
857		{
858			desc: "read with SingleRow, unfiltered",
859			rr:   SingleRow("wmckinley"),
860			want: "wmckinley-tjefferson-1",
861		},
862		{
863			desc:   "read all, with ColumnFilter",
864			rr:     RowRange{},
865			filter: ColumnFilter(".*j.*"), // matches "jadams" and "tjefferson"
866			want:   "gwashington-jadams-1,jadams-tjefferson-1,tjefferson-jadams-1,wmckinley-tjefferson-1",
867		},
868		{
869			desc:   "read all, with ColumnFilter, prefix",
870			rr:     RowRange{},
871			filter: ColumnFilter("j"), // no matches
872			want:   "",
873		},
874		{
875			desc:   "read range, with ColumnRangeFilter",
876			rr:     RowRange{},
877			filter: ColumnRangeFilter("follows", "h", "k"),
878			want:   "gwashington-jadams-1,tjefferson-jadams-1",
879		},
880		{
881			desc:   "read range from empty, with ColumnRangeFilter",
882			rr:     RowRange{},
883			filter: ColumnRangeFilter("follows", "", "u"),
884			want:   "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,tjefferson-gwashington-1,tjefferson-jadams-1,wmckinley-tjefferson-1",
885		},
886		{
887			desc:   "read range from start to empty, with ColumnRangeFilter",
888			rr:     RowRange{},
889			filter: ColumnRangeFilter("follows", "h", ""),
890			want:   "gwashington-jadams-1,jadams-tjefferson-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1",
891		},
892		{
893			desc:   "read with RowKeyFilter",
894			rr:     RowRange{},
895			filter: RowKeyFilter(".*wash.*"),
896			want:   "gwashington-jadams-1",
897		},
898		{
899			desc:   "read with RowKeyFilter, prefix",
900			rr:     RowRange{},
901			filter: RowKeyFilter("gwash"),
902			want:   "",
903		},
904		{
905			desc:   "read with RowKeyFilter, no matches",
906			rr:     RowRange{},
907			filter: RowKeyFilter(".*xxx.*"),
908			want:   "",
909		},
910		{
911			desc:   "read with FamilyFilter, no matches",
912			rr:     RowRange{},
913			filter: FamilyFilter(".*xxx.*"),
914			want:   "",
915		},
916		{
917			desc:   "read with ColumnFilter + row limit",
918			rr:     RowRange{},
919			filter: ColumnFilter(".*j.*"), // matches "jadams" and "tjefferson"
920			limit:  LimitRows(2),
921			want:   "gwashington-jadams-1,jadams-tjefferson-1",
922		},
923		{
924			desc:       "apply labels to the result rows",
925			rr:         RowRange{},
926			filter:     LabelFilter("test-label"),
927			limit:      LimitRows(2),
928			want:       "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1",
929			wantLabels: []string{"test-label", "test-label", "test-label"},
930		},
931		{
932			desc:   "read all, strip values",
933			rr:     RowRange{},
934			filter: StripValueFilter(),
935			want:   "gwashington-jadams-,jadams-gwashington-,jadams-tjefferson-,tjefferson-gwashington-,tjefferson-jadams-,tjefferson-wmckinley-,wmckinley-tjefferson-",
936		},
937		{
938			desc:   "read with ColumnFilter + row limit + strip values",
939			rr:     RowRange{},
940			filter: ChainFilters(ColumnFilter(".*j.*"), StripValueFilter()), // matches "jadams" and "tjefferson"
941			limit:  LimitRows(2),
942			want:   "gwashington-jadams-,jadams-tjefferson-",
943		},
944		{
945			desc:   "read with condition, strip values on true",
946			rr:     RowRange{},
947			filter: ConditionFilter(ColumnFilter(".*j.*"), StripValueFilter(), nil),
948			want:   "gwashington-jadams-,jadams-gwashington-,jadams-tjefferson-,tjefferson-gwashington-,tjefferson-jadams-,tjefferson-wmckinley-,wmckinley-tjefferson-",
949		},
950		{
951			desc:   "read with condition, strip values on false",
952			rr:     RowRange{},
953			filter: ConditionFilter(ColumnFilter(".*xxx.*"), nil, StripValueFilter()),
954			want:   "gwashington-jadams-,jadams-gwashington-,jadams-tjefferson-,tjefferson-gwashington-,tjefferson-jadams-,tjefferson-wmckinley-,wmckinley-tjefferson-",
955		},
956		{
957			desc:   "read with ValueRangeFilter + row limit",
958			rr:     RowRange{},
959			filter: ValueRangeFilter([]byte("1"), []byte("5")), // matches our value of "1"
960			limit:  LimitRows(2),
961			want:   "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1",
962		},
963		{
964			desc:   "read with ValueRangeFilter, no match on exclusive end",
965			rr:     RowRange{},
966			filter: ValueRangeFilter([]byte("0"), []byte("1")), // no match
967			want:   "",
968		},
969		{
970			desc:   "read with ValueRangeFilter, no matches",
971			rr:     RowRange{},
972			filter: ValueRangeFilter([]byte("3"), []byte("5")), // matches nothing
973			want:   "",
974		},
975		{
976			desc:   "read with InterleaveFilter, no matches on all filters",
977			rr:     RowRange{},
978			filter: InterleaveFilters(ColumnFilter(".*x.*"), ColumnFilter(".*z.*")),
979			want:   "",
980		},
981		{
982			desc:   "read with InterleaveFilter, no duplicate cells",
983			rr:     RowRange{},
984			filter: InterleaveFilters(ColumnFilter(".*g.*"), ColumnFilter(".*j.*")),
985			want:   "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,tjefferson-gwashington-1,tjefferson-jadams-1,wmckinley-tjefferson-1",
986		},
987		{
988			desc:   "read with InterleaveFilter, with duplicate cells",
989			rr:     RowRange{},
990			filter: InterleaveFilters(ColumnFilter(".*g.*"), ColumnFilter(".*g.*")),
991			want:   "jadams-gwashington-1,jadams-gwashington-1,tjefferson-gwashington-1,tjefferson-gwashington-1",
992		},
993		{
994			desc: "read with a RowRangeList and no filter",
995			rr:   RowRangeList{NewRange("gargamel", "hubbard"), InfiniteRange("wmckinley")},
996			want: "gwashington-jadams-1,wmckinley-tjefferson-1",
997		},
998		{
999			desc:   "chain that excludes rows and matches nothing, in a condition",
1000			rr:     RowRange{},
1001			filter: ConditionFilter(ChainFilters(ColumnFilter(".*j.*"), ColumnFilter(".*mckinley.*")), StripValueFilter(), nil),
1002			want:   "",
1003		},
1004		{
1005			desc:   "chain that ends with an interleave that has no match. covers #804",
1006			rr:     RowRange{},
1007			filter: ConditionFilter(ChainFilters(ColumnFilter(".*j.*"), InterleaveFilters(ColumnFilter(".*x.*"), ColumnFilter(".*z.*"))), StripValueFilter(), nil),
1008			want:   "",
1009		},
1010	} {
1011		t.Run(test.desc, func(t *testing.T) {
1012			var opts []ReadOption
1013			if test.filter != nil {
1014				opts = append(opts, RowFilter(test.filter))
1015			}
1016			if test.limit != nil {
1017				opts = append(opts, test.limit)
1018			}
1019			var elt, labels []string
1020			err := table.ReadRows(ctx, test.rr, func(r Row) bool {
1021				for _, ris := range r {
1022					for _, ri := range ris {
1023						labels = append(labels, ri.Labels...)
1024						elt = append(elt, formatReadItem(ri))
1025					}
1026				}
1027				return true
1028			}, opts...)
1029			if err != nil {
1030				t.Fatal(err)
1031			}
1032			verifyDirectPathRemoteAddress(testEnv, t)
1033			if got := strings.Join(elt, ","); got != test.want {
1034				t.Fatalf("got %q\nwant %q", got, test.want)
1035			}
1036			if got, want := labels, test.wantLabels; !reflect.DeepEqual(got, want) {
1037				t.Fatalf("got %q\nwant %q", got, want)
1038			}
1039		})
1040	}
1041}
1042
1043func TestIntegration_SampleRowKeys(t *testing.T) {
1044	ctx := context.Background()
1045	testEnv, _, _, table, _, cleanup, err := setupIntegration(ctx, t)
1046	if err != nil {
1047		t.Fatal(err)
1048	}
1049	defer cleanup()
1050
1051	// Insert some data.
1052	initialData := map[string][]string{
1053		"wmckinley11":   {"tjefferson11"},
1054		"gwashington77": {"jadams77"},
1055		"tjefferson0":   {"gwashington0", "jadams0"},
1056	}
1057
1058	for row, ss := range initialData {
1059		mut := NewMutation()
1060		for _, name := range ss {
1061			mut.Set("follows", name, 1000, []byte("1"))
1062		}
1063		if err := table.Apply(ctx, row, mut); err != nil {
1064			t.Fatalf("Mutating row %q: %v", row, err)
1065		}
1066		verifyDirectPathRemoteAddress(testEnv, t)
1067	}
1068	sampleKeys, err := table.SampleRowKeys(context.Background())
1069	if err != nil {
1070		t.Fatalf("%s: %v", "SampleRowKeys:", err)
1071	}
1072	if len(sampleKeys) == 0 {
1073		t.Error("SampleRowKeys length 0")
1074	}
1075}
1076
1077func TestIntegration_Admin(t *testing.T) {
1078	testEnv, err := NewIntegrationEnv()
1079	if err != nil {
1080		t.Fatalf("IntegrationEnv: %v", err)
1081	}
1082	defer testEnv.Close()
1083
1084	timeout := 2 * time.Second
1085	if testEnv.Config().UseProd {
1086		timeout = 5 * time.Minute
1087	}
1088	ctx, _ := context.WithTimeout(context.Background(), timeout)
1089
1090	adminClient, err := testEnv.NewAdminClient()
1091	if err != nil {
1092		t.Fatalf("NewAdminClient: %v", err)
1093	}
1094	defer adminClient.Close()
1095
1096	iAdminClient, err := testEnv.NewInstanceAdminClient()
1097	if err != nil {
1098		t.Fatalf("NewInstanceAdminClient: %v", err)
1099	}
1100	if iAdminClient != nil {
1101		defer iAdminClient.Close()
1102
1103		iInfo, err := iAdminClient.InstanceInfo(ctx, adminClient.instance)
1104		if err != nil {
1105			t.Errorf("InstanceInfo: %v", err)
1106		}
1107		if iInfo.Name != adminClient.instance {
1108			t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance)
1109		}
1110	}
1111
1112	list := func() []string {
1113		tbls, err := adminClient.Tables(ctx)
1114		if err != nil {
1115			t.Fatalf("Fetching list of tables: %v", err)
1116		}
1117		sort.Strings(tbls)
1118		return tbls
1119	}
1120	containsAll := func(got, want []string) bool {
1121		gotSet := make(map[string]bool)
1122
1123		for _, s := range got {
1124			gotSet[s] = true
1125		}
1126		for _, s := range want {
1127			if !gotSet[s] {
1128				return false
1129			}
1130		}
1131		return true
1132	}
1133
1134	defer deleteTable(ctx, t, adminClient, "mytable")
1135
1136	if err := adminClient.CreateTable(ctx, "mytable"); err != nil {
1137		t.Fatalf("Creating table: %v", err)
1138	}
1139
1140	defer deleteTable(ctx, t, adminClient, "myothertable")
1141
1142	if err := adminClient.CreateTable(ctx, "myothertable"); err != nil {
1143		t.Fatalf("Creating table: %v", err)
1144	}
1145
1146	if got, want := list(), []string{"myothertable", "mytable"}; !containsAll(got, want) {
1147		t.Errorf("adminClient.Tables returned %#v, want %#v", got, want)
1148	}
1149
1150	must(adminClient.WaitForReplication(ctx, "mytable"))
1151
1152	if err := adminClient.DeleteTable(ctx, "myothertable"); err != nil {
1153		t.Fatalf("Deleting table: %v", err)
1154	}
1155	tables := list()
1156	if got, want := tables, []string{"mytable"}; !containsAll(got, want) {
1157		t.Errorf("adminClient.Tables returned %#v, want %#v", got, want)
1158	}
1159	if got, unwanted := tables, []string{"myothertable"}; containsAll(got, unwanted) {
1160		t.Errorf("adminClient.Tables return %#v. unwanted %#v", got, unwanted)
1161	}
1162
1163	tblConf := TableConf{
1164		TableID: "conftable",
1165		Families: map[string]GCPolicy{
1166			"fam1": MaxVersionsPolicy(1),
1167			"fam2": MaxVersionsPolicy(2),
1168		},
1169	}
1170	if err := adminClient.CreateTableFromConf(ctx, &tblConf); err != nil {
1171		t.Fatalf("Creating table from TableConf: %v", err)
1172	}
1173	defer deleteTable(ctx, t, adminClient, tblConf.TableID)
1174
1175	tblInfo, err := adminClient.TableInfo(ctx, tblConf.TableID)
1176	if err != nil {
1177		t.Fatalf("Getting table info: %v", err)
1178	}
1179	sort.Strings(tblInfo.Families)
1180	wantFams := []string{"fam1", "fam2"}
1181	if !testutil.Equal(tblInfo.Families, wantFams) {
1182		t.Errorf("Column family mismatch, got %v, want %v", tblInfo.Families, wantFams)
1183	}
1184
1185	// Populate mytable and drop row ranges
1186	if err = adminClient.CreateColumnFamily(ctx, "mytable", "cf"); err != nil {
1187		t.Fatalf("Creating column family: %v", err)
1188	}
1189
1190	client, err := testEnv.NewClient()
1191	if err != nil {
1192		t.Fatalf("NewClient: %v", err)
1193	}
1194	defer client.Close()
1195
1196	tbl := client.Open("mytable")
1197
1198	prefixes := []string{"a", "b", "c"}
1199	for _, prefix := range prefixes {
1200		for i := 0; i < 5; i++ {
1201			mut := NewMutation()
1202			mut.Set("cf", "col", 1000, []byte("1"))
1203			if err := tbl.Apply(ctx, fmt.Sprintf("%v-%v", prefix, i), mut); err != nil {
1204				t.Fatalf("Mutating row: %v", err)
1205			}
1206		}
1207	}
1208
1209	if err = adminClient.DropRowRange(ctx, "mytable", "a"); err != nil {
1210		t.Errorf("DropRowRange a: %v", err)
1211	}
1212	if err = adminClient.DropRowRange(ctx, "mytable", "c"); err != nil {
1213		t.Errorf("DropRowRange c: %v", err)
1214	}
1215	if err = adminClient.DropRowRange(ctx, "mytable", "x"); err != nil {
1216		t.Errorf("DropRowRange x: %v", err)
1217	}
1218
1219	var gotRowCount int
1220	must(tbl.ReadRows(ctx, RowRange{}, func(row Row) bool {
1221		gotRowCount++
1222		if !strings.HasPrefix(row.Key(), "b") {
1223			t.Errorf("Invalid row after dropping range: %v", row)
1224		}
1225		return true
1226	}))
1227	if gotRowCount != 5 {
1228		t.Errorf("Invalid row count after dropping range: got %v, want %v", gotRowCount, 5)
1229	}
1230
1231	if err = adminClient.DropAllRows(ctx, "mytable"); err != nil {
1232		t.Errorf("DropAllRows mytable: %v", err)
1233	}
1234
1235	gotRowCount = 0
1236	must(tbl.ReadRows(ctx, RowRange{}, func(row Row) bool {
1237		gotRowCount++
1238		return true
1239	}))
1240	if gotRowCount != 0 {
1241		t.Errorf("Invalid row count after truncating table: got %v, want %v", gotRowCount, 0)
1242	}
1243}
1244
1245func TestIntegration_TableIam(t *testing.T) {
1246	testEnv, err := NewIntegrationEnv()
1247	if err != nil {
1248		t.Fatalf("IntegrationEnv: %v", err)
1249	}
1250	defer testEnv.Close()
1251
1252	if !testEnv.Config().UseProd {
1253		t.Skip("emulator doesn't support IAM Policy creation")
1254	}
1255
1256	timeout := 5 * time.Minute
1257	ctx, _ := context.WithTimeout(context.Background(), timeout)
1258
1259	adminClient, err := testEnv.NewAdminClient()
1260	if err != nil {
1261		t.Fatalf("NewAdminClient: %v", err)
1262	}
1263	defer adminClient.Close()
1264
1265	defer deleteTable(ctx, t, adminClient, "mytable")
1266	if err := adminClient.CreateTable(ctx, "mytable"); err != nil {
1267		t.Fatalf("Creating table: %v", err)
1268	}
1269
1270	// Verify that the IAM Controls work for Tables.
1271	iamHandle := adminClient.TableIAM("mytable")
1272	p, err := iamHandle.Policy(ctx)
1273	if err != nil {
1274		t.Errorf("Iam GetPolicy mytable: %v", err)
1275	}
1276	if err = iamHandle.SetPolicy(ctx, p); err != nil {
1277		t.Errorf("Iam SetPolicy mytable: %v", err)
1278	}
1279	if _, err = iamHandle.TestPermissions(ctx, []string{"bigtable.tables.get"}); err != nil {
1280		t.Errorf("Iam TestPermissions mytable: %v", err)
1281	}
1282}
1283
1284func TestIntegration_BackupIAM(t *testing.T) {
1285	testEnv, err := NewIntegrationEnv()
1286	if err != nil {
1287		t.Fatalf("IntegrationEnv: %v", err)
1288	}
1289	defer testEnv.Close()
1290
1291	if !testEnv.Config().UseProd {
1292		t.Skip("emulator doesn't support IAM Policy creation")
1293	}
1294	timeout := 5 * time.Minute
1295	ctx, _ := context.WithTimeout(context.Background(), timeout)
1296
1297	adminClient, err := testEnv.NewAdminClient()
1298	if err != nil {
1299		t.Fatalf("NewAdminClient: %v", err)
1300	}
1301	defer adminClient.Close()
1302
1303	table := testEnv.Config().Table
1304	cluster := testEnv.Config().Cluster
1305
1306	defer deleteTable(ctx, t, adminClient, table)
1307	if err := adminClient.CreateTable(ctx, table); err != nil {
1308		t.Fatalf("Creating table: %v", err)
1309	}
1310	// Create backup.
1311	backup := "backup"
1312	defer adminClient.DeleteBackup(ctx, cluster, backup)
1313	if err = adminClient.CreateBackup(ctx, table, cluster, backup, time.Now().Add(8*time.Hour)); err != nil {
1314		t.Fatalf("Creating backup: %v", err)
1315	}
1316	iamHandle := adminClient.BackupIAM(cluster, backup)
1317	// Get backup policy.
1318	p, err := iamHandle.Policy(ctx)
1319	if err != nil {
1320		t.Errorf("iamHandle.Policy: %v", err)
1321	}
1322	// The resource is new, so the policy should be empty.
1323	if got := p.Roles(); len(got) > 0 {
1324		t.Errorf("got roles %v, want none", got)
1325	}
1326	// Set backup policy.
1327	member := "domain:google.com"
1328	// Add a member, set the policy, then check that the member is present.
1329	p.Add(member, iam.Viewer)
1330	if err = iamHandle.SetPolicy(ctx, p); err != nil {
1331		t.Errorf("iamHandle.SetPolicy: %v", err)
1332	}
1333	p, err = iamHandle.Policy(ctx)
1334	if err != nil {
1335		t.Errorf("iamHandle.Policy: %v", err)
1336	}
1337	if got, want := p.Members(iam.Viewer), []string{member}; !testutil.Equal(got, want) {
1338		t.Errorf("iamHandle.Policy: got %v, want %v", got, want)
1339	}
1340	// Test backup permissions.
1341	permissions := []string{"bigtable.backups.get", "bigtable.backups.update"}
1342	_, err = iamHandle.TestPermissions(ctx, permissions)
1343	if err != nil {
1344		t.Errorf("iamHandle.TestPermissions: %v", err)
1345	}
1346}
1347
1348func TestIntegration_AdminCreateInstance(t *testing.T) {
1349	if instanceToCreate == "" {
1350		t.Skip("instanceToCreate not set, skipping instance creation testing")
1351	}
1352
1353	testEnv, err := NewIntegrationEnv()
1354	if err != nil {
1355		t.Fatalf("IntegrationEnv: %v", err)
1356	}
1357	defer testEnv.Close()
1358
1359	if !testEnv.Config().UseProd {
1360		t.Skip("emulator doesn't support instance creation")
1361	}
1362
1363	timeout := 5 * time.Minute
1364	ctx, _ := context.WithTimeout(context.Background(), timeout)
1365
1366	iAdminClient, err := testEnv.NewInstanceAdminClient()
1367	if err != nil {
1368		t.Fatalf("NewInstanceAdminClient: %v", err)
1369	}
1370	defer iAdminClient.Close()
1371
1372	clusterID := instanceToCreate + "-cluster"
1373
1374	// Create a development instance
1375	conf := &InstanceConf{
1376		InstanceId:   instanceToCreate,
1377		ClusterId:    clusterID,
1378		DisplayName:  "test instance",
1379		Zone:         instanceToCreateZone,
1380		InstanceType: DEVELOPMENT,
1381		Labels:       map[string]string{"test-label-key": "test-label-value"},
1382	}
1383	if err := iAdminClient.CreateInstance(ctx, conf); err != nil {
1384		t.Fatalf("CreateInstance: %v", err)
1385	}
1386	defer iAdminClient.DeleteInstance(ctx, instanceToCreate)
1387
1388	iInfo, err := iAdminClient.InstanceInfo(ctx, instanceToCreate)
1389	if err != nil {
1390		t.Fatalf("InstanceInfo: %v", err)
1391	}
1392
1393	// Basic return values are tested elsewhere, check instance type
1394	if iInfo.InstanceType != DEVELOPMENT {
1395		t.Fatalf("Instance is not DEVELOPMENT: %v", iInfo.InstanceType)
1396	}
1397	if got, want := iInfo.Labels, conf.Labels; !cmp.Equal(got, want) {
1398		t.Fatalf("Labels: %v, want: %v", got, want)
1399	}
1400
1401	// Update everything we can about the instance in one call.
1402	confWithClusters := &InstanceWithClustersConfig{
1403		InstanceID:   instanceToCreate,
1404		DisplayName:  "new display name",
1405		InstanceType: PRODUCTION,
1406		Labels:       map[string]string{"new-label-key": "new-label-value"},
1407		Clusters: []ClusterConfig{
1408			{ClusterID: clusterID, NumNodes: 5}},
1409	}
1410
1411	if err = iAdminClient.UpdateInstanceWithClusters(ctx, confWithClusters); err != nil {
1412		t.Fatalf("UpdateInstanceWithClusters: %v", err)
1413	}
1414
1415	iInfo, err = iAdminClient.InstanceInfo(ctx, instanceToCreate)
1416	if err != nil {
1417		t.Fatalf("InstanceInfo: %v", err)
1418	}
1419
1420	if iInfo.InstanceType != PRODUCTION {
1421		t.Fatalf("Instance type is not PRODUCTION: %v", iInfo.InstanceType)
1422	}
1423	if got, want := iInfo.Labels, confWithClusters.Labels; !cmp.Equal(got, want) {
1424		t.Fatalf("Labels: %v, want: %v", got, want)
1425	}
1426	if got, want := iInfo.DisplayName, confWithClusters.DisplayName; got != want {
1427		t.Fatalf("Display name: %q, want: %q", got, want)
1428	}
1429
1430	cInfo, err := iAdminClient.GetCluster(ctx, instanceToCreate, clusterID)
1431	if err != nil {
1432		t.Fatalf("GetCluster: %v", err)
1433	}
1434
1435	if cInfo.ServeNodes != 5 {
1436		t.Fatalf("NumNodes: %v, want: %v", cInfo.ServeNodes, 5)
1437	}
1438}
1439
1440func TestIntegration_AdminUpdateInstanceLabels(t *testing.T) {
1441
1442	// Check the environments
1443	if instanceToCreate == "" {
1444		t.Skip("instanceToCreate not set, skipping instance creation testing")
1445	}
1446	testEnv, err := NewIntegrationEnv()
1447	if err != nil {
1448		t.Fatalf("IntegrationEnv: %v", err)
1449	}
1450	defer testEnv.Close()
1451	if !testEnv.Config().UseProd {
1452		t.Skip("emulator doesn't support instance creation")
1453	}
1454
1455	// Create an instance admin client
1456	timeout := 5 * time.Minute
1457	ctx, _ := context.WithTimeout(context.Background(), timeout)
1458	iAdminClient, err := testEnv.NewInstanceAdminClient()
1459	if err != nil {
1460		t.Fatalf("NewInstanceAdminClient: %v", err)
1461	}
1462	defer iAdminClient.Close()
1463
1464	// Create a test instance
1465	conf := &InstanceConf{
1466		InstanceId:   instanceToCreate,
1467		ClusterId:    instanceToCreate + "-cluster",
1468		DisplayName:  "test instance",
1469		InstanceType: DEVELOPMENT,
1470		Zone:         instanceToCreateZone,
1471	}
1472	if err := iAdminClient.CreateInstance(ctx, conf); err != nil {
1473		t.Fatalf("CreateInstance: %v", err)
1474	}
1475	defer iAdminClient.DeleteInstance(ctx, instanceToCreate)
1476
1477	// Check the created test instances
1478	iInfo, err := iAdminClient.InstanceInfo(ctx, instanceToCreate)
1479	if err != nil {
1480		t.Fatalf("InstanceInfo: %v", err)
1481	}
1482	if got, want := iInfo.Labels, conf.Labels; !cmp.Equal(got, want) {
1483		t.Fatalf("Labels: %v, want: %v", got, want)
1484	}
1485
1486	// Test patterns to update instance labels
1487	tests := []struct {
1488		name string
1489		in   map[string]string
1490		out  map[string]string
1491	}{
1492		{
1493			name: "update labels",
1494			in:   map[string]string{"test-label-key": "test-label-value"},
1495			out:  map[string]string{"test-label-key": "test-label-value"},
1496		},
1497		{
1498			name: "update multiple labels",
1499			in:   map[string]string{"update-label-key-a": "update-label-value-a", "update-label-key-b": "update-label-value-b"},
1500			out:  map[string]string{"update-label-key-a": "update-label-value-a", "update-label-key-b": "update-label-value-b"},
1501		},
1502		{
1503			name: "not update existing labels",
1504			in:   nil, // nil map
1505			out:  map[string]string{"update-label-key-a": "update-label-value-a", "update-label-key-b": "update-label-value-b"},
1506		},
1507		{
1508			name: "delete labels",
1509			in:   map[string]string{}, // empty map
1510			out:  nil,
1511		},
1512	}
1513	for _, tt := range tests {
1514		t.Run(tt.name, func(t *testing.T) {
1515			confWithClusters := &InstanceWithClustersConfig{
1516				InstanceID: instanceToCreate,
1517				Labels:     tt.in,
1518			}
1519			if err := iAdminClient.UpdateInstanceWithClusters(ctx, confWithClusters); err != nil {
1520				t.Fatalf("UpdateInstanceWithClusters: %v", err)
1521			}
1522			iInfo, err := iAdminClient.InstanceInfo(ctx, instanceToCreate)
1523			if err != nil {
1524				t.Fatalf("InstanceInfo: %v", err)
1525			}
1526			if got, want := iInfo.Labels, tt.out; !cmp.Equal(got, want) {
1527				t.Fatalf("Labels: %v, want: %v", got, want)
1528			}
1529		})
1530	}
1531}
1532
1533func TestIntegration_AdminUpdateInstanceAndSyncClusters(t *testing.T) {
1534	if instanceToCreate == "" {
1535		t.Skip("instanceToCreate not set, skipping instance update testing")
1536	}
1537
1538	testEnv, err := NewIntegrationEnv()
1539	if err != nil {
1540		t.Fatalf("IntegrationEnv: %v", err)
1541	}
1542	defer testEnv.Close()
1543
1544	if !testEnv.Config().UseProd {
1545		t.Skip("emulator doesn't support instance creation")
1546	}
1547
1548	timeout := 5 * time.Minute
1549	ctx, _ := context.WithTimeout(context.Background(), timeout)
1550
1551	iAdminClient, err := testEnv.NewInstanceAdminClient()
1552	if err != nil {
1553		t.Fatalf("NewInstanceAdminClient: %v", err)
1554	}
1555	defer iAdminClient.Close()
1556
1557	clusterID := instanceToCreate + "-cluster"
1558
1559	// Create a development instance
1560	conf := &InstanceConf{
1561		InstanceId:   instanceToCreate,
1562		ClusterId:    clusterID,
1563		DisplayName:  "test instance",
1564		Zone:         instanceToCreateZone,
1565		InstanceType: DEVELOPMENT,
1566		Labels:       map[string]string{"test-label-key": "test-label-value"},
1567	}
1568	if err := iAdminClient.CreateInstance(ctx, conf); err != nil {
1569		t.Fatalf("CreateInstance: %v", err)
1570	}
1571	defer iAdminClient.DeleteInstance(ctx, instanceToCreate)
1572
1573	iInfo, err := iAdminClient.InstanceInfo(ctx, instanceToCreate)
1574	if err != nil {
1575		t.Fatalf("InstanceInfo: %v", err)
1576	}
1577
1578	// Basic return values are tested elsewhere, check instance type
1579	if iInfo.InstanceType != DEVELOPMENT {
1580		t.Fatalf("Instance is not DEVELOPMENT: %v", iInfo.InstanceType)
1581	}
1582	if got, want := iInfo.Labels, conf.Labels; !cmp.Equal(got, want) {
1583		t.Fatalf("Labels: %v, want: %v", got, want)
1584	}
1585
1586	// Update everything we can about the instance in one call.
1587	confWithClusters := &InstanceWithClustersConfig{
1588		InstanceID:   instanceToCreate,
1589		DisplayName:  "new display name",
1590		InstanceType: PRODUCTION,
1591		Labels:       map[string]string{"new-label-key": "new-label-value"},
1592		Clusters: []ClusterConfig{
1593			{ClusterID: clusterID, NumNodes: 5}},
1594	}
1595
1596	results, err := UpdateInstanceAndSyncClusters(ctx, iAdminClient, confWithClusters)
1597	if err != nil {
1598		t.Fatalf("UpdateInstanceAndSyncClusters: %v", err)
1599	}
1600
1601	wantResults := UpdateInstanceResults{
1602		InstanceUpdated: true,
1603		UpdatedClusters: []string{clusterID},
1604	}
1605	if diff := testutil.Diff(*results, wantResults); diff != "" {
1606		t.Fatalf("UpdateInstanceResults: got - want +\n%s", diff)
1607	}
1608
1609	iInfo, err = iAdminClient.InstanceInfo(ctx, instanceToCreate)
1610	if err != nil {
1611		t.Fatalf("InstanceInfo: %v", err)
1612	}
1613
1614	if iInfo.InstanceType != PRODUCTION {
1615		t.Fatalf("Instance type is not PRODUCTION: %v", iInfo.InstanceType)
1616	}
1617	if got, want := iInfo.Labels, confWithClusters.Labels; !cmp.Equal(got, want) {
1618		t.Fatalf("Labels: %v, want: %v", got, want)
1619	}
1620	if got, want := iInfo.DisplayName, confWithClusters.DisplayName; got != want {
1621		t.Fatalf("Display name: %q, want: %q", got, want)
1622	}
1623
1624	cInfo, err := iAdminClient.GetCluster(ctx, instanceToCreate, clusterID)
1625	if err != nil {
1626		t.Fatalf("GetCluster: %v", err)
1627	}
1628
1629	if cInfo.ServeNodes != 5 {
1630		t.Fatalf("NumNodes: %v, want: %v", cInfo.ServeNodes, 5)
1631	}
1632
1633	// Now add a second cluster as the only change. The first cluster must also be provided so
1634	// it is not removed.
1635	clusterID2 := clusterID + "-2"
1636	confWithClusters = &InstanceWithClustersConfig{
1637		InstanceID: instanceToCreate,
1638		Clusters: []ClusterConfig{
1639			{ClusterID: clusterID},
1640			{ClusterID: clusterID2, NumNodes: 3, StorageType: SSD, Zone: instanceToCreateZone2}},
1641	}
1642
1643	results, err = UpdateInstanceAndSyncClusters(ctx, iAdminClient, confWithClusters)
1644	if err != nil {
1645		t.Fatalf("UpdateInstanceAndSyncClusters: %v %v", confWithClusters, err)
1646	}
1647
1648	wantResults = UpdateInstanceResults{
1649		InstanceUpdated: false,
1650		CreatedClusters: []string{clusterID2},
1651	}
1652	if diff := testutil.Diff(*results, wantResults); diff != "" {
1653		t.Fatalf("UpdateInstanceResults: got - want +\n%s", diff)
1654	}
1655
1656	// Now update one cluster and delete the other
1657	confWithClusters = &InstanceWithClustersConfig{
1658		InstanceID: instanceToCreate,
1659		Clusters: []ClusterConfig{
1660			{ClusterID: clusterID, NumNodes: 4}},
1661	}
1662
1663	results, err = UpdateInstanceAndSyncClusters(ctx, iAdminClient, confWithClusters)
1664	if err != nil {
1665		t.Fatalf("UpdateInstanceAndSyncClusters: %v %v", confWithClusters, err)
1666	}
1667
1668	wantResults = UpdateInstanceResults{
1669		InstanceUpdated: false,
1670		UpdatedClusters: []string{clusterID},
1671		DeletedClusters: []string{clusterID2},
1672	}
1673
1674	if diff := testutil.Diff(*results, wantResults); diff != "" {
1675		t.Fatalf("UpdateInstanceResults: got - want +\n%s", diff)
1676	}
1677
1678	// Make sure the instance looks as we would expect
1679	clusters, err := iAdminClient.Clusters(ctx, conf.InstanceId)
1680	if err != nil {
1681		t.Fatalf("Clusters: %v", err)
1682	}
1683
1684	if len(clusters) != 1 {
1685		t.Fatalf("Clusters length %v, want: 1", len(clusters))
1686	}
1687
1688	wantCluster := &ClusterInfo{
1689		Name:       clusterID,
1690		Zone:       instanceToCreateZone,
1691		ServeNodes: 4,
1692		State:      "READY",
1693	}
1694	if diff := testutil.Diff(clusters[0], wantCluster); diff != "" {
1695		t.Fatalf("InstanceEquality: got - want +\n%s", diff)
1696	}
1697}
1698
1699func TestIntegration_AdminSnapshot(t *testing.T) {
1700	testEnv, err := NewIntegrationEnv()
1701	if err != nil {
1702		t.Fatalf("IntegrationEnv: %v", err)
1703	}
1704	defer testEnv.Close()
1705
1706	if !testEnv.Config().UseProd {
1707		t.Skip("emulator doesn't support snapshots")
1708	}
1709
1710	timeout := 2 * time.Second
1711	if testEnv.Config().UseProd {
1712		timeout = 5 * time.Minute
1713	}
1714	ctx, _ := context.WithTimeout(context.Background(), timeout)
1715
1716	adminClient, err := testEnv.NewAdminClient()
1717	if err != nil {
1718		t.Fatalf("NewAdminClient: %v", err)
1719	}
1720	defer adminClient.Close()
1721
1722	table := testEnv.Config().Table
1723	cluster := testEnv.Config().Cluster
1724
1725	list := func(cluster string) ([]*SnapshotInfo, error) {
1726		infos := []*SnapshotInfo(nil)
1727
1728		it := adminClient.Snapshots(ctx, cluster)
1729		for {
1730			s, err := it.Next()
1731			if err == iterator.Done {
1732				break
1733			}
1734			if err != nil {
1735				return nil, err
1736			}
1737			infos = append(infos, s)
1738		}
1739		return infos, err
1740	}
1741
1742	// Delete the table at the end of the test. Schedule ahead of time
1743	// in case the client fails
1744	defer deleteTable(ctx, t, adminClient, table)
1745
1746	if err := adminClient.CreateTable(ctx, table); err != nil {
1747		t.Fatalf("Creating table: %v", err)
1748	}
1749
1750	// Precondition: no snapshots
1751	snapshots, err := list(cluster)
1752	if err != nil {
1753		t.Fatalf("Initial snapshot list: %v", err)
1754	}
1755	if got, want := len(snapshots), 0; got != want {
1756		t.Fatalf("Initial snapshot list len: %d, want: %d", got, want)
1757	}
1758
1759	// Create snapshot
1760	defer adminClient.DeleteSnapshot(ctx, cluster, "mysnapshot")
1761
1762	if err = adminClient.SnapshotTable(ctx, table, cluster, "mysnapshot", 5*time.Hour); err != nil {
1763		t.Fatalf("Creating snaphot: %v", err)
1764	}
1765
1766	// List snapshot
1767	snapshots, err = list(cluster)
1768	if err != nil {
1769		t.Fatalf("Listing snapshots: %v", err)
1770	}
1771	if got, want := len(snapshots), 1; got != want {
1772		t.Fatalf("Listing snapshot count: %d, want: %d", got, want)
1773	}
1774	if got, want := snapshots[0].Name, "mysnapshot"; got != want {
1775		t.Fatalf("Snapshot name: %s, want: %s", got, want)
1776	}
1777	if got, want := snapshots[0].SourceTable, table; got != want {
1778		t.Fatalf("Snapshot SourceTable: %s, want: %s", got, want)
1779	}
1780	if got, want := snapshots[0].DeleteTime, snapshots[0].CreateTime.Add(5*time.Hour); math.Abs(got.Sub(want).Minutes()) > 1 {
1781		t.Fatalf("Snapshot DeleteTime: %s, want: %s", got, want)
1782	}
1783
1784	// Get snapshot
1785	snapshot, err := adminClient.SnapshotInfo(ctx, cluster, "mysnapshot")
1786	if err != nil {
1787		t.Fatalf("SnapshotInfo: %v", snapshot)
1788	}
1789	if got, want := *snapshot, *snapshots[0]; got != want {
1790		t.Fatalf("SnapshotInfo: %v, want: %v", got, want)
1791	}
1792
1793	// Restore
1794	restoredTable := table + "-restored"
1795	defer deleteTable(ctx, t, adminClient, restoredTable)
1796	if err = adminClient.CreateTableFromSnapshot(ctx, restoredTable, cluster, "mysnapshot"); err != nil {
1797		t.Fatalf("CreateTableFromSnapshot: %v", err)
1798	}
1799	if _, err := adminClient.TableInfo(ctx, restoredTable); err != nil {
1800		t.Fatalf("Restored TableInfo: %v", err)
1801	}
1802
1803	// Delete snapshot
1804	if err = adminClient.DeleteSnapshot(ctx, cluster, "mysnapshot"); err != nil {
1805		t.Fatalf("DeleteSnapshot: %v", err)
1806	}
1807	snapshots, err = list(cluster)
1808	if err != nil {
1809		t.Fatalf("List after Delete: %v", err)
1810	}
1811	if got, want := len(snapshots), 0; got != want {
1812		t.Fatalf("List after delete len: %d, want: %d", got, want)
1813	}
1814}
1815
1816// instanceAdminClientMock is used to test FailedLocations field processing.
1817type instanceAdminClientMock struct {
1818	Clusters             []*btapb.Cluster
1819	UnavailableLocations []string
1820}
1821
1822func (iacm *instanceAdminClientMock) ListClusters(ctx context.Context, req *btapb.ListClustersRequest, opts ...grpc.CallOption) (*btapb.ListClustersResponse, error) {
1823	res := btapb.ListClustersResponse{
1824		Clusters:        iacm.Clusters,
1825		FailedLocations: iacm.UnavailableLocations,
1826	}
1827	return &res, nil
1828}
1829
1830func (iacm *instanceAdminClientMock) CreateInstance(ctx context.Context, in *btapb.CreateInstanceRequest, opts ...grpc.CallOption) (*longrunning.Operation, error) {
1831	return nil, nil
1832}
1833func (iacm *instanceAdminClientMock) GetInstance(ctx context.Context, in *btapb.GetInstanceRequest, opts ...grpc.CallOption) (*btapb.Instance, error) {
1834	return nil, nil
1835}
1836func (iacm *instanceAdminClientMock) ListInstances(ctx context.Context, in *btapb.ListInstancesRequest, opts ...grpc.CallOption) (*btapb.ListInstancesResponse, error) {
1837	return nil, nil
1838}
1839func (iacm *instanceAdminClientMock) UpdateInstance(ctx context.Context, in *btapb.Instance, opts ...grpc.CallOption) (*btapb.Instance, error) {
1840	return nil, nil
1841}
1842func (iacm *instanceAdminClientMock) PartialUpdateInstance(ctx context.Context, in *btapb.PartialUpdateInstanceRequest, opts ...grpc.CallOption) (*longrunning.Operation, error) {
1843	return nil, nil
1844}
1845func (iacm *instanceAdminClientMock) DeleteInstance(ctx context.Context, in *btapb.DeleteInstanceRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
1846	return nil, nil
1847}
1848func (iacm *instanceAdminClientMock) CreateCluster(ctx context.Context, in *btapb.CreateClusterRequest, opts ...grpc.CallOption) (*longrunning.Operation, error) {
1849	return nil, nil
1850}
1851func (iacm *instanceAdminClientMock) GetCluster(ctx context.Context, in *btapb.GetClusterRequest, opts ...grpc.CallOption) (*btapb.Cluster, error) {
1852	return nil, nil
1853}
1854func (iacm *instanceAdminClientMock) UpdateCluster(ctx context.Context, in *btapb.Cluster, opts ...grpc.CallOption) (*longrunning.Operation, error) {
1855	return nil, nil
1856}
1857func (iacm *instanceAdminClientMock) DeleteCluster(ctx context.Context, in *btapb.DeleteClusterRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
1858	return nil, nil
1859}
1860func (iacm *instanceAdminClientMock) CreateAppProfile(ctx context.Context, in *btapb.CreateAppProfileRequest, opts ...grpc.CallOption) (*btapb.AppProfile, error) {
1861	return nil, nil
1862}
1863func (iacm *instanceAdminClientMock) GetAppProfile(ctx context.Context, in *btapb.GetAppProfileRequest, opts ...grpc.CallOption) (*btapb.AppProfile, error) {
1864	return nil, nil
1865}
1866func (iacm *instanceAdminClientMock) ListAppProfiles(ctx context.Context, in *btapb.ListAppProfilesRequest, opts ...grpc.CallOption) (*btapb.ListAppProfilesResponse, error) {
1867	return nil, nil
1868}
1869func (iacm *instanceAdminClientMock) UpdateAppProfile(ctx context.Context, in *btapb.UpdateAppProfileRequest, opts ...grpc.CallOption) (*longrunning.Operation, error) {
1870	return nil, nil
1871}
1872func (iacm *instanceAdminClientMock) DeleteAppProfile(ctx context.Context, in *btapb.DeleteAppProfileRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
1873	return nil, nil
1874}
1875func (iacm *instanceAdminClientMock) GetIamPolicy(ctx context.Context, in *v1.GetIamPolicyRequest, opts ...grpc.CallOption) (*v1.Policy, error) {
1876	return nil, nil
1877}
1878func (iacm *instanceAdminClientMock) SetIamPolicy(ctx context.Context, in *v1.SetIamPolicyRequest, opts ...grpc.CallOption) (*v1.Policy, error) {
1879	return nil, nil
1880}
1881func (iacm *instanceAdminClientMock) TestIamPermissions(ctx context.Context, in *v1.TestIamPermissionsRequest, opts ...grpc.CallOption) (*v1.TestIamPermissionsResponse, error) {
1882	return nil, nil
1883}
1884
1885func TestIntegration_InstanceAdminClient_Clusters_WithFailedLocations(t *testing.T) {
1886	testEnv, err := NewIntegrationEnv()
1887	if err != nil {
1888		t.Fatalf("IntegrationEnv: %v", err)
1889	}
1890	defer testEnv.Close()
1891
1892	if !testEnv.Config().UseProd {
1893		t.Skip("emulator doesn't support snapshots")
1894	}
1895
1896	iAdminClient, err := testEnv.NewInstanceAdminClient()
1897	if err != nil {
1898		t.Fatalf("NewInstanceAdminClient: %v", err)
1899	}
1900	defer iAdminClient.Close()
1901
1902	cluster1 := btapb.Cluster{Name: "cluster1"}
1903	failedLoc := "euro1"
1904
1905	iAdminClient.iClient = &instanceAdminClientMock{
1906		Clusters:             []*btapb.Cluster{&cluster1},
1907		UnavailableLocations: []string{failedLoc},
1908	}
1909
1910	cis, err := iAdminClient.Clusters(context.Background(), "instance-id")
1911	convertedErr, ok := err.(ErrPartiallyUnavailable)
1912	if !ok {
1913		t.Fatalf("want error ErrPartiallyUnavailable, got other")
1914	}
1915	if got, want := len(convertedErr.Locations), 1; got != want {
1916		t.Fatalf("want %v failed locations, got %v", want, got)
1917	}
1918	if got, want := convertedErr.Locations[0], failedLoc; got != want {
1919		t.Fatalf("want failed location %v, got %v", want, got)
1920	}
1921	if got, want := len(cis), 1; got != want {
1922		t.Fatalf("want %v failed locations, got %v", want, got)
1923	}
1924	if got, want := cis[0].Name, cluster1.Name; got != want {
1925		t.Fatalf("want cluster %v, got %v", want, got)
1926	}
1927}
1928
1929func TestIntegration_Granularity(t *testing.T) {
1930	testEnv, err := NewIntegrationEnv()
1931	if err != nil {
1932		t.Fatalf("IntegrationEnv: %v", err)
1933	}
1934	defer testEnv.Close()
1935
1936	timeout := 2 * time.Second
1937	if testEnv.Config().UseProd {
1938		timeout = 5 * time.Minute
1939	}
1940	ctx, _ := context.WithTimeout(context.Background(), timeout)
1941
1942	adminClient, err := testEnv.NewAdminClient()
1943	if err != nil {
1944		t.Fatalf("NewAdminClient: %v", err)
1945	}
1946	defer adminClient.Close()
1947
1948	list := func() []string {
1949		tbls, err := adminClient.Tables(ctx)
1950		if err != nil {
1951			t.Fatalf("Fetching list of tables: %v", err)
1952		}
1953		sort.Strings(tbls)
1954		return tbls
1955	}
1956	containsAll := func(got, want []string) bool {
1957		gotSet := make(map[string]bool)
1958
1959		for _, s := range got {
1960			gotSet[s] = true
1961		}
1962		for _, s := range want {
1963			if !gotSet[s] {
1964				return false
1965			}
1966		}
1967		return true
1968	}
1969
1970	defer deleteTable(ctx, t, adminClient, "mytable")
1971
1972	if err := adminClient.CreateTable(ctx, "mytable"); err != nil {
1973		t.Fatalf("Creating table: %v", err)
1974	}
1975
1976	tables := list()
1977	if got, want := tables, []string{"mytable"}; !containsAll(got, want) {
1978		t.Errorf("adminClient.Tables returned %#v, want %#v", got, want)
1979	}
1980
1981	// calling ModifyColumnFamilies to check the granularity of table
1982	prefix := adminClient.instancePrefix()
1983	req := &btapb.ModifyColumnFamiliesRequest{
1984		Name: prefix + "/tables/" + "mytable",
1985		Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{
1986			Id:  "cf",
1987			Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Create{&btapb.ColumnFamily{}},
1988		}},
1989	}
1990	table, err := adminClient.tClient.ModifyColumnFamilies(ctx, req)
1991	if err != nil {
1992		t.Fatalf("Creating column family: %v", err)
1993	}
1994	if table.Granularity != btapb.Table_TimestampGranularity(btapb.Table_MILLIS) {
1995		t.Errorf("ModifyColumnFamilies returned granularity %#v, want %#v", table.Granularity, btapb.Table_TimestampGranularity(btapb.Table_MILLIS))
1996	}
1997}
1998
1999func TestIntegration_InstanceAdminClient_AppProfile(t *testing.T) {
2000	testEnv, err := NewIntegrationEnv()
2001	if err != nil {
2002		t.Fatalf("IntegrationEnv: %v", err)
2003	}
2004	defer testEnv.Close()
2005
2006	timeout := 2 * time.Second
2007	if testEnv.Config().UseProd {
2008		timeout = 5 * time.Minute
2009	}
2010	ctx, cancel := context.WithTimeout(context.Background(), timeout)
2011	defer cancel()
2012
2013	adminClient, err := testEnv.NewAdminClient()
2014	if err != nil {
2015		t.Fatalf("NewAdminClient: %v", err)
2016	}
2017	defer adminClient.Close()
2018
2019	iAdminClient, err := testEnv.NewInstanceAdminClient()
2020	if err != nil {
2021		t.Fatalf("NewInstanceAdminClient: %v", err)
2022	}
2023
2024	if iAdminClient == nil {
2025		return
2026	}
2027
2028	defer iAdminClient.Close()
2029	profile := ProfileConf{
2030		ProfileID:     "app_profile1",
2031		InstanceID:    adminClient.instance,
2032		ClusterID:     testEnv.Config().Cluster,
2033		Description:   "creating new app profile 1",
2034		RoutingPolicy: SingleClusterRouting,
2035	}
2036
2037	createdProfile, err := iAdminClient.CreateAppProfile(ctx, profile)
2038	if err != nil {
2039		t.Fatalf("Creating app profile: %v", err)
2040
2041	}
2042
2043	gotProfile, err := iAdminClient.GetAppProfile(ctx, adminClient.instance, "app_profile1")
2044
2045	if err != nil {
2046		t.Fatalf("Get app profile: %v", err)
2047	}
2048
2049	if !proto.Equal(createdProfile, gotProfile) {
2050		t.Fatalf("created profile: %s, got profile: %s", createdProfile.Name, gotProfile.Name)
2051
2052	}
2053
2054	list := func(instanceID string) ([]*btapb.AppProfile, error) {
2055		profiles := []*btapb.AppProfile(nil)
2056
2057		it := iAdminClient.ListAppProfiles(ctx, instanceID)
2058		for {
2059			s, err := it.Next()
2060			if err == iterator.Done {
2061				break
2062			}
2063			if err != nil {
2064				return nil, err
2065			}
2066			profiles = append(profiles, s)
2067		}
2068		return profiles, err
2069	}
2070
2071	profiles, err := list(adminClient.instance)
2072	if err != nil {
2073		t.Fatalf("List app profile: %v", err)
2074	}
2075
2076	if got, want := len(profiles), 1; got != want {
2077		t.Fatalf("Initial app profile list len: %d, want: %d", got, want)
2078	}
2079
2080	for _, test := range []struct {
2081		desc   string
2082		uattrs ProfileAttrsToUpdate
2083		want   *btapb.AppProfile // nil means error
2084	}{
2085		{
2086			desc:   "empty update",
2087			uattrs: ProfileAttrsToUpdate{},
2088			want:   nil,
2089		},
2090
2091		{
2092			desc:   "empty description update",
2093			uattrs: ProfileAttrsToUpdate{Description: ""},
2094			want: &btapb.AppProfile{
2095				Name:          gotProfile.Name,
2096				Description:   "",
2097				RoutingPolicy: gotProfile.RoutingPolicy,
2098				Etag:          gotProfile.Etag},
2099		},
2100		{
2101			desc: "routing update",
2102			uattrs: ProfileAttrsToUpdate{
2103				RoutingPolicy: SingleClusterRouting,
2104				ClusterID:     testEnv.Config().Cluster,
2105			},
2106			want: &btapb.AppProfile{
2107				Name:        gotProfile.Name,
2108				Description: "",
2109				Etag:        gotProfile.Etag,
2110				RoutingPolicy: &btapb.AppProfile_SingleClusterRouting_{
2111					SingleClusterRouting: &btapb.AppProfile_SingleClusterRouting{
2112						ClusterId: testEnv.Config().Cluster,
2113					}},
2114			},
2115		},
2116	} {
2117		err = iAdminClient.UpdateAppProfile(ctx, adminClient.instance, "app_profile1", test.uattrs)
2118		if err != nil {
2119			if test.want != nil {
2120				t.Errorf("%s: %v", test.desc, err)
2121			}
2122			continue
2123		}
2124		if err == nil && test.want == nil {
2125			t.Errorf("%s: got nil, want error", test.desc)
2126			continue
2127		}
2128
2129		got, _ := iAdminClient.GetAppProfile(ctx, adminClient.instance, "app_profile1")
2130
2131		if !proto.Equal(got, test.want) {
2132			t.Fatalf("%s : got profile : %v, want profile: %v", test.desc, gotProfile, test.want)
2133		}
2134
2135	}
2136
2137	err = iAdminClient.DeleteAppProfile(ctx, adminClient.instance, "app_profile1")
2138	if err != nil {
2139		t.Fatalf("Delete app profile: %v", err)
2140	}
2141
2142}
2143
2144func TestIntegration_InstanceUpdate(t *testing.T) {
2145	testEnv, err := NewIntegrationEnv()
2146	if err != nil {
2147		t.Fatalf("IntegrationEnv: %v", err)
2148	}
2149	defer testEnv.Close()
2150
2151	timeout := 2 * time.Second
2152	if testEnv.Config().UseProd {
2153		timeout = 5 * time.Minute
2154	}
2155	ctx, cancel := context.WithTimeout(context.Background(), timeout)
2156	defer cancel()
2157
2158	adminClient, err := testEnv.NewAdminClient()
2159	if err != nil {
2160		t.Fatalf("NewAdminClient: %v", err)
2161	}
2162
2163	defer adminClient.Close()
2164
2165	iAdminClient, err := testEnv.NewInstanceAdminClient()
2166	if err != nil {
2167		t.Fatalf("NewInstanceAdminClient: %v", err)
2168	}
2169
2170	if iAdminClient == nil {
2171		return
2172	}
2173
2174	defer iAdminClient.Close()
2175
2176	iInfo, err := iAdminClient.InstanceInfo(ctx, adminClient.instance)
2177	if err != nil {
2178		t.Errorf("InstanceInfo: %v", err)
2179	}
2180
2181	if iInfo.Name != adminClient.instance {
2182		t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance)
2183	}
2184
2185	if iInfo.DisplayName != adminClient.instance {
2186		t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance)
2187	}
2188
2189	const numNodes = 4
2190	// update cluster nodes
2191	if err := iAdminClient.UpdateCluster(ctx, adminClient.instance, testEnv.Config().Cluster, int32(numNodes)); err != nil {
2192		t.Errorf("UpdateCluster: %v", err)
2193	}
2194
2195	// get cluster after updating
2196	cis, err := iAdminClient.GetCluster(ctx, adminClient.instance, testEnv.Config().Cluster)
2197	if err != nil {
2198		t.Errorf("GetCluster %v", err)
2199	}
2200	if cis.ServeNodes != int(numNodes) {
2201		t.Errorf("ServeNodes returned %d, want %d", cis.ServeNodes, int(numNodes))
2202	}
2203}
2204
2205func TestIntegration_AdminBackup(t *testing.T) {
2206	testEnv, err := NewIntegrationEnv()
2207	if err != nil {
2208		t.Fatalf("IntegrationEnv: %v", err)
2209	}
2210	defer testEnv.Close()
2211
2212	if !testEnv.Config().UseProd {
2213		t.Skip("emulator doesn't support backups")
2214	}
2215
2216	timeout := 5 * time.Minute
2217	ctx, _ := context.WithTimeout(context.Background(), timeout)
2218
2219	adminClient, err := testEnv.NewAdminClient()
2220	if err != nil {
2221		t.Fatalf("NewAdminClient: %v", err)
2222	}
2223	defer adminClient.Close()
2224
2225	table := testEnv.Config().Table
2226	cluster := testEnv.Config().Cluster
2227
2228	// Delete the table at the end of the test. Schedule ahead of time
2229	// in case the client fails
2230	defer deleteTable(ctx, t, adminClient, table)
2231
2232	list := func(cluster string) ([]*BackupInfo, error) {
2233		infos := []*BackupInfo(nil)
2234
2235		it := adminClient.Backups(ctx, cluster)
2236		for {
2237			s, err := it.Next()
2238			if err == iterator.Done {
2239				break
2240			}
2241			if err != nil {
2242				return nil, err
2243			}
2244			infos = append(infos, s)
2245		}
2246		return infos, err
2247	}
2248
2249	if err := adminClient.CreateTable(ctx, table); err != nil {
2250		t.Fatalf("Creating table: %v", err)
2251	}
2252
2253	// Precondition: no backups
2254	backups, err := list(cluster)
2255	if err != nil {
2256		t.Fatalf("Initial backup list: %v", err)
2257	}
2258	if got, want := len(backups), 0; got != want {
2259		t.Fatalf("Initial backup list len: %d, want: %d", got, want)
2260	}
2261
2262	// Create backup
2263	backupName := "mybackup"
2264	defer adminClient.DeleteBackup(ctx, cluster, "mybackup")
2265
2266	if err = adminClient.CreateBackup(ctx, table, cluster, backupName, time.Now().Add(8*time.Hour)); err != nil {
2267		t.Fatalf("Creating backup: %v", err)
2268	}
2269
2270	// List backup
2271	backups, err = list(cluster)
2272	if err != nil {
2273		t.Fatalf("Listing backups: %v", err)
2274	}
2275	if got, want := len(backups), 1; got != want {
2276		t.Fatalf("Listing backup count: %d, want: %d", got, want)
2277	}
2278	if got, want := backups[0].Name, backupName; got != want {
2279		t.Fatalf("Backup name: %s, want: %s", got, want)
2280	}
2281	if got, want := backups[0].SourceTable, table; got != want {
2282		t.Fatalf("Backup SourceTable: %s, want: %s", got, want)
2283	}
2284	if got, want := backups[0].ExpireTime, backups[0].StartTime.Add(8*time.Hour); math.Abs(got.Sub(want).Minutes()) > 1 {
2285		t.Fatalf("Backup ExpireTime: %s, want: %s", got, want)
2286	}
2287
2288	// Get backup
2289	backup, err := adminClient.BackupInfo(ctx, cluster, backupName)
2290	if err != nil {
2291		t.Fatalf("BackupInfo: %v", backup)
2292	}
2293	if got, want := *backup, *backups[0]; got != want {
2294		t.Fatalf("BackupInfo: %v, want: %v", got, want)
2295	}
2296
2297	// Update backup
2298	newExpireTime := time.Now().Add(10 * time.Hour)
2299	err = adminClient.UpdateBackup(ctx, cluster, backupName, newExpireTime)
2300	if err != nil {
2301		t.Fatalf("UpdateBackup failed: %v", err)
2302	}
2303
2304	// Check that updated backup has the correct expire time
2305	updatedBackup, err := adminClient.BackupInfo(ctx, cluster, backupName)
2306	if err != nil {
2307		t.Fatalf("BackupInfo: %v", err)
2308	}
2309	backup.ExpireTime = newExpireTime
2310	// Server clock and local clock may not be perfectly sync'ed.
2311	if got, want := *updatedBackup, *backup; got.ExpireTime.Sub(want.ExpireTime) > time.Minute {
2312		t.Fatalf("BackupInfo: %v, want: %v", got, want)
2313	}
2314
2315	// Restore backup
2316	restoredTable := table + "-restored"
2317	defer deleteTable(ctx, t, adminClient, restoredTable)
2318	if err = adminClient.RestoreTable(ctx, restoredTable, cluster, backupName); err != nil {
2319		t.Fatalf("RestoreTable: %v", err)
2320	}
2321	if _, err := adminClient.TableInfo(ctx, restoredTable); err != nil {
2322		t.Fatalf("Restored TableInfo: %v", err)
2323	}
2324
2325	// Delete backup
2326	if err = adminClient.DeleteBackup(ctx, cluster, backupName); err != nil {
2327		t.Fatalf("DeleteBackup: %v", err)
2328	}
2329	backups, err = list(cluster)
2330	if err != nil {
2331		t.Fatalf("List after Delete: %v", err)
2332	}
2333	if got, want := len(backups), 0; got != want {
2334		t.Fatalf("List after delete len: %d, want: %d", got, want)
2335	}
2336}
2337
2338// TestIntegration_DirectPathFallback tests the CFE fallback when the directpath net is blackholed.
2339func TestIntegration_DirectPathFallback(t *testing.T) {
2340	ctx := context.Background()
2341	testEnv, _, _, table, _, cleanup, err := setupIntegration(ctx, t)
2342	if err != nil {
2343		t.Fatal(err)
2344	}
2345	defer cleanup()
2346
2347	if !testEnv.Config().AttemptDirectPath {
2348		t.Skip()
2349	}
2350
2351	if len(blackholeDpv6Cmd) == 0 {
2352		t.Fatal("-it.blackhole-dpv6-cmd unset")
2353	}
2354	if len(blackholeDpv4Cmd) == 0 {
2355		t.Fatal("-it.blackhole-dpv4-cmd unset")
2356	}
2357	if len(allowDpv6Cmd) == 0 {
2358		t.Fatal("-it.allowdpv6-cmd unset")
2359	}
2360	if len(allowDpv4Cmd) == 0 {
2361		t.Fatal("-it.allowdpv4-cmd unset")
2362	}
2363
2364	if err := populatePresidentsGraph(table); err != nil {
2365		t.Fatal(err)
2366	}
2367
2368	// Precondition: wait for DirectPath to connect.
2369	dpEnabled := examineTraffic(ctx, testEnv, table, false)
2370	if !dpEnabled {
2371		t.Fatalf("Failed to observe RPCs over DirectPath")
2372	}
2373
2374	// Enable the blackhole, which will prevent communication with grpclb and thus DirectPath.
2375	blackholeDirectPath(testEnv, t)
2376	dpDisabled := examineTraffic(ctx, testEnv, table, true)
2377	if !dpDisabled {
2378		t.Fatalf("Failed to fallback to CFE after blackhole DirectPath")
2379	}
2380
2381	// Disable the blackhole, and client should use DirectPath again.
2382	allowDirectPath(testEnv, t)
2383	dpEnabled = examineTraffic(ctx, testEnv, table, false)
2384	if !dpEnabled {
2385		t.Fatalf("Failed to fallback to CFE after blackhole DirectPath")
2386	}
2387}
2388
2389// examineTraffic returns whether RPCs use DirectPath (blackholeDP = false) or CFE (blackholeDP = true).
2390func examineTraffic(ctx context.Context, testEnv IntegrationEnv, table *Table, blackholeDP bool) bool {
2391	numCount := 0
2392	const (
2393		numRPCsToSend  = 20
2394		minCompleteRPC = 40
2395	)
2396
2397	start := time.Now()
2398	for time.Since(start) < 2*time.Minute {
2399		for i := 0; i < numRPCsToSend; i++ {
2400			_, _ = table.ReadRow(ctx, "jadams")
2401			if _, useDP := isDirectPathRemoteAddress(testEnv); useDP != blackholeDP {
2402				numCount++
2403				if numCount >= minCompleteRPC {
2404					return true
2405				}
2406			}
2407			time.Sleep(100 * time.Millisecond)
2408		}
2409	}
2410	return false
2411}
2412
2413func setupIntegration(ctx context.Context, t *testing.T) (_ IntegrationEnv, _ *Client, _ *AdminClient, table *Table, tableName string, cleanup func(), _ error) {
2414	testEnv, err := NewIntegrationEnv()
2415	if err != nil {
2416		return nil, nil, nil, nil, "", nil, err
2417	}
2418
2419	var timeout time.Duration
2420	if testEnv.Config().UseProd {
2421		timeout = 10 * time.Minute
2422		t.Logf("Running test against production")
2423	} else {
2424		timeout = 5 * time.Minute
2425		t.Logf("bttest.Server running on %s", testEnv.Config().AdminEndpoint)
2426	}
2427	ctx, cancel := context.WithTimeout(ctx, timeout)
2428
2429	client, err := testEnv.NewClient()
2430	if err != nil {
2431		return nil, nil, nil, nil, "", nil, err
2432	}
2433
2434	adminClient, err := testEnv.NewAdminClient()
2435	if err != nil {
2436		return nil, nil, nil, nil, "", nil, err
2437	}
2438
2439	if testEnv.Config().UseProd {
2440		// TODO: tables may not be successfully deleted in some cases, and will
2441		// become obsolete. We may need a way to automatically delete them.
2442		tableName = tableNameSpace.New()
2443	} else {
2444		tableName = testEnv.Config().Table
2445	}
2446
2447	if err := adminClient.CreateTable(ctx, tableName); err != nil {
2448		cancel()
2449		return nil, nil, nil, nil, "", nil, err
2450	}
2451	if err := adminClient.CreateColumnFamily(ctx, tableName, "follows"); err != nil {
2452		cancel()
2453		return nil, nil, nil, nil, "", nil, err
2454	}
2455
2456	return testEnv, client, adminClient, client.Open(tableName), tableName, func() {
2457		if err := adminClient.DeleteTable(ctx, tableName); err != nil {
2458			t.Errorf("DeleteTable got error %v", err)
2459		}
2460		cancel()
2461		client.Close()
2462		adminClient.Close()
2463	}, nil
2464}
2465
2466func formatReadItem(ri ReadItem) string {
2467	// Use the column qualifier only to make the test data briefer.
2468	col := ri.Column[strings.Index(ri.Column, ":")+1:]
2469	return fmt.Sprintf("%s-%s-%s", ri.Row, col, ri.Value)
2470}
2471
2472func fill(b, sub []byte) {
2473	for len(b) > len(sub) {
2474		n := copy(b, sub)
2475		b = b[n:]
2476	}
2477}
2478
2479func clearTimestamps(r Row) {
2480	for _, ris := range r {
2481		for i := range ris {
2482			ris[i].Timestamp = 0
2483		}
2484	}
2485}
2486
2487func deleteTable(ctx context.Context, t *testing.T, ac *AdminClient, name string) {
2488	bo := gax.Backoff{
2489		Initial:    100 * time.Millisecond,
2490		Max:        2 * time.Second,
2491		Multiplier: 1.2,
2492	}
2493	ctx, _ = context.WithTimeout(ctx, time.Second*30)
2494
2495	err := internal.Retry(ctx, bo, func() (bool, error) {
2496		err := ac.DeleteTable(ctx, name)
2497		if err != nil {
2498			return false, err
2499		}
2500		return true, nil
2501	})
2502	if err != nil {
2503		t.Logf("DeleteTable: %v", err)
2504	}
2505}
2506
2507func verifyDirectPathRemoteAddress(testEnv IntegrationEnv, t *testing.T) {
2508	t.Helper()
2509	if !testEnv.Config().AttemptDirectPath {
2510		return
2511	}
2512	if remoteIP, res := isDirectPathRemoteAddress(testEnv); !res {
2513		if testEnv.Config().DirectPathIPV4Only {
2514			t.Fatalf("Expect to access DirectPath via ipv4 only, but RPC was destined to %s", remoteIP)
2515		} else {
2516			t.Fatalf("Expect to access DirectPath via ipv4 or ipv6, but RPC was destined to %s", remoteIP)
2517		}
2518	}
2519}
2520
2521func isDirectPathRemoteAddress(testEnv IntegrationEnv) (_ string, _ bool) {
2522	remoteIP := testEnv.Peer().Addr.String()
2523	// DirectPath ipv4-only can only use ipv4 traffic.
2524	if testEnv.Config().DirectPathIPV4Only {
2525		return remoteIP, strings.HasPrefix(remoteIP, directPathIPV4Prefix)
2526	}
2527	// DirectPath ipv6 can use either ipv4 or ipv6 traffic.
2528	return remoteIP, strings.HasPrefix(remoteIP, directPathIPV4Prefix) || strings.HasPrefix(remoteIP, directPathIPV6Prefix)
2529}
2530
2531func blackholeDirectPath(testEnv IntegrationEnv, t *testing.T) {
2532	cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd)
2533	out, _ := cmdRes.CombinedOutput()
2534	t.Logf(string(out))
2535	if testEnv.Config().DirectPathIPV4Only {
2536		return
2537	}
2538	cmdRes = exec.Command("bash", "-c", blackholeDpv6Cmd)
2539	out, _ = cmdRes.CombinedOutput()
2540	t.Logf(string(out))
2541}
2542
2543func allowDirectPath(testEnv IntegrationEnv, t *testing.T) {
2544	cmdRes := exec.Command("bash", "-c", allowDpv4Cmd)
2545	out, _ := cmdRes.CombinedOutput()
2546	t.Logf(string(out))
2547	if testEnv.Config().DirectPathIPV4Only {
2548		return
2549	}
2550	cmdRes = exec.Command("bash", "-c", allowDpv6Cmd)
2551	out, _ = cmdRes.CombinedOutput()
2552	t.Logf(string(out))
2553}
2554