1/*
2Copyright 2019 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package bigtable
18
19import (
20	"context"
21	"flag"
22	"fmt"
23	"math"
24	"math/rand"
25	"reflect"
26	"sort"
27	"strings"
28	"sync"
29	"testing"
30	"time"
31
32	"cloud.google.com/go/internal"
33	"cloud.google.com/go/internal/testutil"
34	"cloud.google.com/go/internal/uid"
35	"github.com/golang/protobuf/proto"
36	"github.com/google/go-cmp/cmp"
37	gax "github.com/googleapis/gax-go/v2"
38	"google.golang.org/api/iterator"
39	btapb "google.golang.org/genproto/googleapis/bigtable/admin/v2"
40)
41
42var (
43	presidentsSocialGraph = map[string][]string{
44		"wmckinley":   {"tjefferson"},
45		"gwashington": {"jadams"},
46		"tjefferson":  {"gwashington", "jadams"},
47		"jadams":      {"gwashington", "tjefferson"},
48	}
49
50	tableNameSpace = uid.NewSpace("cbt-test", &uid.Options{Short: true})
51)
52
53func populatePresidentsGraph(table *Table) error {
54	ctx := context.Background()
55	for row, ss := range presidentsSocialGraph {
56		mut := NewMutation()
57		for _, name := range ss {
58			mut.Set("follows", name, 1000, []byte("1"))
59		}
60		if err := table.Apply(ctx, row, mut); err != nil {
61			return fmt.Errorf("Mutating row %q: %v", row, err)
62		}
63	}
64	return nil
65}
66
67var instanceToCreate string
68var instanceToCreateZone string
69var instanceToCreateZone2 string
70
71func init() {
72	// Don't test instance creation by default, as quota is necessary and aborted tests could strand resources.
73	flag.StringVar(&instanceToCreate, "it.instance-to-create", "",
74		"The id of an instance to create, update and delete. Requires sufficient Cloud Bigtable quota. Requires that it.use-prod is true.")
75	flag.StringVar(&instanceToCreateZone, "it.instance-to-create-zone", "us-central1-b",
76		"The zone in which to create the new test instance.")
77	flag.StringVar(&instanceToCreateZone2, "it.instance-to-create-zone2", "us-east1-c",
78		"The zone in which to create a second cluster in the test instance.")
79}
80
81func TestIntegration_ConditionalMutations(t *testing.T) {
82	ctx := context.Background()
83	_, _, table, _, cleanup, err := setupIntegration(ctx, t)
84	if err != nil {
85		t.Fatal(err)
86	}
87	defer cleanup()
88
89	if err := populatePresidentsGraph(table); err != nil {
90		t.Fatal(err)
91	}
92
93	// Do a conditional mutation with a complex filter.
94	mutTrue := NewMutation()
95	mutTrue.Set("follows", "wmckinley", 1000, []byte("1"))
96	filter := ChainFilters(ColumnFilter("gwash[iz].*"), ValueFilter("."))
97	mut := NewCondMutation(filter, mutTrue, nil)
98	if err := table.Apply(ctx, "tjefferson", mut); err != nil {
99		t.Fatalf("Conditionally mutating row: %v", err)
100	}
101	// Do a second condition mutation with a filter that does not match,
102	// and thus no changes should be made.
103	mutTrue = NewMutation()
104	mutTrue.DeleteRow()
105	filter = ColumnFilter("snoop.dogg")
106	mut = NewCondMutation(filter, mutTrue, nil)
107	if err := table.Apply(ctx, "tjefferson", mut); err != nil {
108		t.Fatalf("Conditionally mutating row: %v", err)
109	}
110
111	// Fetch a row.
112	row, err := table.ReadRow(ctx, "jadams")
113	if err != nil {
114		t.Fatalf("Reading a row: %v", err)
115	}
116	wantRow := Row{
117		"follows": []ReadItem{
118			{Row: "jadams", Column: "follows:gwashington", Timestamp: 1000, Value: []byte("1")},
119			{Row: "jadams", Column: "follows:tjefferson", Timestamp: 1000, Value: []byte("1")},
120		},
121	}
122	if !testutil.Equal(row, wantRow) {
123		t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow)
124	}
125}
126
127func TestIntegration_PartialReadRows(t *testing.T) {
128	ctx := context.Background()
129	_, _, table, _, cleanup, err := setupIntegration(ctx, t)
130	if err != nil {
131		t.Fatal(err)
132	}
133	defer cleanup()
134
135	if err := populatePresidentsGraph(table); err != nil {
136		t.Fatal(err)
137	}
138
139	// Do a scan and stop part way through.
140	// Verify that the ReadRows callback doesn't keep running.
141	stopped := false
142	err = table.ReadRows(ctx, InfiniteRange(""), func(r Row) bool {
143		if r.Key() < "h" {
144			return true
145		}
146		if !stopped {
147			stopped = true
148			return false
149		}
150		t.Fatalf("ReadRows kept scanning to row %q after being told to stop", r.Key())
151		return false
152	})
153	if err != nil {
154		t.Fatalf("Partial ReadRows: %v", err)
155	}
156}
157
158func TestIntegration_ReadRowList(t *testing.T) {
159	ctx := context.Background()
160	_, _, table, _, cleanup, err := setupIntegration(ctx, t)
161	if err != nil {
162		t.Fatal(err)
163	}
164	defer cleanup()
165
166	if err := populatePresidentsGraph(table); err != nil {
167		t.Fatal(err)
168	}
169
170	// Read a RowList
171	var elt []string
172	keys := RowList{"wmckinley", "gwashington", "jadams"}
173	want := "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,wmckinley-tjefferson-1"
174	err = table.ReadRows(ctx, keys, func(r Row) bool {
175		for _, ris := range r {
176			for _, ri := range ris {
177				elt = append(elt, formatReadItem(ri))
178			}
179		}
180		return true
181	})
182	if err != nil {
183		t.Fatalf("read RowList: %v", err)
184	}
185
186	if got := strings.Join(elt, ","); got != want {
187		t.Fatalf("bulk read: wrong reads.\n got %q\nwant %q", got, want)
188	}
189}
190
191func TestIntegration_DeleteRow(t *testing.T) {
192	ctx := context.Background()
193	_, _, table, _, cleanup, err := setupIntegration(ctx, t)
194	if err != nil {
195		t.Fatal(err)
196	}
197	defer cleanup()
198
199	if err := populatePresidentsGraph(table); err != nil {
200		t.Fatal(err)
201	}
202
203	// Delete a row and check it goes away.
204	mut := NewMutation()
205	mut.DeleteRow()
206	if err := table.Apply(ctx, "wmckinley", mut); err != nil {
207		t.Fatalf("Apply DeleteRow: %v", err)
208	}
209	row, err := table.ReadRow(ctx, "wmckinley")
210	if err != nil {
211		t.Fatalf("Reading a row after DeleteRow: %v", err)
212	}
213	if len(row) != 0 {
214		t.Fatalf("Read non-zero row after DeleteRow: %v", row)
215	}
216}
217
218func TestIntegration_ReadModifyWrite(t *testing.T) {
219	ctx := context.Background()
220	_, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t)
221	if err != nil {
222		t.Fatal(err)
223	}
224	defer cleanup()
225
226	if err := populatePresidentsGraph(table); err != nil {
227		t.Fatal(err)
228	}
229
230	if err := adminClient.CreateColumnFamily(ctx, tableName, "counter"); err != nil {
231		t.Fatalf("Creating column family: %v", err)
232	}
233
234	appendRMW := func(b []byte) *ReadModifyWrite {
235		rmw := NewReadModifyWrite()
236		rmw.AppendValue("counter", "likes", b)
237		return rmw
238	}
239	incRMW := func(n int64) *ReadModifyWrite {
240		rmw := NewReadModifyWrite()
241		rmw.Increment("counter", "likes", n)
242		return rmw
243	}
244	rmwSeq := []struct {
245		desc string
246		rmw  *ReadModifyWrite
247		want []byte
248	}{
249		{
250			desc: "append #1",
251			rmw:  appendRMW([]byte{0, 0, 0}),
252			want: []byte{0, 0, 0},
253		},
254		{
255			desc: "append #2",
256			rmw:  appendRMW([]byte{0, 0, 0, 0, 17}), // the remaining 40 bits to make a big-endian 17
257			want: []byte{0, 0, 0, 0, 0, 0, 0, 17},
258		},
259		{
260			desc: "increment",
261			rmw:  incRMW(8),
262			want: []byte{0, 0, 0, 0, 0, 0, 0, 25},
263		},
264	}
265	for _, step := range rmwSeq {
266		row, err := table.ApplyReadModifyWrite(ctx, "gwashington", step.rmw)
267		if err != nil {
268			t.Fatalf("ApplyReadModifyWrite %+v: %v", step.rmw, err)
269		}
270		// Make sure the modified cell returned by the RMW operation has a timestamp.
271		if row["counter"][0].Timestamp == 0 {
272			t.Fatalf("RMW returned cell timestamp: got %v, want > 0", row["counter"][0].Timestamp)
273		}
274		clearTimestamps(row)
275		wantRow := Row{"counter": []ReadItem{{Row: "gwashington", Column: "counter:likes", Value: step.want}}}
276		if !testutil.Equal(row, wantRow) {
277			t.Fatalf("After %s,\n got %v\nwant %v", step.desc, row, wantRow)
278		}
279	}
280
281	// Check for google-cloud-go/issues/723. RMWs that insert new rows should keep row order sorted in the emulator.
282	_, err = table.ApplyReadModifyWrite(ctx, "issue-723-2", appendRMW([]byte{0}))
283	if err != nil {
284		t.Fatalf("ApplyReadModifyWrite null string: %v", err)
285	}
286	_, err = table.ApplyReadModifyWrite(ctx, "issue-723-1", appendRMW([]byte{0}))
287	if err != nil {
288		t.Fatalf("ApplyReadModifyWrite null string: %v", err)
289	}
290	// Get only the correct row back on read.
291	r, err := table.ReadRow(ctx, "issue-723-1")
292	if err != nil {
293		t.Fatalf("Reading row: %v", err)
294	}
295	if r.Key() != "issue-723-1" {
296		t.Fatalf("ApplyReadModifyWrite: incorrect read after RMW,\n got %v\nwant %v", r.Key(), "issue-723-1")
297	}
298}
299
300func TestIntegration_ArbitraryTimestamps(t *testing.T) {
301	ctx := context.Background()
302	_, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t)
303	if err != nil {
304		t.Fatal(err)
305	}
306	defer cleanup()
307
308	// Test arbitrary timestamps more thoroughly.
309	if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil {
310		t.Fatalf("Creating column family: %v", err)
311	}
312	const numVersions = 4
313	mut := NewMutation()
314	for i := 1; i < numVersions; i++ {
315		// Timestamps are used in thousands because the server
316		// only permits that granularity.
317		mut.Set("ts", "col", Timestamp(i*1000), []byte(fmt.Sprintf("val-%d", i)))
318		mut.Set("ts", "col2", Timestamp(i*1000), []byte(fmt.Sprintf("val-%d", i)))
319	}
320	if err := table.Apply(ctx, "testrow", mut); err != nil {
321		t.Fatalf("Mutating row: %v", err)
322	}
323	r, err := table.ReadRow(ctx, "testrow")
324	if err != nil {
325		t.Fatalf("Reading row: %v", err)
326	}
327	wantRow := Row{"ts": []ReadItem{
328		// These should be returned in descending timestamp order.
329		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
330		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
331		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
332		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
333		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
334		{Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")},
335	}}
336	if !testutil.Equal(r, wantRow) {
337		t.Fatalf("Cell with multiple versions,\n got %v\nwant %v", r, wantRow)
338	}
339
340	// Do the same read, but filter to the latest two versions.
341	r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(2)))
342	if err != nil {
343		t.Fatalf("Reading row: %v", err)
344	}
345	wantRow = Row{"ts": []ReadItem{
346		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
347		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
348		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
349		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
350	}}
351	if !testutil.Equal(r, wantRow) {
352		t.Fatalf("Cell with multiple versions and LatestNFilter(2),\n got %v\nwant %v", r, wantRow)
353	}
354	// Check cell offset / limit
355	r, err = table.ReadRow(ctx, "testrow", RowFilter(CellsPerRowLimitFilter(3)))
356	if err != nil {
357		t.Fatalf("Reading row: %v", err)
358	}
359	wantRow = Row{"ts": []ReadItem{
360		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
361		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
362		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
363	}}
364	if !testutil.Equal(r, wantRow) {
365		t.Fatalf("Cell with multiple versions and CellsPerRowLimitFilter(3),\n got %v\nwant %v", r, wantRow)
366	}
367	r, err = table.ReadRow(ctx, "testrow", RowFilter(CellsPerRowOffsetFilter(3)))
368	if err != nil {
369		t.Fatalf("Reading row: %v", err)
370	}
371	wantRow = Row{"ts": []ReadItem{
372		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
373		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
374		{Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")},
375	}}
376	if !testutil.Equal(r, wantRow) {
377		t.Fatalf("Cell with multiple versions and CellsPerRowOffsetFilter(3),\n got %v\nwant %v", r, wantRow)
378	}
379	// Check timestamp range filtering (with truncation)
380	r, err = table.ReadRow(ctx, "testrow", RowFilter(TimestampRangeFilterMicros(1001, 3000)))
381	if err != nil {
382		t.Fatalf("Reading row: %v", err)
383	}
384	wantRow = Row{"ts": []ReadItem{
385		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
386		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
387		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
388		{Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")},
389	}}
390	if !testutil.Equal(r, wantRow) {
391		t.Fatalf("Cell with multiple versions and TimestampRangeFilter(1000, 3000),\n got %v\nwant %v", r, wantRow)
392	}
393	r, err = table.ReadRow(ctx, "testrow", RowFilter(TimestampRangeFilterMicros(1000, 0)))
394	if err != nil {
395		t.Fatalf("Reading row: %v", err)
396	}
397	wantRow = Row{"ts": []ReadItem{
398		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
399		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
400		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
401		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
402		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
403		{Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")},
404	}}
405	if !testutil.Equal(r, wantRow) {
406		t.Fatalf("Cell with multiple versions and TimestampRangeFilter(1000, 0),\n got %v\nwant %v", r, wantRow)
407	}
408	// Delete non-existing cells, no such column family in this row
409	// Should not delete anything
410	if err := adminClient.CreateColumnFamily(ctx, tableName, "non-existing"); err != nil {
411		t.Fatalf("Creating column family: %v", err)
412	}
413	mut = NewMutation()
414	mut.DeleteTimestampRange("non-existing", "col", 2000, 3000) // half-open interval
415	if err := table.Apply(ctx, "testrow", mut); err != nil {
416		t.Fatalf("Mutating row: %v", err)
417	}
418	r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(3)))
419	if err != nil {
420		t.Fatalf("Reading row: %v", err)
421	}
422	if !testutil.Equal(r, wantRow) {
423		t.Fatalf("Cell was deleted unexpectly,\n got %v\nwant %v", r, wantRow)
424	}
425	// Delete non-existing cells, no such column in this column family
426	// Should not delete anything
427	mut = NewMutation()
428	mut.DeleteTimestampRange("ts", "non-existing", 2000, 3000) // half-open interval
429	if err := table.Apply(ctx, "testrow", mut); err != nil {
430		t.Fatalf("Mutating row: %v", err)
431	}
432	r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(3)))
433	if err != nil {
434		t.Fatalf("Reading row: %v", err)
435	}
436	if !testutil.Equal(r, wantRow) {
437		t.Fatalf("Cell was deleted unexpectly,\n got %v\nwant %v", r, wantRow)
438	}
439	// Delete the cell with timestamp 2000 and repeat the last read,
440	// checking that we get ts 3000 and ts 1000.
441	mut = NewMutation()
442	mut.DeleteTimestampRange("ts", "col", 2001, 3000) // half-open interval
443	if err := table.Apply(ctx, "testrow", mut); err != nil {
444		t.Fatalf("Mutating row: %v", err)
445	}
446	r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(2)))
447	if err != nil {
448		t.Fatalf("Reading row: %v", err)
449	}
450	wantRow = Row{"ts": []ReadItem{
451		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
452		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
453		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
454		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
455	}}
456	if !testutil.Equal(r, wantRow) {
457		t.Fatalf("Cell with multiple versions and LatestNFilter(2), after deleting timestamp 2000,\n got %v\nwant %v", r, wantRow)
458	}
459
460	// Check DeleteCellsInFamily
461	if err := adminClient.CreateColumnFamily(ctx, tableName, "status"); err != nil {
462		t.Fatalf("Creating column family: %v", err)
463	}
464
465	mut = NewMutation()
466	mut.Set("status", "start", 2000, []byte("2"))
467	mut.Set("status", "end", 3000, []byte("3"))
468	mut.Set("ts", "col", 1000, []byte("1"))
469	if err := table.Apply(ctx, "row1", mut); err != nil {
470		t.Fatalf("Mutating row: %v", err)
471	}
472	if err := table.Apply(ctx, "row2", mut); err != nil {
473		t.Fatalf("Mutating row: %v", err)
474	}
475
476	mut = NewMutation()
477	mut.DeleteCellsInFamily("status")
478	if err := table.Apply(ctx, "row1", mut); err != nil {
479		t.Fatalf("Delete cf: %v", err)
480	}
481
482	// ColumnFamily removed
483	r, err = table.ReadRow(ctx, "row1")
484	if err != nil {
485		t.Fatalf("Reading row: %v", err)
486	}
487	wantRow = Row{"ts": []ReadItem{
488		{Row: "row1", Column: "ts:col", Timestamp: 1000, Value: []byte("1")},
489	}}
490	if !testutil.Equal(r, wantRow) {
491		t.Fatalf("column family was not deleted.\n got %v\n want %v", r, wantRow)
492	}
493
494	// ColumnFamily not removed
495	r, err = table.ReadRow(ctx, "row2")
496	if err != nil {
497		t.Fatalf("Reading row: %v", err)
498	}
499	wantRow = Row{
500		"ts": []ReadItem{
501			{Row: "row2", Column: "ts:col", Timestamp: 1000, Value: []byte("1")},
502		},
503		"status": []ReadItem{
504			{Row: "row2", Column: "status:end", Timestamp: 3000, Value: []byte("3")},
505			{Row: "row2", Column: "status:start", Timestamp: 2000, Value: []byte("2")},
506		},
507	}
508	if !testutil.Equal(r, wantRow) {
509		t.Fatalf("Column family was deleted unexpectedly.\n got %v\n want %v", r, wantRow)
510	}
511
512	// Check DeleteCellsInColumn
513	mut = NewMutation()
514	mut.Set("status", "start", 2000, []byte("2"))
515	mut.Set("status", "middle", 3000, []byte("3"))
516	mut.Set("status", "end", 1000, []byte("1"))
517	if err := table.Apply(ctx, "row3", mut); err != nil {
518		t.Fatalf("Mutating row: %v", err)
519	}
520	mut = NewMutation()
521	mut.DeleteCellsInColumn("status", "middle")
522	if err := table.Apply(ctx, "row3", mut); err != nil {
523		t.Fatalf("Delete column: %v", err)
524	}
525	r, err = table.ReadRow(ctx, "row3")
526	if err != nil {
527		t.Fatalf("Reading row: %v", err)
528	}
529	wantRow = Row{
530		"status": []ReadItem{
531			{Row: "row3", Column: "status:end", Timestamp: 1000, Value: []byte("1")},
532			{Row: "row3", Column: "status:start", Timestamp: 2000, Value: []byte("2")},
533		},
534	}
535	if !testutil.Equal(r, wantRow) {
536		t.Fatalf("Column was not deleted.\n got %v\n want %v", r, wantRow)
537	}
538	mut = NewMutation()
539	mut.DeleteCellsInColumn("status", "start")
540	if err := table.Apply(ctx, "row3", mut); err != nil {
541		t.Fatalf("Delete column: %v", err)
542	}
543	r, err = table.ReadRow(ctx, "row3")
544	if err != nil {
545		t.Fatalf("Reading row: %v", err)
546	}
547	wantRow = Row{
548		"status": []ReadItem{
549			{Row: "row3", Column: "status:end", Timestamp: 1000, Value: []byte("1")},
550		},
551	}
552	if !testutil.Equal(r, wantRow) {
553		t.Fatalf("Column was not deleted.\n got %v\n want %v", r, wantRow)
554	}
555	mut = NewMutation()
556	mut.DeleteCellsInColumn("status", "end")
557	if err := table.Apply(ctx, "row3", mut); err != nil {
558		t.Fatalf("Delete column: %v", err)
559	}
560	r, err = table.ReadRow(ctx, "row3")
561	if err != nil {
562		t.Fatalf("Reading row: %v", err)
563	}
564	if len(r) != 0 {
565		t.Fatalf("Delete column: got %v, want empty row", r)
566	}
567	// Add same cell after delete
568	mut = NewMutation()
569	mut.Set("status", "end", 1000, []byte("1"))
570	if err := table.Apply(ctx, "row3", mut); err != nil {
571		t.Fatalf("Mutating row: %v", err)
572	}
573	r, err = table.ReadRow(ctx, "row3")
574	if err != nil {
575		t.Fatalf("Reading row: %v", err)
576	}
577	if !testutil.Equal(r, wantRow) {
578		t.Fatalf("Column was not deleted correctly.\n got %v\n want %v", r, wantRow)
579	}
580}
581
582func TestIntegration_HighlyConcurrentReadsAndWrites(t *testing.T) {
583	ctx := context.Background()
584	_, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t)
585	if err != nil {
586		t.Fatal(err)
587	}
588	defer cleanup()
589
590	if err := populatePresidentsGraph(table); err != nil {
591		t.Fatal(err)
592	}
593
594	if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil {
595		t.Fatalf("Creating column family: %v", err)
596	}
597
598	// Do highly concurrent reads/writes.
599	const maxConcurrency = 1000
600	var wg sync.WaitGroup
601	for i := 0; i < maxConcurrency; i++ {
602		wg.Add(1)
603		go func() {
604			defer wg.Done()
605			switch r := rand.Intn(100); { // r ∈ [0,100)
606			case 0 <= r && r < 30:
607				// Do a read.
608				_, err := table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(1)))
609				if err != nil {
610					t.Errorf("Concurrent read: %v", err)
611				}
612			case 30 <= r && r < 100:
613				// Do a write.
614				mut := NewMutation()
615				mut.Set("ts", "col", 1000, []byte("data"))
616				if err := table.Apply(ctx, "testrow", mut); err != nil {
617					t.Errorf("Concurrent write: %v", err)
618				}
619			}
620		}()
621	}
622	wg.Wait()
623}
624
625func TestIntegration_LargeReadsWritesAndScans(t *testing.T) {
626	ctx := context.Background()
627	_, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t)
628	if err != nil {
629		t.Fatal(err)
630	}
631	defer cleanup()
632
633	if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil {
634		t.Fatalf("Creating column family: %v", err)
635	}
636
637	bigBytes := make([]byte, 5<<20) // 5 MB is larger than current default gRPC max of 4 MB, but less than the max we set.
638	nonsense := []byte("lorem ipsum dolor sit amet, ")
639	fill(bigBytes, nonsense)
640	mut := NewMutation()
641	mut.Set("ts", "col", 1000, bigBytes)
642	if err := table.Apply(ctx, "bigrow", mut); err != nil {
643		t.Fatalf("Big write: %v", err)
644	}
645	r, err := table.ReadRow(ctx, "bigrow")
646	if err != nil {
647		t.Fatalf("Big read: %v", err)
648	}
649	wantRow := Row{"ts": []ReadItem{
650		{Row: "bigrow", Column: "ts:col", Timestamp: 1000, Value: bigBytes},
651	}}
652	if !testutil.Equal(r, wantRow) {
653		t.Fatalf("Big read returned incorrect bytes: %v", r)
654	}
655
656	var wg sync.WaitGroup
657	// Now write 1000 rows, each with 82 KB values, then scan them all.
658	medBytes := make([]byte, 82<<10)
659	fill(medBytes, nonsense)
660	sem := make(chan int, 50) // do up to 50 mutations at a time.
661	for i := 0; i < 1000; i++ {
662		mut := NewMutation()
663		mut.Set("ts", "big-scan", 1000, medBytes)
664		row := fmt.Sprintf("row-%d", i)
665		wg.Add(1)
666		go func() {
667			defer wg.Done()
668			defer func() { <-sem }()
669			sem <- 1
670			if err := table.Apply(ctx, row, mut); err != nil {
671				t.Errorf("Preparing large scan: %v", err)
672			}
673		}()
674	}
675	wg.Wait()
676	n := 0
677	err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool {
678		for _, ris := range r {
679			for _, ri := range ris {
680				n += len(ri.Value)
681			}
682		}
683		return true
684	}, RowFilter(ColumnFilter("big-scan")))
685	if err != nil {
686		t.Fatalf("Doing large scan: %v", err)
687	}
688	if want := 1000 * len(medBytes); n != want {
689		t.Fatalf("Large scan returned %d bytes, want %d", n, want)
690	}
691	// Scan a subset of the 1000 rows that we just created, using a LimitRows ReadOption.
692	rc := 0
693	wantRc := 3
694	err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool {
695		rc++
696		return true
697	}, LimitRows(int64(wantRc)))
698	if err != nil {
699		t.Fatal(err)
700	}
701	if rc != wantRc {
702		t.Fatalf("Scan with row limit returned %d rows, want %d", rc, wantRc)
703	}
704
705	// Test bulk mutations
706	if err := adminClient.CreateColumnFamily(ctx, tableName, "bulk"); err != nil {
707		t.Fatalf("Creating column family: %v", err)
708	}
709	bulkData := map[string][]string{
710		"red sox":  {"2004", "2007", "2013"},
711		"patriots": {"2001", "2003", "2004", "2014"},
712		"celtics":  {"1981", "1984", "1986", "2008"},
713	}
714	var rowKeys []string
715	var muts []*Mutation
716	for row, ss := range bulkData {
717		mut := NewMutation()
718		for _, name := range ss {
719			mut.Set("bulk", name, 1000, []byte("1"))
720		}
721		rowKeys = append(rowKeys, row)
722		muts = append(muts, mut)
723	}
724	status, err := table.ApplyBulk(ctx, rowKeys, muts)
725	if err != nil {
726		t.Fatalf("Bulk mutating rows %q: %v", rowKeys, err)
727	}
728	if status != nil {
729		t.Fatalf("non-nil errors: %v", err)
730	}
731
732	// Read each row back
733	for rowKey, ss := range bulkData {
734		row, err := table.ReadRow(ctx, rowKey)
735		if err != nil {
736			t.Fatalf("Reading a bulk row: %v", err)
737		}
738		var wantItems []ReadItem
739		for _, val := range ss {
740			wantItems = append(wantItems, ReadItem{Row: rowKey, Column: "bulk:" + val, Timestamp: 1000, Value: []byte("1")})
741		}
742		wantRow := Row{"bulk": wantItems}
743		if !testutil.Equal(row, wantRow) {
744			t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow)
745		}
746	}
747
748	// Test bulk write errors.
749	// Note: Setting timestamps as ServerTime makes sure the mutations are not retried on error.
750	badMut := NewMutation()
751	badMut.Set("badfamily", "col", ServerTime, nil)
752	badMut2 := NewMutation()
753	badMut2.Set("badfamily2", "goodcol", ServerTime, []byte("1"))
754	status, err = table.ApplyBulk(ctx, []string{"badrow", "badrow2"}, []*Mutation{badMut, badMut2})
755	if err != nil {
756		t.Fatalf("Bulk mutating rows %q: %v", rowKeys, err)
757	}
758	if status == nil {
759		t.Fatalf("No errors for bad bulk mutation")
760	} else if status[0] == nil || status[1] == nil {
761		t.Fatalf("No error for bad bulk mutation")
762	}
763}
764
765func TestIntegration_Read(t *testing.T) {
766	ctx := context.Background()
767	_, _, table, _, cleanup, err := setupIntegration(ctx, t)
768	if err != nil {
769		t.Fatal(err)
770	}
771	defer cleanup()
772
773	// Insert some data.
774	initialData := map[string][]string{
775		"wmckinley":   {"tjefferson"},
776		"gwashington": {"jadams"},
777		"tjefferson":  {"gwashington", "jadams", "wmckinley"},
778		"jadams":      {"gwashington", "tjefferson"},
779	}
780	for row, ss := range initialData {
781		mut := NewMutation()
782		for _, name := range ss {
783			mut.Set("follows", name, 1000, []byte("1"))
784		}
785		if err := table.Apply(ctx, row, mut); err != nil {
786			t.Fatalf("Mutating row %q: %v", row, err)
787		}
788	}
789
790	for _, test := range []struct {
791		desc   string
792		rr     RowSet
793		filter Filter     // may be nil
794		limit  ReadOption // may be nil
795
796		// We do the read, grab all the cells, turn them into "<row>-<col>-<val>",
797		// and join with a comma.
798		want       string
799		wantLabels []string
800	}{
801		{
802			desc: "read all, unfiltered",
803			rr:   RowRange{},
804			want: "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,tjefferson-gwashington-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1",
805		},
806		{
807			desc: "read with InfiniteRange, unfiltered",
808			rr:   InfiniteRange("tjefferson"),
809			want: "tjefferson-gwashington-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1",
810		},
811		{
812			desc: "read with NewRange, unfiltered",
813			rr:   NewRange("gargamel", "hubbard"),
814			want: "gwashington-jadams-1",
815		},
816		{
817			desc: "read with PrefixRange, unfiltered",
818			rr:   PrefixRange("jad"),
819			want: "jadams-gwashington-1,jadams-tjefferson-1",
820		},
821		{
822			desc: "read with SingleRow, unfiltered",
823			rr:   SingleRow("wmckinley"),
824			want: "wmckinley-tjefferson-1",
825		},
826		{
827			desc:   "read all, with ColumnFilter",
828			rr:     RowRange{},
829			filter: ColumnFilter(".*j.*"), // matches "jadams" and "tjefferson"
830			want:   "gwashington-jadams-1,jadams-tjefferson-1,tjefferson-jadams-1,wmckinley-tjefferson-1",
831		},
832		{
833			desc:   "read all, with ColumnFilter, prefix",
834			rr:     RowRange{},
835			filter: ColumnFilter("j"), // no matches
836			want:   "",
837		},
838		{
839			desc:   "read range, with ColumnRangeFilter",
840			rr:     RowRange{},
841			filter: ColumnRangeFilter("follows", "h", "k"),
842			want:   "gwashington-jadams-1,tjefferson-jadams-1",
843		},
844		{
845			desc:   "read range from empty, with ColumnRangeFilter",
846			rr:     RowRange{},
847			filter: ColumnRangeFilter("follows", "", "u"),
848			want:   "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,tjefferson-gwashington-1,tjefferson-jadams-1,wmckinley-tjefferson-1",
849		},
850		{
851			desc:   "read range from start to empty, with ColumnRangeFilter",
852			rr:     RowRange{},
853			filter: ColumnRangeFilter("follows", "h", ""),
854			want:   "gwashington-jadams-1,jadams-tjefferson-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1",
855		},
856		{
857			desc:   "read with RowKeyFilter",
858			rr:     RowRange{},
859			filter: RowKeyFilter(".*wash.*"),
860			want:   "gwashington-jadams-1",
861		},
862		{
863			desc:   "read with RowKeyFilter, prefix",
864			rr:     RowRange{},
865			filter: RowKeyFilter("gwash"),
866			want:   "",
867		},
868		{
869			desc:   "read with RowKeyFilter, no matches",
870			rr:     RowRange{},
871			filter: RowKeyFilter(".*xxx.*"),
872			want:   "",
873		},
874		{
875			desc:   "read with FamilyFilter, no matches",
876			rr:     RowRange{},
877			filter: FamilyFilter(".*xxx.*"),
878			want:   "",
879		},
880		{
881			desc:   "read with ColumnFilter + row limit",
882			rr:     RowRange{},
883			filter: ColumnFilter(".*j.*"), // matches "jadams" and "tjefferson"
884			limit:  LimitRows(2),
885			want:   "gwashington-jadams-1,jadams-tjefferson-1",
886		},
887		{
888			desc:       "apply labels to the result rows",
889			rr:         RowRange{},
890			filter:     LabelFilter("test-label"),
891			limit:      LimitRows(2),
892			want:       "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1",
893			wantLabels: []string{"test-label", "test-label", "test-label"},
894		},
895		{
896			desc:   "read all, strip values",
897			rr:     RowRange{},
898			filter: StripValueFilter(),
899			want:   "gwashington-jadams-,jadams-gwashington-,jadams-tjefferson-,tjefferson-gwashington-,tjefferson-jadams-,tjefferson-wmckinley-,wmckinley-tjefferson-",
900		},
901		{
902			desc:   "read with ColumnFilter + row limit + strip values",
903			rr:     RowRange{},
904			filter: ChainFilters(ColumnFilter(".*j.*"), StripValueFilter()), // matches "jadams" and "tjefferson"
905			limit:  LimitRows(2),
906			want:   "gwashington-jadams-,jadams-tjefferson-",
907		},
908		{
909			desc:   "read with condition, strip values on true",
910			rr:     RowRange{},
911			filter: ConditionFilter(ColumnFilter(".*j.*"), StripValueFilter(), nil),
912			want:   "gwashington-jadams-,jadams-gwashington-,jadams-tjefferson-,tjefferson-gwashington-,tjefferson-jadams-,tjefferson-wmckinley-,wmckinley-tjefferson-",
913		},
914		{
915			desc:   "read with condition, strip values on false",
916			rr:     RowRange{},
917			filter: ConditionFilter(ColumnFilter(".*xxx.*"), nil, StripValueFilter()),
918			want:   "gwashington-jadams-,jadams-gwashington-,jadams-tjefferson-,tjefferson-gwashington-,tjefferson-jadams-,tjefferson-wmckinley-,wmckinley-tjefferson-",
919		},
920		{
921			desc:   "read with ValueRangeFilter + row limit",
922			rr:     RowRange{},
923			filter: ValueRangeFilter([]byte("1"), []byte("5")), // matches our value of "1"
924			limit:  LimitRows(2),
925			want:   "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1",
926		},
927		{
928			desc:   "read with ValueRangeFilter, no match on exclusive end",
929			rr:     RowRange{},
930			filter: ValueRangeFilter([]byte("0"), []byte("1")), // no match
931			want:   "",
932		},
933		{
934			desc:   "read with ValueRangeFilter, no matches",
935			rr:     RowRange{},
936			filter: ValueRangeFilter([]byte("3"), []byte("5")), // matches nothing
937			want:   "",
938		},
939		{
940			desc:   "read with InterleaveFilter, no matches on all filters",
941			rr:     RowRange{},
942			filter: InterleaveFilters(ColumnFilter(".*x.*"), ColumnFilter(".*z.*")),
943			want:   "",
944		},
945		{
946			desc:   "read with InterleaveFilter, no duplicate cells",
947			rr:     RowRange{},
948			filter: InterleaveFilters(ColumnFilter(".*g.*"), ColumnFilter(".*j.*")),
949			want:   "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,tjefferson-gwashington-1,tjefferson-jadams-1,wmckinley-tjefferson-1",
950		},
951		{
952			desc:   "read with InterleaveFilter, with duplicate cells",
953			rr:     RowRange{},
954			filter: InterleaveFilters(ColumnFilter(".*g.*"), ColumnFilter(".*g.*")),
955			want:   "jadams-gwashington-1,jadams-gwashington-1,tjefferson-gwashington-1,tjefferson-gwashington-1",
956		},
957		{
958			desc: "read with a RowRangeList and no filter",
959			rr:   RowRangeList{NewRange("gargamel", "hubbard"), InfiniteRange("wmckinley")},
960			want: "gwashington-jadams-1,wmckinley-tjefferson-1",
961		},
962		{
963			desc:   "chain that excludes rows and matches nothing, in a condition",
964			rr:     RowRange{},
965			filter: ConditionFilter(ChainFilters(ColumnFilter(".*j.*"), ColumnFilter(".*mckinley.*")), StripValueFilter(), nil),
966			want:   "",
967		},
968		{
969			desc:   "chain that ends with an interleave that has no match. covers #804",
970			rr:     RowRange{},
971			filter: ConditionFilter(ChainFilters(ColumnFilter(".*j.*"), InterleaveFilters(ColumnFilter(".*x.*"), ColumnFilter(".*z.*"))), StripValueFilter(), nil),
972			want:   "",
973		},
974	} {
975		t.Run(test.desc, func(t *testing.T) {
976			var opts []ReadOption
977			if test.filter != nil {
978				opts = append(opts, RowFilter(test.filter))
979			}
980			if test.limit != nil {
981				opts = append(opts, test.limit)
982			}
983			var elt, labels []string
984			err := table.ReadRows(ctx, test.rr, func(r Row) bool {
985				for _, ris := range r {
986					for _, ri := range ris {
987						labels = append(labels, ri.Labels...)
988						elt = append(elt, formatReadItem(ri))
989					}
990				}
991				return true
992			}, opts...)
993			if err != nil {
994				t.Fatal(err)
995			}
996			if got := strings.Join(elt, ","); got != test.want {
997				t.Fatalf("got %q\nwant %q", got, test.want)
998			}
999			if got, want := labels, test.wantLabels; !reflect.DeepEqual(got, want) {
1000				t.Fatalf("got %q\nwant %q", got, want)
1001			}
1002		})
1003	}
1004}
1005
1006func TestIntegration_SampleRowKeys(t *testing.T) {
1007	ctx := context.Background()
1008	_, _, table, _, cleanup, err := setupIntegration(ctx, t)
1009	if err != nil {
1010		t.Fatal(err)
1011	}
1012	defer cleanup()
1013
1014	// Insert some data.
1015	initialData := map[string][]string{
1016		"wmckinley11":   {"tjefferson11"},
1017		"gwashington77": {"jadams77"},
1018		"tjefferson0":   {"gwashington0", "jadams0"},
1019	}
1020
1021	for row, ss := range initialData {
1022		mut := NewMutation()
1023		for _, name := range ss {
1024			mut.Set("follows", name, 1000, []byte("1"))
1025		}
1026		if err := table.Apply(ctx, row, mut); err != nil {
1027			t.Fatalf("Mutating row %q: %v", row, err)
1028		}
1029	}
1030	sampleKeys, err := table.SampleRowKeys(context.Background())
1031	if err != nil {
1032		t.Fatalf("%s: %v", "SampleRowKeys:", err)
1033	}
1034	if len(sampleKeys) == 0 {
1035		t.Error("SampleRowKeys length 0")
1036	}
1037}
1038
1039func TestIntegration_Admin(t *testing.T) {
1040	testEnv, err := NewIntegrationEnv()
1041	if err != nil {
1042		t.Fatalf("IntegrationEnv: %v", err)
1043	}
1044	defer testEnv.Close()
1045
1046	timeout := 2 * time.Second
1047	if testEnv.Config().UseProd {
1048		timeout = 5 * time.Minute
1049	}
1050	ctx, _ := context.WithTimeout(context.Background(), timeout)
1051
1052	adminClient, err := testEnv.NewAdminClient()
1053	if err != nil {
1054		t.Fatalf("NewAdminClient: %v", err)
1055	}
1056	defer adminClient.Close()
1057
1058	iAdminClient, err := testEnv.NewInstanceAdminClient()
1059	if err != nil {
1060		t.Fatalf("NewInstanceAdminClient: %v", err)
1061	}
1062	if iAdminClient != nil {
1063		defer iAdminClient.Close()
1064
1065		iInfo, err := iAdminClient.InstanceInfo(ctx, adminClient.instance)
1066		if err != nil {
1067			t.Errorf("InstanceInfo: %v", err)
1068		}
1069		if iInfo.Name != adminClient.instance {
1070			t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance)
1071		}
1072	}
1073
1074	list := func() []string {
1075		tbls, err := adminClient.Tables(ctx)
1076		if err != nil {
1077			t.Fatalf("Fetching list of tables: %v", err)
1078		}
1079		sort.Strings(tbls)
1080		return tbls
1081	}
1082	containsAll := func(got, want []string) bool {
1083		gotSet := make(map[string]bool)
1084
1085		for _, s := range got {
1086			gotSet[s] = true
1087		}
1088		for _, s := range want {
1089			if !gotSet[s] {
1090				return false
1091			}
1092		}
1093		return true
1094	}
1095
1096	defer deleteTable(ctx, t, adminClient, "mytable")
1097
1098	if err := adminClient.CreateTable(ctx, "mytable"); err != nil {
1099		t.Fatalf("Creating table: %v", err)
1100	}
1101
1102	defer deleteTable(ctx, t, adminClient, "myothertable")
1103
1104	if err := adminClient.CreateTable(ctx, "myothertable"); err != nil {
1105		t.Fatalf("Creating table: %v", err)
1106	}
1107
1108	if got, want := list(), []string{"myothertable", "mytable"}; !containsAll(got, want) {
1109		t.Errorf("adminClient.Tables returned %#v, want %#v", got, want)
1110	}
1111
1112	must(adminClient.WaitForReplication(ctx, "mytable"))
1113
1114	if err := adminClient.DeleteTable(ctx, "myothertable"); err != nil {
1115		t.Fatalf("Deleting table: %v", err)
1116	}
1117	tables := list()
1118	if got, want := tables, []string{"mytable"}; !containsAll(got, want) {
1119		t.Errorf("adminClient.Tables returned %#v, want %#v", got, want)
1120	}
1121	if got, unwanted := tables, []string{"myothertable"}; containsAll(got, unwanted) {
1122		t.Errorf("adminClient.Tables return %#v. unwanted %#v", got, unwanted)
1123	}
1124
1125	tblConf := TableConf{
1126		TableID: "conftable",
1127		Families: map[string]GCPolicy{
1128			"fam1": MaxVersionsPolicy(1),
1129			"fam2": MaxVersionsPolicy(2),
1130		},
1131	}
1132	if err := adminClient.CreateTableFromConf(ctx, &tblConf); err != nil {
1133		t.Fatalf("Creating table from TableConf: %v", err)
1134	}
1135	defer deleteTable(ctx, t, adminClient, tblConf.TableID)
1136
1137	tblInfo, err := adminClient.TableInfo(ctx, tblConf.TableID)
1138	if err != nil {
1139		t.Fatalf("Getting table info: %v", err)
1140	}
1141	sort.Strings(tblInfo.Families)
1142	wantFams := []string{"fam1", "fam2"}
1143	if !testutil.Equal(tblInfo.Families, wantFams) {
1144		t.Errorf("Column family mismatch, got %v, want %v", tblInfo.Families, wantFams)
1145	}
1146
1147	// Populate mytable and drop row ranges
1148	if err = adminClient.CreateColumnFamily(ctx, "mytable", "cf"); err != nil {
1149		t.Fatalf("Creating column family: %v", err)
1150	}
1151
1152	client, err := testEnv.NewClient()
1153	if err != nil {
1154		t.Fatalf("NewClient: %v", err)
1155	}
1156	defer client.Close()
1157
1158	tbl := client.Open("mytable")
1159
1160	prefixes := []string{"a", "b", "c"}
1161	for _, prefix := range prefixes {
1162		for i := 0; i < 5; i++ {
1163			mut := NewMutation()
1164			mut.Set("cf", "col", 1000, []byte("1"))
1165			if err := tbl.Apply(ctx, fmt.Sprintf("%v-%v", prefix, i), mut); err != nil {
1166				t.Fatalf("Mutating row: %v", err)
1167			}
1168		}
1169	}
1170
1171	if err = adminClient.DropRowRange(ctx, "mytable", "a"); err != nil {
1172		t.Errorf("DropRowRange a: %v", err)
1173	}
1174	if err = adminClient.DropRowRange(ctx, "mytable", "c"); err != nil {
1175		t.Errorf("DropRowRange c: %v", err)
1176	}
1177	if err = adminClient.DropRowRange(ctx, "mytable", "x"); err != nil {
1178		t.Errorf("DropRowRange x: %v", err)
1179	}
1180
1181	var gotRowCount int
1182	must(tbl.ReadRows(ctx, RowRange{}, func(row Row) bool {
1183		gotRowCount++
1184		if !strings.HasPrefix(row.Key(), "b") {
1185			t.Errorf("Invalid row after dropping range: %v", row)
1186		}
1187		return true
1188	}))
1189	if gotRowCount != 5 {
1190		t.Errorf("Invalid row count after dropping range: got %v, want %v", gotRowCount, 5)
1191	}
1192
1193	if err = adminClient.DropAllRows(ctx, "mytable"); err != nil {
1194		t.Errorf("DropAllRows mytable: %v", err)
1195	}
1196
1197	gotRowCount = 0
1198	must(tbl.ReadRows(ctx, RowRange{}, func(row Row) bool {
1199		gotRowCount++
1200		return true
1201	}))
1202	if gotRowCount != 0 {
1203		t.Errorf("Invalid row count after truncating table: got %v, want %v", gotRowCount, 0)
1204	}
1205}
1206
1207func TestIntegration_TableIam(t *testing.T) {
1208	testEnv, err := NewIntegrationEnv()
1209	if err != nil {
1210		t.Fatalf("IntegrationEnv: %v", err)
1211	}
1212	defer testEnv.Close()
1213
1214	if !testEnv.Config().UseProd {
1215		t.Skip("emulator doesn't support IAM Policy creation")
1216	}
1217
1218	timeout := 5 * time.Minute
1219	ctx, _ := context.WithTimeout(context.Background(), timeout)
1220
1221	adminClient, err := testEnv.NewAdminClient()
1222	if err != nil {
1223		t.Fatalf("NewAdminClient: %v", err)
1224	}
1225	defer adminClient.Close()
1226
1227	defer deleteTable(ctx, t, adminClient, "mytable")
1228	if err := adminClient.CreateTable(ctx, "mytable"); err != nil {
1229		t.Fatalf("Creating table: %v", err)
1230	}
1231
1232	// Verify that the IAM Controls work for Tables.
1233	iamHandle := adminClient.TableIAM("mytable")
1234	p, err := iamHandle.Policy(ctx)
1235	if err != nil {
1236		t.Errorf("Iam GetPolicy mytable: %v", err)
1237	}
1238	if err = iamHandle.SetPolicy(ctx, p); err != nil {
1239		t.Errorf("Iam SetPolicy mytable: %v", err)
1240	}
1241	if _, err = iamHandle.TestPermissions(ctx, []string{"bigtable.tables.get"}); err != nil {
1242		t.Errorf("Iam TestPermissions mytable: %v", err)
1243	}
1244}
1245
1246func TestIntegration_AdminCreateInstance(t *testing.T) {
1247	if instanceToCreate == "" {
1248		t.Skip("instanceToCreate not set, skipping instance creation testing")
1249	}
1250
1251	testEnv, err := NewIntegrationEnv()
1252	if err != nil {
1253		t.Fatalf("IntegrationEnv: %v", err)
1254	}
1255	defer testEnv.Close()
1256
1257	if !testEnv.Config().UseProd {
1258		t.Skip("emulator doesn't support instance creation")
1259	}
1260
1261	timeout := 5 * time.Minute
1262	ctx, _ := context.WithTimeout(context.Background(), timeout)
1263
1264	iAdminClient, err := testEnv.NewInstanceAdminClient()
1265	if err != nil {
1266		t.Fatalf("NewInstanceAdminClient: %v", err)
1267	}
1268	defer iAdminClient.Close()
1269
1270	clusterID := instanceToCreate + "-cluster"
1271
1272	// Create a development instance
1273	conf := &InstanceConf{
1274		InstanceId:   instanceToCreate,
1275		ClusterId:    clusterID,
1276		DisplayName:  "test instance",
1277		Zone:         instanceToCreateZone,
1278		InstanceType: DEVELOPMENT,
1279		Labels:       map[string]string{"test-label-key": "test-label-value"},
1280	}
1281	if err := iAdminClient.CreateInstance(ctx, conf); err != nil {
1282		t.Fatalf("CreateInstance: %v", err)
1283	}
1284	defer iAdminClient.DeleteInstance(ctx, instanceToCreate)
1285
1286	iInfo, err := iAdminClient.InstanceInfo(ctx, instanceToCreate)
1287	if err != nil {
1288		t.Fatalf("InstanceInfo: %v", err)
1289	}
1290
1291	// Basic return values are tested elsewhere, check instance type
1292	if iInfo.InstanceType != DEVELOPMENT {
1293		t.Fatalf("Instance is not DEVELOPMENT: %v", iInfo.InstanceType)
1294	}
1295	if got, want := iInfo.Labels, conf.Labels; !cmp.Equal(got, want) {
1296		t.Fatalf("Labels: %v, want: %v", got, want)
1297	}
1298
1299	// Update everything we can about the instance in one call.
1300	confWithClusters := &InstanceWithClustersConfig{
1301		InstanceID:   instanceToCreate,
1302		DisplayName:  "new display name",
1303		InstanceType: PRODUCTION,
1304		Labels:       map[string]string{"new-label-key": "new-label-value"},
1305		Clusters: []ClusterConfig{
1306			{ClusterID: clusterID, NumNodes: 5}},
1307	}
1308
1309	if err = iAdminClient.UpdateInstanceWithClusters(ctx, confWithClusters); err != nil {
1310		t.Fatalf("UpdateInstanceWithClusters: %v", err)
1311	}
1312
1313	iInfo, err = iAdminClient.InstanceInfo(ctx, instanceToCreate)
1314	if err != nil {
1315		t.Fatalf("InstanceInfo: %v", err)
1316	}
1317
1318	if iInfo.InstanceType != PRODUCTION {
1319		t.Fatalf("Instance type is not PRODUCTION: %v", iInfo.InstanceType)
1320	}
1321	if got, want := iInfo.Labels, confWithClusters.Labels; !cmp.Equal(got, want) {
1322		t.Fatalf("Labels: %v, want: %v", got, want)
1323	}
1324	if got, want := iInfo.DisplayName, confWithClusters.DisplayName; got != want {
1325		t.Fatalf("Display name: %q, want: %q", got, want)
1326	}
1327
1328	cInfo, err := iAdminClient.GetCluster(ctx, instanceToCreate, clusterID)
1329	if err != nil {
1330		t.Fatalf("GetCluster: %v", err)
1331	}
1332
1333	if cInfo.ServeNodes != 5 {
1334		t.Fatalf("NumNodes: %v, want: %v", cInfo.ServeNodes, 5)
1335	}
1336}
1337
1338func TestIntegration_AdminUpdateInstanceLabels(t *testing.T) {
1339
1340	// Check the environments
1341	if instanceToCreate == "" {
1342		t.Skip("instanceToCreate not set, skipping instance creation testing")
1343	}
1344	testEnv, err := NewIntegrationEnv()
1345	if err != nil {
1346		t.Fatalf("IntegrationEnv: %v", err)
1347	}
1348	defer testEnv.Close()
1349	if !testEnv.Config().UseProd {
1350		t.Skip("emulator doesn't support instance creation")
1351	}
1352
1353	// Create an instance admin client
1354	timeout := 5 * time.Minute
1355	ctx, _ := context.WithTimeout(context.Background(), timeout)
1356	iAdminClient, err := testEnv.NewInstanceAdminClient()
1357	if err != nil {
1358		t.Fatalf("NewInstanceAdminClient: %v", err)
1359	}
1360	defer iAdminClient.Close()
1361
1362	// Create a test instance
1363	conf := &InstanceConf{
1364		InstanceId:   instanceToCreate,
1365		ClusterId:    instanceToCreate + "-cluster",
1366		DisplayName:  "test instance",
1367		InstanceType: DEVELOPMENT,
1368		Zone:         instanceToCreateZone,
1369	}
1370	if err := iAdminClient.CreateInstance(ctx, conf); err != nil {
1371		t.Fatalf("CreateInstance: %v", err)
1372	}
1373	defer iAdminClient.DeleteInstance(ctx, instanceToCreate)
1374
1375	// Check the created test instances
1376	iInfo, err := iAdminClient.InstanceInfo(ctx, instanceToCreate)
1377	if err != nil {
1378		t.Fatalf("InstanceInfo: %v", err)
1379	}
1380	if got, want := iInfo.Labels, conf.Labels; !cmp.Equal(got, want) {
1381		t.Fatalf("Labels: %v, want: %v", got, want)
1382	}
1383
1384	// Test patterns to update instance labels
1385	tests := []struct {
1386		name string
1387		in   map[string]string
1388		out  map[string]string
1389	}{
1390		{
1391			name: "update labels",
1392			in:   map[string]string{"test-label-key": "test-label-value"},
1393			out:  map[string]string{"test-label-key": "test-label-value"},
1394		},
1395		{
1396			name: "update multiple labels",
1397			in:   map[string]string{"update-label-key-a": "update-label-value-a", "update-label-key-b": "update-label-value-b"},
1398			out:  map[string]string{"update-label-key-a": "update-label-value-a", "update-label-key-b": "update-label-value-b"},
1399		},
1400		{
1401			name: "not update existing labels",
1402			in:   nil, // nil map
1403			out:  map[string]string{"update-label-key-a": "update-label-value-a", "update-label-key-b": "update-label-value-b"},
1404		},
1405		{
1406			name: "delete labels",
1407			in:   map[string]string{}, // empty map
1408			out:  nil,
1409		},
1410	}
1411	for _, tt := range tests {
1412		t.Run(tt.name, func(t *testing.T) {
1413			confWithClusters := &InstanceWithClustersConfig{
1414				InstanceID: instanceToCreate,
1415				Labels:     tt.in,
1416			}
1417			if err := iAdminClient.UpdateInstanceWithClusters(ctx, confWithClusters); err != nil {
1418				t.Fatalf("UpdateInstanceWithClusters: %v", err)
1419			}
1420			iInfo, err := iAdminClient.InstanceInfo(ctx, instanceToCreate)
1421			if err != nil {
1422				t.Fatalf("InstanceInfo: %v", err)
1423			}
1424			if got, want := iInfo.Labels, tt.out; !cmp.Equal(got, want) {
1425				t.Fatalf("Labels: %v, want: %v", got, want)
1426			}
1427		})
1428	}
1429}
1430
1431func TestIntegration_AdminUpdateInstanceAndSyncClusters(t *testing.T) {
1432	if instanceToCreate == "" {
1433		t.Skip("instanceToCreate not set, skipping instance update testing")
1434	}
1435
1436	testEnv, err := NewIntegrationEnv()
1437	if err != nil {
1438		t.Fatalf("IntegrationEnv: %v", err)
1439	}
1440	defer testEnv.Close()
1441
1442	if !testEnv.Config().UseProd {
1443		t.Skip("emulator doesn't support instance creation")
1444	}
1445
1446	timeout := 5 * time.Minute
1447	ctx, _ := context.WithTimeout(context.Background(), timeout)
1448
1449	iAdminClient, err := testEnv.NewInstanceAdminClient()
1450	if err != nil {
1451		t.Fatalf("NewInstanceAdminClient: %v", err)
1452	}
1453	defer iAdminClient.Close()
1454
1455	clusterID := instanceToCreate + "-cluster"
1456
1457	// Create a development instance
1458	conf := &InstanceConf{
1459		InstanceId:   instanceToCreate,
1460		ClusterId:    clusterID,
1461		DisplayName:  "test instance",
1462		Zone:         instanceToCreateZone,
1463		InstanceType: DEVELOPMENT,
1464		Labels:       map[string]string{"test-label-key": "test-label-value"},
1465	}
1466	if err := iAdminClient.CreateInstance(ctx, conf); err != nil {
1467		t.Fatalf("CreateInstance: %v", err)
1468	}
1469	defer iAdminClient.DeleteInstance(ctx, instanceToCreate)
1470
1471	iInfo, err := iAdminClient.InstanceInfo(ctx, instanceToCreate)
1472	if err != nil {
1473		t.Fatalf("InstanceInfo: %v", err)
1474	}
1475
1476	// Basic return values are tested elsewhere, check instance type
1477	if iInfo.InstanceType != DEVELOPMENT {
1478		t.Fatalf("Instance is not DEVELOPMENT: %v", iInfo.InstanceType)
1479	}
1480	if got, want := iInfo.Labels, conf.Labels; !cmp.Equal(got, want) {
1481		t.Fatalf("Labels: %v, want: %v", got, want)
1482	}
1483
1484	// Update everything we can about the instance in one call.
1485	confWithClusters := &InstanceWithClustersConfig{
1486		InstanceID:   instanceToCreate,
1487		DisplayName:  "new display name",
1488		InstanceType: PRODUCTION,
1489		Labels:       map[string]string{"new-label-key": "new-label-value"},
1490		Clusters: []ClusterConfig{
1491			{ClusterID: clusterID, NumNodes: 5}},
1492	}
1493
1494	results, err := UpdateInstanceAndSyncClusters(ctx, iAdminClient, confWithClusters)
1495	if err != nil {
1496		t.Fatalf("UpdateInstanceAndSyncClusters: %v", err)
1497	}
1498
1499	wantResults := UpdateInstanceResults{
1500		InstanceUpdated: true,
1501		UpdatedClusters: []string{clusterID},
1502	}
1503	if diff := testutil.Diff(*results, wantResults); diff != "" {
1504		t.Fatalf("UpdateInstanceResults: got - want +\n%s", diff)
1505	}
1506
1507	iInfo, err = iAdminClient.InstanceInfo(ctx, instanceToCreate)
1508	if err != nil {
1509		t.Fatalf("InstanceInfo: %v", err)
1510	}
1511
1512	if iInfo.InstanceType != PRODUCTION {
1513		t.Fatalf("Instance type is not PRODUCTION: %v", iInfo.InstanceType)
1514	}
1515	if got, want := iInfo.Labels, confWithClusters.Labels; !cmp.Equal(got, want) {
1516		t.Fatalf("Labels: %v, want: %v", got, want)
1517	}
1518	if got, want := iInfo.DisplayName, confWithClusters.DisplayName; got != want {
1519		t.Fatalf("Display name: %q, want: %q", got, want)
1520	}
1521
1522	cInfo, err := iAdminClient.GetCluster(ctx, instanceToCreate, clusterID)
1523	if err != nil {
1524		t.Fatalf("GetCluster: %v", err)
1525	}
1526
1527	if cInfo.ServeNodes != 5 {
1528		t.Fatalf("NumNodes: %v, want: %v", cInfo.ServeNodes, 5)
1529	}
1530
1531	// Now add a second cluster as the only change. The first cluster must also be provided so
1532	// it is not removed.
1533	clusterID2 := clusterID + "-2"
1534	confWithClusters = &InstanceWithClustersConfig{
1535		InstanceID: instanceToCreate,
1536		Clusters: []ClusterConfig{
1537			{ClusterID: clusterID},
1538			{ClusterID: clusterID2, NumNodes: 3, StorageType: SSD, Zone: instanceToCreateZone2}},
1539	}
1540
1541	results, err = UpdateInstanceAndSyncClusters(ctx, iAdminClient, confWithClusters)
1542	if err != nil {
1543		t.Fatalf("UpdateInstanceAndSyncClusters: %v %v", confWithClusters, err)
1544	}
1545
1546	wantResults = UpdateInstanceResults{
1547		InstanceUpdated: false,
1548		CreatedClusters: []string{clusterID2},
1549	}
1550	if diff := testutil.Diff(*results, wantResults); diff != "" {
1551		t.Fatalf("UpdateInstanceResults: got - want +\n%s", diff)
1552	}
1553
1554	// Now update one cluster and delete the other
1555	confWithClusters = &InstanceWithClustersConfig{
1556		InstanceID: instanceToCreate,
1557		Clusters: []ClusterConfig{
1558			{ClusterID: clusterID, NumNodes: 4}},
1559	}
1560
1561	results, err = UpdateInstanceAndSyncClusters(ctx, iAdminClient, confWithClusters)
1562	if err != nil {
1563		t.Fatalf("UpdateInstanceAndSyncClusters: %v %v", confWithClusters, err)
1564	}
1565
1566	wantResults = UpdateInstanceResults{
1567		InstanceUpdated: false,
1568		UpdatedClusters: []string{clusterID},
1569		DeletedClusters: []string{clusterID2},
1570	}
1571
1572	if diff := testutil.Diff(*results, wantResults); diff != "" {
1573		t.Fatalf("UpdateInstanceResults: got - want +\n%s", diff)
1574	}
1575
1576	// Make sure the instance looks as we would expect
1577	clusters, err := iAdminClient.Clusters(ctx, conf.InstanceId)
1578	if err != nil {
1579		t.Fatalf("Clusters: %v", err)
1580	}
1581
1582	if len(clusters) != 1 {
1583		t.Fatalf("Clusters length %v, want: 1", len(clusters))
1584	}
1585
1586	wantCluster := &ClusterInfo{
1587		Name:       clusterID,
1588		Zone:       instanceToCreateZone,
1589		ServeNodes: 4,
1590		State:      "READY",
1591	}
1592	if diff := testutil.Diff(clusters[0], wantCluster); diff != "" {
1593		t.Fatalf("InstanceEquality: got - want +\n%s", diff)
1594	}
1595}
1596
1597func TestIntegration_AdminSnapshot(t *testing.T) {
1598	testEnv, err := NewIntegrationEnv()
1599	if err != nil {
1600		t.Fatalf("IntegrationEnv: %v", err)
1601	}
1602	defer testEnv.Close()
1603
1604	if !testEnv.Config().UseProd {
1605		t.Skip("emulator doesn't support snapshots")
1606	}
1607
1608	timeout := 2 * time.Second
1609	if testEnv.Config().UseProd {
1610		timeout = 5 * time.Minute
1611	}
1612	ctx, _ := context.WithTimeout(context.Background(), timeout)
1613
1614	adminClient, err := testEnv.NewAdminClient()
1615	if err != nil {
1616		t.Fatalf("NewAdminClient: %v", err)
1617	}
1618	defer adminClient.Close()
1619
1620	table := testEnv.Config().Table
1621	cluster := testEnv.Config().Cluster
1622
1623	list := func(cluster string) ([]*SnapshotInfo, error) {
1624		infos := []*SnapshotInfo(nil)
1625
1626		it := adminClient.Snapshots(ctx, cluster)
1627		for {
1628			s, err := it.Next()
1629			if err == iterator.Done {
1630				break
1631			}
1632			if err != nil {
1633				return nil, err
1634			}
1635			infos = append(infos, s)
1636		}
1637		return infos, err
1638	}
1639
1640	// Delete the table at the end of the test. Schedule ahead of time
1641	// in case the client fails
1642	defer deleteTable(ctx, t, adminClient, table)
1643
1644	if err := adminClient.CreateTable(ctx, table); err != nil {
1645		t.Fatalf("Creating table: %v", err)
1646	}
1647
1648	// Precondition: no snapshots
1649	snapshots, err := list(cluster)
1650	if err != nil {
1651		t.Fatalf("Initial snapshot list: %v", err)
1652	}
1653	if got, want := len(snapshots), 0; got != want {
1654		t.Fatalf("Initial snapshot list len: %d, want: %d", got, want)
1655	}
1656
1657	// Create snapshot
1658	defer adminClient.DeleteSnapshot(ctx, cluster, "mysnapshot")
1659
1660	if err = adminClient.SnapshotTable(ctx, table, cluster, "mysnapshot", 5*time.Hour); err != nil {
1661		t.Fatalf("Creating snaphot: %v", err)
1662	}
1663
1664	// List snapshot
1665	snapshots, err = list(cluster)
1666	if err != nil {
1667		t.Fatalf("Listing snapshots: %v", err)
1668	}
1669	if got, want := len(snapshots), 1; got != want {
1670		t.Fatalf("Listing snapshot count: %d, want: %d", got, want)
1671	}
1672	if got, want := snapshots[0].Name, "mysnapshot"; got != want {
1673		t.Fatalf("Snapshot name: %s, want: %s", got, want)
1674	}
1675	if got, want := snapshots[0].SourceTable, table; got != want {
1676		t.Fatalf("Snapshot SourceTable: %s, want: %s", got, want)
1677	}
1678	if got, want := snapshots[0].DeleteTime, snapshots[0].CreateTime.Add(5*time.Hour); math.Abs(got.Sub(want).Minutes()) > 1 {
1679		t.Fatalf("Snapshot DeleteTime: %s, want: %s", got, want)
1680	}
1681
1682	// Get snapshot
1683	snapshot, err := adminClient.SnapshotInfo(ctx, cluster, "mysnapshot")
1684	if err != nil {
1685		t.Fatalf("SnapshotInfo: %v", snapshot)
1686	}
1687	if got, want := *snapshot, *snapshots[0]; got != want {
1688		t.Fatalf("SnapshotInfo: %v, want: %v", got, want)
1689	}
1690
1691	// Restore
1692	restoredTable := table + "-restored"
1693	defer deleteTable(ctx, t, adminClient, restoredTable)
1694	if err = adminClient.CreateTableFromSnapshot(ctx, restoredTable, cluster, "mysnapshot"); err != nil {
1695		t.Fatalf("CreateTableFromSnapshot: %v", err)
1696	}
1697	if _, err := adminClient.TableInfo(ctx, restoredTable); err != nil {
1698		t.Fatalf("Restored TableInfo: %v", err)
1699	}
1700
1701	// Delete snapshot
1702	if err = adminClient.DeleteSnapshot(ctx, cluster, "mysnapshot"); err != nil {
1703		t.Fatalf("DeleteSnapshot: %v", err)
1704	}
1705	snapshots, err = list(cluster)
1706	if err != nil {
1707		t.Fatalf("List after Delete: %v", err)
1708	}
1709	if got, want := len(snapshots), 0; got != want {
1710		t.Fatalf("List after delete len: %d, want: %d", got, want)
1711	}
1712}
1713
1714func TestIntegration_Granularity(t *testing.T) {
1715	testEnv, err := NewIntegrationEnv()
1716	if err != nil {
1717		t.Fatalf("IntegrationEnv: %v", err)
1718	}
1719	defer testEnv.Close()
1720
1721	timeout := 2 * time.Second
1722	if testEnv.Config().UseProd {
1723		timeout = 5 * time.Minute
1724	}
1725	ctx, _ := context.WithTimeout(context.Background(), timeout)
1726
1727	adminClient, err := testEnv.NewAdminClient()
1728	if err != nil {
1729		t.Fatalf("NewAdminClient: %v", err)
1730	}
1731	defer adminClient.Close()
1732
1733	list := func() []string {
1734		tbls, err := adminClient.Tables(ctx)
1735		if err != nil {
1736			t.Fatalf("Fetching list of tables: %v", err)
1737		}
1738		sort.Strings(tbls)
1739		return tbls
1740	}
1741	containsAll := func(got, want []string) bool {
1742		gotSet := make(map[string]bool)
1743
1744		for _, s := range got {
1745			gotSet[s] = true
1746		}
1747		for _, s := range want {
1748			if !gotSet[s] {
1749				return false
1750			}
1751		}
1752		return true
1753	}
1754
1755	defer deleteTable(ctx, t, adminClient, "mytable")
1756
1757	if err := adminClient.CreateTable(ctx, "mytable"); err != nil {
1758		t.Fatalf("Creating table: %v", err)
1759	}
1760
1761	tables := list()
1762	if got, want := tables, []string{"mytable"}; !containsAll(got, want) {
1763		t.Errorf("adminClient.Tables returned %#v, want %#v", got, want)
1764	}
1765
1766	// calling ModifyColumnFamilies to check the granularity of table
1767	prefix := adminClient.instancePrefix()
1768	req := &btapb.ModifyColumnFamiliesRequest{
1769		Name: prefix + "/tables/" + "mytable",
1770		Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{
1771			Id:  "cf",
1772			Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Create{&btapb.ColumnFamily{}},
1773		}},
1774	}
1775	table, err := adminClient.tClient.ModifyColumnFamilies(ctx, req)
1776	if err != nil {
1777		t.Fatalf("Creating column family: %v", err)
1778	}
1779	if table.Granularity != btapb.Table_TimestampGranularity(btapb.Table_MILLIS) {
1780		t.Errorf("ModifyColumnFamilies returned granularity %#v, want %#v", table.Granularity, btapb.Table_TimestampGranularity(btapb.Table_MILLIS))
1781	}
1782}
1783
1784func TestIntegration_InstanceAdminClient_AppProfile(t *testing.T) {
1785	testEnv, err := NewIntegrationEnv()
1786	if err != nil {
1787		t.Fatalf("IntegrationEnv: %v", err)
1788	}
1789	defer testEnv.Close()
1790
1791	timeout := 2 * time.Second
1792	if testEnv.Config().UseProd {
1793		timeout = 5 * time.Minute
1794	}
1795	ctx, cancel := context.WithTimeout(context.Background(), timeout)
1796	defer cancel()
1797
1798	adminClient, err := testEnv.NewAdminClient()
1799	if err != nil {
1800		t.Fatalf("NewAdminClient: %v", err)
1801	}
1802	defer adminClient.Close()
1803
1804	iAdminClient, err := testEnv.NewInstanceAdminClient()
1805	if err != nil {
1806		t.Fatalf("NewInstanceAdminClient: %v", err)
1807	}
1808
1809	if iAdminClient == nil {
1810		return
1811	}
1812
1813	defer iAdminClient.Close()
1814	profile := ProfileConf{
1815		ProfileID:     "app_profile1",
1816		InstanceID:    adminClient.instance,
1817		ClusterID:     testEnv.Config().Cluster,
1818		Description:   "creating new app profile 1",
1819		RoutingPolicy: SingleClusterRouting,
1820	}
1821
1822	createdProfile, err := iAdminClient.CreateAppProfile(ctx, profile)
1823	if err != nil {
1824		t.Fatalf("Creating app profile: %v", err)
1825
1826	}
1827
1828	gotProfile, err := iAdminClient.GetAppProfile(ctx, adminClient.instance, "app_profile1")
1829
1830	if err != nil {
1831		t.Fatalf("Get app profile: %v", err)
1832	}
1833
1834	if !proto.Equal(createdProfile, gotProfile) {
1835		t.Fatalf("created profile: %s, got profile: %s", createdProfile.Name, gotProfile.Name)
1836
1837	}
1838
1839	list := func(instanceID string) ([]*btapb.AppProfile, error) {
1840		profiles := []*btapb.AppProfile(nil)
1841
1842		it := iAdminClient.ListAppProfiles(ctx, instanceID)
1843		for {
1844			s, err := it.Next()
1845			if err == iterator.Done {
1846				break
1847			}
1848			if err != nil {
1849				return nil, err
1850			}
1851			profiles = append(profiles, s)
1852		}
1853		return profiles, err
1854	}
1855
1856	profiles, err := list(adminClient.instance)
1857	if err != nil {
1858		t.Fatalf("List app profile: %v", err)
1859	}
1860
1861	if got, want := len(profiles), 1; got != want {
1862		t.Fatalf("Initial app profile list len: %d, want: %d", got, want)
1863	}
1864
1865	for _, test := range []struct {
1866		desc   string
1867		uattrs ProfileAttrsToUpdate
1868		want   *btapb.AppProfile // nil means error
1869	}{
1870		{
1871			desc:   "empty update",
1872			uattrs: ProfileAttrsToUpdate{},
1873			want:   nil,
1874		},
1875
1876		{
1877			desc:   "empty description update",
1878			uattrs: ProfileAttrsToUpdate{Description: ""},
1879			want: &btapb.AppProfile{
1880				Name:          gotProfile.Name,
1881				Description:   "",
1882				RoutingPolicy: gotProfile.RoutingPolicy,
1883				Etag:          gotProfile.Etag},
1884		},
1885		{
1886			desc: "routing update",
1887			uattrs: ProfileAttrsToUpdate{
1888				RoutingPolicy: SingleClusterRouting,
1889				ClusterID:     testEnv.Config().Cluster,
1890			},
1891			want: &btapb.AppProfile{
1892				Name:        gotProfile.Name,
1893				Description: "",
1894				Etag:        gotProfile.Etag,
1895				RoutingPolicy: &btapb.AppProfile_SingleClusterRouting_{
1896					SingleClusterRouting: &btapb.AppProfile_SingleClusterRouting{
1897						ClusterId: testEnv.Config().Cluster,
1898					}},
1899			},
1900		},
1901	} {
1902		err = iAdminClient.UpdateAppProfile(ctx, adminClient.instance, "app_profile1", test.uattrs)
1903		if err != nil {
1904			if test.want != nil {
1905				t.Errorf("%s: %v", test.desc, err)
1906			}
1907			continue
1908		}
1909		if err == nil && test.want == nil {
1910			t.Errorf("%s: got nil, want error", test.desc)
1911			continue
1912		}
1913
1914		got, _ := iAdminClient.GetAppProfile(ctx, adminClient.instance, "app_profile1")
1915
1916		if !proto.Equal(got, test.want) {
1917			t.Fatalf("%s : got profile : %v, want profile: %v", test.desc, gotProfile, test.want)
1918		}
1919
1920	}
1921
1922	err = iAdminClient.DeleteAppProfile(ctx, adminClient.instance, "app_profile1")
1923	if err != nil {
1924		t.Fatalf("Delete app profile: %v", err)
1925	}
1926
1927}
1928
1929func TestIntegration_InstanceUpdate(t *testing.T) {
1930	testEnv, err := NewIntegrationEnv()
1931	if err != nil {
1932		t.Fatalf("IntegrationEnv: %v", err)
1933	}
1934	defer testEnv.Close()
1935
1936	timeout := 2 * time.Second
1937	if testEnv.Config().UseProd {
1938		timeout = 5 * time.Minute
1939	}
1940	ctx, cancel := context.WithTimeout(context.Background(), timeout)
1941	defer cancel()
1942
1943	adminClient, err := testEnv.NewAdminClient()
1944	if err != nil {
1945		t.Fatalf("NewAdminClient: %v", err)
1946	}
1947
1948	defer adminClient.Close()
1949
1950	iAdminClient, err := testEnv.NewInstanceAdminClient()
1951	if err != nil {
1952		t.Fatalf("NewInstanceAdminClient: %v", err)
1953	}
1954
1955	if iAdminClient == nil {
1956		return
1957	}
1958
1959	defer iAdminClient.Close()
1960
1961	iInfo, err := iAdminClient.InstanceInfo(ctx, adminClient.instance)
1962	if err != nil {
1963		t.Errorf("InstanceInfo: %v", err)
1964	}
1965
1966	if iInfo.Name != adminClient.instance {
1967		t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance)
1968	}
1969
1970	if iInfo.DisplayName != adminClient.instance {
1971		t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance)
1972	}
1973
1974	const numNodes = 4
1975	// update cluster nodes
1976	if err := iAdminClient.UpdateCluster(ctx, adminClient.instance, testEnv.Config().Cluster, int32(numNodes)); err != nil {
1977		t.Errorf("UpdateCluster: %v", err)
1978	}
1979
1980	// get cluster after updating
1981	cis, err := iAdminClient.GetCluster(ctx, adminClient.instance, testEnv.Config().Cluster)
1982	if err != nil {
1983		t.Errorf("GetCluster %v", err)
1984	}
1985	if cis.ServeNodes != int(numNodes) {
1986		t.Errorf("ServeNodes returned %d, want %d", cis.ServeNodes, int(numNodes))
1987	}
1988}
1989
1990func TestIntegration_AdminBackup(t *testing.T) {
1991	testEnv, err := NewIntegrationEnv()
1992	if err != nil {
1993		t.Fatalf("IntegrationEnv: %v", err)
1994	}
1995	defer testEnv.Close()
1996
1997	if !testEnv.Config().UseProd {
1998		t.Skip("emulator doesn't support backups")
1999	}
2000
2001	timeout := 5 * time.Minute
2002	ctx, _ := context.WithTimeout(context.Background(), timeout)
2003
2004	adminClient, err := testEnv.NewAdminClient()
2005	if err != nil {
2006		t.Fatalf("NewAdminClient: %v", err)
2007	}
2008	defer adminClient.Close()
2009
2010	table := testEnv.Config().Table
2011	cluster := testEnv.Config().Cluster
2012
2013	// Delete the table at the end of the test. Schedule ahead of time
2014	// in case the client fails
2015	defer deleteTable(ctx, t, adminClient, table)
2016
2017	list := func(cluster string) ([]*BackupInfo, error) {
2018		infos := []*BackupInfo(nil)
2019
2020		it := adminClient.Backups(ctx, cluster)
2021		for {
2022			s, err := it.Next()
2023			if err == iterator.Done {
2024				break
2025			}
2026			if err != nil {
2027				return nil, err
2028			}
2029			infos = append(infos, s)
2030		}
2031		return infos, err
2032	}
2033
2034	if err := adminClient.CreateTable(ctx, table); err != nil {
2035		t.Fatalf("Creating table: %v", err)
2036	}
2037
2038	// Precondition: no backups
2039	backups, err := list(cluster)
2040	if err != nil {
2041		t.Fatalf("Initial backup list: %v", err)
2042	}
2043	if got, want := len(backups), 0; got != want {
2044		t.Fatalf("Initial backup list len: %d, want: %d", got, want)
2045	}
2046
2047	// Create backup
2048	defer adminClient.DeleteBackup(ctx, cluster, "mybackup")
2049
2050	if err = adminClient.CreateBackup(ctx, table, cluster, "mybackup", time.Now().Add(8*time.Hour)); err != nil {
2051		t.Fatalf("Creating backup: %v", err)
2052	}
2053
2054	// List backup
2055	backups, err = list(cluster)
2056	if err != nil {
2057		t.Fatalf("Listing backups: %v", err)
2058	}
2059	if got, want := len(backups), 1; got != want {
2060		t.Fatalf("Listing backup count: %d, want: %d", got, want)
2061	}
2062	if got, want := backups[0].Name, "mybackup"; got != want {
2063		t.Fatalf("Backup name: %s, want: %s", got, want)
2064	}
2065	if got, want := backups[0].SourceTable, table; got != want {
2066		t.Fatalf("Backup SourceTable: %s, want: %s", got, want)
2067	}
2068	if got, want := backups[0].ExpireTime, backups[0].StartTime.Add(8*time.Hour); math.Abs(got.Sub(want).Minutes()) > 1 {
2069		t.Fatalf("Backup ExpireTime: %s, want: %s", got, want)
2070	}
2071
2072	// Get backup
2073	backup, err := adminClient.BackupInfo(ctx, cluster, "mybackup")
2074	if err != nil {
2075		t.Fatalf("BackupInfo: %v", backup)
2076	}
2077	if got, want := *backup, *backups[0]; got != want {
2078		t.Fatalf("BackupInfo: %v, want: %v", got, want)
2079	}
2080
2081	// Update backup
2082	newExpireTime := time.Now().Add(10 * time.Hour)
2083	err = adminClient.UpdateBackup(ctx, cluster, "mybackup", newExpireTime)
2084	if err != nil {
2085		t.Fatalf("UpdateBackup failed: %v", err)
2086	}
2087
2088	// Check that updated backup has the correct expire time
2089	updatedBackup, err := adminClient.BackupInfo(ctx, cluster, "mybackup")
2090	if err != nil {
2091		t.Fatalf("BackupInfo: %v", err)
2092	}
2093	backup.ExpireTime = newExpireTime
2094	// Server clock and local clock may not be perfectly sync'ed.
2095	if got, want := *updatedBackup, *backup; got.ExpireTime.Sub(want.ExpireTime) > time.Minute {
2096		t.Fatalf("BackupInfo: %v, want: %v", got, want)
2097	}
2098
2099	// Restore backup
2100	restoredTable := table + "-restored"
2101	defer deleteTable(ctx, t, adminClient, restoredTable)
2102	if err = adminClient.RestoreTable(ctx, restoredTable, cluster, "mybackup"); err != nil {
2103		t.Fatalf("RestoreTable: %v", err)
2104	}
2105	if _, err := adminClient.TableInfo(ctx, restoredTable); err != nil {
2106		t.Fatalf("Restored TableInfo: %v", err)
2107	}
2108
2109	// Delete backup
2110	if err = adminClient.DeleteBackup(ctx, cluster, "mybackup"); err != nil {
2111		t.Fatalf("DeleteBackup: %v", err)
2112	}
2113	backups, err = list(cluster)
2114	if err != nil {
2115		t.Fatalf("List after Delete: %v", err)
2116	}
2117	if got, want := len(backups), 0; got != want {
2118		t.Fatalf("List after delete len: %d, want: %d", got, want)
2119	}
2120}
2121
2122func setupIntegration(ctx context.Context, t *testing.T) (_ *Client, _ *AdminClient, table *Table, tableName string, cleanup func(), _ error) {
2123	testEnv, err := NewIntegrationEnv()
2124	if err != nil {
2125		return nil, nil, nil, "", nil, err
2126	}
2127
2128	var timeout time.Duration
2129	if testEnv.Config().UseProd {
2130		timeout = 10 * time.Minute
2131		t.Logf("Running test against production")
2132	} else {
2133		timeout = 5 * time.Minute
2134		t.Logf("bttest.Server running on %s", testEnv.Config().AdminEndpoint)
2135	}
2136	ctx, cancel := context.WithTimeout(ctx, timeout)
2137
2138	client, err := testEnv.NewClient()
2139	if err != nil {
2140		return nil, nil, nil, "", nil, err
2141	}
2142
2143	adminClient, err := testEnv.NewAdminClient()
2144	if err != nil {
2145		return nil, nil, nil, "", nil, err
2146	}
2147
2148	if testEnv.Config().UseProd {
2149		// TODO: tables may not be successfully deleted in some cases, and will
2150		// become obsolete. We may need a way to automatically delete them.
2151		tableName = tableNameSpace.New()
2152	} else {
2153		tableName = testEnv.Config().Table
2154	}
2155
2156	if err := adminClient.CreateTable(ctx, tableName); err != nil {
2157		cancel()
2158		return nil, nil, nil, "", nil, err
2159	}
2160	if err := adminClient.CreateColumnFamily(ctx, tableName, "follows"); err != nil {
2161		cancel()
2162		return nil, nil, nil, "", nil, err
2163	}
2164
2165	return client, adminClient, client.Open(tableName), tableName, func() {
2166		if err := adminClient.DeleteTable(ctx, tableName); err != nil {
2167			t.Errorf("DeleteTable got error %v", err)
2168		}
2169		cancel()
2170		client.Close()
2171		adminClient.Close()
2172	}, nil
2173}
2174
2175func formatReadItem(ri ReadItem) string {
2176	// Use the column qualifier only to make the test data briefer.
2177	col := ri.Column[strings.Index(ri.Column, ":")+1:]
2178	return fmt.Sprintf("%s-%s-%s", ri.Row, col, ri.Value)
2179}
2180
2181func fill(b, sub []byte) {
2182	for len(b) > len(sub) {
2183		n := copy(b, sub)
2184		b = b[n:]
2185	}
2186}
2187
2188func clearTimestamps(r Row) {
2189	for _, ris := range r {
2190		for i := range ris {
2191			ris[i].Timestamp = 0
2192		}
2193	}
2194}
2195
2196func deleteTable(ctx context.Context, t *testing.T, ac *AdminClient, name string) {
2197	bo := gax.Backoff{
2198		Initial:    100 * time.Millisecond,
2199		Max:        2 * time.Second,
2200		Multiplier: 1.2,
2201	}
2202	ctx, _ = context.WithTimeout(ctx, time.Second*30)
2203
2204	err := internal.Retry(ctx, bo, func() (bool, error) {
2205		err := ac.DeleteTable(ctx, name)
2206		if err != nil {
2207			return true, err
2208		}
2209		return false, nil
2210	})
2211	if err != nil {
2212		t.Logf("DeleteTable: %v", err)
2213	}
2214}
2215