1/*
2Copyright 2019 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package bigtable
18
19import (
20	"context"
21	"flag"
22	"fmt"
23	"math"
24	"math/rand"
25	"sort"
26	"strings"
27	"sync"
28	"testing"
29	"time"
30
31	"cloud.google.com/go/internal/testutil"
32	"github.com/golang/protobuf/proto"
33	"google.golang.org/api/iterator"
34	btapb "google.golang.org/genproto/googleapis/bigtable/admin/v2"
35)
36
// presidentsSocialGraph is the fixture data shared by the integration tests:
// each key is a row key and each value lists the names that row "follows".
// populatePresidentsGraph writes it to a table.
var presidentsSocialGraph = map[string][]string{
	"gwashington": {"jadams"},
	"jadams":      {"gwashington", "tjefferson"},
	"tjefferson":  {"gwashington", "jadams"},
	"wmckinley":   {"tjefferson"},
}
43
44func populatePresidentsGraph(table *Table) error {
45	ctx := context.Background()
46	for row, ss := range presidentsSocialGraph {
47		mut := NewMutation()
48		for _, name := range ss {
49			mut.Set("follows", name, 1000, []byte("1"))
50		}
51		if err := table.Apply(ctx, row, mut); err != nil {
52			return fmt.Errorf("Mutating row %q: %v", row, err)
53		}
54	}
55	return nil
56}
57
// Flag-backed settings for the optional instance-creation integration test.
var (
	// instanceToCreate is the id of the instance to create; empty by default.
	instanceToCreate string
	// instanceToCreateZone is the zone for the new test instance.
	instanceToCreateZone string
	// instanceToCreateZone2 is the zone for a second cluster in the test instance.
	instanceToCreateZone2 string
)

// init registers the instance-creation flags on the default flag set.
func init() {
	// Don't test instance creation by default, as quota is necessary and aborted tests could strand resources.
	stringFlags := []struct {
		dest  *string
		name  string
		value string
		usage string
	}{
		{
			dest:  &instanceToCreate,
			name:  "it.instance-to-create",
			value: "",
			usage: "The id of an instance to create, update and delete. Requires sufficient Cloud Bigtable quota. Requires that it.use-prod is true.",
		},
		{
			dest:  &instanceToCreateZone,
			name:  "it.instance-to-create-zone",
			value: "us-central1-b",
			usage: "The zone in which to create the new test instance.",
		},
		{
			dest:  &instanceToCreateZone2,
			name:  "it.instance-to-create-zone2",
			value: "us-east1-c",
			usage: "The zone in which to create a second cluster in the test instance.",
		},
	}
	for _, f := range stringFlags {
		flag.StringVar(f.dest, f.name, f.value, f.usage)
	}
}
71
72func TestIntegration_ConditionalMutations(t *testing.T) {
73	ctx := context.Background()
74	_, _, table, _, cleanup, err := setupIntegration(ctx, t)
75	if err != nil {
76		t.Fatal(err)
77	}
78	defer cleanup()
79
80	if err := populatePresidentsGraph(table); err != nil {
81		t.Fatal(err)
82	}
83
84	// Do a conditional mutation with a complex filter.
85	mutTrue := NewMutation()
86	mutTrue.Set("follows", "wmckinley", 1000, []byte("1"))
87	filter := ChainFilters(ColumnFilter("gwash[iz].*"), ValueFilter("."))
88	mut := NewCondMutation(filter, mutTrue, nil)
89	if err := table.Apply(ctx, "tjefferson", mut); err != nil {
90		t.Fatalf("Conditionally mutating row: %v", err)
91	}
92	// Do a second condition mutation with a filter that does not match,
93	// and thus no changes should be made.
94	mutTrue = NewMutation()
95	mutTrue.DeleteRow()
96	filter = ColumnFilter("snoop.dogg")
97	mut = NewCondMutation(filter, mutTrue, nil)
98	if err := table.Apply(ctx, "tjefferson", mut); err != nil {
99		t.Fatalf("Conditionally mutating row: %v", err)
100	}
101
102	// Fetch a row.
103	row, err := table.ReadRow(ctx, "jadams")
104	if err != nil {
105		t.Fatalf("Reading a row: %v", err)
106	}
107	wantRow := Row{
108		"follows": []ReadItem{
109			{Row: "jadams", Column: "follows:gwashington", Timestamp: 1000, Value: []byte("1")},
110			{Row: "jadams", Column: "follows:tjefferson", Timestamp: 1000, Value: []byte("1")},
111		},
112	}
113	if !testutil.Equal(row, wantRow) {
114		t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow)
115	}
116}
117
118func TestIntegration_PartialReadRows(t *testing.T) {
119	ctx := context.Background()
120	_, _, table, _, cleanup, err := setupIntegration(ctx, t)
121	if err != nil {
122		t.Fatal(err)
123	}
124	defer cleanup()
125
126	if err := populatePresidentsGraph(table); err != nil {
127		t.Fatal(err)
128	}
129
130	// Do a scan and stop part way through.
131	// Verify that the ReadRows callback doesn't keep running.
132	stopped := false
133	err = table.ReadRows(ctx, InfiniteRange(""), func(r Row) bool {
134		if r.Key() < "h" {
135			return true
136		}
137		if !stopped {
138			stopped = true
139			return false
140		}
141		t.Fatalf("ReadRows kept scanning to row %q after being told to stop", r.Key())
142		return false
143	})
144	if err != nil {
145		t.Fatalf("Partial ReadRows: %v", err)
146	}
147}
148
149func TestIntegration_ReadRowList(t *testing.T) {
150	ctx := context.Background()
151	_, _, table, _, cleanup, err := setupIntegration(ctx, t)
152	if err != nil {
153		t.Fatal(err)
154	}
155	defer cleanup()
156
157	if err := populatePresidentsGraph(table); err != nil {
158		t.Fatal(err)
159	}
160
161	// Read a RowList
162	var elt []string
163	keys := RowList{"wmckinley", "gwashington", "jadams"}
164	want := "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,wmckinley-tjefferson-1"
165	err = table.ReadRows(ctx, keys, func(r Row) bool {
166		for _, ris := range r {
167			for _, ri := range ris {
168				elt = append(elt, formatReadItem(ri))
169			}
170		}
171		return true
172	})
173	if err != nil {
174		t.Fatalf("read RowList: %v", err)
175	}
176
177	if got := strings.Join(elt, ","); got != want {
178		t.Fatalf("bulk read: wrong reads.\n got %q\nwant %q", got, want)
179	}
180}
181
182func TestIntegration_DeleteRow(t *testing.T) {
183	ctx := context.Background()
184	_, _, table, _, cleanup, err := setupIntegration(ctx, t)
185	if err != nil {
186		t.Fatal(err)
187	}
188	defer cleanup()
189
190	if err := populatePresidentsGraph(table); err != nil {
191		t.Fatal(err)
192	}
193
194	// Delete a row and check it goes away.
195	mut := NewMutation()
196	mut.DeleteRow()
197	if err := table.Apply(ctx, "wmckinley", mut); err != nil {
198		t.Fatalf("Apply DeleteRow: %v", err)
199	}
200	row, err := table.ReadRow(ctx, "wmckinley")
201	if err != nil {
202		t.Fatalf("Reading a row after DeleteRow: %v", err)
203	}
204	if len(row) != 0 {
205		t.Fatalf("Read non-zero row after DeleteRow: %v", row)
206	}
207}
208
209func TestIntegration_ReadModifyWrite(t *testing.T) {
210	ctx := context.Background()
211	_, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t)
212	if err != nil {
213		t.Fatal(err)
214	}
215	defer cleanup()
216
217	if err := populatePresidentsGraph(table); err != nil {
218		t.Fatal(err)
219	}
220
221	if err := adminClient.CreateColumnFamily(ctx, tableName, "counter"); err != nil {
222		t.Fatalf("Creating column family: %v", err)
223	}
224
225	appendRMW := func(b []byte) *ReadModifyWrite {
226		rmw := NewReadModifyWrite()
227		rmw.AppendValue("counter", "likes", b)
228		return rmw
229	}
230	incRMW := func(n int64) *ReadModifyWrite {
231		rmw := NewReadModifyWrite()
232		rmw.Increment("counter", "likes", n)
233		return rmw
234	}
235	rmwSeq := []struct {
236		desc string
237		rmw  *ReadModifyWrite
238		want []byte
239	}{
240		{
241			desc: "append #1",
242			rmw:  appendRMW([]byte{0, 0, 0}),
243			want: []byte{0, 0, 0},
244		},
245		{
246			desc: "append #2",
247			rmw:  appendRMW([]byte{0, 0, 0, 0, 17}), // the remaining 40 bits to make a big-endian 17
248			want: []byte{0, 0, 0, 0, 0, 0, 0, 17},
249		},
250		{
251			desc: "increment",
252			rmw:  incRMW(8),
253			want: []byte{0, 0, 0, 0, 0, 0, 0, 25},
254		},
255	}
256	for _, step := range rmwSeq {
257		row, err := table.ApplyReadModifyWrite(ctx, "gwashington", step.rmw)
258		if err != nil {
259			t.Fatalf("ApplyReadModifyWrite %+v: %v", step.rmw, err)
260		}
261		// Make sure the modified cell returned by the RMW operation has a timestamp.
262		if row["counter"][0].Timestamp == 0 {
263			t.Fatalf("RMW returned cell timestamp: got %v, want > 0", row["counter"][0].Timestamp)
264		}
265		clearTimestamps(row)
266		wantRow := Row{"counter": []ReadItem{{Row: "gwashington", Column: "counter:likes", Value: step.want}}}
267		if !testutil.Equal(row, wantRow) {
268			t.Fatalf("After %s,\n got %v\nwant %v", step.desc, row, wantRow)
269		}
270	}
271
272	// Check for google-cloud-go/issues/723. RMWs that insert new rows should keep row order sorted in the emulator.
273	_, err = table.ApplyReadModifyWrite(ctx, "issue-723-2", appendRMW([]byte{0}))
274	if err != nil {
275		t.Fatalf("ApplyReadModifyWrite null string: %v", err)
276	}
277	_, err = table.ApplyReadModifyWrite(ctx, "issue-723-1", appendRMW([]byte{0}))
278	if err != nil {
279		t.Fatalf("ApplyReadModifyWrite null string: %v", err)
280	}
281	// Get only the correct row back on read.
282	r, err := table.ReadRow(ctx, "issue-723-1")
283	if err != nil {
284		t.Fatalf("Reading row: %v", err)
285	}
286	if r.Key() != "issue-723-1" {
287		t.Fatalf("ApplyReadModifyWrite: incorrect read after RMW,\n got %v\nwant %v", r.Key(), "issue-723-1")
288	}
289}
290
// TestIntegration_ArbitraryTimestamps writes cells with caller-chosen
// timestamps and then exercises, in order:
//   - reading all versions, and reading through LatestNFilter,
//     CellsPerRowLimitFilter, CellsPerRowOffsetFilter and
//     TimestampRangeFilterMicros;
//   - DeleteTimestampRange on a family and on a column with no matching cells,
//     and on an existing cell;
//   - DeleteCellsInFamily and DeleteCellsInColumn.
//
// The steps share table state and run strictly in sequence. wantRow is
// deliberately carried over between some steps, so the statement order matters.
func TestIntegration_ArbitraryTimestamps(t *testing.T) {
	ctx := context.Background()
	_, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t)
	if err != nil {
		t.Fatal(err)
	}
	defer cleanup()

	// Test arbitrary timestamps more thoroughly.
	if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil {
		t.Fatalf("Creating column family: %v", err)
	}
	const numVersions = 4
	mut := NewMutation()
	// i runs from 1 to numVersions-1, so three versions (timestamps 1000,
	// 2000 and 3000) are written to each of the two columns.
	for i := 1; i < numVersions; i++ {
		// Timestamps are used in thousands because the server
		// only permits that granularity.
		mut.Set("ts", "col", Timestamp(i*1000), []byte(fmt.Sprintf("val-%d", i)))
		mut.Set("ts", "col2", Timestamp(i*1000), []byte(fmt.Sprintf("val-%d", i)))
	}
	if err := table.Apply(ctx, "testrow", mut); err != nil {
		t.Fatalf("Mutating row: %v", err)
	}
	r, err := table.ReadRow(ctx, "testrow")
	if err != nil {
		t.Fatalf("Reading row: %v", err)
	}
	wantRow := Row{"ts": []ReadItem{
		// These should be returned in descending timestamp order.
		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
		{Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")},
	}}
	if !testutil.Equal(r, wantRow) {
		t.Fatalf("Cell with multiple versions,\n got %v\nwant %v", r, wantRow)
	}

	// Do the same read, but filter to the latest two versions.
	r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(2)))
	if err != nil {
		t.Fatalf("Reading row: %v", err)
	}
	wantRow = Row{"ts": []ReadItem{
		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
	}}
	if !testutil.Equal(r, wantRow) {
		t.Fatalf("Cell with multiple versions and LatestNFilter(2),\n got %v\nwant %v", r, wantRow)
	}
	// Check cell offset / limit
	// A limit of 3 is expected to keep only the first three cells of the row
	// (all of ts:col).
	r, err = table.ReadRow(ctx, "testrow", RowFilter(CellsPerRowLimitFilter(3)))
	if err != nil {
		t.Fatalf("Reading row: %v", err)
	}
	wantRow = Row{"ts": []ReadItem{
		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
	}}
	if !testutil.Equal(r, wantRow) {
		t.Fatalf("Cell with multiple versions and CellsPerRowLimitFilter(3),\n got %v\nwant %v", r, wantRow)
	}
	// An offset of 3 is expected to skip those first three cells, leaving all
	// of ts:col2.
	r, err = table.ReadRow(ctx, "testrow", RowFilter(CellsPerRowOffsetFilter(3)))
	if err != nil {
		t.Fatalf("Reading row: %v", err)
	}
	wantRow = Row{"ts": []ReadItem{
		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
		{Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")},
	}}
	if !testutil.Equal(r, wantRow) {
		t.Fatalf("Cell with multiple versions and CellsPerRowOffsetFilter(3),\n got %v\nwant %v", r, wantRow)
	}
	// Check timestamp range filtering (with truncation)
	// The expected rows include timestamp 1000 even though the range starts
	// at 1001, and exclude timestamp 3000, the end of the range.
	r, err = table.ReadRow(ctx, "testrow", RowFilter(TimestampRangeFilterMicros(1001, 3000)))
	if err != nil {
		t.Fatalf("Reading row: %v", err)
	}
	wantRow = Row{"ts": []ReadItem{
		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
		{Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")},
	}}
	if !testutil.Equal(r, wantRow) {
		t.Fatalf("Cell with multiple versions and TimestampRangeFilter(1000, 3000),\n got %v\nwant %v", r, wantRow)
	}
	// With an end of 0 every version from 1000 upward is expected back.
	r, err = table.ReadRow(ctx, "testrow", RowFilter(TimestampRangeFilterMicros(1000, 0)))
	if err != nil {
		t.Fatalf("Reading row: %v", err)
	}
	wantRow = Row{"ts": []ReadItem{
		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
		{Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")},
	}}
	if !testutil.Equal(r, wantRow) {
		t.Fatalf("Cell with multiple versions and TimestampRangeFilter(1000, 0),\n got %v\nwant %v", r, wantRow)
	}
	// Delete non-existing cells, no such column family in this row
	// Should not delete anything
	if err := adminClient.CreateColumnFamily(ctx, tableName, "non-existing"); err != nil {
		t.Fatalf("Creating column family: %v", err)
	}
	mut = NewMutation()
	mut.DeleteTimestampRange("non-existing", "col", 2000, 3000) // half-open interval
	if err := table.Apply(ctx, "testrow", mut); err != nil {
		t.Fatalf("Mutating row: %v", err)
	}
	// wantRow still holds the full six-cell expectation from the previous
	// read; it is reused here and in the next check.
	r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(3)))
	if err != nil {
		t.Fatalf("Reading row: %v", err)
	}
	if !testutil.Equal(r, wantRow) {
		t.Fatalf("Cell was deleted unexpectly,\n got %v\nwant %v", r, wantRow)
	}
	// Delete non-existing cells, no such column in this column family
	// Should not delete anything
	mut = NewMutation()
	mut.DeleteTimestampRange("ts", "non-existing", 2000, 3000) // half-open interval
	if err := table.Apply(ctx, "testrow", mut); err != nil {
		t.Fatalf("Mutating row: %v", err)
	}
	r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(3)))
	if err != nil {
		t.Fatalf("Reading row: %v", err)
	}
	if !testutil.Equal(r, wantRow) {
		t.Fatalf("Cell was deleted unexpectly,\n got %v\nwant %v", r, wantRow)
	}
	// Delete the cell with timestamp 2000 and repeat the last read,
	// checking that we get ts 3000 and ts 1000.
	// Only ts:col is targeted; ts:col2 is expected to keep its 2000 version.
	mut = NewMutation()
	mut.DeleteTimestampRange("ts", "col", 2001, 3000) // half-open interval
	if err := table.Apply(ctx, "testrow", mut); err != nil {
		t.Fatalf("Mutating row: %v", err)
	}
	r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(2)))
	if err != nil {
		t.Fatalf("Reading row: %v", err)
	}
	wantRow = Row{"ts": []ReadItem{
		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
	}}
	if !testutil.Equal(r, wantRow) {
		t.Fatalf("Cell with multiple versions and LatestNFilter(2), after deleting timestamp 2000,\n got %v\nwant %v", r, wantRow)
	}

	// Check DeleteCellsInFamily
	if err := adminClient.CreateColumnFamily(ctx, tableName, "status"); err != nil {
		t.Fatalf("Creating column family: %v", err)
	}

	// The same mutation is applied to two rows so that row2 can act as the
	// untouched control when the "status" family is deleted from row1.
	mut = NewMutation()
	mut.Set("status", "start", 2000, []byte("2"))
	mut.Set("status", "end", 3000, []byte("3"))
	mut.Set("ts", "col", 1000, []byte("1"))
	if err := table.Apply(ctx, "row1", mut); err != nil {
		t.Fatalf("Mutating row: %v", err)
	}
	if err := table.Apply(ctx, "row2", mut); err != nil {
		t.Fatalf("Mutating row: %v", err)
	}

	mut = NewMutation()
	mut.DeleteCellsInFamily("status")
	if err := table.Apply(ctx, "row1", mut); err != nil {
		t.Fatalf("Delete cf: %v", err)
	}

	// ColumnFamily removed
	r, err = table.ReadRow(ctx, "row1")
	if err != nil {
		t.Fatalf("Reading row: %v", err)
	}
	wantRow = Row{"ts": []ReadItem{
		{Row: "row1", Column: "ts:col", Timestamp: 1000, Value: []byte("1")},
	}}
	if !testutil.Equal(r, wantRow) {
		t.Fatalf("column family was not deleted.\n got %v\n want %v", r, wantRow)
	}

	// ColumnFamily not removed
	r, err = table.ReadRow(ctx, "row2")
	if err != nil {
		t.Fatalf("Reading row: %v", err)
	}
	wantRow = Row{
		"ts": []ReadItem{
			{Row: "row2", Column: "ts:col", Timestamp: 1000, Value: []byte("1")},
		},
		"status": []ReadItem{
			{Row: "row2", Column: "status:end", Timestamp: 3000, Value: []byte("3")},
			{Row: "row2", Column: "status:start", Timestamp: 2000, Value: []byte("2")},
		},
	}
	if !testutil.Equal(r, wantRow) {
		t.Fatalf("Column family was deleted unexpectedly.\n got %v\n want %v", r, wantRow)
	}

	// Check DeleteCellsInColumn
	// row3 starts with three "status" columns, which are then deleted one at
	// a time: middle, start, end.
	mut = NewMutation()
	mut.Set("status", "start", 2000, []byte("2"))
	mut.Set("status", "middle", 3000, []byte("3"))
	mut.Set("status", "end", 1000, []byte("1"))
	if err := table.Apply(ctx, "row3", mut); err != nil {
		t.Fatalf("Mutating row: %v", err)
	}
	mut = NewMutation()
	mut.DeleteCellsInColumn("status", "middle")
	if err := table.Apply(ctx, "row3", mut); err != nil {
		t.Fatalf("Delete column: %v", err)
	}
	r, err = table.ReadRow(ctx, "row3")
	if err != nil {
		t.Fatalf("Reading row: %v", err)
	}
	wantRow = Row{
		"status": []ReadItem{
			{Row: "row3", Column: "status:end", Timestamp: 1000, Value: []byte("1")},
			{Row: "row3", Column: "status:start", Timestamp: 2000, Value: []byte("2")},
		},
	}
	if !testutil.Equal(r, wantRow) {
		t.Fatalf("Column was not deleted.\n got %v\n want %v", r, wantRow)
	}
	mut = NewMutation()
	mut.DeleteCellsInColumn("status", "start")
	if err := table.Apply(ctx, "row3", mut); err != nil {
		t.Fatalf("Delete column: %v", err)
	}
	r, err = table.ReadRow(ctx, "row3")
	if err != nil {
		t.Fatalf("Reading row: %v", err)
	}
	wantRow = Row{
		"status": []ReadItem{
			{Row: "row3", Column: "status:end", Timestamp: 1000, Value: []byte("1")},
		},
	}
	if !testutil.Equal(r, wantRow) {
		t.Fatalf("Column was not deleted.\n got %v\n want %v", r, wantRow)
	}
	mut = NewMutation()
	mut.DeleteCellsInColumn("status", "end")
	if err := table.Apply(ctx, "row3", mut); err != nil {
		t.Fatalf("Delete column: %v", err)
	}
	r, err = table.ReadRow(ctx, "row3")
	if err != nil {
		t.Fatalf("Reading row: %v", err)
	}
	if len(r) != 0 {
		t.Fatalf("Delete column: got %v, want empty row", r)
	}
	// Add same cell after delete
	// wantRow still holds the single status:end cell expectation from above.
	mut = NewMutation()
	mut.Set("status", "end", 1000, []byte("1"))
	if err := table.Apply(ctx, "row3", mut); err != nil {
		t.Fatalf("Mutating row: %v", err)
	}
	r, err = table.ReadRow(ctx, "row3")
	if err != nil {
		t.Fatalf("Reading row: %v", err)
	}
	if !testutil.Equal(r, wantRow) {
		t.Fatalf("Column was not deleted correctly.\n got %v\n want %v", r, wantRow)
	}
}
572
573func TestIntegration_HighlyConcurrentReadsAndWrites(t *testing.T) {
574	ctx := context.Background()
575	_, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t)
576	if err != nil {
577		t.Fatal(err)
578	}
579	defer cleanup()
580
581	if err := populatePresidentsGraph(table); err != nil {
582		t.Fatal(err)
583	}
584
585	if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil {
586		t.Fatalf("Creating column family: %v", err)
587	}
588
589	// Do highly concurrent reads/writes.
590	const maxConcurrency = 1000
591	var wg sync.WaitGroup
592	for i := 0; i < maxConcurrency; i++ {
593		wg.Add(1)
594		go func() {
595			defer wg.Done()
596			switch r := rand.Intn(100); { // r ∈ [0,100)
597			case 0 <= r && r < 30:
598				// Do a read.
599				_, err := table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(1)))
600				if err != nil {
601					t.Errorf("Concurrent read: %v", err)
602				}
603			case 30 <= r && r < 100:
604				// Do a write.
605				mut := NewMutation()
606				mut.Set("ts", "col", 1000, []byte("data"))
607				if err := table.Apply(ctx, "testrow", mut); err != nil {
608					t.Errorf("Concurrent write: %v", err)
609				}
610			}
611		}()
612	}
613	wg.Wait()
614}
615
616func TestIntegration_LargeReadsWritesAndScans(t *testing.T) {
617	ctx := context.Background()
618	_, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t)
619	if err != nil {
620		t.Fatal(err)
621	}
622	defer cleanup()
623
624	if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil {
625		t.Fatalf("Creating column family: %v", err)
626	}
627
628	bigBytes := make([]byte, 5<<20) // 5 MB is larger than current default gRPC max of 4 MB, but less than the max we set.
629	nonsense := []byte("lorem ipsum dolor sit amet, ")
630	fill(bigBytes, nonsense)
631	mut := NewMutation()
632	mut.Set("ts", "col", 1000, bigBytes)
633	if err := table.Apply(ctx, "bigrow", mut); err != nil {
634		t.Fatalf("Big write: %v", err)
635	}
636	r, err := table.ReadRow(ctx, "bigrow")
637	if err != nil {
638		t.Fatalf("Big read: %v", err)
639	}
640	wantRow := Row{"ts": []ReadItem{
641		{Row: "bigrow", Column: "ts:col", Timestamp: 1000, Value: bigBytes},
642	}}
643	if !testutil.Equal(r, wantRow) {
644		t.Fatalf("Big read returned incorrect bytes: %v", r)
645	}
646
647	var wg sync.WaitGroup
648	// Now write 1000 rows, each with 82 KB values, then scan them all.
649	medBytes := make([]byte, 82<<10)
650	fill(medBytes, nonsense)
651	sem := make(chan int, 50) // do up to 50 mutations at a time.
652	for i := 0; i < 1000; i++ {
653		mut := NewMutation()
654		mut.Set("ts", "big-scan", 1000, medBytes)
655		row := fmt.Sprintf("row-%d", i)
656		wg.Add(1)
657		go func() {
658			defer wg.Done()
659			defer func() { <-sem }()
660			sem <- 1
661			if err := table.Apply(ctx, row, mut); err != nil {
662				t.Errorf("Preparing large scan: %v", err)
663			}
664		}()
665	}
666	wg.Wait()
667	n := 0
668	err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool {
669		for _, ris := range r {
670			for _, ri := range ris {
671				n += len(ri.Value)
672			}
673		}
674		return true
675	}, RowFilter(ColumnFilter("big-scan")))
676	if err != nil {
677		t.Fatalf("Doing large scan: %v", err)
678	}
679	if want := 1000 * len(medBytes); n != want {
680		t.Fatalf("Large scan returned %d bytes, want %d", n, want)
681	}
682	// Scan a subset of the 1000 rows that we just created, using a LimitRows ReadOption.
683	rc := 0
684	wantRc := 3
685	err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool {
686		rc++
687		return true
688	}, LimitRows(int64(wantRc)))
689	if err != nil {
690		t.Fatal(err)
691	}
692	if rc != wantRc {
693		t.Fatalf("Scan with row limit returned %d rows, want %d", rc, wantRc)
694	}
695
696	// Test bulk mutations
697	if err := adminClient.CreateColumnFamily(ctx, tableName, "bulk"); err != nil {
698		t.Fatalf("Creating column family: %v", err)
699	}
700	bulkData := map[string][]string{
701		"red sox":  {"2004", "2007", "2013"},
702		"patriots": {"2001", "2003", "2004", "2014"},
703		"celtics":  {"1981", "1984", "1986", "2008"},
704	}
705	var rowKeys []string
706	var muts []*Mutation
707	for row, ss := range bulkData {
708		mut := NewMutation()
709		for _, name := range ss {
710			mut.Set("bulk", name, 1000, []byte("1"))
711		}
712		rowKeys = append(rowKeys, row)
713		muts = append(muts, mut)
714	}
715	status, err := table.ApplyBulk(ctx, rowKeys, muts)
716	if err != nil {
717		t.Fatalf("Bulk mutating rows %q: %v", rowKeys, err)
718	}
719	if status != nil {
720		t.Fatalf("non-nil errors: %v", err)
721	}
722
723	// Read each row back
724	for rowKey, ss := range bulkData {
725		row, err := table.ReadRow(ctx, rowKey)
726		if err != nil {
727			t.Fatalf("Reading a bulk row: %v", err)
728		}
729		var wantItems []ReadItem
730		for _, val := range ss {
731			wantItems = append(wantItems, ReadItem{Row: rowKey, Column: "bulk:" + val, Timestamp: 1000, Value: []byte("1")})
732		}
733		wantRow := Row{"bulk": wantItems}
734		if !testutil.Equal(row, wantRow) {
735			t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow)
736		}
737	}
738
739	// Test bulk write errors.
740	// Note: Setting timestamps as ServerTime makes sure the mutations are not retried on error.
741	badMut := NewMutation()
742	badMut.Set("badfamily", "col", ServerTime, nil)
743	badMut2 := NewMutation()
744	badMut2.Set("badfamily2", "goodcol", ServerTime, []byte("1"))
745	status, err = table.ApplyBulk(ctx, []string{"badrow", "badrow2"}, []*Mutation{badMut, badMut2})
746	if err != nil {
747		t.Fatalf("Bulk mutating rows %q: %v", rowKeys, err)
748	}
749	if status == nil {
750		t.Fatalf("No errors for bad bulk mutation")
751	} else if status[0] == nil || status[1] == nil {
752		t.Fatalf("No error for bad bulk mutation")
753	}
754}
755
756func TestIntegration_Read(t *testing.T) {
757	ctx := context.Background()
758	_, _, table, _, cleanup, err := setupIntegration(ctx, t)
759	if err != nil {
760		t.Fatal(err)
761	}
762	defer cleanup()
763
764	// Insert some data.
765	initialData := map[string][]string{
766		"wmckinley":   {"tjefferson"},
767		"gwashington": {"jadams"},
768		"tjefferson":  {"gwashington", "jadams", "wmckinley"},
769		"jadams":      {"gwashington", "tjefferson"},
770	}
771	for row, ss := range initialData {
772		mut := NewMutation()
773		for _, name := range ss {
774			mut.Set("follows", name, 1000, []byte("1"))
775		}
776		if err := table.Apply(ctx, row, mut); err != nil {
777			t.Fatalf("Mutating row %q: %v", row, err)
778		}
779	}
780
781	for _, test := range []struct {
782		desc   string
783		rr     RowSet
784		filter Filter     // may be nil
785		limit  ReadOption // may be nil
786
787		// We do the read, grab all the cells, turn them into "<row>-<col>-<val>",
788		// and join with a comma.
789		want string
790	}{
791		{
792			desc: "read all, unfiltered",
793			rr:   RowRange{},
794			want: "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,tjefferson-gwashington-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1",
795		},
796		{
797			desc: "read with InfiniteRange, unfiltered",
798			rr:   InfiniteRange("tjefferson"),
799			want: "tjefferson-gwashington-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1",
800		},
801		{
802			desc: "read with NewRange, unfiltered",
803			rr:   NewRange("gargamel", "hubbard"),
804			want: "gwashington-jadams-1",
805		},
806		{
807			desc: "read with PrefixRange, unfiltered",
808			rr:   PrefixRange("jad"),
809			want: "jadams-gwashington-1,jadams-tjefferson-1",
810		},
811		{
812			desc: "read with SingleRow, unfiltered",
813			rr:   SingleRow("wmckinley"),
814			want: "wmckinley-tjefferson-1",
815		},
816		{
817			desc:   "read all, with ColumnFilter",
818			rr:     RowRange{},
819			filter: ColumnFilter(".*j.*"), // matches "jadams" and "tjefferson"
820			want:   "gwashington-jadams-1,jadams-tjefferson-1,tjefferson-jadams-1,wmckinley-tjefferson-1",
821		},
822		{
823			desc:   "read all, with ColumnFilter, prefix",
824			rr:     RowRange{},
825			filter: ColumnFilter("j"), // no matches
826			want:   "",
827		},
828		{
829			desc:   "read range, with ColumnRangeFilter",
830			rr:     RowRange{},
831			filter: ColumnRangeFilter("follows", "h", "k"),
832			want:   "gwashington-jadams-1,tjefferson-jadams-1",
833		},
834		{
835			desc:   "read range from empty, with ColumnRangeFilter",
836			rr:     RowRange{},
837			filter: ColumnRangeFilter("follows", "", "u"),
838			want:   "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,tjefferson-gwashington-1,tjefferson-jadams-1,wmckinley-tjefferson-1",
839		},
840		{
841			desc:   "read range from start to empty, with ColumnRangeFilter",
842			rr:     RowRange{},
843			filter: ColumnRangeFilter("follows", "h", ""),
844			want:   "gwashington-jadams-1,jadams-tjefferson-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1",
845		},
846		{
847			desc:   "read with RowKeyFilter",
848			rr:     RowRange{},
849			filter: RowKeyFilter(".*wash.*"),
850			want:   "gwashington-jadams-1",
851		},
852		{
853			desc:   "read with RowKeyFilter, prefix",
854			rr:     RowRange{},
855			filter: RowKeyFilter("gwash"),
856			want:   "",
857		},
858		{
859			desc:   "read with RowKeyFilter, no matches",
860			rr:     RowRange{},
861			filter: RowKeyFilter(".*xxx.*"),
862			want:   "",
863		},
864		{
865			desc:   "read with FamilyFilter, no matches",
866			rr:     RowRange{},
867			filter: FamilyFilter(".*xxx.*"),
868			want:   "",
869		},
870		{
871			desc:   "read with ColumnFilter + row limit",
872			rr:     RowRange{},
873			filter: ColumnFilter(".*j.*"), // matches "jadams" and "tjefferson"
874			limit:  LimitRows(2),
875			want:   "gwashington-jadams-1,jadams-tjefferson-1",
876		},
877		{
878			desc:   "read all, strip values",
879			rr:     RowRange{},
880			filter: StripValueFilter(),
881			want:   "gwashington-jadams-,jadams-gwashington-,jadams-tjefferson-,tjefferson-gwashington-,tjefferson-jadams-,tjefferson-wmckinley-,wmckinley-tjefferson-",
882		},
883		{
884			desc:   "read with ColumnFilter + row limit + strip values",
885			rr:     RowRange{},
886			filter: ChainFilters(ColumnFilter(".*j.*"), StripValueFilter()), // matches "jadams" and "tjefferson"
887			limit:  LimitRows(2),
888			want:   "gwashington-jadams-,jadams-tjefferson-",
889		},
890		{
891			desc:   "read with condition, strip values on true",
892			rr:     RowRange{},
893			filter: ConditionFilter(ColumnFilter(".*j.*"), StripValueFilter(), nil),
894			want:   "gwashington-jadams-,jadams-gwashington-,jadams-tjefferson-,tjefferson-gwashington-,tjefferson-jadams-,tjefferson-wmckinley-,wmckinley-tjefferson-",
895		},
896		{
897			desc:   "read with condition, strip values on false",
898			rr:     RowRange{},
899			filter: ConditionFilter(ColumnFilter(".*xxx.*"), nil, StripValueFilter()),
900			want:   "gwashington-jadams-,jadams-gwashington-,jadams-tjefferson-,tjefferson-gwashington-,tjefferson-jadams-,tjefferson-wmckinley-,wmckinley-tjefferson-",
901		},
902		{
903			desc:   "read with ValueRangeFilter + row limit",
904			rr:     RowRange{},
905			filter: ValueRangeFilter([]byte("1"), []byte("5")), // matches our value of "1"
906			limit:  LimitRows(2),
907			want:   "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1",
908		},
909		{
910			desc:   "read with ValueRangeFilter, no match on exclusive end",
911			rr:     RowRange{},
912			filter: ValueRangeFilter([]byte("0"), []byte("1")), // no match
913			want:   "",
914		},
915		{
916			desc:   "read with ValueRangeFilter, no matches",
917			rr:     RowRange{},
918			filter: ValueRangeFilter([]byte("3"), []byte("5")), // matches nothing
919			want:   "",
920		},
921		{
922			desc:   "read with InterleaveFilter, no matches on all filters",
923			rr:     RowRange{},
924			filter: InterleaveFilters(ColumnFilter(".*x.*"), ColumnFilter(".*z.*")),
925			want:   "",
926		},
927		{
928			desc:   "read with InterleaveFilter, no duplicate cells",
929			rr:     RowRange{},
930			filter: InterleaveFilters(ColumnFilter(".*g.*"), ColumnFilter(".*j.*")),
931			want:   "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,tjefferson-gwashington-1,tjefferson-jadams-1,wmckinley-tjefferson-1",
932		},
933		{
934			desc:   "read with InterleaveFilter, with duplicate cells",
935			rr:     RowRange{},
936			filter: InterleaveFilters(ColumnFilter(".*g.*"), ColumnFilter(".*g.*")),
937			want:   "jadams-gwashington-1,jadams-gwashington-1,tjefferson-gwashington-1,tjefferson-gwashington-1",
938		},
939		{
940			desc: "read with a RowRangeList and no filter",
941			rr:   RowRangeList{NewRange("gargamel", "hubbard"), InfiniteRange("wmckinley")},
942			want: "gwashington-jadams-1,wmckinley-tjefferson-1",
943		},
944		{
945			desc:   "chain that excludes rows and matches nothing, in a condition",
946			rr:     RowRange{},
947			filter: ConditionFilter(ChainFilters(ColumnFilter(".*j.*"), ColumnFilter(".*mckinley.*")), StripValueFilter(), nil),
948			want:   "",
949		},
950		{
951			desc:   "chain that ends with an interleave that has no match. covers #804",
952			rr:     RowRange{},
953			filter: ConditionFilter(ChainFilters(ColumnFilter(".*j.*"), InterleaveFilters(ColumnFilter(".*x.*"), ColumnFilter(".*z.*"))), StripValueFilter(), nil),
954			want:   "",
955		},
956	} {
957		t.Run(test.desc, func(t *testing.T) {
958			var opts []ReadOption
959			if test.filter != nil {
960				opts = append(opts, RowFilter(test.filter))
961			}
962			if test.limit != nil {
963				opts = append(opts, test.limit)
964			}
965			var elt []string
966			err := table.ReadRows(ctx, test.rr, func(r Row) bool {
967				for _, ris := range r {
968					for _, ri := range ris {
969						elt = append(elt, formatReadItem(ri))
970					}
971				}
972				return true
973			}, opts...)
974			if err != nil {
975				t.Fatal(err)
976			}
977			if got := strings.Join(elt, ","); got != test.want {
978				t.Fatalf("got %q\nwant %q", got, test.want)
979			}
980		})
981	}
982}
983
984func TestIntegration_SampleRowKeys(t *testing.T) {
985	ctx := context.Background()
986	_, _, table, _, cleanup, err := setupIntegration(ctx, t)
987	if err != nil {
988		t.Fatal(err)
989	}
990	defer cleanup()
991
992	// Insert some data.
993	initialData := map[string][]string{
994		"wmckinley11":   {"tjefferson11"},
995		"gwashington77": {"jadams77"},
996		"tjefferson0":   {"gwashington0", "jadams0"},
997	}
998
999	for row, ss := range initialData {
1000		mut := NewMutation()
1001		for _, name := range ss {
1002			mut.Set("follows", name, 1000, []byte("1"))
1003		}
1004		if err := table.Apply(ctx, row, mut); err != nil {
1005			t.Fatalf("Mutating row %q: %v", row, err)
1006		}
1007	}
1008	sampleKeys, err := table.SampleRowKeys(context.Background())
1009	if err != nil {
1010		t.Fatalf("%s: %v", "SampleRowKeys:", err)
1011	}
1012	if len(sampleKeys) == 0 {
1013		t.Error("SampleRowKeys length 0")
1014	}
1015}
1016
1017func TestIntegration_Admin(t *testing.T) {
1018	testEnv, err := NewIntegrationEnv()
1019	if err != nil {
1020		t.Fatalf("IntegrationEnv: %v", err)
1021	}
1022	defer testEnv.Close()
1023
1024	timeout := 2 * time.Second
1025	if testEnv.Config().UseProd {
1026		timeout = 5 * time.Minute
1027	}
1028	ctx, _ := context.WithTimeout(context.Background(), timeout)
1029
1030	adminClient, err := testEnv.NewAdminClient()
1031	if err != nil {
1032		t.Fatalf("NewAdminClient: %v", err)
1033	}
1034	defer adminClient.Close()
1035
1036	iAdminClient, err := testEnv.NewInstanceAdminClient()
1037	if err != nil {
1038		t.Fatalf("NewInstanceAdminClient: %v", err)
1039	}
1040	if iAdminClient != nil {
1041		defer iAdminClient.Close()
1042
1043		iInfo, err := iAdminClient.InstanceInfo(ctx, adminClient.instance)
1044		if err != nil {
1045			t.Errorf("InstanceInfo: %v", err)
1046		}
1047		if iInfo.Name != adminClient.instance {
1048			t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance)
1049		}
1050	}
1051
1052	list := func() []string {
1053		tbls, err := adminClient.Tables(ctx)
1054		if err != nil {
1055			t.Fatalf("Fetching list of tables: %v", err)
1056		}
1057		sort.Strings(tbls)
1058		return tbls
1059	}
1060	containsAll := func(got, want []string) bool {
1061		gotSet := make(map[string]bool)
1062
1063		for _, s := range got {
1064			gotSet[s] = true
1065		}
1066		for _, s := range want {
1067			if !gotSet[s] {
1068				return false
1069			}
1070		}
1071		return true
1072	}
1073
1074	defer adminClient.DeleteTable(ctx, "mytable")
1075
1076	if err := adminClient.CreateTable(ctx, "mytable"); err != nil {
1077		t.Fatalf("Creating table: %v", err)
1078	}
1079
1080	defer adminClient.DeleteTable(ctx, "myothertable")
1081
1082	if err := adminClient.CreateTable(ctx, "myothertable"); err != nil {
1083		t.Fatalf("Creating table: %v", err)
1084	}
1085
1086	if got, want := list(), []string{"myothertable", "mytable"}; !containsAll(got, want) {
1087		t.Errorf("adminClient.Tables returned %#v, want %#v", got, want)
1088	}
1089
1090	must(adminClient.WaitForReplication(ctx, "mytable"))
1091
1092	if err := adminClient.DeleteTable(ctx, "myothertable"); err != nil {
1093		t.Fatalf("Deleting table: %v", err)
1094	}
1095	tables := list()
1096	if got, want := tables, []string{"mytable"}; !containsAll(got, want) {
1097		t.Errorf("adminClient.Tables returned %#v, want %#v", got, want)
1098	}
1099	if got, unwanted := tables, []string{"myothertable"}; containsAll(got, unwanted) {
1100		t.Errorf("adminClient.Tables return %#v. unwanted %#v", got, unwanted)
1101	}
1102
1103	tblConf := TableConf{
1104		TableID: "conftable",
1105		Families: map[string]GCPolicy{
1106			"fam1": MaxVersionsPolicy(1),
1107			"fam2": MaxVersionsPolicy(2),
1108		},
1109	}
1110	if err := adminClient.CreateTableFromConf(ctx, &tblConf); err != nil {
1111		t.Fatalf("Creating table from TableConf: %v", err)
1112	}
1113	defer adminClient.DeleteTable(ctx, tblConf.TableID)
1114
1115	tblInfo, err := adminClient.TableInfo(ctx, tblConf.TableID)
1116	if err != nil {
1117		t.Fatalf("Getting table info: %v", err)
1118	}
1119	sort.Strings(tblInfo.Families)
1120	wantFams := []string{"fam1", "fam2"}
1121	if !testutil.Equal(tblInfo.Families, wantFams) {
1122		t.Errorf("Column family mismatch, got %v, want %v", tblInfo.Families, wantFams)
1123	}
1124
1125	// Populate mytable and drop row ranges
1126	if err = adminClient.CreateColumnFamily(ctx, "mytable", "cf"); err != nil {
1127		t.Fatalf("Creating column family: %v", err)
1128	}
1129
1130	client, err := testEnv.NewClient()
1131	if err != nil {
1132		t.Fatalf("NewClient: %v", err)
1133	}
1134	defer client.Close()
1135
1136	tbl := client.Open("mytable")
1137
1138	prefixes := []string{"a", "b", "c"}
1139	for _, prefix := range prefixes {
1140		for i := 0; i < 5; i++ {
1141			mut := NewMutation()
1142			mut.Set("cf", "col", 1000, []byte("1"))
1143			if err := tbl.Apply(ctx, fmt.Sprintf("%v-%v", prefix, i), mut); err != nil {
1144				t.Fatalf("Mutating row: %v", err)
1145			}
1146		}
1147	}
1148
1149	if err = adminClient.DropRowRange(ctx, "mytable", "a"); err != nil {
1150		t.Errorf("DropRowRange a: %v", err)
1151	}
1152	if err = adminClient.DropRowRange(ctx, "mytable", "c"); err != nil {
1153		t.Errorf("DropRowRange c: %v", err)
1154	}
1155	if err = adminClient.DropRowRange(ctx, "mytable", "x"); err != nil {
1156		t.Errorf("DropRowRange x: %v", err)
1157	}
1158
1159	var gotRowCount int
1160	must(tbl.ReadRows(ctx, RowRange{}, func(row Row) bool {
1161		gotRowCount++
1162		if !strings.HasPrefix(row.Key(), "b") {
1163			t.Errorf("Invalid row after dropping range: %v", row)
1164		}
1165		return true
1166	}))
1167	if gotRowCount != 5 {
1168		t.Errorf("Invalid row count after dropping range: got %v, want %v", gotRowCount, 5)
1169	}
1170
1171	if err = adminClient.DropAllRows(ctx, "mytable"); err != nil {
1172		t.Errorf("DropAllRows mytable: %v", err)
1173	}
1174
1175	gotRowCount = 0
1176	must(tbl.ReadRows(ctx, RowRange{}, func(row Row) bool {
1177		gotRowCount++
1178		return true
1179	}))
1180	if gotRowCount != 0 {
1181		t.Errorf("Invalid row count after truncating table: got %v, want %v", gotRowCount, 0)
1182	}
1183}
1184
1185func TestIntegration_TableIam(t *testing.T) {
1186	testEnv, err := NewIntegrationEnv()
1187	if err != nil {
1188		t.Fatalf("IntegrationEnv: %v", err)
1189	}
1190	defer testEnv.Close()
1191
1192	if !testEnv.Config().UseProd {
1193		t.Skip("emulator doesn't support IAM Policy creation")
1194	}
1195
1196	timeout := 5 * time.Minute
1197	ctx, _ := context.WithTimeout(context.Background(), timeout)
1198
1199	adminClient, err := testEnv.NewAdminClient()
1200	if err != nil {
1201		t.Fatalf("NewAdminClient: %v", err)
1202	}
1203	defer adminClient.Close()
1204
1205	defer adminClient.DeleteTable(ctx, "mytable")
1206	if err := adminClient.CreateTable(ctx, "mytable"); err != nil {
1207		t.Fatalf("Creating table: %v", err)
1208	}
1209
1210	// Verify that the IAM Controls work for Tables.
1211	iamHandle := adminClient.TableIAM("mytable")
1212	p, err := iamHandle.Policy(ctx)
1213	if err != nil {
1214		t.Errorf("Iam GetPolicy mytable: %v", err)
1215	}
1216	if err = iamHandle.SetPolicy(ctx, p); err != nil {
1217		t.Errorf("Iam SetPolicy mytable: %v", err)
1218	}
1219	if _, err = iamHandle.TestPermissions(ctx, []string{"bigtable.tables.get"}); err != nil {
1220		t.Errorf("Iam TestPermissions mytable: %v", err)
1221	}
1222}
1223
1224func TestIntegration_AdminCreateInstance(t *testing.T) {
1225	if instanceToCreate == "" {
1226		t.Skip("instanceToCreate not set, skipping instance creation testing")
1227	}
1228
1229	testEnv, err := NewIntegrationEnv()
1230	if err != nil {
1231		t.Fatalf("IntegrationEnv: %v", err)
1232	}
1233	defer testEnv.Close()
1234
1235	if !testEnv.Config().UseProd {
1236		t.Skip("emulator doesn't support instance creation")
1237	}
1238
1239	timeout := 5 * time.Minute
1240	ctx, _ := context.WithTimeout(context.Background(), timeout)
1241
1242	iAdminClient, err := testEnv.NewInstanceAdminClient()
1243	if err != nil {
1244		t.Fatalf("NewInstanceAdminClient: %v", err)
1245	}
1246	defer iAdminClient.Close()
1247
1248	clusterID := instanceToCreate + "-cluster"
1249
1250	// Create a development instance
1251	conf := &InstanceConf{
1252		InstanceId:   instanceToCreate,
1253		ClusterId:    clusterID,
1254		DisplayName:  "test instance",
1255		Zone:         instanceToCreateZone,
1256		InstanceType: DEVELOPMENT,
1257	}
1258	if err := iAdminClient.CreateInstance(ctx, conf); err != nil {
1259		t.Fatalf("CreateInstance: %v", err)
1260	}
1261	defer iAdminClient.DeleteInstance(ctx, instanceToCreate)
1262
1263	iInfo, err := iAdminClient.InstanceInfo(ctx, instanceToCreate)
1264	if err != nil {
1265		t.Fatalf("InstanceInfo: %v", err)
1266	}
1267
1268	// Basic return values are tested elsewhere, check instance type
1269	if iInfo.InstanceType != DEVELOPMENT {
1270		t.Fatalf("Instance is not DEVELOPMENT: %v", err)
1271	}
1272
1273	// Update everything we can about the instance in one call.
1274	confWithClusters := &InstanceWithClustersConfig{
1275		InstanceID:   instanceToCreate,
1276		DisplayName:  "new display name",
1277		InstanceType: PRODUCTION,
1278		Clusters: []ClusterConfig{
1279			{ClusterID: clusterID, NumNodes: 5, StorageType: HDD}},
1280	}
1281
1282	if err = iAdminClient.UpdateInstanceWithClusters(ctx, confWithClusters); err != nil {
1283		t.Fatalf("UpdateInstanceWithClusters: %v", err)
1284	}
1285
1286	iInfo, err = iAdminClient.InstanceInfo(ctx, instanceToCreate)
1287	if err != nil {
1288		t.Fatalf("InstanceInfo: %v", err)
1289	}
1290
1291	if iInfo.InstanceType != PRODUCTION {
1292		t.Fatalf("Instance type is not PRODUCTION: %v", err)
1293	}
1294	if got, want := iInfo.DisplayName, confWithClusters.DisplayName; got != want {
1295		t.Fatalf("Display name: %q, want: %q", got, want)
1296	}
1297
1298	cInfo, err := iAdminClient.GetCluster(ctx, instanceToCreate, clusterID)
1299	if err != nil {
1300		t.Fatalf("GetCluster: %v", err)
1301	}
1302
1303	if cInfo.ServeNodes != 5 {
1304		t.Fatalf("NumNodes: %v, want: %v", cInfo.ServeNodes, 5)
1305	}
1306
1307	if cInfo.StorageType != HDD {
1308		t.Fatalf("StorageType: %v, want: %v", cInfo.StorageType, HDD)
1309	}
1310}
1311
1312func TestIntegration_AdminUpdateInstanceAndSyncClusters(t *testing.T) {
1313	if instanceToCreate == "" {
1314		t.Skip("instanceToCreate not set, skipping instance update testing")
1315	}
1316
1317	testEnv, err := NewIntegrationEnv()
1318	if err != nil {
1319		t.Fatalf("IntegrationEnv: %v", err)
1320	}
1321	defer testEnv.Close()
1322
1323	if !testEnv.Config().UseProd {
1324		t.Skip("emulator doesn't support instance creation")
1325	}
1326
1327	timeout := 5 * time.Minute
1328	ctx, _ := context.WithTimeout(context.Background(), timeout)
1329
1330	iAdminClient, err := testEnv.NewInstanceAdminClient()
1331	if err != nil {
1332		t.Fatalf("NewInstanceAdminClient: %v", err)
1333	}
1334	defer iAdminClient.Close()
1335
1336	clusterID := instanceToCreate + "-cluster"
1337
1338	// Create a development instance
1339	conf := &InstanceConf{
1340		InstanceId:   instanceToCreate,
1341		ClusterId:    clusterID,
1342		DisplayName:  "test instance",
1343		Zone:         instanceToCreateZone,
1344		InstanceType: DEVELOPMENT,
1345	}
1346	if err := iAdminClient.CreateInstance(ctx, conf); err != nil {
1347		t.Fatalf("CreateInstance: %v", err)
1348	}
1349	defer iAdminClient.DeleteInstance(ctx, instanceToCreate)
1350
1351	iInfo, err := iAdminClient.InstanceInfo(ctx, instanceToCreate)
1352	if err != nil {
1353		t.Fatalf("InstanceInfo: %v", err)
1354	}
1355
1356	// Basic return values are tested elsewhere, check instance type
1357	if iInfo.InstanceType != DEVELOPMENT {
1358		t.Fatalf("Instance is not DEVELOPMENT: %v", err)
1359	}
1360
1361	// Update everything we can about the instance in one call.
1362	confWithClusters := &InstanceWithClustersConfig{
1363		InstanceID:   instanceToCreate,
1364		DisplayName:  "new display name",
1365		InstanceType: PRODUCTION,
1366		Clusters: []ClusterConfig{
1367			{ClusterID: clusterID, NumNodes: 5}},
1368	}
1369
1370	results, err := UpdateInstanceAndSyncClusters(ctx, iAdminClient, confWithClusters)
1371	if err != nil {
1372		t.Fatalf("UpdateInstanceAndSyncClusters: %v", err)
1373	}
1374
1375	wantResults := UpdateInstanceResults{
1376		InstanceUpdated: true,
1377		UpdatedClusters: []string{clusterID},
1378	}
1379	if diff := testutil.Diff(*results, wantResults); diff != "" {
1380		t.Fatalf("UpdateInstanceResults: got - want +\n%s", diff)
1381	}
1382
1383	iInfo, err = iAdminClient.InstanceInfo(ctx, instanceToCreate)
1384	if err != nil {
1385		t.Fatalf("InstanceInfo: %v", err)
1386	}
1387
1388	if iInfo.InstanceType != PRODUCTION {
1389		t.Fatalf("Instance type is not PRODUCTION: %v", err)
1390	}
1391	if got, want := iInfo.DisplayName, confWithClusters.DisplayName; got != want {
1392		t.Fatalf("Display name: %q, want: %q", got, want)
1393	}
1394
1395	cInfo, err := iAdminClient.GetCluster(ctx, instanceToCreate, clusterID)
1396	if err != nil {
1397		t.Fatalf("GetCluster: %v", err)
1398	}
1399
1400	if cInfo.ServeNodes != 5 {
1401		t.Fatalf("NumNodes: %v, want: %v", cInfo.ServeNodes, 5)
1402	}
1403
1404	// Now add a second cluster as the only change. The first cluster must also be provided so
1405	// it is not removed.
1406	clusterID2 := clusterID + "-2"
1407	confWithClusters = &InstanceWithClustersConfig{
1408		InstanceID: instanceToCreate,
1409		Clusters: []ClusterConfig{
1410			{ClusterID: clusterID},
1411			{ClusterID: clusterID2, NumNodes: 3, StorageType: SSD, Zone: instanceToCreateZone2}},
1412	}
1413
1414	results, err = UpdateInstanceAndSyncClusters(ctx, iAdminClient, confWithClusters)
1415	if err != nil {
1416		t.Fatalf("UpdateInstanceAndSyncClusters: %v %v", confWithClusters, err)
1417	}
1418
1419	wantResults = UpdateInstanceResults{
1420		InstanceUpdated: false,
1421		CreatedClusters: []string{clusterID2},
1422	}
1423	if diff := testutil.Diff(*results, wantResults); diff != "" {
1424		t.Fatalf("UpdateInstanceResults: got - want +\n%s", diff)
1425	}
1426
1427	// Now update one cluster and delete the other
1428	confWithClusters = &InstanceWithClustersConfig{
1429		InstanceID: instanceToCreate,
1430		Clusters: []ClusterConfig{
1431			{ClusterID: clusterID, NumNodes: 4}},
1432	}
1433
1434	results, err = UpdateInstanceAndSyncClusters(ctx, iAdminClient, confWithClusters)
1435	if err != nil {
1436		t.Fatalf("UpdateInstanceAndSyncClusters: %v %v", confWithClusters, err)
1437	}
1438
1439	wantResults = UpdateInstanceResults{
1440		InstanceUpdated: false,
1441		UpdatedClusters: []string{clusterID},
1442		DeletedClusters: []string{clusterID2},
1443	}
1444
1445	if diff := testutil.Diff(*results, wantResults); diff != "" {
1446		t.Fatalf("UpdateInstanceResults: got - want +\n%s", diff)
1447	}
1448
1449	// Make sure the instance looks as we would expect
1450	clusters, err := iAdminClient.Clusters(ctx, conf.InstanceId)
1451	if err != nil {
1452		t.Fatalf("Clusters: %v", err)
1453	}
1454
1455	if len(clusters) != 1 {
1456		t.Fatalf("Clusters length %v, want: 1", len(clusters))
1457	}
1458
1459	wantCluster := &ClusterInfo{
1460		Name:       clusterID,
1461		Zone:       instanceToCreateZone,
1462		ServeNodes: 4,
1463		State:      "READY",
1464	}
1465	if diff := testutil.Diff(clusters[0], wantCluster); diff != "" {
1466		t.Fatalf("InstanceEquality: got - want +\n%s", diff)
1467	}
1468}
1469
1470func TestIntegration_AdminSnapshot(t *testing.T) {
1471	testEnv, err := NewIntegrationEnv()
1472	if err != nil {
1473		t.Fatalf("IntegrationEnv: %v", err)
1474	}
1475	defer testEnv.Close()
1476
1477	if !testEnv.Config().UseProd {
1478		t.Skip("emulator doesn't support snapshots")
1479	}
1480
1481	timeout := 2 * time.Second
1482	if testEnv.Config().UseProd {
1483		timeout = 5 * time.Minute
1484	}
1485	ctx, _ := context.WithTimeout(context.Background(), timeout)
1486
1487	adminClient, err := testEnv.NewAdminClient()
1488	if err != nil {
1489		t.Fatalf("NewAdminClient: %v", err)
1490	}
1491	defer adminClient.Close()
1492
1493	table := testEnv.Config().Table
1494	cluster := testEnv.Config().Cluster
1495
1496	list := func(cluster string) ([]*SnapshotInfo, error) {
1497		infos := []*SnapshotInfo(nil)
1498
1499		it := adminClient.Snapshots(ctx, cluster)
1500		for {
1501			s, err := it.Next()
1502			if err == iterator.Done {
1503				break
1504			}
1505			if err != nil {
1506				return nil, err
1507			}
1508			infos = append(infos, s)
1509		}
1510		return infos, err
1511	}
1512
1513	// Delete the table at the end of the test. Schedule ahead of time
1514	// in case the client fails
1515	defer adminClient.DeleteTable(ctx, table)
1516
1517	if err := adminClient.CreateTable(ctx, table); err != nil {
1518		t.Fatalf("Creating table: %v", err)
1519	}
1520
1521	// Precondition: no snapshots
1522	snapshots, err := list(cluster)
1523	if err != nil {
1524		t.Fatalf("Initial snapshot list: %v", err)
1525	}
1526	if got, want := len(snapshots), 0; got != want {
1527		t.Fatalf("Initial snapshot list len: %d, want: %d", got, want)
1528	}
1529
1530	// Create snapshot
1531	defer adminClient.DeleteSnapshot(ctx, cluster, "mysnapshot")
1532
1533	if err = adminClient.SnapshotTable(ctx, table, cluster, "mysnapshot", 5*time.Hour); err != nil {
1534		t.Fatalf("Creating snaphot: %v", err)
1535	}
1536
1537	// List snapshot
1538	snapshots, err = list(cluster)
1539	if err != nil {
1540		t.Fatalf("Listing snapshots: %v", err)
1541	}
1542	if got, want := len(snapshots), 1; got != want {
1543		t.Fatalf("Listing snapshot count: %d, want: %d", got, want)
1544	}
1545	if got, want := snapshots[0].Name, "mysnapshot"; got != want {
1546		t.Fatalf("Snapshot name: %s, want: %s", got, want)
1547	}
1548	if got, want := snapshots[0].SourceTable, table; got != want {
1549		t.Fatalf("Snapshot SourceTable: %s, want: %s", got, want)
1550	}
1551	if got, want := snapshots[0].DeleteTime, snapshots[0].CreateTime.Add(5*time.Hour); math.Abs(got.Sub(want).Minutes()) > 1 {
1552		t.Fatalf("Snapshot DeleteTime: %s, want: %s", got, want)
1553	}
1554
1555	// Get snapshot
1556	snapshot, err := adminClient.SnapshotInfo(ctx, cluster, "mysnapshot")
1557	if err != nil {
1558		t.Fatalf("SnapshotInfo: %v", snapshot)
1559	}
1560	if got, want := *snapshot, *snapshots[0]; got != want {
1561		t.Fatalf("SnapshotInfo: %v, want: %v", got, want)
1562	}
1563
1564	// Restore
1565	restoredTable := table + "-restored"
1566	defer adminClient.DeleteTable(ctx, restoredTable)
1567	if err = adminClient.CreateTableFromSnapshot(ctx, restoredTable, cluster, "mysnapshot"); err != nil {
1568		t.Fatalf("CreateTableFromSnapshot: %v", err)
1569	}
1570	if _, err := adminClient.TableInfo(ctx, restoredTable); err != nil {
1571		t.Fatalf("Restored TableInfo: %v", err)
1572	}
1573
1574	// Delete snapshot
1575	if err = adminClient.DeleteSnapshot(ctx, cluster, "mysnapshot"); err != nil {
1576		t.Fatalf("DeleteSnapshot: %v", err)
1577	}
1578	snapshots, err = list(cluster)
1579	if err != nil {
1580		t.Fatalf("List after Delete: %v", err)
1581	}
1582	if got, want := len(snapshots), 0; got != want {
1583		t.Fatalf("List after delete len: %d, want: %d", got, want)
1584	}
1585}
1586
1587func TestIntegration_Granularity(t *testing.T) {
1588	testEnv, err := NewIntegrationEnv()
1589	if err != nil {
1590		t.Fatalf("IntegrationEnv: %v", err)
1591	}
1592	defer testEnv.Close()
1593
1594	timeout := 2 * time.Second
1595	if testEnv.Config().UseProd {
1596		timeout = 5 * time.Minute
1597	}
1598	ctx, _ := context.WithTimeout(context.Background(), timeout)
1599
1600	adminClient, err := testEnv.NewAdminClient()
1601	if err != nil {
1602		t.Fatalf("NewAdminClient: %v", err)
1603	}
1604	defer adminClient.Close()
1605
1606	list := func() []string {
1607		tbls, err := adminClient.Tables(ctx)
1608		if err != nil {
1609			t.Fatalf("Fetching list of tables: %v", err)
1610		}
1611		sort.Strings(tbls)
1612		return tbls
1613	}
1614	containsAll := func(got, want []string) bool {
1615		gotSet := make(map[string]bool)
1616
1617		for _, s := range got {
1618			gotSet[s] = true
1619		}
1620		for _, s := range want {
1621			if !gotSet[s] {
1622				return false
1623			}
1624		}
1625		return true
1626	}
1627
1628	defer adminClient.DeleteTable(ctx, "mytable")
1629
1630	if err := adminClient.CreateTable(ctx, "mytable"); err != nil {
1631		t.Fatalf("Creating table: %v", err)
1632	}
1633
1634	tables := list()
1635	if got, want := tables, []string{"mytable"}; !containsAll(got, want) {
1636		t.Errorf("adminClient.Tables returned %#v, want %#v", got, want)
1637	}
1638
1639	// calling ModifyColumnFamilies to check the granularity of table
1640	prefix := adminClient.instancePrefix()
1641	req := &btapb.ModifyColumnFamiliesRequest{
1642		Name: prefix + "/tables/" + "mytable",
1643		Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{
1644			Id:  "cf",
1645			Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Create{&btapb.ColumnFamily{}},
1646		}},
1647	}
1648	table, err := adminClient.tClient.ModifyColumnFamilies(ctx, req)
1649	if err != nil {
1650		t.Fatalf("Creating column family: %v", err)
1651	}
1652	if table.Granularity != btapb.Table_TimestampGranularity(btapb.Table_MILLIS) {
1653		t.Errorf("ModifyColumnFamilies returned granularity %#v, want %#v", table.Granularity, btapb.Table_TimestampGranularity(btapb.Table_MILLIS))
1654	}
1655}
1656
1657func TestIntegration_InstanceAdminClient_AppProfile(t *testing.T) {
1658	testEnv, err := NewIntegrationEnv()
1659	if err != nil {
1660		t.Fatalf("IntegrationEnv: %v", err)
1661	}
1662	defer testEnv.Close()
1663
1664	timeout := 2 * time.Second
1665	if testEnv.Config().UseProd {
1666		timeout = 5 * time.Minute
1667	}
1668	ctx, cancel := context.WithTimeout(context.Background(), timeout)
1669	defer cancel()
1670
1671	adminClient, err := testEnv.NewAdminClient()
1672	if err != nil {
1673		t.Fatalf("NewAdminClient: %v", err)
1674	}
1675	defer adminClient.Close()
1676
1677	iAdminClient, err := testEnv.NewInstanceAdminClient()
1678	if err != nil {
1679		t.Fatalf("NewInstanceAdminClient: %v", err)
1680	}
1681
1682	if iAdminClient == nil {
1683		return
1684	}
1685
1686	defer iAdminClient.Close()
1687	profile := ProfileConf{
1688		ProfileID:     "app_profile1",
1689		InstanceID:    adminClient.instance,
1690		ClusterID:     testEnv.Config().Cluster,
1691		Description:   "creating new app profile 1",
1692		RoutingPolicy: SingleClusterRouting,
1693	}
1694
1695	createdProfile, err := iAdminClient.CreateAppProfile(ctx, profile)
1696	if err != nil {
1697		t.Fatalf("Creating app profile: %v", err)
1698
1699	}
1700
1701	gotProfile, err := iAdminClient.GetAppProfile(ctx, adminClient.instance, "app_profile1")
1702
1703	if err != nil {
1704		t.Fatalf("Get app profile: %v", err)
1705	}
1706
1707	if !proto.Equal(createdProfile, gotProfile) {
1708		t.Fatalf("created profile: %s, got profile: %s", createdProfile.Name, gotProfile.Name)
1709
1710	}
1711
1712	list := func(instanceID string) ([]*btapb.AppProfile, error) {
1713		profiles := []*btapb.AppProfile(nil)
1714
1715		it := iAdminClient.ListAppProfiles(ctx, instanceID)
1716		for {
1717			s, err := it.Next()
1718			if err == iterator.Done {
1719				break
1720			}
1721			if err != nil {
1722				return nil, err
1723			}
1724			profiles = append(profiles, s)
1725		}
1726		return profiles, err
1727	}
1728
1729	profiles, err := list(adminClient.instance)
1730	if err != nil {
1731		t.Fatalf("List app profile: %v", err)
1732	}
1733
1734	if got, want := len(profiles), 1; got != want {
1735		t.Fatalf("Initial app profile list len: %d, want: %d", got, want)
1736	}
1737
1738	for _, test := range []struct {
1739		desc   string
1740		uattrs ProfileAttrsToUpdate
1741		want   *btapb.AppProfile // nil means error
1742	}{
1743		{
1744			desc:   "empty update",
1745			uattrs: ProfileAttrsToUpdate{},
1746			want:   nil,
1747		},
1748
1749		{
1750			desc:   "empty description update",
1751			uattrs: ProfileAttrsToUpdate{Description: ""},
1752			want: &btapb.AppProfile{
1753				Name:          gotProfile.Name,
1754				Description:   "",
1755				RoutingPolicy: gotProfile.RoutingPolicy,
1756				Etag:          gotProfile.Etag},
1757		},
1758		{
1759			desc: "routing update",
1760			uattrs: ProfileAttrsToUpdate{
1761				RoutingPolicy: SingleClusterRouting,
1762				ClusterID:     testEnv.Config().Cluster,
1763			},
1764			want: &btapb.AppProfile{
1765				Name:        gotProfile.Name,
1766				Description: "",
1767				Etag:        gotProfile.Etag,
1768				RoutingPolicy: &btapb.AppProfile_SingleClusterRouting_{
1769					SingleClusterRouting: &btapb.AppProfile_SingleClusterRouting{
1770						ClusterId: testEnv.Config().Cluster,
1771					}},
1772			},
1773		},
1774	} {
1775		err = iAdminClient.UpdateAppProfile(ctx, adminClient.instance, "app_profile1", test.uattrs)
1776		if err != nil {
1777			if test.want != nil {
1778				t.Errorf("%s: %v", test.desc, err)
1779			}
1780			continue
1781		}
1782		if err == nil && test.want == nil {
1783			t.Errorf("%s: got nil, want error", test.desc)
1784			continue
1785		}
1786
1787		got, _ := iAdminClient.GetAppProfile(ctx, adminClient.instance, "app_profile1")
1788
1789		if !proto.Equal(got, test.want) {
1790			t.Fatalf("%s : got profile : %v, want profile: %v", test.desc, gotProfile, test.want)
1791		}
1792
1793	}
1794
1795	err = iAdminClient.DeleteAppProfile(ctx, adminClient.instance, "app_profile1")
1796	if err != nil {
1797		t.Fatalf("Delete app profile: %v", err)
1798	}
1799
1800}
1801
1802func TestIntegration_InstanceUpdate(t *testing.T) {
1803	testEnv, err := NewIntegrationEnv()
1804	if err != nil {
1805		t.Fatalf("IntegrationEnv: %v", err)
1806	}
1807	defer testEnv.Close()
1808
1809	timeout := 2 * time.Second
1810	if testEnv.Config().UseProd {
1811		timeout = 5 * time.Minute
1812	}
1813	ctx, cancel := context.WithTimeout(context.Background(), timeout)
1814	defer cancel()
1815
1816	adminClient, err := testEnv.NewAdminClient()
1817	if err != nil {
1818		t.Fatalf("NewAdminClient: %v", err)
1819	}
1820
1821	defer adminClient.Close()
1822
1823	iAdminClient, err := testEnv.NewInstanceAdminClient()
1824	if err != nil {
1825		t.Fatalf("NewInstanceAdminClient: %v", err)
1826	}
1827
1828	if iAdminClient == nil {
1829		return
1830	}
1831
1832	defer iAdminClient.Close()
1833
1834	iInfo, err := iAdminClient.InstanceInfo(ctx, adminClient.instance)
1835	if err != nil {
1836		t.Errorf("InstanceInfo: %v", err)
1837	}
1838
1839	if iInfo.Name != adminClient.instance {
1840		t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance)
1841	}
1842
1843	if iInfo.DisplayName != adminClient.instance {
1844		t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance)
1845	}
1846
1847	const numNodes = 4
1848	// update cluster nodes
1849	if err := iAdminClient.UpdateCluster(ctx, adminClient.instance, testEnv.Config().Cluster, int32(numNodes)); err != nil {
1850		t.Errorf("UpdateCluster: %v", err)
1851	}
1852
1853	// get cluster after updating
1854	cis, err := iAdminClient.GetCluster(ctx, adminClient.instance, testEnv.Config().Cluster)
1855	if err != nil {
1856		t.Errorf("GetCluster %v", err)
1857	}
1858	if cis.ServeNodes != int(numNodes) {
1859		t.Errorf("ServeNodes returned %d, want %d", cis.ServeNodes, int(numNodes))
1860	}
1861}
1862
1863func setupIntegration(ctx context.Context, t *testing.T) (_ *Client, _ *AdminClient, table *Table, tableName string, cleanup func(), _ error) {
1864	testEnv, err := NewIntegrationEnv()
1865	if err != nil {
1866		return nil, nil, nil, "", nil, err
1867	}
1868
1869	var timeout time.Duration
1870	if testEnv.Config().UseProd {
1871		timeout = 10 * time.Minute
1872		t.Logf("Running test against production")
1873	} else {
1874		timeout = 5 * time.Minute
1875		t.Logf("bttest.Server running on %s", testEnv.Config().AdminEndpoint)
1876	}
1877	ctx, cancel := context.WithTimeout(ctx, timeout)
1878
1879	client, err := testEnv.NewClient()
1880	if err != nil {
1881		return nil, nil, nil, "", nil, err
1882	}
1883
1884	adminClient, err := testEnv.NewAdminClient()
1885	if err != nil {
1886		return nil, nil, nil, "", nil, err
1887	}
1888
1889	tableName = testEnv.Config().Table
1890	if err := adminClient.CreateTable(ctx, tableName); err != nil {
1891		cancel()
1892		return nil, nil, nil, "", nil, err
1893	}
1894	if err := adminClient.CreateColumnFamily(ctx, tableName, "follows"); err != nil {
1895		cancel()
1896		return nil, nil, nil, "", nil, err
1897	}
1898
1899	return client, adminClient, client.Open(tableName), tableName, func() {
1900		if err := adminClient.DeleteTable(ctx, tableName); err != nil {
1901			t.Errorf("DeleteTable got error %v", err)
1902		}
1903		cancel()
1904		client.Close()
1905		adminClient.Close()
1906	}, nil
1907}
1908
1909func formatReadItem(ri ReadItem) string {
1910	// Use the column qualifier only to make the test data briefer.
1911	col := ri.Column[strings.Index(ri.Column, ":")+1:]
1912	return fmt.Sprintf("%s-%s-%s", ri.Row, col, ri.Value)
1913}
1914
// fill writes repeated copies of sub into b, front to back, for as long as
// the unwritten remainder of b is strictly longer than sub. The final
// remainder (at most len(sub) bytes) is left untouched. An empty sub is a
// no-op; without that guard copy would make no progress and the loop would
// never terminate.
func fill(b, sub []byte) {
	if len(sub) == 0 {
		return
	}
	for len(b) > len(sub) {
		b = b[copy(b, sub):]
	}
}
1921
1922func clearTimestamps(r Row) {
1923	for _, ris := range r {
1924		for i := range ris {
1925			ris[i].Timestamp = 0
1926		}
1927	}
1928}
1929