1/* 2Copyright 2017 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package spanner 18 19import ( 20 "context" 21 "errors" 22 "fmt" 23 "reflect" 24 "strings" 25 "sync" 26 "testing" 27 "time" 28 29 . "cloud.google.com/go/spanner/internal/testutil" 30 "github.com/golang/protobuf/ptypes" 31 "github.com/google/go-cmp/cmp" 32 "google.golang.org/genproto/googleapis/rpc/errdetails" 33 sppb "google.golang.org/genproto/googleapis/spanner/v1" 34 "google.golang.org/grpc/codes" 35 gstatus "google.golang.org/grpc/status" 36) 37 38// Single can only be used once. 39func TestSingle(t *testing.T) { 40 t.Parallel() 41 ctx := context.Background() 42 server, client, teardown := setupMockedTestServer(t) 43 defer teardown() 44 45 txn := client.Single() 46 defer txn.Close() 47 _, _, e := txn.acquire(ctx) 48 if e != nil { 49 t.Fatalf("Acquire for single use, got %v, want nil.", e) 50 } 51 _, _, e = txn.acquire(ctx) 52 if wantErr := errTxClosed(); !testEqual(e, wantErr) { 53 t.Fatalf("Second acquire for single use, got %v, want %v.", e, wantErr) 54 } 55 56 // Only one CreateSessionRequest is sent. 57 if _, err := shouldHaveReceived(server.TestSpanner, []interface{}{&sppb.CreateSessionRequest{}}); err != nil { 58 t.Fatal(err) 59 } 60} 61 62// Re-using ReadOnlyTransaction: can recover from acquire failure. 63func TestReadOnlyTransaction_RecoverFromFailure(t *testing.T) { 64 t.Parallel() 65 ctx := context.Background() 66 server, client, teardown := setupMockedTestServer(t) 67 defer teardown() 68 69 txn := client.ReadOnlyTransaction() 70 defer txn.Close() 71 72 // First request will fail. 73 errUsr := gstatus.Error(codes.Unknown, "error") 74 server.TestSpanner.PutExecutionTime(MethodBeginTransaction, 75 SimulatedExecutionTime{ 76 Errors: []error{errUsr}, 77 }) 78 79 _, _, e := txn.acquire(ctx) 80 if wantErr := toSpannerError(errUsr); !testEqual(e, wantErr) { 81 t.Fatalf("Acquire for multi use, got %v, want %v.", e, wantErr) 82 } 83 _, _, e = txn.acquire(ctx) 84 if e != nil { 85 t.Fatalf("Acquire for multi use, got %v, want nil.", e) 86 } 87} 88 89// ReadOnlyTransaction: can not be used after close. 90func TestReadOnlyTransaction_UseAfterClose(t *testing.T) { 91 t.Parallel() 92 ctx := context.Background() 93 _, client, teardown := setupMockedTestServer(t) 94 defer teardown() 95 96 txn := client.ReadOnlyTransaction() 97 txn.Close() 98 99 _, _, e := txn.acquire(ctx) 100 if wantErr := errTxClosed(); !testEqual(e, wantErr) { 101 t.Fatalf("Second acquire for multi use, got %v, want %v.", e, wantErr) 102 } 103} 104 105// ReadOnlyTransaction: can be acquired concurrently. 106func TestReadOnlyTransaction_Concurrent(t *testing.T) { 107 t.Parallel() 108 ctx := context.Background() 109 server, client, teardown := setupMockedTestServer(t) 110 defer teardown() 111 txn := client.ReadOnlyTransaction() 112 defer txn.Close() 113 114 server.TestSpanner.Freeze() 115 var ( 116 sh1 *sessionHandle 117 sh2 *sessionHandle 118 ts1 *sppb.TransactionSelector 119 ts2 *sppb.TransactionSelector 120 wg = sync.WaitGroup{} 121 ) 122 acquire := func(sh **sessionHandle, ts **sppb.TransactionSelector) { 123 defer wg.Done() 124 var e error 125 *sh, *ts, e = txn.acquire(ctx) 126 if e != nil { 127 t.Errorf("Concurrent acquire for multiuse, got %v, expect nil.", e) 128 } 129 } 130 wg.Add(2) 131 go acquire(&sh1, &ts1) 132 go acquire(&sh2, &ts2) 133 134 // TODO(deklerk): Get rid of this. 135 <-time.After(100 * time.Millisecond) 136 137 server.TestSpanner.Unfreeze() 138 wg.Wait() 139 if sh1.session.id != sh2.session.id { 140 t.Fatalf("Expected acquire to get same session handle, got %v and %v.", sh1, sh2) 141 } 142 if !testEqual(ts1, ts2) { 143 t.Fatalf("Expected acquire to get same transaction selector, got %v and %v.", ts1, ts2) 144 } 145} 146 147func TestApply_Single(t *testing.T) { 148 t.Parallel() 149 ctx := context.Background() 150 server, client, teardown := setupMockedTestServer(t) 151 defer teardown() 152 153 ms := []*Mutation{ 154 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}), 155 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}), 156 } 157 if _, e := client.Apply(ctx, ms, ApplyAtLeastOnce()); e != nil { 158 t.Fatalf("applyAtLeastOnce retry on abort, got %v, want nil.", e) 159 } 160 161 if _, err := shouldHaveReceived(server.TestSpanner, []interface{}{ 162 &sppb.CreateSessionRequest{}, 163 &sppb.CommitRequest{}, 164 }); err != nil { 165 t.Fatal(err) 166 } 167} 168 169// Transaction retries on abort. 170func TestApply_RetryOnAbort(t *testing.T) { 171 ctx := context.Background() 172 t.Parallel() 173 server, client, teardown := setupMockedTestServer(t) 174 defer teardown() 175 176 // First commit will fail, and the retry will begin a new transaction. 177 server.TestSpanner.PutExecutionTime(MethodCommitTransaction, 178 SimulatedExecutionTime{ 179 Errors: []error{newAbortedErrorWithMinimalRetryDelay()}, 180 }) 181 182 ms := []*Mutation{ 183 Insert("Accounts", []string{"AccountId"}, []interface{}{int64(1)}), 184 } 185 186 if _, e := client.Apply(ctx, ms); e != nil { 187 t.Fatalf("ReadWriteTransaction retry on abort, got %v, want nil.", e) 188 } 189 190 if _, err := shouldHaveReceived(server.TestSpanner, []interface{}{ 191 &sppb.CreateSessionRequest{}, 192 &sppb.BeginTransactionRequest{}, 193 &sppb.CommitRequest{}, // First commit fails. 194 &sppb.BeginTransactionRequest{}, 195 &sppb.CommitRequest{}, // Second commit succeeds. 196 }); err != nil { 197 t.Fatal(err) 198 } 199} 200 201// Tests that SessionNotFound errors are retried. 202func TestTransaction_SessionNotFound(t *testing.T) { 203 t.Parallel() 204 ctx := context.Background() 205 server, client, teardown := setupMockedTestServer(t) 206 defer teardown() 207 208 serverErr := newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s") 209 server.TestSpanner.PutExecutionTime(MethodBeginTransaction, 210 SimulatedExecutionTime{ 211 Errors: []error{serverErr, serverErr, serverErr}, 212 }) 213 server.TestSpanner.PutExecutionTime(MethodCommitTransaction, 214 SimulatedExecutionTime{ 215 Errors: []error{serverErr}, 216 }) 217 218 txn := client.ReadOnlyTransaction() 219 defer txn.Close() 220 221 var wantErr error 222 if _, _, got := txn.acquire(ctx); !testEqual(wantErr, got) { 223 t.Fatalf("Expect acquire to succeed, got %v, want %v.", got, wantErr) 224 } 225 226 // The server error should lead to a retry of the BeginTransaction call and 227 // a valid session handle to be returned that will be used by the following 228 // requests. Note that calling txn.Query(...) does not actually send the 229 // query to the (mock) server. That is done at the first call to 230 // RowIterator.Next. The following statement only verifies that the 231 // transaction is in a valid state and received a valid session handle. 232 if got := txn.Query(ctx, NewStatement("SELECT 1")); !testEqual(wantErr, got.err) { 233 t.Fatalf("Expect Query to succeed, got %v, want %v.", got.err, wantErr) 234 } 235 236 if got := txn.Read(ctx, "Users", KeySets(Key{"alice"}, Key{"bob"}), []string{"name", "email"}); !testEqual(wantErr, got.err) { 237 t.Fatalf("Expect Read to succeed, got %v, want %v.", got.err, wantErr) 238 } 239 240 wantErr = toSpannerError(newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")) 241 ms := []*Mutation{ 242 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}), 243 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}), 244 } 245 _, got := client.Apply(ctx, ms, ApplyAtLeastOnce()) 246 if !cmp.Equal(wantErr, got, 247 cmp.AllowUnexported(Error{}), cmp.FilterPath(func(path cmp.Path) bool { 248 // Ignore statusError Details and Error.trailers. 249 if strings.Contains(path.GoString(), "{*spanner.Error}.err.(*status.statusError).Details") { 250 return true 251 } 252 if strings.Contains(path.GoString(), "{*spanner.Error}.trailers") { 253 return true 254 } 255 return false 256 }, cmp.Ignore())) { 257 t.Fatalf("Expect Apply to fail\nGot: %v\nWant: %v\n", got, wantErr) 258 } 259} 260 261// When an error is returned from the closure sent into ReadWriteTransaction, it 262// kicks off a rollback. 263func TestReadWriteTransaction_ErrorReturned(t *testing.T) { 264 t.Parallel() 265 ctx := context.Background() 266 server, client, teardown := setupMockedTestServer(t) 267 defer teardown() 268 269 want := errors.New("an error") 270 _, got := client.ReadWriteTransaction(ctx, func(context.Context, *ReadWriteTransaction) error { 271 return want 272 }) 273 if got != want { 274 t.Fatalf("got %+v, want %+v", got, want) 275 } 276 requests := drainRequestsFromServer(server.TestSpanner) 277 if err := compareRequests([]interface{}{ 278 &sppb.CreateSessionRequest{}, 279 &sppb.BeginTransactionRequest{}, 280 &sppb.RollbackRequest{}}, requests); err != nil { 281 // TODO: remove this once the session pool maintainer has been changed 282 // so that is doesn't delete sessions already during the first 283 // maintenance window. 284 // If we failed to get 3, it might have because - due to timing - we got 285 // a fourth request. If this request is DeleteSession, that's OK and 286 // expected. 287 if err := compareRequests([]interface{}{ 288 &sppb.CreateSessionRequest{}, 289 &sppb.BeginTransactionRequest{}, 290 &sppb.RollbackRequest{}, 291 &sppb.DeleteSessionRequest{}}, requests); err != nil { 292 t.Fatal(err) 293 } 294 } 295} 296 297func TestBatchDML_WithMultipleDML(t *testing.T) { 298 t.Parallel() 299 ctx := context.Background() 300 server, client, teardown := setupMockedTestServer(t) 301 defer teardown() 302 303 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) { 304 if _, err = tx.Update(ctx, Statement{SQL: UpdateBarSetFoo}); err != nil { 305 return err 306 } 307 if _, err = tx.BatchUpdate(ctx, []Statement{{SQL: UpdateBarSetFoo}, {SQL: UpdateBarSetFoo}}); err != nil { 308 return err 309 } 310 if _, err = tx.Update(ctx, Statement{SQL: UpdateBarSetFoo}); err != nil { 311 return err 312 } 313 _, err = tx.BatchUpdate(ctx, []Statement{{SQL: UpdateBarSetFoo}}) 314 return err 315 }) 316 if err != nil { 317 t.Fatal(err) 318 } 319 320 gotReqs, err := shouldHaveReceived(server.TestSpanner, []interface{}{ 321 &sppb.CreateSessionRequest{}, 322 &sppb.BeginTransactionRequest{}, 323 &sppb.ExecuteSqlRequest{}, 324 &sppb.ExecuteBatchDmlRequest{}, 325 &sppb.ExecuteSqlRequest{}, 326 &sppb.ExecuteBatchDmlRequest{}, 327 &sppb.CommitRequest{}, 328 }) 329 if err != nil { 330 t.Fatal(err) 331 } 332 333 if got, want := gotReqs[2].(*sppb.ExecuteSqlRequest).Seqno, int64(1); got != want { 334 t.Errorf("got %d, want %d", got, want) 335 } 336 if got, want := gotReqs[3].(*sppb.ExecuteBatchDmlRequest).Seqno, int64(2); got != want { 337 t.Errorf("got %d, want %d", got, want) 338 } 339 if got, want := gotReqs[4].(*sppb.ExecuteSqlRequest).Seqno, int64(3); got != want { 340 t.Errorf("got %d, want %d", got, want) 341 } 342 if got, want := gotReqs[5].(*sppb.ExecuteBatchDmlRequest).Seqno, int64(4); got != want { 343 t.Errorf("got %d, want %d", got, want) 344 } 345} 346 347// shouldHaveReceived asserts that exactly expectedRequests were present in 348// the server's ReceivedRequests channel. It only looks at type, not contents. 349// 350// Note: this in-place modifies serverClientMock by popping items off the 351// ReceivedRequests channel. 352func shouldHaveReceived(server InMemSpannerServer, want []interface{}) ([]interface{}, error) { 353 got := drainRequestsFromServer(server) 354 return got, compareRequests(want, got) 355} 356 357// Compares expected requests (want) with actual requests (got). 358func compareRequests(want []interface{}, got []interface{}) error { 359 if len(got) != len(want) { 360 var gotMsg string 361 for _, r := range got { 362 gotMsg += fmt.Sprintf("%v: %+v]\n", reflect.TypeOf(r), r) 363 } 364 365 var wantMsg string 366 for _, r := range want { 367 wantMsg += fmt.Sprintf("%v: %+v]\n", reflect.TypeOf(r), r) 368 } 369 370 return fmt.Errorf("got %d requests, want %d requests:\ngot:\n%s\nwant:\n%s", len(got), len(want), gotMsg, wantMsg) 371 } 372 373 for i, want := range want { 374 if reflect.TypeOf(got[i]) != reflect.TypeOf(want) { 375 return fmt.Errorf("request %d: got %+v, want %+v", i, reflect.TypeOf(got[i]), reflect.TypeOf(want)) 376 } 377 } 378 return nil 379} 380 381func drainRequestsFromServer(server InMemSpannerServer) []interface{} { 382 var reqs []interface{} 383loop: 384 for { 385 select { 386 case req := <-server.ReceivedRequests(): 387 reqs = append(reqs, req) 388 default: 389 break loop 390 } 391 } 392 return reqs 393} 394 395func newAbortedErrorWithMinimalRetryDelay() error { 396 st := gstatus.New(codes.Aborted, "Transaction has been aborted") 397 retry := &errdetails.RetryInfo{ 398 RetryDelay: ptypes.DurationProto(time.Nanosecond), 399 } 400 st, _ = st.WithDetails(retry) 401 return st.Err() 402} 403