1/*
2Copyright 2019 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package bigtable
18
19import (
20	"context"
21	"flag"
22	"fmt"
23	"math"
24	"math/rand"
25	"os/exec"
26	"reflect"
27	"sort"
28	"strings"
29	"sync"
30	"testing"
31	"time"
32
33	"cloud.google.com/go/iam"
34	"cloud.google.com/go/internal"
35	"cloud.google.com/go/internal/testutil"
36	"cloud.google.com/go/internal/uid"
37	"github.com/golang/protobuf/proto"
38	"github.com/google/go-cmp/cmp"
39	gax "github.com/googleapis/gax-go/v2"
40	"google.golang.org/api/iterator"
41	btapb "google.golang.org/genproto/googleapis/bigtable/admin/v2"
42	v1 "google.golang.org/genproto/googleapis/iam/v1"
43	longrunning "google.golang.org/genproto/googleapis/longrunning"
44	grpc "google.golang.org/grpc"
45	"google.golang.org/protobuf/types/known/emptypb"
46)
47
48const (
49	directPathIPV6Prefix = "[2001:4860:8040"
50	directPathIPV4Prefix = "34.126"
51)
52
53var (
54	presidentsSocialGraph = map[string][]string{
55		"wmckinley":   {"tjefferson"},
56		"gwashington": {"j§adams"},
57		"tjefferson":  {"gwashington", "j§adams"},
58		"j§adams":     {"gwashington", "tjefferson"},
59	}
60
61	tableNameSpace = uid.NewSpace("cbt-test", &uid.Options{Short: true})
62)
63
64func populatePresidentsGraph(table *Table) error {
65	ctx := context.Background()
66	for row, ss := range presidentsSocialGraph {
67		mut := NewMutation()
68		for _, name := range ss {
69			mut.Set("follows", name, 1000, []byte("1"))
70		}
71		if err := table.Apply(ctx, row, mut); err != nil {
72			return fmt.Errorf("Mutating row %q: %v", row, err)
73		}
74	}
75	return nil
76}
77
78var (
79	instanceToCreate      string
80	instanceToCreateZone  string
81	instanceToCreateZone2 string
82	blackholeDpv6Cmd      string
83	blackholeDpv4Cmd      string
84	allowDpv6Cmd          string
85	allowDpv4Cmd          string
86)
87
88func init() {
89	// Don't test instance creation by default, as quota is necessary and aborted tests could strand resources.
90	flag.StringVar(&instanceToCreate, "it.instance-to-create", "",
91		"The id of an instance to create, update and delete. Requires sufficient Cloud Bigtable quota. Requires that it.use-prod is true.")
92	flag.StringVar(&instanceToCreateZone, "it.instance-to-create-zone", "us-central1-b",
93		"The zone in which to create the new test instance.")
94	flag.StringVar(&instanceToCreateZone2, "it.instance-to-create-zone2", "us-east1-c",
95		"The zone in which to create a second cluster in the test instance.")
96	// Use sysctl or iptables to blackhole DirectPath IP for fallback tests.
97	flag.StringVar(&blackholeDpv6Cmd, "it.blackhole-dpv6-cmd", "", "Command to make LB and backend addresses blackholed over dpv6")
98	flag.StringVar(&blackholeDpv4Cmd, "it.blackhole-dpv4-cmd", "", "Command to make LB and backend addresses blackholed over dpv4")
99	flag.StringVar(&allowDpv6Cmd, "it.allow-dpv6-cmd", "", "Command to make LB and backend addresses allowed over dpv6")
100	flag.StringVar(&allowDpv4Cmd, "it.allow-dpv4-cmd", "", "Command to make LB and backend addresses allowed over dpv4")
101}
102
103func TestIntegration_ConditionalMutations(t *testing.T) {
104	ctx := context.Background()
105	testEnv, _, _, table, _, cleanup, err := setupIntegration(ctx, t)
106	if err != nil {
107		t.Fatal(err)
108	}
109	defer cleanup()
110
111	if err := populatePresidentsGraph(table); err != nil {
112		t.Fatal(err)
113	}
114
115	// Do a conditional mutation with a complex filter.
116	mutTrue := NewMutation()
117	mutTrue.Set("follows", "wmckinley", 1000, []byte("1"))
118	filter := ChainFilters(ColumnFilter("gwash[iz].*"), ValueFilter("."))
119	mut := NewCondMutation(filter, mutTrue, nil)
120	if err := table.Apply(ctx, "tjefferson", mut); err != nil {
121		t.Fatalf("Conditionally mutating row: %v", err)
122	}
123	verifyDirectPathRemoteAddress(testEnv, t)
124	// Do a second condition mutation with a filter that does not match,
125	// and thus no changes should be made.
126	mutTrue = NewMutation()
127	mutTrue.DeleteRow()
128	filter = ColumnFilter("snoop.dogg")
129	mut = NewCondMutation(filter, mutTrue, nil)
130	if err := table.Apply(ctx, "tjefferson", mut); err != nil {
131		t.Fatalf("Conditionally mutating row: %v", err)
132	}
133	verifyDirectPathRemoteAddress(testEnv, t)
134
135	// Fetch a row.
136	row, err := table.ReadRow(ctx, "j§adams")
137	if err != nil {
138		t.Fatalf("Reading a row: %v", err)
139	}
140	verifyDirectPathRemoteAddress(testEnv, t)
141	wantRow := Row{
142		"follows": []ReadItem{
143			{Row: "j§adams", Column: "follows:gwashington", Timestamp: 1000, Value: []byte("1")},
144			{Row: "j§adams", Column: "follows:tjefferson", Timestamp: 1000, Value: []byte("1")},
145		},
146	}
147	if !testutil.Equal(row, wantRow) {
148		t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow)
149	}
150}
151
152func TestIntegration_PartialReadRows(t *testing.T) {
153	ctx := context.Background()
154	_, _, _, table, _, cleanup, err := setupIntegration(ctx, t)
155	if err != nil {
156		t.Fatal(err)
157	}
158	defer cleanup()
159
160	if err := populatePresidentsGraph(table); err != nil {
161		t.Fatal(err)
162	}
163
164	// Do a scan and stop part way through.
165	// Verify that the ReadRows callback doesn't keep running.
166	stopped := false
167	err = table.ReadRows(ctx, InfiniteRange(""), func(r Row) bool {
168		if r.Key() < "h" {
169			return true
170		}
171		if !stopped {
172			stopped = true
173			return false
174		}
175		t.Fatalf("ReadRows kept scanning to row %q after being told to stop", r.Key())
176		return false
177	})
178	if err != nil {
179		t.Fatalf("Partial ReadRows: %v", err)
180	}
181}
182
183func TestIntegration_ReadRowList(t *testing.T) {
184	ctx := context.Background()
185	_, _, _, table, _, cleanup, err := setupIntegration(ctx, t)
186	if err != nil {
187		t.Fatal(err)
188	}
189	defer cleanup()
190
191	if err := populatePresidentsGraph(table); err != nil {
192		t.Fatal(err)
193	}
194
195	// Read a RowList
196	var elt []string
197	keys := RowList{"wmckinley", "gwashington", "j§adams"}
198	want := "gwashington-j§adams-1,j§adams-gwashington-1,j§adams-tjefferson-1,wmckinley-tjefferson-1"
199	err = table.ReadRows(ctx, keys, func(r Row) bool {
200		for _, ris := range r {
201			for _, ri := range ris {
202				elt = append(elt, formatReadItem(ri))
203			}
204		}
205		return true
206	})
207	if err != nil {
208		t.Fatalf("read RowList: %v", err)
209	}
210
211	if got := strings.Join(elt, ","); got != want {
212		t.Fatalf("bulk read: wrong reads.\n got %q\nwant %q", got, want)
213	}
214}
215
216func TestIntegration_DeleteRow(t *testing.T) {
217	ctx := context.Background()
218	_, _, _, table, _, cleanup, err := setupIntegration(ctx, t)
219	if err != nil {
220		t.Fatal(err)
221	}
222	defer cleanup()
223
224	if err := populatePresidentsGraph(table); err != nil {
225		t.Fatal(err)
226	}
227
228	// Delete a row and check it goes away.
229	mut := NewMutation()
230	mut.DeleteRow()
231	if err := table.Apply(ctx, "wmckinley", mut); err != nil {
232		t.Fatalf("Apply DeleteRow: %v", err)
233	}
234	row, err := table.ReadRow(ctx, "wmckinley")
235	if err != nil {
236		t.Fatalf("Reading a row after DeleteRow: %v", err)
237	}
238	if len(row) != 0 {
239		t.Fatalf("Read non-zero row after DeleteRow: %v", row)
240	}
241}
242
243func TestIntegration_ReadModifyWrite(t *testing.T) {
244	ctx := context.Background()
245	testEnv, _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t)
246	if err != nil {
247		t.Fatal(err)
248	}
249	defer cleanup()
250
251	if err := populatePresidentsGraph(table); err != nil {
252		t.Fatal(err)
253	}
254
255	if err := adminClient.CreateColumnFamily(ctx, tableName, "counter"); err != nil {
256		t.Fatalf("Creating column family: %v", err)
257	}
258
259	appendRMW := func(b []byte) *ReadModifyWrite {
260		rmw := NewReadModifyWrite()
261		rmw.AppendValue("counter", "likes", b)
262		return rmw
263	}
264	incRMW := func(n int64) *ReadModifyWrite {
265		rmw := NewReadModifyWrite()
266		rmw.Increment("counter", "likes", n)
267		return rmw
268	}
269	rmwSeq := []struct {
270		desc string
271		rmw  *ReadModifyWrite
272		want []byte
273	}{
274		{
275			desc: "append #1",
276			rmw:  appendRMW([]byte{0, 0, 0}),
277			want: []byte{0, 0, 0},
278		},
279		{
280			desc: "append #2",
281			rmw:  appendRMW([]byte{0, 0, 0, 0, 17}), // the remaining 40 bits to make a big-endian 17
282			want: []byte{0, 0, 0, 0, 0, 0, 0, 17},
283		},
284		{
285			desc: "increment",
286			rmw:  incRMW(8),
287			want: []byte{0, 0, 0, 0, 0, 0, 0, 25},
288		},
289	}
290	for _, step := range rmwSeq {
291		row, err := table.ApplyReadModifyWrite(ctx, "gwashington", step.rmw)
292		if err != nil {
293			t.Fatalf("ApplyReadModifyWrite %+v: %v", step.rmw, err)
294		}
295		verifyDirectPathRemoteAddress(testEnv, t)
296		// Make sure the modified cell returned by the RMW operation has a timestamp.
297		if row["counter"][0].Timestamp == 0 {
298			t.Fatalf("RMW returned cell timestamp: got %v, want > 0", row["counter"][0].Timestamp)
299		}
300		clearTimestamps(row)
301		wantRow := Row{"counter": []ReadItem{{Row: "gwashington", Column: "counter:likes", Value: step.want}}}
302		if !testutil.Equal(row, wantRow) {
303			t.Fatalf("After %s,\n got %v\nwant %v", step.desc, row, wantRow)
304		}
305	}
306
307	// Check for google-cloud-go/issues/723. RMWs that insert new rows should keep row order sorted in the emulator.
308	_, err = table.ApplyReadModifyWrite(ctx, "issue-723-2", appendRMW([]byte{0}))
309	if err != nil {
310		t.Fatalf("ApplyReadModifyWrite null string: %v", err)
311	}
312	verifyDirectPathRemoteAddress(testEnv, t)
313	_, err = table.ApplyReadModifyWrite(ctx, "issue-723-1", appendRMW([]byte{0}))
314	if err != nil {
315		t.Fatalf("ApplyReadModifyWrite null string: %v", err)
316	}
317	verifyDirectPathRemoteAddress(testEnv, t)
318	// Get only the correct row back on read.
319	r, err := table.ReadRow(ctx, "issue-723-1")
320	if err != nil {
321		t.Fatalf("Reading row: %v", err)
322	}
323	verifyDirectPathRemoteAddress(testEnv, t)
324	if r.Key() != "issue-723-1" {
325		t.Fatalf("ApplyReadModifyWrite: incorrect read after RMW,\n got %v\nwant %v", r.Key(), "issue-723-1")
326	}
327}
328
329func TestIntegration_ArbitraryTimestamps(t *testing.T) {
330	ctx := context.Background()
331	_, _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t)
332	if err != nil {
333		t.Fatal(err)
334	}
335	defer cleanup()
336
337	// Test arbitrary timestamps more thoroughly.
338	if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil {
339		t.Fatalf("Creating column family: %v", err)
340	}
341	const numVersions = 4
342	mut := NewMutation()
343	for i := 1; i < numVersions; i++ {
344		// Timestamps are used in thousands because the server
345		// only permits that granularity.
346		mut.Set("ts", "col", Timestamp(i*1000), []byte(fmt.Sprintf("val-%d", i)))
347		mut.Set("ts", "col2", Timestamp(i*1000), []byte(fmt.Sprintf("val-%d", i)))
348	}
349	if err := table.Apply(ctx, "testrow", mut); err != nil {
350		t.Fatalf("Mutating row: %v", err)
351	}
352	r, err := table.ReadRow(ctx, "testrow")
353	if err != nil {
354		t.Fatalf("Reading row: %v", err)
355	}
356	wantRow := Row{"ts": []ReadItem{
357		// These should be returned in descending timestamp order.
358		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
359		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
360		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
361		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
362		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
363		{Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")},
364	}}
365	if !testutil.Equal(r, wantRow) {
366		t.Fatalf("Cell with multiple versions,\n got %v\nwant %v", r, wantRow)
367	}
368
369	// Do the same read, but filter to the latest two versions.
370	r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(2)))
371	if err != nil {
372		t.Fatalf("Reading row: %v", err)
373	}
374	wantRow = Row{"ts": []ReadItem{
375		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
376		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
377		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
378		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
379	}}
380	if !testutil.Equal(r, wantRow) {
381		t.Fatalf("Cell with multiple versions and LatestNFilter(2),\n got %v\nwant %v", r, wantRow)
382	}
383	// Check cell offset / limit
384	r, err = table.ReadRow(ctx, "testrow", RowFilter(CellsPerRowLimitFilter(3)))
385	if err != nil {
386		t.Fatalf("Reading row: %v", err)
387	}
388	wantRow = Row{"ts": []ReadItem{
389		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
390		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
391		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
392	}}
393	if !testutil.Equal(r, wantRow) {
394		t.Fatalf("Cell with multiple versions and CellsPerRowLimitFilter(3),\n got %v\nwant %v", r, wantRow)
395	}
396	r, err = table.ReadRow(ctx, "testrow", RowFilter(CellsPerRowOffsetFilter(3)))
397	if err != nil {
398		t.Fatalf("Reading row: %v", err)
399	}
400	wantRow = Row{"ts": []ReadItem{
401		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
402		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
403		{Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")},
404	}}
405	if !testutil.Equal(r, wantRow) {
406		t.Fatalf("Cell with multiple versions and CellsPerRowOffsetFilter(3),\n got %v\nwant %v", r, wantRow)
407	}
408	// Check timestamp range filtering (with truncation)
409	r, err = table.ReadRow(ctx, "testrow", RowFilter(TimestampRangeFilterMicros(1001, 3000)))
410	if err != nil {
411		t.Fatalf("Reading row: %v", err)
412	}
413	wantRow = Row{"ts": []ReadItem{
414		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
415		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
416		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
417		{Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")},
418	}}
419	if !testutil.Equal(r, wantRow) {
420		t.Fatalf("Cell with multiple versions and TimestampRangeFilter(1000, 3000),\n got %v\nwant %v", r, wantRow)
421	}
422	r, err = table.ReadRow(ctx, "testrow", RowFilter(TimestampRangeFilterMicros(1000, 0)))
423	if err != nil {
424		t.Fatalf("Reading row: %v", err)
425	}
426	wantRow = Row{"ts": []ReadItem{
427		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
428		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
429		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
430		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
431		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
432		{Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")},
433	}}
434	if !testutil.Equal(r, wantRow) {
435		t.Fatalf("Cell with multiple versions and TimestampRangeFilter(1000, 0),\n got %v\nwant %v", r, wantRow)
436	}
437	// Delete non-existing cells, no such column family in this row
438	// Should not delete anything
439	if err := adminClient.CreateColumnFamily(ctx, tableName, "non-existing"); err != nil {
440		t.Fatalf("Creating column family: %v", err)
441	}
442	mut = NewMutation()
443	mut.DeleteTimestampRange("non-existing", "col", 2000, 3000) // half-open interval
444	if err := table.Apply(ctx, "testrow", mut); err != nil {
445		t.Fatalf("Mutating row: %v", err)
446	}
447	r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(3)))
448	if err != nil {
449		t.Fatalf("Reading row: %v", err)
450	}
451	if !testutil.Equal(r, wantRow) {
452		t.Fatalf("Cell was deleted unexpectly,\n got %v\nwant %v", r, wantRow)
453	}
454	// Delete non-existing cells, no such column in this column family
455	// Should not delete anything
456	mut = NewMutation()
457	mut.DeleteTimestampRange("ts", "non-existing", 2000, 3000) // half-open interval
458	if err := table.Apply(ctx, "testrow", mut); err != nil {
459		t.Fatalf("Mutating row: %v", err)
460	}
461	r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(3)))
462	if err != nil {
463		t.Fatalf("Reading row: %v", err)
464	}
465	if !testutil.Equal(r, wantRow) {
466		t.Fatalf("Cell was deleted unexpectly,\n got %v\nwant %v", r, wantRow)
467	}
468	// Delete the cell with timestamp 2000 and repeat the last read,
469	// checking that we get ts 3000 and ts 1000.
470	mut = NewMutation()
471	mut.DeleteTimestampRange("ts", "col", 2001, 3000) // half-open interval
472	if err := table.Apply(ctx, "testrow", mut); err != nil {
473		t.Fatalf("Mutating row: %v", err)
474	}
475	r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(2)))
476	if err != nil {
477		t.Fatalf("Reading row: %v", err)
478	}
479	wantRow = Row{"ts": []ReadItem{
480		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
481		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
482		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
483		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
484	}}
485	if !testutil.Equal(r, wantRow) {
486		t.Fatalf("Cell with multiple versions and LatestNFilter(2), after deleting timestamp 2000,\n got %v\nwant %v", r, wantRow)
487	}
488
489	// Check DeleteCellsInFamily
490	if err := adminClient.CreateColumnFamily(ctx, tableName, "status"); err != nil {
491		t.Fatalf("Creating column family: %v", err)
492	}
493
494	mut = NewMutation()
495	mut.Set("status", "start", 2000, []byte("2"))
496	mut.Set("status", "end", 3000, []byte("3"))
497	mut.Set("ts", "col", 1000, []byte("1"))
498	if err := table.Apply(ctx, "row1", mut); err != nil {
499		t.Fatalf("Mutating row: %v", err)
500	}
501	if err := table.Apply(ctx, "row2", mut); err != nil {
502		t.Fatalf("Mutating row: %v", err)
503	}
504
505	mut = NewMutation()
506	mut.DeleteCellsInFamily("status")
507	if err := table.Apply(ctx, "row1", mut); err != nil {
508		t.Fatalf("Delete cf: %v", err)
509	}
510
511	// ColumnFamily removed
512	r, err = table.ReadRow(ctx, "row1")
513	if err != nil {
514		t.Fatalf("Reading row: %v", err)
515	}
516	wantRow = Row{"ts": []ReadItem{
517		{Row: "row1", Column: "ts:col", Timestamp: 1000, Value: []byte("1")},
518	}}
519	if !testutil.Equal(r, wantRow) {
520		t.Fatalf("column family was not deleted.\n got %v\n want %v", r, wantRow)
521	}
522
523	// ColumnFamily not removed
524	r, err = table.ReadRow(ctx, "row2")
525	if err != nil {
526		t.Fatalf("Reading row: %v", err)
527	}
528	wantRow = Row{
529		"ts": []ReadItem{
530			{Row: "row2", Column: "ts:col", Timestamp: 1000, Value: []byte("1")},
531		},
532		"status": []ReadItem{
533			{Row: "row2", Column: "status:end", Timestamp: 3000, Value: []byte("3")},
534			{Row: "row2", Column: "status:start", Timestamp: 2000, Value: []byte("2")},
535		},
536	}
537	if !testutil.Equal(r, wantRow) {
538		t.Fatalf("Column family was deleted unexpectedly.\n got %v\n want %v", r, wantRow)
539	}
540
541	// Check DeleteCellsInColumn
542	mut = NewMutation()
543	mut.Set("status", "start", 2000, []byte("2"))
544	mut.Set("status", "middle", 3000, []byte("3"))
545	mut.Set("status", "end", 1000, []byte("1"))
546	if err := table.Apply(ctx, "row3", mut); err != nil {
547		t.Fatalf("Mutating row: %v", err)
548	}
549	mut = NewMutation()
550	mut.DeleteCellsInColumn("status", "middle")
551	if err := table.Apply(ctx, "row3", mut); err != nil {
552		t.Fatalf("Delete column: %v", err)
553	}
554	r, err = table.ReadRow(ctx, "row3")
555	if err != nil {
556		t.Fatalf("Reading row: %v", err)
557	}
558	wantRow = Row{
559		"status": []ReadItem{
560			{Row: "row3", Column: "status:end", Timestamp: 1000, Value: []byte("1")},
561			{Row: "row3", Column: "status:start", Timestamp: 2000, Value: []byte("2")},
562		},
563	}
564	if !testutil.Equal(r, wantRow) {
565		t.Fatalf("Column was not deleted.\n got %v\n want %v", r, wantRow)
566	}
567	mut = NewMutation()
568	mut.DeleteCellsInColumn("status", "start")
569	if err := table.Apply(ctx, "row3", mut); err != nil {
570		t.Fatalf("Delete column: %v", err)
571	}
572	r, err = table.ReadRow(ctx, "row3")
573	if err != nil {
574		t.Fatalf("Reading row: %v", err)
575	}
576	wantRow = Row{
577		"status": []ReadItem{
578			{Row: "row3", Column: "status:end", Timestamp: 1000, Value: []byte("1")},
579		},
580	}
581	if !testutil.Equal(r, wantRow) {
582		t.Fatalf("Column was not deleted.\n got %v\n want %v", r, wantRow)
583	}
584	mut = NewMutation()
585	mut.DeleteCellsInColumn("status", "end")
586	if err := table.Apply(ctx, "row3", mut); err != nil {
587		t.Fatalf("Delete column: %v", err)
588	}
589	r, err = table.ReadRow(ctx, "row3")
590	if err != nil {
591		t.Fatalf("Reading row: %v", err)
592	}
593	if len(r) != 0 {
594		t.Fatalf("Delete column: got %v, want empty row", r)
595	}
596	// Add same cell after delete
597	mut = NewMutation()
598	mut.Set("status", "end", 1000, []byte("1"))
599	if err := table.Apply(ctx, "row3", mut); err != nil {
600		t.Fatalf("Mutating row: %v", err)
601	}
602	r, err = table.ReadRow(ctx, "row3")
603	if err != nil {
604		t.Fatalf("Reading row: %v", err)
605	}
606	if !testutil.Equal(r, wantRow) {
607		t.Fatalf("Column was not deleted correctly.\n got %v\n want %v", r, wantRow)
608	}
609}
610
611func TestIntegration_HighlyConcurrentReadsAndWrites(t *testing.T) {
612	ctx := context.Background()
613	_, _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t)
614	if err != nil {
615		t.Fatal(err)
616	}
617	defer cleanup()
618
619	if err := populatePresidentsGraph(table); err != nil {
620		t.Fatal(err)
621	}
622
623	if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil {
624		t.Fatalf("Creating column family: %v", err)
625	}
626
627	// Do highly concurrent reads/writes.
628	const maxConcurrency = 1000
629	var wg sync.WaitGroup
630	for i := 0; i < maxConcurrency; i++ {
631		wg.Add(1)
632		go func() {
633			defer wg.Done()
634			switch r := rand.Intn(100); { // r ∈ [0,100)
635			case 0 <= r && r < 30:
636				// Do a read.
637				_, err := table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(1)))
638				if err != nil {
639					t.Errorf("Concurrent read: %v", err)
640				}
641			case 30 <= r && r < 100:
642				// Do a write.
643				mut := NewMutation()
644				mut.Set("ts", "col", 1000, []byte("data"))
645				if err := table.Apply(ctx, "testrow", mut); err != nil {
646					t.Errorf("Concurrent write: %v", err)
647				}
648			}
649		}()
650	}
651	wg.Wait()
652}
653
654func TestIntegration_LargeReadsWritesAndScans(t *testing.T) {
655	ctx := context.Background()
656	testEnv, _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t)
657	if err != nil {
658		t.Fatal(err)
659	}
660	defer cleanup()
661
662	if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil {
663		t.Fatalf("Creating column family: %v", err)
664	}
665
666	bigBytes := make([]byte, 5<<20) // 5 MB is larger than current default gRPC max of 4 MB, but less than the max we set.
667	nonsense := []byte("lorem ipsum dolor sit amet, ")
668	fill(bigBytes, nonsense)
669	mut := NewMutation()
670	mut.Set("ts", "col", 1000, bigBytes)
671	if err := table.Apply(ctx, "bigrow", mut); err != nil {
672		t.Fatalf("Big write: %v", err)
673	}
674	verifyDirectPathRemoteAddress(testEnv, t)
675	r, err := table.ReadRow(ctx, "bigrow")
676	if err != nil {
677		t.Fatalf("Big read: %v", err)
678	}
679	verifyDirectPathRemoteAddress(testEnv, t)
680	wantRow := Row{"ts": []ReadItem{
681		{Row: "bigrow", Column: "ts:col", Timestamp: 1000, Value: bigBytes},
682	}}
683	if !testutil.Equal(r, wantRow) {
684		t.Fatalf("Big read returned incorrect bytes: %v", r)
685	}
686
687	var wg sync.WaitGroup
688	// Now write 1000 rows, each with 82 KB values, then scan them all.
689	medBytes := make([]byte, 82<<10)
690	fill(medBytes, nonsense)
691	sem := make(chan int, 50) // do up to 50 mutations at a time.
692	for i := 0; i < 1000; i++ {
693		mut := NewMutation()
694		mut.Set("ts", "big-scan", 1000, medBytes)
695		row := fmt.Sprintf("row-%d", i)
696		wg.Add(1)
697		go func() {
698			defer wg.Done()
699			defer func() { <-sem }()
700			sem <- 1
701			if err := table.Apply(ctx, row, mut); err != nil {
702				t.Errorf("Preparing large scan: %v", err)
703			}
704			verifyDirectPathRemoteAddress(testEnv, t)
705		}()
706	}
707	wg.Wait()
708	n := 0
709	err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool {
710		for _, ris := range r {
711			for _, ri := range ris {
712				n += len(ri.Value)
713			}
714		}
715		return true
716	}, RowFilter(ColumnFilter("big-scan")))
717	if err != nil {
718		t.Fatalf("Doing large scan: %v", err)
719	}
720	verifyDirectPathRemoteAddress(testEnv, t)
721	if want := 1000 * len(medBytes); n != want {
722		t.Fatalf("Large scan returned %d bytes, want %d", n, want)
723	}
724	// Scan a subset of the 1000 rows that we just created, using a LimitRows ReadOption.
725	rc := 0
726	wantRc := 3
727	err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool {
728		rc++
729		return true
730	}, LimitRows(int64(wantRc)))
731	if err != nil {
732		t.Fatal(err)
733	}
734	verifyDirectPathRemoteAddress(testEnv, t)
735	if rc != wantRc {
736		t.Fatalf("Scan with row limit returned %d rows, want %d", rc, wantRc)
737	}
738
739	// Test bulk mutations
740	if err := adminClient.CreateColumnFamily(ctx, tableName, "bulk"); err != nil {
741		t.Fatalf("Creating column family: %v", err)
742	}
743	bulkData := map[string][]string{
744		"red sox":  {"2004", "2007", "2013"},
745		"patriots": {"2001", "2003", "2004", "2014"},
746		"celtics":  {"1981", "1984", "1986", "2008"},
747	}
748	var rowKeys []string
749	var muts []*Mutation
750	for row, ss := range bulkData {
751		mut := NewMutation()
752		for _, name := range ss {
753			mut.Set("bulk", name, 1000, []byte("1"))
754		}
755		rowKeys = append(rowKeys, row)
756		muts = append(muts, mut)
757	}
758	status, err := table.ApplyBulk(ctx, rowKeys, muts)
759	if err != nil {
760		t.Fatalf("Bulk mutating rows %q: %v", rowKeys, err)
761	}
762	verifyDirectPathRemoteAddress(testEnv, t)
763	if status != nil {
764		t.Fatalf("non-nil errors: %v", err)
765	}
766
767	// Read each row back
768	for rowKey, ss := range bulkData {
769		row, err := table.ReadRow(ctx, rowKey)
770		if err != nil {
771			t.Fatalf("Reading a bulk row: %v", err)
772		}
773		verifyDirectPathRemoteAddress(testEnv, t)
774		var wantItems []ReadItem
775		for _, val := range ss {
776			wantItems = append(wantItems, ReadItem{Row: rowKey, Column: "bulk:" + val, Timestamp: 1000, Value: []byte("1")})
777		}
778		wantRow := Row{"bulk": wantItems}
779		if !testutil.Equal(row, wantRow) {
780			t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow)
781		}
782	}
783
784	// Test bulk write errors.
785	// Note: Setting timestamps as ServerTime makes sure the mutations are not retried on error.
786	badMut := NewMutation()
787	badMut.Set("badfamily", "col", ServerTime, nil)
788	badMut2 := NewMutation()
789	badMut2.Set("badfamily2", "goodcol", ServerTime, []byte("1"))
790	status, err = table.ApplyBulk(ctx, []string{"badrow", "badrow2"}, []*Mutation{badMut, badMut2})
791	if err != nil {
792		t.Fatalf("Bulk mutating rows %q: %v", rowKeys, err)
793	}
794	verifyDirectPathRemoteAddress(testEnv, t)
795	if status == nil {
796		t.Fatalf("No errors for bad bulk mutation")
797	} else if status[0] == nil || status[1] == nil {
798		t.Fatalf("No error for bad bulk mutation")
799	}
800}
801
802func TestIntegration_Read(t *testing.T) {
803	ctx := context.Background()
804	testEnv, _, _, table, _, cleanup, err := setupIntegration(ctx, t)
805	if err != nil {
806		t.Fatal(err)
807	}
808	defer cleanup()
809
810	// Insert some data.
811	initialData := map[string][]string{
812		"wmckinley":   {"tjefferson"},
813		"gwashington": {"j§adams"},
814		"tjefferson":  {"gwashington", "j§adams", "wmckinley"},
815		"j§adams":     {"gwashington", "tjefferson"},
816	}
817	for row, ss := range initialData {
818		mut := NewMutation()
819		for _, name := range ss {
820			mut.Set("follows", name, 1000, []byte("1"))
821		}
822		if err := table.Apply(ctx, row, mut); err != nil {
823			t.Fatalf("Mutating row %q: %v", row, err)
824		}
825		verifyDirectPathRemoteAddress(testEnv, t)
826	}
827
828	for _, test := range []struct {
829		desc   string
830		rr     RowSet
831		filter Filter     // may be nil
832		limit  ReadOption // may be nil
833
834		// We do the read, grab all the cells, turn them into "<row>-<col>-<val>",
835		// and join with a comma.
836		want       string
837		wantLabels []string
838	}{
839		{
840			desc: "read all, unfiltered",
841			rr:   RowRange{},
842			want: "gwashington-j§adams-1,j§adams-gwashington-1,j§adams-tjefferson-1,tjefferson-gwashington-1,tjefferson-j§adams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1",
843		},
844		{
845			desc: "read with InfiniteRange, unfiltered",
846			rr:   InfiniteRange("tjefferson"),
847			want: "tjefferson-gwashington-1,tjefferson-j§adams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1",
848		},
849		{
850			desc: "read with NewRange, unfiltered",
851			rr:   NewRange("gargamel", "hubbard"),
852			want: "gwashington-j§adams-1",
853		},
854		{
855			desc: "read with PrefixRange, unfiltered",
856			rr:   PrefixRange("j§ad"),
857			want: "j§adams-gwashington-1,j§adams-tjefferson-1",
858		},
859		{
860			desc: "read with SingleRow, unfiltered",
861			rr:   SingleRow("wmckinley"),
862			want: "wmckinley-tjefferson-1",
863		},
864		{
865			desc:   "read all, with ColumnFilter",
866			rr:     RowRange{},
867			filter: ColumnFilter(".*j.*"), // matches "j§adams" and "tjefferson"
868			want:   "gwashington-j§adams-1,j§adams-tjefferson-1,tjefferson-j§adams-1,wmckinley-tjefferson-1",
869		},
870		{
871			desc:   "read all, with ColumnFilter, prefix",
872			rr:     RowRange{},
873			filter: ColumnFilter("j"), // no matches
874			want:   "",
875		},
876		{
877			desc:   "read range, with ColumnRangeFilter",
878			rr:     RowRange{},
879			filter: ColumnRangeFilter("follows", "h", "k"),
880			want:   "gwashington-j§adams-1,tjefferson-j§adams-1",
881		},
882		{
883			desc:   "read range from empty, with ColumnRangeFilter",
884			rr:     RowRange{},
885			filter: ColumnRangeFilter("follows", "", "u"),
886			want:   "gwashington-j§adams-1,j§adams-gwashington-1,j§adams-tjefferson-1,tjefferson-gwashington-1,tjefferson-j§adams-1,wmckinley-tjefferson-1",
887		},
888		{
889			desc:   "read range from start to empty, with ColumnRangeFilter",
890			rr:     RowRange{},
891			filter: ColumnRangeFilter("follows", "h", ""),
892			want:   "gwashington-j§adams-1,j§adams-tjefferson-1,tjefferson-j§adams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1",
893		},
894		{
895			desc:   "read with RowKeyFilter",
896			rr:     RowRange{},
897			filter: RowKeyFilter(".*wash.*"),
898			want:   "gwashington-j§adams-1",
899		},
900		{
901			desc:   "read with RowKeyFilter unicode",
902			rr:     RowRange{},
903			filter: RowKeyFilter(".*j§.*"),
904			want:   "j§adams-gwashington-1,j§adams-tjefferson-1",
905		},
906		{
907			desc:   "read with RowKeyFilter escaped",
908			rr:     RowRange{},
909			filter: RowKeyFilter(`.*j\xC2\xA7.*`),
910			want:   "j§adams-gwashington-1,j§adams-tjefferson-1",
911		},
912		{
913			desc:   "read with RowKeyFilter, prefix",
914			rr:     RowRange{},
915			filter: RowKeyFilter("gwash"),
916			want:   "",
917		},
918		{
919			desc:   "read with RowKeyFilter, no matches",
920			rr:     RowRange{},
921			filter: RowKeyFilter(".*xxx.*"),
922			want:   "",
923		},
924		{
925			desc:   "read with FamilyFilter, no matches",
926			rr:     RowRange{},
927			filter: FamilyFilter(".*xxx.*"),
928			want:   "",
929		},
930		{
931			desc:   "read with ColumnFilter + row limit",
932			rr:     RowRange{},
933			filter: ColumnFilter(".*j.*"), // matches "j§adams" and "tjefferson"
934			limit:  LimitRows(2),
935			want:   "gwashington-j§adams-1,j§adams-tjefferson-1",
936		},
937		{
938			desc:       "apply labels to the result rows",
939			rr:         RowRange{},
940			filter:     LabelFilter("test-label"),
941			limit:      LimitRows(2),
942			want:       "gwashington-j§adams-1,j§adams-gwashington-1,j§adams-tjefferson-1",
943			wantLabels: []string{"test-label", "test-label", "test-label"},
944		},
945		{
946			desc:   "read all, strip values",
947			rr:     RowRange{},
948			filter: StripValueFilter(),
949			want:   "gwashington-j§adams-,j§adams-gwashington-,j§adams-tjefferson-,tjefferson-gwashington-,tjefferson-j§adams-,tjefferson-wmckinley-,wmckinley-tjefferson-",
950		},
951		{
952			desc:   "read with ColumnFilter + row limit + strip values",
953			rr:     RowRange{},
954			filter: ChainFilters(ColumnFilter(".*j.*"), StripValueFilter()), // matches "j§adams" and "tjefferson"
955			limit:  LimitRows(2),
956			want:   "gwashington-j§adams-,j§adams-tjefferson-",
957		},
958		{
959			desc:   "read with condition, strip values on true",
960			rr:     RowRange{},
961			filter: ConditionFilter(ColumnFilter(".*j.*"), StripValueFilter(), nil),
962			want:   "gwashington-j§adams-,j§adams-gwashington-,j§adams-tjefferson-,tjefferson-gwashington-,tjefferson-j§adams-,tjefferson-wmckinley-,wmckinley-tjefferson-",
963		},
964		{
965			desc:   "read with condition, strip values on false",
966			rr:     RowRange{},
967			filter: ConditionFilter(ColumnFilter(".*xxx.*"), nil, StripValueFilter()),
968			want:   "gwashington-j§adams-,j§adams-gwashington-,j§adams-tjefferson-,tjefferson-gwashington-,tjefferson-j§adams-,tjefferson-wmckinley-,wmckinley-tjefferson-",
969		},
970		{
971			desc:   "read with ValueRangeFilter + row limit",
972			rr:     RowRange{},
973			filter: ValueRangeFilter([]byte("1"), []byte("5")), // matches our value of "1"
974			limit:  LimitRows(2),
975			want:   "gwashington-j§adams-1,j§adams-gwashington-1,j§adams-tjefferson-1",
976		},
977		{
978			desc:   "read with ValueRangeFilter, no match on exclusive end",
979			rr:     RowRange{},
980			filter: ValueRangeFilter([]byte("0"), []byte("1")), // no match
981			want:   "",
982		},
983		{
984			desc:   "read with ValueRangeFilter, no matches",
985			rr:     RowRange{},
986			filter: ValueRangeFilter([]byte("3"), []byte("5")), // matches nothing
987			want:   "",
988		},
989		{
990			desc:   "read with InterleaveFilter, no matches on all filters",
991			rr:     RowRange{},
992			filter: InterleaveFilters(ColumnFilter(".*x.*"), ColumnFilter(".*z.*")),
993			want:   "",
994		},
995		{
996			desc:   "read with InterleaveFilter, no duplicate cells",
997			rr:     RowRange{},
998			filter: InterleaveFilters(ColumnFilter(".*g.*"), ColumnFilter(".*j.*")),
999			want:   "gwashington-j§adams-1,j§adams-gwashington-1,j§adams-tjefferson-1,tjefferson-gwashington-1,tjefferson-j§adams-1,wmckinley-tjefferson-1",
1000		},
1001		{
1002			desc:   "read with InterleaveFilter, with duplicate cells",
1003			rr:     RowRange{},
1004			filter: InterleaveFilters(ColumnFilter(".*g.*"), ColumnFilter(".*g.*")),
1005			want:   "j§adams-gwashington-1,j§adams-gwashington-1,tjefferson-gwashington-1,tjefferson-gwashington-1",
1006		},
1007		{
1008			desc: "read with a RowRangeList and no filter",
1009			rr:   RowRangeList{NewRange("gargamel", "hubbard"), InfiniteRange("wmckinley")},
1010			want: "gwashington-j§adams-1,wmckinley-tjefferson-1",
1011		},
1012		{
1013			desc:   "chain that excludes rows and matches nothing, in a condition",
1014			rr:     RowRange{},
1015			filter: ConditionFilter(ChainFilters(ColumnFilter(".*j.*"), ColumnFilter(".*mckinley.*")), StripValueFilter(), nil),
1016			want:   "",
1017		},
1018		{
1019			desc:   "chain that ends with an interleave that has no match. covers #804",
1020			rr:     RowRange{},
1021			filter: ConditionFilter(ChainFilters(ColumnFilter(".*j.*"), InterleaveFilters(ColumnFilter(".*x.*"), ColumnFilter(".*z.*"))), StripValueFilter(), nil),
1022			want:   "",
1023		},
1024	} {
1025		t.Run(test.desc, func(t *testing.T) {
1026			var opts []ReadOption
1027			if test.filter != nil {
1028				opts = append(opts, RowFilter(test.filter))
1029			}
1030			if test.limit != nil {
1031				opts = append(opts, test.limit)
1032			}
1033			var elt, labels []string
1034			err := table.ReadRows(ctx, test.rr, func(r Row) bool {
1035				for _, ris := range r {
1036					for _, ri := range ris {
1037						labels = append(labels, ri.Labels...)
1038						elt = append(elt, formatReadItem(ri))
1039					}
1040				}
1041				return true
1042			}, opts...)
1043			if err != nil {
1044				t.Fatal(err)
1045			}
1046			verifyDirectPathRemoteAddress(testEnv, t)
1047			if got := strings.Join(elt, ","); got != test.want {
1048				t.Fatalf("got %q\nwant %q", got, test.want)
1049			}
1050			if got, want := labels, test.wantLabels; !reflect.DeepEqual(got, want) {
1051				t.Fatalf("got %q\nwant %q", got, want)
1052			}
1053		})
1054	}
1055}
1056
1057func TestIntegration_SampleRowKeys(t *testing.T) {
1058	ctx := context.Background()
1059	testEnv, _, _, table, _, cleanup, err := setupIntegration(ctx, t)
1060	if err != nil {
1061		t.Fatal(err)
1062	}
1063	defer cleanup()
1064
1065	// Insert some data.
1066	initialData := map[string][]string{
1067		"wmckinley11":   {"tjefferson11"},
1068		"gwashington77": {"j§adams77"},
1069		"tjefferson0":   {"gwashington0", "j§adams0"},
1070	}
1071
1072	for row, ss := range initialData {
1073		mut := NewMutation()
1074		for _, name := range ss {
1075			mut.Set("follows", name, 1000, []byte("1"))
1076		}
1077		if err := table.Apply(ctx, row, mut); err != nil {
1078			t.Fatalf("Mutating row %q: %v", row, err)
1079		}
1080		verifyDirectPathRemoteAddress(testEnv, t)
1081	}
1082	sampleKeys, err := table.SampleRowKeys(context.Background())
1083	if err != nil {
1084		t.Fatalf("%s: %v", "SampleRowKeys:", err)
1085	}
1086	if len(sampleKeys) == 0 {
1087		t.Error("SampleRowKeys length 0")
1088	}
1089}
1090
1091func TestIntegration_Admin(t *testing.T) {
1092	testEnv, err := NewIntegrationEnv()
1093	if err != nil {
1094		t.Fatalf("IntegrationEnv: %v", err)
1095	}
1096	defer testEnv.Close()
1097
1098	timeout := 2 * time.Second
1099	if testEnv.Config().UseProd {
1100		timeout = 5 * time.Minute
1101	}
1102	ctx, _ := context.WithTimeout(context.Background(), timeout)
1103
1104	adminClient, err := testEnv.NewAdminClient()
1105	if err != nil {
1106		t.Fatalf("NewAdminClient: %v", err)
1107	}
1108	defer adminClient.Close()
1109
1110	iAdminClient, err := testEnv.NewInstanceAdminClient()
1111	if err != nil {
1112		t.Fatalf("NewInstanceAdminClient: %v", err)
1113	}
1114	if iAdminClient != nil {
1115		defer iAdminClient.Close()
1116
1117		iInfo, err := iAdminClient.InstanceInfo(ctx, adminClient.instance)
1118		if err != nil {
1119			t.Errorf("InstanceInfo: %v", err)
1120		}
1121		if iInfo.Name != adminClient.instance {
1122			t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance)
1123		}
1124	}
1125
1126	list := func() []string {
1127		tbls, err := adminClient.Tables(ctx)
1128		if err != nil {
1129			t.Fatalf("Fetching list of tables: %v", err)
1130		}
1131		sort.Strings(tbls)
1132		return tbls
1133	}
1134	containsAll := func(got, want []string) bool {
1135		gotSet := make(map[string]bool)
1136
1137		for _, s := range got {
1138			gotSet[s] = true
1139		}
1140		for _, s := range want {
1141			if !gotSet[s] {
1142				return false
1143			}
1144		}
1145		return true
1146	}
1147
1148	defer deleteTable(ctx, t, adminClient, "mytable")
1149
1150	if err := adminClient.CreateTable(ctx, "mytable"); err != nil {
1151		t.Fatalf("Creating table: %v", err)
1152	}
1153
1154	defer deleteTable(ctx, t, adminClient, "myothertable")
1155
1156	if err := adminClient.CreateTable(ctx, "myothertable"); err != nil {
1157		t.Fatalf("Creating table: %v", err)
1158	}
1159
1160	if got, want := list(), []string{"myothertable", "mytable"}; !containsAll(got, want) {
1161		t.Errorf("adminClient.Tables returned %#v, want %#v", got, want)
1162	}
1163
1164	must(adminClient.WaitForReplication(ctx, "mytable"))
1165
1166	if err := adminClient.DeleteTable(ctx, "myothertable"); err != nil {
1167		t.Fatalf("Deleting table: %v", err)
1168	}
1169	tables := list()
1170	if got, want := tables, []string{"mytable"}; !containsAll(got, want) {
1171		t.Errorf("adminClient.Tables returned %#v, want %#v", got, want)
1172	}
1173	if got, unwanted := tables, []string{"myothertable"}; containsAll(got, unwanted) {
1174		t.Errorf("adminClient.Tables return %#v. unwanted %#v", got, unwanted)
1175	}
1176
1177	tblConf := TableConf{
1178		TableID: "conftable",
1179		Families: map[string]GCPolicy{
1180			"fam1": MaxVersionsPolicy(1),
1181			"fam2": MaxVersionsPolicy(2),
1182		},
1183	}
1184	if err := adminClient.CreateTableFromConf(ctx, &tblConf); err != nil {
1185		t.Fatalf("Creating table from TableConf: %v", err)
1186	}
1187	defer deleteTable(ctx, t, adminClient, tblConf.TableID)
1188
1189	tblInfo, err := adminClient.TableInfo(ctx, tblConf.TableID)
1190	if err != nil {
1191		t.Fatalf("Getting table info: %v", err)
1192	}
1193	sort.Strings(tblInfo.Families)
1194	wantFams := []string{"fam1", "fam2"}
1195	if !testutil.Equal(tblInfo.Families, wantFams) {
1196		t.Errorf("Column family mismatch, got %v, want %v", tblInfo.Families, wantFams)
1197	}
1198
1199	// Populate mytable and drop row ranges
1200	if err = adminClient.CreateColumnFamily(ctx, "mytable", "cf"); err != nil {
1201		t.Fatalf("Creating column family: %v", err)
1202	}
1203
1204	client, err := testEnv.NewClient()
1205	if err != nil {
1206		t.Fatalf("NewClient: %v", err)
1207	}
1208	defer client.Close()
1209
1210	tbl := client.Open("mytable")
1211
1212	prefixes := []string{"a", "b", "c"}
1213	for _, prefix := range prefixes {
1214		for i := 0; i < 5; i++ {
1215			mut := NewMutation()
1216			mut.Set("cf", "col", 1000, []byte("1"))
1217			if err := tbl.Apply(ctx, fmt.Sprintf("%v-%v", prefix, i), mut); err != nil {
1218				t.Fatalf("Mutating row: %v", err)
1219			}
1220		}
1221	}
1222
1223	if err = adminClient.DropRowRange(ctx, "mytable", "a"); err != nil {
1224		t.Errorf("DropRowRange a: %v", err)
1225	}
1226	if err = adminClient.DropRowRange(ctx, "mytable", "c"); err != nil {
1227		t.Errorf("DropRowRange c: %v", err)
1228	}
1229	if err = adminClient.DropRowRange(ctx, "mytable", "x"); err != nil {
1230		t.Errorf("DropRowRange x: %v", err)
1231	}
1232
1233	var gotRowCount int
1234	must(tbl.ReadRows(ctx, RowRange{}, func(row Row) bool {
1235		gotRowCount++
1236		if !strings.HasPrefix(row.Key(), "b") {
1237			t.Errorf("Invalid row after dropping range: %v", row)
1238		}
1239		return true
1240	}))
1241	if gotRowCount != 5 {
1242		t.Errorf("Invalid row count after dropping range: got %v, want %v", gotRowCount, 5)
1243	}
1244
1245	if err = adminClient.DropAllRows(ctx, "mytable"); err != nil {
1246		t.Errorf("DropAllRows mytable: %v", err)
1247	}
1248
1249	gotRowCount = 0
1250	must(tbl.ReadRows(ctx, RowRange{}, func(row Row) bool {
1251		gotRowCount++
1252		return true
1253	}))
1254	if gotRowCount != 0 {
1255		t.Errorf("Invalid row count after truncating table: got %v, want %v", gotRowCount, 0)
1256	}
1257}
1258
1259func TestIntegration_TableIam(t *testing.T) {
1260	testEnv, err := NewIntegrationEnv()
1261	if err != nil {
1262		t.Fatalf("IntegrationEnv: %v", err)
1263	}
1264	defer testEnv.Close()
1265
1266	if !testEnv.Config().UseProd {
1267		t.Skip("emulator doesn't support IAM Policy creation")
1268	}
1269
1270	timeout := 5 * time.Minute
1271	ctx, _ := context.WithTimeout(context.Background(), timeout)
1272
1273	adminClient, err := testEnv.NewAdminClient()
1274	if err != nil {
1275		t.Fatalf("NewAdminClient: %v", err)
1276	}
1277	defer adminClient.Close()
1278
1279	defer deleteTable(ctx, t, adminClient, "mytable")
1280	if err := adminClient.CreateTable(ctx, "mytable"); err != nil {
1281		t.Fatalf("Creating table: %v", err)
1282	}
1283
1284	// Verify that the IAM Controls work for Tables.
1285	iamHandle := adminClient.TableIAM("mytable")
1286	p, err := iamHandle.Policy(ctx)
1287	if err != nil {
1288		t.Errorf("Iam GetPolicy mytable: %v", err)
1289	}
1290	if err = iamHandle.SetPolicy(ctx, p); err != nil {
1291		t.Errorf("Iam SetPolicy mytable: %v", err)
1292	}
1293	if _, err = iamHandle.TestPermissions(ctx, []string{"bigtable.tables.get"}); err != nil {
1294		t.Errorf("Iam TestPermissions mytable: %v", err)
1295	}
1296}
1297
1298func TestIntegration_BackupIAM(t *testing.T) {
1299	testEnv, err := NewIntegrationEnv()
1300	if err != nil {
1301		t.Fatalf("IntegrationEnv: %v", err)
1302	}
1303	defer testEnv.Close()
1304
1305	if !testEnv.Config().UseProd {
1306		t.Skip("emulator doesn't support IAM Policy creation")
1307	}
1308	timeout := 5 * time.Minute
1309	ctx, _ := context.WithTimeout(context.Background(), timeout)
1310
1311	adminClient, err := testEnv.NewAdminClient()
1312	if err != nil {
1313		t.Fatalf("NewAdminClient: %v", err)
1314	}
1315	defer adminClient.Close()
1316
1317	table := testEnv.Config().Table
1318	cluster := testEnv.Config().Cluster
1319
1320	defer deleteTable(ctx, t, adminClient, table)
1321	if err := adminClient.CreateTable(ctx, table); err != nil {
1322		t.Fatalf("Creating table: %v", err)
1323	}
1324	// Create backup.
1325	backup := "backup"
1326	defer adminClient.DeleteBackup(ctx, cluster, backup)
1327	if err = adminClient.CreateBackup(ctx, table, cluster, backup, time.Now().Add(8*time.Hour)); err != nil {
1328		t.Fatalf("Creating backup: %v", err)
1329	}
1330	iamHandle := adminClient.BackupIAM(cluster, backup)
1331	// Get backup policy.
1332	p, err := iamHandle.Policy(ctx)
1333	if err != nil {
1334		t.Errorf("iamHandle.Policy: %v", err)
1335	}
1336	// The resource is new, so the policy should be empty.
1337	if got := p.Roles(); len(got) > 0 {
1338		t.Errorf("got roles %v, want none", got)
1339	}
1340	// Set backup policy.
1341	member := "domain:google.com"
1342	// Add a member, set the policy, then check that the member is present.
1343	p.Add(member, iam.Viewer)
1344	if err = iamHandle.SetPolicy(ctx, p); err != nil {
1345		t.Errorf("iamHandle.SetPolicy: %v", err)
1346	}
1347	p, err = iamHandle.Policy(ctx)
1348	if err != nil {
1349		t.Errorf("iamHandle.Policy: %v", err)
1350	}
1351	if got, want := p.Members(iam.Viewer), []string{member}; !testutil.Equal(got, want) {
1352		t.Errorf("iamHandle.Policy: got %v, want %v", got, want)
1353	}
1354	// Test backup permissions.
1355	permissions := []string{"bigtable.backups.get", "bigtable.backups.update"}
1356	_, err = iamHandle.TestPermissions(ctx, permissions)
1357	if err != nil {
1358		t.Errorf("iamHandle.TestPermissions: %v", err)
1359	}
1360}
1361
1362func TestIntegration_AdminCreateInstance(t *testing.T) {
1363	if instanceToCreate == "" {
1364		t.Skip("instanceToCreate not set, skipping instance creation testing")
1365	}
1366
1367	testEnv, err := NewIntegrationEnv()
1368	if err != nil {
1369		t.Fatalf("IntegrationEnv: %v", err)
1370	}
1371	defer testEnv.Close()
1372
1373	if !testEnv.Config().UseProd {
1374		t.Skip("emulator doesn't support instance creation")
1375	}
1376
1377	timeout := 5 * time.Minute
1378	ctx, _ := context.WithTimeout(context.Background(), timeout)
1379
1380	iAdminClient, err := testEnv.NewInstanceAdminClient()
1381	if err != nil {
1382		t.Fatalf("NewInstanceAdminClient: %v", err)
1383	}
1384	defer iAdminClient.Close()
1385
1386	clusterID := instanceToCreate + "-cluster"
1387
1388	// Create a development instance
1389	conf := &InstanceConf{
1390		InstanceId:   instanceToCreate,
1391		ClusterId:    clusterID,
1392		DisplayName:  "test instance",
1393		Zone:         instanceToCreateZone,
1394		InstanceType: DEVELOPMENT,
1395		Labels:       map[string]string{"test-label-key": "test-label-value"},
1396	}
1397	if err := iAdminClient.CreateInstance(ctx, conf); err != nil {
1398		t.Fatalf("CreateInstance: %v", err)
1399	}
1400	defer iAdminClient.DeleteInstance(ctx, instanceToCreate)
1401
1402	iInfo, err := iAdminClient.InstanceInfo(ctx, instanceToCreate)
1403	if err != nil {
1404		t.Fatalf("InstanceInfo: %v", err)
1405	}
1406
1407	// Basic return values are tested elsewhere, check instance type
1408	if iInfo.InstanceType != DEVELOPMENT {
1409		t.Fatalf("Instance is not DEVELOPMENT: %v", iInfo.InstanceType)
1410	}
1411	if got, want := iInfo.Labels, conf.Labels; !cmp.Equal(got, want) {
1412		t.Fatalf("Labels: %v, want: %v", got, want)
1413	}
1414
1415	// Update everything we can about the instance in one call.
1416	confWithClusters := &InstanceWithClustersConfig{
1417		InstanceID:   instanceToCreate,
1418		DisplayName:  "new display name",
1419		InstanceType: PRODUCTION,
1420		Labels:       map[string]string{"new-label-key": "new-label-value"},
1421		Clusters: []ClusterConfig{
1422			{ClusterID: clusterID, NumNodes: 5},
1423		},
1424	}
1425
1426	if err = iAdminClient.UpdateInstanceWithClusters(ctx, confWithClusters); err != nil {
1427		t.Fatalf("UpdateInstanceWithClusters: %v", err)
1428	}
1429
1430	iInfo, err = iAdminClient.InstanceInfo(ctx, instanceToCreate)
1431	if err != nil {
1432		t.Fatalf("InstanceInfo: %v", err)
1433	}
1434
1435	if iInfo.InstanceType != PRODUCTION {
1436		t.Fatalf("Instance type is not PRODUCTION: %v", iInfo.InstanceType)
1437	}
1438	if got, want := iInfo.Labels, confWithClusters.Labels; !cmp.Equal(got, want) {
1439		t.Fatalf("Labels: %v, want: %v", got, want)
1440	}
1441	if got, want := iInfo.DisplayName, confWithClusters.DisplayName; got != want {
1442		t.Fatalf("Display name: %q, want: %q", got, want)
1443	}
1444
1445	cInfo, err := iAdminClient.GetCluster(ctx, instanceToCreate, clusterID)
1446	if err != nil {
1447		t.Fatalf("GetCluster: %v", err)
1448	}
1449
1450	if cInfo.ServeNodes != 5 {
1451		t.Fatalf("NumNodes: %v, want: %v", cInfo.ServeNodes, 5)
1452	}
1453}
1454
1455func TestIntegration_AdminUpdateInstanceLabels(t *testing.T) {
1456	// Check the environments
1457	if instanceToCreate == "" {
1458		t.Skip("instanceToCreate not set, skipping instance creation testing")
1459	}
1460	testEnv, err := NewIntegrationEnv()
1461	if err != nil {
1462		t.Fatalf("IntegrationEnv: %v", err)
1463	}
1464	defer testEnv.Close()
1465	if !testEnv.Config().UseProd {
1466		t.Skip("emulator doesn't support instance creation")
1467	}
1468
1469	// Create an instance admin client
1470	timeout := 5 * time.Minute
1471	ctx, _ := context.WithTimeout(context.Background(), timeout)
1472	iAdminClient, err := testEnv.NewInstanceAdminClient()
1473	if err != nil {
1474		t.Fatalf("NewInstanceAdminClient: %v", err)
1475	}
1476	defer iAdminClient.Close()
1477
1478	// Create a test instance
1479	conf := &InstanceConf{
1480		InstanceId:   instanceToCreate,
1481		ClusterId:    instanceToCreate + "-cluster",
1482		DisplayName:  "test instance",
1483		InstanceType: DEVELOPMENT,
1484		Zone:         instanceToCreateZone,
1485	}
1486	if err := iAdminClient.CreateInstance(ctx, conf); err != nil {
1487		t.Fatalf("CreateInstance: %v", err)
1488	}
1489	defer iAdminClient.DeleteInstance(ctx, instanceToCreate)
1490
1491	// Check the created test instances
1492	iInfo, err := iAdminClient.InstanceInfo(ctx, instanceToCreate)
1493	if err != nil {
1494		t.Fatalf("InstanceInfo: %v", err)
1495	}
1496	if got, want := iInfo.Labels, conf.Labels; !cmp.Equal(got, want) {
1497		t.Fatalf("Labels: %v, want: %v", got, want)
1498	}
1499
1500	// Test patterns to update instance labels
1501	tests := []struct {
1502		name string
1503		in   map[string]string
1504		out  map[string]string
1505	}{
1506		{
1507			name: "update labels",
1508			in:   map[string]string{"test-label-key": "test-label-value"},
1509			out:  map[string]string{"test-label-key": "test-label-value"},
1510		},
1511		{
1512			name: "update multiple labels",
1513			in:   map[string]string{"update-label-key-a": "update-label-value-a", "update-label-key-b": "update-label-value-b"},
1514			out:  map[string]string{"update-label-key-a": "update-label-value-a", "update-label-key-b": "update-label-value-b"},
1515		},
1516		{
1517			name: "not update existing labels",
1518			in:   nil, // nil map
1519			out:  map[string]string{"update-label-key-a": "update-label-value-a", "update-label-key-b": "update-label-value-b"},
1520		},
1521		{
1522			name: "delete labels",
1523			in:   map[string]string{}, // empty map
1524			out:  nil,
1525		},
1526	}
1527	for _, tt := range tests {
1528		t.Run(tt.name, func(t *testing.T) {
1529			confWithClusters := &InstanceWithClustersConfig{
1530				InstanceID: instanceToCreate,
1531				Labels:     tt.in,
1532			}
1533			if err := iAdminClient.UpdateInstanceWithClusters(ctx, confWithClusters); err != nil {
1534				t.Fatalf("UpdateInstanceWithClusters: %v", err)
1535			}
1536			iInfo, err := iAdminClient.InstanceInfo(ctx, instanceToCreate)
1537			if err != nil {
1538				t.Fatalf("InstanceInfo: %v", err)
1539			}
1540			if got, want := iInfo.Labels, tt.out; !cmp.Equal(got, want) {
1541				t.Fatalf("Labels: %v, want: %v", got, want)
1542			}
1543		})
1544	}
1545}
1546
1547func TestIntegration_AdminUpdateInstanceAndSyncClusters(t *testing.T) {
1548	if instanceToCreate == "" {
1549		t.Skip("instanceToCreate not set, skipping instance update testing")
1550	}
1551
1552	testEnv, err := NewIntegrationEnv()
1553	if err != nil {
1554		t.Fatalf("IntegrationEnv: %v", err)
1555	}
1556	defer testEnv.Close()
1557
1558	if !testEnv.Config().UseProd {
1559		t.Skip("emulator doesn't support instance creation")
1560	}
1561
1562	timeout := 5 * time.Minute
1563	ctx, _ := context.WithTimeout(context.Background(), timeout)
1564
1565	iAdminClient, err := testEnv.NewInstanceAdminClient()
1566	if err != nil {
1567		t.Fatalf("NewInstanceAdminClient: %v", err)
1568	}
1569	defer iAdminClient.Close()
1570
1571	clusterID := instanceToCreate + "-cluster"
1572
1573	// Create a development instance
1574	conf := &InstanceConf{
1575		InstanceId:   instanceToCreate,
1576		ClusterId:    clusterID,
1577		DisplayName:  "test instance",
1578		Zone:         instanceToCreateZone,
1579		InstanceType: DEVELOPMENT,
1580		Labels:       map[string]string{"test-label-key": "test-label-value"},
1581	}
1582	if err := iAdminClient.CreateInstance(ctx, conf); err != nil {
1583		t.Fatalf("CreateInstance: %v", err)
1584	}
1585	defer iAdminClient.DeleteInstance(ctx, instanceToCreate)
1586
1587	iInfo, err := iAdminClient.InstanceInfo(ctx, instanceToCreate)
1588	if err != nil {
1589		t.Fatalf("InstanceInfo: %v", err)
1590	}
1591
1592	// Basic return values are tested elsewhere, check instance type
1593	if iInfo.InstanceType != DEVELOPMENT {
1594		t.Fatalf("Instance is not DEVELOPMENT: %v", iInfo.InstanceType)
1595	}
1596	if got, want := iInfo.Labels, conf.Labels; !cmp.Equal(got, want) {
1597		t.Fatalf("Labels: %v, want: %v", got, want)
1598	}
1599
1600	// Update everything we can about the instance in one call.
1601	confWithClusters := &InstanceWithClustersConfig{
1602		InstanceID:   instanceToCreate,
1603		DisplayName:  "new display name",
1604		InstanceType: PRODUCTION,
1605		Labels:       map[string]string{"new-label-key": "new-label-value"},
1606		Clusters: []ClusterConfig{
1607			{ClusterID: clusterID, NumNodes: 5},
1608		},
1609	}
1610
1611	results, err := UpdateInstanceAndSyncClusters(ctx, iAdminClient, confWithClusters)
1612	if err != nil {
1613		t.Fatalf("UpdateInstanceAndSyncClusters: %v", err)
1614	}
1615
1616	wantResults := UpdateInstanceResults{
1617		InstanceUpdated: true,
1618		UpdatedClusters: []string{clusterID},
1619	}
1620	if diff := testutil.Diff(*results, wantResults); diff != "" {
1621		t.Fatalf("UpdateInstanceResults: got - want +\n%s", diff)
1622	}
1623
1624	iInfo, err = iAdminClient.InstanceInfo(ctx, instanceToCreate)
1625	if err != nil {
1626		t.Fatalf("InstanceInfo: %v", err)
1627	}
1628
1629	if iInfo.InstanceType != PRODUCTION {
1630		t.Fatalf("Instance type is not PRODUCTION: %v", iInfo.InstanceType)
1631	}
1632	if got, want := iInfo.Labels, confWithClusters.Labels; !cmp.Equal(got, want) {
1633		t.Fatalf("Labels: %v, want: %v", got, want)
1634	}
1635	if got, want := iInfo.DisplayName, confWithClusters.DisplayName; got != want {
1636		t.Fatalf("Display name: %q, want: %q", got, want)
1637	}
1638
1639	cInfo, err := iAdminClient.GetCluster(ctx, instanceToCreate, clusterID)
1640	if err != nil {
1641		t.Fatalf("GetCluster: %v", err)
1642	}
1643
1644	if cInfo.ServeNodes != 5 {
1645		t.Fatalf("NumNodes: %v, want: %v", cInfo.ServeNodes, 5)
1646	}
1647
1648	// Now add a second cluster as the only change. The first cluster must also be provided so
1649	// it is not removed.
1650	clusterID2 := clusterID + "-2"
1651	confWithClusters = &InstanceWithClustersConfig{
1652		InstanceID: instanceToCreate,
1653		Clusters: []ClusterConfig{
1654			{ClusterID: clusterID},
1655			{ClusterID: clusterID2, NumNodes: 3, StorageType: SSD, Zone: instanceToCreateZone2},
1656		},
1657	}
1658
1659	results, err = UpdateInstanceAndSyncClusters(ctx, iAdminClient, confWithClusters)
1660	if err != nil {
1661		t.Fatalf("UpdateInstanceAndSyncClusters: %v %v", confWithClusters, err)
1662	}
1663
1664	wantResults = UpdateInstanceResults{
1665		InstanceUpdated: false,
1666		CreatedClusters: []string{clusterID2},
1667	}
1668	if diff := testutil.Diff(*results, wantResults); diff != "" {
1669		t.Fatalf("UpdateInstanceResults: got - want +\n%s", diff)
1670	}
1671
1672	// Now update one cluster and delete the other
1673	confWithClusters = &InstanceWithClustersConfig{
1674		InstanceID: instanceToCreate,
1675		Clusters: []ClusterConfig{
1676			{ClusterID: clusterID, NumNodes: 4},
1677		},
1678	}
1679
1680	results, err = UpdateInstanceAndSyncClusters(ctx, iAdminClient, confWithClusters)
1681	if err != nil {
1682		t.Fatalf("UpdateInstanceAndSyncClusters: %v %v", confWithClusters, err)
1683	}
1684
1685	wantResults = UpdateInstanceResults{
1686		InstanceUpdated: false,
1687		UpdatedClusters: []string{clusterID},
1688		DeletedClusters: []string{clusterID2},
1689	}
1690
1691	if diff := testutil.Diff(*results, wantResults); diff != "" {
1692		t.Fatalf("UpdateInstanceResults: got - want +\n%s", diff)
1693	}
1694
1695	// Make sure the instance looks as we would expect
1696	clusters, err := iAdminClient.Clusters(ctx, conf.InstanceId)
1697	if err != nil {
1698		t.Fatalf("Clusters: %v", err)
1699	}
1700
1701	if len(clusters) != 1 {
1702		t.Fatalf("Clusters length %v, want: 1", len(clusters))
1703	}
1704
1705	wantCluster := &ClusterInfo{
1706		Name:       clusterID,
1707		Zone:       instanceToCreateZone,
1708		ServeNodes: 4,
1709		State:      "READY",
1710	}
1711	if diff := testutil.Diff(clusters[0], wantCluster); diff != "" {
1712		t.Fatalf("InstanceEquality: got - want +\n%s", diff)
1713	}
1714}
1715
1716func TestIntegration_AdminSnapshot(t *testing.T) {
1717	testEnv, err := NewIntegrationEnv()
1718	if err != nil {
1719		t.Fatalf("IntegrationEnv: %v", err)
1720	}
1721	defer testEnv.Close()
1722
1723	if !testEnv.Config().UseProd {
1724		t.Skip("emulator doesn't support snapshots")
1725	}
1726
1727	timeout := 2 * time.Second
1728	if testEnv.Config().UseProd {
1729		timeout = 5 * time.Minute
1730	}
1731	ctx, _ := context.WithTimeout(context.Background(), timeout)
1732
1733	adminClient, err := testEnv.NewAdminClient()
1734	if err != nil {
1735		t.Fatalf("NewAdminClient: %v", err)
1736	}
1737	defer adminClient.Close()
1738
1739	table := testEnv.Config().Table
1740	cluster := testEnv.Config().Cluster
1741
1742	list := func(cluster string) ([]*SnapshotInfo, error) {
1743		infos := []*SnapshotInfo(nil)
1744
1745		it := adminClient.Snapshots(ctx, cluster)
1746		for {
1747			s, err := it.Next()
1748			if err == iterator.Done {
1749				break
1750			}
1751			if err != nil {
1752				return nil, err
1753			}
1754			infos = append(infos, s)
1755		}
1756		return infos, err
1757	}
1758
1759	// Delete the table at the end of the test. Schedule ahead of time
1760	// in case the client fails
1761	defer deleteTable(ctx, t, adminClient, table)
1762
1763	if err := adminClient.CreateTable(ctx, table); err != nil {
1764		t.Fatalf("Creating table: %v", err)
1765	}
1766
1767	// Precondition: no snapshots
1768	snapshots, err := list(cluster)
1769	if err != nil {
1770		t.Fatalf("Initial snapshot list: %v", err)
1771	}
1772	if got, want := len(snapshots), 0; got != want {
1773		t.Fatalf("Initial snapshot list len: %d, want: %d", got, want)
1774	}
1775
1776	// Create snapshot
1777	defer adminClient.DeleteSnapshot(ctx, cluster, "mysnapshot")
1778
1779	if err = adminClient.SnapshotTable(ctx, table, cluster, "mysnapshot", 5*time.Hour); err != nil {
1780		t.Fatalf("Creating snaphot: %v", err)
1781	}
1782
1783	// List snapshot
1784	snapshots, err = list(cluster)
1785	if err != nil {
1786		t.Fatalf("Listing snapshots: %v", err)
1787	}
1788	if got, want := len(snapshots), 1; got != want {
1789		t.Fatalf("Listing snapshot count: %d, want: %d", got, want)
1790	}
1791	if got, want := snapshots[0].Name, "mysnapshot"; got != want {
1792		t.Fatalf("Snapshot name: %s, want: %s", got, want)
1793	}
1794	if got, want := snapshots[0].SourceTable, table; got != want {
1795		t.Fatalf("Snapshot SourceTable: %s, want: %s", got, want)
1796	}
1797	if got, want := snapshots[0].DeleteTime, snapshots[0].CreateTime.Add(5*time.Hour); math.Abs(got.Sub(want).Minutes()) > 1 {
1798		t.Fatalf("Snapshot DeleteTime: %s, want: %s", got, want)
1799	}
1800
1801	// Get snapshot
1802	snapshot, err := adminClient.SnapshotInfo(ctx, cluster, "mysnapshot")
1803	if err != nil {
1804		t.Fatalf("SnapshotInfo: %v", snapshot)
1805	}
1806	if got, want := *snapshot, *snapshots[0]; got != want {
1807		t.Fatalf("SnapshotInfo: %v, want: %v", got, want)
1808	}
1809
1810	// Restore
1811	restoredTable := table + "-restored"
1812	defer deleteTable(ctx, t, adminClient, restoredTable)
1813	if err = adminClient.CreateTableFromSnapshot(ctx, restoredTable, cluster, "mysnapshot"); err != nil {
1814		t.Fatalf("CreateTableFromSnapshot: %v", err)
1815	}
1816	if _, err := adminClient.TableInfo(ctx, restoredTable); err != nil {
1817		t.Fatalf("Restored TableInfo: %v", err)
1818	}
1819
1820	// Delete snapshot
1821	if err = adminClient.DeleteSnapshot(ctx, cluster, "mysnapshot"); err != nil {
1822		t.Fatalf("DeleteSnapshot: %v", err)
1823	}
1824	snapshots, err = list(cluster)
1825	if err != nil {
1826		t.Fatalf("List after Delete: %v", err)
1827	}
1828	if got, want := len(snapshots), 0; got != want {
1829		t.Fatalf("List after delete len: %d, want: %d", got, want)
1830	}
1831}
1832
1833// instanceAdminClientMock is used to test FailedLocations field processing.
1834type instanceAdminClientMock struct {
1835	Clusters             []*btapb.Cluster
1836	UnavailableLocations []string
1837}
1838
1839func (iacm *instanceAdminClientMock) ListClusters(ctx context.Context, req *btapb.ListClustersRequest, opts ...grpc.CallOption) (*btapb.ListClustersResponse, error) {
1840	res := btapb.ListClustersResponse{
1841		Clusters:        iacm.Clusters,
1842		FailedLocations: iacm.UnavailableLocations,
1843	}
1844	return &res, nil
1845}
1846
1847func (iacm *instanceAdminClientMock) CreateInstance(ctx context.Context, in *btapb.CreateInstanceRequest, opts ...grpc.CallOption) (*longrunning.Operation, error) {
1848	return nil, nil
1849}
1850
1851func (iacm *instanceAdminClientMock) GetInstance(ctx context.Context, in *btapb.GetInstanceRequest, opts ...grpc.CallOption) (*btapb.Instance, error) {
1852	return nil, nil
1853}
1854
1855func (iacm *instanceAdminClientMock) ListInstances(ctx context.Context, in *btapb.ListInstancesRequest, opts ...grpc.CallOption) (*btapb.ListInstancesResponse, error) {
1856	return nil, nil
1857}
1858
1859func (iacm *instanceAdminClientMock) UpdateInstance(ctx context.Context, in *btapb.Instance, opts ...grpc.CallOption) (*btapb.Instance, error) {
1860	return nil, nil
1861}
1862
1863func (iacm *instanceAdminClientMock) PartialUpdateInstance(ctx context.Context, in *btapb.PartialUpdateInstanceRequest, opts ...grpc.CallOption) (*longrunning.Operation, error) {
1864	return nil, nil
1865}
1866
1867func (iacm *instanceAdminClientMock) DeleteInstance(ctx context.Context, in *btapb.DeleteInstanceRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
1868	return nil, nil
1869}
1870
1871func (iacm *instanceAdminClientMock) CreateCluster(ctx context.Context, in *btapb.CreateClusterRequest, opts ...grpc.CallOption) (*longrunning.Operation, error) {
1872	return nil, nil
1873}
1874
1875func (iacm *instanceAdminClientMock) GetCluster(ctx context.Context, in *btapb.GetClusterRequest, opts ...grpc.CallOption) (*btapb.Cluster, error) {
1876	return nil, nil
1877}
1878
1879func (iacm *instanceAdminClientMock) UpdateCluster(ctx context.Context, in *btapb.Cluster, opts ...grpc.CallOption) (*longrunning.Operation, error) {
1880	return nil, nil
1881}
1882
1883func (iacm *instanceAdminClientMock) DeleteCluster(ctx context.Context, in *btapb.DeleteClusterRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
1884	return nil, nil
1885}
1886
1887func (iacm *instanceAdminClientMock) CreateAppProfile(ctx context.Context, in *btapb.CreateAppProfileRequest, opts ...grpc.CallOption) (*btapb.AppProfile, error) {
1888	return nil, nil
1889}
1890
1891func (iacm *instanceAdminClientMock) GetAppProfile(ctx context.Context, in *btapb.GetAppProfileRequest, opts ...grpc.CallOption) (*btapb.AppProfile, error) {
1892	return nil, nil
1893}
1894
1895func (iacm *instanceAdminClientMock) ListAppProfiles(ctx context.Context, in *btapb.ListAppProfilesRequest, opts ...grpc.CallOption) (*btapb.ListAppProfilesResponse, error) {
1896	return nil, nil
1897}
1898
1899func (iacm *instanceAdminClientMock) UpdateAppProfile(ctx context.Context, in *btapb.UpdateAppProfileRequest, opts ...grpc.CallOption) (*longrunning.Operation, error) {
1900	return nil, nil
1901}
1902
1903func (iacm *instanceAdminClientMock) DeleteAppProfile(ctx context.Context, in *btapb.DeleteAppProfileRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
1904	return nil, nil
1905}
1906
1907func (iacm *instanceAdminClientMock) GetIamPolicy(ctx context.Context, in *v1.GetIamPolicyRequest, opts ...grpc.CallOption) (*v1.Policy, error) {
1908	return nil, nil
1909}
1910
1911func (iacm *instanceAdminClientMock) SetIamPolicy(ctx context.Context, in *v1.SetIamPolicyRequest, opts ...grpc.CallOption) (*v1.Policy, error) {
1912	return nil, nil
1913}
1914
1915func (iacm *instanceAdminClientMock) TestIamPermissions(ctx context.Context, in *v1.TestIamPermissionsRequest, opts ...grpc.CallOption) (*v1.TestIamPermissionsResponse, error) {
1916	return nil, nil
1917}
1918
1919func TestIntegration_InstanceAdminClient_Clusters_WithFailedLocations(t *testing.T) {
1920	testEnv, err := NewIntegrationEnv()
1921	if err != nil {
1922		t.Fatalf("IntegrationEnv: %v", err)
1923	}
1924	defer testEnv.Close()
1925
1926	if !testEnv.Config().UseProd {
1927		t.Skip("emulator doesn't support snapshots")
1928	}
1929
1930	iAdminClient, err := testEnv.NewInstanceAdminClient()
1931	if err != nil {
1932		t.Fatalf("NewInstanceAdminClient: %v", err)
1933	}
1934	defer iAdminClient.Close()
1935
1936	cluster1 := btapb.Cluster{Name: "cluster1"}
1937	failedLoc := "euro1"
1938
1939	iAdminClient.iClient = &instanceAdminClientMock{
1940		Clusters:             []*btapb.Cluster{&cluster1},
1941		UnavailableLocations: []string{failedLoc},
1942	}
1943
1944	cis, err := iAdminClient.Clusters(context.Background(), "instance-id")
1945	convertedErr, ok := err.(ErrPartiallyUnavailable)
1946	if !ok {
1947		t.Fatalf("want error ErrPartiallyUnavailable, got other")
1948	}
1949	if got, want := len(convertedErr.Locations), 1; got != want {
1950		t.Fatalf("want %v failed locations, got %v", want, got)
1951	}
1952	if got, want := convertedErr.Locations[0], failedLoc; got != want {
1953		t.Fatalf("want failed location %v, got %v", want, got)
1954	}
1955	if got, want := len(cis), 1; got != want {
1956		t.Fatalf("want %v failed locations, got %v", want, got)
1957	}
1958	if got, want := cis[0].Name, cluster1.Name; got != want {
1959		t.Fatalf("want cluster %v, got %v", want, got)
1960	}
1961}
1962
1963func TestIntegration_Granularity(t *testing.T) {
1964	testEnv, err := NewIntegrationEnv()
1965	if err != nil {
1966		t.Fatalf("IntegrationEnv: %v", err)
1967	}
1968	defer testEnv.Close()
1969
1970	timeout := 2 * time.Second
1971	if testEnv.Config().UseProd {
1972		timeout = 5 * time.Minute
1973	}
1974	ctx, _ := context.WithTimeout(context.Background(), timeout)
1975
1976	adminClient, err := testEnv.NewAdminClient()
1977	if err != nil {
1978		t.Fatalf("NewAdminClient: %v", err)
1979	}
1980	defer adminClient.Close()
1981
1982	list := func() []string {
1983		tbls, err := adminClient.Tables(ctx)
1984		if err != nil {
1985			t.Fatalf("Fetching list of tables: %v", err)
1986		}
1987		sort.Strings(tbls)
1988		return tbls
1989	}
1990	containsAll := func(got, want []string) bool {
1991		gotSet := make(map[string]bool)
1992
1993		for _, s := range got {
1994			gotSet[s] = true
1995		}
1996		for _, s := range want {
1997			if !gotSet[s] {
1998				return false
1999			}
2000		}
2001		return true
2002	}
2003
2004	defer deleteTable(ctx, t, adminClient, "mytable")
2005
2006	if err := adminClient.CreateTable(ctx, "mytable"); err != nil {
2007		t.Fatalf("Creating table: %v", err)
2008	}
2009
2010	tables := list()
2011	if got, want := tables, []string{"mytable"}; !containsAll(got, want) {
2012		t.Errorf("adminClient.Tables returned %#v, want %#v", got, want)
2013	}
2014
2015	// calling ModifyColumnFamilies to check the granularity of table
2016	prefix := adminClient.instancePrefix()
2017	req := &btapb.ModifyColumnFamiliesRequest{
2018		Name: prefix + "/tables/" + "mytable",
2019		Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{
2020			Id:  "cf",
2021			Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Create{&btapb.ColumnFamily{}},
2022		}},
2023	}
2024	table, err := adminClient.tClient.ModifyColumnFamilies(ctx, req)
2025	if err != nil {
2026		t.Fatalf("Creating column family: %v", err)
2027	}
2028	if table.Granularity != btapb.Table_TimestampGranularity(btapb.Table_MILLIS) {
2029		t.Errorf("ModifyColumnFamilies returned granularity %#v, want %#v", table.Granularity, btapb.Table_TimestampGranularity(btapb.Table_MILLIS))
2030	}
2031}
2032
2033func TestIntegration_InstanceAdminClient_AppProfile(t *testing.T) {
2034	testEnv, err := NewIntegrationEnv()
2035	if err != nil {
2036		t.Fatalf("IntegrationEnv: %v", err)
2037	}
2038	defer testEnv.Close()
2039
2040	timeout := 2 * time.Second
2041	if testEnv.Config().UseProd {
2042		timeout = 5 * time.Minute
2043	}
2044	ctx, cancel := context.WithTimeout(context.Background(), timeout)
2045	defer cancel()
2046
2047	adminClient, err := testEnv.NewAdminClient()
2048	if err != nil {
2049		t.Fatalf("NewAdminClient: %v", err)
2050	}
2051	defer adminClient.Close()
2052
2053	iAdminClient, err := testEnv.NewInstanceAdminClient()
2054	if err != nil {
2055		t.Fatalf("NewInstanceAdminClient: %v", err)
2056	}
2057
2058	if iAdminClient == nil {
2059		return
2060	}
2061
2062	defer iAdminClient.Close()
2063	profile := ProfileConf{
2064		ProfileID:     "app_profile1",
2065		InstanceID:    adminClient.instance,
2066		ClusterID:     testEnv.Config().Cluster,
2067		Description:   "creating new app profile 1",
2068		RoutingPolicy: SingleClusterRouting,
2069	}
2070
2071	createdProfile, err := iAdminClient.CreateAppProfile(ctx, profile)
2072	if err != nil {
2073		t.Fatalf("Creating app profile: %v", err)
2074	}
2075
2076	gotProfile, err := iAdminClient.GetAppProfile(ctx, adminClient.instance, "app_profile1")
2077	if err != nil {
2078		t.Fatalf("Get app profile: %v", err)
2079	}
2080
2081	if !proto.Equal(createdProfile, gotProfile) {
2082		t.Fatalf("created profile: %s, got profile: %s", createdProfile.Name, gotProfile.Name)
2083	}
2084
2085	list := func(instanceID string) ([]*btapb.AppProfile, error) {
2086		profiles := []*btapb.AppProfile(nil)
2087
2088		it := iAdminClient.ListAppProfiles(ctx, instanceID)
2089		for {
2090			s, err := it.Next()
2091			if err == iterator.Done {
2092				break
2093			}
2094			if err != nil {
2095				return nil, err
2096			}
2097			profiles = append(profiles, s)
2098		}
2099		return profiles, err
2100	}
2101
2102	profiles, err := list(adminClient.instance)
2103	if err != nil {
2104		t.Fatalf("List app profile: %v", err)
2105	}
2106
2107	if got, want := len(profiles), 1; got != want {
2108		t.Fatalf("Initial app profile list len: %d, want: %d", got, want)
2109	}
2110
2111	for _, test := range []struct {
2112		desc   string
2113		uattrs ProfileAttrsToUpdate
2114		want   *btapb.AppProfile // nil means error
2115	}{
2116		{
2117			desc:   "empty update",
2118			uattrs: ProfileAttrsToUpdate{},
2119			want:   nil,
2120		},
2121
2122		{
2123			desc:   "empty description update",
2124			uattrs: ProfileAttrsToUpdate{Description: ""},
2125			want: &btapb.AppProfile{
2126				Name:          gotProfile.Name,
2127				Description:   "",
2128				RoutingPolicy: gotProfile.RoutingPolicy,
2129				Etag:          gotProfile.Etag,
2130			},
2131		},
2132		{
2133			desc: "routing update",
2134			uattrs: ProfileAttrsToUpdate{
2135				RoutingPolicy: SingleClusterRouting,
2136				ClusterID:     testEnv.Config().Cluster,
2137			},
2138			want: &btapb.AppProfile{
2139				Name:        gotProfile.Name,
2140				Description: "",
2141				Etag:        gotProfile.Etag,
2142				RoutingPolicy: &btapb.AppProfile_SingleClusterRouting_{
2143					SingleClusterRouting: &btapb.AppProfile_SingleClusterRouting{
2144						ClusterId: testEnv.Config().Cluster,
2145					},
2146				},
2147			},
2148		},
2149	} {
2150		err = iAdminClient.UpdateAppProfile(ctx, adminClient.instance, "app_profile1", test.uattrs)
2151		if err != nil {
2152			if test.want != nil {
2153				t.Errorf("%s: %v", test.desc, err)
2154			}
2155			continue
2156		}
2157		if err == nil && test.want == nil {
2158			t.Errorf("%s: got nil, want error", test.desc)
2159			continue
2160		}
2161
2162		got, _ := iAdminClient.GetAppProfile(ctx, adminClient.instance, "app_profile1")
2163
2164		if !proto.Equal(got, test.want) {
2165			t.Fatalf("%s : got profile : %v, want profile: %v", test.desc, gotProfile, test.want)
2166		}
2167
2168	}
2169
2170	err = iAdminClient.DeleteAppProfile(ctx, adminClient.instance, "app_profile1")
2171	if err != nil {
2172		t.Fatalf("Delete app profile: %v", err)
2173	}
2174}
2175
2176func TestIntegration_InstanceUpdate(t *testing.T) {
2177	testEnv, err := NewIntegrationEnv()
2178	if err != nil {
2179		t.Fatalf("IntegrationEnv: %v", err)
2180	}
2181	defer testEnv.Close()
2182
2183	timeout := 2 * time.Second
2184	if testEnv.Config().UseProd {
2185		timeout = 5 * time.Minute
2186	}
2187	ctx, cancel := context.WithTimeout(context.Background(), timeout)
2188	defer cancel()
2189
2190	adminClient, err := testEnv.NewAdminClient()
2191	if err != nil {
2192		t.Fatalf("NewAdminClient: %v", err)
2193	}
2194
2195	defer adminClient.Close()
2196
2197	iAdminClient, err := testEnv.NewInstanceAdminClient()
2198	if err != nil {
2199		t.Fatalf("NewInstanceAdminClient: %v", err)
2200	}
2201
2202	if iAdminClient == nil {
2203		return
2204	}
2205
2206	defer iAdminClient.Close()
2207
2208	iInfo, err := iAdminClient.InstanceInfo(ctx, adminClient.instance)
2209	if err != nil {
2210		t.Errorf("InstanceInfo: %v", err)
2211	}
2212
2213	if iInfo.Name != adminClient.instance {
2214		t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance)
2215	}
2216
2217	if iInfo.DisplayName != adminClient.instance {
2218		t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance)
2219	}
2220
2221	const numNodes = 4
2222	// update cluster nodes
2223	if err := iAdminClient.UpdateCluster(ctx, adminClient.instance, testEnv.Config().Cluster, int32(numNodes)); err != nil {
2224		t.Errorf("UpdateCluster: %v", err)
2225	}
2226
2227	// get cluster after updating
2228	cis, err := iAdminClient.GetCluster(ctx, adminClient.instance, testEnv.Config().Cluster)
2229	if err != nil {
2230		t.Errorf("GetCluster %v", err)
2231	}
2232	if cis.ServeNodes != int(numNodes) {
2233		t.Errorf("ServeNodes returned %d, want %d", cis.ServeNodes, int(numNodes))
2234	}
2235}
2236
2237func TestIntegration_AdminBackup(t *testing.T) {
2238	testEnv, err := NewIntegrationEnv()
2239	if err != nil {
2240		t.Fatalf("IntegrationEnv: %v", err)
2241	}
2242	defer testEnv.Close()
2243
2244	if !testEnv.Config().UseProd {
2245		t.Skip("emulator doesn't support backups")
2246	}
2247
2248	timeout := 5 * time.Minute
2249	ctx, _ := context.WithTimeout(context.Background(), timeout)
2250
2251	adminClient, err := testEnv.NewAdminClient()
2252	if err != nil {
2253		t.Fatalf("NewAdminClient: %v", err)
2254	}
2255	defer adminClient.Close()
2256
2257	table := testEnv.Config().Table
2258	cluster := testEnv.Config().Cluster
2259
2260	// Delete the table at the end of the test. Schedule ahead of time
2261	// in case the client fails
2262	defer deleteTable(ctx, t, adminClient, table)
2263
2264	list := func(cluster string) ([]*BackupInfo, error) {
2265		infos := []*BackupInfo(nil)
2266
2267		it := adminClient.Backups(ctx, cluster)
2268		for {
2269			s, err := it.Next()
2270			if err == iterator.Done {
2271				break
2272			}
2273			if err != nil {
2274				return nil, err
2275			}
2276			infos = append(infos, s)
2277		}
2278		return infos, err
2279	}
2280
2281	if err := adminClient.CreateTable(ctx, table); err != nil {
2282		t.Fatalf("Creating table: %v", err)
2283	}
2284
2285	// Precondition: no backups
2286	backups, err := list(cluster)
2287	if err != nil {
2288		t.Fatalf("Initial backup list: %v", err)
2289	}
2290	if got, want := len(backups), 0; got != want {
2291		t.Fatalf("Initial backup list len: %d, want: %d", got, want)
2292	}
2293
2294	// Create backup
2295	backupName := "mybackup"
2296	defer adminClient.DeleteBackup(ctx, cluster, "mybackup")
2297
2298	if err = adminClient.CreateBackup(ctx, table, cluster, backupName, time.Now().Add(8*time.Hour)); err != nil {
2299		t.Fatalf("Creating backup: %v", err)
2300	}
2301
2302	// List backup
2303	backups, err = list(cluster)
2304	if err != nil {
2305		t.Fatalf("Listing backups: %v", err)
2306	}
2307	if got, want := len(backups), 1; got != want {
2308		t.Fatalf("Listing backup count: %d, want: %d", got, want)
2309	}
2310	if got, want := backups[0].Name, backupName; got != want {
2311		t.Fatalf("Backup name: %s, want: %s", got, want)
2312	}
2313	if got, want := backups[0].SourceTable, table; got != want {
2314		t.Fatalf("Backup SourceTable: %s, want: %s", got, want)
2315	}
2316	if got, want := backups[0].ExpireTime, backups[0].StartTime.Add(8*time.Hour); math.Abs(got.Sub(want).Minutes()) > 1 {
2317		t.Fatalf("Backup ExpireTime: %s, want: %s", got, want)
2318	}
2319
2320	// Get backup
2321	backup, err := adminClient.BackupInfo(ctx, cluster, backupName)
2322	if err != nil {
2323		t.Fatalf("BackupInfo: %v", backup)
2324	}
2325	if got, want := *backup, *backups[0]; got != want {
2326		t.Fatalf("BackupInfo: %v, want: %v", got, want)
2327	}
2328
2329	// Update backup
2330	newExpireTime := time.Now().Add(10 * time.Hour)
2331	err = adminClient.UpdateBackup(ctx, cluster, backupName, newExpireTime)
2332	if err != nil {
2333		t.Fatalf("UpdateBackup failed: %v", err)
2334	}
2335
2336	// Check that updated backup has the correct expire time
2337	updatedBackup, err := adminClient.BackupInfo(ctx, cluster, backupName)
2338	if err != nil {
2339		t.Fatalf("BackupInfo: %v", err)
2340	}
2341	backup.ExpireTime = newExpireTime
2342	// Server clock and local clock may not be perfectly sync'ed.
2343	if got, want := *updatedBackup, *backup; got.ExpireTime.Sub(want.ExpireTime) > time.Minute {
2344		t.Fatalf("BackupInfo: %v, want: %v", got, want)
2345	}
2346
2347	// Restore backup
2348	restoredTable := table + "-restored"
2349	defer deleteTable(ctx, t, adminClient, restoredTable)
2350	if err = adminClient.RestoreTable(ctx, restoredTable, cluster, backupName); err != nil {
2351		t.Fatalf("RestoreTable: %v", err)
2352	}
2353	if _, err := adminClient.TableInfo(ctx, restoredTable); err != nil {
2354		t.Fatalf("Restored TableInfo: %v", err)
2355	}
2356
2357	// Delete backup
2358	if err = adminClient.DeleteBackup(ctx, cluster, backupName); err != nil {
2359		t.Fatalf("DeleteBackup: %v", err)
2360	}
2361	backups, err = list(cluster)
2362	if err != nil {
2363		t.Fatalf("List after Delete: %v", err)
2364	}
2365	if got, want := len(backups), 0; got != want {
2366		t.Fatalf("List after delete len: %d, want: %d", got, want)
2367	}
2368}
2369
2370// TestIntegration_DirectPathFallback tests the CFE fallback when the directpath net is blackholed.
2371func TestIntegration_DirectPathFallback(t *testing.T) {
2372	ctx := context.Background()
2373	testEnv, _, _, table, _, cleanup, err := setupIntegration(ctx, t)
2374	if err != nil {
2375		t.Fatal(err)
2376	}
2377	defer cleanup()
2378
2379	if !testEnv.Config().AttemptDirectPath {
2380		t.Skip()
2381	}
2382
2383	if len(blackholeDpv6Cmd) == 0 {
2384		t.Fatal("-it.blackhole-dpv6-cmd unset")
2385	}
2386	if len(blackholeDpv4Cmd) == 0 {
2387		t.Fatal("-it.blackhole-dpv4-cmd unset")
2388	}
2389	if len(allowDpv6Cmd) == 0 {
2390		t.Fatal("-it.allowdpv6-cmd unset")
2391	}
2392	if len(allowDpv4Cmd) == 0 {
2393		t.Fatal("-it.allowdpv4-cmd unset")
2394	}
2395
2396	if err := populatePresidentsGraph(table); err != nil {
2397		t.Fatal(err)
2398	}
2399
2400	// Precondition: wait for DirectPath to connect.
2401	dpEnabled := examineTraffic(ctx, testEnv, table, false)
2402	if !dpEnabled {
2403		t.Fatalf("Failed to observe RPCs over DirectPath")
2404	}
2405
2406	// Enable the blackhole, which will prevent communication with grpclb and thus DirectPath.
2407	blackholeDirectPath(testEnv, t)
2408	dpDisabled := examineTraffic(ctx, testEnv, table, true)
2409	if !dpDisabled {
2410		t.Fatalf("Failed to fallback to CFE after blackhole DirectPath")
2411	}
2412
2413	// Disable the blackhole, and client should use DirectPath again.
2414	allowDirectPath(testEnv, t)
2415	dpEnabled = examineTraffic(ctx, testEnv, table, false)
2416	if !dpEnabled {
2417		t.Fatalf("Failed to fallback to CFE after blackhole DirectPath")
2418	}
2419}
2420
2421// examineTraffic returns whether RPCs use DirectPath (blackholeDP = false) or CFE (blackholeDP = true).
2422func examineTraffic(ctx context.Context, testEnv IntegrationEnv, table *Table, blackholeDP bool) bool {
2423	numCount := 0
2424	const (
2425		numRPCsToSend  = 20
2426		minCompleteRPC = 40
2427	)
2428
2429	start := time.Now()
2430	for time.Since(start) < 2*time.Minute {
2431		for i := 0; i < numRPCsToSend; i++ {
2432			_, _ = table.ReadRow(ctx, "j§adams")
2433			if _, useDP := isDirectPathRemoteAddress(testEnv); useDP != blackholeDP {
2434				numCount++
2435				if numCount >= minCompleteRPC {
2436					return true
2437				}
2438			}
2439			time.Sleep(100 * time.Millisecond)
2440		}
2441	}
2442	return false
2443}
2444
2445func setupIntegration(ctx context.Context, t *testing.T) (_ IntegrationEnv, _ *Client, _ *AdminClient, table *Table, tableName string, cleanup func(), _ error) {
2446	testEnv, err := NewIntegrationEnv()
2447	if err != nil {
2448		return nil, nil, nil, nil, "", nil, err
2449	}
2450
2451	var timeout time.Duration
2452	if testEnv.Config().UseProd {
2453		timeout = 10 * time.Minute
2454		t.Logf("Running test against production")
2455	} else {
2456		timeout = 5 * time.Minute
2457		t.Logf("bttest.Server running on %s", testEnv.Config().AdminEndpoint)
2458	}
2459	ctx, cancel := context.WithTimeout(ctx, timeout)
2460
2461	client, err := testEnv.NewClient()
2462	if err != nil {
2463		return nil, nil, nil, nil, "", nil, err
2464	}
2465
2466	adminClient, err := testEnv.NewAdminClient()
2467	if err != nil {
2468		return nil, nil, nil, nil, "", nil, err
2469	}
2470
2471	if testEnv.Config().UseProd {
2472		// TODO: tables may not be successfully deleted in some cases, and will
2473		// become obsolete. We may need a way to automatically delete them.
2474		tableName = tableNameSpace.New()
2475	} else {
2476		tableName = testEnv.Config().Table
2477	}
2478
2479	if err := adminClient.CreateTable(ctx, tableName); err != nil {
2480		cancel()
2481		return nil, nil, nil, nil, "", nil, err
2482	}
2483	if err := adminClient.CreateColumnFamily(ctx, tableName, "follows"); err != nil {
2484		cancel()
2485		return nil, nil, nil, nil, "", nil, err
2486	}
2487
2488	return testEnv, client, adminClient, client.Open(tableName), tableName, func() {
2489		if err := adminClient.DeleteTable(ctx, tableName); err != nil {
2490			t.Errorf("DeleteTable got error %v", err)
2491		}
2492		cancel()
2493		client.Close()
2494		adminClient.Close()
2495	}, nil
2496}
2497
2498func formatReadItem(ri ReadItem) string {
2499	// Use the column qualifier only to make the test data briefer.
2500	col := ri.Column[strings.Index(ri.Column, ":")+1:]
2501	return fmt.Sprintf("%s-%s-%s", ri.Row, col, ri.Value)
2502}
2503
2504func fill(b, sub []byte) {
2505	for len(b) > len(sub) {
2506		n := copy(b, sub)
2507		b = b[n:]
2508	}
2509}
2510
2511func clearTimestamps(r Row) {
2512	for _, ris := range r {
2513		for i := range ris {
2514			ris[i].Timestamp = 0
2515		}
2516	}
2517}
2518
2519func deleteTable(ctx context.Context, t *testing.T, ac *AdminClient, name string) {
2520	bo := gax.Backoff{
2521		Initial:    100 * time.Millisecond,
2522		Max:        2 * time.Second,
2523		Multiplier: 1.2,
2524	}
2525	ctx, _ = context.WithTimeout(ctx, time.Second*30)
2526
2527	err := internal.Retry(ctx, bo, func() (bool, error) {
2528		err := ac.DeleteTable(ctx, name)
2529		if err != nil {
2530			return false, err
2531		}
2532		return true, nil
2533	})
2534	if err != nil {
2535		t.Logf("DeleteTable: %v", err)
2536	}
2537}
2538
2539func verifyDirectPathRemoteAddress(testEnv IntegrationEnv, t *testing.T) {
2540	t.Helper()
2541	if !testEnv.Config().AttemptDirectPath {
2542		return
2543	}
2544	if remoteIP, res := isDirectPathRemoteAddress(testEnv); !res {
2545		if testEnv.Config().DirectPathIPV4Only {
2546			t.Fatalf("Expect to access DirectPath via ipv4 only, but RPC was destined to %s", remoteIP)
2547		} else {
2548			t.Fatalf("Expect to access DirectPath via ipv4 or ipv6, but RPC was destined to %s", remoteIP)
2549		}
2550	}
2551}
2552
2553func isDirectPathRemoteAddress(testEnv IntegrationEnv) (_ string, _ bool) {
2554	remoteIP := testEnv.Peer().Addr.String()
2555	// DirectPath ipv4-only can only use ipv4 traffic.
2556	if testEnv.Config().DirectPathIPV4Only {
2557		return remoteIP, strings.HasPrefix(remoteIP, directPathIPV4Prefix)
2558	}
2559	// DirectPath ipv6 can use either ipv4 or ipv6 traffic.
2560	return remoteIP, strings.HasPrefix(remoteIP, directPathIPV4Prefix) || strings.HasPrefix(remoteIP, directPathIPV6Prefix)
2561}
2562
2563func blackholeDirectPath(testEnv IntegrationEnv, t *testing.T) {
2564	cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd)
2565	out, _ := cmdRes.CombinedOutput()
2566	t.Logf(string(out))
2567	if testEnv.Config().DirectPathIPV4Only {
2568		return
2569	}
2570	cmdRes = exec.Command("bash", "-c", blackholeDpv6Cmd)
2571	out, _ = cmdRes.CombinedOutput()
2572	t.Logf(string(out))
2573}
2574
2575func allowDirectPath(testEnv IntegrationEnv, t *testing.T) {
2576	cmdRes := exec.Command("bash", "-c", allowDpv4Cmd)
2577	out, _ := cmdRes.CombinedOutput()
2578	t.Logf(string(out))
2579	if testEnv.Config().DirectPathIPV4Only {
2580		return
2581	}
2582	cmdRes = exec.Command("bash", "-c", allowDpv6Cmd)
2583	out, _ = cmdRes.CombinedOutput()
2584	t.Logf(string(out))
2585}
2586