1/* 2Copyright 2017 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package spanner 18 19import ( 20 "context" 21 "errors" 22 "flag" 23 "fmt" 24 "log" 25 "math" 26 "os" 27 "reflect" 28 "strings" 29 "sync" 30 "testing" 31 "time" 32 33 "cloud.google.com/go/civil" 34 "cloud.google.com/go/internal/testutil" 35 "cloud.google.com/go/internal/uid" 36 database "cloud.google.com/go/spanner/admin/database/apiv1" 37 instance "cloud.google.com/go/spanner/admin/instance/apiv1" 38 "google.golang.org/api/iterator" 39 "google.golang.org/api/option" 40 adminpb "google.golang.org/genproto/googleapis/spanner/admin/database/v1" 41 instancepb "google.golang.org/genproto/googleapis/spanner/admin/instance/v1" 42 "google.golang.org/grpc" 43 "google.golang.org/grpc/codes" 44 "google.golang.org/grpc/status" 45) 46 47var ( 48 // testProjectID specifies the project used for testing. It can be changed 49 // by setting environment variable GCLOUD_TESTS_GOLANG_PROJECT_ID. 
50 testProjectID = testutil.ProjID() 51 52 dbNameSpace = uid.NewSpace("gotest", &uid.Options{Sep: '_', Short: true}) 53 instanceNameSpace = uid.NewSpace("gotest", &uid.Options{Sep: '-', Short: true}) 54 testInstanceID = instanceNameSpace.New() 55 56 testTable = "TestTable" 57 testTableIndex = "TestTableByValue" 58 testTableColumns = []string{"Key", "StringValue"} 59 60 databaseAdmin *database.DatabaseAdminClient 61 instanceAdmin *instance.InstanceAdminClient 62 63 singerDBStatements = []string{ 64 `CREATE TABLE Singers ( 65 SingerId INT64 NOT NULL, 66 FirstName STRING(1024), 67 LastName STRING(1024), 68 SingerInfo BYTES(MAX) 69 ) PRIMARY KEY (SingerId)`, 70 `CREATE INDEX SingerByName ON Singers(FirstName, LastName)`, 71 `CREATE TABLE Accounts ( 72 AccountId INT64 NOT NULL, 73 Nickname STRING(100), 74 Balance INT64 NOT NULL, 75 ) PRIMARY KEY (AccountId)`, 76 `CREATE INDEX AccountByNickname ON Accounts(Nickname) STORING (Balance)`, 77 `CREATE TABLE Types ( 78 RowID INT64 NOT NULL, 79 String STRING(MAX), 80 StringArray ARRAY<STRING(MAX)>, 81 Bytes BYTES(MAX), 82 BytesArray ARRAY<BYTES(MAX)>, 83 Int64a INT64, 84 Int64Array ARRAY<INT64>, 85 Bool BOOL, 86 BoolArray ARRAY<BOOL>, 87 Float64 FLOAT64, 88 Float64Array ARRAY<FLOAT64>, 89 Date DATE, 90 DateArray ARRAY<DATE>, 91 Timestamp TIMESTAMP, 92 TimestampArray ARRAY<TIMESTAMP>, 93 ) PRIMARY KEY (RowID)`, 94 } 95 96 readDBStatements = []string{ 97 `CREATE TABLE TestTable ( 98 Key STRING(MAX) NOT NULL, 99 StringValue STRING(MAX) 100 ) PRIMARY KEY (Key)`, 101 `CREATE INDEX TestTableByValue ON TestTable(StringValue)`, 102 `CREATE INDEX TestTableByValueDesc ON TestTable(StringValue DESC)`, 103 } 104 105 simpleDBStatements = []string{ 106 `CREATE TABLE test ( 107 a STRING(1024), 108 b STRING(1024), 109 ) PRIMARY KEY (a)`, 110 } 111 simpleDBTableColumns = []string{"a", "b"} 112 113 ctsDBStatements = []string{ 114 `CREATE TABLE TestTable ( 115 Key STRING(MAX) NOT NULL, 116 Ts TIMESTAMP OPTIONS (allow_commit_timestamp = true), 117 
) PRIMARY KEY (Key)`, 118 } 119) 120 121const ( 122 str1 = "alice" 123 str2 = "a@example.com" 124) 125 126func TestMain(m *testing.M) { 127 cleanup := initIntegrationTests() 128 res := m.Run() 129 cleanup() 130 os.Exit(res) 131} 132 133var grpcHeaderChecker = testutil.DefaultHeadersEnforcer() 134 135func initIntegrationTests() (cleanup func()) { 136 ctx := context.Background() 137 flag.Parse() // Needed for testing.Short(). 138 noop := func() {} 139 140 if testing.Short() { 141 log.Println("Integration tests skipped in -short mode.") 142 return noop 143 } 144 145 if testProjectID == "" { 146 log.Println("Integration tests skipped: GCLOUD_TESTS_GOLANG_PROJECT_ID is missing") 147 return noop 148 } 149 150 ts := testutil.TokenSource(ctx, AdminScope, Scope) 151 if ts == nil { 152 log.Printf("Integration test skipped: cannot get service account credential from environment variable %v", "GCLOUD_TESTS_GOLANG_KEY") 153 return noop 154 } 155 var err error 156 157 opts := append(grpcHeaderChecker.CallOptions(), option.WithTokenSource(ts), option.WithEndpoint(endpoint)) 158 159 // Run integration tests against the given emulator. Currently, the database and 160 // instance admin clients are auto-generated, which do not support to configure 161 // SPANNER_EMULATOR_HOST. 162 emulatorAddr := os.Getenv("SPANNER_EMULATOR_HOST") 163 if emulatorAddr != "" { 164 opts = append( 165 grpcHeaderChecker.CallOptions(), 166 option.WithEndpoint(emulatorAddr), 167 option.WithGRPCDialOption(grpc.WithInsecure()), 168 option.WithoutAuthentication(), 169 ) 170 } 171 172 // Create InstanceAdmin and DatabaseAdmin clients. 173 instanceAdmin, err = instance.NewInstanceAdminClient(ctx, opts...) 174 if err != nil { 175 log.Fatalf("cannot create instance databaseAdmin client: %v", err) 176 } 177 databaseAdmin, err = database.NewDatabaseAdminClient(ctx, opts...) 
178 if err != nil { 179 log.Fatalf("cannot create databaseAdmin client: %v", err) 180 } 181 // Get the list of supported instance configs for the project that is used 182 // for the integration tests. The supported instance configs can differ per 183 // project. The integration tests will use the first instance config that 184 // is returned by Cloud Spanner. This will normally be the regional config 185 // that is physically the closest to where the request is coming from. 186 configIterator := instanceAdmin.ListInstanceConfigs(ctx, &instancepb.ListInstanceConfigsRequest{ 187 Parent: fmt.Sprintf("projects/%s", testProjectID), 188 }) 189 config, err := configIterator.Next() 190 if err != nil { 191 log.Fatalf("Cannot get any instance configurations.\nPlease make sure the Cloud Spanner API is enabled for the test project.\nGet error: %v", err) 192 } 193 // Create a test instance to use for this test run. 194 op, err := instanceAdmin.CreateInstance(ctx, &instancepb.CreateInstanceRequest{ 195 Parent: fmt.Sprintf("projects/%s", testProjectID), 196 InstanceId: testInstanceID, 197 Instance: &instancepb.Instance{ 198 Config: config.Name, 199 DisplayName: testInstanceID, 200 NodeCount: 1, 201 }, 202 }) 203 if err != nil { 204 log.Fatalf("could not create instance with id %s: %v", fmt.Sprintf("projects/%s/instances/%s", testProjectID, testInstanceID), err) 205 } 206 // Wait for the instance creation to finish. 207 i, err := op.Wait(ctx) 208 if err != nil { 209 log.Fatalf("waiting for instance creation to finish failed: %v", err) 210 } 211 if i.State != instancepb.Instance_READY { 212 log.Printf("instance state is not READY, it might be that the test instance will cause problems during tests. Got state %v\n", i.State) 213 } 214 215 return func() { 216 // Delete this test instance. 
217 instanceName := fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID) 218 if err = instanceAdmin.DeleteInstance(ctx, &instancepb.DeleteInstanceRequest{ 219 Name: instanceName, 220 }); err != nil { 221 log.Printf("failed to drop instance %s (error %v), might need a manual removal", 222 instanceName, err) 223 } 224 // Delete other test instances that may be lingering around. 225 cleanupInstances() 226 databaseAdmin.Close() 227 instanceAdmin.Close() 228 } 229} 230 231func TestIntegration_InitSessionPool(t *testing.T) { 232 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 233 defer cancel() 234 // Set up an empty testing environment. 235 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, []string{}) 236 defer cleanup() 237 sp := client.idleSessions 238 sp.mu.Lock() 239 want := sp.MinOpened 240 sp.mu.Unlock() 241 var numOpened int 242loop: 243 for { 244 select { 245 case <-ctx.Done(): 246 t.Fatalf("timed out, got %d session(s), want %d", numOpened, want) 247 default: 248 sp.mu.Lock() 249 numOpened = sp.idleList.Len() + sp.idleWriteList.Len() 250 sp.mu.Unlock() 251 if uint64(numOpened) == want { 252 break loop 253 } 254 } 255 } 256 // Delete all sessions in the pool on the backend and then try to execute a 257 // simple query. The 'Session not found' error should cause an automatic 258 // retry of the read-only transaction. 259 sp.mu.Lock() 260 s := sp.idleList.Front() 261 for { 262 if s == nil { 263 break 264 } 265 // This will delete the session on the backend without removing it 266 // from the pool. 
267 s.Value.(*session).delete(context.Background()) 268 s = s.Next() 269 } 270 sp.mu.Unlock() 271 sql := "SELECT 1, 'FOO', 'BAR'" 272 tx := client.ReadOnlyTransaction() 273 defer tx.Close() 274 iter := tx.Query(context.Background(), NewStatement(sql)) 275 rows, err := readAll(iter) 276 if err != nil { 277 t.Fatalf("Unexpected error for query %q: %v", sql, err) 278 } 279 if got, want := len(rows), 1; got != want { 280 t.Fatalf("Row count mismatch for query %q\nGot: %v\nWant: %v", sql, got, want) 281 } 282 if got, want := len(rows[0]), 3; got != want { 283 t.Fatalf("Column count mismatch for query %q\nGot: %v\nWant: %v", sql, got, want) 284 } 285 if got, want := rows[0][0].(int64), int64(1); got != want { 286 t.Fatalf("Column value mismatch for query %q\nGot: %v\nWant: %v", sql, got, want) 287 } 288} 289 290// Test SingleUse transaction. 291func TestIntegration_SingleUse(t *testing.T) { 292 t.Parallel() 293 294 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 295 defer cancel() 296 // Set up testing environment. 297 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 298 defer cleanup() 299 300 writes := []struct { 301 row []interface{} 302 ts time.Time 303 }{ 304 {row: []interface{}{1, "Marc", "Foo"}}, 305 {row: []interface{}{2, "Tars", "Bar"}}, 306 {row: []interface{}{3, "Alpha", "Beta"}}, 307 {row: []interface{}{4, "Last", "End"}}, 308 } 309 // Try to write four rows through the Apply API. 310 for i, w := range writes { 311 var err error 312 m := InsertOrUpdate("Singers", 313 []string{"SingerId", "FirstName", "LastName"}, 314 w.row) 315 if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil { 316 t.Fatal(err) 317 } 318 } 319 // Calculate time difference between Cloud Spanner server and localhost to 320 // use to determine the exact staleness value to use. 
321 timeDiff := maxDuration(time.Now().Sub(writes[0].ts), 0) 322 323 // Test reading rows with different timestamp bounds. 324 for i, test := range []struct { 325 name string 326 want [][]interface{} 327 tb TimestampBound 328 checkTs func(time.Time) error 329 }{ 330 { 331 name: "strong", 332 want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}}, 333 tb: StrongRead(), 334 checkTs: func(ts time.Time) error { 335 // writes[3] is the last write, all subsequent strong read 336 // should have a timestamp larger than that. 337 if ts.Before(writes[3].ts) { 338 return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[3].ts) 339 } 340 return nil 341 }, 342 }, 343 { 344 name: "min_read_timestamp", 345 want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}}, 346 tb: MinReadTimestamp(writes[3].ts), 347 checkTs: func(ts time.Time) error { 348 if ts.Before(writes[3].ts) { 349 return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[3].ts) 350 } 351 return nil 352 }, 353 }, 354 { 355 name: "max_staleness", 356 want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}}, 357 tb: MaxStaleness(time.Second), 358 checkTs: func(ts time.Time) error { 359 if ts.Before(writes[3].ts) { 360 return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[3].ts) 361 } 362 return nil 363 }, 364 }, 365 { 366 name: "read_timestamp", 367 want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}}, 368 tb: ReadTimestamp(writes[2].ts), 369 checkTs: func(ts time.Time) error { 370 if ts != writes[2].ts { 371 return fmt.Errorf("read got timestamp %v, want %v", ts, writes[2].ts) 372 } 373 return nil 374 }, 375 }, 376 { 377 name: "exact_staleness", 378 want: nil, 379 // Specify a staleness which should be already before this test. 
380 tb: ExactStaleness(time.Now().Sub(writes[0].ts) + timeDiff + 30*time.Second), 381 checkTs: func(ts time.Time) error { 382 if !ts.Before(writes[0].ts) { 383 return fmt.Errorf("read got timestamp %v, want it to be earlier than %v", ts, writes[0].ts) 384 } 385 return nil 386 }, 387 }, 388 } { 389 t.Run(test.name, func(t *testing.T) { 390 // SingleUse.Query 391 su := client.Single().WithTimestampBound(test.tb) 392 got, err := readAll(su.Query( 393 ctx, 394 Statement{ 395 "SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId IN (@id1, @id3, @id4)", 396 map[string]interface{}{"id1": int64(1), "id3": int64(3), "id4": int64(4)}, 397 })) 398 if err != nil { 399 t.Fatalf("%d: SingleUse.Query returns error %v, want nil", i, err) 400 } 401 rts, err := su.Timestamp() 402 if err != nil { 403 t.Fatalf("%d: SingleUse.Query doesn't return a timestamp, error: %v", i, err) 404 } 405 if err := test.checkTs(rts); err != nil { 406 t.Fatalf("%d: SingleUse.Query doesn't return expected timestamp: %v", i, err) 407 } 408 if !testEqual(got, test.want) { 409 t.Fatalf("%d: got unexpected result from SingleUse.Query: %v, want %v", i, got, test.want) 410 } 411 // SingleUse.Read 412 su = client.Single().WithTimestampBound(test.tb) 413 got, err = readAll(su.Read(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}), []string{"SingerId", "FirstName", "LastName"})) 414 if err != nil { 415 t.Fatalf("%d: SingleUse.Read returns error %v, want nil", i, err) 416 } 417 rts, err = su.Timestamp() 418 if err != nil { 419 t.Fatalf("%d: SingleUse.Read doesn't return a timestamp, error: %v", i, err) 420 } 421 if err := test.checkTs(rts); err != nil { 422 t.Fatalf("%d: SingleUse.Read doesn't return expected timestamp: %v", i, err) 423 } 424 if !testEqual(got, test.want) { 425 t.Fatalf("%d: got unexpected result from SingleUse.Read: %v, want %v", i, got, test.want) 426 } 427 // SingleUse.ReadRow 428 got = nil 429 for _, k := range []Key{{1}, {3}, {4}} { 430 su = 
client.Single().WithTimestampBound(test.tb) 431 r, err := su.ReadRow(ctx, "Singers", k, []string{"SingerId", "FirstName", "LastName"}) 432 if err != nil { 433 continue 434 } 435 v, err := rowToValues(r) 436 if err != nil { 437 continue 438 } 439 got = append(got, v) 440 rts, err = su.Timestamp() 441 if err != nil { 442 t.Fatalf("%d: SingleUse.ReadRow(%v) doesn't return a timestamp, error: %v", i, k, err) 443 } 444 if err := test.checkTs(rts); err != nil { 445 t.Fatalf("%d: SingleUse.ReadRow(%v) doesn't return expected timestamp: %v", i, k, err) 446 } 447 } 448 if !testEqual(got, test.want) { 449 t.Fatalf("%d: got unexpected results from SingleUse.ReadRow: %v, want %v", i, got, test.want) 450 } 451 // SingleUse.ReadUsingIndex 452 su = client.Single().WithTimestampBound(test.tb) 453 got, err = readAll(su.ReadUsingIndex(ctx, "Singers", "SingerByName", KeySets(Key{"Marc", "Foo"}, Key{"Alpha", "Beta"}, Key{"Last", "End"}), []string{"SingerId", "FirstName", "LastName"})) 454 if err != nil { 455 t.Fatalf("%d: SingleUse.ReadUsingIndex returns error %v, want nil", i, err) 456 } 457 // The results from ReadUsingIndex is sorted by the index rather than primary key. 
458 if len(got) != len(test.want) { 459 t.Fatalf("%d: got unexpected result from SingleUse.ReadUsingIndex: %v, want %v", i, got, test.want) 460 } 461 for j, g := range got { 462 if j > 0 { 463 prev := got[j-1][1].(string) + got[j-1][2].(string) 464 curr := got[j][1].(string) + got[j][2].(string) 465 if strings.Compare(prev, curr) > 0 { 466 t.Fatalf("%d: SingleUse.ReadUsingIndex fails to order rows by index keys, %v should be after %v", i, got[j-1], got[j]) 467 } 468 } 469 found := false 470 for _, w := range test.want { 471 if testEqual(g, w) { 472 found = true 473 } 474 } 475 if !found { 476 t.Fatalf("%d: got unexpected result from SingleUse.ReadUsingIndex: %v, want %v", i, got, test.want) 477 break 478 } 479 } 480 rts, err = su.Timestamp() 481 if err != nil { 482 t.Fatalf("%d: SingleUse.ReadUsingIndex doesn't return a timestamp, error: %v", i, err) 483 } 484 if err := test.checkTs(rts); err != nil { 485 t.Fatalf("%d: SingleUse.ReadUsingIndex doesn't return expected timestamp: %v", i, err) 486 } 487 // SingleUse.ReadRowUsingIndex 488 got = nil 489 for _, k := range []Key{{"Marc", "Foo"}, {"Alpha", "Beta"}, {"Last", "End"}} { 490 su = client.Single().WithTimestampBound(test.tb) 491 r, err := su.ReadRowUsingIndex(ctx, "Singers", "SingerByName", k, []string{"SingerId", "FirstName", "LastName"}) 492 if err != nil { 493 continue 494 } 495 v, err := rowToValues(r) 496 if err != nil { 497 continue 498 } 499 got = append(got, v) 500 rts, err = su.Timestamp() 501 if err != nil { 502 t.Fatalf("%d: SingleUse.ReadRowUsingIndex(%v) doesn't return a timestamp, error: %v", i, k, err) 503 } 504 if err := test.checkTs(rts); err != nil { 505 t.Fatalf("%d: SingleUse.ReadRowUsingIndex(%v) doesn't return expected timestamp: %v", i, k, err) 506 } 507 } 508 if !testEqual(got, test.want) { 509 t.Fatalf("%d: got unexpected results from SingleUse.ReadRowUsingIndex: %v, want %v", i, got, test.want) 510 } 511 }) 512 } 513} 514 515// Test resource-based routing enabled. 
516func TestIntegration_SingleUse_WithResourceBasedRouting(t *testing.T) { 517 os.Setenv("GOOGLE_CLOUD_SPANNER_ENABLE_RESOURCE_BASED_ROUTING", "true") 518 defer os.Setenv("GOOGLE_CLOUD_SPANNER_ENABLE_RESOURCE_BASED_ROUTING", "") 519 520 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 521 defer cancel() 522 // Set up testing environment. 523 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 524 defer cleanup() 525 526 writes := []struct { 527 row []interface{} 528 ts time.Time 529 }{ 530 {row: []interface{}{1, "Marc", "Foo"}}, 531 {row: []interface{}{2, "Tars", "Bar"}}, 532 {row: []interface{}{3, "Alpha", "Beta"}}, 533 {row: []interface{}{4, "Last", "End"}}, 534 } 535 // Try to write four rows through the Apply API. 536 for i, w := range writes { 537 var err error 538 m := InsertOrUpdate("Singers", 539 []string{"SingerId", "FirstName", "LastName"}, 540 w.row) 541 if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil { 542 t.Fatal(err) 543 } 544 } 545 546 row, err := client.Single().ReadRow(ctx, "Singers", Key{3}, []string{"FirstName"}) 547 if err != nil { 548 t.Errorf("SingleUse.ReadRow returns error %v, want nil", err) 549 } 550 var got string 551 if err := row.Column(0, &got); err != nil { 552 t.Errorf("row.Column returns error %v, want nil", err) 553 } 554 if want := "Alpha"; got != want { 555 t.Errorf("got %q, want %q", got, want) 556 } 557} 558 559func TestIntegration_SingleUse_ReadingWithLimit(t *testing.T) { 560 t.Parallel() 561 562 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 563 defer cancel() 564 // Set up testing environment. 
565 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 566 defer cleanup() 567 568 writes := []struct { 569 row []interface{} 570 ts time.Time 571 }{ 572 {row: []interface{}{1, "Marc", "Foo"}}, 573 {row: []interface{}{2, "Tars", "Bar"}}, 574 {row: []interface{}{3, "Alpha", "Beta"}}, 575 {row: []interface{}{4, "Last", "End"}}, 576 } 577 // Try to write four rows through the Apply API. 578 for i, w := range writes { 579 var err error 580 m := InsertOrUpdate("Singers", 581 []string{"SingerId", "FirstName", "LastName"}, 582 w.row) 583 if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil { 584 t.Fatal(err) 585 } 586 } 587 588 su := client.Single() 589 const limit = 1 590 gotRows, err := readAll(su.ReadWithOptions(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}), 591 []string{"SingerId", "FirstName", "LastName"}, &ReadOptions{Limit: limit})) 592 if err != nil { 593 t.Errorf("SingleUse.ReadWithOptions returns error %v, want nil", err) 594 } 595 if got, want := len(gotRows), limit; got != want { 596 t.Errorf("got %d, want %d", got, want) 597 } 598} 599 600// Test ReadOnlyTransaction. The testsuite is mostly like SingleUse, except it 601// also tests for a single timestamp across multiple reads. 602func TestIntegration_ReadOnlyTransaction(t *testing.T) { 603 t.Parallel() 604 605 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 606 defer cancel() 607 // Set up testing environment. 608 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 609 defer cleanup() 610 611 writes := []struct { 612 row []interface{} 613 ts time.Time 614 }{ 615 {row: []interface{}{1, "Marc", "Foo"}}, 616 {row: []interface{}{2, "Tars", "Bar"}}, 617 {row: []interface{}{3, "Alpha", "Beta"}}, 618 {row: []interface{}{4, "Last", "End"}}, 619 } 620 // Try to write four rows through the Apply API. 
621 for i, w := range writes { 622 var err error 623 m := InsertOrUpdate("Singers", 624 []string{"SingerId", "FirstName", "LastName"}, 625 w.row) 626 if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil { 627 t.Fatal(err) 628 } 629 } 630 631 // For testing timestamp bound staleness. 632 <-time.After(time.Second) 633 634 // Test reading rows with different timestamp bounds. 635 for i, test := range []struct { 636 want [][]interface{} 637 tb TimestampBound 638 checkTs func(time.Time) error 639 }{ 640 // Note: min_read_timestamp and max_staleness are not supported by 641 // ReadOnlyTransaction. See API document for more details. 642 { 643 // strong 644 [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}}, 645 StrongRead(), 646 func(ts time.Time) error { 647 if ts.Before(writes[3].ts) { 648 return fmt.Errorf("read got timestamp %v, want it to be no later than %v", ts, writes[3].ts) 649 } 650 return nil 651 }, 652 }, 653 { 654 // read_timestamp 655 [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}}, 656 ReadTimestamp(writes[2].ts), 657 func(ts time.Time) error { 658 if ts != writes[2].ts { 659 return fmt.Errorf("read got timestamp %v, expect %v", ts, writes[2].ts) 660 } 661 return nil 662 }, 663 }, 664 { 665 // exact_staleness 666 nil, 667 // Specify a staleness which should be already before this test 668 // because context timeout is set to be 10s. 
669 ExactStaleness(11 * time.Second), 670 func(ts time.Time) error { 671 if ts.After(writes[0].ts) { 672 return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[0].ts) 673 } 674 return nil 675 }, 676 }, 677 } { 678 // ReadOnlyTransaction.Query 679 ro := client.ReadOnlyTransaction().WithTimestampBound(test.tb) 680 got, err := readAll(ro.Query( 681 ctx, 682 Statement{ 683 "SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId IN (@id1, @id3, @id4)", 684 map[string]interface{}{"id1": int64(1), "id3": int64(3), "id4": int64(4)}, 685 })) 686 if err != nil { 687 t.Errorf("%d: ReadOnlyTransaction.Query returns error %v, want nil", i, err) 688 } 689 if !testEqual(got, test.want) { 690 t.Errorf("%d: got unexpected result from ReadOnlyTransaction.Query: %v, want %v", i, got, test.want) 691 } 692 rts, err := ro.Timestamp() 693 if err != nil { 694 t.Errorf("%d: ReadOnlyTransaction.Query doesn't return a timestamp, error: %v", i, err) 695 } 696 if err := test.checkTs(rts); err != nil { 697 t.Errorf("%d: ReadOnlyTransaction.Query doesn't return expected timestamp: %v", i, err) 698 } 699 roTs := rts 700 // ReadOnlyTransaction.Read 701 got, err = readAll(ro.Read(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}), []string{"SingerId", "FirstName", "LastName"})) 702 if err != nil { 703 t.Errorf("%d: ReadOnlyTransaction.Read returns error %v, want nil", i, err) 704 } 705 if !testEqual(got, test.want) { 706 t.Errorf("%d: got unexpected result from ReadOnlyTransaction.Read: %v, want %v", i, got, test.want) 707 } 708 rts, err = ro.Timestamp() 709 if err != nil { 710 t.Errorf("%d: ReadOnlyTransaction.Read doesn't return a timestamp, error: %v", i, err) 711 } 712 if err := test.checkTs(rts); err != nil { 713 t.Errorf("%d: ReadOnlyTransaction.Read doesn't return expected timestamp: %v", i, err) 714 } 715 if roTs != rts { 716 t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, 
rts) 717 } 718 // ReadOnlyTransaction.ReadRow 719 got = nil 720 for _, k := range []Key{{1}, {3}, {4}} { 721 r, err := ro.ReadRow(ctx, "Singers", k, []string{"SingerId", "FirstName", "LastName"}) 722 if err != nil { 723 continue 724 } 725 v, err := rowToValues(r) 726 if err != nil { 727 continue 728 } 729 got = append(got, v) 730 rts, err = ro.Timestamp() 731 if err != nil { 732 t.Errorf("%d: ReadOnlyTransaction.ReadRow(%v) doesn't return a timestamp, error: %v", i, k, err) 733 } 734 if err := test.checkTs(rts); err != nil { 735 t.Errorf("%d: ReadOnlyTransaction.ReadRow(%v) doesn't return expected timestamp: %v", i, k, err) 736 } 737 if roTs != rts { 738 t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts) 739 } 740 } 741 if !testEqual(got, test.want) { 742 t.Errorf("%d: got unexpected results from ReadOnlyTransaction.ReadRow: %v, want %v", i, got, test.want) 743 } 744 // SingleUse.ReadUsingIndex 745 got, err = readAll(ro.ReadUsingIndex(ctx, "Singers", "SingerByName", KeySets(Key{"Marc", "Foo"}, Key{"Alpha", "Beta"}, Key{"Last", "End"}), []string{"SingerId", "FirstName", "LastName"})) 746 if err != nil { 747 t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex returns error %v, want nil", i, err) 748 } 749 // The results from ReadUsingIndex is sorted by the index rather than 750 // primary key. 
751 if len(got) != len(test.want) { 752 t.Errorf("%d: got unexpected result from ReadOnlyTransaction.ReadUsingIndex: %v, want %v", i, got, test.want) 753 } 754 for j, g := range got { 755 if j > 0 { 756 prev := got[j-1][1].(string) + got[j-1][2].(string) 757 curr := got[j][1].(string) + got[j][2].(string) 758 if strings.Compare(prev, curr) > 0 { 759 t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex fails to order rows by index keys, %v should be after %v", i, got[j-1], got[j]) 760 } 761 } 762 found := false 763 for _, w := range test.want { 764 if testEqual(g, w) { 765 found = true 766 } 767 } 768 if !found { 769 t.Errorf("%d: got unexpected result from ReadOnlyTransaction.ReadUsingIndex: %v, want %v", i, got, test.want) 770 break 771 } 772 } 773 rts, err = ro.Timestamp() 774 if err != nil { 775 t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex doesn't return a timestamp, error: %v", i, err) 776 } 777 if err := test.checkTs(rts); err != nil { 778 t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex doesn't return expected timestamp: %v", i, err) 779 } 780 if roTs != rts { 781 t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts) 782 } 783 // ReadOnlyTransaction.ReadRowUsingIndex 784 got = nil 785 for _, k := range []Key{{"Marc", "Foo"}, {"Alpha", "Beta"}, {"Last", "End"}} { 786 r, err := ro.ReadRowUsingIndex(ctx, "Singers", "SingerByName", k, []string{"SingerId", "FirstName", "LastName"}) 787 if err != nil { 788 continue 789 } 790 v, err := rowToValues(r) 791 if err != nil { 792 continue 793 } 794 got = append(got, v) 795 rts, err = ro.Timestamp() 796 if err != nil { 797 t.Errorf("%d: ReadOnlyTransaction.ReadRowUsingIndex(%v) doesn't return a timestamp, error: %v", i, k, err) 798 } 799 if err := test.checkTs(rts); err != nil { 800 t.Errorf("%d: ReadOnlyTransaction.ReadRowUsingIndex(%v) doesn't return expected timestamp: %v", i, k, err) 801 } 802 if roTs != rts { 803 t.Errorf("%d: got two read 
timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts) 804 } 805 } 806 if !testEqual(got, test.want) { 807 t.Errorf("%d: got unexpected results from ReadOnlyTransaction.ReadRowUsingIndex: %v, want %v", i, got, test.want) 808 } 809 ro.Close() 810 } 811} 812 813// Test ReadOnlyTransaction with different timestamp bound when there's an 814// update at the same time. 815func TestIntegration_UpdateDuringRead(t *testing.T) { 816 t.Parallel() 817 818 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 819 defer cancel() 820 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 821 defer cleanup() 822 823 for i, tb := range []TimestampBound{ 824 StrongRead(), 825 ReadTimestamp(time.Now().Add(-time.Minute * 30)), // version GC is 1 hour 826 ExactStaleness(time.Minute * 30), 827 } { 828 ro := client.ReadOnlyTransaction().WithTimestampBound(tb) 829 _, err := ro.ReadRow(ctx, "Singers", Key{i}, []string{"SingerId"}) 830 if ErrCode(err) != codes.NotFound { 831 t.Errorf("%d: ReadOnlyTransaction.ReadRow before write returns error: %v, want NotFound", i, err) 832 } 833 834 m := InsertOrUpdate("Singers", []string{"SingerId"}, []interface{}{i}) 835 if _, err := client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil { 836 t.Fatal(err) 837 } 838 839 _, err = ro.ReadRow(ctx, "Singers", Key{i}, []string{"SingerId"}) 840 if ErrCode(err) != codes.NotFound { 841 t.Errorf("%d: ReadOnlyTransaction.ReadRow after write returns error: %v, want NotFound", i, err) 842 } 843 } 844} 845 846// Test ReadWriteTransaction. 847func TestIntegration_ReadWriteTransaction(t *testing.T) { 848 t.Parallel() 849 850 // Give a longer deadline because of transaction backoffs. 
851 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 852 defer cancel() 853 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 854 defer cleanup() 855 856 // Set up two accounts 857 accounts := []*Mutation{ 858 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}), 859 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}), 860 } 861 if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil { 862 t.Fatal(err) 863 } 864 wg := sync.WaitGroup{} 865 866 readBalance := func(iter *RowIterator) (int64, error) { 867 defer iter.Stop() 868 var bal int64 869 for { 870 row, err := iter.Next() 871 if err == iterator.Done { 872 return bal, nil 873 } 874 if err != nil { 875 return 0, err 876 } 877 if err := row.Column(0, &bal); err != nil { 878 return 0, err 879 } 880 } 881 } 882 883 for i := 0; i < 20; i++ { 884 wg.Add(1) 885 go func(iter int) { 886 defer wg.Done() 887 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 888 // Query Foo's balance and Bar's balance. 889 bf, e := readBalance(tx.Query(ctx, 890 Statement{"SELECT Balance FROM Accounts WHERE AccountId = @id", map[string]interface{}{"id": int64(1)}})) 891 if e != nil { 892 return e 893 } 894 bb, e := readBalance(tx.Read(ctx, "Accounts", KeySets(Key{int64(2)}), []string{"Balance"})) 895 if e != nil { 896 return e 897 } 898 if bf <= 0 { 899 return nil 900 } 901 bf-- 902 bb++ 903 return tx.BufferWrite([]*Mutation{ 904 Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), bf}), 905 Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), bb}), 906 }) 907 }) 908 if err != nil { 909 t.Errorf("%d: failed to execute transaction: %v", iter, err) 910 } 911 }(i) 912 } 913 // Because of context timeout, all goroutines will eventually return. 
914 wg.Wait() 915 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 916 var bf, bb int64 917 r, e := tx.ReadRow(ctx, "Accounts", Key{int64(1)}, []string{"Balance"}) 918 if e != nil { 919 return e 920 } 921 if ce := r.Column(0, &bf); ce != nil { 922 return ce 923 } 924 bb, e = readBalance(tx.ReadUsingIndex(ctx, "Accounts", "AccountByNickname", KeySets(Key{"Bar"}), []string{"Balance"})) 925 if e != nil { 926 return e 927 } 928 if bf != 30 || bb != 21 { 929 t.Errorf("Foo's balance is now %v and Bar's balance is now %v, want %v and %v", bf, bb, 30, 21) 930 } 931 return nil 932 }) 933 if err != nil { 934 t.Errorf("failed to check balances: %v", err) 935 } 936} 937 938func TestIntegration_Reads(t *testing.T) { 939 t.Parallel() 940 941 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 942 defer cancel() 943 // Set up testing environment. 944 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, readDBStatements) 945 defer cleanup() 946 947 // Includes k0..k14. Strings sort lexically, eg "k1" < "k10" < "k2". 948 var ms []*Mutation 949 for i := 0; i < 15; i++ { 950 ms = append(ms, InsertOrUpdate(testTable, 951 testTableColumns, 952 []interface{}{fmt.Sprintf("k%d", i), fmt.Sprintf("v%d", i)})) 953 } 954 // Don't use ApplyAtLeastOnce, so we can test the other code path. 955 if _, err := client.Apply(ctx, ms); err != nil { 956 t.Fatal(err) 957 } 958 959 // Empty read. 960 rows, err := readAllTestTable(client.Single().Read(ctx, testTable, 961 KeyRange{Start: Key{"k99"}, End: Key{"z"}}, testTableColumns)) 962 if err != nil { 963 t.Fatal(err) 964 } 965 if got, want := len(rows), 0; got != want { 966 t.Errorf("got %d, want %d", got, want) 967 } 968 969 // Index empty read. 
970 rows, err = readAllTestTable(client.Single().ReadUsingIndex(ctx, testTable, testTableIndex, 971 KeyRange{Start: Key{"v99"}, End: Key{"z"}}, testTableColumns)) 972 if err != nil { 973 t.Fatal(err) 974 } 975 if got, want := len(rows), 0; got != want { 976 t.Errorf("got %d, want %d", got, want) 977 } 978 979 // Point read. 980 row, err := client.Single().ReadRow(ctx, testTable, Key{"k1"}, testTableColumns) 981 if err != nil { 982 t.Fatal(err) 983 } 984 var got testTableRow 985 if err := row.ToStruct(&got); err != nil { 986 t.Fatal(err) 987 } 988 if want := (testTableRow{"k1", "v1"}); got != want { 989 t.Errorf("got %v, want %v", got, want) 990 } 991 992 // Point read not found. 993 _, err = client.Single().ReadRow(ctx, testTable, Key{"k999"}, testTableColumns) 994 if ErrCode(err) != codes.NotFound { 995 t.Fatalf("got %v, want NotFound", err) 996 } 997 998 // Index point read. 999 rowIndex, err := client.Single().ReadRowUsingIndex(ctx, testTable, testTableIndex, Key{"v1"}, testTableColumns) 1000 if err != nil { 1001 t.Fatal(err) 1002 } 1003 var gotIndex testTableRow 1004 if err := rowIndex.ToStruct(&gotIndex); err != nil { 1005 t.Fatal(err) 1006 } 1007 if wantIndex := (testTableRow{"k1", "v1"}); gotIndex != wantIndex { 1008 t.Errorf("got %v, want %v", gotIndex, wantIndex) 1009 } 1010 // Index point read not found. 1011 _, err = client.Single().ReadRowUsingIndex(ctx, testTable, testTableIndex, Key{"v999"}, testTableColumns) 1012 if ErrCode(err) != codes.NotFound { 1013 t.Fatalf("got %v, want NotFound", err) 1014 } 1015 rangeReads(ctx, t, client) 1016 indexRangeReads(ctx, t, client) 1017} 1018 1019func TestIntegration_EarlyTimestamp(t *testing.T) { 1020 t.Parallel() 1021 1022 // Test that we can get the timestamp from a read-only transaction as 1023 // soon as we have read at least one row. 1024 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1025 defer cancel() 1026 // Set up testing environment. 
1027 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, readDBStatements) 1028 defer cleanup() 1029 1030 var ms []*Mutation 1031 for i := 0; i < 3; i++ { 1032 ms = append(ms, InsertOrUpdate(testTable, 1033 testTableColumns, 1034 []interface{}{fmt.Sprintf("k%d", i), fmt.Sprintf("v%d", i)})) 1035 } 1036 if _, err := client.Apply(ctx, ms, ApplyAtLeastOnce()); err != nil { 1037 t.Fatal(err) 1038 } 1039 1040 txn := client.Single() 1041 iter := txn.Read(ctx, testTable, AllKeys(), testTableColumns) 1042 defer iter.Stop() 1043 // In single-use transaction, we should get an error before reading anything. 1044 if _, err := txn.Timestamp(); err == nil { 1045 t.Error("wanted error, got nil") 1046 } 1047 // After reading one row, the timestamp should be available. 1048 _, err := iter.Next() 1049 if err != nil { 1050 t.Fatal(err) 1051 } 1052 if _, err := txn.Timestamp(); err != nil { 1053 t.Errorf("got %v, want nil", err) 1054 } 1055 1056 txn = client.ReadOnlyTransaction() 1057 defer txn.Close() 1058 iter = txn.Read(ctx, testTable, AllKeys(), testTableColumns) 1059 defer iter.Stop() 1060 // In an ordinary read-only transaction, the timestamp should be 1061 // available immediately. 1062 if _, err := txn.Timestamp(); err != nil { 1063 t.Errorf("got %v, want nil", err) 1064 } 1065} 1066 1067func TestIntegration_NestedTransaction(t *testing.T) { 1068 t.Parallel() 1069 1070 // You cannot use a transaction from inside a read-write transaction. 
1071 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1072 defer cancel() 1073 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 1074 defer cleanup() 1075 1076 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 1077 _, err := client.ReadWriteTransaction(ctx, 1078 func(context.Context, *ReadWriteTransaction) error { return nil }) 1079 if ErrCode(err) != codes.FailedPrecondition { 1080 t.Fatalf("got %v, want FailedPrecondition", err) 1081 } 1082 _, err = client.Single().ReadRow(ctx, "Singers", Key{1}, []string{"SingerId"}) 1083 if ErrCode(err) != codes.FailedPrecondition { 1084 t.Fatalf("got %v, want FailedPrecondition", err) 1085 } 1086 rot := client.ReadOnlyTransaction() 1087 defer rot.Close() 1088 _, err = rot.ReadRow(ctx, "Singers", Key{1}, []string{"SingerId"}) 1089 if ErrCode(err) != codes.FailedPrecondition { 1090 t.Fatalf("got %v, want FailedPrecondition", err) 1091 } 1092 return nil 1093 }) 1094 if err != nil { 1095 t.Fatal(err) 1096 } 1097} 1098 1099// Test client recovery on database recreation. 1100func TestIntegration_DbRemovalRecovery(t *testing.T) { 1101 t.Parallel() 1102 1103 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1104 defer cancel() 1105 // Create a client with MinOpened=0 to prevent the session pool maintainer 1106 // from repeatedly trying to create sessions for the invalid database. 1107 client, dbPath, cleanup := prepareIntegrationTest(ctx, t, SessionPoolConfig{}, singerDBStatements) 1108 defer cleanup() 1109 1110 // Drop the testing database. 1111 if err := databaseAdmin.DropDatabase(ctx, &adminpb.DropDatabaseRequest{Database: dbPath}); err != nil { 1112 t.Fatalf("failed to drop testing database %v: %v", dbPath, err) 1113 } 1114 1115 // Now, send the query. 
1116 iter := client.Single().Query(ctx, Statement{SQL: "SELECT SingerId FROM Singers"}) 1117 defer iter.Stop() 1118 if _, err := iter.Next(); err == nil { 1119 t.Errorf("client sends query to removed database successfully, want it to fail") 1120 } 1121 1122 // Recreate database and table. 1123 dbName := dbPath[strings.LastIndex(dbPath, "/")+1:] 1124 op, err := databaseAdmin.CreateDatabase(ctx, &adminpb.CreateDatabaseRequest{ 1125 Parent: fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID), 1126 CreateStatement: "CREATE DATABASE " + dbName, 1127 ExtraStatements: []string{ 1128 `CREATE TABLE Singers ( 1129 SingerId INT64 NOT NULL, 1130 FirstName STRING(1024), 1131 LastName STRING(1024), 1132 SingerInfo BYTES(MAX) 1133 ) PRIMARY KEY (SingerId)`, 1134 }, 1135 }) 1136 if err != nil { 1137 t.Fatalf("cannot recreate testing DB %v: %v", dbPath, err) 1138 } 1139 if _, err := op.Wait(ctx); err != nil { 1140 t.Fatalf("cannot recreate testing DB %v: %v", dbPath, err) 1141 } 1142 1143 // Now, send the query again. 1144 iter = client.Single().Query(ctx, Statement{SQL: "SELECT SingerId FROM Singers"}) 1145 defer iter.Stop() 1146 _, err = iter.Next() 1147 if err != nil && err != iterator.Done { 1148 t.Errorf("failed to send query to database %v: %v", dbPath, err) 1149 } 1150} 1151 1152// Test encoding/decoding non-struct Cloud Spanner types. 
1153func TestIntegration_BasicTypes(t *testing.T) { 1154 t.Parallel() 1155 1156 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1157 defer cancel() 1158 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 1159 defer cleanup() 1160 1161 t1, _ := time.Parse(time.RFC3339Nano, "2016-11-15T15:04:05.999999999Z") 1162 // Boundaries 1163 t2, _ := time.Parse(time.RFC3339Nano, "0001-01-01T00:00:00.000000000Z") 1164 t3, _ := time.Parse(time.RFC3339Nano, "9999-12-31T23:59:59.999999999Z") 1165 d1, _ := civil.ParseDate("2016-11-15") 1166 // Boundaries 1167 d2, _ := civil.ParseDate("0001-01-01") 1168 d3, _ := civil.ParseDate("9999-12-31") 1169 1170 tests := []struct { 1171 col string 1172 val interface{} 1173 want interface{} 1174 }{ 1175 {col: "String", val: ""}, 1176 {col: "String", val: "", want: NullString{"", true}}, 1177 {col: "String", val: "foo"}, 1178 {col: "String", val: "foo", want: NullString{"foo", true}}, 1179 {col: "String", val: NullString{"bar", true}, want: "bar"}, 1180 {col: "String", val: NullString{"bar", false}, want: NullString{"", false}}, 1181 {col: "String", val: nil, want: NullString{}}, 1182 {col: "StringArray", val: []string(nil), want: []NullString(nil)}, 1183 {col: "StringArray", val: []string{}, want: []NullString{}}, 1184 {col: "StringArray", val: []string{"foo", "bar"}, want: []NullString{{"foo", true}, {"bar", true}}}, 1185 {col: "StringArray", val: []NullString(nil)}, 1186 {col: "StringArray", val: []NullString{}}, 1187 {col: "StringArray", val: []NullString{{"foo", true}, {}}}, 1188 {col: "Bytes", val: []byte{}}, 1189 {col: "Bytes", val: []byte{1, 2, 3}}, 1190 {col: "Bytes", val: []byte(nil)}, 1191 {col: "BytesArray", val: [][]byte(nil)}, 1192 {col: "BytesArray", val: [][]byte{}}, 1193 {col: "BytesArray", val: [][]byte{{1}, {2, 3}}}, 1194 {col: "Int64a", val: 0, want: int64(0)}, 1195 {col: "Int64a", val: -1, want: int64(-1)}, 1196 {col: "Int64a", val: 2, want: int64(2)}, 
1197 {col: "Int64a", val: int64(3)}, 1198 {col: "Int64a", val: 4, want: NullInt64{4, true}}, 1199 {col: "Int64a", val: NullInt64{5, true}, want: int64(5)}, 1200 {col: "Int64a", val: NullInt64{6, true}, want: int64(6)}, 1201 {col: "Int64a", val: NullInt64{7, false}, want: NullInt64{0, false}}, 1202 {col: "Int64a", val: nil, want: NullInt64{}}, 1203 {col: "Int64Array", val: []int(nil), want: []NullInt64(nil)}, 1204 {col: "Int64Array", val: []int{}, want: []NullInt64{}}, 1205 {col: "Int64Array", val: []int{1, 2}, want: []NullInt64{{1, true}, {2, true}}}, 1206 {col: "Int64Array", val: []int64(nil), want: []NullInt64(nil)}, 1207 {col: "Int64Array", val: []int64{}, want: []NullInt64{}}, 1208 {col: "Int64Array", val: []int64{1, 2}, want: []NullInt64{{1, true}, {2, true}}}, 1209 {col: "Int64Array", val: []NullInt64(nil)}, 1210 {col: "Int64Array", val: []NullInt64{}}, 1211 {col: "Int64Array", val: []NullInt64{{1, true}, {}}}, 1212 {col: "Bool", val: false}, 1213 {col: "Bool", val: true}, 1214 {col: "Bool", val: false, want: NullBool{false, true}}, 1215 {col: "Bool", val: true, want: NullBool{true, true}}, 1216 {col: "Bool", val: NullBool{true, true}}, 1217 {col: "Bool", val: NullBool{false, false}}, 1218 {col: "Bool", val: nil, want: NullBool{}}, 1219 {col: "BoolArray", val: []bool(nil), want: []NullBool(nil)}, 1220 {col: "BoolArray", val: []bool{}, want: []NullBool{}}, 1221 {col: "BoolArray", val: []bool{true, false}, want: []NullBool{{true, true}, {false, true}}}, 1222 {col: "BoolArray", val: []NullBool(nil)}, 1223 {col: "BoolArray", val: []NullBool{}}, 1224 {col: "BoolArray", val: []NullBool{{false, true}, {true, true}, {}}}, 1225 {col: "Float64", val: 0.0}, 1226 {col: "Float64", val: 3.14}, 1227 {col: "Float64", val: math.NaN()}, 1228 {col: "Float64", val: math.Inf(1)}, 1229 {col: "Float64", val: math.Inf(-1)}, 1230 {col: "Float64", val: 2.78, want: NullFloat64{2.78, true}}, 1231 {col: "Float64", val: NullFloat64{2.71, true}, want: 2.71}, 1232 {col: "Float64", val: 
NullFloat64{1.41, true}, want: NullFloat64{1.41, true}}, 1233 {col: "Float64", val: NullFloat64{0, false}}, 1234 {col: "Float64", val: nil, want: NullFloat64{}}, 1235 {col: "Float64Array", val: []float64(nil), want: []NullFloat64(nil)}, 1236 {col: "Float64Array", val: []float64{}, want: []NullFloat64{}}, 1237 {col: "Float64Array", val: []float64{2.72, 3.14, math.Inf(1)}, want: []NullFloat64{{2.72, true}, {3.14, true}, {math.Inf(1), true}}}, 1238 {col: "Float64Array", val: []NullFloat64(nil)}, 1239 {col: "Float64Array", val: []NullFloat64{}}, 1240 {col: "Float64Array", val: []NullFloat64{{2.72, true}, {math.Inf(1), true}, {}}}, 1241 {col: "Date", val: d1}, 1242 {col: "Date", val: d1, want: NullDate{d1, true}}, 1243 {col: "Date", val: NullDate{d1, true}}, 1244 {col: "Date", val: NullDate{d1, true}, want: d1}, 1245 {col: "Date", val: NullDate{civil.Date{}, false}}, 1246 {col: "DateArray", val: []civil.Date(nil), want: []NullDate(nil)}, 1247 {col: "DateArray", val: []civil.Date{}, want: []NullDate{}}, 1248 {col: "DateArray", val: []civil.Date{d1, d2, d3}, want: []NullDate{{d1, true}, {d2, true}, {d3, true}}}, 1249 {col: "Timestamp", val: t1}, 1250 {col: "Timestamp", val: t1, want: NullTime{t1, true}}, 1251 {col: "Timestamp", val: NullTime{t1, true}}, 1252 {col: "Timestamp", val: NullTime{t1, true}, want: t1}, 1253 {col: "Timestamp", val: NullTime{}}, 1254 {col: "Timestamp", val: nil, want: NullTime{}}, 1255 {col: "TimestampArray", val: []time.Time(nil), want: []NullTime(nil)}, 1256 {col: "TimestampArray", val: []time.Time{}, want: []NullTime{}}, 1257 {col: "TimestampArray", val: []time.Time{t1, t2, t3}, want: []NullTime{{t1, true}, {t2, true}, {t3, true}}}, 1258 } 1259 1260 // Write rows into table first. 
1261 var muts []*Mutation 1262 for i, test := range tests { 1263 muts = append(muts, InsertOrUpdate("Types", []string{"RowID", test.col}, []interface{}{i, test.val})) 1264 } 1265 if _, err := client.Apply(ctx, muts, ApplyAtLeastOnce()); err != nil { 1266 t.Fatal(err) 1267 } 1268 1269 for i, test := range tests { 1270 row, err := client.Single().ReadRow(ctx, "Types", []interface{}{i}, []string{test.col}) 1271 if err != nil { 1272 t.Fatalf("Unable to fetch row %v: %v", i, err) 1273 } 1274 // Create new instance of type of test.want. 1275 want := test.want 1276 if want == nil { 1277 want = test.val 1278 } 1279 gotp := reflect.New(reflect.TypeOf(want)) 1280 if err := row.Column(0, gotp.Interface()); err != nil { 1281 t.Errorf("%d: col:%v val:%#v, %v", i, test.col, test.val, err) 1282 continue 1283 } 1284 got := reflect.Indirect(gotp).Interface() 1285 1286 // One of the test cases is checking NaN handling. Given 1287 // NaN!=NaN, we can't use reflect to test for it. 1288 if isNaN(got) && isNaN(want) { 1289 continue 1290 } 1291 1292 // Check non-NaN cases. 1293 if !testEqual(got, want) { 1294 t.Errorf("%d: col:%v val:%#v, got %#v, want %#v", i, test.col, test.val, got, want) 1295 continue 1296 } 1297 } 1298} 1299 1300// Test decoding Cloud Spanner STRUCT type. 1301func TestIntegration_StructTypes(t *testing.T) { 1302 t.Parallel() 1303 1304 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1305 defer cancel() 1306 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 1307 defer cleanup() 1308 1309 tests := []struct { 1310 q Statement 1311 want func(r *Row) error 1312 }{ 1313 { 1314 q: Statement{SQL: `SELECT ARRAY(SELECT STRUCT(1, 2))`}, 1315 want: func(r *Row) error { 1316 // Test STRUCT ARRAY decoding to []NullRow. 
1317 var rows []NullRow 1318 if err := r.Column(0, &rows); err != nil { 1319 return err 1320 } 1321 if len(rows) != 1 { 1322 return fmt.Errorf("len(rows) = %d; want 1", len(rows)) 1323 } 1324 if !rows[0].Valid { 1325 return fmt.Errorf("rows[0] is NULL") 1326 } 1327 var i, j int64 1328 if err := rows[0].Row.Columns(&i, &j); err != nil { 1329 return err 1330 } 1331 if i != 1 || j != 2 { 1332 return fmt.Errorf("got (%d,%d), want (1,2)", i, j) 1333 } 1334 return nil 1335 }, 1336 }, 1337 { 1338 q: Statement{SQL: `SELECT ARRAY(SELECT STRUCT(1 as foo, 2 as bar)) as col1`}, 1339 want: func(r *Row) error { 1340 // Test Row.ToStruct. 1341 s := struct { 1342 Col1 []*struct { 1343 Foo int64 `spanner:"foo"` 1344 Bar int64 `spanner:"bar"` 1345 } `spanner:"col1"` 1346 }{} 1347 if err := r.ToStruct(&s); err != nil { 1348 return err 1349 } 1350 want := struct { 1351 Col1 []*struct { 1352 Foo int64 `spanner:"foo"` 1353 Bar int64 `spanner:"bar"` 1354 } `spanner:"col1"` 1355 }{ 1356 Col1: []*struct { 1357 Foo int64 `spanner:"foo"` 1358 Bar int64 `spanner:"bar"` 1359 }{ 1360 { 1361 Foo: 1, 1362 Bar: 2, 1363 }, 1364 }, 1365 } 1366 if !testEqual(want, s) { 1367 return fmt.Errorf("unexpected decoding result: %v, want %v", s, want) 1368 } 1369 return nil 1370 }, 1371 }, 1372 } 1373 for i, test := range tests { 1374 iter := client.Single().Query(ctx, test.q) 1375 defer iter.Stop() 1376 row, err := iter.Next() 1377 if err != nil { 1378 t.Errorf("%d: %v", i, err) 1379 continue 1380 } 1381 if err := test.want(row); err != nil { 1382 t.Errorf("%d: %v", i, err) 1383 continue 1384 } 1385 } 1386} 1387 1388func TestIntegration_StructParametersUnsupported(t *testing.T) { 1389 skipEmulatorTest(t) 1390 t.Parallel() 1391 1392 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1393 defer cancel() 1394 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, nil) 1395 defer cleanup() 1396 1397 for _, test := range []struct { 1398 param interface{} 1399 wantCode 
codes.Code 1400 wantMsgPart string 1401 }{ 1402 { 1403 struct { 1404 Field int 1405 }{10}, 1406 codes.Unimplemented, 1407 "Unsupported query shape: " + 1408 "A struct value cannot be returned as a column value. " + 1409 "Rewrite the query to flatten the struct fields in the result.", 1410 }, 1411 { 1412 []struct { 1413 Field int 1414 }{{10}, {20}}, 1415 codes.Unimplemented, 1416 "Unsupported query shape: " + 1417 "This query can return a null-valued array of struct, " + 1418 "which is not supported by Spanner.", 1419 }, 1420 } { 1421 iter := client.Single().Query(ctx, Statement{ 1422 SQL: "SELECT @p", 1423 Params: map[string]interface{}{"p": test.param}, 1424 }) 1425 _, err := iter.Next() 1426 iter.Stop() 1427 if msg, ok := matchError(err, test.wantCode, test.wantMsgPart); !ok { 1428 t.Fatal(msg) 1429 } 1430 } 1431} 1432 1433// Test queries of the form "SELECT expr". 1434func TestIntegration_QueryExpressions(t *testing.T) { 1435 t.Parallel() 1436 1437 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1438 defer cancel() 1439 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, nil) 1440 defer cleanup() 1441 1442 newRow := func(vals []interface{}) *Row { 1443 row, err := NewRow(make([]string, len(vals)), vals) 1444 if err != nil { 1445 t.Fatal(err) 1446 } 1447 return row 1448 } 1449 1450 tests := []struct { 1451 expr string 1452 want interface{} 1453 }{ 1454 {"1", int64(1)}, 1455 {"[1, 2, 3]", []NullInt64{{1, true}, {2, true}, {3, true}}}, 1456 {"[1, NULL, 3]", []NullInt64{{1, true}, {0, false}, {3, true}}}, 1457 {"IEEE_DIVIDE(1, 0)", math.Inf(1)}, 1458 {"IEEE_DIVIDE(-1, 0)", math.Inf(-1)}, 1459 {"IEEE_DIVIDE(0, 0)", math.NaN()}, 1460 // TODO(jba): add IEEE_DIVIDE(0, 0) to the following array when we have a better equality predicate. 
1461 {"[IEEE_DIVIDE(1, 0), IEEE_DIVIDE(-1, 0)]", []NullFloat64{{math.Inf(1), true}, {math.Inf(-1), true}}}, 1462 {"ARRAY(SELECT AS STRUCT * FROM (SELECT 'a', 1) WHERE 0 = 1)", []NullRow{}}, 1463 {"ARRAY(SELECT STRUCT(1, 2))", []NullRow{{Row: *newRow([]interface{}{1, 2}), Valid: true}}}, 1464 } 1465 for _, test := range tests { 1466 iter := client.Single().Query(ctx, Statement{SQL: "SELECT " + test.expr}) 1467 defer iter.Stop() 1468 row, err := iter.Next() 1469 if err != nil { 1470 t.Errorf("%q: %v", test.expr, err) 1471 continue 1472 } 1473 // Create new instance of type of test.want. 1474 gotp := reflect.New(reflect.TypeOf(test.want)) 1475 if err := row.Column(0, gotp.Interface()); err != nil { 1476 t.Errorf("%q: Column returned error %v", test.expr, err) 1477 continue 1478 } 1479 got := reflect.Indirect(gotp).Interface() 1480 // TODO(jba): remove isNaN special case when we have a better equality predicate. 1481 if isNaN(got) && isNaN(test.want) { 1482 continue 1483 } 1484 if !testEqual(got, test.want) { 1485 t.Errorf("%q\n got %#v\nwant %#v", test.expr, got, test.want) 1486 } 1487 } 1488} 1489 1490func TestIntegration_QueryStats(t *testing.T) { 1491 skipEmulatorTest(t) 1492 t.Parallel() 1493 1494 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1495 defer cancel() 1496 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 1497 defer cleanup() 1498 1499 accounts := []*Mutation{ 1500 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}), 1501 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}), 1502 } 1503 if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil { 1504 t.Fatal(err) 1505 } 1506 const sql = "SELECT Balance FROM Accounts" 1507 1508 qp, err := client.Single().AnalyzeQuery(ctx, Statement{sql, nil}) 1509 if err != nil { 1510 t.Fatal(err) 1511 } 1512 if 
len(qp.PlanNodes) == 0 { 1513 t.Error("got zero plan nodes, expected at least one") 1514 } 1515 1516 iter := client.Single().QueryWithStats(ctx, Statement{sql, nil}) 1517 defer iter.Stop() 1518 for { 1519 _, err := iter.Next() 1520 if err == iterator.Done { 1521 break 1522 } 1523 if err != nil { 1524 t.Fatal(err) 1525 } 1526 } 1527 if iter.QueryPlan == nil { 1528 t.Error("got nil QueryPlan, expected one") 1529 } 1530 if iter.QueryStats == nil { 1531 t.Error("got nil QueryStats, expected some") 1532 } 1533} 1534 1535func TestIntegration_InvalidDatabase(t *testing.T) { 1536 t.Parallel() 1537 1538 if databaseAdmin == nil { 1539 t.Skip("Integration tests skipped") 1540 } 1541 ctx := context.Background() 1542 dbPath := fmt.Sprintf("projects/%v/instances/%v/databases/invalid", testProjectID, testInstanceID) 1543 c, err := createClient(ctx, dbPath, SessionPoolConfig{}) 1544 // Client creation should succeed even if the database is invalid. 1545 if err != nil { 1546 t.Fatal(err) 1547 } 1548 _, err = c.Single().ReadRow(ctx, "TestTable", Key{1}, []string{"col1"}) 1549 if msg, ok := matchError(err, codes.NotFound, ""); !ok { 1550 t.Fatal(msg) 1551 } 1552} 1553 1554func TestIntegration_ReadErrors(t *testing.T) { 1555 t.Parallel() 1556 1557 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1558 defer cancel() 1559 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, readDBStatements) 1560 defer cleanup() 1561 1562 var ms []*Mutation 1563 for i := 0; i < 2; i++ { 1564 ms = append(ms, InsertOrUpdate(testTable, 1565 testTableColumns, 1566 []interface{}{fmt.Sprintf("k%d", i), fmt.Sprintf("v")})) 1567 } 1568 if _, err := client.Apply(ctx, ms); err != nil { 1569 t.Fatal(err) 1570 } 1571 1572 // Read over invalid table fails 1573 _, err := client.Single().ReadRow(ctx, "badTable", Key{1}, []string{"StringValue"}) 1574 if msg, ok := matchError(err, codes.NotFound, "badTable"); !ok { 1575 t.Error(msg) 1576 } 1577 // Read over invalid column 
fails 1578 _, err = client.Single().ReadRow(ctx, "TestTable", Key{1}, []string{"badcol"}) 1579 if msg, ok := matchError(err, codes.NotFound, "badcol"); !ok { 1580 t.Error(msg) 1581 } 1582 1583 // Invalid query fails 1584 iter := client.Single().Query(ctx, Statement{SQL: "SELECT Apples AND Oranges"}) 1585 defer iter.Stop() 1586 _, err = iter.Next() 1587 if msg, ok := matchError(err, codes.InvalidArgument, "unrecognized name"); !ok { 1588 t.Error(msg) 1589 } 1590 1591 // Read should fail on cancellation. 1592 cctx, cancel := context.WithCancel(ctx) 1593 cancel() 1594 _, err = client.Single().ReadRow(cctx, "TestTable", Key{1}, []string{"StringValue"}) 1595 if msg, ok := matchError(err, codes.Canceled, ""); !ok { 1596 t.Error(msg) 1597 } 1598 // Read should fail if deadline exceeded. 1599 dctx, cancel := context.WithTimeout(ctx, time.Nanosecond) 1600 defer cancel() 1601 <-dctx.Done() 1602 _, err = client.Single().ReadRow(dctx, "TestTable", Key{1}, []string{"StringValue"}) 1603 if msg, ok := matchError(err, codes.DeadlineExceeded, ""); !ok { 1604 t.Error(msg) 1605 } 1606 // Read should fail if there are multiple rows returned. 1607 _, err = client.Single().ReadRowUsingIndex(ctx, testTable, testTableIndex, Key{"v"}, testTableColumns) 1608 wantMsgPart := fmt.Sprintf("more than one row found by index(Table: %v, IndexKey: %v, Index: %v)", testTable, Key{"v"}, testTableIndex) 1609 if msg, ok := matchError(err, codes.FailedPrecondition, wantMsgPart); !ok { 1610 t.Error(msg) 1611 } 1612} 1613 1614// Test TransactionRunner. Test that transactions are aborted and retried as 1615// expected. 1616func TestIntegration_TransactionRunner(t *testing.T) { 1617 skipEmulatorTest(t) 1618 t.Parallel() 1619 1620 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1621 defer cancel() 1622 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 1623 defer cleanup() 1624 1625 // Test 1: User error should abort the transaction. 
1626 _, _ = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 1627 tx.BufferWrite([]*Mutation{ 1628 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)})}) 1629 return errors.New("user error") 1630 }) 1631 // Empty read. 1632 rows, err := readAllTestTable(client.Single().Read(ctx, "Accounts", Key{1}, []string{"AccountId", "Nickname", "Balance"})) 1633 if err != nil { 1634 t.Fatal(err) 1635 } 1636 if got, want := len(rows), 0; got != want { 1637 t.Errorf("Empty read, got %d, want %d.", got, want) 1638 } 1639 1640 // Test 2: Expect abort and retry. 1641 // We run two ReadWriteTransactions concurrently and make txn1 abort txn2 by 1642 // committing writes to the column txn2 have read, and expect the following 1643 // read to abort and txn2 retries. 1644 1645 // Set up two accounts 1646 accounts := []*Mutation{ 1647 Insert("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), int64(0)}), 1648 Insert("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), int64(1)}), 1649 } 1650 if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil { 1651 t.Fatal(err) 1652 } 1653 1654 var ( 1655 cTxn1Start = make(chan struct{}) 1656 cTxn1Commit = make(chan struct{}) 1657 cTxn2Start = make(chan struct{}) 1658 wg sync.WaitGroup 1659 ) 1660 1661 // read balance, check error if we don't expect abort. 1662 readBalance := func(tx interface { 1663 ReadRow(ctx context.Context, table string, key Key, columns []string) (*Row, error) 1664 }, key int64, expectAbort bool) (int64, error) { 1665 var b int64 1666 r, e := tx.ReadRow(ctx, "Accounts", Key{int64(key)}, []string{"Balance"}) 1667 if e != nil { 1668 if expectAbort && !isAbortErr(e) { 1669 t.Errorf("ReadRow got %v, want Abort error.", e) 1670 } 1671 // Verify that we received and are able to extract retry info from 1672 // the aborted error. 
1673 if _, hasRetryInfo := extractRetryDelay(e); !hasRetryInfo { 1674 t.Errorf("Got Abort error without RetryInfo\nGot: %v", e) 1675 } 1676 return b, e 1677 } 1678 if ce := r.Column(0, &b); ce != nil { 1679 return b, ce 1680 } 1681 return b, nil 1682 } 1683 1684 wg.Add(2) 1685 // Txn 1 1686 go func() { 1687 defer wg.Done() 1688 var once sync.Once 1689 _, e := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 1690 b, e := readBalance(tx, 1, false) 1691 if e != nil { 1692 return e 1693 } 1694 // txn 1 can abort, in that case we skip closing the channel on 1695 // retry. 1696 once.Do(func() { close(cTxn1Start) }) 1697 e = tx.BufferWrite([]*Mutation{ 1698 Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), int64(b + 1)})}) 1699 if e != nil { 1700 return e 1701 } 1702 // Wait for second transaction. 1703 <-cTxn2Start 1704 return nil 1705 }) 1706 close(cTxn1Commit) 1707 if e != nil { 1708 t.Errorf("Transaction 1 commit, got %v, want nil.", e) 1709 } 1710 }() 1711 // Txn 2 1712 go func() { 1713 // Wait until txn 1 starts. 1714 <-cTxn1Start 1715 defer wg.Done() 1716 var ( 1717 once sync.Once 1718 b1 int64 1719 b2 int64 1720 e error 1721 ) 1722 _, e = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 1723 if b1, e = readBalance(tx, 1, false); e != nil { 1724 return e 1725 } 1726 // Skip closing channel on retry. 1727 once.Do(func() { close(cTxn2Start) }) 1728 // Wait until txn 1 successfully commits. 1729 <-cTxn1Commit 1730 // Txn1 has committed and written a balance to the account. Now this 1731 // transaction (txn2) reads and re-writes the balance. The first 1732 // time through, it will abort because it overlaps with txn1. Then 1733 // it will retry after txn1 commits, and succeed. 
1734 if b2, e = readBalance(tx, 2, true); e != nil { 1735 return e 1736 } 1737 return tx.BufferWrite([]*Mutation{ 1738 Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), int64(b1 + b2)})}) 1739 }) 1740 if e != nil { 1741 t.Errorf("Transaction 2 commit, got %v, want nil.", e) 1742 } 1743 }() 1744 wg.Wait() 1745 // Check that both transactions' effects are visible. 1746 for i := int64(1); i <= int64(2); i++ { 1747 if b, e := readBalance(client.Single(), i, false); e != nil { 1748 t.Fatalf("ReadBalance for key %d error %v.", i, e) 1749 } else if b != i { 1750 t.Errorf("Balance for key %d, got %d, want %d.", i, b, i) 1751 } 1752 } 1753} 1754 1755// Test PartitionQuery of BatchReadOnlyTransaction, create partitions then 1756// serialize and deserialize both transaction and partition to be used in 1757// execution on another client, and compare results. 1758func TestIntegration_BatchQuery(t *testing.T) { 1759 skipEmulatorTest(t) 1760 t.Parallel() 1761 1762 // Set up testing environment. 
1763 var ( 1764 client2 *Client 1765 err error 1766 ) 1767 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1768 defer cancel() 1769 client, dbPath, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, simpleDBStatements) 1770 defer cleanup() 1771 1772 if err = populate(ctx, client); err != nil { 1773 t.Fatal(err) 1774 } 1775 if client2, err = createClient(ctx, dbPath, SessionPoolConfig{}); err != nil { 1776 t.Fatal(err) 1777 } 1778 defer client2.Close() 1779 1780 // PartitionQuery 1781 var ( 1782 txn *BatchReadOnlyTransaction 1783 partitions []*Partition 1784 stmt = Statement{SQL: "SELECT * FROM test;"} 1785 ) 1786 1787 if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil { 1788 t.Fatal(err) 1789 } 1790 defer txn.Cleanup(ctx) 1791 if partitions, err = txn.PartitionQuery(ctx, stmt, PartitionOptions{0, 3}); err != nil { 1792 t.Fatal(err) 1793 } 1794 1795 // Reconstruct BatchReadOnlyTransactionID and execute partitions 1796 var ( 1797 tid2 BatchReadOnlyTransactionID 1798 data []byte 1799 gotResult bool // if we get matching result from two separate txns 1800 ) 1801 if data, err = txn.ID.MarshalBinary(); err != nil { 1802 t.Fatalf("encoding failed %v", err) 1803 } 1804 if err = tid2.UnmarshalBinary(data); err != nil { 1805 t.Fatalf("decoding failed %v", err) 1806 } 1807 txn2 := client2.BatchReadOnlyTransactionFromID(tid2) 1808 1809 // Execute Partitions and compare results 1810 for i, p := range partitions { 1811 iter := txn.Execute(ctx, p) 1812 defer iter.Stop() 1813 p2 := serdesPartition(t, i, p) 1814 iter2 := txn2.Execute(ctx, &p2) 1815 defer iter2.Stop() 1816 1817 row1, err1 := iter.Next() 1818 row2, err2 := iter2.Next() 1819 if err1 != err2 { 1820 t.Fatalf("execution failed for different reasons: %v, %v", err1, err2) 1821 continue 1822 } 1823 if !testEqual(row1, row2) { 1824 t.Fatalf("execution returned different values: %v, %v", row1, row2) 1825 continue 1826 } 1827 if row1 == nil { 1828 continue 1829 
}
		var a, b string
		if err = row1.Columns(&a, &b); err != nil {
			t.Fatalf("failed to parse row %v", err)
			continue
		}
		if a == str1 && b == str2 {
			gotResult = true
		}
	}
	if !gotResult {
		t.Fatalf("execution didn't return expected values")
	}
}

// TestIntegration_BatchRead tests PartitionRead of BatchReadOnlyTransaction,
// similar to TestBatchQuery. Each partition is executed twice: once on the
// transaction that created it and once on a transaction rebuilt by a second
// client from the marshaled transaction ID and partition. The first row of
// both executions must match, and at least one partition must yield the row
// written by populate.
func TestIntegration_BatchRead(t *testing.T) {
	skipEmulatorTest(t)
	t.Parallel()

	// Set up testing environment.
	var (
		client2 *Client
		err     error
	)
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	client, dbPath, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, simpleDBStatements)
	defer cleanup()

	if err = populate(ctx, client); err != nil {
		t.Fatal(err)
	}
	if client2, err = createClient(ctx, dbPath, SessionPoolConfig{}); err != nil {
		t.Fatal(err)
	}
	defer client2.Close()

	// PartitionRead
	var (
		txn        *BatchReadOnlyTransaction
		partitions []*Partition
	)

	if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil {
		t.Fatal(err)
	}
	defer txn.Cleanup(ctx)
	if partitions, err = txn.PartitionRead(ctx, "test", AllKeys(), simpleDBTableColumns, PartitionOptions{0, 3}); err != nil {
		t.Fatal(err)
	}

	// Reconstruct BatchReadOnlyTransactionID and execute partitions.
	var (
		tid2      BatchReadOnlyTransactionID
		data      []byte
		gotResult bool // if we get matching result from two separate txns
	)
	if data, err = txn.ID.MarshalBinary(); err != nil {
		t.Fatalf("encoding failed %v", err)
	}
	if err = tid2.UnmarshalBinary(data); err != nil {
		t.Fatalf("decoding failed %v", err)
	}
	txn2 := client2.BatchReadOnlyTransactionFromID(tid2)

	// Execute Partitions and compare results.
	for i, p := range partitions {
		iter := txn.Execute(ctx, p)
		defer iter.Stop()
		p2 := serdesPartition(t, i, p)
		iter2 := txn2.Execute(ctx, &p2)
		defer iter2.Stop()

		// Only the first row of each partition is compared. t.Fatalf ends
		// the test, so no explicit continue is needed after it.
		row1, err1 := iter.Next()
		row2, err2 := iter2.Next()
		if err1 != err2 {
			t.Fatalf("execution failed for different reasons: %v, %v", err1, err2)
		}
		if !testEqual(row1, row2) {
			t.Fatalf("execution returned different values: %v, %v", row1, row2)
		}
		if row1 == nil {
			continue
		}
		var a, b string
		if err = row1.Columns(&a, &b); err != nil {
			t.Fatalf("failed to parse row %v", err)
		}
		if a == str1 && b == str2 {
			gotResult = true
		}
	}
	if !gotResult {
		t.Fatalf("execution didn't return expected values")
	}
}

// TestIntegration_BROTNormal tests the normal txReadEnv method on
// BatchReadOnlyTransaction: after partitioning a read, an ordinary Query on
// the same transaction must still return a row.
func TestIntegration_BROTNormal(t *testing.T) {
	skipEmulatorTest(t)
	t.Parallel()

	// Set up testing environment and create txn.
	var (
		txn *BatchReadOnlyTransaction
		err error
		row *Row
		i   int64
	)
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, simpleDBStatements)
	defer cleanup()

	if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil {
		t.Fatal(err)
	}
	defer txn.Cleanup(ctx)
	if _, err := txn.PartitionRead(ctx, "test", AllKeys(), simpleDBTableColumns, PartitionOptions{0, 3}); err != nil {
		t.Fatal(err)
	}
	// Normal query should work with BatchReadOnlyTransaction.
	stmt2 := Statement{SQL: "SELECT 1"}
	iter := txn.Query(ctx, stmt2)
	defer iter.Stop()

	row, err = iter.Next()
	if err != nil {
		// Stop here: row is not usable when Next fails, and calling
		// Columns on it would hide the real failure.
		t.Fatalf("query failed with %v", err)
	}
	if err = row.Columns(&i); err != nil {
		t.Errorf("failed to parse row %v", err)
	}
}

// TestIntegration_CommitTimestamp writes two rows whose Ts column is set to
// the CommitTimestamp placeholder and checks that the timestamp returned by
// Apply equals the one read back from the table. It then checks that writing
// an explicit timestamp in the future is rejected.
func TestIntegration_CommitTimestamp(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, ctsDBStatements)
	defer cleanup()

	type testTableRow struct {
		Key string
		Ts  NullTime
	}

	var (
		cts1, cts2, ts1, ts2 time.Time
		err                  error
	)

	// Apply mutation in sequence, expect to see commit timestamp in good order,
	// check also the commit timestamp returned
	for _, it := range []struct {
		k string
		t *time.Time
	}{
		{"a", &cts1},
		{"b", &cts2},
	} {
		tt := testTableRow{Key: it.k, Ts: NullTime{CommitTimestamp, true}}
		m, err := InsertStruct("TestTable", tt)
		if err != nil {
			t.Fatal(err)
		}
		*it.t, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce())
		if err != nil {
			t.Fatal(err)
		}
	}

	txn := client.ReadOnlyTransaction()
	// Close the transaction once the reads below are done.
	defer txn.Close()
	for _, it := range []struct {
		k string
		t *time.Time
	}{
		{"a", &ts1},
		{"b", &ts2},
	} {
		r, err := txn.ReadRow(ctx, "TestTable", Key{it.k}, []string{"Ts"})
		if err != nil {
			// Report the ReadRow error itself, not the outer err variable.
			t.Fatal(err)
		}
		var got testTableRow
		if err := r.ToStruct(&got); err != nil {
			t.Fatal(err)
		}
		*it.t = got.Ts.Time
	}
	if !cts1.Equal(ts1) {
		t.Errorf("Expect commit timestamp returned and read to match for txn1, got %v and %v.", cts1, ts1)
	}
	if !cts2.Equal(ts2) {
		t.Errorf("Expect commit timestamp returned and read to match for txn2, got %v and %v.", cts2, ts2)
	}

	// Try writing a timestamp in the future to commit timestamp, expect error.
	_, err = client.Apply(ctx, []*Mutation{InsertOrUpdate("TestTable", []string{"Key", "Ts"}, []interface{}{"a", time.Now().Add(time.Hour)})}, ApplyAtLeastOnce())
	if msg, ok := matchError(err, codes.FailedPrecondition, "Cannot write timestamps in the future"); !ok {
		t.Error(msg)
	}
}

// TestIntegration_DML runs DML statements inside read/write transactions:
// through Query and through Update, rolled back by a returned error, several
// in one transaction, combined with a buffered mutation, and finally a
// SELECT passed to Update, which is expected to fail with InvalidArgument.
func TestIntegration_DML(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
	defer cleanup()

	// Function that reads a single row's first name from within a transaction.
	readFirstName := func(tx *ReadWriteTransaction, key int) (string, error) {
		row, err := tx.ReadRow(ctx, "Singers", Key{key}, []string{"FirstName"})
		if err != nil {
			return "", err
		}
		var fn string
		if err := row.Column(0, &fn); err != nil {
			return "", err
		}
		return fn, nil
	}

	// Function that reads multiple rows' first names from outside a read/write
	// transaction.
	readFirstNames := func(keys ...int) []string {
		var ks []KeySet
		for _, k := range keys {
			ks = append(ks, Key{k})
		}
		iter := client.Single().Read(ctx, "Singers", KeySets(ks...), []string{"FirstName"})
		var got []string
		var fn string
		err := iter.Do(func(row *Row) error {
			if err := row.Column(0, &fn); err != nil {
				return err
			}
			got = append(got, fn)
			return nil
		})
		if err != nil {
			t.Fatalf("readFirstNames(%v): %v", keys, err)
		}
		return got
	}

	// Use ReadWriteTransaction.Query to execute a DML statement.
	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
		iter := tx.Query(ctx, Statement{
			SQL: `INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (1, "Umm", "Kulthum")`,
		})
		defer iter.Stop()
		if row, err := iter.Next(); err != iterator.Done {
			t.Fatalf("got results from iterator, want none: %#v, err = %v\n", row, err)
		}
		if iter.RowCount != 1 {
			t.Errorf("row count: got %d, want 1", iter.RowCount)
		}
		// The results of the DML statement should be visible to the transaction.
		got, err := readFirstName(tx, 1)
		if err != nil {
			return err
		}
		if want := "Umm"; got != want {
			t.Errorf("got %q, want %q", got, want)
		}
		return nil
	})
	if err != nil {
		t.Fatal(err)
	}

	// Use ReadWriteTransaction.Update to execute a DML statement.
	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
		count, err := tx.Update(ctx, Statement{
			SQL: `Insert INTO Singers (SingerId, FirstName, LastName) VALUES (2, "Eduard", "Khil")`,
		})
		if err != nil {
			t.Fatal(err)
		}
		if count != 1 {
			t.Errorf("row count: got %d, want 1", count)
		}
		got, err := readFirstName(tx, 2)
		if err != nil {
			return err
		}
		if want := "Eduard"; got != want {
			t.Errorf("got %q, want %q", got, want)
		}
		return nil

	})
	if err != nil {
		t.Fatal(err)
	}

	// Roll back a DML statement and confirm that it didn't happen.
	var fail = errors.New("fail")
	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
		_, err := tx.Update(ctx, Statement{
			SQL: `INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (3, "Audra", "McDonald")`,
		})
		if err != nil {
			return err
		}
		return fail
	})
	if err != fail {
		t.Fatalf("rolling back: got error %v, want the error 'fail'", err)
	}
	_, err = client.Single().ReadRow(ctx, "Singers", Key{3}, []string{"FirstName"})
	if got, want := ErrCode(err), codes.NotFound; got != want {
		t.Errorf("got %s, want %s", got, want)
	}

	// Run two DML statements in the same transaction.
	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
		_, err := tx.Update(ctx, Statement{SQL: `UPDATE Singers SET FirstName = "Oum" WHERE SingerId = 1`})
		if err != nil {
			return err
		}
		_, err = tx.Update(ctx, Statement{SQL: `UPDATE Singers SET FirstName = "Eddie" WHERE SingerId = 2`})
		if err != nil {
			return err
		}
		return nil
	})
	if err != nil {
		t.Fatal(err)
	}
	got := readFirstNames(1, 2)
	want := []string{"Oum", "Eddie"}
	if !testEqual(got, want) {
		t.Errorf("got %v, want %v", got, want)
	}

	// Run a DML statement and an ordinary mutation in the same transaction.
	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
		_, err := tx.Update(ctx, Statement{
			SQL: `INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (3, "Audra", "McDonald")`,
		})
		if err != nil {
			return err
		}
		return tx.BufferWrite([]*Mutation{
			Insert("Singers", []string{"SingerId", "FirstName", "LastName"},
				[]interface{}{4, "Andy", "Irvine"}),
		})
	})
	if err != nil {
		t.Fatal(err)
	}
	got = readFirstNames(3, 4)
	want = []string{"Audra", "Andy"}
	if !testEqual(got, want) {
		t.Errorf("got %v, want %v", got, want)
	}

	// Attempt to run a query using update.
	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
		_, err := tx.Update(ctx, Statement{SQL: `SELECT FirstName from Singers`})
		return err
	})
	if got, want := ErrCode(err), codes.InvalidArgument; got != want {
		t.Errorf("got %s, want %s", got, want)
	}
}

// TestIntegration_StructParametersBind binds Go structs and slices of structs
// as the query parameter @p and compares the rows returned by each query with
// rows built locally through NewRow.
func TestIntegration_StructParametersBind(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, nil)
	defer cleanup()

	type tRow []interface{}
	type tRows []struct{ trow tRow }

	type allFields struct {
		Stringf string
		Intf    int
		Boolf   bool
		Floatf  float64
		Bytef   []byte
		Timef   time.Time
		Datef   civil.Date
	}
	allColumns := []string{
		"Stringf",
		"Intf",
		"Boolf",
		"Floatf",
		"Bytef",
		"Timef",
		"Datef",
	}
	s1 := allFields{"abc", 300, false, 3.45, []byte("foo"), t1, d1}
	s2 := allFields{"def", -300, false, -3.45, []byte("bar"), t2, d2}

	// s3 is a struct value whose type is created at run time via reflection.
	dynamicStructType := reflect.StructOf([]reflect.StructField{
		{Name: "A", Type: reflect.TypeOf(t1), Tag: `spanner:"ff1"`},
	})
	s3 := reflect.New(dynamicStructType)
	s3.Elem().Field(0).Set(reflect.ValueOf(t1))

	for i, test := range []struct {
		param interface{}
		sql   string
		cols  []string
		trows tRows
	}{
		// Struct value.
		{
			s1,
			"SELECT" +
				" @p.Stringf," +
				" @p.Intf," +
				" @p.Boolf," +
				" @p.Floatf," +
				" @p.Bytef," +
				" @p.Timef," +
				" @p.Datef",
			allColumns,
			tRows{
				{tRow{"abc", 300, false, 3.45, []byte("foo"), t1, d1}},
			},
		},
		// Array of struct value.
		{
			[]allFields{s1, s2},
			"SELECT * FROM UNNEST(@p)",
			allColumns,
			tRows{
				{tRow{"abc", 300, false, 3.45, []byte("foo"), t1, d1}},
				{tRow{"def", -300, false, -3.45, []byte("bar"), t2, d2}},
			},
		},
		// Null struct.
		{
			(*allFields)(nil),
			"SELECT @p IS NULL",
			[]string{""},
			tRows{
				{tRow{true}},
			},
		},
		// Null Array of struct.
		{
			[]allFields(nil),
			"SELECT @p IS NULL",
			[]string{""},
			tRows{
				{tRow{true}},
			},
		},
		// Empty struct.
		{
			struct{}{},
			"SELECT @p IS NULL ",
			[]string{""},
			tRows{
				{tRow{false}},
			},
		},
		// Empty array of struct.
		{
			[]allFields{},
			"SELECT * FROM UNNEST(@p) ",
			allColumns,
			tRows{},
		},
		// Struct with duplicate fields.
		{
			struct {
				A int `spanner:"field"`
				B int `spanner:"field"`
			}{10, 20},
			"SELECT * FROM UNNEST([@p]) ",
			[]string{"field", "field"},
			tRows{
				{tRow{10, 20}},
			},
		},
		// Struct with unnamed fields.
		{
			struct {
				A string `spanner:""`
			}{"hello"},
			"SELECT * FROM UNNEST([@p]) ",
			[]string{""},
			tRows{
				{tRow{"hello"}},
			},
		},
		// Mixed struct.
		{
			struct {
				DynamicStructField interface{}  `spanner:"f1"`
				ArrayStructField   []*allFields `spanner:"f2"`
			}{
				DynamicStructField: s3.Interface(),
				ArrayStructField:   []*allFields{nil},
			},
			"SELECT @p.f1.ff1, ARRAY_LENGTH(@p.f2), @p.f2[OFFSET(0)] IS NULL ",
			[]string{"ff1", "", ""},
			tRows{
				{tRow{t1, 1, true}},
			},
		},
	} {
		iter := client.Single().Query(ctx, Statement{
			SQL:    test.sql,
			Params: map[string]interface{}{"p": test.param},
		})
		var gotRows []*Row
		err := iter.Do(func(r *Row) error {
			gotRows = append(gotRows, r)
			return nil
		})
		if err != nil {
			t.Errorf("Failed to execute test case %d, error: %v", i, err)
		}

		var wantRows []*Row
		for j, row := range test.trows {
			r, err := NewRow(test.cols, row.trow)
			if err != nil {
				t.Errorf("Invalid row %d in test case %d", j, i)
			}
			wantRows = append(wantRows, r)
		}
		if !testEqual(gotRows, wantRows) {
			t.Errorf("%d: Want result %v, got result %v", i, wantRows, gotRows)
		}
	}
}

// TestIntegration_PDML runs a partitioned UPDATE over a key range of the
// Singers table and checks both the reported row count and the table
// contents afterwards.
func TestIntegration_PDML(t *testing.T) {
	skipEmulatorTest(t)
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
	defer cleanup()

	columns := []string{"SingerId", "FirstName", "LastName"}

	// Populate the Singers table.
	var muts []*Mutation
	for _, row := range [][]interface{}{
		{1, "Umm", "Kulthum"},
		{2, "Eduard", "Khil"},
		{3, "Audra", "McDonald"},
		{4, "Enrique", "Iglesias"},
		{5, "Shakira", "Ripoll"},
	} {
		muts = append(muts, Insert("Singers", columns, row))
	}
	if _, err := client.Apply(ctx, muts); err != nil {
		t.Fatal(err)
	}
	// Identifiers in PDML statements must be fully qualified.
	// TODO(jba): revisit the above.
	count, err := client.PartitionedUpdate(ctx, Statement{
		SQL: `UPDATE Singers SET Singers.FirstName = "changed" WHERE Singers.SingerId >= 1 AND Singers.SingerId <= @end`,
		Params: map[string]interface{}{
			"end": 3,
		},
	})
	if err != nil {
		t.Fatal(err)
	}
	if want := int64(3); count != want {
		t.Fatalf("got %d, want %d", count, want)
	}
	got, err := readAll(client.Single().Read(ctx, "Singers", AllKeys(), columns))
	if err != nil {
		t.Fatal(err)
	}
	want := [][]interface{}{
		{int64(1), "changed", "Kulthum"},
		{int64(2), "changed", "Khil"},
		{int64(3), "changed", "McDonald"},
		{int64(4), "Enrique", "Iglesias"},
		{int64(5), "Shakira", "Ripoll"},
	}
	if !testEqual(got, want) {
		t.Errorf("\ngot %v\nwant%v", got, want)
	}
}

// TestBatchDML sends three UPDATE statements in a single BatchUpdate call and
// checks the per-statement row counts and the resulting table contents.
func TestBatchDML(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
	defer cleanup()

	columns := []string{"SingerId", "FirstName", "LastName"}

	// Populate the Singers table.
	var muts []*Mutation
	for _, row := range [][]interface{}{
		{1, "Umm", "Kulthum"},
		{2, "Eduard", "Khil"},
		{3, "Audra", "McDonald"},
	} {
		muts = append(muts, Insert("Singers", columns, row))
	}
	if _, err := client.Apply(ctx, muts); err != nil {
		t.Fatal(err)
	}

	var counts []int64
	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
		counts, err = tx.BatchUpdate(ctx, []Statement{
			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`},
			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 2" WHERE Singers.SingerId = 2`},
			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`},
		})
		return err
	})

	if err != nil {
		t.Fatal(err)
	}
	if want := []int64{1, 1, 1}; !testEqual(counts, want) {
		t.Fatalf("got %d, want %d", counts, want)
	}
	got, err := readAll(client.Single().Read(ctx, "Singers", AllKeys(), columns))
	if err != nil {
		t.Fatal(err)
	}
	want := [][]interface{}{
		{int64(1), "changed 1", "Kulthum"},
		{int64(2), "changed 2", "Khil"},
		{int64(3), "changed 3", "McDonald"},
	}
	if !testEqual(got, want) {
		t.Errorf("\ngot %v\nwant%v", got, want)
	}
}

// TestBatchDML_NoStatements checks that BatchUpdate with an empty statement
// list fails with an InvalidArgument status.
func TestBatchDML_NoStatements(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
	defer cleanup()

	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
		_, err = tx.BatchUpdate(ctx, []Statement{})
		return err
	})
	if err == nil {
		t.Fatal("expected error, got nil")
	}
	if s, ok := status.FromError(err); ok {
		if s.Code() != codes.InvalidArgument {
			t.Fatalf("expected InvalidArgument, got %v", err)
		}
	} else {
		t.Fatalf("expected InvalidArgument, got %v", err)
	}
}

// TestBatchDML_TwoStatements runs a BatchUpdate followed by a single Update in
// the same read/write transaction and checks the row counts of both.
func TestBatchDML_TwoStatements(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
	defer cleanup()

	columns := []string{"SingerId", "FirstName", "LastName"}

	// Populate the Singers table.
	var muts []*Mutation
	for _, row := range [][]interface{}{
		{1, "Umm", "Kulthum"},
		{2, "Eduard", "Khil"},
		{3, "Audra", "McDonald"},
	} {
		muts = append(muts, Insert("Singers", columns, row))
	}
	if _, err := client.Apply(ctx, muts); err != nil {
		t.Fatal(err)
	}

	var updateCount int64
	var batchCounts []int64
	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
		batchCounts, err = tx.BatchUpdate(ctx, []Statement{
			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`},
			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 2" WHERE Singers.SingerId = 2`},
			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`},
		})
		if err != nil {
			return err
		}

		updateCount, err = tx.Update(ctx, Statement{SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`})
		return err
	})
	if err != nil {
		t.Fatal(err)
	}
	if want := []int64{1, 1, 1}; !testEqual(batchCounts, want) {
		t.Fatalf("got %d, want %d", batchCounts, want)
	}
	if updateCount != 1 {
		t.Fatalf("got %v, want 1", updateCount)
	}
}

// TODO(deklerk): this currently does not work because the transaction appears to
// get rolled back after a
single statement fails. b/120158761 2558func TestBatchDML_Error(t *testing.T) { 2559 t.Parallel() 2560 2561 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 2562 defer cancel() 2563 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 2564 defer cleanup() 2565 2566 columns := []string{"SingerId", "FirstName", "LastName"} 2567 2568 // Populate the Singers table. 2569 var muts []*Mutation 2570 for _, row := range [][]interface{}{ 2571 {1, "Umm", "Kulthum"}, 2572 {2, "Eduard", "Khil"}, 2573 {3, "Audra", "McDonald"}, 2574 } { 2575 muts = append(muts, Insert("Singers", columns, row)) 2576 } 2577 if _, err := client.Apply(ctx, muts); err != nil { 2578 t.Fatal(err) 2579 } 2580 2581 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) { 2582 counts, err := tx.BatchUpdate(ctx, []Statement{ 2583 {SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`}, 2584 {SQL: `some illegal statement`}, 2585 {SQL: `UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`}, 2586 }) 2587 if err == nil { 2588 t.Fatal("expected err, got nil") 2589 } 2590 if want := []int64{1}; !testEqual(counts, want) { 2591 t.Fatalf("got %d, want %d", counts, want) 2592 } 2593 2594 got, err := readAll(tx.Read(ctx, "Singers", AllKeys(), columns)) 2595 if err != nil { 2596 t.Fatal(err) 2597 } 2598 want := [][]interface{}{ 2599 {int64(1), "changed 1", "Kulthum"}, 2600 {int64(2), "Eduard", "Khil"}, 2601 {int64(3), "Audra", "McDonald"}, 2602 } 2603 if !testEqual(got, want) { 2604 t.Errorf("\ngot %v\nwant%v", got, want) 2605 } 2606 2607 return nil 2608 }) 2609 if err != nil { 2610 t.Fatal(err) 2611 } 2612} 2613 2614// Prepare initializes Cloud Spanner testing DB and clients. 
2615func prepareIntegrationTest(ctx context.Context, t *testing.T, spc SessionPoolConfig, statements []string) (*Client, string, func()) { 2616 if databaseAdmin == nil { 2617 t.Skip("Integration tests skipped") 2618 } 2619 // Construct a unique test DB name. 2620 dbName := dbNameSpace.New() 2621 2622 dbPath := fmt.Sprintf("projects/%v/instances/%v/databases/%v", testProjectID, testInstanceID, dbName) 2623 // Create database and tables. 2624 op, err := databaseAdmin.CreateDatabase(ctx, &adminpb.CreateDatabaseRequest{ 2625 Parent: fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID), 2626 CreateStatement: "CREATE DATABASE " + dbName, 2627 ExtraStatements: statements, 2628 }) 2629 if err != nil { 2630 t.Fatalf("cannot create testing DB %v: %v", dbPath, err) 2631 } 2632 if _, err := op.Wait(ctx); err != nil { 2633 t.Fatalf("cannot create testing DB %v: %v", dbPath, err) 2634 } 2635 client, err := createClient(ctx, dbPath, spc) 2636 if err != nil { 2637 t.Fatalf("cannot create data client on DB %v: %v", dbPath, err) 2638 } 2639 return client, dbPath, func() { 2640 client.Close() 2641 } 2642} 2643 2644func cleanupInstances() { 2645 if instanceAdmin == nil { 2646 // Integration tests skipped. 
2647 return 2648 } 2649 2650 ctx := context.Background() 2651 parent := fmt.Sprintf("projects/%v", testProjectID) 2652 iter := instanceAdmin.ListInstances(ctx, &instancepb.ListInstancesRequest{ 2653 Parent: parent, 2654 Filter: "name:gotest-", 2655 }) 2656 expireAge := 24 * time.Hour 2657 2658 for { 2659 inst, err := iter.Next() 2660 if err == iterator.Done { 2661 break 2662 } 2663 if err != nil { 2664 panic(err) 2665 } 2666 if instanceNameSpace.Older(inst.Name, expireAge) { 2667 log.Printf("Deleting instance %s", inst.Name) 2668 2669 if err := instanceAdmin.DeleteInstance(ctx, &instancepb.DeleteInstanceRequest{Name: inst.Name}); err != nil { 2670 log.Printf("failed to delete instance %s (error %v), might need a manual removal", 2671 inst.Name, err) 2672 } 2673 } 2674 } 2675} 2676 2677func rangeReads(ctx context.Context, t *testing.T, client *Client) { 2678 checkRange := func(ks KeySet, wantNums ...int) { 2679 if msg, ok := compareRows(client.Single().Read(ctx, testTable, ks, testTableColumns), wantNums); !ok { 2680 t.Errorf("key set %+v: %s", ks, msg) 2681 } 2682 } 2683 2684 checkRange(Key{"k1"}, 1) 2685 checkRange(KeyRange{Key{"k3"}, Key{"k5"}, ClosedOpen}, 3, 4) 2686 checkRange(KeyRange{Key{"k3"}, Key{"k5"}, ClosedClosed}, 3, 4, 5) 2687 checkRange(KeyRange{Key{"k3"}, Key{"k5"}, OpenClosed}, 4, 5) 2688 checkRange(KeyRange{Key{"k3"}, Key{"k5"}, OpenOpen}, 4) 2689 2690 // Partial key specification. 2691 checkRange(KeyRange{Key{"k7"}, Key{}, ClosedClosed}, 7, 8, 9) 2692 checkRange(KeyRange{Key{"k7"}, Key{}, OpenClosed}, 8, 9) 2693 checkRange(KeyRange{Key{}, Key{"k11"}, ClosedOpen}, 0, 1, 10) 2694 checkRange(KeyRange{Key{}, Key{"k11"}, ClosedClosed}, 0, 1, 10, 11) 2695 2696 // The following produce empty ranges. 2697 // TODO(jba): Consider a multi-part key to illustrate partial key behavior. 
2698 // checkRange(KeyRange{Key{"k7"}, Key{}, ClosedOpen}) 2699 // checkRange(KeyRange{Key{"k7"}, Key{}, OpenOpen}) 2700 // checkRange(KeyRange{Key{}, Key{"k11"}, OpenOpen}) 2701 // checkRange(KeyRange{Key{}, Key{"k11"}, OpenClosed}) 2702 2703 // Prefix is component-wise, not string prefix. 2704 checkRange(Key{"k1"}.AsPrefix(), 1) 2705 checkRange(KeyRange{Key{"k1"}, Key{"k2"}, ClosedOpen}, 1, 10, 11, 12, 13, 14) 2706 2707 checkRange(AllKeys(), 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14) 2708} 2709 2710func indexRangeReads(ctx context.Context, t *testing.T, client *Client) { 2711 checkRange := func(ks KeySet, wantNums ...int) { 2712 if msg, ok := compareRows(client.Single().ReadUsingIndex(ctx, testTable, testTableIndex, ks, testTableColumns), 2713 wantNums); !ok { 2714 t.Errorf("key set %+v: %s", ks, msg) 2715 } 2716 } 2717 2718 checkRange(Key{"v1"}, 1) 2719 checkRange(KeyRange{Key{"v3"}, Key{"v5"}, ClosedOpen}, 3, 4) 2720 checkRange(KeyRange{Key{"v3"}, Key{"v5"}, ClosedClosed}, 3, 4, 5) 2721 checkRange(KeyRange{Key{"v3"}, Key{"v5"}, OpenClosed}, 4, 5) 2722 checkRange(KeyRange{Key{"v3"}, Key{"v5"}, OpenOpen}, 4) 2723 2724 // // Partial key specification. 2725 checkRange(KeyRange{Key{"v7"}, Key{}, ClosedClosed}, 7, 8, 9) 2726 checkRange(KeyRange{Key{"v7"}, Key{}, OpenClosed}, 8, 9) 2727 checkRange(KeyRange{Key{}, Key{"v11"}, ClosedOpen}, 0, 1, 10) 2728 checkRange(KeyRange{Key{}, Key{"v11"}, ClosedClosed}, 0, 1, 10, 11) 2729 2730 // // The following produce empty ranges. 2731 // checkRange(KeyRange{Key{"v7"}, Key{}, ClosedOpen}) 2732 // checkRange(KeyRange{Key{"v7"}, Key{}, OpenOpen}) 2733 // checkRange(KeyRange{Key{}, Key{"v11"}, OpenOpen}) 2734 // checkRange(KeyRange{Key{}, Key{"v11"}, OpenClosed}) 2735 2736 // // Prefix is component-wise, not string prefix. 
2737 checkRange(Key{"v1"}.AsPrefix(), 1) 2738 checkRange(KeyRange{Key{"v1"}, Key{"v2"}, ClosedOpen}, 1, 10, 11, 12, 13, 14) 2739 checkRange(AllKeys(), 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14) 2740 2741 // Read from an index with DESC ordering. 2742 wantNums := []int{14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0} 2743 if msg, ok := compareRows(client.Single().ReadUsingIndex(ctx, testTable, "TestTableByValueDesc", AllKeys(), testTableColumns), 2744 wantNums); !ok { 2745 t.Errorf("desc: %s", msg) 2746 } 2747} 2748 2749type testTableRow struct{ Key, StringValue string } 2750 2751func compareRows(iter *RowIterator, wantNums []int) (string, bool) { 2752 rows, err := readAllTestTable(iter) 2753 if err != nil { 2754 return err.Error(), false 2755 } 2756 want := map[string]string{} 2757 for _, n := range wantNums { 2758 want[fmt.Sprintf("k%d", n)] = fmt.Sprintf("v%d", n) 2759 } 2760 got := map[string]string{} 2761 for _, r := range rows { 2762 got[r.Key] = r.StringValue 2763 } 2764 if !testEqual(got, want) { 2765 return fmt.Sprintf("got %v, want %v", got, want), false 2766 } 2767 return "", true 2768} 2769 2770func isNaN(x interface{}) bool { 2771 f, ok := x.(float64) 2772 if !ok { 2773 return false 2774 } 2775 return math.IsNaN(f) 2776} 2777 2778// createClient creates Cloud Spanner data client. 2779func createClient(ctx context.Context, dbPath string, spc SessionPoolConfig) (client *Client, err error) { 2780 if os.Getenv("SPANNER_EMULATOR_HOST") == "" { 2781 client, err = NewClientWithConfig(ctx, dbPath, ClientConfig{ 2782 SessionPoolConfig: spc, 2783 }, option.WithTokenSource(testutil.TokenSource(ctx, Scope, AdminScope)), option.WithEndpoint(endpoint)) 2784 } else { 2785 // When the emulator is enabled, option.WithoutAuthentication() 2786 // will be added automatically which is incompatible with 2787 // option.WithTokenSource(testutil.TokenSource(ctx, Scope)). 
2788 client, err = NewClientWithConfig(ctx, dbPath, ClientConfig{SessionPoolConfig: spc}) 2789 } 2790 2791 if err != nil { 2792 return nil, fmt.Errorf("cannot create data client on DB %v: %v", dbPath, err) 2793 } 2794 return client, nil 2795} 2796 2797// populate prepares the database with some data. 2798func populate(ctx context.Context, client *Client) error { 2799 // Populate data 2800 var err error 2801 m := InsertMap("test", map[string]interface{}{ 2802 "a": str1, 2803 "b": str2, 2804 }) 2805 _, err = client.Apply(ctx, []*Mutation{m}) 2806 return err 2807} 2808 2809func matchError(got error, wantCode codes.Code, wantMsgPart string) (string, bool) { 2810 if ErrCode(got) != wantCode || !strings.Contains(strings.ToLower(ErrDesc(got)), strings.ToLower(wantMsgPart)) { 2811 return fmt.Sprintf("got error <%v>\n"+`want <code = %q, "...%s...">`, got, wantCode, wantMsgPart), false 2812 } 2813 return "", true 2814} 2815 2816func rowToValues(r *Row) ([]interface{}, error) { 2817 var x int64 2818 var y, z string 2819 if err := r.Column(0, &x); err != nil { 2820 return nil, err 2821 } 2822 if err := r.Column(1, &y); err != nil { 2823 return nil, err 2824 } 2825 if err := r.Column(2, &z); err != nil { 2826 return nil, err 2827 } 2828 return []interface{}{x, y, z}, nil 2829} 2830 2831func readAll(iter *RowIterator) ([][]interface{}, error) { 2832 defer iter.Stop() 2833 var vals [][]interface{} 2834 for { 2835 row, err := iter.Next() 2836 if err == iterator.Done { 2837 return vals, nil 2838 } 2839 if err != nil { 2840 return nil, err 2841 } 2842 v, err := rowToValues(row) 2843 if err != nil { 2844 return nil, err 2845 } 2846 vals = append(vals, v) 2847 } 2848} 2849 2850func readAllTestTable(iter *RowIterator) ([]testTableRow, error) { 2851 defer iter.Stop() 2852 var vals []testTableRow 2853 for { 2854 row, err := iter.Next() 2855 if err == iterator.Done { 2856 return vals, nil 2857 } 2858 if err != nil { 2859 return nil, err 2860 } 2861 var ttr testTableRow 2862 if err := 
row.ToStruct(&ttr); err != nil { 2863 return nil, err 2864 } 2865 vals = append(vals, ttr) 2866 } 2867} 2868 2869func maxDuration(a, b time.Duration) time.Duration { 2870 if a > b { 2871 return a 2872 } 2873 return b 2874} 2875 2876func skipEmulatorTest(t *testing.T) { 2877 if os.Getenv("SPANNER_EMULATOR_HOST") != "" { 2878 t.Skip("Skipping testing against the emulator.") 2879 } 2880} 2881