1/*
2Copyright 2019 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package bigtable
18
19import (
20	"context"
21	"flag"
22	"fmt"
23	"log"
24	"math"
25	"math/rand"
26	"os"
27	"os/exec"
28	"reflect"
29	"sort"
30	"strconv"
31	"strings"
32	"sync"
33	"testing"
34	"time"
35
36	"cloud.google.com/go/iam"
37	"cloud.google.com/go/internal"
38	"cloud.google.com/go/internal/testutil"
39	"cloud.google.com/go/internal/uid"
40	"github.com/golang/protobuf/proto"
41	"github.com/google/go-cmp/cmp"
42	gax "github.com/googleapis/gax-go/v2"
43	"google.golang.org/api/iterator"
44	btapb "google.golang.org/genproto/googleapis/bigtable/admin/v2"
45	v1 "google.golang.org/genproto/googleapis/iam/v1"
46	longrunning "google.golang.org/genproto/googleapis/longrunning"
47	grpc "google.golang.org/grpc"
48	"google.golang.org/grpc/codes"
49	"google.golang.org/grpc/status"
50	"google.golang.org/protobuf/types/known/emptypb"
51)
52
53const (
54	directPathIPV6Prefix      = "[2001:4860:8040"
55	directPathIPV4Prefix      = "34.126"
56	timeUntilResourceCleanup  = time.Hour * 12 // 12 hours
57	prefixOfInstanceResources = "bt-it-"
58)
59
60var (
61	presidentsSocialGraph = map[string][]string{
62		"wmckinley":   {"tjefferson"},
63		"gwashington": {"j§adams"},
64		"tjefferson":  {"gwashington", "j§adams"},
65		"j§adams":     {"gwashington", "tjefferson"},
66	}
67
68	tableNameSpace   = uid.NewSpace("cbt-test", &uid.Options{Short: true})
69	myTableName      = fmt.Sprintf("mytable-%d", time.Now().Unix())
70	myOtherTableName = fmt.Sprintf("myothertable-%d", time.Now().Unix())
71)
72
73func populatePresidentsGraph(table *Table) error {
74	ctx := context.Background()
75	for row, ss := range presidentsSocialGraph {
76		mut := NewMutation()
77		for _, name := range ss {
78			mut.Set("follows", name, 1000, []byte("1"))
79		}
80		if err := table.Apply(ctx, row, mut); err != nil {
81			return fmt.Errorf("Mutating row %q: %v", row, err)
82		}
83	}
84	return nil
85}
86
87var instanceToCreate string
88
89func init() {
90	if runCreateInstanceTests {
91		instanceToCreate = fmt.Sprintf("bt-it-%d", time.Now().Unix())
92	}
93}
94
95func TestMain(m *testing.M) {
96	flag.Parse()
97
98	env, err := NewIntegrationEnv()
99	if err != nil {
100		panic(fmt.Sprintf("there was an issue creating an integration env: %v", err))
101	}
102	c := env.Config()
103	if c.UseProd {
104		fmt.Printf(
105			"Note: when using prod, you must first create an instance:\n"+
106				"cbt createinstance %s %s %s %s %s SSD\n",
107			c.Instance, c.Instance,
108			c.Cluster, "us-central1-b", "1",
109		)
110	}
111	exit := m.Run()
112	if err := cleanup(c); err != nil {
113		log.Printf("Post-test cleanup failed: %v", err)
114	}
115	os.Exit(exit)
116}
117
118func cleanup(c IntegrationTestConfig) error {
119	// Cleanup resources marked with bt-it- after a time delay
120	if !c.UseProd {
121		return nil
122	}
123	ctx := context.Background()
124	iac, err := NewInstanceAdminClient(ctx, c.Project)
125	if err != nil {
126		return err
127	}
128	instances, err := iac.Instances(ctx)
129	if err != nil {
130		return err
131	}
132
133	for _, info := range instances {
134		if strings.HasPrefix(info.Name, prefixOfInstanceResources) {
135			timestamp := info.Name[len(prefixOfInstanceResources):]
136			t, err := strconv.ParseInt(timestamp, 10, 64)
137			if err != nil {
138				return err
139			}
140			uT := time.Unix(t, 0)
141			if time.Now().After(uT.Add(timeUntilResourceCleanup)) {
142				iac.DeleteInstance(ctx, info.Name)
143			}
144		}
145	}
146
147	return nil
148}
149
150func TestIntegration_ConditionalMutations(t *testing.T) {
151	ctx := context.Background()
152	testEnv, _, _, table, _, cleanup, err := setupIntegration(ctx, t)
153	if err != nil {
154		t.Fatal(err)
155	}
156	defer cleanup()
157
158	if err := populatePresidentsGraph(table); err != nil {
159		t.Fatal(err)
160	}
161
162	// Do a conditional mutation with a complex filter.
163	mutTrue := NewMutation()
164	mutTrue.Set("follows", "wmckinley", 1000, []byte("1"))
165	filter := ChainFilters(ColumnFilter("gwash[iz].*"), ValueFilter("."))
166	mut := NewCondMutation(filter, mutTrue, nil)
167	if err := table.Apply(ctx, "tjefferson", mut); err != nil {
168		t.Fatalf("Conditionally mutating row: %v", err)
169	}
170	verifyDirectPathRemoteAddress(testEnv, t)
171	// Do a second condition mutation with a filter that does not match,
172	// and thus no changes should be made.
173	mutTrue = NewMutation()
174	mutTrue.DeleteRow()
175	filter = ColumnFilter("snoop.dogg")
176	mut = NewCondMutation(filter, mutTrue, nil)
177	if err := table.Apply(ctx, "tjefferson", mut); err != nil {
178		t.Fatalf("Conditionally mutating row: %v", err)
179	}
180	verifyDirectPathRemoteAddress(testEnv, t)
181
182	// Fetch a row.
183	row, err := table.ReadRow(ctx, "j§adams")
184	if err != nil {
185		t.Fatalf("Reading a row: %v", err)
186	}
187	verifyDirectPathRemoteAddress(testEnv, t)
188	wantRow := Row{
189		"follows": []ReadItem{
190			{Row: "j§adams", Column: "follows:gwashington", Timestamp: 1000, Value: []byte("1")},
191			{Row: "j§adams", Column: "follows:tjefferson", Timestamp: 1000, Value: []byte("1")},
192		},
193	}
194	if !testutil.Equal(row, wantRow) {
195		t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow)
196	}
197}
198
199func TestIntegration_PartialReadRows(t *testing.T) {
200	ctx := context.Background()
201	_, _, _, table, _, cleanup, err := setupIntegration(ctx, t)
202	if err != nil {
203		t.Fatal(err)
204	}
205	defer cleanup()
206
207	if err := populatePresidentsGraph(table); err != nil {
208		t.Fatal(err)
209	}
210
211	// Do a scan and stop part way through.
212	// Verify that the ReadRows callback doesn't keep running.
213	stopped := false
214	err = table.ReadRows(ctx, InfiniteRange(""), func(r Row) bool {
215		if r.Key() < "h" {
216			return true
217		}
218		if !stopped {
219			stopped = true
220			return false
221		}
222		t.Fatalf("ReadRows kept scanning to row %q after being told to stop", r.Key())
223		return false
224	})
225	if err != nil {
226		t.Fatalf("Partial ReadRows: %v", err)
227	}
228}
229
230func TestIntegration_ReadRowList(t *testing.T) {
231	ctx := context.Background()
232	_, _, _, table, _, cleanup, err := setupIntegration(ctx, t)
233	if err != nil {
234		t.Fatal(err)
235	}
236	defer cleanup()
237
238	if err := populatePresidentsGraph(table); err != nil {
239		t.Fatal(err)
240	}
241
242	// Read a RowList
243	var elt []string
244	keys := RowList{"wmckinley", "gwashington", "j§adams"}
245	want := "gwashington-j§adams-1,j§adams-gwashington-1,j§adams-tjefferson-1,wmckinley-tjefferson-1"
246	err = table.ReadRows(ctx, keys, func(r Row) bool {
247		for _, ris := range r {
248			for _, ri := range ris {
249				elt = append(elt, formatReadItem(ri))
250			}
251		}
252		return true
253	})
254	if err != nil {
255		t.Fatalf("read RowList: %v", err)
256	}
257
258	if got := strings.Join(elt, ","); got != want {
259		t.Fatalf("bulk read: wrong reads.\n got %q\nwant %q", got, want)
260	}
261}
262
263func TestIntegration_DeleteRow(t *testing.T) {
264	ctx := context.Background()
265	_, _, _, table, _, cleanup, err := setupIntegration(ctx, t)
266	if err != nil {
267		t.Fatal(err)
268	}
269	defer cleanup()
270
271	if err := populatePresidentsGraph(table); err != nil {
272		t.Fatal(err)
273	}
274
275	// Delete a row and check it goes away.
276	mut := NewMutation()
277	mut.DeleteRow()
278	if err := table.Apply(ctx, "wmckinley", mut); err != nil {
279		t.Fatalf("Apply DeleteRow: %v", err)
280	}
281	row, err := table.ReadRow(ctx, "wmckinley")
282	if err != nil {
283		t.Fatalf("Reading a row after DeleteRow: %v", err)
284	}
285	if len(row) != 0 {
286		t.Fatalf("Read non-zero row after DeleteRow: %v", row)
287	}
288}
289
290func TestIntegration_ReadModifyWrite(t *testing.T) {
291	ctx := context.Background()
292	testEnv, _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t)
293	if err != nil {
294		t.Fatal(err)
295	}
296	defer cleanup()
297
298	if err := populatePresidentsGraph(table); err != nil {
299		t.Fatal(err)
300	}
301
302	if err := adminClient.CreateColumnFamily(ctx, tableName, "counter"); err != nil {
303		t.Fatalf("Creating column family: %v", err)
304	}
305
306	appendRMW := func(b []byte) *ReadModifyWrite {
307		rmw := NewReadModifyWrite()
308		rmw.AppendValue("counter", "likes", b)
309		return rmw
310	}
311	incRMW := func(n int64) *ReadModifyWrite {
312		rmw := NewReadModifyWrite()
313		rmw.Increment("counter", "likes", n)
314		return rmw
315	}
316	rmwSeq := []struct {
317		desc string
318		rmw  *ReadModifyWrite
319		want []byte
320	}{
321		{
322			desc: "append #1",
323			rmw:  appendRMW([]byte{0, 0, 0}),
324			want: []byte{0, 0, 0},
325		},
326		{
327			desc: "append #2",
328			rmw:  appendRMW([]byte{0, 0, 0, 0, 17}), // the remaining 40 bits to make a big-endian 17
329			want: []byte{0, 0, 0, 0, 0, 0, 0, 17},
330		},
331		{
332			desc: "increment",
333			rmw:  incRMW(8),
334			want: []byte{0, 0, 0, 0, 0, 0, 0, 25},
335		},
336	}
337	for _, step := range rmwSeq {
338		row, err := table.ApplyReadModifyWrite(ctx, "gwashington", step.rmw)
339		if err != nil {
340			t.Fatalf("ApplyReadModifyWrite %+v: %v", step.rmw, err)
341		}
342		verifyDirectPathRemoteAddress(testEnv, t)
343		// Make sure the modified cell returned by the RMW operation has a timestamp.
344		if row["counter"][0].Timestamp == 0 {
345			t.Fatalf("RMW returned cell timestamp: got %v, want > 0", row["counter"][0].Timestamp)
346		}
347		clearTimestamps(row)
348		wantRow := Row{"counter": []ReadItem{{Row: "gwashington", Column: "counter:likes", Value: step.want}}}
349		if !testutil.Equal(row, wantRow) {
350			t.Fatalf("After %s,\n got %v\nwant %v", step.desc, row, wantRow)
351		}
352	}
353
354	// Check for google-cloud-go/issues/723. RMWs that insert new rows should keep row order sorted in the emulator.
355	_, err = table.ApplyReadModifyWrite(ctx, "issue-723-2", appendRMW([]byte{0}))
356	if err != nil {
357		t.Fatalf("ApplyReadModifyWrite null string: %v", err)
358	}
359	verifyDirectPathRemoteAddress(testEnv, t)
360	_, err = table.ApplyReadModifyWrite(ctx, "issue-723-1", appendRMW([]byte{0}))
361	if err != nil {
362		t.Fatalf("ApplyReadModifyWrite null string: %v", err)
363	}
364	verifyDirectPathRemoteAddress(testEnv, t)
365	// Get only the correct row back on read.
366	r, err := table.ReadRow(ctx, "issue-723-1")
367	if err != nil {
368		t.Fatalf("Reading row: %v", err)
369	}
370	verifyDirectPathRemoteAddress(testEnv, t)
371	if r.Key() != "issue-723-1" {
372		t.Fatalf("ApplyReadModifyWrite: incorrect read after RMW,\n got %v\nwant %v", r.Key(), "issue-723-1")
373	}
374}
375
376func TestIntegration_ArbitraryTimestamps(t *testing.T) {
377	ctx := context.Background()
378	_, _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t)
379	if err != nil {
380		t.Fatal(err)
381	}
382	defer cleanup()
383
384	// Test arbitrary timestamps more thoroughly.
385	if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil {
386		t.Fatalf("Creating column family: %v", err)
387	}
388	const numVersions = 4
389	mut := NewMutation()
390	for i := 1; i < numVersions; i++ {
391		// Timestamps are used in thousands because the server
392		// only permits that granularity.
393		mut.Set("ts", "col", Timestamp(i*1000), []byte(fmt.Sprintf("val-%d", i)))
394		mut.Set("ts", "col2", Timestamp(i*1000), []byte(fmt.Sprintf("val-%d", i)))
395	}
396	if err := table.Apply(ctx, "testrow", mut); err != nil {
397		t.Fatalf("Mutating row: %v", err)
398	}
399	r, err := table.ReadRow(ctx, "testrow")
400	if err != nil {
401		t.Fatalf("Reading row: %v", err)
402	}
403	wantRow := Row{"ts": []ReadItem{
404		// These should be returned in descending timestamp order.
405		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
406		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
407		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
408		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
409		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
410		{Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")},
411	}}
412	if !testutil.Equal(r, wantRow) {
413		t.Fatalf("Cell with multiple versions,\n got %v\nwant %v", r, wantRow)
414	}
415
416	// Do the same read, but filter to the latest two versions.
417	r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(2)))
418	if err != nil {
419		t.Fatalf("Reading row: %v", err)
420	}
421	wantRow = Row{"ts": []ReadItem{
422		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
423		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
424		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
425		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
426	}}
427	if !testutil.Equal(r, wantRow) {
428		t.Fatalf("Cell with multiple versions and LatestNFilter(2),\n got %v\nwant %v", r, wantRow)
429	}
430	// Check cell offset / limit
431	r, err = table.ReadRow(ctx, "testrow", RowFilter(CellsPerRowLimitFilter(3)))
432	if err != nil {
433		t.Fatalf("Reading row: %v", err)
434	}
435	wantRow = Row{"ts": []ReadItem{
436		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
437		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
438		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
439	}}
440	if !testutil.Equal(r, wantRow) {
441		t.Fatalf("Cell with multiple versions and CellsPerRowLimitFilter(3),\n got %v\nwant %v", r, wantRow)
442	}
443	r, err = table.ReadRow(ctx, "testrow", RowFilter(CellsPerRowOffsetFilter(3)))
444	if err != nil {
445		t.Fatalf("Reading row: %v", err)
446	}
447	wantRow = Row{"ts": []ReadItem{
448		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
449		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
450		{Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")},
451	}}
452	if !testutil.Equal(r, wantRow) {
453		t.Fatalf("Cell with multiple versions and CellsPerRowOffsetFilter(3),\n got %v\nwant %v", r, wantRow)
454	}
455	// Check timestamp range filtering (with truncation)
456	r, err = table.ReadRow(ctx, "testrow", RowFilter(TimestampRangeFilterMicros(1001, 3000)))
457	if err != nil {
458		t.Fatalf("Reading row: %v", err)
459	}
460	wantRow = Row{"ts": []ReadItem{
461		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
462		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
463		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
464		{Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")},
465	}}
466	if !testutil.Equal(r, wantRow) {
467		t.Fatalf("Cell with multiple versions and TimestampRangeFilter(1000, 3000),\n got %v\nwant %v", r, wantRow)
468	}
469	r, err = table.ReadRow(ctx, "testrow", RowFilter(TimestampRangeFilterMicros(1000, 0)))
470	if err != nil {
471		t.Fatalf("Reading row: %v", err)
472	}
473	wantRow = Row{"ts": []ReadItem{
474		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
475		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
476		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
477		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
478		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
479		{Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")},
480	}}
481	if !testutil.Equal(r, wantRow) {
482		t.Fatalf("Cell with multiple versions and TimestampRangeFilter(1000, 0),\n got %v\nwant %v", r, wantRow)
483	}
484	// Delete non-existing cells, no such column family in this row
485	// Should not delete anything
486	if err := adminClient.CreateColumnFamily(ctx, tableName, "non-existing"); err != nil {
487		t.Fatalf("Creating column family: %v", err)
488	}
489	mut = NewMutation()
490	mut.DeleteTimestampRange("non-existing", "col", 2000, 3000) // half-open interval
491	if err := table.Apply(ctx, "testrow", mut); err != nil {
492		t.Fatalf("Mutating row: %v", err)
493	}
494	r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(3)))
495	if err != nil {
496		t.Fatalf("Reading row: %v", err)
497	}
498	if !testutil.Equal(r, wantRow) {
499		t.Fatalf("Cell was deleted unexpectly,\n got %v\nwant %v", r, wantRow)
500	}
501	// Delete non-existing cells, no such column in this column family
502	// Should not delete anything
503	mut = NewMutation()
504	mut.DeleteTimestampRange("ts", "non-existing", 2000, 3000) // half-open interval
505	if err := table.Apply(ctx, "testrow", mut); err != nil {
506		t.Fatalf("Mutating row: %v", err)
507	}
508	r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(3)))
509	if err != nil {
510		t.Fatalf("Reading row: %v", err)
511	}
512	if !testutil.Equal(r, wantRow) {
513		t.Fatalf("Cell was deleted unexpectly,\n got %v\nwant %v", r, wantRow)
514	}
515	// Delete the cell with timestamp 2000 and repeat the last read,
516	// checking that we get ts 3000 and ts 1000.
517	mut = NewMutation()
518	mut.DeleteTimestampRange("ts", "col", 2001, 3000) // half-open interval
519	if err := table.Apply(ctx, "testrow", mut); err != nil {
520		t.Fatalf("Mutating row: %v", err)
521	}
522	r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(2)))
523	if err != nil {
524		t.Fatalf("Reading row: %v", err)
525	}
526	wantRow = Row{"ts": []ReadItem{
527		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
528		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
529		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
530		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
531	}}
532	if !testutil.Equal(r, wantRow) {
533		t.Fatalf("Cell with multiple versions and LatestNFilter(2), after deleting timestamp 2000,\n got %v\nwant %v", r, wantRow)
534	}
535
536	// Check DeleteCellsInFamily
537	if err := adminClient.CreateColumnFamily(ctx, tableName, "status"); err != nil {
538		t.Fatalf("Creating column family: %v", err)
539	}
540
541	mut = NewMutation()
542	mut.Set("status", "start", 2000, []byte("2"))
543	mut.Set("status", "end", 3000, []byte("3"))
544	mut.Set("ts", "col", 1000, []byte("1"))
545	if err := table.Apply(ctx, "row1", mut); err != nil {
546		t.Fatalf("Mutating row: %v", err)
547	}
548	if err := table.Apply(ctx, "row2", mut); err != nil {
549		t.Fatalf("Mutating row: %v", err)
550	}
551
552	mut = NewMutation()
553	mut.DeleteCellsInFamily("status")
554	if err := table.Apply(ctx, "row1", mut); err != nil {
555		t.Fatalf("Delete cf: %v", err)
556	}
557
558	// ColumnFamily removed
559	r, err = table.ReadRow(ctx, "row1")
560	if err != nil {
561		t.Fatalf("Reading row: %v", err)
562	}
563	wantRow = Row{"ts": []ReadItem{
564		{Row: "row1", Column: "ts:col", Timestamp: 1000, Value: []byte("1")},
565	}}
566	if !testutil.Equal(r, wantRow) {
567		t.Fatalf("column family was not deleted.\n got %v\n want %v", r, wantRow)
568	}
569
570	// ColumnFamily not removed
571	r, err = table.ReadRow(ctx, "row2")
572	if err != nil {
573		t.Fatalf("Reading row: %v", err)
574	}
575	wantRow = Row{
576		"ts": []ReadItem{
577			{Row: "row2", Column: "ts:col", Timestamp: 1000, Value: []byte("1")},
578		},
579		"status": []ReadItem{
580			{Row: "row2", Column: "status:end", Timestamp: 3000, Value: []byte("3")},
581			{Row: "row2", Column: "status:start", Timestamp: 2000, Value: []byte("2")},
582		},
583	}
584	if !testutil.Equal(r, wantRow) {
585		t.Fatalf("Column family was deleted unexpectedly.\n got %v\n want %v", r, wantRow)
586	}
587
588	// Check DeleteCellsInColumn
589	mut = NewMutation()
590	mut.Set("status", "start", 2000, []byte("2"))
591	mut.Set("status", "middle", 3000, []byte("3"))
592	mut.Set("status", "end", 1000, []byte("1"))
593	if err := table.Apply(ctx, "row3", mut); err != nil {
594		t.Fatalf("Mutating row: %v", err)
595	}
596	mut = NewMutation()
597	mut.DeleteCellsInColumn("status", "middle")
598	if err := table.Apply(ctx, "row3", mut); err != nil {
599		t.Fatalf("Delete column: %v", err)
600	}
601	r, err = table.ReadRow(ctx, "row3")
602	if err != nil {
603		t.Fatalf("Reading row: %v", err)
604	}
605	wantRow = Row{
606		"status": []ReadItem{
607			{Row: "row3", Column: "status:end", Timestamp: 1000, Value: []byte("1")},
608			{Row: "row3", Column: "status:start", Timestamp: 2000, Value: []byte("2")},
609		},
610	}
611	if !testutil.Equal(r, wantRow) {
612		t.Fatalf("Column was not deleted.\n got %v\n want %v", r, wantRow)
613	}
614	mut = NewMutation()
615	mut.DeleteCellsInColumn("status", "start")
616	if err := table.Apply(ctx, "row3", mut); err != nil {
617		t.Fatalf("Delete column: %v", err)
618	}
619	r, err = table.ReadRow(ctx, "row3")
620	if err != nil {
621		t.Fatalf("Reading row: %v", err)
622	}
623	wantRow = Row{
624		"status": []ReadItem{
625			{Row: "row3", Column: "status:end", Timestamp: 1000, Value: []byte("1")},
626		},
627	}
628	if !testutil.Equal(r, wantRow) {
629		t.Fatalf("Column was not deleted.\n got %v\n want %v", r, wantRow)
630	}
631	mut = NewMutation()
632	mut.DeleteCellsInColumn("status", "end")
633	if err := table.Apply(ctx, "row3", mut); err != nil {
634		t.Fatalf("Delete column: %v", err)
635	}
636	r, err = table.ReadRow(ctx, "row3")
637	if err != nil {
638		t.Fatalf("Reading row: %v", err)
639	}
640	if len(r) != 0 {
641		t.Fatalf("Delete column: got %v, want empty row", r)
642	}
643	// Add same cell after delete
644	mut = NewMutation()
645	mut.Set("status", "end", 1000, []byte("1"))
646	if err := table.Apply(ctx, "row3", mut); err != nil {
647		t.Fatalf("Mutating row: %v", err)
648	}
649	r, err = table.ReadRow(ctx, "row3")
650	if err != nil {
651		t.Fatalf("Reading row: %v", err)
652	}
653	if !testutil.Equal(r, wantRow) {
654		t.Fatalf("Column was not deleted correctly.\n got %v\n want %v", r, wantRow)
655	}
656}
657
658func TestIntegration_HighlyConcurrentReadsAndWrites(t *testing.T) {
659	ctx := context.Background()
660	_, _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t)
661	if err != nil {
662		t.Fatal(err)
663	}
664	defer cleanup()
665
666	if err := populatePresidentsGraph(table); err != nil {
667		t.Fatal(err)
668	}
669
670	if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil {
671		t.Fatalf("Creating column family: %v", err)
672	}
673
674	// Do highly concurrent reads/writes.
675	const maxConcurrency = 1000
676	var wg sync.WaitGroup
677	for i := 0; i < maxConcurrency; i++ {
678		wg.Add(1)
679		go func() {
680			defer wg.Done()
681			switch r := rand.Intn(100); { // r ∈ [0,100)
682			case 0 <= r && r < 30:
683				// Do a read.
684				_, err := table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(1)))
685				if err != nil {
686					t.Errorf("Concurrent read: %v", err)
687				}
688			case 30 <= r && r < 100:
689				// Do a write.
690				mut := NewMutation()
691				mut.Set("ts", "col", 1000, []byte("data"))
692				if err := table.Apply(ctx, "testrow", mut); err != nil {
693					t.Errorf("Concurrent write: %v", err)
694				}
695			}
696		}()
697	}
698	wg.Wait()
699}
700
701func TestIntegration_LargeReadsWritesAndScans(t *testing.T) {
702	ctx := context.Background()
703	testEnv, _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t)
704	if err != nil {
705		t.Fatal(err)
706	}
707	defer cleanup()
708
709	if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil {
710		t.Fatalf("Creating column family: %v", err)
711	}
712
713	bigBytes := make([]byte, 5<<20) // 5 MB is larger than current default gRPC max of 4 MB, but less than the max we set.
714	nonsense := []byte("lorem ipsum dolor sit amet, ")
715	fill(bigBytes, nonsense)
716	mut := NewMutation()
717	mut.Set("ts", "col", 1000, bigBytes)
718	if err := table.Apply(ctx, "bigrow", mut); err != nil {
719		t.Fatalf("Big write: %v", err)
720	}
721	verifyDirectPathRemoteAddress(testEnv, t)
722	r, err := table.ReadRow(ctx, "bigrow")
723	if err != nil {
724		t.Fatalf("Big read: %v", err)
725	}
726	verifyDirectPathRemoteAddress(testEnv, t)
727	wantRow := Row{"ts": []ReadItem{
728		{Row: "bigrow", Column: "ts:col", Timestamp: 1000, Value: bigBytes},
729	}}
730	if !testutil.Equal(r, wantRow) {
731		t.Fatalf("Big read returned incorrect bytes: %v", r)
732	}
733
734	var wg sync.WaitGroup
735	// Now write 1000 rows, each with 82 KB values, then scan them all.
736	medBytes := make([]byte, 82<<10)
737	fill(medBytes, nonsense)
738	sem := make(chan int, 50) // do up to 50 mutations at a time.
739	for i := 0; i < 1000; i++ {
740		mut := NewMutation()
741		mut.Set("ts", "big-scan", 1000, medBytes)
742		row := fmt.Sprintf("row-%d", i)
743		wg.Add(1)
744		go func() {
745			defer wg.Done()
746			defer func() { <-sem }()
747			sem <- 1
748			if err := table.Apply(ctx, row, mut); err != nil {
749				t.Errorf("Preparing large scan: %v", err)
750			}
751			verifyDirectPathRemoteAddress(testEnv, t)
752		}()
753	}
754	wg.Wait()
755	n := 0
756	err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool {
757		for _, ris := range r {
758			for _, ri := range ris {
759				n += len(ri.Value)
760			}
761		}
762		return true
763	}, RowFilter(ColumnFilter("big-scan")))
764	if err != nil {
765		t.Fatalf("Doing large scan: %v", err)
766	}
767	verifyDirectPathRemoteAddress(testEnv, t)
768	if want := 1000 * len(medBytes); n != want {
769		t.Fatalf("Large scan returned %d bytes, want %d", n, want)
770	}
771	// Scan a subset of the 1000 rows that we just created, using a LimitRows ReadOption.
772	rc := 0
773	wantRc := 3
774	err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool {
775		rc++
776		return true
777	}, LimitRows(int64(wantRc)))
778	if err != nil {
779		t.Fatal(err)
780	}
781	verifyDirectPathRemoteAddress(testEnv, t)
782	if rc != wantRc {
783		t.Fatalf("Scan with row limit returned %d rows, want %d", rc, wantRc)
784	}
785
786	// Test bulk mutations
787	if err := adminClient.CreateColumnFamily(ctx, tableName, "bulk"); err != nil {
788		t.Fatalf("Creating column family: %v", err)
789	}
790	bulkData := map[string][]string{
791		"red sox":  {"2004", "2007", "2013"},
792		"patriots": {"2001", "2003", "2004", "2014"},
793		"celtics":  {"1981", "1984", "1986", "2008"},
794	}
795	var rowKeys []string
796	var muts []*Mutation
797	for row, ss := range bulkData {
798		mut := NewMutation()
799		for _, name := range ss {
800			mut.Set("bulk", name, 1000, []byte("1"))
801		}
802		rowKeys = append(rowKeys, row)
803		muts = append(muts, mut)
804	}
805	status, err := table.ApplyBulk(ctx, rowKeys, muts)
806	if err != nil {
807		t.Fatalf("Bulk mutating rows %q: %v", rowKeys, err)
808	}
809	verifyDirectPathRemoteAddress(testEnv, t)
810	if status != nil {
811		t.Fatalf("non-nil errors: %v", err)
812	}
813
814	// Read each row back
815	for rowKey, ss := range bulkData {
816		row, err := table.ReadRow(ctx, rowKey)
817		if err != nil {
818			t.Fatalf("Reading a bulk row: %v", err)
819		}
820		verifyDirectPathRemoteAddress(testEnv, t)
821		var wantItems []ReadItem
822		for _, val := range ss {
823			wantItems = append(wantItems, ReadItem{Row: rowKey, Column: "bulk:" + val, Timestamp: 1000, Value: []byte("1")})
824		}
825		wantRow := Row{"bulk": wantItems}
826		if !testutil.Equal(row, wantRow) {
827			t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow)
828		}
829	}
830
831	// Test bulk write errors.
832	// Note: Setting timestamps as ServerTime makes sure the mutations are not retried on error.
833	badMut := NewMutation()
834	badMut.Set("badfamily", "col", ServerTime, nil)
835	badMut2 := NewMutation()
836	badMut2.Set("badfamily2", "goodcol", ServerTime, []byte("1"))
837	status, err = table.ApplyBulk(ctx, []string{"badrow", "badrow2"}, []*Mutation{badMut, badMut2})
838	if err != nil {
839		t.Fatalf("Bulk mutating rows %q: %v", rowKeys, err)
840	}
841	verifyDirectPathRemoteAddress(testEnv, t)
842	if status == nil {
843		t.Fatalf("No errors for bad bulk mutation")
844	} else if status[0] == nil || status[1] == nil {
845		t.Fatalf("No error for bad bulk mutation")
846	}
847}
848
849func TestIntegration_Read(t *testing.T) {
850	ctx := context.Background()
851	testEnv, _, _, table, _, cleanup, err := setupIntegration(ctx, t)
852	if err != nil {
853		t.Fatal(err)
854	}
855	defer cleanup()
856
857	// Insert some data.
858	initialData := map[string][]string{
859		"wmckinley":   {"tjefferson"},
860		"gwashington": {"j§adams"},
861		"tjefferson":  {"gwashington", "j§adams", "wmckinley"},
862		"j§adams":     {"gwashington", "tjefferson"},
863	}
864	for row, ss := range initialData {
865		mut := NewMutation()
866		for _, name := range ss {
867			mut.Set("follows", name, 1000, []byte("1"))
868		}
869		if err := table.Apply(ctx, row, mut); err != nil {
870			t.Fatalf("Mutating row %q: %v", row, err)
871		}
872		verifyDirectPathRemoteAddress(testEnv, t)
873	}
874
875	for _, test := range []struct {
876		desc   string
877		rr     RowSet
878		filter Filter     // may be nil
879		limit  ReadOption // may be nil
880
881		// We do the read, grab all the cells, turn them into "<row>-<col>-<val>",
882		// and join with a comma.
883		want       string
884		wantLabels []string
885	}{
886		{
887			desc: "read all, unfiltered",
888			rr:   RowRange{},
889			want: "gwashington-j§adams-1,j§adams-gwashington-1,j§adams-tjefferson-1,tjefferson-gwashington-1,tjefferson-j§adams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1",
890		},
891		{
892			desc: "read with InfiniteRange, unfiltered",
893			rr:   InfiniteRange("tjefferson"),
894			want: "tjefferson-gwashington-1,tjefferson-j§adams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1",
895		},
896		{
897			desc: "read with NewRange, unfiltered",
898			rr:   NewRange("gargamel", "hubbard"),
899			want: "gwashington-j§adams-1",
900		},
901		{
902			desc: "read with PrefixRange, unfiltered",
903			rr:   PrefixRange("j§ad"),
904			want: "j§adams-gwashington-1,j§adams-tjefferson-1",
905		},
906		{
907			desc: "read with SingleRow, unfiltered",
908			rr:   SingleRow("wmckinley"),
909			want: "wmckinley-tjefferson-1",
910		},
911		{
912			desc:   "read all, with ColumnFilter",
913			rr:     RowRange{},
914			filter: ColumnFilter(".*j.*"), // matches "j§adams" and "tjefferson"
915			want:   "gwashington-j§adams-1,j§adams-tjefferson-1,tjefferson-j§adams-1,wmckinley-tjefferson-1",
916		},
917		{
918			desc:   "read all, with ColumnFilter, prefix",
919			rr:     RowRange{},
920			filter: ColumnFilter("j"), // no matches
921			want:   "",
922		},
923		{
924			desc:   "read range, with ColumnRangeFilter",
925			rr:     RowRange{},
926			filter: ColumnRangeFilter("follows", "h", "k"),
927			want:   "gwashington-j§adams-1,tjefferson-j§adams-1",
928		},
929		{
930			desc:   "read range from empty, with ColumnRangeFilter",
931			rr:     RowRange{},
932			filter: ColumnRangeFilter("follows", "", "u"),
933			want:   "gwashington-j§adams-1,j§adams-gwashington-1,j§adams-tjefferson-1,tjefferson-gwashington-1,tjefferson-j§adams-1,wmckinley-tjefferson-1",
934		},
935		{
936			desc:   "read range from start to empty, with ColumnRangeFilter",
937			rr:     RowRange{},
938			filter: ColumnRangeFilter("follows", "h", ""),
939			want:   "gwashington-j§adams-1,j§adams-tjefferson-1,tjefferson-j§adams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1",
940		},
941		{
942			desc:   "read with RowKeyFilter",
943			rr:     RowRange{},
944			filter: RowKeyFilter(".*wash.*"),
945			want:   "gwashington-j§adams-1",
946		},
947		{
948			desc:   "read with RowKeyFilter unicode",
949			rr:     RowRange{},
950			filter: RowKeyFilter(".*j§.*"),
951			want:   "j§adams-gwashington-1,j§adams-tjefferson-1",
952		},
953		{
954			desc:   "read with RowKeyFilter escaped",
955			rr:     RowRange{},
956			filter: RowKeyFilter(`.*j\xC2\xA7.*`),
957			want:   "j§adams-gwashington-1,j§adams-tjefferson-1",
958		},
959		{
960			desc:   "read with RowKeyFilter, prefix",
961			rr:     RowRange{},
962			filter: RowKeyFilter("gwash"),
963			want:   "",
964		},
965		{
966			desc:   "read with RowKeyFilter, no matches",
967			rr:     RowRange{},
968			filter: RowKeyFilter(".*xxx.*"),
969			want:   "",
970		},
971		{
972			desc:   "read with FamilyFilter, no matches",
973			rr:     RowRange{},
974			filter: FamilyFilter(".*xxx.*"),
975			want:   "",
976		},
977		{
978			desc:   "read with ColumnFilter + row limit",
979			rr:     RowRange{},
980			filter: ColumnFilter(".*j.*"), // matches "j§adams" and "tjefferson"
981			limit:  LimitRows(2),
982			want:   "gwashington-j§adams-1,j§adams-tjefferson-1",
983		},
984		{
985			desc:       "apply labels to the result rows",
986			rr:         RowRange{},
987			filter:     LabelFilter("test-label"),
988			limit:      LimitRows(2),
989			want:       "gwashington-j§adams-1,j§adams-gwashington-1,j§adams-tjefferson-1",
990			wantLabels: []string{"test-label", "test-label", "test-label"},
991		},
992		{
993			desc:   "read all, strip values",
994			rr:     RowRange{},
995			filter: StripValueFilter(),
996			want:   "gwashington-j§adams-,j§adams-gwashington-,j§adams-tjefferson-,tjefferson-gwashington-,tjefferson-j§adams-,tjefferson-wmckinley-,wmckinley-tjefferson-",
997		},
998		{
999			desc:   "read with ColumnFilter + row limit + strip values",
1000			rr:     RowRange{},
1001			filter: ChainFilters(ColumnFilter(".*j.*"), StripValueFilter()), // matches "j§adams" and "tjefferson"
1002			limit:  LimitRows(2),
1003			want:   "gwashington-j§adams-,j§adams-tjefferson-",
1004		},
1005		{
1006			desc:   "read with condition, strip values on true",
1007			rr:     RowRange{},
1008			filter: ConditionFilter(ColumnFilter(".*j.*"), StripValueFilter(), nil),
1009			want:   "gwashington-j§adams-,j§adams-gwashington-,j§adams-tjefferson-,tjefferson-gwashington-,tjefferson-j§adams-,tjefferson-wmckinley-,wmckinley-tjefferson-",
1010		},
1011		{
1012			desc:   "read with condition, strip values on false",
1013			rr:     RowRange{},
1014			filter: ConditionFilter(ColumnFilter(".*xxx.*"), nil, StripValueFilter()),
1015			want:   "gwashington-j§adams-,j§adams-gwashington-,j§adams-tjefferson-,tjefferson-gwashington-,tjefferson-j§adams-,tjefferson-wmckinley-,wmckinley-tjefferson-",
1016		},
1017		{
1018			desc:   "read with ValueRangeFilter + row limit",
1019			rr:     RowRange{},
1020			filter: ValueRangeFilter([]byte("1"), []byte("5")), // matches our value of "1"
1021			limit:  LimitRows(2),
1022			want:   "gwashington-j§adams-1,j§adams-gwashington-1,j§adams-tjefferson-1",
1023		},
1024		{
1025			desc:   "read with ValueRangeFilter, no match on exclusive end",
1026			rr:     RowRange{},
1027			filter: ValueRangeFilter([]byte("0"), []byte("1")), // no match
1028			want:   "",
1029		},
1030		{
1031			desc:   "read with ValueRangeFilter, no matches",
1032			rr:     RowRange{},
1033			filter: ValueRangeFilter([]byte("3"), []byte("5")), // matches nothing
1034			want:   "",
1035		},
1036		{
1037			desc:   "read with InterleaveFilter, no matches on all filters",
1038			rr:     RowRange{},
1039			filter: InterleaveFilters(ColumnFilter(".*x.*"), ColumnFilter(".*z.*")),
1040			want:   "",
1041		},
1042		{
1043			desc:   "read with InterleaveFilter, no duplicate cells",
1044			rr:     RowRange{},
1045			filter: InterleaveFilters(ColumnFilter(".*g.*"), ColumnFilter(".*j.*")),
1046			want:   "gwashington-j§adams-1,j§adams-gwashington-1,j§adams-tjefferson-1,tjefferson-gwashington-1,tjefferson-j§adams-1,wmckinley-tjefferson-1",
1047		},
1048		{
1049			desc:   "read with InterleaveFilter, with duplicate cells",
1050			rr:     RowRange{},
1051			filter: InterleaveFilters(ColumnFilter(".*g.*"), ColumnFilter(".*g.*")),
1052			want:   "j§adams-gwashington-1,j§adams-gwashington-1,tjefferson-gwashington-1,tjefferson-gwashington-1",
1053		},
1054		{
1055			desc: "read with a RowRangeList and no filter",
1056			rr:   RowRangeList{NewRange("gargamel", "hubbard"), InfiniteRange("wmckinley")},
1057			want: "gwashington-j§adams-1,wmckinley-tjefferson-1",
1058		},
1059		{
1060			desc:   "chain that excludes rows and matches nothing, in a condition",
1061			rr:     RowRange{},
1062			filter: ConditionFilter(ChainFilters(ColumnFilter(".*j.*"), ColumnFilter(".*mckinley.*")), StripValueFilter(), nil),
1063			want:   "",
1064		},
1065		{
1066			desc:   "chain that ends with an interleave that has no match. covers #804",
1067			rr:     RowRange{},
1068			filter: ConditionFilter(ChainFilters(ColumnFilter(".*j.*"), InterleaveFilters(ColumnFilter(".*x.*"), ColumnFilter(".*z.*"))), StripValueFilter(), nil),
1069			want:   "",
1070		},
1071	} {
1072		t.Run(test.desc, func(t *testing.T) {
1073			var opts []ReadOption
1074			if test.filter != nil {
1075				opts = append(opts, RowFilter(test.filter))
1076			}
1077			if test.limit != nil {
1078				opts = append(opts, test.limit)
1079			}
1080			var elt, labels []string
1081			err := table.ReadRows(ctx, test.rr, func(r Row) bool {
1082				for _, ris := range r {
1083					for _, ri := range ris {
1084						labels = append(labels, ri.Labels...)
1085						elt = append(elt, formatReadItem(ri))
1086					}
1087				}
1088				return true
1089			}, opts...)
1090			if err != nil {
1091				t.Fatal(err)
1092			}
1093			verifyDirectPathRemoteAddress(testEnv, t)
1094			if got := strings.Join(elt, ","); got != test.want {
1095				t.Fatalf("got %q\nwant %q", got, test.want)
1096			}
1097			if got, want := labels, test.wantLabels; !reflect.DeepEqual(got, want) {
1098				t.Fatalf("got %q\nwant %q", got, want)
1099			}
1100		})
1101	}
1102}
1103
1104func TestIntegration_SampleRowKeys(t *testing.T) {
1105	ctx := context.Background()
1106	testEnv, client, adminClient, _, _, cleanup, err := setupIntegration(ctx, t)
1107	if err != nil {
1108		t.Fatal(err)
1109	}
1110	defer cleanup()
1111
1112	presplitTable := fmt.Sprintf("presplit-table-%d", time.Now().Unix())
1113	if err := adminClient.CreatePresplitTable(ctx, presplitTable, []string{"follows"}); err != nil {
1114		t.Fatal(err)
1115	}
1116	defer adminClient.DeleteTable(ctx, presplitTable)
1117
1118	if err := adminClient.CreateColumnFamily(ctx, presplitTable, "follows"); err != nil {
1119		t.Fatal(err)
1120	}
1121
1122	table := client.Open(presplitTable)
1123
1124	// Insert some data.
1125	initialData := map[string][]string{
1126		"wmckinley11":   {"tjefferson11"},
1127		"gwashington77": {"j§adams77"},
1128		"tjefferson0":   {"gwashington0", "j§adams0"},
1129	}
1130
1131	for row, ss := range initialData {
1132		mut := NewMutation()
1133		for _, name := range ss {
1134			mut.Set("follows", name, 1000, []byte("1"))
1135		}
1136		if err := table.Apply(ctx, row, mut); err != nil {
1137			t.Fatalf("Mutating row %q: %v", row, err)
1138		}
1139		verifyDirectPathRemoteAddress(testEnv, t)
1140	}
1141	sampleKeys, err := table.SampleRowKeys(context.Background())
1142	if err != nil {
1143		t.Fatalf("%s: %v", "SampleRowKeys:", err)
1144	}
1145	if len(sampleKeys) == 0 {
1146		t.Error("SampleRowKeys length 0")
1147	}
1148}
1149
1150func TestIntegration_Admin(t *testing.T) {
1151	testEnv, err := NewIntegrationEnv()
1152	if err != nil {
1153		t.Fatalf("IntegrationEnv: %v", err)
1154	}
1155	defer testEnv.Close()
1156
1157	timeout := 2 * time.Second
1158	if testEnv.Config().UseProd {
1159		timeout = 5 * time.Minute
1160	}
1161	ctx, _ := context.WithTimeout(context.Background(), timeout)
1162
1163	adminClient, err := testEnv.NewAdminClient()
1164	if err != nil {
1165		t.Fatalf("NewAdminClient: %v", err)
1166	}
1167	defer adminClient.Close()
1168
1169	iAdminClient, err := testEnv.NewInstanceAdminClient()
1170	if err != nil {
1171		t.Fatalf("NewInstanceAdminClient: %v", err)
1172	}
1173	if iAdminClient != nil {
1174		defer iAdminClient.Close()
1175		iInfo, err := iAdminClient.InstanceInfo(ctx, adminClient.instance)
1176		if err != nil {
1177			t.Errorf("InstanceInfo: %v", err)
1178		}
1179		if iInfo.Name != adminClient.instance {
1180			t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance)
1181		}
1182	}
1183
1184	list := func() []string {
1185		tbls, err := adminClient.Tables(ctx)
1186		if err != nil {
1187			t.Fatalf("Fetching list of tables: %v", err)
1188		}
1189		sort.Strings(tbls)
1190		return tbls
1191	}
1192	containsAll := func(got, want []string) bool {
1193		gotSet := make(map[string]bool)
1194
1195		for _, s := range got {
1196			gotSet[s] = true
1197		}
1198		for _, s := range want {
1199			if !gotSet[s] {
1200				return false
1201			}
1202		}
1203		return true
1204	}
1205
1206	defer deleteTable(ctx, t, adminClient, myTableName)
1207
1208	if err := adminClient.CreateTable(ctx, myTableName); err != nil {
1209		t.Fatalf("Creating table: %v", err)
1210	}
1211
1212	defer deleteTable(ctx, t, adminClient, myOtherTableName)
1213
1214	if err := adminClient.CreateTable(ctx, myOtherTableName); err != nil {
1215		t.Fatalf("Creating table: %v", err)
1216	}
1217
1218	if got, want := list(), []string{myOtherTableName, myTableName}; !containsAll(got, want) {
1219		t.Errorf("adminClient.Tables returned %#v, want %#v", got, want)
1220	}
1221
1222	must(adminClient.WaitForReplication(ctx, myTableName))
1223
1224	if err := adminClient.DeleteTable(ctx, myOtherTableName); err != nil {
1225		t.Fatalf("Deleting table: %v", err)
1226	}
1227	tables := list()
1228	if got, want := tables, []string{myTableName}; !containsAll(got, want) {
1229		t.Errorf("adminClient.Tables returned %#v, want %#v", got, want)
1230	}
1231	if got, unwanted := tables, []string{myOtherTableName}; containsAll(got, unwanted) {
1232		t.Errorf("adminClient.Tables return %#v. unwanted %#v", got, unwanted)
1233	}
1234
1235	tblConf := TableConf{
1236		TableID: "conftable",
1237		Families: map[string]GCPolicy{
1238			"fam1": MaxVersionsPolicy(1),
1239			"fam2": MaxVersionsPolicy(2),
1240		},
1241	}
1242	if err := adminClient.CreateTableFromConf(ctx, &tblConf); err != nil {
1243		t.Fatalf("Creating table from TableConf: %v", err)
1244	}
1245	defer deleteTable(ctx, t, adminClient, tblConf.TableID)
1246
1247	tblInfo, err := adminClient.TableInfo(ctx, tblConf.TableID)
1248	if err != nil {
1249		t.Fatalf("Getting table info: %v", err)
1250	}
1251	sort.Strings(tblInfo.Families)
1252	wantFams := []string{"fam1", "fam2"}
1253	if !testutil.Equal(tblInfo.Families, wantFams) {
1254		t.Errorf("Column family mismatch, got %v, want %v", tblInfo.Families, wantFams)
1255	}
1256
1257	// Populate mytable and drop row ranges
1258	if err = adminClient.CreateColumnFamily(ctx, myTableName, "cf"); err != nil {
1259		t.Fatalf("Creating column family: %v", err)
1260	}
1261
1262	client, err := testEnv.NewClient()
1263	if err != nil {
1264		t.Fatalf("NewClient: %v", err)
1265	}
1266	defer client.Close()
1267
1268	tbl := client.Open(myTableName)
1269
1270	prefixes := []string{"a", "b", "c"}
1271	for _, prefix := range prefixes {
1272		for i := 0; i < 5; i++ {
1273			mut := NewMutation()
1274			mut.Set("cf", "col", 1000, []byte("1"))
1275			if err := tbl.Apply(ctx, fmt.Sprintf("%v-%v", prefix, i), mut); err != nil {
1276				t.Fatalf("Mutating row: %v", err)
1277			}
1278		}
1279	}
1280
1281	if err = adminClient.DropRowRange(ctx, myTableName, "a"); err != nil {
1282		t.Errorf("DropRowRange a: %v", err)
1283	}
1284	if err = adminClient.DropRowRange(ctx, myTableName, "c"); err != nil {
1285		t.Errorf("DropRowRange c: %v", err)
1286	}
1287	if err = adminClient.DropRowRange(ctx, myTableName, "x"); err != nil {
1288		t.Errorf("DropRowRange x: %v", err)
1289	}
1290
1291	var gotRowCount int
1292	must(tbl.ReadRows(ctx, RowRange{}, func(row Row) bool {
1293		gotRowCount++
1294		if !strings.HasPrefix(row.Key(), "b") {
1295			t.Errorf("Invalid row after dropping range: %v", row)
1296		}
1297		return true
1298	}))
1299	if gotRowCount != 5 {
1300		t.Errorf("Invalid row count after dropping range: got %v, want %v", gotRowCount, 5)
1301	}
1302
1303	if err = adminClient.DropAllRows(ctx, myTableName); err != nil {
1304		t.Errorf("DropAllRows mytable: %v", err)
1305	}
1306
1307	gotRowCount = 0
1308	must(tbl.ReadRows(ctx, RowRange{}, func(row Row) bool {
1309		gotRowCount++
1310		return true
1311	}))
1312	if gotRowCount != 0 {
1313		t.Errorf("Invalid row count after truncating table: got %v, want %v", gotRowCount, 0)
1314	}
1315
1316	// Validate Encryption Info configured to default. (not supported by emulator)
1317	if testEnv.Config().UseProd {
1318		encryptionInfo, err := adminClient.EncryptionInfo(ctx, myTableName)
1319		if err != nil {
1320			t.Fatalf("EncryptionInfo: %v", err)
1321		}
1322		if got, want := len(encryptionInfo), 1; !cmp.Equal(got, want) {
1323			t.Fatalf("Number of Clusters with Encryption Info: %v, want: %v", got, want)
1324		}
1325
1326		clusterEncryptionInfo := encryptionInfo[testEnv.Config().Cluster][0]
1327		if clusterEncryptionInfo.KMSKeyVersion != "" {
1328			t.Errorf("Encryption Info mismatch, got %v, want %v", clusterEncryptionInfo.KMSKeyVersion, 0)
1329		}
1330		if clusterEncryptionInfo.Type != GoogleDefaultEncryption {
1331			t.Errorf("Encryption Info mismatch, got %v, want %v", clusterEncryptionInfo.Type, GoogleDefaultEncryption)
1332		}
1333	}
1334
1335}
1336
1337func TestIntegration_TableIam(t *testing.T) {
1338	testEnv, err := NewIntegrationEnv()
1339	if err != nil {
1340		t.Fatalf("IntegrationEnv: %v", err)
1341	}
1342	defer testEnv.Close()
1343
1344	if !testEnv.Config().UseProd {
1345		t.Skip("emulator doesn't support IAM Policy creation")
1346	}
1347
1348	timeout := 5 * time.Minute
1349	ctx, _ := context.WithTimeout(context.Background(), timeout)
1350
1351	adminClient, err := testEnv.NewAdminClient()
1352	if err != nil {
1353		t.Fatalf("NewAdminClient: %v", err)
1354	}
1355	defer adminClient.Close()
1356
1357	defer deleteTable(ctx, t, adminClient, myTableName)
1358	if err := adminClient.CreateTable(ctx, myTableName); err != nil {
1359		t.Fatalf("Creating table: %v", err)
1360	}
1361
1362	// Verify that the IAM Controls work for Tables.
1363	iamHandle := adminClient.TableIAM(myTableName)
1364	p, err := iamHandle.Policy(ctx)
1365	if err != nil {
1366		t.Fatalf("Iam GetPolicy mytable: %v", err)
1367	}
1368	if err = iamHandle.SetPolicy(ctx, p); err != nil {
1369		t.Errorf("Iam SetPolicy mytable: %v", err)
1370	}
1371	if _, err = iamHandle.TestPermissions(ctx, []string{"bigtable.tables.get"}); err != nil {
1372		t.Errorf("Iam TestPermissions mytable: %v", err)
1373	}
1374}
1375
1376func TestIntegration_BackupIAM(t *testing.T) {
1377	testEnv, err := NewIntegrationEnv()
1378	if err != nil {
1379		t.Fatalf("IntegrationEnv: %v", err)
1380	}
1381	defer testEnv.Close()
1382
1383	if !testEnv.Config().UseProd {
1384		t.Skip("emulator doesn't support IAM Policy creation")
1385	}
1386	timeout := 5 * time.Minute
1387	ctx, _ := context.WithTimeout(context.Background(), timeout)
1388
1389	adminClient, err := testEnv.NewAdminClient()
1390	if err != nil {
1391		t.Fatalf("NewAdminClient: %v", err)
1392	}
1393	defer adminClient.Close()
1394
1395	table := testEnv.Config().Table
1396	cluster := testEnv.Config().Cluster
1397
1398	defer deleteTable(ctx, t, adminClient, table)
1399	if err := adminClient.CreateTable(ctx, table); err != nil {
1400		t.Fatalf("Creating table: %v", err)
1401	}
1402	// Create backup.
1403	backup := "backup"
1404	defer adminClient.DeleteBackup(ctx, cluster, backup)
1405	if err = adminClient.CreateBackup(ctx, table, cluster, backup, time.Now().Add(8*time.Hour)); err != nil {
1406		t.Fatalf("Creating backup: %v", err)
1407	}
1408	iamHandle := adminClient.BackupIAM(cluster, backup)
1409	// Get backup policy.
1410	p, err := iamHandle.Policy(ctx)
1411	if err != nil {
1412		t.Errorf("iamHandle.Policy: %v", err)
1413	}
1414	// The resource is new, so the policy should be empty.
1415	if got := p.Roles(); len(got) > 0 {
1416		t.Errorf("got roles %v, want none", got)
1417	}
1418	// Set backup policy.
1419	member := "domain:google.com"
1420	// Add a member, set the policy, then check that the member is present.
1421	p.Add(member, iam.Viewer)
1422	if err = iamHandle.SetPolicy(ctx, p); err != nil {
1423		t.Errorf("iamHandle.SetPolicy: %v", err)
1424	}
1425	p, err = iamHandle.Policy(ctx)
1426	if err != nil {
1427		t.Errorf("iamHandle.Policy: %v", err)
1428	}
1429	if got, want := p.Members(iam.Viewer), []string{member}; !testutil.Equal(got, want) {
1430		t.Errorf("iamHandle.Policy: got %v, want %v", got, want)
1431	}
1432	// Test backup permissions.
1433	permissions := []string{"bigtable.backups.get", "bigtable.backups.update"}
1434	_, err = iamHandle.TestPermissions(ctx, permissions)
1435	if err != nil {
1436		t.Errorf("iamHandle.TestPermissions: %v", err)
1437	}
1438}
1439
1440func TestIntegration_AdminCreateInstance(t *testing.T) {
1441	if instanceToCreate == "" {
1442		t.Skip("instanceToCreate not set, skipping instance creation testing")
1443	}
1444
1445	testEnv, err := NewIntegrationEnv()
1446	if err != nil {
1447		t.Fatalf("IntegrationEnv: %v", err)
1448	}
1449	defer testEnv.Close()
1450
1451	if !testEnv.Config().UseProd {
1452		t.Skip("emulator doesn't support instance creation")
1453	}
1454
1455	timeout := 5 * time.Minute
1456	ctx, _ := context.WithTimeout(context.Background(), timeout)
1457
1458	iAdminClient, err := testEnv.NewInstanceAdminClient()
1459	if err != nil {
1460		t.Fatalf("NewInstanceAdminClient: %v", err)
1461	}
1462	defer iAdminClient.Close()
1463
1464	clusterID := instanceToCreate + "-cluster"
1465
1466	// Create a development instance
1467	conf := &InstanceConf{
1468		InstanceId:   instanceToCreate,
1469		ClusterId:    clusterID,
1470		DisplayName:  "test instance",
1471		Zone:         instanceToCreateZone,
1472		InstanceType: DEVELOPMENT,
1473		Labels:       map[string]string{"test-label-key": "test-label-value"},
1474	}
1475	if err := iAdminClient.CreateInstance(ctx, conf); err != nil {
1476		t.Fatalf("CreateInstance: %v", err)
1477	}
1478
1479	defer iAdminClient.DeleteInstance(ctx, instanceToCreate)
1480
1481	iInfo, err := iAdminClient.InstanceInfo(ctx, instanceToCreate)
1482	if err != nil {
1483		t.Fatalf("InstanceInfo: %v", err)
1484	}
1485
1486	// Basic return values are tested elsewhere, check instance type
1487	if iInfo.InstanceType != DEVELOPMENT {
1488		t.Fatalf("Instance is not DEVELOPMENT: %v", iInfo.InstanceType)
1489	}
1490	if got, want := iInfo.Labels, conf.Labels; !cmp.Equal(got, want) {
1491		t.Fatalf("Labels: %v, want: %v", got, want)
1492	}
1493
1494	// Update everything we can about the instance in one call.
1495	confWithClusters := &InstanceWithClustersConfig{
1496		InstanceID:   instanceToCreate,
1497		DisplayName:  "new display name",
1498		InstanceType: PRODUCTION,
1499		Labels:       map[string]string{"new-label-key": "new-label-value"},
1500		Clusters: []ClusterConfig{
1501			{ClusterID: clusterID, NumNodes: 5},
1502		},
1503	}
1504
1505	if err = iAdminClient.UpdateInstanceWithClusters(ctx, confWithClusters); err != nil {
1506		t.Fatalf("UpdateInstanceWithClusters: %v", err)
1507	}
1508
1509	iInfo, err = iAdminClient.InstanceInfo(ctx, instanceToCreate)
1510	if err != nil {
1511		t.Fatalf("InstanceInfo: %v", err)
1512	}
1513
1514	if iInfo.InstanceType != PRODUCTION {
1515		t.Fatalf("Instance type is not PRODUCTION: %v", iInfo.InstanceType)
1516	}
1517	if got, want := iInfo.Labels, confWithClusters.Labels; !cmp.Equal(got, want) {
1518		t.Fatalf("Labels: %v, want: %v", got, want)
1519	}
1520	if got, want := iInfo.DisplayName, confWithClusters.DisplayName; got != want {
1521		t.Fatalf("Display name: %q, want: %q", got, want)
1522	}
1523
1524	cInfo, err := iAdminClient.GetCluster(ctx, instanceToCreate, clusterID)
1525	if err != nil {
1526		t.Fatalf("GetCluster: %v", err)
1527	}
1528
1529	if cInfo.ServeNodes != 5 {
1530		t.Fatalf("NumNodes: %v, want: %v", cInfo.ServeNodes, 5)
1531	}
1532
1533	if cInfo.KMSKeyName != "" {
1534		t.Fatalf("KMSKeyName: %v, want: %v", cInfo.KMSKeyName, "")
1535	}
1536}
1537
1538func TestIntegration_AdminEncryptionInfo(t *testing.T) {
1539	t.Skip("https://github.com/googleapis/google-cloud-go/issues/4173")
1540	if instanceToCreate == "" {
1541		t.Skip("instanceToCreate not set, skipping instance creation testing")
1542	}
1543
1544	testEnv, err := NewIntegrationEnv()
1545	if err != nil {
1546		t.Fatalf("IntegrationEnv: %v", err)
1547	}
1548	defer testEnv.Close()
1549
1550	if !testEnv.Config().UseProd {
1551		t.Skip("emulator doesn't support instance creation")
1552	}
1553
1554	// adjust test environment to use our cluster to create
1555	c := testEnv.Config()
1556	c.Instance = instanceToCreate
1557	testEnv, err = NewProdEnv(c)
1558	if err != nil {
1559		t.Fatalf("NewProdEnv: %v", err)
1560	}
1561
1562	timeout := 5 * time.Minute
1563	ctx, cancel := context.WithTimeout(context.Background(), timeout)
1564	defer cancel()
1565
1566	iAdminClient, err := testEnv.NewInstanceAdminClient()
1567	if err != nil {
1568		t.Fatalf("NewInstanceAdminClient: %v", err)
1569	}
1570	defer iAdminClient.Close()
1571
1572	adminClient, err := testEnv.NewAdminClient()
1573	if err != nil {
1574		t.Fatalf("NewAdminClient: %v", err)
1575	}
1576	defer adminClient.Close()
1577
1578	table := instanceToCreate + "-table"
1579	clusterID := instanceToCreate + "-cluster"
1580
1581	keyRingName := os.Getenv("GCLOUD_TESTS_BIGTABLE_KEYRING")
1582	if keyRingName == "" {
1583		// try to fall back on GOLANG keyring
1584		keyRingName = os.Getenv("GCLOUD_TESTS_GOLANG_KEYRING")
1585		if keyRingName == "" {
1586			t.Fatal("GCLOUD_TESTS_BIGTABLE_KEYRING or GCLOUD_TESTS_GOLANG_KEYRING must be set. See CONTRIBUTING.md for details")
1587		}
1588	}
1589	kmsKeyName := keyRingName + "/cryptoKeys/key1"
1590
1591	conf := &InstanceWithClustersConfig{
1592		InstanceID:  instanceToCreate,
1593		DisplayName: "test instance",
1594		Clusters: []ClusterConfig{
1595			{
1596				ClusterID:  clusterID,
1597				KMSKeyName: kmsKeyName,
1598				Zone:       instanceToCreateZone,
1599				NumNodes:   1,
1600			},
1601		},
1602	}
1603	if err := iAdminClient.CreateInstanceWithClusters(ctx, conf); err != nil {
1604		t.Fatalf("CreateInstance: %v", err)
1605	}
1606	defer iAdminClient.DeleteInstance(ctx, instanceToCreate)
1607
1608	// Delete the table at the end of the test. Schedule ahead of time
1609	// in case the client fails
1610	defer deleteTable(ctx, t, adminClient, table)
1611	if err := adminClient.CreateTable(ctx, table); err != nil {
1612		t.Fatalf("Creating table: %v", err)
1613	}
1614
1615	encryptionKeyVersion := kmsKeyName + "/cryptoKeyVersions/1"
1616
1617	// The encryption info can take 30-300s (currently about 120-190s) to
1618	// become ready.
1619	for i := 0; i < 30; i++ {
1620		encryptionInfo, err := adminClient.EncryptionInfo(ctx, table)
1621		if err != nil {
1622			t.Fatalf("EncryptionInfo: %v", err)
1623		}
1624
1625		kmsKeyVersion := encryptionInfo[clusterID][0].KMSKeyVersion
1626		if kmsKeyVersion != "" {
1627			break
1628		}
1629
1630		time.Sleep(time.Second * 10)
1631	}
1632
1633	// Validate Encryption Info under getTable
1634	table2, err := adminClient.getTable(ctx, table, btapb.Table_ENCRYPTION_VIEW)
1635	if err != nil {
1636		t.Fatalf("Getting Table: %v", err)
1637	}
1638	if got, want := len(table2.ClusterStates), 1; !cmp.Equal(got, want) {
1639		t.Fatalf("Table Cluster States %v, want: %v", got, want)
1640	}
1641	clusterState := table2.ClusterStates[clusterID]
1642	if got, want := len(clusterState.EncryptionInfo), 1; !cmp.Equal(got, want) {
1643		t.Fatalf("Table Encryption Info Length: %v, want: %v", got, want)
1644	}
1645	tableEncInfo := clusterState.EncryptionInfo[0]
1646	if got, want := int(tableEncInfo.EncryptionStatus.Code), 0; !cmp.Equal(got, want) {
1647		t.Fatalf("EncryptionStatus: %v, want: %v", got, want)
1648	}
1649	// NOTE: this EncryptionType is btapb.EncryptionInfo_EncryptionType
1650	if got, want := tableEncInfo.EncryptionType, btapb.EncryptionInfo_CUSTOMER_MANAGED_ENCRYPTION; !cmp.Equal(got, want) {
1651		t.Fatalf("EncryptionType: %v, want: %v", got, want)
1652	}
1653	if got, want := tableEncInfo.KmsKeyVersion, encryptionKeyVersion; !cmp.Equal(got, want) {
1654		t.Fatalf("KMS Key Version: %v, want: %v", got, want)
1655	}
1656
1657	// Validate Encyrption Info retrieved via EncryptionInfo
1658	encryptionInfo, err := adminClient.EncryptionInfo(ctx, table)
1659	if err != nil {
1660		t.Fatalf("EncryptionInfo: %v", err)
1661	}
1662	if got, want := len(encryptionInfo), 1; !cmp.Equal(got, want) {
1663		t.Fatalf("Number of Clusters with Encryption Info: %v, want: %v", got, want)
1664	}
1665	encryptionInfos := encryptionInfo[clusterID]
1666	if got, want := len(encryptionInfos), 1; !cmp.Equal(got, want) {
1667		t.Fatalf("Encryption Info Length: %v, want: %v", got, want)
1668	}
1669	if len(encryptionInfos) != 1 {
1670		t.Fatalf("Expected Single EncryptionInfo")
1671	}
1672	v := encryptionInfos[0]
1673	if got, want := int(v.Status.Code), 0; !cmp.Equal(got, want) {
1674		t.Fatalf("EncryptionStatus: %v, want: %v", got, want)
1675	}
1676	// NOTE: this EncryptionType is EncryptionType
1677	if got, want := v.Type, CustomerManagedEncryption; !cmp.Equal(got, want) {
1678		t.Fatalf("EncryptionType: %v, want: %v", got, want)
1679	}
1680	if got, want := v.KMSKeyVersion, encryptionKeyVersion; !cmp.Equal(got, want) {
1681		t.Fatalf("KMS Key Version: %v, want: %v", got, want)
1682	}
1683
1684	// Validate CMEK on Cluster Info
1685	cInfo, err := iAdminClient.GetCluster(ctx, instanceToCreate, clusterID)
1686	if err != nil {
1687		t.Fatalf("GetCluster: %v", err)
1688	}
1689
1690	if got, want := cInfo.KMSKeyName, kmsKeyName; !cmp.Equal(got, want) {
1691		t.Fatalf("KMSKeyName: %v, want: %v", got, want)
1692	}
1693
1694	// Create a backup with CMEK enabled, verify backup encryption info
1695	backupName := "backupCMEK"
1696	defer adminClient.DeleteBackup(ctx, clusterID, backupName)
1697	if err = adminClient.CreateBackup(ctx, table, clusterID, backupName, time.Now().Add(8*time.Hour)); err != nil {
1698		t.Fatalf("Creating backup: %v", err)
1699	}
1700	backup, err := adminClient.BackupInfo(ctx, clusterID, backupName)
1701	if err != nil {
1702		t.Fatalf("BackupInfo: %v", backup)
1703	}
1704
1705	if got, want := backup.EncryptionInfo.Type, CustomerManagedEncryption; !cmp.Equal(got, want) {
1706		t.Fatalf("Backup Encryption EncryptionType: %v, want: %v", got, want)
1707	}
1708	if got, want := backup.EncryptionInfo.KMSKeyVersion, encryptionKeyVersion; !cmp.Equal(got, want) {
1709		t.Fatalf("Backup Encryption KMSKeyVersion: %v, want: %v", got, want)
1710	}
1711	if got, want := int(backup.EncryptionInfo.Status.Code), 2; !cmp.Equal(got, want) {
1712		t.Fatalf("Backup EncryptionStatus: %v, want: %v", got, want)
1713	}
1714}
1715
1716func TestIntegration_AdminUpdateInstanceLabels(t *testing.T) {
1717	// Check the environments
1718	if instanceToCreate == "" {
1719		t.Skip("instanceToCreate not set, skipping instance creation testing")
1720	}
1721	testEnv, err := NewIntegrationEnv()
1722	if err != nil {
1723		t.Fatalf("IntegrationEnv: %v", err)
1724	}
1725	defer testEnv.Close()
1726	if !testEnv.Config().UseProd {
1727		t.Skip("emulator doesn't support instance creation")
1728	}
1729
1730	// Create an instance admin client
1731	timeout := 5 * time.Minute
1732	ctx, _ := context.WithTimeout(context.Background(), timeout)
1733	iAdminClient, err := testEnv.NewInstanceAdminClient()
1734	if err != nil {
1735		t.Fatalf("NewInstanceAdminClient: %v", err)
1736	}
1737	defer iAdminClient.Close()
1738
1739	// Create a test instance
1740	conf := &InstanceConf{
1741		InstanceId:   instanceToCreate,
1742		ClusterId:    instanceToCreate + "-cluster",
1743		DisplayName:  "test instance",
1744		InstanceType: DEVELOPMENT,
1745		Zone:         instanceToCreateZone,
1746	}
1747	if err := iAdminClient.CreateInstance(ctx, conf); err != nil {
1748		t.Fatalf("CreateInstance: %v", err)
1749	}
1750	defer iAdminClient.DeleteInstance(ctx, instanceToCreate)
1751
1752	// Check the created test instances
1753	iInfo, err := iAdminClient.InstanceInfo(ctx, instanceToCreate)
1754	if err != nil {
1755		t.Fatalf("InstanceInfo: %v", err)
1756	}
1757	if got, want := iInfo.Labels, conf.Labels; !cmp.Equal(got, want) {
1758		t.Fatalf("Labels: %v, want: %v", got, want)
1759	}
1760
1761	// Test patterns to update instance labels
1762	tests := []struct {
1763		name string
1764		in   map[string]string
1765		out  map[string]string
1766	}{
1767		{
1768			name: "update labels",
1769			in:   map[string]string{"test-label-key": "test-label-value"},
1770			out:  map[string]string{"test-label-key": "test-label-value"},
1771		},
1772		{
1773			name: "update multiple labels",
1774			in:   map[string]string{"update-label-key-a": "update-label-value-a", "update-label-key-b": "update-label-value-b"},
1775			out:  map[string]string{"update-label-key-a": "update-label-value-a", "update-label-key-b": "update-label-value-b"},
1776		},
1777		{
1778			name: "not update existing labels",
1779			in:   nil, // nil map
1780			out:  map[string]string{"update-label-key-a": "update-label-value-a", "update-label-key-b": "update-label-value-b"},
1781		},
1782		{
1783			name: "delete labels",
1784			in:   map[string]string{}, // empty map
1785			out:  nil,
1786		},
1787	}
1788	for _, tt := range tests {
1789		t.Run(tt.name, func(t *testing.T) {
1790			confWithClusters := &InstanceWithClustersConfig{
1791				InstanceID: instanceToCreate,
1792				Labels:     tt.in,
1793			}
1794			if err := iAdminClient.UpdateInstanceWithClusters(ctx, confWithClusters); err != nil {
1795				t.Fatalf("UpdateInstanceWithClusters: %v", err)
1796			}
1797			iInfo, err := iAdminClient.InstanceInfo(ctx, instanceToCreate)
1798			if err != nil {
1799				t.Fatalf("InstanceInfo: %v", err)
1800			}
1801			if got, want := iInfo.Labels, tt.out; !cmp.Equal(got, want) {
1802				t.Fatalf("Labels: %v, want: %v", got, want)
1803			}
1804		})
1805	}
1806}
1807
1808func TestIntegration_AdminUpdateInstanceAndSyncClusters(t *testing.T) {
1809	if instanceToCreate == "" {
1810		t.Skip("instanceToCreate not set, skipping instance update testing")
1811	}
1812
1813	testEnv, err := NewIntegrationEnv()
1814	if err != nil {
1815		t.Fatalf("IntegrationEnv: %v", err)
1816	}
1817	defer testEnv.Close()
1818
1819	if !testEnv.Config().UseProd {
1820		t.Skip("emulator doesn't support instance creation")
1821	}
1822
1823	timeout := 5 * time.Minute
1824	ctx, _ := context.WithTimeout(context.Background(), timeout)
1825
1826	iAdminClient, err := testEnv.NewInstanceAdminClient()
1827	if err != nil {
1828		t.Fatalf("NewInstanceAdminClient: %v", err)
1829	}
1830	defer iAdminClient.Close()
1831
1832	clusterID := instanceToCreate + "-cluster"
1833
1834	// Create a development instance
1835	conf := &InstanceConf{
1836		InstanceId:   instanceToCreate,
1837		ClusterId:    clusterID,
1838		DisplayName:  "test instance",
1839		Zone:         instanceToCreateZone,
1840		InstanceType: DEVELOPMENT,
1841		Labels:       map[string]string{"test-label-key": "test-label-value"},
1842	}
1843	if err := iAdminClient.CreateInstance(ctx, conf); err != nil {
1844		t.Fatalf("CreateInstance: %v", err)
1845	}
1846	defer iAdminClient.DeleteInstance(ctx, instanceToCreate)
1847
1848	iInfo, err := iAdminClient.InstanceInfo(ctx, instanceToCreate)
1849	if err != nil {
1850		t.Fatalf("InstanceInfo: %v", err)
1851	}
1852
1853	// Basic return values are tested elsewhere, check instance type
1854	if iInfo.InstanceType != DEVELOPMENT {
1855		t.Fatalf("Instance is not DEVELOPMENT: %v", iInfo.InstanceType)
1856	}
1857	if got, want := iInfo.Labels, conf.Labels; !cmp.Equal(got, want) {
1858		t.Fatalf("Labels: %v, want: %v", got, want)
1859	}
1860
1861	// Update everything we can about the instance in one call.
1862	confWithClusters := &InstanceWithClustersConfig{
1863		InstanceID:   instanceToCreate,
1864		DisplayName:  "new display name",
1865		InstanceType: PRODUCTION,
1866		Labels:       map[string]string{"new-label-key": "new-label-value"},
1867		Clusters: []ClusterConfig{
1868			{ClusterID: clusterID, NumNodes: 5},
1869		},
1870	}
1871
1872	results, err := UpdateInstanceAndSyncClusters(ctx, iAdminClient, confWithClusters)
1873	if err != nil {
1874		t.Fatalf("UpdateInstanceAndSyncClusters: %v", err)
1875	}
1876
1877	wantResults := UpdateInstanceResults{
1878		InstanceUpdated: true,
1879		UpdatedClusters: []string{clusterID},
1880	}
1881	if diff := testutil.Diff(*results, wantResults); diff != "" {
1882		t.Fatalf("UpdateInstanceResults: got - want +\n%s", diff)
1883	}
1884
1885	iInfo, err = iAdminClient.InstanceInfo(ctx, instanceToCreate)
1886	if err != nil {
1887		t.Fatalf("InstanceInfo: %v", err)
1888	}
1889
1890	if iInfo.InstanceType != PRODUCTION {
1891		t.Fatalf("Instance type is not PRODUCTION: %v", iInfo.InstanceType)
1892	}
1893	if got, want := iInfo.Labels, confWithClusters.Labels; !cmp.Equal(got, want) {
1894		t.Fatalf("Labels: %v, want: %v", got, want)
1895	}
1896	if got, want := iInfo.DisplayName, confWithClusters.DisplayName; got != want {
1897		t.Fatalf("Display name: %q, want: %q", got, want)
1898	}
1899
1900	cInfo, err := iAdminClient.GetCluster(ctx, instanceToCreate, clusterID)
1901	if err != nil {
1902		t.Fatalf("GetCluster: %v", err)
1903	}
1904
1905	if cInfo.ServeNodes != 5 {
1906		t.Fatalf("NumNodes: %v, want: %v", cInfo.ServeNodes, 5)
1907	}
1908
1909	// Now add a second cluster as the only change. The first cluster must also be provided so
1910	// it is not removed.
1911	clusterID2 := clusterID + "-2"
1912	confWithClusters = &InstanceWithClustersConfig{
1913		InstanceID: instanceToCreate,
1914		Clusters: []ClusterConfig{
1915			{ClusterID: clusterID},
1916			{ClusterID: clusterID2, NumNodes: 3, StorageType: SSD, Zone: instanceToCreateZone2},
1917		},
1918	}
1919
1920	results, err = UpdateInstanceAndSyncClusters(ctx, iAdminClient, confWithClusters)
1921	if err != nil {
1922		t.Fatalf("UpdateInstanceAndSyncClusters: %v %v", confWithClusters, err)
1923	}
1924
1925	wantResults = UpdateInstanceResults{
1926		InstanceUpdated: false,
1927		CreatedClusters: []string{clusterID2},
1928	}
1929	if diff := testutil.Diff(*results, wantResults); diff != "" {
1930		t.Fatalf("UpdateInstanceResults: got - want +\n%s", diff)
1931	}
1932
1933	// Now update one cluster and delete the other
1934	confWithClusters = &InstanceWithClustersConfig{
1935		InstanceID: instanceToCreate,
1936		Clusters: []ClusterConfig{
1937			{ClusterID: clusterID, NumNodes: 4},
1938		},
1939	}
1940
1941	results, err = UpdateInstanceAndSyncClusters(ctx, iAdminClient, confWithClusters)
1942	if err != nil {
1943		t.Fatalf("UpdateInstanceAndSyncClusters: %v %v", confWithClusters, err)
1944	}
1945
1946	wantResults = UpdateInstanceResults{
1947		InstanceUpdated: false,
1948		UpdatedClusters: []string{clusterID},
1949		DeletedClusters: []string{clusterID2},
1950	}
1951
1952	if diff := testutil.Diff(*results, wantResults); diff != "" {
1953		t.Fatalf("UpdateInstanceResults: got - want +\n%s", diff)
1954	}
1955
1956	// Make sure the instance looks as we would expect
1957	clusters, err := iAdminClient.Clusters(ctx, conf.InstanceId)
1958	if err != nil {
1959		t.Fatalf("Clusters: %v", err)
1960	}
1961
1962	if len(clusters) != 1 {
1963		t.Fatalf("Clusters length %v, want: 1", len(clusters))
1964	}
1965
1966	wantCluster := &ClusterInfo{
1967		Name:       clusterID,
1968		Zone:       instanceToCreateZone,
1969		ServeNodes: 4,
1970		State:      "READY",
1971	}
1972	if diff := testutil.Diff(clusters[0], wantCluster); diff != "" {
1973		t.Fatalf("InstanceEquality: got - want +\n%s", diff)
1974	}
1975}
1976
1977// instanceAdminClientMock is used to test FailedLocations field processing.
1978type instanceAdminClientMock struct {
1979	Clusters             []*btapb.Cluster
1980	UnavailableLocations []string
1981}
1982
1983func (iacm *instanceAdminClientMock) ListClusters(ctx context.Context, req *btapb.ListClustersRequest, opts ...grpc.CallOption) (*btapb.ListClustersResponse, error) {
1984	res := btapb.ListClustersResponse{
1985		Clusters:        iacm.Clusters,
1986		FailedLocations: iacm.UnavailableLocations,
1987	}
1988	return &res, nil
1989}
1990
1991func (iacm *instanceAdminClientMock) CreateInstance(ctx context.Context, in *btapb.CreateInstanceRequest, opts ...grpc.CallOption) (*longrunning.Operation, error) {
1992	return nil, nil
1993}
1994
1995func (iacm *instanceAdminClientMock) GetInstance(ctx context.Context, in *btapb.GetInstanceRequest, opts ...grpc.CallOption) (*btapb.Instance, error) {
1996	return nil, nil
1997}
1998
1999func (iacm *instanceAdminClientMock) ListInstances(ctx context.Context, in *btapb.ListInstancesRequest, opts ...grpc.CallOption) (*btapb.ListInstancesResponse, error) {
2000	return nil, nil
2001}
2002
2003func (iacm *instanceAdminClientMock) UpdateInstance(ctx context.Context, in *btapb.Instance, opts ...grpc.CallOption) (*btapb.Instance, error) {
2004	return nil, nil
2005}
2006
2007func (iacm *instanceAdminClientMock) PartialUpdateInstance(ctx context.Context, in *btapb.PartialUpdateInstanceRequest, opts ...grpc.CallOption) (*longrunning.Operation, error) {
2008	return nil, nil
2009}
2010
2011func (iacm *instanceAdminClientMock) DeleteInstance(ctx context.Context, in *btapb.DeleteInstanceRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
2012	return nil, nil
2013}
2014
2015func (iacm *instanceAdminClientMock) CreateCluster(ctx context.Context, in *btapb.CreateClusterRequest, opts ...grpc.CallOption) (*longrunning.Operation, error) {
2016	return nil, nil
2017}
2018
2019func (iacm *instanceAdminClientMock) GetCluster(ctx context.Context, in *btapb.GetClusterRequest, opts ...grpc.CallOption) (*btapb.Cluster, error) {
2020	return nil, nil
2021}
2022
2023func (iacm *instanceAdminClientMock) UpdateCluster(ctx context.Context, in *btapb.Cluster, opts ...grpc.CallOption) (*longrunning.Operation, error) {
2024	return nil, nil
2025}
2026
2027func (iacm *instanceAdminClientMock) DeleteCluster(ctx context.Context, in *btapb.DeleteClusterRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
2028	return nil, nil
2029}
2030
2031func (iacm *instanceAdminClientMock) CreateAppProfile(ctx context.Context, in *btapb.CreateAppProfileRequest, opts ...grpc.CallOption) (*btapb.AppProfile, error) {
2032	return nil, nil
2033}
2034
2035func (iacm *instanceAdminClientMock) GetAppProfile(ctx context.Context, in *btapb.GetAppProfileRequest, opts ...grpc.CallOption) (*btapb.AppProfile, error) {
2036	return nil, nil
2037}
2038
2039func (iacm *instanceAdminClientMock) ListAppProfiles(ctx context.Context, in *btapb.ListAppProfilesRequest, opts ...grpc.CallOption) (*btapb.ListAppProfilesResponse, error) {
2040	return nil, nil
2041}
2042
2043func (iacm *instanceAdminClientMock) UpdateAppProfile(ctx context.Context, in *btapb.UpdateAppProfileRequest, opts ...grpc.CallOption) (*longrunning.Operation, error) {
2044	return nil, nil
2045}
2046
2047func (iacm *instanceAdminClientMock) DeleteAppProfile(ctx context.Context, in *btapb.DeleteAppProfileRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
2048	return nil, nil
2049}
2050
2051func (iacm *instanceAdminClientMock) GetIamPolicy(ctx context.Context, in *v1.GetIamPolicyRequest, opts ...grpc.CallOption) (*v1.Policy, error) {
2052	return nil, nil
2053}
2054
2055func (iacm *instanceAdminClientMock) SetIamPolicy(ctx context.Context, in *v1.SetIamPolicyRequest, opts ...grpc.CallOption) (*v1.Policy, error) {
2056	return nil, nil
2057}
2058
2059func (iacm *instanceAdminClientMock) TestIamPermissions(ctx context.Context, in *v1.TestIamPermissionsRequest, opts ...grpc.CallOption) (*v1.TestIamPermissionsResponse, error) {
2060	return nil, nil
2061}
2062
2063func TestIntegration_InstanceAdminClient_Clusters_WithFailedLocations(t *testing.T) {
2064	testEnv, err := NewIntegrationEnv()
2065	if err != nil {
2066		t.Fatalf("IntegrationEnv: %v", err)
2067	}
2068	defer testEnv.Close()
2069
2070	if !testEnv.Config().UseProd {
2071		t.Skip("emulator doesn't support snapshots")
2072	}
2073
2074	iAdminClient, err := testEnv.NewInstanceAdminClient()
2075	if err != nil {
2076		t.Fatalf("NewInstanceAdminClient: %v", err)
2077	}
2078	defer iAdminClient.Close()
2079
2080	cluster1 := btapb.Cluster{Name: "cluster1"}
2081	failedLoc := "euro1"
2082
2083	iAdminClient.iClient = &instanceAdminClientMock{
2084		Clusters:             []*btapb.Cluster{&cluster1},
2085		UnavailableLocations: []string{failedLoc},
2086	}
2087
2088	cis, err := iAdminClient.Clusters(context.Background(), "instance-id")
2089	convertedErr, ok := err.(ErrPartiallyUnavailable)
2090	if !ok {
2091		t.Fatalf("want error ErrPartiallyUnavailable, got other")
2092	}
2093	if got, want := len(convertedErr.Locations), 1; got != want {
2094		t.Fatalf("want %v failed locations, got %v", want, got)
2095	}
2096	if got, want := convertedErr.Locations[0], failedLoc; got != want {
2097		t.Fatalf("want failed location %v, got %v", want, got)
2098	}
2099	if got, want := len(cis), 1; got != want {
2100		t.Fatalf("want %v failed locations, got %v", want, got)
2101	}
2102	if got, want := cis[0].Name, cluster1.Name; got != want {
2103		t.Fatalf("want cluster %v, got %v", want, got)
2104	}
2105}
2106
2107func TestIntegration_Granularity(t *testing.T) {
2108	testEnv, err := NewIntegrationEnv()
2109	if err != nil {
2110		t.Fatalf("IntegrationEnv: %v", err)
2111	}
2112	defer testEnv.Close()
2113
2114	timeout := 2 * time.Second
2115	if testEnv.Config().UseProd {
2116		timeout = 5 * time.Minute
2117	}
2118	ctx, _ := context.WithTimeout(context.Background(), timeout)
2119
2120	adminClient, err := testEnv.NewAdminClient()
2121	if err != nil {
2122		t.Fatalf("NewAdminClient: %v", err)
2123	}
2124	defer adminClient.Close()
2125
2126	list := func() []string {
2127		tbls, err := adminClient.Tables(ctx)
2128		if err != nil {
2129			t.Fatalf("Fetching list of tables: %v", err)
2130		}
2131		sort.Strings(tbls)
2132		return tbls
2133	}
2134	containsAll := func(got, want []string) bool {
2135		gotSet := make(map[string]bool)
2136
2137		for _, s := range got {
2138			gotSet[s] = true
2139		}
2140		for _, s := range want {
2141			if !gotSet[s] {
2142				return false
2143			}
2144		}
2145		return true
2146	}
2147
2148	defer deleteTable(ctx, t, adminClient, myTableName)
2149
2150	if err := adminClient.CreateTable(ctx, myTableName); err != nil {
2151		t.Fatalf("Creating table: %v", err)
2152	}
2153
2154	tables := list()
2155	if got, want := tables, []string{myTableName}; !containsAll(got, want) {
2156		t.Errorf("adminClient.Tables returned %#v, want %#v", got, want)
2157	}
2158
2159	// calling ModifyColumnFamilies to check the granularity of table
2160	prefix := adminClient.instancePrefix()
2161	req := &btapb.ModifyColumnFamiliesRequest{
2162		Name: prefix + "/tables/" + myTableName,
2163		Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{
2164			Id:  "cf",
2165			Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Create{&btapb.ColumnFamily{}},
2166		}},
2167	}
2168	table, err := adminClient.tClient.ModifyColumnFamilies(ctx, req)
2169	if err != nil {
2170		t.Fatalf("Creating column family: %v", err)
2171	}
2172	if table.Granularity != btapb.Table_TimestampGranularity(btapb.Table_MILLIS) {
2173		t.Errorf("ModifyColumnFamilies returned granularity %#v, want %#v", table.Granularity, btapb.Table_TimestampGranularity(btapb.Table_MILLIS))
2174	}
2175}
2176
2177func TestIntegration_InstanceAdminClient_AppProfile(t *testing.T) {
2178	testEnv, err := NewIntegrationEnv()
2179	if err != nil {
2180		t.Fatalf("IntegrationEnv: %v", err)
2181	}
2182	defer testEnv.Close()
2183
2184	timeout := 2 * time.Second
2185	if testEnv.Config().UseProd {
2186		timeout = 5 * time.Minute
2187	}
2188	ctx, cancel := context.WithTimeout(context.Background(), timeout)
2189	defer cancel()
2190
2191	adminClient, err := testEnv.NewAdminClient()
2192	if err != nil {
2193		t.Fatalf("NewAdminClient: %v", err)
2194	}
2195	defer adminClient.Close()
2196
2197	iAdminClient, err := testEnv.NewInstanceAdminClient()
2198	if err != nil {
2199		t.Fatalf("NewInstanceAdminClient: %v", err)
2200	}
2201
2202	if iAdminClient == nil {
2203		return
2204	}
2205
2206	err = iAdminClient.DeleteAppProfile(ctx, adminClient.instance, "app_profile1")
2207
2208	defer iAdminClient.Close()
2209	profile := ProfileConf{
2210		ProfileID:     "app_profile1",
2211		InstanceID:    adminClient.instance,
2212		ClusterID:     testEnv.Config().Cluster,
2213		Description:   "creating new app profile 1",
2214		RoutingPolicy: SingleClusterRouting,
2215	}
2216
2217	createdProfile, err := iAdminClient.CreateAppProfile(ctx, profile)
2218	if err != nil {
2219		t.Fatalf("Creating app profile: %v", err)
2220	}
2221
2222	gotProfile, err := iAdminClient.GetAppProfile(ctx, adminClient.instance, "app_profile1")
2223	if err != nil {
2224		t.Fatalf("Get app profile: %v", err)
2225	}
2226
2227	if !proto.Equal(createdProfile, gotProfile) {
2228		t.Fatalf("created profile: %s, got profile: %s", createdProfile.Name, gotProfile.Name)
2229	}
2230
2231	list := func(instanceID string) ([]*btapb.AppProfile, error) {
2232		profiles := []*btapb.AppProfile(nil)
2233
2234		it := iAdminClient.ListAppProfiles(ctx, instanceID)
2235		for {
2236			s, err := it.Next()
2237			if err == iterator.Done {
2238				break
2239			}
2240			if err != nil {
2241				return nil, err
2242			}
2243			profiles = append(profiles, s)
2244		}
2245		return profiles, err
2246	}
2247
2248	profiles, err := list(adminClient.instance)
2249	if err != nil {
2250		t.Fatalf("List app profile: %v", err)
2251	}
2252
2253	// App Profile list should contain default, app_profile1
2254	if got, want := len(profiles), 2; got != want {
2255		t.Fatalf("Initial app profile list len: %d, want: %d", got, want)
2256	}
2257
2258	for _, test := range []struct {
2259		desc   string
2260		uattrs ProfileAttrsToUpdate
2261		want   *btapb.AppProfile // nil means error
2262	}{
2263		{
2264			desc:   "empty update",
2265			uattrs: ProfileAttrsToUpdate{},
2266			want:   nil,
2267		},
2268
2269		{
2270			desc:   "empty description update",
2271			uattrs: ProfileAttrsToUpdate{Description: ""},
2272			want: &btapb.AppProfile{
2273				Name:          gotProfile.Name,
2274				Description:   "",
2275				RoutingPolicy: gotProfile.RoutingPolicy,
2276				Etag:          gotProfile.Etag,
2277			},
2278		},
2279		{
2280			desc: "routing update",
2281			uattrs: ProfileAttrsToUpdate{
2282				RoutingPolicy: SingleClusterRouting,
2283				ClusterID:     testEnv.Config().Cluster,
2284			},
2285			want: &btapb.AppProfile{
2286				Name:        gotProfile.Name,
2287				Description: "",
2288				Etag:        gotProfile.Etag,
2289				RoutingPolicy: &btapb.AppProfile_SingleClusterRouting_{
2290					SingleClusterRouting: &btapb.AppProfile_SingleClusterRouting{
2291						ClusterId: testEnv.Config().Cluster,
2292					},
2293				},
2294			},
2295		},
2296	} {
2297		err = iAdminClient.UpdateAppProfile(ctx, adminClient.instance, "app_profile1", test.uattrs)
2298		if err != nil {
2299			if test.want != nil {
2300				t.Errorf("%s: %v", test.desc, err)
2301			}
2302			continue
2303		}
2304		if err == nil && test.want == nil {
2305			t.Errorf("%s: got nil, want error", test.desc)
2306			continue
2307		}
2308
2309		got, _ := iAdminClient.GetAppProfile(ctx, adminClient.instance, "app_profile1")
2310
2311		if !proto.Equal(got, test.want) {
2312			t.Fatalf("%s : got profile : %v, want profile: %v", test.desc, gotProfile, test.want)
2313		}
2314
2315	}
2316
2317	err = iAdminClient.DeleteAppProfile(ctx, adminClient.instance, "app_profile1")
2318	if err != nil {
2319		t.Fatalf("Delete app profile: %v", err)
2320	}
2321}
2322
2323func TestIntegration_InstanceUpdate(t *testing.T) {
2324	testEnv, err := NewIntegrationEnv()
2325	if err != nil {
2326		t.Fatalf("IntegrationEnv: %v", err)
2327	}
2328	defer testEnv.Close()
2329
2330	timeout := 2 * time.Second
2331	if testEnv.Config().UseProd {
2332		timeout = 5 * time.Minute
2333	}
2334	ctx, cancel := context.WithTimeout(context.Background(), timeout)
2335	defer cancel()
2336
2337	adminClient, err := testEnv.NewAdminClient()
2338	if err != nil {
2339		t.Fatalf("NewAdminClient: %v", err)
2340	}
2341
2342	defer adminClient.Close()
2343
2344	iAdminClient, err := testEnv.NewInstanceAdminClient()
2345	if err != nil {
2346		t.Fatalf("NewInstanceAdminClient: %v", err)
2347	}
2348
2349	if iAdminClient == nil {
2350		return
2351	}
2352
2353	defer iAdminClient.Close()
2354
2355	iInfo, err := iAdminClient.InstanceInfo(ctx, adminClient.instance)
2356	if err != nil {
2357		t.Errorf("InstanceInfo: %v", err)
2358	}
2359	if iInfo.Name != adminClient.instance {
2360		t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance)
2361	}
2362
2363	if iInfo.DisplayName != adminClient.instance {
2364		t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.DisplayName, adminClient.instance)
2365	}
2366
2367	const numNodes = 4
2368	// update cluster nodes
2369	if err := iAdminClient.UpdateCluster(ctx, adminClient.instance, testEnv.Config().Cluster, int32(numNodes)); err != nil {
2370		t.Errorf("UpdateCluster: %v", err)
2371	}
2372
2373	// get cluster after updating
2374	cis, err := iAdminClient.GetCluster(ctx, adminClient.instance, testEnv.Config().Cluster)
2375	if err != nil {
2376		t.Errorf("GetCluster %v", err)
2377	}
2378	if cis.ServeNodes != int(numNodes) {
2379		t.Errorf("ServeNodes returned %d, want %d", cis.ServeNodes, int(numNodes))
2380	}
2381}
2382
2383func TestIntegration_AdminBackup(t *testing.T) {
2384	testEnv, err := NewIntegrationEnv()
2385	if err != nil {
2386		t.Fatalf("IntegrationEnv: %v", err)
2387	}
2388	defer testEnv.Close()
2389
2390	if !testEnv.Config().UseProd {
2391		t.Skip("emulator doesn't support backups")
2392	}
2393
2394	timeout := 5 * time.Minute
2395	ctx, _ := context.WithTimeout(context.Background(), timeout)
2396
2397	adminClient, err := testEnv.NewAdminClient()
2398	if err != nil {
2399		t.Fatalf("NewAdminClient: %v", err)
2400	}
2401	defer adminClient.Close()
2402
2403	tblConf := TableConf{
2404		TableID: testEnv.Config().Table,
2405		Families: map[string]GCPolicy{
2406			"fam1": MaxVersionsPolicy(1),
2407			"fam2": MaxVersionsPolicy(2),
2408		},
2409	}
2410	if err := adminClient.CreateTableFromConf(ctx, &tblConf); err != nil {
2411		t.Fatalf("Creating table from TableConf: %v", err)
2412	}
2413	// Delete the table at the end of the test. Schedule ahead of time
2414	// in case the client fails
2415	defer deleteTable(ctx, t, adminClient, tblConf.TableID)
2416
2417	sourceInstance := testEnv.Config().Instance
2418	sourceCluster := testEnv.Config().Cluster
2419
2420	iAdminClient, err := testEnv.NewInstanceAdminClient()
2421	if err != nil {
2422		t.Fatalf("NewInstanceAdminClient: %v", err)
2423	}
2424	defer iAdminClient.Close()
2425	uniqueID := make([]byte, 8)
2426	_, err = rand.Read(uniqueID)
2427	diffInstance := testEnv.Config().Instance + "-d" + string(uniqueID)
2428	diffCluster := sourceCluster + "-d"
2429	conf := &InstanceConf{
2430		InstanceId:   diffInstance,
2431		ClusterId:    diffCluster,
2432		DisplayName:  "different test sourceInstance",
2433		Zone:         instanceToCreateZone2,
2434		InstanceType: DEVELOPMENT,
2435		Labels:       map[string]string{"test-label-key": "test-label-value"},
2436	}
2437	defer iAdminClient.DeleteInstance(ctx, diffInstance)
2438	// Create different instance to restore table.
2439	if err := iAdminClient.CreateInstance(ctx, conf); err != nil {
2440		t.Errorf("CreateInstance: %v", err)
2441	}
2442
2443	list := func(cluster string) ([]*BackupInfo, error) {
2444		infos := []*BackupInfo(nil)
2445
2446		it := adminClient.Backups(ctx, cluster)
2447		for {
2448			s, err := it.Next()
2449			if err == iterator.Done {
2450				break
2451			}
2452			if err != nil {
2453				return nil, err
2454			}
2455			infos = append(infos, s)
2456		}
2457		return infos, err
2458	}
2459
2460	// Create backup
2461	if err != nil {
2462		t.Fatalf("Failed to generate a unique ID: %v", err)
2463	}
2464
2465	backupName := fmt.Sprintf("mybackup-%x", uniqueID)
2466	defer adminClient.DeleteBackup(ctx, sourceCluster, backupName)
2467
2468	if err = adminClient.CreateBackup(ctx, tblConf.TableID, sourceCluster, backupName, time.Now().Add(8*time.Hour)); err != nil {
2469		t.Fatalf("Creating backup: %v", err)
2470	}
2471
2472	// List backup
2473	backups, err := list(sourceCluster)
2474	if err != nil {
2475		t.Fatalf("Listing backups: %v", err)
2476	}
2477	if got, want := len(backups), 1; got < want {
2478		t.Fatalf("Listing backup count: %d, want: >= %d", got, want)
2479	}
2480
2481	foundBackup := false
2482	for _, backup := range backups {
2483		if backup.Name == backupName {
2484			foundBackup = true
2485
2486			if got, want := backup.SourceTable, tblConf.TableID; got != want {
2487				t.Errorf("Backup SourceTable: %s, want: %s", got, want)
2488			}
2489			if got, want := backup.ExpireTime, backup.StartTime.Add(8*time.Hour); math.Abs(got.Sub(want).Minutes()) > 1 {
2490				t.Errorf("Backup ExpireTime: %s, want: %s", got, want)
2491			}
2492
2493			break
2494		}
2495	}
2496
2497	if !foundBackup {
2498		t.Errorf("Backup not found: %v", backupName)
2499	}
2500
2501	// Get backup
2502	backup, err := adminClient.BackupInfo(ctx, sourceCluster, backupName)
2503	if err != nil {
2504		t.Fatalf("BackupInfo: %v", backup)
2505	}
2506	if got, want := *backup, *backups[0]; cmp.Equal(got, &want) {
2507		t.Errorf("BackupInfo: %v, want: %v", got, want)
2508	}
2509
2510	// Update backup
2511	newExpireTime := time.Now().Add(10 * time.Hour)
2512	err = adminClient.UpdateBackup(ctx, sourceCluster, backupName, newExpireTime)
2513	if err != nil {
2514		t.Fatalf("UpdateBackup failed: %v", err)
2515	}
2516
2517	// Check that updated backup has the correct expire time
2518	updatedBackup, err := adminClient.BackupInfo(ctx, sourceCluster, backupName)
2519	if err != nil {
2520		t.Fatalf("BackupInfo: %v", err)
2521	}
2522	backup.ExpireTime = newExpireTime
2523	// Server clock and local clock may not be perfectly sync'ed.
2524	if got, want := *updatedBackup, *backup; got.ExpireTime.Sub(want.ExpireTime) > time.Minute {
2525		t.Errorf("BackupInfo: %v, want: %v", got, want)
2526	}
2527
2528	// Restore backup
2529	restoredTable := tblConf.TableID + "-restored"
2530	defer deleteTable(ctx, t, adminClient, restoredTable)
2531	if err = adminClient.RestoreTable(ctx, restoredTable, sourceCluster, backupName); err != nil {
2532		t.Fatalf("RestoreTable: %v", err)
2533	}
2534	if _, err := adminClient.TableInfo(ctx, restoredTable); err != nil {
2535		t.Fatalf("Restored TableInfo: %v", err)
2536	}
2537	// Restore backup to different instance
2538	restoreTableName := tblConf.TableID + "-diff-restored"
2539	diffConf := IntegrationTestConfig{
2540		Project:  testEnv.Config().Project,
2541		Instance: diffInstance,
2542		Cluster:  diffCluster,
2543		Table:    restoreTableName,
2544	}
2545	env := &ProdEnv{
2546		config: diffConf,
2547	}
2548	dAdminClient, err := env.NewAdminClient()
2549	if err != nil {
2550		t.Errorf("NewAdminClient: %v", err)
2551	}
2552	defer dAdminClient.Close()
2553
2554	defer deleteTable(ctx, t, dAdminClient, restoreTableName)
2555	if err = dAdminClient.RestoreTableFrom(ctx, sourceInstance, restoreTableName, sourceCluster, backupName); err != nil {
2556		t.Fatalf("RestoreTableFrom: %v", err)
2557	}
2558	tblInfo, err := dAdminClient.TableInfo(ctx, restoreTableName)
2559	if err != nil {
2560		t.Fatalf("Restored to different sourceInstance failed, TableInfo: %v", err)
2561	}
2562	families := tblInfo.Families
2563	sort.Strings(tblInfo.Families)
2564	wantFams := []string{"fam1", "fam2"}
2565	if !testutil.Equal(families, wantFams) {
2566		t.Errorf("Column family mismatch, got %v, want %v", tblInfo.Families, wantFams)
2567	}
2568
2569	// Delete backup
2570	if err = adminClient.DeleteBackup(ctx, sourceCluster, backupName); err != nil {
2571		t.Fatalf("DeleteBackup: %v", err)
2572	}
2573	backups, err = list(sourceCluster)
2574	if err != nil {
2575		t.Fatalf("List after Delete: %v", err)
2576	}
2577	if got, want := len(backups), 0; got != want {
2578		t.Errorf("List after delete len: %d, want: %d", got, want)
2579	}
2580}
2581
2582// TestIntegration_DirectPathFallback tests the CFE fallback when the directpath net is blackholed.
2583func TestIntegration_DirectPathFallback(t *testing.T) {
2584	ctx := context.Background()
2585	testEnv, _, _, table, _, cleanup, err := setupIntegration(ctx, t)
2586	if err != nil {
2587		t.Fatal(err)
2588	}
2589	defer cleanup()
2590
2591	if !testEnv.Config().AttemptDirectPath {
2592		t.Skip()
2593	}
2594
2595	if len(blackholeDpv6Cmd) == 0 {
2596		t.Fatal("-it.blackhole-dpv6-cmd unset")
2597	}
2598	if len(blackholeDpv4Cmd) == 0 {
2599		t.Fatal("-it.blackhole-dpv4-cmd unset")
2600	}
2601	if len(allowDpv6Cmd) == 0 {
2602		t.Fatal("-it.allowdpv6-cmd unset")
2603	}
2604	if len(allowDpv4Cmd) == 0 {
2605		t.Fatal("-it.allowdpv4-cmd unset")
2606	}
2607
2608	if err := populatePresidentsGraph(table); err != nil {
2609		t.Fatal(err)
2610	}
2611
2612	// Precondition: wait for DirectPath to connect.
2613	dpEnabled := examineTraffic(ctx, testEnv, table, false)
2614	if !dpEnabled {
2615		t.Fatalf("Failed to observe RPCs over DirectPath")
2616	}
2617
2618	// Enable the blackhole, which will prevent communication with grpclb and thus DirectPath.
2619	blackholeDirectPath(testEnv, t)
2620	dpDisabled := examineTraffic(ctx, testEnv, table, true)
2621	if !dpDisabled {
2622		t.Fatalf("Failed to fallback to CFE after blackhole DirectPath")
2623	}
2624
2625	// Disable the blackhole, and client should use DirectPath again.
2626	allowDirectPath(testEnv, t)
2627	dpEnabled = examineTraffic(ctx, testEnv, table, false)
2628	if !dpEnabled {
2629		t.Fatalf("Failed to fallback to CFE after blackhole DirectPath")
2630	}
2631}
2632
2633// examineTraffic returns whether RPCs use DirectPath (blackholeDP = false) or CFE (blackholeDP = true).
2634func examineTraffic(ctx context.Context, testEnv IntegrationEnv, table *Table, blackholeDP bool) bool {
2635	numCount := 0
2636	const (
2637		numRPCsToSend  = 20
2638		minCompleteRPC = 40
2639	)
2640
2641	start := time.Now()
2642	for time.Since(start) < 2*time.Minute {
2643		for i := 0; i < numRPCsToSend; i++ {
2644			_, _ = table.ReadRow(ctx, "j§adams")
2645			if _, useDP := isDirectPathRemoteAddress(testEnv); useDP != blackholeDP {
2646				numCount++
2647				if numCount >= minCompleteRPC {
2648					return true
2649				}
2650			}
2651			time.Sleep(100 * time.Millisecond)
2652		}
2653	}
2654	return false
2655}
2656
2657// Retries a function if it has an unavailable code for up to 30s using a backoff
2658func retryOnUnavailable(ctx context.Context, inner func() error) error {
2659	err := internal.Retry(ctx, gax.Backoff{Initial: 100 * time.Millisecond}, func() (stop bool, err error) {
2660		err = inner()
2661		if err == nil {
2662			return true, nil
2663		}
2664		s, ok := status.FromError(err)
2665		if !ok {
2666			return false, err
2667		}
2668		// Retry on Unavailable
2669		return s.Code() != codes.Unavailable, err
2670	})
2671	return err
2672}
2673
2674func setupIntegration(ctx context.Context, t *testing.T) (_ IntegrationEnv, _ *Client, _ *AdminClient, table *Table, tableName string, cleanup func(), _ error) {
2675	testEnv, err := NewIntegrationEnv()
2676	if err != nil {
2677		return nil, nil, nil, nil, "", nil, err
2678	}
2679
2680	var timeout time.Duration
2681	if testEnv.Config().UseProd {
2682		timeout = 10 * time.Minute
2683		t.Logf("Running test against production")
2684	} else {
2685		timeout = 5 * time.Minute
2686		t.Logf("bttest.Server running on %s", testEnv.Config().AdminEndpoint)
2687	}
2688	ctx, cancel := context.WithTimeout(ctx, timeout)
2689
2690	client, err := testEnv.NewClient()
2691	if err != nil {
2692		t.Logf("Error creating client: %v", err)
2693		return nil, nil, nil, nil, "", nil, err
2694	}
2695
2696	adminClient, err := testEnv.NewAdminClient()
2697	if err != nil {
2698		t.Logf("Error creating admin client: %v", err)
2699
2700		return nil, nil, nil, nil, "", nil, err
2701	}
2702
2703	if testEnv.Config().UseProd {
2704		// TODO: tables may not be successfully deleted in some cases, and will
2705		// become obsolete. We may need a way to automatically delete them.
2706		tableName = tableNameSpace.New()
2707	} else {
2708		tableName = testEnv.Config().Table
2709	}
2710
2711	if err := adminClient.CreateTable(ctx, tableName); err != nil {
2712		cancel()
2713		t.Logf("Error creating table: %v", err)
2714		return nil, nil, nil, nil, "", nil, err
2715	}
2716
2717	err = retryOnUnavailable(ctx, func() error {
2718		return adminClient.CreateColumnFamily(ctx, tableName, "follows")
2719	})
2720	if err != nil {
2721		cancel()
2722		t.Logf("Error creating column family: %v", err)
2723		return nil, nil, nil, nil, "", nil, err
2724	}
2725
2726	return testEnv, client, adminClient, client.Open(tableName), tableName, func() {
2727		if err := adminClient.DeleteTable(ctx, tableName); err != nil {
2728			t.Errorf("DeleteTable got error %v", err)
2729		}
2730		cancel()
2731		client.Close()
2732		adminClient.Close()
2733	}, nil
2734}
2735
2736func formatReadItem(ri ReadItem) string {
2737	// Use the column qualifier only to make the test data briefer.
2738	col := ri.Column[strings.Index(ri.Column, ":")+1:]
2739	return fmt.Sprintf("%s-%s-%s", ri.Row, col, ri.Value)
2740}
2741
2742func fill(b, sub []byte) {
2743	for len(b) > len(sub) {
2744		n := copy(b, sub)
2745		b = b[n:]
2746	}
2747}
2748
2749func clearTimestamps(r Row) {
2750	for _, ris := range r {
2751		for i := range ris {
2752			ris[i].Timestamp = 0
2753		}
2754	}
2755}
2756
2757func deleteTable(ctx context.Context, t *testing.T, ac *AdminClient, name string) {
2758	bo := gax.Backoff{
2759		Initial:    100 * time.Millisecond,
2760		Max:        2 * time.Second,
2761		Multiplier: 1.2,
2762	}
2763	ctx, _ = context.WithTimeout(ctx, time.Second*30)
2764
2765	err := internal.Retry(ctx, bo, func() (bool, error) {
2766		err := ac.DeleteTable(ctx, name)
2767		if err != nil {
2768			return false, err
2769		}
2770		return true, nil
2771	})
2772	if err != nil {
2773		t.Logf("DeleteTable: %v", err)
2774	}
2775}
2776
2777func verifyDirectPathRemoteAddress(testEnv IntegrationEnv, t *testing.T) {
2778	t.Helper()
2779	if !testEnv.Config().AttemptDirectPath {
2780		return
2781	}
2782	if remoteIP, res := isDirectPathRemoteAddress(testEnv); !res {
2783		if testEnv.Config().DirectPathIPV4Only {
2784			t.Fatalf("Expect to access DirectPath via ipv4 only, but RPC was destined to %s", remoteIP)
2785		} else {
2786			t.Fatalf("Expect to access DirectPath via ipv4 or ipv6, but RPC was destined to %s", remoteIP)
2787		}
2788	}
2789}
2790
2791func isDirectPathRemoteAddress(testEnv IntegrationEnv) (_ string, _ bool) {
2792	remoteIP := testEnv.Peer().Addr.String()
2793	// DirectPath ipv4-only can only use ipv4 traffic.
2794	if testEnv.Config().DirectPathIPV4Only {
2795		return remoteIP, strings.HasPrefix(remoteIP, directPathIPV4Prefix)
2796	}
2797	// DirectPath ipv6 can use either ipv4 or ipv6 traffic.
2798	return remoteIP, strings.HasPrefix(remoteIP, directPathIPV4Prefix) || strings.HasPrefix(remoteIP, directPathIPV6Prefix)
2799}
2800
2801func blackholeDirectPath(testEnv IntegrationEnv, t *testing.T) {
2802	cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd)
2803	out, _ := cmdRes.CombinedOutput()
2804	t.Logf(string(out))
2805	if testEnv.Config().DirectPathIPV4Only {
2806		return
2807	}
2808	cmdRes = exec.Command("bash", "-c", blackholeDpv6Cmd)
2809	out, _ = cmdRes.CombinedOutput()
2810	t.Logf(string(out))
2811}
2812
2813func allowDirectPath(testEnv IntegrationEnv, t *testing.T) {
2814	cmdRes := exec.Command("bash", "-c", allowDpv4Cmd)
2815	out, _ := cmdRes.CombinedOutput()
2816	t.Logf(string(out))
2817	if testEnv.Config().DirectPathIPV4Only {
2818		return
2819	}
2820	cmdRes = exec.Command("bash", "-c", allowDpv6Cmd)
2821	out, _ = cmdRes.CombinedOutput()
2822	t.Logf(string(out))
2823}
2824