1/*
2Copyright 2019 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package bigtable
18
19import (
20	"context"
21	"flag"
22	"fmt"
23	"math"
24	"math/rand"
25	"sort"
26	"strings"
27	"sync"
28	"testing"
29	"time"
30
31	"cloud.google.com/go/internal/testutil"
32	"cloud.google.com/go/internal/uid"
33	"github.com/golang/protobuf/proto"
34	"google.golang.org/api/iterator"
35	btapb "google.golang.org/genproto/googleapis/bigtable/admin/v2"
36)
37
38var (
39	presidentsSocialGraph = map[string][]string{
40		"wmckinley":   {"tjefferson"},
41		"gwashington": {"jadams"},
42		"tjefferson":  {"gwashington", "jadams"},
43		"jadams":      {"gwashington", "tjefferson"},
44	}
45
46	tableNameSpace = uid.NewSpace("cbt-test", &uid.Options{Short: true})
47)
48
49func populatePresidentsGraph(table *Table) error {
50	ctx := context.Background()
51	for row, ss := range presidentsSocialGraph {
52		mut := NewMutation()
53		for _, name := range ss {
54			mut.Set("follows", name, 1000, []byte("1"))
55		}
56		if err := table.Apply(ctx, row, mut); err != nil {
57			return fmt.Errorf("Mutating row %q: %v", row, err)
58		}
59	}
60	return nil
61}
62
63var instanceToCreate string
64var instanceToCreateZone string
65var instanceToCreateZone2 string
66
67func init() {
68	// Don't test instance creation by default, as quota is necessary and aborted tests could strand resources.
69	flag.StringVar(&instanceToCreate, "it.instance-to-create", "",
70		"The id of an instance to create, update and delete. Requires sufficient Cloud Bigtable quota. Requires that it.use-prod is true.")
71	flag.StringVar(&instanceToCreateZone, "it.instance-to-create-zone", "us-central1-b",
72		"The zone in which to create the new test instance.")
73	flag.StringVar(&instanceToCreateZone2, "it.instance-to-create-zone2", "us-east1-c",
74		"The zone in which to create a second cluster in the test instance.")
75}
76
77func TestIntegration_ConditionalMutations(t *testing.T) {
78	ctx := context.Background()
79	_, _, table, _, cleanup, err := setupIntegration(ctx, t)
80	if err != nil {
81		t.Fatal(err)
82	}
83	defer cleanup()
84
85	if err := populatePresidentsGraph(table); err != nil {
86		t.Fatal(err)
87	}
88
89	// Do a conditional mutation with a complex filter.
90	mutTrue := NewMutation()
91	mutTrue.Set("follows", "wmckinley", 1000, []byte("1"))
92	filter := ChainFilters(ColumnFilter("gwash[iz].*"), ValueFilter("."))
93	mut := NewCondMutation(filter, mutTrue, nil)
94	if err := table.Apply(ctx, "tjefferson", mut); err != nil {
95		t.Fatalf("Conditionally mutating row: %v", err)
96	}
97	// Do a second condition mutation with a filter that does not match,
98	// and thus no changes should be made.
99	mutTrue = NewMutation()
100	mutTrue.DeleteRow()
101	filter = ColumnFilter("snoop.dogg")
102	mut = NewCondMutation(filter, mutTrue, nil)
103	if err := table.Apply(ctx, "tjefferson", mut); err != nil {
104		t.Fatalf("Conditionally mutating row: %v", err)
105	}
106
107	// Fetch a row.
108	row, err := table.ReadRow(ctx, "jadams")
109	if err != nil {
110		t.Fatalf("Reading a row: %v", err)
111	}
112	wantRow := Row{
113		"follows": []ReadItem{
114			{Row: "jadams", Column: "follows:gwashington", Timestamp: 1000, Value: []byte("1")},
115			{Row: "jadams", Column: "follows:tjefferson", Timestamp: 1000, Value: []byte("1")},
116		},
117	}
118	if !testutil.Equal(row, wantRow) {
119		t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow)
120	}
121}
122
123func TestIntegration_PartialReadRows(t *testing.T) {
124	ctx := context.Background()
125	_, _, table, _, cleanup, err := setupIntegration(ctx, t)
126	if err != nil {
127		t.Fatal(err)
128	}
129	defer cleanup()
130
131	if err := populatePresidentsGraph(table); err != nil {
132		t.Fatal(err)
133	}
134
135	// Do a scan and stop part way through.
136	// Verify that the ReadRows callback doesn't keep running.
137	stopped := false
138	err = table.ReadRows(ctx, InfiniteRange(""), func(r Row) bool {
139		if r.Key() < "h" {
140			return true
141		}
142		if !stopped {
143			stopped = true
144			return false
145		}
146		t.Fatalf("ReadRows kept scanning to row %q after being told to stop", r.Key())
147		return false
148	})
149	if err != nil {
150		t.Fatalf("Partial ReadRows: %v", err)
151	}
152}
153
154func TestIntegration_ReadRowList(t *testing.T) {
155	ctx := context.Background()
156	_, _, table, _, cleanup, err := setupIntegration(ctx, t)
157	if err != nil {
158		t.Fatal(err)
159	}
160	defer cleanup()
161
162	if err := populatePresidentsGraph(table); err != nil {
163		t.Fatal(err)
164	}
165
166	// Read a RowList
167	var elt []string
168	keys := RowList{"wmckinley", "gwashington", "jadams"}
169	want := "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,wmckinley-tjefferson-1"
170	err = table.ReadRows(ctx, keys, func(r Row) bool {
171		for _, ris := range r {
172			for _, ri := range ris {
173				elt = append(elt, formatReadItem(ri))
174			}
175		}
176		return true
177	})
178	if err != nil {
179		t.Fatalf("read RowList: %v", err)
180	}
181
182	if got := strings.Join(elt, ","); got != want {
183		t.Fatalf("bulk read: wrong reads.\n got %q\nwant %q", got, want)
184	}
185}
186
187func TestIntegration_DeleteRow(t *testing.T) {
188	ctx := context.Background()
189	_, _, table, _, cleanup, err := setupIntegration(ctx, t)
190	if err != nil {
191		t.Fatal(err)
192	}
193	defer cleanup()
194
195	if err := populatePresidentsGraph(table); err != nil {
196		t.Fatal(err)
197	}
198
199	// Delete a row and check it goes away.
200	mut := NewMutation()
201	mut.DeleteRow()
202	if err := table.Apply(ctx, "wmckinley", mut); err != nil {
203		t.Fatalf("Apply DeleteRow: %v", err)
204	}
205	row, err := table.ReadRow(ctx, "wmckinley")
206	if err != nil {
207		t.Fatalf("Reading a row after DeleteRow: %v", err)
208	}
209	if len(row) != 0 {
210		t.Fatalf("Read non-zero row after DeleteRow: %v", row)
211	}
212}
213
214func TestIntegration_ReadModifyWrite(t *testing.T) {
215	ctx := context.Background()
216	_, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t)
217	if err != nil {
218		t.Fatal(err)
219	}
220	defer cleanup()
221
222	if err := populatePresidentsGraph(table); err != nil {
223		t.Fatal(err)
224	}
225
226	if err := adminClient.CreateColumnFamily(ctx, tableName, "counter"); err != nil {
227		t.Fatalf("Creating column family: %v", err)
228	}
229
230	appendRMW := func(b []byte) *ReadModifyWrite {
231		rmw := NewReadModifyWrite()
232		rmw.AppendValue("counter", "likes", b)
233		return rmw
234	}
235	incRMW := func(n int64) *ReadModifyWrite {
236		rmw := NewReadModifyWrite()
237		rmw.Increment("counter", "likes", n)
238		return rmw
239	}
240	rmwSeq := []struct {
241		desc string
242		rmw  *ReadModifyWrite
243		want []byte
244	}{
245		{
246			desc: "append #1",
247			rmw:  appendRMW([]byte{0, 0, 0}),
248			want: []byte{0, 0, 0},
249		},
250		{
251			desc: "append #2",
252			rmw:  appendRMW([]byte{0, 0, 0, 0, 17}), // the remaining 40 bits to make a big-endian 17
253			want: []byte{0, 0, 0, 0, 0, 0, 0, 17},
254		},
255		{
256			desc: "increment",
257			rmw:  incRMW(8),
258			want: []byte{0, 0, 0, 0, 0, 0, 0, 25},
259		},
260	}
261	for _, step := range rmwSeq {
262		row, err := table.ApplyReadModifyWrite(ctx, "gwashington", step.rmw)
263		if err != nil {
264			t.Fatalf("ApplyReadModifyWrite %+v: %v", step.rmw, err)
265		}
266		// Make sure the modified cell returned by the RMW operation has a timestamp.
267		if row["counter"][0].Timestamp == 0 {
268			t.Fatalf("RMW returned cell timestamp: got %v, want > 0", row["counter"][0].Timestamp)
269		}
270		clearTimestamps(row)
271		wantRow := Row{"counter": []ReadItem{{Row: "gwashington", Column: "counter:likes", Value: step.want}}}
272		if !testutil.Equal(row, wantRow) {
273			t.Fatalf("After %s,\n got %v\nwant %v", step.desc, row, wantRow)
274		}
275	}
276
277	// Check for google-cloud-go/issues/723. RMWs that insert new rows should keep row order sorted in the emulator.
278	_, err = table.ApplyReadModifyWrite(ctx, "issue-723-2", appendRMW([]byte{0}))
279	if err != nil {
280		t.Fatalf("ApplyReadModifyWrite null string: %v", err)
281	}
282	_, err = table.ApplyReadModifyWrite(ctx, "issue-723-1", appendRMW([]byte{0}))
283	if err != nil {
284		t.Fatalf("ApplyReadModifyWrite null string: %v", err)
285	}
286	// Get only the correct row back on read.
287	r, err := table.ReadRow(ctx, "issue-723-1")
288	if err != nil {
289		t.Fatalf("Reading row: %v", err)
290	}
291	if r.Key() != "issue-723-1" {
292		t.Fatalf("ApplyReadModifyWrite: incorrect read after RMW,\n got %v\nwant %v", r.Key(), "issue-723-1")
293	}
294}
295
296func TestIntegration_ArbitraryTimestamps(t *testing.T) {
297	ctx := context.Background()
298	_, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t)
299	if err != nil {
300		t.Fatal(err)
301	}
302	defer cleanup()
303
304	// Test arbitrary timestamps more thoroughly.
305	if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil {
306		t.Fatalf("Creating column family: %v", err)
307	}
308	const numVersions = 4
309	mut := NewMutation()
310	for i := 1; i < numVersions; i++ {
311		// Timestamps are used in thousands because the server
312		// only permits that granularity.
313		mut.Set("ts", "col", Timestamp(i*1000), []byte(fmt.Sprintf("val-%d", i)))
314		mut.Set("ts", "col2", Timestamp(i*1000), []byte(fmt.Sprintf("val-%d", i)))
315	}
316	if err := table.Apply(ctx, "testrow", mut); err != nil {
317		t.Fatalf("Mutating row: %v", err)
318	}
319	r, err := table.ReadRow(ctx, "testrow")
320	if err != nil {
321		t.Fatalf("Reading row: %v", err)
322	}
323	wantRow := Row{"ts": []ReadItem{
324		// These should be returned in descending timestamp order.
325		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
326		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
327		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
328		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
329		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
330		{Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")},
331	}}
332	if !testutil.Equal(r, wantRow) {
333		t.Fatalf("Cell with multiple versions,\n got %v\nwant %v", r, wantRow)
334	}
335
336	// Do the same read, but filter to the latest two versions.
337	r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(2)))
338	if err != nil {
339		t.Fatalf("Reading row: %v", err)
340	}
341	wantRow = Row{"ts": []ReadItem{
342		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
343		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
344		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
345		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
346	}}
347	if !testutil.Equal(r, wantRow) {
348		t.Fatalf("Cell with multiple versions and LatestNFilter(2),\n got %v\nwant %v", r, wantRow)
349	}
350	// Check cell offset / limit
351	r, err = table.ReadRow(ctx, "testrow", RowFilter(CellsPerRowLimitFilter(3)))
352	if err != nil {
353		t.Fatalf("Reading row: %v", err)
354	}
355	wantRow = Row{"ts": []ReadItem{
356		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
357		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
358		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
359	}}
360	if !testutil.Equal(r, wantRow) {
361		t.Fatalf("Cell with multiple versions and CellsPerRowLimitFilter(3),\n got %v\nwant %v", r, wantRow)
362	}
363	r, err = table.ReadRow(ctx, "testrow", RowFilter(CellsPerRowOffsetFilter(3)))
364	if err != nil {
365		t.Fatalf("Reading row: %v", err)
366	}
367	wantRow = Row{"ts": []ReadItem{
368		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
369		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
370		{Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")},
371	}}
372	if !testutil.Equal(r, wantRow) {
373		t.Fatalf("Cell with multiple versions and CellsPerRowOffsetFilter(3),\n got %v\nwant %v", r, wantRow)
374	}
375	// Check timestamp range filtering (with truncation)
376	r, err = table.ReadRow(ctx, "testrow", RowFilter(TimestampRangeFilterMicros(1001, 3000)))
377	if err != nil {
378		t.Fatalf("Reading row: %v", err)
379	}
380	wantRow = Row{"ts": []ReadItem{
381		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
382		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
383		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
384		{Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")},
385	}}
386	if !testutil.Equal(r, wantRow) {
387		t.Fatalf("Cell with multiple versions and TimestampRangeFilter(1000, 3000),\n got %v\nwant %v", r, wantRow)
388	}
389	r, err = table.ReadRow(ctx, "testrow", RowFilter(TimestampRangeFilterMicros(1000, 0)))
390	if err != nil {
391		t.Fatalf("Reading row: %v", err)
392	}
393	wantRow = Row{"ts": []ReadItem{
394		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
395		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
396		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
397		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
398		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
399		{Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")},
400	}}
401	if !testutil.Equal(r, wantRow) {
402		t.Fatalf("Cell with multiple versions and TimestampRangeFilter(1000, 0),\n got %v\nwant %v", r, wantRow)
403	}
404	// Delete non-existing cells, no such column family in this row
405	// Should not delete anything
406	if err := adminClient.CreateColumnFamily(ctx, tableName, "non-existing"); err != nil {
407		t.Fatalf("Creating column family: %v", err)
408	}
409	mut = NewMutation()
410	mut.DeleteTimestampRange("non-existing", "col", 2000, 3000) // half-open interval
411	if err := table.Apply(ctx, "testrow", mut); err != nil {
412		t.Fatalf("Mutating row: %v", err)
413	}
414	r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(3)))
415	if err != nil {
416		t.Fatalf("Reading row: %v", err)
417	}
418	if !testutil.Equal(r, wantRow) {
419		t.Fatalf("Cell was deleted unexpectly,\n got %v\nwant %v", r, wantRow)
420	}
421	// Delete non-existing cells, no such column in this column family
422	// Should not delete anything
423	mut = NewMutation()
424	mut.DeleteTimestampRange("ts", "non-existing", 2000, 3000) // half-open interval
425	if err := table.Apply(ctx, "testrow", mut); err != nil {
426		t.Fatalf("Mutating row: %v", err)
427	}
428	r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(3)))
429	if err != nil {
430		t.Fatalf("Reading row: %v", err)
431	}
432	if !testutil.Equal(r, wantRow) {
433		t.Fatalf("Cell was deleted unexpectly,\n got %v\nwant %v", r, wantRow)
434	}
435	// Delete the cell with timestamp 2000 and repeat the last read,
436	// checking that we get ts 3000 and ts 1000.
437	mut = NewMutation()
438	mut.DeleteTimestampRange("ts", "col", 2001, 3000) // half-open interval
439	if err := table.Apply(ctx, "testrow", mut); err != nil {
440		t.Fatalf("Mutating row: %v", err)
441	}
442	r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(2)))
443	if err != nil {
444		t.Fatalf("Reading row: %v", err)
445	}
446	wantRow = Row{"ts": []ReadItem{
447		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
448		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
449		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
450		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
451	}}
452	if !testutil.Equal(r, wantRow) {
453		t.Fatalf("Cell with multiple versions and LatestNFilter(2), after deleting timestamp 2000,\n got %v\nwant %v", r, wantRow)
454	}
455
456	// Check DeleteCellsInFamily
457	if err := adminClient.CreateColumnFamily(ctx, tableName, "status"); err != nil {
458		t.Fatalf("Creating column family: %v", err)
459	}
460
461	mut = NewMutation()
462	mut.Set("status", "start", 2000, []byte("2"))
463	mut.Set("status", "end", 3000, []byte("3"))
464	mut.Set("ts", "col", 1000, []byte("1"))
465	if err := table.Apply(ctx, "row1", mut); err != nil {
466		t.Fatalf("Mutating row: %v", err)
467	}
468	if err := table.Apply(ctx, "row2", mut); err != nil {
469		t.Fatalf("Mutating row: %v", err)
470	}
471
472	mut = NewMutation()
473	mut.DeleteCellsInFamily("status")
474	if err := table.Apply(ctx, "row1", mut); err != nil {
475		t.Fatalf("Delete cf: %v", err)
476	}
477
478	// ColumnFamily removed
479	r, err = table.ReadRow(ctx, "row1")
480	if err != nil {
481		t.Fatalf("Reading row: %v", err)
482	}
483	wantRow = Row{"ts": []ReadItem{
484		{Row: "row1", Column: "ts:col", Timestamp: 1000, Value: []byte("1")},
485	}}
486	if !testutil.Equal(r, wantRow) {
487		t.Fatalf("column family was not deleted.\n got %v\n want %v", r, wantRow)
488	}
489
490	// ColumnFamily not removed
491	r, err = table.ReadRow(ctx, "row2")
492	if err != nil {
493		t.Fatalf("Reading row: %v", err)
494	}
495	wantRow = Row{
496		"ts": []ReadItem{
497			{Row: "row2", Column: "ts:col", Timestamp: 1000, Value: []byte("1")},
498		},
499		"status": []ReadItem{
500			{Row: "row2", Column: "status:end", Timestamp: 3000, Value: []byte("3")},
501			{Row: "row2", Column: "status:start", Timestamp: 2000, Value: []byte("2")},
502		},
503	}
504	if !testutil.Equal(r, wantRow) {
505		t.Fatalf("Column family was deleted unexpectedly.\n got %v\n want %v", r, wantRow)
506	}
507
508	// Check DeleteCellsInColumn
509	mut = NewMutation()
510	mut.Set("status", "start", 2000, []byte("2"))
511	mut.Set("status", "middle", 3000, []byte("3"))
512	mut.Set("status", "end", 1000, []byte("1"))
513	if err := table.Apply(ctx, "row3", mut); err != nil {
514		t.Fatalf("Mutating row: %v", err)
515	}
516	mut = NewMutation()
517	mut.DeleteCellsInColumn("status", "middle")
518	if err := table.Apply(ctx, "row3", mut); err != nil {
519		t.Fatalf("Delete column: %v", err)
520	}
521	r, err = table.ReadRow(ctx, "row3")
522	if err != nil {
523		t.Fatalf("Reading row: %v", err)
524	}
525	wantRow = Row{
526		"status": []ReadItem{
527			{Row: "row3", Column: "status:end", Timestamp: 1000, Value: []byte("1")},
528			{Row: "row3", Column: "status:start", Timestamp: 2000, Value: []byte("2")},
529		},
530	}
531	if !testutil.Equal(r, wantRow) {
532		t.Fatalf("Column was not deleted.\n got %v\n want %v", r, wantRow)
533	}
534	mut = NewMutation()
535	mut.DeleteCellsInColumn("status", "start")
536	if err := table.Apply(ctx, "row3", mut); err != nil {
537		t.Fatalf("Delete column: %v", err)
538	}
539	r, err = table.ReadRow(ctx, "row3")
540	if err != nil {
541		t.Fatalf("Reading row: %v", err)
542	}
543	wantRow = Row{
544		"status": []ReadItem{
545			{Row: "row3", Column: "status:end", Timestamp: 1000, Value: []byte("1")},
546		},
547	}
548	if !testutil.Equal(r, wantRow) {
549		t.Fatalf("Column was not deleted.\n got %v\n want %v", r, wantRow)
550	}
551	mut = NewMutation()
552	mut.DeleteCellsInColumn("status", "end")
553	if err := table.Apply(ctx, "row3", mut); err != nil {
554		t.Fatalf("Delete column: %v", err)
555	}
556	r, err = table.ReadRow(ctx, "row3")
557	if err != nil {
558		t.Fatalf("Reading row: %v", err)
559	}
560	if len(r) != 0 {
561		t.Fatalf("Delete column: got %v, want empty row", r)
562	}
563	// Add same cell after delete
564	mut = NewMutation()
565	mut.Set("status", "end", 1000, []byte("1"))
566	if err := table.Apply(ctx, "row3", mut); err != nil {
567		t.Fatalf("Mutating row: %v", err)
568	}
569	r, err = table.ReadRow(ctx, "row3")
570	if err != nil {
571		t.Fatalf("Reading row: %v", err)
572	}
573	if !testutil.Equal(r, wantRow) {
574		t.Fatalf("Column was not deleted correctly.\n got %v\n want %v", r, wantRow)
575	}
576}
577
578func TestIntegration_HighlyConcurrentReadsAndWrites(t *testing.T) {
579	ctx := context.Background()
580	_, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t)
581	if err != nil {
582		t.Fatal(err)
583	}
584	defer cleanup()
585
586	if err := populatePresidentsGraph(table); err != nil {
587		t.Fatal(err)
588	}
589
590	if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil {
591		t.Fatalf("Creating column family: %v", err)
592	}
593
594	// Do highly concurrent reads/writes.
595	const maxConcurrency = 1000
596	var wg sync.WaitGroup
597	for i := 0; i < maxConcurrency; i++ {
598		wg.Add(1)
599		go func() {
600			defer wg.Done()
601			switch r := rand.Intn(100); { // r ∈ [0,100)
602			case 0 <= r && r < 30:
603				// Do a read.
604				_, err := table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(1)))
605				if err != nil {
606					t.Errorf("Concurrent read: %v", err)
607				}
608			case 30 <= r && r < 100:
609				// Do a write.
610				mut := NewMutation()
611				mut.Set("ts", "col", 1000, []byte("data"))
612				if err := table.Apply(ctx, "testrow", mut); err != nil {
613					t.Errorf("Concurrent write: %v", err)
614				}
615			}
616		}()
617	}
618	wg.Wait()
619}
620
621func TestIntegration_LargeReadsWritesAndScans(t *testing.T) {
622	ctx := context.Background()
623	_, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t)
624	if err != nil {
625		t.Fatal(err)
626	}
627	defer cleanup()
628
629	if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil {
630		t.Fatalf("Creating column family: %v", err)
631	}
632
633	bigBytes := make([]byte, 5<<20) // 5 MB is larger than current default gRPC max of 4 MB, but less than the max we set.
634	nonsense := []byte("lorem ipsum dolor sit amet, ")
635	fill(bigBytes, nonsense)
636	mut := NewMutation()
637	mut.Set("ts", "col", 1000, bigBytes)
638	if err := table.Apply(ctx, "bigrow", mut); err != nil {
639		t.Fatalf("Big write: %v", err)
640	}
641	r, err := table.ReadRow(ctx, "bigrow")
642	if err != nil {
643		t.Fatalf("Big read: %v", err)
644	}
645	wantRow := Row{"ts": []ReadItem{
646		{Row: "bigrow", Column: "ts:col", Timestamp: 1000, Value: bigBytes},
647	}}
648	if !testutil.Equal(r, wantRow) {
649		t.Fatalf("Big read returned incorrect bytes: %v", r)
650	}
651
652	var wg sync.WaitGroup
653	// Now write 1000 rows, each with 82 KB values, then scan them all.
654	medBytes := make([]byte, 82<<10)
655	fill(medBytes, nonsense)
656	sem := make(chan int, 50) // do up to 50 mutations at a time.
657	for i := 0; i < 1000; i++ {
658		mut := NewMutation()
659		mut.Set("ts", "big-scan", 1000, medBytes)
660		row := fmt.Sprintf("row-%d", i)
661		wg.Add(1)
662		go func() {
663			defer wg.Done()
664			defer func() { <-sem }()
665			sem <- 1
666			if err := table.Apply(ctx, row, mut); err != nil {
667				t.Errorf("Preparing large scan: %v", err)
668			}
669		}()
670	}
671	wg.Wait()
672	n := 0
673	err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool {
674		for _, ris := range r {
675			for _, ri := range ris {
676				n += len(ri.Value)
677			}
678		}
679		return true
680	}, RowFilter(ColumnFilter("big-scan")))
681	if err != nil {
682		t.Fatalf("Doing large scan: %v", err)
683	}
684	if want := 1000 * len(medBytes); n != want {
685		t.Fatalf("Large scan returned %d bytes, want %d", n, want)
686	}
687	// Scan a subset of the 1000 rows that we just created, using a LimitRows ReadOption.
688	rc := 0
689	wantRc := 3
690	err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool {
691		rc++
692		return true
693	}, LimitRows(int64(wantRc)))
694	if err != nil {
695		t.Fatal(err)
696	}
697	if rc != wantRc {
698		t.Fatalf("Scan with row limit returned %d rows, want %d", rc, wantRc)
699	}
700
701	// Test bulk mutations
702	if err := adminClient.CreateColumnFamily(ctx, tableName, "bulk"); err != nil {
703		t.Fatalf("Creating column family: %v", err)
704	}
705	bulkData := map[string][]string{
706		"red sox":  {"2004", "2007", "2013"},
707		"patriots": {"2001", "2003", "2004", "2014"},
708		"celtics":  {"1981", "1984", "1986", "2008"},
709	}
710	var rowKeys []string
711	var muts []*Mutation
712	for row, ss := range bulkData {
713		mut := NewMutation()
714		for _, name := range ss {
715			mut.Set("bulk", name, 1000, []byte("1"))
716		}
717		rowKeys = append(rowKeys, row)
718		muts = append(muts, mut)
719	}
720	status, err := table.ApplyBulk(ctx, rowKeys, muts)
721	if err != nil {
722		t.Fatalf("Bulk mutating rows %q: %v", rowKeys, err)
723	}
724	if status != nil {
725		t.Fatalf("non-nil errors: %v", err)
726	}
727
728	// Read each row back
729	for rowKey, ss := range bulkData {
730		row, err := table.ReadRow(ctx, rowKey)
731		if err != nil {
732			t.Fatalf("Reading a bulk row: %v", err)
733		}
734		var wantItems []ReadItem
735		for _, val := range ss {
736			wantItems = append(wantItems, ReadItem{Row: rowKey, Column: "bulk:" + val, Timestamp: 1000, Value: []byte("1")})
737		}
738		wantRow := Row{"bulk": wantItems}
739		if !testutil.Equal(row, wantRow) {
740			t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow)
741		}
742	}
743
744	// Test bulk write errors.
745	// Note: Setting timestamps as ServerTime makes sure the mutations are not retried on error.
746	badMut := NewMutation()
747	badMut.Set("badfamily", "col", ServerTime, nil)
748	badMut2 := NewMutation()
749	badMut2.Set("badfamily2", "goodcol", ServerTime, []byte("1"))
750	status, err = table.ApplyBulk(ctx, []string{"badrow", "badrow2"}, []*Mutation{badMut, badMut2})
751	if err != nil {
752		t.Fatalf("Bulk mutating rows %q: %v", rowKeys, err)
753	}
754	if status == nil {
755		t.Fatalf("No errors for bad bulk mutation")
756	} else if status[0] == nil || status[1] == nil {
757		t.Fatalf("No error for bad bulk mutation")
758	}
759}
760
761func TestIntegration_Read(t *testing.T) {
762	ctx := context.Background()
763	_, _, table, _, cleanup, err := setupIntegration(ctx, t)
764	if err != nil {
765		t.Fatal(err)
766	}
767	defer cleanup()
768
769	// Insert some data.
770	initialData := map[string][]string{
771		"wmckinley":   {"tjefferson"},
772		"gwashington": {"jadams"},
773		"tjefferson":  {"gwashington", "jadams", "wmckinley"},
774		"jadams":      {"gwashington", "tjefferson"},
775	}
776	for row, ss := range initialData {
777		mut := NewMutation()
778		for _, name := range ss {
779			mut.Set("follows", name, 1000, []byte("1"))
780		}
781		if err := table.Apply(ctx, row, mut); err != nil {
782			t.Fatalf("Mutating row %q: %v", row, err)
783		}
784	}
785
786	for _, test := range []struct {
787		desc   string
788		rr     RowSet
789		filter Filter     // may be nil
790		limit  ReadOption // may be nil
791
792		// We do the read, grab all the cells, turn them into "<row>-<col>-<val>",
793		// and join with a comma.
794		want string
795	}{
796		{
797			desc: "read all, unfiltered",
798			rr:   RowRange{},
799			want: "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,tjefferson-gwashington-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1",
800		},
801		{
802			desc: "read with InfiniteRange, unfiltered",
803			rr:   InfiniteRange("tjefferson"),
804			want: "tjefferson-gwashington-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1",
805		},
806		{
807			desc: "read with NewRange, unfiltered",
808			rr:   NewRange("gargamel", "hubbard"),
809			want: "gwashington-jadams-1",
810		},
811		{
812			desc: "read with PrefixRange, unfiltered",
813			rr:   PrefixRange("jad"),
814			want: "jadams-gwashington-1,jadams-tjefferson-1",
815		},
816		{
817			desc: "read with SingleRow, unfiltered",
818			rr:   SingleRow("wmckinley"),
819			want: "wmckinley-tjefferson-1",
820		},
821		{
822			desc:   "read all, with ColumnFilter",
823			rr:     RowRange{},
824			filter: ColumnFilter(".*j.*"), // matches "jadams" and "tjefferson"
825			want:   "gwashington-jadams-1,jadams-tjefferson-1,tjefferson-jadams-1,wmckinley-tjefferson-1",
826		},
827		{
828			desc:   "read all, with ColumnFilter, prefix",
829			rr:     RowRange{},
830			filter: ColumnFilter("j"), // no matches
831			want:   "",
832		},
833		{
834			desc:   "read range, with ColumnRangeFilter",
835			rr:     RowRange{},
836			filter: ColumnRangeFilter("follows", "h", "k"),
837			want:   "gwashington-jadams-1,tjefferson-jadams-1",
838		},
839		{
840			desc:   "read range from empty, with ColumnRangeFilter",
841			rr:     RowRange{},
842			filter: ColumnRangeFilter("follows", "", "u"),
843			want:   "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,tjefferson-gwashington-1,tjefferson-jadams-1,wmckinley-tjefferson-1",
844		},
845		{
846			desc:   "read range from start to empty, with ColumnRangeFilter",
847			rr:     RowRange{},
848			filter: ColumnRangeFilter("follows", "h", ""),
849			want:   "gwashington-jadams-1,jadams-tjefferson-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1",
850		},
851		{
852			desc:   "read with RowKeyFilter",
853			rr:     RowRange{},
854			filter: RowKeyFilter(".*wash.*"),
855			want:   "gwashington-jadams-1",
856		},
857		{
858			desc:   "read with RowKeyFilter, prefix",
859			rr:     RowRange{},
860			filter: RowKeyFilter("gwash"),
861			want:   "",
862		},
863		{
864			desc:   "read with RowKeyFilter, no matches",
865			rr:     RowRange{},
866			filter: RowKeyFilter(".*xxx.*"),
867			want:   "",
868		},
869		{
870			desc:   "read with FamilyFilter, no matches",
871			rr:     RowRange{},
872			filter: FamilyFilter(".*xxx.*"),
873			want:   "",
874		},
875		{
876			desc:   "read with ColumnFilter + row limit",
877			rr:     RowRange{},
878			filter: ColumnFilter(".*j.*"), // matches "jadams" and "tjefferson"
879			limit:  LimitRows(2),
880			want:   "gwashington-jadams-1,jadams-tjefferson-1",
881		},
882		{
883			desc:   "read all, strip values",
884			rr:     RowRange{},
885			filter: StripValueFilter(),
886			want:   "gwashington-jadams-,jadams-gwashington-,jadams-tjefferson-,tjefferson-gwashington-,tjefferson-jadams-,tjefferson-wmckinley-,wmckinley-tjefferson-",
887		},
888		{
889			desc:   "read with ColumnFilter + row limit + strip values",
890			rr:     RowRange{},
891			filter: ChainFilters(ColumnFilter(".*j.*"), StripValueFilter()), // matches "jadams" and "tjefferson"
892			limit:  LimitRows(2),
893			want:   "gwashington-jadams-,jadams-tjefferson-",
894		},
895		{
896			desc:   "read with condition, strip values on true",
897			rr:     RowRange{},
898			filter: ConditionFilter(ColumnFilter(".*j.*"), StripValueFilter(), nil),
899			want:   "gwashington-jadams-,jadams-gwashington-,jadams-tjefferson-,tjefferson-gwashington-,tjefferson-jadams-,tjefferson-wmckinley-,wmckinley-tjefferson-",
900		},
901		{
902			desc:   "read with condition, strip values on false",
903			rr:     RowRange{},
904			filter: ConditionFilter(ColumnFilter(".*xxx.*"), nil, StripValueFilter()),
905			want:   "gwashington-jadams-,jadams-gwashington-,jadams-tjefferson-,tjefferson-gwashington-,tjefferson-jadams-,tjefferson-wmckinley-,wmckinley-tjefferson-",
906		},
907		{
908			desc:   "read with ValueRangeFilter + row limit",
909			rr:     RowRange{},
910			filter: ValueRangeFilter([]byte("1"), []byte("5")), // matches our value of "1"
911			limit:  LimitRows(2),
912			want:   "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1",
913		},
914		{
915			desc:   "read with ValueRangeFilter, no match on exclusive end",
916			rr:     RowRange{},
917			filter: ValueRangeFilter([]byte("0"), []byte("1")), // no match
918			want:   "",
919		},
920		{
921			desc:   "read with ValueRangeFilter, no matches",
922			rr:     RowRange{},
923			filter: ValueRangeFilter([]byte("3"), []byte("5")), // matches nothing
924			want:   "",
925		},
926		{
927			desc:   "read with InterleaveFilter, no matches on all filters",
928			rr:     RowRange{},
929			filter: InterleaveFilters(ColumnFilter(".*x.*"), ColumnFilter(".*z.*")),
930			want:   "",
931		},
932		{
933			desc:   "read with InterleaveFilter, no duplicate cells",
934			rr:     RowRange{},
935			filter: InterleaveFilters(ColumnFilter(".*g.*"), ColumnFilter(".*j.*")),
936			want:   "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,tjefferson-gwashington-1,tjefferson-jadams-1,wmckinley-tjefferson-1",
937		},
938		{
939			desc:   "read with InterleaveFilter, with duplicate cells",
940			rr:     RowRange{},
941			filter: InterleaveFilters(ColumnFilter(".*g.*"), ColumnFilter(".*g.*")),
942			want:   "jadams-gwashington-1,jadams-gwashington-1,tjefferson-gwashington-1,tjefferson-gwashington-1",
943		},
944		{
945			desc: "read with a RowRangeList and no filter",
946			rr:   RowRangeList{NewRange("gargamel", "hubbard"), InfiniteRange("wmckinley")},
947			want: "gwashington-jadams-1,wmckinley-tjefferson-1",
948		},
949		{
950			desc:   "chain that excludes rows and matches nothing, in a condition",
951			rr:     RowRange{},
952			filter: ConditionFilter(ChainFilters(ColumnFilter(".*j.*"), ColumnFilter(".*mckinley.*")), StripValueFilter(), nil),
953			want:   "",
954		},
955		{
956			desc:   "chain that ends with an interleave that has no match. covers #804",
957			rr:     RowRange{},
958			filter: ConditionFilter(ChainFilters(ColumnFilter(".*j.*"), InterleaveFilters(ColumnFilter(".*x.*"), ColumnFilter(".*z.*"))), StripValueFilter(), nil),
959			want:   "",
960		},
961	} {
962		t.Run(test.desc, func(t *testing.T) {
963			var opts []ReadOption
964			if test.filter != nil {
965				opts = append(opts, RowFilter(test.filter))
966			}
967			if test.limit != nil {
968				opts = append(opts, test.limit)
969			}
970			var elt []string
971			err := table.ReadRows(ctx, test.rr, func(r Row) bool {
972				for _, ris := range r {
973					for _, ri := range ris {
974						elt = append(elt, formatReadItem(ri))
975					}
976				}
977				return true
978			}, opts...)
979			if err != nil {
980				t.Fatal(err)
981			}
982			if got := strings.Join(elt, ","); got != test.want {
983				t.Fatalf("got %q\nwant %q", got, test.want)
984			}
985		})
986	}
987}
988
989func TestIntegration_SampleRowKeys(t *testing.T) {
990	ctx := context.Background()
991	_, _, table, _, cleanup, err := setupIntegration(ctx, t)
992	if err != nil {
993		t.Fatal(err)
994	}
995	defer cleanup()
996
997	// Insert some data.
998	initialData := map[string][]string{
999		"wmckinley11":   {"tjefferson11"},
1000		"gwashington77": {"jadams77"},
1001		"tjefferson0":   {"gwashington0", "jadams0"},
1002	}
1003
1004	for row, ss := range initialData {
1005		mut := NewMutation()
1006		for _, name := range ss {
1007			mut.Set("follows", name, 1000, []byte("1"))
1008		}
1009		if err := table.Apply(ctx, row, mut); err != nil {
1010			t.Fatalf("Mutating row %q: %v", row, err)
1011		}
1012	}
1013	sampleKeys, err := table.SampleRowKeys(context.Background())
1014	if err != nil {
1015		t.Fatalf("%s: %v", "SampleRowKeys:", err)
1016	}
1017	if len(sampleKeys) == 0 {
1018		t.Error("SampleRowKeys length 0")
1019	}
1020}
1021
1022func TestIntegration_Admin(t *testing.T) {
1023	testEnv, err := NewIntegrationEnv()
1024	if err != nil {
1025		t.Fatalf("IntegrationEnv: %v", err)
1026	}
1027	defer testEnv.Close()
1028
1029	timeout := 2 * time.Second
1030	if testEnv.Config().UseProd {
1031		timeout = 5 * time.Minute
1032	}
1033	ctx, _ := context.WithTimeout(context.Background(), timeout)
1034
1035	adminClient, err := testEnv.NewAdminClient()
1036	if err != nil {
1037		t.Fatalf("NewAdminClient: %v", err)
1038	}
1039	defer adminClient.Close()
1040
1041	iAdminClient, err := testEnv.NewInstanceAdminClient()
1042	if err != nil {
1043		t.Fatalf("NewInstanceAdminClient: %v", err)
1044	}
1045	if iAdminClient != nil {
1046		defer iAdminClient.Close()
1047
1048		iInfo, err := iAdminClient.InstanceInfo(ctx, adminClient.instance)
1049		if err != nil {
1050			t.Errorf("InstanceInfo: %v", err)
1051		}
1052		if iInfo.Name != adminClient.instance {
1053			t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance)
1054		}
1055	}
1056
1057	list := func() []string {
1058		tbls, err := adminClient.Tables(ctx)
1059		if err != nil {
1060			t.Fatalf("Fetching list of tables: %v", err)
1061		}
1062		sort.Strings(tbls)
1063		return tbls
1064	}
1065	containsAll := func(got, want []string) bool {
1066		gotSet := make(map[string]bool)
1067
1068		for _, s := range got {
1069			gotSet[s] = true
1070		}
1071		for _, s := range want {
1072			if !gotSet[s] {
1073				return false
1074			}
1075		}
1076		return true
1077	}
1078
1079	defer adminClient.DeleteTable(ctx, "mytable")
1080
1081	if err := adminClient.CreateTable(ctx, "mytable"); err != nil {
1082		t.Fatalf("Creating table: %v", err)
1083	}
1084
1085	defer adminClient.DeleteTable(ctx, "myothertable")
1086
1087	if err := adminClient.CreateTable(ctx, "myothertable"); err != nil {
1088		t.Fatalf("Creating table: %v", err)
1089	}
1090
1091	if got, want := list(), []string{"myothertable", "mytable"}; !containsAll(got, want) {
1092		t.Errorf("adminClient.Tables returned %#v, want %#v", got, want)
1093	}
1094
1095	must(adminClient.WaitForReplication(ctx, "mytable"))
1096
1097	if err := adminClient.DeleteTable(ctx, "myothertable"); err != nil {
1098		t.Fatalf("Deleting table: %v", err)
1099	}
1100	tables := list()
1101	if got, want := tables, []string{"mytable"}; !containsAll(got, want) {
1102		t.Errorf("adminClient.Tables returned %#v, want %#v", got, want)
1103	}
1104	if got, unwanted := tables, []string{"myothertable"}; containsAll(got, unwanted) {
1105		t.Errorf("adminClient.Tables return %#v. unwanted %#v", got, unwanted)
1106	}
1107
1108	tblConf := TableConf{
1109		TableID: "conftable",
1110		Families: map[string]GCPolicy{
1111			"fam1": MaxVersionsPolicy(1),
1112			"fam2": MaxVersionsPolicy(2),
1113		},
1114	}
1115	if err := adminClient.CreateTableFromConf(ctx, &tblConf); err != nil {
1116		t.Fatalf("Creating table from TableConf: %v", err)
1117	}
1118	defer adminClient.DeleteTable(ctx, tblConf.TableID)
1119
1120	tblInfo, err := adminClient.TableInfo(ctx, tblConf.TableID)
1121	if err != nil {
1122		t.Fatalf("Getting table info: %v", err)
1123	}
1124	sort.Strings(tblInfo.Families)
1125	wantFams := []string{"fam1", "fam2"}
1126	if !testutil.Equal(tblInfo.Families, wantFams) {
1127		t.Errorf("Column family mismatch, got %v, want %v", tblInfo.Families, wantFams)
1128	}
1129
1130	// Populate mytable and drop row ranges
1131	if err = adminClient.CreateColumnFamily(ctx, "mytable", "cf"); err != nil {
1132		t.Fatalf("Creating column family: %v", err)
1133	}
1134
1135	client, err := testEnv.NewClient()
1136	if err != nil {
1137		t.Fatalf("NewClient: %v", err)
1138	}
1139	defer client.Close()
1140
1141	tbl := client.Open("mytable")
1142
1143	prefixes := []string{"a", "b", "c"}
1144	for _, prefix := range prefixes {
1145		for i := 0; i < 5; i++ {
1146			mut := NewMutation()
1147			mut.Set("cf", "col", 1000, []byte("1"))
1148			if err := tbl.Apply(ctx, fmt.Sprintf("%v-%v", prefix, i), mut); err != nil {
1149				t.Fatalf("Mutating row: %v", err)
1150			}
1151		}
1152	}
1153
1154	if err = adminClient.DropRowRange(ctx, "mytable", "a"); err != nil {
1155		t.Errorf("DropRowRange a: %v", err)
1156	}
1157	if err = adminClient.DropRowRange(ctx, "mytable", "c"); err != nil {
1158		t.Errorf("DropRowRange c: %v", err)
1159	}
1160	if err = adminClient.DropRowRange(ctx, "mytable", "x"); err != nil {
1161		t.Errorf("DropRowRange x: %v", err)
1162	}
1163
1164	var gotRowCount int
1165	must(tbl.ReadRows(ctx, RowRange{}, func(row Row) bool {
1166		gotRowCount++
1167		if !strings.HasPrefix(row.Key(), "b") {
1168			t.Errorf("Invalid row after dropping range: %v", row)
1169		}
1170		return true
1171	}))
1172	if gotRowCount != 5 {
1173		t.Errorf("Invalid row count after dropping range: got %v, want %v", gotRowCount, 5)
1174	}
1175
1176	if err = adminClient.DropAllRows(ctx, "mytable"); err != nil {
1177		t.Errorf("DropAllRows mytable: %v", err)
1178	}
1179
1180	gotRowCount = 0
1181	must(tbl.ReadRows(ctx, RowRange{}, func(row Row) bool {
1182		gotRowCount++
1183		return true
1184	}))
1185	if gotRowCount != 0 {
1186		t.Errorf("Invalid row count after truncating table: got %v, want %v", gotRowCount, 0)
1187	}
1188}
1189
1190func TestIntegration_TableIam(t *testing.T) {
1191	testEnv, err := NewIntegrationEnv()
1192	if err != nil {
1193		t.Fatalf("IntegrationEnv: %v", err)
1194	}
1195	defer testEnv.Close()
1196
1197	if !testEnv.Config().UseProd {
1198		t.Skip("emulator doesn't support IAM Policy creation")
1199	}
1200
1201	timeout := 5 * time.Minute
1202	ctx, _ := context.WithTimeout(context.Background(), timeout)
1203
1204	adminClient, err := testEnv.NewAdminClient()
1205	if err != nil {
1206		t.Fatalf("NewAdminClient: %v", err)
1207	}
1208	defer adminClient.Close()
1209
1210	defer adminClient.DeleteTable(ctx, "mytable")
1211	if err := adminClient.CreateTable(ctx, "mytable"); err != nil {
1212		t.Fatalf("Creating table: %v", err)
1213	}
1214
1215	// Verify that the IAM Controls work for Tables.
1216	iamHandle := adminClient.TableIAM("mytable")
1217	p, err := iamHandle.Policy(ctx)
1218	if err != nil {
1219		t.Errorf("Iam GetPolicy mytable: %v", err)
1220	}
1221	if err = iamHandle.SetPolicy(ctx, p); err != nil {
1222		t.Errorf("Iam SetPolicy mytable: %v", err)
1223	}
1224	if _, err = iamHandle.TestPermissions(ctx, []string{"bigtable.tables.get"}); err != nil {
1225		t.Errorf("Iam TestPermissions mytable: %v", err)
1226	}
1227}
1228
1229func TestIntegration_AdminCreateInstance(t *testing.T) {
1230	if instanceToCreate == "" {
1231		t.Skip("instanceToCreate not set, skipping instance creation testing")
1232	}
1233
1234	testEnv, err := NewIntegrationEnv()
1235	if err != nil {
1236		t.Fatalf("IntegrationEnv: %v", err)
1237	}
1238	defer testEnv.Close()
1239
1240	if !testEnv.Config().UseProd {
1241		t.Skip("emulator doesn't support instance creation")
1242	}
1243
1244	timeout := 5 * time.Minute
1245	ctx, _ := context.WithTimeout(context.Background(), timeout)
1246
1247	iAdminClient, err := testEnv.NewInstanceAdminClient()
1248	if err != nil {
1249		t.Fatalf("NewInstanceAdminClient: %v", err)
1250	}
1251	defer iAdminClient.Close()
1252
1253	clusterID := instanceToCreate + "-cluster"
1254
1255	// Create a development instance
1256	conf := &InstanceConf{
1257		InstanceId:   instanceToCreate,
1258		ClusterId:    clusterID,
1259		DisplayName:  "test instance",
1260		Zone:         instanceToCreateZone,
1261		InstanceType: DEVELOPMENT,
1262	}
1263	if err := iAdminClient.CreateInstance(ctx, conf); err != nil {
1264		t.Fatalf("CreateInstance: %v", err)
1265	}
1266	defer iAdminClient.DeleteInstance(ctx, instanceToCreate)
1267
1268	iInfo, err := iAdminClient.InstanceInfo(ctx, instanceToCreate)
1269	if err != nil {
1270		t.Fatalf("InstanceInfo: %v", err)
1271	}
1272
1273	// Basic return values are tested elsewhere, check instance type
1274	if iInfo.InstanceType != DEVELOPMENT {
1275		t.Fatalf("Instance is not DEVELOPMENT: %v", err)
1276	}
1277
1278	// Update everything we can about the instance in one call.
1279	confWithClusters := &InstanceWithClustersConfig{
1280		InstanceID:   instanceToCreate,
1281		DisplayName:  "new display name",
1282		InstanceType: PRODUCTION,
1283		Clusters: []ClusterConfig{
1284			{ClusterID: clusterID, NumNodes: 5, StorageType: HDD}},
1285	}
1286
1287	if err = iAdminClient.UpdateInstanceWithClusters(ctx, confWithClusters); err != nil {
1288		t.Fatalf("UpdateInstanceWithClusters: %v", err)
1289	}
1290
1291	iInfo, err = iAdminClient.InstanceInfo(ctx, instanceToCreate)
1292	if err != nil {
1293		t.Fatalf("InstanceInfo: %v", err)
1294	}
1295
1296	if iInfo.InstanceType != PRODUCTION {
1297		t.Fatalf("Instance type is not PRODUCTION: %v", err)
1298	}
1299	if got, want := iInfo.DisplayName, confWithClusters.DisplayName; got != want {
1300		t.Fatalf("Display name: %q, want: %q", got, want)
1301	}
1302
1303	cInfo, err := iAdminClient.GetCluster(ctx, instanceToCreate, clusterID)
1304	if err != nil {
1305		t.Fatalf("GetCluster: %v", err)
1306	}
1307
1308	if cInfo.ServeNodes != 5 {
1309		t.Fatalf("NumNodes: %v, want: %v", cInfo.ServeNodes, 5)
1310	}
1311
1312	if cInfo.StorageType != HDD {
1313		t.Fatalf("StorageType: %v, want: %v", cInfo.StorageType, HDD)
1314	}
1315}
1316
1317func TestIntegration_AdminUpdateInstanceAndSyncClusters(t *testing.T) {
1318	if instanceToCreate == "" {
1319		t.Skip("instanceToCreate not set, skipping instance update testing")
1320	}
1321
1322	testEnv, err := NewIntegrationEnv()
1323	if err != nil {
1324		t.Fatalf("IntegrationEnv: %v", err)
1325	}
1326	defer testEnv.Close()
1327
1328	if !testEnv.Config().UseProd {
1329		t.Skip("emulator doesn't support instance creation")
1330	}
1331
1332	timeout := 5 * time.Minute
1333	ctx, _ := context.WithTimeout(context.Background(), timeout)
1334
1335	iAdminClient, err := testEnv.NewInstanceAdminClient()
1336	if err != nil {
1337		t.Fatalf("NewInstanceAdminClient: %v", err)
1338	}
1339	defer iAdminClient.Close()
1340
1341	clusterID := instanceToCreate + "-cluster"
1342
1343	// Create a development instance
1344	conf := &InstanceConf{
1345		InstanceId:   instanceToCreate,
1346		ClusterId:    clusterID,
1347		DisplayName:  "test instance",
1348		Zone:         instanceToCreateZone,
1349		InstanceType: DEVELOPMENT,
1350	}
1351	if err := iAdminClient.CreateInstance(ctx, conf); err != nil {
1352		t.Fatalf("CreateInstance: %v", err)
1353	}
1354	defer iAdminClient.DeleteInstance(ctx, instanceToCreate)
1355
1356	iInfo, err := iAdminClient.InstanceInfo(ctx, instanceToCreate)
1357	if err != nil {
1358		t.Fatalf("InstanceInfo: %v", err)
1359	}
1360
1361	// Basic return values are tested elsewhere, check instance type
1362	if iInfo.InstanceType != DEVELOPMENT {
1363		t.Fatalf("Instance is not DEVELOPMENT: %v", err)
1364	}
1365
1366	// Update everything we can about the instance in one call.
1367	confWithClusters := &InstanceWithClustersConfig{
1368		InstanceID:   instanceToCreate,
1369		DisplayName:  "new display name",
1370		InstanceType: PRODUCTION,
1371		Clusters: []ClusterConfig{
1372			{ClusterID: clusterID, NumNodes: 5}},
1373	}
1374
1375	results, err := UpdateInstanceAndSyncClusters(ctx, iAdminClient, confWithClusters)
1376	if err != nil {
1377		t.Fatalf("UpdateInstanceAndSyncClusters: %v", err)
1378	}
1379
1380	wantResults := UpdateInstanceResults{
1381		InstanceUpdated: true,
1382		UpdatedClusters: []string{clusterID},
1383	}
1384	if diff := testutil.Diff(*results, wantResults); diff != "" {
1385		t.Fatalf("UpdateInstanceResults: got - want +\n%s", diff)
1386	}
1387
1388	iInfo, err = iAdminClient.InstanceInfo(ctx, instanceToCreate)
1389	if err != nil {
1390		t.Fatalf("InstanceInfo: %v", err)
1391	}
1392
1393	if iInfo.InstanceType != PRODUCTION {
1394		t.Fatalf("Instance type is not PRODUCTION: %v", err)
1395	}
1396	if got, want := iInfo.DisplayName, confWithClusters.DisplayName; got != want {
1397		t.Fatalf("Display name: %q, want: %q", got, want)
1398	}
1399
1400	cInfo, err := iAdminClient.GetCluster(ctx, instanceToCreate, clusterID)
1401	if err != nil {
1402		t.Fatalf("GetCluster: %v", err)
1403	}
1404
1405	if cInfo.ServeNodes != 5 {
1406		t.Fatalf("NumNodes: %v, want: %v", cInfo.ServeNodes, 5)
1407	}
1408
1409	// Now add a second cluster as the only change. The first cluster must also be provided so
1410	// it is not removed.
1411	clusterID2 := clusterID + "-2"
1412	confWithClusters = &InstanceWithClustersConfig{
1413		InstanceID: instanceToCreate,
1414		Clusters: []ClusterConfig{
1415			{ClusterID: clusterID},
1416			{ClusterID: clusterID2, NumNodes: 3, StorageType: SSD, Zone: instanceToCreateZone2}},
1417	}
1418
1419	results, err = UpdateInstanceAndSyncClusters(ctx, iAdminClient, confWithClusters)
1420	if err != nil {
1421		t.Fatalf("UpdateInstanceAndSyncClusters: %v %v", confWithClusters, err)
1422	}
1423
1424	wantResults = UpdateInstanceResults{
1425		InstanceUpdated: false,
1426		CreatedClusters: []string{clusterID2},
1427	}
1428	if diff := testutil.Diff(*results, wantResults); diff != "" {
1429		t.Fatalf("UpdateInstanceResults: got - want +\n%s", diff)
1430	}
1431
1432	// Now update one cluster and delete the other
1433	confWithClusters = &InstanceWithClustersConfig{
1434		InstanceID: instanceToCreate,
1435		Clusters: []ClusterConfig{
1436			{ClusterID: clusterID, NumNodes: 4}},
1437	}
1438
1439	results, err = UpdateInstanceAndSyncClusters(ctx, iAdminClient, confWithClusters)
1440	if err != nil {
1441		t.Fatalf("UpdateInstanceAndSyncClusters: %v %v", confWithClusters, err)
1442	}
1443
1444	wantResults = UpdateInstanceResults{
1445		InstanceUpdated: false,
1446		UpdatedClusters: []string{clusterID},
1447		DeletedClusters: []string{clusterID2},
1448	}
1449
1450	if diff := testutil.Diff(*results, wantResults); diff != "" {
1451		t.Fatalf("UpdateInstanceResults: got - want +\n%s", diff)
1452	}
1453
1454	// Make sure the instance looks as we would expect
1455	clusters, err := iAdminClient.Clusters(ctx, conf.InstanceId)
1456	if err != nil {
1457		t.Fatalf("Clusters: %v", err)
1458	}
1459
1460	if len(clusters) != 1 {
1461		t.Fatalf("Clusters length %v, want: 1", len(clusters))
1462	}
1463
1464	wantCluster := &ClusterInfo{
1465		Name:       clusterID,
1466		Zone:       instanceToCreateZone,
1467		ServeNodes: 4,
1468		State:      "READY",
1469	}
1470	if diff := testutil.Diff(clusters[0], wantCluster); diff != "" {
1471		t.Fatalf("InstanceEquality: got - want +\n%s", diff)
1472	}
1473}
1474
1475func TestIntegration_AdminSnapshot(t *testing.T) {
1476	testEnv, err := NewIntegrationEnv()
1477	if err != nil {
1478		t.Fatalf("IntegrationEnv: %v", err)
1479	}
1480	defer testEnv.Close()
1481
1482	if !testEnv.Config().UseProd {
1483		t.Skip("emulator doesn't support snapshots")
1484	}
1485
1486	timeout := 2 * time.Second
1487	if testEnv.Config().UseProd {
1488		timeout = 5 * time.Minute
1489	}
1490	ctx, _ := context.WithTimeout(context.Background(), timeout)
1491
1492	adminClient, err := testEnv.NewAdminClient()
1493	if err != nil {
1494		t.Fatalf("NewAdminClient: %v", err)
1495	}
1496	defer adminClient.Close()
1497
1498	table := testEnv.Config().Table
1499	cluster := testEnv.Config().Cluster
1500
1501	list := func(cluster string) ([]*SnapshotInfo, error) {
1502		infos := []*SnapshotInfo(nil)
1503
1504		it := adminClient.Snapshots(ctx, cluster)
1505		for {
1506			s, err := it.Next()
1507			if err == iterator.Done {
1508				break
1509			}
1510			if err != nil {
1511				return nil, err
1512			}
1513			infos = append(infos, s)
1514		}
1515		return infos, err
1516	}
1517
1518	// Delete the table at the end of the test. Schedule ahead of time
1519	// in case the client fails
1520	defer adminClient.DeleteTable(ctx, table)
1521
1522	if err := adminClient.CreateTable(ctx, table); err != nil {
1523		t.Fatalf("Creating table: %v", err)
1524	}
1525
1526	// Precondition: no snapshots
1527	snapshots, err := list(cluster)
1528	if err != nil {
1529		t.Fatalf("Initial snapshot list: %v", err)
1530	}
1531	if got, want := len(snapshots), 0; got != want {
1532		t.Fatalf("Initial snapshot list len: %d, want: %d", got, want)
1533	}
1534
1535	// Create snapshot
1536	defer adminClient.DeleteSnapshot(ctx, cluster, "mysnapshot")
1537
1538	if err = adminClient.SnapshotTable(ctx, table, cluster, "mysnapshot", 5*time.Hour); err != nil {
1539		t.Fatalf("Creating snaphot: %v", err)
1540	}
1541
1542	// List snapshot
1543	snapshots, err = list(cluster)
1544	if err != nil {
1545		t.Fatalf("Listing snapshots: %v", err)
1546	}
1547	if got, want := len(snapshots), 1; got != want {
1548		t.Fatalf("Listing snapshot count: %d, want: %d", got, want)
1549	}
1550	if got, want := snapshots[0].Name, "mysnapshot"; got != want {
1551		t.Fatalf("Snapshot name: %s, want: %s", got, want)
1552	}
1553	if got, want := snapshots[0].SourceTable, table; got != want {
1554		t.Fatalf("Snapshot SourceTable: %s, want: %s", got, want)
1555	}
1556	if got, want := snapshots[0].DeleteTime, snapshots[0].CreateTime.Add(5*time.Hour); math.Abs(got.Sub(want).Minutes()) > 1 {
1557		t.Fatalf("Snapshot DeleteTime: %s, want: %s", got, want)
1558	}
1559
1560	// Get snapshot
1561	snapshot, err := adminClient.SnapshotInfo(ctx, cluster, "mysnapshot")
1562	if err != nil {
1563		t.Fatalf("SnapshotInfo: %v", snapshot)
1564	}
1565	if got, want := *snapshot, *snapshots[0]; got != want {
1566		t.Fatalf("SnapshotInfo: %v, want: %v", got, want)
1567	}
1568
1569	// Restore
1570	restoredTable := table + "-restored"
1571	defer adminClient.DeleteTable(ctx, restoredTable)
1572	if err = adminClient.CreateTableFromSnapshot(ctx, restoredTable, cluster, "mysnapshot"); err != nil {
1573		t.Fatalf("CreateTableFromSnapshot: %v", err)
1574	}
1575	if _, err := adminClient.TableInfo(ctx, restoredTable); err != nil {
1576		t.Fatalf("Restored TableInfo: %v", err)
1577	}
1578
1579	// Delete snapshot
1580	if err = adminClient.DeleteSnapshot(ctx, cluster, "mysnapshot"); err != nil {
1581		t.Fatalf("DeleteSnapshot: %v", err)
1582	}
1583	snapshots, err = list(cluster)
1584	if err != nil {
1585		t.Fatalf("List after Delete: %v", err)
1586	}
1587	if got, want := len(snapshots), 0; got != want {
1588		t.Fatalf("List after delete len: %d, want: %d", got, want)
1589	}
1590}
1591
1592func TestIntegration_Granularity(t *testing.T) {
1593	testEnv, err := NewIntegrationEnv()
1594	if err != nil {
1595		t.Fatalf("IntegrationEnv: %v", err)
1596	}
1597	defer testEnv.Close()
1598
1599	timeout := 2 * time.Second
1600	if testEnv.Config().UseProd {
1601		timeout = 5 * time.Minute
1602	}
1603	ctx, _ := context.WithTimeout(context.Background(), timeout)
1604
1605	adminClient, err := testEnv.NewAdminClient()
1606	if err != nil {
1607		t.Fatalf("NewAdminClient: %v", err)
1608	}
1609	defer adminClient.Close()
1610
1611	list := func() []string {
1612		tbls, err := adminClient.Tables(ctx)
1613		if err != nil {
1614			t.Fatalf("Fetching list of tables: %v", err)
1615		}
1616		sort.Strings(tbls)
1617		return tbls
1618	}
1619	containsAll := func(got, want []string) bool {
1620		gotSet := make(map[string]bool)
1621
1622		for _, s := range got {
1623			gotSet[s] = true
1624		}
1625		for _, s := range want {
1626			if !gotSet[s] {
1627				return false
1628			}
1629		}
1630		return true
1631	}
1632
1633	defer adminClient.DeleteTable(ctx, "mytable")
1634
1635	if err := adminClient.CreateTable(ctx, "mytable"); err != nil {
1636		t.Fatalf("Creating table: %v", err)
1637	}
1638
1639	tables := list()
1640	if got, want := tables, []string{"mytable"}; !containsAll(got, want) {
1641		t.Errorf("adminClient.Tables returned %#v, want %#v", got, want)
1642	}
1643
1644	// calling ModifyColumnFamilies to check the granularity of table
1645	prefix := adminClient.instancePrefix()
1646	req := &btapb.ModifyColumnFamiliesRequest{
1647		Name: prefix + "/tables/" + "mytable",
1648		Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{
1649			Id:  "cf",
1650			Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Create{&btapb.ColumnFamily{}},
1651		}},
1652	}
1653	table, err := adminClient.tClient.ModifyColumnFamilies(ctx, req)
1654	if err != nil {
1655		t.Fatalf("Creating column family: %v", err)
1656	}
1657	if table.Granularity != btapb.Table_TimestampGranularity(btapb.Table_MILLIS) {
1658		t.Errorf("ModifyColumnFamilies returned granularity %#v, want %#v", table.Granularity, btapb.Table_TimestampGranularity(btapb.Table_MILLIS))
1659	}
1660}
1661
1662func TestIntegration_InstanceAdminClient_AppProfile(t *testing.T) {
1663	testEnv, err := NewIntegrationEnv()
1664	if err != nil {
1665		t.Fatalf("IntegrationEnv: %v", err)
1666	}
1667	defer testEnv.Close()
1668
1669	timeout := 2 * time.Second
1670	if testEnv.Config().UseProd {
1671		timeout = 5 * time.Minute
1672	}
1673	ctx, cancel := context.WithTimeout(context.Background(), timeout)
1674	defer cancel()
1675
1676	adminClient, err := testEnv.NewAdminClient()
1677	if err != nil {
1678		t.Fatalf("NewAdminClient: %v", err)
1679	}
1680	defer adminClient.Close()
1681
1682	iAdminClient, err := testEnv.NewInstanceAdminClient()
1683	if err != nil {
1684		t.Fatalf("NewInstanceAdminClient: %v", err)
1685	}
1686
1687	if iAdminClient == nil {
1688		return
1689	}
1690
1691	defer iAdminClient.Close()
1692	profile := ProfileConf{
1693		ProfileID:     "app_profile1",
1694		InstanceID:    adminClient.instance,
1695		ClusterID:     testEnv.Config().Cluster,
1696		Description:   "creating new app profile 1",
1697		RoutingPolicy: SingleClusterRouting,
1698	}
1699
1700	createdProfile, err := iAdminClient.CreateAppProfile(ctx, profile)
1701	if err != nil {
1702		t.Fatalf("Creating app profile: %v", err)
1703
1704	}
1705
1706	gotProfile, err := iAdminClient.GetAppProfile(ctx, adminClient.instance, "app_profile1")
1707
1708	if err != nil {
1709		t.Fatalf("Get app profile: %v", err)
1710	}
1711
1712	if !proto.Equal(createdProfile, gotProfile) {
1713		t.Fatalf("created profile: %s, got profile: %s", createdProfile.Name, gotProfile.Name)
1714
1715	}
1716
1717	list := func(instanceID string) ([]*btapb.AppProfile, error) {
1718		profiles := []*btapb.AppProfile(nil)
1719
1720		it := iAdminClient.ListAppProfiles(ctx, instanceID)
1721		for {
1722			s, err := it.Next()
1723			if err == iterator.Done {
1724				break
1725			}
1726			if err != nil {
1727				return nil, err
1728			}
1729			profiles = append(profiles, s)
1730		}
1731		return profiles, err
1732	}
1733
1734	profiles, err := list(adminClient.instance)
1735	if err != nil {
1736		t.Fatalf("List app profile: %v", err)
1737	}
1738
1739	if got, want := len(profiles), 1; got != want {
1740		t.Fatalf("Initial app profile list len: %d, want: %d", got, want)
1741	}
1742
1743	for _, test := range []struct {
1744		desc   string
1745		uattrs ProfileAttrsToUpdate
1746		want   *btapb.AppProfile // nil means error
1747	}{
1748		{
1749			desc:   "empty update",
1750			uattrs: ProfileAttrsToUpdate{},
1751			want:   nil,
1752		},
1753
1754		{
1755			desc:   "empty description update",
1756			uattrs: ProfileAttrsToUpdate{Description: ""},
1757			want: &btapb.AppProfile{
1758				Name:          gotProfile.Name,
1759				Description:   "",
1760				RoutingPolicy: gotProfile.RoutingPolicy,
1761				Etag:          gotProfile.Etag},
1762		},
1763		{
1764			desc: "routing update",
1765			uattrs: ProfileAttrsToUpdate{
1766				RoutingPolicy: SingleClusterRouting,
1767				ClusterID:     testEnv.Config().Cluster,
1768			},
1769			want: &btapb.AppProfile{
1770				Name:        gotProfile.Name,
1771				Description: "",
1772				Etag:        gotProfile.Etag,
1773				RoutingPolicy: &btapb.AppProfile_SingleClusterRouting_{
1774					SingleClusterRouting: &btapb.AppProfile_SingleClusterRouting{
1775						ClusterId: testEnv.Config().Cluster,
1776					}},
1777			},
1778		},
1779	} {
1780		err = iAdminClient.UpdateAppProfile(ctx, adminClient.instance, "app_profile1", test.uattrs)
1781		if err != nil {
1782			if test.want != nil {
1783				t.Errorf("%s: %v", test.desc, err)
1784			}
1785			continue
1786		}
1787		if err == nil && test.want == nil {
1788			t.Errorf("%s: got nil, want error", test.desc)
1789			continue
1790		}
1791
1792		got, _ := iAdminClient.GetAppProfile(ctx, adminClient.instance, "app_profile1")
1793
1794		if !proto.Equal(got, test.want) {
1795			t.Fatalf("%s : got profile : %v, want profile: %v", test.desc, gotProfile, test.want)
1796		}
1797
1798	}
1799
1800	err = iAdminClient.DeleteAppProfile(ctx, adminClient.instance, "app_profile1")
1801	if err != nil {
1802		t.Fatalf("Delete app profile: %v", err)
1803	}
1804
1805}
1806
1807func TestIntegration_InstanceUpdate(t *testing.T) {
1808	testEnv, err := NewIntegrationEnv()
1809	if err != nil {
1810		t.Fatalf("IntegrationEnv: %v", err)
1811	}
1812	defer testEnv.Close()
1813
1814	timeout := 2 * time.Second
1815	if testEnv.Config().UseProd {
1816		timeout = 5 * time.Minute
1817	}
1818	ctx, cancel := context.WithTimeout(context.Background(), timeout)
1819	defer cancel()
1820
1821	adminClient, err := testEnv.NewAdminClient()
1822	if err != nil {
1823		t.Fatalf("NewAdminClient: %v", err)
1824	}
1825
1826	defer adminClient.Close()
1827
1828	iAdminClient, err := testEnv.NewInstanceAdminClient()
1829	if err != nil {
1830		t.Fatalf("NewInstanceAdminClient: %v", err)
1831	}
1832
1833	if iAdminClient == nil {
1834		return
1835	}
1836
1837	defer iAdminClient.Close()
1838
1839	iInfo, err := iAdminClient.InstanceInfo(ctx, adminClient.instance)
1840	if err != nil {
1841		t.Errorf("InstanceInfo: %v", err)
1842	}
1843
1844	if iInfo.Name != adminClient.instance {
1845		t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance)
1846	}
1847
1848	if iInfo.DisplayName != adminClient.instance {
1849		t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance)
1850	}
1851
1852	const numNodes = 4
1853	// update cluster nodes
1854	if err := iAdminClient.UpdateCluster(ctx, adminClient.instance, testEnv.Config().Cluster, int32(numNodes)); err != nil {
1855		t.Errorf("UpdateCluster: %v", err)
1856	}
1857
1858	// get cluster after updating
1859	cis, err := iAdminClient.GetCluster(ctx, adminClient.instance, testEnv.Config().Cluster)
1860	if err != nil {
1861		t.Errorf("GetCluster %v", err)
1862	}
1863	if cis.ServeNodes != int(numNodes) {
1864		t.Errorf("ServeNodes returned %d, want %d", cis.ServeNodes, int(numNodes))
1865	}
1866}
1867
1868func setupIntegration(ctx context.Context, t *testing.T) (_ *Client, _ *AdminClient, table *Table, tableName string, cleanup func(), _ error) {
1869	testEnv, err := NewIntegrationEnv()
1870	if err != nil {
1871		return nil, nil, nil, "", nil, err
1872	}
1873
1874	var timeout time.Duration
1875	if testEnv.Config().UseProd {
1876		timeout = 10 * time.Minute
1877		t.Logf("Running test against production")
1878	} else {
1879		timeout = 5 * time.Minute
1880		t.Logf("bttest.Server running on %s", testEnv.Config().AdminEndpoint)
1881	}
1882	ctx, cancel := context.WithTimeout(ctx, timeout)
1883
1884	client, err := testEnv.NewClient()
1885	if err != nil {
1886		return nil, nil, nil, "", nil, err
1887	}
1888
1889	adminClient, err := testEnv.NewAdminClient()
1890	if err != nil {
1891		return nil, nil, nil, "", nil, err
1892	}
1893
1894	if testEnv.Config().UseProd {
1895		// TODO: tables may not be successfully deleted in some cases, and will
1896		// become obsolete. We may need a way to automatically delete them.
1897		tableName = tableNameSpace.New()
1898	} else {
1899		tableName = testEnv.Config().Table
1900	}
1901
1902	if err := adminClient.CreateTable(ctx, tableName); err != nil {
1903		cancel()
1904		return nil, nil, nil, "", nil, err
1905	}
1906	if err := adminClient.CreateColumnFamily(ctx, tableName, "follows"); err != nil {
1907		cancel()
1908		return nil, nil, nil, "", nil, err
1909	}
1910
1911	return client, adminClient, client.Open(tableName), tableName, func() {
1912		if err := adminClient.DeleteTable(ctx, tableName); err != nil {
1913			t.Errorf("DeleteTable got error %v", err)
1914		}
1915		cancel()
1916		client.Close()
1917		adminClient.Close()
1918	}, nil
1919}
1920
1921func formatReadItem(ri ReadItem) string {
1922	// Use the column qualifier only to make the test data briefer.
1923	col := ri.Column[strings.Index(ri.Column, ":")+1:]
1924	return fmt.Sprintf("%s-%s-%s", ri.Row, col, ri.Value)
1925}
1926
1927func fill(b, sub []byte) {
1928	for len(b) > len(sub) {
1929		n := copy(b, sub)
1930		b = b[n:]
1931	}
1932}
1933
1934func clearTimestamps(r Row) {
1935	for _, ris := range r {
1936		for i := range ris {
1937			ris[i].Timestamp = 0
1938		}
1939	}
1940}
1941