1/*
2Copyright 2016 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16package bigtable
17
18import (
19	"context"
20	"strings"
21	"testing"
22	"time"
23
24	"cloud.google.com/go/bigtable/bttest"
25	"cloud.google.com/go/internal/testutil"
26	"github.com/golang/protobuf/ptypes/wrappers"
27	"github.com/google/go-cmp/cmp"
28	"google.golang.org/api/option"
29	btpb "google.golang.org/genproto/googleapis/bigtable/v2"
30	rpcpb "google.golang.org/genproto/googleapis/rpc/status"
31	"google.golang.org/grpc"
32	"google.golang.org/grpc/codes"
33	"google.golang.org/grpc/status"
34)
35
36func setupFakeServer(opt ...grpc.ServerOption) (tbl *Table, cleanup func(), err error) {
37	srv, err := bttest.NewServer("localhost:0", opt...)
38	if err != nil {
39		return nil, nil, err
40	}
41	conn, err := grpc.Dial(srv.Addr, grpc.WithInsecure(), grpc.WithBlock())
42	if err != nil {
43		return nil, nil, err
44	}
45
46	client, err := NewClient(context.Background(), "client", "instance", option.WithGRPCConn(conn), option.WithGRPCDialOption(grpc.WithBlock()))
47	if err != nil {
48		return nil, nil, err
49	}
50
51	adminClient, err := NewAdminClient(context.Background(), "client", "instance", option.WithGRPCConn(conn), option.WithGRPCDialOption(grpc.WithBlock()))
52	if err != nil {
53		return nil, nil, err
54	}
55	if err := adminClient.CreateTable(context.Background(), "table"); err != nil {
56		return nil, nil, err
57	}
58	if err := adminClient.CreateColumnFamily(context.Background(), "table", "cf"); err != nil {
59		return nil, nil, err
60	}
61	t := client.Open("table")
62
63	cleanupFunc := func() {
64		adminClient.Close()
65		client.Close()
66		srv.Close()
67	}
68	return t, cleanupFunc, nil
69}
70
71func TestRetryApply(t *testing.T) {
72	ctx := context.Background()
73
74	errCount := 0
75	code := codes.Unavailable // Will be retried
76	// Intercept requests and return an error or defer to the underlying handler
77	errInjector := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
78		if strings.HasSuffix(info.FullMethod, "MutateRow") && errCount < 3 {
79			errCount++
80			return nil, status.Errorf(code, "")
81		}
82		return handler(ctx, req)
83	}
84	tbl, cleanup, err := setupFakeServer(grpc.UnaryInterceptor(errInjector))
85	if err != nil {
86		t.Fatalf("fake server setup: %v", err)
87	}
88	defer cleanup()
89
90	mut := NewMutation()
91	mut.Set("cf", "col", 1000, []byte("val"))
92	if err := tbl.Apply(ctx, "row1", mut); err != nil {
93		t.Errorf("applying single mutation with retries: %v", err)
94	}
95	row, err := tbl.ReadRow(ctx, "row1")
96	if err != nil {
97		t.Errorf("reading single value with retries: %v", err)
98	}
99	if row == nil {
100		t.Errorf("applying single mutation with retries: could not read back row")
101	}
102
103	code = codes.FailedPrecondition // Won't be retried
104	errCount = 0
105	if err := tbl.Apply(ctx, "row", mut); err == nil {
106		t.Errorf("applying single mutation with no retries: no error")
107	}
108
109	// Check and mutate
110	mutTrue := NewMutation()
111	mutTrue.DeleteRow()
112	mutFalse := NewMutation()
113	mutFalse.Set("cf", "col", 1000, []byte("val"))
114	condMut := NewCondMutation(ValueFilter(".*"), mutTrue, mutFalse)
115
116	errCount = 0
117	code = codes.Unavailable // Will be retried
118	if err := tbl.Apply(ctx, "row1", condMut); err != nil {
119		t.Errorf("conditionally mutating row with retries: %v", err)
120	}
121	row, err = tbl.ReadRow(ctx, "row1") // row1 already in the table
122	if err != nil {
123		t.Errorf("reading single value after conditional mutation: %v", err)
124	}
125	if row != nil {
126		t.Errorf("reading single value after conditional mutation: row not deleted")
127	}
128
129	errCount = 0
130	code = codes.FailedPrecondition // Won't be retried
131	if err := tbl.Apply(ctx, "row", condMut); err == nil {
132		t.Errorf("conditionally mutating row with no retries: no error")
133	}
134}
135
136// Test overall request failure and retries.
137func TestRetryApplyBulk_OverallRequestFailure(t *testing.T) {
138	ctx := context.Background()
139
140	// Intercept requests and delegate to an interceptor defined by the test case
141	errCount := 0
142	errInjector := func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
143		if strings.HasSuffix(info.FullMethod, "MutateRows") {
144			return func() error {
145				if errCount < 3 {
146					errCount++
147					return status.Errorf(codes.Aborted, "")
148				}
149				return nil
150			}()
151		}
152		return handler(ctx, ss)
153	}
154
155	tbl, cleanup, err := setupFakeServer(grpc.StreamInterceptor(errInjector))
156	defer cleanup()
157	if err != nil {
158		t.Fatalf("fake server setup: %v", err)
159	}
160
161	errCount = 0
162
163	mut := NewMutation()
164	mut.Set("cf", "col", 1, []byte{})
165	errors, err := tbl.ApplyBulk(ctx, []string{"row2"}, []*Mutation{mut})
166	if errors != nil || err != nil {
167		t.Errorf("bulk with request failure: got: %v, %v, want: nil", errors, err)
168	}
169}
170
171func TestRetryApplyBulk_FailuresAndRetriesInOneRequest(t *testing.T) {
172	ctx := context.Background()
173
174	// Intercept requests and delegate to an interceptor defined by the test case
175	errCount := 0
176	errInjector := func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
177		if strings.HasSuffix(info.FullMethod, "MutateRows") {
178			return func(ss grpc.ServerStream) error {
179				var err error
180				req := new(btpb.MutateRowsRequest)
181				must(ss.RecvMsg(req))
182				switch errCount {
183				case 0:
184					// Retryable request failure
185					err = status.Errorf(codes.Unavailable, "")
186				case 1:
187					// Two mutations fail
188					must(writeMutateRowsResponse(ss, codes.Unavailable, codes.OK, codes.Aborted))
189					err = nil
190				case 2:
191					// Two failures were retried. One will succeed.
192					if want, got := 2, len(req.Entries); want != got {
193						t.Fatalf("2 bulk retries, got: %d, want %d", got, want)
194					}
195					must(writeMutateRowsResponse(ss, codes.OK, codes.Aborted))
196					err = nil
197				case 3:
198					// One failure was retried and will succeed.
199					if want, got := 1, len(req.Entries); want != got {
200						t.Fatalf("1 bulk retry, got: %d, want %d", got, want)
201					}
202					must(writeMutateRowsResponse(ss, codes.OK))
203					err = nil
204				}
205				errCount++
206				return err
207			}(ss)
208		}
209		return handler(ctx, ss)
210	}
211
212	tbl, cleanup, err := setupFakeServer(grpc.StreamInterceptor(errInjector))
213	defer cleanup()
214	if err != nil {
215		t.Fatalf("fake server setup: %v", err)
216	}
217
218	errCount = 0
219
220	errCount = 0
221	m1 := NewMutation()
222	m1.Set("cf", "col", 1, []byte{})
223	m2 := NewMutation()
224	m2.Set("cf", "col2", 1, []byte{})
225	m3 := NewMutation()
226	m3.Set("cf", "col3", 1, []byte{})
227	errors, err := tbl.ApplyBulk(ctx, []string{"row1", "row2", "row3"}, []*Mutation{m1, m2, m3})
228	if errors != nil || err != nil {
229		t.Errorf("bulk with retries: got: %v, %v, want: nil", errors, err)
230	}
231}
232
233func TestRetryApplyBulk_UnretryableErrors(t *testing.T) {
234	ctx := context.Background()
235
236	// Intercept requests and delegate to an interceptor defined by the test case
237	errCount := 0
238	errInjector := func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
239		if strings.HasSuffix(info.FullMethod, "MutateRows") {
240			return func(ss grpc.ServerStream) error {
241				var err error
242				req := new(btpb.MutateRowsRequest)
243				must(ss.RecvMsg(req))
244				switch errCount {
245				case 0:
246					// Give non-idempotent mutation a retryable error code.
247					// Nothing should be retried.
248					must(writeMutateRowsResponse(ss, codes.FailedPrecondition, codes.Aborted))
249					err = nil
250				case 1:
251					t.Fatalf("unretryable errors: got one retry, want no retries")
252				}
253				errCount++
254				return err
255			}(ss)
256		}
257		return handler(ctx, ss)
258	}
259
260	tbl, cleanup, err := setupFakeServer(grpc.StreamInterceptor(errInjector))
261	defer cleanup()
262	if err != nil {
263		t.Fatalf("fake server setup: %v", err)
264	}
265
266	errCount = 0
267
268	m1 := NewMutation()
269	m1.Set("cf", "col", 1, []byte{})
270	m2 := NewMutation()
271	m2.Set("cf", "col2", 1, []byte{})
272	m3 := NewMutation()
273	m3.Set("cf", "col3", 1, []byte{})
274
275	niMut := NewMutation()
276	niMut.Set("cf", "col", ServerTime, []byte{}) // Non-idempotent
277	errCount = 0
278	errors, err := tbl.ApplyBulk(ctx, []string{"row1", "row2"}, []*Mutation{m1, niMut})
279	if err != nil {
280		t.Fatalf("unretryable errors: request failed %v", err)
281	}
282	want := []error{
283		status.Errorf(codes.FailedPrecondition, ""),
284		status.Errorf(codes.Aborted, ""),
285	}
286	if !testutil.Equal(want, errors,
287		cmp.Comparer(func(x, y error) bool {
288			return x == y || (x != nil && y != nil && x.Error() == y.Error())
289		}),
290	) {
291		t.Errorf("unretryable errors: got: %v, want: %v", errors, want)
292	}
293}
294
295func TestRetryApplyBulk_IndividualErrorsAndDeadlineExceeded(t *testing.T) {
296	ctx := context.Background()
297
298	// Intercept requests and delegate to an interceptor defined by the test case
299	errInjector := func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
300		if strings.HasSuffix(info.FullMethod, "MutateRows") {
301			return func(ss grpc.ServerStream) error {
302				return writeMutateRowsResponse(ss, codes.FailedPrecondition, codes.OK, codes.Aborted)
303			}(ss)
304		}
305		return handler(ctx, ss)
306	}
307
308	tbl, cleanup, err := setupFakeServer(grpc.StreamInterceptor(errInjector))
309	defer cleanup()
310	if err != nil {
311		t.Fatalf("fake server setup: %v", err)
312	}
313
314	m1 := NewMutation()
315	m1.Set("cf", "col", 1, []byte{})
316	m2 := NewMutation()
317	m2.Set("cf", "col2", 1, []byte{})
318	m3 := NewMutation()
319	m3.Set("cf", "col3", 1, []byte{})
320
321	// This should cause a deadline exceeded error.
322	ctx, cancel := context.WithTimeout(ctx, -10*time.Millisecond)
323	defer cancel()
324	errors, err := tbl.ApplyBulk(ctx, []string{"row1", "row2", "row3"}, []*Mutation{m1, m2, m3})
325	wantErr := context.DeadlineExceeded
326	if wantErr != err {
327		t.Fatalf("deadline exceeded error: got: %v, want: %v", err, wantErr)
328	}
329	if errors != nil {
330		t.Errorf("deadline exceeded errors: got: %v, want: nil", err)
331	}
332}
333
334func writeMutateRowsResponse(ss grpc.ServerStream, codes ...codes.Code) error {
335	res := &btpb.MutateRowsResponse{Entries: make([]*btpb.MutateRowsResponse_Entry, len(codes))}
336	for i, code := range codes {
337		res.Entries[i] = &btpb.MutateRowsResponse_Entry{
338			Index:  int64(i),
339			Status: &rpcpb.Status{Code: int32(code), Message: ""},
340		}
341	}
342	return ss.SendMsg(res)
343}
344
345func TestRetainRowsAfter(t *testing.T) {
346	prevRowRange := NewRange("a", "z")
347	prevRowKey := "m"
348	want := NewRange("m\x00", "z")
349	got := prevRowRange.retainRowsAfter(prevRowKey)
350	if !testutil.Equal(want, got, cmp.AllowUnexported(RowRange{})) {
351		t.Errorf("range retry: got %v, want %v", got, want)
352	}
353
354	prevRowRangeList := RowRangeList{NewRange("a", "d"), NewRange("e", "g"), NewRange("h", "l")}
355	prevRowKey = "f"
356	wantRowRangeList := RowRangeList{NewRange("f\x00", "g"), NewRange("h", "l")}
357	got = prevRowRangeList.retainRowsAfter(prevRowKey)
358	if !testutil.Equal(wantRowRangeList, got, cmp.AllowUnexported(RowRange{})) {
359		t.Errorf("range list retry: got %v, want %v", got, wantRowRangeList)
360	}
361
362	prevRowList := RowList{"a", "b", "c", "d", "e", "f"}
363	prevRowKey = "b"
364	wantList := RowList{"c", "d", "e", "f"}
365	got = prevRowList.retainRowsAfter(prevRowKey)
366	if !testutil.Equal(wantList, got) {
367		t.Errorf("list retry: got %v, want %v", got, wantList)
368	}
369}
370
371func TestRetryReadRows(t *testing.T) {
372	ctx := context.Background()
373
374	// Intercept requests and delegate to an interceptor defined by the test case
375	errCount := 0
376	var f func(grpc.ServerStream) error
377	errInjector := func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
378		if strings.HasSuffix(info.FullMethod, "ReadRows") {
379			return f(ss)
380		}
381		return handler(ctx, ss)
382	}
383
384	tbl, cleanup, err := setupFakeServer(grpc.StreamInterceptor(errInjector))
385	defer cleanup()
386	if err != nil {
387		t.Fatalf("fake server setup: %v", err)
388	}
389
390	errCount = 0
391	// Test overall request failure and retries
392	f = func(ss grpc.ServerStream) error {
393		var err error
394		req := new(btpb.ReadRowsRequest)
395		must(ss.RecvMsg(req))
396		switch errCount {
397		case 0:
398			// Retryable request failure
399			err = status.Errorf(codes.Unavailable, "")
400		case 1:
401			// Write two rows then error
402			if want, got := "a", string(req.Rows.RowRanges[0].GetStartKeyClosed()); want != got {
403				t.Errorf("first retry, no data received yet: got %q, want %q", got, want)
404			}
405			must(writeReadRowsResponse(ss, "a", "b"))
406			err = status.Errorf(codes.Unavailable, "")
407		case 2:
408			// Retryable request failure
409			if want, got := "b\x00", string(req.Rows.RowRanges[0].GetStartKeyClosed()); want != got {
410				t.Errorf("2 range retries: got %q, want %q", got, want)
411			}
412			err = status.Errorf(codes.Unavailable, "")
413		case 3:
414			// Write two more rows
415			must(writeReadRowsResponse(ss, "c", "d"))
416			err = nil
417		}
418		errCount++
419		return err
420	}
421
422	var got []string
423	must(tbl.ReadRows(ctx, NewRange("a", "z"), func(r Row) bool {
424		got = append(got, r.Key())
425		return true
426	}))
427	want := []string{"a", "b", "c", "d"}
428	if !testutil.Equal(got, want) {
429		t.Errorf("retry range integration: got %v, want %v", got, want)
430	}
431}
432
433func writeReadRowsResponse(ss grpc.ServerStream, rowKeys ...string) error {
434	var chunks []*btpb.ReadRowsResponse_CellChunk
435	for _, key := range rowKeys {
436		chunks = append(chunks, &btpb.ReadRowsResponse_CellChunk{
437			RowKey:     []byte(key),
438			FamilyName: &wrappers.StringValue{Value: "fm"},
439			Qualifier:  &wrappers.BytesValue{Value: []byte("col")},
440			RowStatus:  &btpb.ReadRowsResponse_CellChunk_CommitRow{CommitRow: true},
441		})
442	}
443	return ss.SendMsg(&btpb.ReadRowsResponse{Chunks: chunks})
444}
445
// must panics with err when err is non-nil and does nothing otherwise. It is a
// test helper for calls whose failure should abort immediately.
func must(err error) {
	if err == nil {
		return
	}
	panic(err)
}
451