1/* 2Copyright 2017 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package spanner 18 19import ( 20 "context" 21 "errors" 22 "fmt" 23 "reflect" 24 "strings" 25 "sync" 26 "testing" 27 "time" 28 29 . "cloud.google.com/go/spanner/internal/testutil" 30 "github.com/golang/protobuf/ptypes" 31 "google.golang.org/api/iterator" 32 "google.golang.org/genproto/googleapis/rpc/errdetails" 33 sppb "google.golang.org/genproto/googleapis/spanner/v1" 34 "google.golang.org/grpc/codes" 35 "google.golang.org/grpc/status" 36 gstatus "google.golang.org/grpc/status" 37) 38 39// Single can only be used once. 40func TestSingle(t *testing.T) { 41 t.Parallel() 42 ctx := context.Background() 43 server, client, teardown := setupMockedTestServer(t) 44 defer teardown() 45 46 txn := client.Single() 47 defer txn.Close() 48 _, _, e := txn.acquire(ctx) 49 if e != nil { 50 t.Fatalf("Acquire for single use, got %v, want nil.", e) 51 } 52 _, _, e = txn.acquire(ctx) 53 if wantErr := errTxClosed(); !testEqual(e, wantErr) { 54 t.Fatalf("Second acquire for single use, got %v, want %v.", e, wantErr) 55 } 56 57 // Only one BatchCreateSessionsRequest is sent. 58 if _, err := shouldHaveReceived(server.TestSpanner, []interface{}{&sppb.BatchCreateSessionsRequest{}}); err != nil { 59 t.Fatal(err) 60 } 61} 62 63// Re-using ReadOnlyTransaction: can recover from acquire failure. 64func TestReadOnlyTransaction_RecoverFromFailure(t *testing.T) { 65 t.Parallel() 66 ctx := context.Background() 67 server, client, teardown := setupMockedTestServer(t) 68 defer teardown() 69 70 txn := client.ReadOnlyTransaction() 71 defer txn.Close() 72 73 // First request will fail. 74 errUsr := gstatus.Error(codes.Unknown, "error") 75 server.TestSpanner.PutExecutionTime(MethodBeginTransaction, 76 SimulatedExecutionTime{ 77 Errors: []error{errUsr}, 78 }) 79 80 _, _, e := txn.acquire(ctx) 81 if wantErr := ToSpannerError(errUsr); !testEqual(e, wantErr) { 82 t.Fatalf("Acquire for multi use, got %v, want %v.", e, wantErr) 83 } 84 _, _, e = txn.acquire(ctx) 85 if e != nil { 86 t.Fatalf("Acquire for multi use, got %v, want nil.", e) 87 } 88} 89 90// ReadOnlyTransaction: can not be used after close. 91func TestReadOnlyTransaction_UseAfterClose(t *testing.T) { 92 t.Parallel() 93 ctx := context.Background() 94 _, client, teardown := setupMockedTestServer(t) 95 defer teardown() 96 97 txn := client.ReadOnlyTransaction() 98 txn.Close() 99 100 _, _, e := txn.acquire(ctx) 101 if wantErr := errTxClosed(); !testEqual(e, wantErr) { 102 t.Fatalf("Second acquire for multi use, got %v, want %v.", e, wantErr) 103 } 104} 105 106// ReadOnlyTransaction: can be acquired concurrently. 107func TestReadOnlyTransaction_Concurrent(t *testing.T) { 108 t.Parallel() 109 ctx := context.Background() 110 server, client, teardown := setupMockedTestServer(t) 111 defer teardown() 112 txn := client.ReadOnlyTransaction() 113 defer txn.Close() 114 115 server.TestSpanner.Freeze() 116 var ( 117 sh1 *sessionHandle 118 sh2 *sessionHandle 119 ts1 *sppb.TransactionSelector 120 ts2 *sppb.TransactionSelector 121 wg = sync.WaitGroup{} 122 ) 123 acquire := func(sh **sessionHandle, ts **sppb.TransactionSelector) { 124 defer wg.Done() 125 var e error 126 *sh, *ts, e = txn.acquire(ctx) 127 if e != nil { 128 t.Errorf("Concurrent acquire for multiuse, got %v, expect nil.", e) 129 } 130 } 131 wg.Add(2) 132 go acquire(&sh1, &ts1) 133 go acquire(&sh2, &ts2) 134 135 // TODO(deklerk): Get rid of this. 136 <-time.After(100 * time.Millisecond) 137 138 server.TestSpanner.Unfreeze() 139 wg.Wait() 140 if sh1.session.id != sh2.session.id { 141 t.Fatalf("Expected acquire to get same session handle, got %v and %v.", sh1, sh2) 142 } 143 if !testEqual(ts1, ts2) { 144 t.Fatalf("Expected acquire to get same transaction selector, got %v and %v.", ts1, ts2) 145 } 146} 147 148func TestApply_Single(t *testing.T) { 149 t.Parallel() 150 ctx := context.Background() 151 server, client, teardown := setupMockedTestServer(t) 152 defer teardown() 153 154 ms := []*Mutation{ 155 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}), 156 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}), 157 } 158 if _, e := client.Apply(ctx, ms, ApplyAtLeastOnce()); e != nil { 159 t.Fatalf("applyAtLeastOnce retry on abort, got %v, want nil.", e) 160 } 161 162 if _, err := shouldHaveReceived(server.TestSpanner, []interface{}{ 163 &sppb.BatchCreateSessionsRequest{}, 164 &sppb.CommitRequest{}, 165 }); err != nil { 166 t.Fatal(err) 167 } 168} 169 170// Transaction retries on abort. 171func TestApply_RetryOnAbort(t *testing.T) { 172 ctx := context.Background() 173 t.Parallel() 174 server, client, teardown := setupMockedTestServer(t) 175 defer teardown() 176 177 // First commit will fail, and the retry will begin a new transaction. 178 server.TestSpanner.PutExecutionTime(MethodCommitTransaction, 179 SimulatedExecutionTime{ 180 Errors: []error{newAbortedErrorWithMinimalRetryDelay()}, 181 }) 182 183 ms := []*Mutation{ 184 Insert("Accounts", []string{"AccountId"}, []interface{}{int64(1)}), 185 } 186 187 if _, e := client.Apply(ctx, ms); e != nil { 188 t.Fatalf("ReadWriteTransaction retry on abort, got %v, want nil.", e) 189 } 190 191 if _, err := shouldHaveReceived(server.TestSpanner, []interface{}{ 192 &sppb.BatchCreateSessionsRequest{}, 193 &sppb.BeginTransactionRequest{}, 194 &sppb.CommitRequest{}, // First commit fails. 195 &sppb.BeginTransactionRequest{}, 196 &sppb.CommitRequest{}, // Second commit succeeds. 197 }); err != nil { 198 t.Fatal(err) 199 } 200} 201 202// Tests that SessionNotFound errors are retried. 203func TestTransaction_SessionNotFound(t *testing.T) { 204 t.Parallel() 205 ctx := context.Background() 206 server, client, teardown := setupMockedTestServer(t) 207 defer teardown() 208 209 serverErr := newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s") 210 server.TestSpanner.PutExecutionTime(MethodBeginTransaction, 211 SimulatedExecutionTime{ 212 Errors: []error{serverErr, serverErr, serverErr}, 213 }) 214 server.TestSpanner.PutExecutionTime(MethodCommitTransaction, 215 SimulatedExecutionTime{ 216 Errors: []error{serverErr}, 217 }) 218 219 txn := client.ReadOnlyTransaction() 220 defer txn.Close() 221 222 var wantErr error 223 if _, _, got := txn.acquire(ctx); !testEqual(wantErr, got) { 224 t.Fatalf("Expect acquire to succeed, got %v, want %v.", got, wantErr) 225 } 226 227 // The server error should lead to a retry of the BeginTransaction call and 228 // a valid session handle to be returned that will be used by the following 229 // requests. Note that calling txn.Query(...) does not actually send the 230 // query to the (mock) server. That is done at the first call to 231 // RowIterator.Next. The following statement only verifies that the 232 // transaction is in a valid state and received a valid session handle. 233 if got := txn.Query(ctx, NewStatement("SELECT 1")); !testEqual(wantErr, got.err) { 234 t.Fatalf("Expect Query to succeed, got %v, want %v.", got.err, wantErr) 235 } 236 237 if got := txn.Read(ctx, "Users", KeySets(Key{"alice"}, Key{"bob"}), []string{"name", "email"}); !testEqual(wantErr, got.err) { 238 t.Fatalf("Expect Read to succeed, got %v, want %v.", got.err, wantErr) 239 } 240 241 wantErr = ToSpannerError(newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")) 242 ms := []*Mutation{ 243 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}), 244 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}), 245 } 246 _, got := client.Apply(ctx, ms, ApplyAtLeastOnce()) 247 if !testEqual(wantErr, got) { 248 t.Fatalf("Expect Apply to fail\nGot: %v\nWant: %v\n", got, wantErr) 249 } 250} 251 252// When an error is returned from the closure sent into ReadWriteTransaction, it 253// kicks off a rollback. 254func TestReadWriteTransaction_ErrorReturned(t *testing.T) { 255 t.Parallel() 256 ctx := context.Background() 257 server, client, teardown := setupMockedTestServer(t) 258 defer teardown() 259 260 want := errors.New("an error") 261 _, got := client.ReadWriteTransaction(ctx, func(context.Context, *ReadWriteTransaction) error { 262 return want 263 }) 264 if got != want { 265 t.Fatalf("got %+v, want %+v", got, want) 266 } 267 requests := drainRequestsFromServer(server.TestSpanner) 268 if err := compareRequests([]interface{}{ 269 &sppb.BatchCreateSessionsRequest{}, 270 &sppb.BeginTransactionRequest{}, 271 &sppb.RollbackRequest{}}, requests); err != nil { 272 // TODO: remove this once the session pool maintainer has been changed 273 // so that is doesn't delete sessions already during the first 274 // maintenance window. 275 // If we failed to get 3, it might have because - due to timing - we got 276 // a fourth request. If this request is DeleteSession, that's OK and 277 // expected. 278 if err := compareRequests([]interface{}{ 279 &sppb.BatchCreateSessionsRequest{}, 280 &sppb.BeginTransactionRequest{}, 281 &sppb.RollbackRequest{}, 282 &sppb.DeleteSessionRequest{}}, requests); err != nil { 283 t.Fatal(err) 284 } 285 } 286} 287 288func TestBatchDML_WithMultipleDML(t *testing.T) { 289 t.Parallel() 290 ctx := context.Background() 291 server, client, teardown := setupMockedTestServer(t) 292 defer teardown() 293 294 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) { 295 if _, err = tx.Update(ctx, Statement{SQL: UpdateBarSetFoo}); err != nil { 296 return err 297 } 298 if _, err = tx.BatchUpdate(ctx, []Statement{{SQL: UpdateBarSetFoo}, {SQL: UpdateBarSetFoo}}); err != nil { 299 return err 300 } 301 if _, err = tx.Update(ctx, Statement{SQL: UpdateBarSetFoo}); err != nil { 302 return err 303 } 304 _, err = tx.BatchUpdate(ctx, []Statement{{SQL: UpdateBarSetFoo}}) 305 return err 306 }) 307 if err != nil { 308 t.Fatal(err) 309 } 310 311 gotReqs, err := shouldHaveReceived(server.TestSpanner, []interface{}{ 312 &sppb.BatchCreateSessionsRequest{}, 313 &sppb.BeginTransactionRequest{}, 314 &sppb.ExecuteSqlRequest{}, 315 &sppb.ExecuteBatchDmlRequest{}, 316 &sppb.ExecuteSqlRequest{}, 317 &sppb.ExecuteBatchDmlRequest{}, 318 &sppb.CommitRequest{}, 319 }) 320 if err != nil { 321 t.Fatal(err) 322 } 323 324 if got, want := gotReqs[2].(*sppb.ExecuteSqlRequest).Seqno, int64(1); got != want { 325 t.Errorf("got %d, want %d", got, want) 326 } 327 if got, want := gotReqs[3].(*sppb.ExecuteBatchDmlRequest).Seqno, int64(2); got != want { 328 t.Errorf("got %d, want %d", got, want) 329 } 330 if got, want := gotReqs[4].(*sppb.ExecuteSqlRequest).Seqno, int64(3); got != want { 331 t.Errorf("got %d, want %d", got, want) 332 } 333 if got, want := gotReqs[5].(*sppb.ExecuteBatchDmlRequest).Seqno, int64(4); got != want { 334 t.Errorf("got %d, want %d", got, want) 335 } 336} 337 338// When an Aborted error happens during a commit, it does not kick off a 339// rollback. 340func TestReadWriteStmtBasedTransaction_CommitAbortedErrorReturned(t *testing.T) { 341 t.Parallel() 342 ctx := context.Background() 343 server, client, teardown := setupMockedTestServer(t) 344 defer teardown() 345 server.TestSpanner.PutExecutionTime(MethodCommitTransaction, 346 SimulatedExecutionTime{ 347 Errors: []error{status.Errorf(codes.Aborted, "Transaction aborted")}, 348 }) 349 350 txn, err := NewReadWriteStmtBasedTransaction(ctx, client) 351 if err != nil { 352 t.Fatalf("got an error: %v", err) 353 } 354 _, err = txn.Commit(ctx) 355 if status.Code(err) != codes.Aborted || !strings.Contains(err.Error(), "Transaction aborted") { 356 t.Fatalf("got an incorrect error: %v", err) 357 } 358 359 requests := drainRequestsFromServer(server.TestSpanner) 360 if err := compareRequests([]interface{}{ 361 &sppb.BatchCreateSessionsRequest{}, 362 &sppb.BeginTransactionRequest{}, 363 &sppb.CommitRequest{}}, requests); err != nil { 364 // TODO: remove this once the session pool maintainer has been changed 365 // so that is doesn't delete sessions already during the first 366 // maintenance window. 367 // If we failed to get 4, it might have because - due to timing - we got 368 // a fourth request. If this request is DeleteSession, that's OK and 369 // expected. 370 if err := compareRequests([]interface{}{ 371 &sppb.BatchCreateSessionsRequest{}, 372 &sppb.BeginTransactionRequest{}, 373 &sppb.CommitRequest{}, 374 &sppb.DeleteSessionRequest{}}, requests); err != nil { 375 t.Fatal(err) 376 } 377 } 378} 379 380// When a non-aborted error happens during a commit, it kicks off a rollback. 381func TestReadWriteStmtBasedTransaction_CommitNonAbortedErrorReturned(t *testing.T) { 382 t.Parallel() 383 ctx := context.Background() 384 server, client, teardown := setupMockedTestServer(t) 385 defer teardown() 386 server.TestSpanner.PutExecutionTime(MethodCommitTransaction, 387 SimulatedExecutionTime{ 388 Errors: []error{status.Errorf(codes.NotFound, "Session not found")}, 389 }) 390 391 txn, err := NewReadWriteStmtBasedTransaction(ctx, client) 392 if err != nil { 393 t.Fatalf("got an error: %v", err) 394 } 395 _, err = txn.Commit(ctx) 396 if status.Code(err) != codes.NotFound || !strings.Contains(err.Error(), "Session not found") { 397 t.Fatalf("got an incorrect error: %v", err) 398 } 399 400 requests := drainRequestsFromServer(server.TestSpanner) 401 if err := compareRequests([]interface{}{ 402 &sppb.BatchCreateSessionsRequest{}, 403 &sppb.BeginTransactionRequest{}, 404 &sppb.CommitRequest{}, 405 &sppb.RollbackRequest{}}, requests); err != nil { 406 // TODO: remove this once the session pool maintainer has been changed 407 // so that is doesn't delete sessions already during the first 408 // maintenance window. 409 // If we failed to get 4, it might have because - due to timing - we got 410 // a fourth request. If this request is DeleteSession, that's OK and 411 // expected. 412 if err := compareRequests([]interface{}{ 413 &sppb.BatchCreateSessionsRequest{}, 414 &sppb.BeginTransactionRequest{}, 415 &sppb.CommitRequest{}, 416 &sppb.RollbackRequest{}, 417 &sppb.DeleteSessionRequest{}}, requests); err != nil { 418 t.Fatal(err) 419 } 420 } 421} 422 423func TestReadWriteStmtBasedTransaction(t *testing.T) { 424 t.Parallel() 425 426 rowCount, attempts, err := testReadWriteStmtBasedTransaction(t, make(map[string]SimulatedExecutionTime)) 427 if err != nil { 428 t.Fatalf("transaction failed to commit: %v", err) 429 } 430 if rowCount != SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount { 431 t.Fatalf("Row count mismatch, got %v, expected %v", rowCount, SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount) 432 } 433 if g, w := attempts, 1; g != w { 434 t.Fatalf("number of attempts mismatch:\nGot%d\nWant:%d", g, w) 435 } 436} 437 438func TestReadWriteStmtBasedTransaction_CommitAborted(t *testing.T) { 439 t.Parallel() 440 rowCount, attempts, err := testReadWriteStmtBasedTransaction(t, map[string]SimulatedExecutionTime{ 441 MethodCommitTransaction: {Errors: []error{status.Error(codes.Aborted, "Transaction aborted")}}, 442 }) 443 if err != nil { 444 t.Fatalf("transaction failed to commit: %v", err) 445 } 446 if rowCount != SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount { 447 t.Fatalf("Row count mismatch, got %v, expected %v", rowCount, SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount) 448 } 449 if g, w := attempts, 2; g != w { 450 t.Fatalf("number of attempts mismatch:\nGot%d\nWant:%d", g, w) 451 } 452} 453 454func testReadWriteStmtBasedTransaction(t *testing.T, executionTimes map[string]SimulatedExecutionTime) (rowCount int64, attempts int, err error) { 455 server, client, teardown := setupMockedTestServer(t) 456 defer teardown() 457 for method, exec := range executionTimes { 458 server.TestSpanner.PutExecutionTime(method, exec) 459 } 460 ctx := context.Background() 461 462 f := func(tx *ReadWriteStmtBasedTransaction) (int64, error) { 463 iter := tx.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums)) 464 defer iter.Stop() 465 rowCount := int64(0) 466 for { 467 row, err := iter.Next() 468 if err == iterator.Done { 469 break 470 } 471 if err != nil { 472 return 0, err 473 } 474 var singerID, albumID int64 475 var albumTitle string 476 if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil { 477 return 0, err 478 } 479 rowCount++ 480 } 481 return rowCount, nil 482 } 483 484 for { 485 attempts++ 486 tx, err := NewReadWriteStmtBasedTransaction(ctx, client) 487 if err != nil { 488 return 0, attempts, fmt.Errorf("failed to begin a transaction: %v", err) 489 } 490 rowCount, err = f(tx) 491 if err != nil && status.Code(err) != codes.Aborted { 492 tx.Rollback(ctx) 493 return 0, attempts, err 494 } else if err == nil { 495 _, err = tx.Commit(ctx) 496 if err == nil { 497 break 498 } else if status.Code(err) != codes.Aborted { 499 return 0, attempts, err 500 } 501 } 502 // Set a default sleep time if the server delay is absent. 503 delay := 10 * time.Millisecond 504 if serverDelay, hasServerDelay := ExtractRetryDelay(err); hasServerDelay { 505 delay = serverDelay 506 } 507 time.Sleep(delay) 508 } 509 510 return rowCount, attempts, err 511} 512 513func TestReadWriteStmtBasedTransactionWithOptions(t *testing.T) { 514 t.Parallel() 515 516 _, client, teardown := setupMockedTestServer(t) 517 defer teardown() 518 ctx := context.Background() 519 520 f := func(tx *ReadWriteStmtBasedTransaction) (int64, error) { 521 iter := tx.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums)) 522 defer iter.Stop() 523 rowCount := int64(0) 524 for { 525 row, err := iter.Next() 526 if err == iterator.Done { 527 break 528 } 529 if err != nil { 530 return 0, err 531 } 532 var singerID, albumID int64 533 var albumTitle string 534 if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil { 535 return 0, err 536 } 537 rowCount++ 538 } 539 return rowCount, nil 540 } 541 542 var resp CommitResponse 543 for { 544 tx, err := NewReadWriteStmtBasedTransactionWithOptions( 545 ctx, 546 client, 547 TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true}}, 548 ) 549 _, err = f(tx) 550 if err != nil && status.Code(err) != codes.Aborted { 551 tx.Rollback(ctx) 552 break 553 } else if err == nil { 554 resp, err = tx.CommitWithReturnResp(ctx) 555 break 556 } 557 // Set a default sleep time if the server delay is absent. 558 delay := 10 * time.Millisecond 559 if serverDelay, hasServerDelay := ExtractRetryDelay(err); hasServerDelay { 560 delay = serverDelay 561 } 562 time.Sleep(delay) 563 } 564 if got, want := resp.CommitStats.MutationCount, int64(1); got != want { 565 t.Fatalf("Mismatch mutation count - got: %d, want: %d", got, want) 566 } 567} 568 569func TestBatchDML_StatementBased_WithMultipleDML(t *testing.T) { 570 t.Parallel() 571 ctx := context.Background() 572 server, client, teardown := setupMockedTestServer(t) 573 defer teardown() 574 575 tx, err := NewReadWriteStmtBasedTransaction(ctx, client) 576 if _, err = tx.Update(ctx, Statement{SQL: UpdateBarSetFoo}); err != nil { 577 tx.Rollback(ctx) 578 t.Fatal(err) 579 } 580 if _, err = tx.BatchUpdate(ctx, []Statement{{SQL: UpdateBarSetFoo}, {SQL: UpdateBarSetFoo}}); err != nil { 581 tx.Rollback(ctx) 582 t.Fatal(err) 583 } 584 if _, err = tx.Update(ctx, Statement{SQL: UpdateBarSetFoo}); err != nil { 585 tx.Rollback(ctx) 586 t.Fatal(err) 587 } 588 if _, err = tx.BatchUpdate(ctx, []Statement{{SQL: UpdateBarSetFoo}}); err != nil { 589 tx.Rollback(ctx) 590 t.Fatal(err) 591 } 592 _, err = tx.Commit(ctx) 593 if err != nil { 594 t.Fatal(err) 595 } 596 597 gotReqs, err := shouldHaveReceived(server.TestSpanner, []interface{}{ 598 &sppb.BatchCreateSessionsRequest{}, 599 &sppb.BeginTransactionRequest{}, 600 &sppb.ExecuteSqlRequest{}, 601 &sppb.ExecuteBatchDmlRequest{}, 602 &sppb.ExecuteSqlRequest{}, 603 &sppb.ExecuteBatchDmlRequest{}, 604 &sppb.CommitRequest{}, 605 }) 606 if err != nil { 607 t.Fatal(err) 608 } 609 610 if got, want := gotReqs[2].(*sppb.ExecuteSqlRequest).Seqno, int64(1); got != want { 611 t.Errorf("got %d, want %d", got, want) 612 } 613 if got, want := gotReqs[3].(*sppb.ExecuteBatchDmlRequest).Seqno, int64(2); got != want { 614 t.Errorf("got %d, want %d", got, want) 615 } 616 if got, want := gotReqs[4].(*sppb.ExecuteSqlRequest).Seqno, int64(3); got != want { 617 t.Errorf("got %d, want %d", got, want) 618 } 619 if got, want := gotReqs[5].(*sppb.ExecuteBatchDmlRequest).Seqno, int64(4); got != want { 620 t.Errorf("got %d, want %d", got, want) 621 } 622} 623 624// shouldHaveReceived asserts that exactly expectedRequests were present in 625// the server's ReceivedRequests channel. It only looks at type, not contents. 626// 627// Note: this in-place modifies serverClientMock by popping items off the 628// ReceivedRequests channel. 629func shouldHaveReceived(server InMemSpannerServer, want []interface{}) ([]interface{}, error) { 630 got := drainRequestsFromServer(server) 631 return got, compareRequests(want, got) 632} 633 634// Compares expected requests (want) with actual requests (got). 635func compareRequests(want []interface{}, got []interface{}) error { 636 if len(got) != len(want) { 637 var gotMsg string 638 for _, r := range got { 639 gotMsg += fmt.Sprintf("%v: %+v]\n", reflect.TypeOf(r), r) 640 } 641 642 var wantMsg string 643 for _, r := range want { 644 wantMsg += fmt.Sprintf("%v: %+v]\n", reflect.TypeOf(r), r) 645 } 646 647 return fmt.Errorf("got %d requests, want %d requests:\ngot:\n%s\nwant:\n%s", len(got), len(want), gotMsg, wantMsg) 648 } 649 650 for i, want := range want { 651 if reflect.TypeOf(got[i]) != reflect.TypeOf(want) { 652 return fmt.Errorf("request %d: got %+v, want %+v", i, reflect.TypeOf(got[i]), reflect.TypeOf(want)) 653 } 654 } 655 return nil 656} 657 658func drainRequestsFromServer(server InMemSpannerServer) []interface{} { 659 var reqs []interface{} 660loop: 661 for { 662 select { 663 case req := <-server.ReceivedRequests(): 664 reqs = append(reqs, req) 665 default: 666 break loop 667 } 668 } 669 return reqs 670} 671 672func newAbortedErrorWithMinimalRetryDelay() error { 673 st := gstatus.New(codes.Aborted, "Transaction has been aborted") 674 retry := &errdetails.RetryInfo{ 675 RetryDelay: ptypes.DurationProto(time.Nanosecond), 676 } 677 st, _ = st.WithDetails(retry) 678 return st.Err() 679} 680