1/*
2Copyright 2015 Google Inc. All Rights Reserved.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package bigtable
18
19import (
20	"flag"
21	"fmt"
22	"math/rand"
23	"reflect"
24	"sort"
25	"strings"
26	"sync"
27	"testing"
28	"time"
29
30	"cloud.google.com/go/bigtable/bttest"
31	"golang.org/x/net/context"
32	"google.golang.org/api/option"
33	"google.golang.org/grpc"
34)
35
36func TestPrefix(t *testing.T) {
37	tests := []struct {
38		prefix, succ string
39	}{
40		{"", ""},
41		{"\xff", ""}, // when used, "" means Infinity
42		{"x\xff", "y"},
43		{"\xfe", "\xff"},
44	}
45	for _, tc := range tests {
46		got := prefixSuccessor(tc.prefix)
47		if got != tc.succ {
48			t.Errorf("prefixSuccessor(%q) = %q, want %s", tc.prefix, got, tc.succ)
49			continue
50		}
51		r := PrefixRange(tc.prefix)
52		if tc.succ == "" && r.limit != "" {
53			t.Errorf("PrefixRange(%q) got limit %q", tc.prefix, r.limit)
54		}
55		if tc.succ != "" && r.limit != tc.succ {
56			t.Errorf("PrefixRange(%q) got limit %q, want %q", tc.prefix, r.limit, tc.succ)
57		}
58	}
59}
60
// useProd selects the backend used by TestClientIntegration. When empty (the
// default) the test starts an in-process bttest server; otherwise the value is
// expected to be "proj,instance,table" and the test targets production.
var (
	useProd = flag.String("use_prod", "", `if set to "proj,instance,table", run integration test against production`)
)
62
63func TestClientIntegration(t *testing.T) {
64	start := time.Now()
65	lastCheckpoint := start
66	checkpoint := func(s string) {
67		n := time.Now()
68		t.Logf("[%s] %v since start, %v since last checkpoint", s, n.Sub(start), n.Sub(lastCheckpoint))
69		lastCheckpoint = n
70	}
71
72	proj, instance, table := "proj", "instance", "mytable"
73	var clientOpts []option.ClientOption
74	timeout := 10 * time.Second
75	if *useProd == "" {
76		srv, err := bttest.NewServer("127.0.0.1:0")
77		if err != nil {
78			t.Fatal(err)
79		}
80		defer srv.Close()
81		t.Logf("bttest.Server running on %s", srv.Addr)
82		conn, err := grpc.Dial(srv.Addr, grpc.WithInsecure())
83		if err != nil {
84			t.Fatalf("grpc.Dial: %v", err)
85		}
86		clientOpts = []option.ClientOption{option.WithGRPCConn(conn)}
87	} else {
88		t.Logf("Running test against production")
89		a := strings.SplitN(*useProd, ",", 3)
90		proj, instance, table = a[0], a[1], a[2]
91		timeout = 5 * time.Minute
92	}
93
94	ctx, _ := context.WithTimeout(context.Background(), timeout)
95
96	client, err := NewClient(ctx, proj, instance, clientOpts...)
97	if err != nil {
98		t.Fatalf("NewClient: %v", err)
99	}
100	defer client.Close()
101	checkpoint("dialed Client")
102
103	adminClient, err := NewAdminClient(ctx, proj, instance, clientOpts...)
104	if err != nil {
105		t.Fatalf("NewAdminClient: %v", err)
106	}
107	defer adminClient.Close()
108	checkpoint("dialed AdminClient")
109
110	// Delete the table at the end of the test.
111	// Do this even before creating the table so that if this is running
112	// against production and CreateTable fails there's a chance of cleaning it up.
113	defer adminClient.DeleteTable(ctx, table)
114
115	if err := adminClient.CreateTable(ctx, table); err != nil {
116		t.Fatalf("Creating table: %v", err)
117	}
118	checkpoint("created table")
119	if err := adminClient.CreateColumnFamily(ctx, table, "follows"); err != nil {
120		t.Fatalf("Creating column family: %v", err)
121	}
122	checkpoint(`created "follows" column family`)
123
124	tbl := client.Open(table)
125
126	// Insert some data.
127	initialData := map[string][]string{
128		"wmckinley":   []string{"tjefferson"},
129		"gwashington": []string{"jadams"},
130		"tjefferson":  []string{"gwashington", "jadams"}, // wmckinley set conditionally below
131		"jadams":      []string{"gwashington", "tjefferson"},
132	}
133	for row, ss := range initialData {
134		mut := NewMutation()
135		for _, name := range ss {
136			mut.Set("follows", name, 0, []byte("1"))
137		}
138		if err := tbl.Apply(ctx, row, mut); err != nil {
139			t.Errorf("Mutating row %q: %v", row, err)
140		}
141	}
142	checkpoint("inserted initial data")
143
144	// Do a conditional mutation with a complex filter.
145	mutTrue := NewMutation()
146	mutTrue.Set("follows", "wmckinley", 0, []byte("1"))
147	filter := ChainFilters(ColumnFilter("gwash[iz].*"), ValueFilter("."))
148	mut := NewCondMutation(filter, mutTrue, nil)
149	if err := tbl.Apply(ctx, "tjefferson", mut); err != nil {
150		t.Errorf("Conditionally mutating row: %v", err)
151	}
152	// Do a second condition mutation with a filter that does not match,
153	// and thus no changes should be made.
154	mutTrue = NewMutation()
155	mutTrue.DeleteRow()
156	filter = ColumnFilter("snoop.dogg")
157	mut = NewCondMutation(filter, mutTrue, nil)
158	if err := tbl.Apply(ctx, "tjefferson", mut); err != nil {
159		t.Errorf("Conditionally mutating row: %v", err)
160	}
161	checkpoint("did two conditional mutations")
162
163	// Fetch a row.
164	row, err := tbl.ReadRow(ctx, "jadams")
165	if err != nil {
166		t.Fatalf("Reading a row: %v", err)
167	}
168	wantRow := Row{
169		"follows": []ReadItem{
170			{Row: "jadams", Column: "follows:gwashington", Value: []byte("1")},
171			{Row: "jadams", Column: "follows:tjefferson", Value: []byte("1")},
172		},
173	}
174	for _, ris := range row {
175		sort.Sort(byColumn(ris))
176	}
177	if !reflect.DeepEqual(row, wantRow) {
178		t.Errorf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow)
179	}
180	checkpoint("tested ReadRow")
181
182	// Do a bunch of reads with filters.
183	readTests := []struct {
184		desc   string
185		rr     RowRange
186		filter Filter // may be nil
187
188		// We do the read, grab all the cells, turn them into "<row>-<col>-<val>",
189		// sort that list, and join with a comma.
190		want string
191	}{
192		{
193			desc: "read all, unfiltered",
194			rr:   RowRange{},
195			want: "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,tjefferson-gwashington-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1",
196		},
197		{
198			desc: "read with InfiniteRange, unfiltered",
199			rr:   InfiniteRange("tjefferson"),
200			want: "tjefferson-gwashington-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1",
201		},
202		{
203			desc: "read with NewRange, unfiltered",
204			rr:   NewRange("gargamel", "hubbard"),
205			want: "gwashington-jadams-1",
206		},
207		{
208			desc: "read with PrefixRange, unfiltered",
209			rr:   PrefixRange("jad"),
210			want: "jadams-gwashington-1,jadams-tjefferson-1",
211		},
212		{
213			desc: "read with SingleRow, unfiltered",
214			rr:   SingleRow("wmckinley"),
215			want: "wmckinley-tjefferson-1",
216		},
217		{
218			desc:   "read all, with ColumnFilter",
219			rr:     RowRange{},
220			filter: ColumnFilter(".*j.*"), // matches "jadams" and "tjefferson"
221			want:   "gwashington-jadams-1,jadams-tjefferson-1,tjefferson-jadams-1,wmckinley-tjefferson-1",
222		},
223	}
224	for _, tc := range readTests {
225		var opts []ReadOption
226		if tc.filter != nil {
227			opts = append(opts, RowFilter(tc.filter))
228		}
229		var elt []string
230		err := tbl.ReadRows(context.Background(), tc.rr, func(r Row) bool {
231			for _, ris := range r {
232				for _, ri := range ris {
233					elt = append(elt, formatReadItem(ri))
234				}
235			}
236			return true
237		}, opts...)
238		if err != nil {
239			t.Errorf("%s: %v", tc.desc, err)
240			continue
241		}
242		sort.Strings(elt)
243		if got := strings.Join(elt, ","); got != tc.want {
244			t.Errorf("%s: wrong reads.\n got %q\nwant %q", tc.desc, got, tc.want)
245		}
246	}
247	// Read a RowList
248	var elt []string
249	keys := RowList{"wmckinley", "gwashington", "jadams"}
250	want := "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,wmckinley-tjefferson-1"
251	err = tbl.ReadRows(ctx, keys, func(r Row) bool {
252		for _, ris := range r {
253			for _, ri := range ris {
254				elt = append(elt, formatReadItem(ri))
255			}
256		}
257		return true
258	})
259	if err != nil {
260		t.Errorf("read RowList: %v", err)
261	}
262
263	sort.Strings(elt)
264	if got := strings.Join(elt, ","); got != want {
265		t.Errorf("bulk read: wrong reads.\n got %q\nwant %q", got, want)
266	}
267	checkpoint("tested ReadRows in a few ways")
268
269	// Do a scan and stop part way through.
270	// Verify that the ReadRows callback doesn't keep running.
271	stopped := false
272	err = tbl.ReadRows(ctx, InfiniteRange(""), func(r Row) bool {
273		if r.Key() < "h" {
274			return true
275		}
276		if !stopped {
277			stopped = true
278			return false
279		}
280		t.Errorf("ReadRows kept scanning to row %q after being told to stop", r.Key())
281		return false
282	})
283	if err != nil {
284		t.Errorf("Partial ReadRows: %v", err)
285	}
286	checkpoint("did partial ReadRows test")
287
288	// Delete a row and check it goes away.
289	mut = NewMutation()
290	mut.DeleteRow()
291	if err := tbl.Apply(ctx, "wmckinley", mut); err != nil {
292		t.Errorf("Apply DeleteRow: %v", err)
293	}
294	row, err = tbl.ReadRow(ctx, "wmckinley")
295	if err != nil {
296		t.Fatalf("Reading a row after DeleteRow: %v", err)
297	}
298	if len(row) != 0 {
299		t.Fatalf("Read non-zero row after DeleteRow: %v", row)
300	}
301	checkpoint("exercised DeleteRow")
302
303	// Check ReadModifyWrite.
304
305	if err := adminClient.CreateColumnFamily(ctx, table, "counter"); err != nil {
306		t.Fatalf("Creating column family: %v", err)
307	}
308
309	appendRMW := func(b []byte) *ReadModifyWrite {
310		rmw := NewReadModifyWrite()
311		rmw.AppendValue("counter", "likes", b)
312		return rmw
313	}
314	incRMW := func(n int64) *ReadModifyWrite {
315		rmw := NewReadModifyWrite()
316		rmw.Increment("counter", "likes", n)
317		return rmw
318	}
319	rmwSeq := []struct {
320		desc string
321		rmw  *ReadModifyWrite
322		want []byte
323	}{
324		{
325			desc: "append #1",
326			rmw:  appendRMW([]byte{0, 0, 0}),
327			want: []byte{0, 0, 0},
328		},
329		{
330			desc: "append #2",
331			rmw:  appendRMW([]byte{0, 0, 0, 0, 17}), // the remaining 40 bits to make a big-endian 17
332			want: []byte{0, 0, 0, 0, 0, 0, 0, 17},
333		},
334		{
335			desc: "increment",
336			rmw:  incRMW(8),
337			want: []byte{0, 0, 0, 0, 0, 0, 0, 25},
338		},
339	}
340	for _, step := range rmwSeq {
341		row, err := tbl.ApplyReadModifyWrite(ctx, "gwashington", step.rmw)
342		if err != nil {
343			t.Fatalf("ApplyReadModifyWrite %+v: %v", step.rmw, err)
344		}
345		clearTimestamps(row)
346		wantRow := Row{"counter": []ReadItem{{Row: "gwashington", Column: "counter:likes", Value: step.want}}}
347		if !reflect.DeepEqual(row, wantRow) {
348			t.Fatalf("After %s,\n got %v\nwant %v", step.desc, row, wantRow)
349		}
350	}
351	checkpoint("tested ReadModifyWrite")
352
353	// Test arbitrary timestamps more thoroughly.
354	if err := adminClient.CreateColumnFamily(ctx, table, "ts"); err != nil {
355		t.Fatalf("Creating column family: %v", err)
356	}
357	const numVersions = 4
358	mut = NewMutation()
359	for i := 0; i < numVersions; i++ {
360		// Timestamps are used in thousands because the server
361		// only permits that granularity.
362		mut.Set("ts", "col", Timestamp(i*1000), []byte(fmt.Sprintf("val-%d", i)))
363	}
364	if err := tbl.Apply(ctx, "testrow", mut); err != nil {
365		t.Fatalf("Mutating row: %v", err)
366	}
367	r, err := tbl.ReadRow(ctx, "testrow")
368	if err != nil {
369		t.Fatalf("Reading row: %v", err)
370	}
371	wantRow = Row{"ts": []ReadItem{
372		// These should be returned in descending timestamp order.
373		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
374		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
375		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
376		{Row: "testrow", Column: "ts:col", Timestamp: 0, Value: []byte("val-0")},
377	}}
378	if !reflect.DeepEqual(r, wantRow) {
379		t.Errorf("Cell with multiple versions,\n got %v\nwant %v", r, wantRow)
380	}
381	// Do the same read, but filter to the latest two versions.
382	r, err = tbl.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(2)))
383	if err != nil {
384		t.Fatalf("Reading row: %v", err)
385	}
386	wantRow = Row{"ts": []ReadItem{
387		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
388		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
389	}}
390	if !reflect.DeepEqual(r, wantRow) {
391		t.Errorf("Cell with multiple versions and LatestNFilter(2),\n got %v\nwant %v", r, wantRow)
392	}
393	// Delete the cell with timestamp 2000 and repeat the last read,
394	// checking that we get ts 3000 and ts 1000.
395	mut = NewMutation()
396	mut.DeleteTimestampRange("ts", "col", 2000, 3000) // half-open interval
397	if err := tbl.Apply(ctx, "testrow", mut); err != nil {
398		t.Fatalf("Mutating row: %v", err)
399	}
400	r, err = tbl.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(2)))
401	if err != nil {
402		t.Fatalf("Reading row: %v", err)
403	}
404	wantRow = Row{"ts": []ReadItem{
405		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
406		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
407	}}
408	if !reflect.DeepEqual(r, wantRow) {
409		t.Errorf("Cell with multiple versions and LatestNFilter(2), after deleting timestamp 2000,\n got %v\nwant %v", r, wantRow)
410	}
411	checkpoint("tested multiple versions in a cell")
412
413	// Do highly concurrent reads/writes.
414	// TODO(dsymonds): Raise this to 1000 when https://github.com/grpc/grpc-go/issues/205 is resolved.
415	const maxConcurrency = 100
416	var wg sync.WaitGroup
417	for i := 0; i < maxConcurrency; i++ {
418		wg.Add(1)
419		go func() {
420			defer wg.Done()
421			switch r := rand.Intn(100); { // r ∈ [0,100)
422			case 0 <= r && r < 30:
423				// Do a read.
424				_, err := tbl.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(1)))
425				if err != nil {
426					t.Errorf("Concurrent read: %v", err)
427				}
428			case 30 <= r && r < 100:
429				// Do a write.
430				mut := NewMutation()
431				mut.Set("ts", "col", 0, []byte("data"))
432				if err := tbl.Apply(ctx, "testrow", mut); err != nil {
433					t.Errorf("Concurrent write: %v", err)
434				}
435			}
436		}()
437	}
438	wg.Wait()
439	checkpoint("tested high concurrency")
440
441	// Large reads, writes and scans.
442	bigBytes := make([]byte, 3<<20) // 3 MB is large, but less than current gRPC max of 4 MB.
443	nonsense := []byte("lorem ipsum dolor sit amet, ")
444	fill(bigBytes, nonsense)
445	mut = NewMutation()
446	mut.Set("ts", "col", 0, bigBytes)
447	if err := tbl.Apply(ctx, "bigrow", mut); err != nil {
448		t.Errorf("Big write: %v", err)
449	}
450	r, err = tbl.ReadRow(ctx, "bigrow")
451	if err != nil {
452		t.Errorf("Big read: %v", err)
453	}
454	wantRow = Row{"ts": []ReadItem{
455		{Row: "bigrow", Column: "ts:col", Value: bigBytes},
456	}}
457	if !reflect.DeepEqual(r, wantRow) {
458		t.Errorf("Big read returned incorrect bytes: %v", r)
459	}
460	// Now write 1000 rows, each with 82 KB values, then scan them all.
461	medBytes := make([]byte, 82<<10)
462	fill(medBytes, nonsense)
463	sem := make(chan int, 50) // do up to 50 mutations at a time.
464	for i := 0; i < 1000; i++ {
465		mut := NewMutation()
466		mut.Set("ts", "big-scan", 0, medBytes)
467		row := fmt.Sprintf("row-%d", i)
468		wg.Add(1)
469		go func() {
470			defer wg.Done()
471			defer func() { <-sem }()
472			sem <- 1
473			if err := tbl.Apply(ctx, row, mut); err != nil {
474				t.Errorf("Preparing large scan: %v", err)
475			}
476		}()
477	}
478	wg.Wait()
479	n := 0
480	err = tbl.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool {
481		for _, ris := range r {
482			for _, ri := range ris {
483				n += len(ri.Value)
484			}
485		}
486		return true
487	}, RowFilter(ColumnFilter("big-scan")))
488	if err != nil {
489		t.Errorf("Doing large scan: %v", err)
490	}
491	if want := 1000 * len(medBytes); n != want {
492		t.Errorf("Large scan returned %d bytes, want %d", n, want)
493	}
494	checkpoint("tested big read/write/scan")
495
496	// Test bulk mutations
497	if err := adminClient.CreateColumnFamily(ctx, table, "bulk"); err != nil {
498		t.Fatalf("Creating column family: %v", err)
499	}
500	bulkData := map[string][]string{
501		"red sox":  []string{"2004", "2007", "2013"},
502		"patriots": []string{"2001", "2003", "2004", "2014"},
503		"celtics":  []string{"1981", "1984", "1986", "2008"},
504	}
505	var rowKeys []string
506	var muts []*Mutation
507	for row, ss := range bulkData {
508		mut := NewMutation()
509		for _, name := range ss {
510			mut.Set("bulk", name, 0, []byte("1"))
511		}
512		rowKeys = append(rowKeys, row)
513		muts = append(muts, mut)
514	}
515	status, err := tbl.ApplyBulk(ctx, rowKeys, muts)
516	if err != nil {
517		t.Fatalf("Bulk mutating rows %q: %v", rowKeys, err)
518	}
519	if status != nil {
520		t.Errorf("non-nil errors: %v", err)
521	}
522	checkpoint("inserted bulk data")
523
524	// Read each row back
525	for rowKey, ss := range bulkData {
526		row, err := tbl.ReadRow(ctx, rowKey)
527		if err != nil {
528			t.Fatalf("Reading a bulk row: %v", err)
529		}
530		for _, ris := range row {
531			sort.Sort(byColumn(ris))
532		}
533		var wantItems []ReadItem
534		for _, val := range ss {
535			wantItems = append(wantItems, ReadItem{Row: rowKey, Column: "bulk:" + val, Value: []byte("1")})
536		}
537		wantRow := Row{"bulk": wantItems}
538		if !reflect.DeepEqual(row, wantRow) {
539			t.Errorf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow)
540		}
541	}
542	checkpoint("tested reading from bulk insert")
543
544	// Test bulk write errors
545	badMut := NewMutation()
546	badMut.Set("badfamily", "col", -1, nil)
547	badMut2 := NewMutation()
548	badMut2.Set("badfamily2", "goodcol", -1, []byte("1"))
549	status, err = tbl.ApplyBulk(ctx, []string{"badrow", "badrow2"}, []*Mutation{badMut, badMut2})
550	if err != nil {
551		t.Fatalf("Bulk mutating rows %q: %v", rowKeys, err)
552	}
553	if status == nil {
554		t.Errorf("No errors for bad bulk mutation")
555	}
556	if status[0] == nil || status[1] == nil {
557		t.Errorf("No error for bad bulk mutation")
558	}
559}
560
561func formatReadItem(ri ReadItem) string {
562	// Use the column qualifier only to make the test data briefer.
563	col := ri.Column[strings.Index(ri.Column, ":")+1:]
564	return fmt.Sprintf("%s-%s-%s", ri.Row, col, ri.Value)
565}
566
// fill overwrites all of b with back-to-back copies of sub. The last copy is
// truncated when len(b) is not a multiple of len(sub). If sub is empty, b is
// left unchanged; the loop could otherwise never make progress.
func fill(b, sub []byte) {
	if len(sub) == 0 {
		return
	}
	// Loop until b is exhausted, not merely shorter than sub. Otherwise the
	// final len(sub) or fewer bytes would be left unfilled.
	for len(b) > 0 {
		n := copy(b, sub)
		b = b[n:]
	}
}
573
574type byColumn []ReadItem
575
576func (b byColumn) Len() int           { return len(b) }
577func (b byColumn) Swap(i, j int)      { b[i], b[j] = b[j], b[i] }
578func (b byColumn) Less(i, j int) bool { return b[i].Column < b[j].Column }
579
580func clearTimestamps(r Row) {
581	for _, ris := range r {
582		for i := range ris {
583			ris[i].Timestamp = 0
584		}
585	}
586}
587