1/*
2Copyright 2019 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package bigtable
18
19import (
20	"context"
21	"flag"
22	"fmt"
23	"math"
24	"math/rand"
25	"sort"
26	"strings"
27	"sync"
28	"testing"
29	"time"
30
31	"cloud.google.com/go/internal/testutil"
32	"github.com/golang/protobuf/proto"
33	"google.golang.org/api/iterator"
34	btapb "google.golang.org/genproto/googleapis/bigtable/admin/v2"
35)
36
37var presidentsSocialGraph = map[string][]string{
38	"wmckinley":   {"tjefferson"},
39	"gwashington": {"jadams"},
40	"tjefferson":  {"gwashington", "jadams"},
41	"jadams":      {"gwashington", "tjefferson"},
42}
43
44func populatePresidentsGraph(table *Table) error {
45	ctx := context.Background()
46	for row, ss := range presidentsSocialGraph {
47		mut := NewMutation()
48		for _, name := range ss {
49			mut.Set("follows", name, 1000, []byte("1"))
50		}
51		if err := table.Apply(ctx, row, mut); err != nil {
52			return fmt.Errorf("Mutating row %q: %v", row, err)
53		}
54	}
55	return nil
56}
57
58var instanceToCreate string
59var instanceToCreateZone string
60
61func init() {
62	// Don't test instance creation by default, as quota is necessary and aborted tests could strand resources.
63	flag.StringVar(&instanceToCreate, "it.instance-to-create", "",
64		"The id of an instance to create, update and delete. Requires sufficient Cloud Bigtable quota. Requires that it.use-prod is true.")
65	flag.StringVar(&instanceToCreateZone, "it.instance-to-create-zone", "us-central1-b",
66		"The zone in which to create the new test instance.")
67}
68
69func TestIntegration_ConditionalMutations(t *testing.T) {
70	ctx := context.Background()
71	_, _, table, _, cleanup, err := setupIntegration(ctx, t)
72	if err != nil {
73		t.Fatal(err)
74	}
75	defer cleanup()
76
77	if err := populatePresidentsGraph(table); err != nil {
78		t.Fatal(err)
79	}
80
81	// Do a conditional mutation with a complex filter.
82	mutTrue := NewMutation()
83	mutTrue.Set("follows", "wmckinley", 1000, []byte("1"))
84	filter := ChainFilters(ColumnFilter("gwash[iz].*"), ValueFilter("."))
85	mut := NewCondMutation(filter, mutTrue, nil)
86	if err := table.Apply(ctx, "tjefferson", mut); err != nil {
87		t.Fatalf("Conditionally mutating row: %v", err)
88	}
89	// Do a second condition mutation with a filter that does not match,
90	// and thus no changes should be made.
91	mutTrue = NewMutation()
92	mutTrue.DeleteRow()
93	filter = ColumnFilter("snoop.dogg")
94	mut = NewCondMutation(filter, mutTrue, nil)
95	if err := table.Apply(ctx, "tjefferson", mut); err != nil {
96		t.Fatalf("Conditionally mutating row: %v", err)
97	}
98
99	// Fetch a row.
100	row, err := table.ReadRow(ctx, "jadams")
101	if err != nil {
102		t.Fatalf("Reading a row: %v", err)
103	}
104	wantRow := Row{
105		"follows": []ReadItem{
106			{Row: "jadams", Column: "follows:gwashington", Timestamp: 1000, Value: []byte("1")},
107			{Row: "jadams", Column: "follows:tjefferson", Timestamp: 1000, Value: []byte("1")},
108		},
109	}
110	if !testutil.Equal(row, wantRow) {
111		t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow)
112	}
113}
114
115func TestIntegration_PartialReadRows(t *testing.T) {
116	ctx := context.Background()
117	_, _, table, _, cleanup, err := setupIntegration(ctx, t)
118	if err != nil {
119		t.Fatal(err)
120	}
121	defer cleanup()
122
123	if err := populatePresidentsGraph(table); err != nil {
124		t.Fatal(err)
125	}
126
127	// Do a scan and stop part way through.
128	// Verify that the ReadRows callback doesn't keep running.
129	stopped := false
130	err = table.ReadRows(ctx, InfiniteRange(""), func(r Row) bool {
131		if r.Key() < "h" {
132			return true
133		}
134		if !stopped {
135			stopped = true
136			return false
137		}
138		t.Fatalf("ReadRows kept scanning to row %q after being told to stop", r.Key())
139		return false
140	})
141	if err != nil {
142		t.Fatalf("Partial ReadRows: %v", err)
143	}
144}
145
146func TestIntegration_ReadRowList(t *testing.T) {
147	ctx := context.Background()
148	_, _, table, _, cleanup, err := setupIntegration(ctx, t)
149	if err != nil {
150		t.Fatal(err)
151	}
152	defer cleanup()
153
154	if err := populatePresidentsGraph(table); err != nil {
155		t.Fatal(err)
156	}
157
158	// Read a RowList
159	var elt []string
160	keys := RowList{"wmckinley", "gwashington", "jadams"}
161	want := "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,wmckinley-tjefferson-1"
162	err = table.ReadRows(ctx, keys, func(r Row) bool {
163		for _, ris := range r {
164			for _, ri := range ris {
165				elt = append(elt, formatReadItem(ri))
166			}
167		}
168		return true
169	})
170	if err != nil {
171		t.Fatalf("read RowList: %v", err)
172	}
173
174	if got := strings.Join(elt, ","); got != want {
175		t.Fatalf("bulk read: wrong reads.\n got %q\nwant %q", got, want)
176	}
177}
178
179func TestIntegration_DeleteRow(t *testing.T) {
180	ctx := context.Background()
181	_, _, table, _, cleanup, err := setupIntegration(ctx, t)
182	if err != nil {
183		t.Fatal(err)
184	}
185	defer cleanup()
186
187	if err := populatePresidentsGraph(table); err != nil {
188		t.Fatal(err)
189	}
190
191	// Delete a row and check it goes away.
192	mut := NewMutation()
193	mut.DeleteRow()
194	if err := table.Apply(ctx, "wmckinley", mut); err != nil {
195		t.Fatalf("Apply DeleteRow: %v", err)
196	}
197	row, err := table.ReadRow(ctx, "wmckinley")
198	if err != nil {
199		t.Fatalf("Reading a row after DeleteRow: %v", err)
200	}
201	if len(row) != 0 {
202		t.Fatalf("Read non-zero row after DeleteRow: %v", row)
203	}
204}
205
206func TestIntegration_ReadModifyWrite(t *testing.T) {
207	ctx := context.Background()
208	_, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t)
209	if err != nil {
210		t.Fatal(err)
211	}
212	defer cleanup()
213
214	if err := populatePresidentsGraph(table); err != nil {
215		t.Fatal(err)
216	}
217
218	if err := adminClient.CreateColumnFamily(ctx, tableName, "counter"); err != nil {
219		t.Fatalf("Creating column family: %v", err)
220	}
221
222	appendRMW := func(b []byte) *ReadModifyWrite {
223		rmw := NewReadModifyWrite()
224		rmw.AppendValue("counter", "likes", b)
225		return rmw
226	}
227	incRMW := func(n int64) *ReadModifyWrite {
228		rmw := NewReadModifyWrite()
229		rmw.Increment("counter", "likes", n)
230		return rmw
231	}
232	rmwSeq := []struct {
233		desc string
234		rmw  *ReadModifyWrite
235		want []byte
236	}{
237		{
238			desc: "append #1",
239			rmw:  appendRMW([]byte{0, 0, 0}),
240			want: []byte{0, 0, 0},
241		},
242		{
243			desc: "append #2",
244			rmw:  appendRMW([]byte{0, 0, 0, 0, 17}), // the remaining 40 bits to make a big-endian 17
245			want: []byte{0, 0, 0, 0, 0, 0, 0, 17},
246		},
247		{
248			desc: "increment",
249			rmw:  incRMW(8),
250			want: []byte{0, 0, 0, 0, 0, 0, 0, 25},
251		},
252	}
253	for _, step := range rmwSeq {
254		row, err := table.ApplyReadModifyWrite(ctx, "gwashington", step.rmw)
255		if err != nil {
256			t.Fatalf("ApplyReadModifyWrite %+v: %v", step.rmw, err)
257		}
258		// Make sure the modified cell returned by the RMW operation has a timestamp.
259		if row["counter"][0].Timestamp == 0 {
260			t.Fatalf("RMW returned cell timestamp: got %v, want > 0", row["counter"][0].Timestamp)
261		}
262		clearTimestamps(row)
263		wantRow := Row{"counter": []ReadItem{{Row: "gwashington", Column: "counter:likes", Value: step.want}}}
264		if !testutil.Equal(row, wantRow) {
265			t.Fatalf("After %s,\n got %v\nwant %v", step.desc, row, wantRow)
266		}
267	}
268
269	// Check for google-cloud-go/issues/723. RMWs that insert new rows should keep row order sorted in the emulator.
270	_, err = table.ApplyReadModifyWrite(ctx, "issue-723-2", appendRMW([]byte{0}))
271	if err != nil {
272		t.Fatalf("ApplyReadModifyWrite null string: %v", err)
273	}
274	_, err = table.ApplyReadModifyWrite(ctx, "issue-723-1", appendRMW([]byte{0}))
275	if err != nil {
276		t.Fatalf("ApplyReadModifyWrite null string: %v", err)
277	}
278	// Get only the correct row back on read.
279	r, err := table.ReadRow(ctx, "issue-723-1")
280	if err != nil {
281		t.Fatalf("Reading row: %v", err)
282	}
283	if r.Key() != "issue-723-1" {
284		t.Fatalf("ApplyReadModifyWrite: incorrect read after RMW,\n got %v\nwant %v", r.Key(), "issue-723-1")
285	}
286}
287
288func TestIntegration_ArbitraryTimestamps(t *testing.T) {
289	ctx := context.Background()
290	_, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t)
291	if err != nil {
292		t.Fatal(err)
293	}
294	defer cleanup()
295
296	// Test arbitrary timestamps more thoroughly.
297	if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil {
298		t.Fatalf("Creating column family: %v", err)
299	}
300	const numVersions = 4
301	mut := NewMutation()
302	for i := 1; i < numVersions; i++ {
303		// Timestamps are used in thousands because the server
304		// only permits that granularity.
305		mut.Set("ts", "col", Timestamp(i*1000), []byte(fmt.Sprintf("val-%d", i)))
306		mut.Set("ts", "col2", Timestamp(i*1000), []byte(fmt.Sprintf("val-%d", i)))
307	}
308	if err := table.Apply(ctx, "testrow", mut); err != nil {
309		t.Fatalf("Mutating row: %v", err)
310	}
311	r, err := table.ReadRow(ctx, "testrow")
312	if err != nil {
313		t.Fatalf("Reading row: %v", err)
314	}
315	wantRow := Row{"ts": []ReadItem{
316		// These should be returned in descending timestamp order.
317		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
318		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
319		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
320		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
321		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
322		{Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")},
323	}}
324	if !testutil.Equal(r, wantRow) {
325		t.Fatalf("Cell with multiple versions,\n got %v\nwant %v", r, wantRow)
326	}
327
328	// Do the same read, but filter to the latest two versions.
329	r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(2)))
330	if err != nil {
331		t.Fatalf("Reading row: %v", err)
332	}
333	wantRow = Row{"ts": []ReadItem{
334		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
335		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
336		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
337		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
338	}}
339	if !testutil.Equal(r, wantRow) {
340		t.Fatalf("Cell with multiple versions and LatestNFilter(2),\n got %v\nwant %v", r, wantRow)
341	}
342	// Check cell offset / limit
343	r, err = table.ReadRow(ctx, "testrow", RowFilter(CellsPerRowLimitFilter(3)))
344	if err != nil {
345		t.Fatalf("Reading row: %v", err)
346	}
347	wantRow = Row{"ts": []ReadItem{
348		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
349		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
350		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
351	}}
352	if !testutil.Equal(r, wantRow) {
353		t.Fatalf("Cell with multiple versions and CellsPerRowLimitFilter(3),\n got %v\nwant %v", r, wantRow)
354	}
355	r, err = table.ReadRow(ctx, "testrow", RowFilter(CellsPerRowOffsetFilter(3)))
356	if err != nil {
357		t.Fatalf("Reading row: %v", err)
358	}
359	wantRow = Row{"ts": []ReadItem{
360		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
361		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
362		{Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")},
363	}}
364	if !testutil.Equal(r, wantRow) {
365		t.Fatalf("Cell with multiple versions and CellsPerRowOffsetFilter(3),\n got %v\nwant %v", r, wantRow)
366	}
367	// Check timestamp range filtering (with truncation)
368	r, err = table.ReadRow(ctx, "testrow", RowFilter(TimestampRangeFilterMicros(1001, 3000)))
369	if err != nil {
370		t.Fatalf("Reading row: %v", err)
371	}
372	wantRow = Row{"ts": []ReadItem{
373		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
374		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
375		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
376		{Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")},
377	}}
378	if !testutil.Equal(r, wantRow) {
379		t.Fatalf("Cell with multiple versions and TimestampRangeFilter(1000, 3000),\n got %v\nwant %v", r, wantRow)
380	}
381	r, err = table.ReadRow(ctx, "testrow", RowFilter(TimestampRangeFilterMicros(1000, 0)))
382	if err != nil {
383		t.Fatalf("Reading row: %v", err)
384	}
385	wantRow = Row{"ts": []ReadItem{
386		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
387		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
388		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
389		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
390		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
391		{Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")},
392	}}
393	if !testutil.Equal(r, wantRow) {
394		t.Fatalf("Cell with multiple versions and TimestampRangeFilter(1000, 0),\n got %v\nwant %v", r, wantRow)
395	}
396	// Delete non-existing cells, no such column family in this row
397	// Should not delete anything
398	if err := adminClient.CreateColumnFamily(ctx, tableName, "non-existing"); err != nil {
399		t.Fatalf("Creating column family: %v", err)
400	}
401	mut = NewMutation()
402	mut.DeleteTimestampRange("non-existing", "col", 2000, 3000) // half-open interval
403	if err := table.Apply(ctx, "testrow", mut); err != nil {
404		t.Fatalf("Mutating row: %v", err)
405	}
406	r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(3)))
407	if err != nil {
408		t.Fatalf("Reading row: %v", err)
409	}
410	if !testutil.Equal(r, wantRow) {
411		t.Fatalf("Cell was deleted unexpectly,\n got %v\nwant %v", r, wantRow)
412	}
413	// Delete non-existing cells, no such column in this column family
414	// Should not delete anything
415	mut = NewMutation()
416	mut.DeleteTimestampRange("ts", "non-existing", 2000, 3000) // half-open interval
417	if err := table.Apply(ctx, "testrow", mut); err != nil {
418		t.Fatalf("Mutating row: %v", err)
419	}
420	r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(3)))
421	if err != nil {
422		t.Fatalf("Reading row: %v", err)
423	}
424	if !testutil.Equal(r, wantRow) {
425		t.Fatalf("Cell was deleted unexpectly,\n got %v\nwant %v", r, wantRow)
426	}
427	// Delete the cell with timestamp 2000 and repeat the last read,
428	// checking that we get ts 3000 and ts 1000.
429	mut = NewMutation()
430	mut.DeleteTimestampRange("ts", "col", 2001, 3000) // half-open interval
431	if err := table.Apply(ctx, "testrow", mut); err != nil {
432		t.Fatalf("Mutating row: %v", err)
433	}
434	r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(2)))
435	if err != nil {
436		t.Fatalf("Reading row: %v", err)
437	}
438	wantRow = Row{"ts": []ReadItem{
439		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
440		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
441		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
442		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
443	}}
444	if !testutil.Equal(r, wantRow) {
445		t.Fatalf("Cell with multiple versions and LatestNFilter(2), after deleting timestamp 2000,\n got %v\nwant %v", r, wantRow)
446	}
447
448	// Check DeleteCellsInFamily
449	if err := adminClient.CreateColumnFamily(ctx, tableName, "status"); err != nil {
450		t.Fatalf("Creating column family: %v", err)
451	}
452
453	mut = NewMutation()
454	mut.Set("status", "start", 2000, []byte("2"))
455	mut.Set("status", "end", 3000, []byte("3"))
456	mut.Set("ts", "col", 1000, []byte("1"))
457	if err := table.Apply(ctx, "row1", mut); err != nil {
458		t.Fatalf("Mutating row: %v", err)
459	}
460	if err := table.Apply(ctx, "row2", mut); err != nil {
461		t.Fatalf("Mutating row: %v", err)
462	}
463
464	mut = NewMutation()
465	mut.DeleteCellsInFamily("status")
466	if err := table.Apply(ctx, "row1", mut); err != nil {
467		t.Fatalf("Delete cf: %v", err)
468	}
469
470	// ColumnFamily removed
471	r, err = table.ReadRow(ctx, "row1")
472	if err != nil {
473		t.Fatalf("Reading row: %v", err)
474	}
475	wantRow = Row{"ts": []ReadItem{
476		{Row: "row1", Column: "ts:col", Timestamp: 1000, Value: []byte("1")},
477	}}
478	if !testutil.Equal(r, wantRow) {
479		t.Fatalf("column family was not deleted.\n got %v\n want %v", r, wantRow)
480	}
481
482	// ColumnFamily not removed
483	r, err = table.ReadRow(ctx, "row2")
484	if err != nil {
485		t.Fatalf("Reading row: %v", err)
486	}
487	wantRow = Row{
488		"ts": []ReadItem{
489			{Row: "row2", Column: "ts:col", Timestamp: 1000, Value: []byte("1")},
490		},
491		"status": []ReadItem{
492			{Row: "row2", Column: "status:end", Timestamp: 3000, Value: []byte("3")},
493			{Row: "row2", Column: "status:start", Timestamp: 2000, Value: []byte("2")},
494		},
495	}
496	if !testutil.Equal(r, wantRow) {
497		t.Fatalf("Column family was deleted unexpectedly.\n got %v\n want %v", r, wantRow)
498	}
499
500	// Check DeleteCellsInColumn
501	mut = NewMutation()
502	mut.Set("status", "start", 2000, []byte("2"))
503	mut.Set("status", "middle", 3000, []byte("3"))
504	mut.Set("status", "end", 1000, []byte("1"))
505	if err := table.Apply(ctx, "row3", mut); err != nil {
506		t.Fatalf("Mutating row: %v", err)
507	}
508	mut = NewMutation()
509	mut.DeleteCellsInColumn("status", "middle")
510	if err := table.Apply(ctx, "row3", mut); err != nil {
511		t.Fatalf("Delete column: %v", err)
512	}
513	r, err = table.ReadRow(ctx, "row3")
514	if err != nil {
515		t.Fatalf("Reading row: %v", err)
516	}
517	wantRow = Row{
518		"status": []ReadItem{
519			{Row: "row3", Column: "status:end", Timestamp: 1000, Value: []byte("1")},
520			{Row: "row3", Column: "status:start", Timestamp: 2000, Value: []byte("2")},
521		},
522	}
523	if !testutil.Equal(r, wantRow) {
524		t.Fatalf("Column was not deleted.\n got %v\n want %v", r, wantRow)
525	}
526	mut = NewMutation()
527	mut.DeleteCellsInColumn("status", "start")
528	if err := table.Apply(ctx, "row3", mut); err != nil {
529		t.Fatalf("Delete column: %v", err)
530	}
531	r, err = table.ReadRow(ctx, "row3")
532	if err != nil {
533		t.Fatalf("Reading row: %v", err)
534	}
535	wantRow = Row{
536		"status": []ReadItem{
537			{Row: "row3", Column: "status:end", Timestamp: 1000, Value: []byte("1")},
538		},
539	}
540	if !testutil.Equal(r, wantRow) {
541		t.Fatalf("Column was not deleted.\n got %v\n want %v", r, wantRow)
542	}
543	mut = NewMutation()
544	mut.DeleteCellsInColumn("status", "end")
545	if err := table.Apply(ctx, "row3", mut); err != nil {
546		t.Fatalf("Delete column: %v", err)
547	}
548	r, err = table.ReadRow(ctx, "row3")
549	if err != nil {
550		t.Fatalf("Reading row: %v", err)
551	}
552	if len(r) != 0 {
553		t.Fatalf("Delete column: got %v, want empty row", r)
554	}
555	// Add same cell after delete
556	mut = NewMutation()
557	mut.Set("status", "end", 1000, []byte("1"))
558	if err := table.Apply(ctx, "row3", mut); err != nil {
559		t.Fatalf("Mutating row: %v", err)
560	}
561	r, err = table.ReadRow(ctx, "row3")
562	if err != nil {
563		t.Fatalf("Reading row: %v", err)
564	}
565	if !testutil.Equal(r, wantRow) {
566		t.Fatalf("Column was not deleted correctly.\n got %v\n want %v", r, wantRow)
567	}
568}
569
570func TestIntegration_HighlyConcurrentReadsAndWrites(t *testing.T) {
571	ctx := context.Background()
572	_, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t)
573	if err != nil {
574		t.Fatal(err)
575	}
576	defer cleanup()
577
578	if err := populatePresidentsGraph(table); err != nil {
579		t.Fatal(err)
580	}
581
582	if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil {
583		t.Fatalf("Creating column family: %v", err)
584	}
585
586	// Do highly concurrent reads/writes.
587	const maxConcurrency = 1000
588	var wg sync.WaitGroup
589	for i := 0; i < maxConcurrency; i++ {
590		wg.Add(1)
591		go func() {
592			defer wg.Done()
593			switch r := rand.Intn(100); { // r ∈ [0,100)
594			case 0 <= r && r < 30:
595				// Do a read.
596				_, err := table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(1)))
597				if err != nil {
598					t.Errorf("Concurrent read: %v", err)
599				}
600			case 30 <= r && r < 100:
601				// Do a write.
602				mut := NewMutation()
603				mut.Set("ts", "col", 1000, []byte("data"))
604				if err := table.Apply(ctx, "testrow", mut); err != nil {
605					t.Errorf("Concurrent write: %v", err)
606				}
607			}
608		}()
609	}
610	wg.Wait()
611}
612
613func TestIntegration_LargeReadsWritesAndScans(t *testing.T) {
614	ctx := context.Background()
615	_, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t)
616	if err != nil {
617		t.Fatal(err)
618	}
619	defer cleanup()
620
621	if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil {
622		t.Fatalf("Creating column family: %v", err)
623	}
624
625	bigBytes := make([]byte, 5<<20) // 5 MB is larger than current default gRPC max of 4 MB, but less than the max we set.
626	nonsense := []byte("lorem ipsum dolor sit amet, ")
627	fill(bigBytes, nonsense)
628	mut := NewMutation()
629	mut.Set("ts", "col", 1000, bigBytes)
630	if err := table.Apply(ctx, "bigrow", mut); err != nil {
631		t.Fatalf("Big write: %v", err)
632	}
633	r, err := table.ReadRow(ctx, "bigrow")
634	if err != nil {
635		t.Fatalf("Big read: %v", err)
636	}
637	wantRow := Row{"ts": []ReadItem{
638		{Row: "bigrow", Column: "ts:col", Timestamp: 1000, Value: bigBytes},
639	}}
640	if !testutil.Equal(r, wantRow) {
641		t.Fatalf("Big read returned incorrect bytes: %v", r)
642	}
643
644	var wg sync.WaitGroup
645	// Now write 1000 rows, each with 82 KB values, then scan them all.
646	medBytes := make([]byte, 82<<10)
647	fill(medBytes, nonsense)
648	sem := make(chan int, 50) // do up to 50 mutations at a time.
649	for i := 0; i < 1000; i++ {
650		mut := NewMutation()
651		mut.Set("ts", "big-scan", 1000, medBytes)
652		row := fmt.Sprintf("row-%d", i)
653		wg.Add(1)
654		go func() {
655			defer wg.Done()
656			defer func() { <-sem }()
657			sem <- 1
658			if err := table.Apply(ctx, row, mut); err != nil {
659				t.Errorf("Preparing large scan: %v", err)
660			}
661		}()
662	}
663	wg.Wait()
664	n := 0
665	err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool {
666		for _, ris := range r {
667			for _, ri := range ris {
668				n += len(ri.Value)
669			}
670		}
671		return true
672	}, RowFilter(ColumnFilter("big-scan")))
673	if err != nil {
674		t.Fatalf("Doing large scan: %v", err)
675	}
676	if want := 1000 * len(medBytes); n != want {
677		t.Fatalf("Large scan returned %d bytes, want %d", n, want)
678	}
679	// Scan a subset of the 1000 rows that we just created, using a LimitRows ReadOption.
680	rc := 0
681	wantRc := 3
682	err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool {
683		rc++
684		return true
685	}, LimitRows(int64(wantRc)))
686	if err != nil {
687		t.Fatal(err)
688	}
689	if rc != wantRc {
690		t.Fatalf("Scan with row limit returned %d rows, want %d", rc, wantRc)
691	}
692
693	// Test bulk mutations
694	if err := adminClient.CreateColumnFamily(ctx, tableName, "bulk"); err != nil {
695		t.Fatalf("Creating column family: %v", err)
696	}
697	bulkData := map[string][]string{
698		"red sox":  {"2004", "2007", "2013"},
699		"patriots": {"2001", "2003", "2004", "2014"},
700		"celtics":  {"1981", "1984", "1986", "2008"},
701	}
702	var rowKeys []string
703	var muts []*Mutation
704	for row, ss := range bulkData {
705		mut := NewMutation()
706		for _, name := range ss {
707			mut.Set("bulk", name, 1000, []byte("1"))
708		}
709		rowKeys = append(rowKeys, row)
710		muts = append(muts, mut)
711	}
712	status, err := table.ApplyBulk(ctx, rowKeys, muts)
713	if err != nil {
714		t.Fatalf("Bulk mutating rows %q: %v", rowKeys, err)
715	}
716	if status != nil {
717		t.Fatalf("non-nil errors: %v", err)
718	}
719
720	// Read each row back
721	for rowKey, ss := range bulkData {
722		row, err := table.ReadRow(ctx, rowKey)
723		if err != nil {
724			t.Fatalf("Reading a bulk row: %v", err)
725		}
726		var wantItems []ReadItem
727		for _, val := range ss {
728			wantItems = append(wantItems, ReadItem{Row: rowKey, Column: "bulk:" + val, Timestamp: 1000, Value: []byte("1")})
729		}
730		wantRow := Row{"bulk": wantItems}
731		if !testutil.Equal(row, wantRow) {
732			t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow)
733		}
734	}
735
736	// Test bulk write errors.
737	// Note: Setting timestamps as ServerTime makes sure the mutations are not retried on error.
738	badMut := NewMutation()
739	badMut.Set("badfamily", "col", ServerTime, nil)
740	badMut2 := NewMutation()
741	badMut2.Set("badfamily2", "goodcol", ServerTime, []byte("1"))
742	status, err = table.ApplyBulk(ctx, []string{"badrow", "badrow2"}, []*Mutation{badMut, badMut2})
743	if err != nil {
744		t.Fatalf("Bulk mutating rows %q: %v", rowKeys, err)
745	}
746	if status == nil {
747		t.Fatalf("No errors for bad bulk mutation")
748	} else if status[0] == nil || status[1] == nil {
749		t.Fatalf("No error for bad bulk mutation")
750	}
751}
752
753func TestIntegration_Read(t *testing.T) {
754	ctx := context.Background()
755	_, _, table, _, cleanup, err := setupIntegration(ctx, t)
756	if err != nil {
757		t.Fatal(err)
758	}
759	defer cleanup()
760
761	// Insert some data.
762	initialData := map[string][]string{
763		"wmckinley":   {"tjefferson"},
764		"gwashington": {"jadams"},
765		"tjefferson":  {"gwashington", "jadams", "wmckinley"},
766		"jadams":      {"gwashington", "tjefferson"},
767	}
768	for row, ss := range initialData {
769		mut := NewMutation()
770		for _, name := range ss {
771			mut.Set("follows", name, 1000, []byte("1"))
772		}
773		if err := table.Apply(ctx, row, mut); err != nil {
774			t.Fatalf("Mutating row %q: %v", row, err)
775		}
776	}
777
778	for _, test := range []struct {
779		desc   string
780		rr     RowSet
781		filter Filter     // may be nil
782		limit  ReadOption // may be nil
783
784		// We do the read, grab all the cells, turn them into "<row>-<col>-<val>",
785		// and join with a comma.
786		want string
787	}{
788		{
789			desc: "read all, unfiltered",
790			rr:   RowRange{},
791			want: "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,tjefferson-gwashington-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1",
792		},
793		{
794			desc: "read with InfiniteRange, unfiltered",
795			rr:   InfiniteRange("tjefferson"),
796			want: "tjefferson-gwashington-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1",
797		},
798		{
799			desc: "read with NewRange, unfiltered",
800			rr:   NewRange("gargamel", "hubbard"),
801			want: "gwashington-jadams-1",
802		},
803		{
804			desc: "read with PrefixRange, unfiltered",
805			rr:   PrefixRange("jad"),
806			want: "jadams-gwashington-1,jadams-tjefferson-1",
807		},
808		{
809			desc: "read with SingleRow, unfiltered",
810			rr:   SingleRow("wmckinley"),
811			want: "wmckinley-tjefferson-1",
812		},
813		{
814			desc:   "read all, with ColumnFilter",
815			rr:     RowRange{},
816			filter: ColumnFilter(".*j.*"), // matches "jadams" and "tjefferson"
817			want:   "gwashington-jadams-1,jadams-tjefferson-1,tjefferson-jadams-1,wmckinley-tjefferson-1",
818		},
819		{
820			desc:   "read all, with ColumnFilter, prefix",
821			rr:     RowRange{},
822			filter: ColumnFilter("j"), // no matches
823			want:   "",
824		},
825		{
826			desc:   "read range, with ColumnRangeFilter",
827			rr:     RowRange{},
828			filter: ColumnRangeFilter("follows", "h", "k"),
829			want:   "gwashington-jadams-1,tjefferson-jadams-1",
830		},
831		{
832			desc:   "read range from empty, with ColumnRangeFilter",
833			rr:     RowRange{},
834			filter: ColumnRangeFilter("follows", "", "u"),
835			want:   "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,tjefferson-gwashington-1,tjefferson-jadams-1,wmckinley-tjefferson-1",
836		},
837		{
838			desc:   "read range from start to empty, with ColumnRangeFilter",
839			rr:     RowRange{},
840			filter: ColumnRangeFilter("follows", "h", ""),
841			want:   "gwashington-jadams-1,jadams-tjefferson-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1",
842		},
843		{
844			desc:   "read with RowKeyFilter",
845			rr:     RowRange{},
846			filter: RowKeyFilter(".*wash.*"),
847			want:   "gwashington-jadams-1",
848		},
849		{
850			desc:   "read with RowKeyFilter, prefix",
851			rr:     RowRange{},
852			filter: RowKeyFilter("gwash"),
853			want:   "",
854		},
855		{
856			desc:   "read with RowKeyFilter, no matches",
857			rr:     RowRange{},
858			filter: RowKeyFilter(".*xxx.*"),
859			want:   "",
860		},
861		{
862			desc:   "read with FamilyFilter, no matches",
863			rr:     RowRange{},
864			filter: FamilyFilter(".*xxx.*"),
865			want:   "",
866		},
867		{
868			desc:   "read with ColumnFilter + row limit",
869			rr:     RowRange{},
870			filter: ColumnFilter(".*j.*"), // matches "jadams" and "tjefferson"
871			limit:  LimitRows(2),
872			want:   "gwashington-jadams-1,jadams-tjefferson-1",
873		},
874		{
875			desc:   "read all, strip values",
876			rr:     RowRange{},
877			filter: StripValueFilter(),
878			want:   "gwashington-jadams-,jadams-gwashington-,jadams-tjefferson-,tjefferson-gwashington-,tjefferson-jadams-,tjefferson-wmckinley-,wmckinley-tjefferson-",
879		},
880		{
881			desc:   "read with ColumnFilter + row limit + strip values",
882			rr:     RowRange{},
883			filter: ChainFilters(ColumnFilter(".*j.*"), StripValueFilter()), // matches "jadams" and "tjefferson"
884			limit:  LimitRows(2),
885			want:   "gwashington-jadams-,jadams-tjefferson-",
886		},
887		{
888			desc:   "read with condition, strip values on true",
889			rr:     RowRange{},
890			filter: ConditionFilter(ColumnFilter(".*j.*"), StripValueFilter(), nil),
891			want:   "gwashington-jadams-,jadams-gwashington-,jadams-tjefferson-,tjefferson-gwashington-,tjefferson-jadams-,tjefferson-wmckinley-,wmckinley-tjefferson-",
892		},
893		{
894			desc:   "read with condition, strip values on false",
895			rr:     RowRange{},
896			filter: ConditionFilter(ColumnFilter(".*xxx.*"), nil, StripValueFilter()),
897			want:   "gwashington-jadams-,jadams-gwashington-,jadams-tjefferson-,tjefferson-gwashington-,tjefferson-jadams-,tjefferson-wmckinley-,wmckinley-tjefferson-",
898		},
899		{
900			desc:   "read with ValueRangeFilter + row limit",
901			rr:     RowRange{},
902			filter: ValueRangeFilter([]byte("1"), []byte("5")), // matches our value of "1"
903			limit:  LimitRows(2),
904			want:   "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1",
905		},
906		{
907			desc:   "read with ValueRangeFilter, no match on exclusive end",
908			rr:     RowRange{},
909			filter: ValueRangeFilter([]byte("0"), []byte("1")), // no match
910			want:   "",
911		},
912		{
913			desc:   "read with ValueRangeFilter, no matches",
914			rr:     RowRange{},
915			filter: ValueRangeFilter([]byte("3"), []byte("5")), // matches nothing
916			want:   "",
917		},
918		{
919			desc:   "read with InterleaveFilter, no matches on all filters",
920			rr:     RowRange{},
921			filter: InterleaveFilters(ColumnFilter(".*x.*"), ColumnFilter(".*z.*")),
922			want:   "",
923		},
924		{
925			desc:   "read with InterleaveFilter, no duplicate cells",
926			rr:     RowRange{},
927			filter: InterleaveFilters(ColumnFilter(".*g.*"), ColumnFilter(".*j.*")),
928			want:   "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,tjefferson-gwashington-1,tjefferson-jadams-1,wmckinley-tjefferson-1",
929		},
930		{
931			desc:   "read with InterleaveFilter, with duplicate cells",
932			rr:     RowRange{},
933			filter: InterleaveFilters(ColumnFilter(".*g.*"), ColumnFilter(".*g.*")),
934			want:   "jadams-gwashington-1,jadams-gwashington-1,tjefferson-gwashington-1,tjefferson-gwashington-1",
935		},
936		{
937			desc: "read with a RowRangeList and no filter",
938			rr:   RowRangeList{NewRange("gargamel", "hubbard"), InfiniteRange("wmckinley")},
939			want: "gwashington-jadams-1,wmckinley-tjefferson-1",
940		},
941		{
942			desc:   "chain that excludes rows and matches nothing, in a condition",
943			rr:     RowRange{},
944			filter: ConditionFilter(ChainFilters(ColumnFilter(".*j.*"), ColumnFilter(".*mckinley.*")), StripValueFilter(), nil),
945			want:   "",
946		},
947		{
948			desc:   "chain that ends with an interleave that has no match. covers #804",
949			rr:     RowRange{},
950			filter: ConditionFilter(ChainFilters(ColumnFilter(".*j.*"), InterleaveFilters(ColumnFilter(".*x.*"), ColumnFilter(".*z.*"))), StripValueFilter(), nil),
951			want:   "",
952		},
953	} {
954		t.Run(test.desc, func(t *testing.T) {
955			var opts []ReadOption
956			if test.filter != nil {
957				opts = append(opts, RowFilter(test.filter))
958			}
959			if test.limit != nil {
960				opts = append(opts, test.limit)
961			}
962			var elt []string
963			err := table.ReadRows(ctx, test.rr, func(r Row) bool {
964				for _, ris := range r {
965					for _, ri := range ris {
966						elt = append(elt, formatReadItem(ri))
967					}
968				}
969				return true
970			}, opts...)
971			if err != nil {
972				t.Fatal(err)
973			}
974			if got := strings.Join(elt, ","); got != test.want {
975				t.Fatalf("got %q\nwant %q", got, test.want)
976			}
977		})
978	}
979}
980
981func TestIntegration_SampleRowKeys(t *testing.T) {
982	ctx := context.Background()
983	_, _, table, _, cleanup, err := setupIntegration(ctx, t)
984	if err != nil {
985		t.Fatal(err)
986	}
987	defer cleanup()
988
989	// Insert some data.
990	initialData := map[string][]string{
991		"wmckinley11":   {"tjefferson11"},
992		"gwashington77": {"jadams77"},
993		"tjefferson0":   {"gwashington0", "jadams0"},
994	}
995
996	for row, ss := range initialData {
997		mut := NewMutation()
998		for _, name := range ss {
999			mut.Set("follows", name, 1000, []byte("1"))
1000		}
1001		if err := table.Apply(ctx, row, mut); err != nil {
1002			t.Fatalf("Mutating row %q: %v", row, err)
1003		}
1004	}
1005	sampleKeys, err := table.SampleRowKeys(context.Background())
1006	if err != nil {
1007		t.Fatalf("%s: %v", "SampleRowKeys:", err)
1008	}
1009	if len(sampleKeys) == 0 {
1010		t.Error("SampleRowKeys length 0")
1011	}
1012}
1013
1014func TestIntegration_Admin(t *testing.T) {
1015	testEnv, err := NewIntegrationEnv()
1016	if err != nil {
1017		t.Fatalf("IntegrationEnv: %v", err)
1018	}
1019	defer testEnv.Close()
1020
1021	timeout := 2 * time.Second
1022	if testEnv.Config().UseProd {
1023		timeout = 5 * time.Minute
1024	}
1025	ctx, _ := context.WithTimeout(context.Background(), timeout)
1026
1027	adminClient, err := testEnv.NewAdminClient()
1028	if err != nil {
1029		t.Fatalf("NewAdminClient: %v", err)
1030	}
1031	defer adminClient.Close()
1032
1033	iAdminClient, err := testEnv.NewInstanceAdminClient()
1034	if err != nil {
1035		t.Fatalf("NewInstanceAdminClient: %v", err)
1036	}
1037	if iAdminClient != nil {
1038		defer iAdminClient.Close()
1039
1040		iInfo, err := iAdminClient.InstanceInfo(ctx, adminClient.instance)
1041		if err != nil {
1042			t.Errorf("InstanceInfo: %v", err)
1043		}
1044		if iInfo.Name != adminClient.instance {
1045			t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance)
1046		}
1047	}
1048
1049	list := func() []string {
1050		tbls, err := adminClient.Tables(ctx)
1051		if err != nil {
1052			t.Fatalf("Fetching list of tables: %v", err)
1053		}
1054		sort.Strings(tbls)
1055		return tbls
1056	}
1057	containsAll := func(got, want []string) bool {
1058		gotSet := make(map[string]bool)
1059
1060		for _, s := range got {
1061			gotSet[s] = true
1062		}
1063		for _, s := range want {
1064			if !gotSet[s] {
1065				return false
1066			}
1067		}
1068		return true
1069	}
1070
1071	defer adminClient.DeleteTable(ctx, "mytable")
1072
1073	if err := adminClient.CreateTable(ctx, "mytable"); err != nil {
1074		t.Fatalf("Creating table: %v", err)
1075	}
1076
1077	defer adminClient.DeleteTable(ctx, "myothertable")
1078
1079	if err := adminClient.CreateTable(ctx, "myothertable"); err != nil {
1080		t.Fatalf("Creating table: %v", err)
1081	}
1082
1083	if got, want := list(), []string{"myothertable", "mytable"}; !containsAll(got, want) {
1084		t.Errorf("adminClient.Tables returned %#v, want %#v", got, want)
1085	}
1086
1087	must(adminClient.WaitForReplication(ctx, "mytable"))
1088
1089	if err := adminClient.DeleteTable(ctx, "myothertable"); err != nil {
1090		t.Fatalf("Deleting table: %v", err)
1091	}
1092	tables := list()
1093	if got, want := tables, []string{"mytable"}; !containsAll(got, want) {
1094		t.Errorf("adminClient.Tables returned %#v, want %#v", got, want)
1095	}
1096	if got, unwanted := tables, []string{"myothertable"}; containsAll(got, unwanted) {
1097		t.Errorf("adminClient.Tables return %#v. unwanted %#v", got, unwanted)
1098	}
1099
1100	tblConf := TableConf{
1101		TableID: "conftable",
1102		Families: map[string]GCPolicy{
1103			"fam1": MaxVersionsPolicy(1),
1104			"fam2": MaxVersionsPolicy(2),
1105		},
1106	}
1107	if err := adminClient.CreateTableFromConf(ctx, &tblConf); err != nil {
1108		t.Fatalf("Creating table from TableConf: %v", err)
1109	}
1110	defer adminClient.DeleteTable(ctx, tblConf.TableID)
1111
1112	tblInfo, err := adminClient.TableInfo(ctx, tblConf.TableID)
1113	if err != nil {
1114		t.Fatalf("Getting table info: %v", err)
1115	}
1116	sort.Strings(tblInfo.Families)
1117	wantFams := []string{"fam1", "fam2"}
1118	if !testutil.Equal(tblInfo.Families, wantFams) {
1119		t.Errorf("Column family mismatch, got %v, want %v", tblInfo.Families, wantFams)
1120	}
1121
1122	// Populate mytable and drop row ranges
1123	if err = adminClient.CreateColumnFamily(ctx, "mytable", "cf"); err != nil {
1124		t.Fatalf("Creating column family: %v", err)
1125	}
1126
1127	client, err := testEnv.NewClient()
1128	if err != nil {
1129		t.Fatalf("NewClient: %v", err)
1130	}
1131	defer client.Close()
1132
1133	tbl := client.Open("mytable")
1134
1135	prefixes := []string{"a", "b", "c"}
1136	for _, prefix := range prefixes {
1137		for i := 0; i < 5; i++ {
1138			mut := NewMutation()
1139			mut.Set("cf", "col", 1000, []byte("1"))
1140			if err := tbl.Apply(ctx, fmt.Sprintf("%v-%v", prefix, i), mut); err != nil {
1141				t.Fatalf("Mutating row: %v", err)
1142			}
1143		}
1144	}
1145
1146	if err = adminClient.DropRowRange(ctx, "mytable", "a"); err != nil {
1147		t.Errorf("DropRowRange a: %v", err)
1148	}
1149	if err = adminClient.DropRowRange(ctx, "mytable", "c"); err != nil {
1150		t.Errorf("DropRowRange c: %v", err)
1151	}
1152	if err = adminClient.DropRowRange(ctx, "mytable", "x"); err != nil {
1153		t.Errorf("DropRowRange x: %v", err)
1154	}
1155
1156	var gotRowCount int
1157	must(tbl.ReadRows(ctx, RowRange{}, func(row Row) bool {
1158		gotRowCount++
1159		if !strings.HasPrefix(row.Key(), "b") {
1160			t.Errorf("Invalid row after dropping range: %v", row)
1161		}
1162		return true
1163	}))
1164	if gotRowCount != 5 {
1165		t.Errorf("Invalid row count after dropping range: got %v, want %v", gotRowCount, 5)
1166	}
1167}
1168
1169func TestIntegration_AdminCreateInstance(t *testing.T) {
1170	if instanceToCreate == "" {
1171		t.Skip("instanceToCreate not set, skipping instance creation testing")
1172	}
1173
1174	testEnv, err := NewIntegrationEnv()
1175	if err != nil {
1176		t.Fatalf("IntegrationEnv: %v", err)
1177	}
1178	defer testEnv.Close()
1179
1180	if !testEnv.Config().UseProd {
1181		t.Skip("emulator doesn't support instance creation")
1182	}
1183
1184	timeout := 5 * time.Minute
1185	ctx, _ := context.WithTimeout(context.Background(), timeout)
1186
1187	iAdminClient, err := testEnv.NewInstanceAdminClient()
1188	if err != nil {
1189		t.Fatalf("NewInstanceAdminClient: %v", err)
1190	}
1191	defer iAdminClient.Close()
1192
1193	clusterID := instanceToCreate + "-cluster"
1194
1195	// Create a development instance
1196	conf := &InstanceConf{
1197		InstanceId:   instanceToCreate,
1198		ClusterId:    clusterID,
1199		DisplayName:  "test instance",
1200		Zone:         instanceToCreateZone,
1201		InstanceType: DEVELOPMENT,
1202	}
1203	if err := iAdminClient.CreateInstance(ctx, conf); err != nil {
1204		t.Fatalf("CreateInstance: %v", err)
1205	}
1206	defer iAdminClient.DeleteInstance(ctx, instanceToCreate)
1207
1208	iInfo, err := iAdminClient.InstanceInfo(ctx, instanceToCreate)
1209	if err != nil {
1210		t.Fatalf("InstanceInfo: %v", err)
1211	}
1212
1213	// Basic return values are tested elsewhere, check instance type
1214	if iInfo.InstanceType != DEVELOPMENT {
1215		t.Fatalf("Instance is not DEVELOPMENT: %v", err)
1216	}
1217
1218	// Update everything we can about the instance in one call.
1219	confWithClusters := &InstanceWithClustersConfig{
1220		InstanceID:   instanceToCreate,
1221		DisplayName:  "new display name",
1222		InstanceType: PRODUCTION,
1223		Clusters: []ClusterConfig{
1224			{ClusterID: clusterID, NumNodes: 5, StorageType: HDD}},
1225	}
1226
1227	if err = iAdminClient.UpdateInstanceWithClusters(ctx, confWithClusters); err != nil {
1228		t.Fatalf("UpdateInstanceWithClusters: %v", err)
1229	}
1230
1231	iInfo, err = iAdminClient.InstanceInfo(ctx, instanceToCreate)
1232	if err != nil {
1233		t.Fatalf("InstanceInfo: %v", err)
1234	}
1235
1236	if iInfo.InstanceType != PRODUCTION {
1237		t.Fatalf("Instance type is not PRODUCTION: %v", err)
1238	}
1239	if got, want := iInfo.DisplayName, confWithClusters.DisplayName; got != want {
1240		t.Fatalf("Display name: %q, want: %q", got, want)
1241	}
1242
1243	cInfo, err := iAdminClient.GetCluster(ctx, instanceToCreate, clusterID)
1244	if err != nil {
1245		t.Fatalf("GetCluster: %v", err)
1246	}
1247
1248	if cInfo.ServeNodes != 5 {
1249		t.Fatalf("NumNodes: %v, want: %v", cInfo.ServeNodes, 5)
1250	}
1251
1252	if cInfo.StorageType != HDD {
1253		t.Fatalf("StorageType: %v, want: %v", cInfo.StorageType, HDD)
1254	}
1255}
1256
1257func TestIntegration_AdminSnapshot(t *testing.T) {
1258	testEnv, err := NewIntegrationEnv()
1259	if err != nil {
1260		t.Fatalf("IntegrationEnv: %v", err)
1261	}
1262	defer testEnv.Close()
1263
1264	if !testEnv.Config().UseProd {
1265		t.Skip("emulator doesn't support snapshots")
1266	}
1267
1268	timeout := 2 * time.Second
1269	if testEnv.Config().UseProd {
1270		timeout = 5 * time.Minute
1271	}
1272	ctx, _ := context.WithTimeout(context.Background(), timeout)
1273
1274	adminClient, err := testEnv.NewAdminClient()
1275	if err != nil {
1276		t.Fatalf("NewAdminClient: %v", err)
1277	}
1278	defer adminClient.Close()
1279
1280	table := testEnv.Config().Table
1281	cluster := testEnv.Config().Cluster
1282
1283	list := func(cluster string) ([]*SnapshotInfo, error) {
1284		infos := []*SnapshotInfo(nil)
1285
1286		it := adminClient.Snapshots(ctx, cluster)
1287		for {
1288			s, err := it.Next()
1289			if err == iterator.Done {
1290				break
1291			}
1292			if err != nil {
1293				return nil, err
1294			}
1295			infos = append(infos, s)
1296		}
1297		return infos, err
1298	}
1299
1300	// Delete the table at the end of the test. Schedule ahead of time
1301	// in case the client fails
1302	defer adminClient.DeleteTable(ctx, table)
1303
1304	if err := adminClient.CreateTable(ctx, table); err != nil {
1305		t.Fatalf("Creating table: %v", err)
1306	}
1307
1308	// Precondition: no snapshots
1309	snapshots, err := list(cluster)
1310	if err != nil {
1311		t.Fatalf("Initial snapshot list: %v", err)
1312	}
1313	if got, want := len(snapshots), 0; got != want {
1314		t.Fatalf("Initial snapshot list len: %d, want: %d", got, want)
1315	}
1316
1317	// Create snapshot
1318	defer adminClient.DeleteSnapshot(ctx, cluster, "mysnapshot")
1319
1320	if err = adminClient.SnapshotTable(ctx, table, cluster, "mysnapshot", 5*time.Hour); err != nil {
1321		t.Fatalf("Creating snaphot: %v", err)
1322	}
1323
1324	// List snapshot
1325	snapshots, err = list(cluster)
1326	if err != nil {
1327		t.Fatalf("Listing snapshots: %v", err)
1328	}
1329	if got, want := len(snapshots), 1; got != want {
1330		t.Fatalf("Listing snapshot count: %d, want: %d", got, want)
1331	}
1332	if got, want := snapshots[0].Name, "mysnapshot"; got != want {
1333		t.Fatalf("Snapshot name: %s, want: %s", got, want)
1334	}
1335	if got, want := snapshots[0].SourceTable, table; got != want {
1336		t.Fatalf("Snapshot SourceTable: %s, want: %s", got, want)
1337	}
1338	if got, want := snapshots[0].DeleteTime, snapshots[0].CreateTime.Add(5*time.Hour); math.Abs(got.Sub(want).Minutes()) > 1 {
1339		t.Fatalf("Snapshot DeleteTime: %s, want: %s", got, want)
1340	}
1341
1342	// Get snapshot
1343	snapshot, err := adminClient.SnapshotInfo(ctx, cluster, "mysnapshot")
1344	if err != nil {
1345		t.Fatalf("SnapshotInfo: %v", snapshot)
1346	}
1347	if got, want := *snapshot, *snapshots[0]; got != want {
1348		t.Fatalf("SnapshotInfo: %v, want: %v", got, want)
1349	}
1350
1351	// Restore
1352	restoredTable := table + "-restored"
1353	defer adminClient.DeleteTable(ctx, restoredTable)
1354	if err = adminClient.CreateTableFromSnapshot(ctx, restoredTable, cluster, "mysnapshot"); err != nil {
1355		t.Fatalf("CreateTableFromSnapshot: %v", err)
1356	}
1357	if _, err := adminClient.TableInfo(ctx, restoredTable); err != nil {
1358		t.Fatalf("Restored TableInfo: %v", err)
1359	}
1360
1361	// Delete snapshot
1362	if err = adminClient.DeleteSnapshot(ctx, cluster, "mysnapshot"); err != nil {
1363		t.Fatalf("DeleteSnapshot: %v", err)
1364	}
1365	snapshots, err = list(cluster)
1366	if err != nil {
1367		t.Fatalf("List after Delete: %v", err)
1368	}
1369	if got, want := len(snapshots), 0; got != want {
1370		t.Fatalf("List after delete len: %d, want: %d", got, want)
1371	}
1372}
1373
1374func TestIntegration_Granularity(t *testing.T) {
1375	testEnv, err := NewIntegrationEnv()
1376	if err != nil {
1377		t.Fatalf("IntegrationEnv: %v", err)
1378	}
1379	defer testEnv.Close()
1380
1381	timeout := 2 * time.Second
1382	if testEnv.Config().UseProd {
1383		timeout = 5 * time.Minute
1384	}
1385	ctx, _ := context.WithTimeout(context.Background(), timeout)
1386	ctx = mergeOutgoingMetadata(ctx, withGoogleClientInfo())
1387
1388	adminClient, err := testEnv.NewAdminClient()
1389	if err != nil {
1390		t.Fatalf("NewAdminClient: %v", err)
1391	}
1392	defer adminClient.Close()
1393
1394	list := func() []string {
1395		tbls, err := adminClient.Tables(ctx)
1396		if err != nil {
1397			t.Fatalf("Fetching list of tables: %v", err)
1398		}
1399		sort.Strings(tbls)
1400		return tbls
1401	}
1402	containsAll := func(got, want []string) bool {
1403		gotSet := make(map[string]bool)
1404
1405		for _, s := range got {
1406			gotSet[s] = true
1407		}
1408		for _, s := range want {
1409			if !gotSet[s] {
1410				return false
1411			}
1412		}
1413		return true
1414	}
1415
1416	defer adminClient.DeleteTable(ctx, "mytable")
1417
1418	if err := adminClient.CreateTable(ctx, "mytable"); err != nil {
1419		t.Fatalf("Creating table: %v", err)
1420	}
1421
1422	tables := list()
1423	if got, want := tables, []string{"mytable"}; !containsAll(got, want) {
1424		t.Errorf("adminClient.Tables returned %#v, want %#v", got, want)
1425	}
1426
1427	// calling ModifyColumnFamilies to check the granularity of table
1428	prefix := adminClient.instancePrefix()
1429	req := &btapb.ModifyColumnFamiliesRequest{
1430		Name: prefix + "/tables/" + "mytable",
1431		Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{
1432			Id:  "cf",
1433			Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Create{&btapb.ColumnFamily{}},
1434		}},
1435	}
1436	table, err := adminClient.tClient.ModifyColumnFamilies(ctx, req)
1437	if err != nil {
1438		t.Fatalf("Creating column family: %v", err)
1439	}
1440	if table.Granularity != btapb.Table_TimestampGranularity(btapb.Table_MILLIS) {
1441		t.Errorf("ModifyColumnFamilies returned granularity %#v, want %#v", table.Granularity, btapb.Table_TimestampGranularity(btapb.Table_MILLIS))
1442	}
1443}
1444
1445func TestIntegration_InstanceAdminClient_AppProfile(t *testing.T) {
1446	testEnv, err := NewIntegrationEnv()
1447	if err != nil {
1448		t.Fatalf("IntegrationEnv: %v", err)
1449	}
1450	defer testEnv.Close()
1451
1452	timeout := 2 * time.Second
1453	if testEnv.Config().UseProd {
1454		timeout = 5 * time.Minute
1455	}
1456	ctx, cancel := context.WithTimeout(context.Background(), timeout)
1457	defer cancel()
1458
1459	adminClient, err := testEnv.NewAdminClient()
1460	if err != nil {
1461		t.Fatalf("NewAdminClient: %v", err)
1462	}
1463	defer adminClient.Close()
1464
1465	iAdminClient, err := testEnv.NewInstanceAdminClient()
1466	if err != nil {
1467		t.Fatalf("NewInstanceAdminClient: %v", err)
1468	}
1469
1470	if iAdminClient == nil {
1471		return
1472	}
1473
1474	defer iAdminClient.Close()
1475	profile := ProfileConf{
1476		ProfileID:     "app_profile1",
1477		InstanceID:    adminClient.instance,
1478		ClusterID:     testEnv.Config().Cluster,
1479		Description:   "creating new app profile 1",
1480		RoutingPolicy: SingleClusterRouting,
1481	}
1482
1483	createdProfile, err := iAdminClient.CreateAppProfile(ctx, profile)
1484	if err != nil {
1485		t.Fatalf("Creating app profile: %v", err)
1486
1487	}
1488
1489	gotProfile, err := iAdminClient.GetAppProfile(ctx, adminClient.instance, "app_profile1")
1490
1491	if err != nil {
1492		t.Fatalf("Get app profile: %v", err)
1493	}
1494
1495	if !proto.Equal(createdProfile, gotProfile) {
1496		t.Fatalf("created profile: %s, got profile: %s", createdProfile.Name, gotProfile.Name)
1497
1498	}
1499
1500	list := func(instanceID string) ([]*btapb.AppProfile, error) {
1501		profiles := []*btapb.AppProfile(nil)
1502
1503		it := iAdminClient.ListAppProfiles(ctx, instanceID)
1504		for {
1505			s, err := it.Next()
1506			if err == iterator.Done {
1507				break
1508			}
1509			if err != nil {
1510				return nil, err
1511			}
1512			profiles = append(profiles, s)
1513		}
1514		return profiles, err
1515	}
1516
1517	profiles, err := list(adminClient.instance)
1518	if err != nil {
1519		t.Fatalf("List app profile: %v", err)
1520	}
1521
1522	if got, want := len(profiles), 1; got != want {
1523		t.Fatalf("Initial app profile list len: %d, want: %d", got, want)
1524	}
1525
1526	for _, test := range []struct {
1527		desc   string
1528		uattrs ProfileAttrsToUpdate
1529		want   *btapb.AppProfile // nil means error
1530	}{
1531		{
1532			desc:   "empty update",
1533			uattrs: ProfileAttrsToUpdate{},
1534			want:   nil,
1535		},
1536
1537		{
1538			desc:   "empty description update",
1539			uattrs: ProfileAttrsToUpdate{Description: ""},
1540			want: &btapb.AppProfile{
1541				Name:          gotProfile.Name,
1542				Description:   "",
1543				RoutingPolicy: gotProfile.RoutingPolicy,
1544				Etag:          gotProfile.Etag},
1545		},
1546		{
1547			desc: "routing update",
1548			uattrs: ProfileAttrsToUpdate{
1549				RoutingPolicy: SingleClusterRouting,
1550				ClusterID:     testEnv.Config().Cluster,
1551			},
1552			want: &btapb.AppProfile{
1553				Name:        gotProfile.Name,
1554				Description: "",
1555				Etag:        gotProfile.Etag,
1556				RoutingPolicy: &btapb.AppProfile_SingleClusterRouting_{
1557					SingleClusterRouting: &btapb.AppProfile_SingleClusterRouting{
1558						ClusterId: testEnv.Config().Cluster,
1559					}},
1560			},
1561		},
1562	} {
1563		err = iAdminClient.UpdateAppProfile(ctx, adminClient.instance, "app_profile1", test.uattrs)
1564		if err != nil {
1565			if test.want != nil {
1566				t.Errorf("%s: %v", test.desc, err)
1567			}
1568			continue
1569		}
1570		if err == nil && test.want == nil {
1571			t.Errorf("%s: got nil, want error", test.desc)
1572			continue
1573		}
1574
1575		got, _ := iAdminClient.GetAppProfile(ctx, adminClient.instance, "app_profile1")
1576
1577		if !proto.Equal(got, test.want) {
1578			t.Fatalf("%s : got profile : %v, want profile: %v", test.desc, gotProfile, test.want)
1579		}
1580
1581	}
1582
1583	err = iAdminClient.DeleteAppProfile(ctx, adminClient.instance, "app_profile1")
1584	if err != nil {
1585		t.Fatalf("Delete app profile: %v", err)
1586	}
1587
1588}
1589
1590func TestIntegration_InstanceUpdate(t *testing.T) {
1591	testEnv, err := NewIntegrationEnv()
1592	if err != nil {
1593		t.Fatalf("IntegrationEnv: %v", err)
1594	}
1595	defer testEnv.Close()
1596
1597	timeout := 2 * time.Second
1598	if testEnv.Config().UseProd {
1599		timeout = 5 * time.Minute
1600	}
1601	ctx, cancel := context.WithTimeout(context.Background(), timeout)
1602	defer cancel()
1603
1604	adminClient, err := testEnv.NewAdminClient()
1605	if err != nil {
1606		t.Fatalf("NewAdminClient: %v", err)
1607	}
1608
1609	defer adminClient.Close()
1610
1611	iAdminClient, err := testEnv.NewInstanceAdminClient()
1612	if err != nil {
1613		t.Fatalf("NewInstanceAdminClient: %v", err)
1614	}
1615
1616	if iAdminClient == nil {
1617		return
1618	}
1619
1620	defer iAdminClient.Close()
1621
1622	iInfo, err := iAdminClient.InstanceInfo(ctx, adminClient.instance)
1623	if err != nil {
1624		t.Errorf("InstanceInfo: %v", err)
1625	}
1626
1627	if iInfo.Name != adminClient.instance {
1628		t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance)
1629	}
1630
1631	if iInfo.DisplayName != adminClient.instance {
1632		t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance)
1633	}
1634
1635	const numNodes = 4
1636	// update cluster nodes
1637	if err := iAdminClient.UpdateCluster(ctx, adminClient.instance, testEnv.Config().Cluster, int32(numNodes)); err != nil {
1638		t.Errorf("UpdateCluster: %v", err)
1639	}
1640
1641	// get cluster after updating
1642	cis, err := iAdminClient.GetCluster(ctx, adminClient.instance, testEnv.Config().Cluster)
1643	if err != nil {
1644		t.Errorf("GetCluster %v", err)
1645	}
1646	if cis.ServeNodes != int(numNodes) {
1647		t.Errorf("ServeNodes returned %d, want %d", cis.ServeNodes, int(numNodes))
1648	}
1649}
1650
1651func setupIntegration(ctx context.Context, t *testing.T) (_ *Client, _ *AdminClient, table *Table, tableName string, cleanup func(), _ error) {
1652	testEnv, err := NewIntegrationEnv()
1653	if err != nil {
1654		return nil, nil, nil, "", nil, err
1655	}
1656
1657	var timeout time.Duration
1658	if testEnv.Config().UseProd {
1659		timeout = 10 * time.Minute
1660		t.Logf("Running test against production")
1661	} else {
1662		timeout = 1 * time.Minute
1663		t.Logf("bttest.Server running on %s", testEnv.Config().AdminEndpoint)
1664	}
1665	ctx, cancel := context.WithTimeout(ctx, timeout)
1666	defer cancel()
1667
1668	client, err := testEnv.NewClient()
1669	if err != nil {
1670		return nil, nil, nil, "", nil, err
1671	}
1672
1673	adminClient, err := testEnv.NewAdminClient()
1674	if err != nil {
1675		return nil, nil, nil, "", nil, err
1676	}
1677
1678	tableName = testEnv.Config().Table
1679	if err := adminClient.CreateTable(ctx, tableName); err != nil {
1680		return nil, nil, nil, "", nil, err
1681	}
1682	if err := adminClient.CreateColumnFamily(ctx, tableName, "follows"); err != nil {
1683		return nil, nil, nil, "", nil, err
1684	}
1685
1686	return client, adminClient, client.Open(tableName), tableName, func() {
1687		adminClient.DeleteTable(ctx, tableName)
1688		client.Close()
1689		adminClient.Close()
1690	}, nil
1691}
1692
1693func formatReadItem(ri ReadItem) string {
1694	// Use the column qualifier only to make the test data briefer.
1695	col := ri.Column[strings.Index(ri.Column, ":")+1:]
1696	return fmt.Sprintf("%s-%s-%s", ri.Row, col, ri.Value)
1697}
1698
1699func fill(b, sub []byte) {
1700	for len(b) > len(sub) {
1701		n := copy(b, sub)
1702		b = b[n:]
1703	}
1704}
1705
1706func clearTimestamps(r Row) {
1707	for _, ris := range r {
1708		for i := range ris {
1709			ris[i].Timestamp = 0
1710		}
1711	}
1712}
1713