1/*
2Copyright 2019 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package bigtable
18
19import (
20	"context"
21	"flag"
22	"fmt"
23	"math"
24	"math/rand"
25	"sort"
26	"strings"
27	"sync"
28	"testing"
29	"time"
30
31	"cloud.google.com/go/internal/testutil"
32	"github.com/golang/protobuf/proto"
33	"google.golang.org/api/iterator"
34	btapb "google.golang.org/genproto/googleapis/bigtable/admin/v2"
35)
36
37var presidentsSocialGraph = map[string][]string{
38	"wmckinley":   {"tjefferson"},
39	"gwashington": {"jadams"},
40	"tjefferson":  {"gwashington", "jadams"},
41	"jadams":      {"gwashington", "tjefferson"},
42}
43
44func populatePresidentsGraph(table *Table) error {
45	ctx := context.Background()
46	for row, ss := range presidentsSocialGraph {
47		mut := NewMutation()
48		for _, name := range ss {
49			mut.Set("follows", name, 1000, []byte("1"))
50		}
51		if err := table.Apply(ctx, row, mut); err != nil {
52			return fmt.Errorf("Mutating row %q: %v", row, err)
53		}
54	}
55	return nil
56}
57
58var instanceToCreate string
59var instanceToCreateZone string
60
61func init() {
62	// Don't test instance creation by default, as quota is necessary and aborted tests could strand resources.
63	flag.StringVar(&instanceToCreate, "it.instance-to-create", "",
64		"The id of an instance to create, update and delete. Requires sufficient Cloud Bigtable quota. Requires that it.use-prod is true.")
65	flag.StringVar(&instanceToCreateZone, "it.instance-to-create-zone", "us-central1-b",
66		"The zone in which to create the new test instance.")
67}
68
69func TestIntegration_ConditionalMutations(t *testing.T) {
70	ctx := context.Background()
71	_, _, table, _, cleanup, err := setupIntegration(ctx, t)
72	if err != nil {
73		t.Fatal(err)
74	}
75	defer cleanup()
76
77	if err := populatePresidentsGraph(table); err != nil {
78		t.Fatal(err)
79	}
80
81	// Do a conditional mutation with a complex filter.
82	mutTrue := NewMutation()
83	mutTrue.Set("follows", "wmckinley", 1000, []byte("1"))
84	filter := ChainFilters(ColumnFilter("gwash[iz].*"), ValueFilter("."))
85	mut := NewCondMutation(filter, mutTrue, nil)
86	if err := table.Apply(ctx, "tjefferson", mut); err != nil {
87		t.Fatalf("Conditionally mutating row: %v", err)
88	}
89	// Do a second condition mutation with a filter that does not match,
90	// and thus no changes should be made.
91	mutTrue = NewMutation()
92	mutTrue.DeleteRow()
93	filter = ColumnFilter("snoop.dogg")
94	mut = NewCondMutation(filter, mutTrue, nil)
95	if err := table.Apply(ctx, "tjefferson", mut); err != nil {
96		t.Fatalf("Conditionally mutating row: %v", err)
97	}
98
99	// Fetch a row.
100	row, err := table.ReadRow(ctx, "jadams")
101	if err != nil {
102		t.Fatalf("Reading a row: %v", err)
103	}
104	wantRow := Row{
105		"follows": []ReadItem{
106			{Row: "jadams", Column: "follows:gwashington", Timestamp: 1000, Value: []byte("1")},
107			{Row: "jadams", Column: "follows:tjefferson", Timestamp: 1000, Value: []byte("1")},
108		},
109	}
110	if !testutil.Equal(row, wantRow) {
111		t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow)
112	}
113}
114
115func TestIntegration_PartialReadRows(t *testing.T) {
116	ctx := context.Background()
117	_, _, table, _, cleanup, err := setupIntegration(ctx, t)
118	if err != nil {
119		t.Fatal(err)
120	}
121	defer cleanup()
122
123	if err := populatePresidentsGraph(table); err != nil {
124		t.Fatal(err)
125	}
126
127	// Do a scan and stop part way through.
128	// Verify that the ReadRows callback doesn't keep running.
129	stopped := false
130	err = table.ReadRows(ctx, InfiniteRange(""), func(r Row) bool {
131		if r.Key() < "h" {
132			return true
133		}
134		if !stopped {
135			stopped = true
136			return false
137		}
138		t.Fatalf("ReadRows kept scanning to row %q after being told to stop", r.Key())
139		return false
140	})
141	if err != nil {
142		t.Fatalf("Partial ReadRows: %v", err)
143	}
144}
145
146func TestIntegration_ReadRowList(t *testing.T) {
147	ctx := context.Background()
148	_, _, table, _, cleanup, err := setupIntegration(ctx, t)
149	if err != nil {
150		t.Fatal(err)
151	}
152	defer cleanup()
153
154	if err := populatePresidentsGraph(table); err != nil {
155		t.Fatal(err)
156	}
157
158	// Read a RowList
159	var elt []string
160	keys := RowList{"wmckinley", "gwashington", "jadams"}
161	want := "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,wmckinley-tjefferson-1"
162	err = table.ReadRows(ctx, keys, func(r Row) bool {
163		for _, ris := range r {
164			for _, ri := range ris {
165				elt = append(elt, formatReadItem(ri))
166			}
167		}
168		return true
169	})
170	if err != nil {
171		t.Fatalf("read RowList: %v", err)
172	}
173
174	if got := strings.Join(elt, ","); got != want {
175		t.Fatalf("bulk read: wrong reads.\n got %q\nwant %q", got, want)
176	}
177}
178
179func TestIntegration_DeleteRow(t *testing.T) {
180	ctx := context.Background()
181	_, _, table, _, cleanup, err := setupIntegration(ctx, t)
182	if err != nil {
183		t.Fatal(err)
184	}
185	defer cleanup()
186
187	if err := populatePresidentsGraph(table); err != nil {
188		t.Fatal(err)
189	}
190
191	// Delete a row and check it goes away.
192	mut := NewMutation()
193	mut.DeleteRow()
194	if err := table.Apply(ctx, "wmckinley", mut); err != nil {
195		t.Fatalf("Apply DeleteRow: %v", err)
196	}
197	row, err := table.ReadRow(ctx, "wmckinley")
198	if err != nil {
199		t.Fatalf("Reading a row after DeleteRow: %v", err)
200	}
201	if len(row) != 0 {
202		t.Fatalf("Read non-zero row after DeleteRow: %v", row)
203	}
204}
205
206func TestIntegration_ReadModifyWrite(t *testing.T) {
207	ctx := context.Background()
208	_, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t)
209	if err != nil {
210		t.Fatal(err)
211	}
212	defer cleanup()
213
214	if err := populatePresidentsGraph(table); err != nil {
215		t.Fatal(err)
216	}
217
218	if err := adminClient.CreateColumnFamily(ctx, tableName, "counter"); err != nil {
219		t.Fatalf("Creating column family: %v", err)
220	}
221
222	appendRMW := func(b []byte) *ReadModifyWrite {
223		rmw := NewReadModifyWrite()
224		rmw.AppendValue("counter", "likes", b)
225		return rmw
226	}
227	incRMW := func(n int64) *ReadModifyWrite {
228		rmw := NewReadModifyWrite()
229		rmw.Increment("counter", "likes", n)
230		return rmw
231	}
232	rmwSeq := []struct {
233		desc string
234		rmw  *ReadModifyWrite
235		want []byte
236	}{
237		{
238			desc: "append #1",
239			rmw:  appendRMW([]byte{0, 0, 0}),
240			want: []byte{0, 0, 0},
241		},
242		{
243			desc: "append #2",
244			rmw:  appendRMW([]byte{0, 0, 0, 0, 17}), // the remaining 40 bits to make a big-endian 17
245			want: []byte{0, 0, 0, 0, 0, 0, 0, 17},
246		},
247		{
248			desc: "increment",
249			rmw:  incRMW(8),
250			want: []byte{0, 0, 0, 0, 0, 0, 0, 25},
251		},
252	}
253	for _, step := range rmwSeq {
254		row, err := table.ApplyReadModifyWrite(ctx, "gwashington", step.rmw)
255		if err != nil {
256			t.Fatalf("ApplyReadModifyWrite %+v: %v", step.rmw, err)
257		}
258		// Make sure the modified cell returned by the RMW operation has a timestamp.
259		if row["counter"][0].Timestamp == 0 {
260			t.Fatalf("RMW returned cell timestamp: got %v, want > 0", row["counter"][0].Timestamp)
261		}
262		clearTimestamps(row)
263		wantRow := Row{"counter": []ReadItem{{Row: "gwashington", Column: "counter:likes", Value: step.want}}}
264		if !testutil.Equal(row, wantRow) {
265			t.Fatalf("After %s,\n got %v\nwant %v", step.desc, row, wantRow)
266		}
267	}
268
269	// Check for google-cloud-go/issues/723. RMWs that insert new rows should keep row order sorted in the emulator.
270	_, err = table.ApplyReadModifyWrite(ctx, "issue-723-2", appendRMW([]byte{0}))
271	if err != nil {
272		t.Fatalf("ApplyReadModifyWrite null string: %v", err)
273	}
274	_, err = table.ApplyReadModifyWrite(ctx, "issue-723-1", appendRMW([]byte{0}))
275	if err != nil {
276		t.Fatalf("ApplyReadModifyWrite null string: %v", err)
277	}
278	// Get only the correct row back on read.
279	r, err := table.ReadRow(ctx, "issue-723-1")
280	if err != nil {
281		t.Fatalf("Reading row: %v", err)
282	}
283	if r.Key() != "issue-723-1" {
284		t.Fatalf("ApplyReadModifyWrite: incorrect read after RMW,\n got %v\nwant %v", r.Key(), "issue-723-1")
285	}
286}
287
288func TestIntegration_ArbitraryTimestamps(t *testing.T) {
289	ctx := context.Background()
290	_, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t)
291	if err != nil {
292		t.Fatal(err)
293	}
294	defer cleanup()
295
296	// Test arbitrary timestamps more thoroughly.
297	if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil {
298		t.Fatalf("Creating column family: %v", err)
299	}
300	const numVersions = 4
301	mut := NewMutation()
302	for i := 1; i < numVersions; i++ {
303		// Timestamps are used in thousands because the server
304		// only permits that granularity.
305		mut.Set("ts", "col", Timestamp(i*1000), []byte(fmt.Sprintf("val-%d", i)))
306		mut.Set("ts", "col2", Timestamp(i*1000), []byte(fmt.Sprintf("val-%d", i)))
307	}
308	if err := table.Apply(ctx, "testrow", mut); err != nil {
309		t.Fatalf("Mutating row: %v", err)
310	}
311	r, err := table.ReadRow(ctx, "testrow")
312	if err != nil {
313		t.Fatalf("Reading row: %v", err)
314	}
315	wantRow := Row{"ts": []ReadItem{
316		// These should be returned in descending timestamp order.
317		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
318		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
319		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
320		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
321		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
322		{Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")},
323	}}
324	if !testutil.Equal(r, wantRow) {
325		t.Fatalf("Cell with multiple versions,\n got %v\nwant %v", r, wantRow)
326	}
327
328	// Do the same read, but filter to the latest two versions.
329	r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(2)))
330	if err != nil {
331		t.Fatalf("Reading row: %v", err)
332	}
333	wantRow = Row{"ts": []ReadItem{
334		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
335		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
336		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
337		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
338	}}
339	if !testutil.Equal(r, wantRow) {
340		t.Fatalf("Cell with multiple versions and LatestNFilter(2),\n got %v\nwant %v", r, wantRow)
341	}
342	// Check cell offset / limit
343	r, err = table.ReadRow(ctx, "testrow", RowFilter(CellsPerRowLimitFilter(3)))
344	if err != nil {
345		t.Fatalf("Reading row: %v", err)
346	}
347	wantRow = Row{"ts": []ReadItem{
348		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
349		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
350		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
351	}}
352	if !testutil.Equal(r, wantRow) {
353		t.Fatalf("Cell with multiple versions and CellsPerRowLimitFilter(3),\n got %v\nwant %v", r, wantRow)
354	}
355	r, err = table.ReadRow(ctx, "testrow", RowFilter(CellsPerRowOffsetFilter(3)))
356	if err != nil {
357		t.Fatalf("Reading row: %v", err)
358	}
359	wantRow = Row{"ts": []ReadItem{
360		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
361		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
362		{Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")},
363	}}
364	if !testutil.Equal(r, wantRow) {
365		t.Fatalf("Cell with multiple versions and CellsPerRowOffsetFilter(3),\n got %v\nwant %v", r, wantRow)
366	}
367	// Check timestamp range filtering (with truncation)
368	r, err = table.ReadRow(ctx, "testrow", RowFilter(TimestampRangeFilterMicros(1001, 3000)))
369	if err != nil {
370		t.Fatalf("Reading row: %v", err)
371	}
372	wantRow = Row{"ts": []ReadItem{
373		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
374		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
375		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
376		{Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")},
377	}}
378	if !testutil.Equal(r, wantRow) {
379		t.Fatalf("Cell with multiple versions and TimestampRangeFilter(1000, 3000),\n got %v\nwant %v", r, wantRow)
380	}
381	r, err = table.ReadRow(ctx, "testrow", RowFilter(TimestampRangeFilterMicros(1000, 0)))
382	if err != nil {
383		t.Fatalf("Reading row: %v", err)
384	}
385	wantRow = Row{"ts": []ReadItem{
386		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
387		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
388		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
389		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
390		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
391		{Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")},
392	}}
393	if !testutil.Equal(r, wantRow) {
394		t.Fatalf("Cell with multiple versions and TimestampRangeFilter(1000, 0),\n got %v\nwant %v", r, wantRow)
395	}
396	// Delete non-existing cells, no such column family in this row
397	// Should not delete anything
398	if err := adminClient.CreateColumnFamily(ctx, tableName, "non-existing"); err != nil {
399		t.Fatalf("Creating column family: %v", err)
400	}
401	mut = NewMutation()
402	mut.DeleteTimestampRange("non-existing", "col", 2000, 3000) // half-open interval
403	if err := table.Apply(ctx, "testrow", mut); err != nil {
404		t.Fatalf("Mutating row: %v", err)
405	}
406	r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(3)))
407	if err != nil {
408		t.Fatalf("Reading row: %v", err)
409	}
410	if !testutil.Equal(r, wantRow) {
411		t.Fatalf("Cell was deleted unexpectly,\n got %v\nwant %v", r, wantRow)
412	}
413	// Delete non-existing cells, no such column in this column family
414	// Should not delete anything
415	mut = NewMutation()
416	mut.DeleteTimestampRange("ts", "non-existing", 2000, 3000) // half-open interval
417	if err := table.Apply(ctx, "testrow", mut); err != nil {
418		t.Fatalf("Mutating row: %v", err)
419	}
420	r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(3)))
421	if err != nil {
422		t.Fatalf("Reading row: %v", err)
423	}
424	if !testutil.Equal(r, wantRow) {
425		t.Fatalf("Cell was deleted unexpectly,\n got %v\nwant %v", r, wantRow)
426	}
427	// Delete the cell with timestamp 2000 and repeat the last read,
428	// checking that we get ts 3000 and ts 1000.
429	mut = NewMutation()
430	mut.DeleteTimestampRange("ts", "col", 2001, 3000) // half-open interval
431	if err := table.Apply(ctx, "testrow", mut); err != nil {
432		t.Fatalf("Mutating row: %v", err)
433	}
434	r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(2)))
435	if err != nil {
436		t.Fatalf("Reading row: %v", err)
437	}
438	wantRow = Row{"ts": []ReadItem{
439		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
440		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
441		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
442		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
443	}}
444	if !testutil.Equal(r, wantRow) {
445		t.Fatalf("Cell with multiple versions and LatestNFilter(2), after deleting timestamp 2000,\n got %v\nwant %v", r, wantRow)
446	}
447
448	// Check DeleteCellsInFamily
449	if err := adminClient.CreateColumnFamily(ctx, tableName, "status"); err != nil {
450		t.Fatalf("Creating column family: %v", err)
451	}
452
453	mut = NewMutation()
454	mut.Set("status", "start", 2000, []byte("2"))
455	mut.Set("status", "end", 3000, []byte("3"))
456	mut.Set("ts", "col", 1000, []byte("1"))
457	if err := table.Apply(ctx, "row1", mut); err != nil {
458		t.Fatalf("Mutating row: %v", err)
459	}
460	if err := table.Apply(ctx, "row2", mut); err != nil {
461		t.Fatalf("Mutating row: %v", err)
462	}
463
464	mut = NewMutation()
465	mut.DeleteCellsInFamily("status")
466	if err := table.Apply(ctx, "row1", mut); err != nil {
467		t.Fatalf("Delete cf: %v", err)
468	}
469
470	// ColumnFamily removed
471	r, err = table.ReadRow(ctx, "row1")
472	if err != nil {
473		t.Fatalf("Reading row: %v", err)
474	}
475	wantRow = Row{"ts": []ReadItem{
476		{Row: "row1", Column: "ts:col", Timestamp: 1000, Value: []byte("1")},
477	}}
478	if !testutil.Equal(r, wantRow) {
479		t.Fatalf("column family was not deleted.\n got %v\n want %v", r, wantRow)
480	}
481
482	// ColumnFamily not removed
483	r, err = table.ReadRow(ctx, "row2")
484	if err != nil {
485		t.Fatalf("Reading row: %v", err)
486	}
487	wantRow = Row{
488		"ts": []ReadItem{
489			{Row: "row2", Column: "ts:col", Timestamp: 1000, Value: []byte("1")},
490		},
491		"status": []ReadItem{
492			{Row: "row2", Column: "status:end", Timestamp: 3000, Value: []byte("3")},
493			{Row: "row2", Column: "status:start", Timestamp: 2000, Value: []byte("2")},
494		},
495	}
496	if !testutil.Equal(r, wantRow) {
497		t.Fatalf("Column family was deleted unexpectedly.\n got %v\n want %v", r, wantRow)
498	}
499
500	// Check DeleteCellsInColumn
501	mut = NewMutation()
502	mut.Set("status", "start", 2000, []byte("2"))
503	mut.Set("status", "middle", 3000, []byte("3"))
504	mut.Set("status", "end", 1000, []byte("1"))
505	if err := table.Apply(ctx, "row3", mut); err != nil {
506		t.Fatalf("Mutating row: %v", err)
507	}
508	mut = NewMutation()
509	mut.DeleteCellsInColumn("status", "middle")
510	if err := table.Apply(ctx, "row3", mut); err != nil {
511		t.Fatalf("Delete column: %v", err)
512	}
513	r, err = table.ReadRow(ctx, "row3")
514	if err != nil {
515		t.Fatalf("Reading row: %v", err)
516	}
517	wantRow = Row{
518		"status": []ReadItem{
519			{Row: "row3", Column: "status:end", Timestamp: 1000, Value: []byte("1")},
520			{Row: "row3", Column: "status:start", Timestamp: 2000, Value: []byte("2")},
521		},
522	}
523	if !testutil.Equal(r, wantRow) {
524		t.Fatalf("Column was not deleted.\n got %v\n want %v", r, wantRow)
525	}
526	mut = NewMutation()
527	mut.DeleteCellsInColumn("status", "start")
528	if err := table.Apply(ctx, "row3", mut); err != nil {
529		t.Fatalf("Delete column: %v", err)
530	}
531	r, err = table.ReadRow(ctx, "row3")
532	if err != nil {
533		t.Fatalf("Reading row: %v", err)
534	}
535	wantRow = Row{
536		"status": []ReadItem{
537			{Row: "row3", Column: "status:end", Timestamp: 1000, Value: []byte("1")},
538		},
539	}
540	if !testutil.Equal(r, wantRow) {
541		t.Fatalf("Column was not deleted.\n got %v\n want %v", r, wantRow)
542	}
543	mut = NewMutation()
544	mut.DeleteCellsInColumn("status", "end")
545	if err := table.Apply(ctx, "row3", mut); err != nil {
546		t.Fatalf("Delete column: %v", err)
547	}
548	r, err = table.ReadRow(ctx, "row3")
549	if err != nil {
550		t.Fatalf("Reading row: %v", err)
551	}
552	if len(r) != 0 {
553		t.Fatalf("Delete column: got %v, want empty row", r)
554	}
555	// Add same cell after delete
556	mut = NewMutation()
557	mut.Set("status", "end", 1000, []byte("1"))
558	if err := table.Apply(ctx, "row3", mut); err != nil {
559		t.Fatalf("Mutating row: %v", err)
560	}
561	r, err = table.ReadRow(ctx, "row3")
562	if err != nil {
563		t.Fatalf("Reading row: %v", err)
564	}
565	if !testutil.Equal(r, wantRow) {
566		t.Fatalf("Column was not deleted correctly.\n got %v\n want %v", r, wantRow)
567	}
568}
569
570func TestIntegration_HighlyConcurrentReadsAndWrites(t *testing.T) {
571	ctx := context.Background()
572	_, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t)
573	if err != nil {
574		t.Fatal(err)
575	}
576	defer cleanup()
577
578	if err := populatePresidentsGraph(table); err != nil {
579		t.Fatal(err)
580	}
581
582	if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil {
583		t.Fatalf("Creating column family: %v", err)
584	}
585
586	// Do highly concurrent reads/writes.
587	const maxConcurrency = 1000
588	var wg sync.WaitGroup
589	for i := 0; i < maxConcurrency; i++ {
590		wg.Add(1)
591		go func() {
592			defer wg.Done()
593			switch r := rand.Intn(100); { // r ∈ [0,100)
594			case 0 <= r && r < 30:
595				// Do a read.
596				_, err := table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(1)))
597				if err != nil {
598					t.Errorf("Concurrent read: %v", err)
599				}
600			case 30 <= r && r < 100:
601				// Do a write.
602				mut := NewMutation()
603				mut.Set("ts", "col", 1000, []byte("data"))
604				if err := table.Apply(ctx, "testrow", mut); err != nil {
605					t.Errorf("Concurrent write: %v", err)
606				}
607			}
608		}()
609	}
610	wg.Wait()
611}
612
613func TestIntegration_LargeReadsWritesAndScans(t *testing.T) {
614	ctx := context.Background()
615	_, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t)
616	if err != nil {
617		t.Fatal(err)
618	}
619	defer cleanup()
620
621	if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil {
622		t.Fatalf("Creating column family: %v", err)
623	}
624
625	bigBytes := make([]byte, 5<<20) // 5 MB is larger than current default gRPC max of 4 MB, but less than the max we set.
626	nonsense := []byte("lorem ipsum dolor sit amet, ")
627	fill(bigBytes, nonsense)
628	mut := NewMutation()
629	mut.Set("ts", "col", 1000, bigBytes)
630	if err := table.Apply(ctx, "bigrow", mut); err != nil {
631		t.Fatalf("Big write: %v", err)
632	}
633	r, err := table.ReadRow(ctx, "bigrow")
634	if err != nil {
635		t.Fatalf("Big read: %v", err)
636	}
637	wantRow := Row{"ts": []ReadItem{
638		{Row: "bigrow", Column: "ts:col", Timestamp: 1000, Value: bigBytes},
639	}}
640	if !testutil.Equal(r, wantRow) {
641		t.Fatalf("Big read returned incorrect bytes: %v", r)
642	}
643
644	var wg sync.WaitGroup
645	// Now write 1000 rows, each with 82 KB values, then scan them all.
646	medBytes := make([]byte, 82<<10)
647	fill(medBytes, nonsense)
648	sem := make(chan int, 50) // do up to 50 mutations at a time.
649	for i := 0; i < 1000; i++ {
650		mut := NewMutation()
651		mut.Set("ts", "big-scan", 1000, medBytes)
652		row := fmt.Sprintf("row-%d", i)
653		wg.Add(1)
654		go func() {
655			defer wg.Done()
656			defer func() { <-sem }()
657			sem <- 1
658			if err := table.Apply(ctx, row, mut); err != nil {
659				t.Errorf("Preparing large scan: %v", err)
660			}
661		}()
662	}
663	wg.Wait()
664	n := 0
665	err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool {
666		for _, ris := range r {
667			for _, ri := range ris {
668				n += len(ri.Value)
669			}
670		}
671		return true
672	}, RowFilter(ColumnFilter("big-scan")))
673	if err != nil {
674		t.Fatalf("Doing large scan: %v", err)
675	}
676	if want := 1000 * len(medBytes); n != want {
677		t.Fatalf("Large scan returned %d bytes, want %d", n, want)
678	}
679	// Scan a subset of the 1000 rows that we just created, using a LimitRows ReadOption.
680	rc := 0
681	wantRc := 3
682	err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool {
683		rc++
684		return true
685	}, LimitRows(int64(wantRc)))
686	if err != nil {
687		t.Fatal(err)
688	}
689	if rc != wantRc {
690		t.Fatalf("Scan with row limit returned %d rows, want %d", rc, wantRc)
691	}
692
693	// Test bulk mutations
694	if err := adminClient.CreateColumnFamily(ctx, tableName, "bulk"); err != nil {
695		t.Fatalf("Creating column family: %v", err)
696	}
697	bulkData := map[string][]string{
698		"red sox":  {"2004", "2007", "2013"},
699		"patriots": {"2001", "2003", "2004", "2014"},
700		"celtics":  {"1981", "1984", "1986", "2008"},
701	}
702	var rowKeys []string
703	var muts []*Mutation
704	for row, ss := range bulkData {
705		mut := NewMutation()
706		for _, name := range ss {
707			mut.Set("bulk", name, 1000, []byte("1"))
708		}
709		rowKeys = append(rowKeys, row)
710		muts = append(muts, mut)
711	}
712	status, err := table.ApplyBulk(ctx, rowKeys, muts)
713	if err != nil {
714		t.Fatalf("Bulk mutating rows %q: %v", rowKeys, err)
715	}
716	if status != nil {
717		t.Fatalf("non-nil errors: %v", err)
718	}
719
720	// Read each row back
721	for rowKey, ss := range bulkData {
722		row, err := table.ReadRow(ctx, rowKey)
723		if err != nil {
724			t.Fatalf("Reading a bulk row: %v", err)
725		}
726		var wantItems []ReadItem
727		for _, val := range ss {
728			wantItems = append(wantItems, ReadItem{Row: rowKey, Column: "bulk:" + val, Timestamp: 1000, Value: []byte("1")})
729		}
730		wantRow := Row{"bulk": wantItems}
731		if !testutil.Equal(row, wantRow) {
732			t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow)
733		}
734	}
735
736	// Test bulk write errors.
737	// Note: Setting timestamps as ServerTime makes sure the mutations are not retried on error.
738	badMut := NewMutation()
739	badMut.Set("badfamily", "col", ServerTime, nil)
740	badMut2 := NewMutation()
741	badMut2.Set("badfamily2", "goodcol", ServerTime, []byte("1"))
742	status, err = table.ApplyBulk(ctx, []string{"badrow", "badrow2"}, []*Mutation{badMut, badMut2})
743	if err != nil {
744		t.Fatalf("Bulk mutating rows %q: %v", rowKeys, err)
745	}
746	if status == nil {
747		t.Fatalf("No errors for bad bulk mutation")
748	} else if status[0] == nil || status[1] == nil {
749		t.Fatalf("No error for bad bulk mutation")
750	}
751}
752
753func TestIntegration_Read(t *testing.T) {
754	ctx := context.Background()
755	_, _, table, _, cleanup, err := setupIntegration(ctx, t)
756	if err != nil {
757		t.Fatal(err)
758	}
759	defer cleanup()
760
761	// Insert some data.
762	initialData := map[string][]string{
763		"wmckinley":   {"tjefferson"},
764		"gwashington": {"jadams"},
765		"tjefferson":  {"gwashington", "jadams", "wmckinley"},
766		"jadams":      {"gwashington", "tjefferson"},
767	}
768	for row, ss := range initialData {
769		mut := NewMutation()
770		for _, name := range ss {
771			mut.Set("follows", name, 1000, []byte("1"))
772		}
773		if err := table.Apply(ctx, row, mut); err != nil {
774			t.Fatalf("Mutating row %q: %v", row, err)
775		}
776	}
777
778	for _, test := range []struct {
779		desc   string
780		rr     RowSet
781		filter Filter     // may be nil
782		limit  ReadOption // may be nil
783
784		// We do the read, grab all the cells, turn them into "<row>-<col>-<val>",
785		// and join with a comma.
786		want string
787	}{
788		{
789			desc: "read all, unfiltered",
790			rr:   RowRange{},
791			want: "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,tjefferson-gwashington-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1",
792		},
793		{
794			desc: "read with InfiniteRange, unfiltered",
795			rr:   InfiniteRange("tjefferson"),
796			want: "tjefferson-gwashington-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1",
797		},
798		{
799			desc: "read with NewRange, unfiltered",
800			rr:   NewRange("gargamel", "hubbard"),
801			want: "gwashington-jadams-1",
802		},
803		{
804			desc: "read with PrefixRange, unfiltered",
805			rr:   PrefixRange("jad"),
806			want: "jadams-gwashington-1,jadams-tjefferson-1",
807		},
808		{
809			desc: "read with SingleRow, unfiltered",
810			rr:   SingleRow("wmckinley"),
811			want: "wmckinley-tjefferson-1",
812		},
813		{
814			desc:   "read all, with ColumnFilter",
815			rr:     RowRange{},
816			filter: ColumnFilter(".*j.*"), // matches "jadams" and "tjefferson"
817			want:   "gwashington-jadams-1,jadams-tjefferson-1,tjefferson-jadams-1,wmckinley-tjefferson-1",
818		},
819		{
820			desc:   "read all, with ColumnFilter, prefix",
821			rr:     RowRange{},
822			filter: ColumnFilter("j"), // no matches
823			want:   "",
824		},
825		{
826			desc:   "read range, with ColumnRangeFilter",
827			rr:     RowRange{},
828			filter: ColumnRangeFilter("follows", "h", "k"),
829			want:   "gwashington-jadams-1,tjefferson-jadams-1",
830		},
831		{
832			desc:   "read range from empty, with ColumnRangeFilter",
833			rr:     RowRange{},
834			filter: ColumnRangeFilter("follows", "", "u"),
835			want:   "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,tjefferson-gwashington-1,tjefferson-jadams-1,wmckinley-tjefferson-1",
836		},
837		{
838			desc:   "read range from start to empty, with ColumnRangeFilter",
839			rr:     RowRange{},
840			filter: ColumnRangeFilter("follows", "h", ""),
841			want:   "gwashington-jadams-1,jadams-tjefferson-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1",
842		},
843		{
844			desc:   "read with RowKeyFilter",
845			rr:     RowRange{},
846			filter: RowKeyFilter(".*wash.*"),
847			want:   "gwashington-jadams-1",
848		},
849		{
850			desc:   "read with RowKeyFilter, prefix",
851			rr:     RowRange{},
852			filter: RowKeyFilter("gwash"),
853			want:   "",
854		},
855		{
856			desc:   "read with RowKeyFilter, no matches",
857			rr:     RowRange{},
858			filter: RowKeyFilter(".*xxx.*"),
859			want:   "",
860		},
861		{
862			desc:   "read with FamilyFilter, no matches",
863			rr:     RowRange{},
864			filter: FamilyFilter(".*xxx.*"),
865			want:   "",
866		},
867		{
868			desc:   "read with ColumnFilter + row limit",
869			rr:     RowRange{},
870			filter: ColumnFilter(".*j.*"), // matches "jadams" and "tjefferson"
871			limit:  LimitRows(2),
872			want:   "gwashington-jadams-1,jadams-tjefferson-1",
873		},
874		{
875			desc:   "read all, strip values",
876			rr:     RowRange{},
877			filter: StripValueFilter(),
878			want:   "gwashington-jadams-,jadams-gwashington-,jadams-tjefferson-,tjefferson-gwashington-,tjefferson-jadams-,tjefferson-wmckinley-,wmckinley-tjefferson-",
879		},
880		{
881			desc:   "read with ColumnFilter + row limit + strip values",
882			rr:     RowRange{},
883			filter: ChainFilters(ColumnFilter(".*j.*"), StripValueFilter()), // matches "jadams" and "tjefferson"
884			limit:  LimitRows(2),
885			want:   "gwashington-jadams-,jadams-tjefferson-",
886		},
887		{
888			desc:   "read with condition, strip values on true",
889			rr:     RowRange{},
890			filter: ConditionFilter(ColumnFilter(".*j.*"), StripValueFilter(), nil),
891			want:   "gwashington-jadams-,jadams-gwashington-,jadams-tjefferson-,tjefferson-gwashington-,tjefferson-jadams-,tjefferson-wmckinley-,wmckinley-tjefferson-",
892		},
893		{
894			desc:   "read with condition, strip values on false",
895			rr:     RowRange{},
896			filter: ConditionFilter(ColumnFilter(".*xxx.*"), nil, StripValueFilter()),
897			want:   "gwashington-jadams-,jadams-gwashington-,jadams-tjefferson-,tjefferson-gwashington-,tjefferson-jadams-,tjefferson-wmckinley-,wmckinley-tjefferson-",
898		},
899		{
900			desc:   "read with ValueRangeFilter + row limit",
901			rr:     RowRange{},
902			filter: ValueRangeFilter([]byte("1"), []byte("5")), // matches our value of "1"
903			limit:  LimitRows(2),
904			want:   "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1",
905		},
906		{
907			desc:   "read with ValueRangeFilter, no match on exclusive end",
908			rr:     RowRange{},
909			filter: ValueRangeFilter([]byte("0"), []byte("1")), // no match
910			want:   "",
911		},
912		{
913			desc:   "read with ValueRangeFilter, no matches",
914			rr:     RowRange{},
915			filter: ValueRangeFilter([]byte("3"), []byte("5")), // matches nothing
916			want:   "",
917		},
918		{
919			desc:   "read with InterleaveFilter, no matches on all filters",
920			rr:     RowRange{},
921			filter: InterleaveFilters(ColumnFilter(".*x.*"), ColumnFilter(".*z.*")),
922			want:   "",
923		},
924		{
925			desc:   "read with InterleaveFilter, no duplicate cells",
926			rr:     RowRange{},
927			filter: InterleaveFilters(ColumnFilter(".*g.*"), ColumnFilter(".*j.*")),
928			want:   "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,tjefferson-gwashington-1,tjefferson-jadams-1,wmckinley-tjefferson-1",
929		},
930		{
931			desc:   "read with InterleaveFilter, with duplicate cells",
932			rr:     RowRange{},
933			filter: InterleaveFilters(ColumnFilter(".*g.*"), ColumnFilter(".*g.*")),
934			want:   "jadams-gwashington-1,jadams-gwashington-1,tjefferson-gwashington-1,tjefferson-gwashington-1",
935		},
936		{
937			desc: "read with a RowRangeList and no filter",
938			rr:   RowRangeList{NewRange("gargamel", "hubbard"), InfiniteRange("wmckinley")},
939			want: "gwashington-jadams-1,wmckinley-tjefferson-1",
940		},
941		{
942			desc:   "chain that excludes rows and matches nothing, in a condition",
943			rr:     RowRange{},
944			filter: ConditionFilter(ChainFilters(ColumnFilter(".*j.*"), ColumnFilter(".*mckinley.*")), StripValueFilter(), nil),
945			want:   "",
946		},
947		{
948			desc:   "chain that ends with an interleave that has no match. covers #804",
949			rr:     RowRange{},
950			filter: ConditionFilter(ChainFilters(ColumnFilter(".*j.*"), InterleaveFilters(ColumnFilter(".*x.*"), ColumnFilter(".*z.*"))), StripValueFilter(), nil),
951			want:   "",
952		},
953	} {
954		t.Run(test.desc, func(t *testing.T) {
955			var opts []ReadOption
956			if test.filter != nil {
957				opts = append(opts, RowFilter(test.filter))
958			}
959			if test.limit != nil {
960				opts = append(opts, test.limit)
961			}
962			var elt []string
963			err := table.ReadRows(ctx, test.rr, func(r Row) bool {
964				for _, ris := range r {
965					for _, ri := range ris {
966						elt = append(elt, formatReadItem(ri))
967					}
968				}
969				return true
970			}, opts...)
971			if err != nil {
972				t.Fatal(err)
973			}
974			if got := strings.Join(elt, ","); got != test.want {
975				t.Fatalf("got %q\nwant %q", got, test.want)
976			}
977		})
978	}
979}
980
981func TestIntegration_SampleRowKeys(t *testing.T) {
982	ctx := context.Background()
983	_, _, table, _, cleanup, err := setupIntegration(ctx, t)
984	if err != nil {
985		t.Fatal(err)
986	}
987	defer cleanup()
988
989	// Insert some data.
990	initialData := map[string][]string{
991		"wmckinley11":   {"tjefferson11"},
992		"gwashington77": {"jadams77"},
993		"tjefferson0":   {"gwashington0", "jadams0"},
994	}
995
996	for row, ss := range initialData {
997		mut := NewMutation()
998		for _, name := range ss {
999			mut.Set("follows", name, 1000, []byte("1"))
1000		}
1001		if err := table.Apply(ctx, row, mut); err != nil {
1002			t.Fatalf("Mutating row %q: %v", row, err)
1003		}
1004	}
1005	sampleKeys, err := table.SampleRowKeys(context.Background())
1006	if err != nil {
1007		t.Fatalf("%s: %v", "SampleRowKeys:", err)
1008	}
1009	if len(sampleKeys) == 0 {
1010		t.Error("SampleRowKeys length 0")
1011	}
1012}
1013
1014func TestIntegration_Admin(t *testing.T) {
1015	testEnv, err := NewIntegrationEnv()
1016	if err != nil {
1017		t.Fatalf("IntegrationEnv: %v", err)
1018	}
1019	defer testEnv.Close()
1020
1021	timeout := 2 * time.Second
1022	if testEnv.Config().UseProd {
1023		timeout = 5 * time.Minute
1024	}
1025	ctx, _ := context.WithTimeout(context.Background(), timeout)
1026
1027	adminClient, err := testEnv.NewAdminClient()
1028	if err != nil {
1029		t.Fatalf("NewAdminClient: %v", err)
1030	}
1031	defer adminClient.Close()
1032
1033	iAdminClient, err := testEnv.NewInstanceAdminClient()
1034	if err != nil {
1035		t.Fatalf("NewInstanceAdminClient: %v", err)
1036	}
1037	if iAdminClient != nil {
1038		defer iAdminClient.Close()
1039
1040		iInfo, err := iAdminClient.InstanceInfo(ctx, adminClient.instance)
1041		if err != nil {
1042			t.Errorf("InstanceInfo: %v", err)
1043		}
1044		if iInfo.Name != adminClient.instance {
1045			t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance)
1046		}
1047	}
1048
1049	list := func() []string {
1050		tbls, err := adminClient.Tables(ctx)
1051		if err != nil {
1052			t.Fatalf("Fetching list of tables: %v", err)
1053		}
1054		sort.Strings(tbls)
1055		return tbls
1056	}
1057	containsAll := func(got, want []string) bool {
1058		gotSet := make(map[string]bool)
1059
1060		for _, s := range got {
1061			gotSet[s] = true
1062		}
1063		for _, s := range want {
1064			if !gotSet[s] {
1065				return false
1066			}
1067		}
1068		return true
1069	}
1070
1071	defer adminClient.DeleteTable(ctx, "mytable")
1072
1073	if err := adminClient.CreateTable(ctx, "mytable"); err != nil {
1074		t.Fatalf("Creating table: %v", err)
1075	}
1076
1077	defer adminClient.DeleteTable(ctx, "myothertable")
1078
1079	if err := adminClient.CreateTable(ctx, "myothertable"); err != nil {
1080		t.Fatalf("Creating table: %v", err)
1081	}
1082
1083	if got, want := list(), []string{"myothertable", "mytable"}; !containsAll(got, want) {
1084		t.Errorf("adminClient.Tables returned %#v, want %#v", got, want)
1085	}
1086
1087	must(adminClient.WaitForReplication(ctx, "mytable"))
1088
1089	if err := adminClient.DeleteTable(ctx, "myothertable"); err != nil {
1090		t.Fatalf("Deleting table: %v", err)
1091	}
1092	tables := list()
1093	if got, want := tables, []string{"mytable"}; !containsAll(got, want) {
1094		t.Errorf("adminClient.Tables returned %#v, want %#v", got, want)
1095	}
1096	if got, unwanted := tables, []string{"myothertable"}; containsAll(got, unwanted) {
1097		t.Errorf("adminClient.Tables return %#v. unwanted %#v", got, unwanted)
1098	}
1099
1100	tblConf := TableConf{
1101		TableID: "conftable",
1102		Families: map[string]GCPolicy{
1103			"fam1": MaxVersionsPolicy(1),
1104			"fam2": MaxVersionsPolicy(2),
1105		},
1106	}
1107	if err := adminClient.CreateTableFromConf(ctx, &tblConf); err != nil {
1108		t.Fatalf("Creating table from TableConf: %v", err)
1109	}
1110	defer adminClient.DeleteTable(ctx, tblConf.TableID)
1111
1112	tblInfo, err := adminClient.TableInfo(ctx, tblConf.TableID)
1113	if err != nil {
1114		t.Fatalf("Getting table info: %v", err)
1115	}
1116	sort.Strings(tblInfo.Families)
1117	wantFams := []string{"fam1", "fam2"}
1118	if !testutil.Equal(tblInfo.Families, wantFams) {
1119		t.Errorf("Column family mismatch, got %v, want %v", tblInfo.Families, wantFams)
1120	}
1121
1122	// Populate mytable and drop row ranges
1123	if err = adminClient.CreateColumnFamily(ctx, "mytable", "cf"); err != nil {
1124		t.Fatalf("Creating column family: %v", err)
1125	}
1126
1127	client, err := testEnv.NewClient()
1128	if err != nil {
1129		t.Fatalf("NewClient: %v", err)
1130	}
1131	defer client.Close()
1132
1133	tbl := client.Open("mytable")
1134
1135	prefixes := []string{"a", "b", "c"}
1136	for _, prefix := range prefixes {
1137		for i := 0; i < 5; i++ {
1138			mut := NewMutation()
1139			mut.Set("cf", "col", 1000, []byte("1"))
1140			if err := tbl.Apply(ctx, fmt.Sprintf("%v-%v", prefix, i), mut); err != nil {
1141				t.Fatalf("Mutating row: %v", err)
1142			}
1143		}
1144	}
1145
1146	if err = adminClient.DropRowRange(ctx, "mytable", "a"); err != nil {
1147		t.Errorf("DropRowRange a: %v", err)
1148	}
1149	if err = adminClient.DropRowRange(ctx, "mytable", "c"); err != nil {
1150		t.Errorf("DropRowRange c: %v", err)
1151	}
1152	if err = adminClient.DropRowRange(ctx, "mytable", "x"); err != nil {
1153		t.Errorf("DropRowRange x: %v", err)
1154	}
1155
1156	var gotRowCount int
1157	must(tbl.ReadRows(ctx, RowRange{}, func(row Row) bool {
1158		gotRowCount++
1159		if !strings.HasPrefix(row.Key(), "b") {
1160			t.Errorf("Invalid row after dropping range: %v", row)
1161		}
1162		return true
1163	}))
1164	if gotRowCount != 5 {
1165		t.Errorf("Invalid row count after dropping range: got %v, want %v", gotRowCount, 5)
1166	}
1167}
1168
1169func TestIntegration_AdminCreateInstance(t *testing.T) {
1170	if instanceToCreate == "" {
1171		t.Skip("instanceToCreate not set, skipping instance creation testing")
1172	}
1173
1174	testEnv, err := NewIntegrationEnv()
1175	if err != nil {
1176		t.Fatalf("IntegrationEnv: %v", err)
1177	}
1178	defer testEnv.Close()
1179
1180	if !testEnv.Config().UseProd {
1181		t.Skip("emulator doesn't support instance creation")
1182	}
1183
1184	timeout := 5 * time.Minute
1185	ctx, _ := context.WithTimeout(context.Background(), timeout)
1186
1187	iAdminClient, err := testEnv.NewInstanceAdminClient()
1188	if err != nil {
1189		t.Fatalf("NewInstanceAdminClient: %v", err)
1190	}
1191	defer iAdminClient.Close()
1192
1193	clusterID := instanceToCreate + "-cluster"
1194
1195	// Create a development instance
1196	conf := &InstanceConf{
1197		InstanceId:   instanceToCreate,
1198		ClusterId:    clusterID,
1199		DisplayName:  "test instance",
1200		Zone:         instanceToCreateZone,
1201		InstanceType: DEVELOPMENT,
1202	}
1203	if err := iAdminClient.CreateInstance(ctx, conf); err != nil {
1204		t.Fatalf("CreateInstance: %v", err)
1205	}
1206	defer iAdminClient.DeleteInstance(ctx, instanceToCreate)
1207
1208	iInfo, err := iAdminClient.InstanceInfo(ctx, instanceToCreate)
1209	if err != nil {
1210		t.Fatalf("InstanceInfo: %v", err)
1211	}
1212
1213	// Basic return values are tested elsewhere, check instance type
1214	if iInfo.InstanceType != DEVELOPMENT {
1215		t.Fatalf("Instance is not DEVELOPMENT: %v", err)
1216	}
1217
1218	// Update everything we can about the instance in one call.
1219	confWithClusters := &InstanceWithClustersConfig{
1220		InstanceID:   instanceToCreate,
1221		DisplayName:  "new display name",
1222		InstanceType: PRODUCTION,
1223		Clusters: []ClusterConfig{
1224			{ClusterID: clusterID, NumNodes: 5}},
1225	}
1226
1227	if err = iAdminClient.UpdateInstanceWithClusters(ctx, confWithClusters); err != nil {
1228		t.Fatalf("UpdateInstanceWithClusters: %v", err)
1229	}
1230
1231	iInfo, err = iAdminClient.InstanceInfo(ctx, instanceToCreate)
1232	if err != nil {
1233		t.Fatalf("InstanceInfo: %v", err)
1234	}
1235
1236	if iInfo.InstanceType != PRODUCTION {
1237		t.Fatalf("Instance type is not PRODUCTION: %v", err)
1238	}
1239	if got, want := iInfo.DisplayName, confWithClusters.DisplayName; got != want {
1240		t.Fatalf("Display name: %q, want: %q", got, want)
1241	}
1242
1243	cInfo, err := iAdminClient.GetCluster(ctx, instanceToCreate, clusterID)
1244	if err != nil {
1245		t.Fatalf("GetCluster: %v", err)
1246	}
1247
1248	if cInfo.ServeNodes != 5 {
1249		t.Fatalf("NumNodes: %v, want: %v", cInfo.ServeNodes, 5)
1250	}
1251}
1252
1253func TestIntegration_AdminSnapshot(t *testing.T) {
1254	testEnv, err := NewIntegrationEnv()
1255	if err != nil {
1256		t.Fatalf("IntegrationEnv: %v", err)
1257	}
1258	defer testEnv.Close()
1259
1260	if !testEnv.Config().UseProd {
1261		t.Skip("emulator doesn't support snapshots")
1262	}
1263
1264	timeout := 2 * time.Second
1265	if testEnv.Config().UseProd {
1266		timeout = 5 * time.Minute
1267	}
1268	ctx, _ := context.WithTimeout(context.Background(), timeout)
1269
1270	adminClient, err := testEnv.NewAdminClient()
1271	if err != nil {
1272		t.Fatalf("NewAdminClient: %v", err)
1273	}
1274	defer adminClient.Close()
1275
1276	table := testEnv.Config().Table
1277	cluster := testEnv.Config().Cluster
1278
1279	list := func(cluster string) ([]*SnapshotInfo, error) {
1280		infos := []*SnapshotInfo(nil)
1281
1282		it := adminClient.Snapshots(ctx, cluster)
1283		for {
1284			s, err := it.Next()
1285			if err == iterator.Done {
1286				break
1287			}
1288			if err != nil {
1289				return nil, err
1290			}
1291			infos = append(infos, s)
1292		}
1293		return infos, err
1294	}
1295
1296	// Delete the table at the end of the test. Schedule ahead of time
1297	// in case the client fails
1298	defer adminClient.DeleteTable(ctx, table)
1299
1300	if err := adminClient.CreateTable(ctx, table); err != nil {
1301		t.Fatalf("Creating table: %v", err)
1302	}
1303
1304	// Precondition: no snapshots
1305	snapshots, err := list(cluster)
1306	if err != nil {
1307		t.Fatalf("Initial snapshot list: %v", err)
1308	}
1309	if got, want := len(snapshots), 0; got != want {
1310		t.Fatalf("Initial snapshot list len: %d, want: %d", got, want)
1311	}
1312
1313	// Create snapshot
1314	defer adminClient.DeleteSnapshot(ctx, cluster, "mysnapshot")
1315
1316	if err = adminClient.SnapshotTable(ctx, table, cluster, "mysnapshot", 5*time.Hour); err != nil {
1317		t.Fatalf("Creating snaphot: %v", err)
1318	}
1319
1320	// List snapshot
1321	snapshots, err = list(cluster)
1322	if err != nil {
1323		t.Fatalf("Listing snapshots: %v", err)
1324	}
1325	if got, want := len(snapshots), 1; got != want {
1326		t.Fatalf("Listing snapshot count: %d, want: %d", got, want)
1327	}
1328	if got, want := snapshots[0].Name, "mysnapshot"; got != want {
1329		t.Fatalf("Snapshot name: %s, want: %s", got, want)
1330	}
1331	if got, want := snapshots[0].SourceTable, table; got != want {
1332		t.Fatalf("Snapshot SourceTable: %s, want: %s", got, want)
1333	}
1334	if got, want := snapshots[0].DeleteTime, snapshots[0].CreateTime.Add(5*time.Hour); math.Abs(got.Sub(want).Minutes()) > 1 {
1335		t.Fatalf("Snapshot DeleteTime: %s, want: %s", got, want)
1336	}
1337
1338	// Get snapshot
1339	snapshot, err := adminClient.SnapshotInfo(ctx, cluster, "mysnapshot")
1340	if err != nil {
1341		t.Fatalf("SnapshotInfo: %v", snapshot)
1342	}
1343	if got, want := *snapshot, *snapshots[0]; got != want {
1344		t.Fatalf("SnapshotInfo: %v, want: %v", got, want)
1345	}
1346
1347	// Restore
1348	restoredTable := table + "-restored"
1349	defer adminClient.DeleteTable(ctx, restoredTable)
1350	if err = adminClient.CreateTableFromSnapshot(ctx, restoredTable, cluster, "mysnapshot"); err != nil {
1351		t.Fatalf("CreateTableFromSnapshot: %v", err)
1352	}
1353	if _, err := adminClient.TableInfo(ctx, restoredTable); err != nil {
1354		t.Fatalf("Restored TableInfo: %v", err)
1355	}
1356
1357	// Delete snapshot
1358	if err = adminClient.DeleteSnapshot(ctx, cluster, "mysnapshot"); err != nil {
1359		t.Fatalf("DeleteSnapshot: %v", err)
1360	}
1361	snapshots, err = list(cluster)
1362	if err != nil {
1363		t.Fatalf("List after Delete: %v", err)
1364	}
1365	if got, want := len(snapshots), 0; got != want {
1366		t.Fatalf("List after delete len: %d, want: %d", got, want)
1367	}
1368}
1369
1370func TestIntegration_Granularity(t *testing.T) {
1371	testEnv, err := NewIntegrationEnv()
1372	if err != nil {
1373		t.Fatalf("IntegrationEnv: %v", err)
1374	}
1375	defer testEnv.Close()
1376
1377	timeout := 2 * time.Second
1378	if testEnv.Config().UseProd {
1379		timeout = 5 * time.Minute
1380	}
1381	ctx, _ := context.WithTimeout(context.Background(), timeout)
1382
1383	adminClient, err := testEnv.NewAdminClient()
1384	if err != nil {
1385		t.Fatalf("NewAdminClient: %v", err)
1386	}
1387	defer adminClient.Close()
1388
1389	list := func() []string {
1390		tbls, err := adminClient.Tables(ctx)
1391		if err != nil {
1392			t.Fatalf("Fetching list of tables: %v", err)
1393		}
1394		sort.Strings(tbls)
1395		return tbls
1396	}
1397	containsAll := func(got, want []string) bool {
1398		gotSet := make(map[string]bool)
1399
1400		for _, s := range got {
1401			gotSet[s] = true
1402		}
1403		for _, s := range want {
1404			if !gotSet[s] {
1405				return false
1406			}
1407		}
1408		return true
1409	}
1410
1411	defer adminClient.DeleteTable(ctx, "mytable")
1412
1413	if err := adminClient.CreateTable(ctx, "mytable"); err != nil {
1414		t.Fatalf("Creating table: %v", err)
1415	}
1416
1417	tables := list()
1418	if got, want := tables, []string{"mytable"}; !containsAll(got, want) {
1419		t.Errorf("adminClient.Tables returned %#v, want %#v", got, want)
1420	}
1421
1422	// calling ModifyColumnFamilies to check the granularity of table
1423	prefix := adminClient.instancePrefix()
1424	req := &btapb.ModifyColumnFamiliesRequest{
1425		Name: prefix + "/tables/" + "mytable",
1426		Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{
1427			Id:  "cf",
1428			Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Create{&btapb.ColumnFamily{}},
1429		}},
1430	}
1431	table, err := adminClient.tClient.ModifyColumnFamilies(ctx, req)
1432	if err != nil {
1433		t.Fatalf("Creating column family: %v", err)
1434	}
1435	if table.Granularity != btapb.Table_TimestampGranularity(btapb.Table_MILLIS) {
1436		t.Errorf("ModifyColumnFamilies returned granularity %#v, want %#v", table.Granularity, btapb.Table_TimestampGranularity(btapb.Table_MILLIS))
1437	}
1438}
1439
1440func TestIntegration_InstanceAdminClient_AppProfile(t *testing.T) {
1441	testEnv, err := NewIntegrationEnv()
1442	if err != nil {
1443		t.Fatalf("IntegrationEnv: %v", err)
1444	}
1445	defer testEnv.Close()
1446
1447	timeout := 2 * time.Second
1448	if testEnv.Config().UseProd {
1449		timeout = 5 * time.Minute
1450	}
1451	ctx, cancel := context.WithTimeout(context.Background(), timeout)
1452	defer cancel()
1453
1454	adminClient, err := testEnv.NewAdminClient()
1455	if err != nil {
1456		t.Fatalf("NewAdminClient: %v", err)
1457	}
1458	defer adminClient.Close()
1459
1460	iAdminClient, err := testEnv.NewInstanceAdminClient()
1461	if err != nil {
1462		t.Fatalf("NewInstanceAdminClient: %v", err)
1463	}
1464
1465	if iAdminClient == nil {
1466		return
1467	}
1468
1469	defer iAdminClient.Close()
1470	profile := ProfileConf{
1471		ProfileID:     "app_profile1",
1472		InstanceID:    adminClient.instance,
1473		ClusterID:     testEnv.Config().Cluster,
1474		Description:   "creating new app profile 1",
1475		RoutingPolicy: SingleClusterRouting,
1476	}
1477
1478	createdProfile, err := iAdminClient.CreateAppProfile(ctx, profile)
1479	if err != nil {
1480		t.Fatalf("Creating app profile: %v", err)
1481
1482	}
1483
1484	gotProfile, err := iAdminClient.GetAppProfile(ctx, adminClient.instance, "app_profile1")
1485
1486	if err != nil {
1487		t.Fatalf("Get app profile: %v", err)
1488	}
1489
1490	if !proto.Equal(createdProfile, gotProfile) {
1491		t.Fatalf("created profile: %s, got profile: %s", createdProfile.Name, gotProfile.Name)
1492
1493	}
1494
1495	list := func(instanceID string) ([]*btapb.AppProfile, error) {
1496		profiles := []*btapb.AppProfile(nil)
1497
1498		it := iAdminClient.ListAppProfiles(ctx, instanceID)
1499		for {
1500			s, err := it.Next()
1501			if err == iterator.Done {
1502				break
1503			}
1504			if err != nil {
1505				return nil, err
1506			}
1507			profiles = append(profiles, s)
1508		}
1509		return profiles, err
1510	}
1511
1512	profiles, err := list(adminClient.instance)
1513	if err != nil {
1514		t.Fatalf("List app profile: %v", err)
1515	}
1516
1517	if got, want := len(profiles), 1; got != want {
1518		t.Fatalf("Initial app profile list len: %d, want: %d", got, want)
1519	}
1520
1521	for _, test := range []struct {
1522		desc   string
1523		uattrs ProfileAttrsToUpdate
1524		want   *btapb.AppProfile // nil means error
1525	}{
1526		{
1527			desc:   "empty update",
1528			uattrs: ProfileAttrsToUpdate{},
1529			want:   nil,
1530		},
1531
1532		{
1533			desc:   "empty description update",
1534			uattrs: ProfileAttrsToUpdate{Description: ""},
1535			want: &btapb.AppProfile{
1536				Name:          gotProfile.Name,
1537				Description:   "",
1538				RoutingPolicy: gotProfile.RoutingPolicy,
1539				Etag:          gotProfile.Etag},
1540		},
1541		{
1542			desc: "routing update",
1543			uattrs: ProfileAttrsToUpdate{
1544				RoutingPolicy: SingleClusterRouting,
1545				ClusterID:     testEnv.Config().Cluster,
1546			},
1547			want: &btapb.AppProfile{
1548				Name:        gotProfile.Name,
1549				Description: "",
1550				Etag:        gotProfile.Etag,
1551				RoutingPolicy: &btapb.AppProfile_SingleClusterRouting_{
1552					SingleClusterRouting: &btapb.AppProfile_SingleClusterRouting{
1553						ClusterId: testEnv.Config().Cluster,
1554					}},
1555			},
1556		},
1557	} {
1558		err = iAdminClient.UpdateAppProfile(ctx, adminClient.instance, "app_profile1", test.uattrs)
1559		if err != nil {
1560			if test.want != nil {
1561				t.Errorf("%s: %v", test.desc, err)
1562			}
1563			continue
1564		}
1565		if err == nil && test.want == nil {
1566			t.Errorf("%s: got nil, want error", test.desc)
1567			continue
1568		}
1569
1570		got, _ := iAdminClient.GetAppProfile(ctx, adminClient.instance, "app_profile1")
1571
1572		if !proto.Equal(got, test.want) {
1573			t.Fatalf("%s : got profile : %v, want profile: %v", test.desc, gotProfile, test.want)
1574		}
1575
1576	}
1577
1578	err = iAdminClient.DeleteAppProfile(ctx, adminClient.instance, "app_profile1")
1579	if err != nil {
1580		t.Fatalf("Delete app profile: %v", err)
1581	}
1582
1583}
1584
1585func TestIntegration_InstanceUpdate(t *testing.T) {
1586	testEnv, err := NewIntegrationEnv()
1587	if err != nil {
1588		t.Fatalf("IntegrationEnv: %v", err)
1589	}
1590	defer testEnv.Close()
1591
1592	timeout := 2 * time.Second
1593	if testEnv.Config().UseProd {
1594		timeout = 5 * time.Minute
1595	}
1596	ctx, cancel := context.WithTimeout(context.Background(), timeout)
1597	defer cancel()
1598
1599	adminClient, err := testEnv.NewAdminClient()
1600	if err != nil {
1601		t.Fatalf("NewAdminClient: %v", err)
1602	}
1603
1604	defer adminClient.Close()
1605
1606	iAdminClient, err := testEnv.NewInstanceAdminClient()
1607	if err != nil {
1608		t.Fatalf("NewInstanceAdminClient: %v", err)
1609	}
1610
1611	if iAdminClient == nil {
1612		return
1613	}
1614
1615	defer iAdminClient.Close()
1616
1617	iInfo, err := iAdminClient.InstanceInfo(ctx, adminClient.instance)
1618	if err != nil {
1619		t.Errorf("InstanceInfo: %v", err)
1620	}
1621
1622	if iInfo.Name != adminClient.instance {
1623		t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance)
1624	}
1625
1626	if iInfo.DisplayName != adminClient.instance {
1627		t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance)
1628	}
1629
1630	const numNodes = 4
1631	// update cluster nodes
1632	if err := iAdminClient.UpdateCluster(ctx, adminClient.instance, testEnv.Config().Cluster, int32(numNodes)); err != nil {
1633		t.Errorf("UpdateCluster: %v", err)
1634	}
1635
1636	// get cluster after updating
1637	cis, err := iAdminClient.GetCluster(ctx, adminClient.instance, testEnv.Config().Cluster)
1638	if err != nil {
1639		t.Errorf("GetCluster %v", err)
1640	}
1641	if cis.ServeNodes != int(numNodes) {
1642		t.Errorf("ServeNodes returned %d, want %d", cis.ServeNodes, int(numNodes))
1643	}
1644}
1645
1646func setupIntegration(ctx context.Context, t *testing.T) (_ *Client, _ *AdminClient, table *Table, tableName string, cleanup func(), _ error) {
1647	testEnv, err := NewIntegrationEnv()
1648	if err != nil {
1649		return nil, nil, nil, "", nil, err
1650	}
1651
1652	var timeout time.Duration
1653	if testEnv.Config().UseProd {
1654		timeout = 10 * time.Minute
1655		t.Logf("Running test against production")
1656	} else {
1657		timeout = 1 * time.Minute
1658		t.Logf("bttest.Server running on %s", testEnv.Config().AdminEndpoint)
1659	}
1660	ctx, cancel := context.WithTimeout(ctx, timeout)
1661	defer cancel()
1662
1663	client, err := testEnv.NewClient()
1664	if err != nil {
1665		return nil, nil, nil, "", nil, err
1666	}
1667
1668	adminClient, err := testEnv.NewAdminClient()
1669	if err != nil {
1670		return nil, nil, nil, "", nil, err
1671	}
1672
1673	tableName = testEnv.Config().Table
1674	if err := adminClient.CreateTable(ctx, tableName); err != nil {
1675		return nil, nil, nil, "", nil, err
1676	}
1677	if err := adminClient.CreateColumnFamily(ctx, tableName, "follows"); err != nil {
1678		return nil, nil, nil, "", nil, err
1679	}
1680
1681	return client, adminClient, client.Open(tableName), tableName, func() {
1682		adminClient.DeleteTable(ctx, tableName)
1683		client.Close()
1684		adminClient.Close()
1685	}, nil
1686}
1687
1688func formatReadItem(ri ReadItem) string {
1689	// Use the column qualifier only to make the test data briefer.
1690	col := ri.Column[strings.Index(ri.Column, ":")+1:]
1691	return fmt.Sprintf("%s-%s-%s", ri.Row, col, ri.Value)
1692}
1693
1694func fill(b, sub []byte) {
1695	for len(b) > len(sub) {
1696		n := copy(b, sub)
1697		b = b[n:]
1698	}
1699}
1700
1701func clearTimestamps(r Row) {
1702	for _, ris := range r {
1703		for i := range ris {
1704			ris[i].Timestamp = 0
1705		}
1706	}
1707}
1708