1/* 2Copyright 2017 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package spanner 18 19import ( 20 "context" 21 "errors" 22 "flag" 23 "fmt" 24 "log" 25 "math" 26 "os" 27 "reflect" 28 "strings" 29 "sync" 30 "testing" 31 "time" 32 33 "cloud.google.com/go/civil" 34 "cloud.google.com/go/internal/testutil" 35 "cloud.google.com/go/internal/uid" 36 database "cloud.google.com/go/spanner/admin/database/apiv1" 37 instance "cloud.google.com/go/spanner/admin/instance/apiv1" 38 "google.golang.org/api/iterator" 39 "google.golang.org/api/option" 40 adminpb "google.golang.org/genproto/googleapis/spanner/admin/database/v1" 41 instancepb "google.golang.org/genproto/googleapis/spanner/admin/instance/v1" 42 sppb "google.golang.org/genproto/googleapis/spanner/v1" 43 "google.golang.org/grpc" 44 "google.golang.org/grpc/codes" 45 "google.golang.org/grpc/status" 46) 47 48var ( 49 // testProjectID specifies the project used for testing. It can be changed 50 // by setting environment variable GCLOUD_TESTS_GOLANG_PROJECT_ID. 51 testProjectID = testutil.ProjID() 52 53 dbNameSpace = uid.NewSpace("gotest", &uid.Options{Sep: '_', Short: true}) 54 instanceNameSpace = uid.NewSpace("gotest", &uid.Options{Sep: '-', Short: true}) 55 testInstanceID = instanceNameSpace.New() 56 57 testTable = "TestTable" 58 testTableIndex = "TestTableByValue" 59 testTableColumns = []string{"Key", "StringValue"} 60 61 databaseAdmin *database.DatabaseAdminClient 62 instanceAdmin *instance.InstanceAdminClient 63 64 singerDBStatements = []string{ 65 `CREATE TABLE Singers ( 66 SingerId INT64 NOT NULL, 67 FirstName STRING(1024), 68 LastName STRING(1024), 69 SingerInfo BYTES(MAX) 70 ) PRIMARY KEY (SingerId)`, 71 `CREATE INDEX SingerByName ON Singers(FirstName, LastName)`, 72 `CREATE TABLE Accounts ( 73 AccountId INT64 NOT NULL, 74 Nickname STRING(100), 75 Balance INT64 NOT NULL, 76 ) PRIMARY KEY (AccountId)`, 77 `CREATE INDEX AccountByNickname ON Accounts(Nickname) STORING (Balance)`, 78 `CREATE TABLE Types ( 79 RowID INT64 NOT NULL, 80 String STRING(MAX), 81 StringArray ARRAY<STRING(MAX)>, 82 Bytes BYTES(MAX), 83 BytesArray ARRAY<BYTES(MAX)>, 84 Int64a INT64, 85 Int64Array ARRAY<INT64>, 86 Bool BOOL, 87 BoolArray ARRAY<BOOL>, 88 Float64 FLOAT64, 89 Float64Array ARRAY<FLOAT64>, 90 Date DATE, 91 DateArray ARRAY<DATE>, 92 Timestamp TIMESTAMP, 93 TimestampArray ARRAY<TIMESTAMP>, 94 ) PRIMARY KEY (RowID)`, 95 } 96 97 readDBStatements = []string{ 98 `CREATE TABLE TestTable ( 99 Key STRING(MAX) NOT NULL, 100 StringValue STRING(MAX) 101 ) PRIMARY KEY (Key)`, 102 `CREATE INDEX TestTableByValue ON TestTable(StringValue)`, 103 `CREATE INDEX TestTableByValueDesc ON TestTable(StringValue DESC)`, 104 } 105 106 simpleDBStatements = []string{ 107 `CREATE TABLE test ( 108 a STRING(1024), 109 b STRING(1024), 110 ) PRIMARY KEY (a)`, 111 } 112 simpleDBTableColumns = []string{"a", "b"} 113 114 ctsDBStatements = []string{ 115 `CREATE TABLE TestTable ( 116 Key STRING(MAX) NOT NULL, 117 Ts TIMESTAMP OPTIONS (allow_commit_timestamp = true), 118 ) PRIMARY KEY (Key)`, 119 } 120) 121 122const ( 123 str1 = "alice" 124 str2 = "a@example.com" 125) 126 127func TestMain(m *testing.M) { 128 cleanup := initIntegrationTests() 129 res := m.Run() 130 cleanup() 131 os.Exit(res) 132} 133 134var grpcHeaderChecker = testutil.DefaultHeadersEnforcer() 135 136func initIntegrationTests() (cleanup func()) { 137 ctx := context.Background() 138 flag.Parse() // Needed for testing.Short(). 139 noop := func() {} 140 141 if testing.Short() { 142 log.Println("Integration tests skipped in -short mode.") 143 return noop 144 } 145 146 if testProjectID == "" { 147 log.Println("Integration tests skipped: GCLOUD_TESTS_GOLANG_PROJECT_ID is missing") 148 return noop 149 } 150 151 opts := grpcHeaderChecker.CallOptions() 152 // Run integration tests against the given emulator. Currently, the database and 153 // instance admin clients are auto-generated, which do not support to configure 154 // SPANNER_EMULATOR_HOST. 155 emulatorAddr := os.Getenv("SPANNER_EMULATOR_HOST") 156 if emulatorAddr != "" { 157 opts = append( 158 opts, 159 option.WithEndpoint(emulatorAddr), 160 option.WithGRPCDialOption(grpc.WithInsecure()), 161 option.WithoutAuthentication(), 162 ) 163 } else { 164 ts := testutil.TokenSource(ctx, AdminScope, Scope) 165 if ts == nil { 166 log.Printf("Integration test skipped: cannot get service account credential from environment variable %v", "GCLOUD_TESTS_GOLANG_KEY") 167 return noop 168 } 169 170 opts = append(opts, option.WithTokenSource(ts), option.WithEndpoint(endpoint)) 171 } 172 173 var err error 174 // Create InstanceAdmin and DatabaseAdmin clients. 175 instanceAdmin, err = instance.NewInstanceAdminClient(ctx, opts...) 176 if err != nil { 177 log.Fatalf("cannot create instance databaseAdmin client: %v", err) 178 } 179 databaseAdmin, err = database.NewDatabaseAdminClient(ctx, opts...) 180 if err != nil { 181 log.Fatalf("cannot create databaseAdmin client: %v", err) 182 } 183 // Get the list of supported instance configs for the project that is used 184 // for the integration tests. The supported instance configs can differ per 185 // project. The integration tests will use the first instance config that 186 // is returned by Cloud Spanner. This will normally be the regional config 187 // that is physically the closest to where the request is coming from. 188 configIterator := instanceAdmin.ListInstanceConfigs(ctx, &instancepb.ListInstanceConfigsRequest{ 189 Parent: fmt.Sprintf("projects/%s", testProjectID), 190 }) 191 config, err := configIterator.Next() 192 if err != nil { 193 log.Fatalf("Cannot get any instance configurations.\nPlease make sure the Cloud Spanner API is enabled for the test project.\nGet error: %v", err) 194 } 195 // Create a test instance to use for this test run. 196 op, err := instanceAdmin.CreateInstance(ctx, &instancepb.CreateInstanceRequest{ 197 Parent: fmt.Sprintf("projects/%s", testProjectID), 198 InstanceId: testInstanceID, 199 Instance: &instancepb.Instance{ 200 Config: config.Name, 201 DisplayName: testInstanceID, 202 NodeCount: 1, 203 }, 204 }) 205 if err != nil { 206 log.Fatalf("could not create instance with id %s: %v", fmt.Sprintf("projects/%s/instances/%s", testProjectID, testInstanceID), err) 207 } 208 // Wait for the instance creation to finish. 209 i, err := op.Wait(ctx) 210 if err != nil { 211 log.Fatalf("waiting for instance creation to finish failed: %v", err) 212 } 213 if i.State != instancepb.Instance_READY { 214 log.Printf("instance state is not READY, it might be that the test instance will cause problems during tests. Got state %v\n", i.State) 215 } 216 217 return func() { 218 // Delete this test instance. 219 instanceName := fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID) 220 if err = instanceAdmin.DeleteInstance(ctx, &instancepb.DeleteInstanceRequest{ 221 Name: instanceName, 222 }); err != nil { 223 log.Printf("failed to drop instance %s (error %v), might need a manual removal", 224 instanceName, err) 225 } 226 // Delete other test instances that may be lingering around. 227 cleanupInstances() 228 databaseAdmin.Close() 229 instanceAdmin.Close() 230 } 231} 232 233func TestIntegration_InitSessionPool(t *testing.T) { 234 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 235 defer cancel() 236 // Set up an empty testing environment. 237 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, []string{}) 238 defer cleanup() 239 sp := client.idleSessions 240 sp.mu.Lock() 241 want := sp.MinOpened 242 sp.mu.Unlock() 243 var numOpened int 244loop: 245 for { 246 select { 247 case <-ctx.Done(): 248 t.Fatalf("timed out, got %d session(s), want %d", numOpened, want) 249 default: 250 sp.mu.Lock() 251 numOpened = sp.idleList.Len() + sp.idleWriteList.Len() 252 sp.mu.Unlock() 253 if uint64(numOpened) == want { 254 break loop 255 } 256 } 257 } 258 // Delete all sessions in the pool on the backend and then try to execute a 259 // simple query. The 'Session not found' error should cause an automatic 260 // retry of the read-only transaction. 261 sp.mu.Lock() 262 s := sp.idleList.Front() 263 for { 264 if s == nil { 265 break 266 } 267 // This will delete the session on the backend without removing it 268 // from the pool. 269 s.Value.(*session).delete(context.Background()) 270 s = s.Next() 271 } 272 sp.mu.Unlock() 273 sql := "SELECT 1, 'FOO', 'BAR'" 274 tx := client.ReadOnlyTransaction() 275 defer tx.Close() 276 iter := tx.Query(context.Background(), NewStatement(sql)) 277 rows, err := readAll(iter) 278 if err != nil { 279 t.Fatalf("Unexpected error for query %q: %v", sql, err) 280 } 281 if got, want := len(rows), 1; got != want { 282 t.Fatalf("Row count mismatch for query %q\nGot: %v\nWant: %v", sql, got, want) 283 } 284 if got, want := len(rows[0]), 3; got != want { 285 t.Fatalf("Column count mismatch for query %q\nGot: %v\nWant: %v", sql, got, want) 286 } 287 if got, want := rows[0][0].(int64), int64(1); got != want { 288 t.Fatalf("Column value mismatch for query %q\nGot: %v\nWant: %v", sql, got, want) 289 } 290} 291 292// Test SingleUse transaction. 293func TestIntegration_SingleUse(t *testing.T) { 294 t.Parallel() 295 296 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 297 defer cancel() 298 // Set up testing environment. 299 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 300 defer cleanup() 301 302 writes := []struct { 303 row []interface{} 304 ts time.Time 305 }{ 306 {row: []interface{}{1, "Marc", "Foo"}}, 307 {row: []interface{}{2, "Tars", "Bar"}}, 308 {row: []interface{}{3, "Alpha", "Beta"}}, 309 {row: []interface{}{4, "Last", "End"}}, 310 } 311 // Try to write four rows through the Apply API. 312 for i, w := range writes { 313 var err error 314 m := InsertOrUpdate("Singers", 315 []string{"SingerId", "FirstName", "LastName"}, 316 w.row) 317 if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil { 318 t.Fatal(err) 319 } 320 } 321 // Calculate time difference between Cloud Spanner server and localhost to 322 // use to determine the exact staleness value to use. 323 timeDiff := maxDuration(time.Now().Sub(writes[0].ts), 0) 324 325 // Test reading rows with different timestamp bounds. 326 for i, test := range []struct { 327 name string 328 want [][]interface{} 329 tb TimestampBound 330 checkTs func(time.Time) error 331 }{ 332 { 333 name: "strong", 334 want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}}, 335 tb: StrongRead(), 336 checkTs: func(ts time.Time) error { 337 // writes[3] is the last write, all subsequent strong read 338 // should have a timestamp larger than that. 339 if ts.Before(writes[3].ts) { 340 return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[3].ts) 341 } 342 return nil 343 }, 344 }, 345 { 346 name: "min_read_timestamp", 347 want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}}, 348 tb: MinReadTimestamp(writes[3].ts), 349 checkTs: func(ts time.Time) error { 350 if ts.Before(writes[3].ts) { 351 return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[3].ts) 352 } 353 return nil 354 }, 355 }, 356 { 357 name: "max_staleness", 358 want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}}, 359 tb: MaxStaleness(time.Second), 360 checkTs: func(ts time.Time) error { 361 if ts.Before(writes[3].ts) { 362 return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[3].ts) 363 } 364 return nil 365 }, 366 }, 367 { 368 name: "read_timestamp", 369 want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}}, 370 tb: ReadTimestamp(writes[2].ts), 371 checkTs: func(ts time.Time) error { 372 if ts != writes[2].ts { 373 return fmt.Errorf("read got timestamp %v, want %v", ts, writes[2].ts) 374 } 375 return nil 376 }, 377 }, 378 { 379 name: "exact_staleness", 380 want: nil, 381 // Specify a staleness which should be already before this test. 382 tb: ExactStaleness(time.Now().Sub(writes[0].ts) + timeDiff + 30*time.Second), 383 checkTs: func(ts time.Time) error { 384 if !ts.Before(writes[0].ts) { 385 return fmt.Errorf("read got timestamp %v, want it to be earlier than %v", ts, writes[0].ts) 386 } 387 return nil 388 }, 389 }, 390 } { 391 t.Run(test.name, func(t *testing.T) { 392 // SingleUse.Query 393 su := client.Single().WithTimestampBound(test.tb) 394 got, err := readAll(su.Query( 395 ctx, 396 Statement{ 397 "SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId IN (@id1, @id3, @id4) ORDER BY SingerId", 398 map[string]interface{}{"id1": int64(1), "id3": int64(3), "id4": int64(4)}, 399 })) 400 if err != nil { 401 t.Fatalf("%d: SingleUse.Query returns error %v, want nil", i, err) 402 } 403 rts, err := su.Timestamp() 404 if err != nil { 405 t.Fatalf("%d: SingleUse.Query doesn't return a timestamp, error: %v", i, err) 406 } 407 if err := test.checkTs(rts); err != nil { 408 t.Fatalf("%d: SingleUse.Query doesn't return expected timestamp: %v", i, err) 409 } 410 if !testEqual(got, test.want) { 411 t.Fatalf("%d: got unexpected result from SingleUse.Query: %v, want %v", i, got, test.want) 412 } 413 // SingleUse.Read 414 su = client.Single().WithTimestampBound(test.tb) 415 got, err = readAll(su.Read(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}), []string{"SingerId", "FirstName", "LastName"})) 416 if err != nil { 417 t.Fatalf("%d: SingleUse.Read returns error %v, want nil", i, err) 418 } 419 rts, err = su.Timestamp() 420 if err != nil { 421 t.Fatalf("%d: SingleUse.Read doesn't return a timestamp, error: %v", i, err) 422 } 423 if err := test.checkTs(rts); err != nil { 424 t.Fatalf("%d: SingleUse.Read doesn't return expected timestamp: %v", i, err) 425 } 426 if !testEqual(got, test.want) { 427 t.Fatalf("%d: got unexpected result from SingleUse.Read: %v, want %v", i, got, test.want) 428 } 429 // SingleUse.ReadRow 430 got = nil 431 for _, k := range []Key{{1}, {3}, {4}} { 432 su = client.Single().WithTimestampBound(test.tb) 433 r, err := su.ReadRow(ctx, "Singers", k, []string{"SingerId", "FirstName", "LastName"}) 434 if err != nil { 435 continue 436 } 437 v, err := rowToValues(r) 438 if err != nil { 439 continue 440 } 441 got = append(got, v) 442 rts, err = su.Timestamp() 443 if err != nil { 444 t.Fatalf("%d: SingleUse.ReadRow(%v) doesn't return a timestamp, error: %v", i, k, err) 445 } 446 if err := test.checkTs(rts); err != nil { 447 t.Fatalf("%d: SingleUse.ReadRow(%v) doesn't return expected timestamp: %v", i, k, err) 448 } 449 } 450 if !testEqual(got, test.want) { 451 t.Fatalf("%d: got unexpected results from SingleUse.ReadRow: %v, want %v", i, got, test.want) 452 } 453 // SingleUse.ReadUsingIndex 454 su = client.Single().WithTimestampBound(test.tb) 455 got, err = readAll(su.ReadUsingIndex(ctx, "Singers", "SingerByName", KeySets(Key{"Marc", "Foo"}, Key{"Alpha", "Beta"}, Key{"Last", "End"}), []string{"SingerId", "FirstName", "LastName"})) 456 if err != nil { 457 t.Fatalf("%d: SingleUse.ReadUsingIndex returns error %v, want nil", i, err) 458 } 459 // The results from ReadUsingIndex is sorted by the index rather than primary key. 460 if len(got) != len(test.want) { 461 t.Fatalf("%d: got unexpected result from SingleUse.ReadUsingIndex: %v, want %v", i, got, test.want) 462 } 463 for j, g := range got { 464 if j > 0 { 465 prev := got[j-1][1].(string) + got[j-1][2].(string) 466 curr := got[j][1].(string) + got[j][2].(string) 467 if strings.Compare(prev, curr) > 0 { 468 t.Fatalf("%d: SingleUse.ReadUsingIndex fails to order rows by index keys, %v should be after %v", i, got[j-1], got[j]) 469 } 470 } 471 found := false 472 for _, w := range test.want { 473 if testEqual(g, w) { 474 found = true 475 } 476 } 477 if !found { 478 t.Fatalf("%d: got unexpected result from SingleUse.ReadUsingIndex: %v, want %v", i, got, test.want) 479 break 480 } 481 } 482 rts, err = su.Timestamp() 483 if err != nil { 484 t.Fatalf("%d: SingleUse.ReadUsingIndex doesn't return a timestamp, error: %v", i, err) 485 } 486 if err := test.checkTs(rts); err != nil { 487 t.Fatalf("%d: SingleUse.ReadUsingIndex doesn't return expected timestamp: %v", i, err) 488 } 489 // SingleUse.ReadRowUsingIndex 490 got = nil 491 for _, k := range []Key{{"Marc", "Foo"}, {"Alpha", "Beta"}, {"Last", "End"}} { 492 su = client.Single().WithTimestampBound(test.tb) 493 r, err := su.ReadRowUsingIndex(ctx, "Singers", "SingerByName", k, []string{"SingerId", "FirstName", "LastName"}) 494 if err != nil { 495 continue 496 } 497 v, err := rowToValues(r) 498 if err != nil { 499 continue 500 } 501 got = append(got, v) 502 rts, err = su.Timestamp() 503 if err != nil { 504 t.Fatalf("%d: SingleUse.ReadRowUsingIndex(%v) doesn't return a timestamp, error: %v", i, k, err) 505 } 506 if err := test.checkTs(rts); err != nil { 507 t.Fatalf("%d: SingleUse.ReadRowUsingIndex(%v) doesn't return expected timestamp: %v", i, k, err) 508 } 509 } 510 if !testEqual(got, test.want) { 511 t.Fatalf("%d: got unexpected results from SingleUse.ReadRowUsingIndex: %v, want %v", i, got, test.want) 512 } 513 }) 514 } 515} 516 517// Test resource-based routing enabled. 518func TestIntegration_SingleUse_WithResourceBasedRouting(t *testing.T) { 519 os.Setenv("GOOGLE_CLOUD_SPANNER_ENABLE_RESOURCE_BASED_ROUTING", "true") 520 defer os.Setenv("GOOGLE_CLOUD_SPANNER_ENABLE_RESOURCE_BASED_ROUTING", "") 521 522 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 523 defer cancel() 524 // Set up testing environment. 525 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 526 defer cleanup() 527 528 writes := []struct { 529 row []interface{} 530 ts time.Time 531 }{ 532 {row: []interface{}{1, "Marc", "Foo"}}, 533 {row: []interface{}{2, "Tars", "Bar"}}, 534 {row: []interface{}{3, "Alpha", "Beta"}}, 535 {row: []interface{}{4, "Last", "End"}}, 536 } 537 // Try to write four rows through the Apply API. 538 for i, w := range writes { 539 var err error 540 m := InsertOrUpdate("Singers", 541 []string{"SingerId", "FirstName", "LastName"}, 542 w.row) 543 if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil { 544 t.Fatal(err) 545 } 546 } 547 548 row, err := client.Single().ReadRow(ctx, "Singers", Key{3}, []string{"FirstName"}) 549 if err != nil { 550 t.Errorf("SingleUse.ReadRow returns error %v, want nil", err) 551 } 552 var got string 553 if err := row.Column(0, &got); err != nil { 554 t.Errorf("row.Column returns error %v, want nil", err) 555 } 556 if want := "Alpha"; got != want { 557 t.Errorf("got %q, want %q", got, want) 558 } 559} 560 561// Test custom query options provided on query-level configuration. 562func TestIntegration_SingleUse_WithQueryOptions(t *testing.T) { 563 skipEmulatorTest(t) 564 t.Parallel() 565 566 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 567 defer cancel() 568 // Set up testing environment. 569 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 570 defer cleanup() 571 572 writes := []struct { 573 row []interface{} 574 ts time.Time 575 }{ 576 {row: []interface{}{1, "Marc", "Foo"}}, 577 {row: []interface{}{2, "Tars", "Bar"}}, 578 {row: []interface{}{3, "Alpha", "Beta"}}, 579 {row: []interface{}{4, "Last", "End"}}, 580 } 581 // Try to write four rows through the Apply API. 582 for i, w := range writes { 583 var err error 584 m := InsertOrUpdate("Singers", 585 []string{"SingerId", "FirstName", "LastName"}, 586 w.row) 587 if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil { 588 t.Fatal(err) 589 } 590 } 591 qo := QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1"}} 592 got, err := readAll(client.Single().QueryWithOptions(ctx, Statement{ 593 "SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId IN (@id1, @id3, @id4)", 594 map[string]interface{}{"id1": int64(1), "id3": int64(3), "id4": int64(4)}, 595 }, qo)) 596 597 if err != nil { 598 t.Errorf("ReadOnlyTransaction.QueryWithOptions returns error %v, want nil", err) 599 } 600 601 want := [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}} 602 if !testEqual(got, want) { 603 t.Errorf("got unexpected result from ReadOnlyTransaction.QueryWithOptions: %v, want %v", got, want) 604 } 605} 606 607func TestIntegration_SingleUse_ReadingWithLimit(t *testing.T) { 608 t.Parallel() 609 610 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 611 defer cancel() 612 // Set up testing environment. 613 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 614 defer cleanup() 615 616 writes := []struct { 617 row []interface{} 618 ts time.Time 619 }{ 620 {row: []interface{}{1, "Marc", "Foo"}}, 621 {row: []interface{}{2, "Tars", "Bar"}}, 622 {row: []interface{}{3, "Alpha", "Beta"}}, 623 {row: []interface{}{4, "Last", "End"}}, 624 } 625 // Try to write four rows through the Apply API. 626 for i, w := range writes { 627 var err error 628 m := InsertOrUpdate("Singers", 629 []string{"SingerId", "FirstName", "LastName"}, 630 w.row) 631 if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil { 632 t.Fatal(err) 633 } 634 } 635 636 su := client.Single() 637 const limit = 1 638 gotRows, err := readAll(su.ReadWithOptions(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}), 639 []string{"SingerId", "FirstName", "LastName"}, &ReadOptions{Limit: limit})) 640 if err != nil { 641 t.Errorf("SingleUse.ReadWithOptions returns error %v, want nil", err) 642 } 643 if got, want := len(gotRows), limit; got != want { 644 t.Errorf("got %d, want %d", got, want) 645 } 646} 647 648// Test ReadOnlyTransaction. The testsuite is mostly like SingleUse, except it 649// also tests for a single timestamp across multiple reads. 650func TestIntegration_ReadOnlyTransaction(t *testing.T) { 651 t.Parallel() 652 653 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 654 defer cancel() 655 // Set up testing environment. 656 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 657 defer cleanup() 658 659 writes := []struct { 660 row []interface{} 661 ts time.Time 662 }{ 663 {row: []interface{}{1, "Marc", "Foo"}}, 664 {row: []interface{}{2, "Tars", "Bar"}}, 665 {row: []interface{}{3, "Alpha", "Beta"}}, 666 {row: []interface{}{4, "Last", "End"}}, 667 } 668 // Try to write four rows through the Apply API. 669 for i, w := range writes { 670 var err error 671 m := InsertOrUpdate("Singers", 672 []string{"SingerId", "FirstName", "LastName"}, 673 w.row) 674 if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil { 675 t.Fatal(err) 676 } 677 } 678 679 // For testing timestamp bound staleness. 680 <-time.After(time.Second) 681 682 // Test reading rows with different timestamp bounds. 683 for i, test := range []struct { 684 want [][]interface{} 685 tb TimestampBound 686 checkTs func(time.Time) error 687 }{ 688 // Note: min_read_timestamp and max_staleness are not supported by 689 // ReadOnlyTransaction. See API document for more details. 690 { 691 // strong 692 [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}}, 693 StrongRead(), 694 func(ts time.Time) error { 695 if ts.Before(writes[3].ts) { 696 return fmt.Errorf("read got timestamp %v, want it to be no later than %v", ts, writes[3].ts) 697 } 698 return nil 699 }, 700 }, 701 { 702 // read_timestamp 703 [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}}, 704 ReadTimestamp(writes[2].ts), 705 func(ts time.Time) error { 706 if ts != writes[2].ts { 707 return fmt.Errorf("read got timestamp %v, expect %v", ts, writes[2].ts) 708 } 709 return nil 710 }, 711 }, 712 { 713 // exact_staleness 714 nil, 715 // Specify a staleness which should be already before this test 716 // because context timeout is set to be 10s. 717 ExactStaleness(11 * time.Second), 718 func(ts time.Time) error { 719 if ts.After(writes[0].ts) { 720 return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[0].ts) 721 } 722 return nil 723 }, 724 }, 725 } { 726 // ReadOnlyTransaction.Query 727 ro := client.ReadOnlyTransaction().WithTimestampBound(test.tb) 728 got, err := readAll(ro.Query( 729 ctx, 730 Statement{ 731 "SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId IN (@id1, @id3, @id4) ORDER BY SingerId", 732 map[string]interface{}{"id1": int64(1), "id3": int64(3), "id4": int64(4)}, 733 })) 734 if err != nil { 735 t.Errorf("%d: ReadOnlyTransaction.Query returns error %v, want nil", i, err) 736 } 737 if !testEqual(got, test.want) { 738 t.Errorf("%d: got unexpected result from ReadOnlyTransaction.Query: %v, want %v", i, got, test.want) 739 } 740 rts, err := ro.Timestamp() 741 if err != nil { 742 t.Errorf("%d: ReadOnlyTransaction.Query doesn't return a timestamp, error: %v", i, err) 743 } 744 if err := test.checkTs(rts); err != nil { 745 t.Errorf("%d: ReadOnlyTransaction.Query doesn't return expected timestamp: %v", i, err) 746 } 747 roTs := rts 748 // ReadOnlyTransaction.Read 749 got, err = readAll(ro.Read(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}), []string{"SingerId", "FirstName", "LastName"})) 750 if err != nil { 751 t.Errorf("%d: ReadOnlyTransaction.Read returns error %v, want nil", i, err) 752 } 753 if !testEqual(got, test.want) { 754 t.Errorf("%d: got unexpected result from ReadOnlyTransaction.Read: %v, want %v", i, got, test.want) 755 } 756 rts, err = ro.Timestamp() 757 if err != nil { 758 t.Errorf("%d: ReadOnlyTransaction.Read doesn't return a timestamp, error: %v", i, err) 759 } 760 if err := test.checkTs(rts); err != nil { 761 t.Errorf("%d: ReadOnlyTransaction.Read doesn't return expected timestamp: %v", i, err) 762 } 763 if roTs != rts { 764 t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts) 765 } 766 // ReadOnlyTransaction.ReadRow 767 got = nil 768 for _, k := range []Key{{1}, {3}, {4}} { 769 r, err := ro.ReadRow(ctx, "Singers", k, []string{"SingerId", "FirstName", "LastName"}) 770 if err != nil { 771 continue 772 } 773 v, err := rowToValues(r) 774 if err != nil { 775 continue 776 } 777 got = append(got, v) 778 rts, err = ro.Timestamp() 779 if err != nil { 780 t.Errorf("%d: ReadOnlyTransaction.ReadRow(%v) doesn't return a timestamp, error: %v", i, k, err) 781 } 782 if err := test.checkTs(rts); err != nil { 783 t.Errorf("%d: ReadOnlyTransaction.ReadRow(%v) doesn't return expected timestamp: %v", i, k, err) 784 } 785 if roTs != rts { 786 t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts) 787 } 788 } 789 if !testEqual(got, test.want) { 790 t.Errorf("%d: got unexpected results from ReadOnlyTransaction.ReadRow: %v, want %v", i, got, test.want) 791 } 792 // SingleUse.ReadUsingIndex 793 got, err = readAll(ro.ReadUsingIndex(ctx, "Singers", "SingerByName", KeySets(Key{"Marc", "Foo"}, Key{"Alpha", "Beta"}, Key{"Last", "End"}), []string{"SingerId", "FirstName", "LastName"})) 794 if err != nil { 795 t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex returns error %v, want nil", i, err) 796 } 797 // The results from ReadUsingIndex is sorted by the index rather than 798 // primary key. 799 if len(got) != len(test.want) { 800 t.Errorf("%d: got unexpected result from ReadOnlyTransaction.ReadUsingIndex: %v, want %v", i, got, test.want) 801 } 802 for j, g := range got { 803 if j > 0 { 804 prev := got[j-1][1].(string) + got[j-1][2].(string) 805 curr := got[j][1].(string) + got[j][2].(string) 806 if strings.Compare(prev, curr) > 0 { 807 t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex fails to order rows by index keys, %v should be after %v", i, got[j-1], got[j]) 808 } 809 } 810 found := false 811 for _, w := range test.want { 812 if testEqual(g, w) { 813 found = true 814 } 815 } 816 if !found { 817 t.Errorf("%d: got unexpected result from ReadOnlyTransaction.ReadUsingIndex: %v, want %v", i, got, test.want) 818 break 819 } 820 } 821 rts, err = ro.Timestamp() 822 if err != nil { 823 t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex doesn't return a timestamp, error: %v", i, err) 824 } 825 if err := test.checkTs(rts); err != nil { 826 t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex doesn't return expected timestamp: %v", i, err) 827 } 828 if roTs != rts { 829 t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts) 830 } 831 // ReadOnlyTransaction.ReadRowUsingIndex 832 got = nil 833 for _, k := range []Key{{"Marc", "Foo"}, {"Alpha", "Beta"}, {"Last", "End"}} { 834 r, err := ro.ReadRowUsingIndex(ctx, "Singers", "SingerByName", k, []string{"SingerId", "FirstName", "LastName"}) 835 if err != nil { 836 continue 837 } 838 v, err := rowToValues(r) 839 if err != nil { 840 continue 841 } 842 got = append(got, v) 843 rts, err = ro.Timestamp() 844 if err != nil { 845 t.Errorf("%d: ReadOnlyTransaction.ReadRowUsingIndex(%v) doesn't return a timestamp, error: %v", i, k, err) 846 } 847 if err := test.checkTs(rts); err != nil { 848 t.Errorf("%d: ReadOnlyTransaction.ReadRowUsingIndex(%v) doesn't return expected timestamp: %v", i, k, err) 849 } 850 if roTs != rts { 851 t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts) 852 } 853 } 854 if !testEqual(got, test.want) { 855 t.Errorf("%d: got unexpected results from ReadOnlyTransaction.ReadRowUsingIndex: %v, want %v", i, got, test.want) 856 } 857 ro.Close() 858 } 859} 860 861// Test ReadOnlyTransaction with different timestamp bound when there's an 862// update at the same time. 863func TestIntegration_UpdateDuringRead(t *testing.T) { 864 t.Parallel() 865 866 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 867 defer cancel() 868 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 869 defer cleanup() 870 871 for i, tb := range []TimestampBound{ 872 StrongRead(), 873 ReadTimestamp(time.Now().Add(-time.Minute * 30)), // version GC is 1 hour 874 ExactStaleness(time.Minute * 30), 875 } { 876 ro := client.ReadOnlyTransaction().WithTimestampBound(tb) 877 _, err := ro.ReadRow(ctx, "Singers", Key{i}, []string{"SingerId"}) 878 if ErrCode(err) != codes.NotFound { 879 t.Errorf("%d: ReadOnlyTransaction.ReadRow before write returns error: %v, want NotFound", i, err) 880 } 881 882 m := InsertOrUpdate("Singers", []string{"SingerId"}, []interface{}{i}) 883 if _, err := client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil { 884 t.Fatal(err) 885 } 886 887 _, err = ro.ReadRow(ctx, "Singers", Key{i}, []string{"SingerId"}) 888 if ErrCode(err) != codes.NotFound { 889 t.Errorf("%d: ReadOnlyTransaction.ReadRow after write returns error: %v, want NotFound", i, err) 890 } 891 } 892} 893 894// Test ReadWriteTransaction. 895func TestIntegration_ReadWriteTransaction(t *testing.T) { 896 t.Parallel() 897 898 // Give a longer deadline because of transaction backoffs. 899 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 900 defer cancel() 901 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 902 defer cleanup() 903 904 // Set up two accounts 905 accounts := []*Mutation{ 906 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}), 907 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}), 908 } 909 if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil { 910 t.Fatal(err) 911 } 912 wg := sync.WaitGroup{} 913 914 readBalance := func(iter *RowIterator) (int64, error) { 915 defer iter.Stop() 916 var bal int64 917 for { 918 row, err := iter.Next() 919 if err == iterator.Done { 920 return bal, nil 921 } 922 if err != nil { 923 return 0, err 924 } 925 if err := row.Column(0, &bal); err != nil { 926 return 0, err 927 } 928 } 929 } 930 931 for i := 0; i < 20; i++ { 932 wg.Add(1) 933 go func(iter int) { 934 defer wg.Done() 935 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 936 // Query Foo's balance and Bar's balance. 937 bf, e := readBalance(tx.Query(ctx, 938 Statement{"SELECT Balance FROM Accounts WHERE AccountId = @id", map[string]interface{}{"id": int64(1)}})) 939 if e != nil { 940 return e 941 } 942 bb, e := readBalance(tx.Read(ctx, "Accounts", KeySets(Key{int64(2)}), []string{"Balance"})) 943 if e != nil { 944 return e 945 } 946 if bf <= 0 { 947 return nil 948 } 949 bf-- 950 bb++ 951 return tx.BufferWrite([]*Mutation{ 952 Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), bf}), 953 Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), bb}), 954 }) 955 }) 956 if err != nil { 957 t.Errorf("%d: failed to execute transaction: %v", iter, err) 958 } 959 }(i) 960 } 961 // Because of context timeout, all goroutines will eventually return. 962 wg.Wait() 963 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 964 var bf, bb int64 965 r, e := tx.ReadRow(ctx, "Accounts", Key{int64(1)}, []string{"Balance"}) 966 if e != nil { 967 return e 968 } 969 if ce := r.Column(0, &bf); ce != nil { 970 return ce 971 } 972 bb, e = readBalance(tx.ReadUsingIndex(ctx, "Accounts", "AccountByNickname", KeySets(Key{"Bar"}), []string{"Balance"})) 973 if e != nil { 974 return e 975 } 976 if bf != 30 || bb != 21 { 977 t.Errorf("Foo's balance is now %v and Bar's balance is now %v, want %v and %v", bf, bb, 30, 21) 978 } 979 return nil 980 }) 981 if err != nil { 982 t.Errorf("failed to check balances: %v", err) 983 } 984} 985 986func TestIntegration_Reads(t *testing.T) { 987 t.Parallel() 988 989 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 990 defer cancel() 991 // Set up testing environment. 992 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, readDBStatements) 993 defer cleanup() 994 995 // Includes k0..k14. Strings sort lexically, eg "k1" < "k10" < "k2". 996 var ms []*Mutation 997 for i := 0; i < 15; i++ { 998 ms = append(ms, InsertOrUpdate(testTable, 999 testTableColumns, 1000 []interface{}{fmt.Sprintf("k%d", i), fmt.Sprintf("v%d", i)})) 1001 } 1002 // Don't use ApplyAtLeastOnce, so we can test the other code path. 1003 if _, err := client.Apply(ctx, ms); err != nil { 1004 t.Fatal(err) 1005 } 1006 1007 // Empty read. 1008 rows, err := readAllTestTable(client.Single().Read(ctx, testTable, 1009 KeyRange{Start: Key{"k99"}, End: Key{"z"}}, testTableColumns)) 1010 if err != nil { 1011 t.Fatal(err) 1012 } 1013 if got, want := len(rows), 0; got != want { 1014 t.Errorf("got %d, want %d", got, want) 1015 } 1016 1017 // Index empty read. 1018 rows, err = readAllTestTable(client.Single().ReadUsingIndex(ctx, testTable, testTableIndex, 1019 KeyRange{Start: Key{"v99"}, End: Key{"z"}}, testTableColumns)) 1020 if err != nil { 1021 t.Fatal(err) 1022 } 1023 if got, want := len(rows), 0; got != want { 1024 t.Errorf("got %d, want %d", got, want) 1025 } 1026 1027 // Point read. 1028 row, err := client.Single().ReadRow(ctx, testTable, Key{"k1"}, testTableColumns) 1029 if err != nil { 1030 t.Fatal(err) 1031 } 1032 var got testTableRow 1033 if err := row.ToStruct(&got); err != nil { 1034 t.Fatal(err) 1035 } 1036 if want := (testTableRow{"k1", "v1"}); got != want { 1037 t.Errorf("got %v, want %v", got, want) 1038 } 1039 1040 // Point read not found. 1041 _, err = client.Single().ReadRow(ctx, testTable, Key{"k999"}, testTableColumns) 1042 if ErrCode(err) != codes.NotFound { 1043 t.Fatalf("got %v, want NotFound", err) 1044 } 1045 1046 // Index point read. 1047 rowIndex, err := client.Single().ReadRowUsingIndex(ctx, testTable, testTableIndex, Key{"v1"}, testTableColumns) 1048 if err != nil { 1049 t.Fatal(err) 1050 } 1051 var gotIndex testTableRow 1052 if err := rowIndex.ToStruct(&gotIndex); err != nil { 1053 t.Fatal(err) 1054 } 1055 if wantIndex := (testTableRow{"k1", "v1"}); gotIndex != wantIndex { 1056 t.Errorf("got %v, want %v", gotIndex, wantIndex) 1057 } 1058 // Index point read not found. 1059 _, err = client.Single().ReadRowUsingIndex(ctx, testTable, testTableIndex, Key{"v999"}, testTableColumns) 1060 if ErrCode(err) != codes.NotFound { 1061 t.Fatalf("got %v, want NotFound", err) 1062 } 1063 rangeReads(ctx, t, client) 1064 indexRangeReads(ctx, t, client) 1065} 1066 1067func TestIntegration_EarlyTimestamp(t *testing.T) { 1068 t.Parallel() 1069 1070 // Test that we can get the timestamp from a read-only transaction as 1071 // soon as we have read at least one row. 1072 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1073 defer cancel() 1074 // Set up testing environment. 1075 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, readDBStatements) 1076 defer cleanup() 1077 1078 var ms []*Mutation 1079 for i := 0; i < 3; i++ { 1080 ms = append(ms, InsertOrUpdate(testTable, 1081 testTableColumns, 1082 []interface{}{fmt.Sprintf("k%d", i), fmt.Sprintf("v%d", i)})) 1083 } 1084 if _, err := client.Apply(ctx, ms, ApplyAtLeastOnce()); err != nil { 1085 t.Fatal(err) 1086 } 1087 1088 txn := client.Single() 1089 iter := txn.Read(ctx, testTable, AllKeys(), testTableColumns) 1090 defer iter.Stop() 1091 // In single-use transaction, we should get an error before reading anything. 1092 if _, err := txn.Timestamp(); err == nil { 1093 t.Error("wanted error, got nil") 1094 } 1095 // After reading one row, the timestamp should be available. 1096 _, err := iter.Next() 1097 if err != nil { 1098 t.Fatal(err) 1099 } 1100 if _, err := txn.Timestamp(); err != nil { 1101 t.Errorf("got %v, want nil", err) 1102 } 1103 1104 txn = client.ReadOnlyTransaction() 1105 defer txn.Close() 1106 iter = txn.Read(ctx, testTable, AllKeys(), testTableColumns) 1107 defer iter.Stop() 1108 // In an ordinary read-only transaction, the timestamp should be 1109 // available immediately. 1110 if _, err := txn.Timestamp(); err != nil { 1111 t.Errorf("got %v, want nil", err) 1112 } 1113} 1114 1115func TestIntegration_NestedTransaction(t *testing.T) { 1116 t.Parallel() 1117 1118 // You cannot use a transaction from inside a read-write transaction. 1119 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1120 defer cancel() 1121 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 1122 defer cleanup() 1123 1124 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 1125 _, err := client.ReadWriteTransaction(ctx, 1126 func(context.Context, *ReadWriteTransaction) error { return nil }) 1127 if ErrCode(err) != codes.FailedPrecondition { 1128 t.Fatalf("got %v, want FailedPrecondition", err) 1129 } 1130 _, err = client.Single().ReadRow(ctx, "Singers", Key{1}, []string{"SingerId"}) 1131 if ErrCode(err) != codes.FailedPrecondition { 1132 t.Fatalf("got %v, want FailedPrecondition", err) 1133 } 1134 rot := client.ReadOnlyTransaction() 1135 defer rot.Close() 1136 _, err = rot.ReadRow(ctx, "Singers", Key{1}, []string{"SingerId"}) 1137 if ErrCode(err) != codes.FailedPrecondition { 1138 t.Fatalf("got %v, want FailedPrecondition", err) 1139 } 1140 return nil 1141 }) 1142 if err != nil { 1143 t.Fatal(err) 1144 } 1145} 1146 1147// Test client recovery on database recreation. 1148func TestIntegration_DbRemovalRecovery(t *testing.T) { 1149 t.Parallel() 1150 1151 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1152 defer cancel() 1153 // Create a client with MinOpened=0 to prevent the session pool maintainer 1154 // from repeatedly trying to create sessions for the invalid database. 1155 client, dbPath, cleanup := prepareIntegrationTest(ctx, t, SessionPoolConfig{}, singerDBStatements) 1156 defer cleanup() 1157 1158 // Drop the testing database. 1159 if err := databaseAdmin.DropDatabase(ctx, &adminpb.DropDatabaseRequest{Database: dbPath}); err != nil { 1160 t.Fatalf("failed to drop testing database %v: %v", dbPath, err) 1161 } 1162 1163 // Now, send the query. 1164 iter := client.Single().Query(ctx, Statement{SQL: "SELECT SingerId FROM Singers"}) 1165 defer iter.Stop() 1166 if _, err := iter.Next(); err == nil { 1167 t.Errorf("client sends query to removed database successfully, want it to fail") 1168 } 1169 1170 // Recreate database and table. 1171 dbName := dbPath[strings.LastIndex(dbPath, "/")+1:] 1172 op, err := databaseAdmin.CreateDatabase(ctx, &adminpb.CreateDatabaseRequest{ 1173 Parent: fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID), 1174 CreateStatement: "CREATE DATABASE " + dbName, 1175 ExtraStatements: []string{ 1176 `CREATE TABLE Singers ( 1177 SingerId INT64 NOT NULL, 1178 FirstName STRING(1024), 1179 LastName STRING(1024), 1180 SingerInfo BYTES(MAX) 1181 ) PRIMARY KEY (SingerId)`, 1182 }, 1183 }) 1184 if err != nil { 1185 t.Fatalf("cannot recreate testing DB %v: %v", dbPath, err) 1186 } 1187 if _, err := op.Wait(ctx); err != nil { 1188 t.Fatalf("cannot recreate testing DB %v: %v", dbPath, err) 1189 } 1190 1191 // Now, send the query again. 1192 iter = client.Single().Query(ctx, Statement{SQL: "SELECT SingerId FROM Singers"}) 1193 defer iter.Stop() 1194 _, err = iter.Next() 1195 if err != nil && err != iterator.Done { 1196 t.Errorf("failed to send query to database %v: %v", dbPath, err) 1197 } 1198} 1199 1200// Test encoding/decoding non-struct Cloud Spanner types. 1201func TestIntegration_BasicTypes(t *testing.T) { 1202 t.Parallel() 1203 1204 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1205 defer cancel() 1206 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 1207 defer cleanup() 1208 1209 t1, _ := time.Parse(time.RFC3339Nano, "2016-11-15T15:04:05.999999999Z") 1210 // Boundaries 1211 t2, _ := time.Parse(time.RFC3339Nano, "0001-01-01T00:00:00.000000000Z") 1212 t3, _ := time.Parse(time.RFC3339Nano, "9999-12-31T23:59:59.999999999Z") 1213 d1, _ := civil.ParseDate("2016-11-15") 1214 // Boundaries 1215 d2, _ := civil.ParseDate("0001-01-01") 1216 d3, _ := civil.ParseDate("9999-12-31") 1217 1218 tests := []struct { 1219 col string 1220 val interface{} 1221 want interface{} 1222 }{ 1223 {col: "String", val: ""}, 1224 {col: "String", val: "", want: NullString{"", true}}, 1225 {col: "String", val: "foo"}, 1226 {col: "String", val: "foo", want: NullString{"foo", true}}, 1227 {col: "String", val: NullString{"bar", true}, want: "bar"}, 1228 {col: "String", val: NullString{"bar", false}, want: NullString{"", false}}, 1229 {col: "String", val: nil, want: NullString{}}, 1230 {col: "StringArray", val: []string(nil), want: []NullString(nil)}, 1231 {col: "StringArray", val: []string{}, want: []NullString{}}, 1232 {col: "StringArray", val: []string{"foo", "bar"}, want: []NullString{{"foo", true}, {"bar", true}}}, 1233 {col: "StringArray", val: []NullString(nil)}, 1234 {col: "StringArray", val: []NullString{}}, 1235 {col: "StringArray", val: []NullString{{"foo", true}, {}}}, 1236 {col: "Bytes", val: []byte{}}, 1237 {col: "Bytes", val: []byte{1, 2, 3}}, 1238 {col: "Bytes", val: []byte(nil)}, 1239 {col: "BytesArray", val: [][]byte(nil)}, 1240 {col: "BytesArray", val: [][]byte{}}, 1241 {col: "BytesArray", val: [][]byte{{1}, {2, 3}}}, 1242 {col: "Int64a", val: 0, want: int64(0)}, 1243 {col: "Int64a", val: -1, want: int64(-1)}, 1244 {col: "Int64a", val: 2, want: int64(2)}, 1245 {col: "Int64a", val: int64(3)}, 1246 {col: "Int64a", val: 4, want: NullInt64{4, true}}, 1247 {col: "Int64a", val: NullInt64{5, true}, want: int64(5)}, 1248 {col: "Int64a", val: NullInt64{6, true}, want: int64(6)}, 1249 {col: "Int64a", val: NullInt64{7, false}, want: NullInt64{0, false}}, 1250 {col: "Int64a", val: nil, want: NullInt64{}}, 1251 {col: "Int64Array", val: []int(nil), want: []NullInt64(nil)}, 1252 {col: "Int64Array", val: []int{}, want: []NullInt64{}}, 1253 {col: "Int64Array", val: []int{1, 2}, want: []NullInt64{{1, true}, {2, true}}}, 1254 {col: "Int64Array", val: []int64(nil), want: []NullInt64(nil)}, 1255 {col: "Int64Array", val: []int64{}, want: []NullInt64{}}, 1256 {col: "Int64Array", val: []int64{1, 2}, want: []NullInt64{{1, true}, {2, true}}}, 1257 {col: "Int64Array", val: []NullInt64(nil)}, 1258 {col: "Int64Array", val: []NullInt64{}}, 1259 {col: "Int64Array", val: []NullInt64{{1, true}, {}}}, 1260 {col: "Bool", val: false}, 1261 {col: "Bool", val: true}, 1262 {col: "Bool", val: false, want: NullBool{false, true}}, 1263 {col: "Bool", val: true, want: NullBool{true, true}}, 1264 {col: "Bool", val: NullBool{true, true}}, 1265 {col: "Bool", val: NullBool{false, false}}, 1266 {col: "Bool", val: nil, want: NullBool{}}, 1267 {col: "BoolArray", val: []bool(nil), want: []NullBool(nil)}, 1268 {col: "BoolArray", val: []bool{}, want: []NullBool{}}, 1269 {col: "BoolArray", val: []bool{true, false}, want: []NullBool{{true, true}, {false, true}}}, 1270 {col: "BoolArray", val: []NullBool(nil)}, 1271 {col: "BoolArray", val: []NullBool{}}, 1272 {col: "BoolArray", val: []NullBool{{false, true}, {true, true}, {}}}, 1273 {col: "Float64", val: 0.0}, 1274 {col: "Float64", val: 3.14}, 1275 {col: "Float64", val: math.NaN()}, 1276 {col: "Float64", val: math.Inf(1)}, 1277 {col: "Float64", val: math.Inf(-1)}, 1278 {col: "Float64", val: 2.78, want: NullFloat64{2.78, true}}, 1279 {col: "Float64", val: NullFloat64{2.71, true}, want: 2.71}, 1280 {col: "Float64", val: NullFloat64{1.41, true}, want: NullFloat64{1.41, true}}, 1281 {col: "Float64", val: NullFloat64{0, false}}, 1282 {col: "Float64", val: nil, want: NullFloat64{}}, 1283 {col: "Float64Array", val: []float64(nil), want: []NullFloat64(nil)}, 1284 {col: "Float64Array", val: []float64{}, want: []NullFloat64{}}, 1285 {col: "Float64Array", val: []float64{2.72, 3.14, math.Inf(1)}, want: []NullFloat64{{2.72, true}, {3.14, true}, {math.Inf(1), true}}}, 1286 {col: "Float64Array", val: []NullFloat64(nil)}, 1287 {col: "Float64Array", val: []NullFloat64{}}, 1288 {col: "Float64Array", val: []NullFloat64{{2.72, true}, {math.Inf(1), true}, {}}}, 1289 {col: "Date", val: d1}, 1290 {col: "Date", val: d1, want: NullDate{d1, true}}, 1291 {col: "Date", val: NullDate{d1, true}}, 1292 {col: "Date", val: NullDate{d1, true}, want: d1}, 1293 {col: "Date", val: NullDate{civil.Date{}, false}}, 1294 {col: "DateArray", val: []civil.Date(nil), want: []NullDate(nil)}, 1295 {col: "DateArray", val: []civil.Date{}, want: []NullDate{}}, 1296 {col: "DateArray", val: []civil.Date{d1, d2, d3}, want: []NullDate{{d1, true}, {d2, true}, {d3, true}}}, 1297 {col: "Timestamp", val: t1}, 1298 {col: "Timestamp", val: t1, want: NullTime{t1, true}}, 1299 {col: "Timestamp", val: NullTime{t1, true}}, 1300 {col: "Timestamp", val: NullTime{t1, true}, want: t1}, 1301 {col: "Timestamp", val: NullTime{}}, 1302 {col: "Timestamp", val: nil, want: NullTime{}}, 1303 {col: "TimestampArray", val: []time.Time(nil), want: []NullTime(nil)}, 1304 {col: "TimestampArray", val: []time.Time{}, want: []NullTime{}}, 1305 {col: "TimestampArray", val: []time.Time{t1, t2, t3}, want: []NullTime{{t1, true}, {t2, true}, {t3, true}}}, 1306 } 1307 1308 // Write rows into table first. 1309 var muts []*Mutation 1310 for i, test := range tests { 1311 muts = append(muts, InsertOrUpdate("Types", []string{"RowID", test.col}, []interface{}{i, test.val})) 1312 } 1313 if _, err := client.Apply(ctx, muts, ApplyAtLeastOnce()); err != nil { 1314 t.Fatal(err) 1315 } 1316 1317 for i, test := range tests { 1318 row, err := client.Single().ReadRow(ctx, "Types", []interface{}{i}, []string{test.col}) 1319 if err != nil { 1320 t.Fatalf("Unable to fetch row %v: %v", i, err) 1321 } 1322 // Create new instance of type of test.want. 1323 want := test.want 1324 if want == nil { 1325 want = test.val 1326 } 1327 gotp := reflect.New(reflect.TypeOf(want)) 1328 if err := row.Column(0, gotp.Interface()); err != nil { 1329 t.Errorf("%d: col:%v val:%#v, %v", i, test.col, test.val, err) 1330 continue 1331 } 1332 got := reflect.Indirect(gotp).Interface() 1333 1334 // One of the test cases is checking NaN handling. Given 1335 // NaN!=NaN, we can't use reflect to test for it. 1336 if isNaN(got) && isNaN(want) { 1337 continue 1338 } 1339 1340 // Check non-NaN cases. 1341 if !testEqual(got, want) { 1342 t.Errorf("%d: col:%v val:%#v, got %#v, want %#v", i, test.col, test.val, got, want) 1343 continue 1344 } 1345 } 1346} 1347 1348// Test decoding Cloud Spanner STRUCT type. 1349func TestIntegration_StructTypes(t *testing.T) { 1350 t.Parallel() 1351 1352 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1353 defer cancel() 1354 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 1355 defer cleanup() 1356 1357 tests := []struct { 1358 q Statement 1359 want func(r *Row) error 1360 }{ 1361 { 1362 q: Statement{SQL: `SELECT ARRAY(SELECT STRUCT(1, 2))`}, 1363 want: func(r *Row) error { 1364 // Test STRUCT ARRAY decoding to []NullRow. 1365 var rows []NullRow 1366 if err := r.Column(0, &rows); err != nil { 1367 return err 1368 } 1369 if len(rows) != 1 { 1370 return fmt.Errorf("len(rows) = %d; want 1", len(rows)) 1371 } 1372 if !rows[0].Valid { 1373 return fmt.Errorf("rows[0] is NULL") 1374 } 1375 var i, j int64 1376 if err := rows[0].Row.Columns(&i, &j); err != nil { 1377 return err 1378 } 1379 if i != 1 || j != 2 { 1380 return fmt.Errorf("got (%d,%d), want (1,2)", i, j) 1381 } 1382 return nil 1383 }, 1384 }, 1385 { 1386 q: Statement{SQL: `SELECT ARRAY(SELECT STRUCT(1 as foo, 2 as bar)) as col1`}, 1387 want: func(r *Row) error { 1388 // Test Row.ToStruct. 1389 s := struct { 1390 Col1 []*struct { 1391 Foo int64 `spanner:"foo"` 1392 Bar int64 `spanner:"bar"` 1393 } `spanner:"col1"` 1394 }{} 1395 if err := r.ToStruct(&s); err != nil { 1396 return err 1397 } 1398 want := struct { 1399 Col1 []*struct { 1400 Foo int64 `spanner:"foo"` 1401 Bar int64 `spanner:"bar"` 1402 } `spanner:"col1"` 1403 }{ 1404 Col1: []*struct { 1405 Foo int64 `spanner:"foo"` 1406 Bar int64 `spanner:"bar"` 1407 }{ 1408 { 1409 Foo: 1, 1410 Bar: 2, 1411 }, 1412 }, 1413 } 1414 if !testEqual(want, s) { 1415 return fmt.Errorf("unexpected decoding result: %v, want %v", s, want) 1416 } 1417 return nil 1418 }, 1419 }, 1420 } 1421 for i, test := range tests { 1422 iter := client.Single().Query(ctx, test.q) 1423 defer iter.Stop() 1424 row, err := iter.Next() 1425 if err != nil { 1426 t.Errorf("%d: %v", i, err) 1427 continue 1428 } 1429 if err := test.want(row); err != nil { 1430 t.Errorf("%d: %v", i, err) 1431 continue 1432 } 1433 } 1434} 1435 1436func TestIntegration_StructParametersUnsupported(t *testing.T) { 1437 skipEmulatorTest(t) 1438 t.Parallel() 1439 1440 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1441 defer cancel() 1442 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, nil) 1443 defer cleanup() 1444 1445 for _, test := range []struct { 1446 param interface{} 1447 wantCode codes.Code 1448 wantMsgPart string 1449 }{ 1450 { 1451 struct { 1452 Field int 1453 }{10}, 1454 codes.Unimplemented, 1455 "Unsupported query shape: " + 1456 "A struct value cannot be returned as a column value. " + 1457 "Rewrite the query to flatten the struct fields in the result.", 1458 }, 1459 { 1460 []struct { 1461 Field int 1462 }{{10}, {20}}, 1463 codes.Unimplemented, 1464 "Unsupported query shape: " + 1465 "This query can return a null-valued array of struct, " + 1466 "which is not supported by Spanner.", 1467 }, 1468 } { 1469 iter := client.Single().Query(ctx, Statement{ 1470 SQL: "SELECT @p", 1471 Params: map[string]interface{}{"p": test.param}, 1472 }) 1473 _, err := iter.Next() 1474 iter.Stop() 1475 if msg, ok := matchError(err, test.wantCode, test.wantMsgPart); !ok { 1476 t.Fatal(msg) 1477 } 1478 } 1479} 1480 1481// Test queries of the form "SELECT expr". 1482func TestIntegration_QueryExpressions(t *testing.T) { 1483 t.Parallel() 1484 1485 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1486 defer cancel() 1487 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, nil) 1488 defer cleanup() 1489 1490 newRow := func(vals []interface{}) *Row { 1491 row, err := NewRow(make([]string, len(vals)), vals) 1492 if err != nil { 1493 t.Fatal(err) 1494 } 1495 return row 1496 } 1497 1498 tests := []struct { 1499 expr string 1500 want interface{} 1501 }{ 1502 {"1", int64(1)}, 1503 {"[1, 2, 3]", []NullInt64{{1, true}, {2, true}, {3, true}}}, 1504 {"[1, NULL, 3]", []NullInt64{{1, true}, {0, false}, {3, true}}}, 1505 {"IEEE_DIVIDE(1, 0)", math.Inf(1)}, 1506 {"IEEE_DIVIDE(-1, 0)", math.Inf(-1)}, 1507 {"IEEE_DIVIDE(0, 0)", math.NaN()}, 1508 // TODO(jba): add IEEE_DIVIDE(0, 0) to the following array when we have a better equality predicate. 1509 {"[IEEE_DIVIDE(1, 0), IEEE_DIVIDE(-1, 0)]", []NullFloat64{{math.Inf(1), true}, {math.Inf(-1), true}}}, 1510 {"ARRAY(SELECT AS STRUCT * FROM (SELECT 'a', 1) WHERE 0 = 1)", []NullRow{}}, 1511 {"ARRAY(SELECT STRUCT(1, 2))", []NullRow{{Row: *newRow([]interface{}{1, 2}), Valid: true}}}, 1512 } 1513 for _, test := range tests { 1514 iter := client.Single().Query(ctx, Statement{SQL: "SELECT " + test.expr}) 1515 defer iter.Stop() 1516 row, err := iter.Next() 1517 if err != nil { 1518 t.Errorf("%q: %v", test.expr, err) 1519 continue 1520 } 1521 // Create new instance of type of test.want. 1522 gotp := reflect.New(reflect.TypeOf(test.want)) 1523 if err := row.Column(0, gotp.Interface()); err != nil { 1524 t.Errorf("%q: Column returned error %v", test.expr, err) 1525 continue 1526 } 1527 got := reflect.Indirect(gotp).Interface() 1528 // TODO(jba): remove isNaN special case when we have a better equality predicate. 1529 if isNaN(got) && isNaN(test.want) { 1530 continue 1531 } 1532 if !testEqual(got, test.want) { 1533 t.Errorf("%q\n got %#v\nwant %#v", test.expr, got, test.want) 1534 } 1535 } 1536} 1537 1538func TestIntegration_QueryStats(t *testing.T) { 1539 skipEmulatorTest(t) 1540 t.Parallel() 1541 1542 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1543 defer cancel() 1544 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 1545 defer cleanup() 1546 1547 accounts := []*Mutation{ 1548 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}), 1549 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}), 1550 } 1551 if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil { 1552 t.Fatal(err) 1553 } 1554 const sql = "SELECT Balance FROM Accounts" 1555 1556 qp, err := client.Single().AnalyzeQuery(ctx, Statement{sql, nil}) 1557 if err != nil { 1558 t.Fatal(err) 1559 } 1560 if len(qp.PlanNodes) == 0 { 1561 t.Error("got zero plan nodes, expected at least one") 1562 } 1563 1564 iter := client.Single().QueryWithStats(ctx, Statement{sql, nil}) 1565 defer iter.Stop() 1566 for { 1567 _, err := iter.Next() 1568 if err == iterator.Done { 1569 break 1570 } 1571 if err != nil { 1572 t.Fatal(err) 1573 } 1574 } 1575 if iter.QueryPlan == nil { 1576 t.Error("got nil QueryPlan, expected one") 1577 } 1578 if iter.QueryStats == nil { 1579 t.Error("got nil QueryStats, expected some") 1580 } 1581} 1582 1583func TestIntegration_InvalidDatabase(t *testing.T) { 1584 t.Parallel() 1585 1586 if databaseAdmin == nil { 1587 t.Skip("Integration tests skipped") 1588 } 1589 ctx := context.Background() 1590 dbPath := fmt.Sprintf("projects/%v/instances/%v/databases/invalid", testProjectID, testInstanceID) 1591 c, err := createClient(ctx, dbPath, SessionPoolConfig{}) 1592 // Client creation should succeed even if the database is invalid. 1593 if err != nil { 1594 t.Fatal(err) 1595 } 1596 _, err = c.Single().ReadRow(ctx, "TestTable", Key{1}, []string{"col1"}) 1597 if msg, ok := matchError(err, codes.NotFound, ""); !ok { 1598 t.Fatal(msg) 1599 } 1600} 1601 1602func TestIntegration_ReadErrors(t *testing.T) { 1603 t.Parallel() 1604 1605 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1606 defer cancel() 1607 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, readDBStatements) 1608 defer cleanup() 1609 1610 var ms []*Mutation 1611 for i := 0; i < 2; i++ { 1612 ms = append(ms, InsertOrUpdate(testTable, 1613 testTableColumns, 1614 []interface{}{fmt.Sprintf("k%d", i), fmt.Sprintf("v")})) 1615 } 1616 if _, err := client.Apply(ctx, ms); err != nil { 1617 t.Fatal(err) 1618 } 1619 1620 // Read over invalid table fails 1621 _, err := client.Single().ReadRow(ctx, "badTable", Key{1}, []string{"StringValue"}) 1622 if msg, ok := matchError(err, codes.NotFound, "badTable"); !ok { 1623 t.Error(msg) 1624 } 1625 // Read over invalid column fails 1626 _, err = client.Single().ReadRow(ctx, "TestTable", Key{1}, []string{"badcol"}) 1627 if msg, ok := matchError(err, codes.NotFound, "badcol"); !ok { 1628 t.Error(msg) 1629 } 1630 1631 // Invalid query fails 1632 iter := client.Single().Query(ctx, Statement{SQL: "SELECT Apples AND Oranges"}) 1633 defer iter.Stop() 1634 _, err = iter.Next() 1635 if msg, ok := matchError(err, codes.InvalidArgument, "unrecognized name"); !ok { 1636 t.Error(msg) 1637 } 1638 1639 // Read should fail on cancellation. 1640 cctx, cancel := context.WithCancel(ctx) 1641 cancel() 1642 _, err = client.Single().ReadRow(cctx, "TestTable", Key{1}, []string{"StringValue"}) 1643 if msg, ok := matchError(err, codes.Canceled, ""); !ok { 1644 t.Error(msg) 1645 } 1646 // Read should fail if deadline exceeded. 1647 dctx, cancel := context.WithTimeout(ctx, time.Nanosecond) 1648 defer cancel() 1649 <-dctx.Done() 1650 _, err = client.Single().ReadRow(dctx, "TestTable", Key{1}, []string{"StringValue"}) 1651 if msg, ok := matchError(err, codes.DeadlineExceeded, ""); !ok { 1652 t.Error(msg) 1653 } 1654 // Read should fail if there are multiple rows returned. 1655 _, err = client.Single().ReadRowUsingIndex(ctx, testTable, testTableIndex, Key{"v"}, testTableColumns) 1656 wantMsgPart := fmt.Sprintf("more than one row found by index(Table: %v, IndexKey: %v, Index: %v)", testTable, Key{"v"}, testTableIndex) 1657 if msg, ok := matchError(err, codes.FailedPrecondition, wantMsgPart); !ok { 1658 t.Error(msg) 1659 } 1660} 1661 1662// Test TransactionRunner. Test that transactions are aborted and retried as 1663// expected. 1664func TestIntegration_TransactionRunner(t *testing.T) { 1665 skipEmulatorTest(t) 1666 t.Parallel() 1667 1668 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1669 defer cancel() 1670 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 1671 defer cleanup() 1672 1673 // Test 1: User error should abort the transaction. 1674 _, _ = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 1675 tx.BufferWrite([]*Mutation{ 1676 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)})}) 1677 return errors.New("user error") 1678 }) 1679 // Empty read. 1680 rows, err := readAllTestTable(client.Single().Read(ctx, "Accounts", Key{1}, []string{"AccountId", "Nickname", "Balance"})) 1681 if err != nil { 1682 t.Fatal(err) 1683 } 1684 if got, want := len(rows), 0; got != want { 1685 t.Errorf("Empty read, got %d, want %d.", got, want) 1686 } 1687 1688 // Test 2: Expect abort and retry. 1689 // We run two ReadWriteTransactions concurrently and make txn1 abort txn2 by 1690 // committing writes to the column txn2 have read, and expect the following 1691 // read to abort and txn2 retries. 1692 1693 // Set up two accounts 1694 accounts := []*Mutation{ 1695 Insert("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), int64(0)}), 1696 Insert("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), int64(1)}), 1697 } 1698 if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil { 1699 t.Fatal(err) 1700 } 1701 1702 var ( 1703 cTxn1Start = make(chan struct{}) 1704 cTxn1Commit = make(chan struct{}) 1705 cTxn2Start = make(chan struct{}) 1706 wg sync.WaitGroup 1707 ) 1708 1709 // read balance, check error if we don't expect abort. 1710 readBalance := func(tx interface { 1711 ReadRow(ctx context.Context, table string, key Key, columns []string) (*Row, error) 1712 }, key int64, expectAbort bool) (int64, error) { 1713 var b int64 1714 r, e := tx.ReadRow(ctx, "Accounts", Key{int64(key)}, []string{"Balance"}) 1715 if e != nil { 1716 if expectAbort && !isAbortErr(e) { 1717 t.Errorf("ReadRow got %v, want Abort error.", e) 1718 } 1719 // Verify that we received and are able to extract retry info from 1720 // the aborted error. 1721 if _, hasRetryInfo := extractRetryDelay(e); !hasRetryInfo { 1722 t.Errorf("Got Abort error without RetryInfo\nGot: %v", e) 1723 } 1724 return b, e 1725 } 1726 if ce := r.Column(0, &b); ce != nil { 1727 return b, ce 1728 } 1729 return b, nil 1730 } 1731 1732 wg.Add(2) 1733 // Txn 1 1734 go func() { 1735 defer wg.Done() 1736 var once sync.Once 1737 _, e := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 1738 b, e := readBalance(tx, 1, false) 1739 if e != nil { 1740 return e 1741 } 1742 // txn 1 can abort, in that case we skip closing the channel on 1743 // retry. 1744 once.Do(func() { close(cTxn1Start) }) 1745 e = tx.BufferWrite([]*Mutation{ 1746 Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), int64(b + 1)})}) 1747 if e != nil { 1748 return e 1749 } 1750 // Wait for second transaction. 1751 <-cTxn2Start 1752 return nil 1753 }) 1754 close(cTxn1Commit) 1755 if e != nil { 1756 t.Errorf("Transaction 1 commit, got %v, want nil.", e) 1757 } 1758 }() 1759 // Txn 2 1760 go func() { 1761 // Wait until txn 1 starts. 1762 <-cTxn1Start 1763 defer wg.Done() 1764 var ( 1765 once sync.Once 1766 b1 int64 1767 b2 int64 1768 e error 1769 ) 1770 _, e = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 1771 if b1, e = readBalance(tx, 1, false); e != nil { 1772 return e 1773 } 1774 // Skip closing channel on retry. 1775 once.Do(func() { close(cTxn2Start) }) 1776 // Wait until txn 1 successfully commits. 1777 <-cTxn1Commit 1778 // Txn1 has committed and written a balance to the account. Now this 1779 // transaction (txn2) reads and re-writes the balance. The first 1780 // time through, it will abort because it overlaps with txn1. Then 1781 // it will retry after txn1 commits, and succeed. 1782 if b2, e = readBalance(tx, 2, true); e != nil { 1783 return e 1784 } 1785 return tx.BufferWrite([]*Mutation{ 1786 Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), int64(b1 + b2)})}) 1787 }) 1788 if e != nil { 1789 t.Errorf("Transaction 2 commit, got %v, want nil.", e) 1790 } 1791 }() 1792 wg.Wait() 1793 // Check that both transactions' effects are visible. 1794 for i := int64(1); i <= int64(2); i++ { 1795 if b, e := readBalance(client.Single(), i, false); e != nil { 1796 t.Fatalf("ReadBalance for key %d error %v.", i, e) 1797 } else if b != i { 1798 t.Errorf("Balance for key %d, got %d, want %d.", i, b, i) 1799 } 1800 } 1801} 1802 1803// Test PartitionQuery of BatchReadOnlyTransaction, create partitions then 1804// serialize and deserialize both transaction and partition to be used in 1805// execution on another client, and compare results. 1806func TestIntegration_BatchQuery(t *testing.T) { 1807 skipEmulatorTest(t) 1808 t.Parallel() 1809 1810 // Set up testing environment. 1811 var ( 1812 client2 *Client 1813 err error 1814 ) 1815 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1816 defer cancel() 1817 client, dbPath, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, simpleDBStatements) 1818 defer cleanup() 1819 1820 if err = populate(ctx, client); err != nil { 1821 t.Fatal(err) 1822 } 1823 if client2, err = createClient(ctx, dbPath, SessionPoolConfig{}); err != nil { 1824 t.Fatal(err) 1825 } 1826 defer client2.Close() 1827 1828 // PartitionQuery 1829 var ( 1830 txn *BatchReadOnlyTransaction 1831 partitions []*Partition 1832 stmt = Statement{SQL: "SELECT * FROM test;"} 1833 ) 1834 1835 if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil { 1836 t.Fatal(err) 1837 } 1838 defer txn.Cleanup(ctx) 1839 if partitions, err = txn.PartitionQuery(ctx, stmt, PartitionOptions{0, 3}); err != nil { 1840 t.Fatal(err) 1841 } 1842 1843 // Reconstruct BatchReadOnlyTransactionID and execute partitions 1844 var ( 1845 tid2 BatchReadOnlyTransactionID 1846 data []byte 1847 gotResult bool // if we get matching result from two separate txns 1848 ) 1849 if data, err = txn.ID.MarshalBinary(); err != nil { 1850 t.Fatalf("encoding failed %v", err) 1851 } 1852 if err = tid2.UnmarshalBinary(data); err != nil { 1853 t.Fatalf("decoding failed %v", err) 1854 } 1855 txn2 := client2.BatchReadOnlyTransactionFromID(tid2) 1856 1857 // Execute Partitions and compare results 1858 for i, p := range partitions { 1859 iter := txn.Execute(ctx, p) 1860 defer iter.Stop() 1861 p2 := serdesPartition(t, i, p) 1862 iter2 := txn2.Execute(ctx, &p2) 1863 defer iter2.Stop() 1864 1865 row1, err1 := iter.Next() 1866 row2, err2 := iter2.Next() 1867 if err1 != err2 { 1868 t.Fatalf("execution failed for different reasons: %v, %v", err1, err2) 1869 continue 1870 } 1871 if !testEqual(row1, row2) { 1872 t.Fatalf("execution returned different values: %v, %v", row1, row2) 1873 continue 1874 } 1875 if row1 == nil { 1876 continue 1877 } 1878 var a, b string 1879 if err = row1.Columns(&a, &b); err != nil { 1880 t.Fatalf("failed to parse row %v", err) 1881 continue 1882 } 1883 if a == str1 && b == str2 { 1884 gotResult = true 1885 } 1886 } 1887 if !gotResult { 1888 t.Fatalf("execution didn't return expected values") 1889 } 1890} 1891 1892// Test PartitionRead of BatchReadOnlyTransaction, similar to TestBatchQuery 1893func TestIntegration_BatchRead(t *testing.T) { 1894 skipEmulatorTest(t) 1895 t.Parallel() 1896 1897 // Set up testing environment. 1898 var ( 1899 client2 *Client 1900 err error 1901 ) 1902 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1903 defer cancel() 1904 client, dbPath, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, simpleDBStatements) 1905 defer cleanup() 1906 1907 if err = populate(ctx, client); err != nil { 1908 t.Fatal(err) 1909 } 1910 if client2, err = createClient(ctx, dbPath, SessionPoolConfig{}); err != nil { 1911 t.Fatal(err) 1912 } 1913 defer client2.Close() 1914 1915 // PartitionRead 1916 var ( 1917 txn *BatchReadOnlyTransaction 1918 partitions []*Partition 1919 ) 1920 1921 if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil { 1922 t.Fatal(err) 1923 } 1924 defer txn.Cleanup(ctx) 1925 if partitions, err = txn.PartitionRead(ctx, "test", AllKeys(), simpleDBTableColumns, PartitionOptions{0, 3}); err != nil { 1926 t.Fatal(err) 1927 } 1928 1929 // Reconstruct BatchReadOnlyTransactionID and execute partitions. 1930 var ( 1931 tid2 BatchReadOnlyTransactionID 1932 data []byte 1933 gotResult bool // if we get matching result from two separate txns 1934 ) 1935 if data, err = txn.ID.MarshalBinary(); err != nil { 1936 t.Fatalf("encoding failed %v", err) 1937 } 1938 if err = tid2.UnmarshalBinary(data); err != nil { 1939 t.Fatalf("decoding failed %v", err) 1940 } 1941 txn2 := client2.BatchReadOnlyTransactionFromID(tid2) 1942 1943 // Execute Partitions and compare results. 1944 for i, p := range partitions { 1945 iter := txn.Execute(ctx, p) 1946 defer iter.Stop() 1947 p2 := serdesPartition(t, i, p) 1948 iter2 := txn2.Execute(ctx, &p2) 1949 defer iter2.Stop() 1950 1951 row1, err1 := iter.Next() 1952 row2, err2 := iter2.Next() 1953 if err1 != err2 { 1954 t.Fatalf("execution failed for different reasons: %v, %v", err1, err2) 1955 continue 1956 } 1957 if !testEqual(row1, row2) { 1958 t.Fatalf("execution returned different values: %v, %v", row1, row2) 1959 continue 1960 } 1961 if row1 == nil { 1962 continue 1963 } 1964 var a, b string 1965 if err = row1.Columns(&a, &b); err != nil { 1966 t.Fatalf("failed to parse row %v", err) 1967 continue 1968 } 1969 if a == str1 && b == str2 { 1970 gotResult = true 1971 } 1972 } 1973 if !gotResult { 1974 t.Fatalf("execution didn't return expected values") 1975 } 1976} 1977 1978// Test normal txReadEnv method on BatchReadOnlyTransaction. 1979func TestIntegration_BROTNormal(t *testing.T) { 1980 skipEmulatorTest(t) 1981 t.Parallel() 1982 1983 // Set up testing environment and create txn. 1984 var ( 1985 txn *BatchReadOnlyTransaction 1986 err error 1987 row *Row 1988 i int64 1989 ) 1990 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1991 defer cancel() 1992 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, simpleDBStatements) 1993 defer cleanup() 1994 1995 if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil { 1996 t.Fatal(err) 1997 } 1998 defer txn.Cleanup(ctx) 1999 if _, err := txn.PartitionRead(ctx, "test", AllKeys(), simpleDBTableColumns, PartitionOptions{0, 3}); err != nil { 2000 t.Fatal(err) 2001 } 2002 // Normal query should work with BatchReadOnlyTransaction. 2003 stmt2 := Statement{SQL: "SELECT 1"} 2004 iter := txn.Query(ctx, stmt2) 2005 defer iter.Stop() 2006 2007 row, err = iter.Next() 2008 if err != nil { 2009 t.Errorf("query failed with %v", err) 2010 } 2011 if err = row.Columns(&i); err != nil { 2012 t.Errorf("failed to parse row %v", err) 2013 } 2014} 2015 2016func TestIntegration_CommitTimestamp(t *testing.T) { 2017 t.Parallel() 2018 2019 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 2020 defer cancel() 2021 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, ctsDBStatements) 2022 defer cleanup() 2023 2024 type testTableRow struct { 2025 Key string 2026 Ts NullTime 2027 } 2028 2029 var ( 2030 cts1, cts2, ts1, ts2 time.Time 2031 err error 2032 ) 2033 2034 // Apply mutation in sequence, expect to see commit timestamp in good order, 2035 // check also the commit timestamp returned 2036 for _, it := range []struct { 2037 k string 2038 t *time.Time 2039 }{ 2040 {"a", &cts1}, 2041 {"b", &cts2}, 2042 } { 2043 tt := testTableRow{Key: it.k, Ts: NullTime{CommitTimestamp, true}} 2044 m, err := InsertStruct("TestTable", tt) 2045 if err != nil { 2046 t.Fatal(err) 2047 } 2048 *it.t, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()) 2049 if err != nil { 2050 t.Fatal(err) 2051 } 2052 } 2053 2054 txn := client.ReadOnlyTransaction() 2055 for _, it := range []struct { 2056 k string 2057 t *time.Time 2058 }{ 2059 {"a", &ts1}, 2060 {"b", &ts2}, 2061 } { 2062 if r, e := txn.ReadRow(ctx, "TestTable", Key{it.k}, []string{"Ts"}); e != nil { 2063 t.Fatal(err) 2064 } else { 2065 var got testTableRow 2066 if err := r.ToStruct(&got); err != nil { 2067 t.Fatal(err) 2068 } 2069 *it.t = got.Ts.Time 2070 } 2071 } 2072 if !cts1.Equal(ts1) { 2073 t.Errorf("Expect commit timestamp returned and read to match for txn1, got %v and %v.", cts1, ts1) 2074 } 2075 if !cts2.Equal(ts2) { 2076 t.Errorf("Expect commit timestamp returned and read to match for txn2, got %v and %v.", cts2, ts2) 2077 } 2078 2079 // Try writing a timestamp in the future to commit timestamp, expect error. 2080 _, err = client.Apply(ctx, []*Mutation{InsertOrUpdate("TestTable", []string{"Key", "Ts"}, []interface{}{"a", time.Now().Add(time.Hour)})}, ApplyAtLeastOnce()) 2081 if msg, ok := matchError(err, codes.FailedPrecondition, "Cannot write timestamps in the future"); !ok { 2082 t.Error(msg) 2083 } 2084} 2085 2086func TestIntegration_DML(t *testing.T) { 2087 t.Parallel() 2088 2089 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 2090 defer cancel() 2091 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 2092 defer cleanup() 2093 2094 // Function that reads a single row's first name from within a transaction. 2095 readFirstName := func(tx *ReadWriteTransaction, key int) (string, error) { 2096 row, err := tx.ReadRow(ctx, "Singers", Key{key}, []string{"FirstName"}) 2097 if err != nil { 2098 return "", err 2099 } 2100 var fn string 2101 if err := row.Column(0, &fn); err != nil { 2102 return "", err 2103 } 2104 return fn, nil 2105 } 2106 2107 // Function that reads multiple rows' first names from outside a read/write 2108 // transaction. 2109 readFirstNames := func(keys ...int) []string { 2110 var ks []KeySet 2111 for _, k := range keys { 2112 ks = append(ks, Key{k}) 2113 } 2114 iter := client.Single().Read(ctx, "Singers", KeySets(ks...), []string{"FirstName"}) 2115 var got []string 2116 var fn string 2117 err := iter.Do(func(row *Row) error { 2118 if err := row.Column(0, &fn); err != nil { 2119 return err 2120 } 2121 got = append(got, fn) 2122 return nil 2123 }) 2124 if err != nil { 2125 t.Fatalf("readFirstNames(%v): %v", keys, err) 2126 } 2127 return got 2128 } 2129 2130 // Use ReadWriteTransaction.Query to execute a DML statement. 2131 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 2132 iter := tx.Query(ctx, Statement{ 2133 SQL: `INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (1, "Umm", "Kulthum")`, 2134 }) 2135 defer iter.Stop() 2136 if row, err := iter.Next(); err != iterator.Done { 2137 t.Fatalf("got results from iterator, want none: %#v, err = %v\n", row, err) 2138 } 2139 if iter.RowCount != 1 { 2140 t.Errorf("row count: got %d, want 1", iter.RowCount) 2141 } 2142 // The results of the DML statement should be visible to the transaction. 2143 got, err := readFirstName(tx, 1) 2144 if err != nil { 2145 return err 2146 } 2147 if want := "Umm"; got != want { 2148 t.Errorf("got %q, want %q", got, want) 2149 } 2150 return nil 2151 }) 2152 if err != nil { 2153 t.Fatal(err) 2154 } 2155 2156 // Use ReadWriteTransaction.Update to execute a DML statement. 2157 _, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 2158 count, err := tx.Update(ctx, Statement{ 2159 SQL: `Insert INTO Singers (SingerId, FirstName, LastName) VALUES (2, "Eduard", "Khil")`, 2160 }) 2161 if err != nil { 2162 t.Fatal(err) 2163 } 2164 if count != 1 { 2165 t.Errorf("row count: got %d, want 1", count) 2166 } 2167 got, err := readFirstName(tx, 2) 2168 if err != nil { 2169 return err 2170 } 2171 if want := "Eduard"; got != want { 2172 t.Errorf("got %q, want %q", got, want) 2173 } 2174 return nil 2175 2176 }) 2177 if err != nil { 2178 t.Fatal(err) 2179 } 2180 2181 // Roll back a DML statement and confirm that it didn't happen. 2182 var fail = errors.New("fail") 2183 _, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 2184 _, err := tx.Update(ctx, Statement{ 2185 SQL: `INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (3, "Audra", "McDonald")`, 2186 }) 2187 if err != nil { 2188 return err 2189 } 2190 return fail 2191 }) 2192 if err != fail { 2193 t.Fatalf("rolling back: got error %v, want the error 'fail'", err) 2194 } 2195 _, err = client.Single().ReadRow(ctx, "Singers", Key{3}, []string{"FirstName"}) 2196 if got, want := ErrCode(err), codes.NotFound; got != want { 2197 t.Errorf("got %s, want %s", got, want) 2198 } 2199 2200 // Run two DML statements in the same transaction. 2201 _, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 2202 _, err := tx.Update(ctx, Statement{SQL: `UPDATE Singers SET FirstName = "Oum" WHERE SingerId = 1`}) 2203 if err != nil { 2204 return err 2205 } 2206 _, err = tx.Update(ctx, Statement{SQL: `UPDATE Singers SET FirstName = "Eddie" WHERE SingerId = 2`}) 2207 if err != nil { 2208 return err 2209 } 2210 return nil 2211 }) 2212 if err != nil { 2213 t.Fatal(err) 2214 } 2215 got := readFirstNames(1, 2) 2216 want := []string{"Oum", "Eddie"} 2217 if !testEqual(got, want) { 2218 t.Errorf("got %v, want %v", got, want) 2219 } 2220 2221 // Run a DML statement and an ordinary mutation in the same transaction. 2222 _, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 2223 _, err := tx.Update(ctx, Statement{ 2224 SQL: `INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (3, "Audra", "McDonald")`, 2225 }) 2226 if err != nil { 2227 return err 2228 } 2229 return tx.BufferWrite([]*Mutation{ 2230 Insert("Singers", []string{"SingerId", "FirstName", "LastName"}, 2231 []interface{}{4, "Andy", "Irvine"}), 2232 }) 2233 }) 2234 if err != nil { 2235 t.Fatal(err) 2236 } 2237 got = readFirstNames(3, 4) 2238 want = []string{"Audra", "Andy"} 2239 if !testEqual(got, want) { 2240 t.Errorf("got %v, want %v", got, want) 2241 } 2242 2243 // Attempt to run a query using update. 2244 _, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 2245 _, err := tx.Update(ctx, Statement{SQL: `SELECT FirstName from Singers`}) 2246 return err 2247 }) 2248 if got, want := ErrCode(err), codes.InvalidArgument; got != want { 2249 t.Errorf("got %s, want %s", got, want) 2250 } 2251} 2252 2253func TestIntegration_StructParametersBind(t *testing.T) { 2254 t.Parallel() 2255 2256 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 2257 defer cancel() 2258 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, nil) 2259 defer cleanup() 2260 2261 type tRow []interface{} 2262 type tRows []struct{ trow tRow } 2263 2264 type allFields struct { 2265 Stringf string 2266 Intf int 2267 Boolf bool 2268 Floatf float64 2269 Bytef []byte 2270 Timef time.Time 2271 Datef civil.Date 2272 } 2273 allColumns := []string{ 2274 "Stringf", 2275 "Intf", 2276 "Boolf", 2277 "Floatf", 2278 "Bytef", 2279 "Timef", 2280 "Datef", 2281 } 2282 s1 := allFields{"abc", 300, false, 3.45, []byte("foo"), t1, d1} 2283 s2 := allFields{"def", -300, false, -3.45, []byte("bar"), t2, d2} 2284 2285 dynamicStructType := reflect.StructOf([]reflect.StructField{ 2286 {Name: "A", Type: reflect.TypeOf(t1), Tag: `spanner:"ff1"`}, 2287 }) 2288 s3 := reflect.New(dynamicStructType) 2289 s3.Elem().Field(0).Set(reflect.ValueOf(t1)) 2290 2291 for i, test := range []struct { 2292 param interface{} 2293 sql string 2294 cols []string 2295 trows tRows 2296 }{ 2297 // Struct value. 2298 { 2299 s1, 2300 "SELECT" + 2301 " @p.Stringf," + 2302 " @p.Intf," + 2303 " @p.Boolf," + 2304 " @p.Floatf," + 2305 " @p.Bytef," + 2306 " @p.Timef," + 2307 " @p.Datef", 2308 allColumns, 2309 tRows{ 2310 {tRow{"abc", 300, false, 3.45, []byte("foo"), t1, d1}}, 2311 }, 2312 }, 2313 // Array of struct value. 2314 { 2315 []allFields{s1, s2}, 2316 "SELECT * FROM UNNEST(@p)", 2317 allColumns, 2318 tRows{ 2319 {tRow{"abc", 300, false, 3.45, []byte("foo"), t1, d1}}, 2320 {tRow{"def", -300, false, -3.45, []byte("bar"), t2, d2}}, 2321 }, 2322 }, 2323 // Null struct. 2324 { 2325 (*allFields)(nil), 2326 "SELECT @p IS NULL", 2327 []string{""}, 2328 tRows{ 2329 {tRow{true}}, 2330 }, 2331 }, 2332 // Null Array of struct. 2333 { 2334 []allFields(nil), 2335 "SELECT @p IS NULL", 2336 []string{""}, 2337 tRows{ 2338 {tRow{true}}, 2339 }, 2340 }, 2341 // Empty struct. 2342 { 2343 struct{}{}, 2344 "SELECT @p IS NULL ", 2345 []string{""}, 2346 tRows{ 2347 {tRow{false}}, 2348 }, 2349 }, 2350 // Empty array of struct. 2351 { 2352 []allFields{}, 2353 "SELECT * FROM UNNEST(@p) ", 2354 allColumns, 2355 tRows{}, 2356 }, 2357 // Struct with duplicate fields. 2358 { 2359 struct { 2360 A int `spanner:"field"` 2361 B int `spanner:"field"` 2362 }{10, 20}, 2363 "SELECT * FROM UNNEST([@p]) ", 2364 []string{"field", "field"}, 2365 tRows{ 2366 {tRow{10, 20}}, 2367 }, 2368 }, 2369 // Struct with unnamed fields. 2370 { 2371 struct { 2372 A string `spanner:""` 2373 }{"hello"}, 2374 "SELECT * FROM UNNEST([@p]) ", 2375 []string{""}, 2376 tRows{ 2377 {tRow{"hello"}}, 2378 }, 2379 }, 2380 // Mixed struct. 2381 { 2382 struct { 2383 DynamicStructField interface{} `spanner:"f1"` 2384 ArrayStructField []*allFields `spanner:"f2"` 2385 }{ 2386 DynamicStructField: s3.Interface(), 2387 ArrayStructField: []*allFields{nil}, 2388 }, 2389 "SELECT @p.f1.ff1, ARRAY_LENGTH(@p.f2), @p.f2[OFFSET(0)] IS NULL ", 2390 []string{"ff1", "", ""}, 2391 tRows{ 2392 {tRow{t1, 1, true}}, 2393 }, 2394 }, 2395 } { 2396 iter := client.Single().Query(ctx, Statement{ 2397 SQL: test.sql, 2398 Params: map[string]interface{}{"p": test.param}, 2399 }) 2400 var gotRows []*Row 2401 err := iter.Do(func(r *Row) error { 2402 gotRows = append(gotRows, r) 2403 return nil 2404 }) 2405 if err != nil { 2406 t.Errorf("Failed to execute test case %d, error: %v", i, err) 2407 } 2408 2409 var wantRows []*Row 2410 for j, row := range test.trows { 2411 r, err := NewRow(test.cols, row.trow) 2412 if err != nil { 2413 t.Errorf("Invalid row %d in test case %d", j, i) 2414 } 2415 wantRows = append(wantRows, r) 2416 } 2417 if !testEqual(gotRows, wantRows) { 2418 t.Errorf("%d: Want result %v, got result %v", i, wantRows, gotRows) 2419 } 2420 } 2421} 2422 2423func TestIntegration_PDML(t *testing.T) { 2424 skipEmulatorTest(t) 2425 t.Parallel() 2426 2427 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 2428 defer cancel() 2429 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 2430 defer cleanup() 2431 2432 columns := []string{"SingerId", "FirstName", "LastName"} 2433 2434 // Populate the Singers table. 2435 var muts []*Mutation 2436 for _, row := range [][]interface{}{ 2437 {1, "Umm", "Kulthum"}, 2438 {2, "Eduard", "Khil"}, 2439 {3, "Audra", "McDonald"}, 2440 {4, "Enrique", "Iglesias"}, 2441 {5, "Shakira", "Ripoll"}, 2442 } { 2443 muts = append(muts, Insert("Singers", columns, row)) 2444 } 2445 if _, err := client.Apply(ctx, muts); err != nil { 2446 t.Fatal(err) 2447 } 2448 // Identifiers in PDML statements must be fully qualified. 2449 // TODO(jba): revisit the above. 2450 count, err := client.PartitionedUpdate(ctx, Statement{ 2451 SQL: `UPDATE Singers SET Singers.FirstName = "changed" WHERE Singers.SingerId >= 1 AND Singers.SingerId <= @end`, 2452 Params: map[string]interface{}{ 2453 "end": 3, 2454 }, 2455 }) 2456 if err != nil { 2457 t.Fatal(err) 2458 } 2459 if want := int64(3); count != want { 2460 t.Fatalf("got %d, want %d", count, want) 2461 } 2462 got, err := readAll(client.Single().Read(ctx, "Singers", AllKeys(), columns)) 2463 if err != nil { 2464 t.Fatal(err) 2465 } 2466 want := [][]interface{}{ 2467 {int64(1), "changed", "Kulthum"}, 2468 {int64(2), "changed", "Khil"}, 2469 {int64(3), "changed", "McDonald"}, 2470 {int64(4), "Enrique", "Iglesias"}, 2471 {int64(5), "Shakira", "Ripoll"}, 2472 } 2473 if !testEqual(got, want) { 2474 t.Errorf("\ngot %v\nwant%v", got, want) 2475 } 2476} 2477 2478func TestIntegration_BatchDML(t *testing.T) { 2479 skipEmulatorTest(t) 2480 t.Parallel() 2481 2482 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 2483 defer cancel() 2484 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 2485 defer cleanup() 2486 2487 columns := []string{"SingerId", "FirstName", "LastName"} 2488 2489 // Populate the Singers table. 2490 var muts []*Mutation 2491 for _, row := range [][]interface{}{ 2492 {1, "Umm", "Kulthum"}, 2493 {2, "Eduard", "Khil"}, 2494 {3, "Audra", "McDonald"}, 2495 } { 2496 muts = append(muts, Insert("Singers", columns, row)) 2497 } 2498 if _, err := client.Apply(ctx, muts); err != nil { 2499 t.Fatal(err) 2500 } 2501 2502 var counts []int64 2503 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) { 2504 counts, err = tx.BatchUpdate(ctx, []Statement{ 2505 {SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`}, 2506 {SQL: `UPDATE Singers SET Singers.FirstName = "changed 2" WHERE Singers.SingerId = 2`}, 2507 {SQL: `UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`}, 2508 }) 2509 return err 2510 }) 2511 2512 if err != nil { 2513 t.Fatal(err) 2514 } 2515 if want := []int64{1, 1, 1}; !testEqual(counts, want) { 2516 t.Fatalf("got %d, want %d", counts, want) 2517 } 2518 got, err := readAll(client.Single().Read(ctx, "Singers", AllKeys(), columns)) 2519 if err != nil { 2520 t.Fatal(err) 2521 } 2522 want := [][]interface{}{ 2523 {int64(1), "changed 1", "Kulthum"}, 2524 {int64(2), "changed 2", "Khil"}, 2525 {int64(3), "changed 3", "McDonald"}, 2526 } 2527 if !testEqual(got, want) { 2528 t.Errorf("\ngot %v\nwant%v", got, want) 2529 } 2530} 2531 2532func TestIntegration_BatchDML_NoStatements(t *testing.T) { 2533 t.Parallel() 2534 2535 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 2536 defer cancel() 2537 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 2538 defer cleanup() 2539 2540 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) { 2541 _, err = tx.BatchUpdate(ctx, []Statement{}) 2542 return err 2543 }) 2544 if err == nil { 2545 t.Fatal("expected error, got nil") 2546 } 2547 if s, ok := status.FromError(err); ok { 2548 if s.Code() != codes.InvalidArgument { 2549 t.Fatalf("expected InvalidArgument, got %v", err) 2550 } 2551 } else { 2552 t.Fatalf("expected InvalidArgument, got %v", err) 2553 } 2554} 2555 2556func TestIntegration_BatchDML_TwoStatements(t *testing.T) { 2557 skipEmulatorTest(t) 2558 t.Parallel() 2559 2560 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 2561 defer cancel() 2562 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 2563 defer cleanup() 2564 2565 columns := []string{"SingerId", "FirstName", "LastName"} 2566 2567 // Populate the Singers table. 2568 var muts []*Mutation 2569 for _, row := range [][]interface{}{ 2570 {1, "Umm", "Kulthum"}, 2571 {2, "Eduard", "Khil"}, 2572 {3, "Audra", "McDonald"}, 2573 } { 2574 muts = append(muts, Insert("Singers", columns, row)) 2575 } 2576 if _, err := client.Apply(ctx, muts); err != nil { 2577 t.Fatal(err) 2578 } 2579 2580 var updateCount int64 2581 var batchCounts []int64 2582 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) { 2583 batchCounts, err = tx.BatchUpdate(ctx, []Statement{ 2584 {SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`}, 2585 {SQL: `UPDATE Singers SET Singers.FirstName = "changed 2" WHERE Singers.SingerId = 2`}, 2586 {SQL: `UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`}, 2587 }) 2588 if err != nil { 2589 return err 2590 } 2591 2592 updateCount, err = tx.Update(ctx, Statement{SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`}) 2593 return err 2594 }) 2595 if err != nil { 2596 t.Fatal(err) 2597 } 2598 if want := []int64{1, 1, 1}; !testEqual(batchCounts, want) { 2599 t.Fatalf("got %d, want %d", batchCounts, want) 2600 } 2601 if updateCount != 1 { 2602 t.Fatalf("got %v, want 1", updateCount) 2603 } 2604} 2605 2606// TODO(deklerk): this currently does not work because the transaction appears to 2607// get rolled back after a single statement fails. b/120158761 2608func TestIntegration_BatchDML_Error(t *testing.T) { 2609 t.Parallel() 2610 2611 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 2612 defer cancel() 2613 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 2614 defer cleanup() 2615 2616 columns := []string{"SingerId", "FirstName", "LastName"} 2617 2618 // Populate the Singers table. 2619 var muts []*Mutation 2620 for _, row := range [][]interface{}{ 2621 {1, "Umm", "Kulthum"}, 2622 {2, "Eduard", "Khil"}, 2623 {3, "Audra", "McDonald"}, 2624 } { 2625 muts = append(muts, Insert("Singers", columns, row)) 2626 } 2627 if _, err := client.Apply(ctx, muts); err != nil { 2628 t.Fatal(err) 2629 } 2630 2631 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) { 2632 counts, err := tx.BatchUpdate(ctx, []Statement{ 2633 {SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`}, 2634 {SQL: `some illegal statement`}, 2635 {SQL: `UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`}, 2636 }) 2637 if err == nil { 2638 t.Fatal("expected err, got nil") 2639 } 2640 if want := []int64{1}; !testEqual(counts, want) { 2641 t.Fatalf("got %d, want %d", counts, want) 2642 } 2643 2644 got, err := readAll(tx.Read(ctx, "Singers", AllKeys(), columns)) 2645 if err != nil { 2646 t.Fatal(err) 2647 } 2648 want := [][]interface{}{ 2649 {int64(1), "changed 1", "Kulthum"}, 2650 {int64(2), "Eduard", "Khil"}, 2651 {int64(3), "Audra", "McDonald"}, 2652 } 2653 if !testEqual(got, want) { 2654 t.Errorf("\ngot %v\nwant%v", got, want) 2655 } 2656 2657 return nil 2658 }) 2659 if err != nil { 2660 t.Fatal(err) 2661 } 2662} 2663 2664// Prepare initializes Cloud Spanner testing DB and clients. 2665func prepareIntegrationTest(ctx context.Context, t *testing.T, spc SessionPoolConfig, statements []string) (*Client, string, func()) { 2666 if databaseAdmin == nil { 2667 t.Skip("Integration tests skipped") 2668 } 2669 // Construct a unique test DB name. 2670 dbName := dbNameSpace.New() 2671 2672 dbPath := fmt.Sprintf("projects/%v/instances/%v/databases/%v", testProjectID, testInstanceID, dbName) 2673 // Create database and tables. 2674 op, err := databaseAdmin.CreateDatabase(ctx, &adminpb.CreateDatabaseRequest{ 2675 Parent: fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID), 2676 CreateStatement: "CREATE DATABASE " + dbName, 2677 ExtraStatements: statements, 2678 }) 2679 if err != nil { 2680 t.Fatalf("cannot create testing DB %v: %v", dbPath, err) 2681 } 2682 if _, err := op.Wait(ctx); err != nil { 2683 t.Fatalf("cannot create testing DB %v: %v", dbPath, err) 2684 } 2685 client, err := createClient(ctx, dbPath, spc) 2686 if err != nil { 2687 t.Fatalf("cannot create data client on DB %v: %v", dbPath, err) 2688 } 2689 return client, dbPath, func() { 2690 client.Close() 2691 } 2692} 2693 2694func cleanupInstances() { 2695 if instanceAdmin == nil { 2696 // Integration tests skipped. 2697 return 2698 } 2699 2700 ctx := context.Background() 2701 parent := fmt.Sprintf("projects/%v", testProjectID) 2702 iter := instanceAdmin.ListInstances(ctx, &instancepb.ListInstancesRequest{ 2703 Parent: parent, 2704 Filter: "name:gotest-", 2705 }) 2706 expireAge := 24 * time.Hour 2707 2708 for { 2709 inst, err := iter.Next() 2710 if err == iterator.Done { 2711 break 2712 } 2713 if err != nil { 2714 panic(err) 2715 } 2716 if instanceNameSpace.Older(inst.Name, expireAge) { 2717 log.Printf("Deleting instance %s", inst.Name) 2718 2719 if err := instanceAdmin.DeleteInstance(ctx, &instancepb.DeleteInstanceRequest{Name: inst.Name}); err != nil { 2720 log.Printf("failed to delete instance %s (error %v), might need a manual removal", 2721 inst.Name, err) 2722 } 2723 } 2724 } 2725} 2726 2727func rangeReads(ctx context.Context, t *testing.T, client *Client) { 2728 checkRange := func(ks KeySet, wantNums ...int) { 2729 if msg, ok := compareRows(client.Single().Read(ctx, testTable, ks, testTableColumns), wantNums); !ok { 2730 t.Errorf("key set %+v: %s", ks, msg) 2731 } 2732 } 2733 2734 checkRange(Key{"k1"}, 1) 2735 checkRange(KeyRange{Key{"k3"}, Key{"k5"}, ClosedOpen}, 3, 4) 2736 checkRange(KeyRange{Key{"k3"}, Key{"k5"}, ClosedClosed}, 3, 4, 5) 2737 checkRange(KeyRange{Key{"k3"}, Key{"k5"}, OpenClosed}, 4, 5) 2738 checkRange(KeyRange{Key{"k3"}, Key{"k5"}, OpenOpen}, 4) 2739 2740 // Partial key specification. 2741 checkRange(KeyRange{Key{"k7"}, Key{}, ClosedClosed}, 7, 8, 9) 2742 checkRange(KeyRange{Key{"k7"}, Key{}, OpenClosed}, 8, 9) 2743 checkRange(KeyRange{Key{}, Key{"k11"}, ClosedOpen}, 0, 1, 10) 2744 checkRange(KeyRange{Key{}, Key{"k11"}, ClosedClosed}, 0, 1, 10, 11) 2745 2746 // The following produce empty ranges. 2747 // TODO(jba): Consider a multi-part key to illustrate partial key behavior. 2748 // checkRange(KeyRange{Key{"k7"}, Key{}, ClosedOpen}) 2749 // checkRange(KeyRange{Key{"k7"}, Key{}, OpenOpen}) 2750 // checkRange(KeyRange{Key{}, Key{"k11"}, OpenOpen}) 2751 // checkRange(KeyRange{Key{}, Key{"k11"}, OpenClosed}) 2752 2753 // Prefix is component-wise, not string prefix. 2754 checkRange(Key{"k1"}.AsPrefix(), 1) 2755 checkRange(KeyRange{Key{"k1"}, Key{"k2"}, ClosedOpen}, 1, 10, 11, 12, 13, 14) 2756 2757 checkRange(AllKeys(), 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14) 2758} 2759 2760func indexRangeReads(ctx context.Context, t *testing.T, client *Client) { 2761 checkRange := func(ks KeySet, wantNums ...int) { 2762 if msg, ok := compareRows(client.Single().ReadUsingIndex(ctx, testTable, testTableIndex, ks, testTableColumns), 2763 wantNums); !ok { 2764 t.Errorf("key set %+v: %s", ks, msg) 2765 } 2766 } 2767 2768 checkRange(Key{"v1"}, 1) 2769 checkRange(KeyRange{Key{"v3"}, Key{"v5"}, ClosedOpen}, 3, 4) 2770 checkRange(KeyRange{Key{"v3"}, Key{"v5"}, ClosedClosed}, 3, 4, 5) 2771 checkRange(KeyRange{Key{"v3"}, Key{"v5"}, OpenClosed}, 4, 5) 2772 checkRange(KeyRange{Key{"v3"}, Key{"v5"}, OpenOpen}, 4) 2773 2774 // // Partial key specification. 2775 checkRange(KeyRange{Key{"v7"}, Key{}, ClosedClosed}, 7, 8, 9) 2776 checkRange(KeyRange{Key{"v7"}, Key{}, OpenClosed}, 8, 9) 2777 checkRange(KeyRange{Key{}, Key{"v11"}, ClosedOpen}, 0, 1, 10) 2778 checkRange(KeyRange{Key{}, Key{"v11"}, ClosedClosed}, 0, 1, 10, 11) 2779 2780 // // The following produce empty ranges. 2781 // checkRange(KeyRange{Key{"v7"}, Key{}, ClosedOpen}) 2782 // checkRange(KeyRange{Key{"v7"}, Key{}, OpenOpen}) 2783 // checkRange(KeyRange{Key{}, Key{"v11"}, OpenOpen}) 2784 // checkRange(KeyRange{Key{}, Key{"v11"}, OpenClosed}) 2785 2786 // // Prefix is component-wise, not string prefix. 2787 checkRange(Key{"v1"}.AsPrefix(), 1) 2788 checkRange(KeyRange{Key{"v1"}, Key{"v2"}, ClosedOpen}, 1, 10, 11, 12, 13, 14) 2789 checkRange(AllKeys(), 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14) 2790 2791 // Read from an index with DESC ordering. 2792 wantNums := []int{14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0} 2793 if msg, ok := compareRows(client.Single().ReadUsingIndex(ctx, testTable, "TestTableByValueDesc", AllKeys(), testTableColumns), 2794 wantNums); !ok { 2795 t.Errorf("desc: %s", msg) 2796 } 2797} 2798 2799type testTableRow struct{ Key, StringValue string } 2800 2801func compareRows(iter *RowIterator, wantNums []int) (string, bool) { 2802 rows, err := readAllTestTable(iter) 2803 if err != nil { 2804 return err.Error(), false 2805 } 2806 want := map[string]string{} 2807 for _, n := range wantNums { 2808 want[fmt.Sprintf("k%d", n)] = fmt.Sprintf("v%d", n) 2809 } 2810 got := map[string]string{} 2811 for _, r := range rows { 2812 got[r.Key] = r.StringValue 2813 } 2814 if !testEqual(got, want) { 2815 return fmt.Sprintf("got %v, want %v", got, want), false 2816 } 2817 return "", true 2818} 2819 2820func isNaN(x interface{}) bool { 2821 f, ok := x.(float64) 2822 if !ok { 2823 return false 2824 } 2825 return math.IsNaN(f) 2826} 2827 2828// createClient creates Cloud Spanner data client. 2829func createClient(ctx context.Context, dbPath string, spc SessionPoolConfig) (client *Client, err error) { 2830 if os.Getenv("SPANNER_EMULATOR_HOST") == "" { 2831 client, err = NewClientWithConfig(ctx, dbPath, ClientConfig{ 2832 SessionPoolConfig: spc, 2833 }, option.WithTokenSource(testutil.TokenSource(ctx, Scope, AdminScope)), option.WithEndpoint(endpoint)) 2834 } else { 2835 // When the emulator is enabled, option.WithoutAuthentication() 2836 // will be added automatically which is incompatible with 2837 // option.WithTokenSource(testutil.TokenSource(ctx, Scope)). 2838 client, err = NewClientWithConfig(ctx, dbPath, ClientConfig{SessionPoolConfig: spc}) 2839 } 2840 2841 if err != nil { 2842 return nil, fmt.Errorf("cannot create data client on DB %v: %v", dbPath, err) 2843 } 2844 return client, nil 2845} 2846 2847// populate prepares the database with some data. 2848func populate(ctx context.Context, client *Client) error { 2849 // Populate data 2850 var err error 2851 m := InsertMap("test", map[string]interface{}{ 2852 "a": str1, 2853 "b": str2, 2854 }) 2855 _, err = client.Apply(ctx, []*Mutation{m}) 2856 return err 2857} 2858 2859func matchError(got error, wantCode codes.Code, wantMsgPart string) (string, bool) { 2860 if ErrCode(got) != wantCode || !strings.Contains(strings.ToLower(ErrDesc(got)), strings.ToLower(wantMsgPart)) { 2861 return fmt.Sprintf("got error <%v>\n"+`want <code = %q, "...%s...">`, got, wantCode, wantMsgPart), false 2862 } 2863 return "", true 2864} 2865 2866func rowToValues(r *Row) ([]interface{}, error) { 2867 var x int64 2868 var y, z string 2869 if err := r.Column(0, &x); err != nil { 2870 return nil, err 2871 } 2872 if err := r.Column(1, &y); err != nil { 2873 return nil, err 2874 } 2875 if err := r.Column(2, &z); err != nil { 2876 return nil, err 2877 } 2878 return []interface{}{x, y, z}, nil 2879} 2880 2881func readAll(iter *RowIterator) ([][]interface{}, error) { 2882 defer iter.Stop() 2883 var vals [][]interface{} 2884 for { 2885 row, err := iter.Next() 2886 if err == iterator.Done { 2887 return vals, nil 2888 } 2889 if err != nil { 2890 return nil, err 2891 } 2892 v, err := rowToValues(row) 2893 if err != nil { 2894 return nil, err 2895 } 2896 vals = append(vals, v) 2897 } 2898} 2899 2900func readAllTestTable(iter *RowIterator) ([]testTableRow, error) { 2901 defer iter.Stop() 2902 var vals []testTableRow 2903 for { 2904 row, err := iter.Next() 2905 if err == iterator.Done { 2906 return vals, nil 2907 } 2908 if err != nil { 2909 return nil, err 2910 } 2911 var ttr testTableRow 2912 if err := row.ToStruct(&ttr); err != nil { 2913 return nil, err 2914 } 2915 vals = append(vals, ttr) 2916 } 2917} 2918 2919func maxDuration(a, b time.Duration) time.Duration { 2920 if a > b { 2921 return a 2922 } 2923 return b 2924} 2925 2926func skipEmulatorTest(t *testing.T) { 2927 if os.Getenv("SPANNER_EMULATOR_HOST") != "" { 2928 t.Skip("Skipping testing against the emulator.") 2929 } 2930} 2931