1/* 2Copyright 2017 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package spanner 18 19import ( 20 "context" 21 "fmt" 22 "io" 23 "math/big" 24 "os" 25 "strings" 26 "testing" 27 "time" 28 29 "cloud.google.com/go/civil" 30 itestutil "cloud.google.com/go/internal/testutil" 31 vkit "cloud.google.com/go/spanner/apiv1" 32 . "cloud.google.com/go/spanner/internal/testutil" 33 structpb "github.com/golang/protobuf/ptypes/struct" 34 gax "github.com/googleapis/gax-go/v2" 35 "google.golang.org/api/iterator" 36 "google.golang.org/api/option" 37 sppb "google.golang.org/genproto/googleapis/spanner/v1" 38 "google.golang.org/grpc/codes" 39 "google.golang.org/grpc/status" 40) 41 42func setupMockedTestServer(t *testing.T) (server *MockedSpannerInMemTestServer, client *Client, teardown func()) { 43 return setupMockedTestServerWithConfig(t, ClientConfig{}) 44} 45 46func setupMockedTestServerWithConfig(t *testing.T, config ClientConfig) (server *MockedSpannerInMemTestServer, client *Client, teardown func()) { 47 return setupMockedTestServerWithConfigAndClientOptions(t, config, []option.ClientOption{}) 48} 49 50func setupMockedTestServerWithConfigAndClientOptions(t *testing.T, config ClientConfig, clientOptions []option.ClientOption) (server *MockedSpannerInMemTestServer, client *Client, teardown func()) { 51 grpcHeaderChecker := &itestutil.HeadersEnforcer{ 52 OnFailure: t.Fatalf, 53 Checkers: []*itestutil.HeaderChecker{ 54 { 55 Key: "x-goog-api-client", 56 ValuesValidator: func(token ...string) error { 57 if len(token) != 1 { 58 return status.Errorf(codes.Internal, "unexpected number of api client token headers: %v", len(token)) 59 } 60 if !strings.HasPrefix(token[0], "gl-go/") { 61 return status.Errorf(codes.Internal, "unexpected api client token: %v", token[0]) 62 } 63 if !strings.Contains(token[0], "gccl/") { 64 return status.Errorf(codes.Internal, "unexpected api client token: %v", token[0]) 65 } 66 return nil 67 }, 68 }, 69 }, 70 } 71 clientOptions = append(clientOptions, grpcHeaderChecker.CallOptions()...) 72 server, opts, serverTeardown := NewMockedSpannerInMemTestServer(t) 73 opts = append(opts, clientOptions...) 74 ctx := context.Background() 75 formattedDatabase := fmt.Sprintf("projects/%s/instances/%s/databases/%s", "[PROJECT]", "[INSTANCE]", "[DATABASE]") 76 client, err := NewClientWithConfig(ctx, formattedDatabase, config, opts...) 77 if err != nil { 78 t.Fatal(err) 79 } 80 return server, client, func() { 81 client.Close() 82 serverTeardown() 83 } 84} 85 86// Test validDatabaseName() 87func TestValidDatabaseName(t *testing.T) { 88 validDbURI := "projects/spanner-cloud-test/instances/foo/databases/foodb" 89 invalidDbUris := []string{ 90 // Completely wrong DB URI. 91 "foobarDB", 92 // Project ID contains "/". 93 "projects/spanner-cloud/test/instances/foo/databases/foodb", 94 // No instance ID. 95 "projects/spanner-cloud-test/instances//databases/foodb", 96 } 97 if err := validDatabaseName(validDbURI); err != nil { 98 t.Errorf("validateDatabaseName(%q) = %v, want nil", validDbURI, err) 99 } 100 for _, d := range invalidDbUris { 101 if err, wantErr := validDatabaseName(d), "should conform to pattern"; !strings.Contains(err.Error(), wantErr) { 102 t.Errorf("validateDatabaseName(%q) = %q, want error pattern %q", validDbURI, err, wantErr) 103 } 104 } 105} 106 107func TestReadOnlyTransactionClose(t *testing.T) { 108 // Closing a ReadOnlyTransaction shouldn't panic. 109 c := &Client{} 110 tx := c.ReadOnlyTransaction() 111 tx.Close() 112} 113 114func TestClient_Single(t *testing.T) { 115 t.Parallel() 116 err := testSingleQuery(t, nil) 117 if err != nil { 118 t.Fatal(err) 119 } 120} 121 122func TestClient_Single_Unavailable(t *testing.T) { 123 t.Parallel() 124 err := testSingleQuery(t, status.Error(codes.Unavailable, "Temporary unavailable")) 125 if err != nil { 126 t.Fatal(err) 127 } 128} 129 130func TestClient_Single_InvalidArgument(t *testing.T) { 131 t.Parallel() 132 err := testSingleQuery(t, status.Error(codes.InvalidArgument, "Invalid argument")) 133 if status.Code(err) != codes.InvalidArgument { 134 t.Fatalf("got: %v, want: %v", err, codes.InvalidArgument) 135 } 136} 137 138func TestClient_Single_SessionNotFound(t *testing.T) { 139 t.Parallel() 140 141 server, client, teardown := setupMockedTestServer(t) 142 defer teardown() 143 server.TestSpanner.PutExecutionTime( 144 MethodExecuteStreamingSql, 145 SimulatedExecutionTime{Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}}, 146 ) 147 ctx := context.Background() 148 iter := client.Single().Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums)) 149 defer iter.Stop() 150 rowCount := int64(0) 151 for { 152 _, err := iter.Next() 153 if err == iterator.Done { 154 break 155 } 156 if err != nil { 157 t.Fatal(err) 158 } 159 rowCount++ 160 } 161 if rowCount != SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount { 162 t.Fatalf("row count mismatch\nGot: %v\nWant: %v", rowCount, SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount) 163 } 164} 165 166func TestClient_Single_Read_SessionNotFound(t *testing.T) { 167 t.Parallel() 168 169 server, client, teardown := setupMockedTestServer(t) 170 defer teardown() 171 server.TestSpanner.PutExecutionTime( 172 MethodStreamingRead, 173 SimulatedExecutionTime{Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}}, 174 ) 175 ctx := context.Background() 176 iter := client.Single().Read(ctx, "Albums", KeySets(Key{"foo"}), []string{"SingerId", "AlbumId", "AlbumTitle"}) 177 defer iter.Stop() 178 rowCount := int64(0) 179 for { 180 _, err := iter.Next() 181 if err == iterator.Done { 182 break 183 } 184 if err != nil { 185 t.Fatal(err) 186 } 187 rowCount++ 188 } 189 if rowCount != SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount { 190 t.Fatalf("row count mismatch\nGot: %v\nWant: %v", rowCount, SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount) 191 } 192} 193 194func TestClient_Single_ReadRow_SessionNotFound(t *testing.T) { 195 t.Parallel() 196 197 server, client, teardown := setupMockedTestServer(t) 198 defer teardown() 199 server.TestSpanner.PutExecutionTime( 200 MethodStreamingRead, 201 SimulatedExecutionTime{Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}}, 202 ) 203 ctx := context.Background() 204 row, err := client.Single().ReadRow(ctx, "Albums", Key{"foo"}, []string{"SingerId", "AlbumId", "AlbumTitle"}) 205 if err != nil { 206 t.Fatalf("Unexpected error for read row: %v", err) 207 } 208 if row == nil { 209 t.Fatal("ReadRow did not return a row") 210 } 211} 212 213func TestClient_Single_RetryableErrorOnPartialResultSet(t *testing.T) { 214 t.Parallel() 215 server, client, teardown := setupMockedTestServer(t) 216 defer teardown() 217 218 // Add two errors that will be returned by the mock server when the client 219 // is trying to fetch a partial result set. Both errors are retryable. 220 // The errors are not 'sticky' on the mocked server, i.e. once the error 221 // has been returned once, the next call for the same partial result set 222 // will succeed. 223 224 // When the client is fetching the partial result set with resume token 2, 225 // the mock server will respond with an internal error with the message 226 // 'stream terminated by RST_STREAM'. The client will retry the call to get 227 // this partial result set. 228 server.TestSpanner.AddPartialResultSetError( 229 SelectSingerIDAlbumIDAlbumTitleFromAlbums, 230 PartialResultSetExecutionTime{ 231 ResumeToken: EncodeResumeToken(2), 232 Err: status.Errorf(codes.Internal, "stream terminated by RST_STREAM"), 233 }, 234 ) 235 // When the client is fetching the partial result set with resume token 3, 236 // the mock server will respond with a 'Unavailable' error. The client will 237 // retry the call to get this partial result set. 238 server.TestSpanner.AddPartialResultSetError( 239 SelectSingerIDAlbumIDAlbumTitleFromAlbums, 240 PartialResultSetExecutionTime{ 241 ResumeToken: EncodeResumeToken(3), 242 Err: status.Errorf(codes.Unavailable, "server is unavailable"), 243 }, 244 ) 245 ctx := context.Background() 246 if err := executeSingerQuery(ctx, client.Single()); err != nil { 247 t.Fatal(err) 248 } 249} 250 251func TestClient_Single_NonRetryableErrorOnPartialResultSet(t *testing.T) { 252 t.Parallel() 253 server, client, teardown := setupMockedTestServer(t) 254 defer teardown() 255 256 // Add two errors that will be returned by the mock server when the client 257 // is trying to fetch a partial result set. The first error is retryable, 258 // the second is not. 259 260 // This error will automatically be retried. 261 server.TestSpanner.AddPartialResultSetError( 262 SelectSingerIDAlbumIDAlbumTitleFromAlbums, 263 PartialResultSetExecutionTime{ 264 ResumeToken: EncodeResumeToken(2), 265 Err: status.Errorf(codes.Internal, "stream terminated by RST_STREAM"), 266 }, 267 ) 268 // 'Session not found' is not retryable and the error will be returned to 269 // the user. 270 server.TestSpanner.AddPartialResultSetError( 271 SelectSingerIDAlbumIDAlbumTitleFromAlbums, 272 PartialResultSetExecutionTime{ 273 ResumeToken: EncodeResumeToken(3), 274 Err: newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s"), 275 }, 276 ) 277 ctx := context.Background() 278 err := executeSingerQuery(ctx, client.Single()) 279 if status.Code(err) != codes.NotFound { 280 t.Fatalf("Error mismatch:\ngot: %v\nwant: %v", err, codes.NotFound) 281 } 282} 283 284func TestClient_Single_NonRetryableInternalErrors(t *testing.T) { 285 t.Parallel() 286 server, client, teardown := setupMockedTestServer(t) 287 defer teardown() 288 289 server.TestSpanner.AddPartialResultSetError( 290 SelectSingerIDAlbumIDAlbumTitleFromAlbums, 291 PartialResultSetExecutionTime{ 292 ResumeToken: EncodeResumeToken(2), 293 Err: status.Errorf(codes.Internal, "grpc: error while marshaling: string field contains invalid UTF-8"), 294 }, 295 ) 296 ctx := context.Background() 297 err := executeSingerQuery(ctx, client.Single()) 298 if status.Code(err) != codes.Internal { 299 t.Fatalf("Error mismatch:\ngot: %v\nwant: %v", err, codes.Internal) 300 } 301} 302 303func TestClient_Single_DeadlineExceeded_NoErrors(t *testing.T) { 304 t.Parallel() 305 server, client, teardown := setupMockedTestServer(t) 306 defer teardown() 307 server.TestSpanner.PutExecutionTime(MethodExecuteStreamingSql, 308 SimulatedExecutionTime{ 309 MinimumExecutionTime: 50 * time.Millisecond, 310 }) 311 ctx := context.Background() 312 ctx, cancel := context.WithDeadline(ctx, time.Now().Add(5*time.Millisecond)) 313 defer cancel() 314 err := executeSingerQuery(ctx, client.Single()) 315 if status.Code(err) != codes.DeadlineExceeded { 316 t.Fatalf("Error mismatch:\ngot: %v\nwant: %v", err, codes.DeadlineExceeded) 317 } 318} 319 320func TestClient_Single_DeadlineExceeded_WithErrors(t *testing.T) { 321 t.Parallel() 322 server, client, teardown := setupMockedTestServer(t) 323 defer teardown() 324 server.TestSpanner.AddPartialResultSetError( 325 SelectSingerIDAlbumIDAlbumTitleFromAlbums, 326 PartialResultSetExecutionTime{ 327 ResumeToken: EncodeResumeToken(2), 328 Err: status.Errorf(codes.Internal, "stream terminated by RST_STREAM"), 329 }, 330 ) 331 server.TestSpanner.AddPartialResultSetError( 332 SelectSingerIDAlbumIDAlbumTitleFromAlbums, 333 PartialResultSetExecutionTime{ 334 ResumeToken: EncodeResumeToken(3), 335 Err: status.Errorf(codes.Unavailable, "server is unavailable"), 336 ExecutionTime: 50 * time.Millisecond, 337 }, 338 ) 339 ctx := context.Background() 340 ctx, cancel := context.WithDeadline(ctx, time.Now().Add(25*time.Millisecond)) 341 defer cancel() 342 err := executeSingerQuery(ctx, client.Single()) 343 if status.Code(err) != codes.DeadlineExceeded { 344 t.Fatalf("got unexpected error %v, expected DeadlineExceeded", err) 345 } 346} 347 348func TestClient_Single_ContextCanceled_noDeclaredServerErrors(t *testing.T) { 349 t.Parallel() 350 _, client, teardown := setupMockedTestServer(t) 351 defer teardown() 352 ctx := context.Background() 353 ctx, cancel := context.WithCancel(ctx) 354 cancel() 355 err := executeSingerQuery(ctx, client.Single()) 356 if status.Code(err) != codes.Canceled { 357 t.Fatalf("got unexpected error %v, expected Canceled", err) 358 } 359} 360 361func TestClient_Single_ContextCanceled_withDeclaredServerErrors(t *testing.T) { 362 t.Parallel() 363 server, client, teardown := setupMockedTestServer(t) 364 defer teardown() 365 server.TestSpanner.AddPartialResultSetError( 366 SelectSingerIDAlbumIDAlbumTitleFromAlbums, 367 PartialResultSetExecutionTime{ 368 ResumeToken: EncodeResumeToken(2), 369 Err: status.Errorf(codes.Internal, "stream terminated by RST_STREAM"), 370 }, 371 ) 372 server.TestSpanner.AddPartialResultSetError( 373 SelectSingerIDAlbumIDAlbumTitleFromAlbums, 374 PartialResultSetExecutionTime{ 375 ResumeToken: EncodeResumeToken(3), 376 Err: status.Errorf(codes.Unavailable, "server is unavailable"), 377 }, 378 ) 379 ctx := context.Background() 380 ctx, cancel := context.WithCancel(ctx) 381 defer cancel() 382 f := func(rowCount int64) error { 383 if rowCount == 2 { 384 cancel() 385 } 386 return nil 387 } 388 iter := client.Single().Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums)) 389 defer iter.Stop() 390 err := executeSingerQueryWithRowFunc(ctx, client.Single(), f) 391 if status.Code(err) != codes.Canceled { 392 t.Fatalf("got unexpected error %v, expected Canceled", err) 393 } 394} 395 396func TestClient_Single_QueryOptions(t *testing.T) { 397 for _, tt := range queryOptionsTestCases() { 398 t.Run(tt.name, func(t *testing.T) { 399 if tt.env.Options != nil { 400 unset := setQueryOptionsEnvVars(tt.env.Options) 401 defer unset() 402 } 403 404 ctx := context.Background() 405 server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{QueryOptions: tt.client}) 406 defer teardown() 407 408 var iter *RowIterator 409 if tt.query.Options == nil { 410 iter = client.Single().Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums)) 411 } else { 412 iter = client.Single().QueryWithOptions(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), tt.query) 413 } 414 testQueryOptions(t, iter, server.TestSpanner, tt.want) 415 }) 416 } 417} 418 419func TestClient_ReturnDatabaseName(t *testing.T) { 420 t.Parallel() 421 422 _, client, teardown := setupMockedTestServer(t) 423 defer teardown() 424 425 got := client.DatabaseName() 426 want := "projects/[PROJECT]/instances/[INSTANCE]/databases/[DATABASE]" 427 if got != want { 428 t.Fatalf("Incorrect database name returned, got: %s, want: %s", got, want) 429 } 430} 431 432func testQueryOptions(t *testing.T, iter *RowIterator, server InMemSpannerServer, qo QueryOptions) { 433 defer iter.Stop() 434 435 _, err := iter.Next() 436 if err != nil { 437 t.Fatalf("Failed to read from the iterator: %v", err) 438 } 439 440 checkReqsForQueryOptions(t, server, qo) 441} 442 443func checkReqsForQueryOptions(t *testing.T, server InMemSpannerServer, qo QueryOptions) { 444 reqs := drainRequestsFromServer(server) 445 sqlReqs := []*sppb.ExecuteSqlRequest{} 446 447 for _, req := range reqs { 448 if sqlReq, ok := req.(*sppb.ExecuteSqlRequest); ok { 449 sqlReqs = append(sqlReqs, sqlReq) 450 } 451 } 452 453 if got, want := len(sqlReqs), 1; got != want { 454 t.Fatalf("Length mismatch, got %v, want %v", got, want) 455 } 456 457 reqQueryOptions := sqlReqs[0].QueryOptions 458 if got, want := reqQueryOptions.OptimizerVersion, qo.Options.OptimizerVersion; got != want { 459 t.Fatalf("Optimizer version mismatch, got %v, want %v", got, want) 460 } 461 if got, want := reqQueryOptions.OptimizerStatisticsPackage, qo.Options.OptimizerStatisticsPackage; got != want { 462 t.Fatalf("Optimizer statistics package mismatch, got %v, want %v", got, want) 463 } 464} 465 466func testSingleQuery(t *testing.T, serverError error) error { 467 ctx := context.Background() 468 server, client, teardown := setupMockedTestServer(t) 469 defer teardown() 470 if serverError != nil { 471 server.TestSpanner.SetError(serverError) 472 } 473 return executeSingerQuery(ctx, client.Single()) 474} 475 476func executeSingerQuery(ctx context.Context, tx *ReadOnlyTransaction) error { 477 return executeSingerQueryWithRowFunc(ctx, tx, nil) 478} 479 480func executeSingerQueryWithRowFunc(ctx context.Context, tx *ReadOnlyTransaction, f func(rowCount int64) error) error { 481 iter := tx.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums)) 482 defer iter.Stop() 483 rowCount := int64(0) 484 for { 485 row, err := iter.Next() 486 if err == iterator.Done { 487 break 488 } 489 if err != nil { 490 return err 491 } 492 var singerID, albumID int64 493 var albumTitle string 494 if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil { 495 return err 496 } 497 rowCount++ 498 if f != nil { 499 if err := f(rowCount); err != nil { 500 return err 501 } 502 } 503 } 504 if rowCount != SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount { 505 return status.Errorf(codes.Internal, "Row count mismatch, got %v, expected %v", rowCount, SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount) 506 } 507 return nil 508} 509 510func createSimulatedExecutionTimeWithTwoUnavailableErrors(method string) map[string]SimulatedExecutionTime { 511 errors := make([]error, 2) 512 errors[0] = status.Error(codes.Unavailable, "Temporary unavailable") 513 errors[1] = status.Error(codes.Unavailable, "Temporary unavailable") 514 executionTimes := make(map[string]SimulatedExecutionTime) 515 executionTimes[method] = SimulatedExecutionTime{ 516 Errors: errors, 517 } 518 return executionTimes 519} 520 521func TestClient_ReadOnlyTransaction(t *testing.T) { 522 t.Parallel() 523 if err := testReadOnlyTransaction(t, make(map[string]SimulatedExecutionTime)); err != nil { 524 t.Fatal(err) 525 } 526} 527 528func TestClient_ReadOnlyTransaction_UnavailableOnSessionCreate(t *testing.T) { 529 t.Parallel() 530 if err := testReadOnlyTransaction(t, createSimulatedExecutionTimeWithTwoUnavailableErrors(MethodCreateSession)); err != nil { 531 t.Fatal(err) 532 } 533} 534 535func TestClient_ReadOnlyTransaction_UnavailableOnBeginTransaction(t *testing.T) { 536 t.Parallel() 537 if err := testReadOnlyTransaction(t, createSimulatedExecutionTimeWithTwoUnavailableErrors(MethodBeginTransaction)); err != nil { 538 t.Fatal(err) 539 } 540} 541 542func TestClient_ReadOnlyTransaction_UnavailableOnExecuteStreamingSql(t *testing.T) { 543 t.Parallel() 544 if err := testReadOnlyTransaction(t, createSimulatedExecutionTimeWithTwoUnavailableErrors(MethodExecuteStreamingSql)); err != nil { 545 t.Fatal(err) 546 } 547} 548 549func TestClient_ReadOnlyTransaction_SessionNotFoundOnExecuteStreamingSql(t *testing.T) { 550 t.Parallel() 551 // Session not found is not retryable for a query on a multi-use read-only 552 // transaction, as we would need to start a new transaction on a new 553 // session. 554 err := testReadOnlyTransaction(t, map[string]SimulatedExecutionTime{ 555 MethodExecuteStreamingSql: {Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}}, 556 }) 557 want := ToSpannerError(newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")) 558 if err == nil { 559 t.Fatalf("missing expected error\nGot: nil\nWant: %v", want) 560 } 561 if status.Code(err) != status.Code(want) || !strings.Contains(err.Error(), want.Error()) { 562 t.Fatalf("error mismatch\nGot: %v\nWant: %v", err, want) 563 } 564} 565 566func TestClient_ReadOnlyTransaction_UnavailableOnCreateSessionAndBeginTransaction(t *testing.T) { 567 t.Parallel() 568 exec := map[string]SimulatedExecutionTime{ 569 MethodCreateSession: {Errors: []error{status.Error(codes.Unavailable, "Temporary unavailable")}}, 570 MethodBeginTransaction: {Errors: []error{status.Error(codes.Unavailable, "Temporary unavailable")}}, 571 } 572 if err := testReadOnlyTransaction(t, exec); err != nil { 573 t.Fatal(err) 574 } 575} 576 577func TestClient_ReadOnlyTransaction_UnavailableOnCreateSessionAndInvalidArgumentOnBeginTransaction(t *testing.T) { 578 t.Parallel() 579 exec := map[string]SimulatedExecutionTime{ 580 MethodCreateSession: {Errors: []error{status.Error(codes.Unavailable, "Temporary unavailable")}}, 581 MethodBeginTransaction: {Errors: []error{status.Error(codes.InvalidArgument, "Invalid argument")}}, 582 } 583 if err := testReadOnlyTransaction(t, exec); err == nil { 584 t.Fatalf("Missing expected exception") 585 } else if status.Code(err) != codes.InvalidArgument { 586 t.Fatalf("Got unexpected exception: %v", err) 587 } 588} 589 590func TestClient_ReadOnlyTransaction_SessionNotFoundOnBeginTransaction(t *testing.T) { 591 t.Parallel() 592 if err := testReadOnlyTransaction( 593 t, 594 map[string]SimulatedExecutionTime{ 595 MethodBeginTransaction: {Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}}, 596 }, 597 ); err != nil { 598 t.Fatal(err) 599 } 600} 601 602func TestClient_ReadOnlyTransaction_SessionNotFoundOnBeginTransaction_WithMaxOneSession(t *testing.T) { 603 t.Parallel() 604 server, client, teardown := setupMockedTestServerWithConfig( 605 t, 606 ClientConfig{ 607 SessionPoolConfig: SessionPoolConfig{ 608 MinOpened: 0, 609 MaxOpened: 1, 610 }, 611 }) 612 defer teardown() 613 server.TestSpanner.PutExecutionTime( 614 MethodBeginTransaction, 615 SimulatedExecutionTime{Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}}, 616 ) 617 tx := client.ReadOnlyTransaction() 618 defer tx.Close() 619 ctx := context.Background() 620 if err := executeSingerQuery(ctx, tx); err != nil { 621 t.Fatal(err) 622 } 623} 624 625func TestClient_ReadOnlyTransaction_QueryOptions(t *testing.T) { 626 for _, tt := range queryOptionsTestCases() { 627 t.Run(tt.name, func(t *testing.T) { 628 if tt.env.Options != nil { 629 unset := setQueryOptionsEnvVars(tt.env.Options) 630 defer unset() 631 } 632 633 ctx := context.Background() 634 server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{QueryOptions: tt.client}) 635 defer teardown() 636 637 tx := client.ReadOnlyTransaction() 638 defer tx.Close() 639 640 var iter *RowIterator 641 if tt.query.Options == nil { 642 iter = tx.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums)) 643 } else { 644 iter = tx.QueryWithOptions(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), tt.query) 645 } 646 testQueryOptions(t, iter, server.TestSpanner, tt.want) 647 }) 648 } 649} 650 651func setQueryOptionsEnvVars(opts *sppb.ExecuteSqlRequest_QueryOptions) func() { 652 os.Setenv("SPANNER_OPTIMIZER_VERSION", opts.OptimizerVersion) 653 os.Setenv("SPANNER_OPTIMIZER_STATISTICS_PACKAGE", opts.OptimizerStatisticsPackage) 654 return func() { 655 defer os.Setenv("SPANNER_OPTIMIZER_VERSION", "") 656 defer os.Setenv("SPANNER_OPTIMIZER_STATISTICS_PACKAGE", "") 657 } 658} 659 660func testReadOnlyTransaction(t *testing.T, executionTimes map[string]SimulatedExecutionTime) error { 661 server, client, teardown := setupMockedTestServer(t) 662 defer teardown() 663 for method, exec := range executionTimes { 664 server.TestSpanner.PutExecutionTime(method, exec) 665 } 666 tx := client.ReadOnlyTransaction() 667 defer tx.Close() 668 ctx := context.Background() 669 return executeSingerQuery(ctx, tx) 670} 671 672func TestClient_ReadWriteTransaction(t *testing.T) { 673 t.Parallel() 674 if err := testReadWriteTransaction(t, make(map[string]SimulatedExecutionTime), 1); err != nil { 675 t.Fatal(err) 676 } 677} 678 679func TestClient_ReadWriteTransactionCommitAborted(t *testing.T) { 680 t.Parallel() 681 if err := testReadWriteTransaction(t, map[string]SimulatedExecutionTime{ 682 MethodCommitTransaction: {Errors: []error{status.Error(codes.Aborted, "Transaction aborted")}}, 683 }, 2); err != nil { 684 t.Fatal(err) 685 } 686} 687 688func TestClient_ReadWriteTransaction_SessionNotFoundOnCommit(t *testing.T) { 689 t.Parallel() 690 if err := testReadWriteTransaction(t, map[string]SimulatedExecutionTime{ 691 MethodCommitTransaction: {Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}}, 692 }, 2); err != nil { 693 t.Fatal(err) 694 } 695} 696 697func TestClient_ReadWriteTransaction_SessionNotFoundOnBeginTransaction(t *testing.T) { 698 t.Parallel() 699 // We expect only 1 attempt, as the 'Session not found' error is already 700 //handled in the session pool where the session is prepared. 701 if err := testReadWriteTransaction(t, map[string]SimulatedExecutionTime{ 702 MethodBeginTransaction: {Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}}, 703 }, 1); err != nil { 704 t.Fatal(err) 705 } 706} 707 708func TestClient_ReadWriteTransaction_SessionNotFoundOnBeginTransactionWithEmptySessionPool(t *testing.T) { 709 t.Parallel() 710 // There will be no prepared sessions in the pool, so the error will occur 711 // when the transaction tries to get a session from the pool. This will 712 // also be handled by the session pool, so the transaction itself does not 713 // need to retry, hence the expectedAttempts == 1. 714 if err := testReadWriteTransactionWithConfig(t, ClientConfig{ 715 SessionPoolConfig: SessionPoolConfig{WriteSessions: 0.0}, 716 }, map[string]SimulatedExecutionTime{ 717 MethodBeginTransaction: {Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}}, 718 }, 1); err != nil { 719 t.Fatal(err) 720 } 721} 722 723func TestClient_ReadWriteTransaction_SessionNotFoundOnExecuteStreamingSql(t *testing.T) { 724 t.Parallel() 725 if err := testReadWriteTransaction(t, map[string]SimulatedExecutionTime{ 726 MethodExecuteStreamingSql: {Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}}, 727 }, 2); err != nil { 728 t.Fatal(err) 729 } 730} 731 732func TestClient_ReadWriteTransaction_SessionNotFoundOnExecuteUpdate(t *testing.T) { 733 t.Parallel() 734 735 server, client, teardown := setupMockedTestServer(t) 736 defer teardown() 737 server.TestSpanner.PutExecutionTime( 738 MethodExecuteSql, 739 SimulatedExecutionTime{Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}}, 740 ) 741 ctx := context.Background() 742 var attempts int 743 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 744 attempts++ 745 rowCount, err := tx.Update(ctx, NewStatement(UpdateBarSetFoo)) 746 if err != nil { 747 return err 748 } 749 if g, w := rowCount, int64(UpdateBarSetFooRowCount); g != w { 750 return status.Errorf(codes.FailedPrecondition, "Row count mismatch\nGot: %v\nWant: %v", g, w) 751 } 752 return nil 753 }) 754 if err != nil { 755 t.Fatal(err) 756 } 757 if g, w := attempts, 2; g != w { 758 t.Fatalf("number of attempts mismatch:\nGot%d\nWant:%d", g, w) 759 } 760} 761 762func TestClient_ReadWriteTransaction_SessionNotFoundOnExecuteBatchUpdate(t *testing.T) { 763 t.Parallel() 764 765 server, client, teardown := setupMockedTestServer(t) 766 defer teardown() 767 server.TestSpanner.PutExecutionTime( 768 MethodExecuteBatchDml, 769 SimulatedExecutionTime{Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}}, 770 ) 771 ctx := context.Background() 772 var attempts int 773 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 774 attempts++ 775 rowCounts, err := tx.BatchUpdate(ctx, []Statement{NewStatement(UpdateBarSetFoo)}) 776 if err != nil { 777 return err 778 } 779 if g, w := len(rowCounts), 1; g != w { 780 return status.Errorf(codes.FailedPrecondition, "Row counts length mismatch\nGot: %v\nWant: %v", g, w) 781 } 782 if g, w := rowCounts[0], int64(UpdateBarSetFooRowCount); g != w { 783 return status.Errorf(codes.FailedPrecondition, "Row count mismatch\nGot: %v\nWant: %v", g, w) 784 } 785 return nil 786 }) 787 if err != nil { 788 t.Fatal(err) 789 } 790 if g, w := attempts, 2; g != w { 791 t.Fatalf("number of attempts mismatch:\nGot%d\nWant:%d", g, w) 792 } 793} 794 795func TestClient_ReadWriteTransaction_Query_QueryOptions(t *testing.T) { 796 for _, tt := range queryOptionsTestCases() { 797 t.Run(tt.name, func(t *testing.T) { 798 if tt.env.Options != nil { 799 unset := setQueryOptionsEnvVars(tt.env.Options) 800 defer unset() 801 } 802 803 ctx := context.Background() 804 server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{QueryOptions: tt.client}) 805 defer teardown() 806 807 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 808 var iter *RowIterator 809 if tt.query.Options == nil { 810 iter = tx.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums)) 811 } else { 812 iter = tx.QueryWithOptions(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), tt.query) 813 } 814 testQueryOptions(t, iter, server.TestSpanner, tt.want) 815 return nil 816 }) 817 if err != nil { 818 t.Fatal(err) 819 } 820 }) 821 } 822} 823 824func TestClient_ReadWriteTransaction_Update_QueryOptions(t *testing.T) { 825 for _, tt := range queryOptionsTestCases() { 826 t.Run(tt.name, func(t *testing.T) { 827 if tt.env.Options != nil { 828 unset := setQueryOptionsEnvVars(tt.env.Options) 829 defer unset() 830 } 831 832 ctx := context.Background() 833 server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{QueryOptions: tt.client}) 834 defer teardown() 835 836 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 837 var rowCount int64 838 var err error 839 if tt.query.Options == nil { 840 rowCount, err = tx.Update(ctx, NewStatement(UpdateBarSetFoo)) 841 } else { 842 rowCount, err = tx.UpdateWithOptions(ctx, NewStatement(UpdateBarSetFoo), tt.query) 843 } 844 if got, want := rowCount, int64(5); got != want { 845 t.Fatalf("Incorrect updated row count: got %v, want %v", got, want) 846 } 847 return err 848 }) 849 if err != nil { 850 t.Fatalf("Failed to update rows: %v", err) 851 } 852 checkReqsForQueryOptions(t, server.TestSpanner, tt.want) 853 }) 854 } 855} 856 857func TestClient_ReadWriteTransactionWithOptions(t *testing.T) { 858 _, client, teardown := setupMockedTestServer(t) 859 defer teardown() 860 ctx := context.Background() 861 resp, err := client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 862 iter := tx.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums)) 863 defer iter.Stop() 864 rowCount := int64(0) 865 for { 866 row, err := iter.Next() 867 if err == iterator.Done { 868 break 869 } 870 if err != nil { 871 return err 872 } 873 var singerID, albumID int64 874 var albumTitle string 875 if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil { 876 return err 877 } 878 rowCount++ 879 } 880 if rowCount != SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount { 881 return status.Errorf(codes.FailedPrecondition, "Row count mismatch, got %v, expected %v", rowCount, SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount) 882 } 883 return nil 884 }, TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true}}) 885 if err != nil { 886 t.Fatalf("Failed to execute the transaction: %s", err) 887 } 888 if got, want := resp.CommitStats.MutationCount, int64(1); got != want { 889 t.Fatalf("Mismatch mutation count - got: %d, want: %d", got, want) 890 } 891} 892 893func TestClient_ReadWriteStmtBasedTransactionWithOptions(t *testing.T) { 894 _, client, teardown := setupMockedTestServer(t) 895 defer teardown() 896 ctx := context.Background() 897 tx, err := NewReadWriteStmtBasedTransactionWithOptions( 898 ctx, 899 client, 900 TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true}}) 901 if err != nil { 902 t.Fatalf("Unexpected error when creating transaction: %v", err) 903 } 904 905 iter := tx.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums)) 906 defer iter.Stop() 907 rowCount := int64(0) 908 for { 909 row, err := iter.Next() 910 if err == iterator.Done { 911 break 912 } 913 if err != nil { 914 t.Fatalf("Unexpected error when fetching query results: %v", err) 915 } 916 var singerID, albumID int64 917 var albumTitle string 918 if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil { 919 t.Fatalf("Unexpected error when getting query data: %v", err) 920 } 921 rowCount++ 922 } 923 resp, err := tx.CommitWithReturnResp(ctx) 924 if err != nil { 925 t.Fatalf("Unexpected error when committing transaction: %v", err) 926 } 927 if rowCount != SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount { 928 t.Errorf("Row count mismatch, got %v, expected %v", rowCount, SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount) 929 } 930 if got, want := resp.CommitStats.MutationCount, int64(1); got != want { 931 t.Fatalf("Mismatch mutation count - got: %d, want: %d", got, want) 932 } 933} 934 935func TestClient_ReadWriteTransaction_DoNotLeakSessionOnPanic(t *testing.T) { 936 // Make sure that there is always only one session in the pool. 937 sc := SessionPoolConfig{ 938 MinOpened: 1, 939 MaxOpened: 1, 940 } 941 _, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{SessionPoolConfig: sc}) 942 defer teardown() 943 ctx := context.Background() 944 945 // If a panic occurs during a transaction, the session will not leak. 946 func() { 947 defer func() { recover() }() 948 949 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 950 panic("cause panic") 951 return nil 952 }) 953 if err != nil { 954 t.Fatalf("Unexpected error during transaction: %v", err) 955 } 956 }() 957 958 if g, w := client.idleSessions.idleList.Len(), 1; g != w { 959 t.Fatalf("idle session count mismatch.\nGot: %v\nWant: %v", g, w) 960 } 961} 962 963func TestClient_SessionNotFound(t *testing.T) { 964 // Ensure we always have at least one session in the pool. 965 sc := SessionPoolConfig{ 966 MinOpened: 1, 967 } 968 server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{SessionPoolConfig: sc}) 969 defer teardown() 970 ctx := context.Background() 971 for { 972 client.idleSessions.mu.Lock() 973 numSessions := client.idleSessions.idleList.Len() 974 client.idleSessions.mu.Unlock() 975 if numSessions > 0 { 976 break 977 } 978 time.After(time.Millisecond) 979 } 980 // Remove the session from the server without the pool knowing it. 981 _, err := server.TestSpanner.DeleteSession(ctx, &sppb.DeleteSessionRequest{Name: client.idleSessions.idleList.Front().Value.(*session).id}) 982 if err != nil { 983 t.Fatalf("Failed to delete session unexpectedly: %v", err) 984 } 985 986 _, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 987 iter := tx.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums)) 988 defer iter.Stop() 989 rowCount := int64(0) 990 for { 991 row, err := iter.Next() 992 if err == iterator.Done { 993 break 994 } 995 if err != nil { 996 return err 997 } 998 var singerID, albumID int64 999 var albumTitle string 1000 if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil { 1001 return err 1002 } 1003 rowCount++ 1004 } 1005 if rowCount != SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount { 1006 return spannerErrorf(codes.FailedPrecondition, "Row count mismatch, got %v, expected %v", rowCount, SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount) 1007 } 1008 return nil 1009 }) 1010 if err != nil { 1011 t.Fatalf("Unexpected error during transaction: %v", err) 1012 } 1013} 1014 1015func TestClient_ReadWriteTransactionExecuteStreamingSqlAborted(t *testing.T) { 1016 t.Parallel() 1017 if err := testReadWriteTransaction(t, map[string]SimulatedExecutionTime{ 1018 MethodExecuteStreamingSql: {Errors: []error{status.Error(codes.Aborted, "Transaction aborted")}}, 1019 }, 2); err != nil { 1020 t.Fatal(err) 1021 } 1022} 1023 1024func TestClient_ReadWriteTransaction_UnavailableOnBeginTransaction(t *testing.T) { 1025 t.Parallel() 1026 if err := testReadWriteTransaction(t, map[string]SimulatedExecutionTime{ 1027 MethodBeginTransaction: {Errors: []error{status.Error(codes.Unavailable, "Unavailable")}}, 1028 }, 1); err != nil { 1029 t.Fatal(err) 1030 } 1031} 1032 1033func TestClient_ReadWriteTransaction_UnavailableOnBeginAndAbortOnCommit(t *testing.T) { 1034 if err := testReadWriteTransaction(t, map[string]SimulatedExecutionTime{ 1035 MethodBeginTransaction: {Errors: []error{status.Error(codes.Unavailable, "Unavailable")}}, 1036 MethodCommitTransaction: {Errors: []error{status.Error(codes.Aborted, "Aborted")}}, 1037 }, 2); err != nil { 1038 t.Fatal(err) 1039 } 1040} 1041 1042func TestClient_ReadWriteTransaction_UnavailableOnExecuteStreamingSql(t *testing.T) { 1043 t.Parallel() 1044 if err := testReadWriteTransaction(t, map[string]SimulatedExecutionTime{ 1045 MethodExecuteStreamingSql: {Errors: []error{status.Error(codes.Unavailable, "Unavailable")}}, 1046 }, 1); err != nil { 1047 t.Fatal(err) 1048 } 1049} 1050 1051func TestClient_ReadWriteTransaction_UnavailableOnBeginAndExecuteStreamingSqlAndTwiceAbortOnCommit(t *testing.T) { 1052 t.Parallel() 1053 if err := testReadWriteTransaction(t, map[string]SimulatedExecutionTime{ 1054 MethodBeginTransaction: {Errors: []error{status.Error(codes.Unavailable, "Unavailable")}}, 1055 MethodExecuteStreamingSql: {Errors: []error{status.Error(codes.Unavailable, "Unavailable")}}, 1056 MethodCommitTransaction: {Errors: []error{status.Error(codes.Aborted, "Aborted"), status.Error(codes.Aborted, "Aborted")}}, 1057 }, 3); err != nil { 1058 t.Fatal(err) 1059 } 1060} 1061 1062func TestClient_ReadWriteTransaction_CommitAborted(t *testing.T) { 1063 t.Parallel() 1064 server, client, teardown := setupMockedTestServer(t) 1065 server.TestSpanner.PutExecutionTime(MethodCommitTransaction, SimulatedExecutionTime{ 1066 Errors: []error{status.Error(codes.Aborted, "Aborted")}, 1067 }) 1068 defer teardown() 1069 ctx := context.Background() 1070 attempts := 0 1071 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 1072 attempts++ 1073 _, err := tx.Update(ctx, Statement{SQL: UpdateBarSetFoo}) 1074 if err != nil { 1075 return err 1076 } 1077 return nil 1078 }) 1079 if err != nil { 1080 t.Fatal(err) 1081 } 1082 if g, w := attempts, 2; g != w { 1083 t.Fatalf("attempt count mismatch:\nWant: %v\nGot: %v", w, g) 1084 } 1085} 1086 1087func TestClient_ReadWriteTransaction_DMLAborted(t *testing.T) { 1088 t.Parallel() 1089 server, client, teardown := setupMockedTestServer(t) 1090 server.TestSpanner.PutExecutionTime(MethodExecuteSql, SimulatedExecutionTime{ 1091 Errors: []error{status.Error(codes.Aborted, "Aborted")}, 1092 }) 1093 defer teardown() 1094 ctx := context.Background() 1095 attempts := 0 1096 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 1097 attempts++ 1098 _, err := tx.Update(ctx, Statement{SQL: UpdateBarSetFoo}) 1099 if err != nil { 1100 return err 1101 } 1102 return nil 1103 }) 1104 if err != nil { 1105 t.Fatal(err) 1106 } 1107 if g, w := attempts, 2; g != w { 1108 t.Fatalf("attempt count mismatch:\nWant: %v\nGot: %v", w, g) 1109 } 1110} 1111 1112func TestClient_ReadWriteTransaction_BatchDMLAborted(t *testing.T) { 1113 t.Parallel() 1114 server, client, teardown := setupMockedTestServer(t) 1115 server.TestSpanner.PutExecutionTime(MethodExecuteBatchDml, SimulatedExecutionTime{ 1116 Errors: []error{status.Error(codes.Aborted, "Aborted")}, 1117 }) 1118 defer teardown() 1119 ctx := context.Background() 1120 attempts := 0 1121 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 1122 attempts++ 1123 _, err := tx.BatchUpdate(ctx, []Statement{{SQL: UpdateBarSetFoo}}) 1124 if err != nil { 1125 return err 1126 } 1127 return nil 1128 }) 1129 if err != nil { 1130 t.Fatal(err) 1131 } 1132 if g, w := attempts, 2; g != w { 1133 t.Fatalf("attempt count mismatch:\nWant: %v\nGot: %v", w, g) 1134 } 1135} 1136 1137func TestClient_ReadWriteTransaction_BatchDMLAbortedHalfway(t *testing.T) { 1138 t.Parallel() 1139 server, client, teardown := setupMockedTestServer(t) 1140 defer teardown() 1141 abortedStatement := "UPDATE FOO_ABORTED SET BAR=1 WHERE ID=2" 1142 server.TestSpanner.PutStatementResult( 1143 abortedStatement, 1144 &StatementResult{ 1145 Type: StatementResultError, 1146 Err: status.Error(codes.Aborted, "Statement was aborted"), 1147 }, 1148 ) 1149 ctx := context.Background() 1150 var updateCounts []int64 1151 attempts := 0 1152 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 1153 attempts++ 1154 if attempts > 1 { 1155 // Replace the aborted result with a real result to prevent the 1156 // transaction from aborting indefinitely. 1157 server.TestSpanner.PutStatementResult( 1158 abortedStatement, 1159 &StatementResult{ 1160 Type: StatementResultUpdateCount, 1161 UpdateCount: 3, 1162 }, 1163 ) 1164 } 1165 var err error 1166 updateCounts, err = tx.BatchUpdate(ctx, []Statement{{SQL: abortedStatement}, {SQL: UpdateBarSetFoo}}) 1167 if err != nil { 1168 return err 1169 } 1170 return nil 1171 }) 1172 if err != nil { 1173 t.Fatal(err) 1174 } 1175 if g, w := attempts, 2; g != w { 1176 t.Fatalf("attempt count mismatch:\nWant: %v\nGot: %v", w, g) 1177 } 1178 if g, w := updateCounts, []int64{3, UpdateBarSetFooRowCount}; !testEqual(w, g) { 1179 t.Fatalf("update count mismatch\nWant: %v\nGot: %v", w, g) 1180 } 1181} 1182 1183func TestClient_ReadWriteTransaction_QueryAborted(t *testing.T) { 1184 t.Parallel() 1185 server, client, teardown := setupMockedTestServer(t) 1186 server.TestSpanner.PutExecutionTime(MethodExecuteStreamingSql, SimulatedExecutionTime{ 1187 Errors: []error{status.Error(codes.Aborted, "Aborted")}, 1188 }) 1189 defer teardown() 1190 ctx := context.Background() 1191 attempts := 0 1192 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 1193 attempts++ 1194 iter := tx.Query(ctx, Statement{SQL: SelectFooFromBar}) 1195 defer iter.Stop() 1196 for { 1197 _, err := iter.Next() 1198 if err == iterator.Done { 1199 break 1200 } 1201 if err != nil { 1202 return err 1203 } 1204 } 1205 return nil 1206 }) 1207 if err != nil { 1208 t.Fatal(err) 1209 } 1210 if g, w := attempts, 2; g != w { 1211 t.Fatalf("attempt count mismatch:\nWant: %v\nGot: %v", w, g) 1212 } 1213} 1214 1215func TestClient_ReadWriteTransaction_AbortedOnExecuteStreamingSqlAndCommit(t *testing.T) { 1216 t.Parallel() 1217 if err := testReadWriteTransaction(t, map[string]SimulatedExecutionTime{ 1218 MethodExecuteStreamingSql: {Errors: []error{status.Error(codes.Aborted, "Aborted")}}, 1219 MethodCommitTransaction: {Errors: []error{status.Error(codes.Aborted, "Aborted"), status.Error(codes.Aborted, "Aborted")}}, 1220 }, 4); err != nil { 1221 t.Fatal(err) 1222 } 1223} 1224 1225func TestClient_ReadWriteTransactionCommitAbortedAndUnavailable(t *testing.T) { 1226 t.Parallel() 1227 if err := testReadWriteTransaction(t, map[string]SimulatedExecutionTime{ 1228 MethodCommitTransaction: { 1229 Errors: []error{ 1230 status.Error(codes.Aborted, "Transaction aborted"), 1231 status.Error(codes.Unavailable, "Unavailable"), 1232 }, 1233 }, 1234 }, 2); err != nil { 1235 t.Fatal(err) 1236 } 1237} 1238 1239func TestClient_ReadWriteTransactionCommitAlreadyExists(t *testing.T) { 1240 t.Parallel() 1241 if err := testReadWriteTransaction(t, map[string]SimulatedExecutionTime{ 1242 MethodCommitTransaction: {Errors: []error{status.Error(codes.AlreadyExists, "A row with this key already exists")}}, 1243 }, 1); err != nil { 1244 if status.Code(err) != codes.AlreadyExists { 1245 t.Fatalf("Got unexpected error %v, expected %v", err, codes.AlreadyExists) 1246 } 1247 } else { 1248 t.Fatalf("Missing expected exception") 1249 } 1250} 1251 1252func testReadWriteTransaction(t *testing.T, executionTimes map[string]SimulatedExecutionTime, expectedAttempts int) error { 1253 return testReadWriteTransactionWithConfig(t, ClientConfig{SessionPoolConfig: DefaultSessionPoolConfig}, executionTimes, expectedAttempts) 1254} 1255 1256func testReadWriteTransactionWithConfig(t *testing.T, config ClientConfig, executionTimes map[string]SimulatedExecutionTime, expectedAttempts int) error { 1257 server, client, teardown := setupMockedTestServer(t) 1258 defer teardown() 1259 for method, exec := range executionTimes { 1260 server.TestSpanner.PutExecutionTime(method, exec) 1261 } 1262 ctx := context.Background() 1263 var attempts int 1264 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 1265 attempts++ 1266 iter := tx.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums)) 1267 defer iter.Stop() 1268 rowCount := int64(0) 1269 for { 1270 row, err := iter.Next() 1271 if err == iterator.Done { 1272 break 1273 } 1274 if err != nil { 1275 return err 1276 } 1277 var singerID, albumID int64 1278 var albumTitle string 1279 if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil { 1280 return err 1281 } 1282 rowCount++ 1283 } 1284 if rowCount != SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount { 1285 return status.Errorf(codes.FailedPrecondition, "Row count mismatch, got %v, expected %v", rowCount, SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount) 1286 } 1287 return nil 1288 }) 1289 if err != nil { 1290 return err 1291 } 1292 if expectedAttempts != attempts { 1293 t.Fatalf("unexpected number of attempts: %d, expected %d", attempts, expectedAttempts) 1294 } 1295 return nil 1296} 1297 1298func TestClient_ApplyAtLeastOnce(t *testing.T) { 1299 t.Parallel() 1300 server, client, teardown := setupMockedTestServer(t) 1301 defer teardown() 1302 ms := []*Mutation{ 1303 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}), 1304 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}), 1305 } 1306 server.TestSpanner.PutExecutionTime(MethodCommitTransaction, 1307 SimulatedExecutionTime{ 1308 Errors: []error{status.Error(codes.Aborted, "Transaction aborted")}, 1309 }) 1310 _, err := client.Apply(context.Background(), ms, ApplyAtLeastOnce()) 1311 if err != nil { 1312 t.Fatal(err) 1313 } 1314} 1315 1316func TestClient_ApplyAtLeastOnceReuseSession(t *testing.T) { 1317 t.Parallel() 1318 server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{ 1319 SessionPoolConfig: SessionPoolConfig{ 1320 MinOpened: 0, 1321 WriteSessions: 0.0, 1322 TrackSessionHandles: true, 1323 }, 1324 }) 1325 defer teardown() 1326 sp := client.idleSessions 1327 ms := []*Mutation{ 1328 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}), 1329 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}), 1330 } 1331 for i := 0; i < 10; i++ { 1332 _, err := client.Apply(context.Background(), ms, ApplyAtLeastOnce()) 1333 if err != nil { 1334 t.Fatal(err) 1335 } 1336 sp.mu.Lock() 1337 if g, w := uint64(sp.idleList.Len())+sp.createReqs, sp.incStep; g != w { 1338 t.Fatalf("idle session count mismatch:\nGot: %v\nWant: %v", g, w) 1339 } 1340 if g, w := uint64(len(server.TestSpanner.DumpSessions())), sp.incStep; g != w { 1341 t.Fatalf("server session count mismatch:\nGot: %v\nWant: %v", g, w) 1342 } 1343 sp.mu.Unlock() 1344 } 1345 // There should be no sessions marked as checked out. 1346 sp.mu.Lock() 1347 g, w := sp.trackedSessionHandles.Len(), 0 1348 sp.mu.Unlock() 1349 if g != w { 1350 t.Fatalf("checked out sessions count mismatch:\nGot: %v\nWant: %v", g, w) 1351 } 1352} 1353 1354func TestClient_ApplyAtLeastOnceInvalidArgument(t *testing.T) { 1355 t.Parallel() 1356 server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{ 1357 SessionPoolConfig: SessionPoolConfig{ 1358 MinOpened: 0, 1359 WriteSessions: 0.0, 1360 TrackSessionHandles: true, 1361 }, 1362 }) 1363 defer teardown() 1364 sp := client.idleSessions 1365 ms := []*Mutation{ 1366 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}), 1367 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}), 1368 } 1369 for i := 0; i < 10; i++ { 1370 server.TestSpanner.PutExecutionTime(MethodCommitTransaction, 1371 SimulatedExecutionTime{ 1372 Errors: []error{status.Error(codes.InvalidArgument, "Invalid data")}, 1373 }) 1374 _, err := client.Apply(context.Background(), ms, ApplyAtLeastOnce()) 1375 if status.Code(err) != codes.InvalidArgument { 1376 t.Fatal(err) 1377 } 1378 sp.mu.Lock() 1379 if g, w := uint64(sp.idleList.Len())+sp.createReqs, sp.incStep; g != w { 1380 t.Fatalf("idle session count mismatch:\nGot: %v\nWant: %v", g, w) 1381 } 1382 if g, w := uint64(len(server.TestSpanner.DumpSessions())), sp.incStep; g != w { 1383 t.Fatalf("server session count mismatch:\nGot: %v\nWant: %v", g, w) 1384 } 1385 sp.mu.Unlock() 1386 } 1387 // There should be no sessions marked as checked out. 1388 client.idleSessions.mu.Lock() 1389 g, w := client.idleSessions.trackedSessionHandles.Len(), 0 1390 client.idleSessions.mu.Unlock() 1391 if g != w { 1392 t.Fatalf("checked out sessions count mismatch:\nGot: %v\nWant: %v", g, w) 1393 } 1394} 1395 1396func TestReadWriteTransaction_ErrUnexpectedEOF(t *testing.T) { 1397 t.Parallel() 1398 _, client, teardown := setupMockedTestServer(t) 1399 defer teardown() 1400 ctx := context.Background() 1401 var attempts int 1402 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 1403 attempts++ 1404 iter := tx.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums)) 1405 defer iter.Stop() 1406 for { 1407 row, err := iter.Next() 1408 if err == iterator.Done { 1409 break 1410 } 1411 if err != nil { 1412 return err 1413 } 1414 var singerID, albumID int64 1415 var albumTitle string 1416 if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil { 1417 return err 1418 } 1419 } 1420 return io.ErrUnexpectedEOF 1421 }) 1422 if err != io.ErrUnexpectedEOF { 1423 t.Fatalf("Missing expected error %v, got %v", io.ErrUnexpectedEOF, err) 1424 } 1425 if attempts != 1 { 1426 t.Fatalf("unexpected number of attempts: %d, expected %d", attempts, 1) 1427 } 1428} 1429 1430func TestReadWriteTransaction_WrapError(t *testing.T) { 1431 t.Parallel() 1432 server, client, teardown := setupMockedTestServer(t) 1433 defer teardown() 1434 // Abort the transaction on both the query as well as commit. 1435 // The first abort error will be wrapped. The client will unwrap the cause 1436 // of the error and retry the transaction. The aborted error on commit 1437 // will not be wrapped, but will also be recognized by the client as an 1438 // abort that should be retried. 1439 server.TestSpanner.PutExecutionTime(MethodExecuteStreamingSql, 1440 SimulatedExecutionTime{ 1441 Errors: []error{status.Error(codes.Aborted, "Transaction aborted")}, 1442 }) 1443 server.TestSpanner.PutExecutionTime(MethodCommitTransaction, 1444 SimulatedExecutionTime{ 1445 Errors: []error{status.Error(codes.Aborted, "Transaction aborted")}, 1446 }) 1447 msg := "query failed" 1448 numAttempts := 0 1449 ctx := context.Background() 1450 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 1451 numAttempts++ 1452 iter := tx.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums)) 1453 defer iter.Stop() 1454 for { 1455 _, err := iter.Next() 1456 if err == iterator.Done { 1457 break 1458 } 1459 if err != nil { 1460 // Wrap the error in another error that implements the 1461 // (xerrors|errors).Wrapper interface. 1462 return &wrappedTestError{err, msg} 1463 } 1464 } 1465 return nil 1466 }) 1467 if err != nil { 1468 t.Fatalf("Unexpected error\nGot: %v\nWant: nil", err) 1469 } 1470 if g, w := numAttempts, 3; g != w { 1471 t.Fatalf("Number of transaction attempts mismatch\nGot: %d\nWant: %d", w, w) 1472 } 1473 1474 // Execute a transaction that returns a non-retryable error that is 1475 // wrapped in a custom error. The transaction should return the custom 1476 // error. 1477 server.TestSpanner.PutExecutionTime(MethodExecuteStreamingSql, 1478 SimulatedExecutionTime{ 1479 Errors: []error{status.Error(codes.NotFound, "Table not found")}, 1480 }) 1481 _, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 1482 numAttempts++ 1483 iter := tx.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums)) 1484 defer iter.Stop() 1485 for { 1486 _, err := iter.Next() 1487 if err == iterator.Done { 1488 break 1489 } 1490 if err != nil { 1491 // Wrap the error in another error that implements the 1492 // (xerrors|errors).Wrapper interface. 1493 return &wrappedTestError{err, msg} 1494 } 1495 } 1496 return nil 1497 }) 1498 if err == nil || err.Error() != msg { 1499 t.Fatalf("Unexpected error\nGot: %v\nWant: %v", err, msg) 1500 } 1501} 1502 1503func TestReadWriteTransaction_WrapSessionNotFoundError(t *testing.T) { 1504 t.Parallel() 1505 server, client, teardown := setupMockedTestServer(t) 1506 defer teardown() 1507 server.TestSpanner.PutExecutionTime(MethodBeginTransaction, 1508 SimulatedExecutionTime{ 1509 Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}, 1510 }) 1511 server.TestSpanner.PutExecutionTime(MethodExecuteStreamingSql, 1512 SimulatedExecutionTime{ 1513 Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}, 1514 }) 1515 server.TestSpanner.PutExecutionTime(MethodCommitTransaction, 1516 SimulatedExecutionTime{ 1517 Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}, 1518 }) 1519 msg := "query failed" 1520 numAttempts := 0 1521 ctx := context.Background() 1522 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 1523 numAttempts++ 1524 iter := tx.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums)) 1525 defer iter.Stop() 1526 for { 1527 _, err := iter.Next() 1528 if err == iterator.Done { 1529 break 1530 } 1531 if err != nil { 1532 // Wrap the error in another error that implements the 1533 // (xerrors|errors).Wrapper interface. 1534 return &wrappedTestError{err, msg} 1535 } 1536 } 1537 return nil 1538 }) 1539 if err != nil { 1540 t.Fatalf("Unexpected error\nGot: %v\nWant: nil", err) 1541 } 1542 // We want 3 attempts. The 'Session not found' error on BeginTransaction 1543 // will not retry the entire transaction, which means that we will have two 1544 // failed attempts and then a successful attempt. 1545 if g, w := numAttempts, 3; g != w { 1546 t.Fatalf("Number of transaction attempts mismatch\nGot: %d\nWant: %d", g, w) 1547 } 1548} 1549 1550func TestClient_WriteStructWithPointers(t *testing.T) { 1551 t.Parallel() 1552 server, client, teardown := setupMockedTestServer(t) 1553 defer teardown() 1554 type T struct { 1555 ID int64 1556 Col1 *string 1557 Col2 []*string 1558 Col3 *bool 1559 Col4 []*bool 1560 Col5 *int64 1561 Col6 []*int64 1562 Col7 *float64 1563 Col8 []*float64 1564 Col9 *time.Time 1565 Col10 []*time.Time 1566 Col11 *civil.Date 1567 Col12 []*civil.Date 1568 } 1569 t1 := T{ 1570 ID: 1, 1571 Col2: []*string{nil}, 1572 Col4: []*bool{nil}, 1573 Col6: []*int64{nil}, 1574 Col8: []*float64{nil}, 1575 Col10: []*time.Time{nil}, 1576 Col12: []*civil.Date{nil}, 1577 } 1578 s := "foo" 1579 b := true 1580 i := int64(100) 1581 f := 3.14 1582 tm := time.Now() 1583 d := civil.DateOf(time.Now()) 1584 t2 := T{ 1585 ID: 2, 1586 Col1: &s, 1587 Col2: []*string{&s}, 1588 Col3: &b, 1589 Col4: []*bool{&b}, 1590 Col5: &i, 1591 Col6: []*int64{&i}, 1592 Col7: &f, 1593 Col8: []*float64{&f}, 1594 Col9: &tm, 1595 Col10: []*time.Time{&tm}, 1596 Col11: &d, 1597 Col12: []*civil.Date{&d}, 1598 } 1599 m1, err := InsertStruct("Tab", &t1) 1600 if err != nil { 1601 t.Fatal(err) 1602 } 1603 m2, err := InsertStruct("Tab", &t2) 1604 if err != nil { 1605 t.Fatal(err) 1606 } 1607 _, err = client.Apply(context.Background(), []*Mutation{m1, m2}) 1608 if err != nil { 1609 t.Fatal(err) 1610 } 1611 requests := drainRequestsFromServer(server.TestSpanner) 1612 for _, req := range requests { 1613 if commit, ok := req.(*sppb.CommitRequest); ok { 1614 if g, w := len(commit.Mutations), 2; w != g { 1615 t.Fatalf("mutation count mismatch\nGot: %v\nWant: %v", g, w) 1616 } 1617 insert := commit.Mutations[0].GetInsert() 1618 // The first insert should contain NULL values and arrays 1619 // containing exactly one NULL element. 1620 for i := 1; i < len(insert.Values[0].Values); i += 2 { 1621 // The non-array columns should contain NULL values. 1622 g, w := insert.Values[0].Values[i].GetKind(), &structpb.Value_NullValue{} 1623 if _, ok := g.(*structpb.Value_NullValue); !ok { 1624 t.Fatalf("type mismatch\nGot: %v\nWant: %v", g, w) 1625 } 1626 // The array columns should not be NULL. 1627 g, wList := insert.Values[0].Values[i+1].GetKind(), &structpb.Value_ListValue{} 1628 if _, ok := g.(*structpb.Value_ListValue); !ok { 1629 t.Fatalf("type mismatch\nGot: %v\nWant: %v", g, wList) 1630 } 1631 // The array should contain 1 NULL value. 1632 if gLength, wLength := len(insert.Values[0].Values[i+1].GetListValue().Values), 1; gLength != wLength { 1633 t.Fatalf("list value length mismatch\nGot: %v\nWant: %v", gLength, wLength) 1634 } 1635 g, w = insert.Values[0].Values[i+1].GetListValue().Values[0].GetKind(), &structpb.Value_NullValue{} 1636 if _, ok := g.(*structpb.Value_NullValue); !ok { 1637 t.Fatalf("type mismatch\nGot: %v\nWant: %v", g, w) 1638 } 1639 } 1640 1641 // The second insert should contain all non-NULL values. 1642 insert = commit.Mutations[1].GetInsert() 1643 for i := 1; i < len(insert.Values[0].Values); i += 2 { 1644 // The non-array columns should contain non-NULL values. 1645 g := insert.Values[0].Values[i].GetKind() 1646 if _, ok := g.(*structpb.Value_NullValue); ok { 1647 t.Fatalf("type mismatch\nGot: %v\nWant: non-NULL value", g) 1648 } 1649 // The array columns should also be non-NULL. 1650 g, wList := insert.Values[0].Values[i+1].GetKind(), &structpb.Value_ListValue{} 1651 if _, ok := g.(*structpb.Value_ListValue); !ok { 1652 t.Fatalf("type mismatch\nGot: %v\nWant: %v", g, wList) 1653 } 1654 // The array should contain exactly 1 non-NULL value. 1655 if gLength, wLength := len(insert.Values[0].Values[i+1].GetListValue().Values), 1; gLength != wLength { 1656 t.Fatalf("list value length mismatch\nGot: %v\nWant: %v", gLength, wLength) 1657 } 1658 g = insert.Values[0].Values[i+1].GetListValue().Values[0].GetKind() 1659 if _, ok := g.(*structpb.Value_NullValue); ok { 1660 t.Fatalf("type mismatch\nGot: %v\nWant: non-NULL value", g) 1661 } 1662 } 1663 } 1664 } 1665} 1666 1667func TestClient_WriteStructWithCustomTypes(t *testing.T) { 1668 t.Parallel() 1669 server, client, teardown := setupMockedTestServer(t) 1670 defer teardown() 1671 type CustomString string 1672 type CustomBool bool 1673 type CustomInt64 int64 1674 type CustomFloat64 float64 1675 type CustomTime time.Time 1676 type CustomDate civil.Date 1677 type T struct { 1678 ID int64 1679 Col1 CustomString 1680 Col2 []CustomString 1681 Col3 CustomBool 1682 Col4 []CustomBool 1683 Col5 CustomInt64 1684 Col6 []CustomInt64 1685 Col7 CustomFloat64 1686 Col8 []CustomFloat64 1687 Col9 CustomTime 1688 Col10 []CustomTime 1689 Col11 CustomDate 1690 Col12 []CustomDate 1691 } 1692 t1 := T{ 1693 ID: 1, 1694 Col2: []CustomString{}, 1695 Col4: []CustomBool{}, 1696 Col6: []CustomInt64{}, 1697 Col8: []CustomFloat64{}, 1698 Col10: []CustomTime{}, 1699 Col12: []CustomDate{}, 1700 } 1701 t2 := T{ 1702 ID: 2, 1703 Col1: "foo", 1704 Col2: []CustomString{"foo"}, 1705 Col3: true, 1706 Col4: []CustomBool{true}, 1707 Col5: 100, 1708 Col6: []CustomInt64{100}, 1709 Col7: 3.14, 1710 Col8: []CustomFloat64{3.14}, 1711 Col9: CustomTime(time.Now()), 1712 Col10: []CustomTime{CustomTime(time.Now())}, 1713 Col11: CustomDate(civil.DateOf(time.Now())), 1714 Col12: []CustomDate{CustomDate(civil.DateOf(time.Now()))}, 1715 } 1716 m1, err := InsertStruct("Tab", &t1) 1717 if err != nil { 1718 t.Fatal(err) 1719 } 1720 m2, err := InsertStruct("Tab", &t2) 1721 if err != nil { 1722 t.Fatal(err) 1723 } 1724 _, err = client.Apply(context.Background(), []*Mutation{m1, m2}) 1725 if err != nil { 1726 t.Fatal(err) 1727 } 1728 requests := drainRequestsFromServer(server.TestSpanner) 1729 for _, req := range requests { 1730 if commit, ok := req.(*sppb.CommitRequest); ok { 1731 if g, w := len(commit.Mutations), 2; w != g { 1732 t.Fatalf("mutation count mismatch\nGot: %v\nWant: %v", g, w) 1733 } 1734 insert1 := commit.Mutations[0].GetInsert() 1735 row1 := insert1.Values[0] 1736 // The first insert should contain empty values and empty arrays 1737 for i := 1; i < len(row1.Values); i += 2 { 1738 // The non-array columns should contain empty values. 1739 g := row1.Values[i].GetKind() 1740 if _, ok := g.(*structpb.Value_NullValue); ok { 1741 t.Fatalf("type mismatch\nGot: %v\nWant: non-NULL value", g) 1742 } 1743 // The array columns should not be NULL. 1744 g, wList := row1.Values[i+1].GetKind(), &structpb.Value_ListValue{} 1745 if _, ok := g.(*structpb.Value_ListValue); !ok { 1746 t.Fatalf("type mismatch\nGot: %v\nWant: %v", g, wList) 1747 } 1748 } 1749 1750 // The second insert should contain all non-NULL values. 1751 insert2 := commit.Mutations[1].GetInsert() 1752 row2 := insert2.Values[0] 1753 for i := 1; i < len(row2.Values); i += 2 { 1754 // The non-array columns should contain non-NULL values. 1755 g := row2.Values[i].GetKind() 1756 if _, ok := g.(*structpb.Value_NullValue); ok { 1757 t.Fatalf("type mismatch\nGot: %v\nWant: non-NULL value", g) 1758 } 1759 // The array columns should also be non-NULL. 1760 g, wList := row2.Values[i+1].GetKind(), &structpb.Value_ListValue{} 1761 if _, ok := g.(*structpb.Value_ListValue); !ok { 1762 t.Fatalf("type mismatch\nGot: %v\nWant: %v", g, wList) 1763 } 1764 // The array should contain exactly 1 non-NULL value. 1765 if gLength, wLength := len(row2.Values[i+1].GetListValue().Values), 1; gLength != wLength { 1766 t.Fatalf("list value length mismatch\nGot: %v\nWant: %v", gLength, wLength) 1767 } 1768 g = row2.Values[i+1].GetListValue().Values[0].GetKind() 1769 if _, ok := g.(*structpb.Value_NullValue); ok { 1770 t.Fatalf("type mismatch\nGot: %v\nWant: non-NULL value", g) 1771 } 1772 } 1773 } 1774 } 1775} 1776 1777func TestReadWriteTransaction_ContextTimeoutDuringCommit(t *testing.T) { 1778 t.Parallel() 1779 server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{ 1780 SessionPoolConfig: SessionPoolConfig{ 1781 MinOpened: 1, 1782 WriteSessions: 0, 1783 }, 1784 }) 1785 defer teardown() 1786 1787 // Wait until session creation has seized so that 1788 // context timeout won't happen while a session is being created. 1789 waitFor(t, func() error { 1790 sp := client.idleSessions 1791 sp.mu.Lock() 1792 defer sp.mu.Unlock() 1793 if sp.createReqs != 0 { 1794 return fmt.Errorf("%d sessions are still in creation", sp.createReqs) 1795 } 1796 return nil 1797 }) 1798 1799 server.TestSpanner.PutExecutionTime(MethodCommitTransaction, 1800 SimulatedExecutionTime{ 1801 MinimumExecutionTime: time.Minute, 1802 }) 1803 ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) 1804 defer cancel() 1805 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 1806 tx.BufferWrite([]*Mutation{Insert("FOO", []string{"ID", "NAME"}, []interface{}{int64(1), "bar"})}) 1807 return nil 1808 }) 1809 errContext, _ := context.WithTimeout(context.Background(), -time.Second) 1810 w := toSpannerErrorWithCommitInfo(errContext.Err(), true).(*Error) 1811 var se *Error 1812 if !errorAs(err, &se) { 1813 t.Fatalf("Error mismatch\nGot: %v\nWant: %v", err, w) 1814 } 1815 if se.GRPCStatus().Code() != w.GRPCStatus().Code() { 1816 t.Fatalf("Error status mismatch:\nGot: %v\nWant: %v", se.GRPCStatus(), w.GRPCStatus()) 1817 } 1818 if se.Error() != w.Error() { 1819 t.Fatalf("Error message mismatch:\nGot %s\nWant: %s", se.Error(), w.Error()) 1820 } 1821 var outcome *TransactionOutcomeUnknownError 1822 if !errorAs(err, &outcome) { 1823 t.Fatalf("Missing wrapped TransactionOutcomeUnknownError error") 1824 } 1825} 1826 1827func TestFailedCommit_NoRollback(t *testing.T) { 1828 t.Parallel() 1829 server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{ 1830 SessionPoolConfig: SessionPoolConfig{ 1831 MinOpened: 0, 1832 MaxOpened: 1, 1833 WriteSessions: 0, 1834 }, 1835 }) 1836 defer teardown() 1837 server.TestSpanner.PutExecutionTime(MethodCommitTransaction, 1838 SimulatedExecutionTime{ 1839 Errors: []error{status.Errorf(codes.InvalidArgument, "Invalid mutations")}, 1840 }) 1841 _, err := client.Apply(context.Background(), []*Mutation{ 1842 Insert("FOO", []string{"ID", "BAR"}, []interface{}{1, "value"}), 1843 }) 1844 if got, want := status.Convert(err).Code(), codes.InvalidArgument; got != want { 1845 t.Fatalf("Error mismatch\nGot: %v\nWant: %v", got, want) 1846 } 1847 // The failed commit should not trigger a rollback after the commit. 1848 if _, err := shouldHaveReceived(server.TestSpanner, []interface{}{ 1849 &sppb.BatchCreateSessionsRequest{}, 1850 &sppb.BeginTransactionRequest{}, 1851 &sppb.CommitRequest{}, 1852 }); err != nil { 1853 t.Fatalf("Received RPCs mismatch: %v", err) 1854 } 1855} 1856 1857func TestFailedUpdate_ShouldRollback(t *testing.T) { 1858 t.Parallel() 1859 server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{ 1860 SessionPoolConfig: SessionPoolConfig{ 1861 MinOpened: 0, 1862 MaxOpened: 1, 1863 WriteSessions: 0, 1864 }, 1865 }) 1866 defer teardown() 1867 server.TestSpanner.PutExecutionTime(MethodExecuteSql, 1868 SimulatedExecutionTime{ 1869 Errors: []error{status.Errorf(codes.InvalidArgument, "Invalid update")}, 1870 }) 1871 _, err := client.ReadWriteTransaction(context.Background(), func(ctx context.Context, tx *ReadWriteTransaction) error { 1872 _, err := tx.Update(ctx, NewStatement("UPDATE FOO SET BAR='value' WHERE ID=1")) 1873 return err 1874 }) 1875 if got, want := status.Convert(err).Code(), codes.InvalidArgument; got != want { 1876 t.Fatalf("Error mismatch\nGot: %v\nWant: %v", got, want) 1877 } 1878 // The failed update should trigger a rollback. 1879 if _, err := shouldHaveReceived(server.TestSpanner, []interface{}{ 1880 &sppb.BatchCreateSessionsRequest{}, 1881 &sppb.BeginTransactionRequest{}, 1882 &sppb.ExecuteSqlRequest{}, 1883 &sppb.RollbackRequest{}, 1884 }); err != nil { 1885 t.Fatalf("Received RPCs mismatch: %v", err) 1886 } 1887} 1888 1889func TestClient_NumChannels(t *testing.T) { 1890 t.Parallel() 1891 1892 configuredNumChannels := 8 1893 _, client, teardown := setupMockedTestServerWithConfig( 1894 t, 1895 ClientConfig{NumChannels: configuredNumChannels}, 1896 ) 1897 defer teardown() 1898 if g, w := client.sc.connPool.Num(), configuredNumChannels; g != w { 1899 t.Fatalf("NumChannels mismatch\nGot: %v\nWant: %v", g, w) 1900 } 1901} 1902 1903func TestClient_WithGRPCConnectionPool(t *testing.T) { 1904 t.Parallel() 1905 1906 configuredConnPool := 8 1907 _, client, teardown := setupMockedTestServerWithConfigAndClientOptions( 1908 t, 1909 ClientConfig{}, 1910 []option.ClientOption{option.WithGRPCConnectionPool(configuredConnPool)}, 1911 ) 1912 defer teardown() 1913 if g, w := client.sc.connPool.Num(), configuredConnPool; g != w { 1914 t.Fatalf("NumChannels mismatch\nGot: %v\nWant: %v", g, w) 1915 } 1916} 1917 1918func TestClient_WithGRPCConnectionPoolAndNumChannels(t *testing.T) { 1919 t.Parallel() 1920 1921 configuredNumChannels := 8 1922 configuredConnPool := 8 1923 _, client, teardown := setupMockedTestServerWithConfigAndClientOptions( 1924 t, 1925 ClientConfig{NumChannels: configuredNumChannels}, 1926 []option.ClientOption{option.WithGRPCConnectionPool(configuredConnPool)}, 1927 ) 1928 defer teardown() 1929 if g, w := client.sc.connPool.Num(), configuredConnPool; g != w { 1930 t.Fatalf("NumChannels mismatch\nGot: %v\nWant: %v", g, w) 1931 } 1932} 1933 1934func TestClient_WithGRPCConnectionPoolAndNumChannels_Misconfigured(t *testing.T) { 1935 t.Parallel() 1936 1937 // Deliberately misconfigure NumChannels and ConnPool. 1938 configuredNumChannels := 8 1939 configuredConnPool := 16 1940 1941 _, opts, serverTeardown := NewMockedSpannerInMemTestServer(t) 1942 defer serverTeardown() 1943 opts = append(opts, option.WithGRPCConnectionPool(configuredConnPool)) 1944 1945 _, err := NewClientWithConfig(context.Background(), "projects/p/instances/i/databases/d", ClientConfig{NumChannels: configuredNumChannels}, opts...) 1946 msg := "Connection pool mismatch:" 1947 if err == nil { 1948 t.Fatalf("Error mismatch\nGot: nil\nWant: %s", msg) 1949 } 1950 var se *Error 1951 if ok := errorAs(err, &se); !ok { 1952 t.Fatalf("Error mismatch\nGot: %v\nWant: An instance of a Spanner error", err) 1953 } 1954 if g, w := se.GRPCStatus().Code(), codes.InvalidArgument; g != w { 1955 t.Fatalf("Error code mismatch\nGot: %v\nWant: %v", g, w) 1956 } 1957 if !strings.Contains(se.Error(), msg) { 1958 t.Fatalf("Error message mismatch\nGot: %s\nWant: %s", se.Error(), msg) 1959 } 1960} 1961 1962func TestClient_CallOptions(t *testing.T) { 1963 t.Parallel() 1964 co := &vkit.CallOptions{ 1965 CreateSession: []gax.CallOption{ 1966 gax.WithRetry(func() gax.Retryer { 1967 return gax.OnCodes([]codes.Code{ 1968 codes.Unavailable, codes.DeadlineExceeded, 1969 }, gax.Backoff{ 1970 Initial: 200 * time.Millisecond, 1971 Max: 30000 * time.Millisecond, 1972 Multiplier: 1.25, 1973 }) 1974 }), 1975 }, 1976 } 1977 1978 _, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{CallOptions: co}) 1979 defer teardown() 1980 1981 c, err := client.sc.nextClient() 1982 if err != nil { 1983 t.Fatalf("failed to get a session client: %v", err) 1984 } 1985 1986 cs := &gax.CallSettings{} 1987 // This is the default retry setting. 1988 c.CallOptions.CreateSession[0].Resolve(cs) 1989 if got, want := fmt.Sprintf("%v", cs.Retry()), "&{{250000000 32000000000 1.3 0} [14]}"; got != want { 1990 t.Fatalf("merged CallOptions is incorrect: got %v, want %v", got, want) 1991 } 1992 1993 // This is the custom retry setting. 1994 c.CallOptions.CreateSession[1].Resolve(cs) 1995 if got, want := fmt.Sprintf("%v", cs.Retry()), "&{{200000000 30000000000 1.25 0} [14 4]}"; got != want { 1996 t.Fatalf("merged CallOptions is incorrect: got %v, want %v", got, want) 1997 } 1998} 1999 2000func TestClient_QueryWithCallOptions(t *testing.T) { 2001 t.Parallel() 2002 co := &vkit.CallOptions{ 2003 ExecuteSql: []gax.CallOption{ 2004 gax.WithRetry(func() gax.Retryer { 2005 return gax.OnCodes([]codes.Code{ 2006 codes.DeadlineExceeded, 2007 }, gax.Backoff{ 2008 Initial: 200 * time.Millisecond, 2009 Max: 30000 * time.Millisecond, 2010 Multiplier: 1.25, 2011 }) 2012 }), 2013 }, 2014 } 2015 server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{CallOptions: co}) 2016 server.TestSpanner.PutExecutionTime(MethodExecuteSql, SimulatedExecutionTime{ 2017 Errors: []error{status.Error(codes.DeadlineExceeded, "Deadline exceeded")}, 2018 }) 2019 defer teardown() 2020 ctx := context.Background() 2021 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 2022 _, err := tx.Update(ctx, Statement{SQL: UpdateBarSetFoo}) 2023 if err != nil { 2024 return err 2025 } 2026 return nil 2027 }) 2028 if err != nil { 2029 t.Fatal(err) 2030 } 2031} 2032 2033func TestClient_ShouldReceiveMetadataForEmptyResultSet(t *testing.T) { 2034 t.Parallel() 2035 2036 server, client, teardown := setupMockedTestServer(t) 2037 // This creates an empty result set. 2038 res := server.CreateSingleRowSingersResult(SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount) 2039 sql := "SELECT SingerId, AlbumId, AlbumTitle FROM Albums WHERE 1=2" 2040 server.TestSpanner.PutStatementResult(sql, res) 2041 defer teardown() 2042 ctx := context.Background() 2043 iter := client.Single().Query(ctx, NewStatement(sql)) 2044 defer iter.Stop() 2045 row, err := iter.Next() 2046 if err != iterator.Done { 2047 t.Errorf("Query result mismatch:\nGot: %v\nWant: <no rows>", row) 2048 } 2049 metadata := iter.Metadata 2050 if metadata == nil { 2051 t.Fatalf("Missing ResultSet Metadata") 2052 } 2053 if metadata.RowType == nil { 2054 t.Fatalf("Missing ResultSet RowType") 2055 } 2056 if metadata.RowType.Fields == nil { 2057 t.Fatalf("Missing ResultSet Fields") 2058 } 2059 if g, w := len(metadata.RowType.Fields), 3; g != w { 2060 t.Fatalf("Field count mismatch\nGot: %v\nWant: %v", g, w) 2061 } 2062 wantFieldNames := []string{"SingerId", "AlbumId", "AlbumTitle"} 2063 for i, w := range wantFieldNames { 2064 g := metadata.RowType.Fields[i].Name 2065 if g != w { 2066 t.Fatalf("Field[%v] name mismatch\nGot: %v\nWant: %v", i, g, w) 2067 } 2068 } 2069 wantFieldTypes := []sppb.TypeCode{sppb.TypeCode_INT64, sppb.TypeCode_INT64, sppb.TypeCode_STRING} 2070 for i, w := range wantFieldTypes { 2071 g := metadata.RowType.Fields[i].Type.Code 2072 if g != w { 2073 t.Fatalf("Field[%v] type mismatch\nGot: %v\nWant: %v", i, g, w) 2074 } 2075 } 2076} 2077 2078func TestClient_EncodeCustomFieldType(t *testing.T) { 2079 t.Parallel() 2080 2081 type typesTable struct { 2082 Int customStructToInt `spanner:"Int"` 2083 String customStructToString `spanner:"String"` 2084 Float customStructToFloat `spanner:"Float"` 2085 Bool customStructToBool `spanner:"Bool"` 2086 Time customStructToTime `spanner:"Time"` 2087 Date customStructToDate `spanner:"Date"` 2088 } 2089 2090 server, client, teardown := setupMockedTestServer(t) 2091 defer teardown() 2092 ctx := context.Background() 2093 2094 d := typesTable{ 2095 Int: customStructToInt{1, 23}, 2096 String: customStructToString{"A", "B"}, 2097 Float: customStructToFloat{1.23, 12.3}, 2098 Bool: customStructToBool{true, false}, 2099 Time: customStructToTime{"A", "B"}, 2100 Date: customStructToDate{"A", "B"}, 2101 } 2102 2103 m, err := InsertStruct("Types", &d) 2104 if err != nil { 2105 t.Fatalf("err: %v", err) 2106 } 2107 2108 ms := []*Mutation{m} 2109 _, err = client.Apply(ctx, ms) 2110 if err != nil { 2111 t.Fatalf("err: %v", err) 2112 } 2113 2114 reqs := drainRequestsFromServer(server.TestSpanner) 2115 2116 for _, req := range reqs { 2117 if commitReq, ok := req.(*sppb.CommitRequest); ok { 2118 val := commitReq.Mutations[0].GetInsert().Values[0] 2119 2120 if got, want := val.Values[0].GetStringValue(), "123"; got != want { 2121 t.Fatalf("value mismatch: got %v (kind %T), want %v", got, val.Values[0].GetKind(), want) 2122 } 2123 if got, want := val.Values[1].GetStringValue(), "A-B"; got != want { 2124 t.Fatalf("value mismatch: got %v (kind %T), want %v", got, val.Values[1].GetKind(), want) 2125 } 2126 if got, want := val.Values[2].GetNumberValue(), float64(123.123); got != want { 2127 t.Fatalf("value mismatch: got %v (kind %T), want %v", got, val.Values[2].GetKind(), want) 2128 } 2129 if got, want := val.Values[3].GetBoolValue(), true; got != want { 2130 t.Fatalf("value mismatch: got %v (kind %T), want %v", got, val.Values[3].GetKind(), want) 2131 } 2132 if got, want := val.Values[4].GetStringValue(), "2016-11-15T15:04:05.999999999Z"; got != want { 2133 t.Fatalf("value mismatch: got %v (kind %T), want %v", got, val.Values[4].GetKind(), want) 2134 } 2135 if got, want := val.Values[5].GetStringValue(), "2016-11-15"; got != want { 2136 t.Fatalf("value mismatch: got %v (kind %T), want %v", got, val.Values[5].GetKind(), want) 2137 } 2138 } 2139 } 2140} 2141 2142func setupDecodeCustomFieldResult(server *MockedSpannerInMemTestServer, stmt string) error { 2143 metadata := &sppb.ResultSetMetadata{ 2144 RowType: &sppb.StructType{ 2145 Fields: []*sppb.StructType_Field{ 2146 {Name: "Int", Type: &sppb.Type{Code: sppb.TypeCode_INT64}}, 2147 {Name: "String", Type: &sppb.Type{Code: sppb.TypeCode_STRING}}, 2148 {Name: "Float", Type: &sppb.Type{Code: sppb.TypeCode_FLOAT64}}, 2149 {Name: "Bool", Type: &sppb.Type{Code: sppb.TypeCode_BOOL}}, 2150 {Name: "Time", Type: &sppb.Type{Code: sppb.TypeCode_TIMESTAMP}}, 2151 {Name: "Date", Type: &sppb.Type{Code: sppb.TypeCode_DATE}}, 2152 }, 2153 }, 2154 } 2155 rowValues := []*structpb.Value{ 2156 {Kind: &structpb.Value_StringValue{StringValue: "123"}}, 2157 {Kind: &structpb.Value_StringValue{StringValue: "A-B"}}, 2158 {Kind: &structpb.Value_NumberValue{NumberValue: float64(123.123)}}, 2159 {Kind: &structpb.Value_BoolValue{BoolValue: true}}, 2160 {Kind: &structpb.Value_StringValue{StringValue: "2016-11-15T15:04:05.999999999Z"}}, 2161 {Kind: &structpb.Value_StringValue{StringValue: "2016-11-15"}}, 2162 } 2163 rows := []*structpb.ListValue{ 2164 {Values: rowValues}, 2165 } 2166 resultSet := &sppb.ResultSet{ 2167 Metadata: metadata, 2168 Rows: rows, 2169 } 2170 result := &StatementResult{ 2171 Type: StatementResultResultSet, 2172 ResultSet: resultSet, 2173 } 2174 return server.TestSpanner.PutStatementResult(stmt, result) 2175} 2176 2177func TestClient_DecodeCustomFieldType(t *testing.T) { 2178 t.Parallel() 2179 2180 type typesTable struct { 2181 Int customStructToInt `spanner:"Int"` 2182 String customStructToString `spanner:"String"` 2183 Float customStructToFloat `spanner:"Float"` 2184 Bool customStructToBool `spanner:"Bool"` 2185 Time customStructToTime `spanner:"Time"` 2186 Date customStructToDate `spanner:"Date"` 2187 } 2188 2189 server, client, teardown := setupMockedTestServer(t) 2190 defer teardown() 2191 2192 query := "SELECT * FROM Types" 2193 setupDecodeCustomFieldResult(server, query) 2194 2195 ctx := context.Background() 2196 stmt := Statement{SQL: query} 2197 iter := client.Single().Query(ctx, stmt) 2198 defer iter.Stop() 2199 2200 var results []typesTable 2201 for { 2202 row, err := iter.Next() 2203 if err == iterator.Done { 2204 break 2205 } 2206 if err != nil { 2207 t.Fatalf("failed to get next: %v", err) 2208 } 2209 2210 var d typesTable 2211 if err := row.ToStruct(&d); err != nil { 2212 t.Fatalf("failed to convert a row to a struct: %v", err) 2213 } 2214 results = append(results, d) 2215 } 2216 2217 if len(results) > 1 { 2218 t.Fatalf("mismatch length of array: got %v, want 1", results) 2219 } 2220 2221 want := typesTable{ 2222 Int: customStructToInt{1, 23}, 2223 String: customStructToString{"A", "B"}, 2224 Float: customStructToFloat{1.23, 12.3}, 2225 Bool: customStructToBool{true, false}, 2226 Time: customStructToTime{"A", "B"}, 2227 Date: customStructToDate{"A", "B"}, 2228 } 2229 got := results[0] 2230 if !testEqual(got, want) { 2231 t.Fatalf("mismatch result: got %v, want %v", got, want) 2232 } 2233} 2234 2235func TestClient_EmulatorWithCredentialsFile(t *testing.T) { 2236 old := os.Getenv("SPANNER_EMULATOR_HOST") 2237 defer os.Setenv("SPANNER_EMULATOR_HOST", old) 2238 2239 os.Setenv("SPANNER_EMULATOR_HOST", "localhost:1234") 2240 2241 client, err := NewClientWithConfig( 2242 context.Background(), 2243 "projects/p/instances/i/databases/d", 2244 ClientConfig{}, 2245 option.WithCredentialsFile("/path/to/key.json"), 2246 ) 2247 defer client.Close() 2248 if err != nil { 2249 t.Fatalf("Failed to create a client with credentials file when running against an emulator: %v", err) 2250 } 2251} 2252 2253func TestBatchReadOnlyTransaction_QueryOptions(t *testing.T) { 2254 ctx := context.Background() 2255 qo := QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{ 2256 OptimizerVersion: "1", 2257 OptimizerStatisticsPackage: "latest", 2258 }} 2259 _, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{QueryOptions: qo}) 2260 defer teardown() 2261 2262 txn, err := client.BatchReadOnlyTransaction(ctx, StrongRead()) 2263 if err != nil { 2264 t.Fatal(err) 2265 } 2266 defer txn.Cleanup(ctx) 2267 2268 if txn.qo != qo { 2269 t.Fatalf("Query options are mismatched: got %v, want %v", txn.qo, qo) 2270 } 2271} 2272 2273func TestBatchReadOnlyTransactionFromID_QueryOptions(t *testing.T) { 2274 qo := QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{ 2275 OptimizerVersion: "1", 2276 OptimizerStatisticsPackage: "latest", 2277 }} 2278 _, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{QueryOptions: qo}) 2279 defer teardown() 2280 2281 txn := client.BatchReadOnlyTransactionFromID(BatchReadOnlyTransactionID{}) 2282 2283 if txn.qo != qo { 2284 t.Fatalf("Query options are mismatched: got %v, want %v", txn.qo, qo) 2285 } 2286} 2287 2288type QueryOptionsTestCase struct { 2289 name string 2290 client QueryOptions 2291 env QueryOptions 2292 query QueryOptions 2293 want QueryOptions 2294} 2295 2296func queryOptionsTestCases() []QueryOptionsTestCase { 2297 statsPkg := "latest" 2298 return []QueryOptionsTestCase{ 2299 { 2300 "Client level", 2301 QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1", OptimizerStatisticsPackage: statsPkg}}, 2302 QueryOptions{Options: nil}, 2303 QueryOptions{Options: nil}, 2304 QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1", OptimizerStatisticsPackage: statsPkg}}, 2305 }, 2306 { 2307 "Environment level", 2308 QueryOptions{Options: nil}, 2309 QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1", OptimizerStatisticsPackage: statsPkg}}, 2310 QueryOptions{Options: nil}, 2311 QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1", OptimizerStatisticsPackage: statsPkg}}, 2312 }, 2313 { 2314 "Query level", 2315 QueryOptions{Options: nil}, 2316 QueryOptions{Options: nil}, 2317 QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1", OptimizerStatisticsPackage: statsPkg}}, 2318 QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1", OptimizerStatisticsPackage: statsPkg}}, 2319 }, 2320 { 2321 "Environment level has precedence", 2322 QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1", OptimizerStatisticsPackage: statsPkg}}, 2323 QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "2", OptimizerStatisticsPackage: statsPkg}}, 2324 QueryOptions{Options: nil}, 2325 QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "2", OptimizerStatisticsPackage: statsPkg}}, 2326 }, 2327 { 2328 "Query level has precedence than client level", 2329 QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1", OptimizerStatisticsPackage: statsPkg}}, 2330 QueryOptions{Options: nil}, 2331 QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "3", OptimizerStatisticsPackage: statsPkg}}, 2332 QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "3", OptimizerStatisticsPackage: statsPkg}}, 2333 }, 2334 { 2335 "Query level has highest precedence", 2336 QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1", OptimizerStatisticsPackage: statsPkg}}, 2337 QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "2", OptimizerStatisticsPackage: statsPkg}}, 2338 QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "3", OptimizerStatisticsPackage: statsPkg}}, 2339 QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "3", OptimizerStatisticsPackage: statsPkg}}, 2340 }, 2341 } 2342} 2343 2344func TestClient_DoForEachRow_ShouldNotEndSpanWithIteratorDoneError(t *testing.T) { 2345 // This test cannot be parallel, as the TestExporter does not support that. 2346 te := itestutil.NewTestExporter() 2347 defer te.Unregister() 2348 _, client, teardown := setupMockedTestServer(t) 2349 defer teardown() 2350 2351 iter := client.Single().Query(context.Background(), NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums)) 2352 iter.Do(func(r *Row) error { 2353 return nil 2354 }) 2355 select { 2356 case <-te.Stats: 2357 case <-time.After(1 * time.Second): 2358 t.Fatal("No stats were exported before timeout") 2359 } 2360 if len(te.Spans) == 0 { 2361 t.Fatal("No spans were exported") 2362 } 2363 s := te.Spans[len(te.Spans)-1].Status 2364 if s.Code != int32(codes.OK) { 2365 t.Errorf("Span status mismatch\nGot: %v\nWant: %v", s.Code, codes.OK) 2366 } 2367} 2368 2369func TestClient_DoForEachRow_ShouldEndSpanWithQueryError(t *testing.T) { 2370 // This test cannot be parallel, as the TestExporter does not support that. 2371 te := itestutil.NewTestExporter() 2372 defer te.Unregister() 2373 server, client, teardown := setupMockedTestServer(t) 2374 defer teardown() 2375 2376 sql := "SELECT * FROM" 2377 server.TestSpanner.PutStatementResult(sql, &StatementResult{ 2378 Type: StatementResultError, 2379 Err: status.Error(codes.InvalidArgument, "Invalid query"), 2380 }) 2381 2382 iter := client.Single().Query(context.Background(), NewStatement(sql)) 2383 iter.Do(func(r *Row) error { 2384 return nil 2385 }) 2386 select { 2387 case <-te.Stats: 2388 case <-time.After(1 * time.Second): 2389 t.Fatal("No stats were exported before timeout") 2390 } 2391 if len(te.Spans) == 0 { 2392 t.Fatal("No spans were exported") 2393 } 2394 s := te.Spans[len(te.Spans)-1].Status 2395 if s.Code != int32(codes.InvalidArgument) { 2396 t.Errorf("Span status mismatch\nGot: %v\nWant: %v", s.Code, codes.InvalidArgument) 2397 } 2398} 2399 2400func TestClient_ReadOnlyTransaction_Priority(t *testing.T) { 2401 t.Parallel() 2402 2403 server, client, teardown := setupMockedTestServer(t) 2404 defer teardown() 2405 for _, qo := range []QueryOptions{ 2406 {}, 2407 {Priority: sppb.RequestOptions_PRIORITY_HIGH}, 2408 } { 2409 for _, tx := range []*ReadOnlyTransaction{ 2410 client.Single(), 2411 client.ReadOnlyTransaction(), 2412 } { 2413 iter := tx.QueryWithOptions(context.Background(), NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), qo) 2414 iter.Next() 2415 iter.Stop() 2416 2417 if tx.singleUse { 2418 tx = client.Single() 2419 } 2420 iter = tx.ReadWithOptions(context.Background(), "FOO", AllKeys(), []string{"BAR"}, &ReadOptions{Priority: qo.Priority}) 2421 iter.Next() 2422 iter.Stop() 2423 2424 checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 2, sppb.RequestOptions{Priority: qo.Priority}) 2425 tx.Close() 2426 } 2427 } 2428} 2429 2430func TestClient_ReadWriteTransaction_Priority(t *testing.T) { 2431 t.Parallel() 2432 2433 server, client, teardown := setupMockedTestServer(t) 2434 defer teardown() 2435 for _, to := range []TransactionOptions{ 2436 {}, 2437 {CommitPriority: sppb.RequestOptions_PRIORITY_MEDIUM}, 2438 } { 2439 for _, qo := range []QueryOptions{ 2440 {}, 2441 {Priority: sppb.RequestOptions_PRIORITY_MEDIUM}, 2442 } { 2443 client.ReadWriteTransactionWithOptions(context.Background(), func(ctx context.Context, tx *ReadWriteTransaction) error { 2444 iter := tx.QueryWithOptions(context.Background(), NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), qo) 2445 iter.Next() 2446 iter.Stop() 2447 2448 iter = tx.ReadWithOptions(context.Background(), "FOO", AllKeys(), []string{"BAR"}, &ReadOptions{Priority: qo.Priority}) 2449 iter.Next() 2450 iter.Stop() 2451 2452 tx.UpdateWithOptions(context.Background(), NewStatement(UpdateBarSetFoo), qo) 2453 tx.BatchUpdateWithOptions(context.Background(), []Statement{ 2454 NewStatement(UpdateBarSetFoo), 2455 }, qo) 2456 checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 4, sppb.RequestOptions{Priority: qo.Priority}) 2457 2458 return nil 2459 }, to) 2460 checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{Priority: to.CommitPriority}) 2461 } 2462 } 2463} 2464 2465func TestClient_StmtBasedReadWriteTransaction_Priority(t *testing.T) { 2466 t.Parallel() 2467 2468 server, client, teardown := setupMockedTestServer(t) 2469 defer teardown() 2470 for _, to := range []TransactionOptions{ 2471 {}, 2472 {CommitPriority: sppb.RequestOptions_PRIORITY_LOW}, 2473 } { 2474 for _, qo := range []QueryOptions{ 2475 {}, 2476 {Priority: sppb.RequestOptions_PRIORITY_LOW}, 2477 } { 2478 tx, _ := NewReadWriteStmtBasedTransactionWithOptions(context.Background(), client, to) 2479 iter := tx.QueryWithOptions(context.Background(), NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), qo) 2480 iter.Next() 2481 iter.Stop() 2482 2483 iter = tx.ReadWithOptions(context.Background(), "FOO", AllKeys(), []string{"BAR"}, &ReadOptions{Priority: qo.Priority}) 2484 iter.Next() 2485 iter.Stop() 2486 2487 tx.UpdateWithOptions(context.Background(), NewStatement(UpdateBarSetFoo), qo) 2488 tx.BatchUpdateWithOptions(context.Background(), []Statement{ 2489 NewStatement(UpdateBarSetFoo), 2490 }, qo) 2491 2492 checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 4, sppb.RequestOptions{Priority: qo.Priority}) 2493 tx.Commit(context.Background()) 2494 checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{Priority: to.CommitPriority}) 2495 } 2496 } 2497} 2498 2499func TestClient_PDML_Priority(t *testing.T) { 2500 t.Parallel() 2501 2502 server, client, teardown := setupMockedTestServer(t) 2503 defer teardown() 2504 2505 for _, qo := range []QueryOptions{ 2506 {}, 2507 {Priority: sppb.RequestOptions_PRIORITY_HIGH}, 2508 } { 2509 client.PartitionedUpdateWithOptions(context.Background(), NewStatement(UpdateBarSetFoo), qo) 2510 checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 1, sppb.RequestOptions{Priority: qo.Priority}) 2511 } 2512} 2513 2514func TestClient_Apply_Priority(t *testing.T) { 2515 t.Parallel() 2516 2517 server, client, teardown := setupMockedTestServer(t) 2518 defer teardown() 2519 2520 client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}) 2521 checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{}) 2522 2523 client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}, Priority(sppb.RequestOptions_PRIORITY_HIGH)) 2524 checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{Priority: sppb.RequestOptions_PRIORITY_HIGH}) 2525 2526 client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}, ApplyAtLeastOnce()) 2527 checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{}) 2528 2529 client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}, ApplyAtLeastOnce(), Priority(sppb.RequestOptions_PRIORITY_MEDIUM)) 2530 checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{Priority: sppb.RequestOptions_PRIORITY_MEDIUM}) 2531} 2532 2533func TestClient_ReadOnlyTransaction_Tag(t *testing.T) { 2534 t.Parallel() 2535 2536 server, client, teardown := setupMockedTestServer(t) 2537 defer teardown() 2538 for _, qo := range []QueryOptions{ 2539 {}, 2540 {RequestTag: "tag-1"}, 2541 } { 2542 for _, tx := range []*ReadOnlyTransaction{ 2543 client.Single(), 2544 client.ReadOnlyTransaction(), 2545 } { 2546 iter := tx.QueryWithOptions(context.Background(), NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), qo) 2547 iter.Next() 2548 iter.Stop() 2549 2550 if tx.singleUse { 2551 tx = client.Single() 2552 } 2553 iter = tx.ReadWithOptions(context.Background(), "FOO", AllKeys(), []string{"BAR"}, &ReadOptions{RequestTag: qo.RequestTag}) 2554 iter.Next() 2555 iter.Stop() 2556 2557 checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 2, sppb.RequestOptions{RequestTag: qo.RequestTag}) 2558 tx.Close() 2559 } 2560 } 2561} 2562 2563func TestClient_ReadWriteTransaction_Tag(t *testing.T) { 2564 t.Parallel() 2565 2566 server, client, teardown := setupMockedTestServer(t) 2567 defer teardown() 2568 for _, to := range []TransactionOptions{ 2569 {}, 2570 {TransactionTag: "tx-tag-1"}, 2571 } { 2572 for _, qo := range []QueryOptions{ 2573 {}, 2574 {RequestTag: "request-tag-1"}, 2575 } { 2576 client.ReadWriteTransactionWithOptions(context.Background(), func(ctx context.Context, tx *ReadWriteTransaction) error { 2577 iter := tx.QueryWithOptions(context.Background(), NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), qo) 2578 iter.Next() 2579 iter.Stop() 2580 2581 iter = tx.ReadWithOptions(context.Background(), "FOO", AllKeys(), []string{"BAR"}, &ReadOptions{RequestTag: qo.RequestTag}) 2582 iter.Next() 2583 iter.Stop() 2584 2585 tx.UpdateWithOptions(context.Background(), NewStatement(UpdateBarSetFoo), qo) 2586 tx.BatchUpdateWithOptions(context.Background(), []Statement{ 2587 NewStatement(UpdateBarSetFoo), 2588 }, qo) 2589 2590 // Check for SQL requests inside the transaction to prevent the check to 2591 // drain the commit request from the server. 2592 checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 4, sppb.RequestOptions{RequestTag: qo.RequestTag, TransactionTag: to.TransactionTag}) 2593 return nil 2594 }, to) 2595 checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{TransactionTag: to.TransactionTag}) 2596 } 2597 } 2598} 2599 2600func TestClient_StmtBasedReadWriteTransaction_Tag(t *testing.T) { 2601 t.Parallel() 2602 2603 server, client, teardown := setupMockedTestServer(t) 2604 defer teardown() 2605 for _, to := range []TransactionOptions{ 2606 {}, 2607 {TransactionTag: "tx-tag-1"}, 2608 } { 2609 for _, qo := range []QueryOptions{ 2610 {}, 2611 {RequestTag: "request-tag-1"}, 2612 } { 2613 tx, _ := NewReadWriteStmtBasedTransactionWithOptions(context.Background(), client, to) 2614 iter := tx.QueryWithOptions(context.Background(), NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), qo) 2615 iter.Next() 2616 iter.Stop() 2617 2618 iter = tx.ReadWithOptions(context.Background(), "FOO", AllKeys(), []string{"BAR"}, &ReadOptions{RequestTag: qo.RequestTag}) 2619 iter.Next() 2620 iter.Stop() 2621 2622 tx.UpdateWithOptions(context.Background(), NewStatement(UpdateBarSetFoo), qo) 2623 tx.BatchUpdateWithOptions(context.Background(), []Statement{ 2624 NewStatement(UpdateBarSetFoo), 2625 }, qo) 2626 checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 4, sppb.RequestOptions{RequestTag: qo.RequestTag, TransactionTag: to.TransactionTag}) 2627 2628 tx.Commit(context.Background()) 2629 checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{TransactionTag: to.TransactionTag}) 2630 } 2631 } 2632} 2633 2634func TestClient_PDML_Tag(t *testing.T) { 2635 t.Parallel() 2636 2637 server, client, teardown := setupMockedTestServer(t) 2638 defer teardown() 2639 2640 for _, qo := range []QueryOptions{ 2641 {}, 2642 {RequestTag: "request-tag-1"}, 2643 } { 2644 client.PartitionedUpdateWithOptions(context.Background(), NewStatement(UpdateBarSetFoo), qo) 2645 checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 1, sppb.RequestOptions{RequestTag: qo.RequestTag}) 2646 } 2647} 2648 2649func TestClient_Apply_Tagging(t *testing.T) { 2650 t.Parallel() 2651 2652 server, client, teardown := setupMockedTestServer(t) 2653 defer teardown() 2654 2655 client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}) 2656 checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{}) 2657 2658 client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}, TransactionTag("tx-tag")) 2659 checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{TransactionTag: "tx-tag"}) 2660 2661 client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}, ApplyAtLeastOnce()) 2662 checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{}) 2663 2664 client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}, ApplyAtLeastOnce(), TransactionTag("tx-tag")) 2665 checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{TransactionTag: "tx-tag"}) 2666} 2667 2668func TestClient_PartitionQuery_RequestOptions(t *testing.T) { 2669 t.Parallel() 2670 2671 server, client, teardown := setupMockedTestServer(t) 2672 defer teardown() 2673 2674 for _, qo := range []QueryOptions{ 2675 {}, 2676 {Priority: sppb.RequestOptions_PRIORITY_LOW}, 2677 {RequestTag: "batch-query-tag"}, 2678 {Priority: sppb.RequestOptions_PRIORITY_MEDIUM, RequestTag: "batch-query-with-medium-prio"}, 2679 } { 2680 ctx := context.Background() 2681 txn, _ := client.BatchReadOnlyTransaction(ctx, StrongRead()) 2682 partitions, _ := txn.PartitionQueryWithOptions(ctx, NewStatement(SelectFooFromBar), PartitionOptions{MaxPartitions: 10}, qo) 2683 for _, p := range partitions { 2684 iter := txn.Execute(ctx, p) 2685 iter.Next() 2686 iter.Stop() 2687 } 2688 checkRequestsForExpectedRequestOptions(t, server.TestSpanner, len(partitions), sppb.RequestOptions{RequestTag: qo.RequestTag, Priority: qo.Priority}) 2689 } 2690} 2691 2692func TestClient_PartitionRead_RequestOptions(t *testing.T) { 2693 t.Parallel() 2694 2695 server, client, teardown := setupMockedTestServer(t) 2696 defer teardown() 2697 2698 for _, ro := range []ReadOptions{ 2699 {}, 2700 {Priority: sppb.RequestOptions_PRIORITY_LOW}, 2701 {RequestTag: "batch-read-tag"}, 2702 {Priority: sppb.RequestOptions_PRIORITY_MEDIUM, RequestTag: "batch-read-with-medium-prio"}, 2703 } { 2704 ctx := context.Background() 2705 txn, _ := client.BatchReadOnlyTransaction(ctx, StrongRead()) 2706 partitions, _ := txn.PartitionReadWithOptions(ctx, "Albums", KeySets(Key{"foo"}), []string{"SingerId", "AlbumId", "AlbumTitle"}, PartitionOptions{MaxPartitions: 10}, ro) 2707 for _, p := range partitions { 2708 iter := txn.Execute(ctx, p) 2709 iter.Next() 2710 iter.Stop() 2711 } 2712 checkRequestsForExpectedRequestOptions(t, server.TestSpanner, len(partitions), sppb.RequestOptions{RequestTag: ro.RequestTag, Priority: ro.Priority}) 2713 } 2714} 2715 2716func checkRequestsForExpectedRequestOptions(t *testing.T, server InMemSpannerServer, reqCount int, ro sppb.RequestOptions) { 2717 reqs := drainRequestsFromServer(server) 2718 reqOptions := []*sppb.RequestOptions{} 2719 2720 for _, req := range reqs { 2721 if sqlReq, ok := req.(*sppb.ExecuteSqlRequest); ok { 2722 reqOptions = append(reqOptions, sqlReq.RequestOptions) 2723 } 2724 if batchReq, ok := req.(*sppb.ExecuteBatchDmlRequest); ok { 2725 reqOptions = append(reqOptions, batchReq.RequestOptions) 2726 } 2727 if readReq, ok := req.(*sppb.ReadRequest); ok { 2728 reqOptions = append(reqOptions, readReq.RequestOptions) 2729 } 2730 } 2731 2732 if got, want := len(reqOptions), reqCount; got != want { 2733 t.Fatalf("Requests length mismatch\nGot: %v\nWant: %v", got, want) 2734 } 2735 2736 for _, opts := range reqOptions { 2737 if opts == nil { 2738 opts = &sppb.RequestOptions{} 2739 } 2740 if got, want := opts.Priority, ro.Priority; got != want { 2741 t.Fatalf("Request priority mismatch\nGot: %v\nWant: %v", got, want) 2742 } 2743 if got, want := opts.RequestTag, ro.RequestTag; got != want { 2744 t.Fatalf("Request tag mismatch\nGot: %v\nWant: %v", got, want) 2745 } 2746 if got, want := opts.TransactionTag, ro.TransactionTag; got != want { 2747 t.Fatalf("Transaction tag mismatch\nGot: %v\nWant: %v", got, want) 2748 } 2749 } 2750} 2751 2752func checkCommitForExpectedRequestOptions(t *testing.T, server InMemSpannerServer, ro sppb.RequestOptions) { 2753 reqs := drainRequestsFromServer(server) 2754 var commit *sppb.CommitRequest 2755 var ok bool 2756 2757 for _, req := range reqs { 2758 if commit, ok = req.(*sppb.CommitRequest); ok { 2759 break 2760 } 2761 } 2762 2763 if commit == nil { 2764 t.Fatalf("Missing commit request") 2765 } 2766 2767 var got sppb.RequestOptions_Priority 2768 if commit.RequestOptions != nil { 2769 got = commit.RequestOptions.Priority 2770 } 2771 want := ro.Priority 2772 if got != want { 2773 t.Fatalf("Commit priority mismatch\nGot: %v\nWant: %v", got, want) 2774 } 2775 2776 var requestTag string 2777 var transactionTag string 2778 if commit.RequestOptions != nil { 2779 requestTag = commit.RequestOptions.RequestTag 2780 transactionTag = commit.RequestOptions.TransactionTag 2781 } 2782 if got, want := requestTag, ro.RequestTag; got != want { 2783 t.Fatalf("Commit request tag mismatch\nGot: %v\nWant: %v", got, want) 2784 } 2785 if got, want := transactionTag, ro.TransactionTag; got != want { 2786 t.Fatalf("Commit transaction tag mismatch\nGot: %v\nWant: %v", got, want) 2787 } 2788} 2789 2790func TestClient_Single_Read_WithNumericKey(t *testing.T) { 2791 t.Parallel() 2792 2793 _, client, teardown := setupMockedTestServer(t) 2794 defer teardown() 2795 ctx := context.Background() 2796 iter := client.Single().Read(ctx, "Albums", KeySets(Key{*big.NewRat(1, 1)}), []string{"SingerId", "AlbumId", "AlbumTitle"}) 2797 defer iter.Stop() 2798 rowCount := int64(0) 2799 for { 2800 _, err := iter.Next() 2801 if err == iterator.Done { 2802 break 2803 } 2804 if err != nil { 2805 t.Fatal(err) 2806 } 2807 rowCount++ 2808 } 2809 if rowCount != SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount { 2810 t.Fatalf("row count mismatch\nGot: %v\nWant: %v", rowCount, SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount) 2811 } 2812} 2813 2814func TestClient_CloseWithUnresponsiveBackend(t *testing.T) { 2815 t.Parallel() 2816 2817 minOpened := uint64(5) 2818 server, client, teardown := setupMockedTestServerWithConfig(t, 2819 ClientConfig{ 2820 SessionPoolConfig: SessionPoolConfig{ 2821 MinOpened: minOpened, 2822 }, 2823 }) 2824 defer teardown() 2825 sp := client.idleSessions 2826 2827 waitFor(t, func() error { 2828 sp.mu.Lock() 2829 defer sp.mu.Unlock() 2830 if uint64(sp.idleList.Len()) != minOpened { 2831 return fmt.Errorf("num open sessions mismatch\nWant: %d\nGot: %d", sp.MinOpened, sp.numOpened) 2832 } 2833 return nil 2834 }) 2835 server.TestSpanner.Freeze() 2836 defer server.TestSpanner.Unfreeze() 2837 2838 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) 2839 defer cancel() 2840 sp.close(ctx) 2841 2842 if w, g := context.DeadlineExceeded, ctx.Err(); w != g { 2843 t.Fatalf("context error mismatch\nWant: %v\nGot: %v", w, g) 2844 } 2845} 2846