1/* 2Copyright 2016 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16package bigtable 17 18import ( 19 "context" 20 "strings" 21 "testing" 22 "time" 23 24 "cloud.google.com/go/bigtable/bttest" 25 "cloud.google.com/go/internal/testutil" 26 "github.com/golang/protobuf/ptypes/wrappers" 27 "github.com/google/go-cmp/cmp" 28 "google.golang.org/api/option" 29 btpb "google.golang.org/genproto/googleapis/bigtable/v2" 30 rpcpb "google.golang.org/genproto/googleapis/rpc/status" 31 "google.golang.org/grpc" 32 "google.golang.org/grpc/codes" 33 "google.golang.org/grpc/status" 34) 35 36func setupFakeServer(opt ...grpc.ServerOption) (tbl *Table, cleanup func(), err error) { 37 srv, err := bttest.NewServer("localhost:0", opt...) 38 if err != nil { 39 return nil, nil, err 40 } 41 conn, err := grpc.Dial(srv.Addr, grpc.WithInsecure(), grpc.WithBlock()) 42 if err != nil { 43 return nil, nil, err 44 } 45 46 client, err := NewClient(context.Background(), "client", "instance", option.WithGRPCConn(conn), option.WithGRPCDialOption(grpc.WithBlock())) 47 if err != nil { 48 return nil, nil, err 49 } 50 51 adminClient, err := NewAdminClient(context.Background(), "client", "instance", option.WithGRPCConn(conn), option.WithGRPCDialOption(grpc.WithBlock())) 52 if err != nil { 53 return nil, nil, err 54 } 55 if err := adminClient.CreateTable(context.Background(), "table"); err != nil { 56 return nil, nil, err 57 } 58 if err := adminClient.CreateColumnFamily(context.Background(), "table", "cf"); err != nil { 59 return nil, nil, err 60 } 61 t := client.Open("table") 62 63 cleanupFunc := func() { 64 adminClient.Close() 65 client.Close() 66 srv.Close() 67 } 68 return t, cleanupFunc, nil 69} 70 71func TestRetryApply(t *testing.T) { 72 ctx := context.Background() 73 74 errCount := 0 75 code := codes.Unavailable // Will be retried 76 // Intercept requests and return an error or defer to the underlying handler 77 errInjector := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { 78 if strings.HasSuffix(info.FullMethod, "MutateRow") && errCount < 3 { 79 errCount++ 80 return nil, status.Errorf(code, "") 81 } 82 return handler(ctx, req) 83 } 84 tbl, cleanup, err := setupFakeServer(grpc.UnaryInterceptor(errInjector)) 85 if err != nil { 86 t.Fatalf("fake server setup: %v", err) 87 } 88 defer cleanup() 89 90 mut := NewMutation() 91 mut.Set("cf", "col", 1000, []byte("val")) 92 if err := tbl.Apply(ctx, "row1", mut); err != nil { 93 t.Errorf("applying single mutation with retries: %v", err) 94 } 95 row, err := tbl.ReadRow(ctx, "row1") 96 if err != nil { 97 t.Errorf("reading single value with retries: %v", err) 98 } 99 if row == nil { 100 t.Errorf("applying single mutation with retries: could not read back row") 101 } 102 103 code = codes.FailedPrecondition // Won't be retried 104 errCount = 0 105 if err := tbl.Apply(ctx, "row", mut); err == nil { 106 t.Errorf("applying single mutation with no retries: no error") 107 } 108 109 // Check and mutate 110 mutTrue := NewMutation() 111 mutTrue.DeleteRow() 112 mutFalse := NewMutation() 113 mutFalse.Set("cf", "col", 1000, []byte("val")) 114 condMut := NewCondMutation(ValueFilter(".*"), mutTrue, mutFalse) 115 116 errCount = 0 117 code = codes.Unavailable // Will be retried 118 if err := tbl.Apply(ctx, "row1", condMut); err != nil { 119 t.Errorf("conditionally mutating row with retries: %v", err) 120 } 121 row, err = tbl.ReadRow(ctx, "row1") // row1 already in the table 122 if err != nil { 123 t.Errorf("reading single value after conditional mutation: %v", err) 124 } 125 if row != nil { 126 t.Errorf("reading single value after conditional mutation: row not deleted") 127 } 128 129 errCount = 0 130 code = codes.FailedPrecondition // Won't be retried 131 if err := tbl.Apply(ctx, "row", condMut); err == nil { 132 t.Errorf("conditionally mutating row with no retries: no error") 133 } 134} 135 136// Test overall request failure and retries. 137func TestRetryApplyBulk_OverallRequestFailure(t *testing.T) { 138 ctx := context.Background() 139 140 // Intercept requests and delegate to an interceptor defined by the test case 141 errCount := 0 142 errInjector := func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { 143 if strings.HasSuffix(info.FullMethod, "MutateRows") { 144 return func() error { 145 if errCount < 3 { 146 errCount++ 147 return status.Errorf(codes.Aborted, "") 148 } 149 return nil 150 }() 151 } 152 return handler(ctx, ss) 153 } 154 155 tbl, cleanup, err := setupFakeServer(grpc.StreamInterceptor(errInjector)) 156 defer cleanup() 157 if err != nil { 158 t.Fatalf("fake server setup: %v", err) 159 } 160 161 errCount = 0 162 163 mut := NewMutation() 164 mut.Set("cf", "col", 1, []byte{}) 165 errors, err := tbl.ApplyBulk(ctx, []string{"row2"}, []*Mutation{mut}) 166 if errors != nil || err != nil { 167 t.Errorf("bulk with request failure: got: %v, %v, want: nil", errors, err) 168 } 169} 170 171func TestRetryApplyBulk_FailuresAndRetriesInOneRequest(t *testing.T) { 172 ctx := context.Background() 173 174 // Intercept requests and delegate to an interceptor defined by the test case 175 errCount := 0 176 errInjector := func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { 177 if strings.HasSuffix(info.FullMethod, "MutateRows") { 178 return func(ss grpc.ServerStream) error { 179 var err error 180 req := new(btpb.MutateRowsRequest) 181 must(ss.RecvMsg(req)) 182 switch errCount { 183 case 0: 184 // Retryable request failure 185 err = status.Errorf(codes.Unavailable, "") 186 case 1: 187 // Two mutations fail 188 must(writeMutateRowsResponse(ss, codes.Unavailable, codes.OK, codes.Aborted)) 189 err = nil 190 case 2: 191 // Two failures were retried. One will succeed. 192 if want, got := 2, len(req.Entries); want != got { 193 t.Fatalf("2 bulk retries, got: %d, want %d", got, want) 194 } 195 must(writeMutateRowsResponse(ss, codes.OK, codes.Aborted)) 196 err = nil 197 case 3: 198 // One failure was retried and will succeed. 199 if want, got := 1, len(req.Entries); want != got { 200 t.Fatalf("1 bulk retry, got: %d, want %d", got, want) 201 } 202 must(writeMutateRowsResponse(ss, codes.OK)) 203 err = nil 204 } 205 errCount++ 206 return err 207 }(ss) 208 } 209 return handler(ctx, ss) 210 } 211 212 tbl, cleanup, err := setupFakeServer(grpc.StreamInterceptor(errInjector)) 213 defer cleanup() 214 if err != nil { 215 t.Fatalf("fake server setup: %v", err) 216 } 217 218 errCount = 0 219 220 errCount = 0 221 m1 := NewMutation() 222 m1.Set("cf", "col", 1, []byte{}) 223 m2 := NewMutation() 224 m2.Set("cf", "col2", 1, []byte{}) 225 m3 := NewMutation() 226 m3.Set("cf", "col3", 1, []byte{}) 227 errors, err := tbl.ApplyBulk(ctx, []string{"row1", "row2", "row3"}, []*Mutation{m1, m2, m3}) 228 if errors != nil || err != nil { 229 t.Errorf("bulk with retries: got: %v, %v, want: nil", errors, err) 230 } 231} 232 233func TestRetryApplyBulk_UnretryableErrors(t *testing.T) { 234 ctx := context.Background() 235 236 // Intercept requests and delegate to an interceptor defined by the test case 237 errCount := 0 238 errInjector := func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { 239 if strings.HasSuffix(info.FullMethod, "MutateRows") { 240 return func(ss grpc.ServerStream) error { 241 var err error 242 req := new(btpb.MutateRowsRequest) 243 must(ss.RecvMsg(req)) 244 switch errCount { 245 case 0: 246 // Give non-idempotent mutation a retryable error code. 247 // Nothing should be retried. 248 must(writeMutateRowsResponse(ss, codes.FailedPrecondition, codes.Aborted)) 249 err = nil 250 case 1: 251 t.Fatalf("unretryable errors: got one retry, want no retries") 252 } 253 errCount++ 254 return err 255 }(ss) 256 } 257 return handler(ctx, ss) 258 } 259 260 tbl, cleanup, err := setupFakeServer(grpc.StreamInterceptor(errInjector)) 261 defer cleanup() 262 if err != nil { 263 t.Fatalf("fake server setup: %v", err) 264 } 265 266 errCount = 0 267 268 m1 := NewMutation() 269 m1.Set("cf", "col", 1, []byte{}) 270 m2 := NewMutation() 271 m2.Set("cf", "col2", 1, []byte{}) 272 m3 := NewMutation() 273 m3.Set("cf", "col3", 1, []byte{}) 274 275 niMut := NewMutation() 276 niMut.Set("cf", "col", ServerTime, []byte{}) // Non-idempotent 277 errCount = 0 278 errors, err := tbl.ApplyBulk(ctx, []string{"row1", "row2"}, []*Mutation{m1, niMut}) 279 if err != nil { 280 t.Fatalf("unretryable errors: request failed %v", err) 281 } 282 want := []error{ 283 status.Errorf(codes.FailedPrecondition, ""), 284 status.Errorf(codes.Aborted, ""), 285 } 286 if !testutil.Equal(want, errors, 287 cmp.Comparer(func(x, y error) bool { 288 return x == y || (x != nil && y != nil && x.Error() == y.Error()) 289 }), 290 ) { 291 t.Errorf("unretryable errors: got: %v, want: %v", errors, want) 292 } 293} 294 295func TestRetryApplyBulk_IndividualErrorsAndDeadlineExceeded(t *testing.T) { 296 ctx := context.Background() 297 298 // Intercept requests and delegate to an interceptor defined by the test case 299 errInjector := func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { 300 if strings.HasSuffix(info.FullMethod, "MutateRows") { 301 return func(ss grpc.ServerStream) error { 302 return writeMutateRowsResponse(ss, codes.FailedPrecondition, codes.OK, codes.Aborted) 303 }(ss) 304 } 305 return handler(ctx, ss) 306 } 307 308 tbl, cleanup, err := setupFakeServer(grpc.StreamInterceptor(errInjector)) 309 defer cleanup() 310 if err != nil { 311 t.Fatalf("fake server setup: %v", err) 312 } 313 314 m1 := NewMutation() 315 m1.Set("cf", "col", 1, []byte{}) 316 m2 := NewMutation() 317 m2.Set("cf", "col2", 1, []byte{}) 318 m3 := NewMutation() 319 m3.Set("cf", "col3", 1, []byte{}) 320 321 // This should cause a deadline exceeded error. 322 ctx, cancel := context.WithTimeout(ctx, -10*time.Millisecond) 323 defer cancel() 324 errors, err := tbl.ApplyBulk(ctx, []string{"row1", "row2", "row3"}, []*Mutation{m1, m2, m3}) 325 wantErr := context.DeadlineExceeded 326 if wantErr != err { 327 t.Fatalf("deadline exceeded error: got: %v, want: %v", err, wantErr) 328 } 329 if errors != nil { 330 t.Errorf("deadline exceeded errors: got: %v, want: nil", err) 331 } 332} 333 334func writeMutateRowsResponse(ss grpc.ServerStream, codes ...codes.Code) error { 335 res := &btpb.MutateRowsResponse{Entries: make([]*btpb.MutateRowsResponse_Entry, len(codes))} 336 for i, code := range codes { 337 res.Entries[i] = &btpb.MutateRowsResponse_Entry{ 338 Index: int64(i), 339 Status: &rpcpb.Status{Code: int32(code), Message: ""}, 340 } 341 } 342 return ss.SendMsg(res) 343} 344 345func TestRetainRowsAfter(t *testing.T) { 346 prevRowRange := NewRange("a", "z") 347 prevRowKey := "m" 348 want := NewRange("m\x00", "z") 349 got := prevRowRange.retainRowsAfter(prevRowKey) 350 if !testutil.Equal(want, got, cmp.AllowUnexported(RowRange{})) { 351 t.Errorf("range retry: got %v, want %v", got, want) 352 } 353 354 prevRowRangeList := RowRangeList{NewRange("a", "d"), NewRange("e", "g"), NewRange("h", "l")} 355 prevRowKey = "f" 356 wantRowRangeList := RowRangeList{NewRange("f\x00", "g"), NewRange("h", "l")} 357 got = prevRowRangeList.retainRowsAfter(prevRowKey) 358 if !testutil.Equal(wantRowRangeList, got, cmp.AllowUnexported(RowRange{})) { 359 t.Errorf("range list retry: got %v, want %v", got, wantRowRangeList) 360 } 361 362 prevRowList := RowList{"a", "b", "c", "d", "e", "f"} 363 prevRowKey = "b" 364 wantList := RowList{"c", "d", "e", "f"} 365 got = prevRowList.retainRowsAfter(prevRowKey) 366 if !testutil.Equal(wantList, got) { 367 t.Errorf("list retry: got %v, want %v", got, wantList) 368 } 369} 370 371func TestRetryReadRows(t *testing.T) { 372 ctx := context.Background() 373 374 // Intercept requests and delegate to an interceptor defined by the test case 375 errCount := 0 376 var f func(grpc.ServerStream) error 377 errInjector := func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { 378 if strings.HasSuffix(info.FullMethod, "ReadRows") { 379 return f(ss) 380 } 381 return handler(ctx, ss) 382 } 383 384 tbl, cleanup, err := setupFakeServer(grpc.StreamInterceptor(errInjector)) 385 defer cleanup() 386 if err != nil { 387 t.Fatalf("fake server setup: %v", err) 388 } 389 390 errCount = 0 391 // Test overall request failure and retries 392 f = func(ss grpc.ServerStream) error { 393 var err error 394 req := new(btpb.ReadRowsRequest) 395 must(ss.RecvMsg(req)) 396 switch errCount { 397 case 0: 398 // Retryable request failure 399 err = status.Errorf(codes.Unavailable, "") 400 case 1: 401 // Write two rows then error 402 if want, got := "a", string(req.Rows.RowRanges[0].GetStartKeyClosed()); want != got { 403 t.Errorf("first retry, no data received yet: got %q, want %q", got, want) 404 } 405 must(writeReadRowsResponse(ss, "a", "b")) 406 err = status.Errorf(codes.Unavailable, "") 407 case 2: 408 // Retryable request failure 409 if want, got := "b\x00", string(req.Rows.RowRanges[0].GetStartKeyClosed()); want != got { 410 t.Errorf("2 range retries: got %q, want %q", got, want) 411 } 412 err = status.Errorf(codes.Unavailable, "") 413 case 3: 414 // Write two more rows 415 must(writeReadRowsResponse(ss, "c", "d")) 416 err = nil 417 } 418 errCount++ 419 return err 420 } 421 422 var got []string 423 must(tbl.ReadRows(ctx, NewRange("a", "z"), func(r Row) bool { 424 got = append(got, r.Key()) 425 return true 426 })) 427 want := []string{"a", "b", "c", "d"} 428 if !testutil.Equal(got, want) { 429 t.Errorf("retry range integration: got %v, want %v", got, want) 430 } 431} 432 433func writeReadRowsResponse(ss grpc.ServerStream, rowKeys ...string) error { 434 var chunks []*btpb.ReadRowsResponse_CellChunk 435 for _, key := range rowKeys { 436 chunks = append(chunks, &btpb.ReadRowsResponse_CellChunk{ 437 RowKey: []byte(key), 438 FamilyName: &wrappers.StringValue{Value: "fm"}, 439 Qualifier: &wrappers.BytesValue{Value: []byte("col")}, 440 RowStatus: &btpb.ReadRowsResponse_CellChunk_CommitRow{CommitRow: true}, 441 }) 442 } 443 return ss.SendMsg(&btpb.ReadRowsResponse{Chunks: chunks}) 444} 445 446func must(err error) { 447 if err != nil { 448 panic(err) 449 } 450} 451