1/*
2Copyright 2019 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package bigtable
18
19import (
20	"context"
21	"fmt"
22	"math"
23	"math/rand"
24	"sort"
25	"strings"
26	"sync"
27	"testing"
28	"time"
29
30	"cloud.google.com/go/internal/testutil"
31	"github.com/golang/protobuf/proto"
32	"google.golang.org/api/iterator"
33	btapb "google.golang.org/genproto/googleapis/bigtable/admin/v2"
34)
35
36var presidentsSocialGraph = map[string][]string{
37	"wmckinley":   {"tjefferson"},
38	"gwashington": {"jadams"},
39	"tjefferson":  {"gwashington", "jadams"},
40	"jadams":      {"gwashington", "tjefferson"},
41}
42
43func populatePresidentsGraph(table *Table) error {
44	ctx := context.Background()
45	for row, ss := range presidentsSocialGraph {
46		mut := NewMutation()
47		for _, name := range ss {
48			mut.Set("follows", name, 1000, []byte("1"))
49		}
50		if err := table.Apply(ctx, row, mut); err != nil {
51			return fmt.Errorf("Mutating row %q: %v", row, err)
52		}
53	}
54	return nil
55}
56
57func TestIntegration_ConditionalMutations(t *testing.T) {
58	ctx := context.Background()
59	_, _, table, _, cleanup, err := setupIntegration(ctx, t)
60	if err != nil {
61		t.Fatal(err)
62	}
63	defer cleanup()
64
65	if err := populatePresidentsGraph(table); err != nil {
66		t.Fatal(err)
67	}
68
69	// Do a conditional mutation with a complex filter.
70	mutTrue := NewMutation()
71	mutTrue.Set("follows", "wmckinley", 1000, []byte("1"))
72	filter := ChainFilters(ColumnFilter("gwash[iz].*"), ValueFilter("."))
73	mut := NewCondMutation(filter, mutTrue, nil)
74	if err := table.Apply(ctx, "tjefferson", mut); err != nil {
75		t.Fatalf("Conditionally mutating row: %v", err)
76	}
77	// Do a second condition mutation with a filter that does not match,
78	// and thus no changes should be made.
79	mutTrue = NewMutation()
80	mutTrue.DeleteRow()
81	filter = ColumnFilter("snoop.dogg")
82	mut = NewCondMutation(filter, mutTrue, nil)
83	if err := table.Apply(ctx, "tjefferson", mut); err != nil {
84		t.Fatalf("Conditionally mutating row: %v", err)
85	}
86
87	// Fetch a row.
88	row, err := table.ReadRow(ctx, "jadams")
89	if err != nil {
90		t.Fatalf("Reading a row: %v", err)
91	}
92	wantRow := Row{
93		"follows": []ReadItem{
94			{Row: "jadams", Column: "follows:gwashington", Timestamp: 1000, Value: []byte("1")},
95			{Row: "jadams", Column: "follows:tjefferson", Timestamp: 1000, Value: []byte("1")},
96		},
97	}
98	if !testutil.Equal(row, wantRow) {
99		t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow)
100	}
101}
102
103func TestIntegration_PartialReadRows(t *testing.T) {
104	ctx := context.Background()
105	_, _, table, _, cleanup, err := setupIntegration(ctx, t)
106	if err != nil {
107		t.Fatal(err)
108	}
109	defer cleanup()
110
111	if err := populatePresidentsGraph(table); err != nil {
112		t.Fatal(err)
113	}
114
115	// Do a scan and stop part way through.
116	// Verify that the ReadRows callback doesn't keep running.
117	stopped := false
118	err = table.ReadRows(ctx, InfiniteRange(""), func(r Row) bool {
119		if r.Key() < "h" {
120			return true
121		}
122		if !stopped {
123			stopped = true
124			return false
125		}
126		t.Fatalf("ReadRows kept scanning to row %q after being told to stop", r.Key())
127		return false
128	})
129	if err != nil {
130		t.Fatalf("Partial ReadRows: %v", err)
131	}
132}
133
134func TestIntegration_ReadRowList(t *testing.T) {
135	ctx := context.Background()
136	_, _, table, _, cleanup, err := setupIntegration(ctx, t)
137	if err != nil {
138		t.Fatal(err)
139	}
140	defer cleanup()
141
142	if err := populatePresidentsGraph(table); err != nil {
143		t.Fatal(err)
144	}
145
146	// Read a RowList
147	var elt []string
148	keys := RowList{"wmckinley", "gwashington", "jadams"}
149	want := "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,wmckinley-tjefferson-1"
150	err = table.ReadRows(ctx, keys, func(r Row) bool {
151		for _, ris := range r {
152			for _, ri := range ris {
153				elt = append(elt, formatReadItem(ri))
154			}
155		}
156		return true
157	})
158	if err != nil {
159		t.Fatalf("read RowList: %v", err)
160	}
161
162	if got := strings.Join(elt, ","); got != want {
163		t.Fatalf("bulk read: wrong reads.\n got %q\nwant %q", got, want)
164	}
165}
166
167func TestIntegration_DeleteRow(t *testing.T) {
168	ctx := context.Background()
169	_, _, table, _, cleanup, err := setupIntegration(ctx, t)
170	if err != nil {
171		t.Fatal(err)
172	}
173	defer cleanup()
174
175	if err := populatePresidentsGraph(table); err != nil {
176		t.Fatal(err)
177	}
178
179	// Delete a row and check it goes away.
180	mut := NewMutation()
181	mut.DeleteRow()
182	if err := table.Apply(ctx, "wmckinley", mut); err != nil {
183		t.Fatalf("Apply DeleteRow: %v", err)
184	}
185	row, err := table.ReadRow(ctx, "wmckinley")
186	if err != nil {
187		t.Fatalf("Reading a row after DeleteRow: %v", err)
188	}
189	if len(row) != 0 {
190		t.Fatalf("Read non-zero row after DeleteRow: %v", row)
191	}
192}
193
194func TestIntegration_ReadModifyWrite(t *testing.T) {
195	ctx := context.Background()
196	_, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t)
197	if err != nil {
198		t.Fatal(err)
199	}
200	defer cleanup()
201
202	if err := populatePresidentsGraph(table); err != nil {
203		t.Fatal(err)
204	}
205
206	if err := adminClient.CreateColumnFamily(ctx, tableName, "counter"); err != nil {
207		t.Fatalf("Creating column family: %v", err)
208	}
209
210	appendRMW := func(b []byte) *ReadModifyWrite {
211		rmw := NewReadModifyWrite()
212		rmw.AppendValue("counter", "likes", b)
213		return rmw
214	}
215	incRMW := func(n int64) *ReadModifyWrite {
216		rmw := NewReadModifyWrite()
217		rmw.Increment("counter", "likes", n)
218		return rmw
219	}
220	rmwSeq := []struct {
221		desc string
222		rmw  *ReadModifyWrite
223		want []byte
224	}{
225		{
226			desc: "append #1",
227			rmw:  appendRMW([]byte{0, 0, 0}),
228			want: []byte{0, 0, 0},
229		},
230		{
231			desc: "append #2",
232			rmw:  appendRMW([]byte{0, 0, 0, 0, 17}), // the remaining 40 bits to make a big-endian 17
233			want: []byte{0, 0, 0, 0, 0, 0, 0, 17},
234		},
235		{
236			desc: "increment",
237			rmw:  incRMW(8),
238			want: []byte{0, 0, 0, 0, 0, 0, 0, 25},
239		},
240	}
241	for _, step := range rmwSeq {
242		row, err := table.ApplyReadModifyWrite(ctx, "gwashington", step.rmw)
243		if err != nil {
244			t.Fatalf("ApplyReadModifyWrite %+v: %v", step.rmw, err)
245		}
246		// Make sure the modified cell returned by the RMW operation has a timestamp.
247		if row["counter"][0].Timestamp == 0 {
248			t.Fatalf("RMW returned cell timestamp: got %v, want > 0", row["counter"][0].Timestamp)
249		}
250		clearTimestamps(row)
251		wantRow := Row{"counter": []ReadItem{{Row: "gwashington", Column: "counter:likes", Value: step.want}}}
252		if !testutil.Equal(row, wantRow) {
253			t.Fatalf("After %s,\n got %v\nwant %v", step.desc, row, wantRow)
254		}
255	}
256
257	// Check for google-cloud-go/issues/723. RMWs that insert new rows should keep row order sorted in the emulator.
258	_, err = table.ApplyReadModifyWrite(ctx, "issue-723-2", appendRMW([]byte{0}))
259	if err != nil {
260		t.Fatalf("ApplyReadModifyWrite null string: %v", err)
261	}
262	_, err = table.ApplyReadModifyWrite(ctx, "issue-723-1", appendRMW([]byte{0}))
263	if err != nil {
264		t.Fatalf("ApplyReadModifyWrite null string: %v", err)
265	}
266	// Get only the correct row back on read.
267	r, err := table.ReadRow(ctx, "issue-723-1")
268	if err != nil {
269		t.Fatalf("Reading row: %v", err)
270	}
271	if r.Key() != "issue-723-1" {
272		t.Fatalf("ApplyReadModifyWrite: incorrect read after RMW,\n got %v\nwant %v", r.Key(), "issue-723-1")
273	}
274}
275
276func TestIntegration_ArbitraryTimestamps(t *testing.T) {
277	ctx := context.Background()
278	_, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t)
279	if err != nil {
280		t.Fatal(err)
281	}
282	defer cleanup()
283
284	// Test arbitrary timestamps more thoroughly.
285	if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil {
286		t.Fatalf("Creating column family: %v", err)
287	}
288	const numVersions = 4
289	mut := NewMutation()
290	for i := 1; i < numVersions; i++ {
291		// Timestamps are used in thousands because the server
292		// only permits that granularity.
293		mut.Set("ts", "col", Timestamp(i*1000), []byte(fmt.Sprintf("val-%d", i)))
294		mut.Set("ts", "col2", Timestamp(i*1000), []byte(fmt.Sprintf("val-%d", i)))
295	}
296	if err := table.Apply(ctx, "testrow", mut); err != nil {
297		t.Fatalf("Mutating row: %v", err)
298	}
299	r, err := table.ReadRow(ctx, "testrow")
300	if err != nil {
301		t.Fatalf("Reading row: %v", err)
302	}
303	wantRow := Row{"ts": []ReadItem{
304		// These should be returned in descending timestamp order.
305		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
306		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
307		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
308		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
309		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
310		{Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")},
311	}}
312	if !testutil.Equal(r, wantRow) {
313		t.Fatalf("Cell with multiple versions,\n got %v\nwant %v", r, wantRow)
314	}
315
316	// Do the same read, but filter to the latest two versions.
317	r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(2)))
318	if err != nil {
319		t.Fatalf("Reading row: %v", err)
320	}
321	wantRow = Row{"ts": []ReadItem{
322		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
323		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
324		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
325		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
326	}}
327	if !testutil.Equal(r, wantRow) {
328		t.Fatalf("Cell with multiple versions and LatestNFilter(2),\n got %v\nwant %v", r, wantRow)
329	}
330	// Check cell offset / limit
331	r, err = table.ReadRow(ctx, "testrow", RowFilter(CellsPerRowLimitFilter(3)))
332	if err != nil {
333		t.Fatalf("Reading row: %v", err)
334	}
335	wantRow = Row{"ts": []ReadItem{
336		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
337		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
338		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
339	}}
340	if !testutil.Equal(r, wantRow) {
341		t.Fatalf("Cell with multiple versions and CellsPerRowLimitFilter(3),\n got %v\nwant %v", r, wantRow)
342	}
343	r, err = table.ReadRow(ctx, "testrow", RowFilter(CellsPerRowOffsetFilter(3)))
344	if err != nil {
345		t.Fatalf("Reading row: %v", err)
346	}
347	wantRow = Row{"ts": []ReadItem{
348		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
349		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
350		{Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")},
351	}}
352	if !testutil.Equal(r, wantRow) {
353		t.Fatalf("Cell with multiple versions and CellsPerRowOffsetFilter(3),\n got %v\nwant %v", r, wantRow)
354	}
355	// Check timestamp range filtering (with truncation)
356	r, err = table.ReadRow(ctx, "testrow", RowFilter(TimestampRangeFilterMicros(1001, 3000)))
357	if err != nil {
358		t.Fatalf("Reading row: %v", err)
359	}
360	wantRow = Row{"ts": []ReadItem{
361		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
362		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
363		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
364		{Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")},
365	}}
366	if !testutil.Equal(r, wantRow) {
367		t.Fatalf("Cell with multiple versions and TimestampRangeFilter(1000, 3000),\n got %v\nwant %v", r, wantRow)
368	}
369	r, err = table.ReadRow(ctx, "testrow", RowFilter(TimestampRangeFilterMicros(1000, 0)))
370	if err != nil {
371		t.Fatalf("Reading row: %v", err)
372	}
373	wantRow = Row{"ts": []ReadItem{
374		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
375		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
376		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
377		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
378		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
379		{Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")},
380	}}
381	if !testutil.Equal(r, wantRow) {
382		t.Fatalf("Cell with multiple versions and TimestampRangeFilter(1000, 0),\n got %v\nwant %v", r, wantRow)
383	}
384	// Delete non-existing cells, no such column family in this row
385	// Should not delete anything
386	if err := adminClient.CreateColumnFamily(ctx, tableName, "non-existing"); err != nil {
387		t.Fatalf("Creating column family: %v", err)
388	}
389	mut = NewMutation()
390	mut.DeleteTimestampRange("non-existing", "col", 2000, 3000) // half-open interval
391	if err := table.Apply(ctx, "testrow", mut); err != nil {
392		t.Fatalf("Mutating row: %v", err)
393	}
394	r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(3)))
395	if err != nil {
396		t.Fatalf("Reading row: %v", err)
397	}
398	if !testutil.Equal(r, wantRow) {
399		t.Fatalf("Cell was deleted unexpectly,\n got %v\nwant %v", r, wantRow)
400	}
401	// Delete non-existing cells, no such column in this column family
402	// Should not delete anything
403	mut = NewMutation()
404	mut.DeleteTimestampRange("ts", "non-existing", 2000, 3000) // half-open interval
405	if err := table.Apply(ctx, "testrow", mut); err != nil {
406		t.Fatalf("Mutating row: %v", err)
407	}
408	r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(3)))
409	if err != nil {
410		t.Fatalf("Reading row: %v", err)
411	}
412	if !testutil.Equal(r, wantRow) {
413		t.Fatalf("Cell was deleted unexpectly,\n got %v\nwant %v", r, wantRow)
414	}
415	// Delete the cell with timestamp 2000 and repeat the last read,
416	// checking that we get ts 3000 and ts 1000.
417	mut = NewMutation()
418	mut.DeleteTimestampRange("ts", "col", 2001, 3000) // half-open interval
419	if err := table.Apply(ctx, "testrow", mut); err != nil {
420		t.Fatalf("Mutating row: %v", err)
421	}
422	r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(2)))
423	if err != nil {
424		t.Fatalf("Reading row: %v", err)
425	}
426	wantRow = Row{"ts": []ReadItem{
427		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
428		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
429		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
430		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
431	}}
432	if !testutil.Equal(r, wantRow) {
433		t.Fatalf("Cell with multiple versions and LatestNFilter(2), after deleting timestamp 2000,\n got %v\nwant %v", r, wantRow)
434	}
435
436	// Check DeleteCellsInFamily
437	if err := adminClient.CreateColumnFamily(ctx, tableName, "status"); err != nil {
438		t.Fatalf("Creating column family: %v", err)
439	}
440
441	mut = NewMutation()
442	mut.Set("status", "start", 2000, []byte("2"))
443	mut.Set("status", "end", 3000, []byte("3"))
444	mut.Set("ts", "col", 1000, []byte("1"))
445	if err := table.Apply(ctx, "row1", mut); err != nil {
446		t.Fatalf("Mutating row: %v", err)
447	}
448	if err := table.Apply(ctx, "row2", mut); err != nil {
449		t.Fatalf("Mutating row: %v", err)
450	}
451
452	mut = NewMutation()
453	mut.DeleteCellsInFamily("status")
454	if err := table.Apply(ctx, "row1", mut); err != nil {
455		t.Fatalf("Delete cf: %v", err)
456	}
457
458	// ColumnFamily removed
459	r, err = table.ReadRow(ctx, "row1")
460	if err != nil {
461		t.Fatalf("Reading row: %v", err)
462	}
463	wantRow = Row{"ts": []ReadItem{
464		{Row: "row1", Column: "ts:col", Timestamp: 1000, Value: []byte("1")},
465	}}
466	if !testutil.Equal(r, wantRow) {
467		t.Fatalf("column family was not deleted.\n got %v\n want %v", r, wantRow)
468	}
469
470	// ColumnFamily not removed
471	r, err = table.ReadRow(ctx, "row2")
472	if err != nil {
473		t.Fatalf("Reading row: %v", err)
474	}
475	wantRow = Row{
476		"ts": []ReadItem{
477			{Row: "row2", Column: "ts:col", Timestamp: 1000, Value: []byte("1")},
478		},
479		"status": []ReadItem{
480			{Row: "row2", Column: "status:end", Timestamp: 3000, Value: []byte("3")},
481			{Row: "row2", Column: "status:start", Timestamp: 2000, Value: []byte("2")},
482		},
483	}
484	if !testutil.Equal(r, wantRow) {
485		t.Fatalf("Column family was deleted unexpectedly.\n got %v\n want %v", r, wantRow)
486	}
487
488	// Check DeleteCellsInColumn
489	mut = NewMutation()
490	mut.Set("status", "start", 2000, []byte("2"))
491	mut.Set("status", "middle", 3000, []byte("3"))
492	mut.Set("status", "end", 1000, []byte("1"))
493	if err := table.Apply(ctx, "row3", mut); err != nil {
494		t.Fatalf("Mutating row: %v", err)
495	}
496	mut = NewMutation()
497	mut.DeleteCellsInColumn("status", "middle")
498	if err := table.Apply(ctx, "row3", mut); err != nil {
499		t.Fatalf("Delete column: %v", err)
500	}
501	r, err = table.ReadRow(ctx, "row3")
502	if err != nil {
503		t.Fatalf("Reading row: %v", err)
504	}
505	wantRow = Row{
506		"status": []ReadItem{
507			{Row: "row3", Column: "status:end", Timestamp: 1000, Value: []byte("1")},
508			{Row: "row3", Column: "status:start", Timestamp: 2000, Value: []byte("2")},
509		},
510	}
511	if !testutil.Equal(r, wantRow) {
512		t.Fatalf("Column was not deleted.\n got %v\n want %v", r, wantRow)
513	}
514	mut = NewMutation()
515	mut.DeleteCellsInColumn("status", "start")
516	if err := table.Apply(ctx, "row3", mut); err != nil {
517		t.Fatalf("Delete column: %v", err)
518	}
519	r, err = table.ReadRow(ctx, "row3")
520	if err != nil {
521		t.Fatalf("Reading row: %v", err)
522	}
523	wantRow = Row{
524		"status": []ReadItem{
525			{Row: "row3", Column: "status:end", Timestamp: 1000, Value: []byte("1")},
526		},
527	}
528	if !testutil.Equal(r, wantRow) {
529		t.Fatalf("Column was not deleted.\n got %v\n want %v", r, wantRow)
530	}
531	mut = NewMutation()
532	mut.DeleteCellsInColumn("status", "end")
533	if err := table.Apply(ctx, "row3", mut); err != nil {
534		t.Fatalf("Delete column: %v", err)
535	}
536	r, err = table.ReadRow(ctx, "row3")
537	if err != nil {
538		t.Fatalf("Reading row: %v", err)
539	}
540	if len(r) != 0 {
541		t.Fatalf("Delete column: got %v, want empty row", r)
542	}
543	// Add same cell after delete
544	mut = NewMutation()
545	mut.Set("status", "end", 1000, []byte("1"))
546	if err := table.Apply(ctx, "row3", mut); err != nil {
547		t.Fatalf("Mutating row: %v", err)
548	}
549	r, err = table.ReadRow(ctx, "row3")
550	if err != nil {
551		t.Fatalf("Reading row: %v", err)
552	}
553	if !testutil.Equal(r, wantRow) {
554		t.Fatalf("Column was not deleted correctly.\n got %v\n want %v", r, wantRow)
555	}
556}
557
558func TestIntegration_HighlyConcurrentReadsAndWrites(t *testing.T) {
559	ctx := context.Background()
560	_, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t)
561	if err != nil {
562		t.Fatal(err)
563	}
564	defer cleanup()
565
566	if err := populatePresidentsGraph(table); err != nil {
567		t.Fatal(err)
568	}
569
570	if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil {
571		t.Fatalf("Creating column family: %v", err)
572	}
573
574	// Do highly concurrent reads/writes.
575	const maxConcurrency = 1000
576	var wg sync.WaitGroup
577	for i := 0; i < maxConcurrency; i++ {
578		wg.Add(1)
579		go func() {
580			defer wg.Done()
581			switch r := rand.Intn(100); { // r ∈ [0,100)
582			case 0 <= r && r < 30:
583				// Do a read.
584				_, err := table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(1)))
585				if err != nil {
586					t.Errorf("Concurrent read: %v", err)
587				}
588			case 30 <= r && r < 100:
589				// Do a write.
590				mut := NewMutation()
591				mut.Set("ts", "col", 1000, []byte("data"))
592				if err := table.Apply(ctx, "testrow", mut); err != nil {
593					t.Errorf("Concurrent write: %v", err)
594				}
595			}
596		}()
597	}
598	wg.Wait()
599}
600
601func TestIntegration_LargeReadsWritesAndScans(t *testing.T) {
602	ctx := context.Background()
603	_, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t)
604	if err != nil {
605		t.Fatal(err)
606	}
607	defer cleanup()
608
609	if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil {
610		t.Fatalf("Creating column family: %v", err)
611	}
612
613	bigBytes := make([]byte, 5<<20) // 5 MB is larger than current default gRPC max of 4 MB, but less than the max we set.
614	nonsense := []byte("lorem ipsum dolor sit amet, ")
615	fill(bigBytes, nonsense)
616	mut := NewMutation()
617	mut.Set("ts", "col", 1000, bigBytes)
618	if err := table.Apply(ctx, "bigrow", mut); err != nil {
619		t.Fatalf("Big write: %v", err)
620	}
621	r, err := table.ReadRow(ctx, "bigrow")
622	if err != nil {
623		t.Fatalf("Big read: %v", err)
624	}
625	wantRow := Row{"ts": []ReadItem{
626		{Row: "bigrow", Column: "ts:col", Timestamp: 1000, Value: bigBytes},
627	}}
628	if !testutil.Equal(r, wantRow) {
629		t.Fatalf("Big read returned incorrect bytes: %v", r)
630	}
631
632	var wg sync.WaitGroup
633	// Now write 1000 rows, each with 82 KB values, then scan them all.
634	medBytes := make([]byte, 82<<10)
635	fill(medBytes, nonsense)
636	sem := make(chan int, 50) // do up to 50 mutations at a time.
637	for i := 0; i < 1000; i++ {
638		mut := NewMutation()
639		mut.Set("ts", "big-scan", 1000, medBytes)
640		row := fmt.Sprintf("row-%d", i)
641		wg.Add(1)
642		go func() {
643			defer wg.Done()
644			defer func() { <-sem }()
645			sem <- 1
646			if err := table.Apply(ctx, row, mut); err != nil {
647				t.Errorf("Preparing large scan: %v", err)
648			}
649		}()
650	}
651	wg.Wait()
652	n := 0
653	err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool {
654		for _, ris := range r {
655			for _, ri := range ris {
656				n += len(ri.Value)
657			}
658		}
659		return true
660	}, RowFilter(ColumnFilter("big-scan")))
661	if err != nil {
662		t.Fatalf("Doing large scan: %v", err)
663	}
664	if want := 1000 * len(medBytes); n != want {
665		t.Fatalf("Large scan returned %d bytes, want %d", n, want)
666	}
667	// Scan a subset of the 1000 rows that we just created, using a LimitRows ReadOption.
668	rc := 0
669	wantRc := 3
670	err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool {
671		rc++
672		return true
673	}, LimitRows(int64(wantRc)))
674	if err != nil {
675		t.Fatal(err)
676	}
677	if rc != wantRc {
678		t.Fatalf("Scan with row limit returned %d rows, want %d", rc, wantRc)
679	}
680
681	// Test bulk mutations
682	if err := adminClient.CreateColumnFamily(ctx, tableName, "bulk"); err != nil {
683		t.Fatalf("Creating column family: %v", err)
684	}
685	bulkData := map[string][]string{
686		"red sox":  {"2004", "2007", "2013"},
687		"patriots": {"2001", "2003", "2004", "2014"},
688		"celtics":  {"1981", "1984", "1986", "2008"},
689	}
690	var rowKeys []string
691	var muts []*Mutation
692	for row, ss := range bulkData {
693		mut := NewMutation()
694		for _, name := range ss {
695			mut.Set("bulk", name, 1000, []byte("1"))
696		}
697		rowKeys = append(rowKeys, row)
698		muts = append(muts, mut)
699	}
700	status, err := table.ApplyBulk(ctx, rowKeys, muts)
701	if err != nil {
702		t.Fatalf("Bulk mutating rows %q: %v", rowKeys, err)
703	}
704	if status != nil {
705		t.Fatalf("non-nil errors: %v", err)
706	}
707
708	// Read each row back
709	for rowKey, ss := range bulkData {
710		row, err := table.ReadRow(ctx, rowKey)
711		if err != nil {
712			t.Fatalf("Reading a bulk row: %v", err)
713		}
714		var wantItems []ReadItem
715		for _, val := range ss {
716			wantItems = append(wantItems, ReadItem{Row: rowKey, Column: "bulk:" + val, Timestamp: 1000, Value: []byte("1")})
717		}
718		wantRow := Row{"bulk": wantItems}
719		if !testutil.Equal(row, wantRow) {
720			t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow)
721		}
722	}
723
724	// Test bulk write errors.
725	// Note: Setting timestamps as ServerTime makes sure the mutations are not retried on error.
726	badMut := NewMutation()
727	badMut.Set("badfamily", "col", ServerTime, nil)
728	badMut2 := NewMutation()
729	badMut2.Set("badfamily2", "goodcol", ServerTime, []byte("1"))
730	status, err = table.ApplyBulk(ctx, []string{"badrow", "badrow2"}, []*Mutation{badMut, badMut2})
731	if err != nil {
732		t.Fatalf("Bulk mutating rows %q: %v", rowKeys, err)
733	}
734	if status == nil {
735		t.Fatalf("No errors for bad bulk mutation")
736	} else if status[0] == nil || status[1] == nil {
737		t.Fatalf("No error for bad bulk mutation")
738	}
739}
740
741func TestIntegration_Read(t *testing.T) {
742	ctx := context.Background()
743	_, _, table, _, cleanup, err := setupIntegration(ctx, t)
744	if err != nil {
745		t.Fatal(err)
746	}
747	defer cleanup()
748
749	// Insert some data.
750	initialData := map[string][]string{
751		"wmckinley":   {"tjefferson"},
752		"gwashington": {"jadams"},
753		"tjefferson":  {"gwashington", "jadams", "wmckinley"},
754		"jadams":      {"gwashington", "tjefferson"},
755	}
756	for row, ss := range initialData {
757		mut := NewMutation()
758		for _, name := range ss {
759			mut.Set("follows", name, 1000, []byte("1"))
760		}
761		if err := table.Apply(ctx, row, mut); err != nil {
762			t.Fatalf("Mutating row %q: %v", row, err)
763		}
764	}
765
766	for _, test := range []struct {
767		desc   string
768		rr     RowSet
769		filter Filter     // may be nil
770		limit  ReadOption // may be nil
771
772		// We do the read, grab all the cells, turn them into "<row>-<col>-<val>",
773		// and join with a comma.
774		want string
775	}{
776		{
777			desc: "read all, unfiltered",
778			rr:   RowRange{},
779			want: "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,tjefferson-gwashington-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1",
780		},
781		{
782			desc: "read with InfiniteRange, unfiltered",
783			rr:   InfiniteRange("tjefferson"),
784			want: "tjefferson-gwashington-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1",
785		},
786		{
787			desc: "read with NewRange, unfiltered",
788			rr:   NewRange("gargamel", "hubbard"),
789			want: "gwashington-jadams-1",
790		},
791		{
792			desc: "read with PrefixRange, unfiltered",
793			rr:   PrefixRange("jad"),
794			want: "jadams-gwashington-1,jadams-tjefferson-1",
795		},
796		{
797			desc: "read with SingleRow, unfiltered",
798			rr:   SingleRow("wmckinley"),
799			want: "wmckinley-tjefferson-1",
800		},
801		{
802			desc:   "read all, with ColumnFilter",
803			rr:     RowRange{},
804			filter: ColumnFilter(".*j.*"), // matches "jadams" and "tjefferson"
805			want:   "gwashington-jadams-1,jadams-tjefferson-1,tjefferson-jadams-1,wmckinley-tjefferson-1",
806		},
807		{
808			desc:   "read all, with ColumnFilter, prefix",
809			rr:     RowRange{},
810			filter: ColumnFilter("j"), // no matches
811			want:   "",
812		},
813		{
814			desc:   "read range, with ColumnRangeFilter",
815			rr:     RowRange{},
816			filter: ColumnRangeFilter("follows", "h", "k"),
817			want:   "gwashington-jadams-1,tjefferson-jadams-1",
818		},
819		{
820			desc:   "read range from empty, with ColumnRangeFilter",
821			rr:     RowRange{},
822			filter: ColumnRangeFilter("follows", "", "u"),
823			want:   "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,tjefferson-gwashington-1,tjefferson-jadams-1,wmckinley-tjefferson-1",
824		},
825		{
826			desc:   "read range from start to empty, with ColumnRangeFilter",
827			rr:     RowRange{},
828			filter: ColumnRangeFilter("follows", "h", ""),
829			want:   "gwashington-jadams-1,jadams-tjefferson-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1",
830		},
831		{
832			desc:   "read with RowKeyFilter",
833			rr:     RowRange{},
834			filter: RowKeyFilter(".*wash.*"),
835			want:   "gwashington-jadams-1",
836		},
837		{
838			desc:   "read with RowKeyFilter, prefix",
839			rr:     RowRange{},
840			filter: RowKeyFilter("gwash"),
841			want:   "",
842		},
843		{
844			desc:   "read with RowKeyFilter, no matches",
845			rr:     RowRange{},
846			filter: RowKeyFilter(".*xxx.*"),
847			want:   "",
848		},
849		{
850			desc:   "read with FamilyFilter, no matches",
851			rr:     RowRange{},
852			filter: FamilyFilter(".*xxx.*"),
853			want:   "",
854		},
855		{
856			desc:   "read with ColumnFilter + row limit",
857			rr:     RowRange{},
858			filter: ColumnFilter(".*j.*"), // matches "jadams" and "tjefferson"
859			limit:  LimitRows(2),
860			want:   "gwashington-jadams-1,jadams-tjefferson-1",
861		},
862		{
863			desc:   "read all, strip values",
864			rr:     RowRange{},
865			filter: StripValueFilter(),
866			want:   "gwashington-jadams-,jadams-gwashington-,jadams-tjefferson-,tjefferson-gwashington-,tjefferson-jadams-,tjefferson-wmckinley-,wmckinley-tjefferson-",
867		},
868		{
869			desc:   "read with ColumnFilter + row limit + strip values",
870			rr:     RowRange{},
871			filter: ChainFilters(ColumnFilter(".*j.*"), StripValueFilter()), // matches "jadams" and "tjefferson"
872			limit:  LimitRows(2),
873			want:   "gwashington-jadams-,jadams-tjefferson-",
874		},
875		{
876			desc:   "read with condition, strip values on true",
877			rr:     RowRange{},
878			filter: ConditionFilter(ColumnFilter(".*j.*"), StripValueFilter(), nil),
879			want:   "gwashington-jadams-,jadams-gwashington-,jadams-tjefferson-,tjefferson-gwashington-,tjefferson-jadams-,tjefferson-wmckinley-,wmckinley-tjefferson-",
880		},
881		{
882			desc:   "read with condition, strip values on false",
883			rr:     RowRange{},
884			filter: ConditionFilter(ColumnFilter(".*xxx.*"), nil, StripValueFilter()),
885			want:   "gwashington-jadams-,jadams-gwashington-,jadams-tjefferson-,tjefferson-gwashington-,tjefferson-jadams-,tjefferson-wmckinley-,wmckinley-tjefferson-",
886		},
887		{
888			desc:   "read with ValueRangeFilter + row limit",
889			rr:     RowRange{},
890			filter: ValueRangeFilter([]byte("1"), []byte("5")), // matches our value of "1"
891			limit:  LimitRows(2),
892			want:   "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1",
893		},
894		{
895			desc:   "read with ValueRangeFilter, no match on exclusive end",
896			rr:     RowRange{},
897			filter: ValueRangeFilter([]byte("0"), []byte("1")), // no match
898			want:   "",
899		},
900		{
901			desc:   "read with ValueRangeFilter, no matches",
902			rr:     RowRange{},
903			filter: ValueRangeFilter([]byte("3"), []byte("5")), // matches nothing
904			want:   "",
905		},
906		{
907			desc:   "read with InterleaveFilter, no matches on all filters",
908			rr:     RowRange{},
909			filter: InterleaveFilters(ColumnFilter(".*x.*"), ColumnFilter(".*z.*")),
910			want:   "",
911		},
912		{
913			desc:   "read with InterleaveFilter, no duplicate cells",
914			rr:     RowRange{},
915			filter: InterleaveFilters(ColumnFilter(".*g.*"), ColumnFilter(".*j.*")),
916			want:   "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,tjefferson-gwashington-1,tjefferson-jadams-1,wmckinley-tjefferson-1",
917		},
918		{
919			desc:   "read with InterleaveFilter, with duplicate cells",
920			rr:     RowRange{},
921			filter: InterleaveFilters(ColumnFilter(".*g.*"), ColumnFilter(".*g.*")),
922			want:   "jadams-gwashington-1,jadams-gwashington-1,tjefferson-gwashington-1,tjefferson-gwashington-1",
923		},
924		{
925			desc: "read with a RowRangeList and no filter",
926			rr:   RowRangeList{NewRange("gargamel", "hubbard"), InfiniteRange("wmckinley")},
927			want: "gwashington-jadams-1,wmckinley-tjefferson-1",
928		},
929		{
930			desc:   "chain that excludes rows and matches nothing, in a condition",
931			rr:     RowRange{},
932			filter: ConditionFilter(ChainFilters(ColumnFilter(".*j.*"), ColumnFilter(".*mckinley.*")), StripValueFilter(), nil),
933			want:   "",
934		},
935		{
936			desc:   "chain that ends with an interleave that has no match. covers #804",
937			rr:     RowRange{},
938			filter: ConditionFilter(ChainFilters(ColumnFilter(".*j.*"), InterleaveFilters(ColumnFilter(".*x.*"), ColumnFilter(".*z.*"))), StripValueFilter(), nil),
939			want:   "",
940		},
941	} {
942		t.Run(test.desc, func(t *testing.T) {
943			var opts []ReadOption
944			if test.filter != nil {
945				opts = append(opts, RowFilter(test.filter))
946			}
947			if test.limit != nil {
948				opts = append(opts, test.limit)
949			}
950			var elt []string
951			err := table.ReadRows(ctx, test.rr, func(r Row) bool {
952				for _, ris := range r {
953					for _, ri := range ris {
954						elt = append(elt, formatReadItem(ri))
955					}
956				}
957				return true
958			}, opts...)
959			if err != nil {
960				t.Fatal(err)
961			}
962			if got := strings.Join(elt, ","); got != test.want {
963				t.Fatalf("got %q\nwant %q", got, test.want)
964			}
965		})
966	}
967}
968
969func TestIntegration_SampleRowKeys(t *testing.T) {
970	ctx := context.Background()
971	_, _, table, _, cleanup, err := setupIntegration(ctx, t)
972	if err != nil {
973		t.Fatal(err)
974	}
975	defer cleanup()
976
977	// Insert some data.
978	initialData := map[string][]string{
979		"wmckinley11":   {"tjefferson11"},
980		"gwashington77": {"jadams77"},
981		"tjefferson0":   {"gwashington0", "jadams0"},
982	}
983
984	for row, ss := range initialData {
985		mut := NewMutation()
986		for _, name := range ss {
987			mut.Set("follows", name, 1000, []byte("1"))
988		}
989		if err := table.Apply(ctx, row, mut); err != nil {
990			t.Fatalf("Mutating row %q: %v", row, err)
991		}
992	}
993	sampleKeys, err := table.SampleRowKeys(context.Background())
994	if err != nil {
995		t.Fatalf("%s: %v", "SampleRowKeys:", err)
996	}
997	if len(sampleKeys) == 0 {
998		t.Error("SampleRowKeys length 0")
999	}
1000}
1001
1002func TestIntegration_Admin(t *testing.T) {
1003	testEnv, err := NewIntegrationEnv()
1004	if err != nil {
1005		t.Fatalf("IntegrationEnv: %v", err)
1006	}
1007	defer testEnv.Close()
1008
1009	timeout := 2 * time.Second
1010	if testEnv.Config().UseProd {
1011		timeout = 5 * time.Minute
1012	}
1013	ctx, _ := context.WithTimeout(context.Background(), timeout)
1014
1015	adminClient, err := testEnv.NewAdminClient()
1016	if err != nil {
1017		t.Fatalf("NewAdminClient: %v", err)
1018	}
1019	defer adminClient.Close()
1020
1021	iAdminClient, err := testEnv.NewInstanceAdminClient()
1022	if err != nil {
1023		t.Fatalf("NewInstanceAdminClient: %v", err)
1024	}
1025	if iAdminClient != nil {
1026		defer iAdminClient.Close()
1027
1028		iInfo, err := iAdminClient.InstanceInfo(ctx, adminClient.instance)
1029		if err != nil {
1030			t.Errorf("InstanceInfo: %v", err)
1031		}
1032		if iInfo.Name != adminClient.instance {
1033			t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance)
1034		}
1035	}
1036
1037	list := func() []string {
1038		tbls, err := adminClient.Tables(ctx)
1039		if err != nil {
1040			t.Fatalf("Fetching list of tables: %v", err)
1041		}
1042		sort.Strings(tbls)
1043		return tbls
1044	}
1045	containsAll := func(got, want []string) bool {
1046		gotSet := make(map[string]bool)
1047
1048		for _, s := range got {
1049			gotSet[s] = true
1050		}
1051		for _, s := range want {
1052			if !gotSet[s] {
1053				return false
1054			}
1055		}
1056		return true
1057	}
1058
1059	defer adminClient.DeleteTable(ctx, "mytable")
1060
1061	if err := adminClient.CreateTable(ctx, "mytable"); err != nil {
1062		t.Fatalf("Creating table: %v", err)
1063	}
1064
1065	defer adminClient.DeleteTable(ctx, "myothertable")
1066
1067	if err := adminClient.CreateTable(ctx, "myothertable"); err != nil {
1068		t.Fatalf("Creating table: %v", err)
1069	}
1070
1071	if got, want := list(), []string{"myothertable", "mytable"}; !containsAll(got, want) {
1072		t.Errorf("adminClient.Tables returned %#v, want %#v", got, want)
1073	}
1074
1075	must(adminClient.WaitForReplication(ctx, "mytable"))
1076
1077	if err := adminClient.DeleteTable(ctx, "myothertable"); err != nil {
1078		t.Fatalf("Deleting table: %v", err)
1079	}
1080	tables := list()
1081	if got, want := tables, []string{"mytable"}; !containsAll(got, want) {
1082		t.Errorf("adminClient.Tables returned %#v, want %#v", got, want)
1083	}
1084	if got, unwanted := tables, []string{"myothertable"}; containsAll(got, unwanted) {
1085		t.Errorf("adminClient.Tables return %#v. unwanted %#v", got, unwanted)
1086	}
1087
1088	tblConf := TableConf{
1089		TableID: "conftable",
1090		Families: map[string]GCPolicy{
1091			"fam1": MaxVersionsPolicy(1),
1092			"fam2": MaxVersionsPolicy(2),
1093		},
1094	}
1095	if err := adminClient.CreateTableFromConf(ctx, &tblConf); err != nil {
1096		t.Fatalf("Creating table from TableConf: %v", err)
1097	}
1098	defer adminClient.DeleteTable(ctx, tblConf.TableID)
1099
1100	tblInfo, err := adminClient.TableInfo(ctx, tblConf.TableID)
1101	if err != nil {
1102		t.Fatalf("Getting table info: %v", err)
1103	}
1104	sort.Strings(tblInfo.Families)
1105	wantFams := []string{"fam1", "fam2"}
1106	if !testutil.Equal(tblInfo.Families, wantFams) {
1107		t.Errorf("Column family mismatch, got %v, want %v", tblInfo.Families, wantFams)
1108	}
1109
1110	// Populate mytable and drop row ranges
1111	if err = adminClient.CreateColumnFamily(ctx, "mytable", "cf"); err != nil {
1112		t.Fatalf("Creating column family: %v", err)
1113	}
1114
1115	client, err := testEnv.NewClient()
1116	if err != nil {
1117		t.Fatalf("NewClient: %v", err)
1118	}
1119	defer client.Close()
1120
1121	tbl := client.Open("mytable")
1122
1123	prefixes := []string{"a", "b", "c"}
1124	for _, prefix := range prefixes {
1125		for i := 0; i < 5; i++ {
1126			mut := NewMutation()
1127			mut.Set("cf", "col", 1000, []byte("1"))
1128			if err := tbl.Apply(ctx, fmt.Sprintf("%v-%v", prefix, i), mut); err != nil {
1129				t.Fatalf("Mutating row: %v", err)
1130			}
1131		}
1132	}
1133
1134	if err = adminClient.DropRowRange(ctx, "mytable", "a"); err != nil {
1135		t.Errorf("DropRowRange a: %v", err)
1136	}
1137	if err = adminClient.DropRowRange(ctx, "mytable", "c"); err != nil {
1138		t.Errorf("DropRowRange c: %v", err)
1139	}
1140	if err = adminClient.DropRowRange(ctx, "mytable", "x"); err != nil {
1141		t.Errorf("DropRowRange x: %v", err)
1142	}
1143
1144	var gotRowCount int
1145	must(tbl.ReadRows(ctx, RowRange{}, func(row Row) bool {
1146		gotRowCount++
1147		if !strings.HasPrefix(row.Key(), "b") {
1148			t.Errorf("Invalid row after dropping range: %v", row)
1149		}
1150		return true
1151	}))
1152	if gotRowCount != 5 {
1153		t.Errorf("Invalid row count after dropping range: got %v, want %v", gotRowCount, 5)
1154	}
1155}
1156
1157func TestIntegration_AdminSnapshot(t *testing.T) {
1158	testEnv, err := NewIntegrationEnv()
1159	if err != nil {
1160		t.Fatalf("IntegrationEnv: %v", err)
1161	}
1162	defer testEnv.Close()
1163
1164	if !testEnv.Config().UseProd {
1165		t.Skip("emulator doesn't support snapshots")
1166	}
1167
1168	timeout := 2 * time.Second
1169	if testEnv.Config().UseProd {
1170		timeout = 5 * time.Minute
1171	}
1172	ctx, _ := context.WithTimeout(context.Background(), timeout)
1173
1174	adminClient, err := testEnv.NewAdminClient()
1175	if err != nil {
1176		t.Fatalf("NewAdminClient: %v", err)
1177	}
1178	defer adminClient.Close()
1179
1180	table := testEnv.Config().Table
1181	cluster := testEnv.Config().Cluster
1182
1183	list := func(cluster string) ([]*SnapshotInfo, error) {
1184		infos := []*SnapshotInfo(nil)
1185
1186		it := adminClient.Snapshots(ctx, cluster)
1187		for {
1188			s, err := it.Next()
1189			if err == iterator.Done {
1190				break
1191			}
1192			if err != nil {
1193				return nil, err
1194			}
1195			infos = append(infos, s)
1196		}
1197		return infos, err
1198	}
1199
1200	// Delete the table at the end of the test. Schedule ahead of time
1201	// in case the client fails
1202	defer adminClient.DeleteTable(ctx, table)
1203
1204	if err := adminClient.CreateTable(ctx, table); err != nil {
1205		t.Fatalf("Creating table: %v", err)
1206	}
1207
1208	// Precondition: no snapshots
1209	snapshots, err := list(cluster)
1210	if err != nil {
1211		t.Fatalf("Initial snapshot list: %v", err)
1212	}
1213	if got, want := len(snapshots), 0; got != want {
1214		t.Fatalf("Initial snapshot list len: %d, want: %d", got, want)
1215	}
1216
1217	// Create snapshot
1218	defer adminClient.DeleteSnapshot(ctx, cluster, "mysnapshot")
1219
1220	if err = adminClient.SnapshotTable(ctx, table, cluster, "mysnapshot", 5*time.Hour); err != nil {
1221		t.Fatalf("Creating snaphot: %v", err)
1222	}
1223
1224	// List snapshot
1225	snapshots, err = list(cluster)
1226	if err != nil {
1227		t.Fatalf("Listing snapshots: %v", err)
1228	}
1229	if got, want := len(snapshots), 1; got != want {
1230		t.Fatalf("Listing snapshot count: %d, want: %d", got, want)
1231	}
1232	if got, want := snapshots[0].Name, "mysnapshot"; got != want {
1233		t.Fatalf("Snapshot name: %s, want: %s", got, want)
1234	}
1235	if got, want := snapshots[0].SourceTable, table; got != want {
1236		t.Fatalf("Snapshot SourceTable: %s, want: %s", got, want)
1237	}
1238	if got, want := snapshots[0].DeleteTime, snapshots[0].CreateTime.Add(5*time.Hour); math.Abs(got.Sub(want).Minutes()) > 1 {
1239		t.Fatalf("Snapshot DeleteTime: %s, want: %s", got, want)
1240	}
1241
1242	// Get snapshot
1243	snapshot, err := adminClient.SnapshotInfo(ctx, cluster, "mysnapshot")
1244	if err != nil {
1245		t.Fatalf("SnapshotInfo: %v", snapshot)
1246	}
1247	if got, want := *snapshot, *snapshots[0]; got != want {
1248		t.Fatalf("SnapshotInfo: %v, want: %v", got, want)
1249	}
1250
1251	// Restore
1252	restoredTable := table + "-restored"
1253	defer adminClient.DeleteTable(ctx, restoredTable)
1254	if err = adminClient.CreateTableFromSnapshot(ctx, restoredTable, cluster, "mysnapshot"); err != nil {
1255		t.Fatalf("CreateTableFromSnapshot: %v", err)
1256	}
1257	if _, err := adminClient.TableInfo(ctx, restoredTable); err != nil {
1258		t.Fatalf("Restored TableInfo: %v", err)
1259	}
1260
1261	// Delete snapshot
1262	if err = adminClient.DeleteSnapshot(ctx, cluster, "mysnapshot"); err != nil {
1263		t.Fatalf("DeleteSnapshot: %v", err)
1264	}
1265	snapshots, err = list(cluster)
1266	if err != nil {
1267		t.Fatalf("List after Delete: %v", err)
1268	}
1269	if got, want := len(snapshots), 0; got != want {
1270		t.Fatalf("List after delete len: %d, want: %d", got, want)
1271	}
1272}
1273
1274func TestIntegration_Granularity(t *testing.T) {
1275	testEnv, err := NewIntegrationEnv()
1276	if err != nil {
1277		t.Fatalf("IntegrationEnv: %v", err)
1278	}
1279	defer testEnv.Close()
1280
1281	timeout := 2 * time.Second
1282	if testEnv.Config().UseProd {
1283		timeout = 5 * time.Minute
1284	}
1285	ctx, _ := context.WithTimeout(context.Background(), timeout)
1286
1287	adminClient, err := testEnv.NewAdminClient()
1288	if err != nil {
1289		t.Fatalf("NewAdminClient: %v", err)
1290	}
1291	defer adminClient.Close()
1292
1293	list := func() []string {
1294		tbls, err := adminClient.Tables(ctx)
1295		if err != nil {
1296			t.Fatalf("Fetching list of tables: %v", err)
1297		}
1298		sort.Strings(tbls)
1299		return tbls
1300	}
1301	containsAll := func(got, want []string) bool {
1302		gotSet := make(map[string]bool)
1303
1304		for _, s := range got {
1305			gotSet[s] = true
1306		}
1307		for _, s := range want {
1308			if !gotSet[s] {
1309				return false
1310			}
1311		}
1312		return true
1313	}
1314
1315	defer adminClient.DeleteTable(ctx, "mytable")
1316
1317	if err := adminClient.CreateTable(ctx, "mytable"); err != nil {
1318		t.Fatalf("Creating table: %v", err)
1319	}
1320
1321	tables := list()
1322	if got, want := tables, []string{"mytable"}; !containsAll(got, want) {
1323		t.Errorf("adminClient.Tables returned %#v, want %#v", got, want)
1324	}
1325
1326	// calling ModifyColumnFamilies to check the granularity of table
1327	prefix := adminClient.instancePrefix()
1328	req := &btapb.ModifyColumnFamiliesRequest{
1329		Name: prefix + "/tables/" + "mytable",
1330		Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{
1331			Id:  "cf",
1332			Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Create{&btapb.ColumnFamily{}},
1333		}},
1334	}
1335	table, err := adminClient.tClient.ModifyColumnFamilies(ctx, req)
1336	if err != nil {
1337		t.Fatalf("Creating column family: %v", err)
1338	}
1339	if table.Granularity != btapb.Table_TimestampGranularity(btapb.Table_MILLIS) {
1340		t.Errorf("ModifyColumnFamilies returned granularity %#v, want %#v", table.Granularity, btapb.Table_TimestampGranularity(btapb.Table_MILLIS))
1341	}
1342}
1343
1344func TestIntegration_InstanceAdminClient_AppProfile(t *testing.T) {
1345	testEnv, err := NewIntegrationEnv()
1346	if err != nil {
1347		t.Fatalf("IntegrationEnv: %v", err)
1348	}
1349	defer testEnv.Close()
1350
1351	timeout := 2 * time.Second
1352	if testEnv.Config().UseProd {
1353		timeout = 5 * time.Minute
1354	}
1355	ctx, cancel := context.WithTimeout(context.Background(), timeout)
1356	defer cancel()
1357
1358	adminClient, err := testEnv.NewAdminClient()
1359	if err != nil {
1360		t.Fatalf("NewAdminClient: %v", err)
1361	}
1362	defer adminClient.Close()
1363
1364	iAdminClient, err := testEnv.NewInstanceAdminClient()
1365	if err != nil {
1366		t.Fatalf("NewInstanceAdminClient: %v", err)
1367	}
1368
1369	if iAdminClient == nil {
1370		return
1371	}
1372
1373	defer iAdminClient.Close()
1374	profile := ProfileConf{
1375		ProfileID:     "app_profile1",
1376		InstanceID:    adminClient.instance,
1377		ClusterID:     testEnv.Config().Cluster,
1378		Description:   "creating new app profile 1",
1379		RoutingPolicy: SingleClusterRouting,
1380	}
1381
1382	createdProfile, err := iAdminClient.CreateAppProfile(ctx, profile)
1383	if err != nil {
1384		t.Fatalf("Creating app profile: %v", err)
1385
1386	}
1387
1388	gotProfile, err := iAdminClient.GetAppProfile(ctx, adminClient.instance, "app_profile1")
1389
1390	if err != nil {
1391		t.Fatalf("Get app profile: %v", err)
1392	}
1393
1394	if !proto.Equal(createdProfile, gotProfile) {
1395		t.Fatalf("created profile: %s, got profile: %s", createdProfile.Name, gotProfile.Name)
1396
1397	}
1398
1399	list := func(instanceID string) ([]*btapb.AppProfile, error) {
1400		profiles := []*btapb.AppProfile(nil)
1401
1402		it := iAdminClient.ListAppProfiles(ctx, instanceID)
1403		for {
1404			s, err := it.Next()
1405			if err == iterator.Done {
1406				break
1407			}
1408			if err != nil {
1409				return nil, err
1410			}
1411			profiles = append(profiles, s)
1412		}
1413		return profiles, err
1414	}
1415
1416	profiles, err := list(adminClient.instance)
1417	if err != nil {
1418		t.Fatalf("List app profile: %v", err)
1419	}
1420
1421	if got, want := len(profiles), 1; got != want {
1422		t.Fatalf("Initial app profile list len: %d, want: %d", got, want)
1423	}
1424
1425	for _, test := range []struct {
1426		desc   string
1427		uattrs ProfileAttrsToUpdate
1428		want   *btapb.AppProfile // nil means error
1429	}{
1430		{
1431			desc:   "empty update",
1432			uattrs: ProfileAttrsToUpdate{},
1433			want:   nil,
1434		},
1435
1436		{
1437			desc:   "empty description update",
1438			uattrs: ProfileAttrsToUpdate{Description: ""},
1439			want: &btapb.AppProfile{
1440				Name:          gotProfile.Name,
1441				Description:   "",
1442				RoutingPolicy: gotProfile.RoutingPolicy,
1443				Etag:          gotProfile.Etag},
1444		},
1445		{
1446			desc: "routing update",
1447			uattrs: ProfileAttrsToUpdate{
1448				RoutingPolicy: SingleClusterRouting,
1449				ClusterID:     testEnv.Config().Cluster,
1450			},
1451			want: &btapb.AppProfile{
1452				Name:        gotProfile.Name,
1453				Description: "",
1454				Etag:        gotProfile.Etag,
1455				RoutingPolicy: &btapb.AppProfile_SingleClusterRouting_{
1456					SingleClusterRouting: &btapb.AppProfile_SingleClusterRouting{
1457						ClusterId: testEnv.Config().Cluster,
1458					}},
1459			},
1460		},
1461	} {
1462		err = iAdminClient.UpdateAppProfile(ctx, adminClient.instance, "app_profile1", test.uattrs)
1463		if err != nil {
1464			if test.want != nil {
1465				t.Errorf("%s: %v", test.desc, err)
1466			}
1467			continue
1468		}
1469		if err == nil && test.want == nil {
1470			t.Errorf("%s: got nil, want error", test.desc)
1471			continue
1472		}
1473
1474		got, _ := iAdminClient.GetAppProfile(ctx, adminClient.instance, "app_profile1")
1475
1476		if !proto.Equal(got, test.want) {
1477			t.Fatalf("%s : got profile : %v, want profile: %v", test.desc, gotProfile, test.want)
1478		}
1479
1480	}
1481
1482	err = iAdminClient.DeleteAppProfile(ctx, adminClient.instance, "app_profile1")
1483	if err != nil {
1484		t.Fatalf("Delete app profile: %v", err)
1485	}
1486
1487}
1488
1489func TestIntegration_InstanceUpdate(t *testing.T) {
1490	testEnv, err := NewIntegrationEnv()
1491	if err != nil {
1492		t.Fatalf("IntegrationEnv: %v", err)
1493	}
1494	defer testEnv.Close()
1495
1496	timeout := 2 * time.Second
1497	if testEnv.Config().UseProd {
1498		timeout = 5 * time.Minute
1499	}
1500	ctx, cancel := context.WithTimeout(context.Background(), timeout)
1501	defer cancel()
1502
1503	adminClient, err := testEnv.NewAdminClient()
1504	if err != nil {
1505		t.Fatalf("NewAdminClient: %v", err)
1506	}
1507
1508	defer adminClient.Close()
1509
1510	iAdminClient, err := testEnv.NewInstanceAdminClient()
1511	if err != nil {
1512		t.Fatalf("NewInstanceAdminClient: %v", err)
1513	}
1514
1515	if iAdminClient == nil {
1516		return
1517	}
1518
1519	defer iAdminClient.Close()
1520
1521	iInfo, err := iAdminClient.InstanceInfo(ctx, adminClient.instance)
1522	if err != nil {
1523		t.Errorf("InstanceInfo: %v", err)
1524	}
1525
1526	if iInfo.Name != adminClient.instance {
1527		t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance)
1528	}
1529
1530	if iInfo.DisplayName != adminClient.instance {
1531		t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance)
1532	}
1533
1534	const numNodes = 4
1535	// update cluster nodes
1536	if err := iAdminClient.UpdateCluster(ctx, adminClient.instance, testEnv.Config().Cluster, int32(numNodes)); err != nil {
1537		t.Errorf("UpdateCluster: %v", err)
1538	}
1539
1540	// get cluster after updating
1541	cis, err := iAdminClient.GetCluster(ctx, adminClient.instance, testEnv.Config().Cluster)
1542	if err != nil {
1543		t.Errorf("GetCluster %v", err)
1544	}
1545	if cis.ServeNodes != int(numNodes) {
1546		t.Errorf("ServeNodes returned %d, want %d", cis.ServeNodes, int(numNodes))
1547	}
1548}
1549
1550func setupIntegration(ctx context.Context, t *testing.T) (_ *Client, _ *AdminClient, table *Table, tableName string, cleanup func(), _ error) {
1551	testEnv, err := NewIntegrationEnv()
1552	if err != nil {
1553		return nil, nil, nil, "", nil, err
1554	}
1555
1556	var timeout time.Duration
1557	if testEnv.Config().UseProd {
1558		timeout = 10 * time.Minute
1559		t.Logf("Running test against production")
1560	} else {
1561		timeout = 1 * time.Minute
1562		t.Logf("bttest.Server running on %s", testEnv.Config().AdminEndpoint)
1563	}
1564	ctx, cancel := context.WithTimeout(ctx, timeout)
1565	defer cancel()
1566
1567	client, err := testEnv.NewClient()
1568	if err != nil {
1569		return nil, nil, nil, "", nil, err
1570	}
1571
1572	adminClient, err := testEnv.NewAdminClient()
1573	if err != nil {
1574		return nil, nil, nil, "", nil, err
1575	}
1576
1577	tableName = testEnv.Config().Table
1578	if err := adminClient.CreateTable(ctx, tableName); err != nil {
1579		return nil, nil, nil, "", nil, err
1580	}
1581	if err := adminClient.CreateColumnFamily(ctx, tableName, "follows"); err != nil {
1582		return nil, nil, nil, "", nil, err
1583	}
1584
1585	return client, adminClient, client.Open(tableName), tableName, func() {
1586		adminClient.DeleteTable(ctx, tableName)
1587		client.Close()
1588		adminClient.Close()
1589	}, nil
1590}
1591
1592func formatReadItem(ri ReadItem) string {
1593	// Use the column qualifier only to make the test data briefer.
1594	col := ri.Column[strings.Index(ri.Column, ":")+1:]
1595	return fmt.Sprintf("%s-%s-%s", ri.Row, col, ri.Value)
1596}
1597
1598func fill(b, sub []byte) {
1599	for len(b) > len(sub) {
1600		n := copy(b, sub)
1601		b = b[n:]
1602	}
1603}
1604
1605func clearTimestamps(r Row) {
1606	for _, ris := range r {
1607		for i := range ris {
1608			ris[i].Timestamp = 0
1609		}
1610	}
1611}
1612